1use std::future::Future;
3use std::net::SocketAddr;
4use std::pin::Pin;
5use std::time::Duration;
6
7use anyhow::{format_err, Context as _, Result};
8use tokio::net::TcpStream;
9use tokio::task::JoinSet;
10use tokio::time::timeout;
11use tokio_io_timeout::TimeoutStream;
12
13use crate::context::Context;
14use crate::net::session::SessionStream;
15use crate::sql::Sql;
16use crate::tools::time;
17
18pub(crate) mod dns;
19pub(crate) mod http;
20pub(crate) mod proxy;
21pub(crate) mod session;
22pub(crate) mod tls;
23
24use dns::lookup_host_with_cache;
25pub use http::{read_url, read_url_blob, Response as HttpResponse};
26use tls::wrap_tls;
27
/// Network timeout of 60 seconds.
///
/// In this file it bounds the TCP connection establishment in
/// `connect_tcp_inner` and is also installed as both the read and the
/// write timeout of the resulting `TimeoutStream`.
pub(crate) const TIMEOUT: Duration = Duration::from_secs(60);
32
/// Time-to-live of `connection_history` rows.
///
/// `prune_connection_history` deletes every row for which the current time
/// is greater than `timestamp + CACHE_TTL`.
/// The value is 30 * 24 * 60 * 60, i.e. 30 days assuming timestamps are
/// expressed in seconds (NOTE(review): confirm the unit returned by `tools::time`).
pub(crate) const CACHE_TTL: u64 = 30 * 24 * 60 * 60;
35
36pub(crate) async fn prune_connection_history(context: &Context) -> Result<()> {
38 let now = time();
39 context
40 .sql
41 .execute(
42 "DELETE FROM connection_history
43 WHERE ? > timestamp + ?",
44 (now, CACHE_TTL),
45 )
46 .await?;
47 Ok(())
48}
49
50pub(crate) async fn update_connection_history(
59 context: &Context,
60 alpn: &str,
61 host: &str,
62 port: u16,
63 addr: &str,
64 now: i64,
65) -> Result<()> {
66 context
67 .sql
68 .execute(
69 "INSERT INTO connection_history (host, port, alpn, addr, timestamp)
70 VALUES (?, ?, ?, ?, ?)
71 ON CONFLICT (host, port, alpn, addr)
72 DO UPDATE SET timestamp=excluded.timestamp",
73 (host, port, alpn, addr, now),
74 )
75 .await?;
76 Ok(())
77}
78
79pub(crate) async fn load_connection_timestamp(
82 sql: &Sql,
83 alpn: &str,
84 host: &str,
85 port: u16,
86 addr: Option<&str>,
87) -> Result<Option<i64>> {
88 let timestamp = sql
89 .query_get_value(
90 "SELECT timestamp FROM connection_history
91 WHERE host = ?
92 AND port = ?
93 AND alpn = ?
94 AND addr = IFNULL(?, addr)",
95 (host, port, alpn, addr),
96 )
97 .await?;
98 Ok(timestamp)
99}
100
101pub(crate) async fn connect_tcp_inner(
107 addr: SocketAddr,
108) -> Result<Pin<Box<TimeoutStream<TcpStream>>>> {
109 let tcp_stream = timeout(TIMEOUT, TcpStream::connect(addr))
110 .await
111 .context("connection timeout")?
112 .context("connection failure")?;
113
114 tcp_stream.set_nodelay(true)?;
116
117 let mut timeout_stream = TimeoutStream::new(tcp_stream);
118 timeout_stream.set_write_timeout(Some(TIMEOUT));
119 timeout_stream.set_read_timeout(Some(TIMEOUT));
120
121 Ok(Box::pin(timeout_stream))
122}
123
124pub(crate) async fn connect_tls_inner(
127 addr: SocketAddr,
128 host: &str,
129 strict_tls: bool,
130 alpn: &[&str],
131) -> Result<impl SessionStream> {
132 let tcp_stream = connect_tcp_inner(addr).await?;
133 let tls_stream = wrap_tls(strict_tls, host, alpn, tcp_stream).await?;
134 Ok(tls_stream)
135}
136
137pub(crate) async fn run_connection_attempts<O, I, F>(mut futures: I) -> Result<O>
148where
149 I: Iterator<Item = F>,
150 F: Future<Output = Result<O>> + Send + 'static,
151 O: Send + 'static,
152{
153 let mut connection_attempt_set = JoinSet::new();
154
155 let mut delay_set = JoinSet::new();
158 for delay in [
159 Duration::from_millis(300),
160 Duration::from_secs(1),
161 Duration::from_secs(5),
162 Duration::from_secs(10),
163 ] {
164 delay_set.spawn(tokio::time::sleep(delay));
165 }
166
167 let mut first_error = None;
168
169 let res = loop {
170 if let Some(fut) = futures.next() {
171 connection_attempt_set.spawn(fut);
172 }
173
174 tokio::select! {
175 biased;
176
177 res = connection_attempt_set.join_next() => {
178 match res {
179 Some(res) => {
180 match res.context("Failed to join task") {
181 Ok(Ok(conn)) => {
182 break Ok(conn);
184 }
185 Ok(Err(err)) => {
186 first_error.get_or_insert(err);
188 }
189 Err(err) => {
190 break Err(err);
191 }
192 }
193 }
194 None => {
195 break Err(
199 first_error.unwrap_or_else(|| format_err!("No connection attempts were made"))
200 );
201 }
202 }
203 },
204
205 _ = delay_set.join_next(), if !delay_set.is_empty() => {
206 }
211 }
212 };
213
214 connection_attempt_set.shutdown().await;
223
224 res
225}
226
227pub(crate) async fn connect_tcp(
234 context: &Context,
235 host: &str,
236 port: u16,
237 load_cache: bool,
238) -> Result<Pin<Box<TimeoutStream<TcpStream>>>> {
239 let connection_futures = lookup_host_with_cache(context, host, port, "", load_cache)
240 .await?
241 .into_iter()
242 .map(connect_tcp_inner);
243 run_connection_attempts(connection_futures).await
244}