deltachat/
sql.rs

1//! # SQLite wrapper.
2
3use std::collections::{HashMap, HashSet};
4use std::path::{Path, PathBuf};
5
6use anyhow::{bail, Context as _, Result};
7use rusqlite::{config::DbConfig, types::ValueRef, Connection, OpenFlags, Row};
8use tokio::sync::RwLock;
9
10use crate::blob::BlobObject;
11use crate::chat::{self, add_device_msg, update_device_icon, update_saved_messages_icon};
12use crate::config::Config;
13use crate::constants::DC_CHAT_ID_TRASH;
14use crate::context::Context;
15use crate::debug_logging::set_debug_logging_xdc;
16use crate::ephemeral::start_ephemeral_timers;
17use crate::imex::BLOBS_BACKUP_NAME;
18use crate::location::delete_orphaned_poi_locations;
19use crate::log::LogExt;
20use crate::message::{Message, MsgId};
21use crate::net::dns::prune_dns_cache;
22use crate::net::http::http_cache_cleanup;
23use crate::net::prune_connection_history;
24use crate::param::{Param, Params};
25use crate::peerstate::Peerstate;
26use crate::stock_str;
27use crate::tools::{delete_file, time, SystemTime};
28
29/// Extension to [`rusqlite::ToSql`] trait
30/// which also includes [`Send`] and [`Sync`].
31pub trait ToSql: rusqlite::ToSql + Send + Sync {}
32
33impl<T: rusqlite::ToSql + Send + Sync> ToSql for T {}
34
/// Constructs an array of trait object references `&dyn ToSql`,
/// which can be passed wherever a slice of parameters is expected.
///
/// One of the uses is passing more than 16 parameters
/// to a query, because [`rusqlite::Params`] is only implemented
/// for tuples of up to 16 elements.
///
/// At least one parameter is required. A trailing comma after the last
/// parameter is accepted, so long parameter lists can be formatted
/// one per line.
#[macro_export]
macro_rules! params_slice {
    ($($param:expr),+ $(,)?) => {
        [$(&$param as &dyn $crate::sql::ToSql),+]
    };
}
46
47mod migrations;
48mod pool;
49
50use pool::Pool;
51
/// A wrapper around the underlying Sqlite3 object.
///
/// Bundles the database path, the connection pool, the encryption state
/// and an in-memory cache of the `config` table. Every mutable part sits
/// behind its own [`RwLock`].
#[derive(Debug)]
pub struct Sql {
    /// Database file path
    pub(crate) dbfile: PathBuf,

    /// SQL connection pool.
    ///
    /// `None` while the database is not open.
    pool: RwLock<Option<Pool>>,

    /// None if the database is not open, true if it is open with passphrase and false if it is
    /// open without a passphrase.
    is_encrypted: RwLock<Option<bool>>,

    /// Cache of `config` table.
    ///
    /// Maps a key name to its value; `None` is stored for keys that are
    /// known to have no value.
    pub(crate) config_cache: RwLock<HashMap<String, Option<String>>>,
}
68
69impl Sql {
70    /// Creates new SQL database.
71    pub fn new(dbfile: PathBuf) -> Sql {
72        Self {
73            dbfile,
74            pool: Default::default(),
75            is_encrypted: Default::default(),
76            config_cache: Default::default(),
77        }
78    }
79
80    /// Tests SQLCipher passphrase.
81    ///
82    /// Returns true if passphrase is correct, i.e. the database is new or can be unlocked with
83    /// this passphrase, and false if the database is already encrypted with another passphrase or
84    /// corrupted.
85    ///
86    /// Fails if database is already open.
87    pub async fn check_passphrase(&self, passphrase: String) -> Result<bool> {
88        if self.is_open().await {
89            bail!("Database is already opened.");
90        }
91
92        // Hold the lock to prevent other thread from opening the database.
93        let _lock = self.pool.write().await;
94
95        // Test that the key is correct using a single connection.
96        let connection = Connection::open(&self.dbfile)?;
97        if !passphrase.is_empty() {
98            connection
99                .pragma_update(None, "key", &passphrase)
100                .context("Failed to set PRAGMA key")?;
101        }
102        let key_is_correct = connection
103            .query_row("SELECT count(*) FROM sqlite_master", [], |_row| Ok(()))
104            .is_ok();
105
106        Ok(key_is_correct)
107    }
108
109    /// Checks if there is currently a connection to the underlying Sqlite database.
110    pub async fn is_open(&self) -> bool {
111        self.pool.read().await.is_some()
112    }
113
114    /// Returns true if the database is encrypted.
115    ///
116    /// If database is not open, returns `None`.
117    pub(crate) async fn is_encrypted(&self) -> Option<bool> {
118        *self.is_encrypted.read().await
119    }
120
121    /// Closes all underlying Sqlite connections.
122    pub(crate) async fn close(&self) {
123        let _ = self.pool.write().await.take();
124        // drop closes the connection
125    }
126
127    /// Imports the database from a separate file with the given passphrase.
128    pub(crate) async fn import(&self, path: &Path, passphrase: String) -> Result<()> {
129        let path_str = path
130            .to_str()
131            .with_context(|| format!("path {path:?} is not valid unicode"))?
132            .to_string();
133
134        // Keep `config_cache` locked all the time the db is imported so that nobody can use invalid
135        // values from there. And clear it immediately so as not to forget in case of errors.
136        let mut config_cache = self.config_cache.write().await;
137        config_cache.clear();
138
139        let query_only = false;
140        self.call(query_only, move |conn| {
141            // Check that backup passphrase is correct before resetting our database.
142            conn.execute("ATTACH DATABASE ? AS backup KEY ?", (path_str, passphrase))
143                .context("failed to attach backup database")?;
144            let res = conn
145                .query_row("SELECT count(*) FROM sqlite_master", [], |_row| Ok(()))
146                .context("backup passphrase is not correct");
147
148            // Reset the database without reopening it. We don't want to reopen the database because we
149            // don't have main database passphrase at this point.
150            // See <https://sqlite.org/c3ref/c_dbconfig_enable_fkey.html> for documentation.
151            // Without resetting import may fail due to existing tables.
152            res.and_then(|_| {
153                conn.set_db_config(DbConfig::SQLITE_DBCONFIG_RESET_DATABASE, true)
154                    .context("failed to set SQLITE_DBCONFIG_RESET_DATABASE")
155            })
156            .and_then(|_| {
157                conn.execute("VACUUM", [])
158                    .context("failed to vacuum the database")
159            })
160            .and(
161                conn.set_db_config(DbConfig::SQLITE_DBCONFIG_RESET_DATABASE, false)
162                    .context("failed to unset SQLITE_DBCONFIG_RESET_DATABASE"),
163            )
164            .and_then(|_| {
165                conn.query_row("SELECT sqlcipher_export('main', 'backup')", [], |_row| {
166                    Ok(())
167                })
168                .context("failed to import from attached backup database")
169            })
170            .and(
171                conn.execute("DETACH DATABASE backup", [])
172                    .context("failed to detach backup database"),
173            )?;
174            Ok(())
175        })
176        .await
177    }
178
179    /// Creates a new connection pool.
180    fn new_pool(dbfile: &Path, passphrase: String) -> Result<Pool> {
181        let mut connections = Vec::new();
182        for _ in 0..3 {
183            let connection = new_connection(dbfile, &passphrase)?;
184            connections.push(connection);
185        }
186
187        let pool = Pool::new(connections);
188        Ok(pool)
189    }
190
191    async fn try_open(&self, context: &Context, dbfile: &Path, passphrase: String) -> Result<()> {
192        *self.pool.write().await = Some(Self::new_pool(dbfile, passphrase.to_string())?);
193
194        self.run_migrations(context).await?;
195
196        Ok(())
197    }
198
199    /// Updates SQL schema to the latest version.
200    pub async fn run_migrations(&self, context: &Context) -> Result<()> {
201        // (1) update low-level database structure.
202        // this should be done before updates that use high-level objects that
203        // rely themselves on the low-level structure.
204
205        let (recalc_fingerprints, update_icons, disable_server_delete, recode_avatar) =
206            migrations::run(context, self)
207                .await
208                .context("failed to run migrations")?;
209
210        // (2) updates that require high-level objects
211        // the structure is complete now and all objects are usable
212
213        if recalc_fingerprints {
214            info!(context, "[migration] recalc fingerprints");
215            let addrs = self
216                .query_map(
217                    "SELECT addr FROM acpeerstates;",
218                    (),
219                    |row| row.get::<_, String>(0),
220                    |addrs| {
221                        addrs
222                            .collect::<std::result::Result<Vec<_>, _>>()
223                            .map_err(Into::into)
224                    },
225                )
226                .await?;
227            for addr in &addrs {
228                if let Some(ref mut peerstate) = Peerstate::from_addr(context, addr).await? {
229                    peerstate.recalc_fingerprint();
230                    peerstate.save_to_db(self).await?;
231                }
232            }
233        }
234
235        if update_icons {
236            update_saved_messages_icon(context).await?;
237            update_device_icon(context).await?;
238        }
239
240        if disable_server_delete {
241            // We now always watch all folders and delete messages there if delete_server is enabled.
242            // So, for people who have delete_server enabled, disable it and add a hint to the devicechat:
243            if context.get_config_delete_server_after().await?.is_some() {
244                let mut msg = Message::new_text(stock_str::delete_server_turned_off(context).await);
245                add_device_msg(context, None, Some(&mut msg)).await?;
246                context
247                    .set_config_internal(Config::DeleteServerAfter, Some("0"))
248                    .await?;
249            }
250        }
251
252        if recode_avatar {
253            if let Some(avatar) = context.get_config(Config::Selfavatar).await? {
254                let mut blob = BlobObject::from_path(context, Path::new(&avatar))?;
255                match blob.recode_to_avatar_size(context).await {
256                    Ok(()) => {
257                        if let Some(path) = blob.to_abs_path().to_str() {
258                            context
259                                .set_config_internal(Config::Selfavatar, Some(path))
260                                .await?;
261                        } else {
262                            warn!(context, "Setting selfavatar failed: non-UTF-8 filename");
263                        }
264                    }
265                    Err(e) => {
266                        warn!(context, "Migrations can't recode avatar, removing. {:#}", e);
267                        context
268                            .set_config_internal(Config::Selfavatar, None)
269                            .await?
270                    }
271                }
272            }
273        }
274
275        Ok(())
276    }
277
278    /// Opens the provided database and runs any necessary migrations.
279    /// If a database is already open, this will return an error.
280    pub async fn open(&self, context: &Context, passphrase: String) -> Result<()> {
281        if self.is_open().await {
282            error!(
283                context,
284                "Cannot open, database \"{:?}\" already opened.", self.dbfile,
285            );
286            bail!("SQL database is already opened.");
287        }
288
289        let passphrase_nonempty = !passphrase.is_empty();
290        if let Err(err) = self.try_open(context, &self.dbfile, passphrase).await {
291            self.close().await;
292            return Err(err);
293        }
294        info!(context, "Opened database {:?}.", self.dbfile);
295        *self.is_encrypted.write().await = Some(passphrase_nonempty);
296
297        // setup debug logging if there is an entry containing its id
298        if let Some(xdc_id) = self
299            .get_raw_config_u32(Config::DebugLogging.as_ref())
300            .await?
301        {
302            set_debug_logging_xdc(context, Some(MsgId::new(xdc_id))).await?;
303        }
304        chat::resume_securejoin_wait(context)
305            .await
306            .log_err(context)
307            .ok();
308        Ok(())
309    }
310
311    /// Changes the passphrase of encrypted database.
312    ///
313    /// The database must already be encrypted and the passphrase cannot be empty.
314    /// It is impossible to turn encrypted database into unencrypted
315    /// and vice versa this way, use import/export for this.
316    pub async fn change_passphrase(&self, passphrase: String) -> Result<()> {
317        let mut lock = self.pool.write().await;
318
319        let pool = lock.take().context("SQL connection pool is not open")?;
320        let query_only = false;
321        let conn = pool.get(query_only).await?;
322        if !passphrase.is_empty() {
323            conn.pragma_update(None, "rekey", passphrase.clone())
324                .context("Failed to set PRAGMA rekey")?;
325        }
326        drop(pool);
327
328        *lock = Some(Self::new_pool(&self.dbfile, passphrase.to_string())?);
329
330        Ok(())
331    }
332
333    /// Allocates a connection and calls `function` with the connection.
334    ///
335    /// If `query_only` is true, allocates read-only connection,
336    /// otherwise allocates write connection.
337    ///
338    /// Returns the result of the function.
339    async fn call<'a, F, R>(&'a self, query_only: bool, function: F) -> Result<R>
340    where
341        F: 'a + FnOnce(&mut Connection) -> Result<R> + Send,
342        R: Send + 'static,
343    {
344        let lock = self.pool.read().await;
345        let pool = lock.as_ref().context("no SQL connection")?;
346        let mut conn = pool.get(query_only).await?;
347        let res = tokio::task::block_in_place(move || function(&mut conn))?;
348        Ok(res)
349    }
350
351    /// Allocates a connection and calls given function, assuming it does write queries, with the
352    /// connection.
353    ///
354    /// Returns the result of the function.
355    pub async fn call_write<'a, F, R>(&'a self, function: F) -> Result<R>
356    where
357        F: 'a + FnOnce(&mut Connection) -> Result<R> + Send,
358        R: Send + 'static,
359    {
360        let query_only = false;
361        self.call(query_only, function).await
362    }
363
364    /// Execute `query` assuming it is a write query, returning the number of affected rows.
365    pub async fn execute(
366        &self,
367        query: &str,
368        params: impl rusqlite::Params + Send,
369    ) -> Result<usize> {
370        self.call_write(move |conn| {
371            let res = conn.execute(query, params)?;
372            Ok(res)
373        })
374        .await
375    }
376
377    /// Executes the given query, returning the last inserted row ID.
378    pub async fn insert(&self, query: &str, params: impl rusqlite::Params + Send) -> Result<i64> {
379        self.call_write(move |conn| {
380            conn.execute(query, params)?;
381            Ok(conn.last_insert_rowid())
382        })
383        .await
384    }
385
386    /// Prepares and executes the statement and maps a function over the resulting rows.
387    /// Then executes the second function over the returned iterator and returns the
388    /// result of that function.
389    pub async fn query_map<T, F, G, H>(
390        &self,
391        sql: &str,
392        params: impl rusqlite::Params + Send,
393        f: F,
394        mut g: G,
395    ) -> Result<H>
396    where
397        F: Send + FnMut(&rusqlite::Row) -> rusqlite::Result<T>,
398        G: Send + FnMut(rusqlite::MappedRows<F>) -> Result<H>,
399        H: Send + 'static,
400    {
401        let query_only = true;
402        self.call(query_only, move |conn| {
403            let mut stmt = conn.prepare(sql)?;
404            let res = stmt.query_map(params, f)?;
405            g(res)
406        })
407        .await
408    }
409
410    /// Used for executing `SELECT COUNT` statements only. Returns the resulting count.
411    pub async fn count(&self, query: &str, params: impl rusqlite::Params + Send) -> Result<usize> {
412        let count: isize = self.query_row(query, params, |row| row.get(0)).await?;
413        Ok(usize::try_from(count)?)
414    }
415
416    /// Used for executing `SELECT COUNT` statements only. Returns `true`, if the count is at least
417    /// one, `false` otherwise.
418    pub async fn exists(&self, sql: &str, params: impl rusqlite::Params + Send) -> Result<bool> {
419        let count = self.count(sql, params).await?;
420        Ok(count > 0)
421    }
422
423    /// Execute a query which is expected to return one row.
424    pub async fn query_row<T, F>(
425        &self,
426        query: &str,
427        params: impl rusqlite::Params + Send,
428        f: F,
429    ) -> Result<T>
430    where
431        F: FnOnce(&rusqlite::Row) -> rusqlite::Result<T> + Send,
432        T: Send + 'static,
433    {
434        let query_only = true;
435        self.call(query_only, move |conn| {
436            let res = conn.query_row(query, params, f)?;
437            Ok(res)
438        })
439        .await
440    }
441
442    /// Execute the function inside a transaction assuming that it does writes.
443    ///
444    /// If the function returns an error, the transaction will be rolled back. If it does not return an
445    /// error, the transaction will be committed.
446    pub async fn transaction<G, H>(&self, callback: G) -> Result<H>
447    where
448        H: Send + 'static,
449        G: Send + FnOnce(&mut rusqlite::Transaction<'_>) -> Result<H>,
450    {
451        let query_only = false;
452        self.transaction_ex(query_only, callback).await
453    }
454
455    /// Execute the function inside a transaction.
456    ///
457    /// * `query_only` - Whether the function only executes read statements (queries) and can be run
458    ///   in parallel with other transactions. NB: Creating and modifying temporary tables are also
459    ///   allowed with `query_only`, temporary tables aren't visible in other connections, but you
460    ///   need to pass `PRAGMA query_only=0;` to SQLite before that:
461    ///   ```text
462    ///   pragma_update(None, "query_only", "0")
463    ///   ```
464    ///   Also temporary tables need to be dropped because the connection is returned to the pool
465    ///   then.
466    ///
467    /// If the function returns an error, the transaction will be rolled back. If it does not return
468    /// an error, the transaction will be committed.
469    pub async fn transaction_ex<G, H>(&self, query_only: bool, callback: G) -> Result<H>
470    where
471        H: Send + 'static,
472        G: Send + FnOnce(&mut rusqlite::Transaction<'_>) -> Result<H>,
473    {
474        self.call(query_only, move |conn| {
475            let mut transaction = conn.transaction()?;
476            let ret = callback(&mut transaction);
477
478            match ret {
479                Ok(ret) => {
480                    transaction.commit()?;
481                    Ok(ret)
482                }
483                Err(err) => {
484                    transaction.rollback()?;
485                    Err(err)
486                }
487            }
488        })
489        .await
490    }
491
492    /// Query the database if the requested table already exists.
493    pub async fn table_exists(&self, name: &str) -> Result<bool> {
494        let query_only = true;
495        self.call(query_only, move |conn| {
496            let mut exists = false;
497            conn.pragma(None, "table_info", name.to_string(), |_row| {
498                // will only be executed if the info was found
499                exists = true;
500                Ok(())
501            })?;
502
503            Ok(exists)
504        })
505        .await
506    }
507
508    /// Check if a column exists in a given table.
509    pub async fn col_exists(&self, table_name: &str, col_name: &str) -> Result<bool> {
510        let query_only = true;
511        self.call(query_only, move |conn| {
512            let mut exists = false;
513            // `PRAGMA table_info` returns one row per column,
514            // each row containing 0=cid, 1=name, 2=type, 3=notnull, 4=dflt_value
515            conn.pragma(None, "table_info", table_name.to_string(), |row| {
516                let curr_name: String = row.get(1)?;
517                if col_name == curr_name {
518                    exists = true;
519                }
520                Ok(())
521            })?;
522
523            Ok(exists)
524        })
525        .await
526    }
527
528    /// Execute a query which is expected to return zero or one row.
529    pub async fn query_row_optional<T, F>(
530        &self,
531        sql: &str,
532        params: impl rusqlite::Params + Send,
533        f: F,
534    ) -> Result<Option<T>>
535    where
536        F: Send + FnOnce(&rusqlite::Row) -> rusqlite::Result<T>,
537        T: Send + 'static,
538    {
539        let query_only = true;
540        self.call(query_only, move |conn| {
541            match conn.query_row(sql.as_ref(), params, f) {
542                Ok(res) => Ok(Some(res)),
543                Err(rusqlite::Error::QueryReturnedNoRows) => Ok(None),
544                Err(err) => Err(err.into()),
545            }
546        })
547        .await
548    }
549
550    /// Executes a query which is expected to return one row and one
551    /// column. If the query does not return any rows, returns `Ok(None)`.
552    pub async fn query_get_value<T>(
553        &self,
554        query: &str,
555        params: impl rusqlite::Params + Send,
556    ) -> Result<Option<T>>
557    where
558        T: rusqlite::types::FromSql + Send + 'static,
559    {
560        self.query_row_optional(query, params, |row| row.get::<_, T>(0))
561            .await
562    }
563
564    /// Set private configuration options.
565    ///
566    /// Setting `None` deletes the value.  On failure an error message
567    /// will already have been logged.
568    pub async fn set_raw_config(&self, key: &str, value: Option<&str>) -> Result<()> {
569        let mut lock = self.config_cache.write().await;
570        if let Some(value) = value {
571            self.execute(
572                "INSERT OR REPLACE INTO config (keyname, value) VALUES (?, ?)",
573                (key, value),
574            )
575            .await?;
576        } else {
577            self.execute("DELETE FROM config WHERE keyname=?", (key,))
578                .await?;
579        }
580        lock.insert(key.to_string(), value.map(|s| s.to_string()));
581        drop(lock);
582
583        Ok(())
584    }
585
586    /// Get configuration options from the database.
587    pub async fn get_raw_config(&self, key: &str) -> Result<Option<String>> {
588        let lock = self.config_cache.read().await;
589        let cached = lock.get(key).cloned();
590        drop(lock);
591
592        if let Some(c) = cached {
593            return Ok(c);
594        }
595
596        let mut lock = self.config_cache.write().await;
597        let value = self
598            .query_get_value("SELECT value FROM config WHERE keyname=?", (key,))
599            .await
600            .context(format!("failed to fetch raw config: {key}"))?;
601        lock.insert(key.to_string(), value.clone());
602        drop(lock);
603
604        Ok(value)
605    }
606
607    /// Sets configuration for the given key to 32-bit signed integer value.
608    pub async fn set_raw_config_int(&self, key: &str, value: i32) -> Result<()> {
609        self.set_raw_config(key, Some(&format!("{value}"))).await
610    }
611
612    /// Returns 32-bit signed integer configuration value for the given key.
613    pub async fn get_raw_config_int(&self, key: &str) -> Result<Option<i32>> {
614        self.get_raw_config(key)
615            .await
616            .map(|s| s.and_then(|s| s.parse().ok()))
617    }
618
619    /// Returns 32-bit unsigned integer configuration value for the given key.
620    pub async fn get_raw_config_u32(&self, key: &str) -> Result<Option<u32>> {
621        self.get_raw_config(key)
622            .await
623            .map(|s| s.and_then(|s| s.parse().ok()))
624    }
625
626    /// Returns boolean configuration value for the given key.
627    pub async fn get_raw_config_bool(&self, key: &str) -> Result<bool> {
628        // Not the most obvious way to encode bool as string, but it is matter
629        // of backward compatibility.
630        let res = self.get_raw_config_int(key).await?;
631        Ok(res.unwrap_or_default() > 0)
632    }
633
634    /// Sets configuration for the given key to boolean value.
635    pub async fn set_raw_config_bool(&self, key: &str, value: bool) -> Result<()> {
636        let value = if value { Some("1") } else { None };
637        self.set_raw_config(key, value).await
638    }
639
640    /// Sets configuration for the given key to 64-bit signed integer value.
641    pub async fn set_raw_config_int64(&self, key: &str, value: i64) -> Result<()> {
642        self.set_raw_config(key, Some(&format!("{value}"))).await
643    }
644
645    /// Returns 64-bit signed integer configuration value for the given key.
646    pub async fn get_raw_config_int64(&self, key: &str) -> Result<Option<i64>> {
647        self.get_raw_config(key)
648            .await
649            .map(|s| s.and_then(|r| r.parse().ok()))
650    }
651
    /// Returns configuration cache.
    ///
    /// Gives direct access to the lock around the cached `config` table.
    /// Only compiled when the `internals` feature is enabled.
    #[cfg(feature = "internals")]
    pub fn config_cache(&self) -> &RwLock<HashMap<String, Option<String>>> {
        &self.config_cache
    }
657}
658
659/// Creates a new SQLite connection.
660///
661/// `path` is the database path.
662///
663/// `passphrase` is the SQLCipher database passphrase.
664/// Empty string if database is not encrypted.
665fn new_connection(path: &Path, passphrase: &str) -> Result<Connection> {
666    let flags = OpenFlags::SQLITE_OPEN_NO_MUTEX
667        | OpenFlags::SQLITE_OPEN_READ_WRITE
668        | OpenFlags::SQLITE_OPEN_CREATE;
669    let conn = Connection::open_with_flags(path, flags)?;
670    conn.execute_batch(
671        "PRAGMA cipher_memory_security = OFF; -- Too slow on Android
672         PRAGMA secure_delete=on;
673         PRAGMA busy_timeout = 0; -- fail immediately
674         PRAGMA soft_heap_limit = 8388608; -- 8 MiB limit, same as set in Android SQLiteDatabase.
675         PRAGMA foreign_keys=on;
676         ",
677    )?;
678
679    // Avoid SQLITE_IOERR_GETTEMPPATH errors on Android and maybe other systems.
680    // Downside is more RAM consumption esp. on VACUUM.
681    // Therefore, on systems known to have working default (using files), stay with that.
682    if cfg!(not(target_os = "ios")) {
683        conn.pragma_update(None, "temp_store", "memory")?;
684    }
685
686    if !passphrase.is_empty() {
687        conn.pragma_update(None, "key", passphrase)?;
688    }
689    // Try to enable auto_vacuum. This will only be
690    // applied if the database is new or after successful
691    // VACUUM, which usually happens before backup export.
692    // When auto_vacuum is INCREMENTAL, it is possible to
693    // use PRAGMA incremental_vacuum to return unused
694    // database pages to the filesystem.
695    conn.pragma_update(None, "auto_vacuum", "INCREMENTAL".to_string())?;
696
697    conn.pragma_update(None, "journal_mode", "WAL".to_string())?;
698    // Default synchronous=FULL is much slower. NORMAL is sufficient for WAL mode.
699    conn.pragma_update(None, "synchronous", "NORMAL".to_string())?;
700
701    Ok(conn)
702}
703
704// Tries to clear the freelist to free some space on the disk.
705//
706// This only works if auto_vacuum is enabled.
707async fn incremental_vacuum(context: &Context) -> Result<()> {
708    context
709        .sql
710        .call_write(move |conn| {
711            let mut stmt = conn
712                .prepare("PRAGMA incremental_vacuum")
713                .context("Failed to prepare incremental_vacuum statement")?;
714
715            // It is important to step the statement until it returns no more rows.
716            // Otherwise it will not free as many pages as it can:
717            // <https://stackoverflow.com/questions/53746807/sqlite-incremental-vacuum-removing-only-one-free-page>.
718            let mut rows = stmt
719                .query(())
720                .context("Failed to run incremental_vacuum statement")?;
721            let mut row_count = 0;
722            while let Some(_row) = rows
723                .next()
724                .context("Failed to step incremental_vacuum statement")?
725            {
726                row_count += 1;
727            }
728            info!(context, "Incremental vacuum freed {row_count} pages.");
729            Ok(())
730        })
731        .await
732}
733
734/// Cleanup the account to restore some storage and optimize the database.
735pub async fn housekeeping(context: &Context) -> Result<()> {
736    // Setting `Config::LastHousekeeping` at the beginning avoids endless loops when things do not
737    // work out for whatever reason or are interrupted by the OS.
738    if let Err(e) = context
739        .set_config_internal(Config::LastHousekeeping, Some(&time().to_string()))
740        .await
741    {
742        warn!(context, "Can't set config: {e:#}.");
743    }
744
745    http_cache_cleanup(context)
746        .await
747        .context("Failed to cleanup HTTP cache")
748        .log_err(context)
749        .ok();
750
751    if let Err(err) = remove_unused_files(context).await {
752        warn!(
753            context,
754            "Housekeeping: cannot remove unused files: {:#}.", err
755        );
756    }
757
758    if let Err(err) = start_ephemeral_timers(context).await {
759        warn!(
760            context,
761            "Housekeeping: cannot start ephemeral timers: {:#}.", err
762        );
763    }
764
765    if let Err(err) = prune_tombstones(&context.sql).await {
766        warn!(
767            context,
768            "Housekeeping: Cannot prune message tombstones: {:#}.", err
769        );
770    }
771
772    if let Err(err) = incremental_vacuum(context).await {
773        warn!(context, "Failed to run incremental vacuum: {err:#}.");
774    }
775
776    context
777        .sql
778        .execute(
779            "DELETE FROM msgs_mdns WHERE msg_id NOT IN \
780            (SELECT id FROM msgs WHERE chat_id!=?)",
781            (DC_CHAT_ID_TRASH,),
782        )
783        .await
784        .context("failed to remove old MDNs")
785        .log_err(context)
786        .ok();
787
788    context
789        .sql
790        .execute(
791            "DELETE FROM msgs_status_updates WHERE msg_id NOT IN \
792            (SELECT id FROM msgs WHERE chat_id!=?)",
793            (DC_CHAT_ID_TRASH,),
794        )
795        .await
796        .context("failed to remove old webxdc status updates")
797        .log_err(context)
798        .ok();
799
800    prune_connection_history(context)
801        .await
802        .context("Failed to prune connection history")
803        .log_err(context)
804        .ok();
805    prune_dns_cache(context)
806        .await
807        .context("Failed to prune DNS cache")
808        .log_err(context)
809        .ok();
810
811    // Delete POI locations
812    // which don't have corresponding message.
813    delete_orphaned_poi_locations(context)
814        .await
815        .context("Failed to delete orphaned POI locations")
816        .log_err(context)
817        .ok();
818
819    info!(context, "Housekeeping done.");
820    Ok(())
821}
822
823/// Get the value of a column `idx` of the `row` as `Vec<u8>`.
824pub fn row_get_vec(row: &Row, idx: usize) -> rusqlite::Result<Vec<u8>> {
825    row.get(idx).or_else(|err| match row.get_ref(idx)? {
826        ValueRef::Null => Ok(Vec::new()),
827        ValueRef::Text(text) => Ok(text.to_vec()),
828        ValueRef::Blob(blob) => Ok(blob.to_vec()),
829        ValueRef::Integer(_) | ValueRef::Real(_) => Err(err),
830    })
831}
832
833/// Enumerates used files in the blobdir and removes unused ones.
834pub async fn remove_unused_files(context: &Context) -> Result<()> {
835    let mut files_in_use = HashSet::new();
836    let mut unreferenced_count = 0;
837
838    info!(context, "Start housekeeping...");
839    maybe_add_from_param(
840        &context.sql,
841        &mut files_in_use,
842        "SELECT param FROM msgs  WHERE chat_id!=3   AND type!=10;",
843        Param::File,
844    )
845    .await?;
846    maybe_add_from_param(
847        &context.sql,
848        &mut files_in_use,
849        "SELECT param FROM chats;",
850        Param::ProfileImage,
851    )
852    .await?;
853    maybe_add_from_param(
854        &context.sql,
855        &mut files_in_use,
856        "SELECT param FROM contacts;",
857        Param::ProfileImage,
858    )
859    .await?;
860
861    context
862        .sql
863        .query_map(
864            "SELECT value FROM config;",
865            (),
866            |row| row.get::<_, String>(0),
867            |rows| {
868                for row in rows {
869                    maybe_add_file(&mut files_in_use, &row?);
870                }
871                Ok(())
872            },
873        )
874        .await
875        .context("housekeeping: failed to SELECT value FROM config")?;
876
877    context
878        .sql
879        .query_map(
880            "SELECT blobname FROM http_cache",
881            (),
882            |row| row.get::<_, String>(0),
883            |rows| {
884                for row in rows {
885                    maybe_add_file(&mut files_in_use, &row?);
886                }
887                Ok(())
888            },
889        )
890        .await
891        .context("Failed to SELECT blobname FROM http_cache")?;
892
893    info!(context, "{} files in use.", files_in_use.len());
894    /* go through directories and delete unused files */
895    let blobdir = context.get_blobdir();
896    for p in [&blobdir.join(BLOBS_BACKUP_NAME), blobdir] {
897        match tokio::fs::read_dir(p).await {
898            Ok(mut dir_handle) => {
899                /* avoid deletion of files that are just created to build a message object */
900                let diff = std::time::Duration::from_secs(60 * 60);
901                let keep_files_newer_than = SystemTime::now()
902                    .checked_sub(diff)
903                    .unwrap_or(SystemTime::UNIX_EPOCH);
904
905                while let Ok(Some(entry)) = dir_handle.next_entry().await {
906                    let name_f = entry.file_name();
907                    let name_s = name_f.to_string_lossy();
908
909                    if p == blobdir
910                        && (is_file_in_use(&files_in_use, None, &name_s)
911                            || is_file_in_use(&files_in_use, Some(".waveform"), &name_s)
912                            || is_file_in_use(&files_in_use, Some("-preview.jpg"), &name_s))
913                    {
914                        continue;
915                    }
916
917                    let stats = match tokio::fs::metadata(entry.path()).await {
918                        Err(err) => {
919                            warn!(
920                                context,
921                                "Cannot get metadata for {}: {:#}.",
922                                entry.path().display(),
923                                err
924                            );
925                            continue;
926                        }
927                        Ok(stats) => stats,
928                    };
929
930                    if stats.is_dir() {
931                        if let Err(e) = tokio::fs::remove_dir(entry.path()).await {
932                            // The dir could be created not by a user, but by a desktop
933                            // environment f.e. So, no warning.
934                            info!(
935                                context,
936                                "Housekeeping: Cannot rmdir {}: {:#}.",
937                                entry.path().display(),
938                                e
939                            );
940                        }
941                        continue;
942                    }
943
944                    unreferenced_count += 1;
945                    let recently_created = stats.created().is_ok_and(|t| t > keep_files_newer_than);
946                    let recently_modified =
947                        stats.modified().is_ok_and(|t| t > keep_files_newer_than);
948                    let recently_accessed =
949                        stats.accessed().is_ok_and(|t| t > keep_files_newer_than);
950
951                    if p == blobdir && (recently_created || recently_modified || recently_accessed)
952                    {
953                        info!(
954                            context,
955                            "Housekeeping: Keeping new unreferenced file #{}: {:?}.",
956                            unreferenced_count,
957                            entry.file_name(),
958                        );
959                        continue;
960                    }
961
962                    info!(
963                        context,
964                        "Housekeeping: Deleting unreferenced file #{}: {:?}.",
965                        unreferenced_count,
966                        entry.file_name()
967                    );
968                    let path = entry.path();
969                    if let Err(err) = delete_file(context, &path).await {
970                        error!(
971                            context,
972                            "Failed to delete unused file {}: {:#}.",
973                            path.display(),
974                            err
975                        );
976                    }
977                }
978            }
979            Err(err) => {
980                if !p.ends_with(BLOBS_BACKUP_NAME) {
981                    warn!(
982                        context,
983                        "Housekeeping: Cannot read dir {}: {:#}.",
984                        p.display(),
985                        err
986                    );
987                }
988            }
989        }
990    }
991
992    Ok(())
993}
994
/// Checks whether `name` refers to a file from `files_in_use`.
///
/// Without `namespc_opt`, `name` itself is looked up. With `namespc_opt`,
/// `name` must end with that suffix and the part before the suffix is looked
/// up; a name lacking the suffix is never considered in use.
fn is_file_in_use(files_in_use: &HashSet<String>, namespc_opt: Option<&str>, name: &str) -> bool {
    let lookup_key = match namespc_opt {
        None => Some(name),
        Some(suffix) => name.strip_suffix(suffix),
    };
    lookup_key.is_some_and(|key| files_in_use.contains(key))
}
1006
/// Records `file` in `files_in_use` if it starts with `$BLOBDIR/`.
///
/// The name is stored without that prefix; any other value is ignored.
fn maybe_add_file(files_in_use: &mut HashSet<String>, file: &str) {
    let Some(blob_name) = file.strip_prefix("$BLOBDIR/") else {
        return;
    };
    files_in_use.insert(blob_name.to_owned());
}
1012
1013async fn maybe_add_from_param(
1014    sql: &Sql,
1015    files_in_use: &mut HashSet<String>,
1016    query: &str,
1017    param_id: Param,
1018) -> Result<()> {
1019    sql.query_map(
1020        query,
1021        (),
1022        |row| row.get::<_, String>(0),
1023        |rows| {
1024            for row in rows {
1025                let param: Params = row?.parse().unwrap_or_default();
1026                if let Some(file) = param.get(param_id) {
1027                    maybe_add_file(files_in_use, file);
1028                }
1029            }
1030            Ok(())
1031        },
1032    )
1033    .await
1034    .context(format!("housekeeping: failed to add_from_param {query}"))?;
1035
1036    Ok(())
1037}
1038
1039/// Removes from the database stale locally deleted messages that also don't
1040/// have a server UID.
1041async fn prune_tombstones(sql: &Sql) -> Result<()> {
1042    // Keep tombstones for the last two days to prevent redownloading locally deleted messages.
1043    let timestamp_max = time().saturating_sub(2 * 24 * 3600);
1044    sql.execute(
1045        "DELETE FROM msgs
1046         WHERE chat_id=?
1047         AND timestamp<=?
1048         AND NOT EXISTS (
1049         SELECT * FROM imap WHERE msgs.rfc724_mid=rfc724_mid AND target!=''
1050         )",
1051        (DC_CHAT_ID_TRASH, timestamp_max),
1052    )
1053    .await?;
1054    Ok(())
1055}
1056
1057#[cfg(test)]
1058mod sql_tests;