1mod connect;
4pub mod send;
5
6use anyhow::{Context as _, Error, Result, bail, format_err};
7use async_smtp::response::{Category, Code, Detail};
8use async_smtp::{EmailAddress, SmtpTransport};
9use tokio::task;
10
11use crate::chat::{ChatId, add_info_msg_with_cmd};
12use crate::config::Config;
13use crate::contact::{Contact, ContactId};
14use crate::context::Context;
15use crate::events::EventType;
16use crate::log::{error, info, warn};
17use crate::login_param::prioritize_server_login_params;
18use crate::login_param::{ConfiguredLoginParam, ConfiguredServerLoginParam};
19use crate::message::Message;
20use crate::message::{self, MsgId};
21use crate::mimefactory::MimeFactory;
22use crate::net::proxy::ProxyConfig;
23use crate::net::session::SessionBufStream;
24use crate::scheduler::connectivity::ConnectivityStore;
25use crate::stock_str::unencrypted_email;
26use crate::tools::{self, time_elapsed};
27
28#[derive(Default)]
29pub(crate) struct Smtp {
30 transport: Option<SmtpTransport<Box<dyn SessionBufStream>>>,
32
33 from: Option<EmailAddress>,
35
36 last_success: Option<tools::Time>,
40
41 pub(crate) connectivity: ConnectivityStore,
42
43 pub(crate) last_send_error: Option<String>,
45}
46
47impl Smtp {
48 pub fn new() -> Self {
50 Default::default()
51 }
52
53 pub fn disconnect(&mut self) {
55 if let Some(mut transport) = self.transport.take() {
56 task::spawn(async move { transport.quit().await });
60 }
61 self.last_success = None;
62 }
63
64 pub fn has_maybe_stale_connection(&self) -> bool {
67 if let Some(last_success) = self.last_success {
68 time_elapsed(&last_success).as_secs() > 60
69 } else {
70 false
71 }
72 }
73
74 pub fn is_connected(&self) -> bool {
76 self.transport.is_some()
77 }
78
79 pub async fn connect_configured(&mut self, context: &Context) -> Result<()> {
81 if self.has_maybe_stale_connection() {
82 info!(context, "Closing stale connection.");
83 self.disconnect();
84 }
85
86 if self.is_connected() {
87 return Ok(());
88 }
89
90 self.connectivity.set_connecting(context).await;
91 let lp = ConfiguredLoginParam::load(context)
92 .await?
93 .context("Not configured")?;
94 let proxy_config = ProxyConfig::load(context).await?;
95 self.connect(
96 context,
97 &lp.smtp,
98 &lp.smtp_password,
99 &proxy_config,
100 &lp.addr,
101 lp.strict_tls(proxy_config.is_some()),
102 lp.oauth2,
103 )
104 .await
105 }
106
107 #[expect(clippy::too_many_arguments)]
109 pub async fn connect(
110 &mut self,
111 context: &Context,
112 login_params: &[ConfiguredServerLoginParam],
113 password: &str,
114 proxy_config: &Option<ProxyConfig>,
115 addr: &str,
116 strict_tls: bool,
117 oauth2: bool,
118 ) -> Result<()> {
119 if self.is_connected() {
120 warn!(context, "SMTP already connected.");
121 return Ok(());
122 }
123
124 let from = EmailAddress::new(addr.to_string())
125 .with_context(|| format!("Invalid address {addr:?}"))?;
126 self.from = Some(from);
127
128 let login_params =
129 prioritize_server_login_params(&context.sql, login_params, "smtp").await?;
130 let mut first_error = None;
131 for lp in login_params {
132 info!(context, "SMTP trying to connect to {}.", &lp.connection);
133 let transport = match connect::connect_and_auth(
134 context,
135 proxy_config,
136 strict_tls,
137 lp.connection.clone(),
138 oauth2,
139 addr,
140 &lp.user,
141 password,
142 )
143 .await
144 {
145 Ok(transport) => transport,
146 Err(err) => {
147 warn!(context, "SMTP failed to connect and authenticate: {err:#}.");
148 first_error.get_or_insert(err);
149 continue;
150 }
151 };
152
153 self.transport = Some(transport);
154 self.last_success = Some(tools::Time::now());
155
156 context.emit_event(EventType::SmtpConnected(format!(
157 "SMTP-LOGIN as {} ok",
158 lp.user,
159 )));
160 return Ok(());
161 }
162
163 Err(first_error.unwrap_or_else(|| format_err!("No SMTP connection candidates provided")))
164 }
165}
166
167pub(crate) enum SendResult {
168 Success,
170
171 Failure(Error),
173
174 Retry,
176}
177
178pub(crate) async fn smtp_send(
180 context: &Context,
181 recipients: &[async_smtp::EmailAddress],
182 message: &str,
183 smtp: &mut Smtp,
184 msg_id: Option<MsgId>,
185) -> SendResult {
186 if std::env::var(crate::DCC_MIME_DEBUG).is_ok() {
187 info!(context, "SMTP-sending out mime message:\n{message}");
188 }
189
190 smtp.connectivity.set_working(context).await;
191
192 if let Err(err) = smtp
193 .connect_configured(context)
194 .await
195 .context("Failed to open SMTP connection")
196 {
197 smtp.last_send_error = Some(format!("{err:#}"));
198 return SendResult::Retry;
199 }
200
201 let send_result = smtp.send(context, recipients, message.as_bytes()).await;
202 smtp.last_send_error = send_result.as_ref().err().map(|e| e.to_string());
203
204 let status = match send_result {
205 Err(crate::smtp::send::Error::SmtpSend(err)) => {
206 info!(context, "SMTP failed to send: {:?}.", &err);
208
209 let res = match err {
210 async_smtp::error::Error::Permanent(ref response) => {
211 let maybe_transient = match response.code {
214 Code {
217 category: Category::MailSystem,
218 detail: Detail::Zero,
219 ..
220 } => {
221 response.first_word() == Some("5.5.0")
229 }
230 _ => false,
231 };
232
233 if maybe_transient {
234 info!(
235 context,
236 "Permanent error that is likely to actually be transient, postponing retry for later."
237 );
238 SendResult::Retry
239 } else {
240 info!(context, "Permanent error, message sending failed.");
241 SendResult::Failure(format_err!("Permanent SMTP error: {}", err))
246 }
247 }
248 async_smtp::error::Error::Transient(ref response) => {
249 info!(
268 context,
269 "Transient error {response:?}, postponing retry for later."
270 );
271 SendResult::Retry
272 }
273 _ => {
274 info!(
275 context,
276 "Message sending failed without error returned by the server, retry later."
277 );
278 SendResult::Retry
279 }
280 };
281
282 info!(context, "Failed to send message over SMTP, disconnecting.");
284 smtp.disconnect();
285
286 res
287 }
288 Err(crate::smtp::send::Error::Envelope(err)) => {
289 smtp.disconnect();
291 warn!(context, "SMTP job is invalid: {err:#}.");
292 SendResult::Failure(err)
293 }
294 Err(crate::smtp::send::Error::NoTransport) => {
295 error!(context, "SMTP job failed because SMTP has no transport.");
298 SendResult::Failure(format_err!("SMTP has not transport"))
299 }
300 Err(crate::smtp::send::Error::Other(err)) => {
301 smtp.disconnect();
303 warn!(context, "Unable to load SMTP job: {err:#}.");
304 SendResult::Failure(err)
305 }
306 Ok(()) => SendResult::Success,
307 };
308
309 if let SendResult::Failure(err) = &status {
310 if let Some(msg_id) = msg_id {
311 match Message::load_from_db(context, msg_id).await {
313 Ok(mut msg) => {
314 if let Err(err) =
315 message::set_msg_failed(context, &mut msg, &err.to_string()).await
316 {
317 error!(context, "Failed to mark {msg_id} as failed: {err:#}.");
318 }
319 }
320 Err(err) => {
321 error!(
322 context,
323 "Failed to load {msg_id} to mark it as failed: {err:#}."
324 );
325 }
326 }
327 }
328 }
329 status
330}
331
332pub(crate) async fn send_msg_to_smtp(
336 context: &Context,
337 smtp: &mut Smtp,
338 rowid: i64,
339) -> anyhow::Result<()> {
340 if let Err(err) = smtp
341 .connect_configured(context)
342 .await
343 .context("SMTP connection failure")
344 {
345 smtp.last_send_error = Some(format!("{err:#}"));
346 return Err(err);
347 }
348
349 context
354 .sql
355 .execute("UPDATE smtp SET retries=retries+1 WHERE id=?", (rowid,))
356 .await
357 .context("failed to update retries count")?;
358
359 let Some((body, recipients, msg_id, retries)) = context
360 .sql
361 .query_row_optional(
362 "SELECT mime, recipients, msg_id, retries FROM smtp WHERE id=?",
363 (rowid,),
364 |row| {
365 let mime: String = row.get(0)?;
366 let recipients: String = row.get(1)?;
367 let msg_id: MsgId = row.get(2)?;
368 let retries: i64 = row.get(3)?;
369 Ok((mime, recipients, msg_id, retries))
370 },
371 )
372 .await?
373 else {
374 return Ok(());
375 };
376 if retries > 6 {
377 if let Some(mut msg) = Message::load_from_db_optional(context, msg_id).await? {
378 message::set_msg_failed(context, &mut msg, "Number of retries exceeded the limit.")
379 .await?;
380 }
381 context
382 .sql
383 .execute("DELETE FROM smtp WHERE id=?", (rowid,))
384 .await
385 .context("Failed to remove message with exceeded retry limit from smtp table")?;
386 return Ok(());
387 }
388 info!(
389 context,
390 "Try number {retries} to send message {msg_id} (entry {rowid}) over SMTP."
391 );
392
393 let recipients_list = recipients
394 .split(' ')
395 .filter_map(
396 |addr| match async_smtp::EmailAddress::new(addr.to_string()) {
397 Ok(addr) => Some(addr),
398 Err(err) => {
399 warn!(context, "Invalid recipient: {} {:?}.", addr, err);
400 None
401 }
402 },
403 )
404 .collect::<Vec<_>>();
405
406 let status = smtp_send(context, &recipients_list, body.as_str(), smtp, Some(msg_id)).await;
407
408 match status {
409 SendResult::Retry => {}
410 SendResult::Success => {
411 context
412 .sql
413 .execute("DELETE FROM smtp WHERE id=?", (rowid,))
414 .await?;
415 }
416 SendResult::Failure(ref err) => {
417 if err.to_string().contains("Invalid unencrypted mail") {
418 let res = context
419 .sql
420 .query_row_optional(
421 "SELECT chat_id, timestamp FROM msgs WHERE id=?;",
422 (msg_id,),
423 |row| Ok((row.get::<_, ChatId>(0)?, row.get::<_, i64>(1)?)),
424 )
425 .await?;
426
427 if let Some((chat_id, timestamp_sort)) = res {
428 let addr = context.get_config(Config::ConfiguredAddr).await?;
429 let text = unencrypted_email(
430 context,
431 addr.unwrap_or_default()
432 .split('@')
433 .nth(1)
434 .unwrap_or_default(),
435 )
436 .await;
437 add_info_msg_with_cmd(
438 context,
439 chat_id,
440 &text,
441 crate::mimeparser::SystemMessage::InvalidUnencryptedMail,
442 timestamp_sort,
443 None,
444 None,
445 None,
446 None,
447 )
448 .await?;
449 };
450 }
451 context
452 .sql
453 .execute("DELETE FROM smtp WHERE id=?", (rowid,))
454 .await?;
455 }
456 };
457
458 match status {
459 SendResult::Retry => Err(format_err!("Retry")),
460 SendResult::Success => {
461 if !context
462 .sql
463 .exists("SELECT COUNT(*) FROM smtp WHERE msg_id=?", (msg_id,))
464 .await?
465 {
466 msg_id.set_delivered(context).await?;
467 }
468 Ok(())
469 }
470 SendResult::Failure(err) => Err(format_err!("{}", err)),
471 }
472}
473
474async fn send_mdns(context: &Context, connection: &mut Smtp) -> Result<()> {
476 loop {
477 if !context.ratelimit.read().await.can_send() {
478 info!(context, "Ratelimiter does not allow sending MDNs now.");
479 return Ok(());
480 }
481
482 let more_mdns = send_mdn(context, connection).await?;
483 if !more_mdns {
484 return Ok(());
486 }
487 }
488}
489
490pub(crate) async fn send_smtp_messages(context: &Context, connection: &mut Smtp) -> Result<()> {
492 let ratelimited = if context.ratelimit.read().await.can_send() {
493 context.flush_status_updates().await?;
495 false
496 } else {
497 true
498 };
499
500 let rowids = context
501 .sql
502 .query_map(
503 "SELECT id FROM smtp ORDER BY id ASC",
504 (),
505 |row| {
506 let rowid: i64 = row.get(0)?;
507 Ok(rowid)
508 },
509 |rowids| {
510 rowids
511 .collect::<std::result::Result<Vec<_>, _>>()
512 .map_err(Into::into)
513 },
514 )
515 .await?;
516
517 info!(context, "Selected rows from SMTP queue: {rowids:?}.");
518 for rowid in rowids {
519 send_msg_to_smtp(context, connection, rowid)
520 .await
521 .context("Failed to send message")?;
522 }
523
524 if !ratelimited {
528 send_mdns(context, connection)
529 .await
530 .context("Failed to send MDNs")?;
531 }
532 Ok(())
533}
534
535async fn send_mdn_rfc724_mid(
545 context: &Context,
546 rfc724_mid: &str,
547 contact_id: ContactId,
548 smtp: &mut Smtp,
549) -> Result<bool> {
550 let contact = Contact::get_by_id(context, contact_id).await?;
551 if contact.is_blocked() {
552 return Err(format_err!("Contact is blocked"));
553 }
554
555 let additional_rfc724_mids: Vec<String> = context
557 .sql
558 .query_map(
559 "SELECT rfc724_mid
560 FROM smtp_mdns
561 WHERE from_id=? AND rfc724_mid!=?",
562 (contact_id, &rfc724_mid),
563 |row| {
564 let rfc724_mid: String = row.get(0)?;
565 Ok(rfc724_mid)
566 },
567 |rows| rows.collect::<Result<Vec<_>, _>>().map_err(Into::into),
568 )
569 .await?
570 .into_iter()
571 .collect();
572
573 let mimefactory = MimeFactory::from_mdn(
574 context,
575 contact_id,
576 rfc724_mid.to_string(),
577 additional_rfc724_mids.clone(),
578 )
579 .await?;
580 let rendered_msg = mimefactory.render(context).await?;
581 let body = rendered_msg.message;
582
583 let addr = contact.get_addr();
584 let recipient = async_smtp::EmailAddress::new(addr.to_string())
585 .map_err(|err| format_err!("invalid recipient: {} {:?}", addr, err))?;
586 let recipients = vec![recipient];
587
588 match smtp_send(context, &recipients, &body, smtp, None).await {
589 SendResult::Success => {
590 info!(context, "Successfully sent MDN for {rfc724_mid}.");
591 context
592 .sql
593 .transaction(|transaction| {
594 let mut stmt =
595 transaction.prepare("DELETE FROM smtp_mdns WHERE rfc724_mid = ?")?;
596 stmt.execute((rfc724_mid,))?;
597 for additional_rfc724_mid in additional_rfc724_mids {
598 stmt.execute((additional_rfc724_mid,))?;
599 }
600 Ok(())
601 })
602 .await?;
603 Ok(true)
604 }
605 SendResult::Retry => {
606 info!(
607 context,
608 "Temporary SMTP failure while sending an MDN for {rfc724_mid}."
609 );
610 Ok(false)
611 }
612 SendResult::Failure(err) => Err(err),
613 }
614}
615
616async fn send_mdn(context: &Context, smtp: &mut Smtp) -> Result<bool> {
618 if !context.should_send_mdns().await? {
619 context.sql.execute("DELETE FROM smtp_mdns", []).await?;
620 return Ok(false);
621 }
622 info!(context, "Sending MDNs.");
623
624 context
625 .sql
626 .execute("DELETE FROM smtp_mdns WHERE retries > 6", [])
627 .await?;
628 let Some(msg_row) = context
629 .sql
630 .query_row_optional(
631 "SELECT rfc724_mid, from_id FROM smtp_mdns ORDER BY retries LIMIT 1",
632 [],
633 |row| {
634 let rfc724_mid: String = row.get(0)?;
635 let from_id: ContactId = row.get(1)?;
636 Ok((rfc724_mid, from_id))
637 },
638 )
639 .await?
640 else {
641 return Ok(false);
642 };
643 let (rfc724_mid, contact_id) = msg_row;
644
645 context
646 .sql
647 .execute(
648 "UPDATE smtp_mdns SET retries=retries+1 WHERE rfc724_mid=?",
649 (rfc724_mid.clone(),),
650 )
651 .await
652 .context("Failed to update MDN retries count")?;
653
654 match send_mdn_rfc724_mid(context, &rfc724_mid, contact_id, smtp).await {
655 Err(err) => {
656 warn!(
659 context,
660 "Error sending MDN for {rfc724_mid}, removing it: {err:#}."
661 );
662 context
663 .sql
664 .execute("DELETE FROM smtp_mdns WHERE rfc724_mid = ?", (rfc724_mid,))
665 .await?;
666 Err(err)
667 }
668 Ok(false) => {
669 bail!("Temporary error while sending an MDN");
670 }
671 Ok(true) => {
672 Ok(true)
674 }
675 }
676}