1use crate::chat::ChatIdBlocked;
6use crate::chat::{Chat, ChatId, send_msg};
7use crate::config::Config;
8use crate::constants::{Blocked, Chattype};
9use crate::contact::ContactId;
10use crate::context::{Context, WeakContext};
11use crate::events::EventType;
12use crate::headerdef::HeaderDef;
13use crate::log::warn;
14use crate::message::{Message, MsgId, Viewtype};
15use crate::mimeparser::{MimeMessage, SystemMessage};
16use crate::net::dns::lookup_host_with_cache;
17use crate::param::Param;
18use crate::tools::{normalize_text, time};
19use anyhow::{Context as _, Result, ensure};
20use deltachat_derive::{FromSql, ToSql};
21use num_traits::FromPrimitive;
22use sdp::SessionDescription;
23use serde::Serialize;
24use std::io::Cursor;
25use std::str::FromStr;
26use std::time::Duration;
27use tokio::task;
28use tokio::time::sleep;
29
30const RINGING_SECONDS: i64 = 120;
40
41const CALL_ACCEPTED_TIMESTAMP: Param = Param::Arg;
44const CALL_ENDED_TIMESTAMP: Param = Param::Arg4;
45
46const STUN_PORT: u16 = 3478;
47
48const CALL_CANCELED_TIMESTAMP: Param = Param::Arg2;
56
57#[derive(Debug, Default)]
59pub struct CallInfo {
60 pub place_call_info: String,
62
63 pub accept_call_info: String,
65
66 pub msg: Message,
69}
70
71impl CallInfo {
72 pub fn is_incoming(&self) -> bool {
74 self.msg.from_id != ContactId::SELF
75 }
76
77 pub fn is_stale(&self) -> bool {
79 (self.is_incoming() || self.msg.timestamp_sent != 0) && self.remaining_ring_seconds() <= 0
80 }
81
82 fn remaining_ring_seconds(&self) -> i64 {
83 let remaining_seconds = self.msg.timestamp_sent + RINGING_SECONDS - time();
84 remaining_seconds.clamp(0, RINGING_SECONDS)
85 }
86
87 async fn update_text(&self, context: &Context, text: &str) -> Result<()> {
88 context
89 .sql
90 .execute(
91 "UPDATE msgs SET txt=?, txt_normalized=? WHERE id=?",
92 (text, normalize_text(text), self.msg.id),
93 )
94 .await?;
95 Ok(())
96 }
97
98 async fn update_text_duration(&self, context: &Context) -> Result<()> {
99 let minutes = self.duration_seconds() / 60;
100 let duration = match minutes {
101 0 => "<1 minute".to_string(),
102 1 => "1 minute".to_string(),
103 n => format!("{n} minutes"),
104 };
105
106 if self.is_incoming() {
107 self.update_text(context, &format!("Incoming call\n{duration}"))
108 .await?;
109 } else {
110 self.update_text(context, &format!("Outgoing call\n{duration}"))
111 .await?;
112 }
113 Ok(())
114 }
115
116 async fn mark_as_accepted(&mut self, context: &Context) -> Result<()> {
119 self.msg.param.set_i64(CALL_ACCEPTED_TIMESTAMP, time());
120 self.msg.update_param(context).await?;
121 Ok(())
122 }
123
124 pub fn is_accepted(&self) -> bool {
126 self.msg.param.exists(CALL_ACCEPTED_TIMESTAMP)
127 }
128
129 pub fn is_canceled(&self) -> bool {
137 self.msg.param.exists(CALL_CANCELED_TIMESTAMP)
138 }
139
140 async fn mark_as_ended(&mut self, context: &Context) -> Result<()> {
141 self.msg.param.set_i64(CALL_ENDED_TIMESTAMP, time());
142 self.msg.update_param(context).await?;
143 Ok(())
144 }
145
146 async fn mark_as_canceled(&mut self, context: &Context) -> Result<()> {
154 let now = time();
155 self.msg.param.set_i64(CALL_ENDED_TIMESTAMP, now);
156 self.msg.param.set_i64(CALL_CANCELED_TIMESTAMP, now);
157 self.msg.update_param(context).await?;
158 Ok(())
159 }
160
161 pub fn is_ended(&self) -> bool {
163 self.msg.param.exists(CALL_ENDED_TIMESTAMP)
164 }
165
166 pub fn duration_seconds(&self) -> i64 {
168 if let (Some(start), Some(end)) = (
169 self.msg.param.get_i64(CALL_ACCEPTED_TIMESTAMP),
170 self.msg.param.get_i64(CALL_ENDED_TIMESTAMP),
171 ) {
172 let seconds = end - start;
173 if seconds <= 0 {
174 return 1;
175 }
176 return seconds;
177 }
178 0
179 }
180}
181
182impl Context {
183 pub async fn place_outgoing_call(
185 &self,
186 chat_id: ChatId,
187 place_call_info: String,
188 ) -> Result<MsgId> {
189 let chat = Chat::load_from_db(self, chat_id).await?;
190 ensure!(
191 chat.typ == Chattype::Single,
192 "Can only place calls in 1:1 chats"
193 );
194 ensure!(!chat.is_self_talk(), "Cannot call self");
195
196 let mut call = Message {
197 viewtype: Viewtype::Call,
198 text: "Outgoing call".into(),
199 ..Default::default()
200 };
201 call.param.set(Param::WebrtcRoom, &place_call_info);
202 call.id = send_msg(self, chat_id, &mut call).await?;
203
204 let wait = RINGING_SECONDS;
205 let context = self.get_weak_context();
206 task::spawn(Context::emit_end_call_if_unaccepted(
207 context,
208 wait.try_into()?,
209 call.id,
210 ));
211
212 Ok(call.id)
213 }
214
215 pub async fn accept_incoming_call(
217 &self,
218 call_id: MsgId,
219 accept_call_info: String,
220 ) -> Result<()> {
221 let mut call: CallInfo = self.load_call_by_id(call_id).await?.with_context(|| {
222 format!("accept_incoming_call is called with {call_id} which does not refer to a call")
223 })?;
224 ensure!(call.is_incoming());
225 if call.is_accepted() || call.is_ended() {
226 info!(self, "Call already accepted/ended");
227 return Ok(());
228 }
229
230 call.mark_as_accepted(self).await?;
231 let chat = Chat::load_from_db(self, call.msg.chat_id).await?;
232 if chat.is_contact_request() {
233 chat.id.accept(self).await?;
234 }
235
236 let mut msg = Message {
238 viewtype: Viewtype::Text,
239 text: "[Call accepted]".into(),
240 ..Default::default()
241 };
242 msg.param.set_cmd(SystemMessage::CallAccepted);
243 msg.hidden = true;
244 msg.param
245 .set(Param::WebrtcAccepted, accept_call_info.to_string());
246 msg.set_quote(self, Some(&call.msg)).await?;
247 msg.id = send_msg(self, call.msg.chat_id, &mut msg).await?;
248 self.emit_event(EventType::IncomingCallAccepted {
249 msg_id: call.msg.id,
250 chat_id: call.msg.chat_id,
251 });
252 self.emit_msgs_changed(call.msg.chat_id, call_id);
253 Ok(())
254 }
255
256 pub async fn end_call(&self, call_id: MsgId) -> Result<()> {
258 let mut call: CallInfo = self.load_call_by_id(call_id).await?.with_context(|| {
259 format!("end_call is called with {call_id} which does not refer to a call")
260 })?;
261 if call.is_ended() {
262 info!(self, "Call already ended");
263 return Ok(());
264 }
265
266 if !call.is_accepted() {
267 if call.is_incoming() {
268 call.mark_as_ended(self).await?;
269 call.update_text(self, "Declined call").await?;
270 } else {
271 call.mark_as_canceled(self).await?;
272 call.update_text(self, "Canceled call").await?;
273 }
274 } else {
275 call.mark_as_ended(self).await?;
276 call.update_text_duration(self).await?;
277 }
278
279 let mut msg = Message {
280 viewtype: Viewtype::Text,
281 text: "[Call ended]".into(),
282 ..Default::default()
283 };
284 msg.param.set_cmd(SystemMessage::CallEnded);
285 msg.hidden = true;
286 msg.set_quote(self, Some(&call.msg)).await?;
287 msg.id = send_msg(self, call.msg.chat_id, &mut msg).await?;
288
289 self.emit_event(EventType::CallEnded {
290 msg_id: call.msg.id,
291 chat_id: call.msg.chat_id,
292 });
293 self.emit_msgs_changed(call.msg.chat_id, call_id);
294 Ok(())
295 }
296
297 async fn emit_end_call_if_unaccepted(
298 context: WeakContext,
299 wait: u64,
300 call_id: MsgId,
301 ) -> Result<()> {
302 sleep(Duration::from_secs(wait)).await;
303 let context = context.upgrade()?;
304 let Some(mut call) = context.load_call_by_id(call_id).await? else {
305 warn!(
306 context,
307 "emit_end_call_if_unaccepted is called with {call_id} which does not refer to a call."
308 );
309 return Ok(());
310 };
311 if !call.is_accepted() && !call.is_ended() {
312 if call.is_incoming() {
313 call.mark_as_canceled(&context).await?;
314 call.update_text(&context, "Missed call").await?;
315 } else {
316 call.mark_as_ended(&context).await?;
317 call.update_text(&context, "Canceled call").await?;
318 }
319 context.emit_msgs_changed(call.msg.chat_id, call_id);
320 context.emit_event(EventType::CallEnded {
321 msg_id: call.msg.id,
322 chat_id: call.msg.chat_id,
323 });
324 }
325 Ok(())
326 }
327
328 pub(crate) async fn handle_call_msg(
329 &self,
330 call_id: MsgId,
331 mime_message: &MimeMessage,
332 from_id: ContactId,
333 ) -> Result<()> {
334 if mime_message.is_call() {
335 let Some(call) = self.load_call_by_id(call_id).await? else {
336 warn!(self, "{call_id} does not refer to a call message");
337 return Ok(());
338 };
339
340 if call.is_incoming() {
341 if call.is_stale() {
342 call.update_text(self, "Missed call").await?;
343 self.emit_incoming_msg(call.msg.chat_id, call_id); } else {
345 call.update_text(self, "Incoming call").await?;
346 self.emit_msgs_changed(call.msg.chat_id, call_id); let has_video = match sdp_has_video(&call.place_call_info) {
348 Ok(has_video) => has_video,
349 Err(err) => {
350 warn!(self, "Failed to determine if SDP offer has video: {err:#}.");
351 false
352 }
353 };
354 let can_call_me = match who_can_call_me(self).await? {
355 WhoCanCallMe::Contacts => ChatIdBlocked::lookup_by_contact(self, from_id)
356 .await?
357 .is_some_and(|chat_id_blocked| {
358 match chat_id_blocked.blocked {
359 Blocked::Not => true,
360 Blocked::Yes | Blocked::Request => {
361 false
367 }
368 }
369 }),
370 WhoCanCallMe::Everybody => ChatIdBlocked::lookup_by_contact(self, from_id)
371 .await?
372 .is_none_or(|chat_id_blocked| chat_id_blocked.blocked != Blocked::Yes),
373 WhoCanCallMe::Nobody => false,
374 };
375 if can_call_me {
376 self.emit_event(EventType::IncomingCall {
377 msg_id: call.msg.id,
378 chat_id: call.msg.chat_id,
379 place_call_info: call.place_call_info.to_string(),
380 has_video,
381 });
382 }
383 let wait = call.remaining_ring_seconds();
384 let context = self.get_weak_context();
385 task::spawn(Context::emit_end_call_if_unaccepted(
386 context,
387 wait.try_into()?,
388 call.msg.id,
389 ));
390 }
391 } else {
392 call.update_text(self, "Outgoing call").await?;
393 self.emit_msgs_changed(call.msg.chat_id, call_id);
394 }
395 } else {
396 match mime_message.is_system_message {
397 SystemMessage::CallAccepted => {
398 let Some(mut call) = self.load_call_by_id(call_id).await? else {
399 warn!(self, "{call_id} does not refer to a call message");
400 return Ok(());
401 };
402
403 if call.is_ended() || call.is_accepted() {
404 info!(self, "CallAccepted received for accepted/ended call");
405 return Ok(());
406 }
407
408 call.mark_as_accepted(self).await?;
409 self.emit_msgs_changed(call.msg.chat_id, call_id);
410 if call.is_incoming() {
411 self.emit_event(EventType::IncomingCallAccepted {
412 msg_id: call.msg.id,
413 chat_id: call.msg.chat_id,
414 });
415 } else {
416 let accept_call_info = mime_message
417 .get_header(HeaderDef::ChatWebrtcAccepted)
418 .unwrap_or_default();
419 self.emit_event(EventType::OutgoingCallAccepted {
420 msg_id: call.msg.id,
421 chat_id: call.msg.chat_id,
422 accept_call_info: accept_call_info.to_string(),
423 });
424 }
425 }
426 SystemMessage::CallEnded => {
427 let Some(mut call) = self.load_call_by_id(call_id).await? else {
428 warn!(self, "{call_id} does not refer to a call message");
429 return Ok(());
430 };
431
432 if call.is_ended() {
433 info!(self, "CallEnded received for ended call");
435 return Ok(());
436 }
437
438 if !call.is_accepted() {
439 if call.is_incoming() {
440 if from_id == ContactId::SELF {
441 call.mark_as_ended(self).await?;
442 call.update_text(self, "Declined call").await?;
443 } else {
444 call.mark_as_canceled(self).await?;
445 call.update_text(self, "Missed call").await?;
446 }
447 } else {
448 if from_id == ContactId::SELF {
450 call.mark_as_canceled(self).await?;
451 call.update_text(self, "Canceled call").await?;
452 } else {
453 call.mark_as_ended(self).await?;
454 call.update_text(self, "Declined call").await?;
455 }
456 }
457 } else {
458 call.mark_as_ended(self).await?;
459 call.update_text_duration(self).await?;
460 }
461
462 self.emit_msgs_changed(call.msg.chat_id, call_id);
463 self.emit_event(EventType::CallEnded {
464 msg_id: call.msg.id,
465 chat_id: call.msg.chat_id,
466 });
467 }
468 _ => {}
469 }
470 }
471 Ok(())
472 }
473
474 pub async fn load_call_by_id(&self, call_id: MsgId) -> Result<Option<CallInfo>> {
479 let call = Message::load_from_db(self, call_id).await?;
480 Ok(self.load_call_by_message(call))
481 }
482
483 fn load_call_by_message(&self, call: Message) -> Option<CallInfo> {
487 if call.viewtype != Viewtype::Call {
488 return None;
492 }
493
494 Some(CallInfo {
495 place_call_info: call
496 .param
497 .get(Param::WebrtcRoom)
498 .unwrap_or_default()
499 .to_string(),
500 accept_call_info: call
501 .param
502 .get(Param::WebrtcAccepted)
503 .unwrap_or_default()
504 .to_string(),
505 msg: call,
506 })
507 }
508}
509
510pub fn sdp_has_video(sdp: &str) -> Result<bool> {
512 let mut cursor = Cursor::new(sdp);
513 let session_description =
514 SessionDescription::unmarshal(&mut cursor).context("Failed to parse SDP")?;
515 for media_description in &session_description.media_descriptions {
516 if media_description.media_name.media == "video" {
517 return Ok(true);
518 }
519 }
520 Ok(false)
521}
522
523#[derive(Debug, PartialEq, Eq)]
525pub enum CallState {
526 Alerting,
532
533 Active,
535
536 Completed {
539 duration: i64,
541 },
542
543 Missed,
546
547 Declined,
551
552 Canceled,
559}
560
561pub async fn call_state(context: &Context, msg_id: MsgId) -> Result<CallState> {
565 let call = context
566 .load_call_by_id(msg_id)
567 .await?
568 .with_context(|| format!("{msg_id} is not a call message"))?;
569 let state = if call.is_incoming() {
570 if call.is_accepted() {
571 if call.is_ended() {
572 CallState::Completed {
573 duration: call.duration_seconds(),
574 }
575 } else {
576 CallState::Active
577 }
578 } else if call.is_canceled() {
579 CallState::Missed
582 } else if call.is_ended() {
583 CallState::Declined
584 } else if call.is_stale() {
585 CallState::Missed
586 } else {
587 CallState::Alerting
588 }
589 } else if call.is_accepted() {
590 if call.is_ended() {
591 CallState::Completed {
592 duration: call.duration_seconds(),
593 }
594 } else {
595 CallState::Active
596 }
597 } else if call.is_canceled() {
598 CallState::Canceled
599 } else if call.is_ended() || call.is_stale() {
600 CallState::Declined
601 } else {
602 CallState::Alerting
603 };
604 Ok(state)
605}
606
607#[derive(Serialize, Debug, Clone, PartialEq)]
609struct IceServer {
610 pub urls: Vec<String>,
612
613 pub username: Option<String>,
615
616 pub credential: Option<String>,
618}
619
620pub(crate) async fn create_ice_servers_from_metadata(
630 metadata: &str,
631) -> Result<(i64, Vec<UnresolvedIceServer>)> {
632 let (hostname, rest) = metadata.split_once(':').context("Missing hostname")?;
633 let (port, rest) = rest.split_once(':').context("Missing port")?;
634 let port = u16::from_str(port).context("Failed to parse the port")?;
635 let (ts, password) = rest.split_once(':').context("Missing timestamp")?;
636 let expiration_timestamp = i64::from_str(ts).context("Failed to parse the timestamp")?;
637 let ice_servers = vec![UnresolvedIceServer::Turn {
638 hostname: hostname.to_string(),
639 port,
640 username: ts.to_string(),
641 credential: password.to_string(),
642 }];
643 Ok((expiration_timestamp, ice_servers))
644}
645
646#[derive(Debug, Clone)]
648pub(crate) enum UnresolvedIceServer {
649 Stun { hostname: String, port: u16 },
651
652 Turn {
654 hostname: String,
655 port: u16,
656 username: String,
657 credential: String,
658 },
659}
660
661pub(crate) async fn resolve_ice_servers(
666 context: &Context,
667 unresolved_ice_servers: Vec<UnresolvedIceServer>,
668) -> Result<String> {
669 let mut result: Vec<IceServer> = Vec::new();
670
671 let load_cache = false;
673
674 for unresolved_ice_server in unresolved_ice_servers {
675 match unresolved_ice_server {
676 UnresolvedIceServer::Stun { hostname, port } => {
677 match lookup_host_with_cache(context, &hostname, port, "", load_cache).await {
678 Ok(addrs) => {
679 let urls: Vec<String> = addrs
680 .into_iter()
681 .map(|addr| format!("stun:{addr}"))
682 .collect();
683 let stun_server = IceServer {
684 urls,
685 username: None,
686 credential: None,
687 };
688 result.push(stun_server);
689 }
690 Err(err) => {
691 warn!(
692 context,
693 "Failed to resolve STUN {hostname}:{port}: {err:#}."
694 );
695 }
696 }
697 }
698 UnresolvedIceServer::Turn {
699 hostname,
700 port,
701 username,
702 credential,
703 } => match lookup_host_with_cache(context, &hostname, port, "", load_cache).await {
704 Ok(addrs) => {
705 let urls: Vec<String> = addrs
706 .into_iter()
707 .map(|addr| format!("turn:{addr}"))
708 .collect();
709 let turn_server = IceServer {
710 urls,
711 username: Some(username),
712 credential: Some(credential),
713 };
714 result.push(turn_server);
715 }
716 Err(err) => {
717 warn!(
718 context,
719 "Failed to resolve TURN {hostname}:{port}: {err:#}."
720 );
721 }
722 },
723 }
724 }
725 let json = serde_json::to_string(&result)?;
726 Ok(json)
727}
728
729pub(crate) fn create_fallback_ice_servers() -> Vec<UnresolvedIceServer> {
731 vec![
739 UnresolvedIceServer::Stun {
740 hostname: "nine.testrun.org".to_string(),
741 port: STUN_PORT,
742 },
743 UnresolvedIceServer::Turn {
744 hostname: "turn.delta.chat".to_string(),
745 port: STUN_PORT,
746 username: "public".to_string(),
747 credential: "o4tR7yG4rG2slhXqRUf9zgmHz".to_string(),
748 },
749 ]
750}
751
752pub async fn ice_servers(context: &Context) -> Result<String> {
762 if let Some(ref metadata) = *context.metadata.read().await {
763 let ice_servers = resolve_ice_servers(context, metadata.ice_servers.clone()).await?;
764 Ok(ice_servers)
765 } else {
766 Ok("[]".to_string())
767 }
768}
769
770#[derive(
772 Debug, Default, Display, Clone, Copy, PartialEq, Eq, FromPrimitive, ToPrimitive, FromSql, ToSql,
773)]
774#[repr(u8)]
775pub enum WhoCanCallMe {
776 Everybody = 0,
780
781 #[default]
783 Contacts = 1,
784
785 Nobody = 2,
787}
788
789async fn who_can_call_me(context: &Context) -> Result<WhoCanCallMe> {
791 let who_can_call_me =
792 WhoCanCallMe::from_i32(context.get_config_int(Config::WhoCanCallMe).await?)
793 .unwrap_or_default();
794 Ok(who_can_call_me)
795}
796
797#[cfg(test)]
798mod calls_tests;