1use std::collections::BTreeMap;
4
5use anyhow::{Result, anyhow, bail, ensure};
6use deltachat_derive::{FromSql, ToSql};
7use serde::{Deserialize, Serialize};
8
9use crate::context::Context;
10use crate::imap::session::Session;
11use crate::log::warn;
12use crate::message::{self, Message, MsgId, rfc724_mid_exists};
13use crate::{EventType, chatlist_events};
14
15pub(crate) mod post_msg_metadata;
16pub(crate) use post_msg_metadata::PostMsgMetadata;
17
/// Constant equal to `48 * 60 * 60` (172_800).
///
/// NOTE(review): presumably a duration in seconds (48 hours) used as the
/// smallest accepted "delete from server after" timeout; confirm against
/// the users of this constant.
pub(crate) const MIN_DELETE_SERVER_AFTER: i64 = 48 * 60 * 60;

/// Attachment size threshold related to pre-messages.
///
/// NOTE(review): presumably measured in bytes, with attachments above this
/// size being announced by a separate pre-message; confirm against the
/// sending code.
pub(crate) const PRE_MSG_ATTACHMENT_SIZE_THRESHOLD: u64 = 140_000;

/// Pre-message size threshold related to warnings.
///
/// NOTE(review): presumably measured in bytes, with a warning being logged
/// when a pre-message exceeds it; confirm against the sending code.
pub(crate) const PRE_MSG_SIZE_WARNING_THRESHOLD: usize = 150_000;
33
/// Download state of a message, represented as a `u32` discriminant.
///
/// NOTE(review): the values are converted to and from SQL via the derived
/// `FromSql`/`ToSql` implementations, so the explicit discriminants
/// presumably must stay stable; confirm before renumbering.
#[derive(
    Debug,
    Default,
    Display,
    Clone,
    Copy,
    PartialEq,
    Eq,
    FromPrimitive,
    ToPrimitive,
    FromSql,
    ToSql,
    Serialize,
    Deserialize,
)]
#[repr(u32)]
pub enum DownloadState {
    /// Nothing to download; this is the default state.
    #[default]
    Done = 0,

    /// A full download can be requested with `MsgId::download_full()`.
    Available = 10,

    /// Downloading the message failed; `MsgId::download_full()` accepts a
    /// new request in this state.
    Failure = 20,

    /// `MsgId::download_full()` treats this state like `Done` and reports
    /// that there is nothing to download.
    ///
    /// NOTE(review): presumably marks a message that could not be
    /// decrypted; confirm where this state is assigned.
    Undecipherable = 30,

    /// A full download was requested via `MsgId::download_full()`;
    /// requesting it again is rejected.
    InProgress = 1000,
}
68
69impl MsgId {
70 pub async fn download_full(self, context: &Context) -> Result<()> {
72 let msg = Message::load_from_db(context, self).await?;
73 match msg.download_state() {
74 DownloadState::Done | DownloadState::Undecipherable => {
75 return Err(anyhow!("Nothing to download."));
76 }
77 DownloadState::InProgress => return Err(anyhow!("Download already in progress.")),
78 DownloadState::Available | DownloadState::Failure => {
79 if msg.rfc724_mid().is_empty() {
80 return Err(anyhow!("Download not possible, message has no rfc724_mid"));
81 }
82 self.update_download_state(context, DownloadState::InProgress)
83 .await?;
84 info!(
85 context,
86 "Requesting full download of {:?}.",
87 msg.rfc724_mid()
88 );
89 context
90 .sql
91 .execute(
92 "INSERT INTO download (rfc724_mid, msg_id) VALUES (?,?)",
93 (msg.rfc724_mid(), msg.id),
94 )
95 .await?;
96 context.scheduler.interrupt_inbox().await;
97 }
98 }
99 Ok(())
100 }
101
102 pub(crate) async fn update_download_state(
105 self,
106 context: &Context,
107 download_state: DownloadState,
108 ) -> Result<()> {
109 if context
110 .sql
111 .execute(
112 "UPDATE msgs SET download_state=? WHERE id=? AND download_state<>?1",
113 (download_state, self),
114 )
115 .await?
116 == 0
117 {
118 return Ok(());
119 }
120 let Some(msg) = Message::load_from_db_optional(context, self).await? else {
121 return Ok(());
122 };
123 context.emit_event(EventType::MsgsChanged {
124 chat_id: msg.chat_id,
125 msg_id: self,
126 });
127 chatlist_events::emit_chatlist_item_changed(context, msg.chat_id);
128 Ok(())
129 }
130}
131
impl Message {
    /// Returns the download state stored in this message object.
    pub fn download_state(&self) -> DownloadState {
        self.download_state
    }
}
138
139pub(crate) async fn download_msg(
145 context: &Context,
146 rfc724_mid: String,
147 session: &mut Session,
148) -> Result<Option<()>> {
149 let transport_id = session.transport_id();
150 let row = context
151 .sql
152 .query_row_optional(
153 "SELECT uid, folder, transport_id FROM imap
154 WHERE rfc724_mid=? AND target!=''
155 ORDER BY transport_id=? DESC LIMIT 1",
156 (&rfc724_mid, transport_id),
157 |row| {
158 let server_uid: u32 = row.get(0)?;
159 let server_folder: String = row.get(1)?;
160 let msg_transport_id: u32 = row.get(2)?;
161 Ok((server_uid, server_folder, msg_transport_id))
162 },
163 )
164 .await?;
165
166 let Some((server_uid, server_folder, msg_transport_id)) = row else {
167 delete_from_available_post_msgs(context, &rfc724_mid).await?;
169 return Err(anyhow!(
170 "IMAP location for {rfc724_mid:?} post-message is unknown"
171 ));
172 };
173 if msg_transport_id != transport_id {
174 return Ok(None);
175 }
176 session
177 .fetch_single_msg(context, &server_folder, server_uid, rfc724_mid)
178 .await?;
179 Ok(Some(()))
180}
181
182impl Session {
183 async fn fetch_single_msg(
188 &mut self,
189 context: &Context,
190 folder: &str,
191 uid: u32,
192 rfc724_mid: String,
193 ) -> Result<()> {
194 if uid == 0 {
195 bail!("Attempt to fetch UID 0");
196 }
197
198 let folder_exists = self.select_with_uidvalidity(context, folder).await?;
199 ensure!(folder_exists, "No folder {folder}");
200
201 info!(context, "Downloading message {}/{} fully...", folder, uid);
203
204 let mut uid_message_ids: BTreeMap<u32, String> = BTreeMap::new();
205 uid_message_ids.insert(uid, rfc724_mid);
206 let (sender, receiver) = async_channel::unbounded();
207 self.fetch_many_msgs(context, folder, vec![uid], &uid_message_ids, sender)
208 .await?;
209 if receiver.recv().await.is_err() {
210 bail!("Failed to fetch UID {uid}");
211 }
212 Ok(())
213 }
214}
215
216async fn set_state_to_failure(context: &Context, rfc724_mid: &str) -> Result<()> {
217 if let Some(msg_id) = rfc724_mid_exists(context, rfc724_mid).await? {
218 msg_id
225 .update_download_state(context, DownloadState::Failure)
226 .await?;
227 }
228 Ok(())
229}
230
231async fn available_post_msgs_contains_rfc724_mid(
232 context: &Context,
233 rfc724_mid: &str,
234) -> Result<bool> {
235 Ok(context
236 .sql
237 .query_get_value::<String>(
238 "SELECT rfc724_mid FROM available_post_msgs WHERE rfc724_mid=?",
239 (&rfc724_mid,),
240 )
241 .await?
242 .is_some())
243}
244
245async fn delete_from_available_post_msgs(context: &Context, rfc724_mid: &str) -> Result<()> {
246 context
247 .sql
248 .execute(
249 "DELETE FROM available_post_msgs WHERE rfc724_mid=?",
250 (&rfc724_mid,),
251 )
252 .await?;
253 Ok(())
254}
255
256async fn delete_from_downloads(context: &Context, rfc724_mid: &str) -> Result<()> {
257 context
258 .sql
259 .execute("DELETE FROM download WHERE rfc724_mid=?", (&rfc724_mid,))
260 .await?;
261 Ok(())
262}
263
264pub(crate) async fn msg_is_downloaded_for(context: &Context, rfc724_mid: &str) -> Result<bool> {
265 Ok(message::rfc724_mid_exists(context, rfc724_mid)
266 .await?
267 .is_some())
268}
269
270pub(crate) async fn download_msgs(context: &Context, session: &mut Session) -> Result<()> {
271 let rfc724_mids = context
272 .sql
273 .query_map_vec("SELECT rfc724_mid FROM download", (), |row| {
274 let rfc724_mid: String = row.get(0)?;
275 Ok(rfc724_mid)
276 })
277 .await?;
278
279 for rfc724_mid in &rfc724_mids {
280 let res = download_msg(context, rfc724_mid.clone(), session).await;
281 if let Ok(Some(())) = res {
282 delete_from_downloads(context, rfc724_mid).await?;
283 delete_from_available_post_msgs(context, rfc724_mid).await?;
284 }
285 if let Err(err) = res {
286 warn!(
287 context,
288 "Failed to download message rfc724_mid={rfc724_mid}: {:#}.", err
289 );
290 if !msg_is_downloaded_for(context, rfc724_mid).await? {
291 warn!(
293 context,
294 "{rfc724_mid} download failed and there is no downloaded pre-message."
295 );
296 delete_from_downloads(context, rfc724_mid).await?;
297 } else if available_post_msgs_contains_rfc724_mid(context, rfc724_mid).await? {
298 warn!(
299 context,
300 "{rfc724_mid} is in available_post_msgs table but we failed to fetch it,
301 so set the message to DownloadState::Failure - probably it was deleted on the server in the meantime"
302 );
303 set_state_to_failure(context, rfc724_mid).await?;
304 delete_from_downloads(context, rfc724_mid).await?;
305 delete_from_available_post_msgs(context, rfc724_mid).await?;
306 } else {
307 }
310 }
311 }
312
313 Ok(())
314}
315
316pub(crate) async fn download_known_post_messages_without_pre_message(
319 context: &Context,
320 session: &mut Session,
321) -> Result<()> {
322 let rfc724_mids = context
323 .sql
324 .query_map_vec("SELECT rfc724_mid FROM available_post_msgs", (), |row| {
325 let rfc724_mid: String = row.get(0)?;
326 Ok(rfc724_mid)
327 })
328 .await?;
329 for rfc724_mid in &rfc724_mids {
330 if msg_is_downloaded_for(context, rfc724_mid).await? {
331 delete_from_available_post_msgs(context, rfc724_mid).await?;
332 continue;
333 }
334
335 let res = download_msg(context, rfc724_mid.clone(), session).await;
340 if let Ok(Some(())) = res {
341 delete_from_available_post_msgs(context, rfc724_mid).await?;
342 }
343 if let Err(err) = res {
344 warn!(
345 context,
346 "download_known_post_messages_without_pre_message: Failed to download message rfc724_mid={rfc724_mid}: {:#}.",
347 err
348 );
349 }
350 }
351 Ok(())
352}
353
#[cfg(test)]
mod tests {
    use num_traits::FromPrimitive;

    use super::*;
    use crate::chat::send_msg;
    use crate::test_utils::TestContext;

    /// Pins the numeric value of every `DownloadState` variant.
    #[test]
    fn test_downloadstate_values() {
        assert_eq!(DownloadState::Done, DownloadState::default());
        assert_eq!(DownloadState::Done, DownloadState::from_i32(0).unwrap());
        assert_eq!(
            DownloadState::Available,
            DownloadState::from_i32(10).unwrap()
        );
        assert_eq!(DownloadState::Failure, DownloadState::from_i32(20).unwrap());
        assert_eq!(
            DownloadState::Undecipherable,
            DownloadState::from_i32(30).unwrap()
        );
        assert_eq!(
            DownloadState::InProgress,
            DownloadState::from_i32(1000).unwrap()
        );
    }

    /// Checks that `update_download_state()` persists each state and does
    /// not fail for a message that was deleted.
    #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
    async fn test_update_download_state() -> Result<()> {
        let t = TestContext::new_alice().await;
        let chat = t.create_chat_with_contact("Bob", "bob@example.org").await;

        let mut msg = Message::new_text("Hi Bob".to_owned());
        let msg_id = send_msg(&t, chat.id, &mut msg).await?;
        let msg = Message::load_from_db(&t, msg_id).await?;
        assert_eq!(msg.download_state(), DownloadState::Done);

        // `Done` is listed twice to cover setting the state that is
        // already stored.
        for s in &[
            DownloadState::Available,
            DownloadState::InProgress,
            DownloadState::Failure,
            DownloadState::Done,
            DownloadState::Done,
        ] {
            msg_id.update_download_state(&t, *s).await?;
            let msg = Message::load_from_db(&t, msg_id).await?;
            assert_eq!(msg.download_state(), *s);
        }

        // Updating the state of a message that no longer exists must
        // succeed silently.
        t.sql
            .execute("DELETE FROM msgs WHERE id=?", (msg_id,))
            .await?;
        msg_id
            .update_download_state(&t, DownloadState::Done)
            .await?;

        Ok(())
    }
}