1use std::collections::{HashMap, HashSet};
4use std::path::{Path, PathBuf};
5
6use anyhow::{Context as _, Result, bail, ensure};
7use rusqlite::{Connection, OpenFlags, Row, config::DbConfig, types::ValueRef};
8use tokio::sync::RwLock;
9
10use crate::blob::BlobObject;
11use crate::chat::add_device_msg;
12use crate::config::Config;
13use crate::constants::DC_CHAT_ID_TRASH;
14use crate::context::Context;
15use crate::debug_logging::set_debug_logging_xdc;
16use crate::ephemeral::start_ephemeral_timers;
17use crate::imex::BLOBS_BACKUP_NAME;
18use crate::location::delete_orphaned_poi_locations;
19use crate::log::{LogExt, warn};
20use crate::message::{Message, MsgId};
21use crate::net::dns::prune_dns_cache;
22use crate::net::http::http_cache_cleanup;
23use crate::net::prune_connection_history;
24use crate::param::{Param, Params};
25use crate::stock_str;
26use crate::tools::{SystemTime, Time, delete_file, time, time_elapsed};
27
28pub trait ToSql: rusqlite::ToSql + Send + Sync {}
31
32impl<T: rusqlite::ToSql + Send + Sync> ToSql for T {}
33
34#[macro_export]
40macro_rules! params_slice {
41 ($($param:expr),+) => {
42 [$(&$param as &dyn $crate::sql::ToSql),+]
43 };
44}
45
46mod migrations;
47mod pool;
48
49use pool::Pool;
50
51#[derive(Debug)]
53pub struct Sql {
54 pub(crate) dbfile: PathBuf,
56
57 pool: RwLock<Option<Pool>>,
59
60 is_encrypted: RwLock<Option<bool>>,
63
64 pub(crate) config_cache: RwLock<HashMap<String, Option<String>>>,
66}
67
68impl Sql {
69 pub fn new(dbfile: PathBuf) -> Sql {
71 Self {
72 dbfile,
73 pool: Default::default(),
74 is_encrypted: Default::default(),
75 config_cache: Default::default(),
76 }
77 }
78
79 pub async fn check_passphrase(&self, passphrase: String) -> Result<bool> {
87 if self.is_open().await {
88 bail!("Database is already opened.");
89 }
90
91 let _lock = self.pool.write().await;
93
94 let connection = Connection::open(&self.dbfile)?;
96 if !passphrase.is_empty() {
97 connection
98 .pragma_update(None, "key", &passphrase)
99 .context("Failed to set PRAGMA key")?;
100 }
101 let key_is_correct = connection
102 .query_row("SELECT count(*) FROM sqlite_master", [], |_row| Ok(()))
103 .is_ok();
104
105 Ok(key_is_correct)
106 }
107
108 pub async fn is_open(&self) -> bool {
110 self.pool.read().await.is_some()
111 }
112
113 pub(crate) async fn is_encrypted(&self) -> Option<bool> {
117 *self.is_encrypted.read().await
118 }
119
120 pub(crate) async fn close(&self) {
122 let _ = self.pool.write().await.take();
123 }
125
126 pub(crate) async fn import(&self, path: &Path, passphrase: String) -> Result<()> {
128 let path_str = path
129 .to_str()
130 .with_context(|| format!("path {path:?} is not valid unicode"))?
131 .to_string();
132
133 let mut config_cache = self.config_cache.write().await;
136 config_cache.clear();
137
138 let query_only = false;
139 self.call(query_only, move |conn| {
140 conn.execute("ATTACH DATABASE ? AS backup KEY ?", (path_str, passphrase))
142 .context("failed to attach backup database")?;
143 let res = conn
144 .query_row("SELECT count(*) FROM sqlite_master", [], |_row| Ok(()))
145 .context("backup passphrase is not correct");
146
147 res.and_then(|_| {
152 conn.set_db_config(DbConfig::SQLITE_DBCONFIG_RESET_DATABASE, true)
153 .context("failed to set SQLITE_DBCONFIG_RESET_DATABASE")
154 })
155 .and_then(|_| {
156 conn.execute("VACUUM", [])
157 .context("failed to vacuum the database")
158 })
159 .and(
160 conn.set_db_config(DbConfig::SQLITE_DBCONFIG_RESET_DATABASE, false)
161 .context("failed to unset SQLITE_DBCONFIG_RESET_DATABASE"),
162 )
163 .and_then(|_| {
164 conn.query_row("SELECT sqlcipher_export('main', 'backup')", [], |_row| {
165 Ok(())
166 })
167 .context("failed to import from attached backup database")
168 })
169 .and(
170 conn.execute("DETACH DATABASE backup", [])
171 .context("failed to detach backup database"),
172 )?;
173 Ok(())
174 })
175 .await
176 }
177
178 const N_DB_CONNECTIONS: usize = 3;
179
180 fn new_pool(dbfile: &Path, passphrase: String) -> Result<Pool> {
182 let mut connections = Vec::with_capacity(Self::N_DB_CONNECTIONS);
183 for _ in 0..Self::N_DB_CONNECTIONS {
184 let connection = new_connection(dbfile, &passphrase)?;
185 connections.push(connection);
186 }
187
188 let pool = Pool::new(connections);
189 Ok(pool)
190 }
191
192 async fn try_open(&self, context: &Context, dbfile: &Path, passphrase: String) -> Result<()> {
193 *self.pool.write().await = Some(Self::new_pool(dbfile, passphrase.to_string())?);
194
195 if let Err(e) = self.run_migrations(context).await {
196 error!(context, "Running migrations failed: {e:#}");
197 eprintln!("Running migrations failed: {e:#}");
202 context.set_migration_error(&format!("Updating Delta Chat failed. Please send this message to the Delta Chat developers, either at delta@merlinux.eu or at https://support.delta.chat.\n\n{e:#}"));
203 }
208
209 Ok(())
210 }
211
212 pub async fn run_migrations(&self, context: &Context) -> Result<()> {
214 let (_update_icons, disable_server_delete, recode_avatar) = migrations::run(context, self)
220 .await
221 .context("failed to run migrations")?;
222
223 if disable_server_delete {
227 if context.get_config_delete_server_after().await?.is_some() {
230 let mut msg = Message::new_text(stock_str::delete_server_turned_off(context).await);
231 add_device_msg(context, None, Some(&mut msg)).await?;
232 context
233 .set_config_internal(Config::DeleteServerAfter, Some("0"))
234 .await?;
235 }
236 }
237
238 if recode_avatar {
239 if let Some(avatar) = context.get_config(Config::Selfavatar).await? {
240 let mut blob = BlobObject::from_path(context, Path::new(&avatar))?;
241 match blob.recode_to_avatar_size(context).await {
242 Ok(()) => {
243 if let Some(path) = blob.to_abs_path().to_str() {
244 context
245 .set_config_internal(Config::Selfavatar, Some(path))
246 .await?;
247 } else {
248 warn!(context, "Setting selfavatar failed: non-UTF-8 filename");
249 }
250 }
251 Err(e) => {
252 warn!(context, "Migrations can't recode avatar, removing. {:#}", e);
253 context
254 .set_config_internal(Config::Selfavatar, None)
255 .await?
256 }
257 }
258 }
259 }
260
261 Ok(())
262 }
263
264 pub async fn open(&self, context: &Context, passphrase: String) -> Result<()> {
267 if self.is_open().await {
268 error!(
269 context,
270 "Cannot open, database \"{:?}\" already opened.", self.dbfile,
271 );
272 bail!("SQL database is already opened.");
273 }
274
275 let passphrase_nonempty = !passphrase.is_empty();
276 self.try_open(context, &self.dbfile, passphrase).await?;
277 info!(context, "Opened database {:?}.", self.dbfile);
278 *self.is_encrypted.write().await = Some(passphrase_nonempty);
279
280 if let Some(xdc_id) = self
282 .get_raw_config_u32(Config::DebugLogging.as_ref())
283 .await?
284 {
285 set_debug_logging_xdc(context, Some(MsgId::new(xdc_id))).await?;
286 }
287 Ok(())
288 }
289
290 pub async fn change_passphrase(&self, passphrase: String) -> Result<()> {
296 let mut lock = self.pool.write().await;
297
298 let pool = lock.take().context("SQL connection pool is not open")?;
299 let query_only = false;
300 let conn = pool.get(query_only).await?;
301 if !passphrase.is_empty() {
302 conn.pragma_update(None, "rekey", passphrase.clone())
303 .context("Failed to set PRAGMA rekey")?;
304 }
305 drop(pool);
306
307 *lock = Some(Self::new_pool(&self.dbfile, passphrase.to_string())?);
308
309 Ok(())
310 }
311
312 async fn call<'a, F, R>(&'a self, query_only: bool, function: F) -> Result<R>
319 where
320 F: 'a + FnOnce(&mut Connection) -> Result<R> + Send,
321 R: Send + 'static,
322 {
323 let lock = self.pool.read().await;
324 let pool = lock.as_ref().context("no SQL connection")?;
325 let mut conn = pool.get(query_only).await?;
326 let res = tokio::task::block_in_place(move || function(&mut conn))?;
327 Ok(res)
328 }
329
330 pub async fn call_write<'a, F, R>(&'a self, function: F) -> Result<R>
335 where
336 F: 'a + FnOnce(&mut Connection) -> Result<R> + Send,
337 R: Send + 'static,
338 {
339 let query_only = false;
340 self.call(query_only, function).await
341 }
342
343 pub async fn execute(
345 &self,
346 query: &str,
347 params: impl rusqlite::Params + Send,
348 ) -> Result<usize> {
349 self.call_write(move |conn| {
350 let res = conn.execute(query, params)?;
351 Ok(res)
352 })
353 .await
354 }
355
356 pub async fn insert(&self, query: &str, params: impl rusqlite::Params + Send) -> Result<i64> {
358 self.call_write(move |conn| {
359 conn.execute(query, params)?;
360 Ok(conn.last_insert_rowid())
361 })
362 .await
363 }
364
365 pub async fn query_map<T, F, G, H>(
369 &self,
370 sql: &str,
371 params: impl rusqlite::Params + Send,
372 f: F,
373 g: G,
374 ) -> Result<H>
375 where
376 F: Send + FnMut(&rusqlite::Row) -> Result<T>,
377 G: Send + FnOnce(rusqlite::AndThenRows<F>) -> Result<H>,
378 H: Send + 'static,
379 {
380 let query_only = true;
381 self.call(query_only, move |conn| {
382 let mut stmt = conn.prepare(sql)?;
383 let res = stmt.query_and_then(params, f)?;
384 g(res)
385 })
386 .await
387 }
388
389 pub async fn query_map_collect<T, C, F>(
393 &self,
394 sql: &str,
395 params: impl rusqlite::Params + Send,
396 f: F,
397 ) -> Result<C>
398 where
399 T: Send + 'static,
400 C: Send + 'static + std::iter::FromIterator<T>,
401 F: Send + FnMut(&rusqlite::Row) -> Result<T>,
402 {
403 self.query_map(sql, params, f, |rows| {
404 rows.collect::<std::result::Result<C, _>>()
405 })
406 .await
407 }
408
409 pub async fn query_map_vec<T, F>(
413 &self,
414 sql: &str,
415 params: impl rusqlite::Params + Send,
416 f: F,
417 ) -> Result<Vec<T>>
418 where
419 T: Send + 'static,
420 F: Send + FnMut(&rusqlite::Row) -> Result<T>,
421 {
422 self.query_map_collect(sql, params, f).await
423 }
424
425 pub async fn count(&self, query: &str, params: impl rusqlite::Params + Send) -> Result<usize> {
427 let count: isize = self.query_row(query, params, |row| row.get(0)).await?;
428 Ok(usize::try_from(count)?)
429 }
430
431 pub async fn exists(&self, sql: &str, params: impl rusqlite::Params + Send) -> Result<bool> {
434 let count = self.count(sql, params).await?;
435 Ok(count > 0)
436 }
437
438 pub async fn query_row<T, F>(
440 &self,
441 query: &str,
442 params: impl rusqlite::Params + Send,
443 f: F,
444 ) -> Result<T>
445 where
446 F: FnOnce(&rusqlite::Row) -> rusqlite::Result<T> + Send,
447 T: Send + 'static,
448 {
449 let query_only = true;
450 self.call(query_only, move |conn| {
451 let res = conn.query_row(query, params, f)?;
452 Ok(res)
453 })
454 .await
455 }
456
457 pub async fn transaction<G, H>(&self, callback: G) -> Result<H>
462 where
463 H: Send + 'static,
464 G: Send + FnOnce(&mut rusqlite::Transaction<'_>) -> Result<H>,
465 {
466 let query_only = false;
467 self.transaction_ex(query_only, callback).await
468 }
469
470 pub async fn transaction_ex<G, H>(&self, query_only: bool, callback: G) -> Result<H>
485 where
486 H: Send + 'static,
487 G: Send + FnOnce(&mut rusqlite::Transaction<'_>) -> Result<H>,
488 {
489 self.call(query_only, move |conn| {
490 let mut transaction = conn.transaction()?;
491 let ret = callback(&mut transaction);
492
493 match ret {
494 Ok(ret) => {
495 transaction.commit()?;
496 Ok(ret)
497 }
498 Err(err) => {
499 transaction.rollback()?;
500 Err(err)
501 }
502 }
503 })
504 .await
505 }
506
507 pub async fn table_exists(&self, name: &str) -> Result<bool> {
509 let query_only = true;
510 self.call(query_only, move |conn| {
511 let mut exists = false;
512 conn.pragma(None, "table_info", name.to_string(), |_row| {
513 exists = true;
515 Ok(())
516 })?;
517
518 Ok(exists)
519 })
520 .await
521 }
522
523 pub async fn col_exists(&self, table_name: &str, col_name: &str) -> Result<bool> {
525 let query_only = true;
526 self.call(query_only, move |conn| {
527 let mut exists = false;
528 conn.pragma(None, "table_info", table_name.to_string(), |row| {
531 let curr_name: String = row.get(1)?;
532 if col_name == curr_name {
533 exists = true;
534 }
535 Ok(())
536 })?;
537
538 Ok(exists)
539 })
540 .await
541 }
542
543 pub async fn query_row_optional<T, F>(
545 &self,
546 sql: &str,
547 params: impl rusqlite::Params + Send,
548 f: F,
549 ) -> Result<Option<T>>
550 where
551 F: Send + FnOnce(&rusqlite::Row) -> rusqlite::Result<T>,
552 T: Send + 'static,
553 {
554 let query_only = true;
555 self.call(query_only, move |conn| {
556 match conn.query_row(sql.as_ref(), params, f) {
557 Ok(res) => Ok(Some(res)),
558 Err(rusqlite::Error::QueryReturnedNoRows) => Ok(None),
559 Err(err) => Err(err.into()),
560 }
561 })
562 .await
563 }
564
565 pub async fn query_get_value<T>(
568 &self,
569 query: &str,
570 params: impl rusqlite::Params + Send,
571 ) -> Result<Option<T>>
572 where
573 T: rusqlite::types::FromSql + Send + 'static,
574 {
575 self.query_row_optional(query, params, |row| row.get::<_, T>(0))
576 .await
577 }
578
579 pub async fn set_raw_config(&self, key: &str, value: Option<&str>) -> Result<()> {
584 let mut lock = self.config_cache.write().await;
585 if let Some(value) = value {
586 self.execute(
587 "INSERT OR REPLACE INTO config (keyname, value) VALUES (?, ?)",
588 (key, value),
589 )
590 .await?;
591 } else {
592 self.execute("DELETE FROM config WHERE keyname=?", (key,))
593 .await?;
594 }
595 lock.insert(key.to_string(), value.map(|s| s.to_string()));
596 drop(lock);
597
598 Ok(())
599 }
600
601 pub async fn get_raw_config(&self, key: &str) -> Result<Option<String>> {
603 let lock = self.config_cache.read().await;
604 let cached = lock.get(key).cloned();
605 drop(lock);
606
607 if let Some(c) = cached {
608 return Ok(c);
609 }
610
611 let mut lock = self.config_cache.write().await;
612 let value = self
613 .query_get_value("SELECT value FROM config WHERE keyname=?", (key,))
614 .await
615 .context(format!("failed to fetch raw config: {key}"))?;
616 lock.insert(key.to_string(), value.clone());
617 drop(lock);
618
619 Ok(value)
620 }
621
622 pub(crate) async fn uncache_raw_config(&self, key: &str) {
624 let mut lock = self.config_cache.write().await;
625 lock.remove(key);
626 }
627
628 pub async fn set_raw_config_int(&self, key: &str, value: i32) -> Result<()> {
630 self.set_raw_config(key, Some(&format!("{value}"))).await
631 }
632
633 pub async fn get_raw_config_int(&self, key: &str) -> Result<Option<i32>> {
635 self.get_raw_config(key)
636 .await
637 .map(|s| s.and_then(|s| s.parse().ok()))
638 }
639
640 pub async fn get_raw_config_u32(&self, key: &str) -> Result<Option<u32>> {
642 self.get_raw_config(key)
643 .await
644 .map(|s| s.and_then(|s| s.parse().ok()))
645 }
646
647 pub async fn get_raw_config_bool(&self, key: &str) -> Result<bool> {
649 let res = self.get_raw_config_int(key).await?;
652 Ok(res.unwrap_or_default() > 0)
653 }
654
655 pub async fn set_raw_config_bool(&self, key: &str, value: bool) -> Result<()> {
657 let value = if value { Some("1") } else { None };
658 self.set_raw_config(key, value).await
659 }
660
661 pub async fn set_raw_config_int64(&self, key: &str, value: i64) -> Result<()> {
663 self.set_raw_config(key, Some(&format!("{value}"))).await
664 }
665
666 pub async fn get_raw_config_int64(&self, key: &str) -> Result<Option<i64>> {
668 self.get_raw_config(key)
669 .await
670 .map(|s| s.and_then(|r| r.parse().ok()))
671 }
672
673 #[cfg(feature = "internals")]
675 pub fn config_cache(&self) -> &RwLock<HashMap<String, Option<String>>> {
676 &self.config_cache
677 }
678
679 pub(crate) async fn wal_checkpoint(context: &Context) -> Result<()> {
681 let t_start = Time::now();
682 let lock = context.sql.pool.read().await;
683 let Some(pool) = lock.as_ref() else {
684 return Ok(());
686 };
687
688 let query_only = true;
690 let conn = pool.get(query_only).await?;
691 tokio::task::block_in_place(|| {
692 conn.query_row("PRAGMA table_list", [], |_| Ok(()))?;
696 conn.query_row("PRAGMA wal_checkpoint(PASSIVE)", [], |_| Ok(()))
697 })?;
698
699 const _: () = assert!(Sql::N_DB_CONNECTIONS > 1, "Deadlock possible");
701 let _write_lock = pool.write_lock().await;
702 let t_writers_blocked = Time::now();
703 let mut read_conns = Vec::with_capacity(Self::N_DB_CONNECTIONS - 1);
709 for _ in 0..(Self::N_DB_CONNECTIONS - 1) {
710 read_conns.push(pool.get(query_only).await?);
711 }
712 read_conns.clear();
713 let (pages_total, pages_checkpointed) = tokio::task::block_in_place(|| {
715 conn.query_row("PRAGMA wal_checkpoint(FULL)", [], |row| {
716 let pages_total: i64 = row.get(1)?;
717 let pages_checkpointed: i64 = row.get(2)?;
718 Ok((pages_total, pages_checkpointed))
719 })
720 })?;
721 if pages_checkpointed < pages_total {
722 warn!(
723 context,
724 "Cannot checkpoint whole WAL. Pages total: {pages_total}, checkpointed: {pages_checkpointed}. Make sure there are no external connections running transactions.",
725 );
726 }
727 for _ in 0..(Self::N_DB_CONNECTIONS - 1) {
729 read_conns.push(pool.get(query_only).await?);
730 }
731 let t_readers_blocked = Time::now();
732 tokio::task::block_in_place(|| {
733 let blocked = conn.query_row("PRAGMA wal_checkpoint(TRUNCATE)", [], |row| {
734 let blocked: i64 = row.get(0)?;
735 Ok(blocked)
736 })?;
737 ensure!(blocked == 0);
738 Ok(())
739 })?;
740 info!(
741 context,
742 "wal_checkpoint: Total time: {:?}. Writers blocked for: {:?}. Readers blocked for: {:?}.",
743 time_elapsed(&t_start),
744 time_elapsed(&t_writers_blocked),
745 time_elapsed(&t_readers_blocked),
746 );
747 Ok(())
748 }
749}
750
751fn new_connection(path: &Path, passphrase: &str) -> Result<Connection> {
758 let flags = OpenFlags::SQLITE_OPEN_NO_MUTEX
759 | OpenFlags::SQLITE_OPEN_READ_WRITE
760 | OpenFlags::SQLITE_OPEN_CREATE;
761 let conn = Connection::open_with_flags(path, flags)?;
762 conn.execute_batch(
763 "PRAGMA cipher_memory_security = OFF; -- Too slow on Android
764 PRAGMA secure_delete=on;
765 PRAGMA busy_timeout = 0; -- fail immediately
766 PRAGMA soft_heap_limit = 8388608; -- 8 MiB limit, same as set in Android SQLiteDatabase.
767 PRAGMA foreign_keys=on;
768 ",
769 )?;
770
771 if cfg!(not(target_os = "ios")) {
775 conn.pragma_update(None, "temp_store", "memory")?;
776 }
777
778 if !passphrase.is_empty() {
779 conn.pragma_update(None, "key", passphrase)?;
780 }
781 conn.pragma_update(None, "auto_vacuum", "INCREMENTAL".to_string())?;
788
789 conn.pragma_update(None, "journal_mode", "WAL".to_string())?;
790 conn.pragma_update(None, "synchronous", "NORMAL".to_string())?;
792
793 Ok(conn)
794}
795
796async fn incremental_vacuum(context: &Context) -> Result<()> {
800 context
801 .sql
802 .call_write(move |conn| {
803 let mut stmt = conn
804 .prepare("PRAGMA incremental_vacuum")
805 .context("Failed to prepare incremental_vacuum statement")?;
806
807 let mut rows = stmt
811 .query(())
812 .context("Failed to run incremental_vacuum statement")?;
813 let mut row_count = 0;
814 while let Some(_row) = rows
815 .next()
816 .context("Failed to step incremental_vacuum statement")?
817 {
818 row_count += 1;
819 }
820 info!(context, "Incremental vacuum freed {row_count} pages.");
821 Ok(())
822 })
823 .await
824}
825
826pub async fn housekeeping(context: &Context) -> Result<()> {
828 if let Err(e) = context
831 .set_config_internal(Config::LastHousekeeping, Some(&time().to_string()))
832 .await
833 {
834 warn!(context, "Can't set config: {e:#}.");
835 }
836
837 http_cache_cleanup(context)
838 .await
839 .context("Failed to cleanup HTTP cache")
840 .log_err(context)
841 .ok();
842 migrations::msgs_to_key_contacts(context)
843 .await
844 .context("migrations::msgs_to_key_contacts")
845 .log_err(context)
846 .ok();
847
848 if let Err(err) = remove_unused_files(context).await {
849 warn!(
850 context,
851 "Housekeeping: cannot remove unused files: {:#}.", err
852 );
853 }
854
855 if let Err(err) = start_ephemeral_timers(context).await {
856 warn!(
857 context,
858 "Housekeeping: cannot start ephemeral timers: {:#}.", err
859 );
860 }
861
862 if let Err(err) = prune_tombstones(&context.sql).await {
863 warn!(
864 context,
865 "Housekeeping: Cannot prune message tombstones: {:#}.", err
866 );
867 }
868
869 if let Err(err) = incremental_vacuum(context).await {
870 warn!(context, "Failed to run incremental vacuum: {err:#}.");
871 }
872 if let Err(err) = Sql::wal_checkpoint(context).await {
877 warn!(context, "wal_checkpoint() failed: {err:#}.");
878 debug_assert!(false);
879 }
880
881 context
882 .sql
883 .execute(
884 "DELETE FROM msgs_mdns WHERE msg_id NOT IN \
885 (SELECT id FROM msgs WHERE chat_id!=?)",
886 (DC_CHAT_ID_TRASH,),
887 )
888 .await
889 .context("failed to remove old MDNs")
890 .log_err(context)
891 .ok();
892
893 context
894 .sql
895 .execute(
896 "DELETE FROM msgs_status_updates WHERE msg_id NOT IN \
897 (SELECT id FROM msgs WHERE chat_id!=?)",
898 (DC_CHAT_ID_TRASH,),
899 )
900 .await
901 .context("failed to remove old webxdc status updates")
902 .log_err(context)
903 .ok();
904
905 prune_connection_history(context)
906 .await
907 .context("Failed to prune connection history")
908 .log_err(context)
909 .ok();
910 prune_dns_cache(context)
911 .await
912 .context("Failed to prune DNS cache")
913 .log_err(context)
914 .ok();
915
916 delete_orphaned_poi_locations(context)
919 .await
920 .context("Failed to delete orphaned POI locations")
921 .log_err(context)
922 .ok();
923
924 info!(context, "Housekeeping done.");
925 Ok(())
926}
927
928pub fn row_get_vec(row: &Row, idx: usize) -> rusqlite::Result<Vec<u8>> {
930 row.get(idx).or_else(|err| match row.get_ref(idx)? {
931 ValueRef::Null => Ok(Vec::new()),
932 ValueRef::Text(text) => Ok(text.to_vec()),
933 ValueRef::Blob(blob) => Ok(blob.to_vec()),
934 ValueRef::Integer(_) | ValueRef::Real(_) => Err(err),
935 })
936}
937
938pub async fn remove_unused_files(context: &Context) -> Result<()> {
940 let mut files_in_use = HashSet::new();
941 let mut unreferenced_count = 0;
942
943 info!(context, "Start housekeeping...");
944 maybe_add_from_param(
945 &context.sql,
946 &mut files_in_use,
947 "SELECT param FROM msgs WHERE chat_id!=3 AND type!=10;",
948 Param::File,
949 )
950 .await?;
951 maybe_add_from_param(
952 &context.sql,
953 &mut files_in_use,
954 "SELECT param FROM chats;",
955 Param::ProfileImage,
956 )
957 .await?;
958 maybe_add_from_param(
959 &context.sql,
960 &mut files_in_use,
961 "SELECT param FROM contacts;",
962 Param::ProfileImage,
963 )
964 .await?;
965
966 context
967 .sql
968 .query_map(
969 "SELECT value FROM config;",
970 (),
971 |row| {
972 let row: String = row.get(0)?;
973 Ok(row)
974 },
975 |rows| {
976 for row in rows {
977 maybe_add_file(&mut files_in_use, &row?);
978 }
979 Ok(())
980 },
981 )
982 .await
983 .context("housekeeping: failed to SELECT value FROM config")?;
984
985 context
986 .sql
987 .query_map(
988 "SELECT blobname FROM http_cache",
989 (),
990 |row| {
991 let row: String = row.get(0)?;
992 Ok(row)
993 },
994 |rows| {
995 for row in rows {
996 maybe_add_file(&mut files_in_use, &row?);
997 }
998 Ok(())
999 },
1000 )
1001 .await
1002 .context("Failed to SELECT blobname FROM http_cache")?;
1003
1004 info!(context, "{} files in use.", files_in_use.len());
1005 let blobdir = context.get_blobdir();
1007 for p in [&blobdir.join(BLOBS_BACKUP_NAME), blobdir] {
1008 match tokio::fs::read_dir(p).await {
1009 Ok(mut dir_handle) => {
1010 let diff = std::time::Duration::from_secs(60 * 60);
1012 let keep_files_newer_than = SystemTime::now()
1013 .checked_sub(diff)
1014 .unwrap_or(SystemTime::UNIX_EPOCH);
1015
1016 while let Ok(Some(entry)) = dir_handle.next_entry().await {
1017 let name_f = entry.file_name();
1018 let name_s = name_f.to_string_lossy();
1019
1020 if p == blobdir
1021 && (is_file_in_use(&files_in_use, None, &name_s)
1022 || is_file_in_use(&files_in_use, Some(".waveform"), &name_s)
1023 || is_file_in_use(&files_in_use, Some("-preview.jpg"), &name_s))
1024 {
1025 continue;
1026 }
1027
1028 let stats = match tokio::fs::metadata(entry.path()).await {
1029 Err(err) => {
1030 warn!(
1031 context,
1032 "Cannot get metadata for {}: {:#}.",
1033 entry.path().display(),
1034 err
1035 );
1036 continue;
1037 }
1038 Ok(stats) => stats,
1039 };
1040
1041 if stats.is_dir() {
1042 if let Err(e) = tokio::fs::remove_dir(entry.path()).await {
1043 info!(
1046 context,
1047 "Housekeeping: Cannot rmdir {}: {:#}.",
1048 entry.path().display(),
1049 e
1050 );
1051 }
1052 continue;
1053 }
1054
1055 unreferenced_count += 1;
1056 let recently_created = stats.created().is_ok_and(|t| t > keep_files_newer_than);
1057 let recently_modified =
1058 stats.modified().is_ok_and(|t| t > keep_files_newer_than);
1059 let recently_accessed =
1060 stats.accessed().is_ok_and(|t| t > keep_files_newer_than);
1061
1062 if p == blobdir && (recently_created || recently_modified || recently_accessed)
1063 {
1064 info!(
1065 context,
1066 "Housekeeping: Keeping new unreferenced file #{}: {:?}.",
1067 unreferenced_count,
1068 entry.file_name(),
1069 );
1070 continue;
1071 }
1072
1073 info!(
1074 context,
1075 "Housekeeping: Deleting unreferenced file #{}: {:?}.",
1076 unreferenced_count,
1077 entry.file_name()
1078 );
1079 let path = entry.path();
1080 if let Err(err) = delete_file(context, &path).await {
1081 error!(
1082 context,
1083 "Failed to delete unused file {}: {:#}.",
1084 path.display(),
1085 err
1086 );
1087 }
1088 }
1089 }
1090 Err(err) => {
1091 if !p.ends_with(BLOBS_BACKUP_NAME) {
1092 warn!(
1093 context,
1094 "Housekeeping: Cannot read dir {}: {:#}.",
1095 p.display(),
1096 err
1097 );
1098 }
1099 }
1100 }
1101 }
1102
1103 Ok(())
1104}
1105
1106fn is_file_in_use(files_in_use: &HashSet<String>, namespc_opt: Option<&str>, name: &str) -> bool {
1107 let name_to_check = if let Some(namespc) = namespc_opt {
1108 let Some(name) = name.strip_suffix(namespc) else {
1109 return false;
1110 };
1111 name
1112 } else {
1113 name
1114 };
1115 files_in_use.contains(name_to_check)
1116}
1117
1118fn maybe_add_file(files_in_use: &mut HashSet<String>, file: &str) {
1119 if let Some(file) = file.strip_prefix("$BLOBDIR/") {
1120 files_in_use.insert(file.to_string());
1121 }
1122}
1123
1124async fn maybe_add_from_param(
1125 sql: &Sql,
1126 files_in_use: &mut HashSet<String>,
1127 query: &str,
1128 param_id: Param,
1129) -> Result<()> {
1130 sql.query_map(
1131 query,
1132 (),
1133 |row| {
1134 let row: String = row.get(0)?;
1135 Ok(row)
1136 },
1137 |rows| {
1138 for row in rows {
1139 let param: Params = row?.parse().unwrap_or_default();
1140 if let Some(file) = param.get(param_id) {
1141 maybe_add_file(files_in_use, file);
1142 }
1143 }
1144 Ok(())
1145 },
1146 )
1147 .await
1148 .context(format!("housekeeping: failed to add_from_param {query}"))?;
1149
1150 Ok(())
1151}
1152
1153async fn prune_tombstones(sql: &Sql) -> Result<()> {
1156 let timestamp_max = time().saturating_sub(2 * 24 * 3600);
1158 sql.execute(
1159 "DELETE FROM msgs
1160 WHERE chat_id=?
1161 AND timestamp<=?
1162 AND NOT EXISTS (
1163 SELECT * FROM imap WHERE msgs.rfc724_mid=rfc724_mid AND target!=''
1164 )",
1165 (DC_CHAT_ID_TRASH, timestamp_max),
1166 )
1167 .await?;
1168 Ok(())
1169}
1170
1171#[cfg(test)]
1172mod sql_tests;