1use std::cmp::max;
4use std::collections::BTreeMap;
5
6use anyhow::{Result, anyhow, bail, ensure};
7use deltachat_derive::{FromSql, ToSql};
8use serde::{Deserialize, Serialize};
9
10use crate::config::Config;
11use crate::context::Context;
12use crate::imap::session::Session;
13use crate::message::{Message, MsgId, Viewtype};
14use crate::mimeparser::{MimeMessage, Part};
15use crate::tools::time;
16use crate::{EventType, chatlist_events, stock_str};
17
/// Smallest download limit that is ever applied: 160 KiB (163840).
///
/// [`Context::download_limit`] raises any smaller positive configured value
/// to this minimum.
// NOTE(review): the unit is presumably bytes -- confirm against the callers.
pub(crate) const MIN_DOWNLOAD_LIMIT: u32 = 160 * 1024;

/// Lower bound (2 days, i.e. 48 hours) used instead of a smaller
/// `delete_server_after` setting when the stub text of a partially downloaded
/// message announces how long the full message stays available.
// NOTE(review): presumably seconds, as the value is added to `time()` -- confirm.
pub(crate) const MIN_DELETE_SERVER_AFTER: i64 = 2 * 24 * 60 * 60;
31
/// Download state of a message.
///
/// The state is stored in the `download_state` column of the `msgs` table
/// (see `MsgId::update_download_state`), so the explicit discriminants below
/// should be treated as stable values.
#[derive(
    Debug,
    Default,
    Display,
    Clone,
    Copy,
    PartialEq,
    Eq,
    FromPrimitive,
    ToPrimitive,
    FromSql,
    ToSql,
    Serialize,
    Deserialize,
)]
#[repr(u32)]
pub enum DownloadState {
    /// The message is fully downloaded; this is the default state.
    ///
    /// `MsgId::download_full` rejects such messages with "Nothing to download.".
    #[default]
    Done = 0,

    /// Only a stub of the message is present; the full message can be
    /// requested with `MsgId::download_full`.
    Available = 10,

    /// `MsgId::download_full` accepts messages in this state again, i.e. the
    /// download can be retried.
    // NOTE(review): presumably set when a previous full download failed -- confirm.
    Failure = 20,

    /// Treated like `Done` by `MsgId::download_full`: there is nothing to download.
    // NOTE(review): presumably marks a message that could not be decrypted -- confirm.
    Undecipherable = 30,

    /// A full download was requested via `MsgId::download_full` and has not
    /// finished yet; further requests are rejected meanwhile.
    InProgress = 1000,
}
66
67impl Context {
68 pub(crate) async fn download_limit(&self) -> Result<Option<u32>> {
70 let download_limit = self.get_config_int(Config::DownloadLimit).await?;
71 if download_limit <= 0 {
72 Ok(None)
73 } else {
74 Ok(Some(max(MIN_DOWNLOAD_LIMIT, download_limit as u32)))
75 }
76 }
77}
78
79impl MsgId {
80 pub async fn download_full(self, context: &Context) -> Result<()> {
82 let msg = Message::load_from_db(context, self).await?;
83 match msg.download_state() {
84 DownloadState::Done | DownloadState::Undecipherable => {
85 return Err(anyhow!("Nothing to download."));
86 }
87 DownloadState::InProgress => return Err(anyhow!("Download already in progress.")),
88 DownloadState::Available | DownloadState::Failure => {
89 self.update_download_state(context, DownloadState::InProgress)
90 .await?;
91 context
92 .sql
93 .execute("INSERT INTO download (msg_id) VALUES (?)", (self,))
94 .await?;
95 context.scheduler.interrupt_inbox().await;
96 }
97 }
98 Ok(())
99 }
100
101 pub(crate) async fn update_download_state(
103 self,
104 context: &Context,
105 download_state: DownloadState,
106 ) -> Result<()> {
107 if context
108 .sql
109 .execute(
110 "UPDATE msgs SET download_state=? WHERE id=?;",
111 (download_state, self),
112 )
113 .await?
114 == 0
115 {
116 return Ok(());
117 }
118 let Some(msg) = Message::load_from_db_optional(context, self).await? else {
119 return Ok(());
120 };
121 context.emit_event(EventType::MsgsChanged {
122 chat_id: msg.chat_id,
123 msg_id: self,
124 });
125 chatlist_events::emit_chatlist_item_changed(context, msg.chat_id);
126 Ok(())
127 }
128}
129
impl Message {
    /// Returns the download state of the message.
    pub fn download_state(&self) -> DownloadState {
        self.download_state
    }
}
136
137pub(crate) async fn download_msg(
141 context: &Context,
142 msg_id: MsgId,
143 session: &mut Session,
144) -> Result<()> {
145 let Some(msg) = Message::load_from_db_optional(context, msg_id).await? else {
146 return Ok(());
154 };
155
156 let transport_id = session.transport_id();
157 let row = context
158 .sql
159 .query_row_optional(
160 "SELECT uid, folder FROM imap
161 WHERE rfc724_mid=?
162 AND transport_id=?
163 AND target!=''",
164 (&msg.rfc724_mid, transport_id),
165 |row| {
166 let server_uid: u32 = row.get(0)?;
167 let server_folder: String = row.get(1)?;
168 Ok((server_uid, server_folder))
169 },
170 )
171 .await?;
172
173 let Some((server_uid, server_folder)) = row else {
174 return Err(anyhow!("Call download_full() again to try over."));
176 };
177
178 session
179 .fetch_single_msg(context, &server_folder, server_uid, msg.rfc724_mid.clone())
180 .await?;
181 Ok(())
182}
183
184impl Session {
185 async fn fetch_single_msg(
190 &mut self,
191 context: &Context,
192 folder: &str,
193 uid: u32,
194 rfc724_mid: String,
195 ) -> Result<()> {
196 if uid == 0 {
197 bail!("Attempt to fetch UID 0");
198 }
199
200 let create = false;
201 let folder_exists = self
202 .select_with_uidvalidity(context, folder, create)
203 .await?;
204 ensure!(folder_exists, "No folder {folder}");
205
206 info!(context, "Downloading message {}/{} fully...", folder, uid);
208
209 let mut uid_message_ids: BTreeMap<u32, String> = BTreeMap::new();
210 uid_message_ids.insert(uid, rfc724_mid);
211 let (sender, receiver) = async_channel::unbounded();
212 self.fetch_many_msgs(context, folder, vec![uid], &uid_message_ids, false, sender)
213 .await?;
214 if receiver.recv().await.is_err() {
215 bail!("Failed to fetch UID {uid}");
216 }
217 Ok(())
218 }
219}
220
221impl MimeMessage {
222 pub(crate) async fn create_stub_from_partial_download(
229 &mut self,
230 context: &Context,
231 org_bytes: u32,
232 ) -> Result<()> {
233 let mut text = format!(
234 "[{}]",
235 stock_str::partial_download_msg_body(context, org_bytes).await
236 );
237 if let Some(delete_server_after) = context.get_config_delete_server_after().await? {
238 let until = stock_str::download_availability(
239 context,
240 time() + max(delete_server_after, MIN_DELETE_SERVER_AFTER),
241 )
242 .await;
243 text += format!(" [{until}]").as_str();
244 };
245
246 info!(context, "Partial download: {}", text);
247
248 self.do_add_single_part(Part {
249 typ: Viewtype::Text,
250 msg: text,
251 ..Default::default()
252 });
253
254 Ok(())
255 }
256}
257
258#[cfg(test)]
259mod tests {
260 use num_traits::FromPrimitive;
261
262 use super::*;
263 use crate::chat::{get_chat_msgs, send_msg};
264 use crate::ephemeral::Timer;
265 use crate::message::delete_msgs;
266 use crate::receive_imf::receive_imf_from_inbox;
267 use crate::test_utils::{E2EE_INFO_MSGS, TestContext, TestContextManager};
268
269 #[test]
270 fn test_downloadstate_values() {
271 assert_eq!(DownloadState::Done, DownloadState::default());
273 assert_eq!(DownloadState::Done, DownloadState::from_i32(0).unwrap());
274 assert_eq!(
275 DownloadState::Available,
276 DownloadState::from_i32(10).unwrap()
277 );
278 assert_eq!(DownloadState::Failure, DownloadState::from_i32(20).unwrap());
279 assert_eq!(
280 DownloadState::InProgress,
281 DownloadState::from_i32(1000).unwrap()
282 );
283 }
284
285 #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
286 async fn test_download_limit() -> Result<()> {
287 let t = TestContext::new_alice().await;
288
289 assert_eq!(t.download_limit().await?, None);
290
291 t.set_config(Config::DownloadLimit, Some("200000")).await?;
292 assert_eq!(t.download_limit().await?, Some(200000));
293
294 t.set_config(Config::DownloadLimit, Some("20000")).await?;
295 assert_eq!(t.download_limit().await?, Some(MIN_DOWNLOAD_LIMIT));
296
297 t.set_config(Config::DownloadLimit, None).await?;
298 assert_eq!(t.download_limit().await?, None);
299
300 for val in &["0", "-1", "-100", "", "foo"] {
301 t.set_config(Config::DownloadLimit, Some(val)).await?;
302 assert_eq!(t.download_limit().await?, None);
303 }
304
305 Ok(())
306 }
307
308 #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
309 async fn test_update_download_state() -> Result<()> {
310 let t = TestContext::new_alice().await;
311 let chat = t.create_chat_with_contact("Bob", "bob@example.org").await;
312
313 let mut msg = Message::new_text("Hi Bob".to_owned());
314 let msg_id = send_msg(&t, chat.id, &mut msg).await?;
315 let msg = Message::load_from_db(&t, msg_id).await?;
316 assert_eq!(msg.download_state(), DownloadState::Done);
317
318 for s in &[
319 DownloadState::Available,
320 DownloadState::InProgress,
321 DownloadState::Failure,
322 DownloadState::Done,
323 DownloadState::Done,
324 ] {
325 msg_id.update_download_state(&t, *s).await?;
326 let msg = Message::load_from_db(&t, msg_id).await?;
327 assert_eq!(msg.download_state(), *s);
328 }
329 t.sql
330 .execute("DELETE FROM msgs WHERE id=?", (msg_id,))
331 .await?;
332 msg_id
334 .update_download_state(&t, DownloadState::Done)
335 .await?;
336
337 Ok(())
338 }
339
340 #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
341 async fn test_partial_receive_imf() -> Result<()> {
342 let t = TestContext::new_alice().await;
343
344 let header = "Received: (Postfix, from userid 1000); Mon, 4 Dec 2006 14:51:39 +0100 (CET)\n\
345 From: bob@example.com\n\
346 To: alice@example.org\n\
347 Subject: foo\n\
348 Message-ID: <Mr.12345678901@example.com>\n\
349 Chat-Version: 1.0\n\
350 Date: Sun, 22 Mar 2020 22:37:57 +0000\
351 Content-Type: text/plain";
352
353 receive_imf_from_inbox(
354 &t,
355 "Mr.12345678901@example.com",
356 header.as_bytes(),
357 false,
358 Some(100000),
359 )
360 .await?;
361 let msg = t.get_last_msg().await;
362 assert_eq!(msg.download_state(), DownloadState::Available);
363 assert_eq!(msg.get_subject(), "foo");
364 assert!(
365 msg.get_text()
366 .contains(&stock_str::partial_download_msg_body(&t, 100000).await)
367 );
368
369 receive_imf_from_inbox(
370 &t,
371 "Mr.12345678901@example.com",
372 format!("{header}\n\n100k text...").as_bytes(),
373 false,
374 None,
375 )
376 .await?;
377 let msg = t.get_last_msg().await;
378 assert_eq!(msg.download_state(), DownloadState::Done);
379 assert_eq!(msg.get_subject(), "foo");
380 assert_eq!(msg.get_text(), "100k text...");
381
382 Ok(())
383 }
384
385 #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
386 async fn test_partial_download_and_ephemeral() -> Result<()> {
387 let t = TestContext::new_alice().await;
388 let chat_id = t
389 .create_chat_with_contact("bob", "bob@example.org")
390 .await
391 .id;
392 chat_id
393 .set_ephemeral_timer(&t, Timer::Enabled { duration: 60 })
394 .await?;
395
396 receive_imf_from_inbox(
398 &t,
399 "first@example.org",
400 b"From: Bob <bob@example.org>\n\
401 To: Alice <alice@example.org>\n\
402 Chat-Version: 1.0\n\
403 Subject: subject\n\
404 Message-ID: <first@example.org>\n\
405 Date: Sun, 14 Nov 2021 00:10:00 +0000\
406 Content-Type: text/plain",
407 false,
408 Some(100000),
409 )
410 .await?;
411 assert_eq!(
412 chat_id.get_ephemeral_timer(&t).await?,
413 Timer::Enabled { duration: 60 }
414 );
415
416 Ok(())
417 }
418
419 #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
420 async fn test_status_update_expands_to_nothing() -> Result<()> {
421 let alice = TestContext::new_alice().await;
422 let bob = TestContext::new_bob().await;
423 let chat_id = alice.create_chat(&bob).await.id;
424
425 let file = alice.get_blobdir().join("minimal.xdc");
426 tokio::fs::write(&file, include_bytes!("../test-data/webxdc/minimal.xdc")).await?;
427 let mut instance = Message::new(Viewtype::File);
428 instance.set_file_and_deduplicate(&alice, &file, None, None)?;
429 let _sent1 = alice.send_msg(chat_id, &mut instance).await;
430
431 alice
432 .send_webxdc_status_update(instance.id, r#"{"payload":7}"#)
433 .await?;
434 alice.flush_status_updates().await?;
435 let sent2 = alice.pop_sent_msg().await;
436 let sent2_rfc724_mid = sent2.load_from_db().await.rfc724_mid;
437
438 receive_imf_from_inbox(
440 &bob,
441 &sent2_rfc724_mid,
442 sent2.payload().as_bytes(),
443 false,
444 Some(sent2.payload().len() as u32),
445 )
446 .await?;
447 let msg = bob.get_last_msg().await;
448 let chat_id = msg.chat_id;
449 assert_eq!(
450 get_chat_msgs(&bob, chat_id).await?.len(),
451 E2EE_INFO_MSGS + 1
452 );
453 assert_eq!(msg.download_state(), DownloadState::Available);
454
455 receive_imf_from_inbox(
458 &bob,
459 &sent2_rfc724_mid,
460 sent2.payload().as_bytes(),
461 false,
462 None,
463 )
464 .await?;
465 assert_eq!(get_chat_msgs(&bob, chat_id).await?.len(), E2EE_INFO_MSGS);
466 assert!(
467 Message::load_from_db_optional(&bob, msg.id)
468 .await?
469 .is_none()
470 );
471
472 Ok(())
473 }
474
475 #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
476 async fn test_mdn_expands_to_nothing() -> Result<()> {
477 let bob = TestContext::new_bob().await;
478 let raw = b"Subject: Message opened\n\
479 Date: Mon, 10 Jan 2020 00:00:00 +0000\n\
480 Chat-Version: 1.0\n\
481 Message-ID: <bar@example.org>\n\
482 To: Alice <alice@example.org>\n\
483 From: Bob <bob@example.org>\n\
484 Content-Type: multipart/report; report-type=disposition-notification;\n\t\
485 boundary=\"kJBbU58X1xeWNHgBtTbMk80M5qnV4N\"\n\
486 \n\
487 \n\
488 --kJBbU58X1xeWNHgBtTbMk80M5qnV4N\n\
489 Content-Type: text/plain; charset=utf-8\n\
490 \n\
491 bla\n\
492 \n\
493 \n\
494 --kJBbU58X1xeWNHgBtTbMk80M5qnV4N\n\
495 Content-Type: message/disposition-notification\n\
496 \n\
497 Reporting-UA: Delta Chat 1.88.0\n\
498 Original-Recipient: rfc822;bob@example.org\n\
499 Final-Recipient: rfc822;bob@example.org\n\
500 Original-Message-ID: <foo@example.org>\n\
501 Disposition: manual-action/MDN-sent-automatically; displayed\n\
502 \n\
503 \n\
504 --kJBbU58X1xeWNHgBtTbMk80M5qnV4N--\n\
505 ";
506
507 receive_imf_from_inbox(&bob, "bar@example.org", raw, false, Some(raw.len() as u32)).await?;
509 let msg = bob.get_last_msg().await;
510 let chat_id = msg.chat_id;
511 assert_eq!(get_chat_msgs(&bob, chat_id).await?.len(), 1);
512 assert_eq!(msg.download_state(), DownloadState::Available);
513
514 receive_imf_from_inbox(&bob, "bar@example.org", raw, false, None).await?;
517 assert_eq!(get_chat_msgs(&bob, chat_id).await?.len(), 0);
518 assert!(
519 Message::load_from_db_optional(&bob, msg.id)
520 .await?
521 .is_none()
522 );
523
524 Ok(())
525 }
526
527 #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
531 async fn test_partial_download_trashed() -> Result<()> {
532 let mut tcm = TestContextManager::new();
533 let alice = &tcm.alice().await;
534
535 let imf_raw = b"From: Bob <bob@example.org>\n\
536 To: Alice <alice@example.org>\n\
537 Chat-Version: 1.0\n\
538 Subject: subject\n\
539 Message-ID: <first@example.org>\n\
540 Date: Sun, 14 Nov 2021 00:10:00 +0000\
541 Content-Type: text/plain";
542
543 let partial_received_msg =
545 receive_imf_from_inbox(alice, "first@example.org", imf_raw, false, Some(100000))
546 .await?
547 .unwrap();
548 assert_eq!(partial_received_msg.msg_ids.len(), 1);
549
550 delete_msgs(alice, &[partial_received_msg.msg_ids[0]]).await?;
554
555 let full_received_msg =
557 receive_imf_from_inbox(alice, "first@example.org", imf_raw, false, None).await?;
558
559 assert!(full_received_msg.is_none());
562
563 Ok(())
564 }
565}