1use std::{
7 cmp::max,
8 cmp::min,
9 collections::{BTreeMap, BTreeSet, HashMap},
10 iter::Peekable,
11 mem::take,
12 sync::atomic::Ordering,
13 time::{Duration, UNIX_EPOCH},
14};
15
16use anyhow::{Context as _, Result, bail, ensure, format_err};
17use async_channel::{self, Receiver, Sender};
18use async_imap::types::{Fetch, Flag, Name, NameAttribute, UnsolicitedResponse};
19use futures::{FutureExt as _, TryStreamExt};
20use futures_lite::FutureExt;
21use ratelimit::Ratelimit;
22use url::Url;
23
24use crate::calls::{
25 UnresolvedIceServer, create_fallback_ice_servers, create_ice_servers_from_metadata,
26};
27use crate::chat::{self, ChatId, ChatIdBlocked, add_device_msg};
28use crate::chatlist_events;
29use crate::config::Config;
30use crate::constants::{Blocked, DC_VERSION_STR};
31use crate::contact::ContactId;
32use crate::context::Context;
33use crate::ensure_and_debug_assert;
34use crate::events::EventType;
35use crate::headerdef::{HeaderDef, HeaderDefMap};
36use crate::log::{LogExt, warn};
37use crate::message::{self, Message, MessageState, MsgId};
38use crate::mimeparser;
39use crate::net::proxy::ProxyConfig;
40use crate::net::session::SessionStream;
41use crate::oauth2::get_oauth2_access_token;
42use crate::push::encrypt_device_token;
43use crate::receive_imf::{
44 ReceivedMsg, from_field_to_contact_id, get_prefetch_parent_message, receive_imf_inner,
45};
46use crate::scheduler::connectivity::ConnectivityStore;
47use crate::stock_str;
48use crate::tools::{self, create_id, duration_to_str, time};
49use crate::transport::{
50 ConfiguredLoginParam, ConfiguredServerLoginParam, prioritize_server_login_params,
51};
52
53pub(crate) mod capabilities;
54mod client;
55mod idle;
56pub mod select_folder;
57pub(crate) mod session;
58
59use client::{Client, determine_capabilities};
60use session::Session;
61
62pub(crate) const GENERATED_PREFIX: &str = "GEN_";
63
64const RFC724MID_UID: &str = "(UID BODY.PEEK[HEADER.FIELDS (\
65 MESSAGE-ID \
66 X-MICROSOFT-ORIGINAL-MESSAGE-ID\
67 )])";
68const BODY_FULL: &str = "(FLAGS BODY.PEEK[])";
69
70#[derive(Debug)]
71pub(crate) struct Imap {
72 transport_id: u32,
76
77 pub(crate) idle_interrupt_receiver: Receiver<()>,
78
79 pub(crate) addr: String,
81
82 lp: Vec<ConfiguredServerLoginParam>,
84
85 password: String,
87
88 proxy_config: Option<ProxyConfig>,
90
91 strict_tls: bool,
92
93 oauth2: bool,
94
95 pub(crate) folder: String,
97
98 authentication_failed_once: bool,
99
100 pub(crate) connectivity: ConnectivityStore,
101
102 conn_last_try: tools::Time,
103 conn_backoff_ms: u64,
104
105 ratelimit: Ratelimit,
113
114 pub(crate) resync_request_sender: async_channel::Sender<()>,
116
117 pub(crate) resync_request_receiver: async_channel::Receiver<()>,
119}
120
121#[derive(Debug)]
122struct OAuth2 {
123 user: String,
124 access_token: String,
125}
126
127#[derive(Debug, Default)]
128pub(crate) struct ServerMetadata {
129 pub comment: Option<String>,
132
133 pub admin: Option<String>,
136
137 pub iroh_relay: Option<Url>,
138
139 pub ice_servers: Vec<UnresolvedIceServer>,
141
142 pub ice_servers_expiration_timestamp: i64,
149}
150
151impl async_imap::Authenticator for OAuth2 {
152 type Response = String;
153
154 fn process(&mut self, _data: &[u8]) -> Self::Response {
155 format!(
156 "user={}\x01auth=Bearer {}\x01\x01",
157 self.user, self.access_token
158 )
159 }
160}
161
162#[derive(Debug, Display, PartialEq, Eq, Clone, Copy)]
163pub enum FolderMeaning {
164 Unknown,
165
166 Spam,
168 Inbox,
169 Trash,
170
171 Virtual,
178}
179
180struct UidGrouper<T: Iterator<Item = (i64, u32, String)>> {
181 inner: Peekable<T>,
182}
183
184impl<T, I> From<I> for UidGrouper<T>
185where
186 T: Iterator<Item = (i64, u32, String)>,
187 I: IntoIterator<IntoIter = T>,
188{
189 fn from(inner: I) -> Self {
190 Self {
191 inner: inner.into_iter().peekable(),
192 }
193 }
194}
195
196impl<T: Iterator<Item = (i64, u32, String)>> Iterator for UidGrouper<T> {
197 type Item = (String, Vec<i64>, String);
199
200 #[expect(clippy::arithmetic_side_effects)]
201 fn next(&mut self) -> Option<Self::Item> {
202 let (_, _, folder) = self.inner.peek().cloned()?;
203
204 let mut uid_set = String::new();
205 let mut rowid_set = Vec::new();
206
207 while uid_set.len() < 1000 {
208 if let Some((start_rowid, start_uid, _)) = self
210 .inner
211 .next_if(|(_, _, start_folder)| start_folder == &folder)
212 {
213 rowid_set.push(start_rowid);
214 let mut end_uid = start_uid;
215
216 while let Some((next_rowid, next_uid, _)) =
217 self.inner.next_if(|(_, next_uid, next_folder)| {
218 next_folder == &folder && (*next_uid == end_uid + 1 || *next_uid == end_uid)
219 })
220 {
221 end_uid = next_uid;
222 rowid_set.push(next_rowid);
223 }
224
225 let uid_range = UidRange {
226 start: start_uid,
227 end: end_uid,
228 };
229 if !uid_set.is_empty() {
230 uid_set.push(',');
231 }
232 uid_set.push_str(&uid_range.to_string());
233 } else {
234 break;
235 }
236 }
237
238 Some((folder, rowid_set, uid_set))
239 }
240}
241
242impl Imap {
243 pub async fn new(
245 context: &Context,
246 transport_id: u32,
247 param: ConfiguredLoginParam,
248 idle_interrupt_receiver: Receiver<()>,
249 ) -> Result<Self> {
250 let lp = param.imap.clone();
251 let password = param.imap_password.clone();
252 let proxy_config = ProxyConfig::load(context).await?;
253 let addr = ¶m.addr;
254 let strict_tls = param.strict_tls(proxy_config.is_some());
255 let oauth2 = param.oauth2;
256 let folder = param
257 .imap_folder
258 .clone()
259 .unwrap_or_else(|| "INBOX".to_string());
260 ensure_and_debug_assert!(!folder.is_empty(), "Watched folder name cannot be empty");
261 let (resync_request_sender, resync_request_receiver) = async_channel::bounded(1);
262 Ok(Imap {
263 transport_id,
264 idle_interrupt_receiver,
265 addr: addr.to_string(),
266 lp,
267 password,
268 proxy_config,
269 strict_tls,
270 oauth2,
271 folder,
272 authentication_failed_once: false,
273 connectivity: Default::default(),
274 conn_last_try: UNIX_EPOCH,
275 conn_backoff_ms: 0,
276 ratelimit: Ratelimit::new(Duration::new(120, 0), 2.0),
278 resync_request_sender,
279 resync_request_receiver,
280 })
281 }
282
283 pub async fn new_configured(
285 context: &Context,
286 idle_interrupt_receiver: Receiver<()>,
287 ) -> Result<Self> {
288 let (transport_id, param) = ConfiguredLoginParam::load(context)
289 .await?
290 .context("Not configured")?;
291 let imap = Self::new(context, transport_id, param, idle_interrupt_receiver).await?;
292 Ok(imap)
293 }
294
295 pub fn transport_id(&self) -> u32 {
297 self.transport_id
298 }
299
300 pub(crate) async fn connect(
306 &mut self,
307 context: &Context,
308 configuring: bool,
309 ) -> Result<Session> {
310 let now = tools::Time::now();
311 let until_can_send = max(
312 min(self.conn_last_try, now)
313 .checked_add(Duration::from_millis(self.conn_backoff_ms))
314 .unwrap_or(now),
315 now,
316 )
317 .duration_since(now)?;
318 let ratelimit_duration = max(until_can_send, self.ratelimit.until_can_send());
319 if !ratelimit_duration.is_zero() {
320 warn!(
321 context,
322 "IMAP got rate limited, waiting for {} until can connect.",
323 duration_to_str(ratelimit_duration),
324 );
325 let interrupted = async {
326 tokio::time::sleep(ratelimit_duration).await;
327 false
328 }
329 .race(self.idle_interrupt_receiver.recv().map(|_| true))
330 .await;
331 if interrupted {
332 info!(
333 context,
334 "Connecting to IMAP without waiting for ratelimit due to interrupt."
335 );
336 }
337 }
338
339 info!(context, "Connecting to IMAP server.");
340 self.connectivity.set_connecting(context);
341
342 self.conn_last_try = tools::Time::now();
343 const BACKOFF_MIN_MS: u64 = 2000;
344 const BACKOFF_MAX_MS: u64 = 80_000;
345 self.conn_backoff_ms = min(self.conn_backoff_ms, BACKOFF_MAX_MS / 2);
346 self.conn_backoff_ms = self.conn_backoff_ms.saturating_add(rand::random_range(
347 (self.conn_backoff_ms / 2)..=self.conn_backoff_ms,
348 ));
349 self.conn_backoff_ms = max(BACKOFF_MIN_MS, self.conn_backoff_ms);
350
351 let login_params = prioritize_server_login_params(&context.sql, &self.lp, "imap").await?;
352 let mut first_error = None;
353 for lp in login_params {
354 info!(context, "IMAP trying to connect to {}.", &lp.connection);
355 let connection_candidate = lp.connection.clone();
356 let client = match Client::connect(
357 context,
358 self.proxy_config.clone(),
359 self.strict_tls,
360 &connection_candidate,
361 )
362 .await
363 .with_context(|| format!("IMAP failed to connect to {connection_candidate}"))
364 {
365 Ok(client) => client,
366 Err(err) => {
367 warn!(context, "{err:#}.");
368 first_error.get_or_insert(err);
369 continue;
370 }
371 };
372
373 self.conn_backoff_ms = BACKOFF_MIN_MS;
374 self.ratelimit.send();
375
376 let imap_user: &str = lp.user.as_ref();
377 let imap_pw: &str = &self.password;
378
379 let login_res = if self.oauth2 {
380 info!(context, "Logging into IMAP server with OAuth 2.");
381 let addr: &str = self.addr.as_ref();
382
383 let token = get_oauth2_access_token(context, addr, imap_pw, true)
384 .await?
385 .context("IMAP could not get OAUTH token")?;
386 let auth = OAuth2 {
387 user: imap_user.into(),
388 access_token: token,
389 };
390 client.authenticate("XOAUTH2", auth).await
391 } else {
392 info!(context, "Logging into IMAP server with LOGIN.");
393 client.login(imap_user, imap_pw).await
394 };
395
396 match login_res {
397 Ok(mut session) => {
398 let capabilities = determine_capabilities(&mut session).await?;
399 let resync_request_sender = self.resync_request_sender.clone();
400
401 let session = if capabilities.can_compress {
402 info!(context, "Enabling IMAP compression.");
403 let compressed_session = session
404 .compress(|s| {
405 let session_stream: Box<dyn SessionStream> = Box::new(s);
406 session_stream
407 })
408 .await
409 .context("Failed to enable IMAP compression")?;
410 Session::new(
411 compressed_session,
412 capabilities,
413 resync_request_sender,
414 self.transport_id,
415 )
416 } else {
417 Session::new(
418 session,
419 capabilities,
420 resync_request_sender,
421 self.transport_id,
422 )
423 };
424
425 let mut lock = context.server_id.write().await;
427 lock.clone_from(&session.capabilities.server_id);
428
429 self.authentication_failed_once = false;
430 context.emit_event(EventType::ImapConnected(format!(
431 "IMAP-LOGIN as {}",
432 lp.user
433 )));
434 self.connectivity.set_preparing(context);
435 info!(context, "Successfully logged into IMAP server.");
436 return Ok(session);
437 }
438
439 Err(err) => {
440 let imap_user = lp.user.to_owned();
441 let message = stock_str::cannot_login(context, &imap_user);
442
443 warn!(context, "IMAP failed to login: {err:#}.");
444 first_error.get_or_insert(format_err!("{message} ({err:#})"));
445
446 let _lock = context.wrong_pw_warning_mutex.lock().await;
448 if err.to_string().to_lowercase().contains("authentication") {
449 if self.authentication_failed_once
450 && !configuring
451 && context.get_config_bool(Config::NotifyAboutWrongPw).await?
452 {
453 let mut msg = Message::new_text(message);
454 if let Err(e) = chat::add_device_msg_with_importance(
455 context,
456 None,
457 Some(&mut msg),
458 true,
459 )
460 .await
461 {
462 warn!(context, "Failed to add device message: {e:#}.");
463 } else {
464 context
465 .set_config_internal(Config::NotifyAboutWrongPw, None)
466 .await
467 .log_err(context)
468 .ok();
469 }
470 } else {
471 self.authentication_failed_once = true;
472 }
473 } else {
474 self.authentication_failed_once = false;
475 }
476 }
477 }
478 }
479
480 Err(first_error.unwrap_or_else(|| format_err!("No IMAP connection candidates provided")))
481 }
482
483 pub(crate) async fn prepare(&mut self, context: &Context) -> Result<Session> {
488 let configuring = false;
489 let session = match self.connect(context, configuring).await {
490 Ok(session) => session,
491 Err(err) => {
492 self.connectivity.set_err(context, &err);
493 return Err(err);
494 }
495 };
496
497 Ok(session)
498 }
499
500 pub async fn fetch_move_delete(
505 &mut self,
506 context: &Context,
507 session: &mut Session,
508 watch_folder: &str,
509 ) -> Result<()> {
510 ensure_and_debug_assert!(!watch_folder.is_empty(), "Watched folder cannot be empty");
511 if !context.sql.is_open().await {
512 bail!("IMAP operation attempted while it is torn down");
514 }
515
516 let msgs_fetched = self
517 .fetch_new_messages(context, session, watch_folder)
518 .await
519 .context("fetch_new_messages")?;
520 if msgs_fetched && context.get_config_delete_device_after().await?.is_some() {
521 context.scheduler.interrupt_ephemeral_task().await;
526 }
527
528 session
529 .move_delete_messages(context, watch_folder)
530 .await
531 .context("move_delete_messages")?;
532
533 Ok(())
534 }
535
536 #[expect(clippy::arithmetic_side_effects)]
540 pub(crate) async fn fetch_new_messages(
541 &mut self,
542 context: &Context,
543 session: &mut Session,
544 folder: &str,
545 ) -> Result<bool> {
546 let transport_id = session.transport_id();
547
548 let folder_exists = session
549 .select_with_uidvalidity(context, folder)
550 .await
551 .with_context(|| format!("Failed to select folder {folder:?}"))?;
552
553 if !session.new_mail {
554 info!(
555 context,
556 "Transport {transport_id}: No new emails in folder {folder:?}."
557 );
558 return Ok(false);
559 }
560 session.new_mail = false;
563
564 if !folder_exists {
565 return Ok(false);
566 }
567
568 let mut read_cnt = 0;
569 loop {
570 let (n, fetch_more) =
571 Box::pin(self.fetch_new_msg_batch(context, session, folder)).await?;
572 read_cnt += n;
573 if !fetch_more {
574 return Ok(read_cnt > 0);
575 }
576 }
577 }
578
579 #[expect(clippy::arithmetic_side_effects)]
581 async fn fetch_new_msg_batch(
582 &mut self,
583 context: &Context,
584 session: &mut Session,
585 folder: &str,
586 ) -> Result<(usize, bool)> {
587 let transport_id = self.transport_id;
588 let uid_validity = get_uidvalidity(context, transport_id, folder).await?;
589 let old_uid_next = get_uid_next(context, transport_id, folder).await?;
590 info!(
591 context,
592 "fetch_new_msg_batch({folder}): UIDVALIDITY={uid_validity}, UIDNEXT={old_uid_next}."
593 );
594
595 let uids_to_prefetch = 500;
596 let msgs = session
597 .prefetch(old_uid_next, uids_to_prefetch)
598 .await
599 .context("prefetch")?;
600 let read_cnt = msgs.len();
601 let _fetch_msgs_lock_guard = context.fetch_msgs_mutex.lock().await;
602
603 let mut uids_fetch: Vec<u32> = Vec::new();
604 let mut available_post_msgs: Vec<String> = Vec::new();
605 let mut download_later: Vec<String> = Vec::new();
606 let mut uid_message_ids = BTreeMap::new();
607 let mut largest_uid_skipped = None;
608
609 let download_limit: Option<u32> = context
610 .get_config_parsed(Config::DownloadLimit)
611 .await?
612 .filter(|&l| 0 < l);
613
614 for (uid, ref fetch_response) in msgs {
616 let headers = match get_fetch_headers(fetch_response) {
617 Ok(headers) => headers,
618 Err(err) => {
619 warn!(context, "Failed to parse FETCH headers: {err:#}.");
620 continue;
621 }
622 };
623
624 let message_id = prefetch_get_message_id(&headers);
625 let size = fetch_response
626 .size
627 .context("imap fetch response does not contain size")?;
628
629 let delete = if let Some(message_id) = &message_id {
640 message::rfc724_mid_exists_ex(context, message_id, "deleted=1")
641 .await?
642 .is_some_and(|(_msg_id, deleted)| deleted)
643 } else {
644 false
645 };
646
647 let message_id = message_id.unwrap_or_else(create_message_id);
650
651 if delete {
652 info!(context, "Deleting locally deleted message {message_id}.");
653 }
654
655 let target = if delete { "" } else { folder };
656
657 context
658 .sql
659 .execute(
660 "INSERT INTO imap (transport_id, rfc724_mid, folder, uid, uidvalidity, target)
661 VALUES (?, ?, ?, ?, ?, ?)
662 ON CONFLICT(transport_id, folder, uid, uidvalidity)
663 DO UPDATE SET rfc724_mid=excluded.rfc724_mid,
664 target=excluded.target",
665 (
666 self.transport_id,
667 &message_id,
668 &folder,
669 uid,
670 uid_validity,
671 target,
672 ),
673 )
674 .await?;
675
676 if folder == target
683 && prefetch_should_download(context, &headers, &message_id, fetch_response.flags())
684 .await
685 .context("prefetch_should_download")?
686 {
687 if headers
688 .get_header_value(HeaderDef::ChatIsPostMessage)
689 .is_some()
690 {
691 info!(context, "{message_id:?} is a post-message.");
692 available_post_msgs.push(message_id.clone());
693
694 let is_bot = context.get_config_bool(Config::Bot).await?;
695 if is_bot && download_limit.is_none_or(|download_limit| size <= download_limit)
696 {
697 uids_fetch.push(uid);
698 uid_message_ids.insert(uid, message_id);
699 } else {
700 if download_limit.is_none_or(|download_limit| size <= download_limit) {
701 download_later.push(message_id.clone());
704 }
705 largest_uid_skipped = Some(uid);
706 }
707 } else {
708 info!(context, "{message_id:?} is not a post-message.");
709 if download_limit.is_none_or(|download_limit| size <= download_limit) {
710 uids_fetch.push(uid);
711 uid_message_ids.insert(uid, message_id);
712 } else {
713 download_later.push(message_id.clone());
714 largest_uid_skipped = Some(uid);
715 }
716 };
717 } else {
718 largest_uid_skipped = Some(uid);
719 }
720 }
721
722 if !uids_fetch.is_empty() {
723 self.connectivity.set_working(context);
724 }
725
726 let (sender, receiver) = async_channel::unbounded();
727
728 let mut received_msgs = Vec::with_capacity(uids_fetch.len());
729 let mailbox_uid_next = session
730 .selected_mailbox
731 .as_ref()
732 .with_context(|| format!("Expected {folder:?} to be selected"))?
733 .uid_next
734 .unwrap_or_default();
735
736 let update_uids_future = async {
737 let mut largest_uid_fetched: u32 = 0;
738
739 while let Ok((uid, received_msg_opt)) = receiver.recv().await {
740 largest_uid_fetched = max(largest_uid_fetched, uid);
741 if let Some(received_msg) = received_msg_opt {
742 received_msgs.push(received_msg)
743 }
744 }
745
746 largest_uid_fetched
747 };
748
749 let actually_download_messages_future = async {
750 session
751 .fetch_many_msgs(context, folder, uids_fetch, &uid_message_ids, sender)
752 .await
753 .context("fetch_many_msgs")
754 };
755
756 let (largest_uid_fetched, fetch_res) =
757 tokio::join!(update_uids_future, actually_download_messages_future);
758
759 let mut new_uid_next = largest_uid_fetched + 1;
765 let fetch_more = fetch_res.is_ok() && {
766 let prefetch_uid_next = old_uid_next + uids_to_prefetch;
767 new_uid_next = max(new_uid_next, min(prefetch_uid_next, mailbox_uid_next));
771
772 new_uid_next = max(new_uid_next, largest_uid_skipped.unwrap_or(0) + 1);
773
774 prefetch_uid_next < mailbox_uid_next
775 };
776 if new_uid_next > old_uid_next {
777 set_uid_next(context, self.transport_id, folder, new_uid_next).await?;
778 }
779
780 info!(context, "{} mails read from \"{}\".", read_cnt, folder);
781
782 if !received_msgs.is_empty() {
783 context.emit_event(EventType::IncomingMsgBunch);
784 }
785
786 chat::mark_old_messages_as_noticed(context, received_msgs).await?;
787
788 if fetch_res.is_ok() {
789 info!(
790 context,
791 "available_post_msgs: {}, download_later: {}.",
792 available_post_msgs.len(),
793 download_later.len(),
794 );
795 let trans_fn = |t: &mut rusqlite::Transaction| {
796 let mut stmt = t.prepare("INSERT OR IGNORE INTO available_post_msgs VALUES (?)")?;
797 for rfc724_mid in available_post_msgs {
798 stmt.execute((rfc724_mid,))
799 .context("INSERT OR IGNORE INTO available_post_msgs")?;
800 }
801 let mut stmt =
802 t.prepare("INSERT OR IGNORE INTO download (rfc724_mid, msg_id) VALUES (?,0)")?;
803 for rfc724_mid in download_later {
804 stmt.execute((rfc724_mid,))
805 .context("INSERT OR IGNORE INTO download")?;
806 }
807 Ok(())
808 };
809 context.sql.transaction(trans_fn).await?;
810 }
811
812 fetch_res?;
815
816 Ok((read_cnt, fetch_more))
817 }
818}
819
820impl Session {
821 pub(crate) async fn resync_folders(&mut self, context: &Context) -> Result<()> {
823 let all_folders = self
824 .list_folders()
825 .await
826 .context("listing folders for resync")?;
827 for folder in all_folders {
828 let folder_meaning = get_folder_meaning(&folder);
829 if !matches!(
830 folder_meaning,
831 FolderMeaning::Virtual | FolderMeaning::Unknown
832 ) {
833 self.resync_folder_uids(context, folder.name(), folder_meaning)
834 .await?;
835 }
836 }
837 Ok(())
838 }
839
840 pub(crate) async fn resync_folder_uids(
847 &mut self,
848 context: &Context,
849 folder: &str,
850 folder_meaning: FolderMeaning,
851 ) -> Result<()> {
852 let uid_validity;
853 let mut msgs = BTreeMap::new();
855
856 let folder_exists = self.select_with_uidvalidity(context, folder).await?;
857 let transport_id = self.transport_id();
858 if folder_exists {
859 let mut list = self
860 .uid_fetch("1:*", RFC724MID_UID)
861 .await
862 .with_context(|| format!("Can't resync folder {folder}"))?;
863 while let Some(fetch) = list.try_next().await? {
864 let headers = match get_fetch_headers(&fetch) {
865 Ok(headers) => headers,
866 Err(err) => {
867 warn!(context, "Failed to parse FETCH headers: {}", err);
868 continue;
869 }
870 };
871 let message_id = prefetch_get_message_id(&headers);
872
873 if let (Some(uid), Some(rfc724_mid)) = (fetch.uid, message_id) {
874 msgs.insert(
875 uid,
876 (
877 rfc724_mid,
878 target_folder(context, folder, folder_meaning, &headers).await?,
879 ),
880 );
881 }
882 }
883
884 info!(
885 context,
886 "resync_folder_uids: Collected {} message IDs in {folder}.",
887 msgs.len(),
888 );
889
890 uid_validity = get_uidvalidity(context, transport_id, folder).await?;
891 } else {
892 warn!(context, "resync_folder_uids: No folder {folder}.");
893 uid_validity = 0;
894 }
895
896 context
898 .sql
899 .transaction(move |transaction| {
900 transaction.execute("DELETE FROM imap WHERE transport_id=? AND folder=?", (transport_id, folder,))?;
901 for (uid, (rfc724_mid, target)) in &msgs {
902 transaction.execute(
905 "INSERT INTO imap (transport_id, rfc724_mid, folder, uid, uidvalidity, target)
906 VALUES (?, ?, ?, ?, ?, ?)
907 ON CONFLICT(transport_id, folder, uid, uidvalidity)
908 DO UPDATE SET rfc724_mid=excluded.rfc724_mid,
909 target=excluded.target",
910 (transport_id, rfc724_mid, folder, uid, uid_validity, target),
911 )?;
912 }
913 Ok(())
914 })
915 .await?;
916 Ok(())
917 }
918
919 async fn delete_message_batch(
922 &mut self,
923 context: &Context,
924 uid_set: &str,
925 row_ids: Vec<i64>,
926 ) -> Result<()> {
927 self.add_flag_finalized_with_set(uid_set, "\\Deleted")
929 .await?;
930 context
931 .sql
932 .transaction(|transaction| {
933 let mut stmt = transaction.prepare("DELETE FROM imap WHERE id = ?")?;
934 for row_id in row_ids {
935 stmt.execute((row_id,))?;
936 }
937 Ok(())
938 })
939 .await
940 .context("Cannot remove deleted messages from imap table")?;
941
942 context.emit_event(EventType::ImapMessageDeleted(format!(
943 "IMAP messages {uid_set} marked as deleted"
944 )));
945 Ok(())
946 }
947
948 async fn move_message_batch(
951 &mut self,
952 context: &Context,
953 set: &str,
954 row_ids: Vec<i64>,
955 target: &str,
956 ) -> Result<()> {
957 if self.can_move() {
958 match self.uid_mv(set, &target).await {
959 Ok(()) => {
960 context
962 .sql
963 .transaction(|transaction| {
964 let mut stmt = transaction.prepare("DELETE FROM imap WHERE id = ?")?;
965 for row_id in row_ids {
966 stmt.execute((row_id,))?;
967 }
968 Ok(())
969 })
970 .await
971 .context("Cannot delete moved messages from imap table")?;
972 context.emit_event(EventType::ImapMessageMoved(format!(
973 "IMAP messages {set} moved to {target}"
974 )));
975 return Ok(());
976 }
977 Err(err) => {
978 warn!(
979 context,
980 "Cannot move messages, fallback to COPY/DELETE {} to {}: {}",
981 set,
982 target,
983 err
984 );
985 }
986 }
987 }
988
989 info!(
992 context,
993 "Server does not support MOVE, fallback to COPY/DELETE {} to {}", set, target
994 );
995 self.uid_copy(&set, &target).await?;
996 context
997 .sql
998 .transaction(|transaction| {
999 let mut stmt = transaction.prepare("UPDATE imap SET target='' WHERE id = ?")?;
1000 for row_id in row_ids {
1001 stmt.execute((row_id,))?;
1002 }
1003 Ok(())
1004 })
1005 .await
1006 .context("Cannot plan deletion of messages")?;
1007 context.emit_event(EventType::ImapMessageMoved(format!(
1008 "IMAP messages {set} copied to {target}"
1009 )));
1010 Ok(())
1011 }
1012
1013 async fn move_delete_messages(&mut self, context: &Context, folder: &str) -> Result<()> {
1017 let transport_id = self.transport_id();
1018 let rows = context
1019 .sql
1020 .query_map_vec(
1021 "SELECT id, uid, target FROM imap
1022 WHERE folder = ?
1023 AND transport_id = ?
1024 AND target != folder
1025 ORDER BY target, uid",
1026 (folder, transport_id),
1027 |row| {
1028 let rowid: i64 = row.get(0)?;
1029 let uid: u32 = row.get(1)?;
1030 let target: String = row.get(2)?;
1031 Ok((rowid, uid, target))
1032 },
1033 )
1034 .await?;
1035
1036 for (target, rowid_set, uid_set) in UidGrouper::from(rows) {
1037 let folder_exists = self.select_with_uidvalidity(context, folder).await?;
1042 ensure!(folder_exists, "No folder {folder}");
1043
1044 if target.is_empty() {
1046 self.delete_message_batch(context, &uid_set, rowid_set)
1047 .await
1048 .with_context(|| format!("cannot delete batch of messages {:?}", &uid_set))?;
1049 } else {
1050 self.move_message_batch(context, &uid_set, rowid_set, &target)
1051 .await
1052 .with_context(|| {
1053 format!(
1054 "cannot move batch of messages {:?} to folder {:?}",
1055 &uid_set, target
1056 )
1057 })?;
1058 }
1059 }
1060
1061 if let Err(err) = self.maybe_close_folder(context).await {
1064 warn!(context, "Failed to close folder: {err:#}.");
1065 }
1066
1067 Ok(())
1068 }
1069
1070 pub(crate) async fn store_seen_flags_on_imap(&mut self, context: &Context) -> Result<()> {
1072 if context.get_config_bool(Config::TeamProfile).await? {
1073 return Ok(());
1074 }
1075
1076 context
1077 .sql
1078 .execute(
1079 "DELETE FROM imap_markseen WHERE id NOT IN (SELECT imap.id FROM imap)",
1080 (),
1081 )
1082 .await?;
1083
1084 let transport_id = self.transport_id();
1085 let mut rows = context
1086 .sql
1087 .query_map_vec(
1088 "SELECT imap.id, uid, folder FROM imap, imap_markseen
1089 WHERE imap.id = imap_markseen.id
1090 AND imap.transport_id=?
1091 AND target = folder",
1092 (transport_id,),
1093 |row| {
1094 let rowid: i64 = row.get(0)?;
1095 let uid: u32 = row.get(1)?;
1096 let folder: String = row.get(2)?;
1097 Ok((rowid, uid, folder))
1098 },
1099 )
1100 .await?;
1101
1102 rows.sort_unstable_by(|(_rowid1, uid1, folder1), (_rowid2, uid2, folder2)| {
1109 (folder1, uid1).cmp(&(folder2, uid2))
1110 });
1111
1112 for (folder, rowid_set, uid_set) in UidGrouper::from(rows) {
1113 let folder_exists = match self.select_with_uidvalidity(context, &folder).await {
1114 Err(err) => {
1115 warn!(
1116 context,
1117 "store_seen_flags_on_imap: Failed to select {folder}, will retry later: {err:#}."
1118 );
1119 continue;
1120 }
1121 Ok(folder_exists) => folder_exists,
1122 };
1123 if !folder_exists {
1124 warn!(context, "store_seen_flags_on_imap: No folder {folder}.");
1125 } else if let Err(err) = self.add_flag_finalized_with_set(&uid_set, "\\Seen").await {
1126 warn!(
1127 context,
1128 "Cannot mark messages {uid_set} in {folder} as seen, will retry later: {err:#}."
1129 );
1130 continue;
1131 } else {
1132 info!(
1133 context,
1134 "Marked messages {} in folder {} as seen.", uid_set, folder
1135 );
1136 }
1137 context
1138 .sql
1139 .transaction(|transaction| {
1140 let mut stmt = transaction.prepare("DELETE FROM imap_markseen WHERE id = ?")?;
1141 for rowid in rowid_set {
1142 stmt.execute((rowid,))?;
1143 }
1144 Ok(())
1145 })
1146 .await
1147 .context("Cannot remove messages marked as seen from imap_markseen table")?;
1148 }
1149
1150 Ok(())
1151 }
1152
1153 pub(crate) async fn sync_seen_flags(&mut self, context: &Context, folder: &str) -> Result<()> {
1155 if !self.can_condstore() {
1156 info!(
1157 context,
1158 "Server does not support CONDSTORE, skipping flag synchronization."
1159 );
1160 return Ok(());
1161 }
1162
1163 if context.get_config_bool(Config::TeamProfile).await? {
1164 return Ok(());
1165 }
1166
1167 let folder_exists = self
1168 .select_with_uidvalidity(context, folder)
1169 .await
1170 .context("Failed to select folder")?;
1171 if !folder_exists {
1172 return Ok(());
1173 }
1174
1175 let mailbox = self
1176 .selected_mailbox
1177 .as_ref()
1178 .with_context(|| format!("No mailbox selected, folder: {folder}"))?;
1179
1180 if mailbox.highest_modseq.is_none() {
1183 info!(
1184 context,
1185 "Mailbox {} does not support mod-sequences, skipping flag synchronization.", folder
1186 );
1187 return Ok(());
1188 }
1189
1190 let transport_id = self.transport_id();
1191 let mut updated_chat_ids = BTreeSet::new();
1192 let uid_validity = get_uidvalidity(context, transport_id, folder)
1193 .await
1194 .with_context(|| format!("failed to get UID validity for folder {folder}"))?;
1195 let mut highest_modseq = get_modseq(context, transport_id, folder)
1196 .await
1197 .with_context(|| format!("failed to get MODSEQ for folder {folder}"))?;
1198 let mut list = self
1199 .uid_fetch("1:*", format!("(FLAGS) (CHANGEDSINCE {highest_modseq})"))
1200 .await
1201 .context("failed to fetch flags")?;
1202
1203 let mut got_unsolicited_fetch = false;
1204
1205 while let Some(fetch) = list
1206 .try_next()
1207 .await
1208 .context("failed to get FETCH result")?
1209 {
1210 let uid = if let Some(uid) = fetch.uid {
1211 uid
1212 } else {
1213 info!(context, "FETCH result contains no UID, skipping");
1214 got_unsolicited_fetch = true;
1215 continue;
1216 };
1217 let is_seen = fetch.flags().any(|flag| flag == Flag::Seen);
1218 if is_seen
1219 && let Some(chat_id) = mark_seen_by_uid(context, transport_id, folder, uid_validity, uid)
1220 .await
1221 .with_context(|| {
1222 format!("Transport {transport_id}: Failed to update seen status for msg {folder}/{uid}")
1223 })?
1224 {
1225 updated_chat_ids.insert(chat_id);
1226 }
1227
1228 if let Some(modseq) = fetch.modseq {
1229 if modseq > highest_modseq {
1230 highest_modseq = modseq;
1231 }
1232 } else {
1233 warn!(context, "FETCH result contains no MODSEQ");
1234 }
1235 }
1236 drop(list);
1237
1238 if got_unsolicited_fetch {
1239 info!(context, "Got unsolicited fetch, will skip idle");
1244 self.new_mail = true;
1245 }
1246
1247 set_modseq(context, transport_id, folder, highest_modseq)
1248 .await
1249 .with_context(|| format!("failed to set MODSEQ for folder {folder}"))?;
1250 if !updated_chat_ids.is_empty() {
1251 context.on_archived_chats_maybe_noticed();
1252 }
1253 for updated_chat_id in updated_chat_ids {
1254 context.emit_event(EventType::MsgsNoticed(updated_chat_id));
1255 chatlist_events::emit_chatlist_item_changed(context, updated_chat_id);
1256 }
1257
1258 Ok(())
1259 }
1260
1261 #[expect(clippy::arithmetic_side_effects)]
1276 pub(crate) async fn fetch_many_msgs(
1277 &mut self,
1278 context: &Context,
1279 folder: &str,
1280 request_uids: Vec<u32>,
1281 uid_message_ids: &BTreeMap<u32, String>,
1282 received_msgs_channel: Sender<(u32, Option<ReceivedMsg>)>,
1283 ) -> Result<()> {
1284 if request_uids.is_empty() {
1285 return Ok(());
1286 }
1287
1288 for (request_uids, set) in build_sequence_sets(&request_uids)? {
1289 info!(context, "Starting UID FETCH of message set \"{}\".", set);
1290 let mut fetch_responses = self.uid_fetch(&set, BODY_FULL).await.with_context(|| {
1291 format!("fetching messages {} from folder \"{}\"", &set, folder)
1292 })?;
1293
1294 let mut uid_msgs = HashMap::with_capacity(request_uids.len());
1297
1298 let mut count = 0;
1299 for &request_uid in &request_uids {
1300 let mut fetch_response = uid_msgs.remove(&request_uid);
1302
1303 while fetch_response.is_none() {
1305 let Some(next_fetch_response) = fetch_responses
1306 .try_next()
1307 .await
1308 .context("Failed to process IMAP FETCH result")?
1309 else {
1310 break;
1312 };
1313
1314 if let Some(next_uid) = next_fetch_response.uid {
1315 if next_uid == request_uid {
1316 fetch_response = Some(next_fetch_response);
1317 } else if !request_uids.contains(&next_uid) {
1318 info!(
1325 context,
1326 "Skipping not requested FETCH response for UID {}.", next_uid
1327 );
1328 } else if uid_msgs.insert(next_uid, next_fetch_response).is_some() {
1329 warn!(context, "Got duplicated UID {}.", next_uid);
1330 }
1331 } else {
1332 info!(context, "Skipping FETCH response without UID.");
1333 }
1334 }
1335
1336 let fetch_response = match fetch_response {
1337 Some(fetch) => fetch,
1338 None => {
1339 warn!(
1340 context,
1341 "Missed UID {} in the server response.", request_uid
1342 );
1343 continue;
1344 }
1345 };
1346 count += 1;
1347
1348 let is_deleted = fetch_response.flags().any(|flag| flag == Flag::Deleted);
1349 let body = fetch_response.body();
1350
1351 if is_deleted {
1352 info!(context, "Not processing deleted msg {}.", request_uid);
1353 received_msgs_channel.send((request_uid, None)).await?;
1354 continue;
1355 }
1356
1357 let body = if let Some(body) = body {
1358 body
1359 } else {
1360 info!(
1361 context,
1362 "Not processing message {} without a BODY.", request_uid
1363 );
1364 received_msgs_channel.send((request_uid, None)).await?;
1365 continue;
1366 };
1367
1368 let is_seen = fetch_response.flags().any(|flag| flag == Flag::Seen);
1369
1370 let Some(rfc724_mid) = uid_message_ids.get(&request_uid) else {
1371 error!(
1372 context,
1373 "No Message-ID corresponding to UID {} passed in uid_messsage_ids.",
1374 request_uid
1375 );
1376 continue;
1377 };
1378
1379 info!(
1380 context,
1381 "Passing message UID {} to receive_imf().", request_uid
1382 );
1383 let res = receive_imf_inner(context, rfc724_mid, body, is_seen).await;
1384 let received_msg = match res {
1385 Err(err) => {
1386 warn!(context, "receive_imf error: {err:#}.");
1387
1388 let text = format!(
1389 "❌ Failed to receive a message: {err:#}. Core version v{DC_VERSION_STR}. Please report this bug to delta@merlinux.eu or https://support.delta.chat/.",
1390 );
1391 let mut msg = Message::new_text(text);
1392 add_device_msg(context, None, Some(&mut msg)).await?;
1393 None
1394 }
1395 Ok(msg) => msg,
1396 };
1397 received_msgs_channel
1398 .send((request_uid, received_msg))
1399 .await?;
1400 }
1401
1402 while fetch_responses
1409 .try_next()
1410 .await
1411 .context("Failed to drain FETCH responses")?
1412 .is_some()
1413 {}
1414
1415 if count != request_uids.len() {
1416 warn!(
1417 context,
1418 "Failed to fetch all UIDs: got {}, requested {}, we requested the UIDs {:?}.",
1419 count,
1420 request_uids.len(),
1421 request_uids,
1422 );
1423 } else {
1424 info!(
1425 context,
1426 "Successfully received {} UIDs.",
1427 request_uids.len()
1428 );
1429 }
1430 }
1431
1432 Ok(())
1433 }
1434
1435 #[expect(clippy::arithmetic_side_effects)]
1441 pub(crate) async fn update_metadata(&mut self, context: &Context) -> Result<()> {
1442 let mut lock = context.metadata.write().await;
1443
1444 if !self.can_metadata() {
1445 *lock = Some(Default::default());
1446 }
1447 if let Some(ref mut old_metadata) = *lock {
1448 let now = time();
1449
1450 if now + 3600 * 12 < old_metadata.ice_servers_expiration_timestamp {
1452 return Ok(());
1453 }
1454
1455 let mut got_turn_server = false;
1456 if self.can_metadata() {
1457 info!(context, "ICE servers expired, requesting new credentials.");
1458 let mailbox = "";
1459 let options = "";
1460 let metadata = self
1461 .get_metadata(mailbox, options, "(/shared/vendor/deltachat/turn)")
1462 .await?;
1463 for m in metadata {
1464 if m.entry == "/shared/vendor/deltachat/turn"
1465 && let Some(value) = m.value
1466 {
1467 match create_ice_servers_from_metadata(&value).await {
1468 Ok((parsed_timestamp, parsed_ice_servers)) => {
1469 old_metadata.ice_servers_expiration_timestamp = parsed_timestamp;
1470 old_metadata.ice_servers = parsed_ice_servers;
1471 got_turn_server = true;
1472 }
1473 Err(err) => {
1474 warn!(context, "Failed to parse TURN server metadata: {err:#}.");
1475 }
1476 }
1477 }
1478 }
1479 }
1480 if !got_turn_server {
1481 info!(context, "Will use fallback ICE servers.");
1482 old_metadata.ice_servers_expiration_timestamp = time() + 3600 * 24 * 7;
1484 old_metadata.ice_servers = create_fallback_ice_servers();
1485 }
1486 return Ok(());
1487 }
1488
1489 info!(
1490 context,
1491 "Server supports metadata, retrieving server comment and admin contact."
1492 );
1493
1494 let mut comment = None;
1495 let mut admin = None;
1496 let mut iroh_relay = None;
1497 let mut ice_servers = None;
1498 let mut ice_servers_expiration_timestamp = 0;
1499
1500 let mailbox = "";
1501 let options = "";
1502 let metadata = self
1503 .get_metadata(
1504 mailbox,
1505 options,
1506 "(/shared/comment /shared/admin /shared/vendor/deltachat/irohrelay /shared/vendor/deltachat/turn)",
1507 )
1508 .await?;
1509 for m in metadata {
1510 match m.entry.as_ref() {
1511 "/shared/comment" => {
1512 comment = m.value;
1513 }
1514 "/shared/admin" => {
1515 admin = m.value;
1516 }
1517 "/shared/vendor/deltachat/irohrelay" => {
1518 if let Some(value) = m.value {
1519 if let Ok(url) = Url::parse(&value) {
1520 iroh_relay = Some(url);
1521 } else {
1522 warn!(
1523 context,
1524 "Got invalid URL from iroh relay metadata: {:?}.", value
1525 );
1526 }
1527 }
1528 }
1529 "/shared/vendor/deltachat/turn" => {
1530 if let Some(value) = m.value {
1531 match create_ice_servers_from_metadata(&value).await {
1532 Ok((parsed_timestamp, parsed_ice_servers)) => {
1533 ice_servers_expiration_timestamp = parsed_timestamp;
1534 ice_servers = Some(parsed_ice_servers);
1535 }
1536 Err(err) => {
1537 warn!(context, "Failed to parse TURN server metadata: {err:#}.");
1538 }
1539 }
1540 }
1541 }
1542 _ => {}
1543 }
1544 }
1545 let ice_servers = if let Some(ice_servers) = ice_servers {
1546 ice_servers
1547 } else {
1548 ice_servers_expiration_timestamp = time() + 3600 * 24 * 7;
1550 create_fallback_ice_servers()
1551 };
1552
1553 *lock = Some(ServerMetadata {
1554 comment,
1555 admin,
1556 iroh_relay,
1557 ice_servers,
1558 ice_servers_expiration_timestamp,
1559 });
1560 Ok(())
1561 }
1562
1563 pub(crate) async fn register_token(&mut self, context: &Context) -> Result<()> {
1565 if context.push_subscribed.load(Ordering::Relaxed) {
1566 return Ok(());
1567 }
1568
1569 let transport_id = self.transport_id();
1570
1571 let Some(device_token) = context.push_subscriber.device_token().await else {
1572 return Ok(());
1573 };
1574
1575 if self.can_metadata() && self.can_push() {
1576 info!(
1577 context,
1578 "Transport {transport_id}: Subscribing for push notifications."
1579 );
1580
1581 let old_encrypted_device_token =
1582 context.get_config(Config::EncryptedDeviceToken).await?;
1583
1584 let device_token_changed = old_encrypted_device_token.is_none()
1586 || context.get_config(Config::DeviceToken).await?.as_ref() != Some(&device_token);
1587
1588 let new_encrypted_device_token;
1589 if device_token_changed {
1590 let encrypted_device_token = encrypt_device_token(&device_token)
1591 .context("Failed to encrypt device token")?;
1592
1593 let encrypted_device_token_len = encrypted_device_token.len();
1597
1598 context
1604 .set_config_internal(Config::DeviceToken, Some(&device_token))
1605 .await?;
1606 context
1607 .set_config_internal(
1608 Config::EncryptedDeviceToken,
1609 Some(&encrypted_device_token),
1610 )
1611 .await?;
1612
1613 if encrypted_device_token_len <= 4096 {
1614 new_encrypted_device_token = Some(encrypted_device_token);
1615 } else {
1616 warn!(context, "Device token is too long for LITERAL-, ignoring.");
1626 new_encrypted_device_token = None;
1627 }
1628 } else {
1629 new_encrypted_device_token = old_encrypted_device_token;
1630 }
1631
1632 if let Some(encrypted_device_token) = new_encrypted_device_token {
1635 self.run_command_and_check_ok(&format_setmetadata(
1636 "INBOX",
1637 &encrypted_device_token,
1638 ))
1639 .await
1640 .context("SETMETADATA command failed")?;
1641
1642 context.push_subscribed.store(true, Ordering::Relaxed);
1643 }
1644 } else if !context.push_subscriber.heartbeat_subscribed().await {
1645 let context = context.clone();
1646 tokio::spawn(async move { context.push_subscriber.subscribe(&context).await });
1648 }
1649
1650 Ok(())
1651 }
1652}
1653
1654fn format_setmetadata(folder: &str, device_token: &str) -> String {
1655 let device_token_len = device_token.len();
1656 format!(
1657 "SETMETADATA \"{folder}\" (/private/devicetoken {{{device_token_len}+}}\r\n{device_token})"
1658 )
1659}
1660
1661impl Session {
1662 async fn add_flag_finalized_with_set(&mut self, uid_set: &str, flag: &str) -> Result<()> {
1668 if flag == "\\Deleted" {
1669 self.selected_folder_needs_expunge = true;
1670 }
1671 let query = format!("+FLAGS ({flag})");
1672 let mut responses = self
1673 .uid_store(uid_set, &query)
1674 .await
1675 .with_context(|| format!("IMAP failed to store: ({uid_set}, {query})"))?;
1676 while let Some(_response) = responses.try_next().await? {
1677 }
1679 Ok(())
1680 }
1681}
1682
1683impl Session {
1684 fn drain_unsolicited_responses(&self, context: &Context) -> Result<bool> {
1693 use UnsolicitedResponse::*;
1694 use async_imap::imap_proto::Response;
1695 use async_imap::imap_proto::ResponseCode;
1696
1697 let folder = self.selected_folder.as_deref().unwrap_or_default();
1698 let mut should_refetch = false;
1699 while let Ok(response) = self.unsolicited_responses.try_recv() {
1700 match response {
1701 Exists(_) => {
1702 info!(
1703 context,
1704 "Need to refetch {folder:?}, got unsolicited EXISTS {response:?}"
1705 );
1706 should_refetch = true;
1707 }
1708
1709 Expunge(_) | Recent(_) => {}
1710 Other(ref response_data) => {
1711 match response_data.parsed() {
1712 Response::Fetch { .. } => {
1713 info!(
1714 context,
1715 "Need to refetch {folder:?}, got unsolicited FETCH {response:?}"
1716 );
1717 should_refetch = true;
1718 }
1719
1720 Response::Done {
1723 code: Some(ResponseCode::CopyUid(_, _, _)),
1724 ..
1725 } => {}
1726
1727 _ => {
1728 info!(context, "{folder:?}: got unsolicited response {response:?}")
1729 }
1730 }
1731 }
1732 _ => {
1733 info!(context, "{folder:?}: got unsolicited response {response:?}")
1734 }
1735 }
1736 }
1737 Ok(should_refetch)
1738 }
1739}
1740
1741async fn should_move_out_of_spam(
1742 context: &Context,
1743 headers: &[mailparse::MailHeader<'_>],
1744) -> Result<bool> {
1745 if headers.get_header_value(HeaderDef::ChatVersion).is_some() {
1746 return Ok(true);
1757 }
1758
1759 if let Some(msg) = get_prefetch_parent_message(context, headers).await? {
1760 if msg.chat_blocked != Blocked::Not {
1761 return Ok(false);
1763 }
1764 } else {
1765 let from = match mimeparser::get_from(headers) {
1766 Some(f) => f,
1767 None => return Ok(false),
1768 };
1769 let (from_id, blocked_contact, _origin) =
1771 match from_field_to_contact_id(context, &from, None, true, true)
1772 .await
1773 .context("from_field_to_contact_id")?
1774 {
1775 Some(res) => res,
1776 None => {
1777 warn!(
1778 context,
1779 "Contact with From address {:?} cannot exist, not moving out of spam", from
1780 );
1781 return Ok(false);
1782 }
1783 };
1784 if blocked_contact {
1785 return Ok(false);
1787 }
1788
1789 if let Some(chat_id_blocked) = ChatIdBlocked::lookup_by_contact(context, from_id).await? {
1790 if chat_id_blocked.blocked != Blocked::Not {
1791 return Ok(false);
1792 }
1793 } else if from_id != ContactId::SELF {
1794 return Ok(false);
1796 }
1797 }
1798
1799 Ok(true)
1800}
1801
1802async fn spam_target_folder_cfg(
1807 context: &Context,
1808 headers: &[mailparse::MailHeader<'_>],
1809) -> Result<Option<Config>> {
1810 if !should_move_out_of_spam(context, headers).await? {
1811 return Ok(None);
1812 }
1813
1814 Ok(Some(Config::ConfiguredInboxFolder))
1815}
1816
1817pub async fn target_folder_cfg(
1820 context: &Context,
1821 folder: &str,
1822 folder_meaning: FolderMeaning,
1823 headers: &[mailparse::MailHeader<'_>],
1824) -> Result<Option<Config>> {
1825 if folder == "DeltaChat" {
1826 return Ok(None);
1827 }
1828
1829 if folder_meaning == FolderMeaning::Spam {
1830 spam_target_folder_cfg(context, headers).await
1831 } else {
1832 Ok(None)
1833 }
1834}
1835
1836pub async fn target_folder(
1837 context: &Context,
1838 folder: &str,
1839 folder_meaning: FolderMeaning,
1840 headers: &[mailparse::MailHeader<'_>],
1841) -> Result<String> {
1842 match target_folder_cfg(context, folder, folder_meaning, headers).await? {
1843 Some(config) => match context.get_config(config).await? {
1844 Some(target) => Ok(target),
1845 None => Ok(folder.to_string()),
1846 },
1847 None => Ok(folder.to_string()),
1848 }
1849}
1850
1851fn get_folder_meaning_by_name(folder_name: &str) -> FolderMeaning {
1858 const SPAM_NAMES: &[&str] = &[
1860 "spam",
1861 "junk",
1862 "Correio electrónico não solicitado",
1863 "Correo basura",
1864 "Lixo",
1865 "Nettsøppel",
1866 "Nevyžádaná pošta",
1867 "No solicitado",
1868 "Ongewenst",
1869 "Posta indesiderata",
1870 "Skräp",
1871 "Wiadomości-śmieci",
1872 "Önemsiz",
1873 "Ανεπιθύμητα",
1874 "Спам",
1875 "垃圾邮件",
1876 "垃圾郵件",
1877 "迷惑メール",
1878 "스팸",
1879 ];
1880 const TRASH_NAMES: &[&str] = &[
1881 "Trash",
1882 "Bin",
1883 "Caixote do lixo",
1884 "Cestino",
1885 "Corbeille",
1886 "Papelera",
1887 "Papierkorb",
1888 "Papirkurv",
1889 "Papperskorgen",
1890 "Prullenbak",
1891 "Rubujo",
1892 "Κάδος απορριμμάτων",
1893 "Корзина",
1894 "Кошик",
1895 "ゴミ箱",
1896 "垃圾桶",
1897 "已删除邮件",
1898 "휴지통",
1899 ];
1900 let lower = folder_name.to_lowercase();
1901
1902 if lower == "inbox" {
1903 FolderMeaning::Inbox
1904 } else if SPAM_NAMES.iter().any(|s| s.to_lowercase() == lower) {
1905 FolderMeaning::Spam
1906 } else if TRASH_NAMES.iter().any(|s| s.to_lowercase() == lower) {
1907 FolderMeaning::Trash
1908 } else {
1909 FolderMeaning::Unknown
1910 }
1911}
1912
1913fn get_folder_meaning_by_attrs(folder_attrs: &[NameAttribute]) -> FolderMeaning {
1914 for attr in folder_attrs {
1915 match attr {
1916 NameAttribute::Trash => return FolderMeaning::Trash,
1917 NameAttribute::Junk => return FolderMeaning::Spam,
1918 NameAttribute::All | NameAttribute::Flagged => return FolderMeaning::Virtual,
1919 NameAttribute::Extension(label) => {
1920 match label.as_ref() {
1921 "\\Spam" => return FolderMeaning::Spam,
1922 "\\Important" => return FolderMeaning::Virtual,
1923 _ => {}
1924 };
1925 }
1926 _ => {}
1927 }
1928 }
1929 FolderMeaning::Unknown
1930}
1931
1932pub(crate) fn get_folder_meaning(folder: &Name) -> FolderMeaning {
1933 match get_folder_meaning_by_attrs(folder.attributes()) {
1934 FolderMeaning::Unknown => get_folder_meaning_by_name(folder.name()),
1935 meaning => meaning,
1936 }
1937}
1938
1939fn get_fetch_headers(prefetch_msg: &Fetch) -> Result<Vec<mailparse::MailHeader<'_>>> {
1941 match prefetch_msg.header() {
1942 Some(header_bytes) => {
1943 let (headers, _) = mailparse::parse_headers(header_bytes)?;
1944 Ok(headers)
1945 }
1946 None => Ok(Vec::new()),
1947 }
1948}
1949
1950pub(crate) fn prefetch_get_message_id(headers: &[mailparse::MailHeader]) -> Option<String> {
1951 headers
1952 .get_header_value(HeaderDef::XMicrosoftOriginalMessageId)
1953 .or_else(|| headers.get_header_value(HeaderDef::MessageId))
1954 .and_then(|msgid| mimeparser::parse_message_id(&msgid).ok())
1955}
1956
1957pub(crate) fn create_message_id() -> String {
1958 format!("{}{}", GENERATED_PREFIX, create_id())
1959}
1960
1961pub(crate) async fn prefetch_should_download(
1963 context: &Context,
1964 headers: &[mailparse::MailHeader<'_>],
1965 message_id: &str,
1966 mut flags: impl Iterator<Item = Flag<'_>>,
1967) -> Result<bool> {
1968 if message::rfc724_mid_download_tried(context, message_id).await? {
1969 if let Some(from) = mimeparser::get_from(headers)
1970 && context.is_self_addr(&from.addr).await?
1971 {
1972 markseen_on_imap_table(context, message_id).await?;
1973 }
1974 return Ok(false);
1975 }
1976
1977 let maybe_ndn = if let Some(from) = headers.get_header_value(HeaderDef::From_) {
1981 let from = from.to_ascii_lowercase();
1982 from.contains("mailer-daemon") || from.contains("mail-daemon")
1983 } else {
1984 false
1985 };
1986
1987 let from = match mimeparser::get_from(headers) {
1988 Some(f) => f,
1989 None => return Ok(false),
1990 };
1991 let (_from_id, blocked_contact, _origin) =
1992 match from_field_to_contact_id(context, &from, None, true, true).await? {
1993 Some(res) => res,
1994 None => return Ok(false),
1995 };
1996 if flags.any(|f| f == Flag::Draft) {
2000 info!(context, "Ignoring draft message");
2001 return Ok(false);
2002 }
2003
2004 let should_download = (!blocked_contact) || maybe_ndn;
2005 Ok(should_download)
2006}
2007
2008async fn mark_seen_by_uid(
2012 context: &Context,
2013 transport_id: u32,
2014 folder: &str,
2015 uid_validity: u32,
2016 uid: u32,
2017) -> Result<Option<ChatId>> {
2018 if let Some((msg_id, chat_id)) = context
2019 .sql
2020 .query_row_optional(
2021 "SELECT id, chat_id FROM msgs
2022 WHERE id > 9 AND rfc724_mid IN (
2023 SELECT rfc724_mid FROM imap
2024 WHERE transport_id=?
2025 AND folder=?
2026 AND uidvalidity=?
2027 AND uid=?
2028 LIMIT 1
2029 )",
2030 (transport_id, &folder, uid_validity, uid),
2031 |row| {
2032 let msg_id: MsgId = row.get(0)?;
2033 let chat_id: ChatId = row.get(1)?;
2034 Ok((msg_id, chat_id))
2035 },
2036 )
2037 .await
2038 .with_context(|| format!("failed to get msg and chat ID for IMAP message {folder}/{uid}"))?
2039 {
2040 let updated = context
2041 .sql
2042 .execute(
2043 "UPDATE msgs SET state=?1
2044 WHERE (state=?2 OR state=?3)
2045 AND id=?4",
2046 (
2047 MessageState::InSeen,
2048 MessageState::InFresh,
2049 MessageState::InNoticed,
2050 msg_id,
2051 ),
2052 )
2053 .await
2054 .with_context(|| format!("failed to update msg {msg_id} state"))?
2055 > 0;
2056
2057 if updated {
2058 msg_id
2059 .start_ephemeral_timer(context)
2060 .await
2061 .with_context(|| format!("failed to start ephemeral timer for message {msg_id}"))?;
2062 Ok(Some(chat_id))
2063 } else {
2064 Ok(None)
2066 }
2067 } else {
2068 Ok(None)
2070 }
2071}
2072
2073pub(crate) async fn markseen_on_imap_table(context: &Context, message_id: &str) -> Result<()> {
2076 context
2077 .sql
2078 .execute(
2079 "INSERT OR IGNORE INTO imap_markseen (id)
2080 SELECT id FROM imap WHERE rfc724_mid=?",
2081 (message_id,),
2082 )
2083 .await?;
2084 context.scheduler.interrupt_inbox().await;
2085
2086 Ok(())
2087}
2088
2089pub(crate) async fn set_uid_next(
2093 context: &Context,
2094 transport_id: u32,
2095 folder: &str,
2096 uid_next: u32,
2097) -> Result<()> {
2098 context
2099 .sql
2100 .execute(
2101 "INSERT INTO imap_sync (transport_id, folder, uid_next) VALUES (?, ?,?)
2102 ON CONFLICT(transport_id, folder) DO UPDATE SET uid_next=excluded.uid_next",
2103 (transport_id, folder, uid_next),
2104 )
2105 .await?;
2106 Ok(())
2107}
2108
2109async fn get_uid_next(context: &Context, transport_id: u32, folder: &str) -> Result<u32> {
2115 Ok(context
2116 .sql
2117 .query_get_value(
2118 "SELECT uid_next FROM imap_sync WHERE transport_id=? AND folder=?",
2119 (transport_id, folder),
2120 )
2121 .await?
2122 .unwrap_or(0))
2123}
2124
2125pub(crate) async fn set_uidvalidity(
2126 context: &Context,
2127 transport_id: u32,
2128 folder: &str,
2129 uidvalidity: u32,
2130) -> Result<()> {
2131 context
2132 .sql
2133 .execute(
2134 "INSERT INTO imap_sync (transport_id, folder, uidvalidity) VALUES (?,?,?)
2135 ON CONFLICT(transport_id, folder) DO UPDATE SET uidvalidity=excluded.uidvalidity",
2136 (transport_id, folder, uidvalidity),
2137 )
2138 .await?;
2139 Ok(())
2140}
2141
2142async fn get_uidvalidity(context: &Context, transport_id: u32, folder: &str) -> Result<u32> {
2143 Ok(context
2144 .sql
2145 .query_get_value(
2146 "SELECT uidvalidity FROM imap_sync WHERE transport_id=? AND folder=?",
2147 (transport_id, folder),
2148 )
2149 .await?
2150 .unwrap_or(0))
2151}
2152
2153pub(crate) async fn set_modseq(
2154 context: &Context,
2155 transport_id: u32,
2156 folder: &str,
2157 modseq: u64,
2158) -> Result<()> {
2159 context
2160 .sql
2161 .execute(
2162 "INSERT INTO imap_sync (transport_id, folder, modseq) VALUES (?,?,?)
2163 ON CONFLICT(transport_id, folder) DO UPDATE SET modseq=excluded.modseq",
2164 (transport_id, folder, modseq),
2165 )
2166 .await?;
2167 Ok(())
2168}
2169
2170async fn get_modseq(context: &Context, transport_id: u32, folder: &str) -> Result<u64> {
2171 Ok(context
2172 .sql
2173 .query_get_value(
2174 "SELECT modseq FROM imap_sync WHERE transport_id=? AND folder=?",
2175 (transport_id, folder),
2176 )
2177 .await?
2178 .unwrap_or(0))
2179}
2180
2181#[expect(clippy::arithmetic_side_effects)]
2185fn build_sequence_sets(uids: &[u32]) -> Result<Vec<(Vec<u32>, String)>> {
2186 let mut ranges: Vec<UidRange> = vec![];
2188
2189 for ¤t in uids {
2190 if let Some(last) = ranges.last_mut()
2191 && last.end + 1 == current
2192 {
2193 last.end = current;
2194 continue;
2195 }
2196
2197 ranges.push(UidRange {
2198 start: current,
2199 end: current,
2200 });
2201 }
2202
2203 let mut result = vec![];
2205 let (mut last_uids, mut last_str) = (Vec::new(), String::new());
2206 for range in ranges {
2207 last_uids.reserve((range.end - range.start + 1).try_into()?);
2208 (range.start..=range.end).for_each(|u| last_uids.push(u));
2209 if !last_str.is_empty() {
2210 last_str.push(',');
2211 }
2212 last_str.push_str(&range.to_string());
2213
2214 if last_str.len() > 990 {
2215 result.push((take(&mut last_uids), take(&mut last_str)));
2216 }
2217 }
2218 result.push((last_uids, last_str));
2219
2220 result.retain(|(_, s)| !s.is_empty());
2221 Ok(result)
2222}
2223
2224struct UidRange {
2225 start: u32,
2226 end: u32,
2227 }
2229
2230impl std::fmt::Display for UidRange {
2231 fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
2232 if self.start == self.end {
2233 write!(f, "{}", self.start)
2234 } else {
2235 write!(f, "{}:{}", self.start, self.end)
2236 }
2237 }
2238}
2239
2240#[cfg(test)]
2241mod imap_tests;