deltachat/
calls.rs

//! # Handle calls.
//!
//! Internally, calls are bound to a user-visible message initializing the call.
//! This means, the "Call ID" is a "Message ID" - similar to Webxdc IDs.
5use crate::chat::ChatIdBlocked;
6use crate::chat::{Chat, ChatId, send_msg};
7use crate::constants::{Blocked, Chattype};
8use crate::contact::ContactId;
9use crate::context::Context;
10use crate::events::EventType;
11use crate::headerdef::HeaderDef;
12use crate::log::{info, warn};
13use crate::message::{self, Message, MsgId, Viewtype};
14use crate::mimeparser::{MimeMessage, SystemMessage};
15use crate::net::dns::lookup_host_with_cache;
16use crate::param::Param;
17use crate::tools::time;
18use anyhow::{Context as _, Result, ensure};
19use sdp::SessionDescription;
20use serde::Serialize;
21use std::io::Cursor;
22use std::str::FromStr;
23use std::time::Duration;
24use tokio::task;
25use tokio::time::sleep;
26
/// How long the callee's or caller's phone rings, in seconds.
///
/// For the callee, this is to prevent endless ringing
/// in case the initial "call" is received, but then the caller went offline.
/// Moreover, this prevents outdated calls from ringing
/// in case the initial "call" message arrives delayed.
///
/// For the caller, this means they should also not wait longer,
/// as the callee won't start the call afterwards.
const RINGING_SECONDS: i64 = 60;

// For persisting parameters in the call, we use Param::Arg*

/// Parameter of the call message holding the timestamp
/// at which the call was marked as accepted.
///
/// The mere presence of this parameter marks the call as accepted.
const CALL_ACCEPTED_TIMESTAMP: Param = Param::Arg;

/// Parameter of the call message holding the timestamp
/// at which the call was marked as ended.
///
/// The mere presence of this parameter marks the call as ended.
/// It is also set whenever the call is marked as canceled.
const CALL_ENDED_TIMESTAMP: Param = Param::Arg4;

/// Port used when resolving the fallback STUN server.
const STUN_PORT: u16 = 3478;

/// Set if incoming call was ended explicitly
/// by the other side before we accepted it.
///
/// It is used to distinguish "ended" calls
/// that are rejected by us from the calls
/// canceled by the other side
/// immediately after ringing started.
///
/// It is also set when an incoming call
/// is not accepted within the ringing timeout
/// and when an outgoing call is canceled on our side
/// before it was accepted.
const CALL_CANCELED_TIMESTAMP: Param = Param::Arg2;
53
/// Information about the status of a call.
///
/// The call state itself (accepted, ended, canceled)
/// is not stored in separate fields,
/// but in the parameters of `msg`.
#[derive(Debug, Default)]
pub struct CallInfo {
    /// User-defined text as given to place_outgoing_call().
    ///
    /// Loaded from `Param::WebrtcRoom` of the call message,
    /// empty if the parameter is not set.
    pub place_call_info: String,

    /// User-defined text as given to accept_incoming_call().
    ///
    /// Loaded from `Param::WebrtcAccepted` of the call message,
    /// empty if the parameter is not set.
    pub accept_call_info: String,

    /// Message referring to the call.
    /// Data are persisted along with the message using Param::Arg*
    pub msg: Message,
}
67
68impl CallInfo {
69    /// Returns true if the call is an incoming call.
70    pub fn is_incoming(&self) -> bool {
71        self.msg.from_id != ContactId::SELF
72    }
73
74    /// Returns true if the call should not ring anymore.
75    pub fn is_stale(&self) -> bool {
76        (self.is_incoming() || self.msg.timestamp_sent != 0) && self.remaining_ring_seconds() <= 0
77    }
78
79    fn remaining_ring_seconds(&self) -> i64 {
80        let remaining_seconds = self.msg.timestamp_sent + RINGING_SECONDS - time();
81        remaining_seconds.clamp(0, RINGING_SECONDS)
82    }
83
84    async fn update_text(&self, context: &Context, text: &str) -> Result<()> {
85        context
86            .sql
87            .execute(
88                "UPDATE msgs SET txt=?, txt_normalized=? WHERE id=?",
89                (text, message::normalize_text(text), self.msg.id),
90            )
91            .await?;
92        Ok(())
93    }
94
95    async fn update_text_duration(&self, context: &Context) -> Result<()> {
96        let minutes = self.duration_seconds() / 60;
97        let duration = match minutes {
98            0 => "<1 minute".to_string(),
99            1 => "1 minute".to_string(),
100            n => format!("{n} minutes"),
101        };
102
103        if self.is_incoming() {
104            self.update_text(context, &format!("Incoming call\n{duration}"))
105                .await?;
106        } else {
107            self.update_text(context, &format!("Outgoing call\n{duration}"))
108                .await?;
109        }
110        Ok(())
111    }
112
113    /// Mark calls as accepted.
114    /// This is needed for all devices where a stale-timer runs, to prevent accepted calls being terminated as stale.
115    async fn mark_as_accepted(&mut self, context: &Context) -> Result<()> {
116        self.msg.param.set_i64(CALL_ACCEPTED_TIMESTAMP, time());
117        self.msg.update_param(context).await?;
118        Ok(())
119    }
120
121    /// Returns true if the call is accepted.
122    pub fn is_accepted(&self) -> bool {
123        self.msg.param.exists(CALL_ACCEPTED_TIMESTAMP)
124    }
125
126    /// Returns true if the call is missed
127    /// because the caller canceled it
128    /// explicitly before ringing stopped.
129    ///
130    /// For outgoing calls this means
131    /// the receiver has rejected the call
132    /// explicitly.
133    pub fn is_canceled(&self) -> bool {
134        self.msg.param.exists(CALL_CANCELED_TIMESTAMP)
135    }
136
137    async fn mark_as_ended(&mut self, context: &Context) -> Result<()> {
138        self.msg.param.set_i64(CALL_ENDED_TIMESTAMP, time());
139        self.msg.update_param(context).await?;
140        Ok(())
141    }
142
143    /// Explicitly mark the call as canceled.
144    ///
145    /// For incoming calls this should be called
146    /// when "call ended" message is received
147    /// from the caller before we picked up the call.
148    /// In this case the call becomes "missed" early
149    /// before the ringing timeout.
150    async fn mark_as_canceled(&mut self, context: &Context) -> Result<()> {
151        let now = time();
152        self.msg.param.set_i64(CALL_ENDED_TIMESTAMP, now);
153        self.msg.param.set_i64(CALL_CANCELED_TIMESTAMP, now);
154        self.msg.update_param(context).await?;
155        Ok(())
156    }
157
158    /// Returns true if the call is ended.
159    pub fn is_ended(&self) -> bool {
160        self.msg.param.exists(CALL_ENDED_TIMESTAMP)
161    }
162
163    /// Returns call duration in seconds.
164    pub fn duration_seconds(&self) -> i64 {
165        if let (Some(start), Some(end)) = (
166            self.msg.param.get_i64(CALL_ACCEPTED_TIMESTAMP),
167            self.msg.param.get_i64(CALL_ENDED_TIMESTAMP),
168        ) {
169            let seconds = end - start;
170            if seconds <= 0 {
171                return 1;
172            }
173            return seconds;
174        }
175        0
176    }
177}
178
179impl Context {
180    /// Start an outgoing call.
181    pub async fn place_outgoing_call(
182        &self,
183        chat_id: ChatId,
184        place_call_info: String,
185    ) -> Result<MsgId> {
186        let chat = Chat::load_from_db(self, chat_id).await?;
187        ensure!(
188            chat.typ == Chattype::Single,
189            "Can only place calls in 1:1 chats"
190        );
191        ensure!(!chat.is_self_talk(), "Cannot call self");
192
193        let mut call = Message {
194            viewtype: Viewtype::Call,
195            text: "Outgoing call".into(),
196            ..Default::default()
197        };
198        call.param.set(Param::WebrtcRoom, &place_call_info);
199        call.id = send_msg(self, chat_id, &mut call).await?;
200
201        let wait = RINGING_SECONDS;
202        task::spawn(Context::emit_end_call_if_unaccepted(
203            self.clone(),
204            wait.try_into()?,
205            call.id,
206        ));
207
208        Ok(call.id)
209    }
210
211    /// Accept an incoming call.
212    pub async fn accept_incoming_call(
213        &self,
214        call_id: MsgId,
215        accept_call_info: String,
216    ) -> Result<()> {
217        let mut call: CallInfo = self.load_call_by_id(call_id).await?.with_context(|| {
218            format!("accept_incoming_call is called with {call_id} which does not refer to a call")
219        })?;
220        ensure!(call.is_incoming());
221        if call.is_accepted() || call.is_ended() {
222            info!(self, "Call already accepted/ended");
223            return Ok(());
224        }
225
226        call.mark_as_accepted(self).await?;
227        let chat = Chat::load_from_db(self, call.msg.chat_id).await?;
228        if chat.is_contact_request() {
229            chat.id.accept(self).await?;
230        }
231
232        // send an acceptance message around: to the caller as well as to the other devices of the callee
233        let mut msg = Message {
234            viewtype: Viewtype::Text,
235            text: "[Call accepted]".into(),
236            ..Default::default()
237        };
238        msg.param.set_cmd(SystemMessage::CallAccepted);
239        msg.hidden = true;
240        msg.param
241            .set(Param::WebrtcAccepted, accept_call_info.to_string());
242        msg.set_quote(self, Some(&call.msg)).await?;
243        msg.id = send_msg(self, call.msg.chat_id, &mut msg).await?;
244        self.emit_event(EventType::IncomingCallAccepted {
245            msg_id: call.msg.id,
246            chat_id: call.msg.chat_id,
247        });
248        self.emit_msgs_changed(call.msg.chat_id, call_id);
249        Ok(())
250    }
251
252    /// Cancel, decline or hangup an incoming or outgoing call.
253    pub async fn end_call(&self, call_id: MsgId) -> Result<()> {
254        let mut call: CallInfo = self.load_call_by_id(call_id).await?.with_context(|| {
255            format!("end_call is called with {call_id} which does not refer to a call")
256        })?;
257        if call.is_ended() {
258            info!(self, "Call already ended");
259            return Ok(());
260        }
261
262        if !call.is_accepted() {
263            if call.is_incoming() {
264                call.mark_as_ended(self).await?;
265                call.update_text(self, "Declined call").await?;
266            } else {
267                call.mark_as_canceled(self).await?;
268                call.update_text(self, "Canceled call").await?;
269            }
270        } else {
271            call.mark_as_ended(self).await?;
272            call.update_text_duration(self).await?;
273        }
274
275        let mut msg = Message {
276            viewtype: Viewtype::Text,
277            text: "[Call ended]".into(),
278            ..Default::default()
279        };
280        msg.param.set_cmd(SystemMessage::CallEnded);
281        msg.hidden = true;
282        msg.set_quote(self, Some(&call.msg)).await?;
283        msg.id = send_msg(self, call.msg.chat_id, &mut msg).await?;
284
285        self.emit_event(EventType::CallEnded {
286            msg_id: call.msg.id,
287            chat_id: call.msg.chat_id,
288        });
289        self.emit_msgs_changed(call.msg.chat_id, call_id);
290        Ok(())
291    }
292
293    async fn emit_end_call_if_unaccepted(
294        context: Context,
295        wait: u64,
296        call_id: MsgId,
297    ) -> Result<()> {
298        sleep(Duration::from_secs(wait)).await;
299        let Some(mut call) = context.load_call_by_id(call_id).await? else {
300            warn!(
301                context,
302                "emit_end_call_if_unaccepted is called with {call_id} which does not refer to a call."
303            );
304            return Ok(());
305        };
306        if !call.is_accepted() && !call.is_ended() {
307            if call.is_incoming() {
308                call.mark_as_canceled(&context).await?;
309                call.update_text(&context, "Missed call").await?;
310            } else {
311                call.mark_as_ended(&context).await?;
312                call.update_text(&context, "Canceled call").await?;
313            }
314            context.emit_msgs_changed(call.msg.chat_id, call_id);
315            context.emit_event(EventType::CallEnded {
316                msg_id: call.msg.id,
317                chat_id: call.msg.chat_id,
318            });
319        }
320        Ok(())
321    }
322
323    pub(crate) async fn handle_call_msg(
324        &self,
325        call_id: MsgId,
326        mime_message: &MimeMessage,
327        from_id: ContactId,
328    ) -> Result<()> {
329        if mime_message.is_call() {
330            let Some(call) = self.load_call_by_id(call_id).await? else {
331                warn!(self, "{call_id} does not refer to a call message");
332                return Ok(());
333            };
334
335            if call.is_incoming() {
336                if call.is_stale() {
337                    call.update_text(self, "Missed call").await?;
338                    self.emit_incoming_msg(call.msg.chat_id, call_id); // notify missed call
339                } else {
340                    call.update_text(self, "Incoming call").await?;
341                    self.emit_msgs_changed(call.msg.chat_id, call_id); // ringing calls are not additionally notified
342                    let has_video = match sdp_has_video(&call.place_call_info) {
343                        Ok(has_video) => has_video,
344                        Err(err) => {
345                            warn!(self, "Failed to determine if SDP offer has video: {err:#}.");
346                            false
347                        }
348                    };
349                    if let Some(chat_id_blocked) =
350                        ChatIdBlocked::lookup_by_contact(self, from_id).await?
351                    {
352                        match chat_id_blocked.blocked {
353                            Blocked::Not => {
354                                self.emit_event(EventType::IncomingCall {
355                                    msg_id: call.msg.id,
356                                    chat_id: call.msg.chat_id,
357                                    place_call_info: call.place_call_info.to_string(),
358                                    has_video,
359                                });
360                            }
361                            Blocked::Yes | Blocked::Request => {
362                                // Do not notify about incoming calls
363                                // from contact requests and blocked contacts.
364                                //
365                                // User can still access the call and accept it
366                                // via the chat in case of contact requests.
367                            }
368                        }
369                    }
370                    let wait = call.remaining_ring_seconds();
371                    task::spawn(Context::emit_end_call_if_unaccepted(
372                        self.clone(),
373                        wait.try_into()?,
374                        call.msg.id,
375                    ));
376                }
377            } else {
378                call.update_text(self, "Outgoing call").await?;
379                self.emit_msgs_changed(call.msg.chat_id, call_id);
380            }
381        } else {
382            match mime_message.is_system_message {
383                SystemMessage::CallAccepted => {
384                    let Some(mut call) = self.load_call_by_id(call_id).await? else {
385                        warn!(self, "{call_id} does not refer to a call message");
386                        return Ok(());
387                    };
388
389                    if call.is_ended() || call.is_accepted() {
390                        info!(self, "CallAccepted received for accepted/ended call");
391                        return Ok(());
392                    }
393
394                    call.mark_as_accepted(self).await?;
395                    self.emit_msgs_changed(call.msg.chat_id, call_id);
396                    if call.is_incoming() {
397                        self.emit_event(EventType::IncomingCallAccepted {
398                            msg_id: call.msg.id,
399                            chat_id: call.msg.chat_id,
400                        });
401                    } else {
402                        let accept_call_info = mime_message
403                            .get_header(HeaderDef::ChatWebrtcAccepted)
404                            .unwrap_or_default();
405                        self.emit_event(EventType::OutgoingCallAccepted {
406                            msg_id: call.msg.id,
407                            chat_id: call.msg.chat_id,
408                            accept_call_info: accept_call_info.to_string(),
409                        });
410                    }
411                }
412                SystemMessage::CallEnded => {
413                    let Some(mut call) = self.load_call_by_id(call_id).await? else {
414                        warn!(self, "{call_id} does not refer to a call message");
415                        return Ok(());
416                    };
417
418                    if call.is_ended() {
419                        // may happen eg. if a a message is missed
420                        info!(self, "CallEnded received for ended call");
421                        return Ok(());
422                    }
423
424                    if !call.is_accepted() {
425                        if call.is_incoming() {
426                            if from_id == ContactId::SELF {
427                                call.mark_as_ended(self).await?;
428                                call.update_text(self, "Declined call").await?;
429                            } else {
430                                call.mark_as_canceled(self).await?;
431                                call.update_text(self, "Missed call").await?;
432                            }
433                        } else {
434                            // outgoing
435                            if from_id == ContactId::SELF {
436                                call.mark_as_canceled(self).await?;
437                                call.update_text(self, "Canceled call").await?;
438                            } else {
439                                call.mark_as_ended(self).await?;
440                                call.update_text(self, "Declined call").await?;
441                            }
442                        }
443                    } else {
444                        call.mark_as_ended(self).await?;
445                        call.update_text_duration(self).await?;
446                    }
447
448                    self.emit_msgs_changed(call.msg.chat_id, call_id);
449                    self.emit_event(EventType::CallEnded {
450                        msg_id: call.msg.id,
451                        chat_id: call.msg.chat_id,
452                    });
453                }
454                _ => {}
455            }
456        }
457        Ok(())
458    }
459
460    /// Loads information about the call given its ID.
461    ///
462    /// If the message referred to by ID is
463    /// not a call message, returns `None`.
464    pub async fn load_call_by_id(&self, call_id: MsgId) -> Result<Option<CallInfo>> {
465        let call = Message::load_from_db(self, call_id).await?;
466        Ok(self.load_call_by_message(call))
467    }
468
469    // Loads information about the call given the `Message`.
470    //
471    // If the `Message` is not a call message, returns `None`
472    fn load_call_by_message(&self, call: Message) -> Option<CallInfo> {
473        if call.viewtype != Viewtype::Call {
474            // This can happen e.g. if a "call accepted"
475            // or "call ended" message is received
476            // with `In-Reply-To` referring to non-call message.
477            return None;
478        }
479
480        Some(CallInfo {
481            place_call_info: call
482                .param
483                .get(Param::WebrtcRoom)
484                .unwrap_or_default()
485                .to_string(),
486            accept_call_info: call
487                .param
488                .get(Param::WebrtcAccepted)
489                .unwrap_or_default()
490                .to_string(),
491            msg: call,
492        })
493    }
494}
495
496/// Returns true if SDP offer has a video.
497pub fn sdp_has_video(sdp: &str) -> Result<bool> {
498    let mut cursor = Cursor::new(sdp);
499    let session_description =
500        SessionDescription::unmarshal(&mut cursor).context("Failed to parse SDP")?;
501    for media_description in &session_description.media_descriptions {
502        if media_description.media_name.media == "video" {
503            return Ok(true);
504        }
505    }
506    Ok(false)
507}
508
/// State of the call for display in the message bubble.
///
/// The state is not stored as such,
/// `call_state()` derives it from the call message.
#[derive(Debug, PartialEq, Eq)]
pub enum CallState {
    /// Fresh incoming or outgoing call that is still ringing.
    ///
    /// There is no separate state for outgoing call
    /// that has been dialled but not ringing on the other side yet
    /// as we don't know whether the other side received our call.
    Alerting,

    /// Active call.
    ///
    /// The call has been accepted and is not ended yet.
    Active,

    /// Completed call that was once active
    /// and then was terminated for any reason.
    Completed {
        /// Call duration in seconds.
        duration: i64,
    },

    /// Incoming call that was not picked up within a timeout
    /// or was explicitly ended by the caller before we picked up.
    Missed,

    /// Incoming call that was explicitly ended on our side
    /// before picking up or outgoing call
    /// that was declined before the timeout.
    ///
    /// `call_state()` also reports this state
    /// for an outgoing call that was never accepted
    /// and whose ringing time has elapsed.
    Declined,

    /// Outgoing call that has been canceled on our side
    /// before receiving a response.
    ///
    /// Incoming calls cannot be canceled,
    /// on the receiver side canceled calls
    /// usually result in missed calls.
    Canceled,
}
546
547/// Returns call state given the message ID.
548///
549/// Returns an error if the message is not a call message.
550pub async fn call_state(context: &Context, msg_id: MsgId) -> Result<CallState> {
551    let call = context
552        .load_call_by_id(msg_id)
553        .await?
554        .with_context(|| format!("{msg_id} is not a call message"))?;
555    let state = if call.is_incoming() {
556        if call.is_accepted() {
557            if call.is_ended() {
558                CallState::Completed {
559                    duration: call.duration_seconds(),
560                }
561            } else {
562                CallState::Active
563            }
564        } else if call.is_canceled() {
565            // Call was explicitly canceled
566            // by the caller before we picked it up.
567            CallState::Missed
568        } else if call.is_ended() {
569            CallState::Declined
570        } else if call.is_stale() {
571            CallState::Missed
572        } else {
573            CallState::Alerting
574        }
575    } else if call.is_accepted() {
576        if call.is_ended() {
577            CallState::Completed {
578                duration: call.duration_seconds(),
579            }
580        } else {
581            CallState::Active
582        }
583    } else if call.is_canceled() {
584        CallState::Canceled
585    } else if call.is_ended() || call.is_stale() {
586        CallState::Declined
587    } else {
588        CallState::Alerting
589    };
590    Ok(state)
591}
592
/// ICE server for JSON serialization.
///
/// The JSON handed out by this module
/// is an array of such objects.
#[derive(Serialize, Debug, Clone, PartialEq)]
struct IceServer {
    /// STUN or TURN URLs.
    ///
    /// Each URL consists of the `stun:` or `turn:` scheme
    /// followed by a resolved address of the server.
    pub urls: Vec<String>,

    /// Username for TURN server authentication.
    ///
    /// `None` for the fallback STUN server.
    pub username: Option<String>,

    /// Password for logging into the server.
    ///
    /// `None` for the fallback STUN server.
    pub credential: Option<String>,
}
605
606/// Creates JSON with ICE servers.
607async fn create_ice_servers(
608    context: &Context,
609    hostname: &str,
610    port: u16,
611    username: &str,
612    password: &str,
613) -> Result<String> {
614    // Do not use cache because there is no TLS.
615    let load_cache = false;
616    let urls: Vec<String> = lookup_host_with_cache(context, hostname, port, "", load_cache)
617        .await?
618        .into_iter()
619        .map(|addr| format!("turn:{addr}"))
620        .collect();
621
622    let ice_server = IceServer {
623        urls,
624        username: Some(username.to_string()),
625        credential: Some(password.to_string()),
626    };
627
628    let json = serde_json::to_string(&[ice_server])?;
629    Ok(json)
630}
631
632/// Creates JSON with ICE servers from a line received over IMAP METADATA.
633///
634/// IMAP METADATA returns a line such as
635/// `example.com:3478:1758650868:8Dqkyyu11MVESBqjbIylmB06rv8=`
636///
637/// 1758650868 is the username and expiration timestamp
638/// at the same time,
639/// while `8Dqkyyu11MVESBqjbIylmB06rv8=`
640/// is the password.
641pub(crate) async fn create_ice_servers_from_metadata(
642    context: &Context,
643    metadata: &str,
644) -> Result<(i64, String)> {
645    let (hostname, rest) = metadata.split_once(':').context("Missing hostname")?;
646    let (port, rest) = rest.split_once(':').context("Missing port")?;
647    let port = u16::from_str(port).context("Failed to parse the port")?;
648    let (ts, password) = rest.split_once(':').context("Missing timestamp")?;
649    let expiration_timestamp = i64::from_str(ts).context("Failed to parse the timestamp")?;
650    let ice_servers = create_ice_servers(context, hostname, port, ts, password).await?;
651    Ok((expiration_timestamp, ice_servers))
652}
653
654/// Creates JSON with ICE servers when no TURN servers are known.
655pub(crate) async fn create_fallback_ice_servers(context: &Context) -> Result<String> {
656    // Do not use public STUN server from https://stunprotocol.org/.
657    // It changes the hostname every year
658    // (e.g. stunserver2025.stunprotocol.org
659    // which was previously stunserver2024.stunprotocol.org)
660    // because of bandwidth costs:
661    // <https://github.com/jselbie/stunserver/issues/50>
662
663    // We use nine.testrun.org for a default STUN server.
664    let hostname = "nine.testrun.org";
665
666    // Do not use cache because there is no TLS.
667    let load_cache = false;
668    let urls: Vec<String> = lookup_host_with_cache(context, hostname, STUN_PORT, "", load_cache)
669        .await?
670        .into_iter()
671        .map(|addr| format!("stun:{addr}"))
672        .collect();
673
674    let ice_server = IceServer {
675        urls,
676        username: None,
677        credential: None,
678    };
679
680    let json = serde_json::to_string(&[ice_server])?;
681    Ok(json)
682}
683
684/// Returns JSON with ICE servers.
685///
686/// <https://developer.mozilla.org/en-US/docs/Web/API/RTCPeerConnection/RTCPeerConnection#iceservers>
687///
688/// All returned servers are resolved to their IP addresses.
689/// The primary point of DNS lookup is that Delta Chat Desktop
690/// relies on the servers being specified by IP,
691/// because it itself cannot utilize DNS. See
692/// <https://github.com/deltachat/deltachat-desktop/issues/5447>.
693pub async fn ice_servers(context: &Context) -> Result<String> {
694    if let Some(ref metadata) = *context.metadata.read().await {
695        Ok(metadata.ice_servers.clone())
696    } else {
697        Ok("[]".to_string())
698    }
699}
700
701#[cfg(test)]
702mod calls_tests;