deltachat/
sql.rs

1//! # SQLite wrapper.
2
3use std::collections::{HashMap, HashSet};
4use std::path::{Path, PathBuf};
5
6use anyhow::{Context as _, Result, bail, ensure};
7use rusqlite::{Connection, OpenFlags, Row, config::DbConfig, types::ValueRef};
8use tokio::sync::RwLock;
9
10use crate::blob::BlobObject;
11use crate::chat::add_device_msg;
12use crate::config::Config;
13use crate::constants::DC_CHAT_ID_TRASH;
14use crate::context::Context;
15use crate::debug_logging::set_debug_logging_xdc;
16use crate::ephemeral::start_ephemeral_timers;
17use crate::imex::BLOBS_BACKUP_NAME;
18use crate::location::delete_orphaned_poi_locations;
19use crate::log::{LogExt, warn};
20use crate::message::{Message, MsgId};
21use crate::net::dns::prune_dns_cache;
22use crate::net::http::http_cache_cleanup;
23use crate::net::prune_connection_history;
24use crate::param::{Param, Params};
25use crate::stock_str;
26use crate::tools::{SystemTime, Time, delete_file, time, time_elapsed};
27
28/// Extension to [`rusqlite::ToSql`] trait
29/// which also includes [`Send`] and [`Sync`].
30pub trait ToSql: rusqlite::ToSql + Send + Sync {}
31
32impl<T: rusqlite::ToSql + Send + Sync> ToSql for T {}
33
34/// Constructs a slice of trait object references `&dyn ToSql`.
35///
36/// One of the uses is passing more than 16 parameters
37/// to a query, because [`rusqlite::Params`] is only implemented
38/// for tuples of up to 16 elements.
39#[macro_export]
40macro_rules! params_slice {
41    ($($param:expr),+) => {
42        [$(&$param as &dyn $crate::sql::ToSql),+]
43    };
44}
45
46mod migrations;
47mod pool;
48
49use pool::Pool;
50
51/// A wrapper around the underlying Sqlite3 object.
52#[derive(Debug)]
53pub struct Sql {
54    /// Database file path
55    pub(crate) dbfile: PathBuf,
56
57    /// SQL connection pool.
58    pool: RwLock<Option<Pool>>,
59
60    /// None if the database is not open, true if it is open with passphrase and false if it is
61    /// open without a passphrase.
62    is_encrypted: RwLock<Option<bool>>,
63
64    /// Cache of `config` table.
65    pub(crate) config_cache: RwLock<HashMap<String, Option<String>>>,
66}
67
68impl Sql {
69    /// Creates new SQL database.
70    pub fn new(dbfile: PathBuf) -> Sql {
71        Self {
72            dbfile,
73            pool: Default::default(),
74            is_encrypted: Default::default(),
75            config_cache: Default::default(),
76        }
77    }
78
79    /// Tests SQLCipher passphrase.
80    ///
81    /// Returns true if passphrase is correct, i.e. the database is new or can be unlocked with
82    /// this passphrase, and false if the database is already encrypted with another passphrase or
83    /// corrupted.
84    ///
85    /// Fails if database is already open.
86    pub async fn check_passphrase(&self, passphrase: String) -> Result<bool> {
87        if self.is_open().await {
88            bail!("Database is already opened.");
89        }
90
91        // Hold the lock to prevent other thread from opening the database.
92        let _lock = self.pool.write().await;
93
94        // Test that the key is correct using a single connection.
95        let connection = Connection::open(&self.dbfile)?;
96        if !passphrase.is_empty() {
97            connection
98                .pragma_update(None, "key", &passphrase)
99                .context("Failed to set PRAGMA key")?;
100        }
101        let key_is_correct = connection
102            .query_row("SELECT count(*) FROM sqlite_master", [], |_row| Ok(()))
103            .is_ok();
104
105        Ok(key_is_correct)
106    }
107
108    /// Checks if there is currently a connection to the underlying Sqlite database.
109    pub async fn is_open(&self) -> bool {
110        self.pool.read().await.is_some()
111    }
112
113    /// Returns true if the database is encrypted.
114    ///
115    /// If database is not open, returns `None`.
116    pub(crate) async fn is_encrypted(&self) -> Option<bool> {
117        *self.is_encrypted.read().await
118    }
119
120    /// Closes all underlying Sqlite connections.
121    pub(crate) async fn close(&self) {
122        let _ = self.pool.write().await.take();
123        // drop closes the connection
124    }
125
126    /// Imports the database from a separate file with the given passphrase.
127    pub(crate) async fn import(&self, path: &Path, passphrase: String) -> Result<()> {
128        let path_str = path
129            .to_str()
130            .with_context(|| format!("path {path:?} is not valid unicode"))?
131            .to_string();
132
133        // Keep `config_cache` locked all the time the db is imported so that nobody can use invalid
134        // values from there. And clear it immediately so as not to forget in case of errors.
135        let mut config_cache = self.config_cache.write().await;
136        config_cache.clear();
137
138        let query_only = false;
139        self.call(query_only, move |conn| {
140            // Check that backup passphrase is correct before resetting our database.
141            conn.execute("ATTACH DATABASE ? AS backup KEY ?", (path_str, passphrase))
142                .context("failed to attach backup database")?;
143            let res = conn
144                .query_row("SELECT count(*) FROM sqlite_master", [], |_row| Ok(()))
145                .context("backup passphrase is not correct");
146
147            // Reset the database without reopening it. We don't want to reopen the database because we
148            // don't have main database passphrase at this point.
149            // See <https://sqlite.org/c3ref/c_dbconfig_enable_fkey.html> for documentation.
150            // Without resetting import may fail due to existing tables.
151            res.and_then(|_| {
152                conn.set_db_config(DbConfig::SQLITE_DBCONFIG_RESET_DATABASE, true)
153                    .context("failed to set SQLITE_DBCONFIG_RESET_DATABASE")
154            })
155            .and_then(|_| {
156                conn.execute("VACUUM", [])
157                    .context("failed to vacuum the database")
158            })
159            .and(
160                conn.set_db_config(DbConfig::SQLITE_DBCONFIG_RESET_DATABASE, false)
161                    .context("failed to unset SQLITE_DBCONFIG_RESET_DATABASE"),
162            )
163            .and_then(|_| {
164                conn.query_row("SELECT sqlcipher_export('main', 'backup')", [], |_row| {
165                    Ok(())
166                })
167                .context("failed to import from attached backup database")
168            })
169            .and(
170                conn.execute("DETACH DATABASE backup", [])
171                    .context("failed to detach backup database"),
172            )?;
173            Ok(())
174        })
175        .await
176    }
177
178    const N_DB_CONNECTIONS: usize = 3;
179
180    /// Creates a new connection pool.
181    fn new_pool(dbfile: &Path, passphrase: String) -> Result<Pool> {
182        let mut connections = Vec::with_capacity(Self::N_DB_CONNECTIONS);
183        for _ in 0..Self::N_DB_CONNECTIONS {
184            let connection = new_connection(dbfile, &passphrase)?;
185            connections.push(connection);
186        }
187
188        let pool = Pool::new(connections);
189        Ok(pool)
190    }
191
192    async fn try_open(&self, context: &Context, dbfile: &Path, passphrase: String) -> Result<()> {
193        *self.pool.write().await = Some(Self::new_pool(dbfile, passphrase.to_string())?);
194
195        if let Err(e) = self.run_migrations(context).await {
196            error!(context, "Running migrations failed: {e:#}");
197            // Emiting an error event probably doesn't work
198            // because we are in the process of opening the context,
199            // so there is no event emitter yet.
200            // So, try to report the error in other ways:
201            eprintln!("Running migrations failed: {e:#}");
202            context.set_migration_error(&format!("Updating Delta Chat failed. Please send this message to the Delta Chat developers, either at delta@merlinux.eu or at https://support.delta.chat.\n\n{e:#}"));
203            // We can't simply close the db for two reasons:
204            // a. backup export would fail
205            // b. The UI would think that the account is unconfigured (because `is_configured()` fails)
206            // and remove the account when the user presses "Back"
207        }
208
209        Ok(())
210    }
211
212    /// Updates SQL schema to the latest version.
213    pub async fn run_migrations(&self, context: &Context) -> Result<()> {
214        // (1) update low-level database structure.
215        // this should be done before updates that use high-level objects that
216        // rely themselves on the low-level structure.
217
218        // `update_icons` is not used anymore, since it's not necessary anymore to "update" icons:
219        let (_update_icons, disable_server_delete, recode_avatar) = migrations::run(context, self)
220            .await
221            .context("failed to run migrations")?;
222
223        // (2) updates that require high-level objects
224        // the structure is complete now and all objects are usable
225
226        if disable_server_delete {
227            // We now always watch all folders and delete messages there if delete_server is enabled.
228            // So, for people who have delete_server enabled, disable it and add a hint to the devicechat:
229            if context.get_config_delete_server_after().await?.is_some() {
230                let mut msg = Message::new_text(stock_str::delete_server_turned_off(context).await);
231                add_device_msg(context, None, Some(&mut msg)).await?;
232                context
233                    .set_config_internal(Config::DeleteServerAfter, Some("0"))
234                    .await?;
235            }
236        }
237
238        if recode_avatar {
239            if let Some(avatar) = context.get_config(Config::Selfavatar).await? {
240                let mut blob = BlobObject::from_path(context, Path::new(&avatar))?;
241                match blob.recode_to_avatar_size(context).await {
242                    Ok(()) => {
243                        if let Some(path) = blob.to_abs_path().to_str() {
244                            context
245                                .set_config_internal(Config::Selfavatar, Some(path))
246                                .await?;
247                        } else {
248                            warn!(context, "Setting selfavatar failed: non-UTF-8 filename");
249                        }
250                    }
251                    Err(e) => {
252                        warn!(context, "Migrations can't recode avatar, removing. {:#}", e);
253                        context
254                            .set_config_internal(Config::Selfavatar, None)
255                            .await?
256                    }
257                }
258            }
259        }
260
261        Ok(())
262    }
263
264    /// Opens the provided database and runs any necessary migrations.
265    /// If a database is already open, this will return an error.
266    pub async fn open(&self, context: &Context, passphrase: String) -> Result<()> {
267        if self.is_open().await {
268            error!(
269                context,
270                "Cannot open, database \"{:?}\" already opened.", self.dbfile,
271            );
272            bail!("SQL database is already opened.");
273        }
274
275        let passphrase_nonempty = !passphrase.is_empty();
276        self.try_open(context, &self.dbfile, passphrase).await?;
277        info!(context, "Opened database {:?}.", self.dbfile);
278        *self.is_encrypted.write().await = Some(passphrase_nonempty);
279
280        // setup debug logging if there is an entry containing its id
281        if let Some(xdc_id) = self
282            .get_raw_config_u32(Config::DebugLogging.as_ref())
283            .await?
284        {
285            set_debug_logging_xdc(context, Some(MsgId::new(xdc_id))).await?;
286        }
287        Ok(())
288    }
289
290    /// Changes the passphrase of encrypted database.
291    ///
292    /// The database must already be encrypted and the passphrase cannot be empty.
293    /// It is impossible to turn encrypted database into unencrypted
294    /// and vice versa this way, use import/export for this.
295    pub async fn change_passphrase(&self, passphrase: String) -> Result<()> {
296        let mut lock = self.pool.write().await;
297
298        let pool = lock.take().context("SQL connection pool is not open")?;
299        let query_only = false;
300        let conn = pool.get(query_only).await?;
301        if !passphrase.is_empty() {
302            conn.pragma_update(None, "rekey", passphrase.clone())
303                .context("Failed to set PRAGMA rekey")?;
304        }
305        drop(pool);
306
307        *lock = Some(Self::new_pool(&self.dbfile, passphrase.to_string())?);
308
309        Ok(())
310    }
311
312    /// Allocates a connection and calls `function` with the connection.
313    ///
314    /// If `query_only` is true, allocates read-only connection,
315    /// otherwise allocates write connection.
316    ///
317    /// Returns the result of the function.
318    async fn call<'a, F, R>(&'a self, query_only: bool, function: F) -> Result<R>
319    where
320        F: 'a + FnOnce(&mut Connection) -> Result<R> + Send,
321        R: Send + 'static,
322    {
323        let lock = self.pool.read().await;
324        let pool = lock.as_ref().context("no SQL connection")?;
325        let mut conn = pool.get(query_only).await?;
326        let res = tokio::task::block_in_place(move || function(&mut conn))?;
327        Ok(res)
328    }
329
330    /// Allocates a connection and calls given function, assuming it does write queries, with the
331    /// connection.
332    ///
333    /// Returns the result of the function.
334    pub async fn call_write<'a, F, R>(&'a self, function: F) -> Result<R>
335    where
336        F: 'a + FnOnce(&mut Connection) -> Result<R> + Send,
337        R: Send + 'static,
338    {
339        let query_only = false;
340        self.call(query_only, function).await
341    }
342
343    /// Execute `query` assuming it is a write query, returning the number of affected rows.
344    pub async fn execute(
345        &self,
346        query: &str,
347        params: impl rusqlite::Params + Send,
348    ) -> Result<usize> {
349        self.call_write(move |conn| {
350            let res = conn.execute(query, params)?;
351            Ok(res)
352        })
353        .await
354    }
355
356    /// Executes the given query, returning the last inserted row ID.
357    pub async fn insert(&self, query: &str, params: impl rusqlite::Params + Send) -> Result<i64> {
358        self.call_write(move |conn| {
359            conn.execute(query, params)?;
360            Ok(conn.last_insert_rowid())
361        })
362        .await
363    }
364
365    /// Prepares and executes the statement and maps a function over the resulting rows.
366    /// Then executes the second function over the returned iterator and returns the
367    /// result of that function.
368    pub async fn query_map<T, F, G, H>(
369        &self,
370        sql: &str,
371        params: impl rusqlite::Params + Send,
372        f: F,
373        g: G,
374    ) -> Result<H>
375    where
376        F: Send + FnMut(&rusqlite::Row) -> Result<T>,
377        G: Send + FnOnce(rusqlite::AndThenRows<F>) -> Result<H>,
378        H: Send + 'static,
379    {
380        let query_only = true;
381        self.call(query_only, move |conn| {
382            let mut stmt = conn.prepare(sql)?;
383            let res = stmt.query_and_then(params, f)?;
384            g(res)
385        })
386        .await
387    }
388
389    /// Prepares and executes the statement and maps a function over the resulting rows.
390    ///
391    /// Collects the resulting rows into a generic structure.
392    pub async fn query_map_collect<T, C, F>(
393        &self,
394        sql: &str,
395        params: impl rusqlite::Params + Send,
396        f: F,
397    ) -> Result<C>
398    where
399        T: Send + 'static,
400        C: Send + 'static + std::iter::FromIterator<T>,
401        F: Send + FnMut(&rusqlite::Row) -> Result<T>,
402    {
403        self.query_map(sql, params, f, |rows| {
404            rows.collect::<std::result::Result<C, _>>()
405        })
406        .await
407    }
408
409    /// Prepares and executes the statement and maps a function over the resulting rows.
410    ///
411    /// Collects the resulting rows into a `Vec`.
412    pub async fn query_map_vec<T, F>(
413        &self,
414        sql: &str,
415        params: impl rusqlite::Params + Send,
416        f: F,
417    ) -> Result<Vec<T>>
418    where
419        T: Send + 'static,
420        F: Send + FnMut(&rusqlite::Row) -> Result<T>,
421    {
422        self.query_map_collect(sql, params, f).await
423    }
424
425    /// Used for executing `SELECT COUNT` statements only. Returns the resulting count.
426    pub async fn count(&self, query: &str, params: impl rusqlite::Params + Send) -> Result<usize> {
427        let count: isize = self.query_row(query, params, |row| row.get(0)).await?;
428        Ok(usize::try_from(count)?)
429    }
430
431    /// Used for executing `SELECT COUNT` statements only. Returns `true`, if the count is at least
432    /// one, `false` otherwise.
433    pub async fn exists(&self, sql: &str, params: impl rusqlite::Params + Send) -> Result<bool> {
434        let count = self.count(sql, params).await?;
435        Ok(count > 0)
436    }
437
438    /// Execute a query which is expected to return one row.
439    pub async fn query_row<T, F>(
440        &self,
441        query: &str,
442        params: impl rusqlite::Params + Send,
443        f: F,
444    ) -> Result<T>
445    where
446        F: FnOnce(&rusqlite::Row) -> rusqlite::Result<T> + Send,
447        T: Send + 'static,
448    {
449        let query_only = true;
450        self.call(query_only, move |conn| {
451            let res = conn.query_row(query, params, f)?;
452            Ok(res)
453        })
454        .await
455    }
456
457    /// Execute the function inside a transaction assuming that it does writes.
458    ///
459    /// If the function returns an error, the transaction will be rolled back. If it does not return an
460    /// error, the transaction will be committed.
461    pub async fn transaction<G, H>(&self, callback: G) -> Result<H>
462    where
463        H: Send + 'static,
464        G: Send + FnOnce(&mut rusqlite::Transaction<'_>) -> Result<H>,
465    {
466        let query_only = false;
467        self.transaction_ex(query_only, callback).await
468    }
469
470    /// Execute the function inside a transaction.
471    ///
472    /// * `query_only` - Whether the function only executes read statements (queries) and can be run
473    ///   in parallel with other transactions. NB: Creating and modifying temporary tables are also
474    ///   allowed with `query_only`, temporary tables aren't visible in other connections, but you
475    ///   need to pass `PRAGMA query_only=0;` to SQLite before that:
476    ///   ```text
477    ///   pragma_update(None, "query_only", "0")
478    ///   ```
479    ///   Also temporary tables need to be dropped because the connection is returned to the pool
480    ///   then.
481    ///
482    /// If the function returns an error, the transaction will be rolled back. If it does not return
483    /// an error, the transaction will be committed.
484    pub async fn transaction_ex<G, H>(&self, query_only: bool, callback: G) -> Result<H>
485    where
486        H: Send + 'static,
487        G: Send + FnOnce(&mut rusqlite::Transaction<'_>) -> Result<H>,
488    {
489        self.call(query_only, move |conn| {
490            let mut transaction = conn.transaction()?;
491            let ret = callback(&mut transaction);
492
493            match ret {
494                Ok(ret) => {
495                    transaction.commit()?;
496                    Ok(ret)
497                }
498                Err(err) => {
499                    transaction.rollback()?;
500                    Err(err)
501                }
502            }
503        })
504        .await
505    }
506
507    /// Query the database if the requested table already exists.
508    pub async fn table_exists(&self, name: &str) -> Result<bool> {
509        let query_only = true;
510        self.call(query_only, move |conn| {
511            let mut exists = false;
512            conn.pragma(None, "table_info", name.to_string(), |_row| {
513                // will only be executed if the info was found
514                exists = true;
515                Ok(())
516            })?;
517
518            Ok(exists)
519        })
520        .await
521    }
522
523    /// Check if a column exists in a given table.
524    pub async fn col_exists(&self, table_name: &str, col_name: &str) -> Result<bool> {
525        let query_only = true;
526        self.call(query_only, move |conn| {
527            let mut exists = false;
528            // `PRAGMA table_info` returns one row per column,
529            // each row containing 0=cid, 1=name, 2=type, 3=notnull, 4=dflt_value
530            conn.pragma(None, "table_info", table_name.to_string(), |row| {
531                let curr_name: String = row.get(1)?;
532                if col_name == curr_name {
533                    exists = true;
534                }
535                Ok(())
536            })?;
537
538            Ok(exists)
539        })
540        .await
541    }
542
543    /// Execute a query which is expected to return zero or one row.
544    pub async fn query_row_optional<T, F>(
545        &self,
546        sql: &str,
547        params: impl rusqlite::Params + Send,
548        f: F,
549    ) -> Result<Option<T>>
550    where
551        F: Send + FnOnce(&rusqlite::Row) -> rusqlite::Result<T>,
552        T: Send + 'static,
553    {
554        let query_only = true;
555        self.call(query_only, move |conn| {
556            match conn.query_row(sql.as_ref(), params, f) {
557                Ok(res) => Ok(Some(res)),
558                Err(rusqlite::Error::QueryReturnedNoRows) => Ok(None),
559                Err(err) => Err(err.into()),
560            }
561        })
562        .await
563    }
564
565    /// Executes a query which is expected to return one row and one
566    /// column. If the query does not return any rows, returns `Ok(None)`.
567    pub async fn query_get_value<T>(
568        &self,
569        query: &str,
570        params: impl rusqlite::Params + Send,
571    ) -> Result<Option<T>>
572    where
573        T: rusqlite::types::FromSql + Send + 'static,
574    {
575        self.query_row_optional(query, params, |row| row.get::<_, T>(0))
576            .await
577    }
578
579    /// Set private configuration options.
580    ///
581    /// Setting `None` deletes the value.  On failure an error message
582    /// will already have been logged.
583    pub async fn set_raw_config(&self, key: &str, value: Option<&str>) -> Result<()> {
584        let mut lock = self.config_cache.write().await;
585        if let Some(value) = value {
586            self.execute(
587                "INSERT OR REPLACE INTO config (keyname, value) VALUES (?, ?)",
588                (key, value),
589            )
590            .await?;
591        } else {
592            self.execute("DELETE FROM config WHERE keyname=?", (key,))
593                .await?;
594        }
595        lock.insert(key.to_string(), value.map(|s| s.to_string()));
596        drop(lock);
597
598        Ok(())
599    }
600
601    /// Get configuration options from the database.
602    pub async fn get_raw_config(&self, key: &str) -> Result<Option<String>> {
603        let lock = self.config_cache.read().await;
604        let cached = lock.get(key).cloned();
605        drop(lock);
606
607        if let Some(c) = cached {
608            return Ok(c);
609        }
610
611        let mut lock = self.config_cache.write().await;
612        let value = self
613            .query_get_value("SELECT value FROM config WHERE keyname=?", (key,))
614            .await
615            .context(format!("failed to fetch raw config: {key}"))?;
616        lock.insert(key.to_string(), value.clone());
617        drop(lock);
618
619        Ok(value)
620    }
621
622    /// Removes the `key`'s value from the cache.
623    pub(crate) async fn uncache_raw_config(&self, key: &str) {
624        let mut lock = self.config_cache.write().await;
625        lock.remove(key);
626    }
627
628    /// Sets configuration for the given key to 32-bit signed integer value.
629    pub async fn set_raw_config_int(&self, key: &str, value: i32) -> Result<()> {
630        self.set_raw_config(key, Some(&format!("{value}"))).await
631    }
632
633    /// Returns 32-bit signed integer configuration value for the given key.
634    pub async fn get_raw_config_int(&self, key: &str) -> Result<Option<i32>> {
635        self.get_raw_config(key)
636            .await
637            .map(|s| s.and_then(|s| s.parse().ok()))
638    }
639
640    /// Returns 32-bit unsigned integer configuration value for the given key.
641    pub async fn get_raw_config_u32(&self, key: &str) -> Result<Option<u32>> {
642        self.get_raw_config(key)
643            .await
644            .map(|s| s.and_then(|s| s.parse().ok()))
645    }
646
647    /// Returns boolean configuration value for the given key.
648    pub async fn get_raw_config_bool(&self, key: &str) -> Result<bool> {
649        // Not the most obvious way to encode bool as string, but it is matter
650        // of backward compatibility.
651        let res = self.get_raw_config_int(key).await?;
652        Ok(res.unwrap_or_default() > 0)
653    }
654
655    /// Sets configuration for the given key to boolean value.
656    pub async fn set_raw_config_bool(&self, key: &str, value: bool) -> Result<()> {
657        let value = if value { Some("1") } else { None };
658        self.set_raw_config(key, value).await
659    }
660
661    /// Sets configuration for the given key to 64-bit signed integer value.
662    pub async fn set_raw_config_int64(&self, key: &str, value: i64) -> Result<()> {
663        self.set_raw_config(key, Some(&format!("{value}"))).await
664    }
665
666    /// Returns 64-bit signed integer configuration value for the given key.
667    pub async fn get_raw_config_int64(&self, key: &str) -> Result<Option<i64>> {
668        self.get_raw_config(key)
669            .await
670            .map(|s| s.and_then(|r| r.parse().ok()))
671    }
672
673    /// Returns configuration cache.
674    #[cfg(feature = "internals")]
675    pub fn config_cache(&self) -> &RwLock<HashMap<String, Option<String>>> {
676        &self.config_cache
677    }
678
679    /// Runs a checkpoint operation in TRUNCATE mode, so the WAL file is truncated to 0 bytes.
680    pub(crate) async fn wal_checkpoint(context: &Context) -> Result<()> {
681        let t_start = Time::now();
682        let lock = context.sql.pool.read().await;
683        let Some(pool) = lock.as_ref() else {
684            // No db connections, nothing to checkpoint.
685            return Ok(());
686        };
687
688        // Do as much work as possible without blocking anybody.
689        let query_only = true;
690        let conn = pool.get(query_only).await?;
691        tokio::task::block_in_place(|| {
692            // Execute some transaction causing the WAL file to be opened so that the
693            // `wal_checkpoint()` can proceed, otherwise it fails when called the first time,
694            // see https://sqlite.org/forum/forumpost/7512d76a05268fc8.
695            conn.query_row("PRAGMA table_list", [], |_| Ok(()))?;
696            conn.query_row("PRAGMA wal_checkpoint(PASSIVE)", [], |_| Ok(()))
697        })?;
698
699        // Kick out writers.
700        const _: () = assert!(Sql::N_DB_CONNECTIONS > 1, "Deadlock possible");
701        let _write_lock = pool.write_lock().await;
702        let t_writers_blocked = Time::now();
703        // Ensure that all readers use the most recent database snapshot (are at the end of WAL) so
704        // that `wal_checkpoint(FULL)` isn't blocked. We could use `PASSIVE` as well, but it's
705        // documented poorly, https://www.sqlite.org/pragma.html#pragma_wal_checkpoint and
706        // https://www.sqlite.org/c3ref/wal_checkpoint_v2.html don't tell how it interacts with new
707        // readers.
708        let mut read_conns = Vec::with_capacity(Self::N_DB_CONNECTIONS - 1);
709        for _ in 0..(Self::N_DB_CONNECTIONS - 1) {
710            read_conns.push(pool.get(query_only).await?);
711        }
712        read_conns.clear();
713        // Checkpoint the remaining WAL pages without blocking readers.
714        let (pages_total, pages_checkpointed) = tokio::task::block_in_place(|| {
715            conn.query_row("PRAGMA wal_checkpoint(FULL)", [], |row| {
716                let pages_total: i64 = row.get(1)?;
717                let pages_checkpointed: i64 = row.get(2)?;
718                Ok((pages_total, pages_checkpointed))
719            })
720        })?;
721        if pages_checkpointed < pages_total {
722            warn!(
723                context,
724                "Cannot checkpoint whole WAL. Pages total: {pages_total}, checkpointed: {pages_checkpointed}. Make sure there are no external connections running transactions.",
725            );
726        }
727        // Kick out readers to avoid blocking/SQLITE_BUSY.
728        for _ in 0..(Self::N_DB_CONNECTIONS - 1) {
729            read_conns.push(pool.get(query_only).await?);
730        }
731        let t_readers_blocked = Time::now();
732        tokio::task::block_in_place(|| {
733            let blocked = conn.query_row("PRAGMA wal_checkpoint(TRUNCATE)", [], |row| {
734                let blocked: i64 = row.get(0)?;
735                Ok(blocked)
736            })?;
737            ensure!(blocked == 0);
738            Ok(())
739        })?;
740        info!(
741            context,
742            "wal_checkpoint: Total time: {:?}. Writers blocked for: {:?}. Readers blocked for: {:?}.",
743            time_elapsed(&t_start),
744            time_elapsed(&t_writers_blocked),
745            time_elapsed(&t_readers_blocked),
746        );
747        Ok(())
748    }
749}
750
751/// Creates a new SQLite connection.
752///
753/// `path` is the database path.
754///
755/// `passphrase` is the SQLCipher database passphrase.
756/// Empty string if database is not encrypted.
757fn new_connection(path: &Path, passphrase: &str) -> Result<Connection> {
758    let flags = OpenFlags::SQLITE_OPEN_NO_MUTEX
759        | OpenFlags::SQLITE_OPEN_READ_WRITE
760        | OpenFlags::SQLITE_OPEN_CREATE;
761    let conn = Connection::open_with_flags(path, flags)?;
762    conn.execute_batch(
763        "PRAGMA cipher_memory_security = OFF; -- Too slow on Android
764         PRAGMA secure_delete=on;
765         PRAGMA busy_timeout = 0; -- fail immediately
766         PRAGMA soft_heap_limit = 8388608; -- 8 MiB limit, same as set in Android SQLiteDatabase.
767         PRAGMA foreign_keys=on;
768         ",
769    )?;
770
771    // Avoid SQLITE_IOERR_GETTEMPPATH errors on Android and maybe other systems.
772    // Downside is more RAM consumption esp. on VACUUM.
773    // Therefore, on systems known to have working default (using files), stay with that.
774    if cfg!(not(target_os = "ios")) {
775        conn.pragma_update(None, "temp_store", "memory")?;
776    }
777
778    if !passphrase.is_empty() {
779        conn.pragma_update(None, "key", passphrase)?;
780    }
781    // Try to enable auto_vacuum. This will only be
782    // applied if the database is new or after successful
783    // VACUUM, which usually happens before backup export.
784    // When auto_vacuum is INCREMENTAL, it is possible to
785    // use PRAGMA incremental_vacuum to return unused
786    // database pages to the filesystem.
787    conn.pragma_update(None, "auto_vacuum", "INCREMENTAL".to_string())?;
788
789    conn.pragma_update(None, "journal_mode", "WAL".to_string())?;
790    // Default synchronous=FULL is much slower. NORMAL is sufficient for WAL mode.
791    conn.pragma_update(None, "synchronous", "NORMAL".to_string())?;
792
793    Ok(conn)
794}
795
796// Tries to clear the freelist to free some space on the disk.
797//
798// This only works if auto_vacuum is enabled.
799async fn incremental_vacuum(context: &Context) -> Result<()> {
800    context
801        .sql
802        .call_write(move |conn| {
803            let mut stmt = conn
804                .prepare("PRAGMA incremental_vacuum")
805                .context("Failed to prepare incremental_vacuum statement")?;
806
807            // It is important to step the statement until it returns no more rows.
808            // Otherwise it will not free as many pages as it can:
809            // <https://stackoverflow.com/questions/53746807/sqlite-incremental-vacuum-removing-only-one-free-page>.
810            let mut rows = stmt
811                .query(())
812                .context("Failed to run incremental_vacuum statement")?;
813            let mut row_count = 0;
814            while let Some(_row) = rows
815                .next()
816                .context("Failed to step incremental_vacuum statement")?
817            {
818                row_count += 1;
819            }
820            info!(context, "Incremental vacuum freed {row_count} pages.");
821            Ok(())
822        })
823        .await
824}
825
826/// Cleanup the account to restore some storage and optimize the database.
827pub async fn housekeeping(context: &Context) -> Result<()> {
828    // Setting `Config::LastHousekeeping` at the beginning avoids endless loops when things do not
829    // work out for whatever reason or are interrupted by the OS.
830    if let Err(e) = context
831        .set_config_internal(Config::LastHousekeeping, Some(&time().to_string()))
832        .await
833    {
834        warn!(context, "Can't set config: {e:#}.");
835    }
836
837    http_cache_cleanup(context)
838        .await
839        .context("Failed to cleanup HTTP cache")
840        .log_err(context)
841        .ok();
842    migrations::msgs_to_key_contacts(context)
843        .await
844        .context("migrations::msgs_to_key_contacts")
845        .log_err(context)
846        .ok();
847
848    if let Err(err) = remove_unused_files(context).await {
849        warn!(
850            context,
851            "Housekeeping: cannot remove unused files: {:#}.", err
852        );
853    }
854
855    if let Err(err) = start_ephemeral_timers(context).await {
856        warn!(
857            context,
858            "Housekeeping: cannot start ephemeral timers: {:#}.", err
859        );
860    }
861
862    if let Err(err) = prune_tombstones(&context.sql).await {
863        warn!(
864            context,
865            "Housekeeping: Cannot prune message tombstones: {:#}.", err
866        );
867    }
868
869    if let Err(err) = incremental_vacuum(context).await {
870        warn!(context, "Failed to run incremental vacuum: {err:#}.");
871    }
872    // Work around possible checkpoint starvations (there were cases reported when a WAL file is
873    // bigger than 200M) and also make sure we truncate the WAL periodically. Auto-checkponting does
874    // not normally truncate the WAL (unless the `journal_size_limit` pragma is set), see
875    // https://www.sqlite.org/wal.html.
876    if let Err(err) = Sql::wal_checkpoint(context).await {
877        warn!(context, "wal_checkpoint() failed: {err:#}.");
878        debug_assert!(false);
879    }
880
881    context
882        .sql
883        .execute(
884            "DELETE FROM msgs_mdns WHERE msg_id NOT IN \
885            (SELECT id FROM msgs WHERE chat_id!=?)",
886            (DC_CHAT_ID_TRASH,),
887        )
888        .await
889        .context("failed to remove old MDNs")
890        .log_err(context)
891        .ok();
892
893    context
894        .sql
895        .execute(
896            "DELETE FROM msgs_status_updates WHERE msg_id NOT IN \
897            (SELECT id FROM msgs WHERE chat_id!=?)",
898            (DC_CHAT_ID_TRASH,),
899        )
900        .await
901        .context("failed to remove old webxdc status updates")
902        .log_err(context)
903        .ok();
904
905    prune_connection_history(context)
906        .await
907        .context("Failed to prune connection history")
908        .log_err(context)
909        .ok();
910    prune_dns_cache(context)
911        .await
912        .context("Failed to prune DNS cache")
913        .log_err(context)
914        .ok();
915
916    // Delete POI locations
917    // which don't have corresponding message.
918    delete_orphaned_poi_locations(context)
919        .await
920        .context("Failed to delete orphaned POI locations")
921        .log_err(context)
922        .ok();
923
924    info!(context, "Housekeeping done.");
925    Ok(())
926}
927
928/// Get the value of a column `idx` of the `row` as `Vec<u8>`.
929pub fn row_get_vec(row: &Row, idx: usize) -> rusqlite::Result<Vec<u8>> {
930    row.get(idx).or_else(|err| match row.get_ref(idx)? {
931        ValueRef::Null => Ok(Vec::new()),
932        ValueRef::Text(text) => Ok(text.to_vec()),
933        ValueRef::Blob(blob) => Ok(blob.to_vec()),
934        ValueRef::Integer(_) | ValueRef::Real(_) => Err(err),
935    })
936}
937
938/// Enumerates used files in the blobdir and removes unused ones.
939pub async fn remove_unused_files(context: &Context) -> Result<()> {
940    let mut files_in_use = HashSet::new();
941    let mut unreferenced_count = 0;
942
943    info!(context, "Start housekeeping...");
944    maybe_add_from_param(
945        &context.sql,
946        &mut files_in_use,
947        "SELECT param FROM msgs  WHERE chat_id!=3   AND type!=10;",
948        Param::File,
949    )
950    .await?;
951    maybe_add_from_param(
952        &context.sql,
953        &mut files_in_use,
954        "SELECT param FROM chats;",
955        Param::ProfileImage,
956    )
957    .await?;
958    maybe_add_from_param(
959        &context.sql,
960        &mut files_in_use,
961        "SELECT param FROM contacts;",
962        Param::ProfileImage,
963    )
964    .await?;
965
966    context
967        .sql
968        .query_map(
969            "SELECT value FROM config;",
970            (),
971            |row| {
972                let row: String = row.get(0)?;
973                Ok(row)
974            },
975            |rows| {
976                for row in rows {
977                    maybe_add_file(&mut files_in_use, &row?);
978                }
979                Ok(())
980            },
981        )
982        .await
983        .context("housekeeping: failed to SELECT value FROM config")?;
984
985    context
986        .sql
987        .query_map(
988            "SELECT blobname FROM http_cache",
989            (),
990            |row| {
991                let row: String = row.get(0)?;
992                Ok(row)
993            },
994            |rows| {
995                for row in rows {
996                    maybe_add_file(&mut files_in_use, &row?);
997                }
998                Ok(())
999            },
1000        )
1001        .await
1002        .context("Failed to SELECT blobname FROM http_cache")?;
1003
1004    info!(context, "{} files in use.", files_in_use.len());
1005    /* go through directories and delete unused files */
1006    let blobdir = context.get_blobdir();
1007    for p in [&blobdir.join(BLOBS_BACKUP_NAME), blobdir] {
1008        match tokio::fs::read_dir(p).await {
1009            Ok(mut dir_handle) => {
1010                /* avoid deletion of files that are just created to build a message object */
1011                let diff = std::time::Duration::from_secs(60 * 60);
1012                let keep_files_newer_than = SystemTime::now()
1013                    .checked_sub(diff)
1014                    .unwrap_or(SystemTime::UNIX_EPOCH);
1015
1016                while let Ok(Some(entry)) = dir_handle.next_entry().await {
1017                    let name_f = entry.file_name();
1018                    let name_s = name_f.to_string_lossy();
1019
1020                    if p == blobdir
1021                        && (is_file_in_use(&files_in_use, None, &name_s)
1022                            || is_file_in_use(&files_in_use, Some(".waveform"), &name_s)
1023                            || is_file_in_use(&files_in_use, Some("-preview.jpg"), &name_s))
1024                    {
1025                        continue;
1026                    }
1027
1028                    let stats = match tokio::fs::metadata(entry.path()).await {
1029                        Err(err) => {
1030                            warn!(
1031                                context,
1032                                "Cannot get metadata for {}: {:#}.",
1033                                entry.path().display(),
1034                                err
1035                            );
1036                            continue;
1037                        }
1038                        Ok(stats) => stats,
1039                    };
1040
1041                    if stats.is_dir() {
1042                        if let Err(e) = tokio::fs::remove_dir(entry.path()).await {
1043                            // The dir could be created not by a user, but by a desktop
1044                            // environment f.e. So, no warning.
1045                            info!(
1046                                context,
1047                                "Housekeeping: Cannot rmdir {}: {:#}.",
1048                                entry.path().display(),
1049                                e
1050                            );
1051                        }
1052                        continue;
1053                    }
1054
1055                    unreferenced_count += 1;
1056                    let recently_created = stats.created().is_ok_and(|t| t > keep_files_newer_than);
1057                    let recently_modified =
1058                        stats.modified().is_ok_and(|t| t > keep_files_newer_than);
1059                    let recently_accessed =
1060                        stats.accessed().is_ok_and(|t| t > keep_files_newer_than);
1061
1062                    if p == blobdir && (recently_created || recently_modified || recently_accessed)
1063                    {
1064                        info!(
1065                            context,
1066                            "Housekeeping: Keeping new unreferenced file #{}: {:?}.",
1067                            unreferenced_count,
1068                            entry.file_name(),
1069                        );
1070                        continue;
1071                    }
1072
1073                    info!(
1074                        context,
1075                        "Housekeeping: Deleting unreferenced file #{}: {:?}.",
1076                        unreferenced_count,
1077                        entry.file_name()
1078                    );
1079                    let path = entry.path();
1080                    if let Err(err) = delete_file(context, &path).await {
1081                        error!(
1082                            context,
1083                            "Failed to delete unused file {}: {:#}.",
1084                            path.display(),
1085                            err
1086                        );
1087                    }
1088                }
1089            }
1090            Err(err) => {
1091                if !p.ends_with(BLOBS_BACKUP_NAME) {
1092                    warn!(
1093                        context,
1094                        "Housekeeping: Cannot read dir {}: {:#}.",
1095                        p.display(),
1096                        err
1097                    );
1098                }
1099            }
1100        }
1101    }
1102
1103    Ok(())
1104}
1105
1106fn is_file_in_use(files_in_use: &HashSet<String>, namespc_opt: Option<&str>, name: &str) -> bool {
1107    let name_to_check = if let Some(namespc) = namespc_opt {
1108        let Some(name) = name.strip_suffix(namespc) else {
1109            return false;
1110        };
1111        name
1112    } else {
1113        name
1114    };
1115    files_in_use.contains(name_to_check)
1116}
1117
1118fn maybe_add_file(files_in_use: &mut HashSet<String>, file: &str) {
1119    if let Some(file) = file.strip_prefix("$BLOBDIR/") {
1120        files_in_use.insert(file.to_string());
1121    }
1122}
1123
1124async fn maybe_add_from_param(
1125    sql: &Sql,
1126    files_in_use: &mut HashSet<String>,
1127    query: &str,
1128    param_id: Param,
1129) -> Result<()> {
1130    sql.query_map(
1131        query,
1132        (),
1133        |row| {
1134            let row: String = row.get(0)?;
1135            Ok(row)
1136        },
1137        |rows| {
1138            for row in rows {
1139                let param: Params = row?.parse().unwrap_or_default();
1140                if let Some(file) = param.get(param_id) {
1141                    maybe_add_file(files_in_use, file);
1142                }
1143            }
1144            Ok(())
1145        },
1146    )
1147    .await
1148    .context(format!("housekeeping: failed to add_from_param {query}"))?;
1149
1150    Ok(())
1151}
1152
1153/// Removes from the database stale locally deleted messages that also don't
1154/// have a server UID.
1155async fn prune_tombstones(sql: &Sql) -> Result<()> {
1156    // Keep tombstones for the last two days to prevent redownloading locally deleted messages.
1157    let timestamp_max = time().saturating_sub(2 * 24 * 3600);
1158    sql.execute(
1159        "DELETE FROM msgs
1160         WHERE chat_id=?
1161         AND timestamp<=?
1162         AND NOT EXISTS (
1163         SELECT * FROM imap WHERE msgs.rfc724_mid=rfc724_mid AND target!=''
1164         )",
1165        (DC_CHAT_ID_TRASH, timestamp_max),
1166    )
1167    .await?;
1168    Ok(())
1169}
1170
1171#[cfg(test)]
1172mod sql_tests;