1use std::net::SocketAddr;
4
5use anyhow::{bail, Context as _, Result};
6use async_smtp::{SmtpClient, SmtpTransport};
7use tokio::io::{AsyncBufRead, AsyncWrite, BufStream};
8
9use crate::context::Context;
10use crate::login_param::{ConnectionCandidate, ConnectionSecurity};
11use crate::net::dns::{lookup_host_with_cache, update_connect_timestamp};
12use crate::net::proxy::ProxyConfig;
13use crate::net::session::SessionBufStream;
14use crate::net::tls::wrap_tls;
15use crate::net::{
16 connect_tcp_inner, connect_tls_inner, run_connection_attempts, update_connection_history,
17};
18use crate::oauth2::get_oauth2_access_token;
19use crate::tools::time;
20
/// Picks the ALPN protocol list to offer when setting up TLS on `port`.
///
/// Port 465 gets an empty list; every other port gets `["smtp"]`.
fn alpn(port: u16) -> &'static [&'static str] {
    match port {
        // No ALPN protocols are offered on port 465.
        465 => &[],
        _ => &["smtp"],
    }
}
30
31async fn new_smtp_transport<S: AsyncBufRead + AsyncWrite + Unpin>(
34 stream: S,
35) -> Result<SmtpTransport<S>> {
36 let client = SmtpClient::new().smtp_utf8(true).without_greeting();
41
42 let transport = SmtpTransport::new(client, stream)
43 .await
44 .context("Failed to send EHLO command")?;
45 Ok(transport)
46}
47
48#[expect(clippy::too_many_arguments)]
49pub(crate) async fn connect_and_auth(
50 context: &Context,
51 proxy_config: &Option<ProxyConfig>,
52 strict_tls: bool,
53 candidate: ConnectionCandidate,
54 oauth2: bool,
55 addr: &str,
56 user: &str,
57 password: &str,
58) -> Result<SmtpTransport<Box<dyn SessionBufStream>>> {
59 let session_stream = connect_stream(context, proxy_config.clone(), strict_tls, candidate)
60 .await
61 .context("SMTP failed to connect")?;
62 let mut transport = new_smtp_transport(session_stream).await?;
63
64 let (creds, mechanism) = if oauth2 {
66 let access_token = get_oauth2_access_token(context, addr, password, false)
68 .await
69 .context("SMTP failed to get OAUTH2 access token")?;
70 if access_token.is_none() {
71 bail!("SMTP OAuth 2 error {}", addr);
72 }
73 (
74 async_smtp::authentication::Credentials::new(
75 user.to_string(),
76 access_token.unwrap_or_default(),
77 ),
78 vec![async_smtp::authentication::Mechanism::Xoauth2],
79 )
80 } else {
81 (
83 async_smtp::authentication::Credentials::new(user.to_string(), password.to_string()),
84 vec![
85 async_smtp::authentication::Mechanism::Plain,
86 async_smtp::authentication::Mechanism::Login,
87 ],
88 )
89 };
90 transport
91 .try_login(&creds, &mechanism)
92 .await
93 .context("SMTP failed to login")?;
94 Ok(transport)
95}
96
97async fn connection_attempt(
98 context: Context,
99 host: String,
100 security: ConnectionSecurity,
101 resolved_addr: SocketAddr,
102 strict_tls: bool,
103) -> Result<Box<dyn SessionBufStream>> {
104 let context = &context;
105 let host = &host;
106 info!(
107 context,
108 "Attempting SMTP connection to {host} ({resolved_addr})."
109 );
110 let res = match security {
111 ConnectionSecurity::Tls => connect_secure(resolved_addr, host, strict_tls).await,
112 ConnectionSecurity::Starttls => connect_starttls(resolved_addr, host, strict_tls).await,
113 ConnectionSecurity::Plain => connect_insecure(resolved_addr).await,
114 };
115 match res {
116 Ok(stream) => {
117 let ip_addr = resolved_addr.ip().to_string();
118 let port = resolved_addr.port();
119
120 let save_cache = match security {
121 ConnectionSecurity::Tls | ConnectionSecurity::Starttls => strict_tls,
122 ConnectionSecurity::Plain => false,
123 };
124 if save_cache {
125 update_connect_timestamp(context, host, &ip_addr).await?;
126 }
127 update_connection_history(context, "smtp", host, port, &ip_addr, time()).await?;
128 Ok(stream)
129 }
130 Err(err) => {
131 warn!(
132 context,
133 "Failed to connect to {host} ({resolved_addr}): {err:#}."
134 );
135 Err(err)
136 }
137 }
138}
139
140async fn connect_stream(
149 context: &Context,
150 proxy_config: Option<ProxyConfig>,
151 strict_tls: bool,
152 candidate: ConnectionCandidate,
153) -> Result<Box<dyn SessionBufStream>> {
154 let host = &candidate.host;
155 let port = candidate.port;
156 let security = candidate.security;
157
158 if let Some(proxy_config) = proxy_config {
159 let stream = match security {
160 ConnectionSecurity::Tls => {
161 connect_secure_proxy(context, host, port, strict_tls, proxy_config.clone()).await?
162 }
163 ConnectionSecurity::Starttls => {
164 connect_starttls_proxy(context, host, port, strict_tls, proxy_config.clone())
165 .await?
166 }
167 ConnectionSecurity::Plain => {
168 connect_insecure_proxy(context, host, port, proxy_config.clone()).await?
169 }
170 };
171 update_connection_history(context, "smtp", host, port, host, time()).await?;
172 Ok(stream)
173 } else {
174 let load_cache = match security {
175 ConnectionSecurity::Tls | ConnectionSecurity::Starttls => strict_tls,
176 ConnectionSecurity::Plain => false,
177 };
178
179 let connection_futures = lookup_host_with_cache(context, host, port, "smtp", load_cache)
180 .await?
181 .into_iter()
182 .map(|resolved_addr| {
183 let context = context.clone();
184 let host = host.to_string();
185 connection_attempt(context, host, security, resolved_addr, strict_tls)
186 });
187 run_connection_attempts(connection_futures).await
188 }
189}
190
191async fn skip_smtp_greeting<R: tokio::io::AsyncBufReadExt + Unpin>(stream: &mut R) -> Result<()> {
198 let mut line = String::with_capacity(512);
199 loop {
200 line.clear();
201 let read = stream
202 .read_line(&mut line)
203 .await
204 .context("Failed to read from stream while waiting for SMTP greeting")?;
205 if read == 0 {
206 bail!("Unexpected EOF while reading SMTP greeting");
207 }
208 if line.starts_with("220-") {
209 continue;
210 } else if line.starts_with("220 ") {
211 return Ok(());
212 } else {
213 bail!("Unexpected greeting: {line:?}");
214 }
215 }
216}
217
218async fn connect_secure_proxy(
219 context: &Context,
220 hostname: &str,
221 port: u16,
222 strict_tls: bool,
223 proxy_config: ProxyConfig,
224) -> Result<Box<dyn SessionBufStream>> {
225 let proxy_stream = proxy_config
226 .connect(context, hostname, port, strict_tls)
227 .await?;
228 let tls_stream = wrap_tls(strict_tls, hostname, alpn(port), proxy_stream).await?;
229 let mut buffered_stream = BufStream::new(tls_stream);
230 skip_smtp_greeting(&mut buffered_stream).await?;
231 let session_stream: Box<dyn SessionBufStream> = Box::new(buffered_stream);
232 Ok(session_stream)
233}
234
235async fn connect_starttls_proxy(
236 context: &Context,
237 hostname: &str,
238 port: u16,
239 strict_tls: bool,
240 proxy_config: ProxyConfig,
241) -> Result<Box<dyn SessionBufStream>> {
242 let proxy_stream = proxy_config
243 .connect(context, hostname, port, strict_tls)
244 .await?;
245
246 let mut buffered_stream = BufStream::new(proxy_stream);
248 skip_smtp_greeting(&mut buffered_stream).await?;
249 let transport = new_smtp_transport(buffered_stream).await?;
250 let tcp_stream = transport.starttls().await?.into_inner();
251 let tls_stream = wrap_tls(strict_tls, hostname, &[], tcp_stream)
252 .await
253 .context("STARTTLS upgrade failed")?;
254 let buffered_stream = BufStream::new(tls_stream);
255 let session_stream: Box<dyn SessionBufStream> = Box::new(buffered_stream);
256 Ok(session_stream)
257}
258
259async fn connect_insecure_proxy(
260 context: &Context,
261 hostname: &str,
262 port: u16,
263 proxy_config: ProxyConfig,
264) -> Result<Box<dyn SessionBufStream>> {
265 let proxy_stream = proxy_config.connect(context, hostname, port, false).await?;
266 let mut buffered_stream = BufStream::new(proxy_stream);
267 skip_smtp_greeting(&mut buffered_stream).await?;
268 let session_stream: Box<dyn SessionBufStream> = Box::new(buffered_stream);
269 Ok(session_stream)
270}
271
272async fn connect_secure(
273 addr: SocketAddr,
274 hostname: &str,
275 strict_tls: bool,
276) -> Result<Box<dyn SessionBufStream>> {
277 let tls_stream = connect_tls_inner(addr, hostname, strict_tls, alpn(addr.port())).await?;
278 let mut buffered_stream = BufStream::new(tls_stream);
279 skip_smtp_greeting(&mut buffered_stream).await?;
280 let session_stream: Box<dyn SessionBufStream> = Box::new(buffered_stream);
281 Ok(session_stream)
282}
283
284async fn connect_starttls(
285 addr: SocketAddr,
286 host: &str,
287 strict_tls: bool,
288) -> Result<Box<dyn SessionBufStream>> {
289 let tcp_stream = connect_tcp_inner(addr).await?;
290
291 let mut buffered_stream = BufStream::new(tcp_stream);
293 skip_smtp_greeting(&mut buffered_stream).await?;
294 let transport = new_smtp_transport(buffered_stream).await?;
295 let tcp_stream = transport.starttls().await?.into_inner();
296 let tls_stream = wrap_tls(strict_tls, host, &[], tcp_stream)
297 .await
298 .context("STARTTLS upgrade failed")?;
299
300 let buffered_stream = BufStream::new(tls_stream);
301 let session_stream: Box<dyn SessionBufStream> = Box::new(buffered_stream);
302 Ok(session_stream)
303}
304
305async fn connect_insecure(addr: SocketAddr) -> Result<Box<dyn SessionBufStream>> {
306 let tcp_stream = connect_tcp_inner(addr).await?;
307 let mut buffered_stream = BufStream::new(tcp_stream);
308 skip_smtp_greeting(&mut buffered_stream).await?;
309 let session_stream: Box<dyn SessionBufStream> = Box::new(buffered_stream);
310 Ok(session_stream)
311}
312
#[cfg(test)]
mod tests {
    use tokio::io::BufReader;

    use super::*;

    /// A multi-line greeting (two `220-` lines followed by a final `220 `
    /// line) must be accepted by `skip_smtp_greeting`.
    #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
    async fn test_skip_smtp_greeting() -> Result<()> {
        let greeting: &[u8] = b"220-server261.web-hosting.com ESMTP Exim 4.96.2 #2 Sat, 24 Aug 2024 12:25:53 -0400 \r\n\
                                220-We do not authorize the use of this system to transport unsolicited,\r\n\
                                220 and/or bulk e-mail.\r\n";
        let mut reader = BufReader::new(greeting);
        skip_smtp_greeting(&mut reader).await?;
        Ok(())
    }
}