1use std::{
7 cmp::max,
8 cmp::min,
9 collections::{BTreeMap, HashMap},
10 iter::Peekable,
11 mem::take,
12 sync::atomic::Ordering,
13 time::{Duration, UNIX_EPOCH},
14};
15
16use anyhow::{Context as _, Result, bail, ensure, format_err};
17use async_channel::{self, Receiver, Sender};
18use async_imap::types::{Fetch, Flag, Name, NameAttribute, UnsolicitedResponse};
19use futures::{FutureExt as _, TryStreamExt};
20use futures_lite::FutureExt;
21use ratelimit::Ratelimit;
22use url::Url;
23
24use crate::chat::{self, ChatIdBlocked, add_device_msg};
25use crate::config::Config;
26use crate::constants::{Blocked, DC_VERSION_STR};
27use crate::contact::ContactId;
28use crate::context::Context;
29use crate::ensure_and_debug_assert;
30use crate::events::EventType;
31use crate::headerdef::{HeaderDef, HeaderDefMap};
32use crate::log::{LogExt, warn};
33use crate::message::{self, Message};
34use crate::mimeparser;
35use crate::net::proxy::ProxyConfig;
36use crate::net::session::SessionStream;
37use crate::oauth2::get_oauth2_access_token;
38use crate::push::encrypt_device_token;
39use crate::receive_imf::{
40 ReceivedMsg, from_field_to_contact_id, get_prefetch_parent_message, receive_imf_inner,
41};
42use crate::scheduler::connectivity::ConnectivityStore;
43use crate::stock_str;
44use crate::tools::{self, create_id, duration_to_str, time};
45use crate::transport::{
46 ConfiguredLoginParam, ConfiguredServerLoginParam, prioritize_server_login_params,
47};
48use crate::{
49 calls::{UnresolvedIceServer, create_fallback_ice_servers, create_ice_servers_from_metadata},
50 ephemeral::delete_expired_imap_messages,
51};
52
53pub(crate) mod capabilities;
54mod client;
55mod idle;
56pub mod select_folder;
57pub(crate) mod session;
58
59use client::{Client, determine_capabilities};
60use session::Session;
61
/// Prefix string `"GEN_"`.
///
/// NOTE(review): presumably marks Message-IDs that were generated locally for
/// messages lacking one; the generator is not visible in this part of the file — confirm.
pub(crate) const GENERATED_PREFIX: &str = "GEN_";

/// FETCH item list requesting the UID together with only the `Message-ID` and
/// `X-Microsoft-Original-Message-ID` header fields.
///
/// Passed to `uid_fetch` by `Session::resync_folder_uids`.
const RFC724MID_UID: &str = "(UID BODY.PEEK[HEADER.FIELDS (\
                             MESSAGE-ID \
                             X-MICROSOFT-ORIGINAL-MESSAGE-ID\
                             )])";

/// FETCH item list requesting the flags and the whole message.
///
/// Passed to `uid_fetch` by `Session::fetch_many_msgs`.
const BODY_FULL: &str = "(FLAGS BODY.PEEK[])";
69
/// Connection parameters and connection-attempt state for one IMAP transport.
///
/// An `Imap` does not hold a live connection itself; `Imap::connect` produces a
/// `Session` from the data stored here.
#[derive(Debug)]
pub(crate) struct Imap {
    /// ID of the transport; passed on to every `Session` created by `connect`
    /// and written to the `transport_id` column of the `imap` table.
    transport_id: u32,

    /// Receiving a message on this channel cuts short the wait that `connect`
    /// performs when it is rate limited.
    pub(crate) idle_interrupt_receiver: Receiver<()>,

    /// Address taken from the login parameters; handed to
    /// `get_oauth2_access_token` when OAuth 2 is used.
    pub(crate) addr: String,

    /// Server login parameter candidates; `connect` prioritizes them and tries
    /// them one after another.
    lp: Vec<ConfiguredServerLoginParam>,

    /// Password used for LOGIN, or passed to `get_oauth2_access_token` when
    /// `oauth2` is set.
    password: String,

    /// Proxy configuration loaded in `Imap::new`, passed to `Client::connect`.
    proxy_config: Option<ProxyConfig>,

    /// Strict-TLS setting computed in `Imap::new`, passed to `Client::connect`.
    strict_tls: bool,

    /// If true, `connect` authenticates with XOAUTH2 instead of LOGIN.
    oauth2: bool,

    /// Name of the watched folder; defaults to `"INBOX"` and is never empty.
    pub(crate) folder: String,

    /// Set after a login failure whose error text contains "authentication".
    /// A further such failure may then trigger the wrong-password device message.
    authentication_failed_once: bool,

    /// Connectivity status updated while connecting and fetching.
    pub(crate) connectivity: ConnectivityStore,

    /// Time of the last connection attempt; starts at `UNIX_EPOCH`.
    conn_last_try: tools::Time,
    /// Current connection backoff in milliseconds; starts at 0.
    conn_backoff_ms: u64,

    /// Rate limiter consulted before each connection attempt and notified
    /// (`send()`) after each successful connection.
    ratelimit: Ratelimit,

    /// Sender half of the bounded(1) resync-request channel; a clone is given
    /// to every `Session`.
    pub(crate) resync_request_sender: async_channel::Sender<()>,

    /// Receiver half of the resync-request channel.
    pub(crate) resync_request_receiver: async_channel::Receiver<()>,
}
120
/// Credentials for XOAUTH2 authentication: the user name and the access token
/// that are combined into the authentication response.
#[derive(Debug)]
struct OAuth2 {
    /// User name sent as `user=` in the response.
    user: String,
    /// Token sent as `auth=Bearer ` in the response.
    access_token: String,
}
126
/// Server metadata cached in `context.metadata` and maintained by
/// `Session::update_metadata`.
#[derive(Debug, Default)]
pub(crate) struct ServerMetadata {
    /// NOTE(review): presumably the value of the `/shared/comment` metadata
    /// entry requested by `update_metadata` — confirm in the rest of that function.
    pub comment: Option<String>,

    /// NOTE(review): presumably the value of the `/shared/admin` metadata entry
    /// requested by `update_metadata` — confirm in the rest of that function.
    pub admin: Option<String>,

    /// NOTE(review): presumably parsed from the
    /// `/shared/vendor/deltachat/irohrelay` metadata entry — confirm.
    pub iroh_relay: Option<Url>,

    /// ICE servers, either parsed from the `/shared/vendor/deltachat/turn`
    /// metadata entry or the fallback servers.
    pub ice_servers: Vec<UnresolvedIceServer>,

    /// Expiration timestamp of `ice_servers`, compared against `time()`.
    ///
    /// `update_metadata` leaves the servers untouched while
    /// `time() + 3600 * 12` is still below this value.
    pub ice_servers_expiration_timestamp: i64,
}
150
151impl async_imap::Authenticator for OAuth2 {
152 type Response = String;
153
154 fn process(&mut self, _data: &[u8]) -> Self::Response {
155 format!(
156 "user={}\x01auth=Bearer {}\x01\x01",
157 self.user, self.access_token
158 )
159 }
160}
161
/// Classification of an IMAP folder.
///
/// `Session::resync_folders` skips folders classified as `Virtual` or `Unknown`
/// and resyncs all others.
#[derive(Debug, Display, PartialEq, Eq, Clone, Copy)]
pub enum FolderMeaning {
    /// NOTE(review): presumably a folder whose role could not be determined —
    /// confirm against `get_folder_meaning`.
    Unknown,

    /// NOTE(review): presumably the spam/junk folder — confirm.
    Spam,
    /// NOTE(review): presumably the inbox — confirm.
    Inbox,
    /// NOTE(review): presumably the trash folder — confirm.
    Trash,

    /// NOTE(review): presumably a folder that only mirrors messages stored
    /// elsewhere (e.g. "All Mail"-style views) — confirm.
    Virtual,
}
179
180struct UidGrouper<T: Iterator<Item = (i64, u32, String)>> {
181 inner: Peekable<T>,
182}
183
184impl<T, I> From<I> for UidGrouper<T>
185where
186 T: Iterator<Item = (i64, u32, String)>,
187 I: IntoIterator<IntoIter = T>,
188{
189 fn from(inner: I) -> Self {
190 Self {
191 inner: inner.into_iter().peekable(),
192 }
193 }
194}
195
196impl<T: Iterator<Item = (i64, u32, String)>> Iterator for UidGrouper<T> {
197 type Item = (String, Vec<i64>, String);
199
200 #[expect(clippy::arithmetic_side_effects)]
201 fn next(&mut self) -> Option<Self::Item> {
202 let (_, _, folder) = self.inner.peek().cloned()?;
203
204 let mut uid_set = String::new();
205 let mut rowid_set = Vec::new();
206
207 while uid_set.len() < 1000 {
208 if let Some((start_rowid, start_uid, _)) = self
210 .inner
211 .next_if(|(_, _, start_folder)| start_folder == &folder)
212 {
213 rowid_set.push(start_rowid);
214 let mut end_uid = start_uid;
215
216 while let Some((next_rowid, next_uid, _)) =
217 self.inner.next_if(|(_, next_uid, next_folder)| {
218 next_folder == &folder && (*next_uid == end_uid + 1 || *next_uid == end_uid)
219 })
220 {
221 end_uid = next_uid;
222 rowid_set.push(next_rowid);
223 }
224
225 let uid_range = UidRange {
226 start: start_uid,
227 end: end_uid,
228 };
229 if !uid_set.is_empty() {
230 uid_set.push(',');
231 }
232 uid_set.push_str(&uid_range.to_string());
233 } else {
234 break;
235 }
236 }
237
238 Some((folder, rowid_set, uid_set))
239 }
240}
241
242impl Imap {
243 pub async fn new(
245 context: &Context,
246 transport_id: u32,
247 param: ConfiguredLoginParam,
248 idle_interrupt_receiver: Receiver<()>,
249 ) -> Result<Self> {
250 let lp = param.imap.clone();
251 let password = param.imap_password.clone();
252 let proxy_config = ProxyConfig::load(context).await?;
253 let addr = ¶m.addr;
254 let strict_tls = param.strict_tls(proxy_config.is_some());
255 let oauth2 = param.oauth2;
256 let folder = param
257 .imap_folder
258 .clone()
259 .unwrap_or_else(|| "INBOX".to_string());
260 ensure_and_debug_assert!(!folder.is_empty(), "Watched folder name cannot be empty");
261 let (resync_request_sender, resync_request_receiver) = async_channel::bounded(1);
262 Ok(Imap {
263 transport_id,
264 idle_interrupt_receiver,
265 addr: addr.to_string(),
266 lp,
267 password,
268 proxy_config,
269 strict_tls,
270 oauth2,
271 folder,
272 authentication_failed_once: false,
273 connectivity: Default::default(),
274 conn_last_try: UNIX_EPOCH,
275 conn_backoff_ms: 0,
276 ratelimit: Ratelimit::new(Duration::new(120, 0), 2.0),
278 resync_request_sender,
279 resync_request_receiver,
280 })
281 }
282
283 pub async fn new_configured(
285 context: &Context,
286 idle_interrupt_receiver: Receiver<()>,
287 ) -> Result<Self> {
288 let (transport_id, param) = ConfiguredLoginParam::load(context)
289 .await?
290 .context("Not configured")?;
291 let imap = Self::new(context, transport_id, param, idle_interrupt_receiver).await?;
292 Ok(imap)
293 }
294
295 pub fn transport_id(&self) -> u32 {
297 self.transport_id
298 }
299
    /// Connects to the IMAP server and logs in, returning a new `Session`.
    ///
    /// Before connecting, waits for the larger of the connection backoff and
    /// the rate limiter delay; the wait is cut short when a message arrives on
    /// `idle_interrupt_receiver`. Then every prioritized login candidate is
    /// tried in turn until one both connects and logs in.
    ///
    /// `configuring` suppresses the wrong-password device message.
    ///
    /// # Errors
    ///
    /// Returns the first connection or login error encountered if no candidate
    /// succeeds, or an error stating that no candidates were provided.
    /// Failures to obtain an OAuth 2 token, to determine capabilities or to
    /// enable compression abort immediately.
    pub(crate) async fn connect(
        &mut self,
        context: &Context,
        configuring: bool,
    ) -> Result<Session> {
        let now = tools::Time::now();
        // Time left until the backoff since the last attempt has elapsed.
        // `min(.., now)` guards against `conn_last_try` lying in the future,
        // the outer `max(.., now)` makes the remaining duration non-negative.
        let until_can_send = max(
            min(self.conn_last_try, now)
                .checked_add(Duration::from_millis(self.conn_backoff_ms))
                .unwrap_or(now),
            now,
        )
        .duration_since(now)?;
        let ratelimit_duration = max(until_can_send, self.ratelimit.until_can_send());
        if !ratelimit_duration.is_zero() {
            warn!(
                context,
                "IMAP got rate limited, waiting for {} until can connect.",
                duration_to_str(ratelimit_duration),
            );
            // Sleep, but let an idle interrupt end the wait early.
            let interrupted = async {
                tokio::time::sleep(ratelimit_duration).await;
                false
            }
            .race(self.idle_interrupt_receiver.recv().map(|_| true))
            .await;
            if interrupted {
                info!(
                    context,
                    "Connecting to IMAP without waiting for ratelimit due to interrupt."
                );
            }
        }

        info!(context, "Connecting to IMAP server.");
        self.connectivity.set_connecting(context);

        self.conn_last_try = tools::Time::now();
        const BACKOFF_MIN_MS: u64 = 2000;
        const BACKOFF_MAX_MS: u64 = 80_000;
        // Grow the backoff by a random amount between half of it and all of it.
        // Capping at BACKOFF_MAX_MS / 2 first keeps the result within BACKOFF_MAX_MS.
        self.conn_backoff_ms = min(self.conn_backoff_ms, BACKOFF_MAX_MS / 2);
        self.conn_backoff_ms = self.conn_backoff_ms.saturating_add(rand::random_range(
            (self.conn_backoff_ms / 2)..=self.conn_backoff_ms,
        ));
        self.conn_backoff_ms = max(BACKOFF_MIN_MS, self.conn_backoff_ms);

        let login_params = prioritize_server_login_params(&context.sql, &self.lp, "imap").await?;
        let mut first_error = None;
        for lp in login_params {
            info!(context, "IMAP trying to connect to {}.", &lp.connection);
            let connection_candidate = lp.connection.clone();
            let client = match Client::connect(
                context,
                self.proxy_config.clone(),
                self.strict_tls,
                &connection_candidate,
            )
            .await
            .with_context(|| format!("IMAP failed to connect to {connection_candidate}"))
            {
                Ok(client) => client,
                Err(err) => {
                    // Remember only the first error and try the next candidate.
                    warn!(context, "{err:#}.");
                    first_error.get_or_insert(err);
                    continue;
                }
            };

            // The connection itself worked: reset the backoff and account for
            // the attempt in the rate limiter.
            self.conn_backoff_ms = BACKOFF_MIN_MS;
            self.ratelimit.send();

            let imap_user: &str = lp.user.as_ref();
            let imap_pw: &str = &self.password;

            let login_res = if self.oauth2 {
                info!(context, "Logging into IMAP server with OAuth 2.");
                let addr: &str = self.addr.as_ref();

                let token = get_oauth2_access_token(context, addr, imap_pw, true)
                    .await?
                    .context("IMAP could not get OAUTH token")?;
                let auth = OAuth2 {
                    user: imap_user.into(),
                    access_token: token,
                };
                client.authenticate("XOAUTH2", auth).await
            } else {
                info!(context, "Logging into IMAP server with LOGIN.");
                client.login(imap_user, imap_pw).await
            };

            match login_res {
                Ok(mut session) => {
                    let capabilities = determine_capabilities(&mut session).await?;
                    let resync_request_sender = self.resync_request_sender.clone();

                    // Wrap the stream in compression if the server supports it.
                    let session = if capabilities.can_compress {
                        info!(context, "Enabling IMAP compression.");
                        let compressed_session = session
                            .compress(|s| {
                                let session_stream: Box<dyn SessionStream> = Box::new(s);
                                session_stream
                            })
                            .await
                            .context("Failed to enable IMAP compression")?;
                        Session::new(
                            compressed_session,
                            capabilities,
                            resync_request_sender,
                            self.transport_id,
                        )
                    } else {
                        Session::new(
                            session,
                            capabilities,
                            resync_request_sender,
                            self.transport_id,
                        )
                    };

                    // Publish the server ID reported in the capabilities.
                    let mut lock = context.server_id.write().await;
                    lock.clone_from(&session.capabilities.server_id);

                    self.authentication_failed_once = false;
                    context.emit_event(EventType::ImapConnected(format!(
                        "IMAP-LOGIN as {}",
                        lp.user
                    )));
                    self.connectivity.set_preparing(context);
                    info!(context, "Successfully logged into IMAP server.");
                    return Ok(session);
                }

                Err(err) => {
                    let imap_user = lp.user.to_owned();
                    let message = stock_str::cannot_login(context, &imap_user);

                    warn!(context, "IMAP failed to login: {err:#}.");
                    first_error.get_or_insert(format_err!("{message} ({err:#})"));

                    // Held until the end of this arm while the wrong-password
                    // state below is inspected and updated.
                    let _lock = context.wrong_pw_warning_mutex.lock().await;
                    if err.to_string().to_lowercase().contains("authentication") {
                        // Only notify on a repeated authentication failure,
                        // outside of configuration, and if notifications are
                        // still enabled.
                        if self.authentication_failed_once
                            && !configuring
                            && context.get_config_bool(Config::NotifyAboutWrongPw).await?
                        {
                            let mut msg = Message::new_text(message);
                            if let Err(e) = chat::add_device_msg_with_importance(
                                context,
                                None,
                                Some(&mut msg),
                                true,
                            )
                            .await
                            {
                                warn!(context, "Failed to add device message: {e:#}.");
                            } else {
                                // The user was notified; unset the option so
                                // the message is not added again.
                                context
                                    .set_config_internal(Config::NotifyAboutWrongPw, None)
                                    .await
                                    .log_err(context)
                                    .ok();
                            }
                        } else {
                            self.authentication_failed_once = true;
                        }
                    } else {
                        self.authentication_failed_once = false;
                    }
                }
            }
        }

        Err(first_error.unwrap_or_else(|| format_err!("No IMAP connection candidates provided")))
    }
482
483 pub(crate) async fn prepare(&mut self, context: &Context) -> Result<Session> {
488 let configuring = false;
489 let session = match self.connect(context, configuring).await {
490 Ok(session) => session,
491 Err(err) => {
492 self.connectivity.set_err(context, format!("{err:#}"));
493 return Err(err);
494 }
495 };
496
497 Ok(session)
498 }
499
500 pub async fn fetch_move_delete(
505 &mut self,
506 context: &Context,
507 session: &mut Session,
508 watch_folder: &str,
509 ) -> Result<()> {
510 ensure_and_debug_assert!(!watch_folder.is_empty(), "Watched folder cannot be empty");
511 if !context.sql.is_open().await {
512 bail!("IMAP operation attempted while it is torn down");
514 }
515
516 let msgs_fetched = self
517 .fetch_new_messages(context, session, watch_folder)
518 .await
519 .context("fetch_new_messages")?;
520 if msgs_fetched && context.get_config_delete_device_after().await?.is_some() {
521 context.scheduler.interrupt_ephemeral_task().await;
526 }
527
528 delete_expired_imap_messages(context, session.transport_id(), session.is_chatmail())
531 .await
532 .context("delete_expired_imap_messages")?;
533
534 session
535 .move_delete_messages(context, watch_folder)
536 .await
537 .context("move_delete_messages")?;
538
539 Ok(())
540 }
541
542 #[expect(clippy::arithmetic_side_effects)]
546 pub(crate) async fn fetch_new_messages(
547 &mut self,
548 context: &Context,
549 session: &mut Session,
550 folder: &str,
551 ) -> Result<bool> {
552 let transport_id = session.transport_id();
553
554 let folder_exists = session
555 .select_with_uidvalidity(context, folder)
556 .await
557 .with_context(|| format!("Failed to select folder {folder:?}"))?;
558
559 if !session.new_mail {
560 info!(
561 context,
562 "Transport {transport_id}: No new emails in folder {folder:?}."
563 );
564 return Ok(false);
565 }
566 session.new_mail = false;
569
570 if !folder_exists {
571 return Ok(false);
572 }
573
574 let mut read_cnt = 0;
575 loop {
576 let (n, fetch_more) =
577 Box::pin(self.fetch_new_msg_batch(context, session, folder)).await?;
578 read_cnt += n;
579 if !fetch_more {
580 return Ok(read_cnt > 0);
581 }
582 }
583 }
584
    /// Prefetches and downloads one batch of new messages from `folder`.
    ///
    /// Returns `(read_cnt, fetch_more)`: the number of prefetched messages and
    /// whether another batch should be fetched because the mailbox UIDNEXT has
    /// not been reached yet.
    ///
    /// Every prefetched message gets a row in the `imap` table. Messages that
    /// should be downloaded and fit the download limit are fetched in full;
    /// the others are skipped and, where applicable, queued in the `download`
    /// table. The stored UIDNEXT is advanced past everything that was fetched
    /// or deliberately skipped.
    ///
    /// # Errors
    ///
    /// Fails on database or prefetch errors, when a FETCH response lacks the
    /// size, and finally with the error of the full download, if any. The
    /// download error is only propagated after UIDNEXT has been updated.
    #[expect(clippy::arithmetic_side_effects)]
    async fn fetch_new_msg_batch(
        &mut self,
        context: &Context,
        session: &mut Session,
        folder: &str,
    ) -> Result<(usize, bool)> {
        let transport_id = self.transport_id;
        let uid_validity = get_uidvalidity(context, transport_id, folder).await?;
        let old_uid_next = get_uid_next(context, transport_id, folder).await?;
        info!(
            context,
            "fetch_new_msg_batch({folder}): UIDVALIDITY={uid_validity}, UIDNEXT={old_uid_next}."
        );

        let uids_to_prefetch = 500;
        let msgs = session
            .prefetch(old_uid_next, uids_to_prefetch)
            .await
            .context("prefetch")?;
        let read_cnt = msgs.len();
        // Held until the end of the function.
        let _fetch_msgs_lock_guard = context.fetch_msgs_mutex.lock().await;

        // UIDs to download in full right now.
        let mut uids_fetch: Vec<u32> = Vec::new();
        // Message-IDs of post-messages seen in this batch.
        let mut available_post_msgs: Vec<String> = Vec::new();
        // Message-IDs to be queued in the `download` table.
        let mut download_later: Vec<String> = Vec::new();
        let mut uid_message_ids = BTreeMap::new();
        let mut largest_uid_skipped = None;

        // A limit of 0 means "no limit".
        let download_limit: Option<u32> = context
            .get_config_parsed(Config::DownloadLimit)
            .await?
            .filter(|&l| 0 < l);

        for (uid, ref fetch_response) in msgs {
            let headers = match get_fetch_headers(fetch_response) {
                Ok(headers) => headers,
                Err(err) => {
                    warn!(context, "Failed to parse FETCH headers: {err:#}.");
                    continue;
                }
            };

            let message_id = prefetch_get_message_id(&headers);
            let size = fetch_response
                .size
                .context("imap fetch response does not contain size")?;

            // A message that is already known locally as deleted is scheduled
            // for deletion on the server instead of being downloaded.
            let delete = if let Some(message_id) = &message_id {
                message::rfc724_mid_exists_ex(context, message_id, "deleted=1")
                    .await?
                    .is_some_and(|(_msg_id, deleted)| deleted)
            } else {
                false
            };

            // Messages without a Message-ID get a generated one.
            let message_id = message_id.unwrap_or_else(create_message_id);

            if delete {
                info!(context, "Deleting locally deleted message {message_id}.");
            }

            // An empty target makes `move_delete_messages` delete the message.
            let target = if delete { "" } else { folder };

            context
                .sql
                .execute(
                    "INSERT INTO imap (transport_id, rfc724_mid, folder, uid, uidvalidity, target)
                       VALUES         (?, ?, ?, ?, ?, ?)
                       ON CONFLICT(transport_id, folder, uid, uidvalidity)
                       DO UPDATE SET rfc724_mid=excluded.rfc724_mid,
                                     target=excluded.target",
                    (
                        self.transport_id,
                        &message_id,
                        &folder,
                        uid,
                        uid_validity,
                        target,
                    ),
                )
                .await?;

            if folder == target
                && prefetch_should_download(context, &headers, &message_id, fetch_response.flags())
                    .await
                    .context("prefetch_should_download")?
            {
                if headers
                    .get_header_value(HeaderDef::ChatIsPostMessage)
                    .is_some()
                {
                    info!(context, "{message_id:?} is a post-message.");
                    available_post_msgs.push(message_id.clone());

                    // Only bots download post-messages immediately; everyone
                    // else queues them if they fit the download limit.
                    let is_bot = context.get_config_bool(Config::Bot).await?;
                    if is_bot && download_limit.is_none_or(|download_limit| size <= download_limit)
                    {
                        uids_fetch.push(uid);
                        uid_message_ids.insert(uid, message_id);
                    } else {
                        if download_limit.is_none_or(|download_limit| size <= download_limit) {
                            download_later.push(message_id.clone());
                        }
                        largest_uid_skipped = Some(uid);
                    }
                } else {
                    info!(context, "{message_id:?} is not a post-message.");
                    if download_limit.is_none_or(|download_limit| size <= download_limit) {
                        uids_fetch.push(uid);
                        uid_message_ids.insert(uid, message_id);
                    } else {
                        download_later.push(message_id.clone());
                        largest_uid_skipped = Some(uid);
                    }
                };
            } else {
                largest_uid_skipped = Some(uid);
            }
        }

        if !uids_fetch.is_empty() {
            self.connectivity.set_working(context);
        }

        let (sender, receiver) = async_channel::unbounded();

        let mut received_msgs = Vec::with_capacity(uids_fetch.len());
        let mailbox_uid_next = session
            .selected_mailbox
            .as_ref()
            .with_context(|| format!("Expected {folder:?} to be selected"))?
            .uid_next
            .unwrap_or_default();

        // Collects results from the channel until the sender is dropped and
        // tracks the largest UID that was processed.
        let update_uids_future = async {
            let mut largest_uid_fetched: u32 = 0;

            while let Ok((uid, received_msg_opt)) = receiver.recv().await {
                largest_uid_fetched = max(largest_uid_fetched, uid);
                if let Some(received_msg) = received_msg_opt {
                    received_msgs.push(received_msg)
                }
            }

            largest_uid_fetched
        };

        // Downloads the messages; `sender` is moved in and dropped when done,
        // which terminates the receiving loop above.
        let actually_download_messages_future = async {
            session
                .fetch_many_msgs(context, folder, uids_fetch, &uid_message_ids, sender)
                .await
                .context("fetch_many_msgs")
        };

        let (largest_uid_fetched, fetch_res) =
            tokio::join!(update_uids_future, actually_download_messages_future);

        let mut new_uid_next = largest_uid_fetched + 1;
        // Only when the download succeeded may UIDNEXT also move past the
        // prefetched range and the skipped messages.
        let fetch_more = fetch_res.is_ok() && {
            let prefetch_uid_next = old_uid_next + uids_to_prefetch;
            new_uid_next = max(new_uid_next, min(prefetch_uid_next, mailbox_uid_next));

            new_uid_next = max(new_uid_next, largest_uid_skipped.unwrap_or(0) + 1);

            prefetch_uid_next < mailbox_uid_next
        };
        // Never move the stored UIDNEXT backwards.
        if new_uid_next > old_uid_next {
            set_uid_next(context, self.transport_id, folder, new_uid_next).await?;
        }

        info!(context, "{} mails read from \"{}\".", read_cnt, folder);

        if !received_msgs.is_empty() {
            context.emit_event(EventType::IncomingMsgBunch);
        }

        chat::mark_old_messages_as_noticed(context, received_msgs).await?;

        if fetch_res.is_ok() {
            info!(
                context,
                "available_post_msgs: {}, download_later: {}.",
                available_post_msgs.len(),
                download_later.len(),
            );
            let trans_fn = |t: &mut rusqlite::Transaction| {
                let mut stmt = t.prepare("INSERT OR IGNORE INTO available_post_msgs VALUES (?)")?;
                for rfc724_mid in available_post_msgs {
                    stmt.execute((rfc724_mid,))
                        .context("INSERT OR IGNORE INTO available_post_msgs")?;
                }
                let mut stmt =
                    t.prepare("INSERT OR IGNORE INTO download (rfc724_mid, msg_id) VALUES (?,0)")?;
                for rfc724_mid in download_later {
                    stmt.execute((rfc724_mid,))
                        .context("INSERT OR IGNORE INTO download")?;
                }
                Ok(())
            };
            context.sql.transaction(trans_fn).await?;
        }

        // Propagate a download error only now, after the bookkeeping is done.
        fetch_res?;

        Ok((read_cnt, fetch_more))
    }
824}
825
826impl Session {
827 pub(crate) async fn resync_folders(&mut self, context: &Context) -> Result<()> {
829 let all_folders = self
830 .list_folders()
831 .await
832 .context("listing folders for resync")?;
833 for folder in all_folders {
834 let folder_meaning = get_folder_meaning(&folder);
835 if !matches!(
836 folder_meaning,
837 FolderMeaning::Virtual | FolderMeaning::Unknown
838 ) {
839 self.resync_folder_uids(context, folder.name(), folder_meaning)
840 .await?;
841 }
842 }
843 Ok(())
844 }
845
846 pub(crate) async fn resync_folder_uids(
853 &mut self,
854 context: &Context,
855 folder: &str,
856 folder_meaning: FolderMeaning,
857 ) -> Result<()> {
858 let uid_validity;
859 let mut msgs = BTreeMap::new();
861
862 let folder_exists = self.select_with_uidvalidity(context, folder).await?;
863 let transport_id = self.transport_id();
864 if folder_exists {
865 let mut list = self
866 .uid_fetch("1:*", RFC724MID_UID)
867 .await
868 .with_context(|| format!("Can't resync folder {folder}"))?;
869 while let Some(fetch) = list.try_next().await? {
870 let headers = match get_fetch_headers(&fetch) {
871 Ok(headers) => headers,
872 Err(err) => {
873 warn!(context, "Failed to parse FETCH headers: {}", err);
874 continue;
875 }
876 };
877 let message_id = prefetch_get_message_id(&headers);
878
879 if let (Some(uid), Some(rfc724_mid)) = (fetch.uid, message_id) {
880 msgs.insert(
881 uid,
882 (
883 rfc724_mid,
884 target_folder(context, folder, folder_meaning, &headers).await?,
885 ),
886 );
887 }
888 }
889
890 info!(
891 context,
892 "resync_folder_uids: Collected {} message IDs in {folder}.",
893 msgs.len(),
894 );
895
896 uid_validity = get_uidvalidity(context, transport_id, folder).await?;
897 } else {
898 warn!(context, "resync_folder_uids: No folder {folder}.");
899 uid_validity = 0;
900 }
901
902 context
904 .sql
905 .transaction(move |transaction| {
906 transaction.execute("DELETE FROM imap WHERE transport_id=? AND folder=?", (transport_id, folder,))?;
907 for (uid, (rfc724_mid, target)) in &msgs {
908 transaction.execute(
911 "INSERT INTO imap (transport_id, rfc724_mid, folder, uid, uidvalidity, target)
912 VALUES (?, ?, ?, ?, ?, ?)
913 ON CONFLICT(transport_id, folder, uid, uidvalidity)
914 DO UPDATE SET rfc724_mid=excluded.rfc724_mid,
915 target=excluded.target",
916 (transport_id, rfc724_mid, folder, uid, uid_validity, target),
917 )?;
918 }
919 Ok(())
920 })
921 .await?;
922 Ok(())
923 }
924
925 async fn delete_message_batch(
928 &mut self,
929 context: &Context,
930 uid_set: &str,
931 row_ids: Vec<i64>,
932 ) -> Result<()> {
933 self.add_flag_finalized_with_set(uid_set, "\\Deleted")
935 .await?;
936 context
937 .sql
938 .transaction(|transaction| {
939 let mut stmt = transaction.prepare("DELETE FROM imap WHERE id = ?")?;
940 for row_id in row_ids {
941 stmt.execute((row_id,))?;
942 }
943 Ok(())
944 })
945 .await
946 .context("Cannot remove deleted messages from imap table")?;
947
948 context.emit_event(EventType::ImapMessageDeleted(format!(
949 "IMAP messages {uid_set} marked as deleted"
950 )));
951 Ok(())
952 }
953
954 async fn move_message_batch(
957 &mut self,
958 context: &Context,
959 set: &str,
960 row_ids: Vec<i64>,
961 target: &str,
962 ) -> Result<()> {
963 if self.can_move() {
964 match self.uid_mv(set, &target).await {
965 Ok(()) => {
966 context
968 .sql
969 .transaction(|transaction| {
970 let mut stmt = transaction.prepare("DELETE FROM imap WHERE id = ?")?;
971 for row_id in row_ids {
972 stmt.execute((row_id,))?;
973 }
974 Ok(())
975 })
976 .await
977 .context("Cannot delete moved messages from imap table")?;
978 context.emit_event(EventType::ImapMessageMoved(format!(
979 "IMAP messages {set} moved to {target}"
980 )));
981 return Ok(());
982 }
983 Err(err) => {
984 warn!(
985 context,
986 "Cannot move messages, fallback to COPY/DELETE {} to {}: {}",
987 set,
988 target,
989 err
990 );
991 }
992 }
993 }
994
995 info!(
998 context,
999 "Server does not support MOVE, fallback to COPY/DELETE {} to {}", set, target
1000 );
1001 self.uid_copy(&set, &target).await?;
1002 context
1003 .sql
1004 .transaction(|transaction| {
1005 let mut stmt = transaction.prepare("UPDATE imap SET target='' WHERE id = ?")?;
1006 for row_id in row_ids {
1007 stmt.execute((row_id,))?;
1008 }
1009 Ok(())
1010 })
1011 .await
1012 .context("Cannot plan deletion of messages")?;
1013 context.emit_event(EventType::ImapMessageMoved(format!(
1014 "IMAP messages {set} copied to {target}"
1015 )));
1016 Ok(())
1017 }
1018
1019 async fn move_delete_messages(&mut self, context: &Context, folder: &str) -> Result<()> {
1023 let transport_id = self.transport_id();
1024 let rows = context
1025 .sql
1026 .query_map_vec(
1027 "SELECT id, uid, target FROM imap
1028 WHERE folder = ?
1029 AND transport_id = ?
1030 AND target != folder
1031 ORDER BY target, uid",
1032 (folder, transport_id),
1033 |row| {
1034 let rowid: i64 = row.get(0)?;
1035 let uid: u32 = row.get(1)?;
1036 let target: String = row.get(2)?;
1037 Ok((rowid, uid, target))
1038 },
1039 )
1040 .await?;
1041
1042 for (target, rowid_set, uid_set) in UidGrouper::from(rows) {
1043 let folder_exists = self.select_with_uidvalidity(context, folder).await?;
1048 ensure!(folder_exists, "No folder {folder}");
1049
1050 if target.is_empty() {
1052 self.delete_message_batch(context, &uid_set, rowid_set)
1053 .await
1054 .with_context(|| format!("cannot delete batch of messages {uid_set:?}"))?;
1055 } else {
1056 self.move_message_batch(context, &uid_set, rowid_set, &target)
1057 .await
1058 .with_context(|| {
1059 format!("cannot move batch of messages {uid_set:?} to folder {target:?}",)
1060 })?;
1061 }
1062 }
1063
1064 if let Err(err) = self.maybe_close_folder(context).await {
1067 warn!(context, "Failed to close folder: {err:#}.");
1068 }
1069
1070 Ok(())
1071 }
1072
1073 pub(crate) async fn store_seen_flags_on_imap(&mut self, context: &Context) -> Result<()> {
1075 if context.get_config_bool(Config::TeamProfile).await? {
1076 return Ok(());
1077 }
1078
1079 context
1080 .sql
1081 .execute(
1082 "DELETE FROM imap_markseen WHERE id NOT IN (SELECT imap.id FROM imap)",
1083 (),
1084 )
1085 .await?;
1086
1087 let transport_id = self.transport_id();
1088 let mut rows = context
1089 .sql
1090 .query_map_vec(
1091 "SELECT imap.id, uid, folder FROM imap, imap_markseen
1092 WHERE imap.id = imap_markseen.id
1093 AND imap.transport_id=?
1094 AND target = folder",
1095 (transport_id,),
1096 |row| {
1097 let rowid: i64 = row.get(0)?;
1098 let uid: u32 = row.get(1)?;
1099 let folder: String = row.get(2)?;
1100 Ok((rowid, uid, folder))
1101 },
1102 )
1103 .await?;
1104
1105 rows.sort_unstable_by(|(_rowid1, uid1, folder1), (_rowid2, uid2, folder2)| {
1112 (folder1, uid1).cmp(&(folder2, uid2))
1113 });
1114
1115 for (folder, rowid_set, uid_set) in UidGrouper::from(rows) {
1116 let folder_exists = match self.select_with_uidvalidity(context, &folder).await {
1117 Err(err) => {
1118 warn!(
1119 context,
1120 "store_seen_flags_on_imap: Failed to select {folder}, will retry later: {err:#}."
1121 );
1122 continue;
1123 }
1124 Ok(folder_exists) => folder_exists,
1125 };
1126 if !folder_exists {
1127 warn!(context, "store_seen_flags_on_imap: No folder {folder}.");
1128 } else if let Err(err) = self.add_flag_finalized_with_set(&uid_set, "\\Seen").await {
1129 warn!(
1130 context,
1131 "Cannot mark messages {uid_set} in {folder} as seen, will retry later: {err:#}."
1132 );
1133 continue;
1134 } else {
1135 info!(
1136 context,
1137 "Marked messages {} in folder {} as seen.", uid_set, folder
1138 );
1139 }
1140 context
1141 .sql
1142 .transaction(|transaction| {
1143 let mut stmt = transaction.prepare("DELETE FROM imap_markseen WHERE id = ?")?;
1144 for rowid in rowid_set {
1145 stmt.execute((rowid,))?;
1146 }
1147 Ok(())
1148 })
1149 .await
1150 .context("Cannot remove messages marked as seen from imap_markseen table")?;
1151 }
1152
1153 Ok(())
1154 }
1155
    /// Downloads the messages with the given UIDs from the selected folder and
    /// passes each of them to `receive_imf_inner`.
    ///
    /// * `folder` is only used in error messages.
    /// * `request_uids` are the UIDs to download; they are split into sequence
    ///   sets by `build_sequence_sets` and fetched set by set.
    /// * `uid_message_ids` maps each requested UID to its Message-ID.
    /// * `received_msgs_channel` receives `(uid, result)` for every processed
    ///   UID; the result is `None` for messages that are deleted, have no body
    ///   or could not be received.
    ///
    /// UIDs missing from the server response are logged and skipped without
    /// anything being sent to the channel.
    ///
    /// # Errors
    ///
    /// Fails if a FETCH command or reading its responses fails, if sending to
    /// the channel fails, or if the device message reporting a receive error
    /// cannot be added.
    #[expect(clippy::arithmetic_side_effects)]
    pub(crate) async fn fetch_many_msgs(
        &mut self,
        context: &Context,
        folder: &str,
        request_uids: Vec<u32>,
        uid_message_ids: &BTreeMap<u32, String>,
        received_msgs_channel: Sender<(u32, Option<ReceivedMsg>)>,
    ) -> Result<()> {
        if request_uids.is_empty() {
            return Ok(());
        }

        for (request_uids, set) in build_sequence_sets(&request_uids)? {
            info!(context, "Starting UID FETCH of message set \"{}\".", set);
            let mut fetch_responses = self
                .uid_fetch(&set, BODY_FULL)
                .await
                .with_context(|| format!("fetching messages {set} from folder {folder:?}"))?;

            // Requested responses that arrived before their turn, keyed by UID.
            let mut uid_msgs = HashMap::with_capacity(request_uids.len());

            let mut count = 0;
            for &request_uid in &request_uids {
                // The response may already have been read while looking for an
                // earlier UID.
                let mut fetch_response = uid_msgs.remove(&request_uid);

                // Otherwise read responses until the wanted one shows up or
                // the stream ends.
                while fetch_response.is_none() {
                    let Some(next_fetch_response) = fetch_responses
                        .try_next()
                        .await
                        .context("Failed to process IMAP FETCH result")?
                    else {
                        break;
                    };

                    if let Some(next_uid) = next_fetch_response.uid {
                        if next_uid == request_uid {
                            fetch_response = Some(next_fetch_response);
                        } else if !request_uids.contains(&next_uid) {
                            info!(
                                context,
                                "Skipping not requested FETCH response for UID {}.", next_uid
                            );
                        } else if uid_msgs.insert(next_uid, next_fetch_response).is_some() {
                            warn!(context, "Got duplicated UID {}.", next_uid);
                        }
                    } else {
                        info!(context, "Skipping FETCH response without UID.");
                    }
                }

                let fetch_response = match fetch_response {
                    Some(fetch) => fetch,
                    None => {
                        warn!(
                            context,
                            "Missed UID {} in the server response.", request_uid
                        );
                        continue;
                    }
                };
                count += 1;

                let is_deleted = fetch_response.flags().any(|flag| flag == Flag::Deleted);
                let body = fetch_response.body();

                if is_deleted {
                    info!(context, "Not processing deleted msg {}.", request_uid);
                    received_msgs_channel.send((request_uid, None)).await?;
                    continue;
                }

                let body = if let Some(body) = body {
                    body
                } else {
                    info!(
                        context,
                        "Not processing message {} without a BODY.", request_uid
                    );
                    received_msgs_channel.send((request_uid, None)).await?;
                    continue;
                };

                let is_seen = fetch_response.flags().any(|flag| flag == Flag::Seen);

                let Some(rfc724_mid) = uid_message_ids.get(&request_uid) else {
                    error!(
                        context,
                        "No Message-ID corresponding to UID {} passed in uid_messsage_ids.",
                        request_uid
                    );
                    continue;
                };

                info!(
                    context,
                    "Passing message UID {} to receive_imf().", request_uid
                );
                let res = receive_imf_inner(context, rfc724_mid, body, is_seen).await;

                let received_msg = match res {
                    // A receive failure is reported to the user as a device
                    // message; the UID still counts as processed.
                    Err(err) => {
                        warn!(context, "receive_imf error: {err:#}.");

                        let text = format!(
                            "❌ Failed to receive a message: {err:#}. Core version v{DC_VERSION_STR}. Please report this bug to delta@merlinux.eu or https://support.delta.chat/.",
                        );
                        let mut msg = Message::new_text(text);
                        add_device_msg(context, None, Some(&mut msg)).await?;
                        None
                    }
                    Ok(msg) => msg,
                };
                received_msgs_channel
                    .send((request_uid, received_msg))
                    .await?;
            }

            // Read the remaining responses so the stream is fully consumed
            // before the next command is issued.
            while fetch_responses
                .try_next()
                .await
                .context("Failed to drain FETCH responses")?
                .is_some()
            {}

            if count != request_uids.len() {
                warn!(
                    context,
                    "Failed to fetch all UIDs: got {}, requested {}, we requested the UIDs {:?}.",
                    count,
                    request_uids.len(),
                    request_uids,
                );
            } else {
                info!(
                    context,
                    "Successfully received {} UIDs.",
                    request_uids.len()
                );
            }
        }

        Ok(())
    }
1332
1333 #[expect(clippy::arithmetic_side_effects)]
1339 pub(crate) async fn update_metadata(&mut self, context: &Context) -> Result<()> {
1340 let mut lock = context.metadata.write().await;
1341
1342 if !self.can_metadata() {
1343 *lock = Some(Default::default());
1344 }
1345 if let Some(ref mut old_metadata) = *lock {
1346 let now = time();
1347
1348 if now + 3600 * 12 < old_metadata.ice_servers_expiration_timestamp {
1350 return Ok(());
1351 }
1352
1353 let mut got_turn_server = false;
1354 if self.can_metadata() {
1355 info!(context, "ICE servers expired, requesting new credentials.");
1356 let mailbox = "";
1357 let options = "";
1358 let metadata = self
1359 .get_metadata(mailbox, options, "(/shared/vendor/deltachat/turn)")
1360 .await?;
1361 for m in metadata {
1362 if m.entry == "/shared/vendor/deltachat/turn"
1363 && let Some(value) = m.value
1364 {
1365 match create_ice_servers_from_metadata(&value).await {
1366 Ok((parsed_timestamp, parsed_ice_servers)) => {
1367 old_metadata.ice_servers_expiration_timestamp = parsed_timestamp;
1368 old_metadata.ice_servers = parsed_ice_servers;
1369 got_turn_server = true;
1370 }
1371 Err(err) => {
1372 warn!(context, "Failed to parse TURN server metadata: {err:#}.");
1373 }
1374 }
1375 }
1376 }
1377 }
1378 if !got_turn_server {
1379 info!(context, "Will use fallback ICE servers.");
1380 old_metadata.ice_servers_expiration_timestamp = time() + 3600 * 24 * 7;
1382 old_metadata.ice_servers = create_fallback_ice_servers();
1383 }
1384 return Ok(());
1385 }
1386
1387 info!(
1388 context,
1389 "Server supports metadata, retrieving server comment and admin contact."
1390 );
1391
1392 let mut comment = None;
1393 let mut admin = None;
1394 let mut iroh_relay = None;
1395 let mut ice_servers = None;
1396 let mut ice_servers_expiration_timestamp = 0;
1397
1398 let mailbox = "";
1399 let options = "";
1400 let metadata = self
1401 .get_metadata(
1402 mailbox,
1403 options,
1404 "(/shared/comment /shared/admin /shared/vendor/deltachat/irohrelay /shared/vendor/deltachat/turn)",
1405 )
1406 .await?;
1407 for m in metadata {
1408 match m.entry.as_ref() {
1409 "/shared/comment" => {
1410 comment = m.value;
1411 }
1412 "/shared/admin" => {
1413 admin = m.value;
1414 }
1415 "/shared/vendor/deltachat/irohrelay" => {
1416 if let Some(value) = m.value {
1417 if let Ok(url) = Url::parse(&value) {
1418 iroh_relay = Some(url);
1419 } else {
1420 warn!(
1421 context,
1422 "Got invalid URL from iroh relay metadata: {:?}.", value
1423 );
1424 }
1425 }
1426 }
1427 "/shared/vendor/deltachat/turn" => {
1428 if let Some(value) = m.value {
1429 match create_ice_servers_from_metadata(&value).await {
1430 Ok((parsed_timestamp, parsed_ice_servers)) => {
1431 ice_servers_expiration_timestamp = parsed_timestamp;
1432 ice_servers = Some(parsed_ice_servers);
1433 }
1434 Err(err) => {
1435 warn!(context, "Failed to parse TURN server metadata: {err:#}.");
1436 }
1437 }
1438 }
1439 }
1440 _ => {}
1441 }
1442 }
1443 let ice_servers = if let Some(ice_servers) = ice_servers {
1444 ice_servers
1445 } else {
1446 ice_servers_expiration_timestamp = time() + 3600 * 24 * 7;
1448 create_fallback_ice_servers()
1449 };
1450
1451 *lock = Some(ServerMetadata {
1452 comment,
1453 admin,
1454 iroh_relay,
1455 ice_servers,
1456 ice_servers_expiration_timestamp,
1457 });
1458 Ok(())
1459 }
1460
1461 pub(crate) async fn register_token(&mut self, context: &Context) -> Result<()> {
1463 if context.push_subscribed.load(Ordering::Relaxed) {
1464 return Ok(());
1465 }
1466
1467 let transport_id = self.transport_id();
1468
1469 let Some(device_token) = context.push_subscriber.device_token().await else {
1470 return Ok(());
1471 };
1472
1473 if self.can_metadata() && self.can_push() {
1474 info!(
1475 context,
1476 "Transport {transport_id}: Subscribing for push notifications."
1477 );
1478
1479 let old_encrypted_device_token =
1480 context.get_config(Config::EncryptedDeviceToken).await?;
1481
1482 let device_token_changed = old_encrypted_device_token.is_none()
1484 || context.get_config(Config::DeviceToken).await?.as_ref() != Some(&device_token);
1485
1486 let new_encrypted_device_token;
1487 if device_token_changed {
1488 let encrypted_device_token = encrypt_device_token(&device_token)
1489 .context("Failed to encrypt device token")?;
1490
1491 let encrypted_device_token_len = encrypted_device_token.len();
1495
1496 context
1502 .set_config_internal(Config::DeviceToken, Some(&device_token))
1503 .await?;
1504 context
1505 .set_config_internal(
1506 Config::EncryptedDeviceToken,
1507 Some(&encrypted_device_token),
1508 )
1509 .await?;
1510
1511 if encrypted_device_token_len <= 4096 {
1512 new_encrypted_device_token = Some(encrypted_device_token);
1513 } else {
1514 warn!(context, "Device token is too long for LITERAL-, ignoring.");
1524 new_encrypted_device_token = None;
1525 }
1526 } else {
1527 new_encrypted_device_token = old_encrypted_device_token;
1528 }
1529
1530 if let Some(encrypted_device_token) = new_encrypted_device_token {
1533 self.run_command_and_check_ok(&format_setmetadata(
1534 "INBOX",
1535 &encrypted_device_token,
1536 ))
1537 .await
1538 .context("SETMETADATA command failed")?;
1539
1540 context.push_subscribed.store(true, Ordering::Relaxed);
1541 }
1542 } else if !context.push_subscriber.heartbeat_subscribed().await {
1543 let context = context.clone();
1544 tokio::spawn(async move { context.push_subscriber.subscribe(&context).await });
1546 }
1547
1548 Ok(())
1549 }
1550}
1551
/// Builds the `SETMETADATA` command that stores `device_token` under
/// `/private/devicetoken` for `folder`.
///
/// The token is sent as a literal announced with `{<byte length>+}` followed by
/// CRLF. `folder` is placed between double quotes without any escaping.
fn format_setmetadata(folder: &str, device_token: &str) -> String {
    let literal_header = format!("{{{}+}}", device_token.len());
    [
        "SETMETADATA \"",
        folder,
        "\" (/private/devicetoken ",
        &literal_header,
        "\r\n",
        device_token,
        ")",
    ]
    .concat()
}
1558
1559impl Session {
1560 async fn add_flag_finalized_with_set(&mut self, uid_set: &str, flag: &str) -> Result<()> {
1566 if flag == "\\Deleted" {
1567 self.selected_folder_needs_expunge = true;
1568 }
1569 let query = format!("+FLAGS ({flag})");
1570 let mut responses = self
1571 .uid_store(uid_set, &query)
1572 .await
1573 .with_context(|| format!("IMAP failed to store: ({uid_set}, {query})"))?;
1574 while let Some(_response) = responses.try_next().await? {
1575 }
1577 Ok(())
1578 }
1579}
1580
1581impl Session {
1582 fn drain_unsolicited_responses(&self, context: &Context) -> Result<bool> {
1591 use UnsolicitedResponse::*;
1592 use async_imap::imap_proto::Response;
1593 use async_imap::imap_proto::ResponseCode;
1594
1595 let folder = self.selected_folder.as_deref().unwrap_or_default();
1596 let mut should_refetch = false;
1597 while let Ok(response) = self.unsolicited_responses.try_recv() {
1598 match response {
1599 Exists(_) => {
1600 info!(
1601 context,
1602 "Need to refetch {folder:?}, got unsolicited EXISTS {response:?}"
1603 );
1604 should_refetch = true;
1605 }
1606
1607 Expunge(_) | Recent(_) => {}
1608 Other(ref response_data) => {
1609 match response_data.parsed() {
1610 Response::Fetch { .. } => {
1611 info!(
1612 context,
1613 "Need to refetch {folder:?}, got unsolicited FETCH {response:?}"
1614 );
1615 should_refetch = true;
1616 }
1617
1618 Response::Done {
1621 code: Some(ResponseCode::CopyUid(_, _, _)),
1622 ..
1623 } => {}
1624
1625 _ => {
1626 info!(context, "{folder:?}: got unsolicited response {response:?}")
1627 }
1628 }
1629 }
1630 _ => {
1631 info!(context, "{folder:?}: got unsolicited response {response:?}")
1632 }
1633 }
1634 }
1635 Ok(should_refetch)
1636 }
1637}
1638
1639async fn should_move_out_of_spam(
1640 context: &Context,
1641 headers: &[mailparse::MailHeader<'_>],
1642) -> Result<bool> {
1643 if headers.get_header_value(HeaderDef::ChatVersion).is_some() {
1644 return Ok(true);
1655 }
1656
1657 if let Some(msg) = get_prefetch_parent_message(context, headers).await? {
1658 if msg.chat_blocked != Blocked::Not {
1659 return Ok(false);
1661 }
1662 } else {
1663 let from = match mimeparser::get_from(headers) {
1664 Some(f) => f,
1665 None => return Ok(false),
1666 };
1667 let (from_id, blocked_contact, _origin) =
1669 match from_field_to_contact_id(context, &from, None, true, true)
1670 .await
1671 .context("from_field_to_contact_id")?
1672 {
1673 Some(res) => res,
1674 None => {
1675 warn!(
1676 context,
1677 "Contact with From address {:?} cannot exist, not moving out of spam", from
1678 );
1679 return Ok(false);
1680 }
1681 };
1682 if blocked_contact {
1683 return Ok(false);
1685 }
1686
1687 if let Some(chat_id_blocked) = ChatIdBlocked::lookup_by_contact(context, from_id).await? {
1688 if chat_id_blocked.blocked != Blocked::Not {
1689 return Ok(false);
1690 }
1691 } else if from_id != ContactId::SELF {
1692 return Ok(false);
1694 }
1695 }
1696
1697 Ok(true)
1698}
1699
1700async fn spam_target_folder_cfg(
1705 context: &Context,
1706 headers: &[mailparse::MailHeader<'_>],
1707) -> Result<Option<Config>> {
1708 if !should_move_out_of_spam(context, headers).await? {
1709 return Ok(None);
1710 }
1711
1712 Ok(Some(Config::ConfiguredInboxFolder))
1713}
1714
1715pub async fn target_folder_cfg(
1718 context: &Context,
1719 folder: &str,
1720 folder_meaning: FolderMeaning,
1721 headers: &[mailparse::MailHeader<'_>],
1722) -> Result<Option<Config>> {
1723 if folder == "DeltaChat" {
1724 return Ok(None);
1725 }
1726
1727 if folder_meaning == FolderMeaning::Spam {
1728 spam_target_folder_cfg(context, headers).await
1729 } else {
1730 Ok(None)
1731 }
1732}
1733
1734pub async fn target_folder(
1735 context: &Context,
1736 folder: &str,
1737 folder_meaning: FolderMeaning,
1738 headers: &[mailparse::MailHeader<'_>],
1739) -> Result<String> {
1740 match target_folder_cfg(context, folder, folder_meaning, headers).await? {
1741 Some(config) => match context.get_config(config).await? {
1742 Some(target) => Ok(target),
1743 None => Ok(folder.to_string()),
1744 },
1745 None => Ok(folder.to_string()),
1746 }
1747}
1748
1749fn get_folder_meaning_by_name(folder_name: &str) -> FolderMeaning {
1756 const SPAM_NAMES: &[&str] = &[
1758 "spam",
1759 "junk",
1760 "Correio electrónico não solicitado",
1761 "Correo basura",
1762 "Lixo",
1763 "Nettsøppel",
1764 "Nevyžádaná pošta",
1765 "No solicitado",
1766 "Ongewenst",
1767 "Posta indesiderata",
1768 "Skräp",
1769 "Wiadomości-śmieci",
1770 "Önemsiz",
1771 "Ανεπιθύμητα",
1772 "Спам",
1773 "垃圾邮件",
1774 "垃圾郵件",
1775 "迷惑メール",
1776 "스팸",
1777 ];
1778 const TRASH_NAMES: &[&str] = &[
1779 "Trash",
1780 "Bin",
1781 "Caixote do lixo",
1782 "Cestino",
1783 "Corbeille",
1784 "Papelera",
1785 "Papierkorb",
1786 "Papirkurv",
1787 "Papperskorgen",
1788 "Prullenbak",
1789 "Rubujo",
1790 "Κάδος απορριμμάτων",
1791 "Корзина",
1792 "Кошик",
1793 "ゴミ箱",
1794 "垃圾桶",
1795 "已删除邮件",
1796 "휴지통",
1797 ];
1798 let lower = folder_name.to_lowercase();
1799
1800 if lower == "inbox" {
1801 FolderMeaning::Inbox
1802 } else if SPAM_NAMES.iter().any(|s| s.to_lowercase() == lower) {
1803 FolderMeaning::Spam
1804 } else if TRASH_NAMES.iter().any(|s| s.to_lowercase() == lower) {
1805 FolderMeaning::Trash
1806 } else {
1807 FolderMeaning::Unknown
1808 }
1809}
1810
1811fn get_folder_meaning_by_attrs(folder_attrs: &[NameAttribute]) -> FolderMeaning {
1812 for attr in folder_attrs {
1813 match attr {
1814 NameAttribute::Trash => return FolderMeaning::Trash,
1815 NameAttribute::Junk => return FolderMeaning::Spam,
1816 NameAttribute::All | NameAttribute::Flagged => return FolderMeaning::Virtual,
1817 NameAttribute::Extension(label) => {
1818 match label.as_ref() {
1819 "\\Spam" => return FolderMeaning::Spam,
1820 "\\Important" => return FolderMeaning::Virtual,
1821 _ => {}
1822 };
1823 }
1824 _ => {}
1825 }
1826 }
1827 FolderMeaning::Unknown
1828}
1829
1830pub(crate) fn get_folder_meaning(folder: &Name) -> FolderMeaning {
1831 match get_folder_meaning_by_attrs(folder.attributes()) {
1832 FolderMeaning::Unknown => get_folder_meaning_by_name(folder.name()),
1833 meaning => meaning,
1834 }
1835}
1836
1837fn get_fetch_headers(prefetch_msg: &Fetch) -> Result<Vec<mailparse::MailHeader<'_>>> {
1839 match prefetch_msg.header() {
1840 Some(header_bytes) => {
1841 let (headers, _) = mailparse::parse_headers(header_bytes)?;
1842 Ok(headers)
1843 }
1844 None => Ok(Vec::new()),
1845 }
1846}
1847
1848pub(crate) fn prefetch_get_message_id(headers: &[mailparse::MailHeader]) -> Option<String> {
1849 headers
1850 .get_header_value(HeaderDef::XMicrosoftOriginalMessageId)
1851 .or_else(|| headers.get_header_value(HeaderDef::MessageId))
1852 .and_then(|msgid| mimeparser::parse_message_id(&msgid).ok())
1853}
1854
1855pub(crate) fn create_message_id() -> String {
1856 format!("{}{}", GENERATED_PREFIX, create_id())
1857}
1858
1859pub(crate) async fn prefetch_should_download(
1861 context: &Context,
1862 headers: &[mailparse::MailHeader<'_>],
1863 message_id: &str,
1864 mut flags: impl Iterator<Item = Flag<'_>>,
1865) -> Result<bool> {
1866 if message::rfc724_mid_download_tried(context, message_id).await? {
1867 if let Some(from) = mimeparser::get_from(headers)
1868 && context.is_self_addr(&from.addr).await?
1869 {
1870 markseen_on_imap_table(context, message_id).await?;
1871 }
1872 return Ok(false);
1873 }
1874
1875 let maybe_ndn = if let Some(from) = headers.get_header_value(HeaderDef::From_) {
1879 let from = from.to_ascii_lowercase();
1880 from.contains("mailer-daemon") || from.contains("mail-daemon")
1881 } else {
1882 false
1883 };
1884
1885 let from = match mimeparser::get_from(headers) {
1886 Some(f) => f,
1887 None => return Ok(false),
1888 };
1889 let (_from_id, blocked_contact, _origin) =
1890 match from_field_to_contact_id(context, &from, None, true, true).await? {
1891 Some(res) => res,
1892 None => return Ok(false),
1893 };
1894 let is_legacy_securejoin = headers.get_header_value(HeaderDef::SecureJoin).is_some();
1900
1901 let is_encrypted = headers
1902 .get_header_value(HeaderDef::ContentType)
1903 .is_some_and(|content_type| {
1904 mailparse::parse_content_type(&content_type).mimetype == "multipart/encrypted"
1905 });
1906
1907 if flags.any(|f| f == Flag::Draft) {
1908 info!(context, "Ignoring draft message");
1909 return Ok(false);
1910 }
1911
1912 let should_download = maybe_ndn
1913 || (!blocked_contact
1914 && (is_legacy_securejoin
1915 || is_encrypted
1916 || !context.get_config_bool(Config::ForceEncryption).await?));
1917 Ok(should_download)
1918}
1919
1920pub(crate) async fn markseen_on_imap_table(context: &Context, message_id: &str) -> Result<()> {
1923 context
1924 .sql
1925 .execute(
1926 "INSERT OR IGNORE INTO imap_markseen (id)
1927 SELECT id FROM imap WHERE rfc724_mid=?",
1928 (message_id,),
1929 )
1930 .await?;
1931 context.scheduler.interrupt_inbox().await;
1932
1933 Ok(())
1934}
1935
1936pub(crate) async fn set_uid_next(
1940 context: &Context,
1941 transport_id: u32,
1942 folder: &str,
1943 uid_next: u32,
1944) -> Result<()> {
1945 context
1946 .sql
1947 .execute(
1948 "INSERT INTO imap_sync (transport_id, folder, uid_next) VALUES (?, ?,?)
1949 ON CONFLICT(transport_id, folder) DO UPDATE SET uid_next=excluded.uid_next",
1950 (transport_id, folder, uid_next),
1951 )
1952 .await?;
1953 Ok(())
1954}
1955
1956async fn get_uid_next(context: &Context, transport_id: u32, folder: &str) -> Result<u32> {
1962 Ok(context
1963 .sql
1964 .query_get_value(
1965 "SELECT uid_next FROM imap_sync WHERE transport_id=? AND folder=?",
1966 (transport_id, folder),
1967 )
1968 .await?
1969 .unwrap_or(0))
1970}
1971
1972pub(crate) async fn set_uidvalidity(
1973 context: &Context,
1974 transport_id: u32,
1975 folder: &str,
1976 uidvalidity: u32,
1977) -> Result<()> {
1978 context
1979 .sql
1980 .execute(
1981 "INSERT INTO imap_sync (transport_id, folder, uidvalidity) VALUES (?,?,?)
1982 ON CONFLICT(transport_id, folder) DO UPDATE SET uidvalidity=excluded.uidvalidity",
1983 (transport_id, folder, uidvalidity),
1984 )
1985 .await?;
1986 Ok(())
1987}
1988
1989async fn get_uidvalidity(context: &Context, transport_id: u32, folder: &str) -> Result<u32> {
1990 Ok(context
1991 .sql
1992 .query_get_value(
1993 "SELECT uidvalidity FROM imap_sync WHERE transport_id=? AND folder=?",
1994 (transport_id, folder),
1995 )
1996 .await?
1997 .unwrap_or(0))
1998}
1999
2000pub(crate) async fn set_modseq(
2001 context: &Context,
2002 transport_id: u32,
2003 folder: &str,
2004 modseq: u64,
2005) -> Result<()> {
2006 context
2007 .sql
2008 .execute(
2009 "INSERT INTO imap_sync (transport_id, folder, modseq) VALUES (?,?,?)
2010 ON CONFLICT(transport_id, folder) DO UPDATE SET modseq=excluded.modseq",
2011 (transport_id, folder, modseq),
2012 )
2013 .await?;
2014 Ok(())
2015}
2016
2017#[expect(clippy::arithmetic_side_effects)]
2021fn build_sequence_sets(uids: &[u32]) -> Result<Vec<(Vec<u32>, String)>> {
2022 let mut ranges: Vec<UidRange> = vec![];
2024
2025 for ¤t in uids {
2026 if let Some(last) = ranges.last_mut()
2027 && last.end + 1 == current
2028 {
2029 last.end = current;
2030 continue;
2031 }
2032
2033 ranges.push(UidRange {
2034 start: current,
2035 end: current,
2036 });
2037 }
2038
2039 let mut result = vec![];
2041 let (mut last_uids, mut last_str) = (Vec::new(), String::new());
2042 for range in ranges {
2043 last_uids.reserve((range.end - range.start + 1).try_into()?);
2044 (range.start..=range.end).for_each(|u| last_uids.push(u));
2045 if !last_str.is_empty() {
2046 last_str.push(',');
2047 }
2048 last_str.push_str(&range.to_string());
2049
2050 if last_str.len() > 990 {
2051 result.push((take(&mut last_uids), take(&mut last_str)));
2052 }
2053 }
2054 result.push((last_uids, last_str));
2055
2056 result.retain(|(_, s)| !s.is_empty());
2057 Ok(result)
2058}
2059
2060struct UidRange {
2061 start: u32,
2062 end: u32,
2063 }
2065
2066impl std::fmt::Display for UidRange {
2067 fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
2068 if self.start == self.end {
2069 write!(f, "{}", self.start)
2070 } else {
2071 write!(f, "{}:{}", self.start, self.end)
2072 }
2073 }
2074}
2075
2076#[cfg(test)]
2077mod imap_tests;