1mod connect;
4pub mod send;
5
6use anyhow::{Context as _, Error, Result, bail, format_err};
7use async_smtp::response::{Category, Code, Detail};
8use async_smtp::{EmailAddress, SmtpTransport};
9use tokio::task;
10
11use crate::chat::{ChatId, add_info_msg_with_cmd};
12use crate::config::Config;
13use crate::contact::{Contact, ContactId};
14use crate::context::Context;
15use crate::events::EventType;
16use crate::log::{LogExt, warn};
17use crate::message::Message;
18use crate::message::{self, MsgId};
19use crate::mimefactory::MimeFactory;
20use crate::net::proxy::ProxyConfig;
21use crate::net::session::SessionBufStream;
22use crate::scheduler::connectivity::ConnectivityStore;
23use crate::stock_str::unencrypted_email;
24use crate::tools::{self, time_elapsed};
25use crate::transport::{
26 ConfiguredLoginParam, ConfiguredServerLoginParam, prioritize_server_login_params,
27};
28
/// State of one SMTP connection together with bookkeeping about its last use.
#[derive(Default)]
pub(crate) struct Smtp {
    /// Open SMTP transport, or `None` while disconnected.
    ///
    /// Set by `connect()` after a successful login and taken out by `disconnect()`.
    transport: Option<SmtpTransport<Box<dyn SessionBufStream>>>,

    /// Address parsed from the `addr` argument of `connect()`.
    ///
    /// NOTE(review): presumably used as the envelope sender by the `send` submodule — confirm.
    from: Option<EmailAddress>,

    /// Time of the last successful interaction recorded for this connection.
    ///
    /// Set to the current time when `connect()` succeeds and reset to `None` by
    /// `disconnect()`. `has_maybe_stale_connection()` compares it against a 60 second limit.
    last_success: Option<tools::Time>,

    /// Connectivity state; marked as "connecting" or "working" from this module.
    pub(crate) connectivity: ConnectivityStore,

    /// Text of the most recent connection or send error.
    ///
    /// `smtp_send()` resets it to `None` when a send attempt succeeds.
    pub(crate) last_send_error: Option<String>,
}
47
48impl Smtp {
49 pub fn new() -> Self {
51 Default::default()
52 }
53
54 pub fn disconnect(&mut self) {
56 if let Some(mut transport) = self.transport.take() {
57 task::spawn(async move { transport.quit().await });
61 }
62 self.last_success = None;
63 }
64
65 pub fn has_maybe_stale_connection(&self) -> bool {
68 if let Some(last_success) = self.last_success {
69 time_elapsed(&last_success).as_secs() > 60
70 } else {
71 false
72 }
73 }
74
75 pub fn is_connected(&self) -> bool {
77 self.transport.is_some()
78 }
79
80 pub async fn connect_configured(&mut self, context: &Context) -> Result<()> {
82 if self.has_maybe_stale_connection() {
83 info!(context, "Closing stale connection.");
84 self.disconnect();
85 }
86
87 if self.is_connected() {
88 return Ok(());
89 }
90
91 self.connectivity.set_connecting(context);
92 let (_transport_id, lp) = ConfiguredLoginParam::load(context)
93 .await?
94 .context("Not configured")?;
95 let proxy_config = ProxyConfig::load(context).await?;
96 self.connect(
97 context,
98 &lp.smtp,
99 &lp.smtp_password,
100 &proxy_config,
101 &lp.addr,
102 lp.strict_tls(proxy_config.is_some()),
103 lp.oauth2,
104 )
105 .await
106 }
107
108 #[expect(clippy::too_many_arguments)]
110 pub async fn connect(
111 &mut self,
112 context: &Context,
113 login_params: &[ConfiguredServerLoginParam],
114 password: &str,
115 proxy_config: &Option<ProxyConfig>,
116 addr: &str,
117 strict_tls: bool,
118 oauth2: bool,
119 ) -> Result<()> {
120 if self.is_connected() {
121 warn!(context, "SMTP already connected.");
122 return Ok(());
123 }
124
125 let from = EmailAddress::new(addr.to_string())
126 .with_context(|| format!("Invalid address {addr:?}"))?;
127 self.from = Some(from);
128
129 let login_params =
130 prioritize_server_login_params(&context.sql, login_params, "smtp").await?;
131 let mut first_error = None;
132 for lp in login_params {
133 info!(context, "SMTP trying to connect to {}.", &lp.connection);
134 let transport = match connect::connect_and_auth(
135 context,
136 proxy_config,
137 strict_tls,
138 lp.connection.clone(),
139 oauth2,
140 addr,
141 &lp.user,
142 password,
143 )
144 .await
145 {
146 Ok(transport) => transport,
147 Err(err) => {
148 warn!(context, "SMTP failed to connect and authenticate: {err:#}.");
149 first_error.get_or_insert(err);
150 continue;
151 }
152 };
153
154 self.transport = Some(transport);
155 self.last_success = Some(tools::Time::now());
156
157 context.emit_event(EventType::SmtpConnected(format!(
158 "SMTP-LOGIN as {} ok",
159 lp.user,
160 )));
161 return Ok(());
162 }
163
164 Err(first_error.unwrap_or_else(|| format_err!("No SMTP connection candidates provided")))
165 }
166}
167
/// Outcome of an attempt to send a message with `smtp_send()`.
pub(crate) enum SendResult {
    /// The message was sent, or there were no recipients to send it to.
    Success,

    /// Sending failed and `smtp_send()` does not ask for a retry; carries the error.
    Failure(Error),

    /// Sending failed for now (connection failure, transient or
    /// likely-transient server error); the caller should try again later.
    Retry,
}
178
179pub(crate) async fn smtp_send(
181 context: &Context,
182 recipients: &[async_smtp::EmailAddress],
183 message: &str,
184 smtp: &mut Smtp,
185 msg_id: Option<MsgId>,
186) -> SendResult {
187 if recipients.is_empty() {
188 return SendResult::Success;
189 }
190 if std::env::var(crate::DCC_MIME_DEBUG).is_ok() {
191 info!(context, "SMTP-sending out mime message:\n{message}");
192 }
193
194 smtp.connectivity.set_working(context);
195
196 if let Err(err) = smtp
197 .connect_configured(context)
198 .await
199 .context("Failed to open SMTP connection")
200 {
201 smtp.last_send_error = Some(format!("{err:#}"));
202 return SendResult::Retry;
203 }
204
205 let send_result = smtp.send(context, recipients, message.as_bytes()).await;
206 smtp.last_send_error = send_result.as_ref().err().map(|e| e.to_string());
207
208 let status = match send_result {
209 Err(crate::smtp::send::Error::SmtpSend(err)) => {
210 info!(context, "SMTP failed to send: {:?}.", &err);
212
213 let res = match err {
214 async_smtp::error::Error::Permanent(ref response) => {
215 let maybe_transient = match response.code {
218 Code {
221 category: Category::MailSystem,
222 detail: Detail::Zero,
223 ..
224 } => {
225 response.first_word() == Some("5.5.0")
233 }
234 _ => false,
235 };
236
237 if maybe_transient {
238 info!(
239 context,
240 "Permanent error that is likely to actually be transient, postponing retry for later."
241 );
242 SendResult::Retry
243 } else {
244 info!(context, "Permanent error, message sending failed.");
245 SendResult::Failure(format_err!("Permanent SMTP error: {err}"))
250 }
251 }
252 async_smtp::error::Error::Transient(ref response) => {
253 info!(
272 context,
273 "Transient error {response:?}, postponing retry for later."
274 );
275 SendResult::Retry
276 }
277 _ => {
278 info!(
279 context,
280 "Message sending failed without error returned by the server, retry later."
281 );
282 SendResult::Retry
283 }
284 };
285
286 info!(context, "Failed to send message over SMTP, disconnecting.");
288 smtp.disconnect();
289
290 res
291 }
292 Err(crate::smtp::send::Error::Envelope(err)) => {
293 smtp.disconnect();
295 warn!(context, "SMTP job is invalid: {err:#}.");
296 SendResult::Failure(err)
297 }
298 Err(crate::smtp::send::Error::NoTransport) => {
299 error!(context, "SMTP job failed because SMTP has no transport.");
302 SendResult::Failure(format_err!("SMTP has not transport"))
303 }
304 Err(crate::smtp::send::Error::Other(err)) => {
305 smtp.disconnect();
307 warn!(context, "Unable to load SMTP job: {err:#}.");
308 SendResult::Failure(err)
309 }
310 Ok(()) => SendResult::Success,
311 };
312
313 if let SendResult::Failure(err) = &status
314 && let Some(msg_id) = msg_id
315 {
316 match Message::load_from_db(context, msg_id).await {
318 Ok(mut msg) => {
319 if let Err(err) = message::set_msg_failed(context, &mut msg, &err.to_string()).await
320 {
321 error!(context, "Failed to mark {msg_id} as failed: {err:#}.");
322 }
323 }
324 Err(err) => {
325 error!(
326 context,
327 "Failed to load {msg_id} to mark it as failed: {err:#}."
328 );
329 }
330 }
331 }
332 status
333}
334
335pub(crate) async fn send_msg_to_smtp(
339 context: &Context,
340 smtp: &mut Smtp,
341 rowid: i64,
342) -> anyhow::Result<()> {
343 if let Err(err) = smtp
344 .connect_configured(context)
345 .await
346 .context("SMTP connection failure")
347 {
348 smtp.last_send_error = Some(format!("{err:#}"));
349 return Err(err);
350 }
351
352 context
357 .sql
358 .execute("UPDATE smtp SET retries=retries+1 WHERE id=?", (rowid,))
359 .await
360 .context("failed to update retries count")?;
361
362 let Some((body, recipients, msg_id, retries)) = context
363 .sql
364 .query_row_optional(
365 "SELECT mime, recipients, msg_id, retries FROM smtp WHERE id=?",
366 (rowid,),
367 |row| {
368 let mime: String = row.get(0)?;
369 let recipients: String = row.get(1)?;
370 let msg_id: MsgId = row.get(2)?;
371 let retries: i64 = row.get(3)?;
372 Ok((mime, recipients, msg_id, retries))
373 },
374 )
375 .await?
376 else {
377 return Ok(());
378 };
379 if retries > 6 {
380 if let Some(mut msg) = Message::load_from_db_optional(context, msg_id).await? {
381 message::set_msg_failed(context, &mut msg, "Number of retries exceeded the limit.")
382 .await?;
383 }
384 context
385 .sql
386 .execute("DELETE FROM smtp WHERE id=?", (rowid,))
387 .await
388 .context("Failed to remove message with exceeded retry limit from smtp table")?;
389 return Ok(());
390 }
391 info!(
392 context,
393 "Try number {retries} to send message {msg_id} (entry {rowid}) over SMTP."
394 );
395
396 let recipients_list = recipients
397 .split(' ')
398 .filter_map(
399 |addr| match async_smtp::EmailAddress::new(addr.to_string()) {
400 Ok(addr) => Some(addr),
401 Err(err) => {
402 warn!(context, "Invalid recipient: {} {:?}.", addr, err);
403 None
404 }
405 },
406 )
407 .collect::<Vec<_>>();
408
409 let status = smtp_send(context, &recipients_list, body.as_str(), smtp, Some(msg_id)).await;
410
411 match status {
412 SendResult::Retry => {}
413 SendResult::Success => {
414 context
415 .sql
416 .execute("DELETE FROM smtp WHERE id=?", (rowid,))
417 .await?;
418 }
419 SendResult::Failure(ref err) => {
420 if err
421 .to_string()
422 .to_lowercase()
423 .contains("invalid unencrypted mail")
424 {
425 let res = context
426 .sql
427 .query_row_optional(
428 "SELECT chat_id, timestamp FROM msgs WHERE id=?;",
429 (msg_id,),
430 |row| Ok((row.get::<_, ChatId>(0)?, row.get::<_, i64>(1)?)),
431 )
432 .await?;
433
434 if let Some((chat_id, timestamp_sort)) = res {
435 let addr = context.get_config(Config::ConfiguredAddr).await?;
436 let text = unencrypted_email(
437 context,
438 addr.unwrap_or_default()
439 .split('@')
440 .nth(1)
441 .unwrap_or_default(),
442 )
443 .await;
444 add_info_msg_with_cmd(
445 context,
446 chat_id,
447 &text,
448 crate::mimeparser::SystemMessage::InvalidUnencryptedMail,
449 Some(timestamp_sort),
450 timestamp_sort,
451 None,
452 None,
453 None,
454 )
455 .await?;
456 };
457 }
458 context
459 .sql
460 .execute("DELETE FROM smtp WHERE id=?", (rowid,))
461 .await?;
462 }
463 };
464
465 match status {
466 SendResult::Retry => Err(format_err!("Retry")),
467 SendResult::Success => {
468 if !context
469 .sql
470 .exists("SELECT COUNT(*) FROM smtp WHERE msg_id=?", (msg_id,))
471 .await?
472 {
473 msg_id.set_delivered(context).await?;
474 }
475 Ok(())
476 }
477 SendResult::Failure(err) => Err(format_err!("{err}")),
478 }
479}
480
481async fn send_mdns(context: &Context, connection: &mut Smtp) -> Result<()> {
483 loop {
484 if !context.ratelimit.read().await.can_send() {
485 info!(context, "Ratelimiter does not allow sending MDNs now.");
486 return Ok(());
487 }
488
489 let more_mdns = send_mdn(context, connection).await?;
490 if !more_mdns {
491 return Ok(());
493 }
494 }
495}
496
497pub(crate) async fn send_smtp_messages(context: &Context, connection: &mut Smtp) -> Result<()> {
499 let ratelimited = if context.ratelimit.read().await.can_send() {
500 context.send_sync_msg().await?;
502 context.flush_status_updates().await?;
503 false
504 } else {
505 true
506 };
507
508 let rowids = context
509 .sql
510 .query_map_vec("SELECT id FROM smtp ORDER BY id ASC", (), |row| {
511 let rowid: i64 = row.get(0)?;
512 Ok(rowid)
513 })
514 .await?;
515
516 info!(context, "Selected rows from SMTP queue: {rowids:?}.");
517 for rowid in rowids {
518 send_msg_to_smtp(context, connection, rowid)
519 .await
520 .context("Failed to send message")?;
521 }
522
523 if !ratelimited {
527 send_mdns(context, connection)
528 .await
529 .context("Failed to send MDNs")?;
530 }
531 Ok(())
532}
533
534async fn send_mdn_rfc724_mid(
544 context: &Context,
545 rfc724_mid: &str,
546 contact_id: ContactId,
547 smtp: &mut Smtp,
548) -> Result<bool> {
549 let contact = Contact::get_by_id(context, contact_id).await?;
550 if contact.is_blocked() {
551 return Err(format_err!("Contact is blocked"));
552 }
553
554 let additional_rfc724_mids = context
556 .sql
557 .query_map_vec(
558 "SELECT rfc724_mid
559 FROM smtp_mdns
560 WHERE from_id=? AND rfc724_mid!=?",
561 (contact_id, &rfc724_mid),
562 |row| {
563 let rfc724_mid: String = row.get(0)?;
564 Ok(rfc724_mid)
565 },
566 )
567 .await?;
568
569 let mimefactory = MimeFactory::from_mdn(
570 context,
571 contact_id,
572 rfc724_mid.to_string(),
573 additional_rfc724_mids.clone(),
574 )
575 .await?;
576 let encrypted = mimefactory.will_be_encrypted();
577 let rendered_msg = mimefactory.render(context).await?;
578 let body = rendered_msg.message;
579
580 let mut recipients = Vec::new();
581 if contact_id != ContactId::SELF {
582 recipients.push(contact.get_addr().to_string());
583 }
584 if context.get_config_bool(Config::BccSelf).await? {
585 add_self_recipients(context, &mut recipients, encrypted).await?;
586 }
587 let recipients: Vec<_> = recipients
588 .into_iter()
589 .filter_map(|addr| {
590 async_smtp::EmailAddress::new(addr.clone())
591 .with_context(|| format!("Invalid recipient: {addr}"))
592 .log_err(context)
593 .ok()
594 })
595 .collect();
596
597 match smtp_send(context, &recipients, &body, smtp, None).await {
598 SendResult::Success => {
599 if !recipients.is_empty() {
600 info!(context, "Successfully sent MDN for {rfc724_mid}.");
601 }
602 context
603 .sql
604 .transaction(|transaction| {
605 let mut stmt =
606 transaction.prepare("DELETE FROM smtp_mdns WHERE rfc724_mid = ?")?;
607 stmt.execute((rfc724_mid,))?;
608 for additional_rfc724_mid in additional_rfc724_mids {
609 stmt.execute((additional_rfc724_mid,))?;
610 }
611 Ok(())
612 })
613 .await?;
614 Ok(true)
615 }
616 SendResult::Retry => {
617 info!(
618 context,
619 "Temporary SMTP failure while sending an MDN for {rfc724_mid}."
620 );
621 Ok(false)
622 }
623 SendResult::Failure(err) => Err(err),
624 }
625}
626
627async fn send_mdn(context: &Context, smtp: &mut Smtp) -> Result<bool> {
629 if !context.should_send_mdns().await? {
630 context.sql.execute("DELETE FROM smtp_mdns", []).await?;
631 return Ok(false);
632 }
633 info!(context, "Sending MDNs.");
634
635 context
636 .sql
637 .execute("DELETE FROM smtp_mdns WHERE retries > 6", [])
638 .await?;
639 let Some(msg_row) = context
640 .sql
641 .query_row_optional(
642 "SELECT rfc724_mid, from_id FROM smtp_mdns ORDER BY retries LIMIT 1",
643 [],
644 |row| {
645 let rfc724_mid: String = row.get(0)?;
646 let from_id: ContactId = row.get(1)?;
647 Ok((rfc724_mid, from_id))
648 },
649 )
650 .await?
651 else {
652 return Ok(false);
653 };
654 let (rfc724_mid, contact_id) = msg_row;
655
656 context
657 .sql
658 .execute(
659 "UPDATE smtp_mdns SET retries=retries+1 WHERE rfc724_mid=?",
660 (rfc724_mid.clone(),),
661 )
662 .await
663 .context("Failed to update MDN retries count")?;
664
665 match send_mdn_rfc724_mid(context, &rfc724_mid, contact_id, smtp).await {
666 Err(err) => {
667 warn!(
670 context,
671 "Error sending MDN for {rfc724_mid}, removing it: {err:#}."
672 );
673 context
674 .sql
675 .execute("DELETE FROM smtp_mdns WHERE rfc724_mid = ?", (rfc724_mid,))
676 .await?;
677 Err(err)
678 }
679 Ok(false) => {
680 bail!("Temporary error while sending an MDN");
681 }
682 Ok(true) => {
683 Ok(true)
685 }
686 }
687}
688
689pub(crate) async fn add_self_recipients(
692 context: &Context,
693 recipients: &mut Vec<String>,
694 encrypted: bool,
695) -> Result<()> {
696 if context.get_config_delete_server_after().await? != Some(0) || !recipients.is_empty() {
705 if encrypted {
709 for addr in context.get_secondary_self_addrs().await? {
710 recipients.push(addr);
711 }
712 }
713 let from = context.get_primary_self_addr().await?;
715 recipients.push(from);
716 }
717 Ok(())
718}