1use std::net::SocketAddr;
4
5use anyhow::{Context as _, Result, bail};
6use async_smtp::{SmtpClient, SmtpTransport};
7use tokio::io::{AsyncBufRead, AsyncWrite, BufStream};
8
9use crate::context::Context;
10use crate::log::warn;
11use crate::net::dns::{lookup_host_with_cache, update_connect_timestamp};
12use crate::net::proxy::ProxyConfig;
13use crate::net::session::SessionBufStream;
14use crate::net::tls::{TlsSessionStore, wrap_tls};
15use crate::net::{
16 connect_tcp_inner, connect_tls_inner, run_connection_attempts, update_connection_history,
17};
18use crate::oauth2::get_oauth2_access_token;
19use crate::tools::time;
20use crate::transport::ConnectionCandidate;
21use crate::transport::ConnectionSecurity;
22
23fn alpn(port: u16) -> &'static str {
25 if port == 465 {
26 ""
28 } else {
29 "smtp"
30 }
31}
32
33async fn new_smtp_transport<S: AsyncBufRead + AsyncWrite + Unpin>(
36 stream: S,
37) -> Result<SmtpTransport<S>> {
38 let client = SmtpClient::new().smtp_utf8(true).without_greeting();
43
44 let transport = SmtpTransport::new(client, stream)
45 .await
46 .context("Failed to send EHLO command")?;
47 Ok(transport)
48}
49
50#[expect(clippy::too_many_arguments)]
51pub(crate) async fn connect_and_auth(
52 context: &Context,
53 proxy_config: &Option<ProxyConfig>,
54 strict_tls: bool,
55 candidate: ConnectionCandidate,
56 oauth2: bool,
57 addr: &str,
58 user: &str,
59 password: &str,
60) -> Result<SmtpTransport<Box<dyn SessionBufStream>>> {
61 let session_stream = connect_stream(context, proxy_config.clone(), strict_tls, candidate)
62 .await
63 .context("SMTP failed to connect")?;
64 let mut transport = new_smtp_transport(session_stream).await?;
65
66 let (creds, mechanism) = if oauth2 {
68 let access_token = get_oauth2_access_token(context, addr, password, false)
70 .await
71 .context("SMTP failed to get OAUTH2 access token")?;
72 if access_token.is_none() {
73 bail!("SMTP OAuth 2 error {addr}");
74 }
75 (
76 async_smtp::authentication::Credentials::new(
77 user.to_string(),
78 access_token.unwrap_or_default(),
79 ),
80 vec![async_smtp::authentication::Mechanism::Xoauth2],
81 )
82 } else {
83 (
85 async_smtp::authentication::Credentials::new(user.to_string(), password.to_string()),
86 vec![
87 async_smtp::authentication::Mechanism::Plain,
88 async_smtp::authentication::Mechanism::Login,
89 ],
90 )
91 };
92 transport
93 .try_login(&creds, &mechanism)
94 .await
95 .context("SMTP failed to login")?;
96 Ok(transport)
97}
98
99async fn connection_attempt(
100 context: Context,
101 host: String,
102 security: ConnectionSecurity,
103 resolved_addr: SocketAddr,
104 strict_tls: bool,
105) -> Result<Box<dyn SessionBufStream>> {
106 let context = &context;
107 let host = &host;
108 info!(
109 context,
110 "Attempting SMTP connection to {host} ({resolved_addr})."
111 );
112 let res = match security {
113 ConnectionSecurity::Tls => {
114 connect_secure(resolved_addr, host, strict_tls, &context.tls_session_store).await
115 }
116 ConnectionSecurity::Starttls => {
117 connect_starttls(resolved_addr, host, strict_tls, &context.tls_session_store).await
118 }
119 ConnectionSecurity::Plain => connect_insecure(resolved_addr).await,
120 };
121 match res {
122 Ok(stream) => {
123 let ip_addr = resolved_addr.ip().to_string();
124 let port = resolved_addr.port();
125
126 let save_cache = match security {
127 ConnectionSecurity::Tls | ConnectionSecurity::Starttls => strict_tls,
128 ConnectionSecurity::Plain => false,
129 };
130 if save_cache {
131 update_connect_timestamp(context, host, &ip_addr).await?;
132 }
133 update_connection_history(context, "smtp", host, port, &ip_addr, time()).await?;
134 Ok(stream)
135 }
136 Err(err) => {
137 warn!(
138 context,
139 "Failed to connect to {host} ({resolved_addr}): {err:#}."
140 );
141 Err(err)
142 }
143 }
144}
145
146async fn connect_stream(
155 context: &Context,
156 proxy_config: Option<ProxyConfig>,
157 strict_tls: bool,
158 candidate: ConnectionCandidate,
159) -> Result<Box<dyn SessionBufStream>> {
160 let host = &candidate.host;
161 let port = candidate.port;
162 let security = candidate.security;
163
164 if let Some(proxy_config) = proxy_config {
165 let stream = match security {
166 ConnectionSecurity::Tls => {
167 connect_secure_proxy(context, host, port, strict_tls, proxy_config.clone()).await?
168 }
169 ConnectionSecurity::Starttls => {
170 connect_starttls_proxy(context, host, port, strict_tls, proxy_config.clone())
171 .await?
172 }
173 ConnectionSecurity::Plain => {
174 connect_insecure_proxy(context, host, port, proxy_config.clone()).await?
175 }
176 };
177 update_connection_history(context, "smtp", host, port, host, time()).await?;
178 Ok(stream)
179 } else {
180 let load_cache = match security {
181 ConnectionSecurity::Tls | ConnectionSecurity::Starttls => strict_tls,
182 ConnectionSecurity::Plain => false,
183 };
184
185 let connection_futures = lookup_host_with_cache(context, host, port, "smtp", load_cache)
186 .await?
187 .into_iter()
188 .map(|resolved_addr| {
189 let context = context.clone();
190 let host = host.to_string();
191 connection_attempt(context, host, security, resolved_addr, strict_tls)
192 });
193 run_connection_attempts(connection_futures).await
194 }
195}
196
197async fn skip_smtp_greeting<R: tokio::io::AsyncBufReadExt + Unpin>(stream: &mut R) -> Result<()> {
204 let mut line = String::with_capacity(512);
205 loop {
206 line.clear();
207 let read = stream
208 .read_line(&mut line)
209 .await
210 .context("Failed to read from stream while waiting for SMTP greeting")?;
211 if read == 0 {
212 bail!("Unexpected EOF while reading SMTP greeting");
213 }
214 if line.starts_with("220-") {
215 continue;
216 } else if line.starts_with("220 ") {
217 return Ok(());
218 } else {
219 bail!("Unexpected greeting: {line:?}");
220 }
221 }
222}
223
224async fn connect_secure_proxy(
225 context: &Context,
226 hostname: &str,
227 port: u16,
228 strict_tls: bool,
229 proxy_config: ProxyConfig,
230) -> Result<Box<dyn SessionBufStream>> {
231 let use_sni = true;
232 let proxy_stream = proxy_config
233 .connect(context, hostname, port, strict_tls)
234 .await?;
235 let tls_stream = wrap_tls(
236 strict_tls,
237 hostname,
238 port,
239 use_sni,
240 alpn(port),
241 proxy_stream,
242 &context.tls_session_store,
243 )
244 .await?;
245 let mut buffered_stream = BufStream::new(tls_stream);
246 skip_smtp_greeting(&mut buffered_stream).await?;
247 let session_stream: Box<dyn SessionBufStream> = Box::new(buffered_stream);
248 Ok(session_stream)
249}
250
251async fn connect_starttls_proxy(
252 context: &Context,
253 hostname: &str,
254 port: u16,
255 strict_tls: bool,
256 proxy_config: ProxyConfig,
257) -> Result<Box<dyn SessionBufStream>> {
258 let use_sni = false;
259 let proxy_stream = proxy_config
260 .connect(context, hostname, port, strict_tls)
261 .await?;
262
263 let mut buffered_stream = BufStream::new(proxy_stream);
265 skip_smtp_greeting(&mut buffered_stream).await?;
266 let transport = new_smtp_transport(buffered_stream).await?;
267 let tcp_stream = transport.starttls().await?.into_inner();
268 let tls_stream = wrap_tls(
269 strict_tls,
270 hostname,
271 port,
272 use_sni,
273 "",
274 tcp_stream,
275 &context.tls_session_store,
276 )
277 .await
278 .context("STARTTLS upgrade failed")?;
279 let buffered_stream = BufStream::new(tls_stream);
280 let session_stream: Box<dyn SessionBufStream> = Box::new(buffered_stream);
281 Ok(session_stream)
282}
283
284async fn connect_insecure_proxy(
285 context: &Context,
286 hostname: &str,
287 port: u16,
288 proxy_config: ProxyConfig,
289) -> Result<Box<dyn SessionBufStream>> {
290 let proxy_stream = proxy_config.connect(context, hostname, port, false).await?;
291 let mut buffered_stream = BufStream::new(proxy_stream);
292 skip_smtp_greeting(&mut buffered_stream).await?;
293 let session_stream: Box<dyn SessionBufStream> = Box::new(buffered_stream);
294 Ok(session_stream)
295}
296
297async fn connect_secure(
298 addr: SocketAddr,
299 hostname: &str,
300 strict_tls: bool,
301 tls_session_store: &TlsSessionStore,
302) -> Result<Box<dyn SessionBufStream>> {
303 let tls_stream = connect_tls_inner(
304 addr,
305 hostname,
306 strict_tls,
307 alpn(addr.port()),
308 tls_session_store,
309 )
310 .await?;
311 let mut buffered_stream = BufStream::new(tls_stream);
312 skip_smtp_greeting(&mut buffered_stream).await?;
313 let session_stream: Box<dyn SessionBufStream> = Box::new(buffered_stream);
314 Ok(session_stream)
315}
316
317async fn connect_starttls(
318 addr: SocketAddr,
319 host: &str,
320 strict_tls: bool,
321 tls_session_store: &TlsSessionStore,
322) -> Result<Box<dyn SessionBufStream>> {
323 let use_sni = false;
324 let tcp_stream = connect_tcp_inner(addr).await?;
325
326 let mut buffered_stream = BufStream::new(tcp_stream);
328 skip_smtp_greeting(&mut buffered_stream).await?;
329 let transport = new_smtp_transport(buffered_stream).await?;
330 let tcp_stream = transport.starttls().await?.into_inner();
331 let tls_stream = wrap_tls(
332 strict_tls,
333 host,
334 addr.port(),
335 use_sni,
336 "",
337 tcp_stream,
338 tls_session_store,
339 )
340 .await
341 .context("STARTTLS upgrade failed")?;
342
343 let buffered_stream = BufStream::new(tls_stream);
344 let session_stream: Box<dyn SessionBufStream> = Box::new(buffered_stream);
345 Ok(session_stream)
346}
347
348async fn connect_insecure(addr: SocketAddr) -> Result<Box<dyn SessionBufStream>> {
349 let tcp_stream = connect_tcp_inner(addr).await?;
350 let mut buffered_stream = BufStream::new(tcp_stream);
351 skip_smtp_greeting(&mut buffered_stream).await?;
352 let session_stream: Box<dyn SessionBufStream> = Box::new(buffered_stream);
353 Ok(session_stream)
354}
355
356#[cfg(test)]
357mod tests {
358 use tokio::io::BufReader;
359
360 use super::*;
361
362 #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
363 async fn test_skip_smtp_greeting() -> Result<()> {
364 let greeting = b"220-server261.web-hosting.com ESMTP Exim 4.96.2 #2 Sat, 24 Aug 2024 12:25:53 -0400 \r\n\
365 220-We do not authorize the use of this system to transport unsolicited,\r\n\
366 220 and/or bulk e-mail.\r\n";
367 let mut buffered_stream = BufReader::new(&greeting[..]);
368 skip_smtp_greeting(&mut buffered_stream).await
369 }
370}