1use std::future::Future;
3use std::net::SocketAddr;
4use std::pin::Pin;
5use std::time::Duration;
6
7use anyhow::{Context as _, Result, format_err};
8use tokio::net::TcpStream;
9use tokio::task::JoinSet;
10use tokio::time::timeout;
11use tokio_io_timeout::TimeoutStream;
12
13use crate::context::Context;
14use crate::net::session::SessionStream;
15use crate::net::tls::{SpkiHashStore, TlsSessionStore};
16use crate::sql::Sql;
17use crate::tools::time;
18
19pub(crate) mod dns;
20pub(crate) mod http;
21pub(crate) mod proxy;
22pub(crate) mod session;
23pub(crate) mod tls;
24
25use dns::lookup_host_with_cache;
26pub use http::{Response as HttpResponse, read_url, read_url_blob};
27use tls::wrap_tls;
28
29pub(crate) const TIMEOUT: Duration = Duration::from_secs(60);
33
34pub(crate) const CACHE_TTL: u64 = 30 * 24 * 60 * 60;
36
37pub(crate) async fn prune_connection_history(context: &Context) -> Result<()> {
39 let now = time();
40 context
41 .sql
42 .execute(
43 "DELETE FROM connection_history
44 WHERE ? > timestamp + ?",
45 (now, CACHE_TTL),
46 )
47 .await?;
48 Ok(())
49}
50
51pub(crate) async fn update_connection_history(
60 context: &Context,
61 alpn: &str,
62 host: &str,
63 port: u16,
64 addr: &str,
65 now: i64,
66) -> Result<()> {
67 context
68 .sql
69 .execute(
70 "INSERT INTO connection_history (host, port, alpn, addr, timestamp)
71 VALUES (?, ?, ?, ?, ?)
72 ON CONFLICT (host, port, alpn, addr)
73 DO UPDATE SET timestamp=excluded.timestamp",
74 (host, port, alpn, addr, now),
75 )
76 .await?;
77 Ok(())
78}
79
80pub(crate) async fn load_connection_timestamp(
83 sql: &Sql,
84 alpn: &str,
85 host: &str,
86 port: u16,
87 addr: Option<&str>,
88) -> Result<Option<i64>> {
89 let timestamp = sql
90 .query_get_value(
91 "SELECT timestamp FROM connection_history
92 WHERE host = ?
93 AND port = ?
94 AND alpn = ?
95 AND addr = IFNULL(?, addr)",
96 (host, port, alpn, addr),
97 )
98 .await?;
99 Ok(timestamp)
100}
101
102pub(crate) async fn connect_tcp_inner(
108 addr: SocketAddr,
109) -> Result<Pin<Box<TimeoutStream<TcpStream>>>> {
110 let tcp_stream = timeout(TIMEOUT, TcpStream::connect(addr))
111 .await
112 .context("Connection timeout")?
113 .context("Connection failure")?;
114
115 tcp_stream.set_nodelay(true)?;
117
118 let mut timeout_stream = TimeoutStream::new(tcp_stream);
119 timeout_stream.set_write_timeout(Some(TIMEOUT));
120 timeout_stream.set_read_timeout(Some(TIMEOUT));
121
122 Ok(Box::pin(timeout_stream))
123}
124
125pub(crate) async fn connect_tls_inner(
128 addr: SocketAddr,
129 host: &str,
130 strict_tls: bool,
131 alpn: &str,
132 tls_session_store: &TlsSessionStore,
133 spki_hash_store: &SpkiHashStore,
134 sql: &Sql,
135) -> Result<impl SessionStream + 'static> {
136 let use_sni = true;
137 let tcp_stream = connect_tcp_inner(addr).await?;
138 let tls_stream = wrap_tls(
139 strict_tls,
140 host,
141 addr.port(),
142 use_sni,
143 alpn,
144 tcp_stream,
145 tls_session_store,
146 spki_hash_store,
147 sql,
148 )
149 .await?;
150 Ok(tls_stream)
151}
152
153pub(crate) async fn run_connection_attempts<O, I, F>(mut futures: I) -> Result<O>
164where
165 I: Iterator<Item = F>,
166 F: Future<Output = Result<O>> + Send + 'static,
167 O: Send + 'static,
168{
169 let mut connection_attempt_set = JoinSet::new();
170
171 let mut delay_set = JoinSet::new();
174 for delay in [
175 Duration::from_millis(300),
176 Duration::from_secs(1),
177 Duration::from_secs(5),
178 Duration::from_secs(10),
179 ] {
180 delay_set.spawn(tokio::time::sleep(delay));
181 }
182
183 let mut first_error = None;
184
185 let res = loop {
186 if let Some(fut) = futures.next() {
187 connection_attempt_set.spawn(fut);
188 }
189
190 tokio::select! {
191 biased;
192
193 res = connection_attempt_set.join_next() => {
194 match res {
195 Some(res) => {
196 match res.context("Failed to join task") {
197 Ok(Ok(conn)) => {
198 break Ok(conn);
200 }
201 Ok(Err(err)) => {
202 first_error.get_or_insert(err);
204 }
205 Err(err) => {
206 break Err(err);
207 }
208 }
209 }
210 None => {
211 break Err(
215 first_error.unwrap_or_else(|| format_err!("No connection attempts were made"))
216 );
217 }
218 }
219 },
220
221 _ = delay_set.join_next(), if !delay_set.is_empty() => {
222 }
227 }
228 };
229
230 connection_attempt_set.shutdown().await;
239
240 res
241}
242
243pub(crate) async fn connect_tcp(
250 context: &Context,
251 host: &str,
252 port: u16,
253 load_cache: bool,
254) -> Result<Pin<Box<TimeoutStream<TcpStream>>>> {
255 let connection_futures = lookup_host_with_cache(context, host, port, "", load_cache)
256 .await?
257 .into_iter()
258 .map(connect_tcp_inner);
259 run_connection_attempts(connection_futures).await
260}