1use crate::chat::ChatIdBlocked;
6use crate::chat::{Chat, ChatId, send_msg};
7use crate::config::Config;
8use crate::constants::{Blocked, Chattype};
9use crate::contact::ContactId;
10use crate::context::{Context, WeakContext};
11use crate::events::EventType;
12use crate::headerdef::HeaderDef;
13use crate::log::warn;
14use crate::message::{Message, MsgId, Viewtype, markseen_msgs};
15use crate::mimeparser::{MimeMessage, SystemMessage};
16use crate::net::dns::lookup_host_with_cache;
17use crate::param::Param;
18use crate::stock_str;
19use crate::tools::{normalize_text, time};
20use anyhow::{Context as _, Result, ensure};
21use deltachat_derive::{FromSql, ToSql};
22use num_traits::FromPrimitive;
23use serde::Serialize;
24use std::str::FromStr;
25use std::time::Duration;
26use tokio::task;
27use tokio::time::sleep;
28
29const RINGING_SECONDS: i64 = 120;
39
40const CALL_ACCEPTED_TIMESTAMP: Param = Param::Arg;
43const CALL_ENDED_TIMESTAMP: Param = Param::Arg4;
44
45const STUN_PORT: u16 = 3478;
46
47const CALL_CANCELED_TIMESTAMP: Param = Param::Arg2;
55
56#[derive(Debug, Default)]
58pub struct CallInfo {
59 pub place_call_info: String,
61
62 pub accept_call_info: String,
64
65 pub msg: Message,
68}
69
70impl CallInfo {
71 pub fn is_incoming(&self) -> bool {
73 self.msg.from_id != ContactId::SELF
74 }
75
76 pub fn is_stale(&self) -> bool {
78 (self.is_incoming() || self.msg.timestamp_sent != 0) && self.remaining_ring_seconds() <= 0
79 }
80
81 fn remaining_ring_seconds(&self) -> i64 {
82 #[expect(clippy::arithmetic_side_effects)]
83 let remaining_seconds = self.msg.timestamp_sent + RINGING_SECONDS - time();
84 remaining_seconds.clamp(0, RINGING_SECONDS)
85 }
86
87 async fn update_text(&self, context: &Context, text: &str) -> Result<()> {
88 context
89 .sql
90 .execute(
91 "UPDATE msgs SET txt=?, txt_normalized=? WHERE id=?",
92 (text, normalize_text(text), self.msg.id),
93 )
94 .await?;
95 Ok(())
96 }
97
98 async fn update_text_duration(&self, context: &Context) -> Result<()> {
99 let minutes = self.duration_seconds() / 60;
100 let duration = match minutes {
101 0 => "<1 minute".to_string(),
102 1 => "1 minute".to_string(),
103 n => format!("{n} minutes"),
104 };
105
106 if self.is_incoming() {
107 let incoming_call_str = stock_str::incoming_call(context, self.has_video_initially());
108 self.update_text(context, &format!("{incoming_call_str}\n{duration}"))
109 .await?;
110 } else {
111 let outgoing_call_str = stock_str::outgoing_call(context, self.has_video_initially());
112 self.update_text(context, &format!("{outgoing_call_str}\n{duration}"))
113 .await?;
114 }
115 Ok(())
116 }
117
118 async fn mark_as_accepted(&mut self, context: &Context) -> Result<()> {
121 self.msg.param.set_i64(CALL_ACCEPTED_TIMESTAMP, time());
122 self.msg.update_param(context).await?;
123 Ok(())
124 }
125
126 pub fn is_accepted(&self) -> bool {
128 self.msg.param.exists(CALL_ACCEPTED_TIMESTAMP)
129 }
130
131 pub fn has_video_initially(&self) -> bool {
133 self.msg
134 .param
135 .get_bool(Param::WebrtcHasVideoInitially)
136 .unwrap_or(false)
137 }
138
139 pub fn is_canceled(&self) -> bool {
147 self.msg.param.exists(CALL_CANCELED_TIMESTAMP)
148 }
149
150 async fn mark_as_ended(&mut self, context: &Context) -> Result<()> {
151 self.msg.param.set_i64(CALL_ENDED_TIMESTAMP, time());
152 self.msg.update_param(context).await?;
153 Ok(())
154 }
155
156 async fn mark_as_canceled(&mut self, context: &Context) -> Result<()> {
164 let now = time();
165 self.msg.param.set_i64(CALL_ENDED_TIMESTAMP, now);
166 self.msg.param.set_i64(CALL_CANCELED_TIMESTAMP, now);
167 self.msg.update_param(context).await?;
168 Ok(())
169 }
170
171 pub fn is_ended(&self) -> bool {
173 self.msg.param.exists(CALL_ENDED_TIMESTAMP)
174 }
175
176 #[expect(clippy::arithmetic_side_effects)]
178 pub fn duration_seconds(&self) -> i64 {
179 if let (Some(start), Some(end)) = (
180 self.msg.param.get_i64(CALL_ACCEPTED_TIMESTAMP),
181 self.msg.param.get_i64(CALL_ENDED_TIMESTAMP),
182 ) {
183 let seconds = end - start;
184 if seconds <= 0 {
185 return 1;
186 }
187 return seconds;
188 }
189 0
190 }
191}
192
193impl Context {
194 pub async fn place_outgoing_call(
196 &self,
197 chat_id: ChatId,
198 place_call_info: String,
199 has_video_initially: bool,
200 ) -> Result<MsgId> {
201 let chat = Chat::load_from_db(self, chat_id).await?;
202 ensure!(
203 chat.typ == Chattype::Single,
204 "Can only place calls in 1:1 chats"
205 );
206 ensure!(!chat.is_self_talk(), "Cannot call self");
207
208 let outgoing_call_str = stock_str::outgoing_call(self, has_video_initially);
209 let mut call = Message {
210 viewtype: Viewtype::Call,
211 text: outgoing_call_str,
212 ..Default::default()
213 };
214 call.param.set(Param::WebrtcRoom, &place_call_info);
215 call.param
216 .set_int(Param::WebrtcHasVideoInitially, has_video_initially.into());
217 call.id = send_msg(self, chat_id, &mut call).await?;
218
219 let wait = RINGING_SECONDS;
220 let context = self.get_weak_context();
221 task::spawn(Context::emit_end_call_if_unaccepted(
222 context,
223 wait.try_into()?,
224 call.id,
225 ));
226
227 Ok(call.id)
228 }
229
230 pub async fn accept_incoming_call(
232 &self,
233 call_id: MsgId,
234 accept_call_info: String,
235 ) -> Result<()> {
236 let mut call: CallInfo = self.load_call_by_id(call_id).await?.with_context(|| {
237 format!("accept_incoming_call is called with {call_id} which does not refer to a call")
238 })?;
239 ensure!(call.is_incoming());
240 if call.is_accepted() || call.is_ended() {
241 info!(self, "Call already accepted/ended");
242 return Ok(());
243 }
244
245 call.mark_as_accepted(self).await?;
246 let chat = Chat::load_from_db(self, call.msg.chat_id).await?;
247 if chat.is_contact_request() {
248 chat.id.accept(self).await?;
249 }
250 markseen_msgs(self, vec![call_id]).await?;
251
252 let mut msg = Message {
254 viewtype: Viewtype::Text,
255 text: "[Call accepted]".into(),
256 ..Default::default()
257 };
258 msg.param.set_cmd(SystemMessage::CallAccepted);
259 msg.hidden = true;
260 msg.param
261 .set(Param::WebrtcAccepted, accept_call_info.to_string());
262 msg.set_quote(self, Some(&call.msg)).await?;
263 msg.id = send_msg(self, call.msg.chat_id, &mut msg).await?;
264 self.emit_event(EventType::IncomingCallAccepted {
265 msg_id: call.msg.id,
266 chat_id: call.msg.chat_id,
267 from_this_device: true,
268 });
269 self.emit_msgs_changed(call.msg.chat_id, call_id);
270 Ok(())
271 }
272
273 pub async fn end_call(&self, call_id: MsgId) -> Result<()> {
275 let mut call: CallInfo = self.load_call_by_id(call_id).await?.with_context(|| {
276 format!("end_call is called with {call_id} which does not refer to a call")
277 })?;
278 if call.is_ended() {
279 info!(self, "Call already ended");
280 return Ok(());
281 }
282
283 if !call.is_accepted() {
284 if call.is_incoming() {
285 call.mark_as_ended(self).await?;
286 markseen_msgs(self, vec![call_id]).await?;
287 let declined_call_str = stock_str::declined_call(self);
288 call.update_text(self, &declined_call_str).await?;
289 } else {
290 call.mark_as_canceled(self).await?;
291 let canceled_call_str = stock_str::canceled_call(self);
292 call.update_text(self, &canceled_call_str).await?;
293 }
294 } else {
295 call.mark_as_ended(self).await?;
296 call.update_text_duration(self).await?;
297 }
298
299 let mut msg = Message {
300 viewtype: Viewtype::Text,
301 text: "[Call ended]".into(),
302 ..Default::default()
303 };
304 msg.param.set_cmd(SystemMessage::CallEnded);
305 msg.hidden = true;
306 msg.set_quote(self, Some(&call.msg)).await?;
307 msg.id = send_msg(self, call.msg.chat_id, &mut msg).await?;
308
309 self.emit_event(EventType::CallEnded {
310 msg_id: call.msg.id,
311 chat_id: call.msg.chat_id,
312 });
313 self.emit_msgs_changed(call.msg.chat_id, call_id);
314 Ok(())
315 }
316
317 async fn emit_end_call_if_unaccepted(
318 context: WeakContext,
319 wait: u64,
320 call_id: MsgId,
321 ) -> Result<()> {
322 sleep(Duration::from_secs(wait)).await;
323 let context = context.upgrade()?;
324 let Some(mut call) = context.load_call_by_id(call_id).await? else {
325 warn!(
326 context,
327 "emit_end_call_if_unaccepted is called with {call_id} which does not refer to a call."
328 );
329 return Ok(());
330 };
331 if !call.is_accepted() && !call.is_ended() {
332 if call.is_incoming() {
333 call.mark_as_canceled(&context).await?;
334 let missed_call_str = stock_str::missed_call(&context);
335 call.update_text(&context, &missed_call_str).await?;
336 } else {
337 call.mark_as_ended(&context).await?;
338 let canceled_call_str = stock_str::canceled_call(&context);
339 call.update_text(&context, &canceled_call_str).await?;
340 }
341 context.emit_msgs_changed(call.msg.chat_id, call_id);
342 context.emit_event(EventType::CallEnded {
343 msg_id: call.msg.id,
344 chat_id: call.msg.chat_id,
345 });
346 }
347 Ok(())
348 }
349
350 pub(crate) async fn handle_call_msg(
351 &self,
352 call_id: MsgId,
353 mime_message: &MimeMessage,
354 from_id: ContactId,
355 ) -> Result<()> {
356 if mime_message.is_call() {
357 let Some(call) = self.load_call_by_id(call_id).await? else {
358 warn!(self, "{call_id} does not refer to a call message");
359 return Ok(());
360 };
361
362 if call.is_incoming() {
363 if call.is_stale() {
364 let missed_call_str = stock_str::missed_call(self);
365 call.update_text(self, &missed_call_str).await?;
366 self.emit_incoming_msg(call.msg.chat_id, call_id); } else {
368 let incoming_call_str =
369 stock_str::incoming_call(self, call.has_video_initially());
370 call.update_text(self, &incoming_call_str).await?;
371 self.emit_msgs_changed(call.msg.chat_id, call_id); let can_call_me = match who_can_call_me(self).await? {
373 WhoCanCallMe::Contacts => ChatIdBlocked::lookup_by_contact(self, from_id)
374 .await?
375 .is_some_and(|chat_id_blocked| {
376 match chat_id_blocked.blocked {
377 Blocked::Not => true,
378 Blocked::Yes | Blocked::Request => {
379 false
385 }
386 }
387 }),
388 WhoCanCallMe::Everybody => ChatIdBlocked::lookup_by_contact(self, from_id)
389 .await?
390 .is_none_or(|chat_id_blocked| chat_id_blocked.blocked != Blocked::Yes),
391 WhoCanCallMe::Nobody => false,
392 };
393 if can_call_me {
394 self.emit_event(EventType::IncomingCall {
395 msg_id: call.msg.id,
396 chat_id: call.msg.chat_id,
397 place_call_info: call.place_call_info.to_string(),
398 has_video: call.has_video_initially(),
399 });
400 }
401 let wait = call.remaining_ring_seconds();
402 let context = self.get_weak_context();
403 task::spawn(Context::emit_end_call_if_unaccepted(
404 context,
405 wait.try_into()?,
406 call.msg.id,
407 ));
408 }
409 } else {
410 let outgoing_call_str = stock_str::outgoing_call(self, call.has_video_initially());
411 call.update_text(self, &outgoing_call_str).await?;
412 self.emit_msgs_changed(call.msg.chat_id, call_id);
413 }
414 } else {
415 match mime_message.is_system_message {
416 SystemMessage::CallAccepted => {
417 let Some(mut call) = self.load_call_by_id(call_id).await? else {
418 warn!(self, "{call_id} does not refer to a call message");
419 return Ok(());
420 };
421
422 if call.is_ended() || call.is_accepted() {
423 info!(self, "CallAccepted received for accepted/ended call");
424 return Ok(());
425 }
426
427 call.mark_as_accepted(self).await?;
428 self.emit_msgs_changed(call.msg.chat_id, call_id);
429 if call.is_incoming() {
430 self.emit_event(EventType::IncomingCallAccepted {
431 msg_id: call.msg.id,
432 chat_id: call.msg.chat_id,
433 from_this_device: false,
434 });
435 } else {
436 let accept_call_info = mime_message
437 .get_header(HeaderDef::ChatWebrtcAccepted)
438 .unwrap_or_default();
439 self.emit_event(EventType::OutgoingCallAccepted {
440 msg_id: call.msg.id,
441 chat_id: call.msg.chat_id,
442 accept_call_info: accept_call_info.to_string(),
443 });
444 }
445 }
446 SystemMessage::CallEnded => {
447 let Some(mut call) = self.load_call_by_id(call_id).await? else {
448 warn!(self, "{call_id} does not refer to a call message");
449 return Ok(());
450 };
451
452 if call.is_ended() {
453 info!(self, "CallEnded received for ended call");
455 return Ok(());
456 }
457
458 if !call.is_accepted() {
459 if call.is_incoming() {
460 if from_id == ContactId::SELF {
461 call.mark_as_ended(self).await?;
462 let declined_call_str = stock_str::declined_call(self);
463 call.update_text(self, &declined_call_str).await?;
464 } else {
465 call.mark_as_canceled(self).await?;
466 let missed_call_str = stock_str::missed_call(self);
467 call.update_text(self, &missed_call_str).await?;
468 }
469 } else {
470 if from_id == ContactId::SELF {
472 call.mark_as_canceled(self).await?;
473 let canceled_call_str = stock_str::canceled_call(self);
474 call.update_text(self, &canceled_call_str).await?;
475 } else {
476 call.mark_as_ended(self).await?;
477 let declined_call_str = stock_str::declined_call(self);
478 call.update_text(self, &declined_call_str).await?;
479 }
480 }
481 } else {
482 call.mark_as_ended(self).await?;
483 call.update_text_duration(self).await?;
484 }
485
486 self.emit_msgs_changed(call.msg.chat_id, call_id);
487 self.emit_event(EventType::CallEnded {
488 msg_id: call.msg.id,
489 chat_id: call.msg.chat_id,
490 });
491 }
492 _ => {}
493 }
494 }
495 Ok(())
496 }
497
498 pub async fn load_call_by_id(&self, call_id: MsgId) -> Result<Option<CallInfo>> {
503 let call = Message::load_from_db(self, call_id).await?;
504 Ok(self.load_call_by_message(call))
505 }
506
507 fn load_call_by_message(&self, call: Message) -> Option<CallInfo> {
511 if call.viewtype != Viewtype::Call {
512 return None;
516 }
517
518 Some(CallInfo {
519 place_call_info: call
520 .param
521 .get(Param::WebrtcRoom)
522 .unwrap_or_default()
523 .to_string(),
524 accept_call_info: call
525 .param
526 .get(Param::WebrtcAccepted)
527 .unwrap_or_default()
528 .to_string(),
529 msg: call,
530 })
531 }
532}
533
534#[derive(Debug, PartialEq, Eq)]
536pub enum CallState {
537 Alerting,
543
544 Active,
546
547 Completed {
550 duration: i64,
552 },
553
554 Missed,
557
558 Declined,
562
563 Canceled,
570}
571
572pub async fn call_state(context: &Context, msg_id: MsgId) -> Result<CallState> {
576 let call = context
577 .load_call_by_id(msg_id)
578 .await?
579 .with_context(|| format!("{msg_id} is not a call message"))?;
580 let state = if call.is_incoming() {
581 if call.is_accepted() {
582 if call.is_ended() {
583 CallState::Completed {
584 duration: call.duration_seconds(),
585 }
586 } else {
587 CallState::Active
588 }
589 } else if call.is_canceled() {
590 CallState::Missed
593 } else if call.is_ended() {
594 CallState::Declined
595 } else if call.is_stale() {
596 CallState::Missed
597 } else {
598 CallState::Alerting
599 }
600 } else if call.is_accepted() {
601 if call.is_ended() {
602 CallState::Completed {
603 duration: call.duration_seconds(),
604 }
605 } else {
606 CallState::Active
607 }
608 } else if call.is_canceled() {
609 CallState::Canceled
610 } else if call.is_ended() || call.is_stale() {
611 CallState::Declined
612 } else {
613 CallState::Alerting
614 };
615 Ok(state)
616}
617
618#[derive(Serialize, Debug, Clone, PartialEq)]
620struct IceServer {
621 pub urls: Vec<String>,
623
624 pub username: Option<String>,
626
627 pub credential: Option<String>,
629}
630
631pub(crate) async fn create_ice_servers_from_metadata(
641 metadata: &str,
642) -> Result<(i64, Vec<UnresolvedIceServer>)> {
643 let (hostname, rest) = metadata.split_once(':').context("Missing hostname")?;
644 let (port, rest) = rest.split_once(':').context("Missing port")?;
645 let port = u16::from_str(port).context("Failed to parse the port")?;
646 let (ts, password) = rest.split_once(':').context("Missing timestamp")?;
647 let expiration_timestamp = i64::from_str(ts).context("Failed to parse the timestamp")?;
648 let ice_servers = vec![UnresolvedIceServer::Turn {
649 hostname: hostname.to_string(),
650 port,
651 username: ts.to_string(),
652 credential: password.to_string(),
653 }];
654 Ok((expiration_timestamp, ice_servers))
655}
656
657#[derive(Debug, Clone)]
659pub(crate) enum UnresolvedIceServer {
660 Stun { hostname: String, port: u16 },
662
663 Turn {
665 hostname: String,
666 port: u16,
667 username: String,
668 credential: String,
669 },
670}
671
672pub(crate) async fn resolve_ice_servers(
677 context: &Context,
678 unresolved_ice_servers: Vec<UnresolvedIceServer>,
679) -> Result<String> {
680 let mut result: Vec<IceServer> = Vec::new();
681
682 let load_cache = false;
684
685 for unresolved_ice_server in unresolved_ice_servers {
686 match unresolved_ice_server {
687 UnresolvedIceServer::Stun { hostname, port } => {
688 match lookup_host_with_cache(context, &hostname, port, "", load_cache).await {
689 Ok(addrs) => {
690 let urls: Vec<String> = addrs
691 .into_iter()
692 .map(|addr| format!("stun:{addr}"))
693 .collect();
694 let stun_server = IceServer {
695 urls,
696 username: None,
697 credential: None,
698 };
699 result.push(stun_server);
700 }
701 Err(err) => {
702 warn!(
703 context,
704 "Failed to resolve STUN {hostname}:{port}: {err:#}."
705 );
706 }
707 }
708 }
709 UnresolvedIceServer::Turn {
710 hostname,
711 port,
712 username,
713 credential,
714 } => match lookup_host_with_cache(context, &hostname, port, "", load_cache).await {
715 Ok(addrs) => {
716 let urls: Vec<String> = addrs
717 .into_iter()
718 .map(|addr| format!("turn:{addr}"))
719 .collect();
720 let turn_server = IceServer {
721 urls,
722 username: Some(username),
723 credential: Some(credential),
724 };
725 result.push(turn_server);
726 }
727 Err(err) => {
728 warn!(
729 context,
730 "Failed to resolve TURN {hostname}:{port}: {err:#}."
731 );
732 }
733 },
734 }
735 }
736 let json = serde_json::to_string(&result)?;
737 Ok(json)
738}
739
740pub(crate) fn create_fallback_ice_servers() -> Vec<UnresolvedIceServer> {
742 vec![
750 UnresolvedIceServer::Stun {
751 hostname: "nine.testrun.org".to_string(),
752 port: STUN_PORT,
753 },
754 UnresolvedIceServer::Turn {
755 hostname: "turn.delta.chat".to_string(),
756 port: STUN_PORT,
757 username: "public".to_string(),
758 credential: "o4tR7yG4rG2slhXqRUf9zgmHz".to_string(),
759 },
760 ]
761}
762
763pub async fn ice_servers(context: &Context) -> Result<String> {
773 if let Some(ref metadata) = *context.metadata.read().await {
774 let ice_servers = resolve_ice_servers(context, metadata.ice_servers.clone()).await?;
775 Ok(ice_servers)
776 } else {
777 Ok("[]".to_string())
778 }
779}
780
781#[derive(
783 Debug, Default, Display, Clone, Copy, PartialEq, Eq, FromPrimitive, ToPrimitive, FromSql, ToSql,
784)]
785#[repr(u8)]
786pub enum WhoCanCallMe {
787 Everybody = 0,
791
792 #[default]
794 Contacts = 1,
795
796 Nobody = 2,
798}
799
800async fn who_can_call_me(context: &Context) -> Result<WhoCanCallMe> {
802 let who_can_call_me =
803 WhoCanCallMe::from_i32(context.get_config_int(Config::WhoCanCallMe).await?)
804 .unwrap_or_default();
805 Ok(who_can_call_me)
806}
807
808#[cfg(test)]
809mod calls_tests;