pub mod send;
use std::time::Duration;
use anyhow::{bail, format_err, Context as _, Error, Result};
use async_smtp::response::{Category, Code, Detail};
use async_smtp::{self as smtp, EmailAddress, SmtpTransport};
use tokio::io::BufStream;
use tokio::task;
use crate::chat::{add_info_msg_with_cmd, ChatId};
use crate::config::Config;
use crate::contact::{Contact, ContactId};
use crate::context::Context;
use crate::events::EventType;
use crate::login_param::{CertificateChecks, LoginParam, ServerLoginParam};
use crate::message::Message;
use crate::message::{self, MsgId};
use crate::mimefactory::MimeFactory;
use crate::net::connect_tcp;
use crate::net::session::SessionBufStream;
use crate::net::tls::wrap_tls;
use crate::oauth2::get_oauth2_access_token;
use crate::provider::Socket;
use crate::scheduler::connectivity::ConnectivityStore;
use crate::socks::Socks5Config;
use crate::sql;
use crate::stock_str::unencrypted_email;
use crate::tools::{self, time_elapsed};
/// Timeout (60 seconds) handed to `connect_tcp` and `Socks5Config::connect`
/// whenever this module opens a connection to an SMTP server.
const SMTP_TIMEOUT: Duration = Duration::from_secs(60);
/// State of the SMTP client: the current transport (if any) together with
/// bookkeeping about the connection and the most recent send attempt.
#[derive(Default)]
pub(crate) struct Smtp {
/// Logged-in SMTP transport; `None` while disconnected.
transport: Option<SmtpTransport<Box<dyn SessionBufStream>>>,
/// Login address parsed by `connect()`.
from: Option<EmailAddress>,
/// Within this file: set to the current time when `connect()` succeeds and
/// cleared by `disconnect()`. Used by `has_maybe_stale_connection()`.
last_success: Option<tools::Time>,
/// Connectivity state of this SMTP connection (updated via
/// `set_connecting` / `set_working`).
pub(crate) connectivity: ConnectivityStore,
/// Text of the most recent connection or send error; `smtp_send` resets it
/// to `None` when a send succeeds.
pub(crate) last_send_error: Option<String>,
}
impl Smtp {
pub fn new() -> Self {
Default::default()
}
/// Drops the current transport, if any, and forgets the last-success timestamp.
///
/// The SMTP `quit` is issued from a spawned task, so this method does not
/// wait for it to finish.
pub fn disconnect(&mut self) {
    let previous = self.transport.take();
    if let Some(mut transport) = previous {
        task::spawn(async move { transport.quit().await });
    }
    self.last_success = None;
}
/// Returns `true` if more than 60 whole seconds have elapsed since the last
/// recorded success.
///
/// Returns `false` when no success has been recorded.
pub fn has_maybe_stale_connection(&self) -> bool {
    self.last_success
        .as_ref()
        .map_or(false, |last_success| {
            time_elapsed(last_success).as_secs() > 60
        })
}
/// Returns `true` if a transport is currently held.
pub fn is_connected(&self) -> bool {
self.transport.is_some()
}
/// Connects with the configured login parameters unless a connection that is
/// not considered stale already exists.
///
/// A connection flagged by `has_maybe_stale_connection()` is closed first.
/// When a new connection is needed, the connectivity state is switched to
/// "connecting" before the parameters are loaded.
pub async fn connect_configured(&mut self, context: &Context) -> Result<()> {
    if self.has_maybe_stale_connection() {
        info!(context, "Closing stale connection");
        self.disconnect();
    }
    if self.is_connected() {
        return Ok(());
    }

    self.connectivity.set_connecting(context).await;
    let lp = LoginParam::load_configured_params(context).await?;

    // With a known provider its `strict_tls` option decides; otherwise the
    // default is strict exactly when a SOCKS5 proxy is configured.
    let provider_strict_tls = match lp.provider {
        Some(provider) => provider.opt.strict_tls,
        None => lp.socks5_config.is_some(),
    };

    self.connect(
        context,
        &lp.smtp,
        &lp.socks5_config,
        &lp.addr,
        provider_strict_tls,
    )
    .await
}
/// Connects through the SOCKS5 proxy, wraps the stream in TLS and creates an
/// SMTP transport on top of it.
async fn connect_secure_socks5(
    &self,
    context: &Context,
    hostname: &str,
    port: u16,
    strict_tls: bool,
    socks5_config: Socks5Config,
) -> Result<SmtpTransport<Box<dyn SessionBufStream>>> {
    let proxied = socks5_config
        .connect(context, hostname, port, SMTP_TIMEOUT, strict_tls)
        .await?;
    let encrypted = wrap_tls(strict_tls, hostname, proxied).await?;
    let stream: Box<dyn SessionBufStream> = Box::new(BufStream::new(encrypted));
    let client = smtp::SmtpClient::new().smtp_utf8(true);
    Ok(SmtpTransport::new(client, stream).await?)
}
/// Connects through the SOCKS5 proxy in plaintext, issues STARTTLS and then
/// creates the final SMTP transport on the TLS-wrapped stream.
///
/// The second client is built with `without_greeting()`.
async fn connect_starttls_socks5(
    &self,
    context: &Context,
    hostname: &str,
    port: u16,
    strict_tls: bool,
    socks5_config: Socks5Config,
) -> Result<SmtpTransport<Box<dyn SessionBufStream>>> {
    let proxied = socks5_config
        .connect(context, hostname, port, SMTP_TIMEOUT, strict_tls)
        .await?;

    // Plaintext phase: only used to request the TLS upgrade.
    let plain_client = smtp::SmtpClient::new().smtp_utf8(true);
    let plain_transport = SmtpTransport::new(plain_client, BufStream::new(proxied)).await?;
    let raw_stream = plain_transport.starttls().await?.into_inner();

    // Encrypted phase: wrap the raw stream and start a fresh SMTP session.
    let encrypted = wrap_tls(strict_tls, hostname, raw_stream)
        .await
        .context("STARTTLS upgrade failed")?;
    let stream: Box<dyn SessionBufStream> = Box::new(BufStream::new(encrypted));
    let tls_client = smtp::SmtpClient::new()
        .smtp_utf8(true)
        .without_greeting();
    Ok(SmtpTransport::new(tls_client, stream).await?)
}
/// Connects through the SOCKS5 proxy and creates an SMTP transport directly on
/// the unencrypted stream.
async fn connect_insecure_socks5(
    &self,
    context: &Context,
    hostname: &str,
    port: u16,
    socks5_config: Socks5Config,
) -> Result<SmtpTransport<Box<dyn SessionBufStream>>> {
    let proxied = socks5_config
        .connect(context, hostname, port, SMTP_TIMEOUT, false)
        .await?;
    let stream: Box<dyn SessionBufStream> = Box::new(BufStream::new(proxied));
    let client = smtp::SmtpClient::new().smtp_utf8(true);
    Ok(SmtpTransport::new(client, stream).await?)
}
/// Connects to the SMTP server over TCP, wraps the stream in TLS and creates
/// an SMTP transport on top of it.
///
/// `strict_tls` is forwarded both to `connect_tcp` and to `wrap_tls`.
///
/// # Errors
///
/// Fails if the TCP connection, the TLS handshake or the creation of the SMTP
/// transport fails.
async fn connect_secure(
    &self,
    context: &Context,
    hostname: &str,
    port: u16,
    strict_tls: bool,
) -> Result<SmtpTransport<Box<dyn SessionBufStream>>> {
    // Pass `strict_tls` as the final `connect_tcp` argument, exactly like
    // `connect_starttls` and the SOCKS5 TLS paths do; only the plaintext
    // connection paths pass a hard-coded `false` there.
    // NOTE(review): the precise meaning of that flag is defined by
    // `crate::net::connect_tcp`, which is not visible here.
    let tcp_stream = connect_tcp(context, hostname, port, SMTP_TIMEOUT, strict_tls).await?;
    let tls_stream = wrap_tls(strict_tls, hostname, tcp_stream).await?;
    let session_stream: Box<dyn SessionBufStream> = Box::new(BufStream::new(tls_stream));
    let client = smtp::SmtpClient::new().smtp_utf8(true);
    let transport = SmtpTransport::new(client, session_stream).await?;
    Ok(transport)
}
/// Connects over plain TCP, issues STARTTLS and then creates the final SMTP
/// transport on the TLS-wrapped stream.
///
/// The second client is built with `without_greeting()`.
async fn connect_starttls(
    &self,
    context: &Context,
    hostname: &str,
    port: u16,
    strict_tls: bool,
) -> Result<SmtpTransport<Box<dyn SessionBufStream>>> {
    let plain_stream = connect_tcp(context, hostname, port, SMTP_TIMEOUT, strict_tls).await?;

    // Plaintext phase: only used to request the TLS upgrade.
    let plain_client = smtp::SmtpClient::new().smtp_utf8(true);
    let plain_transport = SmtpTransport::new(plain_client, BufStream::new(plain_stream)).await?;
    let raw_stream = plain_transport.starttls().await?.into_inner();

    // Encrypted phase: wrap the raw stream and start a fresh SMTP session.
    let encrypted = wrap_tls(strict_tls, hostname, raw_stream)
        .await
        .context("STARTTLS upgrade failed")?;
    let stream: Box<dyn SessionBufStream> = Box::new(BufStream::new(encrypted));
    let tls_client = smtp::SmtpClient::new()
        .smtp_utf8(true)
        .without_greeting();
    Ok(SmtpTransport::new(tls_client, stream).await?)
}
/// Connects over plain TCP and creates an SMTP transport directly on the
/// unencrypted stream.
async fn connect_insecure(
    &self,
    context: &Context,
    hostname: &str,
    port: u16,
) -> Result<SmtpTransport<Box<dyn SessionBufStream>>> {
    let plain_stream = connect_tcp(context, hostname, port, SMTP_TIMEOUT, false).await?;
    let stream: Box<dyn SessionBufStream> = Box::new(BufStream::new(plain_stream));
    let client = smtp::SmtpClient::new().smtp_utf8(true);
    Ok(SmtpTransport::new(client, stream).await?)
}
pub async fn connect(
&mut self,
context: &Context,
lp: &ServerLoginParam,
socks5_config: &Option<Socks5Config>,
addr: &str,
provider_strict_tls: bool,
) -> Result<()> {
if self.is_connected() {
warn!(context, "SMTP already connected.");
return Ok(());
}
if lp.server.is_empty() || lp.port == 0 {
bail!("bad connection parameters");
}
let from = EmailAddress::new(addr.to_string())
.with_context(|| format!("invalid login address {addr}"))?;
self.from = Some(from);
let domain = &lp.server;
let port = lp.port;
let strict_tls = match lp.certificate_checks {
CertificateChecks::Automatic => provider_strict_tls,
CertificateChecks::Strict => true,
CertificateChecks::AcceptInvalidCertificates
| CertificateChecks::AcceptInvalidCertificates2 => false,
};
let mut transport = if let Some(socks5_config) = socks5_config {
match lp.security {
Socket::Automatic => bail!("SMTP port security is not configured"),
Socket::Ssl => {
self.connect_secure_socks5(
context,
domain,
port,
strict_tls,
socks5_config.clone(),
)
.await?
}
Socket::Starttls => {
self.connect_starttls_socks5(
context,
domain,
port,
strict_tls,
socks5_config.clone(),
)
.await?
}
Socket::Plain => {
self.connect_insecure_socks5(context, domain, port, socks5_config.clone())
.await?
}
}
} else {
match lp.security {
Socket::Automatic => bail!("SMTP port security is not configured"),
Socket::Ssl => {
self.connect_secure(context, domain, port, strict_tls)
.await?
}
Socket::Starttls => {
self.connect_starttls(context, domain, port, strict_tls)
.await?
}
Socket::Plain => self.connect_insecure(context, domain, port).await?,
}
};
{
let (creds, mechanism) = if lp.oauth2 {
let send_pw = &lp.password;
let access_token = get_oauth2_access_token(context, addr, send_pw, false).await?;
if access_token.is_none() {
bail!("SMTP OAuth 2 error {}", addr);
}
let user = &lp.user;
(
smtp::authentication::Credentials::new(
user.to_string(),
access_token.unwrap_or_default(),
),
vec![smtp::authentication::Mechanism::Xoauth2],
)
} else {
let user = lp.user.clone();
let pw = lp.password.clone();
(
smtp::authentication::Credentials::new(user, pw),
vec![
smtp::authentication::Mechanism::Plain,
smtp::authentication::Mechanism::Login,
],
)
};
transport.try_login(&creds, &mechanism).await?;
}
self.transport = Some(transport);
self.last_success = Some(tools::Time::now());
context.emit_event(EventType::SmtpConnected(format!(
"SMTP-LOGIN as {} ok",
lp.user,
)));
Ok(())
}
}
/// Outcome of a single `smtp_send` attempt.
pub(crate) enum SendResult {
/// The message was handed to the SMTP server successfully.
Success,
/// Sending failed and is not retried; carries the error. `smtp_send` marks
/// the message as failed in this case.
Failure(Error),
/// Sending did not succeed, but the attempt may be repeated later.
Retry,
}
pub(crate) async fn smtp_send(
context: &Context,
recipients: &[async_smtp::EmailAddress],
message: &str,
smtp: &mut Smtp,
msg_id: MsgId,
) -> SendResult {
if std::env::var(crate::DCC_MIME_DEBUG).is_ok() {
info!(context, "smtp-sending out mime message:");
println!("{message}");
}
smtp.connectivity.set_working(context).await;
if let Err(err) = smtp
.connect_configured(context)
.await
.context("Failed to open SMTP connection")
{
smtp.last_send_error = Some(format!("{err:#}"));
return SendResult::Retry;
}
let send_result = smtp.send(context, recipients, message.as_bytes()).await;
smtp.last_send_error = send_result.as_ref().err().map(|e| e.to_string());
let status = match send_result {
Err(crate::smtp::send::Error::SmtpSend(err)) => {
info!(context, "SMTP failed to send: {:?}", &err);
let res = match err {
async_smtp::error::Error::Permanent(ref response) => {
let maybe_transient = match response.code {
Code {
category: Category::MailSystem,
detail: Detail::Zero,
..
} => {
response.first_word() == Some("5.5.0")
}
_ => false,
};
if maybe_transient {
info!(context, "Permanent error that is likely to actually be transient, postponing retry for later");
SendResult::Retry
} else {
info!(context, "Permanent error, message sending failed");
SendResult::Failure(format_err!("Permanent SMTP error: {}", err))
}
}
async_smtp::error::Error::Transient(ref response) => {
if let Some(first_word) = response.first_word() {
if first_word.ends_with(".1.1")
|| first_word.ends_with(".1.2")
|| first_word.ends_with(".1.3")
{
info!(context, "Received extended status code {} for a transient error. This looks like a misconfigured SMTP server, let's fail immediately", first_word);
SendResult::Failure(format_err!("Permanent SMTP error: {}", err))
} else {
info!(
context,
"Transient error with status code {}, postponing retry for later",
first_word
);
SendResult::Retry
}
} else {
info!(
context,
"Transient error without status code, postponing retry for later"
);
SendResult::Retry
}
}
_ => {
info!(
context,
"Message sending failed without error returned by the server, retry later"
);
SendResult::Retry
}
};
info!(context, "Failed to send message over SMTP, disconnecting");
smtp.disconnect();
res
}
Err(crate::smtp::send::Error::Envelope(err)) => {
smtp.disconnect();
warn!(context, "SMTP job is invalid: {}", err);
SendResult::Failure(err)
}
Err(crate::smtp::send::Error::NoTransport) => {
error!(context, "SMTP job failed because SMTP has no transport");
SendResult::Failure(format_err!("SMTP has not transport"))
}
Err(crate::smtp::send::Error::Other(err)) => {
smtp.disconnect();
warn!(context, "unable to load job: {}", err);
SendResult::Failure(err)
}
Ok(()) => SendResult::Success,
};
if let SendResult::Failure(err) = &status {
match Message::load_from_db(context, msg_id).await {
Ok(mut msg) => {
if let Err(err) = message::set_msg_failed(context, &mut msg, &err.to_string()).await
{
error!(context, "Failed to mark {msg_id} as failed: {err:#}.");
}
}
Err(err) => {
error!(
context,
"Failed to load {msg_id} to mark it as failed: {err:#}."
);
}
}
}
status
}
/// Tries to send the queued message stored in row `rowid` of the `smtp` table.
///
/// Steps, in order:
/// 1. Ensure an SMTP connection exists (errors are recorded in
///    `smtp.last_send_error` and returned).
/// 2. Increment the row's `retries` counter, then load the row.
/// 3. If `retries` exceeds 6, mark the message as failed, delete the row and
///    return `Ok(())` without sending.
/// 4. Parse the space-separated recipient list (invalid addresses are logged
///    and skipped) and call `smtp_send`.
/// 5. On `Success` or `Failure` delete the row; on `Retry` keep it.
///
/// # Returns
///
/// `Ok(())` if the message was sent (or dropped because of the retry limit).
/// After a successful send the message is marked as delivered once no `smtp`
/// row for the same `msg_id` remains.
///
/// # Errors
///
/// Returns an error on connection failure, on database errors, when
/// `smtp_send` asks for a retry (error text `"Retry"`) and when sending
/// failed definitively.
pub(crate) async fn send_msg_to_smtp(
context: &Context,
smtp: &mut Smtp,
rowid: i64,
) -> anyhow::Result<()> {
if let Err(err) = smtp
.connect_configured(context)
.await
.context("SMTP connection failure")
{
smtp.last_send_error = Some(format!("{err:#}"));
return Err(err);
}
// The retry counter is bumped only once a connection exists and before the
// send is attempted, so every attempt is counted even if a later step fails.
context
.sql
.execute("UPDATE smtp SET retries=retries+1 WHERE id=?", (rowid,))
.await
.context("failed to update retries count")?;
let (body, recipients, msg_id, retries) = context
.sql
.query_row(
"SELECT mime, recipients, msg_id, retries FROM smtp WHERE id=?",
(rowid,),
|row| {
let mime: String = row.get(0)?;
let recipients: String = row.get(1)?;
let msg_id: MsgId = row.get(2)?;
let retries: i64 = row.get(3)?;
Ok((mime, recipients, msg_id, retries))
},
)
.await?;
// Give up after too many attempts: mark the message as failed and drop the
// queue entry instead of sending.
if retries > 6 {
let mut msg = Message::load_from_db(context, msg_id).await?;
message::set_msg_failed(context, &mut msg, "Number of retries exceeded the limit.").await?;
context
.sql
.execute("DELETE FROM smtp WHERE id=?", (rowid,))
.await
.context("failed to remove message with exceeded retry limit from smtp table")?;
return Ok(());
}
info!(
context,
"Try number {retries} to send message {msg_id} (entry {rowid}) over SMTP"
);
// Recipients are stored as a space-separated list; entries that do not
// parse as an email address are logged and left out.
let recipients_list = recipients
.split(' ')
.filter_map(
|addr| match async_smtp::EmailAddress::new(addr.to_string()) {
Ok(addr) => Some(addr),
Err(err) => {
warn!(context, "invalid recipient: {} {:?}", addr, err);
None
}
},
)
.collect::<Vec<_>>();
let status = smtp_send(context, &recipients_list, body.as_str(), smtp, msg_id).await;
// First pass over the status: update the queue (and, for one specific
// failure, the chat). The row stays in the queue only on `Retry`.
match status {
SendResult::Retry => {}
SendResult::Success => {
context
.sql
.execute("DELETE FROM smtp WHERE id=?", (rowid,))
.await?;
}
SendResult::Failure(ref err) => {
// A failure whose text contains "Invalid unencrypted mail" additionally
// adds an info message to the chat of the failed message, built from the
// domain part of the configured address.
if err.to_string().contains("Invalid unencrypted mail") {
let res = context
.sql
.query_row_optional(
"SELECT chat_id, timestamp FROM msgs WHERE id=?;",
(msg_id,),
|row| Ok((row.get::<_, ChatId>(0)?, row.get::<_, i64>(1)?)),
)
.await?;
if let Some((chat_id, timestamp_sort)) = res {
let addr = context.get_config(Config::ConfiguredAddr).await?;
let text = unencrypted_email(
context,
addr.unwrap_or_default()
.split('@')
.nth(1)
.unwrap_or_default(),
)
.await;
add_info_msg_with_cmd(
context,
chat_id,
&text,
crate::mimeparser::SystemMessage::InvalidUnencryptedMail,
timestamp_sort,
None,
None,
None,
)
.await?;
};
}
context
.sql
.execute("DELETE FROM smtp WHERE id=?", (rowid,))
.await?;
}
};
// Second pass: translate the status into this function's result.
match status {
SendResult::Retry => Err(format_err!("Retry")),
SendResult::Success => {
// Mark the message as delivered only when no other queue entry for
// the same message is left.
if !context
.sql
.exists("SELECT COUNT(*) FROM smtp WHERE msg_id=?", (msg_id,))
.await?
{
msg_id.set_delivered(context).await?;
}
Ok(())
}
SendResult::Failure(err) => Err(format_err!("{}", err)),
}
}
/// Sends queued MDNs one after another for as long as the ratelimiter allows.
///
/// Stops silently as soon as `send_mdn` reports that nothing more should be
/// sent, and propagates any error returned by `send_mdn`.
async fn send_mdns(context: &Context, connection: &mut Smtp) -> Result<()> {
    while context.ratelimit.read().await.can_send() {
        if !send_mdn(context, connection).await? {
            return Ok(());
        }
    }
    info!(context, "Ratelimiter does not allow sending MDNs now");
    Ok(())
}
/// Works through the SMTP queue and afterwards sends queued MDNs.
///
/// If the ratelimiter allows sending at the start, status updates are flushed
/// and a sync message is sent first. All rows of the `smtp` table are then
/// processed in ascending id order, stopping at the first error. MDNs are
/// sent only if the ratelimiter allowed sending at the start.
pub(crate) async fn send_smtp_messages(context: &Context, connection: &mut Smtp) -> Result<()> {
    let ratelimited = !context.ratelimit.read().await.can_send();
    if !ratelimited {
        context.flush_status_updates().await?;
        context.send_sync_msg().await?;
    }

    let queued_rowids = context
        .sql
        .query_map(
            "SELECT id FROM smtp ORDER BY id ASC",
            (),
            |row| row.get::<_, i64>(0),
            |rowids| {
                rowids
                    .collect::<std::result::Result<Vec<_>, _>>()
                    .map_err(Into::into)
            },
        )
        .await?;

    info!(context, "Selected rows from SMTP queue: {queued_rowids:?}.");
    for rowid in queued_rowids {
        send_msg_to_smtp(context, connection, rowid)
            .await
            .context("failed to send message")?;
    }

    // The decision taken at the start is reused here rather than asking the
    // ratelimiter again.
    if ratelimited {
        return Ok(());
    }
    send_mdns(context, connection)
        .await
        .context("failed to send MDNs")
}
async fn send_mdn_msg_id(
context: &Context,
msg_id: MsgId,
contact_id: ContactId,
smtp: &mut Smtp,
) -> Result<bool> {
let contact = Contact::get_by_id(context, contact_id).await?;
if contact.is_blocked() {
return Err(format_err!("Contact is blocked"));
}
let (additional_msg_ids, additional_rfc724_mids): (Vec<MsgId>, Vec<String>) = context
.sql
.query_map(
"SELECT msg_id, rfc724_mid
FROM smtp_mdns
WHERE from_id=? AND msg_id!=?",
(contact_id, msg_id),
|row| {
let msg_id: MsgId = row.get(0)?;
let rfc724_mid: String = row.get(1)?;
Ok((msg_id, rfc724_mid))
},
|rows| rows.collect::<Result<Vec<_>, _>>().map_err(Into::into),
)
.await?
.into_iter()
.unzip();
let msg = Message::load_from_db(context, msg_id).await?;
let mimefactory = MimeFactory::from_mdn(context, &msg, additional_rfc724_mids).await?;
let rendered_msg = mimefactory.render(context).await?;
let body = rendered_msg.message;
let addr = contact.get_addr();
let recipient = async_smtp::EmailAddress::new(addr.to_string())
.map_err(|err| format_err!("invalid recipient: {} {:?}", addr, err))?;
let recipients = vec![recipient];
match smtp_send(context, &recipients, &body, smtp, msg_id).await {
SendResult::Success => {
info!(context, "Successfully sent MDN for {}", msg_id);
context
.sql
.execute("DELETE FROM smtp_mdns WHERE msg_id = ?", (msg_id,))
.await?;
if !additional_msg_ids.is_empty() {
let q = format!(
"DELETE FROM smtp_mdns WHERE msg_id IN({})",
sql::repeat_vars(additional_msg_ids.len())
);
context
.sql
.execute(&q, rusqlite::params_from_iter(additional_msg_ids))
.await?;
}
Ok(true)
}
SendResult::Retry => {
info!(
context,
"Temporary SMTP failure while sending an MDN for {}", msg_id
);
Ok(false)
}
SendResult::Failure(err) => Err(err),
}
}
/// Tries to send one queued MDN.
///
/// If MDNs are disabled in the configuration, the whole `smtp_mdns` queue is
/// cleared. Otherwise entries with more than 6 retries are dropped, the entry
/// with the fewest retries is picked, its retry counter is incremented and it
/// is sent. If sending returns an error, the entry is removed from the queue
/// and the error is returned.
///
/// # Returns
///
/// `Ok(true)` if an MDN was sent. `Ok(false)` if MDNs are disabled, the queue
/// is empty, or the send asked for a retry.
async fn send_mdn(context: &Context, smtp: &mut Smtp) -> Result<bool> {
    if !context.get_config_bool(Config::MdnsEnabled).await? {
        context.sql.execute("DELETE FROM smtp_mdns", []).await?;
        return Ok(false);
    }
    info!(context, "Sending MDNs");

    context
        .sql
        .execute("DELETE FROM smtp_mdns WHERE retries > 6", [])
        .await?;

    let next_mdn = context
        .sql
        .query_row_optional(
            "SELECT msg_id, from_id FROM smtp_mdns ORDER BY retries LIMIT 1",
            [],
            |row| Ok((row.get::<_, MsgId>(0)?, row.get::<_, ContactId>(1)?)),
        )
        .await?;
    let Some((msg_id, contact_id)) = next_mdn else {
        return Ok(false);
    };

    // Count the attempt before sending.
    context
        .sql
        .execute(
            "UPDATE smtp_mdns SET retries=retries+1 WHERE msg_id=?",
            (msg_id,),
        )
        .await
        .context("failed to update MDN retries count")?;

    let outcome = send_mdn_msg_id(context, msg_id, contact_id, smtp).await;
    if let Err(err) = &outcome {
        warn!(
            context,
            "Error sending MDN for {msg_id}, removing it: {err:#}."
        );
        context
            .sql
            .execute("DELETE FROM smtp_mdns WHERE msg_id = ?", (msg_id,))
            .await?;
    }
    outcome
}