1use std::net::SocketAddr;
4
5use anyhow::{bail, Context as _, Result};
6use async_smtp::{SmtpClient, SmtpTransport};
7use tokio::io::{AsyncBufRead, AsyncWrite, BufStream};
8
9use crate::context::Context;
10use crate::log::{info, warn};
11use crate::login_param::{ConnectionCandidate, ConnectionSecurity};
12use crate::net::dns::{lookup_host_with_cache, update_connect_timestamp};
13use crate::net::proxy::ProxyConfig;
14use crate::net::session::SessionBufStream;
15use crate::net::tls::wrap_tls;
16use crate::net::{
17 connect_tcp_inner, connect_tls_inner, run_connection_attempts, update_connection_history,
18};
19use crate::oauth2::get_oauth2_access_token;
20use crate::tools::time;
21
/// Selects the ALPN protocol list passed to the TLS layer for `port`.
///
/// Port 465 gets an empty list, i.e. no ALPN protocol is offered there.
/// Every other port gets the single protocol id `"smtp"`.
fn alpn(port: u16) -> &'static [&'static str] {
    match port {
        465 => &[],
        _ => &["smtp"],
    }
}
31
32async fn new_smtp_transport<S: AsyncBufRead + AsyncWrite + Unpin>(
35 stream: S,
36) -> Result<SmtpTransport<S>> {
37 let client = SmtpClient::new().smtp_utf8(true).without_greeting();
42
43 let transport = SmtpTransport::new(client, stream)
44 .await
45 .context("Failed to send EHLO command")?;
46 Ok(transport)
47}
48
49#[expect(clippy::too_many_arguments)]
50pub(crate) async fn connect_and_auth(
51 context: &Context,
52 proxy_config: &Option<ProxyConfig>,
53 strict_tls: bool,
54 candidate: ConnectionCandidate,
55 oauth2: bool,
56 addr: &str,
57 user: &str,
58 password: &str,
59) -> Result<SmtpTransport<Box<dyn SessionBufStream>>> {
60 let session_stream = connect_stream(context, proxy_config.clone(), strict_tls, candidate)
61 .await
62 .context("SMTP failed to connect")?;
63 let mut transport = new_smtp_transport(session_stream).await?;
64
65 let (creds, mechanism) = if oauth2 {
67 let access_token = get_oauth2_access_token(context, addr, password, false)
69 .await
70 .context("SMTP failed to get OAUTH2 access token")?;
71 if access_token.is_none() {
72 bail!("SMTP OAuth 2 error {}", addr);
73 }
74 (
75 async_smtp::authentication::Credentials::new(
76 user.to_string(),
77 access_token.unwrap_or_default(),
78 ),
79 vec![async_smtp::authentication::Mechanism::Xoauth2],
80 )
81 } else {
82 (
84 async_smtp::authentication::Credentials::new(user.to_string(), password.to_string()),
85 vec![
86 async_smtp::authentication::Mechanism::Plain,
87 async_smtp::authentication::Mechanism::Login,
88 ],
89 )
90 };
91 transport
92 .try_login(&creds, &mechanism)
93 .await
94 .context("SMTP failed to login")?;
95 Ok(transport)
96}
97
98async fn connection_attempt(
99 context: Context,
100 host: String,
101 security: ConnectionSecurity,
102 resolved_addr: SocketAddr,
103 strict_tls: bool,
104) -> Result<Box<dyn SessionBufStream>> {
105 let context = &context;
106 let host = &host;
107 info!(
108 context,
109 "Attempting SMTP connection to {host} ({resolved_addr})."
110 );
111 let res = match security {
112 ConnectionSecurity::Tls => connect_secure(resolved_addr, host, strict_tls).await,
113 ConnectionSecurity::Starttls => connect_starttls(resolved_addr, host, strict_tls).await,
114 ConnectionSecurity::Plain => connect_insecure(resolved_addr).await,
115 };
116 match res {
117 Ok(stream) => {
118 let ip_addr = resolved_addr.ip().to_string();
119 let port = resolved_addr.port();
120
121 let save_cache = match security {
122 ConnectionSecurity::Tls | ConnectionSecurity::Starttls => strict_tls,
123 ConnectionSecurity::Plain => false,
124 };
125 if save_cache {
126 update_connect_timestamp(context, host, &ip_addr).await?;
127 }
128 update_connection_history(context, "smtp", host, port, &ip_addr, time()).await?;
129 Ok(stream)
130 }
131 Err(err) => {
132 warn!(
133 context,
134 "Failed to connect to {host} ({resolved_addr}): {err:#}."
135 );
136 Err(err)
137 }
138 }
139}
140
141async fn connect_stream(
150 context: &Context,
151 proxy_config: Option<ProxyConfig>,
152 strict_tls: bool,
153 candidate: ConnectionCandidate,
154) -> Result<Box<dyn SessionBufStream>> {
155 let host = &candidate.host;
156 let port = candidate.port;
157 let security = candidate.security;
158
159 if let Some(proxy_config) = proxy_config {
160 let stream = match security {
161 ConnectionSecurity::Tls => {
162 connect_secure_proxy(context, host, port, strict_tls, proxy_config.clone()).await?
163 }
164 ConnectionSecurity::Starttls => {
165 connect_starttls_proxy(context, host, port, strict_tls, proxy_config.clone())
166 .await?
167 }
168 ConnectionSecurity::Plain => {
169 connect_insecure_proxy(context, host, port, proxy_config.clone()).await?
170 }
171 };
172 update_connection_history(context, "smtp", host, port, host, time()).await?;
173 Ok(stream)
174 } else {
175 let load_cache = match security {
176 ConnectionSecurity::Tls | ConnectionSecurity::Starttls => strict_tls,
177 ConnectionSecurity::Plain => false,
178 };
179
180 let connection_futures = lookup_host_with_cache(context, host, port, "smtp", load_cache)
181 .await?
182 .into_iter()
183 .map(|resolved_addr| {
184 let context = context.clone();
185 let host = host.to_string();
186 connection_attempt(context, host, security, resolved_addr, strict_tls)
187 });
188 run_connection_attempts(connection_futures).await
189 }
190}
191
192async fn skip_smtp_greeting<R: tokio::io::AsyncBufReadExt + Unpin>(stream: &mut R) -> Result<()> {
199 let mut line = String::with_capacity(512);
200 loop {
201 line.clear();
202 let read = stream
203 .read_line(&mut line)
204 .await
205 .context("Failed to read from stream while waiting for SMTP greeting")?;
206 if read == 0 {
207 bail!("Unexpected EOF while reading SMTP greeting");
208 }
209 if line.starts_with("220-") {
210 continue;
211 } else if line.starts_with("220 ") {
212 return Ok(());
213 } else {
214 bail!("Unexpected greeting: {line:?}");
215 }
216 }
217}
218
219async fn connect_secure_proxy(
220 context: &Context,
221 hostname: &str,
222 port: u16,
223 strict_tls: bool,
224 proxy_config: ProxyConfig,
225) -> Result<Box<dyn SessionBufStream>> {
226 let proxy_stream = proxy_config
227 .connect(context, hostname, port, strict_tls)
228 .await?;
229 let tls_stream = wrap_tls(strict_tls, hostname, alpn(port), proxy_stream).await?;
230 let mut buffered_stream = BufStream::new(tls_stream);
231 skip_smtp_greeting(&mut buffered_stream).await?;
232 let session_stream: Box<dyn SessionBufStream> = Box::new(buffered_stream);
233 Ok(session_stream)
234}
235
236async fn connect_starttls_proxy(
237 context: &Context,
238 hostname: &str,
239 port: u16,
240 strict_tls: bool,
241 proxy_config: ProxyConfig,
242) -> Result<Box<dyn SessionBufStream>> {
243 let proxy_stream = proxy_config
244 .connect(context, hostname, port, strict_tls)
245 .await?;
246
247 let mut buffered_stream = BufStream::new(proxy_stream);
249 skip_smtp_greeting(&mut buffered_stream).await?;
250 let transport = new_smtp_transport(buffered_stream).await?;
251 let tcp_stream = transport.starttls().await?.into_inner();
252 let tls_stream = wrap_tls(strict_tls, hostname, &[], tcp_stream)
253 .await
254 .context("STARTTLS upgrade failed")?;
255 let buffered_stream = BufStream::new(tls_stream);
256 let session_stream: Box<dyn SessionBufStream> = Box::new(buffered_stream);
257 Ok(session_stream)
258}
259
260async fn connect_insecure_proxy(
261 context: &Context,
262 hostname: &str,
263 port: u16,
264 proxy_config: ProxyConfig,
265) -> Result<Box<dyn SessionBufStream>> {
266 let proxy_stream = proxy_config.connect(context, hostname, port, false).await?;
267 let mut buffered_stream = BufStream::new(proxy_stream);
268 skip_smtp_greeting(&mut buffered_stream).await?;
269 let session_stream: Box<dyn SessionBufStream> = Box::new(buffered_stream);
270 Ok(session_stream)
271}
272
273async fn connect_secure(
274 addr: SocketAddr,
275 hostname: &str,
276 strict_tls: bool,
277) -> Result<Box<dyn SessionBufStream>> {
278 let tls_stream = connect_tls_inner(addr, hostname, strict_tls, alpn(addr.port())).await?;
279 let mut buffered_stream = BufStream::new(tls_stream);
280 skip_smtp_greeting(&mut buffered_stream).await?;
281 let session_stream: Box<dyn SessionBufStream> = Box::new(buffered_stream);
282 Ok(session_stream)
283}
284
285async fn connect_starttls(
286 addr: SocketAddr,
287 host: &str,
288 strict_tls: bool,
289) -> Result<Box<dyn SessionBufStream>> {
290 let tcp_stream = connect_tcp_inner(addr).await?;
291
292 let mut buffered_stream = BufStream::new(tcp_stream);
294 skip_smtp_greeting(&mut buffered_stream).await?;
295 let transport = new_smtp_transport(buffered_stream).await?;
296 let tcp_stream = transport.starttls().await?.into_inner();
297 let tls_stream = wrap_tls(strict_tls, host, &[], tcp_stream)
298 .await
299 .context("STARTTLS upgrade failed")?;
300
301 let buffered_stream = BufStream::new(tls_stream);
302 let session_stream: Box<dyn SessionBufStream> = Box::new(buffered_stream);
303 Ok(session_stream)
304}
305
306async fn connect_insecure(addr: SocketAddr) -> Result<Box<dyn SessionBufStream>> {
307 let tcp_stream = connect_tcp_inner(addr).await?;
308 let mut buffered_stream = BufStream::new(tcp_stream);
309 skip_smtp_greeting(&mut buffered_stream).await?;
310 let session_stream: Box<dyn SessionBufStream> = Box::new(buffered_stream);
311 Ok(session_stream)
312}
313
#[cfg(test)]
mod tests {
    use tokio::io::BufReader;

    use super::*;

    /// A multi-line greeting (two `220-` continuation lines followed by a
    /// final `220 ` line) must be consumed without an error.
    #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
    async fn test_skip_smtp_greeting() -> Result<()> {
        let greeting = b"220-server261.web-hosting.com ESMTP Exim 4.96.2 #2 Sat, 24 Aug 2024 12:25:53 -0400 \r\n\
                         220-We do not authorize the use of this system to transport unsolicited,\r\n\
                         220 and/or bulk e-mail.\r\n";
        let mut reader = BufReader::new(&greeting[..]);
        skip_smtp_greeting(&mut reader).await?;
        Ok(())
    }
}