1use std::collections::{HashMap, HashSet};
4use std::path::{Path, PathBuf};
5use std::time::Duration;
6
7use anyhow::{Context as _, Result, bail};
8use rusqlite::{Connection, OpenFlags, Row, config::DbConfig, types::ValueRef};
9use tokio::sync::RwLock;
10
11use crate::blob::BlobObject;
12use crate::chat::add_device_msg;
13use crate::config::Config;
14use crate::constants::DC_CHAT_ID_TRASH;
15use crate::context::Context;
16use crate::debug_logging::set_debug_logging_xdc;
17use crate::ephemeral::start_ephemeral_timers;
18use crate::imex::BLOBS_BACKUP_NAME;
19use crate::location::delete_orphaned_poi_locations;
20use crate::log::{LogExt, warn};
21use crate::message::Message;
22use crate::message::MsgId;
23use crate::net::dns::prune_dns_cache;
24use crate::net::http::http_cache_cleanup;
25use crate::net::prune_connection_history;
26use crate::param::{Param, Params};
27use crate::stock_str;
28use crate::tools::{SystemTime, delete_file, time};
29
/// Helper trait for thread-safe SQL parameters.
///
/// Like [`rusqlite::ToSql`], but additionally requires `Send + Sync`
/// so parameters can cross task/thread boundaries.
pub trait ToSql: rusqlite::ToSql + Send + Sync {}

// Blanket implementation: every thread-safe `rusqlite::ToSql` value qualifies.
impl<T: rusqlite::ToSql + Send + Sync> ToSql for T {}
35
/// Constructs an array of `&dyn ToSql` references from the given expressions,
/// suitable for passing a dynamically built parameter list to SQL calls.
#[macro_export]
macro_rules! params_slice {
    ($($param:expr),+) => {
        [$(&$param as &dyn $crate::sql::ToSql),+]
    };
}
47
48mod migrations;
49mod pool;
50
51use pool::{Pool, WalCheckpointStats};
52
/// A wrapper around the underlying SQLite connection pool and its caches.
#[derive(Debug)]
pub struct Sql {
    /// Path to the database file on disk.
    pub(crate) dbfile: PathBuf,

    /// Connection pool; `None` while the database is closed.
    pool: RwLock<Option<Pool>>,

    /// Whether the database was opened with a non-empty passphrase.
    /// `None` until the database has been opened.
    is_encrypted: RwLock<Option<bool>>,

    /// In-memory cache of the `config` table, keyed by config key.
    /// A cached `None` value records the absence of a key.
    pub(crate) config_cache: RwLock<HashMap<String, Option<String>>>,
}
69
70impl Sql {
71 pub fn new(dbfile: PathBuf) -> Sql {
73 Self {
74 dbfile,
75 pool: Default::default(),
76 is_encrypted: Default::default(),
77 config_cache: Default::default(),
78 }
79 }
80
    /// Checks whether `passphrase` unlocks the database file
    /// without installing a connection pool.
    ///
    /// Returns `Ok(true)` if the key is correct, `Ok(false)` otherwise.
    /// Fails if the database is already open.
    pub async fn check_passphrase(&self, passphrase: String) -> Result<bool> {
        if self.is_open().await {
            bail!("Database is already opened.");
        }

        // Hold the pool write lock so nobody opens the database concurrently.
        let _lock = self.pool.write().await;

        // Open a throwaway connection just for the key check.
        let connection = Connection::open(&self.dbfile)?;
        if !passphrase.is_empty() {
            connection
                .pragma_update(None, "key", &passphrase)
                .context("Failed to set PRAGMA key")?;
        }
        // Reading the schema fails if the key is wrong (SQLCipher), so this
        // doubles as the passphrase check.
        let key_is_correct = connection
            .query_row("SELECT count(*) FROM sqlite_master", [], |_row| Ok(()))
            .is_ok();

        Ok(key_is_correct)
    }
109
110 pub async fn is_open(&self) -> bool {
112 self.pool.read().await.is_some()
113 }
114
115 pub(crate) async fn is_encrypted(&self) -> Option<bool> {
119 *self.is_encrypted.read().await
120 }
121
122 pub(crate) async fn close(&self) {
124 let _ = self.pool.write().await.take();
125 }
127
    /// Imports the database from the backup file at `path`,
    /// unlocked with `passphrase`, overwriting the current contents.
    pub(crate) async fn import(&self, path: &Path, passphrase: String) -> Result<()> {
        let path_str = path
            .to_str()
            .with_context(|| format!("path {path:?} is not valid unicode"))?
            .to_string();

        // The cache may hold values from the soon-to-be-overwritten database;
        // clear it so stale values cannot leak through.
        let mut config_cache = self.config_cache.write().await;
        config_cache.clear();

        let query_only = false;
        self.call(query_only, move |conn| {
            // Attach the backup and verify its passphrase
            // before destroying the main database.
            conn.execute("ATTACH DATABASE ? AS backup KEY ?", (path_str, passphrase))
                .context("failed to attach backup database")?;
            let res = conn
                .query_row("SELECT count(*) FROM sqlite_master", [], |_row| Ok(()))
                .context("backup passphrase is not correct");

            // NOTE: `.and()` evaluates its argument eagerly, so the
            // RESET_DATABASE unset and the final DETACH run even when an
            // earlier step failed — this is deliberate cleanup behavior.
            res.and_then(|_| {
                conn.set_db_config(DbConfig::SQLITE_DBCONFIG_RESET_DATABASE, true)
                    .context("failed to set SQLITE_DBCONFIG_RESET_DATABASE")
            })
            .and_then(|_| {
                // With RESET_DATABASE set, VACUUM empties the main database.
                conn.execute("VACUUM", [])
                    .context("failed to vacuum the database")
            })
            .and(
                conn.set_db_config(DbConfig::SQLITE_DBCONFIG_RESET_DATABASE, false)
                    .context("failed to unset SQLITE_DBCONFIG_RESET_DATABASE"),
            )
            .and_then(|_| {
                // Copy all contents of the attached backup into `main`.
                conn.query_row("SELECT sqlcipher_export('main', 'backup')", [], |_row| {
                    Ok(())
                })
                .context("failed to import from attached backup database")
            })
            .and(
                conn.execute("DETACH DATABASE backup", [])
                    .context("failed to detach backup database"),
            )?;
            Ok(())
        })
        .await
    }
179
180 const N_DB_CONNECTIONS: usize = 3;
181
182 fn new_pool(dbfile: &Path, passphrase: String) -> Result<Pool> {
184 let mut connections = Vec::with_capacity(Self::N_DB_CONNECTIONS);
185 for _ in 0..Self::N_DB_CONNECTIONS {
186 let connection = new_connection(dbfile, &passphrase)?;
187 connections.push(connection);
188 }
189
190 let pool = Pool::new(connections);
191 Ok(pool)
192 }
193
194 async fn try_open(&self, context: &Context, dbfile: &Path, passphrase: String) -> Result<()> {
195 *self.pool.write().await = Some(Self::new_pool(dbfile, passphrase.to_string())?);
196
197 if let Err(e) = self.run_migrations(context).await {
198 error!(context, "Running migrations failed: {e:#}");
199 eprintln!("Running migrations failed: {e:#}");
204 context.set_migration_error(&format!("Updating Delta Chat failed. Please send this message to the Delta Chat developers, either at delta@merlinux.eu or at https://support.delta.chat.\n\n{e:#}"));
205 }
210
211 Ok(())
212 }
213
    /// Runs pending database migrations.
    ///
    /// If a migration requires re-encoding the self-avatar
    /// (e.g. after a size-limit change), the avatar is recoded here;
    /// if recoding fails, the avatar is removed instead.
    pub async fn run_migrations(&self, context: &Context) -> Result<()> {
        let recode_avatar = migrations::run(context, self)
            .await
            .context("failed to run migrations")?;

        if recode_avatar && let Some(avatar) = context.get_config(Config::Selfavatar).await? {
            let mut blob = BlobObject::from_path(context, Path::new(&avatar))?;
            match blob.recode_to_avatar_size(context).await {
                Ok(()) => {
                    if let Some(path) = blob.to_abs_path().to_str() {
                        // Re-set the config so the recoded file path is stored.
                        context
                            .set_config_internal(Config::Selfavatar, Some(path))
                            .await?;
                    } else {
                        warn!(context, "Setting selfavatar failed: non-UTF-8 filename");
                    }
                }
                Err(e) => {
                    // Better no avatar than a broken one.
                    warn!(context, "Migrations can't recode avatar, removing. {:#}", e);
                    context
                        .set_config_internal(Config::Selfavatar, None)
                        .await?
                }
            }
        }

        Ok(())
    }
250
    /// Opens the database with the given passphrase
    /// (empty string for an unencrypted database).
    ///
    /// Fails if the database is already open. On success, records the
    /// encryption state and re-enables webxdc debug logging if configured.
    pub async fn open(&self, context: &Context, passphrase: String) -> Result<()> {
        if self.is_open().await {
            error!(
                context,
                "Cannot open, database \"{:?}\" already opened.", self.dbfile,
            );
            bail!("SQL database is already opened.");
        }

        // Remember before `passphrase` is moved into `try_open()`.
        let passphrase_nonempty = !passphrase.is_empty();
        self.try_open(context, &self.dbfile, passphrase).await?;
        info!(context, "Opened database {:?}.", self.dbfile);
        *self.is_encrypted.write().await = Some(passphrase_nonempty);

        // Restore the debug-logging webxdc if one was configured previously.
        if let Some(xdc_id) = self
            .get_raw_config_u32(Config::DebugLogging.as_ref())
            .await?
        {
            set_debug_logging_xdc(context, Some(MsgId::new(xdc_id))).await?;
        }
        Ok(())
    }
276
277 pub async fn change_passphrase(&self, passphrase: String) -> Result<()> {
283 let mut lock = self.pool.write().await;
284
285 let pool = lock.take().context("SQL connection pool is not open")?;
286 let query_only = false;
287 let conn = pool.get(query_only).await?;
288 if !passphrase.is_empty() {
289 conn.pragma_update(None, "rekey", passphrase.clone())
290 .context("Failed to set PRAGMA rekey")?;
291 }
292 drop(pool);
293
294 *lock = Some(Self::new_pool(&self.dbfile, passphrase.to_string())?);
295
296 Ok(())
297 }
298
    /// Runs `function` on a connection borrowed from the pool.
    ///
    /// `query_only` selects a read-only connection when true.
    /// rusqlite is blocking, so the closure runs inside
    /// `block_in_place` to avoid stalling the async executor.
    async fn call<'a, F, R>(&'a self, query_only: bool, function: F) -> Result<R>
    where
        F: 'a + FnOnce(&mut Connection) -> Result<R> + Send,
        R: Send + 'static,
    {
        // Read lock only: multiple callers may borrow connections concurrently.
        let lock = self.pool.read().await;
        let pool = lock.as_ref().context("no SQL connection")?;
        let mut conn = pool.get(query_only).await?;
        let res = tokio::task::block_in_place(move || function(&mut conn))?;
        Ok(res)
    }
316
317 pub async fn call_write<'a, F, R>(&'a self, function: F) -> Result<R>
322 where
323 F: 'a + FnOnce(&mut Connection) -> Result<R> + Send,
324 R: Send + 'static,
325 {
326 let query_only = false;
327 self.call(query_only, function).await
328 }
329
330 pub async fn execute(
332 &self,
333 query: &str,
334 params: impl rusqlite::Params + Send,
335 ) -> Result<usize> {
336 self.call_write(move |conn| {
337 let res = conn.execute(query, params)?;
338 Ok(res)
339 })
340 .await
341 }
342
343 pub async fn insert(&self, query: &str, params: impl rusqlite::Params + Send) -> Result<i64> {
345 self.call_write(move |conn| {
346 conn.execute(query, params)?;
347 Ok(conn.last_insert_rowid())
348 })
349 .await
350 }
351
352 pub async fn query_map<T, F, G, H>(
356 &self,
357 sql: &str,
358 params: impl rusqlite::Params + Send,
359 f: F,
360 g: G,
361 ) -> Result<H>
362 where
363 F: Send + FnMut(&rusqlite::Row) -> Result<T>,
364 G: Send + FnOnce(rusqlite::AndThenRows<F>) -> Result<H>,
365 H: Send + 'static,
366 {
367 let query_only = true;
368 self.call(query_only, move |conn| {
369 let mut stmt = conn.prepare(sql)?;
370 let res = stmt.query_and_then(params, f)?;
371 g(res)
372 })
373 .await
374 }
375
376 pub async fn query_map_collect<T, C, F>(
380 &self,
381 sql: &str,
382 params: impl rusqlite::Params + Send,
383 f: F,
384 ) -> Result<C>
385 where
386 T: Send + 'static,
387 C: Send + 'static + std::iter::FromIterator<T>,
388 F: Send + FnMut(&rusqlite::Row) -> Result<T>,
389 {
390 self.query_map(sql, params, f, |rows| {
391 rows.collect::<std::result::Result<C, _>>()
392 })
393 .await
394 }
395
396 pub async fn query_map_vec<T, F>(
400 &self,
401 sql: &str,
402 params: impl rusqlite::Params + Send,
403 f: F,
404 ) -> Result<Vec<T>>
405 where
406 T: Send + 'static,
407 F: Send + FnMut(&rusqlite::Row) -> Result<T>,
408 {
409 self.query_map_collect(sql, params, f).await
410 }
411
412 pub async fn count(&self, query: &str, params: impl rusqlite::Params + Send) -> Result<usize> {
414 let count: isize = self.query_row(query, params, |row| row.get(0)).await?;
415 Ok(usize::try_from(count)?)
416 }
417
418 pub async fn exists(&self, sql: &str, params: impl rusqlite::Params + Send) -> Result<bool> {
421 let count = self.count(sql, params).await?;
422 Ok(count > 0)
423 }
424
425 pub async fn query_row<T, F>(
427 &self,
428 query: &str,
429 params: impl rusqlite::Params + Send,
430 f: F,
431 ) -> Result<T>
432 where
433 F: FnOnce(&rusqlite::Row) -> rusqlite::Result<T> + Send,
434 T: Send + 'static,
435 {
436 let query_only = true;
437 self.call(query_only, move |conn| {
438 let res = conn.query_row(query, params, f)?;
439 Ok(res)
440 })
441 .await
442 }
443
444 pub async fn transaction<G, H>(&self, callback: G) -> Result<H>
449 where
450 H: Send + 'static,
451 G: Send + FnOnce(&mut rusqlite::Transaction<'_>) -> Result<H>,
452 {
453 let query_only = false;
454 self.transaction_ex(query_only, callback).await
455 }
456
457 pub async fn transaction_ex<G, H>(&self, query_only: bool, callback: G) -> Result<H>
472 where
473 H: Send + 'static,
474 G: Send + FnOnce(&mut rusqlite::Transaction<'_>) -> Result<H>,
475 {
476 self.call(query_only, move |conn| {
477 let mut transaction = conn.transaction()?;
478 let ret = callback(&mut transaction);
479
480 match ret {
481 Ok(ret) => {
482 transaction.commit()?;
483 Ok(ret)
484 }
485 Err(err) => {
486 transaction.rollback()?;
487 Err(err)
488 }
489 }
490 })
491 .await
492 }
493
494 pub async fn table_exists(&self, name: &str) -> Result<bool> {
496 let query_only = true;
497 self.call(query_only, move |conn| {
498 let mut exists = false;
499 conn.pragma(None, "table_info", name.to_string(), |_row| {
500 exists = true;
502 Ok(())
503 })?;
504
505 Ok(exists)
506 })
507 .await
508 }
509
510 pub async fn col_exists(&self, table_name: &str, col_name: &str) -> Result<bool> {
512 let query_only = true;
513 self.call(query_only, move |conn| {
514 let mut exists = false;
515 conn.pragma(None, "table_info", table_name.to_string(), |row| {
518 let curr_name: String = row.get(1)?;
519 if col_name == curr_name {
520 exists = true;
521 }
522 Ok(())
523 })?;
524
525 Ok(exists)
526 })
527 .await
528 }
529
530 pub async fn query_row_optional<T, F>(
532 &self,
533 sql: &str,
534 params: impl rusqlite::Params + Send,
535 f: F,
536 ) -> Result<Option<T>>
537 where
538 F: Send + FnOnce(&rusqlite::Row) -> rusqlite::Result<T>,
539 T: Send + 'static,
540 {
541 let query_only = true;
542 self.call(query_only, move |conn| {
543 match conn.query_row(sql.as_ref(), params, f) {
544 Ok(res) => Ok(Some(res)),
545 Err(rusqlite::Error::QueryReturnedNoRows) => Ok(None),
546 Err(err) => Err(err.into()),
547 }
548 })
549 .await
550 }
551
    /// Runs a SELECT statement and returns the first column of its first row,
    /// or `None` if the query yields no rows.
    pub async fn query_get_value<T>(
        &self,
        query: &str,
        params: impl rusqlite::Params + Send,
    ) -> Result<Option<T>>
    where
        T: rusqlite::types::FromSql + Send + 'static,
    {
        self.query_row_optional(query, params, |row| row.get::<_, T>(0))
            .await
    }
565
    /// Sets the raw config value for `key`; `None` deletes the key.
    ///
    /// The config cache write lock is held across the database update
    /// so the cache and the `config` table cannot diverge.
    pub async fn set_raw_config(&self, key: &str, value: Option<&str>) -> Result<()> {
        let mut lock = self.config_cache.write().await;
        if let Some(value) = value {
            self.execute(
                "INSERT OR REPLACE INTO config (keyname, value) VALUES (?, ?)",
                (key, value),
            )
            .await?;
        } else {
            self.execute("DELETE FROM config WHERE keyname=?", (key,))
                .await?;
        }
        // Update the cache only after the database write succeeded.
        lock.insert(key.to_string(), value.map(|s| s.to_string()));
        drop(lock);

        Ok(())
    }
587
    /// Returns the raw config value for `key`, using the in-memory cache.
    ///
    /// On a cache miss the value is read from the `config` table and cached
    /// (including the absence of the key, cached as `None`).
    pub async fn get_raw_config(&self, key: &str) -> Result<Option<String>> {
        // Fast path: read lock only.
        let lock = self.config_cache.read().await;
        let cached = lock.get(key).cloned();
        drop(lock);

        if let Some(c) = cached {
            return Ok(c);
        }

        // Slow path: take the write lock across the database read so a
        // concurrent `set_raw_config()` cannot interleave between query
        // and cache insertion.
        let mut lock = self.config_cache.write().await;
        let value = self
            .query_get_value("SELECT value FROM config WHERE keyname=?", (key,))
            .await
            .context(format!("failed to fetch raw config: {key}"))?;
        lock.insert(key.to_string(), value.clone());
        drop(lock);

        Ok(value)
    }
608
609 pub(crate) async fn uncache_raw_config(&self, key: &str) {
611 let mut lock = self.config_cache.write().await;
612 lock.remove(key);
613 }
614
615 pub async fn set_raw_config_int(&self, key: &str, value: i32) -> Result<()> {
617 self.set_raw_config(key, Some(&format!("{value}"))).await
618 }
619
620 pub async fn get_raw_config_int(&self, key: &str) -> Result<Option<i32>> {
622 self.get_raw_config(key)
623 .await
624 .map(|s| s.and_then(|s| s.parse().ok()))
625 }
626
627 pub async fn get_raw_config_u32(&self, key: &str) -> Result<Option<u32>> {
629 self.get_raw_config(key)
630 .await
631 .map(|s| s.and_then(|s| s.parse().ok()))
632 }
633
634 pub async fn get_raw_config_bool(&self, key: &str) -> Result<bool> {
636 let res = self.get_raw_config_int(key).await?;
639 Ok(res.unwrap_or_default() > 0)
640 }
641
642 pub async fn set_raw_config_bool(&self, key: &str, value: bool) -> Result<()> {
644 let value = if value { Some("1") } else { None };
645 self.set_raw_config(key, value).await
646 }
647
648 pub async fn set_raw_config_int64(&self, key: &str, value: i64) -> Result<()> {
650 self.set_raw_config(key, Some(&format!("{value}"))).await
651 }
652
653 pub async fn get_raw_config_int64(&self, key: &str) -> Result<Option<i64>> {
655 self.get_raw_config(key)
656 .await
657 .map(|s| s.and_then(|r| r.parse().ok()))
658 }
659
    /// Exposes the raw config cache for internal tooling builds only.
    #[cfg(feature = "internals")]
    pub fn config_cache(&self) -> &RwLock<HashMap<String, Option<String>>> {
        &self.config_cache
    }
665
    /// Runs a WAL checkpoint, logging timing statistics.
    ///
    /// A no-op returning `Ok` if the database is closed. Not being able to
    /// checkpoint all pages is logged as a warning but is not an error.
    pub(crate) async fn wal_checkpoint(&self, context: &Context) -> Result<()> {
        let lock = self.pool.read().await;
        let Some(pool) = lock.as_ref() else {
            // Database closed; nothing to checkpoint.
            return Ok(());
        };

        let WalCheckpointStats {
            total_duration,
            writers_blocked_duration,
            readers_blocked_duration,
            pages_total,
            pages_checkpointed,
        } = pool.wal_checkpoint().await?;
        if pages_checkpointed < pages_total {
            warn!(
                context,
                "Cannot checkpoint whole WAL. Pages total: {pages_total}, checkpointed: {pages_checkpointed}. Make sure there are no external connections running transactions.",
            );
        }
        info!(
            context,
            "wal_checkpoint: Total time: {total_duration:?}. Writers blocked for: {writers_blocked_duration:?}. Readers blocked for: {readers_blocked_duration:?}."
        );
        Ok(())
    }
693}
694
/// Opens and configures a single SQLite connection to `path`,
/// keyed with `passphrase` when it is non-empty.
fn new_connection(path: &Path, passphrase: &str) -> Result<Connection> {
    let flags = OpenFlags::SQLITE_OPEN_NO_MUTEX
        | OpenFlags::SQLITE_OPEN_READ_WRITE
        | OpenFlags::SQLITE_OPEN_CREATE;
    let conn = Connection::open_with_flags(path, flags)?;
    // NOTE(review): these PRAGMAs run before the `key` pragma below —
    // presumably none of them touches encrypted pages; confirm for SQLCipher.
    conn.execute_batch(
        "PRAGMA cipher_memory_security = OFF; -- Too slow on Android
         PRAGMA secure_delete=on;
         PRAGMA soft_heap_limit = 8388608; -- 8 MiB limit, same as set in Android SQLiteDatabase.
         PRAGMA foreign_keys=on;
         ",
    )?;

    if cfg!(not(target_os = "ios")) {
        conn.pragma_update(None, "temp_store", "memory")?;
    }

    if cfg!(target_os = "ios") {
        // iOS may suspend the app with a lock held; wait instead of failing.
        conn.busy_timeout(Duration::new(60, 0))?;
    } else {
        conn.busy_timeout(Duration::ZERO)?;
    }

    if !passphrase.is_empty() {
        conn.pragma_update(None, "key", passphrase)?;
    }
    conn.pragma_update(None, "auto_vacuum", "INCREMENTAL".to_string())?;

    conn.pragma_update(None, "journal_mode", "WAL".to_string())?;
    conn.pragma_update(None, "synchronous", "NORMAL".to_string())?;

    Ok(conn)
}
754
/// Reclaims free database pages via `PRAGMA incremental_vacuum`
/// (enabled by the `auto_vacuum = INCREMENTAL` connection setting)
/// and logs how many pages were freed.
#[expect(clippy::arithmetic_side_effects)]
async fn incremental_vacuum(context: &Context) -> Result<()> {
    context
        .sql
        .call_write(move |conn| {
            let mut stmt = conn
                .prepare("PRAGMA incremental_vacuum")
                .context("Failed to prepare incremental_vacuum statement")?;

            // The pragma yields one row per freed page; stepping through
            // the rows performs the actual vacuuming.
            let mut rows = stmt
                .query(())
                .context("Failed to run incremental_vacuum statement")?;
            let mut row_count = 0;
            while let Some(_row) = rows
                .next()
                .context("Failed to step incremental_vacuum statement")?
            {
                row_count += 1;
            }
            info!(context, "Incremental vacuum freed {row_count} pages.");
            Ok(())
        })
        .await
}
785
/// Runs periodic cleanup tasks: unused files, ephemeral timers, tombstones,
/// orphaned MDN/webxdc rows, caches, vacuum and WAL checkpoint.
///
/// Each step is best-effort: failures are logged and the remaining steps
/// still run. Returns `Ok` immediately if housekeeping is already running.
pub async fn housekeeping(context: &Context) -> Result<()> {
    // Skip this run entirely if another housekeeping is in progress.
    let Ok(_housekeeping_lock) = context.housekeeping_mutex.try_lock() else {
        return Ok(());
    };
    if let Err(e) = context
        .set_config_internal(Config::LastHousekeeping, Some(&time().to_string()))
        .await
    {
        warn!(context, "Can't set config: {e:#}.");
    }

    http_cache_cleanup(context)
        .await
        .context("Failed to cleanup HTTP cache")
        .log_err(context)
        .ok();
    migrations::msgs_to_key_contacts(context)
        .await
        .context("migrations::msgs_to_key_contacts")
        .log_err(context)
        .ok();

    if let Err(err) = remove_unused_files(context).await {
        warn!(
            context,
            "Housekeeping: cannot remove unused files: {:#}.", err
        );
    }

    if let Err(err) = start_ephemeral_timers(context).await {
        warn!(
            context,
            "Housekeeping: cannot start ephemeral timers: {:#}.", err
        );
    }

    if let Err(err) = prune_tombstones(&context.sql).await {
        warn!(
            context,
            "Housekeeping: Cannot prune message tombstones: {:#}.", err
        );
    }

    maybe_add_mvbox_move_deprecation_message(context)
        .await
        .context("maybe_add_mvbox_move_deprecation_message")
        .log_err(context)
        .ok();

    if let Err(err) = incremental_vacuum(context).await {
        warn!(context, "Failed to run incremental vacuum: {err:#}.");
    }
    // Checkpoint failures indicate a bug (e.g. a stuck external connection),
    // hence the debug_assert in addition to the warning.
    if let Err(err) = Sql::wal_checkpoint(&context.sql, context).await {
        warn!(context, "wal_checkpoint() failed: {err:#}.");
        debug_assert!(false);
    }

    // Remove MDNs whose message was deleted (moved to the trash chat).
    context
        .sql
        .execute(
            "DELETE FROM msgs_mdns WHERE msg_id NOT IN \
             (SELECT id FROM msgs WHERE chat_id!=?)",
            (DC_CHAT_ID_TRASH,),
        )
        .await
        .context("failed to remove old MDNs")
        .log_err(context)
        .ok();

    // Remove webxdc status updates whose message was deleted.
    context
        .sql
        .execute(
            "DELETE FROM msgs_status_updates WHERE msg_id NOT IN \
             (SELECT id FROM msgs WHERE chat_id!=?)",
            (DC_CHAT_ID_TRASH,),
        )
        .await
        .context("failed to remove old webxdc status updates")
        .log_err(context)
        .ok();

    prune_connection_history(context)
        .await
        .context("Failed to prune connection history")
        .log_err(context)
        .ok();
    prune_dns_cache(context)
        .await
        .context("Failed to prune DNS cache")
        .log_err(context)
        .ok();

    delete_orphaned_poi_locations(context)
        .await
        .context("Failed to delete orphaned POI locations")
        .log_err(context)
        .ok();

    info!(context, "Housekeeping done.");
    Ok(())
}
897
898async fn maybe_add_mvbox_move_deprecation_message(context: &Context) -> Result<()> {
901 if !context.get_config_bool(Config::OnlyFetchMvbox).await?
902 && context.get_config_bool(Config::MvboxMove).await?
903 {
904 let mut msg = Message::new_text(stock_str::mvbox_move_deprecation(context).await);
905 add_device_msg(context, Some("mvbox_move_deprecation"), Some(&mut msg)).await?;
906 }
907 Ok(())
908}
909
910pub fn row_get_vec(row: &Row, idx: usize) -> rusqlite::Result<Vec<u8>> {
912 row.get(idx).or_else(|err| match row.get_ref(idx)? {
913 ValueRef::Null => Ok(Vec::new()),
914 ValueRef::Text(text) => Ok(text.to_vec()),
915 ValueRef::Blob(blob) => Ok(blob.to_vec()),
916 ValueRef::Integer(_) | ValueRef::Real(_) => Err(err),
917 })
918}
919
/// Deletes blobdir files that are no longer referenced by any message,
/// chat, contact, config value or HTTP-cache entry.
///
/// Recently touched files (created/modified/accessed within the last hour)
/// are kept to avoid racing with in-flight operations. The blob backup
/// directory is cleaned unconditionally.
#[expect(clippy::arithmetic_side_effects)]
pub async fn remove_unused_files(context: &Context) -> Result<()> {
    let mut files_in_use = HashSet::new();
    let mut unreferenced_count = 0;

    info!(context, "Start housekeeping...");
    // Collect files referenced from message, chat and contact params.
    maybe_add_from_param(
        &context.sql,
        &mut files_in_use,
        "SELECT param FROM msgs WHERE chat_id!=3 AND type!=10;",
        Param::File,
    )
    .await?;
    maybe_add_from_param(
        &context.sql,
        &mut files_in_use,
        "SELECT param FROM chats;",
        Param::ProfileImage,
    )
    .await?;
    maybe_add_from_param(
        &context.sql,
        &mut files_in_use,
        "SELECT param FROM contacts;",
        Param::ProfileImage,
    )
    .await?;

    // Config values may also reference blobdir files (e.g. the self-avatar).
    context
        .sql
        .query_map(
            "SELECT value FROM config;",
            (),
            |row| {
                let row: String = row.get(0)?;
                Ok(row)
            },
            |rows| {
                for row in rows {
                    maybe_add_file(&mut files_in_use, &row?);
                }
                Ok(())
            },
        )
        .await
        .context("housekeeping: failed to SELECT value FROM config")?;

    // Blobs backing the HTTP cache are in use as well.
    context
        .sql
        .query_map(
            "SELECT blobname FROM http_cache",
            (),
            |row| {
                let row: String = row.get(0)?;
                Ok(row)
            },
            |rows| {
                for row in rows {
                    maybe_add_file(&mut files_in_use, &row?);
                }
                Ok(())
            },
        )
        .await
        .context("Failed to SELECT blobname FROM http_cache")?;

    info!(context, "{} files in use.", files_in_use.len());
    let blobdir = context.get_blobdir();
    // Clean the backup directory first, then the blobdir itself.
    for p in [&blobdir.join(BLOBS_BACKUP_NAME), blobdir] {
        match tokio::fs::read_dir(p).await {
            Ok(mut dir_handle) => {
                // Only files older than one hour are eligible for deletion.
                let diff = std::time::Duration::from_secs(60 * 60);
                let keep_files_newer_than = SystemTime::now()
                    .checked_sub(diff)
                    .unwrap_or(SystemTime::UNIX_EPOCH);

                while let Ok(Some(entry)) = dir_handle.next_entry().await {
                    let name_f = entry.file_name();
                    let name_s = name_f.to_string_lossy();

                    // Referenced files (including derived .waveform and
                    // -preview.jpg variants) are only spared in the blobdir;
                    // everything in the backup dir is fair game.
                    if p == blobdir
                        && (is_file_in_use(&files_in_use, None, &name_s)
                            || is_file_in_use(&files_in_use, Some(".waveform"), &name_s)
                            || is_file_in_use(&files_in_use, Some("-preview.jpg"), &name_s))
                    {
                        continue;
                    }

                    let stats = match tokio::fs::metadata(entry.path()).await {
                        Err(err) => {
                            warn!(
                                context,
                                "Cannot get metadata for {}: {:#}.",
                                entry.path().display(),
                                err
                            );
                            continue;
                        }
                        Ok(stats) => stats,
                    };

                    if stats.is_dir() {
                        // Only empty directories can be removed this way;
                        // failures are logged and ignored.
                        if let Err(e) = tokio::fs::remove_dir(entry.path()).await {
                            info!(
                                context,
                                "Housekeeping: Cannot rmdir {}: {:#}.",
                                entry.path().display(),
                                e
                            );
                        }
                        continue;
                    }

                    unreferenced_count += 1;
                    let recently_created = stats.created().is_ok_and(|t| t > keep_files_newer_than);
                    let recently_modified =
                        stats.modified().is_ok_and(|t| t > keep_files_newer_than);
                    let recently_accessed =
                        stats.accessed().is_ok_and(|t| t > keep_files_newer_than);

                    // Keep fresh blobdir files — they may belong to a message
                    // that is still being composed or received.
                    if p == blobdir && (recently_created || recently_modified || recently_accessed)
                    {
                        info!(
                            context,
                            "Housekeeping: Keeping new unreferenced file #{}: {:?}.",
                            unreferenced_count,
                            entry.file_name(),
                        );
                        continue;
                    }

                    info!(
                        context,
                        "Housekeeping: Deleting unreferenced file #{}: {:?}.",
                        unreferenced_count,
                        entry.file_name()
                    );
                    let path = entry.path();
                    if let Err(err) = delete_file(context, &path).await {
                        error!(
                            context,
                            "Failed to delete unused file {}: {:#}.",
                            path.display(),
                            err
                        );
                    }
                }
            }
            Err(err) => {
                // A missing backup directory is normal; only warn otherwise.
                if !p.ends_with(BLOBS_BACKUP_NAME) {
                    warn!(
                        context,
                        "Housekeeping: Cannot read dir {}: {:#}.",
                        p.display(),
                        err
                    );
                }
            }
        }
    }

    Ok(())
}
1088
/// Returns true if `name` is a referenced blobdir file.
///
/// With `namespc_opt` set, `name` must end in that suffix and only the
/// base name (suffix stripped) is looked up in `files_in_use`.
fn is_file_in_use(files_in_use: &HashSet<String>, namespc_opt: Option<&str>, name: &str) -> bool {
    let candidate = match namespc_opt {
        Some(suffix) => match name.strip_suffix(suffix) {
            Some(stripped) => stripped,
            // Wrong suffix: cannot belong to this namespace.
            None => return false,
        },
        None => name,
    };
    files_in_use.contains(candidate)
}
1100
/// Records `file` as in use if it points into the blob directory,
/// i.e. starts with the "$BLOBDIR/" prefix; other values are ignored.
fn maybe_add_file(files_in_use: &mut HashSet<String>, file: &str) {
    let Some(blob_name) = file.strip_prefix("$BLOBDIR/") else {
        return;
    };
    files_in_use.insert(blob_name.to_string());
}
1106
/// Runs `query`, parses each returned string as a `Params` blob
/// and records the file referenced by `param_id` (if any) into
/// `files_in_use`.
async fn maybe_add_from_param(
    sql: &Sql,
    files_in_use: &mut HashSet<String>,
    query: &str,
    param_id: Param,
) -> Result<()> {
    sql.query_map(
        query,
        (),
        |row| {
            let row: String = row.get(0)?;
            Ok(row)
        },
        |rows| {
            for row in rows {
                // Malformed params are treated as empty rather than fatal.
                let param: Params = row?.parse().unwrap_or_default();
                if let Some(file) = param.get(param_id) {
                    maybe_add_file(files_in_use, file);
                }
            }
            Ok(())
        },
    )
    .await
    .context(format!("housekeeping: failed to add_from_param {query}"))?;

    Ok(())
}
1135
/// Deletes tombstones (rows in the trash chat) that are older than two days
/// and have no pending IMAP deletion target left.
async fn prune_tombstones(sql: &Sql) -> Result<()> {
    // Keep tombstones for two days so recently deleted messages
    // are not re-downloaded from the server in the meantime.
    let timestamp_max = time().saturating_sub(2 * 24 * 3600);
    sql.execute(
        "DELETE FROM msgs
         WHERE chat_id=?
         AND timestamp<=?
         AND NOT EXISTS (
         SELECT * FROM imap WHERE msgs.rfc724_mid=rfc724_mid AND target!=''
         )",
        (DC_CHAT_ID_TRASH, timestamp_max),
    )
    .await?;
    Ok(())
}
1153
1154#[cfg(test)]
1155mod sql_tests;