deltachat/
download.rs

1//! # Download large messages manually.
2
3use std::cmp::max;
4use std::collections::BTreeMap;
5
6use anyhow::{Result, anyhow, bail, ensure};
7use deltachat_derive::{FromSql, ToSql};
8use serde::{Deserialize, Serialize};
9
10use crate::config::Config;
11use crate::context::Context;
12use crate::imap::session::Session;
13use crate::log::info;
14use crate::message::{Message, MsgId, Viewtype};
15use crate::mimeparser::{MimeMessage, Part};
16use crate::tools::time;
17use crate::{EventType, chatlist_events, stock_str};
18
19/// Download limits should not be used below `MIN_DOWNLOAD_LIMIT`.
20///
21/// For better UX, some messages as add-member, non-delivery-reports (NDN) or read-receipts (MDN)
22/// should always be downloaded completely to handle them correctly,
23/// also in larger groups and if group and contact avatar are attached.
24/// Most of these cases are caught by `MIN_DOWNLOAD_LIMIT`.
25pub(crate) const MIN_DOWNLOAD_LIMIT: u32 = 163840;
26
27/// If a message is downloaded only partially
28/// and `delete_server_after` is set to small timeouts (eg. "at once"),
29/// the user might have no chance to actually download that message.
30/// `MIN_DELETE_SERVER_AFTER` increases the timeout in this case.
31pub(crate) const MIN_DELETE_SERVER_AFTER: i64 = 48 * 60 * 60;
32
33/// Download state of the message.
34#[derive(
35    Debug,
36    Default,
37    Display,
38    Clone,
39    Copy,
40    PartialEq,
41    Eq,
42    FromPrimitive,
43    ToPrimitive,
44    FromSql,
45    ToSql,
46    Serialize,
47    Deserialize,
48)]
49#[repr(u32)]
50pub enum DownloadState {
51    /// Message is fully downloaded.
52    #[default]
53    Done = 0,
54
55    /// Message is partially downloaded and can be fully downloaded at request.
56    Available = 10,
57
58    /// Failed to fully download the message.
59    Failure = 20,
60
61    /// Undecipherable message.
62    Undecipherable = 30,
63
64    /// Full download of the message is in progress.
65    InProgress = 1000,
66}
67
68impl Context {
69    // Returns validated download limit or `None` for "no limit".
70    pub(crate) async fn download_limit(&self) -> Result<Option<u32>> {
71        let download_limit = self.get_config_int(Config::DownloadLimit).await?;
72        if download_limit <= 0 {
73            Ok(None)
74        } else {
75            Ok(Some(max(MIN_DOWNLOAD_LIMIT, download_limit as u32)))
76        }
77    }
78}
79
80impl MsgId {
81    /// Schedules full message download for partially downloaded message.
82    pub async fn download_full(self, context: &Context) -> Result<()> {
83        let msg = Message::load_from_db(context, self).await?;
84        match msg.download_state() {
85            DownloadState::Done | DownloadState::Undecipherable => {
86                return Err(anyhow!("Nothing to download."));
87            }
88            DownloadState::InProgress => return Err(anyhow!("Download already in progress.")),
89            DownloadState::Available | DownloadState::Failure => {
90                self.update_download_state(context, DownloadState::InProgress)
91                    .await?;
92                context
93                    .sql
94                    .execute("INSERT INTO download (msg_id) VALUES (?)", (self,))
95                    .await?;
96                context.scheduler.interrupt_inbox().await;
97            }
98        }
99        Ok(())
100    }
101
102    /// Updates the message download state. Returns `Ok` if the message doesn't exist anymore.
103    pub(crate) async fn update_download_state(
104        self,
105        context: &Context,
106        download_state: DownloadState,
107    ) -> Result<()> {
108        if context
109            .sql
110            .execute(
111                "UPDATE msgs SET download_state=? WHERE id=?;",
112                (download_state, self),
113            )
114            .await?
115            == 0
116        {
117            return Ok(());
118        }
119        let Some(msg) = Message::load_from_db_optional(context, self).await? else {
120            return Ok(());
121        };
122        context.emit_event(EventType::MsgsChanged {
123            chat_id: msg.chat_id,
124            msg_id: self,
125        });
126        chatlist_events::emit_chatlist_item_changed(context, msg.chat_id);
127        Ok(())
128    }
129}
130
131impl Message {
132    /// Returns the download state of the message.
133    pub fn download_state(&self) -> DownloadState {
134        self.download_state
135    }
136}
137
138/// Actually download a message partially downloaded before.
139///
140/// Most messages are downloaded automatically on fetch instead.
141pub(crate) async fn download_msg(
142    context: &Context,
143    msg_id: MsgId,
144    session: &mut Session,
145) -> Result<()> {
146    let Some(msg) = Message::load_from_db_optional(context, msg_id).await? else {
147        // If partially downloaded message was already deleted
148        // we do not know its Message-ID anymore
149        // so cannot download it.
150        //
151        // Probably the message expired due to `delete_device_after`
152        // setting or was otherwise removed from the device,
153        // so we don't want it to reappear anyway.
154        return Ok(());
155    };
156
157    let row = context
158        .sql
159        .query_row_optional(
160            "SELECT uid, folder, uidvalidity FROM imap WHERE rfc724_mid=? AND target!=''",
161            (&msg.rfc724_mid,),
162            |row| {
163                let server_uid: u32 = row.get(0)?;
164                let server_folder: String = row.get(1)?;
165                let uidvalidity: u32 = row.get(2)?;
166                Ok((server_uid, server_folder, uidvalidity))
167            },
168        )
169        .await?;
170
171    let Some((server_uid, server_folder, uidvalidity)) = row else {
172        // No IMAP record found, we don't know the UID and folder.
173        return Err(anyhow!("Call download_full() again to try over."));
174    };
175
176    session
177        .fetch_single_msg(
178            context,
179            &server_folder,
180            uidvalidity,
181            server_uid,
182            msg.rfc724_mid.clone(),
183        )
184        .await?;
185    Ok(())
186}
187
188impl Session {
189    /// Download a single message and pipe it to receive_imf().
190    ///
191    /// receive_imf() is not directly aware that this is a result of a call to download_msg(),
192    /// however, implicitly knows that as the existing message is flagged as being partly.
193    async fn fetch_single_msg(
194        &mut self,
195        context: &Context,
196        folder: &str,
197        uidvalidity: u32,
198        uid: u32,
199        rfc724_mid: String,
200    ) -> Result<()> {
201        if uid == 0 {
202            bail!("Attempt to fetch UID 0");
203        }
204
205        let create = false;
206        let folder_exists = self
207            .select_with_uidvalidity(context, folder, create)
208            .await?;
209        ensure!(folder_exists, "No folder {folder}");
210
211        // we are connected, and the folder is selected
212        info!(context, "Downloading message {}/{} fully...", folder, uid);
213
214        let mut uid_message_ids: BTreeMap<u32, String> = BTreeMap::new();
215        uid_message_ids.insert(uid, rfc724_mid);
216        let (sender, receiver) = async_channel::unbounded();
217        self.fetch_many_msgs(
218            context,
219            folder,
220            uidvalidity,
221            vec![uid],
222            &uid_message_ids,
223            false,
224            sender,
225        )
226        .await?;
227        if receiver.recv().await.is_err() {
228            bail!("Failed to fetch UID {uid}");
229        }
230        Ok(())
231    }
232}
233
234impl MimeMessage {
235    /// Creates a placeholder part and add that to `parts`.
236    ///
237    /// To create the placeholder, only the outermost header can be used,
238    /// the mime-structure itself is not available.
239    ///
240    /// The placeholder part currently contains a text with size and availability of the message;
241    /// `error` is set as the part error;
242    /// in the future, we may do more advanced things as previews here.
243    pub(crate) async fn create_stub_from_partial_download(
244        &mut self,
245        context: &Context,
246        org_bytes: u32,
247        error: Option<String>,
248    ) -> Result<()> {
249        let prefix = match error {
250            None => "",
251            Some(_) => "[❗] ",
252        };
253        let mut text = format!(
254            "{prefix}[{}]",
255            stock_str::partial_download_msg_body(context, org_bytes).await
256        );
257        if let Some(delete_server_after) = context.get_config_delete_server_after().await? {
258            let until = stock_str::download_availability(
259                context,
260                time() + max(delete_server_after, MIN_DELETE_SERVER_AFTER),
261            )
262            .await;
263            text += format!(" [{until}]").as_str();
264        };
265
266        info!(context, "Partial download: {}", text);
267
268        self.do_add_single_part(Part {
269            typ: Viewtype::Text,
270            msg: text,
271            error,
272            ..Default::default()
273        });
274
275        Ok(())
276    }
277}
278
279#[cfg(test)]
280mod tests {
281    use num_traits::FromPrimitive;
282
283    use super::*;
284    use crate::chat::{get_chat_msgs, send_msg};
285    use crate::ephemeral::Timer;
286    use crate::message::delete_msgs;
287    use crate::receive_imf::receive_imf_from_inbox;
288    use crate::test_utils::{E2EE_INFO_MSGS, TestContext, TestContextManager};
289
290    #[test]
291    fn test_downloadstate_values() {
292        // values may be written to disk and must not change
293        assert_eq!(DownloadState::Done, DownloadState::default());
294        assert_eq!(DownloadState::Done, DownloadState::from_i32(0).unwrap());
295        assert_eq!(
296            DownloadState::Available,
297            DownloadState::from_i32(10).unwrap()
298        );
299        assert_eq!(DownloadState::Failure, DownloadState::from_i32(20).unwrap());
300        assert_eq!(
301            DownloadState::InProgress,
302            DownloadState::from_i32(1000).unwrap()
303        );
304    }
305
306    #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
307    async fn test_download_limit() -> Result<()> {
308        let t = TestContext::new_alice().await;
309
310        assert_eq!(t.download_limit().await?, None);
311
312        t.set_config(Config::DownloadLimit, Some("200000")).await?;
313        assert_eq!(t.download_limit().await?, Some(200000));
314
315        t.set_config(Config::DownloadLimit, Some("20000")).await?;
316        assert_eq!(t.download_limit().await?, Some(MIN_DOWNLOAD_LIMIT));
317
318        t.set_config(Config::DownloadLimit, None).await?;
319        assert_eq!(t.download_limit().await?, None);
320
321        for val in &["0", "-1", "-100", "", "foo"] {
322            t.set_config(Config::DownloadLimit, Some(val)).await?;
323            assert_eq!(t.download_limit().await?, None);
324        }
325
326        Ok(())
327    }
328
329    #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
330    async fn test_update_download_state() -> Result<()> {
331        let t = TestContext::new_alice().await;
332        let chat = t.create_chat_with_contact("Bob", "bob@example.org").await;
333
334        let mut msg = Message::new_text("Hi Bob".to_owned());
335        let msg_id = send_msg(&t, chat.id, &mut msg).await?;
336        let msg = Message::load_from_db(&t, msg_id).await?;
337        assert_eq!(msg.download_state(), DownloadState::Done);
338
339        for s in &[
340            DownloadState::Available,
341            DownloadState::InProgress,
342            DownloadState::Failure,
343            DownloadState::Done,
344            DownloadState::Done,
345        ] {
346            msg_id.update_download_state(&t, *s).await?;
347            let msg = Message::load_from_db(&t, msg_id).await?;
348            assert_eq!(msg.download_state(), *s);
349        }
350        t.sql
351            .execute("DELETE FROM msgs WHERE id=?", (msg_id,))
352            .await?;
353        // Nothing to do is ok.
354        msg_id
355            .update_download_state(&t, DownloadState::Done)
356            .await?;
357
358        Ok(())
359    }
360
361    #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
362    async fn test_partial_receive_imf() -> Result<()> {
363        let t = TestContext::new_alice().await;
364
365        let header = "Received: (Postfix, from userid 1000); Mon, 4 Dec 2006 14:51:39 +0100 (CET)\n\
366             From: bob@example.com\n\
367             To: alice@example.org\n\
368             Subject: foo\n\
369             Message-ID: <Mr.12345678901@example.com>\n\
370             Chat-Version: 1.0\n\
371             Date: Sun, 22 Mar 2020 22:37:57 +0000\
372             Content-Type: text/plain";
373
374        receive_imf_from_inbox(
375            &t,
376            "Mr.12345678901@example.com",
377            header.as_bytes(),
378            false,
379            Some(100000),
380        )
381        .await?;
382        let msg = t.get_last_msg().await;
383        assert_eq!(msg.download_state(), DownloadState::Available);
384        assert_eq!(msg.get_subject(), "foo");
385        assert!(
386            msg.get_text()
387                .contains(&stock_str::partial_download_msg_body(&t, 100000).await)
388        );
389
390        receive_imf_from_inbox(
391            &t,
392            "Mr.12345678901@example.com",
393            format!("{header}\n\n100k text...").as_bytes(),
394            false,
395            None,
396        )
397        .await?;
398        let msg = t.get_last_msg().await;
399        assert_eq!(msg.download_state(), DownloadState::Done);
400        assert_eq!(msg.get_subject(), "foo");
401        assert_eq!(msg.get_text(), "100k text...");
402
403        Ok(())
404    }
405
406    #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
407    async fn test_partial_download_and_ephemeral() -> Result<()> {
408        let t = TestContext::new_alice().await;
409        let chat_id = t
410            .create_chat_with_contact("bob", "bob@example.org")
411            .await
412            .id;
413        chat_id
414            .set_ephemeral_timer(&t, Timer::Enabled { duration: 60 })
415            .await?;
416
417        // download message from bob partially, this must not change the ephemeral timer
418        receive_imf_from_inbox(
419            &t,
420            "first@example.org",
421            b"From: Bob <bob@example.org>\n\
422                    To: Alice <alice@example.org>\n\
423                    Chat-Version: 1.0\n\
424                    Subject: subject\n\
425                    Message-ID: <first@example.org>\n\
426                    Date: Sun, 14 Nov 2021 00:10:00 +0000\
427                    Content-Type: text/plain",
428            false,
429            Some(100000),
430        )
431        .await?;
432        assert_eq!(
433            chat_id.get_ephemeral_timer(&t).await?,
434            Timer::Enabled { duration: 60 }
435        );
436
437        Ok(())
438    }
439
440    #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
441    async fn test_status_update_expands_to_nothing() -> Result<()> {
442        let alice = TestContext::new_alice().await;
443        let bob = TestContext::new_bob().await;
444        let chat_id = alice.create_chat(&bob).await.id;
445
446        let file = alice.get_blobdir().join("minimal.xdc");
447        tokio::fs::write(&file, include_bytes!("../test-data/webxdc/minimal.xdc")).await?;
448        let mut instance = Message::new(Viewtype::File);
449        instance.set_file_and_deduplicate(&alice, &file, None, None)?;
450        let _sent1 = alice.send_msg(chat_id, &mut instance).await;
451
452        alice
453            .send_webxdc_status_update(instance.id, r#"{"payload":7}"#)
454            .await?;
455        alice.flush_status_updates().await?;
456        let sent2 = alice.pop_sent_msg().await;
457        let sent2_rfc724_mid = sent2.load_from_db().await.rfc724_mid;
458
459        // not downloading the status update results in an placeholder
460        receive_imf_from_inbox(
461            &bob,
462            &sent2_rfc724_mid,
463            sent2.payload().as_bytes(),
464            false,
465            Some(sent2.payload().len() as u32),
466        )
467        .await?;
468        let msg = bob.get_last_msg().await;
469        let chat_id = msg.chat_id;
470        assert_eq!(
471            get_chat_msgs(&bob, chat_id).await?.len(),
472            E2EE_INFO_MSGS + 1
473        );
474        assert_eq!(msg.download_state(), DownloadState::Available);
475
476        // downloading the status update afterwards expands to nothing and moves the placeholder to trash-chat
477        // (usually status updates are too small for not being downloaded directly)
478        receive_imf_from_inbox(
479            &bob,
480            &sent2_rfc724_mid,
481            sent2.payload().as_bytes(),
482            false,
483            None,
484        )
485        .await?;
486        assert_eq!(get_chat_msgs(&bob, chat_id).await?.len(), E2EE_INFO_MSGS);
487        assert!(
488            Message::load_from_db_optional(&bob, msg.id)
489                .await?
490                .is_none()
491        );
492
493        Ok(())
494    }
495
496    #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
497    async fn test_mdn_expands_to_nothing() -> Result<()> {
498        let bob = TestContext::new_bob().await;
499        let raw = b"Subject: Message opened\n\
500            Date: Mon, 10 Jan 2020 00:00:00 +0000\n\
501            Chat-Version: 1.0\n\
502            Message-ID: <bar@example.org>\n\
503            To: Alice <alice@example.org>\n\
504            From: Bob <bob@example.org>\n\
505            Content-Type: multipart/report; report-type=disposition-notification;\n\t\
506            boundary=\"kJBbU58X1xeWNHgBtTbMk80M5qnV4N\"\n\
507            \n\
508            \n\
509            --kJBbU58X1xeWNHgBtTbMk80M5qnV4N\n\
510            Content-Type: text/plain; charset=utf-8\n\
511            \n\
512            bla\n\
513            \n\
514            \n\
515            --kJBbU58X1xeWNHgBtTbMk80M5qnV4N\n\
516            Content-Type: message/disposition-notification\n\
517            \n\
518            Reporting-UA: Delta Chat 1.88.0\n\
519            Original-Recipient: rfc822;bob@example.org\n\
520            Final-Recipient: rfc822;bob@example.org\n\
521            Original-Message-ID: <foo@example.org>\n\
522            Disposition: manual-action/MDN-sent-automatically; displayed\n\
523            \n\
524            \n\
525            --kJBbU58X1xeWNHgBtTbMk80M5qnV4N--\n\
526            ";
527
528        // not downloading the mdn results in an placeholder
529        receive_imf_from_inbox(&bob, "bar@example.org", raw, false, Some(raw.len() as u32)).await?;
530        let msg = bob.get_last_msg().await;
531        let chat_id = msg.chat_id;
532        assert_eq!(get_chat_msgs(&bob, chat_id).await?.len(), 1);
533        assert_eq!(msg.download_state(), DownloadState::Available);
534
535        // downloading the mdn afterwards expands to nothing and deletes the placeholder directly
536        // (usually mdn are too small for not being downloaded directly)
537        receive_imf_from_inbox(&bob, "bar@example.org", raw, false, None).await?;
538        assert_eq!(get_chat_msgs(&bob, chat_id).await?.len(), 0);
539        assert!(
540            Message::load_from_db_optional(&bob, msg.id)
541                .await?
542                .is_none()
543        );
544
545        Ok(())
546    }
547
548    /// Tests that fully downloading the message
549    /// works even if the Message-ID already exists
550    /// in the database assigned to the trash chat.
551    #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
552    async fn test_partial_download_trashed() -> Result<()> {
553        let mut tcm = TestContextManager::new();
554        let alice = &tcm.alice().await;
555
556        let imf_raw = b"From: Bob <bob@example.org>\n\
557              To: Alice <alice@example.org>\n\
558              Chat-Version: 1.0\n\
559              Subject: subject\n\
560              Message-ID: <first@example.org>\n\
561              Date: Sun, 14 Nov 2021 00:10:00 +0000\
562              Content-Type: text/plain";
563
564        // Download message from Bob partially.
565        let partial_received_msg =
566            receive_imf_from_inbox(alice, "first@example.org", imf_raw, false, Some(100000))
567                .await?
568                .unwrap();
569        assert_eq!(partial_received_msg.msg_ids.len(), 1);
570
571        // Delete the received message.
572        // Not it is still in the database,
573        // but in the trash chat.
574        delete_msgs(alice, &[partial_received_msg.msg_ids[0]]).await?;
575
576        // Fully download message after deletion.
577        let full_received_msg =
578            receive_imf_from_inbox(alice, "first@example.org", imf_raw, false, None).await?;
579
580        // The message does not reappear.
581        // However, `receive_imf` should not fail.
582        assert!(full_received_msg.is_none());
583
584        Ok(())
585    }
586}