1use std::collections::{BTreeMap, HashMap};
4use std::ffi::OsString;
5use std::ops::Deref;
6use std::path::{Path, PathBuf};
7use std::sync::atomic::AtomicBool;
8use std::sync::{Arc, OnceLock, Weak};
9use std::time::Duration;
10
11use anyhow::{Result, bail, ensure};
12use async_channel::{self as channel, Receiver, Sender};
13use pgp::composed::SignedPublicKey;
14use ratelimit::Ratelimit;
15use tokio::sync::{Mutex, Notify, RwLock};
16
17use crate::chat::{ChatId, get_chat_cnt};
18use crate::config::Config;
19use crate::constants::{self, DC_BACKGROUND_FETCH_QUOTA_CHECK_RATELIMIT, DC_VERSION_STR};
20use crate::contact::{Contact, ContactId};
21use crate::debug_logging::DebugLogging;
22use crate::events::{Event, EventEmitter, EventType, Events};
23use crate::imap::{Imap, ServerMetadata};
24use crate::log::warn;
25use crate::logged_debug_assert;
26use crate::message::{self, MessageState, MsgId};
27use crate::net::tls::{SpkiHashStore, TlsSessionStore};
28use crate::peer_channels::Iroh;
29use crate::push::PushSubscriber;
30use crate::quota::QuotaInfo;
31use crate::scheduler::{ConnectivityStore, SchedulerState};
32use crate::sql::Sql;
33use crate::stock_str::StockStrings;
34use crate::tools::{self, duration_to_str, time, time_elapsed};
35use crate::transport::ConfiguredLoginParam;
36use crate::{chatlist_events, stats};
37
38pub use crate::scheduler::connectivity::Connectivity;
39
40#[derive(Clone, Debug)]
65pub struct ContextBuilder {
66 dbfile: PathBuf,
67 id: u32,
68 events: Events,
69 stock_strings: StockStrings,
70 password: Option<String>,
71
72 push_subscriber: Option<PushSubscriber>,
73}
74
75impl ContextBuilder {
76 pub fn new(dbfile: PathBuf) -> Self {
82 ContextBuilder {
83 dbfile,
84 id: rand::random(),
85 events: Events::new(),
86 stock_strings: StockStrings::new(),
87 password: None,
88 push_subscriber: None,
89 }
90 }
91
92 pub fn with_id(mut self, id: u32) -> Self {
102 self.id = id;
103 self
104 }
105
106 pub fn with_events(mut self, events: Events) -> Self {
115 self.events = events;
116 self
117 }
118
119 pub fn with_stock_strings(mut self, stock_strings: StockStrings) -> Self {
130 self.stock_strings = stock_strings;
131 self
132 }
133
134 #[deprecated(since = "TBD")]
142 pub fn with_password(mut self, password: String) -> Self {
143 self.password = Some(password);
144 self
145 }
146
147 pub(crate) fn with_push_subscriber(mut self, push_subscriber: PushSubscriber) -> Self {
149 self.push_subscriber = Some(push_subscriber);
150 self
151 }
152
153 pub async fn build(self) -> Result<Context> {
155 let push_subscriber = self.push_subscriber.unwrap_or_default();
156 let context = Context::new_closed(
157 &self.dbfile,
158 self.id,
159 self.events,
160 self.stock_strings,
161 push_subscriber,
162 )
163 .await?;
164 Ok(context)
165 }
166
167 pub async fn open(self) -> Result<Context> {
171 let password = self.password.clone().unwrap_or_default();
172 let context = self.build().await?;
173 match context.open(password).await? {
174 true => Ok(context),
175 false => bail!("database could not be decrypted, incorrect or missing password"),
176 }
177 }
178}
179
180#[derive(Clone, Debug)]
192pub struct Context {
193 pub(crate) inner: Arc<InnerContext>,
194}
195
196impl Deref for Context {
197 type Target = InnerContext;
198
199 fn deref(&self) -> &Self::Target {
200 &self.inner
201 }
202}
203
204#[derive(Clone, Debug)]
208pub(crate) struct WeakContext {
209 inner: Weak<InnerContext>,
210}
211
212impl WeakContext {
213 pub(crate) fn upgrade(&self) -> Result<Context> {
215 let inner = self
216 .inner
217 .upgrade()
218 .ok_or_else(|| anyhow::anyhow!("Inner struct has been dropped"))?;
219 Ok(Context { inner })
220 }
221}
222
223#[derive(Debug)]
225pub struct InnerContext {
226 pub(crate) blobdir: PathBuf,
228 pub(crate) sql: Sql,
229 running_state: RwLock<RunningState>,
234 pub(crate) oauth2_mutex: Mutex<()>,
236 pub(crate) wrong_pw_warning_mutex: Mutex<()>,
238 pub(crate) housekeeping_mutex: Mutex<()>,
240
241 pub(crate) fetch_msgs_mutex: Mutex<()>,
248
249 pub(crate) translated_stockstrings: StockStrings,
250 pub(crate) events: Events,
251
252 pub(crate) scheduler: SchedulerState,
253 pub(crate) ratelimit: RwLock<Ratelimit>,
254
255 pub(crate) quota: RwLock<BTreeMap<u32, QuotaInfo>>,
258
259 pub(crate) new_msgs_notify: Notify,
263
264 pub(crate) server_id: RwLock<Option<HashMap<String, String>>>,
268
269 pub(crate) metadata: RwLock<Option<ServerMetadata>>,
271
272 pub(crate) id: u32,
277
278 creation_time: tools::Time,
279
280 pub(crate) last_error: parking_lot::RwLock<String>,
284
285 pub(crate) migration_error: parking_lot::RwLock<Option<String>>,
291
292 pub(crate) debug_logging: std::sync::RwLock<Option<DebugLogging>>,
297
298 pub(crate) push_subscriber: PushSubscriber,
301
302 pub(crate) push_subscribed: AtomicBool,
304
305 pub(crate) tls_session_store: TlsSessionStore,
307
308 pub(crate) spki_hash_store: SpkiHashStore,
314
315 pub(crate) iroh: Arc<RwLock<Option<Iroh>>>,
317
318 pub(crate) self_fingerprint: OnceLock<String>,
322
323 pub(crate) self_public_key: Mutex<Option<SignedPublicKey>>,
329
330 pub(crate) connectivities: parking_lot::Mutex<Vec<ConnectivityStore>>,
333}
334
335#[derive(Debug, Default)]
337enum RunningState {
338 Running { cancel_sender: Sender<()> },
340
341 ShallStop { request: tools::Time },
343
344 #[default]
346 Stopped,
347}
348
349#[expect(clippy::arithmetic_side_effects)]
356pub fn get_info() -> BTreeMap<&'static str, String> {
357 let mut res = BTreeMap::new();
358
359 #[cfg(debug_assertions)]
360 res.insert(
361 "debug_assertions",
362 "On - DO NOT RELEASE THIS BUILD".to_string(),
363 );
364 #[cfg(not(debug_assertions))]
365 res.insert("debug_assertions", "Off".to_string());
366
367 res.insert("deltachat_core_version", format!("v{DC_VERSION_STR}"));
368 res.insert("sqlite_version", rusqlite::version().to_string());
369 res.insert("arch", (std::mem::size_of::<usize>() * 8).to_string());
370 res.insert("num_cpus", num_cpus::get().to_string());
371 res.insert("level", "awesome".into());
372 res
373}
374
375impl Context {
376 pub async fn new(
378 dbfile: &Path,
379 id: u32,
380 events: Events,
381 stock_strings: StockStrings,
382 ) -> Result<Context> {
383 let context =
384 Self::new_closed(dbfile, id, events, stock_strings, Default::default()).await?;
385
386 if context.check_passphrase("".to_string()).await? {
388 context.sql.open(&context, "".to_string()).await?;
389 }
390 Ok(context)
391 }
392
393 pub async fn new_closed(
395 dbfile: &Path,
396 id: u32,
397 events: Events,
398 stockstrings: StockStrings,
399 push_subscriber: PushSubscriber,
400 ) -> Result<Context> {
401 let mut blob_fname = OsString::new();
402 blob_fname.push(dbfile.file_name().unwrap_or_default());
403 blob_fname.push("-blobs");
404 let blobdir = dbfile.with_file_name(blob_fname);
405 if !blobdir.exists() {
406 tokio::fs::create_dir_all(&blobdir).await?;
407 }
408 let context = Context::with_blobdir(
409 dbfile.into(),
410 blobdir,
411 id,
412 events,
413 stockstrings,
414 push_subscriber,
415 )?;
416 Ok(context)
417 }
418
419 pub(crate) fn get_weak_context(&self) -> WeakContext {
421 WeakContext {
422 inner: Arc::downgrade(&self.inner),
423 }
424 }
425
426 #[deprecated(since = "TBD")]
433 pub async fn open(&self, passphrase: String) -> Result<bool> {
434 if self.sql.check_passphrase(passphrase.clone()).await? {
435 self.sql.open(self, passphrase).await?;
436 Ok(true)
437 } else {
438 Ok(false)
439 }
440 }
441
442 pub async fn change_passphrase(&self, passphrase: String) -> Result<()> {
445 self.sql.change_passphrase(passphrase).await?;
446 Ok(())
447 }
448
449 pub async fn is_open(&self) -> bool {
451 self.sql.is_open().await
452 }
453
454 pub(crate) async fn check_passphrase(&self, passphrase: String) -> Result<bool> {
460 self.sql.check_passphrase(passphrase).await
461 }
462
463 pub(crate) fn with_blobdir(
464 dbfile: PathBuf,
465 blobdir: PathBuf,
466 id: u32,
467 events: Events,
468 stockstrings: StockStrings,
469 push_subscriber: PushSubscriber,
470 ) -> Result<Context> {
471 ensure!(
472 blobdir.is_dir(),
473 "Blobdir does not exist: {}",
474 blobdir.display()
475 );
476
477 let new_msgs_notify = Notify::new();
478 new_msgs_notify.notify_one();
481
482 let inner = InnerContext {
483 id,
484 blobdir,
485 running_state: RwLock::new(Default::default()),
486 sql: Sql::new(dbfile),
487 oauth2_mutex: Mutex::new(()),
488 wrong_pw_warning_mutex: Mutex::new(()),
489 housekeeping_mutex: Mutex::new(()),
490 fetch_msgs_mutex: Mutex::new(()),
491 translated_stockstrings: stockstrings,
492 events,
493 scheduler: SchedulerState::new(),
494 ratelimit: RwLock::new(Ratelimit::new(Duration::new(3, 0), 3.0)), quota: RwLock::new(BTreeMap::new()),
496 new_msgs_notify,
497 server_id: RwLock::new(None),
498 metadata: RwLock::new(None),
499 creation_time: tools::Time::now(),
500 last_error: parking_lot::RwLock::new("".to_string()),
501 migration_error: parking_lot::RwLock::new(None),
502 debug_logging: std::sync::RwLock::new(None),
503 push_subscriber,
504 push_subscribed: AtomicBool::new(false),
505 tls_session_store: TlsSessionStore::new(),
506 spki_hash_store: SpkiHashStore::new(),
507 iroh: Arc::new(RwLock::new(None)),
508 self_fingerprint: OnceLock::new(),
509 self_public_key: Mutex::new(None),
510 connectivities: parking_lot::Mutex::new(Vec::new()),
511 };
512
513 let ctx = Context {
514 inner: Arc::new(inner),
515 };
516
517 Ok(ctx)
518 }
519
520 pub async fn start_io(&self) {
522 if !self.is_configured().await.unwrap_or_default() {
523 warn!(self, "can not start io on a context that is not configured");
524 return;
525 }
526
527 self.sql.config_cache.write().await.clear();
533
534 self.scheduler.start(self).await;
535 }
536
537 pub async fn stop_io(&self) {
539 self.scheduler.stop(self).await;
540 if let Some(iroh) = self.iroh.write().await.take() {
541 tokio::spawn(async move {
548 let _ = tokio::time::timeout(Duration::from_secs(60), iroh.close()).await;
551 });
552 }
553 }
554
555 pub async fn restart_io_if_running(&self) {
558 self.scheduler.restart(self).await;
559 }
560
561 pub async fn maybe_network(&self) {
563 if let Some(ref iroh) = *self.iroh.read().await {
564 iroh.network_change().await;
565 }
566 self.scheduler.maybe_network().await;
567 }
568
569 pub async fn is_chatmail(&self) -> Result<bool> {
575 self.get_config_bool(Config::IsChatmail).await
576 }
577
578 pub(crate) async fn get_max_smtp_rcpt_to(&self) -> Result<usize> {
580 let is_chatmail = self.is_chatmail().await?;
581 let val = self
582 .get_configured_provider()
583 .await?
584 .and_then(|provider| provider.opt.max_smtp_rcpt_to)
585 .map_or_else(
586 || match is_chatmail {
587 true => constants::DEFAULT_CHATMAIL_MAX_SMTP_RCPT_TO,
588 false => constants::DEFAULT_MAX_SMTP_RCPT_TO,
589 },
590 usize::from,
591 );
592 Ok(val)
593 }
594
595 pub async fn background_fetch(&self) -> Result<()> {
601 if !(self.is_configured().await?) {
602 return Ok(());
603 }
604
605 let address = self.get_primary_self_addr().await?;
606 let time_start = tools::Time::now();
607 info!(self, "background_fetch started fetching {address}.");
608
609 if self.scheduler.is_running().await {
610 self.scheduler.maybe_network().await;
611 self.wait_for_all_work_done().await;
612 } else {
613 let _pause_guard = self.scheduler.pause(self).await?;
616
617 let mut connection = Imap::new_configured(self, channel::bounded(1).1).await?;
619 let mut session = connection.prepare(self).await?;
620
621 let folder = connection.folder.clone();
623 connection
624 .fetch_move_delete(self, &mut session, &folder)
625 .await?;
626
627 if self
631 .quota_needs_update(
632 session.transport_id(),
633 DC_BACKGROUND_FETCH_QUOTA_CHECK_RATELIMIT,
634 )
635 .await
636 && let Err(err) = self.update_recent_quota(&mut session, &folder).await
637 {
638 warn!(self, "Failed to update quota: {err:#}.");
639 }
640 }
641
642 info!(
643 self,
644 "background_fetch done for {address} took {:?}.",
645 time_elapsed(&time_start),
646 );
647
648 Ok(())
649 }
650
651 #[cfg(feature = "internals")]
655 pub fn sql(&self) -> &Sql {
656 &self.inner.sql
657 }
658
659 pub fn get_dbfile(&self) -> &Path {
661 self.sql.dbfile.as_path()
662 }
663
664 pub fn get_blobdir(&self) -> &Path {
666 self.blobdir.as_path()
667 }
668
669 pub fn emit_event(&self, event: EventType) {
671 {
672 let lock = self.debug_logging.read().expect("RwLock is poisoned");
673 if let Some(debug_logging) = &*lock {
674 debug_logging.log_event(event.clone());
675 }
676 }
677 self.events.emit(Event {
678 id: self.id,
679 typ: event,
680 });
681 }
682
683 pub fn emit_msgs_changed_without_ids(&self) {
685 self.emit_event(EventType::MsgsChanged {
686 chat_id: ChatId::new(0),
687 msg_id: MsgId::new(0),
688 });
689 }
690
691 pub fn emit_msgs_changed(&self, chat_id: ChatId, msg_id: MsgId) {
697 logged_debug_assert!(
698 self,
699 !chat_id.is_unset(),
700 "emit_msgs_changed: chat_id is unset."
701 );
702 logged_debug_assert!(
703 self,
704 !msg_id.is_unset(),
705 "emit_msgs_changed: msg_id is unset."
706 );
707
708 self.emit_event(EventType::MsgsChanged { chat_id, msg_id });
709 chatlist_events::emit_chatlist_changed(self);
710 chatlist_events::emit_chatlist_item_changed(self, chat_id);
711 }
712
713 pub fn emit_msgs_changed_without_msg_id(&self, chat_id: ChatId) {
715 logged_debug_assert!(
716 self,
717 !chat_id.is_unset(),
718 "emit_msgs_changed_without_msg_id: chat_id is unset."
719 );
720
721 self.emit_event(EventType::MsgsChanged {
722 chat_id,
723 msg_id: MsgId::new(0),
724 });
725 chatlist_events::emit_chatlist_changed(self);
726 chatlist_events::emit_chatlist_item_changed(self, chat_id);
727 }
728
729 pub fn emit_incoming_msg(&self, chat_id: ChatId, msg_id: MsgId) {
731 debug_assert!(!chat_id.is_unset());
732 debug_assert!(!msg_id.is_unset());
733
734 self.emit_event(EventType::IncomingMsg { chat_id, msg_id });
735 chatlist_events::emit_chatlist_changed(self);
736 chatlist_events::emit_chatlist_item_changed(self, chat_id);
737 }
738
739 pub async fn emit_location_changed(&self, contact_id: Option<ContactId>) -> Result<()> {
741 self.emit_event(EventType::LocationChanged(contact_id));
742
743 if let Some(msg_id) = self
744 .get_config_parsed::<u32>(Config::WebxdcIntegration)
745 .await?
746 {
747 self.emit_event(EventType::WebxdcStatusUpdate {
748 msg_id: MsgId::new(msg_id),
749 status_update_serial: Default::default(),
750 })
751 }
752
753 Ok(())
754 }
755
756 pub fn get_event_emitter(&self) -> EventEmitter {
761 self.events.get_emitter()
762 }
763
764 pub fn get_id(&self) -> u32 {
766 self.id
767 }
768
769 pub(crate) async fn alloc_ongoing(&self) -> Result<Receiver<()>> {
779 let mut s = self.running_state.write().await;
780 ensure!(
781 matches!(*s, RunningState::Stopped),
782 "There is already another ongoing process running."
783 );
784
785 let (sender, receiver) = channel::bounded(1);
786 *s = RunningState::Running {
787 cancel_sender: sender,
788 };
789
790 Ok(receiver)
791 }
792
793 pub(crate) async fn free_ongoing(&self) {
794 let mut s = self.running_state.write().await;
795 if let RunningState::ShallStop { request } = *s {
796 info!(self, "Ongoing stopped in {:?}", time_elapsed(&request));
797 }
798 *s = RunningState::Stopped;
799 }
800
801 pub async fn stop_ongoing(&self) {
803 let mut s = self.running_state.write().await;
804 match &*s {
805 RunningState::Running { cancel_sender } => {
806 if let Err(err) = cancel_sender.send(()).await {
807 warn!(self, "could not cancel ongoing: {:#}", err);
808 }
809 info!(self, "Signaling the ongoing process to stop ASAP.",);
810 *s = RunningState::ShallStop {
811 request: tools::Time::now(),
812 };
813 }
814 RunningState::ShallStop { .. } | RunningState::Stopped => {
815 info!(self, "No ongoing process to stop.",);
816 }
817 }
818 }
819
820 #[allow(unused)]
821 pub(crate) async fn shall_stop_ongoing(&self) -> bool {
822 match &*self.running_state.read().await {
823 RunningState::Running { .. } => false,
824 RunningState::ShallStop { .. } | RunningState::Stopped => true,
825 }
826 }
827
828 pub async fn get_info(&self) -> Result<BTreeMap<&'static str, String>> {
834 let all_transports: Vec<String> = ConfiguredLoginParam::load_all(self)
835 .await?
836 .into_iter()
837 .map(|(transport_id, param)| format!("{transport_id}: {param}"))
838 .collect();
839 let all_transports = if all_transports.is_empty() {
840 "Not configured".to_string()
841 } else {
842 all_transports.join(",")
843 };
844 let chats = get_chat_cnt(self).await?;
845 let unblocked_msgs = message::get_unblocked_msg_cnt(self).await;
846 let request_msgs = message::get_request_msg_cnt(self).await;
847 let contacts = Contact::get_real_cnt(self).await?;
848 let proxy_enabled = self.get_config_int(Config::ProxyEnabled).await?;
849 let dbversion = self
850 .sql
851 .get_raw_config_int("dbversion")
852 .await?
853 .unwrap_or_default();
854 let journal_mode = self
855 .sql
856 .query_get_value("PRAGMA journal_mode;", ())
857 .await?
858 .unwrap_or_else(|| "unknown".to_string());
859 let mdns_enabled = self.get_config_int(Config::MdnsEnabled).await?;
860 let bcc_self = self.get_config_int(Config::BccSelf).await?;
861 let sync_msgs = self.get_config_int(Config::SyncMsgs).await?;
862 let disable_idle = self.get_config_bool(Config::DisableIdle).await?;
863
864 let prv_key_cnt = self.sql.count("SELECT COUNT(*) FROM keypairs;", ()).await?;
865
866 let pub_key_cnt = self
867 .sql
868 .count("SELECT COUNT(*) FROM public_keys;", ())
869 .await?;
870
871 let mut res = get_info();
872
873 res.insert("bot", self.get_config_int(Config::Bot).await?.to_string());
875 res.insert("number_of_chats", chats.to_string());
876 res.insert("number_of_chat_messages", unblocked_msgs.to_string());
877 res.insert("messages_in_contact_requests", request_msgs.to_string());
878 res.insert("number_of_contacts", contacts.to_string());
879 res.insert("database_dir", self.get_dbfile().display().to_string());
880 res.insert("database_version", dbversion.to_string());
881 res.insert(
882 "database_encrypted",
883 self.sql
884 .is_encrypted()
885 .await
886 .map_or_else(|| "closed".to_string(), |b| b.to_string()),
887 );
888 res.insert("journal_mode", journal_mode);
889 res.insert("blobdir", self.get_blobdir().display().to_string());
890 res.insert(
891 "selfavatar",
892 self.get_config(Config::Selfavatar)
893 .await?
894 .unwrap_or_else(|| "<unset>".to_string()),
895 );
896 res.insert("proxy_enabled", proxy_enabled.to_string());
897 res.insert("used_transport_settings", all_transports);
898
899 if let Some(server_id) = &*self.server_id.read().await {
900 res.insert("imap_server_id", format!("{server_id:?}"));
901 }
902
903 res.insert("is_chatmail", self.is_chatmail().await?.to_string());
904 res.insert(
905 "fix_is_chatmail",
906 self.get_config_bool(Config::FixIsChatmail)
907 .await?
908 .to_string(),
909 );
910 res.insert(
911 "is_muted",
912 self.get_config_bool(Config::IsMuted).await?.to_string(),
913 );
914 res.insert(
915 "private_tag",
916 self.get_config(Config::PrivateTag)
917 .await?
918 .unwrap_or_else(|| "<unset>".to_string()),
919 );
920
921 if let Some(metadata) = &*self.metadata.read().await {
922 if let Some(comment) = &metadata.comment {
923 res.insert("imap_server_comment", format!("{comment:?}"));
924 }
925
926 if let Some(admin) = &metadata.admin {
927 res.insert("imap_server_admin", format!("{admin:?}"));
928 }
929 }
930
931 res.insert(
932 "who_can_call_me",
933 self.get_config_int(Config::WhoCanCallMe).await?.to_string(),
934 );
935 res.insert(
936 "download_limit",
937 self.get_config_int(Config::DownloadLimit)
938 .await?
939 .to_string(),
940 );
941 res.insert("mdns_enabled", mdns_enabled.to_string());
942 res.insert("bcc_self", bcc_self.to_string());
943 res.insert("sync_msgs", sync_msgs.to_string());
944 res.insert("disable_idle", disable_idle.to_string());
945 res.insert("private_key_count", prv_key_cnt.to_string());
946 res.insert("public_key_count", pub_key_cnt.to_string());
947 res.insert(
948 "media_quality",
949 self.get_config_int(Config::MediaQuality).await?.to_string(),
950 );
951 res.insert(
952 "delete_device_after",
953 self.get_config_int(Config::DeleteDeviceAfter)
954 .await?
955 .to_string(),
956 );
957 res.insert(
958 "last_housekeeping",
959 self.get_config_int(Config::LastHousekeeping)
960 .await?
961 .to_string(),
962 );
963 res.insert(
964 "last_cant_decrypt_outgoing_msgs",
965 self.get_config_int(Config::LastCantDecryptOutgoingMsgs)
966 .await?
967 .to_string(),
968 );
969 res.insert(
970 "debug_logging",
971 self.get_config_int(Config::DebugLogging).await?.to_string(),
972 );
973 res.insert(
974 "last_msg_id",
975 self.get_config_int(Config::LastMsgId).await?.to_string(),
976 );
977 res.insert(
978 "gossip_period",
979 self.get_config_int(Config::GossipPeriod).await?.to_string(),
980 );
981 res.insert(
982 "webxdc_realtime_enabled",
983 self.get_config_bool(Config::WebxdcRealtimeEnabled)
984 .await?
985 .to_string(),
986 );
987 res.insert(
988 "donation_request_next_check",
989 self.get_config_i64(Config::DonationRequestNextCheck)
990 .await?
991 .to_string(),
992 );
993 res.insert(
994 "first_key_contacts_msg_id",
995 self.sql
996 .get_raw_config("first_key_contacts_msg_id")
997 .await?
998 .unwrap_or_default(),
999 );
1000 res.insert(
1001 "stats_id",
1002 self.get_config(Config::StatsId)
1003 .await?
1004 .unwrap_or_else(|| "<unset>".to_string()),
1005 );
1006 res.insert(
1007 "stats_sending",
1008 stats::should_send_stats(self).await?.to_string(),
1009 );
1010 res.insert(
1011 "stats_last_sent",
1012 self.get_config_i64(Config::StatsLastSent)
1013 .await?
1014 .to_string(),
1015 );
1016 res.insert(
1017 "test_hooks",
1018 self.sql
1019 .get_raw_config("test_hooks")
1020 .await?
1021 .unwrap_or_default(),
1022 );
1023 res.insert(
1024 "std_header_protection_composing",
1025 self.sql
1026 .get_raw_config("std_header_protection_composing")
1027 .await?
1028 .unwrap_or_default(),
1029 );
1030 res.insert(
1031 "team_profile",
1032 self.get_config_bool(Config::TeamProfile).await?.to_string(),
1033 );
1034 res.insert(
1035 "force_encryption",
1036 self.get_config_bool(Config::ForceEncryption)
1037 .await?
1038 .to_string(),
1039 );
1040
1041 let elapsed = time_elapsed(&self.creation_time);
1042 res.insert("uptime", duration_to_str(elapsed));
1043
1044 Ok(res)
1045 }
1046
1047 pub async fn get_fresh_msgs(&self) -> Result<Vec<MsgId>> {
1054 let list = self
1055 .sql
1056 .query_map_vec(
1057 "SELECT m.id
1058FROM msgs m
1059LEFT JOIN contacts ct
1060 ON m.from_id=ct.id
1061LEFT JOIN chats c
1062 ON m.chat_id=c.id
1063WHERE m.state=?
1064AND m.hidden=0
1065AND m.chat_id>9
1066AND ct.blocked=0
1067AND c.blocked=0
1068AND NOT(c.muted_until=-1 OR c.muted_until>?)
1069ORDER BY m.timestamp DESC,m.id DESC",
1070 (MessageState::InFresh, time()),
1071 |row| {
1072 let msg_id: MsgId = row.get(0)?;
1073 Ok(msg_id)
1074 },
1075 )
1076 .await?;
1077 Ok(list)
1078 }
1079
1080 pub async fn get_next_msgs(&self) -> Result<Vec<MsgId>> {
1092 let last_msg_id = match self.get_config(Config::LastMsgId).await? {
1093 Some(s) => MsgId::new(s.parse()?),
1094 None => {
1095 self.sql
1100 .query_row(
1101 "SELECT IFNULL((SELECT MAX(id) - 1 FROM msgs), 0)",
1102 (),
1103 |row| {
1104 let msg_id: MsgId = row.get(0)?;
1105 Ok(msg_id)
1106 },
1107 )
1108 .await?
1109 }
1110 };
1111
1112 let list = self
1113 .sql
1114 .query_map_vec(
1115 "SELECT m.id
1116 FROM msgs m
1117 LEFT JOIN contacts ct
1118 ON m.from_id=ct.id
1119 LEFT JOIN chats c
1120 ON m.chat_id=c.id
1121 WHERE m.id>?
1122 AND m.hidden=0
1123 AND m.chat_id>9
1124 AND ct.blocked=0
1125 AND c.blocked!=1
1126 ORDER BY m.id ASC",
1127 (
1128 last_msg_id.to_u32(), ),
1130 |row| {
1131 let msg_id: MsgId = row.get(0)?;
1132 Ok(msg_id)
1133 },
1134 )
1135 .await?;
1136 Ok(list)
1137 }
1138
1139 pub async fn wait_next_msgs(&self) -> Result<Vec<MsgId>> {
1157 self.new_msgs_notify.notified().await;
1158 let list = self.get_next_msgs().await?;
1159 Ok(list)
1160 }
1161
1162 pub async fn search_msgs(&self, chat_id: Option<ChatId>, query: &str) -> Result<Vec<MsgId>> {
1173 let real_query = query.trim().to_lowercase();
1174 if real_query.is_empty() {
1175 return Ok(Vec::new());
1176 }
1177 let str_like_in_text = format!("%{real_query}%");
1178
1179 let list = if let Some(chat_id) = chat_id {
1180 self.sql
1181 .query_map_vec(
1182 "SELECT m.id AS id
1183 FROM msgs m
1184 LEFT JOIN contacts ct
1185 ON m.from_id=ct.id
1186 WHERE m.chat_id=?
1187 AND m.hidden=0
1188 AND ct.blocked=0
1189 AND IFNULL(txt_normalized, txt) LIKE ?
1190 ORDER BY m.timestamp,m.id;",
1191 (chat_id, str_like_in_text),
1192 |row| {
1193 let msg_id: MsgId = row.get("id")?;
1194 Ok(msg_id)
1195 },
1196 )
1197 .await?
1198 } else {
1199 self.sql
1210 .query_map_vec(
1211 "SELECT m.id AS id
1212 FROM msgs m
1213 LEFT JOIN contacts ct
1214 ON m.from_id=ct.id
1215 LEFT JOIN chats c
1216 ON m.chat_id=c.id
1217 WHERE m.chat_id>9
1218 AND m.hidden=0
1219 AND c.blocked!=1
1220 AND ct.blocked=0
1221 AND IFNULL(txt_normalized, txt) LIKE ?
1222 ORDER BY m.id DESC LIMIT 1000",
1223 (str_like_in_text,),
1224 |row| {
1225 let msg_id: MsgId = row.get("id")?;
1226 Ok(msg_id)
1227 },
1228 )
1229 .await?
1230 };
1231
1232 Ok(list)
1233 }
1234
1235 pub(crate) fn derive_blobdir(dbfile: &Path) -> PathBuf {
1236 let mut blob_fname = OsString::new();
1237 blob_fname.push(dbfile.file_name().unwrap_or_default());
1238 blob_fname.push("-blobs");
1239 dbfile.with_file_name(blob_fname)
1240 }
1241
1242 pub(crate) fn derive_walfile(dbfile: &Path) -> PathBuf {
1243 let mut wal_fname = OsString::new();
1244 wal_fname.push(dbfile.file_name().unwrap_or_default());
1245 wal_fname.push("-wal");
1246 dbfile.with_file_name(wal_fname)
1247 }
1248}
1249
1250#[cfg(test)]
1251mod context_tests;