deltachat/
sql.rs

1//! # SQLite wrapper.
2
3use std::collections::{HashMap, HashSet};
4use std::path::{Path, PathBuf};
5use std::time::Duration;
6
7use anyhow::{Context as _, Result, bail, ensure};
8use rusqlite::{Connection, OpenFlags, Row, config::DbConfig, types::ValueRef};
9use tokio::sync::RwLock;
10
11use crate::blob::BlobObject;
12use crate::config::Config;
13use crate::constants::DC_CHAT_ID_TRASH;
14use crate::context::Context;
15use crate::debug_logging::set_debug_logging_xdc;
16use crate::ephemeral::start_ephemeral_timers;
17use crate::imex::BLOBS_BACKUP_NAME;
18use crate::location::delete_orphaned_poi_locations;
19use crate::log::{LogExt, warn};
20use crate::message::MsgId;
21use crate::net::dns::prune_dns_cache;
22use crate::net::http::http_cache_cleanup;
23use crate::net::prune_connection_history;
24use crate::param::{Param, Params};
25use crate::tools::{SystemTime, Time, delete_file, time, time_elapsed};
26
27/// Extension to [`rusqlite::ToSql`] trait
28/// which also includes [`Send`] and [`Sync`].
29pub trait ToSql: rusqlite::ToSql + Send + Sync {}
30
31impl<T: rusqlite::ToSql + Send + Sync> ToSql for T {}
32
33/// Constructs a slice of trait object references `&dyn ToSql`.
34///
35/// One of the uses is passing more than 16 parameters
36/// to a query, because [`rusqlite::Params`] is only implemented
37/// for tuples of up to 16 elements.
38#[macro_export]
39macro_rules! params_slice {
40    ($($param:expr),+) => {
41        [$(&$param as &dyn $crate::sql::ToSql),+]
42    };
43}
44
45mod migrations;
46mod pool;
47
48use pool::Pool;
49
50/// A wrapper around the underlying Sqlite3 object.
51#[derive(Debug)]
52pub struct Sql {
53    /// Database file path
54    pub(crate) dbfile: PathBuf,
55
56    /// SQL connection pool.
57    pool: RwLock<Option<Pool>>,
58
59    /// None if the database is not open, true if it is open with passphrase and false if it is
60    /// open without a passphrase.
61    is_encrypted: RwLock<Option<bool>>,
62
63    /// Cache of `config` table.
64    pub(crate) config_cache: RwLock<HashMap<String, Option<String>>>,
65}
66
67impl Sql {
68    /// Creates new SQL database.
69    pub fn new(dbfile: PathBuf) -> Sql {
70        Self {
71            dbfile,
72            pool: Default::default(),
73            is_encrypted: Default::default(),
74            config_cache: Default::default(),
75        }
76    }
77
78    /// Tests SQLCipher passphrase.
79    ///
80    /// Returns true if passphrase is correct, i.e. the database is new or can be unlocked with
81    /// this passphrase, and false if the database is already encrypted with another passphrase or
82    /// corrupted.
83    ///
84    /// Fails if database is already open.
85    pub async fn check_passphrase(&self, passphrase: String) -> Result<bool> {
86        if self.is_open().await {
87            bail!("Database is already opened.");
88        }
89
90        // Hold the lock to prevent other thread from opening the database.
91        let _lock = self.pool.write().await;
92
93        // Test that the key is correct using a single connection.
94        let connection = Connection::open(&self.dbfile)?;
95        if !passphrase.is_empty() {
96            connection
97                .pragma_update(None, "key", &passphrase)
98                .context("Failed to set PRAGMA key")?;
99        }
100        let key_is_correct = connection
101            .query_row("SELECT count(*) FROM sqlite_master", [], |_row| Ok(()))
102            .is_ok();
103
104        Ok(key_is_correct)
105    }
106
107    /// Checks if there is currently a connection to the underlying Sqlite database.
108    pub async fn is_open(&self) -> bool {
109        self.pool.read().await.is_some()
110    }
111
112    /// Returns true if the database is encrypted.
113    ///
114    /// If database is not open, returns `None`.
115    pub(crate) async fn is_encrypted(&self) -> Option<bool> {
116        *self.is_encrypted.read().await
117    }
118
119    /// Closes all underlying Sqlite connections.
120    pub(crate) async fn close(&self) {
121        let _ = self.pool.write().await.take();
122        // drop closes the connection
123    }
124
125    /// Imports the database from a separate file with the given passphrase.
126    pub(crate) async fn import(&self, path: &Path, passphrase: String) -> Result<()> {
127        let path_str = path
128            .to_str()
129            .with_context(|| format!("path {path:?} is not valid unicode"))?
130            .to_string();
131
132        // Keep `config_cache` locked all the time the db is imported so that nobody can use invalid
133        // values from there. And clear it immediately so as not to forget in case of errors.
134        let mut config_cache = self.config_cache.write().await;
135        config_cache.clear();
136
137        let query_only = false;
138        self.call(query_only, move |conn| {
139            // Check that backup passphrase is correct before resetting our database.
140            conn.execute("ATTACH DATABASE ? AS backup KEY ?", (path_str, passphrase))
141                .context("failed to attach backup database")?;
142            let res = conn
143                .query_row("SELECT count(*) FROM sqlite_master", [], |_row| Ok(()))
144                .context("backup passphrase is not correct");
145
146            // Reset the database without reopening it. We don't want to reopen the database because we
147            // don't have main database passphrase at this point.
148            // See <https://sqlite.org/c3ref/c_dbconfig_enable_fkey.html> for documentation.
149            // Without resetting import may fail due to existing tables.
150            res.and_then(|_| {
151                conn.set_db_config(DbConfig::SQLITE_DBCONFIG_RESET_DATABASE, true)
152                    .context("failed to set SQLITE_DBCONFIG_RESET_DATABASE")
153            })
154            .and_then(|_| {
155                conn.execute("VACUUM", [])
156                    .context("failed to vacuum the database")
157            })
158            .and(
159                conn.set_db_config(DbConfig::SQLITE_DBCONFIG_RESET_DATABASE, false)
160                    .context("failed to unset SQLITE_DBCONFIG_RESET_DATABASE"),
161            )
162            .and_then(|_| {
163                conn.query_row("SELECT sqlcipher_export('main', 'backup')", [], |_row| {
164                    Ok(())
165                })
166                .context("failed to import from attached backup database")
167            })
168            .and(
169                conn.execute("DETACH DATABASE backup", [])
170                    .context("failed to detach backup database"),
171            )?;
172            Ok(())
173        })
174        .await
175    }
176
177    const N_DB_CONNECTIONS: usize = 3;
178
179    /// Creates a new connection pool.
180    fn new_pool(dbfile: &Path, passphrase: String) -> Result<Pool> {
181        let mut connections = Vec::with_capacity(Self::N_DB_CONNECTIONS);
182        for _ in 0..Self::N_DB_CONNECTIONS {
183            let connection = new_connection(dbfile, &passphrase)?;
184            connections.push(connection);
185        }
186
187        let pool = Pool::new(connections);
188        Ok(pool)
189    }
190
191    async fn try_open(&self, context: &Context, dbfile: &Path, passphrase: String) -> Result<()> {
192        *self.pool.write().await = Some(Self::new_pool(dbfile, passphrase.to_string())?);
193
194        if let Err(e) = self.run_migrations(context).await {
195            error!(context, "Running migrations failed: {e:#}");
196            // Emiting an error event probably doesn't work
197            // because we are in the process of opening the context,
198            // so there is no event emitter yet.
199            // So, try to report the error in other ways:
200            eprintln!("Running migrations failed: {e:#}");
201            context.set_migration_error(&format!("Updating Delta Chat failed. Please send this message to the Delta Chat developers, either at delta@merlinux.eu or at https://support.delta.chat.\n\n{e:#}"));
202            // We can't simply close the db for two reasons:
203            // a. backup export would fail
204            // b. The UI would think that the account is unconfigured (because `is_configured()` fails)
205            // and remove the account when the user presses "Back"
206        }
207
208        Ok(())
209    }
210
211    /// Updates SQL schema to the latest version.
212    pub async fn run_migrations(&self, context: &Context) -> Result<()> {
213        // (1) update low-level database structure.
214        // this should be done before updates that use high-level objects that
215        // rely themselves on the low-level structure.
216
217        let recode_avatar = migrations::run(context, self)
218            .await
219            .context("failed to run migrations")?;
220
221        // (2) updates that require high-level objects
222        // the structure is complete now and all objects are usable
223
224        if recode_avatar && let Some(avatar) = context.get_config(Config::Selfavatar).await? {
225            let mut blob = BlobObject::from_path(context, Path::new(&avatar))?;
226            match blob.recode_to_avatar_size(context).await {
227                Ok(()) => {
228                    if let Some(path) = blob.to_abs_path().to_str() {
229                        context
230                            .set_config_internal(Config::Selfavatar, Some(path))
231                            .await?;
232                    } else {
233                        warn!(context, "Setting selfavatar failed: non-UTF-8 filename");
234                    }
235                }
236                Err(e) => {
237                    warn!(context, "Migrations can't recode avatar, removing. {:#}", e);
238                    context
239                        .set_config_internal(Config::Selfavatar, None)
240                        .await?
241                }
242            }
243        }
244
245        Ok(())
246    }
247
248    /// Opens the provided database and runs any necessary migrations.
249    /// If a database is already open, this will return an error.
250    pub async fn open(&self, context: &Context, passphrase: String) -> Result<()> {
251        if self.is_open().await {
252            error!(
253                context,
254                "Cannot open, database \"{:?}\" already opened.", self.dbfile,
255            );
256            bail!("SQL database is already opened.");
257        }
258
259        let passphrase_nonempty = !passphrase.is_empty();
260        self.try_open(context, &self.dbfile, passphrase).await?;
261        info!(context, "Opened database {:?}.", self.dbfile);
262        *self.is_encrypted.write().await = Some(passphrase_nonempty);
263
264        // setup debug logging if there is an entry containing its id
265        if let Some(xdc_id) = self
266            .get_raw_config_u32(Config::DebugLogging.as_ref())
267            .await?
268        {
269            set_debug_logging_xdc(context, Some(MsgId::new(xdc_id))).await?;
270        }
271        Ok(())
272    }
273
274    /// Changes the passphrase of encrypted database.
275    ///
276    /// The database must already be encrypted and the passphrase cannot be empty.
277    /// It is impossible to turn encrypted database into unencrypted
278    /// and vice versa this way, use import/export for this.
279    pub async fn change_passphrase(&self, passphrase: String) -> Result<()> {
280        let mut lock = self.pool.write().await;
281
282        let pool = lock.take().context("SQL connection pool is not open")?;
283        let query_only = false;
284        let conn = pool.get(query_only).await?;
285        if !passphrase.is_empty() {
286            conn.pragma_update(None, "rekey", passphrase.clone())
287                .context("Failed to set PRAGMA rekey")?;
288        }
289        drop(pool);
290
291        *lock = Some(Self::new_pool(&self.dbfile, passphrase.to_string())?);
292
293        Ok(())
294    }
295
296    /// Allocates a connection and calls `function` with the connection.
297    ///
298    /// If `query_only` is true, allocates read-only connection,
299    /// otherwise allocates write connection.
300    ///
301    /// Returns the result of the function.
302    async fn call<'a, F, R>(&'a self, query_only: bool, function: F) -> Result<R>
303    where
304        F: 'a + FnOnce(&mut Connection) -> Result<R> + Send,
305        R: Send + 'static,
306    {
307        let lock = self.pool.read().await;
308        let pool = lock.as_ref().context("no SQL connection")?;
309        let mut conn = pool.get(query_only).await?;
310        let res = tokio::task::block_in_place(move || function(&mut conn))?;
311        Ok(res)
312    }
313
314    /// Allocates a connection and calls given function, assuming it does write queries, with the
315    /// connection.
316    ///
317    /// Returns the result of the function.
318    pub async fn call_write<'a, F, R>(&'a self, function: F) -> Result<R>
319    where
320        F: 'a + FnOnce(&mut Connection) -> Result<R> + Send,
321        R: Send + 'static,
322    {
323        let query_only = false;
324        self.call(query_only, function).await
325    }
326
327    /// Execute `query` assuming it is a write query, returning the number of affected rows.
328    pub async fn execute(
329        &self,
330        query: &str,
331        params: impl rusqlite::Params + Send,
332    ) -> Result<usize> {
333        self.call_write(move |conn| {
334            let res = conn.execute(query, params)?;
335            Ok(res)
336        })
337        .await
338    }
339
340    /// Executes the given query, returning the last inserted row ID.
341    pub async fn insert(&self, query: &str, params: impl rusqlite::Params + Send) -> Result<i64> {
342        self.call_write(move |conn| {
343            conn.execute(query, params)?;
344            Ok(conn.last_insert_rowid())
345        })
346        .await
347    }
348
349    /// Prepares and executes the statement and maps a function over the resulting rows.
350    /// Then executes the second function over the returned iterator and returns the
351    /// result of that function.
352    pub async fn query_map<T, F, G, H>(
353        &self,
354        sql: &str,
355        params: impl rusqlite::Params + Send,
356        f: F,
357        g: G,
358    ) -> Result<H>
359    where
360        F: Send + FnMut(&rusqlite::Row) -> Result<T>,
361        G: Send + FnOnce(rusqlite::AndThenRows<F>) -> Result<H>,
362        H: Send + 'static,
363    {
364        let query_only = true;
365        self.call(query_only, move |conn| {
366            let mut stmt = conn.prepare(sql)?;
367            let res = stmt.query_and_then(params, f)?;
368            g(res)
369        })
370        .await
371    }
372
373    /// Prepares and executes the statement and maps a function over the resulting rows.
374    ///
375    /// Collects the resulting rows into a generic structure.
376    pub async fn query_map_collect<T, C, F>(
377        &self,
378        sql: &str,
379        params: impl rusqlite::Params + Send,
380        f: F,
381    ) -> Result<C>
382    where
383        T: Send + 'static,
384        C: Send + 'static + std::iter::FromIterator<T>,
385        F: Send + FnMut(&rusqlite::Row) -> Result<T>,
386    {
387        self.query_map(sql, params, f, |rows| {
388            rows.collect::<std::result::Result<C, _>>()
389        })
390        .await
391    }
392
393    /// Prepares and executes the statement and maps a function over the resulting rows.
394    ///
395    /// Collects the resulting rows into a `Vec`.
396    pub async fn query_map_vec<T, F>(
397        &self,
398        sql: &str,
399        params: impl rusqlite::Params + Send,
400        f: F,
401    ) -> Result<Vec<T>>
402    where
403        T: Send + 'static,
404        F: Send + FnMut(&rusqlite::Row) -> Result<T>,
405    {
406        self.query_map_collect(sql, params, f).await
407    }
408
409    /// Used for executing `SELECT COUNT` statements only. Returns the resulting count.
410    pub async fn count(&self, query: &str, params: impl rusqlite::Params + Send) -> Result<usize> {
411        let count: isize = self.query_row(query, params, |row| row.get(0)).await?;
412        Ok(usize::try_from(count)?)
413    }
414
415    /// Used for executing `SELECT COUNT` statements only. Returns `true`, if the count is at least
416    /// one, `false` otherwise.
417    pub async fn exists(&self, sql: &str, params: impl rusqlite::Params + Send) -> Result<bool> {
418        let count = self.count(sql, params).await?;
419        Ok(count > 0)
420    }
421
422    /// Execute a query which is expected to return one row.
423    pub async fn query_row<T, F>(
424        &self,
425        query: &str,
426        params: impl rusqlite::Params + Send,
427        f: F,
428    ) -> Result<T>
429    where
430        F: FnOnce(&rusqlite::Row) -> rusqlite::Result<T> + Send,
431        T: Send + 'static,
432    {
433        let query_only = true;
434        self.call(query_only, move |conn| {
435            let res = conn.query_row(query, params, f)?;
436            Ok(res)
437        })
438        .await
439    }
440
441    /// Execute the function inside a transaction assuming that it does writes.
442    ///
443    /// If the function returns an error, the transaction will be rolled back. If it does not return an
444    /// error, the transaction will be committed.
445    pub async fn transaction<G, H>(&self, callback: G) -> Result<H>
446    where
447        H: Send + 'static,
448        G: Send + FnOnce(&mut rusqlite::Transaction<'_>) -> Result<H>,
449    {
450        let query_only = false;
451        self.transaction_ex(query_only, callback).await
452    }
453
454    /// Execute the function inside a transaction.
455    ///
456    /// * `query_only` - Whether the function only executes read statements (queries) and can be run
457    ///   in parallel with other transactions. NB: Creating and modifying temporary tables are also
458    ///   allowed with `query_only`, temporary tables aren't visible in other connections, but you
459    ///   need to pass `PRAGMA query_only=0;` to SQLite before that:
460    ///   ```text
461    ///   pragma_update(None, "query_only", "0")
462    ///   ```
463    ///   Also temporary tables need to be dropped because the connection is returned to the pool
464    ///   then.
465    ///
466    /// If the function returns an error, the transaction will be rolled back. If it does not return
467    /// an error, the transaction will be committed.
468    pub async fn transaction_ex<G, H>(&self, query_only: bool, callback: G) -> Result<H>
469    where
470        H: Send + 'static,
471        G: Send + FnOnce(&mut rusqlite::Transaction<'_>) -> Result<H>,
472    {
473        self.call(query_only, move |conn| {
474            let mut transaction = conn.transaction()?;
475            let ret = callback(&mut transaction);
476
477            match ret {
478                Ok(ret) => {
479                    transaction.commit()?;
480                    Ok(ret)
481                }
482                Err(err) => {
483                    transaction.rollback()?;
484                    Err(err)
485                }
486            }
487        })
488        .await
489    }
490
491    /// Query the database if the requested table already exists.
492    pub async fn table_exists(&self, name: &str) -> Result<bool> {
493        let query_only = true;
494        self.call(query_only, move |conn| {
495            let mut exists = false;
496            conn.pragma(None, "table_info", name.to_string(), |_row| {
497                // will only be executed if the info was found
498                exists = true;
499                Ok(())
500            })?;
501
502            Ok(exists)
503        })
504        .await
505    }
506
507    /// Check if a column exists in a given table.
508    pub async fn col_exists(&self, table_name: &str, col_name: &str) -> Result<bool> {
509        let query_only = true;
510        self.call(query_only, move |conn| {
511            let mut exists = false;
512            // `PRAGMA table_info` returns one row per column,
513            // each row containing 0=cid, 1=name, 2=type, 3=notnull, 4=dflt_value
514            conn.pragma(None, "table_info", table_name.to_string(), |row| {
515                let curr_name: String = row.get(1)?;
516                if col_name == curr_name {
517                    exists = true;
518                }
519                Ok(())
520            })?;
521
522            Ok(exists)
523        })
524        .await
525    }
526
527    /// Execute a query which is expected to return zero or one row.
528    pub async fn query_row_optional<T, F>(
529        &self,
530        sql: &str,
531        params: impl rusqlite::Params + Send,
532        f: F,
533    ) -> Result<Option<T>>
534    where
535        F: Send + FnOnce(&rusqlite::Row) -> rusqlite::Result<T>,
536        T: Send + 'static,
537    {
538        let query_only = true;
539        self.call(query_only, move |conn| {
540            match conn.query_row(sql.as_ref(), params, f) {
541                Ok(res) => Ok(Some(res)),
542                Err(rusqlite::Error::QueryReturnedNoRows) => Ok(None),
543                Err(err) => Err(err.into()),
544            }
545        })
546        .await
547    }
548
549    /// Executes a query which is expected to return one row and one
550    /// column. If the query does not return any rows, returns `Ok(None)`.
551    pub async fn query_get_value<T>(
552        &self,
553        query: &str,
554        params: impl rusqlite::Params + Send,
555    ) -> Result<Option<T>>
556    where
557        T: rusqlite::types::FromSql + Send + 'static,
558    {
559        self.query_row_optional(query, params, |row| row.get::<_, T>(0))
560            .await
561    }
562
563    /// Set private configuration options.
564    ///
565    /// Setting `None` deletes the value.  On failure an error message
566    /// will already have been logged.
567    pub async fn set_raw_config(&self, key: &str, value: Option<&str>) -> Result<()> {
568        let mut lock = self.config_cache.write().await;
569        if let Some(value) = value {
570            self.execute(
571                "INSERT OR REPLACE INTO config (keyname, value) VALUES (?, ?)",
572                (key, value),
573            )
574            .await?;
575        } else {
576            self.execute("DELETE FROM config WHERE keyname=?", (key,))
577                .await?;
578        }
579        lock.insert(key.to_string(), value.map(|s| s.to_string()));
580        drop(lock);
581
582        Ok(())
583    }
584
585    /// Get configuration options from the database.
586    pub async fn get_raw_config(&self, key: &str) -> Result<Option<String>> {
587        let lock = self.config_cache.read().await;
588        let cached = lock.get(key).cloned();
589        drop(lock);
590
591        if let Some(c) = cached {
592            return Ok(c);
593        }
594
595        let mut lock = self.config_cache.write().await;
596        let value = self
597            .query_get_value("SELECT value FROM config WHERE keyname=?", (key,))
598            .await
599            .context(format!("failed to fetch raw config: {key}"))?;
600        lock.insert(key.to_string(), value.clone());
601        drop(lock);
602
603        Ok(value)
604    }
605
606    /// Removes the `key`'s value from the cache.
607    pub(crate) async fn uncache_raw_config(&self, key: &str) {
608        let mut lock = self.config_cache.write().await;
609        lock.remove(key);
610    }
611
612    /// Sets configuration for the given key to 32-bit signed integer value.
613    pub async fn set_raw_config_int(&self, key: &str, value: i32) -> Result<()> {
614        self.set_raw_config(key, Some(&format!("{value}"))).await
615    }
616
617    /// Returns 32-bit signed integer configuration value for the given key.
618    pub async fn get_raw_config_int(&self, key: &str) -> Result<Option<i32>> {
619        self.get_raw_config(key)
620            .await
621            .map(|s| s.and_then(|s| s.parse().ok()))
622    }
623
624    /// Returns 32-bit unsigned integer configuration value for the given key.
625    pub async fn get_raw_config_u32(&self, key: &str) -> Result<Option<u32>> {
626        self.get_raw_config(key)
627            .await
628            .map(|s| s.and_then(|s| s.parse().ok()))
629    }
630
631    /// Returns boolean configuration value for the given key.
632    pub async fn get_raw_config_bool(&self, key: &str) -> Result<bool> {
633        // Not the most obvious way to encode bool as string, but it is matter
634        // of backward compatibility.
635        let res = self.get_raw_config_int(key).await?;
636        Ok(res.unwrap_or_default() > 0)
637    }
638
639    /// Sets configuration for the given key to boolean value.
640    pub async fn set_raw_config_bool(&self, key: &str, value: bool) -> Result<()> {
641        let value = if value { Some("1") } else { None };
642        self.set_raw_config(key, value).await
643    }
644
645    /// Sets configuration for the given key to 64-bit signed integer value.
646    pub async fn set_raw_config_int64(&self, key: &str, value: i64) -> Result<()> {
647        self.set_raw_config(key, Some(&format!("{value}"))).await
648    }
649
650    /// Returns 64-bit signed integer configuration value for the given key.
651    pub async fn get_raw_config_int64(&self, key: &str) -> Result<Option<i64>> {
652        self.get_raw_config(key)
653            .await
654            .map(|s| s.and_then(|r| r.parse().ok()))
655    }
656
657    /// Returns configuration cache.
658    #[cfg(feature = "internals")]
659    pub fn config_cache(&self) -> &RwLock<HashMap<String, Option<String>>> {
660        &self.config_cache
661    }
662
663    /// Runs a checkpoint operation in TRUNCATE mode, so the WAL file is truncated to 0 bytes.
664    pub(crate) async fn wal_checkpoint(context: &Context) -> Result<()> {
665        let t_start = Time::now();
666        let lock = context.sql.pool.read().await;
667        let Some(pool) = lock.as_ref() else {
668            // No db connections, nothing to checkpoint.
669            return Ok(());
670        };
671
672        // Do as much work as possible without blocking anybody.
673        let query_only = true;
674        let conn = pool.get(query_only).await?;
675        tokio::task::block_in_place(|| {
676            // Execute some transaction causing the WAL file to be opened so that the
677            // `wal_checkpoint()` can proceed, otherwise it fails when called the first time,
678            // see https://sqlite.org/forum/forumpost/7512d76a05268fc8.
679            conn.query_row("PRAGMA table_list", [], |_| Ok(()))?;
680            conn.query_row("PRAGMA wal_checkpoint(PASSIVE)", [], |_| Ok(()))
681        })?;
682
683        // Kick out writers.
684        const _: () = assert!(Sql::N_DB_CONNECTIONS > 1, "Deadlock possible");
685        let _write_lock = pool.write_lock().await;
686        let t_writers_blocked = Time::now();
687        // Ensure that all readers use the most recent database snapshot (are at the end of WAL) so
688        // that `wal_checkpoint(FULL)` isn't blocked. We could use `PASSIVE` as well, but it's
689        // documented poorly, https://www.sqlite.org/pragma.html#pragma_wal_checkpoint and
690        // https://www.sqlite.org/c3ref/wal_checkpoint_v2.html don't tell how it interacts with new
691        // readers.
692        let mut read_conns = Vec::with_capacity(Self::N_DB_CONNECTIONS - 1);
693        for _ in 0..(Self::N_DB_CONNECTIONS - 1) {
694            read_conns.push(pool.get(query_only).await?);
695        }
696        read_conns.clear();
697        // Checkpoint the remaining WAL pages without blocking readers.
698        let (pages_total, pages_checkpointed) = tokio::task::block_in_place(|| {
699            conn.query_row("PRAGMA wal_checkpoint(FULL)", [], |row| {
700                let pages_total: i64 = row.get(1)?;
701                let pages_checkpointed: i64 = row.get(2)?;
702                Ok((pages_total, pages_checkpointed))
703            })
704        })?;
705        if pages_checkpointed < pages_total {
706            warn!(
707                context,
708                "Cannot checkpoint whole WAL. Pages total: {pages_total}, checkpointed: {pages_checkpointed}. Make sure there are no external connections running transactions.",
709            );
710        }
711        // Kick out readers to avoid blocking/SQLITE_BUSY.
712        for _ in 0..(Self::N_DB_CONNECTIONS - 1) {
713            read_conns.push(pool.get(query_only).await?);
714        }
715        let t_readers_blocked = Time::now();
716        tokio::task::block_in_place(|| {
717            let blocked = conn.query_row("PRAGMA wal_checkpoint(TRUNCATE)", [], |row| {
718                let blocked: i64 = row.get(0)?;
719                Ok(blocked)
720            })?;
721            ensure!(blocked == 0);
722            Ok(())
723        })?;
724        info!(
725            context,
726            "wal_checkpoint: Total time: {:?}. Writers blocked for: {:?}. Readers blocked for: {:?}.",
727            time_elapsed(&t_start),
728            time_elapsed(&t_writers_blocked),
729            time_elapsed(&t_readers_blocked),
730        );
731        Ok(())
732    }
733}
734
735/// Creates a new SQLite connection.
736///
737/// `path` is the database path.
738///
739/// `passphrase` is the SQLCipher database passphrase.
740/// Empty string if database is not encrypted.
741fn new_connection(path: &Path, passphrase: &str) -> Result<Connection> {
742    let flags = OpenFlags::SQLITE_OPEN_NO_MUTEX
743        | OpenFlags::SQLITE_OPEN_READ_WRITE
744        | OpenFlags::SQLITE_OPEN_CREATE;
745    let conn = Connection::open_with_flags(path, flags)?;
746    conn.execute_batch(
747        "PRAGMA cipher_memory_security = OFF; -- Too slow on Android
748         PRAGMA secure_delete=on;
749         PRAGMA soft_heap_limit = 8388608; -- 8 MiB limit, same as set in Android SQLiteDatabase.
750         PRAGMA foreign_keys=on;
751         ",
752    )?;
753
754    // Avoid SQLITE_IOERR_GETTEMPPATH errors on Android and maybe other systems.
755    // Downside is more RAM consumption esp. on VACUUM.
756    // Therefore, on systems known to have working default (using files), stay with that.
757    if cfg!(not(target_os = "ios")) {
758        conn.pragma_update(None, "temp_store", "memory")?;
759    }
760
761    // Fail immediately when the database is busy,
762    // except for iOS. On iOS we don't have
763    // `accounts.lock` lockfile and the database
764    // is used by two processes:
765    // main process and the notification extension.
766    // Due to a bug they both may run at the same time
767    // and try to write to the database.
768    // As a workaround, we wait up to 1 minute and retry
769    // instead of failing immediately and
770    // possibly missing a message.
771    if cfg!(target_os = "ios") {
772        conn.busy_timeout(Duration::new(60, 0))?;
773    } else {
774        conn.busy_timeout(Duration::ZERO)?;
775    }
776
777    if !passphrase.is_empty() {
778        conn.pragma_update(None, "key", passphrase)?;
779    }
780    // Try to enable auto_vacuum. This will only be
781    // applied if the database is new or after successful
782    // VACUUM, which usually happens before backup export.
783    // When auto_vacuum is INCREMENTAL, it is possible to
784    // use PRAGMA incremental_vacuum to return unused
785    // database pages to the filesystem.
786    conn.pragma_update(None, "auto_vacuum", "INCREMENTAL".to_string())?;
787
788    conn.pragma_update(None, "journal_mode", "WAL".to_string())?;
789    // Default synchronous=FULL is much slower. NORMAL is sufficient for WAL mode.
790    conn.pragma_update(None, "synchronous", "NORMAL".to_string())?;
791
792    Ok(conn)
793}
794
795// Tries to clear the freelist to free some space on the disk.
796//
797// This only works if auto_vacuum is enabled.
798async fn incremental_vacuum(context: &Context) -> Result<()> {
799    context
800        .sql
801        .call_write(move |conn| {
802            let mut stmt = conn
803                .prepare("PRAGMA incremental_vacuum")
804                .context("Failed to prepare incremental_vacuum statement")?;
805
806            // It is important to step the statement until it returns no more rows.
807            // Otherwise it will not free as many pages as it can:
808            // <https://stackoverflow.com/questions/53746807/sqlite-incremental-vacuum-removing-only-one-free-page>.
809            let mut rows = stmt
810                .query(())
811                .context("Failed to run incremental_vacuum statement")?;
812            let mut row_count = 0;
813            while let Some(_row) = rows
814                .next()
815                .context("Failed to step incremental_vacuum statement")?
816            {
817                row_count += 1;
818            }
819            info!(context, "Incremental vacuum freed {row_count} pages.");
820            Ok(())
821        })
822        .await
823}
824
825/// Cleanup the account to restore some storage and optimize the database.
826pub async fn housekeeping(context: &Context) -> Result<()> {
827    // Setting `Config::LastHousekeeping` at the beginning avoids endless loops when things do not
828    // work out for whatever reason or are interrupted by the OS.
829    if let Err(e) = context
830        .set_config_internal(Config::LastHousekeeping, Some(&time().to_string()))
831        .await
832    {
833        warn!(context, "Can't set config: {e:#}.");
834    }
835
836    http_cache_cleanup(context)
837        .await
838        .context("Failed to cleanup HTTP cache")
839        .log_err(context)
840        .ok();
841    migrations::msgs_to_key_contacts(context)
842        .await
843        .context("migrations::msgs_to_key_contacts")
844        .log_err(context)
845        .ok();
846
847    if let Err(err) = remove_unused_files(context).await {
848        warn!(
849            context,
850            "Housekeeping: cannot remove unused files: {:#}.", err
851        );
852    }
853
854    if let Err(err) = start_ephemeral_timers(context).await {
855        warn!(
856            context,
857            "Housekeeping: cannot start ephemeral timers: {:#}.", err
858        );
859    }
860
861    if let Err(err) = prune_tombstones(&context.sql).await {
862        warn!(
863            context,
864            "Housekeeping: Cannot prune message tombstones: {:#}.", err
865        );
866    }
867
868    if let Err(err) = incremental_vacuum(context).await {
869        warn!(context, "Failed to run incremental vacuum: {err:#}.");
870    }
871    // Work around possible checkpoint starvations (there were cases reported when a WAL file is
872    // bigger than 200M) and also make sure we truncate the WAL periodically. Auto-checkponting does
873    // not normally truncate the WAL (unless the `journal_size_limit` pragma is set), see
874    // https://www.sqlite.org/wal.html.
875    if let Err(err) = Sql::wal_checkpoint(context).await {
876        warn!(context, "wal_checkpoint() failed: {err:#}.");
877        debug_assert!(false);
878    }
879
880    context
881        .sql
882        .execute(
883            "DELETE FROM msgs_mdns WHERE msg_id NOT IN \
884            (SELECT id FROM msgs WHERE chat_id!=?)",
885            (DC_CHAT_ID_TRASH,),
886        )
887        .await
888        .context("failed to remove old MDNs")
889        .log_err(context)
890        .ok();
891
892    context
893        .sql
894        .execute(
895            "DELETE FROM msgs_status_updates WHERE msg_id NOT IN \
896            (SELECT id FROM msgs WHERE chat_id!=?)",
897            (DC_CHAT_ID_TRASH,),
898        )
899        .await
900        .context("failed to remove old webxdc status updates")
901        .log_err(context)
902        .ok();
903
904    prune_connection_history(context)
905        .await
906        .context("Failed to prune connection history")
907        .log_err(context)
908        .ok();
909    prune_dns_cache(context)
910        .await
911        .context("Failed to prune DNS cache")
912        .log_err(context)
913        .ok();
914
915    // Delete POI locations
916    // which don't have corresponding message.
917    delete_orphaned_poi_locations(context)
918        .await
919        .context("Failed to delete orphaned POI locations")
920        .log_err(context)
921        .ok();
922
923    info!(context, "Housekeeping done.");
924    Ok(())
925}
926
927/// Get the value of a column `idx` of the `row` as `Vec<u8>`.
928pub fn row_get_vec(row: &Row, idx: usize) -> rusqlite::Result<Vec<u8>> {
929    row.get(idx).or_else(|err| match row.get_ref(idx)? {
930        ValueRef::Null => Ok(Vec::new()),
931        ValueRef::Text(text) => Ok(text.to_vec()),
932        ValueRef::Blob(blob) => Ok(blob.to_vec()),
933        ValueRef::Integer(_) | ValueRef::Real(_) => Err(err),
934    })
935}
936
937/// Enumerates used files in the blobdir and removes unused ones.
938pub async fn remove_unused_files(context: &Context) -> Result<()> {
939    let mut files_in_use = HashSet::new();
940    let mut unreferenced_count = 0;
941
942    info!(context, "Start housekeeping...");
943    maybe_add_from_param(
944        &context.sql,
945        &mut files_in_use,
946        "SELECT param FROM msgs  WHERE chat_id!=3   AND type!=10;",
947        Param::File,
948    )
949    .await?;
950    maybe_add_from_param(
951        &context.sql,
952        &mut files_in_use,
953        "SELECT param FROM chats;",
954        Param::ProfileImage,
955    )
956    .await?;
957    maybe_add_from_param(
958        &context.sql,
959        &mut files_in_use,
960        "SELECT param FROM contacts;",
961        Param::ProfileImage,
962    )
963    .await?;
964
965    context
966        .sql
967        .query_map(
968            "SELECT value FROM config;",
969            (),
970            |row| {
971                let row: String = row.get(0)?;
972                Ok(row)
973            },
974            |rows| {
975                for row in rows {
976                    maybe_add_file(&mut files_in_use, &row?);
977                }
978                Ok(())
979            },
980        )
981        .await
982        .context("housekeeping: failed to SELECT value FROM config")?;
983
984    context
985        .sql
986        .query_map(
987            "SELECT blobname FROM http_cache",
988            (),
989            |row| {
990                let row: String = row.get(0)?;
991                Ok(row)
992            },
993            |rows| {
994                for row in rows {
995                    maybe_add_file(&mut files_in_use, &row?);
996                }
997                Ok(())
998            },
999        )
1000        .await
1001        .context("Failed to SELECT blobname FROM http_cache")?;
1002
1003    info!(context, "{} files in use.", files_in_use.len());
1004    /* go through directories and delete unused files */
1005    let blobdir = context.get_blobdir();
1006    for p in [&blobdir.join(BLOBS_BACKUP_NAME), blobdir] {
1007        match tokio::fs::read_dir(p).await {
1008            Ok(mut dir_handle) => {
1009                /* avoid deletion of files that are just created to build a message object */
1010                let diff = std::time::Duration::from_secs(60 * 60);
1011                let keep_files_newer_than = SystemTime::now()
1012                    .checked_sub(diff)
1013                    .unwrap_or(SystemTime::UNIX_EPOCH);
1014
1015                while let Ok(Some(entry)) = dir_handle.next_entry().await {
1016                    let name_f = entry.file_name();
1017                    let name_s = name_f.to_string_lossy();
1018
1019                    if p == blobdir
1020                        && (is_file_in_use(&files_in_use, None, &name_s)
1021                            || is_file_in_use(&files_in_use, Some(".waveform"), &name_s)
1022                            || is_file_in_use(&files_in_use, Some("-preview.jpg"), &name_s))
1023                    {
1024                        continue;
1025                    }
1026
1027                    let stats = match tokio::fs::metadata(entry.path()).await {
1028                        Err(err) => {
1029                            warn!(
1030                                context,
1031                                "Cannot get metadata for {}: {:#}.",
1032                                entry.path().display(),
1033                                err
1034                            );
1035                            continue;
1036                        }
1037                        Ok(stats) => stats,
1038                    };
1039
1040                    if stats.is_dir() {
1041                        if let Err(e) = tokio::fs::remove_dir(entry.path()).await {
1042                            // The dir could be created not by a user, but by a desktop
1043                            // environment f.e. So, no warning.
1044                            info!(
1045                                context,
1046                                "Housekeeping: Cannot rmdir {}: {:#}.",
1047                                entry.path().display(),
1048                                e
1049                            );
1050                        }
1051                        continue;
1052                    }
1053
1054                    unreferenced_count += 1;
1055                    let recently_created = stats.created().is_ok_and(|t| t > keep_files_newer_than);
1056                    let recently_modified =
1057                        stats.modified().is_ok_and(|t| t > keep_files_newer_than);
1058                    let recently_accessed =
1059                        stats.accessed().is_ok_and(|t| t > keep_files_newer_than);
1060
1061                    if p == blobdir && (recently_created || recently_modified || recently_accessed)
1062                    {
1063                        info!(
1064                            context,
1065                            "Housekeeping: Keeping new unreferenced file #{}: {:?}.",
1066                            unreferenced_count,
1067                            entry.file_name(),
1068                        );
1069                        continue;
1070                    }
1071
1072                    info!(
1073                        context,
1074                        "Housekeeping: Deleting unreferenced file #{}: {:?}.",
1075                        unreferenced_count,
1076                        entry.file_name()
1077                    );
1078                    let path = entry.path();
1079                    if let Err(err) = delete_file(context, &path).await {
1080                        error!(
1081                            context,
1082                            "Failed to delete unused file {}: {:#}.",
1083                            path.display(),
1084                            err
1085                        );
1086                    }
1087                }
1088            }
1089            Err(err) => {
1090                if !p.ends_with(BLOBS_BACKUP_NAME) {
1091                    warn!(
1092                        context,
1093                        "Housekeeping: Cannot read dir {}: {:#}.",
1094                        p.display(),
1095                        err
1096                    );
1097                }
1098            }
1099        }
1100    }
1101
1102    Ok(())
1103}
1104
1105fn is_file_in_use(files_in_use: &HashSet<String>, namespc_opt: Option<&str>, name: &str) -> bool {
1106    let name_to_check = if let Some(namespc) = namespc_opt {
1107        let Some(name) = name.strip_suffix(namespc) else {
1108            return false;
1109        };
1110        name
1111    } else {
1112        name
1113    };
1114    files_in_use.contains(name_to_check)
1115}
1116
1117fn maybe_add_file(files_in_use: &mut HashSet<String>, file: &str) {
1118    if let Some(file) = file.strip_prefix("$BLOBDIR/") {
1119        files_in_use.insert(file.to_string());
1120    }
1121}
1122
1123async fn maybe_add_from_param(
1124    sql: &Sql,
1125    files_in_use: &mut HashSet<String>,
1126    query: &str,
1127    param_id: Param,
1128) -> Result<()> {
1129    sql.query_map(
1130        query,
1131        (),
1132        |row| {
1133            let row: String = row.get(0)?;
1134            Ok(row)
1135        },
1136        |rows| {
1137            for row in rows {
1138                let param: Params = row?.parse().unwrap_or_default();
1139                if let Some(file) = param.get(param_id) {
1140                    maybe_add_file(files_in_use, file);
1141                }
1142            }
1143            Ok(())
1144        },
1145    )
1146    .await
1147    .context(format!("housekeeping: failed to add_from_param {query}"))?;
1148
1149    Ok(())
1150}
1151
1152/// Removes from the database stale locally deleted messages that also don't
1153/// have a server UID.
1154async fn prune_tombstones(sql: &Sql) -> Result<()> {
1155    // Keep tombstones for the last two days to prevent redownloading locally deleted messages.
1156    let timestamp_max = time().saturating_sub(2 * 24 * 3600);
1157    sql.execute(
1158        "DELETE FROM msgs
1159         WHERE chat_id=?
1160         AND timestamp<=?
1161         AND NOT EXISTS (
1162         SELECT * FROM imap WHERE msgs.rfc724_mid=rfc724_mid AND target!=''
1163         )",
1164        (DC_CHAT_ID_TRASH, timestamp_max),
1165    )
1166    .await?;
1167    Ok(())
1168}
1169
1170#[cfg(test)]
1171mod sql_tests;