1use anyhow::{Context as _, Result, anyhow, bail};
4use bytes::Bytes;
5use http_body_util::BodyExt;
6use hyper_util::rt::TokioIo;
7use mime::Mime;
8use serde::Serialize;
9use tokio::fs;
10
11use crate::blob::BlobObject;
12use crate::context::Context;
13use crate::log::warn;
14use crate::net::proxy::ProxyConfig;
15use crate::net::session::SessionStream;
16use crate::net::tls::wrap_tls;
17use crate::tools::time;
18
/// `User-Agent` header value sent with requests to the OpenStreetMap tile servers
/// (`tile.openstreetmap.org` and `vector.openstreetmap.org`); see `fetch_url`.
const USER_AGENT: &str = "chatmail/2 (+https://github.com/chatmail/core/)";
22
/// HTTP response body together with its content metadata.
///
/// Returned by `read_url_blob` and friends, and stored in / loaded from the HTTP cache.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct Response {
    /// Raw response body bytes.
    pub blob: Vec<u8>,

    /// MIME type from the `Content-Type` header without parameters (e.g. `text/html`),
    /// or `None` if the header is missing or unparsable.
    pub mimetype: Option<String>,

    /// Value of the `charset` parameter of the `Content-Type` header, if present.
    pub encoding: Option<String>,
}
35
36pub async fn read_url(context: &Context, url: &str) -> Result<String> {
38 read_url_with_tls(context, url, true).await
39}
40
41pub(crate) async fn read_url_with_tls(
43 context: &Context,
44 url: &str,
45 strict_tls: bool,
46) -> Result<String> {
47 let response = read_url_blob_with_tls(context, url, strict_tls).await?;
48 let text = String::from_utf8_lossy(&response.blob);
49 Ok(text.to_string())
50}
51
52async fn get_http_sender<B>(
53 context: &Context,
54 parsed_url: hyper::Uri,
55 strict_tls: bool,
56) -> Result<hyper::client::conn::http1::SendRequest<B>>
57where
58 B: hyper::body::Body + 'static + Send,
59 B::Data: Send,
60 B::Error: Into<Box<dyn std::error::Error + Send + Sync>>,
61{
62 let scheme = parsed_url.scheme_str().context("URL has no scheme")?;
63 let host = parsed_url.host().context("URL has no host")?;
64 let proxy_config_opt = ProxyConfig::load(context).await?;
65
66 let stream: Box<dyn SessionStream> = match scheme {
67 "http" => {
68 let port = parsed_url.port_u16().unwrap_or(80);
69
70 let load_cache = false;
75 if let Some(proxy_config) = proxy_config_opt {
76 let proxy_stream = proxy_config
77 .connect(context, host, port, load_cache)
78 .await?;
79 Box::new(proxy_stream)
80 } else {
81 let tcp_stream = crate::net::connect_tcp(context, host, port, load_cache).await?;
82 Box::new(tcp_stream)
83 }
84 }
85 "https" => {
86 let port = parsed_url.port_u16().unwrap_or(443);
87 let (use_sni, load_cache) = (true, true);
88
89 let tcp_stream: Box<dyn SessionStream> = if let Some(proxy_config) = proxy_config_opt {
90 let proxy_stream = proxy_config
91 .connect(context, host, port, load_cache)
92 .await?;
93 Box::new(proxy_stream)
94 } else {
95 let tcp_stream = crate::net::connect_tcp(context, host, port, load_cache).await?;
96 Box::new(tcp_stream)
97 };
98
99 let tls_stream = wrap_tls(
100 strict_tls,
101 host,
102 port,
103 use_sni,
104 "",
105 tcp_stream,
106 &context.tls_session_store,
107 &context.spki_hash_store,
108 &context.sql,
109 )
110 .await?;
111 Box::new(tls_stream)
112 }
113 _ => bail!("Unknown URL scheme"),
114 };
115
116 let io = TokioIo::new(stream);
117 let (sender, conn) = hyper::client::conn::http1::handshake(io).await?;
118 tokio::task::spawn(conn);
119
120 Ok(sender)
121}
122
123fn http_url_cache_timestamps(url: &str, mimetype: Option<&str>) -> (i64, i64) {
125 let now = time();
126
127 let expires = now.saturating_add(3600 * 24 * 35);
128 let stale = if url.ends_with(".xdc") {
129 expires
131 } else if url.starts_with("https://tile.openstreetmap.org/")
132 || url.starts_with("https://vector.openstreetmap.org/")
133 {
134 now.saturating_add(3600 * 24 * 7)
138 } else if mimetype.is_some_and(|s| s.starts_with("image/")) {
139 now.saturating_add(3600 * 24)
145 } else {
146 now.saturating_add(3600)
150 };
151 (expires, stale)
152}
153
154async fn http_cache_put(context: &Context, url: &str, response: &Response) -> Result<()> {
156 let blob =
157 BlobObject::create_and_deduplicate_from_bytes(context, response.blob.as_slice(), "")?;
158
159 let (expires, stale) = http_url_cache_timestamps(url, response.mimetype.as_deref());
160 context
161 .sql
162 .insert(
163 "INSERT OR REPLACE INTO http_cache (url, expires, stale, blobname, mimetype, encoding)
164 VALUES (?, ?, ?, ?, ?, ?)",
165 (
166 url,
167 expires,
168 stale,
169 blob.as_name(),
170 response.mimetype.as_deref().unwrap_or_default(),
171 response.encoding.as_deref().unwrap_or_default(),
172 ),
173 )
174 .await?;
175
176 Ok(())
177}
178
179#[expect(clippy::arithmetic_side_effects)]
183async fn http_cache_get(context: &Context, url: &str) -> Result<Option<(Response, bool)>> {
184 let now = time();
185 let Some((blob_name, mimetype, encoding, stale_timestamp)) = context
186 .sql
187 .query_row_optional(
188 "SELECT blobname, mimetype, encoding, stale
189 FROM http_cache WHERE url=? AND expires > ?",
190 (url, now),
191 |row| {
192 let blob_name: String = row.get(0)?;
193 let mimetype: Option<String> = Some(row.get(1)?).filter(|s: &String| !s.is_empty());
194 let encoding: Option<String> = Some(row.get(2)?).filter(|s: &String| !s.is_empty());
195 let stale_timestamp: i64 = row.get(3)?;
196 Ok((blob_name, mimetype, encoding, stale_timestamp))
197 },
198 )
199 .await?
200 else {
201 return Ok(None);
202 };
203 let is_stale = now > stale_timestamp;
204
205 let blob_object = BlobObject::from_name(context, &blob_name)?;
206 let blob_abs_path = blob_object.to_abs_path();
207 let blob = match fs::read(blob_abs_path)
208 .await
209 .with_context(|| format!("Failed to read blob for {url:?} cache entry."))
210 {
211 Ok(blob) => blob,
212 Err(err) => {
213 warn!(context, "{err:?}.");
216 return Ok(None);
217 }
218 };
219
220 let (expires, _stale) = http_url_cache_timestamps(url, mimetype.as_deref());
221 let response = Response {
222 blob,
223 mimetype,
224 encoding,
225 };
226
227 let stale_timestamp = if is_stale { now + 60 } else { stale_timestamp };
235 context
236 .sql
237 .execute(
238 "UPDATE http_cache SET expires=?, stale=? WHERE url=?",
239 (expires, stale_timestamp, url),
240 )
241 .await?;
242
243 Ok(Some((response, is_stale)))
244}
245
246pub(crate) async fn http_cache_cleanup(context: &Context) -> Result<()> {
248 context
252 .sql
253 .execute(
254 "DELETE FROM http_cache
255 WHERE ?1 > expires OR expires > ?1 + 31536000",
256 (time(),),
257 )
258 .await?;
259 Ok(())
260}
261
262async fn fetch_url(context: &Context, original_url: &str, strict_tls: bool) -> Result<Response> {
266 let mut url = original_url.to_string();
267
268 for _i in 0..10 {
270 let parsed_url = url
271 .parse::<hyper::Uri>()
272 .with_context(|| format!("Failed to parse URL {url:?}"))?;
273
274 let mut sender = get_http_sender(context, parsed_url.clone(), strict_tls).await?;
275 let authority = parsed_url
276 .authority()
277 .context("URL has no authority")?
278 .clone();
279
280 let req = hyper::Request::builder().uri(parsed_url);
281
282 let req =
289 if authority == "tile.openstreetmap.org" || authority == "vector.openstreetmap.org" {
290 req.header("User-Agent", USER_AGENT)
291 } else {
292 req
293 };
294
295 let req = req
296 .header(hyper::header::HOST, authority.as_str())
297 .body(http_body_util::Empty::<Bytes>::new())?;
298 let response = sender.send_request(req).await?;
299
300 if response.status().is_redirection() {
301 let header = response
302 .headers()
303 .get_all("location")
304 .iter()
305 .next_back()
306 .ok_or_else(|| anyhow!("Redirection doesn't have a target location"))?
307 .to_str()?;
308 info!(context, "Following redirect to {}", header);
309 url = header.to_string();
310 continue;
311 }
312
313 if !response.status().is_success() {
314 return Err(anyhow!(
315 "The server returned a non-successful response code: {}{}",
316 response.status().as_u16(),
317 response
318 .status()
319 .canonical_reason()
320 .map(|s| format!(" {s}"))
321 .unwrap_or("".to_string())
322 ));
323 }
324
325 let content_type = response
326 .headers()
327 .get("content-type")
328 .and_then(|value| value.to_str().ok())
329 .and_then(|value| value.parse::<Mime>().ok());
330 let mimetype = content_type
331 .as_ref()
332 .map(|mime| mime.essence_str().to_string());
333 let encoding = content_type.as_ref().and_then(|mime| {
334 mime.get_param(mime::CHARSET)
335 .map(|charset| charset.as_str().to_string())
336 });
337 let body = response.collect().await?.to_bytes();
338 let blob: Vec<u8> = body.to_vec();
339 let response = Response {
340 blob,
341 mimetype,
342 encoding,
343 };
344 if strict_tls {
345 info!(context, "Inserting {original_url:?} into cache.");
346 http_cache_put(context, &url, &response).await?;
347 }
348 return Ok(response);
349 }
350
351 Err(anyhow!("Followed 10 redirections"))
352}
353
354pub async fn read_url_blob(context: &Context, url: &str) -> Result<Response> {
356 read_url_blob_with_tls(context, url, true).await
357}
358
359pub(crate) async fn read_url_blob_with_tls(
361 context: &Context,
362 url: &str,
363 strict_tls: bool,
364) -> Result<Response> {
365 if !strict_tls {
366 info!(
367 context,
368 "Fetching {url:?} without HTTP cache due to relaxed TLS."
369 );
370 return fetch_url(context, url, strict_tls).await;
371 }
372
373 if let Some((response, is_stale)) = http_cache_get(context, url).await? {
374 info!(context, "Returning {url:?} from cache.");
375 if is_stale {
376 let context = context.clone();
377 let url = url.to_string();
378 tokio::spawn(async move {
379 info!(context, "Fetching stale {url:?} in background.");
381 if let Err(err) = fetch_url(&context, &url, true).await {
382 warn!(context, "Failed to revalidate {url:?}: {err:#}.");
383 }
384 });
385 }
386 return Ok(response);
387 }
388
389 info!(context, "Not found {url:?} in cache, fetching.");
390 let response = fetch_url(context, url, true).await?;
391 Ok(response)
392}
393
394pub(crate) async fn post_empty(context: &Context, url: &str) -> Result<(String, bool)> {
400 let parsed_url = url
401 .parse::<hyper::Uri>()
402 .with_context(|| format!("Failed to parse URL {url:?}"))?;
403 let scheme = parsed_url.scheme_str().context("URL has no scheme")?;
404 if scheme != "https" {
405 bail!("POST requests to non-HTTPS URLs are not allowed");
406 }
407
408 let mut sender = get_http_sender(context, parsed_url.clone(), true).await?;
409 let authority = parsed_url
410 .authority()
411 .context("URL has no authority")?
412 .clone();
413 let req = hyper::Request::post(parsed_url)
414 .header(hyper::header::HOST, authority.as_str())
415 .body(http_body_util::Empty::<Bytes>::new())?;
416
417 let response = sender.send_request(req).await?;
418
419 let response_status = response.status();
420 let body = response.collect().await?.to_bytes();
421 let text = String::from_utf8_lossy(&body);
422 let response_text = text.to_string();
423
424 Ok((response_text, response_status.is_success()))
425}
426
427#[allow(dead_code)]
433pub(crate) async fn post_string(context: &Context, url: &str, body: String) -> Result<bool> {
434 let parsed_url = url
435 .parse::<hyper::Uri>()
436 .with_context(|| format!("Failed to parse URL {url:?}"))?;
437 let scheme = parsed_url.scheme_str().context("URL has no scheme")?;
438 if scheme != "https" {
439 bail!("POST requests to non-HTTPS URLs are not allowed");
440 }
441
442 let mut sender = get_http_sender(context, parsed_url.clone(), true).await?;
443 let authority = parsed_url
444 .authority()
445 .context("URL has no authority")?
446 .clone();
447
448 let request = hyper::Request::post(parsed_url)
449 .header(hyper::header::HOST, authority.as_str())
450 .body(body)?;
451 let response = sender.send_request(request).await?;
452
453 Ok(response.status().is_success())
454}
455
456pub(crate) async fn post_form<T: Serialize + ?Sized>(
460 context: &Context,
461 url: &str,
462 form: &T,
463) -> Result<Bytes> {
464 let parsed_url = url
465 .parse::<hyper::Uri>()
466 .with_context(|| format!("Failed to parse URL {url:?}"))?;
467 let scheme = parsed_url.scheme_str().context("URL has no scheme")?;
468 if scheme != "https" {
469 bail!("POST requests to non-HTTPS URLs are not allowed");
470 }
471
472 let encoded_body = serde_urlencoded::to_string(form).context("Failed to encode data")?;
473 let mut sender = get_http_sender(context, parsed_url.clone(), true).await?;
474 let authority = parsed_url
475 .authority()
476 .context("URL has no authority")?
477 .clone();
478 let request = hyper::Request::post(parsed_url)
479 .header(hyper::header::HOST, authority.as_str())
480 .header("content-type", "application/x-www-form-urlencoded")
481 .body(encoded_body)?;
482 let response = sender.send_request(request).await?;
483 let bytes = response.collect().await?.to_bytes();
484 Ok(bytes)
485}
486
#[cfg(test)]
mod tests {
    use super::*;
    use std::time::Duration;

    use crate::sql::housekeeping;
    use crate::test_utils::TestContext;
    use crate::tools::SystemTime;

    /// Exercises `http_cache_put` / `http_cache_get` under simulated time shifts.
    ///
    /// Covers the freshness windows, staleness, expiry through housekeeping, the
    /// 60-second postponement of the stale timestamp, and missing blob files.
    #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
    async fn test_http_cache() -> Result<()> {
        let t = &TestContext::new().await;

        // Nothing is cached yet.
        assert_eq!(http_cache_get(t, "https://webxdc.org/").await?, None);

        let html_response = Response {
            blob: b"<!DOCTYPE html> ...".to_vec(),
            mimetype: Some("text/html".to_string()),
            encoding: None,
        };

        let xdc_response = Response {
            blob: b"PK...".to_vec(),
            mimetype: Some("application/octet-stream".to_string()),
            encoding: None,
        };
        let xdc_editor_url = "https://apps.testrun.org/webxdc-editor-v3.2.0.xdc";
        let xdc_pixel_url = "https://apps.testrun.org/webxdc-pixel-v2.xdc";

        http_cache_put(t, "https://webxdc.org/", &html_response).await?;

        // Only the URL that was stored is a hit, and it is fresh right after storing.
        assert_eq!(http_cache_get(t, xdc_editor_url).await?, None);
        assert_eq!(http_cache_get(t, xdc_pixel_url).await?, None);
        assert_eq!(
            http_cache_get(t, "https://webxdc.org/").await?,
            Some((html_response.clone(), false))
        );

        http_cache_put(t, xdc_editor_url, &xdc_response).await?;
        http_cache_put(t, xdc_pixel_url, &xdc_response).await?;
        assert_eq!(
            http_cache_get(t, xdc_editor_url).await?,
            Some((xdc_response.clone(), false))
        );
        assert_eq!(
            http_cache_get(t, xdc_pixel_url).await?,
            Some((xdc_response.clone(), false))
        );

        assert_eq!(
            http_cache_get(t, "https://webxdc.org/").await?,
            Some((html_response.clone(), false))
        );

        // Past the 1-hour window: the HTML entry is now stale, while `.xdc` entries only
        // go stale when they expire, so the editor is still fresh. This read also pushes
        // the editor's expiry to "now + 35 days"; the pixel entry is not read here.
        SystemTime::shift(Duration::from_secs(3600 + 100));
        assert_eq!(
            http_cache_get(t, "https://webxdc.org/").await?,
            Some((html_response.clone(), true))
        );
        assert_eq!(
            http_cache_get(t, xdc_editor_url).await?,
            Some((xdc_response.clone(), false))
        );

        // Storing the response again resets its stale timestamp.
        http_cache_put(t, "https://webxdc.org/", &html_response).await?;
        assert_eq!(
            http_cache_get(t, "https://webxdc.org/").await?,
            Some((html_response.clone(), false))
        );

        // Move to 100 seconds before the editor's extended expiry. The pixel entry's
        // expiry was never extended past "start + 35 days", so it has expired by now.
        SystemTime::shift(Duration::from_secs(3600 * 24 * 35 - 100));

        // NOTE(review): `housekeeping` is expected to run `http_cache_cleanup` — confirm.
        housekeeping(t).await?;

        // The editor entry survives, but its stale timestamp (its original expiry time)
        // has passed, so it is reported as stale; the pixel entry is gone.
        assert_eq!(
            http_cache_get(t, xdc_editor_url).await?,
            Some((xdc_response.clone(), true))
        );
        assert_eq!(http_cache_get(t, xdc_pixel_url).await?, None);

        // The stale hit above postponed the stale timestamp by 60 seconds,
        // so the entry is reported as fresh again.
        assert_eq!(
            http_cache_get(t, xdc_editor_url).await?,
            Some((xdc_response.clone(), false))
        );
        // Advance in 6-second steps; once the 60-second postponement has elapsed the
        // entry must be stale again, well before the 100 steps run out.
        for i in (0..100).rev() {
            SystemTime::shift(Duration::from_secs(6));
            if let Some((_, true)) = http_cache_get(t, xdc_editor_url).await? {
                break;
            }
            assert!(i > 0);
        }

        // With the blob files removed, a cache lookup is a miss rather than an error.
        for entry in std::fs::read_dir(t.get_blobdir())? {
            let entry = entry.unwrap();
            let path = entry.path();
            std::fs::remove_file(path).expect("Failed to remove blob");
        }

        assert_eq!(
            http_cache_get(t, xdc_editor_url)
                .await
                .context("Failed to get no cache response")?,
            None
        );

        Ok(())
    }
}