1use std::collections::BTreeMap;
4
5use anyhow::{Result, anyhow, bail, ensure};
6use deltachat_derive::{FromSql, ToSql};
7use serde::{Deserialize, Serialize};
8
9use crate::context::Context;
10use crate::imap::session::Session;
11use crate::log::warn;
12use crate::message::{self, Message, MsgId, rfc724_mid_exists};
13use crate::{EventType, chatlist_events, ephemeral};
14
15pub(crate) mod post_msg_metadata;
16pub(crate) use post_msg_metadata::PostMsgMetadata;
17
/// Size threshold (140_000) for attachments in the pre-message logic.
// NOTE(review): not referenced in the visible part of this file; presumably this is
// the attachment size in bytes from which a message is announced by a pre-message
// -- confirm at the use site.
pub(crate) const PRE_MSG_ATTACHMENT_SIZE_THRESHOLD: u64 = 140_000;

/// Size (150_000) above which a warning about a pre-message is expected.
// NOTE(review): not referenced in the visible part of this file; presumably a byte
// count that pre-messages should stay below -- confirm at the use site.
pub(crate) const PRE_MSG_SIZE_WARNING_THRESHOLD: usize = 150_000;
27
/// Download state of a message.
///
/// The numeric value is what gets written to the `download_state` column of the
/// `msgs` table (see `MsgId::update_download_state`), so the discriminants must
/// stay stable.
#[derive(
    Debug,
    Default,
    Display,
    Clone,
    Copy,
    PartialEq,
    Eq,
    FromPrimitive,
    ToPrimitive,
    FromSql,
    ToSql,
    Serialize,
    Deserialize,
)]
#[repr(u32)]
pub enum DownloadState {
    /// Nothing to download. This is the default state.
    #[default]
    Done = 0,

    /// A full download can be requested with `MsgId::download_full`.
    Available = 10,

    /// A previous download attempt failed; `MsgId::download_full` accepts this
    /// state, so the download can be requested again.
    Failure = 20,

    /// Treated like `Done` by `MsgId::download_full`: there is nothing to download.
    // NOTE(review): presumably the message could not be decrypted -- confirm.
    Undecipherable = 30,

    /// A full download has been requested; `MsgId::download_full` refuses to
    /// queue another one while the message is in this state.
    InProgress = 1000,
}
62
63impl MsgId {
64 pub async fn download_full(self, context: &Context) -> Result<()> {
66 let msg = Message::load_from_db(context, self).await?;
67 match msg.download_state() {
68 DownloadState::Done | DownloadState::Undecipherable => {
69 return Err(anyhow!("Nothing to download."));
70 }
71 DownloadState::InProgress => return Err(anyhow!("Download already in progress.")),
72 DownloadState::Available | DownloadState::Failure => {
73 if msg.rfc724_mid().is_empty() {
74 return Err(anyhow!("Download not possible, message has no rfc724_mid"));
75 }
76 self.update_download_state(context, DownloadState::InProgress)
77 .await?;
78 info!(
79 context,
80 "Requesting full download of {:?}.",
81 msg.rfc724_mid()
82 );
83 context
84 .sql
85 .execute(
86 "INSERT INTO download (rfc724_mid, msg_id) VALUES (?,?)",
87 (msg.rfc724_mid(), msg.id),
88 )
89 .await?;
90 context.scheduler.interrupt_inbox().await;
91 }
92 }
93 Ok(())
94 }
95
96 pub(crate) async fn update_download_state(
99 self,
100 context: &Context,
101 download_state: DownloadState,
102 ) -> Result<()> {
103 if context
104 .sql
105 .execute(
106 "UPDATE msgs SET download_state=? WHERE id=? AND download_state<>?1",
107 (download_state, self),
108 )
109 .await?
110 == 0
111 {
112 return Ok(());
113 }
114 let Some(msg) = Message::load_from_db_optional(context, self).await? else {
115 return Ok(());
116 };
117 context.emit_event(EventType::MsgsChanged {
118 chat_id: msg.chat_id,
119 msg_id: self,
120 });
121 chatlist_events::emit_chatlist_item_changed(context, msg.chat_id);
122 Ok(())
123 }
124}
125
impl Message {
    /// Returns the download state stored in this message object.
    ///
    /// This is the value of the `download_state` field as loaded; it is not
    /// re-read from the database.
    pub fn download_state(&self) -> DownloadState {
        self.download_state
    }
}
132
133pub(crate) async fn download_msg(
139 context: &Context,
140 rfc724_mid: String,
141 session: &mut Session,
142) -> Result<Option<()>> {
143 let transport_id = session.transport_id();
144 let row = context
145 .sql
146 .query_row_optional(
147 "SELECT uid, folder, transport_id FROM imap
148 WHERE rfc724_mid=? AND target!=''
149 ORDER BY transport_id=? DESC LIMIT 1",
150 (&rfc724_mid, transport_id),
151 |row| {
152 let server_uid: u32 = row.get(0)?;
153 let server_folder: String = row.get(1)?;
154 let msg_transport_id: u32 = row.get(2)?;
155 Ok((server_uid, server_folder, msg_transport_id))
156 },
157 )
158 .await?;
159
160 let Some((server_uid, server_folder, msg_transport_id)) = row else {
161 delete_from_available_post_msgs(context, &rfc724_mid).await?;
163 return Err(anyhow!(
164 "IMAP location for {rfc724_mid:?} post-message is unknown"
165 ));
166 };
167 if msg_transport_id != transport_id {
168 return Ok(None);
169 }
170 Box::pin(session.fetch_single_msg(context, &server_folder, server_uid, rfc724_mid)).await?;
171
172 if ephemeral::should_delete_all_downloaded_messages(context, session.is_chatmail()).await? {
173 context.scheduler.interrupt_inbox().await;
178 }
179
180 Ok(Some(()))
181}
182
183impl Session {
184 async fn fetch_single_msg(
189 &mut self,
190 context: &Context,
191 folder: &str,
192 uid: u32,
193 rfc724_mid: String,
194 ) -> Result<()> {
195 if uid == 0 {
196 bail!("Attempt to fetch UID 0");
197 }
198
199 let folder_exists = self.select_with_uidvalidity(context, folder).await?;
200 ensure!(folder_exists, "No folder {folder}");
201
202 info!(context, "Downloading message {}/{} fully...", folder, uid);
204
205 let mut uid_message_ids: BTreeMap<u32, String> = BTreeMap::new();
206 uid_message_ids.insert(uid, rfc724_mid);
207 let (sender, receiver) = async_channel::unbounded();
208 {
209 let _fetch_msgs_lock_guard = context.fetch_msgs_mutex.lock().await;
210 self.fetch_many_msgs(context, folder, vec![uid], &uid_message_ids, sender)
211 .await?;
212 }
213 if receiver.recv().await.is_err() {
214 bail!("Failed to fetch UID {uid}");
215 }
216 Ok(())
217 }
218}
219
220async fn set_state_to_failure(context: &Context, rfc724_mid: &str) -> Result<()> {
221 if let Some(msg_id) = rfc724_mid_exists(context, rfc724_mid).await? {
222 msg_id
229 .update_download_state(context, DownloadState::Failure)
230 .await?;
231 }
232 Ok(())
233}
234
235async fn available_post_msgs_contains_rfc724_mid(
236 context: &Context,
237 rfc724_mid: &str,
238) -> Result<bool> {
239 Ok(context
240 .sql
241 .query_get_value::<String>(
242 "SELECT rfc724_mid FROM available_post_msgs WHERE rfc724_mid=?",
243 (&rfc724_mid,),
244 )
245 .await?
246 .is_some())
247}
248
249async fn delete_from_available_post_msgs(context: &Context, rfc724_mid: &str) -> Result<()> {
250 context
251 .sql
252 .execute(
253 "DELETE FROM available_post_msgs WHERE rfc724_mid=?",
254 (&rfc724_mid,),
255 )
256 .await?;
257 Ok(())
258}
259
260async fn delete_from_downloads(context: &Context, rfc724_mid: &str) -> Result<()> {
261 context
262 .sql
263 .execute("DELETE FROM download WHERE rfc724_mid=?", (&rfc724_mid,))
264 .await?;
265 Ok(())
266}
267
268pub(crate) async fn msg_is_downloaded_for(context: &Context, rfc724_mid: &str) -> Result<bool> {
269 Ok(message::rfc724_mid_exists(context, rfc724_mid)
270 .await?
271 .is_some())
272}
273
274pub(crate) async fn download_msgs(context: &Context, session: &mut Session) -> Result<()> {
275 let rfc724_mids = context
276 .sql
277 .query_map_vec("SELECT rfc724_mid FROM download", (), |row| {
278 let rfc724_mid: String = row.get(0)?;
279 Ok(rfc724_mid)
280 })
281 .await?;
282
283 for rfc724_mid in &rfc724_mids {
284 let res = download_msg(context, rfc724_mid.clone(), session).await;
285 if let Ok(Some(())) = res {
286 delete_from_downloads(context, rfc724_mid).await?;
287 delete_from_available_post_msgs(context, rfc724_mid).await?;
288 }
289 if let Err(err) = res {
290 warn!(
291 context,
292 "Failed to download message rfc724_mid={rfc724_mid}: {:#}.", err
293 );
294 if !msg_is_downloaded_for(context, rfc724_mid).await? {
295 warn!(
297 context,
298 "{rfc724_mid} download failed and there is no downloaded pre-message."
299 );
300 delete_from_downloads(context, rfc724_mid).await?;
301 } else if available_post_msgs_contains_rfc724_mid(context, rfc724_mid).await? {
302 warn!(
303 context,
304 "{rfc724_mid} is in available_post_msgs table but we failed to fetch it,
305 so set the message to DownloadState::Failure - probably it was deleted on the server in the meantime"
306 );
307 set_state_to_failure(context, rfc724_mid).await?;
308 delete_from_downloads(context, rfc724_mid).await?;
309 delete_from_available_post_msgs(context, rfc724_mid).await?;
310 } else {
311 }
314 }
315 }
316
317 Ok(())
318}
319
320pub(crate) async fn download_known_post_messages_without_pre_message(
323 context: &Context,
324 session: &mut Session,
325) -> Result<()> {
326 let rfc724_mids = context
327 .sql
328 .query_map_vec("SELECT rfc724_mid FROM available_post_msgs", (), |row| {
329 let rfc724_mid: String = row.get(0)?;
330 Ok(rfc724_mid)
331 })
332 .await?;
333 for rfc724_mid in &rfc724_mids {
334 if msg_is_downloaded_for(context, rfc724_mid).await? {
335 delete_from_available_post_msgs(context, rfc724_mid).await?;
336 continue;
337 }
338
339 let res = download_msg(context, rfc724_mid.clone(), session).await;
344 if let Ok(Some(())) = res {
345 delete_from_available_post_msgs(context, rfc724_mid).await?;
346 }
347 if let Err(err) = res {
348 warn!(
349 context,
350 "download_known_post_messages_without_pre_message: Failed to download message rfc724_mid={rfc724_mid}: {:#}.",
351 err
352 );
353 }
354 }
355 Ok(())
356}
357
#[cfg(test)]
mod tests {
    use num_traits::FromPrimitive;

    use super::*;
    use crate::chat::send_msg;
    use crate::test_utils::TestContextManager;

    /// Pins the numeric code of every `DownloadState` variant.
    #[test]
    fn test_downloadstate_values() {
        // These values are written to the database, so they must not change.
        assert_eq!(DownloadState::Done, DownloadState::default());
        assert_eq!(DownloadState::Done, DownloadState::from_i32(0).unwrap());
        assert_eq!(
            DownloadState::Available,
            DownloadState::from_i32(10).unwrap()
        );
        assert_eq!(DownloadState::Failure, DownloadState::from_i32(20).unwrap());
        assert_eq!(
            DownloadState::Undecipherable,
            DownloadState::from_i32(30).unwrap()
        );
        assert_eq!(
            DownloadState::InProgress,
            DownloadState::from_i32(1000).unwrap()
        );
    }

    /// Checks that `update_download_state()` persists each state and tolerates
    /// a message that no longer exists.
    #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
    async fn test_update_download_state() -> Result<()> {
        let mut tcm = TestContextManager::new();
        let t = &tcm.alice().await;
        let bob = &tcm.bob().await;
        let chat_id = t.create_chat_id(bob).await;

        let mut msg = Message::new_text("Hi Bob".to_owned());
        let msg_id = send_msg(t, chat_id, &mut msg).await?;
        let msg = Message::load_from_db(t, msg_id).await?;
        assert_eq!(msg.download_state(), DownloadState::Done);

        // `Done` appears twice to cover an update that does not change the state.
        for s in &[
            DownloadState::Available,
            DownloadState::InProgress,
            DownloadState::Failure,
            DownloadState::Done,
            DownloadState::Done,
        ] {
            msg_id.update_download_state(t, *s).await?;
            let msg = Message::load_from_db(t, msg_id).await?;
            assert_eq!(msg.download_state(), *s);
        }

        // Updating the state of a deleted message must not return an error.
        t.sql
            .execute("DELETE FROM msgs WHERE id=?", (msg_id,))
            .await?;
        msg_id.update_download_state(t, DownloadState::Done).await?;

        Ok(())
    }
}