1use std::collections::BTreeMap;
4
5use anyhow::{Result, anyhow, bail, ensure};
6use deltachat_derive::{FromSql, ToSql};
7use serde::{Deserialize, Serialize};
8
9use crate::context::Context;
10use crate::imap::session::Session;
11use crate::log::warn;
12use crate::message::{self, Message, MsgId, rfc724_mid_exists};
13use crate::{EventType, chatlist_events};
14
15pub(crate) mod post_msg_metadata;
16pub(crate) use post_msg_metadata::PostMsgMetadata;
17
/// Lower bound related to the "delete from server after" timeout.
///
/// The value is `48 * 60 * 60` = 172_800; presumably this is 48 hours expressed in
/// seconds. NOTE(review): the constant is not used in this part of the file, so its
/// exact purpose should be confirmed against the call sites.
pub(crate) const MIN_DELETE_SERVER_AFTER: i64 = 48 * 60 * 60;
23
/// Attachment size threshold (140_000) related to pre-messages.
///
/// NOTE(review): presumably a size in bytes above which an outgoing attachment is
/// announced with a pre-message; the constant is not used in this part of the file,
/// so confirm the unit and the exact comparison at the call sites.
pub(crate) const PRE_MSG_ATTACHMENT_SIZE_THRESHOLD: u64 = 140_000;
30
/// Size threshold (150_000) for warning about pre-messages.
///
/// NOTE(review): presumably a size in bytes above which a warning is emitted for a
/// pre-message; the constant is not used in this part of the file, so confirm the
/// unit and behavior at the call sites.
pub(crate) const PRE_MSG_SIZE_WARNING_THRESHOLD: usize = 150_000;
33
/// Download state of a message, as returned by [`Message::download_state`].
///
/// The value is written to the `download_state` column of the `msgs` table by
/// [`MsgId::update_download_state`], so the explicit discriminants below are part of
/// the stored data and are pinned by `test_downloadstate_values`.
#[derive(
    Debug,
    Default,
    Display,
    Clone,
    Copy,
    PartialEq,
    Eq,
    FromPrimitive,
    ToPrimitive,
    FromSql,
    ToSql,
    Serialize,
    Deserialize,
)]
#[repr(u32)]
pub enum DownloadState {
    /// Nothing is left to download; this is the default state.
    ///
    /// [`MsgId::download_full`] rejects such messages with "Nothing to download.".
    #[default]
    Done = 0,

    /// A full download can be requested with [`MsgId::download_full`].
    Available = 10,

    /// A previous download attempt failed.
    ///
    /// [`MsgId::download_full`] accepts this state, so the download can be retried.
    Failure = 20,

    /// Presumably the message could not be decrypted (TODO confirm).
    ///
    /// [`MsgId::download_full`] rejects such messages with "Nothing to download.".
    Undecipherable = 30,

    /// A full download was requested via [`MsgId::download_full`] and has not finished.
    ///
    /// Further [`MsgId::download_full`] calls fail with "Download already in progress.".
    InProgress = 1000,
}
68
69impl MsgId {
70 pub async fn download_full(self, context: &Context) -> Result<()> {
72 let msg = Message::load_from_db(context, self).await?;
73 match msg.download_state() {
74 DownloadState::Done | DownloadState::Undecipherable => {
75 return Err(anyhow!("Nothing to download."));
76 }
77 DownloadState::InProgress => return Err(anyhow!("Download already in progress.")),
78 DownloadState::Available | DownloadState::Failure => {
79 if msg.rfc724_mid().is_empty() {
80 return Err(anyhow!("Download not possible, message has no rfc724_mid"));
81 }
82 self.update_download_state(context, DownloadState::InProgress)
83 .await?;
84 info!(
85 context,
86 "Requesting full download of {:?}.",
87 msg.rfc724_mid()
88 );
89 context
90 .sql
91 .execute(
92 "INSERT INTO download (rfc724_mid, msg_id) VALUES (?,?)",
93 (msg.rfc724_mid(), msg.id),
94 )
95 .await?;
96 context.scheduler.interrupt_inbox().await;
97 }
98 }
99 Ok(())
100 }
101
102 pub(crate) async fn update_download_state(
105 self,
106 context: &Context,
107 download_state: DownloadState,
108 ) -> Result<()> {
109 if context
110 .sql
111 .execute(
112 "UPDATE msgs SET download_state=? WHERE id=? AND download_state<>?1",
113 (download_state, self),
114 )
115 .await?
116 == 0
117 {
118 return Ok(());
119 }
120 let Some(msg) = Message::load_from_db_optional(context, self).await? else {
121 return Ok(());
122 };
123 context.emit_event(EventType::MsgsChanged {
124 chat_id: msg.chat_id,
125 msg_id: self,
126 });
127 chatlist_events::emit_chatlist_item_changed(context, msg.chat_id);
128 Ok(())
129 }
130}
131
impl Message {
    /// Returns the download state stored in this message object.
    pub fn download_state(&self) -> DownloadState {
        self.download_state
    }
}
138
139pub(crate) async fn download_msg(
145 context: &Context,
146 rfc724_mid: String,
147 session: &mut Session,
148) -> Result<Option<()>> {
149 let transport_id = session.transport_id();
150 let row = context
151 .sql
152 .query_row_optional(
153 "SELECT uid, folder, transport_id FROM imap
154 WHERE rfc724_mid=? AND target!=''
155 ORDER BY transport_id=? DESC LIMIT 1",
156 (&rfc724_mid, transport_id),
157 |row| {
158 let server_uid: u32 = row.get(0)?;
159 let server_folder: String = row.get(1)?;
160 let msg_transport_id: u32 = row.get(2)?;
161 Ok((server_uid, server_folder, msg_transport_id))
162 },
163 )
164 .await?;
165
166 let Some((server_uid, server_folder, msg_transport_id)) = row else {
167 delete_from_available_post_msgs(context, &rfc724_mid).await?;
169 return Err(anyhow!(
170 "IMAP location for {rfc724_mid:?} post-message is unknown"
171 ));
172 };
173 if msg_transport_id != transport_id {
174 return Ok(None);
175 }
176 Box::pin(session.fetch_single_msg(context, &server_folder, server_uid, rfc724_mid)).await?;
177 Ok(Some(()))
178}
179
180impl Session {
181 async fn fetch_single_msg(
186 &mut self,
187 context: &Context,
188 folder: &str,
189 uid: u32,
190 rfc724_mid: String,
191 ) -> Result<()> {
192 if uid == 0 {
193 bail!("Attempt to fetch UID 0");
194 }
195
196 let folder_exists = self.select_with_uidvalidity(context, folder).await?;
197 ensure!(folder_exists, "No folder {folder}");
198
199 info!(context, "Downloading message {}/{} fully...", folder, uid);
201
202 let mut uid_message_ids: BTreeMap<u32, String> = BTreeMap::new();
203 uid_message_ids.insert(uid, rfc724_mid);
204 let (sender, receiver) = async_channel::unbounded();
205 {
206 let _fetch_msgs_lock_guard = context.fetch_msgs_mutex.lock().await;
207 self.fetch_many_msgs(context, folder, vec![uid], &uid_message_ids, sender)
208 .await?;
209 }
210 if receiver.recv().await.is_err() {
211 bail!("Failed to fetch UID {uid}");
212 }
213 Ok(())
214 }
215}
216
217async fn set_state_to_failure(context: &Context, rfc724_mid: &str) -> Result<()> {
218 if let Some(msg_id) = rfc724_mid_exists(context, rfc724_mid).await? {
219 msg_id
226 .update_download_state(context, DownloadState::Failure)
227 .await?;
228 }
229 Ok(())
230}
231
232async fn available_post_msgs_contains_rfc724_mid(
233 context: &Context,
234 rfc724_mid: &str,
235) -> Result<bool> {
236 Ok(context
237 .sql
238 .query_get_value::<String>(
239 "SELECT rfc724_mid FROM available_post_msgs WHERE rfc724_mid=?",
240 (&rfc724_mid,),
241 )
242 .await?
243 .is_some())
244}
245
246async fn delete_from_available_post_msgs(context: &Context, rfc724_mid: &str) -> Result<()> {
247 context
248 .sql
249 .execute(
250 "DELETE FROM available_post_msgs WHERE rfc724_mid=?",
251 (&rfc724_mid,),
252 )
253 .await?;
254 Ok(())
255}
256
257async fn delete_from_downloads(context: &Context, rfc724_mid: &str) -> Result<()> {
258 context
259 .sql
260 .execute("DELETE FROM download WHERE rfc724_mid=?", (&rfc724_mid,))
261 .await?;
262 Ok(())
263}
264
265pub(crate) async fn msg_is_downloaded_for(context: &Context, rfc724_mid: &str) -> Result<bool> {
266 Ok(message::rfc724_mid_exists(context, rfc724_mid)
267 .await?
268 .is_some())
269}
270
271pub(crate) async fn download_msgs(context: &Context, session: &mut Session) -> Result<()> {
272 let rfc724_mids = context
273 .sql
274 .query_map_vec("SELECT rfc724_mid FROM download", (), |row| {
275 let rfc724_mid: String = row.get(0)?;
276 Ok(rfc724_mid)
277 })
278 .await?;
279
280 for rfc724_mid in &rfc724_mids {
281 let res = download_msg(context, rfc724_mid.clone(), session).await;
282 if let Ok(Some(())) = res {
283 delete_from_downloads(context, rfc724_mid).await?;
284 delete_from_available_post_msgs(context, rfc724_mid).await?;
285 }
286 if let Err(err) = res {
287 warn!(
288 context,
289 "Failed to download message rfc724_mid={rfc724_mid}: {:#}.", err
290 );
291 if !msg_is_downloaded_for(context, rfc724_mid).await? {
292 warn!(
294 context,
295 "{rfc724_mid} download failed and there is no downloaded pre-message."
296 );
297 delete_from_downloads(context, rfc724_mid).await?;
298 } else if available_post_msgs_contains_rfc724_mid(context, rfc724_mid).await? {
299 warn!(
300 context,
301 "{rfc724_mid} is in available_post_msgs table but we failed to fetch it,
302 so set the message to DownloadState::Failure - probably it was deleted on the server in the meantime"
303 );
304 set_state_to_failure(context, rfc724_mid).await?;
305 delete_from_downloads(context, rfc724_mid).await?;
306 delete_from_available_post_msgs(context, rfc724_mid).await?;
307 } else {
308 }
311 }
312 }
313
314 Ok(())
315}
316
317pub(crate) async fn download_known_post_messages_without_pre_message(
320 context: &Context,
321 session: &mut Session,
322) -> Result<()> {
323 let rfc724_mids = context
324 .sql
325 .query_map_vec("SELECT rfc724_mid FROM available_post_msgs", (), |row| {
326 let rfc724_mid: String = row.get(0)?;
327 Ok(rfc724_mid)
328 })
329 .await?;
330 for rfc724_mid in &rfc724_mids {
331 if msg_is_downloaded_for(context, rfc724_mid).await? {
332 delete_from_available_post_msgs(context, rfc724_mid).await?;
333 continue;
334 }
335
336 let res = download_msg(context, rfc724_mid.clone(), session).await;
341 if let Ok(Some(())) = res {
342 delete_from_available_post_msgs(context, rfc724_mid).await?;
343 }
344 if let Err(err) = res {
345 warn!(
346 context,
347 "download_known_post_messages_without_pre_message: Failed to download message rfc724_mid={rfc724_mid}: {:#}.",
348 err
349 );
350 }
351 }
352 Ok(())
353}
354
#[cfg(test)]
mod tests {
    use num_traits::FromPrimitive;

    use super::*;
    use crate::chat::send_msg;
    use crate::test_utils::TestContext;

    /// Pins the numeric value of every `DownloadState` variant.
    ///
    /// The values are written to the database by `update_download_state`, so a change
    /// here would alter the meaning of already stored data.
    #[test]
    fn test_downloadstate_values() {
        assert_eq!(DownloadState::Done, DownloadState::default());
        assert_eq!(DownloadState::Done, DownloadState::from_i32(0).unwrap());
        assert_eq!(
            DownloadState::Available,
            DownloadState::from_i32(10).unwrap()
        );
        assert_eq!(DownloadState::Failure, DownloadState::from_i32(20).unwrap());
        assert_eq!(
            DownloadState::Undecipherable,
            DownloadState::from_i32(30).unwrap()
        );
        assert_eq!(
            DownloadState::InProgress,
            DownloadState::from_i32(1000).unwrap()
        );
    }

    /// Checks that `update_download_state` stores each state and tolerates both a
    /// no-op update and a message that no longer exists.
    #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
    async fn test_update_download_state() -> Result<()> {
        let t = TestContext::new_alice().await;
        let chat = t.create_chat_with_contact("Bob", "bob@example.org").await;

        let mut msg = Message::new_text("Hi Bob".to_owned());
        let msg_id = send_msg(&t, chat.id, &mut msg).await?;
        let msg = Message::load_from_db(&t, msg_id).await?;
        assert_eq!(msg.download_state(), DownloadState::Done);

        // `Done` appears twice on purpose: the second update changes no row and
        // exercises the early-return path.
        for s in &[
            DownloadState::Available,
            DownloadState::InProgress,
            DownloadState::Failure,
            DownloadState::Done,
            DownloadState::Done,
        ] {
            msg_id.update_download_state(&t, *s).await?;
            let msg = Message::load_from_db(&t, msg_id).await?;
            assert_eq!(msg.download_state(), *s);
        }

        // Updating the state of a deleted message must not fail.
        t.sql
            .execute("DELETE FROM msgs WHERE id=?", (msg_id,))
            .await?;
        msg_id
            .update_download_state(&t, DownloadState::Done)
            .await?;

        Ok(())
    }
}