deltachat/smtp/
connect.rs

1//! SMTP connection establishment.
2
3use std::net::SocketAddr;
4
5use anyhow::{Context as _, Result, bail};
6use async_smtp::{SmtpClient, SmtpTransport};
7use tokio::io::{AsyncBufRead, AsyncWrite, BufStream};
8
9use crate::context::Context;
10use crate::log::warn;
11use crate::net::dns::{lookup_host_with_cache, update_connect_timestamp};
12use crate::net::proxy::ProxyConfig;
13use crate::net::session::SessionBufStream;
14use crate::net::tls::{TlsSessionStore, wrap_tls};
15use crate::net::{
16    connect_tcp_inner, connect_tls_inner, run_connection_attempts, update_connection_history,
17};
18use crate::oauth2::get_oauth2_access_token;
19use crate::tools::time;
20use crate::transport::ConnectionCandidate;
21use crate::transport::ConnectionSecurity;
22
/// Selects the ALPN protocol name to request when connecting to `port`.
///
/// Returns an empty string for port 465 and `"smtp"` for every other port.
fn alpn(port: u16) -> &'static str {
    match port {
        // Do not request ALPN on standard port.
        465 => "",
        _ => "smtp",
    }
}
32
33// Constructs a new SMTP transport
34// over a stream with already skipped SMTP greeting.
35async fn new_smtp_transport<S: AsyncBufRead + AsyncWrite + Unpin>(
36    stream: S,
37) -> Result<SmtpTransport<S>> {
38    // We always read the greeting manually to unify
39    // the cases of STARTTLS where the greeting is
40    // sent outside the encrypted channel and implicit TLS
41    // where the greeting is sent after establishing TLS channel.
42    let client = SmtpClient::new().smtp_utf8(true).without_greeting();
43
44    let transport = SmtpTransport::new(client, stream)
45        .await
46        .context("Failed to send EHLO command")?;
47    Ok(transport)
48}
49
50#[expect(clippy::too_many_arguments)]
51pub(crate) async fn connect_and_auth(
52    context: &Context,
53    proxy_config: &Option<ProxyConfig>,
54    strict_tls: bool,
55    candidate: ConnectionCandidate,
56    oauth2: bool,
57    addr: &str,
58    user: &str,
59    password: &str,
60) -> Result<SmtpTransport<Box<dyn SessionBufStream>>> {
61    let session_stream = connect_stream(context, proxy_config.clone(), strict_tls, candidate)
62        .await
63        .context("SMTP failed to connect")?;
64    let mut transport = new_smtp_transport(session_stream).await?;
65
66    // Authenticate.
67    let (creds, mechanism) = if oauth2 {
68        // oauth2
69        let access_token = get_oauth2_access_token(context, addr, password, false)
70            .await
71            .context("SMTP failed to get OAUTH2 access token")?;
72        if access_token.is_none() {
73            bail!("SMTP OAuth 2 error {addr}");
74        }
75        (
76            async_smtp::authentication::Credentials::new(
77                user.to_string(),
78                access_token.unwrap_or_default(),
79            ),
80            vec![async_smtp::authentication::Mechanism::Xoauth2],
81        )
82    } else {
83        // plain
84        (
85            async_smtp::authentication::Credentials::new(user.to_string(), password.to_string()),
86            vec![
87                async_smtp::authentication::Mechanism::Plain,
88                async_smtp::authentication::Mechanism::Login,
89            ],
90        )
91    };
92    transport
93        .try_login(&creds, &mechanism)
94        .await
95        .context("SMTP failed to login")?;
96    Ok(transport)
97}
98
99async fn connection_attempt(
100    context: Context,
101    host: String,
102    security: ConnectionSecurity,
103    resolved_addr: SocketAddr,
104    strict_tls: bool,
105) -> Result<Box<dyn SessionBufStream>> {
106    let context = &context;
107    let host = &host;
108    info!(
109        context,
110        "Attempting SMTP connection to {host} ({resolved_addr})."
111    );
112    let res = match security {
113        ConnectionSecurity::Tls => {
114            connect_secure(resolved_addr, host, strict_tls, &context.tls_session_store).await
115        }
116        ConnectionSecurity::Starttls => {
117            connect_starttls(resolved_addr, host, strict_tls, &context.tls_session_store).await
118        }
119        ConnectionSecurity::Plain => connect_insecure(resolved_addr).await,
120    };
121    match res {
122        Ok(stream) => {
123            let ip_addr = resolved_addr.ip().to_string();
124            let port = resolved_addr.port();
125
126            let save_cache = match security {
127                ConnectionSecurity::Tls | ConnectionSecurity::Starttls => strict_tls,
128                ConnectionSecurity::Plain => false,
129            };
130            if save_cache {
131                update_connect_timestamp(context, host, &ip_addr).await?;
132            }
133            update_connection_history(context, "smtp", host, port, &ip_addr, time()).await?;
134            Ok(stream)
135        }
136        Err(err) => {
137            warn!(
138                context,
139                "Failed to connect to {host} ({resolved_addr}): {err:#}."
140            );
141            Err(err)
142        }
143    }
144}
145
146/// Returns TLS, STARTTLS or plaintext connection
147/// using SOCKS5 or direct connection depending on the given configuration.
148///
149/// Connection is returned after skipping the welcome message
150/// and is ready for sending commands. Because SMTP STARTTLS
151/// does not send welcome message over TLS connection
152/// after establishing it, welcome message is always ignored
153/// to unify the result regardless of whether TLS or STARTTLS is used.
154async fn connect_stream(
155    context: &Context,
156    proxy_config: Option<ProxyConfig>,
157    strict_tls: bool,
158    candidate: ConnectionCandidate,
159) -> Result<Box<dyn SessionBufStream>> {
160    let host = &candidate.host;
161    let port = candidate.port;
162    let security = candidate.security;
163
164    if let Some(proxy_config) = proxy_config {
165        let stream = match security {
166            ConnectionSecurity::Tls => {
167                connect_secure_proxy(context, host, port, strict_tls, proxy_config.clone()).await?
168            }
169            ConnectionSecurity::Starttls => {
170                connect_starttls_proxy(context, host, port, strict_tls, proxy_config.clone())
171                    .await?
172            }
173            ConnectionSecurity::Plain => {
174                connect_insecure_proxy(context, host, port, proxy_config.clone()).await?
175            }
176        };
177        update_connection_history(context, "smtp", host, port, host, time()).await?;
178        Ok(stream)
179    } else {
180        let load_cache = match security {
181            ConnectionSecurity::Tls | ConnectionSecurity::Starttls => strict_tls,
182            ConnectionSecurity::Plain => false,
183        };
184
185        let connection_futures = lookup_host_with_cache(context, host, port, "smtp", load_cache)
186            .await?
187            .into_iter()
188            .map(|resolved_addr| {
189                let context = context.clone();
190                let host = host.to_string();
191                connection_attempt(context, host, security, resolved_addr, strict_tls)
192            });
193        run_connection_attempts(connection_futures).await
194    }
195}
196
197/// Reads and ignores SMTP greeting.
198///
199/// This function is used to unify
200/// TLS, STARTTLS and plaintext connection setup
201/// by skipping the greeting in case of TLS
202/// and STARTTLS connection setup.
203async fn skip_smtp_greeting<R: tokio::io::AsyncBufReadExt + Unpin>(stream: &mut R) -> Result<()> {
204    let mut line = String::with_capacity(512);
205    loop {
206        line.clear();
207        let read = stream
208            .read_line(&mut line)
209            .await
210            .context("Failed to read from stream while waiting for SMTP greeting")?;
211        if read == 0 {
212            bail!("Unexpected EOF while reading SMTP greeting");
213        }
214        if line.starts_with("220-") {
215            continue;
216        } else if line.starts_with("220 ") {
217            return Ok(());
218        } else {
219            bail!("Unexpected greeting: {line:?}");
220        }
221    }
222}
223
224async fn connect_secure_proxy(
225    context: &Context,
226    hostname: &str,
227    port: u16,
228    strict_tls: bool,
229    proxy_config: ProxyConfig,
230) -> Result<Box<dyn SessionBufStream>> {
231    let use_sni = true;
232    let proxy_stream = proxy_config
233        .connect(context, hostname, port, strict_tls)
234        .await?;
235    let tls_stream = wrap_tls(
236        strict_tls,
237        hostname,
238        port,
239        use_sni,
240        alpn(port),
241        proxy_stream,
242        &context.tls_session_store,
243    )
244    .await?;
245    let mut buffered_stream = BufStream::new(tls_stream);
246    skip_smtp_greeting(&mut buffered_stream).await?;
247    let session_stream: Box<dyn SessionBufStream> = Box::new(buffered_stream);
248    Ok(session_stream)
249}
250
251async fn connect_starttls_proxy(
252    context: &Context,
253    hostname: &str,
254    port: u16,
255    strict_tls: bool,
256    proxy_config: ProxyConfig,
257) -> Result<Box<dyn SessionBufStream>> {
258    let use_sni = false;
259    let proxy_stream = proxy_config
260        .connect(context, hostname, port, strict_tls)
261        .await?;
262
263    // Run STARTTLS command and convert the client back into a stream.
264    let mut buffered_stream = BufStream::new(proxy_stream);
265    skip_smtp_greeting(&mut buffered_stream).await?;
266    let transport = new_smtp_transport(buffered_stream).await?;
267    let tcp_stream = transport.starttls().await?.into_inner();
268    let tls_stream = wrap_tls(
269        strict_tls,
270        hostname,
271        port,
272        use_sni,
273        "",
274        tcp_stream,
275        &context.tls_session_store,
276    )
277    .await
278    .context("STARTTLS upgrade failed")?;
279    let buffered_stream = BufStream::new(tls_stream);
280    let session_stream: Box<dyn SessionBufStream> = Box::new(buffered_stream);
281    Ok(session_stream)
282}
283
284async fn connect_insecure_proxy(
285    context: &Context,
286    hostname: &str,
287    port: u16,
288    proxy_config: ProxyConfig,
289) -> Result<Box<dyn SessionBufStream>> {
290    let proxy_stream = proxy_config.connect(context, hostname, port, false).await?;
291    let mut buffered_stream = BufStream::new(proxy_stream);
292    skip_smtp_greeting(&mut buffered_stream).await?;
293    let session_stream: Box<dyn SessionBufStream> = Box::new(buffered_stream);
294    Ok(session_stream)
295}
296
297async fn connect_secure(
298    addr: SocketAddr,
299    hostname: &str,
300    strict_tls: bool,
301    tls_session_store: &TlsSessionStore,
302) -> Result<Box<dyn SessionBufStream>> {
303    let tls_stream = connect_tls_inner(
304        addr,
305        hostname,
306        strict_tls,
307        alpn(addr.port()),
308        tls_session_store,
309    )
310    .await?;
311    let mut buffered_stream = BufStream::new(tls_stream);
312    skip_smtp_greeting(&mut buffered_stream).await?;
313    let session_stream: Box<dyn SessionBufStream> = Box::new(buffered_stream);
314    Ok(session_stream)
315}
316
317async fn connect_starttls(
318    addr: SocketAddr,
319    host: &str,
320    strict_tls: bool,
321    tls_session_store: &TlsSessionStore,
322) -> Result<Box<dyn SessionBufStream>> {
323    let use_sni = false;
324    let tcp_stream = connect_tcp_inner(addr).await?;
325
326    // Run STARTTLS command and convert the client back into a stream.
327    let mut buffered_stream = BufStream::new(tcp_stream);
328    skip_smtp_greeting(&mut buffered_stream).await?;
329    let transport = new_smtp_transport(buffered_stream).await?;
330    let tcp_stream = transport.starttls().await?.into_inner();
331    let tls_stream = wrap_tls(
332        strict_tls,
333        host,
334        addr.port(),
335        use_sni,
336        "",
337        tcp_stream,
338        tls_session_store,
339    )
340    .await
341    .context("STARTTLS upgrade failed")?;
342
343    let buffered_stream = BufStream::new(tls_stream);
344    let session_stream: Box<dyn SessionBufStream> = Box::new(buffered_stream);
345    Ok(session_stream)
346}
347
348async fn connect_insecure(addr: SocketAddr) -> Result<Box<dyn SessionBufStream>> {
349    let tcp_stream = connect_tcp_inner(addr).await?;
350    let mut buffered_stream = BufStream::new(tcp_stream);
351    skip_smtp_greeting(&mut buffered_stream).await?;
352    let session_stream: Box<dyn SessionBufStream> = Box::new(buffered_stream);
353    Ok(session_stream)
354}
355
#[cfg(test)]
mod tests {
    use tokio::io::BufReader;

    use super::*;

    /// A greeting made of two `220-` continuation lines followed by
    /// a final `220 ` line is consumed without an error.
    #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
    async fn test_skip_smtp_greeting() -> Result<()> {
        let greeting: &[u8] = b"220-server261.web-hosting.com ESMTP Exim 4.96.2 #2 Sat, 24 Aug 2024 12:25:53 -0400 \r\n\
                                220-We do not authorize the use of this system to transport unsolicited,\r\n\
                                220 and/or bulk e-mail.\r\n";
        let mut reader = BufReader::new(greeting);
        skip_smtp_greeting(&mut reader).await?;
        Ok(())
    }
}