deltachat/
sql.rs

1//! # SQLite wrapper.
2
3use std::collections::{HashMap, HashSet};
4use std::path::{Path, PathBuf};
5
6use anyhow::{Context as _, Result, bail};
7use rusqlite::{Connection, OpenFlags, Row, config::DbConfig, types::ValueRef};
8use tokio::sync::RwLock;
9
10use crate::blob::BlobObject;
11use crate::chat::add_device_msg;
12use crate::config::Config;
13use crate::constants::DC_CHAT_ID_TRASH;
14use crate::context::Context;
15use crate::debug_logging::set_debug_logging_xdc;
16use crate::ephemeral::start_ephemeral_timers;
17use crate::imex::BLOBS_BACKUP_NAME;
18use crate::location::delete_orphaned_poi_locations;
19use crate::log::{LogExt, error, info, warn};
20use crate::message::{Message, MsgId};
21use crate::net::dns::prune_dns_cache;
22use crate::net::http::http_cache_cleanup;
23use crate::net::prune_connection_history;
24use crate::param::{Param, Params};
25use crate::stock_str;
26use crate::tools::{SystemTime, delete_file, time};
27
28/// Extension to [`rusqlite::ToSql`] trait
29/// which also includes [`Send`] and [`Sync`].
30pub trait ToSql: rusqlite::ToSql + Send + Sync {}
31
32impl<T: rusqlite::ToSql + Send + Sync> ToSql for T {}
33
34/// Constructs a slice of trait object references `&dyn ToSql`.
35///
36/// One of the uses is passing more than 16 parameters
37/// to a query, because [`rusqlite::Params`] is only implemented
38/// for tuples of up to 16 elements.
39#[macro_export]
40macro_rules! params_slice {
41    ($($param:expr),+) => {
42        [$(&$param as &dyn $crate::sql::ToSql),+]
43    };
44}
45
46mod migrations;
47mod pool;
48
49use pool::Pool;
50
51/// A wrapper around the underlying Sqlite3 object.
52#[derive(Debug)]
53pub struct Sql {
54    /// Database file path
55    pub(crate) dbfile: PathBuf,
56
57    /// SQL connection pool.
58    pool: RwLock<Option<Pool>>,
59
60    /// None if the database is not open, true if it is open with passphrase and false if it is
61    /// open without a passphrase.
62    is_encrypted: RwLock<Option<bool>>,
63
64    /// Cache of `config` table.
65    pub(crate) config_cache: RwLock<HashMap<String, Option<String>>>,
66}
67
68impl Sql {
69    /// Creates new SQL database.
70    pub fn new(dbfile: PathBuf) -> Sql {
71        Self {
72            dbfile,
73            pool: Default::default(),
74            is_encrypted: Default::default(),
75            config_cache: Default::default(),
76        }
77    }
78
79    /// Tests SQLCipher passphrase.
80    ///
81    /// Returns true if passphrase is correct, i.e. the database is new or can be unlocked with
82    /// this passphrase, and false if the database is already encrypted with another passphrase or
83    /// corrupted.
84    ///
85    /// Fails if database is already open.
86    pub async fn check_passphrase(&self, passphrase: String) -> Result<bool> {
87        if self.is_open().await {
88            bail!("Database is already opened.");
89        }
90
91        // Hold the lock to prevent other thread from opening the database.
92        let _lock = self.pool.write().await;
93
94        // Test that the key is correct using a single connection.
95        let connection = Connection::open(&self.dbfile)?;
96        if !passphrase.is_empty() {
97            connection
98                .pragma_update(None, "key", &passphrase)
99                .context("Failed to set PRAGMA key")?;
100        }
101        let key_is_correct = connection
102            .query_row("SELECT count(*) FROM sqlite_master", [], |_row| Ok(()))
103            .is_ok();
104
105        Ok(key_is_correct)
106    }
107
108    /// Checks if there is currently a connection to the underlying Sqlite database.
109    pub async fn is_open(&self) -> bool {
110        self.pool.read().await.is_some()
111    }
112
113    /// Returns true if the database is encrypted.
114    ///
115    /// If database is not open, returns `None`.
116    pub(crate) async fn is_encrypted(&self) -> Option<bool> {
117        *self.is_encrypted.read().await
118    }
119
120    /// Closes all underlying Sqlite connections.
121    pub(crate) async fn close(&self) {
122        let _ = self.pool.write().await.take();
123        // drop closes the connection
124    }
125
126    /// Imports the database from a separate file with the given passphrase.
127    pub(crate) async fn import(&self, path: &Path, passphrase: String) -> Result<()> {
128        let path_str = path
129            .to_str()
130            .with_context(|| format!("path {path:?} is not valid unicode"))?
131            .to_string();
132
133        // Keep `config_cache` locked all the time the db is imported so that nobody can use invalid
134        // values from there. And clear it immediately so as not to forget in case of errors.
135        let mut config_cache = self.config_cache.write().await;
136        config_cache.clear();
137
138        let query_only = false;
139        self.call(query_only, move |conn| {
140            // Check that backup passphrase is correct before resetting our database.
141            conn.execute("ATTACH DATABASE ? AS backup KEY ?", (path_str, passphrase))
142                .context("failed to attach backup database")?;
143            let res = conn
144                .query_row("SELECT count(*) FROM sqlite_master", [], |_row| Ok(()))
145                .context("backup passphrase is not correct");
146
147            // Reset the database without reopening it. We don't want to reopen the database because we
148            // don't have main database passphrase at this point.
149            // See <https://sqlite.org/c3ref/c_dbconfig_enable_fkey.html> for documentation.
150            // Without resetting import may fail due to existing tables.
151            res.and_then(|_| {
152                conn.set_db_config(DbConfig::SQLITE_DBCONFIG_RESET_DATABASE, true)
153                    .context("failed to set SQLITE_DBCONFIG_RESET_DATABASE")
154            })
155            .and_then(|_| {
156                conn.execute("VACUUM", [])
157                    .context("failed to vacuum the database")
158            })
159            .and(
160                conn.set_db_config(DbConfig::SQLITE_DBCONFIG_RESET_DATABASE, false)
161                    .context("failed to unset SQLITE_DBCONFIG_RESET_DATABASE"),
162            )
163            .and_then(|_| {
164                conn.query_row("SELECT sqlcipher_export('main', 'backup')", [], |_row| {
165                    Ok(())
166                })
167                .context("failed to import from attached backup database")
168            })
169            .and(
170                conn.execute("DETACH DATABASE backup", [])
171                    .context("failed to detach backup database"),
172            )?;
173            Ok(())
174        })
175        .await
176    }
177
178    /// Creates a new connection pool.
179    fn new_pool(dbfile: &Path, passphrase: String) -> Result<Pool> {
180        let mut connections = Vec::new();
181        for _ in 0..3 {
182            let connection = new_connection(dbfile, &passphrase)?;
183            connections.push(connection);
184        }
185
186        let pool = Pool::new(connections);
187        Ok(pool)
188    }
189
190    async fn try_open(&self, context: &Context, dbfile: &Path, passphrase: String) -> Result<()> {
191        *self.pool.write().await = Some(Self::new_pool(dbfile, passphrase.to_string())?);
192
193        if let Err(e) = self.run_migrations(context).await {
194            error!(context, "Running migrations failed: {e:#}");
195            // Emiting an error event probably doesn't work
196            // because we are in the process of opening the context,
197            // so there is no event emitter yet.
198            // So, try to report the error in other ways:
199            eprintln!("Running migrations failed: {e:#}");
200            context.set_migration_error(&format!("Updating Delta Chat failed. Please send this message to the Delta Chat developers, either at delta@merlinux.eu or at https://support.delta.chat.\n\n{e:#}"));
201            // We can't simply close the db for two reasons:
202            // a. backup export would fail
203            // b. The UI would think that the account is unconfigured (because `is_configured()` fails)
204            // and remove the account when the user presses "Back"
205        }
206
207        Ok(())
208    }
209
210    /// Updates SQL schema to the latest version.
211    pub async fn run_migrations(&self, context: &Context) -> Result<()> {
212        // (1) update low-level database structure.
213        // this should be done before updates that use high-level objects that
214        // rely themselves on the low-level structure.
215
216        // `update_icons` is not used anymore, since it's not necessary anymore to "update" icons:
217        let (_update_icons, disable_server_delete, recode_avatar) = migrations::run(context, self)
218            .await
219            .context("failed to run migrations")?;
220
221        // (2) updates that require high-level objects
222        // the structure is complete now and all objects are usable
223
224        if disable_server_delete {
225            // We now always watch all folders and delete messages there if delete_server is enabled.
226            // So, for people who have delete_server enabled, disable it and add a hint to the devicechat:
227            if context.get_config_delete_server_after().await?.is_some() {
228                let mut msg = Message::new_text(stock_str::delete_server_turned_off(context).await);
229                add_device_msg(context, None, Some(&mut msg)).await?;
230                context
231                    .set_config_internal(Config::DeleteServerAfter, Some("0"))
232                    .await?;
233            }
234        }
235
236        if recode_avatar {
237            if let Some(avatar) = context.get_config(Config::Selfavatar).await? {
238                let mut blob = BlobObject::from_path(context, Path::new(&avatar))?;
239                match blob.recode_to_avatar_size(context).await {
240                    Ok(()) => {
241                        if let Some(path) = blob.to_abs_path().to_str() {
242                            context
243                                .set_config_internal(Config::Selfavatar, Some(path))
244                                .await?;
245                        } else {
246                            warn!(context, "Setting selfavatar failed: non-UTF-8 filename");
247                        }
248                    }
249                    Err(e) => {
250                        warn!(context, "Migrations can't recode avatar, removing. {:#}", e);
251                        context
252                            .set_config_internal(Config::Selfavatar, None)
253                            .await?
254                    }
255                }
256            }
257        }
258
259        Ok(())
260    }
261
262    /// Opens the provided database and runs any necessary migrations.
263    /// If a database is already open, this will return an error.
264    pub async fn open(&self, context: &Context, passphrase: String) -> Result<()> {
265        if self.is_open().await {
266            error!(
267                context,
268                "Cannot open, database \"{:?}\" already opened.", self.dbfile,
269            );
270            bail!("SQL database is already opened.");
271        }
272
273        let passphrase_nonempty = !passphrase.is_empty();
274        self.try_open(context, &self.dbfile, passphrase).await?;
275        info!(context, "Opened database {:?}.", self.dbfile);
276        *self.is_encrypted.write().await = Some(passphrase_nonempty);
277
278        // setup debug logging if there is an entry containing its id
279        if let Some(xdc_id) = self
280            .get_raw_config_u32(Config::DebugLogging.as_ref())
281            .await?
282        {
283            set_debug_logging_xdc(context, Some(MsgId::new(xdc_id))).await?;
284        }
285        Ok(())
286    }
287
288    /// Changes the passphrase of encrypted database.
289    ///
290    /// The database must already be encrypted and the passphrase cannot be empty.
291    /// It is impossible to turn encrypted database into unencrypted
292    /// and vice versa this way, use import/export for this.
293    pub async fn change_passphrase(&self, passphrase: String) -> Result<()> {
294        let mut lock = self.pool.write().await;
295
296        let pool = lock.take().context("SQL connection pool is not open")?;
297        let query_only = false;
298        let conn = pool.get(query_only).await?;
299        if !passphrase.is_empty() {
300            conn.pragma_update(None, "rekey", passphrase.clone())
301                .context("Failed to set PRAGMA rekey")?;
302        }
303        drop(pool);
304
305        *lock = Some(Self::new_pool(&self.dbfile, passphrase.to_string())?);
306
307        Ok(())
308    }
309
310    /// Allocates a connection and calls `function` with the connection.
311    ///
312    /// If `query_only` is true, allocates read-only connection,
313    /// otherwise allocates write connection.
314    ///
315    /// Returns the result of the function.
316    async fn call<'a, F, R>(&'a self, query_only: bool, function: F) -> Result<R>
317    where
318        F: 'a + FnOnce(&mut Connection) -> Result<R> + Send,
319        R: Send + 'static,
320    {
321        let lock = self.pool.read().await;
322        let pool = lock.as_ref().context("no SQL connection")?;
323        let mut conn = pool.get(query_only).await?;
324        let res = tokio::task::block_in_place(move || function(&mut conn))?;
325        Ok(res)
326    }
327
328    /// Allocates a connection and calls given function, assuming it does write queries, with the
329    /// connection.
330    ///
331    /// Returns the result of the function.
332    pub async fn call_write<'a, F, R>(&'a self, function: F) -> Result<R>
333    where
334        F: 'a + FnOnce(&mut Connection) -> Result<R> + Send,
335        R: Send + 'static,
336    {
337        let query_only = false;
338        self.call(query_only, function).await
339    }
340
341    /// Execute `query` assuming it is a write query, returning the number of affected rows.
342    pub async fn execute(
343        &self,
344        query: &str,
345        params: impl rusqlite::Params + Send,
346    ) -> Result<usize> {
347        self.call_write(move |conn| {
348            let res = conn.execute(query, params)?;
349            Ok(res)
350        })
351        .await
352    }
353
354    /// Executes the given query, returning the last inserted row ID.
355    pub async fn insert(&self, query: &str, params: impl rusqlite::Params + Send) -> Result<i64> {
356        self.call_write(move |conn| {
357            conn.execute(query, params)?;
358            Ok(conn.last_insert_rowid())
359        })
360        .await
361    }
362
363    /// Prepares and executes the statement and maps a function over the resulting rows.
364    /// Then executes the second function over the returned iterator and returns the
365    /// result of that function.
366    pub async fn query_map<T, F, G, H>(
367        &self,
368        sql: &str,
369        params: impl rusqlite::Params + Send,
370        f: F,
371        mut g: G,
372    ) -> Result<H>
373    where
374        F: Send + FnMut(&rusqlite::Row) -> rusqlite::Result<T>,
375        G: Send + FnMut(rusqlite::MappedRows<F>) -> Result<H>,
376        H: Send + 'static,
377    {
378        let query_only = true;
379        self.call(query_only, move |conn| {
380            let mut stmt = conn.prepare(sql)?;
381            let res = stmt.query_map(params, f)?;
382            g(res)
383        })
384        .await
385    }
386
387    /// Used for executing `SELECT COUNT` statements only. Returns the resulting count.
388    pub async fn count(&self, query: &str, params: impl rusqlite::Params + Send) -> Result<usize> {
389        let count: isize = self.query_row(query, params, |row| row.get(0)).await?;
390        Ok(usize::try_from(count)?)
391    }
392
393    /// Used for executing `SELECT COUNT` statements only. Returns `true`, if the count is at least
394    /// one, `false` otherwise.
395    pub async fn exists(&self, sql: &str, params: impl rusqlite::Params + Send) -> Result<bool> {
396        let count = self.count(sql, params).await?;
397        Ok(count > 0)
398    }
399
400    /// Execute a query which is expected to return one row.
401    pub async fn query_row<T, F>(
402        &self,
403        query: &str,
404        params: impl rusqlite::Params + Send,
405        f: F,
406    ) -> Result<T>
407    where
408        F: FnOnce(&rusqlite::Row) -> rusqlite::Result<T> + Send,
409        T: Send + 'static,
410    {
411        let query_only = true;
412        self.call(query_only, move |conn| {
413            let res = conn.query_row(query, params, f)?;
414            Ok(res)
415        })
416        .await
417    }
418
419    /// Execute the function inside a transaction assuming that it does writes.
420    ///
421    /// If the function returns an error, the transaction will be rolled back. If it does not return an
422    /// error, the transaction will be committed.
423    pub async fn transaction<G, H>(&self, callback: G) -> Result<H>
424    where
425        H: Send + 'static,
426        G: Send + FnOnce(&mut rusqlite::Transaction<'_>) -> Result<H>,
427    {
428        let query_only = false;
429        self.transaction_ex(query_only, callback).await
430    }
431
432    /// Execute the function inside a transaction.
433    ///
434    /// * `query_only` - Whether the function only executes read statements (queries) and can be run
435    ///   in parallel with other transactions. NB: Creating and modifying temporary tables are also
436    ///   allowed with `query_only`, temporary tables aren't visible in other connections, but you
437    ///   need to pass `PRAGMA query_only=0;` to SQLite before that:
438    ///   ```text
439    ///   pragma_update(None, "query_only", "0")
440    ///   ```
441    ///   Also temporary tables need to be dropped because the connection is returned to the pool
442    ///   then.
443    ///
444    /// If the function returns an error, the transaction will be rolled back. If it does not return
445    /// an error, the transaction will be committed.
446    pub async fn transaction_ex<G, H>(&self, query_only: bool, callback: G) -> Result<H>
447    where
448        H: Send + 'static,
449        G: Send + FnOnce(&mut rusqlite::Transaction<'_>) -> Result<H>,
450    {
451        self.call(query_only, move |conn| {
452            let mut transaction = conn.transaction()?;
453            let ret = callback(&mut transaction);
454
455            match ret {
456                Ok(ret) => {
457                    transaction.commit()?;
458                    Ok(ret)
459                }
460                Err(err) => {
461                    transaction.rollback()?;
462                    Err(err)
463                }
464            }
465        })
466        .await
467    }
468
469    /// Query the database if the requested table already exists.
470    pub async fn table_exists(&self, name: &str) -> Result<bool> {
471        let query_only = true;
472        self.call(query_only, move |conn| {
473            let mut exists = false;
474            conn.pragma(None, "table_info", name.to_string(), |_row| {
475                // will only be executed if the info was found
476                exists = true;
477                Ok(())
478            })?;
479
480            Ok(exists)
481        })
482        .await
483    }
484
485    /// Check if a column exists in a given table.
486    pub async fn col_exists(&self, table_name: &str, col_name: &str) -> Result<bool> {
487        let query_only = true;
488        self.call(query_only, move |conn| {
489            let mut exists = false;
490            // `PRAGMA table_info` returns one row per column,
491            // each row containing 0=cid, 1=name, 2=type, 3=notnull, 4=dflt_value
492            conn.pragma(None, "table_info", table_name.to_string(), |row| {
493                let curr_name: String = row.get(1)?;
494                if col_name == curr_name {
495                    exists = true;
496                }
497                Ok(())
498            })?;
499
500            Ok(exists)
501        })
502        .await
503    }
504
505    /// Execute a query which is expected to return zero or one row.
506    pub async fn query_row_optional<T, F>(
507        &self,
508        sql: &str,
509        params: impl rusqlite::Params + Send,
510        f: F,
511    ) -> Result<Option<T>>
512    where
513        F: Send + FnOnce(&rusqlite::Row) -> rusqlite::Result<T>,
514        T: Send + 'static,
515    {
516        let query_only = true;
517        self.call(query_only, move |conn| {
518            match conn.query_row(sql.as_ref(), params, f) {
519                Ok(res) => Ok(Some(res)),
520                Err(rusqlite::Error::QueryReturnedNoRows) => Ok(None),
521                Err(err) => Err(err.into()),
522            }
523        })
524        .await
525    }
526
527    /// Executes a query which is expected to return one row and one
528    /// column. If the query does not return any rows, returns `Ok(None)`.
529    pub async fn query_get_value<T>(
530        &self,
531        query: &str,
532        params: impl rusqlite::Params + Send,
533    ) -> Result<Option<T>>
534    where
535        T: rusqlite::types::FromSql + Send + 'static,
536    {
537        self.query_row_optional(query, params, |row| row.get::<_, T>(0))
538            .await
539    }
540
541    /// Set private configuration options.
542    ///
543    /// Setting `None` deletes the value.  On failure an error message
544    /// will already have been logged.
545    pub async fn set_raw_config(&self, key: &str, value: Option<&str>) -> Result<()> {
546        let mut lock = self.config_cache.write().await;
547        if let Some(value) = value {
548            self.execute(
549                "INSERT OR REPLACE INTO config (keyname, value) VALUES (?, ?)",
550                (key, value),
551            )
552            .await?;
553        } else {
554            self.execute("DELETE FROM config WHERE keyname=?", (key,))
555                .await?;
556        }
557        lock.insert(key.to_string(), value.map(|s| s.to_string()));
558        drop(lock);
559
560        Ok(())
561    }
562
563    /// Get configuration options from the database.
564    pub async fn get_raw_config(&self, key: &str) -> Result<Option<String>> {
565        let lock = self.config_cache.read().await;
566        let cached = lock.get(key).cloned();
567        drop(lock);
568
569        if let Some(c) = cached {
570            return Ok(c);
571        }
572
573        let mut lock = self.config_cache.write().await;
574        let value = self
575            .query_get_value("SELECT value FROM config WHERE keyname=?", (key,))
576            .await
577            .context(format!("failed to fetch raw config: {key}"))?;
578        lock.insert(key.to_string(), value.clone());
579        drop(lock);
580
581        Ok(value)
582    }
583
584    /// Removes the `key`'s value from the cache.
585    pub(crate) async fn uncache_raw_config(&self, key: &str) {
586        let mut lock = self.config_cache.write().await;
587        lock.remove(key);
588    }
589
590    /// Sets configuration for the given key to 32-bit signed integer value.
591    pub async fn set_raw_config_int(&self, key: &str, value: i32) -> Result<()> {
592        self.set_raw_config(key, Some(&format!("{value}"))).await
593    }
594
595    /// Returns 32-bit signed integer configuration value for the given key.
596    pub async fn get_raw_config_int(&self, key: &str) -> Result<Option<i32>> {
597        self.get_raw_config(key)
598            .await
599            .map(|s| s.and_then(|s| s.parse().ok()))
600    }
601
602    /// Returns 32-bit unsigned integer configuration value for the given key.
603    pub async fn get_raw_config_u32(&self, key: &str) -> Result<Option<u32>> {
604        self.get_raw_config(key)
605            .await
606            .map(|s| s.and_then(|s| s.parse().ok()))
607    }
608
609    /// Returns boolean configuration value for the given key.
610    pub async fn get_raw_config_bool(&self, key: &str) -> Result<bool> {
611        // Not the most obvious way to encode bool as string, but it is matter
612        // of backward compatibility.
613        let res = self.get_raw_config_int(key).await?;
614        Ok(res.unwrap_or_default() > 0)
615    }
616
617    /// Sets configuration for the given key to boolean value.
618    pub async fn set_raw_config_bool(&self, key: &str, value: bool) -> Result<()> {
619        let value = if value { Some("1") } else { None };
620        self.set_raw_config(key, value).await
621    }
622
623    /// Sets configuration for the given key to 64-bit signed integer value.
624    pub async fn set_raw_config_int64(&self, key: &str, value: i64) -> Result<()> {
625        self.set_raw_config(key, Some(&format!("{value}"))).await
626    }
627
628    /// Returns 64-bit signed integer configuration value for the given key.
629    pub async fn get_raw_config_int64(&self, key: &str) -> Result<Option<i64>> {
630        self.get_raw_config(key)
631            .await
632            .map(|s| s.and_then(|r| r.parse().ok()))
633    }
634
635    /// Returns configuration cache.
636    #[cfg(feature = "internals")]
637    pub fn config_cache(&self) -> &RwLock<HashMap<String, Option<String>>> {
638        &self.config_cache
639    }
640}
641
642/// Creates a new SQLite connection.
643///
644/// `path` is the database path.
645///
646/// `passphrase` is the SQLCipher database passphrase.
647/// Empty string if database is not encrypted.
648fn new_connection(path: &Path, passphrase: &str) -> Result<Connection> {
649    let flags = OpenFlags::SQLITE_OPEN_NO_MUTEX
650        | OpenFlags::SQLITE_OPEN_READ_WRITE
651        | OpenFlags::SQLITE_OPEN_CREATE;
652    let conn = Connection::open_with_flags(path, flags)?;
653    conn.execute_batch(
654        "PRAGMA cipher_memory_security = OFF; -- Too slow on Android
655         PRAGMA secure_delete=on;
656         PRAGMA busy_timeout = 0; -- fail immediately
657         PRAGMA soft_heap_limit = 8388608; -- 8 MiB limit, same as set in Android SQLiteDatabase.
658         PRAGMA foreign_keys=on;
659         ",
660    )?;
661
662    // Avoid SQLITE_IOERR_GETTEMPPATH errors on Android and maybe other systems.
663    // Downside is more RAM consumption esp. on VACUUM.
664    // Therefore, on systems known to have working default (using files), stay with that.
665    if cfg!(not(target_os = "ios")) {
666        conn.pragma_update(None, "temp_store", "memory")?;
667    }
668
669    if !passphrase.is_empty() {
670        conn.pragma_update(None, "key", passphrase)?;
671    }
672    // Try to enable auto_vacuum. This will only be
673    // applied if the database is new or after successful
674    // VACUUM, which usually happens before backup export.
675    // When auto_vacuum is INCREMENTAL, it is possible to
676    // use PRAGMA incremental_vacuum to return unused
677    // database pages to the filesystem.
678    conn.pragma_update(None, "auto_vacuum", "INCREMENTAL".to_string())?;
679
680    conn.pragma_update(None, "journal_mode", "WAL".to_string())?;
681    // Default synchronous=FULL is much slower. NORMAL is sufficient for WAL mode.
682    conn.pragma_update(None, "synchronous", "NORMAL".to_string())?;
683
684    Ok(conn)
685}
686
687// Tries to clear the freelist to free some space on the disk.
688//
689// This only works if auto_vacuum is enabled.
690async fn incremental_vacuum(context: &Context) -> Result<()> {
691    context
692        .sql
693        .call_write(move |conn| {
694            let mut stmt = conn
695                .prepare("PRAGMA incremental_vacuum")
696                .context("Failed to prepare incremental_vacuum statement")?;
697
698            // It is important to step the statement until it returns no more rows.
699            // Otherwise it will not free as many pages as it can:
700            // <https://stackoverflow.com/questions/53746807/sqlite-incremental-vacuum-removing-only-one-free-page>.
701            let mut rows = stmt
702                .query(())
703                .context("Failed to run incremental_vacuum statement")?;
704            let mut row_count = 0;
705            while let Some(_row) = rows
706                .next()
707                .context("Failed to step incremental_vacuum statement")?
708            {
709                row_count += 1;
710            }
711            info!(context, "Incremental vacuum freed {row_count} pages.");
712            Ok(())
713        })
714        .await
715}
716
717/// Cleanup the account to restore some storage and optimize the database.
718pub async fn housekeeping(context: &Context) -> Result<()> {
719    // Setting `Config::LastHousekeeping` at the beginning avoids endless loops when things do not
720    // work out for whatever reason or are interrupted by the OS.
721    if let Err(e) = context
722        .set_config_internal(Config::LastHousekeeping, Some(&time().to_string()))
723        .await
724    {
725        warn!(context, "Can't set config: {e:#}.");
726    }
727
728    http_cache_cleanup(context)
729        .await
730        .context("Failed to cleanup HTTP cache")
731        .log_err(context)
732        .ok();
733    migrations::msgs_to_key_contacts(context)
734        .await
735        .context("migrations::msgs_to_key_contacts")
736        .log_err(context)
737        .ok();
738
739    if let Err(err) = remove_unused_files(context).await {
740        warn!(
741            context,
742            "Housekeeping: cannot remove unused files: {:#}.", err
743        );
744    }
745
746    if let Err(err) = start_ephemeral_timers(context).await {
747        warn!(
748            context,
749            "Housekeeping: cannot start ephemeral timers: {:#}.", err
750        );
751    }
752
753    if let Err(err) = prune_tombstones(&context.sql).await {
754        warn!(
755            context,
756            "Housekeeping: Cannot prune message tombstones: {:#}.", err
757        );
758    }
759
760    if let Err(err) = incremental_vacuum(context).await {
761        warn!(context, "Failed to run incremental vacuum: {err:#}.");
762    }
763
764    context
765        .sql
766        .execute(
767            "DELETE FROM msgs_mdns WHERE msg_id NOT IN \
768            (SELECT id FROM msgs WHERE chat_id!=?)",
769            (DC_CHAT_ID_TRASH,),
770        )
771        .await
772        .context("failed to remove old MDNs")
773        .log_err(context)
774        .ok();
775
776    context
777        .sql
778        .execute(
779            "DELETE FROM msgs_status_updates WHERE msg_id NOT IN \
780            (SELECT id FROM msgs WHERE chat_id!=?)",
781            (DC_CHAT_ID_TRASH,),
782        )
783        .await
784        .context("failed to remove old webxdc status updates")
785        .log_err(context)
786        .ok();
787
788    prune_connection_history(context)
789        .await
790        .context("Failed to prune connection history")
791        .log_err(context)
792        .ok();
793    prune_dns_cache(context)
794        .await
795        .context("Failed to prune DNS cache")
796        .log_err(context)
797        .ok();
798
799    // Delete POI locations
800    // which don't have corresponding message.
801    delete_orphaned_poi_locations(context)
802        .await
803        .context("Failed to delete orphaned POI locations")
804        .log_err(context)
805        .ok();
806
807    info!(context, "Housekeeping done.");
808    Ok(())
809}
810
811/// Get the value of a column `idx` of the `row` as `Vec<u8>`.
812pub fn row_get_vec(row: &Row, idx: usize) -> rusqlite::Result<Vec<u8>> {
813    row.get(idx).or_else(|err| match row.get_ref(idx)? {
814        ValueRef::Null => Ok(Vec::new()),
815        ValueRef::Text(text) => Ok(text.to_vec()),
816        ValueRef::Blob(blob) => Ok(blob.to_vec()),
817        ValueRef::Integer(_) | ValueRef::Real(_) => Err(err),
818    })
819}
820
821/// Enumerates used files in the blobdir and removes unused ones.
822pub async fn remove_unused_files(context: &Context) -> Result<()> {
823    let mut files_in_use = HashSet::new();
824    let mut unreferenced_count = 0;
825
826    info!(context, "Start housekeeping...");
827    maybe_add_from_param(
828        &context.sql,
829        &mut files_in_use,
830        "SELECT param FROM msgs  WHERE chat_id!=3   AND type!=10;",
831        Param::File,
832    )
833    .await?;
834    maybe_add_from_param(
835        &context.sql,
836        &mut files_in_use,
837        "SELECT param FROM chats;",
838        Param::ProfileImage,
839    )
840    .await?;
841    maybe_add_from_param(
842        &context.sql,
843        &mut files_in_use,
844        "SELECT param FROM contacts;",
845        Param::ProfileImage,
846    )
847    .await?;
848
849    context
850        .sql
851        .query_map(
852            "SELECT value FROM config;",
853            (),
854            |row| row.get::<_, String>(0),
855            |rows| {
856                for row in rows {
857                    maybe_add_file(&mut files_in_use, &row?);
858                }
859                Ok(())
860            },
861        )
862        .await
863        .context("housekeeping: failed to SELECT value FROM config")?;
864
865    context
866        .sql
867        .query_map(
868            "SELECT blobname FROM http_cache",
869            (),
870            |row| row.get::<_, String>(0),
871            |rows| {
872                for row in rows {
873                    maybe_add_file(&mut files_in_use, &row?);
874                }
875                Ok(())
876            },
877        )
878        .await
879        .context("Failed to SELECT blobname FROM http_cache")?;
880
881    info!(context, "{} files in use.", files_in_use.len());
882    /* go through directories and delete unused files */
883    let blobdir = context.get_blobdir();
884    for p in [&blobdir.join(BLOBS_BACKUP_NAME), blobdir] {
885        match tokio::fs::read_dir(p).await {
886            Ok(mut dir_handle) => {
887                /* avoid deletion of files that are just created to build a message object */
888                let diff = std::time::Duration::from_secs(60 * 60);
889                let keep_files_newer_than = SystemTime::now()
890                    .checked_sub(diff)
891                    .unwrap_or(SystemTime::UNIX_EPOCH);
892
893                while let Ok(Some(entry)) = dir_handle.next_entry().await {
894                    let name_f = entry.file_name();
895                    let name_s = name_f.to_string_lossy();
896
897                    if p == blobdir
898                        && (is_file_in_use(&files_in_use, None, &name_s)
899                            || is_file_in_use(&files_in_use, Some(".waveform"), &name_s)
900                            || is_file_in_use(&files_in_use, Some("-preview.jpg"), &name_s))
901                    {
902                        continue;
903                    }
904
905                    let stats = match tokio::fs::metadata(entry.path()).await {
906                        Err(err) => {
907                            warn!(
908                                context,
909                                "Cannot get metadata for {}: {:#}.",
910                                entry.path().display(),
911                                err
912                            );
913                            continue;
914                        }
915                        Ok(stats) => stats,
916                    };
917
918                    if stats.is_dir() {
919                        if let Err(e) = tokio::fs::remove_dir(entry.path()).await {
920                            // The dir could be created not by a user, but by a desktop
921                            // environment f.e. So, no warning.
922                            info!(
923                                context,
924                                "Housekeeping: Cannot rmdir {}: {:#}.",
925                                entry.path().display(),
926                                e
927                            );
928                        }
929                        continue;
930                    }
931
932                    unreferenced_count += 1;
933                    let recently_created = stats.created().is_ok_and(|t| t > keep_files_newer_than);
934                    let recently_modified =
935                        stats.modified().is_ok_and(|t| t > keep_files_newer_than);
936                    let recently_accessed =
937                        stats.accessed().is_ok_and(|t| t > keep_files_newer_than);
938
939                    if p == blobdir && (recently_created || recently_modified || recently_accessed)
940                    {
941                        info!(
942                            context,
943                            "Housekeeping: Keeping new unreferenced file #{}: {:?}.",
944                            unreferenced_count,
945                            entry.file_name(),
946                        );
947                        continue;
948                    }
949
950                    info!(
951                        context,
952                        "Housekeeping: Deleting unreferenced file #{}: {:?}.",
953                        unreferenced_count,
954                        entry.file_name()
955                    );
956                    let path = entry.path();
957                    if let Err(err) = delete_file(context, &path).await {
958                        error!(
959                            context,
960                            "Failed to delete unused file {}: {:#}.",
961                            path.display(),
962                            err
963                        );
964                    }
965                }
966            }
967            Err(err) => {
968                if !p.ends_with(BLOBS_BACKUP_NAME) {
969                    warn!(
970                        context,
971                        "Housekeeping: Cannot read dir {}: {:#}.",
972                        p.display(),
973                        err
974                    );
975                }
976            }
977        }
978    }
979
980    Ok(())
981}
982
983fn is_file_in_use(files_in_use: &HashSet<String>, namespc_opt: Option<&str>, name: &str) -> bool {
984    let name_to_check = if let Some(namespc) = namespc_opt {
985        let Some(name) = name.strip_suffix(namespc) else {
986            return false;
987        };
988        name
989    } else {
990        name
991    };
992    files_in_use.contains(name_to_check)
993}
994
995fn maybe_add_file(files_in_use: &mut HashSet<String>, file: &str) {
996    if let Some(file) = file.strip_prefix("$BLOBDIR/") {
997        files_in_use.insert(file.to_string());
998    }
999}
1000
1001async fn maybe_add_from_param(
1002    sql: &Sql,
1003    files_in_use: &mut HashSet<String>,
1004    query: &str,
1005    param_id: Param,
1006) -> Result<()> {
1007    sql.query_map(
1008        query,
1009        (),
1010        |row| row.get::<_, String>(0),
1011        |rows| {
1012            for row in rows {
1013                let param: Params = row?.parse().unwrap_or_default();
1014                if let Some(file) = param.get(param_id) {
1015                    maybe_add_file(files_in_use, file);
1016                }
1017            }
1018            Ok(())
1019        },
1020    )
1021    .await
1022    .context(format!("housekeeping: failed to add_from_param {query}"))?;
1023
1024    Ok(())
1025}
1026
1027/// Removes from the database stale locally deleted messages that also don't
1028/// have a server UID.
1029async fn prune_tombstones(sql: &Sql) -> Result<()> {
1030    // Keep tombstones for the last two days to prevent redownloading locally deleted messages.
1031    let timestamp_max = time().saturating_sub(2 * 24 * 3600);
1032    sql.execute(
1033        "DELETE FROM msgs
1034         WHERE chat_id=?
1035         AND timestamp<=?
1036         AND NOT EXISTS (
1037         SELECT * FROM imap WHERE msgs.rfc724_mid=rfc724_mid AND target!=''
1038         )",
1039        (DC_CHAT_ID_TRASH, timestamp_max),
1040    )
1041    .await?;
1042    Ok(())
1043}
1044
1045#[cfg(test)]
1046mod sql_tests;