deltachat/
net.rs

1//! # Common network utilities.
2use std::future::Future;
3use std::net::SocketAddr;
4use std::pin::Pin;
5use std::time::Duration;
6
7use anyhow::{Context as _, Result, format_err};
8use tokio::net::TcpStream;
9use tokio::task::JoinSet;
10use tokio::time::timeout;
11use tokio_io_timeout::TimeoutStream;
12
13use crate::context::Context;
14use crate::net::session::SessionStream;
15use crate::net::tls::{SpkiHashStore, TlsSessionStore};
16use crate::sql::Sql;
17use crate::tools::time;
18
19pub(crate) mod dns;
20pub(crate) mod http;
21pub(crate) mod proxy;
22pub(crate) mod session;
23pub(crate) mod tls;
24
25use dns::lookup_host_with_cache;
26pub use http::{Response as HttpResponse, read_url, read_url_blob};
27use tls::wrap_tls;
28
29/// Connection, write and read timeout.
30///
31/// This constant should be more than the largest expected RTT.
32pub(crate) const TIMEOUT: Duration = Duration::from_secs(60);
33
34/// TTL for caches in seconds.
35pub(crate) const CACHE_TTL: u64 = 30 * 24 * 60 * 60;
36
37/// Removes connection history entries after `CACHE_TTL`.
38pub(crate) async fn prune_connection_history(context: &Context) -> Result<()> {
39    let now = time();
40    context
41        .sql
42        .execute(
43            "DELETE FROM connection_history
44             WHERE ? > timestamp + ?",
45            (now, CACHE_TTL),
46        )
47        .await?;
48    Ok(())
49}
50
51/// Update the timestamp of the last successful connection
52/// to the given `host` and `port`
53/// with the given application protocol `alpn`.
54///
55/// `addr` is the string representation of IP address.
56/// If connection is made over a proxy which does
57/// its own DNS resolution,
58/// `addr` should be the same as `host`.
59pub(crate) async fn update_connection_history(
60    context: &Context,
61    alpn: &str,
62    host: &str,
63    port: u16,
64    addr: &str,
65    now: i64,
66) -> Result<()> {
67    context
68        .sql
69        .execute(
70            "INSERT INTO connection_history (host, port, alpn, addr, timestamp)
71             VALUES (?, ?, ?, ?, ?)
72             ON CONFLICT (host, port, alpn, addr)
73             DO UPDATE SET timestamp=excluded.timestamp",
74            (host, port, alpn, addr, now),
75        )
76        .await?;
77    Ok(())
78}
79
80/// Returns timestamp of the most recent successful connection
81/// to the host and port for given protocol.
82pub(crate) async fn load_connection_timestamp(
83    sql: &Sql,
84    alpn: &str,
85    host: &str,
86    port: u16,
87    addr: Option<&str>,
88) -> Result<Option<i64>> {
89    let timestamp = sql
90        .query_get_value(
91            "SELECT timestamp FROM connection_history
92             WHERE host = ?
93               AND port = ?
94               AND alpn = ?
95               AND addr = IFNULL(?, addr)",
96            (host, port, alpn, addr),
97        )
98        .await?;
99    Ok(timestamp)
100}
101
102/// Returns a TCP connection stream with read/write timeouts set
103/// and Nagle's algorithm disabled with `TCP_NODELAY`.
104///
105/// `TCP_NODELAY` ensures writing to the stream always results in immediate sending of the packet
106/// to the network, which is important to reduce the latency of interactive protocols such as IMAP.
107pub(crate) async fn connect_tcp_inner(
108    addr: SocketAddr,
109) -> Result<Pin<Box<TimeoutStream<TcpStream>>>> {
110    let tcp_stream = timeout(TIMEOUT, TcpStream::connect(addr))
111        .await
112        .context("Connection timeout")?
113        .context("Connection failure")?;
114
115    // Disable Nagle's algorithm.
116    tcp_stream.set_nodelay(true)?;
117
118    let mut timeout_stream = TimeoutStream::new(tcp_stream);
119    timeout_stream.set_write_timeout(Some(TIMEOUT));
120    timeout_stream.set_read_timeout(Some(TIMEOUT));
121
122    Ok(Box::pin(timeout_stream))
123}
124
125/// Attempts to establish TLS connection
126/// given the result of the hostname to address resolution.
127pub(crate) async fn connect_tls_inner(
128    addr: SocketAddr,
129    host: &str,
130    strict_tls: bool,
131    alpn: &str,
132    tls_session_store: &TlsSessionStore,
133    spki_hash_store: &SpkiHashStore,
134    sql: &Sql,
135) -> Result<impl SessionStream + 'static> {
136    let use_sni = true;
137    let tcp_stream = connect_tcp_inner(addr).await?;
138    let tls_stream = wrap_tls(
139        strict_tls,
140        host,
141        addr.port(),
142        use_sni,
143        alpn,
144        tcp_stream,
145        tls_session_store,
146        spki_hash_store,
147        sql,
148    )
149    .await?;
150    Ok(tls_stream)
151}
152
153/// Runs connection attempt futures.
154///
155/// Accepts iterator of connection attempt futures
156/// and runs them until one of them succeeds
157/// or all of them fail.
158///
159/// If all connection attempts fail, returns the first error.
160///
161/// This functions starts with one connection attempt and maintains
162/// up to five parallel connection attempts if connecting takes time.
163pub(crate) async fn run_connection_attempts<O, I, F>(mut futures: I) -> Result<O>
164where
165    I: Iterator<Item = F>,
166    F: Future<Output = Result<O>> + Send + 'static,
167    O: Send + 'static,
168{
169    let mut connection_attempt_set = JoinSet::new();
170
171    // Start additional connection attempts after 300 ms, 1 s, 5 s and 10 s.
172    // This way we can have up to 5 parallel connection attempts at the same time.
173    let mut delay_set = JoinSet::new();
174    for delay in [
175        Duration::from_millis(300),
176        Duration::from_secs(1),
177        Duration::from_secs(5),
178        Duration::from_secs(10),
179    ] {
180        delay_set.spawn(tokio::time::sleep(delay));
181    }
182
183    let mut first_error = None;
184
185    let res = loop {
186        if let Some(fut) = futures.next() {
187            connection_attempt_set.spawn(fut);
188        }
189
190        tokio::select! {
191            biased;
192
193            res = connection_attempt_set.join_next() => {
194                match res {
195                    Some(res) => {
196                        match res.context("Failed to join task") {
197                            Ok(Ok(conn)) => {
198                                // Successfully connected.
199                                break Ok(conn);
200                            }
201                            Ok(Err(err)) => {
202                                // Some connection attempt failed.
203                                first_error.get_or_insert(err);
204                            }
205                            Err(err) => {
206                                break Err(err);
207                            }
208                        }
209                    }
210                    None => {
211                        // Out of connection attempts.
212                        //
213                        // Break out of the loop and return error.
214                        break Err(
215                            first_error.unwrap_or_else(|| format_err!("No connection attempts were made"))
216                        );
217                    }
218                }
219            },
220
221            _ = delay_set.join_next(), if !delay_set.is_empty() => {
222                // Delay expired.
223                //
224                // Don't do anything other than pushing
225                // another connection attempt into `connection_attempt_set`.
226            }
227        }
228    };
229
230    // Abort remaining connection attempts and free resources
231    // such as OS sockets and `Context` references
232    // held by connection attempt tasks.
233    //
234    // `delay_set` contains just `sleep` tasks
235    // so no need to await futures there,
236    // it is enough that futures are aborted
237    // when the set is dropped.
238    connection_attempt_set.shutdown().await;
239
240    res
241}
242
243/// If `load_cache` is true, may use cached DNS results.
244/// Because the cache may be poisoned with incorrect results by networks hijacking DNS requests,
245/// this option should only be used when connection is authenticated,
246/// for example using TLS.
247/// If TLS is not used or invalid TLS certificates are allowed,
248/// this option should be disabled.
249pub(crate) async fn connect_tcp(
250    context: &Context,
251    host: &str,
252    port: u16,
253    load_cache: bool,
254) -> Result<Pin<Box<TimeoutStream<TcpStream>>>> {
255    let connection_futures = lookup_host_with_cache(context, host, port, "", load_cache)
256        .await?
257        .into_iter()
258        .map(connect_tcp_inner);
259    run_connection_attempts(connection_futures).await
260}