deltachat/
download.rs

1//! # Download large messages manually.
2
3use std::cmp::max;
4use std::collections::BTreeMap;
5
6use anyhow::{Result, anyhow, bail, ensure};
7use deltachat_derive::{FromSql, ToSql};
8use serde::{Deserialize, Serialize};
9
10use crate::config::Config;
11use crate::context::Context;
12use crate::imap::session::Session;
13use crate::message::{Message, MsgId, Viewtype};
14use crate::mimeparser::{MimeMessage, Part};
15use crate::tools::time;
16use crate::{EventType, chatlist_events, stock_str};
17
/// Download limits should not be used below `MIN_DOWNLOAD_LIMIT`.
///
/// For better UX, some messages such as add-member, non-delivery-reports (NDN)
/// or read-receipts (MDN) should always be downloaded completely to handle them correctly,
/// also in larger groups and if group and contact avatar are attached.
/// Most of these cases are caught by `MIN_DOWNLOAD_LIMIT`.
///
/// The value equals 160 * 1024
/// (NOTE(review): presumably bytes, i.e. 160 KiB; confirm the unit).
pub(crate) const MIN_DOWNLOAD_LIMIT: u32 = 163840;
25
/// If a message is downloaded only partially
/// and `delete_server_after` is set to small timeouts (e.g. "at once"),
/// the user might have no chance to actually download that message.
/// `MIN_DELETE_SERVER_AFTER` increases the timeout in this case.
///
/// The value is 48 * 60 * 60, i.e. 48 hours if interpreted as seconds
/// (NOTE(review): unit inferred from the arithmetic; confirm).
pub(crate) const MIN_DELETE_SERVER_AFTER: i64 = 48 * 60 * 60;
31
/// Download state of the message.
///
/// The state is written to the `download_state` column of the `msgs` table
/// (see `MsgId::update_download_state`). As the tests of this module note,
/// the values may be written to disk and must not change.
#[derive(
    Debug,
    Default,
    Display,
    Clone,
    Copy,
    PartialEq,
    Eq,
    FromPrimitive,
    ToPrimitive,
    FromSql,
    ToSql,
    Serialize,
    Deserialize,
)]
#[repr(u32)]
pub enum DownloadState {
    /// Message is fully downloaded.
    ///
    /// This is the default state.
    #[default]
    Done = 0,

    /// Message is partially downloaded and can be fully downloaded at request.
    ///
    /// `MsgId::download_full` accepts messages in this state.
    Available = 10,

    /// Failed to fully download the message.
    ///
    /// `MsgId::download_full` accepts messages in this state, so the download can be retried.
    Failure = 20,

    /// Undecipherable message.
    ///
    /// `MsgId::download_full` rejects messages in this state with "Nothing to download.".
    Undecipherable = 30,

    /// Full download of the message is in progress.
    ///
    /// Set by `MsgId::download_full` when the download is scheduled.
    InProgress = 1000,
}
66
67impl Context {
68    // Returns validated download limit or `None` for "no limit".
69    pub(crate) async fn download_limit(&self) -> Result<Option<u32>> {
70        let download_limit = self.get_config_int(Config::DownloadLimit).await?;
71        if download_limit <= 0 {
72            Ok(None)
73        } else {
74            Ok(Some(max(MIN_DOWNLOAD_LIMIT, download_limit as u32)))
75        }
76    }
77}
78
79impl MsgId {
80    /// Schedules full message download for partially downloaded message.
81    pub async fn download_full(self, context: &Context) -> Result<()> {
82        let msg = Message::load_from_db(context, self).await?;
83        match msg.download_state() {
84            DownloadState::Done | DownloadState::Undecipherable => {
85                return Err(anyhow!("Nothing to download."));
86            }
87            DownloadState::InProgress => return Err(anyhow!("Download already in progress.")),
88            DownloadState::Available | DownloadState::Failure => {
89                self.update_download_state(context, DownloadState::InProgress)
90                    .await?;
91                context
92                    .sql
93                    .execute("INSERT INTO download (msg_id) VALUES (?)", (self,))
94                    .await?;
95                context.scheduler.interrupt_inbox().await;
96            }
97        }
98        Ok(())
99    }
100
101    /// Updates the message download state. Returns `Ok` if the message doesn't exist anymore.
102    pub(crate) async fn update_download_state(
103        self,
104        context: &Context,
105        download_state: DownloadState,
106    ) -> Result<()> {
107        if context
108            .sql
109            .execute(
110                "UPDATE msgs SET download_state=? WHERE id=?;",
111                (download_state, self),
112            )
113            .await?
114            == 0
115        {
116            return Ok(());
117        }
118        let Some(msg) = Message::load_from_db_optional(context, self).await? else {
119            return Ok(());
120        };
121        context.emit_event(EventType::MsgsChanged {
122            chat_id: msg.chat_id,
123            msg_id: self,
124        });
125        chatlist_events::emit_chatlist_item_changed(context, msg.chat_id);
126        Ok(())
127    }
128}
129
impl Message {
    /// Returns the download state of the message.
    ///
    /// This is a plain accessor for the `download_state` field of the loaded message;
    /// it does not query the database.
    pub fn download_state(&self) -> DownloadState {
        self.download_state
    }
}
136
137/// Actually download a message partially downloaded before.
138///
139/// Most messages are downloaded automatically on fetch instead.
140pub(crate) async fn download_msg(
141    context: &Context,
142    msg_id: MsgId,
143    session: &mut Session,
144) -> Result<()> {
145    let Some(msg) = Message::load_from_db_optional(context, msg_id).await? else {
146        // If partially downloaded message was already deleted
147        // we do not know its Message-ID anymore
148        // so cannot download it.
149        //
150        // Probably the message expired due to `delete_device_after`
151        // setting or was otherwise removed from the device,
152        // so we don't want it to reappear anyway.
153        return Ok(());
154    };
155
156    let transport_id = session.transport_id();
157    let row = context
158        .sql
159        .query_row_optional(
160            "SELECT uid, folder FROM imap
161             WHERE rfc724_mid=?
162             AND transport_id=?
163             AND target!=''",
164            (&msg.rfc724_mid, transport_id),
165            |row| {
166                let server_uid: u32 = row.get(0)?;
167                let server_folder: String = row.get(1)?;
168                Ok((server_uid, server_folder))
169            },
170        )
171        .await?;
172
173    let Some((server_uid, server_folder)) = row else {
174        // No IMAP record found, we don't know the UID and folder.
175        return Err(anyhow!("Call download_full() again to try over."));
176    };
177
178    session
179        .fetch_single_msg(context, &server_folder, server_uid, msg.rfc724_mid.clone())
180        .await?;
181    Ok(())
182}
183
184impl Session {
185    /// Download a single message and pipe it to receive_imf().
186    ///
187    /// receive_imf() is not directly aware that this is a result of a call to download_msg(),
188    /// however, implicitly knows that as the existing message is flagged as being partly.
189    async fn fetch_single_msg(
190        &mut self,
191        context: &Context,
192        folder: &str,
193        uid: u32,
194        rfc724_mid: String,
195    ) -> Result<()> {
196        if uid == 0 {
197            bail!("Attempt to fetch UID 0");
198        }
199
200        let create = false;
201        let folder_exists = self
202            .select_with_uidvalidity(context, folder, create)
203            .await?;
204        ensure!(folder_exists, "No folder {folder}");
205
206        // we are connected, and the folder is selected
207        info!(context, "Downloading message {}/{} fully...", folder, uid);
208
209        let mut uid_message_ids: BTreeMap<u32, String> = BTreeMap::new();
210        uid_message_ids.insert(uid, rfc724_mid);
211        let (sender, receiver) = async_channel::unbounded();
212        self.fetch_many_msgs(context, folder, vec![uid], &uid_message_ids, false, sender)
213            .await?;
214        if receiver.recv().await.is_err() {
215            bail!("Failed to fetch UID {uid}");
216        }
217        Ok(())
218    }
219}
220
221impl MimeMessage {
222    /// Creates a placeholder part and add that to `parts`.
223    ///
224    /// To create the placeholder, only the outermost header can be used,
225    /// the mime-structure itself is not available.
226    ///
227    /// The placeholder part currently contains a text with size and availability of the message.
228    pub(crate) async fn create_stub_from_partial_download(
229        &mut self,
230        context: &Context,
231        org_bytes: u32,
232    ) -> Result<()> {
233        let mut text = format!(
234            "[{}]",
235            stock_str::partial_download_msg_body(context, org_bytes).await
236        );
237        if let Some(delete_server_after) = context.get_config_delete_server_after().await? {
238            let until = stock_str::download_availability(
239                context,
240                time() + max(delete_server_after, MIN_DELETE_SERVER_AFTER),
241            )
242            .await;
243            text += format!(" [{until}]").as_str();
244        };
245
246        info!(context, "Partial download: {}", text);
247
248        self.do_add_single_part(Part {
249            typ: Viewtype::Text,
250            msg: text,
251            ..Default::default()
252        });
253
254        Ok(())
255    }
256}
257
258#[cfg(test)]
259mod tests {
260    use num_traits::FromPrimitive;
261
262    use super::*;
263    use crate::chat::{get_chat_msgs, send_msg};
264    use crate::ephemeral::Timer;
265    use crate::message::delete_msgs;
266    use crate::receive_imf::receive_imf_from_inbox;
267    use crate::test_utils::{E2EE_INFO_MSGS, TestContext, TestContextManager};
268
269    #[test]
270    fn test_downloadstate_values() {
271        // values may be written to disk and must not change
272        assert_eq!(DownloadState::Done, DownloadState::default());
273        assert_eq!(DownloadState::Done, DownloadState::from_i32(0).unwrap());
274        assert_eq!(
275            DownloadState::Available,
276            DownloadState::from_i32(10).unwrap()
277        );
278        assert_eq!(DownloadState::Failure, DownloadState::from_i32(20).unwrap());
279        assert_eq!(
280            DownloadState::InProgress,
281            DownloadState::from_i32(1000).unwrap()
282        );
283    }
284
285    #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
286    async fn test_download_limit() -> Result<()> {
287        let t = TestContext::new_alice().await;
288
289        assert_eq!(t.download_limit().await?, None);
290
291        t.set_config(Config::DownloadLimit, Some("200000")).await?;
292        assert_eq!(t.download_limit().await?, Some(200000));
293
294        t.set_config(Config::DownloadLimit, Some("20000")).await?;
295        assert_eq!(t.download_limit().await?, Some(MIN_DOWNLOAD_LIMIT));
296
297        t.set_config(Config::DownloadLimit, None).await?;
298        assert_eq!(t.download_limit().await?, None);
299
300        for val in &["0", "-1", "-100", "", "foo"] {
301            t.set_config(Config::DownloadLimit, Some(val)).await?;
302            assert_eq!(t.download_limit().await?, None);
303        }
304
305        Ok(())
306    }
307
308    #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
309    async fn test_update_download_state() -> Result<()> {
310        let t = TestContext::new_alice().await;
311        let chat = t.create_chat_with_contact("Bob", "bob@example.org").await;
312
313        let mut msg = Message::new_text("Hi Bob".to_owned());
314        let msg_id = send_msg(&t, chat.id, &mut msg).await?;
315        let msg = Message::load_from_db(&t, msg_id).await?;
316        assert_eq!(msg.download_state(), DownloadState::Done);
317
318        for s in &[
319            DownloadState::Available,
320            DownloadState::InProgress,
321            DownloadState::Failure,
322            DownloadState::Done,
323            DownloadState::Done,
324        ] {
325            msg_id.update_download_state(&t, *s).await?;
326            let msg = Message::load_from_db(&t, msg_id).await?;
327            assert_eq!(msg.download_state(), *s);
328        }
329        t.sql
330            .execute("DELETE FROM msgs WHERE id=?", (msg_id,))
331            .await?;
332        // Nothing to do is ok.
333        msg_id
334            .update_download_state(&t, DownloadState::Done)
335            .await?;
336
337        Ok(())
338    }
339
340    #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
341    async fn test_partial_receive_imf() -> Result<()> {
342        let t = TestContext::new_alice().await;
343
344        let header = "Received: (Postfix, from userid 1000); Mon, 4 Dec 2006 14:51:39 +0100 (CET)\n\
345             From: bob@example.com\n\
346             To: alice@example.org\n\
347             Subject: foo\n\
348             Message-ID: <Mr.12345678901@example.com>\n\
349             Chat-Version: 1.0\n\
350             Date: Sun, 22 Mar 2020 22:37:57 +0000\
351             Content-Type: text/plain";
352
353        receive_imf_from_inbox(
354            &t,
355            "Mr.12345678901@example.com",
356            header.as_bytes(),
357            false,
358            Some(100000),
359        )
360        .await?;
361        let msg = t.get_last_msg().await;
362        assert_eq!(msg.download_state(), DownloadState::Available);
363        assert_eq!(msg.get_subject(), "foo");
364        assert!(
365            msg.get_text()
366                .contains(&stock_str::partial_download_msg_body(&t, 100000).await)
367        );
368
369        receive_imf_from_inbox(
370            &t,
371            "Mr.12345678901@example.com",
372            format!("{header}\n\n100k text...").as_bytes(),
373            false,
374            None,
375        )
376        .await?;
377        let msg = t.get_last_msg().await;
378        assert_eq!(msg.download_state(), DownloadState::Done);
379        assert_eq!(msg.get_subject(), "foo");
380        assert_eq!(msg.get_text(), "100k text...");
381
382        Ok(())
383    }
384
385    #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
386    async fn test_partial_download_and_ephemeral() -> Result<()> {
387        let t = TestContext::new_alice().await;
388        let chat_id = t
389            .create_chat_with_contact("bob", "bob@example.org")
390            .await
391            .id;
392        chat_id
393            .set_ephemeral_timer(&t, Timer::Enabled { duration: 60 })
394            .await?;
395
396        // download message from bob partially, this must not change the ephemeral timer
397        receive_imf_from_inbox(
398            &t,
399            "first@example.org",
400            b"From: Bob <bob@example.org>\n\
401                    To: Alice <alice@example.org>\n\
402                    Chat-Version: 1.0\n\
403                    Subject: subject\n\
404                    Message-ID: <first@example.org>\n\
405                    Date: Sun, 14 Nov 2021 00:10:00 +0000\
406                    Content-Type: text/plain",
407            false,
408            Some(100000),
409        )
410        .await?;
411        assert_eq!(
412            chat_id.get_ephemeral_timer(&t).await?,
413            Timer::Enabled { duration: 60 }
414        );
415
416        Ok(())
417    }
418
    /// Tests that a partially downloaded webxdc status update results in a placeholder
    /// which disappears again once the status update is received fully.
    #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
    async fn test_status_update_expands_to_nothing() -> Result<()> {
        let alice = TestContext::new_alice().await;
        let bob = TestContext::new_bob().await;
        let chat_id = alice.create_chat(&bob).await.id;

        // Alice sends a webxdc instance; only the status update sent afterwards
        // is handed to Bob below.
        let file = alice.get_blobdir().join("minimal.xdc");
        tokio::fs::write(&file, include_bytes!("../test-data/webxdc/minimal.xdc")).await?;
        let mut instance = Message::new(Viewtype::File);
        instance.set_file_and_deduplicate(&alice, &file, None, None)?;
        let _sent1 = alice.send_msg(chat_id, &mut instance).await;

        alice
            .send_webxdc_status_update(instance.id, r#"{"payload":7}"#)
            .await?;
        alice.flush_status_updates().await?;
        let sent2 = alice.pop_sent_msg().await;
        let sent2_rfc724_mid = sent2.load_from_db().await.rfc724_mid;

        // not downloading the status update results in a placeholder
        receive_imf_from_inbox(
            &bob,
            &sent2_rfc724_mid,
            sent2.payload().as_bytes(),
            false,
            Some(sent2.payload().len() as u32),
        )
        .await?;
        let msg = bob.get_last_msg().await;
        let chat_id = msg.chat_id;
        assert_eq!(
            get_chat_msgs(&bob, chat_id).await?.len(),
            E2EE_INFO_MSGS + 1
        );
        assert_eq!(msg.download_state(), DownloadState::Available);

        // downloading the status update afterwards expands to nothing and moves the placeholder to trash-chat
        // (usually status updates are too small for not being downloaded directly)
        receive_imf_from_inbox(
            &bob,
            &sent2_rfc724_mid,
            sent2.payload().as_bytes(),
            false,
            None,
        )
        .await?;
        assert_eq!(get_chat_msgs(&bob, chat_id).await?.len(), E2EE_INFO_MSGS);
        // The placeholder cannot be loaded anymore.
        assert!(
            Message::load_from_db_optional(&bob, msg.id)
                .await?
                .is_none()
        );

        Ok(())
    }
474
    /// Tests that a partially downloaded MDN results in a placeholder
    /// which is removed again once the MDN is received fully.
    #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
    async fn test_mdn_expands_to_nothing() -> Result<()> {
        let bob = TestContext::new_bob().await;
        // A `multipart/report` message with a text part and a disposition notification.
        let raw = b"Subject: Message opened\n\
            Date: Mon, 10 Jan 2020 00:00:00 +0000\n\
            Chat-Version: 1.0\n\
            Message-ID: <bar@example.org>\n\
            To: Alice <alice@example.org>\n\
            From: Bob <bob@example.org>\n\
            Content-Type: multipart/report; report-type=disposition-notification;\n\t\
            boundary=\"kJBbU58X1xeWNHgBtTbMk80M5qnV4N\"\n\
            \n\
            \n\
            --kJBbU58X1xeWNHgBtTbMk80M5qnV4N\n\
            Content-Type: text/plain; charset=utf-8\n\
            \n\
            bla\n\
            \n\
            \n\
            --kJBbU58X1xeWNHgBtTbMk80M5qnV4N\n\
            Content-Type: message/disposition-notification\n\
            \n\
            Reporting-UA: Delta Chat 1.88.0\n\
            Original-Recipient: rfc822;bob@example.org\n\
            Final-Recipient: rfc822;bob@example.org\n\
            Original-Message-ID: <foo@example.org>\n\
            Disposition: manual-action/MDN-sent-automatically; displayed\n\
            \n\
            \n\
            --kJBbU58X1xeWNHgBtTbMk80M5qnV4N--\n\
            ";

        // not downloading the mdn results in a placeholder
        receive_imf_from_inbox(&bob, "bar@example.org", raw, false, Some(raw.len() as u32)).await?;
        let msg = bob.get_last_msg().await;
        let chat_id = msg.chat_id;
        assert_eq!(get_chat_msgs(&bob, chat_id).await?.len(), 1);
        assert_eq!(msg.download_state(), DownloadState::Available);

        // downloading the mdn afterwards expands to nothing and deletes the placeholder directly
        // (usually mdn are too small for not being downloaded directly)
        receive_imf_from_inbox(&bob, "bar@example.org", raw, false, None).await?;
        assert_eq!(get_chat_msgs(&bob, chat_id).await?.len(), 0);
        // The placeholder cannot be loaded anymore.
        assert!(
            Message::load_from_db_optional(&bob, msg.id)
                .await?
                .is_none()
        );

        Ok(())
    }
526
527    /// Tests that fully downloading the message
528    /// works even if the Message-ID already exists
529    /// in the database assigned to the trash chat.
530    #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
531    async fn test_partial_download_trashed() -> Result<()> {
532        let mut tcm = TestContextManager::new();
533        let alice = &tcm.alice().await;
534
535        let imf_raw = b"From: Bob <bob@example.org>\n\
536              To: Alice <alice@example.org>\n\
537              Chat-Version: 1.0\n\
538              Subject: subject\n\
539              Message-ID: <first@example.org>\n\
540              Date: Sun, 14 Nov 2021 00:10:00 +0000\
541              Content-Type: text/plain";
542
543        // Download message from Bob partially.
544        let partial_received_msg =
545            receive_imf_from_inbox(alice, "first@example.org", imf_raw, false, Some(100000))
546                .await?
547                .unwrap();
548        assert_eq!(partial_received_msg.msg_ids.len(), 1);
549
550        // Delete the received message.
551        // Not it is still in the database,
552        // but in the trash chat.
553        delete_msgs(alice, &[partial_received_msg.msg_ids[0]]).await?;
554
555        // Fully download message after deletion.
556        let full_received_msg =
557            receive_imf_from_inbox(alice, "first@example.org", imf_raw, false, None).await?;
558
559        // The message does not reappear.
560        // However, `receive_imf` should not fail.
561        assert!(full_received_msg.is_none());
562
563        Ok(())
564    }
565}