deltachat/sql/pool.rs
1//! # SQLite connection pool.
2//!
3//! The connection pool holds a number of SQLite connections and allows to allocate them.
4//! When allocated connection is dropped, underlying connection is returned back to the pool.
5//!
6//! The pool is organized as a stack. It always allocates the most recently used connection.
7//! Each SQLite connection has its own page cache, so allocating recently used connections
8//! improves the performance compared to, for example, organizing the pool as a queue
9//! and returning the least recently used connection each time.
10//!
11//! Pool returns at most one write connection (with `PRAGMA query_only=0`).
12//! This ensures that there never are multiple write transactions at once.
13//!
14//! Doing the locking ourselves instead of relying on SQLite has these reasons:
15//!
16//! - SQLite's locking mechanism is non-async, blocking a thread
17//! - SQLite's locking mechanism just sleeps in a loop, which is really inefficient
18//!
19//! ---
20//!
21//! More considerations on alternatives to the current approach:
22//!
23//! We use [DEFERRED](https://www.sqlite.org/lang_transaction.html#deferred_immediate_and_exclusive_transactions) transactions.
24//!
25//! In order to never get concurrency issues, we could make all transactions IMMEDIATE,
26//! but this would mean that there can never be two simultaneous transactions.
27//!
28//! Read transactions can simply be made DEFERRED to run in parallel w/o any drawbacks.
29//!
30//! DEFERRED write transactions without doing the locking ourselves would have these drawbacks:
31//!
32//! 1. As mentioned above, SQLite's locking mechanism is non-async and sleeps in a loop.
33//! 2. If there are other write transactions, we block the db connection until
34//! upgraded. If some reader comes then, it has to get the next, less used connection with a
35//! worse per-connection page cache (SQLite allows one write and any number of reads in parallel).
36//! 3. If a transaction is blocked for more than `busy_timeout`, it fails with SQLITE_BUSY.
37//! 4. If upon a successful upgrade to a write transaction the db has been modified,
38//! the transaction has to be rolled back and retried, which means extra work in terms of
39//! CPU/battery.
40//!
41//! The only pro of making write transactions DEFERRED w/o the external locking would be some
42//! parallelism between them.
43//!
44//! Another option would be to make write transactions IMMEDIATE, also
45//! w/o the external locking. But then cons 1. - 3. above would still be valid.
46
47use std::ops::{Deref, DerefMut};
48use std::sync::{Arc, Weak};
49
50use anyhow::{Context, Result};
51use rusqlite::Connection;
52use tokio::sync::{Mutex, OwnedMutexGuard, OwnedSemaphorePermit, Semaphore};
53
54/// Inner connection pool.
55#[derive(Debug)]
56struct InnerPool {
57 /// Available connections.
58 connections: parking_lot::Mutex<Vec<Connection>>,
59
60 /// Counts the number of available connections.
61 semaphore: Arc<Semaphore>,
62
63 /// Write mutex.
64 ///
65 /// This mutex ensures there is at most
66 /// one write connection with `query_only=0`.
67 ///
68 /// This mutex is locked when write connection
69 /// is outside the pool.
70 write_mutex: Arc<Mutex<()>>,
71}
72
73impl InnerPool {
74 /// Puts a connection into the pool.
75 ///
76 /// The connection could be new or returned back.
77 fn put(&self, connection: Connection) {
78 let mut connections = self.connections.lock();
79 connections.push(connection);
80 drop(connections);
81 }
82
83 /// Retrieves a connection from the pool.
84 ///
85 /// Sets `query_only` pragma to the provided value
86 /// to prevent accidental misuse of connection
87 /// for writing when reading is intended.
88 /// Only pass `query_only=false` if you want
89 /// to use the connection for writing.
90 pub async fn get(self: Arc<Self>, query_only: bool) -> Result<PooledConnection> {
91 if query_only {
92 let permit = self.semaphore.clone().acquire_owned().await?;
93 let conn = {
94 let mut connections = self.connections.lock();
95 connections
96 .pop()
97 .context("Got a permit when there are no connections in the pool")?
98 };
99 conn.pragma_update(None, "query_only", "1")?;
100 let conn = PooledConnection {
101 pool: Arc::downgrade(&self),
102 conn: Some(conn),
103 _permit: permit,
104 _write_mutex_guard: None,
105 };
106 Ok(conn)
107 } else {
108 // We get write guard first to avoid taking a permit
109 // and not using it, blocking a reader from getting a connection
110 // while being ourselves blocked by another wrtier.
111 let write_mutex_guard = Arc::clone(&self.write_mutex).lock_owned().await;
112
113 // We may still have to wait for a connection
114 // to be returned by some reader.
115 let permit = self.semaphore.clone().acquire_owned().await?;
116 let conn = {
117 let mut connections = self.connections.lock();
118 connections.pop().context(
119 "Got a permit and write lock when there are no connections in the pool",
120 )?
121 };
122 conn.pragma_update(None, "query_only", "0")?;
123 let conn = PooledConnection {
124 pool: Arc::downgrade(&self),
125 conn: Some(conn),
126 _permit: permit,
127 _write_mutex_guard: Some(write_mutex_guard),
128 };
129 Ok(conn)
130 }
131 }
132}
133
134/// Pooled connection.
135pub struct PooledConnection {
136 /// Weak reference to the pool used to return the connection back.
137 pool: Weak<InnerPool>,
138
139 /// Only `None` right after moving the connection back to the pool.
140 conn: Option<Connection>,
141
142 /// Semaphore permit, dropped after returning the connection to the pool.
143 _permit: OwnedSemaphorePermit,
144
145 /// Write mutex guard.
146 ///
147 /// `None` for read-only connections with `PRAGMA query_only=1`.
148 _write_mutex_guard: Option<OwnedMutexGuard<()>>,
149}
150
151impl Drop for PooledConnection {
152 fn drop(&mut self) {
153 // Put the connection back unless the pool is already dropped.
154 if let Some(pool) = self.pool.upgrade() {
155 if let Some(conn) = self.conn.take() {
156 pool.put(conn);
157 }
158 }
159 }
160}
161
162impl Deref for PooledConnection {
163 type Target = Connection;
164
165 fn deref(&self) -> &Connection {
166 self.conn.as_ref().unwrap()
167 }
168}
169
170impl DerefMut for PooledConnection {
171 fn deref_mut(&mut self) -> &mut Connection {
172 self.conn.as_mut().unwrap()
173 }
174}
175
176/// Connection pool.
177#[derive(Clone, Debug)]
178pub struct Pool {
179 /// Reference to the actual connection pool.
180 inner: Arc<InnerPool>,
181}
182
183impl Pool {
184 /// Creates a new connection pool.
185 pub fn new(connections: Vec<Connection>) -> Self {
186 let semaphore = Arc::new(Semaphore::new(connections.len()));
187 let inner = Arc::new(InnerPool {
188 connections: parking_lot::Mutex::new(connections),
189 semaphore,
190 write_mutex: Default::default(),
191 });
192 Pool { inner }
193 }
194
195 pub async fn get(&self, query_only: bool) -> Result<PooledConnection> {
196 Arc::clone(&self.inner).get(query_only).await
197 }
198}