1use anyhow::{Context as _, Result, anyhow, bail};
4use bytes::Bytes;
5use http_body_util::BodyExt;
6use hyper_util::rt::TokioIo;
7use mime::Mime;
8use serde::Serialize;
9use tokio::fs;
10
11use crate::blob::BlobObject;
12use crate::context::Context;
13use crate::log::warn;
14use crate::net::proxy::ProxyConfig;
15use crate::net::session::SessionStream;
16use crate::net::tls::wrap_rustls;
17use crate::tools::time;
18
/// `User-Agent` header value for outgoing HTTP requests.
///
/// It is not sent by default: in this file only `fetch_url` attaches it,
/// and only for requests to `tile.openstreetmap.org` and
/// `vector.openstreetmap.org`.
const USER_AGENT: &str = "chatmail/2 (+https://github.com/chatmail/core/)";
22
/// Result of an HTTP(S) GET request, as returned by `read_url_blob`
/// and stored in the HTTP cache.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct Response {
    /// Raw response body.
    pub blob: Vec<u8>,

    /// MIME type (essence, without parameters) taken from the
    /// `Content-Type` header, or `None` if the header is missing or unparsable.
    pub mimetype: Option<String>,

    /// Value of the `charset` parameter of the `Content-Type` header, if any.
    pub encoding: Option<String>,
}
35
36pub async fn read_url(context: &Context, url: &str) -> Result<String> {
38 let response = read_url_blob(context, url).await?;
39 let text = String::from_utf8_lossy(&response.blob);
40 Ok(text.to_string())
41}
42
43async fn get_http_sender<B>(
44 context: &Context,
45 parsed_url: hyper::Uri,
46) -> Result<hyper::client::conn::http1::SendRequest<B>>
47where
48 B: hyper::body::Body + 'static + Send,
49 B::Data: Send,
50 B::Error: Into<Box<dyn std::error::Error + Send + Sync>>,
51{
52 let scheme = parsed_url.scheme_str().context("URL has no scheme")?;
53 let host = parsed_url.host().context("URL has no host")?;
54 let proxy_config_opt = ProxyConfig::load(context).await?;
55
56 let stream: Box<dyn SessionStream> = match scheme {
57 "http" => {
58 let port = parsed_url.port_u16().unwrap_or(80);
59
60 let load_cache = false;
65 if let Some(proxy_config) = proxy_config_opt {
66 let proxy_stream = proxy_config
67 .connect(context, host, port, load_cache)
68 .await?;
69 Box::new(proxy_stream)
70 } else {
71 let tcp_stream = crate::net::connect_tcp(context, host, port, load_cache).await?;
72 Box::new(tcp_stream)
73 }
74 }
75 "https" => {
76 let port = parsed_url.port_u16().unwrap_or(443);
77 let (use_sni, load_cache) = (true, true);
78
79 if let Some(proxy_config) = proxy_config_opt {
80 let proxy_stream = proxy_config
81 .connect(context, host, port, load_cache)
82 .await?;
83 let tls_stream = wrap_rustls(
84 host,
85 port,
86 use_sni,
87 "",
88 proxy_stream,
89 &context.tls_session_store,
90 &context.spki_hash_store,
91 &context.sql,
92 )
93 .await?;
94 Box::new(tls_stream)
95 } else {
96 let tcp_stream = crate::net::connect_tcp(context, host, port, load_cache).await?;
97 let tls_stream = wrap_rustls(
98 host,
99 port,
100 use_sni,
101 "",
102 tcp_stream,
103 &context.tls_session_store,
104 &context.spki_hash_store,
105 &context.sql,
106 )
107 .await?;
108 Box::new(tls_stream)
109 }
110 }
111 _ => bail!("Unknown URL scheme"),
112 };
113
114 let io = TokioIo::new(stream);
115 let (sender, conn) = hyper::client::conn::http1::handshake(io).await?;
116 tokio::task::spawn(conn);
117
118 Ok(sender)
119}
120
121fn http_url_cache_timestamps(url: &str, mimetype: Option<&str>) -> (i64, i64) {
123 let now = time();
124
125 let expires = now.saturating_add(3600 * 24 * 35);
126 let stale = if url.ends_with(".xdc") {
127 expires
129 } else if url.starts_with("https://tile.openstreetmap.org/")
130 || url.starts_with("https://vector.openstreetmap.org/")
131 {
132 now.saturating_add(3600 * 24 * 7)
136 } else if mimetype.is_some_and(|s| s.starts_with("image/")) {
137 now.saturating_add(3600 * 24)
143 } else {
144 now.saturating_add(3600)
148 };
149 (expires, stale)
150}
151
152async fn http_cache_put(context: &Context, url: &str, response: &Response) -> Result<()> {
154 let blob =
155 BlobObject::create_and_deduplicate_from_bytes(context, response.blob.as_slice(), "")?;
156
157 let (expires, stale) = http_url_cache_timestamps(url, response.mimetype.as_deref());
158 context
159 .sql
160 .insert(
161 "INSERT OR REPLACE INTO http_cache (url, expires, stale, blobname, mimetype, encoding)
162 VALUES (?, ?, ?, ?, ?, ?)",
163 (
164 url,
165 expires,
166 stale,
167 blob.as_name(),
168 response.mimetype.as_deref().unwrap_or_default(),
169 response.encoding.as_deref().unwrap_or_default(),
170 ),
171 )
172 .await?;
173
174 Ok(())
175}
176
177#[expect(clippy::arithmetic_side_effects)]
181async fn http_cache_get(context: &Context, url: &str) -> Result<Option<(Response, bool)>> {
182 let now = time();
183 let Some((blob_name, mimetype, encoding, stale_timestamp)) = context
184 .sql
185 .query_row_optional(
186 "SELECT blobname, mimetype, encoding, stale
187 FROM http_cache WHERE url=? AND expires > ?",
188 (url, now),
189 |row| {
190 let blob_name: String = row.get(0)?;
191 let mimetype: Option<String> = Some(row.get(1)?).filter(|s: &String| !s.is_empty());
192 let encoding: Option<String> = Some(row.get(2)?).filter(|s: &String| !s.is_empty());
193 let stale_timestamp: i64 = row.get(3)?;
194 Ok((blob_name, mimetype, encoding, stale_timestamp))
195 },
196 )
197 .await?
198 else {
199 return Ok(None);
200 };
201 let is_stale = now > stale_timestamp;
202
203 let blob_object = BlobObject::from_name(context, &blob_name)?;
204 let blob_abs_path = blob_object.to_abs_path();
205 let blob = match fs::read(blob_abs_path)
206 .await
207 .with_context(|| format!("Failed to read blob for {url:?} cache entry."))
208 {
209 Ok(blob) => blob,
210 Err(err) => {
211 warn!(context, "{err:?}.");
214 return Ok(None);
215 }
216 };
217
218 let (expires, _stale) = http_url_cache_timestamps(url, mimetype.as_deref());
219 let response = Response {
220 blob,
221 mimetype,
222 encoding,
223 };
224
225 let stale_timestamp = if is_stale { now + 60 } else { stale_timestamp };
233 context
234 .sql
235 .execute(
236 "UPDATE http_cache SET expires=?, stale=? WHERE url=?",
237 (expires, stale_timestamp, url),
238 )
239 .await?;
240
241 Ok(Some((response, is_stale)))
242}
243
244pub(crate) async fn http_cache_cleanup(context: &Context) -> Result<()> {
246 context
250 .sql
251 .execute(
252 "DELETE FROM http_cache
253 WHERE ?1 > expires OR expires > ?1 + 31536000",
254 (time(),),
255 )
256 .await?;
257 Ok(())
258}
259
260async fn fetch_url(context: &Context, original_url: &str) -> Result<Response> {
264 let mut url = original_url.to_string();
265
266 for _i in 0..10 {
268 let parsed_url = url
269 .parse::<hyper::Uri>()
270 .with_context(|| format!("Failed to parse URL {url:?}"))?;
271
272 let mut sender = get_http_sender(context, parsed_url.clone()).await?;
273 let authority = parsed_url
274 .authority()
275 .context("URL has no authority")?
276 .clone();
277
278 let req = hyper::Request::builder().uri(parsed_url);
279
280 let req =
287 if authority == "tile.openstreetmap.org" || authority == "vector.openstreetmap.org" {
288 req.header("User-Agent", USER_AGENT)
289 } else {
290 req
291 };
292
293 let req = req
294 .header(hyper::header::HOST, authority.as_str())
295 .body(http_body_util::Empty::<Bytes>::new())?;
296 let response = sender.send_request(req).await?;
297
298 if response.status().is_redirection() {
299 let header = response
300 .headers()
301 .get_all("location")
302 .iter()
303 .next_back()
304 .ok_or_else(|| anyhow!("Redirection doesn't have a target location"))?
305 .to_str()?;
306 info!(context, "Following redirect to {}", header);
307 url = header.to_string();
308 continue;
309 }
310
311 if !response.status().is_success() {
312 return Err(anyhow!(
313 "The server returned a non-successful response code: {}{}",
314 response.status().as_u16(),
315 response
316 .status()
317 .canonical_reason()
318 .map(|s| format!(" {s}"))
319 .unwrap_or("".to_string())
320 ));
321 }
322
323 let content_type = response
324 .headers()
325 .get("content-type")
326 .and_then(|value| value.to_str().ok())
327 .and_then(|value| value.parse::<Mime>().ok());
328 let mimetype = content_type
329 .as_ref()
330 .map(|mime| mime.essence_str().to_string());
331 let encoding = content_type.as_ref().and_then(|mime| {
332 mime.get_param(mime::CHARSET)
333 .map(|charset| charset.as_str().to_string())
334 });
335 let body = response.collect().await?.to_bytes();
336 let blob: Vec<u8> = body.to_vec();
337 let response = Response {
338 blob,
339 mimetype,
340 encoding,
341 };
342 info!(context, "Inserting {original_url:?} into cache.");
343 http_cache_put(context, &url, &response).await?;
344 return Ok(response);
345 }
346
347 Err(anyhow!("Followed 10 redirections"))
348}
349
350pub async fn read_url_blob(context: &Context, url: &str) -> Result<Response> {
352 if let Some((response, is_stale)) = http_cache_get(context, url).await? {
353 info!(context, "Returning {url:?} from cache.");
354 if is_stale {
355 let context = context.clone();
356 let url = url.to_string();
357 tokio::spawn(async move {
358 info!(context, "Fetching stale {url:?} in background.");
360 if let Err(err) = fetch_url(&context, &url).await {
361 warn!(context, "Failed to revalidate {url:?}: {err:#}.");
362 }
363 });
364 }
365 return Ok(response);
366 }
367
368 info!(context, "Not found {url:?} in cache, fetching.");
369 let response = fetch_url(context, url).await?;
370 Ok(response)
371}
372
373pub(crate) async fn post_empty(context: &Context, url: &str) -> Result<(String, bool)> {
379 let parsed_url = url
380 .parse::<hyper::Uri>()
381 .with_context(|| format!("Failed to parse URL {url:?}"))?;
382 let scheme = parsed_url.scheme_str().context("URL has no scheme")?;
383 if scheme != "https" {
384 bail!("POST requests to non-HTTPS URLs are not allowed");
385 }
386
387 let mut sender = get_http_sender(context, parsed_url.clone()).await?;
388 let authority = parsed_url
389 .authority()
390 .context("URL has no authority")?
391 .clone();
392 let req = hyper::Request::post(parsed_url)
393 .header(hyper::header::HOST, authority.as_str())
394 .body(http_body_util::Empty::<Bytes>::new())?;
395
396 let response = sender.send_request(req).await?;
397
398 let response_status = response.status();
399 let body = response.collect().await?.to_bytes();
400 let text = String::from_utf8_lossy(&body);
401 let response_text = text.to_string();
402
403 Ok((response_text, response_status.is_success()))
404}
405
406#[allow(dead_code)]
412pub(crate) async fn post_string(context: &Context, url: &str, body: String) -> Result<bool> {
413 let parsed_url = url
414 .parse::<hyper::Uri>()
415 .with_context(|| format!("Failed to parse URL {url:?}"))?;
416 let scheme = parsed_url.scheme_str().context("URL has no scheme")?;
417 if scheme != "https" {
418 bail!("POST requests to non-HTTPS URLs are not allowed");
419 }
420
421 let mut sender = get_http_sender(context, parsed_url.clone()).await?;
422 let authority = parsed_url
423 .authority()
424 .context("URL has no authority")?
425 .clone();
426
427 let request = hyper::Request::post(parsed_url)
428 .header(hyper::header::HOST, authority.as_str())
429 .body(body)?;
430 let response = sender.send_request(request).await?;
431
432 Ok(response.status().is_success())
433}
434
435pub(crate) async fn post_form<T: Serialize + ?Sized>(
439 context: &Context,
440 url: &str,
441 form: &T,
442) -> Result<Bytes> {
443 let parsed_url = url
444 .parse::<hyper::Uri>()
445 .with_context(|| format!("Failed to parse URL {url:?}"))?;
446 let scheme = parsed_url.scheme_str().context("URL has no scheme")?;
447 if scheme != "https" {
448 bail!("POST requests to non-HTTPS URLs are not allowed");
449 }
450
451 let encoded_body = serde_urlencoded::to_string(form).context("Failed to encode data")?;
452 let mut sender = get_http_sender(context, parsed_url.clone()).await?;
453 let authority = parsed_url
454 .authority()
455 .context("URL has no authority")?
456 .clone();
457 let request = hyper::Request::post(parsed_url)
458 .header(hyper::header::HOST, authority.as_str())
459 .header("content-type", "application/x-www-form-urlencoded")
460 .body(encoded_body)?;
461 let response = sender.send_request(request).await?;
462 let bytes = response.collect().await?.to_bytes();
463 Ok(bytes)
464}
465
#[cfg(test)]
mod tests {
    use super::*;
    use std::time::Duration;

    use crate::sql::housekeeping;
    use crate::test_utils::TestContext;
    use crate::tools::SystemTime;

    /// Exercises the HTTP cache without any network access:
    /// insertion, lookup, staleness, expiration, housekeeping
    /// and recovery from a missing blob file.
    #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
    async fn test_http_cache() -> Result<()> {
        let t = &TestContext::new().await;

        // The cache starts out empty.
        assert_eq!(http_cache_get(t, "https://webxdc.org/").await?, None);

        let html_response = Response {
            blob: b"<!DOCTYPE html> ...".to_vec(),
            mimetype: Some("text/html".to_string()),
            encoding: None,
        };

        let xdc_response = Response {
            blob: b"PK...".to_vec(),
            mimetype: Some("application/octet-stream".to_string()),
            encoding: None,
        };
        let xdc_editor_url = "https://apps.testrun.org/webxdc-editor-v3.2.0.xdc";
        let xdc_pixel_url = "https://apps.testrun.org/webxdc-pixel-v2.xdc";

        http_cache_put(t, "https://webxdc.org/", &html_response).await?;

        // Only the URL that was put is found, and it is fresh.
        assert_eq!(http_cache_get(t, xdc_editor_url).await?, None);
        assert_eq!(http_cache_get(t, xdc_pixel_url).await?, None);
        assert_eq!(
            http_cache_get(t, "https://webxdc.org/").await?,
            Some((html_response.clone(), false))
        );

        // Two URLs may share the same body.
        http_cache_put(t, xdc_editor_url, &xdc_response).await?;
        http_cache_put(t, xdc_pixel_url, &xdc_response).await?;
        assert_eq!(
            http_cache_get(t, xdc_editor_url).await?,
            Some((xdc_response.clone(), false))
        );
        assert_eq!(
            http_cache_get(t, xdc_pixel_url).await?,
            Some((xdc_response.clone(), false))
        );

        assert_eq!(
            http_cache_get(t, "https://webxdc.org/").await?,
            Some((html_response.clone(), false))
        );

        // Move the clock just past the 1 hour revalidation period for HTML.
        SystemTime::shift(Duration::from_secs(3600 + 100));
        // HTML is stale now, but the .xdc is not.
        assert_eq!(
            http_cache_get(t, "https://webxdc.org/").await?,
            Some((html_response.clone(), true))
        );
        assert_eq!(
            http_cache_get(t, xdc_editor_url).await?,
            Some((xdc_response.clone(), false))
        );

        // Putting the entry again makes it fresh.
        http_cache_put(t, "https://webxdc.org/", &html_response).await?;
        assert_eq!(
            http_cache_get(t, "https://webxdc.org/").await?,
            Some((html_response.clone(), false))
        );

        // After this shift the pixel .xdc has not been requested for more
        // than 35 days and is expired. The editor .xdc was requested slightly
        // less than 35 days ago, so it is still there, but past its
        // stale timestamp.
        SystemTime::shift(Duration::from_secs(3600 * 24 * 35 - 100));

        // Housekeeping must not remove the blob of the entry that is still valid.
        housekeeping(t).await?;

        assert_eq!(
            http_cache_get(t, xdc_editor_url).await?,
            Some((xdc_response.clone(), true))
        );
        assert_eq!(http_cache_get(t, xdc_pixel_url).await?, None);

        // A lookup right after a stale one is not reported as stale again.
        assert_eq!(
            http_cache_get(t, xdc_editor_url).await?,
            Some((xdc_response.clone(), false))
        );
        // Without a successful revalidation the entry becomes stale again
        // after some time, even though it is looked up frequently.
        for i in (0..100).rev() {
            SystemTime::shift(Duration::from_secs(6));
            if let Some((_, true)) = http_cache_get(t, xdc_editor_url).await? {
                break;
            }
            assert!(i > 0);
        }

        // Remove all files from the blob directory to simulate a lost blob.
        for entry in std::fs::read_dir(t.get_blobdir())? {
            let entry = entry.unwrap();
            let path = entry.path();
            std::fs::remove_file(path).expect("Failed to remove blob");
        }

        // A cache entry whose blob is missing is a cache miss, not an error.
        assert_eq!(
            http_cache_get(t, xdc_editor_url)
                .await
                .context("Failed to get no cache response")?,
            None
        );

        Ok(())
    }
}