1use std::cmp::max;
4use std::collections::BTreeMap;
5
6use anyhow::{Result, anyhow, bail, ensure};
7use deltachat_derive::{FromSql, ToSql};
8use serde::{Deserialize, Serialize};
9
10use crate::config::Config;
11use crate::context::Context;
12use crate::imap::session::Session;
13use crate::log::info;
14use crate::message::{Message, MsgId, Viewtype};
15use crate::mimeparser::{MimeMessage, Part};
16use crate::tools::time;
17use crate::{EventType, chatlist_events, stock_str};
18
19pub(crate) const MIN_DOWNLOAD_LIMIT: u32 = 163840;
26
27pub(crate) const MIN_DELETE_SERVER_AFTER: i64 = 48 * 60 * 60;
32
33#[derive(
35 Debug,
36 Default,
37 Display,
38 Clone,
39 Copy,
40 PartialEq,
41 Eq,
42 FromPrimitive,
43 ToPrimitive,
44 FromSql,
45 ToSql,
46 Serialize,
47 Deserialize,
48)]
49#[repr(u32)]
50pub enum DownloadState {
51 #[default]
53 Done = 0,
54
55 Available = 10,
57
58 Failure = 20,
60
61 Undecipherable = 30,
63
64 InProgress = 1000,
66}
67
68impl Context {
69 pub(crate) async fn download_limit(&self) -> Result<Option<u32>> {
71 let download_limit = self.get_config_int(Config::DownloadLimit).await?;
72 if download_limit <= 0 {
73 Ok(None)
74 } else {
75 Ok(Some(max(MIN_DOWNLOAD_LIMIT, download_limit as u32)))
76 }
77 }
78}
79
80impl MsgId {
81 pub async fn download_full(self, context: &Context) -> Result<()> {
83 let msg = Message::load_from_db(context, self).await?;
84 match msg.download_state() {
85 DownloadState::Done | DownloadState::Undecipherable => {
86 return Err(anyhow!("Nothing to download."));
87 }
88 DownloadState::InProgress => return Err(anyhow!("Download already in progress.")),
89 DownloadState::Available | DownloadState::Failure => {
90 self.update_download_state(context, DownloadState::InProgress)
91 .await?;
92 context
93 .sql
94 .execute("INSERT INTO download (msg_id) VALUES (?)", (self,))
95 .await?;
96 context.scheduler.interrupt_inbox().await;
97 }
98 }
99 Ok(())
100 }
101
102 pub(crate) async fn update_download_state(
104 self,
105 context: &Context,
106 download_state: DownloadState,
107 ) -> Result<()> {
108 if context
109 .sql
110 .execute(
111 "UPDATE msgs SET download_state=? WHERE id=?;",
112 (download_state, self),
113 )
114 .await?
115 == 0
116 {
117 return Ok(());
118 }
119 let Some(msg) = Message::load_from_db_optional(context, self).await? else {
120 return Ok(());
121 };
122 context.emit_event(EventType::MsgsChanged {
123 chat_id: msg.chat_id,
124 msg_id: self,
125 });
126 chatlist_events::emit_chatlist_item_changed(context, msg.chat_id);
127 Ok(())
128 }
129}
130
131impl Message {
132 pub fn download_state(&self) -> DownloadState {
134 self.download_state
135 }
136}
137
138pub(crate) async fn download_msg(
142 context: &Context,
143 msg_id: MsgId,
144 session: &mut Session,
145) -> Result<()> {
146 let Some(msg) = Message::load_from_db_optional(context, msg_id).await? else {
147 return Ok(());
155 };
156
157 let row = context
158 .sql
159 .query_row_optional(
160 "SELECT uid, folder FROM imap WHERE rfc724_mid=? AND target!=''",
161 (&msg.rfc724_mid,),
162 |row| {
163 let server_uid: u32 = row.get(0)?;
164 let server_folder: String = row.get(1)?;
165 Ok((server_uid, server_folder))
166 },
167 )
168 .await?;
169
170 let Some((server_uid, server_folder)) = row else {
171 return Err(anyhow!("Call download_full() again to try over."));
173 };
174
175 session
176 .fetch_single_msg(context, &server_folder, server_uid, msg.rfc724_mid.clone())
177 .await?;
178 Ok(())
179}
180
181impl Session {
182 async fn fetch_single_msg(
187 &mut self,
188 context: &Context,
189 folder: &str,
190 uid: u32,
191 rfc724_mid: String,
192 ) -> Result<()> {
193 if uid == 0 {
194 bail!("Attempt to fetch UID 0");
195 }
196
197 let create = false;
198 let folder_exists = self
199 .select_with_uidvalidity(context, folder, create)
200 .await?;
201 ensure!(folder_exists, "No folder {folder}");
202
203 info!(context, "Downloading message {}/{} fully...", folder, uid);
205
206 let mut uid_message_ids: BTreeMap<u32, String> = BTreeMap::new();
207 uid_message_ids.insert(uid, rfc724_mid);
208 let (sender, receiver) = async_channel::unbounded();
209 self.fetch_many_msgs(context, folder, vec![uid], &uid_message_ids, false, sender)
210 .await?;
211 if receiver.recv().await.is_err() {
212 bail!("Failed to fetch UID {uid}");
213 }
214 Ok(())
215 }
216}
217
218impl MimeMessage {
219 pub(crate) async fn create_stub_from_partial_download(
228 &mut self,
229 context: &Context,
230 org_bytes: u32,
231 error: Option<String>,
232 ) -> Result<()> {
233 let prefix = match error {
234 None => "",
235 Some(_) => "[❗] ",
236 };
237 let mut text = format!(
238 "{prefix}[{}]",
239 stock_str::partial_download_msg_body(context, org_bytes).await
240 );
241 if let Some(delete_server_after) = context.get_config_delete_server_after().await? {
242 let until = stock_str::download_availability(
243 context,
244 time() + max(delete_server_after, MIN_DELETE_SERVER_AFTER),
245 )
246 .await;
247 text += format!(" [{until}]").as_str();
248 };
249
250 info!(context, "Partial download: {}", text);
251
252 self.do_add_single_part(Part {
253 typ: Viewtype::Text,
254 msg: text,
255 error,
256 ..Default::default()
257 });
258
259 Ok(())
260 }
261}
262
263#[cfg(test)]
264mod tests {
265 use num_traits::FromPrimitive;
266
267 use super::*;
268 use crate::chat::{get_chat_msgs, send_msg};
269 use crate::ephemeral::Timer;
270 use crate::message::delete_msgs;
271 use crate::receive_imf::receive_imf_from_inbox;
272 use crate::test_utils::{E2EE_INFO_MSGS, TestContext, TestContextManager};
273
274 #[test]
275 fn test_downloadstate_values() {
276 assert_eq!(DownloadState::Done, DownloadState::default());
278 assert_eq!(DownloadState::Done, DownloadState::from_i32(0).unwrap());
279 assert_eq!(
280 DownloadState::Available,
281 DownloadState::from_i32(10).unwrap()
282 );
283 assert_eq!(DownloadState::Failure, DownloadState::from_i32(20).unwrap());
284 assert_eq!(
285 DownloadState::InProgress,
286 DownloadState::from_i32(1000).unwrap()
287 );
288 }
289
290 #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
291 async fn test_download_limit() -> Result<()> {
292 let t = TestContext::new_alice().await;
293
294 assert_eq!(t.download_limit().await?, None);
295
296 t.set_config(Config::DownloadLimit, Some("200000")).await?;
297 assert_eq!(t.download_limit().await?, Some(200000));
298
299 t.set_config(Config::DownloadLimit, Some("20000")).await?;
300 assert_eq!(t.download_limit().await?, Some(MIN_DOWNLOAD_LIMIT));
301
302 t.set_config(Config::DownloadLimit, None).await?;
303 assert_eq!(t.download_limit().await?, None);
304
305 for val in &["0", "-1", "-100", "", "foo"] {
306 t.set_config(Config::DownloadLimit, Some(val)).await?;
307 assert_eq!(t.download_limit().await?, None);
308 }
309
310 Ok(())
311 }
312
313 #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
314 async fn test_update_download_state() -> Result<()> {
315 let t = TestContext::new_alice().await;
316 let chat = t.create_chat_with_contact("Bob", "bob@example.org").await;
317
318 let mut msg = Message::new_text("Hi Bob".to_owned());
319 let msg_id = send_msg(&t, chat.id, &mut msg).await?;
320 let msg = Message::load_from_db(&t, msg_id).await?;
321 assert_eq!(msg.download_state(), DownloadState::Done);
322
323 for s in &[
324 DownloadState::Available,
325 DownloadState::InProgress,
326 DownloadState::Failure,
327 DownloadState::Done,
328 DownloadState::Done,
329 ] {
330 msg_id.update_download_state(&t, *s).await?;
331 let msg = Message::load_from_db(&t, msg_id).await?;
332 assert_eq!(msg.download_state(), *s);
333 }
334 t.sql
335 .execute("DELETE FROM msgs WHERE id=?", (msg_id,))
336 .await?;
337 msg_id
339 .update_download_state(&t, DownloadState::Done)
340 .await?;
341
342 Ok(())
343 }
344
345 #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
346 async fn test_partial_receive_imf() -> Result<()> {
347 let t = TestContext::new_alice().await;
348
349 let header = "Received: (Postfix, from userid 1000); Mon, 4 Dec 2006 14:51:39 +0100 (CET)\n\
350 From: bob@example.com\n\
351 To: alice@example.org\n\
352 Subject: foo\n\
353 Message-ID: <Mr.12345678901@example.com>\n\
354 Chat-Version: 1.0\n\
355 Date: Sun, 22 Mar 2020 22:37:57 +0000\
356 Content-Type: text/plain";
357
358 receive_imf_from_inbox(
359 &t,
360 "Mr.12345678901@example.com",
361 header.as_bytes(),
362 false,
363 Some(100000),
364 )
365 .await?;
366 let msg = t.get_last_msg().await;
367 assert_eq!(msg.download_state(), DownloadState::Available);
368 assert_eq!(msg.get_subject(), "foo");
369 assert!(
370 msg.get_text()
371 .contains(&stock_str::partial_download_msg_body(&t, 100000).await)
372 );
373
374 receive_imf_from_inbox(
375 &t,
376 "Mr.12345678901@example.com",
377 format!("{header}\n\n100k text...").as_bytes(),
378 false,
379 None,
380 )
381 .await?;
382 let msg = t.get_last_msg().await;
383 assert_eq!(msg.download_state(), DownloadState::Done);
384 assert_eq!(msg.get_subject(), "foo");
385 assert_eq!(msg.get_text(), "100k text...");
386
387 Ok(())
388 }
389
390 #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
391 async fn test_partial_download_and_ephemeral() -> Result<()> {
392 let t = TestContext::new_alice().await;
393 let chat_id = t
394 .create_chat_with_contact("bob", "bob@example.org")
395 .await
396 .id;
397 chat_id
398 .set_ephemeral_timer(&t, Timer::Enabled { duration: 60 })
399 .await?;
400
401 receive_imf_from_inbox(
403 &t,
404 "first@example.org",
405 b"From: Bob <bob@example.org>\n\
406 To: Alice <alice@example.org>\n\
407 Chat-Version: 1.0\n\
408 Subject: subject\n\
409 Message-ID: <first@example.org>\n\
410 Date: Sun, 14 Nov 2021 00:10:00 +0000\
411 Content-Type: text/plain",
412 false,
413 Some(100000),
414 )
415 .await?;
416 assert_eq!(
417 chat_id.get_ephemeral_timer(&t).await?,
418 Timer::Enabled { duration: 60 }
419 );
420
421 Ok(())
422 }
423
424 #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
425 async fn test_status_update_expands_to_nothing() -> Result<()> {
426 let alice = TestContext::new_alice().await;
427 let bob = TestContext::new_bob().await;
428 let chat_id = alice.create_chat(&bob).await.id;
429
430 let file = alice.get_blobdir().join("minimal.xdc");
431 tokio::fs::write(&file, include_bytes!("../test-data/webxdc/minimal.xdc")).await?;
432 let mut instance = Message::new(Viewtype::File);
433 instance.set_file_and_deduplicate(&alice, &file, None, None)?;
434 let _sent1 = alice.send_msg(chat_id, &mut instance).await;
435
436 alice
437 .send_webxdc_status_update(instance.id, r#"{"payload":7}"#)
438 .await?;
439 alice.flush_status_updates().await?;
440 let sent2 = alice.pop_sent_msg().await;
441 let sent2_rfc724_mid = sent2.load_from_db().await.rfc724_mid;
442
443 receive_imf_from_inbox(
445 &bob,
446 &sent2_rfc724_mid,
447 sent2.payload().as_bytes(),
448 false,
449 Some(sent2.payload().len() as u32),
450 )
451 .await?;
452 let msg = bob.get_last_msg().await;
453 let chat_id = msg.chat_id;
454 assert_eq!(
455 get_chat_msgs(&bob, chat_id).await?.len(),
456 E2EE_INFO_MSGS + 1
457 );
458 assert_eq!(msg.download_state(), DownloadState::Available);
459
460 receive_imf_from_inbox(
463 &bob,
464 &sent2_rfc724_mid,
465 sent2.payload().as_bytes(),
466 false,
467 None,
468 )
469 .await?;
470 assert_eq!(get_chat_msgs(&bob, chat_id).await?.len(), E2EE_INFO_MSGS);
471 assert!(
472 Message::load_from_db_optional(&bob, msg.id)
473 .await?
474 .is_none()
475 );
476
477 Ok(())
478 }
479
480 #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
481 async fn test_mdn_expands_to_nothing() -> Result<()> {
482 let bob = TestContext::new_bob().await;
483 let raw = b"Subject: Message opened\n\
484 Date: Mon, 10 Jan 2020 00:00:00 +0000\n\
485 Chat-Version: 1.0\n\
486 Message-ID: <bar@example.org>\n\
487 To: Alice <alice@example.org>\n\
488 From: Bob <bob@example.org>\n\
489 Content-Type: multipart/report; report-type=disposition-notification;\n\t\
490 boundary=\"kJBbU58X1xeWNHgBtTbMk80M5qnV4N\"\n\
491 \n\
492 \n\
493 --kJBbU58X1xeWNHgBtTbMk80M5qnV4N\n\
494 Content-Type: text/plain; charset=utf-8\n\
495 \n\
496 bla\n\
497 \n\
498 \n\
499 --kJBbU58X1xeWNHgBtTbMk80M5qnV4N\n\
500 Content-Type: message/disposition-notification\n\
501 \n\
502 Reporting-UA: Delta Chat 1.88.0\n\
503 Original-Recipient: rfc822;bob@example.org\n\
504 Final-Recipient: rfc822;bob@example.org\n\
505 Original-Message-ID: <foo@example.org>\n\
506 Disposition: manual-action/MDN-sent-automatically; displayed\n\
507 \n\
508 \n\
509 --kJBbU58X1xeWNHgBtTbMk80M5qnV4N--\n\
510 ";
511
512 receive_imf_from_inbox(&bob, "bar@example.org", raw, false, Some(raw.len() as u32)).await?;
514 let msg = bob.get_last_msg().await;
515 let chat_id = msg.chat_id;
516 assert_eq!(get_chat_msgs(&bob, chat_id).await?.len(), 1);
517 assert_eq!(msg.download_state(), DownloadState::Available);
518
519 receive_imf_from_inbox(&bob, "bar@example.org", raw, false, None).await?;
522 assert_eq!(get_chat_msgs(&bob, chat_id).await?.len(), 0);
523 assert!(
524 Message::load_from_db_optional(&bob, msg.id)
525 .await?
526 .is_none()
527 );
528
529 Ok(())
530 }
531
532 #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
536 async fn test_partial_download_trashed() -> Result<()> {
537 let mut tcm = TestContextManager::new();
538 let alice = &tcm.alice().await;
539
540 let imf_raw = b"From: Bob <bob@example.org>\n\
541 To: Alice <alice@example.org>\n\
542 Chat-Version: 1.0\n\
543 Subject: subject\n\
544 Message-ID: <first@example.org>\n\
545 Date: Sun, 14 Nov 2021 00:10:00 +0000\
546 Content-Type: text/plain";
547
548 let partial_received_msg =
550 receive_imf_from_inbox(alice, "first@example.org", imf_raw, false, Some(100000))
551 .await?
552 .unwrap();
553 assert_eq!(partial_received_msg.msg_ids.len(), 1);
554
555 delete_msgs(alice, &[partial_received_msg.msg_ids[0]]).await?;
559
560 let full_received_msg =
562 receive_imf_from_inbox(alice, "first@example.org", imf_raw, false, None).await?;
563
564 assert!(full_received_msg.is_none());
567
568 Ok(())
569 }
570}