1use crate::chat::ChatIdBlocked;
6use crate::chat::{Chat, ChatId, send_msg};
7use crate::config::Config;
8use crate::constants::{Blocked, Chattype};
9use crate::contact::ContactId;
10use crate::context::{Context, WeakContext};
11use crate::events::EventType;
12use crate::headerdef::HeaderDef;
13use crate::log::warn;
14use crate::message::{Message, MsgId, Viewtype};
15use crate::mimeparser::{MimeMessage, SystemMessage};
16use crate::net::dns::lookup_host_with_cache;
17use crate::param::Param;
18use crate::stock_str;
19use crate::tools::{normalize_text, time};
20use anyhow::{Context as _, Result, ensure};
21use deltachat_derive::{FromSql, ToSql};
22use num_traits::FromPrimitive;
23use serde::Serialize;
24use std::str::FromStr;
25use std::time::Duration;
26use tokio::task;
27use tokio::time::sleep;
28
29const RINGING_SECONDS: i64 = 120;
39
40const CALL_ACCEPTED_TIMESTAMP: Param = Param::Arg;
43const CALL_ENDED_TIMESTAMP: Param = Param::Arg4;
44
45const STUN_PORT: u16 = 3478;
46
47const CALL_CANCELED_TIMESTAMP: Param = Param::Arg2;
55
56#[derive(Debug, Default)]
58pub struct CallInfo {
59 pub place_call_info: String,
61
62 pub accept_call_info: String,
64
65 pub msg: Message,
68}
69
70impl CallInfo {
71 pub fn is_incoming(&self) -> bool {
73 self.msg.from_id != ContactId::SELF
74 }
75
76 pub fn is_stale(&self) -> bool {
78 (self.is_incoming() || self.msg.timestamp_sent != 0) && self.remaining_ring_seconds() <= 0
79 }
80
81 fn remaining_ring_seconds(&self) -> i64 {
82 let remaining_seconds = self.msg.timestamp_sent + RINGING_SECONDS - time();
83 remaining_seconds.clamp(0, RINGING_SECONDS)
84 }
85
86 async fn update_text(&self, context: &Context, text: &str) -> Result<()> {
87 context
88 .sql
89 .execute(
90 "UPDATE msgs SET txt=?, txt_normalized=? WHERE id=?",
91 (text, normalize_text(text), self.msg.id),
92 )
93 .await?;
94 Ok(())
95 }
96
97 async fn update_text_duration(&self, context: &Context) -> Result<()> {
98 let minutes = self.duration_seconds() / 60;
99 let duration = match minutes {
100 0 => "<1 minute".to_string(),
101 1 => "1 minute".to_string(),
102 n => format!("{n} minutes"),
103 };
104
105 if self.is_incoming() {
106 let incoming_call_str =
107 stock_str::incoming_call(context, self.has_video_initially()).await;
108 self.update_text(context, &format!("{incoming_call_str}\n{duration}"))
109 .await?;
110 } else {
111 let outgoing_call_str =
112 stock_str::outgoing_call(context, self.has_video_initially()).await;
113 self.update_text(context, &format!("{outgoing_call_str}\n{duration}"))
114 .await?;
115 }
116 Ok(())
117 }
118
119 async fn mark_as_accepted(&mut self, context: &Context) -> Result<()> {
122 self.msg.param.set_i64(CALL_ACCEPTED_TIMESTAMP, time());
123 self.msg.update_param(context).await?;
124 Ok(())
125 }
126
127 pub fn is_accepted(&self) -> bool {
129 self.msg.param.exists(CALL_ACCEPTED_TIMESTAMP)
130 }
131
132 pub fn has_video_initially(&self) -> bool {
134 self.msg
135 .param
136 .get_bool(Param::WebrtcHasVideoInitially)
137 .unwrap_or(false)
138 }
139
140 pub fn is_canceled(&self) -> bool {
148 self.msg.param.exists(CALL_CANCELED_TIMESTAMP)
149 }
150
151 async fn mark_as_ended(&mut self, context: &Context) -> Result<()> {
152 self.msg.param.set_i64(CALL_ENDED_TIMESTAMP, time());
153 self.msg.update_param(context).await?;
154 Ok(())
155 }
156
157 async fn mark_as_canceled(&mut self, context: &Context) -> Result<()> {
165 let now = time();
166 self.msg.param.set_i64(CALL_ENDED_TIMESTAMP, now);
167 self.msg.param.set_i64(CALL_CANCELED_TIMESTAMP, now);
168 self.msg.update_param(context).await?;
169 Ok(())
170 }
171
172 pub fn is_ended(&self) -> bool {
174 self.msg.param.exists(CALL_ENDED_TIMESTAMP)
175 }
176
177 pub fn duration_seconds(&self) -> i64 {
179 if let (Some(start), Some(end)) = (
180 self.msg.param.get_i64(CALL_ACCEPTED_TIMESTAMP),
181 self.msg.param.get_i64(CALL_ENDED_TIMESTAMP),
182 ) {
183 let seconds = end - start;
184 if seconds <= 0 {
185 return 1;
186 }
187 return seconds;
188 }
189 0
190 }
191}
192
193impl Context {
194 pub async fn place_outgoing_call(
196 &self,
197 chat_id: ChatId,
198 place_call_info: String,
199 has_video_initially: bool,
200 ) -> Result<MsgId> {
201 let chat = Chat::load_from_db(self, chat_id).await?;
202 ensure!(
203 chat.typ == Chattype::Single,
204 "Can only place calls in 1:1 chats"
205 );
206 ensure!(!chat.is_self_talk(), "Cannot call self");
207
208 let outgoing_call_str = stock_str::outgoing_call(self, has_video_initially).await;
209 let mut call = Message {
210 viewtype: Viewtype::Call,
211 text: outgoing_call_str,
212 ..Default::default()
213 };
214 call.param.set(Param::WebrtcRoom, &place_call_info);
215 call.param
216 .set_int(Param::WebrtcHasVideoInitially, has_video_initially.into());
217 call.id = send_msg(self, chat_id, &mut call).await?;
218
219 let wait = RINGING_SECONDS;
220 let context = self.get_weak_context();
221 task::spawn(Context::emit_end_call_if_unaccepted(
222 context,
223 wait.try_into()?,
224 call.id,
225 ));
226
227 Ok(call.id)
228 }
229
230 pub async fn accept_incoming_call(
232 &self,
233 call_id: MsgId,
234 accept_call_info: String,
235 ) -> Result<()> {
236 let mut call: CallInfo = self.load_call_by_id(call_id).await?.with_context(|| {
237 format!("accept_incoming_call is called with {call_id} which does not refer to a call")
238 })?;
239 ensure!(call.is_incoming());
240 if call.is_accepted() || call.is_ended() {
241 info!(self, "Call already accepted/ended");
242 return Ok(());
243 }
244
245 call.mark_as_accepted(self).await?;
246 let chat = Chat::load_from_db(self, call.msg.chat_id).await?;
247 if chat.is_contact_request() {
248 chat.id.accept(self).await?;
249 }
250
251 let mut msg = Message {
253 viewtype: Viewtype::Text,
254 text: "[Call accepted]".into(),
255 ..Default::default()
256 };
257 msg.param.set_cmd(SystemMessage::CallAccepted);
258 msg.hidden = true;
259 msg.param
260 .set(Param::WebrtcAccepted, accept_call_info.to_string());
261 msg.set_quote(self, Some(&call.msg)).await?;
262 msg.id = send_msg(self, call.msg.chat_id, &mut msg).await?;
263 self.emit_event(EventType::IncomingCallAccepted {
264 msg_id: call.msg.id,
265 chat_id: call.msg.chat_id,
266 });
267 self.emit_msgs_changed(call.msg.chat_id, call_id);
268 Ok(())
269 }
270
271 pub async fn end_call(&self, call_id: MsgId) -> Result<()> {
273 let mut call: CallInfo = self.load_call_by_id(call_id).await?.with_context(|| {
274 format!("end_call is called with {call_id} which does not refer to a call")
275 })?;
276 if call.is_ended() {
277 info!(self, "Call already ended");
278 return Ok(());
279 }
280
281 if !call.is_accepted() {
282 if call.is_incoming() {
283 call.mark_as_ended(self).await?;
284 let declined_call_str = stock_str::declined_call(self).await;
285 call.update_text(self, &declined_call_str).await?;
286 } else {
287 call.mark_as_canceled(self).await?;
288 let canceled_call_str = stock_str::canceled_call(self).await;
289 call.update_text(self, &canceled_call_str).await?;
290 }
291 } else {
292 call.mark_as_ended(self).await?;
293 call.update_text_duration(self).await?;
294 }
295
296 let mut msg = Message {
297 viewtype: Viewtype::Text,
298 text: "[Call ended]".into(),
299 ..Default::default()
300 };
301 msg.param.set_cmd(SystemMessage::CallEnded);
302 msg.hidden = true;
303 msg.set_quote(self, Some(&call.msg)).await?;
304 msg.id = send_msg(self, call.msg.chat_id, &mut msg).await?;
305
306 self.emit_event(EventType::CallEnded {
307 msg_id: call.msg.id,
308 chat_id: call.msg.chat_id,
309 });
310 self.emit_msgs_changed(call.msg.chat_id, call_id);
311 Ok(())
312 }
313
314 async fn emit_end_call_if_unaccepted(
315 context: WeakContext,
316 wait: u64,
317 call_id: MsgId,
318 ) -> Result<()> {
319 sleep(Duration::from_secs(wait)).await;
320 let context = context.upgrade()?;
321 let Some(mut call) = context.load_call_by_id(call_id).await? else {
322 warn!(
323 context,
324 "emit_end_call_if_unaccepted is called with {call_id} which does not refer to a call."
325 );
326 return Ok(());
327 };
328 if !call.is_accepted() && !call.is_ended() {
329 if call.is_incoming() {
330 call.mark_as_canceled(&context).await?;
331 let missed_call_str = stock_str::missed_call(&context).await;
332 call.update_text(&context, &missed_call_str).await?;
333 } else {
334 call.mark_as_ended(&context).await?;
335 let canceled_call_str = stock_str::canceled_call(&context).await;
336 call.update_text(&context, &canceled_call_str).await?;
337 }
338 context.emit_msgs_changed(call.msg.chat_id, call_id);
339 context.emit_event(EventType::CallEnded {
340 msg_id: call.msg.id,
341 chat_id: call.msg.chat_id,
342 });
343 }
344 Ok(())
345 }
346
347 pub(crate) async fn handle_call_msg(
348 &self,
349 call_id: MsgId,
350 mime_message: &MimeMessage,
351 from_id: ContactId,
352 ) -> Result<()> {
353 if mime_message.is_call() {
354 let Some(call) = self.load_call_by_id(call_id).await? else {
355 warn!(self, "{call_id} does not refer to a call message");
356 return Ok(());
357 };
358
359 if call.is_incoming() {
360 if call.is_stale() {
361 let missed_call_str = stock_str::missed_call(self).await;
362 call.update_text(self, &missed_call_str).await?;
363 self.emit_incoming_msg(call.msg.chat_id, call_id); } else {
365 let incoming_call_str =
366 stock_str::incoming_call(self, call.has_video_initially()).await;
367 call.update_text(self, &incoming_call_str).await?;
368 self.emit_msgs_changed(call.msg.chat_id, call_id); let can_call_me = match who_can_call_me(self).await? {
370 WhoCanCallMe::Contacts => ChatIdBlocked::lookup_by_contact(self, from_id)
371 .await?
372 .is_some_and(|chat_id_blocked| {
373 match chat_id_blocked.blocked {
374 Blocked::Not => true,
375 Blocked::Yes | Blocked::Request => {
376 false
382 }
383 }
384 }),
385 WhoCanCallMe::Everybody => ChatIdBlocked::lookup_by_contact(self, from_id)
386 .await?
387 .is_none_or(|chat_id_blocked| chat_id_blocked.blocked != Blocked::Yes),
388 WhoCanCallMe::Nobody => false,
389 };
390 if can_call_me {
391 self.emit_event(EventType::IncomingCall {
392 msg_id: call.msg.id,
393 chat_id: call.msg.chat_id,
394 place_call_info: call.place_call_info.to_string(),
395 has_video: call.has_video_initially(),
396 });
397 }
398 let wait = call.remaining_ring_seconds();
399 let context = self.get_weak_context();
400 task::spawn(Context::emit_end_call_if_unaccepted(
401 context,
402 wait.try_into()?,
403 call.msg.id,
404 ));
405 }
406 } else {
407 let outgoing_call_str =
408 stock_str::outgoing_call(self, call.has_video_initially()).await;
409 call.update_text(self, &outgoing_call_str).await?;
410 self.emit_msgs_changed(call.msg.chat_id, call_id);
411 }
412 } else {
413 match mime_message.is_system_message {
414 SystemMessage::CallAccepted => {
415 let Some(mut call) = self.load_call_by_id(call_id).await? else {
416 warn!(self, "{call_id} does not refer to a call message");
417 return Ok(());
418 };
419
420 if call.is_ended() || call.is_accepted() {
421 info!(self, "CallAccepted received for accepted/ended call");
422 return Ok(());
423 }
424
425 call.mark_as_accepted(self).await?;
426 self.emit_msgs_changed(call.msg.chat_id, call_id);
427 if call.is_incoming() {
428 self.emit_event(EventType::IncomingCallAccepted {
429 msg_id: call.msg.id,
430 chat_id: call.msg.chat_id,
431 });
432 } else {
433 let accept_call_info = mime_message
434 .get_header(HeaderDef::ChatWebrtcAccepted)
435 .unwrap_or_default();
436 self.emit_event(EventType::OutgoingCallAccepted {
437 msg_id: call.msg.id,
438 chat_id: call.msg.chat_id,
439 accept_call_info: accept_call_info.to_string(),
440 });
441 }
442 }
443 SystemMessage::CallEnded => {
444 let Some(mut call) = self.load_call_by_id(call_id).await? else {
445 warn!(self, "{call_id} does not refer to a call message");
446 return Ok(());
447 };
448
449 if call.is_ended() {
450 info!(self, "CallEnded received for ended call");
452 return Ok(());
453 }
454
455 if !call.is_accepted() {
456 if call.is_incoming() {
457 if from_id == ContactId::SELF {
458 call.mark_as_ended(self).await?;
459 let declined_call_str = stock_str::declined_call(self).await;
460 call.update_text(self, &declined_call_str).await?;
461 } else {
462 call.mark_as_canceled(self).await?;
463 let missed_call_str = stock_str::missed_call(self).await;
464 call.update_text(self, &missed_call_str).await?;
465 }
466 } else {
467 if from_id == ContactId::SELF {
469 call.mark_as_canceled(self).await?;
470 let canceled_call_str = stock_str::canceled_call(self).await;
471 call.update_text(self, &canceled_call_str).await?;
472 } else {
473 call.mark_as_ended(self).await?;
474 let declined_call_str = stock_str::declined_call(self).await;
475 call.update_text(self, &declined_call_str).await?;
476 }
477 }
478 } else {
479 call.mark_as_ended(self).await?;
480 call.update_text_duration(self).await?;
481 }
482
483 self.emit_msgs_changed(call.msg.chat_id, call_id);
484 self.emit_event(EventType::CallEnded {
485 msg_id: call.msg.id,
486 chat_id: call.msg.chat_id,
487 });
488 }
489 _ => {}
490 }
491 }
492 Ok(())
493 }
494
495 pub async fn load_call_by_id(&self, call_id: MsgId) -> Result<Option<CallInfo>> {
500 let call = Message::load_from_db(self, call_id).await?;
501 Ok(self.load_call_by_message(call))
502 }
503
504 fn load_call_by_message(&self, call: Message) -> Option<CallInfo> {
508 if call.viewtype != Viewtype::Call {
509 return None;
513 }
514
515 Some(CallInfo {
516 place_call_info: call
517 .param
518 .get(Param::WebrtcRoom)
519 .unwrap_or_default()
520 .to_string(),
521 accept_call_info: call
522 .param
523 .get(Param::WebrtcAccepted)
524 .unwrap_or_default()
525 .to_string(),
526 msg: call,
527 })
528 }
529}
530
/// State of a call as computed by `call_state()`.
#[derive(Debug, Eq, PartialEq)]
pub enum CallState {
    /// The call is neither accepted, ended, canceled nor stale yet.
    Alerting,

    /// The call was accepted and has not ended.
    Active,

    /// The call was accepted and has ended.
    Completed {
        /// Call duration in seconds.
        duration: i64,
    },

    /// Incoming call that was canceled before it was accepted
    /// or that became stale without being accepted or ended.
    Missed,

    /// Incoming call that ended without being accepted or canceled,
    /// or outgoing call that ended or became stale
    /// without being accepted or canceled.
    Declined,

    /// Outgoing call that was canceled before it was accepted.
    Canceled,
}
568
569pub async fn call_state(context: &Context, msg_id: MsgId) -> Result<CallState> {
573 let call = context
574 .load_call_by_id(msg_id)
575 .await?
576 .with_context(|| format!("{msg_id} is not a call message"))?;
577 let state = if call.is_incoming() {
578 if call.is_accepted() {
579 if call.is_ended() {
580 CallState::Completed {
581 duration: call.duration_seconds(),
582 }
583 } else {
584 CallState::Active
585 }
586 } else if call.is_canceled() {
587 CallState::Missed
590 } else if call.is_ended() {
591 CallState::Declined
592 } else if call.is_stale() {
593 CallState::Missed
594 } else {
595 CallState::Alerting
596 }
597 } else if call.is_accepted() {
598 if call.is_ended() {
599 CallState::Completed {
600 duration: call.duration_seconds(),
601 }
602 } else {
603 CallState::Active
604 }
605 } else if call.is_canceled() {
606 CallState::Canceled
607 } else if call.is_ended() || call.is_stale() {
608 CallState::Declined
609 } else {
610 CallState::Alerting
611 };
612 Ok(state)
613}
614
615#[derive(Serialize, Debug, Clone, PartialEq)]
617struct IceServer {
618 pub urls: Vec<String>,
620
621 pub username: Option<String>,
623
624 pub credential: Option<String>,
626}
627
628pub(crate) async fn create_ice_servers_from_metadata(
638 metadata: &str,
639) -> Result<(i64, Vec<UnresolvedIceServer>)> {
640 let (hostname, rest) = metadata.split_once(':').context("Missing hostname")?;
641 let (port, rest) = rest.split_once(':').context("Missing port")?;
642 let port = u16::from_str(port).context("Failed to parse the port")?;
643 let (ts, password) = rest.split_once(':').context("Missing timestamp")?;
644 let expiration_timestamp = i64::from_str(ts).context("Failed to parse the timestamp")?;
645 let ice_servers = vec![UnresolvedIceServer::Turn {
646 hostname: hostname.to_string(),
647 port,
648 username: ts.to_string(),
649 credential: password.to_string(),
650 }];
651 Ok((expiration_timestamp, ice_servers))
652}
653
/// ICE server whose hostname has not been resolved to addresses yet.
#[derive(Clone, Debug)]
pub(crate) enum UnresolvedIceServer {
    /// STUN server; needs no credentials.
    Stun {
        /// Hostname to resolve.
        hostname: String,
        /// Port of the server.
        port: u16,
    },

    /// TURN server with its credentials.
    Turn {
        /// Hostname to resolve.
        hostname: String,
        /// Port of the server.
        port: u16,
        /// Username for the server.
        username: String,
        /// Credential belonging to `username`.
        credential: String,
    },
}
668
669pub(crate) async fn resolve_ice_servers(
674 context: &Context,
675 unresolved_ice_servers: Vec<UnresolvedIceServer>,
676) -> Result<String> {
677 let mut result: Vec<IceServer> = Vec::new();
678
679 let load_cache = false;
681
682 for unresolved_ice_server in unresolved_ice_servers {
683 match unresolved_ice_server {
684 UnresolvedIceServer::Stun { hostname, port } => {
685 match lookup_host_with_cache(context, &hostname, port, "", load_cache).await {
686 Ok(addrs) => {
687 let urls: Vec<String> = addrs
688 .into_iter()
689 .map(|addr| format!("stun:{addr}"))
690 .collect();
691 let stun_server = IceServer {
692 urls,
693 username: None,
694 credential: None,
695 };
696 result.push(stun_server);
697 }
698 Err(err) => {
699 warn!(
700 context,
701 "Failed to resolve STUN {hostname}:{port}: {err:#}."
702 );
703 }
704 }
705 }
706 UnresolvedIceServer::Turn {
707 hostname,
708 port,
709 username,
710 credential,
711 } => match lookup_host_with_cache(context, &hostname, port, "", load_cache).await {
712 Ok(addrs) => {
713 let urls: Vec<String> = addrs
714 .into_iter()
715 .map(|addr| format!("turn:{addr}"))
716 .collect();
717 let turn_server = IceServer {
718 urls,
719 username: Some(username),
720 credential: Some(credential),
721 };
722 result.push(turn_server);
723 }
724 Err(err) => {
725 warn!(
726 context,
727 "Failed to resolve TURN {hostname}:{port}: {err:#}."
728 );
729 }
730 },
731 }
732 }
733 let json = serde_json::to_string(&result)?;
734 Ok(json)
735}
736
737pub(crate) fn create_fallback_ice_servers() -> Vec<UnresolvedIceServer> {
739 vec![
747 UnresolvedIceServer::Stun {
748 hostname: "nine.testrun.org".to_string(),
749 port: STUN_PORT,
750 },
751 UnresolvedIceServer::Turn {
752 hostname: "turn.delta.chat".to_string(),
753 port: STUN_PORT,
754 username: "public".to_string(),
755 credential: "o4tR7yG4rG2slhXqRUf9zgmHz".to_string(),
756 },
757 ]
758}
759
760pub async fn ice_servers(context: &Context) -> Result<String> {
770 if let Some(ref metadata) = *context.metadata.read().await {
771 let ice_servers = resolve_ice_servers(context, metadata.ice_servers.clone()).await?;
772 Ok(ice_servers)
773 } else {
774 Ok("[]".to_string())
775 }
776}
777
778#[derive(
780 Debug, Default, Display, Clone, Copy, PartialEq, Eq, FromPrimitive, ToPrimitive, FromSql, ToSql,
781)]
782#[repr(u8)]
783pub enum WhoCanCallMe {
784 Everybody = 0,
788
789 #[default]
791 Contacts = 1,
792
793 Nobody = 2,
795}
796
797async fn who_can_call_me(context: &Context) -> Result<WhoCanCallMe> {
799 let who_can_call_me =
800 WhoCanCallMe::from_i32(context.get_config_int(Config::WhoCanCallMe).await?)
801 .unwrap_or_default();
802 Ok(who_can_call_me)
803}
804
805#[cfg(test)]
806mod calls_tests;