1use crate::chat::ChatIdBlocked;
6use crate::chat::{Chat, ChatId, send_msg};
7use crate::constants::{Blocked, Chattype};
8use crate::contact::ContactId;
9use crate::context::Context;
10use crate::events::EventType;
11use crate::headerdef::HeaderDef;
12use crate::log::{info, warn};
13use crate::message::{self, Message, MsgId, Viewtype};
14use crate::mimeparser::{MimeMessage, SystemMessage};
15use crate::net::dns::lookup_host_with_cache;
16use crate::param::Param;
17use crate::tools::time;
18use anyhow::{Context as _, Result, ensure};
19use sdp::SessionDescription;
20use serde::Serialize;
21use std::io::Cursor;
22use std::str::FromStr;
23use std::time::Duration;
24use tokio::task;
25use tokio::time::sleep;
26
/// Number of seconds a call is allowed to ring.
///
/// It is the delay before an unaccepted outgoing call is ended automatically
/// and the upper bound of the remaining ring time that is computed
/// from the sent timestamp of the call message.
const RINGING_SECONDS: i64 = 60;

// Call state is persisted in the parameters of the call message
// under the keys defined below.

/// Message parameter storing the timestamp at which the call was accepted.
const CALL_ACCEPTED_TIMESTAMP: Param = Param::Arg;
/// Message parameter storing the timestamp at which the call was ended.
const CALL_ENDED_TIMESTAMP: Param = Param::Arg4;

/// Port that is resolved for the fallback STUN server.
const STUN_PORT: u16 = 3478;

/// Message parameter storing the timestamp at which the call was canceled.
///
/// It is only ever written together with `CALL_ENDED_TIMESTAMP`,
/// so its presence distinguishes canceled calls
/// from calls that were ended in any other way.
const CALL_CANCELED_TIMESTAMP: Param = Param::Arg2;
53
/// Call-specific view of a call message.
#[derive(Debug, Default)]
pub struct CallInfo {
    /// Value of `Param::WebrtcRoom` of the call message,
    /// i.e. the `place_call_info` given when the call was placed.
    /// Empty if the parameter is not set.
    pub place_call_info: String,

    /// Value of `Param::WebrtcAccepted` of the call message.
    /// Empty if the parameter is not set.
    pub accept_call_info: String,

    /// The call message itself.
    /// Accepted/ended/canceled timestamps are stored in its parameters.
    pub msg: Message,
}
67
68impl CallInfo {
69 pub fn is_incoming(&self) -> bool {
71 self.msg.from_id != ContactId::SELF
72 }
73
74 pub fn is_stale(&self) -> bool {
76 (self.is_incoming() || self.msg.timestamp_sent != 0) && self.remaining_ring_seconds() <= 0
77 }
78
79 fn remaining_ring_seconds(&self) -> i64 {
80 let remaining_seconds = self.msg.timestamp_sent + RINGING_SECONDS - time();
81 remaining_seconds.clamp(0, RINGING_SECONDS)
82 }
83
84 async fn update_text(&self, context: &Context, text: &str) -> Result<()> {
85 context
86 .sql
87 .execute(
88 "UPDATE msgs SET txt=?, txt_normalized=? WHERE id=?",
89 (text, message::normalize_text(text), self.msg.id),
90 )
91 .await?;
92 Ok(())
93 }
94
95 async fn update_text_duration(&self, context: &Context) -> Result<()> {
96 let minutes = self.duration_seconds() / 60;
97 let duration = match minutes {
98 0 => "<1 minute".to_string(),
99 1 => "1 minute".to_string(),
100 n => format!("{n} minutes"),
101 };
102
103 if self.is_incoming() {
104 self.update_text(context, &format!("Incoming call\n{duration}"))
105 .await?;
106 } else {
107 self.update_text(context, &format!("Outgoing call\n{duration}"))
108 .await?;
109 }
110 Ok(())
111 }
112
113 async fn mark_as_accepted(&mut self, context: &Context) -> Result<()> {
116 self.msg.param.set_i64(CALL_ACCEPTED_TIMESTAMP, time());
117 self.msg.update_param(context).await?;
118 Ok(())
119 }
120
121 pub fn is_accepted(&self) -> bool {
123 self.msg.param.exists(CALL_ACCEPTED_TIMESTAMP)
124 }
125
126 pub fn is_canceled(&self) -> bool {
134 self.msg.param.exists(CALL_CANCELED_TIMESTAMP)
135 }
136
137 async fn mark_as_ended(&mut self, context: &Context) -> Result<()> {
138 self.msg.param.set_i64(CALL_ENDED_TIMESTAMP, time());
139 self.msg.update_param(context).await?;
140 Ok(())
141 }
142
143 async fn mark_as_canceled(&mut self, context: &Context) -> Result<()> {
151 let now = time();
152 self.msg.param.set_i64(CALL_ENDED_TIMESTAMP, now);
153 self.msg.param.set_i64(CALL_CANCELED_TIMESTAMP, now);
154 self.msg.update_param(context).await?;
155 Ok(())
156 }
157
158 pub fn is_ended(&self) -> bool {
160 self.msg.param.exists(CALL_ENDED_TIMESTAMP)
161 }
162
163 pub fn duration_seconds(&self) -> i64 {
165 if let (Some(start), Some(end)) = (
166 self.msg.param.get_i64(CALL_ACCEPTED_TIMESTAMP),
167 self.msg.param.get_i64(CALL_ENDED_TIMESTAMP),
168 ) {
169 let seconds = end - start;
170 if seconds <= 0 {
171 return 1;
172 }
173 return seconds;
174 }
175 0
176 }
177}
178
179impl Context {
180 pub async fn place_outgoing_call(
182 &self,
183 chat_id: ChatId,
184 place_call_info: String,
185 ) -> Result<MsgId> {
186 let chat = Chat::load_from_db(self, chat_id).await?;
187 ensure!(
188 chat.typ == Chattype::Single,
189 "Can only place calls in 1:1 chats"
190 );
191 ensure!(!chat.is_self_talk(), "Cannot call self");
192
193 let mut call = Message {
194 viewtype: Viewtype::Call,
195 text: "Outgoing call".into(),
196 ..Default::default()
197 };
198 call.param.set(Param::WebrtcRoom, &place_call_info);
199 call.id = send_msg(self, chat_id, &mut call).await?;
200
201 let wait = RINGING_SECONDS;
202 task::spawn(Context::emit_end_call_if_unaccepted(
203 self.clone(),
204 wait.try_into()?,
205 call.id,
206 ));
207
208 Ok(call.id)
209 }
210
211 pub async fn accept_incoming_call(
213 &self,
214 call_id: MsgId,
215 accept_call_info: String,
216 ) -> Result<()> {
217 let mut call: CallInfo = self.load_call_by_id(call_id).await?.with_context(|| {
218 format!("accept_incoming_call is called with {call_id} which does not refer to a call")
219 })?;
220 ensure!(call.is_incoming());
221 if call.is_accepted() || call.is_ended() {
222 info!(self, "Call already accepted/ended");
223 return Ok(());
224 }
225
226 call.mark_as_accepted(self).await?;
227 let chat = Chat::load_from_db(self, call.msg.chat_id).await?;
228 if chat.is_contact_request() {
229 chat.id.accept(self).await?;
230 }
231
232 let mut msg = Message {
234 viewtype: Viewtype::Text,
235 text: "[Call accepted]".into(),
236 ..Default::default()
237 };
238 msg.param.set_cmd(SystemMessage::CallAccepted);
239 msg.hidden = true;
240 msg.param
241 .set(Param::WebrtcAccepted, accept_call_info.to_string());
242 msg.set_quote(self, Some(&call.msg)).await?;
243 msg.id = send_msg(self, call.msg.chat_id, &mut msg).await?;
244 self.emit_event(EventType::IncomingCallAccepted {
245 msg_id: call.msg.id,
246 chat_id: call.msg.chat_id,
247 });
248 self.emit_msgs_changed(call.msg.chat_id, call_id);
249 Ok(())
250 }
251
252 pub async fn end_call(&self, call_id: MsgId) -> Result<()> {
254 let mut call: CallInfo = self.load_call_by_id(call_id).await?.with_context(|| {
255 format!("end_call is called with {call_id} which does not refer to a call")
256 })?;
257 if call.is_ended() {
258 info!(self, "Call already ended");
259 return Ok(());
260 }
261
262 if !call.is_accepted() {
263 if call.is_incoming() {
264 call.mark_as_ended(self).await?;
265 call.update_text(self, "Declined call").await?;
266 } else {
267 call.mark_as_canceled(self).await?;
268 call.update_text(self, "Canceled call").await?;
269 }
270 } else {
271 call.mark_as_ended(self).await?;
272 call.update_text_duration(self).await?;
273 }
274
275 let mut msg = Message {
276 viewtype: Viewtype::Text,
277 text: "[Call ended]".into(),
278 ..Default::default()
279 };
280 msg.param.set_cmd(SystemMessage::CallEnded);
281 msg.hidden = true;
282 msg.set_quote(self, Some(&call.msg)).await?;
283 msg.id = send_msg(self, call.msg.chat_id, &mut msg).await?;
284
285 self.emit_event(EventType::CallEnded {
286 msg_id: call.msg.id,
287 chat_id: call.msg.chat_id,
288 });
289 self.emit_msgs_changed(call.msg.chat_id, call_id);
290 Ok(())
291 }
292
293 async fn emit_end_call_if_unaccepted(
294 context: Context,
295 wait: u64,
296 call_id: MsgId,
297 ) -> Result<()> {
298 sleep(Duration::from_secs(wait)).await;
299 let Some(mut call) = context.load_call_by_id(call_id).await? else {
300 warn!(
301 context,
302 "emit_end_call_if_unaccepted is called with {call_id} which does not refer to a call."
303 );
304 return Ok(());
305 };
306 if !call.is_accepted() && !call.is_ended() {
307 if call.is_incoming() {
308 call.mark_as_canceled(&context).await?;
309 call.update_text(&context, "Missed call").await?;
310 } else {
311 call.mark_as_ended(&context).await?;
312 call.update_text(&context, "Canceled call").await?;
313 }
314 context.emit_msgs_changed(call.msg.chat_id, call_id);
315 context.emit_event(EventType::CallEnded {
316 msg_id: call.msg.id,
317 chat_id: call.msg.chat_id,
318 });
319 }
320 Ok(())
321 }
322
    /// Updates the call referred to by `call_id` according to a received message.
    ///
    /// `mime_message` is the message being handled and `from_id` is its sender.
    /// Three kinds of messages are processed:
    /// - a call message itself (`mime_message.is_call()`),
    /// - a `SystemMessage::CallAccepted` message,
    /// - a `SystemMessage::CallEnded` message.
    ///
    /// All other messages are ignored.
    /// If `call_id` does not refer to a call message,
    /// a warning is logged and `Ok(())` is returned.
    pub(crate) async fn handle_call_msg(
        &self,
        call_id: MsgId,
        mime_message: &MimeMessage,
        from_id: ContactId,
    ) -> Result<()> {
        if mime_message.is_call() {
            let Some(call) = self.load_call_by_id(call_id).await? else {
                warn!(self, "{call_id} does not refer to a call message");
                return Ok(());
            };

            if call.is_incoming() {
                if call.is_stale() {
                    // The ringing time is already over when the call arrives:
                    // show it as missed right away and report it as an incoming message.
                    call.update_text(self, "Missed call").await?;
                    self.emit_incoming_msg(call.msg.chat_id, call_id);
                } else {
                    // The call is still ringing: only a message-changed event is emitted here,
                    // the call itself is announced with `IncomingCall` below.
                    call.update_text(self, "Incoming call").await?;
                    self.emit_msgs_changed(call.msg.chat_id, call_id);
                    // An unparsable SDP offer is treated as an offer without video.
                    let has_video = match sdp_has_video(&call.place_call_info) {
                        Ok(has_video) => has_video,
                        Err(err) => {
                            warn!(self, "Failed to determine if SDP offer has video: {err:#}.");
                            false
                        }
                    };
                    if let Some(chat_id_blocked) =
                        ChatIdBlocked::lookup_by_contact(self, from_id).await?
                    {
                        match chat_id_blocked.blocked {
                            Blocked::Not => {
                                self.emit_event(EventType::IncomingCall {
                                    msg_id: call.msg.id,
                                    chat_id: call.msg.chat_id,
                                    place_call_info: call.place_call_info.to_string(),
                                    has_video,
                                });
                            }
                            Blocked::Yes | Blocked::Request => {
                                // No `IncomingCall` event for blocked chats
                                // and contact requests.
                            }
                        }
                    }
                    // The timeout task is spawned regardless of the blocked state,
                    // so the call is ended once the remaining ring time is over.
                    let wait = call.remaining_ring_seconds();
                    task::spawn(Context::emit_end_call_if_unaccepted(
                        self.clone(),
                        wait.try_into()?,
                        call.msg.id,
                    ));
                }
            } else {
                // A call message sent by ourselves.
                call.update_text(self, "Outgoing call").await?;
                self.emit_msgs_changed(call.msg.chat_id, call_id);
            }
        } else {
            match mime_message.is_system_message {
                SystemMessage::CallAccepted => {
                    let Some(mut call) = self.load_call_by_id(call_id).await? else {
                        warn!(self, "{call_id} does not refer to a call message");
                        return Ok(());
                    };

                    if call.is_ended() || call.is_accepted() {
                        info!(self, "CallAccepted received for accepted/ended call");
                        return Ok(());
                    }

                    call.mark_as_accepted(self).await?;
                    self.emit_msgs_changed(call.msg.chat_id, call_id);
                    if call.is_incoming() {
                        self.emit_event(EventType::IncomingCallAccepted {
                            msg_id: call.msg.id,
                            chat_id: call.msg.chat_id,
                        });
                    } else {
                        // For outgoing calls the accept info is taken
                        // from the `ChatWebrtcAccepted` header, empty if the header is missing.
                        let accept_call_info = mime_message
                            .get_header(HeaderDef::ChatWebrtcAccepted)
                            .unwrap_or_default();
                        self.emit_event(EventType::OutgoingCallAccepted {
                            msg_id: call.msg.id,
                            chat_id: call.msg.chat_id,
                            accept_call_info: accept_call_info.to_string(),
                        });
                    }
                }
                SystemMessage::CallEnded => {
                    let Some(mut call) = self.load_call_by_id(call_id).await? else {
                        warn!(self, "{call_id} does not refer to a call message");
                        return Ok(());
                    };

                    if call.is_ended() {
                        info!(self, "CallEnded received for ended call");
                        return Ok(());
                    }

                    if !call.is_accepted() {
                        // The call was never picked up. Who ended it decides the outcome;
                        // `from_id == ContactId::SELF` means the "call ended" message
                        // was sent by ourselves (presumably by another device of ours).
                        if call.is_incoming() {
                            if from_id == ContactId::SELF {
                                call.mark_as_ended(self).await?;
                                call.update_text(self, "Declined call").await?;
                            } else {
                                call.mark_as_canceled(self).await?;
                                call.update_text(self, "Missed call").await?;
                            }
                        } else {
                            if from_id == ContactId::SELF {
                                call.mark_as_canceled(self).await?;
                                call.update_text(self, "Canceled call").await?;
                            } else {
                                call.mark_as_ended(self).await?;
                                call.update_text(self, "Declined call").await?;
                            }
                        }
                    } else {
                        // An accepted call was hung up: store the end and show the duration.
                        call.mark_as_ended(self).await?;
                        call.update_text_duration(self).await?;
                    }

                    self.emit_msgs_changed(call.msg.chat_id, call_id);
                    self.emit_event(EventType::CallEnded {
                        msg_id: call.msg.id,
                        chat_id: call.msg.chat_id,
                    });
                }
                _ => {}
            }
        }
        Ok(())
    }
459
460 pub async fn load_call_by_id(&self, call_id: MsgId) -> Result<Option<CallInfo>> {
465 let call = Message::load_from_db(self, call_id).await?;
466 Ok(self.load_call_by_message(call))
467 }
468
469 fn load_call_by_message(&self, call: Message) -> Option<CallInfo> {
473 if call.viewtype != Viewtype::Call {
474 return None;
478 }
479
480 Some(CallInfo {
481 place_call_info: call
482 .param
483 .get(Param::WebrtcRoom)
484 .unwrap_or_default()
485 .to_string(),
486 accept_call_info: call
487 .param
488 .get(Param::WebrtcAccepted)
489 .unwrap_or_default()
490 .to_string(),
491 msg: call,
492 })
493 }
494}
495
496pub fn sdp_has_video(sdp: &str) -> Result<bool> {
498 let mut cursor = Cursor::new(sdp);
499 let session_description =
500 SessionDescription::unmarshal(&mut cursor).context("Failed to parse SDP")?;
501 for media_description in &session_description.media_descriptions {
502 if media_description.media_name.media == "video" {
503 return Ok(true);
504 }
505 }
506 Ok(false)
507}
508
/// State of a call as computed by `call_state`.
#[derive(Debug, PartialEq, Eq)]
pub enum CallState {
    /// Incoming or outgoing call that is neither accepted
    /// nor ended and whose ringing time is not over yet.
    Alerting,

    /// Call that has been accepted and is not ended yet.
    Active,

    /// Call that has been accepted and then ended.
    Completed {
        /// Call duration in seconds.
        duration: i64,
    },

    /// Unaccepted incoming call that was marked as canceled
    /// or that was never ended and whose ringing time is over.
    Missed,

    /// Unaccepted incoming call that was ended without being marked as canceled,
    /// or unaccepted outgoing call that was ended without being marked as canceled
    /// or whose ringing time is over.
    Declined,

    /// Unaccepted outgoing call that was marked as canceled.
    Canceled,
}
546
547pub async fn call_state(context: &Context, msg_id: MsgId) -> Result<CallState> {
551 let call = context
552 .load_call_by_id(msg_id)
553 .await?
554 .with_context(|| format!("{msg_id} is not a call message"))?;
555 let state = if call.is_incoming() {
556 if call.is_accepted() {
557 if call.is_ended() {
558 CallState::Completed {
559 duration: call.duration_seconds(),
560 }
561 } else {
562 CallState::Active
563 }
564 } else if call.is_canceled() {
565 CallState::Missed
568 } else if call.is_ended() {
569 CallState::Declined
570 } else if call.is_stale() {
571 CallState::Missed
572 } else {
573 CallState::Alerting
574 }
575 } else if call.is_accepted() {
576 if call.is_ended() {
577 CallState::Completed {
578 duration: call.duration_seconds(),
579 }
580 } else {
581 CallState::Active
582 }
583 } else if call.is_canceled() {
584 CallState::Canceled
585 } else if call.is_ended() || call.is_stale() {
586 CallState::Declined
587 } else {
588 CallState::Alerting
589 };
590 Ok(state)
591}
592
/// Single ICE server entry that is serialized to JSON.
#[derive(Serialize, Debug, Clone, PartialEq)]
struct IceServer {
    /// Server URLs, one per resolved address,
    /// each prefixed with `turn:` or `stun:`.
    pub urls: Vec<String>,

    /// Username for the server; `None` for the fallback STUN server.
    pub username: Option<String>,

    /// Password for the server; `None` for the fallback STUN server.
    pub credential: Option<String>,
}
605
606async fn create_ice_servers(
608 context: &Context,
609 hostname: &str,
610 port: u16,
611 username: &str,
612 password: &str,
613) -> Result<String> {
614 let load_cache = false;
616 let urls: Vec<String> = lookup_host_with_cache(context, hostname, port, "", load_cache)
617 .await?
618 .into_iter()
619 .map(|addr| format!("turn:{addr}"))
620 .collect();
621
622 let ice_server = IceServer {
623 urls,
624 username: Some(username.to_string()),
625 credential: Some(password.to_string()),
626 };
627
628 let json = serde_json::to_string(&[ice_server])?;
629 Ok(json)
630}
631
632pub(crate) async fn create_ice_servers_from_metadata(
642 context: &Context,
643 metadata: &str,
644) -> Result<(i64, String)> {
645 let (hostname, rest) = metadata.split_once(':').context("Missing hostname")?;
646 let (port, rest) = rest.split_once(':').context("Missing port")?;
647 let port = u16::from_str(port).context("Failed to parse the port")?;
648 let (ts, password) = rest.split_once(':').context("Missing timestamp")?;
649 let expiration_timestamp = i64::from_str(ts).context("Failed to parse the timestamp")?;
650 let ice_servers = create_ice_servers(context, hostname, port, ts, password).await?;
651 Ok((expiration_timestamp, ice_servers))
652}
653
654pub(crate) async fn create_fallback_ice_servers(context: &Context) -> Result<String> {
656 let hostname = "nine.testrun.org";
665
666 let load_cache = false;
668 let urls: Vec<String> = lookup_host_with_cache(context, hostname, STUN_PORT, "", load_cache)
669 .await?
670 .into_iter()
671 .map(|addr| format!("stun:{addr}"))
672 .collect();
673
674 let ice_server = IceServer {
675 urls,
676 username: None,
677 credential: None,
678 };
679
680 let json = serde_json::to_string(&[ice_server])?;
681 Ok(json)
682}
683
684pub async fn ice_servers(context: &Context) -> Result<String> {
694 if let Some(ref metadata) = *context.metadata.read().await {
695 Ok(metadata.ice_servers.clone())
696 } else {
697 Ok("[]".to_string())
698 }
699}
700
701#[cfg(test)]
702mod calls_tests;