use std::collections::{HashMap, HashSet};
use std::path::{Path, PathBuf};
use std::time::Duration;

use anyhow::{Context as _, Result, bail, ensure};
use rusqlite::{Connection, OpenFlags, Row, config::DbConfig, types::ValueRef};
use tokio::sync::RwLock;

use crate::blob::BlobObject;
use crate::config::Config;
use crate::constants::DC_CHAT_ID_TRASH;
use crate::context::Context;
use crate::debug_logging::set_debug_logging_xdc;
use crate::ephemeral::start_ephemeral_timers;
use crate::imex::BLOBS_BACKUP_NAME;
use crate::location::delete_orphaned_poi_locations;
use crate::log::{LogExt, error, info, warn};
use crate::message::MsgId;
use crate::net::dns::prune_dns_cache;
use crate::net::http::http_cache_cleanup;
use crate::net::prune_connection_history;
use crate::param::{Param, Params};
use crate::tools::{SystemTime, Time, delete_file, time, time_elapsed};

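/// Extension of [`rusqlite::ToSql`] that additionally requires [`Send`] and [`Sync`],
/// so parameters can be captured by closures executed on the connection pool.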
pub trait ToSql: rusqlite::ToSql + Send + Sync {}

impl<T: rusqlite::ToSql + Send + Sync> ToSql for T {}

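/// Builds a fixed-size slice of `&dyn crate::sql::ToSql` trait-object references
/// from the given parameters.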
#[macro_export]
macro_rules! params_slice {
    ($($param:expr),+) => {
        [$(&$param as &dyn $crate::sql::ToSql),+]
    };
}

mod migrations;
mod pool;

use pool::Pool;

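/// Wrapper around the SQLite connection pool and per-database state:
/// database path, encryption flag and the config cache.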
#[derive(Debug)]
pub struct Sql {
    /// Path to the database file.
    pub(crate) dbfile: PathBuf,

    /// SQL connection pool; `None` while the database is closed.
    pool: RwLock<Option<Pool>>,

    /// Whether the database is encrypted; `None` until it is opened.
    is_encrypted: RwLock<Option<bool>>,

    /// In-memory cache of the `config` table.
    pub(crate) config_cache: RwLock<HashMap<String, Option<String>>>,
}

impl Sql {
    /// Creates a new handle for the database at `dbfile` without opening it.
    pub fn new(dbfile: PathBuf) -> Sql {
        Self {
            dbfile,
            pool: Default::default(),
            is_encrypted: Default::default(),
            config_cache: Default::default(),
        }
    }

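    /// Checks whether the given passphrase can decrypt the database file
    /// without opening the connection pool.
    /// Returns an error if the database is already open.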
    pub async fn check_passphrase(&self, passphrase: String) -> Result<bool> {
        if self.is_open().await {
            bail!("Database is already opened.");
        }

        // Hold the pool lock so the database cannot be opened concurrently.
        let _lock = self.pool.write().await;

        let connection = Connection::open(&self.dbfile)?;
        if !passphrase.is_empty() {
            connection
                .pragma_update(None, "key", &passphrase)
                .context("Failed to set PRAGMA key")?;
        }
        // If the key is wrong, reading the schema fails.
        let key_is_correct = connection
            .query_row("SELECT count(*) FROM sqlite_master", [], |_row| Ok(()))
            .is_ok();

        Ok(key_is_correct)
    }

    /// Returns true if the connection pool is open.
    pub async fn is_open(&self) -> bool {
        self.pool.read().await.is_some()
    }

    /// Returns whether the database is encrypted, or `None` if it has not been opened yet.
    pub(crate) async fn is_encrypted(&self) -> Option<bool> {
        *self.is_encrypted.read().await
    }

    /// Closes the database by dropping the connection pool.
    pub(crate) async fn close(&self) {
        let _ = self.pool.write().await.take();
    }

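    /// Imports the database from the file at `path`, encrypted with `passphrase`,
    /// by attaching it as `backup`, resetting the current database and copying the
    /// contents over via `sqlcipher_export()`. Clears the config cache.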
    pub(crate) async fn import(&self, path: &Path, passphrase: String) -> Result<()> {
        let path_str = path
            .to_str()
            .with_context(|| format!("path {path:?} is not valid unicode"))?
            .to_string();

        let mut config_cache = self.config_cache.write().await;
        config_cache.clear();

        let query_only = false;
        self.call(query_only, move |conn| {
            conn.execute("ATTACH DATABASE ? AS backup KEY ?", (path_str, passphrase))
                .context("failed to attach backup database")?;
            let res = conn
                .query_row("SELECT count(*) FROM sqlite_master", [], |_row| Ok(()))
                .context("backup passphrase is not correct");

            res.and_then(|_| {
                conn.set_db_config(DbConfig::SQLITE_DBCONFIG_RESET_DATABASE, true)
                    .context("failed to set SQLITE_DBCONFIG_RESET_DATABASE")
            })
            .and_then(|_| {
                conn.execute("VACUUM", [])
                    .context("failed to vacuum the database")
            })
            .and(
                conn.set_db_config(DbConfig::SQLITE_DBCONFIG_RESET_DATABASE, false)
                    .context("failed to unset SQLITE_DBCONFIG_RESET_DATABASE"),
            )
            .and_then(|_| {
                conn.query_row("SELECT sqlcipher_export('main', 'backup')", [], |_row| {
                    Ok(())
                })
                .context("failed to import from attached backup database")
            })
            .and(
                conn.execute("DETACH DATABASE backup", [])
                    .context("failed to detach backup database"),
            )?;
            Ok(())
        })
        .await
    }

    const N_DB_CONNECTIONS: usize = 3;

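    /// Creates a new connection pool with `N_DB_CONNECTIONS` connections to `dbfile`.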
    fn new_pool(dbfile: &Path, passphrase: String) -> Result<Pool> {
        let mut connections = Vec::with_capacity(Self::N_DB_CONNECTIONS);
        for _ in 0..Self::N_DB_CONNECTIONS {
            let connection = new_connection(dbfile, &passphrase)?;
            connections.push(connection);
        }

        let pool = Pool::new(connections);
        Ok(pool)
    }

    async fn try_open(&self, context: &Context, dbfile: &Path, passphrase: String) -> Result<()> {
        *self.pool.write().await = Some(Self::new_pool(dbfile, passphrase.to_string())?);

        if let Err(e) = self.run_migrations(context).await {
            error!(context, "Running migrations failed: {e:#}");
            eprintln!("Running migrations failed: {e:#}");
            context.set_migration_error(&format!("Updating Delta Chat failed. Please send this message to the Delta Chat developers, either at delta@merlinux.eu or at https://support.delta.chat.\n\n{e:#}"));
        }

        Ok(())
    }

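    /// Runs the database schema migrations and, if a migration requested it,
    /// re-encodes the self-avatar to the current avatar size.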
    pub async fn run_migrations(&self, context: &Context) -> Result<()> {
        let recode_avatar = migrations::run(context, self)
            .await
            .context("failed to run migrations")?;

        if recode_avatar && let Some(avatar) = context.get_config(Config::Selfavatar).await? {
            let mut blob = BlobObject::from_path(context, Path::new(&avatar))?;
            match blob.recode_to_avatar_size(context).await {
                Ok(()) => {
                    if let Some(path) = blob.to_abs_path().to_str() {
                        context
                            .set_config_internal(Config::Selfavatar, Some(path))
                            .await?;
                    } else {
                        warn!(context, "Setting selfavatar failed: non-UTF-8 filename");
                    }
                }
                Err(e) => {
                    warn!(context, "Migrations can't recode avatar, removing. {:#}", e);
                    context
                        .set_config_internal(Config::Selfavatar, None)
                        .await?
                }
            }
        }

        Ok(())
    }

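    /// Opens the database at `self.dbfile` with the given passphrase,
    /// runs migrations and restores the debug-logging webxdc state.
    /// Returns an error if the database is already open.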
    pub async fn open(&self, context: &Context, passphrase: String) -> Result<()> {
        if self.is_open().await {
            error!(
                context,
                "Cannot open, database \"{:?}\" already opened.", self.dbfile,
            );
            bail!("SQL database is already opened.");
        }

        let passphrase_nonempty = !passphrase.is_empty();
        self.try_open(context, &self.dbfile, passphrase).await?;
        info!(context, "Opened database {:?}.", self.dbfile);
        *self.is_encrypted.write().await = Some(passphrase_nonempty);

        if let Some(xdc_id) = self
            .get_raw_config_u32(Config::DebugLogging.as_ref())
            .await?
        {
            set_debug_logging_xdc(context, Some(MsgId::new(xdc_id))).await?;
        }
        Ok(())
    }

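    /// Changes the database passphrase via `PRAGMA rekey` and re-creates the
    /// connection pool with the new passphrase.
    /// If the new passphrase is empty, `PRAGMA rekey` is skipped.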
    pub async fn change_passphrase(&self, passphrase: String) -> Result<()> {
        let mut lock = self.pool.write().await;

        let pool = lock.take().context("SQL connection pool is not open")?;
        let query_only = false;
        let conn = pool.get(query_only).await?;
        if !passphrase.is_empty() {
            conn.pragma_update(None, "rekey", passphrase.clone())
                .context("Failed to set PRAGMA rekey")?;
        }
        drop(pool);

        *lock = Some(Self::new_pool(&self.dbfile, passphrase.to_string())?);

        Ok(())
    }

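    /// Takes a connection from the pool (a read-only one if `query_only` is true)
    /// and runs the blocking `function` on it via `tokio::task::block_in_place`.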
    async fn call<'a, F, R>(&'a self, query_only: bool, function: F) -> Result<R>
    where
        F: 'a + FnOnce(&mut Connection) -> Result<R> + Send,
        R: Send + 'static,
    {
        let lock = self.pool.read().await;
        let pool = lock.as_ref().context("no SQL connection")?;
        let mut conn = pool.get(query_only).await?;
        let res = tokio::task::block_in_place(move || function(&mut conn))?;
        Ok(res)
    }

    pub async fn call_write<'a, F, R>(&'a self, function: F) -> Result<R>
    where
        F: 'a + FnOnce(&mut Connection) -> Result<R> + Send,
        R: Send + 'static,
    {
        let query_only = false;
        self.call(query_only, function).await
    }

    pub async fn execute(
        &self,
        query: &str,
        params: impl rusqlite::Params + Send,
    ) -> Result<usize> {
        self.call_write(move |conn| {
            let res = conn.execute(query, params)?;
            Ok(res)
        })
        .await
    }

    pub async fn insert(&self, query: &str, params: impl rusqlite::Params + Send) -> Result<i64> {
        self.call_write(move |conn| {
            conn.execute(query, params)?;
            Ok(conn.last_insert_rowid())
        })
        .await
    }

    pub async fn query_map<T, F, G, H>(
        &self,
        sql: &str,
        params: impl rusqlite::Params + Send,
        f: F,
        g: G,
    ) -> Result<H>
    where
        F: Send + FnMut(&rusqlite::Row) -> Result<T>,
        G: Send + FnOnce(rusqlite::AndThenRows<F>) -> Result<H>,
        H: Send + 'static,
    {
        let query_only = true;
        self.call(query_only, move |conn| {
            let mut stmt = conn.prepare(sql)?;
            let res = stmt.query_and_then(params, f)?;
            g(res)
        })
        .await
    }

    pub async fn query_map_collect<T, C, F>(
        &self,
        sql: &str,
        params: impl rusqlite::Params + Send,
        f: F,
    ) -> Result<C>
    where
        T: Send + 'static,
        C: Send + 'static + std::iter::FromIterator<T>,
        F: Send + FnMut(&rusqlite::Row) -> Result<T>,
    {
        self.query_map(sql, params, f, |rows| {
            rows.collect::<std::result::Result<C, _>>()
        })
        .await
    }

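    /// Like [`Self::query_map_collect`], collecting the mapped rows into a `Vec<T>`.
    ///
    /// Usage sketch (assumes an open `Sql` handle `sql` and an existing `msgs` table):
    /// ```ignore
    /// let ids: Vec<u32> = sql
    ///     .query_map_vec("SELECT id FROM msgs", (), |row| Ok(row.get(0)?))
    ///     .await?;
    /// ```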
    pub async fn query_map_vec<T, F>(
        &self,
        sql: &str,
        params: impl rusqlite::Params + Send,
        f: F,
    ) -> Result<Vec<T>>
    where
        T: Send + 'static,
        F: Send + FnMut(&rusqlite::Row) -> Result<T>,
    {
        self.query_map_collect(sql, params, f).await
    }

    pub async fn count(&self, query: &str, params: impl rusqlite::Params + Send) -> Result<usize> {
        let count: isize = self.query_row(query, params, |row| row.get(0)).await?;
        Ok(usize::try_from(count)?)
    }

    pub async fn exists(&self, sql: &str, params: impl rusqlite::Params + Send) -> Result<bool> {
        let count = self.count(sql, params).await?;
        Ok(count > 0)
    }

    pub async fn query_row<T, F>(
        &self,
        query: &str,
        params: impl rusqlite::Params + Send,
        f: F,
    ) -> Result<T>
    where
        F: FnOnce(&rusqlite::Row) -> rusqlite::Result<T> + Send,
        T: Send + 'static,
    {
        let query_only = true;
        self.call(query_only, move |conn| {
            let res = conn.query_row(query, params, f)?;
            Ok(res)
        })
        .await
    }

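    /// Executes `callback` inside an SQL transaction on a write connection;
    /// the transaction is committed if the callback returns `Ok` and rolled back on `Err`.
    ///
    /// Usage sketch (assumes an open `Sql` handle `sql` and an existing `config` table):
    /// ```ignore
    /// sql.transaction(|t| {
    ///     t.execute("DELETE FROM config WHERE keyname=?", ("foo",))?;
    ///     t.execute("INSERT INTO config (keyname, value) VALUES (?, ?)", ("foo", "bar"))?;
    ///     Ok(())
    /// })
    /// .await?;
    /// ```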
    pub async fn transaction<G, H>(&self, callback: G) -> Result<H>
    where
        H: Send + 'static,
        G: Send + FnOnce(&mut rusqlite::Transaction<'_>) -> Result<H>,
    {
        let query_only = false;
        self.transaction_ex(query_only, callback).await
    }

    pub async fn transaction_ex<G, H>(&self, query_only: bool, callback: G) -> Result<H>
    where
        H: Send + 'static,
        G: Send + FnOnce(&mut rusqlite::Transaction<'_>) -> Result<H>,
    {
        self.call(query_only, move |conn| {
            let mut transaction = conn.transaction()?;
            let ret = callback(&mut transaction);

            match ret {
                Ok(ret) => {
                    transaction.commit()?;
                    Ok(ret)
                }
                Err(err) => {
                    transaction.rollback()?;
                    Err(err)
                }
            }
        })
        .await
    }

    pub async fn table_exists(&self, name: &str) -> Result<bool> {
        let query_only = true;
        self.call(query_only, move |conn| {
            let mut exists = false;
            conn.pragma(None, "table_info", name.to_string(), |_row| {
                // The pragma returns one row per column; any row means the table exists.
                exists = true;
                Ok(())
            })?;

            Ok(exists)
        })
        .await
    }

    pub async fn col_exists(&self, table_name: &str, col_name: &str) -> Result<bool> {
        let query_only = true;
        self.call(query_only, move |conn| {
            let mut exists = false;
            conn.pragma(None, "table_info", table_name.to_string(), |row| {
                let curr_name: String = row.get(1)?;
                if col_name == curr_name {
                    exists = true;
                }
                Ok(())
            })?;

            Ok(exists)
        })
        .await
    }

    pub async fn query_row_optional<T, F>(
        &self,
        sql: &str,
        params: impl rusqlite::Params + Send,
        f: F,
    ) -> Result<Option<T>>
    where
        F: Send + FnOnce(&rusqlite::Row) -> rusqlite::Result<T>,
        T: Send + 'static,
    {
        let query_only = true;
        self.call(query_only, move |conn| {
            match conn.query_row(sql.as_ref(), params, f) {
                Ok(res) => Ok(Some(res)),
                Err(rusqlite::Error::QueryReturnedNoRows) => Ok(None),
                Err(err) => Err(err.into()),
            }
        })
        .await
    }

    pub async fn query_get_value<T>(
        &self,
        query: &str,
        params: impl rusqlite::Params + Send,
    ) -> Result<Option<T>>
    where
        T: rusqlite::types::FromSql + Send + 'static,
    {
        self.query_row_optional(query, params, |row| row.get::<_, T>(0))
            .await
    }

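    /// Sets the raw `config` table value for `key`, or deletes the row if `value` is `None`,
    /// and updates the in-memory config cache accordingly.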
    pub async fn set_raw_config(&self, key: &str, value: Option<&str>) -> Result<()> {
        let mut lock = self.config_cache.write().await;
        if let Some(value) = value {
            self.execute(
                "INSERT OR REPLACE INTO config (keyname, value) VALUES (?, ?)",
                (key, value),
            )
            .await?;
        } else {
            self.execute("DELETE FROM config WHERE keyname=?", (key,))
                .await?;
        }
        lock.insert(key.to_string(), value.map(|s| s.to_string()));
        drop(lock);

        Ok(())
    }

    pub async fn get_raw_config(&self, key: &str) -> Result<Option<String>> {
        let lock = self.config_cache.read().await;
        let cached = lock.get(key).cloned();
        drop(lock);

        if let Some(c) = cached {
            return Ok(c);
        }

        let mut lock = self.config_cache.write().await;
        let value = self
            .query_get_value("SELECT value FROM config WHERE keyname=?", (key,))
            .await
            .context(format!("failed to fetch raw config: {key}"))?;
        lock.insert(key.to_string(), value.clone());
        drop(lock);

        Ok(value)
    }

    pub(crate) async fn uncache_raw_config(&self, key: &str) {
        let mut lock = self.config_cache.write().await;
        lock.remove(key);
    }

    pub async fn set_raw_config_int(&self, key: &str, value: i32) -> Result<()> {
        self.set_raw_config(key, Some(&format!("{value}"))).await
    }

    pub async fn get_raw_config_int(&self, key: &str) -> Result<Option<i32>> {
        self.get_raw_config(key)
            .await
            .map(|s| s.and_then(|s| s.parse().ok()))
    }

    pub async fn get_raw_config_u32(&self, key: &str) -> Result<Option<u32>> {
        self.get_raw_config(key)
            .await
            .map(|s| s.and_then(|s| s.parse().ok()))
    }

    pub async fn get_raw_config_bool(&self, key: &str) -> Result<bool> {
        let res = self.get_raw_config_int(key).await?;
        Ok(res.unwrap_or_default() > 0)
    }

    pub async fn set_raw_config_bool(&self, key: &str, value: bool) -> Result<()> {
        let value = if value { Some("1") } else { None };
        self.set_raw_config(key, value).await
    }

    pub async fn set_raw_config_int64(&self, key: &str, value: i64) -> Result<()> {
        self.set_raw_config(key, Some(&format!("{value}"))).await
    }

    pub async fn get_raw_config_int64(&self, key: &str) -> Result<Option<i64>> {
        self.get_raw_config(key)
            .await
            .map(|s| s.and_then(|r| r.parse().ok()))
    }

    #[cfg(feature = "internals")]
    pub fn config_cache(&self) -> &RwLock<HashMap<String, Option<String>>> {
        &self.config_cache
    }

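    /// Checkpoints the write-ahead log (WAL) into the main database file.
    /// Starts with a PASSIVE checkpoint, then escalates to FULL and TRUNCATE
    /// checkpoints while temporarily blocking the pool's writers and readers.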
    pub(crate) async fn wal_checkpoint(context: &Context) -> Result<()> {
        let t_start = Time::now();
        let lock = context.sql.pool.read().await;
        let Some(pool) = lock.as_ref() else {
            // Database is not open, nothing to checkpoint.
            return Ok(());
        };

        // Start with a PASSIVE checkpoint that does not block readers or writers.
        let query_only = true;
        let conn = pool.get(query_only).await?;
        tokio::task::block_in_place(|| {
            conn.query_row("PRAGMA table_list", [], |_| Ok(()))?;
            conn.query_row("PRAGMA wal_checkpoint(PASSIVE)", [], |_| Ok(()))
        })?;

        // Block writers via the pool's write lock for the rest of the checkpoint.
        const _: () = assert!(Sql::N_DB_CONNECTIONS > 1, "Deadlock possible");
        let _write_lock = pool.write_lock().await;
        let t_writers_blocked = Time::now();

        // Cycle through all read connections so that no read which started before
        // writers were blocked is still in progress, then release them again so
        // readers are not blocked during the FULL checkpoint.
        let mut read_conns = Vec::with_capacity(Self::N_DB_CONNECTIONS - 1);
        for _ in 0..(Self::N_DB_CONNECTIONS - 1) {
            read_conns.push(pool.get(query_only).await?);
        }
        read_conns.clear();
        let (pages_total, pages_checkpointed) = tokio::task::block_in_place(|| {
            conn.query_row("PRAGMA wal_checkpoint(FULL)", [], |row| {
                let pages_total: i64 = row.get(1)?;
                let pages_checkpointed: i64 = row.get(2)?;
                Ok((pages_total, pages_checkpointed))
            })
        })?;
        if pages_checkpointed < pages_total {
            warn!(
                context,
                "Cannot checkpoint whole WAL. Pages total: {pages_total}, checkpointed: {pages_checkpointed}. Make sure there are no external connections running transactions.",
            );
        }

        // Block readers as well by holding all read connections,
        // then truncate the WAL file.
        for _ in 0..(Self::N_DB_CONNECTIONS - 1) {
            read_conns.push(pool.get(query_only).await?);
        }
        let t_readers_blocked = Time::now();
        tokio::task::block_in_place(|| {
            let blocked = conn.query_row("PRAGMA wal_checkpoint(TRUNCATE)", [], |row| {
                let blocked: i64 = row.get(0)?;
                Ok(blocked)
            })?;
            ensure!(blocked == 0);
            Ok(())
        })?;
        info!(
            context,
            "wal_checkpoint: Total time: {:?}. Writers blocked for: {:?}. Readers blocked for: {:?}.",
            time_elapsed(&t_start),
            time_elapsed(&t_writers_blocked),
            time_elapsed(&t_readers_blocked),
        );
        Ok(())
    }
}

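/// Opens a single SQLite connection to `path`, applies the passphrase (if any)
/// and sets the connection pragmas (secure_delete, foreign_keys, WAL journal mode, etc.).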
fn new_connection(path: &Path, passphrase: &str) -> Result<Connection> {
    let flags = OpenFlags::SQLITE_OPEN_NO_MUTEX
        | OpenFlags::SQLITE_OPEN_READ_WRITE
        | OpenFlags::SQLITE_OPEN_CREATE;
    let conn = Connection::open_with_flags(path, flags)?;
    conn.execute_batch(
        "PRAGMA cipher_memory_security = OFF; -- Too slow on Android
         PRAGMA secure_delete=on;
         PRAGMA soft_heap_limit = 8388608; -- 8 MiB limit, same as set in Android SQLiteDatabase.
         PRAGMA foreign_keys=on;
        ",
    )?;

    if cfg!(not(target_os = "ios")) {
        conn.pragma_update(None, "temp_store", "memory")?;
    }

    if cfg!(target_os = "ios") {
        conn.busy_timeout(Duration::new(60, 0))?;
    } else {
        conn.busy_timeout(Duration::ZERO)?;
    }

    if !passphrase.is_empty() {
        conn.pragma_update(None, "key", passphrase)?;
    }
    conn.pragma_update(None, "auto_vacuum", "INCREMENTAL".to_string())?;

    conn.pragma_update(None, "journal_mode", "WAL".to_string())?;
    conn.pragma_update(None, "synchronous", "NORMAL".to_string())?;

    Ok(conn)
}

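/// Runs `PRAGMA incremental_vacuum`, stepping through the returned rows
/// and logging how many pages were freed.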
async fn incremental_vacuum(context: &Context) -> Result<()> {
    context
        .sql
        .call_write(move |conn| {
            let mut stmt = conn
                .prepare("PRAGMA incremental_vacuum")
                .context("Failed to prepare incremental_vacuum statement")?;

            let mut rows = stmt
                .query(())
                .context("Failed to run incremental_vacuum statement")?;
            let mut row_count = 0;
            while let Some(_row) = rows
                .next()
                .context("Failed to step incremental_vacuum statement")?
            {
                row_count += 1;
            }
            info!(context, "Incremental vacuum freed {row_count} pages.");
            Ok(())
        })
        .await
}

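/// Performs database and blobdir housekeeping: cleans the HTTP cache, removes unused
/// blob files, restarts ephemeral timers, prunes message tombstones, old MDNs, webxdc
/// status updates, connection history, the DNS cache and orphaned POI locations,
/// runs incremental vacuum and checkpoints the WAL.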
pub async fn housekeeping(context: &Context) -> Result<()> {
    if let Err(e) = context
        .set_config_internal(Config::LastHousekeeping, Some(&time().to_string()))
        .await
    {
        warn!(context, "Can't set config: {e:#}.");
    }

    http_cache_cleanup(context)
        .await
        .context("Failed to cleanup HTTP cache")
        .log_err(context)
        .ok();
    migrations::msgs_to_key_contacts(context)
        .await
        .context("migrations::msgs_to_key_contacts")
        .log_err(context)
        .ok();

    if let Err(err) = remove_unused_files(context).await {
        warn!(
            context,
            "Housekeeping: cannot remove unused files: {:#}.", err
        );
    }

    if let Err(err) = start_ephemeral_timers(context).await {
        warn!(
            context,
            "Housekeeping: cannot start ephemeral timers: {:#}.", err
        );
    }

    if let Err(err) = prune_tombstones(&context.sql).await {
        warn!(
            context,
            "Housekeeping: Cannot prune message tombstones: {:#}.", err
        );
    }

    if let Err(err) = incremental_vacuum(context).await {
        warn!(context, "Failed to run incremental vacuum: {err:#}.");
    }
    if let Err(err) = Sql::wal_checkpoint(context).await {
        warn!(context, "wal_checkpoint() failed: {err:#}.");
        debug_assert!(false);
    }

    context
        .sql
        .execute(
            "DELETE FROM msgs_mdns WHERE msg_id NOT IN \
             (SELECT id FROM msgs WHERE chat_id!=?)",
            (DC_CHAT_ID_TRASH,),
        )
        .await
        .context("failed to remove old MDNs")
        .log_err(context)
        .ok();

    context
        .sql
        .execute(
            "DELETE FROM msgs_status_updates WHERE msg_id NOT IN \
             (SELECT id FROM msgs WHERE chat_id!=?)",
            (DC_CHAT_ID_TRASH,),
        )
        .await
        .context("failed to remove old webxdc status updates")
        .log_err(context)
        .ok();

    prune_connection_history(context)
        .await
        .context("Failed to prune connection history")
        .log_err(context)
        .ok();
    prune_dns_cache(context)
        .await
        .context("Failed to prune DNS cache")
        .log_err(context)
        .ok();

    delete_orphaned_poi_locations(context)
        .await
        .context("Failed to delete orphaned POI locations")
        .log_err(context)
        .ok();

    info!(context, "Housekeeping done.");
    Ok(())
}

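/// Reads a column as `Vec<u8>`, also accepting `NULL` (returned as an empty vector)
/// and `TEXT` values; integer and real values remain an error.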
pub fn row_get_vec(row: &Row, idx: usize) -> rusqlite::Result<Vec<u8>> {
    row.get(idx).or_else(|err| match row.get_ref(idx)? {
        ValueRef::Null => Ok(Vec::new()),
        ValueRef::Text(text) => Ok(text.to_vec()),
        ValueRef::Blob(blob) => Ok(blob.to_vec()),
        ValueRef::Integer(_) | ValueRef::Real(_) => Err(err),
    })
}

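/// Removes files from the blob directory that are no longer referenced by any
/// message, chat, contact, config value or HTTP cache entry.
/// Recently created, modified or accessed unreferenced files are kept.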
pub async fn remove_unused_files(context: &Context) -> Result<()> {
    let mut files_in_use = HashSet::new();
    let mut unreferenced_count = 0;

    info!(context, "Start housekeeping...");
    maybe_add_from_param(
        &context.sql,
        &mut files_in_use,
        "SELECT param FROM msgs WHERE chat_id!=3 AND type!=10;",
        Param::File,
    )
    .await?;
    maybe_add_from_param(
        &context.sql,
        &mut files_in_use,
        "SELECT param FROM chats;",
        Param::ProfileImage,
    )
    .await?;
    maybe_add_from_param(
        &context.sql,
        &mut files_in_use,
        "SELECT param FROM contacts;",
        Param::ProfileImage,
    )
    .await?;

    context
        .sql
        .query_map(
            "SELECT value FROM config;",
            (),
            |row| {
                let row: String = row.get(0)?;
                Ok(row)
            },
            |rows| {
                for row in rows {
                    maybe_add_file(&mut files_in_use, &row?);
                }
                Ok(())
            },
        )
        .await
        .context("housekeeping: failed to SELECT value FROM config")?;

    context
        .sql
        .query_map(
            "SELECT blobname FROM http_cache",
            (),
            |row| {
                let row: String = row.get(0)?;
                Ok(row)
            },
            |rows| {
                for row in rows {
                    maybe_add_file(&mut files_in_use, &row?);
                }
                Ok(())
            },
        )
        .await
        .context("Failed to SELECT blobname FROM http_cache")?;

    info!(context, "{} files in use.", files_in_use.len());
    let blobdir = context.get_blobdir();
    for p in [&blobdir.join(BLOBS_BACKUP_NAME), blobdir] {
        match tokio::fs::read_dir(p).await {
            Ok(mut dir_handle) => {
                let diff = std::time::Duration::from_secs(60 * 60);
                let keep_files_newer_than = SystemTime::now()
                    .checked_sub(diff)
                    .unwrap_or(SystemTime::UNIX_EPOCH);

                while let Ok(Some(entry)) = dir_handle.next_entry().await {
                    let name_f = entry.file_name();
                    let name_s = name_f.to_string_lossy();

                    if p == blobdir
                        && (is_file_in_use(&files_in_use, None, &name_s)
                            || is_file_in_use(&files_in_use, Some(".waveform"), &name_s)
                            || is_file_in_use(&files_in_use, Some("-preview.jpg"), &name_s))
                    {
                        continue;
                    }

                    let stats = match tokio::fs::metadata(entry.path()).await {
                        Err(err) => {
                            warn!(
                                context,
                                "Cannot get metadata for {}: {:#}.",
                                entry.path().display(),
                                err
                            );
                            continue;
                        }
                        Ok(stats) => stats,
                    };

                    if stats.is_dir() {
                        if let Err(e) = tokio::fs::remove_dir(entry.path()).await {
                            info!(
                                context,
                                "Housekeeping: Cannot rmdir {}: {:#}.",
                                entry.path().display(),
                                e
                            );
                        }
                        continue;
                    }

                    unreferenced_count += 1;
                    let recently_created = stats.created().is_ok_and(|t| t > keep_files_newer_than);
                    let recently_modified =
                        stats.modified().is_ok_and(|t| t > keep_files_newer_than);
                    let recently_accessed =
                        stats.accessed().is_ok_and(|t| t > keep_files_newer_than);

                    if p == blobdir && (recently_created || recently_modified || recently_accessed)
                    {
                        info!(
                            context,
                            "Housekeeping: Keeping new unreferenced file #{}: {:?}.",
                            unreferenced_count,
                            entry.file_name(),
                        );
                        continue;
                    }

                    info!(
                        context,
                        "Housekeeping: Deleting unreferenced file #{}: {:?}.",
                        unreferenced_count,
                        entry.file_name()
                    );
                    let path = entry.path();
                    if let Err(err) = delete_file(context, &path).await {
                        error!(
                            context,
                            "Failed to delete unused file {}: {:#}.",
                            path.display(),
                            err
                        );
                    }
                }
            }
            Err(err) => {
                if !p.ends_with(BLOBS_BACKUP_NAME) {
                    warn!(
                        context,
                        "Housekeeping: Cannot read dir {}: {:#}.",
                        p.display(),
                        err
                    );
                }
            }
        }
    }

    Ok(())
}

fn is_file_in_use(files_in_use: &HashSet<String>, namespc_opt: Option<&str>, name: &str) -> bool {
    let name_to_check = if let Some(namespc) = namespc_opt {
        let Some(name) = name.strip_suffix(namespc) else {
            return false;
        };
        name
    } else {
        name
    };
    files_in_use.contains(name_to_check)
}

fn maybe_add_file(files_in_use: &mut HashSet<String>, file: &str) {
    if let Some(file) = file.strip_prefix("$BLOBDIR/") {
        files_in_use.insert(file.to_string());
    }
}

async fn maybe_add_from_param(
    sql: &Sql,
    files_in_use: &mut HashSet<String>,
    query: &str,
    param_id: Param,
) -> Result<()> {
    sql.query_map(
        query,
        (),
        |row| {
            let row: String = row.get(0)?;
            Ok(row)
        },
        |rows| {
            for row in rows {
                let param: Params = row?.parse().unwrap_or_default();
                if let Some(file) = param.get(param_id) {
                    maybe_add_file(files_in_use, file);
                }
            }
            Ok(())
        },
    )
    .await
    .context(format!("housekeeping: failed to add_from_param {query}"))?;

    Ok(())
}

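/// Deletes tombstones of trashed messages that are older than two days
/// and are not referenced from the `imap` table with a non-empty target.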
async fn prune_tombstones(sql: &Sql) -> Result<()> {
    let timestamp_max = time().saturating_sub(2 * 24 * 3600);
    sql.execute(
        "DELETE FROM msgs
         WHERE chat_id=?
         AND timestamp<=?
         AND NOT EXISTS (
             SELECT * FROM imap WHERE msgs.rfc724_mid=rfc724_mid AND target!=''
         )",
        (DC_CHAT_ID_TRASH, timestamp_max),
    )
    .await?;
    Ok(())
}

#[cfg(test)]
mod sql_tests;