1use std::collections::BTreeSet;
65use std::fmt;
66use std::num::ParseIntError;
67use std::str::FromStr;
68use std::time::{Duration, UNIX_EPOCH};
69
70use anyhow::{Context as _, Result, ensure};
71use async_channel::Receiver;
72use serde::{Deserialize, Serialize};
73use tokio::time::timeout;
74
75use crate::chat::{ChatId, ChatIdBlocked, send_msg};
76use crate::config::Config;
77use crate::constants::{DC_CHAT_ID_LAST_SPECIAL, DC_CHAT_ID_TRASH};
78use crate::contact::ContactId;
79use crate::context::Context;
80use crate::download::DownloadState;
81use crate::events::EventType;
82use crate::log::{LogExt, warn};
83use crate::message::{Message, MessageState, MsgId, Viewtype};
84use crate::mimeparser::SystemMessage;
85use crate::stock_str;
86use crate::tools::{SystemTime, duration_to_str, time};
87use crate::{location, stats};
88
89#[derive(Debug, PartialEq, Eq, Copy, Clone, Serialize, Deserialize, Default)]
91pub enum Timer {
92 #[default]
94 Disabled,
95
96 Enabled {
98 duration: u32,
102 },
103}
104
105impl Timer {
106 pub fn to_u32(self) -> u32 {
110 match self {
111 Self::Disabled => 0,
112 Self::Enabled { duration } => duration,
113 }
114 }
115
116 pub fn from_u32(duration: u32) -> Self {
120 if duration == 0 {
121 Self::Disabled
122 } else {
123 Self::Enabled { duration }
124 }
125 }
126}
127
128impl fmt::Display for Timer {
129 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
130 write!(f, "{}", self.to_u32())
131 }
132}
133
134impl FromStr for Timer {
135 type Err = ParseIntError;
136
137 fn from_str(input: &str) -> Result<Timer, ParseIntError> {
138 input.parse::<u32>().map(Self::from_u32)
139 }
140}
141
142impl rusqlite::types::ToSql for Timer {
143 fn to_sql(&self) -> rusqlite::Result<rusqlite::types::ToSqlOutput<'_>> {
144 let val = rusqlite::types::Value::Integer(match self {
145 Self::Disabled => 0,
146 Self::Enabled { duration } => i64::from(*duration),
147 });
148 let out = rusqlite::types::ToSqlOutput::Owned(val);
149 Ok(out)
150 }
151}
152
153impl rusqlite::types::FromSql for Timer {
154 fn column_result(value: rusqlite::types::ValueRef) -> rusqlite::types::FromSqlResult<Self> {
155 i64::column_result(value).and_then(|value| {
156 if value == 0 {
157 Ok(Self::Disabled)
158 } else if let Ok(duration) = u32::try_from(value) {
159 Ok(Self::Enabled { duration })
160 } else {
161 Err(rusqlite::types::FromSqlError::OutOfRange(value))
162 }
163 })
164 }
165}
166
167impl ChatId {
168 pub async fn get_ephemeral_timer(self, context: &Context) -> Result<Timer> {
170 let timer = context
171 .sql
172 .query_get_value(
173 "SELECT IFNULL(ephemeral_timer, 0) FROM chats WHERE id=?",
174 (self,),
175 )
176 .await?
177 .with_context(|| format!("Chat {self} not found"))?;
178 Ok(timer)
179 }
180
181 pub(crate) async fn inner_set_ephemeral_timer(
186 self,
187 context: &Context,
188 timer: Timer,
189 ) -> Result<()> {
190 ensure!(!self.is_special(), "Invalid chat ID");
191
192 context
193 .sql
194 .execute(
195 "UPDATE chats
196 SET ephemeral_timer=?
197 WHERE id=?;",
198 (timer, self),
199 )
200 .await?;
201
202 context.emit_event(EventType::ChatEphemeralTimerModified {
203 chat_id: self,
204 timer,
205 });
206 Ok(())
207 }
208
209 pub async fn set_ephemeral_timer(self, context: &Context, timer: Timer) -> Result<()> {
213 if timer == self.get_ephemeral_timer(context).await? {
214 return Ok(());
215 }
216 self.inner_set_ephemeral_timer(context, timer).await?;
217
218 if self.is_promoted(context).await? {
219 let mut msg = Message::new_text(
220 stock_ephemeral_timer_changed(context, timer, ContactId::SELF).await,
221 );
222 msg.param.set_cmd(SystemMessage::EphemeralTimerChanged);
223 if let Err(err) = send_msg(context, self, &mut msg).await {
224 error!(
225 context,
226 "Failed to send a message about ephemeral message timer change: {:?}", err
227 );
228 }
229 }
230 Ok(())
231 }
232}
233
234pub(crate) async fn stock_ephemeral_timer_changed(
236 context: &Context,
237 timer: Timer,
238 from_id: ContactId,
239) -> String {
240 match timer {
241 Timer::Disabled => stock_str::msg_ephemeral_timer_disabled(context, from_id).await,
242 Timer::Enabled { duration } => match duration {
243 0..=60 => {
244 stock_str::msg_ephemeral_timer_enabled(context, &timer.to_string(), from_id).await
245 }
246 61..=3599 => {
247 stock_str::msg_ephemeral_timer_minutes(
248 context,
249 &format!("{}", (f64::from(duration) / 6.0).round() / 10.0),
250 from_id,
251 )
252 .await
253 }
254 3600 => stock_str::msg_ephemeral_timer_hour(context, from_id).await,
255 3601..=86399 => {
256 stock_str::msg_ephemeral_timer_hours(
257 context,
258 &format!("{}", (f64::from(duration) / 360.0).round() / 10.0),
259 from_id,
260 )
261 .await
262 }
263 86400 => stock_str::msg_ephemeral_timer_day(context, from_id).await,
264 86401..=604_799 => {
265 stock_str::msg_ephemeral_timer_days(
266 context,
267 &format!("{}", (f64::from(duration) / 8640.0).round() / 10.0),
268 from_id,
269 )
270 .await
271 }
272 604_800 => stock_str::msg_ephemeral_timer_week(context, from_id).await,
273 31_536_000..=31_708_800 => stock_str::msg_ephemeral_timer_year(context, from_id).await,
274 _ => {
275 stock_str::msg_ephemeral_timer_weeks(
276 context,
277 &format!("{}", (f64::from(duration) / 60480.0).round() / 10.0),
278 from_id,
279 )
280 .await
281 }
282 },
283 }
284}
285
286impl MsgId {
287 pub(crate) async fn ephemeral_timer(self, context: &Context) -> Result<Timer> {
289 let res = match context
290 .sql
291 .query_get_value("SELECT ephemeral_timer FROM msgs WHERE id=?", (self,))
292 .await?
293 {
294 None | Some(0) => Timer::Disabled,
295 Some(duration) => Timer::Enabled { duration },
296 };
297 Ok(res)
298 }
299
300 pub(crate) async fn start_ephemeral_timer(self, context: &Context) -> Result<()> {
302 if let Timer::Enabled { duration } = self.ephemeral_timer(context).await? {
303 let ephemeral_timestamp = time().saturating_add(duration.into());
304
305 context
306 .sql
307 .execute(
308 "UPDATE msgs SET ephemeral_timestamp = ? \
309 WHERE (ephemeral_timestamp == 0 OR ephemeral_timestamp > ?) \
310 AND id = ?",
311 (ephemeral_timestamp, ephemeral_timestamp, self),
312 )
313 .await?;
314 context.scheduler.interrupt_ephemeral_task().await;
315 }
316 Ok(())
317 }
318}
319
320pub(crate) async fn start_ephemeral_timers_msgids(
321 context: &Context,
322 msg_ids: &[MsgId],
323) -> Result<()> {
324 let now = time();
325 let should_interrupt =
326 context
327 .sql
328 .transaction(move |transaction| {
329 let mut should_interrupt = false;
330 let mut stmt =
331 transaction.prepare(
332 "UPDATE msgs SET ephemeral_timestamp = ?1 + ephemeral_timer
333 WHERE (ephemeral_timestamp == 0 OR ephemeral_timestamp > ?1 + ephemeral_timer) AND ephemeral_timer > 0
334 AND id=?2")?;
335 for msg_id in msg_ids {
336 should_interrupt |= stmt.execute((now, msg_id))? > 0;
337 }
338 Ok(should_interrupt)
339 }).await?;
340 if should_interrupt {
341 context.scheduler.interrupt_ephemeral_task().await;
342 }
343 Ok(())
344}
345
346pub(crate) async fn start_chat_ephemeral_timers(context: &Context, chat_id: ChatId) -> Result<()> {
350 let now = time();
351 let should_interrupt = context
352 .sql
353 .execute(
354 "UPDATE msgs SET ephemeral_timestamp = ?1 + ephemeral_timer
355 WHERE chat_id = ?2
356 AND ephemeral_timer > 0
357 AND (ephemeral_timestamp == 0 OR ephemeral_timestamp > ?1 + ephemeral_timer)",
358 (now, chat_id),
359 )
360 .await?
361 > 0;
362 if should_interrupt {
363 context.scheduler.interrupt_ephemeral_task().await;
364 }
365 Ok(())
366}
367
368async fn select_expired_messages(
377 context: &Context,
378 now: i64,
379) -> Result<Vec<(MsgId, ChatId, Viewtype, u32)>> {
380 let mut rows = context
381 .sql
382 .query_map_vec(
383 r#"
384SELECT id, chat_id, type, location_id
385FROM msgs
386WHERE
387 ephemeral_timestamp != 0
388 AND ephemeral_timestamp <= ?
389 AND chat_id != ?
390"#,
391 (now, DC_CHAT_ID_TRASH),
392 |row| {
393 let id: MsgId = row.get("id")?;
394 let chat_id: ChatId = row.get("chat_id")?;
395 let viewtype: Viewtype = row
396 .get("type")
397 .context("Using default viewtype for ephemeral handling.")
398 .log_err(context)
399 .unwrap_or_default();
400 let location_id: u32 = row.get("location_id")?;
401 Ok((id, chat_id, viewtype, location_id))
402 },
403 )
404 .await?;
405
406 if let Some(delete_device_after) = context.get_config_delete_device_after().await? {
407 let self_chat_id = ChatIdBlocked::lookup_by_contact(context, ContactId::SELF)
408 .await?
409 .map(|c| c.id)
410 .unwrap_or_default();
411 let device_chat_id = ChatIdBlocked::lookup_by_contact(context, ContactId::DEVICE)
412 .await?
413 .map(|c| c.id)
414 .unwrap_or_default();
415
416 let threshold_timestamp = now.saturating_sub(delete_device_after);
417
418 let rows_expired = context
419 .sql
420 .query_map_vec(
421 r#"
422SELECT id, chat_id, type, location_id
423FROM msgs
424WHERE
425 timestamp < ?1
426 AND timestamp_rcvd < ?1
427 AND chat_id > ?
428 AND chat_id != ?
429 AND chat_id != ?
430"#,
431 (
432 threshold_timestamp,
433 DC_CHAT_ID_LAST_SPECIAL,
434 self_chat_id,
435 device_chat_id,
436 ),
437 |row| {
438 let id: MsgId = row.get("id")?;
439 let chat_id: ChatId = row.get("chat_id")?;
440 let viewtype: Viewtype = row
441 .get("type")
442 .context("Using default viewtype for delete-old handling.")
443 .log_err(context)
444 .unwrap_or_default();
445 let location_id: u32 = row.get("location_id")?;
446 Ok((id, chat_id, viewtype, location_id))
447 },
448 )
449 .await?;
450
451 rows.extend(rows_expired);
452 }
453
454 Ok(rows)
455}
456
457pub(crate) async fn delete_expired_messages(context: &Context, now: i64) -> Result<()> {
466 let rows = select_expired_messages(context, now).await?;
467
468 if !rows.is_empty() {
469 info!(context, "Attempting to delete {} messages.", rows.len());
470
471 let (msgs_changed, webxdc_deleted) = context
472 .sql
473 .transaction(|transaction| {
474 let mut msgs_changed = Vec::with_capacity(rows.len());
475 let mut webxdc_deleted = Vec::new();
476 let mut del_msg_stmt = transaction.prepare(
479 "
480INSERT OR REPLACE INTO msgs (id, rfc724_mid, pre_rfc724_mid, timestamp, chat_id)
481SELECT ?1, rfc724_mid, pre_rfc724_mid, timestamp, ? FROM msgs WHERE id=?1
482 ",
483 )?;
484 let mut del_location_stmt =
485 transaction.prepare("DELETE FROM locations WHERE independent=1 AND id=?")?;
486 for (msg_id, chat_id, viewtype, location_id) in rows {
487 del_msg_stmt.execute((msg_id, DC_CHAT_ID_TRASH))?;
488 if location_id > 0 {
489 del_location_stmt.execute((location_id,))?;
490 }
491
492 msgs_changed.push((chat_id, msg_id));
493 if viewtype == Viewtype::Webxdc {
494 webxdc_deleted.push(msg_id)
495 }
496 }
497 Ok((msgs_changed, webxdc_deleted))
498 })
499 .await?;
500
501 let mut modified_chat_ids = BTreeSet::new();
502
503 for (chat_id, msg_id) in msgs_changed {
504 context.emit_event(EventType::MsgDeleted { chat_id, msg_id });
505 modified_chat_ids.insert(chat_id);
506 }
507
508 for modified_chat_id in modified_chat_ids {
509 context.emit_msgs_changed_without_msg_id(modified_chat_id);
510 }
511
512 for msg_id in webxdc_deleted {
513 context.emit_event(EventType::WebxdcInstanceDeleted { msg_id });
514 }
515 }
516
517 Ok(())
518}
519
520async fn next_delete_device_after_timestamp(context: &Context) -> Result<Option<i64>> {
523 if let Some(delete_device_after) = context.get_config_delete_device_after().await? {
524 let self_chat_id = ChatIdBlocked::lookup_by_contact(context, ContactId::SELF)
525 .await?
526 .map(|c| c.id)
527 .unwrap_or_default();
528 let device_chat_id = ChatIdBlocked::lookup_by_contact(context, ContactId::DEVICE)
529 .await?
530 .map(|c| c.id)
531 .unwrap_or_default();
532
533 let oldest_message_timestamp: Option<i64> = context
534 .sql
535 .query_get_value(
536 r#"
537 SELECT min(max(timestamp, timestamp_rcvd))
538 FROM msgs
539 WHERE chat_id > ?
540 AND chat_id != ?
541 AND chat_id != ?
542 HAVING count(*) > 0
543 "#,
544 (DC_CHAT_ID_TRASH, self_chat_id, device_chat_id),
545 )
546 .await?;
547
548 Ok(oldest_message_timestamp.map(|x| x.saturating_add(delete_device_after)))
549 } else {
550 Ok(None)
551 }
552}
553
554async fn next_expiration_timestamp(context: &Context) -> Option<i64> {
559 let ephemeral_timestamp: Option<i64> = match context
560 .sql
561 .query_get_value(
562 r#"
563 SELECT min(ephemeral_timestamp)
564 FROM msgs
565 WHERE ephemeral_timestamp != 0
566 AND chat_id != ?
567 HAVING count(*) > 0
568 "#,
569 (DC_CHAT_ID_TRASH,), )
571 .await
572 {
573 Err(err) => {
574 warn!(context, "Can't calculate next ephemeral timeout: {}", err);
575 None
576 }
577 Ok(ephemeral_timestamp) => ephemeral_timestamp,
578 };
579
580 let delete_device_after_timestamp: Option<i64> =
581 match next_delete_device_after_timestamp(context).await {
582 Err(err) => {
583 warn!(
584 context,
585 "Can't calculate timestamp of the next message expiration: {}", err
586 );
587 None
588 }
589 Ok(timestamp) => timestamp,
590 };
591
592 ephemeral_timestamp
593 .into_iter()
594 .chain(delete_device_after_timestamp)
595 .min()
596}
597
598#[expect(clippy::arithmetic_side_effects)]
599pub(crate) async fn ephemeral_loop(context: &Context, interrupt_receiver: Receiver<()>) {
600 loop {
601 let ephemeral_timestamp = next_expiration_timestamp(context).await;
602
603 let now = SystemTime::now();
604 let until = if let Some(ephemeral_timestamp) = ephemeral_timestamp {
605 UNIX_EPOCH
606 + Duration::from_secs(ephemeral_timestamp.try_into().unwrap_or(u64::MAX))
607 + Duration::from_secs(1)
608 } else {
609 now + Duration::from_secs(86400) };
612
613 if let Ok(duration) = until.duration_since(now) {
614 info!(
615 context,
616 "Ephemeral loop waiting for deletion in {} or interrupt",
617 duration_to_str(duration)
618 );
619 match timeout(duration, interrupt_receiver.recv()).await {
620 Ok(Ok(())) => {
621 continue;
623 }
624 Ok(Err(err)) => {
625 warn!(
626 context,
627 "Interrupt channel closed, ephemeral loop exits now: {err:#}."
628 );
629 return;
630 }
631 Err(_err) => {
632 }
634 }
635 }
636
637 stats::maybe_update_message_stats(context)
639 .await
640 .log_err(context)
641 .ok();
642
643 delete_expired_messages(context, time())
644 .await
645 .log_err(context)
646 .ok();
647
648 location::delete_expired(context, time())
649 .await
650 .log_err(context)
651 .ok();
652 }
653}
654
655pub(crate) async fn delete_expired_imap_messages(
660 context: &Context,
661 transport_id: u32,
662 is_chatmail: bool,
663) -> Result<()> {
664 let now = time();
665
666 let bcc_self = context.get_config_bool(Config::BccSelf).await?;
667 if should_delete_all_downloaded_messages(bcc_self, is_chatmail) {
668 context
677 .sql
678 .execute(
679 "UPDATE imap
680 SET target=''
681 WHERE transport_id=?1
682 AND rfc724_mid IN (
683 SELECT rfc724_mid FROM msgs
684 WHERE ((ephemeral_timestamp!=0 AND ephemeral_timestamp<=?2) OR download_state=?3)
685 AND id>9
686 UNION
687 SELECT pre_rfc724_mid FROM msgs
688 WHERE pre_rfc724_mid!=''
689 AND id>9
690 )",
691 (transport_id, now, DownloadState::Done),
692 )
693 .await?;
694 } else if bcc_self {
695 context
699 .sql
700 .execute(
701 "UPDATE imap
702 SET target=''
703 WHERE transport_id=?1
704 AND rfc724_mid IN (
705 SELECT rfc724_mid FROM msgs
706 WHERE ephemeral_timestamp!=0 AND ephemeral_timestamp<=?2 AND id>9
707 UNION
708 SELECT pre_rfc724_mid FROM msgs
709 WHERE pre_rfc724_mid!=''
710 AND ephemeral_timestamp!=0 AND ephemeral_timestamp<=?2 AND id>9
711 )",
712 (transport_id, now),
713 )
714 .await?;
715 } else {
716 context
719 .sql
720 .execute(
721 "UPDATE imap
722 SET target=''
723 WHERE transport_id=?1
724 AND rfc724_mid IN (
725 SELECT rfc724_mid FROM msgs
726 WHERE id>9
727 AND ((ephemeral_timestamp!=0 AND ephemeral_timestamp<=?2) OR
728 ((param GLOB '*\nc=1*' OR param GLOB 'c=1*') AND download_state=?3))
729 UNION
730 SELECT pre_rfc724_mid FROM msgs
731 WHERE pre_rfc724_mid!=''
732 AND id>9
733 AND ((ephemeral_timestamp!=0 AND ephemeral_timestamp<=?2) OR
734 (param GLOB '*\nc=1*' OR param GLOB 'c=1*'))
735 )",
736 (transport_id, now, DownloadState::Done),
737 )
738 .await?;
739 }
740
741 Ok(())
742}
743
/// Returns `true` if every downloaded message should be removed from the
/// server: the transport is chatmail and `bcc_self` is disabled.
pub(crate) fn should_delete_all_downloaded_messages(bcc_self: bool, is_chatmail: bool) -> bool {
    is_chatmail && !bcc_self
}
747
748pub(crate) async fn start_ephemeral_timers(context: &Context) -> Result<()> {
758 context
759 .sql
760 .execute(
761 "UPDATE msgs \
762 SET ephemeral_timestamp = ? + ephemeral_timer \
763 WHERE ephemeral_timer > 0 \
764 AND ephemeral_timestamp = 0 \
765 AND state NOT IN (?, ?, ?)",
766 (
767 time(),
768 MessageState::InFresh,
769 MessageState::InNoticed,
770 MessageState::OutDraft,
771 ),
772 )
773 .await?;
774
775 Ok(())
776}
777
778#[cfg(test)]
779mod ephemeral_tests;