1use std::collections::{HashMap, HashSet};
4use std::path::{Path, PathBuf};
5use std::time::Duration;
6
7use anyhow::{Context as _, Result, bail, ensure};
8use rusqlite::{Connection, OpenFlags, Row, config::DbConfig, types::ValueRef};
9use tokio::sync::RwLock;
10
11use crate::blob::BlobObject;
12use crate::chat::add_device_msg;
13use crate::config::Config;
14use crate::constants::DC_CHAT_ID_TRASH;
15use crate::context::Context;
16use crate::debug_logging::set_debug_logging_xdc;
17use crate::ephemeral::start_ephemeral_timers;
18use crate::imex::BLOBS_BACKUP_NAME;
19use crate::location::delete_orphaned_poi_locations;
20use crate::log::{LogExt, warn};
21use crate::message::Message;
22use crate::message::MsgId;
23use crate::net::dns::prune_dns_cache;
24use crate::net::http::http_cache_cleanup;
25use crate::net::prune_connection_history;
26use crate::param::{Param, Params};
27use crate::stock_str;
28use crate::tools::{SystemTime, Time, delete_file, time, time_elapsed};
29
/// Extension of [`rusqlite::ToSql`] that additionally requires `Send + Sync`.
///
/// It is the trait-object type produced by the `params_slice!` macro.
pub trait ToSql: rusqlite::ToSql + Send + Sync {}

/// Blanket implementation: every `rusqlite::ToSql + Send + Sync` type is a [`ToSql`].
impl<T: rusqlite::ToSql + Send + Sync> ToSql for T {}
35
/// Builds an array of `&dyn ToSql` references from one or more expressions.
///
/// Each expression is borrowed and cast to `&dyn $crate::sql::ToSql`;
/// at least one expression is required.
#[macro_export]
macro_rules! params_slice {
    ($($param:expr),+) => {
        [$(&$param as &dyn $crate::sql::ToSql),+]
    };
}
47
48mod migrations;
49mod pool;
50
51use pool::Pool;
52
/// Handle to the SQLite database: connection pool plus a config cache.
#[derive(Debug)]
pub struct Sql {
    /// Path of the database file that connections are opened on.
    pub(crate) dbfile: PathBuf,

    /// Connection pool; `None` while the database is closed.
    pool: RwLock<Option<Pool>>,

    /// `None` until `open()` succeeds; afterwards `Some(true)` iff a non-empty
    /// passphrase was used to open the database.
    is_encrypted: RwLock<Option<bool>>,

    /// Cache of `config` table lookups, keyed by `keyname`.
    /// A cached `None` means the key is known to have no value.
    pub(crate) config_cache: RwLock<HashMap<String, Option<String>>>,
}
69
70impl Sql {
71 pub fn new(dbfile: PathBuf) -> Sql {
73 Self {
74 dbfile,
75 pool: Default::default(),
76 is_encrypted: Default::default(),
77 config_cache: Default::default(),
78 }
79 }
80
81 pub async fn check_passphrase(&self, passphrase: String) -> Result<bool> {
89 if self.is_open().await {
90 bail!("Database is already opened.");
91 }
92
93 let _lock = self.pool.write().await;
95
96 let connection = Connection::open(&self.dbfile)?;
98 if !passphrase.is_empty() {
99 connection
100 .pragma_update(None, "key", &passphrase)
101 .context("Failed to set PRAGMA key")?;
102 }
103 let key_is_correct = connection
104 .query_row("SELECT count(*) FROM sqlite_master", [], |_row| Ok(()))
105 .is_ok();
106
107 Ok(key_is_correct)
108 }
109
110 pub async fn is_open(&self) -> bool {
112 self.pool.read().await.is_some()
113 }
114
115 pub(crate) async fn is_encrypted(&self) -> Option<bool> {
119 *self.is_encrypted.read().await
120 }
121
122 pub(crate) async fn close(&self) {
124 let _ = self.pool.write().await.take();
125 }
127
128 pub(crate) async fn import(&self, path: &Path, passphrase: String) -> Result<()> {
130 let path_str = path
131 .to_str()
132 .with_context(|| format!("path {path:?} is not valid unicode"))?
133 .to_string();
134
135 let mut config_cache = self.config_cache.write().await;
138 config_cache.clear();
139
140 let query_only = false;
141 self.call(query_only, move |conn| {
142 conn.execute("ATTACH DATABASE ? AS backup KEY ?", (path_str, passphrase))
144 .context("failed to attach backup database")?;
145 let res = conn
146 .query_row("SELECT count(*) FROM sqlite_master", [], |_row| Ok(()))
147 .context("backup passphrase is not correct");
148
149 res.and_then(|_| {
154 conn.set_db_config(DbConfig::SQLITE_DBCONFIG_RESET_DATABASE, true)
155 .context("failed to set SQLITE_DBCONFIG_RESET_DATABASE")
156 })
157 .and_then(|_| {
158 conn.execute("VACUUM", [])
159 .context("failed to vacuum the database")
160 })
161 .and(
162 conn.set_db_config(DbConfig::SQLITE_DBCONFIG_RESET_DATABASE, false)
163 .context("failed to unset SQLITE_DBCONFIG_RESET_DATABASE"),
164 )
165 .and_then(|_| {
166 conn.query_row("SELECT sqlcipher_export('main', 'backup')", [], |_row| {
167 Ok(())
168 })
169 .context("failed to import from attached backup database")
170 })
171 .and(
172 conn.execute("DETACH DATABASE backup", [])
173 .context("failed to detach backup database"),
174 )?;
175 Ok(())
176 })
177 .await
178 }
179
    /// Number of connections `new_pool()` opens.
    ///
    /// `wal_checkpoint()` statically asserts that this is greater than 1.
    const N_DB_CONNECTIONS: usize = 3;
181
182 fn new_pool(dbfile: &Path, passphrase: String) -> Result<Pool> {
184 let mut connections = Vec::with_capacity(Self::N_DB_CONNECTIONS);
185 for _ in 0..Self::N_DB_CONNECTIONS {
186 let connection = new_connection(dbfile, &passphrase)?;
187 connections.push(connection);
188 }
189
190 let pool = Pool::new(connections);
191 Ok(pool)
192 }
193
194 async fn try_open(&self, context: &Context, dbfile: &Path, passphrase: String) -> Result<()> {
195 *self.pool.write().await = Some(Self::new_pool(dbfile, passphrase.to_string())?);
196
197 if let Err(e) = self.run_migrations(context).await {
198 error!(context, "Running migrations failed: {e:#}");
199 eprintln!("Running migrations failed: {e:#}");
204 context.set_migration_error(&format!("Updating Delta Chat failed. Please send this message to the Delta Chat developers, either at delta@merlinux.eu or at https://support.delta.chat.\n\n{e:#}"));
205 }
210
211 Ok(())
212 }
213
214 pub async fn run_migrations(&self, context: &Context) -> Result<()> {
216 let recode_avatar = migrations::run(context, self)
221 .await
222 .context("failed to run migrations")?;
223
224 if recode_avatar && let Some(avatar) = context.get_config(Config::Selfavatar).await? {
228 let mut blob = BlobObject::from_path(context, Path::new(&avatar))?;
229 match blob.recode_to_avatar_size(context).await {
230 Ok(()) => {
231 if let Some(path) = blob.to_abs_path().to_str() {
232 context
233 .set_config_internal(Config::Selfavatar, Some(path))
234 .await?;
235 } else {
236 warn!(context, "Setting selfavatar failed: non-UTF-8 filename");
237 }
238 }
239 Err(e) => {
240 warn!(context, "Migrations can't recode avatar, removing. {:#}", e);
241 context
242 .set_config_internal(Config::Selfavatar, None)
243 .await?
244 }
245 }
246 }
247
248 Ok(())
249 }
250
251 pub async fn open(&self, context: &Context, passphrase: String) -> Result<()> {
254 if self.is_open().await {
255 error!(
256 context,
257 "Cannot open, database \"{:?}\" already opened.", self.dbfile,
258 );
259 bail!("SQL database is already opened.");
260 }
261
262 let passphrase_nonempty = !passphrase.is_empty();
263 self.try_open(context, &self.dbfile, passphrase).await?;
264 info!(context, "Opened database {:?}.", self.dbfile);
265 *self.is_encrypted.write().await = Some(passphrase_nonempty);
266
267 if let Some(xdc_id) = self
269 .get_raw_config_u32(Config::DebugLogging.as_ref())
270 .await?
271 {
272 set_debug_logging_xdc(context, Some(MsgId::new(xdc_id))).await?;
273 }
274 Ok(())
275 }
276
277 pub async fn change_passphrase(&self, passphrase: String) -> Result<()> {
283 let mut lock = self.pool.write().await;
284
285 let pool = lock.take().context("SQL connection pool is not open")?;
286 let query_only = false;
287 let conn = pool.get(query_only).await?;
288 if !passphrase.is_empty() {
289 conn.pragma_update(None, "rekey", passphrase.clone())
290 .context("Failed to set PRAGMA rekey")?;
291 }
292 drop(pool);
293
294 *lock = Some(Self::new_pool(&self.dbfile, passphrase.to_string())?);
295
296 Ok(())
297 }
298
299 async fn call<'a, F, R>(&'a self, query_only: bool, function: F) -> Result<R>
306 where
307 F: 'a + FnOnce(&mut Connection) -> Result<R> + Send,
308 R: Send + 'static,
309 {
310 let lock = self.pool.read().await;
311 let pool = lock.as_ref().context("no SQL connection")?;
312 let mut conn = pool.get(query_only).await?;
313 let res = tokio::task::block_in_place(move || function(&mut conn))?;
314 Ok(res)
315 }
316
317 pub async fn call_write<'a, F, R>(&'a self, function: F) -> Result<R>
322 where
323 F: 'a + FnOnce(&mut Connection) -> Result<R> + Send,
324 R: Send + 'static,
325 {
326 let query_only = false;
327 self.call(query_only, function).await
328 }
329
330 pub async fn execute(
332 &self,
333 query: &str,
334 params: impl rusqlite::Params + Send,
335 ) -> Result<usize> {
336 self.call_write(move |conn| {
337 let res = conn.execute(query, params)?;
338 Ok(res)
339 })
340 .await
341 }
342
343 pub async fn insert(&self, query: &str, params: impl rusqlite::Params + Send) -> Result<i64> {
345 self.call_write(move |conn| {
346 conn.execute(query, params)?;
347 Ok(conn.last_insert_rowid())
348 })
349 .await
350 }
351
352 pub async fn query_map<T, F, G, H>(
356 &self,
357 sql: &str,
358 params: impl rusqlite::Params + Send,
359 f: F,
360 g: G,
361 ) -> Result<H>
362 where
363 F: Send + FnMut(&rusqlite::Row) -> Result<T>,
364 G: Send + FnOnce(rusqlite::AndThenRows<F>) -> Result<H>,
365 H: Send + 'static,
366 {
367 let query_only = true;
368 self.call(query_only, move |conn| {
369 let mut stmt = conn.prepare(sql)?;
370 let res = stmt.query_and_then(params, f)?;
371 g(res)
372 })
373 .await
374 }
375
376 pub async fn query_map_collect<T, C, F>(
380 &self,
381 sql: &str,
382 params: impl rusqlite::Params + Send,
383 f: F,
384 ) -> Result<C>
385 where
386 T: Send + 'static,
387 C: Send + 'static + std::iter::FromIterator<T>,
388 F: Send + FnMut(&rusqlite::Row) -> Result<T>,
389 {
390 self.query_map(sql, params, f, |rows| {
391 rows.collect::<std::result::Result<C, _>>()
392 })
393 .await
394 }
395
396 pub async fn query_map_vec<T, F>(
400 &self,
401 sql: &str,
402 params: impl rusqlite::Params + Send,
403 f: F,
404 ) -> Result<Vec<T>>
405 where
406 T: Send + 'static,
407 F: Send + FnMut(&rusqlite::Row) -> Result<T>,
408 {
409 self.query_map_collect(sql, params, f).await
410 }
411
412 pub async fn count(&self, query: &str, params: impl rusqlite::Params + Send) -> Result<usize> {
414 let count: isize = self.query_row(query, params, |row| row.get(0)).await?;
415 Ok(usize::try_from(count)?)
416 }
417
418 pub async fn exists(&self, sql: &str, params: impl rusqlite::Params + Send) -> Result<bool> {
421 let count = self.count(sql, params).await?;
422 Ok(count > 0)
423 }
424
425 pub async fn query_row<T, F>(
427 &self,
428 query: &str,
429 params: impl rusqlite::Params + Send,
430 f: F,
431 ) -> Result<T>
432 where
433 F: FnOnce(&rusqlite::Row) -> rusqlite::Result<T> + Send,
434 T: Send + 'static,
435 {
436 let query_only = true;
437 self.call(query_only, move |conn| {
438 let res = conn.query_row(query, params, f)?;
439 Ok(res)
440 })
441 .await
442 }
443
444 pub async fn transaction<G, H>(&self, callback: G) -> Result<H>
449 where
450 H: Send + 'static,
451 G: Send + FnOnce(&mut rusqlite::Transaction<'_>) -> Result<H>,
452 {
453 let query_only = false;
454 self.transaction_ex(query_only, callback).await
455 }
456
457 pub async fn transaction_ex<G, H>(&self, query_only: bool, callback: G) -> Result<H>
472 where
473 H: Send + 'static,
474 G: Send + FnOnce(&mut rusqlite::Transaction<'_>) -> Result<H>,
475 {
476 self.call(query_only, move |conn| {
477 let mut transaction = conn.transaction()?;
478 let ret = callback(&mut transaction);
479
480 match ret {
481 Ok(ret) => {
482 transaction.commit()?;
483 Ok(ret)
484 }
485 Err(err) => {
486 transaction.rollback()?;
487 Err(err)
488 }
489 }
490 })
491 .await
492 }
493
494 pub async fn table_exists(&self, name: &str) -> Result<bool> {
496 let query_only = true;
497 self.call(query_only, move |conn| {
498 let mut exists = false;
499 conn.pragma(None, "table_info", name.to_string(), |_row| {
500 exists = true;
502 Ok(())
503 })?;
504
505 Ok(exists)
506 })
507 .await
508 }
509
510 pub async fn col_exists(&self, table_name: &str, col_name: &str) -> Result<bool> {
512 let query_only = true;
513 self.call(query_only, move |conn| {
514 let mut exists = false;
515 conn.pragma(None, "table_info", table_name.to_string(), |row| {
518 let curr_name: String = row.get(1)?;
519 if col_name == curr_name {
520 exists = true;
521 }
522 Ok(())
523 })?;
524
525 Ok(exists)
526 })
527 .await
528 }
529
530 pub async fn query_row_optional<T, F>(
532 &self,
533 sql: &str,
534 params: impl rusqlite::Params + Send,
535 f: F,
536 ) -> Result<Option<T>>
537 where
538 F: Send + FnOnce(&rusqlite::Row) -> rusqlite::Result<T>,
539 T: Send + 'static,
540 {
541 let query_only = true;
542 self.call(query_only, move |conn| {
543 match conn.query_row(sql.as_ref(), params, f) {
544 Ok(res) => Ok(Some(res)),
545 Err(rusqlite::Error::QueryReturnedNoRows) => Ok(None),
546 Err(err) => Err(err.into()),
547 }
548 })
549 .await
550 }
551
552 pub async fn query_get_value<T>(
555 &self,
556 query: &str,
557 params: impl rusqlite::Params + Send,
558 ) -> Result<Option<T>>
559 where
560 T: rusqlite::types::FromSql + Send + 'static,
561 {
562 self.query_row_optional(query, params, |row| row.get::<_, T>(0))
563 .await
564 }
565
566 pub async fn set_raw_config(&self, key: &str, value: Option<&str>) -> Result<()> {
571 let mut lock = self.config_cache.write().await;
572 if let Some(value) = value {
573 self.execute(
574 "INSERT OR REPLACE INTO config (keyname, value) VALUES (?, ?)",
575 (key, value),
576 )
577 .await?;
578 } else {
579 self.execute("DELETE FROM config WHERE keyname=?", (key,))
580 .await?;
581 }
582 lock.insert(key.to_string(), value.map(|s| s.to_string()));
583 drop(lock);
584
585 Ok(())
586 }
587
588 pub async fn get_raw_config(&self, key: &str) -> Result<Option<String>> {
590 let lock = self.config_cache.read().await;
591 let cached = lock.get(key).cloned();
592 drop(lock);
593
594 if let Some(c) = cached {
595 return Ok(c);
596 }
597
598 let mut lock = self.config_cache.write().await;
599 let value = self
600 .query_get_value("SELECT value FROM config WHERE keyname=?", (key,))
601 .await
602 .context(format!("failed to fetch raw config: {key}"))?;
603 lock.insert(key.to_string(), value.clone());
604 drop(lock);
605
606 Ok(value)
607 }
608
609 pub(crate) async fn uncache_raw_config(&self, key: &str) {
611 let mut lock = self.config_cache.write().await;
612 lock.remove(key);
613 }
614
615 pub async fn set_raw_config_int(&self, key: &str, value: i32) -> Result<()> {
617 self.set_raw_config(key, Some(&format!("{value}"))).await
618 }
619
620 pub async fn get_raw_config_int(&self, key: &str) -> Result<Option<i32>> {
622 self.get_raw_config(key)
623 .await
624 .map(|s| s.and_then(|s| s.parse().ok()))
625 }
626
627 pub async fn get_raw_config_u32(&self, key: &str) -> Result<Option<u32>> {
629 self.get_raw_config(key)
630 .await
631 .map(|s| s.and_then(|s| s.parse().ok()))
632 }
633
634 pub async fn get_raw_config_bool(&self, key: &str) -> Result<bool> {
636 let res = self.get_raw_config_int(key).await?;
639 Ok(res.unwrap_or_default() > 0)
640 }
641
642 pub async fn set_raw_config_bool(&self, key: &str, value: bool) -> Result<()> {
644 let value = if value { Some("1") } else { None };
645 self.set_raw_config(key, value).await
646 }
647
648 pub async fn set_raw_config_int64(&self, key: &str, value: i64) -> Result<()> {
650 self.set_raw_config(key, Some(&format!("{value}"))).await
651 }
652
653 pub async fn get_raw_config_int64(&self, key: &str) -> Result<Option<i64>> {
655 self.get_raw_config(key)
656 .await
657 .map(|s| s.and_then(|r| r.parse().ok()))
658 }
659
    /// Returns a reference to the config cache lock.
    ///
    /// Only compiled when the `internals` feature is enabled.
    #[cfg(feature = "internals")]
    pub fn config_cache(&self) -> &RwLock<HashMap<String, Option<String>>> {
        &self.config_cache
    }
665
666 pub(crate) async fn wal_checkpoint(context: &Context) -> Result<()> {
668 let t_start = Time::now();
669 let lock = context.sql.pool.read().await;
670 let Some(pool) = lock.as_ref() else {
671 return Ok(());
673 };
674
675 let query_only = true;
677 let conn = pool.get(query_only).await?;
678 tokio::task::block_in_place(|| {
679 conn.query_row("PRAGMA table_list", [], |_| Ok(()))?;
683 conn.query_row("PRAGMA wal_checkpoint(PASSIVE)", [], |_| Ok(()))
684 })?;
685
686 const _: () = assert!(Sql::N_DB_CONNECTIONS > 1, "Deadlock possible");
688 let _write_lock = pool.write_lock().await;
689 let t_writers_blocked = Time::now();
690 let mut read_conns = Vec::with_capacity(Self::N_DB_CONNECTIONS - 1);
696 for _ in 0..(Self::N_DB_CONNECTIONS - 1) {
697 read_conns.push(pool.get(query_only).await?);
698 }
699 read_conns.clear();
700 let (pages_total, pages_checkpointed) = tokio::task::block_in_place(|| {
702 conn.query_row("PRAGMA wal_checkpoint(FULL)", [], |row| {
703 let pages_total: i64 = row.get(1)?;
704 let pages_checkpointed: i64 = row.get(2)?;
705 Ok((pages_total, pages_checkpointed))
706 })
707 })?;
708 if pages_checkpointed < pages_total {
709 warn!(
710 context,
711 "Cannot checkpoint whole WAL. Pages total: {pages_total}, checkpointed: {pages_checkpointed}. Make sure there are no external connections running transactions.",
712 );
713 }
714 for _ in 0..(Self::N_DB_CONNECTIONS - 1) {
716 read_conns.push(pool.get(query_only).await?);
717 }
718 let t_readers_blocked = Time::now();
719 tokio::task::block_in_place(|| {
720 let blocked = conn.query_row("PRAGMA wal_checkpoint(TRUNCATE)", [], |row| {
721 let blocked: i64 = row.get(0)?;
722 Ok(blocked)
723 })?;
724 ensure!(blocked == 0);
725 Ok(())
726 })?;
727 info!(
728 context,
729 "wal_checkpoint: Total time: {:?}. Writers blocked for: {:?}. Readers blocked for: {:?}.",
730 time_elapsed(&t_start),
731 time_elapsed(&t_writers_blocked),
732 time_elapsed(&t_readers_blocked),
733 );
734 Ok(())
735 }
736}
737
738fn new_connection(path: &Path, passphrase: &str) -> Result<Connection> {
745 let flags = OpenFlags::SQLITE_OPEN_NO_MUTEX
746 | OpenFlags::SQLITE_OPEN_READ_WRITE
747 | OpenFlags::SQLITE_OPEN_CREATE;
748 let conn = Connection::open_with_flags(path, flags)?;
749 conn.execute_batch(
750 "PRAGMA cipher_memory_security = OFF; -- Too slow on Android
751 PRAGMA secure_delete=on;
752 PRAGMA soft_heap_limit = 8388608; -- 8 MiB limit, same as set in Android SQLiteDatabase.
753 PRAGMA foreign_keys=on;
754 ",
755 )?;
756
757 if cfg!(not(target_os = "ios")) {
761 conn.pragma_update(None, "temp_store", "memory")?;
762 }
763
764 if cfg!(target_os = "ios") {
775 conn.busy_timeout(Duration::new(60, 0))?;
776 } else {
777 conn.busy_timeout(Duration::ZERO)?;
778 }
779
780 if !passphrase.is_empty() {
781 conn.pragma_update(None, "key", passphrase)?;
782 }
783 conn.pragma_update(None, "auto_vacuum", "INCREMENTAL".to_string())?;
790
791 conn.pragma_update(None, "journal_mode", "WAL".to_string())?;
792 conn.pragma_update(None, "synchronous", "NORMAL".to_string())?;
794
795 Ok(conn)
796}
797
798async fn incremental_vacuum(context: &Context) -> Result<()> {
802 context
803 .sql
804 .call_write(move |conn| {
805 let mut stmt = conn
806 .prepare("PRAGMA incremental_vacuum")
807 .context("Failed to prepare incremental_vacuum statement")?;
808
809 let mut rows = stmt
813 .query(())
814 .context("Failed to run incremental_vacuum statement")?;
815 let mut row_count = 0;
816 while let Some(_row) = rows
817 .next()
818 .context("Failed to step incremental_vacuum statement")?
819 {
820 row_count += 1;
821 }
822 info!(context, "Incremental vacuum freed {row_count} pages.");
823 Ok(())
824 })
825 .await
826}
827
828pub async fn housekeeping(context: &Context) -> Result<()> {
830 if let Err(e) = context
833 .set_config_internal(Config::LastHousekeeping, Some(&time().to_string()))
834 .await
835 {
836 warn!(context, "Can't set config: {e:#}.");
837 }
838
839 http_cache_cleanup(context)
840 .await
841 .context("Failed to cleanup HTTP cache")
842 .log_err(context)
843 .ok();
844 migrations::msgs_to_key_contacts(context)
845 .await
846 .context("migrations::msgs_to_key_contacts")
847 .log_err(context)
848 .ok();
849
850 if let Err(err) = remove_unused_files(context).await {
851 warn!(
852 context,
853 "Housekeeping: cannot remove unused files: {:#}.", err
854 );
855 }
856
857 if let Err(err) = start_ephemeral_timers(context).await {
858 warn!(
859 context,
860 "Housekeeping: cannot start ephemeral timers: {:#}.", err
861 );
862 }
863
864 if let Err(err) = prune_tombstones(&context.sql).await {
865 warn!(
866 context,
867 "Housekeeping: Cannot prune message tombstones: {:#}.", err
868 );
869 }
870
871 maybe_add_mvbox_move_deprecation_message(context)
872 .await
873 .context("maybe_add_mvbox_move_deprecation_message")
874 .log_err(context)
875 .ok();
876
877 if let Err(err) = incremental_vacuum(context).await {
878 warn!(context, "Failed to run incremental vacuum: {err:#}.");
879 }
880 if let Err(err) = Sql::wal_checkpoint(context).await {
885 warn!(context, "wal_checkpoint() failed: {err:#}.");
886 debug_assert!(false);
887 }
888
889 context
890 .sql
891 .execute(
892 "DELETE FROM msgs_mdns WHERE msg_id NOT IN \
893 (SELECT id FROM msgs WHERE chat_id!=?)",
894 (DC_CHAT_ID_TRASH,),
895 )
896 .await
897 .context("failed to remove old MDNs")
898 .log_err(context)
899 .ok();
900
901 context
902 .sql
903 .execute(
904 "DELETE FROM msgs_status_updates WHERE msg_id NOT IN \
905 (SELECT id FROM msgs WHERE chat_id!=?)",
906 (DC_CHAT_ID_TRASH,),
907 )
908 .await
909 .context("failed to remove old webxdc status updates")
910 .log_err(context)
911 .ok();
912
913 prune_connection_history(context)
914 .await
915 .context("Failed to prune connection history")
916 .log_err(context)
917 .ok();
918 prune_dns_cache(context)
919 .await
920 .context("Failed to prune DNS cache")
921 .log_err(context)
922 .ok();
923
924 delete_orphaned_poi_locations(context)
927 .await
928 .context("Failed to delete orphaned POI locations")
929 .log_err(context)
930 .ok();
931
932 info!(context, "Housekeeping done.");
933 Ok(())
934}
935
936async fn maybe_add_mvbox_move_deprecation_message(context: &Context) -> Result<()> {
939 if !context.get_config_bool(Config::OnlyFetchMvbox).await?
940 && context.get_config_bool(Config::MvboxMove).await?
941 {
942 let mut msg = Message::new_text(stock_str::mvbox_move_deprecation(context).await);
943 add_device_msg(context, Some("mvbox_move_deprecation"), Some(&mut msg)).await?;
944 }
945 Ok(())
946}
947
948pub fn row_get_vec(row: &Row, idx: usize) -> rusqlite::Result<Vec<u8>> {
950 row.get(idx).or_else(|err| match row.get_ref(idx)? {
951 ValueRef::Null => Ok(Vec::new()),
952 ValueRef::Text(text) => Ok(text.to_vec()),
953 ValueRef::Blob(blob) => Ok(blob.to_vec()),
954 ValueRef::Integer(_) | ValueRef::Real(_) => Err(err),
955 })
956}
957
958pub async fn remove_unused_files(context: &Context) -> Result<()> {
960 let mut files_in_use = HashSet::new();
961 let mut unreferenced_count = 0;
962
963 info!(context, "Start housekeeping...");
964 maybe_add_from_param(
965 &context.sql,
966 &mut files_in_use,
967 "SELECT param FROM msgs WHERE chat_id!=3 AND type!=10;",
968 Param::File,
969 )
970 .await?;
971 maybe_add_from_param(
972 &context.sql,
973 &mut files_in_use,
974 "SELECT param FROM chats;",
975 Param::ProfileImage,
976 )
977 .await?;
978 maybe_add_from_param(
979 &context.sql,
980 &mut files_in_use,
981 "SELECT param FROM contacts;",
982 Param::ProfileImage,
983 )
984 .await?;
985
986 context
987 .sql
988 .query_map(
989 "SELECT value FROM config;",
990 (),
991 |row| {
992 let row: String = row.get(0)?;
993 Ok(row)
994 },
995 |rows| {
996 for row in rows {
997 maybe_add_file(&mut files_in_use, &row?);
998 }
999 Ok(())
1000 },
1001 )
1002 .await
1003 .context("housekeeping: failed to SELECT value FROM config")?;
1004
1005 context
1006 .sql
1007 .query_map(
1008 "SELECT blobname FROM http_cache",
1009 (),
1010 |row| {
1011 let row: String = row.get(0)?;
1012 Ok(row)
1013 },
1014 |rows| {
1015 for row in rows {
1016 maybe_add_file(&mut files_in_use, &row?);
1017 }
1018 Ok(())
1019 },
1020 )
1021 .await
1022 .context("Failed to SELECT blobname FROM http_cache")?;
1023
1024 info!(context, "{} files in use.", files_in_use.len());
1025 let blobdir = context.get_blobdir();
1027 for p in [&blobdir.join(BLOBS_BACKUP_NAME), blobdir] {
1028 match tokio::fs::read_dir(p).await {
1029 Ok(mut dir_handle) => {
1030 let diff = std::time::Duration::from_secs(60 * 60);
1032 let keep_files_newer_than = SystemTime::now()
1033 .checked_sub(diff)
1034 .unwrap_or(SystemTime::UNIX_EPOCH);
1035
1036 while let Ok(Some(entry)) = dir_handle.next_entry().await {
1037 let name_f = entry.file_name();
1038 let name_s = name_f.to_string_lossy();
1039
1040 if p == blobdir
1041 && (is_file_in_use(&files_in_use, None, &name_s)
1042 || is_file_in_use(&files_in_use, Some(".waveform"), &name_s)
1043 || is_file_in_use(&files_in_use, Some("-preview.jpg"), &name_s))
1044 {
1045 continue;
1046 }
1047
1048 let stats = match tokio::fs::metadata(entry.path()).await {
1049 Err(err) => {
1050 warn!(
1051 context,
1052 "Cannot get metadata for {}: {:#}.",
1053 entry.path().display(),
1054 err
1055 );
1056 continue;
1057 }
1058 Ok(stats) => stats,
1059 };
1060
1061 if stats.is_dir() {
1062 if let Err(e) = tokio::fs::remove_dir(entry.path()).await {
1063 info!(
1066 context,
1067 "Housekeeping: Cannot rmdir {}: {:#}.",
1068 entry.path().display(),
1069 e
1070 );
1071 }
1072 continue;
1073 }
1074
1075 unreferenced_count += 1;
1076 let recently_created = stats.created().is_ok_and(|t| t > keep_files_newer_than);
1077 let recently_modified =
1078 stats.modified().is_ok_and(|t| t > keep_files_newer_than);
1079 let recently_accessed =
1080 stats.accessed().is_ok_and(|t| t > keep_files_newer_than);
1081
1082 if p == blobdir && (recently_created || recently_modified || recently_accessed)
1083 {
1084 info!(
1085 context,
1086 "Housekeeping: Keeping new unreferenced file #{}: {:?}.",
1087 unreferenced_count,
1088 entry.file_name(),
1089 );
1090 continue;
1091 }
1092
1093 info!(
1094 context,
1095 "Housekeeping: Deleting unreferenced file #{}: {:?}.",
1096 unreferenced_count,
1097 entry.file_name()
1098 );
1099 let path = entry.path();
1100 if let Err(err) = delete_file(context, &path).await {
1101 error!(
1102 context,
1103 "Failed to delete unused file {}: {:#}.",
1104 path.display(),
1105 err
1106 );
1107 }
1108 }
1109 }
1110 Err(err) => {
1111 if !p.ends_with(BLOBS_BACKUP_NAME) {
1112 warn!(
1113 context,
1114 "Housekeeping: Cannot read dir {}: {:#}.",
1115 p.display(),
1116 err
1117 );
1118 }
1119 }
1120 }
1121 }
1122
1123 Ok(())
1124}
1125
/// Returns true if `name` is in `files_in_use`.
///
/// If `namespc_opt` is given, `name` must end with that suffix, and the name
/// with the suffix removed is looked up instead. A name without the suffix is
/// never considered in use.
fn is_file_in_use(files_in_use: &HashSet<String>, namespc_opt: Option<&str>, name: &str) -> bool {
    match namespc_opt {
        None => files_in_use.contains(name),
        Some(suffix) => name
            .strip_suffix(suffix)
            .is_some_and(|stem| files_in_use.contains(stem)),
    }
}
1137
/// Records `file` as in use if it starts with `$BLOBDIR/`.
///
/// The name is stored without that prefix; any other string is ignored.
fn maybe_add_file(files_in_use: &mut HashSet<String>, file: &str) {
    let Some(relative) = file.strip_prefix("$BLOBDIR/") else {
        return;
    };
    files_in_use.insert(relative.to_owned());
}
1143
1144async fn maybe_add_from_param(
1145 sql: &Sql,
1146 files_in_use: &mut HashSet<String>,
1147 query: &str,
1148 param_id: Param,
1149) -> Result<()> {
1150 sql.query_map(
1151 query,
1152 (),
1153 |row| {
1154 let row: String = row.get(0)?;
1155 Ok(row)
1156 },
1157 |rows| {
1158 for row in rows {
1159 let param: Params = row?.parse().unwrap_or_default();
1160 if let Some(file) = param.get(param_id) {
1161 maybe_add_file(files_in_use, file);
1162 }
1163 }
1164 Ok(())
1165 },
1166 )
1167 .await
1168 .context(format!("housekeeping: failed to add_from_param {query}"))?;
1169
1170 Ok(())
1171}
1172
1173async fn prune_tombstones(sql: &Sql) -> Result<()> {
1176 let timestamp_max = time().saturating_sub(2 * 24 * 3600);
1178 sql.execute(
1179 "DELETE FROM msgs
1180 WHERE chat_id=?
1181 AND timestamp<=?
1182 AND NOT EXISTS (
1183 SELECT * FROM imap WHERE msgs.rfc724_mid=rfc724_mid AND target!=''
1184 )",
1185 (DC_CHAT_ID_TRASH, timestamp_max),
1186 )
1187 .await?;
1188 Ok(())
1189}
1190
1191#[cfg(test)]
1192mod sql_tests;