Skip to main content

deltachat/
download.rs

1//! # Download large messages manually.
2
3use std::collections::BTreeMap;
4
5use anyhow::{Result, anyhow, bail, ensure};
6use deltachat_derive::{FromSql, ToSql};
7use serde::{Deserialize, Serialize};
8
9use crate::config::Config;
10use crate::context::Context;
11use crate::imap::session::Session;
12use crate::log::warn;
13use crate::message::{self, Message, MsgId, rfc724_mid_exists};
14use crate::{EventType, chatlist_events, ephemeral};
15
16pub(crate) mod post_msg_metadata;
17pub(crate) use post_msg_metadata::PostMsgMetadata;
18
/// Attachment size threshold for sending: from this point onward outgoing messages
/// are considered large and get a Pre-Message, which announces the Post-Message.
///
/// This is only about sending, so it can be modified at any time.
/// The current value is a bit less than the minimum auto-download setting from the UIs
/// (which is 160 KiB).
// NOTE(review): the unit is presumably bytes (140_000 vs. 160 KiB) - confirm at the use site.
pub(crate) const PRE_MSG_ATTACHMENT_SIZE_THRESHOLD: u64 = 140_000;
25
/// Maximum size for Pre-Messages. A warning is emitted when this is exceeded.
// NOTE(review): the unit is presumably bytes - confirm at the use site.
pub(crate) const PRE_MSG_SIZE_WARNING_THRESHOLD: usize = 150_000;
28
/// Download state of the message.
///
/// The numeric values may be written to disk and must not change.
#[derive(
    Debug,
    Default,
    Display,
    Clone,
    Copy,
    PartialEq,
    Eq,
    FromPrimitive,
    ToPrimitive,
    FromSql,
    ToSql,
    Serialize,
    Deserialize,
)]
#[repr(u32)]
pub enum DownloadState {
    /// Message is fully downloaded. This is the default state.
    #[default]
    Done = 0,

    /// Message is partially downloaded and can be fully downloaded at request.
    Available = 10,

    /// Failed to fully download the message.
    ///
    /// A full download can be requested again from this state.
    Failure = 20,

    /// Undecipherable message.
    ///
    /// Requesting a full download in this state is rejected: there is nothing to download.
    Undecipherable = 30,

    /// Full download of the message is in progress.
    InProgress = 1000,
}
63
64impl MsgId {
65    /// Schedules Post-Message download for partially downloaded message.
66    pub async fn download_full(self, context: &Context) -> Result<()> {
67        let msg = Message::load_from_db(context, self).await?;
68        match msg.download_state() {
69            DownloadState::Done | DownloadState::Undecipherable => {
70                return Err(anyhow!("Nothing to download."));
71            }
72            DownloadState::InProgress => return Err(anyhow!("Download already in progress.")),
73            DownloadState::Available | DownloadState::Failure => {
74                if msg.rfc724_mid().is_empty() {
75                    return Err(anyhow!("Download not possible, message has no rfc724_mid"));
76                }
77                self.update_download_state(context, DownloadState::InProgress)
78                    .await?;
79                info!(
80                    context,
81                    "Requesting full download of {:?}.",
82                    msg.rfc724_mid()
83                );
84                context
85                    .sql
86                    .execute(
87                        "INSERT INTO download (rfc724_mid, msg_id) VALUES (?,?)",
88                        (msg.rfc724_mid(), msg.id),
89                    )
90                    .await?;
91                context.scheduler.interrupt_inbox().await;
92            }
93        }
94        Ok(())
95    }
96
97    /// Updates the message download state. Returns `Ok` if the message doesn't exist anymore or has
98    /// the download state up to date.
99    pub(crate) async fn update_download_state(
100        self,
101        context: &Context,
102        download_state: DownloadState,
103    ) -> Result<()> {
104        if context
105            .sql
106            .execute(
107                "UPDATE msgs SET download_state=? WHERE id=? AND download_state<>?1",
108                (download_state, self),
109            )
110            .await?
111            == 0
112        {
113            return Ok(());
114        }
115        let Some(msg) = Message::load_from_db_optional(context, self).await? else {
116            return Ok(());
117        };
118        context.emit_event(EventType::MsgsChanged {
119            chat_id: msg.chat_id,
120            msg_id: self,
121        });
122        chatlist_events::emit_chatlist_item_changed(context, msg.chat_id);
123        Ok(())
124    }
125}
126
impl Message {
    /// Returns the download state of the message.
    ///
    /// See [`DownloadState`] for the possible values.
    pub fn download_state(&self) -> DownloadState {
        self.download_state
    }
}
133
134/// Actually downloads a message partially downloaded before if the message is available on the
135/// session transport, in which case returns `Some`. If the message is available on another
136/// transport, returns `None`.
137///
138/// Most messages are downloaded automatically on fetch instead.
139pub(crate) async fn download_msg(
140    context: &Context,
141    rfc724_mid: String,
142    session: &mut Session,
143) -> Result<Option<()>> {
144    let transport_id = session.transport_id();
145    let row = context
146        .sql
147        .query_row_optional(
148            "SELECT uid, folder, transport_id FROM imap
149             WHERE rfc724_mid=? AND target!=''
150             ORDER BY transport_id=? DESC LIMIT 1",
151            (&rfc724_mid, transport_id),
152            |row| {
153                let server_uid: u32 = row.get(0)?;
154                let server_folder: String = row.get(1)?;
155                let msg_transport_id: u32 = row.get(2)?;
156                Ok((server_uid, server_folder, msg_transport_id))
157            },
158        )
159        .await?;
160
161    let Some((server_uid, server_folder, msg_transport_id)) = row else {
162        // No IMAP record found, we don't know the UID and folder.
163        delete_from_available_post_msgs(context, &rfc724_mid).await?;
164        return Err(anyhow!(
165            "IMAP location for {rfc724_mid:?} post-message is unknown"
166        ));
167    };
168    if msg_transport_id != transport_id {
169        return Ok(None);
170    }
171    Box::pin(session.fetch_single_msg(context, &server_folder, server_uid, rfc724_mid)).await?;
172
173    let bcc_self = context.get_config_bool(Config::BccSelf).await?;
174    if ephemeral::should_delete_all_downloaded_messages(bcc_self, session.is_chatmail()) {
175        // Now that the message was downloaded, it likely needs to be deleted;
176        // trigger a re-check by interrupting the inbox folder.
177        // This is mainly needed to make the tests pass;
178        // on real devices, it would be fine to delete the message at the next iteration.
179        context.scheduler.interrupt_inbox().await;
180    }
181
182    Ok(Some(()))
183}
184
185impl Session {
186    /// Download a single message and pipe it to receive_imf().
187    ///
188    /// receive_imf() is not directly aware that this is a result of a call to download_msg(),
189    /// however, implicitly knows that as the existing message is flagged as being partly.
190    async fn fetch_single_msg(
191        &mut self,
192        context: &Context,
193        folder: &str,
194        uid: u32,
195        rfc724_mid: String,
196    ) -> Result<()> {
197        if uid == 0 {
198            bail!("Attempt to fetch UID 0");
199        }
200
201        let folder_exists = self.select_with_uidvalidity(context, folder).await?;
202        ensure!(folder_exists, "No folder {folder}");
203
204        // we are connected, and the folder is selected
205        info!(context, "Downloading message {}/{} fully...", folder, uid);
206
207        let mut uid_message_ids: BTreeMap<u32, String> = BTreeMap::new();
208        uid_message_ids.insert(uid, rfc724_mid);
209        let (sender, receiver) = async_channel::unbounded();
210        {
211            let _fetch_msgs_lock_guard = context.fetch_msgs_mutex.lock().await;
212            self.fetch_many_msgs(context, folder, vec![uid], &uid_message_ids, sender)
213                .await?;
214        }
215        if receiver.recv().await.is_err() {
216            bail!("Failed to fetch UID {uid}");
217        }
218        Ok(())
219    }
220}
221
222async fn set_state_to_failure(context: &Context, rfc724_mid: &str) -> Result<()> {
223    if let Some(msg_id) = rfc724_mid_exists(context, rfc724_mid).await? {
224        // Update download state to failure
225        // so it can be retried.
226        //
227        // On success update_download_state() is not needed
228        // as receive_imf() already
229        // set the state and emitted the event.
230        msg_id
231            .update_download_state(context, DownloadState::Failure)
232            .await?;
233    }
234    Ok(())
235}
236
237async fn available_post_msgs_contains_rfc724_mid(
238    context: &Context,
239    rfc724_mid: &str,
240) -> Result<bool> {
241    Ok(context
242        .sql
243        .query_get_value::<String>(
244            "SELECT rfc724_mid FROM available_post_msgs WHERE rfc724_mid=?",
245            (&rfc724_mid,),
246        )
247        .await?
248        .is_some())
249}
250
251async fn delete_from_available_post_msgs(context: &Context, rfc724_mid: &str) -> Result<()> {
252    context
253        .sql
254        .execute(
255            "DELETE FROM available_post_msgs WHERE rfc724_mid=?",
256            (&rfc724_mid,),
257        )
258        .await?;
259    Ok(())
260}
261
262async fn delete_from_downloads(context: &Context, rfc724_mid: &str) -> Result<()> {
263    context
264        .sql
265        .execute("DELETE FROM download WHERE rfc724_mid=?", (&rfc724_mid,))
266        .await?;
267    Ok(())
268}
269
270pub(crate) async fn msg_is_downloaded_for(context: &Context, rfc724_mid: &str) -> Result<bool> {
271    Ok(message::rfc724_mid_exists(context, rfc724_mid)
272        .await?
273        .is_some())
274}
275
276pub(crate) async fn download_msgs(context: &Context, session: &mut Session) -> Result<()> {
277    let rfc724_mids = context
278        .sql
279        .query_map_vec("SELECT rfc724_mid FROM download", (), |row| {
280            let rfc724_mid: String = row.get(0)?;
281            Ok(rfc724_mid)
282        })
283        .await?;
284
285    for rfc724_mid in &rfc724_mids {
286        let res = download_msg(context, rfc724_mid.clone(), session).await;
287        if let Ok(Some(())) = res {
288            delete_from_downloads(context, rfc724_mid).await?;
289            delete_from_available_post_msgs(context, rfc724_mid).await?;
290        }
291        if let Err(err) = res {
292            warn!(
293                context,
294                "Failed to download message rfc724_mid={rfc724_mid}: {:#}.", err
295            );
296            if !msg_is_downloaded_for(context, rfc724_mid).await? {
297                // This is probably a classical email that vanished before we could download it
298                warn!(
299                    context,
300                    "{rfc724_mid} download failed and there is no downloaded pre-message."
301                );
302                delete_from_downloads(context, rfc724_mid).await?;
303            } else if available_post_msgs_contains_rfc724_mid(context, rfc724_mid).await? {
304                warn!(
305                    context,
306                    "{rfc724_mid} is in available_post_msgs table but we failed to fetch it,
307                    so set the message to DownloadState::Failure - probably it was deleted on the server in the meantime"
308                );
309                set_state_to_failure(context, rfc724_mid).await?;
310                delete_from_downloads(context, rfc724_mid).await?;
311                delete_from_available_post_msgs(context, rfc724_mid).await?;
312            } else {
313                // leave the message in DownloadState::InProgress;
314                // it will be downloaded once it arrives.
315            }
316        }
317    }
318
319    Ok(())
320}
321
322/// Downloads known post-messages without pre-messages
323/// in order to guard against lost pre-messages.
324pub(crate) async fn download_known_post_messages_without_pre_message(
325    context: &Context,
326    session: &mut Session,
327) -> Result<()> {
328    let rfc724_mids = context
329        .sql
330        .query_map_vec("SELECT rfc724_mid FROM available_post_msgs", (), |row| {
331            let rfc724_mid: String = row.get(0)?;
332            Ok(rfc724_mid)
333        })
334        .await?;
335    for rfc724_mid in &rfc724_mids {
336        if msg_is_downloaded_for(context, rfc724_mid).await? {
337            delete_from_available_post_msgs(context, rfc724_mid).await?;
338            continue;
339        }
340
341        // Download the Post-Message unconditionally,
342        // because the Pre-Message got lost.
343        // The message may be in the wrong order,
344        // but at least we have it at all.
345        let res = download_msg(context, rfc724_mid.clone(), session).await;
346        if let Ok(Some(())) = res {
347            delete_from_available_post_msgs(context, rfc724_mid).await?;
348        }
349        if let Err(err) = res {
350            warn!(
351                context,
352                "download_known_post_messages_without_pre_message: Failed to download message rfc724_mid={rfc724_mid}: {:#}.",
353                err
354            );
355        }
356    }
357    Ok(())
358}
359
#[cfg(test)]
mod tests {
    use num_traits::FromPrimitive;

    use super::*;
    use crate::chat::send_msg;
    use crate::test_utils::TestContextManager;

    /// Pins the numeric value of every `DownloadState` variant.
    #[test]
    fn test_downloadstate_values() {
        // values may be written to disk and must not change
        assert_eq!(DownloadState::Done, DownloadState::default());
        assert_eq!(DownloadState::Done, DownloadState::from_i32(0).unwrap());
        assert_eq!(
            DownloadState::Available,
            DownloadState::from_i32(10).unwrap()
        );
        assert_eq!(DownloadState::Failure, DownloadState::from_i32(20).unwrap());
        assert_eq!(
            DownloadState::Undecipherable,
            DownloadState::from_i32(30).unwrap()
        );
        assert_eq!(
            DownloadState::InProgress,
            DownloadState::from_i32(1000).unwrap()
        );
    }

    /// Checks that `update_download_state()` stores each state, accepts repeating the
    /// current state, and succeeds for a message that was deleted in the meantime.
    #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
    async fn test_update_download_state() -> Result<()> {
        let mut tcm = TestContextManager::new();
        let t = &tcm.alice().await;
        let bob = &tcm.bob().await;
        let chat_id = t.create_chat_id(bob).await;

        let mut msg = Message::new_text("Hi Bob".to_owned());
        let msg_id = send_msg(t, chat_id, &mut msg).await?;
        let msg = Message::load_from_db(t, msg_id).await?;
        assert_eq!(msg.download_state(), DownloadState::Done);

        // `Done` is listed twice to cover setting the state the message already has.
        for state in [
            DownloadState::Available,
            DownloadState::InProgress,
            DownloadState::Failure,
            DownloadState::Done,
            DownloadState::Done,
        ] {
            msg_id.update_download_state(t, state).await?;
            let msg = Message::load_from_db(t, msg_id).await?;
            assert_eq!(msg.download_state(), state);
        }
        t.sql
            .execute("DELETE FROM msgs WHERE id=?", (msg_id,))
            .await?;
        // Nothing to do is ok.
        msg_id.update_download_state(t, DownloadState::Done).await?;

        Ok(())
    }
}