1use anyhow::{Context as _, Result, anyhow, bail};
4use bytes::Bytes;
5use http_body_util::BodyExt;
6use hyper_util::rt::TokioIo;
7use mime::Mime;
8use serde::Serialize;
9use tokio::fs;
10
11use crate::blob::BlobObject;
12use crate::context::Context;
13use crate::log::warn;
14use crate::net::proxy::ProxyConfig;
15use crate::net::session::SessionStream;
16use crate::net::tls::wrap_rustls;
17use crate::tools::time;
18
/// `User-Agent` header value; `fetch_url` sends it only to the OpenStreetMap
/// tile servers (`tile.openstreetmap.org` and `vector.openstreetmap.org`).
const USER_AGENT: &str = "chatmail/2 (+https://github.com/chatmail/core/)";

/// An HTTP response body together with the metadata taken from its `Content-Type` header.
///
/// This is both what network fetches return and what the HTTP cache stores.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct Response {
    /// Raw response body bytes.
    pub blob: Vec<u8>,

    /// MIME type essence (e.g. `text/html`, without parameters), if known.
    pub mimetype: Option<String>,

    /// Value of the `charset` parameter of the `Content-Type` header, if present.
    pub encoding: Option<String>,
}
35
36pub async fn read_url(context: &Context, url: &str) -> Result<String> {
38 let response = read_url_blob(context, url).await?;
39 let text = String::from_utf8_lossy(&response.blob);
40 Ok(text.to_string())
41}
42
43async fn get_http_sender<B>(
44 context: &Context,
45 parsed_url: hyper::Uri,
46) -> Result<hyper::client::conn::http1::SendRequest<B>>
47where
48 B: hyper::body::Body + 'static + Send,
49 B::Data: Send,
50 B::Error: Into<Box<dyn std::error::Error + Send + Sync>>,
51{
52 let scheme = parsed_url.scheme_str().context("URL has no scheme")?;
53 let host = parsed_url.host().context("URL has no host")?;
54 let proxy_config_opt = ProxyConfig::load(context).await?;
55
56 let stream: Box<dyn SessionStream> = match scheme {
57 "http" => {
58 let port = parsed_url.port_u16().unwrap_or(80);
59
60 let load_cache = false;
65 if let Some(proxy_config) = proxy_config_opt {
66 let proxy_stream = proxy_config
67 .connect(context, host, port, load_cache)
68 .await?;
69 Box::new(proxy_stream)
70 } else {
71 let tcp_stream = crate::net::connect_tcp(context, host, port, load_cache).await?;
72 Box::new(tcp_stream)
73 }
74 }
75 "https" => {
76 let port = parsed_url.port_u16().unwrap_or(443);
77 let (use_sni, load_cache) = (true, true);
78
79 if let Some(proxy_config) = proxy_config_opt {
80 let proxy_stream = proxy_config
81 .connect(context, host, port, load_cache)
82 .await?;
83 let tls_stream = wrap_rustls(
84 host,
85 port,
86 use_sni,
87 "",
88 proxy_stream,
89 &context.tls_session_store,
90 )
91 .await?;
92 Box::new(tls_stream)
93 } else {
94 let tcp_stream = crate::net::connect_tcp(context, host, port, load_cache).await?;
95 let tls_stream = wrap_rustls(
96 host,
97 port,
98 use_sni,
99 "",
100 tcp_stream,
101 &context.tls_session_store,
102 )
103 .await?;
104 Box::new(tls_stream)
105 }
106 }
107 _ => bail!("Unknown URL scheme"),
108 };
109
110 let io = TokioIo::new(stream);
111 let (sender, conn) = hyper::client::conn::http1::handshake(io).await?;
112 tokio::task::spawn(conn);
113
114 Ok(sender)
115}
116
117fn http_url_cache_timestamps(url: &str, mimetype: Option<&str>) -> (i64, i64) {
119 let now = time();
120
121 let expires = now + 3600 * 24 * 35;
122 let stale = if url.ends_with(".xdc") {
123 expires
125 } else if url.starts_with("https://tile.openstreetmap.org/")
126 || url.starts_with("https://vector.openstreetmap.org/")
127 {
128 now + 3600 * 24 * 7
132 } else if mimetype.is_some_and(|s| s.starts_with("image/")) {
133 now + 3600 * 24
139 } else {
140 now + 3600
144 };
145 (expires, stale)
146}
147
148async fn http_cache_put(context: &Context, url: &str, response: &Response) -> Result<()> {
150 let blob =
151 BlobObject::create_and_deduplicate_from_bytes(context, response.blob.as_slice(), "")?;
152
153 let (expires, stale) = http_url_cache_timestamps(url, response.mimetype.as_deref());
154 context
155 .sql
156 .insert(
157 "INSERT OR REPLACE INTO http_cache (url, expires, stale, blobname, mimetype, encoding)
158 VALUES (?, ?, ?, ?, ?, ?)",
159 (
160 url,
161 expires,
162 stale,
163 blob.as_name(),
164 response.mimetype.as_deref().unwrap_or_default(),
165 response.encoding.as_deref().unwrap_or_default(),
166 ),
167 )
168 .await?;
169
170 Ok(())
171}
172
173async fn http_cache_get(context: &Context, url: &str) -> Result<Option<(Response, bool)>> {
177 let now = time();
178 let Some((blob_name, mimetype, encoding, stale_timestamp)) = context
179 .sql
180 .query_row_optional(
181 "SELECT blobname, mimetype, encoding, stale
182 FROM http_cache WHERE url=? AND expires > ?",
183 (url, now),
184 |row| {
185 let blob_name: String = row.get(0)?;
186 let mimetype: Option<String> = Some(row.get(1)?).filter(|s: &String| !s.is_empty());
187 let encoding: Option<String> = Some(row.get(2)?).filter(|s: &String| !s.is_empty());
188 let stale_timestamp: i64 = row.get(3)?;
189 Ok((blob_name, mimetype, encoding, stale_timestamp))
190 },
191 )
192 .await?
193 else {
194 return Ok(None);
195 };
196 let is_stale = now > stale_timestamp;
197
198 let blob_object = BlobObject::from_name(context, &blob_name)?;
199 let blob_abs_path = blob_object.to_abs_path();
200 let blob = match fs::read(blob_abs_path)
201 .await
202 .with_context(|| format!("Failed to read blob for {url:?} cache entry."))
203 {
204 Ok(blob) => blob,
205 Err(err) => {
206 warn!(context, "{err:?}.");
209 return Ok(None);
210 }
211 };
212
213 let (expires, _stale) = http_url_cache_timestamps(url, mimetype.as_deref());
214 let response = Response {
215 blob,
216 mimetype,
217 encoding,
218 };
219
220 let stale_timestamp = if is_stale { now + 60 } else { stale_timestamp };
228 context
229 .sql
230 .execute(
231 "UPDATE http_cache SET expires=?, stale=? WHERE url=?",
232 (expires, stale_timestamp, url),
233 )
234 .await?;
235
236 Ok(Some((response, is_stale)))
237}
238
239pub(crate) async fn http_cache_cleanup(context: &Context) -> Result<()> {
241 context
245 .sql
246 .execute(
247 "DELETE FROM http_cache
248 WHERE ?1 > expires OR expires > ?1 + 31536000",
249 (time(),),
250 )
251 .await?;
252 Ok(())
253}
254
255async fn fetch_url(context: &Context, original_url: &str) -> Result<Response> {
259 let mut url = original_url.to_string();
260
261 for _i in 0..10 {
263 let parsed_url = url
264 .parse::<hyper::Uri>()
265 .with_context(|| format!("Failed to parse URL {url:?}"))?;
266
267 let mut sender = get_http_sender(context, parsed_url.clone()).await?;
268 let authority = parsed_url
269 .authority()
270 .context("URL has no authority")?
271 .clone();
272
273 let req = hyper::Request::builder().uri(parsed_url);
274
275 let req =
282 if authority == "tile.openstreetmap.org" || authority == "vector.openstreetmap.org" {
283 req.header("User-Agent", USER_AGENT)
284 } else {
285 req
286 };
287
288 let req = req
289 .header(hyper::header::HOST, authority.as_str())
290 .body(http_body_util::Empty::<Bytes>::new())?;
291 let response = sender.send_request(req).await?;
292
293 if response.status().is_redirection() {
294 let header = response
295 .headers()
296 .get_all("location")
297 .iter()
298 .next_back()
299 .ok_or_else(|| anyhow!("Redirection doesn't have a target location"))?
300 .to_str()?;
301 info!(context, "Following redirect to {}", header);
302 url = header.to_string();
303 continue;
304 }
305
306 if !response.status().is_success() {
307 return Err(anyhow!(
308 "The server returned a non-successful response code: {}{}",
309 response.status().as_u16(),
310 response
311 .status()
312 .canonical_reason()
313 .map(|s| format!(" {s}"))
314 .unwrap_or("".to_string())
315 ));
316 }
317
318 let content_type = response
319 .headers()
320 .get("content-type")
321 .and_then(|value| value.to_str().ok())
322 .and_then(|value| value.parse::<Mime>().ok());
323 let mimetype = content_type
324 .as_ref()
325 .map(|mime| mime.essence_str().to_string());
326 let encoding = content_type.as_ref().and_then(|mime| {
327 mime.get_param(mime::CHARSET)
328 .map(|charset| charset.as_str().to_string())
329 });
330 let body = response.collect().await?.to_bytes();
331 let blob: Vec<u8> = body.to_vec();
332 let response = Response {
333 blob,
334 mimetype,
335 encoding,
336 };
337 info!(context, "Inserting {original_url:?} into cache.");
338 http_cache_put(context, &url, &response).await?;
339 return Ok(response);
340 }
341
342 Err(anyhow!("Followed 10 redirections"))
343}
344
345pub async fn read_url_blob(context: &Context, url: &str) -> Result<Response> {
347 if let Some((response, is_stale)) = http_cache_get(context, url).await? {
348 info!(context, "Returning {url:?} from cache.");
349 if is_stale {
350 let context = context.clone();
351 let url = url.to_string();
352 tokio::spawn(async move {
353 info!(context, "Fetching stale {url:?} in background.");
355 if let Err(err) = fetch_url(&context, &url).await {
356 warn!(context, "Failed to revalidate {url:?}: {err:#}.");
357 }
358 });
359 }
360 return Ok(response);
361 }
362
363 info!(context, "Not found {url:?} in cache, fetching.");
364 let response = fetch_url(context, url).await?;
365 Ok(response)
366}
367
368pub(crate) async fn post_empty(context: &Context, url: &str) -> Result<(String, bool)> {
374 let parsed_url = url
375 .parse::<hyper::Uri>()
376 .with_context(|| format!("Failed to parse URL {url:?}"))?;
377 let scheme = parsed_url.scheme_str().context("URL has no scheme")?;
378 if scheme != "https" {
379 bail!("POST requests to non-HTTPS URLs are not allowed");
380 }
381
382 let mut sender = get_http_sender(context, parsed_url.clone()).await?;
383 let authority = parsed_url
384 .authority()
385 .context("URL has no authority")?
386 .clone();
387 let req = hyper::Request::post(parsed_url)
388 .header(hyper::header::HOST, authority.as_str())
389 .body(http_body_util::Empty::<Bytes>::new())?;
390
391 let response = sender.send_request(req).await?;
392
393 let response_status = response.status();
394 let body = response.collect().await?.to_bytes();
395 let text = String::from_utf8_lossy(&body);
396 let response_text = text.to_string();
397
398 Ok((response_text, response_status.is_success()))
399}
400
401#[allow(dead_code)]
407pub(crate) async fn post_string(context: &Context, url: &str, body: String) -> Result<bool> {
408 let parsed_url = url
409 .parse::<hyper::Uri>()
410 .with_context(|| format!("Failed to parse URL {url:?}"))?;
411 let scheme = parsed_url.scheme_str().context("URL has no scheme")?;
412 if scheme != "https" {
413 bail!("POST requests to non-HTTPS URLs are not allowed");
414 }
415
416 let mut sender = get_http_sender(context, parsed_url.clone()).await?;
417 let authority = parsed_url
418 .authority()
419 .context("URL has no authority")?
420 .clone();
421
422 let request = hyper::Request::post(parsed_url)
423 .header(hyper::header::HOST, authority.as_str())
424 .body(body)?;
425 let response = sender.send_request(request).await?;
426
427 Ok(response.status().is_success())
428}
429
430pub(crate) async fn post_form<T: Serialize + ?Sized>(
434 context: &Context,
435 url: &str,
436 form: &T,
437) -> Result<Bytes> {
438 let parsed_url = url
439 .parse::<hyper::Uri>()
440 .with_context(|| format!("Failed to parse URL {url:?}"))?;
441 let scheme = parsed_url.scheme_str().context("URL has no scheme")?;
442 if scheme != "https" {
443 bail!("POST requests to non-HTTPS URLs are not allowed");
444 }
445
446 let encoded_body = serde_urlencoded::to_string(form).context("Failed to encode data")?;
447 let mut sender = get_http_sender(context, parsed_url.clone()).await?;
448 let authority = parsed_url
449 .authority()
450 .context("URL has no authority")?
451 .clone();
452 let request = hyper::Request::post(parsed_url)
453 .header(hyper::header::HOST, authority.as_str())
454 .header("content-type", "application/x-www-form-urlencoded")
455 .body(encoded_body)?;
456 let response = sender.send_request(request).await?;
457 let bytes = response.collect().await?.to_bytes();
458 Ok(bytes)
459}
460
#[cfg(test)]
mod tests {
    use super::*;
    use std::time::Duration;

    use crate::sql::housekeeping;
    use crate::test_utils::TestContext;
    use crate::tools::SystemTime;

    /// Exercises `http_cache_put`/`http_cache_get` and housekeeping under a simulated clock
    /// (`SystemTime::shift`). It checks cache hits, staleness, expiry, and missing blob files.
    #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
    async fn test_http_cache() -> Result<()> {
        let t = &TestContext::new().await;

        // Empty cache: lookups miss.
        assert_eq!(http_cache_get(t, "https://webxdc.org/").await?, None);

        let html_response = Response {
            blob: b"<!DOCTYPE html> ...".to_vec(),
            mimetype: Some("text/html".to_string()),
            encoding: None,
        };

        let xdc_response = Response {
            blob: b"PK...".to_vec(),
            mimetype: Some("application/octet-stream".to_string()),
            encoding: None,
        };
        let xdc_editor_url = "https://apps.testrun.org/webxdc-editor-v3.2.0.xdc";
        let xdc_pixel_url = "https://apps.testrun.org/webxdc-pixel-v2.xdc";

        http_cache_put(t, "https://webxdc.org/", &html_response).await?;

        // Only the stored URL hits, and a freshly stored entry is not stale.
        assert_eq!(http_cache_get(t, xdc_editor_url).await?, None);
        assert_eq!(http_cache_get(t, xdc_pixel_url).await?, None);
        assert_eq!(
            http_cache_get(t, "https://webxdc.org/").await?,
            Some((html_response.clone(), false))
        );

        http_cache_put(t, xdc_editor_url, &xdc_response).await?;
        http_cache_put(t, xdc_pixel_url, &xdc_response).await?;
        assert_eq!(
            http_cache_get(t, xdc_editor_url).await?,
            Some((xdc_response.clone(), false))
        );
        assert_eq!(
            http_cache_get(t, xdc_pixel_url).await?,
            Some((xdc_response.clone(), false))
        );

        assert_eq!(
            http_cache_get(t, "https://webxdc.org/").await?,
            Some((html_response.clone(), false))
        );

        // Just over an hour later, the HTML entry (1 h stale window) is stale, while the
        // .xdc entry (stale only at its original 35-day expiry) is still fresh.
        // This lookup also extends the editor entry's expiry; the pixel entry is not touched.
        SystemTime::shift(Duration::from_secs(3600 + 100));
        assert_eq!(
            http_cache_get(t, "https://webxdc.org/").await?,
            Some((html_response.clone(), true))
        );
        assert_eq!(
            http_cache_get(t, xdc_editor_url).await?,
            Some((xdc_response.clone(), false))
        );

        // Storing again makes the HTML entry fresh.
        http_cache_put(t, "https://webxdc.org/", &html_response).await?;
        assert_eq!(
            http_cache_get(t, "https://webxdc.org/").await?,
            Some((html_response.clone(), false))
        );

        // Move to 35 days + 1 h after the initial puts.
        SystemTime::shift(Duration::from_secs(3600 * 24 * 35 - 100));

        housekeeping(t).await?;

        // The editor entry's expiry was extended by the lookup above, so it is still cached,
        // but its stale timestamp (the original 35-day mark) has passed.
        // The pixel entry was last looked up before the first shift and has expired.
        assert_eq!(
            http_cache_get(t, xdc_editor_url).await?,
            Some((xdc_response.clone(), true))
        );
        assert_eq!(http_cache_get(t, xdc_pixel_url).await?, None);

        // The stale lookup moved `stale` 60 s ahead, so an immediate lookup reports fresh.
        assert_eq!(
            http_cache_get(t, xdc_editor_url).await?,
            Some((xdc_response.clone(), false))
        );
        // Advancing in 6 s steps, the entry must become stale again before the
        // 100 steps (600 s) run out.
        for i in (0..100).rev() {
            SystemTime::shift(Duration::from_secs(6));
            if let Some((_, true)) = http_cache_get(t, xdc_editor_url).await? {
                break;
            }
            assert!(i > 0);
        }

        // Delete every blob file: the lookup must report a miss, not an error.
        for entry in std::fs::read_dir(t.get_blobdir())? {
            let entry = entry.unwrap();
            let path = entry.path();
            std::fs::remove_file(path).expect("Failed to remove blob");
        }

        assert_eq!(
            http_cache_get(t, xdc_editor_url)
                .await
                .context("Failed to get no cache response")?,
            None
        );

        Ok(())
    }
}