1use std::net::SocketAddr;
4
5use anyhow::{Context as _, Result, bail};
6use async_smtp::{SmtpClient, SmtpTransport};
7use tokio::io::{AsyncBufRead, AsyncWrite, BufStream};
8
9use crate::context::Context;
10use crate::log::warn;
11use crate::net::dns::{lookup_host_with_cache, update_connect_timestamp};
12use crate::net::proxy::ProxyConfig;
13use crate::net::session::SessionBufStream;
14use crate::net::tls::{SpkiHashStore, TlsSessionStore, wrap_tls};
15use crate::net::{
16 connect_tcp_inner, connect_tls_inner, run_connection_attempts, update_connection_history,
17};
18use crate::oauth2::get_oauth2_access_token;
19use crate::sql::Sql;
20use crate::tools::time;
21use crate::transport::ConnectionCandidate;
22use crate::transport::ConnectionSecurity;
23
/// Returns the ALPN string that this file passes to the TLS helpers for `port`.
///
/// Port 465 maps to the empty string; every other port maps to `"smtp"`.
fn alpn(port: u16) -> &'static str {
    match port {
        // NOTE(review): 465 is presumably singled out as the standard
        // implicit-TLS SMTP port, where no ALPN should be requested — confirm.
        465 => "",
        _ => "smtp",
    }
}
33
34async fn new_smtp_transport<S: AsyncBufRead + AsyncWrite + Unpin>(
37 stream: S,
38) -> Result<SmtpTransport<S>> {
39 let client = SmtpClient::new().smtp_utf8(true).without_greeting();
44
45 let transport = SmtpTransport::new(client, stream)
46 .await
47 .context("Failed to send EHLO command")?;
48 Ok(transport)
49}
50
51#[expect(clippy::too_many_arguments)]
52pub(crate) async fn connect_and_auth(
53 context: &Context,
54 proxy_config: &Option<ProxyConfig>,
55 strict_tls: bool,
56 candidate: ConnectionCandidate,
57 oauth2: bool,
58 addr: &str,
59 user: &str,
60 password: &str,
61) -> Result<SmtpTransport<Box<dyn SessionBufStream>>> {
62 let session_stream = connect_stream(context, proxy_config.clone(), strict_tls, candidate)
63 .await
64 .context("SMTP failed to connect")?;
65 let mut transport = new_smtp_transport(session_stream).await?;
66
67 let (creds, mechanism) = if oauth2 {
69 let access_token = get_oauth2_access_token(context, addr, password, false)
71 .await
72 .context("SMTP failed to get OAUTH2 access token")?;
73 if access_token.is_none() {
74 bail!("SMTP OAuth 2 error {addr}");
75 }
76 (
77 async_smtp::authentication::Credentials::new(
78 user.to_string(),
79 access_token.unwrap_or_default(),
80 ),
81 vec![async_smtp::authentication::Mechanism::Xoauth2],
82 )
83 } else {
84 (
86 async_smtp::authentication::Credentials::new(user.to_string(), password.to_string()),
87 vec![
88 async_smtp::authentication::Mechanism::Plain,
89 async_smtp::authentication::Mechanism::Login,
90 ],
91 )
92 };
93 transport
94 .try_login(&creds, &mechanism)
95 .await
96 .context("SMTP failed to login")?;
97 Ok(transport)
98}
99
100async fn connection_attempt(
101 context: Context,
102 host: String,
103 security: ConnectionSecurity,
104 resolved_addr: SocketAddr,
105 strict_tls: bool,
106) -> Result<Box<dyn SessionBufStream>> {
107 let context = &context;
108 let host = &host;
109 info!(
110 context,
111 "Attempting SMTP connection to {host} ({resolved_addr})."
112 );
113 let res = match security {
114 ConnectionSecurity::Tls => {
115 connect_secure(
116 resolved_addr,
117 host,
118 strict_tls,
119 &context.tls_session_store,
120 &context.spki_hash_store,
121 &context.sql,
122 )
123 .await
124 }
125 ConnectionSecurity::Starttls => {
126 connect_starttls(
127 resolved_addr,
128 host,
129 strict_tls,
130 &context.tls_session_store,
131 &context.spki_hash_store,
132 &context.sql,
133 )
134 .await
135 }
136 ConnectionSecurity::Plain => connect_insecure(resolved_addr).await,
137 };
138 match res {
139 Ok(stream) => {
140 let ip_addr = resolved_addr.ip().to_string();
141 let port = resolved_addr.port();
142
143 let save_cache = match security {
144 ConnectionSecurity::Tls | ConnectionSecurity::Starttls => strict_tls,
145 ConnectionSecurity::Plain => false,
146 };
147 if save_cache {
148 update_connect_timestamp(context, host, &ip_addr).await?;
149 }
150 update_connection_history(context, "smtp", host, port, &ip_addr, time()).await?;
151 Ok(stream)
152 }
153 Err(err) => {
154 warn!(
155 context,
156 "SMTP failed to connect to {host} ({resolved_addr}): {err:#}."
157 );
158 Err(err)
159 }
160 }
161}
162
163async fn connect_stream(
172 context: &Context,
173 proxy_config: Option<ProxyConfig>,
174 strict_tls: bool,
175 candidate: ConnectionCandidate,
176) -> Result<Box<dyn SessionBufStream>> {
177 let host = &candidate.host;
178 let port = candidate.port;
179 let security = candidate.security;
180
181 if let Some(proxy_config) = proxy_config {
182 let stream = match security {
183 ConnectionSecurity::Tls => {
184 connect_secure_proxy(context, host, port, strict_tls, proxy_config.clone()).await?
185 }
186 ConnectionSecurity::Starttls => {
187 connect_starttls_proxy(context, host, port, strict_tls, proxy_config.clone())
188 .await?
189 }
190 ConnectionSecurity::Plain => {
191 connect_insecure_proxy(context, host, port, proxy_config.clone()).await?
192 }
193 };
194 update_connection_history(context, "smtp", host, port, host, time()).await?;
195 Ok(stream)
196 } else {
197 let load_cache = match security {
198 ConnectionSecurity::Tls | ConnectionSecurity::Starttls => strict_tls,
199 ConnectionSecurity::Plain => false,
200 };
201
202 let connection_futures = lookup_host_with_cache(context, host, port, "smtp", load_cache)
203 .await?
204 .into_iter()
205 .map(|resolved_addr| {
206 let context = context.clone();
207 let host = host.to_string();
208 connection_attempt(context, host, security, resolved_addr, strict_tls)
209 });
210 run_connection_attempts(connection_futures).await
211 }
212}
213
214async fn skip_smtp_greeting<R: tokio::io::AsyncBufReadExt + Unpin>(stream: &mut R) -> Result<()> {
221 let mut line = String::with_capacity(512);
222 loop {
223 line.clear();
224 let read = stream
225 .read_line(&mut line)
226 .await
227 .context("Failed to read from stream while waiting for SMTP greeting")?;
228 if read == 0 {
229 bail!("Unexpected EOF while reading SMTP greeting");
230 }
231 if line.starts_with("220-") {
232 continue;
233 } else if line.starts_with("220 ") {
234 return Ok(());
235 } else {
236 bail!("Unexpected greeting: {line:?}");
237 }
238 }
239}
240
241async fn connect_secure_proxy(
242 context: &Context,
243 hostname: &str,
244 port: u16,
245 strict_tls: bool,
246 proxy_config: ProxyConfig,
247) -> Result<Box<dyn SessionBufStream>> {
248 let use_sni = true;
249 let proxy_stream = proxy_config
250 .connect(context, hostname, port, strict_tls)
251 .await?;
252 let tls_stream = wrap_tls(
253 strict_tls,
254 hostname,
255 port,
256 use_sni,
257 alpn(port),
258 proxy_stream,
259 &context.tls_session_store,
260 &context.spki_hash_store,
261 &context.sql,
262 )
263 .await?;
264 let mut buffered_stream = BufStream::new(tls_stream);
265 skip_smtp_greeting(&mut buffered_stream).await?;
266 let session_stream: Box<dyn SessionBufStream> = Box::new(buffered_stream);
267 Ok(session_stream)
268}
269
270async fn connect_starttls_proxy(
271 context: &Context,
272 hostname: &str,
273 port: u16,
274 strict_tls: bool,
275 proxy_config: ProxyConfig,
276) -> Result<Box<dyn SessionBufStream>> {
277 let use_sni = false;
278 let proxy_stream = proxy_config
279 .connect(context, hostname, port, strict_tls)
280 .await?;
281
282 let mut buffered_stream = BufStream::new(proxy_stream);
284 skip_smtp_greeting(&mut buffered_stream).await?;
285 let transport = new_smtp_transport(buffered_stream).await?;
286 let tcp_stream = transport.starttls().await?.into_inner();
287 let tls_stream = wrap_tls(
288 strict_tls,
289 hostname,
290 port,
291 use_sni,
292 "",
293 tcp_stream,
294 &context.tls_session_store,
295 &context.spki_hash_store,
296 &context.sql,
297 )
298 .await
299 .context("STARTTLS upgrade failed")?;
300 let buffered_stream = BufStream::new(tls_stream);
301 let session_stream: Box<dyn SessionBufStream> = Box::new(buffered_stream);
302 Ok(session_stream)
303}
304
305async fn connect_insecure_proxy(
306 context: &Context,
307 hostname: &str,
308 port: u16,
309 proxy_config: ProxyConfig,
310) -> Result<Box<dyn SessionBufStream>> {
311 let proxy_stream = proxy_config.connect(context, hostname, port, false).await?;
312 let mut buffered_stream = BufStream::new(proxy_stream);
313 skip_smtp_greeting(&mut buffered_stream).await?;
314 let session_stream: Box<dyn SessionBufStream> = Box::new(buffered_stream);
315 Ok(session_stream)
316}
317
318async fn connect_secure(
319 addr: SocketAddr,
320 hostname: &str,
321 strict_tls: bool,
322 tls_session_store: &TlsSessionStore,
323 spki_hash_store: &SpkiHashStore,
324 sql: &Sql,
325) -> Result<Box<dyn SessionBufStream>> {
326 let tls_stream = connect_tls_inner(
327 addr,
328 hostname,
329 strict_tls,
330 alpn(addr.port()),
331 tls_session_store,
332 spki_hash_store,
333 sql,
334 )
335 .await?;
336 let mut buffered_stream = BufStream::new(tls_stream);
337 skip_smtp_greeting(&mut buffered_stream).await?;
338 let session_stream: Box<dyn SessionBufStream> = Box::new(buffered_stream);
339 Ok(session_stream)
340}
341
342async fn connect_starttls(
343 addr: SocketAddr,
344 host: &str,
345 strict_tls: bool,
346 tls_session_store: &TlsSessionStore,
347 spki_hash_store: &SpkiHashStore,
348 sql: &Sql,
349) -> Result<Box<dyn SessionBufStream>> {
350 let use_sni = false;
351 let tcp_stream = connect_tcp_inner(addr).await?;
352
353 let mut buffered_stream = BufStream::new(tcp_stream);
355 skip_smtp_greeting(&mut buffered_stream).await?;
356 let transport = new_smtp_transport(buffered_stream).await?;
357 let tcp_stream = transport.starttls().await?.into_inner();
358 let tls_stream = wrap_tls(
359 strict_tls,
360 host,
361 addr.port(),
362 use_sni,
363 "",
364 tcp_stream,
365 tls_session_store,
366 spki_hash_store,
367 sql,
368 )
369 .await
370 .context("STARTTLS upgrade failed")?;
371
372 let buffered_stream = BufStream::new(tls_stream);
373 let session_stream: Box<dyn SessionBufStream> = Box::new(buffered_stream);
374 Ok(session_stream)
375}
376
377async fn connect_insecure(addr: SocketAddr) -> Result<Box<dyn SessionBufStream>> {
378 let tcp_stream = connect_tcp_inner(addr).await?;
379 let mut buffered_stream = BufStream::new(tcp_stream);
380 skip_smtp_greeting(&mut buffered_stream).await?;
381 let session_stream: Box<dyn SessionBufStream> = Box::new(buffered_stream);
382 Ok(session_stream)
383}
384
#[cfg(test)]
mod tests {
    use tokio::io::BufReader;

    use super::*;

    /// A multi-line greeting, two `220-` continuation lines followed by a
    /// final `220 ` line, must be consumed without error.
    #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
    async fn test_skip_smtp_greeting() -> Result<()> {
        let greeting: &[u8] = b"220-server261.web-hosting.com ESMTP Exim 4.96.2 #2 Sat, 24 Aug 2024 12:25:53 -0400 \r\n\
                                220-We do not authorize the use of this system to transport unsolicited,\r\n\
                                220 and/or bulk e-mail.\r\n";
        let mut reader = BufReader::new(greeting);
        skip_smtp_greeting(&mut reader).await?;
        Ok(())
    }
}