deltachat/net/http.rs

1//! # HTTP module.
2
3use anyhow::{anyhow, bail, Context as _, Result};
4use bytes::Bytes;
5use http_body_util::BodyExt;
6use hyper_util::rt::TokioIo;
7use mime::Mime;
8use serde::Serialize;
9use tokio::fs;
10
11use crate::blob::BlobObject;
12use crate::context::Context;
13use crate::log::{info, warn};
14use crate::net::proxy::ProxyConfig;
15use crate::net::session::SessionStream;
16use crate::net::tls::wrap_rustls;
17use crate::tools::time;
18
/// Result of an HTTP(S) GET request, either fresh from the network or from the cache.
///
/// The MIME type and the charset both come from the `Content-Type` response
/// header, when that header is present and parseable.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct Response {
    /// Raw bytes of the response body.
    pub blob: Vec<u8>,

    /// Essence of the `Content-Type` header (for example `text/html`), if known.
    pub mimetype: Option<String>,

    /// Value of the `charset` parameter of the `Content-Type` header, if known.
    pub encoding: Option<String>,
}
31
32/// Retrieves the text contents of URL using HTTP GET request.
33pub async fn read_url(context: &Context, url: &str) -> Result<String> {
34    let response = read_url_blob(context, url).await?;
35    let text = String::from_utf8_lossy(&response.blob);
36    Ok(text.to_string())
37}
38
39async fn get_http_sender<B>(
40    context: &Context,
41    parsed_url: hyper::Uri,
42) -> Result<hyper::client::conn::http1::SendRequest<B>>
43where
44    B: hyper::body::Body + 'static + Send,
45    B::Data: Send,
46    B::Error: Into<Box<dyn std::error::Error + Send + Sync>>,
47{
48    let scheme = parsed_url.scheme_str().context("URL has no scheme")?;
49    let host = parsed_url.host().context("URL has no host")?;
50    let proxy_config_opt = ProxyConfig::load(context).await?;
51
52    let stream: Box<dyn SessionStream> = match scheme {
53        "http" => {
54            let port = parsed_url.port_u16().unwrap_or(80);
55
56            // It is safe to use cached IP addresses
57            // for HTTPS URLs, but for HTTP URLs
58            // better resolve from scratch each time to prevent
59            // cache poisoning attacks from having lasting effects.
60            let load_cache = false;
61            if let Some(proxy_config) = proxy_config_opt {
62                let proxy_stream = proxy_config
63                    .connect(context, host, port, load_cache)
64                    .await?;
65                Box::new(proxy_stream)
66            } else {
67                let tcp_stream = crate::net::connect_tcp(context, host, port, load_cache).await?;
68                Box::new(tcp_stream)
69            }
70        }
71        "https" => {
72            let port = parsed_url.port_u16().unwrap_or(443);
73            let load_cache = true;
74
75            if let Some(proxy_config) = proxy_config_opt {
76                let proxy_stream = proxy_config
77                    .connect(context, host, port, load_cache)
78                    .await?;
79                let tls_stream = wrap_rustls(host, &[], proxy_stream).await?;
80                Box::new(tls_stream)
81            } else {
82                let tcp_stream = crate::net::connect_tcp(context, host, port, load_cache).await?;
83                let tls_stream = wrap_rustls(host, &[], tcp_stream).await?;
84                Box::new(tls_stream)
85            }
86        }
87        _ => bail!("Unknown URL scheme"),
88    };
89
90    let io = TokioIo::new(stream);
91    let (sender, conn) = hyper::client::conn::http1::handshake(io).await?;
92    tokio::task::spawn(conn);
93
94    Ok(sender)
95}
96
97/// Converts the URL to expiration and stale timestamps.
98fn http_url_cache_timestamps(url: &str, mimetype: Option<&str>) -> (i64, i64) {
99    let now = time();
100
101    let expires = now + 3600 * 24 * 35;
102    let stale = if url.ends_with(".xdc") {
103        // WebXDCs are never stale, they just expire.
104        expires
105    } else if mimetype.is_some_and(|s| s.starts_with("image/")) {
106        // Cache images for 1 day.
107        //
108        // As of 2024-12-12 WebXDC icons at <https://webxdc.org/apps/>
109        // use the same path for all app versions,
110        // so may change, but it is not critical if outdated icon is displayed.
111        now + 3600 * 24
112    } else {
113        // Revalidate everything else after 1 hour.
114        //
115        // This includes HTML, CSS and JS.
116        now + 3600
117    };
118    (expires, stale)
119}
120
121/// Places the binary into HTTP cache.
122async fn http_cache_put(context: &Context, url: &str, response: &Response) -> Result<()> {
123    let blob =
124        BlobObject::create_and_deduplicate_from_bytes(context, response.blob.as_slice(), "")?;
125
126    let (expires, stale) = http_url_cache_timestamps(url, response.mimetype.as_deref());
127    context
128        .sql
129        .insert(
130            "INSERT OR REPLACE INTO http_cache (url, expires, stale, blobname, mimetype, encoding)
131             VALUES (?, ?, ?, ?, ?, ?)",
132            (
133                url,
134                expires,
135                stale,
136                blob.as_name(),
137                response.mimetype.as_deref().unwrap_or_default(),
138                response.encoding.as_deref().unwrap_or_default(),
139            ),
140        )
141        .await?;
142
143    Ok(())
144}
145
146/// Retrieves the binary from HTTP cache.
147///
148/// Also returns if the response is stale and should be revalidated in the background.
149async fn http_cache_get(context: &Context, url: &str) -> Result<Option<(Response, bool)>> {
150    let now = time();
151    let Some((blob_name, mimetype, encoding, stale_timestamp)) = context
152        .sql
153        .query_row_optional(
154            "SELECT blobname, mimetype, encoding, stale
155             FROM http_cache WHERE url=? AND expires > ?",
156            (url, now),
157            |row| {
158                let blob_name: String = row.get(0)?;
159                let mimetype: Option<String> = Some(row.get(1)?).filter(|s: &String| !s.is_empty());
160                let encoding: Option<String> = Some(row.get(2)?).filter(|s: &String| !s.is_empty());
161                let stale_timestamp: i64 = row.get(3)?;
162                Ok((blob_name, mimetype, encoding, stale_timestamp))
163            },
164        )
165        .await?
166    else {
167        return Ok(None);
168    };
169    let is_stale = now > stale_timestamp;
170
171    let blob_object = BlobObject::from_name(context, &blob_name)?;
172    let blob_abs_path = blob_object.to_abs_path();
173    let blob = match fs::read(blob_abs_path)
174        .await
175        .with_context(|| format!("Failed to read blob for {url:?} cache entry."))
176    {
177        Ok(blob) => blob,
178        Err(err) => {
179            // This should not happen, but user may go into the blobdir and remove files,
180            // antivirus may delete the file or there may be a bug in housekeeping.
181            warn!(context, "{err:?}.");
182            return Ok(None);
183        }
184    };
185
186    let (expires, _stale) = http_url_cache_timestamps(url, mimetype.as_deref());
187    let response = Response {
188        blob,
189        mimetype,
190        encoding,
191    };
192
193    // Update expiration timestamp
194    // to prevent deletion of the file still in use.
195    //
196    // If the response is stale, the caller should revalidate it in the background, so update
197    // `stale` timestamp to avoid revalidating too frequently (and have many parallel revalidation
198    // tasks) if revalidation fails or the HTTP request takes some time. The stale period >= 1 hour,
199    // so 1 more minute won't be a problem.
200    let stale_timestamp = if is_stale { now + 60 } else { stale_timestamp };
201    context
202        .sql
203        .execute(
204            "UPDATE http_cache SET expires=?, stale=? WHERE url=?",
205            (expires, stale_timestamp, url),
206        )
207        .await?;
208
209    Ok(Some((response, is_stale)))
210}
211
212/// Removes expired cache entries.
213pub(crate) async fn http_cache_cleanup(context: &Context) -> Result<()> {
214    // Remove cache entries that are already expired
215    // or entries that will not expire in a year
216    // to make sure we don't have invalid timestamps that are way forward in the future.
217    context
218        .sql
219        .execute(
220            "DELETE FROM http_cache
221             WHERE ?1 > expires OR expires > ?1 + 31536000",
222            (time(),),
223        )
224        .await?;
225    Ok(())
226}
227
228/// Fetches URL and updates the cache.
229///
230/// URL is fetched regardless of whether there is an existing result in the cache.
231async fn fetch_url(context: &Context, original_url: &str) -> Result<Response> {
232    let mut url = original_url.to_string();
233
234    // Follow up to 10 http-redirects
235    for _i in 0..10 {
236        let parsed_url = url
237            .parse::<hyper::Uri>()
238            .with_context(|| format!("Failed to parse URL {url:?}"))?;
239
240        let mut sender = get_http_sender(context, parsed_url.clone()).await?;
241        let authority = parsed_url
242            .authority()
243            .context("URL has no authority")?
244            .clone();
245
246        let req = hyper::Request::builder()
247            .uri(parsed_url.path())
248            .header(hyper::header::HOST, authority.as_str())
249            .body(http_body_util::Empty::<Bytes>::new())?;
250        let response = sender.send_request(req).await?;
251
252        if response.status().is_redirection() {
253            let header = response
254                .headers()
255                .get_all("location")
256                .iter()
257                .next_back()
258                .ok_or_else(|| anyhow!("Redirection doesn't have a target location"))?
259                .to_str()?;
260            info!(context, "Following redirect to {}", header);
261            url = header.to_string();
262            continue;
263        }
264
265        if !response.status().is_success() {
266            return Err(anyhow!(
267                "The server returned a non-successful response code: {}{}",
268                response.status().as_u16(),
269                response
270                    .status()
271                    .canonical_reason()
272                    .map(|s| format!(" {s}"))
273                    .unwrap_or("".to_string())
274            ));
275        }
276
277        let content_type = response
278            .headers()
279            .get("content-type")
280            .and_then(|value| value.to_str().ok())
281            .and_then(|value| value.parse::<Mime>().ok());
282        let mimetype = content_type
283            .as_ref()
284            .map(|mime| mime.essence_str().to_string());
285        let encoding = content_type.as_ref().and_then(|mime| {
286            mime.get_param(mime::CHARSET)
287                .map(|charset| charset.as_str().to_string())
288        });
289        let body = response.collect().await?.to_bytes();
290        let blob: Vec<u8> = body.to_vec();
291        let response = Response {
292            blob,
293            mimetype,
294            encoding,
295        };
296        info!(context, "Inserting {original_url:?} into cache.");
297        http_cache_put(context, &url, &response).await?;
298        return Ok(response);
299    }
300
301    Err(anyhow!("Followed 10 redirections"))
302}
303
304/// Retrieves the binary contents of URL using HTTP GET request.
305pub async fn read_url_blob(context: &Context, url: &str) -> Result<Response> {
306    if let Some((response, is_stale)) = http_cache_get(context, url).await? {
307        info!(context, "Returning {url:?} from cache.");
308        if is_stale {
309            let context = context.clone();
310            let url = url.to_string();
311            tokio::spawn(async move {
312                // Fetch URL in background to update the cache.
313                info!(context, "Fetching stale {url:?} in background.");
314                if let Err(err) = fetch_url(&context, &url).await {
315                    warn!(context, "Failed to revalidate {url:?}: {err:#}.");
316                }
317            });
318        }
319        return Ok(response);
320    }
321
322    info!(context, "Not found {url:?} in cache, fetching.");
323    let response = fetch_url(context, url).await?;
324    Ok(response)
325}
326
327/// Sends an empty POST request to the URL.
328///
329/// Returns response text and whether request was successful or not.
330///
331/// Does not follow redirects.
332pub(crate) async fn post_empty(context: &Context, url: &str) -> Result<(String, bool)> {
333    let parsed_url = url
334        .parse::<hyper::Uri>()
335        .with_context(|| format!("Failed to parse URL {url:?}"))?;
336    let scheme = parsed_url.scheme_str().context("URL has no scheme")?;
337    if scheme != "https" {
338        bail!("POST requests to non-HTTPS URLs are not allowed");
339    }
340
341    let mut sender = get_http_sender(context, parsed_url.clone()).await?;
342    let authority = parsed_url
343        .authority()
344        .context("URL has no authority")?
345        .clone();
346    let req = hyper::Request::post(parsed_url.path())
347        .header(hyper::header::HOST, authority.as_str())
348        .body(http_body_util::Empty::<Bytes>::new())?;
349
350    let response = sender.send_request(req).await?;
351
352    let response_status = response.status();
353    let body = response.collect().await?.to_bytes();
354    let text = String::from_utf8_lossy(&body);
355    let response_text = text.to_string();
356
357    Ok((response_text, response_status.is_success()))
358}
359
360/// Posts string to the given URL.
361///
362/// Returns true if successful HTTP response code was returned.
363///
364/// Does not follow redirects.
365#[allow(dead_code)]
366pub(crate) async fn post_string(context: &Context, url: &str, body: String) -> Result<bool> {
367    let parsed_url = url
368        .parse::<hyper::Uri>()
369        .with_context(|| format!("Failed to parse URL {url:?}"))?;
370    let scheme = parsed_url.scheme_str().context("URL has no scheme")?;
371    if scheme != "https" {
372        bail!("POST requests to non-HTTPS URLs are not allowed");
373    }
374
375    let mut sender = get_http_sender(context, parsed_url.clone()).await?;
376    let authority = parsed_url
377        .authority()
378        .context("URL has no authority")?
379        .clone();
380
381    let request = hyper::Request::post(parsed_url.path())
382        .header(hyper::header::HOST, authority.as_str())
383        .body(body)?;
384    let response = sender.send_request(request).await?;
385
386    Ok(response.status().is_success())
387}
388
389/// Sends a POST request with x-www-form-urlencoded data.
390///
391/// Does not follow redirects.
392pub(crate) async fn post_form<T: Serialize + ?Sized>(
393    context: &Context,
394    url: &str,
395    form: &T,
396) -> Result<Bytes> {
397    let parsed_url = url
398        .parse::<hyper::Uri>()
399        .with_context(|| format!("Failed to parse URL {url:?}"))?;
400    let scheme = parsed_url.scheme_str().context("URL has no scheme")?;
401    if scheme != "https" {
402        bail!("POST requests to non-HTTPS URLs are not allowed");
403    }
404
405    let encoded_body = serde_urlencoded::to_string(form).context("Failed to encode data")?;
406    let mut sender = get_http_sender(context, parsed_url.clone()).await?;
407    let authority = parsed_url
408        .authority()
409        .context("URL has no authority")?
410        .clone();
411    let request = hyper::Request::post(parsed_url.path())
412        .header(hyper::header::HOST, authority.as_str())
413        .header("content-type", "application/x-www-form-urlencoded")
414        .body(encoded_body)?;
415    let response = sender.send_request(request).await?;
416    let bytes = response.collect().await?.to_bytes();
417    Ok(bytes)
418}
419
#[cfg(test)]
mod tests {
    use super::*;
    use std::time::Duration;

    use crate::sql::housekeeping;
    use crate::test_utils::TestContext;
    use crate::tools::SystemTime;

    /// Exercises `http_cache_put` and `http_cache_get` directly, without network access.
    ///
    /// Checks freshness, staleness, expiration, postponement of the stale
    /// timestamp after a stale hit, and recovery when blob files are missing.
    /// The clock is advanced with `SystemTime::shift`.
    #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
    async fn test_http_cache() -> Result<()> {
        let t = &TestContext::new().await;

        // Empty cache: nothing to return.
        assert_eq!(http_cache_get(t, "https://webxdc.org/").await?, None);

        let html_response = Response {
            blob: b"<!DOCTYPE html> ...".to_vec(),
            mimetype: Some("text/html".to_string()),
            encoding: None,
        };

        let xdc_response = Response {
            blob: b"PK...".to_vec(),
            mimetype: Some("application/octet-stream".to_string()),
            encoding: None,
        };
        let xdc_editor_url = "https://apps.testrun.org/webxdc-editor-v3.2.0.xdc";
        let xdc_pixel_url = "https://apps.testrun.org/webxdc-pixel-v2.xdc";

        http_cache_put(t, "https://webxdc.org/", &html_response).await?;

        // Only the URL that was stored can be found; freshly stored entries are not stale.
        assert_eq!(http_cache_get(t, xdc_editor_url).await?, None);
        assert_eq!(http_cache_get(t, xdc_pixel_url).await?, None);
        assert_eq!(
            http_cache_get(t, "https://webxdc.org/").await?,
            Some((html_response.clone(), false))
        );

        // Both .xdc URLs share the same body; each gets its own cache row.
        http_cache_put(t, xdc_editor_url, &xdc_response).await?;
        http_cache_put(t, xdc_pixel_url, &xdc_response).await?;
        assert_eq!(
            http_cache_get(t, xdc_editor_url).await?,
            Some((xdc_response.clone(), false))
        );
        assert_eq!(
            http_cache_get(t, xdc_pixel_url).await?,
            Some((xdc_response.clone(), false))
        );

        assert_eq!(
            http_cache_get(t, "https://webxdc.org/").await?,
            Some((html_response.clone(), false))
        );

        // HTML is stale after 1 hour, but .xdc is not.
        SystemTime::shift(Duration::from_secs(3600 + 100));
        assert_eq!(
            http_cache_get(t, "https://webxdc.org/").await?,
            Some((html_response.clone(), true))
        );
        assert_eq!(
            http_cache_get(t, xdc_editor_url).await?,
            Some((xdc_response.clone(), false))
        );

        // Stale cache entry can be renewed
        // even before housekeeping removes old one.
        http_cache_put(t, "https://webxdc.org/", &html_response).await?;
        assert_eq!(
            http_cache_get(t, "https://webxdc.org/").await?,
            Some((html_response.clone(), false))
        );

        // 35 days later pixel .xdc expires because we did not request it for 35 days and 1 hour.
        // But editor is still there because we did not request it for just 35 days.
        // We have not renewed the editor however, so it becomes stale.
        SystemTime::shift(Duration::from_secs(3600 * 24 * 35 - 100));

        // Run housekeeping to test that it does not delete the blob too early.
        housekeeping(t).await?;

        assert_eq!(
            http_cache_get(t, xdc_editor_url).await?,
            Some((xdc_response.clone(), true))
        );
        assert_eq!(http_cache_get(t, xdc_pixel_url).await?, None);

        // If we get the blob the second time quickly, it shouldn't be stale because it's supposed
        // that we've already run a revalidation task which will update the blob soon.
        assert_eq!(
            http_cache_get(t, xdc_editor_url).await?,
            Some((xdc_response.clone(), false))
        );
        // But if the revalidation task hasn't succeeded after some time, the blob is stale again
        // even if we continue to get it frequently.
        // Each step advances the clock by 6 seconds. `http_cache_get` postpones the stale
        // timestamp by 60 seconds, so staleness must reappear well within 100 steps;
        // otherwise `assert!(i > 0)` fails on the last iteration.
        for i in (0..100).rev() {
            SystemTime::shift(Duration::from_secs(6));
            if let Some((_, true)) = http_cache_get(t, xdc_editor_url).await? {
                break;
            }
            assert!(i > 0);
        }

        // Test that if the file is accidentally removed from the blobdir,
        // there is no error when trying to load the cache entry.
        for entry in std::fs::read_dir(t.get_blobdir())? {
            let entry = entry.unwrap();
            let path = entry.path();
            std::fs::remove_file(path).expect("Failed to remove blob");
        }

        assert_eq!(
            http_cache_get(t, xdc_editor_url)
                .await
                .context("Failed to get no cache response")?,
            None
        );

        Ok(())
    }
}