1use anyhow::{anyhow, bail, Context as _, Result};
4use bytes::Bytes;
5use http_body_util::BodyExt;
6use hyper_util::rt::TokioIo;
7use mime::Mime;
8use serde::Serialize;
9use tokio::fs;
10
11use crate::blob::BlobObject;
12use crate::context::Context;
13use crate::log::{info, warn};
14use crate::net::proxy::ProxyConfig;
15use crate::net::session::SessionStream;
16use crate::net::tls::wrap_rustls;
17use crate::tools::time;
18
/// HTTP response as returned by [`read_url_blob`], either fetched or served from the cache.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct Response {
    /// Response body bytes.
    pub blob: Vec<u8>,

    /// MIME type essence from the `Content-Type` header (e.g. `text/html`), without parameters.
    /// `None` if the header is missing or could not be parsed.
    pub mimetype: Option<String>,

    /// Value of the `charset` parameter of the `Content-Type` header, if present.
    pub encoding: Option<String>,
}
31
32pub async fn read_url(context: &Context, url: &str) -> Result<String> {
34 let response = read_url_blob(context, url).await?;
35 let text = String::from_utf8_lossy(&response.blob);
36 Ok(text.to_string())
37}
38
39async fn get_http_sender<B>(
40 context: &Context,
41 parsed_url: hyper::Uri,
42) -> Result<hyper::client::conn::http1::SendRequest<B>>
43where
44 B: hyper::body::Body + 'static + Send,
45 B::Data: Send,
46 B::Error: Into<Box<dyn std::error::Error + Send + Sync>>,
47{
48 let scheme = parsed_url.scheme_str().context("URL has no scheme")?;
49 let host = parsed_url.host().context("URL has no host")?;
50 let proxy_config_opt = ProxyConfig::load(context).await?;
51
52 let stream: Box<dyn SessionStream> = match scheme {
53 "http" => {
54 let port = parsed_url.port_u16().unwrap_or(80);
55
56 let load_cache = false;
61 if let Some(proxy_config) = proxy_config_opt {
62 let proxy_stream = proxy_config
63 .connect(context, host, port, load_cache)
64 .await?;
65 Box::new(proxy_stream)
66 } else {
67 let tcp_stream = crate::net::connect_tcp(context, host, port, load_cache).await?;
68 Box::new(tcp_stream)
69 }
70 }
71 "https" => {
72 let port = parsed_url.port_u16().unwrap_or(443);
73 let load_cache = true;
74
75 if let Some(proxy_config) = proxy_config_opt {
76 let proxy_stream = proxy_config
77 .connect(context, host, port, load_cache)
78 .await?;
79 let tls_stream = wrap_rustls(host, &[], proxy_stream).await?;
80 Box::new(tls_stream)
81 } else {
82 let tcp_stream = crate::net::connect_tcp(context, host, port, load_cache).await?;
83 let tls_stream = wrap_rustls(host, &[], tcp_stream).await?;
84 Box::new(tls_stream)
85 }
86 }
87 _ => bail!("Unknown URL scheme"),
88 };
89
90 let io = TokioIo::new(stream);
91 let (sender, conn) = hyper::client::conn::http1::handshake(io).await?;
92 tokio::task::spawn(conn);
93
94 Ok(sender)
95}
96
97fn http_url_cache_timestamps(url: &str, mimetype: Option<&str>) -> (i64, i64) {
99 let now = time();
100
101 let expires = now + 3600 * 24 * 35;
102 let stale = if url.ends_with(".xdc") {
103 expires
105 } else if mimetype.is_some_and(|s| s.starts_with("image/")) {
106 now + 3600 * 24
112 } else {
113 now + 3600
117 };
118 (expires, stale)
119}
120
121async fn http_cache_put(context: &Context, url: &str, response: &Response) -> Result<()> {
123 let blob =
124 BlobObject::create_and_deduplicate_from_bytes(context, response.blob.as_slice(), "")?;
125
126 let (expires, stale) = http_url_cache_timestamps(url, response.mimetype.as_deref());
127 context
128 .sql
129 .insert(
130 "INSERT OR REPLACE INTO http_cache (url, expires, stale, blobname, mimetype, encoding)
131 VALUES (?, ?, ?, ?, ?, ?)",
132 (
133 url,
134 expires,
135 stale,
136 blob.as_name(),
137 response.mimetype.as_deref().unwrap_or_default(),
138 response.encoding.as_deref().unwrap_or_default(),
139 ),
140 )
141 .await?;
142
143 Ok(())
144}
145
146async fn http_cache_get(context: &Context, url: &str) -> Result<Option<(Response, bool)>> {
150 let now = time();
151 let Some((blob_name, mimetype, encoding, stale_timestamp)) = context
152 .sql
153 .query_row_optional(
154 "SELECT blobname, mimetype, encoding, stale
155 FROM http_cache WHERE url=? AND expires > ?",
156 (url, now),
157 |row| {
158 let blob_name: String = row.get(0)?;
159 let mimetype: Option<String> = Some(row.get(1)?).filter(|s: &String| !s.is_empty());
160 let encoding: Option<String> = Some(row.get(2)?).filter(|s: &String| !s.is_empty());
161 let stale_timestamp: i64 = row.get(3)?;
162 Ok((blob_name, mimetype, encoding, stale_timestamp))
163 },
164 )
165 .await?
166 else {
167 return Ok(None);
168 };
169 let is_stale = now > stale_timestamp;
170
171 let blob_object = BlobObject::from_name(context, &blob_name)?;
172 let blob_abs_path = blob_object.to_abs_path();
173 let blob = match fs::read(blob_abs_path)
174 .await
175 .with_context(|| format!("Failed to read blob for {url:?} cache entry."))
176 {
177 Ok(blob) => blob,
178 Err(err) => {
179 warn!(context, "{err:?}.");
182 return Ok(None);
183 }
184 };
185
186 let (expires, _stale) = http_url_cache_timestamps(url, mimetype.as_deref());
187 let response = Response {
188 blob,
189 mimetype,
190 encoding,
191 };
192
193 let stale_timestamp = if is_stale { now + 60 } else { stale_timestamp };
201 context
202 .sql
203 .execute(
204 "UPDATE http_cache SET expires=?, stale=? WHERE url=?",
205 (expires, stale_timestamp, url),
206 )
207 .await?;
208
209 Ok(Some((response, is_stale)))
210}
211
212pub(crate) async fn http_cache_cleanup(context: &Context) -> Result<()> {
214 context
218 .sql
219 .execute(
220 "DELETE FROM http_cache
221 WHERE ?1 > expires OR expires > ?1 + 31536000",
222 (time(),),
223 )
224 .await?;
225 Ok(())
226}
227
228async fn fetch_url(context: &Context, original_url: &str) -> Result<Response> {
232 let mut url = original_url.to_string();
233
234 for _i in 0..10 {
236 let parsed_url = url
237 .parse::<hyper::Uri>()
238 .with_context(|| format!("Failed to parse URL {url:?}"))?;
239
240 let mut sender = get_http_sender(context, parsed_url.clone()).await?;
241 let authority = parsed_url
242 .authority()
243 .context("URL has no authority")?
244 .clone();
245
246 let req = hyper::Request::builder()
247 .uri(parsed_url.path())
248 .header(hyper::header::HOST, authority.as_str())
249 .body(http_body_util::Empty::<Bytes>::new())?;
250 let response = sender.send_request(req).await?;
251
252 if response.status().is_redirection() {
253 let header = response
254 .headers()
255 .get_all("location")
256 .iter()
257 .next_back()
258 .ok_or_else(|| anyhow!("Redirection doesn't have a target location"))?
259 .to_str()?;
260 info!(context, "Following redirect to {}", header);
261 url = header.to_string();
262 continue;
263 }
264
265 if !response.status().is_success() {
266 return Err(anyhow!(
267 "The server returned a non-successful response code: {}{}",
268 response.status().as_u16(),
269 response
270 .status()
271 .canonical_reason()
272 .map(|s| format!(" {s}"))
273 .unwrap_or("".to_string())
274 ));
275 }
276
277 let content_type = response
278 .headers()
279 .get("content-type")
280 .and_then(|value| value.to_str().ok())
281 .and_then(|value| value.parse::<Mime>().ok());
282 let mimetype = content_type
283 .as_ref()
284 .map(|mime| mime.essence_str().to_string());
285 let encoding = content_type.as_ref().and_then(|mime| {
286 mime.get_param(mime::CHARSET)
287 .map(|charset| charset.as_str().to_string())
288 });
289 let body = response.collect().await?.to_bytes();
290 let blob: Vec<u8> = body.to_vec();
291 let response = Response {
292 blob,
293 mimetype,
294 encoding,
295 };
296 info!(context, "Inserting {original_url:?} into cache.");
297 http_cache_put(context, &url, &response).await?;
298 return Ok(response);
299 }
300
301 Err(anyhow!("Followed 10 redirections"))
302}
303
304pub async fn read_url_blob(context: &Context, url: &str) -> Result<Response> {
306 if let Some((response, is_stale)) = http_cache_get(context, url).await? {
307 info!(context, "Returning {url:?} from cache.");
308 if is_stale {
309 let context = context.clone();
310 let url = url.to_string();
311 tokio::spawn(async move {
312 info!(context, "Fetching stale {url:?} in background.");
314 if let Err(err) = fetch_url(&context, &url).await {
315 warn!(context, "Failed to revalidate {url:?}: {err:#}.");
316 }
317 });
318 }
319 return Ok(response);
320 }
321
322 info!(context, "Not found {url:?} in cache, fetching.");
323 let response = fetch_url(context, url).await?;
324 Ok(response)
325}
326
327pub(crate) async fn post_empty(context: &Context, url: &str) -> Result<(String, bool)> {
333 let parsed_url = url
334 .parse::<hyper::Uri>()
335 .with_context(|| format!("Failed to parse URL {url:?}"))?;
336 let scheme = parsed_url.scheme_str().context("URL has no scheme")?;
337 if scheme != "https" {
338 bail!("POST requests to non-HTTPS URLs are not allowed");
339 }
340
341 let mut sender = get_http_sender(context, parsed_url.clone()).await?;
342 let authority = parsed_url
343 .authority()
344 .context("URL has no authority")?
345 .clone();
346 let req = hyper::Request::post(parsed_url.path())
347 .header(hyper::header::HOST, authority.as_str())
348 .body(http_body_util::Empty::<Bytes>::new())?;
349
350 let response = sender.send_request(req).await?;
351
352 let response_status = response.status();
353 let body = response.collect().await?.to_bytes();
354 let text = String::from_utf8_lossy(&body);
355 let response_text = text.to_string();
356
357 Ok((response_text, response_status.is_success()))
358}
359
360#[allow(dead_code)]
366pub(crate) async fn post_string(context: &Context, url: &str, body: String) -> Result<bool> {
367 let parsed_url = url
368 .parse::<hyper::Uri>()
369 .with_context(|| format!("Failed to parse URL {url:?}"))?;
370 let scheme = parsed_url.scheme_str().context("URL has no scheme")?;
371 if scheme != "https" {
372 bail!("POST requests to non-HTTPS URLs are not allowed");
373 }
374
375 let mut sender = get_http_sender(context, parsed_url.clone()).await?;
376 let authority = parsed_url
377 .authority()
378 .context("URL has no authority")?
379 .clone();
380
381 let request = hyper::Request::post(parsed_url.path())
382 .header(hyper::header::HOST, authority.as_str())
383 .body(body)?;
384 let response = sender.send_request(request).await?;
385
386 Ok(response.status().is_success())
387}
388
389pub(crate) async fn post_form<T: Serialize + ?Sized>(
393 context: &Context,
394 url: &str,
395 form: &T,
396) -> Result<Bytes> {
397 let parsed_url = url
398 .parse::<hyper::Uri>()
399 .with_context(|| format!("Failed to parse URL {url:?}"))?;
400 let scheme = parsed_url.scheme_str().context("URL has no scheme")?;
401 if scheme != "https" {
402 bail!("POST requests to non-HTTPS URLs are not allowed");
403 }
404
405 let encoded_body = serde_urlencoded::to_string(form).context("Failed to encode data")?;
406 let mut sender = get_http_sender(context, parsed_url.clone()).await?;
407 let authority = parsed_url
408 .authority()
409 .context("URL has no authority")?
410 .clone();
411 let request = hyper::Request::post(parsed_url.path())
412 .header(hyper::header::HOST, authority.as_str())
413 .header("content-type", "application/x-www-form-urlencoded")
414 .body(encoded_body)?;
415 let response = sender.send_request(request).await?;
416 let bytes = response.collect().await?.to_bytes();
417 Ok(bytes)
418}
419
#[cfg(test)]
mod tests {
    use super::*;
    use std::time::Duration;

    use crate::sql::housekeeping;
    use crate::test_utils::TestContext;
    use crate::tools::SystemTime;

    /// Exercises the HTTP cache end to end: insertion, lookup, staleness, expiration
    /// and handling of cache entries whose blob file is missing.
    #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
    async fn test_http_cache() -> Result<()> {
        let t = &TestContext::new().await;

        // Nothing is cached yet.
        assert_eq!(http_cache_get(t, "https://webxdc.org/").await?, None);

        let html_response = Response {
            blob: b"<!DOCTYPE html> ...".to_vec(),
            mimetype: Some("text/html".to_string()),
            encoding: None,
        };

        let xdc_response = Response {
            blob: b"PK...".to_vec(),
            mimetype: Some("application/octet-stream".to_string()),
            encoding: None,
        };
        let xdc_editor_url = "https://apps.testrun.org/webxdc-editor-v3.2.0.xdc";
        let xdc_pixel_url = "https://apps.testrun.org/webxdc-pixel-v2.xdc";

        http_cache_put(t, "https://webxdc.org/", &html_response).await?;

        // Only the inserted URL is found, and a freshly inserted entry is not stale.
        assert_eq!(http_cache_get(t, xdc_editor_url).await?, None);
        assert_eq!(http_cache_get(t, xdc_pixel_url).await?, None);
        assert_eq!(
            http_cache_get(t, "https://webxdc.org/").await?,
            Some((html_response.clone(), false))
        );

        // Cache both `.xdc` URLs with the same response.
        http_cache_put(t, xdc_editor_url, &xdc_response).await?;
        http_cache_put(t, xdc_pixel_url, &xdc_response).await?;
        assert_eq!(
            http_cache_get(t, xdc_editor_url).await?,
            Some((xdc_response.clone(), false))
        );
        assert_eq!(
            http_cache_get(t, xdc_pixel_url).await?,
            Some((xdc_response.clone(), false))
        );

        assert_eq!(
            http_cache_get(t, "https://webxdc.org/").await?,
            Some((html_response.clone(), false))
        );

        // Move past the one-hour stale period that applies to the HTML entry.
        SystemTime::shift(Duration::from_secs(3600 + 100));
        assert_eq!(
            http_cache_get(t, "https://webxdc.org/").await?,
            Some((html_response.clone(), true))
        );
        // `.xdc` entries only become stale at their expiration time.
        assert_eq!(
            http_cache_get(t, xdc_editor_url).await?,
            Some((xdc_response.clone(), false))
        );

        // Storing the response again makes the HTML entry fresh.
        http_cache_put(t, "https://webxdc.org/", &html_response).await?;
        assert_eq!(
            http_cache_get(t, "https://webxdc.org/").await?,
            Some((html_response.clone(), false))
        );

        // Move to 100 seconds before the editor entry expires. Its expiration was extended by
        // the lookup after the first time shift; the pixel entry was last looked up before
        // that shift, so it is now past its expiration.
        SystemTime::shift(Duration::from_secs(3600 * 24 * 35 - 100));

        housekeeping(t).await?;

        // The editor entry is still cached but stale: for `.xdc` URLs the stale timestamp was
        // set to the expiration time at insertion. The pixel entry has expired.
        assert_eq!(
            http_cache_get(t, xdc_editor_url).await?,
            Some((xdc_response.clone(), true))
        );
        assert_eq!(http_cache_get(t, xdc_pixel_url).await?, None);

        // Reporting the entry as stale postponed its stale timestamp by 60 seconds,
        // so the next lookup does not report it as stale.
        assert_eq!(
            http_cache_get(t, xdc_editor_url).await?,
            Some((xdc_response.clone(), false))
        );
        // Advance in 6-second steps; within 100 steps the postponement must run out
        // and the entry must be reported as stale again.
        for i in (0..100).rev() {
            SystemTime::shift(Duration::from_secs(6));
            if let Some((_, true)) = http_cache_get(t, xdc_editor_url).await? {
                break;
            }
            assert!(i > 0);
        }

        // Remove all blob files: an entry whose blob cannot be read is treated as a miss.
        for entry in std::fs::read_dir(t.get_blobdir())? {
            let entry = entry.unwrap();
            let path = entry.path();
            std::fs::remove_file(path).expect("Failed to remove blob");
        }

        assert_eq!(
            http_cache_get(t, xdc_editor_url)
                .await
                .context("Failed to get no cache response")?,
            None
        );

        Ok(())
    }
}