// deltachat/net/http.rs

//! # HTTP module.
//!
//! HTTP(S) client helpers: GET requests backed by a local cache, and simple POST requests.

3use anyhow::{anyhow, bail, Context as _, Result};
4use bytes::Bytes;
5use http_body_util::BodyExt;
6use hyper_util::rt::TokioIo;
7use mime::Mime;
8use serde::Serialize;
9use tokio::fs;
10
11use crate::blob::BlobObject;
12use crate::context::Context;
13use crate::net::proxy::ProxyConfig;
14use crate::net::session::SessionStream;
15use crate::net::tls::wrap_rustls;
16use crate::tools::time;
17
/// Result of an HTTP(S) GET request.
///
/// Holds the raw body together with the metadata parsed from the `Content-Type` header.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct Response {
    /// Raw bytes of the response body.
    pub blob: Vec<u8>,

    /// Media type taken from the `Content-Type` header, `None` if there is none.
    pub mimetype: Option<String>,

    /// Character set taken from the `Content-Type` header, `None` if there is none.
    pub encoding: Option<String>,
}
30
31/// Retrieves the text contents of URL using HTTP GET request.
32pub async fn read_url(context: &Context, url: &str) -> Result<String> {
33    let response = read_url_blob(context, url).await?;
34    let text = String::from_utf8_lossy(&response.blob);
35    Ok(text.to_string())
36}
37
38async fn get_http_sender<B>(
39    context: &Context,
40    parsed_url: hyper::Uri,
41) -> Result<hyper::client::conn::http1::SendRequest<B>>
42where
43    B: hyper::body::Body + 'static + Send,
44    B::Data: Send,
45    B::Error: Into<Box<dyn std::error::Error + Send + Sync>>,
46{
47    let scheme = parsed_url.scheme_str().context("URL has no scheme")?;
48    let host = parsed_url.host().context("URL has no host")?;
49    let proxy_config_opt = ProxyConfig::load(context).await?;
50
51    let stream: Box<dyn SessionStream> = match scheme {
52        "http" => {
53            let port = parsed_url.port_u16().unwrap_or(80);
54
55            // It is safe to use cached IP addresses
56            // for HTTPS URLs, but for HTTP URLs
57            // better resolve from scratch each time to prevent
58            // cache poisoning attacks from having lasting effects.
59            let load_cache = false;
60            if let Some(proxy_config) = proxy_config_opt {
61                let proxy_stream = proxy_config
62                    .connect(context, host, port, load_cache)
63                    .await?;
64                Box::new(proxy_stream)
65            } else {
66                let tcp_stream = crate::net::connect_tcp(context, host, port, load_cache).await?;
67                Box::new(tcp_stream)
68            }
69        }
70        "https" => {
71            let port = parsed_url.port_u16().unwrap_or(443);
72            let load_cache = true;
73
74            if let Some(proxy_config) = proxy_config_opt {
75                let proxy_stream = proxy_config
76                    .connect(context, host, port, load_cache)
77                    .await?;
78                let tls_stream = wrap_rustls(host, &[], proxy_stream).await?;
79                Box::new(tls_stream)
80            } else {
81                let tcp_stream = crate::net::connect_tcp(context, host, port, load_cache).await?;
82                let tls_stream = wrap_rustls(host, &[], tcp_stream).await?;
83                Box::new(tls_stream)
84            }
85        }
86        _ => bail!("Unknown URL scheme"),
87    };
88
89    let io = TokioIo::new(stream);
90    let (sender, conn) = hyper::client::conn::http1::handshake(io).await?;
91    tokio::task::spawn(conn);
92
93    Ok(sender)
94}
95
96/// Converts the URL to expiration and stale timestamps.
97fn http_url_cache_timestamps(url: &str, mimetype: Option<&str>) -> (i64, i64) {
98    let now = time();
99
100    let expires = now + 3600 * 24 * 35;
101    let stale = if url.ends_with(".xdc") {
102        // WebXDCs are never stale, they just expire.
103        expires
104    } else if mimetype.is_some_and(|s| s.starts_with("image/")) {
105        // Cache images for 1 day.
106        //
107        // As of 2024-12-12 WebXDC icons at <https://webxdc.org/apps/>
108        // use the same path for all app versions,
109        // so may change, but it is not critical if outdated icon is displayed.
110        now + 3600 * 24
111    } else {
112        // Revalidate everything else after 1 hour.
113        //
114        // This includes HTML, CSS and JS.
115        now + 3600
116    };
117    (expires, stale)
118}
119
120/// Places the binary into HTTP cache.
121async fn http_cache_put(context: &Context, url: &str, response: &Response) -> Result<()> {
122    let blob =
123        BlobObject::create_and_deduplicate_from_bytes(context, response.blob.as_slice(), "")?;
124
125    let (expires, stale) = http_url_cache_timestamps(url, response.mimetype.as_deref());
126    context
127        .sql
128        .insert(
129            "INSERT OR REPLACE INTO http_cache (url, expires, stale, blobname, mimetype, encoding)
130             VALUES (?, ?, ?, ?, ?, ?)",
131            (
132                url,
133                expires,
134                stale,
135                blob.as_name(),
136                response.mimetype.as_deref().unwrap_or_default(),
137                response.encoding.as_deref().unwrap_or_default(),
138            ),
139        )
140        .await?;
141
142    Ok(())
143}
144
145/// Retrieves the binary from HTTP cache.
146///
147/// Also returns if the response is stale and should be revalidated in the background.
148async fn http_cache_get(context: &Context, url: &str) -> Result<Option<(Response, bool)>> {
149    let now = time();
150    let Some((blob_name, mimetype, encoding, stale_timestamp)) = context
151        .sql
152        .query_row_optional(
153            "SELECT blobname, mimetype, encoding, stale
154             FROM http_cache WHERE url=? AND expires > ?",
155            (url, now),
156            |row| {
157                let blob_name: String = row.get(0)?;
158                let mimetype: Option<String> = Some(row.get(1)?).filter(|s: &String| !s.is_empty());
159                let encoding: Option<String> = Some(row.get(2)?).filter(|s: &String| !s.is_empty());
160                let stale_timestamp: i64 = row.get(3)?;
161                Ok((blob_name, mimetype, encoding, stale_timestamp))
162            },
163        )
164        .await?
165    else {
166        return Ok(None);
167    };
168    let is_stale = now > stale_timestamp;
169
170    let blob_object = BlobObject::from_name(context, &blob_name)?;
171    let blob_abs_path = blob_object.to_abs_path();
172    let blob = match fs::read(blob_abs_path)
173        .await
174        .with_context(|| format!("Failed to read blob for {url:?} cache entry."))
175    {
176        Ok(blob) => blob,
177        Err(err) => {
178            // This should not happen, but user may go into the blobdir and remove files,
179            // antivirus may delete the file or there may be a bug in housekeeping.
180            warn!(context, "{err:?}.");
181            return Ok(None);
182        }
183    };
184
185    let (expires, _stale) = http_url_cache_timestamps(url, mimetype.as_deref());
186    let response = Response {
187        blob,
188        mimetype,
189        encoding,
190    };
191
192    // Update expiration timestamp
193    // to prevent deletion of the file still in use.
194    //
195    // If the response is stale, the caller should revalidate it in the background, so update
196    // `stale` timestamp to avoid revalidating too frequently (and have many parallel revalidation
197    // tasks) if revalidation fails or the HTTP request takes some time. The stale period >= 1 hour,
198    // so 1 more minute won't be a problem.
199    let stale_timestamp = if is_stale { now + 60 } else { stale_timestamp };
200    context
201        .sql
202        .execute(
203            "UPDATE http_cache SET expires=?, stale=? WHERE url=?",
204            (expires, stale_timestamp, url),
205        )
206        .await?;
207
208    Ok(Some((response, is_stale)))
209}
210
211/// Removes expired cache entries.
212pub(crate) async fn http_cache_cleanup(context: &Context) -> Result<()> {
213    // Remove cache entries that are already expired
214    // or entries that will not expire in a year
215    // to make sure we don't have invalid timestamps that are way forward in the future.
216    context
217        .sql
218        .execute(
219            "DELETE FROM http_cache
220             WHERE ?1 > expires OR expires > ?1 + 31536000",
221            (time(),),
222        )
223        .await?;
224    Ok(())
225}
226
227/// Fetches URL and updates the cache.
228///
229/// URL is fetched regardless of whether there is an existing result in the cache.
230async fn fetch_url(context: &Context, original_url: &str) -> Result<Response> {
231    let mut url = original_url.to_string();
232
233    // Follow up to 10 http-redirects
234    for _i in 0..10 {
235        let parsed_url = url
236            .parse::<hyper::Uri>()
237            .with_context(|| format!("Failed to parse URL {url:?}"))?;
238
239        let mut sender = get_http_sender(context, parsed_url.clone()).await?;
240        let authority = parsed_url
241            .authority()
242            .context("URL has no authority")?
243            .clone();
244
245        let req = hyper::Request::builder()
246            .uri(parsed_url.path())
247            .header(hyper::header::HOST, authority.as_str())
248            .body(http_body_util::Empty::<Bytes>::new())?;
249        let response = sender.send_request(req).await?;
250
251        if response.status().is_redirection() {
252            let header = response
253                .headers()
254                .get_all("location")
255                .iter()
256                .next_back()
257                .ok_or_else(|| anyhow!("Redirection doesn't have a target location"))?
258                .to_str()?;
259            info!(context, "Following redirect to {}", header);
260            url = header.to_string();
261            continue;
262        }
263
264        let content_type = response
265            .headers()
266            .get("content-type")
267            .and_then(|value| value.to_str().ok())
268            .and_then(|value| value.parse::<Mime>().ok());
269        let mimetype = content_type
270            .as_ref()
271            .map(|mime| mime.essence_str().to_string());
272        let encoding = content_type.as_ref().and_then(|mime| {
273            mime.get_param(mime::CHARSET)
274                .map(|charset| charset.as_str().to_string())
275        });
276        let body = response.collect().await?.to_bytes();
277        let blob: Vec<u8> = body.to_vec();
278        let response = Response {
279            blob,
280            mimetype,
281            encoding,
282        };
283        info!(context, "Inserting {original_url:?} into cache.");
284        http_cache_put(context, &url, &response).await?;
285        return Ok(response);
286    }
287
288    Err(anyhow!("Followed 10 redirections"))
289}
290
291/// Retrieves the binary contents of URL using HTTP GET request.
292pub async fn read_url_blob(context: &Context, url: &str) -> Result<Response> {
293    if let Some((response, is_stale)) = http_cache_get(context, url).await? {
294        info!(context, "Returning {url:?} from cache.");
295        if is_stale {
296            let context = context.clone();
297            let url = url.to_string();
298            tokio::spawn(async move {
299                // Fetch URL in background to update the cache.
300                info!(context, "Fetching stale {url:?} in background.");
301                if let Err(err) = fetch_url(&context, &url).await {
302                    warn!(context, "Failed to revalidate {url:?}: {err:#}.");
303                }
304            });
305        }
306        return Ok(response);
307    }
308
309    info!(context, "Not found {url:?} in cache, fetching.");
310    let response = fetch_url(context, url).await?;
311    Ok(response)
312}
313
314/// Sends an empty POST request to the URL.
315///
316/// Returns response text and whether request was successful or not.
317///
318/// Does not follow redirects.
319pub(crate) async fn post_empty(context: &Context, url: &str) -> Result<(String, bool)> {
320    let parsed_url = url
321        .parse::<hyper::Uri>()
322        .with_context(|| format!("Failed to parse URL {url:?}"))?;
323    let scheme = parsed_url.scheme_str().context("URL has no scheme")?;
324    if scheme != "https" {
325        bail!("POST requests to non-HTTPS URLs are not allowed");
326    }
327
328    let mut sender = get_http_sender(context, parsed_url.clone()).await?;
329    let authority = parsed_url
330        .authority()
331        .context("URL has no authority")?
332        .clone();
333    let req = hyper::Request::post(parsed_url.path())
334        .header(hyper::header::HOST, authority.as_str())
335        .body(http_body_util::Empty::<Bytes>::new())?;
336
337    let response = sender.send_request(req).await?;
338
339    let response_status = response.status();
340    let body = response.collect().await?.to_bytes();
341    let text = String::from_utf8_lossy(&body);
342    let response_text = text.to_string();
343
344    Ok((response_text, response_status.is_success()))
345}
346
347/// Posts string to the given URL.
348///
349/// Returns true if successful HTTP response code was returned.
350///
351/// Does not follow redirects.
352#[allow(dead_code)]
353pub(crate) async fn post_string(context: &Context, url: &str, body: String) -> Result<bool> {
354    let parsed_url = url
355        .parse::<hyper::Uri>()
356        .with_context(|| format!("Failed to parse URL {url:?}"))?;
357    let scheme = parsed_url.scheme_str().context("URL has no scheme")?;
358    if scheme != "https" {
359        bail!("POST requests to non-HTTPS URLs are not allowed");
360    }
361
362    let mut sender = get_http_sender(context, parsed_url.clone()).await?;
363    let authority = parsed_url
364        .authority()
365        .context("URL has no authority")?
366        .clone();
367
368    let request = hyper::Request::post(parsed_url.path())
369        .header(hyper::header::HOST, authority.as_str())
370        .body(body)?;
371    let response = sender.send_request(request).await?;
372
373    Ok(response.status().is_success())
374}
375
376/// Sends a POST request with x-www-form-urlencoded data.
377///
378/// Does not follow redirects.
379pub(crate) async fn post_form<T: Serialize + ?Sized>(
380    context: &Context,
381    url: &str,
382    form: &T,
383) -> Result<Bytes> {
384    let parsed_url = url
385        .parse::<hyper::Uri>()
386        .with_context(|| format!("Failed to parse URL {url:?}"))?;
387    let scheme = parsed_url.scheme_str().context("URL has no scheme")?;
388    if scheme != "https" {
389        bail!("POST requests to non-HTTPS URLs are not allowed");
390    }
391
392    let encoded_body = serde_urlencoded::to_string(form).context("Failed to encode data")?;
393    let mut sender = get_http_sender(context, parsed_url.clone()).await?;
394    let authority = parsed_url
395        .authority()
396        .context("URL has no authority")?
397        .clone();
398    let request = hyper::Request::post(parsed_url.path())
399        .header(hyper::header::HOST, authority.as_str())
400        .header("content-type", "application/x-www-form-urlencoded")
401        .body(encoded_body)?;
402    let response = sender.send_request(request).await?;
403    let bytes = response.collect().await?.to_bytes();
404    Ok(bytes)
405}
406
#[cfg(test)]
mod tests {
    use super::*;
    use std::time::Duration;

    use crate::sql::housekeeping;
    use crate::test_utils::TestContext;
    use crate::tools::SystemTime;

    /// Exercises the HTTP cache: storing, staleness, renewal, expiration
    /// and tolerance to blobs removed from the blobdir.
    #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
    async fn test_http_cache() -> Result<()> {
        const HTML_URL: &str = "https://webxdc.org/";
        const XDC_EDITOR_URL: &str = "https://apps.testrun.org/webxdc-editor-v3.2.0.xdc";
        const XDC_PIXEL_URL: &str = "https://apps.testrun.org/webxdc-pixel-v2.xdc";

        // Expected lookup results for a response that is fresh or stale.
        let fresh = |response: &Response| Some((response.clone(), false));
        let stale = |response: &Response| Some((response.clone(), true));

        let ctx = &TestContext::new().await;

        // The cache is empty initially.
        assert_eq!(http_cache_get(ctx, HTML_URL).await?, None);

        let html_response = Response {
            blob: b"<!DOCTYPE html> ...".to_vec(),
            mimetype: Some("text/html".to_string()),
            encoding: None,
        };
        let xdc_response = Response {
            blob: b"PK...".to_vec(),
            mimetype: Some("application/octet-stream".to_string()),
            encoding: None,
        };

        // Storing one URL does not affect the others.
        http_cache_put(ctx, HTML_URL, &html_response).await?;
        assert_eq!(http_cache_get(ctx, XDC_EDITOR_URL).await?, None);
        assert_eq!(http_cache_get(ctx, XDC_PIXEL_URL).await?, None);
        assert_eq!(
            http_cache_get(ctx, HTML_URL).await?,
            fresh(&html_response)
        );

        http_cache_put(ctx, XDC_EDITOR_URL, &xdc_response).await?;
        http_cache_put(ctx, XDC_PIXEL_URL, &xdc_response).await?;
        assert_eq!(
            http_cache_get(ctx, XDC_EDITOR_URL).await?,
            fresh(&xdc_response)
        );
        assert_eq!(
            http_cache_get(ctx, XDC_PIXEL_URL).await?,
            fresh(&xdc_response)
        );
        assert_eq!(
            http_cache_get(ctx, HTML_URL).await?,
            fresh(&html_response)
        );

        // After a bit more than 1 hour HTML becomes stale, .xdc does not.
        SystemTime::shift(Duration::from_secs(3600 + 100));
        assert_eq!(
            http_cache_get(ctx, HTML_URL).await?,
            stale(&html_response)
        );
        assert_eq!(
            http_cache_get(ctx, XDC_EDITOR_URL).await?,
            fresh(&xdc_response)
        );

        // A stale entry can be renewed
        // even before housekeeping removes the old one.
        http_cache_put(ctx, HTML_URL, &html_response).await?;
        assert_eq!(
            http_cache_get(ctx, HTML_URL).await?,
            fresh(&html_response)
        );

        // 35 days later the pixel .xdc is expired: it was not requested for 35 days and 1 hour.
        // The editor was not requested for just 35 days, so it is still there.
        // The editor was not renewed however, so it becomes stale.
        SystemTime::shift(Duration::from_secs(3600 * 24 * 35 - 100));

        // Housekeeping must not delete the blob too early.
        housekeeping(ctx).await?;

        assert_eq!(
            http_cache_get(ctx, XDC_EDITOR_URL).await?,
            stale(&xdc_response)
        );
        assert_eq!(http_cache_get(ctx, XDC_PIXEL_URL).await?, None);

        // A quick second request does not report the blob as stale: the revalidation task
        // is supposed to be running already and to update the blob soon.
        assert_eq!(
            http_cache_get(ctx, XDC_EDITOR_URL).await?,
            fresh(&xdc_response)
        );
        // If the revalidation task does not succeed for some time, the blob becomes stale again
        // even though it is requested frequently.
        for attempts_left in (0..100).rev() {
            SystemTime::shift(Duration::from_secs(6));
            if matches!(
                http_cache_get(ctx, XDC_EDITOR_URL).await?,
                Some((_, true))
            ) {
                break;
            }
            assert!(attempts_left > 0);
        }

        // Loading a cache entry must not fail
        // if its file was accidentally removed from the blobdir.
        for entry in std::fs::read_dir(ctx.get_blobdir())? {
            std::fs::remove_file(entry.unwrap().path()).expect("Failed to remove blob");
        }

        let after_removal = http_cache_get(ctx, XDC_EDITOR_URL)
            .await
            .context("Failed to get no cache response")?;
        assert_eq!(after_removal, None);

        Ok(())
    }
}