1use std::collections::{BTreeMap, HashMap};
4use std::ffi::OsString;
5use std::ops::Deref;
6use std::path::{Path, PathBuf};
7use std::sync::atomic::AtomicBool;
8use std::sync::{Arc, OnceLock, Weak};
9use std::time::Duration;
10
11use anyhow::{Result, bail, ensure};
12use async_channel::{self as channel, Receiver, Sender};
13use pgp::composed::SignedPublicKey;
14use ratelimit::Ratelimit;
15use tokio::sync::{Mutex, Notify, RwLock};
16
17use crate::chat::{ChatId, get_chat_cnt};
18use crate::config::Config;
19use crate::constants::{self, DC_BACKGROUND_FETCH_QUOTA_CHECK_RATELIMIT, DC_VERSION_STR};
20use crate::contact::{Contact, ContactId};
21use crate::debug_logging::DebugLogging;
22use crate::events::{Event, EventEmitter, EventType, Events};
23use crate::imap::{Imap, ServerMetadata};
24use crate::log::warn;
25use crate::logged_debug_assert;
26use crate::message::{self, MessageState, MsgId};
27use crate::net::tls::{SpkiHashStore, TlsSessionStore};
28use crate::peer_channels::Iroh;
29use crate::push::PushSubscriber;
30use crate::quota::QuotaInfo;
31use crate::scheduler::{ConnectivityStore, SchedulerState};
32use crate::sql::Sql;
33use crate::stock_str::StockStrings;
34use crate::timesmearing::SmearedTimestamp;
35use crate::tools::{self, duration_to_str, time, time_elapsed};
36use crate::transport::ConfiguredLoginParam;
37use crate::{chatlist_events, stats};
38
39pub use crate::scheduler::connectivity::Connectivity;
40
41#[derive(Clone, Debug)]
66pub struct ContextBuilder {
67 dbfile: PathBuf,
68 id: u32,
69 events: Events,
70 stock_strings: StockStrings,
71 password: Option<String>,
72
73 push_subscriber: Option<PushSubscriber>,
74}
75
76impl ContextBuilder {
77 pub fn new(dbfile: PathBuf) -> Self {
83 ContextBuilder {
84 dbfile,
85 id: rand::random(),
86 events: Events::new(),
87 stock_strings: StockStrings::new(),
88 password: None,
89 push_subscriber: None,
90 }
91 }
92
93 pub fn with_id(mut self, id: u32) -> Self {
103 self.id = id;
104 self
105 }
106
107 pub fn with_events(mut self, events: Events) -> Self {
116 self.events = events;
117 self
118 }
119
120 pub fn with_stock_strings(mut self, stock_strings: StockStrings) -> Self {
131 self.stock_strings = stock_strings;
132 self
133 }
134
135 #[deprecated(since = "TBD")]
143 pub fn with_password(mut self, password: String) -> Self {
144 self.password = Some(password);
145 self
146 }
147
148 pub(crate) fn with_push_subscriber(mut self, push_subscriber: PushSubscriber) -> Self {
150 self.push_subscriber = Some(push_subscriber);
151 self
152 }
153
154 pub async fn build(self) -> Result<Context> {
156 let push_subscriber = self.push_subscriber.unwrap_or_default();
157 let context = Context::new_closed(
158 &self.dbfile,
159 self.id,
160 self.events,
161 self.stock_strings,
162 push_subscriber,
163 )
164 .await?;
165 Ok(context)
166 }
167
168 pub async fn open(self) -> Result<Context> {
172 let password = self.password.clone().unwrap_or_default();
173 let context = self.build().await?;
174 match context.open(password).await? {
175 true => Ok(context),
176 false => bail!("database could not be decrypted, incorrect or missing password"),
177 }
178 }
179}
180
181#[derive(Clone, Debug)]
193pub struct Context {
194 pub(crate) inner: Arc<InnerContext>,
195}
196
197impl Deref for Context {
198 type Target = InnerContext;
199
200 fn deref(&self) -> &Self::Target {
201 &self.inner
202 }
203}
204
205#[derive(Clone, Debug)]
209pub(crate) struct WeakContext {
210 inner: Weak<InnerContext>,
211}
212
213impl WeakContext {
214 pub(crate) fn upgrade(&self) -> Result<Context> {
216 let inner = self
217 .inner
218 .upgrade()
219 .ok_or_else(|| anyhow::anyhow!("Inner struct has been dropped"))?;
220 Ok(Context { inner })
221 }
222}
223
224#[derive(Debug)]
226pub struct InnerContext {
227 pub(crate) blobdir: PathBuf,
229 pub(crate) sql: Sql,
230 pub(crate) smeared_timestamp: SmearedTimestamp,
231 running_state: RwLock<RunningState>,
236 pub(crate) oauth2_mutex: Mutex<()>,
238 pub(crate) wrong_pw_warning_mutex: Mutex<()>,
240 pub(crate) housekeeping_mutex: Mutex<()>,
242
243 pub(crate) fetch_msgs_mutex: Mutex<()>,
250
251 pub(crate) translated_stockstrings: StockStrings,
252 pub(crate) events: Events,
253
254 pub(crate) scheduler: SchedulerState,
255 pub(crate) ratelimit: RwLock<Ratelimit>,
256
257 pub(crate) quota: RwLock<BTreeMap<u32, QuotaInfo>>,
260
261 pub(crate) new_msgs_notify: Notify,
265
266 pub(crate) server_id: RwLock<Option<HashMap<String, String>>>,
270
271 pub(crate) metadata: RwLock<Option<ServerMetadata>>,
273
274 pub(crate) id: u32,
279
280 creation_time: tools::Time,
281
282 pub(crate) last_error: parking_lot::RwLock<String>,
286
287 pub(crate) migration_error: parking_lot::RwLock<Option<String>>,
293
294 pub(crate) debug_logging: std::sync::RwLock<Option<DebugLogging>>,
299
300 pub(crate) push_subscriber: PushSubscriber,
303
304 pub(crate) push_subscribed: AtomicBool,
306
307 pub(crate) tls_session_store: TlsSessionStore,
309
310 pub(crate) spki_hash_store: SpkiHashStore,
316
317 pub(crate) iroh: Arc<RwLock<Option<Iroh>>>,
319
320 pub(crate) self_fingerprint: OnceLock<String>,
324
325 pub(crate) self_public_key: Mutex<Option<SignedPublicKey>>,
331
332 pub(crate) connectivities: parking_lot::Mutex<Vec<ConnectivityStore>>,
335
336 #[expect(clippy::type_complexity)]
337 pub(crate) pre_encrypt_mime_hook: parking_lot::Mutex<
339 Option<
340 for<'a> fn(
341 &Context,
342 mail_builder::mime::MimePart<'a>,
343 ) -> mail_builder::mime::MimePart<'a>,
344 >,
345 >,
346}
347
348#[derive(Debug, Default)]
350enum RunningState {
351 Running { cancel_sender: Sender<()> },
353
354 ShallStop { request: tools::Time },
356
357 #[default]
359 Stopped,
360}
361
362#[expect(clippy::arithmetic_side_effects)]
369pub fn get_info() -> BTreeMap<&'static str, String> {
370 let mut res = BTreeMap::new();
371
372 #[cfg(debug_assertions)]
373 res.insert(
374 "debug_assertions",
375 "On - DO NOT RELEASE THIS BUILD".to_string(),
376 );
377 #[cfg(not(debug_assertions))]
378 res.insert("debug_assertions", "Off".to_string());
379
380 res.insert("deltachat_core_version", format!("v{DC_VERSION_STR}"));
381 res.insert("sqlite_version", rusqlite::version().to_string());
382 res.insert("arch", (std::mem::size_of::<usize>() * 8).to_string());
383 res.insert("num_cpus", num_cpus::get().to_string());
384 res.insert("level", "awesome".into());
385 res
386}
387
388impl Context {
389 pub async fn new(
391 dbfile: &Path,
392 id: u32,
393 events: Events,
394 stock_strings: StockStrings,
395 ) -> Result<Context> {
396 let context =
397 Self::new_closed(dbfile, id, events, stock_strings, Default::default()).await?;
398
399 if context.check_passphrase("".to_string()).await? {
401 context.sql.open(&context, "".to_string()).await?;
402 }
403 Ok(context)
404 }
405
406 pub async fn new_closed(
408 dbfile: &Path,
409 id: u32,
410 events: Events,
411 stockstrings: StockStrings,
412 push_subscriber: PushSubscriber,
413 ) -> Result<Context> {
414 let mut blob_fname = OsString::new();
415 blob_fname.push(dbfile.file_name().unwrap_or_default());
416 blob_fname.push("-blobs");
417 let blobdir = dbfile.with_file_name(blob_fname);
418 if !blobdir.exists() {
419 tokio::fs::create_dir_all(&blobdir).await?;
420 }
421 let context = Context::with_blobdir(
422 dbfile.into(),
423 blobdir,
424 id,
425 events,
426 stockstrings,
427 push_subscriber,
428 )?;
429 Ok(context)
430 }
431
432 pub(crate) fn get_weak_context(&self) -> WeakContext {
434 WeakContext {
435 inner: Arc::downgrade(&self.inner),
436 }
437 }
438
439 #[deprecated(since = "TBD")]
446 pub async fn open(&self, passphrase: String) -> Result<bool> {
447 if self.sql.check_passphrase(passphrase.clone()).await? {
448 self.sql.open(self, passphrase).await?;
449 Ok(true)
450 } else {
451 Ok(false)
452 }
453 }
454
455 pub async fn change_passphrase(&self, passphrase: String) -> Result<()> {
458 self.sql.change_passphrase(passphrase).await?;
459 Ok(())
460 }
461
462 pub async fn is_open(&self) -> bool {
464 self.sql.is_open().await
465 }
466
467 pub(crate) async fn check_passphrase(&self, passphrase: String) -> Result<bool> {
473 self.sql.check_passphrase(passphrase).await
474 }
475
476 pub(crate) fn with_blobdir(
477 dbfile: PathBuf,
478 blobdir: PathBuf,
479 id: u32,
480 events: Events,
481 stockstrings: StockStrings,
482 push_subscriber: PushSubscriber,
483 ) -> Result<Context> {
484 ensure!(
485 blobdir.is_dir(),
486 "Blobdir does not exist: {}",
487 blobdir.display()
488 );
489
490 let new_msgs_notify = Notify::new();
491 new_msgs_notify.notify_one();
494
495 let inner = InnerContext {
496 id,
497 blobdir,
498 running_state: RwLock::new(Default::default()),
499 sql: Sql::new(dbfile),
500 smeared_timestamp: SmearedTimestamp::new(),
501 oauth2_mutex: Mutex::new(()),
502 wrong_pw_warning_mutex: Mutex::new(()),
503 housekeeping_mutex: Mutex::new(()),
504 fetch_msgs_mutex: Mutex::new(()),
505 translated_stockstrings: stockstrings,
506 events,
507 scheduler: SchedulerState::new(),
508 ratelimit: RwLock::new(Ratelimit::new(Duration::new(3, 0), 3.0)), quota: RwLock::new(BTreeMap::new()),
510 new_msgs_notify,
511 server_id: RwLock::new(None),
512 metadata: RwLock::new(None),
513 creation_time: tools::Time::now(),
514 last_error: parking_lot::RwLock::new("".to_string()),
515 migration_error: parking_lot::RwLock::new(None),
516 debug_logging: std::sync::RwLock::new(None),
517 push_subscriber,
518 push_subscribed: AtomicBool::new(false),
519 tls_session_store: TlsSessionStore::new(),
520 spki_hash_store: SpkiHashStore::new(),
521 iroh: Arc::new(RwLock::new(None)),
522 self_fingerprint: OnceLock::new(),
523 self_public_key: Mutex::new(None),
524 connectivities: parking_lot::Mutex::new(Vec::new()),
525 pre_encrypt_mime_hook: None.into(),
526 };
527
528 let ctx = Context {
529 inner: Arc::new(inner),
530 };
531
532 Ok(ctx)
533 }
534
535 pub async fn start_io(&self) {
537 if !self.is_configured().await.unwrap_or_default() {
538 warn!(self, "can not start io on a context that is not configured");
539 return;
540 }
541
542 self.sql.config_cache.write().await.clear();
548
549 self.scheduler.start(self).await;
550 }
551
552 pub async fn stop_io(&self) {
554 self.scheduler.stop(self).await;
555 if let Some(iroh) = self.iroh.write().await.take() {
556 tokio::spawn(async move {
563 let _ = tokio::time::timeout(Duration::from_secs(60), iroh.close()).await;
566 });
567 }
568 }
569
570 pub async fn restart_io_if_running(&self) {
573 self.scheduler.restart(self).await;
574 }
575
576 pub async fn maybe_network(&self) {
578 if let Some(ref iroh) = *self.iroh.read().await {
579 iroh.network_change().await;
580 }
581 self.scheduler.maybe_network().await;
582 }
583
584 pub async fn is_chatmail(&self) -> Result<bool> {
586 self.get_config_bool(Config::IsChatmail).await
587 }
588
589 pub(crate) async fn get_max_smtp_rcpt_to(&self) -> Result<usize> {
591 let is_chatmail = self.is_chatmail().await?;
592 let val = self
593 .get_configured_provider()
594 .await?
595 .and_then(|provider| provider.opt.max_smtp_rcpt_to)
596 .map_or_else(
597 || match is_chatmail {
598 true => constants::DEFAULT_CHATMAIL_MAX_SMTP_RCPT_TO,
599 false => constants::DEFAULT_MAX_SMTP_RCPT_TO,
600 },
601 usize::from,
602 );
603 Ok(val)
604 }
605
606 pub async fn background_fetch(&self) -> Result<()> {
612 if !(self.is_configured().await?) {
613 return Ok(());
614 }
615
616 let address = self.get_primary_self_addr().await?;
617 let time_start = tools::Time::now();
618 info!(self, "background_fetch started fetching {address}.");
619
620 if self.scheduler.is_running().await {
621 self.scheduler.maybe_network().await;
622 self.wait_for_all_work_done().await;
623 } else {
624 let _pause_guard = self.scheduler.pause(self).await?;
627
628 let mut connection = Imap::new_configured(self, channel::bounded(1).1).await?;
630 let mut session = connection.prepare(self).await?;
631
632 let folder = connection.folder.clone();
634 connection
635 .fetch_move_delete(self, &mut session, &folder)
636 .await?;
637
638 if self
642 .quota_needs_update(
643 session.transport_id(),
644 DC_BACKGROUND_FETCH_QUOTA_CHECK_RATELIMIT,
645 )
646 .await
647 && let Err(err) = self.update_recent_quota(&mut session, &folder).await
648 {
649 warn!(self, "Failed to update quota: {err:#}.");
650 }
651 }
652
653 info!(
654 self,
655 "background_fetch done for {address} took {:?}.",
656 time_elapsed(&time_start),
657 );
658
659 Ok(())
660 }
661
/// Returns the database handle.
///
/// Only available with the `internals` feature.
#[cfg(feature = "internals")]
pub fn sql(&self) -> &Sql {
    let inner: &InnerContext = &self.inner;
    &inner.sql
}
669
670 pub fn get_dbfile(&self) -> &Path {
672 self.sql.dbfile.as_path()
673 }
674
675 pub fn get_blobdir(&self) -> &Path {
677 self.blobdir.as_path()
678 }
679
680 pub fn emit_event(&self, event: EventType) {
682 {
683 let lock = self.debug_logging.read().expect("RwLock is poisoned");
684 if let Some(debug_logging) = &*lock {
685 debug_logging.log_event(event.clone());
686 }
687 }
688 self.events.emit(Event {
689 id: self.id,
690 typ: event,
691 });
692 }
693
694 pub fn emit_msgs_changed_without_ids(&self) {
696 self.emit_event(EventType::MsgsChanged {
697 chat_id: ChatId::new(0),
698 msg_id: MsgId::new(0),
699 });
700 }
701
702 pub fn emit_msgs_changed(&self, chat_id: ChatId, msg_id: MsgId) {
708 logged_debug_assert!(
709 self,
710 !chat_id.is_unset(),
711 "emit_msgs_changed: chat_id is unset."
712 );
713 logged_debug_assert!(
714 self,
715 !msg_id.is_unset(),
716 "emit_msgs_changed: msg_id is unset."
717 );
718
719 self.emit_event(EventType::MsgsChanged { chat_id, msg_id });
720 chatlist_events::emit_chatlist_changed(self);
721 chatlist_events::emit_chatlist_item_changed(self, chat_id);
722 }
723
724 pub fn emit_msgs_changed_without_msg_id(&self, chat_id: ChatId) {
726 logged_debug_assert!(
727 self,
728 !chat_id.is_unset(),
729 "emit_msgs_changed_without_msg_id: chat_id is unset."
730 );
731
732 self.emit_event(EventType::MsgsChanged {
733 chat_id,
734 msg_id: MsgId::new(0),
735 });
736 chatlist_events::emit_chatlist_changed(self);
737 chatlist_events::emit_chatlist_item_changed(self, chat_id);
738 }
739
740 pub fn emit_incoming_msg(&self, chat_id: ChatId, msg_id: MsgId) {
742 debug_assert!(!chat_id.is_unset());
743 debug_assert!(!msg_id.is_unset());
744
745 self.emit_event(EventType::IncomingMsg { chat_id, msg_id });
746 chatlist_events::emit_chatlist_changed(self);
747 chatlist_events::emit_chatlist_item_changed(self, chat_id);
748 }
749
750 pub async fn emit_location_changed(&self, contact_id: Option<ContactId>) -> Result<()> {
752 self.emit_event(EventType::LocationChanged(contact_id));
753
754 if let Some(msg_id) = self
755 .get_config_parsed::<u32>(Config::WebxdcIntegration)
756 .await?
757 {
758 self.emit_event(EventType::WebxdcStatusUpdate {
759 msg_id: MsgId::new(msg_id),
760 status_update_serial: Default::default(),
761 })
762 }
763
764 Ok(())
765 }
766
767 pub fn get_event_emitter(&self) -> EventEmitter {
772 self.events.get_emitter()
773 }
774
775 pub fn get_id(&self) -> u32 {
777 self.id
778 }
779
780 pub(crate) async fn alloc_ongoing(&self) -> Result<Receiver<()>> {
790 let mut s = self.running_state.write().await;
791 ensure!(
792 matches!(*s, RunningState::Stopped),
793 "There is already another ongoing process running."
794 );
795
796 let (sender, receiver) = channel::bounded(1);
797 *s = RunningState::Running {
798 cancel_sender: sender,
799 };
800
801 Ok(receiver)
802 }
803
804 pub(crate) async fn free_ongoing(&self) {
805 let mut s = self.running_state.write().await;
806 if let RunningState::ShallStop { request } = *s {
807 info!(self, "Ongoing stopped in {:?}", time_elapsed(&request));
808 }
809 *s = RunningState::Stopped;
810 }
811
812 pub async fn stop_ongoing(&self) {
814 let mut s = self.running_state.write().await;
815 match &*s {
816 RunningState::Running { cancel_sender } => {
817 if let Err(err) = cancel_sender.send(()).await {
818 warn!(self, "could not cancel ongoing: {:#}", err);
819 }
820 info!(self, "Signaling the ongoing process to stop ASAP.",);
821 *s = RunningState::ShallStop {
822 request: tools::Time::now(),
823 };
824 }
825 RunningState::ShallStop { .. } | RunningState::Stopped => {
826 info!(self, "No ongoing process to stop.",);
827 }
828 }
829 }
830
831 #[allow(unused)]
832 pub(crate) async fn shall_stop_ongoing(&self) -> bool {
833 match &*self.running_state.read().await {
834 RunningState::Running { .. } => false,
835 RunningState::ShallStop { .. } | RunningState::Stopped => true,
836 }
837 }
838
839 pub async fn get_info(&self) -> Result<BTreeMap<&'static str, String>> {
845 let all_transports: Vec<String> = ConfiguredLoginParam::load_all(self)
846 .await?
847 .into_iter()
848 .map(|(transport_id, param)| format!("{transport_id}: {param}"))
849 .collect();
850 let all_transports = if all_transports.is_empty() {
851 "Not configured".to_string()
852 } else {
853 all_transports.join(",")
854 };
855 let chats = get_chat_cnt(self).await?;
856 let unblocked_msgs = message::get_unblocked_msg_cnt(self).await;
857 let request_msgs = message::get_request_msg_cnt(self).await;
858 let contacts = Contact::get_real_cnt(self).await?;
859 let proxy_enabled = self.get_config_int(Config::ProxyEnabled).await?;
860 let dbversion = self
861 .sql
862 .get_raw_config_int("dbversion")
863 .await?
864 .unwrap_or_default();
865 let journal_mode = self
866 .sql
867 .query_get_value("PRAGMA journal_mode;", ())
868 .await?
869 .unwrap_or_else(|| "unknown".to_string());
870 let mdns_enabled = self.get_config_int(Config::MdnsEnabled).await?;
871 let bcc_self = self.get_config_int(Config::BccSelf).await?;
872 let sync_msgs = self.get_config_int(Config::SyncMsgs).await?;
873 let disable_idle = self.get_config_bool(Config::DisableIdle).await?;
874
875 let prv_key_cnt = self.sql.count("SELECT COUNT(*) FROM keypairs;", ()).await?;
876
877 let pub_key_cnt = self
878 .sql
879 .count("SELECT COUNT(*) FROM public_keys;", ())
880 .await?;
881
882 let mut res = get_info();
883
884 res.insert("bot", self.get_config_int(Config::Bot).await?.to_string());
886 res.insert("number_of_chats", chats.to_string());
887 res.insert("number_of_chat_messages", unblocked_msgs.to_string());
888 res.insert("messages_in_contact_requests", request_msgs.to_string());
889 res.insert("number_of_contacts", contacts.to_string());
890 res.insert("database_dir", self.get_dbfile().display().to_string());
891 res.insert("database_version", dbversion.to_string());
892 res.insert(
893 "database_encrypted",
894 self.sql
895 .is_encrypted()
896 .await
897 .map_or_else(|| "closed".to_string(), |b| b.to_string()),
898 );
899 res.insert("journal_mode", journal_mode);
900 res.insert("blobdir", self.get_blobdir().display().to_string());
901 res.insert(
902 "selfavatar",
903 self.get_config(Config::Selfavatar)
904 .await?
905 .unwrap_or_else(|| "<unset>".to_string()),
906 );
907 res.insert("proxy_enabled", proxy_enabled.to_string());
908 res.insert("used_transport_settings", all_transports);
909
910 if let Some(server_id) = &*self.server_id.read().await {
911 res.insert("imap_server_id", format!("{server_id:?}"));
912 }
913
914 res.insert("is_chatmail", self.is_chatmail().await?.to_string());
915 res.insert(
916 "fix_is_chatmail",
917 self.get_config_bool(Config::FixIsChatmail)
918 .await?
919 .to_string(),
920 );
921 res.insert(
922 "is_muted",
923 self.get_config_bool(Config::IsMuted).await?.to_string(),
924 );
925 res.insert(
926 "private_tag",
927 self.get_config(Config::PrivateTag)
928 .await?
929 .unwrap_or_else(|| "<unset>".to_string()),
930 );
931
932 if let Some(metadata) = &*self.metadata.read().await {
933 if let Some(comment) = &metadata.comment {
934 res.insert("imap_server_comment", format!("{comment:?}"));
935 }
936
937 if let Some(admin) = &metadata.admin {
938 res.insert("imap_server_admin", format!("{admin:?}"));
939 }
940 }
941
942 res.insert(
943 "who_can_call_me",
944 self.get_config_int(Config::WhoCanCallMe).await?.to_string(),
945 );
946 res.insert(
947 "download_limit",
948 self.get_config_int(Config::DownloadLimit)
949 .await?
950 .to_string(),
951 );
952 res.insert("mdns_enabled", mdns_enabled.to_string());
953 res.insert("bcc_self", bcc_self.to_string());
954 res.insert("sync_msgs", sync_msgs.to_string());
955 res.insert("disable_idle", disable_idle.to_string());
956 res.insert("private_key_count", prv_key_cnt.to_string());
957 res.insert("public_key_count", pub_key_cnt.to_string());
958 res.insert(
959 "media_quality",
960 self.get_config_int(Config::MediaQuality).await?.to_string(),
961 );
962 res.insert(
963 "delete_device_after",
964 self.get_config_int(Config::DeleteDeviceAfter)
965 .await?
966 .to_string(),
967 );
968 res.insert(
969 "delete_server_after",
970 self.get_config_int(Config::DeleteServerAfter)
971 .await?
972 .to_string(),
973 );
974 res.insert(
975 "last_housekeeping",
976 self.get_config_int(Config::LastHousekeeping)
977 .await?
978 .to_string(),
979 );
980 res.insert(
981 "last_cant_decrypt_outgoing_msgs",
982 self.get_config_int(Config::LastCantDecryptOutgoingMsgs)
983 .await?
984 .to_string(),
985 );
986 res.insert(
987 "debug_logging",
988 self.get_config_int(Config::DebugLogging).await?.to_string(),
989 );
990 res.insert(
991 "last_msg_id",
992 self.get_config_int(Config::LastMsgId).await?.to_string(),
993 );
994 res.insert(
995 "gossip_period",
996 self.get_config_int(Config::GossipPeriod).await?.to_string(),
997 );
998 res.insert(
999 "webxdc_realtime_enabled",
1000 self.get_config_bool(Config::WebxdcRealtimeEnabled)
1001 .await?
1002 .to_string(),
1003 );
1004 res.insert(
1005 "donation_request_next_check",
1006 self.get_config_i64(Config::DonationRequestNextCheck)
1007 .await?
1008 .to_string(),
1009 );
1010 res.insert(
1011 "first_key_contacts_msg_id",
1012 self.sql
1013 .get_raw_config("first_key_contacts_msg_id")
1014 .await?
1015 .unwrap_or_default(),
1016 );
1017 res.insert(
1018 "stats_id",
1019 self.get_config(Config::StatsId)
1020 .await?
1021 .unwrap_or_else(|| "<unset>".to_string()),
1022 );
1023 res.insert(
1024 "stats_sending",
1025 stats::should_send_stats(self).await?.to_string(),
1026 );
1027 res.insert(
1028 "stats_last_sent",
1029 self.get_config_i64(Config::StatsLastSent)
1030 .await?
1031 .to_string(),
1032 );
1033 res.insert(
1034 "test_hooks",
1035 self.sql
1036 .get_raw_config("test_hooks")
1037 .await?
1038 .unwrap_or_default(),
1039 );
1040 res.insert(
1041 "std_header_protection_composing",
1042 self.sql
1043 .get_raw_config("std_header_protection_composing")
1044 .await?
1045 .unwrap_or_default(),
1046 );
1047 res.insert(
1048 "team_profile",
1049 self.get_config_bool(Config::TeamProfile).await?.to_string(),
1050 );
1051 res.insert(
1052 "force_encryption",
1053 self.get_config_bool(Config::ForceEncryption)
1054 .await?
1055 .to_string(),
1056 );
1057
1058 let elapsed = time_elapsed(&self.creation_time);
1059 res.insert("uptime", duration_to_str(elapsed));
1060
1061 Ok(res)
1062 }
1063
1064 pub async fn get_fresh_msgs(&self) -> Result<Vec<MsgId>> {
1071 let list = self
1072 .sql
1073 .query_map_vec(
1074 "SELECT m.id
1075FROM msgs m
1076LEFT JOIN contacts ct
1077 ON m.from_id=ct.id
1078LEFT JOIN chats c
1079 ON m.chat_id=c.id
1080WHERE m.state=?
1081AND m.hidden=0
1082AND m.chat_id>9
1083AND ct.blocked=0
1084AND c.blocked=0
1085AND NOT(c.muted_until=-1 OR c.muted_until>?)
1086ORDER BY m.timestamp DESC,m.id DESC",
1087 (MessageState::InFresh, time()),
1088 |row| {
1089 let msg_id: MsgId = row.get(0)?;
1090 Ok(msg_id)
1091 },
1092 )
1093 .await?;
1094 Ok(list)
1095 }
1096
1097 pub async fn get_next_msgs(&self) -> Result<Vec<MsgId>> {
1109 let last_msg_id = match self.get_config(Config::LastMsgId).await? {
1110 Some(s) => MsgId::new(s.parse()?),
1111 None => {
1112 self.sql
1117 .query_row(
1118 "SELECT IFNULL((SELECT MAX(id) - 1 FROM msgs), 0)",
1119 (),
1120 |row| {
1121 let msg_id: MsgId = row.get(0)?;
1122 Ok(msg_id)
1123 },
1124 )
1125 .await?
1126 }
1127 };
1128
1129 let list = self
1130 .sql
1131 .query_map_vec(
1132 "SELECT m.id
1133 FROM msgs m
1134 LEFT JOIN contacts ct
1135 ON m.from_id=ct.id
1136 LEFT JOIN chats c
1137 ON m.chat_id=c.id
1138 WHERE m.id>?
1139 AND m.hidden=0
1140 AND m.chat_id>9
1141 AND ct.blocked=0
1142 AND c.blocked!=1
1143 ORDER BY m.id ASC",
1144 (
1145 last_msg_id.to_u32(), ),
1147 |row| {
1148 let msg_id: MsgId = row.get(0)?;
1149 Ok(msg_id)
1150 },
1151 )
1152 .await?;
1153 Ok(list)
1154 }
1155
1156 pub async fn wait_next_msgs(&self) -> Result<Vec<MsgId>> {
1174 self.new_msgs_notify.notified().await;
1175 let list = self.get_next_msgs().await?;
1176 Ok(list)
1177 }
1178
1179 pub async fn search_msgs(&self, chat_id: Option<ChatId>, query: &str) -> Result<Vec<MsgId>> {
1190 let real_query = query.trim().to_lowercase();
1191 if real_query.is_empty() {
1192 return Ok(Vec::new());
1193 }
1194 let str_like_in_text = format!("%{real_query}%");
1195
1196 let list = if let Some(chat_id) = chat_id {
1197 self.sql
1198 .query_map_vec(
1199 "SELECT m.id AS id
1200 FROM msgs m
1201 LEFT JOIN contacts ct
1202 ON m.from_id=ct.id
1203 WHERE m.chat_id=?
1204 AND m.hidden=0
1205 AND ct.blocked=0
1206 AND IFNULL(txt_normalized, txt) LIKE ?
1207 ORDER BY m.timestamp,m.id;",
1208 (chat_id, str_like_in_text),
1209 |row| {
1210 let msg_id: MsgId = row.get("id")?;
1211 Ok(msg_id)
1212 },
1213 )
1214 .await?
1215 } else {
1216 self.sql
1227 .query_map_vec(
1228 "SELECT m.id AS id
1229 FROM msgs m
1230 LEFT JOIN contacts ct
1231 ON m.from_id=ct.id
1232 LEFT JOIN chats c
1233 ON m.chat_id=c.id
1234 WHERE m.chat_id>9
1235 AND m.hidden=0
1236 AND c.blocked!=1
1237 AND ct.blocked=0
1238 AND IFNULL(txt_normalized, txt) LIKE ?
1239 ORDER BY m.id DESC LIMIT 1000",
1240 (str_like_in_text,),
1241 |row| {
1242 let msg_id: MsgId = row.get("id")?;
1243 Ok(msg_id)
1244 },
1245 )
1246 .await?
1247 };
1248
1249 Ok(list)
1250 }
1251
/// Returns the blob directory path that belongs to `dbfile`.
///
/// The result is a sibling of `dbfile` whose name is the file name of
/// `dbfile` followed by `-blobs`. If `dbfile` has no file name, the bare
/// name `-blobs` is appended to the path instead.
pub(crate) fn derive_blobdir(dbfile: &Path) -> PathBuf {
    let mut blobdir_name: OsString = dbfile
        .file_name()
        .map(|name| name.to_os_string())
        .unwrap_or_default();
    blobdir_name.push("-blobs");
    dbfile.with_file_name(blobdir_name)
}
1258
/// Returns the write-ahead-log file path that belongs to `dbfile`.
///
/// The result is a sibling of `dbfile` whose name is the file name of
/// `dbfile` followed by `-wal`. If `dbfile` has no file name, the bare
/// name `-wal` is appended to the path instead.
pub(crate) fn derive_walfile(dbfile: &Path) -> PathBuf {
    let mut walfile_name: OsString = dbfile
        .file_name()
        .map(|name| name.to_os_string())
        .unwrap_or_default();
    walfile_name.push("-wal");
    dbfile.with_file_name(walfile_name)
}
1265}
1266
1267#[cfg(test)]
1268mod context_tests;