1use std::{
7 cmp::max,
8 cmp::min,
9 collections::{BTreeMap, BTreeSet, HashMap},
10 iter::Peekable,
11 mem::take,
12 sync::atomic::Ordering,
13 time::{Duration, UNIX_EPOCH},
14};
15
16use anyhow::{Context as _, Result, bail, ensure, format_err};
17use async_channel::{self, Receiver, Sender};
18use async_imap::types::{Fetch, Flag, Name, NameAttribute, UnsolicitedResponse};
19use futures::{FutureExt as _, TryStreamExt};
20use futures_lite::FutureExt;
21use ratelimit::Ratelimit;
22use url::Url;
23
24use crate::calls::{
25 UnresolvedIceServer, create_fallback_ice_servers, create_ice_servers_from_metadata,
26};
27use crate::chat::{self, ChatId, ChatIdBlocked, add_device_msg};
28use crate::chatlist_events;
29use crate::config::Config;
30use crate::constants::{Blocked, DC_VERSION_STR};
31use crate::contact::ContactId;
32use crate::context::Context;
33use crate::ensure_and_debug_assert;
34use crate::events::EventType;
35use crate::headerdef::{HeaderDef, HeaderDefMap};
36use crate::log::{LogExt, warn};
37use crate::message::{self, Message, MessageState, MsgId};
38use crate::mimeparser;
39use crate::net::proxy::ProxyConfig;
40use crate::net::session::SessionStream;
41use crate::oauth2::get_oauth2_access_token;
42use crate::push::encrypt_device_token;
43use crate::receive_imf::{
44 ReceivedMsg, from_field_to_contact_id, get_prefetch_parent_message, receive_imf_inner,
45};
46use crate::scheduler::connectivity::ConnectivityStore;
47use crate::stock_str;
48use crate::tools::{self, create_id, duration_to_str, time};
49use crate::transport::{
50 ConfiguredLoginParam, ConfiguredServerLoginParam, prioritize_server_login_params,
51};
52
53pub(crate) mod capabilities;
54mod client;
55mod idle;
56pub mod select_folder;
57pub(crate) mod session;
58
59use client::{Client, determine_capabilities};
60use session::Session;
61
/// Prefix string for generated identifiers.
///
/// NOTE(review): presumably marks locally generated Message-IDs; confirm against its users,
/// which are not visible in this part of the file.
pub(crate) const GENERATED_PREFIX: &str = "GEN_";

/// FETCH item list requesting the UID together with only the `Message-ID` and
/// `X-Microsoft-Original-Message-ID` header fields.
///
/// `BODY.PEEK` is used, so the request itself does not set the `\Seen` flag.
const RFC724MID_UID: &str = "(UID BODY.PEEK[HEADER.FIELDS (\
                             MESSAGE-ID \
                             X-MICROSOFT-ORIGINAL-MESSAGE-ID\
                             )])";
/// FETCH item list requesting the flags and the complete message, again via `BODY.PEEK`.
const BODY_FULL: &str = "(FLAGS BODY.PEEK[])";
69
/// Connection parameters and reconnect state for the IMAP side of one transport.
///
/// The struct itself holds no open connection; `connect()` produces a `Session`.
#[derive(Debug)]
pub(crate) struct Imap {
    /// ID of the transport this configuration belongs to.
    transport_id: u32,

    /// Receiver of idle interrupts. A message on it also cuts short the
    /// rate-limit/backoff wait in `connect()`.
    pub(crate) idle_interrupt_receiver: Receiver<()>,

    /// Account address; used when requesting an OAuth 2 access token.
    pub(crate) addr: String,

    /// Candidate server login parameters; they are prioritized before each connection attempt.
    lp: Vec<ConfiguredServerLoginParam>,

    /// Password used for LOGIN, or passed to the OAuth 2 token lookup when `oauth2` is set.
    password: String,

    /// Optional proxy configuration handed to the client when connecting.
    proxy_config: Option<ProxyConfig>,

    /// Strict TLS setting handed to the client when connecting.
    strict_tls: bool,

    /// If true, authenticate with XOAUTH2 instead of LOGIN.
    oauth2: bool,

    /// Watched folder name; defaults to "INBOX" and must not be empty.
    pub(crate) folder: String,

    /// Set after a login failure whose message mentions "authentication".
    /// The wrong-password device message is only considered once this is already set.
    authentication_failed_once: bool,

    /// Connectivity state reported for this connection.
    pub(crate) connectivity: ConnectivityStore,

    /// Time of the last connection attempt.
    conn_last_try: tools::Time,
    /// Current reconnect backoff in milliseconds.
    conn_backoff_ms: u64,

    /// Rate limiter consulted before connecting; `send()` is recorded on it once a
    /// connection to a server has been established.
    ratelimit: Ratelimit,

    /// Sending half of the resync request channel (capacity 1); a clone is handed to every
    /// new `Session`.
    pub(crate) resync_request_sender: async_channel::Sender<()>,

    /// Receiving half of the resync request channel.
    pub(crate) resync_request_receiver: async_channel::Receiver<()>,
}
120
/// Credentials used to build the XOAUTH2 authentication response.
#[derive(Debug)]
struct OAuth2 {
    /// User name placed into the `user=` part of the response.
    user: String,
    /// Access token placed after `auth=Bearer `.
    access_token: String,
}
126
/// Metadata associated with a server.
///
/// NOTE(review): the code that fills this struct is not visible here; the field notes below
/// are derived from names and types only and should be confirmed.
#[derive(Debug, Default)]
pub(crate) struct ServerMetadata {
    /// Optional server comment (presumably the IMAP METADATA comment entry; confirm).
    pub comment: Option<String>,

    /// Optional administrator contact (presumably the IMAP METADATA admin entry; confirm).
    pub admin: Option<String>,

    /// Optional iroh relay URL.
    pub iroh_relay: Option<Url>,

    /// ICE servers that are not resolved yet.
    pub ice_servers: Vec<UnresolvedIceServer>,

    /// Timestamp related to the expiration of `ice_servers`
    /// (presumably seconds since the Unix epoch; confirm).
    pub ice_servers_expiration_timestamp: i64,
}
150
151impl async_imap::Authenticator for OAuth2 {
152 type Response = String;
153
154 fn process(&mut self, _data: &[u8]) -> Self::Response {
155 format!(
156 "user={}\x01auth=Bearer {}\x01\x01",
157 self.user, self.access_token
158 )
159 }
160}
161
/// Classification of an IMAP folder.
///
/// `resync_folders()` skips folders classified as `Virtual` or `Unknown`.
#[derive(Debug, Display, PartialEq, Eq, Clone, Copy)]
pub enum FolderMeaning {
    /// Folder with no recognized meaning.
    Unknown,

    /// Spam folder.
    Spam,
    /// Inbox folder.
    Inbox,
    /// Trash folder.
    Trash,

    /// Virtual folder.
    /// NOTE(review): presumably a server-side view rather than a real mailbox; confirm.
    Virtual,
}
179
180struct UidGrouper<T: Iterator<Item = (i64, u32, String)>> {
181 inner: Peekable<T>,
182}
183
184impl<T, I> From<I> for UidGrouper<T>
185where
186 T: Iterator<Item = (i64, u32, String)>,
187 I: IntoIterator<IntoIter = T>,
188{
189 fn from(inner: I) -> Self {
190 Self {
191 inner: inner.into_iter().peekable(),
192 }
193 }
194}
195
196impl<T: Iterator<Item = (i64, u32, String)>> Iterator for UidGrouper<T> {
197 type Item = (String, Vec<i64>, String);
199
200 #[expect(clippy::arithmetic_side_effects)]
201 fn next(&mut self) -> Option<Self::Item> {
202 let (_, _, folder) = self.inner.peek().cloned()?;
203
204 let mut uid_set = String::new();
205 let mut rowid_set = Vec::new();
206
207 while uid_set.len() < 1000 {
208 if let Some((start_rowid, start_uid, _)) = self
210 .inner
211 .next_if(|(_, _, start_folder)| start_folder == &folder)
212 {
213 rowid_set.push(start_rowid);
214 let mut end_uid = start_uid;
215
216 while let Some((next_rowid, next_uid, _)) =
217 self.inner.next_if(|(_, next_uid, next_folder)| {
218 next_folder == &folder && (*next_uid == end_uid + 1 || *next_uid == end_uid)
219 })
220 {
221 end_uid = next_uid;
222 rowid_set.push(next_rowid);
223 }
224
225 let uid_range = UidRange {
226 start: start_uid,
227 end: end_uid,
228 };
229 if !uid_set.is_empty() {
230 uid_set.push(',');
231 }
232 uid_set.push_str(&uid_range.to_string());
233 } else {
234 break;
235 }
236 }
237
238 Some((folder, rowid_set, uid_set))
239 }
240}
241
242impl Imap {
243 pub async fn new(
245 context: &Context,
246 transport_id: u32,
247 param: ConfiguredLoginParam,
248 idle_interrupt_receiver: Receiver<()>,
249 ) -> Result<Self> {
250 let lp = param.imap.clone();
251 let password = param.imap_password.clone();
252 let proxy_config = ProxyConfig::load(context).await?;
253 let addr = ¶m.addr;
254 let strict_tls = param.strict_tls(proxy_config.is_some());
255 let oauth2 = param.oauth2;
256 let folder = param
257 .imap_folder
258 .clone()
259 .unwrap_or_else(|| "INBOX".to_string());
260 ensure_and_debug_assert!(!folder.is_empty(), "Watched folder name cannot be empty");
261 let (resync_request_sender, resync_request_receiver) = async_channel::bounded(1);
262 Ok(Imap {
263 transport_id,
264 idle_interrupt_receiver,
265 addr: addr.to_string(),
266 lp,
267 password,
268 proxy_config,
269 strict_tls,
270 oauth2,
271 folder,
272 authentication_failed_once: false,
273 connectivity: Default::default(),
274 conn_last_try: UNIX_EPOCH,
275 conn_backoff_ms: 0,
276 ratelimit: Ratelimit::new(Duration::new(120, 0), 2.0),
278 resync_request_sender,
279 resync_request_receiver,
280 })
281 }
282
283 pub async fn new_configured(
285 context: &Context,
286 idle_interrupt_receiver: Receiver<()>,
287 ) -> Result<Self> {
288 let (transport_id, param) = ConfiguredLoginParam::load(context)
289 .await?
290 .context("Not configured")?;
291 let imap = Self::new(context, transport_id, param, idle_interrupt_receiver).await?;
292 Ok(imap)
293 }
294
    /// Returns the ID of the transport this IMAP state was created for.
    pub fn transport_id(&self) -> u32 {
        self.transport_id
    }
299
    /// Connects to the IMAP server and logs in, returning a new [`Session`].
    ///
    /// Before connecting, waits for the larger of the reconnect backoff and the rate limit;
    /// the wait ends early when an idle interrupt arrives. Then the prioritized connection
    /// candidates are tried in order until one accepts the connection and the login.
    ///
    /// `configuring` suppresses the "wrong password" device message.
    ///
    /// # Errors
    ///
    /// Returns the first connection or login error if no candidate succeeds, or an error
    /// stating that no candidates were provided. Failing to obtain an OAuth 2 token, to
    /// determine capabilities or to enable compression aborts immediately.
    pub(crate) async fn connect(
        &mut self,
        context: &Context,
        configuring: bool,
    ) -> Result<Session> {
        let now = tools::Time::now();
        // Time left until `conn_last_try + conn_backoff_ms`, clamped to be non-negative.
        // `min(conn_last_try, now)` guards against a last-try time lying in the future.
        let until_can_send = max(
            min(self.conn_last_try, now)
                .checked_add(Duration::from_millis(self.conn_backoff_ms))
                .unwrap_or(now),
            now,
        )
        .duration_since(now)?;
        let ratelimit_duration = max(until_can_send, self.ratelimit.until_can_send());
        if !ratelimit_duration.is_zero() {
            warn!(
                context,
                "IMAP got rate limited, waiting for {} until can connect.",
                duration_to_str(ratelimit_duration),
            );
            // Sleep for the computed duration unless an idle interrupt arrives first.
            let interrupted = async {
                tokio::time::sleep(ratelimit_duration).await;
                false
            }
            .race(self.idle_interrupt_receiver.recv().map(|_| true))
            .await;
            if interrupted {
                info!(
                    context,
                    "Connecting to IMAP without waiting for ratelimit due to interrupt."
                );
            }
        }

        info!(context, "Connecting to IMAP server.");
        self.connectivity.set_connecting(context);

        self.conn_last_try = tools::Time::now();
        const BACKOFF_MIN_MS: u64 = 2000;
        const BACKOFF_MAX_MS: u64 = 80_000;
        // Grow the backoff by a random factor between 1.5 and 2. Capping at half the
        // maximum first keeps the result within BACKOFF_MAX_MS; the final `max` applies the
        // lower bound.
        self.conn_backoff_ms = min(self.conn_backoff_ms, BACKOFF_MAX_MS / 2);
        self.conn_backoff_ms = self.conn_backoff_ms.saturating_add(rand::random_range(
            (self.conn_backoff_ms / 2)..=self.conn_backoff_ms,
        ));
        self.conn_backoff_ms = max(BACKOFF_MIN_MS, self.conn_backoff_ms);

        let login_params = prioritize_server_login_params(&context.sql, &self.lp, "imap").await?;
        let mut first_error = None;
        for lp in login_params {
            info!(context, "IMAP trying to connect to {}.", &lp.connection);
            let connection_candidate = lp.connection.clone();
            let client = match Client::connect(
                context,
                self.proxy_config.clone(),
                self.strict_tls,
                &connection_candidate,
            )
            .await
            .with_context(|| format!("IMAP failed to connect to {connection_candidate}"))
            {
                Ok(client) => client,
                Err(err) => {
                    // Remember only the first failure and try the next candidate.
                    warn!(context, "{err:#}.");
                    first_error.get_or_insert(err);
                    continue;
                }
            };

            // The server is reachable: reset the backoff to its minimum and register the
            // attempt with the rate limiter.
            self.conn_backoff_ms = BACKOFF_MIN_MS;
            self.ratelimit.send();

            let imap_user: &str = lp.user.as_ref();
            let imap_pw: &str = &self.password;

            let login_res = if self.oauth2 {
                info!(context, "Logging into IMAP server with OAuth 2.");
                let addr: &str = self.addr.as_ref();

                let token = get_oauth2_access_token(context, addr, imap_pw, true)
                    .await?
                    .context("IMAP could not get OAUTH token")?;
                let auth = OAuth2 {
                    user: imap_user.into(),
                    access_token: token,
                };
                client.authenticate("XOAUTH2", auth).await
            } else {
                info!(context, "Logging into IMAP server with LOGIN.");
                client.login(imap_user, imap_pw).await
            };

            match login_res {
                Ok(mut session) => {
                    let capabilities = determine_capabilities(&mut session).await?;
                    let resync_request_sender = self.resync_request_sender.clone();

                    // Wrap the stream in a compressed one if the server allows it.
                    let session = if capabilities.can_compress {
                        info!(context, "Enabling IMAP compression.");
                        let compressed_session = session
                            .compress(|s| {
                                let session_stream: Box<dyn SessionStream> = Box::new(s);
                                session_stream
                            })
                            .await
                            .context("Failed to enable IMAP compression")?;
                        Session::new(
                            compressed_session,
                            capabilities,
                            resync_request_sender,
                            self.transport_id,
                        )
                    } else {
                        Session::new(
                            session,
                            capabilities,
                            resync_request_sender,
                            self.transport_id,
                        )
                    };

                    // Publish the server ID reported in the capabilities on the context.
                    let mut lock = context.server_id.write().await;
                    lock.clone_from(&session.capabilities.server_id);

                    self.authentication_failed_once = false;
                    context.emit_event(EventType::ImapConnected(format!(
                        "IMAP-LOGIN as {}",
                        lp.user
                    )));
                    self.connectivity.set_preparing(context);
                    info!(context, "Successfully logged into IMAP server.");
                    return Ok(session);
                }

                Err(err) => {
                    let imap_user = lp.user.to_owned();
                    let message = stock_str::cannot_login(context, &imap_user);

                    warn!(context, "IMAP failed to login: {err:#}.");
                    first_error.get_or_insert(format_err!("{message} ({err:#})"));

                    // The guard is held for the rest of this arm.
                    let _lock = context.wrong_pw_warning_mutex.lock().await;
                    if err.to_string().to_lowercase().contains("authentication") {
                        // Notify the user only on a repeated authentication failure, outside
                        // of configuration, and only while `NotifyAboutWrongPw` is set.
                        if self.authentication_failed_once
                            && !configuring
                            && context.get_config_bool(Config::NotifyAboutWrongPw).await?
                        {
                            let mut msg = Message::new_text(message);
                            if let Err(e) = chat::add_device_msg_with_importance(
                                context,
                                None,
                                Some(&mut msg),
                                true,
                            )
                            .await
                            {
                                warn!(context, "Failed to add device message: {e:#}.");
                            } else {
                                // Message delivered: unset the config so it is not repeated.
                                context
                                    .set_config_internal(Config::NotifyAboutWrongPw, None)
                                    .await
                                    .log_err(context)
                                    .ok();
                            }
                        } else {
                            self.authentication_failed_once = true;
                        }
                    } else {
                        self.authentication_failed_once = false;
                    }
                }
            }
        }

        Err(first_error.unwrap_or_else(|| format_err!("No IMAP connection candidates provided")))
    }
482
483 pub(crate) async fn prepare(&mut self, context: &Context) -> Result<Session> {
488 let configuring = false;
489 let session = match self.connect(context, configuring).await {
490 Ok(session) => session,
491 Err(err) => {
492 self.connectivity.set_err(context, format!("{err:#}"));
493 return Err(err);
494 }
495 };
496
497 Ok(session)
498 }
499
500 pub async fn fetch_move_delete(
505 &mut self,
506 context: &Context,
507 session: &mut Session,
508 watch_folder: &str,
509 ) -> Result<()> {
510 ensure_and_debug_assert!(!watch_folder.is_empty(), "Watched folder cannot be empty");
511 if !context.sql.is_open().await {
512 bail!("IMAP operation attempted while it is torn down");
514 }
515
516 let msgs_fetched = self
517 .fetch_new_messages(context, session, watch_folder)
518 .await
519 .context("fetch_new_messages")?;
520 if msgs_fetched && context.get_config_delete_device_after().await?.is_some() {
521 context.scheduler.interrupt_ephemeral_task().await;
526 }
527
528 session
529 .move_delete_messages(context, watch_folder)
530 .await
531 .context("move_delete_messages")?;
532
533 Ok(())
534 }
535
536 #[expect(clippy::arithmetic_side_effects)]
540 pub(crate) async fn fetch_new_messages(
541 &mut self,
542 context: &Context,
543 session: &mut Session,
544 folder: &str,
545 ) -> Result<bool> {
546 let transport_id = session.transport_id();
547
548 let folder_exists = session
549 .select_with_uidvalidity(context, folder)
550 .await
551 .with_context(|| format!("Failed to select folder {folder:?}"))?;
552
553 if !session.new_mail {
554 info!(
555 context,
556 "Transport {transport_id}: No new emails in folder {folder:?}."
557 );
558 return Ok(false);
559 }
560 session.new_mail = false;
563
564 if !folder_exists {
565 return Ok(false);
566 }
567
568 let mut read_cnt = 0;
569 loop {
570 let (n, fetch_more) =
571 Box::pin(self.fetch_new_msg_batch(context, session, folder)).await?;
572 read_cnt += n;
573 if !fetch_more {
574 return Ok(read_cnt > 0);
575 }
576 }
577 }
578
    /// Prefetches one batch of up to 500 UIDs starting at the stored UIDNEXT of `folder`,
    /// records the messages in the `imap` table, downloads those that should be downloaded
    /// now, and advances the stored UIDNEXT.
    ///
    /// Returns the number of prefetched messages and whether another batch should be fetched
    /// (the prefetched UID window ended before the UIDNEXT of the selected mailbox).
    ///
    /// # Errors
    ///
    /// Fails on database errors, if prefetching fails, if a prefetch response has no size,
    /// if no mailbox is selected, or if downloading fails. A download failure is returned
    /// only after UIDNEXT has been updated for the messages that were fetched.
    #[expect(clippy::arithmetic_side_effects)]
    async fn fetch_new_msg_batch(
        &mut self,
        context: &Context,
        session: &mut Session,
        folder: &str,
    ) -> Result<(usize, bool)> {
        let transport_id = self.transport_id;
        let uid_validity = get_uidvalidity(context, transport_id, folder).await?;
        let old_uid_next = get_uid_next(context, transport_id, folder).await?;
        info!(
            context,
            "fetch_new_msg_batch({folder}): UIDVALIDITY={uid_validity}, UIDNEXT={old_uid_next}."
        );

        let uids_to_prefetch = 500;
        let msgs = session
            .prefetch(old_uid_next, uids_to_prefetch)
            .await
            .context("prefetch")?;
        let read_cnt = msgs.len();
        // Held until the end of the function.
        let _fetch_msgs_lock_guard = context.fetch_msgs_mutex.lock().await;

        // UIDs to download in this batch.
        let mut uids_fetch: Vec<u32> = Vec::new();
        // Message-IDs of messages carrying the post-message header.
        let mut available_post_msgs: Vec<String> = Vec::new();
        // Message-IDs to be recorded in the `download` table instead of being fetched now.
        let mut download_later: Vec<String> = Vec::new();
        let mut uid_message_ids = BTreeMap::new();
        let mut largest_uid_skipped = None;

        // A limit of zero counts as "no limit".
        let download_limit: Option<u32> = context
            .get_config_parsed(Config::DownloadLimit)
            .await?
            .filter(|&l| 0 < l);

        for (uid, ref fetch_response) in msgs {
            let headers = match get_fetch_headers(fetch_response) {
                Ok(headers) => headers,
                Err(err) => {
                    warn!(context, "Failed to parse FETCH headers: {err:#}.");
                    continue;
                }
            };

            let message_id = prefetch_get_message_id(&headers);
            let size = fetch_response
                .size
                .context("imap fetch response does not contain size")?;

            // NOTE(review): presumably true when the message is known locally and marked as
            // deleted; confirm against `rfc724_mid_exists_ex`.
            let delete = if let Some(message_id) = &message_id {
                message::rfc724_mid_exists_ex(context, message_id, "deleted=1")
                    .await?
                    .is_some_and(|(_msg_id, deleted)| deleted)
            } else {
                false
            };

            // Messages without a Message-ID get a generated one.
            let message_id = message_id.unwrap_or_else(create_message_id);

            if delete {
                info!(context, "Deleting locally deleted message {message_id}.");
            }

            // An empty target schedules the message for deletion; a target equal to the
            // folder leaves it in place.
            let target = if delete { "" } else { folder };

            context
                .sql
                .execute(
                    "INSERT INTO imap (transport_id, rfc724_mid, folder, uid, uidvalidity, target)
                       VALUES         (?, ?, ?, ?, ?, ?)
                       ON CONFLICT(transport_id, folder, uid, uidvalidity)
                       DO UPDATE SET rfc724_mid=excluded.rfc724_mid,
                                     target=excluded.target",
                    (
                        self.transport_id,
                        &message_id,
                        &folder,
                        uid,
                        uid_validity,
                        target,
                    ),
                )
                .await?;

            // Only messages that stay in the folder are candidates for download.
            if folder == target
                && prefetch_should_download(context, &headers, &message_id, fetch_response.flags())
                    .await
                    .context("prefetch_should_download")?
            {
                if headers
                    .get_header_value(HeaderDef::ChatIsPostMessage)
                    .is_some()
                {
                    info!(context, "{message_id:?} is a post-message.");
                    available_post_msgs.push(message_id.clone());

                    // Post-messages are downloaded immediately only by bots, and only when
                    // within the download limit.
                    let is_bot = context.get_config_bool(Config::Bot).await?;
                    if is_bot && download_limit.is_none_or(|download_limit| size <= download_limit)
                    {
                        uids_fetch.push(uid);
                        uid_message_ids.insert(uid, message_id);
                    } else {
                        if download_limit.is_none_or(|download_limit| size <= download_limit) {
                            download_later.push(message_id.clone());
                        }
                        largest_uid_skipped = Some(uid);
                    }
                } else {
                    info!(context, "{message_id:?} is not a post-message.");
                    if download_limit.is_none_or(|download_limit| size <= download_limit) {
                        uids_fetch.push(uid);
                        uid_message_ids.insert(uid, message_id);
                    } else {
                        download_later.push(message_id.clone());
                        largest_uid_skipped = Some(uid);
                    }
                };
            } else {
                largest_uid_skipped = Some(uid);
            }
        }

        if !uids_fetch.is_empty() {
            self.connectivity.set_working(context);
        }

        let (sender, receiver) = async_channel::unbounded();

        let mut received_msgs = Vec::with_capacity(uids_fetch.len());
        let mailbox_uid_next = session
            .selected_mailbox
            .as_ref()
            .with_context(|| format!("Expected {folder:?} to be selected"))?
            .uid_next
            .unwrap_or_default();

        // Consumer side: tracks the largest UID reported by the downloader and collects the
        // received messages until the channel is closed.
        let update_uids_future = async {
            let mut largest_uid_fetched: u32 = 0;

            while let Ok((uid, received_msg_opt)) = receiver.recv().await {
                largest_uid_fetched = max(largest_uid_fetched, uid);
                if let Some(received_msg) = received_msg_opt {
                    received_msgs.push(received_msg)
                }
            }

            largest_uid_fetched
        };

        // Producer side: downloads the messages and reports each UID through `sender`.
        let actually_download_messages_future = async {
            session
                .fetch_many_msgs(context, folder, uids_fetch, &uid_message_ids, sender)
                .await
                .context("fetch_many_msgs")
        };

        let (largest_uid_fetched, fetch_res) =
            tokio::join!(update_uids_future, actually_download_messages_future);

        let mut new_uid_next = largest_uid_fetched + 1;
        // UIDNEXT may skip past messages that were not downloaded only if the download
        // succeeded. On failure it advances no further than the last fetched UID.
        let fetch_more = fetch_res.is_ok() && {
            let prefetch_uid_next = old_uid_next + uids_to_prefetch;
            // The whole prefetched window has been handled, bounded by the mailbox UIDNEXT.
            new_uid_next = max(new_uid_next, min(prefetch_uid_next, mailbox_uid_next));

            new_uid_next = max(new_uid_next, largest_uid_skipped.unwrap_or(0) + 1);

            prefetch_uid_next < mailbox_uid_next
        };
        // Never move the stored UIDNEXT backwards.
        if new_uid_next > old_uid_next {
            set_uid_next(context, self.transport_id, folder, new_uid_next).await?;
        }

        info!(context, "{} mails read from \"{}\".", read_cnt, folder);

        if !received_msgs.is_empty() {
            context.emit_event(EventType::IncomingMsgBunch);
        }

        chat::mark_old_messages_as_noticed(context, received_msgs).await?;

        if fetch_res.is_ok() {
            info!(
                context,
                "available_post_msgs: {}, download_later: {}.",
                available_post_msgs.len(),
                download_later.len(),
            );
            // Record post-messages and postponed downloads in a single transaction.
            let trans_fn = |t: &mut rusqlite::Transaction| {
                let mut stmt = t.prepare("INSERT OR IGNORE INTO available_post_msgs VALUES (?)")?;
                for rfc724_mid in available_post_msgs {
                    stmt.execute((rfc724_mid,))
                        .context("INSERT OR IGNORE INTO available_post_msgs")?;
                }
                let mut stmt =
                    t.prepare("INSERT OR IGNORE INTO download (rfc724_mid, msg_id) VALUES (?,0)")?;
                for rfc724_mid in download_later {
                    stmt.execute((rfc724_mid,))
                        .context("INSERT OR IGNORE INTO download")?;
                }
                Ok(())
            };
            context.sql.transaction(trans_fn).await?;
        }

        // Report a download failure only now, after the bookkeeping above.
        fetch_res?;

        Ok((read_cnt, fetch_more))
    }
818}
819
820impl Session {
821 pub(crate) async fn resync_folders(&mut self, context: &Context) -> Result<()> {
823 let all_folders = self
824 .list_folders()
825 .await
826 .context("listing folders for resync")?;
827 for folder in all_folders {
828 let folder_meaning = get_folder_meaning(&folder);
829 if !matches!(
830 folder_meaning,
831 FolderMeaning::Virtual | FolderMeaning::Unknown
832 ) {
833 self.resync_folder_uids(context, folder.name(), folder_meaning)
834 .await?;
835 }
836 }
837 Ok(())
838 }
839
840 pub(crate) async fn resync_folder_uids(
847 &mut self,
848 context: &Context,
849 folder: &str,
850 folder_meaning: FolderMeaning,
851 ) -> Result<()> {
852 let uid_validity;
853 let mut msgs = BTreeMap::new();
855
856 let folder_exists = self.select_with_uidvalidity(context, folder).await?;
857 let transport_id = self.transport_id();
858 if folder_exists {
859 let mut list = self
860 .uid_fetch("1:*", RFC724MID_UID)
861 .await
862 .with_context(|| format!("Can't resync folder {folder}"))?;
863 while let Some(fetch) = list.try_next().await? {
864 let headers = match get_fetch_headers(&fetch) {
865 Ok(headers) => headers,
866 Err(err) => {
867 warn!(context, "Failed to parse FETCH headers: {}", err);
868 continue;
869 }
870 };
871 let message_id = prefetch_get_message_id(&headers);
872
873 if let (Some(uid), Some(rfc724_mid)) = (fetch.uid, message_id) {
874 msgs.insert(
875 uid,
876 (
877 rfc724_mid,
878 target_folder(context, folder, folder_meaning, &headers).await?,
879 ),
880 );
881 }
882 }
883
884 info!(
885 context,
886 "resync_folder_uids: Collected {} message IDs in {folder}.",
887 msgs.len(),
888 );
889
890 uid_validity = get_uidvalidity(context, transport_id, folder).await?;
891 } else {
892 warn!(context, "resync_folder_uids: No folder {folder}.");
893 uid_validity = 0;
894 }
895
896 context
898 .sql
899 .transaction(move |transaction| {
900 transaction.execute("DELETE FROM imap WHERE transport_id=? AND folder=?", (transport_id, folder,))?;
901 for (uid, (rfc724_mid, target)) in &msgs {
902 transaction.execute(
905 "INSERT INTO imap (transport_id, rfc724_mid, folder, uid, uidvalidity, target)
906 VALUES (?, ?, ?, ?, ?, ?)
907 ON CONFLICT(transport_id, folder, uid, uidvalidity)
908 DO UPDATE SET rfc724_mid=excluded.rfc724_mid,
909 target=excluded.target",
910 (transport_id, rfc724_mid, folder, uid, uid_validity, target),
911 )?;
912 }
913 Ok(())
914 })
915 .await?;
916 Ok(())
917 }
918
919 async fn delete_message_batch(
922 &mut self,
923 context: &Context,
924 uid_set: &str,
925 row_ids: Vec<i64>,
926 ) -> Result<()> {
927 self.add_flag_finalized_with_set(uid_set, "\\Deleted")
929 .await?;
930 context
931 .sql
932 .transaction(|transaction| {
933 let mut stmt = transaction.prepare("DELETE FROM imap WHERE id = ?")?;
934 for row_id in row_ids {
935 stmt.execute((row_id,))?;
936 }
937 Ok(())
938 })
939 .await
940 .context("Cannot remove deleted messages from imap table")?;
941
942 context.emit_event(EventType::ImapMessageDeleted(format!(
943 "IMAP messages {uid_set} marked as deleted"
944 )));
945 Ok(())
946 }
947
948 pub(crate) async fn delete_all_messages(
950 &mut self,
951 context: &Context,
952 folder: &str,
953 ) -> Result<()> {
954 let transport_id = self.transport_id();
955
956 if self.select_with_uidvalidity(context, folder).await? {
957 self.add_flag_finalized_with_set("1:*", "\\Deleted").await?;
958 self.selected_folder_needs_expunge = true;
959 context
960 .sql
961 .execute(
962 "DELETE FROM imap WHERE transport_id=? AND folder=?",
963 (transport_id, folder),
964 )
965 .await?;
966 }
967
968 Ok(())
969 }
970
971 async fn move_message_batch(
974 &mut self,
975 context: &Context,
976 set: &str,
977 row_ids: Vec<i64>,
978 target: &str,
979 ) -> Result<()> {
980 if self.can_move() {
981 match self.uid_mv(set, &target).await {
982 Ok(()) => {
983 context
985 .sql
986 .transaction(|transaction| {
987 let mut stmt = transaction.prepare("DELETE FROM imap WHERE id = ?")?;
988 for row_id in row_ids {
989 stmt.execute((row_id,))?;
990 }
991 Ok(())
992 })
993 .await
994 .context("Cannot delete moved messages from imap table")?;
995 context.emit_event(EventType::ImapMessageMoved(format!(
996 "IMAP messages {set} moved to {target}"
997 )));
998 return Ok(());
999 }
1000 Err(err) => {
1001 warn!(
1002 context,
1003 "Cannot move messages, fallback to COPY/DELETE {} to {}: {}",
1004 set,
1005 target,
1006 err
1007 );
1008 }
1009 }
1010 }
1011
1012 info!(
1015 context,
1016 "Server does not support MOVE, fallback to COPY/DELETE {} to {}", set, target
1017 );
1018 self.uid_copy(&set, &target).await?;
1019 context
1020 .sql
1021 .transaction(|transaction| {
1022 let mut stmt = transaction.prepare("UPDATE imap SET target='' WHERE id = ?")?;
1023 for row_id in row_ids {
1024 stmt.execute((row_id,))?;
1025 }
1026 Ok(())
1027 })
1028 .await
1029 .context("Cannot plan deletion of messages")?;
1030 context.emit_event(EventType::ImapMessageMoved(format!(
1031 "IMAP messages {set} copied to {target}"
1032 )));
1033 Ok(())
1034 }
1035
1036 async fn move_delete_messages(&mut self, context: &Context, folder: &str) -> Result<()> {
1040 let transport_id = self.transport_id();
1041 let rows = context
1042 .sql
1043 .query_map_vec(
1044 "SELECT id, uid, target FROM imap
1045 WHERE folder = ?
1046 AND transport_id = ?
1047 AND target != folder
1048 ORDER BY target, uid",
1049 (folder, transport_id),
1050 |row| {
1051 let rowid: i64 = row.get(0)?;
1052 let uid: u32 = row.get(1)?;
1053 let target: String = row.get(2)?;
1054 Ok((rowid, uid, target))
1055 },
1056 )
1057 .await?;
1058
1059 for (target, rowid_set, uid_set) in UidGrouper::from(rows) {
1060 let folder_exists = self.select_with_uidvalidity(context, folder).await?;
1065 ensure!(folder_exists, "No folder {folder}");
1066
1067 if target.is_empty() {
1069 self.delete_message_batch(context, &uid_set, rowid_set)
1070 .await
1071 .with_context(|| format!("cannot delete batch of messages {:?}", &uid_set))?;
1072 } else {
1073 self.move_message_batch(context, &uid_set, rowid_set, &target)
1074 .await
1075 .with_context(|| {
1076 format!(
1077 "cannot move batch of messages {:?} to folder {:?}",
1078 &uid_set, target
1079 )
1080 })?;
1081 }
1082 }
1083
1084 if let Err(err) = self.maybe_close_folder(context).await {
1087 warn!(context, "Failed to close folder: {err:#}.");
1088 }
1089
1090 Ok(())
1091 }
1092
1093 pub(crate) async fn store_seen_flags_on_imap(&mut self, context: &Context) -> Result<()> {
1095 if context.get_config_bool(Config::TeamProfile).await? {
1096 return Ok(());
1097 }
1098
1099 context
1100 .sql
1101 .execute(
1102 "DELETE FROM imap_markseen WHERE id NOT IN (SELECT imap.id FROM imap)",
1103 (),
1104 )
1105 .await?;
1106
1107 let transport_id = self.transport_id();
1108 let mut rows = context
1109 .sql
1110 .query_map_vec(
1111 "SELECT imap.id, uid, folder FROM imap, imap_markseen
1112 WHERE imap.id = imap_markseen.id
1113 AND imap.transport_id=?
1114 AND target = folder",
1115 (transport_id,),
1116 |row| {
1117 let rowid: i64 = row.get(0)?;
1118 let uid: u32 = row.get(1)?;
1119 let folder: String = row.get(2)?;
1120 Ok((rowid, uid, folder))
1121 },
1122 )
1123 .await?;
1124
1125 rows.sort_unstable_by(|(_rowid1, uid1, folder1), (_rowid2, uid2, folder2)| {
1132 (folder1, uid1).cmp(&(folder2, uid2))
1133 });
1134
1135 for (folder, rowid_set, uid_set) in UidGrouper::from(rows) {
1136 let folder_exists = match self.select_with_uidvalidity(context, &folder).await {
1137 Err(err) => {
1138 warn!(
1139 context,
1140 "store_seen_flags_on_imap: Failed to select {folder}, will retry later: {err:#}."
1141 );
1142 continue;
1143 }
1144 Ok(folder_exists) => folder_exists,
1145 };
1146 if !folder_exists {
1147 warn!(context, "store_seen_flags_on_imap: No folder {folder}.");
1148 } else if let Err(err) = self.add_flag_finalized_with_set(&uid_set, "\\Seen").await {
1149 warn!(
1150 context,
1151 "Cannot mark messages {uid_set} in {folder} as seen, will retry later: {err:#}."
1152 );
1153 continue;
1154 } else {
1155 info!(
1156 context,
1157 "Marked messages {} in folder {} as seen.", uid_set, folder
1158 );
1159 }
1160 context
1161 .sql
1162 .transaction(|transaction| {
1163 let mut stmt = transaction.prepare("DELETE FROM imap_markseen WHERE id = ?")?;
1164 for rowid in rowid_set {
1165 stmt.execute((rowid,))?;
1166 }
1167 Ok(())
1168 })
1169 .await
1170 .context("Cannot remove messages marked as seen from imap_markseen table")?;
1171 }
1172
1173 Ok(())
1174 }
1175
1176 pub(crate) async fn sync_seen_flags(&mut self, context: &Context, folder: &str) -> Result<()> {
1178 if !self.can_condstore() {
1179 info!(
1180 context,
1181 "Server does not support CONDSTORE, skipping flag synchronization."
1182 );
1183 return Ok(());
1184 }
1185
1186 if context.get_config_bool(Config::TeamProfile).await? {
1187 return Ok(());
1188 }
1189
1190 let folder_exists = self
1191 .select_with_uidvalidity(context, folder)
1192 .await
1193 .context("Failed to select folder")?;
1194 if !folder_exists {
1195 return Ok(());
1196 }
1197
1198 let mailbox = self
1199 .selected_mailbox
1200 .as_ref()
1201 .with_context(|| format!("No mailbox selected, folder: {folder}"))?;
1202
1203 if mailbox.highest_modseq.is_none() {
1206 info!(
1207 context,
1208 "Mailbox {} does not support mod-sequences, skipping flag synchronization.", folder
1209 );
1210 return Ok(());
1211 }
1212
1213 let transport_id = self.transport_id();
1214 let mut updated_chat_ids = BTreeSet::new();
1215 let uid_validity = get_uidvalidity(context, transport_id, folder)
1216 .await
1217 .with_context(|| format!("failed to get UID validity for folder {folder}"))?;
1218 let mut highest_modseq = get_modseq(context, transport_id, folder)
1219 .await
1220 .with_context(|| format!("failed to get MODSEQ for folder {folder}"))?;
1221 let mut list = self
1222 .uid_fetch("1:*", format!("(FLAGS) (CHANGEDSINCE {highest_modseq})"))
1223 .await
1224 .context("failed to fetch flags")?;
1225
1226 let mut got_unsolicited_fetch = false;
1227
1228 while let Some(fetch) = list
1229 .try_next()
1230 .await
1231 .context("failed to get FETCH result")?
1232 {
1233 let uid = if let Some(uid) = fetch.uid {
1234 uid
1235 } else {
1236 info!(context, "FETCH result contains no UID, skipping");
1237 got_unsolicited_fetch = true;
1238 continue;
1239 };
1240 let is_seen = fetch.flags().any(|flag| flag == Flag::Seen);
1241 if is_seen
1242 && let Some(chat_id) = mark_seen_by_uid(context, transport_id, folder, uid_validity, uid)
1243 .await
1244 .with_context(|| {
1245 format!("Transport {transport_id}: Failed to update seen status for msg {folder}/{uid}")
1246 })?
1247 {
1248 updated_chat_ids.insert(chat_id);
1249 }
1250
1251 if let Some(modseq) = fetch.modseq {
1252 if modseq > highest_modseq {
1253 highest_modseq = modseq;
1254 }
1255 } else {
1256 warn!(context, "FETCH result contains no MODSEQ");
1257 }
1258 }
1259 drop(list);
1260
1261 if got_unsolicited_fetch {
1262 info!(context, "Got unsolicited fetch, will skip idle");
1267 self.new_mail = true;
1268 }
1269
1270 set_modseq(context, transport_id, folder, highest_modseq)
1271 .await
1272 .with_context(|| format!("failed to set MODSEQ for folder {folder}"))?;
1273 if !updated_chat_ids.is_empty() {
1274 context.on_archived_chats_maybe_noticed();
1275 }
1276 for updated_chat_id in updated_chat_ids {
1277 context.emit_event(EventType::MsgsNoticed(updated_chat_id));
1278 chatlist_events::emit_chatlist_item_changed(context, updated_chat_id);
1279 }
1280
1281 Ok(())
1282 }
1283
1284 #[expect(clippy::arithmetic_side_effects)]
1299 pub(crate) async fn fetch_many_msgs(
1300 &mut self,
1301 context: &Context,
1302 folder: &str,
1303 request_uids: Vec<u32>,
1304 uid_message_ids: &BTreeMap<u32, String>,
1305 received_msgs_channel: Sender<(u32, Option<ReceivedMsg>)>,
1306 ) -> Result<()> {
1307 if request_uids.is_empty() {
1308 return Ok(());
1309 }
1310
1311 for (request_uids, set) in build_sequence_sets(&request_uids)? {
1312 info!(context, "Starting UID FETCH of message set \"{}\".", set);
1313 let mut fetch_responses = self.uid_fetch(&set, BODY_FULL).await.with_context(|| {
1314 format!("fetching messages {} from folder \"{}\"", &set, folder)
1315 })?;
1316
1317 let mut uid_msgs = HashMap::with_capacity(request_uids.len());
1320
1321 let mut count = 0;
1322 for &request_uid in &request_uids {
1323 let mut fetch_response = uid_msgs.remove(&request_uid);
1325
1326 while fetch_response.is_none() {
1328 let Some(next_fetch_response) = fetch_responses
1329 .try_next()
1330 .await
1331 .context("Failed to process IMAP FETCH result")?
1332 else {
1333 break;
1335 };
1336
1337 if let Some(next_uid) = next_fetch_response.uid {
1338 if next_uid == request_uid {
1339 fetch_response = Some(next_fetch_response);
1340 } else if !request_uids.contains(&next_uid) {
1341 info!(
1348 context,
1349 "Skipping not requested FETCH response for UID {}.", next_uid
1350 );
1351 } else if uid_msgs.insert(next_uid, next_fetch_response).is_some() {
1352 warn!(context, "Got duplicated UID {}.", next_uid);
1353 }
1354 } else {
1355 info!(context, "Skipping FETCH response without UID.");
1356 }
1357 }
1358
1359 let fetch_response = match fetch_response {
1360 Some(fetch) => fetch,
1361 None => {
1362 warn!(
1363 context,
1364 "Missed UID {} in the server response.", request_uid
1365 );
1366 continue;
1367 }
1368 };
1369 count += 1;
1370
1371 let is_deleted = fetch_response.flags().any(|flag| flag == Flag::Deleted);
1372 let body = fetch_response.body();
1373
1374 if is_deleted {
1375 info!(context, "Not processing deleted msg {}.", request_uid);
1376 received_msgs_channel.send((request_uid, None)).await?;
1377 continue;
1378 }
1379
1380 let body = if let Some(body) = body {
1381 body
1382 } else {
1383 info!(
1384 context,
1385 "Not processing message {} without a BODY.", request_uid
1386 );
1387 received_msgs_channel.send((request_uid, None)).await?;
1388 continue;
1389 };
1390
1391 let is_seen = fetch_response.flags().any(|flag| flag == Flag::Seen);
1392
1393 let Some(rfc724_mid) = uid_message_ids.get(&request_uid) else {
1394 error!(
1395 context,
1396 "No Message-ID corresponding to UID {} passed in uid_messsage_ids.",
1397 request_uid
1398 );
1399 continue;
1400 };
1401
1402 info!(
1403 context,
1404 "Passing message UID {} to receive_imf().", request_uid
1405 );
1406 let res = receive_imf_inner(context, rfc724_mid, body, is_seen).await;
1407 let received_msg = match res {
1408 Err(err) => {
1409 warn!(context, "receive_imf error: {err:#}.");
1410
1411 let text = format!(
1412 "❌ Failed to receive a message: {err:#}. Core version v{DC_VERSION_STR}. Please report this bug to delta@merlinux.eu or https://support.delta.chat/.",
1413 );
1414 let mut msg = Message::new_text(text);
1415 add_device_msg(context, None, Some(&mut msg)).await?;
1416 None
1417 }
1418 Ok(msg) => msg,
1419 };
1420 received_msgs_channel
1421 .send((request_uid, received_msg))
1422 .await?;
1423 }
1424
1425 while fetch_responses
1432 .try_next()
1433 .await
1434 .context("Failed to drain FETCH responses")?
1435 .is_some()
1436 {}
1437
1438 if count != request_uids.len() {
1439 warn!(
1440 context,
1441 "Failed to fetch all UIDs: got {}, requested {}, we requested the UIDs {:?}.",
1442 count,
1443 request_uids.len(),
1444 request_uids,
1445 );
1446 } else {
1447 info!(
1448 context,
1449 "Successfully received {} UIDs.",
1450 request_uids.len()
1451 );
1452 }
1453 }
1454
1455 Ok(())
1456 }
1457
1458 #[expect(clippy::arithmetic_side_effects)]
1464 pub(crate) async fn update_metadata(&mut self, context: &Context) -> Result<()> {
1465 let mut lock = context.metadata.write().await;
1466
1467 if !self.can_metadata() {
1468 *lock = Some(Default::default());
1469 }
1470 if let Some(ref mut old_metadata) = *lock {
1471 let now = time();
1472
1473 if now + 3600 * 12 < old_metadata.ice_servers_expiration_timestamp {
1475 return Ok(());
1476 }
1477
1478 let mut got_turn_server = false;
1479 if self.can_metadata() {
1480 info!(context, "ICE servers expired, requesting new credentials.");
1481 let mailbox = "";
1482 let options = "";
1483 let metadata = self
1484 .get_metadata(mailbox, options, "(/shared/vendor/deltachat/turn)")
1485 .await?;
1486 for m in metadata {
1487 if m.entry == "/shared/vendor/deltachat/turn"
1488 && let Some(value) = m.value
1489 {
1490 match create_ice_servers_from_metadata(&value).await {
1491 Ok((parsed_timestamp, parsed_ice_servers)) => {
1492 old_metadata.ice_servers_expiration_timestamp = parsed_timestamp;
1493 old_metadata.ice_servers = parsed_ice_servers;
1494 got_turn_server = true;
1495 }
1496 Err(err) => {
1497 warn!(context, "Failed to parse TURN server metadata: {err:#}.");
1498 }
1499 }
1500 }
1501 }
1502 }
1503 if !got_turn_server {
1504 info!(context, "Will use fallback ICE servers.");
1505 old_metadata.ice_servers_expiration_timestamp = time() + 3600 * 24 * 7;
1507 old_metadata.ice_servers = create_fallback_ice_servers();
1508 }
1509 return Ok(());
1510 }
1511
1512 info!(
1513 context,
1514 "Server supports metadata, retrieving server comment and admin contact."
1515 );
1516
1517 let mut comment = None;
1518 let mut admin = None;
1519 let mut iroh_relay = None;
1520 let mut ice_servers = None;
1521 let mut ice_servers_expiration_timestamp = 0;
1522
1523 let mailbox = "";
1524 let options = "";
1525 let metadata = self
1526 .get_metadata(
1527 mailbox,
1528 options,
1529 "(/shared/comment /shared/admin /shared/vendor/deltachat/irohrelay /shared/vendor/deltachat/turn)",
1530 )
1531 .await?;
1532 for m in metadata {
1533 match m.entry.as_ref() {
1534 "/shared/comment" => {
1535 comment = m.value;
1536 }
1537 "/shared/admin" => {
1538 admin = m.value;
1539 }
1540 "/shared/vendor/deltachat/irohrelay" => {
1541 if let Some(value) = m.value {
1542 if let Ok(url) = Url::parse(&value) {
1543 iroh_relay = Some(url);
1544 } else {
1545 warn!(
1546 context,
1547 "Got invalid URL from iroh relay metadata: {:?}.", value
1548 );
1549 }
1550 }
1551 }
1552 "/shared/vendor/deltachat/turn" => {
1553 if let Some(value) = m.value {
1554 match create_ice_servers_from_metadata(&value).await {
1555 Ok((parsed_timestamp, parsed_ice_servers)) => {
1556 ice_servers_expiration_timestamp = parsed_timestamp;
1557 ice_servers = Some(parsed_ice_servers);
1558 }
1559 Err(err) => {
1560 warn!(context, "Failed to parse TURN server metadata: {err:#}.");
1561 }
1562 }
1563 }
1564 }
1565 _ => {}
1566 }
1567 }
1568 let ice_servers = if let Some(ice_servers) = ice_servers {
1569 ice_servers
1570 } else {
1571 ice_servers_expiration_timestamp = time() + 3600 * 24 * 7;
1573 create_fallback_ice_servers()
1574 };
1575
1576 *lock = Some(ServerMetadata {
1577 comment,
1578 admin,
1579 iroh_relay,
1580 ice_servers,
1581 ice_servers_expiration_timestamp,
1582 });
1583 Ok(())
1584 }
1585
1586 pub(crate) async fn register_token(&mut self, context: &Context) -> Result<()> {
1588 if context.push_subscribed.load(Ordering::Relaxed) {
1589 return Ok(());
1590 }
1591
1592 let transport_id = self.transport_id();
1593
1594 let Some(device_token) = context.push_subscriber.device_token().await else {
1595 return Ok(());
1596 };
1597
1598 if self.can_metadata() && self.can_push() {
1599 info!(
1600 context,
1601 "Transport {transport_id}: Subscribing for push notifications."
1602 );
1603
1604 let old_encrypted_device_token =
1605 context.get_config(Config::EncryptedDeviceToken).await?;
1606
1607 let device_token_changed = old_encrypted_device_token.is_none()
1609 || context.get_config(Config::DeviceToken).await?.as_ref() != Some(&device_token);
1610
1611 let new_encrypted_device_token;
1612 if device_token_changed {
1613 let encrypted_device_token = encrypt_device_token(&device_token)
1614 .context("Failed to encrypt device token")?;
1615
1616 let encrypted_device_token_len = encrypted_device_token.len();
1620
1621 context
1627 .set_config_internal(Config::DeviceToken, Some(&device_token))
1628 .await?;
1629 context
1630 .set_config_internal(
1631 Config::EncryptedDeviceToken,
1632 Some(&encrypted_device_token),
1633 )
1634 .await?;
1635
1636 if encrypted_device_token_len <= 4096 {
1637 new_encrypted_device_token = Some(encrypted_device_token);
1638 } else {
1639 warn!(context, "Device token is too long for LITERAL-, ignoring.");
1649 new_encrypted_device_token = None;
1650 }
1651 } else {
1652 new_encrypted_device_token = old_encrypted_device_token;
1653 }
1654
1655 if let Some(encrypted_device_token) = new_encrypted_device_token {
1658 self.run_command_and_check_ok(&format_setmetadata(
1659 "INBOX",
1660 &encrypted_device_token,
1661 ))
1662 .await
1663 .context("SETMETADATA command failed")?;
1664
1665 context.push_subscribed.store(true, Ordering::Relaxed);
1666 }
1667 } else if !context.push_subscriber.heartbeat_subscribed().await {
1668 let context = context.clone();
1669 tokio::spawn(async move { context.push_subscriber.subscribe(&context).await });
1671 }
1672
1673 Ok(())
1674 }
1675}
1676
/// Builds the `SETMETADATA` command that stores `device_token` in the
/// `/private/devicetoken` entry of `folder`.
///
/// The token is sent as a literal in `{<len>+}` form, where `<len>` is the
/// token length in bytes.
fn format_setmetadata(folder: &str, device_token: &str) -> String {
    let literal_len = device_token.len();
    format!(
        "SETMETADATA \"{}\" (/private/devicetoken {{{}+}}\r\n{})",
        folder, literal_len, device_token
    )
}
1683
1684impl Session {
1685 async fn add_flag_finalized_with_set(&mut self, uid_set: &str, flag: &str) -> Result<()> {
1691 if flag == "\\Deleted" {
1692 self.selected_folder_needs_expunge = true;
1693 }
1694 let query = format!("+FLAGS ({flag})");
1695 let mut responses = self
1696 .uid_store(uid_set, &query)
1697 .await
1698 .with_context(|| format!("IMAP failed to store: ({uid_set}, {query})"))?;
1699 while let Some(_response) = responses.try_next().await? {
1700 }
1702 Ok(())
1703 }
1704}
1705
1706impl Session {
1707 fn drain_unsolicited_responses(&self, context: &Context) -> Result<bool> {
1716 use UnsolicitedResponse::*;
1717 use async_imap::imap_proto::Response;
1718 use async_imap::imap_proto::ResponseCode;
1719
1720 let folder = self.selected_folder.as_deref().unwrap_or_default();
1721 let mut should_refetch = false;
1722 while let Ok(response) = self.unsolicited_responses.try_recv() {
1723 match response {
1724 Exists(_) => {
1725 info!(
1726 context,
1727 "Need to refetch {folder:?}, got unsolicited EXISTS {response:?}"
1728 );
1729 should_refetch = true;
1730 }
1731
1732 Expunge(_) | Recent(_) => {}
1733 Other(ref response_data) => {
1734 match response_data.parsed() {
1735 Response::Fetch { .. } => {
1736 info!(
1737 context,
1738 "Need to refetch {folder:?}, got unsolicited FETCH {response:?}"
1739 );
1740 should_refetch = true;
1741 }
1742
1743 Response::Done {
1746 code: Some(ResponseCode::CopyUid(_, _, _)),
1747 ..
1748 } => {}
1749
1750 _ => {
1751 info!(context, "{folder:?}: got unsolicited response {response:?}")
1752 }
1753 }
1754 }
1755 _ => {
1756 info!(context, "{folder:?}: got unsolicited response {response:?}")
1757 }
1758 }
1759 }
1760 Ok(should_refetch)
1761 }
1762}
1763
1764async fn should_move_out_of_spam(
1765 context: &Context,
1766 headers: &[mailparse::MailHeader<'_>],
1767) -> Result<bool> {
1768 if headers.get_header_value(HeaderDef::ChatVersion).is_some() {
1769 return Ok(true);
1780 }
1781
1782 if let Some(msg) = get_prefetch_parent_message(context, headers).await? {
1783 if msg.chat_blocked != Blocked::Not {
1784 return Ok(false);
1786 }
1787 } else {
1788 let from = match mimeparser::get_from(headers) {
1789 Some(f) => f,
1790 None => return Ok(false),
1791 };
1792 let (from_id, blocked_contact, _origin) =
1794 match from_field_to_contact_id(context, &from, None, true, true)
1795 .await
1796 .context("from_field_to_contact_id")?
1797 {
1798 Some(res) => res,
1799 None => {
1800 warn!(
1801 context,
1802 "Contact with From address {:?} cannot exist, not moving out of spam", from
1803 );
1804 return Ok(false);
1805 }
1806 };
1807 if blocked_contact {
1808 return Ok(false);
1810 }
1811
1812 if let Some(chat_id_blocked) = ChatIdBlocked::lookup_by_contact(context, from_id).await? {
1813 if chat_id_blocked.blocked != Blocked::Not {
1814 return Ok(false);
1815 }
1816 } else if from_id != ContactId::SELF {
1817 return Ok(false);
1819 }
1820 }
1821
1822 Ok(true)
1823}
1824
1825async fn spam_target_folder_cfg(
1830 context: &Context,
1831 headers: &[mailparse::MailHeader<'_>],
1832) -> Result<Option<Config>> {
1833 if !should_move_out_of_spam(context, headers).await? {
1834 return Ok(None);
1835 }
1836
1837 Ok(Some(Config::ConfiguredInboxFolder))
1838}
1839
1840pub async fn target_folder_cfg(
1843 context: &Context,
1844 folder: &str,
1845 folder_meaning: FolderMeaning,
1846 headers: &[mailparse::MailHeader<'_>],
1847) -> Result<Option<Config>> {
1848 if folder == "DeltaChat" {
1849 return Ok(None);
1850 }
1851
1852 if folder_meaning == FolderMeaning::Spam {
1853 spam_target_folder_cfg(context, headers).await
1854 } else {
1855 Ok(None)
1856 }
1857}
1858
1859pub async fn target_folder(
1860 context: &Context,
1861 folder: &str,
1862 folder_meaning: FolderMeaning,
1863 headers: &[mailparse::MailHeader<'_>],
1864) -> Result<String> {
1865 match target_folder_cfg(context, folder, folder_meaning, headers).await? {
1866 Some(config) => match context.get_config(config).await? {
1867 Some(target) => Ok(target),
1868 None => Ok(folder.to_string()),
1869 },
1870 None => Ok(folder.to_string()),
1871 }
1872}
1873
1874fn get_folder_meaning_by_name(folder_name: &str) -> FolderMeaning {
1881 const SPAM_NAMES: &[&str] = &[
1883 "spam",
1884 "junk",
1885 "Correio electrónico não solicitado",
1886 "Correo basura",
1887 "Lixo",
1888 "Nettsøppel",
1889 "Nevyžádaná pošta",
1890 "No solicitado",
1891 "Ongewenst",
1892 "Posta indesiderata",
1893 "Skräp",
1894 "Wiadomości-śmieci",
1895 "Önemsiz",
1896 "Ανεπιθύμητα",
1897 "Спам",
1898 "垃圾邮件",
1899 "垃圾郵件",
1900 "迷惑メール",
1901 "스팸",
1902 ];
1903 const TRASH_NAMES: &[&str] = &[
1904 "Trash",
1905 "Bin",
1906 "Caixote do lixo",
1907 "Cestino",
1908 "Corbeille",
1909 "Papelera",
1910 "Papierkorb",
1911 "Papirkurv",
1912 "Papperskorgen",
1913 "Prullenbak",
1914 "Rubujo",
1915 "Κάδος απορριμμάτων",
1916 "Корзина",
1917 "Кошик",
1918 "ゴミ箱",
1919 "垃圾桶",
1920 "已删除邮件",
1921 "휴지통",
1922 ];
1923 let lower = folder_name.to_lowercase();
1924
1925 if lower == "inbox" {
1926 FolderMeaning::Inbox
1927 } else if SPAM_NAMES.iter().any(|s| s.to_lowercase() == lower) {
1928 FolderMeaning::Spam
1929 } else if TRASH_NAMES.iter().any(|s| s.to_lowercase() == lower) {
1930 FolderMeaning::Trash
1931 } else {
1932 FolderMeaning::Unknown
1933 }
1934}
1935
1936fn get_folder_meaning_by_attrs(folder_attrs: &[NameAttribute]) -> FolderMeaning {
1937 for attr in folder_attrs {
1938 match attr {
1939 NameAttribute::Trash => return FolderMeaning::Trash,
1940 NameAttribute::Junk => return FolderMeaning::Spam,
1941 NameAttribute::All | NameAttribute::Flagged => return FolderMeaning::Virtual,
1942 NameAttribute::Extension(label) => {
1943 match label.as_ref() {
1944 "\\Spam" => return FolderMeaning::Spam,
1945 "\\Important" => return FolderMeaning::Virtual,
1946 _ => {}
1947 };
1948 }
1949 _ => {}
1950 }
1951 }
1952 FolderMeaning::Unknown
1953}
1954
1955pub(crate) fn get_folder_meaning(folder: &Name) -> FolderMeaning {
1956 match get_folder_meaning_by_attrs(folder.attributes()) {
1957 FolderMeaning::Unknown => get_folder_meaning_by_name(folder.name()),
1958 meaning => meaning,
1959 }
1960}
1961
1962fn get_fetch_headers(prefetch_msg: &Fetch) -> Result<Vec<mailparse::MailHeader<'_>>> {
1964 match prefetch_msg.header() {
1965 Some(header_bytes) => {
1966 let (headers, _) = mailparse::parse_headers(header_bytes)?;
1967 Ok(headers)
1968 }
1969 None => Ok(Vec::new()),
1970 }
1971}
1972
1973pub(crate) fn prefetch_get_message_id(headers: &[mailparse::MailHeader]) -> Option<String> {
1974 headers
1975 .get_header_value(HeaderDef::XMicrosoftOriginalMessageId)
1976 .or_else(|| headers.get_header_value(HeaderDef::MessageId))
1977 .and_then(|msgid| mimeparser::parse_message_id(&msgid).ok())
1978}
1979
1980pub(crate) fn create_message_id() -> String {
1981 format!("{}{}", GENERATED_PREFIX, create_id())
1982}
1983
1984pub(crate) async fn prefetch_should_download(
1986 context: &Context,
1987 headers: &[mailparse::MailHeader<'_>],
1988 message_id: &str,
1989 mut flags: impl Iterator<Item = Flag<'_>>,
1990) -> Result<bool> {
1991 if message::rfc724_mid_download_tried(context, message_id).await? {
1992 if let Some(from) = mimeparser::get_from(headers)
1993 && context.is_self_addr(&from.addr).await?
1994 {
1995 markseen_on_imap_table(context, message_id).await?;
1996 }
1997 return Ok(false);
1998 }
1999
2000 let maybe_ndn = if let Some(from) = headers.get_header_value(HeaderDef::From_) {
2004 let from = from.to_ascii_lowercase();
2005 from.contains("mailer-daemon") || from.contains("mail-daemon")
2006 } else {
2007 false
2008 };
2009
2010 let from = match mimeparser::get_from(headers) {
2011 Some(f) => f,
2012 None => return Ok(false),
2013 };
2014 let (_from_id, blocked_contact, _origin) =
2015 match from_field_to_contact_id(context, &from, None, true, true).await? {
2016 Some(res) => res,
2017 None => return Ok(false),
2018 };
2019 if flags.any(|f| f == Flag::Draft) {
2023 info!(context, "Ignoring draft message");
2024 return Ok(false);
2025 }
2026
2027 let should_download = (!blocked_contact) || maybe_ndn;
2028 Ok(should_download)
2029}
2030
2031async fn mark_seen_by_uid(
2035 context: &Context,
2036 transport_id: u32,
2037 folder: &str,
2038 uid_validity: u32,
2039 uid: u32,
2040) -> Result<Option<ChatId>> {
2041 if let Some((msg_id, chat_id)) = context
2042 .sql
2043 .query_row_optional(
2044 "SELECT id, chat_id FROM msgs
2045 WHERE id > 9 AND rfc724_mid IN (
2046 SELECT rfc724_mid FROM imap
2047 WHERE transport_id=?
2048 AND folder=?
2049 AND uidvalidity=?
2050 AND uid=?
2051 LIMIT 1
2052 )",
2053 (transport_id, &folder, uid_validity, uid),
2054 |row| {
2055 let msg_id: MsgId = row.get(0)?;
2056 let chat_id: ChatId = row.get(1)?;
2057 Ok((msg_id, chat_id))
2058 },
2059 )
2060 .await
2061 .with_context(|| format!("failed to get msg and chat ID for IMAP message {folder}/{uid}"))?
2062 {
2063 let updated = context
2064 .sql
2065 .execute(
2066 "UPDATE msgs SET state=?1
2067 WHERE (state=?2 OR state=?3)
2068 AND id=?4",
2069 (
2070 MessageState::InSeen,
2071 MessageState::InFresh,
2072 MessageState::InNoticed,
2073 msg_id,
2074 ),
2075 )
2076 .await
2077 .with_context(|| format!("failed to update msg {msg_id} state"))?
2078 > 0;
2079
2080 if updated {
2081 msg_id
2082 .start_ephemeral_timer(context)
2083 .await
2084 .with_context(|| format!("failed to start ephemeral timer for message {msg_id}"))?;
2085 Ok(Some(chat_id))
2086 } else {
2087 Ok(None)
2089 }
2090 } else {
2091 Ok(None)
2093 }
2094}
2095
2096pub(crate) async fn markseen_on_imap_table(context: &Context, message_id: &str) -> Result<()> {
2099 context
2100 .sql
2101 .execute(
2102 "INSERT OR IGNORE INTO imap_markseen (id)
2103 SELECT id FROM imap WHERE rfc724_mid=?",
2104 (message_id,),
2105 )
2106 .await?;
2107 context.scheduler.interrupt_inbox().await;
2108
2109 Ok(())
2110}
2111
2112pub(crate) async fn set_uid_next(
2116 context: &Context,
2117 transport_id: u32,
2118 folder: &str,
2119 uid_next: u32,
2120) -> Result<()> {
2121 context
2122 .sql
2123 .execute(
2124 "INSERT INTO imap_sync (transport_id, folder, uid_next) VALUES (?, ?,?)
2125 ON CONFLICT(transport_id, folder) DO UPDATE SET uid_next=excluded.uid_next",
2126 (transport_id, folder, uid_next),
2127 )
2128 .await?;
2129 Ok(())
2130}
2131
2132async fn get_uid_next(context: &Context, transport_id: u32, folder: &str) -> Result<u32> {
2138 Ok(context
2139 .sql
2140 .query_get_value(
2141 "SELECT uid_next FROM imap_sync WHERE transport_id=? AND folder=?",
2142 (transport_id, folder),
2143 )
2144 .await?
2145 .unwrap_or(0))
2146}
2147
2148pub(crate) async fn set_uidvalidity(
2149 context: &Context,
2150 transport_id: u32,
2151 folder: &str,
2152 uidvalidity: u32,
2153) -> Result<()> {
2154 context
2155 .sql
2156 .execute(
2157 "INSERT INTO imap_sync (transport_id, folder, uidvalidity) VALUES (?,?,?)
2158 ON CONFLICT(transport_id, folder) DO UPDATE SET uidvalidity=excluded.uidvalidity",
2159 (transport_id, folder, uidvalidity),
2160 )
2161 .await?;
2162 Ok(())
2163}
2164
2165async fn get_uidvalidity(context: &Context, transport_id: u32, folder: &str) -> Result<u32> {
2166 Ok(context
2167 .sql
2168 .query_get_value(
2169 "SELECT uidvalidity FROM imap_sync WHERE transport_id=? AND folder=?",
2170 (transport_id, folder),
2171 )
2172 .await?
2173 .unwrap_or(0))
2174}
2175
2176pub(crate) async fn set_modseq(
2177 context: &Context,
2178 transport_id: u32,
2179 folder: &str,
2180 modseq: u64,
2181) -> Result<()> {
2182 context
2183 .sql
2184 .execute(
2185 "INSERT INTO imap_sync (transport_id, folder, modseq) VALUES (?,?,?)
2186 ON CONFLICT(transport_id, folder) DO UPDATE SET modseq=excluded.modseq",
2187 (transport_id, folder, modseq),
2188 )
2189 .await?;
2190 Ok(())
2191}
2192
2193async fn get_modseq(context: &Context, transport_id: u32, folder: &str) -> Result<u64> {
2194 Ok(context
2195 .sql
2196 .query_get_value(
2197 "SELECT modseq FROM imap_sync WHERE transport_id=? AND folder=?",
2198 (transport_id, folder),
2199 )
2200 .await?
2201 .unwrap_or(0))
2202}
2203
2204#[expect(clippy::arithmetic_side_effects)]
2208fn build_sequence_sets(uids: &[u32]) -> Result<Vec<(Vec<u32>, String)>> {
2209 let mut ranges: Vec<UidRange> = vec![];
2211
2212 for ¤t in uids {
2213 if let Some(last) = ranges.last_mut()
2214 && last.end + 1 == current
2215 {
2216 last.end = current;
2217 continue;
2218 }
2219
2220 ranges.push(UidRange {
2221 start: current,
2222 end: current,
2223 });
2224 }
2225
2226 let mut result = vec![];
2228 let (mut last_uids, mut last_str) = (Vec::new(), String::new());
2229 for range in ranges {
2230 last_uids.reserve((range.end - range.start + 1).try_into()?);
2231 (range.start..=range.end).for_each(|u| last_uids.push(u));
2232 if !last_str.is_empty() {
2233 last_str.push(',');
2234 }
2235 last_str.push_str(&range.to_string());
2236
2237 if last_str.len() > 990 {
2238 result.push((take(&mut last_uids), take(&mut last_str)));
2239 }
2240 }
2241 result.push((last_uids, last_str));
2242
2243 result.retain(|(_, s)| !s.is_empty());
2244 Ok(result)
2245}
2246
/// A run of consecutive UIDs; both bounds are inclusive.
struct UidRange {
    /// First UID of the run.
    start: u32,
    /// Last UID of the run.
    end: u32,
}
2252
2253impl std::fmt::Display for UidRange {
2254 fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
2255 if self.start == self.end {
2256 write!(f, "{}", self.start)
2257 } else {
2258 write!(f, "{}:{}", self.start, self.end)
2259 }
2260 }
2261}
2262
2263#[cfg(test)]
2264mod imap_tests;