deltachat/
sql.rs

1//! # SQLite wrapper.
2
3use std::collections::{HashMap, HashSet};
4use std::path::{Path, PathBuf};
5use std::time::Duration;
6
7use anyhow::{Context as _, Result, bail, ensure};
8use rusqlite::{Connection, OpenFlags, Row, config::DbConfig, types::ValueRef};
9use tokio::sync::RwLock;
10
11use crate::blob::BlobObject;
12use crate::chat::add_device_msg;
13use crate::config::Config;
14use crate::constants::DC_CHAT_ID_TRASH;
15use crate::context::Context;
16use crate::debug_logging::set_debug_logging_xdc;
17use crate::ephemeral::start_ephemeral_timers;
18use crate::imex::BLOBS_BACKUP_NAME;
19use crate::location::delete_orphaned_poi_locations;
20use crate::log::{LogExt, warn};
21use crate::message::Message;
22use crate::message::MsgId;
23use crate::net::dns::prune_dns_cache;
24use crate::net::http::http_cache_cleanup;
25use crate::net::prune_connection_history;
26use crate::param::{Param, Params};
27use crate::stock_str;
28use crate::tools::{SystemTime, Time, delete_file, time, time_elapsed};
29
30/// Extension to [`rusqlite::ToSql`] trait
31/// which also includes [`Send`] and [`Sync`].
32pub trait ToSql: rusqlite::ToSql + Send + Sync {}
33
34impl<T: rusqlite::ToSql + Send + Sync> ToSql for T {}
35
36/// Constructs a slice of trait object references `&dyn ToSql`.
37///
38/// One of the uses is passing more than 16 parameters
39/// to a query, because [`rusqlite::Params`] is only implemented
40/// for tuples of up to 16 elements.
41#[macro_export]
42macro_rules! params_slice {
43    ($($param:expr),+) => {
44        [$(&$param as &dyn $crate::sql::ToSql),+]
45    };
46}
47
48mod migrations;
49mod pool;
50
51use pool::Pool;
52
53/// A wrapper around the underlying Sqlite3 object.
54#[derive(Debug)]
55pub struct Sql {
56    /// Database file path
57    pub(crate) dbfile: PathBuf,
58
59    /// SQL connection pool.
60    pool: RwLock<Option<Pool>>,
61
62    /// None if the database is not open, true if it is open with passphrase and false if it is
63    /// open without a passphrase.
64    is_encrypted: RwLock<Option<bool>>,
65
66    /// Cache of `config` table.
67    pub(crate) config_cache: RwLock<HashMap<String, Option<String>>>,
68}
69
70impl Sql {
71    /// Creates new SQL database.
72    pub fn new(dbfile: PathBuf) -> Sql {
73        Self {
74            dbfile,
75            pool: Default::default(),
76            is_encrypted: Default::default(),
77            config_cache: Default::default(),
78        }
79    }
80
81    /// Tests SQLCipher passphrase.
82    ///
83    /// Returns true if passphrase is correct, i.e. the database is new or can be unlocked with
84    /// this passphrase, and false if the database is already encrypted with another passphrase or
85    /// corrupted.
86    ///
87    /// Fails if database is already open.
88    pub async fn check_passphrase(&self, passphrase: String) -> Result<bool> {
89        if self.is_open().await {
90            bail!("Database is already opened.");
91        }
92
93        // Hold the lock to prevent other thread from opening the database.
94        let _lock = self.pool.write().await;
95
96        // Test that the key is correct using a single connection.
97        let connection = Connection::open(&self.dbfile)?;
98        if !passphrase.is_empty() {
99            connection
100                .pragma_update(None, "key", &passphrase)
101                .context("Failed to set PRAGMA key")?;
102        }
103        let key_is_correct = connection
104            .query_row("SELECT count(*) FROM sqlite_master", [], |_row| Ok(()))
105            .is_ok();
106
107        Ok(key_is_correct)
108    }
109
110    /// Checks if there is currently a connection to the underlying Sqlite database.
111    pub async fn is_open(&self) -> bool {
112        self.pool.read().await.is_some()
113    }
114
115    /// Returns true if the database is encrypted.
116    ///
117    /// If database is not open, returns `None`.
118    pub(crate) async fn is_encrypted(&self) -> Option<bool> {
119        *self.is_encrypted.read().await
120    }
121
122    /// Closes all underlying Sqlite connections.
123    pub(crate) async fn close(&self) {
124        let _ = self.pool.write().await.take();
125        // drop closes the connection
126    }
127
128    /// Imports the database from a separate file with the given passphrase.
129    pub(crate) async fn import(&self, path: &Path, passphrase: String) -> Result<()> {
130        let path_str = path
131            .to_str()
132            .with_context(|| format!("path {path:?} is not valid unicode"))?
133            .to_string();
134
135        // Keep `config_cache` locked all the time the db is imported so that nobody can use invalid
136        // values from there. And clear it immediately so as not to forget in case of errors.
137        let mut config_cache = self.config_cache.write().await;
138        config_cache.clear();
139
140        let query_only = false;
141        self.call(query_only, move |conn| {
142            // Check that backup passphrase is correct before resetting our database.
143            conn.execute("ATTACH DATABASE ? AS backup KEY ?", (path_str, passphrase))
144                .context("failed to attach backup database")?;
145            let res = conn
146                .query_row("SELECT count(*) FROM sqlite_master", [], |_row| Ok(()))
147                .context("backup passphrase is not correct");
148
149            // Reset the database without reopening it. We don't want to reopen the database because we
150            // don't have main database passphrase at this point.
151            // See <https://sqlite.org/c3ref/c_dbconfig_enable_fkey.html> for documentation.
152            // Without resetting import may fail due to existing tables.
153            res.and_then(|_| {
154                conn.set_db_config(DbConfig::SQLITE_DBCONFIG_RESET_DATABASE, true)
155                    .context("failed to set SQLITE_DBCONFIG_RESET_DATABASE")
156            })
157            .and_then(|_| {
158                conn.execute("VACUUM", [])
159                    .context("failed to vacuum the database")
160            })
161            .and(
162                conn.set_db_config(DbConfig::SQLITE_DBCONFIG_RESET_DATABASE, false)
163                    .context("failed to unset SQLITE_DBCONFIG_RESET_DATABASE"),
164            )
165            .and_then(|_| {
166                conn.query_row("SELECT sqlcipher_export('main', 'backup')", [], |_row| {
167                    Ok(())
168                })
169                .context("failed to import from attached backup database")
170            })
171            .and(
172                conn.execute("DETACH DATABASE backup", [])
173                    .context("failed to detach backup database"),
174            )?;
175            Ok(())
176        })
177        .await
178    }
179
180    const N_DB_CONNECTIONS: usize = 3;
181
182    /// Creates a new connection pool.
183    fn new_pool(dbfile: &Path, passphrase: String) -> Result<Pool> {
184        let mut connections = Vec::with_capacity(Self::N_DB_CONNECTIONS);
185        for _ in 0..Self::N_DB_CONNECTIONS {
186            let connection = new_connection(dbfile, &passphrase)?;
187            connections.push(connection);
188        }
189
190        let pool = Pool::new(connections);
191        Ok(pool)
192    }
193
194    async fn try_open(&self, context: &Context, dbfile: &Path, passphrase: String) -> Result<()> {
195        *self.pool.write().await = Some(Self::new_pool(dbfile, passphrase.to_string())?);
196
197        if let Err(e) = self.run_migrations(context).await {
198            error!(context, "Running migrations failed: {e:#}");
199            // Emiting an error event probably doesn't work
200            // because we are in the process of opening the context,
201            // so there is no event emitter yet.
202            // So, try to report the error in other ways:
203            eprintln!("Running migrations failed: {e:#}");
204            context.set_migration_error(&format!("Updating Delta Chat failed. Please send this message to the Delta Chat developers, either at delta@merlinux.eu or at https://support.delta.chat.\n\n{e:#}"));
205            // We can't simply close the db for two reasons:
206            // a. backup export would fail
207            // b. The UI would think that the account is unconfigured (because `is_configured()` fails)
208            // and remove the account when the user presses "Back"
209        }
210
211        Ok(())
212    }
213
214    /// Updates SQL schema to the latest version.
215    pub async fn run_migrations(&self, context: &Context) -> Result<()> {
216        // (1) update low-level database structure.
217        // this should be done before updates that use high-level objects that
218        // rely themselves on the low-level structure.
219
220        let recode_avatar = migrations::run(context, self)
221            .await
222            .context("failed to run migrations")?;
223
224        // (2) updates that require high-level objects
225        // the structure is complete now and all objects are usable
226
227        if recode_avatar && let Some(avatar) = context.get_config(Config::Selfavatar).await? {
228            let mut blob = BlobObject::from_path(context, Path::new(&avatar))?;
229            match blob.recode_to_avatar_size(context).await {
230                Ok(()) => {
231                    if let Some(path) = blob.to_abs_path().to_str() {
232                        context
233                            .set_config_internal(Config::Selfavatar, Some(path))
234                            .await?;
235                    } else {
236                        warn!(context, "Setting selfavatar failed: non-UTF-8 filename");
237                    }
238                }
239                Err(e) => {
240                    warn!(context, "Migrations can't recode avatar, removing. {:#}", e);
241                    context
242                        .set_config_internal(Config::Selfavatar, None)
243                        .await?
244                }
245            }
246        }
247
248        Ok(())
249    }
250
251    /// Opens the provided database and runs any necessary migrations.
252    /// If a database is already open, this will return an error.
253    pub async fn open(&self, context: &Context, passphrase: String) -> Result<()> {
254        if self.is_open().await {
255            error!(
256                context,
257                "Cannot open, database \"{:?}\" already opened.", self.dbfile,
258            );
259            bail!("SQL database is already opened.");
260        }
261
262        let passphrase_nonempty = !passphrase.is_empty();
263        self.try_open(context, &self.dbfile, passphrase).await?;
264        info!(context, "Opened database {:?}.", self.dbfile);
265        *self.is_encrypted.write().await = Some(passphrase_nonempty);
266
267        // setup debug logging if there is an entry containing its id
268        if let Some(xdc_id) = self
269            .get_raw_config_u32(Config::DebugLogging.as_ref())
270            .await?
271        {
272            set_debug_logging_xdc(context, Some(MsgId::new(xdc_id))).await?;
273        }
274        Ok(())
275    }
276
277    /// Changes the passphrase of encrypted database.
278    ///
279    /// The database must already be encrypted and the passphrase cannot be empty.
280    /// It is impossible to turn encrypted database into unencrypted
281    /// and vice versa this way, use import/export for this.
282    pub async fn change_passphrase(&self, passphrase: String) -> Result<()> {
283        let mut lock = self.pool.write().await;
284
285        let pool = lock.take().context("SQL connection pool is not open")?;
286        let query_only = false;
287        let conn = pool.get(query_only).await?;
288        if !passphrase.is_empty() {
289            conn.pragma_update(None, "rekey", passphrase.clone())
290                .context("Failed to set PRAGMA rekey")?;
291        }
292        drop(pool);
293
294        *lock = Some(Self::new_pool(&self.dbfile, passphrase.to_string())?);
295
296        Ok(())
297    }
298
299    /// Allocates a connection and calls `function` with the connection.
300    ///
301    /// If `query_only` is true, allocates read-only connection,
302    /// otherwise allocates write connection.
303    ///
304    /// Returns the result of the function.
305    async fn call<'a, F, R>(&'a self, query_only: bool, function: F) -> Result<R>
306    where
307        F: 'a + FnOnce(&mut Connection) -> Result<R> + Send,
308        R: Send + 'static,
309    {
310        let lock = self.pool.read().await;
311        let pool = lock.as_ref().context("no SQL connection")?;
312        let mut conn = pool.get(query_only).await?;
313        let res = tokio::task::block_in_place(move || function(&mut conn))?;
314        Ok(res)
315    }
316
317    /// Allocates a connection and calls given function, assuming it does write queries, with the
318    /// connection.
319    ///
320    /// Returns the result of the function.
321    pub async fn call_write<'a, F, R>(&'a self, function: F) -> Result<R>
322    where
323        F: 'a + FnOnce(&mut Connection) -> Result<R> + Send,
324        R: Send + 'static,
325    {
326        let query_only = false;
327        self.call(query_only, function).await
328    }
329
330    /// Execute `query` assuming it is a write query, returning the number of affected rows.
331    pub async fn execute(
332        &self,
333        query: &str,
334        params: impl rusqlite::Params + Send,
335    ) -> Result<usize> {
336        self.call_write(move |conn| {
337            let res = conn.execute(query, params)?;
338            Ok(res)
339        })
340        .await
341    }
342
343    /// Executes the given query, returning the last inserted row ID.
344    pub async fn insert(&self, query: &str, params: impl rusqlite::Params + Send) -> Result<i64> {
345        self.call_write(move |conn| {
346            conn.execute(query, params)?;
347            Ok(conn.last_insert_rowid())
348        })
349        .await
350    }
351
352    /// Prepares and executes the statement and maps a function over the resulting rows.
353    /// Then executes the second function over the returned iterator and returns the
354    /// result of that function.
355    pub async fn query_map<T, F, G, H>(
356        &self,
357        sql: &str,
358        params: impl rusqlite::Params + Send,
359        f: F,
360        g: G,
361    ) -> Result<H>
362    where
363        F: Send + FnMut(&rusqlite::Row) -> Result<T>,
364        G: Send + FnOnce(rusqlite::AndThenRows<F>) -> Result<H>,
365        H: Send + 'static,
366    {
367        let query_only = true;
368        self.call(query_only, move |conn| {
369            let mut stmt = conn.prepare(sql)?;
370            let res = stmt.query_and_then(params, f)?;
371            g(res)
372        })
373        .await
374    }
375
376    /// Prepares and executes the statement and maps a function over the resulting rows.
377    ///
378    /// Collects the resulting rows into a generic structure.
379    pub async fn query_map_collect<T, C, F>(
380        &self,
381        sql: &str,
382        params: impl rusqlite::Params + Send,
383        f: F,
384    ) -> Result<C>
385    where
386        T: Send + 'static,
387        C: Send + 'static + std::iter::FromIterator<T>,
388        F: Send + FnMut(&rusqlite::Row) -> Result<T>,
389    {
390        self.query_map(sql, params, f, |rows| {
391            rows.collect::<std::result::Result<C, _>>()
392        })
393        .await
394    }
395
396    /// Prepares and executes the statement and maps a function over the resulting rows.
397    ///
398    /// Collects the resulting rows into a `Vec`.
399    pub async fn query_map_vec<T, F>(
400        &self,
401        sql: &str,
402        params: impl rusqlite::Params + Send,
403        f: F,
404    ) -> Result<Vec<T>>
405    where
406        T: Send + 'static,
407        F: Send + FnMut(&rusqlite::Row) -> Result<T>,
408    {
409        self.query_map_collect(sql, params, f).await
410    }
411
412    /// Used for executing `SELECT COUNT` statements only. Returns the resulting count.
413    pub async fn count(&self, query: &str, params: impl rusqlite::Params + Send) -> Result<usize> {
414        let count: isize = self.query_row(query, params, |row| row.get(0)).await?;
415        Ok(usize::try_from(count)?)
416    }
417
418    /// Used for executing `SELECT COUNT` statements only. Returns `true`, if the count is at least
419    /// one, `false` otherwise.
420    pub async fn exists(&self, sql: &str, params: impl rusqlite::Params + Send) -> Result<bool> {
421        let count = self.count(sql, params).await?;
422        Ok(count > 0)
423    }
424
425    /// Execute a query which is expected to return one row.
426    pub async fn query_row<T, F>(
427        &self,
428        query: &str,
429        params: impl rusqlite::Params + Send,
430        f: F,
431    ) -> Result<T>
432    where
433        F: FnOnce(&rusqlite::Row) -> rusqlite::Result<T> + Send,
434        T: Send + 'static,
435    {
436        let query_only = true;
437        self.call(query_only, move |conn| {
438            let res = conn.query_row(query, params, f)?;
439            Ok(res)
440        })
441        .await
442    }
443
444    /// Execute the function inside a transaction assuming that it does writes.
445    ///
446    /// If the function returns an error, the transaction will be rolled back. If it does not return an
447    /// error, the transaction will be committed.
448    pub async fn transaction<G, H>(&self, callback: G) -> Result<H>
449    where
450        H: Send + 'static,
451        G: Send + FnOnce(&mut rusqlite::Transaction<'_>) -> Result<H>,
452    {
453        let query_only = false;
454        self.transaction_ex(query_only, callback).await
455    }
456
457    /// Execute the function inside a transaction.
458    ///
459    /// * `query_only` - Whether the function only executes read statements (queries) and can be run
460    ///   in parallel with other transactions. NB: Creating and modifying temporary tables are also
461    ///   allowed with `query_only`, temporary tables aren't visible in other connections, but you
462    ///   need to pass `PRAGMA query_only=0;` to SQLite before that:
463    ///   ```text
464    ///   pragma_update(None, "query_only", "0")
465    ///   ```
466    ///   Also temporary tables need to be dropped because the connection is returned to the pool
467    ///   then.
468    ///
469    /// If the function returns an error, the transaction will be rolled back. If it does not return
470    /// an error, the transaction will be committed.
471    pub async fn transaction_ex<G, H>(&self, query_only: bool, callback: G) -> Result<H>
472    where
473        H: Send + 'static,
474        G: Send + FnOnce(&mut rusqlite::Transaction<'_>) -> Result<H>,
475    {
476        self.call(query_only, move |conn| {
477            let mut transaction = conn.transaction()?;
478            let ret = callback(&mut transaction);
479
480            match ret {
481                Ok(ret) => {
482                    transaction.commit()?;
483                    Ok(ret)
484                }
485                Err(err) => {
486                    transaction.rollback()?;
487                    Err(err)
488                }
489            }
490        })
491        .await
492    }
493
494    /// Query the database if the requested table already exists.
495    pub async fn table_exists(&self, name: &str) -> Result<bool> {
496        let query_only = true;
497        self.call(query_only, move |conn| {
498            let mut exists = false;
499            conn.pragma(None, "table_info", name.to_string(), |_row| {
500                // will only be executed if the info was found
501                exists = true;
502                Ok(())
503            })?;
504
505            Ok(exists)
506        })
507        .await
508    }
509
510    /// Check if a column exists in a given table.
511    pub async fn col_exists(&self, table_name: &str, col_name: &str) -> Result<bool> {
512        let query_only = true;
513        self.call(query_only, move |conn| {
514            let mut exists = false;
515            // `PRAGMA table_info` returns one row per column,
516            // each row containing 0=cid, 1=name, 2=type, 3=notnull, 4=dflt_value
517            conn.pragma(None, "table_info", table_name.to_string(), |row| {
518                let curr_name: String = row.get(1)?;
519                if col_name == curr_name {
520                    exists = true;
521                }
522                Ok(())
523            })?;
524
525            Ok(exists)
526        })
527        .await
528    }
529
530    /// Execute a query which is expected to return zero or one row.
531    pub async fn query_row_optional<T, F>(
532        &self,
533        sql: &str,
534        params: impl rusqlite::Params + Send,
535        f: F,
536    ) -> Result<Option<T>>
537    where
538        F: Send + FnOnce(&rusqlite::Row) -> rusqlite::Result<T>,
539        T: Send + 'static,
540    {
541        let query_only = true;
542        self.call(query_only, move |conn| {
543            match conn.query_row(sql.as_ref(), params, f) {
544                Ok(res) => Ok(Some(res)),
545                Err(rusqlite::Error::QueryReturnedNoRows) => Ok(None),
546                Err(err) => Err(err.into()),
547            }
548        })
549        .await
550    }
551
552    /// Executes a query which is expected to return one row and one
553    /// column. If the query does not return any rows, returns `Ok(None)`.
554    pub async fn query_get_value<T>(
555        &self,
556        query: &str,
557        params: impl rusqlite::Params + Send,
558    ) -> Result<Option<T>>
559    where
560        T: rusqlite::types::FromSql + Send + 'static,
561    {
562        self.query_row_optional(query, params, |row| row.get::<_, T>(0))
563            .await
564    }
565
566    /// Set private configuration options.
567    ///
568    /// Setting `None` deletes the value.  On failure an error message
569    /// will already have been logged.
570    pub async fn set_raw_config(&self, key: &str, value: Option<&str>) -> Result<()> {
571        let mut lock = self.config_cache.write().await;
572        if let Some(value) = value {
573            self.execute(
574                "INSERT OR REPLACE INTO config (keyname, value) VALUES (?, ?)",
575                (key, value),
576            )
577            .await?;
578        } else {
579            self.execute("DELETE FROM config WHERE keyname=?", (key,))
580                .await?;
581        }
582        lock.insert(key.to_string(), value.map(|s| s.to_string()));
583        drop(lock);
584
585        Ok(())
586    }
587
588    /// Get configuration options from the database.
589    pub async fn get_raw_config(&self, key: &str) -> Result<Option<String>> {
590        let lock = self.config_cache.read().await;
591        let cached = lock.get(key).cloned();
592        drop(lock);
593
594        if let Some(c) = cached {
595            return Ok(c);
596        }
597
598        let mut lock = self.config_cache.write().await;
599        let value = self
600            .query_get_value("SELECT value FROM config WHERE keyname=?", (key,))
601            .await
602            .context(format!("failed to fetch raw config: {key}"))?;
603        lock.insert(key.to_string(), value.clone());
604        drop(lock);
605
606        Ok(value)
607    }
608
609    /// Removes the `key`'s value from the cache.
610    pub(crate) async fn uncache_raw_config(&self, key: &str) {
611        let mut lock = self.config_cache.write().await;
612        lock.remove(key);
613    }
614
615    /// Sets configuration for the given key to 32-bit signed integer value.
616    pub async fn set_raw_config_int(&self, key: &str, value: i32) -> Result<()> {
617        self.set_raw_config(key, Some(&format!("{value}"))).await
618    }
619
620    /// Returns 32-bit signed integer configuration value for the given key.
621    pub async fn get_raw_config_int(&self, key: &str) -> Result<Option<i32>> {
622        self.get_raw_config(key)
623            .await
624            .map(|s| s.and_then(|s| s.parse().ok()))
625    }
626
627    /// Returns 32-bit unsigned integer configuration value for the given key.
628    pub async fn get_raw_config_u32(&self, key: &str) -> Result<Option<u32>> {
629        self.get_raw_config(key)
630            .await
631            .map(|s| s.and_then(|s| s.parse().ok()))
632    }
633
634    /// Returns boolean configuration value for the given key.
635    pub async fn get_raw_config_bool(&self, key: &str) -> Result<bool> {
636        // Not the most obvious way to encode bool as string, but it is matter
637        // of backward compatibility.
638        let res = self.get_raw_config_int(key).await?;
639        Ok(res.unwrap_or_default() > 0)
640    }
641
642    /// Sets configuration for the given key to boolean value.
643    pub async fn set_raw_config_bool(&self, key: &str, value: bool) -> Result<()> {
644        let value = if value { Some("1") } else { None };
645        self.set_raw_config(key, value).await
646    }
647
648    /// Sets configuration for the given key to 64-bit signed integer value.
649    pub async fn set_raw_config_int64(&self, key: &str, value: i64) -> Result<()> {
650        self.set_raw_config(key, Some(&format!("{value}"))).await
651    }
652
653    /// Returns 64-bit signed integer configuration value for the given key.
654    pub async fn get_raw_config_int64(&self, key: &str) -> Result<Option<i64>> {
655        self.get_raw_config(key)
656            .await
657            .map(|s| s.and_then(|r| r.parse().ok()))
658    }
659
660    /// Returns configuration cache.
661    #[cfg(feature = "internals")]
662    pub fn config_cache(&self) -> &RwLock<HashMap<String, Option<String>>> {
663        &self.config_cache
664    }
665
666    /// Runs a checkpoint operation in TRUNCATE mode, so the WAL file is truncated to 0 bytes.
667    pub(crate) async fn wal_checkpoint(context: &Context) -> Result<()> {
668        let t_start = Time::now();
669        let lock = context.sql.pool.read().await;
670        let Some(pool) = lock.as_ref() else {
671            // No db connections, nothing to checkpoint.
672            return Ok(());
673        };
674
675        // Do as much work as possible without blocking anybody.
676        let query_only = true;
677        let conn = pool.get(query_only).await?;
678        tokio::task::block_in_place(|| {
679            // Execute some transaction causing the WAL file to be opened so that the
680            // `wal_checkpoint()` can proceed, otherwise it fails when called the first time,
681            // see https://sqlite.org/forum/forumpost/7512d76a05268fc8.
682            conn.query_row("PRAGMA table_list", [], |_| Ok(()))?;
683            conn.query_row("PRAGMA wal_checkpoint(PASSIVE)", [], |_| Ok(()))
684        })?;
685
686        // Kick out writers.
687        const _: () = assert!(Sql::N_DB_CONNECTIONS > 1, "Deadlock possible");
688        let _write_lock = pool.write_lock().await;
689        let t_writers_blocked = Time::now();
690        // Ensure that all readers use the most recent database snapshot (are at the end of WAL) so
691        // that `wal_checkpoint(FULL)` isn't blocked. We could use `PASSIVE` as well, but it's
692        // documented poorly, https://www.sqlite.org/pragma.html#pragma_wal_checkpoint and
693        // https://www.sqlite.org/c3ref/wal_checkpoint_v2.html don't tell how it interacts with new
694        // readers.
695        let mut read_conns = Vec::with_capacity(Self::N_DB_CONNECTIONS - 1);
696        for _ in 0..(Self::N_DB_CONNECTIONS - 1) {
697            read_conns.push(pool.get(query_only).await?);
698        }
699        read_conns.clear();
700        // Checkpoint the remaining WAL pages without blocking readers.
701        let (pages_total, pages_checkpointed) = tokio::task::block_in_place(|| {
702            conn.query_row("PRAGMA wal_checkpoint(FULL)", [], |row| {
703                let pages_total: i64 = row.get(1)?;
704                let pages_checkpointed: i64 = row.get(2)?;
705                Ok((pages_total, pages_checkpointed))
706            })
707        })?;
708        if pages_checkpointed < pages_total {
709            warn!(
710                context,
711                "Cannot checkpoint whole WAL. Pages total: {pages_total}, checkpointed: {pages_checkpointed}. Make sure there are no external connections running transactions.",
712            );
713        }
714        // Kick out readers to avoid blocking/SQLITE_BUSY.
715        for _ in 0..(Self::N_DB_CONNECTIONS - 1) {
716            read_conns.push(pool.get(query_only).await?);
717        }
718        let t_readers_blocked = Time::now();
719        tokio::task::block_in_place(|| {
720            let blocked = conn.query_row("PRAGMA wal_checkpoint(TRUNCATE)", [], |row| {
721                let blocked: i64 = row.get(0)?;
722                Ok(blocked)
723            })?;
724            ensure!(blocked == 0);
725            Ok(())
726        })?;
727        info!(
728            context,
729            "wal_checkpoint: Total time: {:?}. Writers blocked for: {:?}. Readers blocked for: {:?}.",
730            time_elapsed(&t_start),
731            time_elapsed(&t_writers_blocked),
732            time_elapsed(&t_readers_blocked),
733        );
734        Ok(())
735    }
736}
737
738/// Creates a new SQLite connection.
739///
740/// `path` is the database path.
741///
742/// `passphrase` is the SQLCipher database passphrase.
743/// Empty string if database is not encrypted.
744fn new_connection(path: &Path, passphrase: &str) -> Result<Connection> {
745    let flags = OpenFlags::SQLITE_OPEN_NO_MUTEX
746        | OpenFlags::SQLITE_OPEN_READ_WRITE
747        | OpenFlags::SQLITE_OPEN_CREATE;
748    let conn = Connection::open_with_flags(path, flags)?;
749    conn.execute_batch(
750        "PRAGMA cipher_memory_security = OFF; -- Too slow on Android
751         PRAGMA secure_delete=on;
752         PRAGMA soft_heap_limit = 8388608; -- 8 MiB limit, same as set in Android SQLiteDatabase.
753         PRAGMA foreign_keys=on;
754         ",
755    )?;
756
757    // Avoid SQLITE_IOERR_GETTEMPPATH errors on Android and maybe other systems.
758    // Downside is more RAM consumption esp. on VACUUM.
759    // Therefore, on systems known to have working default (using files), stay with that.
760    if cfg!(not(target_os = "ios")) {
761        conn.pragma_update(None, "temp_store", "memory")?;
762    }
763
764    // Fail immediately when the database is busy,
765    // except for iOS. On iOS we don't have
766    // `accounts.lock` lockfile and the database
767    // is used by two processes:
768    // main process and the notification extension.
769    // Due to a bug they both may run at the same time
770    // and try to write to the database.
771    // As a workaround, we wait up to 1 minute and retry
772    // instead of failing immediately and
773    // possibly missing a message.
774    if cfg!(target_os = "ios") {
775        conn.busy_timeout(Duration::new(60, 0))?;
776    } else {
777        conn.busy_timeout(Duration::ZERO)?;
778    }
779
780    if !passphrase.is_empty() {
781        conn.pragma_update(None, "key", passphrase)?;
782    }
783    // Try to enable auto_vacuum. This will only be
784    // applied if the database is new or after successful
785    // VACUUM, which usually happens before backup export.
786    // When auto_vacuum is INCREMENTAL, it is possible to
787    // use PRAGMA incremental_vacuum to return unused
788    // database pages to the filesystem.
789    conn.pragma_update(None, "auto_vacuum", "INCREMENTAL".to_string())?;
790
791    conn.pragma_update(None, "journal_mode", "WAL".to_string())?;
792    // Default synchronous=FULL is much slower. NORMAL is sufficient for WAL mode.
793    conn.pragma_update(None, "synchronous", "NORMAL".to_string())?;
794
795    Ok(conn)
796}
797
798// Tries to clear the freelist to free some space on the disk.
799//
800// This only works if auto_vacuum is enabled.
801async fn incremental_vacuum(context: &Context) -> Result<()> {
802    context
803        .sql
804        .call_write(move |conn| {
805            let mut stmt = conn
806                .prepare("PRAGMA incremental_vacuum")
807                .context("Failed to prepare incremental_vacuum statement")?;
808
809            // It is important to step the statement until it returns no more rows.
810            // Otherwise it will not free as many pages as it can:
811            // <https://stackoverflow.com/questions/53746807/sqlite-incremental-vacuum-removing-only-one-free-page>.
812            let mut rows = stmt
813                .query(())
814                .context("Failed to run incremental_vacuum statement")?;
815            let mut row_count = 0;
816            while let Some(_row) = rows
817                .next()
818                .context("Failed to step incremental_vacuum statement")?
819            {
820                row_count += 1;
821            }
822            info!(context, "Incremental vacuum freed {row_count} pages.");
823            Ok(())
824        })
825        .await
826}
827
828/// Cleanup the account to restore some storage and optimize the database.
829pub async fn housekeeping(context: &Context) -> Result<()> {
830    // Setting `Config::LastHousekeeping` at the beginning avoids endless loops when things do not
831    // work out for whatever reason or are interrupted by the OS.
832    if let Err(e) = context
833        .set_config_internal(Config::LastHousekeeping, Some(&time().to_string()))
834        .await
835    {
836        warn!(context, "Can't set config: {e:#}.");
837    }
838
839    http_cache_cleanup(context)
840        .await
841        .context("Failed to cleanup HTTP cache")
842        .log_err(context)
843        .ok();
844    migrations::msgs_to_key_contacts(context)
845        .await
846        .context("migrations::msgs_to_key_contacts")
847        .log_err(context)
848        .ok();
849
850    if let Err(err) = remove_unused_files(context).await {
851        warn!(
852            context,
853            "Housekeeping: cannot remove unused files: {:#}.", err
854        );
855    }
856
857    if let Err(err) = start_ephemeral_timers(context).await {
858        warn!(
859            context,
860            "Housekeeping: cannot start ephemeral timers: {:#}.", err
861        );
862    }
863
864    if let Err(err) = prune_tombstones(&context.sql).await {
865        warn!(
866            context,
867            "Housekeeping: Cannot prune message tombstones: {:#}.", err
868        );
869    }
870
871    maybe_add_mvbox_move_deprecation_message(context)
872        .await
873        .context("maybe_add_mvbox_move_deprecation_message")
874        .log_err(context)
875        .ok();
876
877    if let Err(err) = incremental_vacuum(context).await {
878        warn!(context, "Failed to run incremental vacuum: {err:#}.");
879    }
880    // Work around possible checkpoint starvations (there were cases reported when a WAL file is
881    // bigger than 200M) and also make sure we truncate the WAL periodically. Auto-checkponting does
882    // not normally truncate the WAL (unless the `journal_size_limit` pragma is set), see
883    // https://www.sqlite.org/wal.html.
884    if let Err(err) = Sql::wal_checkpoint(context).await {
885        warn!(context, "wal_checkpoint() failed: {err:#}.");
886        debug_assert!(false);
887    }
888
889    context
890        .sql
891        .execute(
892            "DELETE FROM msgs_mdns WHERE msg_id NOT IN \
893            (SELECT id FROM msgs WHERE chat_id!=?)",
894            (DC_CHAT_ID_TRASH,),
895        )
896        .await
897        .context("failed to remove old MDNs")
898        .log_err(context)
899        .ok();
900
901    context
902        .sql
903        .execute(
904            "DELETE FROM msgs_status_updates WHERE msg_id NOT IN \
905            (SELECT id FROM msgs WHERE chat_id!=?)",
906            (DC_CHAT_ID_TRASH,),
907        )
908        .await
909        .context("failed to remove old webxdc status updates")
910        .log_err(context)
911        .ok();
912
913    prune_connection_history(context)
914        .await
915        .context("Failed to prune connection history")
916        .log_err(context)
917        .ok();
918    prune_dns_cache(context)
919        .await
920        .context("Failed to prune DNS cache")
921        .log_err(context)
922        .ok();
923
924    // Delete POI locations
925    // which don't have corresponding message.
926    delete_orphaned_poi_locations(context)
927        .await
928        .context("Failed to delete orphaned POI locations")
929        .log_err(context)
930        .ok();
931
932    info!(context, "Housekeeping done.");
933    Ok(())
934}
935
936/// Adds device message about `mvbox_move` config deprecation
937/// if the user has it enabled.
938async fn maybe_add_mvbox_move_deprecation_message(context: &Context) -> Result<()> {
939    if !context.get_config_bool(Config::OnlyFetchMvbox).await?
940        && context.get_config_bool(Config::MvboxMove).await?
941    {
942        let mut msg = Message::new_text(stock_str::mvbox_move_deprecation(context).await);
943        add_device_msg(context, Some("mvbox_move_deprecation"), Some(&mut msg)).await?;
944    }
945    Ok(())
946}
947
948/// Get the value of a column `idx` of the `row` as `Vec<u8>`.
949pub fn row_get_vec(row: &Row, idx: usize) -> rusqlite::Result<Vec<u8>> {
950    row.get(idx).or_else(|err| match row.get_ref(idx)? {
951        ValueRef::Null => Ok(Vec::new()),
952        ValueRef::Text(text) => Ok(text.to_vec()),
953        ValueRef::Blob(blob) => Ok(blob.to_vec()),
954        ValueRef::Integer(_) | ValueRef::Real(_) => Err(err),
955    })
956}
957
958/// Enumerates used files in the blobdir and removes unused ones.
959pub async fn remove_unused_files(context: &Context) -> Result<()> {
960    let mut files_in_use = HashSet::new();
961    let mut unreferenced_count = 0;
962
963    info!(context, "Start housekeeping...");
964    maybe_add_from_param(
965        &context.sql,
966        &mut files_in_use,
967        "SELECT param FROM msgs  WHERE chat_id!=3   AND type!=10;",
968        Param::File,
969    )
970    .await?;
971    maybe_add_from_param(
972        &context.sql,
973        &mut files_in_use,
974        "SELECT param FROM chats;",
975        Param::ProfileImage,
976    )
977    .await?;
978    maybe_add_from_param(
979        &context.sql,
980        &mut files_in_use,
981        "SELECT param FROM contacts;",
982        Param::ProfileImage,
983    )
984    .await?;
985
986    context
987        .sql
988        .query_map(
989            "SELECT value FROM config;",
990            (),
991            |row| {
992                let row: String = row.get(0)?;
993                Ok(row)
994            },
995            |rows| {
996                for row in rows {
997                    maybe_add_file(&mut files_in_use, &row?);
998                }
999                Ok(())
1000            },
1001        )
1002        .await
1003        .context("housekeeping: failed to SELECT value FROM config")?;
1004
1005    context
1006        .sql
1007        .query_map(
1008            "SELECT blobname FROM http_cache",
1009            (),
1010            |row| {
1011                let row: String = row.get(0)?;
1012                Ok(row)
1013            },
1014            |rows| {
1015                for row in rows {
1016                    maybe_add_file(&mut files_in_use, &row?);
1017                }
1018                Ok(())
1019            },
1020        )
1021        .await
1022        .context("Failed to SELECT blobname FROM http_cache")?;
1023
1024    info!(context, "{} files in use.", files_in_use.len());
1025    /* go through directories and delete unused files */
1026    let blobdir = context.get_blobdir();
1027    for p in [&blobdir.join(BLOBS_BACKUP_NAME), blobdir] {
1028        match tokio::fs::read_dir(p).await {
1029            Ok(mut dir_handle) => {
1030                /* avoid deletion of files that are just created to build a message object */
1031                let diff = std::time::Duration::from_secs(60 * 60);
1032                let keep_files_newer_than = SystemTime::now()
1033                    .checked_sub(diff)
1034                    .unwrap_or(SystemTime::UNIX_EPOCH);
1035
1036                while let Ok(Some(entry)) = dir_handle.next_entry().await {
1037                    let name_f = entry.file_name();
1038                    let name_s = name_f.to_string_lossy();
1039
1040                    if p == blobdir
1041                        && (is_file_in_use(&files_in_use, None, &name_s)
1042                            || is_file_in_use(&files_in_use, Some(".waveform"), &name_s)
1043                            || is_file_in_use(&files_in_use, Some("-preview.jpg"), &name_s))
1044                    {
1045                        continue;
1046                    }
1047
1048                    let stats = match tokio::fs::metadata(entry.path()).await {
1049                        Err(err) => {
1050                            warn!(
1051                                context,
1052                                "Cannot get metadata for {}: {:#}.",
1053                                entry.path().display(),
1054                                err
1055                            );
1056                            continue;
1057                        }
1058                        Ok(stats) => stats,
1059                    };
1060
1061                    if stats.is_dir() {
1062                        if let Err(e) = tokio::fs::remove_dir(entry.path()).await {
1063                            // The dir could be created not by a user, but by a desktop
1064                            // environment f.e. So, no warning.
1065                            info!(
1066                                context,
1067                                "Housekeeping: Cannot rmdir {}: {:#}.",
1068                                entry.path().display(),
1069                                e
1070                            );
1071                        }
1072                        continue;
1073                    }
1074
1075                    unreferenced_count += 1;
1076                    let recently_created = stats.created().is_ok_and(|t| t > keep_files_newer_than);
1077                    let recently_modified =
1078                        stats.modified().is_ok_and(|t| t > keep_files_newer_than);
1079                    let recently_accessed =
1080                        stats.accessed().is_ok_and(|t| t > keep_files_newer_than);
1081
1082                    if p == blobdir && (recently_created || recently_modified || recently_accessed)
1083                    {
1084                        info!(
1085                            context,
1086                            "Housekeeping: Keeping new unreferenced file #{}: {:?}.",
1087                            unreferenced_count,
1088                            entry.file_name(),
1089                        );
1090                        continue;
1091                    }
1092
1093                    info!(
1094                        context,
1095                        "Housekeeping: Deleting unreferenced file #{}: {:?}.",
1096                        unreferenced_count,
1097                        entry.file_name()
1098                    );
1099                    let path = entry.path();
1100                    if let Err(err) = delete_file(context, &path).await {
1101                        error!(
1102                            context,
1103                            "Failed to delete unused file {}: {:#}.",
1104                            path.display(),
1105                            err
1106                        );
1107                    }
1108                }
1109            }
1110            Err(err) => {
1111                if !p.ends_with(BLOBS_BACKUP_NAME) {
1112                    warn!(
1113                        context,
1114                        "Housekeeping: Cannot read dir {}: {:#}.",
1115                        p.display(),
1116                        err
1117                    );
1118                }
1119            }
1120        }
1121    }
1122
1123    Ok(())
1124}
1125
1126fn is_file_in_use(files_in_use: &HashSet<String>, namespc_opt: Option<&str>, name: &str) -> bool {
1127    let name_to_check = if let Some(namespc) = namespc_opt {
1128        let Some(name) = name.strip_suffix(namespc) else {
1129            return false;
1130        };
1131        name
1132    } else {
1133        name
1134    };
1135    files_in_use.contains(name_to_check)
1136}
1137
1138fn maybe_add_file(files_in_use: &mut HashSet<String>, file: &str) {
1139    if let Some(file) = file.strip_prefix("$BLOBDIR/") {
1140        files_in_use.insert(file.to_string());
1141    }
1142}
1143
1144async fn maybe_add_from_param(
1145    sql: &Sql,
1146    files_in_use: &mut HashSet<String>,
1147    query: &str,
1148    param_id: Param,
1149) -> Result<()> {
1150    sql.query_map(
1151        query,
1152        (),
1153        |row| {
1154            let row: String = row.get(0)?;
1155            Ok(row)
1156        },
1157        |rows| {
1158            for row in rows {
1159                let param: Params = row?.parse().unwrap_or_default();
1160                if let Some(file) = param.get(param_id) {
1161                    maybe_add_file(files_in_use, file);
1162                }
1163            }
1164            Ok(())
1165        },
1166    )
1167    .await
1168    .context(format!("housekeeping: failed to add_from_param {query}"))?;
1169
1170    Ok(())
1171}
1172
1173/// Removes from the database stale locally deleted messages that also don't
1174/// have a server UID.
1175async fn prune_tombstones(sql: &Sql) -> Result<()> {
1176    // Keep tombstones for the last two days to prevent redownloading locally deleted messages.
1177    let timestamp_max = time().saturating_sub(2 * 24 * 3600);
1178    sql.execute(
1179        "DELETE FROM msgs
1180         WHERE chat_id=?
1181         AND timestamp<=?
1182         AND NOT EXISTS (
1183         SELECT * FROM imap WHERE msgs.rfc724_mid=rfc724_mid AND target!=''
1184         )",
1185        (DC_CHAT_ID_TRASH, timestamp_max),
1186    )
1187    .await?;
1188    Ok(())
1189}
1190
1191#[cfg(test)]
1192mod sql_tests;