1use std::future::Future;
3use std::net::SocketAddr;
4use std::pin::Pin;
5use std::time::Duration;
6
7use anyhow::{Context as _, Result, format_err};
8use tokio::net::TcpStream;
9use tokio::task::JoinSet;
10use tokio::time::timeout;
11use tokio_io_timeout::TimeoutStream;
12
13use crate::context::Context;
14use crate::net::session::SessionStream;
15use crate::net::tls::TlsSessionStore;
16use crate::sql::Sql;
17use crate::tools::time;
18
19pub(crate) mod dns;
20pub(crate) mod http;
21pub(crate) mod proxy;
22pub(crate) mod session;
23pub(crate) mod tls;
24
25use dns::lookup_host_with_cache;
26pub use http::{Response as HttpResponse, read_url, read_url_blob};
27use tls::wrap_tls;
28
/// Timeout used in this module for establishing a TCP connection
/// and for every read and write on the resulting stream.
pub(crate) const TIMEOUT: Duration = Duration::from_secs(60);
33
/// Maximum age of a `connection_history` row before it is pruned.
///
/// Presumably expressed in seconds (the value equals 30 days in seconds);
/// this assumes stored timestamps are in seconds as well — TODO confirm.
pub(crate) const CACHE_TTL: u64 = 60 * 60 * 24 * 30;
36
37pub(crate) async fn prune_connection_history(context: &Context) -> Result<()> {
39 let now = time();
40 context
41 .sql
42 .execute(
43 "DELETE FROM connection_history
44 WHERE ? > timestamp + ?",
45 (now, CACHE_TTL),
46 )
47 .await?;
48 Ok(())
49}
50
51pub(crate) async fn update_connection_history(
60 context: &Context,
61 alpn: &str,
62 host: &str,
63 port: u16,
64 addr: &str,
65 now: i64,
66) -> Result<()> {
67 context
68 .sql
69 .execute(
70 "INSERT INTO connection_history (host, port, alpn, addr, timestamp)
71 VALUES (?, ?, ?, ?, ?)
72 ON CONFLICT (host, port, alpn, addr)
73 DO UPDATE SET timestamp=excluded.timestamp",
74 (host, port, alpn, addr, now),
75 )
76 .await?;
77 Ok(())
78}
79
80pub(crate) async fn load_connection_timestamp(
83 sql: &Sql,
84 alpn: &str,
85 host: &str,
86 port: u16,
87 addr: Option<&str>,
88) -> Result<Option<i64>> {
89 let timestamp = sql
90 .query_get_value(
91 "SELECT timestamp FROM connection_history
92 WHERE host = ?
93 AND port = ?
94 AND alpn = ?
95 AND addr = IFNULL(?, addr)",
96 (host, port, alpn, addr),
97 )
98 .await?;
99 Ok(timestamp)
100}
101
102pub(crate) async fn connect_tcp_inner(
108 addr: SocketAddr,
109) -> Result<Pin<Box<TimeoutStream<TcpStream>>>> {
110 let tcp_stream = timeout(TIMEOUT, TcpStream::connect(addr))
111 .await
112 .context("connection timeout")?
113 .context("connection failure")?;
114
115 tcp_stream.set_nodelay(true)?;
117
118 let mut timeout_stream = TimeoutStream::new(tcp_stream);
119 timeout_stream.set_write_timeout(Some(TIMEOUT));
120 timeout_stream.set_read_timeout(Some(TIMEOUT));
121
122 Ok(Box::pin(timeout_stream))
123}
124
125pub(crate) async fn connect_tls_inner(
128 addr: SocketAddr,
129 host: &str,
130 strict_tls: bool,
131 alpn: &str,
132 tls_session_store: &TlsSessionStore,
133) -> Result<impl SessionStream + 'static> {
134 let tcp_stream = connect_tcp_inner(addr).await?;
135 let tls_stream = wrap_tls(
136 strict_tls,
137 host,
138 addr.port(),
139 alpn,
140 tcp_stream,
141 tls_session_store,
142 )
143 .await?;
144 Ok(tls_stream)
145}
146
147pub(crate) async fn run_connection_attempts<O, I, F>(mut futures: I) -> Result<O>
158where
159 I: Iterator<Item = F>,
160 F: Future<Output = Result<O>> + Send + 'static,
161 O: Send + 'static,
162{
163 let mut connection_attempt_set = JoinSet::new();
164
165 let mut delay_set = JoinSet::new();
168 for delay in [
169 Duration::from_millis(300),
170 Duration::from_secs(1),
171 Duration::from_secs(5),
172 Duration::from_secs(10),
173 ] {
174 delay_set.spawn(tokio::time::sleep(delay));
175 }
176
177 let mut first_error = None;
178
179 let res = loop {
180 if let Some(fut) = futures.next() {
181 connection_attempt_set.spawn(fut);
182 }
183
184 tokio::select! {
185 biased;
186
187 res = connection_attempt_set.join_next() => {
188 match res {
189 Some(res) => {
190 match res.context("Failed to join task") {
191 Ok(Ok(conn)) => {
192 break Ok(conn);
194 }
195 Ok(Err(err)) => {
196 first_error.get_or_insert(err);
198 }
199 Err(err) => {
200 break Err(err);
201 }
202 }
203 }
204 None => {
205 break Err(
209 first_error.unwrap_or_else(|| format_err!("No connection attempts were made"))
210 );
211 }
212 }
213 },
214
215 _ = delay_set.join_next(), if !delay_set.is_empty() => {
216 }
221 }
222 };
223
224 connection_attempt_set.shutdown().await;
233
234 res
235}
236
237pub(crate) async fn connect_tcp(
244 context: &Context,
245 host: &str,
246 port: u16,
247 load_cache: bool,
248) -> Result<Pin<Box<TimeoutStream<TcpStream>>>> {
249 let connection_futures = lookup_host_with_cache(context, host, port, "", load_cache)
250 .await?
251 .into_iter()
252 .map(connect_tcp_inner);
253 run_connection_attempts(connection_futures).await
254}