deltachat/smtp/
connect.rs

1//! SMTP connection establishment.
2
3use std::net::SocketAddr;
4
5use anyhow::{Context as _, Result, bail};
6use async_smtp::{SmtpClient, SmtpTransport};
7use tokio::io::{AsyncBufRead, AsyncWrite, BufStream};
8
9use crate::context::Context;
10use crate::log::warn;
11use crate::net::dns::{lookup_host_with_cache, update_connect_timestamp};
12use crate::net::proxy::ProxyConfig;
13use crate::net::session::SessionBufStream;
14use crate::net::tls::{SpkiHashStore, TlsSessionStore, wrap_tls};
15use crate::net::{
16    connect_tcp_inner, connect_tls_inner, run_connection_attempts, update_connection_history,
17};
18use crate::oauth2::get_oauth2_access_token;
19use crate::sql::Sql;
20use crate::tools::time;
21use crate::transport::ConnectionCandidate;
22use crate::transport::ConnectionSecurity;
23
/// Picks the ALPN protocol name to pass along when setting up TLS for `port`.
///
/// Port 465 yields an empty string, every other port yields `"smtp"`.
fn alpn(port: u16) -> &'static str {
    match port {
        // Do not request ALPN on standard port.
        465 => "",
        _ => "smtp",
    }
}
33
34// Constructs a new SMTP transport
35// over a stream with already skipped SMTP greeting.
36async fn new_smtp_transport<S: AsyncBufRead + AsyncWrite + Unpin>(
37    stream: S,
38) -> Result<SmtpTransport<S>> {
39    // We always read the greeting manually to unify
40    // the cases of STARTTLS where the greeting is
41    // sent outside the encrypted channel and implicit TLS
42    // where the greeting is sent after establishing TLS channel.
43    let client = SmtpClient::new().smtp_utf8(true).without_greeting();
44
45    let transport = SmtpTransport::new(client, stream)
46        .await
47        .context("Failed to send EHLO command")?;
48    Ok(transport)
49}
50
51#[expect(clippy::too_many_arguments)]
52pub(crate) async fn connect_and_auth(
53    context: &Context,
54    proxy_config: &Option<ProxyConfig>,
55    strict_tls: bool,
56    candidate: ConnectionCandidate,
57    oauth2: bool,
58    addr: &str,
59    user: &str,
60    password: &str,
61) -> Result<SmtpTransport<Box<dyn SessionBufStream>>> {
62    let session_stream = connect_stream(context, proxy_config.clone(), strict_tls, candidate)
63        .await
64        .context("SMTP failed to connect")?;
65    let mut transport = new_smtp_transport(session_stream).await?;
66
67    // Authenticate.
68    let (creds, mechanism) = if oauth2 {
69        // oauth2
70        let access_token = get_oauth2_access_token(context, addr, password, false)
71            .await
72            .context("SMTP failed to get OAUTH2 access token")?;
73        if access_token.is_none() {
74            bail!("SMTP OAuth 2 error {addr}");
75        }
76        (
77            async_smtp::authentication::Credentials::new(
78                user.to_string(),
79                access_token.unwrap_or_default(),
80            ),
81            vec![async_smtp::authentication::Mechanism::Xoauth2],
82        )
83    } else {
84        // plain
85        (
86            async_smtp::authentication::Credentials::new(user.to_string(), password.to_string()),
87            vec![
88                async_smtp::authentication::Mechanism::Plain,
89                async_smtp::authentication::Mechanism::Login,
90            ],
91        )
92    };
93    transport
94        .try_login(&creds, &mechanism)
95        .await
96        .context("SMTP failed to login")?;
97    Ok(transport)
98}
99
100async fn connection_attempt(
101    context: Context,
102    host: String,
103    security: ConnectionSecurity,
104    resolved_addr: SocketAddr,
105    strict_tls: bool,
106) -> Result<Box<dyn SessionBufStream>> {
107    let context = &context;
108    let host = &host;
109    info!(
110        context,
111        "Attempting SMTP connection to {host} ({resolved_addr})."
112    );
113    let res = match security {
114        ConnectionSecurity::Tls => {
115            connect_secure(
116                resolved_addr,
117                host,
118                strict_tls,
119                &context.tls_session_store,
120                &context.spki_hash_store,
121                &context.sql,
122            )
123            .await
124        }
125        ConnectionSecurity::Starttls => {
126            connect_starttls(
127                resolved_addr,
128                host,
129                strict_tls,
130                &context.tls_session_store,
131                &context.spki_hash_store,
132                &context.sql,
133            )
134            .await
135        }
136        ConnectionSecurity::Plain => connect_insecure(resolved_addr).await,
137    };
138    match res {
139        Ok(stream) => {
140            let ip_addr = resolved_addr.ip().to_string();
141            let port = resolved_addr.port();
142
143            let save_cache = match security {
144                ConnectionSecurity::Tls | ConnectionSecurity::Starttls => strict_tls,
145                ConnectionSecurity::Plain => false,
146            };
147            if save_cache {
148                update_connect_timestamp(context, host, &ip_addr).await?;
149            }
150            update_connection_history(context, "smtp", host, port, &ip_addr, time()).await?;
151            Ok(stream)
152        }
153        Err(err) => {
154            warn!(
155                context,
156                "SMTP failed to connect to {host} ({resolved_addr}): {err:#}."
157            );
158            Err(err)
159        }
160    }
161}
162
163/// Returns TLS, STARTTLS or plaintext connection
164/// using SOCKS5 or direct connection depending on the given configuration.
165///
166/// Connection is returned after skipping the welcome message
167/// and is ready for sending commands. Because SMTP STARTTLS
168/// does not send welcome message over TLS connection
169/// after establishing it, welcome message is always ignored
170/// to unify the result regardless of whether TLS or STARTTLS is used.
171async fn connect_stream(
172    context: &Context,
173    proxy_config: Option<ProxyConfig>,
174    strict_tls: bool,
175    candidate: ConnectionCandidate,
176) -> Result<Box<dyn SessionBufStream>> {
177    let host = &candidate.host;
178    let port = candidate.port;
179    let security = candidate.security;
180
181    if let Some(proxy_config) = proxy_config {
182        let stream = match security {
183            ConnectionSecurity::Tls => {
184                connect_secure_proxy(context, host, port, strict_tls, proxy_config.clone()).await?
185            }
186            ConnectionSecurity::Starttls => {
187                connect_starttls_proxy(context, host, port, strict_tls, proxy_config.clone())
188                    .await?
189            }
190            ConnectionSecurity::Plain => {
191                connect_insecure_proxy(context, host, port, proxy_config.clone()).await?
192            }
193        };
194        update_connection_history(context, "smtp", host, port, host, time()).await?;
195        Ok(stream)
196    } else {
197        let load_cache = match security {
198            ConnectionSecurity::Tls | ConnectionSecurity::Starttls => strict_tls,
199            ConnectionSecurity::Plain => false,
200        };
201
202        let connection_futures = lookup_host_with_cache(context, host, port, "smtp", load_cache)
203            .await?
204            .into_iter()
205            .map(|resolved_addr| {
206                let context = context.clone();
207                let host = host.to_string();
208                connection_attempt(context, host, security, resolved_addr, strict_tls)
209            });
210        run_connection_attempts(connection_futures).await
211    }
212}
213
214/// Reads and ignores SMTP greeting.
215///
216/// This function is used to unify
217/// TLS, STARTTLS and plaintext connection setup
218/// by skipping the greeting in case of TLS
219/// and STARTTLS connection setup.
220async fn skip_smtp_greeting<R: tokio::io::AsyncBufReadExt + Unpin>(stream: &mut R) -> Result<()> {
221    let mut line = String::with_capacity(512);
222    loop {
223        line.clear();
224        let read = stream
225            .read_line(&mut line)
226            .await
227            .context("Failed to read from stream while waiting for SMTP greeting")?;
228        if read == 0 {
229            bail!("Unexpected EOF while reading SMTP greeting");
230        }
231        if line.starts_with("220-") {
232            continue;
233        } else if line.starts_with("220 ") {
234            return Ok(());
235        } else {
236            bail!("Unexpected greeting: {line:?}");
237        }
238    }
239}
240
241async fn connect_secure_proxy(
242    context: &Context,
243    hostname: &str,
244    port: u16,
245    strict_tls: bool,
246    proxy_config: ProxyConfig,
247) -> Result<Box<dyn SessionBufStream>> {
248    let use_sni = true;
249    let proxy_stream = proxy_config
250        .connect(context, hostname, port, strict_tls)
251        .await?;
252    let tls_stream = wrap_tls(
253        strict_tls,
254        hostname,
255        port,
256        use_sni,
257        alpn(port),
258        proxy_stream,
259        &context.tls_session_store,
260        &context.spki_hash_store,
261        &context.sql,
262    )
263    .await?;
264    let mut buffered_stream = BufStream::new(tls_stream);
265    skip_smtp_greeting(&mut buffered_stream).await?;
266    let session_stream: Box<dyn SessionBufStream> = Box::new(buffered_stream);
267    Ok(session_stream)
268}
269
270async fn connect_starttls_proxy(
271    context: &Context,
272    hostname: &str,
273    port: u16,
274    strict_tls: bool,
275    proxy_config: ProxyConfig,
276) -> Result<Box<dyn SessionBufStream>> {
277    let use_sni = false;
278    let proxy_stream = proxy_config
279        .connect(context, hostname, port, strict_tls)
280        .await?;
281
282    // Run STARTTLS command and convert the client back into a stream.
283    let mut buffered_stream = BufStream::new(proxy_stream);
284    skip_smtp_greeting(&mut buffered_stream).await?;
285    let transport = new_smtp_transport(buffered_stream).await?;
286    let tcp_stream = transport.starttls().await?.into_inner();
287    let tls_stream = wrap_tls(
288        strict_tls,
289        hostname,
290        port,
291        use_sni,
292        "",
293        tcp_stream,
294        &context.tls_session_store,
295        &context.spki_hash_store,
296        &context.sql,
297    )
298    .await
299    .context("STARTTLS upgrade failed")?;
300    let buffered_stream = BufStream::new(tls_stream);
301    let session_stream: Box<dyn SessionBufStream> = Box::new(buffered_stream);
302    Ok(session_stream)
303}
304
305async fn connect_insecure_proxy(
306    context: &Context,
307    hostname: &str,
308    port: u16,
309    proxy_config: ProxyConfig,
310) -> Result<Box<dyn SessionBufStream>> {
311    let proxy_stream = proxy_config.connect(context, hostname, port, false).await?;
312    let mut buffered_stream = BufStream::new(proxy_stream);
313    skip_smtp_greeting(&mut buffered_stream).await?;
314    let session_stream: Box<dyn SessionBufStream> = Box::new(buffered_stream);
315    Ok(session_stream)
316}
317
318async fn connect_secure(
319    addr: SocketAddr,
320    hostname: &str,
321    strict_tls: bool,
322    tls_session_store: &TlsSessionStore,
323    spki_hash_store: &SpkiHashStore,
324    sql: &Sql,
325) -> Result<Box<dyn SessionBufStream>> {
326    let tls_stream = connect_tls_inner(
327        addr,
328        hostname,
329        strict_tls,
330        alpn(addr.port()),
331        tls_session_store,
332        spki_hash_store,
333        sql,
334    )
335    .await?;
336    let mut buffered_stream = BufStream::new(tls_stream);
337    skip_smtp_greeting(&mut buffered_stream).await?;
338    let session_stream: Box<dyn SessionBufStream> = Box::new(buffered_stream);
339    Ok(session_stream)
340}
341
342async fn connect_starttls(
343    addr: SocketAddr,
344    host: &str,
345    strict_tls: bool,
346    tls_session_store: &TlsSessionStore,
347    spki_hash_store: &SpkiHashStore,
348    sql: &Sql,
349) -> Result<Box<dyn SessionBufStream>> {
350    let use_sni = false;
351    let tcp_stream = connect_tcp_inner(addr).await?;
352
353    // Run STARTTLS command and convert the client back into a stream.
354    let mut buffered_stream = BufStream::new(tcp_stream);
355    skip_smtp_greeting(&mut buffered_stream).await?;
356    let transport = new_smtp_transport(buffered_stream).await?;
357    let tcp_stream = transport.starttls().await?.into_inner();
358    let tls_stream = wrap_tls(
359        strict_tls,
360        host,
361        addr.port(),
362        use_sni,
363        "",
364        tcp_stream,
365        tls_session_store,
366        spki_hash_store,
367        sql,
368    )
369    .await
370    .context("STARTTLS upgrade failed")?;
371
372    let buffered_stream = BufStream::new(tls_stream);
373    let session_stream: Box<dyn SessionBufStream> = Box::new(buffered_stream);
374    Ok(session_stream)
375}
376
377async fn connect_insecure(addr: SocketAddr) -> Result<Box<dyn SessionBufStream>> {
378    let tcp_stream = connect_tcp_inner(addr).await?;
379    let mut buffered_stream = BufStream::new(tcp_stream);
380    skip_smtp_greeting(&mut buffered_stream).await?;
381    let session_stream: Box<dyn SessionBufStream> = Box::new(buffered_stream);
382    Ok(session_stream)
383}
384
#[cfg(test)]
mod tests {
    use tokio::io::BufReader;

    use super::*;

    /// A multi-line greeting (`220-` lines followed by a final `220 ` line)
    /// is accepted.
    #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
    async fn test_skip_smtp_greeting() -> Result<()> {
        let greeting: &[u8] = b"220-server261.web-hosting.com ESMTP Exim 4.96.2 #2 Sat, 24 Aug 2024 12:25:53 -0400 \r\n\
                         220-We do not authorize the use of this system to transport unsolicited,\r\n\
                         220 and/or bulk e-mail.\r\n";
        let mut reader = BufReader::new(greeting);
        skip_smtp_greeting(&mut reader).await?;
        Ok(())
    }
}