1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
//! # Common network utilities.
use std::net::Ipv4Addr;
use std::net::{IpAddr, SocketAddr};
use std::pin::Pin;
use std::str::FromStr;
use std::time::Duration;

use anyhow::{format_err, Context as _, Result};
use tokio::net::{lookup_host, TcpStream};
use tokio::time::timeout;
use tokio_io_timeout::TimeoutStream;

use crate::context::Context;
use crate::tools::time;

pub(crate) mod http;
pub(crate) mod session;
pub(crate) mod tls;

pub use http::{read_url, read_url_blob, Response as HttpResponse};

/// Opens a TCP connection to `addr`, giving up after `timeout_val`.
///
/// The error carries the context "connection timeout" if the attempt does not finish in time
/// and "connection failure" if the connection attempt itself fails.
async fn connect_tcp_inner(addr: SocketAddr, timeout_val: Duration) -> Result<TcpStream> {
    // The outer `Result` reports the timeout, the inner one the outcome of the connect call.
    let connect_result = timeout(timeout_val, TcpStream::connect(addr))
        .await
        .context("connection timeout")?;
    connect_result.context("connection failure")
}

/// Resolves `hostname` and `port` into socket addresses, giving up after `timeout_val`.
///
/// The error carries the context "DNS lookup timeout" if the lookup does not finish in time
/// and "DNS lookup failure" if the lookup itself fails.
async fn lookup_host_with_timeout(
    hostname: &str,
    port: u16,
    timeout_val: Duration,
) -> Result<Vec<SocketAddr>> {
    let lookup_future = lookup_host((hostname, port));

    // The outer `Result` reports the timeout, the inner one the outcome of the lookup.
    let lookup_result = timeout(timeout_val, lookup_future)
        .await
        .context("DNS lookup timeout")?;
    let addrs = lookup_result.context("DNS lookup failure")?;

    Ok(addrs.collect())
}

/// Resolves `hostname` and `port` via DNS and records the results in the `dns_cache` table.
///
/// A failed or timed out DNS lookup is logged as a warning and treated as an empty result
/// rather than as an error.
///
/// If `load_cache` is true, cached addresses for `hostname` that are not older than 30 days
/// are appended after the fresh results, most recently updated first,
/// skipping addresses that are already in the list.
/// If the list is still empty afterwards, a hardcoded address is used
/// for the hostnames that have one.
///
/// Returns an error if writing to or reading from the cache table fails.
async fn lookup_host_with_cache(
    context: &Context,
    hostname: &str,
    port: u16,
    timeout_val: Duration,
    load_cache: bool,
) -> Result<Vec<SocketAddr>> {
    // Taken once, before the lookup, and used both for cache writes and for the age check.
    let now = time();

    let dns_result = lookup_host_with_timeout(hostname, port, timeout_val).await;
    let mut resolved_addrs = match dns_result {
        Err(err) => {
            warn!(
                context,
                "DNS resolution for {}:{} failed: {:#}.", hostname, port, err
            );
            Vec::new()
        }
        Ok(addrs) => addrs,
    };

    for addr in &resolved_addrs {
        let ip_string = addr.ip().to_string();

        // An IP address that resolved into itself is not worth caching.
        if ip_string == hostname {
            continue;
        }

        info!(context, "Resolved {}:{} into {}.", hostname, port, &addr);

        // Insert a new cache row or refresh the timestamp of the existing one.
        context
            .sql
            .execute(
                "INSERT INTO dns_cache
                 (hostname, address, timestamp)
                 VALUES (?, ?, ?)
                 ON CONFLICT (hostname, address)
                 DO UPDATE SET timestamp=excluded.timestamp",
                (hostname, ip_string, now),
            )
            .await?;
    }

    if load_cache {
        // Cached addresses for this hostname, newest first, limited to the last 30 days.
        let cached_addresses = context
            .sql
            .query_map(
                "SELECT address
                 FROM dns_cache
                 WHERE hostname = ?
                 AND ? < timestamp + 30 * 24 * 3600
                 ORDER BY timestamp DESC",
                (hostname, now),
                |row| {
                    let address: String = row.get(0)?;
                    Ok(address)
                },
                |rows| {
                    let addresses = rows.collect::<std::result::Result<Vec<_>, _>>()?;
                    Ok(addresses)
                },
            )
            .await?;

        for cached_address in cached_addresses {
            let ip_addr = match IpAddr::from_str(&cached_address) {
                Ok(ip_addr) => ip_addr,
                Err(err) => {
                    // An unparsable row is reported and skipped, it does not fail the lookup.
                    warn!(
                        context,
                        "Failed to parse cached address {:?}: {:#}.", cached_address, err
                    );
                    continue;
                }
            };

            // Fresh DNS results come first; cached ones are only added if not already present.
            let addr = SocketAddr::new(ip_addr, port);
            if !resolved_addrs.contains(&addr) {
                resolved_addrs.push(addr);
            }
        }

        // Load hardcoded cache if everything else fails.
        //
        // See <https://support.delta.chat/t/no-dns-resolution-result/2778> and
        // <https://github.com/deltachat/deltachat-core-rust/issues/4920> for reasons.
        //
        // In the future we may pre-resolve all provider database addresses
        // and build them in.
        if resolved_addrs.is_empty() && hostname == "mail.sangham.net" {
            let fallback_ip = IpAddr::V4(Ipv4Addr::new(159, 69, 186, 85));
            resolved_addrs.push(SocketAddr::new(fallback_ip, port));
        }
    }

    Ok(resolved_addrs)
}

/// Connects to `host`:`port` over TCP and returns the stream
/// with read/write timeouts set to `timeout_val` and `TCP_NODELAY` enabled.
///
/// `TCP_NODELAY` disables Nagle's algorithm, so that writing to the stream always results in
/// immediate sending of the packet to the network. This is important to reduce the latency
/// of interactive protocols such as IMAP.
///
/// The addresses resolved for `host` are tried one after another, each with `timeout_val`
/// as the connection timeout, until one of them accepts the connection.
/// If none does, the error of the last attempt is returned,
/// or a "no DNS resolution results" error if there was no address to try.
///
/// If `load_cache` is true, cached DNS results may be used.
/// The cache may be poisoned with incorrect results by networks hijacking DNS requests,
/// so this option should only be enabled when the connection is authenticated,
/// for example using TLS.
/// It should be disabled if TLS is not used or invalid TLS certificates are allowed.
pub(crate) async fn connect_tcp(
    context: &Context,
    host: &str,
    port: u16,
    timeout_val: Duration,
    load_cache: bool,
) -> Result<Pin<Box<TimeoutStream<TcpStream>>>> {
    let candidates = lookup_host_with_cache(context, host, port, timeout_val, load_cache).await?;

    let mut connected = None;
    let mut last_error = None;

    for resolved_addr in candidates {
        match connect_tcp_inner(resolved_addr, timeout_val).await {
            Err(err) => {
                // Remember the failure and move on to the next candidate address.
                warn!(
                    context,
                    "Failed to connect to {}: {:#}.", resolved_addr, err
                );
                last_error = Some(err);
            }
            Ok(stream) => {
                // Maximize priority of this cached entry.
                // Every cache row with this address gets the current time as its timestamp.
                context
                    .sql
                    .execute(
                        "UPDATE dns_cache
                         SET timestamp = ?
                         WHERE address = ?",
                        (time(), resolved_addr.ip().to_string()),
                    )
                    .await?;

                connected = Some(stream);
                break;
            }
        }
    }

    // Without a connection, report the last connection error if there was any attempt at all.
    let tcp_stream = connected.ok_or_else(|| {
        last_error.unwrap_or_else(|| format_err!("no DNS resolution results for {host}"))
    })?;

    // Disable Nagle's algorithm.
    tcp_stream.set_nodelay(true)?;

    let mut timeout_stream = TimeoutStream::new(tcp_stream);
    timeout_stream.set_read_timeout(Some(timeout_val));
    timeout_stream.set_write_timeout(Some(timeout_val));

    Ok(Box::pin(timeout_stream))
}