1use std::cmp::max;
66use std::collections::BTreeSet;
67use std::fmt;
68use std::num::ParseIntError;
69use std::str::FromStr;
70use std::time::{Duration, UNIX_EPOCH};
71
72use anyhow::{Context as _, Result, ensure};
73use async_channel::Receiver;
74use serde::{Deserialize, Serialize};
75use tokio::time::timeout;
76
77use crate::chat::{ChatId, ChatIdBlocked, send_msg};
78use crate::constants::{DC_CHAT_ID_LAST_SPECIAL, DC_CHAT_ID_TRASH};
79use crate::contact::ContactId;
80use crate::context::Context;
81use crate::download::MIN_DELETE_SERVER_AFTER;
82use crate::events::EventType;
83use crate::log::{LogExt, warn};
84use crate::message::{Message, MessageState, MsgId, Viewtype};
85use crate::mimeparser::SystemMessage;
86use crate::stock_str;
87use crate::tools::{SystemTime, duration_to_str, time};
88use crate::{location, stats};
89
90#[derive(Debug, PartialEq, Eq, Copy, Clone, Serialize, Deserialize, Default)]
92pub enum Timer {
93 #[default]
95 Disabled,
96
97 Enabled {
99 duration: u32,
103 },
104}
105
106impl Timer {
107 pub fn to_u32(self) -> u32 {
111 match self {
112 Self::Disabled => 0,
113 Self::Enabled { duration } => duration,
114 }
115 }
116
117 pub fn from_u32(duration: u32) -> Self {
121 if duration == 0 {
122 Self::Disabled
123 } else {
124 Self::Enabled { duration }
125 }
126 }
127}
128
129impl fmt::Display for Timer {
130 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
131 write!(f, "{}", self.to_u32())
132 }
133}
134
135impl FromStr for Timer {
136 type Err = ParseIntError;
137
138 fn from_str(input: &str) -> Result<Timer, ParseIntError> {
139 input.parse::<u32>().map(Self::from_u32)
140 }
141}
142
143impl rusqlite::types::ToSql for Timer {
144 fn to_sql(&self) -> rusqlite::Result<rusqlite::types::ToSqlOutput<'_>> {
145 let val = rusqlite::types::Value::Integer(match self {
146 Self::Disabled => 0,
147 Self::Enabled { duration } => i64::from(*duration),
148 });
149 let out = rusqlite::types::ToSqlOutput::Owned(val);
150 Ok(out)
151 }
152}
153
154impl rusqlite::types::FromSql for Timer {
155 fn column_result(value: rusqlite::types::ValueRef) -> rusqlite::types::FromSqlResult<Self> {
156 i64::column_result(value).and_then(|value| {
157 if value == 0 {
158 Ok(Self::Disabled)
159 } else if let Ok(duration) = u32::try_from(value) {
160 Ok(Self::Enabled { duration })
161 } else {
162 Err(rusqlite::types::FromSqlError::OutOfRange(value))
163 }
164 })
165 }
166}
167
168impl ChatId {
169 pub async fn get_ephemeral_timer(self, context: &Context) -> Result<Timer> {
171 let timer = context
172 .sql
173 .query_get_value(
174 "SELECT IFNULL(ephemeral_timer, 0) FROM chats WHERE id=?",
175 (self,),
176 )
177 .await?
178 .with_context(|| format!("Chat {self} not found"))?;
179 Ok(timer)
180 }
181
182 pub(crate) async fn inner_set_ephemeral_timer(
187 self,
188 context: &Context,
189 timer: Timer,
190 ) -> Result<()> {
191 ensure!(!self.is_special(), "Invalid chat ID");
192
193 context
194 .sql
195 .execute(
196 "UPDATE chats
197 SET ephemeral_timer=?
198 WHERE id=?;",
199 (timer, self),
200 )
201 .await?;
202
203 context.emit_event(EventType::ChatEphemeralTimerModified {
204 chat_id: self,
205 timer,
206 });
207 Ok(())
208 }
209
210 pub async fn set_ephemeral_timer(self, context: &Context, timer: Timer) -> Result<()> {
214 if timer == self.get_ephemeral_timer(context).await? {
215 return Ok(());
216 }
217 self.inner_set_ephemeral_timer(context, timer).await?;
218
219 if self.is_promoted(context).await? {
220 let mut msg = Message::new_text(
221 stock_ephemeral_timer_changed(context, timer, ContactId::SELF).await,
222 );
223 msg.param.set_cmd(SystemMessage::EphemeralTimerChanged);
224 if let Err(err) = send_msg(context, self, &mut msg).await {
225 error!(
226 context,
227 "Failed to send a message about ephemeral message timer change: {:?}", err
228 );
229 }
230 }
231 Ok(())
232 }
233}
234
235pub(crate) async fn stock_ephemeral_timer_changed(
237 context: &Context,
238 timer: Timer,
239 from_id: ContactId,
240) -> String {
241 match timer {
242 Timer::Disabled => stock_str::msg_ephemeral_timer_disabled(context, from_id).await,
243 Timer::Enabled { duration } => match duration {
244 0..=60 => {
245 stock_str::msg_ephemeral_timer_enabled(context, &timer.to_string(), from_id).await
246 }
247 61..=3599 => {
248 stock_str::msg_ephemeral_timer_minutes(
249 context,
250 &format!("{}", (f64::from(duration) / 6.0).round() / 10.0),
251 from_id,
252 )
253 .await
254 }
255 3600 => stock_str::msg_ephemeral_timer_hour(context, from_id).await,
256 3601..=86399 => {
257 stock_str::msg_ephemeral_timer_hours(
258 context,
259 &format!("{}", (f64::from(duration) / 360.0).round() / 10.0),
260 from_id,
261 )
262 .await
263 }
264 86400 => stock_str::msg_ephemeral_timer_day(context, from_id).await,
265 86401..=604_799 => {
266 stock_str::msg_ephemeral_timer_days(
267 context,
268 &format!("{}", (f64::from(duration) / 8640.0).round() / 10.0),
269 from_id,
270 )
271 .await
272 }
273 604_800 => stock_str::msg_ephemeral_timer_week(context, from_id).await,
274 31_536_000..=31_708_800 => stock_str::msg_ephemeral_timer_year(context, from_id).await,
275 _ => {
276 stock_str::msg_ephemeral_timer_weeks(
277 context,
278 &format!("{}", (f64::from(duration) / 60480.0).round() / 10.0),
279 from_id,
280 )
281 .await
282 }
283 },
284 }
285}
286
287impl MsgId {
288 pub(crate) async fn ephemeral_timer(self, context: &Context) -> Result<Timer> {
290 let res = match context
291 .sql
292 .query_get_value("SELECT ephemeral_timer FROM msgs WHERE id=?", (self,))
293 .await?
294 {
295 None | Some(0) => Timer::Disabled,
296 Some(duration) => Timer::Enabled { duration },
297 };
298 Ok(res)
299 }
300
301 pub(crate) async fn start_ephemeral_timer(self, context: &Context) -> Result<()> {
303 if let Timer::Enabled { duration } = self.ephemeral_timer(context).await? {
304 let ephemeral_timestamp = time().saturating_add(duration.into());
305
306 context
307 .sql
308 .execute(
309 "UPDATE msgs SET ephemeral_timestamp = ? \
310 WHERE (ephemeral_timestamp == 0 OR ephemeral_timestamp > ?) \
311 AND id = ?",
312 (ephemeral_timestamp, ephemeral_timestamp, self),
313 )
314 .await?;
315 context.scheduler.interrupt_ephemeral_task().await;
316 }
317 Ok(())
318 }
319}
320
321pub(crate) async fn start_ephemeral_timers_msgids(
322 context: &Context,
323 msg_ids: &[MsgId],
324) -> Result<()> {
325 let now = time();
326 let should_interrupt =
327 context
328 .sql
329 .transaction(move |transaction| {
330 let mut should_interrupt = false;
331 let mut stmt =
332 transaction.prepare(
333 "UPDATE msgs SET ephemeral_timestamp = ?1 + ephemeral_timer
334 WHERE (ephemeral_timestamp == 0 OR ephemeral_timestamp > ?1 + ephemeral_timer) AND ephemeral_timer > 0
335 AND id=?2")?;
336 for msg_id in msg_ids {
337 should_interrupt |= stmt.execute((now, msg_id))? > 0;
338 }
339 Ok(should_interrupt)
340 }).await?;
341 if should_interrupt {
342 context.scheduler.interrupt_ephemeral_task().await;
343 }
344 Ok(())
345}
346
347pub(crate) async fn start_chat_ephemeral_timers(context: &Context, chat_id: ChatId) -> Result<()> {
351 let now = time();
352 let should_interrupt = context
353 .sql
354 .execute(
355 "UPDATE msgs SET ephemeral_timestamp = ?1 + ephemeral_timer
356 WHERE chat_id = ?2
357 AND ephemeral_timer > 0
358 AND (ephemeral_timestamp == 0 OR ephemeral_timestamp > ?1 + ephemeral_timer)",
359 (now, chat_id),
360 )
361 .await?
362 > 0;
363 if should_interrupt {
364 context.scheduler.interrupt_ephemeral_task().await;
365 }
366 Ok(())
367}
368
369async fn select_expired_messages(
378 context: &Context,
379 now: i64,
380) -> Result<Vec<(MsgId, ChatId, Viewtype, u32)>> {
381 let mut rows = context
382 .sql
383 .query_map_vec(
384 r#"
385SELECT id, chat_id, type, location_id
386FROM msgs
387WHERE
388 ephemeral_timestamp != 0
389 AND ephemeral_timestamp <= ?
390 AND chat_id != ?
391"#,
392 (now, DC_CHAT_ID_TRASH),
393 |row| {
394 let id: MsgId = row.get("id")?;
395 let chat_id: ChatId = row.get("chat_id")?;
396 let viewtype: Viewtype = row
397 .get("type")
398 .context("Using default viewtype for ephemeral handling.")
399 .log_err(context)
400 .unwrap_or_default();
401 let location_id: u32 = row.get("location_id")?;
402 Ok((id, chat_id, viewtype, location_id))
403 },
404 )
405 .await?;
406
407 if let Some(delete_device_after) = context.get_config_delete_device_after().await? {
408 let self_chat_id = ChatIdBlocked::lookup_by_contact(context, ContactId::SELF)
409 .await?
410 .map(|c| c.id)
411 .unwrap_or_default();
412 let device_chat_id = ChatIdBlocked::lookup_by_contact(context, ContactId::DEVICE)
413 .await?
414 .map(|c| c.id)
415 .unwrap_or_default();
416
417 let threshold_timestamp = now.saturating_sub(delete_device_after);
418
419 let rows_expired = context
420 .sql
421 .query_map_vec(
422 r#"
423SELECT id, chat_id, type, location_id
424FROM msgs
425WHERE
426 timestamp < ?1
427 AND timestamp_rcvd < ?1
428 AND chat_id > ?
429 AND chat_id != ?
430 AND chat_id != ?
431"#,
432 (
433 threshold_timestamp,
434 DC_CHAT_ID_LAST_SPECIAL,
435 self_chat_id,
436 device_chat_id,
437 ),
438 |row| {
439 let id: MsgId = row.get("id")?;
440 let chat_id: ChatId = row.get("chat_id")?;
441 let viewtype: Viewtype = row
442 .get("type")
443 .context("Using default viewtype for delete-old handling.")
444 .log_err(context)
445 .unwrap_or_default();
446 let location_id: u32 = row.get("location_id")?;
447 Ok((id, chat_id, viewtype, location_id))
448 },
449 )
450 .await?;
451
452 rows.extend(rows_expired);
453 }
454
455 Ok(rows)
456}
457
458pub(crate) async fn delete_expired_messages(context: &Context, now: i64) -> Result<()> {
464 let rows = select_expired_messages(context, now).await?;
465
466 if !rows.is_empty() {
467 info!(context, "Attempting to delete {} messages.", rows.len());
468
469 let (msgs_changed, webxdc_deleted) = context
470 .sql
471 .transaction(|transaction| {
472 let mut msgs_changed = Vec::with_capacity(rows.len());
473 let mut webxdc_deleted = Vec::new();
474 let mut del_msg_stmt = transaction.prepare(
477 "INSERT OR REPLACE INTO msgs (id, rfc724_mid, timestamp, chat_id)
478 SELECT ?1, rfc724_mid, timestamp, ? FROM msgs WHERE id=?1",
479 )?;
480 let mut del_location_stmt =
481 transaction.prepare("DELETE FROM locations WHERE independent=1 AND id=?")?;
482 for (msg_id, chat_id, viewtype, location_id) in rows {
483 del_msg_stmt.execute((msg_id, DC_CHAT_ID_TRASH))?;
484 if location_id > 0 {
485 del_location_stmt.execute((location_id,))?;
486 }
487
488 msgs_changed.push((chat_id, msg_id));
489 if viewtype == Viewtype::Webxdc {
490 webxdc_deleted.push(msg_id)
491 }
492 }
493 Ok((msgs_changed, webxdc_deleted))
494 })
495 .await?;
496
497 let mut modified_chat_ids = BTreeSet::new();
498
499 for (chat_id, msg_id) in msgs_changed {
500 context.emit_event(EventType::MsgDeleted { chat_id, msg_id });
501 modified_chat_ids.insert(chat_id);
502 }
503
504 for modified_chat_id in modified_chat_ids {
505 context.emit_msgs_changed_without_msg_id(modified_chat_id);
506 }
507
508 for msg_id in webxdc_deleted {
509 context.emit_event(EventType::WebxdcInstanceDeleted { msg_id });
510 }
511 }
512
513 Ok(())
514}
515
516async fn next_delete_device_after_timestamp(context: &Context) -> Result<Option<i64>> {
519 if let Some(delete_device_after) = context.get_config_delete_device_after().await? {
520 let self_chat_id = ChatIdBlocked::lookup_by_contact(context, ContactId::SELF)
521 .await?
522 .map(|c| c.id)
523 .unwrap_or_default();
524 let device_chat_id = ChatIdBlocked::lookup_by_contact(context, ContactId::DEVICE)
525 .await?
526 .map(|c| c.id)
527 .unwrap_or_default();
528
529 let oldest_message_timestamp: Option<i64> = context
530 .sql
531 .query_get_value(
532 r#"
533 SELECT min(max(timestamp, timestamp_rcvd))
534 FROM msgs
535 WHERE chat_id > ?
536 AND chat_id != ?
537 AND chat_id != ?
538 HAVING count(*) > 0
539 "#,
540 (DC_CHAT_ID_TRASH, self_chat_id, device_chat_id),
541 )
542 .await?;
543
544 Ok(oldest_message_timestamp.map(|x| x.saturating_add(delete_device_after)))
545 } else {
546 Ok(None)
547 }
548}
549
550async fn next_expiration_timestamp(context: &Context) -> Option<i64> {
555 let ephemeral_timestamp: Option<i64> = match context
556 .sql
557 .query_get_value(
558 r#"
559 SELECT min(ephemeral_timestamp)
560 FROM msgs
561 WHERE ephemeral_timestamp != 0
562 AND chat_id != ?
563 HAVING count(*) > 0
564 "#,
565 (DC_CHAT_ID_TRASH,), )
567 .await
568 {
569 Err(err) => {
570 warn!(context, "Can't calculate next ephemeral timeout: {}", err);
571 None
572 }
573 Ok(ephemeral_timestamp) => ephemeral_timestamp,
574 };
575
576 let delete_device_after_timestamp: Option<i64> =
577 match next_delete_device_after_timestamp(context).await {
578 Err(err) => {
579 warn!(
580 context,
581 "Can't calculate timestamp of the next message expiration: {}", err
582 );
583 None
584 }
585 Ok(timestamp) => timestamp,
586 };
587
588 ephemeral_timestamp
589 .into_iter()
590 .chain(delete_device_after_timestamp)
591 .min()
592}
593
594pub(crate) async fn ephemeral_loop(context: &Context, interrupt_receiver: Receiver<()>) {
595 loop {
596 let ephemeral_timestamp = next_expiration_timestamp(context).await;
597
598 let now = SystemTime::now();
599 let until = if let Some(ephemeral_timestamp) = ephemeral_timestamp {
600 UNIX_EPOCH
601 + Duration::from_secs(ephemeral_timestamp.try_into().unwrap_or(u64::MAX))
602 + Duration::from_secs(1)
603 } else {
604 now + Duration::from_secs(86400) };
607
608 if let Ok(duration) = until.duration_since(now) {
609 info!(
610 context,
611 "Ephemeral loop waiting for deletion in {} or interrupt",
612 duration_to_str(duration)
613 );
614 match timeout(duration, interrupt_receiver.recv()).await {
615 Ok(Ok(())) => {
616 continue;
618 }
619 Ok(Err(err)) => {
620 warn!(
621 context,
622 "Interrupt channel closed, ephemeral loop exits now: {err:#}."
623 );
624 return;
625 }
626 Err(_err) => {
627 }
629 }
630 }
631
632 stats::maybe_update_message_stats(context)
634 .await
635 .log_err(context)
636 .ok();
637
638 delete_expired_messages(context, time())
639 .await
640 .log_err(context)
641 .ok();
642
643 location::delete_expired(context, time())
644 .await
645 .log_err(context)
646 .ok();
647 }
648}
649
650pub(crate) async fn delete_expired_imap_messages(context: &Context) -> Result<()> {
652 let now = time();
653
654 let (threshold_timestamp, threshold_timestamp_extended) =
655 match context.get_config_delete_server_after().await? {
656 None => (0, 0),
657 Some(delete_server_after) => (
658 match delete_server_after {
659 0 => i64::MAX,
661 _ => now - delete_server_after,
662 },
663 now - max(delete_server_after, MIN_DELETE_SERVER_AFTER),
664 ),
665 };
666 let target = context.get_delete_msgs_target().await?;
667
668 context
669 .sql
670 .execute(
671 "UPDATE imap
672 SET target=?
673 WHERE rfc724_mid IN (
674 SELECT rfc724_mid FROM msgs
675 WHERE ((download_state = 0 AND timestamp < ?) OR
676 (download_state != 0 AND timestamp < ?) OR
677 (ephemeral_timestamp != 0 AND ephemeral_timestamp <= ?))
678 )",
679 (
680 &target,
681 threshold_timestamp,
682 threshold_timestamp_extended,
683 now,
684 ),
685 )
686 .await?;
687
688 Ok(())
689}
690
691pub(crate) async fn start_ephemeral_timers(context: &Context) -> Result<()> {
701 context
702 .sql
703 .execute(
704 "UPDATE msgs \
705 SET ephemeral_timestamp = ? + ephemeral_timer \
706 WHERE ephemeral_timer > 0 \
707 AND ephemeral_timestamp = 0 \
708 AND state NOT IN (?, ?, ?)",
709 (
710 time(),
711 MessageState::InFresh,
712 MessageState::InNoticed,
713 MessageState::OutDraft,
714 ),
715 )
716 .await?;
717
718 Ok(())
719}
720
721#[cfg(test)]
722mod ephemeral_tests;