Skip to main content

deltachat/
imap.rs

1//! # IMAP handling module.
2//!
3//! uses [async-email/async-imap](https://github.com/async-email/async-imap)
4//! to implement connect, fetch, delete functionality with standard IMAP servers.
5
6use std::{
7    cmp::max,
8    cmp::min,
9    collections::{BTreeMap, HashMap},
10    iter::Peekable,
11    mem::take,
12    sync::atomic::Ordering,
13    time::{Duration, UNIX_EPOCH},
14};
15
16use anyhow::{Context as _, Result, bail, ensure, format_err};
17use async_channel::{self, Receiver, Sender};
18use async_imap::types::{Fetch, Flag, Name, NameAttribute, UnsolicitedResponse};
19use futures::{FutureExt as _, TryStreamExt};
20use futures_lite::FutureExt;
21use ratelimit::Ratelimit;
22use url::Url;
23
24use crate::chat::{self, ChatIdBlocked, add_device_msg};
25use crate::config::Config;
26use crate::constants::{Blocked, DC_VERSION_STR};
27use crate::contact::ContactId;
28use crate::context::Context;
29use crate::ensure_and_debug_assert;
30use crate::events::EventType;
31use crate::headerdef::{HeaderDef, HeaderDefMap};
32use crate::log::{LogExt, warn};
33use crate::message::{self, Message};
34use crate::mimeparser;
35use crate::net::proxy::ProxyConfig;
36use crate::net::session::SessionStream;
37use crate::oauth2::get_oauth2_access_token;
38use crate::push::encrypt_device_token;
39use crate::receive_imf::{
40    ReceivedMsg, from_field_to_contact_id, get_prefetch_parent_message, receive_imf_inner,
41};
42use crate::scheduler::connectivity::ConnectivityStore;
43use crate::stock_str;
44use crate::tools::{self, create_id, duration_to_str, time};
45use crate::transport::{
46    ConfiguredLoginParam, ConfiguredServerLoginParam, prioritize_server_login_params,
47};
48use crate::{
49    calls::{UnresolvedIceServer, create_fallback_ice_servers, create_ice_servers_from_metadata},
50    ephemeral::delete_expired_imap_messages,
51};
52
53pub(crate) mod capabilities;
54mod client;
55mod idle;
56pub mod select_folder;
57pub(crate) mod session;
58
59use client::{Client, determine_capabilities};
60use session::Session;
61
62pub(crate) const GENERATED_PREFIX: &str = "GEN_";
63
64const RFC724MID_UID: &str = "(UID BODY.PEEK[HEADER.FIELDS (\
65                             MESSAGE-ID \
66                             X-MICROSOFT-ORIGINAL-MESSAGE-ID\
67                             )])";
68const BODY_FULL: &str = "(FLAGS BODY.PEEK[])";
69
70#[derive(Debug)]
71pub(crate) struct Imap {
72    /// ID of the transport configuration in the `transports` table.
73    ///
74    /// This ID is used to namespace records in the `imap` table.
75    transport_id: u32,
76
77    pub(crate) idle_interrupt_receiver: Receiver<()>,
78
79    /// Email address.
80    pub(crate) addr: String,
81
82    /// Login parameters.
83    lp: Vec<ConfiguredServerLoginParam>,
84
85    /// Password.
86    password: String,
87
88    /// Proxy configuration.
89    proxy_config: Option<ProxyConfig>,
90
91    strict_tls: bool,
92
93    oauth2: bool,
94
95    /// Watched folder.
96    pub(crate) folder: String,
97
98    authentication_failed_once: bool,
99
100    pub(crate) connectivity: ConnectivityStore,
101
102    conn_last_try: tools::Time,
103    conn_backoff_ms: u64,
104
105    /// Rate limit for successful IMAP connections.
106    ///
107    /// This rate limit prevents busy loop in case the server refuses logins
108    /// or in case connection gets dropped over and over due to IMAP bug,
109    /// e.g. the server returning invalid response to SELECT command
110    /// immediately after logging in or returning an error in response to LOGIN command
111    /// due to internal server error.
112    ratelimit: Ratelimit,
113
114    /// IMAP UID resync request sender.
115    pub(crate) resync_request_sender: async_channel::Sender<()>,
116
117    /// IMAP UID resync request receiver.
118    pub(crate) resync_request_receiver: async_channel::Receiver<()>,
119}
120
121#[derive(Debug)]
122struct OAuth2 {
123    user: String,
124    access_token: String,
125}
126
127#[derive(Debug, Default)]
128pub(crate) struct ServerMetadata {
129    /// IMAP METADATA `/shared/comment` as defined in
130    /// <https://www.rfc-editor.org/rfc/rfc5464#section-6.2.1>.
131    pub comment: Option<String>,
132
133    /// IMAP METADATA `/shared/admin` as defined in
134    /// <https://www.rfc-editor.org/rfc/rfc5464#section-6.2.2>.
135    pub admin: Option<String>,
136
137    pub iroh_relay: Option<Url>,
138
139    /// ICE servers for WebRTC calls.
140    pub ice_servers: Vec<UnresolvedIceServer>,
141
142    /// Timestamp when ICE servers are considered
143    /// expired and should be updated.
144    ///
145    /// If ICE servers are about to expire, new TURN credentials
146    /// should be fetched from the server
147    /// to be ready for WebRTC calls.
148    pub ice_servers_expiration_timestamp: i64,
149}
150
151impl async_imap::Authenticator for OAuth2 {
152    type Response = String;
153
154    fn process(&mut self, _data: &[u8]) -> Self::Response {
155        format!(
156            "user={}\x01auth=Bearer {}\x01\x01",
157            self.user, self.access_token
158        )
159    }
160}
161
162#[derive(Debug, Display, PartialEq, Eq, Clone, Copy)]
163pub enum FolderMeaning {
164    Unknown,
165
166    /// Spam folder.
167    Spam,
168    Inbox,
169    Trash,
170
171    /// Virtual folders.
172    ///
173    /// On Gmail there are virtual folders marked as \\All, \\Important and \\Flagged.
174    /// Delta Chat ignores these folders because the same messages can be fetched
175    /// from the real folder and the result of moving and deleting messages via
176    /// virtual folder is unclear.
177    Virtual,
178}
179
180struct UidGrouper<T: Iterator<Item = (i64, u32, String)>> {
181    inner: Peekable<T>,
182}
183
184impl<T, I> From<I> for UidGrouper<T>
185where
186    T: Iterator<Item = (i64, u32, String)>,
187    I: IntoIterator<IntoIter = T>,
188{
189    fn from(inner: I) -> Self {
190        Self {
191            inner: inner.into_iter().peekable(),
192        }
193    }
194}
195
196impl<T: Iterator<Item = (i64, u32, String)>> Iterator for UidGrouper<T> {
197    // Tuple of folder, row IDs, and UID range as a string.
198    type Item = (String, Vec<i64>, String);
199
200    #[expect(clippy::arithmetic_side_effects)]
201    fn next(&mut self) -> Option<Self::Item> {
202        let (_, _, folder) = self.inner.peek().cloned()?;
203
204        let mut uid_set = String::new();
205        let mut rowid_set = Vec::new();
206
207        while uid_set.len() < 1000 {
208            // Construct a new range.
209            if let Some((start_rowid, start_uid, _)) = self
210                .inner
211                .next_if(|(_, _, start_folder)| start_folder == &folder)
212            {
213                rowid_set.push(start_rowid);
214                let mut end_uid = start_uid;
215
216                while let Some((next_rowid, next_uid, _)) =
217                    self.inner.next_if(|(_, next_uid, next_folder)| {
218                        next_folder == &folder && (*next_uid == end_uid + 1 || *next_uid == end_uid)
219                    })
220                {
221                    end_uid = next_uid;
222                    rowid_set.push(next_rowid);
223                }
224
225                let uid_range = UidRange {
226                    start: start_uid,
227                    end: end_uid,
228                };
229                if !uid_set.is_empty() {
230                    uid_set.push(',');
231                }
232                uid_set.push_str(&uid_range.to_string());
233            } else {
234                break;
235            }
236        }
237
238        Some((folder, rowid_set, uid_set))
239    }
240}
241
242impl Imap {
243    /// Creates new disconnected IMAP client using the specific login parameters.
244    pub async fn new(
245        context: &Context,
246        transport_id: u32,
247        param: ConfiguredLoginParam,
248        idle_interrupt_receiver: Receiver<()>,
249    ) -> Result<Self> {
250        let lp = param.imap.clone();
251        let password = param.imap_password.clone();
252        let proxy_config = ProxyConfig::load(context).await?;
253        let addr = &param.addr;
254        let strict_tls = param.strict_tls(proxy_config.is_some());
255        let oauth2 = param.oauth2;
256        let folder = param
257            .imap_folder
258            .clone()
259            .unwrap_or_else(|| "INBOX".to_string());
260        ensure_and_debug_assert!(!folder.is_empty(), "Watched folder name cannot be empty");
261        let (resync_request_sender, resync_request_receiver) = async_channel::bounded(1);
262        Ok(Imap {
263            transport_id,
264            idle_interrupt_receiver,
265            addr: addr.to_string(),
266            lp,
267            password,
268            proxy_config,
269            strict_tls,
270            oauth2,
271            folder,
272            authentication_failed_once: false,
273            connectivity: Default::default(),
274            conn_last_try: UNIX_EPOCH,
275            conn_backoff_ms: 0,
276            // 1 connection per minute + a burst of 2.
277            ratelimit: Ratelimit::new(Duration::new(120, 0), 2.0),
278            resync_request_sender,
279            resync_request_receiver,
280        })
281    }
282
283    /// Creates new disconnected IMAP client using configured parameters.
284    pub async fn new_configured(
285        context: &Context,
286        idle_interrupt_receiver: Receiver<()>,
287    ) -> Result<Self> {
288        let (transport_id, param) = ConfiguredLoginParam::load(context)
289            .await?
290            .context("Not configured")?;
291        let imap = Self::new(context, transport_id, param, idle_interrupt_receiver).await?;
292        Ok(imap)
293    }
294
295    /// Returns transport ID of the IMAP client.
296    pub fn transport_id(&self) -> u32 {
297        self.transport_id
298    }
299
300    /// Connects to IMAP server and returns a new IMAP session.
301    ///
302    /// Calling this function is not enough to perform IMAP operations. Use [`Imap::prepare`]
303    /// instead if you are going to actually use connection rather than trying connection
304    /// parameters.
305    pub(crate) async fn connect(
306        &mut self,
307        context: &Context,
308        configuring: bool,
309    ) -> Result<Session> {
310        let now = tools::Time::now();
311        let until_can_send = max(
312            min(self.conn_last_try, now)
313                .checked_add(Duration::from_millis(self.conn_backoff_ms))
314                .unwrap_or(now),
315            now,
316        )
317        .duration_since(now)?;
318        let ratelimit_duration = max(until_can_send, self.ratelimit.until_can_send());
319        if !ratelimit_duration.is_zero() {
320            warn!(
321                context,
322                "IMAP got rate limited, waiting for {} until can connect.",
323                duration_to_str(ratelimit_duration),
324            );
325            let interrupted = async {
326                tokio::time::sleep(ratelimit_duration).await;
327                false
328            }
329            .race(self.idle_interrupt_receiver.recv().map(|_| true))
330            .await;
331            if interrupted {
332                info!(
333                    context,
334                    "Connecting to IMAP without waiting for ratelimit due to interrupt."
335                );
336            }
337        }
338
339        info!(context, "Connecting to IMAP server.");
340        self.connectivity.set_connecting(context);
341
342        self.conn_last_try = tools::Time::now();
343        const BACKOFF_MIN_MS: u64 = 2000;
344        const BACKOFF_MAX_MS: u64 = 80_000;
345        self.conn_backoff_ms = min(self.conn_backoff_ms, BACKOFF_MAX_MS / 2);
346        self.conn_backoff_ms = self.conn_backoff_ms.saturating_add(rand::random_range(
347            (self.conn_backoff_ms / 2)..=self.conn_backoff_ms,
348        ));
349        self.conn_backoff_ms = max(BACKOFF_MIN_MS, self.conn_backoff_ms);
350
351        let login_params = prioritize_server_login_params(&context.sql, &self.lp, "imap").await?;
352        let mut first_error = None;
353        for lp in login_params {
354            info!(context, "IMAP trying to connect to {}.", &lp.connection);
355            let connection_candidate = lp.connection.clone();
356            let client = match Client::connect(
357                context,
358                self.proxy_config.clone(),
359                self.strict_tls,
360                &connection_candidate,
361            )
362            .await
363            .with_context(|| format!("IMAP failed to connect to {connection_candidate}"))
364            {
365                Ok(client) => client,
366                Err(err) => {
367                    warn!(context, "{err:#}.");
368                    first_error.get_or_insert(err);
369                    continue;
370                }
371            };
372
373            self.conn_backoff_ms = BACKOFF_MIN_MS;
374            self.ratelimit.send();
375
376            let imap_user: &str = lp.user.as_ref();
377            let imap_pw: &str = &self.password;
378
379            let login_res = if self.oauth2 {
380                info!(context, "Logging into IMAP server with OAuth 2.");
381                let addr: &str = self.addr.as_ref();
382
383                let token = get_oauth2_access_token(context, addr, imap_pw, true)
384                    .await?
385                    .context("IMAP could not get OAUTH token")?;
386                let auth = OAuth2 {
387                    user: imap_user.into(),
388                    access_token: token,
389                };
390                client.authenticate("XOAUTH2", auth).await
391            } else {
392                info!(context, "Logging into IMAP server with LOGIN.");
393                client.login(imap_user, imap_pw).await
394            };
395
396            match login_res {
397                Ok(mut session) => {
398                    let capabilities = determine_capabilities(&mut session).await?;
399                    let resync_request_sender = self.resync_request_sender.clone();
400
401                    let session = if capabilities.can_compress {
402                        info!(context, "Enabling IMAP compression.");
403                        let compressed_session = session
404                            .compress(|s| {
405                                let session_stream: Box<dyn SessionStream> = Box::new(s);
406                                session_stream
407                            })
408                            .await
409                            .context("Failed to enable IMAP compression")?;
410                        Session::new(
411                            compressed_session,
412                            capabilities,
413                            resync_request_sender,
414                            self.transport_id,
415                        )
416                    } else {
417                        Session::new(
418                            session,
419                            capabilities,
420                            resync_request_sender,
421                            self.transport_id,
422                        )
423                    };
424
425                    // Store server ID in the context to display in account info.
426                    let mut lock = context.server_id.write().await;
427                    lock.clone_from(&session.capabilities.server_id);
428
429                    self.authentication_failed_once = false;
430                    context.emit_event(EventType::ImapConnected(format!(
431                        "IMAP-LOGIN as {}",
432                        lp.user
433                    )));
434                    self.connectivity.set_preparing(context);
435                    info!(context, "Successfully logged into IMAP server.");
436                    return Ok(session);
437                }
438
439                Err(err) => {
440                    let imap_user = lp.user.to_owned();
441                    let message = stock_str::cannot_login(context, &imap_user);
442
443                    warn!(context, "IMAP failed to login: {err:#}.");
444                    first_error.get_or_insert(format_err!("{message} ({err:#})"));
445
446                    // If it looks like the password is wrong, send a notification:
447                    let _lock = context.wrong_pw_warning_mutex.lock().await;
448                    if err.to_string().to_lowercase().contains("authentication") {
449                        if self.authentication_failed_once
450                            && !configuring
451                            && context.get_config_bool(Config::NotifyAboutWrongPw).await?
452                        {
453                            let mut msg = Message::new_text(message);
454                            if let Err(e) = chat::add_device_msg_with_importance(
455                                context,
456                                None,
457                                Some(&mut msg),
458                                true,
459                            )
460                            .await
461                            {
462                                warn!(context, "Failed to add device message: {e:#}.");
463                            } else {
464                                context
465                                    .set_config_internal(Config::NotifyAboutWrongPw, None)
466                                    .await
467                                    .log_err(context)
468                                    .ok();
469                            }
470                        } else {
471                            self.authentication_failed_once = true;
472                        }
473                    } else {
474                        self.authentication_failed_once = false;
475                    }
476                }
477            }
478        }
479
480        Err(first_error.unwrap_or_else(|| format_err!("No IMAP connection candidates provided")))
481    }
482
483    /// Prepare a new IMAP session.
484    ///
485    /// This creates a new IMAP connection and ensures
486    /// that folders are created and IMAP capabilities are determined.
487    pub(crate) async fn prepare(&mut self, context: &Context) -> Result<Session> {
488        let configuring = false;
489        let session = match self.connect(context, configuring).await {
490            Ok(session) => session,
491            Err(err) => {
492                self.connectivity.set_err(context, format!("{err:#}"));
493                return Err(err);
494            }
495        };
496
497        Ok(session)
498    }
499
500    /// FETCH-MOVE-DELETE iteration.
501    ///
502    /// Prefetches headers and downloads new message from the folder, moves messages away from the
503    /// folder and deletes messages in the folder.
504    pub async fn fetch_move_delete(
505        &mut self,
506        context: &Context,
507        session: &mut Session,
508        watch_folder: &str,
509    ) -> Result<()> {
510        ensure_and_debug_assert!(!watch_folder.is_empty(), "Watched folder cannot be empty");
511        if !context.sql.is_open().await {
512            // probably shutdown
513            bail!("IMAP operation attempted while it is torn down");
514        }
515
516        let msgs_fetched = self
517            .fetch_new_messages(context, session, watch_folder)
518            .await
519            .context("fetch_new_messages")?;
520        if msgs_fetched && context.get_config_delete_device_after().await?.is_some() {
521            // New messages were fetched and shall be deleted later, restart ephemeral loop.
522            // Note that the `Config::DeleteDeviceAfter` timer starts as soon as the messages are
523            // fetched while the per-chat ephemeral timers start as soon as the messages are marked
524            // as noticed.
525            context.scheduler.interrupt_ephemeral_task().await;
526        }
527
528        // Mark expired messages for deletion. Note that `delete_expired_imap_messages` is
529        // not well optimized and should not be called before fetching.
530        delete_expired_imap_messages(context, session.transport_id(), session.is_chatmail())
531            .await
532            .context("delete_expired_imap_messages")?;
533
534        session
535            .move_delete_messages(context, watch_folder)
536            .await
537            .context("move_delete_messages")?;
538
539        Ok(())
540    }
541
542    /// Fetches new messages.
543    ///
544    /// Returns true if at least one message was fetched.
545    #[expect(clippy::arithmetic_side_effects)]
546    pub(crate) async fn fetch_new_messages(
547        &mut self,
548        context: &Context,
549        session: &mut Session,
550        folder: &str,
551    ) -> Result<bool> {
552        let transport_id = session.transport_id();
553
554        let folder_exists = session
555            .select_with_uidvalidity(context, folder)
556            .await
557            .with_context(|| format!("Failed to select folder {folder:?}"))?;
558
559        if !session.new_mail {
560            info!(
561                context,
562                "Transport {transport_id}: No new emails in folder {folder:?}."
563            );
564            return Ok(false);
565        }
566        // Make sure not to return before setting new_mail to false
567        // Otherwise, we will skip IDLE and go into an infinite loop
568        session.new_mail = false;
569
570        if !folder_exists {
571            return Ok(false);
572        }
573
574        let mut read_cnt = 0;
575        loop {
576            let (n, fetch_more) =
577                Box::pin(self.fetch_new_msg_batch(context, session, folder)).await?;
578            read_cnt += n;
579            if !fetch_more {
580                return Ok(read_cnt > 0);
581            }
582        }
583    }
584
585    /// Returns number of messages processed and whether the function should be called again.
586    #[expect(clippy::arithmetic_side_effects)]
587    async fn fetch_new_msg_batch(
588        &mut self,
589        context: &Context,
590        session: &mut Session,
591        folder: &str,
592    ) -> Result<(usize, bool)> {
593        let transport_id = self.transport_id;
594        let uid_validity = get_uidvalidity(context, transport_id, folder).await?;
595        let old_uid_next = get_uid_next(context, transport_id, folder).await?;
596        info!(
597            context,
598            "fetch_new_msg_batch({folder}): UIDVALIDITY={uid_validity}, UIDNEXT={old_uid_next}."
599        );
600
601        let uids_to_prefetch = 500;
602        let msgs = session
603            .prefetch(old_uid_next, uids_to_prefetch)
604            .await
605            .context("prefetch")?;
606        let read_cnt = msgs.len();
607        let _fetch_msgs_lock_guard = context.fetch_msgs_mutex.lock().await;
608
609        let mut uids_fetch: Vec<u32> = Vec::new();
610        let mut available_post_msgs: Vec<String> = Vec::new();
611        let mut download_later: Vec<String> = Vec::new();
612        let mut uid_message_ids = BTreeMap::new();
613        let mut largest_uid_skipped = None;
614
615        let download_limit: Option<u32> = context
616            .get_config_parsed(Config::DownloadLimit)
617            .await?
618            .filter(|&l| 0 < l);
619
620        // Store the info about IMAP messages in the database.
621        for (uid, ref fetch_response) in msgs {
622            let headers = match get_fetch_headers(fetch_response) {
623                Ok(headers) => headers,
624                Err(err) => {
625                    warn!(context, "Failed to parse FETCH headers: {err:#}.");
626                    continue;
627                }
628            };
629
630            let message_id = prefetch_get_message_id(&headers);
631            let size = fetch_response
632                .size
633                .context("imap fetch response does not contain size")?;
634
635            // Determine the target folder where the message should be moved to.
636            //
637            // We only move the messages from the INBOX and Spam folders.
638            // This is required to avoid infinite MOVE loop on IMAP servers
639            // that alias `DeltaChat` folder to other names.
640            // For example, some Dovecot servers alias `DeltaChat` folder to `INBOX.DeltaChat`.
641            // In this case moving from `INBOX.DeltaChat` to `DeltaChat`
642            // results in the messages getting a new UID,
643            // so the messages will be detected as new
644            // in the `INBOX.DeltaChat` folder again.
645            let delete = if let Some(message_id) = &message_id {
646                message::rfc724_mid_exists_ex(context, message_id, "deleted=1")
647                    .await?
648                    .is_some_and(|(_msg_id, deleted)| deleted)
649            } else {
650                false
651            };
652
653            // Generate a fake Message-ID to identify the message in the database
654            // if the message has no real Message-ID.
655            let message_id = message_id.unwrap_or_else(create_message_id);
656
657            if delete {
658                info!(context, "Deleting locally deleted message {message_id}.");
659            }
660
661            let target = if delete { "" } else { folder };
662
663            context
664                .sql
665                .execute(
666                    "INSERT INTO imap (transport_id, rfc724_mid, folder, uid, uidvalidity, target)
667                       VALUES         (?,            ?,          ?,      ?,   ?,           ?)
668                       ON CONFLICT(transport_id, folder, uid, uidvalidity)
669                       DO UPDATE SET rfc724_mid=excluded.rfc724_mid,
670                                     target=excluded.target",
671                    (
672                        self.transport_id,
673                        &message_id,
674                        &folder,
675                        uid,
676                        uid_validity,
677                        target,
678                    ),
679                )
680                .await?;
681
682            // Download only the messages which have reached their target folder if there are
683            // multiple devices. This prevents race conditions in multidevice case, where one
684            // device tries to download the message while another device moves the message at the
685            // same time. Even in single device case it is possible to fail downloading the first
686            // message, move it to the movebox and then download the second message before
687            // downloading the first one, if downloading from inbox before moving is allowed.
688            if folder == target
689                && prefetch_should_download(context, &headers, &message_id, fetch_response.flags())
690                    .await
691                    .context("prefetch_should_download")?
692            {
693                if headers
694                    .get_header_value(HeaderDef::ChatIsPostMessage)
695                    .is_some()
696                {
697                    info!(context, "{message_id:?} is a post-message.");
698                    available_post_msgs.push(message_id.clone());
699
700                    let is_bot = context.get_config_bool(Config::Bot).await?;
701                    if is_bot && download_limit.is_none_or(|download_limit| size <= download_limit)
702                    {
703                        uids_fetch.push(uid);
704                        uid_message_ids.insert(uid, message_id);
705                    } else {
706                        if download_limit.is_none_or(|download_limit| size <= download_limit) {
707                            // Download later after all the small messages are downloaded,
708                            // so that large messages don't delay receiving small messages
709                            download_later.push(message_id.clone());
710                        }
711                        largest_uid_skipped = Some(uid);
712                    }
713                } else {
714                    info!(context, "{message_id:?} is not a post-message.");
715                    if download_limit.is_none_or(|download_limit| size <= download_limit) {
716                        uids_fetch.push(uid);
717                        uid_message_ids.insert(uid, message_id);
718                    } else {
719                        download_later.push(message_id.clone());
720                        largest_uid_skipped = Some(uid);
721                    }
722                };
723            } else {
724                largest_uid_skipped = Some(uid);
725            }
726        }
727
728        if !uids_fetch.is_empty() {
729            self.connectivity.set_working(context);
730        }
731
732        let (sender, receiver) = async_channel::unbounded();
733
734        let mut received_msgs = Vec::with_capacity(uids_fetch.len());
735        let mailbox_uid_next = session
736            .selected_mailbox
737            .as_ref()
738            .with_context(|| format!("Expected {folder:?} to be selected"))?
739            .uid_next
740            .unwrap_or_default();
741
742        let update_uids_future = async {
743            let mut largest_uid_fetched: u32 = 0;
744
745            while let Ok((uid, received_msg_opt)) = receiver.recv().await {
746                largest_uid_fetched = max(largest_uid_fetched, uid);
747                if let Some(received_msg) = received_msg_opt {
748                    received_msgs.push(received_msg)
749                }
750            }
751
752            largest_uid_fetched
753        };
754
755        let actually_download_messages_future = async {
756            session
757                .fetch_many_msgs(context, folder, uids_fetch, &uid_message_ids, sender)
758                .await
759                .context("fetch_many_msgs")
760        };
761
762        let (largest_uid_fetched, fetch_res) =
763            tokio::join!(update_uids_future, actually_download_messages_future);
764
765        // Advance uid_next to the largest fetched UID plus 1.
766        //
767        // This may be larger than `mailbox_uid_next`
768        // if the message has arrived after selecting mailbox
769        // and determining its UIDNEXT and before prefetch.
770        let mut new_uid_next = largest_uid_fetched + 1;
771        let fetch_more = fetch_res.is_ok() && {
772            let prefetch_uid_next = old_uid_next + uids_to_prefetch;
773            // If we have successfully fetched all messages we planned during prefetch,
774            // then we have covered at least the range between old UIDNEXT
775            // and UIDNEXT of the mailbox at the time of selecting it.
776            new_uid_next = max(new_uid_next, min(prefetch_uid_next, mailbox_uid_next));
777
778            new_uid_next = max(new_uid_next, largest_uid_skipped.unwrap_or(0) + 1);
779
780            prefetch_uid_next < mailbox_uid_next
781        };
782        if new_uid_next > old_uid_next {
783            set_uid_next(context, self.transport_id, folder, new_uid_next).await?;
784        }
785
786        info!(context, "{} mails read from \"{}\".", read_cnt, folder);
787
788        if !received_msgs.is_empty() {
789            context.emit_event(EventType::IncomingMsgBunch);
790        }
791
792        chat::mark_old_messages_as_noticed(context, received_msgs).await?;
793
794        if fetch_res.is_ok() {
795            info!(
796                context,
797                "available_post_msgs: {}, download_later: {}.",
798                available_post_msgs.len(),
799                download_later.len(),
800            );
801            let trans_fn = |t: &mut rusqlite::Transaction| {
802                let mut stmt = t.prepare("INSERT OR IGNORE INTO available_post_msgs VALUES (?)")?;
803                for rfc724_mid in available_post_msgs {
804                    stmt.execute((rfc724_mid,))
805                        .context("INSERT OR IGNORE INTO available_post_msgs")?;
806                }
807                let mut stmt =
808                    t.prepare("INSERT OR IGNORE INTO download (rfc724_mid, msg_id) VALUES (?,0)")?;
809                for rfc724_mid in download_later {
810                    stmt.execute((rfc724_mid,))
811                        .context("INSERT OR IGNORE INTO download")?;
812                }
813                Ok(())
814            };
815            context.sql.transaction(trans_fn).await?;
816        }
817
818        // Now fail if fetching failed, so we will
819        // establish a new session if this one is broken.
820        fetch_res?;
821
822        Ok((read_cnt, fetch_more))
823    }
824}
825
826impl Session {
827    /// Synchronizes UIDs for all folders.
828    pub(crate) async fn resync_folders(&mut self, context: &Context) -> Result<()> {
829        let all_folders = self
830            .list_folders()
831            .await
832            .context("listing folders for resync")?;
833        for folder in all_folders {
834            let folder_meaning = get_folder_meaning(&folder);
835            if !matches!(
836                folder_meaning,
837                FolderMeaning::Virtual | FolderMeaning::Unknown
838            ) {
839                self.resync_folder_uids(context, folder.name(), folder_meaning)
840                    .await?;
841            }
842        }
843        Ok(())
844    }
845
846    /// Synchronizes UIDs in the database with UIDs on the server.
847    ///
848    /// It is assumed that no operations are taking place on the same
849    /// folder at the moment. Make sure to run it in the same
850    /// thread/task as other network operations on this folder to
851    /// avoid race conditions.
852    pub(crate) async fn resync_folder_uids(
853        &mut self,
854        context: &Context,
855        folder: &str,
856        folder_meaning: FolderMeaning,
857    ) -> Result<()> {
858        let uid_validity;
859        // Collect pairs of UID and Message-ID.
860        let mut msgs = BTreeMap::new();
861
862        let folder_exists = self.select_with_uidvalidity(context, folder).await?;
863        let transport_id = self.transport_id();
864        if folder_exists {
865            let mut list = self
866                .uid_fetch("1:*", RFC724MID_UID)
867                .await
868                .with_context(|| format!("Can't resync folder {folder}"))?;
869            while let Some(fetch) = list.try_next().await? {
870                let headers = match get_fetch_headers(&fetch) {
871                    Ok(headers) => headers,
872                    Err(err) => {
873                        warn!(context, "Failed to parse FETCH headers: {}", err);
874                        continue;
875                    }
876                };
877                let message_id = prefetch_get_message_id(&headers);
878
879                if let (Some(uid), Some(rfc724_mid)) = (fetch.uid, message_id) {
880                    msgs.insert(
881                        uid,
882                        (
883                            rfc724_mid,
884                            target_folder(context, folder, folder_meaning, &headers).await?,
885                        ),
886                    );
887                }
888            }
889
890            info!(
891                context,
892                "resync_folder_uids: Collected {} message IDs in {folder}.",
893                msgs.len(),
894            );
895
896            uid_validity = get_uidvalidity(context, transport_id, folder).await?;
897        } else {
898            warn!(context, "resync_folder_uids: No folder {folder}.");
899            uid_validity = 0;
900        }
901
902        // Write collected UIDs to SQLite database.
903        context
904            .sql
905            .transaction(move |transaction| {
906                transaction.execute("DELETE FROM imap WHERE transport_id=? AND folder=?", (transport_id, folder,))?;
907                for (uid, (rfc724_mid, target)) in &msgs {
908                    // This may detect previously undetected moved
909                    // messages, so we update server_folder too.
910                    transaction.execute(
911                        "INSERT INTO imap (transport_id, rfc724_mid, folder, uid, uidvalidity, target)
912                         VALUES           (?,            ?,          ?,      ?,   ?,           ?)
913                         ON CONFLICT(transport_id, folder, uid, uidvalidity)
914                         DO UPDATE SET rfc724_mid=excluded.rfc724_mid,
915                                       target=excluded.target",
916                        (transport_id, rfc724_mid, folder, uid, uid_validity, target),
917                    )?;
918                }
919                Ok(())
920            })
921            .await?;
922        Ok(())
923    }
924
925    /// Deletes batch of messages identified by their UID from the currently
926    /// selected folder.
927    async fn delete_message_batch(
928        &mut self,
929        context: &Context,
930        uid_set: &str,
931        row_ids: Vec<i64>,
932    ) -> Result<()> {
933        // mark the message for deletion
934        self.add_flag_finalized_with_set(uid_set, "\\Deleted")
935            .await?;
936        context
937            .sql
938            .transaction(|transaction| {
939                let mut stmt = transaction.prepare("DELETE FROM imap WHERE id = ?")?;
940                for row_id in row_ids {
941                    stmt.execute((row_id,))?;
942                }
943                Ok(())
944            })
945            .await
946            .context("Cannot remove deleted messages from imap table")?;
947
948        context.emit_event(EventType::ImapMessageDeleted(format!(
949            "IMAP messages {uid_set} marked as deleted"
950        )));
951        Ok(())
952    }
953
954    /// Moves batch of messages identified by their UID from the currently
955    /// selected folder to the target folder.
956    async fn move_message_batch(
957        &mut self,
958        context: &Context,
959        set: &str,
960        row_ids: Vec<i64>,
961        target: &str,
962    ) -> Result<()> {
963        if self.can_move() {
964            match self.uid_mv(set, &target).await {
965                Ok(()) => {
966                    // Messages are moved or don't exist, IMAP returns OK response in both cases.
967                    context
968                        .sql
969                        .transaction(|transaction| {
970                            let mut stmt = transaction.prepare("DELETE FROM imap WHERE id = ?")?;
971                            for row_id in row_ids {
972                                stmt.execute((row_id,))?;
973                            }
974                            Ok(())
975                        })
976                        .await
977                        .context("Cannot delete moved messages from imap table")?;
978                    context.emit_event(EventType::ImapMessageMoved(format!(
979                        "IMAP messages {set} moved to {target}"
980                    )));
981                    return Ok(());
982                }
983                Err(err) => {
984                    warn!(
985                        context,
986                        "Cannot move messages, fallback to COPY/DELETE {} to {}: {}",
987                        set,
988                        target,
989                        err
990                    );
991                }
992            }
993        }
994
995        // Server does not support MOVE or MOVE failed.
996        // Copy messages to the destination folder if needed and mark records for deletion.
997        info!(
998            context,
999            "Server does not support MOVE, fallback to COPY/DELETE {} to {}", set, target
1000        );
1001        self.uid_copy(&set, &target).await?;
1002        context
1003            .sql
1004            .transaction(|transaction| {
1005                let mut stmt = transaction.prepare("UPDATE imap SET target='' WHERE id = ?")?;
1006                for row_id in row_ids {
1007                    stmt.execute((row_id,))?;
1008                }
1009                Ok(())
1010            })
1011            .await
1012            .context("Cannot plan deletion of messages")?;
1013        context.emit_event(EventType::ImapMessageMoved(format!(
1014            "IMAP messages {set} copied to {target}"
1015        )));
1016        Ok(())
1017    }
1018
1019    /// Moves and deletes messages as planned in the `imap` table.
1020    ///
1021    /// This is the only place where messages are moved or deleted on the IMAP server.
1022    async fn move_delete_messages(&mut self, context: &Context, folder: &str) -> Result<()> {
1023        let transport_id = self.transport_id();
1024        let rows = context
1025            .sql
1026            .query_map_vec(
1027                "SELECT id, uid, target FROM imap
1028                 WHERE folder = ?
1029                 AND transport_id = ?
1030                 AND target != folder
1031                 ORDER BY target, uid",
1032                (folder, transport_id),
1033                |row| {
1034                    let rowid: i64 = row.get(0)?;
1035                    let uid: u32 = row.get(1)?;
1036                    let target: String = row.get(2)?;
1037                    Ok((rowid, uid, target))
1038                },
1039            )
1040            .await?;
1041
1042        for (target, rowid_set, uid_set) in UidGrouper::from(rows) {
1043            // Select folder inside the loop to avoid selecting it if there are no pending
1044            // MOVE/DELETE operations. This does not result in multiple SELECT commands
1045            // being sent because `select_folder()` does nothing if the folder is already
1046            // selected.
1047            let folder_exists = self.select_with_uidvalidity(context, folder).await?;
1048            ensure!(folder_exists, "No folder {folder}");
1049
1050            // Empty target folder name means messages should be deleted.
1051            if target.is_empty() {
1052                self.delete_message_batch(context, &uid_set, rowid_set)
1053                    .await
1054                    .with_context(|| format!("cannot delete batch of messages {uid_set:?}"))?;
1055            } else {
1056                self.move_message_batch(context, &uid_set, rowid_set, &target)
1057                    .await
1058                    .with_context(|| {
1059                        format!("cannot move batch of messages {uid_set:?} to folder {target:?}",)
1060                    })?;
1061            }
1062        }
1063
1064        // Expunge folder if needed, e.g. if some jobs have
1065        // deleted messages on the server.
1066        if let Err(err) = self.maybe_close_folder(context).await {
1067            warn!(context, "Failed to close folder: {err:#}.");
1068        }
1069
1070        Ok(())
1071    }
1072
1073    /// Stores pending `\Seen` flags for messages in `imap_markseen` table.
1074    pub(crate) async fn store_seen_flags_on_imap(&mut self, context: &Context) -> Result<()> {
1075        if context.get_config_bool(Config::TeamProfile).await? {
1076            return Ok(());
1077        }
1078
1079        context
1080            .sql
1081            .execute(
1082                "DELETE FROM imap_markseen WHERE id NOT IN (SELECT imap.id FROM imap)",
1083                (),
1084            )
1085            .await?;
1086
1087        let transport_id = self.transport_id();
1088        let mut rows = context
1089            .sql
1090            .query_map_vec(
1091                "SELECT imap.id, uid, folder FROM imap, imap_markseen
1092                 WHERE imap.id = imap_markseen.id
1093                 AND imap.transport_id=?
1094                 AND target = folder",
1095                (transport_id,),
1096                |row| {
1097                    let rowid: i64 = row.get(0)?;
1098                    let uid: u32 = row.get(1)?;
1099                    let folder: String = row.get(2)?;
1100                    Ok((rowid, uid, folder))
1101                },
1102            )
1103            .await?;
1104
1105        // Number of SQL results is expected to be low as
1106        // we usually don't have many messages to mark on IMAP at once.
1107        // We are sorting outside of SQL to avoid SQLite constructing a query plan
1108        // that scans the whole `imap` table. Scanning `imap_markseen` is fine
1109        // as it should not have many items.
1110        // If you change the SQL query, test it with `EXPLAIN QUERY PLAN`.
1111        rows.sort_unstable_by(|(_rowid1, uid1, folder1), (_rowid2, uid2, folder2)| {
1112            (folder1, uid1).cmp(&(folder2, uid2))
1113        });
1114
1115        for (folder, rowid_set, uid_set) in UidGrouper::from(rows) {
1116            let folder_exists = match self.select_with_uidvalidity(context, &folder).await {
1117                Err(err) => {
1118                    warn!(
1119                        context,
1120                        "store_seen_flags_on_imap: Transport {transport_id}: Failed to select {folder}, will retry later: {err:#}."
1121                    );
1122                    continue;
1123                }
1124                Ok(folder_exists) => folder_exists,
1125            };
1126            if !folder_exists {
1127                warn!(context, "store_seen_flags_on_imap: No folder {folder}.");
1128            } else if let Err(err) = self.add_flag_finalized_with_set(&uid_set, "\\Seen").await {
1129                warn!(
1130                    context,
1131                    "Transport {transport_id}: Cannot mark messages {uid_set} in {folder} as seen, will retry later: {err:#}."
1132                );
1133                continue;
1134            } else {
1135                info!(
1136                    context,
1137                    "Transport {transport_id}: Marked messages {uid_set} in folder {folder} as seen."
1138                );
1139            }
1140            context
1141                .sql
1142                .transaction(|transaction| {
1143                    let mut stmt = transaction.prepare("DELETE FROM imap_markseen WHERE id = ?")?;
1144                    for rowid in rowid_set {
1145                        stmt.execute((rowid,))?;
1146                    }
1147                    Ok(())
1148                })
1149                .await
1150                .context("Cannot remove messages marked as seen from imap_markseen table")?;
1151        }
1152
1153        Ok(())
1154    }
1155
1156    /// Fetches a list of messages by server UID.
1157    ///
1158    /// Sends pairs of UID and info about each downloaded message to the provided channel.
1159    /// Received message info is optional because UID may be ignored
1160    /// if the message has a `\Deleted` flag.
1161    ///
1162    /// The channel is used to return the results because the function may fail
1163    /// due to network errors before it finishes fetching all the messages.
1164    /// In this case caller still may want to process all the results
1165    /// received over the channel and persist last seen UID in the database
1166    /// before bubbling up the failure.
1167    ///
1168    /// If the message is incorrect or there is a failure to write a message to the database,
1169    /// it is skipped and the error is logged.
1170    #[expect(clippy::arithmetic_side_effects)]
1171    pub(crate) async fn fetch_many_msgs(
1172        &mut self,
1173        context: &Context,
1174        folder: &str,
1175        request_uids: Vec<u32>,
1176        uid_message_ids: &BTreeMap<u32, String>,
1177        received_msgs_channel: Sender<(u32, Option<ReceivedMsg>)>,
1178    ) -> Result<()> {
1179        if request_uids.is_empty() {
1180            return Ok(());
1181        }
1182
1183        for (request_uids, set) in build_sequence_sets(&request_uids)? {
1184            info!(context, "Starting UID FETCH of message set \"{}\".", set);
1185            let mut fetch_responses = self
1186                .uid_fetch(&set, BODY_FULL)
1187                .await
1188                .with_context(|| format!("fetching messages {set} from folder {folder:?}"))?;
1189
1190            // Map from UIDs to unprocessed FETCH results. We put unprocessed FETCH results here
1191            // when we want to process other messages first.
1192            let mut uid_msgs = HashMap::with_capacity(request_uids.len());
1193
1194            let mut count = 0;
1195            for &request_uid in &request_uids {
1196                // Check if FETCH response is already in `uid_msgs`.
1197                let mut fetch_response = uid_msgs.remove(&request_uid);
1198
1199                // Try to find a requested UID in returned FETCH responses.
1200                while fetch_response.is_none() {
1201                    let Some(next_fetch_response) = fetch_responses
1202                        .try_next()
1203                        .await
1204                        .context("Failed to process IMAP FETCH result")?
1205                    else {
1206                        // No more FETCH responses received from the server.
1207                        break;
1208                    };
1209
1210                    if let Some(next_uid) = next_fetch_response.uid {
1211                        if next_uid == request_uid {
1212                            fetch_response = Some(next_fetch_response);
1213                        } else if !request_uids.contains(&next_uid) {
1214                            // (size of `request_uids` is bounded by IMAP command length limit,
1215                            // search in this vector is always fast)
1216
1217                            // Unwanted UIDs are possible because of unsolicited responses, e.g. if
1218                            // another client changes \Seen flag on a message after we do a prefetch but
1219                            // before fetch. It's not an error if we receive such unsolicited response.
1220                            info!(
1221                                context,
1222                                "Skipping not requested FETCH response for UID {}.", next_uid
1223                            );
1224                        } else if uid_msgs.insert(next_uid, next_fetch_response).is_some() {
1225                            warn!(context, "Got duplicated UID {}.", next_uid);
1226                        }
1227                    } else {
1228                        info!(context, "Skipping FETCH response without UID.");
1229                    }
1230                }
1231
1232                let fetch_response = match fetch_response {
1233                    Some(fetch) => fetch,
1234                    None => {
1235                        warn!(
1236                            context,
1237                            "Missed UID {} in the server response.", request_uid
1238                        );
1239                        continue;
1240                    }
1241                };
1242                count += 1;
1243
1244                let is_deleted = fetch_response.flags().any(|flag| flag == Flag::Deleted);
1245                let body = fetch_response.body();
1246
1247                if is_deleted {
1248                    info!(context, "Not processing deleted msg {}.", request_uid);
1249                    received_msgs_channel.send((request_uid, None)).await?;
1250                    continue;
1251                }
1252
1253                let body = if let Some(body) = body {
1254                    body
1255                } else {
1256                    info!(
1257                        context,
1258                        "Not processing message {} without a BODY.", request_uid
1259                    );
1260                    received_msgs_channel.send((request_uid, None)).await?;
1261                    continue;
1262                };
1263
1264                let is_seen = fetch_response.flags().any(|flag| flag == Flag::Seen);
1265
1266                let Some(rfc724_mid) = uid_message_ids.get(&request_uid) else {
1267                    error!(
1268                        context,
1269                        "No Message-ID corresponding to UID {} passed in uid_messsage_ids.",
1270                        request_uid
1271                    );
1272                    continue;
1273                };
1274
1275                info!(
1276                    context,
1277                    "Passing message UID {} to receive_imf().", request_uid
1278                );
1279                let res = receive_imf_inner(context, rfc724_mid, body, is_seen).await;
1280
1281                // If there was an error receiving the message, show a device message:
1282                let received_msg = match res {
1283                    Err(err) => {
1284                        warn!(context, "receive_imf error: {err:#}.");
1285
1286                        let text = format!(
1287                            // No trailing '.' to avoid from the Android UI treating it as a part of
1288                            // URL.
1289                            "❌ Failed to receive a message: {err:#}. Core version v{DC_VERSION_STR}. Please report this bug to delta@merlinux.eu or https://support.delta.chat/",
1290                        );
1291                        let mut msg = Message::new_text(text);
1292                        add_device_msg(context, None, Some(&mut msg)).await?;
1293                        None
1294                    }
1295                    Ok(msg) => msg,
1296                };
1297                received_msgs_channel
1298                    .send((request_uid, received_msg))
1299                    .await?;
1300            }
1301
1302            // If we don't process the whole response, IMAP client is left in a broken state where
1303            // it will try to process the rest of response as the next response.
1304            //
1305            // Make sure to not ignore the errors, because
1306            // if connection times out, it will return
1307            // infinite stream of `Some(Err(_))` results.
1308            while fetch_responses
1309                .try_next()
1310                .await
1311                .context("Failed to drain FETCH responses")?
1312                .is_some()
1313            {}
1314
1315            if count != request_uids.len() {
1316                warn!(
1317                    context,
1318                    "Failed to fetch all UIDs: got {}, requested {}, we requested the UIDs {:?}.",
1319                    count,
1320                    request_uids.len(),
1321                    request_uids,
1322                );
1323            } else {
1324                info!(
1325                    context,
1326                    "Successfully received {} UIDs.",
1327                    request_uids.len()
1328                );
1329            }
1330        }
1331
1332        Ok(())
1333    }
1334
1335    /// Retrieves server metadata if it is supported, otherwise uses fallback one.
1336    ///
1337    /// We get [`/shared/comment`](https://www.rfc-editor.org/rfc/rfc5464#section-6.2.1)
1338    /// and [`/shared/admin`](https://www.rfc-editor.org/rfc/rfc5464#section-6.2.2)
1339    /// metadata.
1340    #[expect(clippy::arithmetic_side_effects)]
1341    pub(crate) async fn update_metadata(&mut self, context: &Context) -> Result<()> {
1342        let mut lock = context.metadata.write().await;
1343
1344        if !self.can_metadata() {
1345            *lock = Some(Default::default());
1346        }
1347        if let Some(ref mut old_metadata) = *lock {
1348            let now = time();
1349
1350            // Refresh TURN server credentials if they expire in 12 hours.
1351            if now + 3600 * 12 < old_metadata.ice_servers_expiration_timestamp {
1352                return Ok(());
1353            }
1354
1355            let mut got_turn_server = false;
1356            if self.can_metadata() {
1357                info!(context, "ICE servers expired, requesting new credentials.");
1358                let mailbox = "";
1359                let options = "";
1360                let metadata = self
1361                    .get_metadata(mailbox, options, "(/shared/vendor/deltachat/turn)")
1362                    .await?;
1363                for m in metadata {
1364                    if m.entry == "/shared/vendor/deltachat/turn"
1365                        && let Some(value) = m.value
1366                    {
1367                        match create_ice_servers_from_metadata(&value).await {
1368                            Ok((parsed_timestamp, parsed_ice_servers)) => {
1369                                old_metadata.ice_servers_expiration_timestamp = parsed_timestamp;
1370                                old_metadata.ice_servers = parsed_ice_servers;
1371                                got_turn_server = true;
1372                            }
1373                            Err(err) => {
1374                                warn!(context, "Failed to parse TURN server metadata: {err:#}.");
1375                            }
1376                        }
1377                    }
1378                }
1379            }
1380            if !got_turn_server {
1381                info!(context, "Will use fallback ICE servers.");
1382                // Set expiration timestamp 7 days in the future so we don't request it again.
1383                old_metadata.ice_servers_expiration_timestamp = time() + 3600 * 24 * 7;
1384                old_metadata.ice_servers = create_fallback_ice_servers();
1385            }
1386            return Ok(());
1387        }
1388
1389        info!(
1390            context,
1391            "Server supports metadata, retrieving server comment and admin contact."
1392        );
1393
1394        let mut comment = None;
1395        let mut admin = None;
1396        let mut iroh_relay = None;
1397        let mut ice_servers = None;
1398        let mut ice_servers_expiration_timestamp = 0;
1399
1400        let mailbox = "";
1401        let options = "";
1402        let metadata = self
1403            .get_metadata(
1404                mailbox,
1405                options,
1406                "(/shared/comment /shared/admin /shared/vendor/deltachat/irohrelay /shared/vendor/deltachat/turn)",
1407            )
1408            .await?;
1409        for m in metadata {
1410            match m.entry.as_ref() {
1411                "/shared/comment" => {
1412                    comment = m.value;
1413                }
1414                "/shared/admin" => {
1415                    admin = m.value;
1416                }
1417                "/shared/vendor/deltachat/irohrelay" => {
1418                    if let Some(value) = m.value {
1419                        if let Ok(url) = Url::parse(&value) {
1420                            iroh_relay = Some(url);
1421                        } else {
1422                            warn!(
1423                                context,
1424                                "Got invalid URL from iroh relay metadata: {:?}.", value
1425                            );
1426                        }
1427                    }
1428                }
1429                "/shared/vendor/deltachat/turn" => {
1430                    if let Some(value) = m.value {
1431                        match create_ice_servers_from_metadata(&value).await {
1432                            Ok((parsed_timestamp, parsed_ice_servers)) => {
1433                                ice_servers_expiration_timestamp = parsed_timestamp;
1434                                ice_servers = Some(parsed_ice_servers);
1435                            }
1436                            Err(err) => {
1437                                warn!(context, "Failed to parse TURN server metadata: {err:#}.");
1438                            }
1439                        }
1440                    }
1441                }
1442                _ => {}
1443            }
1444        }
1445        let ice_servers = if let Some(ice_servers) = ice_servers {
1446            ice_servers
1447        } else {
1448            // Set expiration timestamp 7 days in the future so we don't request it again.
1449            ice_servers_expiration_timestamp = time() + 3600 * 24 * 7;
1450            create_fallback_ice_servers()
1451        };
1452
1453        *lock = Some(ServerMetadata {
1454            comment,
1455            admin,
1456            iroh_relay,
1457            ice_servers,
1458            ice_servers_expiration_timestamp,
1459        });
1460        Ok(())
1461    }
1462
1463    /// Stores device token into /private/devicetoken IMAP METADATA of the Inbox.
1464    pub(crate) async fn register_token(&mut self, context: &Context) -> Result<()> {
1465        if context.push_subscribed.load(Ordering::Relaxed) {
1466            return Ok(());
1467        }
1468
1469        let transport_id = self.transport_id();
1470
1471        let Some(device_token) = context.push_subscriber.device_token().await else {
1472            return Ok(());
1473        };
1474
1475        if self.can_metadata() && self.can_push() {
1476            info!(
1477                context,
1478                "Transport {transport_id}: Subscribing for push notifications."
1479            );
1480
1481            let old_encrypted_device_token =
1482                context.get_config(Config::EncryptedDeviceToken).await?;
1483
1484            // Whether we need to update encrypted device token.
1485            let device_token_changed = old_encrypted_device_token.is_none()
1486                || context.get_config(Config::DeviceToken).await?.as_ref() != Some(&device_token);
1487
1488            let new_encrypted_device_token;
1489            if device_token_changed {
1490                let encrypted_device_token = encrypt_device_token(&device_token)
1491                    .context("Failed to encrypt device token")?;
1492
1493                // We expect that the server supporting `XDELTAPUSH` capability
1494                // has non-synchronizing literals support as well:
1495                // <https://www.rfc-editor.org/rfc/rfc7888>.
1496                let encrypted_device_token_len = encrypted_device_token.len();
1497
1498                // Store device token saved on the server
1499                // to prevent storing duplicate tokens.
1500                // The server cannot deduplicate on its own
1501                // because encryption gives a different
1502                // result each time.
1503                context
1504                    .set_config_internal(Config::DeviceToken, Some(&device_token))
1505                    .await?;
1506                context
1507                    .set_config_internal(
1508                        Config::EncryptedDeviceToken,
1509                        Some(&encrypted_device_token),
1510                    )
1511                    .await?;
1512
1513                if encrypted_device_token_len <= 4096 {
1514                    new_encrypted_device_token = Some(encrypted_device_token);
1515                } else {
1516                    // If Apple or Google (FCM) gives us a very large token,
1517                    // do not even try to give it to IMAP servers.
1518                    //
1519                    // Limit of 4096 is arbitrarily selected
1520                    // to be the same as required by LITERAL- IMAP extension.
1521                    //
1522                    // Dovecot supports LITERAL+ and non-synchronizing literals
1523                    // of any length, but there is no reason for tokens
1524                    // to be that large even after OpenPGP encryption.
1525                    warn!(context, "Device token is too long for LITERAL-, ignoring.");
1526                    new_encrypted_device_token = None;
1527                }
1528            } else {
1529                new_encrypted_device_token = old_encrypted_device_token;
1530            }
1531
1532            // Store new encrypted device token on the server
1533            // even if it is the same as the old one.
1534            if let Some(encrypted_device_token) = new_encrypted_device_token {
1535                self.run_command_and_check_ok(&format_setmetadata(
1536                    "INBOX",
1537                    &encrypted_device_token,
1538                ))
1539                .await
1540                .context("SETMETADATA command failed")?;
1541
1542                context.push_subscribed.store(true, Ordering::Relaxed);
1543            }
1544        } else if !context.push_subscriber.heartbeat_subscribed().await {
1545            let context = context.clone();
1546            // Subscribe for heartbeat notifications.
1547            tokio::spawn(async move { context.push_subscriber.subscribe(&context).await });
1548        }
1549
1550        Ok(())
1551    }
1552}
1553
1554fn format_setmetadata(folder: &str, device_token: &str) -> String {
1555    let device_token_len = device_token.len();
1556    format!(
1557        "SETMETADATA \"{folder}\" (/private/devicetoken {{{device_token_len}+}}\r\n{device_token})"
1558    )
1559}
1560
1561impl Session {
1562    /// Returns success if we successfully set the flag or we otherwise
1563    /// think add_flag should not be retried: Disconnection during setting
1564    /// the flag, or other imap-errors, returns Ok as well.
1565    ///
1566    /// Returning error means that the operation can be retried.
1567    async fn add_flag_finalized_with_set(&mut self, uid_set: &str, flag: &str) -> Result<()> {
1568        if flag == "\\Deleted" {
1569            self.selected_folder_needs_expunge = true;
1570        }
1571        let query = format!("+FLAGS ({flag})");
1572        let mut responses = self
1573            .uid_store(uid_set, &query)
1574            .await
1575            .with_context(|| format!("IMAP failed to store: ({uid_set}, {query})"))?;
1576        while let Some(_response) = responses.try_next().await? {
1577            // Read all the responses
1578        }
1579        Ok(())
1580    }
1581}
1582
1583impl Session {
1584    /// Return whether the server sent an unsolicited EXISTS or FETCH response.
1585    ///
1586    /// Drains all responses from `session.unsolicited_responses` in the process.
1587    ///
1588    /// If this returns `true`, this means that new emails arrived
1589    /// or flags have been changed.
1590    /// In this case we may want to skip next IDLE and do a round
1591    /// of fetching new messages and synchronizing seen flags.
1592    fn drain_unsolicited_responses(&self, context: &Context) -> Result<bool> {
1593        use UnsolicitedResponse::*;
1594        use async_imap::imap_proto::Response;
1595        use async_imap::imap_proto::ResponseCode;
1596
1597        let folder = self.selected_folder.as_deref().unwrap_or_default();
1598        let mut should_refetch = false;
1599        while let Ok(response) = self.unsolicited_responses.try_recv() {
1600            match response {
1601                Exists(_) => {
1602                    info!(
1603                        context,
1604                        "Need to refetch {folder:?}, got unsolicited EXISTS {response:?}"
1605                    );
1606                    should_refetch = true;
1607                }
1608
1609                Expunge(_) | Recent(_) => {}
1610                Other(ref response_data) => {
1611                    match response_data.parsed() {
1612                        Response::Fetch { .. } => {
1613                            info!(
1614                                context,
1615                                "Need to refetch {folder:?}, got unsolicited FETCH {response:?}"
1616                            );
1617                            should_refetch = true;
1618                        }
1619
1620                        // We are not interested in the following responses and they are are
1621                        // sent quite frequently, so, we ignore them without logging them.
1622                        Response::Done {
1623                            code: Some(ResponseCode::CopyUid(_, _, _)),
1624                            ..
1625                        } => {}
1626
1627                        _ => {
1628                            info!(context, "{folder:?}: got unsolicited response {response:?}")
1629                        }
1630                    }
1631                }
1632                _ => {
1633                    info!(context, "{folder:?}: got unsolicited response {response:?}")
1634                }
1635            }
1636        }
1637        Ok(should_refetch)
1638    }
1639}
1640
1641async fn should_move_out_of_spam(
1642    context: &Context,
1643    headers: &[mailparse::MailHeader<'_>],
1644) -> Result<bool> {
1645    if headers.get_header_value(HeaderDef::ChatVersion).is_some() {
1646        // If this is a chat message (i.e. has a ChatVersion header), then this might be
1647        // a securejoin message. We can't find out at this point as we didn't prefetch
1648        // the SecureJoin header. So, we always move chat messages out of Spam.
1649        // Two possibilities to change this would be:
1650        // 1. Remove the `&& !context.is_spam_folder(folder).await?` check from
1651        // `fetch_new_messages()`, and then let `receive_imf()` check
1652        // if it's a spam message and should be hidden.
1653        // 2. Or add a flag to the ChatVersion header that this is a securejoin
1654        // request, and return `true` here only if the message has this flag.
1655        // `receive_imf()` can then check if the securejoin request is valid.
1656        return Ok(true);
1657    }
1658
1659    if let Some(msg) = get_prefetch_parent_message(context, headers).await? {
1660        if msg.chat_blocked != Blocked::Not {
1661            // Blocked or contact request message in the spam folder, leave it there.
1662            return Ok(false);
1663        }
1664    } else {
1665        let from = match mimeparser::get_from(headers) {
1666            Some(f) => f,
1667            None => return Ok(false),
1668        };
1669        // No chat found.
1670        let (from_id, blocked_contact, _origin) =
1671            match from_field_to_contact_id(context, &from, None, true, true)
1672                .await
1673                .context("from_field_to_contact_id")?
1674            {
1675                Some(res) => res,
1676                None => {
1677                    warn!(
1678                        context,
1679                        "Contact with From address {:?} cannot exist, not moving out of spam", from
1680                    );
1681                    return Ok(false);
1682                }
1683            };
1684        if blocked_contact {
1685            // Contact is blocked, leave the message in spam.
1686            return Ok(false);
1687        }
1688
1689        if let Some(chat_id_blocked) = ChatIdBlocked::lookup_by_contact(context, from_id).await? {
1690            if chat_id_blocked.blocked != Blocked::Not {
1691                return Ok(false);
1692            }
1693        } else if from_id != ContactId::SELF {
1694            // No chat with this contact found.
1695            return Ok(false);
1696        }
1697    }
1698
1699    Ok(true)
1700}
1701
1702/// Returns target folder for a message found in the Spam folder.
1703/// If this returns None, the message will not be moved out of the
1704/// Spam folder, and as `fetch_new_messages()` doesn't download
1705/// messages from the Spam folder, the message will be ignored.
1706async fn spam_target_folder_cfg(
1707    context: &Context,
1708    headers: &[mailparse::MailHeader<'_>],
1709) -> Result<Option<Config>> {
1710    if !should_move_out_of_spam(context, headers).await? {
1711        return Ok(None);
1712    }
1713
1714    Ok(Some(Config::ConfiguredInboxFolder))
1715}
1716
1717/// Returns `ConfiguredInboxFolder` or `ConfiguredMvboxFolder` if
1718/// the message needs to be moved from `folder`. Otherwise returns `None`.
1719pub async fn target_folder_cfg(
1720    context: &Context,
1721    folder: &str,
1722    folder_meaning: FolderMeaning,
1723    headers: &[mailparse::MailHeader<'_>],
1724) -> Result<Option<Config>> {
1725    if folder == "DeltaChat" {
1726        return Ok(None);
1727    }
1728
1729    if folder_meaning == FolderMeaning::Spam {
1730        spam_target_folder_cfg(context, headers).await
1731    } else {
1732        Ok(None)
1733    }
1734}
1735
1736pub async fn target_folder(
1737    context: &Context,
1738    folder: &str,
1739    folder_meaning: FolderMeaning,
1740    headers: &[mailparse::MailHeader<'_>],
1741) -> Result<String> {
1742    match target_folder_cfg(context, folder, folder_meaning, headers).await? {
1743        Some(config) => match context.get_config(config).await? {
1744            Some(target) => Ok(target),
1745            None => Ok(folder.to_string()),
1746        },
1747        None => Ok(folder.to_string()),
1748    }
1749}
1750
1751/// Try to get the folder meaning by the name of the folder only used if the server does not support XLIST.
1752// TODO: lots languages missing - maybe there is a list somewhere on other MUAs?
1753// however, if we fail to find out the sent-folder,
1754// only watching this folder is not working. at least, this is no show stopper.
1755// CAVE: if possible, take care not to add a name here that is "sent" in one language
1756// but sth. different in others - a hard job.
1757fn get_folder_meaning_by_name(folder_name: &str) -> FolderMeaning {
1758    // source: <https://stackoverflow.com/questions/2185391/localized-gmail-imap-folders>
1759    const SPAM_NAMES: &[&str] = &[
1760        "spam",
1761        "junk",
1762        "Correio electrónico não solicitado",
1763        "Correo basura",
1764        "Lixo",
1765        "Nettsøppel",
1766        "Nevyžádaná pošta",
1767        "No solicitado",
1768        "Ongewenst",
1769        "Posta indesiderata",
1770        "Skräp",
1771        "Wiadomości-śmieci",
1772        "Önemsiz",
1773        "Ανεπιθύμητα",
1774        "Спам",
1775        "垃圾邮件",
1776        "垃圾郵件",
1777        "迷惑メール",
1778        "스팸",
1779    ];
1780    const TRASH_NAMES: &[&str] = &[
1781        "Trash",
1782        "Bin",
1783        "Caixote do lixo",
1784        "Cestino",
1785        "Corbeille",
1786        "Papelera",
1787        "Papierkorb",
1788        "Papirkurv",
1789        "Papperskorgen",
1790        "Prullenbak",
1791        "Rubujo",
1792        "Κάδος απορριμμάτων",
1793        "Корзина",
1794        "Кошик",
1795        "ゴミ箱",
1796        "垃圾桶",
1797        "已删除邮件",
1798        "휴지통",
1799    ];
1800    let lower = folder_name.to_lowercase();
1801
1802    if lower == "inbox" {
1803        FolderMeaning::Inbox
1804    } else if SPAM_NAMES.iter().any(|s| s.to_lowercase() == lower) {
1805        FolderMeaning::Spam
1806    } else if TRASH_NAMES.iter().any(|s| s.to_lowercase() == lower) {
1807        FolderMeaning::Trash
1808    } else {
1809        FolderMeaning::Unknown
1810    }
1811}
1812
1813fn get_folder_meaning_by_attrs(folder_attrs: &[NameAttribute]) -> FolderMeaning {
1814    for attr in folder_attrs {
1815        match attr {
1816            NameAttribute::Trash => return FolderMeaning::Trash,
1817            NameAttribute::Junk => return FolderMeaning::Spam,
1818            NameAttribute::All | NameAttribute::Flagged => return FolderMeaning::Virtual,
1819            NameAttribute::Extension(label) => {
1820                match label.as_ref() {
1821                    "\\Spam" => return FolderMeaning::Spam,
1822                    "\\Important" => return FolderMeaning::Virtual,
1823                    _ => {}
1824                };
1825            }
1826            _ => {}
1827        }
1828    }
1829    FolderMeaning::Unknown
1830}
1831
1832pub(crate) fn get_folder_meaning(folder: &Name) -> FolderMeaning {
1833    match get_folder_meaning_by_attrs(folder.attributes()) {
1834        FolderMeaning::Unknown => get_folder_meaning_by_name(folder.name()),
1835        meaning => meaning,
1836    }
1837}
1838
1839/// Parses the headers from the FETCH result.
1840fn get_fetch_headers(prefetch_msg: &Fetch) -> Result<Vec<mailparse::MailHeader<'_>>> {
1841    match prefetch_msg.header() {
1842        Some(header_bytes) => {
1843            let (headers, _) = mailparse::parse_headers(header_bytes)?;
1844            Ok(headers)
1845        }
1846        None => Ok(Vec::new()),
1847    }
1848}
1849
1850pub(crate) fn prefetch_get_message_id(headers: &[mailparse::MailHeader]) -> Option<String> {
1851    headers
1852        .get_header_value(HeaderDef::XMicrosoftOriginalMessageId)
1853        .or_else(|| headers.get_header_value(HeaderDef::MessageId))
1854        .and_then(|msgid| mimeparser::parse_message_id(&msgid).ok())
1855}
1856
1857pub(crate) fn create_message_id() -> String {
1858    format!("{}{}", GENERATED_PREFIX, create_id())
1859}
1860
1861/// Determines whether the message should be downloaded based on prefetched headers.
1862pub(crate) async fn prefetch_should_download(
1863    context: &Context,
1864    headers: &[mailparse::MailHeader<'_>],
1865    message_id: &str,
1866    mut flags: impl Iterator<Item = Flag<'_>>,
1867) -> Result<bool> {
1868    if message::rfc724_mid_download_tried(context, message_id).await? {
1869        if let Some(from) = mimeparser::get_from(headers)
1870            && context.is_self_addr(&from.addr).await?
1871        {
1872            markseen_on_imap_table(context, message_id).await?;
1873        }
1874        return Ok(false);
1875    }
1876
1877    // We do not know the Message-ID or the Message-ID is missing (in this case, we create one in
1878    // the further process).
1879
1880    let maybe_ndn = if let Some(from) = headers.get_header_value(HeaderDef::From_) {
1881        let from = from.to_ascii_lowercase();
1882        from.contains("mailer-daemon") || from.contains("mail-daemon")
1883    } else {
1884        false
1885    };
1886
1887    let from = match mimeparser::get_from(headers) {
1888        Some(f) => f,
1889        None => return Ok(false),
1890    };
1891    let (_from_id, blocked_contact, _origin) =
1892        match from_field_to_contact_id(context, &from, None, true, true).await? {
1893            Some(res) => res,
1894            None => return Ok(false),
1895        };
1896    // prevent_rename=true as this might be a mailing list message and in this case it would be bad if we rename the contact.
1897    // (prevent_rename is the last argument of from_field_to_contact_id())
1898
1899    // New SecureJoin is fully encrypted,
1900    // but for compatibility we still download legacy `Secure-Join: vc-request` messages.
1901    let is_legacy_securejoin = headers.get_header_value(HeaderDef::SecureJoin).is_some();
1902
1903    let is_encrypted = headers
1904        .get_header_value(HeaderDef::ContentType)
1905        .is_some_and(|content_type| {
1906            mailparse::parse_content_type(&content_type).mimetype == "multipart/encrypted"
1907        });
1908
1909    if flags.any(|f| f == Flag::Draft) {
1910        info!(context, "Ignoring draft message");
1911        return Ok(false);
1912    }
1913
1914    let should_download = maybe_ndn
1915        || (!blocked_contact
1916            && (is_legacy_securejoin
1917                || is_encrypted
1918                || !context.get_config_bool(Config::ForceEncryption).await?));
1919    Ok(should_download)
1920}
1921
1922/// Schedule marking the message as Seen on IMAP by adding all known IMAP messages corresponding to
1923/// the given Message-ID to `imap_markseen` table.
1924pub(crate) async fn markseen_on_imap_table(context: &Context, message_id: &str) -> Result<()> {
1925    context
1926        .sql
1927        .execute(
1928            "INSERT OR IGNORE INTO imap_markseen (id)
1929             SELECT id FROM imap WHERE rfc724_mid=?",
1930            (message_id,),
1931        )
1932        .await?;
1933    context.scheduler.interrupt_inbox().await;
1934
1935    Ok(())
1936}
1937
1938/// uid_next is the next unique identifier value from the last time we fetched a folder
1939/// See <https://tools.ietf.org/html/rfc3501#section-2.3.1.1>
1940/// This function is used to update our uid_next after fetching messages.
1941pub(crate) async fn set_uid_next(
1942    context: &Context,
1943    transport_id: u32,
1944    folder: &str,
1945    uid_next: u32,
1946) -> Result<()> {
1947    context
1948        .sql
1949        .execute(
1950            "INSERT INTO imap_sync (transport_id, folder, uid_next) VALUES (?, ?,?)
1951                ON CONFLICT(transport_id, folder) DO UPDATE SET uid_next=excluded.uid_next",
1952            (transport_id, folder, uid_next),
1953        )
1954        .await?;
1955    Ok(())
1956}
1957
1958/// uid_next is the next unique identifier value from the last time we fetched a folder
1959/// See <https://tools.ietf.org/html/rfc3501#section-2.3.1.1>
1960/// This method returns the uid_next from the last time we fetched messages.
1961/// We can compare this to the current uid_next to find out whether there are new messages
1962/// and fetch from this value on to get all new messages.
1963async fn get_uid_next(context: &Context, transport_id: u32, folder: &str) -> Result<u32> {
1964    Ok(context
1965        .sql
1966        .query_get_value(
1967            "SELECT uid_next FROM imap_sync WHERE transport_id=? AND folder=?",
1968            (transport_id, folder),
1969        )
1970        .await?
1971        .unwrap_or(0))
1972}
1973
1974pub(crate) async fn set_uidvalidity(
1975    context: &Context,
1976    transport_id: u32,
1977    folder: &str,
1978    uidvalidity: u32,
1979) -> Result<()> {
1980    context
1981        .sql
1982        .execute(
1983            "INSERT INTO imap_sync (transport_id, folder, uidvalidity) VALUES (?,?,?)
1984                ON CONFLICT(transport_id, folder) DO UPDATE SET uidvalidity=excluded.uidvalidity",
1985            (transport_id, folder, uidvalidity),
1986        )
1987        .await?;
1988    Ok(())
1989}
1990
1991async fn get_uidvalidity(context: &Context, transport_id: u32, folder: &str) -> Result<u32> {
1992    Ok(context
1993        .sql
1994        .query_get_value(
1995            "SELECT uidvalidity FROM imap_sync WHERE transport_id=? AND folder=?",
1996            (transport_id, folder),
1997        )
1998        .await?
1999        .unwrap_or(0))
2000}
2001
2002pub(crate) async fn set_modseq(
2003    context: &Context,
2004    transport_id: u32,
2005    folder: &str,
2006    modseq: u64,
2007) -> Result<()> {
2008    context
2009        .sql
2010        .execute(
2011            "INSERT INTO imap_sync (transport_id, folder, modseq) VALUES (?,?,?)
2012                ON CONFLICT(transport_id, folder) DO UPDATE SET modseq=excluded.modseq",
2013            (transport_id, folder, modseq),
2014        )
2015        .await?;
2016    Ok(())
2017}
2018
2019/// Builds a list of sequence/uid sets. The returned sets have each no more than around 1000
2020/// characters because according to <https://tools.ietf.org/html/rfc2683#section-3.2.1.5>
2021/// command lines should not be much more than 1000 chars (servers should allow at least 8000 chars)
2022#[expect(clippy::arithmetic_side_effects)]
2023fn build_sequence_sets(uids: &[u32]) -> Result<Vec<(Vec<u32>, String)>> {
2024    // first, try to find consecutive ranges:
2025    let mut ranges: Vec<UidRange> = vec![];
2026
2027    for &current in uids {
2028        if let Some(last) = ranges.last_mut()
2029            && last.end + 1 == current
2030        {
2031            last.end = current;
2032            continue;
2033        }
2034
2035        ranges.push(UidRange {
2036            start: current,
2037            end: current,
2038        });
2039    }
2040
2041    // Second, sort the uids into uid sets that are each below ~1000 characters
2042    let mut result = vec![];
2043    let (mut last_uids, mut last_str) = (Vec::new(), String::new());
2044    for range in ranges {
2045        last_uids.reserve((range.end - range.start + 1).try_into()?);
2046        (range.start..=range.end).for_each(|u| last_uids.push(u));
2047        if !last_str.is_empty() {
2048            last_str.push(',');
2049        }
2050        last_str.push_str(&range.to_string());
2051
2052        if last_str.len() > 990 {
2053            result.push((take(&mut last_uids), take(&mut last_str)));
2054        }
2055    }
2056    result.push((last_uids, last_str));
2057
2058    result.retain(|(_, s)| !s.is_empty());
2059    Ok(result)
2060}
2061
2062struct UidRange {
2063    start: u32,
2064    end: u32,
2065    // If start == end, then this range represents a single number
2066}
2067
2068impl std::fmt::Display for UidRange {
2069    fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
2070        if self.start == self.end {
2071            write!(f, "{}", self.start)
2072        } else {
2073            write!(f, "{}:{}", self.start, self.end)
2074        }
2075    }
2076}
2077
2078#[cfg(test)]
2079mod imap_tests;