1use anyhow::{anyhow, bail, Context as _, Result};
4use bytes::Bytes;
5use http_body_util::BodyExt;
6use hyper_util::rt::TokioIo;
7use mime::Mime;
8use serde::Serialize;
9use tokio::fs;
10
11use crate::blob::BlobObject;
12use crate::context::Context;
13use crate::net::proxy::ProxyConfig;
14use crate::net::session::SessionStream;
15use crate::net::tls::wrap_rustls;
16use crate::tools::time;
17
/// Result of an HTTP(S) GET request, as returned to callers and stored in the HTTP cache.
#[derive(Clone, Debug, Eq, PartialEq)]
pub struct Response {
    /// Raw response body bytes.
    pub blob: Vec<u8>,

    /// MIME type (essence only, without parameters) taken from the `Content-Type` header,
    /// or `None` if the header was missing or unparsable.
    pub mimetype: Option<String>,

    /// Value of the `charset` parameter of the `Content-Type` header, if present.
    pub encoding: Option<String>,
}
30
31pub async fn read_url(context: &Context, url: &str) -> Result<String> {
33 let response = read_url_blob(context, url).await?;
34 let text = String::from_utf8_lossy(&response.blob);
35 Ok(text.to_string())
36}
37
38async fn get_http_sender<B>(
39 context: &Context,
40 parsed_url: hyper::Uri,
41) -> Result<hyper::client::conn::http1::SendRequest<B>>
42where
43 B: hyper::body::Body + 'static + Send,
44 B::Data: Send,
45 B::Error: Into<Box<dyn std::error::Error + Send + Sync>>,
46{
47 let scheme = parsed_url.scheme_str().context("URL has no scheme")?;
48 let host = parsed_url.host().context("URL has no host")?;
49 let proxy_config_opt = ProxyConfig::load(context).await?;
50
51 let stream: Box<dyn SessionStream> = match scheme {
52 "http" => {
53 let port = parsed_url.port_u16().unwrap_or(80);
54
55 let load_cache = false;
60 if let Some(proxy_config) = proxy_config_opt {
61 let proxy_stream = proxy_config
62 .connect(context, host, port, load_cache)
63 .await?;
64 Box::new(proxy_stream)
65 } else {
66 let tcp_stream = crate::net::connect_tcp(context, host, port, load_cache).await?;
67 Box::new(tcp_stream)
68 }
69 }
70 "https" => {
71 let port = parsed_url.port_u16().unwrap_or(443);
72 let load_cache = true;
73
74 if let Some(proxy_config) = proxy_config_opt {
75 let proxy_stream = proxy_config
76 .connect(context, host, port, load_cache)
77 .await?;
78 let tls_stream = wrap_rustls(host, &[], proxy_stream).await?;
79 Box::new(tls_stream)
80 } else {
81 let tcp_stream = crate::net::connect_tcp(context, host, port, load_cache).await?;
82 let tls_stream = wrap_rustls(host, &[], tcp_stream).await?;
83 Box::new(tls_stream)
84 }
85 }
86 _ => bail!("Unknown URL scheme"),
87 };
88
89 let io = TokioIo::new(stream);
90 let (sender, conn) = hyper::client::conn::http1::handshake(io).await?;
91 tokio::task::spawn(conn);
92
93 Ok(sender)
94}
95
96fn http_url_cache_timestamps(url: &str, mimetype: Option<&str>) -> (i64, i64) {
98 let now = time();
99
100 let expires = now + 3600 * 24 * 35;
101 let stale = if url.ends_with(".xdc") {
102 expires
104 } else if mimetype.is_some_and(|s| s.starts_with("image/")) {
105 now + 3600 * 24
111 } else {
112 now + 3600
116 };
117 (expires, stale)
118}
119
120async fn http_cache_put(context: &Context, url: &str, response: &Response) -> Result<()> {
122 let blob =
123 BlobObject::create_and_deduplicate_from_bytes(context, response.blob.as_slice(), "")?;
124
125 let (expires, stale) = http_url_cache_timestamps(url, response.mimetype.as_deref());
126 context
127 .sql
128 .insert(
129 "INSERT OR REPLACE INTO http_cache (url, expires, stale, blobname, mimetype, encoding)
130 VALUES (?, ?, ?, ?, ?, ?)",
131 (
132 url,
133 expires,
134 stale,
135 blob.as_name(),
136 response.mimetype.as_deref().unwrap_or_default(),
137 response.encoding.as_deref().unwrap_or_default(),
138 ),
139 )
140 .await?;
141
142 Ok(())
143}
144
145async fn http_cache_get(context: &Context, url: &str) -> Result<Option<(Response, bool)>> {
149 let now = time();
150 let Some((blob_name, mimetype, encoding, stale_timestamp)) = context
151 .sql
152 .query_row_optional(
153 "SELECT blobname, mimetype, encoding, stale
154 FROM http_cache WHERE url=? AND expires > ?",
155 (url, now),
156 |row| {
157 let blob_name: String = row.get(0)?;
158 let mimetype: Option<String> = Some(row.get(1)?).filter(|s: &String| !s.is_empty());
159 let encoding: Option<String> = Some(row.get(2)?).filter(|s: &String| !s.is_empty());
160 let stale_timestamp: i64 = row.get(3)?;
161 Ok((blob_name, mimetype, encoding, stale_timestamp))
162 },
163 )
164 .await?
165 else {
166 return Ok(None);
167 };
168 let is_stale = now > stale_timestamp;
169
170 let blob_object = BlobObject::from_name(context, &blob_name)?;
171 let blob_abs_path = blob_object.to_abs_path();
172 let blob = match fs::read(blob_abs_path)
173 .await
174 .with_context(|| format!("Failed to read blob for {url:?} cache entry."))
175 {
176 Ok(blob) => blob,
177 Err(err) => {
178 warn!(context, "{err:?}.");
181 return Ok(None);
182 }
183 };
184
185 let (expires, _stale) = http_url_cache_timestamps(url, mimetype.as_deref());
186 let response = Response {
187 blob,
188 mimetype,
189 encoding,
190 };
191
192 let stale_timestamp = if is_stale { now + 60 } else { stale_timestamp };
200 context
201 .sql
202 .execute(
203 "UPDATE http_cache SET expires=?, stale=? WHERE url=?",
204 (expires, stale_timestamp, url),
205 )
206 .await?;
207
208 Ok(Some((response, is_stale)))
209}
210
211pub(crate) async fn http_cache_cleanup(context: &Context) -> Result<()> {
213 context
217 .sql
218 .execute(
219 "DELETE FROM http_cache
220 WHERE ?1 > expires OR expires > ?1 + 31536000",
221 (time(),),
222 )
223 .await?;
224 Ok(())
225}
226
227async fn fetch_url(context: &Context, original_url: &str) -> Result<Response> {
231 let mut url = original_url.to_string();
232
233 for _i in 0..10 {
235 let parsed_url = url
236 .parse::<hyper::Uri>()
237 .with_context(|| format!("Failed to parse URL {url:?}"))?;
238
239 let mut sender = get_http_sender(context, parsed_url.clone()).await?;
240 let authority = parsed_url
241 .authority()
242 .context("URL has no authority")?
243 .clone();
244
245 let req = hyper::Request::builder()
246 .uri(parsed_url.path())
247 .header(hyper::header::HOST, authority.as_str())
248 .body(http_body_util::Empty::<Bytes>::new())?;
249 let response = sender.send_request(req).await?;
250
251 if response.status().is_redirection() {
252 let header = response
253 .headers()
254 .get_all("location")
255 .iter()
256 .next_back()
257 .ok_or_else(|| anyhow!("Redirection doesn't have a target location"))?
258 .to_str()?;
259 info!(context, "Following redirect to {}", header);
260 url = header.to_string();
261 continue;
262 }
263
264 let content_type = response
265 .headers()
266 .get("content-type")
267 .and_then(|value| value.to_str().ok())
268 .and_then(|value| value.parse::<Mime>().ok());
269 let mimetype = content_type
270 .as_ref()
271 .map(|mime| mime.essence_str().to_string());
272 let encoding = content_type.as_ref().and_then(|mime| {
273 mime.get_param(mime::CHARSET)
274 .map(|charset| charset.as_str().to_string())
275 });
276 let body = response.collect().await?.to_bytes();
277 let blob: Vec<u8> = body.to_vec();
278 let response = Response {
279 blob,
280 mimetype,
281 encoding,
282 };
283 info!(context, "Inserting {original_url:?} into cache.");
284 http_cache_put(context, &url, &response).await?;
285 return Ok(response);
286 }
287
288 Err(anyhow!("Followed 10 redirections"))
289}
290
291pub async fn read_url_blob(context: &Context, url: &str) -> Result<Response> {
293 if let Some((response, is_stale)) = http_cache_get(context, url).await? {
294 info!(context, "Returning {url:?} from cache.");
295 if is_stale {
296 let context = context.clone();
297 let url = url.to_string();
298 tokio::spawn(async move {
299 info!(context, "Fetching stale {url:?} in background.");
301 if let Err(err) = fetch_url(&context, &url).await {
302 warn!(context, "Failed to revalidate {url:?}: {err:#}.");
303 }
304 });
305 }
306 return Ok(response);
307 }
308
309 info!(context, "Not found {url:?} in cache, fetching.");
310 let response = fetch_url(context, url).await?;
311 Ok(response)
312}
313
314pub(crate) async fn post_empty(context: &Context, url: &str) -> Result<(String, bool)> {
320 let parsed_url = url
321 .parse::<hyper::Uri>()
322 .with_context(|| format!("Failed to parse URL {url:?}"))?;
323 let scheme = parsed_url.scheme_str().context("URL has no scheme")?;
324 if scheme != "https" {
325 bail!("POST requests to non-HTTPS URLs are not allowed");
326 }
327
328 let mut sender = get_http_sender(context, parsed_url.clone()).await?;
329 let authority = parsed_url
330 .authority()
331 .context("URL has no authority")?
332 .clone();
333 let req = hyper::Request::post(parsed_url.path())
334 .header(hyper::header::HOST, authority.as_str())
335 .body(http_body_util::Empty::<Bytes>::new())?;
336
337 let response = sender.send_request(req).await?;
338
339 let response_status = response.status();
340 let body = response.collect().await?.to_bytes();
341 let text = String::from_utf8_lossy(&body);
342 let response_text = text.to_string();
343
344 Ok((response_text, response_status.is_success()))
345}
346
347#[allow(dead_code)]
353pub(crate) async fn post_string(context: &Context, url: &str, body: String) -> Result<bool> {
354 let parsed_url = url
355 .parse::<hyper::Uri>()
356 .with_context(|| format!("Failed to parse URL {url:?}"))?;
357 let scheme = parsed_url.scheme_str().context("URL has no scheme")?;
358 if scheme != "https" {
359 bail!("POST requests to non-HTTPS URLs are not allowed");
360 }
361
362 let mut sender = get_http_sender(context, parsed_url.clone()).await?;
363 let authority = parsed_url
364 .authority()
365 .context("URL has no authority")?
366 .clone();
367
368 let request = hyper::Request::post(parsed_url.path())
369 .header(hyper::header::HOST, authority.as_str())
370 .body(body)?;
371 let response = sender.send_request(request).await?;
372
373 Ok(response.status().is_success())
374}
375
376pub(crate) async fn post_form<T: Serialize + ?Sized>(
380 context: &Context,
381 url: &str,
382 form: &T,
383) -> Result<Bytes> {
384 let parsed_url = url
385 .parse::<hyper::Uri>()
386 .with_context(|| format!("Failed to parse URL {url:?}"))?;
387 let scheme = parsed_url.scheme_str().context("URL has no scheme")?;
388 if scheme != "https" {
389 bail!("POST requests to non-HTTPS URLs are not allowed");
390 }
391
392 let encoded_body = serde_urlencoded::to_string(form).context("Failed to encode data")?;
393 let mut sender = get_http_sender(context, parsed_url.clone()).await?;
394 let authority = parsed_url
395 .authority()
396 .context("URL has no authority")?
397 .clone();
398 let request = hyper::Request::post(parsed_url.path())
399 .header(hyper::header::HOST, authority.as_str())
400 .header("content-type", "application/x-www-form-urlencoded")
401 .body(encoded_body)?;
402 let response = sender.send_request(request).await?;
403 let bytes = response.collect().await?.to_bytes();
404 Ok(bytes)
405}
406
#[cfg(test)]
mod tests {
    use super::*;
    use std::time::Duration;

    use crate::sql::housekeeping;
    use crate::test_utils::TestContext;
    use crate::tools::SystemTime;

    /// End-to-end test of the HTTP cache. It covers insertion, lookup, staleness,
    /// expiry after housekeeping, the 60-second stale back-off, and cache misses
    /// caused by missing blob files.
    ///
    /// The mocked clock is advanced with `SystemTime::shift`, so the order of the
    /// steps below matters.
    #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
    async fn test_http_cache() -> Result<()> {
        let t = &TestContext::new().await;

        // A fresh context starts with an empty cache.
        assert_eq!(http_cache_get(t, "https://webxdc.org/").await?, None);

        let html_response = Response {
            blob: b"<!DOCTYPE html> ...".to_vec(),
            mimetype: Some("text/html".to_string()),
            encoding: None,
        };

        let xdc_response = Response {
            blob: b"PK...".to_vec(),
            mimetype: Some("application/octet-stream".to_string()),
            encoding: None,
        };
        let xdc_editor_url = "https://apps.testrun.org/webxdc-editor-v3.2.0.xdc";
        let xdc_pixel_url = "https://apps.testrun.org/webxdc-pixel-v2.xdc";

        // Caching one URL does not make other URLs hit.
        http_cache_put(t, "https://webxdc.org/", &html_response).await?;

        assert_eq!(http_cache_get(t, xdc_editor_url).await?, None);
        assert_eq!(http_cache_get(t, xdc_pixel_url).await?, None);
        assert_eq!(
            http_cache_get(t, "https://webxdc.org/").await?,
            Some((html_response.clone(), false))
        );

        // Both .xdc entries share one deduplicated blob and are fresh right after insertion.
        http_cache_put(t, xdc_editor_url, &xdc_response).await?;
        http_cache_put(t, xdc_pixel_url, &xdc_response).await?;
        assert_eq!(
            http_cache_get(t, xdc_editor_url).await?,
            Some((xdc_response.clone(), false))
        );
        assert_eq!(
            http_cache_get(t, xdc_pixel_url).await?,
            Some((xdc_response.clone(), false))
        );

        assert_eq!(
            http_cache_get(t, "https://webxdc.org/").await?,
            Some((html_response.clone(), false))
        );

        // Just over an hour later the HTML entry (stale after one hour) is stale.
        // The .xdc entry is stale only once it reaches its 35-day expiry.
        SystemTime::shift(Duration::from_secs(3600 + 100));
        assert_eq!(
            http_cache_get(t, "https://webxdc.org/").await?,
            Some((html_response.clone(), true))
        );
        assert_eq!(
            http_cache_get(t, xdc_editor_url).await?,
            Some((xdc_response.clone(), false))
        );

        // Re-inserting the HTML response, as a background revalidation would, makes it fresh.
        http_cache_put(t, "https://webxdc.org/", &html_response).await?;
        assert_eq!(
            http_cache_get(t, "https://webxdc.org/").await?,
            Some((html_response.clone(), false))
        );

        // Jump almost 35 days ahead. The pixel entry, last accessed at the start, is past
        // its expiry. The editor entry, accessed an hour later, still has ~100 s left but
        // is past its stale deadline.
        SystemTime::shift(Duration::from_secs(3600 * 24 * 35 - 100));

        // NOTE(review): housekeeping presumably calls `http_cache_cleanup`; the pixel
        // lookup would return `None` anyway because expired rows are filtered on read.
        housekeeping(t).await?;

        assert_eq!(
            http_cache_get(t, xdc_editor_url).await?,
            Some((xdc_response.clone(), true))
        );
        assert_eq!(http_cache_get(t, xdc_pixel_url).await?, None);

        // The stale hit above moved the stale deadline 60 s ahead, so an immediate second
        // lookup reports the entry as fresh.
        assert_eq!(
            http_cache_get(t, xdc_editor_url).await?,
            Some((xdc_response.clone(), false))
        );
        // Advancing in 6 s steps, the entry must become stale again within 100 steps.
        for i in (0..100).rev() {
            SystemTime::shift(Duration::from_secs(6));
            if let Some((_, true)) = http_cache_get(t, xdc_editor_url).await? {
                break;
            }
            assert!(i > 0);
        }

        // Delete every blob file. A cache row whose blob cannot be read must be
        // reported as a miss rather than an error.
        for entry in std::fs::read_dir(t.get_blobdir())? {
            let entry = entry.unwrap();
            let path = entry.path();
            std::fs::remove_file(path).expect("Failed to remove blob");
        }

        assert_eq!(
            http_cache_get(t, xdc_editor_url)
                .await
                .context("Failed to get no cache response")?,
            None
        );

        Ok(())
    }
}