1use std::future::Future;
3use std::net::SocketAddr;
4use std::pin::Pin;
5use std::time::Duration;
6
7use anyhow::{Context as _, Result, format_err};
8use tokio::net::TcpStream;
9use tokio::task::JoinSet;
10use tokio::time::timeout;
11use tokio_io_timeout::TimeoutStream;
12
13use crate::context::Context;
14use crate::net::session::SessionStream;
15use crate::net::tls::TlsSessionStore;
16use crate::sql::Sql;
17use crate::tools::time;
18
19pub(crate) mod dns;
20pub(crate) mod http;
21pub(crate) mod proxy;
22pub(crate) mod session;
23pub(crate) mod tls;
24
25use dns::lookup_host_with_cache;
26pub use http::{Response as HttpResponse, read_url, read_url_blob};
27use tls::wrap_tls;
28
/// Network timeout of 60 seconds.
///
/// In this file it bounds how long `connect_tcp_inner` waits
/// for a TCP connection to be established and is also installed
/// as both the read and the write timeout of the resulting stream.
pub(crate) const TIMEOUT: Duration = Duration::from_secs(60);
33
/// Time-to-live used when pruning `connection_history`.
///
/// `prune_connection_history` deletes rows for which
/// the current time is greater than `timestamp + CACHE_TTL`.
/// The value is `30 * 24 * 60 * 60` = 2 592 000,
/// i.e. 30 days assuming timestamps are in seconds
/// (NOTE(review): unit inferred from the arithmetic, confirm against `time()`).
pub(crate) const CACHE_TTL: u64 = 30 * 24 * 60 * 60;
36
37pub(crate) async fn prune_connection_history(context: &Context) -> Result<()> {
39 let now = time();
40 context
41 .sql
42 .execute(
43 "DELETE FROM connection_history
44 WHERE ? > timestamp + ?",
45 (now, CACHE_TTL),
46 )
47 .await?;
48 Ok(())
49}
50
51pub(crate) async fn update_connection_history(
60 context: &Context,
61 alpn: &str,
62 host: &str,
63 port: u16,
64 addr: &str,
65 now: i64,
66) -> Result<()> {
67 context
68 .sql
69 .execute(
70 "INSERT INTO connection_history (host, port, alpn, addr, timestamp)
71 VALUES (?, ?, ?, ?, ?)
72 ON CONFLICT (host, port, alpn, addr)
73 DO UPDATE SET timestamp=excluded.timestamp",
74 (host, port, alpn, addr, now),
75 )
76 .await?;
77 Ok(())
78}
79
80pub(crate) async fn load_connection_timestamp(
83 sql: &Sql,
84 alpn: &str,
85 host: &str,
86 port: u16,
87 addr: Option<&str>,
88) -> Result<Option<i64>> {
89 let timestamp = sql
90 .query_get_value(
91 "SELECT timestamp FROM connection_history
92 WHERE host = ?
93 AND port = ?
94 AND alpn = ?
95 AND addr = IFNULL(?, addr)",
96 (host, port, alpn, addr),
97 )
98 .await?;
99 Ok(timestamp)
100}
101
102pub(crate) async fn connect_tcp_inner(
108 addr: SocketAddr,
109) -> Result<Pin<Box<TimeoutStream<TcpStream>>>> {
110 let tcp_stream = timeout(TIMEOUT, TcpStream::connect(addr))
111 .await
112 .context("connection timeout")?
113 .context("connection failure")?;
114
115 tcp_stream.set_nodelay(true)?;
117
118 let mut timeout_stream = TimeoutStream::new(tcp_stream);
119 timeout_stream.set_write_timeout(Some(TIMEOUT));
120 timeout_stream.set_read_timeout(Some(TIMEOUT));
121
122 Ok(Box::pin(timeout_stream))
123}
124
125pub(crate) async fn connect_tls_inner(
128 addr: SocketAddr,
129 host: &str,
130 strict_tls: bool,
131 alpn: &str,
132 tls_session_store: &TlsSessionStore,
133) -> Result<impl SessionStream + 'static> {
134 let use_sni = true;
135 let tcp_stream = connect_tcp_inner(addr).await?;
136 let tls_stream = wrap_tls(
137 strict_tls,
138 host,
139 addr.port(),
140 use_sni,
141 alpn,
142 tcp_stream,
143 tls_session_store,
144 )
145 .await?;
146 Ok(tls_stream)
147}
148
149pub(crate) async fn run_connection_attempts<O, I, F>(mut futures: I) -> Result<O>
160where
161 I: Iterator<Item = F>,
162 F: Future<Output = Result<O>> + Send + 'static,
163 O: Send + 'static,
164{
165 let mut connection_attempt_set = JoinSet::new();
166
167 let mut delay_set = JoinSet::new();
170 for delay in [
171 Duration::from_millis(300),
172 Duration::from_secs(1),
173 Duration::from_secs(5),
174 Duration::from_secs(10),
175 ] {
176 delay_set.spawn(tokio::time::sleep(delay));
177 }
178
179 let mut first_error = None;
180
181 let res = loop {
182 if let Some(fut) = futures.next() {
183 connection_attempt_set.spawn(fut);
184 }
185
186 tokio::select! {
187 biased;
188
189 res = connection_attempt_set.join_next() => {
190 match res {
191 Some(res) => {
192 match res.context("Failed to join task") {
193 Ok(Ok(conn)) => {
194 break Ok(conn);
196 }
197 Ok(Err(err)) => {
198 first_error.get_or_insert(err);
200 }
201 Err(err) => {
202 break Err(err);
203 }
204 }
205 }
206 None => {
207 break Err(
211 first_error.unwrap_or_else(|| format_err!("No connection attempts were made"))
212 );
213 }
214 }
215 },
216
217 _ = delay_set.join_next(), if !delay_set.is_empty() => {
218 }
223 }
224 };
225
226 connection_attempt_set.shutdown().await;
235
236 res
237}
238
239pub(crate) async fn connect_tcp(
246 context: &Context,
247 host: &str,
248 port: u16,
249 load_cache: bool,
250) -> Result<Pin<Box<TimeoutStream<TcpStream>>>> {
251 let connection_futures = lookup_host_with_cache(context, host, port, "", load_cache)
252 .await?
253 .into_iter()
254 .map(connect_tcp_inner);
255 run_connection_attempts(connection_futures).await
256}