1use std::cmp::max;
4use std::collections::BTreeMap;
5
6use anyhow::{Result, anyhow, bail, ensure};
7use deltachat_derive::{FromSql, ToSql};
8use serde::{Deserialize, Serialize};
9
10use crate::config::Config;
11use crate::context::Context;
12use crate::imap::session::Session;
13use crate::message::{Message, MsgId, Viewtype};
14use crate::mimeparser::{MimeMessage, Part};
15use crate::tools::time;
16use crate::{EventType, chatlist_events, stock_str};
17
18pub(crate) const MIN_DOWNLOAD_LIMIT: u32 = 163840;
25
26pub(crate) const MIN_DELETE_SERVER_AFTER: i64 = 48 * 60 * 60;
31
32#[derive(
34 Debug,
35 Default,
36 Display,
37 Clone,
38 Copy,
39 PartialEq,
40 Eq,
41 FromPrimitive,
42 ToPrimitive,
43 FromSql,
44 ToSql,
45 Serialize,
46 Deserialize,
47)]
48#[repr(u32)]
49pub enum DownloadState {
50 #[default]
52 Done = 0,
53
54 Available = 10,
56
57 Failure = 20,
59
60 Undecipherable = 30,
62
63 InProgress = 1000,
65}
66
67impl Context {
68 pub(crate) async fn download_limit(&self) -> Result<Option<u32>> {
70 let download_limit = self.get_config_int(Config::DownloadLimit).await?;
71 if download_limit <= 0 {
72 Ok(None)
73 } else {
74 Ok(Some(max(MIN_DOWNLOAD_LIMIT, download_limit as u32)))
75 }
76 }
77}
78
79impl MsgId {
80 pub async fn download_full(self, context: &Context) -> Result<()> {
82 let msg = Message::load_from_db(context, self).await?;
83 match msg.download_state() {
84 DownloadState::Done | DownloadState::Undecipherable => {
85 return Err(anyhow!("Nothing to download."));
86 }
87 DownloadState::InProgress => return Err(anyhow!("Download already in progress.")),
88 DownloadState::Available | DownloadState::Failure => {
89 self.update_download_state(context, DownloadState::InProgress)
90 .await?;
91 context
92 .sql
93 .execute("INSERT INTO download (msg_id) VALUES (?)", (self,))
94 .await?;
95 context.scheduler.interrupt_inbox().await;
96 }
97 }
98 Ok(())
99 }
100
101 pub(crate) async fn update_download_state(
103 self,
104 context: &Context,
105 download_state: DownloadState,
106 ) -> Result<()> {
107 if context
108 .sql
109 .execute(
110 "UPDATE msgs SET download_state=? WHERE id=?;",
111 (download_state, self),
112 )
113 .await?
114 == 0
115 {
116 return Ok(());
117 }
118 let Some(msg) = Message::load_from_db_optional(context, self).await? else {
119 return Ok(());
120 };
121 context.emit_event(EventType::MsgsChanged {
122 chat_id: msg.chat_id,
123 msg_id: self,
124 });
125 chatlist_events::emit_chatlist_item_changed(context, msg.chat_id);
126 Ok(())
127 }
128}
129
130impl Message {
131 pub fn download_state(&self) -> DownloadState {
133 self.download_state
134 }
135}
136
137pub(crate) async fn download_msg(
141 context: &Context,
142 msg_id: MsgId,
143 session: &mut Session,
144) -> Result<()> {
145 let Some(msg) = Message::load_from_db_optional(context, msg_id).await? else {
146 return Ok(());
154 };
155
156 let row = context
157 .sql
158 .query_row_optional(
159 "SELECT uid, folder FROM imap WHERE rfc724_mid=? AND target!=''",
160 (&msg.rfc724_mid,),
161 |row| {
162 let server_uid: u32 = row.get(0)?;
163 let server_folder: String = row.get(1)?;
164 Ok((server_uid, server_folder))
165 },
166 )
167 .await?;
168
169 let Some((server_uid, server_folder)) = row else {
170 return Err(anyhow!("Call download_full() again to try over."));
172 };
173
174 session
175 .fetch_single_msg(context, &server_folder, server_uid, msg.rfc724_mid.clone())
176 .await?;
177 Ok(())
178}
179
180impl Session {
181 async fn fetch_single_msg(
186 &mut self,
187 context: &Context,
188 folder: &str,
189 uid: u32,
190 rfc724_mid: String,
191 ) -> Result<()> {
192 if uid == 0 {
193 bail!("Attempt to fetch UID 0");
194 }
195
196 let create = false;
197 let folder_exists = self
198 .select_with_uidvalidity(context, folder, create)
199 .await?;
200 ensure!(folder_exists, "No folder {folder}");
201
202 info!(context, "Downloading message {}/{} fully...", folder, uid);
204
205 let mut uid_message_ids: BTreeMap<u32, String> = BTreeMap::new();
206 uid_message_ids.insert(uid, rfc724_mid);
207 let (sender, receiver) = async_channel::unbounded();
208 self.fetch_many_msgs(context, folder, vec![uid], &uid_message_ids, false, sender)
209 .await?;
210 if receiver.recv().await.is_err() {
211 bail!("Failed to fetch UID {uid}");
212 }
213 Ok(())
214 }
215}
216
217impl MimeMessage {
218 pub(crate) async fn create_stub_from_partial_download(
225 &mut self,
226 context: &Context,
227 org_bytes: u32,
228 ) -> Result<()> {
229 let mut text = format!(
230 "[{}]",
231 stock_str::partial_download_msg_body(context, org_bytes).await
232 );
233 if let Some(delete_server_after) = context.get_config_delete_server_after().await? {
234 let until = stock_str::download_availability(
235 context,
236 time() + max(delete_server_after, MIN_DELETE_SERVER_AFTER),
237 )
238 .await;
239 text += format!(" [{until}]").as_str();
240 };
241
242 info!(context, "Partial download: {}", text);
243
244 self.do_add_single_part(Part {
245 typ: Viewtype::Text,
246 msg: text,
247 ..Default::default()
248 });
249
250 Ok(())
251 }
252}
253
254#[cfg(test)]
255mod tests {
256 use num_traits::FromPrimitive;
257
258 use super::*;
259 use crate::chat::{get_chat_msgs, send_msg};
260 use crate::ephemeral::Timer;
261 use crate::message::delete_msgs;
262 use crate::receive_imf::receive_imf_from_inbox;
263 use crate::test_utils::{E2EE_INFO_MSGS, TestContext, TestContextManager};
264
265 #[test]
266 fn test_downloadstate_values() {
267 assert_eq!(DownloadState::Done, DownloadState::default());
269 assert_eq!(DownloadState::Done, DownloadState::from_i32(0).unwrap());
270 assert_eq!(
271 DownloadState::Available,
272 DownloadState::from_i32(10).unwrap()
273 );
274 assert_eq!(DownloadState::Failure, DownloadState::from_i32(20).unwrap());
275 assert_eq!(
276 DownloadState::InProgress,
277 DownloadState::from_i32(1000).unwrap()
278 );
279 }
280
281 #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
282 async fn test_download_limit() -> Result<()> {
283 let t = TestContext::new_alice().await;
284
285 assert_eq!(t.download_limit().await?, None);
286
287 t.set_config(Config::DownloadLimit, Some("200000")).await?;
288 assert_eq!(t.download_limit().await?, Some(200000));
289
290 t.set_config(Config::DownloadLimit, Some("20000")).await?;
291 assert_eq!(t.download_limit().await?, Some(MIN_DOWNLOAD_LIMIT));
292
293 t.set_config(Config::DownloadLimit, None).await?;
294 assert_eq!(t.download_limit().await?, None);
295
296 for val in &["0", "-1", "-100", "", "foo"] {
297 t.set_config(Config::DownloadLimit, Some(val)).await?;
298 assert_eq!(t.download_limit().await?, None);
299 }
300
301 Ok(())
302 }
303
304 #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
305 async fn test_update_download_state() -> Result<()> {
306 let t = TestContext::new_alice().await;
307 let chat = t.create_chat_with_contact("Bob", "bob@example.org").await;
308
309 let mut msg = Message::new_text("Hi Bob".to_owned());
310 let msg_id = send_msg(&t, chat.id, &mut msg).await?;
311 let msg = Message::load_from_db(&t, msg_id).await?;
312 assert_eq!(msg.download_state(), DownloadState::Done);
313
314 for s in &[
315 DownloadState::Available,
316 DownloadState::InProgress,
317 DownloadState::Failure,
318 DownloadState::Done,
319 DownloadState::Done,
320 ] {
321 msg_id.update_download_state(&t, *s).await?;
322 let msg = Message::load_from_db(&t, msg_id).await?;
323 assert_eq!(msg.download_state(), *s);
324 }
325 t.sql
326 .execute("DELETE FROM msgs WHERE id=?", (msg_id,))
327 .await?;
328 msg_id
330 .update_download_state(&t, DownloadState::Done)
331 .await?;
332
333 Ok(())
334 }
335
336 #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
337 async fn test_partial_receive_imf() -> Result<()> {
338 let t = TestContext::new_alice().await;
339
340 let header = "Received: (Postfix, from userid 1000); Mon, 4 Dec 2006 14:51:39 +0100 (CET)\n\
341 From: bob@example.com\n\
342 To: alice@example.org\n\
343 Subject: foo\n\
344 Message-ID: <Mr.12345678901@example.com>\n\
345 Chat-Version: 1.0\n\
346 Date: Sun, 22 Mar 2020 22:37:57 +0000\
347 Content-Type: text/plain";
348
349 receive_imf_from_inbox(
350 &t,
351 "Mr.12345678901@example.com",
352 header.as_bytes(),
353 false,
354 Some(100000),
355 )
356 .await?;
357 let msg = t.get_last_msg().await;
358 assert_eq!(msg.download_state(), DownloadState::Available);
359 assert_eq!(msg.get_subject(), "foo");
360 assert!(
361 msg.get_text()
362 .contains(&stock_str::partial_download_msg_body(&t, 100000).await)
363 );
364
365 receive_imf_from_inbox(
366 &t,
367 "Mr.12345678901@example.com",
368 format!("{header}\n\n100k text...").as_bytes(),
369 false,
370 None,
371 )
372 .await?;
373 let msg = t.get_last_msg().await;
374 assert_eq!(msg.download_state(), DownloadState::Done);
375 assert_eq!(msg.get_subject(), "foo");
376 assert_eq!(msg.get_text(), "100k text...");
377
378 Ok(())
379 }
380
381 #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
382 async fn test_partial_download_and_ephemeral() -> Result<()> {
383 let t = TestContext::new_alice().await;
384 let chat_id = t
385 .create_chat_with_contact("bob", "bob@example.org")
386 .await
387 .id;
388 chat_id
389 .set_ephemeral_timer(&t, Timer::Enabled { duration: 60 })
390 .await?;
391
392 receive_imf_from_inbox(
394 &t,
395 "first@example.org",
396 b"From: Bob <bob@example.org>\n\
397 To: Alice <alice@example.org>\n\
398 Chat-Version: 1.0\n\
399 Subject: subject\n\
400 Message-ID: <first@example.org>\n\
401 Date: Sun, 14 Nov 2021 00:10:00 +0000\
402 Content-Type: text/plain",
403 false,
404 Some(100000),
405 )
406 .await?;
407 assert_eq!(
408 chat_id.get_ephemeral_timer(&t).await?,
409 Timer::Enabled { duration: 60 }
410 );
411
412 Ok(())
413 }
414
415 #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
416 async fn test_status_update_expands_to_nothing() -> Result<()> {
417 let alice = TestContext::new_alice().await;
418 let bob = TestContext::new_bob().await;
419 let chat_id = alice.create_chat(&bob).await.id;
420
421 let file = alice.get_blobdir().join("minimal.xdc");
422 tokio::fs::write(&file, include_bytes!("../test-data/webxdc/minimal.xdc")).await?;
423 let mut instance = Message::new(Viewtype::File);
424 instance.set_file_and_deduplicate(&alice, &file, None, None)?;
425 let _sent1 = alice.send_msg(chat_id, &mut instance).await;
426
427 alice
428 .send_webxdc_status_update(instance.id, r#"{"payload":7}"#)
429 .await?;
430 alice.flush_status_updates().await?;
431 let sent2 = alice.pop_sent_msg().await;
432 let sent2_rfc724_mid = sent2.load_from_db().await.rfc724_mid;
433
434 receive_imf_from_inbox(
436 &bob,
437 &sent2_rfc724_mid,
438 sent2.payload().as_bytes(),
439 false,
440 Some(sent2.payload().len() as u32),
441 )
442 .await?;
443 let msg = bob.get_last_msg().await;
444 let chat_id = msg.chat_id;
445 assert_eq!(
446 get_chat_msgs(&bob, chat_id).await?.len(),
447 E2EE_INFO_MSGS + 1
448 );
449 assert_eq!(msg.download_state(), DownloadState::Available);
450
451 receive_imf_from_inbox(
454 &bob,
455 &sent2_rfc724_mid,
456 sent2.payload().as_bytes(),
457 false,
458 None,
459 )
460 .await?;
461 assert_eq!(get_chat_msgs(&bob, chat_id).await?.len(), E2EE_INFO_MSGS);
462 assert!(
463 Message::load_from_db_optional(&bob, msg.id)
464 .await?
465 .is_none()
466 );
467
468 Ok(())
469 }
470
471 #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
472 async fn test_mdn_expands_to_nothing() -> Result<()> {
473 let bob = TestContext::new_bob().await;
474 let raw = b"Subject: Message opened\n\
475 Date: Mon, 10 Jan 2020 00:00:00 +0000\n\
476 Chat-Version: 1.0\n\
477 Message-ID: <bar@example.org>\n\
478 To: Alice <alice@example.org>\n\
479 From: Bob <bob@example.org>\n\
480 Content-Type: multipart/report; report-type=disposition-notification;\n\t\
481 boundary=\"kJBbU58X1xeWNHgBtTbMk80M5qnV4N\"\n\
482 \n\
483 \n\
484 --kJBbU58X1xeWNHgBtTbMk80M5qnV4N\n\
485 Content-Type: text/plain; charset=utf-8\n\
486 \n\
487 bla\n\
488 \n\
489 \n\
490 --kJBbU58X1xeWNHgBtTbMk80M5qnV4N\n\
491 Content-Type: message/disposition-notification\n\
492 \n\
493 Reporting-UA: Delta Chat 1.88.0\n\
494 Original-Recipient: rfc822;bob@example.org\n\
495 Final-Recipient: rfc822;bob@example.org\n\
496 Original-Message-ID: <foo@example.org>\n\
497 Disposition: manual-action/MDN-sent-automatically; displayed\n\
498 \n\
499 \n\
500 --kJBbU58X1xeWNHgBtTbMk80M5qnV4N--\n\
501 ";
502
503 receive_imf_from_inbox(&bob, "bar@example.org", raw, false, Some(raw.len() as u32)).await?;
505 let msg = bob.get_last_msg().await;
506 let chat_id = msg.chat_id;
507 assert_eq!(get_chat_msgs(&bob, chat_id).await?.len(), 1);
508 assert_eq!(msg.download_state(), DownloadState::Available);
509
510 receive_imf_from_inbox(&bob, "bar@example.org", raw, false, None).await?;
513 assert_eq!(get_chat_msgs(&bob, chat_id).await?.len(), 0);
514 assert!(
515 Message::load_from_db_optional(&bob, msg.id)
516 .await?
517 .is_none()
518 );
519
520 Ok(())
521 }
522
523 #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
527 async fn test_partial_download_trashed() -> Result<()> {
528 let mut tcm = TestContextManager::new();
529 let alice = &tcm.alice().await;
530
531 let imf_raw = b"From: Bob <bob@example.org>\n\
532 To: Alice <alice@example.org>\n\
533 Chat-Version: 1.0\n\
534 Subject: subject\n\
535 Message-ID: <first@example.org>\n\
536 Date: Sun, 14 Nov 2021 00:10:00 +0000\
537 Content-Type: text/plain";
538
539 let partial_received_msg =
541 receive_imf_from_inbox(alice, "first@example.org", imf_raw, false, Some(100000))
542 .await?
543 .unwrap();
544 assert_eq!(partial_received_msg.msg_ids.len(), 1);
545
546 delete_msgs(alice, &[partial_received_msg.msg_ids[0]]).await?;
550
551 let full_received_msg =
553 receive_imf_from_inbox(alice, "first@example.org", imf_raw, false, None).await?;
554
555 assert!(full_received_msg.is_none());
558
559 Ok(())
560 }
561}