deltachat/
smtp.rs

1//! # SMTP transport module.
2
3mod connect;
4pub mod send;
5
6use anyhow::{Context as _, Error, Result, bail, format_err};
7use async_smtp::response::{Category, Code, Detail};
8use async_smtp::{EmailAddress, SmtpTransport};
9use tokio::task;
10
11use crate::chat::{ChatId, add_info_msg_with_cmd};
12use crate::config::Config;
13use crate::contact::{Contact, ContactId};
14use crate::context::Context;
15use crate::events::EventType;
16use crate::log::{LogExt, warn};
17use crate::message::Message;
18use crate::message::{self, MsgId};
19use crate::mimefactory::MimeFactory;
20use crate::net::proxy::ProxyConfig;
21use crate::net::session::SessionBufStream;
22use crate::scheduler::connectivity::ConnectivityStore;
23use crate::stock_str::unencrypted_email;
24use crate::tools::{self, time_elapsed};
25use crate::transport::{
26    ConfiguredLoginParam, ConfiguredServerLoginParam, prioritize_server_login_params,
27};
28
/// State of the SMTP transport used for sending messages.
///
/// The derived `Default` yields a disconnected instance: no transport,
/// no sender address, no recorded success and no send error.
#[derive(Default)]
pub(crate) struct Smtp {
    /// SMTP connection, `None` while disconnected.
    transport: Option<SmtpTransport<Box<dyn SessionBufStream>>>,

    /// Email address we are sending from.
    from: Option<EmailAddress>,

    /// Timestamp of the last successful send/receive network interaction
    /// (e.g. connect or send succeeded). On initialization and disconnect
    /// it is set to `None`.
    last_success: Option<tools::Time>,

    /// Connectivity state of this transport.
    ///
    /// Updated via `set_connecting()` before connecting and
    /// `set_working()` before sending.
    pub(crate) connectivity: ConnectivityStore,

    /// If sending the last message failed, contains the error message.
    pub(crate) last_send_error: Option<String>,
}
47
48impl Smtp {
49    /// Create a new Smtp instances.
50    pub fn new() -> Self {
51        Default::default()
52    }
53
54    /// Disconnect the SMTP transport and drop it entirely.
55    pub fn disconnect(&mut self) {
56        if let Some(mut transport) = self.transport.take() {
57            // Closing connection with a QUIT command may take some time, especially if it's a
58            // stale connection and an attempt to send the command times out. Send a command in a
59            // separate task to avoid waiting for reply or timeout.
60            task::spawn(async move { transport.quit().await });
61        }
62        self.last_success = None;
63    }
64
65    /// Return true if smtp was connected but is not known to
66    /// have been successfully used the last 60 seconds
67    pub fn has_maybe_stale_connection(&self) -> bool {
68        if let Some(last_success) = self.last_success {
69            time_elapsed(&last_success).as_secs() > 60
70        } else {
71            false
72        }
73    }
74
75    /// Check whether we are connected.
76    pub fn is_connected(&self) -> bool {
77        self.transport.is_some()
78    }
79
80    /// Connect using configured parameters.
81    pub async fn connect_configured(&mut self, context: &Context) -> Result<()> {
82        if self.has_maybe_stale_connection() {
83            info!(context, "Closing stale connection.");
84            self.disconnect();
85        }
86
87        if self.is_connected() {
88            return Ok(());
89        }
90
91        self.connectivity.set_connecting(context);
92        let (_transport_id, lp) = ConfiguredLoginParam::load(context)
93            .await?
94            .context("Not configured")?;
95        let proxy_config = ProxyConfig::load(context).await?;
96        self.connect(
97            context,
98            &lp.smtp,
99            &lp.smtp_password,
100            &proxy_config,
101            &lp.addr,
102            lp.strict_tls(proxy_config.is_some()),
103            lp.oauth2,
104        )
105        .await
106    }
107
108    /// Connect using the provided login params.
109    #[expect(clippy::too_many_arguments)]
110    pub async fn connect(
111        &mut self,
112        context: &Context,
113        login_params: &[ConfiguredServerLoginParam],
114        password: &str,
115        proxy_config: &Option<ProxyConfig>,
116        addr: &str,
117        strict_tls: bool,
118        oauth2: bool,
119    ) -> Result<()> {
120        if self.is_connected() {
121            warn!(context, "SMTP already connected.");
122            return Ok(());
123        }
124
125        let from = EmailAddress::new(addr.to_string())
126            .with_context(|| format!("Invalid address {addr:?}"))?;
127        self.from = Some(from);
128
129        let login_params =
130            prioritize_server_login_params(&context.sql, login_params, "smtp").await?;
131        let mut first_error = None;
132        for lp in login_params {
133            info!(context, "SMTP trying to connect to {}.", &lp.connection);
134            let transport = match connect::connect_and_auth(
135                context,
136                proxy_config,
137                strict_tls,
138                lp.connection.clone(),
139                oauth2,
140                addr,
141                &lp.user,
142                password,
143            )
144            .await
145            {
146                Ok(transport) => transport,
147                Err(err) => {
148                    warn!(context, "SMTP failed to connect and authenticate: {err:#}.");
149                    first_error.get_or_insert(err);
150                    continue;
151                }
152            };
153
154            self.transport = Some(transport);
155            self.last_success = Some(tools::Time::now());
156
157            context.emit_event(EventType::SmtpConnected(format!(
158                "SMTP-LOGIN as {} ok",
159                lp.user,
160            )));
161            return Ok(());
162        }
163
164        Err(first_error.unwrap_or_else(|| format_err!("No SMTP connection candidates provided")))
165    }
166}
167
/// Outcome of an attempt to send a message over SMTP, as returned by `smtp_send()`.
pub(crate) enum SendResult {
    /// Message was sent successfully.
    ///
    /// `smtp_send()` also returns this without sending anything
    /// if the list of recipients is empty.
    Success,

    /// Permanent error, message sending has failed.
    ///
    /// Carries the error describing the failure.
    Failure(Error),

    /// Temporary error, the message should be retried later.
    Retry,
}
178
179/// Tries to send a message.
180pub(crate) async fn smtp_send(
181    context: &Context,
182    recipients: &[async_smtp::EmailAddress],
183    message: &str,
184    smtp: &mut Smtp,
185    msg_id: Option<MsgId>,
186) -> SendResult {
187    if recipients.is_empty() {
188        return SendResult::Success;
189    }
190    if std::env::var(crate::DCC_MIME_DEBUG).is_ok() {
191        info!(context, "SMTP-sending out mime message:\n{message}");
192    }
193
194    smtp.connectivity.set_working(context);
195
196    if let Err(err) = smtp
197        .connect_configured(context)
198        .await
199        .context("Failed to open SMTP connection")
200    {
201        smtp.last_send_error = Some(format!("{err:#}"));
202        return SendResult::Retry;
203    }
204
205    let send_result = smtp.send(context, recipients, message.as_bytes()).await;
206    smtp.last_send_error = send_result.as_ref().err().map(|e| e.to_string());
207
208    let status = match send_result {
209        Err(crate::smtp::send::Error::SmtpSend(err)) => {
210            // Remote error, retry later.
211            info!(context, "SMTP failed to send: {:?}.", &err);
212
213            let res = match err {
214                async_smtp::error::Error::Permanent(ref response) => {
215                    // Workaround for incorrectly configured servers returning permanent errors
216                    // instead of temporary ones.
217                    let maybe_transient = match response.code {
218                        // Sometimes servers send a permanent error when actually it is a temporary error
219                        // For documentation see <https://tools.ietf.org/html/rfc3463>
220                        Code {
221                            category: Category::MailSystem,
222                            detail: Detail::Zero,
223                            ..
224                        } => {
225                            // Ignore status code 5.5.0, see <https://support.delta.chat/t/every-other-message-gets-stuck/877/2>
226                            // Maybe incorrectly configured Postfix milter with "reject" instead of "tempfail", which returns
227                            // "550 5.5.0 Service unavailable" instead of "451 4.7.1 Service unavailable - try again later".
228                            //
229                            // Other enhanced status codes, such as Postfix
230                            // "550 5.1.1 <foobar@example.org>: Recipient address rejected: User unknown in local recipient table"
231                            // are not ignored.
232                            response.first_word() == Some("5.5.0")
233                        }
234                        _ => false,
235                    };
236
237                    if maybe_transient {
238                        info!(
239                            context,
240                            "Permanent error that is likely to actually be transient, postponing retry for later."
241                        );
242                        SendResult::Retry
243                    } else {
244                        info!(context, "Permanent error, message sending failed.");
245                        // If we do not retry, add an info message to the chat.
246                        // Yandex error "554 5.7.1 [2] Message rejected under suspicion of SPAM; https://ya.cc/..."
247                        // should definitely go here, because user has to open the link to
248                        // resume message sending.
249                        SendResult::Failure(format_err!("Permanent SMTP error: {err}"))
250                    }
251                }
252                async_smtp::error::Error::Transient(ref response) => {
253                    // We got a transient 4xx response from SMTP server.
254                    // Give some time until the server-side error maybe goes away.
255                    //
256                    // One particular case is
257                    // `450 4.1.2 <alice@example.org>: Recipient address rejected: Domain not found`.
258                    // known to be returned by Postfix.
259                    //
260                    // [RFC 3463](https://tools.ietf.org/html/rfc3463#section-3.2)
261                    // says "This code is only useful for permanent failures."
262                    // in X.1.1, X.1.2 and X.1.3 descriptions.
263                    //
264                    // Previous Delta Chat core versions
265                    // from 1.51.0 to 1.151.1
266                    // were treating such errors as permanent.
267                    //
268                    // This was later reverted because such errors were observed
269                    // for existing domains and turned out to be actually transient,
270                    // likely caused by nameserver downtime.
271                    info!(
272                        context,
273                        "Transient error {response:?}, postponing retry for later."
274                    );
275                    SendResult::Retry
276                }
277                _ => {
278                    info!(
279                        context,
280                        "Message sending failed without error returned by the server, retry later."
281                    );
282                    SendResult::Retry
283                }
284            };
285
286            // this clears last_success info
287            info!(context, "Failed to send message over SMTP, disconnecting.");
288            smtp.disconnect();
289
290            res
291        }
292        Err(crate::smtp::send::Error::Envelope(err)) => {
293            // Local error, job is invalid, do not retry.
294            smtp.disconnect();
295            warn!(context, "SMTP job is invalid: {err:#}.");
296            SendResult::Failure(err)
297        }
298        Err(crate::smtp::send::Error::NoTransport) => {
299            // Should never happen.
300            // It does not even make sense to disconnect here.
301            error!(context, "SMTP job failed because SMTP has no transport.");
302            SendResult::Failure(format_err!("SMTP has not transport"))
303        }
304        Err(crate::smtp::send::Error::Other(err)) => {
305            // Local error, job is invalid, do not retry.
306            smtp.disconnect();
307            warn!(context, "Unable to load SMTP job: {err:#}.");
308            SendResult::Failure(err)
309        }
310        Ok(()) => SendResult::Success,
311    };
312
313    if let SendResult::Failure(err) = &status
314        && let Some(msg_id) = msg_id
315    {
316        // We couldn't send the message, so mark it as failed
317        match Message::load_from_db(context, msg_id).await {
318            Ok(mut msg) => {
319                if let Err(err) = message::set_msg_failed(context, &mut msg, &err.to_string()).await
320                {
321                    error!(context, "Failed to mark {msg_id} as failed: {err:#}.");
322                }
323            }
324            Err(err) => {
325                error!(
326                    context,
327                    "Failed to load {msg_id} to mark it as failed: {err:#}."
328                );
329            }
330        }
331    }
332    status
333}
334
335/// Sends message identified by `smtp` table rowid over SMTP connection.
336///
337/// Removes row if the message should not be retried, otherwise increments retry count.
338pub(crate) async fn send_msg_to_smtp(
339    context: &Context,
340    smtp: &mut Smtp,
341    rowid: i64,
342) -> anyhow::Result<()> {
343    if let Err(err) = smtp
344        .connect_configured(context)
345        .await
346        .context("SMTP connection failure")
347    {
348        smtp.last_send_error = Some(format!("{err:#}"));
349        return Err(err);
350    }
351
352    // Increase retry count as soon as we have an SMTP connection. This ensures that the message is
353    // eventually removed from the queue by exceeding retry limit even in case of an error that
354    // keeps happening early in the message sending code, e.g. failure to read the message from the
355    // database.
356    context
357        .sql
358        .execute("UPDATE smtp SET retries=retries+1 WHERE id=?", (rowid,))
359        .await
360        .context("failed to update retries count")?;
361
362    let Some((body, recipients, msg_id, retries)) = context
363        .sql
364        .query_row_optional(
365            "SELECT mime, recipients, msg_id, retries FROM smtp WHERE id=?",
366            (rowid,),
367            |row| {
368                let mime: String = row.get(0)?;
369                let recipients: String = row.get(1)?;
370                let msg_id: MsgId = row.get(2)?;
371                let retries: i64 = row.get(3)?;
372                Ok((mime, recipients, msg_id, retries))
373            },
374        )
375        .await?
376    else {
377        return Ok(());
378    };
379    if retries > 6 {
380        context
381            .sql
382            .execute("DELETE FROM smtp WHERE id=?", (rowid,))
383            .await
384            .context("Failed to remove message with exceeded retry limit from smtp table")?;
385        if let Some(mut msg) = Message::load_from_db_optional(context, msg_id).await? {
386            message::set_msg_failed(context, &mut msg, "Number of retries exceeded the limit.")
387                .await?;
388        }
389        return Ok(());
390    }
391    info!(
392        context,
393        "Try number {retries} to send message {msg_id} (entry {rowid}) over SMTP."
394    );
395
396    let recipients_list = recipients
397        .split(' ')
398        .filter_map(
399            |addr| match async_smtp::EmailAddress::new(addr.to_string()) {
400                Ok(addr) => Some(addr),
401                Err(err) => {
402                    warn!(context, "Invalid recipient: {} {:?}.", addr, err);
403                    None
404                }
405            },
406        )
407        .collect::<Vec<_>>();
408
409    let status = smtp_send(context, &recipients_list, body.as_str(), smtp, Some(msg_id)).await;
410
411    match status {
412        SendResult::Retry => {}
413        SendResult::Success => {
414            context
415                .sql
416                .execute("DELETE FROM smtp WHERE id=?", (rowid,))
417                .await?;
418        }
419        SendResult::Failure(ref err) => {
420            if err
421                .to_string()
422                .to_lowercase()
423                .contains("invalid unencrypted mail")
424            {
425                let res = context
426                    .sql
427                    .query_row_optional(
428                        "SELECT chat_id, timestamp FROM msgs WHERE id=?;",
429                        (msg_id,),
430                        |row| Ok((row.get::<_, ChatId>(0)?, row.get::<_, i64>(1)?)),
431                    )
432                    .await?;
433
434                if let Some((chat_id, timestamp_sort)) = res {
435                    let addr = context.get_config(Config::ConfiguredAddr).await?;
436                    let text = unencrypted_email(
437                        context,
438                        addr.unwrap_or_default()
439                            .split('@')
440                            .nth(1)
441                            .unwrap_or_default(),
442                    )
443                    .await;
444                    add_info_msg_with_cmd(
445                        context,
446                        chat_id,
447                        &text,
448                        crate::mimeparser::SystemMessage::InvalidUnencryptedMail,
449                        Some(timestamp_sort),
450                        timestamp_sort,
451                        None,
452                        None,
453                        None,
454                    )
455                    .await?;
456                };
457            }
458            context
459                .sql
460                .execute("DELETE FROM smtp WHERE id=?", (rowid,))
461                .await?;
462        }
463    };
464
465    match status {
466        SendResult::Retry => Err(format_err!("Retry")),
467        SendResult::Success => {
468            if !msg_has_pending_smtp_job(context, msg_id).await? {
469                msg_id.set_delivered(context).await?;
470            }
471            Ok(())
472        }
473        SendResult::Failure(err) => Err(format_err!("{err}")),
474    }
475}
476
477pub(crate) async fn msg_has_pending_smtp_job(
478    context: &Context,
479    msg_id: MsgId,
480) -> Result<bool, Error> {
481    context
482        .sql
483        .exists("SELECT COUNT(*) FROM smtp WHERE msg_id=?", (msg_id,))
484        .await
485}
486
487/// Attempts to send queued MDNs.
488async fn send_mdns(context: &Context, connection: &mut Smtp) -> Result<()> {
489    loop {
490        if !context.ratelimit.read().await.can_send() {
491            info!(context, "Ratelimiter does not allow sending MDNs now.");
492            return Ok(());
493        }
494
495        let more_mdns = send_mdn(context, connection).await?;
496        if !more_mdns {
497            // No more MDNs to send or one of them failed.
498            return Ok(());
499        }
500    }
501}
502
503/// Tries to send all messages currently in `smtp`, `smtp_status_updates` and `smtp_mdns` tables.
504pub(crate) async fn send_smtp_messages(context: &Context, connection: &mut Smtp) -> Result<()> {
505    let ratelimited = if context.ratelimit.read().await.can_send() {
506        // add status updates and sync messages to end of sending queue
507        context.send_sync_msg().await?;
508        context.flush_status_updates().await?;
509        false
510    } else {
511        true
512    };
513
514    let rowids = context
515        .sql
516        .query_map_vec("SELECT id FROM smtp ORDER BY id ASC", (), |row| {
517            let rowid: i64 = row.get(0)?;
518            Ok(rowid)
519        })
520        .await?;
521
522    info!(context, "Selected rows from SMTP queue: {rowids:?}.");
523    for rowid in rowids {
524        send_msg_to_smtp(context, connection, rowid)
525            .await
526            .context("Failed to send message")?;
527    }
528
529    // although by slow sending, ratelimit may have been expired meanwhile,
530    // do not attempt to send MDNs if ratelimited happened before on status-updates/sync:
531    // instead, let the caller recall this function so that more important status-updates/sync are sent out.
532    if !ratelimited {
533        send_mdns(context, connection)
534            .await
535            .context("Failed to send MDNs")?;
536    }
537    Ok(())
538}
539
540/// Tries to send MDN for message identified by `rfc724_mdn` to `contact_id`.
541///
542/// Attempts to aggregate additional MDNs for `contact_id` into sent MDN.
543///
544/// On failure returns an error without removing any `smtp_mdns` entries, the caller is responsible
545/// for removing the corresponding entry to prevent endless loop in case the entry is invalid, e.g.
546/// points to non-existent message or contact.
547///
548/// Returns true on success, false on temporary error.
549async fn send_mdn_rfc724_mid(
550    context: &Context,
551    rfc724_mid: &str,
552    contact_id: ContactId,
553    smtp: &mut Smtp,
554) -> Result<bool> {
555    let contact = Contact::get_by_id(context, contact_id).await?;
556    if contact.is_blocked() {
557        return Err(format_err!("Contact is blocked"));
558    }
559
560    // Try to aggregate additional MDNs into this MDN.
561    let additional_rfc724_mids = context
562        .sql
563        .query_map_vec(
564            "SELECT rfc724_mid
565             FROM smtp_mdns
566             WHERE from_id=? AND rfc724_mid!=?",
567            (contact_id, &rfc724_mid),
568            |row| {
569                let rfc724_mid: String = row.get(0)?;
570                Ok(rfc724_mid)
571            },
572        )
573        .await?;
574
575    let mimefactory = MimeFactory::from_mdn(
576        context,
577        contact_id,
578        rfc724_mid.to_string(),
579        additional_rfc724_mids.clone(),
580    )
581    .await?;
582    let encrypted = mimefactory.will_be_encrypted();
583    let rendered_msg = mimefactory.render(context).await?;
584    let body = rendered_msg.message;
585
586    let mut recipients = Vec::new();
587    if contact_id != ContactId::SELF {
588        recipients.push(contact.get_addr().to_string());
589    }
590    if context.get_config_bool(Config::BccSelf).await? {
591        add_self_recipients(context, &mut recipients, encrypted).await?;
592    }
593    let recipients: Vec<_> = recipients
594        .into_iter()
595        .filter_map(|addr| {
596            async_smtp::EmailAddress::new(addr.clone())
597                .with_context(|| format!("Invalid recipient: {addr}"))
598                .log_err(context)
599                .ok()
600        })
601        .collect();
602
603    match smtp_send(context, &recipients, &body, smtp, None).await {
604        SendResult::Success => {
605            if !recipients.is_empty() {
606                info!(context, "Successfully sent MDN for {rfc724_mid}.");
607            }
608            context
609                .sql
610                .transaction(|transaction| {
611                    let mut stmt =
612                        transaction.prepare("DELETE FROM smtp_mdns WHERE rfc724_mid = ?")?;
613                    stmt.execute((rfc724_mid,))?;
614                    for additional_rfc724_mid in additional_rfc724_mids {
615                        stmt.execute((additional_rfc724_mid,))?;
616                    }
617                    Ok(())
618                })
619                .await?;
620            Ok(true)
621        }
622        SendResult::Retry => {
623            info!(
624                context,
625                "Temporary SMTP failure while sending an MDN for {rfc724_mid}."
626            );
627            Ok(false)
628        }
629        SendResult::Failure(err) => Err(err),
630    }
631}
632
633/// Tries to send a single MDN. Returns true if more MDNs should be sent.
634async fn send_mdn(context: &Context, smtp: &mut Smtp) -> Result<bool> {
635    if !context.should_send_mdns().await? {
636        context.sql.execute("DELETE FROM smtp_mdns", []).await?;
637        return Ok(false);
638    }
639    info!(context, "Sending MDNs.");
640
641    context
642        .sql
643        .execute("DELETE FROM smtp_mdns WHERE retries > 6", [])
644        .await?;
645    let Some(msg_row) = context
646        .sql
647        .query_row_optional(
648            "SELECT rfc724_mid, from_id FROM smtp_mdns ORDER BY retries LIMIT 1",
649            [],
650            |row| {
651                let rfc724_mid: String = row.get(0)?;
652                let from_id: ContactId = row.get(1)?;
653                Ok((rfc724_mid, from_id))
654            },
655        )
656        .await?
657    else {
658        return Ok(false);
659    };
660    let (rfc724_mid, contact_id) = msg_row;
661
662    context
663        .sql
664        .execute(
665            "UPDATE smtp_mdns SET retries=retries+1 WHERE rfc724_mid=?",
666            (rfc724_mid.clone(),),
667        )
668        .await
669        .context("Failed to update MDN retries count")?;
670
671    match send_mdn_rfc724_mid(context, &rfc724_mid, contact_id, smtp).await {
672        Err(err) => {
673            // If there is an error, for example there is no message corresponding to the msg_id in the
674            // database, do not try to send this MDN again.
675            warn!(
676                context,
677                "Error sending MDN for {rfc724_mid}, removing it: {err:#}."
678            );
679            context
680                .sql
681                .execute("DELETE FROM smtp_mdns WHERE rfc724_mid = ?", (rfc724_mid,))
682                .await?;
683            Err(err)
684        }
685        Ok(false) => {
686            bail!("Temporary error while sending an MDN");
687        }
688        Ok(true) => {
689            // Successfully sent MDN.
690            Ok(true)
691        }
692    }
693}
694
695/// Adds self-addresses to `recipients` as necessary.
696/// This doesn't check `Config::BccSelf`, it should be checked by the caller if needed.
697pub(crate) async fn add_self_recipients(
698    context: &Context,
699    recipients: &mut Vec<String>,
700    encrypted: bool,
701) -> Result<()> {
702    // Previous versions of Delta Chat did not send BCC self
703    // if DeleteServerAfter was set to immediately delete messages
704    // from the server. This is not the case anymore
705    // because BCC-self messages are also used to detect
706    // that message was sent if SMTP server is slow to respond
707    // and connection is frequently lost
708    // before receiving status line. NB: This is not a problem for chatmail servers, so `BccSelf`
709    // disabled by default is fine.
710    if context.get_config_delete_server_after().await? != Some(0) || !recipients.is_empty() {
711        // Avoid sending unencrypted messages to all transports, chatmail relays won't accept
712        // them. Normally the user should have a non-chatmail primary transport to send unencrypted
713        // messages.
714        if encrypted {
715            for addr in context.get_published_secondary_self_addrs().await? {
716                recipients.push(addr);
717            }
718        }
719        // `from` must be the last addr, see `receive_imf_inner()` why.
720        let from = context.get_primary_self_addr().await?;
721        recipients.push(from);
722    }
723    Ok(())
724}