deltachat/
net.rs

1//! # Common network utilities.
2use std::future::Future;
3use std::net::SocketAddr;
4use std::pin::Pin;
5use std::time::Duration;
6
7use anyhow::{Context as _, Result, format_err};
8use tokio::net::TcpStream;
9use tokio::task::JoinSet;
10use tokio::time::timeout;
11use tokio_io_timeout::TimeoutStream;
12
13use crate::context::Context;
14use crate::net::session::SessionStream;
15use crate::net::tls::TlsSessionStore;
16use crate::sql::Sql;
17use crate::tools::time;
18
19pub(crate) mod dns;
20pub(crate) mod http;
21pub(crate) mod proxy;
22pub(crate) mod session;
23pub(crate) mod tls;
24
25use dns::lookup_host_with_cache;
26pub use http::{Response as HttpResponse, read_url, read_url_blob};
27use tls::wrap_tls;
28
/// Connection, write and read timeout.
///
/// Used in `connect_tcp_inner` both as the limit for establishing
/// the TCP connection and as the read and write timeout
/// of the resulting stream.
///
/// This constant should be more than the largest expected RTT.
pub(crate) const TIMEOUT: Duration = Duration::from_secs(60);

/// TTL for caches in seconds (30 days).
///
/// `prune_connection_history` removes connection history entries
/// that are older than this.
pub(crate) const CACHE_TTL: u64 = 30 * 24 * 60 * 60;
36
37/// Removes connection history entries after `CACHE_TTL`.
38pub(crate) async fn prune_connection_history(context: &Context) -> Result<()> {
39    let now = time();
40    context
41        .sql
42        .execute(
43            "DELETE FROM connection_history
44             WHERE ? > timestamp + ?",
45            (now, CACHE_TTL),
46        )
47        .await?;
48    Ok(())
49}
50
51/// Update the timestamp of the last successful connection
52/// to the given `host` and `port`
53/// with the given application protocol `alpn`.
54///
55/// `addr` is the string representation of IP address.
56/// If connection is made over a proxy which does
57/// its own DNS resolution,
58/// `addr` should be the same as `host`.
59pub(crate) async fn update_connection_history(
60    context: &Context,
61    alpn: &str,
62    host: &str,
63    port: u16,
64    addr: &str,
65    now: i64,
66) -> Result<()> {
67    context
68        .sql
69        .execute(
70            "INSERT INTO connection_history (host, port, alpn, addr, timestamp)
71             VALUES (?, ?, ?, ?, ?)
72             ON CONFLICT (host, port, alpn, addr)
73             DO UPDATE SET timestamp=excluded.timestamp",
74            (host, port, alpn, addr, now),
75        )
76        .await?;
77    Ok(())
78}
79
80/// Returns timestamp of the most recent successful connection
81/// to the host and port for given protocol.
82pub(crate) async fn load_connection_timestamp(
83    sql: &Sql,
84    alpn: &str,
85    host: &str,
86    port: u16,
87    addr: Option<&str>,
88) -> Result<Option<i64>> {
89    let timestamp = sql
90        .query_get_value(
91            "SELECT timestamp FROM connection_history
92             WHERE host = ?
93               AND port = ?
94               AND alpn = ?
95               AND addr = IFNULL(?, addr)",
96            (host, port, alpn, addr),
97        )
98        .await?;
99    Ok(timestamp)
100}
101
102/// Returns a TCP connection stream with read/write timeouts set
103/// and Nagle's algorithm disabled with `TCP_NODELAY`.
104///
105/// `TCP_NODELAY` ensures writing to the stream always results in immediate sending of the packet
106/// to the network, which is important to reduce the latency of interactive protocols such as IMAP.
107pub(crate) async fn connect_tcp_inner(
108    addr: SocketAddr,
109) -> Result<Pin<Box<TimeoutStream<TcpStream>>>> {
110    let tcp_stream = timeout(TIMEOUT, TcpStream::connect(addr))
111        .await
112        .context("connection timeout")?
113        .context("connection failure")?;
114
115    // Disable Nagle's algorithm.
116    tcp_stream.set_nodelay(true)?;
117
118    let mut timeout_stream = TimeoutStream::new(tcp_stream);
119    timeout_stream.set_write_timeout(Some(TIMEOUT));
120    timeout_stream.set_read_timeout(Some(TIMEOUT));
121
122    Ok(Box::pin(timeout_stream))
123}
124
125/// Attempts to establish TLS connection
126/// given the result of the hostname to address resolution.
127pub(crate) async fn connect_tls_inner(
128    addr: SocketAddr,
129    host: &str,
130    strict_tls: bool,
131    alpn: &str,
132    tls_session_store: &TlsSessionStore,
133) -> Result<impl SessionStream + 'static> {
134    let use_sni = true;
135    let tcp_stream = connect_tcp_inner(addr).await?;
136    let tls_stream = wrap_tls(
137        strict_tls,
138        host,
139        addr.port(),
140        use_sni,
141        alpn,
142        tcp_stream,
143        tls_session_store,
144    )
145    .await?;
146    Ok(tls_stream)
147}
148
149/// Runs connection attempt futures.
150///
151/// Accepts iterator of connection attempt futures
152/// and runs them until one of them succeeds
153/// or all of them fail.
154///
155/// If all connection attempts fail, returns the first error.
156///
157/// This functions starts with one connection attempt and maintains
158/// up to five parallel connection attempts if connecting takes time.
159pub(crate) async fn run_connection_attempts<O, I, F>(mut futures: I) -> Result<O>
160where
161    I: Iterator<Item = F>,
162    F: Future<Output = Result<O>> + Send + 'static,
163    O: Send + 'static,
164{
165    let mut connection_attempt_set = JoinSet::new();
166
167    // Start additional connection attempts after 300 ms, 1 s, 5 s and 10 s.
168    // This way we can have up to 5 parallel connection attempts at the same time.
169    let mut delay_set = JoinSet::new();
170    for delay in [
171        Duration::from_millis(300),
172        Duration::from_secs(1),
173        Duration::from_secs(5),
174        Duration::from_secs(10),
175    ] {
176        delay_set.spawn(tokio::time::sleep(delay));
177    }
178
179    let mut first_error = None;
180
181    let res = loop {
182        if let Some(fut) = futures.next() {
183            connection_attempt_set.spawn(fut);
184        }
185
186        tokio::select! {
187            biased;
188
189            res = connection_attempt_set.join_next() => {
190                match res {
191                    Some(res) => {
192                        match res.context("Failed to join task") {
193                            Ok(Ok(conn)) => {
194                                // Successfully connected.
195                                break Ok(conn);
196                            }
197                            Ok(Err(err)) => {
198                                // Some connection attempt failed.
199                                first_error.get_or_insert(err);
200                            }
201                            Err(err) => {
202                                break Err(err);
203                            }
204                        }
205                    }
206                    None => {
207                        // Out of connection attempts.
208                        //
209                        // Break out of the loop and return error.
210                        break Err(
211                            first_error.unwrap_or_else(|| format_err!("No connection attempts were made"))
212                        );
213                    }
214                }
215            },
216
217            _ = delay_set.join_next(), if !delay_set.is_empty() => {
218                // Delay expired.
219                //
220                // Don't do anything other than pushing
221                // another connection attempt into `connection_attempt_set`.
222            }
223        }
224    };
225
226    // Abort remaining connection attempts and free resources
227    // such as OS sockets and `Context` references
228    // held by connection attempt tasks.
229    //
230    // `delay_set` contains just `sleep` tasks
231    // so no need to await futures there,
232    // it is enough that futures are aborted
233    // when the set is dropped.
234    connection_attempt_set.shutdown().await;
235
236    res
237}
238
239/// If `load_cache` is true, may use cached DNS results.
240/// Because the cache may be poisoned with incorrect results by networks hijacking DNS requests,
241/// this option should only be used when connection is authenticated,
242/// for example using TLS.
243/// If TLS is not used or invalid TLS certificates are allowed,
244/// this option should be disabled.
245pub(crate) async fn connect_tcp(
246    context: &Context,
247    host: &str,
248    port: u16,
249    load_cache: bool,
250) -> Result<Pin<Box<TimeoutStream<TcpStream>>>> {
251    let connection_futures = lookup_host_with_cache(context, host, port, "", load_cache)
252        .await?
253        .into_iter()
254        .map(connect_tcp_inner);
255    run_connection_attempts(connection_futures).await
256}