1use std::collections::BTreeMap;
4
5use anyhow::{Result, anyhow, bail, ensure};
6use deltachat_derive::{FromSql, ToSql};
7use serde::{Deserialize, Serialize};
8
9use crate::context::Context;
10use crate::imap::session::Session;
11use crate::log::warn;
12use crate::message::{self, Message, MsgId, rfc724_mid_exists};
13use crate::{EventType, chatlist_events};
14
15pub(crate) mod post_msg_metadata;
16pub(crate) use post_msg_metadata::PostMsgMetadata;
17
/// Lower bound of `48 * 60 * 60` (= 172 800).
///
/// NOTE(review): presumably seconds (i.e. 48 hours) used as the minimum
/// "delete from server after" timeout; the usage is not visible in this
/// file section - confirm against the callers.
pub(crate) const MIN_DELETE_SERVER_AFTER: i64 = 48 * 60 * 60;
23
/// Attachment size threshold related to pre-messages.
///
/// NOTE(review): presumably a size in bytes above which an outgoing
/// attachment is announced by a pre-message - confirm against the callers,
/// which are not part of this file section.
pub(crate) const PRE_MSG_ATTACHMENT_SIZE_THRESHOLD: u64 = 140_000;
30
/// Pre-message size threshold related to emitting a warning.
///
/// NOTE(review): presumably a size in bytes above which a warning is logged
/// for a pre-message - confirm against the callers, which are not part of
/// this file section.
pub(crate) const PRE_MSG_SIZE_WARNING_THRESHOLD: usize = 150_000;
33
/// Download state of a message.
///
/// The value is written to the `download_state` column of the `msgs` table
/// (see `MsgId::update_download_state`), so the numeric discriminants must
/// stay stable.
#[derive(
    Debug,
    Default,
    Display,
    Clone,
    Copy,
    PartialEq,
    Eq,
    FromPrimitive,
    ToPrimitive,
    FromSql,
    ToSql,
    Serialize,
    Deserialize,
)]
#[repr(u32)]
pub enum DownloadState {
    /// Default state. `MsgId::download_full` rejects messages in this state
    /// with "Nothing to download.".
    #[default]
    Done = 0,

    /// A download may be requested with `MsgId::download_full`.
    Available = 10,

    /// Set by `set_state_to_failure` after a failed fetch; a download may be
    /// requested again with `MsgId::download_full`.
    Failure = 20,

    /// `MsgId::download_full` treats this like `Done` ("Nothing to download.").
    ///
    /// NOTE(review): presumably the message could not be decrypted - confirm.
    Undecipherable = 30,

    /// Set by `MsgId::download_full` once a download has been requested;
    /// further requests are rejected with "Download already in progress.".
    InProgress = 1000,
}
68
69impl MsgId {
70 pub async fn download_full(self, context: &Context) -> Result<()> {
72 let msg = Message::load_from_db(context, self).await?;
73 match msg.download_state() {
74 DownloadState::Done | DownloadState::Undecipherable => {
75 return Err(anyhow!("Nothing to download."));
76 }
77 DownloadState::InProgress => return Err(anyhow!("Download already in progress.")),
78 DownloadState::Available | DownloadState::Failure => {
79 if msg.rfc724_mid().is_empty() {
80 return Err(anyhow!("Download not possible, message has no rfc724_mid"));
81 }
82 self.update_download_state(context, DownloadState::InProgress)
83 .await?;
84 info!(
85 context,
86 "Requesting full download of {:?}.",
87 msg.rfc724_mid()
88 );
89 context
90 .sql
91 .execute(
92 "INSERT INTO download (rfc724_mid, msg_id) VALUES (?,?)",
93 (msg.rfc724_mid(), msg.id),
94 )
95 .await?;
96 context.scheduler.interrupt_inbox().await;
97 }
98 }
99 Ok(())
100 }
101
102 pub(crate) async fn update_download_state(
104 self,
105 context: &Context,
106 download_state: DownloadState,
107 ) -> Result<()> {
108 if context
109 .sql
110 .execute(
111 "UPDATE msgs SET download_state=? WHERE id=?;",
112 (download_state, self),
113 )
114 .await?
115 == 0
116 {
117 return Ok(());
118 }
119 let Some(msg) = Message::load_from_db_optional(context, self).await? else {
120 return Ok(());
121 };
122 context.emit_event(EventType::MsgsChanged {
123 chat_id: msg.chat_id,
124 msg_id: self,
125 });
126 chatlist_events::emit_chatlist_item_changed(context, msg.chat_id);
127 Ok(())
128 }
129}
130
131impl Message {
132 pub fn download_state(&self) -> DownloadState {
134 self.download_state
135 }
136}
137
138pub(crate) async fn download_msg(
142 context: &Context,
143 rfc724_mid: String,
144 session: &mut Session,
145) -> Result<()> {
146 let transport_id = session.transport_id();
147 let row = context
148 .sql
149 .query_row_optional(
150 "SELECT uid, folder FROM imap
151 WHERE rfc724_mid=?
152 AND transport_id=?
153 AND target!=''",
154 (&rfc724_mid, transport_id),
155 |row| {
156 let server_uid: u32 = row.get(0)?;
157 let server_folder: String = row.get(1)?;
158 Ok((server_uid, server_folder))
159 },
160 )
161 .await?;
162
163 let Some((server_uid, server_folder)) = row else {
164 return Err(anyhow!(
166 "IMAP location for {rfc724_mid:?} post-message is unknown"
167 ));
168 };
169
170 session
171 .fetch_single_msg(context, &server_folder, server_uid, rfc724_mid)
172 .await?;
173 Ok(())
174}
175
176impl Session {
177 async fn fetch_single_msg(
182 &mut self,
183 context: &Context,
184 folder: &str,
185 uid: u32,
186 rfc724_mid: String,
187 ) -> Result<()> {
188 if uid == 0 {
189 bail!("Attempt to fetch UID 0");
190 }
191
192 let create = false;
193 let folder_exists = self
194 .select_with_uidvalidity(context, folder, create)
195 .await?;
196 ensure!(folder_exists, "No folder {folder}");
197
198 info!(context, "Downloading message {}/{} fully...", folder, uid);
200
201 let mut uid_message_ids: BTreeMap<u32, String> = BTreeMap::new();
202 uid_message_ids.insert(uid, rfc724_mid);
203 let (sender, receiver) = async_channel::unbounded();
204 self.fetch_many_msgs(context, folder, vec![uid], &uid_message_ids, sender)
205 .await?;
206 if receiver.recv().await.is_err() {
207 bail!("Failed to fetch UID {uid}");
208 }
209 Ok(())
210 }
211}
212
213async fn set_state_to_failure(context: &Context, rfc724_mid: &str) -> Result<()> {
214 if let Some(msg_id) = rfc724_mid_exists(context, rfc724_mid).await? {
215 msg_id
222 .update_download_state(context, DownloadState::Failure)
223 .await?;
224 }
225 Ok(())
226}
227
228async fn available_post_msgs_contains_rfc724_mid(
229 context: &Context,
230 rfc724_mid: &str,
231) -> Result<bool> {
232 Ok(context
233 .sql
234 .query_get_value::<String>(
235 "SELECT rfc724_mid FROM available_post_msgs WHERE rfc724_mid=?",
236 (&rfc724_mid,),
237 )
238 .await?
239 .is_some())
240}
241
242async fn delete_from_available_post_msgs(context: &Context, rfc724_mid: &str) -> Result<()> {
243 context
244 .sql
245 .execute(
246 "DELETE FROM available_post_msgs WHERE rfc724_mid=?",
247 (&rfc724_mid,),
248 )
249 .await?;
250 Ok(())
251}
252
253async fn delete_from_downloads(context: &Context, rfc724_mid: &str) -> Result<()> {
254 context
255 .sql
256 .execute("DELETE FROM download WHERE rfc724_mid=?", (&rfc724_mid,))
257 .await?;
258 Ok(())
259}
260
261pub(crate) async fn msg_is_downloaded_for(context: &Context, rfc724_mid: &str) -> Result<bool> {
262 Ok(message::rfc724_mid_exists(context, rfc724_mid)
263 .await?
264 .is_some())
265}
266
267pub(crate) async fn download_msgs(context: &Context, session: &mut Session) -> Result<()> {
268 let rfc724_mids = context
269 .sql
270 .query_map_vec("SELECT rfc724_mid FROM download", (), |row| {
271 let rfc724_mid: String = row.get(0)?;
272 Ok(rfc724_mid)
273 })
274 .await?;
275
276 for rfc724_mid in &rfc724_mids {
277 let res = download_msg(context, rfc724_mid.clone(), session).await;
278 if res.is_ok() {
279 delete_from_downloads(context, rfc724_mid).await?;
280 delete_from_available_post_msgs(context, rfc724_mid).await?;
281 }
282 if let Err(err) = res {
283 warn!(
284 context,
285 "Failed to download message rfc724_mid={rfc724_mid}: {:#}.", err
286 );
287 if !msg_is_downloaded_for(context, rfc724_mid).await? {
288 warn!(
290 context,
291 "{rfc724_mid} download failed and there is no downloaded pre-message."
292 );
293 delete_from_downloads(context, rfc724_mid).await?;
294 } else if available_post_msgs_contains_rfc724_mid(context, rfc724_mid).await? {
295 warn!(
296 context,
297 "{rfc724_mid} is in available_post_msgs table but we failed to fetch it,
298 so set the message to DownloadState::Failure - probably it was deleted on the server in the meantime"
299 );
300 set_state_to_failure(context, rfc724_mid).await?;
301 delete_from_downloads(context, rfc724_mid).await?;
302 delete_from_available_post_msgs(context, rfc724_mid).await?;
303 } else {
304 }
307 }
308 }
309
310 Ok(())
311}
312
313pub(crate) async fn download_known_post_messages_without_pre_message(
316 context: &Context,
317 session: &mut Session,
318) -> Result<()> {
319 let rfc724_mids = context
320 .sql
321 .query_map_vec("SELECT rfc724_mid FROM available_post_msgs", (), |row| {
322 let rfc724_mid: String = row.get(0)?;
323 Ok(rfc724_mid)
324 })
325 .await?;
326 for rfc724_mid in &rfc724_mids {
327 if !msg_is_downloaded_for(context, rfc724_mid).await? {
328 let res = download_msg(context, rfc724_mid.clone(), session).await;
333 if res.is_ok() {
334 delete_from_available_post_msgs(context, rfc724_mid).await?;
335 }
336 if let Err(err) = res {
337 warn!(
338 context,
339 "download_known_post_messages_without_pre_message: Failed to download message rfc724_mid={rfc724_mid}: {:#}.",
340 err
341 );
342 }
343 }
344 }
345 Ok(())
346}
347
#[cfg(test)]
mod tests {
    use num_traits::FromPrimitive;

    use super::*;
    use crate::chat::send_msg;
    use crate::test_utils::TestContext;

    /// Pins the numeric values of `DownloadState`.
    ///
    /// The values are written to the database, so every variant's
    /// discriminant is checked here.
    #[test]
    fn test_downloadstate_values() {
        assert_eq!(DownloadState::Done, DownloadState::default());
        assert_eq!(DownloadState::Done, DownloadState::from_i32(0).unwrap());
        assert_eq!(
            DownloadState::Available,
            DownloadState::from_i32(10).unwrap()
        );
        assert_eq!(DownloadState::Failure, DownloadState::from_i32(20).unwrap());
        assert_eq!(
            DownloadState::Undecipherable,
            DownloadState::from_i32(30).unwrap()
        );
        assert_eq!(
            DownloadState::InProgress,
            DownloadState::from_i32(1000).unwrap()
        );
    }

    /// Checks that `update_download_state` persists every state and that it
    /// succeeds for a message that no longer exists.
    #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
    async fn test_update_download_state() -> Result<()> {
        let t = TestContext::new_alice().await;
        let chat = t.create_chat_with_contact("Bob", "bob@example.org").await;

        let mut msg = Message::new_text("Hi Bob".to_owned());
        let msg_id = send_msg(&t, chat.id, &mut msg).await?;
        let msg = Message::load_from_db(&t, msg_id).await?;
        assert_eq!(msg.download_state(), DownloadState::Done);

        // `Done` is listed twice to cover setting a state that is already set.
        for state in [
            DownloadState::Available,
            DownloadState::InProgress,
            DownloadState::Failure,
            DownloadState::Done,
            DownloadState::Done,
        ] {
            msg_id.update_download_state(&t, state).await?;
            let msg = Message::load_from_db(&t, msg_id).await?;
            assert_eq!(msg.download_state(), state);
        }

        // Updating the state of a deleted message must not fail.
        t.sql
            .execute("DELETE FROM msgs WHERE id=?", (msg_id,))
            .await?;
        msg_id
            .update_download_state(&t, DownloadState::Done)
            .await?;

        Ok(())
    }
}