1use std::net::SocketAddr;
4
5use anyhow::{Context as _, Result, bail};
6use async_smtp::{SmtpClient, SmtpTransport};
7use tokio::io::{AsyncBufRead, AsyncWrite, BufStream};
8
9use crate::context::Context;
10use crate::log::{info, warn};
11use crate::login_param::{ConnectionCandidate, ConnectionSecurity};
12use crate::net::dns::{lookup_host_with_cache, update_connect_timestamp};
13use crate::net::proxy::ProxyConfig;
14use crate::net::session::SessionBufStream;
15use crate::net::tls::{TlsSessionStore, wrap_tls};
16use crate::net::{
17 connect_tcp_inner, connect_tls_inner, run_connection_attempts, update_connection_history,
18};
19use crate::oauth2::get_oauth2_access_token;
20use crate::tools::time;
21
/// Selects the ALPN string handed to the TLS layer for a connection on `port`.
///
/// Port 465 maps to the empty string, every other port maps to `"smtp"`.
// NOTE(review): the empty string presumably means "do not offer ALPN";
// confirm against `wrap_tls` / `connect_tls_inner`.
fn alpn(port: u16) -> &'static str {
    match port {
        465 => "",
        _ => "smtp",
    }
}
31
32async fn new_smtp_transport<S: AsyncBufRead + AsyncWrite + Unpin>(
35 stream: S,
36) -> Result<SmtpTransport<S>> {
37 let client = SmtpClient::new().smtp_utf8(true).without_greeting();
42
43 let transport = SmtpTransport::new(client, stream)
44 .await
45 .context("Failed to send EHLO command")?;
46 Ok(transport)
47}
48
49#[expect(clippy::too_many_arguments)]
50pub(crate) async fn connect_and_auth(
51 context: &Context,
52 proxy_config: &Option<ProxyConfig>,
53 strict_tls: bool,
54 candidate: ConnectionCandidate,
55 oauth2: bool,
56 addr: &str,
57 user: &str,
58 password: &str,
59) -> Result<SmtpTransport<Box<dyn SessionBufStream>>> {
60 let session_stream = connect_stream(context, proxy_config.clone(), strict_tls, candidate)
61 .await
62 .context("SMTP failed to connect")?;
63 let mut transport = new_smtp_transport(session_stream).await?;
64
65 let (creds, mechanism) = if oauth2 {
67 let access_token = get_oauth2_access_token(context, addr, password, false)
69 .await
70 .context("SMTP failed to get OAUTH2 access token")?;
71 if access_token.is_none() {
72 bail!("SMTP OAuth 2 error {addr}");
73 }
74 (
75 async_smtp::authentication::Credentials::new(
76 user.to_string(),
77 access_token.unwrap_or_default(),
78 ),
79 vec![async_smtp::authentication::Mechanism::Xoauth2],
80 )
81 } else {
82 (
84 async_smtp::authentication::Credentials::new(user.to_string(), password.to_string()),
85 vec![
86 async_smtp::authentication::Mechanism::Plain,
87 async_smtp::authentication::Mechanism::Login,
88 ],
89 )
90 };
91 transport
92 .try_login(&creds, &mechanism)
93 .await
94 .context("SMTP failed to login")?;
95 Ok(transport)
96}
97
98async fn connection_attempt(
99 context: Context,
100 host: String,
101 security: ConnectionSecurity,
102 resolved_addr: SocketAddr,
103 strict_tls: bool,
104) -> Result<Box<dyn SessionBufStream>> {
105 let context = &context;
106 let host = &host;
107 info!(
108 context,
109 "Attempting SMTP connection to {host} ({resolved_addr})."
110 );
111 let res = match security {
112 ConnectionSecurity::Tls => {
113 connect_secure(resolved_addr, host, strict_tls, &context.tls_session_store).await
114 }
115 ConnectionSecurity::Starttls => {
116 connect_starttls(resolved_addr, host, strict_tls, &context.tls_session_store).await
117 }
118 ConnectionSecurity::Plain => connect_insecure(resolved_addr).await,
119 };
120 match res {
121 Ok(stream) => {
122 let ip_addr = resolved_addr.ip().to_string();
123 let port = resolved_addr.port();
124
125 let save_cache = match security {
126 ConnectionSecurity::Tls | ConnectionSecurity::Starttls => strict_tls,
127 ConnectionSecurity::Plain => false,
128 };
129 if save_cache {
130 update_connect_timestamp(context, host, &ip_addr).await?;
131 }
132 update_connection_history(context, "smtp", host, port, &ip_addr, time()).await?;
133 Ok(stream)
134 }
135 Err(err) => {
136 warn!(
137 context,
138 "Failed to connect to {host} ({resolved_addr}): {err:#}."
139 );
140 Err(err)
141 }
142 }
143}
144
145async fn connect_stream(
154 context: &Context,
155 proxy_config: Option<ProxyConfig>,
156 strict_tls: bool,
157 candidate: ConnectionCandidate,
158) -> Result<Box<dyn SessionBufStream>> {
159 let host = &candidate.host;
160 let port = candidate.port;
161 let security = candidate.security;
162
163 if let Some(proxy_config) = proxy_config {
164 let stream = match security {
165 ConnectionSecurity::Tls => {
166 connect_secure_proxy(context, host, port, strict_tls, proxy_config.clone()).await?
167 }
168 ConnectionSecurity::Starttls => {
169 connect_starttls_proxy(context, host, port, strict_tls, proxy_config.clone())
170 .await?
171 }
172 ConnectionSecurity::Plain => {
173 connect_insecure_proxy(context, host, port, proxy_config.clone()).await?
174 }
175 };
176 update_connection_history(context, "smtp", host, port, host, time()).await?;
177 Ok(stream)
178 } else {
179 let load_cache = match security {
180 ConnectionSecurity::Tls | ConnectionSecurity::Starttls => strict_tls,
181 ConnectionSecurity::Plain => false,
182 };
183
184 let connection_futures = lookup_host_with_cache(context, host, port, "smtp", load_cache)
185 .await?
186 .into_iter()
187 .map(|resolved_addr| {
188 let context = context.clone();
189 let host = host.to_string();
190 connection_attempt(context, host, security, resolved_addr, strict_tls)
191 });
192 run_connection_attempts(connection_futures).await
193 }
194}
195
196async fn skip_smtp_greeting<R: tokio::io::AsyncBufReadExt + Unpin>(stream: &mut R) -> Result<()> {
203 let mut line = String::with_capacity(512);
204 loop {
205 line.clear();
206 let read = stream
207 .read_line(&mut line)
208 .await
209 .context("Failed to read from stream while waiting for SMTP greeting")?;
210 if read == 0 {
211 bail!("Unexpected EOF while reading SMTP greeting");
212 }
213 if line.starts_with("220-") {
214 continue;
215 } else if line.starts_with("220 ") {
216 return Ok(());
217 } else {
218 bail!("Unexpected greeting: {line:?}");
219 }
220 }
221}
222
223async fn connect_secure_proxy(
224 context: &Context,
225 hostname: &str,
226 port: u16,
227 strict_tls: bool,
228 proxy_config: ProxyConfig,
229) -> Result<Box<dyn SessionBufStream>> {
230 let proxy_stream = proxy_config
231 .connect(context, hostname, port, strict_tls)
232 .await?;
233 let tls_stream = wrap_tls(
234 strict_tls,
235 hostname,
236 port,
237 alpn(port),
238 proxy_stream,
239 &context.tls_session_store,
240 )
241 .await?;
242 let mut buffered_stream = BufStream::new(tls_stream);
243 skip_smtp_greeting(&mut buffered_stream).await?;
244 let session_stream: Box<dyn SessionBufStream> = Box::new(buffered_stream);
245 Ok(session_stream)
246}
247
248async fn connect_starttls_proxy(
249 context: &Context,
250 hostname: &str,
251 port: u16,
252 strict_tls: bool,
253 proxy_config: ProxyConfig,
254) -> Result<Box<dyn SessionBufStream>> {
255 let proxy_stream = proxy_config
256 .connect(context, hostname, port, strict_tls)
257 .await?;
258
259 let mut buffered_stream = BufStream::new(proxy_stream);
261 skip_smtp_greeting(&mut buffered_stream).await?;
262 let transport = new_smtp_transport(buffered_stream).await?;
263 let tcp_stream = transport.starttls().await?.into_inner();
264 let tls_stream = wrap_tls(
265 strict_tls,
266 hostname,
267 port,
268 "",
269 tcp_stream,
270 &context.tls_session_store,
271 )
272 .await
273 .context("STARTTLS upgrade failed")?;
274 let buffered_stream = BufStream::new(tls_stream);
275 let session_stream: Box<dyn SessionBufStream> = Box::new(buffered_stream);
276 Ok(session_stream)
277}
278
279async fn connect_insecure_proxy(
280 context: &Context,
281 hostname: &str,
282 port: u16,
283 proxy_config: ProxyConfig,
284) -> Result<Box<dyn SessionBufStream>> {
285 let proxy_stream = proxy_config.connect(context, hostname, port, false).await?;
286 let mut buffered_stream = BufStream::new(proxy_stream);
287 skip_smtp_greeting(&mut buffered_stream).await?;
288 let session_stream: Box<dyn SessionBufStream> = Box::new(buffered_stream);
289 Ok(session_stream)
290}
291
292async fn connect_secure(
293 addr: SocketAddr,
294 hostname: &str,
295 strict_tls: bool,
296 tls_session_store: &TlsSessionStore,
297) -> Result<Box<dyn SessionBufStream>> {
298 let tls_stream = connect_tls_inner(
299 addr,
300 hostname,
301 strict_tls,
302 alpn(addr.port()),
303 tls_session_store,
304 )
305 .await?;
306 let mut buffered_stream = BufStream::new(tls_stream);
307 skip_smtp_greeting(&mut buffered_stream).await?;
308 let session_stream: Box<dyn SessionBufStream> = Box::new(buffered_stream);
309 Ok(session_stream)
310}
311
312async fn connect_starttls(
313 addr: SocketAddr,
314 host: &str,
315 strict_tls: bool,
316 tls_session_store: &TlsSessionStore,
317) -> Result<Box<dyn SessionBufStream>> {
318 let tcp_stream = connect_tcp_inner(addr).await?;
319
320 let mut buffered_stream = BufStream::new(tcp_stream);
322 skip_smtp_greeting(&mut buffered_stream).await?;
323 let transport = new_smtp_transport(buffered_stream).await?;
324 let tcp_stream = transport.starttls().await?.into_inner();
325 let tls_stream = wrap_tls(
326 strict_tls,
327 host,
328 addr.port(),
329 "",
330 tcp_stream,
331 tls_session_store,
332 )
333 .await
334 .context("STARTTLS upgrade failed")?;
335
336 let buffered_stream = BufStream::new(tls_stream);
337 let session_stream: Box<dyn SessionBufStream> = Box::new(buffered_stream);
338 Ok(session_stream)
339}
340
341async fn connect_insecure(addr: SocketAddr) -> Result<Box<dyn SessionBufStream>> {
342 let tcp_stream = connect_tcp_inner(addr).await?;
343 let mut buffered_stream = BufStream::new(tcp_stream);
344 skip_smtp_greeting(&mut buffered_stream).await?;
345 let session_stream: Box<dyn SessionBufStream> = Box::new(buffered_stream);
346 Ok(session_stream)
347}
348
#[cfg(test)]
mod tests {
    use tokio::io::BufReader;

    use super::*;

    /// A greeting made of two `220-` continuation lines followed by a final
    /// `220 ` line must be consumed without error.
    #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
    async fn test_skip_smtp_greeting() -> Result<()> {
        let greeting: &[u8] = b"220-server261.web-hosting.com ESMTP Exim 4.96.2 #2 Sat, 24 Aug 2024 12:25:53 -0400 \r\n\
                                220-We do not authorize the use of this system to transport unsolicited,\r\n\
                                220 and/or bulk e-mail.\r\n";
        let mut reader = BufReader::new(greeting);
        skip_smtp_greeting(&mut reader).await?;
        Ok(())
    }
}