deltachat/smtp/
connect.rs

1//! SMTP connection establishment.
2
3use std::net::SocketAddr;
4
5use anyhow::{Context as _, Result, bail};
6use async_smtp::{SmtpClient, SmtpTransport};
7use tokio::io::{AsyncBufRead, AsyncWrite, BufStream};
8
9use crate::context::Context;
10use crate::log::{info, warn};
11use crate::login_param::{ConnectionCandidate, ConnectionSecurity};
12use crate::net::dns::{lookup_host_with_cache, update_connect_timestamp};
13use crate::net::proxy::ProxyConfig;
14use crate::net::session::SessionBufStream;
15use crate::net::tls::{TlsSessionStore, wrap_tls};
16use crate::net::{
17    connect_tcp_inner, connect_tls_inner, run_connection_attempts, update_connection_history,
18};
19use crate::oauth2::get_oauth2_access_token;
20use crate::tools::time;
21
/// Maps an SMTP port number to the ALPN protocol name to request.
///
/// Returns an empty string for port 465 and `"smtp"` for every other port.
fn alpn(port: u16) -> &'static str {
    match port {
        // Do not request ALPN on standard port.
        465 => "",
        _ => "smtp",
    }
}
31
32// Constructs a new SMTP transport
33// over a stream with already skipped SMTP greeting.
34async fn new_smtp_transport<S: AsyncBufRead + AsyncWrite + Unpin>(
35    stream: S,
36) -> Result<SmtpTransport<S>> {
37    // We always read the greeting manually to unify
38    // the cases of STARTTLS where the greeting is
39    // sent outside the encrypted channel and implicit TLS
40    // where the greeting is sent after establishing TLS channel.
41    let client = SmtpClient::new().smtp_utf8(true).without_greeting();
42
43    let transport = SmtpTransport::new(client, stream)
44        .await
45        .context("Failed to send EHLO command")?;
46    Ok(transport)
47}
48
49#[expect(clippy::too_many_arguments)]
50pub(crate) async fn connect_and_auth(
51    context: &Context,
52    proxy_config: &Option<ProxyConfig>,
53    strict_tls: bool,
54    candidate: ConnectionCandidate,
55    oauth2: bool,
56    addr: &str,
57    user: &str,
58    password: &str,
59) -> Result<SmtpTransport<Box<dyn SessionBufStream>>> {
60    let session_stream = connect_stream(context, proxy_config.clone(), strict_tls, candidate)
61        .await
62        .context("SMTP failed to connect")?;
63    let mut transport = new_smtp_transport(session_stream).await?;
64
65    // Authenticate.
66    let (creds, mechanism) = if oauth2 {
67        // oauth2
68        let access_token = get_oauth2_access_token(context, addr, password, false)
69            .await
70            .context("SMTP failed to get OAUTH2 access token")?;
71        if access_token.is_none() {
72            bail!("SMTP OAuth 2 error {addr}");
73        }
74        (
75            async_smtp::authentication::Credentials::new(
76                user.to_string(),
77                access_token.unwrap_or_default(),
78            ),
79            vec![async_smtp::authentication::Mechanism::Xoauth2],
80        )
81    } else {
82        // plain
83        (
84            async_smtp::authentication::Credentials::new(user.to_string(), password.to_string()),
85            vec![
86                async_smtp::authentication::Mechanism::Plain,
87                async_smtp::authentication::Mechanism::Login,
88            ],
89        )
90    };
91    transport
92        .try_login(&creds, &mechanism)
93        .await
94        .context("SMTP failed to login")?;
95    Ok(transport)
96}
97
98async fn connection_attempt(
99    context: Context,
100    host: String,
101    security: ConnectionSecurity,
102    resolved_addr: SocketAddr,
103    strict_tls: bool,
104) -> Result<Box<dyn SessionBufStream>> {
105    let context = &context;
106    let host = &host;
107    info!(
108        context,
109        "Attempting SMTP connection to {host} ({resolved_addr})."
110    );
111    let res = match security {
112        ConnectionSecurity::Tls => {
113            connect_secure(resolved_addr, host, strict_tls, &context.tls_session_store).await
114        }
115        ConnectionSecurity::Starttls => {
116            connect_starttls(resolved_addr, host, strict_tls, &context.tls_session_store).await
117        }
118        ConnectionSecurity::Plain => connect_insecure(resolved_addr).await,
119    };
120    match res {
121        Ok(stream) => {
122            let ip_addr = resolved_addr.ip().to_string();
123            let port = resolved_addr.port();
124
125            let save_cache = match security {
126                ConnectionSecurity::Tls | ConnectionSecurity::Starttls => strict_tls,
127                ConnectionSecurity::Plain => false,
128            };
129            if save_cache {
130                update_connect_timestamp(context, host, &ip_addr).await?;
131            }
132            update_connection_history(context, "smtp", host, port, &ip_addr, time()).await?;
133            Ok(stream)
134        }
135        Err(err) => {
136            warn!(
137                context,
138                "Failed to connect to {host} ({resolved_addr}): {err:#}."
139            );
140            Err(err)
141        }
142    }
143}
144
145/// Returns TLS, STARTTLS or plaintext connection
146/// using SOCKS5 or direct connection depending on the given configuration.
147///
148/// Connection is returned after skipping the welcome message
149/// and is ready for sending commands. Because SMTP STARTTLS
150/// does not send welcome message over TLS connection
151/// after establishing it, welcome message is always ignored
152/// to unify the result regardless of whether TLS or STARTTLS is used.
153async fn connect_stream(
154    context: &Context,
155    proxy_config: Option<ProxyConfig>,
156    strict_tls: bool,
157    candidate: ConnectionCandidate,
158) -> Result<Box<dyn SessionBufStream>> {
159    let host = &candidate.host;
160    let port = candidate.port;
161    let security = candidate.security;
162
163    if let Some(proxy_config) = proxy_config {
164        let stream = match security {
165            ConnectionSecurity::Tls => {
166                connect_secure_proxy(context, host, port, strict_tls, proxy_config.clone()).await?
167            }
168            ConnectionSecurity::Starttls => {
169                connect_starttls_proxy(context, host, port, strict_tls, proxy_config.clone())
170                    .await?
171            }
172            ConnectionSecurity::Plain => {
173                connect_insecure_proxy(context, host, port, proxy_config.clone()).await?
174            }
175        };
176        update_connection_history(context, "smtp", host, port, host, time()).await?;
177        Ok(stream)
178    } else {
179        let load_cache = match security {
180            ConnectionSecurity::Tls | ConnectionSecurity::Starttls => strict_tls,
181            ConnectionSecurity::Plain => false,
182        };
183
184        let connection_futures = lookup_host_with_cache(context, host, port, "smtp", load_cache)
185            .await?
186            .into_iter()
187            .map(|resolved_addr| {
188                let context = context.clone();
189                let host = host.to_string();
190                connection_attempt(context, host, security, resolved_addr, strict_tls)
191            });
192        run_connection_attempts(connection_futures).await
193    }
194}
195
196/// Reads and ignores SMTP greeting.
197///
198/// This function is used to unify
199/// TLS, STARTTLS and plaintext connection setup
200/// by skipping the greeting in case of TLS
201/// and STARTTLS connection setup.
202async fn skip_smtp_greeting<R: tokio::io::AsyncBufReadExt + Unpin>(stream: &mut R) -> Result<()> {
203    let mut line = String::with_capacity(512);
204    loop {
205        line.clear();
206        let read = stream
207            .read_line(&mut line)
208            .await
209            .context("Failed to read from stream while waiting for SMTP greeting")?;
210        if read == 0 {
211            bail!("Unexpected EOF while reading SMTP greeting");
212        }
213        if line.starts_with("220-") {
214            continue;
215        } else if line.starts_with("220 ") {
216            return Ok(());
217        } else {
218            bail!("Unexpected greeting: {line:?}");
219        }
220    }
221}
222
223async fn connect_secure_proxy(
224    context: &Context,
225    hostname: &str,
226    port: u16,
227    strict_tls: bool,
228    proxy_config: ProxyConfig,
229) -> Result<Box<dyn SessionBufStream>> {
230    let proxy_stream = proxy_config
231        .connect(context, hostname, port, strict_tls)
232        .await?;
233    let tls_stream = wrap_tls(
234        strict_tls,
235        hostname,
236        port,
237        alpn(port),
238        proxy_stream,
239        &context.tls_session_store,
240    )
241    .await?;
242    let mut buffered_stream = BufStream::new(tls_stream);
243    skip_smtp_greeting(&mut buffered_stream).await?;
244    let session_stream: Box<dyn SessionBufStream> = Box::new(buffered_stream);
245    Ok(session_stream)
246}
247
248async fn connect_starttls_proxy(
249    context: &Context,
250    hostname: &str,
251    port: u16,
252    strict_tls: bool,
253    proxy_config: ProxyConfig,
254) -> Result<Box<dyn SessionBufStream>> {
255    let proxy_stream = proxy_config
256        .connect(context, hostname, port, strict_tls)
257        .await?;
258
259    // Run STARTTLS command and convert the client back into a stream.
260    let mut buffered_stream = BufStream::new(proxy_stream);
261    skip_smtp_greeting(&mut buffered_stream).await?;
262    let transport = new_smtp_transport(buffered_stream).await?;
263    let tcp_stream = transport.starttls().await?.into_inner();
264    let tls_stream = wrap_tls(
265        strict_tls,
266        hostname,
267        port,
268        "",
269        tcp_stream,
270        &context.tls_session_store,
271    )
272    .await
273    .context("STARTTLS upgrade failed")?;
274    let buffered_stream = BufStream::new(tls_stream);
275    let session_stream: Box<dyn SessionBufStream> = Box::new(buffered_stream);
276    Ok(session_stream)
277}
278
279async fn connect_insecure_proxy(
280    context: &Context,
281    hostname: &str,
282    port: u16,
283    proxy_config: ProxyConfig,
284) -> Result<Box<dyn SessionBufStream>> {
285    let proxy_stream = proxy_config.connect(context, hostname, port, false).await?;
286    let mut buffered_stream = BufStream::new(proxy_stream);
287    skip_smtp_greeting(&mut buffered_stream).await?;
288    let session_stream: Box<dyn SessionBufStream> = Box::new(buffered_stream);
289    Ok(session_stream)
290}
291
292async fn connect_secure(
293    addr: SocketAddr,
294    hostname: &str,
295    strict_tls: bool,
296    tls_session_store: &TlsSessionStore,
297) -> Result<Box<dyn SessionBufStream>> {
298    let tls_stream = connect_tls_inner(
299        addr,
300        hostname,
301        strict_tls,
302        alpn(addr.port()),
303        tls_session_store,
304    )
305    .await?;
306    let mut buffered_stream = BufStream::new(tls_stream);
307    skip_smtp_greeting(&mut buffered_stream).await?;
308    let session_stream: Box<dyn SessionBufStream> = Box::new(buffered_stream);
309    Ok(session_stream)
310}
311
312async fn connect_starttls(
313    addr: SocketAddr,
314    host: &str,
315    strict_tls: bool,
316    tls_session_store: &TlsSessionStore,
317) -> Result<Box<dyn SessionBufStream>> {
318    let tcp_stream = connect_tcp_inner(addr).await?;
319
320    // Run STARTTLS command and convert the client back into a stream.
321    let mut buffered_stream = BufStream::new(tcp_stream);
322    skip_smtp_greeting(&mut buffered_stream).await?;
323    let transport = new_smtp_transport(buffered_stream).await?;
324    let tcp_stream = transport.starttls().await?.into_inner();
325    let tls_stream = wrap_tls(
326        strict_tls,
327        host,
328        addr.port(),
329        "",
330        tcp_stream,
331        tls_session_store,
332    )
333    .await
334    .context("STARTTLS upgrade failed")?;
335
336    let buffered_stream = BufStream::new(tls_stream);
337    let session_stream: Box<dyn SessionBufStream> = Box::new(buffered_stream);
338    Ok(session_stream)
339}
340
341async fn connect_insecure(addr: SocketAddr) -> Result<Box<dyn SessionBufStream>> {
342    let tcp_stream = connect_tcp_inner(addr).await?;
343    let mut buffered_stream = BufStream::new(tcp_stream);
344    skip_smtp_greeting(&mut buffered_stream).await?;
345    let session_stream: Box<dyn SessionBufStream> = Box::new(buffered_stream);
346    Ok(session_stream)
347}
348
#[cfg(test)]
mod tests {
    use tokio::io::BufReader;

    use super::*;

    /// A greeting consisting of two `220-` continuation lines
    /// followed by a final `220 ` line must be accepted.
    #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
    async fn test_skip_smtp_greeting() -> Result<()> {
        const GREETING: &[u8] = b"220-server261.web-hosting.com ESMTP Exim 4.96.2 #2 Sat, 24 Aug 2024 12:25:53 -0400 \r\n\
                                  220-We do not authorize the use of this system to transport unsolicited,\r\n\
                                  220 and/or bulk e-mail.\r\n";
        let mut reader = BufReader::new(GREETING);
        skip_smtp_greeting(&mut reader).await?;
        Ok(())
    }
}