1mod connect;
4pub mod send;
5
6use anyhow::{bail, format_err, Context as _, Error, Result};
7use async_smtp::response::{Category, Code, Detail};
8use async_smtp::{EmailAddress, SmtpTransport};
9use tokio::task;
10
11use crate::chat::{add_info_msg_with_cmd, ChatId};
12use crate::config::Config;
13use crate::contact::{Contact, ContactId};
14use crate::context::Context;
15use crate::events::EventType;
16use crate::login_param::prioritize_server_login_params;
17use crate::login_param::{ConfiguredLoginParam, ConfiguredServerLoginParam};
18use crate::message::Message;
19use crate::message::{self, MsgId};
20use crate::mimefactory::MimeFactory;
21use crate::net::proxy::ProxyConfig;
22use crate::net::session::SessionBufStream;
23use crate::scheduler::connectivity::ConnectivityStore;
24use crate::stock_str::unencrypted_email;
25use crate::tools::{self, time_elapsed};
26
27#[derive(Default)]
28pub(crate) struct Smtp {
29 transport: Option<SmtpTransport<Box<dyn SessionBufStream>>>,
31
32 from: Option<EmailAddress>,
34
35 last_success: Option<tools::Time>,
39
40 pub(crate) connectivity: ConnectivityStore,
41
42 pub(crate) last_send_error: Option<String>,
44}
45
46impl Smtp {
47 pub fn new() -> Self {
49 Default::default()
50 }
51
52 pub fn disconnect(&mut self) {
54 if let Some(mut transport) = self.transport.take() {
55 task::spawn(async move { transport.quit().await });
59 }
60 self.last_success = None;
61 }
62
63 pub fn has_maybe_stale_connection(&self) -> bool {
66 if let Some(last_success) = self.last_success {
67 time_elapsed(&last_success).as_secs() > 60
68 } else {
69 false
70 }
71 }
72
73 pub fn is_connected(&self) -> bool {
75 self.transport.is_some()
76 }
77
78 pub async fn connect_configured(&mut self, context: &Context) -> Result<()> {
80 if self.has_maybe_stale_connection() {
81 info!(context, "Closing stale connection.");
82 self.disconnect();
83 }
84
85 if self.is_connected() {
86 return Ok(());
87 }
88
89 self.connectivity.set_connecting(context).await;
90 let lp = ConfiguredLoginParam::load(context)
91 .await?
92 .context("Not configured")?;
93 let proxy_config = ProxyConfig::load(context).await?;
94 self.connect(
95 context,
96 &lp.smtp,
97 &lp.smtp_password,
98 &proxy_config,
99 &lp.addr,
100 lp.strict_tls(proxy_config.is_some()),
101 lp.oauth2,
102 )
103 .await
104 }
105
106 #[expect(clippy::too_many_arguments)]
108 pub async fn connect(
109 &mut self,
110 context: &Context,
111 login_params: &[ConfiguredServerLoginParam],
112 password: &str,
113 proxy_config: &Option<ProxyConfig>,
114 addr: &str,
115 strict_tls: bool,
116 oauth2: bool,
117 ) -> Result<()> {
118 if self.is_connected() {
119 warn!(context, "SMTP already connected.");
120 return Ok(());
121 }
122
123 let from = EmailAddress::new(addr.to_string())
124 .with_context(|| format!("Invalid address {addr:?}"))?;
125 self.from = Some(from);
126
127 let login_params =
128 prioritize_server_login_params(&context.sql, login_params, "smtp").await?;
129 let mut first_error = None;
130 for lp in login_params {
131 info!(context, "SMTP trying to connect to {}.", &lp.connection);
132 let transport = match connect::connect_and_auth(
133 context,
134 proxy_config,
135 strict_tls,
136 lp.connection.clone(),
137 oauth2,
138 addr,
139 &lp.user,
140 password,
141 )
142 .await
143 {
144 Ok(transport) => transport,
145 Err(err) => {
146 warn!(context, "SMTP failed to connect and authenticate: {err:#}.");
147 first_error.get_or_insert(err);
148 continue;
149 }
150 };
151
152 self.transport = Some(transport);
153 self.last_success = Some(tools::Time::now());
154
155 context.emit_event(EventType::SmtpConnected(format!(
156 "SMTP-LOGIN as {} ok",
157 lp.user,
158 )));
159 return Ok(());
160 }
161
162 Err(first_error.unwrap_or_else(|| format_err!("No SMTP connection candidates provided")))
163 }
164}
165
166pub(crate) enum SendResult {
167 Success,
169
170 Failure(Error),
172
173 Retry,
175}
176
177pub(crate) async fn smtp_send(
179 context: &Context,
180 recipients: &[async_smtp::EmailAddress],
181 message: &str,
182 smtp: &mut Smtp,
183 msg_id: Option<MsgId>,
184) -> SendResult {
185 if std::env::var(crate::DCC_MIME_DEBUG).is_ok() {
186 info!(context, "SMTP-sending out mime message:\n{message}");
187 }
188
189 smtp.connectivity.set_working(context).await;
190
191 if let Err(err) = smtp
192 .connect_configured(context)
193 .await
194 .context("Failed to open SMTP connection")
195 {
196 smtp.last_send_error = Some(format!("{err:#}"));
197 return SendResult::Retry;
198 }
199
200 let send_result = smtp.send(context, recipients, message.as_bytes()).await;
201 smtp.last_send_error = send_result.as_ref().err().map(|e| e.to_string());
202
203 let status = match send_result {
204 Err(crate::smtp::send::Error::SmtpSend(err)) => {
205 info!(context, "SMTP failed to send: {:?}.", &err);
207
208 let res = match err {
209 async_smtp::error::Error::Permanent(ref response) => {
210 let maybe_transient = match response.code {
213 Code {
216 category: Category::MailSystem,
217 detail: Detail::Zero,
218 ..
219 } => {
220 response.first_word() == Some("5.5.0")
228 }
229 _ => false,
230 };
231
232 if maybe_transient {
233 info!(context, "Permanent error that is likely to actually be transient, postponing retry for later.");
234 SendResult::Retry
235 } else {
236 info!(context, "Permanent error, message sending failed.");
237 SendResult::Failure(format_err!("Permanent SMTP error: {}", err))
242 }
243 }
244 async_smtp::error::Error::Transient(ref response) => {
245 info!(
264 context,
265 "Transient error {response:?}, postponing retry for later."
266 );
267 SendResult::Retry
268 }
269 _ => {
270 info!(
271 context,
272 "Message sending failed without error returned by the server, retry later."
273 );
274 SendResult::Retry
275 }
276 };
277
278 info!(context, "Failed to send message over SMTP, disconnecting.");
280 smtp.disconnect();
281
282 res
283 }
284 Err(crate::smtp::send::Error::Envelope(err)) => {
285 smtp.disconnect();
287 warn!(context, "SMTP job is invalid: {err:#}.");
288 SendResult::Failure(err)
289 }
290 Err(crate::smtp::send::Error::NoTransport) => {
291 error!(context, "SMTP job failed because SMTP has no transport.");
294 SendResult::Failure(format_err!("SMTP has not transport"))
295 }
296 Err(crate::smtp::send::Error::Other(err)) => {
297 smtp.disconnect();
299 warn!(context, "Unable to load SMTP job: {err:#}.");
300 SendResult::Failure(err)
301 }
302 Ok(()) => SendResult::Success,
303 };
304
305 if let SendResult::Failure(err) = &status {
306 if let Some(msg_id) = msg_id {
307 match Message::load_from_db(context, msg_id).await {
309 Ok(mut msg) => {
310 if let Err(err) =
311 message::set_msg_failed(context, &mut msg, &err.to_string()).await
312 {
313 error!(context, "Failed to mark {msg_id} as failed: {err:#}.");
314 }
315 }
316 Err(err) => {
317 error!(
318 context,
319 "Failed to load {msg_id} to mark it as failed: {err:#}."
320 );
321 }
322 }
323 }
324 }
325 status
326}
327
328pub(crate) async fn send_msg_to_smtp(
332 context: &Context,
333 smtp: &mut Smtp,
334 rowid: i64,
335) -> anyhow::Result<()> {
336 if let Err(err) = smtp
337 .connect_configured(context)
338 .await
339 .context("SMTP connection failure")
340 {
341 smtp.last_send_error = Some(format!("{err:#}"));
342 return Err(err);
343 }
344
345 context
350 .sql
351 .execute("UPDATE smtp SET retries=retries+1 WHERE id=?", (rowid,))
352 .await
353 .context("failed to update retries count")?;
354
355 let Some((body, recipients, msg_id, retries)) = context
356 .sql
357 .query_row_optional(
358 "SELECT mime, recipients, msg_id, retries FROM smtp WHERE id=?",
359 (rowid,),
360 |row| {
361 let mime: String = row.get(0)?;
362 let recipients: String = row.get(1)?;
363 let msg_id: MsgId = row.get(2)?;
364 let retries: i64 = row.get(3)?;
365 Ok((mime, recipients, msg_id, retries))
366 },
367 )
368 .await?
369 else {
370 return Ok(());
371 };
372 if retries > 6 {
373 if let Some(mut msg) = Message::load_from_db_optional(context, msg_id).await? {
374 message::set_msg_failed(context, &mut msg, "Number of retries exceeded the limit.")
375 .await?;
376 }
377 context
378 .sql
379 .execute("DELETE FROM smtp WHERE id=?", (rowid,))
380 .await
381 .context("Failed to remove message with exceeded retry limit from smtp table")?;
382 return Ok(());
383 }
384 info!(
385 context,
386 "Try number {retries} to send message {msg_id} (entry {rowid}) over SMTP."
387 );
388
389 let recipients_list = recipients
390 .split(' ')
391 .filter_map(
392 |addr| match async_smtp::EmailAddress::new(addr.to_string()) {
393 Ok(addr) => Some(addr),
394 Err(err) => {
395 warn!(context, "Invalid recipient: {} {:?}.", addr, err);
396 None
397 }
398 },
399 )
400 .collect::<Vec<_>>();
401
402 let status = smtp_send(context, &recipients_list, body.as_str(), smtp, Some(msg_id)).await;
403
404 match status {
405 SendResult::Retry => {}
406 SendResult::Success => {
407 context
408 .sql
409 .execute("DELETE FROM smtp WHERE id=?", (rowid,))
410 .await?;
411 }
412 SendResult::Failure(ref err) => {
413 if err.to_string().contains("Invalid unencrypted mail") {
414 let res = context
415 .sql
416 .query_row_optional(
417 "SELECT chat_id, timestamp FROM msgs WHERE id=?;",
418 (msg_id,),
419 |row| Ok((row.get::<_, ChatId>(0)?, row.get::<_, i64>(1)?)),
420 )
421 .await?;
422
423 if let Some((chat_id, timestamp_sort)) = res {
424 let addr = context.get_config(Config::ConfiguredAddr).await?;
425 let text = unencrypted_email(
426 context,
427 addr.unwrap_or_default()
428 .split('@')
429 .nth(1)
430 .unwrap_or_default(),
431 )
432 .await;
433 add_info_msg_with_cmd(
434 context,
435 chat_id,
436 &text,
437 crate::mimeparser::SystemMessage::InvalidUnencryptedMail,
438 timestamp_sort,
439 None,
440 None,
441 None,
442 None,
443 )
444 .await?;
445 };
446 }
447 context
448 .sql
449 .execute("DELETE FROM smtp WHERE id=?", (rowid,))
450 .await?;
451 }
452 };
453
454 match status {
455 SendResult::Retry => Err(format_err!("Retry")),
456 SendResult::Success => {
457 if !context
458 .sql
459 .exists("SELECT COUNT(*) FROM smtp WHERE msg_id=?", (msg_id,))
460 .await?
461 {
462 msg_id.set_delivered(context).await?;
463 }
464 Ok(())
465 }
466 SendResult::Failure(err) => Err(format_err!("{}", err)),
467 }
468}
469
470async fn send_mdns(context: &Context, connection: &mut Smtp) -> Result<()> {
472 loop {
473 if !context.ratelimit.read().await.can_send() {
474 info!(context, "Ratelimiter does not allow sending MDNs now.");
475 return Ok(());
476 }
477
478 let more_mdns = send_mdn(context, connection).await?;
479 if !more_mdns {
480 return Ok(());
482 }
483 }
484}
485
486pub(crate) async fn send_smtp_messages(context: &Context, connection: &mut Smtp) -> Result<()> {
488 let ratelimited = if context.ratelimit.read().await.can_send() {
489 context.flush_status_updates().await?;
491 false
492 } else {
493 true
494 };
495
496 let rowids = context
497 .sql
498 .query_map(
499 "SELECT id FROM smtp ORDER BY id ASC",
500 (),
501 |row| {
502 let rowid: i64 = row.get(0)?;
503 Ok(rowid)
504 },
505 |rowids| {
506 rowids
507 .collect::<std::result::Result<Vec<_>, _>>()
508 .map_err(Into::into)
509 },
510 )
511 .await?;
512
513 info!(context, "Selected rows from SMTP queue: {rowids:?}.");
514 for rowid in rowids {
515 send_msg_to_smtp(context, connection, rowid)
516 .await
517 .context("Failed to send message")?;
518 }
519
520 if !ratelimited {
524 send_mdns(context, connection)
525 .await
526 .context("Failed to send MDNs")?;
527 }
528 Ok(())
529}
530
531async fn send_mdn_rfc724_mid(
541 context: &Context,
542 rfc724_mid: &str,
543 contact_id: ContactId,
544 smtp: &mut Smtp,
545) -> Result<bool> {
546 let contact = Contact::get_by_id(context, contact_id).await?;
547 if contact.is_blocked() {
548 return Err(format_err!("Contact is blocked"));
549 }
550
551 let additional_rfc724_mids: Vec<String> = context
553 .sql
554 .query_map(
555 "SELECT rfc724_mid
556 FROM smtp_mdns
557 WHERE from_id=? AND rfc724_mid!=?",
558 (contact_id, &rfc724_mid),
559 |row| {
560 let rfc724_mid: String = row.get(0)?;
561 Ok(rfc724_mid)
562 },
563 |rows| rows.collect::<Result<Vec<_>, _>>().map_err(Into::into),
564 )
565 .await?
566 .into_iter()
567 .collect();
568
569 let mimefactory = MimeFactory::from_mdn(
570 context,
571 contact_id,
572 rfc724_mid.to_string(),
573 additional_rfc724_mids.clone(),
574 )
575 .await?;
576 let rendered_msg = mimefactory.render(context).await?;
577 let body = rendered_msg.message;
578
579 let addr = contact.get_addr();
580 let recipient = async_smtp::EmailAddress::new(addr.to_string())
581 .map_err(|err| format_err!("invalid recipient: {} {:?}", addr, err))?;
582 let recipients = vec![recipient];
583
584 match smtp_send(context, &recipients, &body, smtp, None).await {
585 SendResult::Success => {
586 info!(context, "Successfully sent MDN for {rfc724_mid}.");
587 context
588 .sql
589 .transaction(|transaction| {
590 let mut stmt =
591 transaction.prepare("DELETE FROM smtp_mdns WHERE rfc724_mid = ?")?;
592 stmt.execute((rfc724_mid,))?;
593 for additional_rfc724_mid in additional_rfc724_mids {
594 stmt.execute((additional_rfc724_mid,))?;
595 }
596 Ok(())
597 })
598 .await?;
599 Ok(true)
600 }
601 SendResult::Retry => {
602 info!(
603 context,
604 "Temporary SMTP failure while sending an MDN for {rfc724_mid}."
605 );
606 Ok(false)
607 }
608 SendResult::Failure(err) => Err(err),
609 }
610}
611
612async fn send_mdn(context: &Context, smtp: &mut Smtp) -> Result<bool> {
614 if !context.should_send_mdns().await? {
615 context.sql.execute("DELETE FROM smtp_mdns", []).await?;
616 return Ok(false);
617 }
618 info!(context, "Sending MDNs.");
619
620 context
621 .sql
622 .execute("DELETE FROM smtp_mdns WHERE retries > 6", [])
623 .await?;
624 let Some(msg_row) = context
625 .sql
626 .query_row_optional(
627 "SELECT rfc724_mid, from_id FROM smtp_mdns ORDER BY retries LIMIT 1",
628 [],
629 |row| {
630 let rfc724_mid: String = row.get(0)?;
631 let from_id: ContactId = row.get(1)?;
632 Ok((rfc724_mid, from_id))
633 },
634 )
635 .await?
636 else {
637 return Ok(false);
638 };
639 let (rfc724_mid, contact_id) = msg_row;
640
641 context
642 .sql
643 .execute(
644 "UPDATE smtp_mdns SET retries=retries+1 WHERE rfc724_mid=?",
645 (rfc724_mid.clone(),),
646 )
647 .await
648 .context("Failed to update MDN retries count")?;
649
650 match send_mdn_rfc724_mid(context, &rfc724_mid, contact_id, smtp).await {
651 Err(err) => {
652 warn!(
655 context,
656 "Error sending MDN for {rfc724_mid}, removing it: {err:#}."
657 );
658 context
659 .sql
660 .execute("DELETE FROM smtp_mdns WHERE rfc724_mid = ?", (rfc724_mid,))
661 .await?;
662 Err(err)
663 }
664 Ok(false) => {
665 bail!("Temporary error while sending an MDN");
666 }
667 Ok(true) => {
668 Ok(true)
670 }
671 }
672}