1use std::collections::BTreeMap;
4
5use anyhow::{Result, anyhow, bail, ensure};
6use deltachat_derive::{FromSql, ToSql};
7use serde::{Deserialize, Serialize};
8
9use crate::config::Config;
10use crate::context::Context;
11use crate::imap::session::Session;
12use crate::log::warn;
13use crate::message::{self, Message, MsgId, rfc724_mid_exists};
14use crate::{EventType, chatlist_events, ephemeral};
15
16pub(crate) mod post_msg_metadata;
17pub(crate) use post_msg_metadata::PostMsgMetadata;
18
19pub(crate) const PRE_MSG_ATTACHMENT_SIZE_THRESHOLD: u64 = 140_000;
25
26pub(crate) const PRE_MSG_SIZE_WARNING_THRESHOLD: usize = 150_000;
28
29#[derive(
31 Debug,
32 Default,
33 Display,
34 Clone,
35 Copy,
36 PartialEq,
37 Eq,
38 FromPrimitive,
39 ToPrimitive,
40 FromSql,
41 ToSql,
42 Serialize,
43 Deserialize,
44)]
45#[repr(u32)]
46pub enum DownloadState {
47 #[default]
49 Done = 0,
50
51 Available = 10,
53
54 Failure = 20,
56
57 Undecipherable = 30,
59
60 InProgress = 1000,
62}
63
64impl MsgId {
65 pub async fn download_full(self, context: &Context) -> Result<()> {
67 let msg = Message::load_from_db(context, self).await?;
68 match msg.download_state() {
69 DownloadState::Done | DownloadState::Undecipherable => {
70 return Err(anyhow!("Nothing to download."));
71 }
72 DownloadState::InProgress => return Err(anyhow!("Download already in progress.")),
73 DownloadState::Available | DownloadState::Failure => {
74 if msg.rfc724_mid().is_empty() {
75 return Err(anyhow!("Download not possible, message has no rfc724_mid"));
76 }
77 self.update_download_state(context, DownloadState::InProgress)
78 .await?;
79 info!(
80 context,
81 "Requesting full download of {:?}.",
82 msg.rfc724_mid()
83 );
84 context
85 .sql
86 .execute(
87 "INSERT INTO download (rfc724_mid, msg_id) VALUES (?,?)",
88 (msg.rfc724_mid(), msg.id),
89 )
90 .await?;
91 context.scheduler.interrupt_inbox().await;
92 }
93 }
94 Ok(())
95 }
96
97 pub(crate) async fn update_download_state(
100 self,
101 context: &Context,
102 download_state: DownloadState,
103 ) -> Result<()> {
104 if context
105 .sql
106 .execute(
107 "UPDATE msgs SET download_state=? WHERE id=? AND download_state<>?1",
108 (download_state, self),
109 )
110 .await?
111 == 0
112 {
113 return Ok(());
114 }
115 let Some(msg) = Message::load_from_db_optional(context, self).await? else {
116 return Ok(());
117 };
118 context.emit_event(EventType::MsgsChanged {
119 chat_id: msg.chat_id,
120 msg_id: self,
121 });
122 chatlist_events::emit_chatlist_item_changed(context, msg.chat_id);
123 Ok(())
124 }
125}
126
127impl Message {
128 pub fn download_state(&self) -> DownloadState {
130 self.download_state
131 }
132}
133
134pub(crate) async fn download_msg(
140 context: &Context,
141 rfc724_mid: String,
142 session: &mut Session,
143) -> Result<Option<()>> {
144 let transport_id = session.transport_id();
145 let row = context
146 .sql
147 .query_row_optional(
148 "SELECT uid, folder, transport_id FROM imap
149 WHERE rfc724_mid=? AND target!=''
150 ORDER BY transport_id=? DESC LIMIT 1",
151 (&rfc724_mid, transport_id),
152 |row| {
153 let server_uid: u32 = row.get(0)?;
154 let server_folder: String = row.get(1)?;
155 let msg_transport_id: u32 = row.get(2)?;
156 Ok((server_uid, server_folder, msg_transport_id))
157 },
158 )
159 .await?;
160
161 let Some((server_uid, server_folder, msg_transport_id)) = row else {
162 delete_from_available_post_msgs(context, &rfc724_mid).await?;
164 return Err(anyhow!(
165 "IMAP location for {rfc724_mid:?} post-message is unknown"
166 ));
167 };
168 if msg_transport_id != transport_id {
169 return Ok(None);
170 }
171 Box::pin(session.fetch_single_msg(context, &server_folder, server_uid, rfc724_mid)).await?;
172
173 let bcc_self = context.get_config_bool(Config::BccSelf).await?;
174 if ephemeral::should_delete_all_downloaded_messages(bcc_self, session.is_chatmail()) {
175 context.scheduler.interrupt_inbox().await;
180 }
181
182 Ok(Some(()))
183}
184
185impl Session {
186 async fn fetch_single_msg(
191 &mut self,
192 context: &Context,
193 folder: &str,
194 uid: u32,
195 rfc724_mid: String,
196 ) -> Result<()> {
197 if uid == 0 {
198 bail!("Attempt to fetch UID 0");
199 }
200
201 let folder_exists = self.select_with_uidvalidity(context, folder).await?;
202 ensure!(folder_exists, "No folder {folder}");
203
204 info!(context, "Downloading message {}/{} fully...", folder, uid);
206
207 let mut uid_message_ids: BTreeMap<u32, String> = BTreeMap::new();
208 uid_message_ids.insert(uid, rfc724_mid);
209 let (sender, receiver) = async_channel::unbounded();
210 {
211 let _fetch_msgs_lock_guard = context.fetch_msgs_mutex.lock().await;
212 self.fetch_many_msgs(context, folder, vec![uid], &uid_message_ids, sender)
213 .await?;
214 }
215 if receiver.recv().await.is_err() {
216 bail!("Failed to fetch UID {uid}");
217 }
218 Ok(())
219 }
220}
221
222async fn set_state_to_failure(context: &Context, rfc724_mid: &str) -> Result<()> {
223 if let Some(msg_id) = rfc724_mid_exists(context, rfc724_mid).await? {
224 msg_id
231 .update_download_state(context, DownloadState::Failure)
232 .await?;
233 }
234 Ok(())
235}
236
237async fn available_post_msgs_contains_rfc724_mid(
238 context: &Context,
239 rfc724_mid: &str,
240) -> Result<bool> {
241 Ok(context
242 .sql
243 .query_get_value::<String>(
244 "SELECT rfc724_mid FROM available_post_msgs WHERE rfc724_mid=?",
245 (&rfc724_mid,),
246 )
247 .await?
248 .is_some())
249}
250
251async fn delete_from_available_post_msgs(context: &Context, rfc724_mid: &str) -> Result<()> {
252 context
253 .sql
254 .execute(
255 "DELETE FROM available_post_msgs WHERE rfc724_mid=?",
256 (&rfc724_mid,),
257 )
258 .await?;
259 Ok(())
260}
261
262async fn delete_from_downloads(context: &Context, rfc724_mid: &str) -> Result<()> {
263 context
264 .sql
265 .execute("DELETE FROM download WHERE rfc724_mid=?", (&rfc724_mid,))
266 .await?;
267 Ok(())
268}
269
270pub(crate) async fn msg_is_downloaded_for(context: &Context, rfc724_mid: &str) -> Result<bool> {
271 Ok(message::rfc724_mid_exists(context, rfc724_mid)
272 .await?
273 .is_some())
274}
275
276pub(crate) async fn download_msgs(context: &Context, session: &mut Session) -> Result<()> {
277 let rfc724_mids = context
278 .sql
279 .query_map_vec("SELECT rfc724_mid FROM download", (), |row| {
280 let rfc724_mid: String = row.get(0)?;
281 Ok(rfc724_mid)
282 })
283 .await?;
284
285 for rfc724_mid in &rfc724_mids {
286 let res = download_msg(context, rfc724_mid.clone(), session).await;
287 if let Ok(Some(())) = res {
288 delete_from_downloads(context, rfc724_mid).await?;
289 delete_from_available_post_msgs(context, rfc724_mid).await?;
290 }
291 if let Err(err) = res {
292 warn!(
293 context,
294 "Failed to download message rfc724_mid={rfc724_mid}: {:#}.", err
295 );
296 if !msg_is_downloaded_for(context, rfc724_mid).await? {
297 warn!(
299 context,
300 "{rfc724_mid} download failed and there is no downloaded pre-message."
301 );
302 delete_from_downloads(context, rfc724_mid).await?;
303 } else if available_post_msgs_contains_rfc724_mid(context, rfc724_mid).await? {
304 warn!(
305 context,
306 "{rfc724_mid} is in available_post_msgs table but we failed to fetch it,
307 so set the message to DownloadState::Failure - probably it was deleted on the server in the meantime"
308 );
309 set_state_to_failure(context, rfc724_mid).await?;
310 delete_from_downloads(context, rfc724_mid).await?;
311 delete_from_available_post_msgs(context, rfc724_mid).await?;
312 } else {
313 }
316 }
317 }
318
319 Ok(())
320}
321
322pub(crate) async fn download_known_post_messages_without_pre_message(
325 context: &Context,
326 session: &mut Session,
327) -> Result<()> {
328 let rfc724_mids = context
329 .sql
330 .query_map_vec("SELECT rfc724_mid FROM available_post_msgs", (), |row| {
331 let rfc724_mid: String = row.get(0)?;
332 Ok(rfc724_mid)
333 })
334 .await?;
335 for rfc724_mid in &rfc724_mids {
336 if msg_is_downloaded_for(context, rfc724_mid).await? {
337 delete_from_available_post_msgs(context, rfc724_mid).await?;
338 continue;
339 }
340
341 let res = download_msg(context, rfc724_mid.clone(), session).await;
346 if let Ok(Some(())) = res {
347 delete_from_available_post_msgs(context, rfc724_mid).await?;
348 }
349 if let Err(err) = res {
350 warn!(
351 context,
352 "download_known_post_messages_without_pre_message: Failed to download message rfc724_mid={rfc724_mid}: {:#}.",
353 err
354 );
355 }
356 }
357 Ok(())
358}
359
360#[cfg(test)]
361mod tests {
362 use num_traits::FromPrimitive;
363
364 use super::*;
365 use crate::chat::send_msg;
366 use crate::test_utils::TestContextManager;
367
368 #[test]
369 fn test_downloadstate_values() {
370 assert_eq!(DownloadState::Done, DownloadState::default());
372 assert_eq!(DownloadState::Done, DownloadState::from_i32(0).unwrap());
373 assert_eq!(
374 DownloadState::Available,
375 DownloadState::from_i32(10).unwrap()
376 );
377 assert_eq!(DownloadState::Failure, DownloadState::from_i32(20).unwrap());
378 assert_eq!(
379 DownloadState::InProgress,
380 DownloadState::from_i32(1000).unwrap()
381 );
382 }
383
384 #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
385 async fn test_update_download_state() -> Result<()> {
386 let mut tcm = TestContextManager::new();
387 let t = &tcm.alice().await;
388 let bob = &tcm.bob().await;
389 let chat_id = t.create_chat_id(bob).await;
390
391 let mut msg = Message::new_text("Hi Bob".to_owned());
392 let msg_id = send_msg(t, chat_id, &mut msg).await?;
393 let msg = Message::load_from_db(t, msg_id).await?;
394 assert_eq!(msg.download_state(), DownloadState::Done);
395
396 for s in &[
397 DownloadState::Available,
398 DownloadState::InProgress,
399 DownloadState::Failure,
400 DownloadState::Done,
401 DownloadState::Done,
402 ] {
403 msg_id.update_download_state(t, *s).await?;
404 let msg = Message::load_from_db(t, msg_id).await?;
405 assert_eq!(msg.download_state(), *s);
406 }
407 t.sql
408 .execute("DELETE FROM msgs WHERE id=?", (msg_id,))
409 .await?;
410 msg_id.update_download_state(t, DownloadState::Done).await?;
412
413 Ok(())
414 }
415}