1use std::collections::{BTreeMap, HashMap};
4use std::ffi::OsString;
5use std::ops::Deref;
6use std::path::{Path, PathBuf};
7use std::sync::atomic::AtomicBool;
8use std::sync::{Arc, OnceLock, Weak};
9use std::time::Duration;
10
11use anyhow::{Result, bail, ensure};
12use async_channel::{self as channel, Receiver, Sender};
13use pgp::composed::SignedPublicKey;
14use ratelimit::Ratelimit;
15use tokio::sync::{Mutex, Notify, RwLock};
16
17use crate::chat::{ChatId, get_chat_cnt};
18use crate::config::Config;
19use crate::constants::{self, DC_BACKGROUND_FETCH_QUOTA_CHECK_RATELIMIT, DC_VERSION_STR};
20use crate::contact::{Contact, ContactId};
21use crate::debug_logging::DebugLogging;
22use crate::events::{Event, EventEmitter, EventType, Events};
23use crate::imap::{Imap, ServerMetadata};
24use crate::log::warn;
25use crate::logged_debug_assert;
26use crate::message::{self, MessageState, MsgId};
27use crate::net::tls::{SpkiHashStore, TlsSessionStore};
28use crate::peer_channels::Iroh;
29use crate::push::PushSubscriber;
30use crate::quota::QuotaInfo;
31use crate::scheduler::{ConnectivityStore, SchedulerState};
32use crate::sql::Sql;
33use crate::stock_str::StockStrings;
34use crate::timesmearing::SmearedTimestamp;
35use crate::tools::{self, duration_to_str, time, time_elapsed};
36use crate::transport::ConfiguredLoginParam;
37use crate::{chatlist_events, stats};
38
39pub use crate::scheduler::connectivity::Connectivity;
40
41#[derive(Clone, Debug)]
66pub struct ContextBuilder {
67 dbfile: PathBuf,
68 id: u32,
69 events: Events,
70 stock_strings: StockStrings,
71 password: Option<String>,
72
73 push_subscriber: Option<PushSubscriber>,
74}
75
76impl ContextBuilder {
77 pub fn new(dbfile: PathBuf) -> Self {
83 ContextBuilder {
84 dbfile,
85 id: rand::random(),
86 events: Events::new(),
87 stock_strings: StockStrings::new(),
88 password: None,
89 push_subscriber: None,
90 }
91 }
92
93 pub fn with_id(mut self, id: u32) -> Self {
103 self.id = id;
104 self
105 }
106
107 pub fn with_events(mut self, events: Events) -> Self {
116 self.events = events;
117 self
118 }
119
120 pub fn with_stock_strings(mut self, stock_strings: StockStrings) -> Self {
131 self.stock_strings = stock_strings;
132 self
133 }
134
135 #[deprecated(since = "TBD")]
143 pub fn with_password(mut self, password: String) -> Self {
144 self.password = Some(password);
145 self
146 }
147
148 pub(crate) fn with_push_subscriber(mut self, push_subscriber: PushSubscriber) -> Self {
150 self.push_subscriber = Some(push_subscriber);
151 self
152 }
153
154 pub async fn build(self) -> Result<Context> {
156 let push_subscriber = self.push_subscriber.unwrap_or_default();
157 let context = Context::new_closed(
158 &self.dbfile,
159 self.id,
160 self.events,
161 self.stock_strings,
162 push_subscriber,
163 )
164 .await?;
165 Ok(context)
166 }
167
168 pub async fn open(self) -> Result<Context> {
172 let password = self.password.clone().unwrap_or_default();
173 let context = self.build().await?;
174 match context.open(password).await? {
175 true => Ok(context),
176 false => bail!("database could not be decrypted, incorrect or missing password"),
177 }
178 }
179}
180
181#[derive(Clone, Debug)]
193pub struct Context {
194 pub(crate) inner: Arc<InnerContext>,
195}
196
197impl Deref for Context {
198 type Target = InnerContext;
199
200 fn deref(&self) -> &Self::Target {
201 &self.inner
202 }
203}
204
205#[derive(Clone, Debug)]
209pub(crate) struct WeakContext {
210 inner: Weak<InnerContext>,
211}
212
213impl WeakContext {
214 pub(crate) fn upgrade(&self) -> Result<Context> {
216 let inner = self
217 .inner
218 .upgrade()
219 .ok_or_else(|| anyhow::anyhow!("Inner struct has been dropped"))?;
220 Ok(Context { inner })
221 }
222}
223
224#[derive(Debug)]
226pub struct InnerContext {
227 pub(crate) blobdir: PathBuf,
229 pub(crate) sql: Sql,
230 pub(crate) smeared_timestamp: SmearedTimestamp,
231 running_state: RwLock<RunningState>,
236 pub(crate) oauth2_mutex: Mutex<()>,
238 pub(crate) wrong_pw_warning_mutex: Mutex<()>,
240 pub(crate) housekeeping_mutex: Mutex<()>,
242
243 pub(crate) fetch_msgs_mutex: Mutex<()>,
250
251 pub(crate) translated_stockstrings: StockStrings,
252 pub(crate) events: Events,
253
254 pub(crate) scheduler: SchedulerState,
255 pub(crate) ratelimit: RwLock<Ratelimit>,
256
257 pub(crate) quota: RwLock<BTreeMap<u32, QuotaInfo>>,
260
261 pub(crate) new_msgs_notify: Notify,
265
266 pub(crate) server_id: RwLock<Option<HashMap<String, String>>>,
270
271 pub(crate) metadata: RwLock<Option<ServerMetadata>>,
273
274 pub(crate) id: u32,
279
280 creation_time: tools::Time,
281
282 pub(crate) last_error: parking_lot::RwLock<String>,
286
287 pub(crate) migration_error: parking_lot::RwLock<Option<String>>,
293
294 pub(crate) debug_logging: std::sync::RwLock<Option<DebugLogging>>,
299
300 pub(crate) push_subscriber: PushSubscriber,
303
304 pub(crate) push_subscribed: AtomicBool,
306
307 pub(crate) tls_session_store: TlsSessionStore,
309
310 pub(crate) spki_hash_store: SpkiHashStore,
316
317 pub(crate) iroh: Arc<RwLock<Option<Iroh>>>,
319
320 pub(crate) self_fingerprint: OnceLock<String>,
324
325 pub(crate) self_public_key: Mutex<Option<SignedPublicKey>>,
331
332 pub(crate) connectivities: parking_lot::Mutex<Vec<ConnectivityStore>>,
335}
336
337#[derive(Debug, Default)]
339enum RunningState {
340 Running { cancel_sender: Sender<()> },
342
343 ShallStop { request: tools::Time },
345
346 #[default]
348 Stopped,
349}
350
351#[expect(clippy::arithmetic_side_effects)]
358pub fn get_info() -> BTreeMap<&'static str, String> {
359 let mut res = BTreeMap::new();
360
361 #[cfg(debug_assertions)]
362 res.insert(
363 "debug_assertions",
364 "On - DO NOT RELEASE THIS BUILD".to_string(),
365 );
366 #[cfg(not(debug_assertions))]
367 res.insert("debug_assertions", "Off".to_string());
368
369 res.insert("deltachat_core_version", format!("v{DC_VERSION_STR}"));
370 res.insert("sqlite_version", rusqlite::version().to_string());
371 res.insert("arch", (std::mem::size_of::<usize>() * 8).to_string());
372 res.insert("num_cpus", num_cpus::get().to_string());
373 res.insert("level", "awesome".into());
374 res
375}
376
377impl Context {
378 pub async fn new(
380 dbfile: &Path,
381 id: u32,
382 events: Events,
383 stock_strings: StockStrings,
384 ) -> Result<Context> {
385 let context =
386 Self::new_closed(dbfile, id, events, stock_strings, Default::default()).await?;
387
388 if context.check_passphrase("".to_string()).await? {
390 context.sql.open(&context, "".to_string()).await?;
391 }
392 Ok(context)
393 }
394
395 pub async fn new_closed(
397 dbfile: &Path,
398 id: u32,
399 events: Events,
400 stockstrings: StockStrings,
401 push_subscriber: PushSubscriber,
402 ) -> Result<Context> {
403 let mut blob_fname = OsString::new();
404 blob_fname.push(dbfile.file_name().unwrap_or_default());
405 blob_fname.push("-blobs");
406 let blobdir = dbfile.with_file_name(blob_fname);
407 if !blobdir.exists() {
408 tokio::fs::create_dir_all(&blobdir).await?;
409 }
410 let context = Context::with_blobdir(
411 dbfile.into(),
412 blobdir,
413 id,
414 events,
415 stockstrings,
416 push_subscriber,
417 )?;
418 Ok(context)
419 }
420
421 pub(crate) fn get_weak_context(&self) -> WeakContext {
423 WeakContext {
424 inner: Arc::downgrade(&self.inner),
425 }
426 }
427
428 #[deprecated(since = "TBD")]
435 pub async fn open(&self, passphrase: String) -> Result<bool> {
436 if self.sql.check_passphrase(passphrase.clone()).await? {
437 self.sql.open(self, passphrase).await?;
438 Ok(true)
439 } else {
440 Ok(false)
441 }
442 }
443
444 pub async fn change_passphrase(&self, passphrase: String) -> Result<()> {
447 self.sql.change_passphrase(passphrase).await?;
448 Ok(())
449 }
450
451 pub async fn is_open(&self) -> bool {
453 self.sql.is_open().await
454 }
455
456 pub(crate) async fn check_passphrase(&self, passphrase: String) -> Result<bool> {
462 self.sql.check_passphrase(passphrase).await
463 }
464
465 pub(crate) fn with_blobdir(
466 dbfile: PathBuf,
467 blobdir: PathBuf,
468 id: u32,
469 events: Events,
470 stockstrings: StockStrings,
471 push_subscriber: PushSubscriber,
472 ) -> Result<Context> {
473 ensure!(
474 blobdir.is_dir(),
475 "Blobdir does not exist: {}",
476 blobdir.display()
477 );
478
479 let new_msgs_notify = Notify::new();
480 new_msgs_notify.notify_one();
483
484 let inner = InnerContext {
485 id,
486 blobdir,
487 running_state: RwLock::new(Default::default()),
488 sql: Sql::new(dbfile),
489 smeared_timestamp: SmearedTimestamp::new(),
490 oauth2_mutex: Mutex::new(()),
491 wrong_pw_warning_mutex: Mutex::new(()),
492 housekeeping_mutex: Mutex::new(()),
493 fetch_msgs_mutex: Mutex::new(()),
494 translated_stockstrings: stockstrings,
495 events,
496 scheduler: SchedulerState::new(),
497 ratelimit: RwLock::new(Ratelimit::new(Duration::new(3, 0), 3.0)), quota: RwLock::new(BTreeMap::new()),
499 new_msgs_notify,
500 server_id: RwLock::new(None),
501 metadata: RwLock::new(None),
502 creation_time: tools::Time::now(),
503 last_error: parking_lot::RwLock::new("".to_string()),
504 migration_error: parking_lot::RwLock::new(None),
505 debug_logging: std::sync::RwLock::new(None),
506 push_subscriber,
507 push_subscribed: AtomicBool::new(false),
508 tls_session_store: TlsSessionStore::new(),
509 spki_hash_store: SpkiHashStore::new(),
510 iroh: Arc::new(RwLock::new(None)),
511 self_fingerprint: OnceLock::new(),
512 self_public_key: Mutex::new(None),
513 connectivities: parking_lot::Mutex::new(Vec::new()),
514 };
515
516 let ctx = Context {
517 inner: Arc::new(inner),
518 };
519
520 Ok(ctx)
521 }
522
523 pub async fn start_io(&self) {
525 if !self.is_configured().await.unwrap_or_default() {
526 warn!(self, "can not start io on a context that is not configured");
527 return;
528 }
529
530 self.sql.config_cache.write().await.clear();
536
537 self.scheduler.start(self).await;
538 }
539
540 pub async fn stop_io(&self) {
542 self.scheduler.stop(self).await;
543 if let Some(iroh) = self.iroh.write().await.take() {
544 tokio::spawn(async move {
551 let _ = tokio::time::timeout(Duration::from_secs(60), iroh.close()).await;
554 });
555 }
556 }
557
558 pub async fn restart_io_if_running(&self) {
561 self.scheduler.restart(self).await;
562 }
563
564 pub async fn maybe_network(&self) {
566 if let Some(ref iroh) = *self.iroh.read().await {
567 iroh.network_change().await;
568 }
569 self.scheduler.maybe_network().await;
570 }
571
572 pub async fn is_chatmail(&self) -> Result<bool> {
574 self.get_config_bool(Config::IsChatmail).await
575 }
576
577 pub(crate) async fn get_max_smtp_rcpt_to(&self) -> Result<usize> {
579 let is_chatmail = self.is_chatmail().await?;
580 let val = self
581 .get_configured_provider()
582 .await?
583 .and_then(|provider| provider.opt.max_smtp_rcpt_to)
584 .map_or_else(
585 || match is_chatmail {
586 true => constants::DEFAULT_CHATMAIL_MAX_SMTP_RCPT_TO,
587 false => constants::DEFAULT_MAX_SMTP_RCPT_TO,
588 },
589 usize::from,
590 );
591 Ok(val)
592 }
593
594 pub async fn background_fetch(&self) -> Result<()> {
600 if !(self.is_configured().await?) {
601 return Ok(());
602 }
603
604 let address = self.get_primary_self_addr().await?;
605 let time_start = tools::Time::now();
606 info!(self, "background_fetch started fetching {address}.");
607
608 if self.scheduler.is_running().await {
609 self.scheduler.maybe_network().await;
610 self.wait_for_all_work_done().await;
611 } else {
612 let _pause_guard = self.scheduler.pause(self).await?;
615
616 let mut connection = Imap::new_configured(self, channel::bounded(1).1).await?;
618 let mut session = connection.prepare(self).await?;
619
620 let folder = connection.folder.clone();
622 connection
623 .fetch_move_delete(self, &mut session, &folder)
624 .await?;
625
626 if self
630 .quota_needs_update(
631 session.transport_id(),
632 DC_BACKGROUND_FETCH_QUOTA_CHECK_RATELIMIT,
633 )
634 .await
635 && let Err(err) = self.update_recent_quota(&mut session, &folder).await
636 {
637 warn!(self, "Failed to update quota: {err:#}.");
638 }
639 }
640
641 info!(
642 self,
643 "background_fetch done for {address} took {:?}.",
644 time_elapsed(&time_start),
645 );
646
647 Ok(())
648 }
649
/// Returns a reference to the database handle.
///
/// Only available with the `internals` feature.
#[cfg(feature = "internals")]
pub fn sql(&self) -> &Sql {
    let inner: &InnerContext = &self.inner;
    &inner.sql
}
657
658 pub fn get_dbfile(&self) -> &Path {
660 self.sql.dbfile.as_path()
661 }
662
663 pub fn get_blobdir(&self) -> &Path {
665 self.blobdir.as_path()
666 }
667
668 pub fn emit_event(&self, event: EventType) {
670 {
671 let lock = self.debug_logging.read().expect("RwLock is poisoned");
672 if let Some(debug_logging) = &*lock {
673 debug_logging.log_event(event.clone());
674 }
675 }
676 self.events.emit(Event {
677 id: self.id,
678 typ: event,
679 });
680 }
681
682 pub fn emit_msgs_changed_without_ids(&self) {
684 self.emit_event(EventType::MsgsChanged {
685 chat_id: ChatId::new(0),
686 msg_id: MsgId::new(0),
687 });
688 }
689
690 pub fn emit_msgs_changed(&self, chat_id: ChatId, msg_id: MsgId) {
696 logged_debug_assert!(
697 self,
698 !chat_id.is_unset(),
699 "emit_msgs_changed: chat_id is unset."
700 );
701 logged_debug_assert!(
702 self,
703 !msg_id.is_unset(),
704 "emit_msgs_changed: msg_id is unset."
705 );
706
707 self.emit_event(EventType::MsgsChanged { chat_id, msg_id });
708 chatlist_events::emit_chatlist_changed(self);
709 chatlist_events::emit_chatlist_item_changed(self, chat_id);
710 }
711
712 pub fn emit_msgs_changed_without_msg_id(&self, chat_id: ChatId) {
714 logged_debug_assert!(
715 self,
716 !chat_id.is_unset(),
717 "emit_msgs_changed_without_msg_id: chat_id is unset."
718 );
719
720 self.emit_event(EventType::MsgsChanged {
721 chat_id,
722 msg_id: MsgId::new(0),
723 });
724 chatlist_events::emit_chatlist_changed(self);
725 chatlist_events::emit_chatlist_item_changed(self, chat_id);
726 }
727
728 pub fn emit_incoming_msg(&self, chat_id: ChatId, msg_id: MsgId) {
730 debug_assert!(!chat_id.is_unset());
731 debug_assert!(!msg_id.is_unset());
732
733 self.emit_event(EventType::IncomingMsg { chat_id, msg_id });
734 chatlist_events::emit_chatlist_changed(self);
735 chatlist_events::emit_chatlist_item_changed(self, chat_id);
736 }
737
738 pub async fn emit_location_changed(&self, contact_id: Option<ContactId>) -> Result<()> {
740 self.emit_event(EventType::LocationChanged(contact_id));
741
742 if let Some(msg_id) = self
743 .get_config_parsed::<u32>(Config::WebxdcIntegration)
744 .await?
745 {
746 self.emit_event(EventType::WebxdcStatusUpdate {
747 msg_id: MsgId::new(msg_id),
748 status_update_serial: Default::default(),
749 })
750 }
751
752 Ok(())
753 }
754
755 pub fn get_event_emitter(&self) -> EventEmitter {
760 self.events.get_emitter()
761 }
762
763 pub fn get_id(&self) -> u32 {
765 self.id
766 }
767
768 pub(crate) async fn alloc_ongoing(&self) -> Result<Receiver<()>> {
778 let mut s = self.running_state.write().await;
779 ensure!(
780 matches!(*s, RunningState::Stopped),
781 "There is already another ongoing process running."
782 );
783
784 let (sender, receiver) = channel::bounded(1);
785 *s = RunningState::Running {
786 cancel_sender: sender,
787 };
788
789 Ok(receiver)
790 }
791
792 pub(crate) async fn free_ongoing(&self) {
793 let mut s = self.running_state.write().await;
794 if let RunningState::ShallStop { request } = *s {
795 info!(self, "Ongoing stopped in {:?}", time_elapsed(&request));
796 }
797 *s = RunningState::Stopped;
798 }
799
800 pub async fn stop_ongoing(&self) {
802 let mut s = self.running_state.write().await;
803 match &*s {
804 RunningState::Running { cancel_sender } => {
805 if let Err(err) = cancel_sender.send(()).await {
806 warn!(self, "could not cancel ongoing: {:#}", err);
807 }
808 info!(self, "Signaling the ongoing process to stop ASAP.",);
809 *s = RunningState::ShallStop {
810 request: tools::Time::now(),
811 };
812 }
813 RunningState::ShallStop { .. } | RunningState::Stopped => {
814 info!(self, "No ongoing process to stop.",);
815 }
816 }
817 }
818
819 #[allow(unused)]
820 pub(crate) async fn shall_stop_ongoing(&self) -> bool {
821 match &*self.running_state.read().await {
822 RunningState::Running { .. } => false,
823 RunningState::ShallStop { .. } | RunningState::Stopped => true,
824 }
825 }
826
827 pub async fn get_info(&self) -> Result<BTreeMap<&'static str, String>> {
833 let all_transports: Vec<String> = ConfiguredLoginParam::load_all(self)
834 .await?
835 .into_iter()
836 .map(|(transport_id, param)| format!("{transport_id}: {param}"))
837 .collect();
838 let all_transports = if all_transports.is_empty() {
839 "Not configured".to_string()
840 } else {
841 all_transports.join(",")
842 };
843 let chats = get_chat_cnt(self).await?;
844 let unblocked_msgs = message::get_unblocked_msg_cnt(self).await;
845 let request_msgs = message::get_request_msg_cnt(self).await;
846 let contacts = Contact::get_real_cnt(self).await?;
847 let proxy_enabled = self.get_config_int(Config::ProxyEnabled).await?;
848 let dbversion = self
849 .sql
850 .get_raw_config_int("dbversion")
851 .await?
852 .unwrap_or_default();
853 let journal_mode = self
854 .sql
855 .query_get_value("PRAGMA journal_mode;", ())
856 .await?
857 .unwrap_or_else(|| "unknown".to_string());
858 let mdns_enabled = self.get_config_int(Config::MdnsEnabled).await?;
859 let bcc_self = self.get_config_int(Config::BccSelf).await?;
860 let sync_msgs = self.get_config_int(Config::SyncMsgs).await?;
861 let disable_idle = self.get_config_bool(Config::DisableIdle).await?;
862
863 let prv_key_cnt = self.sql.count("SELECT COUNT(*) FROM keypairs;", ()).await?;
864
865 let pub_key_cnt = self
866 .sql
867 .count("SELECT COUNT(*) FROM public_keys;", ())
868 .await?;
869
870 let mut res = get_info();
871
872 res.insert("bot", self.get_config_int(Config::Bot).await?.to_string());
874 res.insert("number_of_chats", chats.to_string());
875 res.insert("number_of_chat_messages", unblocked_msgs.to_string());
876 res.insert("messages_in_contact_requests", request_msgs.to_string());
877 res.insert("number_of_contacts", contacts.to_string());
878 res.insert("database_dir", self.get_dbfile().display().to_string());
879 res.insert("database_version", dbversion.to_string());
880 res.insert(
881 "database_encrypted",
882 self.sql
883 .is_encrypted()
884 .await
885 .map_or_else(|| "closed".to_string(), |b| b.to_string()),
886 );
887 res.insert("journal_mode", journal_mode);
888 res.insert("blobdir", self.get_blobdir().display().to_string());
889 res.insert(
890 "selfavatar",
891 self.get_config(Config::Selfavatar)
892 .await?
893 .unwrap_or_else(|| "<unset>".to_string()),
894 );
895 res.insert("proxy_enabled", proxy_enabled.to_string());
896 res.insert("used_transport_settings", all_transports);
897
898 if let Some(server_id) = &*self.server_id.read().await {
899 res.insert("imap_server_id", format!("{server_id:?}"));
900 }
901
902 res.insert("is_chatmail", self.is_chatmail().await?.to_string());
903 res.insert(
904 "fix_is_chatmail",
905 self.get_config_bool(Config::FixIsChatmail)
906 .await?
907 .to_string(),
908 );
909 res.insert(
910 "is_muted",
911 self.get_config_bool(Config::IsMuted).await?.to_string(),
912 );
913 res.insert(
914 "private_tag",
915 self.get_config(Config::PrivateTag)
916 .await?
917 .unwrap_or_else(|| "<unset>".to_string()),
918 );
919
920 if let Some(metadata) = &*self.metadata.read().await {
921 if let Some(comment) = &metadata.comment {
922 res.insert("imap_server_comment", format!("{comment:?}"));
923 }
924
925 if let Some(admin) = &metadata.admin {
926 res.insert("imap_server_admin", format!("{admin:?}"));
927 }
928 }
929
930 res.insert(
931 "who_can_call_me",
932 self.get_config_int(Config::WhoCanCallMe).await?.to_string(),
933 );
934 res.insert(
935 "download_limit",
936 self.get_config_int(Config::DownloadLimit)
937 .await?
938 .to_string(),
939 );
940 res.insert("mdns_enabled", mdns_enabled.to_string());
941 res.insert("bcc_self", bcc_self.to_string());
942 res.insert("sync_msgs", sync_msgs.to_string());
943 res.insert("disable_idle", disable_idle.to_string());
944 res.insert("private_key_count", prv_key_cnt.to_string());
945 res.insert("public_key_count", pub_key_cnt.to_string());
946 res.insert(
947 "media_quality",
948 self.get_config_int(Config::MediaQuality).await?.to_string(),
949 );
950 res.insert(
951 "delete_device_after",
952 self.get_config_int(Config::DeleteDeviceAfter)
953 .await?
954 .to_string(),
955 );
956 res.insert(
957 "last_housekeeping",
958 self.get_config_int(Config::LastHousekeeping)
959 .await?
960 .to_string(),
961 );
962 res.insert(
963 "last_cant_decrypt_outgoing_msgs",
964 self.get_config_int(Config::LastCantDecryptOutgoingMsgs)
965 .await?
966 .to_string(),
967 );
968 res.insert(
969 "debug_logging",
970 self.get_config_int(Config::DebugLogging).await?.to_string(),
971 );
972 res.insert(
973 "last_msg_id",
974 self.get_config_int(Config::LastMsgId).await?.to_string(),
975 );
976 res.insert(
977 "gossip_period",
978 self.get_config_int(Config::GossipPeriod).await?.to_string(),
979 );
980 res.insert(
981 "webxdc_realtime_enabled",
982 self.get_config_bool(Config::WebxdcRealtimeEnabled)
983 .await?
984 .to_string(),
985 );
986 res.insert(
987 "donation_request_next_check",
988 self.get_config_i64(Config::DonationRequestNextCheck)
989 .await?
990 .to_string(),
991 );
992 res.insert(
993 "first_key_contacts_msg_id",
994 self.sql
995 .get_raw_config("first_key_contacts_msg_id")
996 .await?
997 .unwrap_or_default(),
998 );
999 res.insert(
1000 "stats_id",
1001 self.get_config(Config::StatsId)
1002 .await?
1003 .unwrap_or_else(|| "<unset>".to_string()),
1004 );
1005 res.insert(
1006 "stats_sending",
1007 stats::should_send_stats(self).await?.to_string(),
1008 );
1009 res.insert(
1010 "stats_last_sent",
1011 self.get_config_i64(Config::StatsLastSent)
1012 .await?
1013 .to_string(),
1014 );
1015 res.insert(
1016 "test_hooks",
1017 self.sql
1018 .get_raw_config("test_hooks")
1019 .await?
1020 .unwrap_or_default(),
1021 );
1022 res.insert(
1023 "std_header_protection_composing",
1024 self.sql
1025 .get_raw_config("std_header_protection_composing")
1026 .await?
1027 .unwrap_or_default(),
1028 );
1029 res.insert(
1030 "team_profile",
1031 self.get_config_bool(Config::TeamProfile).await?.to_string(),
1032 );
1033 res.insert(
1034 "force_encryption",
1035 self.get_config_bool(Config::ForceEncryption)
1036 .await?
1037 .to_string(),
1038 );
1039
1040 let elapsed = time_elapsed(&self.creation_time);
1041 res.insert("uptime", duration_to_str(elapsed));
1042
1043 Ok(res)
1044 }
1045
1046 pub async fn get_fresh_msgs(&self) -> Result<Vec<MsgId>> {
1053 let list = self
1054 .sql
1055 .query_map_vec(
1056 "SELECT m.id
1057FROM msgs m
1058LEFT JOIN contacts ct
1059 ON m.from_id=ct.id
1060LEFT JOIN chats c
1061 ON m.chat_id=c.id
1062WHERE m.state=?
1063AND m.hidden=0
1064AND m.chat_id>9
1065AND ct.blocked=0
1066AND c.blocked=0
1067AND NOT(c.muted_until=-1 OR c.muted_until>?)
1068ORDER BY m.timestamp DESC,m.id DESC",
1069 (MessageState::InFresh, time()),
1070 |row| {
1071 let msg_id: MsgId = row.get(0)?;
1072 Ok(msg_id)
1073 },
1074 )
1075 .await?;
1076 Ok(list)
1077 }
1078
1079 pub async fn get_next_msgs(&self) -> Result<Vec<MsgId>> {
1091 let last_msg_id = match self.get_config(Config::LastMsgId).await? {
1092 Some(s) => MsgId::new(s.parse()?),
1093 None => {
1094 self.sql
1099 .query_row(
1100 "SELECT IFNULL((SELECT MAX(id) - 1 FROM msgs), 0)",
1101 (),
1102 |row| {
1103 let msg_id: MsgId = row.get(0)?;
1104 Ok(msg_id)
1105 },
1106 )
1107 .await?
1108 }
1109 };
1110
1111 let list = self
1112 .sql
1113 .query_map_vec(
1114 "SELECT m.id
1115 FROM msgs m
1116 LEFT JOIN contacts ct
1117 ON m.from_id=ct.id
1118 LEFT JOIN chats c
1119 ON m.chat_id=c.id
1120 WHERE m.id>?
1121 AND m.hidden=0
1122 AND m.chat_id>9
1123 AND ct.blocked=0
1124 AND c.blocked!=1
1125 ORDER BY m.id ASC",
1126 (
1127 last_msg_id.to_u32(), ),
1129 |row| {
1130 let msg_id: MsgId = row.get(0)?;
1131 Ok(msg_id)
1132 },
1133 )
1134 .await?;
1135 Ok(list)
1136 }
1137
1138 pub async fn wait_next_msgs(&self) -> Result<Vec<MsgId>> {
1156 self.new_msgs_notify.notified().await;
1157 let list = self.get_next_msgs().await?;
1158 Ok(list)
1159 }
1160
1161 pub async fn search_msgs(&self, chat_id: Option<ChatId>, query: &str) -> Result<Vec<MsgId>> {
1172 let real_query = query.trim().to_lowercase();
1173 if real_query.is_empty() {
1174 return Ok(Vec::new());
1175 }
1176 let str_like_in_text = format!("%{real_query}%");
1177
1178 let list = if let Some(chat_id) = chat_id {
1179 self.sql
1180 .query_map_vec(
1181 "SELECT m.id AS id
1182 FROM msgs m
1183 LEFT JOIN contacts ct
1184 ON m.from_id=ct.id
1185 WHERE m.chat_id=?
1186 AND m.hidden=0
1187 AND ct.blocked=0
1188 AND IFNULL(txt_normalized, txt) LIKE ?
1189 ORDER BY m.timestamp,m.id;",
1190 (chat_id, str_like_in_text),
1191 |row| {
1192 let msg_id: MsgId = row.get("id")?;
1193 Ok(msg_id)
1194 },
1195 )
1196 .await?
1197 } else {
1198 self.sql
1209 .query_map_vec(
1210 "SELECT m.id AS id
1211 FROM msgs m
1212 LEFT JOIN contacts ct
1213 ON m.from_id=ct.id
1214 LEFT JOIN chats c
1215 ON m.chat_id=c.id
1216 WHERE m.chat_id>9
1217 AND m.hidden=0
1218 AND c.blocked!=1
1219 AND ct.blocked=0
1220 AND IFNULL(txt_normalized, txt) LIKE ?
1221 ORDER BY m.id DESC LIMIT 1000",
1222 (str_like_in_text,),
1223 |row| {
1224 let msg_id: MsgId = row.get("id")?;
1225 Ok(msg_id)
1226 },
1227 )
1228 .await?
1229 };
1230
1231 Ok(list)
1232 }
1233
/// Returns the blob directory path that belongs to `dbfile`: a sibling of
/// `dbfile` named like the database file with `-blobs` appended.
pub(crate) fn derive_blobdir(dbfile: &Path) -> PathBuf {
    let mut dir_name = dbfile.file_name().unwrap_or_default().to_os_string();
    dir_name.push("-blobs");
    dbfile.with_file_name(dir_name)
}
1240
/// Returns the path of the `-wal` file that belongs to `dbfile`: a sibling
/// of `dbfile` named like the database file with `-wal` appended.
pub(crate) fn derive_walfile(dbfile: &Path) -> PathBuf {
    let mut file_name = dbfile.file_name().unwrap_or_default().to_os_string();
    file_name.push("-wal");
    dbfile.with_file_name(file_name)
}
1247}
1248
1249#[cfg(test)]
1250mod context_tests;