1use std::{
7 cmp::max,
8 cmp::min,
9 collections::{BTreeMap, BTreeSet, HashMap},
10 iter::Peekable,
11 mem::take,
12 sync::atomic::Ordering,
13 time::{Duration, UNIX_EPOCH},
14};
15
16use anyhow::{Context as _, Result, bail, ensure, format_err};
17use async_channel::{self, Receiver, Sender};
18use async_imap::types::{Fetch, Flag, Name, NameAttribute, UnsolicitedResponse};
19use futures::{FutureExt as _, TryStreamExt};
20use futures_lite::FutureExt;
21use ratelimit::Ratelimit;
22use url::Url;
23
24use crate::calls::{
25 UnresolvedIceServer, create_fallback_ice_servers, create_ice_servers_from_metadata,
26};
27use crate::chat::{self, ChatId, ChatIdBlocked, add_device_msg};
28use crate::chatlist_events;
29use crate::config::Config;
30use crate::constants::{self, Blocked, DC_VERSION_STR};
31use crate::contact::ContactId;
32use crate::context::Context;
33use crate::events::EventType;
34use crate::headerdef::{HeaderDef, HeaderDefMap};
35use crate::log::{LogExt, warn};
36use crate::message::{self, Message, MessageState, MessengerMessage, MsgId};
37use crate::mimeparser;
38use crate::net::proxy::ProxyConfig;
39use crate::net::session::SessionStream;
40use crate::oauth2::get_oauth2_access_token;
41use crate::push::encrypt_device_token;
42use crate::receive_imf::{
43 ReceivedMsg, from_field_to_contact_id, get_prefetch_parent_message, receive_imf_inner,
44};
45use crate::scheduler::connectivity::ConnectivityStore;
46use crate::stock_str;
47use crate::tools::{self, create_id, duration_to_str, time};
48use crate::transport::{
49 ConfiguredLoginParam, ConfiguredServerLoginParam, prioritize_server_login_params,
50};
51
52pub(crate) mod capabilities;
53mod client;
54mod idle;
55pub mod select_folder;
56pub(crate) mod session;
57
58use client::{Client, determine_capabilities};
59use session::Session;
60
61pub(crate) const GENERATED_PREFIX: &str = "GEN_";
62
63const RFC724MID_UID: &str = "(UID BODY.PEEK[HEADER.FIELDS (\
64 MESSAGE-ID \
65 X-MICROSOFT-ORIGINAL-MESSAGE-ID\
66 )])";
67const BODY_FULL: &str = "(FLAGS BODY.PEEK[])";
68
69#[derive(Debug)]
70pub(crate) struct Imap {
71 transport_id: u32,
75
76 pub(crate) idle_interrupt_receiver: Receiver<()>,
77
78 pub(crate) addr: String,
80
81 lp: Vec<ConfiguredServerLoginParam>,
83
84 password: String,
86
87 proxy_config: Option<ProxyConfig>,
89
90 strict_tls: bool,
91
92 oauth2: bool,
93
94 authentication_failed_once: bool,
95
96 pub(crate) connectivity: ConnectivityStore,
97
98 conn_last_try: tools::Time,
99 conn_backoff_ms: u64,
100
101 ratelimit: Ratelimit,
109
110 pub(crate) resync_request_sender: async_channel::Sender<()>,
112
113 pub(crate) resync_request_receiver: async_channel::Receiver<()>,
115}
116
117#[derive(Debug)]
118struct OAuth2 {
119 user: String,
120 access_token: String,
121}
122
123#[derive(Debug, Default)]
124pub(crate) struct ServerMetadata {
125 pub comment: Option<String>,
128
129 pub admin: Option<String>,
132
133 pub iroh_relay: Option<Url>,
134
135 pub ice_servers: Vec<UnresolvedIceServer>,
137
138 pub ice_servers_expiration_timestamp: i64,
145}
146
147impl async_imap::Authenticator for OAuth2 {
148 type Response = String;
149
150 fn process(&mut self, _data: &[u8]) -> Self::Response {
151 format!(
152 "user={}\x01auth=Bearer {}\x01\x01",
153 self.user, self.access_token
154 )
155 }
156}
157
158#[derive(Debug, Display, PartialEq, Eq, Clone, Copy)]
159pub enum FolderMeaning {
160 Unknown,
161
162 Spam,
164 Inbox,
165 Mvbox,
166 Trash,
167
168 Virtual,
175}
176
177impl FolderMeaning {
178 pub fn to_config(self) -> Option<Config> {
179 match self {
180 FolderMeaning::Unknown => None,
181 FolderMeaning::Spam => None,
182 FolderMeaning::Inbox => Some(Config::ConfiguredInboxFolder),
183 FolderMeaning::Mvbox => Some(Config::ConfiguredMvboxFolder),
184 FolderMeaning::Trash => None,
185 FolderMeaning::Virtual => None,
186 }
187 }
188}
189
190struct UidGrouper<T: Iterator<Item = (i64, u32, String)>> {
191 inner: Peekable<T>,
192}
193
194impl<T, I> From<I> for UidGrouper<T>
195where
196 T: Iterator<Item = (i64, u32, String)>,
197 I: IntoIterator<IntoIter = T>,
198{
199 fn from(inner: I) -> Self {
200 Self {
201 inner: inner.into_iter().peekable(),
202 }
203 }
204}
205
206impl<T: Iterator<Item = (i64, u32, String)>> Iterator for UidGrouper<T> {
207 type Item = (String, Vec<i64>, String);
209
210 #[expect(clippy::arithmetic_side_effects)]
211 fn next(&mut self) -> Option<Self::Item> {
212 let (_, _, folder) = self.inner.peek().cloned()?;
213
214 let mut uid_set = String::new();
215 let mut rowid_set = Vec::new();
216
217 while uid_set.len() < 1000 {
218 if let Some((start_rowid, start_uid, _)) = self
220 .inner
221 .next_if(|(_, _, start_folder)| start_folder == &folder)
222 {
223 rowid_set.push(start_rowid);
224 let mut end_uid = start_uid;
225
226 while let Some((next_rowid, next_uid, _)) =
227 self.inner.next_if(|(_, next_uid, next_folder)| {
228 next_folder == &folder && (*next_uid == end_uid + 1 || *next_uid == end_uid)
229 })
230 {
231 end_uid = next_uid;
232 rowid_set.push(next_rowid);
233 }
234
235 let uid_range = UidRange {
236 start: start_uid,
237 end: end_uid,
238 };
239 if !uid_set.is_empty() {
240 uid_set.push(',');
241 }
242 uid_set.push_str(&uid_range.to_string());
243 } else {
244 break;
245 }
246 }
247
248 Some((folder, rowid_set, uid_set))
249 }
250}
251
252impl Imap {
253 pub async fn new(
255 context: &Context,
256 transport_id: u32,
257 param: ConfiguredLoginParam,
258 idle_interrupt_receiver: Receiver<()>,
259 ) -> Result<Self> {
260 let lp = param.imap.clone();
261 let password = param.imap_password.clone();
262 let proxy_config = ProxyConfig::load(context).await?;
263 let addr = ¶m.addr;
264 let strict_tls = param.strict_tls(proxy_config.is_some());
265 let oauth2 = param.oauth2;
266 let (resync_request_sender, resync_request_receiver) = async_channel::bounded(1);
267 Ok(Imap {
268 transport_id,
269 idle_interrupt_receiver,
270 addr: addr.to_string(),
271 lp,
272 password,
273 proxy_config,
274 strict_tls,
275 oauth2,
276 authentication_failed_once: false,
277 connectivity: Default::default(),
278 conn_last_try: UNIX_EPOCH,
279 conn_backoff_ms: 0,
280 ratelimit: Ratelimit::new(Duration::new(120, 0), 2.0),
282 resync_request_sender,
283 resync_request_receiver,
284 })
285 }
286
287 pub async fn new_configured(
289 context: &Context,
290 idle_interrupt_receiver: Receiver<()>,
291 ) -> Result<Self> {
292 let (transport_id, param) = ConfiguredLoginParam::load(context)
293 .await?
294 .context("Not configured")?;
295 let imap = Self::new(context, transport_id, param, idle_interrupt_receiver).await?;
296 Ok(imap)
297 }
298
299 pub fn transport_id(&self) -> u32 {
301 self.transport_id
302 }
303
304 pub(crate) async fn connect(
310 &mut self,
311 context: &Context,
312 configuring: bool,
313 ) -> Result<Session> {
314 let now = tools::Time::now();
315 let until_can_send = max(
316 min(self.conn_last_try, now)
317 .checked_add(Duration::from_millis(self.conn_backoff_ms))
318 .unwrap_or(now),
319 now,
320 )
321 .duration_since(now)?;
322 let ratelimit_duration = max(until_can_send, self.ratelimit.until_can_send());
323 if !ratelimit_duration.is_zero() {
324 warn!(
325 context,
326 "IMAP got rate limited, waiting for {} until can connect.",
327 duration_to_str(ratelimit_duration),
328 );
329 let interrupted = async {
330 tokio::time::sleep(ratelimit_duration).await;
331 false
332 }
333 .race(self.idle_interrupt_receiver.recv().map(|_| true))
334 .await;
335 if interrupted {
336 info!(
337 context,
338 "Connecting to IMAP without waiting for ratelimit due to interrupt."
339 );
340 }
341 }
342
343 info!(context, "Connecting to IMAP server.");
344 self.connectivity.set_connecting(context);
345
346 self.conn_last_try = tools::Time::now();
347 const BACKOFF_MIN_MS: u64 = 2000;
348 const BACKOFF_MAX_MS: u64 = 80_000;
349 self.conn_backoff_ms = min(self.conn_backoff_ms, BACKOFF_MAX_MS / 2);
350 self.conn_backoff_ms = self.conn_backoff_ms.saturating_add(rand::random_range(
351 (self.conn_backoff_ms / 2)..=self.conn_backoff_ms,
352 ));
353 self.conn_backoff_ms = max(BACKOFF_MIN_MS, self.conn_backoff_ms);
354
355 let login_params = prioritize_server_login_params(&context.sql, &self.lp, "imap").await?;
356 let mut first_error = None;
357 for lp in login_params {
358 info!(context, "IMAP trying to connect to {}.", &lp.connection);
359 let connection_candidate = lp.connection.clone();
360 let client = match Client::connect(
361 context,
362 self.proxy_config.clone(),
363 self.strict_tls,
364 &connection_candidate,
365 )
366 .await
367 .with_context(|| format!("IMAP failed to connect to {connection_candidate}"))
368 {
369 Ok(client) => client,
370 Err(err) => {
371 warn!(context, "{err:#}.");
372 first_error.get_or_insert(err);
373 continue;
374 }
375 };
376
377 self.conn_backoff_ms = BACKOFF_MIN_MS;
378 self.ratelimit.send();
379
380 let imap_user: &str = lp.user.as_ref();
381 let imap_pw: &str = &self.password;
382
383 let login_res = if self.oauth2 {
384 info!(context, "Logging into IMAP server with OAuth 2.");
385 let addr: &str = self.addr.as_ref();
386
387 let token = get_oauth2_access_token(context, addr, imap_pw, true)
388 .await?
389 .context("IMAP could not get OAUTH token")?;
390 let auth = OAuth2 {
391 user: imap_user.into(),
392 access_token: token,
393 };
394 client.authenticate("XOAUTH2", auth).await
395 } else {
396 info!(context, "Logging into IMAP server with LOGIN.");
397 client.login(imap_user, imap_pw).await
398 };
399
400 match login_res {
401 Ok(mut session) => {
402 let capabilities = determine_capabilities(&mut session).await?;
403 let resync_request_sender = self.resync_request_sender.clone();
404
405 let session = if capabilities.can_compress {
406 info!(context, "Enabling IMAP compression.");
407 let compressed_session = session
408 .compress(|s| {
409 let session_stream: Box<dyn SessionStream> = Box::new(s);
410 session_stream
411 })
412 .await
413 .context("Failed to enable IMAP compression")?;
414 Session::new(
415 compressed_session,
416 capabilities,
417 resync_request_sender,
418 self.transport_id,
419 )
420 } else {
421 Session::new(
422 session,
423 capabilities,
424 resync_request_sender,
425 self.transport_id,
426 )
427 };
428
429 let mut lock = context.server_id.write().await;
431 lock.clone_from(&session.capabilities.server_id);
432
433 self.authentication_failed_once = false;
434 context.emit_event(EventType::ImapConnected(format!(
435 "IMAP-LOGIN as {}",
436 lp.user
437 )));
438 self.connectivity.set_preparing(context);
439 info!(context, "Successfully logged into IMAP server.");
440 return Ok(session);
441 }
442
443 Err(err) => {
444 let imap_user = lp.user.to_owned();
445 let message = stock_str::cannot_login(context, &imap_user);
446
447 warn!(context, "IMAP failed to login: {err:#}.");
448 first_error.get_or_insert(format_err!("{message} ({err:#})"));
449
450 let _lock = context.wrong_pw_warning_mutex.lock().await;
452 if err.to_string().to_lowercase().contains("authentication") {
453 if self.authentication_failed_once
454 && !configuring
455 && context.get_config_bool(Config::NotifyAboutWrongPw).await?
456 {
457 let mut msg = Message::new_text(message);
458 if let Err(e) = chat::add_device_msg_with_importance(
459 context,
460 None,
461 Some(&mut msg),
462 true,
463 )
464 .await
465 {
466 warn!(context, "Failed to add device message: {e:#}.");
467 } else {
468 context
469 .set_config_internal(Config::NotifyAboutWrongPw, None)
470 .await
471 .log_err(context)
472 .ok();
473 }
474 } else {
475 self.authentication_failed_once = true;
476 }
477 } else {
478 self.authentication_failed_once = false;
479 }
480 }
481 }
482 }
483
484 Err(first_error.unwrap_or_else(|| format_err!("No IMAP connection candidates provided")))
485 }
486
487 pub(crate) async fn prepare(&mut self, context: &Context) -> Result<Session> {
492 let configuring = false;
493 let mut session = match self.connect(context, configuring).await {
494 Ok(session) => session,
495 Err(err) => {
496 self.connectivity.set_err(context, &err);
497 return Err(err);
498 }
499 };
500
501 let folders_configured = context
502 .sql
503 .get_raw_config_int(constants::DC_FOLDERS_CONFIGURED_KEY)
504 .await?;
505 if folders_configured.unwrap_or_default() < constants::DC_FOLDERS_CONFIGURED_VERSION {
506 self.configure_folders(context, &mut session).await?;
507 }
508
509 Ok(session)
510 }
511
512 pub async fn fetch_move_delete(
517 &mut self,
518 context: &Context,
519 session: &mut Session,
520 watch_folder: &str,
521 folder_meaning: FolderMeaning,
522 ) -> Result<()> {
523 if !context.sql.is_open().await {
524 bail!("IMAP operation attempted while it is torn down");
526 }
527
528 let msgs_fetched = self
529 .fetch_new_messages(context, session, watch_folder, folder_meaning)
530 .await
531 .context("fetch_new_messages")?;
532 if msgs_fetched && context.get_config_delete_device_after().await?.is_some() {
533 context.scheduler.interrupt_ephemeral_task().await;
538 }
539
540 session
541 .move_delete_messages(context, watch_folder)
542 .await
543 .context("move_delete_messages")?;
544
545 Ok(())
546 }
547
548 #[expect(clippy::arithmetic_side_effects)]
552 pub(crate) async fn fetch_new_messages(
553 &mut self,
554 context: &Context,
555 session: &mut Session,
556 folder: &str,
557 folder_meaning: FolderMeaning,
558 ) -> Result<bool> {
559 let transport_id = session.transport_id();
560
561 if should_ignore_folder(context, folder, folder_meaning).await? {
562 info!(
563 context,
564 "Transport {transport_id}: Not fetching from {folder:?}."
565 );
566 session.new_mail = false;
567 return Ok(false);
568 }
569
570 let folder_exists = session
571 .select_with_uidvalidity(context, folder)
572 .await
573 .with_context(|| format!("Failed to select folder {folder:?}"))?;
574
575 if !session.new_mail {
576 info!(
577 context,
578 "Transport {transport_id}: No new emails in folder {folder:?}."
579 );
580 return Ok(false);
581 }
582 session.new_mail = false;
585
586 if !folder_exists {
587 return Ok(false);
588 }
589
590 let mut read_cnt = 0;
591 loop {
592 let (n, fetch_more) = self
593 .fetch_new_msg_batch(context, session, folder, folder_meaning)
594 .await?;
595 read_cnt += n;
596 if !fetch_more {
597 return Ok(read_cnt > 0);
598 }
599 }
600 }
601
602 #[expect(clippy::arithmetic_side_effects)]
604 async fn fetch_new_msg_batch(
605 &mut self,
606 context: &Context,
607 session: &mut Session,
608 folder: &str,
609 folder_meaning: FolderMeaning,
610 ) -> Result<(usize, bool)> {
611 let transport_id = self.transport_id;
612 let uid_validity = get_uidvalidity(context, transport_id, folder).await?;
613 let old_uid_next = get_uid_next(context, transport_id, folder).await?;
614 info!(
615 context,
616 "fetch_new_msg_batch({folder}): UIDVALIDITY={uid_validity}, UIDNEXT={old_uid_next}."
617 );
618
619 let uids_to_prefetch = 500;
620 let msgs = session
621 .prefetch(old_uid_next, uids_to_prefetch)
622 .await
623 .context("prefetch")?;
624 let read_cnt = msgs.len();
625 let _fetch_msgs_lock_guard = context.fetch_msgs_mutex.lock().await;
626
627 let mut uids_fetch: Vec<u32> = Vec::new();
628 let mut available_post_msgs: Vec<String> = Vec::new();
629 let mut download_later: Vec<String> = Vec::new();
630 let mut uid_message_ids = BTreeMap::new();
631 let mut largest_uid_skipped = None;
632
633 let download_limit: Option<u32> = context
634 .get_config_parsed(Config::DownloadLimit)
635 .await?
636 .filter(|&l| 0 < l);
637
638 for (uid, ref fetch_response) in msgs {
640 let headers = match get_fetch_headers(fetch_response) {
641 Ok(headers) => headers,
642 Err(err) => {
643 warn!(context, "Failed to parse FETCH headers: {err:#}.");
644 continue;
645 }
646 };
647
648 let message_id = prefetch_get_message_id(&headers);
649 let size = fetch_response
650 .size
651 .context("imap fetch response does not contain size")?;
652
653 let delete = if let Some(message_id) = &message_id {
664 message::rfc724_mid_exists_ex(context, message_id, "deleted=1")
665 .await?
666 .is_some_and(|(_msg_id, deleted)| deleted)
667 } else {
668 false
669 };
670
671 let message_id = message_id.unwrap_or_else(create_message_id);
674
675 if delete {
676 info!(context, "Deleting locally deleted message {message_id}.");
677 }
678
679 let _target;
680 let target = if delete {
681 ""
682 } else {
683 _target = target_folder(context, folder, folder_meaning, &headers).await?;
684 &_target
685 };
686
687 context
688 .sql
689 .execute(
690 "INSERT INTO imap (transport_id, rfc724_mid, folder, uid, uidvalidity, target)
691 VALUES (?, ?, ?, ?, ?, ?)
692 ON CONFLICT(transport_id, folder, uid, uidvalidity)
693 DO UPDATE SET rfc724_mid=excluded.rfc724_mid,
694 target=excluded.target",
695 (
696 self.transport_id,
697 &message_id,
698 &folder,
699 uid,
700 uid_validity,
701 target,
702 ),
703 )
704 .await?;
705
706 if folder == target
713 && folder_meaning != FolderMeaning::Spam
718 && prefetch_should_download(
719 context,
720 &headers,
721 &message_id,
722 fetch_response.flags(),
723 )
724 .await.context("prefetch_should_download")?
725 {
726 if headers
727 .get_header_value(HeaderDef::ChatIsPostMessage)
728 .is_some()
729 {
730 info!(context, "{message_id:?} is a post-message.");
731 available_post_msgs.push(message_id.clone());
732
733 if download_limit.is_none_or(|download_limit| size <= download_limit) {
734 download_later.push(message_id.clone());
735 }
736 largest_uid_skipped = Some(uid);
737 } else {
738 info!(context, "{message_id:?} is not a post-message.");
739 if download_limit.is_none_or(|download_limit| size <= download_limit) {
740 uids_fetch.push(uid);
741 uid_message_ids.insert(uid, message_id);
742 } else {
743 download_later.push(message_id.clone());
744 largest_uid_skipped = Some(uid);
745 }
746 };
747 } else {
748 largest_uid_skipped = Some(uid);
749 }
750 }
751
752 if !uids_fetch.is_empty() {
753 self.connectivity.set_working(context);
754 }
755
756 let (sender, receiver) = async_channel::unbounded();
757
758 let mut received_msgs = Vec::with_capacity(uids_fetch.len());
759 let mailbox_uid_next = session
760 .selected_mailbox
761 .as_ref()
762 .with_context(|| format!("Expected {folder:?} to be selected"))?
763 .uid_next
764 .unwrap_or_default();
765
766 let update_uids_future = async {
767 let mut largest_uid_fetched: u32 = 0;
768
769 while let Ok((uid, received_msg_opt)) = receiver.recv().await {
770 largest_uid_fetched = max(largest_uid_fetched, uid);
771 if let Some(received_msg) = received_msg_opt {
772 received_msgs.push(received_msg)
773 }
774 }
775
776 largest_uid_fetched
777 };
778
779 let actually_download_messages_future = async {
780 session
781 .fetch_many_msgs(context, folder, uids_fetch, &uid_message_ids, sender)
782 .await
783 .context("fetch_many_msgs")
784 };
785
786 let (largest_uid_fetched, fetch_res) =
787 tokio::join!(update_uids_future, actually_download_messages_future);
788
789 let mut new_uid_next = largest_uid_fetched + 1;
795 let fetch_more = fetch_res.is_ok() && {
796 let prefetch_uid_next = old_uid_next + uids_to_prefetch;
797 new_uid_next = max(new_uid_next, min(prefetch_uid_next, mailbox_uid_next));
801
802 new_uid_next = max(new_uid_next, largest_uid_skipped.unwrap_or(0) + 1);
803
804 prefetch_uid_next < mailbox_uid_next
805 };
806 if new_uid_next > old_uid_next {
807 set_uid_next(context, self.transport_id, folder, new_uid_next).await?;
808 }
809
810 info!(context, "{} mails read from \"{}\".", read_cnt, folder);
811
812 if !received_msgs.is_empty() {
813 context.emit_event(EventType::IncomingMsgBunch);
814 }
815
816 chat::mark_old_messages_as_noticed(context, received_msgs).await?;
817
818 if fetch_res.is_ok() {
819 info!(
820 context,
821 "available_post_msgs: {}, download_later: {}.",
822 available_post_msgs.len(),
823 download_later.len(),
824 );
825 let trans_fn = |t: &mut rusqlite::Transaction| {
826 let mut stmt = t.prepare("INSERT OR IGNORE INTO available_post_msgs VALUES (?)")?;
827 for rfc724_mid in available_post_msgs {
828 stmt.execute((rfc724_mid,))
829 .context("INSERT OR IGNORE INTO available_post_msgs")?;
830 }
831 let mut stmt =
832 t.prepare("INSERT OR IGNORE INTO download (rfc724_mid, msg_id) VALUES (?,0)")?;
833 for rfc724_mid in download_later {
834 stmt.execute((rfc724_mid,))
835 .context("INSERT OR IGNORE INTO download")?;
836 }
837 Ok(())
838 };
839 context.sql.transaction(trans_fn).await?;
840 }
841
842 fetch_res?;
845
846 Ok((read_cnt, fetch_more))
847 }
848}
849
850impl Session {
851 pub(crate) async fn resync_folders(&mut self, context: &Context) -> Result<()> {
853 let all_folders = self
854 .list_folders()
855 .await
856 .context("listing folders for resync")?;
857 for folder in all_folders {
858 let folder_meaning = get_folder_meaning(&folder);
859 if !matches!(
860 folder_meaning,
861 FolderMeaning::Virtual | FolderMeaning::Unknown
862 ) {
863 self.resync_folder_uids(context, folder.name(), folder_meaning)
864 .await?;
865 }
866 }
867 Ok(())
868 }
869
870 pub(crate) async fn resync_folder_uids(
877 &mut self,
878 context: &Context,
879 folder: &str,
880 folder_meaning: FolderMeaning,
881 ) -> Result<()> {
882 let uid_validity;
883 let mut msgs = BTreeMap::new();
885
886 let folder_exists = self.select_with_uidvalidity(context, folder).await?;
887 let transport_id = self.transport_id();
888 if folder_exists {
889 let mut list = self
890 .uid_fetch("1:*", RFC724MID_UID)
891 .await
892 .with_context(|| format!("Can't resync folder {folder}"))?;
893 while let Some(fetch) = list.try_next().await? {
894 let headers = match get_fetch_headers(&fetch) {
895 Ok(headers) => headers,
896 Err(err) => {
897 warn!(context, "Failed to parse FETCH headers: {}", err);
898 continue;
899 }
900 };
901 let message_id = prefetch_get_message_id(&headers);
902
903 if let (Some(uid), Some(rfc724_mid)) = (fetch.uid, message_id) {
904 msgs.insert(
905 uid,
906 (
907 rfc724_mid,
908 target_folder(context, folder, folder_meaning, &headers).await?,
909 ),
910 );
911 }
912 }
913
914 info!(
915 context,
916 "resync_folder_uids: Collected {} message IDs in {folder}.",
917 msgs.len(),
918 );
919
920 uid_validity = get_uidvalidity(context, transport_id, folder).await?;
921 } else {
922 warn!(context, "resync_folder_uids: No folder {folder}.");
923 uid_validity = 0;
924 }
925
926 context
928 .sql
929 .transaction(move |transaction| {
930 transaction.execute("DELETE FROM imap WHERE transport_id=? AND folder=?", (transport_id, folder,))?;
931 for (uid, (rfc724_mid, target)) in &msgs {
932 transaction.execute(
935 "INSERT INTO imap (transport_id, rfc724_mid, folder, uid, uidvalidity, target)
936 VALUES (?, ?, ?, ?, ?, ?)
937 ON CONFLICT(transport_id, folder, uid, uidvalidity)
938 DO UPDATE SET rfc724_mid=excluded.rfc724_mid,
939 target=excluded.target",
940 (transport_id, rfc724_mid, folder, uid, uid_validity, target),
941 )?;
942 }
943 Ok(())
944 })
945 .await?;
946 Ok(())
947 }
948
949 async fn delete_message_batch(
952 &mut self,
953 context: &Context,
954 uid_set: &str,
955 row_ids: Vec<i64>,
956 ) -> Result<()> {
957 self.add_flag_finalized_with_set(uid_set, "\\Deleted")
959 .await?;
960 context
961 .sql
962 .transaction(|transaction| {
963 let mut stmt = transaction.prepare("DELETE FROM imap WHERE id = ?")?;
964 for row_id in row_ids {
965 stmt.execute((row_id,))?;
966 }
967 Ok(())
968 })
969 .await
970 .context("Cannot remove deleted messages from imap table")?;
971
972 context.emit_event(EventType::ImapMessageDeleted(format!(
973 "IMAP messages {uid_set} marked as deleted"
974 )));
975 Ok(())
976 }
977
978 async fn move_message_batch(
981 &mut self,
982 context: &Context,
983 set: &str,
984 row_ids: Vec<i64>,
985 target: &str,
986 ) -> Result<()> {
987 if self.can_move() {
988 match self.uid_mv(set, &target).await {
989 Ok(()) => {
990 context
992 .sql
993 .transaction(|transaction| {
994 let mut stmt = transaction.prepare("DELETE FROM imap WHERE id = ?")?;
995 for row_id in row_ids {
996 stmt.execute((row_id,))?;
997 }
998 Ok(())
999 })
1000 .await
1001 .context("Cannot delete moved messages from imap table")?;
1002 context.emit_event(EventType::ImapMessageMoved(format!(
1003 "IMAP messages {set} moved to {target}"
1004 )));
1005 return Ok(());
1006 }
1007 Err(err) => {
1008 warn!(
1009 context,
1010 "Cannot move messages, fallback to COPY/DELETE {} to {}: {}",
1011 set,
1012 target,
1013 err
1014 );
1015 }
1016 }
1017 }
1018
1019 info!(
1022 context,
1023 "Server does not support MOVE, fallback to COPY/DELETE {} to {}", set, target
1024 );
1025 self.uid_copy(&set, &target).await?;
1026 context
1027 .sql
1028 .transaction(|transaction| {
1029 let mut stmt = transaction.prepare("UPDATE imap SET target='' WHERE id = ?")?;
1030 for row_id in row_ids {
1031 stmt.execute((row_id,))?;
1032 }
1033 Ok(())
1034 })
1035 .await
1036 .context("Cannot plan deletion of messages")?;
1037 context.emit_event(EventType::ImapMessageMoved(format!(
1038 "IMAP messages {set} copied to {target}"
1039 )));
1040 Ok(())
1041 }
1042
1043 async fn move_delete_messages(&mut self, context: &Context, folder: &str) -> Result<()> {
1047 let transport_id = self.transport_id();
1048 let rows = context
1049 .sql
1050 .query_map_vec(
1051 "SELECT id, uid, target FROM imap
1052 WHERE folder = ?
1053 AND transport_id = ?
1054 AND target != folder
1055 ORDER BY target, uid",
1056 (folder, transport_id),
1057 |row| {
1058 let rowid: i64 = row.get(0)?;
1059 let uid: u32 = row.get(1)?;
1060 let target: String = row.get(2)?;
1061 Ok((rowid, uid, target))
1062 },
1063 )
1064 .await?;
1065
1066 for (target, rowid_set, uid_set) in UidGrouper::from(rows) {
1067 let folder_exists = self.select_with_uidvalidity(context, folder).await?;
1072 ensure!(folder_exists, "No folder {folder}");
1073
1074 if target.is_empty() {
1076 self.delete_message_batch(context, &uid_set, rowid_set)
1077 .await
1078 .with_context(|| format!("cannot delete batch of messages {:?}", &uid_set))?;
1079 } else {
1080 self.move_message_batch(context, &uid_set, rowid_set, &target)
1081 .await
1082 .with_context(|| {
1083 format!(
1084 "cannot move batch of messages {:?} to folder {:?}",
1085 &uid_set, target
1086 )
1087 })?;
1088 }
1089 }
1090
1091 if let Err(err) = self.maybe_close_folder(context).await {
1094 warn!(context, "Failed to close folder: {err:#}.");
1095 }
1096
1097 Ok(())
1098 }
1099
1100 pub(crate) async fn store_seen_flags_on_imap(&mut self, context: &Context) -> Result<()> {
1102 if context.get_config_bool(Config::TeamProfile).await? {
1103 return Ok(());
1104 }
1105
1106 context
1107 .sql
1108 .execute(
1109 "DELETE FROM imap_markseen WHERE id NOT IN (SELECT imap.id FROM imap)",
1110 (),
1111 )
1112 .await?;
1113
1114 let transport_id = self.transport_id();
1115 let mut rows = context
1116 .sql
1117 .query_map_vec(
1118 "SELECT imap.id, uid, folder FROM imap, imap_markseen
1119 WHERE imap.id = imap_markseen.id
1120 AND imap.transport_id=?
1121 AND target = folder",
1122 (transport_id,),
1123 |row| {
1124 let rowid: i64 = row.get(0)?;
1125 let uid: u32 = row.get(1)?;
1126 let folder: String = row.get(2)?;
1127 Ok((rowid, uid, folder))
1128 },
1129 )
1130 .await?;
1131
1132 rows.sort_unstable_by(|(_rowid1, uid1, folder1), (_rowid2, uid2, folder2)| {
1139 (folder1, uid1).cmp(&(folder2, uid2))
1140 });
1141
1142 for (folder, rowid_set, uid_set) in UidGrouper::from(rows) {
1143 let folder_exists = match self.select_with_uidvalidity(context, &folder).await {
1144 Err(err) => {
1145 warn!(
1146 context,
1147 "store_seen_flags_on_imap: Failed to select {folder}, will retry later: {err:#}."
1148 );
1149 continue;
1150 }
1151 Ok(folder_exists) => folder_exists,
1152 };
1153 if !folder_exists {
1154 warn!(context, "store_seen_flags_on_imap: No folder {folder}.");
1155 } else if let Err(err) = self.add_flag_finalized_with_set(&uid_set, "\\Seen").await {
1156 warn!(
1157 context,
1158 "Cannot mark messages {uid_set} in {folder} as seen, will retry later: {err:#}."
1159 );
1160 continue;
1161 } else {
1162 info!(
1163 context,
1164 "Marked messages {} in folder {} as seen.", uid_set, folder
1165 );
1166 }
1167 context
1168 .sql
1169 .transaction(|transaction| {
1170 let mut stmt = transaction.prepare("DELETE FROM imap_markseen WHERE id = ?")?;
1171 for rowid in rowid_set {
1172 stmt.execute((rowid,))?;
1173 }
1174 Ok(())
1175 })
1176 .await
1177 .context("Cannot remove messages marked as seen from imap_markseen table")?;
1178 }
1179
1180 Ok(())
1181 }
1182
1183 pub(crate) async fn sync_seen_flags(&mut self, context: &Context, folder: &str) -> Result<()> {
1185 if !self.can_condstore() {
1186 info!(
1187 context,
1188 "Server does not support CONDSTORE, skipping flag synchronization."
1189 );
1190 return Ok(());
1191 }
1192
1193 if context.get_config_bool(Config::TeamProfile).await? {
1194 return Ok(());
1195 }
1196
1197 let folder_exists = self
1198 .select_with_uidvalidity(context, folder)
1199 .await
1200 .context("Failed to select folder")?;
1201 if !folder_exists {
1202 return Ok(());
1203 }
1204
1205 let mailbox = self
1206 .selected_mailbox
1207 .as_ref()
1208 .with_context(|| format!("No mailbox selected, folder: {folder}"))?;
1209
1210 if mailbox.highest_modseq.is_none() {
1213 info!(
1214 context,
1215 "Mailbox {} does not support mod-sequences, skipping flag synchronization.", folder
1216 );
1217 return Ok(());
1218 }
1219
1220 let transport_id = self.transport_id();
1221 let mut updated_chat_ids = BTreeSet::new();
1222 let uid_validity = get_uidvalidity(context, transport_id, folder)
1223 .await
1224 .with_context(|| format!("failed to get UID validity for folder {folder}"))?;
1225 let mut highest_modseq = get_modseq(context, transport_id, folder)
1226 .await
1227 .with_context(|| format!("failed to get MODSEQ for folder {folder}"))?;
1228 let mut list = self
1229 .uid_fetch("1:*", format!("(FLAGS) (CHANGEDSINCE {highest_modseq})"))
1230 .await
1231 .context("failed to fetch flags")?;
1232
1233 let mut got_unsolicited_fetch = false;
1234
1235 while let Some(fetch) = list
1236 .try_next()
1237 .await
1238 .context("failed to get FETCH result")?
1239 {
1240 let uid = if let Some(uid) = fetch.uid {
1241 uid
1242 } else {
1243 info!(context, "FETCH result contains no UID, skipping");
1244 got_unsolicited_fetch = true;
1245 continue;
1246 };
1247 let is_seen = fetch.flags().any(|flag| flag == Flag::Seen);
1248 if is_seen
1249 && let Some(chat_id) = mark_seen_by_uid(context, transport_id, folder, uid_validity, uid)
1250 .await
1251 .with_context(|| {
1252 format!("Transport {transport_id}: Failed to update seen status for msg {folder}/{uid}")
1253 })?
1254 {
1255 updated_chat_ids.insert(chat_id);
1256 }
1257
1258 if let Some(modseq) = fetch.modseq {
1259 if modseq > highest_modseq {
1260 highest_modseq = modseq;
1261 }
1262 } else {
1263 warn!(context, "FETCH result contains no MODSEQ");
1264 }
1265 }
1266 drop(list);
1267
1268 if got_unsolicited_fetch {
1269 info!(context, "Got unsolicited fetch, will skip idle");
1274 self.new_mail = true;
1275 }
1276
1277 set_modseq(context, transport_id, folder, highest_modseq)
1278 .await
1279 .with_context(|| format!("failed to set MODSEQ for folder {folder}"))?;
1280 if !updated_chat_ids.is_empty() {
1281 context.on_archived_chats_maybe_noticed();
1282 }
1283 for updated_chat_id in updated_chat_ids {
1284 context.emit_event(EventType::MsgsNoticed(updated_chat_id));
1285 chatlist_events::emit_chatlist_item_changed(context, updated_chat_id);
1286 }
1287
1288 Ok(())
1289 }
1290
1291 #[expect(clippy::arithmetic_side_effects)]
1306 pub(crate) async fn fetch_many_msgs(
1307 &mut self,
1308 context: &Context,
1309 folder: &str,
1310 request_uids: Vec<u32>,
1311 uid_message_ids: &BTreeMap<u32, String>,
1312 received_msgs_channel: Sender<(u32, Option<ReceivedMsg>)>,
1313 ) -> Result<()> {
1314 if request_uids.is_empty() {
1315 return Ok(());
1316 }
1317
1318 for (request_uids, set) in build_sequence_sets(&request_uids)? {
1319 info!(context, "Starting UID FETCH of message set \"{}\".", set);
1320 let mut fetch_responses = self.uid_fetch(&set, BODY_FULL).await.with_context(|| {
1321 format!("fetching messages {} from folder \"{}\"", &set, folder)
1322 })?;
1323
1324 let mut uid_msgs = HashMap::with_capacity(request_uids.len());
1327
1328 let mut count = 0;
1329 for &request_uid in &request_uids {
1330 let mut fetch_response = uid_msgs.remove(&request_uid);
1332
1333 while fetch_response.is_none() {
1335 let Some(next_fetch_response) = fetch_responses
1336 .try_next()
1337 .await
1338 .context("Failed to process IMAP FETCH result")?
1339 else {
1340 break;
1342 };
1343
1344 if let Some(next_uid) = next_fetch_response.uid {
1345 if next_uid == request_uid {
1346 fetch_response = Some(next_fetch_response);
1347 } else if !request_uids.contains(&next_uid) {
1348 info!(
1355 context,
1356 "Skipping not requested FETCH response for UID {}.", next_uid
1357 );
1358 } else if uid_msgs.insert(next_uid, next_fetch_response).is_some() {
1359 warn!(context, "Got duplicated UID {}.", next_uid);
1360 }
1361 } else {
1362 info!(context, "Skipping FETCH response without UID.");
1363 }
1364 }
1365
1366 let fetch_response = match fetch_response {
1367 Some(fetch) => fetch,
1368 None => {
1369 warn!(
1370 context,
1371 "Missed UID {} in the server response.", request_uid
1372 );
1373 continue;
1374 }
1375 };
1376 count += 1;
1377
1378 let is_deleted = fetch_response.flags().any(|flag| flag == Flag::Deleted);
1379 let body = fetch_response.body();
1380
1381 if is_deleted {
1382 info!(context, "Not processing deleted msg {}.", request_uid);
1383 received_msgs_channel.send((request_uid, None)).await?;
1384 continue;
1385 }
1386
1387 let body = if let Some(body) = body {
1388 body
1389 } else {
1390 info!(
1391 context,
1392 "Not processing message {} without a BODY.", request_uid
1393 );
1394 received_msgs_channel.send((request_uid, None)).await?;
1395 continue;
1396 };
1397
1398 let is_seen = fetch_response.flags().any(|flag| flag == Flag::Seen);
1399
1400 let Some(rfc724_mid) = uid_message_ids.get(&request_uid) else {
1401 error!(
1402 context,
1403 "No Message-ID corresponding to UID {} passed in uid_messsage_ids.",
1404 request_uid
1405 );
1406 continue;
1407 };
1408
1409 info!(
1410 context,
1411 "Passing message UID {} to receive_imf().", request_uid
1412 );
1413 let res = receive_imf_inner(context, rfc724_mid, body, is_seen).await;
1414 let received_msg = match res {
1415 Err(err) => {
1416 warn!(context, "receive_imf error: {err:#}.");
1417
1418 let text = format!(
1419 "❌ Failed to receive a message: {err:#}. Core version v{DC_VERSION_STR}. Please report this bug to delta@merlinux.eu or https://support.delta.chat/.",
1420 );
1421 let mut msg = Message::new_text(text);
1422 add_device_msg(context, None, Some(&mut msg)).await?;
1423 None
1424 }
1425 Ok(msg) => msg,
1426 };
1427 received_msgs_channel
1428 .send((request_uid, received_msg))
1429 .await?;
1430 }
1431
1432 while fetch_responses
1439 .try_next()
1440 .await
1441 .context("Failed to drain FETCH responses")?
1442 .is_some()
1443 {}
1444
1445 if count != request_uids.len() {
1446 warn!(
1447 context,
1448 "Failed to fetch all UIDs: got {}, requested {}, we requested the UIDs {:?}.",
1449 count,
1450 request_uids.len(),
1451 request_uids,
1452 );
1453 } else {
1454 info!(
1455 context,
1456 "Successfully received {} UIDs.",
1457 request_uids.len()
1458 );
1459 }
1460 }
1461
1462 Ok(())
1463 }
1464
1465 #[expect(clippy::arithmetic_side_effects)]
1471 pub(crate) async fn update_metadata(&mut self, context: &Context) -> Result<()> {
1472 let mut lock = context.metadata.write().await;
1473
1474 if !self.can_metadata() {
1475 *lock = Some(Default::default());
1476 }
1477 if let Some(ref mut old_metadata) = *lock {
1478 let now = time();
1479
1480 if now + 3600 * 12 < old_metadata.ice_servers_expiration_timestamp {
1482 return Ok(());
1483 }
1484
1485 let mut got_turn_server = false;
1486 if self.can_metadata() {
1487 info!(context, "ICE servers expired, requesting new credentials.");
1488 let mailbox = "";
1489 let options = "";
1490 let metadata = self
1491 .get_metadata(mailbox, options, "(/shared/vendor/deltachat/turn)")
1492 .await?;
1493 for m in metadata {
1494 if m.entry == "/shared/vendor/deltachat/turn"
1495 && let Some(value) = m.value
1496 {
1497 match create_ice_servers_from_metadata(&value).await {
1498 Ok((parsed_timestamp, parsed_ice_servers)) => {
1499 old_metadata.ice_servers_expiration_timestamp = parsed_timestamp;
1500 old_metadata.ice_servers = parsed_ice_servers;
1501 got_turn_server = true;
1502 }
1503 Err(err) => {
1504 warn!(context, "Failed to parse TURN server metadata: {err:#}.");
1505 }
1506 }
1507 }
1508 }
1509 }
1510 if !got_turn_server {
1511 info!(context, "Will use fallback ICE servers.");
1512 old_metadata.ice_servers_expiration_timestamp = time() + 3600 * 24 * 7;
1514 old_metadata.ice_servers = create_fallback_ice_servers();
1515 }
1516 return Ok(());
1517 }
1518
1519 info!(
1520 context,
1521 "Server supports metadata, retrieving server comment and admin contact."
1522 );
1523
1524 let mut comment = None;
1525 let mut admin = None;
1526 let mut iroh_relay = None;
1527 let mut ice_servers = None;
1528 let mut ice_servers_expiration_timestamp = 0;
1529
1530 let mailbox = "";
1531 let options = "";
1532 let metadata = self
1533 .get_metadata(
1534 mailbox,
1535 options,
1536 "(/shared/comment /shared/admin /shared/vendor/deltachat/irohrelay /shared/vendor/deltachat/turn)",
1537 )
1538 .await?;
1539 for m in metadata {
1540 match m.entry.as_ref() {
1541 "/shared/comment" => {
1542 comment = m.value;
1543 }
1544 "/shared/admin" => {
1545 admin = m.value;
1546 }
1547 "/shared/vendor/deltachat/irohrelay" => {
1548 if let Some(value) = m.value {
1549 if let Ok(url) = Url::parse(&value) {
1550 iroh_relay = Some(url);
1551 } else {
1552 warn!(
1553 context,
1554 "Got invalid URL from iroh relay metadata: {:?}.", value
1555 );
1556 }
1557 }
1558 }
1559 "/shared/vendor/deltachat/turn" => {
1560 if let Some(value) = m.value {
1561 match create_ice_servers_from_metadata(&value).await {
1562 Ok((parsed_timestamp, parsed_ice_servers)) => {
1563 ice_servers_expiration_timestamp = parsed_timestamp;
1564 ice_servers = Some(parsed_ice_servers);
1565 }
1566 Err(err) => {
1567 warn!(context, "Failed to parse TURN server metadata: {err:#}.");
1568 }
1569 }
1570 }
1571 }
1572 _ => {}
1573 }
1574 }
1575 let ice_servers = if let Some(ice_servers) = ice_servers {
1576 ice_servers
1577 } else {
1578 ice_servers_expiration_timestamp = time() + 3600 * 24 * 7;
1580 create_fallback_ice_servers()
1581 };
1582
1583 *lock = Some(ServerMetadata {
1584 comment,
1585 admin,
1586 iroh_relay,
1587 ice_servers,
1588 ice_servers_expiration_timestamp,
1589 });
1590 Ok(())
1591 }
1592
1593 pub(crate) async fn register_token(&mut self, context: &Context) -> Result<()> {
1595 if context.push_subscribed.load(Ordering::Relaxed) {
1596 return Ok(());
1597 }
1598
1599 let transport_id = self.transport_id();
1600
1601 let Some(device_token) = context.push_subscriber.device_token().await else {
1602 return Ok(());
1603 };
1604
1605 if self.can_metadata() && self.can_push() {
1606 info!(
1607 context,
1608 "Transport {transport_id}: Subscribing for push notifications."
1609 );
1610
1611 let old_encrypted_device_token =
1612 context.get_config(Config::EncryptedDeviceToken).await?;
1613
1614 let device_token_changed = old_encrypted_device_token.is_none()
1616 || context.get_config(Config::DeviceToken).await?.as_ref() != Some(&device_token);
1617
1618 let new_encrypted_device_token;
1619 if device_token_changed {
1620 let encrypted_device_token = encrypt_device_token(&device_token)
1621 .context("Failed to encrypt device token")?;
1622
1623 let encrypted_device_token_len = encrypted_device_token.len();
1627
1628 context
1634 .set_config_internal(Config::DeviceToken, Some(&device_token))
1635 .await?;
1636 context
1637 .set_config_internal(
1638 Config::EncryptedDeviceToken,
1639 Some(&encrypted_device_token),
1640 )
1641 .await?;
1642
1643 if encrypted_device_token_len <= 4096 {
1644 new_encrypted_device_token = Some(encrypted_device_token);
1645 } else {
1646 warn!(context, "Device token is too long for LITERAL-, ignoring.");
1656 new_encrypted_device_token = None;
1657 }
1658 } else {
1659 new_encrypted_device_token = old_encrypted_device_token;
1660 }
1661
1662 if let Some(encrypted_device_token) = new_encrypted_device_token {
1665 let folder = context
1666 .get_config(Config::ConfiguredInboxFolder)
1667 .await?
1668 .context("INBOX is not configured")?;
1669
1670 self.run_command_and_check_ok(&format_setmetadata(
1671 &folder,
1672 &encrypted_device_token,
1673 ))
1674 .await
1675 .context("SETMETADATA command failed")?;
1676
1677 context.push_subscribed.store(true, Ordering::Relaxed);
1678 }
1679 } else if !context.push_subscriber.heartbeat_subscribed().await {
1680 let context = context.clone();
1681 tokio::spawn(async move { context.push_subscriber.subscribe(&context).await });
1683 }
1684
1685 Ok(())
1686 }
1687}
1688
1689fn format_setmetadata(folder: &str, device_token: &str) -> String {
1690 let device_token_len = device_token.len();
1691 format!(
1692 "SETMETADATA \"{folder}\" (/private/devicetoken {{{device_token_len}+}}\r\n{device_token})"
1693 )
1694}
1695
1696impl Session {
1697 async fn add_flag_finalized_with_set(&mut self, uid_set: &str, flag: &str) -> Result<()> {
1703 if flag == "\\Deleted" {
1704 self.selected_folder_needs_expunge = true;
1705 }
1706 let query = format!("+FLAGS ({flag})");
1707 let mut responses = self
1708 .uid_store(uid_set, &query)
1709 .await
1710 .with_context(|| format!("IMAP failed to store: ({uid_set}, {query})"))?;
1711 while let Some(_response) = responses.try_next().await? {
1712 }
1714 Ok(())
1715 }
1716
1717 async fn configure_mvbox<'a>(
1726 &mut self,
1727 context: &Context,
1728 folders: &[&'a str],
1729 ) -> Result<Option<&'a str>> {
1730 self.maybe_close_folder(context).await?;
1733
1734 for folder in folders {
1735 info!(context, "Looking for MVBOX-folder \"{}\"...", &folder);
1736 let res = self.examine(&folder).await;
1737 if res.is_ok() {
1738 info!(
1739 context,
1740 "MVBOX-folder {:?} successfully selected, using it.", &folder
1741 );
1742 self.close().await?;
1743 let folder_exists = self.select_with_uidvalidity(context, folder).await?;
1746 ensure!(folder_exists, "No MVBOX folder {:?}??", &folder);
1747 return Ok(Some(folder));
1748 }
1749 }
1750
1751 Ok(None)
1752 }
1753}
1754
1755impl Imap {
1756 pub(crate) async fn configure_folders(
1757 &mut self,
1758 context: &Context,
1759 session: &mut Session,
1760 ) -> Result<()> {
1761 let mut folders = session
1762 .list(Some(""), Some("*"))
1763 .await
1764 .context("list_folders failed")?;
1765 let mut delimiter = ".".to_string();
1766 let mut delimiter_is_default = true;
1767 let mut folder_configs = BTreeMap::new();
1768
1769 while let Some(folder) = folders.try_next().await? {
1770 info!(context, "Scanning folder: {:?}", folder);
1771
1772 if let Some(d) = folder.delimiter()
1774 && delimiter_is_default
1775 && !d.is_empty()
1776 && delimiter != d
1777 {
1778 delimiter = d.to_string();
1779 delimiter_is_default = false;
1780 }
1781
1782 let folder_meaning = get_folder_meaning_by_attrs(folder.attributes());
1783 let folder_name_meaning = get_folder_meaning_by_name(folder.name());
1784 if let Some(config) = folder_meaning.to_config() {
1785 folder_configs.insert(config, folder.name().to_string());
1787 } else if let Some(config) = folder_name_meaning.to_config() {
1788 folder_configs
1790 .entry(config)
1791 .or_insert_with(|| folder.name().to_string());
1792 }
1793 }
1794 drop(folders);
1795
1796 info!(context, "Using \"{}\" as folder-delimiter.", delimiter);
1797
1798 let fallback_folder = format!("INBOX{delimiter}DeltaChat");
1799 let mvbox_folder = session
1800 .configure_mvbox(context, &["DeltaChat", &fallback_folder])
1801 .await
1802 .context("failed to configure mvbox")?;
1803
1804 context
1805 .set_config_internal(Config::ConfiguredInboxFolder, Some("INBOX"))
1806 .await?;
1807 if let Some(mvbox_folder) = mvbox_folder {
1808 info!(context, "Setting MVBOX FOLDER TO {}", &mvbox_folder);
1809 context
1810 .set_config_internal(Config::ConfiguredMvboxFolder, Some(mvbox_folder))
1811 .await?;
1812 }
1813 for (config, name) in folder_configs {
1814 context.set_config_internal(config, Some(&name)).await?;
1815 }
1816 context
1817 .sql
1818 .set_raw_config_int(
1819 constants::DC_FOLDERS_CONFIGURED_KEY,
1820 constants::DC_FOLDERS_CONFIGURED_VERSION,
1821 )
1822 .await?;
1823
1824 info!(context, "FINISHED configuring IMAP-folders.");
1825 Ok(())
1826 }
1827}
1828
1829impl Session {
1830 fn drain_unsolicited_responses(&self, context: &Context) -> Result<bool> {
1839 use UnsolicitedResponse::*;
1840 use async_imap::imap_proto::Response;
1841 use async_imap::imap_proto::ResponseCode;
1842
1843 let folder = self.selected_folder.as_deref().unwrap_or_default();
1844 let mut should_refetch = false;
1845 while let Ok(response) = self.unsolicited_responses.try_recv() {
1846 match response {
1847 Exists(_) => {
1848 info!(
1849 context,
1850 "Need to refetch {folder:?}, got unsolicited EXISTS {response:?}"
1851 );
1852 should_refetch = true;
1853 }
1854
1855 Expunge(_) | Recent(_) => {}
1856 Other(ref response_data) => {
1857 match response_data.parsed() {
1858 Response::Fetch { .. } => {
1859 info!(
1860 context,
1861 "Need to refetch {folder:?}, got unsolicited FETCH {response:?}"
1862 );
1863 should_refetch = true;
1864 }
1865
1866 Response::Done {
1869 code: Some(ResponseCode::CopyUid(_, _, _)),
1870 ..
1871 } => {}
1872
1873 _ => {
1874 info!(context, "{folder:?}: got unsolicited response {response:?}")
1875 }
1876 }
1877 }
1878 _ => {
1879 info!(context, "{folder:?}: got unsolicited response {response:?}")
1880 }
1881 }
1882 }
1883 Ok(should_refetch)
1884 }
1885}
1886
1887async fn should_move_out_of_spam(
1888 context: &Context,
1889 headers: &[mailparse::MailHeader<'_>],
1890) -> Result<bool> {
1891 if headers.get_header_value(HeaderDef::ChatVersion).is_some() {
1892 return Ok(true);
1903 }
1904
1905 if let Some(msg) = get_prefetch_parent_message(context, headers).await? {
1906 if msg.chat_blocked != Blocked::Not {
1907 return Ok(false);
1909 }
1910 } else {
1911 let from = match mimeparser::get_from(headers) {
1912 Some(f) => f,
1913 None => return Ok(false),
1914 };
1915 let (from_id, blocked_contact, _origin) =
1917 match from_field_to_contact_id(context, &from, None, true, true)
1918 .await
1919 .context("from_field_to_contact_id")?
1920 {
1921 Some(res) => res,
1922 None => {
1923 warn!(
1924 context,
1925 "Contact with From address {:?} cannot exist, not moving out of spam", from
1926 );
1927 return Ok(false);
1928 }
1929 };
1930 if blocked_contact {
1931 return Ok(false);
1933 }
1934
1935 if let Some(chat_id_blocked) = ChatIdBlocked::lookup_by_contact(context, from_id).await? {
1936 if chat_id_blocked.blocked != Blocked::Not {
1937 return Ok(false);
1938 }
1939 } else if from_id != ContactId::SELF {
1940 return Ok(false);
1942 }
1943 }
1944
1945 Ok(true)
1946}
1947
1948async fn spam_target_folder_cfg(
1953 context: &Context,
1954 headers: &[mailparse::MailHeader<'_>],
1955) -> Result<Option<Config>> {
1956 if !should_move_out_of_spam(context, headers).await? {
1957 return Ok(None);
1958 }
1959
1960 if needs_move_to_mvbox(context, headers).await?
1961 || context.get_config_bool(Config::OnlyFetchMvbox).await?
1964 {
1965 Ok(Some(Config::ConfiguredMvboxFolder))
1966 } else {
1967 Ok(Some(Config::ConfiguredInboxFolder))
1968 }
1969}
1970
1971pub async fn target_folder_cfg(
1974 context: &Context,
1975 folder: &str,
1976 folder_meaning: FolderMeaning,
1977 headers: &[mailparse::MailHeader<'_>],
1978) -> Result<Option<Config>> {
1979 if context.is_mvbox(folder).await? {
1980 return Ok(None);
1981 }
1982
1983 if folder_meaning == FolderMeaning::Spam {
1984 spam_target_folder_cfg(context, headers).await
1985 } else if folder_meaning == FolderMeaning::Inbox
1986 && needs_move_to_mvbox(context, headers).await?
1987 {
1988 Ok(Some(Config::ConfiguredMvboxFolder))
1989 } else {
1990 Ok(None)
1991 }
1992}
1993
1994pub async fn target_folder(
1995 context: &Context,
1996 folder: &str,
1997 folder_meaning: FolderMeaning,
1998 headers: &[mailparse::MailHeader<'_>],
1999) -> Result<String> {
2000 match target_folder_cfg(context, folder, folder_meaning, headers).await? {
2001 Some(config) => match context.get_config(config).await? {
2002 Some(target) => Ok(target),
2003 None => Ok(folder.to_string()),
2004 },
2005 None => Ok(folder.to_string()),
2006 }
2007}
2008
2009async fn needs_move_to_mvbox(
2010 context: &Context,
2011 headers: &[mailparse::MailHeader<'_>],
2012) -> Result<bool> {
2013 let has_chat_version = headers.get_header_value(HeaderDef::ChatVersion).is_some();
2014 if !context.get_config_bool(Config::MvboxMove).await? {
2015 return Ok(false);
2016 }
2017
2018 if has_chat_version {
2019 Ok(true)
2020 } else if let Some(parent) = get_prefetch_parent_message(context, headers).await? {
2021 match parent.is_dc_message {
2022 MessengerMessage::No => Ok(false),
2023 MessengerMessage::Yes | MessengerMessage::Reply => Ok(true),
2024 }
2025 } else {
2026 Ok(false)
2027 }
2028}
2029
2030fn get_folder_meaning_by_name(folder_name: &str) -> FolderMeaning {
2037 const SPAM_NAMES: &[&str] = &[
2039 "spam",
2040 "junk",
2041 "Correio electrónico não solicitado",
2042 "Correo basura",
2043 "Lixo",
2044 "Nettsøppel",
2045 "Nevyžádaná pošta",
2046 "No solicitado",
2047 "Ongewenst",
2048 "Posta indesiderata",
2049 "Skräp",
2050 "Wiadomości-śmieci",
2051 "Önemsiz",
2052 "Ανεπιθύμητα",
2053 "Спам",
2054 "垃圾邮件",
2055 "垃圾郵件",
2056 "迷惑メール",
2057 "스팸",
2058 ];
2059 const TRASH_NAMES: &[&str] = &[
2060 "Trash",
2061 "Bin",
2062 "Caixote do lixo",
2063 "Cestino",
2064 "Corbeille",
2065 "Papelera",
2066 "Papierkorb",
2067 "Papirkurv",
2068 "Papperskorgen",
2069 "Prullenbak",
2070 "Rubujo",
2071 "Κάδος απορριμμάτων",
2072 "Корзина",
2073 "Кошик",
2074 "ゴミ箱",
2075 "垃圾桶",
2076 "已删除邮件",
2077 "휴지통",
2078 ];
2079 let lower = folder_name.to_lowercase();
2080
2081 if lower == "inbox" {
2082 FolderMeaning::Inbox
2083 } else if SPAM_NAMES.iter().any(|s| s.to_lowercase() == lower) {
2084 FolderMeaning::Spam
2085 } else if TRASH_NAMES.iter().any(|s| s.to_lowercase() == lower) {
2086 FolderMeaning::Trash
2087 } else {
2088 FolderMeaning::Unknown
2089 }
2090}
2091
2092fn get_folder_meaning_by_attrs(folder_attrs: &[NameAttribute]) -> FolderMeaning {
2093 for attr in folder_attrs {
2094 match attr {
2095 NameAttribute::Trash => return FolderMeaning::Trash,
2096 NameAttribute::Junk => return FolderMeaning::Spam,
2097 NameAttribute::All | NameAttribute::Flagged => return FolderMeaning::Virtual,
2098 NameAttribute::Extension(label) => {
2099 match label.as_ref() {
2100 "\\Spam" => return FolderMeaning::Spam,
2101 "\\Important" => return FolderMeaning::Virtual,
2102 _ => {}
2103 };
2104 }
2105 _ => {}
2106 }
2107 }
2108 FolderMeaning::Unknown
2109}
2110
2111pub(crate) fn get_folder_meaning(folder: &Name) -> FolderMeaning {
2112 match get_folder_meaning_by_attrs(folder.attributes()) {
2113 FolderMeaning::Unknown => get_folder_meaning_by_name(folder.name()),
2114 meaning => meaning,
2115 }
2116}
2117
2118fn get_fetch_headers(prefetch_msg: &Fetch) -> Result<Vec<mailparse::MailHeader<'_>>> {
2120 match prefetch_msg.header() {
2121 Some(header_bytes) => {
2122 let (headers, _) = mailparse::parse_headers(header_bytes)?;
2123 Ok(headers)
2124 }
2125 None => Ok(Vec::new()),
2126 }
2127}
2128
2129pub(crate) fn prefetch_get_message_id(headers: &[mailparse::MailHeader]) -> Option<String> {
2130 headers
2131 .get_header_value(HeaderDef::XMicrosoftOriginalMessageId)
2132 .or_else(|| headers.get_header_value(HeaderDef::MessageId))
2133 .and_then(|msgid| mimeparser::parse_message_id(&msgid).ok())
2134}
2135
2136pub(crate) fn create_message_id() -> String {
2137 format!("{}{}", GENERATED_PREFIX, create_id())
2138}
2139
2140pub(crate) async fn prefetch_should_download(
2142 context: &Context,
2143 headers: &[mailparse::MailHeader<'_>],
2144 message_id: &str,
2145 mut flags: impl Iterator<Item = Flag<'_>>,
2146) -> Result<bool> {
2147 if message::rfc724_mid_download_tried(context, message_id).await? {
2148 if let Some(from) = mimeparser::get_from(headers)
2149 && context.is_self_addr(&from.addr).await?
2150 {
2151 markseen_on_imap_table(context, message_id).await?;
2152 }
2153 return Ok(false);
2154 }
2155
2156 let maybe_ndn = if let Some(from) = headers.get_header_value(HeaderDef::From_) {
2160 let from = from.to_ascii_lowercase();
2161 from.contains("mailer-daemon") || from.contains("mail-daemon")
2162 } else {
2163 false
2164 };
2165
2166 let from = match mimeparser::get_from(headers) {
2167 Some(f) => f,
2168 None => return Ok(false),
2169 };
2170 let (_from_id, blocked_contact, _origin) =
2171 match from_field_to_contact_id(context, &from, None, true, true).await? {
2172 Some(res) => res,
2173 None => return Ok(false),
2174 };
2175 if flags.any(|f| f == Flag::Draft) {
2179 info!(context, "Ignoring draft message");
2180 return Ok(false);
2181 }
2182
2183 let should_download = (!blocked_contact) || maybe_ndn;
2184 Ok(should_download)
2185}
2186
2187async fn mark_seen_by_uid(
2191 context: &Context,
2192 transport_id: u32,
2193 folder: &str,
2194 uid_validity: u32,
2195 uid: u32,
2196) -> Result<Option<ChatId>> {
2197 if let Some((msg_id, chat_id)) = context
2198 .sql
2199 .query_row_optional(
2200 "SELECT id, chat_id FROM msgs
2201 WHERE id > 9 AND rfc724_mid IN (
2202 SELECT rfc724_mid FROM imap
2203 WHERE transport_id=?
2204 AND folder=?
2205 AND uidvalidity=?
2206 AND uid=?
2207 LIMIT 1
2208 )",
2209 (transport_id, &folder, uid_validity, uid),
2210 |row| {
2211 let msg_id: MsgId = row.get(0)?;
2212 let chat_id: ChatId = row.get(1)?;
2213 Ok((msg_id, chat_id))
2214 },
2215 )
2216 .await
2217 .with_context(|| format!("failed to get msg and chat ID for IMAP message {folder}/{uid}"))?
2218 {
2219 let updated = context
2220 .sql
2221 .execute(
2222 "UPDATE msgs SET state=?1
2223 WHERE (state=?2 OR state=?3)
2224 AND id=?4",
2225 (
2226 MessageState::InSeen,
2227 MessageState::InFresh,
2228 MessageState::InNoticed,
2229 msg_id,
2230 ),
2231 )
2232 .await
2233 .with_context(|| format!("failed to update msg {msg_id} state"))?
2234 > 0;
2235
2236 if updated {
2237 msg_id
2238 .start_ephemeral_timer(context)
2239 .await
2240 .with_context(|| format!("failed to start ephemeral timer for message {msg_id}"))?;
2241 Ok(Some(chat_id))
2242 } else {
2243 Ok(None)
2245 }
2246 } else {
2247 Ok(None)
2249 }
2250}
2251
2252pub(crate) async fn markseen_on_imap_table(context: &Context, message_id: &str) -> Result<()> {
2255 context
2256 .sql
2257 .execute(
2258 "INSERT OR IGNORE INTO imap_markseen (id)
2259 SELECT id FROM imap WHERE rfc724_mid=?",
2260 (message_id,),
2261 )
2262 .await?;
2263 context.scheduler.interrupt_inbox().await;
2264
2265 Ok(())
2266}
2267
2268pub(crate) async fn set_uid_next(
2272 context: &Context,
2273 transport_id: u32,
2274 folder: &str,
2275 uid_next: u32,
2276) -> Result<()> {
2277 context
2278 .sql
2279 .execute(
2280 "INSERT INTO imap_sync (transport_id, folder, uid_next) VALUES (?, ?,?)
2281 ON CONFLICT(transport_id, folder) DO UPDATE SET uid_next=excluded.uid_next",
2282 (transport_id, folder, uid_next),
2283 )
2284 .await?;
2285 Ok(())
2286}
2287
2288async fn get_uid_next(context: &Context, transport_id: u32, folder: &str) -> Result<u32> {
2294 Ok(context
2295 .sql
2296 .query_get_value(
2297 "SELECT uid_next FROM imap_sync WHERE transport_id=? AND folder=?",
2298 (transport_id, folder),
2299 )
2300 .await?
2301 .unwrap_or(0))
2302}
2303
2304pub(crate) async fn set_uidvalidity(
2305 context: &Context,
2306 transport_id: u32,
2307 folder: &str,
2308 uidvalidity: u32,
2309) -> Result<()> {
2310 context
2311 .sql
2312 .execute(
2313 "INSERT INTO imap_sync (transport_id, folder, uidvalidity) VALUES (?,?,?)
2314 ON CONFLICT(transport_id, folder) DO UPDATE SET uidvalidity=excluded.uidvalidity",
2315 (transport_id, folder, uidvalidity),
2316 )
2317 .await?;
2318 Ok(())
2319}
2320
2321async fn get_uidvalidity(context: &Context, transport_id: u32, folder: &str) -> Result<u32> {
2322 Ok(context
2323 .sql
2324 .query_get_value(
2325 "SELECT uidvalidity FROM imap_sync WHERE transport_id=? AND folder=?",
2326 (transport_id, folder),
2327 )
2328 .await?
2329 .unwrap_or(0))
2330}
2331
2332pub(crate) async fn set_modseq(
2333 context: &Context,
2334 transport_id: u32,
2335 folder: &str,
2336 modseq: u64,
2337) -> Result<()> {
2338 context
2339 .sql
2340 .execute(
2341 "INSERT INTO imap_sync (transport_id, folder, modseq) VALUES (?,?,?)
2342 ON CONFLICT(transport_id, folder) DO UPDATE SET modseq=excluded.modseq",
2343 (transport_id, folder, modseq),
2344 )
2345 .await?;
2346 Ok(())
2347}
2348
2349async fn get_modseq(context: &Context, transport_id: u32, folder: &str) -> Result<u64> {
2350 Ok(context
2351 .sql
2352 .query_get_value(
2353 "SELECT modseq FROM imap_sync WHERE transport_id=? AND folder=?",
2354 (transport_id, folder),
2355 )
2356 .await?
2357 .unwrap_or(0))
2358}
2359
2360async fn should_ignore_folder(
2365 context: &Context,
2366 folder: &str,
2367 folder_meaning: FolderMeaning,
2368) -> Result<bool> {
2369 if !context.get_config_bool(Config::OnlyFetchMvbox).await? {
2370 return Ok(false);
2371 }
2372 Ok(!(context.is_mvbox(folder).await? || folder_meaning == FolderMeaning::Spam))
2373}
2374
2375#[expect(clippy::arithmetic_side_effects)]
2379fn build_sequence_sets(uids: &[u32]) -> Result<Vec<(Vec<u32>, String)>> {
2380 let mut ranges: Vec<UidRange> = vec![];
2382
2383 for ¤t in uids {
2384 if let Some(last) = ranges.last_mut()
2385 && last.end + 1 == current
2386 {
2387 last.end = current;
2388 continue;
2389 }
2390
2391 ranges.push(UidRange {
2392 start: current,
2393 end: current,
2394 });
2395 }
2396
2397 let mut result = vec![];
2399 let (mut last_uids, mut last_str) = (Vec::new(), String::new());
2400 for range in ranges {
2401 last_uids.reserve((range.end - range.start + 1).try_into()?);
2402 (range.start..=range.end).for_each(|u| last_uids.push(u));
2403 if !last_str.is_empty() {
2404 last_str.push(',');
2405 }
2406 last_str.push_str(&range.to_string());
2407
2408 if last_str.len() > 990 {
2409 result.push((take(&mut last_uids), take(&mut last_str)));
2410 }
2411 }
2412 result.push((last_uids, last_str));
2413
2414 result.retain(|(_, s)| !s.is_empty());
2415 Ok(result)
2416}
2417
2418struct UidRange {
2419 start: u32,
2420 end: u32,
2421 }
2423
2424impl std::fmt::Display for UidRange {
2425 fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
2426 if self.start == self.end {
2427 write!(f, "{}", self.start)
2428 } else {
2429 write!(f, "{}:{}", self.start, self.end)
2430 }
2431 }
2432}
2433
2434pub(crate) async fn get_watched_folder_configs(context: &Context) -> Result<Vec<Config>> {
2435 let mut res = vec![Config::ConfiguredInboxFolder];
2436 if context.should_watch_mvbox().await? {
2437 res.push(Config::ConfiguredMvboxFolder);
2438 }
2439 Ok(res)
2440}
2441
2442pub(crate) async fn get_watched_folders(context: &Context) -> Result<Vec<String>> {
2443 let mut res = Vec::new();
2444 for folder_config in get_watched_folder_configs(context).await? {
2445 if let Some(folder) = context.get_config(folder_config).await? {
2446 res.push(folder);
2447 }
2448 }
2449 Ok(res)
2450}
2451
2452#[cfg(test)]
2453mod imap_tests;