deltachat/sql/pool.rs
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198
//! # SQLite connection pool.
//!
//! The connection pool holds a number of SQLite connections and allows to allocate them.
//! When allocated connection is dropped, underlying connection is returned back to the pool.
//!
//! The pool is organized as a stack. It always allocates the most recently used connection.
//! Each SQLite connection has its own page cache, so allocating recently used connections
//! improves the performance compared to, for example, organizing the pool as a queue
//! and returning the least recently used connection each time.
//!
//! Pool returns at most one write connection (with `PRAGMA query_only=0`).
//! This ensures that there never are multiple write transactions at once.
//!
//! Doing the locking ourselves instead of relying on SQLite has these reasons:
//!
//! - SQLite's locking mechanism is non-async, blocking a thread
//! - SQLite's locking mechanism just sleeps in a loop, which is really inefficient
//!
//! ---
//!
//! More considerations on alternatives to the current approach:
//!
//! We use [DEFERRED](https://www.sqlite.org/lang_transaction.html#deferred_immediate_and_exclusive_transactions) transactions.
//!
//! In order to never get concurrency issues, we could make all transactions IMMEDIATE,
//! but this would mean that there can never be two simultaneous transactions.
//!
//! Read transactions can simply be made DEFERRED to run in parallel w/o any drawbacks.
//!
//! DEFERRED write transactions without doing the locking ourselves would have these drawbacks:
//!
//! 1. As mentioned above, SQLite's locking mechanism is non-async and sleeps in a loop.
//! 2. If there are other write transactions, we block the db connection until
//! upgraded. If some reader comes then, it has to get the next, less used connection with a
//! worse per-connection page cache (SQLite allows one write and any number of reads in parallel).
//! 3. If a transaction is blocked for more than `busy_timeout`, it fails with SQLITE_BUSY.
//! 4. If upon a successful upgrade to a write transaction the db has been modified,
//! the transaction has to be rolled back and retried, which means extra work in terms of
//! CPU/battery.
//!
//! The only pro of making write transactions DEFERRED w/o the external locking would be some
//! parallelism between them.
//!
//! Another option would be to make write transactions IMMEDIATE, also
//! w/o the external locking. But then cons 1. - 3. above would still be valid.
use std::ops::{Deref, DerefMut};
use std::sync::{Arc, Weak};
use anyhow::{Context, Result};
use rusqlite::Connection;
use tokio::sync::{Mutex, OwnedMutexGuard, OwnedSemaphorePermit, Semaphore};
/// Inner connection pool.
#[derive(Debug)]
struct InnerPool {
/// Available connections.
connections: parking_lot::Mutex<Vec<Connection>>,
/// Counts the number of available connections.
semaphore: Arc<Semaphore>,
/// Write mutex.
///
/// This mutex ensures there is at most
/// one write connection with `query_only=0`.
///
/// This mutex is locked when write connection
/// is outside the pool.
write_mutex: Arc<Mutex<()>>,
}
impl InnerPool {
/// Puts a connection into the pool.
///
/// The connection could be new or returned back.
fn put(&self, connection: Connection) {
let mut connections = self.connections.lock();
connections.push(connection);
drop(connections);
}
/// Retrieves a connection from the pool.
///
/// Sets `query_only` pragma to the provided value
/// to prevent accidental misuse of connection
/// for writing when reading is intended.
/// Only pass `query_only=false` if you want
/// to use the connection for writing.
pub async fn get(self: Arc<Self>, query_only: bool) -> Result<PooledConnection> {
if query_only {
let permit = self.semaphore.clone().acquire_owned().await?;
let conn = {
let mut connections = self.connections.lock();
connections
.pop()
.context("Got a permit when there are no connections in the pool")?
};
conn.pragma_update(None, "query_only", "1")?;
let conn = PooledConnection {
pool: Arc::downgrade(&self),
conn: Some(conn),
_permit: permit,
_write_mutex_guard: None,
};
Ok(conn)
} else {
// We get write guard first to avoid taking a permit
// and not using it, blocking a reader from getting a connection
// while being ourselves blocked by another wrtier.
let write_mutex_guard = Arc::clone(&self.write_mutex).lock_owned().await;
// We may still have to wait for a connection
// to be returned by some reader.
let permit = self.semaphore.clone().acquire_owned().await?;
let conn = {
let mut connections = self.connections.lock();
connections.pop().context(
"Got a permit and write lock when there are no connections in the pool",
)?
};
conn.pragma_update(None, "query_only", "0")?;
let conn = PooledConnection {
pool: Arc::downgrade(&self),
conn: Some(conn),
_permit: permit,
_write_mutex_guard: Some(write_mutex_guard),
};
Ok(conn)
}
}
}
/// Pooled connection.
pub struct PooledConnection {
/// Weak reference to the pool used to return the connection back.
pool: Weak<InnerPool>,
/// Only `None` right after moving the connection back to the pool.
conn: Option<Connection>,
/// Semaphore permit, dropped after returning the connection to the pool.
_permit: OwnedSemaphorePermit,
/// Write mutex guard.
///
/// `None` for read-only connections with `PRAGMA query_only=1`.
_write_mutex_guard: Option<OwnedMutexGuard<()>>,
}
impl Drop for PooledConnection {
fn drop(&mut self) {
// Put the connection back unless the pool is already dropped.
if let Some(pool) = self.pool.upgrade() {
if let Some(conn) = self.conn.take() {
pool.put(conn);
}
}
}
}
impl Deref for PooledConnection {
type Target = Connection;
fn deref(&self) -> &Connection {
self.conn.as_ref().unwrap()
}
}
impl DerefMut for PooledConnection {
fn deref_mut(&mut self) -> &mut Connection {
self.conn.as_mut().unwrap()
}
}
/// Connection pool.
#[derive(Clone, Debug)]
pub struct Pool {
/// Reference to the actual connection pool.
inner: Arc<InnerPool>,
}
impl Pool {
/// Creates a new connection pool.
pub fn new(connections: Vec<Connection>) -> Self {
let semaphore = Arc::new(Semaphore::new(connections.len()));
let inner = Arc::new(InnerPool {
connections: parking_lot::Mutex::new(connections),
semaphore,
write_mutex: Default::default(),
});
Pool { inner }
}
pub async fn get(&self, query_only: bool) -> Result<PooledConnection> {
Arc::clone(&self.inner).get(query_only).await
}
}