1use std::collections::{HashMap, HashSet};
4use std::path::{Path, PathBuf};
5
6use anyhow::{Context as _, Result, bail};
7use rusqlite::{Connection, OpenFlags, Row, config::DbConfig, types::ValueRef};
8use tokio::sync::RwLock;
9
10use crate::blob::BlobObject;
11use crate::chat::add_device_msg;
12use crate::config::Config;
13use crate::constants::DC_CHAT_ID_TRASH;
14use crate::context::Context;
15use crate::debug_logging::set_debug_logging_xdc;
16use crate::ephemeral::start_ephemeral_timers;
17use crate::imex::BLOBS_BACKUP_NAME;
18use crate::location::delete_orphaned_poi_locations;
19use crate::log::{LogExt, error, info, warn};
20use crate::message::{Message, MsgId};
21use crate::net::dns::prune_dns_cache;
22use crate::net::http::http_cache_cleanup;
23use crate::net::prune_connection_history;
24use crate::param::{Param, Params};
25use crate::stock_str;
26use crate::tools::{SystemTime, delete_file, time};
27
28pub trait ToSql: rusqlite::ToSql + Send + Sync {}
31
32impl<T: rusqlite::ToSql + Send + Sync> ToSql for T {}
33
/// Builds an array of `&dyn ToSql` references from one or more expressions.
///
/// Each argument is borrowed and cast to `&dyn $crate::sql::ToSql`,
/// so values of different concrete types can share one array.
/// At least one argument is required.
#[macro_export]
macro_rules! params_slice {
    ($($param:expr),+) => {
        [$(&$param as &dyn $crate::sql::ToSql),+]
    };
}
45
46mod migrations;
47mod pool;
48
49use pool::Pool;
50
/// Handle to the SQLite database file: owns the connection pool
/// and an in-memory cache of the `config` table.
#[derive(Debug)]
pub struct Sql {
    /// Path of the database file.
    pub(crate) dbfile: PathBuf,

    /// Connection pool; `None` while the database is not open.
    pool: RwLock<Option<Pool>>,

    /// `None` until the database has been opened via `open()`;
    /// afterwards `Some(true)` if a non-empty passphrase was used and `Some(false)` otherwise.
    is_encrypted: RwLock<Option<bool>>,

    /// Cache of `config` table values keyed by `keyname`.
    /// A cached `None` records that the key has no value.
    pub(crate) config_cache: RwLock<HashMap<String, Option<String>>>,
}
67
68impl Sql {
69 pub fn new(dbfile: PathBuf) -> Sql {
71 Self {
72 dbfile,
73 pool: Default::default(),
74 is_encrypted: Default::default(),
75 config_cache: Default::default(),
76 }
77 }
78
79 pub async fn check_passphrase(&self, passphrase: String) -> Result<bool> {
87 if self.is_open().await {
88 bail!("Database is already opened.");
89 }
90
91 let _lock = self.pool.write().await;
93
94 let connection = Connection::open(&self.dbfile)?;
96 if !passphrase.is_empty() {
97 connection
98 .pragma_update(None, "key", &passphrase)
99 .context("Failed to set PRAGMA key")?;
100 }
101 let key_is_correct = connection
102 .query_row("SELECT count(*) FROM sqlite_master", [], |_row| Ok(()))
103 .is_ok();
104
105 Ok(key_is_correct)
106 }
107
108 pub async fn is_open(&self) -> bool {
110 self.pool.read().await.is_some()
111 }
112
113 pub(crate) async fn is_encrypted(&self) -> Option<bool> {
117 *self.is_encrypted.read().await
118 }
119
120 pub(crate) async fn close(&self) {
122 let _ = self.pool.write().await.take();
123 }
125
    /// Imports the database at `path`, encrypted with `passphrase`, into the currently open
    /// database.
    ///
    /// The backup is attached as `backup`, the main database is reset, the content is copied
    /// with `sqlcipher_export`, and the backup is detached again.
    ///
    /// # Errors
    ///
    /// Fails if `path` is not valid Unicode, if no database is open,
    /// or if any of the SQL steps fails.
    pub(crate) async fn import(&self, path: &Path, passphrase: String) -> Result<()> {
        let path_str = path
            .to_str()
            .with_context(|| format!("path {path:?} is not valid unicode"))?
            .to_string();

        // The write guard lives until the function returns, so the config cache stays locked
        // for the whole import. It is cleared right away, before anything can fail.
        let mut config_cache = self.config_cache.write().await;
        config_cache.clear();

        let query_only = false;
        self.call(query_only, move |conn| {
            // Attach the backup database.
            conn.execute("ATTACH DATABASE ? AS backup KEY ?", (path_str, passphrase))
                .context("failed to attach backup database")?;
            // Test query; a failure is reported as a wrong backup passphrase.
            let res = conn
                .query_row("SELECT count(*) FROM sqlite_master", [], |_row| Ok(()))
                .context("backup passphrase is not correct");

            // Reset the main database in place: set SQLITE_DBCONFIG_RESET_DATABASE, VACUUM,
            // then unset the flag (see the SQLite documentation for this option).
            //
            // The `.and_then(..)` steps run only if all previous steps succeeded.
            // The arguments of `.and(..)` are ordinary expressions, so unsetting the flag and
            // detaching the backup are executed even if an earlier step failed;
            // the first error in the chain is the one that is returned.
            res.and_then(|_| {
                conn.set_db_config(DbConfig::SQLITE_DBCONFIG_RESET_DATABASE, true)
                    .context("failed to set SQLITE_DBCONFIG_RESET_DATABASE")
            })
            .and_then(|_| {
                conn.execute("VACUUM", [])
                    .context("failed to vacuum the database")
            })
            .and(
                conn.set_db_config(DbConfig::SQLITE_DBCONFIG_RESET_DATABASE, false)
                    .context("failed to unset SQLITE_DBCONFIG_RESET_DATABASE"),
            )
            .and_then(|_| {
                // Copy the content of the attached backup into the main database.
                conn.query_row("SELECT sqlcipher_export('main', 'backup')", [], |_row| {
                    Ok(())
                })
                .context("failed to import from attached backup database")
            })
            .and(
                conn.execute("DETACH DATABASE backup", [])
                    .context("failed to detach backup database"),
            )?;
            Ok(())
        })
        .await
    }
177
178 fn new_pool(dbfile: &Path, passphrase: String) -> Result<Pool> {
180 let mut connections = Vec::new();
181 for _ in 0..3 {
182 let connection = new_connection(dbfile, &passphrase)?;
183 connections.push(connection);
184 }
185
186 let pool = Pool::new(connections);
187 Ok(pool)
188 }
189
190 async fn try_open(&self, context: &Context, dbfile: &Path, passphrase: String) -> Result<()> {
191 *self.pool.write().await = Some(Self::new_pool(dbfile, passphrase.to_string())?);
192
193 if let Err(e) = self.run_migrations(context).await {
194 error!(context, "Running migrations failed: {e:#}");
195 eprintln!("Running migrations failed: {e:#}");
200 context.set_migration_error(&format!("Updating Delta Chat failed. Please send this message to the Delta Chat developers, either at delta@merlinux.eu or at https://support.delta.chat.\n\n{e:#}"));
201 }
206
207 Ok(())
208 }
209
210 pub async fn run_migrations(&self, context: &Context) -> Result<()> {
212 let (_update_icons, disable_server_delete, recode_avatar) = migrations::run(context, self)
218 .await
219 .context("failed to run migrations")?;
220
221 if disable_server_delete {
225 if context.get_config_delete_server_after().await?.is_some() {
228 let mut msg = Message::new_text(stock_str::delete_server_turned_off(context).await);
229 add_device_msg(context, None, Some(&mut msg)).await?;
230 context
231 .set_config_internal(Config::DeleteServerAfter, Some("0"))
232 .await?;
233 }
234 }
235
236 if recode_avatar {
237 if let Some(avatar) = context.get_config(Config::Selfavatar).await? {
238 let mut blob = BlobObject::from_path(context, Path::new(&avatar))?;
239 match blob.recode_to_avatar_size(context).await {
240 Ok(()) => {
241 if let Some(path) = blob.to_abs_path().to_str() {
242 context
243 .set_config_internal(Config::Selfavatar, Some(path))
244 .await?;
245 } else {
246 warn!(context, "Setting selfavatar failed: non-UTF-8 filename");
247 }
248 }
249 Err(e) => {
250 warn!(context, "Migrations can't recode avatar, removing. {:#}", e);
251 context
252 .set_config_internal(Config::Selfavatar, None)
253 .await?
254 }
255 }
256 }
257 }
258
259 Ok(())
260 }
261
262 pub async fn open(&self, context: &Context, passphrase: String) -> Result<()> {
265 if self.is_open().await {
266 error!(
267 context,
268 "Cannot open, database \"{:?}\" already opened.", self.dbfile,
269 );
270 bail!("SQL database is already opened.");
271 }
272
273 let passphrase_nonempty = !passphrase.is_empty();
274 self.try_open(context, &self.dbfile, passphrase).await?;
275 info!(context, "Opened database {:?}.", self.dbfile);
276 *self.is_encrypted.write().await = Some(passphrase_nonempty);
277
278 if let Some(xdc_id) = self
280 .get_raw_config_u32(Config::DebugLogging.as_ref())
281 .await?
282 {
283 set_debug_logging_xdc(context, Some(MsgId::new(xdc_id))).await?;
284 }
285 Ok(())
286 }
287
288 pub async fn change_passphrase(&self, passphrase: String) -> Result<()> {
294 let mut lock = self.pool.write().await;
295
296 let pool = lock.take().context("SQL connection pool is not open")?;
297 let query_only = false;
298 let conn = pool.get(query_only).await?;
299 if !passphrase.is_empty() {
300 conn.pragma_update(None, "rekey", passphrase.clone())
301 .context("Failed to set PRAGMA rekey")?;
302 }
303 drop(pool);
304
305 *lock = Some(Self::new_pool(&self.dbfile, passphrase.to_string())?);
306
307 Ok(())
308 }
309
310 async fn call<'a, F, R>(&'a self, query_only: bool, function: F) -> Result<R>
317 where
318 F: 'a + FnOnce(&mut Connection) -> Result<R> + Send,
319 R: Send + 'static,
320 {
321 let lock = self.pool.read().await;
322 let pool = lock.as_ref().context("no SQL connection")?;
323 let mut conn = pool.get(query_only).await?;
324 let res = tokio::task::block_in_place(move || function(&mut conn))?;
325 Ok(res)
326 }
327
328 pub async fn call_write<'a, F, R>(&'a self, function: F) -> Result<R>
333 where
334 F: 'a + FnOnce(&mut Connection) -> Result<R> + Send,
335 R: Send + 'static,
336 {
337 let query_only = false;
338 self.call(query_only, function).await
339 }
340
341 pub async fn execute(
343 &self,
344 query: &str,
345 params: impl rusqlite::Params + Send,
346 ) -> Result<usize> {
347 self.call_write(move |conn| {
348 let res = conn.execute(query, params)?;
349 Ok(res)
350 })
351 .await
352 }
353
354 pub async fn insert(&self, query: &str, params: impl rusqlite::Params + Send) -> Result<i64> {
356 self.call_write(move |conn| {
357 conn.execute(query, params)?;
358 Ok(conn.last_insert_rowid())
359 })
360 .await
361 }
362
363 pub async fn query_map<T, F, G, H>(
367 &self,
368 sql: &str,
369 params: impl rusqlite::Params + Send,
370 f: F,
371 mut g: G,
372 ) -> Result<H>
373 where
374 F: Send + FnMut(&rusqlite::Row) -> rusqlite::Result<T>,
375 G: Send + FnMut(rusqlite::MappedRows<F>) -> Result<H>,
376 H: Send + 'static,
377 {
378 let query_only = true;
379 self.call(query_only, move |conn| {
380 let mut stmt = conn.prepare(sql)?;
381 let res = stmt.query_map(params, f)?;
382 g(res)
383 })
384 .await
385 }
386
387 pub async fn count(&self, query: &str, params: impl rusqlite::Params + Send) -> Result<usize> {
389 let count: isize = self.query_row(query, params, |row| row.get(0)).await?;
390 Ok(usize::try_from(count)?)
391 }
392
393 pub async fn exists(&self, sql: &str, params: impl rusqlite::Params + Send) -> Result<bool> {
396 let count = self.count(sql, params).await?;
397 Ok(count > 0)
398 }
399
400 pub async fn query_row<T, F>(
402 &self,
403 query: &str,
404 params: impl rusqlite::Params + Send,
405 f: F,
406 ) -> Result<T>
407 where
408 F: FnOnce(&rusqlite::Row) -> rusqlite::Result<T> + Send,
409 T: Send + 'static,
410 {
411 let query_only = true;
412 self.call(query_only, move |conn| {
413 let res = conn.query_row(query, params, f)?;
414 Ok(res)
415 })
416 .await
417 }
418
419 pub async fn transaction<G, H>(&self, callback: G) -> Result<H>
424 where
425 H: Send + 'static,
426 G: Send + FnOnce(&mut rusqlite::Transaction<'_>) -> Result<H>,
427 {
428 let query_only = false;
429 self.transaction_ex(query_only, callback).await
430 }
431
432 pub async fn transaction_ex<G, H>(&self, query_only: bool, callback: G) -> Result<H>
447 where
448 H: Send + 'static,
449 G: Send + FnOnce(&mut rusqlite::Transaction<'_>) -> Result<H>,
450 {
451 self.call(query_only, move |conn| {
452 let mut transaction = conn.transaction()?;
453 let ret = callback(&mut transaction);
454
455 match ret {
456 Ok(ret) => {
457 transaction.commit()?;
458 Ok(ret)
459 }
460 Err(err) => {
461 transaction.rollback()?;
462 Err(err)
463 }
464 }
465 })
466 .await
467 }
468
469 pub async fn table_exists(&self, name: &str) -> Result<bool> {
471 let query_only = true;
472 self.call(query_only, move |conn| {
473 let mut exists = false;
474 conn.pragma(None, "table_info", name.to_string(), |_row| {
475 exists = true;
477 Ok(())
478 })?;
479
480 Ok(exists)
481 })
482 .await
483 }
484
485 pub async fn col_exists(&self, table_name: &str, col_name: &str) -> Result<bool> {
487 let query_only = true;
488 self.call(query_only, move |conn| {
489 let mut exists = false;
490 conn.pragma(None, "table_info", table_name.to_string(), |row| {
493 let curr_name: String = row.get(1)?;
494 if col_name == curr_name {
495 exists = true;
496 }
497 Ok(())
498 })?;
499
500 Ok(exists)
501 })
502 .await
503 }
504
505 pub async fn query_row_optional<T, F>(
507 &self,
508 sql: &str,
509 params: impl rusqlite::Params + Send,
510 f: F,
511 ) -> Result<Option<T>>
512 where
513 F: Send + FnOnce(&rusqlite::Row) -> rusqlite::Result<T>,
514 T: Send + 'static,
515 {
516 let query_only = true;
517 self.call(query_only, move |conn| {
518 match conn.query_row(sql.as_ref(), params, f) {
519 Ok(res) => Ok(Some(res)),
520 Err(rusqlite::Error::QueryReturnedNoRows) => Ok(None),
521 Err(err) => Err(err.into()),
522 }
523 })
524 .await
525 }
526
527 pub async fn query_get_value<T>(
530 &self,
531 query: &str,
532 params: impl rusqlite::Params + Send,
533 ) -> Result<Option<T>>
534 where
535 T: rusqlite::types::FromSql + Send + 'static,
536 {
537 self.query_row_optional(query, params, |row| row.get::<_, T>(0))
538 .await
539 }
540
541 pub async fn set_raw_config(&self, key: &str, value: Option<&str>) -> Result<()> {
546 let mut lock = self.config_cache.write().await;
547 if let Some(value) = value {
548 self.execute(
549 "INSERT OR REPLACE INTO config (keyname, value) VALUES (?, ?)",
550 (key, value),
551 )
552 .await?;
553 } else {
554 self.execute("DELETE FROM config WHERE keyname=?", (key,))
555 .await?;
556 }
557 lock.insert(key.to_string(), value.map(|s| s.to_string()));
558 drop(lock);
559
560 Ok(())
561 }
562
563 pub async fn get_raw_config(&self, key: &str) -> Result<Option<String>> {
565 let lock = self.config_cache.read().await;
566 let cached = lock.get(key).cloned();
567 drop(lock);
568
569 if let Some(c) = cached {
570 return Ok(c);
571 }
572
573 let mut lock = self.config_cache.write().await;
574 let value = self
575 .query_get_value("SELECT value FROM config WHERE keyname=?", (key,))
576 .await
577 .context(format!("failed to fetch raw config: {key}"))?;
578 lock.insert(key.to_string(), value.clone());
579 drop(lock);
580
581 Ok(value)
582 }
583
584 pub(crate) async fn uncache_raw_config(&self, key: &str) {
586 let mut lock = self.config_cache.write().await;
587 lock.remove(key);
588 }
589
590 pub async fn set_raw_config_int(&self, key: &str, value: i32) -> Result<()> {
592 self.set_raw_config(key, Some(&format!("{value}"))).await
593 }
594
595 pub async fn get_raw_config_int(&self, key: &str) -> Result<Option<i32>> {
597 self.get_raw_config(key)
598 .await
599 .map(|s| s.and_then(|s| s.parse().ok()))
600 }
601
602 pub async fn get_raw_config_u32(&self, key: &str) -> Result<Option<u32>> {
604 self.get_raw_config(key)
605 .await
606 .map(|s| s.and_then(|s| s.parse().ok()))
607 }
608
609 pub async fn get_raw_config_bool(&self, key: &str) -> Result<bool> {
611 let res = self.get_raw_config_int(key).await?;
614 Ok(res.unwrap_or_default() > 0)
615 }
616
617 pub async fn set_raw_config_bool(&self, key: &str, value: bool) -> Result<()> {
619 let value = if value { Some("1") } else { None };
620 self.set_raw_config(key, value).await
621 }
622
623 pub async fn set_raw_config_int64(&self, key: &str, value: i64) -> Result<()> {
625 self.set_raw_config(key, Some(&format!("{value}"))).await
626 }
627
628 pub async fn get_raw_config_int64(&self, key: &str) -> Result<Option<i64>> {
630 self.get_raw_config(key)
631 .await
632 .map(|s| s.and_then(|r| r.parse().ok()))
633 }
634
    /// Returns a reference to the lock that guards the config cache.
    ///
    /// Only compiled when the `internals` feature is enabled.
    #[cfg(feature = "internals")]
    pub fn config_cache(&self) -> &RwLock<HashMap<String, Option<String>>> {
        &self.config_cache
    }
640}
641
642fn new_connection(path: &Path, passphrase: &str) -> Result<Connection> {
649 let flags = OpenFlags::SQLITE_OPEN_NO_MUTEX
650 | OpenFlags::SQLITE_OPEN_READ_WRITE
651 | OpenFlags::SQLITE_OPEN_CREATE;
652 let conn = Connection::open_with_flags(path, flags)?;
653 conn.execute_batch(
654 "PRAGMA cipher_memory_security = OFF; -- Too slow on Android
655 PRAGMA secure_delete=on;
656 PRAGMA busy_timeout = 0; -- fail immediately
657 PRAGMA soft_heap_limit = 8388608; -- 8 MiB limit, same as set in Android SQLiteDatabase.
658 PRAGMA foreign_keys=on;
659 ",
660 )?;
661
662 if cfg!(not(target_os = "ios")) {
666 conn.pragma_update(None, "temp_store", "memory")?;
667 }
668
669 if !passphrase.is_empty() {
670 conn.pragma_update(None, "key", passphrase)?;
671 }
672 conn.pragma_update(None, "auto_vacuum", "INCREMENTAL".to_string())?;
679
680 conn.pragma_update(None, "journal_mode", "WAL".to_string())?;
681 conn.pragma_update(None, "synchronous", "NORMAL".to_string())?;
683
684 Ok(conn)
685}
686
687async fn incremental_vacuum(context: &Context) -> Result<()> {
691 context
692 .sql
693 .call_write(move |conn| {
694 let mut stmt = conn
695 .prepare("PRAGMA incremental_vacuum")
696 .context("Failed to prepare incremental_vacuum statement")?;
697
698 let mut rows = stmt
702 .query(())
703 .context("Failed to run incremental_vacuum statement")?;
704 let mut row_count = 0;
705 while let Some(_row) = rows
706 .next()
707 .context("Failed to step incremental_vacuum statement")?
708 {
709 row_count += 1;
710 }
711 info!(context, "Incremental vacuum freed {row_count} pages.");
712 Ok(())
713 })
714 .await
715}
716
/// Runs the periodic database and blob-directory maintenance steps.
///
/// Every step is best-effort: a failure is logged and the next step still runs.
/// As written, the function always returns `Ok(())`.
pub async fn housekeeping(context: &Context) -> Result<()> {
    // Record the time of this run first.
    if let Err(e) = context
        .set_config_internal(Config::LastHousekeeping, Some(&time().to_string()))
        .await
    {
        warn!(context, "Can't set config: {e:#}.");
    }

    // Errors of the following steps are passed to `log_err` and then discarded with `.ok()`.
    http_cache_cleanup(context)
        .await
        .context("Failed to cleanup HTTP cache")
        .log_err(context)
        .ok();
    migrations::msgs_to_key_contacts(context)
        .await
        .context("migrations::msgs_to_key_contacts")
        .log_err(context)
        .ok();

    if let Err(err) = remove_unused_files(context).await {
        warn!(
            context,
            "Housekeeping: cannot remove unused files: {:#}.", err
        );
    }

    if let Err(err) = start_ephemeral_timers(context).await {
        warn!(
            context,
            "Housekeeping: cannot start ephemeral timers: {:#}.", err
        );
    }

    if let Err(err) = prune_tombstones(&context.sql).await {
        warn!(
            context,
            "Housekeeping: Cannot prune message tombstones: {:#}.", err
        );
    }

    if let Err(err) = incremental_vacuum(context).await {
        warn!(context, "Failed to run incremental vacuum: {err:#}.");
    }

    // Delete MDN rows whose message no longer exists outside the trash chat.
    context
        .sql
        .execute(
            "DELETE FROM msgs_mdns WHERE msg_id NOT IN \
 (SELECT id FROM msgs WHERE chat_id!=?)",
            (DC_CHAT_ID_TRASH,),
        )
        .await
        .context("failed to remove old MDNs")
        .log_err(context)
        .ok();

    // Same for status update rows.
    context
        .sql
        .execute(
            "DELETE FROM msgs_status_updates WHERE msg_id NOT IN \
 (SELECT id FROM msgs WHERE chat_id!=?)",
            (DC_CHAT_ID_TRASH,),
        )
        .await
        .context("failed to remove old webxdc status updates")
        .log_err(context)
        .ok();

    prune_connection_history(context)
        .await
        .context("Failed to prune connection history")
        .log_err(context)
        .ok();
    prune_dns_cache(context)
        .await
        .context("Failed to prune DNS cache")
        .log_err(context)
        .ok();

    delete_orphaned_poi_locations(context)
        .await
        .context("Failed to delete orphaned POI locations")
        .log_err(context)
        .ok();

    info!(context, "Housekeeping done.");
    Ok(())
}
810
811pub fn row_get_vec(row: &Row, idx: usize) -> rusqlite::Result<Vec<u8>> {
813 row.get(idx).or_else(|err| match row.get_ref(idx)? {
814 ValueRef::Null => Ok(Vec::new()),
815 ValueRef::Text(text) => Ok(text.to_vec()),
816 ValueRef::Blob(blob) => Ok(blob.to_vec()),
817 ValueRef::Integer(_) | ValueRef::Real(_) => Err(err),
818 })
819}
820
821pub async fn remove_unused_files(context: &Context) -> Result<()> {
823 let mut files_in_use = HashSet::new();
824 let mut unreferenced_count = 0;
825
826 info!(context, "Start housekeeping...");
827 maybe_add_from_param(
828 &context.sql,
829 &mut files_in_use,
830 "SELECT param FROM msgs WHERE chat_id!=3 AND type!=10;",
831 Param::File,
832 )
833 .await?;
834 maybe_add_from_param(
835 &context.sql,
836 &mut files_in_use,
837 "SELECT param FROM chats;",
838 Param::ProfileImage,
839 )
840 .await?;
841 maybe_add_from_param(
842 &context.sql,
843 &mut files_in_use,
844 "SELECT param FROM contacts;",
845 Param::ProfileImage,
846 )
847 .await?;
848
849 context
850 .sql
851 .query_map(
852 "SELECT value FROM config;",
853 (),
854 |row| row.get::<_, String>(0),
855 |rows| {
856 for row in rows {
857 maybe_add_file(&mut files_in_use, &row?);
858 }
859 Ok(())
860 },
861 )
862 .await
863 .context("housekeeping: failed to SELECT value FROM config")?;
864
865 context
866 .sql
867 .query_map(
868 "SELECT blobname FROM http_cache",
869 (),
870 |row| row.get::<_, String>(0),
871 |rows| {
872 for row in rows {
873 maybe_add_file(&mut files_in_use, &row?);
874 }
875 Ok(())
876 },
877 )
878 .await
879 .context("Failed to SELECT blobname FROM http_cache")?;
880
881 info!(context, "{} files in use.", files_in_use.len());
882 let blobdir = context.get_blobdir();
884 for p in [&blobdir.join(BLOBS_BACKUP_NAME), blobdir] {
885 match tokio::fs::read_dir(p).await {
886 Ok(mut dir_handle) => {
887 let diff = std::time::Duration::from_secs(60 * 60);
889 let keep_files_newer_than = SystemTime::now()
890 .checked_sub(diff)
891 .unwrap_or(SystemTime::UNIX_EPOCH);
892
893 while let Ok(Some(entry)) = dir_handle.next_entry().await {
894 let name_f = entry.file_name();
895 let name_s = name_f.to_string_lossy();
896
897 if p == blobdir
898 && (is_file_in_use(&files_in_use, None, &name_s)
899 || is_file_in_use(&files_in_use, Some(".waveform"), &name_s)
900 || is_file_in_use(&files_in_use, Some("-preview.jpg"), &name_s))
901 {
902 continue;
903 }
904
905 let stats = match tokio::fs::metadata(entry.path()).await {
906 Err(err) => {
907 warn!(
908 context,
909 "Cannot get metadata for {}: {:#}.",
910 entry.path().display(),
911 err
912 );
913 continue;
914 }
915 Ok(stats) => stats,
916 };
917
918 if stats.is_dir() {
919 if let Err(e) = tokio::fs::remove_dir(entry.path()).await {
920 info!(
923 context,
924 "Housekeeping: Cannot rmdir {}: {:#}.",
925 entry.path().display(),
926 e
927 );
928 }
929 continue;
930 }
931
932 unreferenced_count += 1;
933 let recently_created = stats.created().is_ok_and(|t| t > keep_files_newer_than);
934 let recently_modified =
935 stats.modified().is_ok_and(|t| t > keep_files_newer_than);
936 let recently_accessed =
937 stats.accessed().is_ok_and(|t| t > keep_files_newer_than);
938
939 if p == blobdir && (recently_created || recently_modified || recently_accessed)
940 {
941 info!(
942 context,
943 "Housekeeping: Keeping new unreferenced file #{}: {:?}.",
944 unreferenced_count,
945 entry.file_name(),
946 );
947 continue;
948 }
949
950 info!(
951 context,
952 "Housekeeping: Deleting unreferenced file #{}: {:?}.",
953 unreferenced_count,
954 entry.file_name()
955 );
956 let path = entry.path();
957 if let Err(err) = delete_file(context, &path).await {
958 error!(
959 context,
960 "Failed to delete unused file {}: {:#}.",
961 path.display(),
962 err
963 );
964 }
965 }
966 }
967 Err(err) => {
968 if !p.ends_with(BLOBS_BACKUP_NAME) {
969 warn!(
970 context,
971 "Housekeeping: Cannot read dir {}: {:#}.",
972 p.display(),
973 err
974 );
975 }
976 }
977 }
978 }
979
980 Ok(())
981}
982
/// Checks whether `name` refers to a file in `files_in_use`.
///
/// Without `namespc_opt`, `name` itself must be in the set. With `namespc_opt`, `name` must
/// end with that suffix and the part before the suffix must be in the set.
fn is_file_in_use(files_in_use: &HashSet<String>, namespc_opt: Option<&str>, name: &str) -> bool {
    match namespc_opt {
        None => files_in_use.contains(name),
        Some(suffix) => name
            .strip_suffix(suffix)
            .is_some_and(|stem| files_in_use.contains(stem)),
    }
}
994
/// Adds `file` to `files_in_use` if it starts with `$BLOBDIR/`.
///
/// The prefix is stripped before insertion; other values are ignored.
fn maybe_add_file(files_in_use: &mut HashSet<String>, file: &str) {
    let Some(relative) = file.strip_prefix("$BLOBDIR/") else {
        return;
    };
    files_in_use.insert(relative.to_owned());
}
1000
1001async fn maybe_add_from_param(
1002 sql: &Sql,
1003 files_in_use: &mut HashSet<String>,
1004 query: &str,
1005 param_id: Param,
1006) -> Result<()> {
1007 sql.query_map(
1008 query,
1009 (),
1010 |row| row.get::<_, String>(0),
1011 |rows| {
1012 for row in rows {
1013 let param: Params = row?.parse().unwrap_or_default();
1014 if let Some(file) = param.get(param_id) {
1015 maybe_add_file(files_in_use, file);
1016 }
1017 }
1018 Ok(())
1019 },
1020 )
1021 .await
1022 .context(format!("housekeeping: failed to add_from_param {query}"))?;
1023
1024 Ok(())
1025}
1026
1027async fn prune_tombstones(sql: &Sql) -> Result<()> {
1030 let timestamp_max = time().saturating_sub(2 * 24 * 3600);
1032 sql.execute(
1033 "DELETE FROM msgs
1034 WHERE chat_id=?
1035 AND timestamp<=?
1036 AND NOT EXISTS (
1037 SELECT * FROM imap WHERE msgs.rfc724_mid=rfc724_mid AND target!=''
1038 )",
1039 (DC_CHAT_ID_TRASH, timestamp_max),
1040 )
1041 .await?;
1042 Ok(())
1043}
1044
1045#[cfg(test)]
1046mod sql_tests;