1mod connect;
4pub mod send;
5
6use anyhow::{bail, format_err, Context as _, Error, Result};
7use async_smtp::response::{Category, Code, Detail};
8use async_smtp::{EmailAddress, SmtpTransport};
9use tokio::task;
10
11use crate::chat::{add_info_msg_with_cmd, ChatId};
12use crate::config::Config;
13use crate::contact::{Contact, ContactId};
14use crate::context::Context;
15use crate::events::EventType;
16use crate::log::{error, info, warn};
17use crate::login_param::prioritize_server_login_params;
18use crate::login_param::{ConfiguredLoginParam, ConfiguredServerLoginParam};
19use crate::message::Message;
20use crate::message::{self, MsgId};
21use crate::mimefactory::MimeFactory;
22use crate::net::proxy::ProxyConfig;
23use crate::net::session::SessionBufStream;
24use crate::scheduler::connectivity::ConnectivityStore;
25use crate::stock_str::unencrypted_email;
26use crate::tools::{self, time_elapsed};
27
/// State of the SMTP connection used for sending messages.
#[derive(Default)]
pub(crate) struct Smtp {
    /// Established SMTP transport; `None` while not connected.
    transport: Option<SmtpTransport<Box<dyn SessionBufStream>>>,

    /// Address the connection was opened for; stored by `connect` once the
    /// address has been validated.
    from: Option<EmailAddress>,

    /// Time at which `connect` last succeeded; reset to `None` by `disconnect`.
    /// Used by `has_maybe_stale_connection` to detect connections that were
    /// not known to work for more than 60 seconds.
    // NOTE(review): may also be refreshed by the `send` submodule — confirm.
    last_success: Option<tools::Time>,

    /// Connectivity state holder; switched to "connecting" and "working"
    /// by the code in this file.
    pub(crate) connectivity: ConnectivityStore,

    /// Text of the error from the most recent failed connection or send
    /// attempt; `smtp_send` resets it to `None` when sending succeeds.
    pub(crate) last_send_error: Option<String>,
}
46
47impl Smtp {
48 pub fn new() -> Self {
50 Default::default()
51 }
52
53 pub fn disconnect(&mut self) {
55 if let Some(mut transport) = self.transport.take() {
56 task::spawn(async move { transport.quit().await });
60 }
61 self.last_success = None;
62 }
63
64 pub fn has_maybe_stale_connection(&self) -> bool {
67 if let Some(last_success) = self.last_success {
68 time_elapsed(&last_success).as_secs() > 60
69 } else {
70 false
71 }
72 }
73
74 pub fn is_connected(&self) -> bool {
76 self.transport.is_some()
77 }
78
79 pub async fn connect_configured(&mut self, context: &Context) -> Result<()> {
81 if self.has_maybe_stale_connection() {
82 info!(context, "Closing stale connection.");
83 self.disconnect();
84 }
85
86 if self.is_connected() {
87 return Ok(());
88 }
89
90 self.connectivity.set_connecting(context).await;
91 let lp = ConfiguredLoginParam::load(context)
92 .await?
93 .context("Not configured")?;
94 let proxy_config = ProxyConfig::load(context).await?;
95 self.connect(
96 context,
97 &lp.smtp,
98 &lp.smtp_password,
99 &proxy_config,
100 &lp.addr,
101 lp.strict_tls(proxy_config.is_some()),
102 lp.oauth2,
103 )
104 .await
105 }
106
107 #[expect(clippy::too_many_arguments)]
109 pub async fn connect(
110 &mut self,
111 context: &Context,
112 login_params: &[ConfiguredServerLoginParam],
113 password: &str,
114 proxy_config: &Option<ProxyConfig>,
115 addr: &str,
116 strict_tls: bool,
117 oauth2: bool,
118 ) -> Result<()> {
119 if self.is_connected() {
120 warn!(context, "SMTP already connected.");
121 return Ok(());
122 }
123
124 let from = EmailAddress::new(addr.to_string())
125 .with_context(|| format!("Invalid address {addr:?}"))?;
126 self.from = Some(from);
127
128 let login_params =
129 prioritize_server_login_params(&context.sql, login_params, "smtp").await?;
130 let mut first_error = None;
131 for lp in login_params {
132 info!(context, "SMTP trying to connect to {}.", &lp.connection);
133 let transport = match connect::connect_and_auth(
134 context,
135 proxy_config,
136 strict_tls,
137 lp.connection.clone(),
138 oauth2,
139 addr,
140 &lp.user,
141 password,
142 )
143 .await
144 {
145 Ok(transport) => transport,
146 Err(err) => {
147 warn!(context, "SMTP failed to connect and authenticate: {err:#}.");
148 first_error.get_or_insert(err);
149 continue;
150 }
151 };
152
153 self.transport = Some(transport);
154 self.last_success = Some(tools::Time::now());
155
156 context.emit_event(EventType::SmtpConnected(format!(
157 "SMTP-LOGIN as {} ok",
158 lp.user,
159 )));
160 return Ok(());
161 }
162
163 Err(first_error.unwrap_or_else(|| format_err!("No SMTP connection candidates provided")))
164 }
165}
166
/// Outcome of an attempt to send a message over SMTP.
pub(crate) enum SendResult {
    /// The message was sent.
    Success,

    /// Sending failed and should not be retried; carries the error.
    Failure(Error),

    /// Sending failed, but the attempt should be repeated later.
    Retry,
}
177
178pub(crate) async fn smtp_send(
180 context: &Context,
181 recipients: &[async_smtp::EmailAddress],
182 message: &str,
183 smtp: &mut Smtp,
184 msg_id: Option<MsgId>,
185) -> SendResult {
186 if std::env::var(crate::DCC_MIME_DEBUG).is_ok() {
187 info!(context, "SMTP-sending out mime message:\n{message}");
188 }
189
190 smtp.connectivity.set_working(context).await;
191
192 if let Err(err) = smtp
193 .connect_configured(context)
194 .await
195 .context("Failed to open SMTP connection")
196 {
197 smtp.last_send_error = Some(format!("{err:#}"));
198 return SendResult::Retry;
199 }
200
201 let send_result = smtp.send(context, recipients, message.as_bytes()).await;
202 smtp.last_send_error = send_result.as_ref().err().map(|e| e.to_string());
203
204 let status = match send_result {
205 Err(crate::smtp::send::Error::SmtpSend(err)) => {
206 info!(context, "SMTP failed to send: {:?}.", &err);
208
209 let res = match err {
210 async_smtp::error::Error::Permanent(ref response) => {
211 let maybe_transient = match response.code {
214 Code {
217 category: Category::MailSystem,
218 detail: Detail::Zero,
219 ..
220 } => {
221 response.first_word() == Some("5.5.0")
229 }
230 _ => false,
231 };
232
233 if maybe_transient {
234 info!(context, "Permanent error that is likely to actually be transient, postponing retry for later.");
235 SendResult::Retry
236 } else {
237 info!(context, "Permanent error, message sending failed.");
238 SendResult::Failure(format_err!("Permanent SMTP error: {}", err))
243 }
244 }
245 async_smtp::error::Error::Transient(ref response) => {
246 info!(
265 context,
266 "Transient error {response:?}, postponing retry for later."
267 );
268 SendResult::Retry
269 }
270 _ => {
271 info!(
272 context,
273 "Message sending failed without error returned by the server, retry later."
274 );
275 SendResult::Retry
276 }
277 };
278
279 info!(context, "Failed to send message over SMTP, disconnecting.");
281 smtp.disconnect();
282
283 res
284 }
285 Err(crate::smtp::send::Error::Envelope(err)) => {
286 smtp.disconnect();
288 warn!(context, "SMTP job is invalid: {err:#}.");
289 SendResult::Failure(err)
290 }
291 Err(crate::smtp::send::Error::NoTransport) => {
292 error!(context, "SMTP job failed because SMTP has no transport.");
295 SendResult::Failure(format_err!("SMTP has not transport"))
296 }
297 Err(crate::smtp::send::Error::Other(err)) => {
298 smtp.disconnect();
300 warn!(context, "Unable to load SMTP job: {err:#}.");
301 SendResult::Failure(err)
302 }
303 Ok(()) => SendResult::Success,
304 };
305
306 if let SendResult::Failure(err) = &status {
307 if let Some(msg_id) = msg_id {
308 match Message::load_from_db(context, msg_id).await {
310 Ok(mut msg) => {
311 if let Err(err) =
312 message::set_msg_failed(context, &mut msg, &err.to_string()).await
313 {
314 error!(context, "Failed to mark {msg_id} as failed: {err:#}.");
315 }
316 }
317 Err(err) => {
318 error!(
319 context,
320 "Failed to load {msg_id} to mark it as failed: {err:#}."
321 );
322 }
323 }
324 }
325 }
326 status
327}
328
329pub(crate) async fn send_msg_to_smtp(
333 context: &Context,
334 smtp: &mut Smtp,
335 rowid: i64,
336) -> anyhow::Result<()> {
337 if let Err(err) = smtp
338 .connect_configured(context)
339 .await
340 .context("SMTP connection failure")
341 {
342 smtp.last_send_error = Some(format!("{err:#}"));
343 return Err(err);
344 }
345
346 context
351 .sql
352 .execute("UPDATE smtp SET retries=retries+1 WHERE id=?", (rowid,))
353 .await
354 .context("failed to update retries count")?;
355
356 let Some((body, recipients, msg_id, retries)) = context
357 .sql
358 .query_row_optional(
359 "SELECT mime, recipients, msg_id, retries FROM smtp WHERE id=?",
360 (rowid,),
361 |row| {
362 let mime: String = row.get(0)?;
363 let recipients: String = row.get(1)?;
364 let msg_id: MsgId = row.get(2)?;
365 let retries: i64 = row.get(3)?;
366 Ok((mime, recipients, msg_id, retries))
367 },
368 )
369 .await?
370 else {
371 return Ok(());
372 };
373 if retries > 6 {
374 if let Some(mut msg) = Message::load_from_db_optional(context, msg_id).await? {
375 message::set_msg_failed(context, &mut msg, "Number of retries exceeded the limit.")
376 .await?;
377 }
378 context
379 .sql
380 .execute("DELETE FROM smtp WHERE id=?", (rowid,))
381 .await
382 .context("Failed to remove message with exceeded retry limit from smtp table")?;
383 return Ok(());
384 }
385 info!(
386 context,
387 "Try number {retries} to send message {msg_id} (entry {rowid}) over SMTP."
388 );
389
390 let recipients_list = recipients
391 .split(' ')
392 .filter_map(
393 |addr| match async_smtp::EmailAddress::new(addr.to_string()) {
394 Ok(addr) => Some(addr),
395 Err(err) => {
396 warn!(context, "Invalid recipient: {} {:?}.", addr, err);
397 None
398 }
399 },
400 )
401 .collect::<Vec<_>>();
402
403 let status = smtp_send(context, &recipients_list, body.as_str(), smtp, Some(msg_id)).await;
404
405 match status {
406 SendResult::Retry => {}
407 SendResult::Success => {
408 context
409 .sql
410 .execute("DELETE FROM smtp WHERE id=?", (rowid,))
411 .await?;
412 }
413 SendResult::Failure(ref err) => {
414 if err.to_string().contains("Invalid unencrypted mail") {
415 let res = context
416 .sql
417 .query_row_optional(
418 "SELECT chat_id, timestamp FROM msgs WHERE id=?;",
419 (msg_id,),
420 |row| Ok((row.get::<_, ChatId>(0)?, row.get::<_, i64>(1)?)),
421 )
422 .await?;
423
424 if let Some((chat_id, timestamp_sort)) = res {
425 let addr = context.get_config(Config::ConfiguredAddr).await?;
426 let text = unencrypted_email(
427 context,
428 addr.unwrap_or_default()
429 .split('@')
430 .nth(1)
431 .unwrap_or_default(),
432 )
433 .await;
434 add_info_msg_with_cmd(
435 context,
436 chat_id,
437 &text,
438 crate::mimeparser::SystemMessage::InvalidUnencryptedMail,
439 timestamp_sort,
440 None,
441 None,
442 None,
443 None,
444 )
445 .await?;
446 };
447 }
448 context
449 .sql
450 .execute("DELETE FROM smtp WHERE id=?", (rowid,))
451 .await?;
452 }
453 };
454
455 match status {
456 SendResult::Retry => Err(format_err!("Retry")),
457 SendResult::Success => {
458 if !context
459 .sql
460 .exists("SELECT COUNT(*) FROM smtp WHERE msg_id=?", (msg_id,))
461 .await?
462 {
463 msg_id.set_delivered(context).await?;
464 }
465 Ok(())
466 }
467 SendResult::Failure(err) => Err(format_err!("{}", err)),
468 }
469}
470
471async fn send_mdns(context: &Context, connection: &mut Smtp) -> Result<()> {
473 loop {
474 if !context.ratelimit.read().await.can_send() {
475 info!(context, "Ratelimiter does not allow sending MDNs now.");
476 return Ok(());
477 }
478
479 let more_mdns = send_mdn(context, connection).await?;
480 if !more_mdns {
481 return Ok(());
483 }
484 }
485}
486
487pub(crate) async fn send_smtp_messages(context: &Context, connection: &mut Smtp) -> Result<()> {
489 let ratelimited = if context.ratelimit.read().await.can_send() {
490 context.flush_status_updates().await?;
492 false
493 } else {
494 true
495 };
496
497 let rowids = context
498 .sql
499 .query_map(
500 "SELECT id FROM smtp ORDER BY id ASC",
501 (),
502 |row| {
503 let rowid: i64 = row.get(0)?;
504 Ok(rowid)
505 },
506 |rowids| {
507 rowids
508 .collect::<std::result::Result<Vec<_>, _>>()
509 .map_err(Into::into)
510 },
511 )
512 .await?;
513
514 info!(context, "Selected rows from SMTP queue: {rowids:?}.");
515 for rowid in rowids {
516 send_msg_to_smtp(context, connection, rowid)
517 .await
518 .context("Failed to send message")?;
519 }
520
521 if !ratelimited {
525 send_mdns(context, connection)
526 .await
527 .context("Failed to send MDNs")?;
528 }
529 Ok(())
530}
531
532async fn send_mdn_rfc724_mid(
542 context: &Context,
543 rfc724_mid: &str,
544 contact_id: ContactId,
545 smtp: &mut Smtp,
546) -> Result<bool> {
547 let contact = Contact::get_by_id(context, contact_id).await?;
548 if contact.is_blocked() {
549 return Err(format_err!("Contact is blocked"));
550 }
551
552 let additional_rfc724_mids: Vec<String> = context
554 .sql
555 .query_map(
556 "SELECT rfc724_mid
557 FROM smtp_mdns
558 WHERE from_id=? AND rfc724_mid!=?",
559 (contact_id, &rfc724_mid),
560 |row| {
561 let rfc724_mid: String = row.get(0)?;
562 Ok(rfc724_mid)
563 },
564 |rows| rows.collect::<Result<Vec<_>, _>>().map_err(Into::into),
565 )
566 .await?
567 .into_iter()
568 .collect();
569
570 let mimefactory = MimeFactory::from_mdn(
571 context,
572 contact_id,
573 rfc724_mid.to_string(),
574 additional_rfc724_mids.clone(),
575 )
576 .await?;
577 let rendered_msg = mimefactory.render(context).await?;
578 let body = rendered_msg.message;
579
580 let addr = contact.get_addr();
581 let recipient = async_smtp::EmailAddress::new(addr.to_string())
582 .map_err(|err| format_err!("invalid recipient: {} {:?}", addr, err))?;
583 let recipients = vec![recipient];
584
585 match smtp_send(context, &recipients, &body, smtp, None).await {
586 SendResult::Success => {
587 info!(context, "Successfully sent MDN for {rfc724_mid}.");
588 context
589 .sql
590 .transaction(|transaction| {
591 let mut stmt =
592 transaction.prepare("DELETE FROM smtp_mdns WHERE rfc724_mid = ?")?;
593 stmt.execute((rfc724_mid,))?;
594 for additional_rfc724_mid in additional_rfc724_mids {
595 stmt.execute((additional_rfc724_mid,))?;
596 }
597 Ok(())
598 })
599 .await?;
600 Ok(true)
601 }
602 SendResult::Retry => {
603 info!(
604 context,
605 "Temporary SMTP failure while sending an MDN for {rfc724_mid}."
606 );
607 Ok(false)
608 }
609 SendResult::Failure(err) => Err(err),
610 }
611}
612
613async fn send_mdn(context: &Context, smtp: &mut Smtp) -> Result<bool> {
615 if !context.should_send_mdns().await? {
616 context.sql.execute("DELETE FROM smtp_mdns", []).await?;
617 return Ok(false);
618 }
619 info!(context, "Sending MDNs.");
620
621 context
622 .sql
623 .execute("DELETE FROM smtp_mdns WHERE retries > 6", [])
624 .await?;
625 let Some(msg_row) = context
626 .sql
627 .query_row_optional(
628 "SELECT rfc724_mid, from_id FROM smtp_mdns ORDER BY retries LIMIT 1",
629 [],
630 |row| {
631 let rfc724_mid: String = row.get(0)?;
632 let from_id: ContactId = row.get(1)?;
633 Ok((rfc724_mid, from_id))
634 },
635 )
636 .await?
637 else {
638 return Ok(false);
639 };
640 let (rfc724_mid, contact_id) = msg_row;
641
642 context
643 .sql
644 .execute(
645 "UPDATE smtp_mdns SET retries=retries+1 WHERE rfc724_mid=?",
646 (rfc724_mid.clone(),),
647 )
648 .await
649 .context("Failed to update MDN retries count")?;
650
651 match send_mdn_rfc724_mid(context, &rfc724_mid, contact_id, smtp).await {
652 Err(err) => {
653 warn!(
656 context,
657 "Error sending MDN for {rfc724_mid}, removing it: {err:#}."
658 );
659 context
660 .sql
661 .execute("DELETE FROM smtp_mdns WHERE rfc724_mid = ?", (rfc724_mid,))
662 .await?;
663 Err(err)
664 }
665 Ok(false) => {
666 bail!("Temporary error while sending an MDN");
667 }
668 Ok(true) => {
669 Ok(true)
671 }
672 }
673}