1use crate::chat::{Chat, ChatId, send_msg};
6use crate::constants::Chattype;
7use crate::contact::ContactId;
8use crate::context::Context;
9use crate::events::EventType;
10use crate::headerdef::HeaderDef;
11use crate::log::{info, warn};
12use crate::message::{self, Message, MsgId, Viewtype};
13use crate::mimeparser::{MimeMessage, SystemMessage};
14use crate::net::dns::lookup_host_with_cache;
15use crate::param::Param;
16use crate::tools::time;
17use anyhow::{Context as _, Result, ensure};
18use sdp::SessionDescription;
19use serde::Serialize;
20use std::io::Cursor;
21use std::str::FromStr;
22use std::time::Duration;
23use tokio::task;
24use tokio::time::sleep;
25
26const RINGING_SECONDS: i64 = 60;
36
37const CALL_ACCEPTED_TIMESTAMP: Param = Param::Arg;
40const CALL_ENDED_TIMESTAMP: Param = Param::Arg4;
41
42const STUN_PORT: u16 = 3478;
43
44const CALL_CANCELED_TIMESTAMP: Param = Param::Arg2;
52
53#[derive(Debug, Default)]
55pub struct CallInfo {
56 pub place_call_info: String,
58
59 pub accept_call_info: String,
61
62 pub msg: Message,
65}
66
67impl CallInfo {
68 pub fn is_incoming(&self) -> bool {
70 self.msg.from_id != ContactId::SELF
71 }
72
73 pub fn is_stale(&self) -> bool {
75 (self.is_incoming() || self.msg.timestamp_sent != 0) && self.remaining_ring_seconds() <= 0
76 }
77
78 fn remaining_ring_seconds(&self) -> i64 {
79 let remaining_seconds = self.msg.timestamp_sent + RINGING_SECONDS - time();
80 remaining_seconds.clamp(0, RINGING_SECONDS)
81 }
82
83 async fn update_text(&self, context: &Context, text: &str) -> Result<()> {
84 context
85 .sql
86 .execute(
87 "UPDATE msgs SET txt=?, txt_normalized=? WHERE id=?",
88 (text, message::normalize_text(text), self.msg.id),
89 )
90 .await?;
91 Ok(())
92 }
93
94 async fn update_text_duration(&self, context: &Context) -> Result<()> {
95 let minutes = self.duration_seconds() / 60;
96 let duration = match minutes {
97 0 => "<1 minute".to_string(),
98 1 => "1 minute".to_string(),
99 n => format!("{} minutes", n),
100 };
101
102 if self.is_incoming() {
103 self.update_text(context, &format!("Incoming call\n{duration}"))
104 .await?;
105 } else {
106 self.update_text(context, &format!("Outgoing call\n{duration}"))
107 .await?;
108 }
109 Ok(())
110 }
111
112 async fn mark_as_accepted(&mut self, context: &Context) -> Result<()> {
115 self.msg.param.set_i64(CALL_ACCEPTED_TIMESTAMP, time());
116 self.msg.update_param(context).await?;
117 Ok(())
118 }
119
120 pub fn is_accepted(&self) -> bool {
122 self.msg.param.exists(CALL_ACCEPTED_TIMESTAMP)
123 }
124
125 pub fn is_canceled(&self) -> bool {
133 self.msg.param.exists(CALL_CANCELED_TIMESTAMP)
134 }
135
136 async fn mark_as_ended(&mut self, context: &Context) -> Result<()> {
137 self.msg.param.set_i64(CALL_ENDED_TIMESTAMP, time());
138 self.msg.update_param(context).await?;
139 Ok(())
140 }
141
142 async fn mark_as_canceled(&mut self, context: &Context) -> Result<()> {
150 let now = time();
151 self.msg.param.set_i64(CALL_ENDED_TIMESTAMP, now);
152 self.msg.param.set_i64(CALL_CANCELED_TIMESTAMP, now);
153 self.msg.update_param(context).await?;
154 Ok(())
155 }
156
157 pub fn is_ended(&self) -> bool {
159 self.msg.param.exists(CALL_ENDED_TIMESTAMP)
160 }
161
162 pub fn duration_seconds(&self) -> i64 {
164 if let (Some(start), Some(end)) = (
165 self.msg.param.get_i64(CALL_ACCEPTED_TIMESTAMP),
166 self.msg.param.get_i64(CALL_ENDED_TIMESTAMP),
167 ) {
168 let seconds = end - start;
169 if seconds <= 0 {
170 return 1;
171 }
172 return seconds;
173 }
174 0
175 }
176}
177
178impl Context {
179 pub async fn place_outgoing_call(
181 &self,
182 chat_id: ChatId,
183 place_call_info: String,
184 ) -> Result<MsgId> {
185 let chat = Chat::load_from_db(self, chat_id).await?;
186 ensure!(
187 chat.typ == Chattype::Single,
188 "Can only place calls in 1:1 chats"
189 );
190 ensure!(!chat.is_self_talk(), "Cannot call self");
191
192 let mut call = Message {
193 viewtype: Viewtype::Call,
194 text: "Outgoing call".into(),
195 ..Default::default()
196 };
197 call.param.set(Param::WebrtcRoom, &place_call_info);
198 call.id = send_msg(self, chat_id, &mut call).await?;
199
200 let wait = RINGING_SECONDS;
201 task::spawn(Context::emit_end_call_if_unaccepted(
202 self.clone(),
203 wait.try_into()?,
204 call.id,
205 ));
206
207 Ok(call.id)
208 }
209
210 pub async fn accept_incoming_call(
212 &self,
213 call_id: MsgId,
214 accept_call_info: String,
215 ) -> Result<()> {
216 let mut call: CallInfo = self.load_call_by_id(call_id).await?;
217 ensure!(call.is_incoming());
218 if call.is_accepted() || call.is_ended() {
219 info!(self, "Call already accepted/ended");
220 return Ok(());
221 }
222
223 call.mark_as_accepted(self).await?;
224 let chat = Chat::load_from_db(self, call.msg.chat_id).await?;
225 if chat.is_contact_request() {
226 chat.id.accept(self).await?;
227 }
228
229 let mut msg = Message {
231 viewtype: Viewtype::Text,
232 text: "[Call accepted]".into(),
233 ..Default::default()
234 };
235 msg.param.set_cmd(SystemMessage::CallAccepted);
236 msg.hidden = true;
237 msg.param
238 .set(Param::WebrtcAccepted, accept_call_info.to_string());
239 msg.set_quote(self, Some(&call.msg)).await?;
240 msg.id = send_msg(self, call.msg.chat_id, &mut msg).await?;
241 self.emit_event(EventType::IncomingCallAccepted {
242 msg_id: call.msg.id,
243 chat_id: call.msg.chat_id,
244 });
245 self.emit_msgs_changed(call.msg.chat_id, call_id);
246 Ok(())
247 }
248
249 pub async fn end_call(&self, call_id: MsgId) -> Result<()> {
251 let mut call: CallInfo = self.load_call_by_id(call_id).await?;
252 if call.is_ended() {
253 info!(self, "Call already ended");
254 return Ok(());
255 }
256
257 if !call.is_accepted() {
258 if call.is_incoming() {
259 call.mark_as_ended(self).await?;
260 call.update_text(self, "Declined call").await?;
261 } else {
262 call.mark_as_canceled(self).await?;
263 call.update_text(self, "Canceled call").await?;
264 }
265 } else {
266 call.mark_as_ended(self).await?;
267 call.update_text_duration(self).await?;
268 }
269
270 let mut msg = Message {
271 viewtype: Viewtype::Text,
272 text: "[Call ended]".into(),
273 ..Default::default()
274 };
275 msg.param.set_cmd(SystemMessage::CallEnded);
276 msg.hidden = true;
277 msg.set_quote(self, Some(&call.msg)).await?;
278 msg.id = send_msg(self, call.msg.chat_id, &mut msg).await?;
279
280 self.emit_event(EventType::CallEnded {
281 msg_id: call.msg.id,
282 chat_id: call.msg.chat_id,
283 });
284 self.emit_msgs_changed(call.msg.chat_id, call_id);
285 Ok(())
286 }
287
288 async fn emit_end_call_if_unaccepted(
289 context: Context,
290 wait: u64,
291 call_id: MsgId,
292 ) -> Result<()> {
293 sleep(Duration::from_secs(wait)).await;
294 let mut call = context.load_call_by_id(call_id).await?;
295 if !call.is_accepted() && !call.is_ended() {
296 if call.is_incoming() {
297 call.mark_as_canceled(&context).await?;
298 call.update_text(&context, "Missed call").await?;
299 } else {
300 call.mark_as_ended(&context).await?;
301 call.update_text(&context, "Canceled call").await?;
302 }
303 context.emit_msgs_changed(call.msg.chat_id, call_id);
304 context.emit_event(EventType::CallEnded {
305 msg_id: call.msg.id,
306 chat_id: call.msg.chat_id,
307 });
308 }
309 Ok(())
310 }
311
312 pub(crate) async fn handle_call_msg(
313 &self,
314 call_id: MsgId,
315 mime_message: &MimeMessage,
316 from_id: ContactId,
317 ) -> Result<()> {
318 if mime_message.is_call() {
319 let call = self.load_call_by_id(call_id).await?;
320
321 if call.is_incoming() {
322 if call.is_stale() {
323 call.update_text(self, "Missed call").await?;
324 self.emit_incoming_msg(call.msg.chat_id, call_id); } else {
326 call.update_text(self, "Incoming call").await?;
327 self.emit_msgs_changed(call.msg.chat_id, call_id); let has_video = match sdp_has_video(&call.place_call_info) {
329 Ok(has_video) => has_video,
330 Err(err) => {
331 warn!(self, "Failed to determine if SDP offer has video: {err:#}.");
332 false
333 }
334 };
335 self.emit_event(EventType::IncomingCall {
336 msg_id: call.msg.id,
337 chat_id: call.msg.chat_id,
338 place_call_info: call.place_call_info.to_string(),
339 has_video,
340 });
341 let wait = call.remaining_ring_seconds();
342 task::spawn(Context::emit_end_call_if_unaccepted(
343 self.clone(),
344 wait.try_into()?,
345 call.msg.id,
346 ));
347 }
348 } else {
349 call.update_text(self, "Outgoing call").await?;
350 self.emit_msgs_changed(call.msg.chat_id, call_id);
351 }
352 } else {
353 match mime_message.is_system_message {
354 SystemMessage::CallAccepted => {
355 let mut call = self.load_call_by_id(call_id).await?;
356 if call.is_ended() || call.is_accepted() {
357 info!(self, "CallAccepted received for accepted/ended call");
358 return Ok(());
359 }
360
361 call.mark_as_accepted(self).await?;
362 self.emit_msgs_changed(call.msg.chat_id, call_id);
363 if call.is_incoming() {
364 self.emit_event(EventType::IncomingCallAccepted {
365 msg_id: call.msg.id,
366 chat_id: call.msg.chat_id,
367 });
368 } else {
369 let accept_call_info = mime_message
370 .get_header(HeaderDef::ChatWebrtcAccepted)
371 .unwrap_or_default();
372 self.emit_event(EventType::OutgoingCallAccepted {
373 msg_id: call.msg.id,
374 chat_id: call.msg.chat_id,
375 accept_call_info: accept_call_info.to_string(),
376 });
377 }
378 }
379 SystemMessage::CallEnded => {
380 let mut call = self.load_call_by_id(call_id).await?;
381 if call.is_ended() {
382 info!(self, "CallEnded received for ended call");
384 return Ok(());
385 }
386
387 if !call.is_accepted() {
388 if call.is_incoming() {
389 if from_id == ContactId::SELF {
390 call.mark_as_ended(self).await?;
391 call.update_text(self, "Declined call").await?;
392 } else {
393 call.mark_as_canceled(self).await?;
394 call.update_text(self, "Missed call").await?;
395 }
396 } else {
397 if from_id == ContactId::SELF {
399 call.mark_as_canceled(self).await?;
400 call.update_text(self, "Canceled call").await?;
401 } else {
402 call.mark_as_ended(self).await?;
403 call.update_text(self, "Declined call").await?;
404 }
405 }
406 } else {
407 call.mark_as_ended(self).await?;
408 call.update_text_duration(self).await?;
409 }
410
411 self.emit_msgs_changed(call.msg.chat_id, call_id);
412 self.emit_event(EventType::CallEnded {
413 msg_id: call.msg.id,
414 chat_id: call.msg.chat_id,
415 });
416 }
417 _ => {}
418 }
419 }
420 Ok(())
421 }
422
423 pub async fn load_call_by_id(&self, call_id: MsgId) -> Result<CallInfo> {
425 let call = Message::load_from_db(self, call_id).await?;
426 self.load_call_by_message(call)
427 }
428
429 fn load_call_by_message(&self, call: Message) -> Result<CallInfo> {
430 ensure!(call.viewtype == Viewtype::Call);
431
432 Ok(CallInfo {
433 place_call_info: call
434 .param
435 .get(Param::WebrtcRoom)
436 .unwrap_or_default()
437 .to_string(),
438 accept_call_info: call
439 .param
440 .get(Param::WebrtcAccepted)
441 .unwrap_or_default()
442 .to_string(),
443 msg: call,
444 })
445 }
446}
447
448pub fn sdp_has_video(sdp: &str) -> Result<bool> {
450 let mut cursor = Cursor::new(sdp);
451 let session_description =
452 SessionDescription::unmarshal(&mut cursor).context("Failed to parse SDP")?;
453 for media_description in &session_description.media_descriptions {
454 if media_description.media_name.media == "video" {
455 return Ok(true);
456 }
457 }
458 Ok(false)
459}
460
461#[derive(Debug, PartialEq, Eq)]
463pub enum CallState {
464 Alerting,
470
471 Active,
473
474 Completed {
477 duration: i64,
479 },
480
481 Missed,
484
485 Declined,
489
490 Canceled,
497}
498
499pub async fn call_state(context: &Context, msg_id: MsgId) -> Result<CallState> {
501 let call = context.load_call_by_id(msg_id).await?;
502 let state = if call.is_incoming() {
503 if call.is_accepted() {
504 if call.is_ended() {
505 CallState::Completed {
506 duration: call.duration_seconds(),
507 }
508 } else {
509 CallState::Active
510 }
511 } else if call.is_canceled() {
512 CallState::Missed
515 } else if call.is_ended() {
516 CallState::Declined
517 } else if call.is_stale() {
518 CallState::Missed
519 } else {
520 CallState::Alerting
521 }
522 } else if call.is_accepted() {
523 if call.is_ended() {
524 CallState::Completed {
525 duration: call.duration_seconds(),
526 }
527 } else {
528 CallState::Active
529 }
530 } else if call.is_canceled() {
531 CallState::Canceled
532 } else if call.is_ended() || call.is_stale() {
533 CallState::Declined
534 } else {
535 CallState::Alerting
536 };
537 Ok(state)
538}
539
540#[derive(Serialize, Debug, Clone, PartialEq)]
542struct IceServer {
543 pub urls: Vec<String>,
545
546 pub username: Option<String>,
548
549 pub credential: Option<String>,
551}
552
553async fn create_ice_servers(
555 context: &Context,
556 hostname: &str,
557 port: u16,
558 username: &str,
559 password: &str,
560) -> Result<String> {
561 let load_cache = false;
563 let urls: Vec<String> = lookup_host_with_cache(context, hostname, port, "", load_cache)
564 .await?
565 .into_iter()
566 .map(|addr| format!("turn:{addr}"))
567 .collect();
568
569 let ice_server = IceServer {
570 urls,
571 username: Some(username.to_string()),
572 credential: Some(password.to_string()),
573 };
574
575 let json = serde_json::to_string(&[ice_server])?;
576 Ok(json)
577}
578
579pub(crate) async fn create_ice_servers_from_metadata(
589 context: &Context,
590 metadata: &str,
591) -> Result<(i64, String)> {
592 let (hostname, rest) = metadata.split_once(':').context("Missing hostname")?;
593 let (port, rest) = rest.split_once(':').context("Missing port")?;
594 let port = u16::from_str(port).context("Failed to parse the port")?;
595 let (ts, password) = rest.split_once(':').context("Missing timestamp")?;
596 let expiration_timestamp = i64::from_str(ts).context("Failed to parse the timestamp")?;
597 let ice_servers = create_ice_servers(context, hostname, port, ts, password).await?;
598 Ok((expiration_timestamp, ice_servers))
599}
600
601pub(crate) async fn create_fallback_ice_servers(context: &Context) -> Result<String> {
603 let hostname = "nine.testrun.org";
612
613 let load_cache = false;
615 let urls: Vec<String> = lookup_host_with_cache(context, hostname, STUN_PORT, "", load_cache)
616 .await?
617 .into_iter()
618 .map(|addr| format!("stun:{addr}"))
619 .collect();
620
621 let ice_server = IceServer {
622 urls,
623 username: None,
624 credential: None,
625 };
626
627 let json = serde_json::to_string(&[ice_server])?;
628 Ok(json)
629}
630
631pub async fn ice_servers(context: &Context) -> Result<String> {
641 if let Some(ref metadata) = *context.metadata.read().await {
642 Ok(metadata.ice_servers.clone())
643 } else {
644 Ok("[]".to_string())
645 }
646}
647
648#[cfg(test)]
649mod calls_tests;