1use std::collections::{BTreeMap, HashMap};
4use std::ffi::OsString;
5use std::ops::Deref;
6use std::path::{Path, PathBuf};
7use std::sync::atomic::AtomicBool;
8use std::sync::{Arc, OnceLock, Weak};
9use std::time::Duration;
10
11use anyhow::{Result, bail, ensure};
12use async_channel::{self as channel, Receiver, Sender};
13use pgp::composed::SignedPublicKey;
14use ratelimit::Ratelimit;
15use tokio::sync::{Mutex, Notify, RwLock};
16
17use crate::chat::{ChatId, get_chat_cnt};
18use crate::config::Config;
19use crate::constants::{self, DC_BACKGROUND_FETCH_QUOTA_CHECK_RATELIMIT, DC_VERSION_STR};
20use crate::contact::{Contact, ContactId};
21use crate::debug_logging::DebugLogging;
22use crate::events::{Event, EventEmitter, EventType, Events};
23use crate::imap::{Imap, ServerMetadata};
24use crate::key::self_fingerprint;
25use crate::log::warn;
26use crate::logged_debug_assert;
27use crate::message::{self, MessageState, MsgId};
28use crate::net::tls::{SpkiHashStore, TlsSessionStore};
29use crate::peer_channels::Iroh;
30use crate::push::PushSubscriber;
31use crate::quota::QuotaInfo;
32use crate::scheduler::{ConnectivityStore, SchedulerState};
33use crate::sql::Sql;
34use crate::stock_str::StockStrings;
35use crate::timesmearing::SmearedTimestamp;
36use crate::tools::{self, duration_to_str, time, time_elapsed};
37use crate::transport::ConfiguredLoginParam;
38use crate::{chatlist_events, stats};
39
40pub use crate::scheduler::connectivity::Connectivity;
41
/// Builder for [`Context`].
///
/// Collects everything needed to create a context: the database path,
/// the context id, the event channel, the stock strings and, optionally,
/// a database password and a push subscriber. The context itself is
/// created by `build()` (closed) or `open()` (opened).
#[derive(Clone, Debug)]
pub struct ContextBuilder {
    /// Path of the database file.
    dbfile: PathBuf,
    /// Id reported by the context; random unless set via `with_id()`.
    id: u32,
    /// Event channel handed to the context.
    events: Events,
    /// Stock strings handed to the context.
    stock_strings: StockStrings,
    /// Database password used by `open()`; `None` means the empty password.
    password: Option<String>,

    /// Push subscriber handed to the context; a default one is used if `None`.
    push_subscriber: Option<PushSubscriber>,
}
76
77impl ContextBuilder {
78 pub fn new(dbfile: PathBuf) -> Self {
84 ContextBuilder {
85 dbfile,
86 id: rand::random(),
87 events: Events::new(),
88 stock_strings: StockStrings::new(),
89 password: None,
90 push_subscriber: None,
91 }
92 }
93
94 pub fn with_id(mut self, id: u32) -> Self {
104 self.id = id;
105 self
106 }
107
108 pub fn with_events(mut self, events: Events) -> Self {
117 self.events = events;
118 self
119 }
120
121 pub fn with_stock_strings(mut self, stock_strings: StockStrings) -> Self {
132 self.stock_strings = stock_strings;
133 self
134 }
135
136 #[deprecated(since = "TBD")]
144 pub fn with_password(mut self, password: String) -> Self {
145 self.password = Some(password);
146 self
147 }
148
149 pub(crate) fn with_push_subscriber(mut self, push_subscriber: PushSubscriber) -> Self {
151 self.push_subscriber = Some(push_subscriber);
152 self
153 }
154
155 pub async fn build(self) -> Result<Context> {
157 let push_subscriber = self.push_subscriber.unwrap_or_default();
158 let context = Context::new_closed(
159 &self.dbfile,
160 self.id,
161 self.events,
162 self.stock_strings,
163 push_subscriber,
164 )
165 .await?;
166 Ok(context)
167 }
168
169 pub async fn open(self) -> Result<Context> {
173 let password = self.password.clone().unwrap_or_default();
174 let context = self.build().await?;
175 match context.open(password).await? {
176 true => Ok(context),
177 false => bail!("database could not be decrypted, incorrect or missing password"),
178 }
179 }
180}
181
/// Handle to a context.
///
/// The actual state lives in an [`InnerContext`] behind an `Arc`, so
/// cloning a `Context` only clones the reference and all clones share the
/// same state. Fields and methods of `InnerContext` are reachable through
/// the `Deref` implementation.
#[derive(Clone, Debug)]
pub struct Context {
    /// Shared state of the context.
    pub(crate) inner: Arc<InnerContext>,
}
197
198impl Deref for Context {
199 type Target = InnerContext;
200
201 fn deref(&self) -> &Self::Target {
202 &self.inner
203 }
204}
205
/// Weak counterpart of [`Context`].
///
/// Holds a `Weak` reference to the shared `InnerContext`, so it does not
/// keep the context alive. Use `upgrade()` to get a `Context` back.
#[derive(Clone, Debug)]
pub(crate) struct WeakContext {
    /// Non-owning reference to the shared state.
    inner: Weak<InnerContext>,
}
213
214impl WeakContext {
215 pub(crate) fn upgrade(&self) -> Result<Context> {
217 let inner = self
218 .inner
219 .upgrade()
220 .ok_or_else(|| anyhow::anyhow!("Inner struct has been dropped"))?;
221 Ok(Context { inner })
222 }
223}
224
/// Shared state behind every clone of a [`Context`].
///
/// Created by `Context::with_blobdir()` and accessed through the `Deref`
/// implementation of [`Context`].
#[derive(Debug)]
pub struct InnerContext {
    /// Directory for blob files; verified to exist when the context is created.
    pub(crate) blobdir: PathBuf,
    /// Database handle.
    pub(crate) sql: Sql,
    /// Timestamp smearing state.
    pub(crate) smeared_timestamp: SmearedTimestamp,
    /// State of the "ongoing process", managed by `alloc_ongoing()`,
    /// `stop_ongoing()` and `free_ongoing()`.
    running_state: RwLock<RunningState>,
    /// Mutex around OAuth2 handling
    /// (presumably serializes OAuth2 flows; confirm at the call sites).
    pub(crate) oauth2_mutex: Mutex<()>,
    /// Mutex around the "wrong password" warning
    /// (presumably prevents emitting it twice concurrently; confirm at the call sites).
    pub(crate) wrong_pw_warning_mutex: Mutex<()>,
    /// Mutex around housekeeping
    /// (presumably prevents concurrent housekeeping runs; confirm at the call sites).
    pub(crate) housekeeping_mutex: Mutex<()>,

    /// Mutex around message fetching
    /// (presumably prevents concurrent fetches; confirm at the call sites).
    pub(crate) fetch_msgs_mutex: Mutex<()>,

    /// Stock strings passed in when the context was created.
    pub(crate) translated_stockstrings: StockStrings,
    /// Event channel; `emit_event()` pushes events tagged with `id` into it.
    pub(crate) events: Events,

    /// Scheduler state, driven by `start_io()`, `stop_io()` and friends.
    pub(crate) scheduler: SchedulerState,
    /// Rate limiter, initialized with a 3 second window and a quota of 3.0.
    pub(crate) ratelimit: RwLock<Ratelimit>,

    /// Quota information keyed by `u32`
    /// (NOTE(review): the key looks like a transport id; confirm).
    pub(crate) quota: RwLock<BTreeMap<u32, QuotaInfo>>,

    /// Notification awaited by `wait_next_msgs()`.
    ///
    /// Notified once at creation so the first wait returns immediately.
    pub(crate) new_msgs_notify: Notify,

    /// Server ID key/value pairs, reported as `imap_server_id` by
    /// `Context::get_info()` when set.
    pub(crate) server_id: RwLock<Option<HashMap<String, String>>>,

    /// Server metadata; its `comment` and `admin` fields are reported by
    /// `Context::get_info()` when set.
    pub(crate) metadata: RwLock<Option<ServerMetadata>>,

    /// Id of this context; attached to every emitted event.
    pub(crate) id: u32,

    /// Time the context was created; used to compute `uptime` in
    /// `Context::get_info()`.
    creation_time: tools::Time,

    /// Text of the last error; starts out empty.
    pub(crate) last_error: parking_lot::RwLock<String>,

    /// Migration error text, if any; starts out as `None`.
    pub(crate) migration_error: parking_lot::RwLock<Option<String>>,

    /// Debug logging state; when set, `emit_event()` forwards a copy of
    /// every event to it.
    pub(crate) debug_logging: std::sync::RwLock<Option<DebugLogging>>,

    /// Push subscriber passed in when the context was created.
    pub(crate) push_subscriber: PushSubscriber,

    /// Push subscription flag; starts out `false`.
    pub(crate) push_subscribed: AtomicBool,

    /// Store for TLS sessions.
    pub(crate) tls_session_store: TlsSessionStore,

    /// Store for SPKI hashes.
    pub(crate) spki_hash_store: SpkiHashStore,

    /// Iroh peer-channel state, if initialized; taken and closed by `stop_io()`.
    pub(crate) iroh: Arc<RwLock<Option<Iroh>>>,

    /// Cached own key fingerprint, set at most once.
    pub(crate) self_fingerprint: OnceLock<String>,

    /// Cached own public key, if loaded.
    pub(crate) self_public_key: Mutex<Option<SignedPublicKey>>,

    /// Connectivity stores
    /// (presumably one per connection; confirm against the scheduler).
    pub(crate) connectivities: parking_lot::Mutex<Vec<ConnectivityStore>>,

    /// Optional hook that receives a MIME part and returns a (possibly
    /// modified) MIME part; judging by the name it runs before encryption.
    #[expect(clippy::type_complexity)]
    pub(crate) pre_encrypt_mime_hook: parking_lot::Mutex<
        Option<
            for<'a> fn(
                &Context,
                mail_builder::mime::MimePart<'a>,
            ) -> mail_builder::mime::MimePart<'a>,
        >,
    >,
}
348
/// State of the single "ongoing process" of a context.
///
/// Transitions: `Stopped` -> `Running` (`alloc_ongoing()`),
/// `Running` -> `ShallStop` (`stop_ongoing()`),
/// any state -> `Stopped` (`free_ongoing()`).
#[derive(Debug, Default)]
enum RunningState {
    /// An ongoing process is running; sending on `cancel_sender` asks it to stop.
    Running { cancel_sender: Sender<()> },

    /// The ongoing process was asked to stop at time `request`
    /// but has not been freed yet.
    ShallStop { request: tools::Time },

    /// No ongoing process is allocated.
    #[default]
    Stopped,
}
362
363#[expect(clippy::arithmetic_side_effects)]
370pub fn get_info() -> BTreeMap<&'static str, String> {
371 let mut res = BTreeMap::new();
372
373 #[cfg(debug_assertions)]
374 res.insert(
375 "debug_assertions",
376 "On - DO NOT RELEASE THIS BUILD".to_string(),
377 );
378 #[cfg(not(debug_assertions))]
379 res.insert("debug_assertions", "Off".to_string());
380
381 res.insert("deltachat_core_version", format!("v{DC_VERSION_STR}"));
382 res.insert("sqlite_version", rusqlite::version().to_string());
383 res.insert("arch", (std::mem::size_of::<usize>() * 8).to_string());
384 res.insert("num_cpus", num_cpus::get().to_string());
385 res.insert("level", "awesome".into());
386 res
387}
388
389impl Context {
390 pub async fn new(
392 dbfile: &Path,
393 id: u32,
394 events: Events,
395 stock_strings: StockStrings,
396 ) -> Result<Context> {
397 let context =
398 Self::new_closed(dbfile, id, events, stock_strings, Default::default()).await?;
399
400 if context.check_passphrase("".to_string()).await? {
402 context.sql.open(&context, "".to_string()).await?;
403 }
404 Ok(context)
405 }
406
407 pub async fn new_closed(
409 dbfile: &Path,
410 id: u32,
411 events: Events,
412 stockstrings: StockStrings,
413 push_subscriber: PushSubscriber,
414 ) -> Result<Context> {
415 let mut blob_fname = OsString::new();
416 blob_fname.push(dbfile.file_name().unwrap_or_default());
417 blob_fname.push("-blobs");
418 let blobdir = dbfile.with_file_name(blob_fname);
419 if !blobdir.exists() {
420 tokio::fs::create_dir_all(&blobdir).await?;
421 }
422 let context = Context::with_blobdir(
423 dbfile.into(),
424 blobdir,
425 id,
426 events,
427 stockstrings,
428 push_subscriber,
429 )?;
430 Ok(context)
431 }
432
433 pub(crate) fn get_weak_context(&self) -> WeakContext {
435 WeakContext {
436 inner: Arc::downgrade(&self.inner),
437 }
438 }
439
440 #[deprecated(since = "TBD")]
447 pub async fn open(&self, passphrase: String) -> Result<bool> {
448 if self.sql.check_passphrase(passphrase.clone()).await? {
449 self.sql.open(self, passphrase).await?;
450 Ok(true)
451 } else {
452 Ok(false)
453 }
454 }
455
456 pub async fn change_passphrase(&self, passphrase: String) -> Result<()> {
459 self.sql.change_passphrase(passphrase).await?;
460 Ok(())
461 }
462
463 pub async fn is_open(&self) -> bool {
465 self.sql.is_open().await
466 }
467
468 pub(crate) async fn check_passphrase(&self, passphrase: String) -> Result<bool> {
474 self.sql.check_passphrase(passphrase).await
475 }
476
477 pub(crate) fn with_blobdir(
478 dbfile: PathBuf,
479 blobdir: PathBuf,
480 id: u32,
481 events: Events,
482 stockstrings: StockStrings,
483 push_subscriber: PushSubscriber,
484 ) -> Result<Context> {
485 ensure!(
486 blobdir.is_dir(),
487 "Blobdir does not exist: {}",
488 blobdir.display()
489 );
490
491 let new_msgs_notify = Notify::new();
492 new_msgs_notify.notify_one();
495
496 let inner = InnerContext {
497 id,
498 blobdir,
499 running_state: RwLock::new(Default::default()),
500 sql: Sql::new(dbfile),
501 smeared_timestamp: SmearedTimestamp::new(),
502 oauth2_mutex: Mutex::new(()),
503 wrong_pw_warning_mutex: Mutex::new(()),
504 housekeeping_mutex: Mutex::new(()),
505 fetch_msgs_mutex: Mutex::new(()),
506 translated_stockstrings: stockstrings,
507 events,
508 scheduler: SchedulerState::new(),
509 ratelimit: RwLock::new(Ratelimit::new(Duration::new(3, 0), 3.0)), quota: RwLock::new(BTreeMap::new()),
511 new_msgs_notify,
512 server_id: RwLock::new(None),
513 metadata: RwLock::new(None),
514 creation_time: tools::Time::now(),
515 last_error: parking_lot::RwLock::new("".to_string()),
516 migration_error: parking_lot::RwLock::new(None),
517 debug_logging: std::sync::RwLock::new(None),
518 push_subscriber,
519 push_subscribed: AtomicBool::new(false),
520 tls_session_store: TlsSessionStore::new(),
521 spki_hash_store: SpkiHashStore::new(),
522 iroh: Arc::new(RwLock::new(None)),
523 self_fingerprint: OnceLock::new(),
524 self_public_key: Mutex::new(None),
525 connectivities: parking_lot::Mutex::new(Vec::new()),
526 pre_encrypt_mime_hook: None.into(),
527 };
528
529 let ctx = Context {
530 inner: Arc::new(inner),
531 };
532
533 Ok(ctx)
534 }
535
536 pub async fn start_io(&self) {
538 if !self.is_configured().await.unwrap_or_default() {
539 warn!(self, "can not start io on a context that is not configured");
540 return;
541 }
542
543 self.sql.config_cache.write().await.clear();
549
550 self.scheduler.start(self).await;
551 }
552
553 pub async fn stop_io(&self) {
555 self.scheduler.stop(self).await;
556 if let Some(iroh) = self.iroh.write().await.take() {
557 tokio::spawn(async move {
564 let _ = tokio::time::timeout(Duration::from_secs(60), iroh.close()).await;
567 });
568 }
569 }
570
571 pub async fn restart_io_if_running(&self) {
574 self.scheduler.restart(self).await;
575 }
576
577 pub async fn maybe_network(&self) {
579 if let Some(ref iroh) = *self.iroh.read().await {
580 iroh.network_change().await;
581 }
582 self.scheduler.maybe_network().await;
583 }
584
585 pub async fn is_chatmail(&self) -> Result<bool> {
587 self.get_config_bool(Config::IsChatmail).await
588 }
589
590 pub(crate) async fn get_max_smtp_rcpt_to(&self) -> Result<usize> {
592 let is_chatmail = self.is_chatmail().await?;
593 let val = self
594 .get_configured_provider()
595 .await?
596 .and_then(|provider| provider.opt.max_smtp_rcpt_to)
597 .map_or_else(
598 || match is_chatmail {
599 true => constants::DEFAULT_CHATMAIL_MAX_SMTP_RCPT_TO,
600 false => constants::DEFAULT_MAX_SMTP_RCPT_TO,
601 },
602 usize::from,
603 );
604 Ok(val)
605 }
606
    /// Fetches messages once, without requiring IO to be started.
    ///
    /// Returns immediately with `Ok(())` if the context is not configured.
    /// If the scheduler is already running, it is only poked and awaited;
    /// otherwise a dedicated IMAP connection is created for the fetch.
    ///
    /// # Errors
    ///
    /// Fails if the configuration state or the primary address cannot be
    /// read, or if pausing the scheduler, connecting or fetching fails.
    /// A failed quota update is only logged.
    pub async fn background_fetch(&self) -> Result<()> {
        if !(self.is_configured().await?) {
            return Ok(());
        }

        let address = self.get_primary_self_addr().await?;
        let time_start = tools::Time::now();
        info!(self, "background_fetch started fetching {address}.");

        if self.scheduler.is_running().await {
            // IO is already running: let the scheduler do the work and
            // wait until it is done.
            self.scheduler.maybe_network().await;
            self.wait_for_all_work_done().await;
        } else {
            // Keep the pause guard alive for the whole branch
            // (presumably so the scheduler cannot start a competing
            // connection while this one is fetching; confirm).
            let _pause_guard = self.scheduler.pause(self).await?;

            // Only the receiver half of a fresh channel is passed in; its
            // sender is dropped right away, so nothing is ever sent on it.
            let mut connection = Imap::new_configured(self, channel::bounded(1).1).await?;
            let mut session = connection.prepare(self).await?;

            // Clone the folder name so `connection` can be borrowed mutably below.
            let folder = connection.folder.clone();
            connection
                .fetch_move_delete(self, &mut session, &folder)
                .await?;

            // Refresh the quota if an update is due under the background-fetch
            // rate limit; failure is logged and does not fail the fetch.
            if self
                .quota_needs_update(
                    session.transport_id(),
                    DC_BACKGROUND_FETCH_QUOTA_CHECK_RATELIMIT,
                )
                .await
                && let Err(err) = self.update_recent_quota(&mut session, &folder).await
            {
                warn!(self, "Failed to update quota: {err:#}.");
            }
        }

        info!(
            self,
            "background_fetch done for {address} took {:?}.",
            time_elapsed(&time_start),
        );

        Ok(())
    }
662
663 #[cfg(feature = "internals")]
667 pub fn sql(&self) -> &Sql {
668 &self.inner.sql
669 }
670
671 pub fn get_dbfile(&self) -> &Path {
673 self.sql.dbfile.as_path()
674 }
675
676 pub fn get_blobdir(&self) -> &Path {
678 self.blobdir.as_path()
679 }
680
681 pub fn emit_event(&self, event: EventType) {
683 {
684 let lock = self.debug_logging.read().expect("RwLock is poisoned");
685 if let Some(debug_logging) = &*lock {
686 debug_logging.log_event(event.clone());
687 }
688 }
689 self.events.emit(Event {
690 id: self.id,
691 typ: event,
692 });
693 }
694
695 pub fn emit_msgs_changed_without_ids(&self) {
697 self.emit_event(EventType::MsgsChanged {
698 chat_id: ChatId::new(0),
699 msg_id: MsgId::new(0),
700 });
701 }
702
703 pub fn emit_msgs_changed(&self, chat_id: ChatId, msg_id: MsgId) {
709 logged_debug_assert!(
710 self,
711 !chat_id.is_unset(),
712 "emit_msgs_changed: chat_id is unset."
713 );
714 logged_debug_assert!(
715 self,
716 !msg_id.is_unset(),
717 "emit_msgs_changed: msg_id is unset."
718 );
719
720 self.emit_event(EventType::MsgsChanged { chat_id, msg_id });
721 chatlist_events::emit_chatlist_changed(self);
722 chatlist_events::emit_chatlist_item_changed(self, chat_id);
723 }
724
725 pub fn emit_msgs_changed_without_msg_id(&self, chat_id: ChatId) {
727 logged_debug_assert!(
728 self,
729 !chat_id.is_unset(),
730 "emit_msgs_changed_without_msg_id: chat_id is unset."
731 );
732
733 self.emit_event(EventType::MsgsChanged {
734 chat_id,
735 msg_id: MsgId::new(0),
736 });
737 chatlist_events::emit_chatlist_changed(self);
738 chatlist_events::emit_chatlist_item_changed(self, chat_id);
739 }
740
741 pub fn emit_incoming_msg(&self, chat_id: ChatId, msg_id: MsgId) {
743 debug_assert!(!chat_id.is_unset());
744 debug_assert!(!msg_id.is_unset());
745
746 self.emit_event(EventType::IncomingMsg { chat_id, msg_id });
747 chatlist_events::emit_chatlist_changed(self);
748 chatlist_events::emit_chatlist_item_changed(self, chat_id);
749 }
750
751 pub async fn emit_location_changed(&self, contact_id: Option<ContactId>) -> Result<()> {
753 self.emit_event(EventType::LocationChanged(contact_id));
754
755 if let Some(msg_id) = self
756 .get_config_parsed::<u32>(Config::WebxdcIntegration)
757 .await?
758 {
759 self.emit_event(EventType::WebxdcStatusUpdate {
760 msg_id: MsgId::new(msg_id),
761 status_update_serial: Default::default(),
762 })
763 }
764
765 Ok(())
766 }
767
768 pub fn get_event_emitter(&self) -> EventEmitter {
773 self.events.get_emitter()
774 }
775
776 pub fn get_id(&self) -> u32 {
778 self.id
779 }
780
781 pub(crate) async fn alloc_ongoing(&self) -> Result<Receiver<()>> {
791 let mut s = self.running_state.write().await;
792 ensure!(
793 matches!(*s, RunningState::Stopped),
794 "There is already another ongoing process running."
795 );
796
797 let (sender, receiver) = channel::bounded(1);
798 *s = RunningState::Running {
799 cancel_sender: sender,
800 };
801
802 Ok(receiver)
803 }
804
805 pub(crate) async fn free_ongoing(&self) {
806 let mut s = self.running_state.write().await;
807 if let RunningState::ShallStop { request } = *s {
808 info!(self, "Ongoing stopped in {:?}", time_elapsed(&request));
809 }
810 *s = RunningState::Stopped;
811 }
812
813 pub async fn stop_ongoing(&self) {
815 let mut s = self.running_state.write().await;
816 match &*s {
817 RunningState::Running { cancel_sender } => {
818 if let Err(err) = cancel_sender.send(()).await {
819 warn!(self, "could not cancel ongoing: {:#}", err);
820 }
821 info!(self, "Signaling the ongoing process to stop ASAP.",);
822 *s = RunningState::ShallStop {
823 request: tools::Time::now(),
824 };
825 }
826 RunningState::ShallStop { .. } | RunningState::Stopped => {
827 info!(self, "No ongoing process to stop.",);
828 }
829 }
830 }
831
832 #[allow(unused)]
833 pub(crate) async fn shall_stop_ongoing(&self) -> bool {
834 match &*self.running_state.read().await {
835 RunningState::Running { .. } => false,
836 RunningState::ShallStop { .. } | RunningState::Stopped => true,
837 }
838 }
839
    /// Returns information about this context as key/value pairs.
    ///
    /// The result starts from the free function `get_info()` (build and
    /// host information) and adds database statistics, config values, key
    /// information, server identification and the uptime of the context.
    ///
    /// # Errors
    ///
    /// Fails as soon as one of the underlying database or config reads
    /// fails. A failure to load the own key fingerprint is not an error;
    /// it is reported inside the `fingerprint` value instead.
    pub async fn get_info(&self) -> Result<BTreeMap<&'static str, String>> {
        // Gather values that need their own queries first.
        let all_self_addrs = self.get_all_self_addrs().await?.join(", ");
        let all_transports: Vec<String> = ConfiguredLoginParam::load_all(self)
            .await?
            .into_iter()
            .map(|(transport_id, param)| format!("{transport_id}: {param}"))
            .collect();
        let all_transports = if all_transports.is_empty() {
            "Not configured".to_string()
        } else {
            all_transports.join(",")
        };
        let chats = get_chat_cnt(self).await?;
        let unblocked_msgs = message::get_unblocked_msg_cnt(self).await;
        let request_msgs = message::get_request_msg_cnt(self).await;
        let contacts = Contact::get_real_cnt(self).await?;
        let proxy_enabled = self.get_config_int(Config::ProxyEnabled).await?;
        let dbversion = self
            .sql
            .get_raw_config_int("dbversion")
            .await?
            .unwrap_or_default();
        let journal_mode = self
            .sql
            .query_get_value("PRAGMA journal_mode;", ())
            .await?
            .unwrap_or_else(|| "unknown".to_string());
        let mdns_enabled = self.get_config_int(Config::MdnsEnabled).await?;
        let bcc_self = self.get_config_int(Config::BccSelf).await?;
        let sync_msgs = self.get_config_int(Config::SyncMsgs).await?;
        let disable_idle = self.get_config_bool(Config::DisableIdle).await?;

        let prv_key_cnt = self.sql.count("SELECT COUNT(*) FROM keypairs;", ()).await?;

        let pub_key_cnt = self
            .sql
            .count("SELECT COUNT(*) FROM public_keys;", ())
            .await?;
        // A missing or broken key must not make the whole info unavailable.
        let fingerprint_str = match self_fingerprint(self).await {
            Ok(fp) => fp.to_string(),
            Err(err) => format!("<key failure: {err}>"),
        };

        // Start from the build/host information.
        let mut res = get_info();

        res.insert("bot", self.get_config_int(Config::Bot).await?.to_string());
        res.insert("number_of_chats", chats.to_string());
        res.insert("number_of_chat_messages", unblocked_msgs.to_string());
        res.insert("messages_in_contact_requests", request_msgs.to_string());
        res.insert("number_of_contacts", contacts.to_string());
        // Despite the key name, this is the path of the database file.
        res.insert("database_dir", self.get_dbfile().display().to_string());
        res.insert("database_version", dbversion.to_string());
        res.insert(
            "database_encrypted",
            self.sql
                .is_encrypted()
                .await
                .map_or_else(|| "closed".to_string(), |b| b.to_string()),
        );
        res.insert("journal_mode", journal_mode);
        res.insert("blobdir", self.get_blobdir().display().to_string());
        res.insert(
            "selfavatar",
            self.get_config(Config::Selfavatar)
                .await?
                .unwrap_or_else(|| "<unset>".to_string()),
        );
        res.insert("proxy_enabled", proxy_enabled.to_string());
        res.insert("used_transport_settings", all_transports);

        // Only present once a server ID has been stored.
        if let Some(server_id) = &*self.server_id.read().await {
            res.insert("imap_server_id", format!("{server_id:?}"));
        }

        res.insert("is_chatmail", self.is_chatmail().await?.to_string());
        res.insert(
            "fix_is_chatmail",
            self.get_config_bool(Config::FixIsChatmail)
                .await?
                .to_string(),
        );
        res.insert(
            "is_muted",
            self.get_config_bool(Config::IsMuted).await?.to_string(),
        );
        res.insert(
            "private_tag",
            self.get_config(Config::PrivateTag)
                .await?
                .unwrap_or_else(|| "<unset>".to_string()),
        );

        // Only present once server metadata has been stored.
        if let Some(metadata) = &*self.metadata.read().await {
            if let Some(comment) = &metadata.comment {
                res.insert("imap_server_comment", format!("{comment:?}"));
            }

            if let Some(admin) = &metadata.admin {
                res.insert("imap_server_admin", format!("{admin:?}"));
            }
        }

        res.insert("all_self_addrs", all_self_addrs);
        res.insert(
            "who_can_call_me",
            self.get_config_int(Config::WhoCanCallMe).await?.to_string(),
        );
        res.insert(
            "download_limit",
            self.get_config_int(Config::DownloadLimit)
                .await?
                .to_string(),
        );
        res.insert("mdns_enabled", mdns_enabled.to_string());
        res.insert("bcc_self", bcc_self.to_string());
        res.insert("sync_msgs", sync_msgs.to_string());
        res.insert("disable_idle", disable_idle.to_string());
        res.insert("private_key_count", prv_key_cnt.to_string());
        res.insert("public_key_count", pub_key_cnt.to_string());
        res.insert("fingerprint", fingerprint_str);
        res.insert(
            "media_quality",
            self.get_config_int(Config::MediaQuality).await?.to_string(),
        );
        res.insert(
            "delete_device_after",
            self.get_config_int(Config::DeleteDeviceAfter)
                .await?
                .to_string(),
        );
        res.insert(
            "delete_server_after",
            self.get_config_int(Config::DeleteServerAfter)
                .await?
                .to_string(),
        );
        res.insert(
            "last_housekeeping",
            self.get_config_int(Config::LastHousekeeping)
                .await?
                .to_string(),
        );
        res.insert(
            "last_cant_decrypt_outgoing_msgs",
            self.get_config_int(Config::LastCantDecryptOutgoingMsgs)
                .await?
                .to_string(),
        );
        res.insert(
            "sign_unencrypted",
            self.get_config_int(Config::SignUnencrypted)
                .await?
                .to_string(),
        );
        res.insert(
            "debug_logging",
            self.get_config_int(Config::DebugLogging).await?.to_string(),
        );
        res.insert(
            "last_msg_id",
            self.get_config_int(Config::LastMsgId).await?.to_string(),
        );
        res.insert(
            "gossip_period",
            self.get_config_int(Config::GossipPeriod).await?.to_string(),
        );
        res.insert(
            "webxdc_realtime_enabled",
            self.get_config_bool(Config::WebxdcRealtimeEnabled)
                .await?
                .to_string(),
        );
        res.insert(
            "donation_request_next_check",
            self.get_config_i64(Config::DonationRequestNextCheck)
                .await?
                .to_string(),
        );
        // Raw config values are reported as stored; unset ones become "".
        res.insert(
            "first_key_contacts_msg_id",
            self.sql
                .get_raw_config("first_key_contacts_msg_id")
                .await?
                .unwrap_or_default(),
        );
        res.insert(
            "stats_id",
            self.get_config(Config::StatsId)
                .await?
                .unwrap_or_else(|| "<unset>".to_string()),
        );
        res.insert(
            "stats_sending",
            stats::should_send_stats(self).await?.to_string(),
        );
        res.insert(
            "stats_last_sent",
            self.get_config_i64(Config::StatsLastSent)
                .await?
                .to_string(),
        );
        res.insert(
            "test_hooks",
            self.sql
                .get_raw_config("test_hooks")
                .await?
                .unwrap_or_default(),
        );
        res.insert(
            "std_header_protection_composing",
            self.sql
                .get_raw_config("std_header_protection_composing")
                .await?
                .unwrap_or_default(),
        );
        res.insert(
            "team_profile",
            self.get_config_bool(Config::TeamProfile).await?.to_string(),
        );

        // Time since this context object was created.
        let elapsed = time_elapsed(&self.creation_time);
        res.insert("uptime", duration_to_str(elapsed));

        Ok(res)
    }
1071
1072 pub async fn get_fresh_msgs(&self) -> Result<Vec<MsgId>> {
1079 let list = self
1080 .sql
1081 .query_map_vec(
1082 "SELECT m.id
1083FROM msgs m
1084LEFT JOIN contacts ct
1085 ON m.from_id=ct.id
1086LEFT JOIN chats c
1087 ON m.chat_id=c.id
1088WHERE m.state=?
1089AND m.hidden=0
1090AND m.chat_id>9
1091AND ct.blocked=0
1092AND c.blocked=0
1093AND NOT(c.muted_until=-1 OR c.muted_until>?)
1094ORDER BY m.timestamp DESC,m.id DESC",
1095 (MessageState::InFresh, time()),
1096 |row| {
1097 let msg_id: MsgId = row.get(0)?;
1098 Ok(msg_id)
1099 },
1100 )
1101 .await?;
1102 Ok(list)
1103 }
1104
1105 pub async fn get_next_msgs(&self) -> Result<Vec<MsgId>> {
1117 let last_msg_id = match self.get_config(Config::LastMsgId).await? {
1118 Some(s) => MsgId::new(s.parse()?),
1119 None => {
1120 self.sql
1125 .query_row(
1126 "SELECT IFNULL((SELECT MAX(id) - 1 FROM msgs), 0)",
1127 (),
1128 |row| {
1129 let msg_id: MsgId = row.get(0)?;
1130 Ok(msg_id)
1131 },
1132 )
1133 .await?
1134 }
1135 };
1136
1137 let list = self
1138 .sql
1139 .query_map_vec(
1140 "SELECT m.id
1141 FROM msgs m
1142 LEFT JOIN contacts ct
1143 ON m.from_id=ct.id
1144 LEFT JOIN chats c
1145 ON m.chat_id=c.id
1146 WHERE m.id>?
1147 AND m.hidden=0
1148 AND m.chat_id>9
1149 AND ct.blocked=0
1150 AND c.blocked!=1
1151 ORDER BY m.id ASC",
1152 (
1153 last_msg_id.to_u32(), ),
1155 |row| {
1156 let msg_id: MsgId = row.get(0)?;
1157 Ok(msg_id)
1158 },
1159 )
1160 .await?;
1161 Ok(list)
1162 }
1163
1164 pub async fn wait_next_msgs(&self) -> Result<Vec<MsgId>> {
1182 self.new_msgs_notify.notified().await;
1183 let list = self.get_next_msgs().await?;
1184 Ok(list)
1185 }
1186
1187 pub async fn search_msgs(&self, chat_id: Option<ChatId>, query: &str) -> Result<Vec<MsgId>> {
1198 let real_query = query.trim().to_lowercase();
1199 if real_query.is_empty() {
1200 return Ok(Vec::new());
1201 }
1202 let str_like_in_text = format!("%{real_query}%");
1203
1204 let list = if let Some(chat_id) = chat_id {
1205 self.sql
1206 .query_map_vec(
1207 "SELECT m.id AS id
1208 FROM msgs m
1209 LEFT JOIN contacts ct
1210 ON m.from_id=ct.id
1211 WHERE m.chat_id=?
1212 AND m.hidden=0
1213 AND ct.blocked=0
1214 AND IFNULL(txt_normalized, txt) LIKE ?
1215 ORDER BY m.timestamp,m.id;",
1216 (chat_id, str_like_in_text),
1217 |row| {
1218 let msg_id: MsgId = row.get("id")?;
1219 Ok(msg_id)
1220 },
1221 )
1222 .await?
1223 } else {
1224 self.sql
1235 .query_map_vec(
1236 "SELECT m.id AS id
1237 FROM msgs m
1238 LEFT JOIN contacts ct
1239 ON m.from_id=ct.id
1240 LEFT JOIN chats c
1241 ON m.chat_id=c.id
1242 WHERE m.chat_id>9
1243 AND m.hidden=0
1244 AND c.blocked!=1
1245 AND ct.blocked=0
1246 AND IFNULL(txt_normalized, txt) LIKE ?
1247 ORDER BY m.id DESC LIMIT 1000",
1248 (str_like_in_text,),
1249 |row| {
1250 let msg_id: MsgId = row.get("id")?;
1251 Ok(msg_id)
1252 },
1253 )
1254 .await?
1255 };
1256
1257 Ok(list)
1258 }
1259
1260 pub(crate) fn derive_blobdir(dbfile: &Path) -> PathBuf {
1261 let mut blob_fname = OsString::new();
1262 blob_fname.push(dbfile.file_name().unwrap_or_default());
1263 blob_fname.push("-blobs");
1264 dbfile.with_file_name(blob_fname)
1265 }
1266
1267 pub(crate) fn derive_walfile(dbfile: &Path) -> PathBuf {
1268 let mut wal_fname = OsString::new();
1269 wal_fname.push(dbfile.file_name().unwrap_or_default());
1270 wal_fname.push("-wal");
1271 dbfile.with_file_name(wal_fname)
1272 }
1273}
1274
1275#[cfg(test)]
1276mod context_tests;