use std::{
cmp::max,
cmp::min,
collections::{BTreeMap, BTreeSet, HashMap},
iter::Peekable,
mem::take,
sync::atomic::Ordering,
time::{Duration, UNIX_EPOCH},
};
use anyhow::{bail, format_err, Context as _, Result};
use async_channel::Receiver;
use async_imap::types::{Fetch, Flag, Name, NameAttribute, UnsolicitedResponse};
use deltachat_contact_tools::ContactAddress;
use futures::{FutureExt as _, StreamExt, TryStreamExt};
use futures_lite::FutureExt;
use num_traits::FromPrimitive;
use rand::Rng;
use ratelimit::Ratelimit;
use url::Url;
use crate::chat::{self, ChatId, ChatIdBlocked};
use crate::chatlist_events;
use crate::config::Config;
use crate::constants::{self, Blocked, Chattype, ShowEmails};
use crate::contact::{Contact, ContactId, Modifier, Origin};
use crate::context::Context;
use crate::events::EventType;
use crate::headerdef::{HeaderDef, HeaderDefMap};
use crate::log::LogExt;
use crate::login_param::{
prioritize_server_login_params, ConfiguredLoginParam, ConfiguredServerLoginParam,
};
use crate::message::{self, Message, MessageState, MessengerMessage, MsgId};
use crate::mimeparser;
use crate::net::proxy::ProxyConfig;
use crate::net::session::SessionStream;
use crate::oauth2::get_oauth2_access_token;
use crate::receive_imf::{
from_field_to_contact_id, get_prefetch_parent_message, receive_imf_inner, ReceivedMsg,
};
use crate::scheduler::connectivity::ConnectivityStore;
use crate::sql;
use crate::stock_str;
use crate::tools::{self, create_id, duration_to_str};
pub(crate) mod capabilities;
mod client;
mod idle;
pub mod scan_folders;
pub mod select_folder;
pub(crate) mod session;
use client::{determine_capabilities, Client};
use mailparse::SingleInfo;
use session::Session;
/// The literal prefix `"GEN_"`.
///
/// NOTE(review): presumably the prefix of locally generated Message-IDs for messages
/// that arrive without one — confirm against `create_message_id`.
pub(crate) const GENERATED_PREFIX: &str = "GEN_";
/// FETCH item list requesting the UID together with only the `Message-ID` and
/// `X-Microsoft-Original-Message-ID` header fields (via `BODY.PEEK`).
///
/// Used by `Session::resync_folder_uids` to rebuild the `imap` table.
const RFC724MID_UID: &str = "(UID BODY.PEEK[HEADER.FIELDS (\
MESSAGE-ID \
X-MICROSOFT-ORIGINAL-MESSAGE-ID\
)])";
/// FETCH item list for a full download: flags plus the complete message (`BODY.PEEK[]`).
const BODY_FULL: &str = "(FLAGS BODY.PEEK[])";
/// FETCH item list for a partial download: flags, message size and the header only.
const BODY_PARTIAL: &str = "(FLAGS RFC822.SIZE BODY.PEEK[HEADER])";
/// Connection parameters and connection-attempt state for one IMAP account.
///
/// An `Imap` does not hold the connection itself; [`Imap::connect`] produces a `Session`.
#[derive(Debug)]
pub(crate) struct Imap {
/// Receiver used in `connect()` to cut short the wait imposed by backoff/ratelimit.
pub(crate) idle_interrupt_receiver: Receiver<()>,
/// Account address; passed to `get_oauth2_access_token()` when OAuth 2 is used.
addr: String,
/// Candidate server login parameters; `connect()` tries them in prioritized order.
lp: Vec<ConfiguredServerLoginParam>,
/// Password for LOGIN; also handed to `get_oauth2_access_token()` in the OAuth 2 case.
password: String,
/// Optional proxy configuration passed to `Client::connect()`.
proxy_config: Option<ProxyConfig>,
/// Passed to `Client::connect()` as its strict-TLS argument.
strict_tls: bool,
/// If true, authenticate with XOAUTH2 instead of LOGIN.
oauth2: bool,
/// Set after a login error that looks like an authentication failure;
/// the wrong-password device message is only added when it is already set.
authentication_failed_once: bool,
/// Connectivity state updated while connecting (connecting / connected / working / error).
pub(crate) connectivity: ConnectivityStore,
/// Time of the last connection attempt, used as the base for the backoff delay.
conn_last_try: tools::Time,
/// Current backoff in milliseconds, applied before the next connection attempt.
conn_backoff_ms: u64,
/// Rate limiter consulted before connecting and charged on each established connection.
ratelimit: Ratelimit,
}
/// Credentials used to build the XOAUTH2 authentication response.
#[derive(Debug)]
struct OAuth2 {
/// User name placed into the `user=` part of the response.
user: String,
/// Token placed into the `auth=Bearer` part of the response.
access_token: String,
}
/// Server metadata retrieved by `Session::fetch_metadata`.
#[derive(Debug)]
pub(crate) struct ServerMetadata {
/// Value of the `/shared/comment` metadata entry, if any.
pub comment: Option<String>,
/// Value of the `/shared/admin` metadata entry, if any.
pub admin: Option<String>,
/// URL parsed from the `/shared/vendor/deltachat/irohrelay` metadata entry;
/// `None` if the entry is absent or not a valid URL.
pub iroh_relay: Option<Url>,
}
impl async_imap::Authenticator for OAuth2 {
    type Response = String;

    /// Builds the XOAUTH2 response from the stored user name and access token.
    ///
    /// The server challenge (`_data`) is ignored.
    fn process(&mut self, _data: &[u8]) -> Self::Response {
        let Self { user, access_token } = self;
        format!("user={}\x01auth=Bearer {}\x01\x01", user, access_token)
    }
}
/// The role a folder plays for this account.
///
/// `to_config()` maps the variants that have a configured folder name to their config key.
#[derive(Debug, Display, PartialEq, Eq, Clone, Copy)]
pub enum FolderMeaning {
/// No special role is known for the folder.
Unknown,
/// Spam folder; `fetch_new_messages()` never downloads messages from it.
Spam,
/// Inbox; stored under `Config::ConfiguredInboxFolder`.
Inbox,
/// Mvbox; stored under `Config::ConfiguredMvboxFolder`.
Mvbox,
/// Sent folder; stored under `Config::ConfiguredSentboxFolder`.
Sent,
/// Trash folder; stored under `Config::ConfiguredTrashFolder`.
Trash,
/// Drafts folder.
Drafts,
/// Folder skipped by `Session::resync_folders()`.
/// NOTE(review): presumably a server-side virtual view (e.g. "All Mail") — confirm.
Virtual,
}
impl FolderMeaning {
    /// Returns the config key holding the configured folder name for this meaning.
    ///
    /// Only `Inbox`, `Mvbox`, `Sent` and `Trash` have such a key; every other
    /// meaning yields `None`.
    pub fn to_config(self) -> Option<Config> {
        let config = match self {
            Self::Inbox => Config::ConfiguredInboxFolder,
            Self::Mvbox => Config::ConfiguredMvboxFolder,
            Self::Sent => Config::ConfiguredSentboxFolder,
            Self::Trash => Config::ConfiguredTrashFolder,
            Self::Unknown | Self::Spam | Self::Drafts | Self::Virtual => return None,
        };
        Some(config)
    }
}
/// Iterator adaptor over `(rowid, uid, folder)` tuples.
///
/// Its `Iterator` implementation groups consecutive items that share the same folder
/// string and yields, per group, the folder, the collected row IDs and a
/// comma-separated UID set string.
struct UidGrouper<T: Iterator<Item = (i64, u32, String)>> {
// Peekable so that the next item's folder/UID can be inspected before consuming it.
inner: Peekable<T>,
}
impl<T, I> From<I> for UidGrouper<T>
where
    T: Iterator<Item = (i64, u32, String)>,
    I: IntoIterator<IntoIter = T>,
{
    /// Wraps anything iterable over `(rowid, uid, folder)` tuples into a grouper.
    fn from(inner: I) -> Self {
        let inner = inner.into_iter().peekable();
        Self { inner }
    }
}
impl<T: Iterator<Item = (i64, u32, String)>> Iterator for UidGrouper<T> {
    /// Folder, row IDs belonging to the group, and the UID set as a string.
    type Item = (String, Vec<i64>, String);

    /// Produces the next group of items sharing the folder of the upcoming item.
    ///
    /// Runs of equal or consecutive UIDs are collapsed into one `UidRange`; ranges are
    /// joined with `,`. No further range is started once the set string has reached
    /// 1000 bytes, so the rest of a large folder is returned by subsequent calls.
    fn next(&mut self) -> Option<Self::Item> {
        let folder = self.inner.peek().map(|(_, _, name)| name.clone())?;

        let mut rowids = Vec::new();
        let mut set = String::new();

        while set.len() < 1000 {
            // Start a new range with the next item, as long as it is in the same folder.
            let Some((first_rowid, first_uid, _)) =
                self.inner.next_if(|(_, _, name)| name == &folder)
            else {
                break;
            };
            rowids.push(first_rowid);

            // Extend the range while the following UID is adjacent or a duplicate.
            let mut last_uid = first_uid;
            while let Some((rowid, uid, _)) = self.inner.next_if(|(_, uid, name)| {
                name == &folder && (*uid == last_uid + 1 || *uid == last_uid)
            }) {
                last_uid = uid;
                rowids.push(rowid);
            }

            let range = UidRange {
                start: first_uid,
                end: last_uid,
            };
            if !set.is_empty() {
                set.push(',');
            }
            set.push_str(&range.to_string());
        }

        Some((folder, rowids, set))
    }
}
impl Imap {
pub fn new(
lp: Vec<ConfiguredServerLoginParam>,
password: String,
proxy_config: Option<ProxyConfig>,
addr: &str,
strict_tls: bool,
oauth2: bool,
idle_interrupt_receiver: Receiver<()>,
) -> Self {
Imap {
idle_interrupt_receiver,
addr: addr.to_string(),
lp,
password,
proxy_config,
strict_tls,
oauth2,
authentication_failed_once: false,
connectivity: Default::default(),
conn_last_try: UNIX_EPOCH,
conn_backoff_ms: 0,
ratelimit: Ratelimit::new(Duration::new(120, 0), 2.0),
}
}
/// Creates an `Imap` from the login parameters stored in the context.
///
/// # Errors
///
/// Fails if loading the configured login parameters fails, or with
/// "Not configured" if none are stored.
pub async fn new_configured(
    context: &Context,
    idle_interrupt_receiver: Receiver<()>,
) -> Result<Self> {
    let param = ConfiguredLoginParam::load(context)
        .await?
        .context("Not configured")?;
    // The address is only borrowed; `Imap::new` makes its own copy.
    let imap = Self::new(
        param.imap.clone(),
        param.imap_password.clone(),
        param.proxy_config.clone(),
        &param.addr,
        param.strict_tls(),
        param.oauth2,
        idle_interrupt_receiver,
    );
    Ok(imap)
}
/// Connects to the IMAP server and logs in, returning a ready `Session`.
///
/// Before connecting, waits for the current backoff / ratelimit delay (the wait can be
/// interrupted through `idle_interrupt_receiver`). Then each prioritized login parameter
/// candidate is tried in turn: connect, authenticate (XOAUTH2 or LOGIN), determine
/// capabilities and optionally enable compression.
///
/// `configuring`: if true, no wrong-password device message is added on
/// authentication failures.
///
/// # Errors
///
/// Returns the first connection/login error if no candidate succeeded, or
/// "No IMAP connection candidates provided" if there were no candidates at all.
/// Failures to obtain the OAuth 2 token, to determine capabilities or to enable
/// compression abort immediately without trying further candidates.
pub(crate) async fn connect(
&mut self,
context: &Context,
configuring: bool,
) -> Result<Session> {
let now = tools::Time::now();
// Time left until `conn_last_try + conn_backoff_ms`, clamped so that it is never
// negative (a `conn_last_try` in the future is treated as `now`).
let until_can_send = max(
min(self.conn_last_try, now)
.checked_add(Duration::from_millis(self.conn_backoff_ms))
.unwrap_or(now),
now,
)
.duration_since(now)?;
// Wait for whichever is longer: our own backoff or the rate limiter.
let ratelimit_duration = max(until_can_send, self.ratelimit.until_can_send());
if !ratelimit_duration.is_zero() {
warn!(
context,
"IMAP got rate limited, waiting for {} until can connect.",
duration_to_str(ratelimit_duration),
);
// Race the sleep against the interrupt channel; whichever completes first wins.
let interrupted = async {
tokio::time::sleep(ratelimit_duration).await;
false
}
.race(self.idle_interrupt_receiver.recv().map(|_| true))
.await;
if interrupted {
info!(
context,
"Connecting to IMAP without waiting for ratelimit due to interrupt."
);
}
}
info!(context, "Connecting to IMAP server");
self.connectivity.set_connecting(context).await;
self.conn_last_try = tools::Time::now();
const BACKOFF_MIN_MS: u64 = 2000;
const BACKOFF_MAX_MS: u64 = 80_000;
// Grow the backoff by a random factor between 1.5 and 2. Capping the old value at
// half of the maximum first keeps the result within BACKOFF_MAX_MS.
self.conn_backoff_ms = min(self.conn_backoff_ms, BACKOFF_MAX_MS / 2);
self.conn_backoff_ms = self.conn_backoff_ms.saturating_add(
rand::thread_rng().gen_range((self.conn_backoff_ms / 2)..=self.conn_backoff_ms),
);
self.conn_backoff_ms = max(BACKOFF_MIN_MS, self.conn_backoff_ms);
let login_params = prioritize_server_login_params(&context.sql, &self.lp, "imap").await?;
// Only the first error is reported to the caller if all candidates fail.
let mut first_error = None;
for lp in login_params {
info!(context, "IMAP trying to connect to {}.", &lp.connection);
let connection_candidate = lp.connection.clone();
let client = match Client::connect(
context,
self.proxy_config.clone(),
self.strict_tls,
connection_candidate,
)
.await
{
Ok(client) => client,
Err(err) => {
warn!(context, "IMAP failed to connect: {err:#}.");
first_error.get_or_insert(err);
continue;
}
};
// The connection itself succeeded: reset the backoff to its minimum and
// count this connection against the rate limiter.
self.conn_backoff_ms = BACKOFF_MIN_MS;
self.ratelimit.send();
let imap_user: &str = lp.user.as_ref();
let imap_pw: &str = &self.password;
let login_res = if self.oauth2 {
info!(context, "Logging into IMAP server with OAuth 2.");
let addr: &str = self.addr.as_ref();
let token = get_oauth2_access_token(context, addr, imap_pw, true)
.await?
.context("IMAP could not get OAUTH token")?;
let auth = OAuth2 {
user: imap_user.into(),
access_token: token,
};
client.authenticate("XOAUTH2", auth).await
} else {
info!(context, "Logging into IMAP server with LOGIN.");
client.login(imap_user, imap_pw).await
};
match login_res {
Ok(mut session) => {
let capabilities = determine_capabilities(&mut session).await?;
// Wrap the stream in compression if the server offers it.
let session = if capabilities.can_compress {
info!(context, "Enabling IMAP compression.");
let compressed_session = session
.compress(|s| {
let session_stream: Box<dyn SessionStream> = Box::new(s);
session_stream
})
.await
.context("Failed to enable IMAP compression")?;
Session::new(compressed_session, capabilities)
} else {
Session::new(session, capabilities)
};
// Publish the server ID reported in the capabilities to the context.
let mut lock = context.server_id.write().await;
lock.clone_from(&session.capabilities.server_id);
self.authentication_failed_once = false;
context.emit_event(EventType::ImapConnected(format!(
"IMAP-LOGIN as {}",
lp.user
)));
self.connectivity.set_connected(context).await;
info!(context, "Successfully logged into IMAP server");
return Ok(session);
}
Err(err) => {
let imap_user = lp.user.to_owned();
let message = stock_str::cannot_login(context, &imap_user).await;
warn!(context, "IMAP failed to login: {err:#}.");
first_error.get_or_insert(format_err!("{message} ({err:#})"));
// Held for the rest of this arm while the wrong-password notification
// state is checked and updated.
let _lock = context.wrong_pw_warning_mutex.lock().await;
// Heuristic: the error is treated as a wrong password if its text
// mentions "authentication" (case-insensitive).
if err.to_string().to_lowercase().contains("authentication") {
// Notify only on a repeated failure, outside of configuration, and
// only while `NotifyAboutWrongPw` is enabled.
if self.authentication_failed_once
&& !configuring
&& context.get_config_bool(Config::NotifyAboutWrongPw).await?
{
let mut msg = Message::new_text(message);
if let Err(e) = chat::add_device_msg_with_importance(
context,
None,
Some(&mut msg),
true,
)
.await
{
warn!(context, "Failed to add device message: {e:#}.");
} else {
// The message was added: unset the config value so the user is
// not notified again.
context
.set_config_internal(Config::NotifyAboutWrongPw, None)
.await
.log_err(context)
.ok();
}
} else {
self.authentication_failed_once = true;
}
} else {
self.authentication_failed_once = false;
}
}
}
}
Err(first_error.unwrap_or_else(|| format_err!("No IMAP connection candidates provided")))
}
/// Connects to the server and makes sure folders are configured.
///
/// A connection error is recorded in the connectivity store before it is returned.
/// `configure_folders()` runs only when the stored folders-configured version is
/// older than `DC_FOLDERS_CONFIGURED_VERSION`.
pub(crate) async fn prepare(&mut self, context: &Context) -> Result<Session> {
    // `configuring = false`: `connect()` may add the wrong-password device message.
    let connect_result = self.connect(context, false).await;
    let mut session = match connect_result {
        Ok(session) => session,
        Err(err) => {
            self.connectivity.set_err(context, &err).await;
            return Err(err);
        }
    };

    let configured_version = context
        .sql
        .get_raw_config_int(constants::DC_FOLDERS_CONFIGURED_KEY)
        .await?
        .unwrap_or_default();
    if configured_version >= constants::DC_FOLDERS_CONFIGURED_VERSION {
        return Ok(session);
    }

    // With `FixIsChatmail` set, the configured value is used instead of what the
    // session reports.
    let is_chatmail = if context.get_config_bool(Config::FixIsChatmail).await? {
        context.get_config_bool(Config::IsChatmail).await?
    } else {
        session.is_chatmail()
    };
    let create_mvbox = !is_chatmail || context.get_config_bool(Config::MvboxMove).await?;
    self.configure_folders(context, &mut session, create_mvbox)
        .await?;
    Ok(session)
}
/// Fetches new messages from `watch_folder`, then performs the pending moves and
/// deletions for that folder.
///
/// If anything was fetched and a delete-device-after setting is configured, the
/// ephemeral task is interrupted.
///
/// # Errors
///
/// Fails if the database is not open, or if fetching or moving/deleting fails.
pub async fn fetch_move_delete(
    &mut self,
    context: &Context,
    session: &mut Session,
    watch_folder: &str,
    folder_meaning: FolderMeaning,
) -> Result<()> {
    let database_open = context.sql.is_open().await;
    if !database_open {
        bail!("IMAP operation attempted while it is torn down");
    }

    let fetched_any = self
        .fetch_new_messages(context, session, watch_folder, folder_meaning, false)
        .await
        .context("fetch_new_messages")?;
    if fetched_any {
        let delete_device_after = context.get_config_delete_device_after().await?;
        if delete_device_after.is_some() {
            context.scheduler.interrupt_ephemeral_task().await;
        }
    }

    session
        .move_delete_messages(context, watch_folder)
        .await
        .context("move_delete_messages")
}
/// Prefetches and downloads messages from `folder`.
///
/// For each prefetched message, a row is written to the `imap` table together with the
/// folder the message should end up in (`target`). Messages that stay in this folder and
/// pass `prefetch_should_download()` are downloaded, fully or partially depending on the
/// download limit. Finally the stored UID-next value for the folder is advanced.
///
/// If `fetch_existing_msgs` is true, all existing messages are prefetched instead of
/// only those starting at the stored UID-next.
///
/// Returns `true` if at least one message was prefetched.
pub(crate) async fn fetch_new_messages(
&mut self,
context: &Context,
session: &mut Session,
folder: &str,
folder_meaning: FolderMeaning,
fetch_existing_msgs: bool,
) -> Result<bool> {
if should_ignore_folder(context, folder, folder_meaning).await? {
info!(context, "Not fetching from {folder:?}.");
session.new_mail = false;
return Ok(false);
}
session
.select_with_uidvalidity(context, folder)
.await
.with_context(|| format!("Failed to select folder {folder:?}"))?;
if !session.new_mail && !fetch_existing_msgs {
info!(context, "No new emails in folder {folder:?}.");
return Ok(false);
}
// Reset the flag before fetching.
session.new_mail = false;
let uid_validity = get_uidvalidity(context, folder).await?;
let old_uid_next = get_uid_next(context, folder).await?;
let msgs = if fetch_existing_msgs {
session
.prefetch_existing_msgs()
.await
.context("prefetch_existing_msgs")?
} else {
session.prefetch(old_uid_next).await.context("prefetch")?
};
let read_cnt = msgs.len();
let download_limit = context.download_limit().await?;
// Pairs of (UID, fetch partially?). One extra slot is reserved for the sentinel
// entry appended further below.
let mut uids_fetch = Vec::<(_, bool )>::with_capacity(msgs.len() + 1);
let mut uid_message_ids = BTreeMap::new();
let mut largest_uid_skipped = None;
let delete_target = context.get_delete_msgs_target().await?;
for (uid, ref fetch_response) in msgs {
let headers = match get_fetch_headers(fetch_response) {
Ok(headers) => headers,
Err(err) => {
warn!(context, "Failed to parse FETCH headers: {err:#}.");
continue;
}
};
let message_id = prefetch_get_message_id(&headers);
// Declared outside the `if` so that `&_target` below outlives the expression.
let _target;
let target = if let Some(message_id) = &message_id {
let msg_info =
message::rfc724_mid_exists_ex(context, message_id, "deleted=1").await?;
// Decide whether the message should go to the deletion target: either it was
// deleted locally (third tuple element is true), or `is_dup_msg()` considers it
// a duplicate of an already known message.
let delete = if let Some((_, _, true)) = msg_info {
info!(context, "Deleting locally deleted message {message_id}.");
true
} else if let Some((_, ts_sent_old, _)) = msg_info {
let is_chat_msg = headers.get_header_value(HeaderDef::ChatVersion).is_some();
// A missing or unparseable Date header yields the default timestamp.
let ts_sent = headers
.get_header_value(HeaderDef::Date)
.and_then(|v| mailparse::dateparse(&v).ok())
.unwrap_or_default();
let is_dup = is_dup_msg(is_chat_msg, ts_sent, ts_sent_old);
if is_dup {
info!(context, "Deleting duplicate message {message_id}.");
}
is_dup
} else {
false
};
if delete {
&delete_target
} else if context
.sql
.exists(
"SELECT COUNT (*) FROM imap WHERE rfc724_mid=?",
(message_id,),
)
.await?
{
// The Message-ID is already present in the `imap` table: leave the message
// where it is.
info!(
context,
"Not moving the message {} that we have seen before.", &message_id
);
folder
} else {
_target = target_folder(context, folder, folder_meaning, &headers).await?;
&_target
}
} else {
warn!(
context,
"Not moving the message that does not have a Message-ID."
);
folder
};
// Messages without a Message-ID get a generated one so they can still be tracked.
let message_id = message_id.unwrap_or_else(create_message_id);
context
.sql
.execute(
"INSERT INTO imap (rfc724_mid, folder, uid, uidvalidity, target)
VALUES (?1, ?2, ?3, ?4, ?5)
ON CONFLICT(folder, uid, uidvalidity)
DO UPDATE SET rfc724_mid=excluded.rfc724_mid,
target=excluded.target",
(&message_id, &folder, uid, uid_validity, target),
)
.await?;
// Download only messages that stay in this folder, never from a Spam folder,
// and only if `prefetch_should_download()` agrees.
if folder == target
&& folder_meaning != FolderMeaning::Spam
&& prefetch_should_download(
context,
&headers,
&message_id,
fetch_response.flags(),
)
.await.context("prefetch_should_download")?
{
// Messages larger than the download limit are fetched partially.
match download_limit {
Some(download_limit) => uids_fetch.push((
uid,
fetch_response.size.unwrap_or_default() > download_limit,
)),
None => uids_fetch.push((uid, false)),
}
uid_message_ids.insert(uid, message_id);
} else {
// NOTE(review): this records the last skipped UID, which is the largest only
// if `msgs` is iterated in ascending UID order — confirm against `prefetch()`.
largest_uid_skipped = Some(uid);
}
}
if !uids_fetch.is_empty() {
self.connectivity.set_working(context).await;
}
let mut largest_uid_fetched: u32 = 0;
let mut received_msgs = Vec::with_capacity(uids_fetch.len());
let mut uids_fetch_in_batch = Vec::with_capacity(max(uids_fetch.len(), 1));
let mut fetch_partially = false;
// Sentinel entry whose flag is the opposite of the last real entry's flag. It forces
// the loop below to flush the final batch; the sentinel UID itself is never fetched.
uids_fetch.push((0, !uids_fetch.last().unwrap_or(&(0, false)).1));
for (uid, fp) in uids_fetch {
// Whenever the full/partial mode changes, fetch the batch collected so far.
if fp != fetch_partially {
let (largest_uid_fetched_in_batch, received_msgs_in_batch) = session
.fetch_many_msgs(
context,
folder,
uid_validity,
uids_fetch_in_batch.split_off(0),
&uid_message_ids,
fetch_partially,
fetch_existing_msgs,
)
.await
.context("fetch_many_msgs")?;
received_msgs.extend(received_msgs_in_batch);
largest_uid_fetched = max(
largest_uid_fetched,
largest_uid_fetched_in_batch.unwrap_or(0),
);
fetch_partially = fp;
}
uids_fetch_in_batch.push(uid);
}
let mailbox_uid_next = session
.selected_mailbox
.as_ref()
.with_context(|| format!("Expected {folder:?} to be selected"))?
.uid_next
.unwrap_or_default();
// The new UID-next is one past the largest UID handled here, but never below the
// value the selected mailbox reports.
let new_uid_next = max(
max(largest_uid_fetched, largest_uid_skipped.unwrap_or(0)) + 1,
mailbox_uid_next,
);
// Never move the stored UID-next backwards.
if new_uid_next > old_uid_next {
set_uid_next(context, folder, new_uid_next).await?;
}
info!(context, "{} mails read from \"{}\".", read_cnt, folder);
if !received_msgs.is_empty() {
context.emit_event(EventType::IncomingMsgBunch);
}
chat::mark_old_messages_as_noticed(context, received_msgs).await?;
Ok(read_cnt > 0)
}
/// Adds recipients from the sentbox, movebox and inbox as contacts and, if
/// `Config::FetchExistingMsgs` is enabled, fetches the existing messages from the
/// mvbox, inbox and sent folders.
///
/// Folders without a configured name are skipped.
pub(crate) async fn fetch_existing_msgs(
    &mut self,
    context: &Context,
    session: &mut Session,
) -> Result<()> {
    let recipient_sources = [
        (
            Config::ConfiguredSentboxFolder,
            "failed to get recipients from the sentbox",
        ),
        (
            Config::ConfiguredMvboxFolder,
            "failed to get recipients from the movebox",
        ),
        (
            Config::ConfiguredInboxFolder,
            "failed to get recipients from the inbox",
        ),
    ];
    for (folder_config, failure) in recipient_sources {
        add_all_recipients_as_contacts(context, session, folder_config)
            .await
            .context(failure)?;
    }

    if context.get_config_bool(Config::FetchExistingMsgs).await? {
        let meanings = [
            FolderMeaning::Mvbox,
            FolderMeaning::Inbox,
            FolderMeaning::Sent,
        ];
        for meaning in meanings {
            let Some(config) = meaning.to_config() else {
                continue;
            };
            let Some(folder) = context.get_config(config).await? else {
                continue;
            };
            info!(
                context,
                "Fetching existing messages from folder {folder:?}."
            );
            self.fetch_new_messages(context, session, &folder, meaning, true)
                .await
                .context("could not fetch existing messages")?;
        }
    }

    info!(context, "Done fetching existing messages.");
    Ok(())
}
}
impl Session {
/// Resynchronizes the `imap` table for every listed folder except virtual ones.
///
/// Stops at the first folder whose resync fails.
pub(crate) async fn resync_folders(&mut self, context: &Context) -> Result<()> {
    let listing = self
        .list_folders()
        .await
        .context("listing folders for resync")?;
    for folder in listing {
        let meaning = get_folder_meaning(&folder);
        if meaning == FolderMeaning::Virtual {
            continue;
        }
        self.resync_folder_uids(context, folder.name(), meaning)
            .await?;
    }
    Ok(())
}
/// Rebuilds the `imap` table rows of `folder` from the server's current state.
///
/// Fetches UID and Message-ID of every message in the folder, then, in one
/// transaction, deletes all rows of the folder and inserts one row per message that
/// has both a UID and a Message-ID, including its computed target folder.
pub(crate) async fn resync_folder_uids(
    &mut self,
    context: &Context,
    folder: &str,
    folder_meaning: FolderMeaning,
) -> Result<()> {
    // UID -> (Message-ID, target folder).
    let mut collected = BTreeMap::new();

    self.select_with_uidvalidity(context, folder).await?;
    let mut responses = self
        .uid_fetch("1:*", RFC724MID_UID)
        .await
        .with_context(|| format!("can't resync folder {folder}"))?;
    while let Some(response) = responses.try_next().await? {
        let headers = match get_fetch_headers(&response) {
            Ok(headers) => headers,
            Err(err) => {
                warn!(context, "Failed to parse FETCH headers: {}", err);
                continue;
            }
        };
        // Responses lacking a UID or a Message-ID are not recorded.
        let (Some(uid), Some(rfc724_mid)) = (response.uid, prefetch_get_message_id(&headers))
        else {
            continue;
        };
        let target = target_folder(context, folder, folder_meaning, &headers).await?;
        collected.insert(uid, (rfc724_mid, target));
    }

    info!(
        context,
        "Resync: collected {} message IDs in folder {}",
        collected.len(),
        folder,
    );

    let uid_validity = get_uidvalidity(context, folder).await?;

    // Replace the folder's rows atomically.
    context
        .sql
        .transaction(move |transaction| {
            transaction.execute("DELETE FROM imap WHERE folder=?", (folder,))?;
            for (uid, (rfc724_mid, target)) in &collected {
                transaction.execute(
                    "INSERT INTO imap (rfc724_mid, folder, uid, uidvalidity, target)\n\
                     VALUES (?1, ?2, ?3, ?4, ?5)\n\
                     ON CONFLICT(folder, uid, uidvalidity)\n\
                     DO UPDATE SET rfc724_mid=excluded.rfc724_mid,\n\
                     target=excluded.target",
                    (rfc724_mid, folder, uid, uid_validity, target),
                )?;
            }
            Ok(())
        })
        .await?;
    Ok(())
}
/// Flags the messages in `uid_set` as `\Deleted` on the server and removes the
/// corresponding rows (`row_ids`) from the `imap` table.
///
/// Emits `ImapMessageDeleted` on success.
async fn delete_message_batch(
    &mut self,
    context: &Context,
    uid_set: &str,
    row_ids: Vec<i64>,
) -> Result<()> {
    self.add_flag_finalized_with_set(uid_set, "\\Deleted")
        .await?;

    // One placeholder per row ID.
    let placeholders = sql::repeat_vars(row_ids.len());
    let statement = format!("DELETE FROM imap WHERE id IN ({})", placeholders);
    context
        .sql
        .execute(&statement, rusqlite::params_from_iter(row_ids))
        .await
        .context("cannot remove deleted messages from imap table")?;

    let notice = format!("IMAP messages {uid_set} marked as deleted");
    context.emit_event(EventType::ImapMessageDeleted(notice));
    Ok(())
}
/// Moves the messages in the UID set `set` to the folder `target`.
///
/// If the session can MOVE, a UID MOVE is attempted first; on success the rows
/// `row_ids` are deleted from the `imap` table. Otherwise (or if MOVE failed and
/// delete-to-trash is not enabled) the messages are copied to `target`, unless `target`
/// is the trash folder, and their rows are updated to `target=''`, which schedules the
/// originals for deletion.
///
/// # Errors
///
/// Fails if MOVE fails while delete-to-trash is enabled, if the COPY fails, or if a
/// database statement fails.
async fn move_message_batch(
&mut self,
context: &Context,
set: &str,
row_ids: Vec<i64>,
target: &str,
) -> Result<()> {
if self.can_move() {
match self.uid_mv(set, &target).await {
Ok(()) => {
// Moved messages no longer exist in this folder, so drop their rows.
context
.sql
.execute(
&format!(
"DELETE FROM imap WHERE id IN ({})",
sql::repeat_vars(row_ids.len())
),
rusqlite::params_from_iter(row_ids),
)
.await
.context("cannot delete moved messages from imap table")?;
context.emit_event(EventType::ImapMessageMoved(format!(
"IMAP messages {set} moved to {target}"
)));
return Ok(());
}
Err(err) => {
// With delete-to-trash enabled there is no COPY/DELETE fallback:
// the error is returned to the caller.
if context.should_delete_to_trash().await? {
error!(
context,
"Cannot move messages {} to {}, no fallback to COPY/DELETE because \
delete_to_trash is set. Error: {:#}",
set,
target,
err,
);
return Err(err.into());
}
warn!(
context,
"Cannot move messages, fallback to COPY/DELETE {} to {}: {}",
set,
target,
err
);
}
}
}
// Fallback path, reached when MOVE is unavailable or has failed.
// When the target is the trash folder, the messages are not copied, only deleted.
let copy = !context.is_trash(target).await?;
if copy {
info!(
context,
"Server does not support MOVE, fallback to COPY/DELETE {} to {}", set, target
);
self.uid_copy(&set, &target).await?;
} else {
error!(
context,
"Server does not support MOVE, fallback to DELETE {} to {}", set, target,
);
}
// An empty target marks the rows for deletion; `move_delete_messages()` deletes
// messages whose target is empty.
context
.sql
.execute(
&format!(
"UPDATE imap SET target='' WHERE id IN ({})",
sql::repeat_vars(row_ids.len())
),
rusqlite::params_from_iter(row_ids),
)
.await
.context("cannot plan deletion of messages")?;
if copy {
context.emit_event(EventType::ImapMessageMoved(format!(
"IMAP messages {set} copied to {target}"
)));
}
Ok(())
}
/// Executes all pending moves and deletions recorded for `folder` in the `imap` table.
///
/// Rows whose target differs from their folder are grouped by target. An empty target
/// means deletion, anything else a move to that folder. A failure to close the folder
/// afterwards is only logged.
async fn move_delete_messages(&mut self, context: &Context, folder: &str) -> Result<()> {
    let pending = context
        .sql
        .query_map(
            "SELECT id, uid, target FROM imap\n\
             WHERE folder = ?\n\
             AND target != folder\n\
             ORDER BY target, uid",
            (folder,),
            |row| {
                Ok((
                    row.get::<_, i64>(0)?,
                    row.get::<_, u32>(1)?,
                    row.get::<_, String>(2)?,
                ))
            },
            |rows| rows.collect::<Result<Vec<_>, _>>().map_err(Into::into),
        )
        .await?;

    for (target, rowid_set, uid_set) in UidGrouper::from(pending) {
        // Select the folder before each batch.
        self.select_with_uidvalidity(context, folder).await?;

        match target.as_str() {
            "" => {
                self.delete_message_batch(context, &uid_set, rowid_set)
                    .await
                    .with_context(|| {
                        format!("cannot delete batch of messages {:?}", &uid_set)
                    })?;
            }
            destination => {
                self.move_message_batch(context, &uid_set, rowid_set, destination)
                    .await
                    .with_context(|| {
                        format!(
                            "cannot move batch of messages {:?} to folder {:?}",
                            &uid_set, target
                        )
                    })?;
            }
        }
    }

    if let Err(err) = self.maybe_close_folder(context).await {
        warn!(context, "failed to close folder: {:?}", err);
    }
    Ok(())
}
/// Uploads queued messages from the `imap_send` table to `folder` via IMAP APPEND.
///
/// Queue entries are processed in ID order. An entry is removed after a successful
/// upload or once it has used up its attempts; otherwise its attempt counter is
/// incremented and the APPEND error is returned.
pub(crate) async fn send_sync_msgs(&mut self, context: &Context, folder: &str) -> Result<()> {
    const MAX_ATTEMPTS: i64 = 2;

    context.send_sync_msg().await?;

    loop {
        let next_entry = context
            .sql
            .query_row_optional(
                "SELECT id, mime, msg_id, attempts FROM imap_send ORDER BY id LIMIT 1",
                (),
                |row| {
                    Ok((
                        row.get::<_, i64>(0)?,
                        row.get::<_, String>(1)?,
                        row.get::<_, MsgId>(2)?,
                        row.get::<_, i64>(3)?,
                    ))
                },
            )
            .await
            .context("Failed to SELECT from imap_send")?;
        let Some((id, mime, msg_id, attempts)) = next_entry else {
            break;
        };

        let append_result = self
            .append(folder, Some("(\\Seen)"), None, mime)
            .await
            .with_context(|| format!("IMAP APPEND to {folder} failed for {msg_id}"))
            .log_err(context);
        let appended = append_result.is_ok();
        if appended {
            msg_id.set_delivered(context).await?;
        }

        if appended || attempts >= MAX_ATTEMPTS - 1 {
            // Done with this entry, either successfully or because it ran out of attempts.
            context
                .sql
                .execute("DELETE FROM imap_send WHERE id=?", (id,))
                .await
                .context("Failed to delete from imap_send")?;
        } else {
            context
                .sql
                .execute("UPDATE imap_send SET attempts=attempts+1 WHERE id=?", (id,))
                .await
                .context("Failed to update imap_send.attempts")?;
            // Propagate the APPEND error; the entry stays queued for a later retry.
            append_result?;
        }
    }
    Ok(())
}
/// Sets the `\Seen` flag on the server for all messages queued in `imap_markseen`.
///
/// Only messages that stay in their folder (`target = folder`) are considered.
/// Failures to select a folder or to store the flag are logged and the affected
/// messages stay queued; only a database error aborts the function.
pub(crate) async fn store_seen_flags_on_imap(&mut self, context: &Context) -> Result<()> {
    let queued = context
        .sql
        .query_map(
            "SELECT imap.id, uid, folder FROM imap, imap_markseen\n\
             WHERE imap.id = imap_markseen.id AND target = folder\n\
             ORDER BY folder, uid",
            [],
            |row| {
                Ok((
                    row.get::<_, i64>(0)?,
                    row.get::<_, u32>(1)?,
                    row.get::<_, String>(2)?,
                ))
            },
            |rows| rows.collect::<Result<Vec<_>, _>>().map_err(Into::into),
        )
        .await?;

    for (folder, rowid_set, uid_set) in UidGrouper::from(queued) {
        if let Err(err) = self.select_with_uidvalidity(context, &folder).await {
            warn!(context, "store_seen_flags_on_imap: Failed to select {folder}, will retry later: {err:#}.");
            continue;
        }
        if let Err(err) = self.add_flag_finalized_with_set(&uid_set, "\\Seen").await {
            warn!(
                context,
                "Cannot mark messages {uid_set} in {folder} as seen, will retry later: {err:#}.");
            continue;
        }

        info!(
            context,
            "Marked messages {} in folder {} as seen.", uid_set, folder
        );
        // The flag is stored on the server, so the queue entries can go.
        let statement = format!(
            "DELETE FROM imap_markseen WHERE id IN ({})",
            sql::repeat_vars(rowid_set.len())
        );
        context
            .sql
            .execute(&statement, rusqlite::params_from_iter(rowid_set))
            .await
            .context("cannot remove messages marked as seen from imap_markseen table")?;
    }
    Ok(())
}
/// Synchronizes `\Seen` flags from the server to the local database for `folder`.
///
/// Requests the flags of all messages changed since the stored mod-sequence value,
/// marks seen messages as seen locally, stores the highest mod-sequence observed and
/// emits `MsgsNoticed` plus chatlist-item-changed events for every affected chat.
///
/// Returns without doing anything if the server does not support CONDSTORE or the
/// selected mailbox reports no highest mod-sequence.
pub(crate) async fn sync_seen_flags(&mut self, context: &Context, folder: &str) -> Result<()> {
if !self.can_condstore() {
info!(
context,
"Server does not support CONDSTORE, skipping flag synchronization."
);
return Ok(());
}
self.select_with_uidvalidity(context, folder)
.await
.context("failed to select folder")?;
let mailbox = self
.selected_mailbox
.as_ref()
.with_context(|| format!("No mailbox selected, folder: {folder}"))?;
if mailbox.highest_modseq.is_none() {
info!(
context,
"Mailbox {} does not support mod-sequences, skipping flag synchronization.", folder
);
return Ok(());
}
// Chats in which at least one message got marked as seen.
let mut updated_chat_ids = BTreeSet::new();
let uid_validity = get_uidvalidity(context, folder)
.await
.with_context(|| format!("failed to get UID validity for folder {folder}"))?;
// Starts at the stored value and is raised to the largest MODSEQ seen below.
let mut highest_modseq = get_modseq(context, folder)
.await
.with_context(|| format!("failed to get MODSEQ for folder {folder}"))?;
let mut list = self
.uid_fetch("1:*", format!("(FLAGS) (CHANGEDSINCE {highest_modseq})"))
.await
.context("failed to fetch flags")?;
let mut got_unsolicited_fetch = false;
while let Some(fetch) = list
.try_next()
.await
.context("failed to get FETCH result")?
{
let uid = if let Some(uid) = fetch.uid {
uid
} else {
// A response without UID is treated as an unsolicited FETCH; it is
// remembered so that `new_mail` can be set after the loop.
info!(context, "FETCH result contains no UID, skipping");
got_unsolicited_fetch = true;
continue;
};
let is_seen = fetch.flags().any(|flag| flag == Flag::Seen);
if is_seen {
if let Some(chat_id) = mark_seen_by_uid(context, folder, uid_validity, uid)
.await
.with_context(|| {
format!("failed to update seen status for msg {folder}/{uid}")
})?
{
updated_chat_ids.insert(chat_id);
}
}
if let Some(modseq) = fetch.modseq {
if modseq > highest_modseq {
highest_modseq = modseq;
}
} else {
warn!(context, "FETCH result contains no MODSEQ");
}
}
// End the stream's borrow of `self` before touching `self.new_mail`.
drop(list);
if got_unsolicited_fetch {
self.new_mail = true;
}
set_modseq(context, folder, highest_modseq)
.await
.with_context(|| format!("failed to set MODSEQ for folder {folder}"))?;
if !updated_chat_ids.is_empty() {
context.on_archived_chats_maybe_noticed();
}
for updated_chat_id in updated_chat_ids {
context.emit_event(EventType::MsgsNoticed(updated_chat_id));
chatlist_events::emit_chatlist_item_changed(context, updated_chat_id);
}
Ok(())
}
/// Collects the recipients of all messages in the selected folder that were sent by
/// one of our own addresses.
///
/// Candidate messages are found with the self-sent search command; only their
/// FROM/TO/CC/BCC headers are fetched. Messages whose headers cannot be parsed are
/// logged and skipped.
pub async fn get_all_recipients(&mut self, context: &Context) -> Result<Vec<SingleInfo>> {
    let search_command = get_imap_self_sent_search_command(context).await?;
    let mut uids: Vec<_> = self
        .uid_search(search_command)
        .await?
        .into_iter()
        .collect();
    uids.sort_unstable();

    let mut recipients = Vec::new();
    for (_, uid_set) in build_sequence_sets(&uids)? {
        let mut responses = self
            .uid_fetch(uid_set, "(UID BODY.PEEK[HEADER.FIELDS (FROM TO CC BCC)])")
            .await
            .context("IMAP Could not fetch")?;

        while let Some(msg) = responses.try_next().await? {
            let headers = match get_fetch_headers(&msg) {
                Ok(headers) => headers,
                Err(err) => {
                    warn!(context, "{}", err);
                    continue;
                }
            };
            let Some(from) = mimeparser::get_from(&headers) else {
                continue;
            };
            // The search result is double-checked against the actual From address.
            if context.is_self_addr(&from.addr).await? {
                recipients.extend(mimeparser::get_recipients(&headers));
            }
        }
    }
    Ok(recipients)
}
/// Fetches the messages with the given UIDs from the selected folder and passes each
/// one to `receive_imf_inner()`.
///
/// * `request_uids` - UIDs to fetch; they are split into sequence sets.
/// * `uid_message_ids` - Message-ID for each requested UID; UIDs missing here are
///   skipped with an error log.
/// * `fetch_partially` - if true, only flags, size and header are fetched.
/// * `fetching_existing_messages` - forwarded unchanged to `receive_imf_inner()`.
///
/// Returns the last UID that was handled (if any) together with the messages that
/// were received. Errors from `receive_imf_inner()` are logged, not returned.
#[allow(clippy::too_many_arguments)]
pub(crate) async fn fetch_many_msgs(
&mut self,
context: &Context,
folder: &str,
uidvalidity: u32,
request_uids: Vec<u32>,
uid_message_ids: &BTreeMap<u32, String>,
fetch_partially: bool,
fetching_existing_messages: bool,
) -> Result<(Option<u32>, Vec<ReceivedMsg>)> {
let mut last_uid = None;
let mut received_msgs = Vec::new();
if request_uids.is_empty() {
return Ok((last_uid, received_msgs));
}
for (request_uids, set) in build_sequence_sets(&request_uids)? {
info!(
context,
"Starting a {} FETCH of message set \"{}\".",
if fetch_partially { "partial" } else { "full" },
set
);
let mut fetch_responses = self
.uid_fetch(
&set,
if fetch_partially {
BODY_PARTIAL
} else {
BODY_FULL
},
)
.await
.with_context(|| {
format!("fetching messages {} from folder \"{}\"", &set, folder)
})?;
// Responses that arrived before their turn, keyed by UID. This lets messages be
// processed in the order of `request_uids` even if the server answers in another
// order.
let mut uid_msgs = HashMap::with_capacity(request_uids.len());
// Number of requested UIDs for which a response was found.
let mut count = 0;
for &request_uid in &request_uids {
// Use a buffered response if there is one; otherwise read from the stream
// until this UID shows up or the stream ends.
let mut fetch_response = uid_msgs.remove(&request_uid);
while fetch_response.is_none() {
let next_fetch_response =
if let Some(next_fetch_response) = fetch_responses.next().await {
next_fetch_response
} else {
break;
};
let next_fetch_response =
next_fetch_response.context("Failed to process IMAP FETCH result")?;
if let Some(next_uid) = next_fetch_response.uid {
if next_uid == request_uid {
fetch_response = Some(next_fetch_response);
} else if !request_uids.contains(&next_uid) {
info!(
context,
"Skipping not requested FETCH response for UID {}.", next_uid
);
} else if uid_msgs.insert(next_uid, next_fetch_response).is_some() {
// The newer response has replaced the buffered one.
warn!(context, "Got duplicated UID {}.", next_uid);
}
} else {
info!(context, "Skipping FETCH response without UID.");
}
}
let fetch_response = match fetch_response {
Some(fetch) => fetch,
None => {
warn!(
context,
"Missed UID {} in the server response.", request_uid
);
continue;
}
};
count += 1;
let is_deleted = fetch_response.flags().any(|flag| flag == Flag::Deleted);
// A partial fetch provides the header and the size, a full fetch the body.
let (body, partial) = if fetch_partially {
(fetch_response.header(), fetch_response.size) } else {
(fetch_response.body(), None) };
if is_deleted {
info!(context, "Not processing deleted msg {}.", request_uid);
last_uid = Some(request_uid);
continue;
}
let body = if let Some(body) = body {
body
} else {
info!(
context,
"Not processing message {} without a BODY.", request_uid
);
last_uid = Some(request_uid);
continue;
};
let is_seen = fetch_response.flags().any(|flag| flag == Flag::Seen);
let rfc724_mid = if let Some(rfc724_mid) = uid_message_ids.get(&request_uid) {
rfc724_mid
} else {
// Unlike the skips above, `last_uid` is not advanced here.
error!(
context,
"No Message-ID corresponding to UID {} passed in uid_messsage_ids.",
request_uid
);
continue;
};
info!(
context,
"Passing message UID {} to receive_imf().", request_uid
);
match receive_imf_inner(
context,
folder,
uidvalidity,
request_uid,
rfc724_mid,
body,
is_seen,
partial,
fetching_existing_messages,
)
.await
{
Ok(received_msg) => {
if let Some(m) = received_msg {
received_msgs.push(m);
}
}
Err(err) => {
warn!(context, "receive_imf error: {:#}.", err);
}
};
last_uid = Some(request_uid)
}
// Consume whatever is left of the response stream.
while fetch_responses.next().await.is_some() {}
if count != request_uids.len() {
warn!(
context,
"Failed to fetch all UIDs: got {}, requested {}, we requested the UIDs {:?}.",
count,
request_uids.len(),
request_uids,
);
} else {
info!(
context,
"Successfully received {} UIDs.",
request_uids.len()
);
}
}
Ok((last_uid, received_msgs))
}
/// Retrieves the server comment, admin contact and iroh relay URL and stores them
/// in `context.metadata`.
///
/// Does nothing if the server lacks metadata support or the metadata has already
/// been stored. An invalid relay URL is logged and left as `None`.
pub(crate) async fn fetch_metadata(&mut self, context: &Context) -> Result<()> {
    if !self.can_metadata() {
        return Ok(());
    }

    // The write lock is held until the result is stored at the end of the function.
    let mut lock = context.metadata.write().await;
    if lock.is_some() {
        return Ok(());
    }

    info!(
        context,
        "Server supports metadata, retrieving server comment and admin contact."
    );

    // Empty mailbox name and no options.
    let (mailbox, options) = ("", "");
    let entries = self
        .get_metadata(
            mailbox,
            options,
            "(/shared/comment /shared/admin /shared/vendor/deltachat/irohrelay)",
        )
        .await?;

    let mut comment = None;
    let mut admin = None;
    let mut iroh_relay = None;
    for m in entries {
        match m.entry.as_ref() {
            "/shared/comment" => comment = m.value,
            "/shared/admin" => admin = m.value,
            "/shared/vendor/deltachat/irohrelay" => {
                let Some(value) = m.value else {
                    continue;
                };
                match Url::parse(&value) {
                    Ok(url) => {
                        iroh_relay = Some(url);
                    }
                    Err(_) => {
                        warn!(
                            context,
                            "Got invalid URL from iroh relay metadata: {:?}.", value
                        );
                    }
                }
            }
            _ => {}
        }
    }

    *lock = Some(ServerMetadata {
        comment,
        admin,
        iroh_relay,
    });
    Ok(())
}
/// Registers the push-notification device token.
///
/// Does nothing if already subscribed or if there is no device token. If the server
/// supports both metadata and push, the token is stored on the configured inbox via
/// `SETMETADATA` and the context is marked as subscribed. Otherwise a heartbeat
/// subscription is started in a background task unless one already exists.
pub(crate) async fn register_token(&mut self, context: &Context) -> Result<()> {
    let already_subscribed = context.push_subscribed.load(Ordering::Relaxed);
    if already_subscribed {
        return Ok(());
    }

    let device_token = match context.push_subscriber.device_token().await {
        Some(token) => token,
        None => return Ok(()),
    };

    let server_side_push = self.can_metadata() && self.can_push();
    if !server_side_push {
        if !context.push_subscriber.heartbeat_subscribed().await {
            // The spawned task needs its own handle to the context.
            let context = context.clone();
            tokio::spawn(async move { context.push_subscriber.subscribe(&context).await });
        }
        return Ok(());
    }

    let folder = context
        .get_config(Config::ConfiguredInboxFolder)
        .await?
        .context("INBOX is not configured")?;
    let command =
        format!("SETMETADATA \"{folder}\" (/private/devicetoken \"{device_token}\")");
    self.run_command_and_check_ok(command)
        .await
        .context("SETMETADATA command failed")?;
    context.push_subscribed.store(true, Ordering::Relaxed);
    Ok(())
}
}
impl Session {
    /// Adds `flag` to every message in `uid_set` via `UID STORE +FLAGS`.
    ///
    /// When the flag is `\Deleted`, the selected folder is additionally
    /// marked as needing an expunge. All responses are drained before
    /// returning.
    async fn add_flag_finalized_with_set(&mut self, uid_set: &str, flag: &str) -> Result<()> {
        if flag == "\\Deleted" {
            self.selected_folder_needs_expunge = true;
        }

        let query = format!("+FLAGS ({flag})");
        let mut responses = self
            .uid_store(uid_set, &query)
            .await
            .with_context(|| format!("IMAP failed to store: ({uid_set}, {query})"))?;

        // The individual responses are not needed, only that the stream is consumed.
        while responses.next().await.is_some() {}
        Ok(())
    }

    /// Picks the MVBOX folder out of the candidate names in `folders`.
    ///
    /// The first candidate that can be examined is selected and returned.
    /// If none can be examined and `create_mvbox` is set, each candidate is
    /// tried with `select_with_uidvalidity()` and the first success is
    /// returned; failures are logged as warnings. Returns `None` if no
    /// candidate could be used.
    async fn configure_mvbox<'a>(
        &mut self,
        context: &Context,
        folders: &[&'a str],
        create_mvbox: bool,
    ) -> Result<Option<&'a str>> {
        self.maybe_close_folder(context).await?;

        for folder in folders {
            info!(context, "Looking for MVBOX-folder \"{}\"...", &folder);
            if self.examine(&folder).await.is_err() {
                continue;
            }
            info!(
                context,
                "MVBOX-folder {:?} successfully selected, using it.", &folder
            );
            self.close().await?;
            self.select_with_uidvalidity(context, folder).await?;
            return Ok(Some(folder));
        }

        if create_mvbox {
            for folder in folders {
                if let Err(err) = self.select_with_uidvalidity(context, folder).await {
                    warn!(context, "Cannot create MVBOX-folder {:?}: {}", folder, err);
                    continue;
                }
                info!(context, "MVBOX-folder {} created.", folder);
                return Ok(Some(folder));
            }
        }

        Ok(None)
    }
}
impl Imap {
    /// Scans all folders on the server and stores the folder configuration.
    ///
    /// Determines the folder delimiter (defaulting to `.`), maps folders to
    /// config keys by their attributes or, failing that, by their names,
    /// configures the MVBOX folder, and finally records the
    /// folders-configured version in the raw config.
    pub(crate) async fn configure_folders(
        &mut self,
        context: &Context,
        session: &mut Session,
        create_mvbox: bool,
    ) -> Result<()> {
        let mut listing = session
            .list(Some(""), Some("*"))
            .await
            .context("list_folders failed")?;

        let mut delimiter = String::from(".");
        let mut delimiter_is_default = true;
        let mut folder_configs = BTreeMap::new();

        while let Some(folder) = listing.try_next().await? {
            info!(context, "Scanning folder: {:?}", folder);

            // Only the first non-empty delimiter that differs from the default is taken.
            match folder.delimiter() {
                Some(d) if delimiter_is_default && !d.is_empty() && delimiter != d => {
                    delimiter = d.to_string();
                    delimiter_is_default = false;
                }
                _ => {}
            }

            let meaning_by_attrs = get_folder_meaning_by_attrs(folder.attributes());
            let meaning_by_name = get_folder_meaning_by_name(folder.name());
            if let Some(config) = meaning_by_attrs.to_config() {
                // A meaning derived from attributes overrides any earlier entry.
                folder_configs.insert(config, folder.name().to_string());
            } else if let Some(config) = meaning_by_name.to_config() {
                // A meaning derived from the name only fills an empty slot.
                folder_configs
                    .entry(config)
                    .or_insert_with(|| folder.name().to_string());
            }
        }
        drop(listing);

        info!(context, "Using \"{}\" as folder-delimiter.", delimiter);

        let fallback_folder = format!("INBOX{delimiter}DeltaChat");
        let configured_mvbox = session
            .configure_mvbox(context, &["DeltaChat", &fallback_folder], create_mvbox)
            .await
            .context("failed to configure mvbox")?;

        context
            .set_config_internal(Config::ConfiguredInboxFolder, Some("INBOX"))
            .await?;
        if let Some(mvbox_folder) = configured_mvbox {
            info!(context, "Setting MVBOX FOLDER TO {}", &mvbox_folder);
            context
                .set_config_internal(Config::ConfiguredMvboxFolder, Some(mvbox_folder))
                .await?;
        }
        for (config, name) in folder_configs {
            context.set_config_internal(config, Some(&name)).await?;
        }

        context
            .sql
            .set_raw_config_int(
                constants::DC_FOLDERS_CONFIGURED_KEY,
                constants::DC_FOLDERS_CONFIGURED_VERSION,
            )
            .await?;
        info!(context, "FINISHED configuring IMAP-folders.");
        Ok(())
    }
}
impl Session {
fn drain_unsolicited_responses(&self, context: &Context) -> Result<bool> {
use async_imap::imap_proto::Response;
use async_imap::imap_proto::ResponseCode;
use UnsolicitedResponse::*;
let folder = self.selected_folder.as_deref().unwrap_or_default();
let mut should_refetch = false;
while let Ok(response) = self.unsolicited_responses.try_recv() {
match response {
Exists(_) => {
info!(
context,
"Need to refetch {folder:?}, got unsolicited EXISTS {response:?}"
);
should_refetch = true;
}
Expunge(_) | Recent(_) => {}
Other(ref response_data) => {
match response_data.parsed() {
Response::Fetch { .. } => {
info!(
context,
"Need to refetch {folder:?}, got unsolicited FETCH {response:?}"
);
should_refetch = true;
}
Response::Done {
code: Some(ResponseCode::CopyUid(_, _, _)),
..
} => {}
_ => {
info!(context, "{folder:?}: got unsolicited response {response:?}")
}
}
}
_ => {
info!(context, "{folder:?}: got unsolicited response {response:?}")
}
}
}
Ok(should_refetch)
}
}
/// Decides whether a message found in the spam folder should be moved out.
///
/// Messages with a `Chat-Version` header are always moved. If a parent
/// message is known, the decision depends only on whether its chat is
/// unblocked. Otherwise the sender must resolve to a non-blocked contact
/// that either has an unblocked chat or, lacking any chat, is `SELF`.
async fn should_move_out_of_spam(
    context: &Context,
    headers: &[mailparse::MailHeader<'_>],
) -> Result<bool> {
    if headers.get_header_value(HeaderDef::ChatVersion).is_some() {
        return Ok(true);
    }

    if let Some(msg) = get_prefetch_parent_message(context, headers).await? {
        return Ok(msg.chat_blocked == Blocked::Not);
    }

    let Some(from) = mimeparser::get_from(headers) else {
        return Ok(false);
    };

    let contact = from_field_to_contact_id(context, &from, true)
        .await
        .context("from_field_to_contact_id")?;
    let Some((from_id, blocked_contact, _origin)) = contact else {
        warn!(
            context,
            "Contact with From address {:?} cannot exist, not moving out of spam", from
        );
        return Ok(false);
    };
    if blocked_contact {
        return Ok(false);
    }

    let move_out = match ChatIdBlocked::lookup_by_contact(context, from_id).await? {
        Some(chat_id_blocked) => chat_id_blocked.blocked == Blocked::Not,
        None => from_id == ContactId::SELF,
    };
    Ok(move_out)
}
/// Returns the config key of the folder a spam-folder message should go to.
///
/// `None` means the message stays in spam. Otherwise the target is the MVBOX
/// folder if the message needs to be moved there or `OnlyFetchMvbox` is
/// enabled, and the inbox folder in all other cases.
async fn spam_target_folder_cfg(
    context: &Context,
    headers: &[mailparse::MailHeader<'_>],
) -> Result<Option<Config>> {
    if !should_move_out_of_spam(context, headers).await? {
        return Ok(None);
    }

    let use_mvbox = needs_move_to_mvbox(context, headers).await?
        || context.get_config_bool(Config::OnlyFetchMvbox).await?;
    let target = if use_mvbox {
        Config::ConfiguredMvboxFolder
    } else {
        Config::ConfiguredInboxFolder
    };
    Ok(Some(target))
}
/// Returns the config key naming the folder a message should be moved to.
///
/// `None` means the message stays where it is; this is always the case for
/// messages already in the MVBOX folder. Messages in a spam folder are
/// handled by `spam_target_folder_cfg()`, all others go to the MVBOX folder
/// only if `needs_move_to_mvbox()` says so.
pub async fn target_folder_cfg(
    context: &Context,
    folder: &str,
    folder_meaning: FolderMeaning,
    headers: &[mailparse::MailHeader<'_>],
) -> Result<Option<Config>> {
    if context.is_mvbox(folder).await? {
        return Ok(None);
    }
    if folder_meaning == FolderMeaning::Spam {
        return spam_target_folder_cfg(context, headers).await;
    }

    let move_to_mvbox = needs_move_to_mvbox(context, headers).await?;
    Ok(move_to_mvbox.then_some(Config::ConfiguredMvboxFolder))
}
/// Returns the name of the folder a message should be moved to.
///
/// Falls back to `folder` itself when `target_folder_cfg()` yields no
/// target or when the target config key has no value.
pub async fn target_folder(
    context: &Context,
    folder: &str,
    folder_meaning: FolderMeaning,
    headers: &[mailparse::MailHeader<'_>],
) -> Result<String> {
    let Some(config) = target_folder_cfg(context, folder, folder_meaning, headers).await? else {
        return Ok(folder.to_string());
    };
    let configured = context.get_config(config).await?;
    Ok(configured.unwrap_or_else(|| folder.to_string()))
}
/// Decides whether a message has to be moved to the MVBOX folder.
///
/// On non-chatmail accounts, a chat message with
/// `Auto-Submitted: auto-generated` that was sent from one of our own
/// addresses is always moved. Apart from that, nothing is moved unless
/// `MvboxMove` is enabled, Autocrypt Setup Messages are never moved, and
/// other messages are moved if they carry `Chat-Version` or reply to a
/// messenger message.
async fn needs_move_to_mvbox(
    context: &Context,
    headers: &[mailparse::MailHeader<'_>],
) -> Result<bool> {
    let has_chat_version = headers.get_header_value(HeaderDef::ChatVersion).is_some();

    if !context.get_config_bool(Config::IsChatmail).await? && has_chat_version {
        let auto_generated = headers
            .get_header_value(HeaderDef::AutoSubmitted)
            .map_or(false, |val| val.eq_ignore_ascii_case("auto-generated"));
        if auto_generated {
            if let Some(from) = mimeparser::get_from(headers) {
                if context.is_self_addr(&from.addr).await? {
                    return Ok(true);
                }
            }
        }
    }

    if !context.get_config_bool(Config::MvboxMove).await? {
        return Ok(false);
    }

    let is_setup_message = headers
        .get_header_value(HeaderDef::AutocryptSetupMessage)
        .is_some();
    if is_setup_message {
        return Ok(false);
    }

    if has_chat_version {
        return Ok(true);
    }

    let parent = get_prefetch_parent_message(context, headers).await?;
    Ok(parent.map_or(false, |parent| {
        matches!(
            parent.is_dc_message,
            MessengerMessage::Yes | MessengerMessage::Reply
        )
    }))
}
/// Guesses the meaning of a folder from its name alone.
///
/// The name is compared case-insensitively (via `to_lowercase()`) against
/// lists of known localized names for sent, spam, drafts and trash folders,
/// checked in that order. Returns `FolderMeaning::Unknown` if nothing matches.
fn get_folder_meaning_by_name(folder_name: &str) -> FolderMeaning {
    const SENT_NAMES: &[&str] = &[
        "sent",
        "sentmail",
        "sent objects",
        "gesendet",
        "Sent Mail",
        "Sendte e-mails",
        "Enviados",
        "Messages envoyés",
        "Messages envoyes",
        "Posta inviata",
        "Verzonden berichten",
        "Wyslane",
        "E-mails enviados",
        "Correio enviado",
        "Enviada",
        "Enviado",
        "Gönderildi",
        "Inviati",
        "Odeslaná pošta",
        "Sendt",
        "Skickat",
        "Verzonden",
        "Wysłane",
        "Éléments envoyés",
        "Απεσταλμένα",
        "Отправленные",
        "寄件備份",
        "已发送邮件",
        "送信済み",
        "보낸편지함",
    ];
    const SPAM_NAMES: &[&str] = &[
        "spam",
        "junk",
        "Correio electrónico não solicitado",
        "Correo basura",
        "Lixo",
        "Nettsøppel",
        "Nevyžádaná pošta",
        "No solicitado",
        "Ongewenst",
        "Posta indesiderata",
        "Skräp",
        "Wiadomości-śmieci",
        "Önemsiz",
        "Ανεπιθύμητα",
        "Спам",
        "垃圾邮件",
        "垃圾郵件",
        "迷惑メール",
        "스팸",
    ];
    const DRAFT_NAMES: &[&str] = &[
        "Drafts",
        "Kladder",
        "Entw?rfe",
        "Borradores",
        "Brouillons",
        "Bozze",
        "Concepten",
        "Wersje robocze",
        "Rascunhos",
        "Entwürfe",
        "Koncepty",
        "Kopie robocze",
        "Taslaklar",
        "Utkast",
        "Πρόχειρα",
        "Черновики",
        "下書き",
        "草稿",
        "임시보관함",
    ];
    const TRASH_NAMES: &[&str] = &[
        "Trash",
        "Bin",
        "Caixote do lixo",
        "Cestino",
        "Corbeille",
        "Papelera",
        "Papierkorb",
        "Papirkurv",
        "Papperskorgen",
        "Prullenbak",
        "Rubujo",
        "Κάδος απορριμμάτων",
        "Корзина",
        "Кошик",
        "ゴミ箱",
        "垃圾桶",
        "已删除邮件",
        "휴지통",
    ];

    let needle = folder_name.to_lowercase();
    // Both sides are lowercased, so the casing used in the tables is irrelevant.
    let listed_in = |names: &[&str]| names.iter().any(|name| name.to_lowercase() == needle);

    if listed_in(SENT_NAMES) {
        return FolderMeaning::Sent;
    }
    if listed_in(SPAM_NAMES) {
        return FolderMeaning::Spam;
    }
    if listed_in(DRAFT_NAMES) {
        return FolderMeaning::Drafts;
    }
    if listed_in(TRASH_NAMES) {
        return FolderMeaning::Trash;
    }
    FolderMeaning::Unknown
}
/// Derives the meaning of a folder from its IMAP name attributes.
///
/// The first attribute with a known meaning decides the result. The
/// extension labels `\Spam` and `\Important` map to spam and virtual
/// respectively. Returns `FolderMeaning::Unknown` if no attribute is
/// recognized.
fn get_folder_meaning_by_attrs(folder_attrs: &[NameAttribute]) -> FolderMeaning {
    folder_attrs
        .iter()
        .find_map(|attr| match attr {
            NameAttribute::Trash => Some(FolderMeaning::Trash),
            NameAttribute::Sent => Some(FolderMeaning::Sent),
            NameAttribute::Junk => Some(FolderMeaning::Spam),
            NameAttribute::Drafts => Some(FolderMeaning::Drafts),
            NameAttribute::All | NameAttribute::Flagged => Some(FolderMeaning::Virtual),
            NameAttribute::Extension(label) => match label.as_ref() {
                "\\Spam" => Some(FolderMeaning::Spam),
                "\\Important" => Some(FolderMeaning::Virtual),
                _ => None,
            },
            _ => None,
        })
        .unwrap_or(FolderMeaning::Unknown)
}
/// Determines the meaning of a folder, preferring its attributes.
///
/// The folder name is only consulted when the attributes yield
/// `FolderMeaning::Unknown`.
pub(crate) fn get_folder_meaning(folder: &Name) -> FolderMeaning {
    let by_attrs = get_folder_meaning_by_attrs(folder.attributes());
    if by_attrs == FolderMeaning::Unknown {
        get_folder_meaning_by_name(folder.name())
    } else {
        by_attrs
    }
}
/// Parses the header section of a fetched message.
///
/// Returns an empty list if the fetch response carries no header data.
fn get_fetch_headers(prefetch_msg: &Fetch) -> Result<Vec<mailparse::MailHeader>> {
    let Some(header_bytes) = prefetch_msg.header() else {
        return Ok(Vec::new());
    };
    let (headers, _) = mailparse::parse_headers(header_bytes)?;
    Ok(headers)
}
/// Extracts the message ID from prefetched headers.
///
/// `X-Microsoft-Original-Message-ID` takes precedence over `Message-ID`.
/// Returns `None` if neither header is present or the value cannot be parsed.
pub(crate) fn prefetch_get_message_id(headers: &[mailparse::MailHeader]) -> Option<String> {
    let raw = headers
        .get_header_value(HeaderDef::XMicrosoftOriginalMessageId)
        .or_else(|| headers.get_header_value(HeaderDef::MessageId))?;
    mimeparser::parse_message_id(&raw).ok()
}
/// Creates a new message ID consisting of `GENERATED_PREFIX` followed by a
/// fresh ID from `create_id()`.
pub(crate) fn create_message_id() -> String {
    let id = create_id();
    format!("{GENERATED_PREFIX}{id}")
}
/// Loads the chat of the parent message referenced by the prefetched
/// headers, or returns `None` if no parent message is known.
async fn prefetch_get_chat(
    context: &Context,
    headers: &[mailparse::MailHeader<'_>],
) -> Result<Option<chat::Chat>> {
    match get_prefetch_parent_message(context, headers).await? {
        Some(parent) => {
            let chat = chat::Chat::load_from_db(context, parent.get_chat_id()).await?;
            Ok(Some(chat))
        }
        None => Ok(None),
    }
}
/// Decides from prefetched headers whether a message should be downloaded.
///
/// * A message whose ID is already known is never downloaded; it is queued
///   for being marked as seen instead.
/// * Messages whose parent belongs to a non-special group chat are always
///   downloaded.
/// * Messages without a usable `From` and messages flagged as drafts are
///   skipped.
/// * Otherwise the `ShowEmails` setting decides, with Autocrypt Setup
///   Messages always shown. Messages from blocked contacts are dropped
///   unless they look like a delivery notification from a mailer daemon.
pub(crate) async fn prefetch_should_download(
    context: &Context,
    headers: &[mailparse::MailHeader<'_>],
    message_id: &str,
    mut flags: impl Iterator<Item = Flag<'_>>,
) -> Result<bool> {
    let already_known = message::rfc724_mid_exists(context, message_id)
        .await?
        .is_some();
    if already_known {
        markseen_on_imap_table(context, message_id).await?;
        return Ok(false);
    }

    if let Some(chat) = prefetch_get_chat(context, headers).await? {
        if chat.typ == Chattype::Group && !chat.id.is_special() {
            return Ok(true);
        }
    }

    let maybe_ndn = headers
        .get_header_value(HeaderDef::From_)
        .map_or(false, |from| {
            let from = from.to_ascii_lowercase();
            from.contains("mailer-daemon") || from.contains("mail-daemon")
        });

    let is_autocrypt_setup_message = headers
        .get_header_value(HeaderDef::AutocryptSetupMessage)
        .is_some();

    let Some(from) = mimeparser::get_from(headers) else {
        return Ok(false);
    };
    let contact = from_field_to_contact_id(context, &from, true).await?;
    let Some((_from_id, blocked_contact, origin)) = contact else {
        return Ok(false);
    };

    if flags.any(|f| f == Flag::Draft) {
        info!(context, "Ignoring draft message");
        return Ok(false);
    }

    let is_chat_message = headers.get_header_value(HeaderDef::ChatVersion).is_some();
    let accepted_contact = origin.is_known();
    let parent = get_prefetch_parent_message(context, headers).await?;
    let is_reply_to_chat_message = parent.map_or(false, |parent| {
        matches!(
            parent.is_dc_message,
            MessengerMessage::Yes | MessengerMessage::Reply
        )
    });

    let show_emails =
        ShowEmails::from_i32(context.get_config_int(Config::ShowEmails).await?).unwrap_or_default();

    let chat_related = is_chat_message || is_reply_to_chat_message;
    let show = is_autocrypt_setup_message
        || match show_emails {
            ShowEmails::Off => chat_related,
            ShowEmails::AcceptedContacts => chat_related || accepted_contact,
            ShowEmails::All => true,
        };

    Ok(maybe_ndn || (show && !blocked_contact))
}
/// Returns `true` for a chat message whose sent timestamp is strictly newer
/// than a previously recorded, non-zero sent timestamp.
pub(crate) fn is_dup_msg(is_chat_msg: bool, ts_sent: i64, ts_sent_old: i64) -> bool {
    if !is_chat_msg || ts_sent_old == 0 {
        return false;
    }
    ts_sent > ts_sent_old
}
/// Marks the message stored under the given IMAP folder, UID validity and
/// UID as seen.
///
/// Returns the chat ID of the message if its state actually changed from
/// fresh or noticed to seen; in that case the ephemeral timer of the message
/// is started as well. Returns `None` if no such message exists or its state
/// was left untouched.
async fn mark_seen_by_uid(
    context: &Context,
    folder: &str,
    uid_validity: u32,
    uid: u32,
) -> Result<Option<ChatId>> {
    let found = context
        .sql
        .query_row_optional(
            "SELECT id, chat_id FROM msgs
WHERE id > 9 AND rfc724_mid IN (
SELECT rfc724_mid FROM imap
WHERE folder=?1
AND uidvalidity=?2
AND uid=?3
LIMIT 1
)",
            (&folder, uid_validity, uid),
            |row| {
                let msg_id: MsgId = row.get(0)?;
                let chat_id: ChatId = row.get(1)?;
                Ok((msg_id, chat_id))
            },
        )
        .await
        .with_context(|| format!("failed to get msg and chat ID for IMAP message {folder}/{uid}"))?;

    let Some((msg_id, chat_id)) = found else {
        return Ok(None);
    };

    let rows_changed = context
        .sql
        .execute(
            "UPDATE msgs SET state=?1
WHERE (state=?2 OR state=?3)
AND id=?4",
            (
                MessageState::InSeen,
                MessageState::InFresh,
                MessageState::InNoticed,
                msg_id,
            ),
        )
        .await
        .with_context(|| format!("failed to update msg {msg_id} state"))?;

    let updated = rows_changed > 0;
    if !updated {
        return Ok(None);
    }

    msg_id
        .start_ephemeral_timer(context)
        .await
        .with_context(|| format!("failed to start ephemeral timer for message {msg_id}"))?;
    Ok(Some(chat_id))
}
/// Queues all `imap` table rows with the given message ID in
/// `imap_markseen` and interrupts the inbox loop of the scheduler.
pub(crate) async fn markseen_on_imap_table(context: &Context, message_id: &str) -> Result<()> {
    let query = "INSERT OR IGNORE INTO imap_markseen (id)
SELECT id FROM imap WHERE rfc724_mid=?";
    context.sql.execute(query, (message_id,)).await?;

    context.scheduler.interrupt_inbox().await;
    Ok(())
}
/// Stores `uid_next` for `folder` in the `imap_sync` table, inserting the
/// row if it does not exist yet.
pub(crate) async fn set_uid_next(context: &Context, folder: &str, uid_next: u32) -> Result<()> {
    let upsert = "INSERT INTO imap_sync (folder, uid_next) VALUES (?,?)
ON CONFLICT(folder) DO UPDATE SET uid_next=excluded.uid_next";
    context.sql.execute(upsert, (folder, uid_next)).await?;
    Ok(())
}
/// Reads the stored `uid_next` of `folder` from the `imap_sync` table,
/// returning 0 if no value is stored.
async fn get_uid_next(context: &Context, folder: &str) -> Result<u32> {
    let stored: Option<u32> = context
        .sql
        .query_get_value("SELECT uid_next FROM imap_sync WHERE folder=?;", (folder,))
        .await?;
    Ok(stored.unwrap_or(0))
}
/// Stores `uidvalidity` for `folder` in the `imap_sync` table, inserting the
/// row if it does not exist yet.
pub(crate) async fn set_uidvalidity(
    context: &Context,
    folder: &str,
    uidvalidity: u32,
) -> Result<()> {
    let upsert = "INSERT INTO imap_sync (folder, uidvalidity) VALUES (?,?)
ON CONFLICT(folder) DO UPDATE SET uidvalidity=excluded.uidvalidity";
    context.sql.execute(upsert, (folder, uidvalidity)).await?;
    Ok(())
}
/// Reads the stored `uidvalidity` of `folder` from the `imap_sync` table,
/// returning 0 if no value is stored.
async fn get_uidvalidity(context: &Context, folder: &str) -> Result<u32> {
    let stored: Option<u32> = context
        .sql
        .query_get_value(
            "SELECT uidvalidity FROM imap_sync WHERE folder=?;",
            (folder,),
        )
        .await?;
    Ok(stored.unwrap_or(0))
}
/// Stores `modseq` for `folder` in the `imap_sync` table, inserting the row
/// if it does not exist yet.
pub(crate) async fn set_modseq(context: &Context, folder: &str, modseq: u64) -> Result<()> {
    let upsert = "INSERT INTO imap_sync (folder, modseq) VALUES (?,?)
ON CONFLICT(folder) DO UPDATE SET modseq=excluded.modseq";
    context.sql.execute(upsert, (folder, modseq)).await?;
    Ok(())
}
/// Reads the stored `modseq` of `folder` from the `imap_sync` table,
/// returning 0 if no value is stored.
async fn get_modseq(context: &Context, folder: &str) -> Result<u64> {
    let stored: Option<u64> = context
        .sql
        .query_get_value("SELECT modseq FROM imap_sync WHERE folder=?;", (folder,))
        .await?;
    Ok(stored.unwrap_or(0))
}
/// Builds an IMAP search expression matching messages sent from any of our
/// own addresses.
///
/// Starts with `FROM "<primary>"` and wraps the expression in one more
/// `OR (...) (FROM "<addr>")` for every secondary address.
pub(crate) async fn get_imap_self_sent_search_command(context: &Context) -> Result<String> {
    let primary = context.get_primary_self_addr().await?;
    let secondary = context.get_secondary_self_addrs().await?;

    let search_command = secondary
        .into_iter()
        .fold(format!("FROM \"{primary}\""), |search_command, item| {
            format!("OR ({search_command}) (FROM \"{item}\")")
        });
    Ok(search_command)
}
/// Reads the raw config entry `imap.mailbox.<folder>` and returns its first
/// two `:`-separated fields as numbers.
///
/// A missing entry, a missing field or an unparsable field yields 0 for the
/// respective value.
pub async fn get_config_last_seen_uid(context: &Context, folder: &str) -> Result<(u32, u32)> {
    let key = format!("imap.mailbox.{folder}");
    let Some(entry) = context.sql.get_raw_config(&key).await? else {
        return Ok((0, 0));
    };

    let mut fields = entry.split(':');
    let mut next_number = || -> u32 { fields.next().unwrap_or_default().parse().unwrap_or(0) };
    let first = next_number();
    let second = next_number();
    Ok((first, second))
}
/// Tells whether a folder should be skipped.
///
/// Nothing is ignored unless `OnlyFetchMvbox` is enabled. With it enabled,
/// the sentbox is ignored unless `SentboxWatch` is set, and any other folder
/// is ignored unless it is the MVBOX folder or a spam folder.
async fn should_ignore_folder(
    context: &Context,
    folder: &str,
    folder_meaning: FolderMeaning,
) -> Result<bool> {
    let only_fetch_mvbox = context.get_config_bool(Config::OnlyFetchMvbox).await?;
    if !only_fetch_mvbox {
        return Ok(false);
    }

    if context.is_sentbox(folder).await? {
        let sentbox_watch = context.get_config_bool(Config::SentboxWatch).await?;
        return Ok(!sentbox_watch);
    }

    let is_mvbox = context.is_mvbox(folder).await?;
    Ok(!is_mvbox && folder_meaning != FolderMeaning::Spam)
}
/// Builds IMAP sequence sets out of a list of UIDs.
///
/// Runs of consecutive UIDs (in input order) are collapsed into `start:end`
/// ranges, and the ranges are joined with commas. Whenever a set grows beyond
/// 990 characters it is closed and a new one is started, so that a single
/// command line does not become too long.
///
/// Returns one `(uids, set)` pair per sequence set, where `uids` lists
/// exactly the UIDs covered by `set`. An empty input yields an empty list.
fn build_sequence_sets(uids: &[u32]) -> Result<Vec<(Vec<u32>, String)>> {
    // First pass: merge consecutive UIDs into ranges.
    let mut ranges: Vec<UidRange> = vec![];
    for &current in uids {
        if let Some(last) = ranges.last_mut() {
            // `checked_add` keeps a range ending at `u32::MAX` from overflowing.
            if last.end.checked_add(1) == Some(current) {
                last.end = current;
                continue;
            }
        }
        ranges.push(UidRange {
            start: current,
            end: current,
        });
    }

    // Second pass: pack the ranges into comma-separated sets of bounded length.
    let mut result = vec![];
    let (mut last_uids, mut last_str) = (Vec::new(), String::new());
    for range in ranges {
        last_uids.extend(range.start..=range.end);
        if !last_str.is_empty() {
            last_str.push(',');
        }
        last_str.push_str(&range.to_string());

        if last_str.len() > 990 {
            result.push((take(&mut last_uids), take(&mut last_str)));
        }
    }
    result.push((last_uids, last_str));

    // The trailing set is empty if the last range happened to close a set.
    result.retain(|(_, s)| !s.is_empty());
    Ok(result)
}
/// An inclusive range of UIDs, `start..=end`.
///
/// Formats as `start` when both bounds are equal and as `start:end`
/// otherwise.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
struct UidRange {
    /// First UID of the range.
    start: u32,
    /// Last UID of the range (inclusive).
    end: u32,
}
impl std::fmt::Display for UidRange {
    /// Writes a single UID as `start` and a real range as `start:end`.
    fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
        write!(f, "{}", self.start)?;
        if self.end != self.start {
            write!(f, ":{}", self.end)?;
        }
        Ok(())
    }
}
/// Adds every recipient found in the folder configured under `folder` as a
/// contact with origin `Origin::OutgoingTo`.
///
/// Skips silently (with a log line) if the folder is not configured.
/// Recipients with an invalid address are logged and skipped. A
/// `ContactsChanged` event is emitted if at least one contact was modified.
async fn add_all_recipients_as_contacts(
    context: &Context,
    session: &mut Session,
    folder: Config,
) -> Result<()> {
    let Some(mailbox) = context.get_config(folder).await? else {
        info!(
            context,
            "Folder {} is not configured, skipping fetching contacts from it.", folder
        );
        return Ok(());
    };

    session
        .select_with_uidvalidity(context, &mailbox)
        .await
        .with_context(|| format!("could not select {mailbox}"))?;

    let recipients = session
        .get_all_recipients(context)
        .await
        .context("could not get recipients")?;

    let mut any_modified = false;
    for recipient in recipients {
        let recipient_addr = match ContactAddress::new(&recipient.addr) {
            Ok(addr) => addr,
            Err(err) => {
                warn!(
                    context,
                    "Could not add contact for recipient with address {:?}: {:#}",
                    recipient.addr,
                    err
                );
                continue;
            }
        };

        let (_, modified) = Contact::add_or_lookup(
            context,
            &recipient.display_name.unwrap_or_default(),
            &recipient_addr,
            Origin::OutgoingTo,
        )
        .await?;
        any_modified |= modified != Modifier::None;
    }

    if any_modified {
        context.emit_event(EventType::ContactsChanged(None));
    }
    Ok(())
}
#[cfg(test)]
mod tests {
use super::*;
use crate::test_utils::TestContext;
/// Folder names are recognized regardless of case, including non-ASCII
/// letters; unknown names map to `Unknown`.
#[test]
fn test_get_folder_meaning_by_name() {
    let cases = [
        ("Gesendet", FolderMeaning::Sent),
        ("GESENDET", FolderMeaning::Sent),
        ("gesendet", FolderMeaning::Sent),
        ("Messages envoyés", FolderMeaning::Sent),
        ("mEsSaGes envoyÉs", FolderMeaning::Sent),
        ("xxx", FolderMeaning::Unknown),
        ("SPAM", FolderMeaning::Spam),
        ("Trash", FolderMeaning::Trash),
    ];
    for (name, expected) in cases {
        assert_eq!(get_folder_meaning_by_name(name), expected);
    }
}
/// `uid_next` and `uidvalidity` default to 0 and can be set independently of
/// each other.
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn test_set_uid_next_validity() {
    let t = TestContext::new_alice().await;
    let ctx = &t.ctx;
    let folder = "Inbox";

    // Nothing stored yet.
    assert_eq!(get_uid_next(ctx, folder).await.unwrap(), 0);
    assert_eq!(get_uidvalidity(ctx, folder).await.unwrap(), 0);

    // Setting the validity leaves uid_next alone.
    set_uidvalidity(ctx, folder, 7).await.unwrap();
    assert_eq!(get_uidvalidity(ctx, folder).await.unwrap(), 7);
    assert_eq!(get_uid_next(ctx, folder).await.unwrap(), 0);

    // Both values can be updated afterwards.
    set_uid_next(ctx, folder, 5).await.unwrap();
    set_uidvalidity(ctx, folder, 6).await.unwrap();
    assert_eq!(get_uid_next(ctx, folder).await.unwrap(), 5);
    assert_eq!(get_uidvalidity(ctx, folder).await.unwrap(), 6);
}
/// Checks range merging for small inputs and the splitting into several
/// sequence sets for inputs that do not fit into one.
#[test]
fn test_build_sequence_sets() {
    /// Tells whether `number` occurs both in the UID list and, as a
    /// stand-alone item, in the sequence set string.
    fn has_number((uids, s): &(Vec<u32>, String), number: u32) -> bool {
        uids.iter().any(|&n| n == number)
            && s.split(',').any(|n| n.parse::<u32>().unwrap() == number)
    }

    /// Every set must stay short and must not begin or end with a comma.
    fn assert_well_formed(sets: &[(Vec<u32>, String)]) {
        for (_, set) in sets {
            assert!(set.len() < 1010);
            assert!(!set.ends_with(','));
            assert!(!set.starts_with(','));
        }
    }

    /// Every requested number must be covered by one of the sets.
    fn assert_covers(sets: &[(Vec<u32>, String)], numbers: &[u32]) {
        for &number in numbers {
            assert!(sets.iter().any(|r| has_number(r, number)));
        }
    }

    assert_eq!(build_sequence_sets(&[]).unwrap(), vec![]);

    let cases = vec![
        (vec![1], "1"),
        (vec![3291], "3291"),
        (vec![1, 3, 5, 7, 9, 11], "1,3,5,7,9,11"),
        (vec![1, 2, 3], "1:3"),
        (vec![1, 4, 5, 6], "1,4:6"),
        ((1..=500).collect(), "1:500"),
        (vec![3, 4, 8, 9, 10, 11, 39, 50, 2], "3:4,8:11,39,50,2"),
    ];
    for (input, s) in cases {
        assert_eq!(
            build_sequence_sets(&input).unwrap(),
            vec![(input, s.into())]
        );
    }

    // Fits into a single set.
    let numbers: Vec<_> = (2..=500).step_by(2).collect();
    let result = build_sequence_sets(&numbers).unwrap();
    assert_well_formed(&result);
    assert_eq!(result.len(), 1);
    assert_covers(&result, &numbers);

    // Needs two sets.
    let numbers: Vec<_> = (1..=1000).step_by(3).collect();
    let result = build_sequence_sets(&numbers).unwrap();
    assert_well_formed(&result);
    let (last_uids, last_str) = result.last().unwrap();
    assert_eq!(
        last_uids.get((last_uids.len() - 2)..).unwrap(),
        &[997, 1000]
    );
    assert!(last_str.ends_with("997,1000"));
    assert_eq!(result.len(), 2);
    assert_covers(&result, &numbers);

    // Long UIDs need more sets.
    let numbers: Vec<_> = (30000000..=30002500).step_by(4).collect();
    let result = build_sequence_sets(&numbers).unwrap();
    assert_well_formed(&result);
    assert_eq!(result.len(), 6);
    assert_covers(&result, &numbers);
}
/// Runs `target_folder_cfg()` for one combination of settings and checks
/// that the message ends up in `expected_destination`.
///
/// An `expected_destination` equal to `folder` means that no move is
/// expected. With `setupmessage` set, an Autocrypt Setup Message fixture is
/// used instead of the generated message.
// The parameters mirror the independent test dimensions.
#[allow(clippy::too_many_arguments)]
async fn check_target_folder_combination(
    folder: &str,
    mvbox_move: bool,
    chat_msg: bool,
    expected_destination: &str,
    accepted_chat: bool,
    outgoing: bool,
    setupmessage: bool,
) -> Result<()> {
    println!("Testing: For folder {folder}, mvbox_move {mvbox_move}, chat_msg {chat_msg}, accepted {accepted_chat}, outgoing {outgoing}, setupmessage {setupmessage}");

    let t = TestContext::new_alice().await;
    t.ctx
        .set_config(Config::ConfiguredMvboxFolder, Some("DeltaChat"))
        .await?;
    t.ctx
        .set_config(Config::ConfiguredSentboxFolder, Some("Sent"))
        .await?;
    let mvbox_move_value = if mvbox_move { "1" } else { "0" };
    t.ctx
        .set_config(Config::MvboxMove, Some(mvbox_move_value))
        .await?;

    if accepted_chat {
        let contact_id = Contact::create(&t.ctx, "", "bob@example.net").await?;
        ChatId::create_for_contact(&t.ctx, contact_id).await?;
    }

    // Declared outside the branch so the generated message outlives `bytes`.
    let temp;
    let bytes = if setupmessage {
        include_bytes!("../test-data/message/AutocryptSetupMessage.eml")
    } else {
        let address_headers = if outgoing {
            "From: alice@example.org\nTo: bob@example.net\n"
        } else {
            "From: bob@example.net\nTo: alice@example.org\n"
        };
        let chat_version_header = if chat_msg { "Chat-Version: 1.0\n" } else { "" };
        temp = format!(
            "Received: (Postfix, from userid 1000); Mon, 4 Dec 2006 14:51:39 +0100 (CET)\n\
             {}\
             Subject: foo\n\
             Message-ID: <abc@example.com>\n\
             {}\
             Date: Sun, 22 Mar 2020 22:37:57 +0000\n\
             \n\
             hello\n",
            address_headers, chat_version_header,
        );
        temp.as_bytes()
    };

    let (headers, _) = mailparse::parse_headers(bytes)?;
    let target_cfg =
        target_folder_cfg(&t, folder, get_folder_meaning_by_name(folder), &headers).await?;
    let actual = match target_cfg {
        Some(config) => t.get_config(config).await?,
        None => None,
    };

    let expected = (expected_destination != folder).then_some(expected_destination);
    assert_eq!(expected, actual.as_deref(), "For folder {folder}, mvbox_move {mvbox_move}, chat_msg {chat_msg}, accepted {accepted_chat}, outgoing {outgoing}, setupmessage {setupmessage}: expected {expected:?}, got {actual:?}");
    Ok(())
}
/// Expected destinations when a chat with the sender exists.
///
/// Each row is `(folder, mvbox_move, chat_msg, expected_destination)`.
const COMBINATIONS_ACCEPTED_CHAT: &[(&str, bool, bool, &str)] = &[
    ("INBOX", false, false, "INBOX"),
    ("INBOX", false, true, "INBOX"),
    ("INBOX", true, false, "INBOX"),
    ("INBOX", true, true, "DeltaChat"),
    ("Sent", false, false, "Sent"),
    ("Sent", false, true, "Sent"),
    ("Sent", true, false, "Sent"),
    ("Sent", true, true, "DeltaChat"),
    ("Spam", false, false, "INBOX"),
    ("Spam", false, true, "INBOX"),
    ("Spam", true, false, "INBOX"),
    ("Spam", true, true, "DeltaChat"),
];
/// Expected destinations when no chat with the sender exists.
///
/// Each row is `(folder, mvbox_move, chat_msg, expected_destination)`.
const COMBINATIONS_REQUEST: &[(&str, bool, bool, &str)] = &[
    ("INBOX", false, false, "INBOX"),
    ("INBOX", false, true, "INBOX"),
    ("INBOX", true, false, "INBOX"),
    ("INBOX", true, true, "DeltaChat"),
    ("Sent", false, false, "Sent"),
    ("Sent", false, true, "Sent"),
    ("Sent", true, false, "Sent"),
    ("Sent", true, true, "DeltaChat"),
    // Non-chat messages from unknown senders stay in spam.
    ("Spam", false, false, "Spam"),
    ("Spam", false, true, "INBOX"),
    ("Spam", true, false, "Spam"),
    ("Spam", true, true, "DeltaChat"),
];
/// Incoming messages from a contact with an existing chat.
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn test_target_folder_incoming_accepted() -> Result<()> {
    for &(folder, mvbox_move, chat_msg, expected_destination) in COMBINATIONS_ACCEPTED_CHAT {
        let (accepted_chat, outgoing, setupmessage) = (true, false, false);
        check_target_folder_combination(
            folder,
            mvbox_move,
            chat_msg,
            expected_destination,
            accepted_chat,
            outgoing,
            setupmessage,
        )
        .await?;
    }
    Ok(())
}
/// Incoming messages from a contact without an existing chat.
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn test_target_folder_incoming_request() -> Result<()> {
    for &(folder, mvbox_move, chat_msg, expected_destination) in COMBINATIONS_REQUEST {
        let (accepted_chat, outgoing, setupmessage) = (false, false, false);
        check_target_folder_combination(
            folder,
            mvbox_move,
            chat_msg,
            expected_destination,
            accepted_chat,
            outgoing,
            setupmessage,
        )
        .await?;
    }
    Ok(())
}
/// Outgoing messages are expected to behave like incoming ones from an
/// accepted contact.
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn test_target_folder_outgoing() -> Result<()> {
    for &(folder, mvbox_move, chat_msg, expected_destination) in COMBINATIONS_ACCEPTED_CHAT {
        let (accepted_chat, outgoing, setupmessage) = (true, true, false);
        check_target_folder_combination(
            folder,
            mvbox_move,
            chat_msg,
            expected_destination,
            accepted_chat,
            outgoing,
            setupmessage,
        )
        .await?;
    }
    Ok(())
}
/// Autocrypt Setup Messages stay where they are, except that they are moved
/// from spam to the inbox.
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn test_target_folder_setupmsg() -> Result<()> {
    for &(folder, mvbox_move, chat_msg, _expected_destination) in COMBINATIONS_ACCEPTED_CHAT {
        let expected_destination = if folder == "Spam" { "INBOX" } else { folder };
        let (accepted_chat, outgoing, setupmessage) = (false, true, true);
        check_target_folder_combination(
            folder,
            mvbox_move,
            chat_msg,
            expected_destination,
            accepted_chat,
            outgoing,
            setupmessage,
        )
        .await?;
    }
    Ok(())
}
/// Every change of the primary address wraps the search command in one more
/// `OR`, keeping the former addresses.
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn test_get_imap_search_command() -> Result<()> {
    let t = TestContext::new_alice().await;
    let ctx = &t.ctx;

    let command = get_imap_self_sent_search_command(ctx).await?;
    assert_eq!(command, r#"FROM "alice@example.org""#);

    ctx.set_primary_self_addr("alice@another.com").await?;
    let command = get_imap_self_sent_search_command(ctx).await?;
    assert_eq!(
        command,
        r#"OR (FROM "alice@another.com") (FROM "alice@example.org")"#
    );

    ctx.set_primary_self_addr("alice@third.com").await?;
    let command = get_imap_self_sent_search_command(ctx).await?;
    assert_eq!(
        command,
        r#"OR (OR (FROM "alice@third.com") (FROM "alice@another.com")) (FROM "alice@example.org")"#
    );
    Ok(())
}
/// `UidGrouper` yields one item per folder, with the row IDs collected and
/// the UIDs compressed into a set string.
#[test]
fn test_uid_grouper() {
    let inbox = || "INBOX".to_string();

    // A single entry.
    let res = UidGrouper::from([(1, 2, inbox())])
        .into_iter()
        .collect::<Vec<(String, Vec<i64>, String)>>();
    assert_eq!(res, vec![(inbox(), vec![1], "2".to_string())]);

    // Two consecutive UIDs form a range.
    let res = UidGrouper::from([(1, 2, inbox()), (2, 3, inbox())])
        .into_iter()
        .collect::<Vec<(String, Vec<i64>, String)>>();
    assert_eq!(res, vec![(inbox(), vec![1, 2], "2:3".to_string())]);

    // A repeated UID appears only once in the set string.
    let res = UidGrouper::from([(1, 2, inbox()), (2, 2, inbox()), (3, 3, inbox())])
        .into_iter()
        .collect::<Vec<(String, Vec<i64>, String)>>();
    assert_eq!(res, vec![(inbox(), vec![1, 2, 3], "2:3".to_string())]);
}
}