//! # SQLite connection pool.
//!
//! The connection pool holds a fixed set of SQLite connections and hands them out on request.
//! When a pooled connection is dropped, the underlying connection is returned to the pool.
//!
//! The pool is organized as a stack. It always allocates the most recently used connection.
//! Each SQLite connection has its own page cache, so allocating recently used connections
//! improves the performance compared to, for example, organizing the pool as a queue
//! and returning the least recently used connection each time.
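//!
//! The effect of the stack discipline can be illustrated with a minimal,
//! standard-library-only sketch. `StackPool` and the integer "connection ids"
//! are hypothetical stand-ins used for illustration and are not part of this module:
//!
//! ```rust
//! use std::sync::Mutex;
//!
//! /// Hypothetical stand-in for the pool: idle connection ids kept as a stack.
//! struct StackPool {
//!     idle: Mutex<Vec<u32>>,
//! }
//!
//! impl StackPool {
//!     /// Takes the most recently returned id, if any.
//!     fn get(&self) -> Option<u32> {
//!         self.idle.lock().unwrap().pop()
//!     }
//!
//!     /// Returns an id to the top of the stack.
//!     fn put(&self, id: u32) {
//!         self.idle.lock().unwrap().push(id);
//!     }
//! }
//!
//! fn main() {
//!     let pool = StackPool { idle: Mutex::new(vec![1, 2, 3]) };
//!     let first = pool.get().unwrap();
//!     pool.put(first);
//!     // The id that was just returned is handed out again, so a real
//!     // connection in its place would still have a warm page cache.
//!     assert_eq!(pool.get(), Some(first));
//! }
//! ```
//!
//! With a queue, the second `get` would instead return the connection that has been idle the longest.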

use std::ops::{Deref, DerefMut};
use std::sync::{Arc, Weak};

use anyhow::{Context, Result};
use parking_lot::Mutex;
use rusqlite::Connection;
use tokio::sync::{OwnedSemaphorePermit, Semaphore};

/// Inner connection pool.
#[derive(Debug)]
struct InnerPool {
    /// Available connections.
    connections: Mutex<Vec<Connection>>,

    /// Counts the number of available connections.
    semaphore: Arc<Semaphore>,
}

impl InnerPool {
    /// Puts a connection into the pool.
    ///
    /// The connection may be a new one or one that is being returned.
    fn put(&self, connection: Connection) {
        let mut connections = self.connections.lock();
        connections.push(connection);
        drop(connections);
    }
}

/// Pooled connection.
pub struct PooledConnection {
    /// Weak reference to the pool used to return the connection back.
    pool: Weak<InnerPool>,

    /// Only `None` right after moving the connection back to the pool.
    conn: Option<Connection>,

    /// Semaphore permit, dropped after returning the connection to the pool.
    _permit: OwnedSemaphorePermit,
}

impl Drop for PooledConnection {
    fn drop(&mut self) {
        // Put the connection back unless the pool is already dropped.
        // The permit is released only after this body runs, so the connection
        // is back in the pool before another task can acquire the permit.
        if let Some(pool) = self.pool.upgrade() {
            if let Some(conn) = self.conn.take() {
                pool.put(conn);
            }
        }
    }
}
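
impl Deref for PooledConnection {
    type Target = Connection;

    fn deref(&self) -> &Connection {
        // `conn` is only taken in `drop`, so it is always `Some` here.
        self.conn.as_ref().unwrap()
    }
}

impl DerefMut for PooledConnection {
    fn deref_mut(&mut self) -> &mut Connection {
        // `conn` is only taken in `drop`, so it is always `Some` here.
        self.conn.as_mut().unwrap()
    }
}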

/// Connection pool.
#[derive(Clone, Debug)]
pub struct Pool {
    /// Reference to the actual connection pool.
    inner: Arc<InnerPool>,
}

impl Pool {
    /// Creates a new connection pool from already opened connections.
    ///
    /// The number of semaphore permits equals the number of connections passed in.
    pub fn new(connections: Vec<Connection>) -> Self {
        let semaphore = Arc::new(Semaphore::new(connections.len()));
        let inner = Arc::new(InnerPool {
            connections: Mutex::new(connections),
            semaphore,
        });
        Pool { inner }
    }

    /// Retrieves a connection from the pool.
    ///
    /// Waits asynchronously until a connection becomes available.
    pub async fn get(&self) -> Result<PooledConnection> {
        // Holding a permit guarantees that at least one connection is on the stack.
        let permit = self.inner.semaphore.clone().acquire_owned().await?;
        let mut connections = self.inner.connections.lock();
        let conn = connections
            .pop()
            .context("got a permit when there are no connections in the pool")?;
        let conn = PooledConnection {
            pool: Arc::downgrade(&self.inner),
            conn: Some(conn),
            _permit: permit,
        };
        Ok(conn)
    }
}