1use std::{
7 cmp::max,
8 cmp::min,
9 collections::{BTreeMap, BTreeSet, HashMap},
10 iter::Peekable,
11 mem::take,
12 sync::atomic::Ordering,
13 time::{Duration, UNIX_EPOCH},
14};
15
16use anyhow::{Context as _, Result, bail, ensure, format_err};
17use async_channel::{self, Receiver, Sender};
18use async_imap::types::{Fetch, Flag, Name, NameAttribute, UnsolicitedResponse};
19use futures::{FutureExt as _, TryStreamExt};
20use futures_lite::FutureExt;
21use ratelimit::Ratelimit;
22use url::Url;
23
24use crate::calls::{
25 UnresolvedIceServer, create_fallback_ice_servers, create_ice_servers_from_metadata,
26};
27use crate::chat::{self, ChatId, ChatIdBlocked, add_device_msg};
28use crate::chatlist_events;
29use crate::config::Config;
30use crate::constants::{self, Blocked, DC_VERSION_STR};
31use crate::contact::ContactId;
32use crate::context::Context;
33use crate::events::EventType;
34use crate::headerdef::{HeaderDef, HeaderDefMap};
35use crate::log::{LogExt, warn};
36use crate::message::{self, Message, MessageState, MessengerMessage, MsgId};
37use crate::mimeparser;
38use crate::net::proxy::ProxyConfig;
39use crate::net::session::SessionStream;
40use crate::oauth2::get_oauth2_access_token;
41use crate::push::encrypt_device_token;
42use crate::receive_imf::{
43 ReceivedMsg, from_field_to_contact_id, get_prefetch_parent_message, receive_imf_inner,
44};
45use crate::scheduler::connectivity::ConnectivityStore;
46use crate::stock_str;
47use crate::tools::{self, create_id, duration_to_str, time};
48use crate::transport::{
49 ConfiguredLoginParam, ConfiguredServerLoginParam, prioritize_server_login_params,
50};
51
52pub(crate) mod capabilities;
53mod client;
54mod idle;
55pub mod select_folder;
56pub(crate) mod session;
57
58use client::{Client, determine_capabilities};
59use session::Session;
60
61pub(crate) const GENERATED_PREFIX: &str = "GEN_";
62
63const RFC724MID_UID: &str = "(UID BODY.PEEK[HEADER.FIELDS (\
64 MESSAGE-ID \
65 X-MICROSOFT-ORIGINAL-MESSAGE-ID\
66 )])";
67const BODY_FULL: &str = "(FLAGS BODY.PEEK[])";
68
69#[derive(Debug)]
70pub(crate) struct Imap {
71 transport_id: u32,
75
76 pub(crate) idle_interrupt_receiver: Receiver<()>,
77
78 pub(crate) addr: String,
80
81 lp: Vec<ConfiguredServerLoginParam>,
83
84 password: String,
86
87 proxy_config: Option<ProxyConfig>,
89
90 strict_tls: bool,
91
92 oauth2: bool,
93
94 authentication_failed_once: bool,
95
96 pub(crate) connectivity: ConnectivityStore,
97
98 conn_last_try: tools::Time,
99 conn_backoff_ms: u64,
100
101 ratelimit: Ratelimit,
109
110 pub(crate) resync_request_sender: async_channel::Sender<()>,
112
113 pub(crate) resync_request_receiver: async_channel::Receiver<()>,
115}
116
117#[derive(Debug)]
118struct OAuth2 {
119 user: String,
120 access_token: String,
121}
122
123#[derive(Debug, Default)]
124pub(crate) struct ServerMetadata {
125 pub comment: Option<String>,
128
129 pub admin: Option<String>,
132
133 pub iroh_relay: Option<Url>,
134
135 pub ice_servers: Vec<UnresolvedIceServer>,
137
138 pub ice_servers_expiration_timestamp: i64,
145}
146
147impl async_imap::Authenticator for OAuth2 {
148 type Response = String;
149
150 fn process(&mut self, _data: &[u8]) -> Self::Response {
151 format!(
152 "user={}\x01auth=Bearer {}\x01\x01",
153 self.user, self.access_token
154 )
155 }
156}
157
158#[derive(Debug, Display, PartialEq, Eq, Clone, Copy)]
159pub enum FolderMeaning {
160 Unknown,
161
162 Spam,
164 Inbox,
165 Mvbox,
166 Trash,
167
168 Virtual,
175}
176
177impl FolderMeaning {
178 pub fn to_config(self) -> Option<Config> {
179 match self {
180 FolderMeaning::Unknown => None,
181 FolderMeaning::Spam => None,
182 FolderMeaning::Inbox => Some(Config::ConfiguredInboxFolder),
183 FolderMeaning::Mvbox => Some(Config::ConfiguredMvboxFolder),
184 FolderMeaning::Trash => None,
185 FolderMeaning::Virtual => None,
186 }
187 }
188}
189
190struct UidGrouper<T: Iterator<Item = (i64, u32, String)>> {
191 inner: Peekable<T>,
192}
193
194impl<T, I> From<I> for UidGrouper<T>
195where
196 T: Iterator<Item = (i64, u32, String)>,
197 I: IntoIterator<IntoIter = T>,
198{
199 fn from(inner: I) -> Self {
200 Self {
201 inner: inner.into_iter().peekable(),
202 }
203 }
204}
205
206impl<T: Iterator<Item = (i64, u32, String)>> Iterator for UidGrouper<T> {
207 type Item = (String, Vec<i64>, String);
209
210 #[expect(clippy::arithmetic_side_effects)]
211 fn next(&mut self) -> Option<Self::Item> {
212 let (_, _, folder) = self.inner.peek().cloned()?;
213
214 let mut uid_set = String::new();
215 let mut rowid_set = Vec::new();
216
217 while uid_set.len() < 1000 {
218 if let Some((start_rowid, start_uid, _)) = self
220 .inner
221 .next_if(|(_, _, start_folder)| start_folder == &folder)
222 {
223 rowid_set.push(start_rowid);
224 let mut end_uid = start_uid;
225
226 while let Some((next_rowid, next_uid, _)) =
227 self.inner.next_if(|(_, next_uid, next_folder)| {
228 next_folder == &folder && (*next_uid == end_uid + 1 || *next_uid == end_uid)
229 })
230 {
231 end_uid = next_uid;
232 rowid_set.push(next_rowid);
233 }
234
235 let uid_range = UidRange {
236 start: start_uid,
237 end: end_uid,
238 };
239 if !uid_set.is_empty() {
240 uid_set.push(',');
241 }
242 uid_set.push_str(&uid_range.to_string());
243 } else {
244 break;
245 }
246 }
247
248 Some((folder, rowid_set, uid_set))
249 }
250}
251
252impl Imap {
253 pub async fn new(
255 context: &Context,
256 transport_id: u32,
257 param: ConfiguredLoginParam,
258 idle_interrupt_receiver: Receiver<()>,
259 ) -> Result<Self> {
260 let lp = param.imap.clone();
261 let password = param.imap_password.clone();
262 let proxy_config = ProxyConfig::load(context).await?;
263 let addr = ¶m.addr;
264 let strict_tls = param.strict_tls(proxy_config.is_some());
265 let oauth2 = param.oauth2;
266 let (resync_request_sender, resync_request_receiver) = async_channel::bounded(1);
267 Ok(Imap {
268 transport_id,
269 idle_interrupt_receiver,
270 addr: addr.to_string(),
271 lp,
272 password,
273 proxy_config,
274 strict_tls,
275 oauth2,
276 authentication_failed_once: false,
277 connectivity: Default::default(),
278 conn_last_try: UNIX_EPOCH,
279 conn_backoff_ms: 0,
280 ratelimit: Ratelimit::new(Duration::new(120, 0), 2.0),
282 resync_request_sender,
283 resync_request_receiver,
284 })
285 }
286
287 pub async fn new_configured(
289 context: &Context,
290 idle_interrupt_receiver: Receiver<()>,
291 ) -> Result<Self> {
292 let (transport_id, param) = ConfiguredLoginParam::load(context)
293 .await?
294 .context("Not configured")?;
295 let imap = Self::new(context, transport_id, param, idle_interrupt_receiver).await?;
296 Ok(imap)
297 }
298
299 pub fn transport_id(&self) -> u32 {
301 self.transport_id
302 }
303
304 pub(crate) async fn connect(
310 &mut self,
311 context: &Context,
312 configuring: bool,
313 ) -> Result<Session> {
314 let now = tools::Time::now();
315 let until_can_send = max(
316 min(self.conn_last_try, now)
317 .checked_add(Duration::from_millis(self.conn_backoff_ms))
318 .unwrap_or(now),
319 now,
320 )
321 .duration_since(now)?;
322 let ratelimit_duration = max(until_can_send, self.ratelimit.until_can_send());
323 if !ratelimit_duration.is_zero() {
324 warn!(
325 context,
326 "IMAP got rate limited, waiting for {} until can connect.",
327 duration_to_str(ratelimit_duration),
328 );
329 let interrupted = async {
330 tokio::time::sleep(ratelimit_duration).await;
331 false
332 }
333 .race(self.idle_interrupt_receiver.recv().map(|_| true))
334 .await;
335 if interrupted {
336 info!(
337 context,
338 "Connecting to IMAP without waiting for ratelimit due to interrupt."
339 );
340 }
341 }
342
343 info!(context, "Connecting to IMAP server.");
344 self.connectivity.set_connecting(context);
345
346 self.conn_last_try = tools::Time::now();
347 const BACKOFF_MIN_MS: u64 = 2000;
348 const BACKOFF_MAX_MS: u64 = 80_000;
349 self.conn_backoff_ms = min(self.conn_backoff_ms, BACKOFF_MAX_MS / 2);
350 self.conn_backoff_ms = self.conn_backoff_ms.saturating_add(rand::random_range(
351 (self.conn_backoff_ms / 2)..=self.conn_backoff_ms,
352 ));
353 self.conn_backoff_ms = max(BACKOFF_MIN_MS, self.conn_backoff_ms);
354
355 let login_params = prioritize_server_login_params(&context.sql, &self.lp, "imap").await?;
356 let mut first_error = None;
357 for lp in login_params {
358 info!(context, "IMAP trying to connect to {}.", &lp.connection);
359 let connection_candidate = lp.connection.clone();
360 let client = match Client::connect(
361 context,
362 self.proxy_config.clone(),
363 self.strict_tls,
364 &connection_candidate,
365 )
366 .await
367 .with_context(|| format!("IMAP failed to connect to {connection_candidate}"))
368 {
369 Ok(client) => client,
370 Err(err) => {
371 warn!(context, "{err:#}.");
372 first_error.get_or_insert(err);
373 continue;
374 }
375 };
376
377 self.conn_backoff_ms = BACKOFF_MIN_MS;
378 self.ratelimit.send();
379
380 let imap_user: &str = lp.user.as_ref();
381 let imap_pw: &str = &self.password;
382
383 let login_res = if self.oauth2 {
384 info!(context, "Logging into IMAP server with OAuth 2.");
385 let addr: &str = self.addr.as_ref();
386
387 let token = get_oauth2_access_token(context, addr, imap_pw, true)
388 .await?
389 .context("IMAP could not get OAUTH token")?;
390 let auth = OAuth2 {
391 user: imap_user.into(),
392 access_token: token,
393 };
394 client.authenticate("XOAUTH2", auth).await
395 } else {
396 info!(context, "Logging into IMAP server with LOGIN.");
397 client.login(imap_user, imap_pw).await
398 };
399
400 match login_res {
401 Ok(mut session) => {
402 let capabilities = determine_capabilities(&mut session).await?;
403 let resync_request_sender = self.resync_request_sender.clone();
404
405 let session = if capabilities.can_compress {
406 info!(context, "Enabling IMAP compression.");
407 let compressed_session = session
408 .compress(|s| {
409 let session_stream: Box<dyn SessionStream> = Box::new(s);
410 session_stream
411 })
412 .await
413 .context("Failed to enable IMAP compression")?;
414 Session::new(
415 compressed_session,
416 capabilities,
417 resync_request_sender,
418 self.transport_id,
419 )
420 } else {
421 Session::new(
422 session,
423 capabilities,
424 resync_request_sender,
425 self.transport_id,
426 )
427 };
428
429 let mut lock = context.server_id.write().await;
431 lock.clone_from(&session.capabilities.server_id);
432
433 self.authentication_failed_once = false;
434 context.emit_event(EventType::ImapConnected(format!(
435 "IMAP-LOGIN as {}",
436 lp.user
437 )));
438 self.connectivity.set_preparing(context);
439 info!(context, "Successfully logged into IMAP server.");
440 return Ok(session);
441 }
442
443 Err(err) => {
444 let imap_user = lp.user.to_owned();
445 let message = stock_str::cannot_login(context, &imap_user);
446
447 warn!(context, "IMAP failed to login: {err:#}.");
448 first_error.get_or_insert(format_err!("{message} ({err:#})"));
449
450 let _lock = context.wrong_pw_warning_mutex.lock().await;
452 if err.to_string().to_lowercase().contains("authentication") {
453 if self.authentication_failed_once
454 && !configuring
455 && context.get_config_bool(Config::NotifyAboutWrongPw).await?
456 {
457 let mut msg = Message::new_text(message);
458 if let Err(e) = chat::add_device_msg_with_importance(
459 context,
460 None,
461 Some(&mut msg),
462 true,
463 )
464 .await
465 {
466 warn!(context, "Failed to add device message: {e:#}.");
467 } else {
468 context
469 .set_config_internal(Config::NotifyAboutWrongPw, None)
470 .await
471 .log_err(context)
472 .ok();
473 }
474 } else {
475 self.authentication_failed_once = true;
476 }
477 } else {
478 self.authentication_failed_once = false;
479 }
480 }
481 }
482 }
483
484 Err(first_error.unwrap_or_else(|| format_err!("No IMAP connection candidates provided")))
485 }
486
487 pub(crate) async fn prepare(&mut self, context: &Context) -> Result<Session> {
492 let configuring = false;
493 let mut session = match self.connect(context, configuring).await {
494 Ok(session) => session,
495 Err(err) => {
496 self.connectivity.set_err(context, &err);
497 return Err(err);
498 }
499 };
500
501 let folders_configured = context
502 .sql
503 .get_raw_config_int(constants::DC_FOLDERS_CONFIGURED_KEY)
504 .await?;
505 if folders_configured.unwrap_or_default() < constants::DC_FOLDERS_CONFIGURED_VERSION {
506 self.configure_folders(context, &mut session).await?;
507 }
508
509 Ok(session)
510 }
511
512 pub async fn fetch_move_delete(
517 &mut self,
518 context: &Context,
519 session: &mut Session,
520 watch_folder: &str,
521 folder_meaning: FolderMeaning,
522 ) -> Result<()> {
523 if !context.sql.is_open().await {
524 bail!("IMAP operation attempted while it is torn down");
526 }
527
528 let msgs_fetched = self
529 .fetch_new_messages(context, session, watch_folder, folder_meaning)
530 .await
531 .context("fetch_new_messages")?;
532 if msgs_fetched && context.get_config_delete_device_after().await?.is_some() {
533 context.scheduler.interrupt_ephemeral_task().await;
538 }
539
540 session
541 .move_delete_messages(context, watch_folder)
542 .await
543 .context("move_delete_messages")?;
544
545 Ok(())
546 }
547
548 #[expect(clippy::arithmetic_side_effects)]
552 pub(crate) async fn fetch_new_messages(
553 &mut self,
554 context: &Context,
555 session: &mut Session,
556 folder: &str,
557 folder_meaning: FolderMeaning,
558 ) -> Result<bool> {
559 let transport_id = session.transport_id();
560
561 if should_ignore_folder(context, folder, folder_meaning).await? {
562 info!(
563 context,
564 "Transport {transport_id}: Not fetching from {folder:?}."
565 );
566 session.new_mail = false;
567 return Ok(false);
568 }
569
570 let folder_exists = session
571 .select_with_uidvalidity(context, folder)
572 .await
573 .with_context(|| format!("Failed to select folder {folder:?}"))?;
574
575 if !session.new_mail {
576 info!(
577 context,
578 "Transport {transport_id}: No new emails in folder {folder:?}."
579 );
580 return Ok(false);
581 }
582 session.new_mail = false;
585
586 if !folder_exists {
587 return Ok(false);
588 }
589
590 let mut read_cnt = 0;
591 loop {
592 let (n, fetch_more) = self
593 .fetch_new_msg_batch(context, session, folder, folder_meaning)
594 .await?;
595 read_cnt += n;
596 if !fetch_more {
597 return Ok(read_cnt > 0);
598 }
599 }
600 }
601
602 #[expect(clippy::arithmetic_side_effects)]
604 async fn fetch_new_msg_batch(
605 &mut self,
606 context: &Context,
607 session: &mut Session,
608 folder: &str,
609 folder_meaning: FolderMeaning,
610 ) -> Result<(usize, bool)> {
611 let transport_id = self.transport_id;
612 let uid_validity = get_uidvalidity(context, transport_id, folder).await?;
613 let old_uid_next = get_uid_next(context, transport_id, folder).await?;
614 info!(
615 context,
616 "fetch_new_msg_batch({folder}): UIDVALIDITY={uid_validity}, UIDNEXT={old_uid_next}."
617 );
618
619 let uids_to_prefetch = 500;
620 let msgs = session
621 .prefetch(old_uid_next, uids_to_prefetch)
622 .await
623 .context("prefetch")?;
624 let read_cnt = msgs.len();
625 let _fetch_msgs_lock_guard = context.fetch_msgs_mutex.lock().await;
626
627 let mut uids_fetch: Vec<u32> = Vec::new();
628 let mut available_post_msgs: Vec<String> = Vec::new();
629 let mut download_later: Vec<String> = Vec::new();
630 let mut uid_message_ids = BTreeMap::new();
631 let mut largest_uid_skipped = None;
632
633 let download_limit: Option<u32> = context
634 .get_config_parsed(Config::DownloadLimit)
635 .await?
636 .filter(|&l| 0 < l);
637
638 for (uid, ref fetch_response) in msgs {
640 let headers = match get_fetch_headers(fetch_response) {
641 Ok(headers) => headers,
642 Err(err) => {
643 warn!(context, "Failed to parse FETCH headers: {err:#}.");
644 continue;
645 }
646 };
647
648 let message_id = prefetch_get_message_id(&headers);
649 let size = fetch_response
650 .size
651 .context("imap fetch response does not contain size")?;
652
653 let delete = if let Some(message_id) = &message_id {
664 message::rfc724_mid_exists_ex(context, message_id, "deleted=1")
665 .await?
666 .is_some_and(|(_msg_id, deleted)| deleted)
667 } else {
668 false
669 };
670
671 let message_id = message_id.unwrap_or_else(create_message_id);
674
675 if delete {
676 info!(context, "Deleting locally deleted message {message_id}.");
677 }
678
679 let _target;
680 let target = if delete {
681 ""
682 } else {
683 _target = target_folder(context, folder, folder_meaning, &headers).await?;
684 &_target
685 };
686
687 context
688 .sql
689 .execute(
690 "INSERT INTO imap (transport_id, rfc724_mid, folder, uid, uidvalidity, target)
691 VALUES (?, ?, ?, ?, ?, ?)
692 ON CONFLICT(transport_id, folder, uid, uidvalidity)
693 DO UPDATE SET rfc724_mid=excluded.rfc724_mid,
694 target=excluded.target",
695 (
696 self.transport_id,
697 &message_id,
698 &folder,
699 uid,
700 uid_validity,
701 target,
702 ),
703 )
704 .await?;
705
706 if folder == target
713 && folder_meaning != FolderMeaning::Spam
718 && prefetch_should_download(
719 context,
720 &headers,
721 &message_id,
722 fetch_response.flags(),
723 )
724 .await.context("prefetch_should_download")?
725 {
726 if headers
727 .get_header_value(HeaderDef::ChatIsPostMessage)
728 .is_some()
729 {
730 info!(context, "{message_id:?} is a post-message.");
731 available_post_msgs.push(message_id.clone());
732
733 let is_bot = context.get_config_bool(Config::Bot).await?;
734 if is_bot && download_limit.is_none_or(|download_limit| size <= download_limit)
735 {
736 uids_fetch.push(uid);
737 uid_message_ids.insert(uid, message_id);
738 } else {
739 if download_limit.is_none_or(|download_limit| size <= download_limit) {
740 download_later.push(message_id.clone());
743 }
744 largest_uid_skipped = Some(uid);
745 }
746 } else {
747 info!(context, "{message_id:?} is not a post-message.");
748 if download_limit.is_none_or(|download_limit| size <= download_limit) {
749 uids_fetch.push(uid);
750 uid_message_ids.insert(uid, message_id);
751 } else {
752 download_later.push(message_id.clone());
753 largest_uid_skipped = Some(uid);
754 }
755 };
756 } else {
757 largest_uid_skipped = Some(uid);
758 }
759 }
760
761 if !uids_fetch.is_empty() {
762 self.connectivity.set_working(context);
763 }
764
765 let (sender, receiver) = async_channel::unbounded();
766
767 let mut received_msgs = Vec::with_capacity(uids_fetch.len());
768 let mailbox_uid_next = session
769 .selected_mailbox
770 .as_ref()
771 .with_context(|| format!("Expected {folder:?} to be selected"))?
772 .uid_next
773 .unwrap_or_default();
774
775 let update_uids_future = async {
776 let mut largest_uid_fetched: u32 = 0;
777
778 while let Ok((uid, received_msg_opt)) = receiver.recv().await {
779 largest_uid_fetched = max(largest_uid_fetched, uid);
780 if let Some(received_msg) = received_msg_opt {
781 received_msgs.push(received_msg)
782 }
783 }
784
785 largest_uid_fetched
786 };
787
788 let actually_download_messages_future = async {
789 session
790 .fetch_many_msgs(context, folder, uids_fetch, &uid_message_ids, sender)
791 .await
792 .context("fetch_many_msgs")
793 };
794
795 let (largest_uid_fetched, fetch_res) =
796 tokio::join!(update_uids_future, actually_download_messages_future);
797
798 let mut new_uid_next = largest_uid_fetched + 1;
804 let fetch_more = fetch_res.is_ok() && {
805 let prefetch_uid_next = old_uid_next + uids_to_prefetch;
806 new_uid_next = max(new_uid_next, min(prefetch_uid_next, mailbox_uid_next));
810
811 new_uid_next = max(new_uid_next, largest_uid_skipped.unwrap_or(0) + 1);
812
813 prefetch_uid_next < mailbox_uid_next
814 };
815 if new_uid_next > old_uid_next {
816 set_uid_next(context, self.transport_id, folder, new_uid_next).await?;
817 }
818
819 info!(context, "{} mails read from \"{}\".", read_cnt, folder);
820
821 if !received_msgs.is_empty() {
822 context.emit_event(EventType::IncomingMsgBunch);
823 }
824
825 chat::mark_old_messages_as_noticed(context, received_msgs).await?;
826
827 if fetch_res.is_ok() {
828 info!(
829 context,
830 "available_post_msgs: {}, download_later: {}.",
831 available_post_msgs.len(),
832 download_later.len(),
833 );
834 let trans_fn = |t: &mut rusqlite::Transaction| {
835 let mut stmt = t.prepare("INSERT OR IGNORE INTO available_post_msgs VALUES (?)")?;
836 for rfc724_mid in available_post_msgs {
837 stmt.execute((rfc724_mid,))
838 .context("INSERT OR IGNORE INTO available_post_msgs")?;
839 }
840 let mut stmt =
841 t.prepare("INSERT OR IGNORE INTO download (rfc724_mid, msg_id) VALUES (?,0)")?;
842 for rfc724_mid in download_later {
843 stmt.execute((rfc724_mid,))
844 .context("INSERT OR IGNORE INTO download")?;
845 }
846 Ok(())
847 };
848 context.sql.transaction(trans_fn).await?;
849 }
850
851 fetch_res?;
854
855 Ok((read_cnt, fetch_more))
856 }
857}
858
859impl Session {
860 pub(crate) async fn resync_folders(&mut self, context: &Context) -> Result<()> {
862 let all_folders = self
863 .list_folders()
864 .await
865 .context("listing folders for resync")?;
866 for folder in all_folders {
867 let folder_meaning = get_folder_meaning(&folder);
868 if !matches!(
869 folder_meaning,
870 FolderMeaning::Virtual | FolderMeaning::Unknown
871 ) {
872 self.resync_folder_uids(context, folder.name(), folder_meaning)
873 .await?;
874 }
875 }
876 Ok(())
877 }
878
879 pub(crate) async fn resync_folder_uids(
886 &mut self,
887 context: &Context,
888 folder: &str,
889 folder_meaning: FolderMeaning,
890 ) -> Result<()> {
891 let uid_validity;
892 let mut msgs = BTreeMap::new();
894
895 let folder_exists = self.select_with_uidvalidity(context, folder).await?;
896 let transport_id = self.transport_id();
897 if folder_exists {
898 let mut list = self
899 .uid_fetch("1:*", RFC724MID_UID)
900 .await
901 .with_context(|| format!("Can't resync folder {folder}"))?;
902 while let Some(fetch) = list.try_next().await? {
903 let headers = match get_fetch_headers(&fetch) {
904 Ok(headers) => headers,
905 Err(err) => {
906 warn!(context, "Failed to parse FETCH headers: {}", err);
907 continue;
908 }
909 };
910 let message_id = prefetch_get_message_id(&headers);
911
912 if let (Some(uid), Some(rfc724_mid)) = (fetch.uid, message_id) {
913 msgs.insert(
914 uid,
915 (
916 rfc724_mid,
917 target_folder(context, folder, folder_meaning, &headers).await?,
918 ),
919 );
920 }
921 }
922
923 info!(
924 context,
925 "resync_folder_uids: Collected {} message IDs in {folder}.",
926 msgs.len(),
927 );
928
929 uid_validity = get_uidvalidity(context, transport_id, folder).await?;
930 } else {
931 warn!(context, "resync_folder_uids: No folder {folder}.");
932 uid_validity = 0;
933 }
934
935 context
937 .sql
938 .transaction(move |transaction| {
939 transaction.execute("DELETE FROM imap WHERE transport_id=? AND folder=?", (transport_id, folder,))?;
940 for (uid, (rfc724_mid, target)) in &msgs {
941 transaction.execute(
944 "INSERT INTO imap (transport_id, rfc724_mid, folder, uid, uidvalidity, target)
945 VALUES (?, ?, ?, ?, ?, ?)
946 ON CONFLICT(transport_id, folder, uid, uidvalidity)
947 DO UPDATE SET rfc724_mid=excluded.rfc724_mid,
948 target=excluded.target",
949 (transport_id, rfc724_mid, folder, uid, uid_validity, target),
950 )?;
951 }
952 Ok(())
953 })
954 .await?;
955 Ok(())
956 }
957
958 async fn delete_message_batch(
961 &mut self,
962 context: &Context,
963 uid_set: &str,
964 row_ids: Vec<i64>,
965 ) -> Result<()> {
966 self.add_flag_finalized_with_set(uid_set, "\\Deleted")
968 .await?;
969 context
970 .sql
971 .transaction(|transaction| {
972 let mut stmt = transaction.prepare("DELETE FROM imap WHERE id = ?")?;
973 for row_id in row_ids {
974 stmt.execute((row_id,))?;
975 }
976 Ok(())
977 })
978 .await
979 .context("Cannot remove deleted messages from imap table")?;
980
981 context.emit_event(EventType::ImapMessageDeleted(format!(
982 "IMAP messages {uid_set} marked as deleted"
983 )));
984 Ok(())
985 }
986
987 async fn move_message_batch(
990 &mut self,
991 context: &Context,
992 set: &str,
993 row_ids: Vec<i64>,
994 target: &str,
995 ) -> Result<()> {
996 if self.can_move() {
997 match self.uid_mv(set, &target).await {
998 Ok(()) => {
999 context
1001 .sql
1002 .transaction(|transaction| {
1003 let mut stmt = transaction.prepare("DELETE FROM imap WHERE id = ?")?;
1004 for row_id in row_ids {
1005 stmt.execute((row_id,))?;
1006 }
1007 Ok(())
1008 })
1009 .await
1010 .context("Cannot delete moved messages from imap table")?;
1011 context.emit_event(EventType::ImapMessageMoved(format!(
1012 "IMAP messages {set} moved to {target}"
1013 )));
1014 return Ok(());
1015 }
1016 Err(err) => {
1017 warn!(
1018 context,
1019 "Cannot move messages, fallback to COPY/DELETE {} to {}: {}",
1020 set,
1021 target,
1022 err
1023 );
1024 }
1025 }
1026 }
1027
1028 info!(
1031 context,
1032 "Server does not support MOVE, fallback to COPY/DELETE {} to {}", set, target
1033 );
1034 self.uid_copy(&set, &target).await?;
1035 context
1036 .sql
1037 .transaction(|transaction| {
1038 let mut stmt = transaction.prepare("UPDATE imap SET target='' WHERE id = ?")?;
1039 for row_id in row_ids {
1040 stmt.execute((row_id,))?;
1041 }
1042 Ok(())
1043 })
1044 .await
1045 .context("Cannot plan deletion of messages")?;
1046 context.emit_event(EventType::ImapMessageMoved(format!(
1047 "IMAP messages {set} copied to {target}"
1048 )));
1049 Ok(())
1050 }
1051
1052 async fn move_delete_messages(&mut self, context: &Context, folder: &str) -> Result<()> {
1056 let transport_id = self.transport_id();
1057 let rows = context
1058 .sql
1059 .query_map_vec(
1060 "SELECT id, uid, target FROM imap
1061 WHERE folder = ?
1062 AND transport_id = ?
1063 AND target != folder
1064 ORDER BY target, uid",
1065 (folder, transport_id),
1066 |row| {
1067 let rowid: i64 = row.get(0)?;
1068 let uid: u32 = row.get(1)?;
1069 let target: String = row.get(2)?;
1070 Ok((rowid, uid, target))
1071 },
1072 )
1073 .await?;
1074
1075 for (target, rowid_set, uid_set) in UidGrouper::from(rows) {
1076 let folder_exists = self.select_with_uidvalidity(context, folder).await?;
1081 ensure!(folder_exists, "No folder {folder}");
1082
1083 if target.is_empty() {
1085 self.delete_message_batch(context, &uid_set, rowid_set)
1086 .await
1087 .with_context(|| format!("cannot delete batch of messages {:?}", &uid_set))?;
1088 } else {
1089 self.move_message_batch(context, &uid_set, rowid_set, &target)
1090 .await
1091 .with_context(|| {
1092 format!(
1093 "cannot move batch of messages {:?} to folder {:?}",
1094 &uid_set, target
1095 )
1096 })?;
1097 }
1098 }
1099
1100 if let Err(err) = self.maybe_close_folder(context).await {
1103 warn!(context, "Failed to close folder: {err:#}.");
1104 }
1105
1106 Ok(())
1107 }
1108
1109 pub(crate) async fn store_seen_flags_on_imap(&mut self, context: &Context) -> Result<()> {
1111 if context.get_config_bool(Config::TeamProfile).await? {
1112 return Ok(());
1113 }
1114
1115 context
1116 .sql
1117 .execute(
1118 "DELETE FROM imap_markseen WHERE id NOT IN (SELECT imap.id FROM imap)",
1119 (),
1120 )
1121 .await?;
1122
1123 let transport_id = self.transport_id();
1124 let mut rows = context
1125 .sql
1126 .query_map_vec(
1127 "SELECT imap.id, uid, folder FROM imap, imap_markseen
1128 WHERE imap.id = imap_markseen.id
1129 AND imap.transport_id=?
1130 AND target = folder",
1131 (transport_id,),
1132 |row| {
1133 let rowid: i64 = row.get(0)?;
1134 let uid: u32 = row.get(1)?;
1135 let folder: String = row.get(2)?;
1136 Ok((rowid, uid, folder))
1137 },
1138 )
1139 .await?;
1140
1141 rows.sort_unstable_by(|(_rowid1, uid1, folder1), (_rowid2, uid2, folder2)| {
1148 (folder1, uid1).cmp(&(folder2, uid2))
1149 });
1150
1151 for (folder, rowid_set, uid_set) in UidGrouper::from(rows) {
1152 let folder_exists = match self.select_with_uidvalidity(context, &folder).await {
1153 Err(err) => {
1154 warn!(
1155 context,
1156 "store_seen_flags_on_imap: Failed to select {folder}, will retry later: {err:#}."
1157 );
1158 continue;
1159 }
1160 Ok(folder_exists) => folder_exists,
1161 };
1162 if !folder_exists {
1163 warn!(context, "store_seen_flags_on_imap: No folder {folder}.");
1164 } else if let Err(err) = self.add_flag_finalized_with_set(&uid_set, "\\Seen").await {
1165 warn!(
1166 context,
1167 "Cannot mark messages {uid_set} in {folder} as seen, will retry later: {err:#}."
1168 );
1169 continue;
1170 } else {
1171 info!(
1172 context,
1173 "Marked messages {} in folder {} as seen.", uid_set, folder
1174 );
1175 }
1176 context
1177 .sql
1178 .transaction(|transaction| {
1179 let mut stmt = transaction.prepare("DELETE FROM imap_markseen WHERE id = ?")?;
1180 for rowid in rowid_set {
1181 stmt.execute((rowid,))?;
1182 }
1183 Ok(())
1184 })
1185 .await
1186 .context("Cannot remove messages marked as seen from imap_markseen table")?;
1187 }
1188
1189 Ok(())
1190 }
1191
1192 pub(crate) async fn sync_seen_flags(&mut self, context: &Context, folder: &str) -> Result<()> {
1194 if !self.can_condstore() {
1195 info!(
1196 context,
1197 "Server does not support CONDSTORE, skipping flag synchronization."
1198 );
1199 return Ok(());
1200 }
1201
1202 if context.get_config_bool(Config::TeamProfile).await? {
1203 return Ok(());
1204 }
1205
1206 let folder_exists = self
1207 .select_with_uidvalidity(context, folder)
1208 .await
1209 .context("Failed to select folder")?;
1210 if !folder_exists {
1211 return Ok(());
1212 }
1213
1214 let mailbox = self
1215 .selected_mailbox
1216 .as_ref()
1217 .with_context(|| format!("No mailbox selected, folder: {folder}"))?;
1218
1219 if mailbox.highest_modseq.is_none() {
1222 info!(
1223 context,
1224 "Mailbox {} does not support mod-sequences, skipping flag synchronization.", folder
1225 );
1226 return Ok(());
1227 }
1228
1229 let transport_id = self.transport_id();
1230 let mut updated_chat_ids = BTreeSet::new();
1231 let uid_validity = get_uidvalidity(context, transport_id, folder)
1232 .await
1233 .with_context(|| format!("failed to get UID validity for folder {folder}"))?;
1234 let mut highest_modseq = get_modseq(context, transport_id, folder)
1235 .await
1236 .with_context(|| format!("failed to get MODSEQ for folder {folder}"))?;
1237 let mut list = self
1238 .uid_fetch("1:*", format!("(FLAGS) (CHANGEDSINCE {highest_modseq})"))
1239 .await
1240 .context("failed to fetch flags")?;
1241
1242 let mut got_unsolicited_fetch = false;
1243
1244 while let Some(fetch) = list
1245 .try_next()
1246 .await
1247 .context("failed to get FETCH result")?
1248 {
1249 let uid = if let Some(uid) = fetch.uid {
1250 uid
1251 } else {
1252 info!(context, "FETCH result contains no UID, skipping");
1253 got_unsolicited_fetch = true;
1254 continue;
1255 };
1256 let is_seen = fetch.flags().any(|flag| flag == Flag::Seen);
1257 if is_seen
1258 && let Some(chat_id) = mark_seen_by_uid(context, transport_id, folder, uid_validity, uid)
1259 .await
1260 .with_context(|| {
1261 format!("Transport {transport_id}: Failed to update seen status for msg {folder}/{uid}")
1262 })?
1263 {
1264 updated_chat_ids.insert(chat_id);
1265 }
1266
1267 if let Some(modseq) = fetch.modseq {
1268 if modseq > highest_modseq {
1269 highest_modseq = modseq;
1270 }
1271 } else {
1272 warn!(context, "FETCH result contains no MODSEQ");
1273 }
1274 }
1275 drop(list);
1276
1277 if got_unsolicited_fetch {
1278 info!(context, "Got unsolicited fetch, will skip idle");
1283 self.new_mail = true;
1284 }
1285
1286 set_modseq(context, transport_id, folder, highest_modseq)
1287 .await
1288 .with_context(|| format!("failed to set MODSEQ for folder {folder}"))?;
1289 if !updated_chat_ids.is_empty() {
1290 context.on_archived_chats_maybe_noticed();
1291 }
1292 for updated_chat_id in updated_chat_ids {
1293 context.emit_event(EventType::MsgsNoticed(updated_chat_id));
1294 chatlist_events::emit_chatlist_item_changed(context, updated_chat_id);
1295 }
1296
1297 Ok(())
1298 }
1299
1300 #[expect(clippy::arithmetic_side_effects)]
1315 pub(crate) async fn fetch_many_msgs(
1316 &mut self,
1317 context: &Context,
1318 folder: &str,
1319 request_uids: Vec<u32>,
1320 uid_message_ids: &BTreeMap<u32, String>,
1321 received_msgs_channel: Sender<(u32, Option<ReceivedMsg>)>,
1322 ) -> Result<()> {
1323 if request_uids.is_empty() {
1324 return Ok(());
1325 }
1326
1327 for (request_uids, set) in build_sequence_sets(&request_uids)? {
1328 info!(context, "Starting UID FETCH of message set \"{}\".", set);
1329 let mut fetch_responses = self.uid_fetch(&set, BODY_FULL).await.with_context(|| {
1330 format!("fetching messages {} from folder \"{}\"", &set, folder)
1331 })?;
1332
1333 let mut uid_msgs = HashMap::with_capacity(request_uids.len());
1336
1337 let mut count = 0;
1338 for &request_uid in &request_uids {
1339 let mut fetch_response = uid_msgs.remove(&request_uid);
1341
1342 while fetch_response.is_none() {
1344 let Some(next_fetch_response) = fetch_responses
1345 .try_next()
1346 .await
1347 .context("Failed to process IMAP FETCH result")?
1348 else {
1349 break;
1351 };
1352
1353 if let Some(next_uid) = next_fetch_response.uid {
1354 if next_uid == request_uid {
1355 fetch_response = Some(next_fetch_response);
1356 } else if !request_uids.contains(&next_uid) {
1357 info!(
1364 context,
1365 "Skipping not requested FETCH response for UID {}.", next_uid
1366 );
1367 } else if uid_msgs.insert(next_uid, next_fetch_response).is_some() {
1368 warn!(context, "Got duplicated UID {}.", next_uid);
1369 }
1370 } else {
1371 info!(context, "Skipping FETCH response without UID.");
1372 }
1373 }
1374
1375 let fetch_response = match fetch_response {
1376 Some(fetch) => fetch,
1377 None => {
1378 warn!(
1379 context,
1380 "Missed UID {} in the server response.", request_uid
1381 );
1382 continue;
1383 }
1384 };
1385 count += 1;
1386
1387 let is_deleted = fetch_response.flags().any(|flag| flag == Flag::Deleted);
1388 let body = fetch_response.body();
1389
1390 if is_deleted {
1391 info!(context, "Not processing deleted msg {}.", request_uid);
1392 received_msgs_channel.send((request_uid, None)).await?;
1393 continue;
1394 }
1395
1396 let body = if let Some(body) = body {
1397 body
1398 } else {
1399 info!(
1400 context,
1401 "Not processing message {} without a BODY.", request_uid
1402 );
1403 received_msgs_channel.send((request_uid, None)).await?;
1404 continue;
1405 };
1406
1407 let is_seen = fetch_response.flags().any(|flag| flag == Flag::Seen);
1408
1409 let Some(rfc724_mid) = uid_message_ids.get(&request_uid) else {
1410 error!(
1411 context,
1412 "No Message-ID corresponding to UID {} passed in uid_messsage_ids.",
1413 request_uid
1414 );
1415 continue;
1416 };
1417
1418 info!(
1419 context,
1420 "Passing message UID {} to receive_imf().", request_uid
1421 );
1422 let res = receive_imf_inner(context, rfc724_mid, body, is_seen).await;
1423 let received_msg = match res {
1424 Err(err) => {
1425 warn!(context, "receive_imf error: {err:#}.");
1426
1427 let text = format!(
1428 "❌ Failed to receive a message: {err:#}. Core version v{DC_VERSION_STR}. Please report this bug to delta@merlinux.eu or https://support.delta.chat/.",
1429 );
1430 let mut msg = Message::new_text(text);
1431 add_device_msg(context, None, Some(&mut msg)).await?;
1432 None
1433 }
1434 Ok(msg) => msg,
1435 };
1436 received_msgs_channel
1437 .send((request_uid, received_msg))
1438 .await?;
1439 }
1440
1441 while fetch_responses
1448 .try_next()
1449 .await
1450 .context("Failed to drain FETCH responses")?
1451 .is_some()
1452 {}
1453
1454 if count != request_uids.len() {
1455 warn!(
1456 context,
1457 "Failed to fetch all UIDs: got {}, requested {}, we requested the UIDs {:?}.",
1458 count,
1459 request_uids.len(),
1460 request_uids,
1461 );
1462 } else {
1463 info!(
1464 context,
1465 "Successfully received {} UIDs.",
1466 request_uids.len()
1467 );
1468 }
1469 }
1470
1471 Ok(())
1472 }
1473
1474 #[expect(clippy::arithmetic_side_effects)]
1480 pub(crate) async fn update_metadata(&mut self, context: &Context) -> Result<()> {
1481 let mut lock = context.metadata.write().await;
1482
1483 if !self.can_metadata() {
1484 *lock = Some(Default::default());
1485 }
1486 if let Some(ref mut old_metadata) = *lock {
1487 let now = time();
1488
1489 if now + 3600 * 12 < old_metadata.ice_servers_expiration_timestamp {
1491 return Ok(());
1492 }
1493
1494 let mut got_turn_server = false;
1495 if self.can_metadata() {
1496 info!(context, "ICE servers expired, requesting new credentials.");
1497 let mailbox = "";
1498 let options = "";
1499 let metadata = self
1500 .get_metadata(mailbox, options, "(/shared/vendor/deltachat/turn)")
1501 .await?;
1502 for m in metadata {
1503 if m.entry == "/shared/vendor/deltachat/turn"
1504 && let Some(value) = m.value
1505 {
1506 match create_ice_servers_from_metadata(&value).await {
1507 Ok((parsed_timestamp, parsed_ice_servers)) => {
1508 old_metadata.ice_servers_expiration_timestamp = parsed_timestamp;
1509 old_metadata.ice_servers = parsed_ice_servers;
1510 got_turn_server = true;
1511 }
1512 Err(err) => {
1513 warn!(context, "Failed to parse TURN server metadata: {err:#}.");
1514 }
1515 }
1516 }
1517 }
1518 }
1519 if !got_turn_server {
1520 info!(context, "Will use fallback ICE servers.");
1521 old_metadata.ice_servers_expiration_timestamp = time() + 3600 * 24 * 7;
1523 old_metadata.ice_servers = create_fallback_ice_servers();
1524 }
1525 return Ok(());
1526 }
1527
1528 info!(
1529 context,
1530 "Server supports metadata, retrieving server comment and admin contact."
1531 );
1532
1533 let mut comment = None;
1534 let mut admin = None;
1535 let mut iroh_relay = None;
1536 let mut ice_servers = None;
1537 let mut ice_servers_expiration_timestamp = 0;
1538
1539 let mailbox = "";
1540 let options = "";
1541 let metadata = self
1542 .get_metadata(
1543 mailbox,
1544 options,
1545 "(/shared/comment /shared/admin /shared/vendor/deltachat/irohrelay /shared/vendor/deltachat/turn)",
1546 )
1547 .await?;
1548 for m in metadata {
1549 match m.entry.as_ref() {
1550 "/shared/comment" => {
1551 comment = m.value;
1552 }
1553 "/shared/admin" => {
1554 admin = m.value;
1555 }
1556 "/shared/vendor/deltachat/irohrelay" => {
1557 if let Some(value) = m.value {
1558 if let Ok(url) = Url::parse(&value) {
1559 iroh_relay = Some(url);
1560 } else {
1561 warn!(
1562 context,
1563 "Got invalid URL from iroh relay metadata: {:?}.", value
1564 );
1565 }
1566 }
1567 }
1568 "/shared/vendor/deltachat/turn" => {
1569 if let Some(value) = m.value {
1570 match create_ice_servers_from_metadata(&value).await {
1571 Ok((parsed_timestamp, parsed_ice_servers)) => {
1572 ice_servers_expiration_timestamp = parsed_timestamp;
1573 ice_servers = Some(parsed_ice_servers);
1574 }
1575 Err(err) => {
1576 warn!(context, "Failed to parse TURN server metadata: {err:#}.");
1577 }
1578 }
1579 }
1580 }
1581 _ => {}
1582 }
1583 }
1584 let ice_servers = if let Some(ice_servers) = ice_servers {
1585 ice_servers
1586 } else {
1587 ice_servers_expiration_timestamp = time() + 3600 * 24 * 7;
1589 create_fallback_ice_servers()
1590 };
1591
1592 *lock = Some(ServerMetadata {
1593 comment,
1594 admin,
1595 iroh_relay,
1596 ice_servers,
1597 ice_servers_expiration_timestamp,
1598 });
1599 Ok(())
1600 }
1601
1602 pub(crate) async fn register_token(&mut self, context: &Context) -> Result<()> {
1604 if context.push_subscribed.load(Ordering::Relaxed) {
1605 return Ok(());
1606 }
1607
1608 let transport_id = self.transport_id();
1609
1610 let Some(device_token) = context.push_subscriber.device_token().await else {
1611 return Ok(());
1612 };
1613
1614 if self.can_metadata() && self.can_push() {
1615 info!(
1616 context,
1617 "Transport {transport_id}: Subscribing for push notifications."
1618 );
1619
1620 let old_encrypted_device_token =
1621 context.get_config(Config::EncryptedDeviceToken).await?;
1622
1623 let device_token_changed = old_encrypted_device_token.is_none()
1625 || context.get_config(Config::DeviceToken).await?.as_ref() != Some(&device_token);
1626
1627 let new_encrypted_device_token;
1628 if device_token_changed {
1629 let encrypted_device_token = encrypt_device_token(&device_token)
1630 .context("Failed to encrypt device token")?;
1631
1632 let encrypted_device_token_len = encrypted_device_token.len();
1636
1637 context
1643 .set_config_internal(Config::DeviceToken, Some(&device_token))
1644 .await?;
1645 context
1646 .set_config_internal(
1647 Config::EncryptedDeviceToken,
1648 Some(&encrypted_device_token),
1649 )
1650 .await?;
1651
1652 if encrypted_device_token_len <= 4096 {
1653 new_encrypted_device_token = Some(encrypted_device_token);
1654 } else {
1655 warn!(context, "Device token is too long for LITERAL-, ignoring.");
1665 new_encrypted_device_token = None;
1666 }
1667 } else {
1668 new_encrypted_device_token = old_encrypted_device_token;
1669 }
1670
1671 if let Some(encrypted_device_token) = new_encrypted_device_token {
1674 let folder = context
1675 .get_config(Config::ConfiguredInboxFolder)
1676 .await?
1677 .context("INBOX is not configured")?;
1678
1679 self.run_command_and_check_ok(&format_setmetadata(
1680 &folder,
1681 &encrypted_device_token,
1682 ))
1683 .await
1684 .context("SETMETADATA command failed")?;
1685
1686 context.push_subscribed.store(true, Ordering::Relaxed);
1687 }
1688 } else if !context.push_subscriber.heartbeat_subscribed().await {
1689 let context = context.clone();
1690 tokio::spawn(async move { context.push_subscriber.subscribe(&context).await });
1692 }
1693
1694 Ok(())
1695 }
1696}
1697
1698fn format_setmetadata(folder: &str, device_token: &str) -> String {
1699 let device_token_len = device_token.len();
1700 format!(
1701 "SETMETADATA \"{folder}\" (/private/devicetoken {{{device_token_len}+}}\r\n{device_token})"
1702 )
1703}
1704
1705impl Session {
1706 async fn add_flag_finalized_with_set(&mut self, uid_set: &str, flag: &str) -> Result<()> {
1712 if flag == "\\Deleted" {
1713 self.selected_folder_needs_expunge = true;
1714 }
1715 let query = format!("+FLAGS ({flag})");
1716 let mut responses = self
1717 .uid_store(uid_set, &query)
1718 .await
1719 .with_context(|| format!("IMAP failed to store: ({uid_set}, {query})"))?;
1720 while let Some(_response) = responses.try_next().await? {
1721 }
1723 Ok(())
1724 }
1725
1726 async fn configure_mvbox<'a>(
1735 &mut self,
1736 context: &Context,
1737 folders: &[&'a str],
1738 ) -> Result<Option<&'a str>> {
1739 self.maybe_close_folder(context).await?;
1742
1743 for folder in folders {
1744 info!(context, "Looking for MVBOX-folder \"{}\"...", &folder);
1745 let res = self.examine(&folder).await;
1746 if res.is_ok() {
1747 info!(
1748 context,
1749 "MVBOX-folder {:?} successfully selected, using it.", &folder
1750 );
1751 self.close().await?;
1752 let folder_exists = self.select_with_uidvalidity(context, folder).await?;
1755 ensure!(folder_exists, "No MVBOX folder {:?}??", &folder);
1756 return Ok(Some(folder));
1757 }
1758 }
1759
1760 Ok(None)
1761 }
1762}
1763
1764impl Imap {
1765 pub(crate) async fn configure_folders(
1766 &mut self,
1767 context: &Context,
1768 session: &mut Session,
1769 ) -> Result<()> {
1770 let mut folders = session
1771 .list(Some(""), Some("*"))
1772 .await
1773 .context("list_folders failed")?;
1774 let mut delimiter = ".".to_string();
1775 let mut delimiter_is_default = true;
1776 let mut folder_configs = BTreeMap::new();
1777
1778 while let Some(folder) = folders.try_next().await? {
1779 info!(context, "Scanning folder: {:?}", folder);
1780
1781 if let Some(d) = folder.delimiter()
1783 && delimiter_is_default
1784 && !d.is_empty()
1785 && delimiter != d
1786 {
1787 delimiter = d.to_string();
1788 delimiter_is_default = false;
1789 }
1790
1791 let folder_meaning = get_folder_meaning_by_attrs(folder.attributes());
1792 let folder_name_meaning = get_folder_meaning_by_name(folder.name());
1793 if let Some(config) = folder_meaning.to_config() {
1794 folder_configs.insert(config, folder.name().to_string());
1796 } else if let Some(config) = folder_name_meaning.to_config() {
1797 folder_configs
1799 .entry(config)
1800 .or_insert_with(|| folder.name().to_string());
1801 }
1802 }
1803 drop(folders);
1804
1805 info!(context, "Using \"{}\" as folder-delimiter.", delimiter);
1806
1807 let fallback_folder = format!("INBOX{delimiter}DeltaChat");
1808 let mvbox_folder = session
1809 .configure_mvbox(context, &["DeltaChat", &fallback_folder])
1810 .await
1811 .context("failed to configure mvbox")?;
1812
1813 context
1814 .set_config_internal(Config::ConfiguredInboxFolder, Some("INBOX"))
1815 .await?;
1816 if let Some(mvbox_folder) = mvbox_folder {
1817 info!(context, "Setting MVBOX FOLDER TO {}", &mvbox_folder);
1818 context
1819 .set_config_internal(Config::ConfiguredMvboxFolder, Some(mvbox_folder))
1820 .await?;
1821 }
1822 for (config, name) in folder_configs {
1823 context.set_config_internal(config, Some(&name)).await?;
1824 }
1825 context
1826 .sql
1827 .set_raw_config_int(
1828 constants::DC_FOLDERS_CONFIGURED_KEY,
1829 constants::DC_FOLDERS_CONFIGURED_VERSION,
1830 )
1831 .await?;
1832
1833 info!(context, "FINISHED configuring IMAP-folders.");
1834 Ok(())
1835 }
1836}
1837
1838impl Session {
1839 fn drain_unsolicited_responses(&self, context: &Context) -> Result<bool> {
1848 use UnsolicitedResponse::*;
1849 use async_imap::imap_proto::Response;
1850 use async_imap::imap_proto::ResponseCode;
1851
1852 let folder = self.selected_folder.as_deref().unwrap_or_default();
1853 let mut should_refetch = false;
1854 while let Ok(response) = self.unsolicited_responses.try_recv() {
1855 match response {
1856 Exists(_) => {
1857 info!(
1858 context,
1859 "Need to refetch {folder:?}, got unsolicited EXISTS {response:?}"
1860 );
1861 should_refetch = true;
1862 }
1863
1864 Expunge(_) | Recent(_) => {}
1865 Other(ref response_data) => {
1866 match response_data.parsed() {
1867 Response::Fetch { .. } => {
1868 info!(
1869 context,
1870 "Need to refetch {folder:?}, got unsolicited FETCH {response:?}"
1871 );
1872 should_refetch = true;
1873 }
1874
1875 Response::Done {
1878 code: Some(ResponseCode::CopyUid(_, _, _)),
1879 ..
1880 } => {}
1881
1882 _ => {
1883 info!(context, "{folder:?}: got unsolicited response {response:?}")
1884 }
1885 }
1886 }
1887 _ => {
1888 info!(context, "{folder:?}: got unsolicited response {response:?}")
1889 }
1890 }
1891 }
1892 Ok(should_refetch)
1893 }
1894}
1895
1896async fn should_move_out_of_spam(
1897 context: &Context,
1898 headers: &[mailparse::MailHeader<'_>],
1899) -> Result<bool> {
1900 if headers.get_header_value(HeaderDef::ChatVersion).is_some() {
1901 return Ok(true);
1912 }
1913
1914 if let Some(msg) = get_prefetch_parent_message(context, headers).await? {
1915 if msg.chat_blocked != Blocked::Not {
1916 return Ok(false);
1918 }
1919 } else {
1920 let from = match mimeparser::get_from(headers) {
1921 Some(f) => f,
1922 None => return Ok(false),
1923 };
1924 let (from_id, blocked_contact, _origin) =
1926 match from_field_to_contact_id(context, &from, None, true, true)
1927 .await
1928 .context("from_field_to_contact_id")?
1929 {
1930 Some(res) => res,
1931 None => {
1932 warn!(
1933 context,
1934 "Contact with From address {:?} cannot exist, not moving out of spam", from
1935 );
1936 return Ok(false);
1937 }
1938 };
1939 if blocked_contact {
1940 return Ok(false);
1942 }
1943
1944 if let Some(chat_id_blocked) = ChatIdBlocked::lookup_by_contact(context, from_id).await? {
1945 if chat_id_blocked.blocked != Blocked::Not {
1946 return Ok(false);
1947 }
1948 } else if from_id != ContactId::SELF {
1949 return Ok(false);
1951 }
1952 }
1953
1954 Ok(true)
1955}
1956
1957async fn spam_target_folder_cfg(
1962 context: &Context,
1963 headers: &[mailparse::MailHeader<'_>],
1964) -> Result<Option<Config>> {
1965 if !should_move_out_of_spam(context, headers).await? {
1966 return Ok(None);
1967 }
1968
1969 if needs_move_to_mvbox(context, headers).await?
1970 || context.get_config_bool(Config::OnlyFetchMvbox).await?
1973 {
1974 Ok(Some(Config::ConfiguredMvboxFolder))
1975 } else {
1976 Ok(Some(Config::ConfiguredInboxFolder))
1977 }
1978}
1979
1980pub async fn target_folder_cfg(
1983 context: &Context,
1984 folder: &str,
1985 folder_meaning: FolderMeaning,
1986 headers: &[mailparse::MailHeader<'_>],
1987) -> Result<Option<Config>> {
1988 if context.is_mvbox(folder).await? {
1989 return Ok(None);
1990 }
1991
1992 if folder_meaning == FolderMeaning::Spam {
1993 spam_target_folder_cfg(context, headers).await
1994 } else if folder_meaning == FolderMeaning::Inbox
1995 && needs_move_to_mvbox(context, headers).await?
1996 {
1997 Ok(Some(Config::ConfiguredMvboxFolder))
1998 } else {
1999 Ok(None)
2000 }
2001}
2002
2003pub async fn target_folder(
2004 context: &Context,
2005 folder: &str,
2006 folder_meaning: FolderMeaning,
2007 headers: &[mailparse::MailHeader<'_>],
2008) -> Result<String> {
2009 match target_folder_cfg(context, folder, folder_meaning, headers).await? {
2010 Some(config) => match context.get_config(config).await? {
2011 Some(target) => Ok(target),
2012 None => Ok(folder.to_string()),
2013 },
2014 None => Ok(folder.to_string()),
2015 }
2016}
2017
2018async fn needs_move_to_mvbox(
2019 context: &Context,
2020 headers: &[mailparse::MailHeader<'_>],
2021) -> Result<bool> {
2022 let has_chat_version = headers.get_header_value(HeaderDef::ChatVersion).is_some();
2023 if !context.get_config_bool(Config::MvboxMove).await? {
2024 return Ok(false);
2025 }
2026
2027 if has_chat_version {
2028 Ok(true)
2029 } else if let Some(parent) = get_prefetch_parent_message(context, headers).await? {
2030 match parent.is_dc_message {
2031 MessengerMessage::No => Ok(false),
2032 MessengerMessage::Yes | MessengerMessage::Reply => Ok(true),
2033 }
2034 } else {
2035 Ok(false)
2036 }
2037}
2038
2039fn get_folder_meaning_by_name(folder_name: &str) -> FolderMeaning {
2046 const SPAM_NAMES: &[&str] = &[
2048 "spam",
2049 "junk",
2050 "Correio electrónico não solicitado",
2051 "Correo basura",
2052 "Lixo",
2053 "Nettsøppel",
2054 "Nevyžádaná pošta",
2055 "No solicitado",
2056 "Ongewenst",
2057 "Posta indesiderata",
2058 "Skräp",
2059 "Wiadomości-śmieci",
2060 "Önemsiz",
2061 "Ανεπιθύμητα",
2062 "Спам",
2063 "垃圾邮件",
2064 "垃圾郵件",
2065 "迷惑メール",
2066 "스팸",
2067 ];
2068 const TRASH_NAMES: &[&str] = &[
2069 "Trash",
2070 "Bin",
2071 "Caixote do lixo",
2072 "Cestino",
2073 "Corbeille",
2074 "Papelera",
2075 "Papierkorb",
2076 "Papirkurv",
2077 "Papperskorgen",
2078 "Prullenbak",
2079 "Rubujo",
2080 "Κάδος απορριμμάτων",
2081 "Корзина",
2082 "Кошик",
2083 "ゴミ箱",
2084 "垃圾桶",
2085 "已删除邮件",
2086 "휴지통",
2087 ];
2088 let lower = folder_name.to_lowercase();
2089
2090 if lower == "inbox" {
2091 FolderMeaning::Inbox
2092 } else if SPAM_NAMES.iter().any(|s| s.to_lowercase() == lower) {
2093 FolderMeaning::Spam
2094 } else if TRASH_NAMES.iter().any(|s| s.to_lowercase() == lower) {
2095 FolderMeaning::Trash
2096 } else {
2097 FolderMeaning::Unknown
2098 }
2099}
2100
2101fn get_folder_meaning_by_attrs(folder_attrs: &[NameAttribute]) -> FolderMeaning {
2102 for attr in folder_attrs {
2103 match attr {
2104 NameAttribute::Trash => return FolderMeaning::Trash,
2105 NameAttribute::Junk => return FolderMeaning::Spam,
2106 NameAttribute::All | NameAttribute::Flagged => return FolderMeaning::Virtual,
2107 NameAttribute::Extension(label) => {
2108 match label.as_ref() {
2109 "\\Spam" => return FolderMeaning::Spam,
2110 "\\Important" => return FolderMeaning::Virtual,
2111 _ => {}
2112 };
2113 }
2114 _ => {}
2115 }
2116 }
2117 FolderMeaning::Unknown
2118}
2119
2120pub(crate) fn get_folder_meaning(folder: &Name) -> FolderMeaning {
2121 match get_folder_meaning_by_attrs(folder.attributes()) {
2122 FolderMeaning::Unknown => get_folder_meaning_by_name(folder.name()),
2123 meaning => meaning,
2124 }
2125}
2126
2127fn get_fetch_headers(prefetch_msg: &Fetch) -> Result<Vec<mailparse::MailHeader<'_>>> {
2129 match prefetch_msg.header() {
2130 Some(header_bytes) => {
2131 let (headers, _) = mailparse::parse_headers(header_bytes)?;
2132 Ok(headers)
2133 }
2134 None => Ok(Vec::new()),
2135 }
2136}
2137
2138pub(crate) fn prefetch_get_message_id(headers: &[mailparse::MailHeader]) -> Option<String> {
2139 headers
2140 .get_header_value(HeaderDef::XMicrosoftOriginalMessageId)
2141 .or_else(|| headers.get_header_value(HeaderDef::MessageId))
2142 .and_then(|msgid| mimeparser::parse_message_id(&msgid).ok())
2143}
2144
2145pub(crate) fn create_message_id() -> String {
2146 format!("{}{}", GENERATED_PREFIX, create_id())
2147}
2148
2149pub(crate) async fn prefetch_should_download(
2151 context: &Context,
2152 headers: &[mailparse::MailHeader<'_>],
2153 message_id: &str,
2154 mut flags: impl Iterator<Item = Flag<'_>>,
2155) -> Result<bool> {
2156 if message::rfc724_mid_download_tried(context, message_id).await? {
2157 if let Some(from) = mimeparser::get_from(headers)
2158 && context.is_self_addr(&from.addr).await?
2159 {
2160 markseen_on_imap_table(context, message_id).await?;
2161 }
2162 return Ok(false);
2163 }
2164
2165 let maybe_ndn = if let Some(from) = headers.get_header_value(HeaderDef::From_) {
2169 let from = from.to_ascii_lowercase();
2170 from.contains("mailer-daemon") || from.contains("mail-daemon")
2171 } else {
2172 false
2173 };
2174
2175 let from = match mimeparser::get_from(headers) {
2176 Some(f) => f,
2177 None => return Ok(false),
2178 };
2179 let (_from_id, blocked_contact, _origin) =
2180 match from_field_to_contact_id(context, &from, None, true, true).await? {
2181 Some(res) => res,
2182 None => return Ok(false),
2183 };
2184 if flags.any(|f| f == Flag::Draft) {
2188 info!(context, "Ignoring draft message");
2189 return Ok(false);
2190 }
2191
2192 let should_download = (!blocked_contact) || maybe_ndn;
2193 Ok(should_download)
2194}
2195
2196async fn mark_seen_by_uid(
2200 context: &Context,
2201 transport_id: u32,
2202 folder: &str,
2203 uid_validity: u32,
2204 uid: u32,
2205) -> Result<Option<ChatId>> {
2206 if let Some((msg_id, chat_id)) = context
2207 .sql
2208 .query_row_optional(
2209 "SELECT id, chat_id FROM msgs
2210 WHERE id > 9 AND rfc724_mid IN (
2211 SELECT rfc724_mid FROM imap
2212 WHERE transport_id=?
2213 AND folder=?
2214 AND uidvalidity=?
2215 AND uid=?
2216 LIMIT 1
2217 )",
2218 (transport_id, &folder, uid_validity, uid),
2219 |row| {
2220 let msg_id: MsgId = row.get(0)?;
2221 let chat_id: ChatId = row.get(1)?;
2222 Ok((msg_id, chat_id))
2223 },
2224 )
2225 .await
2226 .with_context(|| format!("failed to get msg and chat ID for IMAP message {folder}/{uid}"))?
2227 {
2228 let updated = context
2229 .sql
2230 .execute(
2231 "UPDATE msgs SET state=?1
2232 WHERE (state=?2 OR state=?3)
2233 AND id=?4",
2234 (
2235 MessageState::InSeen,
2236 MessageState::InFresh,
2237 MessageState::InNoticed,
2238 msg_id,
2239 ),
2240 )
2241 .await
2242 .with_context(|| format!("failed to update msg {msg_id} state"))?
2243 > 0;
2244
2245 if updated {
2246 msg_id
2247 .start_ephemeral_timer(context)
2248 .await
2249 .with_context(|| format!("failed to start ephemeral timer for message {msg_id}"))?;
2250 Ok(Some(chat_id))
2251 } else {
2252 Ok(None)
2254 }
2255 } else {
2256 Ok(None)
2258 }
2259}
2260
2261pub(crate) async fn markseen_on_imap_table(context: &Context, message_id: &str) -> Result<()> {
2264 context
2265 .sql
2266 .execute(
2267 "INSERT OR IGNORE INTO imap_markseen (id)
2268 SELECT id FROM imap WHERE rfc724_mid=?",
2269 (message_id,),
2270 )
2271 .await?;
2272 context.scheduler.interrupt_inbox().await;
2273
2274 Ok(())
2275}
2276
2277pub(crate) async fn set_uid_next(
2281 context: &Context,
2282 transport_id: u32,
2283 folder: &str,
2284 uid_next: u32,
2285) -> Result<()> {
2286 context
2287 .sql
2288 .execute(
2289 "INSERT INTO imap_sync (transport_id, folder, uid_next) VALUES (?, ?,?)
2290 ON CONFLICT(transport_id, folder) DO UPDATE SET uid_next=excluded.uid_next",
2291 (transport_id, folder, uid_next),
2292 )
2293 .await?;
2294 Ok(())
2295}
2296
2297async fn get_uid_next(context: &Context, transport_id: u32, folder: &str) -> Result<u32> {
2303 Ok(context
2304 .sql
2305 .query_get_value(
2306 "SELECT uid_next FROM imap_sync WHERE transport_id=? AND folder=?",
2307 (transport_id, folder),
2308 )
2309 .await?
2310 .unwrap_or(0))
2311}
2312
2313pub(crate) async fn set_uidvalidity(
2314 context: &Context,
2315 transport_id: u32,
2316 folder: &str,
2317 uidvalidity: u32,
2318) -> Result<()> {
2319 context
2320 .sql
2321 .execute(
2322 "INSERT INTO imap_sync (transport_id, folder, uidvalidity) VALUES (?,?,?)
2323 ON CONFLICT(transport_id, folder) DO UPDATE SET uidvalidity=excluded.uidvalidity",
2324 (transport_id, folder, uidvalidity),
2325 )
2326 .await?;
2327 Ok(())
2328}
2329
2330async fn get_uidvalidity(context: &Context, transport_id: u32, folder: &str) -> Result<u32> {
2331 Ok(context
2332 .sql
2333 .query_get_value(
2334 "SELECT uidvalidity FROM imap_sync WHERE transport_id=? AND folder=?",
2335 (transport_id, folder),
2336 )
2337 .await?
2338 .unwrap_or(0))
2339}
2340
2341pub(crate) async fn set_modseq(
2342 context: &Context,
2343 transport_id: u32,
2344 folder: &str,
2345 modseq: u64,
2346) -> Result<()> {
2347 context
2348 .sql
2349 .execute(
2350 "INSERT INTO imap_sync (transport_id, folder, modseq) VALUES (?,?,?)
2351 ON CONFLICT(transport_id, folder) DO UPDATE SET modseq=excluded.modseq",
2352 (transport_id, folder, modseq),
2353 )
2354 .await?;
2355 Ok(())
2356}
2357
2358async fn get_modseq(context: &Context, transport_id: u32, folder: &str) -> Result<u64> {
2359 Ok(context
2360 .sql
2361 .query_get_value(
2362 "SELECT modseq FROM imap_sync WHERE transport_id=? AND folder=?",
2363 (transport_id, folder),
2364 )
2365 .await?
2366 .unwrap_or(0))
2367}
2368
2369async fn should_ignore_folder(
2374 context: &Context,
2375 folder: &str,
2376 folder_meaning: FolderMeaning,
2377) -> Result<bool> {
2378 if !context.get_config_bool(Config::OnlyFetchMvbox).await? {
2379 return Ok(false);
2380 }
2381 Ok(!(context.is_mvbox(folder).await? || folder_meaning == FolderMeaning::Spam))
2382}
2383
2384#[expect(clippy::arithmetic_side_effects)]
2388fn build_sequence_sets(uids: &[u32]) -> Result<Vec<(Vec<u32>, String)>> {
2389 let mut ranges: Vec<UidRange> = vec![];
2391
2392 for ¤t in uids {
2393 if let Some(last) = ranges.last_mut()
2394 && last.end + 1 == current
2395 {
2396 last.end = current;
2397 continue;
2398 }
2399
2400 ranges.push(UidRange {
2401 start: current,
2402 end: current,
2403 });
2404 }
2405
2406 let mut result = vec![];
2408 let (mut last_uids, mut last_str) = (Vec::new(), String::new());
2409 for range in ranges {
2410 last_uids.reserve((range.end - range.start + 1).try_into()?);
2411 (range.start..=range.end).for_each(|u| last_uids.push(u));
2412 if !last_str.is_empty() {
2413 last_str.push(',');
2414 }
2415 last_str.push_str(&range.to_string());
2416
2417 if last_str.len() > 990 {
2418 result.push((take(&mut last_uids), take(&mut last_str)));
2419 }
2420 }
2421 result.push((last_uids, last_str));
2422
2423 result.retain(|(_, s)| !s.is_empty());
2424 Ok(result)
2425}
2426
2427struct UidRange {
2428 start: u32,
2429 end: u32,
2430 }
2432
2433impl std::fmt::Display for UidRange {
2434 fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
2435 if self.start == self.end {
2436 write!(f, "{}", self.start)
2437 } else {
2438 write!(f, "{}:{}", self.start, self.end)
2439 }
2440 }
2441}
2442
2443pub(crate) async fn get_watched_folder_configs(context: &Context) -> Result<Vec<Config>> {
2444 let mut res = vec![Config::ConfiguredInboxFolder];
2445 if context.should_watch_mvbox().await? {
2446 res.push(Config::ConfiguredMvboxFolder);
2447 }
2448 Ok(res)
2449}
2450
2451pub(crate) async fn get_watched_folders(context: &Context) -> Result<Vec<String>> {
2452 let mut res = Vec::new();
2453 for folder_config in get_watched_folder_configs(context).await? {
2454 if let Some(folder) = context.get_config(folder_config).await? {
2455 res.push(folder);
2456 }
2457 }
2458 Ok(res)
2459}
2460
2461#[cfg(test)]
2462mod imap_tests;