1mod connect;
4pub mod send;
5
6use anyhow::{Context as _, Error, Result, bail, format_err};
7use async_smtp::response::{Category, Code, Detail};
8use async_smtp::{EmailAddress, SmtpTransport};
9use tokio::task;
10
11use crate::chat::{ChatId, add_info_msg_with_cmd};
12use crate::config::Config;
13use crate::contact::{Contact, ContactId};
14use crate::context::Context;
15use crate::events::EventType;
16use crate::log::warn;
17use crate::message::Message;
18use crate::message::{self, MsgId};
19use crate::mimefactory::MimeFactory;
20use crate::net::proxy::ProxyConfig;
21use crate::net::session::SessionBufStream;
22use crate::scheduler::connectivity::ConnectivityStore;
23use crate::stock_str::unencrypted_email;
24use crate::tools::{self, time_elapsed};
25use crate::transport::{
26 ConfiguredLoginParam, ConfiguredServerLoginParam, prioritize_server_login_params,
27};
28
29#[derive(Default)]
30pub(crate) struct Smtp {
31 transport: Option<SmtpTransport<Box<dyn SessionBufStream>>>,
33
34 from: Option<EmailAddress>,
36
37 last_success: Option<tools::Time>,
41
42 pub(crate) connectivity: ConnectivityStore,
43
44 pub(crate) last_send_error: Option<String>,
46}
47
48impl Smtp {
49 pub fn new() -> Self {
51 Default::default()
52 }
53
54 pub fn disconnect(&mut self) {
56 if let Some(mut transport) = self.transport.take() {
57 task::spawn(async move { transport.quit().await });
61 }
62 self.last_success = None;
63 }
64
65 pub fn has_maybe_stale_connection(&self) -> bool {
68 if let Some(last_success) = self.last_success {
69 time_elapsed(&last_success).as_secs() > 60
70 } else {
71 false
72 }
73 }
74
75 pub fn is_connected(&self) -> bool {
77 self.transport.is_some()
78 }
79
80 pub async fn connect_configured(&mut self, context: &Context) -> Result<()> {
82 if self.has_maybe_stale_connection() {
83 info!(context, "Closing stale connection.");
84 self.disconnect();
85 }
86
87 if self.is_connected() {
88 return Ok(());
89 }
90
91 self.connectivity.set_connecting(context);
92 let lp = ConfiguredLoginParam::load(context)
93 .await?
94 .context("Not configured")?;
95 let proxy_config = ProxyConfig::load(context).await?;
96 self.connect(
97 context,
98 &lp.smtp,
99 &lp.smtp_password,
100 &proxy_config,
101 &lp.addr,
102 lp.strict_tls(proxy_config.is_some()),
103 lp.oauth2,
104 )
105 .await
106 }
107
108 #[expect(clippy::too_many_arguments)]
110 pub async fn connect(
111 &mut self,
112 context: &Context,
113 login_params: &[ConfiguredServerLoginParam],
114 password: &str,
115 proxy_config: &Option<ProxyConfig>,
116 addr: &str,
117 strict_tls: bool,
118 oauth2: bool,
119 ) -> Result<()> {
120 if self.is_connected() {
121 warn!(context, "SMTP already connected.");
122 return Ok(());
123 }
124
125 let from = EmailAddress::new(addr.to_string())
126 .with_context(|| format!("Invalid address {addr:?}"))?;
127 self.from = Some(from);
128
129 let login_params =
130 prioritize_server_login_params(&context.sql, login_params, "smtp").await?;
131 let mut first_error = None;
132 for lp in login_params {
133 info!(context, "SMTP trying to connect to {}.", &lp.connection);
134 let transport = match connect::connect_and_auth(
135 context,
136 proxy_config,
137 strict_tls,
138 lp.connection.clone(),
139 oauth2,
140 addr,
141 &lp.user,
142 password,
143 )
144 .await
145 {
146 Ok(transport) => transport,
147 Err(err) => {
148 warn!(context, "SMTP failed to connect and authenticate: {err:#}.");
149 first_error.get_or_insert(err);
150 continue;
151 }
152 };
153
154 self.transport = Some(transport);
155 self.last_success = Some(tools::Time::now());
156
157 context.emit_event(EventType::SmtpConnected(format!(
158 "SMTP-LOGIN as {} ok",
159 lp.user,
160 )));
161 return Ok(());
162 }
163
164 Err(first_error.unwrap_or_else(|| format_err!("No SMTP connection candidates provided")))
165 }
166}
167
168pub(crate) enum SendResult {
169 Success,
171
172 Failure(Error),
174
175 Retry,
177}
178
179pub(crate) async fn smtp_send(
181 context: &Context,
182 recipients: &[async_smtp::EmailAddress],
183 message: &str,
184 smtp: &mut Smtp,
185 msg_id: Option<MsgId>,
186) -> SendResult {
187 if std::env::var(crate::DCC_MIME_DEBUG).is_ok() {
188 info!(context, "SMTP-sending out mime message:\n{message}");
189 }
190
191 smtp.connectivity.set_working(context);
192
193 if let Err(err) = smtp
194 .connect_configured(context)
195 .await
196 .context("Failed to open SMTP connection")
197 {
198 smtp.last_send_error = Some(format!("{err:#}"));
199 return SendResult::Retry;
200 }
201
202 let send_result = smtp.send(context, recipients, message.as_bytes()).await;
203 smtp.last_send_error = send_result.as_ref().err().map(|e| e.to_string());
204
205 let status = match send_result {
206 Err(crate::smtp::send::Error::SmtpSend(err)) => {
207 info!(context, "SMTP failed to send: {:?}.", &err);
209
210 let res = match err {
211 async_smtp::error::Error::Permanent(ref response) => {
212 let maybe_transient = match response.code {
215 Code {
218 category: Category::MailSystem,
219 detail: Detail::Zero,
220 ..
221 } => {
222 response.first_word() == Some("5.5.0")
230 }
231 _ => false,
232 };
233
234 if maybe_transient {
235 info!(
236 context,
237 "Permanent error that is likely to actually be transient, postponing retry for later."
238 );
239 SendResult::Retry
240 } else {
241 info!(context, "Permanent error, message sending failed.");
242 SendResult::Failure(format_err!("Permanent SMTP error: {err}"))
247 }
248 }
249 async_smtp::error::Error::Transient(ref response) => {
250 info!(
269 context,
270 "Transient error {response:?}, postponing retry for later."
271 );
272 SendResult::Retry
273 }
274 _ => {
275 info!(
276 context,
277 "Message sending failed without error returned by the server, retry later."
278 );
279 SendResult::Retry
280 }
281 };
282
283 info!(context, "Failed to send message over SMTP, disconnecting.");
285 smtp.disconnect();
286
287 res
288 }
289 Err(crate::smtp::send::Error::Envelope(err)) => {
290 smtp.disconnect();
292 warn!(context, "SMTP job is invalid: {err:#}.");
293 SendResult::Failure(err)
294 }
295 Err(crate::smtp::send::Error::NoTransport) => {
296 error!(context, "SMTP job failed because SMTP has no transport.");
299 SendResult::Failure(format_err!("SMTP has not transport"))
300 }
301 Err(crate::smtp::send::Error::Other(err)) => {
302 smtp.disconnect();
304 warn!(context, "Unable to load SMTP job: {err:#}.");
305 SendResult::Failure(err)
306 }
307 Ok(()) => SendResult::Success,
308 };
309
310 if let SendResult::Failure(err) = &status {
311 if let Some(msg_id) = msg_id {
312 match Message::load_from_db(context, msg_id).await {
314 Ok(mut msg) => {
315 if let Err(err) =
316 message::set_msg_failed(context, &mut msg, &err.to_string()).await
317 {
318 error!(context, "Failed to mark {msg_id} as failed: {err:#}.");
319 }
320 }
321 Err(err) => {
322 error!(
323 context,
324 "Failed to load {msg_id} to mark it as failed: {err:#}."
325 );
326 }
327 }
328 }
329 }
330 status
331}
332
333pub(crate) async fn send_msg_to_smtp(
337 context: &Context,
338 smtp: &mut Smtp,
339 rowid: i64,
340) -> anyhow::Result<()> {
341 if let Err(err) = smtp
342 .connect_configured(context)
343 .await
344 .context("SMTP connection failure")
345 {
346 smtp.last_send_error = Some(format!("{err:#}"));
347 return Err(err);
348 }
349
350 context
355 .sql
356 .execute("UPDATE smtp SET retries=retries+1 WHERE id=?", (rowid,))
357 .await
358 .context("failed to update retries count")?;
359
360 let Some((body, recipients, msg_id, retries)) = context
361 .sql
362 .query_row_optional(
363 "SELECT mime, recipients, msg_id, retries FROM smtp WHERE id=?",
364 (rowid,),
365 |row| {
366 let mime: String = row.get(0)?;
367 let recipients: String = row.get(1)?;
368 let msg_id: MsgId = row.get(2)?;
369 let retries: i64 = row.get(3)?;
370 Ok((mime, recipients, msg_id, retries))
371 },
372 )
373 .await?
374 else {
375 return Ok(());
376 };
377 if retries > 6 {
378 if let Some(mut msg) = Message::load_from_db_optional(context, msg_id).await? {
379 message::set_msg_failed(context, &mut msg, "Number of retries exceeded the limit.")
380 .await?;
381 }
382 context
383 .sql
384 .execute("DELETE FROM smtp WHERE id=?", (rowid,))
385 .await
386 .context("Failed to remove message with exceeded retry limit from smtp table")?;
387 return Ok(());
388 }
389 info!(
390 context,
391 "Try number {retries} to send message {msg_id} (entry {rowid}) over SMTP."
392 );
393
394 let recipients_list = recipients
395 .split(' ')
396 .filter_map(
397 |addr| match async_smtp::EmailAddress::new(addr.to_string()) {
398 Ok(addr) => Some(addr),
399 Err(err) => {
400 warn!(context, "Invalid recipient: {} {:?}.", addr, err);
401 None
402 }
403 },
404 )
405 .collect::<Vec<_>>();
406
407 let status = smtp_send(context, &recipients_list, body.as_str(), smtp, Some(msg_id)).await;
408
409 match status {
410 SendResult::Retry => {}
411 SendResult::Success => {
412 context
413 .sql
414 .execute("DELETE FROM smtp WHERE id=?", (rowid,))
415 .await?;
416 }
417 SendResult::Failure(ref err) => {
418 if err
419 .to_string()
420 .to_lowercase()
421 .contains("invalid unencrypted mail")
422 {
423 let res = context
424 .sql
425 .query_row_optional(
426 "SELECT chat_id, timestamp FROM msgs WHERE id=?;",
427 (msg_id,),
428 |row| Ok((row.get::<_, ChatId>(0)?, row.get::<_, i64>(1)?)),
429 )
430 .await?;
431
432 if let Some((chat_id, timestamp_sort)) = res {
433 let addr = context.get_config(Config::ConfiguredAddr).await?;
434 let text = unencrypted_email(
435 context,
436 addr.unwrap_or_default()
437 .split('@')
438 .nth(1)
439 .unwrap_or_default(),
440 )
441 .await;
442 add_info_msg_with_cmd(
443 context,
444 chat_id,
445 &text,
446 crate::mimeparser::SystemMessage::InvalidUnencryptedMail,
447 timestamp_sort,
448 None,
449 None,
450 None,
451 None,
452 )
453 .await?;
454 };
455 }
456 context
457 .sql
458 .execute("DELETE FROM smtp WHERE id=?", (rowid,))
459 .await?;
460 }
461 };
462
463 match status {
464 SendResult::Retry => Err(format_err!("Retry")),
465 SendResult::Success => {
466 if !context
467 .sql
468 .exists("SELECT COUNT(*) FROM smtp WHERE msg_id=?", (msg_id,))
469 .await?
470 {
471 msg_id.set_delivered(context).await?;
472 }
473 Ok(())
474 }
475 SendResult::Failure(err) => Err(format_err!("{err}")),
476 }
477}
478
479async fn send_mdns(context: &Context, connection: &mut Smtp) -> Result<()> {
481 loop {
482 if !context.ratelimit.read().await.can_send() {
483 info!(context, "Ratelimiter does not allow sending MDNs now.");
484 return Ok(());
485 }
486
487 let more_mdns = send_mdn(context, connection).await?;
488 if !more_mdns {
489 return Ok(());
491 }
492 }
493}
494
495pub(crate) async fn send_smtp_messages(context: &Context, connection: &mut Smtp) -> Result<()> {
497 let ratelimited = if context.ratelimit.read().await.can_send() {
498 context.flush_status_updates().await?;
500 false
501 } else {
502 true
503 };
504
505 let rowids = context
506 .sql
507 .query_map_vec("SELECT id FROM smtp ORDER BY id ASC", (), |row| {
508 let rowid: i64 = row.get(0)?;
509 Ok(rowid)
510 })
511 .await?;
512
513 info!(context, "Selected rows from SMTP queue: {rowids:?}.");
514 for rowid in rowids {
515 send_msg_to_smtp(context, connection, rowid)
516 .await
517 .context("Failed to send message")?;
518 }
519
520 if !ratelimited {
524 send_mdns(context, connection)
525 .await
526 .context("Failed to send MDNs")?;
527 }
528 Ok(())
529}
530
531async fn send_mdn_rfc724_mid(
541 context: &Context,
542 rfc724_mid: &str,
543 contact_id: ContactId,
544 smtp: &mut Smtp,
545) -> Result<bool> {
546 let contact = Contact::get_by_id(context, contact_id).await?;
547 if contact.is_blocked() {
548 return Err(format_err!("Contact is blocked"));
549 }
550
551 let additional_rfc724_mids = context
553 .sql
554 .query_map_vec(
555 "SELECT rfc724_mid
556 FROM smtp_mdns
557 WHERE from_id=? AND rfc724_mid!=?",
558 (contact_id, &rfc724_mid),
559 |row| {
560 let rfc724_mid: String = row.get(0)?;
561 Ok(rfc724_mid)
562 },
563 )
564 .await?;
565
566 let mimefactory = MimeFactory::from_mdn(
567 context,
568 contact_id,
569 rfc724_mid.to_string(),
570 additional_rfc724_mids.clone(),
571 )
572 .await?;
573 let rendered_msg = mimefactory.render(context).await?;
574 let body = rendered_msg.message;
575
576 let addr = contact.get_addr();
577 let recipient = async_smtp::EmailAddress::new(addr.to_string())
578 .map_err(|err| format_err!("invalid recipient: {addr} {err:?}"))?;
579 let recipients = vec![recipient];
580
581 match smtp_send(context, &recipients, &body, smtp, None).await {
582 SendResult::Success => {
583 info!(context, "Successfully sent MDN for {rfc724_mid}.");
584 context
585 .sql
586 .transaction(|transaction| {
587 let mut stmt =
588 transaction.prepare("DELETE FROM smtp_mdns WHERE rfc724_mid = ?")?;
589 stmt.execute((rfc724_mid,))?;
590 for additional_rfc724_mid in additional_rfc724_mids {
591 stmt.execute((additional_rfc724_mid,))?;
592 }
593 Ok(())
594 })
595 .await?;
596 Ok(true)
597 }
598 SendResult::Retry => {
599 info!(
600 context,
601 "Temporary SMTP failure while sending an MDN for {rfc724_mid}."
602 );
603 Ok(false)
604 }
605 SendResult::Failure(err) => Err(err),
606 }
607}
608
609async fn send_mdn(context: &Context, smtp: &mut Smtp) -> Result<bool> {
611 if !context.should_send_mdns().await? {
612 context.sql.execute("DELETE FROM smtp_mdns", []).await?;
613 return Ok(false);
614 }
615 info!(context, "Sending MDNs.");
616
617 context
618 .sql
619 .execute("DELETE FROM smtp_mdns WHERE retries > 6", [])
620 .await?;
621 let Some(msg_row) = context
622 .sql
623 .query_row_optional(
624 "SELECT rfc724_mid, from_id FROM smtp_mdns ORDER BY retries LIMIT 1",
625 [],
626 |row| {
627 let rfc724_mid: String = row.get(0)?;
628 let from_id: ContactId = row.get(1)?;
629 Ok((rfc724_mid, from_id))
630 },
631 )
632 .await?
633 else {
634 return Ok(false);
635 };
636 let (rfc724_mid, contact_id) = msg_row;
637
638 context
639 .sql
640 .execute(
641 "UPDATE smtp_mdns SET retries=retries+1 WHERE rfc724_mid=?",
642 (rfc724_mid.clone(),),
643 )
644 .await
645 .context("Failed to update MDN retries count")?;
646
647 match send_mdn_rfc724_mid(context, &rfc724_mid, contact_id, smtp).await {
648 Err(err) => {
649 warn!(
652 context,
653 "Error sending MDN for {rfc724_mid}, removing it: {err:#}."
654 );
655 context
656 .sql
657 .execute("DELETE FROM smtp_mdns WHERE rfc724_mid = ?", (rfc724_mid,))
658 .await?;
659 Err(err)
660 }
661 Ok(false) => {
662 bail!("Temporary error while sending an MDN");
663 }
664 Ok(true) => {
665 Ok(true)
667 }
668 }
669}