deltachat/
download.rs

1//! # Download large messages manually.
2
3use std::collections::BTreeMap;
4
5use anyhow::{Result, anyhow, bail, ensure};
6use deltachat_derive::{FromSql, ToSql};
7use serde::{Deserialize, Serialize};
8
9use crate::context::Context;
10use crate::imap::session::Session;
11use crate::log::warn;
12use crate::message::{self, Message, MsgId, rfc724_mid_exists};
13use crate::{EventType, chatlist_events};
14
15pub(crate) mod post_msg_metadata;
16pub(crate) use post_msg_metadata::PostMsgMetadata;
17
/// Lower bound applied to `delete_server_after` for partially downloaded messages.
///
/// If a message is downloaded only partially
/// and `delete_server_after` is set to small timeouts (e.g. "at once"),
/// the user might have no chance to actually download that message.
/// `MIN_DELETE_SERVER_AFTER` increases the timeout in this case.
///
/// NOTE(review): the value is `48 * 60 * 60`, i.e. 48 hours if the unit is seconds;
/// the unit is not visible in this file, confirm at the use site.
pub(crate) const MIN_DELETE_SERVER_AFTER: i64 = 48 * 60 * 60;

/// Attachment size threshold for sending a Pre-Message.
///
/// From this point onward outgoing messages are considered large
/// and get a Pre-Message, which announces the Post-Message.
/// This is only about sending so we can modify it any time.
/// Current value is a bit less than the minimum auto-download setting from the UIs (which is 160
/// KiB).
pub(crate) const PRE_MSG_ATTACHMENT_SIZE_THRESHOLD: u64 = 140_000;

/// Max size for pre messages. A warning is emitted when this is exceeded.
pub(crate) const PRE_MSG_SIZE_WARNING_THRESHOLD: usize = 150_000;
33
34/// Download state of the message.
35#[derive(
36    Debug,
37    Default,
38    Display,
39    Clone,
40    Copy,
41    PartialEq,
42    Eq,
43    FromPrimitive,
44    ToPrimitive,
45    FromSql,
46    ToSql,
47    Serialize,
48    Deserialize,
49)]
50#[repr(u32)]
51pub enum DownloadState {
52    /// Message is fully downloaded.
53    #[default]
54    Done = 0,
55
56    /// Message is partially downloaded and can be fully downloaded at request.
57    Available = 10,
58
59    /// Failed to fully download the message.
60    Failure = 20,
61
62    /// Undecipherable message.
63    Undecipherable = 30,
64
65    /// Full download of the message is in progress.
66    InProgress = 1000,
67}
68
69impl MsgId {
70    /// Schedules Post-Message download for partially downloaded message.
71    pub async fn download_full(self, context: &Context) -> Result<()> {
72        let msg = Message::load_from_db(context, self).await?;
73        match msg.download_state() {
74            DownloadState::Done | DownloadState::Undecipherable => {
75                return Err(anyhow!("Nothing to download."));
76            }
77            DownloadState::InProgress => return Err(anyhow!("Download already in progress.")),
78            DownloadState::Available | DownloadState::Failure => {
79                if msg.rfc724_mid().is_empty() {
80                    return Err(anyhow!("Download not possible, message has no rfc724_mid"));
81                }
82                self.update_download_state(context, DownloadState::InProgress)
83                    .await?;
84                info!(
85                    context,
86                    "Requesting full download of {:?}.",
87                    msg.rfc724_mid()
88                );
89                context
90                    .sql
91                    .execute(
92                        "INSERT INTO download (rfc724_mid, msg_id) VALUES (?,?)",
93                        (msg.rfc724_mid(), msg.id),
94                    )
95                    .await?;
96                context.scheduler.interrupt_inbox().await;
97            }
98        }
99        Ok(())
100    }
101
102    /// Updates the message download state. Returns `Ok` if the message doesn't exist anymore or has
103    /// the download state up to date.
104    pub(crate) async fn update_download_state(
105        self,
106        context: &Context,
107        download_state: DownloadState,
108    ) -> Result<()> {
109        if context
110            .sql
111            .execute(
112                "UPDATE msgs SET download_state=? WHERE id=? AND download_state<>?1",
113                (download_state, self),
114            )
115            .await?
116            == 0
117        {
118            return Ok(());
119        }
120        let Some(msg) = Message::load_from_db_optional(context, self).await? else {
121            return Ok(());
122        };
123        context.emit_event(EventType::MsgsChanged {
124            chat_id: msg.chat_id,
125            msg_id: self,
126        });
127        chatlist_events::emit_chatlist_item_changed(context, msg.chat_id);
128        Ok(())
129    }
130}
131
impl Message {
    /// Returns the download state of the message.
    ///
    /// This is a copy of the `download_state` field of the loaded message;
    /// the method itself does not access the database.
    pub fn download_state(&self) -> DownloadState {
        self.download_state
    }
}
138
139/// Actually downloads a message partially downloaded before if the message is available on the
140/// session transport, in which case returns `Some`. If the message is available on another
141/// transport, returns `None`.
142///
143/// Most messages are downloaded automatically on fetch instead.
144pub(crate) async fn download_msg(
145    context: &Context,
146    rfc724_mid: String,
147    session: &mut Session,
148) -> Result<Option<()>> {
149    let transport_id = session.transport_id();
150    let row = context
151        .sql
152        .query_row_optional(
153            "SELECT uid, folder, transport_id FROM imap
154             WHERE rfc724_mid=? AND target!=''
155             ORDER BY transport_id=? DESC LIMIT 1",
156            (&rfc724_mid, transport_id),
157            |row| {
158                let server_uid: u32 = row.get(0)?;
159                let server_folder: String = row.get(1)?;
160                let msg_transport_id: u32 = row.get(2)?;
161                Ok((server_uid, server_folder, msg_transport_id))
162            },
163        )
164        .await?;
165
166    let Some((server_uid, server_folder, msg_transport_id)) = row else {
167        // No IMAP record found, we don't know the UID and folder.
168        delete_from_available_post_msgs(context, &rfc724_mid).await?;
169        return Err(anyhow!(
170            "IMAP location for {rfc724_mid:?} post-message is unknown"
171        ));
172    };
173    if msg_transport_id != transport_id {
174        return Ok(None);
175    }
176    Box::pin(session.fetch_single_msg(context, &server_folder, server_uid, rfc724_mid)).await?;
177    Ok(Some(()))
178}
179
180impl Session {
181    /// Download a single message and pipe it to receive_imf().
182    ///
183    /// receive_imf() is not directly aware that this is a result of a call to download_msg(),
184    /// however, implicitly knows that as the existing message is flagged as being partly.
185    async fn fetch_single_msg(
186        &mut self,
187        context: &Context,
188        folder: &str,
189        uid: u32,
190        rfc724_mid: String,
191    ) -> Result<()> {
192        if uid == 0 {
193            bail!("Attempt to fetch UID 0");
194        }
195
196        let folder_exists = self.select_with_uidvalidity(context, folder).await?;
197        ensure!(folder_exists, "No folder {folder}");
198
199        // we are connected, and the folder is selected
200        info!(context, "Downloading message {}/{} fully...", folder, uid);
201
202        let mut uid_message_ids: BTreeMap<u32, String> = BTreeMap::new();
203        uid_message_ids.insert(uid, rfc724_mid);
204        let (sender, receiver) = async_channel::unbounded();
205        {
206            let _fetch_msgs_lock_guard = context.fetch_msgs_mutex.lock().await;
207            self.fetch_many_msgs(context, folder, vec![uid], &uid_message_ids, sender)
208                .await?;
209        }
210        if receiver.recv().await.is_err() {
211            bail!("Failed to fetch UID {uid}");
212        }
213        Ok(())
214    }
215}
216
217async fn set_state_to_failure(context: &Context, rfc724_mid: &str) -> Result<()> {
218    if let Some(msg_id) = rfc724_mid_exists(context, rfc724_mid).await? {
219        // Update download state to failure
220        // so it can be retried.
221        //
222        // On success update_download_state() is not needed
223        // as receive_imf() already
224        // set the state and emitted the event.
225        msg_id
226            .update_download_state(context, DownloadState::Failure)
227            .await?;
228    }
229    Ok(())
230}
231
232async fn available_post_msgs_contains_rfc724_mid(
233    context: &Context,
234    rfc724_mid: &str,
235) -> Result<bool> {
236    Ok(context
237        .sql
238        .query_get_value::<String>(
239            "SELECT rfc724_mid FROM available_post_msgs WHERE rfc724_mid=?",
240            (&rfc724_mid,),
241        )
242        .await?
243        .is_some())
244}
245
246async fn delete_from_available_post_msgs(context: &Context, rfc724_mid: &str) -> Result<()> {
247    context
248        .sql
249        .execute(
250            "DELETE FROM available_post_msgs WHERE rfc724_mid=?",
251            (&rfc724_mid,),
252        )
253        .await?;
254    Ok(())
255}
256
257async fn delete_from_downloads(context: &Context, rfc724_mid: &str) -> Result<()> {
258    context
259        .sql
260        .execute("DELETE FROM download WHERE rfc724_mid=?", (&rfc724_mid,))
261        .await?;
262    Ok(())
263}
264
265pub(crate) async fn msg_is_downloaded_for(context: &Context, rfc724_mid: &str) -> Result<bool> {
266    Ok(message::rfc724_mid_exists(context, rfc724_mid)
267        .await?
268        .is_some())
269}
270
271pub(crate) async fn download_msgs(context: &Context, session: &mut Session) -> Result<()> {
272    let rfc724_mids = context
273        .sql
274        .query_map_vec("SELECT rfc724_mid FROM download", (), |row| {
275            let rfc724_mid: String = row.get(0)?;
276            Ok(rfc724_mid)
277        })
278        .await?;
279
280    for rfc724_mid in &rfc724_mids {
281        let res = download_msg(context, rfc724_mid.clone(), session).await;
282        if let Ok(Some(())) = res {
283            delete_from_downloads(context, rfc724_mid).await?;
284            delete_from_available_post_msgs(context, rfc724_mid).await?;
285        }
286        if let Err(err) = res {
287            warn!(
288                context,
289                "Failed to download message rfc724_mid={rfc724_mid}: {:#}.", err
290            );
291            if !msg_is_downloaded_for(context, rfc724_mid).await? {
292                // This is probably a classical email that vanished before we could download it
293                warn!(
294                    context,
295                    "{rfc724_mid} download failed and there is no downloaded pre-message."
296                );
297                delete_from_downloads(context, rfc724_mid).await?;
298            } else if available_post_msgs_contains_rfc724_mid(context, rfc724_mid).await? {
299                warn!(
300                    context,
301                    "{rfc724_mid} is in available_post_msgs table but we failed to fetch it,
302                    so set the message to DownloadState::Failure - probably it was deleted on the server in the meantime"
303                );
304                set_state_to_failure(context, rfc724_mid).await?;
305                delete_from_downloads(context, rfc724_mid).await?;
306                delete_from_available_post_msgs(context, rfc724_mid).await?;
307            } else {
308                // leave the message in DownloadState::InProgress;
309                // it will be downloaded once it arrives.
310            }
311        }
312    }
313
314    Ok(())
315}
316
317/// Downloads known post-messages without pre-messages
318/// in order to guard against lost pre-messages.
319pub(crate) async fn download_known_post_messages_without_pre_message(
320    context: &Context,
321    session: &mut Session,
322) -> Result<()> {
323    let rfc724_mids = context
324        .sql
325        .query_map_vec("SELECT rfc724_mid FROM available_post_msgs", (), |row| {
326            let rfc724_mid: String = row.get(0)?;
327            Ok(rfc724_mid)
328        })
329        .await?;
330    for rfc724_mid in &rfc724_mids {
331        if msg_is_downloaded_for(context, rfc724_mid).await? {
332            delete_from_available_post_msgs(context, rfc724_mid).await?;
333            continue;
334        }
335
336        // Download the Post-Message unconditionally,
337        // because the Pre-Message got lost.
338        // The message may be in the wrong order,
339        // but at least we have it at all.
340        let res = download_msg(context, rfc724_mid.clone(), session).await;
341        if let Ok(Some(())) = res {
342            delete_from_available_post_msgs(context, rfc724_mid).await?;
343        }
344        if let Err(err) = res {
345            warn!(
346                context,
347                "download_known_post_messages_without_pre_message: Failed to download message rfc724_mid={rfc724_mid}: {:#}.",
348                err
349            );
350        }
351    }
352    Ok(())
353}
354
#[cfg(test)]
mod tests {
    use num_traits::FromPrimitive;

    use super::*;
    use crate::chat::send_msg;
    use crate::test_utils::TestContext;

    /// Pins the numeric value of every `DownloadState` variant.
    #[test]
    fn test_downloadstate_values() {
        // values may be written to disk and must not change
        assert_eq!(DownloadState::Done, DownloadState::default());

        let expected = [
            (0, DownloadState::Done),
            (10, DownloadState::Available),
            (20, DownloadState::Failure),
            (30, DownloadState::Undecipherable),
            (1000, DownloadState::InProgress),
        ];
        for (raw, state) in expected {
            assert_eq!(DownloadState::from_i32(raw), Some(state));
        }
    }

    /// Checks that `update_download_state()` stores each state and tolerates both an
    /// unchanged state and a message that no longer exists.
    #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
    async fn test_update_download_state() -> Result<()> {
        let t = TestContext::new_alice().await;
        let chat = t.create_chat_with_contact("Bob", "bob@example.org").await;

        let mut msg = Message::new_text("Hi Bob".to_owned());
        let msg_id = send_msg(&t, chat.id, &mut msg).await?;
        let msg = Message::load_from_db(&t, msg_id).await?;
        assert_eq!(msg.download_state(), DownloadState::Done);

        // `Done` is listed twice to cover the case where the state is already up to date.
        for state in [
            DownloadState::Available,
            DownloadState::InProgress,
            DownloadState::Failure,
            DownloadState::Done,
            DownloadState::Done,
        ] {
            msg_id.update_download_state(&t, state).await?;
            let msg = Message::load_from_db(&t, msg_id).await?;
            assert_eq!(msg.download_state(), state);
        }

        t.sql
            .execute("DELETE FROM msgs WHERE id=?", (msg_id,))
            .await?;
        // Nothing to do is ok.
        msg_id
            .update_download_state(&t, DownloadState::Done)
            .await?;

        Ok(())
    }
}