deltachat/
net.rs

1//! # Common network utilities.
2use std::future::Future;
3use std::net::SocketAddr;
4use std::pin::Pin;
5use std::time::Duration;
6
7use anyhow::{format_err, Context as _, Result};
8use tokio::net::TcpStream;
9use tokio::task::JoinSet;
10use tokio::time::timeout;
11use tokio_io_timeout::TimeoutStream;
12
13use crate::context::Context;
14use crate::net::session::SessionStream;
15use crate::sql::Sql;
16use crate::tools::time;
17
18pub(crate) mod dns;
19pub(crate) mod http;
20pub(crate) mod proxy;
21pub(crate) mod session;
22pub(crate) mod tls;
23
24use dns::lookup_host_with_cache;
25pub use http::{read_url, read_url_blob, Response as HttpResponse};
26use tls::wrap_tls;
27
/// Timeout used for establishing a connection
/// and for each individual read and write on it.
///
/// Keep this value above the largest round-trip time we expect to see.
pub(crate) const TIMEOUT: Duration = Duration::from_secs(60);
32
/// Time-to-live of cache entries, in seconds (30 days).
pub(crate) const CACHE_TTL: u64 = 60 * 60 * 24 * 30;
35
36/// Removes connection history entries after `CACHE_TTL`.
37pub(crate) async fn prune_connection_history(context: &Context) -> Result<()> {
38    let now = time();
39    context
40        .sql
41        .execute(
42            "DELETE FROM connection_history
43             WHERE ? > timestamp + ?",
44            (now, CACHE_TTL),
45        )
46        .await?;
47    Ok(())
48}
49
50/// Update the timestamp of the last successful connection
51/// to the given `host` and `port`
52/// with the given application protocol `alpn`.
53///
54/// `addr` is the string representation of IP address.
55/// If connection is made over a proxy which does
56/// its own DNS resolution,
57/// `addr` should be the same as `host`.
58pub(crate) async fn update_connection_history(
59    context: &Context,
60    alpn: &str,
61    host: &str,
62    port: u16,
63    addr: &str,
64    now: i64,
65) -> Result<()> {
66    context
67        .sql
68        .execute(
69            "INSERT INTO connection_history (host, port, alpn, addr, timestamp)
70             VALUES (?, ?, ?, ?, ?)
71             ON CONFLICT (host, port, alpn, addr)
72             DO UPDATE SET timestamp=excluded.timestamp",
73            (host, port, alpn, addr, now),
74        )
75        .await?;
76    Ok(())
77}
78
79/// Returns timestamp of the most recent successful connection
80/// to the host and port for given protocol.
81pub(crate) async fn load_connection_timestamp(
82    sql: &Sql,
83    alpn: &str,
84    host: &str,
85    port: u16,
86    addr: Option<&str>,
87) -> Result<Option<i64>> {
88    let timestamp = sql
89        .query_get_value(
90            "SELECT timestamp FROM connection_history
91             WHERE host = ?
92               AND port = ?
93               AND alpn = ?
94               AND addr = IFNULL(?, addr)",
95            (host, port, alpn, addr),
96        )
97        .await?;
98    Ok(timestamp)
99}
100
101/// Returns a TCP connection stream with read/write timeouts set
102/// and Nagle's algorithm disabled with `TCP_NODELAY`.
103///
104/// `TCP_NODELAY` ensures writing to the stream always results in immediate sending of the packet
105/// to the network, which is important to reduce the latency of interactive protocols such as IMAP.
106pub(crate) async fn connect_tcp_inner(
107    addr: SocketAddr,
108) -> Result<Pin<Box<TimeoutStream<TcpStream>>>> {
109    let tcp_stream = timeout(TIMEOUT, TcpStream::connect(addr))
110        .await
111        .context("connection timeout")?
112        .context("connection failure")?;
113
114    // Disable Nagle's algorithm.
115    tcp_stream.set_nodelay(true)?;
116
117    let mut timeout_stream = TimeoutStream::new(tcp_stream);
118    timeout_stream.set_write_timeout(Some(TIMEOUT));
119    timeout_stream.set_read_timeout(Some(TIMEOUT));
120
121    Ok(Box::pin(timeout_stream))
122}
123
124/// Attempts to establish TLS connection
125/// given the result of the hostname to address resolution.
126pub(crate) async fn connect_tls_inner(
127    addr: SocketAddr,
128    host: &str,
129    strict_tls: bool,
130    alpn: &[&str],
131) -> Result<impl SessionStream> {
132    let tcp_stream = connect_tcp_inner(addr).await?;
133    let tls_stream = wrap_tls(strict_tls, host, alpn, tcp_stream).await?;
134    Ok(tls_stream)
135}
136
137/// Runs connection attempt futures.
138///
139/// Accepts iterator of connection attempt futures
140/// and runs them until one of them succeeds
141/// or all of them fail.
142///
143/// If all connection attempts fail, returns the first error.
144///
145/// This functions starts with one connection attempt and maintains
146/// up to five parallel connection attempts if connecting takes time.
147pub(crate) async fn run_connection_attempts<O, I, F>(mut futures: I) -> Result<O>
148where
149    I: Iterator<Item = F>,
150    F: Future<Output = Result<O>> + Send + 'static,
151    O: Send + 'static,
152{
153    let mut connection_attempt_set = JoinSet::new();
154
155    // Start additional connection attempts after 300 ms, 1 s, 5 s and 10 s.
156    // This way we can have up to 5 parallel connection attempts at the same time.
157    let mut delay_set = JoinSet::new();
158    for delay in [
159        Duration::from_millis(300),
160        Duration::from_secs(1),
161        Duration::from_secs(5),
162        Duration::from_secs(10),
163    ] {
164        delay_set.spawn(tokio::time::sleep(delay));
165    }
166
167    let mut first_error = None;
168
169    let res = loop {
170        if let Some(fut) = futures.next() {
171            connection_attempt_set.spawn(fut);
172        }
173
174        tokio::select! {
175            biased;
176
177            res = connection_attempt_set.join_next() => {
178                match res {
179                    Some(res) => {
180                        match res.context("Failed to join task") {
181                            Ok(Ok(conn)) => {
182                                // Successfully connected.
183                                break Ok(conn);
184                            }
185                            Ok(Err(err)) => {
186                                // Some connection attempt failed.
187                                first_error.get_or_insert(err);
188                            }
189                            Err(err) => {
190                                break Err(err);
191                            }
192                        }
193                    }
194                    None => {
195                        // Out of connection attempts.
196                        //
197                        // Break out of the loop and return error.
198                        break Err(
199                            first_error.unwrap_or_else(|| format_err!("No connection attempts were made"))
200                        );
201                    }
202                }
203            },
204
205            _ = delay_set.join_next(), if !delay_set.is_empty() => {
206                // Delay expired.
207                //
208                // Don't do anything other than pushing
209                // another connection attempt into `connection_attempt_set`.
210            }
211        }
212    };
213
214    // Abort remaining connection attempts and free resources
215    // such as OS sockets and `Context` references
216    // held by connection attempt tasks.
217    //
218    // `delay_set` contains just `sleep` tasks
219    // so no need to await futures there,
220    // it is enough that futures are aborted
221    // when the set is dropped.
222    connection_attempt_set.shutdown().await;
223
224    res
225}
226
227/// If `load_cache` is true, may use cached DNS results.
228/// Because the cache may be poisoned with incorrect results by networks hijacking DNS requests,
229/// this option should only be used when connection is authenticated,
230/// for example using TLS.
231/// If TLS is not used or invalid TLS certificates are allowed,
232/// this option should be disabled.
233pub(crate) async fn connect_tcp(
234    context: &Context,
235    host: &str,
236    port: u16,
237    load_cache: bool,
238) -> Result<Pin<Box<TimeoutStream<TcpStream>>>> {
239    let connection_futures = lookup_host_with_cache(context, host, port, "", load_cache)
240        .await?
241        .into_iter()
242        .map(connect_tcp_inner);
243    run_connection_attempts(connection_futures).await
244}