1use std::{
7 cmp::max,
8 cmp::min,
9 collections::{BTreeMap, BTreeSet, HashMap},
10 iter::Peekable,
11 mem::take,
12 sync::atomic::Ordering,
13 time::{Duration, UNIX_EPOCH},
14};
15
16use anyhow::{Context as _, Result, bail, ensure, format_err};
17use async_channel::{self, Receiver, Sender};
18use async_imap::types::{Fetch, Flag, Name, NameAttribute, UnsolicitedResponse};
19use futures::{FutureExt as _, TryStreamExt};
20use futures_lite::FutureExt;
21use ratelimit::Ratelimit;
22use url::Url;
23
24use crate::chat::{self, ChatId, ChatIdBlocked, add_device_msg};
25use crate::chatlist_events;
26use crate::config::Config;
27use crate::constants::{Blocked, DC_VERSION_STR};
28use crate::contact::ContactId;
29use crate::context::Context;
30use crate::ensure_and_debug_assert;
31use crate::events::EventType;
32use crate::headerdef::{HeaderDef, HeaderDefMap};
33use crate::log::{LogExt, warn};
34use crate::message::{self, Message, MessageState, MsgId};
35use crate::mimeparser;
36use crate::net::proxy::ProxyConfig;
37use crate::net::session::SessionStream;
38use crate::oauth2::get_oauth2_access_token;
39use crate::push::encrypt_device_token;
40use crate::receive_imf::{
41 ReceivedMsg, from_field_to_contact_id, get_prefetch_parent_message, receive_imf_inner,
42};
43use crate::scheduler::connectivity::ConnectivityStore;
44use crate::stock_str;
45use crate::tools::{self, create_id, duration_to_str, time};
46use crate::transport::{
47 ConfiguredLoginParam, ConfiguredServerLoginParam, prioritize_server_login_params,
48};
49use crate::{
50 calls::{UnresolvedIceServer, create_fallback_ice_servers, create_ice_servers_from_metadata},
51 ephemeral::delete_expired_imap_messages,
52};
53
54pub(crate) mod capabilities;
55mod client;
56mod idle;
57pub mod select_folder;
58pub(crate) mod session;
59
60use client::{Client, determine_capabilities};
61use session::Session;
62
/// Prefix constant; its users are outside the part of the file visible here.
///
/// NOTE(review): presumably marks locally generated Message-IDs — confirm against usage.
pub(crate) const GENERATED_PREFIX: &str = "GEN_";

/// FETCH item list requesting the UID and only the `Message-ID` and
/// `X-Microsoft-Original-Message-ID` header fields.
///
/// Used by `Session::resync_folder_uids()` to collect message IDs for a whole folder.
const RFC724MID_UID: &str = "(UID BODY.PEEK[HEADER.FIELDS (\
                             MESSAGE-ID \
                             X-MICROSOFT-ORIGINAL-MESSAGE-ID\
                             )])";
/// FETCH item list requesting the flags and the complete message.
///
/// Used by `Session::fetch_many_msgs()` to download messages.
const BODY_FULL: &str = "(FLAGS BODY.PEEK[])";
70
/// Connection parameters and connection-attempt state for the IMAP side of one transport.
///
/// `Imap::connect()` turns this state into an authenticated `Session`.
#[derive(Debug)]
pub(crate) struct Imap {
    /// ID of the transport this state belongs to; handed to every created `Session`.
    transport_id: u32,

    /// Receiving end of the interrupt channel.
    ///
    /// A completed `recv()` cuts short the rate-limit wait in `connect()`.
    pub(crate) idle_interrupt_receiver: Receiver<()>,

    /// Address taken from the configured login parameters.
    ///
    /// Passed to `get_oauth2_access_token()` when OAuth 2 is used.
    pub(crate) addr: String,

    /// Server login parameter candidates; prioritized and tried in turn by `connect()`.
    lp: Vec<ConfiguredServerLoginParam>,

    /// Password for LOGIN; with OAuth 2 it is passed to `get_oauth2_access_token()` instead.
    password: String,

    /// Proxy configuration loaded when the struct is created, if any.
    proxy_config: Option<ProxyConfig>,

    /// Strict TLS setting passed to `Client::connect()`.
    strict_tls: bool,

    /// Whether to authenticate with XOAUTH2 instead of LOGIN.
    oauth2: bool,

    /// Watched folder; `"INBOX"` unless the login parameters name another one.
    pub(crate) folder: String,

    /// Set when a login fails with an error mentioning "authentication".
    ///
    /// The "cannot login" device message is only considered once this flag
    /// is already set, i.e. on a repeated authentication failure.
    authentication_failed_once: bool,

    /// Connectivity state updated while connecting and fetching.
    pub(crate) connectivity: ConnectivityStore,

    /// Time of the last connection attempt.
    conn_last_try: tools::Time,
    /// Current backoff in milliseconds that must pass after `conn_last_try`
    /// before the next connection attempt.
    conn_backoff_ms: u64,

    /// Rate limit on connection attempts; `connect()` registers an attempt
    /// each time a network connection to the server was established.
    ratelimit: Ratelimit,

    /// Sending end of the resync request channel; a clone is given to every `Session`.
    pub(crate) resync_request_sender: async_channel::Sender<()>,

    /// Receiving end of the resync request channel.
    pub(crate) resync_request_receiver: async_channel::Receiver<()>,
}
121
/// Credentials used to build the XOAUTH2 authentication response.
#[derive(Debug)]
struct OAuth2 {
    /// User name placed into the `user=` part of the response.
    user: String,
    /// Access token placed into the `auth=Bearer` part of the response.
    access_token: String,
}
127
/// Metadata about the server.
///
/// NOTE(review): the code filling and reading this struct is outside the visible
/// part of the file; the field notes below are assumptions to be confirmed.
#[derive(Debug, Default)]
pub(crate) struct ServerMetadata {
    /// Server comment, if any (presumably free-form text provided by the server).
    pub comment: Option<String>,

    /// Server administrator contact, if any — TODO confirm the expected format.
    pub admin: Option<String>,

    /// URL of an Iroh relay, if any.
    pub iroh_relay: Option<Url>,

    /// ICE servers that still need to be resolved before use.
    pub ice_servers: Vec<UnresolvedIceServer>,

    /// Timestamp after which `ice_servers` should be considered expired
    /// (presumably Unix seconds — TODO confirm).
    pub ice_servers_expiration_timestamp: i64,
}
151
152impl async_imap::Authenticator for OAuth2 {
153 type Response = String;
154
155 fn process(&mut self, _data: &[u8]) -> Self::Response {
156 format!(
157 "user={}\x01auth=Bearer {}\x01\x01",
158 self.user, self.access_token
159 )
160 }
161}
162
/// Classification of an IMAP folder.
///
/// `Session::resync_folders()` skips folders classified as `Virtual` or `Unknown`
/// and resyncs all others.
#[derive(Debug, Display, PartialEq, Eq, Clone, Copy)]
pub enum FolderMeaning {
    /// No specific meaning could be assigned to the folder.
    Unknown,

    /// Spam folder.
    Spam,
    /// Inbox folder.
    Inbox,
    /// Trash folder.
    Trash,

    /// Virtual folder.
    ///
    /// NOTE(review): presumably a server-side view that duplicates messages
    /// of other folders — confirm against `get_folder_meaning()`.
    Virtual,
}
180
/// Iterator adaptor that groups `(rowid, uid, folder)` rows into per-folder UID sets.
///
/// Rows with the same folder must be adjacent and ordered by UID for the
/// grouping to produce compact ranges.
struct UidGrouper<T: Iterator<Item = (i64, u32, String)>> {
    /// Underlying rows; peekable so that a row belonging to the next group
    /// can be inspected without being consumed.
    inner: Peekable<T>,
}
184
185impl<T, I> From<I> for UidGrouper<T>
186where
187 T: Iterator<Item = (i64, u32, String)>,
188 I: IntoIterator<IntoIter = T>,
189{
190 fn from(inner: I) -> Self {
191 Self {
192 inner: inner.into_iter().peekable(),
193 }
194 }
195}
196
197impl<T: Iterator<Item = (i64, u32, String)>> Iterator for UidGrouper<T> {
198 type Item = (String, Vec<i64>, String);
200
201 #[expect(clippy::arithmetic_side_effects)]
202 fn next(&mut self) -> Option<Self::Item> {
203 let (_, _, folder) = self.inner.peek().cloned()?;
204
205 let mut uid_set = String::new();
206 let mut rowid_set = Vec::new();
207
208 while uid_set.len() < 1000 {
209 if let Some((start_rowid, start_uid, _)) = self
211 .inner
212 .next_if(|(_, _, start_folder)| start_folder == &folder)
213 {
214 rowid_set.push(start_rowid);
215 let mut end_uid = start_uid;
216
217 while let Some((next_rowid, next_uid, _)) =
218 self.inner.next_if(|(_, next_uid, next_folder)| {
219 next_folder == &folder && (*next_uid == end_uid + 1 || *next_uid == end_uid)
220 })
221 {
222 end_uid = next_uid;
223 rowid_set.push(next_rowid);
224 }
225
226 let uid_range = UidRange {
227 start: start_uid,
228 end: end_uid,
229 };
230 if !uid_set.is_empty() {
231 uid_set.push(',');
232 }
233 uid_set.push_str(&uid_range.to_string());
234 } else {
235 break;
236 }
237 }
238
239 Some((folder, rowid_set, uid_set))
240 }
241}
242
243impl Imap {
244 pub async fn new(
246 context: &Context,
247 transport_id: u32,
248 param: ConfiguredLoginParam,
249 idle_interrupt_receiver: Receiver<()>,
250 ) -> Result<Self> {
251 let lp = param.imap.clone();
252 let password = param.imap_password.clone();
253 let proxy_config = ProxyConfig::load(context).await?;
254 let addr = ¶m.addr;
255 let strict_tls = param.strict_tls(proxy_config.is_some());
256 let oauth2 = param.oauth2;
257 let folder = param
258 .imap_folder
259 .clone()
260 .unwrap_or_else(|| "INBOX".to_string());
261 ensure_and_debug_assert!(!folder.is_empty(), "Watched folder name cannot be empty");
262 let (resync_request_sender, resync_request_receiver) = async_channel::bounded(1);
263 Ok(Imap {
264 transport_id,
265 idle_interrupt_receiver,
266 addr: addr.to_string(),
267 lp,
268 password,
269 proxy_config,
270 strict_tls,
271 oauth2,
272 folder,
273 authentication_failed_once: false,
274 connectivity: Default::default(),
275 conn_last_try: UNIX_EPOCH,
276 conn_backoff_ms: 0,
277 ratelimit: Ratelimit::new(Duration::new(120, 0), 2.0),
279 resync_request_sender,
280 resync_request_receiver,
281 })
282 }
283
284 pub async fn new_configured(
286 context: &Context,
287 idle_interrupt_receiver: Receiver<()>,
288 ) -> Result<Self> {
289 let (transport_id, param) = ConfiguredLoginParam::load(context)
290 .await?
291 .context("Not configured")?;
292 let imap = Self::new(context, transport_id, param, idle_interrupt_receiver).await?;
293 Ok(imap)
294 }
295
    /// Returns the ID of the transport this `Imap` was created for.
    pub fn transport_id(&self) -> u32 {
        self.transport_id
    }
300
    /// Connects to the IMAP server and logs in, returning a new [`Session`].
    ///
    /// Before connecting, waits until both the connection backoff and
    /// `self.ratelimit` allow a new attempt; the wait ends early as soon as
    /// `recv()` on `idle_interrupt_receiver` completes. The login parameter
    /// candidates are then prioritized and tried in order; the first one that
    /// connects and logs in successfully wins.
    ///
    /// `configuring` suppresses the "cannot login" device message.
    ///
    /// # Errors
    ///
    /// If no candidate succeeds, returns the first connection or login error,
    /// or "No IMAP connection candidates provided" if there were no candidates.
    /// Failing to obtain the OAuth 2 token, to determine capabilities, or to
    /// enable compression aborts immediately without trying further candidates.
    pub(crate) async fn connect(
        &mut self,
        context: &Context,
        configuring: bool,
    ) -> Result<Session> {
        let now = tools::Time::now();
        // Delay from `now` until the backoff has elapsed. `min(.., now)` clamps a
        // last-try time lying in the future, the outer `max(.., now)` guarantees
        // that the delay is never negative.
        let until_can_send = max(
            min(self.conn_last_try, now)
                .checked_add(Duration::from_millis(self.conn_backoff_ms))
                .unwrap_or(now),
            now,
        )
        .duration_since(now)?;
        let ratelimit_duration = max(until_can_send, self.ratelimit.until_can_send());
        if !ratelimit_duration.is_zero() {
            warn!(
                context,
                "IMAP got rate limited, waiting for {} until can connect.",
                duration_to_str(ratelimit_duration),
            );
            // Sleep for the whole delay unless `recv()` on the interrupt channel
            // completes first.
            let interrupted = async {
                tokio::time::sleep(ratelimit_duration).await;
                false
            }
            .race(self.idle_interrupt_receiver.recv().map(|_| true))
            .await;
            if interrupted {
                info!(
                    context,
                    "Connecting to IMAP without waiting for ratelimit due to interrupt."
                );
            }
        }

        info!(context, "Connecting to IMAP server.");
        self.connectivity.set_connecting(context);

        self.conn_last_try = tools::Time::now();
        const BACKOFF_MIN_MS: u64 = 2000;
        const BACKOFF_MAX_MS: u64 = 80_000;
        // Grow the backoff by a random factor between 1.5 and 2. Clamping to half
        // of the maximum first keeps the result within `BACKOFF_MAX_MS`.
        self.conn_backoff_ms = min(self.conn_backoff_ms, BACKOFF_MAX_MS / 2);
        self.conn_backoff_ms = self.conn_backoff_ms.saturating_add(rand::random_range(
            (self.conn_backoff_ms / 2)..=self.conn_backoff_ms,
        ));
        self.conn_backoff_ms = max(BACKOFF_MIN_MS, self.conn_backoff_ms);

        let login_params = prioritize_server_login_params(&context.sql, &self.lp, "imap").await?;
        let mut first_error = None;
        for lp in login_params {
            info!(context, "IMAP trying to connect to {}.", &lp.connection);
            let connection_candidate = lp.connection.clone();
            let client = match Client::connect(
                context,
                self.proxy_config.clone(),
                self.strict_tls,
                &connection_candidate,
            )
            .await
            .with_context(|| format!("IMAP failed to connect to {connection_candidate}"))
            {
                Ok(client) => client,
                Err(err) => {
                    warn!(context, "{err:#}.");
                    // Remember only the first error, then try the next candidate.
                    first_error.get_or_insert(err);
                    continue;
                }
            };

            // The network connection works: reset the backoff and count the
            // attempt against the rate limit.
            self.conn_backoff_ms = BACKOFF_MIN_MS;
            self.ratelimit.send();

            let imap_user: &str = lp.user.as_ref();
            let imap_pw: &str = &self.password;

            let login_res = if self.oauth2 {
                info!(context, "Logging into IMAP server with OAuth 2.");
                let addr: &str = self.addr.as_ref();

                let token = get_oauth2_access_token(context, addr, imap_pw, true)
                    .await?
                    .context("IMAP could not get OAUTH token")?;
                let auth = OAuth2 {
                    user: imap_user.into(),
                    access_token: token,
                };
                client.authenticate("XOAUTH2", auth).await
            } else {
                info!(context, "Logging into IMAP server with LOGIN.");
                client.login(imap_user, imap_pw).await
            };

            match login_res {
                Ok(mut session) => {
                    let capabilities = determine_capabilities(&mut session).await?;
                    let resync_request_sender = self.resync_request_sender.clone();

                    let session = if capabilities.can_compress {
                        info!(context, "Enabling IMAP compression.");
                        // The compressed stream is boxed as a `SessionStream`
                        // trait object before being wrapped into a `Session`.
                        let compressed_session = session
                            .compress(|s| {
                                let session_stream: Box<dyn SessionStream> = Box::new(s);
                                session_stream
                            })
                            .await
                            .context("Failed to enable IMAP compression")?;
                        Session::new(
                            compressed_session,
                            capabilities,
                            resync_request_sender,
                            self.transport_id,
                        )
                    } else {
                        Session::new(
                            session,
                            capabilities,
                            resync_request_sender,
                            self.transport_id,
                        )
                    };

                    // Publish the server ID reported in the capabilities.
                    let mut lock = context.server_id.write().await;
                    lock.clone_from(&session.capabilities.server_id);

                    self.authentication_failed_once = false;
                    context.emit_event(EventType::ImapConnected(format!(
                        "IMAP-LOGIN as {}",
                        lp.user
                    )));
                    self.connectivity.set_preparing(context);
                    info!(context, "Successfully logged into IMAP server.");
                    return Ok(session);
                }

                Err(err) => {
                    let imap_user = lp.user.to_owned();
                    let message = stock_str::cannot_login(context, &imap_user);

                    warn!(context, "IMAP failed to login: {err:#}.");
                    first_error.get_or_insert(format_err!("{message} ({err:#})"));

                    // NOTE(review): presumably serializes the wrong-password
                    // handling below between concurrent connections — confirm.
                    let _lock = context.wrong_pw_warning_mutex.lock().await;
                    if err.to_string().to_lowercase().contains("authentication") {
                        // Notify only on a repeated authentication failure, outside
                        // of configuration, and only while `NotifyAboutWrongPw` is set.
                        if self.authentication_failed_once
                            && !configuring
                            && context.get_config_bool(Config::NotifyAboutWrongPw).await?
                        {
                            let mut msg = Message::new_text(message);
                            if let Err(e) = chat::add_device_msg_with_importance(
                                context,
                                None,
                                Some(&mut msg),
                                true,
                            )
                            .await
                            {
                                warn!(context, "Failed to add device message: {e:#}.");
                            } else {
                                // The device message was added: unset the config
                                // value so the notification is not repeated.
                                context
                                    .set_config_internal(Config::NotifyAboutWrongPw, None)
                                    .await
                                    .log_err(context)
                                    .ok();
                            }
                        } else {
                            self.authentication_failed_once = true;
                        }
                    } else {
                        self.authentication_failed_once = false;
                    }
                }
            }
        }

        Err(first_error.unwrap_or_else(|| format_err!("No IMAP connection candidates provided")))
    }
483
484 pub(crate) async fn prepare(&mut self, context: &Context) -> Result<Session> {
489 let configuring = false;
490 let session = match self.connect(context, configuring).await {
491 Ok(session) => session,
492 Err(err) => {
493 self.connectivity.set_err(context, format!("{err:#}"));
494 return Err(err);
495 }
496 };
497
498 Ok(session)
499 }
500
501 pub async fn fetch_move_delete(
506 &mut self,
507 context: &Context,
508 session: &mut Session,
509 watch_folder: &str,
510 ) -> Result<()> {
511 ensure_and_debug_assert!(!watch_folder.is_empty(), "Watched folder cannot be empty");
512 if !context.sql.is_open().await {
513 bail!("IMAP operation attempted while it is torn down");
515 }
516
517 let msgs_fetched = self
518 .fetch_new_messages(context, session, watch_folder)
519 .await
520 .context("fetch_new_messages")?;
521 if msgs_fetched && context.get_config_delete_device_after().await?.is_some() {
522 context.scheduler.interrupt_ephemeral_task().await;
527 }
528
529 delete_expired_imap_messages(context, session.transport_id(), session.is_chatmail())
532 .await
533 .context("delete_expired_imap_messages")?;
534
535 session
536 .move_delete_messages(context, watch_folder)
537 .await
538 .context("move_delete_messages")?;
539
540 Ok(())
541 }
542
543 #[expect(clippy::arithmetic_side_effects)]
547 pub(crate) async fn fetch_new_messages(
548 &mut self,
549 context: &Context,
550 session: &mut Session,
551 folder: &str,
552 ) -> Result<bool> {
553 let transport_id = session.transport_id();
554
555 let folder_exists = session
556 .select_with_uidvalidity(context, folder)
557 .await
558 .with_context(|| format!("Failed to select folder {folder:?}"))?;
559
560 if !session.new_mail {
561 info!(
562 context,
563 "Transport {transport_id}: No new emails in folder {folder:?}."
564 );
565 return Ok(false);
566 }
567 session.new_mail = false;
570
571 if !folder_exists {
572 return Ok(false);
573 }
574
575 let mut read_cnt = 0;
576 loop {
577 let (n, fetch_more) =
578 Box::pin(self.fetch_new_msg_batch(context, session, folder)).await?;
579 read_cnt += n;
580 if !fetch_more {
581 return Ok(read_cnt > 0);
582 }
583 }
584 }
585
    /// Prefetches one batch of messages starting at the stored UIDNEXT of
    /// `folder`, records them in the `imap` table, downloads those that should
    /// be downloaded now, and advances the stored UIDNEXT.
    ///
    /// Returns the number of prefetched messages and whether another batch
    /// should be fetched.
    ///
    /// # Errors
    ///
    /// Fails on database errors, if prefetching fails, if a prefetch response
    /// lacks the message size, or if downloading the messages fails. In the
    /// last case the stored UIDNEXT is still advanced past the messages that
    /// were reported on the channel before the error is returned.
    #[expect(clippy::arithmetic_side_effects)]
    async fn fetch_new_msg_batch(
        &mut self,
        context: &Context,
        session: &mut Session,
        folder: &str,
    ) -> Result<(usize, bool)> {
        let transport_id = self.transport_id;
        let uid_validity = get_uidvalidity(context, transport_id, folder).await?;
        let old_uid_next = get_uid_next(context, transport_id, folder).await?;
        info!(
            context,
            "fetch_new_msg_batch({folder}): UIDVALIDITY={uid_validity}, UIDNEXT={old_uid_next}."
        );

        let uids_to_prefetch = 500;
        let msgs = session
            .prefetch(old_uid_next, uids_to_prefetch)
            .await
            .context("prefetch")?;
        let read_cnt = msgs.len();
        // Held until the end of the function.
        let _fetch_msgs_lock_guard = context.fetch_msgs_mutex.lock().await;

        // UIDs to download in this batch.
        let mut uids_fetch: Vec<u32> = Vec::new();
        // Message-IDs of post-messages seen in this batch.
        let mut available_post_msgs: Vec<String> = Vec::new();
        // Message-IDs queued in the `download` table instead of being downloaded now.
        let mut download_later: Vec<String> = Vec::new();
        let mut uid_message_ids = BTreeMap::new();
        let mut largest_uid_skipped = None;

        // A configured limit of 0 counts as "no limit".
        let download_limit: Option<u32> = context
            .get_config_parsed(Config::DownloadLimit)
            .await?
            .filter(|&l| 0 < l);

        for (uid, ref fetch_response) in msgs {
            let headers = match get_fetch_headers(fetch_response) {
                Ok(headers) => headers,
                Err(err) => {
                    warn!(context, "Failed to parse FETCH headers: {err:#}.");
                    continue;
                }
            };

            let message_id = prefetch_get_message_id(&headers);
            let size = fetch_response
                .size
                .context("imap fetch response does not contain size")?;

            // A message that was already deleted locally is scheduled for
            // deletion on the server instead of being downloaded.
            let delete = if let Some(message_id) = &message_id {
                message::rfc724_mid_exists_ex(context, message_id, "deleted=1")
                    .await?
                    .is_some_and(|(_msg_id, deleted)| deleted)
            } else {
                false
            };

            // Messages without a Message-ID get a generated one.
            let message_id = message_id.unwrap_or_else(create_message_id);

            if delete {
                info!(context, "Deleting locally deleted message {message_id}.");
            }

            // An empty target marks the row for deletion; the folder itself as
            // target means "leave the message where it is".
            let target = if delete { "" } else { folder };

            context
                .sql
                .execute(
                    "INSERT INTO imap (transport_id, rfc724_mid, folder, uid, uidvalidity, target)
 VALUES (?, ?, ?, ?, ?, ?)
 ON CONFLICT(transport_id, folder, uid, uidvalidity)
 DO UPDATE SET rfc724_mid=excluded.rfc724_mid,
 target=excluded.target",
                    (
                        self.transport_id,
                        &message_id,
                        &folder,
                        uid,
                        uid_validity,
                        target,
                    ),
                )
                .await?;

            // Only messages that stay in the folder are candidates for download.
            if folder == target
                && prefetch_should_download(context, &headers, &message_id, fetch_response.flags())
                    .await
                    .context("prefetch_should_download")?
            {
                if headers
                    .get_header_value(HeaderDef::ChatIsPostMessage)
                    .is_some()
                {
                    info!(context, "{message_id:?} is a post-message.");
                    available_post_msgs.push(message_id.clone());

                    // Bots download post-messages right away if they fit the
                    // limit; otherwise a post-message within the limit is
                    // queued for later download and the UID counts as skipped.
                    let is_bot = context.get_config_bool(Config::Bot).await?;
                    if is_bot && download_limit.is_none_or(|download_limit| size <= download_limit)
                    {
                        uids_fetch.push(uid);
                        uid_message_ids.insert(uid, message_id);
                    } else {
                        if download_limit.is_none_or(|download_limit| size <= download_limit) {
                            download_later.push(message_id.clone());
                        }
                        largest_uid_skipped = Some(uid);
                    }
                } else {
                    info!(context, "{message_id:?} is not a post-message.");
                    // Regular messages are downloaded now if they fit the
                    // limit and queued for later otherwise.
                    if download_limit.is_none_or(|download_limit| size <= download_limit) {
                        uids_fetch.push(uid);
                        uid_message_ids.insert(uid, message_id);
                    } else {
                        download_later.push(message_id.clone());
                        largest_uid_skipped = Some(uid);
                    }
                };
            } else {
                largest_uid_skipped = Some(uid);
            }
        }

        if !uids_fetch.is_empty() {
            self.connectivity.set_working(context);
        }

        let (sender, receiver) = async_channel::unbounded();

        let mut received_msgs = Vec::with_capacity(uids_fetch.len());
        let mailbox_uid_next = session
            .selected_mailbox
            .as_ref()
            .with_context(|| format!("Expected {folder:?} to be selected"))?
            .uid_next
            .unwrap_or_default();

        // Collects the results sent by `fetch_many_msgs()` until the sender is
        // dropped and tracks the largest UID reported.
        let update_uids_future = async {
            let mut largest_uid_fetched: u32 = 0;

            while let Ok((uid, received_msg_opt)) = receiver.recv().await {
                largest_uid_fetched = max(largest_uid_fetched, uid);
                if let Some(received_msg) = received_msg_opt {
                    received_msgs.push(received_msg)
                }
            }

            largest_uid_fetched
        };

        let actually_download_messages_future = async {
            session
                .fetch_many_msgs(context, folder, uids_fetch, &uid_message_ids, sender)
                .await
                .context("fetch_many_msgs")
        };

        // Run the download and the result collection concurrently.
        let (largest_uid_fetched, fetch_res) =
            tokio::join!(update_uids_future, actually_download_messages_future);

        let mut new_uid_next = largest_uid_fetched + 1;
        // Only after a successful download is UIDNEXT moved past the prefetched
        // window and the skipped messages. More batches are needed while the
        // window ends before the UIDNEXT reported by the mailbox.
        let fetch_more = fetch_res.is_ok() && {
            let prefetch_uid_next = old_uid_next + uids_to_prefetch;
            new_uid_next = max(new_uid_next, min(prefetch_uid_next, mailbox_uid_next));

            new_uid_next = max(new_uid_next, largest_uid_skipped.unwrap_or(0) + 1);

            prefetch_uid_next < mailbox_uid_next
        };
        // Never move the stored UIDNEXT backwards.
        if new_uid_next > old_uid_next {
            set_uid_next(context, self.transport_id, folder, new_uid_next).await?;
        }

        info!(context, "{} mails read from \"{}\".", read_cnt, folder);

        if !received_msgs.is_empty() {
            context.emit_event(EventType::IncomingMsgBunch);
        }

        chat::mark_old_messages_as_noticed(context, received_msgs).await?;

        if fetch_res.is_ok() {
            info!(
                context,
                "available_post_msgs: {}, download_later: {}.",
                available_post_msgs.len(),
                download_later.len(),
            );
            // Record post-messages and deferred downloads in one transaction.
            let trans_fn = |t: &mut rusqlite::Transaction| {
                let mut stmt = t.prepare("INSERT OR IGNORE INTO available_post_msgs VALUES (?)")?;
                for rfc724_mid in available_post_msgs {
                    stmt.execute((rfc724_mid,))
                        .context("INSERT OR IGNORE INTO available_post_msgs")?;
                }
                let mut stmt =
                    t.prepare("INSERT OR IGNORE INTO download (rfc724_mid, msg_id) VALUES (?,0)")?;
                for rfc724_mid in download_later {
                    stmt.execute((rfc724_mid,))
                        .context("INSERT OR IGNORE INTO download")?;
                }
                Ok(())
            };
            context.sql.transaction(trans_fn).await?;
        }

        // Report a download error only after the bookkeeping above is done.
        fetch_res?;

        Ok((read_cnt, fetch_more))
    }
825}
826
827impl Session {
828 pub(crate) async fn resync_folders(&mut self, context: &Context) -> Result<()> {
830 let all_folders = self
831 .list_folders()
832 .await
833 .context("listing folders for resync")?;
834 for folder in all_folders {
835 let folder_meaning = get_folder_meaning(&folder);
836 if !matches!(
837 folder_meaning,
838 FolderMeaning::Virtual | FolderMeaning::Unknown
839 ) {
840 self.resync_folder_uids(context, folder.name(), folder_meaning)
841 .await?;
842 }
843 }
844 Ok(())
845 }
846
    /// Rebuilds the `imap` table entries of `folder` for this transport from
    /// the messages currently on the server.
    ///
    /// Fetches UID and Message-ID of every message in the folder, then, in a
    /// single transaction, removes all existing rows of the folder and inserts
    /// one row per fetched message together with its target folder. If the
    /// folder does not exist, its rows are only removed.
    ///
    /// # Errors
    ///
    /// Fails if selecting the folder, fetching, determining a target folder,
    /// or the database transaction fails.
    pub(crate) async fn resync_folder_uids(
        &mut self,
        context: &Context,
        folder: &str,
        folder_meaning: FolderMeaning,
    ) -> Result<()> {
        let uid_validity;
        // Maps UID to `(Message-ID, target folder)`.
        let mut msgs = BTreeMap::new();

        let folder_exists = self.select_with_uidvalidity(context, folder).await?;
        let transport_id = self.transport_id();
        if folder_exists {
            let mut list = self
                .uid_fetch("1:*", RFC724MID_UID)
                .await
                .with_context(|| format!("Can't resync folder {folder}"))?;
            while let Some(fetch) = list.try_next().await? {
                let headers = match get_fetch_headers(&fetch) {
                    Ok(headers) => headers,
                    Err(err) => {
                        warn!(context, "Failed to parse FETCH headers: {}", err);
                        continue;
                    }
                };
                let message_id = prefetch_get_message_id(&headers);

                // Responses lacking a UID or a Message-ID are left out.
                if let (Some(uid), Some(rfc724_mid)) = (fetch.uid, message_id) {
                    msgs.insert(
                        uid,
                        (
                            rfc724_mid,
                            target_folder(context, folder, folder_meaning, &headers).await?,
                        ),
                    );
                }
            }

            info!(
                context,
                "resync_folder_uids: Collected {} message IDs in {folder}.",
                msgs.len(),
            );

            uid_validity = get_uidvalidity(context, transport_id, folder).await?;
        } else {
            warn!(context, "resync_folder_uids: No folder {folder}.");
            // `msgs` is empty in this case, so the value is never written.
            uid_validity = 0;
        }

        // Replace the rows of the folder atomically.
        context
            .sql
            .transaction(move |transaction| {
                transaction.execute("DELETE FROM imap WHERE transport_id=? AND folder=?", (transport_id, folder,))?;
                for (uid, (rfc724_mid, target)) in &msgs {
                    transaction.execute(
                        "INSERT INTO imap (transport_id, rfc724_mid, folder, uid, uidvalidity, target)
 VALUES (?, ?, ?, ?, ?, ?)
 ON CONFLICT(transport_id, folder, uid, uidvalidity)
 DO UPDATE SET rfc724_mid=excluded.rfc724_mid,
 target=excluded.target",
                        (transport_id, rfc724_mid, folder, uid, uid_validity, target),
                    )?;
                }
                Ok(())
            })
            .await?;
        Ok(())
    }
925
926 async fn delete_message_batch(
929 &mut self,
930 context: &Context,
931 uid_set: &str,
932 row_ids: Vec<i64>,
933 ) -> Result<()> {
934 self.add_flag_finalized_with_set(uid_set, "\\Deleted")
936 .await?;
937 context
938 .sql
939 .transaction(|transaction| {
940 let mut stmt = transaction.prepare("DELETE FROM imap WHERE id = ?")?;
941 for row_id in row_ids {
942 stmt.execute((row_id,))?;
943 }
944 Ok(())
945 })
946 .await
947 .context("Cannot remove deleted messages from imap table")?;
948
949 context.emit_event(EventType::ImapMessageDeleted(format!(
950 "IMAP messages {uid_set} marked as deleted"
951 )));
952 Ok(())
953 }
954
955 async fn move_message_batch(
958 &mut self,
959 context: &Context,
960 set: &str,
961 row_ids: Vec<i64>,
962 target: &str,
963 ) -> Result<()> {
964 if self.can_move() {
965 match self.uid_mv(set, &target).await {
966 Ok(()) => {
967 context
969 .sql
970 .transaction(|transaction| {
971 let mut stmt = transaction.prepare("DELETE FROM imap WHERE id = ?")?;
972 for row_id in row_ids {
973 stmt.execute((row_id,))?;
974 }
975 Ok(())
976 })
977 .await
978 .context("Cannot delete moved messages from imap table")?;
979 context.emit_event(EventType::ImapMessageMoved(format!(
980 "IMAP messages {set} moved to {target}"
981 )));
982 return Ok(());
983 }
984 Err(err) => {
985 warn!(
986 context,
987 "Cannot move messages, fallback to COPY/DELETE {} to {}: {}",
988 set,
989 target,
990 err
991 );
992 }
993 }
994 }
995
996 info!(
999 context,
1000 "Server does not support MOVE, fallback to COPY/DELETE {} to {}", set, target
1001 );
1002 self.uid_copy(&set, &target).await?;
1003 context
1004 .sql
1005 .transaction(|transaction| {
1006 let mut stmt = transaction.prepare("UPDATE imap SET target='' WHERE id = ?")?;
1007 for row_id in row_ids {
1008 stmt.execute((row_id,))?;
1009 }
1010 Ok(())
1011 })
1012 .await
1013 .context("Cannot plan deletion of messages")?;
1014 context.emit_event(EventType::ImapMessageMoved(format!(
1015 "IMAP messages {set} copied to {target}"
1016 )));
1017 Ok(())
1018 }
1019
1020 async fn move_delete_messages(&mut self, context: &Context, folder: &str) -> Result<()> {
1024 let transport_id = self.transport_id();
1025 let rows = context
1026 .sql
1027 .query_map_vec(
1028 "SELECT id, uid, target FROM imap
1029 WHERE folder = ?
1030 AND transport_id = ?
1031 AND target != folder
1032 ORDER BY target, uid",
1033 (folder, transport_id),
1034 |row| {
1035 let rowid: i64 = row.get(0)?;
1036 let uid: u32 = row.get(1)?;
1037 let target: String = row.get(2)?;
1038 Ok((rowid, uid, target))
1039 },
1040 )
1041 .await?;
1042
1043 for (target, rowid_set, uid_set) in UidGrouper::from(rows) {
1044 let folder_exists = self.select_with_uidvalidity(context, folder).await?;
1049 ensure!(folder_exists, "No folder {folder}");
1050
1051 if target.is_empty() {
1053 self.delete_message_batch(context, &uid_set, rowid_set)
1054 .await
1055 .with_context(|| format!("cannot delete batch of messages {uid_set:?}"))?;
1056 } else {
1057 self.move_message_batch(context, &uid_set, rowid_set, &target)
1058 .await
1059 .with_context(|| {
1060 format!("cannot move batch of messages {uid_set:?} to folder {target:?}",)
1061 })?;
1062 }
1063 }
1064
1065 if let Err(err) = self.maybe_close_folder(context).await {
1068 warn!(context, "Failed to close folder: {err:#}.");
1069 }
1070
1071 Ok(())
1072 }
1073
    /// Sets the `\Seen` flag on the server for all messages queued in the
    /// `imap_markseen` table for this transport.
    ///
    /// Does nothing if `Config::TeamProfile` is enabled. Entries are removed
    /// from the queue once the flag was set or if their folder no longer
    /// exists; they are kept for a later retry if selecting the folder or
    /// setting the flag fails.
    ///
    /// # Errors
    ///
    /// Fails on configuration or database errors. IMAP errors are only logged.
    pub(crate) async fn store_seen_flags_on_imap(&mut self, context: &Context) -> Result<()> {
        if context.get_config_bool(Config::TeamProfile).await? {
            return Ok(());
        }

        // Drop queue entries whose message is no longer in the `imap` table.
        context
            .sql
            .execute(
                "DELETE FROM imap_markseen WHERE id NOT IN (SELECT imap.id FROM imap)",
                (),
            )
            .await?;

        let transport_id = self.transport_id();
        // Only messages that stay in their folder (`target = folder`) are considered.
        let mut rows = context
            .sql
            .query_map_vec(
                "SELECT imap.id, uid, folder FROM imap, imap_markseen
 WHERE imap.id = imap_markseen.id
 AND imap.transport_id=?
 AND target = folder",
                (transport_id,),
                |row| {
                    let rowid: i64 = row.get(0)?;
                    let uid: u32 = row.get(1)?;
                    let folder: String = row.get(2)?;
                    Ok((rowid, uid, folder))
                },
            )
            .await?;

        // `UidGrouper` needs rows of the same folder to be adjacent and ordered by UID.
        rows.sort_unstable_by(|(_rowid1, uid1, folder1), (_rowid2, uid2, folder2)| {
            (folder1, uid1).cmp(&(folder2, uid2))
        });

        for (folder, rowid_set, uid_set) in UidGrouper::from(rows) {
            let folder_exists = match self.select_with_uidvalidity(context, &folder).await {
                Err(err) => {
                    warn!(
                        context,
                        "store_seen_flags_on_imap: Failed to select {folder}, will retry later: {err:#}."
                    );
                    // Keep the queue entries for the next attempt.
                    continue;
                }
                Ok(folder_exists) => folder_exists,
            };
            if !folder_exists {
                // No `continue` here: entries of a missing folder are removed below.
                warn!(context, "store_seen_flags_on_imap: No folder {folder}.");
            } else if let Err(err) = self.add_flag_finalized_with_set(&uid_set, "\\Seen").await {
                warn!(
                    context,
                    "Cannot mark messages {uid_set} in {folder} as seen, will retry later: {err:#}."
                );
                continue;
            } else {
                info!(
                    context,
                    "Marked messages {} in folder {} as seen.", uid_set, folder
                );
            }
            context
                .sql
                .transaction(|transaction| {
                    let mut stmt = transaction.prepare("DELETE FROM imap_markseen WHERE id = ?")?;
                    for rowid in rowid_set {
                        stmt.execute((rowid,))?;
                    }
                    Ok(())
                })
                .await
                .context("Cannot remove messages marked as seen from imap_markseen table")?;
        }

        Ok(())
    }
1156
    /// Synchronizes `\Seen` flags of `folder` from the server to the local database.
    ///
    /// Fetches the flags of all messages changed since the stored MODSEQ,
    /// marks messages flagged `\Seen` as seen locally, stores the highest
    /// MODSEQ observed, and emits events for the chats that were updated.
    ///
    /// Returns without doing anything if the server lacks CONDSTORE, if
    /// `Config::TeamProfile` is enabled, if the folder does not exist, or if
    /// the mailbox reports no highest mod-sequence.
    ///
    /// # Errors
    ///
    /// Fails if the folder cannot be selected, if no mailbox is selected
    /// afterwards, if fetching flags fails, or on database errors.
    pub(crate) async fn sync_seen_flags(&mut self, context: &Context, folder: &str) -> Result<()> {
        if !self.can_condstore() {
            info!(
                context,
                "Server does not support CONDSTORE, skipping flag synchronization."
            );
            return Ok(());
        }

        if context.get_config_bool(Config::TeamProfile).await? {
            return Ok(());
        }

        let folder_exists = self
            .select_with_uidvalidity(context, folder)
            .await
            .context("Failed to select folder")?;
        if !folder_exists {
            return Ok(());
        }

        let mailbox = self
            .selected_mailbox
            .as_ref()
            .with_context(|| format!("No mailbox selected, folder: {folder}"))?;

        if mailbox.highest_modseq.is_none() {
            info!(
                context,
                "Mailbox {} does not support mod-sequences, skipping flag synchronization.", folder
            );
            return Ok(());
        }

        let transport_id = self.transport_id();
        let mut updated_chat_ids = BTreeSet::new();
        let uid_validity = get_uidvalidity(context, transport_id, folder)
            .await
            .with_context(|| format!("failed to get UID validity for folder {folder}"))?;
        let mut highest_modseq = get_modseq(context, transport_id, folder)
            .await
            .with_context(|| format!("failed to get MODSEQ for folder {folder}"))?;
        // Ask only for messages whose flags changed since the stored MODSEQ.
        let mut list = self
            .uid_fetch("1:*", format!("(FLAGS) (CHANGEDSINCE {highest_modseq})"))
            .await
            .context("failed to fetch flags")?;

        let mut got_unsolicited_fetch = false;

        while let Some(fetch) = list
            .try_next()
            .await
            .context("failed to get FETCH result")?
        {
            let uid = if let Some(uid) = fetch.uid {
                uid
            } else {
                // A FETCH result without UID is treated as unsolicited.
                info!(context, "FETCH result contains no UID, skipping");
                got_unsolicited_fetch = true;
                continue;
            };
            let is_seen = fetch.flags().any(|flag| flag == Flag::Seen);
            if is_seen
                && let Some(chat_id) = mark_seen_by_uid(context, transport_id, folder, uid_validity, uid)
                    .await
                    .with_context(|| {
                        format!("Transport {transport_id}: Failed to update seen status for msg {folder}/{uid}")
                    })?
            {
                updated_chat_ids.insert(chat_id);
            }

            // Track the highest MODSEQ seen so the next run starts from there.
            if let Some(modseq) = fetch.modseq {
                if modseq > highest_modseq {
                    highest_modseq = modseq;
                }
            } else {
                warn!(context, "FETCH result contains no MODSEQ");
            }
        }
        // Drop the response stream before `self` is modified below.
        drop(list);

        if got_unsolicited_fetch {
            info!(context, "Got unsolicited fetch, will skip idle");
            // Flag new mail on the session; `fetch_new_messages()` consumes
            // this flag before fetching.
            self.new_mail = true;
        }

        set_modseq(context, transport_id, folder, highest_modseq)
            .await
            .with_context(|| format!("failed to set MODSEQ for folder {folder}"))?;
        if !updated_chat_ids.is_empty() {
            context.on_archived_chats_maybe_noticed();
        }
        for updated_chat_id in updated_chat_ids {
            context.emit_event(EventType::MsgsNoticed(updated_chat_id));
            chatlist_events::emit_chatlist_item_changed(context, updated_chat_id);
        }

        Ok(())
    }
1264
1265 #[expect(clippy::arithmetic_side_effects)]
1280 pub(crate) async fn fetch_many_msgs(
1281 &mut self,
1282 context: &Context,
1283 folder: &str,
1284 request_uids: Vec<u32>,
1285 uid_message_ids: &BTreeMap<u32, String>,
1286 received_msgs_channel: Sender<(u32, Option<ReceivedMsg>)>,
1287 ) -> Result<()> {
1288 if request_uids.is_empty() {
1289 return Ok(());
1290 }
1291
1292 for (request_uids, set) in build_sequence_sets(&request_uids)? {
1293 info!(context, "Starting UID FETCH of message set \"{}\".", set);
1294 let mut fetch_responses = self
1295 .uid_fetch(&set, BODY_FULL)
1296 .await
1297 .with_context(|| format!("fetching messages {set} from folder {folder:?}"))?;
1298
1299 let mut uid_msgs = HashMap::with_capacity(request_uids.len());
1302
1303 let mut count = 0;
1304 for &request_uid in &request_uids {
1305 let mut fetch_response = uid_msgs.remove(&request_uid);
1307
1308 while fetch_response.is_none() {
1310 let Some(next_fetch_response) = fetch_responses
1311 .try_next()
1312 .await
1313 .context("Failed to process IMAP FETCH result")?
1314 else {
1315 break;
1317 };
1318
1319 if let Some(next_uid) = next_fetch_response.uid {
1320 if next_uid == request_uid {
1321 fetch_response = Some(next_fetch_response);
1322 } else if !request_uids.contains(&next_uid) {
1323 info!(
1330 context,
1331 "Skipping not requested FETCH response for UID {}.", next_uid
1332 );
1333 } else if uid_msgs.insert(next_uid, next_fetch_response).is_some() {
1334 warn!(context, "Got duplicated UID {}.", next_uid);
1335 }
1336 } else {
1337 info!(context, "Skipping FETCH response without UID.");
1338 }
1339 }
1340
1341 let fetch_response = match fetch_response {
1342 Some(fetch) => fetch,
1343 None => {
1344 warn!(
1345 context,
1346 "Missed UID {} in the server response.", request_uid
1347 );
1348 continue;
1349 }
1350 };
1351 count += 1;
1352
1353 let is_deleted = fetch_response.flags().any(|flag| flag == Flag::Deleted);
1354 let body = fetch_response.body();
1355
1356 if is_deleted {
1357 info!(context, "Not processing deleted msg {}.", request_uid);
1358 received_msgs_channel.send((request_uid, None)).await?;
1359 continue;
1360 }
1361
1362 let body = if let Some(body) = body {
1363 body
1364 } else {
1365 info!(
1366 context,
1367 "Not processing message {} without a BODY.", request_uid
1368 );
1369 received_msgs_channel.send((request_uid, None)).await?;
1370 continue;
1371 };
1372
1373 let is_seen = fetch_response.flags().any(|flag| flag == Flag::Seen);
1374
1375 let Some(rfc724_mid) = uid_message_ids.get(&request_uid) else {
1376 error!(
1377 context,
1378 "No Message-ID corresponding to UID {} passed in uid_messsage_ids.",
1379 request_uid
1380 );
1381 continue;
1382 };
1383
1384 info!(
1385 context,
1386 "Passing message UID {} to receive_imf().", request_uid
1387 );
1388 let res = receive_imf_inner(context, rfc724_mid, body, is_seen).await;
1389
1390 let received_msg = match res {
1392 Err(err) => {
1393 warn!(context, "receive_imf error: {err:#}.");
1394
1395 let text = format!(
1396 "❌ Failed to receive a message: {err:#}. Core version v{DC_VERSION_STR}. Please report this bug to delta@merlinux.eu or https://support.delta.chat/.",
1397 );
1398 let mut msg = Message::new_text(text);
1399 add_device_msg(context, None, Some(&mut msg)).await?;
1400 None
1401 }
1402 Ok(msg) => msg,
1403 };
1404 received_msgs_channel
1405 .send((request_uid, received_msg))
1406 .await?;
1407 }
1408
1409 while fetch_responses
1416 .try_next()
1417 .await
1418 .context("Failed to drain FETCH responses")?
1419 .is_some()
1420 {}
1421
1422 if count != request_uids.len() {
1423 warn!(
1424 context,
1425 "Failed to fetch all UIDs: got {}, requested {}, we requested the UIDs {:?}.",
1426 count,
1427 request_uids.len(),
1428 request_uids,
1429 );
1430 } else {
1431 info!(
1432 context,
1433 "Successfully received {} UIDs.",
1434 request_uids.len()
1435 );
1436 }
1437 }
1438
1439 Ok(())
1440 }
1441
1442 #[expect(clippy::arithmetic_side_effects)]
1448 pub(crate) async fn update_metadata(&mut self, context: &Context) -> Result<()> {
1449 let mut lock = context.metadata.write().await;
1450
1451 if !self.can_metadata() {
1452 *lock = Some(Default::default());
1453 }
1454 if let Some(ref mut old_metadata) = *lock {
1455 let now = time();
1456
1457 if now + 3600 * 12 < old_metadata.ice_servers_expiration_timestamp {
1459 return Ok(());
1460 }
1461
1462 let mut got_turn_server = false;
1463 if self.can_metadata() {
1464 info!(context, "ICE servers expired, requesting new credentials.");
1465 let mailbox = "";
1466 let options = "";
1467 let metadata = self
1468 .get_metadata(mailbox, options, "(/shared/vendor/deltachat/turn)")
1469 .await?;
1470 for m in metadata {
1471 if m.entry == "/shared/vendor/deltachat/turn"
1472 && let Some(value) = m.value
1473 {
1474 match create_ice_servers_from_metadata(&value).await {
1475 Ok((parsed_timestamp, parsed_ice_servers)) => {
1476 old_metadata.ice_servers_expiration_timestamp = parsed_timestamp;
1477 old_metadata.ice_servers = parsed_ice_servers;
1478 got_turn_server = true;
1479 }
1480 Err(err) => {
1481 warn!(context, "Failed to parse TURN server metadata: {err:#}.");
1482 }
1483 }
1484 }
1485 }
1486 }
1487 if !got_turn_server {
1488 info!(context, "Will use fallback ICE servers.");
1489 old_metadata.ice_servers_expiration_timestamp = time() + 3600 * 24 * 7;
1491 old_metadata.ice_servers = create_fallback_ice_servers();
1492 }
1493 return Ok(());
1494 }
1495
1496 info!(
1497 context,
1498 "Server supports metadata, retrieving server comment and admin contact."
1499 );
1500
1501 let mut comment = None;
1502 let mut admin = None;
1503 let mut iroh_relay = None;
1504 let mut ice_servers = None;
1505 let mut ice_servers_expiration_timestamp = 0;
1506
1507 let mailbox = "";
1508 let options = "";
1509 let metadata = self
1510 .get_metadata(
1511 mailbox,
1512 options,
1513 "(/shared/comment /shared/admin /shared/vendor/deltachat/irohrelay /shared/vendor/deltachat/turn)",
1514 )
1515 .await?;
1516 for m in metadata {
1517 match m.entry.as_ref() {
1518 "/shared/comment" => {
1519 comment = m.value;
1520 }
1521 "/shared/admin" => {
1522 admin = m.value;
1523 }
1524 "/shared/vendor/deltachat/irohrelay" => {
1525 if let Some(value) = m.value {
1526 if let Ok(url) = Url::parse(&value) {
1527 iroh_relay = Some(url);
1528 } else {
1529 warn!(
1530 context,
1531 "Got invalid URL from iroh relay metadata: {:?}.", value
1532 );
1533 }
1534 }
1535 }
1536 "/shared/vendor/deltachat/turn" => {
1537 if let Some(value) = m.value {
1538 match create_ice_servers_from_metadata(&value).await {
1539 Ok((parsed_timestamp, parsed_ice_servers)) => {
1540 ice_servers_expiration_timestamp = parsed_timestamp;
1541 ice_servers = Some(parsed_ice_servers);
1542 }
1543 Err(err) => {
1544 warn!(context, "Failed to parse TURN server metadata: {err:#}.");
1545 }
1546 }
1547 }
1548 }
1549 _ => {}
1550 }
1551 }
1552 let ice_servers = if let Some(ice_servers) = ice_servers {
1553 ice_servers
1554 } else {
1555 ice_servers_expiration_timestamp = time() + 3600 * 24 * 7;
1557 create_fallback_ice_servers()
1558 };
1559
1560 *lock = Some(ServerMetadata {
1561 comment,
1562 admin,
1563 iroh_relay,
1564 ice_servers,
1565 ice_servers_expiration_timestamp,
1566 });
1567 Ok(())
1568 }
1569
1570 pub(crate) async fn register_token(&mut self, context: &Context) -> Result<()> {
1572 if context.push_subscribed.load(Ordering::Relaxed) {
1573 return Ok(());
1574 }
1575
1576 let transport_id = self.transport_id();
1577
1578 let Some(device_token) = context.push_subscriber.device_token().await else {
1579 return Ok(());
1580 };
1581
1582 if self.can_metadata() && self.can_push() {
1583 info!(
1584 context,
1585 "Transport {transport_id}: Subscribing for push notifications."
1586 );
1587
1588 let old_encrypted_device_token =
1589 context.get_config(Config::EncryptedDeviceToken).await?;
1590
1591 let device_token_changed = old_encrypted_device_token.is_none()
1593 || context.get_config(Config::DeviceToken).await?.as_ref() != Some(&device_token);
1594
1595 let new_encrypted_device_token;
1596 if device_token_changed {
1597 let encrypted_device_token = encrypt_device_token(&device_token)
1598 .context("Failed to encrypt device token")?;
1599
1600 let encrypted_device_token_len = encrypted_device_token.len();
1604
1605 context
1611 .set_config_internal(Config::DeviceToken, Some(&device_token))
1612 .await?;
1613 context
1614 .set_config_internal(
1615 Config::EncryptedDeviceToken,
1616 Some(&encrypted_device_token),
1617 )
1618 .await?;
1619
1620 if encrypted_device_token_len <= 4096 {
1621 new_encrypted_device_token = Some(encrypted_device_token);
1622 } else {
1623 warn!(context, "Device token is too long for LITERAL-, ignoring.");
1633 new_encrypted_device_token = None;
1634 }
1635 } else {
1636 new_encrypted_device_token = old_encrypted_device_token;
1637 }
1638
1639 if let Some(encrypted_device_token) = new_encrypted_device_token {
1642 self.run_command_and_check_ok(&format_setmetadata(
1643 "INBOX",
1644 &encrypted_device_token,
1645 ))
1646 .await
1647 .context("SETMETADATA command failed")?;
1648
1649 context.push_subscribed.store(true, Ordering::Relaxed);
1650 }
1651 } else if !context.push_subscriber.heartbeat_subscribed().await {
1652 let context = context.clone();
1653 tokio::spawn(async move { context.push_subscriber.subscribe(&context).await });
1655 }
1656
1657 Ok(())
1658 }
1659}
1660
/// Builds the `SETMETADATA` command that stores `device_token` in the
/// `/private/devicetoken` entry of `folder`.
///
/// `folder` is sent as an IMAP quoted string, so `"` and `\` inside it are
/// escaped with a backslash. `device_token` is sent as a `{len+}` literal,
/// where `len` is its length in bytes.
fn format_setmetadata(folder: &str, device_token: &str) -> String {
    // Escape the two characters that are special inside a quoted string.
    let mut quoted_folder = String::with_capacity(folder.len());
    for c in folder.chars() {
        if matches!(c, '"' | '\\') {
            quoted_folder.push('\\');
        }
        quoted_folder.push(c);
    }

    let device_token_len = device_token.len();
    format!(
        "SETMETADATA \"{quoted_folder}\" (/private/devicetoken {{{device_token_len}+}}\r\n{device_token})"
    )
}
1667
1668impl Session {
1669 async fn add_flag_finalized_with_set(&mut self, uid_set: &str, flag: &str) -> Result<()> {
1675 if flag == "\\Deleted" {
1676 self.selected_folder_needs_expunge = true;
1677 }
1678 let query = format!("+FLAGS ({flag})");
1679 let mut responses = self
1680 .uid_store(uid_set, &query)
1681 .await
1682 .with_context(|| format!("IMAP failed to store: ({uid_set}, {query})"))?;
1683 while let Some(_response) = responses.try_next().await? {
1684 }
1686 Ok(())
1687 }
1688}
1689
1690impl Session {
1691 fn drain_unsolicited_responses(&self, context: &Context) -> Result<bool> {
1700 use UnsolicitedResponse::*;
1701 use async_imap::imap_proto::Response;
1702 use async_imap::imap_proto::ResponseCode;
1703
1704 let folder = self.selected_folder.as_deref().unwrap_or_default();
1705 let mut should_refetch = false;
1706 while let Ok(response) = self.unsolicited_responses.try_recv() {
1707 match response {
1708 Exists(_) => {
1709 info!(
1710 context,
1711 "Need to refetch {folder:?}, got unsolicited EXISTS {response:?}"
1712 );
1713 should_refetch = true;
1714 }
1715
1716 Expunge(_) | Recent(_) => {}
1717 Other(ref response_data) => {
1718 match response_data.parsed() {
1719 Response::Fetch { .. } => {
1720 info!(
1721 context,
1722 "Need to refetch {folder:?}, got unsolicited FETCH {response:?}"
1723 );
1724 should_refetch = true;
1725 }
1726
1727 Response::Done {
1730 code: Some(ResponseCode::CopyUid(_, _, _)),
1731 ..
1732 } => {}
1733
1734 _ => {
1735 info!(context, "{folder:?}: got unsolicited response {response:?}")
1736 }
1737 }
1738 }
1739 _ => {
1740 info!(context, "{folder:?}: got unsolicited response {response:?}")
1741 }
1742 }
1743 }
1744 Ok(should_refetch)
1745 }
1746}
1747
1748async fn should_move_out_of_spam(
1749 context: &Context,
1750 headers: &[mailparse::MailHeader<'_>],
1751) -> Result<bool> {
1752 if headers.get_header_value(HeaderDef::ChatVersion).is_some() {
1753 return Ok(true);
1764 }
1765
1766 if let Some(msg) = get_prefetch_parent_message(context, headers).await? {
1767 if msg.chat_blocked != Blocked::Not {
1768 return Ok(false);
1770 }
1771 } else {
1772 let from = match mimeparser::get_from(headers) {
1773 Some(f) => f,
1774 None => return Ok(false),
1775 };
1776 let (from_id, blocked_contact, _origin) =
1778 match from_field_to_contact_id(context, &from, None, true, true)
1779 .await
1780 .context("from_field_to_contact_id")?
1781 {
1782 Some(res) => res,
1783 None => {
1784 warn!(
1785 context,
1786 "Contact with From address {:?} cannot exist, not moving out of spam", from
1787 );
1788 return Ok(false);
1789 }
1790 };
1791 if blocked_contact {
1792 return Ok(false);
1794 }
1795
1796 if let Some(chat_id_blocked) = ChatIdBlocked::lookup_by_contact(context, from_id).await? {
1797 if chat_id_blocked.blocked != Blocked::Not {
1798 return Ok(false);
1799 }
1800 } else if from_id != ContactId::SELF {
1801 return Ok(false);
1803 }
1804 }
1805
1806 Ok(true)
1807}
1808
1809async fn spam_target_folder_cfg(
1814 context: &Context,
1815 headers: &[mailparse::MailHeader<'_>],
1816) -> Result<Option<Config>> {
1817 if !should_move_out_of_spam(context, headers).await? {
1818 return Ok(None);
1819 }
1820
1821 Ok(Some(Config::ConfiguredInboxFolder))
1822}
1823
1824pub async fn target_folder_cfg(
1827 context: &Context,
1828 folder: &str,
1829 folder_meaning: FolderMeaning,
1830 headers: &[mailparse::MailHeader<'_>],
1831) -> Result<Option<Config>> {
1832 if folder == "DeltaChat" {
1833 return Ok(None);
1834 }
1835
1836 if folder_meaning == FolderMeaning::Spam {
1837 spam_target_folder_cfg(context, headers).await
1838 } else {
1839 Ok(None)
1840 }
1841}
1842
1843pub async fn target_folder(
1844 context: &Context,
1845 folder: &str,
1846 folder_meaning: FolderMeaning,
1847 headers: &[mailparse::MailHeader<'_>],
1848) -> Result<String> {
1849 match target_folder_cfg(context, folder, folder_meaning, headers).await? {
1850 Some(config) => match context.get_config(config).await? {
1851 Some(target) => Ok(target),
1852 None => Ok(folder.to_string()),
1853 },
1854 None => Ok(folder.to_string()),
1855 }
1856}
1857
1858fn get_folder_meaning_by_name(folder_name: &str) -> FolderMeaning {
1865 const SPAM_NAMES: &[&str] = &[
1867 "spam",
1868 "junk",
1869 "Correio electrónico não solicitado",
1870 "Correo basura",
1871 "Lixo",
1872 "Nettsøppel",
1873 "Nevyžádaná pošta",
1874 "No solicitado",
1875 "Ongewenst",
1876 "Posta indesiderata",
1877 "Skräp",
1878 "Wiadomości-śmieci",
1879 "Önemsiz",
1880 "Ανεπιθύμητα",
1881 "Спам",
1882 "垃圾邮件",
1883 "垃圾郵件",
1884 "迷惑メール",
1885 "스팸",
1886 ];
1887 const TRASH_NAMES: &[&str] = &[
1888 "Trash",
1889 "Bin",
1890 "Caixote do lixo",
1891 "Cestino",
1892 "Corbeille",
1893 "Papelera",
1894 "Papierkorb",
1895 "Papirkurv",
1896 "Papperskorgen",
1897 "Prullenbak",
1898 "Rubujo",
1899 "Κάδος απορριμμάτων",
1900 "Корзина",
1901 "Кошик",
1902 "ゴミ箱",
1903 "垃圾桶",
1904 "已删除邮件",
1905 "휴지통",
1906 ];
1907 let lower = folder_name.to_lowercase();
1908
1909 if lower == "inbox" {
1910 FolderMeaning::Inbox
1911 } else if SPAM_NAMES.iter().any(|s| s.to_lowercase() == lower) {
1912 FolderMeaning::Spam
1913 } else if TRASH_NAMES.iter().any(|s| s.to_lowercase() == lower) {
1914 FolderMeaning::Trash
1915 } else {
1916 FolderMeaning::Unknown
1917 }
1918}
1919
1920fn get_folder_meaning_by_attrs(folder_attrs: &[NameAttribute]) -> FolderMeaning {
1921 for attr in folder_attrs {
1922 match attr {
1923 NameAttribute::Trash => return FolderMeaning::Trash,
1924 NameAttribute::Junk => return FolderMeaning::Spam,
1925 NameAttribute::All | NameAttribute::Flagged => return FolderMeaning::Virtual,
1926 NameAttribute::Extension(label) => {
1927 match label.as_ref() {
1928 "\\Spam" => return FolderMeaning::Spam,
1929 "\\Important" => return FolderMeaning::Virtual,
1930 _ => {}
1931 };
1932 }
1933 _ => {}
1934 }
1935 }
1936 FolderMeaning::Unknown
1937}
1938
1939pub(crate) fn get_folder_meaning(folder: &Name) -> FolderMeaning {
1940 match get_folder_meaning_by_attrs(folder.attributes()) {
1941 FolderMeaning::Unknown => get_folder_meaning_by_name(folder.name()),
1942 meaning => meaning,
1943 }
1944}
1945
1946fn get_fetch_headers(prefetch_msg: &Fetch) -> Result<Vec<mailparse::MailHeader<'_>>> {
1948 match prefetch_msg.header() {
1949 Some(header_bytes) => {
1950 let (headers, _) = mailparse::parse_headers(header_bytes)?;
1951 Ok(headers)
1952 }
1953 None => Ok(Vec::new()),
1954 }
1955}
1956
1957pub(crate) fn prefetch_get_message_id(headers: &[mailparse::MailHeader]) -> Option<String> {
1958 headers
1959 .get_header_value(HeaderDef::XMicrosoftOriginalMessageId)
1960 .or_else(|| headers.get_header_value(HeaderDef::MessageId))
1961 .and_then(|msgid| mimeparser::parse_message_id(&msgid).ok())
1962}
1963
1964pub(crate) fn create_message_id() -> String {
1965 format!("{}{}", GENERATED_PREFIX, create_id())
1966}
1967
1968pub(crate) async fn prefetch_should_download(
1970 context: &Context,
1971 headers: &[mailparse::MailHeader<'_>],
1972 message_id: &str,
1973 mut flags: impl Iterator<Item = Flag<'_>>,
1974) -> Result<bool> {
1975 if message::rfc724_mid_download_tried(context, message_id).await? {
1976 if let Some(from) = mimeparser::get_from(headers)
1977 && context.is_self_addr(&from.addr).await?
1978 {
1979 markseen_on_imap_table(context, message_id).await?;
1980 }
1981 return Ok(false);
1982 }
1983
1984 let maybe_ndn = if let Some(from) = headers.get_header_value(HeaderDef::From_) {
1988 let from = from.to_ascii_lowercase();
1989 from.contains("mailer-daemon") || from.contains("mail-daemon")
1990 } else {
1991 false
1992 };
1993
1994 let from = match mimeparser::get_from(headers) {
1995 Some(f) => f,
1996 None => return Ok(false),
1997 };
1998 let (_from_id, blocked_contact, _origin) =
1999 match from_field_to_contact_id(context, &from, None, true, true).await? {
2000 Some(res) => res,
2001 None => return Ok(false),
2002 };
2003 let is_legacy_securejoin = headers.get_header_value(HeaderDef::SecureJoin).is_some();
2009
2010 let is_encrypted = headers
2011 .get_header_value(HeaderDef::ContentType)
2012 .is_some_and(|content_type| {
2013 mailparse::parse_content_type(&content_type).mimetype == "multipart/encrypted"
2014 });
2015
2016 if flags.any(|f| f == Flag::Draft) {
2017 info!(context, "Ignoring draft message");
2018 return Ok(false);
2019 }
2020
2021 let should_download = maybe_ndn
2022 || (!blocked_contact
2023 && (is_legacy_securejoin
2024 || is_encrypted
2025 || !context.get_config_bool(Config::ForceEncryption).await?));
2026 Ok(should_download)
2027}
2028
2029async fn mark_seen_by_uid(
2033 context: &Context,
2034 transport_id: u32,
2035 folder: &str,
2036 uid_validity: u32,
2037 uid: u32,
2038) -> Result<Option<ChatId>> {
2039 if let Some((msg_id, chat_id)) = context
2040 .sql
2041 .query_row_optional(
2042 "SELECT id, chat_id FROM msgs
2043 WHERE id > 9 AND rfc724_mid IN (
2044 SELECT rfc724_mid FROM imap
2045 WHERE transport_id=?
2046 AND folder=?
2047 AND uidvalidity=?
2048 AND uid=?
2049 LIMIT 1
2050 )",
2051 (transport_id, &folder, uid_validity, uid),
2052 |row| {
2053 let msg_id: MsgId = row.get(0)?;
2054 let chat_id: ChatId = row.get(1)?;
2055 Ok((msg_id, chat_id))
2056 },
2057 )
2058 .await
2059 .with_context(|| format!("failed to get msg and chat ID for IMAP message {folder}/{uid}"))?
2060 {
2061 let updated = context
2062 .sql
2063 .execute(
2064 "UPDATE msgs SET state=?1
2065 WHERE (state=?2 OR state=?3)
2066 AND id=?4",
2067 (
2068 MessageState::InSeen,
2069 MessageState::InFresh,
2070 MessageState::InNoticed,
2071 msg_id,
2072 ),
2073 )
2074 .await
2075 .with_context(|| format!("failed to update msg {msg_id} state"))?
2076 > 0;
2077
2078 if updated {
2079 msg_id
2080 .start_ephemeral_timer(context)
2081 .await
2082 .with_context(|| format!("failed to start ephemeral timer for message {msg_id}"))?;
2083 Ok(Some(chat_id))
2084 } else {
2085 Ok(None)
2087 }
2088 } else {
2089 Ok(None)
2091 }
2092}
2093
2094pub(crate) async fn markseen_on_imap_table(context: &Context, message_id: &str) -> Result<()> {
2097 context
2098 .sql
2099 .execute(
2100 "INSERT OR IGNORE INTO imap_markseen (id)
2101 SELECT id FROM imap WHERE rfc724_mid=?",
2102 (message_id,),
2103 )
2104 .await?;
2105 context.scheduler.interrupt_inbox().await;
2106
2107 Ok(())
2108}
2109
2110pub(crate) async fn set_uid_next(
2114 context: &Context,
2115 transport_id: u32,
2116 folder: &str,
2117 uid_next: u32,
2118) -> Result<()> {
2119 context
2120 .sql
2121 .execute(
2122 "INSERT INTO imap_sync (transport_id, folder, uid_next) VALUES (?, ?,?)
2123 ON CONFLICT(transport_id, folder) DO UPDATE SET uid_next=excluded.uid_next",
2124 (transport_id, folder, uid_next),
2125 )
2126 .await?;
2127 Ok(())
2128}
2129
2130async fn get_uid_next(context: &Context, transport_id: u32, folder: &str) -> Result<u32> {
2136 Ok(context
2137 .sql
2138 .query_get_value(
2139 "SELECT uid_next FROM imap_sync WHERE transport_id=? AND folder=?",
2140 (transport_id, folder),
2141 )
2142 .await?
2143 .unwrap_or(0))
2144}
2145
2146pub(crate) async fn set_uidvalidity(
2147 context: &Context,
2148 transport_id: u32,
2149 folder: &str,
2150 uidvalidity: u32,
2151) -> Result<()> {
2152 context
2153 .sql
2154 .execute(
2155 "INSERT INTO imap_sync (transport_id, folder, uidvalidity) VALUES (?,?,?)
2156 ON CONFLICT(transport_id, folder) DO UPDATE SET uidvalidity=excluded.uidvalidity",
2157 (transport_id, folder, uidvalidity),
2158 )
2159 .await?;
2160 Ok(())
2161}
2162
2163async fn get_uidvalidity(context: &Context, transport_id: u32, folder: &str) -> Result<u32> {
2164 Ok(context
2165 .sql
2166 .query_get_value(
2167 "SELECT uidvalidity FROM imap_sync WHERE transport_id=? AND folder=?",
2168 (transport_id, folder),
2169 )
2170 .await?
2171 .unwrap_or(0))
2172}
2173
2174pub(crate) async fn set_modseq(
2175 context: &Context,
2176 transport_id: u32,
2177 folder: &str,
2178 modseq: u64,
2179) -> Result<()> {
2180 context
2181 .sql
2182 .execute(
2183 "INSERT INTO imap_sync (transport_id, folder, modseq) VALUES (?,?,?)
2184 ON CONFLICT(transport_id, folder) DO UPDATE SET modseq=excluded.modseq",
2185 (transport_id, folder, modseq),
2186 )
2187 .await?;
2188 Ok(())
2189}
2190
2191async fn get_modseq(context: &Context, transport_id: u32, folder: &str) -> Result<u64> {
2192 Ok(context
2193 .sql
2194 .query_get_value(
2195 "SELECT modseq FROM imap_sync WHERE transport_id=? AND folder=?",
2196 (transport_id, folder),
2197 )
2198 .await?
2199 .unwrap_or(0))
2200}
2201
2202#[expect(clippy::arithmetic_side_effects)]
2206fn build_sequence_sets(uids: &[u32]) -> Result<Vec<(Vec<u32>, String)>> {
2207 let mut ranges: Vec<UidRange> = vec![];
2209
2210 for ¤t in uids {
2211 if let Some(last) = ranges.last_mut()
2212 && last.end + 1 == current
2213 {
2214 last.end = current;
2215 continue;
2216 }
2217
2218 ranges.push(UidRange {
2219 start: current,
2220 end: current,
2221 });
2222 }
2223
2224 let mut result = vec![];
2226 let (mut last_uids, mut last_str) = (Vec::new(), String::new());
2227 for range in ranges {
2228 last_uids.reserve((range.end - range.start + 1).try_into()?);
2229 (range.start..=range.end).for_each(|u| last_uids.push(u));
2230 if !last_str.is_empty() {
2231 last_str.push(',');
2232 }
2233 last_str.push_str(&range.to_string());
2234
2235 if last_str.len() > 990 {
2236 result.push((take(&mut last_uids), take(&mut last_str)));
2237 }
2238 }
2239 result.push((last_uids, last_str));
2240
2241 result.retain(|(_, s)| !s.is_empty());
2242 Ok(result)
2243}
2244
/// A range of UIDs from `start` to `end`, both inclusive.
struct UidRange {
    start: u32,
    end: u32,
}

impl std::fmt::Display for UidRange {
    /// Writes a single-UID range as `start` and any other range as
    /// `start:end`.
    fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
        match (self.start, self.end) {
            (first, last) if first == last => write!(f, "{}", first),
            (first, last) => write!(f, "{}:{}", first, last),
        }
    }
}
2260
2261#[cfg(test)]
2262mod imap_tests;