1use anyhow::{Context as _, Result, anyhow, bail};
4use bytes::Bytes;
5use http_body_util::BodyExt;
6use hyper_util::rt::TokioIo;
7use mime::Mime;
8use serde::Serialize;
9use tokio::fs;
10
11use crate::blob::BlobObject;
12use crate::context::Context;
13use crate::log::warn;
14use crate::net::proxy::ProxyConfig;
15use crate::net::session::SessionStream;
16use crate::net::tls::wrap_rustls;
17use crate::tools::time;
18
/// Value of the `User-Agent` header for the requests that carry one.
///
/// In this file it is only attached by `fetch_url`, and only to requests
/// for `tile.openstreetmap.org` and `vector.openstreetmap.org`;
/// every other request is sent without a `User-Agent` header.
const USER_AGENT: &str = "chatmail/2 (+https://github.com/chatmail/core/)";
22
/// Result of an HTTP(S) GET request, either fetched from the network or loaded from the cache.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct Response {
    /// Raw response body.
    pub blob: Vec<u8>,

    /// MIME type taken from the `Content-Type` header
    /// (type and subtype only, without parameters), if the header was present and parseable.
    pub mimetype: Option<String>,

    /// Value of the `charset` parameter of the `Content-Type` header, if any.
    pub encoding: Option<String>,
}
35
36pub async fn read_url(context: &Context, url: &str) -> Result<String> {
38 let response = read_url_blob(context, url).await?;
39 let text = String::from_utf8_lossy(&response.blob);
40 Ok(text.to_string())
41}
42
43async fn get_http_sender<B>(
44 context: &Context,
45 parsed_url: hyper::Uri,
46) -> Result<hyper::client::conn::http1::SendRequest<B>>
47where
48 B: hyper::body::Body + 'static + Send,
49 B::Data: Send,
50 B::Error: Into<Box<dyn std::error::Error + Send + Sync>>,
51{
52 let scheme = parsed_url.scheme_str().context("URL has no scheme")?;
53 let host = parsed_url.host().context("URL has no host")?;
54 let proxy_config_opt = ProxyConfig::load(context).await?;
55
56 let stream: Box<dyn SessionStream> = match scheme {
57 "http" => {
58 let port = parsed_url.port_u16().unwrap_or(80);
59
60 let load_cache = false;
65 if let Some(proxy_config) = proxy_config_opt {
66 let proxy_stream = proxy_config
67 .connect(context, host, port, load_cache)
68 .await?;
69 Box::new(proxy_stream)
70 } else {
71 let tcp_stream = crate::net::connect_tcp(context, host, port, load_cache).await?;
72 Box::new(tcp_stream)
73 }
74 }
75 "https" => {
76 let port = parsed_url.port_u16().unwrap_or(443);
77 let (use_sni, load_cache) = (true, true);
78
79 if let Some(proxy_config) = proxy_config_opt {
80 let proxy_stream = proxy_config
81 .connect(context, host, port, load_cache)
82 .await?;
83 let tls_stream = wrap_rustls(
84 host,
85 port,
86 use_sni,
87 "",
88 proxy_stream,
89 &context.tls_session_store,
90 )
91 .await?;
92 Box::new(tls_stream)
93 } else {
94 let tcp_stream = crate::net::connect_tcp(context, host, port, load_cache).await?;
95 let tls_stream = wrap_rustls(
96 host,
97 port,
98 use_sni,
99 "",
100 tcp_stream,
101 &context.tls_session_store,
102 )
103 .await?;
104 Box::new(tls_stream)
105 }
106 }
107 _ => bail!("Unknown URL scheme"),
108 };
109
110 let io = TokioIo::new(stream);
111 let (sender, conn) = hyper::client::conn::http1::handshake(io).await?;
112 tokio::task::spawn(conn);
113
114 Ok(sender)
115}
116
117fn http_url_cache_timestamps(url: &str, mimetype: Option<&str>) -> (i64, i64) {
119 let now = time();
120
121 let expires = now.saturating_add(3600 * 24 * 35);
122 let stale = if url.ends_with(".xdc") {
123 expires
125 } else if url.starts_with("https://tile.openstreetmap.org/")
126 || url.starts_with("https://vector.openstreetmap.org/")
127 {
128 now.saturating_add(3600 * 24 * 7)
132 } else if mimetype.is_some_and(|s| s.starts_with("image/")) {
133 now.saturating_add(3600 * 24)
139 } else {
140 now.saturating_add(3600)
144 };
145 (expires, stale)
146}
147
148async fn http_cache_put(context: &Context, url: &str, response: &Response) -> Result<()> {
150 let blob =
151 BlobObject::create_and_deduplicate_from_bytes(context, response.blob.as_slice(), "")?;
152
153 let (expires, stale) = http_url_cache_timestamps(url, response.mimetype.as_deref());
154 context
155 .sql
156 .insert(
157 "INSERT OR REPLACE INTO http_cache (url, expires, stale, blobname, mimetype, encoding)
158 VALUES (?, ?, ?, ?, ?, ?)",
159 (
160 url,
161 expires,
162 stale,
163 blob.as_name(),
164 response.mimetype.as_deref().unwrap_or_default(),
165 response.encoding.as_deref().unwrap_or_default(),
166 ),
167 )
168 .await?;
169
170 Ok(())
171}
172
173#[expect(clippy::arithmetic_side_effects)]
177async fn http_cache_get(context: &Context, url: &str) -> Result<Option<(Response, bool)>> {
178 let now = time();
179 let Some((blob_name, mimetype, encoding, stale_timestamp)) = context
180 .sql
181 .query_row_optional(
182 "SELECT blobname, mimetype, encoding, stale
183 FROM http_cache WHERE url=? AND expires > ?",
184 (url, now),
185 |row| {
186 let blob_name: String = row.get(0)?;
187 let mimetype: Option<String> = Some(row.get(1)?).filter(|s: &String| !s.is_empty());
188 let encoding: Option<String> = Some(row.get(2)?).filter(|s: &String| !s.is_empty());
189 let stale_timestamp: i64 = row.get(3)?;
190 Ok((blob_name, mimetype, encoding, stale_timestamp))
191 },
192 )
193 .await?
194 else {
195 return Ok(None);
196 };
197 let is_stale = now > stale_timestamp;
198
199 let blob_object = BlobObject::from_name(context, &blob_name)?;
200 let blob_abs_path = blob_object.to_abs_path();
201 let blob = match fs::read(blob_abs_path)
202 .await
203 .with_context(|| format!("Failed to read blob for {url:?} cache entry."))
204 {
205 Ok(blob) => blob,
206 Err(err) => {
207 warn!(context, "{err:?}.");
210 return Ok(None);
211 }
212 };
213
214 let (expires, _stale) = http_url_cache_timestamps(url, mimetype.as_deref());
215 let response = Response {
216 blob,
217 mimetype,
218 encoding,
219 };
220
221 let stale_timestamp = if is_stale { now + 60 } else { stale_timestamp };
229 context
230 .sql
231 .execute(
232 "UPDATE http_cache SET expires=?, stale=? WHERE url=?",
233 (expires, stale_timestamp, url),
234 )
235 .await?;
236
237 Ok(Some((response, is_stale)))
238}
239
240pub(crate) async fn http_cache_cleanup(context: &Context) -> Result<()> {
242 context
246 .sql
247 .execute(
248 "DELETE FROM http_cache
249 WHERE ?1 > expires OR expires > ?1 + 31536000",
250 (time(),),
251 )
252 .await?;
253 Ok(())
254}
255
256async fn fetch_url(context: &Context, original_url: &str) -> Result<Response> {
260 let mut url = original_url.to_string();
261
262 for _i in 0..10 {
264 let parsed_url = url
265 .parse::<hyper::Uri>()
266 .with_context(|| format!("Failed to parse URL {url:?}"))?;
267
268 let mut sender = get_http_sender(context, parsed_url.clone()).await?;
269 let authority = parsed_url
270 .authority()
271 .context("URL has no authority")?
272 .clone();
273
274 let req = hyper::Request::builder().uri(parsed_url);
275
276 let req =
283 if authority == "tile.openstreetmap.org" || authority == "vector.openstreetmap.org" {
284 req.header("User-Agent", USER_AGENT)
285 } else {
286 req
287 };
288
289 let req = req
290 .header(hyper::header::HOST, authority.as_str())
291 .body(http_body_util::Empty::<Bytes>::new())?;
292 let response = sender.send_request(req).await?;
293
294 if response.status().is_redirection() {
295 let header = response
296 .headers()
297 .get_all("location")
298 .iter()
299 .next_back()
300 .ok_or_else(|| anyhow!("Redirection doesn't have a target location"))?
301 .to_str()?;
302 info!(context, "Following redirect to {}", header);
303 url = header.to_string();
304 continue;
305 }
306
307 if !response.status().is_success() {
308 return Err(anyhow!(
309 "The server returned a non-successful response code: {}{}",
310 response.status().as_u16(),
311 response
312 .status()
313 .canonical_reason()
314 .map(|s| format!(" {s}"))
315 .unwrap_or("".to_string())
316 ));
317 }
318
319 let content_type = response
320 .headers()
321 .get("content-type")
322 .and_then(|value| value.to_str().ok())
323 .and_then(|value| value.parse::<Mime>().ok());
324 let mimetype = content_type
325 .as_ref()
326 .map(|mime| mime.essence_str().to_string());
327 let encoding = content_type.as_ref().and_then(|mime| {
328 mime.get_param(mime::CHARSET)
329 .map(|charset| charset.as_str().to_string())
330 });
331 let body = response.collect().await?.to_bytes();
332 let blob: Vec<u8> = body.to_vec();
333 let response = Response {
334 blob,
335 mimetype,
336 encoding,
337 };
338 info!(context, "Inserting {original_url:?} into cache.");
339 http_cache_put(context, &url, &response).await?;
340 return Ok(response);
341 }
342
343 Err(anyhow!("Followed 10 redirections"))
344}
345
346pub async fn read_url_blob(context: &Context, url: &str) -> Result<Response> {
348 if let Some((response, is_stale)) = http_cache_get(context, url).await? {
349 info!(context, "Returning {url:?} from cache.");
350 if is_stale {
351 let context = context.clone();
352 let url = url.to_string();
353 tokio::spawn(async move {
354 info!(context, "Fetching stale {url:?} in background.");
356 if let Err(err) = fetch_url(&context, &url).await {
357 warn!(context, "Failed to revalidate {url:?}: {err:#}.");
358 }
359 });
360 }
361 return Ok(response);
362 }
363
364 info!(context, "Not found {url:?} in cache, fetching.");
365 let response = fetch_url(context, url).await?;
366 Ok(response)
367}
368
369pub(crate) async fn post_empty(context: &Context, url: &str) -> Result<(String, bool)> {
375 let parsed_url = url
376 .parse::<hyper::Uri>()
377 .with_context(|| format!("Failed to parse URL {url:?}"))?;
378 let scheme = parsed_url.scheme_str().context("URL has no scheme")?;
379 if scheme != "https" {
380 bail!("POST requests to non-HTTPS URLs are not allowed");
381 }
382
383 let mut sender = get_http_sender(context, parsed_url.clone()).await?;
384 let authority = parsed_url
385 .authority()
386 .context("URL has no authority")?
387 .clone();
388 let req = hyper::Request::post(parsed_url)
389 .header(hyper::header::HOST, authority.as_str())
390 .body(http_body_util::Empty::<Bytes>::new())?;
391
392 let response = sender.send_request(req).await?;
393
394 let response_status = response.status();
395 let body = response.collect().await?.to_bytes();
396 let text = String::from_utf8_lossy(&body);
397 let response_text = text.to_string();
398
399 Ok((response_text, response_status.is_success()))
400}
401
402#[allow(dead_code)]
408pub(crate) async fn post_string(context: &Context, url: &str, body: String) -> Result<bool> {
409 let parsed_url = url
410 .parse::<hyper::Uri>()
411 .with_context(|| format!("Failed to parse URL {url:?}"))?;
412 let scheme = parsed_url.scheme_str().context("URL has no scheme")?;
413 if scheme != "https" {
414 bail!("POST requests to non-HTTPS URLs are not allowed");
415 }
416
417 let mut sender = get_http_sender(context, parsed_url.clone()).await?;
418 let authority = parsed_url
419 .authority()
420 .context("URL has no authority")?
421 .clone();
422
423 let request = hyper::Request::post(parsed_url)
424 .header(hyper::header::HOST, authority.as_str())
425 .body(body)?;
426 let response = sender.send_request(request).await?;
427
428 Ok(response.status().is_success())
429}
430
431pub(crate) async fn post_form<T: Serialize + ?Sized>(
435 context: &Context,
436 url: &str,
437 form: &T,
438) -> Result<Bytes> {
439 let parsed_url = url
440 .parse::<hyper::Uri>()
441 .with_context(|| format!("Failed to parse URL {url:?}"))?;
442 let scheme = parsed_url.scheme_str().context("URL has no scheme")?;
443 if scheme != "https" {
444 bail!("POST requests to non-HTTPS URLs are not allowed");
445 }
446
447 let encoded_body = serde_urlencoded::to_string(form).context("Failed to encode data")?;
448 let mut sender = get_http_sender(context, parsed_url.clone()).await?;
449 let authority = parsed_url
450 .authority()
451 .context("URL has no authority")?
452 .clone();
453 let request = hyper::Request::post(parsed_url)
454 .header(hyper::header::HOST, authority.as_str())
455 .header("content-type", "application/x-www-form-urlencoded")
456 .body(encoded_body)?;
457 let response = sender.send_request(request).await?;
458 let bytes = response.collect().await?.to_bytes();
459 Ok(bytes)
460}
461
#[cfg(test)]
mod tests {
    use super::*;
    use std::time::Duration;

    use crate::sql::housekeeping;
    use crate::test_utils::TestContext;
    use crate::tools::SystemTime;

    /// Exercises the cache table directly with `http_cache_put` and `http_cache_get`,
    /// moving the clock with `SystemTime::shift`. No network requests are made.
    #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
    async fn test_http_cache() -> Result<()> {
        let t = &TestContext::new().await;

        // The cache starts out empty.
        assert_eq!(http_cache_get(t, "https://webxdc.org/").await?, None);

        let html_response = Response {
            blob: b"<!DOCTYPE html> ...".to_vec(),
            mimetype: Some("text/html".to_string()),
            encoding: None,
        };

        let xdc_response = Response {
            blob: b"PK...".to_vec(),
            mimetype: Some("application/octet-stream".to_string()),
            encoding: None,
        };
        let xdc_editor_url = "https://apps.testrun.org/webxdc-editor-v3.2.0.xdc";
        let xdc_pixel_url = "https://apps.testrun.org/webxdc-pixel-v2.xdc";

        http_cache_put(t, "https://webxdc.org/", &html_response).await?;

        // Only the URL that was put is in the cache, and it is fresh.
        assert_eq!(http_cache_get(t, xdc_editor_url).await?, None);
        assert_eq!(http_cache_get(t, xdc_pixel_url).await?, None);
        assert_eq!(
            http_cache_get(t, "https://webxdc.org/").await?,
            Some((html_response.clone(), false))
        );

        // Two URLs with identical content are cached as separate entries.
        http_cache_put(t, xdc_editor_url, &xdc_response).await?;
        http_cache_put(t, xdc_pixel_url, &xdc_response).await?;
        assert_eq!(
            http_cache_get(t, xdc_editor_url).await?,
            Some((xdc_response.clone(), false))
        );
        assert_eq!(
            http_cache_get(t, xdc_pixel_url).await?,
            Some((xdc_response.clone(), false))
        );

        assert_eq!(
            http_cache_get(t, "https://webxdc.org/").await?,
            Some((html_response.clone(), false))
        );

        // Slightly more than one hour later the HTML page is stale, but the .xdc is not.
        SystemTime::shift(Duration::from_secs(3600 + 100));
        assert_eq!(
            http_cache_get(t, "https://webxdc.org/").await?,
            Some((html_response.clone(), true))
        );
        assert_eq!(
            http_cache_get(t, xdc_editor_url).await?,
            Some((xdc_response.clone(), false))
        );

        // Putting the response again makes the entry fresh.
        http_cache_put(t, "https://webxdc.org/", &html_response).await?;
        assert_eq!(
            http_cache_get(t, "https://webxdc.org/").await?,
            Some((html_response.clone(), false))
        );

        // Now the pixel .xdc was last requested 35 days and 1 hour ago, so it has expired.
        // The editor .xdc was last requested 100 seconds less than 35 days ago, so it is still there,
        // but it was put 35 days and 1 hour ago and has become stale.
        SystemTime::shift(Duration::from_secs(3600 * 24 * 35 - 100));

        // Housekeeping must not remove the editor entry or its blob.
        // NOTE(review): presumably `housekeeping` runs `http_cache_cleanup`; it is not visible here.
        housekeeping(t).await?;

        assert_eq!(
            http_cache_get(t, xdc_editor_url).await?,
            Some((xdc_response.clone(), true))
        );
        assert_eq!(http_cache_get(t, xdc_pixel_url).await?, None);

        // The stale hit above postponed the stale timestamp by 60 seconds,
        // so an immediate second request is not reported as stale.
        assert_eq!(
            http_cache_get(t, xdc_editor_url).await?,
            Some((xdc_response.clone(), false))
        );
        // Without a new put the entry must be reported stale again after a while,
        // even though it is requested every 6 seconds.
        for i in (0..100).rev() {
            SystemTime::shift(Duration::from_secs(6));
            if let Some((_, true)) = http_cache_get(t, xdc_editor_url).await? {
                break;
            }
            // Fails if the entry never became stale within 100 iterations.
            assert!(i > 0);
        }

        // Remove all files from the blob directory.
        for entry in std::fs::read_dir(t.get_blobdir())? {
            let entry = entry.unwrap();
            let path = entry.path();
            std::fs::remove_file(path).expect("Failed to remove blob");
        }

        // A cache entry whose blob is missing is a cache miss, not an error.
        assert_eq!(
            http_cache_get(t, xdc_editor_url)
                .await
                .context("Failed to get no cache response")?,
            None
        );

        Ok(())
    }
}