Skip to main content

deltachat/
context.rs

1//! Context module.
2
3use std::collections::{BTreeMap, HashMap};
4use std::ffi::OsString;
5use std::ops::Deref;
6use std::path::{Path, PathBuf};
7use std::sync::atomic::AtomicBool;
8use std::sync::{Arc, OnceLock, Weak};
9use std::time::Duration;
10
11use anyhow::{Result, bail, ensure};
12use async_channel::{self as channel, Receiver, Sender};
13use pgp::composed::SignedPublicKey;
14use ratelimit::Ratelimit;
15use tokio::sync::{Mutex, Notify, RwLock};
16
17use crate::chat::{ChatId, get_chat_cnt};
18use crate::config::Config;
19use crate::constants::{self, DC_BACKGROUND_FETCH_QUOTA_CHECK_RATELIMIT, DC_VERSION_STR};
20use crate::contact::{Contact, ContactId};
21use crate::debug_logging::DebugLogging;
22use crate::events::{Event, EventEmitter, EventType, Events};
23use crate::imap::{Imap, ServerMetadata};
24use crate::log::warn;
25use crate::logged_debug_assert;
26use crate::message::{self, MessageState, MsgId};
27use crate::net::tls::{SpkiHashStore, TlsSessionStore};
28use crate::peer_channels::Iroh;
29use crate::push::PushSubscriber;
30use crate::quota::QuotaInfo;
31use crate::scheduler::{ConnectivityStore, SchedulerState};
32use crate::sql::Sql;
33use crate::stock_str::StockStrings;
34use crate::timesmearing::SmearedTimestamp;
35use crate::tools::{self, duration_to_str, time, time_elapsed};
36use crate::transport::ConfiguredLoginParam;
37use crate::{chatlist_events, stats};
38
39pub use crate::scheduler::connectivity::Connectivity;
40
41/// Builder for the [`Context`].
42///
43/// Many arguments to the [`Context`] are kind of optional and only needed to handle
44/// multiple contexts, for which the [account manager](crate::accounts::Accounts) should be
45/// used.  This builder makes creating a new context simpler, especially for the
46/// standalone-context case.
47///
48/// # Examples
49///
50/// Creating a new database:
51///
52/// ```
53/// # let rt = tokio::runtime::Runtime::new().unwrap();
54/// # rt.block_on(async move {
55/// use deltachat::context::ContextBuilder;
56///
57/// let dir = tempfile::tempdir().unwrap();
58/// let context = ContextBuilder::new(dir.path().join("db"))
59///      .open()
60///      .await
61///      .unwrap();
62/// drop(context);
63/// # });
64/// ```
#[derive(Clone, Debug)]
pub struct ContextBuilder {
    /// Path of the database file the context will use.
    dbfile: PathBuf,
    /// Context ID; random unless overridden with [`ContextBuilder::with_id`].
    id: u32,
    /// Event channel handed to the built context.
    events: Events,
    /// Stock string translations handed to the built context.
    stock_strings: StockStrings,
    /// Database passphrase used by [`ContextBuilder::open`];
    /// `None` means the empty passphrase is used.
    password: Option<String>,

    /// Push subscriber handed to the built context;
    /// a default one is used by [`ContextBuilder::build`] if unset.
    push_subscriber: Option<PushSubscriber>,
}
75
76impl ContextBuilder {
77    /// Create the builder using the given database file.
78    ///
79    /// The *dbfile* should be in a dedicated directory and this directory must exist.  The
80    /// [`Context`] will create other files and folders in the same directory as the
81    /// database file used.
82    pub fn new(dbfile: PathBuf) -> Self {
83        ContextBuilder {
84            dbfile,
85            id: rand::random(),
86            events: Events::new(),
87            stock_strings: StockStrings::new(),
88            password: None,
89            push_subscriber: None,
90        }
91    }
92
93    /// Sets the context ID.
94    ///
95    /// This identifier is used e.g. in [`Event`]s to identify which [`Context`] an event
96    /// belongs to.  The only real limit on it is that it should not conflict with any other
97    /// [`Context`]s you currently have open.  So if you handle multiple [`Context`]s you
98    /// may want to use this.
99    ///
100    /// Note that the [account manager](crate::accounts::Accounts) is designed to handle the
101    /// common case for using multiple [`Context`] instances.
102    pub fn with_id(mut self, id: u32) -> Self {
103        self.id = id;
104        self
105    }
106
107    /// Sets the event channel for this [`Context`].
108    ///
109    /// Mostly useful when using multiple [`Context`]s, this allows creating one [`Events`]
110    /// channel and passing it to all [`Context`]s so all events are received on the same
111    /// channel.
112    ///
113    /// Note that the [account manager](crate::accounts::Accounts) is designed to handle the
114    /// common case for using multiple [`Context`] instances.
115    pub fn with_events(mut self, events: Events) -> Self {
116        self.events = events;
117        self
118    }
119
120    /// Sets the [`StockStrings`] map to use for this [`Context`].
121    ///
122    /// This is useful in order to share the same translation strings in all [`Context`]s.
123    /// The mapping may be empty when set, it will be populated by
124    /// [`Context::set_stock_translation`] or [`Accounts::set_stock_translation`] calls.
125    ///
126    /// Note that the [account manager](crate::accounts::Accounts) is designed to handle the
127    /// common case for using multiple [`Context`] instances.
128    ///
129    /// [`Accounts::set_stock_translation`]: crate::accounts::Accounts::set_stock_translation
130    pub fn with_stock_strings(mut self, stock_strings: StockStrings) -> Self {
131        self.stock_strings = stock_strings;
132        self
133    }
134
135    /// Sets the password to unlock the database.
136    /// Deprecated 2025-11:
137    /// - Db encryption does nothing with blobs, so fs/disk encryption is recommended.
138    /// - Isolation from other apps is needed anyway.
139    ///
140    /// If an encrypted database is used it must be opened with a password.  Setting a
141    /// password on a new database will enable encryption.
142    #[deprecated(since = "TBD")]
143    pub fn with_password(mut self, password: String) -> Self {
144        self.password = Some(password);
145        self
146    }
147
148    /// Sets push subscriber.
149    pub(crate) fn with_push_subscriber(mut self, push_subscriber: PushSubscriber) -> Self {
150        self.push_subscriber = Some(push_subscriber);
151        self
152    }
153
154    /// Builds the [`Context`] without opening it.
155    pub async fn build(self) -> Result<Context> {
156        let push_subscriber = self.push_subscriber.unwrap_or_default();
157        let context = Context::new_closed(
158            &self.dbfile,
159            self.id,
160            self.events,
161            self.stock_strings,
162            push_subscriber,
163        )
164        .await?;
165        Ok(context)
166    }
167
168    /// Builds the [`Context`] and opens it.
169    ///
170    /// Returns error if context cannot be opened.
171    pub async fn open(self) -> Result<Context> {
172        let password = self.password.clone().unwrap_or_default();
173        let context = self.build().await?;
174        match context.open(password).await? {
175            true => Ok(context),
176            false => bail!("database could not be decrypted, incorrect or missing password"),
177        }
178    }
179}
180
/// The context for a single DeltaChat account.
///
/// This contains all the state for a single DeltaChat account, including background tasks
/// running in Tokio to operate the account.  The [`Context`] can be cheaply cloned.
///
/// Each context, and thus each account, must be associated with a directory where all the
/// state is kept.  This state is also preserved between restarts.
///
/// To use multiple accounts it is best to look at the [accounts
/// manager][crate::accounts::Accounts] which handles storing multiple accounts in a single
/// directory structure and handles loading them all concurrently.
#[derive(Clone, Debug)]
pub struct Context {
    /// Shared state; cloning a [`Context`] only clones this reference-counted pointer.
    pub(crate) inner: Arc<InnerContext>,
}
196
197impl Deref for Context {
198    type Target = InnerContext;
199
200    fn deref(&self) -> &Self::Target {
201        &self.inner
202    }
203}
204
/// A weak reference to a [`Context`].
///
/// Can be used to obtain a [`Context`] via [`WeakContext::upgrade`].
/// An existing weak reference does not prevent the corresponding [`Context`]
/// from being dropped.
#[derive(Clone, Debug)]
pub(crate) struct WeakContext {
    /// Weak pointer to the state shared by all clones of the [`Context`].
    inner: Weak<InnerContext>,
}
212
213impl WeakContext {
214    /// Returns the [`Context`] if it is still available.
215    pub(crate) fn upgrade(&self) -> Result<Context> {
216        let inner = self
217            .inner
218            .upgrade()
219            .ok_or_else(|| anyhow::anyhow!("Inner struct has been dropped"))?;
220        Ok(Context { inner })
221    }
222}
223
/// Actual context, expensive to clone.
///
/// Shared between all clones of a [`Context`] behind an `Arc`.
#[derive(Debug)]
pub struct InnerContext {
    /// Blob directory path
    pub(crate) blobdir: PathBuf,
    /// SQL database handle, created from the database file path.
    pub(crate) sql: Sql,
    /// Smeared timestamp state.
    /// NOTE(review): semantics are defined in `crate::timesmearing`, not visible here.
    pub(crate) smeared_timestamp: SmearedTimestamp,
    /// The global "ongoing" process state.
    ///
    /// This is a global mutex-like state for operations which should be modal in the
    /// clients.
    running_state: RwLock<RunningState>,
    /// Mutex to enforce that only a single oauth2 process is running.
    pub(crate) oauth2_mutex: Mutex<()>,
    /// Mutex to prevent a race condition when a "your pw is wrong" warning is sent,
    /// resulting in multiple messages being sent.
    pub(crate) wrong_pw_warning_mutex: Mutex<()>,
    /// Mutex to prevent running housekeeping from multiple threads at once.
    pub(crate) housekeeping_mutex: Mutex<()>,

    /// Mutex to prevent multiple IMAP loops from fetching the messages at once.
    ///
    /// Without this mutex IMAP loops may waste traffic downloading the same message
    /// from multiple IMAP servers and create multiple copies of the same message
    /// in the database if the check for duplicates and creating a message
    /// happens in separate database transactions.
    pub(crate) fetch_msgs_mutex: Mutex<()>,

    /// Stock string translations, possibly shared with other contexts.
    pub(crate) translated_stockstrings: StockStrings,
    /// Event channel on which this context emits its events.
    pub(crate) events: Events,

    /// State of the IO scheduler, see [`Context::start_io`] and [`Context::stop_io`].
    pub(crate) scheduler: SchedulerState,
    /// Rate limiter, initialised to allow at least 1 message every second
    /// plus a burst of 3.
    pub(crate) ratelimit: RwLock<Ratelimit>,

    /// Recently loaded quota information for each transport, if any.
    /// If quota was never tried to load, then the transport doesn't have an entry in the BTreeMap.
    pub(crate) quota: RwLock<BTreeMap<u32, QuotaInfo>>,

    /// Notify about new messages.
    ///
    /// This causes [`Context::wait_next_msgs`] to wake up.
    pub(crate) new_msgs_notify: Notify,

    /// Server ID response if ID capability is supported
    /// and the server returned non-NIL on the inbox connection.
    /// <https://datatracker.ietf.org/doc/html/rfc2971>
    pub(crate) server_id: RwLock<Option<HashMap<String, String>>>,

    /// IMAP METADATA.
    pub(crate) metadata: RwLock<Option<ServerMetadata>>,

    /// ID for this `Context` in the current process.
    ///
    /// This allows for multiple `Context`s open in a single process where each context can
    /// be identified by this ID.
    pub(crate) id: u32,

    /// Time at which this context object was created.
    creation_time: tools::Time,

    /// The text of the last error logged and emitted as an event.
    /// If the ui wants to display an error after a failure,
    /// `last_error` should be used to avoid races with the event thread.
    pub(crate) last_error: parking_lot::RwLock<String>,

    /// It's not possible to emit migration errors as an event,
    /// because at the time of the migration, there is no event emitter yet.
    /// So, this holds the error that happened during migration, if any.
    /// This is necessary for the possibly-fallible PGP migration,
    /// which happened 2025-05, and can be removed a few releases later.
    pub(crate) migration_error: parking_lot::RwLock<Option<String>>,

    /// If debug logging is enabled, this contains all necessary information
    ///
    /// Standard RwLock instead of [`tokio::sync::RwLock`] is used
    /// because the lock is used from synchronous [`Context::emit_event`].
    pub(crate) debug_logging: std::sync::RwLock<Option<DebugLogging>>,

    /// Push subscriber to store device token
    /// and register for heartbeat notifications.
    pub(crate) push_subscriber: PushSubscriber,

    /// True if account has subscribed to push notifications via IMAP.
    pub(crate) push_subscribed: AtomicBool,

    /// TLS session resumption cache.
    pub(crate) tls_session_store: TlsSessionStore,

    /// Store for TLS SPKI hashes.
    ///
    /// Used to remember public keys
    /// of TLS certificates to accept them
    /// even after they expire.
    pub(crate) spki_hash_store: SpkiHashStore,

    /// Iroh for realtime peer channels.
    pub(crate) iroh: Arc<RwLock<Option<Iroh>>>,

    /// The own fingerprint, if it was computed already.
    /// tokio::sync::OnceCell would be possible to use, but overkill for our usecase;
    /// the standard library's OnceLock is enough, and it's a lot smaller in memory.
    pub(crate) self_fingerprint: OnceLock<String>,

    /// OpenPGP certificate aka Transferable Public Key.
    ///
    /// It is generated on first use from the secret key stored in the database.
    ///
    /// Mutex is also held while generating the key to avoid generating the key twice.
    pub(crate) self_public_key: Mutex<Option<SignedPublicKey>>,

    /// `Connectivity` values for mailboxes, unordered. Used to compute the aggregate connectivity,
    /// see [`Context::get_connectivity()`].
    pub(crate) connectivities: parking_lot::Mutex<Vec<ConnectivityStore>>,
}
336
/// The state of ongoing process.
///
/// Defaults to [`RunningState::Stopped`].
#[derive(Debug, Default)]
enum RunningState {
    /// Ongoing process is allocated.
    ///
    /// `cancel_sender` is the channel on which the cancel signal is sent
    /// when the ongoing process is asked to stop.
    Running { cancel_sender: Sender<()> },

    /// Cancel signal has been sent, waiting for ongoing process to be freed.
    ///
    /// `request` is the time at which stopping was requested.
    ShallStop { request: tools::Time },

    /// There is no ongoing process, a new one can be allocated.
    #[default]
    Stopped,
}
350
351/// Return some info about deltachat-core
352///
353/// This contains information mostly about the library itself, the
354/// actual keys and their values which will be present are not
355/// guaranteed.  Calling [Context::get_info] also includes information
356/// about the context on top of the information here.
357#[expect(clippy::arithmetic_side_effects)]
358pub fn get_info() -> BTreeMap<&'static str, String> {
359    let mut res = BTreeMap::new();
360
361    #[cfg(debug_assertions)]
362    res.insert(
363        "debug_assertions",
364        "On - DO NOT RELEASE THIS BUILD".to_string(),
365    );
366    #[cfg(not(debug_assertions))]
367    res.insert("debug_assertions", "Off".to_string());
368
369    res.insert("deltachat_core_version", format!("v{DC_VERSION_STR}"));
370    res.insert("sqlite_version", rusqlite::version().to_string());
371    res.insert("arch", (std::mem::size_of::<usize>() * 8).to_string());
372    res.insert("num_cpus", num_cpus::get().to_string());
373    res.insert("level", "awesome".into());
374    res
375}
376
377impl Context {
378    /// Creates new context and opens the database.
379    pub async fn new(
380        dbfile: &Path,
381        id: u32,
382        events: Events,
383        stock_strings: StockStrings,
384    ) -> Result<Context> {
385        let context =
386            Self::new_closed(dbfile, id, events, stock_strings, Default::default()).await?;
387
388        // Open the database if is not encrypted.
389        if context.check_passphrase("".to_string()).await? {
390            context.sql.open(&context, "".to_string()).await?;
391        }
392        Ok(context)
393    }
394
395    /// Creates new context without opening the database.
396    pub async fn new_closed(
397        dbfile: &Path,
398        id: u32,
399        events: Events,
400        stockstrings: StockStrings,
401        push_subscriber: PushSubscriber,
402    ) -> Result<Context> {
403        let mut blob_fname = OsString::new();
404        blob_fname.push(dbfile.file_name().unwrap_or_default());
405        blob_fname.push("-blobs");
406        let blobdir = dbfile.with_file_name(blob_fname);
407        if !blobdir.exists() {
408            tokio::fs::create_dir_all(&blobdir).await?;
409        }
410        let context = Context::with_blobdir(
411            dbfile.into(),
412            blobdir,
413            id,
414            events,
415            stockstrings,
416            push_subscriber,
417        )?;
418        Ok(context)
419    }
420
421    /// Returns a weak reference to this [`Context`].
422    pub(crate) fn get_weak_context(&self) -> WeakContext {
423        WeakContext {
424            inner: Arc::downgrade(&self.inner),
425        }
426    }
427
428    /// Opens the database with the given passphrase.
429    /// NB: Db encryption is deprecated, so `passphrase` should be empty normally. See
430    /// [`ContextBuilder::with_password()`] for reasoning.
431    ///
432    /// Returns true if passphrase is correct, false is passphrase is not correct. Fails on other
433    /// errors.
434    #[deprecated(since = "TBD")]
435    pub async fn open(&self, passphrase: String) -> Result<bool> {
436        if self.sql.check_passphrase(passphrase.clone()).await? {
437            self.sql.open(self, passphrase).await?;
438            Ok(true)
439        } else {
440            Ok(false)
441        }
442    }
443
    /// Changes encrypted database passphrase.
    /// Deprecated 2025-11, see [`ContextBuilder::with_password()`] for reasoning.
    ///
    /// Delegates to the SQL layer and propagates its error, if any.
    pub async fn change_passphrase(&self, passphrase: String) -> Result<()> {
        self.sql.change_passphrase(passphrase).await?;
        Ok(())
    }
450
    /// Returns true if database is open.
    ///
    /// Delegates to the SQL layer.
    pub async fn is_open(&self) -> bool {
        self.sql.is_open().await
    }
455
    /// Tests the database passphrase.
    ///
    /// Returns true if passphrase is correct.
    ///
    /// Fails if database is already open.
    ///
    /// Delegates to the SQL layer; the database is not opened by this call.
    pub(crate) async fn check_passphrase(&self, passphrase: String) -> Result<bool> {
        self.sql.check_passphrase(passphrase).await
    }
464
465    pub(crate) fn with_blobdir(
466        dbfile: PathBuf,
467        blobdir: PathBuf,
468        id: u32,
469        events: Events,
470        stockstrings: StockStrings,
471        push_subscriber: PushSubscriber,
472    ) -> Result<Context> {
473        ensure!(
474            blobdir.is_dir(),
475            "Blobdir does not exist: {}",
476            blobdir.display()
477        );
478
479        let new_msgs_notify = Notify::new();
480        // Notify once immediately to allow processing old messages
481        // without starting I/O.
482        new_msgs_notify.notify_one();
483
484        let inner = InnerContext {
485            id,
486            blobdir,
487            running_state: RwLock::new(Default::default()),
488            sql: Sql::new(dbfile),
489            smeared_timestamp: SmearedTimestamp::new(),
490            oauth2_mutex: Mutex::new(()),
491            wrong_pw_warning_mutex: Mutex::new(()),
492            housekeeping_mutex: Mutex::new(()),
493            fetch_msgs_mutex: Mutex::new(()),
494            translated_stockstrings: stockstrings,
495            events,
496            scheduler: SchedulerState::new(),
497            ratelimit: RwLock::new(Ratelimit::new(Duration::new(3, 0), 3.0)), // Allow at least 1 message every second + a burst of 3.
498            quota: RwLock::new(BTreeMap::new()),
499            new_msgs_notify,
500            server_id: RwLock::new(None),
501            metadata: RwLock::new(None),
502            creation_time: tools::Time::now(),
503            last_error: parking_lot::RwLock::new("".to_string()),
504            migration_error: parking_lot::RwLock::new(None),
505            debug_logging: std::sync::RwLock::new(None),
506            push_subscriber,
507            push_subscribed: AtomicBool::new(false),
508            tls_session_store: TlsSessionStore::new(),
509            spki_hash_store: SpkiHashStore::new(),
510            iroh: Arc::new(RwLock::new(None)),
511            self_fingerprint: OnceLock::new(),
512            self_public_key: Mutex::new(None),
513            connectivities: parking_lot::Mutex::new(Vec::new()),
514        };
515
516        let ctx = Context {
517            inner: Arc::new(inner),
518        };
519
520        Ok(ctx)
521    }
522
523    /// Starts the IO scheduler.
524    pub async fn start_io(&self) {
525        if !self.is_configured().await.unwrap_or_default() {
526            warn!(self, "can not start io on a context that is not configured");
527            return;
528        }
529
530        // The next line is mainly for iOS:
531        // iOS starts a separate process for receiving notifications and if the user concurrently
532        // starts the app, the UI process opens the database but waits with calling start_io()
533        // until the notifications process finishes.
534        // Now, some configs may have changed, so, we need to invalidate the cache.
535        self.sql.config_cache.write().await.clear();
536
537        self.scheduler.start(self).await;
538    }
539
    /// Stops the IO scheduler.
    ///
    /// Also takes the Iroh instance out of the context, if there is one,
    /// and closes it in a detached background task.
    pub async fn stop_io(&self) {
        self.scheduler.stop(self).await;
        if let Some(iroh) = self.iroh.write().await.take() {
            // Close all QUIC connections.

            // Spawn into a separate task,
            // because Iroh calls `wait_idle()` internally
            // and it may take time, especially if the network
            // has become unavailable.
            tokio::spawn(async move {
                // We do not log the error because we do not want the task
                // to hold the reference to Context.
                // Closing is given at most 60 seconds; the outcome is ignored.
                let _ = tokio::time::timeout(Duration::from_secs(60), iroh.close()).await;
            });
        }
    }
557
    /// Restarts the IO scheduler if it was running before.
    ///
    /// When it is not running this is a no-op.
    pub async fn restart_io_if_running(&self) {
        self.scheduler.restart(self).await;
    }
563
564    /// Indicate that the network likely has come back.
565    pub async fn maybe_network(&self) {
566        if let Some(ref iroh) = *self.iroh.read().await {
567            iroh.network_change().await;
568        }
569        self.scheduler.maybe_network().await;
570    }
571
    /// Returns true if an account is on a chatmail server.
    ///
    /// Reads the [`Config::IsChatmail`] setting.
    pub async fn is_chatmail(&self) -> Result<bool> {
        self.get_config_bool(Config::IsChatmail).await
    }
576
577    /// Returns maximum number of recipients the provider allows to send a single email to.
578    pub(crate) async fn get_max_smtp_rcpt_to(&self) -> Result<usize> {
579        let is_chatmail = self.is_chatmail().await?;
580        let val = self
581            .get_configured_provider()
582            .await?
583            .and_then(|provider| provider.opt.max_smtp_rcpt_to)
584            .map_or_else(
585                || match is_chatmail {
586                    true => constants::DEFAULT_CHATMAIL_MAX_SMTP_RCPT_TO,
587                    false => constants::DEFAULT_MAX_SMTP_RCPT_TO,
588                },
589                usize::from,
590            );
591        Ok(val)
592    }
593
    /// Does a single round of fetching from IMAP and returns.
    ///
    /// Can be used even if I/O is currently stopped.
    /// If I/O is currently stopped, starts a new IMAP connection
    /// and fetches from the folder of that connection.
    /// NOTE(review): earlier documentation said "Inbox and DeltaChat folders";
    /// the code here only fetches `connection.folder` — confirm.
    ///
    /// If I/O is running, the scheduler is poked instead and this waits
    /// until all work is done.
    ///
    /// Does nothing if the context is not configured.
    pub async fn background_fetch(&self) -> Result<()> {
        if !(self.is_configured().await?) {
            return Ok(());
        }

        let address = self.get_primary_self_addr().await?;
        let time_start = tools::Time::now();
        info!(self, "background_fetch started fetching {address}.");

        if self.scheduler.is_running().await {
            self.scheduler.maybe_network().await;
            self.wait_for_all_work_done().await;
        } else {
            // Pause the scheduler to ensure another connection does not start
            // while we are fetching on a dedicated connection.
            // The guard is held until the end of this branch.
            let _pause_guard = self.scheduler.pause(self).await?;

            // Start a new dedicated connection.
            // Only the receiving half of a fresh channel is passed in,
            // the sending half is dropped right away.
            let mut connection = Imap::new_configured(self, channel::bounded(1).1).await?;
            let mut session = connection.prepare(self).await?;

            // Fetch IMAP folders.
            let folder = connection.folder.clone();
            connection
                .fetch_move_delete(self, &mut session, &folder)
                .await?;

            // Update quota (to send warning if full) - but only check it once in a while.
            // note: For now this only checks quota of primary transport,
            // because background check only checks primary transport at the moment
            // A failed quota update is only logged, it does not fail the fetch.
            if self
                .quota_needs_update(
                    session.transport_id(),
                    DC_BACKGROUND_FETCH_QUOTA_CHECK_RATELIMIT,
                )
                .await
                && let Err(err) = self.update_recent_quota(&mut session, &folder).await
            {
                warn!(self, "Failed to update quota: {err:#}.");
            }
        }

        info!(
            self,
            "background_fetch done for {address} took {:?}.",
            time_elapsed(&time_start),
        );

        Ok(())
    }
649
650    /// Returns a reference to the underlying SQL instance.
651    ///
652    /// Warning: this is only here for testing, not part of the public API.
653    #[cfg(feature = "internals")]
654    pub fn sql(&self) -> &Sql {
655        &self.inner.sql
656    }
657
    /// Returns database file path.
    ///
    /// This is the path stored in the SQL instance of this context.
    pub fn get_dbfile(&self) -> &Path {
        self.sql.dbfile.as_path()
    }
662
663    /// Returns blob directory path.
664    pub fn get_blobdir(&self) -> &Path {
665        self.blobdir.as_path()
666    }
667
668    /// Emits a single event.
669    pub fn emit_event(&self, event: EventType) {
670        {
671            let lock = self.debug_logging.read().expect("RwLock is poisoned");
672            if let Some(debug_logging) = &*lock {
673                debug_logging.log_event(event.clone());
674            }
675        }
676        self.events.emit(Event {
677            id: self.id,
678            typ: event,
679        });
680    }
681
682    /// Emits a generic MsgsChanged event (without chat or message id)
683    pub fn emit_msgs_changed_without_ids(&self) {
684        self.emit_event(EventType::MsgsChanged {
685            chat_id: ChatId::new(0),
686            msg_id: MsgId::new(0),
687        });
688    }
689
    /// Emits a MsgsChanged event with specified chat and message ids
    ///
    /// If IDs are unset, [`Self::emit_msgs_changed_without_ids`]
    /// or [`Self::emit_msgs_changed_without_msg_id`] should be used
    /// instead of this function.
    ///
    /// Also emits the chatlist-changed event and the chatlist-item-changed event
    /// for `chat_id`.
    pub fn emit_msgs_changed(&self, chat_id: ChatId, msg_id: MsgId) {
        logged_debug_assert!(
            self,
            !chat_id.is_unset(),
            "emit_msgs_changed: chat_id is unset."
        );
        logged_debug_assert!(
            self,
            !msg_id.is_unset(),
            "emit_msgs_changed: msg_id is unset."
        );

        self.emit_event(EventType::MsgsChanged { chat_id, msg_id });
        chatlist_events::emit_chatlist_changed(self);
        chatlist_events::emit_chatlist_item_changed(self, chat_id);
    }
711
712    /// Emits a MsgsChanged event with specified chat and without message id.
713    pub fn emit_msgs_changed_without_msg_id(&self, chat_id: ChatId) {
714        logged_debug_assert!(
715            self,
716            !chat_id.is_unset(),
717            "emit_msgs_changed_without_msg_id: chat_id is unset."
718        );
719
720        self.emit_event(EventType::MsgsChanged {
721            chat_id,
722            msg_id: MsgId::new(0),
723        });
724        chatlist_events::emit_chatlist_changed(self);
725        chatlist_events::emit_chatlist_item_changed(self, chat_id);
726    }
727
728    /// Emits an IncomingMsg event with specified chat and message ids
729    pub fn emit_incoming_msg(&self, chat_id: ChatId, msg_id: MsgId) {
730        debug_assert!(!chat_id.is_unset());
731        debug_assert!(!msg_id.is_unset());
732
733        self.emit_event(EventType::IncomingMsg { chat_id, msg_id });
734        chatlist_events::emit_chatlist_changed(self);
735        chatlist_events::emit_chatlist_item_changed(self, chat_id);
736    }
737
738    /// Emits an LocationChanged event and a WebxdcStatusUpdate in case there is a maps integration
739    pub async fn emit_location_changed(&self, contact_id: Option<ContactId>) -> Result<()> {
740        self.emit_event(EventType::LocationChanged(contact_id));
741
742        if let Some(msg_id) = self
743            .get_config_parsed::<u32>(Config::WebxdcIntegration)
744            .await?
745        {
746            self.emit_event(EventType::WebxdcStatusUpdate {
747                msg_id: MsgId::new(msg_id),
748                status_update_serial: Default::default(),
749            })
750        }
751
752        Ok(())
753    }
754
    /// Returns a receiver for emitted events.
    ///
    /// Multiple emitters can be created, but note that in this case each emitted event will
    /// only be received by one of the emitters, not by all of them.
    ///
    /// The emitter is obtained from the event channel of this context.
    pub fn get_event_emitter(&self) -> EventEmitter {
        self.events.get_emitter()
    }
762
    /// Get the ID of this context.
    ///
    /// This is the ID that is attached to every event emitted by this context.
    pub fn get_id(&self) -> u32 {
        self.id
    }
767
768    // Ongoing process allocation/free/check
769
770    /// Tries to acquire the global UI "ongoing" mutex.
771    ///
772    /// This is for modal operations during which no other user actions are allowed.  Only
773    /// one such operation is allowed at any given time.
774    ///
775    /// The return value is a cancel token, which will release the ongoing mutex when
776    /// dropped.
777    pub(crate) async fn alloc_ongoing(&self) -> Result<Receiver<()>> {
778        let mut s = self.running_state.write().await;
779        ensure!(
780            matches!(*s, RunningState::Stopped),
781            "There is already another ongoing process running."
782        );
783
784        let (sender, receiver) = channel::bounded(1);
785        *s = RunningState::Running {
786            cancel_sender: sender,
787        };
788
789        Ok(receiver)
790    }
791
792    pub(crate) async fn free_ongoing(&self) {
793        let mut s = self.running_state.write().await;
794        if let RunningState::ShallStop { request } = *s {
795            info!(self, "Ongoing stopped in {:?}", time_elapsed(&request));
796        }
797        *s = RunningState::Stopped;
798    }
799
800    /// Signal an ongoing process to stop.
801    pub async fn stop_ongoing(&self) {
802        let mut s = self.running_state.write().await;
803        match &*s {
804            RunningState::Running { cancel_sender } => {
805                if let Err(err) = cancel_sender.send(()).await {
806                    warn!(self, "could not cancel ongoing: {:#}", err);
807                }
808                info!(self, "Signaling the ongoing process to stop ASAP.",);
809                *s = RunningState::ShallStop {
810                    request: tools::Time::now(),
811                };
812            }
813            RunningState::ShallStop { .. } | RunningState::Stopped => {
814                info!(self, "No ongoing process to stop.",);
815            }
816        }
817    }
818
819    #[allow(unused)]
820    pub(crate) async fn shall_stop_ongoing(&self) -> bool {
821        match &*self.running_state.read().await {
822            RunningState::Running { .. } => false,
823            RunningState::ShallStop { .. } | RunningState::Stopped => true,
824        }
825    }
826
827    /*******************************************************************************
828     * UI chat/message related API
829     ******************************************************************************/
830
831    /// Returns information about the context as key-value pairs.
832    pub async fn get_info(&self) -> Result<BTreeMap<&'static str, String>> {
833        let all_transports: Vec<String> = ConfiguredLoginParam::load_all(self)
834            .await?
835            .into_iter()
836            .map(|(transport_id, param)| format!("{transport_id}: {param}"))
837            .collect();
838        let all_transports = if all_transports.is_empty() {
839            "Not configured".to_string()
840        } else {
841            all_transports.join(",")
842        };
843        let chats = get_chat_cnt(self).await?;
844        let unblocked_msgs = message::get_unblocked_msg_cnt(self).await;
845        let request_msgs = message::get_request_msg_cnt(self).await;
846        let contacts = Contact::get_real_cnt(self).await?;
847        let proxy_enabled = self.get_config_int(Config::ProxyEnabled).await?;
848        let dbversion = self
849            .sql
850            .get_raw_config_int("dbversion")
851            .await?
852            .unwrap_or_default();
853        let journal_mode = self
854            .sql
855            .query_get_value("PRAGMA journal_mode;", ())
856            .await?
857            .unwrap_or_else(|| "unknown".to_string());
858        let mdns_enabled = self.get_config_int(Config::MdnsEnabled).await?;
859        let bcc_self = self.get_config_int(Config::BccSelf).await?;
860        let sync_msgs = self.get_config_int(Config::SyncMsgs).await?;
861        let disable_idle = self.get_config_bool(Config::DisableIdle).await?;
862
863        let prv_key_cnt = self.sql.count("SELECT COUNT(*) FROM keypairs;", ()).await?;
864
865        let pub_key_cnt = self
866            .sql
867            .count("SELECT COUNT(*) FROM public_keys;", ())
868            .await?;
869
870        let mut res = get_info();
871
872        // insert values
873        res.insert("bot", self.get_config_int(Config::Bot).await?.to_string());
874        res.insert("number_of_chats", chats.to_string());
875        res.insert("number_of_chat_messages", unblocked_msgs.to_string());
876        res.insert("messages_in_contact_requests", request_msgs.to_string());
877        res.insert("number_of_contacts", contacts.to_string());
878        res.insert("database_dir", self.get_dbfile().display().to_string());
879        res.insert("database_version", dbversion.to_string());
880        res.insert(
881            "database_encrypted",
882            self.sql
883                .is_encrypted()
884                .await
885                .map_or_else(|| "closed".to_string(), |b| b.to_string()),
886        );
887        res.insert("journal_mode", journal_mode);
888        res.insert("blobdir", self.get_blobdir().display().to_string());
889        res.insert(
890            "selfavatar",
891            self.get_config(Config::Selfavatar)
892                .await?
893                .unwrap_or_else(|| "<unset>".to_string()),
894        );
895        res.insert("proxy_enabled", proxy_enabled.to_string());
896        res.insert("used_transport_settings", all_transports);
897
898        if let Some(server_id) = &*self.server_id.read().await {
899            res.insert("imap_server_id", format!("{server_id:?}"));
900        }
901
902        res.insert("is_chatmail", self.is_chatmail().await?.to_string());
903        res.insert(
904            "fix_is_chatmail",
905            self.get_config_bool(Config::FixIsChatmail)
906                .await?
907                .to_string(),
908        );
909        res.insert(
910            "is_muted",
911            self.get_config_bool(Config::IsMuted).await?.to_string(),
912        );
913        res.insert(
914            "private_tag",
915            self.get_config(Config::PrivateTag)
916                .await?
917                .unwrap_or_else(|| "<unset>".to_string()),
918        );
919
920        if let Some(metadata) = &*self.metadata.read().await {
921            if let Some(comment) = &metadata.comment {
922                res.insert("imap_server_comment", format!("{comment:?}"));
923            }
924
925            if let Some(admin) = &metadata.admin {
926                res.insert("imap_server_admin", format!("{admin:?}"));
927            }
928        }
929
930        res.insert(
931            "who_can_call_me",
932            self.get_config_int(Config::WhoCanCallMe).await?.to_string(),
933        );
934        res.insert(
935            "download_limit",
936            self.get_config_int(Config::DownloadLimit)
937                .await?
938                .to_string(),
939        );
940        res.insert("mdns_enabled", mdns_enabled.to_string());
941        res.insert("bcc_self", bcc_self.to_string());
942        res.insert("sync_msgs", sync_msgs.to_string());
943        res.insert("disable_idle", disable_idle.to_string());
944        res.insert("private_key_count", prv_key_cnt.to_string());
945        res.insert("public_key_count", pub_key_cnt.to_string());
946        res.insert(
947            "media_quality",
948            self.get_config_int(Config::MediaQuality).await?.to_string(),
949        );
950        res.insert(
951            "delete_device_after",
952            self.get_config_int(Config::DeleteDeviceAfter)
953                .await?
954                .to_string(),
955        );
956        res.insert(
957            "last_housekeeping",
958            self.get_config_int(Config::LastHousekeeping)
959                .await?
960                .to_string(),
961        );
962        res.insert(
963            "last_cant_decrypt_outgoing_msgs",
964            self.get_config_int(Config::LastCantDecryptOutgoingMsgs)
965                .await?
966                .to_string(),
967        );
968        res.insert(
969            "debug_logging",
970            self.get_config_int(Config::DebugLogging).await?.to_string(),
971        );
972        res.insert(
973            "last_msg_id",
974            self.get_config_int(Config::LastMsgId).await?.to_string(),
975        );
976        res.insert(
977            "gossip_period",
978            self.get_config_int(Config::GossipPeriod).await?.to_string(),
979        );
980        res.insert(
981            "webxdc_realtime_enabled",
982            self.get_config_bool(Config::WebxdcRealtimeEnabled)
983                .await?
984                .to_string(),
985        );
986        res.insert(
987            "donation_request_next_check",
988            self.get_config_i64(Config::DonationRequestNextCheck)
989                .await?
990                .to_string(),
991        );
992        res.insert(
993            "first_key_contacts_msg_id",
994            self.sql
995                .get_raw_config("first_key_contacts_msg_id")
996                .await?
997                .unwrap_or_default(),
998        );
999        res.insert(
1000            "stats_id",
1001            self.get_config(Config::StatsId)
1002                .await?
1003                .unwrap_or_else(|| "<unset>".to_string()),
1004        );
1005        res.insert(
1006            "stats_sending",
1007            stats::should_send_stats(self).await?.to_string(),
1008        );
1009        res.insert(
1010            "stats_last_sent",
1011            self.get_config_i64(Config::StatsLastSent)
1012                .await?
1013                .to_string(),
1014        );
1015        res.insert(
1016            "test_hooks",
1017            self.sql
1018                .get_raw_config("test_hooks")
1019                .await?
1020                .unwrap_or_default(),
1021        );
1022        res.insert(
1023            "std_header_protection_composing",
1024            self.sql
1025                .get_raw_config("std_header_protection_composing")
1026                .await?
1027                .unwrap_or_default(),
1028        );
1029        res.insert(
1030            "team_profile",
1031            self.get_config_bool(Config::TeamProfile).await?.to_string(),
1032        );
1033        res.insert(
1034            "force_encryption",
1035            self.get_config_bool(Config::ForceEncryption)
1036                .await?
1037                .to_string(),
1038        );
1039
1040        let elapsed = time_elapsed(&self.creation_time);
1041        res.insert("uptime", duration_to_str(elapsed));
1042
1043        Ok(res)
1044    }
1045
1046    /// Get a list of fresh, unmuted messages in unblocked chats.
1047    ///
1048    /// The list starts with the most recent message
1049    /// and is typically used to show notifications.
1050    /// Moreover, the number of returned messages
1051    /// can be used for a badge counter on the app icon.
1052    pub async fn get_fresh_msgs(&self) -> Result<Vec<MsgId>> {
1053        let list = self
1054            .sql
1055            .query_map_vec(
1056                "SELECT m.id
1057FROM msgs m
1058LEFT JOIN contacts ct
1059    ON m.from_id=ct.id
1060LEFT JOIN chats c
1061    ON m.chat_id=c.id
1062WHERE m.state=?
1063AND m.hidden=0
1064AND m.chat_id>9
1065AND ct.blocked=0
1066AND c.blocked=0
1067AND NOT(c.muted_until=-1 OR c.muted_until>?)
1068ORDER BY m.timestamp DESC,m.id DESC",
1069                (MessageState::InFresh, time()),
1070                |row| {
1071                    let msg_id: MsgId = row.get(0)?;
1072                    Ok(msg_id)
1073                },
1074            )
1075            .await?;
1076        Ok(list)
1077    }
1078
1079    /// (deprecated) Returns a list of messages with database ID higher than requested.
1080    ///
1081    /// Blocked contacts and chats are excluded,
1082    /// but self-sent messages and contact requests are included in the results.
1083    ///
1084    /// Deprecated 2026-04: This returns the message's id as soon as the first part arrives,
1085    /// even if it is not fully downloaded yet.
1086    /// The bot needs to wait for the message to be fully downloaded.
1087    /// Since this is usually not the desired behavior,
1088    /// bots should instead use the [`EventType::IncomingMsg`]
1089    /// event for getting notified about new messages.
1090    pub async fn get_next_msgs(&self) -> Result<Vec<MsgId>> {
1091        let last_msg_id = match self.get_config(Config::LastMsgId).await? {
1092            Some(s) => MsgId::new(s.parse()?),
1093            None => {
1094                // If `last_msg_id` is not set yet,
1095                // subtract 1 from the last id,
1096                // so a single message is returned and can
1097                // be marked as seen.
1098                self.sql
1099                    .query_row(
1100                        "SELECT IFNULL((SELECT MAX(id) - 1 FROM msgs), 0)",
1101                        (),
1102                        |row| {
1103                            let msg_id: MsgId = row.get(0)?;
1104                            Ok(msg_id)
1105                        },
1106                    )
1107                    .await?
1108            }
1109        };
1110
1111        let list = self
1112            .sql
1113            .query_map_vec(
1114                "SELECT m.id
1115                     FROM msgs m
1116                     LEFT JOIN contacts ct
1117                            ON m.from_id=ct.id
1118                     LEFT JOIN chats c
1119                            ON m.chat_id=c.id
1120                     WHERE m.id>?
1121                       AND m.hidden=0
1122                       AND m.chat_id>9
1123                       AND ct.blocked=0
1124                       AND c.blocked!=1
1125                     ORDER BY m.id ASC",
1126                (
1127                    last_msg_id.to_u32(), // Explicitly convert to u32 because 0 is allowed.
1128                ),
1129                |row| {
1130                    let msg_id: MsgId = row.get(0)?;
1131                    Ok(msg_id)
1132                },
1133            )
1134            .await?;
1135        Ok(list)
1136    }
1137
1138    /// (deprecated) Returns a list of messages with database ID higher than last marked as seen.
1139    ///
1140    /// This function is supposed to be used by bot to request messages
1141    /// that are not processed yet.
1142    ///
1143    /// Waits for notification and returns a result.
1144    /// Note that the result may be empty if the message is deleted
1145    /// shortly after notification or notification is manually triggered
1146    /// to interrupt waiting.
1147    /// Notification may be manually triggered by calling [`Self::stop_io`].
1148    ///
1149    /// Deprecated 2026-04: This returns the message's id as soon as the first part arrives,
1150    /// even if it is not fully downloaded yet.
1151    /// The bot needs to wait for the message to be fully downloaded.
1152    /// Since this is usually not the desired behavior,
1153    /// bots should instead use the #DC_EVENT_INCOMING_MSG / [`EventType::IncomingMsg`]
1154    /// event for getting notified about new messages.
1155    pub async fn wait_next_msgs(&self) -> Result<Vec<MsgId>> {
1156        self.new_msgs_notify.notified().await;
1157        let list = self.get_next_msgs().await?;
1158        Ok(list)
1159    }
1160
1161    /// Searches for messages containing the query string case-insensitively.
1162    ///
1163    /// If `chat_id` is provided this searches only for messages in this chat, if `chat_id`
1164    /// is `None` this searches messages from all chats.
1165    ///
1166    /// NB: Wrt the search in long messages which are shown truncated with the "Show Full Message…"
1167    /// button, we only look at the first several kilobytes. Let's not fix this -- one can send a
1168    /// dictionary in the message that matches any reasonable search request, but the user won't see
1169    /// the match because they should tap on "Show Full Message…" for that. Probably such messages
1170    /// would only clutter search results.
1171    pub async fn search_msgs(&self, chat_id: Option<ChatId>, query: &str) -> Result<Vec<MsgId>> {
1172        let real_query = query.trim().to_lowercase();
1173        if real_query.is_empty() {
1174            return Ok(Vec::new());
1175        }
1176        let str_like_in_text = format!("%{real_query}%");
1177
1178        let list = if let Some(chat_id) = chat_id {
1179            self.sql
1180                .query_map_vec(
1181                    "SELECT m.id AS id
1182                 FROM msgs m
1183                 LEFT JOIN contacts ct
1184                        ON m.from_id=ct.id
1185                 WHERE m.chat_id=?
1186                   AND m.hidden=0
1187                   AND ct.blocked=0
1188                   AND IFNULL(txt_normalized, txt) LIKE ?
1189                 ORDER BY m.timestamp,m.id;",
1190                    (chat_id, str_like_in_text),
1191                    |row| {
1192                        let msg_id: MsgId = row.get("id")?;
1193                        Ok(msg_id)
1194                    },
1195                )
1196                .await?
1197        } else {
1198            // For performance reasons results are sorted only by `id`, that is in the order of
1199            // message reception.
1200            //
1201            // Unlike chat view, sorting by `timestamp` is not necessary but slows down the query by
1202            // ~25% according to benchmarks.
1203            //
1204            // To speed up incremental search, where queries for few characters usually return lots
1205            // of unwanted results that are discarded moments later, we added `LIMIT 1000`.
1206            // According to some tests, this limit speeds up eg. 2 character searches by factor 10.
1207            // The limit is documented and UI may add a hint when getting 1000 results.
1208            self.sql
1209                .query_map_vec(
1210                    "SELECT m.id AS id
1211                 FROM msgs m
1212                 LEFT JOIN contacts ct
1213                        ON m.from_id=ct.id
1214                 LEFT JOIN chats c
1215                        ON m.chat_id=c.id
1216                 WHERE m.chat_id>9
1217                   AND m.hidden=0
1218                   AND c.blocked!=1
1219                   AND ct.blocked=0
1220                   AND IFNULL(txt_normalized, txt) LIKE ?
1221                 ORDER BY m.id DESC LIMIT 1000",
1222                    (str_like_in_text,),
1223                    |row| {
1224                        let msg_id: MsgId = row.get("id")?;
1225                        Ok(msg_id)
1226                    },
1227                )
1228                .await?
1229        };
1230
1231        Ok(list)
1232    }
1233
1234    pub(crate) fn derive_blobdir(dbfile: &Path) -> PathBuf {
1235        let mut blob_fname = OsString::new();
1236        blob_fname.push(dbfile.file_name().unwrap_or_default());
1237        blob_fname.push("-blobs");
1238        dbfile.with_file_name(blob_fname)
1239    }
1240
1241    pub(crate) fn derive_walfile(dbfile: &Path) -> PathBuf {
1242        let mut wal_fname = OsString::new();
1243        wal_fname.push(dbfile.file_name().unwrap_or_default());
1244        wal_fname.push("-wal");
1245        dbfile.with_file_name(wal_fname)
1246    }
1247}
1248
1249#[cfg(test)]
1250mod context_tests;