Skip to main content

deltachat/
context.rs

1//! Context module.
2
3use std::collections::{BTreeMap, HashMap};
4use std::ffi::OsString;
5use std::ops::Deref;
6use std::path::{Path, PathBuf};
7use std::sync::atomic::AtomicBool;
8use std::sync::{Arc, OnceLock, Weak};
9use std::time::Duration;
10
11use anyhow::{Result, bail, ensure};
12use async_channel::{self as channel, Receiver, Sender};
13use pgp::composed::SignedPublicKey;
14use ratelimit::Ratelimit;
15use tokio::sync::{Mutex, Notify, RwLock};
16
17use crate::chat::{ChatId, get_chat_cnt};
18use crate::config::Config;
19use crate::constants::{self, DC_BACKGROUND_FETCH_QUOTA_CHECK_RATELIMIT, DC_VERSION_STR};
20use crate::contact::{Contact, ContactId};
21use crate::debug_logging::DebugLogging;
22use crate::events::{Event, EventEmitter, EventType, Events};
23use crate::imap::{Imap, ServerMetadata};
24use crate::log::warn;
25use crate::logged_debug_assert;
26use crate::message::{self, MessageState, MsgId};
27use crate::net::tls::{SpkiHashStore, TlsSessionStore};
28use crate::peer_channels::Iroh;
29use crate::push::PushSubscriber;
30use crate::quota::QuotaInfo;
31use crate::scheduler::{ConnectivityStore, SchedulerState};
32use crate::sql::Sql;
33use crate::stock_str::StockStrings;
34use crate::timesmearing::SmearedTimestamp;
35use crate::tools::{self, duration_to_str, time, time_elapsed};
36use crate::transport::ConfiguredLoginParam;
37use crate::{chatlist_events, stats};
38
39pub use crate::scheduler::connectivity::Connectivity;
40
/// Builder for the [`Context`].
///
/// Many arguments to the [`Context`] are kind of optional and only needed to handle
/// multiple contexts, for which the [account manager](crate::accounts::Accounts) should be
/// used.  This builder makes creating a new context simpler, especially for the
/// standalone-context case.
///
/// # Examples
///
/// Creating a new database:
///
/// ```
/// # let rt = tokio::runtime::Runtime::new().unwrap();
/// # rt.block_on(async move {
/// use deltachat::context::ContextBuilder;
///
/// let dir = tempfile::tempdir().unwrap();
/// let context = ContextBuilder::new(dir.path().join("db"))
///      .open()
///      .await
///      .unwrap();
/// drop(context);
/// # });
/// ```
#[derive(Clone, Debug)]
pub struct ContextBuilder {
    /// Path to the database file.
    dbfile: PathBuf,
    /// Context ID; random unless overridden with [`ContextBuilder::with_id`].
    id: u32,
    /// Event channel handed to the built [`Context`].
    events: Events,
    /// Stock string translations handed to the built [`Context`].
    stock_strings: StockStrings,
    /// Database password; only used by [`ContextBuilder::open`],
    /// an empty password is used when unset.
    password: Option<String>,

    /// Push subscriber; a default one is created by [`ContextBuilder::build`] when unset.
    push_subscriber: Option<PushSubscriber>,
}
75
76impl ContextBuilder {
77    /// Create the builder using the given database file.
78    ///
79    /// The *dbfile* should be in a dedicated directory and this directory must exist.  The
80    /// [`Context`] will create other files and folders in the same directory as the
81    /// database file used.
82    pub fn new(dbfile: PathBuf) -> Self {
83        ContextBuilder {
84            dbfile,
85            id: rand::random(),
86            events: Events::new(),
87            stock_strings: StockStrings::new(),
88            password: None,
89            push_subscriber: None,
90        }
91    }
92
93    /// Sets the context ID.
94    ///
95    /// This identifier is used e.g. in [`Event`]s to identify which [`Context`] an event
96    /// belongs to.  The only real limit on it is that it should not conflict with any other
97    /// [`Context`]s you currently have open.  So if you handle multiple [`Context`]s you
98    /// may want to use this.
99    ///
100    /// Note that the [account manager](crate::accounts::Accounts) is designed to handle the
101    /// common case for using multiple [`Context`] instances.
102    pub fn with_id(mut self, id: u32) -> Self {
103        self.id = id;
104        self
105    }
106
107    /// Sets the event channel for this [`Context`].
108    ///
109    /// Mostly useful when using multiple [`Context`]s, this allows creating one [`Events`]
110    /// channel and passing it to all [`Context`]s so all events are received on the same
111    /// channel.
112    ///
113    /// Note that the [account manager](crate::accounts::Accounts) is designed to handle the
114    /// common case for using multiple [`Context`] instances.
115    pub fn with_events(mut self, events: Events) -> Self {
116        self.events = events;
117        self
118    }
119
120    /// Sets the [`StockStrings`] map to use for this [`Context`].
121    ///
122    /// This is useful in order to share the same translation strings in all [`Context`]s.
123    /// The mapping may be empty when set, it will be populated by
124    /// [`Context::set_stock_translation`] or [`Accounts::set_stock_translation`] calls.
125    ///
126    /// Note that the [account manager](crate::accounts::Accounts) is designed to handle the
127    /// common case for using multiple [`Context`] instances.
128    ///
129    /// [`Accounts::set_stock_translation`]: crate::accounts::Accounts::set_stock_translation
130    pub fn with_stock_strings(mut self, stock_strings: StockStrings) -> Self {
131        self.stock_strings = stock_strings;
132        self
133    }
134
135    /// Sets the password to unlock the database.
136    /// Deprecated 2025-11:
137    /// - Db encryption does nothing with blobs, so fs/disk encryption is recommended.
138    /// - Isolation from other apps is needed anyway.
139    ///
140    /// If an encrypted database is used it must be opened with a password.  Setting a
141    /// password on a new database will enable encryption.
142    #[deprecated(since = "TBD")]
143    pub fn with_password(mut self, password: String) -> Self {
144        self.password = Some(password);
145        self
146    }
147
148    /// Sets push subscriber.
149    pub(crate) fn with_push_subscriber(mut self, push_subscriber: PushSubscriber) -> Self {
150        self.push_subscriber = Some(push_subscriber);
151        self
152    }
153
154    /// Builds the [`Context`] without opening it.
155    pub async fn build(self) -> Result<Context> {
156        let push_subscriber = self.push_subscriber.unwrap_or_default();
157        let context = Context::new_closed(
158            &self.dbfile,
159            self.id,
160            self.events,
161            self.stock_strings,
162            push_subscriber,
163        )
164        .await?;
165        Ok(context)
166    }
167
168    /// Builds the [`Context`] and opens it.
169    ///
170    /// Returns error if context cannot be opened.
171    pub async fn open(self) -> Result<Context> {
172        let password = self.password.clone().unwrap_or_default();
173        let context = self.build().await?;
174        match context.open(password).await? {
175            true => Ok(context),
176            false => bail!("database could not be decrypted, incorrect or missing password"),
177        }
178    }
179}
180
/// The context for a single DeltaChat account.
///
/// This contains all the state for a single DeltaChat account, including background tasks
/// running in Tokio to operate the account.  The [`Context`] can be cheaply cloned.
///
/// Each context, and thus each account, must be associated with a directory where all the
/// state is kept.  This state is also preserved between restarts.
///
/// To use multiple accounts it is best to look at the [accounts
/// manager][crate::accounts::Accounts] which handles storing multiple accounts in a single
/// directory structure and handles loading them all concurrently.
#[derive(Clone, Debug)]
pub struct Context {
    /// Shared state; cloning a [`Context`] only clones this [`Arc`].
    pub(crate) inner: Arc<InnerContext>,
}
196
197impl Deref for Context {
198    type Target = InnerContext;
199
200    fn deref(&self) -> &Self::Target {
201        &self.inner
202    }
203}
204
205/// A weak reference to a [`Context`]
206///
207/// Can be used to obtain a [`Context`]. An existing weak reference does not prevent the corresponding [`Context`] from being dropped.
208#[derive(Clone, Debug)]
209pub(crate) struct WeakContext {
210    inner: Weak<InnerContext>,
211}
212
213impl WeakContext {
214    /// Returns the [`Context`] if it is still available.
215    pub(crate) fn upgrade(&self) -> Result<Context> {
216        let inner = self
217            .inner
218            .upgrade()
219            .ok_or_else(|| anyhow::anyhow!("Inner struct has been dropped"))?;
220        Ok(Context { inner })
221    }
222}
223
/// Actual context, expensive to clone.
///
/// Shared between all clones of a [`Context`] via an [`Arc`].
#[derive(Debug)]
pub struct InnerContext {
    /// Blob directory path
    pub(crate) blobdir: PathBuf,
    /// SQL database of this context.
    pub(crate) sql: Sql,
    /// Smeared timestamp state, see [`SmearedTimestamp`].
    pub(crate) smeared_timestamp: SmearedTimestamp,
    /// The global "ongoing" process state.
    ///
    /// This is a global mutex-like state for operations which should be modal in the
    /// clients.
    running_state: RwLock<RunningState>,
    /// Mutex to enforce that only a single oauth2 operation is running at a time.
    pub(crate) oauth2_mutex: Mutex<()>,
    /// Mutex to prevent a race condition when a "your pw is wrong" warning is sent, resulting in multiple messages being sent.
    pub(crate) wrong_pw_warning_mutex: Mutex<()>,
    /// Mutex to prevent running housekeeping from multiple threads at once.
    pub(crate) housekeeping_mutex: Mutex<()>,

    /// Mutex to prevent multiple IMAP loops from fetching the messages at once.
    ///
    /// Without this mutex IMAP loops may waste traffic downloading the same message
    /// from multiple IMAP servers and create multiple copies of the same message
    /// in the database if the check for duplicates and creating a message
    /// happens in separate database transactions.
    pub(crate) fetch_msgs_mutex: Mutex<()>,

    /// Stock strings map passed in when the context was created.
    pub(crate) translated_stockstrings: StockStrings,
    /// Event channel that [`Context::emit_event`] emits to.
    pub(crate) events: Events,

    /// State of the IO scheduler, see [`Context::start_io`] and [`Context::stop_io`].
    pub(crate) scheduler: SchedulerState,
    /// Message rate limiter.
    pub(crate) ratelimit: RwLock<Ratelimit>,

    /// Recently loaded quota information for each transport, if any.
    /// If quota was never tried to load, then the transport doesn't have an entry in the BTreeMap.
    pub(crate) quota: RwLock<BTreeMap<u32, QuotaInfo>>,

    /// Notify about new messages.
    ///
    /// This causes [`Context::wait_next_msgs`] to wake up.
    pub(crate) new_msgs_notify: Notify,

    /// Server ID response if ID capability is supported
    /// and the server returned non-NIL on the inbox connection.
    /// <https://datatracker.ietf.org/doc/html/rfc2971>
    pub(crate) server_id: RwLock<Option<HashMap<String, String>>>,

    /// IMAP METADATA.
    pub(crate) metadata: RwLock<Option<ServerMetadata>>,

    /// ID for this `Context` in the current process.
    ///
    /// This allows for multiple `Context`s open in a single process where each context can
    /// be identified by this ID.
    pub(crate) id: u32,

    /// Time at which this context object was created.
    creation_time: tools::Time,

    /// The text of the last error logged and emitted as an event.
    /// If the ui wants to display an error after a failure,
    /// `last_error` should be used to avoid races with the event thread.
    pub(crate) last_error: parking_lot::RwLock<String>,

    /// It's not possible to emit migration errors as an event,
    /// because at the time of the migration, there is no event emitter yet.
    /// So, this holds the error that happened during migration, if any.
    /// This is necessary for the possibly-fallible PGP migration,
    /// which happened 2025-05, and can be removed a few releases later.
    pub(crate) migration_error: parking_lot::RwLock<Option<String>>,

    /// If debug logging is enabled, this contains all necessary information
    ///
    /// Standard RwLock instead of [`tokio::sync::RwLock`] is used
    /// because the lock is used from synchronous [`Context::emit_event`].
    pub(crate) debug_logging: std::sync::RwLock<Option<DebugLogging>>,

    /// Push subscriber to store device token
    /// and register for heartbeat notifications.
    pub(crate) push_subscriber: PushSubscriber,

    /// True if account has subscribed to push notifications via IMAP.
    pub(crate) push_subscribed: AtomicBool,

    /// TLS session resumption cache.
    pub(crate) tls_session_store: TlsSessionStore,

    /// Store for TLS SPKI hashes.
    ///
    /// Used to remember public keys
    /// of TLS certificates to accept them
    /// even after they expire.
    pub(crate) spki_hash_store: SpkiHashStore,

    /// Iroh for realtime peer channels.
    ///
    /// `None` until Iroh is set up and again after [`Context::stop_io`] took it out.
    pub(crate) iroh: Arc<RwLock<Option<Iroh>>>,

    /// The own fingerprint, if it was computed already.
    /// tokio::sync::OnceCell would be possible to use, but overkill for our usecase;
    /// the standard library's OnceLock is enough, and it's a lot smaller in memory.
    pub(crate) self_fingerprint: OnceLock<String>,

    /// OpenPGP certificate aka Transferrable Public Key.
    ///
    /// It is generated on first use from the secret key stored in the database.
    ///
    /// Mutex is also held while generating the key to avoid generating the key twice.
    pub(crate) self_public_key: Mutex<Option<SignedPublicKey>>,

    /// `Connectivity` values for mailboxes, unordered. Used to compute the aggregate connectivity,
    /// see [`Context::get_connectivity()`].
    pub(crate) connectivities: parking_lot::Mutex<Vec<ConnectivityStore>>,

    #[expect(clippy::type_complexity)]
    /// Transforms the root of the cryptographic payload before encryption.
    ///
    /// `None` by default.
    pub(crate) pre_encrypt_mime_hook: parking_lot::Mutex<
        Option<
            for<'a> fn(
                &Context,
                mail_builder::mime::MimePart<'a>,
            ) -> mail_builder::mime::MimePart<'a>,
        >,
    >,
}
347
/// The state of ongoing process.
///
/// Transitions, as implemented by the `*_ongoing` methods of [`Context`]:
/// `Stopped` -> `Running` (`alloc_ongoing`), `Running` -> `ShallStop` (`stop_ongoing`),
/// any state -> `Stopped` (`free_ongoing`).
#[derive(Debug, Default)]
enum RunningState {
    /// Ongoing process is allocated.
    Running {
        /// Sending side of the cancel channel; the receiving side was handed out
        /// when the ongoing process was allocated.
        cancel_sender: Sender<()>,
    },

    /// Cancel signal has been sent, waiting for ongoing process to be freed.
    ShallStop {
        /// Time at which the stop was requested, used to log how long stopping took.
        request: tools::Time,
    },

    /// There is no ongoing process, a new one can be allocated.
    #[default]
    Stopped,
}
361
362/// Return some info about deltachat-core
363///
364/// This contains information mostly about the library itself, the
365/// actual keys and their values which will be present are not
366/// guaranteed.  Calling [Context::get_info] also includes information
367/// about the context on top of the information here.
368#[expect(clippy::arithmetic_side_effects)]
369pub fn get_info() -> BTreeMap<&'static str, String> {
370    let mut res = BTreeMap::new();
371
372    #[cfg(debug_assertions)]
373    res.insert(
374        "debug_assertions",
375        "On - DO NOT RELEASE THIS BUILD".to_string(),
376    );
377    #[cfg(not(debug_assertions))]
378    res.insert("debug_assertions", "Off".to_string());
379
380    res.insert("deltachat_core_version", format!("v{DC_VERSION_STR}"));
381    res.insert("sqlite_version", rusqlite::version().to_string());
382    res.insert("arch", (std::mem::size_of::<usize>() * 8).to_string());
383    res.insert("num_cpus", num_cpus::get().to_string());
384    res.insert("level", "awesome".into());
385    res
386}
387
388impl Context {
389    /// Creates new context and opens the database.
390    pub async fn new(
391        dbfile: &Path,
392        id: u32,
393        events: Events,
394        stock_strings: StockStrings,
395    ) -> Result<Context> {
396        let context =
397            Self::new_closed(dbfile, id, events, stock_strings, Default::default()).await?;
398
399        // Open the database if is not encrypted.
400        if context.check_passphrase("".to_string()).await? {
401            context.sql.open(&context, "".to_string()).await?;
402        }
403        Ok(context)
404    }
405
406    /// Creates new context without opening the database.
407    pub async fn new_closed(
408        dbfile: &Path,
409        id: u32,
410        events: Events,
411        stockstrings: StockStrings,
412        push_subscriber: PushSubscriber,
413    ) -> Result<Context> {
414        let mut blob_fname = OsString::new();
415        blob_fname.push(dbfile.file_name().unwrap_or_default());
416        blob_fname.push("-blobs");
417        let blobdir = dbfile.with_file_name(blob_fname);
418        if !blobdir.exists() {
419            tokio::fs::create_dir_all(&blobdir).await?;
420        }
421        let context = Context::with_blobdir(
422            dbfile.into(),
423            blobdir,
424            id,
425            events,
426            stockstrings,
427            push_subscriber,
428        )?;
429        Ok(context)
430    }
431
432    /// Returns a weak reference to this [`Context`].
433    pub(crate) fn get_weak_context(&self) -> WeakContext {
434        WeakContext {
435            inner: Arc::downgrade(&self.inner),
436        }
437    }
438
439    /// Opens the database with the given passphrase.
440    /// NB: Db encryption is deprecated, so `passphrase` should be empty normally. See
441    /// [`ContextBuilder::with_password()`] for reasoning.
442    ///
443    /// Returns true if passphrase is correct, false is passphrase is not correct. Fails on other
444    /// errors.
445    #[deprecated(since = "TBD")]
446    pub async fn open(&self, passphrase: String) -> Result<bool> {
447        if self.sql.check_passphrase(passphrase.clone()).await? {
448            self.sql.open(self, passphrase).await?;
449            Ok(true)
450        } else {
451            Ok(false)
452        }
453    }
454
    /// Changes encrypted database passphrase.
    /// Deprecated 2025-11, see [`ContextBuilder::with_password()`] for reasoning.
    ///
    /// Delegates to the SQL layer and propagates its error, if any.
    pub async fn change_passphrase(&self, passphrase: String) -> Result<()> {
        self.sql.change_passphrase(passphrase).await?;
        Ok(())
    }
461
    /// Returns true if database is open.
    ///
    /// Delegates to the SQL layer.
    pub async fn is_open(&self) -> bool {
        self.sql.is_open().await
    }
466
    /// Tests the database passphrase.
    ///
    /// Returns true if passphrase is correct.
    ///
    /// Fails if database is already open.
    ///
    /// Only checks the passphrase; the database is not opened by this call.
    pub(crate) async fn check_passphrase(&self, passphrase: String) -> Result<bool> {
        self.sql.check_passphrase(passphrase).await
    }
475
476    pub(crate) fn with_blobdir(
477        dbfile: PathBuf,
478        blobdir: PathBuf,
479        id: u32,
480        events: Events,
481        stockstrings: StockStrings,
482        push_subscriber: PushSubscriber,
483    ) -> Result<Context> {
484        ensure!(
485            blobdir.is_dir(),
486            "Blobdir does not exist: {}",
487            blobdir.display()
488        );
489
490        let new_msgs_notify = Notify::new();
491        // Notify once immediately to allow processing old messages
492        // without starting I/O.
493        new_msgs_notify.notify_one();
494
495        let inner = InnerContext {
496            id,
497            blobdir,
498            running_state: RwLock::new(Default::default()),
499            sql: Sql::new(dbfile),
500            smeared_timestamp: SmearedTimestamp::new(),
501            oauth2_mutex: Mutex::new(()),
502            wrong_pw_warning_mutex: Mutex::new(()),
503            housekeeping_mutex: Mutex::new(()),
504            fetch_msgs_mutex: Mutex::new(()),
505            translated_stockstrings: stockstrings,
506            events,
507            scheduler: SchedulerState::new(),
508            ratelimit: RwLock::new(Ratelimit::new(Duration::new(3, 0), 3.0)), // Allow at least 1 message every second + a burst of 3.
509            quota: RwLock::new(BTreeMap::new()),
510            new_msgs_notify,
511            server_id: RwLock::new(None),
512            metadata: RwLock::new(None),
513            creation_time: tools::Time::now(),
514            last_error: parking_lot::RwLock::new("".to_string()),
515            migration_error: parking_lot::RwLock::new(None),
516            debug_logging: std::sync::RwLock::new(None),
517            push_subscriber,
518            push_subscribed: AtomicBool::new(false),
519            tls_session_store: TlsSessionStore::new(),
520            spki_hash_store: SpkiHashStore::new(),
521            iroh: Arc::new(RwLock::new(None)),
522            self_fingerprint: OnceLock::new(),
523            self_public_key: Mutex::new(None),
524            connectivities: parking_lot::Mutex::new(Vec::new()),
525            pre_encrypt_mime_hook: None.into(),
526        };
527
528        let ctx = Context {
529            inner: Arc::new(inner),
530        };
531
532        Ok(ctx)
533    }
534
535    /// Starts the IO scheduler.
536    pub async fn start_io(&self) {
537        if !self.is_configured().await.unwrap_or_default() {
538            warn!(self, "can not start io on a context that is not configured");
539            return;
540        }
541
542        // The next line is mainly for iOS:
543        // iOS starts a separate process for receiving notifications and if the user concurrently
544        // starts the app, the UI process opens the database but waits with calling start_io()
545        // until the notifications process finishes.
546        // Now, some configs may have changed, so, we need to invalidate the cache.
547        self.sql.config_cache.write().await.clear();
548
549        self.scheduler.start(self).await;
550    }
551
    /// Stops the IO scheduler.
    ///
    /// Also takes the Iroh instance, if there is one, out of the context
    /// and closes it in a detached background task.
    pub async fn stop_io(&self) {
        self.scheduler.stop(self).await;
        if let Some(iroh) = self.iroh.write().await.take() {
            // Close all QUIC connections.

            // Spawn into a separate task,
            // because Iroh calls `wait_idle()` internally
            // and it may take time, especially if the network
            // has become unavailable.
            tokio::spawn(async move {
                // We do not log the error because we do not want the task
                // to hold the reference to Context.
                // Closing is given up on after 60 seconds.
                let _ = tokio::time::timeout(Duration::from_secs(60), iroh.close()).await;
            });
        }
    }
569
    /// Restarts the IO scheduler if it was running before.
    ///
    /// When it is not running, this is a no-op.
    pub async fn restart_io_if_running(&self) {
        self.scheduler.restart(self).await;
    }
575
576    /// Indicate that the network likely has come back.
577    pub async fn maybe_network(&self) {
578        if let Some(ref iroh) = *self.iroh.read().await {
579            iroh.network_change().await;
580        }
581        self.scheduler.maybe_network().await;
582    }
583
    /// Returns true if an account is on a chatmail server.
    ///
    /// Reads the [`Config::IsChatmail`] setting.
    pub async fn is_chatmail(&self) -> Result<bool> {
        self.get_config_bool(Config::IsChatmail).await
    }
588
589    /// Returns maximum number of recipients the provider allows to send a single email to.
590    pub(crate) async fn get_max_smtp_rcpt_to(&self) -> Result<usize> {
591        let is_chatmail = self.is_chatmail().await?;
592        let val = self
593            .get_configured_provider()
594            .await?
595            .and_then(|provider| provider.opt.max_smtp_rcpt_to)
596            .map_or_else(
597                || match is_chatmail {
598                    true => constants::DEFAULT_CHATMAIL_MAX_SMTP_RCPT_TO,
599                    false => constants::DEFAULT_MAX_SMTP_RCPT_TO,
600                },
601                usize::from,
602            );
603        Ok(val)
604    }
605
    /// Does a single round of fetching from IMAP and returns.
    ///
    /// Can be used even if I/O is currently stopped.
    /// If I/O is currently stopped, starts a new IMAP connection
    /// and fetches from Inbox and DeltaChat folders.
    ///
    /// Returns `Ok(())` without doing anything if the context is not configured.
    /// If the scheduler is running, no dedicated connection is made: the scheduler
    /// is poked instead and this call waits until all its work is done.
    pub async fn background_fetch(&self) -> Result<()> {
        if !(self.is_configured().await?) {
            return Ok(());
        }

        // The address and start time are only used for logging.
        let address = self.get_primary_self_addr().await?;
        let time_start = tools::Time::now();
        info!(self, "background_fetch started fetching {address}.");

        if self.scheduler.is_running().await {
            self.scheduler.maybe_network().await;
            self.wait_for_all_work_done().await;
        } else {
            // Pause the scheduler to ensure another connection does not start
            // while we are fetching on a dedicated connection.
            let _pause_guard = self.scheduler.pause(self).await?;

            // Start a new dedicated connection.
            // Only the receiving half of a fresh channel is passed in,
            // its sending half is dropped right away.
            let mut connection = Imap::new_configured(self, channel::bounded(1).1).await?;
            let mut session = connection.prepare(self).await?;

            // Fetch IMAP folders.
            // NOTE(review): only `connection.folder` is fetched here, while the doc
            // comment mentions Inbox and DeltaChat folders — confirm which is intended.
            let folder = connection.folder.clone();
            connection
                .fetch_move_delete(self, &mut session, &folder)
                .await?;

            // Update quota (to send warning if full) - but only check it once in a while.
            // note: For now this only checks quota of primary transport,
            // because background check only checks primary transport at the moment
            if self
                .quota_needs_update(
                    session.transport_id(),
                    DC_BACKGROUND_FETCH_QUOTA_CHECK_RATELIMIT,
                )
                .await
                && let Err(err) = self.update_recent_quota(&mut session, &folder).await
            {
                // A failed quota update is only logged, it does not fail the fetch.
                warn!(self, "Failed to update quota: {err:#}.");
            }
        }

        info!(
            self,
            "background_fetch done for {address} took {:?}.",
            time_elapsed(&time_start),
        );

        Ok(())
    }
661
662    /// Returns a reference to the underlying SQL instance.
663    ///
664    /// Warning: this is only here for testing, not part of the public API.
665    #[cfg(feature = "internals")]
666    pub fn sql(&self) -> &Sql {
667        &self.inner.sql
668    }
669
    /// Returns database file path.
    ///
    /// This is the path stored in the SQL instance of this context.
    pub fn get_dbfile(&self) -> &Path {
        self.sql.dbfile.as_path()
    }
674
675    /// Returns blob directory path.
676    pub fn get_blobdir(&self) -> &Path {
677        self.blobdir.as_path()
678    }
679
680    /// Emits a single event.
681    pub fn emit_event(&self, event: EventType) {
682        {
683            let lock = self.debug_logging.read().expect("RwLock is poisoned");
684            if let Some(debug_logging) = &*lock {
685                debug_logging.log_event(event.clone());
686            }
687        }
688        self.events.emit(Event {
689            id: self.id,
690            typ: event,
691        });
692    }
693
694    /// Emits a generic MsgsChanged event (without chat or message id)
695    pub fn emit_msgs_changed_without_ids(&self) {
696        self.emit_event(EventType::MsgsChanged {
697            chat_id: ChatId::new(0),
698            msg_id: MsgId::new(0),
699        });
700    }
701
    /// Emits a MsgsChanged event with specified chat and message ids
    ///
    /// If IDs are unset, [`Self::emit_msgs_changed_without_ids`]
    /// or [`Self::emit_msgs_changed_without_msg_id`] should be used
    /// instead of this function.
    ///
    /// Also emits the chatlist-changed events for the chat.
    pub fn emit_msgs_changed(&self, chat_id: ChatId, msg_id: MsgId) {
        // Passing unset IDs is a caller bug, see the doc comment.
        logged_debug_assert!(
            self,
            !chat_id.is_unset(),
            "emit_msgs_changed: chat_id is unset."
        );
        logged_debug_assert!(
            self,
            !msg_id.is_unset(),
            "emit_msgs_changed: msg_id is unset."
        );

        self.emit_event(EventType::MsgsChanged { chat_id, msg_id });
        chatlist_events::emit_chatlist_changed(self);
        chatlist_events::emit_chatlist_item_changed(self, chat_id);
    }
723
    /// Emits a MsgsChanged event with specified chat and without message id.
    ///
    /// The message ID of the event is set to 0.
    /// Also emits the chatlist-changed events for the chat.
    pub fn emit_msgs_changed_without_msg_id(&self, chat_id: ChatId) {
        // Without a chat ID, `emit_msgs_changed_without_ids` is the function to use.
        logged_debug_assert!(
            self,
            !chat_id.is_unset(),
            "emit_msgs_changed_without_msg_id: chat_id is unset."
        );

        self.emit_event(EventType::MsgsChanged {
            chat_id,
            msg_id: MsgId::new(0),
        });
        chatlist_events::emit_chatlist_changed(self);
        chatlist_events::emit_chatlist_item_changed(self, chat_id);
    }
739
740    /// Emits an IncomingMsg event with specified chat and message ids
741    pub fn emit_incoming_msg(&self, chat_id: ChatId, msg_id: MsgId) {
742        debug_assert!(!chat_id.is_unset());
743        debug_assert!(!msg_id.is_unset());
744
745        self.emit_event(EventType::IncomingMsg { chat_id, msg_id });
746        chatlist_events::emit_chatlist_changed(self);
747        chatlist_events::emit_chatlist_item_changed(self, chat_id);
748    }
749
750    /// Emits an LocationChanged event and a WebxdcStatusUpdate in case there is a maps integration
751    pub async fn emit_location_changed(&self, contact_id: Option<ContactId>) -> Result<()> {
752        self.emit_event(EventType::LocationChanged(contact_id));
753
754        if let Some(msg_id) = self
755            .get_config_parsed::<u32>(Config::WebxdcIntegration)
756            .await?
757        {
758            self.emit_event(EventType::WebxdcStatusUpdate {
759                msg_id: MsgId::new(msg_id),
760                status_update_serial: Default::default(),
761            })
762        }
763
764        Ok(())
765    }
766
    /// Returns a receiver for emitted events.
    ///
    /// Multiple emitters can be created, but note that in this case each emitted event will
    /// only be received by one of the emitters, not by all of them.
    ///
    /// The emitter is obtained from the event channel of this context.
    pub fn get_event_emitter(&self) -> EventEmitter {
        self.events.get_emitter()
    }
774
    /// Get the ID of this context.
    ///
    /// This is the ID the context was created with; it is attached to every
    /// [`Event`] emitted by [`Context::emit_event`].
    pub fn get_id(&self) -> u32 {
        self.id
    }
779
780    // Ongoing process allocation/free/check
781
782    /// Tries to acquire the global UI "ongoing" mutex.
783    ///
784    /// This is for modal operations during which no other user actions are allowed.  Only
785    /// one such operation is allowed at any given time.
786    ///
787    /// The return value is a cancel token, which will release the ongoing mutex when
788    /// dropped.
789    pub(crate) async fn alloc_ongoing(&self) -> Result<Receiver<()>> {
790        let mut s = self.running_state.write().await;
791        ensure!(
792            matches!(*s, RunningState::Stopped),
793            "There is already another ongoing process running."
794        );
795
796        let (sender, receiver) = channel::bounded(1);
797        *s = RunningState::Running {
798            cancel_sender: sender,
799        };
800
801        Ok(receiver)
802    }
803
804    pub(crate) async fn free_ongoing(&self) {
805        let mut s = self.running_state.write().await;
806        if let RunningState::ShallStop { request } = *s {
807            info!(self, "Ongoing stopped in {:?}", time_elapsed(&request));
808        }
809        *s = RunningState::Stopped;
810    }
811
812    /// Signal an ongoing process to stop.
813    pub async fn stop_ongoing(&self) {
814        let mut s = self.running_state.write().await;
815        match &*s {
816            RunningState::Running { cancel_sender } => {
817                if let Err(err) = cancel_sender.send(()).await {
818                    warn!(self, "could not cancel ongoing: {:#}", err);
819                }
820                info!(self, "Signaling the ongoing process to stop ASAP.",);
821                *s = RunningState::ShallStop {
822                    request: tools::Time::now(),
823                };
824            }
825            RunningState::ShallStop { .. } | RunningState::Stopped => {
826                info!(self, "No ongoing process to stop.",);
827            }
828        }
829    }
830
831    #[allow(unused)]
832    pub(crate) async fn shall_stop_ongoing(&self) -> bool {
833        match &*self.running_state.read().await {
834            RunningState::Running { .. } => false,
835            RunningState::ShallStop { .. } | RunningState::Stopped => true,
836        }
837    }
838
839    /*******************************************************************************
840     * UI chat/message related API
841     ******************************************************************************/
842
843    /// Returns information about the context as key-value pairs.
844    pub async fn get_info(&self) -> Result<BTreeMap<&'static str, String>> {
845        let all_transports: Vec<String> = ConfiguredLoginParam::load_all(self)
846            .await?
847            .into_iter()
848            .map(|(transport_id, param)| format!("{transport_id}: {param}"))
849            .collect();
850        let all_transports = if all_transports.is_empty() {
851            "Not configured".to_string()
852        } else {
853            all_transports.join(",")
854        };
855        let chats = get_chat_cnt(self).await?;
856        let unblocked_msgs = message::get_unblocked_msg_cnt(self).await;
857        let request_msgs = message::get_request_msg_cnt(self).await;
858        let contacts = Contact::get_real_cnt(self).await?;
859        let proxy_enabled = self.get_config_int(Config::ProxyEnabled).await?;
860        let dbversion = self
861            .sql
862            .get_raw_config_int("dbversion")
863            .await?
864            .unwrap_or_default();
865        let journal_mode = self
866            .sql
867            .query_get_value("PRAGMA journal_mode;", ())
868            .await?
869            .unwrap_or_else(|| "unknown".to_string());
870        let mdns_enabled = self.get_config_int(Config::MdnsEnabled).await?;
871        let bcc_self = self.get_config_int(Config::BccSelf).await?;
872        let sync_msgs = self.get_config_int(Config::SyncMsgs).await?;
873        let disable_idle = self.get_config_bool(Config::DisableIdle).await?;
874
875        let prv_key_cnt = self.sql.count("SELECT COUNT(*) FROM keypairs;", ()).await?;
876
877        let pub_key_cnt = self
878            .sql
879            .count("SELECT COUNT(*) FROM public_keys;", ())
880            .await?;
881
882        let mut res = get_info();
883
884        // insert values
885        res.insert("bot", self.get_config_int(Config::Bot).await?.to_string());
886        res.insert("number_of_chats", chats.to_string());
887        res.insert("number_of_chat_messages", unblocked_msgs.to_string());
888        res.insert("messages_in_contact_requests", request_msgs.to_string());
889        res.insert("number_of_contacts", contacts.to_string());
890        res.insert("database_dir", self.get_dbfile().display().to_string());
891        res.insert("database_version", dbversion.to_string());
892        res.insert(
893            "database_encrypted",
894            self.sql
895                .is_encrypted()
896                .await
897                .map_or_else(|| "closed".to_string(), |b| b.to_string()),
898        );
899        res.insert("journal_mode", journal_mode);
900        res.insert("blobdir", self.get_blobdir().display().to_string());
901        res.insert(
902            "selfavatar",
903            self.get_config(Config::Selfavatar)
904                .await?
905                .unwrap_or_else(|| "<unset>".to_string()),
906        );
907        res.insert("proxy_enabled", proxy_enabled.to_string());
908        res.insert("used_transport_settings", all_transports);
909
910        if let Some(server_id) = &*self.server_id.read().await {
911            res.insert("imap_server_id", format!("{server_id:?}"));
912        }
913
914        res.insert("is_chatmail", self.is_chatmail().await?.to_string());
915        res.insert(
916            "fix_is_chatmail",
917            self.get_config_bool(Config::FixIsChatmail)
918                .await?
919                .to_string(),
920        );
921        res.insert(
922            "is_muted",
923            self.get_config_bool(Config::IsMuted).await?.to_string(),
924        );
925        res.insert(
926            "private_tag",
927            self.get_config(Config::PrivateTag)
928                .await?
929                .unwrap_or_else(|| "<unset>".to_string()),
930        );
931
932        if let Some(metadata) = &*self.metadata.read().await {
933            if let Some(comment) = &metadata.comment {
934                res.insert("imap_server_comment", format!("{comment:?}"));
935            }
936
937            if let Some(admin) = &metadata.admin {
938                res.insert("imap_server_admin", format!("{admin:?}"));
939            }
940        }
941
942        res.insert(
943            "who_can_call_me",
944            self.get_config_int(Config::WhoCanCallMe).await?.to_string(),
945        );
946        res.insert(
947            "download_limit",
948            self.get_config_int(Config::DownloadLimit)
949                .await?
950                .to_string(),
951        );
952        res.insert("mdns_enabled", mdns_enabled.to_string());
953        res.insert("bcc_self", bcc_self.to_string());
954        res.insert("sync_msgs", sync_msgs.to_string());
955        res.insert("disable_idle", disable_idle.to_string());
956        res.insert("private_key_count", prv_key_cnt.to_string());
957        res.insert("public_key_count", pub_key_cnt.to_string());
958        res.insert(
959            "media_quality",
960            self.get_config_int(Config::MediaQuality).await?.to_string(),
961        );
962        res.insert(
963            "delete_device_after",
964            self.get_config_int(Config::DeleteDeviceAfter)
965                .await?
966                .to_string(),
967        );
968        res.insert(
969            "delete_server_after",
970            self.get_config_int(Config::DeleteServerAfter)
971                .await?
972                .to_string(),
973        );
974        res.insert(
975            "last_housekeeping",
976            self.get_config_int(Config::LastHousekeeping)
977                .await?
978                .to_string(),
979        );
980        res.insert(
981            "last_cant_decrypt_outgoing_msgs",
982            self.get_config_int(Config::LastCantDecryptOutgoingMsgs)
983                .await?
984                .to_string(),
985        );
986        res.insert(
987            "debug_logging",
988            self.get_config_int(Config::DebugLogging).await?.to_string(),
989        );
990        res.insert(
991            "last_msg_id",
992            self.get_config_int(Config::LastMsgId).await?.to_string(),
993        );
994        res.insert(
995            "gossip_period",
996            self.get_config_int(Config::GossipPeriod).await?.to_string(),
997        );
998        res.insert(
999            "webxdc_realtime_enabled",
1000            self.get_config_bool(Config::WebxdcRealtimeEnabled)
1001                .await?
1002                .to_string(),
1003        );
1004        res.insert(
1005            "donation_request_next_check",
1006            self.get_config_i64(Config::DonationRequestNextCheck)
1007                .await?
1008                .to_string(),
1009        );
1010        res.insert(
1011            "first_key_contacts_msg_id",
1012            self.sql
1013                .get_raw_config("first_key_contacts_msg_id")
1014                .await?
1015                .unwrap_or_default(),
1016        );
1017        res.insert(
1018            "stats_id",
1019            self.get_config(Config::StatsId)
1020                .await?
1021                .unwrap_or_else(|| "<unset>".to_string()),
1022        );
1023        res.insert(
1024            "stats_sending",
1025            stats::should_send_stats(self).await?.to_string(),
1026        );
1027        res.insert(
1028            "stats_last_sent",
1029            self.get_config_i64(Config::StatsLastSent)
1030                .await?
1031                .to_string(),
1032        );
1033        res.insert(
1034            "test_hooks",
1035            self.sql
1036                .get_raw_config("test_hooks")
1037                .await?
1038                .unwrap_or_default(),
1039        );
1040        res.insert(
1041            "std_header_protection_composing",
1042            self.sql
1043                .get_raw_config("std_header_protection_composing")
1044                .await?
1045                .unwrap_or_default(),
1046        );
1047        res.insert(
1048            "team_profile",
1049            self.get_config_bool(Config::TeamProfile).await?.to_string(),
1050        );
1051        res.insert(
1052            "force_encryption",
1053            self.get_config_bool(Config::ForceEncryption)
1054                .await?
1055                .to_string(),
1056        );
1057
1058        let elapsed = time_elapsed(&self.creation_time);
1059        res.insert("uptime", duration_to_str(elapsed));
1060
1061        Ok(res)
1062    }
1063
1064    /// Get a list of fresh, unmuted messages in unblocked chats.
1065    ///
1066    /// The list starts with the most recent message
1067    /// and is typically used to show notifications.
1068    /// Moreover, the number of returned messages
1069    /// can be used for a badge counter on the app icon.
1070    pub async fn get_fresh_msgs(&self) -> Result<Vec<MsgId>> {
1071        let list = self
1072            .sql
1073            .query_map_vec(
1074                "SELECT m.id
1075FROM msgs m
1076LEFT JOIN contacts ct
1077    ON m.from_id=ct.id
1078LEFT JOIN chats c
1079    ON m.chat_id=c.id
1080WHERE m.state=?
1081AND m.hidden=0
1082AND m.chat_id>9
1083AND ct.blocked=0
1084AND c.blocked=0
1085AND NOT(c.muted_until=-1 OR c.muted_until>?)
1086ORDER BY m.timestamp DESC,m.id DESC",
1087                (MessageState::InFresh, time()),
1088                |row| {
1089                    let msg_id: MsgId = row.get(0)?;
1090                    Ok(msg_id)
1091                },
1092            )
1093            .await?;
1094        Ok(list)
1095    }
1096
1097    /// (deprecated) Returns a list of messages with database ID higher than requested.
1098    ///
1099    /// Blocked contacts and chats are excluded,
1100    /// but self-sent messages and contact requests are included in the results.
1101    ///
1102    /// Deprecated 2026-04: This returns the message's id as soon as the first part arrives,
1103    /// even if it is not fully downloaded yet.
1104    /// The bot needs to wait for the message to be fully downloaded.
1105    /// Since this is usually not the desired behavior,
1106    /// bots should instead use the [`EventType::IncomingMsg`]
1107    /// event for getting notified about new messages.
1108    pub async fn get_next_msgs(&self) -> Result<Vec<MsgId>> {
1109        let last_msg_id = match self.get_config(Config::LastMsgId).await? {
1110            Some(s) => MsgId::new(s.parse()?),
1111            None => {
1112                // If `last_msg_id` is not set yet,
1113                // subtract 1 from the last id,
1114                // so a single message is returned and can
1115                // be marked as seen.
1116                self.sql
1117                    .query_row(
1118                        "SELECT IFNULL((SELECT MAX(id) - 1 FROM msgs), 0)",
1119                        (),
1120                        |row| {
1121                            let msg_id: MsgId = row.get(0)?;
1122                            Ok(msg_id)
1123                        },
1124                    )
1125                    .await?
1126            }
1127        };
1128
1129        let list = self
1130            .sql
1131            .query_map_vec(
1132                "SELECT m.id
1133                     FROM msgs m
1134                     LEFT JOIN contacts ct
1135                            ON m.from_id=ct.id
1136                     LEFT JOIN chats c
1137                            ON m.chat_id=c.id
1138                     WHERE m.id>?
1139                       AND m.hidden=0
1140                       AND m.chat_id>9
1141                       AND ct.blocked=0
1142                       AND c.blocked!=1
1143                     ORDER BY m.id ASC",
1144                (
1145                    last_msg_id.to_u32(), // Explicitly convert to u32 because 0 is allowed.
1146                ),
1147                |row| {
1148                    let msg_id: MsgId = row.get(0)?;
1149                    Ok(msg_id)
1150                },
1151            )
1152            .await?;
1153        Ok(list)
1154    }
1155
1156    /// (deprecated) Returns a list of messages with database ID higher than last marked as seen.
1157    ///
1158    /// This function is supposed to be used by bot to request messages
1159    /// that are not processed yet.
1160    ///
1161    /// Waits for notification and returns a result.
1162    /// Note that the result may be empty if the message is deleted
1163    /// shortly after notification or notification is manually triggered
1164    /// to interrupt waiting.
1165    /// Notification may be manually triggered by calling [`Self::stop_io`].
1166    ///
1167    /// Deprecated 2026-04: This returns the message's id as soon as the first part arrives,
1168    /// even if it is not fully downloaded yet.
1169    /// The bot needs to wait for the message to be fully downloaded.
1170    /// Since this is usually not the desired behavior,
1171    /// bots should instead use the #DC_EVENT_INCOMING_MSG / [`EventType::IncomingMsg`]
1172    /// event for getting notified about new messages.
1173    pub async fn wait_next_msgs(&self) -> Result<Vec<MsgId>> {
1174        self.new_msgs_notify.notified().await;
1175        let list = self.get_next_msgs().await?;
1176        Ok(list)
1177    }
1178
1179    /// Searches for messages containing the query string case-insensitively.
1180    ///
1181    /// If `chat_id` is provided this searches only for messages in this chat, if `chat_id`
1182    /// is `None` this searches messages from all chats.
1183    ///
1184    /// NB: Wrt the search in long messages which are shown truncated with the "Show Full Message…"
1185    /// button, we only look at the first several kilobytes. Let's not fix this -- one can send a
1186    /// dictionary in the message that matches any reasonable search request, but the user won't see
1187    /// the match because they should tap on "Show Full Message…" for that. Probably such messages
1188    /// would only clutter search results.
1189    pub async fn search_msgs(&self, chat_id: Option<ChatId>, query: &str) -> Result<Vec<MsgId>> {
1190        let real_query = query.trim().to_lowercase();
1191        if real_query.is_empty() {
1192            return Ok(Vec::new());
1193        }
1194        let str_like_in_text = format!("%{real_query}%");
1195
1196        let list = if let Some(chat_id) = chat_id {
1197            self.sql
1198                .query_map_vec(
1199                    "SELECT m.id AS id
1200                 FROM msgs m
1201                 LEFT JOIN contacts ct
1202                        ON m.from_id=ct.id
1203                 WHERE m.chat_id=?
1204                   AND m.hidden=0
1205                   AND ct.blocked=0
1206                   AND IFNULL(txt_normalized, txt) LIKE ?
1207                 ORDER BY m.timestamp,m.id;",
1208                    (chat_id, str_like_in_text),
1209                    |row| {
1210                        let msg_id: MsgId = row.get("id")?;
1211                        Ok(msg_id)
1212                    },
1213                )
1214                .await?
1215        } else {
1216            // For performance reasons results are sorted only by `id`, that is in the order of
1217            // message reception.
1218            //
1219            // Unlike chat view, sorting by `timestamp` is not necessary but slows down the query by
1220            // ~25% according to benchmarks.
1221            //
1222            // To speed up incremental search, where queries for few characters usually return lots
1223            // of unwanted results that are discarded moments later, we added `LIMIT 1000`.
1224            // According to some tests, this limit speeds up eg. 2 character searches by factor 10.
1225            // The limit is documented and UI may add a hint when getting 1000 results.
1226            self.sql
1227                .query_map_vec(
1228                    "SELECT m.id AS id
1229                 FROM msgs m
1230                 LEFT JOIN contacts ct
1231                        ON m.from_id=ct.id
1232                 LEFT JOIN chats c
1233                        ON m.chat_id=c.id
1234                 WHERE m.chat_id>9
1235                   AND m.hidden=0
1236                   AND c.blocked!=1
1237                   AND ct.blocked=0
1238                   AND IFNULL(txt_normalized, txt) LIKE ?
1239                 ORDER BY m.id DESC LIMIT 1000",
1240                    (str_like_in_text,),
1241                    |row| {
1242                        let msg_id: MsgId = row.get("id")?;
1243                        Ok(msg_id)
1244                    },
1245                )
1246                .await?
1247        };
1248
1249        Ok(list)
1250    }
1251
1252    pub(crate) fn derive_blobdir(dbfile: &Path) -> PathBuf {
1253        let mut blob_fname = OsString::new();
1254        blob_fname.push(dbfile.file_name().unwrap_or_default());
1255        blob_fname.push("-blobs");
1256        dbfile.with_file_name(blob_fname)
1257    }
1258
1259    pub(crate) fn derive_walfile(dbfile: &Path) -> PathBuf {
1260        let mut wal_fname = OsString::new();
1261        wal_fname.push(dbfile.file_name().unwrap_or_default());
1262        wal_fname.push("-wal");
1263        dbfile.with_file_name(wal_fname)
1264    }
1265}
1266
1267#[cfg(test)]
1268mod context_tests;