deltachat/sql/
pool.rs

1//! # SQLite connection pool.
2//!
//! The connection pool holds a number of SQLite connections and allows allocating them.
//! When an allocated connection is dropped, the underlying connection is returned to the pool.
5//!
6//! The pool is organized as a stack. It always allocates the most recently used connection.
7//! Each SQLite connection has its own page cache, so allocating recently used connections
8//! improves the performance compared to, for example, organizing the pool as a queue
9//! and returning the least recently used connection each time.
10//!
11//! Pool returns at most one write connection (with `PRAGMA query_only=0`).
12//! This ensures that there never are multiple write transactions at once.
13//!
//! We do the locking ourselves instead of relying on SQLite for these reasons:
15//!
16//! - SQLite's locking mechanism is non-async, blocking a thread
17//! - SQLite's locking mechanism just sleeps in a loop, which is really inefficient
18//!
19//! ---
20//!
21//! More considerations on alternatives to the current approach:
22//!
23//! We use [DEFERRED](https://www.sqlite.org/lang_transaction.html#deferred_immediate_and_exclusive_transactions) transactions.
24//!
25//! In order to never get concurrency issues, we could make all transactions IMMEDIATE,
26//! but this would mean that there can never be two simultaneous transactions.
27//!
28//! Read transactions can simply be made DEFERRED to run in parallel w/o any drawbacks.
29//!
30//! DEFERRED write transactions without doing the locking ourselves would have these drawbacks:
31//!
32//! 1. As mentioned above, SQLite's locking mechanism is non-async and sleeps in a loop.
//! 2. If there are other write transactions, the db connection stays blocked until the
//!    transaction is upgraded. A reader arriving in the meantime has to take the next,
//!    less recently used connection with a worse per-connection page cache
//!    (SQLite allows one write and any number of reads in parallel).
36//! 3. If a transaction is blocked for more than `busy_timeout`, it fails with SQLITE_BUSY.
37//! 4. If upon a successful upgrade to a write transaction the db has been modified,
38//!    the transaction has to be rolled back and retried, which means extra work in terms of
39//!    CPU/battery.
40//!
41//! The only pro of making write transactions DEFERRED w/o the external locking would be some
42//! parallelism between them.
43//!
44//! Another option would be to make write transactions IMMEDIATE, also
45//! w/o the external locking. But then cons 1. - 3. above would still be valid.
46
47use std::ops::{Deref, DerefMut};
48use std::sync::{Arc, Weak};
49
50use anyhow::{Context, Result};
51use rusqlite::Connection;
52use tokio::sync::{Mutex, OwnedMutexGuard, OwnedSemaphorePermit, Semaphore};
53
54mod wal_checkpoint;
55pub(crate) use wal_checkpoint::WalCheckpointStats;
56
/// Inner connection pool.
///
/// This is the shared state behind [`Pool`]: the idle connections
/// together with the synchronization primitives that decide
/// who may take a connection and who may write.
#[derive(Debug)]
struct InnerPool {
    /// Available connections.
    ///
    /// Used as a stack: connections are pushed when they are put
    /// into the pool and popped when they are handed out,
    /// so the most recently returned connection is reused first.
    connections: parking_lot::Mutex<Vec<Connection>>,

    /// Counts the number of available connections.
    ///
    /// It starts with one permit per connection. A permit is acquired
    /// before a connection is popped, so callers wait asynchronously
    /// while all connections are in use.
    semaphore: Arc<Semaphore>,

    /// Write mutex.
    ///
    /// This mutex ensures there is at most
    /// one write connection with `query_only=0`.
    ///
    /// This mutex is locked when write connection
    /// is outside the pool.
    pub(crate) write_mutex: Arc<Mutex<()>>,

    /// WAL checkpointing mutex.
    ///
    /// This mutex ensures that no more than one thread
    /// runs WAL checkpointing at the same time.
    ///
    /// Normal procedures acquire either one read connection
    /// or one write connection with a write mutex,
    /// and return the resources without trying to acquire
    /// more connections or trying to acquire write mutex
    /// without returning the read connection first.
    /// WAL checkpointing is special, it tries to acquire all
    /// connections and the write mutex,
    /// so two threads doing this at the same time
    /// may result in a deadlock with one thread
    /// waiting for a write lock and the other thread
    /// waiting for a connection.
    wal_checkpoint_mutex: Mutex<()>,
}
93
94impl InnerPool {
95    /// Puts a connection into the pool.
96    ///
97    /// The connection could be new or returned back.
98    fn put(&self, connection: Connection) {
99        let mut connections = self.connections.lock();
100        connections.push(connection);
101        drop(connections);
102    }
103
104    /// Retrieves a connection from the pool.
105    ///
106    /// Sets `query_only` pragma to the provided value
107    /// to prevent accidental misuse of connection
108    /// for writing when reading is intended.
109    /// Only pass `query_only=false` if you want
110    /// to use the connection for writing.
111    pub async fn get(self: Arc<Self>, query_only: bool) -> Result<PooledConnection> {
112        if query_only {
113            let permit = self.semaphore.clone().acquire_owned().await?;
114            let conn = {
115                let mut connections = self.connections.lock();
116                connections
117                    .pop()
118                    .context("Got a permit when there are no connections in the pool")?
119            };
120            let conn = PooledConnection {
121                pool: Arc::downgrade(&self),
122                conn: Some(conn),
123                _permit: permit,
124                _write_mutex_guard: None,
125            };
126            conn.pragma_update(None, "query_only", "1")?;
127            Ok(conn)
128        } else {
129            // We get write guard first to avoid taking a permit
130            // and not using it, blocking a reader from getting a connection
131            // while being ourselves blocked by another wrtier.
132            let write_mutex_guard = Arc::clone(&self.write_mutex).lock_owned().await;
133
134            // We may still have to wait for a connection
135            // to be returned by some reader.
136            let permit = self.semaphore.clone().acquire_owned().await?;
137            let conn = {
138                let mut connections = self.connections.lock();
139                connections.pop().context(
140                    "Got a permit and write lock when there are no connections in the pool",
141                )?
142            };
143            let conn = PooledConnection {
144                pool: Arc::downgrade(&self),
145                conn: Some(conn),
146                _permit: permit,
147                _write_mutex_guard: Some(write_mutex_guard),
148            };
149            conn.pragma_update(None, "query_only", "0")?;
150            Ok(conn)
151        }
152    }
153}
154
/// Pooled connection.
///
/// Dereferences to the underlying [`Connection`]. When dropped,
/// the connection is put back into the pool if the pool still exists.
///
/// The order of the fields matters: after `Drop::drop` has run,
/// the fields are dropped in declaration order, so the semaphore permit
/// and the write mutex guard are released only after the connection
/// has been put back into the pool.
pub struct PooledConnection {
    /// Weak reference to the pool used to return the connection back.
    ///
    /// Being weak, it does not keep the pool alive on its own.
    pool: Weak<InnerPool>,

    /// Only `None` right after moving the connection back to the pool.
    conn: Option<Connection>,

    /// Semaphore permit, dropped after returning the connection to the pool.
    _permit: OwnedSemaphorePermit,

    /// Write mutex guard.
    ///
    /// `None` for read-only connections with `PRAGMA query_only=1`.
    /// `Some` for the write connection; the write mutex stays locked
    /// until this guard is dropped together with the pooled connection.
    _write_mutex_guard: Option<OwnedMutexGuard<()>>,
}
171
172impl Drop for PooledConnection {
173    fn drop(&mut self) {
174        // Put the connection back unless the pool is already dropped.
175        if let Some(pool) = self.pool.upgrade()
176            && let Some(conn) = self.conn.take()
177        {
178            pool.put(conn);
179        }
180    }
181}
182
183impl Deref for PooledConnection {
184    type Target = Connection;
185
186    fn deref(&self) -> &Connection {
187        self.conn.as_ref().unwrap()
188    }
189}
190
191impl DerefMut for PooledConnection {
192    fn deref_mut(&mut self) -> &mut Connection {
193        self.conn.as_mut().unwrap()
194    }
195}
196
/// Connection pool.
///
/// This is a handle to the shared [`InnerPool`]. Cloning it copies
/// only the `Arc`, so all clones operate on the same set of connections.
#[derive(Clone, Debug)]
pub struct Pool {
    /// Reference to the actual connection pool.
    inner: Arc<InnerPool>,
}
203
204impl Pool {
205    /// Creates a new connection pool.
206    pub fn new(connections: Vec<Connection>) -> Self {
207        let semaphore = Arc::new(Semaphore::new(connections.len()));
208        let inner = Arc::new(InnerPool {
209            connections: parking_lot::Mutex::new(connections),
210            semaphore,
211            write_mutex: Default::default(),
212            wal_checkpoint_mutex: Default::default(),
213        });
214        Pool { inner }
215    }
216
217    pub async fn get(&self, query_only: bool) -> Result<PooledConnection> {
218        Arc::clone(&self.inner).get(query_only).await
219    }
220
221    /// Truncates the WAL file.
222    pub(crate) async fn wal_checkpoint(&self) -> Result<WalCheckpointStats> {
223        wal_checkpoint::wal_checkpoint(self).await
224    }
225}