deltachat/
smtp.rs

1//! # SMTP transport module.
2
3mod connect;
4pub mod send;
5
6use anyhow::{Context as _, Error, Result, bail, format_err};
7use async_smtp::response::{Category, Code, Detail};
8use async_smtp::{EmailAddress, SmtpTransport};
9use tokio::task;
10
11use crate::chat::{ChatId, add_info_msg_with_cmd};
12use crate::config::Config;
13use crate::contact::{Contact, ContactId};
14use crate::context::Context;
15use crate::events::EventType;
16use crate::log::{error, info, warn};
17use crate::login_param::prioritize_server_login_params;
18use crate::login_param::{ConfiguredLoginParam, ConfiguredServerLoginParam};
19use crate::message::Message;
20use crate::message::{self, MsgId};
21use crate::mimefactory::MimeFactory;
22use crate::net::proxy::ProxyConfig;
23use crate::net::session::SessionBufStream;
24use crate::scheduler::connectivity::ConnectivityStore;
25use crate::stock_str::unencrypted_email;
26use crate::tools::{self, time_elapsed};
27
/// State of the SMTP client: the open transport (if any), the sender address
/// and bookkeeping about the last network interaction and the last send error.
#[derive(Default)]
pub(crate) struct Smtp {
    /// SMTP connection.
    ///
    /// `None` while disconnected.
    transport: Option<SmtpTransport<Box<dyn SessionBufStream>>>,

    /// Email address we are sending from.
    ///
    /// Set when connecting.
    from: Option<EmailAddress>,

    /// Timestamp of last successful send/receive network interaction
    /// (e.g. connect or send succeeded). On initialization and disconnect
    /// it is set to `None`.
    last_success: Option<tools::Time>,

    /// Connectivity state store for this connection.
    ///
    /// This module switches it to "connecting" before opening a connection
    /// and to "working" before sending a message.
    pub(crate) connectivity: ConnectivityStore,

    /// If sending the last message failed, contains the error message.
    pub(crate) last_send_error: Option<String>,
}
46
47impl Smtp {
48    /// Create a new Smtp instances.
49    pub fn new() -> Self {
50        Default::default()
51    }
52
53    /// Disconnect the SMTP transport and drop it entirely.
54    pub fn disconnect(&mut self) {
55        if let Some(mut transport) = self.transport.take() {
56            // Closing connection with a QUIT command may take some time, especially if it's a
57            // stale connection and an attempt to send the command times out. Send a command in a
58            // separate task to avoid waiting for reply or timeout.
59            task::spawn(async move { transport.quit().await });
60        }
61        self.last_success = None;
62    }
63
64    /// Return true if smtp was connected but is not known to
65    /// have been successfully used the last 60 seconds
66    pub fn has_maybe_stale_connection(&self) -> bool {
67        if let Some(last_success) = self.last_success {
68            time_elapsed(&last_success).as_secs() > 60
69        } else {
70            false
71        }
72    }
73
74    /// Check whether we are connected.
75    pub fn is_connected(&self) -> bool {
76        self.transport.is_some()
77    }
78
79    /// Connect using configured parameters.
80    pub async fn connect_configured(&mut self, context: &Context) -> Result<()> {
81        if self.has_maybe_stale_connection() {
82            info!(context, "Closing stale connection.");
83            self.disconnect();
84        }
85
86        if self.is_connected() {
87            return Ok(());
88        }
89
90        self.connectivity.set_connecting(context).await;
91        let lp = ConfiguredLoginParam::load(context)
92            .await?
93            .context("Not configured")?;
94        let proxy_config = ProxyConfig::load(context).await?;
95        self.connect(
96            context,
97            &lp.smtp,
98            &lp.smtp_password,
99            &proxy_config,
100            &lp.addr,
101            lp.strict_tls(proxy_config.is_some()),
102            lp.oauth2,
103        )
104        .await
105    }
106
107    /// Connect using the provided login params.
108    #[expect(clippy::too_many_arguments)]
109    pub async fn connect(
110        &mut self,
111        context: &Context,
112        login_params: &[ConfiguredServerLoginParam],
113        password: &str,
114        proxy_config: &Option<ProxyConfig>,
115        addr: &str,
116        strict_tls: bool,
117        oauth2: bool,
118    ) -> Result<()> {
119        if self.is_connected() {
120            warn!(context, "SMTP already connected.");
121            return Ok(());
122        }
123
124        let from = EmailAddress::new(addr.to_string())
125            .with_context(|| format!("Invalid address {addr:?}"))?;
126        self.from = Some(from);
127
128        let login_params =
129            prioritize_server_login_params(&context.sql, login_params, "smtp").await?;
130        let mut first_error = None;
131        for lp in login_params {
132            info!(context, "SMTP trying to connect to {}.", &lp.connection);
133            let transport = match connect::connect_and_auth(
134                context,
135                proxy_config,
136                strict_tls,
137                lp.connection.clone(),
138                oauth2,
139                addr,
140                &lp.user,
141                password,
142            )
143            .await
144            {
145                Ok(transport) => transport,
146                Err(err) => {
147                    warn!(context, "SMTP failed to connect and authenticate: {err:#}.");
148                    first_error.get_or_insert(err);
149                    continue;
150                }
151            };
152
153            self.transport = Some(transport);
154            self.last_success = Some(tools::Time::now());
155
156            context.emit_event(EventType::SmtpConnected(format!(
157                "SMTP-LOGIN as {} ok",
158                lp.user,
159            )));
160            return Ok(());
161        }
162
163        Err(first_error.unwrap_or_else(|| format_err!("No SMTP connection candidates provided")))
164    }
165}
166
/// Outcome of an attempt to send a message over SMTP.
pub(crate) enum SendResult {
    /// Message was sent successfully.
    Success,

    /// Permanent error, message sending has failed.
    ///
    /// Carries the error describing why sending failed.
    Failure(Error),

    /// Temporary error, the message should be retried later.
    Retry,
}
177
178/// Tries to send a message.
179pub(crate) async fn smtp_send(
180    context: &Context,
181    recipients: &[async_smtp::EmailAddress],
182    message: &str,
183    smtp: &mut Smtp,
184    msg_id: Option<MsgId>,
185) -> SendResult {
186    if std::env::var(crate::DCC_MIME_DEBUG).is_ok() {
187        info!(context, "SMTP-sending out mime message:\n{message}");
188    }
189
190    smtp.connectivity.set_working(context).await;
191
192    if let Err(err) = smtp
193        .connect_configured(context)
194        .await
195        .context("Failed to open SMTP connection")
196    {
197        smtp.last_send_error = Some(format!("{err:#}"));
198        return SendResult::Retry;
199    }
200
201    let send_result = smtp.send(context, recipients, message.as_bytes()).await;
202    smtp.last_send_error = send_result.as_ref().err().map(|e| e.to_string());
203
204    let status = match send_result {
205        Err(crate::smtp::send::Error::SmtpSend(err)) => {
206            // Remote error, retry later.
207            info!(context, "SMTP failed to send: {:?}.", &err);
208
209            let res = match err {
210                async_smtp::error::Error::Permanent(ref response) => {
211                    // Workaround for incorrectly configured servers returning permanent errors
212                    // instead of temporary ones.
213                    let maybe_transient = match response.code {
214                        // Sometimes servers send a permanent error when actually it is a temporary error
215                        // For documentation see <https://tools.ietf.org/html/rfc3463>
216                        Code {
217                            category: Category::MailSystem,
218                            detail: Detail::Zero,
219                            ..
220                        } => {
221                            // Ignore status code 5.5.0, see <https://support.delta.chat/t/every-other-message-gets-stuck/877/2>
222                            // Maybe incorrectly configured Postfix milter with "reject" instead of "tempfail", which returns
223                            // "550 5.5.0 Service unavailable" instead of "451 4.7.1 Service unavailable - try again later".
224                            //
225                            // Other enhanced status codes, such as Postfix
226                            // "550 5.1.1 <foobar@example.org>: Recipient address rejected: User unknown in local recipient table"
227                            // are not ignored.
228                            response.first_word() == Some("5.5.0")
229                        }
230                        _ => false,
231                    };
232
233                    if maybe_transient {
234                        info!(
235                            context,
236                            "Permanent error that is likely to actually be transient, postponing retry for later."
237                        );
238                        SendResult::Retry
239                    } else {
240                        info!(context, "Permanent error, message sending failed.");
241                        // If we do not retry, add an info message to the chat.
242                        // Yandex error "554 5.7.1 [2] Message rejected under suspicion of SPAM; https://ya.cc/..."
243                        // should definitely go here, because user has to open the link to
244                        // resume message sending.
245                        SendResult::Failure(format_err!("Permanent SMTP error: {}", err))
246                    }
247                }
248                async_smtp::error::Error::Transient(ref response) => {
249                    // We got a transient 4xx response from SMTP server.
250                    // Give some time until the server-side error maybe goes away.
251                    //
252                    // One particular case is
253                    // `450 4.1.2 <alice@example.org>: Recipient address rejected: Domain not found`.
254                    // known to be returned by Postfix.
255                    //
256                    // [RFC 3463](https://tools.ietf.org/html/rfc3463#section-3.2)
257                    // says "This code is only useful for permanent failures."
258                    // in X.1.1, X.1.2 and X.1.3 descriptions.
259                    //
260                    // Previous Delta Chat core versions
261                    // from 1.51.0 to 1.151.1
262                    // were treating such errors as permanent.
263                    //
264                    // This was later reverted because such errors were observed
265                    // for existing domains and turned out to be actually transient,
266                    // likely caused by nameserver downtime.
267                    info!(
268                        context,
269                        "Transient error {response:?}, postponing retry for later."
270                    );
271                    SendResult::Retry
272                }
273                _ => {
274                    info!(
275                        context,
276                        "Message sending failed without error returned by the server, retry later."
277                    );
278                    SendResult::Retry
279                }
280            };
281
282            // this clears last_success info
283            info!(context, "Failed to send message over SMTP, disconnecting.");
284            smtp.disconnect();
285
286            res
287        }
288        Err(crate::smtp::send::Error::Envelope(err)) => {
289            // Local error, job is invalid, do not retry.
290            smtp.disconnect();
291            warn!(context, "SMTP job is invalid: {err:#}.");
292            SendResult::Failure(err)
293        }
294        Err(crate::smtp::send::Error::NoTransport) => {
295            // Should never happen.
296            // It does not even make sense to disconnect here.
297            error!(context, "SMTP job failed because SMTP has no transport.");
298            SendResult::Failure(format_err!("SMTP has not transport"))
299        }
300        Err(crate::smtp::send::Error::Other(err)) => {
301            // Local error, job is invalid, do not retry.
302            smtp.disconnect();
303            warn!(context, "Unable to load SMTP job: {err:#}.");
304            SendResult::Failure(err)
305        }
306        Ok(()) => SendResult::Success,
307    };
308
309    if let SendResult::Failure(err) = &status {
310        if let Some(msg_id) = msg_id {
311            // We couldn't send the message, so mark it as failed
312            match Message::load_from_db(context, msg_id).await {
313                Ok(mut msg) => {
314                    if let Err(err) =
315                        message::set_msg_failed(context, &mut msg, &err.to_string()).await
316                    {
317                        error!(context, "Failed to mark {msg_id} as failed: {err:#}.");
318                    }
319                }
320                Err(err) => {
321                    error!(
322                        context,
323                        "Failed to load {msg_id} to mark it as failed: {err:#}."
324                    );
325                }
326            }
327        }
328    }
329    status
330}
331
332/// Sends message identified by `smtp` table rowid over SMTP connection.
333///
334/// Removes row if the message should not be retried, otherwise increments retry count.
335pub(crate) async fn send_msg_to_smtp(
336    context: &Context,
337    smtp: &mut Smtp,
338    rowid: i64,
339) -> anyhow::Result<()> {
340    if let Err(err) = smtp
341        .connect_configured(context)
342        .await
343        .context("SMTP connection failure")
344    {
345        smtp.last_send_error = Some(format!("{err:#}"));
346        return Err(err);
347    }
348
349    // Increase retry count as soon as we have an SMTP connection. This ensures that the message is
350    // eventually removed from the queue by exceeding retry limit even in case of an error that
351    // keeps happening early in the message sending code, e.g. failure to read the message from the
352    // database.
353    context
354        .sql
355        .execute("UPDATE smtp SET retries=retries+1 WHERE id=?", (rowid,))
356        .await
357        .context("failed to update retries count")?;
358
359    let Some((body, recipients, msg_id, retries)) = context
360        .sql
361        .query_row_optional(
362            "SELECT mime, recipients, msg_id, retries FROM smtp WHERE id=?",
363            (rowid,),
364            |row| {
365                let mime: String = row.get(0)?;
366                let recipients: String = row.get(1)?;
367                let msg_id: MsgId = row.get(2)?;
368                let retries: i64 = row.get(3)?;
369                Ok((mime, recipients, msg_id, retries))
370            },
371        )
372        .await?
373    else {
374        return Ok(());
375    };
376    if retries > 6 {
377        if let Some(mut msg) = Message::load_from_db_optional(context, msg_id).await? {
378            message::set_msg_failed(context, &mut msg, "Number of retries exceeded the limit.")
379                .await?;
380        }
381        context
382            .sql
383            .execute("DELETE FROM smtp WHERE id=?", (rowid,))
384            .await
385            .context("Failed to remove message with exceeded retry limit from smtp table")?;
386        return Ok(());
387    }
388    info!(
389        context,
390        "Try number {retries} to send message {msg_id} (entry {rowid}) over SMTP."
391    );
392
393    let recipients_list = recipients
394        .split(' ')
395        .filter_map(
396            |addr| match async_smtp::EmailAddress::new(addr.to_string()) {
397                Ok(addr) => Some(addr),
398                Err(err) => {
399                    warn!(context, "Invalid recipient: {} {:?}.", addr, err);
400                    None
401                }
402            },
403        )
404        .collect::<Vec<_>>();
405
406    let status = smtp_send(context, &recipients_list, body.as_str(), smtp, Some(msg_id)).await;
407
408    match status {
409        SendResult::Retry => {}
410        SendResult::Success => {
411            context
412                .sql
413                .execute("DELETE FROM smtp WHERE id=?", (rowid,))
414                .await?;
415        }
416        SendResult::Failure(ref err) => {
417            if err.to_string().contains("Invalid unencrypted mail") {
418                let res = context
419                    .sql
420                    .query_row_optional(
421                        "SELECT chat_id, timestamp FROM msgs WHERE id=?;",
422                        (msg_id,),
423                        |row| Ok((row.get::<_, ChatId>(0)?, row.get::<_, i64>(1)?)),
424                    )
425                    .await?;
426
427                if let Some((chat_id, timestamp_sort)) = res {
428                    let addr = context.get_config(Config::ConfiguredAddr).await?;
429                    let text = unencrypted_email(
430                        context,
431                        addr.unwrap_or_default()
432                            .split('@')
433                            .nth(1)
434                            .unwrap_or_default(),
435                    )
436                    .await;
437                    add_info_msg_with_cmd(
438                        context,
439                        chat_id,
440                        &text,
441                        crate::mimeparser::SystemMessage::InvalidUnencryptedMail,
442                        timestamp_sort,
443                        None,
444                        None,
445                        None,
446                        None,
447                    )
448                    .await?;
449                };
450            }
451            context
452                .sql
453                .execute("DELETE FROM smtp WHERE id=?", (rowid,))
454                .await?;
455        }
456    };
457
458    match status {
459        SendResult::Retry => Err(format_err!("Retry")),
460        SendResult::Success => {
461            if !context
462                .sql
463                .exists("SELECT COUNT(*) FROM smtp WHERE msg_id=?", (msg_id,))
464                .await?
465            {
466                msg_id.set_delivered(context).await?;
467            }
468            Ok(())
469        }
470        SendResult::Failure(err) => Err(format_err!("{}", err)),
471    }
472}
473
474/// Attempts to send queued MDNs.
475async fn send_mdns(context: &Context, connection: &mut Smtp) -> Result<()> {
476    loop {
477        if !context.ratelimit.read().await.can_send() {
478            info!(context, "Ratelimiter does not allow sending MDNs now.");
479            return Ok(());
480        }
481
482        let more_mdns = send_mdn(context, connection).await?;
483        if !more_mdns {
484            // No more MDNs to send or one of them failed.
485            return Ok(());
486        }
487    }
488}
489
490/// Tries to send all messages currently in `smtp`, `smtp_status_updates` and `smtp_mdns` tables.
491pub(crate) async fn send_smtp_messages(context: &Context, connection: &mut Smtp) -> Result<()> {
492    let ratelimited = if context.ratelimit.read().await.can_send() {
493        // add status updates and sync messages to end of sending queue
494        context.flush_status_updates().await?;
495        false
496    } else {
497        true
498    };
499
500    let rowids = context
501        .sql
502        .query_map(
503            "SELECT id FROM smtp ORDER BY id ASC",
504            (),
505            |row| {
506                let rowid: i64 = row.get(0)?;
507                Ok(rowid)
508            },
509            |rowids| {
510                rowids
511                    .collect::<std::result::Result<Vec<_>, _>>()
512                    .map_err(Into::into)
513            },
514        )
515        .await?;
516
517    info!(context, "Selected rows from SMTP queue: {rowids:?}.");
518    for rowid in rowids {
519        send_msg_to_smtp(context, connection, rowid)
520            .await
521            .context("Failed to send message")?;
522    }
523
524    // although by slow sending, ratelimit may have been expired meanwhile,
525    // do not attempt to send MDNs if ratelimited happened before on status-updates/sync:
526    // instead, let the caller recall this function so that more important status-updates/sync are sent out.
527    if !ratelimited {
528        send_mdns(context, connection)
529            .await
530            .context("Failed to send MDNs")?;
531    }
532    Ok(())
533}
534
535/// Tries to send MDN for message identified by `rfc724_mdn` to `contact_id`.
536///
537/// Attempts to aggregate additional MDNs for `contact_id` into sent MDN.
538///
539/// On failure returns an error without removing any `smtp_mdns` entries, the caller is responsible
540/// for removing the corresponding entry to prevent endless loop in case the entry is invalid, e.g.
541/// points to non-existent message or contact.
542///
543/// Returns true on success, false on temporary error.
544async fn send_mdn_rfc724_mid(
545    context: &Context,
546    rfc724_mid: &str,
547    contact_id: ContactId,
548    smtp: &mut Smtp,
549) -> Result<bool> {
550    let contact = Contact::get_by_id(context, contact_id).await?;
551    if contact.is_blocked() {
552        return Err(format_err!("Contact is blocked"));
553    }
554
555    // Try to aggregate additional MDNs into this MDN.
556    let additional_rfc724_mids: Vec<String> = context
557        .sql
558        .query_map(
559            "SELECT rfc724_mid
560             FROM smtp_mdns
561             WHERE from_id=? AND rfc724_mid!=?",
562            (contact_id, &rfc724_mid),
563            |row| {
564                let rfc724_mid: String = row.get(0)?;
565                Ok(rfc724_mid)
566            },
567            |rows| rows.collect::<Result<Vec<_>, _>>().map_err(Into::into),
568        )
569        .await?
570        .into_iter()
571        .collect();
572
573    let mimefactory = MimeFactory::from_mdn(
574        context,
575        contact_id,
576        rfc724_mid.to_string(),
577        additional_rfc724_mids.clone(),
578    )
579    .await?;
580    let rendered_msg = mimefactory.render(context).await?;
581    let body = rendered_msg.message;
582
583    let addr = contact.get_addr();
584    let recipient = async_smtp::EmailAddress::new(addr.to_string())
585        .map_err(|err| format_err!("invalid recipient: {} {:?}", addr, err))?;
586    let recipients = vec![recipient];
587
588    match smtp_send(context, &recipients, &body, smtp, None).await {
589        SendResult::Success => {
590            info!(context, "Successfully sent MDN for {rfc724_mid}.");
591            context
592                .sql
593                .transaction(|transaction| {
594                    let mut stmt =
595                        transaction.prepare("DELETE FROM smtp_mdns WHERE rfc724_mid = ?")?;
596                    stmt.execute((rfc724_mid,))?;
597                    for additional_rfc724_mid in additional_rfc724_mids {
598                        stmt.execute((additional_rfc724_mid,))?;
599                    }
600                    Ok(())
601                })
602                .await?;
603            Ok(true)
604        }
605        SendResult::Retry => {
606            info!(
607                context,
608                "Temporary SMTP failure while sending an MDN for {rfc724_mid}."
609            );
610            Ok(false)
611        }
612        SendResult::Failure(err) => Err(err),
613    }
614}
615
616/// Tries to send a single MDN. Returns true if more MDNs should be sent.
617async fn send_mdn(context: &Context, smtp: &mut Smtp) -> Result<bool> {
618    if !context.should_send_mdns().await? {
619        context.sql.execute("DELETE FROM smtp_mdns", []).await?;
620        return Ok(false);
621    }
622    info!(context, "Sending MDNs.");
623
624    context
625        .sql
626        .execute("DELETE FROM smtp_mdns WHERE retries > 6", [])
627        .await?;
628    let Some(msg_row) = context
629        .sql
630        .query_row_optional(
631            "SELECT rfc724_mid, from_id FROM smtp_mdns ORDER BY retries LIMIT 1",
632            [],
633            |row| {
634                let rfc724_mid: String = row.get(0)?;
635                let from_id: ContactId = row.get(1)?;
636                Ok((rfc724_mid, from_id))
637            },
638        )
639        .await?
640    else {
641        return Ok(false);
642    };
643    let (rfc724_mid, contact_id) = msg_row;
644
645    context
646        .sql
647        .execute(
648            "UPDATE smtp_mdns SET retries=retries+1 WHERE rfc724_mid=?",
649            (rfc724_mid.clone(),),
650        )
651        .await
652        .context("Failed to update MDN retries count")?;
653
654    match send_mdn_rfc724_mid(context, &rfc724_mid, contact_id, smtp).await {
655        Err(err) => {
656            // If there is an error, for example there is no message corresponding to the msg_id in the
657            // database, do not try to send this MDN again.
658            warn!(
659                context,
660                "Error sending MDN for {rfc724_mid}, removing it: {err:#}."
661            );
662            context
663                .sql
664                .execute("DELETE FROM smtp_mdns WHERE rfc724_mid = ?", (rfc724_mid,))
665                .await?;
666            Err(err)
667        }
668        Ok(false) => {
669            bail!("Temporary error while sending an MDN");
670        }
671        Ok(true) => {
672            // Successfully sent MDN.
673            Ok(true)
674        }
675    }
676}