deltachat/net/
http.rs

1//! # HTTP module.
2
3use anyhow::{Context as _, Result, anyhow, bail};
4use bytes::Bytes;
5use http_body_util::BodyExt;
6use hyper_util::rt::TokioIo;
7use mime::Mime;
8use serde::Serialize;
9use tokio::fs;
10
11use crate::blob::BlobObject;
12use crate::context::Context;
13use crate::log::{info, warn};
14use crate::net::proxy::ProxyConfig;
15use crate::net::session::SessionStream;
16use crate::net::tls::wrap_rustls;
17use crate::tools::time;
18
/// User-Agent for HTTP requests if a resource usage policy requires it.
/// By default we do not set User-Agent.
///
/// Within this file it is only attached by `fetch_url`
/// to GET requests going to the OpenStreetMap tile servers.
const USER_AGENT: &str = "chatmail/2 (+https://github.com/chatmail/core/)";
22
/// Result of an HTTP(S) GET request.
#[derive(Clone, Debug, Eq, PartialEq)]
pub struct Response {
    /// Raw bytes of the response body.
    pub blob: Vec<u8>,

    /// MIME type taken from the `Content-Type` header, `None` if unknown.
    pub mimetype: Option<String>,

    /// Encoding taken from the `Content-Type` header, `None` if unknown.
    pub encoding: Option<String>,
}
35
36/// Retrieves the text contents of URL using HTTP GET request.
37pub async fn read_url(context: &Context, url: &str) -> Result<String> {
38    let response = read_url_blob(context, url).await?;
39    let text = String::from_utf8_lossy(&response.blob);
40    Ok(text.to_string())
41}
42
43async fn get_http_sender<B>(
44    context: &Context,
45    parsed_url: hyper::Uri,
46) -> Result<hyper::client::conn::http1::SendRequest<B>>
47where
48    B: hyper::body::Body + 'static + Send,
49    B::Data: Send,
50    B::Error: Into<Box<dyn std::error::Error + Send + Sync>>,
51{
52    let scheme = parsed_url.scheme_str().context("URL has no scheme")?;
53    let host = parsed_url.host().context("URL has no host")?;
54    let proxy_config_opt = ProxyConfig::load(context).await?;
55
56    let stream: Box<dyn SessionStream> = match scheme {
57        "http" => {
58            let port = parsed_url.port_u16().unwrap_or(80);
59
60            // It is safe to use cached IP addresses
61            // for HTTPS URLs, but for HTTP URLs
62            // better resolve from scratch each time to prevent
63            // cache poisoning attacks from having lasting effects.
64            let load_cache = false;
65            if let Some(proxy_config) = proxy_config_opt {
66                let proxy_stream = proxy_config
67                    .connect(context, host, port, load_cache)
68                    .await?;
69                Box::new(proxy_stream)
70            } else {
71                let tcp_stream = crate::net::connect_tcp(context, host, port, load_cache).await?;
72                Box::new(tcp_stream)
73            }
74        }
75        "https" => {
76            let port = parsed_url.port_u16().unwrap_or(443);
77            let load_cache = true;
78
79            if let Some(proxy_config) = proxy_config_opt {
80                let proxy_stream = proxy_config
81                    .connect(context, host, port, load_cache)
82                    .await?;
83                let tls_stream =
84                    wrap_rustls(host, port, "", proxy_stream, &context.tls_session_store).await?;
85                Box::new(tls_stream)
86            } else {
87                let tcp_stream = crate::net::connect_tcp(context, host, port, load_cache).await?;
88                let tls_stream =
89                    wrap_rustls(host, port, "", tcp_stream, &context.tls_session_store).await?;
90                Box::new(tls_stream)
91            }
92        }
93        _ => bail!("Unknown URL scheme"),
94    };
95
96    let io = TokioIo::new(stream);
97    let (sender, conn) = hyper::client::conn::http1::handshake(io).await?;
98    tokio::task::spawn(conn);
99
100    Ok(sender)
101}
102
103/// Converts the URL to expiration and stale timestamps.
104fn http_url_cache_timestamps(url: &str, mimetype: Option<&str>) -> (i64, i64) {
105    let now = time();
106
107    let expires = now + 3600 * 24 * 35;
108    let stale = if url.ends_with(".xdc") {
109        // WebXDCs are never stale, they just expire.
110        expires
111    } else if url.starts_with("https://tile.openstreetmap.org/")
112        || url.starts_with("https://vector.openstreetmap.org/")
113    {
114        // Policy at <https://operations.osmfoundation.org/policies/tiles/>
115        // requires that we cache tiles for at least 7 days.
116        // Do not revalidate earlier than that.
117        now + 3600 * 24 * 7
118    } else if mimetype.is_some_and(|s| s.starts_with("image/")) {
119        // Cache images for 1 day.
120        //
121        // As of 2024-12-12 WebXDC icons at <https://webxdc.org/apps/>
122        // use the same path for all app versions,
123        // so may change, but it is not critical if outdated icon is displayed.
124        now + 3600 * 24
125    } else {
126        // Revalidate everything else after 1 hour.
127        //
128        // This includes HTML, CSS and JS.
129        now + 3600
130    };
131    (expires, stale)
132}
133
134/// Places the binary into HTTP cache.
135async fn http_cache_put(context: &Context, url: &str, response: &Response) -> Result<()> {
136    let blob =
137        BlobObject::create_and_deduplicate_from_bytes(context, response.blob.as_slice(), "")?;
138
139    let (expires, stale) = http_url_cache_timestamps(url, response.mimetype.as_deref());
140    context
141        .sql
142        .insert(
143            "INSERT OR REPLACE INTO http_cache (url, expires, stale, blobname, mimetype, encoding)
144             VALUES (?, ?, ?, ?, ?, ?)",
145            (
146                url,
147                expires,
148                stale,
149                blob.as_name(),
150                response.mimetype.as_deref().unwrap_or_default(),
151                response.encoding.as_deref().unwrap_or_default(),
152            ),
153        )
154        .await?;
155
156    Ok(())
157}
158
159/// Retrieves the binary from HTTP cache.
160///
161/// Also returns if the response is stale and should be revalidated in the background.
162async fn http_cache_get(context: &Context, url: &str) -> Result<Option<(Response, bool)>> {
163    let now = time();
164    let Some((blob_name, mimetype, encoding, stale_timestamp)) = context
165        .sql
166        .query_row_optional(
167            "SELECT blobname, mimetype, encoding, stale
168             FROM http_cache WHERE url=? AND expires > ?",
169            (url, now),
170            |row| {
171                let blob_name: String = row.get(0)?;
172                let mimetype: Option<String> = Some(row.get(1)?).filter(|s: &String| !s.is_empty());
173                let encoding: Option<String> = Some(row.get(2)?).filter(|s: &String| !s.is_empty());
174                let stale_timestamp: i64 = row.get(3)?;
175                Ok((blob_name, mimetype, encoding, stale_timestamp))
176            },
177        )
178        .await?
179    else {
180        return Ok(None);
181    };
182    let is_stale = now > stale_timestamp;
183
184    let blob_object = BlobObject::from_name(context, &blob_name)?;
185    let blob_abs_path = blob_object.to_abs_path();
186    let blob = match fs::read(blob_abs_path)
187        .await
188        .with_context(|| format!("Failed to read blob for {url:?} cache entry."))
189    {
190        Ok(blob) => blob,
191        Err(err) => {
192            // This should not happen, but user may go into the blobdir and remove files,
193            // antivirus may delete the file or there may be a bug in housekeeping.
194            warn!(context, "{err:?}.");
195            return Ok(None);
196        }
197    };
198
199    let (expires, _stale) = http_url_cache_timestamps(url, mimetype.as_deref());
200    let response = Response {
201        blob,
202        mimetype,
203        encoding,
204    };
205
206    // Update expiration timestamp
207    // to prevent deletion of the file still in use.
208    //
209    // If the response is stale, the caller should revalidate it in the background, so update
210    // `stale` timestamp to avoid revalidating too frequently (and have many parallel revalidation
211    // tasks) if revalidation fails or the HTTP request takes some time. The stale period >= 1 hour,
212    // so 1 more minute won't be a problem.
213    let stale_timestamp = if is_stale { now + 60 } else { stale_timestamp };
214    context
215        .sql
216        .execute(
217            "UPDATE http_cache SET expires=?, stale=? WHERE url=?",
218            (expires, stale_timestamp, url),
219        )
220        .await?;
221
222    Ok(Some((response, is_stale)))
223}
224
225/// Removes expired cache entries.
226pub(crate) async fn http_cache_cleanup(context: &Context) -> Result<()> {
227    // Remove cache entries that are already expired
228    // or entries that will not expire in a year
229    // to make sure we don't have invalid timestamps that are way forward in the future.
230    context
231        .sql
232        .execute(
233            "DELETE FROM http_cache
234             WHERE ?1 > expires OR expires > ?1 + 31536000",
235            (time(),),
236        )
237        .await?;
238    Ok(())
239}
240
241/// Fetches URL and updates the cache.
242///
243/// URL is fetched regardless of whether there is an existing result in the cache.
244async fn fetch_url(context: &Context, original_url: &str) -> Result<Response> {
245    let mut url = original_url.to_string();
246
247    // Follow up to 10 http-redirects
248    for _i in 0..10 {
249        let parsed_url = url
250            .parse::<hyper::Uri>()
251            .with_context(|| format!("Failed to parse URL {url:?}"))?;
252
253        let mut sender = get_http_sender(context, parsed_url.clone()).await?;
254        let authority = parsed_url
255            .authority()
256            .context("URL has no authority")?
257            .clone();
258
259        let req = hyper::Request::builder().uri(parsed_url);
260
261        // OSM usage policy requires
262        // that User-Agent is set for HTTP GET requests
263        // to tile servers:
264        // <https://operations.osmfoundation.org/policies/tiles/>
265        // Same for vectory tiles
266        // at <https://operations.osmfoundation.org/policies/vector/>.
267        let req =
268            if authority == "tile.openstreetmap.org" || authority == "vector.openstreetmap.org" {
269                req.header("User-Agent", USER_AGENT)
270            } else {
271                req
272            };
273
274        let req = req
275            .header(hyper::header::HOST, authority.as_str())
276            .body(http_body_util::Empty::<Bytes>::new())?;
277        let response = sender.send_request(req).await?;
278
279        if response.status().is_redirection() {
280            let header = response
281                .headers()
282                .get_all("location")
283                .iter()
284                .next_back()
285                .ok_or_else(|| anyhow!("Redirection doesn't have a target location"))?
286                .to_str()?;
287            info!(context, "Following redirect to {}", header);
288            url = header.to_string();
289            continue;
290        }
291
292        if !response.status().is_success() {
293            return Err(anyhow!(
294                "The server returned a non-successful response code: {}{}",
295                response.status().as_u16(),
296                response
297                    .status()
298                    .canonical_reason()
299                    .map(|s| format!(" {s}"))
300                    .unwrap_or("".to_string())
301            ));
302        }
303
304        let content_type = response
305            .headers()
306            .get("content-type")
307            .and_then(|value| value.to_str().ok())
308            .and_then(|value| value.parse::<Mime>().ok());
309        let mimetype = content_type
310            .as_ref()
311            .map(|mime| mime.essence_str().to_string());
312        let encoding = content_type.as_ref().and_then(|mime| {
313            mime.get_param(mime::CHARSET)
314                .map(|charset| charset.as_str().to_string())
315        });
316        let body = response.collect().await?.to_bytes();
317        let blob: Vec<u8> = body.to_vec();
318        let response = Response {
319            blob,
320            mimetype,
321            encoding,
322        };
323        info!(context, "Inserting {original_url:?} into cache.");
324        http_cache_put(context, &url, &response).await?;
325        return Ok(response);
326    }
327
328    Err(anyhow!("Followed 10 redirections"))
329}
330
331/// Retrieves the binary contents of URL using HTTP GET request.
332pub async fn read_url_blob(context: &Context, url: &str) -> Result<Response> {
333    if let Some((response, is_stale)) = http_cache_get(context, url).await? {
334        info!(context, "Returning {url:?} from cache.");
335        if is_stale {
336            let context = context.clone();
337            let url = url.to_string();
338            tokio::spawn(async move {
339                // Fetch URL in background to update the cache.
340                info!(context, "Fetching stale {url:?} in background.");
341                if let Err(err) = fetch_url(&context, &url).await {
342                    warn!(context, "Failed to revalidate {url:?}: {err:#}.");
343                }
344            });
345        }
346        return Ok(response);
347    }
348
349    info!(context, "Not found {url:?} in cache, fetching.");
350    let response = fetch_url(context, url).await?;
351    Ok(response)
352}
353
354/// Sends an empty POST request to the URL.
355///
356/// Returns response text and whether request was successful or not.
357///
358/// Does not follow redirects.
359pub(crate) async fn post_empty(context: &Context, url: &str) -> Result<(String, bool)> {
360    let parsed_url = url
361        .parse::<hyper::Uri>()
362        .with_context(|| format!("Failed to parse URL {url:?}"))?;
363    let scheme = parsed_url.scheme_str().context("URL has no scheme")?;
364    if scheme != "https" {
365        bail!("POST requests to non-HTTPS URLs are not allowed");
366    }
367
368    let mut sender = get_http_sender(context, parsed_url.clone()).await?;
369    let authority = parsed_url
370        .authority()
371        .context("URL has no authority")?
372        .clone();
373    let req = hyper::Request::post(parsed_url)
374        .header(hyper::header::HOST, authority.as_str())
375        .body(http_body_util::Empty::<Bytes>::new())?;
376
377    let response = sender.send_request(req).await?;
378
379    let response_status = response.status();
380    let body = response.collect().await?.to_bytes();
381    let text = String::from_utf8_lossy(&body);
382    let response_text = text.to_string();
383
384    Ok((response_text, response_status.is_success()))
385}
386
387/// Posts string to the given URL.
388///
389/// Returns true if successful HTTP response code was returned.
390///
391/// Does not follow redirects.
392#[allow(dead_code)]
393pub(crate) async fn post_string(context: &Context, url: &str, body: String) -> Result<bool> {
394    let parsed_url = url
395        .parse::<hyper::Uri>()
396        .with_context(|| format!("Failed to parse URL {url:?}"))?;
397    let scheme = parsed_url.scheme_str().context("URL has no scheme")?;
398    if scheme != "https" {
399        bail!("POST requests to non-HTTPS URLs are not allowed");
400    }
401
402    let mut sender = get_http_sender(context, parsed_url.clone()).await?;
403    let authority = parsed_url
404        .authority()
405        .context("URL has no authority")?
406        .clone();
407
408    let request = hyper::Request::post(parsed_url)
409        .header(hyper::header::HOST, authority.as_str())
410        .body(body)?;
411    let response = sender.send_request(request).await?;
412
413    Ok(response.status().is_success())
414}
415
416/// Sends a POST request with x-www-form-urlencoded data.
417///
418/// Does not follow redirects.
419pub(crate) async fn post_form<T: Serialize + ?Sized>(
420    context: &Context,
421    url: &str,
422    form: &T,
423) -> Result<Bytes> {
424    let parsed_url = url
425        .parse::<hyper::Uri>()
426        .with_context(|| format!("Failed to parse URL {url:?}"))?;
427    let scheme = parsed_url.scheme_str().context("URL has no scheme")?;
428    if scheme != "https" {
429        bail!("POST requests to non-HTTPS URLs are not allowed");
430    }
431
432    let encoded_body = serde_urlencoded::to_string(form).context("Failed to encode data")?;
433    let mut sender = get_http_sender(context, parsed_url.clone()).await?;
434    let authority = parsed_url
435        .authority()
436        .context("URL has no authority")?
437        .clone();
438    let request = hyper::Request::post(parsed_url)
439        .header(hyper::header::HOST, authority.as_str())
440        .header("content-type", "application/x-www-form-urlencoded")
441        .body(encoded_body)?;
442    let response = sender.send_request(request).await?;
443    let bytes = response.collect().await?.to_bytes();
444    Ok(bytes)
445}
446
#[cfg(test)]
mod tests {
    use super::*;
    use std::time::Duration;

    use crate::sql::housekeeping;
    use crate::test_utils::TestContext;
    use crate::tools::SystemTime;

    /// Exercises the HTTP cache by calling `http_cache_put` and `http_cache_get` directly.
    ///
    /// Covers, in this order: lookup of a missing entry, storing and loading entries,
    /// staleness of HTML vs. `.xdc` entries after a time shift, renewal of a stale entry,
    /// expiration after about 35 days with housekeeping in between, the temporary
    /// suppression of the stale flag after a stale lookup, and lookup of an entry
    /// whose blob file was removed from the blobdir.
    ///
    /// The steps depend on each other through the shifted clock, so their order matters.
    #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
    async fn test_http_cache() -> Result<()> {
        let t = &TestContext::new().await;

        assert_eq!(http_cache_get(t, "https://webxdc.org/").await?, None);

        let html_response = Response {
            blob: b"<!DOCTYPE html> ...".to_vec(),
            mimetype: Some("text/html".to_string()),
            encoding: None,
        };

        let xdc_response = Response {
            blob: b"PK...".to_vec(),
            mimetype: Some("application/octet-stream".to_string()),
            encoding: None,
        };
        let xdc_editor_url = "https://apps.testrun.org/webxdc-editor-v3.2.0.xdc";
        let xdc_pixel_url = "https://apps.testrun.org/webxdc-pixel-v2.xdc";

        http_cache_put(t, "https://webxdc.org/", &html_response).await?;

        assert_eq!(http_cache_get(t, xdc_editor_url).await?, None);
        assert_eq!(http_cache_get(t, xdc_pixel_url).await?, None);
        assert_eq!(
            http_cache_get(t, "https://webxdc.org/").await?,
            Some((html_response.clone(), false))
        );

        http_cache_put(t, xdc_editor_url, &xdc_response).await?;
        http_cache_put(t, xdc_pixel_url, &xdc_response).await?;
        assert_eq!(
            http_cache_get(t, xdc_editor_url).await?,
            Some((xdc_response.clone(), false))
        );
        assert_eq!(
            http_cache_get(t, xdc_pixel_url).await?,
            Some((xdc_response.clone(), false))
        );

        assert_eq!(
            http_cache_get(t, "https://webxdc.org/").await?,
            Some((html_response.clone(), false))
        );

        // HTML is stale after 1 hour, but .xdc is not.
        SystemTime::shift(Duration::from_secs(3600 + 100));
        assert_eq!(
            http_cache_get(t, "https://webxdc.org/").await?,
            Some((html_response.clone(), true))
        );
        assert_eq!(
            http_cache_get(t, xdc_editor_url).await?,
            Some((xdc_response.clone(), false))
        );

        // Stale cache entry can be renewed
        // even before housekeeping removes old one.
        http_cache_put(t, "https://webxdc.org/", &html_response).await?;
        assert_eq!(
            http_cache_get(t, "https://webxdc.org/").await?,
            Some((html_response.clone(), false))
        );

        // 35 days later pixel .xdc expires because we did not request it for 35 days and 1 hour.
        // But editor is still there because we did not request it for just 35 days.
        // We have not renewed the editor however, so it becomes stale.
        SystemTime::shift(Duration::from_secs(3600 * 24 * 35 - 100));

        // Run housekeeping to test that it does not delete the blob too early.
        housekeeping(t).await?;

        assert_eq!(
            http_cache_get(t, xdc_editor_url).await?,
            Some((xdc_response.clone(), true))
        );
        assert_eq!(http_cache_get(t, xdc_pixel_url).await?, None);

        // If we get the blob the second time quickly, it shouldn't be stale because it's supposed
        // that we've already run a revalidation task which will update the blob soon.
        assert_eq!(
            http_cache_get(t, xdc_editor_url).await?,
            Some((xdc_response.clone(), false))
        );
        // But if the revalidation task hasn't succeeded after some time, the blob is stale again
        // even if we continue to get it frequently.
        for i in (0..100).rev() {
            SystemTime::shift(Duration::from_secs(6));
            if let Some((_, true)) = http_cache_get(t, xdc_editor_url).await? {
                break;
            }
            assert!(i > 0);
        }

        // Test that if the file is accidentally removed from the blobdir,
        // there is no error when trying to load the cache entry.
        for entry in std::fs::read_dir(t.get_blobdir())? {
            let entry = entry.unwrap();
            let path = entry.path();
            std::fs::remove_file(path).expect("Failed to remove blob");
        }

        assert_eq!(
            http_cache_get(t, xdc_editor_url)
                .await
                .context("Failed to get no cache response")?,
            None
        );

        Ok(())
    }
}