1use std::collections::{HashMap, HashSet};
4use std::path::{Path, PathBuf};
5
6use anyhow::{bail, Context as _, Result};
7use rusqlite::{config::DbConfig, types::ValueRef, Connection, OpenFlags, Row};
8use tokio::sync::RwLock;
9
10use crate::blob::BlobObject;
11use crate::chat::{self, add_device_msg, update_device_icon, update_saved_messages_icon};
12use crate::config::Config;
13use crate::constants::DC_CHAT_ID_TRASH;
14use crate::context::Context;
15use crate::debug_logging::set_debug_logging_xdc;
16use crate::ephemeral::start_ephemeral_timers;
17use crate::imex::BLOBS_BACKUP_NAME;
18use crate::location::delete_orphaned_poi_locations;
19use crate::log::LogExt;
20use crate::message::{Message, MsgId};
21use crate::net::dns::prune_dns_cache;
22use crate::net::http::http_cache_cleanup;
23use crate::net::prune_connection_history;
24use crate::param::{Param, Params};
25use crate::peerstate::Peerstate;
26use crate::stock_str;
27use crate::tools::{delete_file, time, SystemTime};
28
29pub trait ToSql: rusqlite::ToSql + Send + Sync {}
32
33impl<T: rusqlite::ToSql + Send + Sync> ToSql for T {}
34
35#[macro_export]
41macro_rules! params_slice {
42 ($($param:expr),+) => {
43 [$(&$param as &dyn $crate::sql::ToSql),+]
44 };
45}
46
47mod migrations;
48mod pool;
49
50use pool::Pool;
51
52#[derive(Debug)]
54pub struct Sql {
55 pub(crate) dbfile: PathBuf,
57
58 pool: RwLock<Option<Pool>>,
60
61 is_encrypted: RwLock<Option<bool>>,
64
65 pub(crate) config_cache: RwLock<HashMap<String, Option<String>>>,
67}
68
69impl Sql {
70 pub fn new(dbfile: PathBuf) -> Sql {
72 Self {
73 dbfile,
74 pool: Default::default(),
75 is_encrypted: Default::default(),
76 config_cache: Default::default(),
77 }
78 }
79
80 pub async fn check_passphrase(&self, passphrase: String) -> Result<bool> {
88 if self.is_open().await {
89 bail!("Database is already opened.");
90 }
91
92 let _lock = self.pool.write().await;
94
95 let connection = Connection::open(&self.dbfile)?;
97 if !passphrase.is_empty() {
98 connection
99 .pragma_update(None, "key", &passphrase)
100 .context("Failed to set PRAGMA key")?;
101 }
102 let key_is_correct = connection
103 .query_row("SELECT count(*) FROM sqlite_master", [], |_row| Ok(()))
104 .is_ok();
105
106 Ok(key_is_correct)
107 }
108
109 pub async fn is_open(&self) -> bool {
111 self.pool.read().await.is_some()
112 }
113
114 pub(crate) async fn is_encrypted(&self) -> Option<bool> {
118 *self.is_encrypted.read().await
119 }
120
121 pub(crate) async fn close(&self) {
123 let _ = self.pool.write().await.take();
124 }
126
127 pub(crate) async fn import(&self, path: &Path, passphrase: String) -> Result<()> {
129 let path_str = path
130 .to_str()
131 .with_context(|| format!("path {path:?} is not valid unicode"))?
132 .to_string();
133
134 let mut config_cache = self.config_cache.write().await;
137 config_cache.clear();
138
139 let query_only = false;
140 self.call(query_only, move |conn| {
141 conn.execute("ATTACH DATABASE ? AS backup KEY ?", (path_str, passphrase))
143 .context("failed to attach backup database")?;
144 let res = conn
145 .query_row("SELECT count(*) FROM sqlite_master", [], |_row| Ok(()))
146 .context("backup passphrase is not correct");
147
148 res.and_then(|_| {
153 conn.set_db_config(DbConfig::SQLITE_DBCONFIG_RESET_DATABASE, true)
154 .context("failed to set SQLITE_DBCONFIG_RESET_DATABASE")
155 })
156 .and_then(|_| {
157 conn.execute("VACUUM", [])
158 .context("failed to vacuum the database")
159 })
160 .and(
161 conn.set_db_config(DbConfig::SQLITE_DBCONFIG_RESET_DATABASE, false)
162 .context("failed to unset SQLITE_DBCONFIG_RESET_DATABASE"),
163 )
164 .and_then(|_| {
165 conn.query_row("SELECT sqlcipher_export('main', 'backup')", [], |_row| {
166 Ok(())
167 })
168 .context("failed to import from attached backup database")
169 })
170 .and(
171 conn.execute("DETACH DATABASE backup", [])
172 .context("failed to detach backup database"),
173 )?;
174 Ok(())
175 })
176 .await
177 }
178
179 fn new_pool(dbfile: &Path, passphrase: String) -> Result<Pool> {
181 let mut connections = Vec::new();
182 for _ in 0..3 {
183 let connection = new_connection(dbfile, &passphrase)?;
184 connections.push(connection);
185 }
186
187 let pool = Pool::new(connections);
188 Ok(pool)
189 }
190
191 async fn try_open(&self, context: &Context, dbfile: &Path, passphrase: String) -> Result<()> {
192 *self.pool.write().await = Some(Self::new_pool(dbfile, passphrase.to_string())?);
193
194 self.run_migrations(context).await?;
195
196 Ok(())
197 }
198
199 pub async fn run_migrations(&self, context: &Context) -> Result<()> {
201 let (recalc_fingerprints, update_icons, disable_server_delete, recode_avatar) =
206 migrations::run(context, self)
207 .await
208 .context("failed to run migrations")?;
209
210 if recalc_fingerprints {
214 info!(context, "[migration] recalc fingerprints");
215 let addrs = self
216 .query_map(
217 "SELECT addr FROM acpeerstates;",
218 (),
219 |row| row.get::<_, String>(0),
220 |addrs| {
221 addrs
222 .collect::<std::result::Result<Vec<_>, _>>()
223 .map_err(Into::into)
224 },
225 )
226 .await?;
227 for addr in &addrs {
228 if let Some(ref mut peerstate) = Peerstate::from_addr(context, addr).await? {
229 peerstate.recalc_fingerprint();
230 peerstate.save_to_db(self).await?;
231 }
232 }
233 }
234
235 if update_icons {
236 update_saved_messages_icon(context).await?;
237 update_device_icon(context).await?;
238 }
239
240 if disable_server_delete {
241 if context.get_config_delete_server_after().await?.is_some() {
244 let mut msg = Message::new_text(stock_str::delete_server_turned_off(context).await);
245 add_device_msg(context, None, Some(&mut msg)).await?;
246 context
247 .set_config_internal(Config::DeleteServerAfter, Some("0"))
248 .await?;
249 }
250 }
251
252 if recode_avatar {
253 if let Some(avatar) = context.get_config(Config::Selfavatar).await? {
254 let mut blob = BlobObject::from_path(context, Path::new(&avatar))?;
255 match blob.recode_to_avatar_size(context).await {
256 Ok(()) => {
257 if let Some(path) = blob.to_abs_path().to_str() {
258 context
259 .set_config_internal(Config::Selfavatar, Some(path))
260 .await?;
261 } else {
262 warn!(context, "Setting selfavatar failed: non-UTF-8 filename");
263 }
264 }
265 Err(e) => {
266 warn!(context, "Migrations can't recode avatar, removing. {:#}", e);
267 context
268 .set_config_internal(Config::Selfavatar, None)
269 .await?
270 }
271 }
272 }
273 }
274
275 Ok(())
276 }
277
278 pub async fn open(&self, context: &Context, passphrase: String) -> Result<()> {
281 if self.is_open().await {
282 error!(
283 context,
284 "Cannot open, database \"{:?}\" already opened.", self.dbfile,
285 );
286 bail!("SQL database is already opened.");
287 }
288
289 let passphrase_nonempty = !passphrase.is_empty();
290 if let Err(err) = self.try_open(context, &self.dbfile, passphrase).await {
291 self.close().await;
292 return Err(err);
293 }
294 info!(context, "Opened database {:?}.", self.dbfile);
295 *self.is_encrypted.write().await = Some(passphrase_nonempty);
296
297 if let Some(xdc_id) = self
299 .get_raw_config_u32(Config::DebugLogging.as_ref())
300 .await?
301 {
302 set_debug_logging_xdc(context, Some(MsgId::new(xdc_id))).await?;
303 }
304 chat::resume_securejoin_wait(context)
305 .await
306 .log_err(context)
307 .ok();
308 Ok(())
309 }
310
311 pub async fn change_passphrase(&self, passphrase: String) -> Result<()> {
317 let mut lock = self.pool.write().await;
318
319 let pool = lock.take().context("SQL connection pool is not open")?;
320 let query_only = false;
321 let conn = pool.get(query_only).await?;
322 if !passphrase.is_empty() {
323 conn.pragma_update(None, "rekey", passphrase.clone())
324 .context("Failed to set PRAGMA rekey")?;
325 }
326 drop(pool);
327
328 *lock = Some(Self::new_pool(&self.dbfile, passphrase.to_string())?);
329
330 Ok(())
331 }
332
333 async fn call<'a, F, R>(&'a self, query_only: bool, function: F) -> Result<R>
340 where
341 F: 'a + FnOnce(&mut Connection) -> Result<R> + Send,
342 R: Send + 'static,
343 {
344 let lock = self.pool.read().await;
345 let pool = lock.as_ref().context("no SQL connection")?;
346 let mut conn = pool.get(query_only).await?;
347 let res = tokio::task::block_in_place(move || function(&mut conn))?;
348 Ok(res)
349 }
350
351 pub async fn call_write<'a, F, R>(&'a self, function: F) -> Result<R>
356 where
357 F: 'a + FnOnce(&mut Connection) -> Result<R> + Send,
358 R: Send + 'static,
359 {
360 let query_only = false;
361 self.call(query_only, function).await
362 }
363
364 pub async fn execute(
366 &self,
367 query: &str,
368 params: impl rusqlite::Params + Send,
369 ) -> Result<usize> {
370 self.call_write(move |conn| {
371 let res = conn.execute(query, params)?;
372 Ok(res)
373 })
374 .await
375 }
376
377 pub async fn insert(&self, query: &str, params: impl rusqlite::Params + Send) -> Result<i64> {
379 self.call_write(move |conn| {
380 conn.execute(query, params)?;
381 Ok(conn.last_insert_rowid())
382 })
383 .await
384 }
385
386 pub async fn query_map<T, F, G, H>(
390 &self,
391 sql: &str,
392 params: impl rusqlite::Params + Send,
393 f: F,
394 mut g: G,
395 ) -> Result<H>
396 where
397 F: Send + FnMut(&rusqlite::Row) -> rusqlite::Result<T>,
398 G: Send + FnMut(rusqlite::MappedRows<F>) -> Result<H>,
399 H: Send + 'static,
400 {
401 let query_only = true;
402 self.call(query_only, move |conn| {
403 let mut stmt = conn.prepare(sql)?;
404 let res = stmt.query_map(params, f)?;
405 g(res)
406 })
407 .await
408 }
409
410 pub async fn count(&self, query: &str, params: impl rusqlite::Params + Send) -> Result<usize> {
412 let count: isize = self.query_row(query, params, |row| row.get(0)).await?;
413 Ok(usize::try_from(count)?)
414 }
415
416 pub async fn exists(&self, sql: &str, params: impl rusqlite::Params + Send) -> Result<bool> {
419 let count = self.count(sql, params).await?;
420 Ok(count > 0)
421 }
422
423 pub async fn query_row<T, F>(
425 &self,
426 query: &str,
427 params: impl rusqlite::Params + Send,
428 f: F,
429 ) -> Result<T>
430 where
431 F: FnOnce(&rusqlite::Row) -> rusqlite::Result<T> + Send,
432 T: Send + 'static,
433 {
434 let query_only = true;
435 self.call(query_only, move |conn| {
436 let res = conn.query_row(query, params, f)?;
437 Ok(res)
438 })
439 .await
440 }
441
442 pub async fn transaction<G, H>(&self, callback: G) -> Result<H>
447 where
448 H: Send + 'static,
449 G: Send + FnOnce(&mut rusqlite::Transaction<'_>) -> Result<H>,
450 {
451 let query_only = false;
452 self.transaction_ex(query_only, callback).await
453 }
454
455 pub async fn transaction_ex<G, H>(&self, query_only: bool, callback: G) -> Result<H>
470 where
471 H: Send + 'static,
472 G: Send + FnOnce(&mut rusqlite::Transaction<'_>) -> Result<H>,
473 {
474 self.call(query_only, move |conn| {
475 let mut transaction = conn.transaction()?;
476 let ret = callback(&mut transaction);
477
478 match ret {
479 Ok(ret) => {
480 transaction.commit()?;
481 Ok(ret)
482 }
483 Err(err) => {
484 transaction.rollback()?;
485 Err(err)
486 }
487 }
488 })
489 .await
490 }
491
492 pub async fn table_exists(&self, name: &str) -> Result<bool> {
494 let query_only = true;
495 self.call(query_only, move |conn| {
496 let mut exists = false;
497 conn.pragma(None, "table_info", name.to_string(), |_row| {
498 exists = true;
500 Ok(())
501 })?;
502
503 Ok(exists)
504 })
505 .await
506 }
507
508 pub async fn col_exists(&self, table_name: &str, col_name: &str) -> Result<bool> {
510 let query_only = true;
511 self.call(query_only, move |conn| {
512 let mut exists = false;
513 conn.pragma(None, "table_info", table_name.to_string(), |row| {
516 let curr_name: String = row.get(1)?;
517 if col_name == curr_name {
518 exists = true;
519 }
520 Ok(())
521 })?;
522
523 Ok(exists)
524 })
525 .await
526 }
527
528 pub async fn query_row_optional<T, F>(
530 &self,
531 sql: &str,
532 params: impl rusqlite::Params + Send,
533 f: F,
534 ) -> Result<Option<T>>
535 where
536 F: Send + FnOnce(&rusqlite::Row) -> rusqlite::Result<T>,
537 T: Send + 'static,
538 {
539 let query_only = true;
540 self.call(query_only, move |conn| {
541 match conn.query_row(sql.as_ref(), params, f) {
542 Ok(res) => Ok(Some(res)),
543 Err(rusqlite::Error::QueryReturnedNoRows) => Ok(None),
544 Err(err) => Err(err.into()),
545 }
546 })
547 .await
548 }
549
550 pub async fn query_get_value<T>(
553 &self,
554 query: &str,
555 params: impl rusqlite::Params + Send,
556 ) -> Result<Option<T>>
557 where
558 T: rusqlite::types::FromSql + Send + 'static,
559 {
560 self.query_row_optional(query, params, |row| row.get::<_, T>(0))
561 .await
562 }
563
564 pub async fn set_raw_config(&self, key: &str, value: Option<&str>) -> Result<()> {
569 let mut lock = self.config_cache.write().await;
570 if let Some(value) = value {
571 self.execute(
572 "INSERT OR REPLACE INTO config (keyname, value) VALUES (?, ?)",
573 (key, value),
574 )
575 .await?;
576 } else {
577 self.execute("DELETE FROM config WHERE keyname=?", (key,))
578 .await?;
579 }
580 lock.insert(key.to_string(), value.map(|s| s.to_string()));
581 drop(lock);
582
583 Ok(())
584 }
585
586 pub async fn get_raw_config(&self, key: &str) -> Result<Option<String>> {
588 let lock = self.config_cache.read().await;
589 let cached = lock.get(key).cloned();
590 drop(lock);
591
592 if let Some(c) = cached {
593 return Ok(c);
594 }
595
596 let mut lock = self.config_cache.write().await;
597 let value = self
598 .query_get_value("SELECT value FROM config WHERE keyname=?", (key,))
599 .await
600 .context(format!("failed to fetch raw config: {key}"))?;
601 lock.insert(key.to_string(), value.clone());
602 drop(lock);
603
604 Ok(value)
605 }
606
607 pub async fn set_raw_config_int(&self, key: &str, value: i32) -> Result<()> {
609 self.set_raw_config(key, Some(&format!("{value}"))).await
610 }
611
612 pub async fn get_raw_config_int(&self, key: &str) -> Result<Option<i32>> {
614 self.get_raw_config(key)
615 .await
616 .map(|s| s.and_then(|s| s.parse().ok()))
617 }
618
619 pub async fn get_raw_config_u32(&self, key: &str) -> Result<Option<u32>> {
621 self.get_raw_config(key)
622 .await
623 .map(|s| s.and_then(|s| s.parse().ok()))
624 }
625
626 pub async fn get_raw_config_bool(&self, key: &str) -> Result<bool> {
628 let res = self.get_raw_config_int(key).await?;
631 Ok(res.unwrap_or_default() > 0)
632 }
633
634 pub async fn set_raw_config_bool(&self, key: &str, value: bool) -> Result<()> {
636 let value = if value { Some("1") } else { None };
637 self.set_raw_config(key, value).await
638 }
639
640 pub async fn set_raw_config_int64(&self, key: &str, value: i64) -> Result<()> {
642 self.set_raw_config(key, Some(&format!("{value}"))).await
643 }
644
645 pub async fn get_raw_config_int64(&self, key: &str) -> Result<Option<i64>> {
647 self.get_raw_config(key)
648 .await
649 .map(|s| s.and_then(|r| r.parse().ok()))
650 }
651
652 #[cfg(feature = "internals")]
654 pub fn config_cache(&self) -> &RwLock<HashMap<String, Option<String>>> {
655 &self.config_cache
656 }
657}
658
659fn new_connection(path: &Path, passphrase: &str) -> Result<Connection> {
666 let flags = OpenFlags::SQLITE_OPEN_NO_MUTEX
667 | OpenFlags::SQLITE_OPEN_READ_WRITE
668 | OpenFlags::SQLITE_OPEN_CREATE;
669 let conn = Connection::open_with_flags(path, flags)?;
670 conn.execute_batch(
671 "PRAGMA cipher_memory_security = OFF; -- Too slow on Android
672 PRAGMA secure_delete=on;
673 PRAGMA busy_timeout = 0; -- fail immediately
674 PRAGMA soft_heap_limit = 8388608; -- 8 MiB limit, same as set in Android SQLiteDatabase.
675 PRAGMA foreign_keys=on;
676 ",
677 )?;
678
679 if cfg!(not(target_os = "ios")) {
683 conn.pragma_update(None, "temp_store", "memory")?;
684 }
685
686 if !passphrase.is_empty() {
687 conn.pragma_update(None, "key", passphrase)?;
688 }
689 conn.pragma_update(None, "auto_vacuum", "INCREMENTAL".to_string())?;
696
697 conn.pragma_update(None, "journal_mode", "WAL".to_string())?;
698 conn.pragma_update(None, "synchronous", "NORMAL".to_string())?;
700
701 Ok(conn)
702}
703
704async fn incremental_vacuum(context: &Context) -> Result<()> {
708 context
709 .sql
710 .call_write(move |conn| {
711 let mut stmt = conn
712 .prepare("PRAGMA incremental_vacuum")
713 .context("Failed to prepare incremental_vacuum statement")?;
714
715 let mut rows = stmt
719 .query(())
720 .context("Failed to run incremental_vacuum statement")?;
721 let mut row_count = 0;
722 while let Some(_row) = rows
723 .next()
724 .context("Failed to step incremental_vacuum statement")?
725 {
726 row_count += 1;
727 }
728 info!(context, "Incremental vacuum freed {row_count} pages.");
729 Ok(())
730 })
731 .await
732}
733
734pub async fn housekeeping(context: &Context) -> Result<()> {
736 if let Err(e) = context
739 .set_config_internal(Config::LastHousekeeping, Some(&time().to_string()))
740 .await
741 {
742 warn!(context, "Can't set config: {e:#}.");
743 }
744
745 http_cache_cleanup(context)
746 .await
747 .context("Failed to cleanup HTTP cache")
748 .log_err(context)
749 .ok();
750
751 if let Err(err) = remove_unused_files(context).await {
752 warn!(
753 context,
754 "Housekeeping: cannot remove unused files: {:#}.", err
755 );
756 }
757
758 if let Err(err) = start_ephemeral_timers(context).await {
759 warn!(
760 context,
761 "Housekeeping: cannot start ephemeral timers: {:#}.", err
762 );
763 }
764
765 if let Err(err) = prune_tombstones(&context.sql).await {
766 warn!(
767 context,
768 "Housekeeping: Cannot prune message tombstones: {:#}.", err
769 );
770 }
771
772 if let Err(err) = incremental_vacuum(context).await {
773 warn!(context, "Failed to run incremental vacuum: {err:#}.");
774 }
775
776 context
777 .sql
778 .execute(
779 "DELETE FROM msgs_mdns WHERE msg_id NOT IN \
780 (SELECT id FROM msgs WHERE chat_id!=?)",
781 (DC_CHAT_ID_TRASH,),
782 )
783 .await
784 .context("failed to remove old MDNs")
785 .log_err(context)
786 .ok();
787
788 context
789 .sql
790 .execute(
791 "DELETE FROM msgs_status_updates WHERE msg_id NOT IN \
792 (SELECT id FROM msgs WHERE chat_id!=?)",
793 (DC_CHAT_ID_TRASH,),
794 )
795 .await
796 .context("failed to remove old webxdc status updates")
797 .log_err(context)
798 .ok();
799
800 prune_connection_history(context)
801 .await
802 .context("Failed to prune connection history")
803 .log_err(context)
804 .ok();
805 prune_dns_cache(context)
806 .await
807 .context("Failed to prune DNS cache")
808 .log_err(context)
809 .ok();
810
811 delete_orphaned_poi_locations(context)
814 .await
815 .context("Failed to delete orphaned POI locations")
816 .log_err(context)
817 .ok();
818
819 info!(context, "Housekeeping done.");
820 Ok(())
821}
822
823pub fn row_get_vec(row: &Row, idx: usize) -> rusqlite::Result<Vec<u8>> {
825 row.get(idx).or_else(|err| match row.get_ref(idx)? {
826 ValueRef::Null => Ok(Vec::new()),
827 ValueRef::Text(text) => Ok(text.to_vec()),
828 ValueRef::Blob(blob) => Ok(blob.to_vec()),
829 ValueRef::Integer(_) | ValueRef::Real(_) => Err(err),
830 })
831}
832
833pub async fn remove_unused_files(context: &Context) -> Result<()> {
835 let mut files_in_use = HashSet::new();
836 let mut unreferenced_count = 0;
837
838 info!(context, "Start housekeeping...");
839 maybe_add_from_param(
840 &context.sql,
841 &mut files_in_use,
842 "SELECT param FROM msgs WHERE chat_id!=3 AND type!=10;",
843 Param::File,
844 )
845 .await?;
846 maybe_add_from_param(
847 &context.sql,
848 &mut files_in_use,
849 "SELECT param FROM chats;",
850 Param::ProfileImage,
851 )
852 .await?;
853 maybe_add_from_param(
854 &context.sql,
855 &mut files_in_use,
856 "SELECT param FROM contacts;",
857 Param::ProfileImage,
858 )
859 .await?;
860
861 context
862 .sql
863 .query_map(
864 "SELECT value FROM config;",
865 (),
866 |row| row.get::<_, String>(0),
867 |rows| {
868 for row in rows {
869 maybe_add_file(&mut files_in_use, &row?);
870 }
871 Ok(())
872 },
873 )
874 .await
875 .context("housekeeping: failed to SELECT value FROM config")?;
876
877 context
878 .sql
879 .query_map(
880 "SELECT blobname FROM http_cache",
881 (),
882 |row| row.get::<_, String>(0),
883 |rows| {
884 for row in rows {
885 maybe_add_file(&mut files_in_use, &row?);
886 }
887 Ok(())
888 },
889 )
890 .await
891 .context("Failed to SELECT blobname FROM http_cache")?;
892
893 info!(context, "{} files in use.", files_in_use.len());
894 let blobdir = context.get_blobdir();
896 for p in [&blobdir.join(BLOBS_BACKUP_NAME), blobdir] {
897 match tokio::fs::read_dir(p).await {
898 Ok(mut dir_handle) => {
899 let diff = std::time::Duration::from_secs(60 * 60);
901 let keep_files_newer_than = SystemTime::now()
902 .checked_sub(diff)
903 .unwrap_or(SystemTime::UNIX_EPOCH);
904
905 while let Ok(Some(entry)) = dir_handle.next_entry().await {
906 let name_f = entry.file_name();
907 let name_s = name_f.to_string_lossy();
908
909 if p == blobdir
910 && (is_file_in_use(&files_in_use, None, &name_s)
911 || is_file_in_use(&files_in_use, Some(".waveform"), &name_s)
912 || is_file_in_use(&files_in_use, Some("-preview.jpg"), &name_s))
913 {
914 continue;
915 }
916
917 let stats = match tokio::fs::metadata(entry.path()).await {
918 Err(err) => {
919 warn!(
920 context,
921 "Cannot get metadata for {}: {:#}.",
922 entry.path().display(),
923 err
924 );
925 continue;
926 }
927 Ok(stats) => stats,
928 };
929
930 if stats.is_dir() {
931 if let Err(e) = tokio::fs::remove_dir(entry.path()).await {
932 info!(
935 context,
936 "Housekeeping: Cannot rmdir {}: {:#}.",
937 entry.path().display(),
938 e
939 );
940 }
941 continue;
942 }
943
944 unreferenced_count += 1;
945 let recently_created = stats.created().is_ok_and(|t| t > keep_files_newer_than);
946 let recently_modified =
947 stats.modified().is_ok_and(|t| t > keep_files_newer_than);
948 let recently_accessed =
949 stats.accessed().is_ok_and(|t| t > keep_files_newer_than);
950
951 if p == blobdir && (recently_created || recently_modified || recently_accessed)
952 {
953 info!(
954 context,
955 "Housekeeping: Keeping new unreferenced file #{}: {:?}.",
956 unreferenced_count,
957 entry.file_name(),
958 );
959 continue;
960 }
961
962 info!(
963 context,
964 "Housekeeping: Deleting unreferenced file #{}: {:?}.",
965 unreferenced_count,
966 entry.file_name()
967 );
968 let path = entry.path();
969 if let Err(err) = delete_file(context, &path).await {
970 error!(
971 context,
972 "Failed to delete unused file {}: {:#}.",
973 path.display(),
974 err
975 );
976 }
977 }
978 }
979 Err(err) => {
980 if !p.ends_with(BLOBS_BACKUP_NAME) {
981 warn!(
982 context,
983 "Housekeeping: Cannot read dir {}: {:#}.",
984 p.display(),
985 err
986 );
987 }
988 }
989 }
990 }
991
992 Ok(())
993}
994
995fn is_file_in_use(files_in_use: &HashSet<String>, namespc_opt: Option<&str>, name: &str) -> bool {
996 let name_to_check = if let Some(namespc) = namespc_opt {
997 let Some(name) = name.strip_suffix(namespc) else {
998 return false;
999 };
1000 name
1001 } else {
1002 name
1003 };
1004 files_in_use.contains(name_to_check)
1005}
1006
1007fn maybe_add_file(files_in_use: &mut HashSet<String>, file: &str) {
1008 if let Some(file) = file.strip_prefix("$BLOBDIR/") {
1009 files_in_use.insert(file.to_string());
1010 }
1011}
1012
1013async fn maybe_add_from_param(
1014 sql: &Sql,
1015 files_in_use: &mut HashSet<String>,
1016 query: &str,
1017 param_id: Param,
1018) -> Result<()> {
1019 sql.query_map(
1020 query,
1021 (),
1022 |row| row.get::<_, String>(0),
1023 |rows| {
1024 for row in rows {
1025 let param: Params = row?.parse().unwrap_or_default();
1026 if let Some(file) = param.get(param_id) {
1027 maybe_add_file(files_in_use, file);
1028 }
1029 }
1030 Ok(())
1031 },
1032 )
1033 .await
1034 .context(format!("housekeeping: failed to add_from_param {query}"))?;
1035
1036 Ok(())
1037}
1038
1039async fn prune_tombstones(sql: &Sql) -> Result<()> {
1042 let timestamp_max = time().saturating_sub(2 * 24 * 3600);
1044 sql.execute(
1045 "DELETE FROM msgs
1046 WHERE chat_id=?
1047 AND timestamp<=?
1048 AND NOT EXISTS (
1049 SELECT * FROM imap WHERE msgs.rfc724_mid=rfc724_mid AND target!=''
1050 )",
1051 (DC_CHAT_ID_TRASH, timestamp_max),
1052 )
1053 .await?;
1054 Ok(())
1055}
1056
1057#[cfg(test)]
1058mod sql_tests;