Skip to main content

deltachat/
download.rs

1//! # Download large messages manually.
2
3use std::collections::BTreeMap;
4
5use anyhow::{Result, anyhow, bail, ensure};
6use deltachat_derive::{FromSql, ToSql};
7use serde::{Deserialize, Serialize};
8
9use crate::context::Context;
10use crate::imap::session::Session;
11use crate::log::warn;
12use crate::message::{self, Message, MsgId, rfc724_mid_exists};
13use crate::{EventType, chatlist_events, ephemeral};
14
15pub(crate) mod post_msg_metadata;
16pub(crate) use post_msg_metadata::PostMsgMetadata;
17
18/// From this point onward outgoing messages are considered large
19/// and get a Pre-Message, which announces the Post-Message.
20/// This is only about sending so we can modify it any time.
21/// Current value is a bit less than the minimum auto-download setting from the UIs (which is 160
22/// KiB).
23pub(crate) const PRE_MSG_ATTACHMENT_SIZE_THRESHOLD: u64 = 140_000;
24
25/// Max size for pre messages. A warning is emitted when this is exceeded.
26pub(crate) const PRE_MSG_SIZE_WARNING_THRESHOLD: usize = 150_000;
27
28/// Download state of the message.
29#[derive(
30    Debug,
31    Default,
32    Display,
33    Clone,
34    Copy,
35    PartialEq,
36    Eq,
37    FromPrimitive,
38    ToPrimitive,
39    FromSql,
40    ToSql,
41    Serialize,
42    Deserialize,
43)]
44#[repr(u32)]
45pub enum DownloadState {
46    /// Message is fully downloaded.
47    #[default]
48    Done = 0,
49
50    /// Message is partially downloaded and can be fully downloaded at request.
51    Available = 10,
52
53    /// Failed to fully download the message.
54    Failure = 20,
55
56    /// Undecipherable message.
57    Undecipherable = 30,
58
59    /// Full download of the message is in progress.
60    InProgress = 1000,
61}
62
63impl MsgId {
64    /// Schedules Post-Message download for partially downloaded message.
65    pub async fn download_full(self, context: &Context) -> Result<()> {
66        let msg = Message::load_from_db(context, self).await?;
67        match msg.download_state() {
68            DownloadState::Done | DownloadState::Undecipherable => {
69                return Err(anyhow!("Nothing to download."));
70            }
71            DownloadState::InProgress => return Err(anyhow!("Download already in progress.")),
72            DownloadState::Available | DownloadState::Failure => {
73                if msg.rfc724_mid().is_empty() {
74                    return Err(anyhow!("Download not possible, message has no rfc724_mid"));
75                }
76                self.update_download_state(context, DownloadState::InProgress)
77                    .await?;
78                info!(
79                    context,
80                    "Requesting full download of {:?}.",
81                    msg.rfc724_mid()
82                );
83                context
84                    .sql
85                    .execute(
86                        "INSERT INTO download (rfc724_mid, msg_id) VALUES (?,?)",
87                        (msg.rfc724_mid(), msg.id),
88                    )
89                    .await?;
90                context.scheduler.interrupt_inbox().await;
91            }
92        }
93        Ok(())
94    }
95
96    /// Updates the message download state. Returns `Ok` if the message doesn't exist anymore or has
97    /// the download state up to date.
98    pub(crate) async fn update_download_state(
99        self,
100        context: &Context,
101        download_state: DownloadState,
102    ) -> Result<()> {
103        if context
104            .sql
105            .execute(
106                "UPDATE msgs SET download_state=? WHERE id=? AND download_state<>?1",
107                (download_state, self),
108            )
109            .await?
110            == 0
111        {
112            return Ok(());
113        }
114        let Some(msg) = Message::load_from_db_optional(context, self).await? else {
115            return Ok(());
116        };
117        context.emit_event(EventType::MsgsChanged {
118            chat_id: msg.chat_id,
119            msg_id: self,
120        });
121        chatlist_events::emit_chatlist_item_changed(context, msg.chat_id);
122        Ok(())
123    }
124}
125
126impl Message {
127    /// Returns the download state of the message.
128    pub fn download_state(&self) -> DownloadState {
129        self.download_state
130    }
131}
132
133/// Actually downloads a message partially downloaded before if the message is available on the
134/// session transport, in which case returns `Some`. If the message is available on another
135/// transport, returns `None`.
136///
137/// Most messages are downloaded automatically on fetch instead.
138pub(crate) async fn download_msg(
139    context: &Context,
140    rfc724_mid: String,
141    session: &mut Session,
142) -> Result<Option<()>> {
143    let transport_id = session.transport_id();
144    let row = context
145        .sql
146        .query_row_optional(
147            "SELECT uid, folder, transport_id FROM imap
148             WHERE rfc724_mid=? AND target!=''
149             ORDER BY transport_id=? DESC LIMIT 1",
150            (&rfc724_mid, transport_id),
151            |row| {
152                let server_uid: u32 = row.get(0)?;
153                let server_folder: String = row.get(1)?;
154                let msg_transport_id: u32 = row.get(2)?;
155                Ok((server_uid, server_folder, msg_transport_id))
156            },
157        )
158        .await?;
159
160    let Some((server_uid, server_folder, msg_transport_id)) = row else {
161        // No IMAP record found, we don't know the UID and folder.
162        delete_from_available_post_msgs(context, &rfc724_mid).await?;
163        return Err(anyhow!(
164            "IMAP location for {rfc724_mid:?} post-message is unknown"
165        ));
166    };
167    if msg_transport_id != transport_id {
168        return Ok(None);
169    }
170    Box::pin(session.fetch_single_msg(context, &server_folder, server_uid, rfc724_mid)).await?;
171
172    if ephemeral::should_delete_all_downloaded_messages(context, session.is_chatmail()).await? {
173        // Now that the message was downloaded, it likely needs to be deleted;
174        // trigger a re-check by interrupting the inbox folder.
175        // This is mainly needed to make the tests pass;
176        // on real devices, it would be fine to delete the message at the next iteration.
177        context.scheduler.interrupt_inbox().await;
178    }
179
180    Ok(Some(()))
181}
182
183impl Session {
184    /// Download a single message and pipe it to receive_imf().
185    ///
186    /// receive_imf() is not directly aware that this is a result of a call to download_msg(),
187    /// however, implicitly knows that as the existing message is flagged as being partly.
188    async fn fetch_single_msg(
189        &mut self,
190        context: &Context,
191        folder: &str,
192        uid: u32,
193        rfc724_mid: String,
194    ) -> Result<()> {
195        if uid == 0 {
196            bail!("Attempt to fetch UID 0");
197        }
198
199        let folder_exists = self.select_with_uidvalidity(context, folder).await?;
200        ensure!(folder_exists, "No folder {folder}");
201
202        // we are connected, and the folder is selected
203        info!(context, "Downloading message {}/{} fully...", folder, uid);
204
205        let mut uid_message_ids: BTreeMap<u32, String> = BTreeMap::new();
206        uid_message_ids.insert(uid, rfc724_mid);
207        let (sender, receiver) = async_channel::unbounded();
208        {
209            let _fetch_msgs_lock_guard = context.fetch_msgs_mutex.lock().await;
210            self.fetch_many_msgs(context, folder, vec![uid], &uid_message_ids, sender)
211                .await?;
212        }
213        if receiver.recv().await.is_err() {
214            bail!("Failed to fetch UID {uid}");
215        }
216        Ok(())
217    }
218}
219
220async fn set_state_to_failure(context: &Context, rfc724_mid: &str) -> Result<()> {
221    if let Some(msg_id) = rfc724_mid_exists(context, rfc724_mid).await? {
222        // Update download state to failure
223        // so it can be retried.
224        //
225        // On success update_download_state() is not needed
226        // as receive_imf() already
227        // set the state and emitted the event.
228        msg_id
229            .update_download_state(context, DownloadState::Failure)
230            .await?;
231    }
232    Ok(())
233}
234
235async fn available_post_msgs_contains_rfc724_mid(
236    context: &Context,
237    rfc724_mid: &str,
238) -> Result<bool> {
239    Ok(context
240        .sql
241        .query_get_value::<String>(
242            "SELECT rfc724_mid FROM available_post_msgs WHERE rfc724_mid=?",
243            (&rfc724_mid,),
244        )
245        .await?
246        .is_some())
247}
248
249async fn delete_from_available_post_msgs(context: &Context, rfc724_mid: &str) -> Result<()> {
250    context
251        .sql
252        .execute(
253            "DELETE FROM available_post_msgs WHERE rfc724_mid=?",
254            (&rfc724_mid,),
255        )
256        .await?;
257    Ok(())
258}
259
260async fn delete_from_downloads(context: &Context, rfc724_mid: &str) -> Result<()> {
261    context
262        .sql
263        .execute("DELETE FROM download WHERE rfc724_mid=?", (&rfc724_mid,))
264        .await?;
265    Ok(())
266}
267
268pub(crate) async fn msg_is_downloaded_for(context: &Context, rfc724_mid: &str) -> Result<bool> {
269    Ok(message::rfc724_mid_exists(context, rfc724_mid)
270        .await?
271        .is_some())
272}
273
274pub(crate) async fn download_msgs(context: &Context, session: &mut Session) -> Result<()> {
275    let rfc724_mids = context
276        .sql
277        .query_map_vec("SELECT rfc724_mid FROM download", (), |row| {
278            let rfc724_mid: String = row.get(0)?;
279            Ok(rfc724_mid)
280        })
281        .await?;
282
283    for rfc724_mid in &rfc724_mids {
284        let res = download_msg(context, rfc724_mid.clone(), session).await;
285        if let Ok(Some(())) = res {
286            delete_from_downloads(context, rfc724_mid).await?;
287            delete_from_available_post_msgs(context, rfc724_mid).await?;
288        }
289        if let Err(err) = res {
290            warn!(
291                context,
292                "Failed to download message rfc724_mid={rfc724_mid}: {:#}.", err
293            );
294            if !msg_is_downloaded_for(context, rfc724_mid).await? {
295                // This is probably a classical email that vanished before we could download it
296                warn!(
297                    context,
298                    "{rfc724_mid} download failed and there is no downloaded pre-message."
299                );
300                delete_from_downloads(context, rfc724_mid).await?;
301            } else if available_post_msgs_contains_rfc724_mid(context, rfc724_mid).await? {
302                warn!(
303                    context,
304                    "{rfc724_mid} is in available_post_msgs table but we failed to fetch it,
305                    so set the message to DownloadState::Failure - probably it was deleted on the server in the meantime"
306                );
307                set_state_to_failure(context, rfc724_mid).await?;
308                delete_from_downloads(context, rfc724_mid).await?;
309                delete_from_available_post_msgs(context, rfc724_mid).await?;
310            } else {
311                // leave the message in DownloadState::InProgress;
312                // it will be downloaded once it arrives.
313            }
314        }
315    }
316
317    Ok(())
318}
319
320/// Downloads known post-messages without pre-messages
321/// in order to guard against lost pre-messages.
322pub(crate) async fn download_known_post_messages_without_pre_message(
323    context: &Context,
324    session: &mut Session,
325) -> Result<()> {
326    let rfc724_mids = context
327        .sql
328        .query_map_vec("SELECT rfc724_mid FROM available_post_msgs", (), |row| {
329            let rfc724_mid: String = row.get(0)?;
330            Ok(rfc724_mid)
331        })
332        .await?;
333    for rfc724_mid in &rfc724_mids {
334        if msg_is_downloaded_for(context, rfc724_mid).await? {
335            delete_from_available_post_msgs(context, rfc724_mid).await?;
336            continue;
337        }
338
339        // Download the Post-Message unconditionally,
340        // because the Pre-Message got lost.
341        // The message may be in the wrong order,
342        // but at least we have it at all.
343        let res = download_msg(context, rfc724_mid.clone(), session).await;
344        if let Ok(Some(())) = res {
345            delete_from_available_post_msgs(context, rfc724_mid).await?;
346        }
347        if let Err(err) = res {
348            warn!(
349                context,
350                "download_known_post_messages_without_pre_message: Failed to download message rfc724_mid={rfc724_mid}: {:#}.",
351                err
352            );
353        }
354    }
355    Ok(())
356}
357
358#[cfg(test)]
359mod tests {
360    use num_traits::FromPrimitive;
361
362    use super::*;
363    use crate::chat::send_msg;
364    use crate::test_utils::TestContextManager;
365
366    #[test]
367    fn test_downloadstate_values() {
368        // values may be written to disk and must not change
369        assert_eq!(DownloadState::Done, DownloadState::default());
370        assert_eq!(DownloadState::Done, DownloadState::from_i32(0).unwrap());
371        assert_eq!(
372            DownloadState::Available,
373            DownloadState::from_i32(10).unwrap()
374        );
375        assert_eq!(DownloadState::Failure, DownloadState::from_i32(20).unwrap());
376        assert_eq!(
377            DownloadState::InProgress,
378            DownloadState::from_i32(1000).unwrap()
379        );
380    }
381
382    #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
383    async fn test_update_download_state() -> Result<()> {
384        let mut tcm = TestContextManager::new();
385        let t = &tcm.alice().await;
386        let bob = &tcm.bob().await;
387        let chat_id = t.create_chat_id(bob).await;
388
389        let mut msg = Message::new_text("Hi Bob".to_owned());
390        let msg_id = send_msg(t, chat_id, &mut msg).await?;
391        let msg = Message::load_from_db(t, msg_id).await?;
392        assert_eq!(msg.download_state(), DownloadState::Done);
393
394        for s in &[
395            DownloadState::Available,
396            DownloadState::InProgress,
397            DownloadState::Failure,
398            DownloadState::Done,
399            DownloadState::Done,
400        ] {
401            msg_id.update_download_state(t, *s).await?;
402            let msg = Message::load_from_db(t, msg_id).await?;
403            assert_eq!(msg.download_state(), *s);
404        }
405        t.sql
406            .execute("DELETE FROM msgs WHERE id=?", (msg_id,))
407            .await?;
408        // Nothing to do is ok.
409        msg_id.update_download_state(t, DownloadState::Done).await?;
410
411        Ok(())
412    }
413}