1use std::{
7 cmp::max,
8 cmp::min,
9 collections::{BTreeMap, BTreeSet, HashMap},
10 iter::Peekable,
11 mem::take,
12 sync::atomic::Ordering,
13 time::{Duration, UNIX_EPOCH},
14};
15
16use anyhow::{Context as _, Result, bail, ensure, format_err};
17use async_channel::{self, Receiver, Sender};
18use async_imap::types::{Fetch, Flag, Name, NameAttribute, UnsolicitedResponse};
19use futures::{FutureExt as _, TryStreamExt};
20use futures_lite::FutureExt;
21use ratelimit::Ratelimit;
22use url::Url;
23
24use crate::calls::{
25 UnresolvedIceServer, create_fallback_ice_servers, create_ice_servers_from_metadata,
26};
27use crate::chat::{self, ChatId, ChatIdBlocked, add_device_msg};
28use crate::chatlist_events;
29use crate::config::Config;
30use crate::constants::{Blocked, DC_VERSION_STR};
31use crate::contact::ContactId;
32use crate::context::Context;
33use crate::ensure_and_debug_assert;
34use crate::events::EventType;
35use crate::headerdef::{HeaderDef, HeaderDefMap};
36use crate::log::{LogExt, warn};
37use crate::message::{self, Message, MessageState, MsgId};
38use crate::mimeparser;
39use crate::net::proxy::ProxyConfig;
40use crate::net::session::SessionStream;
41use crate::oauth2::get_oauth2_access_token;
42use crate::push::encrypt_device_token;
43use crate::receive_imf::{
44 ReceivedMsg, from_field_to_contact_id, get_prefetch_parent_message, receive_imf_inner,
45};
46use crate::scheduler::connectivity::ConnectivityStore;
47use crate::stock_str;
48use crate::tools::{self, create_id, duration_to_str, time};
49use crate::transport::{
50 ConfiguredLoginParam, ConfiguredServerLoginParam, prioritize_server_login_params,
51};
52
53pub(crate) mod capabilities;
54mod client;
55mod idle;
56pub mod select_folder;
57pub(crate) mod session;
58
59use client::{Client, determine_capabilities};
60use session::Session;
61
62pub(crate) const GENERATED_PREFIX: &str = "GEN_";
63
64const RFC724MID_UID: &str = "(UID BODY.PEEK[HEADER.FIELDS (\
65 MESSAGE-ID \
66 X-MICROSOFT-ORIGINAL-MESSAGE-ID\
67 )])";
68const BODY_FULL: &str = "(FLAGS BODY.PEEK[])";
69
70#[derive(Debug)]
71pub(crate) struct Imap {
72 transport_id: u32,
76
77 pub(crate) idle_interrupt_receiver: Receiver<()>,
78
79 pub(crate) addr: String,
81
82 lp: Vec<ConfiguredServerLoginParam>,
84
85 password: String,
87
88 proxy_config: Option<ProxyConfig>,
90
91 strict_tls: bool,
92
93 oauth2: bool,
94
95 pub(crate) folder: String,
97
98 authentication_failed_once: bool,
99
100 pub(crate) connectivity: ConnectivityStore,
101
102 conn_last_try: tools::Time,
103 conn_backoff_ms: u64,
104
105 ratelimit: Ratelimit,
113
114 pub(crate) resync_request_sender: async_channel::Sender<()>,
116
117 pub(crate) resync_request_receiver: async_channel::Receiver<()>,
119}
120
121#[derive(Debug)]
122struct OAuth2 {
123 user: String,
124 access_token: String,
125}
126
127#[derive(Debug, Default)]
128pub(crate) struct ServerMetadata {
129 pub comment: Option<String>,
132
133 pub admin: Option<String>,
136
137 pub iroh_relay: Option<Url>,
138
139 pub ice_servers: Vec<UnresolvedIceServer>,
141
142 pub ice_servers_expiration_timestamp: i64,
149}
150
151impl async_imap::Authenticator for OAuth2 {
152 type Response = String;
153
154 fn process(&mut self, _data: &[u8]) -> Self::Response {
155 format!(
156 "user={}\x01auth=Bearer {}\x01\x01",
157 self.user, self.access_token
158 )
159 }
160}
161
162#[derive(Debug, Display, PartialEq, Eq, Clone, Copy)]
163pub enum FolderMeaning {
164 Unknown,
165
166 Spam,
168 Inbox,
169 Trash,
170
171 Virtual,
178}
179
180struct UidGrouper<T: Iterator<Item = (i64, u32, String)>> {
181 inner: Peekable<T>,
182}
183
184impl<T, I> From<I> for UidGrouper<T>
185where
186 T: Iterator<Item = (i64, u32, String)>,
187 I: IntoIterator<IntoIter = T>,
188{
189 fn from(inner: I) -> Self {
190 Self {
191 inner: inner.into_iter().peekable(),
192 }
193 }
194}
195
196impl<T: Iterator<Item = (i64, u32, String)>> Iterator for UidGrouper<T> {
197 type Item = (String, Vec<i64>, String);
199
200 #[expect(clippy::arithmetic_side_effects)]
201 fn next(&mut self) -> Option<Self::Item> {
202 let (_, _, folder) = self.inner.peek().cloned()?;
203
204 let mut uid_set = String::new();
205 let mut rowid_set = Vec::new();
206
207 while uid_set.len() < 1000 {
208 if let Some((start_rowid, start_uid, _)) = self
210 .inner
211 .next_if(|(_, _, start_folder)| start_folder == &folder)
212 {
213 rowid_set.push(start_rowid);
214 let mut end_uid = start_uid;
215
216 while let Some((next_rowid, next_uid, _)) =
217 self.inner.next_if(|(_, next_uid, next_folder)| {
218 next_folder == &folder && (*next_uid == end_uid + 1 || *next_uid == end_uid)
219 })
220 {
221 end_uid = next_uid;
222 rowid_set.push(next_rowid);
223 }
224
225 let uid_range = UidRange {
226 start: start_uid,
227 end: end_uid,
228 };
229 if !uid_set.is_empty() {
230 uid_set.push(',');
231 }
232 uid_set.push_str(&uid_range.to_string());
233 } else {
234 break;
235 }
236 }
237
238 Some((folder, rowid_set, uid_set))
239 }
240}
241
242impl Imap {
243 pub async fn new(
245 context: &Context,
246 transport_id: u32,
247 param: ConfiguredLoginParam,
248 idle_interrupt_receiver: Receiver<()>,
249 ) -> Result<Self> {
250 let lp = param.imap.clone();
251 let password = param.imap_password.clone();
252 let proxy_config = ProxyConfig::load(context).await?;
253 let addr = ¶m.addr;
254 let strict_tls = param.strict_tls(proxy_config.is_some());
255 let oauth2 = param.oauth2;
256 let folder = param
257 .imap_folder
258 .clone()
259 .unwrap_or_else(|| "INBOX".to_string());
260 ensure_and_debug_assert!(!folder.is_empty(), "Watched folder name cannot be empty");
261 let (resync_request_sender, resync_request_receiver) = async_channel::bounded(1);
262 Ok(Imap {
263 transport_id,
264 idle_interrupt_receiver,
265 addr: addr.to_string(),
266 lp,
267 password,
268 proxy_config,
269 strict_tls,
270 oauth2,
271 folder,
272 authentication_failed_once: false,
273 connectivity: Default::default(),
274 conn_last_try: UNIX_EPOCH,
275 conn_backoff_ms: 0,
276 ratelimit: Ratelimit::new(Duration::new(120, 0), 2.0),
278 resync_request_sender,
279 resync_request_receiver,
280 })
281 }
282
283 pub async fn new_configured(
285 context: &Context,
286 idle_interrupt_receiver: Receiver<()>,
287 ) -> Result<Self> {
288 let (transport_id, param) = ConfiguredLoginParam::load(context)
289 .await?
290 .context("Not configured")?;
291 let imap = Self::new(context, transport_id, param, idle_interrupt_receiver).await?;
292 Ok(imap)
293 }
294
295 pub fn transport_id(&self) -> u32 {
297 self.transport_id
298 }
299
300 pub(crate) async fn connect(
306 &mut self,
307 context: &Context,
308 configuring: bool,
309 ) -> Result<Session> {
310 let now = tools::Time::now();
311 let until_can_send = max(
312 min(self.conn_last_try, now)
313 .checked_add(Duration::from_millis(self.conn_backoff_ms))
314 .unwrap_or(now),
315 now,
316 )
317 .duration_since(now)?;
318 let ratelimit_duration = max(until_can_send, self.ratelimit.until_can_send());
319 if !ratelimit_duration.is_zero() {
320 warn!(
321 context,
322 "IMAP got rate limited, waiting for {} until can connect.",
323 duration_to_str(ratelimit_duration),
324 );
325 let interrupted = async {
326 tokio::time::sleep(ratelimit_duration).await;
327 false
328 }
329 .race(self.idle_interrupt_receiver.recv().map(|_| true))
330 .await;
331 if interrupted {
332 info!(
333 context,
334 "Connecting to IMAP without waiting for ratelimit due to interrupt."
335 );
336 }
337 }
338
339 info!(context, "Connecting to IMAP server.");
340 self.connectivity.set_connecting(context);
341
342 self.conn_last_try = tools::Time::now();
343 const BACKOFF_MIN_MS: u64 = 2000;
344 const BACKOFF_MAX_MS: u64 = 80_000;
345 self.conn_backoff_ms = min(self.conn_backoff_ms, BACKOFF_MAX_MS / 2);
346 self.conn_backoff_ms = self.conn_backoff_ms.saturating_add(rand::random_range(
347 (self.conn_backoff_ms / 2)..=self.conn_backoff_ms,
348 ));
349 self.conn_backoff_ms = max(BACKOFF_MIN_MS, self.conn_backoff_ms);
350
351 let login_params = prioritize_server_login_params(&context.sql, &self.lp, "imap").await?;
352 let mut first_error = None;
353 for lp in login_params {
354 info!(context, "IMAP trying to connect to {}.", &lp.connection);
355 let connection_candidate = lp.connection.clone();
356 let client = match Client::connect(
357 context,
358 self.proxy_config.clone(),
359 self.strict_tls,
360 &connection_candidate,
361 )
362 .await
363 .with_context(|| format!("IMAP failed to connect to {connection_candidate}"))
364 {
365 Ok(client) => client,
366 Err(err) => {
367 warn!(context, "{err:#}.");
368 first_error.get_or_insert(err);
369 continue;
370 }
371 };
372
373 self.conn_backoff_ms = BACKOFF_MIN_MS;
374 self.ratelimit.send();
375
376 let imap_user: &str = lp.user.as_ref();
377 let imap_pw: &str = &self.password;
378
379 let login_res = if self.oauth2 {
380 info!(context, "Logging into IMAP server with OAuth 2.");
381 let addr: &str = self.addr.as_ref();
382
383 let token = get_oauth2_access_token(context, addr, imap_pw, true)
384 .await?
385 .context("IMAP could not get OAUTH token")?;
386 let auth = OAuth2 {
387 user: imap_user.into(),
388 access_token: token,
389 };
390 client.authenticate("XOAUTH2", auth).await
391 } else {
392 info!(context, "Logging into IMAP server with LOGIN.");
393 client.login(imap_user, imap_pw).await
394 };
395
396 match login_res {
397 Ok(mut session) => {
398 let capabilities = determine_capabilities(&mut session).await?;
399 let resync_request_sender = self.resync_request_sender.clone();
400
401 let session = if capabilities.can_compress {
402 info!(context, "Enabling IMAP compression.");
403 let compressed_session = session
404 .compress(|s| {
405 let session_stream: Box<dyn SessionStream> = Box::new(s);
406 session_stream
407 })
408 .await
409 .context("Failed to enable IMAP compression")?;
410 Session::new(
411 compressed_session,
412 capabilities,
413 resync_request_sender,
414 self.transport_id,
415 )
416 } else {
417 Session::new(
418 session,
419 capabilities,
420 resync_request_sender,
421 self.transport_id,
422 )
423 };
424
425 let mut lock = context.server_id.write().await;
427 lock.clone_from(&session.capabilities.server_id);
428
429 self.authentication_failed_once = false;
430 context.emit_event(EventType::ImapConnected(format!(
431 "IMAP-LOGIN as {}",
432 lp.user
433 )));
434 self.connectivity.set_preparing(context);
435 info!(context, "Successfully logged into IMAP server.");
436 return Ok(session);
437 }
438
439 Err(err) => {
440 let imap_user = lp.user.to_owned();
441 let message = stock_str::cannot_login(context, &imap_user);
442
443 warn!(context, "IMAP failed to login: {err:#}.");
444 first_error.get_or_insert(format_err!("{message} ({err:#})"));
445
446 let _lock = context.wrong_pw_warning_mutex.lock().await;
448 if err.to_string().to_lowercase().contains("authentication") {
449 if self.authentication_failed_once
450 && !configuring
451 && context.get_config_bool(Config::NotifyAboutWrongPw).await?
452 {
453 let mut msg = Message::new_text(message);
454 if let Err(e) = chat::add_device_msg_with_importance(
455 context,
456 None,
457 Some(&mut msg),
458 true,
459 )
460 .await
461 {
462 warn!(context, "Failed to add device message: {e:#}.");
463 } else {
464 context
465 .set_config_internal(Config::NotifyAboutWrongPw, None)
466 .await
467 .log_err(context)
468 .ok();
469 }
470 } else {
471 self.authentication_failed_once = true;
472 }
473 } else {
474 self.authentication_failed_once = false;
475 }
476 }
477 }
478 }
479
480 Err(first_error.unwrap_or_else(|| format_err!("No IMAP connection candidates provided")))
481 }
482
483 pub(crate) async fn prepare(&mut self, context: &Context) -> Result<Session> {
488 let configuring = false;
489 let session = match self.connect(context, configuring).await {
490 Ok(session) => session,
491 Err(err) => {
492 self.connectivity.set_err(context, format!("{err:#}"));
493 return Err(err);
494 }
495 };
496
497 Ok(session)
498 }
499
500 pub async fn fetch_move_delete(
505 &mut self,
506 context: &Context,
507 session: &mut Session,
508 watch_folder: &str,
509 ) -> Result<()> {
510 ensure_and_debug_assert!(!watch_folder.is_empty(), "Watched folder cannot be empty");
511 if !context.sql.is_open().await {
512 bail!("IMAP operation attempted while it is torn down");
514 }
515
516 let msgs_fetched = self
517 .fetch_new_messages(context, session, watch_folder)
518 .await
519 .context("fetch_new_messages")?;
520 if msgs_fetched && context.get_config_delete_device_after().await?.is_some() {
521 context.scheduler.interrupt_ephemeral_task().await;
526 }
527
528 session
529 .move_delete_messages(context, watch_folder)
530 .await
531 .context("move_delete_messages")?;
532
533 Ok(())
534 }
535
536 #[expect(clippy::arithmetic_side_effects)]
540 pub(crate) async fn fetch_new_messages(
541 &mut self,
542 context: &Context,
543 session: &mut Session,
544 folder: &str,
545 ) -> Result<bool> {
546 let transport_id = session.transport_id();
547
548 let folder_exists = session
549 .select_with_uidvalidity(context, folder)
550 .await
551 .with_context(|| format!("Failed to select folder {folder:?}"))?;
552
553 if !session.new_mail {
554 info!(
555 context,
556 "Transport {transport_id}: No new emails in folder {folder:?}."
557 );
558 return Ok(false);
559 }
560 session.new_mail = false;
563
564 if !folder_exists {
565 return Ok(false);
566 }
567
568 let mut read_cnt = 0;
569 loop {
570 let (n, fetch_more) =
571 Box::pin(self.fetch_new_msg_batch(context, session, folder)).await?;
572 read_cnt += n;
573 if !fetch_more {
574 return Ok(read_cnt > 0);
575 }
576 }
577 }
578
579 #[expect(clippy::arithmetic_side_effects)]
581 async fn fetch_new_msg_batch(
582 &mut self,
583 context: &Context,
584 session: &mut Session,
585 folder: &str,
586 ) -> Result<(usize, bool)> {
587 let transport_id = self.transport_id;
588 let uid_validity = get_uidvalidity(context, transport_id, folder).await?;
589 let old_uid_next = get_uid_next(context, transport_id, folder).await?;
590 info!(
591 context,
592 "fetch_new_msg_batch({folder}): UIDVALIDITY={uid_validity}, UIDNEXT={old_uid_next}."
593 );
594
595 let uids_to_prefetch = 500;
596 let msgs = session
597 .prefetch(old_uid_next, uids_to_prefetch)
598 .await
599 .context("prefetch")?;
600 let read_cnt = msgs.len();
601 let _fetch_msgs_lock_guard = context.fetch_msgs_mutex.lock().await;
602
603 let mut uids_fetch: Vec<u32> = Vec::new();
604 let mut available_post_msgs: Vec<String> = Vec::new();
605 let mut download_later: Vec<String> = Vec::new();
606 let mut uid_message_ids = BTreeMap::new();
607 let mut largest_uid_skipped = None;
608
609 let download_limit: Option<u32> = context
610 .get_config_parsed(Config::DownloadLimit)
611 .await?
612 .filter(|&l| 0 < l);
613
614 for (uid, ref fetch_response) in msgs {
616 let headers = match get_fetch_headers(fetch_response) {
617 Ok(headers) => headers,
618 Err(err) => {
619 warn!(context, "Failed to parse FETCH headers: {err:#}.");
620 continue;
621 }
622 };
623
624 let message_id = prefetch_get_message_id(&headers);
625 let size = fetch_response
626 .size
627 .context("imap fetch response does not contain size")?;
628
629 let delete = if let Some(message_id) = &message_id {
640 message::rfc724_mid_exists_ex(context, message_id, "deleted=1")
641 .await?
642 .is_some_and(|(_msg_id, deleted)| deleted)
643 } else {
644 false
645 };
646
647 let message_id = message_id.unwrap_or_else(create_message_id);
650
651 if delete {
652 info!(context, "Deleting locally deleted message {message_id}.");
653 }
654
655 let target = if delete { "" } else { folder };
656
657 context
658 .sql
659 .execute(
660 "INSERT INTO imap (transport_id, rfc724_mid, folder, uid, uidvalidity, target)
661 VALUES (?, ?, ?, ?, ?, ?)
662 ON CONFLICT(transport_id, folder, uid, uidvalidity)
663 DO UPDATE SET rfc724_mid=excluded.rfc724_mid,
664 target=excluded.target",
665 (
666 self.transport_id,
667 &message_id,
668 &folder,
669 uid,
670 uid_validity,
671 target,
672 ),
673 )
674 .await?;
675
676 if folder == target
683 && prefetch_should_download(context, &headers, &message_id, fetch_response.flags())
684 .await
685 .context("prefetch_should_download")?
686 {
687 if headers
688 .get_header_value(HeaderDef::ChatIsPostMessage)
689 .is_some()
690 {
691 info!(context, "{message_id:?} is a post-message.");
692 available_post_msgs.push(message_id.clone());
693
694 let is_bot = context.get_config_bool(Config::Bot).await?;
695 if is_bot && download_limit.is_none_or(|download_limit| size <= download_limit)
696 {
697 uids_fetch.push(uid);
698 uid_message_ids.insert(uid, message_id);
699 } else {
700 if download_limit.is_none_or(|download_limit| size <= download_limit) {
701 download_later.push(message_id.clone());
704 }
705 largest_uid_skipped = Some(uid);
706 }
707 } else {
708 info!(context, "{message_id:?} is not a post-message.");
709 if download_limit.is_none_or(|download_limit| size <= download_limit) {
710 uids_fetch.push(uid);
711 uid_message_ids.insert(uid, message_id);
712 } else {
713 download_later.push(message_id.clone());
714 largest_uid_skipped = Some(uid);
715 }
716 };
717 } else {
718 largest_uid_skipped = Some(uid);
719 }
720 }
721
722 if !uids_fetch.is_empty() {
723 self.connectivity.set_working(context);
724 }
725
726 let (sender, receiver) = async_channel::unbounded();
727
728 let mut received_msgs = Vec::with_capacity(uids_fetch.len());
729 let mailbox_uid_next = session
730 .selected_mailbox
731 .as_ref()
732 .with_context(|| format!("Expected {folder:?} to be selected"))?
733 .uid_next
734 .unwrap_or_default();
735
736 let update_uids_future = async {
737 let mut largest_uid_fetched: u32 = 0;
738
739 while let Ok((uid, received_msg_opt)) = receiver.recv().await {
740 largest_uid_fetched = max(largest_uid_fetched, uid);
741 if let Some(received_msg) = received_msg_opt {
742 received_msgs.push(received_msg)
743 }
744 }
745
746 largest_uid_fetched
747 };
748
749 let actually_download_messages_future = async {
750 session
751 .fetch_many_msgs(context, folder, uids_fetch, &uid_message_ids, sender)
752 .await
753 .context("fetch_many_msgs")
754 };
755
756 let (largest_uid_fetched, fetch_res) =
757 tokio::join!(update_uids_future, actually_download_messages_future);
758
759 let mut new_uid_next = largest_uid_fetched + 1;
765 let fetch_more = fetch_res.is_ok() && {
766 let prefetch_uid_next = old_uid_next + uids_to_prefetch;
767 new_uid_next = max(new_uid_next, min(prefetch_uid_next, mailbox_uid_next));
771
772 new_uid_next = max(new_uid_next, largest_uid_skipped.unwrap_or(0) + 1);
773
774 prefetch_uid_next < mailbox_uid_next
775 };
776 if new_uid_next > old_uid_next {
777 set_uid_next(context, self.transport_id, folder, new_uid_next).await?;
778 }
779
780 info!(context, "{} mails read from \"{}\".", read_cnt, folder);
781
782 if !received_msgs.is_empty() {
783 context.emit_event(EventType::IncomingMsgBunch);
784 }
785
786 chat::mark_old_messages_as_noticed(context, received_msgs).await?;
787
788 if fetch_res.is_ok() {
789 info!(
790 context,
791 "available_post_msgs: {}, download_later: {}.",
792 available_post_msgs.len(),
793 download_later.len(),
794 );
795 let trans_fn = |t: &mut rusqlite::Transaction| {
796 let mut stmt = t.prepare("INSERT OR IGNORE INTO available_post_msgs VALUES (?)")?;
797 for rfc724_mid in available_post_msgs {
798 stmt.execute((rfc724_mid,))
799 .context("INSERT OR IGNORE INTO available_post_msgs")?;
800 }
801 let mut stmt =
802 t.prepare("INSERT OR IGNORE INTO download (rfc724_mid, msg_id) VALUES (?,0)")?;
803 for rfc724_mid in download_later {
804 stmt.execute((rfc724_mid,))
805 .context("INSERT OR IGNORE INTO download")?;
806 }
807 Ok(())
808 };
809 context.sql.transaction(trans_fn).await?;
810 }
811
812 fetch_res?;
815
816 Ok((read_cnt, fetch_more))
817 }
818}
819
820impl Session {
821 pub(crate) async fn resync_folders(&mut self, context: &Context) -> Result<()> {
823 let all_folders = self
824 .list_folders()
825 .await
826 .context("listing folders for resync")?;
827 for folder in all_folders {
828 let folder_meaning = get_folder_meaning(&folder);
829 if !matches!(
830 folder_meaning,
831 FolderMeaning::Virtual | FolderMeaning::Unknown
832 ) {
833 self.resync_folder_uids(context, folder.name(), folder_meaning)
834 .await?;
835 }
836 }
837 Ok(())
838 }
839
840 pub(crate) async fn resync_folder_uids(
847 &mut self,
848 context: &Context,
849 folder: &str,
850 folder_meaning: FolderMeaning,
851 ) -> Result<()> {
852 let uid_validity;
853 let mut msgs = BTreeMap::new();
855
856 let folder_exists = self.select_with_uidvalidity(context, folder).await?;
857 let transport_id = self.transport_id();
858 if folder_exists {
859 let mut list = self
860 .uid_fetch("1:*", RFC724MID_UID)
861 .await
862 .with_context(|| format!("Can't resync folder {folder}"))?;
863 while let Some(fetch) = list.try_next().await? {
864 let headers = match get_fetch_headers(&fetch) {
865 Ok(headers) => headers,
866 Err(err) => {
867 warn!(context, "Failed to parse FETCH headers: {}", err);
868 continue;
869 }
870 };
871 let message_id = prefetch_get_message_id(&headers);
872
873 if let (Some(uid), Some(rfc724_mid)) = (fetch.uid, message_id) {
874 msgs.insert(
875 uid,
876 (
877 rfc724_mid,
878 target_folder(context, folder, folder_meaning, &headers).await?,
879 ),
880 );
881 }
882 }
883
884 info!(
885 context,
886 "resync_folder_uids: Collected {} message IDs in {folder}.",
887 msgs.len(),
888 );
889
890 uid_validity = get_uidvalidity(context, transport_id, folder).await?;
891 } else {
892 warn!(context, "resync_folder_uids: No folder {folder}.");
893 uid_validity = 0;
894 }
895
896 context
898 .sql
899 .transaction(move |transaction| {
900 transaction.execute("DELETE FROM imap WHERE transport_id=? AND folder=?", (transport_id, folder,))?;
901 for (uid, (rfc724_mid, target)) in &msgs {
902 transaction.execute(
905 "INSERT INTO imap (transport_id, rfc724_mid, folder, uid, uidvalidity, target)
906 VALUES (?, ?, ?, ?, ?, ?)
907 ON CONFLICT(transport_id, folder, uid, uidvalidity)
908 DO UPDATE SET rfc724_mid=excluded.rfc724_mid,
909 target=excluded.target",
910 (transport_id, rfc724_mid, folder, uid, uid_validity, target),
911 )?;
912 }
913 Ok(())
914 })
915 .await?;
916 Ok(())
917 }
918
919 async fn delete_message_batch(
922 &mut self,
923 context: &Context,
924 uid_set: &str,
925 row_ids: Vec<i64>,
926 ) -> Result<()> {
927 self.add_flag_finalized_with_set(uid_set, "\\Deleted")
929 .await?;
930 context
931 .sql
932 .transaction(|transaction| {
933 let mut stmt = transaction.prepare("DELETE FROM imap WHERE id = ?")?;
934 for row_id in row_ids {
935 stmt.execute((row_id,))?;
936 }
937 Ok(())
938 })
939 .await
940 .context("Cannot remove deleted messages from imap table")?;
941
942 context.emit_event(EventType::ImapMessageDeleted(format!(
943 "IMAP messages {uid_set} marked as deleted"
944 )));
945 Ok(())
946 }
947
948 async fn move_message_batch(
951 &mut self,
952 context: &Context,
953 set: &str,
954 row_ids: Vec<i64>,
955 target: &str,
956 ) -> Result<()> {
957 if self.can_move() {
958 match self.uid_mv(set, &target).await {
959 Ok(()) => {
960 context
962 .sql
963 .transaction(|transaction| {
964 let mut stmt = transaction.prepare("DELETE FROM imap WHERE id = ?")?;
965 for row_id in row_ids {
966 stmt.execute((row_id,))?;
967 }
968 Ok(())
969 })
970 .await
971 .context("Cannot delete moved messages from imap table")?;
972 context.emit_event(EventType::ImapMessageMoved(format!(
973 "IMAP messages {set} moved to {target}"
974 )));
975 return Ok(());
976 }
977 Err(err) => {
978 warn!(
979 context,
980 "Cannot move messages, fallback to COPY/DELETE {} to {}: {}",
981 set,
982 target,
983 err
984 );
985 }
986 }
987 }
988
989 info!(
992 context,
993 "Server does not support MOVE, fallback to COPY/DELETE {} to {}", set, target
994 );
995 self.uid_copy(&set, &target).await?;
996 context
997 .sql
998 .transaction(|transaction| {
999 let mut stmt = transaction.prepare("UPDATE imap SET target='' WHERE id = ?")?;
1000 for row_id in row_ids {
1001 stmt.execute((row_id,))?;
1002 }
1003 Ok(())
1004 })
1005 .await
1006 .context("Cannot plan deletion of messages")?;
1007 context.emit_event(EventType::ImapMessageMoved(format!(
1008 "IMAP messages {set} copied to {target}"
1009 )));
1010 Ok(())
1011 }
1012
1013 async fn move_delete_messages(&mut self, context: &Context, folder: &str) -> Result<()> {
1017 let transport_id = self.transport_id();
1018 let rows = context
1019 .sql
1020 .query_map_vec(
1021 "SELECT id, uid, target FROM imap
1022 WHERE folder = ?
1023 AND transport_id = ?
1024 AND target != folder
1025 ORDER BY target, uid",
1026 (folder, transport_id),
1027 |row| {
1028 let rowid: i64 = row.get(0)?;
1029 let uid: u32 = row.get(1)?;
1030 let target: String = row.get(2)?;
1031 Ok((rowid, uid, target))
1032 },
1033 )
1034 .await?;
1035
1036 for (target, rowid_set, uid_set) in UidGrouper::from(rows) {
1037 let folder_exists = self.select_with_uidvalidity(context, folder).await?;
1042 ensure!(folder_exists, "No folder {folder}");
1043
1044 if target.is_empty() {
1046 self.delete_message_batch(context, &uid_set, rowid_set)
1047 .await
1048 .with_context(|| format!("cannot delete batch of messages {uid_set:?}"))?;
1049 } else {
1050 self.move_message_batch(context, &uid_set, rowid_set, &target)
1051 .await
1052 .with_context(|| {
1053 format!("cannot move batch of messages {uid_set:?} to folder {target:?}",)
1054 })?;
1055 }
1056 }
1057
1058 if let Err(err) = self.maybe_close_folder(context).await {
1061 warn!(context, "Failed to close folder: {err:#}.");
1062 }
1063
1064 Ok(())
1065 }
1066
1067 pub(crate) async fn store_seen_flags_on_imap(&mut self, context: &Context) -> Result<()> {
1069 if context.get_config_bool(Config::TeamProfile).await? {
1070 return Ok(());
1071 }
1072
1073 context
1074 .sql
1075 .execute(
1076 "DELETE FROM imap_markseen WHERE id NOT IN (SELECT imap.id FROM imap)",
1077 (),
1078 )
1079 .await?;
1080
1081 let transport_id = self.transport_id();
1082 let mut rows = context
1083 .sql
1084 .query_map_vec(
1085 "SELECT imap.id, uid, folder FROM imap, imap_markseen
1086 WHERE imap.id = imap_markseen.id
1087 AND imap.transport_id=?
1088 AND target = folder",
1089 (transport_id,),
1090 |row| {
1091 let rowid: i64 = row.get(0)?;
1092 let uid: u32 = row.get(1)?;
1093 let folder: String = row.get(2)?;
1094 Ok((rowid, uid, folder))
1095 },
1096 )
1097 .await?;
1098
1099 rows.sort_unstable_by(|(_rowid1, uid1, folder1), (_rowid2, uid2, folder2)| {
1106 (folder1, uid1).cmp(&(folder2, uid2))
1107 });
1108
1109 for (folder, rowid_set, uid_set) in UidGrouper::from(rows) {
1110 let folder_exists = match self.select_with_uidvalidity(context, &folder).await {
1111 Err(err) => {
1112 warn!(
1113 context,
1114 "store_seen_flags_on_imap: Failed to select {folder}, will retry later: {err:#}."
1115 );
1116 continue;
1117 }
1118 Ok(folder_exists) => folder_exists,
1119 };
1120 if !folder_exists {
1121 warn!(context, "store_seen_flags_on_imap: No folder {folder}.");
1122 } else if let Err(err) = self.add_flag_finalized_with_set(&uid_set, "\\Seen").await {
1123 warn!(
1124 context,
1125 "Cannot mark messages {uid_set} in {folder} as seen, will retry later: {err:#}."
1126 );
1127 continue;
1128 } else {
1129 info!(
1130 context,
1131 "Marked messages {} in folder {} as seen.", uid_set, folder
1132 );
1133 }
1134 context
1135 .sql
1136 .transaction(|transaction| {
1137 let mut stmt = transaction.prepare("DELETE FROM imap_markseen WHERE id = ?")?;
1138 for rowid in rowid_set {
1139 stmt.execute((rowid,))?;
1140 }
1141 Ok(())
1142 })
1143 .await
1144 .context("Cannot remove messages marked as seen from imap_markseen table")?;
1145 }
1146
1147 Ok(())
1148 }
1149
1150 pub(crate) async fn sync_seen_flags(&mut self, context: &Context, folder: &str) -> Result<()> {
1152 if !self.can_condstore() {
1153 info!(
1154 context,
1155 "Server does not support CONDSTORE, skipping flag synchronization."
1156 );
1157 return Ok(());
1158 }
1159
1160 if context.get_config_bool(Config::TeamProfile).await? {
1161 return Ok(());
1162 }
1163
1164 let folder_exists = self
1165 .select_with_uidvalidity(context, folder)
1166 .await
1167 .context("Failed to select folder")?;
1168 if !folder_exists {
1169 return Ok(());
1170 }
1171
1172 let mailbox = self
1173 .selected_mailbox
1174 .as_ref()
1175 .with_context(|| format!("No mailbox selected, folder: {folder}"))?;
1176
1177 if mailbox.highest_modseq.is_none() {
1180 info!(
1181 context,
1182 "Mailbox {} does not support mod-sequences, skipping flag synchronization.", folder
1183 );
1184 return Ok(());
1185 }
1186
1187 let transport_id = self.transport_id();
1188 let mut updated_chat_ids = BTreeSet::new();
1189 let uid_validity = get_uidvalidity(context, transport_id, folder)
1190 .await
1191 .with_context(|| format!("failed to get UID validity for folder {folder}"))?;
1192 let mut highest_modseq = get_modseq(context, transport_id, folder)
1193 .await
1194 .with_context(|| format!("failed to get MODSEQ for folder {folder}"))?;
1195 let mut list = self
1196 .uid_fetch("1:*", format!("(FLAGS) (CHANGEDSINCE {highest_modseq})"))
1197 .await
1198 .context("failed to fetch flags")?;
1199
1200 let mut got_unsolicited_fetch = false;
1201
1202 while let Some(fetch) = list
1203 .try_next()
1204 .await
1205 .context("failed to get FETCH result")?
1206 {
1207 let uid = if let Some(uid) = fetch.uid {
1208 uid
1209 } else {
1210 info!(context, "FETCH result contains no UID, skipping");
1211 got_unsolicited_fetch = true;
1212 continue;
1213 };
1214 let is_seen = fetch.flags().any(|flag| flag == Flag::Seen);
1215 if is_seen
1216 && let Some(chat_id) = mark_seen_by_uid(context, transport_id, folder, uid_validity, uid)
1217 .await
1218 .with_context(|| {
1219 format!("Transport {transport_id}: Failed to update seen status for msg {folder}/{uid}")
1220 })?
1221 {
1222 updated_chat_ids.insert(chat_id);
1223 }
1224
1225 if let Some(modseq) = fetch.modseq {
1226 if modseq > highest_modseq {
1227 highest_modseq = modseq;
1228 }
1229 } else {
1230 warn!(context, "FETCH result contains no MODSEQ");
1231 }
1232 }
1233 drop(list);
1234
1235 if got_unsolicited_fetch {
1236 info!(context, "Got unsolicited fetch, will skip idle");
1241 self.new_mail = true;
1242 }
1243
1244 set_modseq(context, transport_id, folder, highest_modseq)
1245 .await
1246 .with_context(|| format!("failed to set MODSEQ for folder {folder}"))?;
1247 if !updated_chat_ids.is_empty() {
1248 context.on_archived_chats_maybe_noticed();
1249 }
1250 for updated_chat_id in updated_chat_ids {
1251 context.emit_event(EventType::MsgsNoticed(updated_chat_id));
1252 chatlist_events::emit_chatlist_item_changed(context, updated_chat_id);
1253 }
1254
1255 Ok(())
1256 }
1257
1258 #[expect(clippy::arithmetic_side_effects)]
1273 pub(crate) async fn fetch_many_msgs(
1274 &mut self,
1275 context: &Context,
1276 folder: &str,
1277 request_uids: Vec<u32>,
1278 uid_message_ids: &BTreeMap<u32, String>,
1279 received_msgs_channel: Sender<(u32, Option<ReceivedMsg>)>,
1280 ) -> Result<()> {
1281 if request_uids.is_empty() {
1282 return Ok(());
1283 }
1284
1285 for (request_uids, set) in build_sequence_sets(&request_uids)? {
1286 info!(context, "Starting UID FETCH of message set \"{}\".", set);
1287 let mut fetch_responses = self
1288 .uid_fetch(&set, BODY_FULL)
1289 .await
1290 .with_context(|| format!("fetching messages {set} from folder {folder:?}"))?;
1291
1292 let mut uid_msgs = HashMap::with_capacity(request_uids.len());
1295
1296 let mut count = 0;
1297 for &request_uid in &request_uids {
1298 let mut fetch_response = uid_msgs.remove(&request_uid);
1300
1301 while fetch_response.is_none() {
1303 let Some(next_fetch_response) = fetch_responses
1304 .try_next()
1305 .await
1306 .context("Failed to process IMAP FETCH result")?
1307 else {
1308 break;
1310 };
1311
1312 if let Some(next_uid) = next_fetch_response.uid {
1313 if next_uid == request_uid {
1314 fetch_response = Some(next_fetch_response);
1315 } else if !request_uids.contains(&next_uid) {
1316 info!(
1323 context,
1324 "Skipping not requested FETCH response for UID {}.", next_uid
1325 );
1326 } else if uid_msgs.insert(next_uid, next_fetch_response).is_some() {
1327 warn!(context, "Got duplicated UID {}.", next_uid);
1328 }
1329 } else {
1330 info!(context, "Skipping FETCH response without UID.");
1331 }
1332 }
1333
1334 let fetch_response = match fetch_response {
1335 Some(fetch) => fetch,
1336 None => {
1337 warn!(
1338 context,
1339 "Missed UID {} in the server response.", request_uid
1340 );
1341 continue;
1342 }
1343 };
1344 count += 1;
1345
1346 let is_deleted = fetch_response.flags().any(|flag| flag == Flag::Deleted);
1347 let body = fetch_response.body();
1348
1349 if is_deleted {
1350 info!(context, "Not processing deleted msg {}.", request_uid);
1351 received_msgs_channel.send((request_uid, None)).await?;
1352 continue;
1353 }
1354
1355 let body = if let Some(body) = body {
1356 body
1357 } else {
1358 info!(
1359 context,
1360 "Not processing message {} without a BODY.", request_uid
1361 );
1362 received_msgs_channel.send((request_uid, None)).await?;
1363 continue;
1364 };
1365
1366 let is_seen = fetch_response.flags().any(|flag| flag == Flag::Seen);
1367
1368 let Some(rfc724_mid) = uid_message_ids.get(&request_uid) else {
1369 error!(
1370 context,
1371 "No Message-ID corresponding to UID {} passed in uid_messsage_ids.",
1372 request_uid
1373 );
1374 continue;
1375 };
1376
1377 info!(
1378 context,
1379 "Passing message UID {} to receive_imf().", request_uid
1380 );
1381 let res = receive_imf_inner(context, rfc724_mid, body, is_seen).await;
1382 let received_msg = match res {
1383 Err(err) => {
1384 warn!(context, "receive_imf error: {err:#}.");
1385
1386 let text = format!(
1387 "❌ Failed to receive a message: {err:#}. Core version v{DC_VERSION_STR}. Please report this bug to delta@merlinux.eu or https://support.delta.chat/.",
1388 );
1389 let mut msg = Message::new_text(text);
1390 add_device_msg(context, None, Some(&mut msg)).await?;
1391 None
1392 }
1393 Ok(msg) => msg,
1394 };
1395 received_msgs_channel
1396 .send((request_uid, received_msg))
1397 .await?;
1398 }
1399
1400 while fetch_responses
1407 .try_next()
1408 .await
1409 .context("Failed to drain FETCH responses")?
1410 .is_some()
1411 {}
1412
1413 if count != request_uids.len() {
1414 warn!(
1415 context,
1416 "Failed to fetch all UIDs: got {}, requested {}, we requested the UIDs {:?}.",
1417 count,
1418 request_uids.len(),
1419 request_uids,
1420 );
1421 } else {
1422 info!(
1423 context,
1424 "Successfully received {} UIDs.",
1425 request_uids.len()
1426 );
1427 }
1428 }
1429
1430 Ok(())
1431 }
1432
1433 #[expect(clippy::arithmetic_side_effects)]
1439 pub(crate) async fn update_metadata(&mut self, context: &Context) -> Result<()> {
1440 let mut lock = context.metadata.write().await;
1441
1442 if !self.can_metadata() {
1443 *lock = Some(Default::default());
1444 }
1445 if let Some(ref mut old_metadata) = *lock {
1446 let now = time();
1447
1448 if now + 3600 * 12 < old_metadata.ice_servers_expiration_timestamp {
1450 return Ok(());
1451 }
1452
1453 let mut got_turn_server = false;
1454 if self.can_metadata() {
1455 info!(context, "ICE servers expired, requesting new credentials.");
1456 let mailbox = "";
1457 let options = "";
1458 let metadata = self
1459 .get_metadata(mailbox, options, "(/shared/vendor/deltachat/turn)")
1460 .await?;
1461 for m in metadata {
1462 if m.entry == "/shared/vendor/deltachat/turn"
1463 && let Some(value) = m.value
1464 {
1465 match create_ice_servers_from_metadata(&value).await {
1466 Ok((parsed_timestamp, parsed_ice_servers)) => {
1467 old_metadata.ice_servers_expiration_timestamp = parsed_timestamp;
1468 old_metadata.ice_servers = parsed_ice_servers;
1469 got_turn_server = true;
1470 }
1471 Err(err) => {
1472 warn!(context, "Failed to parse TURN server metadata: {err:#}.");
1473 }
1474 }
1475 }
1476 }
1477 }
1478 if !got_turn_server {
1479 info!(context, "Will use fallback ICE servers.");
1480 old_metadata.ice_servers_expiration_timestamp = time() + 3600 * 24 * 7;
1482 old_metadata.ice_servers = create_fallback_ice_servers();
1483 }
1484 return Ok(());
1485 }
1486
1487 info!(
1488 context,
1489 "Server supports metadata, retrieving server comment and admin contact."
1490 );
1491
1492 let mut comment = None;
1493 let mut admin = None;
1494 let mut iroh_relay = None;
1495 let mut ice_servers = None;
1496 let mut ice_servers_expiration_timestamp = 0;
1497
1498 let mailbox = "";
1499 let options = "";
1500 let metadata = self
1501 .get_metadata(
1502 mailbox,
1503 options,
1504 "(/shared/comment /shared/admin /shared/vendor/deltachat/irohrelay /shared/vendor/deltachat/turn)",
1505 )
1506 .await?;
1507 for m in metadata {
1508 match m.entry.as_ref() {
1509 "/shared/comment" => {
1510 comment = m.value;
1511 }
1512 "/shared/admin" => {
1513 admin = m.value;
1514 }
1515 "/shared/vendor/deltachat/irohrelay" => {
1516 if let Some(value) = m.value {
1517 if let Ok(url) = Url::parse(&value) {
1518 iroh_relay = Some(url);
1519 } else {
1520 warn!(
1521 context,
1522 "Got invalid URL from iroh relay metadata: {:?}.", value
1523 );
1524 }
1525 }
1526 }
1527 "/shared/vendor/deltachat/turn" => {
1528 if let Some(value) = m.value {
1529 match create_ice_servers_from_metadata(&value).await {
1530 Ok((parsed_timestamp, parsed_ice_servers)) => {
1531 ice_servers_expiration_timestamp = parsed_timestamp;
1532 ice_servers = Some(parsed_ice_servers);
1533 }
1534 Err(err) => {
1535 warn!(context, "Failed to parse TURN server metadata: {err:#}.");
1536 }
1537 }
1538 }
1539 }
1540 _ => {}
1541 }
1542 }
1543 let ice_servers = if let Some(ice_servers) = ice_servers {
1544 ice_servers
1545 } else {
1546 ice_servers_expiration_timestamp = time() + 3600 * 24 * 7;
1548 create_fallback_ice_servers()
1549 };
1550
1551 *lock = Some(ServerMetadata {
1552 comment,
1553 admin,
1554 iroh_relay,
1555 ice_servers,
1556 ice_servers_expiration_timestamp,
1557 });
1558 Ok(())
1559 }
1560
1561 pub(crate) async fn register_token(&mut self, context: &Context) -> Result<()> {
1563 if context.push_subscribed.load(Ordering::Relaxed) {
1564 return Ok(());
1565 }
1566
1567 let transport_id = self.transport_id();
1568
1569 let Some(device_token) = context.push_subscriber.device_token().await else {
1570 return Ok(());
1571 };
1572
1573 if self.can_metadata() && self.can_push() {
1574 info!(
1575 context,
1576 "Transport {transport_id}: Subscribing for push notifications."
1577 );
1578
1579 let old_encrypted_device_token =
1580 context.get_config(Config::EncryptedDeviceToken).await?;
1581
1582 let device_token_changed = old_encrypted_device_token.is_none()
1584 || context.get_config(Config::DeviceToken).await?.as_ref() != Some(&device_token);
1585
1586 let new_encrypted_device_token;
1587 if device_token_changed {
1588 let encrypted_device_token = encrypt_device_token(&device_token)
1589 .context("Failed to encrypt device token")?;
1590
1591 let encrypted_device_token_len = encrypted_device_token.len();
1595
1596 context
1602 .set_config_internal(Config::DeviceToken, Some(&device_token))
1603 .await?;
1604 context
1605 .set_config_internal(
1606 Config::EncryptedDeviceToken,
1607 Some(&encrypted_device_token),
1608 )
1609 .await?;
1610
1611 if encrypted_device_token_len <= 4096 {
1612 new_encrypted_device_token = Some(encrypted_device_token);
1613 } else {
1614 warn!(context, "Device token is too long for LITERAL-, ignoring.");
1624 new_encrypted_device_token = None;
1625 }
1626 } else {
1627 new_encrypted_device_token = old_encrypted_device_token;
1628 }
1629
1630 if let Some(encrypted_device_token) = new_encrypted_device_token {
1633 self.run_command_and_check_ok(&format_setmetadata(
1634 "INBOX",
1635 &encrypted_device_token,
1636 ))
1637 .await
1638 .context("SETMETADATA command failed")?;
1639
1640 context.push_subscribed.store(true, Ordering::Relaxed);
1641 }
1642 } else if !context.push_subscriber.heartbeat_subscribed().await {
1643 let context = context.clone();
1644 tokio::spawn(async move { context.push_subscriber.subscribe(&context).await });
1646 }
1647
1648 Ok(())
1649 }
1650}
1651
1652fn format_setmetadata(folder: &str, device_token: &str) -> String {
1653 let device_token_len = device_token.len();
1654 format!(
1655 "SETMETADATA \"{folder}\" (/private/devicetoken {{{device_token_len}+}}\r\n{device_token})"
1656 )
1657}
1658
1659impl Session {
1660 async fn add_flag_finalized_with_set(&mut self, uid_set: &str, flag: &str) -> Result<()> {
1666 if flag == "\\Deleted" {
1667 self.selected_folder_needs_expunge = true;
1668 }
1669 let query = format!("+FLAGS ({flag})");
1670 let mut responses = self
1671 .uid_store(uid_set, &query)
1672 .await
1673 .with_context(|| format!("IMAP failed to store: ({uid_set}, {query})"))?;
1674 while let Some(_response) = responses.try_next().await? {
1675 }
1677 Ok(())
1678 }
1679}
1680
1681impl Session {
1682 fn drain_unsolicited_responses(&self, context: &Context) -> Result<bool> {
1691 use UnsolicitedResponse::*;
1692 use async_imap::imap_proto::Response;
1693 use async_imap::imap_proto::ResponseCode;
1694
1695 let folder = self.selected_folder.as_deref().unwrap_or_default();
1696 let mut should_refetch = false;
1697 while let Ok(response) = self.unsolicited_responses.try_recv() {
1698 match response {
1699 Exists(_) => {
1700 info!(
1701 context,
1702 "Need to refetch {folder:?}, got unsolicited EXISTS {response:?}"
1703 );
1704 should_refetch = true;
1705 }
1706
1707 Expunge(_) | Recent(_) => {}
1708 Other(ref response_data) => {
1709 match response_data.parsed() {
1710 Response::Fetch { .. } => {
1711 info!(
1712 context,
1713 "Need to refetch {folder:?}, got unsolicited FETCH {response:?}"
1714 );
1715 should_refetch = true;
1716 }
1717
1718 Response::Done {
1721 code: Some(ResponseCode::CopyUid(_, _, _)),
1722 ..
1723 } => {}
1724
1725 _ => {
1726 info!(context, "{folder:?}: got unsolicited response {response:?}")
1727 }
1728 }
1729 }
1730 _ => {
1731 info!(context, "{folder:?}: got unsolicited response {response:?}")
1732 }
1733 }
1734 }
1735 Ok(should_refetch)
1736 }
1737}
1738
1739async fn should_move_out_of_spam(
1740 context: &Context,
1741 headers: &[mailparse::MailHeader<'_>],
1742) -> Result<bool> {
1743 if headers.get_header_value(HeaderDef::ChatVersion).is_some() {
1744 return Ok(true);
1755 }
1756
1757 if let Some(msg) = get_prefetch_parent_message(context, headers).await? {
1758 if msg.chat_blocked != Blocked::Not {
1759 return Ok(false);
1761 }
1762 } else {
1763 let from = match mimeparser::get_from(headers) {
1764 Some(f) => f,
1765 None => return Ok(false),
1766 };
1767 let (from_id, blocked_contact, _origin) =
1769 match from_field_to_contact_id(context, &from, None, true, true)
1770 .await
1771 .context("from_field_to_contact_id")?
1772 {
1773 Some(res) => res,
1774 None => {
1775 warn!(
1776 context,
1777 "Contact with From address {:?} cannot exist, not moving out of spam", from
1778 );
1779 return Ok(false);
1780 }
1781 };
1782 if blocked_contact {
1783 return Ok(false);
1785 }
1786
1787 if let Some(chat_id_blocked) = ChatIdBlocked::lookup_by_contact(context, from_id).await? {
1788 if chat_id_blocked.blocked != Blocked::Not {
1789 return Ok(false);
1790 }
1791 } else if from_id != ContactId::SELF {
1792 return Ok(false);
1794 }
1795 }
1796
1797 Ok(true)
1798}
1799
1800async fn spam_target_folder_cfg(
1805 context: &Context,
1806 headers: &[mailparse::MailHeader<'_>],
1807) -> Result<Option<Config>> {
1808 if !should_move_out_of_spam(context, headers).await? {
1809 return Ok(None);
1810 }
1811
1812 Ok(Some(Config::ConfiguredInboxFolder))
1813}
1814
1815pub async fn target_folder_cfg(
1818 context: &Context,
1819 folder: &str,
1820 folder_meaning: FolderMeaning,
1821 headers: &[mailparse::MailHeader<'_>],
1822) -> Result<Option<Config>> {
1823 if folder == "DeltaChat" {
1824 return Ok(None);
1825 }
1826
1827 if folder_meaning == FolderMeaning::Spam {
1828 spam_target_folder_cfg(context, headers).await
1829 } else {
1830 Ok(None)
1831 }
1832}
1833
1834pub async fn target_folder(
1835 context: &Context,
1836 folder: &str,
1837 folder_meaning: FolderMeaning,
1838 headers: &[mailparse::MailHeader<'_>],
1839) -> Result<String> {
1840 match target_folder_cfg(context, folder, folder_meaning, headers).await? {
1841 Some(config) => match context.get_config(config).await? {
1842 Some(target) => Ok(target),
1843 None => Ok(folder.to_string()),
1844 },
1845 None => Ok(folder.to_string()),
1846 }
1847}
1848
1849fn get_folder_meaning_by_name(folder_name: &str) -> FolderMeaning {
1856 const SPAM_NAMES: &[&str] = &[
1858 "spam",
1859 "junk",
1860 "Correio electrónico não solicitado",
1861 "Correo basura",
1862 "Lixo",
1863 "Nettsøppel",
1864 "Nevyžádaná pošta",
1865 "No solicitado",
1866 "Ongewenst",
1867 "Posta indesiderata",
1868 "Skräp",
1869 "Wiadomości-śmieci",
1870 "Önemsiz",
1871 "Ανεπιθύμητα",
1872 "Спам",
1873 "垃圾邮件",
1874 "垃圾郵件",
1875 "迷惑メール",
1876 "스팸",
1877 ];
1878 const TRASH_NAMES: &[&str] = &[
1879 "Trash",
1880 "Bin",
1881 "Caixote do lixo",
1882 "Cestino",
1883 "Corbeille",
1884 "Papelera",
1885 "Papierkorb",
1886 "Papirkurv",
1887 "Papperskorgen",
1888 "Prullenbak",
1889 "Rubujo",
1890 "Κάδος απορριμμάτων",
1891 "Корзина",
1892 "Кошик",
1893 "ゴミ箱",
1894 "垃圾桶",
1895 "已删除邮件",
1896 "휴지통",
1897 ];
1898 let lower = folder_name.to_lowercase();
1899
1900 if lower == "inbox" {
1901 FolderMeaning::Inbox
1902 } else if SPAM_NAMES.iter().any(|s| s.to_lowercase() == lower) {
1903 FolderMeaning::Spam
1904 } else if TRASH_NAMES.iter().any(|s| s.to_lowercase() == lower) {
1905 FolderMeaning::Trash
1906 } else {
1907 FolderMeaning::Unknown
1908 }
1909}
1910
1911fn get_folder_meaning_by_attrs(folder_attrs: &[NameAttribute]) -> FolderMeaning {
1912 for attr in folder_attrs {
1913 match attr {
1914 NameAttribute::Trash => return FolderMeaning::Trash,
1915 NameAttribute::Junk => return FolderMeaning::Spam,
1916 NameAttribute::All | NameAttribute::Flagged => return FolderMeaning::Virtual,
1917 NameAttribute::Extension(label) => {
1918 match label.as_ref() {
1919 "\\Spam" => return FolderMeaning::Spam,
1920 "\\Important" => return FolderMeaning::Virtual,
1921 _ => {}
1922 };
1923 }
1924 _ => {}
1925 }
1926 }
1927 FolderMeaning::Unknown
1928}
1929
1930pub(crate) fn get_folder_meaning(folder: &Name) -> FolderMeaning {
1931 match get_folder_meaning_by_attrs(folder.attributes()) {
1932 FolderMeaning::Unknown => get_folder_meaning_by_name(folder.name()),
1933 meaning => meaning,
1934 }
1935}
1936
1937fn get_fetch_headers(prefetch_msg: &Fetch) -> Result<Vec<mailparse::MailHeader<'_>>> {
1939 match prefetch_msg.header() {
1940 Some(header_bytes) => {
1941 let (headers, _) = mailparse::parse_headers(header_bytes)?;
1942 Ok(headers)
1943 }
1944 None => Ok(Vec::new()),
1945 }
1946}
1947
1948pub(crate) fn prefetch_get_message_id(headers: &[mailparse::MailHeader]) -> Option<String> {
1949 headers
1950 .get_header_value(HeaderDef::XMicrosoftOriginalMessageId)
1951 .or_else(|| headers.get_header_value(HeaderDef::MessageId))
1952 .and_then(|msgid| mimeparser::parse_message_id(&msgid).ok())
1953}
1954
1955pub(crate) fn create_message_id() -> String {
1956 format!("{}{}", GENERATED_PREFIX, create_id())
1957}
1958
1959pub(crate) async fn prefetch_should_download(
1961 context: &Context,
1962 headers: &[mailparse::MailHeader<'_>],
1963 message_id: &str,
1964 mut flags: impl Iterator<Item = Flag<'_>>,
1965) -> Result<bool> {
1966 if message::rfc724_mid_download_tried(context, message_id).await? {
1967 if let Some(from) = mimeparser::get_from(headers)
1968 && context.is_self_addr(&from.addr).await?
1969 {
1970 markseen_on_imap_table(context, message_id).await?;
1971 }
1972 return Ok(false);
1973 }
1974
1975 let maybe_ndn = if let Some(from) = headers.get_header_value(HeaderDef::From_) {
1979 let from = from.to_ascii_lowercase();
1980 from.contains("mailer-daemon") || from.contains("mail-daemon")
1981 } else {
1982 false
1983 };
1984
1985 let from = match mimeparser::get_from(headers) {
1986 Some(f) => f,
1987 None => return Ok(false),
1988 };
1989 let (_from_id, blocked_contact, _origin) =
1990 match from_field_to_contact_id(context, &from, None, true, true).await? {
1991 Some(res) => res,
1992 None => return Ok(false),
1993 };
1994 if flags.any(|f| f == Flag::Draft) {
1998 info!(context, "Ignoring draft message");
1999 return Ok(false);
2000 }
2001
2002 let should_download = !blocked_contact || maybe_ndn;
2003 Ok(should_download)
2004}
2005
2006async fn mark_seen_by_uid(
2010 context: &Context,
2011 transport_id: u32,
2012 folder: &str,
2013 uid_validity: u32,
2014 uid: u32,
2015) -> Result<Option<ChatId>> {
2016 if let Some((msg_id, chat_id)) = context
2017 .sql
2018 .query_row_optional(
2019 "SELECT id, chat_id FROM msgs
2020 WHERE id > 9 AND rfc724_mid IN (
2021 SELECT rfc724_mid FROM imap
2022 WHERE transport_id=?
2023 AND folder=?
2024 AND uidvalidity=?
2025 AND uid=?
2026 LIMIT 1
2027 )",
2028 (transport_id, &folder, uid_validity, uid),
2029 |row| {
2030 let msg_id: MsgId = row.get(0)?;
2031 let chat_id: ChatId = row.get(1)?;
2032 Ok((msg_id, chat_id))
2033 },
2034 )
2035 .await
2036 .with_context(|| format!("failed to get msg and chat ID for IMAP message {folder}/{uid}"))?
2037 {
2038 let updated = context
2039 .sql
2040 .execute(
2041 "UPDATE msgs SET state=?1
2042 WHERE (state=?2 OR state=?3)
2043 AND id=?4",
2044 (
2045 MessageState::InSeen,
2046 MessageState::InFresh,
2047 MessageState::InNoticed,
2048 msg_id,
2049 ),
2050 )
2051 .await
2052 .with_context(|| format!("failed to update msg {msg_id} state"))?
2053 > 0;
2054
2055 if updated {
2056 msg_id
2057 .start_ephemeral_timer(context)
2058 .await
2059 .with_context(|| format!("failed to start ephemeral timer for message {msg_id}"))?;
2060 Ok(Some(chat_id))
2061 } else {
2062 Ok(None)
2064 }
2065 } else {
2066 Ok(None)
2068 }
2069}
2070
2071pub(crate) async fn markseen_on_imap_table(context: &Context, message_id: &str) -> Result<()> {
2074 context
2075 .sql
2076 .execute(
2077 "INSERT OR IGNORE INTO imap_markseen (id)
2078 SELECT id FROM imap WHERE rfc724_mid=?",
2079 (message_id,),
2080 )
2081 .await?;
2082 context.scheduler.interrupt_inbox().await;
2083
2084 Ok(())
2085}
2086
2087pub(crate) async fn set_uid_next(
2091 context: &Context,
2092 transport_id: u32,
2093 folder: &str,
2094 uid_next: u32,
2095) -> Result<()> {
2096 context
2097 .sql
2098 .execute(
2099 "INSERT INTO imap_sync (transport_id, folder, uid_next) VALUES (?, ?,?)
2100 ON CONFLICT(transport_id, folder) DO UPDATE SET uid_next=excluded.uid_next",
2101 (transport_id, folder, uid_next),
2102 )
2103 .await?;
2104 Ok(())
2105}
2106
2107async fn get_uid_next(context: &Context, transport_id: u32, folder: &str) -> Result<u32> {
2113 Ok(context
2114 .sql
2115 .query_get_value(
2116 "SELECT uid_next FROM imap_sync WHERE transport_id=? AND folder=?",
2117 (transport_id, folder),
2118 )
2119 .await?
2120 .unwrap_or(0))
2121}
2122
2123pub(crate) async fn set_uidvalidity(
2124 context: &Context,
2125 transport_id: u32,
2126 folder: &str,
2127 uidvalidity: u32,
2128) -> Result<()> {
2129 context
2130 .sql
2131 .execute(
2132 "INSERT INTO imap_sync (transport_id, folder, uidvalidity) VALUES (?,?,?)
2133 ON CONFLICT(transport_id, folder) DO UPDATE SET uidvalidity=excluded.uidvalidity",
2134 (transport_id, folder, uidvalidity),
2135 )
2136 .await?;
2137 Ok(())
2138}
2139
2140async fn get_uidvalidity(context: &Context, transport_id: u32, folder: &str) -> Result<u32> {
2141 Ok(context
2142 .sql
2143 .query_get_value(
2144 "SELECT uidvalidity FROM imap_sync WHERE transport_id=? AND folder=?",
2145 (transport_id, folder),
2146 )
2147 .await?
2148 .unwrap_or(0))
2149}
2150
2151pub(crate) async fn set_modseq(
2152 context: &Context,
2153 transport_id: u32,
2154 folder: &str,
2155 modseq: u64,
2156) -> Result<()> {
2157 context
2158 .sql
2159 .execute(
2160 "INSERT INTO imap_sync (transport_id, folder, modseq) VALUES (?,?,?)
2161 ON CONFLICT(transport_id, folder) DO UPDATE SET modseq=excluded.modseq",
2162 (transport_id, folder, modseq),
2163 )
2164 .await?;
2165 Ok(())
2166}
2167
2168async fn get_modseq(context: &Context, transport_id: u32, folder: &str) -> Result<u64> {
2169 Ok(context
2170 .sql
2171 .query_get_value(
2172 "SELECT modseq FROM imap_sync WHERE transport_id=? AND folder=?",
2173 (transport_id, folder),
2174 )
2175 .await?
2176 .unwrap_or(0))
2177}
2178
2179#[expect(clippy::arithmetic_side_effects)]
2183fn build_sequence_sets(uids: &[u32]) -> Result<Vec<(Vec<u32>, String)>> {
2184 let mut ranges: Vec<UidRange> = vec![];
2186
2187 for ¤t in uids {
2188 if let Some(last) = ranges.last_mut()
2189 && last.end + 1 == current
2190 {
2191 last.end = current;
2192 continue;
2193 }
2194
2195 ranges.push(UidRange {
2196 start: current,
2197 end: current,
2198 });
2199 }
2200
2201 let mut result = vec![];
2203 let (mut last_uids, mut last_str) = (Vec::new(), String::new());
2204 for range in ranges {
2205 last_uids.reserve((range.end - range.start + 1).try_into()?);
2206 (range.start..=range.end).for_each(|u| last_uids.push(u));
2207 if !last_str.is_empty() {
2208 last_str.push(',');
2209 }
2210 last_str.push_str(&range.to_string());
2211
2212 if last_str.len() > 990 {
2213 result.push((take(&mut last_uids), take(&mut last_str)));
2214 }
2215 }
2216 result.push((last_uids, last_str));
2217
2218 result.retain(|(_, s)| !s.is_empty());
2219 Ok(result)
2220}
2221
2222struct UidRange {
2223 start: u32,
2224 end: u32,
2225 }
2227
2228impl std::fmt::Display for UidRange {
2229 fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
2230 if self.start == self.end {
2231 write!(f, "{}", self.start)
2232 } else {
2233 write!(f, "{}:{}", self.start, self.end)
2234 }
2235 }
2236}
2237
2238#[cfg(test)]
2239mod imap_tests;