deltachat/
smtp.rs

1//! # SMTP transport module.
2
3mod connect;
4pub mod send;
5
6use anyhow::{bail, format_err, Context as _, Error, Result};
7use async_smtp::response::{Category, Code, Detail};
8use async_smtp::{EmailAddress, SmtpTransport};
9use tokio::task;
10
11use crate::chat::{add_info_msg_with_cmd, ChatId};
12use crate::config::Config;
13use crate::contact::{Contact, ContactId};
14use crate::context::Context;
15use crate::events::EventType;
16use crate::login_param::prioritize_server_login_params;
17use crate::login_param::{ConfiguredLoginParam, ConfiguredServerLoginParam};
18use crate::message::Message;
19use crate::message::{self, MsgId};
20use crate::mimefactory::MimeFactory;
21use crate::net::proxy::ProxyConfig;
22use crate::net::session::SessionBufStream;
23use crate::scheduler::connectivity::ConnectivityStore;
24use crate::stock_str::unencrypted_email;
25use crate::tools::{self, time_elapsed};
26
27#[derive(Default)]
28pub(crate) struct Smtp {
29    /// SMTP connection.
30    transport: Option<SmtpTransport<Box<dyn SessionBufStream>>>,
31
32    /// Email address we are sending from.
33    from: Option<EmailAddress>,
34
35    /// Timestamp of last successful send/receive network interaction
36    /// (eg connect or send succeeded). On initialization and disconnect
37    /// it is set to None.
38    last_success: Option<tools::Time>,
39
40    pub(crate) connectivity: ConnectivityStore,
41
42    /// If sending the last message failed, contains the error message.
43    pub(crate) last_send_error: Option<String>,
44}
45
46impl Smtp {
47    /// Create a new Smtp instances.
48    pub fn new() -> Self {
49        Default::default()
50    }
51
52    /// Disconnect the SMTP transport and drop it entirely.
53    pub fn disconnect(&mut self) {
54        if let Some(mut transport) = self.transport.take() {
55            // Closing connection with a QUIT command may take some time, especially if it's a
56            // stale connection and an attempt to send the command times out. Send a command in a
57            // separate task to avoid waiting for reply or timeout.
58            task::spawn(async move { transport.quit().await });
59        }
60        self.last_success = None;
61    }
62
63    /// Return true if smtp was connected but is not known to
64    /// have been successfully used the last 60 seconds
65    pub fn has_maybe_stale_connection(&self) -> bool {
66        if let Some(last_success) = self.last_success {
67            time_elapsed(&last_success).as_secs() > 60
68        } else {
69            false
70        }
71    }
72
73    /// Check whether we are connected.
74    pub fn is_connected(&self) -> bool {
75        self.transport.is_some()
76    }
77
78    /// Connect using configured parameters.
79    pub async fn connect_configured(&mut self, context: &Context) -> Result<()> {
80        if self.has_maybe_stale_connection() {
81            info!(context, "Closing stale connection.");
82            self.disconnect();
83        }
84
85        if self.is_connected() {
86            return Ok(());
87        }
88
89        self.connectivity.set_connecting(context).await;
90        let lp = ConfiguredLoginParam::load(context)
91            .await?
92            .context("Not configured")?;
93        let proxy_config = ProxyConfig::load(context).await?;
94        self.connect(
95            context,
96            &lp.smtp,
97            &lp.smtp_password,
98            &proxy_config,
99            &lp.addr,
100            lp.strict_tls(proxy_config.is_some()),
101            lp.oauth2,
102        )
103        .await
104    }
105
106    /// Connect using the provided login params.
107    #[expect(clippy::too_many_arguments)]
108    pub async fn connect(
109        &mut self,
110        context: &Context,
111        login_params: &[ConfiguredServerLoginParam],
112        password: &str,
113        proxy_config: &Option<ProxyConfig>,
114        addr: &str,
115        strict_tls: bool,
116        oauth2: bool,
117    ) -> Result<()> {
118        if self.is_connected() {
119            warn!(context, "SMTP already connected.");
120            return Ok(());
121        }
122
123        let from = EmailAddress::new(addr.to_string())
124            .with_context(|| format!("Invalid address {addr:?}"))?;
125        self.from = Some(from);
126
127        let login_params =
128            prioritize_server_login_params(&context.sql, login_params, "smtp").await?;
129        let mut first_error = None;
130        for lp in login_params {
131            info!(context, "SMTP trying to connect to {}.", &lp.connection);
132            let transport = match connect::connect_and_auth(
133                context,
134                proxy_config,
135                strict_tls,
136                lp.connection.clone(),
137                oauth2,
138                addr,
139                &lp.user,
140                password,
141            )
142            .await
143            {
144                Ok(transport) => transport,
145                Err(err) => {
146                    warn!(context, "SMTP failed to connect and authenticate: {err:#}.");
147                    first_error.get_or_insert(err);
148                    continue;
149                }
150            };
151
152            self.transport = Some(transport);
153            self.last_success = Some(tools::Time::now());
154
155            context.emit_event(EventType::SmtpConnected(format!(
156                "SMTP-LOGIN as {} ok",
157                lp.user,
158            )));
159            return Ok(());
160        }
161
162        Err(first_error.unwrap_or_else(|| format_err!("No SMTP connection candidates provided")))
163    }
164}
165
166pub(crate) enum SendResult {
167    /// Message was sent successfully.
168    Success,
169
170    /// Permanent error, message sending has failed.
171    Failure(Error),
172
173    /// Temporary error, the message should be retried later.
174    Retry,
175}
176
177/// Tries to send a message.
178pub(crate) async fn smtp_send(
179    context: &Context,
180    recipients: &[async_smtp::EmailAddress],
181    message: &str,
182    smtp: &mut Smtp,
183    msg_id: Option<MsgId>,
184) -> SendResult {
185    if std::env::var(crate::DCC_MIME_DEBUG).is_ok() {
186        info!(context, "SMTP-sending out mime message:\n{message}");
187    }
188
189    smtp.connectivity.set_working(context).await;
190
191    if let Err(err) = smtp
192        .connect_configured(context)
193        .await
194        .context("Failed to open SMTP connection")
195    {
196        smtp.last_send_error = Some(format!("{err:#}"));
197        return SendResult::Retry;
198    }
199
200    let send_result = smtp.send(context, recipients, message.as_bytes()).await;
201    smtp.last_send_error = send_result.as_ref().err().map(|e| e.to_string());
202
203    let status = match send_result {
204        Err(crate::smtp::send::Error::SmtpSend(err)) => {
205            // Remote error, retry later.
206            info!(context, "SMTP failed to send: {:?}.", &err);
207
208            let res = match err {
209                async_smtp::error::Error::Permanent(ref response) => {
210                    // Workaround for incorrectly configured servers returning permanent errors
211                    // instead of temporary ones.
212                    let maybe_transient = match response.code {
213                        // Sometimes servers send a permanent error when actually it is a temporary error
214                        // For documentation see <https://tools.ietf.org/html/rfc3463>
215                        Code {
216                            category: Category::MailSystem,
217                            detail: Detail::Zero,
218                            ..
219                        } => {
220                            // Ignore status code 5.5.0, see <https://support.delta.chat/t/every-other-message-gets-stuck/877/2>
221                            // Maybe incorrectly configured Postfix milter with "reject" instead of "tempfail", which returns
222                            // "550 5.5.0 Service unavailable" instead of "451 4.7.1 Service unavailable - try again later".
223                            //
224                            // Other enhanced status codes, such as Postfix
225                            // "550 5.1.1 <foobar@example.org>: Recipient address rejected: User unknown in local recipient table"
226                            // are not ignored.
227                            response.first_word() == Some("5.5.0")
228                        }
229                        _ => false,
230                    };
231
232                    if maybe_transient {
233                        info!(context, "Permanent error that is likely to actually be transient, postponing retry for later.");
234                        SendResult::Retry
235                    } else {
236                        info!(context, "Permanent error, message sending failed.");
237                        // If we do not retry, add an info message to the chat.
238                        // Yandex error "554 5.7.1 [2] Message rejected under suspicion of SPAM; https://ya.cc/..."
239                        // should definitely go here, because user has to open the link to
240                        // resume message sending.
241                        SendResult::Failure(format_err!("Permanent SMTP error: {}", err))
242                    }
243                }
244                async_smtp::error::Error::Transient(ref response) => {
245                    // We got a transient 4xx response from SMTP server.
246                    // Give some time until the server-side error maybe goes away.
247                    //
248                    // One particular case is
249                    // `450 4.1.2 <alice@example.org>: Recipient address rejected: Domain not found`.
250                    // known to be returned by Postfix.
251                    //
252                    // [RFC 3463](https://tools.ietf.org/html/rfc3463#section-3.2)
253                    // says "This code is only useful for permanent failures."
254                    // in X.1.1, X.1.2 and X.1.3 descriptions.
255                    //
256                    // Previous Delta Chat core versions
257                    // from 1.51.0 to 1.151.1
258                    // were treating such errors as permanent.
259                    //
260                    // This was later reverted because such errors were observed
261                    // for existing domains and turned out to be actually transient,
262                    // likely caused by nameserver downtime.
263                    info!(
264                        context,
265                        "Transient error {response:?}, postponing retry for later."
266                    );
267                    SendResult::Retry
268                }
269                _ => {
270                    info!(
271                        context,
272                        "Message sending failed without error returned by the server, retry later."
273                    );
274                    SendResult::Retry
275                }
276            };
277
278            // this clears last_success info
279            info!(context, "Failed to send message over SMTP, disconnecting.");
280            smtp.disconnect();
281
282            res
283        }
284        Err(crate::smtp::send::Error::Envelope(err)) => {
285            // Local error, job is invalid, do not retry.
286            smtp.disconnect();
287            warn!(context, "SMTP job is invalid: {err:#}.");
288            SendResult::Failure(err)
289        }
290        Err(crate::smtp::send::Error::NoTransport) => {
291            // Should never happen.
292            // It does not even make sense to disconnect here.
293            error!(context, "SMTP job failed because SMTP has no transport.");
294            SendResult::Failure(format_err!("SMTP has not transport"))
295        }
296        Err(crate::smtp::send::Error::Other(err)) => {
297            // Local error, job is invalid, do not retry.
298            smtp.disconnect();
299            warn!(context, "Unable to load SMTP job: {err:#}.");
300            SendResult::Failure(err)
301        }
302        Ok(()) => SendResult::Success,
303    };
304
305    if let SendResult::Failure(err) = &status {
306        if let Some(msg_id) = msg_id {
307            // We couldn't send the message, so mark it as failed
308            match Message::load_from_db(context, msg_id).await {
309                Ok(mut msg) => {
310                    if let Err(err) =
311                        message::set_msg_failed(context, &mut msg, &err.to_string()).await
312                    {
313                        error!(context, "Failed to mark {msg_id} as failed: {err:#}.");
314                    }
315                }
316                Err(err) => {
317                    error!(
318                        context,
319                        "Failed to load {msg_id} to mark it as failed: {err:#}."
320                    );
321                }
322            }
323        }
324    }
325    status
326}
327
328/// Sends message identified by `smtp` table rowid over SMTP connection.
329///
330/// Removes row if the message should not be retried, otherwise increments retry count.
331pub(crate) async fn send_msg_to_smtp(
332    context: &Context,
333    smtp: &mut Smtp,
334    rowid: i64,
335) -> anyhow::Result<()> {
336    if let Err(err) = smtp
337        .connect_configured(context)
338        .await
339        .context("SMTP connection failure")
340    {
341        smtp.last_send_error = Some(format!("{err:#}"));
342        return Err(err);
343    }
344
345    // Increase retry count as soon as we have an SMTP connection. This ensures that the message is
346    // eventually removed from the queue by exceeding retry limit even in case of an error that
347    // keeps happening early in the message sending code, e.g. failure to read the message from the
348    // database.
349    context
350        .sql
351        .execute("UPDATE smtp SET retries=retries+1 WHERE id=?", (rowid,))
352        .await
353        .context("failed to update retries count")?;
354
355    let Some((body, recipients, msg_id, retries)) = context
356        .sql
357        .query_row_optional(
358            "SELECT mime, recipients, msg_id, retries FROM smtp WHERE id=?",
359            (rowid,),
360            |row| {
361                let mime: String = row.get(0)?;
362                let recipients: String = row.get(1)?;
363                let msg_id: MsgId = row.get(2)?;
364                let retries: i64 = row.get(3)?;
365                Ok((mime, recipients, msg_id, retries))
366            },
367        )
368        .await?
369    else {
370        return Ok(());
371    };
372    if retries > 6 {
373        if let Some(mut msg) = Message::load_from_db_optional(context, msg_id).await? {
374            message::set_msg_failed(context, &mut msg, "Number of retries exceeded the limit.")
375                .await?;
376        }
377        context
378            .sql
379            .execute("DELETE FROM smtp WHERE id=?", (rowid,))
380            .await
381            .context("Failed to remove message with exceeded retry limit from smtp table")?;
382        return Ok(());
383    }
384    info!(
385        context,
386        "Try number {retries} to send message {msg_id} (entry {rowid}) over SMTP."
387    );
388
389    let recipients_list = recipients
390        .split(' ')
391        .filter_map(
392            |addr| match async_smtp::EmailAddress::new(addr.to_string()) {
393                Ok(addr) => Some(addr),
394                Err(err) => {
395                    warn!(context, "Invalid recipient: {} {:?}.", addr, err);
396                    None
397                }
398            },
399        )
400        .collect::<Vec<_>>();
401
402    let status = smtp_send(context, &recipients_list, body.as_str(), smtp, Some(msg_id)).await;
403
404    match status {
405        SendResult::Retry => {}
406        SendResult::Success => {
407            context
408                .sql
409                .execute("DELETE FROM smtp WHERE id=?", (rowid,))
410                .await?;
411        }
412        SendResult::Failure(ref err) => {
413            if err.to_string().contains("Invalid unencrypted mail") {
414                let res = context
415                    .sql
416                    .query_row_optional(
417                        "SELECT chat_id, timestamp FROM msgs WHERE id=?;",
418                        (msg_id,),
419                        |row| Ok((row.get::<_, ChatId>(0)?, row.get::<_, i64>(1)?)),
420                    )
421                    .await?;
422
423                if let Some((chat_id, timestamp_sort)) = res {
424                    let addr = context.get_config(Config::ConfiguredAddr).await?;
425                    let text = unencrypted_email(
426                        context,
427                        addr.unwrap_or_default()
428                            .split('@')
429                            .nth(1)
430                            .unwrap_or_default(),
431                    )
432                    .await;
433                    add_info_msg_with_cmd(
434                        context,
435                        chat_id,
436                        &text,
437                        crate::mimeparser::SystemMessage::InvalidUnencryptedMail,
438                        timestamp_sort,
439                        None,
440                        None,
441                        None,
442                        None,
443                    )
444                    .await?;
445                };
446            }
447            context
448                .sql
449                .execute("DELETE FROM smtp WHERE id=?", (rowid,))
450                .await?;
451        }
452    };
453
454    match status {
455        SendResult::Retry => Err(format_err!("Retry")),
456        SendResult::Success => {
457            if !context
458                .sql
459                .exists("SELECT COUNT(*) FROM smtp WHERE msg_id=?", (msg_id,))
460                .await?
461            {
462                msg_id.set_delivered(context).await?;
463            }
464            Ok(())
465        }
466        SendResult::Failure(err) => Err(format_err!("{}", err)),
467    }
468}
469
470/// Attempts to send queued MDNs.
471async fn send_mdns(context: &Context, connection: &mut Smtp) -> Result<()> {
472    loop {
473        if !context.ratelimit.read().await.can_send() {
474            info!(context, "Ratelimiter does not allow sending MDNs now.");
475            return Ok(());
476        }
477
478        let more_mdns = send_mdn(context, connection).await?;
479        if !more_mdns {
480            // No more MDNs to send or one of them failed.
481            return Ok(());
482        }
483    }
484}
485
486/// Tries to send all messages currently in `smtp`, `smtp_status_updates` and `smtp_mdns` tables.
487pub(crate) async fn send_smtp_messages(context: &Context, connection: &mut Smtp) -> Result<()> {
488    let ratelimited = if context.ratelimit.read().await.can_send() {
489        // add status updates and sync messages to end of sending queue
490        context.flush_status_updates().await?;
491        false
492    } else {
493        true
494    };
495
496    let rowids = context
497        .sql
498        .query_map(
499            "SELECT id FROM smtp ORDER BY id ASC",
500            (),
501            |row| {
502                let rowid: i64 = row.get(0)?;
503                Ok(rowid)
504            },
505            |rowids| {
506                rowids
507                    .collect::<std::result::Result<Vec<_>, _>>()
508                    .map_err(Into::into)
509            },
510        )
511        .await?;
512
513    info!(context, "Selected rows from SMTP queue: {rowids:?}.");
514    for rowid in rowids {
515        send_msg_to_smtp(context, connection, rowid)
516            .await
517            .context("Failed to send message")?;
518    }
519
520    // although by slow sending, ratelimit may have been expired meanwhile,
521    // do not attempt to send MDNs if ratelimited happened before on status-updates/sync:
522    // instead, let the caller recall this function so that more important status-updates/sync are sent out.
523    if !ratelimited {
524        send_mdns(context, connection)
525            .await
526            .context("Failed to send MDNs")?;
527    }
528    Ok(())
529}
530
531/// Tries to send MDN for message identified by `rfc724_mdn` to `contact_id`.
532///
533/// Attempts to aggregate additional MDNs for `contact_id` into sent MDN.
534///
535/// On failure returns an error without removing any `smtp_mdns` entries, the caller is responsible
536/// for removing the corresponding entry to prevent endless loop in case the entry is invalid, e.g.
537/// points to non-existent message or contact.
538///
539/// Returns true on success, false on temporary error.
540async fn send_mdn_rfc724_mid(
541    context: &Context,
542    rfc724_mid: &str,
543    contact_id: ContactId,
544    smtp: &mut Smtp,
545) -> Result<bool> {
546    let contact = Contact::get_by_id(context, contact_id).await?;
547    if contact.is_blocked() {
548        return Err(format_err!("Contact is blocked"));
549    }
550
551    // Try to aggregate additional MDNs into this MDN.
552    let additional_rfc724_mids: Vec<String> = context
553        .sql
554        .query_map(
555            "SELECT rfc724_mid
556             FROM smtp_mdns
557             WHERE from_id=? AND rfc724_mid!=?",
558            (contact_id, &rfc724_mid),
559            |row| {
560                let rfc724_mid: String = row.get(0)?;
561                Ok(rfc724_mid)
562            },
563            |rows| rows.collect::<Result<Vec<_>, _>>().map_err(Into::into),
564        )
565        .await?
566        .into_iter()
567        .collect();
568
569    let mimefactory = MimeFactory::from_mdn(
570        context,
571        contact_id,
572        rfc724_mid.to_string(),
573        additional_rfc724_mids.clone(),
574    )
575    .await?;
576    let rendered_msg = mimefactory.render(context).await?;
577    let body = rendered_msg.message;
578
579    let addr = contact.get_addr();
580    let recipient = async_smtp::EmailAddress::new(addr.to_string())
581        .map_err(|err| format_err!("invalid recipient: {} {:?}", addr, err))?;
582    let recipients = vec![recipient];
583
584    match smtp_send(context, &recipients, &body, smtp, None).await {
585        SendResult::Success => {
586            info!(context, "Successfully sent MDN for {rfc724_mid}.");
587            context
588                .sql
589                .transaction(|transaction| {
590                    let mut stmt =
591                        transaction.prepare("DELETE FROM smtp_mdns WHERE rfc724_mid = ?")?;
592                    stmt.execute((rfc724_mid,))?;
593                    for additional_rfc724_mid in additional_rfc724_mids {
594                        stmt.execute((additional_rfc724_mid,))?;
595                    }
596                    Ok(())
597                })
598                .await?;
599            Ok(true)
600        }
601        SendResult::Retry => {
602            info!(
603                context,
604                "Temporary SMTP failure while sending an MDN for {rfc724_mid}."
605            );
606            Ok(false)
607        }
608        SendResult::Failure(err) => Err(err),
609    }
610}
611
612/// Tries to send a single MDN. Returns true if more MDNs should be sent.
613async fn send_mdn(context: &Context, smtp: &mut Smtp) -> Result<bool> {
614    if !context.should_send_mdns().await? {
615        context.sql.execute("DELETE FROM smtp_mdns", []).await?;
616        return Ok(false);
617    }
618    info!(context, "Sending MDNs.");
619
620    context
621        .sql
622        .execute("DELETE FROM smtp_mdns WHERE retries > 6", [])
623        .await?;
624    let Some(msg_row) = context
625        .sql
626        .query_row_optional(
627            "SELECT rfc724_mid, from_id FROM smtp_mdns ORDER BY retries LIMIT 1",
628            [],
629            |row| {
630                let rfc724_mid: String = row.get(0)?;
631                let from_id: ContactId = row.get(1)?;
632                Ok((rfc724_mid, from_id))
633            },
634        )
635        .await?
636    else {
637        return Ok(false);
638    };
639    let (rfc724_mid, contact_id) = msg_row;
640
641    context
642        .sql
643        .execute(
644            "UPDATE smtp_mdns SET retries=retries+1 WHERE rfc724_mid=?",
645            (rfc724_mid.clone(),),
646        )
647        .await
648        .context("Failed to update MDN retries count")?;
649
650    match send_mdn_rfc724_mid(context, &rfc724_mid, contact_id, smtp).await {
651        Err(err) => {
652            // If there is an error, for example there is no message corresponding to the msg_id in the
653            // database, do not try to send this MDN again.
654            warn!(
655                context,
656                "Error sending MDN for {rfc724_mid}, removing it: {err:#}."
657            );
658            context
659                .sql
660                .execute("DELETE FROM smtp_mdns WHERE rfc724_mid = ?", (rfc724_mid,))
661                .await?;
662            Err(err)
663        }
664        Ok(false) => {
665            bail!("Temporary error while sending an MDN");
666        }
667        Ok(true) => {
668            // Successfully sent MDN.
669            Ok(true)
670        }
671    }
672}