Skip to main content

deltachat/
context.rs

1//! Context module.
2
3use std::collections::{BTreeMap, HashMap};
4use std::ffi::OsString;
5use std::ops::Deref;
6use std::path::{Path, PathBuf};
7use std::sync::atomic::AtomicBool;
8use std::sync::{Arc, OnceLock, Weak};
9use std::time::Duration;
10
11use anyhow::{Result, bail, ensure};
12use async_channel::{self as channel, Receiver, Sender};
13use pgp::composed::SignedPublicKey;
14use ratelimit::Ratelimit;
15use tokio::sync::{Mutex, Notify, RwLock};
16
17use crate::chat::{ChatId, get_chat_cnt};
18use crate::config::Config;
19use crate::constants::{self, DC_BACKGROUND_FETCH_QUOTA_CHECK_RATELIMIT, DC_VERSION_STR};
20use crate::contact::{Contact, ContactId};
21use crate::debug_logging::DebugLogging;
22use crate::events::{Event, EventEmitter, EventType, Events};
23use crate::imap::{Imap, ServerMetadata};
24use crate::log::warn;
25use crate::logged_debug_assert;
26use crate::message::{self, MessageState, MsgId};
27use crate::net::tls::{SpkiHashStore, TlsSessionStore};
28use crate::peer_channels::Iroh;
29use crate::push::PushSubscriber;
30use crate::quota::QuotaInfo;
31use crate::scheduler::{ConnectivityStore, SchedulerState};
32use crate::sql::Sql;
33use crate::stock_str::StockStrings;
34use crate::tools::{self, duration_to_str, time, time_elapsed};
35use crate::transport::ConfiguredLoginParam;
36use crate::{chatlist_events, stats};
37
38pub use crate::scheduler::connectivity::Connectivity;
39
/// Builder for the [`Context`].
///
/// Many arguments to the [`Context`] are kind of optional and only needed to handle
/// multiple contexts, for which the [account manager](crate::accounts::Accounts) should be
/// used.  This builder makes creating a new context simpler, especially for the
/// standalone-context case.
///
/// # Examples
///
/// Creating a new database:
///
/// ```
/// # let rt = tokio::runtime::Runtime::new().unwrap();
/// # rt.block_on(async move {
/// use deltachat::context::ContextBuilder;
///
/// let dir = tempfile::tempdir().unwrap();
/// let context = ContextBuilder::new(dir.path().join("db"))
///      .open()
///      .await
///      .unwrap();
/// drop(context);
/// # });
/// ```
#[derive(Clone, Debug)]
pub struct ContextBuilder {
    /// Path of the database file, as passed to [`ContextBuilder::new`].
    dbfile: PathBuf,
    /// ID of the context to build; random unless set with [`ContextBuilder::with_id`].
    id: u32,
    /// Event channel of the context to build, see [`ContextBuilder::with_events`].
    events: Events,
    /// Stock strings of the context to build, see [`ContextBuilder::with_stock_strings`].
    stock_strings: StockStrings,
    /// Database password, if one was set with the deprecated `with_password()`.
    ///
    /// Only used by [`ContextBuilder::open`]; a missing password is treated as empty.
    password: Option<String>,

    /// Push subscriber, if one was set; [`ContextBuilder::build`] falls back
    /// to the default one otherwise.
    push_subscriber: Option<PushSubscriber>,
}
74
75impl ContextBuilder {
76    /// Create the builder using the given database file.
77    ///
78    /// The *dbfile* should be in a dedicated directory and this directory must exist.  The
79    /// [`Context`] will create other files and folders in the same directory as the
80    /// database file used.
81    pub fn new(dbfile: PathBuf) -> Self {
82        ContextBuilder {
83            dbfile,
84            id: rand::random(),
85            events: Events::new(),
86            stock_strings: StockStrings::new(),
87            password: None,
88            push_subscriber: None,
89        }
90    }
91
92    /// Sets the context ID.
93    ///
94    /// This identifier is used e.g. in [`Event`]s to identify which [`Context`] an event
95    /// belongs to.  The only real limit on it is that it should not conflict with any other
96    /// [`Context`]s you currently have open.  So if you handle multiple [`Context`]s you
97    /// may want to use this.
98    ///
99    /// Note that the [account manager](crate::accounts::Accounts) is designed to handle the
100    /// common case for using multiple [`Context`] instances.
101    pub fn with_id(mut self, id: u32) -> Self {
102        self.id = id;
103        self
104    }
105
106    /// Sets the event channel for this [`Context`].
107    ///
108    /// Mostly useful when using multiple [`Context`]s, this allows creating one [`Events`]
109    /// channel and passing it to all [`Context`]s so all events are received on the same
110    /// channel.
111    ///
112    /// Note that the [account manager](crate::accounts::Accounts) is designed to handle the
113    /// common case for using multiple [`Context`] instances.
114    pub fn with_events(mut self, events: Events) -> Self {
115        self.events = events;
116        self
117    }
118
119    /// Sets the [`StockStrings`] map to use for this [`Context`].
120    ///
121    /// This is useful in order to share the same translation strings in all [`Context`]s.
122    /// The mapping may be empty when set, it will be populated by
123    /// [`Context::set_stock_translation`] or [`Accounts::set_stock_translation`] calls.
124    ///
125    /// Note that the [account manager](crate::accounts::Accounts) is designed to handle the
126    /// common case for using multiple [`Context`] instances.
127    ///
128    /// [`Accounts::set_stock_translation`]: crate::accounts::Accounts::set_stock_translation
129    pub fn with_stock_strings(mut self, stock_strings: StockStrings) -> Self {
130        self.stock_strings = stock_strings;
131        self
132    }
133
134    /// Sets the password to unlock the database.
135    /// Deprecated 2025-11:
136    /// - Db encryption does nothing with blobs, so fs/disk encryption is recommended.
137    /// - Isolation from other apps is needed anyway.
138    ///
139    /// If an encrypted database is used it must be opened with a password.  Setting a
140    /// password on a new database will enable encryption.
141    #[deprecated(since = "TBD")]
142    pub fn with_password(mut self, password: String) -> Self {
143        self.password = Some(password);
144        self
145    }
146
147    /// Sets push subscriber.
148    pub(crate) fn with_push_subscriber(mut self, push_subscriber: PushSubscriber) -> Self {
149        self.push_subscriber = Some(push_subscriber);
150        self
151    }
152
153    /// Builds the [`Context`] without opening it.
154    pub async fn build(self) -> Result<Context> {
155        let push_subscriber = self.push_subscriber.unwrap_or_default();
156        let context = Context::new_closed(
157            &self.dbfile,
158            self.id,
159            self.events,
160            self.stock_strings,
161            push_subscriber,
162        )
163        .await?;
164        Ok(context)
165    }
166
167    /// Builds the [`Context`] and opens it.
168    ///
169    /// Returns error if context cannot be opened.
170    pub async fn open(self) -> Result<Context> {
171        let password = self.password.clone().unwrap_or_default();
172        let context = self.build().await?;
173        match context.open(password).await? {
174            true => Ok(context),
175            false => bail!("database could not be decrypted, incorrect or missing password"),
176        }
177    }
178}
179
/// The context for a single DeltaChat account.
///
/// This contains all the state for a single DeltaChat account, including background tasks
/// running in Tokio to operate the account.  The [`Context`] can be cheaply cloned:
/// a clone only copies the [`Arc`] pointing to the shared [`InnerContext`].
///
/// Each context, and thus each account, must be associated with a directory where all the
/// state is kept.  This state is also preserved between restarts.
///
/// To use multiple accounts it is best to look at the [accounts
/// manager][crate::accounts::Accounts] which handles storing multiple accounts in a single
/// directory structure and handles loading them all concurrently.
#[derive(Clone, Debug)]
pub struct Context {
    /// Shared state, reachable through the `Deref` implementation.
    pub(crate) inner: Arc<InnerContext>,
}
195
196impl Deref for Context {
197    type Target = InnerContext;
198
199    fn deref(&self) -> &Self::Target {
200        &self.inner
201    }
202}
203
/// A weak reference to a [`Context`].
///
/// Can be used to obtain a [`Context`] with [`WeakContext::upgrade`].
/// An existing weak reference does not prevent the corresponding
/// [`Context`] from being dropped.
#[derive(Clone, Debug)]
pub(crate) struct WeakContext {
    /// Weak counterpart of the `inner` field of [`Context`].
    inner: Weak<InnerContext>,
}
211
212impl WeakContext {
213    /// Returns the [`Context`] if it is still available.
214    pub(crate) fn upgrade(&self) -> Result<Context> {
215        let inner = self
216            .inner
217            .upgrade()
218            .ok_or_else(|| anyhow::anyhow!("Inner struct has been dropped"))?;
219        Ok(Context { inner })
220    }
221}
222
/// Actual context, expensive to clone.
///
/// Shared between all clones of a [`Context`] via an [`Arc`].
#[derive(Debug)]
pub struct InnerContext {
    /// Blob directory path
    pub(crate) blobdir: PathBuf,
    /// SQL database of this context.
    pub(crate) sql: Sql,
    /// The global "ongoing" process state.
    ///
    /// This is a global mutex-like state for operations which should be modal in the
    /// clients.
    running_state: RwLock<RunningState>,
    /// Mutex to enforce only a single running oauth2 is running.
    pub(crate) oauth2_mutex: Mutex<()>,
    /// Mutex to prevent a race condition when a "your pw is wrong" warning is sent, resulting in multiple messages being sent.
    pub(crate) wrong_pw_warning_mutex: Mutex<()>,
    /// Mutex to prevent running housekeeping from multiple threads at once.
    pub(crate) housekeeping_mutex: Mutex<()>,

    /// Mutex to prevent multiple IMAP loops from fetching the messages at once.
    ///
    /// Without this mutex IMAP loops may waste traffic downloading the same message
    /// from multiple IMAP servers and create multiple copies of the same message
    /// in the database if the check for duplicates and creating a message
    /// happens in separate database transactions.
    pub(crate) fetch_msgs_mutex: Mutex<()>,

    /// Stock strings handed over when the context was created.
    // NOTE(review): presumably holds the translations set by the UI — confirm in `stock_str`.
    pub(crate) translated_stockstrings: StockStrings,
    /// Event channel that [`Context::emit_event`] emits into.
    pub(crate) events: Events,

    /// State of the IO scheduler, see [`Context::start_io`] and [`Context::stop_io`].
    pub(crate) scheduler: SchedulerState,
    /// Rate limiter, initialized to allow at least 1 message every second
    /// plus a burst of 3.
    pub(crate) ratelimit: RwLock<Ratelimit>,

    /// Recently loaded quota information for each transport, if any.
    /// If quota was never tried to load, then the transport doesn't have an entry in the BTreeMap.
    pub(crate) quota: RwLock<BTreeMap<u32, QuotaInfo>>,

    /// Notify about new messages.
    ///
    /// This causes [`Context::wait_next_msgs`] to wake up.
    pub(crate) new_msgs_notify: Notify,

    /// Server ID response if ID capability is supported
    /// and the server returned non-NIL on the inbox connection.
    /// <https://datatracker.ietf.org/doc/html/rfc2971>
    pub(crate) server_id: RwLock<Option<HashMap<String, String>>>,

    /// IMAP METADATA.
    pub(crate) metadata: RwLock<Option<ServerMetadata>>,

    /// ID for this `Context` in the current process.
    ///
    /// This allows for multiple `Context`s open in a single process where each context can
    /// be identified by this ID.
    pub(crate) id: u32,

    /// Time at which this context was created.
    creation_time: tools::Time,

    /// The text of the last error logged and emitted as an event.
    /// If the ui wants to display an error after a failure,
    /// `last_error` should be used to avoid races with the event thread.
    pub(crate) last_error: parking_lot::RwLock<String>,

    /// It's not possible to emit migration errors as an event,
    /// because at the time of the migration, there is no event emitter yet.
    /// So, this holds the error that happened during migration, if any.
    /// This is necessary for the possibly-fallible PGP migration,
    /// which happened 2025-05, and can be removed a few releases later.
    pub(crate) migration_error: parking_lot::RwLock<Option<String>>,

    /// If debug logging is enabled, this contains all necessary information
    ///
    /// Standard RwLock instead of [`tokio::sync::RwLock`] is used
    /// because the lock is used from synchronous [`Context::emit_event`].
    pub(crate) debug_logging: std::sync::RwLock<Option<DebugLogging>>,

    /// Push subscriber to store device token
    /// and register for heartbeat notifications.
    pub(crate) push_subscriber: PushSubscriber,

    /// True if account has subscribed to push notifications via IMAP.
    pub(crate) push_subscribed: AtomicBool,

    /// TLS session resumption cache.
    pub(crate) tls_session_store: TlsSessionStore,

    /// Store for TLS SPKI hashes.
    ///
    /// Used to remember public keys
    /// of TLS certificates to accept them
    /// even after they expire.
    pub(crate) spki_hash_store: SpkiHashStore,

    /// Iroh for realtime peer channels.
    ///
    /// `None` until set up; taken out and closed by [`Context::stop_io`].
    pub(crate) iroh: Arc<RwLock<Option<Iroh>>>,

    /// The own fingerprint, if it was computed already.
    /// tokio::sync::OnceCell would be possible to use, but overkill for our usecase;
    /// the standard library's OnceLock is enough, and it's a lot smaller in memory.
    pub(crate) self_fingerprint: OnceLock<String>,

    /// OpenPGP certificate aka Transferrable Public Key.
    ///
    /// It is generated on first use from the secret key stored in the database.
    ///
    /// Mutex is also held while generating the key to avoid generating the key twice.
    pub(crate) self_public_key: Mutex<Option<SignedPublicKey>>,

    /// `Connectivity` values for mailboxes, unordered. Used to compute the aggregate connectivity,
    /// see [`Context::get_connectivity()`].
    pub(crate) connectivities: parking_lot::Mutex<Vec<ConnectivityStore>>,
}
334
/// The state of ongoing process.
///
/// Transitions, as implemented by the `Context` methods:
/// `alloc_ongoing()` moves `Stopped` to `Running`,
/// `stop_ongoing()` moves `Running` to `ShallStop`
/// and `free_ongoing()` moves any state back to `Stopped`.
#[derive(Debug, Default)]
enum RunningState {
    /// Ongoing process is allocated.
    ///
    /// `cancel_sender` is the sending half of the channel whose receiver
    /// was handed out by `alloc_ongoing()`; `stop_ongoing()` sends `()` into it.
    Running { cancel_sender: Sender<()> },

    /// Cancel signal has been sent, waiting for ongoing process to be freed.
    ///
    /// `request` is the time at which the stop was requested,
    /// used to log how long stopping took.
    ShallStop { request: tools::Time },

    /// There is no ongoing process, a new one can be allocated.
    #[default]
    Stopped,
}
348
349/// Return some info about deltachat-core
350///
351/// This contains information mostly about the library itself, the
352/// actual keys and their values which will be present are not
353/// guaranteed.  Calling [Context::get_info] also includes information
354/// about the context on top of the information here.
355#[expect(clippy::arithmetic_side_effects)]
356pub fn get_info() -> BTreeMap<&'static str, String> {
357    let mut res = BTreeMap::new();
358
359    #[cfg(debug_assertions)]
360    res.insert(
361        "debug_assertions",
362        "On - DO NOT RELEASE THIS BUILD".to_string(),
363    );
364    #[cfg(not(debug_assertions))]
365    res.insert("debug_assertions", "Off".to_string());
366
367    res.insert("deltachat_core_version", format!("v{DC_VERSION_STR}"));
368    res.insert("sqlite_version", rusqlite::version().to_string());
369    res.insert("arch", (std::mem::size_of::<usize>() * 8).to_string());
370    res.insert("num_cpus", num_cpus::get().to_string());
371    res.insert("level", "awesome".into());
372    res
373}
374
375impl Context {
376    /// Creates new context and opens the database.
377    pub async fn new(
378        dbfile: &Path,
379        id: u32,
380        events: Events,
381        stock_strings: StockStrings,
382    ) -> Result<Context> {
383        let context =
384            Self::new_closed(dbfile, id, events, stock_strings, Default::default()).await?;
385
386        // Open the database if is not encrypted.
387        if context.check_passphrase("".to_string()).await? {
388            context.sql.open(&context, "".to_string()).await?;
389        }
390        Ok(context)
391    }
392
393    /// Creates new context without opening the database.
394    pub async fn new_closed(
395        dbfile: &Path,
396        id: u32,
397        events: Events,
398        stockstrings: StockStrings,
399        push_subscriber: PushSubscriber,
400    ) -> Result<Context> {
401        let mut blob_fname = OsString::new();
402        blob_fname.push(dbfile.file_name().unwrap_or_default());
403        blob_fname.push("-blobs");
404        let blobdir = dbfile.with_file_name(blob_fname);
405        if !blobdir.exists() {
406            tokio::fs::create_dir_all(&blobdir).await?;
407        }
408        let context = Context::with_blobdir(
409            dbfile.into(),
410            blobdir,
411            id,
412            events,
413            stockstrings,
414            push_subscriber,
415        )?;
416        Ok(context)
417    }
418
419    /// Returns a weak reference to this [`Context`].
420    pub(crate) fn get_weak_context(&self) -> WeakContext {
421        WeakContext {
422            inner: Arc::downgrade(&self.inner),
423        }
424    }
425
426    /// Opens the database with the given passphrase.
427    /// NB: Db encryption is deprecated, so `passphrase` should be empty normally. See
428    /// [`ContextBuilder::with_password()`] for reasoning.
429    ///
430    /// Returns true if passphrase is correct, false is passphrase is not correct. Fails on other
431    /// errors.
432    #[deprecated(since = "TBD")]
433    pub async fn open(&self, passphrase: String) -> Result<bool> {
434        if self.sql.check_passphrase(passphrase.clone()).await? {
435            self.sql.open(self, passphrase).await?;
436            Ok(true)
437        } else {
438            Ok(false)
439        }
440    }
441
442    /// Changes encrypted database passphrase.
443    /// Deprecated 2025-11, see [`ContextBuilder::with_password()`] for reasoning.
444    pub async fn change_passphrase(&self, passphrase: String) -> Result<()> {
445        self.sql.change_passphrase(passphrase).await?;
446        Ok(())
447    }
448
449    /// Returns true if database is open.
450    pub async fn is_open(&self) -> bool {
451        self.sql.is_open().await
452    }
453
454    /// Tests the database passphrase.
455    ///
456    /// Returns true if passphrase is correct.
457    ///
458    /// Fails if database is already open.
459    pub(crate) async fn check_passphrase(&self, passphrase: String) -> Result<bool> {
460        self.sql.check_passphrase(passphrase).await
461    }
462
463    pub(crate) fn with_blobdir(
464        dbfile: PathBuf,
465        blobdir: PathBuf,
466        id: u32,
467        events: Events,
468        stockstrings: StockStrings,
469        push_subscriber: PushSubscriber,
470    ) -> Result<Context> {
471        ensure!(
472            blobdir.is_dir(),
473            "Blobdir does not exist: {}",
474            blobdir.display()
475        );
476
477        let new_msgs_notify = Notify::new();
478        // Notify once immediately to allow processing old messages
479        // without starting I/O.
480        new_msgs_notify.notify_one();
481
482        let inner = InnerContext {
483            id,
484            blobdir,
485            running_state: RwLock::new(Default::default()),
486            sql: Sql::new(dbfile),
487            oauth2_mutex: Mutex::new(()),
488            wrong_pw_warning_mutex: Mutex::new(()),
489            housekeeping_mutex: Mutex::new(()),
490            fetch_msgs_mutex: Mutex::new(()),
491            translated_stockstrings: stockstrings,
492            events,
493            scheduler: SchedulerState::new(),
494            ratelimit: RwLock::new(Ratelimit::new(Duration::new(3, 0), 3.0)), // Allow at least 1 message every second + a burst of 3.
495            quota: RwLock::new(BTreeMap::new()),
496            new_msgs_notify,
497            server_id: RwLock::new(None),
498            metadata: RwLock::new(None),
499            creation_time: tools::Time::now(),
500            last_error: parking_lot::RwLock::new("".to_string()),
501            migration_error: parking_lot::RwLock::new(None),
502            debug_logging: std::sync::RwLock::new(None),
503            push_subscriber,
504            push_subscribed: AtomicBool::new(false),
505            tls_session_store: TlsSessionStore::new(),
506            spki_hash_store: SpkiHashStore::new(),
507            iroh: Arc::new(RwLock::new(None)),
508            self_fingerprint: OnceLock::new(),
509            self_public_key: Mutex::new(None),
510            connectivities: parking_lot::Mutex::new(Vec::new()),
511        };
512
513        let ctx = Context {
514            inner: Arc::new(inner),
515        };
516
517        Ok(ctx)
518    }
519
520    /// Starts the IO scheduler.
521    pub async fn start_io(&self) {
522        if !self.is_configured().await.unwrap_or_default() {
523            warn!(self, "can not start io on a context that is not configured");
524            return;
525        }
526
527        // The next line is mainly for iOS:
528        // iOS starts a separate process for receiving notifications and if the user concurrently
529        // starts the app, the UI process opens the database but waits with calling start_io()
530        // until the notifications process finishes.
531        // Now, some configs may have changed, so, we need to invalidate the cache.
532        self.sql.config_cache.write().await.clear();
533
534        self.scheduler.start(self).await;
535    }
536
537    /// Stops the IO scheduler.
538    pub async fn stop_io(&self) {
539        self.scheduler.stop(self).await;
540        if let Some(iroh) = self.iroh.write().await.take() {
541            // Close all QUIC connections.
542
543            // Spawn into a separate task,
544            // because Iroh calls `wait_idle()` internally
545            // and it may take time, especially if the network
546            // has become unavailable.
547            tokio::spawn(async move {
548                // We do not log the error because we do not want the task
549                // to hold the reference to Context.
550                let _ = tokio::time::timeout(Duration::from_secs(60), iroh.close()).await;
551            });
552        }
553    }
554
555    /// Restarts the IO scheduler if it was running before
556    /// when it is not running this is an no-op
557    pub async fn restart_io_if_running(&self) {
558        self.scheduler.restart(self).await;
559    }
560
561    /// Indicate that the network likely has come back.
562    pub async fn maybe_network(&self) {
563        if let Some(ref iroh) = *self.iroh.read().await {
564            iroh.network_change().await;
565        }
566        self.scheduler.maybe_network().await;
567    }
568
569    /// Returns true if an account is on a chatmail server.
570    pub async fn is_chatmail(&self) -> Result<bool> {
571        self.get_config_bool(Config::IsChatmail).await
572    }
573
574    /// Returns maximum number of recipients the provider allows to send a single email to.
575    pub(crate) async fn get_max_smtp_rcpt_to(&self) -> Result<usize> {
576        let is_chatmail = self.is_chatmail().await?;
577        let val = self
578            .get_configured_provider()
579            .await?
580            .and_then(|provider| provider.opt.max_smtp_rcpt_to)
581            .map_or_else(
582                || match is_chatmail {
583                    true => constants::DEFAULT_CHATMAIL_MAX_SMTP_RCPT_TO,
584                    false => constants::DEFAULT_MAX_SMTP_RCPT_TO,
585                },
586                usize::from,
587            );
588        Ok(val)
589    }
590
    /// Does a single round of fetching from IMAP and returns.
    ///
    /// Can be used even if I/O is currently stopped.
    /// If I/O is currently stopped, starts a new IMAP connection
    /// and fetches from Inbox and DeltaChat folders.
    ///
    /// If the scheduler is running, no dedicated connection is made: the scheduler
    /// is told that the network may be available and this call waits until all its
    /// work is done.
    ///
    /// Returns `Ok(())` without doing anything if the context is not configured.
    /// A failure to update the quota is only logged as a warning.
    // NOTE(review): the dedicated-connection branch below fetches only
    // `connection.folder`; confirm the "Inbox and DeltaChat folders" wording is current.
    pub async fn background_fetch(&self) -> Result<()> {
        if !(self.is_configured().await?) {
            return Ok(());
        }

        let address = self.get_primary_self_addr().await?;
        // Only used to log how long the fetch took.
        let time_start = tools::Time::now();
        info!(self, "background_fetch started fetching {address}.");

        if self.scheduler.is_running().await {
            self.scheduler.maybe_network().await;
            self.wait_for_all_work_done().await;
        } else {
            // Pause the scheduler to ensure another connection does not start
            // while we are fetching on a dedicated connection.
            // The guard is kept until the end of this branch.
            let _pause_guard = self.scheduler.pause(self).await?;

            // Start a new dedicated connection.
            // Only the receiving half of a fresh channel is passed in;
            // the sending half is dropped right away.
            let mut connection = Imap::new_configured(self, channel::bounded(1).1).await?;
            let mut session = connection.prepare(self).await?;

            // Fetch IMAP folders.
            let folder = connection.folder.clone();
            connection
                .fetch_move_delete(self, &mut session, &folder)
                .await?;

            // Update quota (to send warning if full) - but only check it once in a while.
            // note: For now this only checks quota of primary transport,
            // because background check only checks primary transport at the moment
            if self
                .quota_needs_update(
                    session.transport_id(),
                    DC_BACKGROUND_FETCH_QUOTA_CHECK_RATELIMIT,
                )
                .await
                && let Err(err) = self.update_recent_quota(&mut session, &folder).await
            {
                warn!(self, "Failed to update quota: {err:#}.");
            }
        }

        info!(
            self,
            "background_fetch done for {address} took {:?}.",
            time_elapsed(&time_start),
        );

        Ok(())
    }
646
647    /// Returns a reference to the underlying SQL instance.
648    ///
649    /// Warning: this is only here for testing, not part of the public API.
650    #[cfg(feature = "internals")]
651    pub fn sql(&self) -> &Sql {
652        &self.inner.sql
653    }
654
655    /// Returns database file path.
656    pub fn get_dbfile(&self) -> &Path {
657        self.sql.dbfile.as_path()
658    }
659
660    /// Returns blob directory path.
661    pub fn get_blobdir(&self) -> &Path {
662        self.blobdir.as_path()
663    }
664
665    /// Emits a single event.
666    pub fn emit_event(&self, event: EventType) {
667        {
668            let lock = self.debug_logging.read().expect("RwLock is poisoned");
669            if let Some(debug_logging) = &*lock {
670                debug_logging.log_event(event.clone());
671            }
672        }
673        self.events.emit(Event {
674            id: self.id,
675            typ: event,
676        });
677    }
678
679    /// Emits a generic MsgsChanged event (without chat or message id)
680    pub fn emit_msgs_changed_without_ids(&self) {
681        self.emit_event(EventType::MsgsChanged {
682            chat_id: ChatId::new(0),
683            msg_id: MsgId::new(0),
684        });
685    }
686
687    /// Emits a MsgsChanged event with specified chat and message ids
688    ///
689    /// If IDs are unset, [`Self::emit_msgs_changed_without_ids`]
690    /// or [`Self::emit_msgs_changed_without_msg_id`] should be used
691    /// instead of this function.
692    pub fn emit_msgs_changed(&self, chat_id: ChatId, msg_id: MsgId) {
693        logged_debug_assert!(
694            self,
695            !chat_id.is_unset(),
696            "emit_msgs_changed: chat_id is unset."
697        );
698        logged_debug_assert!(
699            self,
700            !msg_id.is_unset(),
701            "emit_msgs_changed: msg_id is unset."
702        );
703
704        self.emit_event(EventType::MsgsChanged { chat_id, msg_id });
705        chatlist_events::emit_chatlist_changed(self);
706        chatlist_events::emit_chatlist_item_changed(self, chat_id);
707    }
708
709    /// Emits a MsgsChanged event with specified chat and without message id.
710    pub fn emit_msgs_changed_without_msg_id(&self, chat_id: ChatId) {
711        logged_debug_assert!(
712            self,
713            !chat_id.is_unset(),
714            "emit_msgs_changed_without_msg_id: chat_id is unset."
715        );
716
717        self.emit_event(EventType::MsgsChanged {
718            chat_id,
719            msg_id: MsgId::new(0),
720        });
721        chatlist_events::emit_chatlist_changed(self);
722        chatlist_events::emit_chatlist_item_changed(self, chat_id);
723    }
724
725    /// Emits an IncomingMsg event with specified chat and message ids
726    pub fn emit_incoming_msg(&self, chat_id: ChatId, msg_id: MsgId) {
727        debug_assert!(!chat_id.is_unset());
728        debug_assert!(!msg_id.is_unset());
729
730        self.emit_event(EventType::IncomingMsg { chat_id, msg_id });
731        chatlist_events::emit_chatlist_changed(self);
732        chatlist_events::emit_chatlist_item_changed(self, chat_id);
733    }
734
735    /// Emits an LocationChanged event and a WebxdcStatusUpdate in case there is a maps integration
736    pub async fn emit_location_changed(&self, contact_id: Option<ContactId>) -> Result<()> {
737        self.emit_event(EventType::LocationChanged(contact_id));
738
739        if let Some(msg_id) = self
740            .get_config_parsed::<u32>(Config::WebxdcIntegration)
741            .await?
742        {
743            self.emit_event(EventType::WebxdcStatusUpdate {
744                msg_id: MsgId::new(msg_id),
745                status_update_serial: Default::default(),
746            })
747        }
748
749        Ok(())
750    }
751
752    /// Returns a receiver for emitted events.
753    ///
754    /// Multiple emitters can be created, but note that in this case each emitted event will
755    /// only be received by one of the emitters, not by all of them.
756    pub fn get_event_emitter(&self) -> EventEmitter {
757        self.events.get_emitter()
758    }
759
760    /// Get the ID of this context.
761    pub fn get_id(&self) -> u32 {
762        self.id
763    }
764
765    // Ongoing process allocation/free/check
766
767    /// Tries to acquire the global UI "ongoing" mutex.
768    ///
769    /// This is for modal operations during which no other user actions are allowed.  Only
770    /// one such operation is allowed at any given time.
771    ///
772    /// The return value is a cancel token, which will release the ongoing mutex when
773    /// dropped.
774    pub(crate) async fn alloc_ongoing(&self) -> Result<Receiver<()>> {
775        let mut s = self.running_state.write().await;
776        ensure!(
777            matches!(*s, RunningState::Stopped),
778            "There is already another ongoing process running."
779        );
780
781        let (sender, receiver) = channel::bounded(1);
782        *s = RunningState::Running {
783            cancel_sender: sender,
784        };
785
786        Ok(receiver)
787    }
788
789    pub(crate) async fn free_ongoing(&self) {
790        let mut s = self.running_state.write().await;
791        if let RunningState::ShallStop { request } = *s {
792            info!(self, "Ongoing stopped in {:?}", time_elapsed(&request));
793        }
794        *s = RunningState::Stopped;
795    }
796
797    /// Signal an ongoing process to stop.
798    pub async fn stop_ongoing(&self) {
799        let mut s = self.running_state.write().await;
800        match &*s {
801            RunningState::Running { cancel_sender } => {
802                if let Err(err) = cancel_sender.send(()).await {
803                    warn!(self, "could not cancel ongoing: {:#}", err);
804                }
805                info!(self, "Signaling the ongoing process to stop ASAP.",);
806                *s = RunningState::ShallStop {
807                    request: tools::Time::now(),
808                };
809            }
810            RunningState::ShallStop { .. } | RunningState::Stopped => {
811                info!(self, "No ongoing process to stop.",);
812            }
813        }
814    }
815
816    #[allow(unused)]
817    pub(crate) async fn shall_stop_ongoing(&self) -> bool {
818        match &*self.running_state.read().await {
819            RunningState::Running { .. } => false,
820            RunningState::ShallStop { .. } | RunningState::Stopped => true,
821        }
822    }
823
824    /*******************************************************************************
825     * UI chat/message related API
826     ******************************************************************************/
827
828    /// Returns information about the context as key-value pairs.
829    pub async fn get_info(&self) -> Result<BTreeMap<&'static str, String>> {
830        let all_transports: Vec<String> = ConfiguredLoginParam::load_all(self)
831            .await?
832            .into_iter()
833            .map(|(transport_id, param)| format!("{transport_id}: {param}"))
834            .collect();
835        let all_transports = if all_transports.is_empty() {
836            "Not configured".to_string()
837        } else {
838            all_transports.join(",")
839        };
840        let chats = get_chat_cnt(self).await?;
841        let unblocked_msgs = message::get_unblocked_msg_cnt(self).await;
842        let request_msgs = message::get_request_msg_cnt(self).await;
843        let contacts = Contact::get_real_cnt(self).await?;
844        let proxy_enabled = self.get_config_int(Config::ProxyEnabled).await?;
845        let dbversion = self
846            .sql
847            .get_raw_config_int("dbversion")
848            .await?
849            .unwrap_or_default();
850        let journal_mode = self
851            .sql
852            .query_get_value("PRAGMA journal_mode;", ())
853            .await?
854            .unwrap_or_else(|| "unknown".to_string());
855        let mdns_enabled = self.get_config_int(Config::MdnsEnabled).await?;
856        let bcc_self = self.get_config_int(Config::BccSelf).await?;
857        let sync_msgs = self.get_config_int(Config::SyncMsgs).await?;
858        let disable_idle = self.get_config_bool(Config::DisableIdle).await?;
859
860        let prv_key_cnt = self.sql.count("SELECT COUNT(*) FROM keypairs;", ()).await?;
861
862        let pub_key_cnt = self
863            .sql
864            .count("SELECT COUNT(*) FROM public_keys;", ())
865            .await?;
866
867        let mut res = get_info();
868
869        // insert values
870        res.insert("bot", self.get_config_int(Config::Bot).await?.to_string());
871        res.insert("number_of_chats", chats.to_string());
872        res.insert("number_of_chat_messages", unblocked_msgs.to_string());
873        res.insert("messages_in_contact_requests", request_msgs.to_string());
874        res.insert("number_of_contacts", contacts.to_string());
875        res.insert("database_dir", self.get_dbfile().display().to_string());
876        res.insert("database_version", dbversion.to_string());
877        res.insert(
878            "database_encrypted",
879            self.sql
880                .is_encrypted()
881                .await
882                .map_or_else(|| "closed".to_string(), |b| b.to_string()),
883        );
884        res.insert("journal_mode", journal_mode);
885        res.insert("blobdir", self.get_blobdir().display().to_string());
886        res.insert(
887            "selfavatar",
888            self.get_config(Config::Selfavatar)
889                .await?
890                .unwrap_or_else(|| "<unset>".to_string()),
891        );
892        res.insert("proxy_enabled", proxy_enabled.to_string());
893        res.insert("used_transport_settings", all_transports);
894
895        if let Some(server_id) = &*self.server_id.read().await {
896            res.insert("imap_server_id", format!("{server_id:?}"));
897        }
898
899        res.insert("is_chatmail", self.is_chatmail().await?.to_string());
900        res.insert(
901            "fix_is_chatmail",
902            self.get_config_bool(Config::FixIsChatmail)
903                .await?
904                .to_string(),
905        );
906        res.insert(
907            "is_muted",
908            self.get_config_bool(Config::IsMuted).await?.to_string(),
909        );
910        res.insert(
911            "private_tag",
912            self.get_config(Config::PrivateTag)
913                .await?
914                .unwrap_or_else(|| "<unset>".to_string()),
915        );
916
917        if let Some(metadata) = &*self.metadata.read().await {
918            if let Some(comment) = &metadata.comment {
919                res.insert("imap_server_comment", format!("{comment:?}"));
920            }
921
922            if let Some(admin) = &metadata.admin {
923                res.insert("imap_server_admin", format!("{admin:?}"));
924            }
925        }
926
927        res.insert(
928            "who_can_call_me",
929            self.get_config_int(Config::WhoCanCallMe).await?.to_string(),
930        );
931        res.insert(
932            "download_limit",
933            self.get_config_int(Config::DownloadLimit)
934                .await?
935                .to_string(),
936        );
937        res.insert("mdns_enabled", mdns_enabled.to_string());
938        res.insert("bcc_self", bcc_self.to_string());
939        res.insert("sync_msgs", sync_msgs.to_string());
940        res.insert("disable_idle", disable_idle.to_string());
941        res.insert("private_key_count", prv_key_cnt.to_string());
942        res.insert("public_key_count", pub_key_cnt.to_string());
943        res.insert(
944            "media_quality",
945            self.get_config_int(Config::MediaQuality).await?.to_string(),
946        );
947        res.insert(
948            "delete_device_after",
949            self.get_config_int(Config::DeleteDeviceAfter)
950                .await?
951                .to_string(),
952        );
953        res.insert(
954            "last_housekeeping",
955            self.get_config_int(Config::LastHousekeeping)
956                .await?
957                .to_string(),
958        );
959        res.insert(
960            "last_cant_decrypt_outgoing_msgs",
961            self.get_config_int(Config::LastCantDecryptOutgoingMsgs)
962                .await?
963                .to_string(),
964        );
965        res.insert(
966            "debug_logging",
967            self.get_config_int(Config::DebugLogging).await?.to_string(),
968        );
969        res.insert(
970            "last_msg_id",
971            self.get_config_int(Config::LastMsgId).await?.to_string(),
972        );
973        res.insert(
974            "gossip_period",
975            self.get_config_int(Config::GossipPeriod).await?.to_string(),
976        );
977        res.insert(
978            "webxdc_realtime_enabled",
979            self.get_config_bool(Config::WebxdcRealtimeEnabled)
980                .await?
981                .to_string(),
982        );
983        res.insert(
984            "donation_request_next_check",
985            self.get_config_i64(Config::DonationRequestNextCheck)
986                .await?
987                .to_string(),
988        );
989        res.insert(
990            "first_key_contacts_msg_id",
991            self.sql
992                .get_raw_config("first_key_contacts_msg_id")
993                .await?
994                .unwrap_or_default(),
995        );
996        res.insert(
997            "stats_id",
998            self.get_config(Config::StatsId)
999                .await?
1000                .unwrap_or_else(|| "<unset>".to_string()),
1001        );
1002        res.insert(
1003            "stats_sending",
1004            stats::should_send_stats(self).await?.to_string(),
1005        );
1006        res.insert(
1007            "stats_last_sent",
1008            self.get_config_i64(Config::StatsLastSent)
1009                .await?
1010                .to_string(),
1011        );
1012        res.insert(
1013            "test_hooks",
1014            self.sql
1015                .get_raw_config("test_hooks")
1016                .await?
1017                .unwrap_or_default(),
1018        );
1019        res.insert(
1020            "std_header_protection_composing",
1021            self.sql
1022                .get_raw_config("std_header_protection_composing")
1023                .await?
1024                .unwrap_or_default(),
1025        );
1026        res.insert(
1027            "team_profile",
1028            self.get_config_bool(Config::TeamProfile).await?.to_string(),
1029        );
1030        res.insert(
1031            "force_encryption",
1032            self.get_config_bool(Config::ForceEncryption)
1033                .await?
1034                .to_string(),
1035        );
1036
1037        let elapsed = time_elapsed(&self.creation_time);
1038        res.insert("uptime", duration_to_str(elapsed));
1039
1040        Ok(res)
1041    }
1042
1043    /// Get a list of fresh, unmuted messages in unblocked chats.
1044    ///
1045    /// The list starts with the most recent message
1046    /// and is typically used to show notifications.
1047    /// Moreover, the number of returned messages
1048    /// can be used for a badge counter on the app icon.
1049    pub async fn get_fresh_msgs(&self) -> Result<Vec<MsgId>> {
1050        let list = self
1051            .sql
1052            .query_map_vec(
1053                "SELECT m.id
1054FROM msgs m
1055LEFT JOIN contacts ct
1056    ON m.from_id=ct.id
1057LEFT JOIN chats c
1058    ON m.chat_id=c.id
1059WHERE m.state=?
1060AND m.hidden=0
1061AND m.chat_id>9
1062AND ct.blocked=0
1063AND c.blocked=0
1064AND NOT(c.muted_until=-1 OR c.muted_until>?)
1065ORDER BY m.timestamp DESC,m.id DESC",
1066                (MessageState::InFresh, time()),
1067                |row| {
1068                    let msg_id: MsgId = row.get(0)?;
1069                    Ok(msg_id)
1070                },
1071            )
1072            .await?;
1073        Ok(list)
1074    }
1075
1076    /// (deprecated) Returns a list of messages with database ID higher than requested.
1077    ///
1078    /// Blocked contacts and chats are excluded,
1079    /// but self-sent messages and contact requests are included in the results.
1080    ///
1081    /// Deprecated 2026-04: This returns the message's id as soon as the first part arrives,
1082    /// even if it is not fully downloaded yet.
1083    /// The bot needs to wait for the message to be fully downloaded.
1084    /// Since this is usually not the desired behavior,
1085    /// bots should instead use the [`EventType::IncomingMsg`]
1086    /// event for getting notified about new messages.
1087    pub async fn get_next_msgs(&self) -> Result<Vec<MsgId>> {
1088        let last_msg_id = match self.get_config(Config::LastMsgId).await? {
1089            Some(s) => MsgId::new(s.parse()?),
1090            None => {
1091                // If `last_msg_id` is not set yet,
1092                // subtract 1 from the last id,
1093                // so a single message is returned and can
1094                // be marked as seen.
1095                self.sql
1096                    .query_row(
1097                        "SELECT IFNULL((SELECT MAX(id) - 1 FROM msgs), 0)",
1098                        (),
1099                        |row| {
1100                            let msg_id: MsgId = row.get(0)?;
1101                            Ok(msg_id)
1102                        },
1103                    )
1104                    .await?
1105            }
1106        };
1107
1108        let list = self
1109            .sql
1110            .query_map_vec(
1111                "SELECT m.id
1112                     FROM msgs m
1113                     LEFT JOIN contacts ct
1114                            ON m.from_id=ct.id
1115                     LEFT JOIN chats c
1116                            ON m.chat_id=c.id
1117                     WHERE m.id>?
1118                       AND m.hidden=0
1119                       AND m.chat_id>9
1120                       AND ct.blocked=0
1121                       AND c.blocked!=1
1122                     ORDER BY m.id ASC",
1123                (
1124                    last_msg_id.to_u32(), // Explicitly convert to u32 because 0 is allowed.
1125                ),
1126                |row| {
1127                    let msg_id: MsgId = row.get(0)?;
1128                    Ok(msg_id)
1129                },
1130            )
1131            .await?;
1132        Ok(list)
1133    }
1134
1135    /// (deprecated) Returns a list of messages with database ID higher than last marked as seen.
1136    ///
1137    /// This function is supposed to be used by bot to request messages
1138    /// that are not processed yet.
1139    ///
1140    /// Waits for notification and returns a result.
1141    /// Note that the result may be empty if the message is deleted
1142    /// shortly after notification or notification is manually triggered
1143    /// to interrupt waiting.
1144    /// Notification may be manually triggered by calling [`Self::stop_io`].
1145    ///
1146    /// Deprecated 2026-04: This returns the message's id as soon as the first part arrives,
1147    /// even if it is not fully downloaded yet.
1148    /// The bot needs to wait for the message to be fully downloaded.
1149    /// Since this is usually not the desired behavior,
1150    /// bots should instead use the #DC_EVENT_INCOMING_MSG / [`EventType::IncomingMsg`]
1151    /// event for getting notified about new messages.
1152    pub async fn wait_next_msgs(&self) -> Result<Vec<MsgId>> {
1153        self.new_msgs_notify.notified().await;
1154        let list = self.get_next_msgs().await?;
1155        Ok(list)
1156    }
1157
1158    /// Searches for messages containing the query string case-insensitively.
1159    ///
1160    /// If `chat_id` is provided this searches only for messages in this chat, if `chat_id`
1161    /// is `None` this searches messages from all chats.
1162    ///
1163    /// NB: Wrt the search in long messages which are shown truncated with the "Show Full Message…"
1164    /// button, we only look at the first several kilobytes. Let's not fix this -- one can send a
1165    /// dictionary in the message that matches any reasonable search request, but the user won't see
1166    /// the match because they should tap on "Show Full Message…" for that. Probably such messages
1167    /// would only clutter search results.
1168    pub async fn search_msgs(&self, chat_id: Option<ChatId>, query: &str) -> Result<Vec<MsgId>> {
1169        let real_query = query.trim().to_lowercase();
1170        if real_query.is_empty() {
1171            return Ok(Vec::new());
1172        }
1173        let str_like_in_text = format!("%{real_query}%");
1174
1175        let list = if let Some(chat_id) = chat_id {
1176            self.sql
1177                .query_map_vec(
1178                    "SELECT m.id AS id
1179                 FROM msgs m
1180                 LEFT JOIN contacts ct
1181                        ON m.from_id=ct.id
1182                 WHERE m.chat_id=?
1183                   AND m.hidden=0
1184                   AND ct.blocked=0
1185                   AND IFNULL(txt_normalized, txt) LIKE ?
1186                 ORDER BY m.timestamp,m.id;",
1187                    (chat_id, str_like_in_text),
1188                    |row| {
1189                        let msg_id: MsgId = row.get("id")?;
1190                        Ok(msg_id)
1191                    },
1192                )
1193                .await?
1194        } else {
1195            // For performance reasons results are sorted only by `id`, that is in the order of
1196            // message reception.
1197            //
1198            // Unlike chat view, sorting by `timestamp` is not necessary but slows down the query by
1199            // ~25% according to benchmarks.
1200            //
1201            // To speed up incremental search, where queries for few characters usually return lots
1202            // of unwanted results that are discarded moments later, we added `LIMIT 1000`.
1203            // According to some tests, this limit speeds up eg. 2 character searches by factor 10.
1204            // The limit is documented and UI may add a hint when getting 1000 results.
1205            self.sql
1206                .query_map_vec(
1207                    "SELECT m.id AS id
1208                 FROM msgs m
1209                 LEFT JOIN contacts ct
1210                        ON m.from_id=ct.id
1211                 LEFT JOIN chats c
1212                        ON m.chat_id=c.id
1213                 WHERE m.chat_id>9
1214                   AND m.hidden=0
1215                   AND c.blocked!=1
1216                   AND ct.blocked=0
1217                   AND IFNULL(txt_normalized, txt) LIKE ?
1218                 ORDER BY m.id DESC LIMIT 1000",
1219                    (str_like_in_text,),
1220                    |row| {
1221                        let msg_id: MsgId = row.get("id")?;
1222                        Ok(msg_id)
1223                    },
1224                )
1225                .await?
1226        };
1227
1228        Ok(list)
1229    }
1230
1231    pub(crate) fn derive_blobdir(dbfile: &Path) -> PathBuf {
1232        let mut blob_fname = OsString::new();
1233        blob_fname.push(dbfile.file_name().unwrap_or_default());
1234        blob_fname.push("-blobs");
1235        dbfile.with_file_name(blob_fname)
1236    }
1237
1238    pub(crate) fn derive_walfile(dbfile: &Path) -> PathBuf {
1239        let mut wal_fname = OsString::new();
1240        wal_fname.push(dbfile.file_name().unwrap_or_default());
1241        wal_fname.push("-wal");
1242        dbfile.with_file_name(wal_fname)
1243    }
1244}
1245
1246#[cfg(test)]
1247mod context_tests;