1use std::cmp::max;
66use std::collections::BTreeSet;
67use std::fmt;
68use std::num::ParseIntError;
69use std::str::FromStr;
70use std::time::{Duration, UNIX_EPOCH};
71
72use anyhow::{Context as _, Result, ensure};
73use async_channel::Receiver;
74use serde::{Deserialize, Serialize};
75use tokio::time::timeout;
76
77use crate::chat::{ChatId, ChatIdBlocked, send_msg};
78use crate::constants::{DC_CHAT_ID_LAST_SPECIAL, DC_CHAT_ID_TRASH};
79use crate::contact::ContactId;
80use crate::context::Context;
81use crate::download::MIN_DELETE_SERVER_AFTER;
82use crate::events::EventType;
83use crate::log::{LogExt, error, info, warn};
84use crate::message::{Message, MessageState, MsgId, Viewtype};
85use crate::mimeparser::SystemMessage;
86use crate::stock_str;
87use crate::tools::{SystemTime, duration_to_str, time};
88use crate::{location, stats};
89
90#[derive(Debug, PartialEq, Eq, Copy, Clone, Serialize, Deserialize)]
92pub enum Timer {
93 Disabled,
95
96 Enabled {
98 duration: u32,
102 },
103}
104
105impl Timer {
106 pub fn to_u32(self) -> u32 {
110 match self {
111 Self::Disabled => 0,
112 Self::Enabled { duration } => duration,
113 }
114 }
115
116 pub fn from_u32(duration: u32) -> Self {
120 if duration == 0 {
121 Self::Disabled
122 } else {
123 Self::Enabled { duration }
124 }
125 }
126}
127
128impl Default for Timer {
129 fn default() -> Self {
130 Self::Disabled
131 }
132}
133
134impl fmt::Display for Timer {
135 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
136 write!(f, "{}", self.to_u32())
137 }
138}
139
140impl FromStr for Timer {
141 type Err = ParseIntError;
142
143 fn from_str(input: &str) -> Result<Timer, ParseIntError> {
144 input.parse::<u32>().map(Self::from_u32)
145 }
146}
147
148impl rusqlite::types::ToSql for Timer {
149 fn to_sql(&self) -> rusqlite::Result<rusqlite::types::ToSqlOutput<'_>> {
150 let val = rusqlite::types::Value::Integer(match self {
151 Self::Disabled => 0,
152 Self::Enabled { duration } => i64::from(*duration),
153 });
154 let out = rusqlite::types::ToSqlOutput::Owned(val);
155 Ok(out)
156 }
157}
158
159impl rusqlite::types::FromSql for Timer {
160 fn column_result(value: rusqlite::types::ValueRef) -> rusqlite::types::FromSqlResult<Self> {
161 i64::column_result(value).and_then(|value| {
162 if value == 0 {
163 Ok(Self::Disabled)
164 } else if let Ok(duration) = u32::try_from(value) {
165 Ok(Self::Enabled { duration })
166 } else {
167 Err(rusqlite::types::FromSqlError::OutOfRange(value))
168 }
169 })
170 }
171}
172
173impl ChatId {
174 pub async fn get_ephemeral_timer(self, context: &Context) -> Result<Timer> {
176 let timer = context
177 .sql
178 .query_get_value(
179 "SELECT IFNULL(ephemeral_timer, 0) FROM chats WHERE id=?",
180 (self,),
181 )
182 .await?
183 .with_context(|| format!("Chat {self} not found"))?;
184 Ok(timer)
185 }
186
187 pub(crate) async fn inner_set_ephemeral_timer(
192 self,
193 context: &Context,
194 timer: Timer,
195 ) -> Result<()> {
196 ensure!(!self.is_special(), "Invalid chat ID");
197
198 context
199 .sql
200 .execute(
201 "UPDATE chats
202 SET ephemeral_timer=?
203 WHERE id=?;",
204 (timer, self),
205 )
206 .await?;
207
208 context.emit_event(EventType::ChatEphemeralTimerModified {
209 chat_id: self,
210 timer,
211 });
212 Ok(())
213 }
214
215 pub async fn set_ephemeral_timer(self, context: &Context, timer: Timer) -> Result<()> {
219 if timer == self.get_ephemeral_timer(context).await? {
220 return Ok(());
221 }
222 self.inner_set_ephemeral_timer(context, timer).await?;
223
224 if self.is_promoted(context).await? {
225 let mut msg = Message::new_text(
226 stock_ephemeral_timer_changed(context, timer, ContactId::SELF).await,
227 );
228 msg.param.set_cmd(SystemMessage::EphemeralTimerChanged);
229 if let Err(err) = send_msg(context, self, &mut msg).await {
230 error!(
231 context,
232 "Failed to send a message about ephemeral message timer change: {:?}", err
233 );
234 }
235 }
236 Ok(())
237 }
238}
239
240pub(crate) async fn stock_ephemeral_timer_changed(
242 context: &Context,
243 timer: Timer,
244 from_id: ContactId,
245) -> String {
246 match timer {
247 Timer::Disabled => stock_str::msg_ephemeral_timer_disabled(context, from_id).await,
248 Timer::Enabled { duration } => match duration {
249 0..=59 => {
250 stock_str::msg_ephemeral_timer_enabled(context, &timer.to_string(), from_id).await
251 }
252 60 => stock_str::msg_ephemeral_timer_minute(context, from_id).await,
253 61..=3599 => {
254 stock_str::msg_ephemeral_timer_minutes(
255 context,
256 &format!("{}", (f64::from(duration) / 6.0).round() / 10.0),
257 from_id,
258 )
259 .await
260 }
261 3600 => stock_str::msg_ephemeral_timer_hour(context, from_id).await,
262 3601..=86399 => {
263 stock_str::msg_ephemeral_timer_hours(
264 context,
265 &format!("{}", (f64::from(duration) / 360.0).round() / 10.0),
266 from_id,
267 )
268 .await
269 }
270 86400 => stock_str::msg_ephemeral_timer_day(context, from_id).await,
271 86401..=604_799 => {
272 stock_str::msg_ephemeral_timer_days(
273 context,
274 &format!("{}", (f64::from(duration) / 8640.0).round() / 10.0),
275 from_id,
276 )
277 .await
278 }
279 604_800 => stock_str::msg_ephemeral_timer_week(context, from_id).await,
280 31_536_000..=31_708_800 => stock_str::msg_ephemeral_timer_year(context, from_id).await,
281 _ => {
282 stock_str::msg_ephemeral_timer_weeks(
283 context,
284 &format!("{}", (f64::from(duration) / 60480.0).round() / 10.0),
285 from_id,
286 )
287 .await
288 }
289 },
290 }
291}
292
293impl MsgId {
294 pub(crate) async fn ephemeral_timer(self, context: &Context) -> Result<Timer> {
296 let res = match context
297 .sql
298 .query_get_value("SELECT ephemeral_timer FROM msgs WHERE id=?", (self,))
299 .await?
300 {
301 None | Some(0) => Timer::Disabled,
302 Some(duration) => Timer::Enabled { duration },
303 };
304 Ok(res)
305 }
306
307 pub(crate) async fn start_ephemeral_timer(self, context: &Context) -> Result<()> {
309 if let Timer::Enabled { duration } = self.ephemeral_timer(context).await? {
310 let ephemeral_timestamp = time().saturating_add(duration.into());
311
312 context
313 .sql
314 .execute(
315 "UPDATE msgs SET ephemeral_timestamp = ? \
316 WHERE (ephemeral_timestamp == 0 OR ephemeral_timestamp > ?) \
317 AND id = ?",
318 (ephemeral_timestamp, ephemeral_timestamp, self),
319 )
320 .await?;
321 context.scheduler.interrupt_ephemeral_task().await;
322 }
323 Ok(())
324 }
325}
326
327pub(crate) async fn start_ephemeral_timers_msgids(
328 context: &Context,
329 msg_ids: &[MsgId],
330) -> Result<()> {
331 let now = time();
332 let should_interrupt =
333 context
334 .sql
335 .transaction(move |transaction| {
336 let mut should_interrupt = false;
337 let mut stmt =
338 transaction.prepare(
339 "UPDATE msgs SET ephemeral_timestamp = ?1 + ephemeral_timer
340 WHERE (ephemeral_timestamp == 0 OR ephemeral_timestamp > ?1 + ephemeral_timer) AND ephemeral_timer > 0
341 AND id=?2")?;
342 for msg_id in msg_ids {
343 should_interrupt |= stmt.execute((now, msg_id))? > 0;
344 }
345 Ok(should_interrupt)
346 }).await?;
347 if should_interrupt {
348 context.scheduler.interrupt_ephemeral_task().await;
349 }
350 Ok(())
351}
352
353pub(crate) async fn start_chat_ephemeral_timers(context: &Context, chat_id: ChatId) -> Result<()> {
357 let now = time();
358 let should_interrupt = context
359 .sql
360 .execute(
361 "UPDATE msgs SET ephemeral_timestamp = ?1 + ephemeral_timer
362 WHERE chat_id = ?2
363 AND ephemeral_timer > 0
364 AND (ephemeral_timestamp == 0 OR ephemeral_timestamp > ?1 + ephemeral_timer)",
365 (now, chat_id),
366 )
367 .await?
368 > 0;
369 if should_interrupt {
370 context.scheduler.interrupt_ephemeral_task().await;
371 }
372 Ok(())
373}
374
375async fn select_expired_messages(
384 context: &Context,
385 now: i64,
386) -> Result<Vec<(MsgId, ChatId, Viewtype, u32)>> {
387 let mut rows = context
388 .sql
389 .query_map_vec(
390 r#"
391SELECT id, chat_id, type, location_id
392FROM msgs
393WHERE
394 ephemeral_timestamp != 0
395 AND ephemeral_timestamp <= ?
396 AND chat_id != ?
397"#,
398 (now, DC_CHAT_ID_TRASH),
399 |row| {
400 let id: MsgId = row.get("id")?;
401 let chat_id: ChatId = row.get("chat_id")?;
402 let viewtype: Viewtype = row
403 .get("type")
404 .context("Using default viewtype for ephemeral handling.")
405 .log_err(context)
406 .unwrap_or_default();
407 let location_id: u32 = row.get("location_id")?;
408 Ok((id, chat_id, viewtype, location_id))
409 },
410 )
411 .await?;
412
413 if let Some(delete_device_after) = context.get_config_delete_device_after().await? {
414 let self_chat_id = ChatIdBlocked::lookup_by_contact(context, ContactId::SELF)
415 .await?
416 .map(|c| c.id)
417 .unwrap_or_default();
418 let device_chat_id = ChatIdBlocked::lookup_by_contact(context, ContactId::DEVICE)
419 .await?
420 .map(|c| c.id)
421 .unwrap_or_default();
422
423 let threshold_timestamp = now.saturating_sub(delete_device_after);
424
425 let rows_expired = context
426 .sql
427 .query_map_vec(
428 r#"
429SELECT id, chat_id, type, location_id
430FROM msgs
431WHERE
432 timestamp < ?1
433 AND timestamp_rcvd < ?1
434 AND chat_id > ?
435 AND chat_id != ?
436 AND chat_id != ?
437"#,
438 (
439 threshold_timestamp,
440 DC_CHAT_ID_LAST_SPECIAL,
441 self_chat_id,
442 device_chat_id,
443 ),
444 |row| {
445 let id: MsgId = row.get("id")?;
446 let chat_id: ChatId = row.get("chat_id")?;
447 let viewtype: Viewtype = row
448 .get("type")
449 .context("Using default viewtype for delete-old handling.")
450 .log_err(context)
451 .unwrap_or_default();
452 let location_id: u32 = row.get("location_id")?;
453 Ok((id, chat_id, viewtype, location_id))
454 },
455 )
456 .await?;
457
458 rows.extend(rows_expired);
459 }
460
461 Ok(rows)
462}
463
464pub(crate) async fn delete_expired_messages(context: &Context, now: i64) -> Result<()> {
470 let rows = select_expired_messages(context, now).await?;
471
472 if !rows.is_empty() {
473 info!(context, "Attempting to delete {} messages.", rows.len());
474
475 let (msgs_changed, webxdc_deleted) = context
476 .sql
477 .transaction(|transaction| {
478 let mut msgs_changed = Vec::with_capacity(rows.len());
479 let mut webxdc_deleted = Vec::new();
480 let mut del_msg_stmt = transaction.prepare(
483 "INSERT OR REPLACE INTO msgs (id, rfc724_mid, timestamp, chat_id)
484 SELECT ?1, rfc724_mid, timestamp, ? FROM msgs WHERE id=?1",
485 )?;
486 let mut del_location_stmt =
487 transaction.prepare("DELETE FROM locations WHERE independent=1 AND id=?")?;
488 for (msg_id, chat_id, viewtype, location_id) in rows {
489 del_msg_stmt.execute((msg_id, DC_CHAT_ID_TRASH))?;
490 if location_id > 0 {
491 del_location_stmt.execute((location_id,))?;
492 }
493
494 msgs_changed.push((chat_id, msg_id));
495 if viewtype == Viewtype::Webxdc {
496 webxdc_deleted.push(msg_id)
497 }
498 }
499 Ok((msgs_changed, webxdc_deleted))
500 })
501 .await?;
502
503 let mut modified_chat_ids = BTreeSet::new();
504
505 for (chat_id, msg_id) in msgs_changed {
506 context.emit_event(EventType::MsgDeleted { chat_id, msg_id });
507 modified_chat_ids.insert(chat_id);
508 }
509
510 for modified_chat_id in modified_chat_ids {
511 context.emit_msgs_changed_without_msg_id(modified_chat_id);
512 }
513
514 for msg_id in webxdc_deleted {
515 context.emit_event(EventType::WebxdcInstanceDeleted { msg_id });
516 }
517 }
518
519 Ok(())
520}
521
522async fn next_delete_device_after_timestamp(context: &Context) -> Result<Option<i64>> {
525 if let Some(delete_device_after) = context.get_config_delete_device_after().await? {
526 let self_chat_id = ChatIdBlocked::lookup_by_contact(context, ContactId::SELF)
527 .await?
528 .map(|c| c.id)
529 .unwrap_or_default();
530 let device_chat_id = ChatIdBlocked::lookup_by_contact(context, ContactId::DEVICE)
531 .await?
532 .map(|c| c.id)
533 .unwrap_or_default();
534
535 let oldest_message_timestamp: Option<i64> = context
536 .sql
537 .query_get_value(
538 r#"
539 SELECT min(max(timestamp, timestamp_rcvd))
540 FROM msgs
541 WHERE chat_id > ?
542 AND chat_id != ?
543 AND chat_id != ?
544 HAVING count(*) > 0
545 "#,
546 (DC_CHAT_ID_TRASH, self_chat_id, device_chat_id),
547 )
548 .await?;
549
550 Ok(oldest_message_timestamp.map(|x| x.saturating_add(delete_device_after)))
551 } else {
552 Ok(None)
553 }
554}
555
556async fn next_expiration_timestamp(context: &Context) -> Option<i64> {
561 let ephemeral_timestamp: Option<i64> = match context
562 .sql
563 .query_get_value(
564 r#"
565 SELECT min(ephemeral_timestamp)
566 FROM msgs
567 WHERE ephemeral_timestamp != 0
568 AND chat_id != ?
569 HAVING count(*) > 0
570 "#,
571 (DC_CHAT_ID_TRASH,), )
573 .await
574 {
575 Err(err) => {
576 warn!(context, "Can't calculate next ephemeral timeout: {}", err);
577 None
578 }
579 Ok(ephemeral_timestamp) => ephemeral_timestamp,
580 };
581
582 let delete_device_after_timestamp: Option<i64> =
583 match next_delete_device_after_timestamp(context).await {
584 Err(err) => {
585 warn!(
586 context,
587 "Can't calculate timestamp of the next message expiration: {}", err
588 );
589 None
590 }
591 Ok(timestamp) => timestamp,
592 };
593
594 ephemeral_timestamp
595 .into_iter()
596 .chain(delete_device_after_timestamp)
597 .min()
598}
599
600pub(crate) async fn ephemeral_loop(context: &Context, interrupt_receiver: Receiver<()>) {
601 loop {
602 let ephemeral_timestamp = next_expiration_timestamp(context).await;
603
604 let now = SystemTime::now();
605 let until = if let Some(ephemeral_timestamp) = ephemeral_timestamp {
606 UNIX_EPOCH
607 + Duration::from_secs(ephemeral_timestamp.try_into().unwrap_or(u64::MAX))
608 + Duration::from_secs(1)
609 } else {
610 now + Duration::from_secs(86400) };
613
614 if let Ok(duration) = until.duration_since(now) {
615 info!(
616 context,
617 "Ephemeral loop waiting for deletion in {} or interrupt",
618 duration_to_str(duration)
619 );
620 match timeout(duration, interrupt_receiver.recv()).await {
621 Ok(Ok(())) => {
622 continue;
624 }
625 Ok(Err(err)) => {
626 warn!(
627 context,
628 "Interrupt channel closed, ephemeral loop exits now: {err:#}."
629 );
630 return;
631 }
632 Err(_err) => {
633 }
635 }
636 }
637
638 stats::maybe_update_message_stats(context)
640 .await
641 .log_err(context)
642 .ok();
643
644 delete_expired_messages(context, time())
645 .await
646 .log_err(context)
647 .ok();
648
649 location::delete_expired(context, time())
650 .await
651 .log_err(context)
652 .ok();
653 }
654}
655
656pub(crate) async fn delete_expired_imap_messages(context: &Context) -> Result<()> {
658 let now = time();
659
660 let (threshold_timestamp, threshold_timestamp_extended) =
661 match context.get_config_delete_server_after().await? {
662 None => (0, 0),
663 Some(delete_server_after) => (
664 match delete_server_after {
665 0 => i64::MAX,
667 _ => now - delete_server_after,
668 },
669 now - max(delete_server_after, MIN_DELETE_SERVER_AFTER),
670 ),
671 };
672 let target = context.get_delete_msgs_target().await?;
673
674 context
675 .sql
676 .execute(
677 "UPDATE imap
678 SET target=?
679 WHERE rfc724_mid IN (
680 SELECT rfc724_mid FROM msgs
681 WHERE ((download_state = 0 AND timestamp < ?) OR
682 (download_state != 0 AND timestamp < ?) OR
683 (ephemeral_timestamp != 0 AND ephemeral_timestamp <= ?))
684 )",
685 (
686 &target,
687 threshold_timestamp,
688 threshold_timestamp_extended,
689 now,
690 ),
691 )
692 .await?;
693
694 Ok(())
695}
696
697pub(crate) async fn start_ephemeral_timers(context: &Context) -> Result<()> {
707 context
708 .sql
709 .execute(
710 "UPDATE msgs \
711 SET ephemeral_timestamp = ? + ephemeral_timer \
712 WHERE ephemeral_timer > 0 \
713 AND ephemeral_timestamp = 0 \
714 AND state NOT IN (?, ?, ?)",
715 (
716 time(),
717 MessageState::InFresh,
718 MessageState::InNoticed,
719 MessageState::OutDraft,
720 ),
721 )
722 .await?;
723
724 Ok(())
725}
726
727#[cfg(test)]
728mod ephemeral_tests;