deltachat/
net.rs

1//! # Common network utilities.
2use std::future::Future;
3use std::net::SocketAddr;
4use std::pin::Pin;
5use std::time::Duration;
6
7use anyhow::{Context as _, Result, format_err};
8use tokio::net::TcpStream;
9use tokio::task::JoinSet;
10use tokio::time::timeout;
11use tokio_io_timeout::TimeoutStream;
12
13use crate::context::Context;
14use crate::net::session::SessionStream;
15use crate::net::tls::TlsSessionStore;
16use crate::sql::Sql;
17use crate::tools::time;
18
19pub(crate) mod dns;
20pub(crate) mod http;
21pub(crate) mod proxy;
22pub(crate) mod session;
23pub(crate) mod tls;
24
25use dns::lookup_host_with_cache;
26pub use http::{Response as HttpResponse, read_url, read_url_blob};
27use tls::wrap_tls;
28
/// Connection, write and read timeout.
///
/// This constant should be more than the largest expected RTT.
///
/// `connect_tcp_inner` uses it both as the limit for establishing
/// the TCP connection and as the read/write timeout of the resulting stream.
pub(crate) const TIMEOUT: Duration = Duration::from_secs(60);
33
/// TTL for caches in seconds.
///
/// Equals 30 days. `prune_connection_history` deletes
/// connection history entries that are older than this.
pub(crate) const CACHE_TTL: u64 = 30 * 24 * 60 * 60;
36
37/// Removes connection history entries after `CACHE_TTL`.
38pub(crate) async fn prune_connection_history(context: &Context) -> Result<()> {
39    let now = time();
40    context
41        .sql
42        .execute(
43            "DELETE FROM connection_history
44             WHERE ? > timestamp + ?",
45            (now, CACHE_TTL),
46        )
47        .await?;
48    Ok(())
49}
50
51/// Update the timestamp of the last successful connection
52/// to the given `host` and `port`
53/// with the given application protocol `alpn`.
54///
55/// `addr` is the string representation of IP address.
56/// If connection is made over a proxy which does
57/// its own DNS resolution,
58/// `addr` should be the same as `host`.
59pub(crate) async fn update_connection_history(
60    context: &Context,
61    alpn: &str,
62    host: &str,
63    port: u16,
64    addr: &str,
65    now: i64,
66) -> Result<()> {
67    context
68        .sql
69        .execute(
70            "INSERT INTO connection_history (host, port, alpn, addr, timestamp)
71             VALUES (?, ?, ?, ?, ?)
72             ON CONFLICT (host, port, alpn, addr)
73             DO UPDATE SET timestamp=excluded.timestamp",
74            (host, port, alpn, addr, now),
75        )
76        .await?;
77    Ok(())
78}
79
80/// Returns timestamp of the most recent successful connection
81/// to the host and port for given protocol.
82pub(crate) async fn load_connection_timestamp(
83    sql: &Sql,
84    alpn: &str,
85    host: &str,
86    port: u16,
87    addr: Option<&str>,
88) -> Result<Option<i64>> {
89    let timestamp = sql
90        .query_get_value(
91            "SELECT timestamp FROM connection_history
92             WHERE host = ?
93               AND port = ?
94               AND alpn = ?
95               AND addr = IFNULL(?, addr)",
96            (host, port, alpn, addr),
97        )
98        .await?;
99    Ok(timestamp)
100}
101
102/// Returns a TCP connection stream with read/write timeouts set
103/// and Nagle's algorithm disabled with `TCP_NODELAY`.
104///
105/// `TCP_NODELAY` ensures writing to the stream always results in immediate sending of the packet
106/// to the network, which is important to reduce the latency of interactive protocols such as IMAP.
107pub(crate) async fn connect_tcp_inner(
108    addr: SocketAddr,
109) -> Result<Pin<Box<TimeoutStream<TcpStream>>>> {
110    let tcp_stream = timeout(TIMEOUT, TcpStream::connect(addr))
111        .await
112        .context("connection timeout")?
113        .context("connection failure")?;
114
115    // Disable Nagle's algorithm.
116    tcp_stream.set_nodelay(true)?;
117
118    let mut timeout_stream = TimeoutStream::new(tcp_stream);
119    timeout_stream.set_write_timeout(Some(TIMEOUT));
120    timeout_stream.set_read_timeout(Some(TIMEOUT));
121
122    Ok(Box::pin(timeout_stream))
123}
124
125/// Attempts to establish TLS connection
126/// given the result of the hostname to address resolution.
127pub(crate) async fn connect_tls_inner(
128    addr: SocketAddr,
129    host: &str,
130    strict_tls: bool,
131    alpn: &str,
132    tls_session_store: &TlsSessionStore,
133) -> Result<impl SessionStream + 'static> {
134    let tcp_stream = connect_tcp_inner(addr).await?;
135    let tls_stream = wrap_tls(
136        strict_tls,
137        host,
138        addr.port(),
139        alpn,
140        tcp_stream,
141        tls_session_store,
142    )
143    .await?;
144    Ok(tls_stream)
145}
146
147/// Runs connection attempt futures.
148///
149/// Accepts iterator of connection attempt futures
150/// and runs them until one of them succeeds
151/// or all of them fail.
152///
153/// If all connection attempts fail, returns the first error.
154///
155/// This functions starts with one connection attempt and maintains
156/// up to five parallel connection attempts if connecting takes time.
157pub(crate) async fn run_connection_attempts<O, I, F>(mut futures: I) -> Result<O>
158where
159    I: Iterator<Item = F>,
160    F: Future<Output = Result<O>> + Send + 'static,
161    O: Send + 'static,
162{
163    let mut connection_attempt_set = JoinSet::new();
164
165    // Start additional connection attempts after 300 ms, 1 s, 5 s and 10 s.
166    // This way we can have up to 5 parallel connection attempts at the same time.
167    let mut delay_set = JoinSet::new();
168    for delay in [
169        Duration::from_millis(300),
170        Duration::from_secs(1),
171        Duration::from_secs(5),
172        Duration::from_secs(10),
173    ] {
174        delay_set.spawn(tokio::time::sleep(delay));
175    }
176
177    let mut first_error = None;
178
179    let res = loop {
180        if let Some(fut) = futures.next() {
181            connection_attempt_set.spawn(fut);
182        }
183
184        tokio::select! {
185            biased;
186
187            res = connection_attempt_set.join_next() => {
188                match res {
189                    Some(res) => {
190                        match res.context("Failed to join task") {
191                            Ok(Ok(conn)) => {
192                                // Successfully connected.
193                                break Ok(conn);
194                            }
195                            Ok(Err(err)) => {
196                                // Some connection attempt failed.
197                                first_error.get_or_insert(err);
198                            }
199                            Err(err) => {
200                                break Err(err);
201                            }
202                        }
203                    }
204                    None => {
205                        // Out of connection attempts.
206                        //
207                        // Break out of the loop and return error.
208                        break Err(
209                            first_error.unwrap_or_else(|| format_err!("No connection attempts were made"))
210                        );
211                    }
212                }
213            },
214
215            _ = delay_set.join_next(), if !delay_set.is_empty() => {
216                // Delay expired.
217                //
218                // Don't do anything other than pushing
219                // another connection attempt into `connection_attempt_set`.
220            }
221        }
222    };
223
224    // Abort remaining connection attempts and free resources
225    // such as OS sockets and `Context` references
226    // held by connection attempt tasks.
227    //
228    // `delay_set` contains just `sleep` tasks
229    // so no need to await futures there,
230    // it is enough that futures are aborted
231    // when the set is dropped.
232    connection_attempt_set.shutdown().await;
233
234    res
235}
236
237/// If `load_cache` is true, may use cached DNS results.
238/// Because the cache may be poisoned with incorrect results by networks hijacking DNS requests,
239/// this option should only be used when connection is authenticated,
240/// for example using TLS.
241/// If TLS is not used or invalid TLS certificates are allowed,
242/// this option should be disabled.
243pub(crate) async fn connect_tcp(
244    context: &Context,
245    host: &str,
246    port: u16,
247    load_cache: bool,
248) -> Result<Pin<Box<TimeoutStream<TcpStream>>>> {
249    let connection_futures = lookup_host_with_cache(context, host, port, "", load_cache)
250        .await?
251        .into_iter()
252        .map(connect_tcp_inner);
253    run_connection_attempts(connection_futures).await
254}