// deltachat/sql/pool/wal_checkpoint.rs

//! # WAL checkpointing for SQLite connection pool.

use anyhow::{Result, ensure};
use std::sync::Arc;
use std::time::Duration;

use crate::sql::Sql;
use crate::tools::{Time, time_elapsed};

use super::Pool;

/// Information about WAL checkpointing call for logging.
///
/// Produced by `wal_checkpoint`: three wall-clock durations describing how long
/// the call took and how long it kept other pool users out, plus the page counters
/// reported by SQLite for the `FULL` checkpoint step.
#[derive(Debug)]
pub(crate) struct WalCheckpointStats {
    /// Duration of the whole WAL checkpointing.
    ///
    /// Measured from the very start of the call until the statistics are assembled.
    pub total_duration: Duration,

    /// Duration for which WAL checkpointing blocked the writers.
    ///
    /// Measured from the moment the pool's write mutex has been acquired.
    pub writers_blocked_duration: Duration,

    /// Duration for which WAL checkpointing blocked the readers.
    ///
    /// Measured from the moment the remaining pool connections have been taken
    /// for the final `TRUNCATE` checkpoint.
    pub readers_blocked_duration: Duration,

    /// Number of pages in WAL before truncating.
    pub pages_total: i64,

    /// Number of checkpointed WAL pages.
    ///
    /// It should be the same as `pages_total`
    /// unless there are external connections to the database
    /// that are not in the pool.
    pub pages_checkpointed: i64,
}

35/// Runs a checkpoint operation in TRUNCATE mode, so the WAL file is truncated to 0 bytes.
36pub(super) async fn wal_checkpoint(pool: &Pool) -> Result<WalCheckpointStats> {
37    let t_start = Time::now();
38
39    // Do as much work as possible without blocking anybody.
40    let query_only = true;
41    let conn = pool.get(query_only).await?;
42    tokio::task::block_in_place(|| {
43        // Execute some transaction causing the WAL file to be opened so that the
44        // `wal_checkpoint()` can proceed, otherwise it fails when called the first time,
45        // see https://sqlite.org/forum/forumpost/7512d76a05268fc8.
46        conn.query_row("PRAGMA table_list", [], |_| Ok(()))?;
47        conn.query_row("PRAGMA wal_checkpoint(PASSIVE)", [], |_| Ok(()))
48    })?;
49
50    // Kick out writers. `write_mutex` should be locked before taking an `InnerPool.semaphore`
51    // permit to avoid ABBA deadlocks, so drop `conn` which holds a semaphore permit.
52    drop(conn);
53    let _write_lock = Arc::clone(&pool.inner.write_mutex).lock_owned().await;
54    let t_writers_blocked = Time::now();
55    let conn = pool.get(query_only).await?;
56    // Ensure that all readers use the most recent database snapshot (are at the end of WAL) so
57    // that `wal_checkpoint(FULL)` isn't blocked. We could use `PASSIVE` as well, but it's
58    // documented poorly, https://www.sqlite.org/pragma.html#pragma_wal_checkpoint and
59    // https://www.sqlite.org/c3ref/wal_checkpoint_v2.html don't tell how it interacts with new
60    // readers.
61    let mut read_conns = Vec::with_capacity(Sql::N_DB_CONNECTIONS - 1);
62    for _ in 0..(Sql::N_DB_CONNECTIONS - 1) {
63        read_conns.push(pool.get(query_only).await?);
64    }
65    read_conns.clear();
66    // Checkpoint the remaining WAL pages without blocking readers.
67    let (pages_total, pages_checkpointed) = tokio::task::block_in_place(|| {
68        conn.query_row("PRAGMA table_list", [], |_| Ok(()))?;
69        conn.query_row("PRAGMA wal_checkpoint(FULL)", [], |row| {
70            let pages_total: i64 = row.get(1)?;
71            let pages_checkpointed: i64 = row.get(2)?;
72            Ok((pages_total, pages_checkpointed))
73        })
74    })?;
75    // Kick out readers to avoid blocking/SQLITE_BUSY.
76    for _ in 0..(Sql::N_DB_CONNECTIONS - 1) {
77        read_conns.push(pool.get(query_only).await?);
78    }
79    let t_readers_blocked = Time::now();
80    tokio::task::block_in_place(|| {
81        let blocked = conn.query_row("PRAGMA wal_checkpoint(TRUNCATE)", [], |row| {
82            let blocked: i64 = row.get(0)?;
83            Ok(blocked)
84        })?;
85        ensure!(blocked == 0);
86        Ok(())
87    })?;
88    Ok(WalCheckpointStats {
89        total_duration: time_elapsed(&t_start),
90        writers_blocked_duration: time_elapsed(&t_writers_blocked),
91        readers_blocked_duration: time_elapsed(&t_readers_blocked),
92        pages_total,
93        pages_checkpointed,
94    })
95}