deltachat/
context.rs

1//! Context module.
2
3use std::collections::{BTreeMap, HashMap};
4use std::ffi::OsString;
5use std::ops::Deref;
6use std::path::{Path, PathBuf};
7use std::sync::atomic::AtomicBool;
8use std::sync::{Arc, OnceLock, Weak};
9use std::time::Duration;
10
11use anyhow::{Result, bail, ensure};
12use async_channel::{self as channel, Receiver, Sender};
13use pgp::composed::SignedPublicKey;
14use ratelimit::Ratelimit;
15use tokio::sync::{Mutex, Notify, RwLock};
16
17use crate::chat::{ChatId, get_chat_cnt};
18use crate::config::Config;
19use crate::constants::{self, DC_BACKGROUND_FETCH_QUOTA_CHECK_RATELIMIT, DC_VERSION_STR};
20use crate::contact::{Contact, ContactId};
21use crate::debug_logging::DebugLogging;
22use crate::events::{Event, EventEmitter, EventType, Events};
23use crate::imap::{Imap, ServerMetadata};
24use crate::key::self_fingerprint;
25use crate::log::warn;
26use crate::logged_debug_assert;
27use crate::message::{self, MessageState, MsgId};
28use crate::net::tls::{SpkiHashStore, TlsSessionStore};
29use crate::peer_channels::Iroh;
30use crate::push::PushSubscriber;
31use crate::quota::QuotaInfo;
32use crate::scheduler::{ConnectivityStore, SchedulerState};
33use crate::sql::Sql;
34use crate::stock_str::StockStrings;
35use crate::timesmearing::SmearedTimestamp;
36use crate::tools::{self, duration_to_str, time, time_elapsed};
37use crate::transport::ConfiguredLoginParam;
38use crate::{chatlist_events, stats};
39
40pub use crate::scheduler::connectivity::Connectivity;
41
42/// Builder for the [`Context`].
43///
44/// Many arguments to the [`Context`] are kind of optional and only needed to handle
45/// multiple contexts, for which the [account manager](crate::accounts::Accounts) should be
46/// used.  This builder makes creating a new context simpler, especially for the
47/// standalone-context case.
48///
49/// # Examples
50///
51/// Creating a new database:
52///
53/// ```
54/// # let rt = tokio::runtime::Runtime::new().unwrap();
55/// # rt.block_on(async move {
56/// use deltachat::context::ContextBuilder;
57///
58/// let dir = tempfile::tempdir().unwrap();
59/// let context = ContextBuilder::new(dir.path().join("db"))
60///      .open()
61///      .await
62///      .unwrap();
63/// drop(context);
64/// # });
65/// ```
#[derive(Clone, Debug)]
pub struct ContextBuilder {
    /// Path of the database file, as passed to [`ContextBuilder::new`].
    dbfile: PathBuf,
    /// Context ID; random by default, can be overridden with [`ContextBuilder::with_id`].
    id: u32,
    /// Event channel handed to the built [`Context`].
    events: Events,
    /// Stock strings handed to the built [`Context`].
    stock_strings: StockStrings,
    /// Database passphrase; if `None`, the empty passphrase is used by [`ContextBuilder::open`].
    password: Option<String>,

    /// Push subscriber; if `None`, a default one is created in [`ContextBuilder::build`].
    push_subscriber: Option<PushSubscriber>,
}
76
77impl ContextBuilder {
78    /// Create the builder using the given database file.
79    ///
80    /// The *dbfile* should be in a dedicated directory and this directory must exist.  The
81    /// [`Context`] will create other files and folders in the same directory as the
82    /// database file used.
83    pub fn new(dbfile: PathBuf) -> Self {
84        ContextBuilder {
85            dbfile,
86            id: rand::random(),
87            events: Events::new(),
88            stock_strings: StockStrings::new(),
89            password: None,
90            push_subscriber: None,
91        }
92    }
93
94    /// Sets the context ID.
95    ///
96    /// This identifier is used e.g. in [`Event`]s to identify which [`Context`] an event
97    /// belongs to.  The only real limit on it is that it should not conflict with any other
98    /// [`Context`]s you currently have open.  So if you handle multiple [`Context`]s you
99    /// may want to use this.
100    ///
101    /// Note that the [account manager](crate::accounts::Accounts) is designed to handle the
102    /// common case for using multiple [`Context`] instances.
103    pub fn with_id(mut self, id: u32) -> Self {
104        self.id = id;
105        self
106    }
107
108    /// Sets the event channel for this [`Context`].
109    ///
110    /// Mostly useful when using multiple [`Context`]s, this allows creating one [`Events`]
111    /// channel and passing it to all [`Context`]s so all events are received on the same
112    /// channel.
113    ///
114    /// Note that the [account manager](crate::accounts::Accounts) is designed to handle the
115    /// common case for using multiple [`Context`] instances.
116    pub fn with_events(mut self, events: Events) -> Self {
117        self.events = events;
118        self
119    }
120
121    /// Sets the [`StockStrings`] map to use for this [`Context`].
122    ///
123    /// This is useful in order to share the same translation strings in all [`Context`]s.
124    /// The mapping may be empty when set, it will be populated by
125    /// [`Context::set_stock_translation`] or [`Accounts::set_stock_translation`] calls.
126    ///
127    /// Note that the [account manager](crate::accounts::Accounts) is designed to handle the
128    /// common case for using multiple [`Context`] instances.
129    ///
130    /// [`Accounts::set_stock_translation`]: crate::accounts::Accounts::set_stock_translation
131    pub fn with_stock_strings(mut self, stock_strings: StockStrings) -> Self {
132        self.stock_strings = stock_strings;
133        self
134    }
135
136    /// Sets the password to unlock the database.
137    /// Deprecated 2025-11:
138    /// - Db encryption does nothing with blobs, so fs/disk encryption is recommended.
139    /// - Isolation from other apps is needed anyway.
140    ///
141    /// If an encrypted database is used it must be opened with a password.  Setting a
142    /// password on a new database will enable encryption.
143    #[deprecated(since = "TBD")]
144    pub fn with_password(mut self, password: String) -> Self {
145        self.password = Some(password);
146        self
147    }
148
149    /// Sets push subscriber.
150    pub(crate) fn with_push_subscriber(mut self, push_subscriber: PushSubscriber) -> Self {
151        self.push_subscriber = Some(push_subscriber);
152        self
153    }
154
155    /// Builds the [`Context`] without opening it.
156    pub async fn build(self) -> Result<Context> {
157        let push_subscriber = self.push_subscriber.unwrap_or_default();
158        let context = Context::new_closed(
159            &self.dbfile,
160            self.id,
161            self.events,
162            self.stock_strings,
163            push_subscriber,
164        )
165        .await?;
166        Ok(context)
167    }
168
169    /// Builds the [`Context`] and opens it.
170    ///
171    /// Returns error if context cannot be opened.
172    pub async fn open(self) -> Result<Context> {
173        let password = self.password.clone().unwrap_or_default();
174        let context = self.build().await?;
175        match context.open(password).await? {
176            true => Ok(context),
177            false => bail!("database could not be decrypted, incorrect or missing password"),
178        }
179    }
180}
181
/// The context for a single DeltaChat account.
///
/// This contains all the state for a single DeltaChat account, including background tasks
/// running in Tokio to operate the account.  The [`Context`] can be cheaply cloned.
///
/// Each context, and thus each account, must be associated with a directory where all the
/// state is kept.  This state is also preserved between restarts.
///
/// To use multiple accounts it is best to look at the [accounts
/// manager][crate::accounts::Accounts] which handles storing multiple accounts in a single
/// directory structure and handles loading them all concurrently.
#[derive(Clone, Debug)]
pub struct Context {
    /// Shared state of the context; cloning a [`Context`] clones only this `Arc`.
    pub(crate) inner: Arc<InnerContext>,
}
197
198impl Deref for Context {
199    type Target = InnerContext;
200
201    fn deref(&self) -> &Self::Target {
202        &self.inner
203    }
204}
205
/// A weak reference to a [`Context`].
///
/// Can be used to obtain a [`Context`] via [`WeakContext::upgrade`]. An existing weak
/// reference does not prevent the corresponding [`Context`] from being dropped.
#[derive(Clone, Debug)]
pub(crate) struct WeakContext {
    /// Weak counterpart of [`Context::inner`].
    inner: Weak<InnerContext>,
}
213
214impl WeakContext {
215    /// Returns the [`Context`] if it is still available.
216    pub(crate) fn upgrade(&self) -> Result<Context> {
217        let inner = self
218            .inner
219            .upgrade()
220            .ok_or_else(|| anyhow::anyhow!("Inner struct has been dropped"))?;
221        Ok(Context { inner })
222    }
223}
224
/// Actual context state, shared between all clones of a [`Context`] through an `Arc`.
///
/// Unlike [`Context`], this type does not implement `Clone`.
#[derive(Debug)]
pub struct InnerContext {
    /// Blob directory path
    pub(crate) blobdir: PathBuf,
    /// Database handle, created from the database file path.
    pub(crate) sql: Sql,
    /// Smeared timestamp state.
    pub(crate) smeared_timestamp: SmearedTimestamp,
    /// The global "ongoing" process state.
    ///
    /// This is a global mutex-like state for operations which should be modal in the
    /// clients.
    running_state: RwLock<RunningState>,
    /// Mutex to ensure that only a single oauth2 operation is running at a time.
    pub(crate) oauth2_mutex: Mutex<()>,
    /// Mutex to prevent a race condition when a "your pw is wrong" warning is sent,
    /// which would otherwise result in multiple messages being sent.
    pub(crate) wrong_pw_warning_mutex: Mutex<()>,
    /// Mutex to prevent running housekeeping from multiple threads at once.
    pub(crate) housekeeping_mutex: Mutex<()>,

    /// Mutex to prevent multiple IMAP loops from fetching the messages at once.
    ///
    /// Without this mutex IMAP loops may waste traffic downloading the same message
    /// from multiple IMAP servers and create multiple copies of the same message
    /// in the database if the check for duplicates and creating a message
    /// happens in separate database transactions.
    pub(crate) fetch_msgs_mutex: Mutex<()>,

    /// Stock strings passed in when the context was created.
    pub(crate) translated_stockstrings: StockStrings,
    /// Event channel; [`Context::emit_event`] emits events through it.
    pub(crate) events: Events,

    /// State of the IO scheduler, see [`Context::start_io`] and [`Context::stop_io`].
    pub(crate) scheduler: SchedulerState,
    /// Rate limiter; see its initialisation in [`Context::with_blobdir`].
    pub(crate) ratelimit: RwLock<Ratelimit>,

    /// Recently loaded quota information for each transport, if any.
    /// If loading the quota was never attempted for a transport,
    /// that transport has no entry in the BTreeMap.
    pub(crate) quota: RwLock<BTreeMap<u32, QuotaInfo>>,

    /// Notify about new messages.
    ///
    /// This causes [`Context::wait_next_msgs`] to wake up.
    pub(crate) new_msgs_notify: Notify,

    /// Server ID response if ID capability is supported
    /// and the server returned non-NIL on the inbox connection.
    /// <https://datatracker.ietf.org/doc/html/rfc2971>
    pub(crate) server_id: RwLock<Option<HashMap<String, String>>>,

    /// IMAP METADATA.
    pub(crate) metadata: RwLock<Option<ServerMetadata>>,

    /// ID for this `Context` in the current process.
    ///
    /// This allows for multiple `Context`s open in a single process where each context can
    /// be identified by this ID.
    pub(crate) id: u32,

    /// Time at which this context was created.
    creation_time: tools::Time,

    /// The text of the last error logged and emitted as an event.
    /// If the ui wants to display an error after a failure,
    /// `last_error` should be used to avoid races with the event thread.
    pub(crate) last_error: parking_lot::RwLock<String>,

    /// It's not possible to emit migration errors as an event,
    /// because at the time of the migration, there is no event emitter yet.
    /// So, this holds the error that happened during migration, if any.
    /// This is necessary for the possibly-fallible PGP migration,
    /// which happened 2025-05, and can be removed a few releases later.
    pub(crate) migration_error: parking_lot::RwLock<Option<String>>,

    /// If debug logging is enabled, this contains all necessary information
    ///
    /// Standard RwLock instead of [`tokio::sync::RwLock`] is used
    /// because the lock is used from synchronous [`Context::emit_event`].
    pub(crate) debug_logging: std::sync::RwLock<Option<DebugLogging>>,

    /// Push subscriber to store device token
    /// and register for heartbeat notifications.
    pub(crate) push_subscriber: PushSubscriber,

    /// True if account has subscribed to push notifications via IMAP.
    pub(crate) push_subscribed: AtomicBool,

    /// TLS session resumption cache.
    pub(crate) tls_session_store: TlsSessionStore,

    /// Store for TLS SPKI hashes.
    ///
    /// Used to remember public keys
    /// of TLS certificates to accept them
    /// even after they expire.
    pub(crate) spki_hash_store: SpkiHashStore,

    /// Iroh for realtime peer channels.
    pub(crate) iroh: Arc<RwLock<Option<Iroh>>>,

    /// The own fingerprint, if it was computed already.
    /// tokio::sync::OnceCell would be possible to use, but overkill for our usecase;
    /// the standard library's OnceLock is enough, and it's a lot smaller in memory.
    pub(crate) self_fingerprint: OnceLock<String>,

    /// OpenPGP certificate aka Transferable Public Key.
    ///
    /// It is generated on first use from the secret key stored in the database.
    ///
    /// Mutex is also held while generating the key to avoid generating the key twice.
    pub(crate) self_public_key: Mutex<Option<SignedPublicKey>>,

    /// `Connectivity` values for mailboxes, unordered. Used to compute the aggregate connectivity,
    /// see [`Context::get_connectivity()`].
    pub(crate) connectivities: parking_lot::Mutex<Vec<ConnectivityStore>>,

    #[expect(clippy::type_complexity)]
    /// Transforms the root of the cryptographic payload before encryption.
    ///
    /// `None` by default, i.e. no hook is installed when the context is created.
    pub(crate) pre_encrypt_mime_hook: parking_lot::Mutex<
        Option<
            for<'a> fn(
                &Context,
                mail_builder::mime::MimePart<'a>,
            ) -> mail_builder::mime::MimePart<'a>,
        >,
    >,
}
348
/// The state of ongoing process.
///
/// Transitions visible in this file: `Stopped` -> `Running` in `Context::alloc_ongoing`,
/// `Running` -> `ShallStop` in `Context::stop_ongoing`, and any state -> `Stopped`
/// in `Context::free_ongoing`.
#[derive(Debug, Default)]
enum RunningState {
    /// Ongoing process is allocated.
    ///
    /// `cancel_sender` is the sending half of the cancel channel whose receiving half
    /// was returned when the ongoing process was allocated.
    Running { cancel_sender: Sender<()> },

    /// Cancel signal has been sent, waiting for ongoing process to be freed.
    ///
    /// `request` is the time at which the stop was requested.
    ShallStop { request: tools::Time },

    /// There is no ongoing process, a new one can be allocated.
    #[default]
    Stopped,
}
362
363/// Return some info about deltachat-core
364///
365/// This contains information mostly about the library itself, the
366/// actual keys and their values which will be present are not
367/// guaranteed.  Calling [Context::get_info] also includes information
368/// about the context on top of the information here.
369#[expect(clippy::arithmetic_side_effects)]
370pub fn get_info() -> BTreeMap<&'static str, String> {
371    let mut res = BTreeMap::new();
372
373    #[cfg(debug_assertions)]
374    res.insert(
375        "debug_assertions",
376        "On - DO NOT RELEASE THIS BUILD".to_string(),
377    );
378    #[cfg(not(debug_assertions))]
379    res.insert("debug_assertions", "Off".to_string());
380
381    res.insert("deltachat_core_version", format!("v{DC_VERSION_STR}"));
382    res.insert("sqlite_version", rusqlite::version().to_string());
383    res.insert("arch", (std::mem::size_of::<usize>() * 8).to_string());
384    res.insert("num_cpus", num_cpus::get().to_string());
385    res.insert("level", "awesome".into());
386    res
387}
388
389impl Context {
390    /// Creates new context and opens the database.
391    pub async fn new(
392        dbfile: &Path,
393        id: u32,
394        events: Events,
395        stock_strings: StockStrings,
396    ) -> Result<Context> {
397        let context =
398            Self::new_closed(dbfile, id, events, stock_strings, Default::default()).await?;
399
400        // Open the database if is not encrypted.
401        if context.check_passphrase("".to_string()).await? {
402            context.sql.open(&context, "".to_string()).await?;
403        }
404        Ok(context)
405    }
406
407    /// Creates new context without opening the database.
408    pub async fn new_closed(
409        dbfile: &Path,
410        id: u32,
411        events: Events,
412        stockstrings: StockStrings,
413        push_subscriber: PushSubscriber,
414    ) -> Result<Context> {
415        let mut blob_fname = OsString::new();
416        blob_fname.push(dbfile.file_name().unwrap_or_default());
417        blob_fname.push("-blobs");
418        let blobdir = dbfile.with_file_name(blob_fname);
419        if !blobdir.exists() {
420            tokio::fs::create_dir_all(&blobdir).await?;
421        }
422        let context = Context::with_blobdir(
423            dbfile.into(),
424            blobdir,
425            id,
426            events,
427            stockstrings,
428            push_subscriber,
429        )?;
430        Ok(context)
431    }
432
433    /// Returns a weak reference to this [`Context`].
434    pub(crate) fn get_weak_context(&self) -> WeakContext {
435        WeakContext {
436            inner: Arc::downgrade(&self.inner),
437        }
438    }
439
440    /// Opens the database with the given passphrase.
441    /// NB: Db encryption is deprecated, so `passphrase` should be empty normally. See
442    /// [`ContextBuilder::with_password()`] for reasoning.
443    ///
444    /// Returns true if passphrase is correct, false is passphrase is not correct. Fails on other
445    /// errors.
446    #[deprecated(since = "TBD")]
447    pub async fn open(&self, passphrase: String) -> Result<bool> {
448        if self.sql.check_passphrase(passphrase.clone()).await? {
449            self.sql.open(self, passphrase).await?;
450            Ok(true)
451        } else {
452            Ok(false)
453        }
454    }
455
456    /// Changes encrypted database passphrase.
457    /// Deprecated 2025-11, see [`ContextBuilder::with_password()`] for reasoning.
458    pub async fn change_passphrase(&self, passphrase: String) -> Result<()> {
459        self.sql.change_passphrase(passphrase).await?;
460        Ok(())
461    }
462
463    /// Returns true if database is open.
464    pub async fn is_open(&self) -> bool {
465        self.sql.is_open().await
466    }
467
468    /// Tests the database passphrase.
469    ///
470    /// Returns true if passphrase is correct.
471    ///
472    /// Fails if database is already open.
473    pub(crate) async fn check_passphrase(&self, passphrase: String) -> Result<bool> {
474        self.sql.check_passphrase(passphrase).await
475    }
476
477    pub(crate) fn with_blobdir(
478        dbfile: PathBuf,
479        blobdir: PathBuf,
480        id: u32,
481        events: Events,
482        stockstrings: StockStrings,
483        push_subscriber: PushSubscriber,
484    ) -> Result<Context> {
485        ensure!(
486            blobdir.is_dir(),
487            "Blobdir does not exist: {}",
488            blobdir.display()
489        );
490
491        let new_msgs_notify = Notify::new();
492        // Notify once immediately to allow processing old messages
493        // without starting I/O.
494        new_msgs_notify.notify_one();
495
496        let inner = InnerContext {
497            id,
498            blobdir,
499            running_state: RwLock::new(Default::default()),
500            sql: Sql::new(dbfile),
501            smeared_timestamp: SmearedTimestamp::new(),
502            oauth2_mutex: Mutex::new(()),
503            wrong_pw_warning_mutex: Mutex::new(()),
504            housekeeping_mutex: Mutex::new(()),
505            fetch_msgs_mutex: Mutex::new(()),
506            translated_stockstrings: stockstrings,
507            events,
508            scheduler: SchedulerState::new(),
509            ratelimit: RwLock::new(Ratelimit::new(Duration::new(3, 0), 3.0)), // Allow at least 1 message every second + a burst of 3.
510            quota: RwLock::new(BTreeMap::new()),
511            new_msgs_notify,
512            server_id: RwLock::new(None),
513            metadata: RwLock::new(None),
514            creation_time: tools::Time::now(),
515            last_error: parking_lot::RwLock::new("".to_string()),
516            migration_error: parking_lot::RwLock::new(None),
517            debug_logging: std::sync::RwLock::new(None),
518            push_subscriber,
519            push_subscribed: AtomicBool::new(false),
520            tls_session_store: TlsSessionStore::new(),
521            spki_hash_store: SpkiHashStore::new(),
522            iroh: Arc::new(RwLock::new(None)),
523            self_fingerprint: OnceLock::new(),
524            self_public_key: Mutex::new(None),
525            connectivities: parking_lot::Mutex::new(Vec::new()),
526            pre_encrypt_mime_hook: None.into(),
527        };
528
529        let ctx = Context {
530            inner: Arc::new(inner),
531        };
532
533        Ok(ctx)
534    }
535
536    /// Starts the IO scheduler.
537    pub async fn start_io(&self) {
538        if !self.is_configured().await.unwrap_or_default() {
539            warn!(self, "can not start io on a context that is not configured");
540            return;
541        }
542
543        // The next line is mainly for iOS:
544        // iOS starts a separate process for receiving notifications and if the user concurrently
545        // starts the app, the UI process opens the database but waits with calling start_io()
546        // until the notifications process finishes.
547        // Now, some configs may have changed, so, we need to invalidate the cache.
548        self.sql.config_cache.write().await.clear();
549
550        self.scheduler.start(self).await;
551    }
552
553    /// Stops the IO scheduler.
554    pub async fn stop_io(&self) {
555        self.scheduler.stop(self).await;
556        if let Some(iroh) = self.iroh.write().await.take() {
557            // Close all QUIC connections.
558
559            // Spawn into a separate task,
560            // because Iroh calls `wait_idle()` internally
561            // and it may take time, especially if the network
562            // has become unavailable.
563            tokio::spawn(async move {
564                // We do not log the error because we do not want the task
565                // to hold the reference to Context.
566                let _ = tokio::time::timeout(Duration::from_secs(60), iroh.close()).await;
567            });
568        }
569    }
570
571    /// Restarts the IO scheduler if it was running before
572    /// when it is not running this is an no-op
573    pub async fn restart_io_if_running(&self) {
574        self.scheduler.restart(self).await;
575    }
576
577    /// Indicate that the network likely has come back.
578    pub async fn maybe_network(&self) {
579        if let Some(ref iroh) = *self.iroh.read().await {
580            iroh.network_change().await;
581        }
582        self.scheduler.maybe_network().await;
583    }
584
585    /// Returns true if an account is on a chatmail server.
586    pub async fn is_chatmail(&self) -> Result<bool> {
587        self.get_config_bool(Config::IsChatmail).await
588    }
589
590    /// Returns maximum number of recipients the provider allows to send a single email to.
591    pub(crate) async fn get_max_smtp_rcpt_to(&self) -> Result<usize> {
592        let is_chatmail = self.is_chatmail().await?;
593        let val = self
594            .get_configured_provider()
595            .await?
596            .and_then(|provider| provider.opt.max_smtp_rcpt_to)
597            .map_or_else(
598                || match is_chatmail {
599                    true => constants::DEFAULT_CHATMAIL_MAX_SMTP_RCPT_TO,
600                    false => constants::DEFAULT_MAX_SMTP_RCPT_TO,
601                },
602                usize::from,
603            );
604        Ok(val)
605    }
606
607    /// Does a single round of fetching from IMAP and returns.
608    ///
609    /// Can be used even if I/O is currently stopped.
610    /// If I/O is currently stopped, starts a new IMAP connection
611    /// and fetches from Inbox and DeltaChat folders.
612    pub async fn background_fetch(&self) -> Result<()> {
613        if !(self.is_configured().await?) {
614            return Ok(());
615        }
616
617        let address = self.get_primary_self_addr().await?;
618        let time_start = tools::Time::now();
619        info!(self, "background_fetch started fetching {address}.");
620
621        if self.scheduler.is_running().await {
622            self.scheduler.maybe_network().await;
623            self.wait_for_all_work_done().await;
624        } else {
625            // Pause the scheduler to ensure another connection does not start
626            // while we are fetching on a dedicated connection.
627            let _pause_guard = self.scheduler.pause(self).await?;
628
629            // Start a new dedicated connection.
630            let mut connection = Imap::new_configured(self, channel::bounded(1).1).await?;
631            let mut session = connection.prepare(self).await?;
632
633            // Fetch IMAP folders.
634            let folder = connection.folder.clone();
635            connection
636                .fetch_move_delete(self, &mut session, &folder)
637                .await?;
638
639            // Update quota (to send warning if full) - but only check it once in a while.
640            // note: For now this only checks quota of primary transport,
641            // because background check only checks primary transport at the moment
642            if self
643                .quota_needs_update(
644                    session.transport_id(),
645                    DC_BACKGROUND_FETCH_QUOTA_CHECK_RATELIMIT,
646                )
647                .await
648                && let Err(err) = self.update_recent_quota(&mut session, &folder).await
649            {
650                warn!(self, "Failed to update quota: {err:#}.");
651            }
652        }
653
654        info!(
655            self,
656            "background_fetch done for {address} took {:?}.",
657            time_elapsed(&time_start),
658        );
659
660        Ok(())
661    }
662
663    /// Returns a reference to the underlying SQL instance.
664    ///
665    /// Warning: this is only here for testing, not part of the public API.
666    #[cfg(feature = "internals")]
667    pub fn sql(&self) -> &Sql {
668        &self.inner.sql
669    }
670
671    /// Returns database file path.
672    pub fn get_dbfile(&self) -> &Path {
673        self.sql.dbfile.as_path()
674    }
675
676    /// Returns blob directory path.
677    pub fn get_blobdir(&self) -> &Path {
678        self.blobdir.as_path()
679    }
680
681    /// Emits a single event.
682    pub fn emit_event(&self, event: EventType) {
683        {
684            let lock = self.debug_logging.read().expect("RwLock is poisoned");
685            if let Some(debug_logging) = &*lock {
686                debug_logging.log_event(event.clone());
687            }
688        }
689        self.events.emit(Event {
690            id: self.id,
691            typ: event,
692        });
693    }
694
695    /// Emits a generic MsgsChanged event (without chat or message id)
696    pub fn emit_msgs_changed_without_ids(&self) {
697        self.emit_event(EventType::MsgsChanged {
698            chat_id: ChatId::new(0),
699            msg_id: MsgId::new(0),
700        });
701    }
702
703    /// Emits a MsgsChanged event with specified chat and message ids
704    ///
705    /// If IDs are unset, [`Self::emit_msgs_changed_without_ids`]
706    /// or [`Self::emit_msgs_changed_without_msg_id`] should be used
707    /// instead of this function.
708    pub fn emit_msgs_changed(&self, chat_id: ChatId, msg_id: MsgId) {
709        logged_debug_assert!(
710            self,
711            !chat_id.is_unset(),
712            "emit_msgs_changed: chat_id is unset."
713        );
714        logged_debug_assert!(
715            self,
716            !msg_id.is_unset(),
717            "emit_msgs_changed: msg_id is unset."
718        );
719
720        self.emit_event(EventType::MsgsChanged { chat_id, msg_id });
721        chatlist_events::emit_chatlist_changed(self);
722        chatlist_events::emit_chatlist_item_changed(self, chat_id);
723    }
724
725    /// Emits a MsgsChanged event with specified chat and without message id.
726    pub fn emit_msgs_changed_without_msg_id(&self, chat_id: ChatId) {
727        logged_debug_assert!(
728            self,
729            !chat_id.is_unset(),
730            "emit_msgs_changed_without_msg_id: chat_id is unset."
731        );
732
733        self.emit_event(EventType::MsgsChanged {
734            chat_id,
735            msg_id: MsgId::new(0),
736        });
737        chatlist_events::emit_chatlist_changed(self);
738        chatlist_events::emit_chatlist_item_changed(self, chat_id);
739    }
740
741    /// Emits an IncomingMsg event with specified chat and message ids
742    pub fn emit_incoming_msg(&self, chat_id: ChatId, msg_id: MsgId) {
743        debug_assert!(!chat_id.is_unset());
744        debug_assert!(!msg_id.is_unset());
745
746        self.emit_event(EventType::IncomingMsg { chat_id, msg_id });
747        chatlist_events::emit_chatlist_changed(self);
748        chatlist_events::emit_chatlist_item_changed(self, chat_id);
749    }
750
751    /// Emits an LocationChanged event and a WebxdcStatusUpdate in case there is a maps integration
752    pub async fn emit_location_changed(&self, contact_id: Option<ContactId>) -> Result<()> {
753        self.emit_event(EventType::LocationChanged(contact_id));
754
755        if let Some(msg_id) = self
756            .get_config_parsed::<u32>(Config::WebxdcIntegration)
757            .await?
758        {
759            self.emit_event(EventType::WebxdcStatusUpdate {
760                msg_id: MsgId::new(msg_id),
761                status_update_serial: Default::default(),
762            })
763        }
764
765        Ok(())
766    }
767
768    /// Returns a receiver for emitted events.
769    ///
770    /// Multiple emitters can be created, but note that in this case each emitted event will
771    /// only be received by one of the emitters, not by all of them.
772    pub fn get_event_emitter(&self) -> EventEmitter {
773        self.events.get_emitter()
774    }
775
776    /// Get the ID of this context.
777    pub fn get_id(&self) -> u32 {
778        self.id
779    }
780
781    // Ongoing process allocation/free/check
782
783    /// Tries to acquire the global UI "ongoing" mutex.
784    ///
785    /// This is for modal operations during which no other user actions are allowed.  Only
786    /// one such operation is allowed at any given time.
787    ///
788    /// The return value is a cancel token, which will release the ongoing mutex when
789    /// dropped.
790    pub(crate) async fn alloc_ongoing(&self) -> Result<Receiver<()>> {
791        let mut s = self.running_state.write().await;
792        ensure!(
793            matches!(*s, RunningState::Stopped),
794            "There is already another ongoing process running."
795        );
796
797        let (sender, receiver) = channel::bounded(1);
798        *s = RunningState::Running {
799            cancel_sender: sender,
800        };
801
802        Ok(receiver)
803    }
804
805    pub(crate) async fn free_ongoing(&self) {
806        let mut s = self.running_state.write().await;
807        if let RunningState::ShallStop { request } = *s {
808            info!(self, "Ongoing stopped in {:?}", time_elapsed(&request));
809        }
810        *s = RunningState::Stopped;
811    }
812
813    /// Signal an ongoing process to stop.
814    pub async fn stop_ongoing(&self) {
815        let mut s = self.running_state.write().await;
816        match &*s {
817            RunningState::Running { cancel_sender } => {
818                if let Err(err) = cancel_sender.send(()).await {
819                    warn!(self, "could not cancel ongoing: {:#}", err);
820                }
821                info!(self, "Signaling the ongoing process to stop ASAP.",);
822                *s = RunningState::ShallStop {
823                    request: tools::Time::now(),
824                };
825            }
826            RunningState::ShallStop { .. } | RunningState::Stopped => {
827                info!(self, "No ongoing process to stop.",);
828            }
829        }
830    }
831
832    #[allow(unused)]
833    pub(crate) async fn shall_stop_ongoing(&self) -> bool {
834        match &*self.running_state.read().await {
835            RunningState::Running { .. } => false,
836            RunningState::ShallStop { .. } | RunningState::Stopped => true,
837        }
838    }
839
840    /*******************************************************************************
841     * UI chat/message related API
842     ******************************************************************************/
843
    /// Returns information about the context as key-value pairs.
    ///
    /// The map starts from the entries returned by the free function `get_info()` and is
    /// extended with values specific to this context: chat, message and contact counts,
    /// database and blob directory paths, transport settings, key counts and the self
    /// fingerprint, a number of configuration values, and the uptime measured from
    /// `creation_time`.
    ///
    /// # Errors
    ///
    /// Returns an error as soon as one of the `?`-propagated configuration or database
    /// lookups fails.  A failure to obtain the self fingerprint is not an error; it is
    /// reported inside the `fingerprint` entry instead.
    pub async fn get_info(&self) -> Result<BTreeMap<&'static str, String>> {
        let all_self_addrs = self.get_all_self_addrs().await?.join(", ");
        // One "<transport_id>: <param>" entry per configured transport.
        let all_transports: Vec<String> = ConfiguredLoginParam::load_all(self)
            .await?
            .into_iter()
            .map(|(transport_id, param)| format!("{transport_id}: {param}"))
            .collect();
        let all_transports = if all_transports.is_empty() {
            "Not configured".to_string()
        } else {
            all_transports.join(",")
        };
        let chats = get_chat_cnt(self).await?;
        // These two counters are awaited without `?`, so they cannot fail this function.
        let unblocked_msgs = message::get_unblocked_msg_cnt(self).await;
        let request_msgs = message::get_request_msg_cnt(self).await;
        let contacts = Contact::get_real_cnt(self).await?;
        let proxy_enabled = self.get_config_int(Config::ProxyEnabled).await?;
        let dbversion = self
            .sql
            .get_raw_config_int("dbversion")
            .await?
            .unwrap_or_default();
        // Falls back to "unknown" if the pragma query yields no value.
        let journal_mode = self
            .sql
            .query_get_value("PRAGMA journal_mode;", ())
            .await?
            .unwrap_or_else(|| "unknown".to_string());
        let mdns_enabled = self.get_config_int(Config::MdnsEnabled).await?;
        let bcc_self = self.get_config_int(Config::BccSelf).await?;
        let sync_msgs = self.get_config_int(Config::SyncMsgs).await?;
        let disable_idle = self.get_config_bool(Config::DisableIdle).await?;

        let prv_key_cnt = self.sql.count("SELECT COUNT(*) FROM keypairs;", ()).await?;

        let pub_key_cnt = self
            .sql
            .count("SELECT COUNT(*) FROM public_keys;", ())
            .await?;
        // A fingerprint failure is rendered into the value rather than propagated.
        let fingerprint_str = match self_fingerprint(self).await {
            Ok(fp) => fp.to_string(),
            Err(err) => format!("<key failure: {err}>"),
        };

        // This calls the free function `get_info()`, not this method.
        // NOTE(review): presumably it supplies the context-independent base entries --
        // confirm against its definition.
        let mut res = get_info();

        // Insert the context-specific values.
        res.insert("bot", self.get_config_int(Config::Bot).await?.to_string());
        res.insert("number_of_chats", chats.to_string());
        res.insert("number_of_chat_messages", unblocked_msgs.to_string());
        res.insert("messages_in_contact_requests", request_msgs.to_string());
        res.insert("number_of_contacts", contacts.to_string());
        // Despite the key name, this holds the path returned by `get_dbfile()`.
        res.insert("database_dir", self.get_dbfile().display().to_string());
        res.insert("database_version", dbversion.to_string());
        // `is_encrypted()` yielding `None` is reported as "closed".
        res.insert(
            "database_encrypted",
            self.sql
                .is_encrypted()
                .await
                .map_or_else(|| "closed".to_string(), |b| b.to_string()),
        );
        res.insert("journal_mode", journal_mode);
        res.insert("blobdir", self.get_blobdir().display().to_string());
        res.insert(
            "selfavatar",
            self.get_config(Config::Selfavatar)
                .await?
                .unwrap_or_else(|| "<unset>".to_string()),
        );
        res.insert("proxy_enabled", proxy_enabled.to_string());
        res.insert("used_transport_settings", all_transports);

        // Only present if a server ID has been stored in `self.server_id`.
        if let Some(server_id) = &*self.server_id.read().await {
            res.insert("imap_server_id", format!("{server_id:?}"));
        }

        res.insert("is_chatmail", self.is_chatmail().await?.to_string());
        res.insert(
            "fix_is_chatmail",
            self.get_config_bool(Config::FixIsChatmail)
                .await?
                .to_string(),
        );
        res.insert(
            "is_muted",
            self.get_config_bool(Config::IsMuted).await?.to_string(),
        );
        res.insert(
            "private_tag",
            self.get_config(Config::PrivateTag)
                .await?
                .unwrap_or_else(|| "<unset>".to_string()),
        );

        // Server metadata entries are optional: each one is added only if it is set.
        if let Some(metadata) = &*self.metadata.read().await {
            if let Some(comment) = &metadata.comment {
                res.insert("imap_server_comment", format!("{comment:?}"));
            }

            if let Some(admin) = &metadata.admin {
                res.insert("imap_server_admin", format!("{admin:?}"));
            }
        }

        res.insert("all_self_addrs", all_self_addrs);
        res.insert(
            "who_can_call_me",
            self.get_config_int(Config::WhoCanCallMe).await?.to_string(),
        );
        res.insert(
            "download_limit",
            self.get_config_int(Config::DownloadLimit)
                .await?
                .to_string(),
        );
        res.insert("mdns_enabled", mdns_enabled.to_string());
        res.insert("bcc_self", bcc_self.to_string());
        res.insert("sync_msgs", sync_msgs.to_string());
        res.insert("disable_idle", disable_idle.to_string());
        res.insert("private_key_count", prv_key_cnt.to_string());
        res.insert("public_key_count", pub_key_cnt.to_string());
        res.insert("fingerprint", fingerprint_str);
        res.insert(
            "media_quality",
            self.get_config_int(Config::MediaQuality).await?.to_string(),
        );
        res.insert(
            "delete_device_after",
            self.get_config_int(Config::DeleteDeviceAfter)
                .await?
                .to_string(),
        );
        res.insert(
            "delete_server_after",
            self.get_config_int(Config::DeleteServerAfter)
                .await?
                .to_string(),
        );
        res.insert(
            "last_housekeeping",
            self.get_config_int(Config::LastHousekeeping)
                .await?
                .to_string(),
        );
        res.insert(
            "last_cant_decrypt_outgoing_msgs",
            self.get_config_int(Config::LastCantDecryptOutgoingMsgs)
                .await?
                .to_string(),
        );
        res.insert(
            "sign_unencrypted",
            self.get_config_int(Config::SignUnencrypted)
                .await?
                .to_string(),
        );
        res.insert(
            "debug_logging",
            self.get_config_int(Config::DebugLogging).await?.to_string(),
        );
        res.insert(
            "last_msg_id",
            self.get_config_int(Config::LastMsgId).await?.to_string(),
        );
        res.insert(
            "gossip_period",
            self.get_config_int(Config::GossipPeriod).await?.to_string(),
        );
        res.insert(
            "webxdc_realtime_enabled",
            self.get_config_bool(Config::WebxdcRealtimeEnabled)
                .await?
                .to_string(),
        );
        res.insert(
            "donation_request_next_check",
            self.get_config_i64(Config::DonationRequestNextCheck)
                .await?
                .to_string(),
        );
        // Raw config values below default to an empty string when unset.
        res.insert(
            "first_key_contacts_msg_id",
            self.sql
                .get_raw_config("first_key_contacts_msg_id")
                .await?
                .unwrap_or_default(),
        );
        res.insert(
            "stats_id",
            self.get_config(Config::StatsId)
                .await?
                .unwrap_or_else(|| "<unset>".to_string()),
        );
        res.insert(
            "stats_sending",
            stats::should_send_stats(self).await?.to_string(),
        );
        res.insert(
            "stats_last_sent",
            self.get_config_i64(Config::StatsLastSent)
                .await?
                .to_string(),
        );
        res.insert(
            "test_hooks",
            self.sql
                .get_raw_config("test_hooks")
                .await?
                .unwrap_or_default(),
        );
        res.insert(
            "std_header_protection_composing",
            self.sql
                .get_raw_config("std_header_protection_composing")
                .await?
                .unwrap_or_default(),
        );
        res.insert(
            "team_profile",
            self.get_config_bool(Config::TeamProfile).await?.to_string(),
        );

        // Uptime is measured from the context's `creation_time`.
        let elapsed = time_elapsed(&self.creation_time);
        res.insert("uptime", duration_to_str(elapsed));

        Ok(res)
    }
1071
1072    /// Get a list of fresh, unmuted messages in unblocked chats.
1073    ///
1074    /// The list starts with the most recent message
1075    /// and is typically used to show notifications.
1076    /// Moreover, the number of returned messages
1077    /// can be used for a badge counter on the app icon.
1078    pub async fn get_fresh_msgs(&self) -> Result<Vec<MsgId>> {
1079        let list = self
1080            .sql
1081            .query_map_vec(
1082                "SELECT m.id
1083FROM msgs m
1084LEFT JOIN contacts ct
1085    ON m.from_id=ct.id
1086LEFT JOIN chats c
1087    ON m.chat_id=c.id
1088WHERE m.state=?
1089AND m.hidden=0
1090AND m.chat_id>9
1091AND ct.blocked=0
1092AND c.blocked=0
1093AND NOT(c.muted_until=-1 OR c.muted_until>?)
1094ORDER BY m.timestamp DESC,m.id DESC",
1095                (MessageState::InFresh, time()),
1096                |row| {
1097                    let msg_id: MsgId = row.get(0)?;
1098                    Ok(msg_id)
1099                },
1100            )
1101            .await?;
1102        Ok(list)
1103    }
1104
1105    /// (deprecated) Returns a list of messages with database ID higher than requested.
1106    ///
1107    /// Blocked contacts and chats are excluded,
1108    /// but self-sent messages and contact requests are included in the results.
1109    ///
1110    /// Deprecated 2026-04: This returns the message's id as soon as the first part arrives,
1111    /// even if it is not fully downloaded yet.
1112    /// The bot needs to wait for the message to be fully downloaded.
1113    /// Since this is usually not the desired behavior,
1114    /// bots should instead use the [`EventType::IncomingMsg`]
1115    /// event for getting notified about new messages.
1116    pub async fn get_next_msgs(&self) -> Result<Vec<MsgId>> {
1117        let last_msg_id = match self.get_config(Config::LastMsgId).await? {
1118            Some(s) => MsgId::new(s.parse()?),
1119            None => {
1120                // If `last_msg_id` is not set yet,
1121                // subtract 1 from the last id,
1122                // so a single message is returned and can
1123                // be marked as seen.
1124                self.sql
1125                    .query_row(
1126                        "SELECT IFNULL((SELECT MAX(id) - 1 FROM msgs), 0)",
1127                        (),
1128                        |row| {
1129                            let msg_id: MsgId = row.get(0)?;
1130                            Ok(msg_id)
1131                        },
1132                    )
1133                    .await?
1134            }
1135        };
1136
1137        let list = self
1138            .sql
1139            .query_map_vec(
1140                "SELECT m.id
1141                     FROM msgs m
1142                     LEFT JOIN contacts ct
1143                            ON m.from_id=ct.id
1144                     LEFT JOIN chats c
1145                            ON m.chat_id=c.id
1146                     WHERE m.id>?
1147                       AND m.hidden=0
1148                       AND m.chat_id>9
1149                       AND ct.blocked=0
1150                       AND c.blocked!=1
1151                     ORDER BY m.id ASC",
1152                (
1153                    last_msg_id.to_u32(), // Explicitly convert to u32 because 0 is allowed.
1154                ),
1155                |row| {
1156                    let msg_id: MsgId = row.get(0)?;
1157                    Ok(msg_id)
1158                },
1159            )
1160            .await?;
1161        Ok(list)
1162    }
1163
1164    /// (deprecated) Returns a list of messages with database ID higher than last marked as seen.
1165    ///
1166    /// This function is supposed to be used by bot to request messages
1167    /// that are not processed yet.
1168    ///
1169    /// Waits for notification and returns a result.
1170    /// Note that the result may be empty if the message is deleted
1171    /// shortly after notification or notification is manually triggered
1172    /// to interrupt waiting.
1173    /// Notification may be manually triggered by calling [`Self::stop_io`].
1174    ///
1175    /// Deprecated 2026-04: This returns the message's id as soon as the first part arrives,
1176    /// even if it is not fully downloaded yet.
1177    /// The bot needs to wait for the message to be fully downloaded.
1178    /// Since this is usually not the desired behavior,
1179    /// bots should instead use the #DC_EVENT_INCOMING_MSG / [`EventType::IncomingMsg`]
1180    /// event for getting notified about new messages.
1181    pub async fn wait_next_msgs(&self) -> Result<Vec<MsgId>> {
1182        self.new_msgs_notify.notified().await;
1183        let list = self.get_next_msgs().await?;
1184        Ok(list)
1185    }
1186
1187    /// Searches for messages containing the query string case-insensitively.
1188    ///
1189    /// If `chat_id` is provided this searches only for messages in this chat, if `chat_id`
1190    /// is `None` this searches messages from all chats.
1191    ///
1192    /// NB: Wrt the search in long messages which are shown truncated with the "Show Full Message…"
1193    /// button, we only look at the first several kilobytes. Let's not fix this -- one can send a
1194    /// dictionary in the message that matches any reasonable search request, but the user won't see
1195    /// the match because they should tap on "Show Full Message…" for that. Probably such messages
1196    /// would only clutter search results.
1197    pub async fn search_msgs(&self, chat_id: Option<ChatId>, query: &str) -> Result<Vec<MsgId>> {
1198        let real_query = query.trim().to_lowercase();
1199        if real_query.is_empty() {
1200            return Ok(Vec::new());
1201        }
1202        let str_like_in_text = format!("%{real_query}%");
1203
1204        let list = if let Some(chat_id) = chat_id {
1205            self.sql
1206                .query_map_vec(
1207                    "SELECT m.id AS id
1208                 FROM msgs m
1209                 LEFT JOIN contacts ct
1210                        ON m.from_id=ct.id
1211                 WHERE m.chat_id=?
1212                   AND m.hidden=0
1213                   AND ct.blocked=0
1214                   AND IFNULL(txt_normalized, txt) LIKE ?
1215                 ORDER BY m.timestamp,m.id;",
1216                    (chat_id, str_like_in_text),
1217                    |row| {
1218                        let msg_id: MsgId = row.get("id")?;
1219                        Ok(msg_id)
1220                    },
1221                )
1222                .await?
1223        } else {
1224            // For performance reasons results are sorted only by `id`, that is in the order of
1225            // message reception.
1226            //
1227            // Unlike chat view, sorting by `timestamp` is not necessary but slows down the query by
1228            // ~25% according to benchmarks.
1229            //
1230            // To speed up incremental search, where queries for few characters usually return lots
1231            // of unwanted results that are discarded moments later, we added `LIMIT 1000`.
1232            // According to some tests, this limit speeds up eg. 2 character searches by factor 10.
1233            // The limit is documented and UI may add a hint when getting 1000 results.
1234            self.sql
1235                .query_map_vec(
1236                    "SELECT m.id AS id
1237                 FROM msgs m
1238                 LEFT JOIN contacts ct
1239                        ON m.from_id=ct.id
1240                 LEFT JOIN chats c
1241                        ON m.chat_id=c.id
1242                 WHERE m.chat_id>9
1243                   AND m.hidden=0
1244                   AND c.blocked!=1
1245                   AND ct.blocked=0
1246                   AND IFNULL(txt_normalized, txt) LIKE ?
1247                 ORDER BY m.id DESC LIMIT 1000",
1248                    (str_like_in_text,),
1249                    |row| {
1250                        let msg_id: MsgId = row.get("id")?;
1251                        Ok(msg_id)
1252                    },
1253                )
1254                .await?
1255        };
1256
1257        Ok(list)
1258    }
1259
1260    pub(crate) fn derive_blobdir(dbfile: &Path) -> PathBuf {
1261        let mut blob_fname = OsString::new();
1262        blob_fname.push(dbfile.file_name().unwrap_or_default());
1263        blob_fname.push("-blobs");
1264        dbfile.with_file_name(blob_fname)
1265    }
1266
1267    pub(crate) fn derive_walfile(dbfile: &Path) -> PathBuf {
1268        let mut wal_fname = OsString::new();
1269        wal_fname.push(dbfile.file_name().unwrap_or_default());
1270        wal_fname.push("-wal");
1271        dbfile.with_file_name(wal_fname)
1272    }
1273}
1274
1275#[cfg(test)]
1276mod context_tests;