1use std::cmp::max;
4use std::collections::BTreeMap;
5
6use anyhow::{Result, anyhow, bail, ensure};
7use deltachat_derive::{FromSql, ToSql};
8use serde::{Deserialize, Serialize};
9
10use crate::config::Config;
11use crate::context::Context;
12use crate::imap::session::Session;
13use crate::log::info;
14use crate::message::{Message, MsgId, Viewtype};
15use crate::mimeparser::{MimeMessage, Part};
16use crate::tools::time;
17use crate::{EventType, chatlist_events, stock_str};
18
19pub(crate) const MIN_DOWNLOAD_LIMIT: u32 = 163840;
26
27pub(crate) const MIN_DELETE_SERVER_AFTER: i64 = 48 * 60 * 60;
32
33#[derive(
35 Debug,
36 Default,
37 Display,
38 Clone,
39 Copy,
40 PartialEq,
41 Eq,
42 FromPrimitive,
43 ToPrimitive,
44 FromSql,
45 ToSql,
46 Serialize,
47 Deserialize,
48)]
49#[repr(u32)]
50pub enum DownloadState {
51 #[default]
53 Done = 0,
54
55 Available = 10,
57
58 Failure = 20,
60
61 Undecipherable = 30,
63
64 InProgress = 1000,
66}
67
68impl Context {
69 pub(crate) async fn download_limit(&self) -> Result<Option<u32>> {
71 let download_limit = self.get_config_int(Config::DownloadLimit).await?;
72 if download_limit <= 0 {
73 Ok(None)
74 } else {
75 Ok(Some(max(MIN_DOWNLOAD_LIMIT, download_limit as u32)))
76 }
77 }
78}
79
80impl MsgId {
81 pub async fn download_full(self, context: &Context) -> Result<()> {
83 let msg = Message::load_from_db(context, self).await?;
84 match msg.download_state() {
85 DownloadState::Done | DownloadState::Undecipherable => {
86 return Err(anyhow!("Nothing to download."));
87 }
88 DownloadState::InProgress => return Err(anyhow!("Download already in progress.")),
89 DownloadState::Available | DownloadState::Failure => {
90 self.update_download_state(context, DownloadState::InProgress)
91 .await?;
92 context
93 .sql
94 .execute("INSERT INTO download (msg_id) VALUES (?)", (self,))
95 .await?;
96 context.scheduler.interrupt_inbox().await;
97 }
98 }
99 Ok(())
100 }
101
102 pub(crate) async fn update_download_state(
104 self,
105 context: &Context,
106 download_state: DownloadState,
107 ) -> Result<()> {
108 if context
109 .sql
110 .execute(
111 "UPDATE msgs SET download_state=? WHERE id=?;",
112 (download_state, self),
113 )
114 .await?
115 == 0
116 {
117 return Ok(());
118 }
119 let Some(msg) = Message::load_from_db_optional(context, self).await? else {
120 return Ok(());
121 };
122 context.emit_event(EventType::MsgsChanged {
123 chat_id: msg.chat_id,
124 msg_id: self,
125 });
126 chatlist_events::emit_chatlist_item_changed(context, msg.chat_id);
127 Ok(())
128 }
129}
130
131impl Message {
132 pub fn download_state(&self) -> DownloadState {
134 self.download_state
135 }
136}
137
138pub(crate) async fn download_msg(
142 context: &Context,
143 msg_id: MsgId,
144 session: &mut Session,
145) -> Result<()> {
146 let Some(msg) = Message::load_from_db_optional(context, msg_id).await? else {
147 return Ok(());
155 };
156
157 let row = context
158 .sql
159 .query_row_optional(
160 "SELECT uid, folder, uidvalidity FROM imap WHERE rfc724_mid=? AND target!=''",
161 (&msg.rfc724_mid,),
162 |row| {
163 let server_uid: u32 = row.get(0)?;
164 let server_folder: String = row.get(1)?;
165 let uidvalidity: u32 = row.get(2)?;
166 Ok((server_uid, server_folder, uidvalidity))
167 },
168 )
169 .await?;
170
171 let Some((server_uid, server_folder, uidvalidity)) = row else {
172 return Err(anyhow!("Call download_full() again to try over."));
174 };
175
176 session
177 .fetch_single_msg(
178 context,
179 &server_folder,
180 uidvalidity,
181 server_uid,
182 msg.rfc724_mid.clone(),
183 )
184 .await?;
185 Ok(())
186}
187
188impl Session {
189 async fn fetch_single_msg(
194 &mut self,
195 context: &Context,
196 folder: &str,
197 uidvalidity: u32,
198 uid: u32,
199 rfc724_mid: String,
200 ) -> Result<()> {
201 if uid == 0 {
202 bail!("Attempt to fetch UID 0");
203 }
204
205 let create = false;
206 let folder_exists = self
207 .select_with_uidvalidity(context, folder, create)
208 .await?;
209 ensure!(folder_exists, "No folder {folder}");
210
211 info!(context, "Downloading message {}/{} fully...", folder, uid);
213
214 let mut uid_message_ids: BTreeMap<u32, String> = BTreeMap::new();
215 uid_message_ids.insert(uid, rfc724_mid);
216 let (sender, receiver) = async_channel::unbounded();
217 self.fetch_many_msgs(
218 context,
219 folder,
220 uidvalidity,
221 vec![uid],
222 &uid_message_ids,
223 false,
224 sender,
225 )
226 .await?;
227 if receiver.recv().await.is_err() {
228 bail!("Failed to fetch UID {uid}");
229 }
230 Ok(())
231 }
232}
233
234impl MimeMessage {
235 pub(crate) async fn create_stub_from_partial_download(
244 &mut self,
245 context: &Context,
246 org_bytes: u32,
247 error: Option<String>,
248 ) -> Result<()> {
249 let prefix = match error {
250 None => "",
251 Some(_) => "[❗] ",
252 };
253 let mut text = format!(
254 "{prefix}[{}]",
255 stock_str::partial_download_msg_body(context, org_bytes).await
256 );
257 if let Some(delete_server_after) = context.get_config_delete_server_after().await? {
258 let until = stock_str::download_availability(
259 context,
260 time() + max(delete_server_after, MIN_DELETE_SERVER_AFTER),
261 )
262 .await;
263 text += format!(" [{until}]").as_str();
264 };
265
266 info!(context, "Partial download: {}", text);
267
268 self.do_add_single_part(Part {
269 typ: Viewtype::Text,
270 msg: text,
271 error,
272 ..Default::default()
273 });
274
275 Ok(())
276 }
277}
278
279#[cfg(test)]
280mod tests {
281 use num_traits::FromPrimitive;
282
283 use super::*;
284 use crate::chat::{get_chat_msgs, send_msg};
285 use crate::ephemeral::Timer;
286 use crate::message::delete_msgs;
287 use crate::receive_imf::receive_imf_from_inbox;
288 use crate::test_utils::{E2EE_INFO_MSGS, TestContext, TestContextManager};
289
290 #[test]
291 fn test_downloadstate_values() {
292 assert_eq!(DownloadState::Done, DownloadState::default());
294 assert_eq!(DownloadState::Done, DownloadState::from_i32(0).unwrap());
295 assert_eq!(
296 DownloadState::Available,
297 DownloadState::from_i32(10).unwrap()
298 );
299 assert_eq!(DownloadState::Failure, DownloadState::from_i32(20).unwrap());
300 assert_eq!(
301 DownloadState::InProgress,
302 DownloadState::from_i32(1000).unwrap()
303 );
304 }
305
306 #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
307 async fn test_download_limit() -> Result<()> {
308 let t = TestContext::new_alice().await;
309
310 assert_eq!(t.download_limit().await?, None);
311
312 t.set_config(Config::DownloadLimit, Some("200000")).await?;
313 assert_eq!(t.download_limit().await?, Some(200000));
314
315 t.set_config(Config::DownloadLimit, Some("20000")).await?;
316 assert_eq!(t.download_limit().await?, Some(MIN_DOWNLOAD_LIMIT));
317
318 t.set_config(Config::DownloadLimit, None).await?;
319 assert_eq!(t.download_limit().await?, None);
320
321 for val in &["0", "-1", "-100", "", "foo"] {
322 t.set_config(Config::DownloadLimit, Some(val)).await?;
323 assert_eq!(t.download_limit().await?, None);
324 }
325
326 Ok(())
327 }
328
329 #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
330 async fn test_update_download_state() -> Result<()> {
331 let t = TestContext::new_alice().await;
332 let chat = t.create_chat_with_contact("Bob", "bob@example.org").await;
333
334 let mut msg = Message::new_text("Hi Bob".to_owned());
335 let msg_id = send_msg(&t, chat.id, &mut msg).await?;
336 let msg = Message::load_from_db(&t, msg_id).await?;
337 assert_eq!(msg.download_state(), DownloadState::Done);
338
339 for s in &[
340 DownloadState::Available,
341 DownloadState::InProgress,
342 DownloadState::Failure,
343 DownloadState::Done,
344 DownloadState::Done,
345 ] {
346 msg_id.update_download_state(&t, *s).await?;
347 let msg = Message::load_from_db(&t, msg_id).await?;
348 assert_eq!(msg.download_state(), *s);
349 }
350 t.sql
351 .execute("DELETE FROM msgs WHERE id=?", (msg_id,))
352 .await?;
353 msg_id
355 .update_download_state(&t, DownloadState::Done)
356 .await?;
357
358 Ok(())
359 }
360
361 #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
362 async fn test_partial_receive_imf() -> Result<()> {
363 let t = TestContext::new_alice().await;
364
365 let header = "Received: (Postfix, from userid 1000); Mon, 4 Dec 2006 14:51:39 +0100 (CET)\n\
366 From: bob@example.com\n\
367 To: alice@example.org\n\
368 Subject: foo\n\
369 Message-ID: <Mr.12345678901@example.com>\n\
370 Chat-Version: 1.0\n\
371 Date: Sun, 22 Mar 2020 22:37:57 +0000\
372 Content-Type: text/plain";
373
374 receive_imf_from_inbox(
375 &t,
376 "Mr.12345678901@example.com",
377 header.as_bytes(),
378 false,
379 Some(100000),
380 )
381 .await?;
382 let msg = t.get_last_msg().await;
383 assert_eq!(msg.download_state(), DownloadState::Available);
384 assert_eq!(msg.get_subject(), "foo");
385 assert!(
386 msg.get_text()
387 .contains(&stock_str::partial_download_msg_body(&t, 100000).await)
388 );
389
390 receive_imf_from_inbox(
391 &t,
392 "Mr.12345678901@example.com",
393 format!("{header}\n\n100k text...").as_bytes(),
394 false,
395 None,
396 )
397 .await?;
398 let msg = t.get_last_msg().await;
399 assert_eq!(msg.download_state(), DownloadState::Done);
400 assert_eq!(msg.get_subject(), "foo");
401 assert_eq!(msg.get_text(), "100k text...");
402
403 Ok(())
404 }
405
406 #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
407 async fn test_partial_download_and_ephemeral() -> Result<()> {
408 let t = TestContext::new_alice().await;
409 let chat_id = t
410 .create_chat_with_contact("bob", "bob@example.org")
411 .await
412 .id;
413 chat_id
414 .set_ephemeral_timer(&t, Timer::Enabled { duration: 60 })
415 .await?;
416
417 receive_imf_from_inbox(
419 &t,
420 "first@example.org",
421 b"From: Bob <bob@example.org>\n\
422 To: Alice <alice@example.org>\n\
423 Chat-Version: 1.0\n\
424 Subject: subject\n\
425 Message-ID: <first@example.org>\n\
426 Date: Sun, 14 Nov 2021 00:10:00 +0000\
427 Content-Type: text/plain",
428 false,
429 Some(100000),
430 )
431 .await?;
432 assert_eq!(
433 chat_id.get_ephemeral_timer(&t).await?,
434 Timer::Enabled { duration: 60 }
435 );
436
437 Ok(())
438 }
439
440 #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
441 async fn test_status_update_expands_to_nothing() -> Result<()> {
442 let alice = TestContext::new_alice().await;
443 let bob = TestContext::new_bob().await;
444 let chat_id = alice.create_chat(&bob).await.id;
445
446 let file = alice.get_blobdir().join("minimal.xdc");
447 tokio::fs::write(&file, include_bytes!("../test-data/webxdc/minimal.xdc")).await?;
448 let mut instance = Message::new(Viewtype::File);
449 instance.set_file_and_deduplicate(&alice, &file, None, None)?;
450 let _sent1 = alice.send_msg(chat_id, &mut instance).await;
451
452 alice
453 .send_webxdc_status_update(instance.id, r#"{"payload":7}"#)
454 .await?;
455 alice.flush_status_updates().await?;
456 let sent2 = alice.pop_sent_msg().await;
457 let sent2_rfc724_mid = sent2.load_from_db().await.rfc724_mid;
458
459 receive_imf_from_inbox(
461 &bob,
462 &sent2_rfc724_mid,
463 sent2.payload().as_bytes(),
464 false,
465 Some(sent2.payload().len() as u32),
466 )
467 .await?;
468 let msg = bob.get_last_msg().await;
469 let chat_id = msg.chat_id;
470 assert_eq!(
471 get_chat_msgs(&bob, chat_id).await?.len(),
472 E2EE_INFO_MSGS + 1
473 );
474 assert_eq!(msg.download_state(), DownloadState::Available);
475
476 receive_imf_from_inbox(
479 &bob,
480 &sent2_rfc724_mid,
481 sent2.payload().as_bytes(),
482 false,
483 None,
484 )
485 .await?;
486 assert_eq!(get_chat_msgs(&bob, chat_id).await?.len(), E2EE_INFO_MSGS);
487 assert!(
488 Message::load_from_db_optional(&bob, msg.id)
489 .await?
490 .is_none()
491 );
492
493 Ok(())
494 }
495
496 #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
497 async fn test_mdn_expands_to_nothing() -> Result<()> {
498 let bob = TestContext::new_bob().await;
499 let raw = b"Subject: Message opened\n\
500 Date: Mon, 10 Jan 2020 00:00:00 +0000\n\
501 Chat-Version: 1.0\n\
502 Message-ID: <bar@example.org>\n\
503 To: Alice <alice@example.org>\n\
504 From: Bob <bob@example.org>\n\
505 Content-Type: multipart/report; report-type=disposition-notification;\n\t\
506 boundary=\"kJBbU58X1xeWNHgBtTbMk80M5qnV4N\"\n\
507 \n\
508 \n\
509 --kJBbU58X1xeWNHgBtTbMk80M5qnV4N\n\
510 Content-Type: text/plain; charset=utf-8\n\
511 \n\
512 bla\n\
513 \n\
514 \n\
515 --kJBbU58X1xeWNHgBtTbMk80M5qnV4N\n\
516 Content-Type: message/disposition-notification\n\
517 \n\
518 Reporting-UA: Delta Chat 1.88.0\n\
519 Original-Recipient: rfc822;bob@example.org\n\
520 Final-Recipient: rfc822;bob@example.org\n\
521 Original-Message-ID: <foo@example.org>\n\
522 Disposition: manual-action/MDN-sent-automatically; displayed\n\
523 \n\
524 \n\
525 --kJBbU58X1xeWNHgBtTbMk80M5qnV4N--\n\
526 ";
527
528 receive_imf_from_inbox(&bob, "bar@example.org", raw, false, Some(raw.len() as u32)).await?;
530 let msg = bob.get_last_msg().await;
531 let chat_id = msg.chat_id;
532 assert_eq!(get_chat_msgs(&bob, chat_id).await?.len(), 1);
533 assert_eq!(msg.download_state(), DownloadState::Available);
534
535 receive_imf_from_inbox(&bob, "bar@example.org", raw, false, None).await?;
538 assert_eq!(get_chat_msgs(&bob, chat_id).await?.len(), 0);
539 assert!(
540 Message::load_from_db_optional(&bob, msg.id)
541 .await?
542 .is_none()
543 );
544
545 Ok(())
546 }
547
548 #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
552 async fn test_partial_download_trashed() -> Result<()> {
553 let mut tcm = TestContextManager::new();
554 let alice = &tcm.alice().await;
555
556 let imf_raw = b"From: Bob <bob@example.org>\n\
557 To: Alice <alice@example.org>\n\
558 Chat-Version: 1.0\n\
559 Subject: subject\n\
560 Message-ID: <first@example.org>\n\
561 Date: Sun, 14 Nov 2021 00:10:00 +0000\
562 Content-Type: text/plain";
563
564 let partial_received_msg =
566 receive_imf_from_inbox(alice, "first@example.org", imf_raw, false, Some(100000))
567 .await?
568 .unwrap();
569 assert_eq!(partial_received_msg.msg_ids.len(), 1);
570
571 delete_msgs(alice, &[partial_received_msg.msg_ids[0]]).await?;
575
576 let full_received_msg =
578 receive_imf_from_inbox(alice, "first@example.org", imf_raw, false, None).await?;
579
580 assert!(full_received_msg.is_none());
583
584 Ok(())
585 }
586}