1use crate::chat::ChatIdBlocked;
6use crate::chat::{Chat, ChatId, send_msg};
7use crate::constants::{Blocked, Chattype};
8use crate::contact::ContactId;
9use crate::context::{Context, WeakContext};
10use crate::events::EventType;
11use crate::headerdef::HeaderDef;
12use crate::log::warn;
13use crate::message::{Message, MsgId, Viewtype};
14use crate::mimeparser::{MimeMessage, SystemMessage};
15use crate::net::dns::lookup_host_with_cache;
16use crate::param::Param;
17use crate::tools::{normalize_text, time};
18use anyhow::{Context as _, Result, ensure};
19use sdp::SessionDescription;
20use serde::Serialize;
21use std::io::Cursor;
22use std::str::FromStr;
23use std::time::Duration;
24use tokio::task;
25use tokio::time::sleep;
26
/// Length of the ringing period in seconds.
///
/// Used as the timeout after which an unaccepted outgoing call is ended
/// and as the upper bound for the remaining ring time of a call.
const RINGING_SECONDS: i64 = 120;

/// Message parameter holding the timestamp at which the call was accepted.
const CALL_ACCEPTED_TIMESTAMP: Param = Param::Arg;
/// Message parameter holding the timestamp at which the call was ended.
const CALL_ENDED_TIMESTAMP: Param = Param::Arg4;

/// Port used when resolving the fallback STUN and TURN servers.
const STUN_PORT: u16 = 3478;

/// Message parameter holding the timestamp at which the call was canceled.
///
/// It is always written together with `CALL_ENDED_TIMESTAMP`,
/// so a canceled call is also an ended call.
const CALL_CANCELED_TIMESTAMP: Param = Param::Arg2;
53
/// A call together with the message it is bound to.
#[derive(Debug, Default)]
pub struct CallInfo {
    /// Value of `Param::WebrtcRoom` of the call message,
    /// i.e. the text given when the call was placed (empty if unset).
    pub place_call_info: String,

    /// Value of `Param::WebrtcAccepted` of the call message (empty if unset).
    pub accept_call_info: String,

    /// The call message itself.
    /// Call state (accepted/ended/canceled timestamps) is kept in its parameters.
    pub msg: Message,
}
67
68impl CallInfo {
69 pub fn is_incoming(&self) -> bool {
71 self.msg.from_id != ContactId::SELF
72 }
73
74 pub fn is_stale(&self) -> bool {
76 (self.is_incoming() || self.msg.timestamp_sent != 0) && self.remaining_ring_seconds() <= 0
77 }
78
79 fn remaining_ring_seconds(&self) -> i64 {
80 let remaining_seconds = self.msg.timestamp_sent + RINGING_SECONDS - time();
81 remaining_seconds.clamp(0, RINGING_SECONDS)
82 }
83
84 async fn update_text(&self, context: &Context, text: &str) -> Result<()> {
85 context
86 .sql
87 .execute(
88 "UPDATE msgs SET txt=?, txt_normalized=? WHERE id=?",
89 (text, normalize_text(text), self.msg.id),
90 )
91 .await?;
92 Ok(())
93 }
94
95 async fn update_text_duration(&self, context: &Context) -> Result<()> {
96 let minutes = self.duration_seconds() / 60;
97 let duration = match minutes {
98 0 => "<1 minute".to_string(),
99 1 => "1 minute".to_string(),
100 n => format!("{n} minutes"),
101 };
102
103 if self.is_incoming() {
104 self.update_text(context, &format!("Incoming call\n{duration}"))
105 .await?;
106 } else {
107 self.update_text(context, &format!("Outgoing call\n{duration}"))
108 .await?;
109 }
110 Ok(())
111 }
112
113 async fn mark_as_accepted(&mut self, context: &Context) -> Result<()> {
116 self.msg.param.set_i64(CALL_ACCEPTED_TIMESTAMP, time());
117 self.msg.update_param(context).await?;
118 Ok(())
119 }
120
121 pub fn is_accepted(&self) -> bool {
123 self.msg.param.exists(CALL_ACCEPTED_TIMESTAMP)
124 }
125
126 pub fn is_canceled(&self) -> bool {
134 self.msg.param.exists(CALL_CANCELED_TIMESTAMP)
135 }
136
137 async fn mark_as_ended(&mut self, context: &Context) -> Result<()> {
138 self.msg.param.set_i64(CALL_ENDED_TIMESTAMP, time());
139 self.msg.update_param(context).await?;
140 Ok(())
141 }
142
143 async fn mark_as_canceled(&mut self, context: &Context) -> Result<()> {
151 let now = time();
152 self.msg.param.set_i64(CALL_ENDED_TIMESTAMP, now);
153 self.msg.param.set_i64(CALL_CANCELED_TIMESTAMP, now);
154 self.msg.update_param(context).await?;
155 Ok(())
156 }
157
158 pub fn is_ended(&self) -> bool {
160 self.msg.param.exists(CALL_ENDED_TIMESTAMP)
161 }
162
163 pub fn duration_seconds(&self) -> i64 {
165 if let (Some(start), Some(end)) = (
166 self.msg.param.get_i64(CALL_ACCEPTED_TIMESTAMP),
167 self.msg.param.get_i64(CALL_ENDED_TIMESTAMP),
168 ) {
169 let seconds = end - start;
170 if seconds <= 0 {
171 return 1;
172 }
173 return seconds;
174 }
175 0
176 }
177}
178
179impl Context {
180 pub async fn place_outgoing_call(
182 &self,
183 chat_id: ChatId,
184 place_call_info: String,
185 ) -> Result<MsgId> {
186 let chat = Chat::load_from_db(self, chat_id).await?;
187 ensure!(
188 chat.typ == Chattype::Single,
189 "Can only place calls in 1:1 chats"
190 );
191 ensure!(!chat.is_self_talk(), "Cannot call self");
192
193 let mut call = Message {
194 viewtype: Viewtype::Call,
195 text: "Outgoing call".into(),
196 ..Default::default()
197 };
198 call.param.set(Param::WebrtcRoom, &place_call_info);
199 call.id = send_msg(self, chat_id, &mut call).await?;
200
201 let wait = RINGING_SECONDS;
202 let context = self.get_weak_context();
203 task::spawn(Context::emit_end_call_if_unaccepted(
204 context,
205 wait.try_into()?,
206 call.id,
207 ));
208
209 Ok(call.id)
210 }
211
212 pub async fn accept_incoming_call(
214 &self,
215 call_id: MsgId,
216 accept_call_info: String,
217 ) -> Result<()> {
218 let mut call: CallInfo = self.load_call_by_id(call_id).await?.with_context(|| {
219 format!("accept_incoming_call is called with {call_id} which does not refer to a call")
220 })?;
221 ensure!(call.is_incoming());
222 if call.is_accepted() || call.is_ended() {
223 info!(self, "Call already accepted/ended");
224 return Ok(());
225 }
226
227 call.mark_as_accepted(self).await?;
228 let chat = Chat::load_from_db(self, call.msg.chat_id).await?;
229 if chat.is_contact_request() {
230 chat.id.accept(self).await?;
231 }
232
233 let mut msg = Message {
235 viewtype: Viewtype::Text,
236 text: "[Call accepted]".into(),
237 ..Default::default()
238 };
239 msg.param.set_cmd(SystemMessage::CallAccepted);
240 msg.hidden = true;
241 msg.param
242 .set(Param::WebrtcAccepted, accept_call_info.to_string());
243 msg.set_quote(self, Some(&call.msg)).await?;
244 msg.id = send_msg(self, call.msg.chat_id, &mut msg).await?;
245 self.emit_event(EventType::IncomingCallAccepted {
246 msg_id: call.msg.id,
247 chat_id: call.msg.chat_id,
248 });
249 self.emit_msgs_changed(call.msg.chat_id, call_id);
250 Ok(())
251 }
252
253 pub async fn end_call(&self, call_id: MsgId) -> Result<()> {
255 let mut call: CallInfo = self.load_call_by_id(call_id).await?.with_context(|| {
256 format!("end_call is called with {call_id} which does not refer to a call")
257 })?;
258 if call.is_ended() {
259 info!(self, "Call already ended");
260 return Ok(());
261 }
262
263 if !call.is_accepted() {
264 if call.is_incoming() {
265 call.mark_as_ended(self).await?;
266 call.update_text(self, "Declined call").await?;
267 } else {
268 call.mark_as_canceled(self).await?;
269 call.update_text(self, "Canceled call").await?;
270 }
271 } else {
272 call.mark_as_ended(self).await?;
273 call.update_text_duration(self).await?;
274 }
275
276 let mut msg = Message {
277 viewtype: Viewtype::Text,
278 text: "[Call ended]".into(),
279 ..Default::default()
280 };
281 msg.param.set_cmd(SystemMessage::CallEnded);
282 msg.hidden = true;
283 msg.set_quote(self, Some(&call.msg)).await?;
284 msg.id = send_msg(self, call.msg.chat_id, &mut msg).await?;
285
286 self.emit_event(EventType::CallEnded {
287 msg_id: call.msg.id,
288 chat_id: call.msg.chat_id,
289 });
290 self.emit_msgs_changed(call.msg.chat_id, call_id);
291 Ok(())
292 }
293
294 async fn emit_end_call_if_unaccepted(
295 context: WeakContext,
296 wait: u64,
297 call_id: MsgId,
298 ) -> Result<()> {
299 sleep(Duration::from_secs(wait)).await;
300 let context = context.upgrade()?;
301 let Some(mut call) = context.load_call_by_id(call_id).await? else {
302 warn!(
303 context,
304 "emit_end_call_if_unaccepted is called with {call_id} which does not refer to a call."
305 );
306 return Ok(());
307 };
308 if !call.is_accepted() && !call.is_ended() {
309 if call.is_incoming() {
310 call.mark_as_canceled(&context).await?;
311 call.update_text(&context, "Missed call").await?;
312 } else {
313 call.mark_as_ended(&context).await?;
314 call.update_text(&context, "Canceled call").await?;
315 }
316 context.emit_msgs_changed(call.msg.chat_id, call_id);
317 context.emit_event(EventType::CallEnded {
318 msg_id: call.msg.id,
319 chat_id: call.msg.chat_id,
320 });
321 }
322 Ok(())
323 }
324
325 pub(crate) async fn handle_call_msg(
326 &self,
327 call_id: MsgId,
328 mime_message: &MimeMessage,
329 from_id: ContactId,
330 ) -> Result<()> {
331 if mime_message.is_call() {
332 let Some(call) = self.load_call_by_id(call_id).await? else {
333 warn!(self, "{call_id} does not refer to a call message");
334 return Ok(());
335 };
336
337 if call.is_incoming() {
338 if call.is_stale() {
339 call.update_text(self, "Missed call").await?;
340 self.emit_incoming_msg(call.msg.chat_id, call_id); } else {
342 call.update_text(self, "Incoming call").await?;
343 self.emit_msgs_changed(call.msg.chat_id, call_id); let has_video = match sdp_has_video(&call.place_call_info) {
345 Ok(has_video) => has_video,
346 Err(err) => {
347 warn!(self, "Failed to determine if SDP offer has video: {err:#}.");
348 false
349 }
350 };
351 if let Some(chat_id_blocked) =
352 ChatIdBlocked::lookup_by_contact(self, from_id).await?
353 {
354 match chat_id_blocked.blocked {
355 Blocked::Not => {
356 self.emit_event(EventType::IncomingCall {
357 msg_id: call.msg.id,
358 chat_id: call.msg.chat_id,
359 place_call_info: call.place_call_info.to_string(),
360 has_video,
361 });
362 }
363 Blocked::Yes | Blocked::Request => {
364 }
370 }
371 }
372 let wait = call.remaining_ring_seconds();
373 let context = self.get_weak_context();
374 task::spawn(Context::emit_end_call_if_unaccepted(
375 context,
376 wait.try_into()?,
377 call.msg.id,
378 ));
379 }
380 } else {
381 call.update_text(self, "Outgoing call").await?;
382 self.emit_msgs_changed(call.msg.chat_id, call_id);
383 }
384 } else {
385 match mime_message.is_system_message {
386 SystemMessage::CallAccepted => {
387 let Some(mut call) = self.load_call_by_id(call_id).await? else {
388 warn!(self, "{call_id} does not refer to a call message");
389 return Ok(());
390 };
391
392 if call.is_ended() || call.is_accepted() {
393 info!(self, "CallAccepted received for accepted/ended call");
394 return Ok(());
395 }
396
397 call.mark_as_accepted(self).await?;
398 self.emit_msgs_changed(call.msg.chat_id, call_id);
399 if call.is_incoming() {
400 self.emit_event(EventType::IncomingCallAccepted {
401 msg_id: call.msg.id,
402 chat_id: call.msg.chat_id,
403 });
404 } else {
405 let accept_call_info = mime_message
406 .get_header(HeaderDef::ChatWebrtcAccepted)
407 .unwrap_or_default();
408 self.emit_event(EventType::OutgoingCallAccepted {
409 msg_id: call.msg.id,
410 chat_id: call.msg.chat_id,
411 accept_call_info: accept_call_info.to_string(),
412 });
413 }
414 }
415 SystemMessage::CallEnded => {
416 let Some(mut call) = self.load_call_by_id(call_id).await? else {
417 warn!(self, "{call_id} does not refer to a call message");
418 return Ok(());
419 };
420
421 if call.is_ended() {
422 info!(self, "CallEnded received for ended call");
424 return Ok(());
425 }
426
427 if !call.is_accepted() {
428 if call.is_incoming() {
429 if from_id == ContactId::SELF {
430 call.mark_as_ended(self).await?;
431 call.update_text(self, "Declined call").await?;
432 } else {
433 call.mark_as_canceled(self).await?;
434 call.update_text(self, "Missed call").await?;
435 }
436 } else {
437 if from_id == ContactId::SELF {
439 call.mark_as_canceled(self).await?;
440 call.update_text(self, "Canceled call").await?;
441 } else {
442 call.mark_as_ended(self).await?;
443 call.update_text(self, "Declined call").await?;
444 }
445 }
446 } else {
447 call.mark_as_ended(self).await?;
448 call.update_text_duration(self).await?;
449 }
450
451 self.emit_msgs_changed(call.msg.chat_id, call_id);
452 self.emit_event(EventType::CallEnded {
453 msg_id: call.msg.id,
454 chat_id: call.msg.chat_id,
455 });
456 }
457 _ => {}
458 }
459 }
460 Ok(())
461 }
462
463 pub async fn load_call_by_id(&self, call_id: MsgId) -> Result<Option<CallInfo>> {
468 let call = Message::load_from_db(self, call_id).await?;
469 Ok(self.load_call_by_message(call))
470 }
471
472 fn load_call_by_message(&self, call: Message) -> Option<CallInfo> {
476 if call.viewtype != Viewtype::Call {
477 return None;
481 }
482
483 Some(CallInfo {
484 place_call_info: call
485 .param
486 .get(Param::WebrtcRoom)
487 .unwrap_or_default()
488 .to_string(),
489 accept_call_info: call
490 .param
491 .get(Param::WebrtcAccepted)
492 .unwrap_or_default()
493 .to_string(),
494 msg: call,
495 })
496 }
497}
498
499pub fn sdp_has_video(sdp: &str) -> Result<bool> {
501 let mut cursor = Cursor::new(sdp);
502 let session_description =
503 SessionDescription::unmarshal(&mut cursor).context("Failed to parse SDP")?;
504 for media_description in &session_description.media_descriptions {
505 if media_description.media_name.media == "video" {
506 return Ok(true);
507 }
508 }
509 Ok(false)
510}
511
/// State of a call as computed by `call_state()`.
#[derive(Debug, PartialEq, Eq)]
pub enum CallState {
    /// The call has been neither accepted, ended nor canceled
    /// and its ringing period is not over yet.
    Alerting,

    /// The call has been accepted and is not ended.
    Active,

    /// The call has been accepted and later ended.
    Completed {
        /// Call duration in seconds, as returned by `CallInfo::duration_seconds()`.
        duration: i64,
    },

    /// Unaccepted incoming call that was canceled
    /// or whose ringing period ran out.
    Missed,

    /// Unaccepted call that was ended without being marked as canceled.
    ///
    /// Also reported for an unaccepted outgoing call whose ringing period ran out.
    Declined,

    /// Unaccepted outgoing call that is marked as canceled.
    Canceled,
}
549
550pub async fn call_state(context: &Context, msg_id: MsgId) -> Result<CallState> {
554 let call = context
555 .load_call_by_id(msg_id)
556 .await?
557 .with_context(|| format!("{msg_id} is not a call message"))?;
558 let state = if call.is_incoming() {
559 if call.is_accepted() {
560 if call.is_ended() {
561 CallState::Completed {
562 duration: call.duration_seconds(),
563 }
564 } else {
565 CallState::Active
566 }
567 } else if call.is_canceled() {
568 CallState::Missed
571 } else if call.is_ended() {
572 CallState::Declined
573 } else if call.is_stale() {
574 CallState::Missed
575 } else {
576 CallState::Alerting
577 }
578 } else if call.is_accepted() {
579 if call.is_ended() {
580 CallState::Completed {
581 duration: call.duration_seconds(),
582 }
583 } else {
584 CallState::Active
585 }
586 } else if call.is_canceled() {
587 CallState::Canceled
588 } else if call.is_ended() || call.is_stale() {
589 CallState::Declined
590 } else {
591 CallState::Alerting
592 };
593 Ok(state)
594}
595
/// One entry of the JSON list of ICE servers produced by this module.
///
/// NOTE(review): the field names presumably mirror the WebRTC `RTCIceServer`
/// dictionary — confirm against the consumers of the JSON.
#[derive(Serialize, Debug, Clone, PartialEq)]
struct IceServer {
    /// Server URLs, built here as `turn:<address>` or `stun:<address>`.
    pub urls: Vec<String>,

    /// Username for the server, `None` if no username is used.
    pub username: Option<String>,

    /// Credential (password) for the server, `None` if no credential is used.
    pub credential: Option<String>,
}
608
609async fn create_ice_servers(
611 context: &Context,
612 hostname: &str,
613 port: u16,
614 username: &str,
615 password: &str,
616) -> Result<String> {
617 let load_cache = false;
619 let urls: Vec<String> = lookup_host_with_cache(context, hostname, port, "", load_cache)
620 .await?
621 .into_iter()
622 .map(|addr| format!("turn:{addr}"))
623 .collect();
624
625 let ice_server = IceServer {
626 urls,
627 username: Some(username.to_string()),
628 credential: Some(password.to_string()),
629 };
630
631 let json = serde_json::to_string(&[ice_server])?;
632 Ok(json)
633}
634
635pub(crate) async fn create_ice_servers_from_metadata(
645 context: &Context,
646 metadata: &str,
647) -> Result<(i64, String)> {
648 let (hostname, rest) = metadata.split_once(':').context("Missing hostname")?;
649 let (port, rest) = rest.split_once(':').context("Missing port")?;
650 let port = u16::from_str(port).context("Failed to parse the port")?;
651 let (ts, password) = rest.split_once(':').context("Missing timestamp")?;
652 let expiration_timestamp = i64::from_str(ts).context("Failed to parse the timestamp")?;
653 let ice_servers = create_ice_servers(context, hostname, port, ts, password).await?;
654 Ok((expiration_timestamp, ice_servers))
655}
656
657pub(crate) async fn create_fallback_ice_servers(context: &Context) -> Result<String> {
659 let hostname = "nine.testrun.org";
667 let load_cache = false;
669 let urls: Vec<String> = lookup_host_with_cache(context, hostname, STUN_PORT, "", load_cache)
670 .await?
671 .into_iter()
672 .map(|addr| format!("stun:{addr}"))
673 .collect();
674 let stun_server = IceServer {
675 urls,
676 username: None,
677 credential: None,
678 };
679
680 let hostname = "turn.delta.chat";
681 let load_cache = false;
683 let urls: Vec<String> = lookup_host_with_cache(context, hostname, STUN_PORT, "", load_cache)
684 .await?
685 .into_iter()
686 .map(|addr| format!("turn:{addr}"))
687 .collect();
688 let turn_server = IceServer {
689 urls,
690 username: Some("public".to_string()),
691 credential: Some("o4tR7yG4rG2slhXqRUf9zgmHz".to_string()),
692 };
693
694 let json = serde_json::to_string(&[stun_server, turn_server])?;
695 Ok(json)
696}
697
698pub async fn ice_servers(context: &Context) -> Result<String> {
708 if let Some(ref metadata) = *context.metadata.read().await {
709 Ok(metadata.ice_servers.clone())
710 } else {
711 Ok("[]".to_string())
712 }
713}
714
715#[cfg(test)]
716mod calls_tests;