deltachat/
calls.rs

1//! # Handle calls.
2//!
//! Internally, calls are bound to a user-visible message initializing the call.
4//! This means, the "Call ID" is a "Message ID" - similar to Webxdc IDs.
5use crate::chat::ChatIdBlocked;
6use crate::chat::{Chat, ChatId, send_msg};
7use crate::constants::{Blocked, Chattype};
8use crate::contact::ContactId;
9use crate::context::{Context, WeakContext};
10use crate::events::EventType;
11use crate::headerdef::HeaderDef;
12use crate::log::warn;
13use crate::message::{Message, MsgId, Viewtype};
14use crate::mimeparser::{MimeMessage, SystemMessage};
15use crate::net::dns::lookup_host_with_cache;
16use crate::param::Param;
17use crate::tools::{normalize_text, time};
18use anyhow::{Context as _, Result, ensure};
19use sdp::SessionDescription;
20use serde::Serialize;
21use std::io::Cursor;
22use std::str::FromStr;
23use std::time::Duration;
24use tokio::task;
25use tokio::time::sleep;
26
/// How long the callee's or caller's phone rings, in seconds.
///
/// For the callee, this is to prevent endless ringing
/// in case the initial "call" is received, but then the caller went offline.
/// Moreover, this prevents outdated calls from ringing
/// in case the initial "call" message arrives delayed.
///
/// For the caller, this means they should also not wait longer,
/// as the callee won't start the call afterwards.
const RINGING_SECONDS: i64 = 120;

// For persisting the call state in the call message, we use Param::Arg*.
// All of these parameters hold timestamps written with `set_i64()`.

/// Set by `CallInfo::mark_as_accepted()`; its presence means the call was accepted.
const CALL_ACCEPTED_TIMESTAMP: Param = Param::Arg;
/// Set by `CallInfo::mark_as_ended()` and `CallInfo::mark_as_canceled()`;
/// its presence means the call is over.
const CALL_ENDED_TIMESTAMP: Param = Param::Arg4;

/// Port used when resolving the fallback STUN and TURN servers.
const STUN_PORT: u16 = 3478;

/// Set by `CallInfo::mark_as_canceled()` together with `CALL_ENDED_TIMESTAMP`.
///
/// For incoming calls it is set if the call was ended explicitly
/// by the other side before we accepted it,
/// or if the ringing timeout expired without the call being accepted.
/// It is used to distinguish "ended" calls
/// that are rejected by us from the calls
/// canceled by the other side
/// immediately after ringing started.
///
/// For outgoing calls it is set if we ended the call ourselves
/// before it was accepted.
const CALL_CANCELED_TIMESTAMP: Param = Param::Arg2;
53
/// Information about the status of a call.
///
/// Instances are built from a call message by `Context::load_call_by_id()`;
/// the call state itself lives in the parameters of `msg`.
#[derive(Debug, Default)]
pub struct CallInfo {
    /// User-defined text as given to place_outgoing_call().
    ///
    /// Loaded from `Param::WebrtcRoom` of the call message,
    /// empty if the parameter is not set.
    pub place_call_info: String,

    /// User-defined text as given to accept_incoming_call().
    ///
    /// Loaded from `Param::WebrtcAccepted` of the call message,
    /// empty if the parameter is not set.
    pub accept_call_info: String,

    /// Message referring to the call.
    /// Data are persisted along with the message using Param::Arg*
    pub msg: Message,
}
67
68impl CallInfo {
69    /// Returns true if the call is an incoming call.
70    pub fn is_incoming(&self) -> bool {
71        self.msg.from_id != ContactId::SELF
72    }
73
74    /// Returns true if the call should not ring anymore.
75    pub fn is_stale(&self) -> bool {
76        (self.is_incoming() || self.msg.timestamp_sent != 0) && self.remaining_ring_seconds() <= 0
77    }
78
79    fn remaining_ring_seconds(&self) -> i64 {
80        let remaining_seconds = self.msg.timestamp_sent + RINGING_SECONDS - time();
81        remaining_seconds.clamp(0, RINGING_SECONDS)
82    }
83
84    async fn update_text(&self, context: &Context, text: &str) -> Result<()> {
85        context
86            .sql
87            .execute(
88                "UPDATE msgs SET txt=?, txt_normalized=? WHERE id=?",
89                (text, normalize_text(text), self.msg.id),
90            )
91            .await?;
92        Ok(())
93    }
94
95    async fn update_text_duration(&self, context: &Context) -> Result<()> {
96        let minutes = self.duration_seconds() / 60;
97        let duration = match minutes {
98            0 => "<1 minute".to_string(),
99            1 => "1 minute".to_string(),
100            n => format!("{n} minutes"),
101        };
102
103        if self.is_incoming() {
104            self.update_text(context, &format!("Incoming call\n{duration}"))
105                .await?;
106        } else {
107            self.update_text(context, &format!("Outgoing call\n{duration}"))
108                .await?;
109        }
110        Ok(())
111    }
112
113    /// Mark calls as accepted.
114    /// This is needed for all devices where a stale-timer runs, to prevent accepted calls being terminated as stale.
115    async fn mark_as_accepted(&mut self, context: &Context) -> Result<()> {
116        self.msg.param.set_i64(CALL_ACCEPTED_TIMESTAMP, time());
117        self.msg.update_param(context).await?;
118        Ok(())
119    }
120
121    /// Returns true if the call is accepted.
122    pub fn is_accepted(&self) -> bool {
123        self.msg.param.exists(CALL_ACCEPTED_TIMESTAMP)
124    }
125
126    /// Returns true if the call is missed
127    /// because the caller canceled it
128    /// explicitly before ringing stopped.
129    ///
130    /// For outgoing calls this means
131    /// the receiver has rejected the call
132    /// explicitly.
133    pub fn is_canceled(&self) -> bool {
134        self.msg.param.exists(CALL_CANCELED_TIMESTAMP)
135    }
136
137    async fn mark_as_ended(&mut self, context: &Context) -> Result<()> {
138        self.msg.param.set_i64(CALL_ENDED_TIMESTAMP, time());
139        self.msg.update_param(context).await?;
140        Ok(())
141    }
142
143    /// Explicitly mark the call as canceled.
144    ///
145    /// For incoming calls this should be called
146    /// when "call ended" message is received
147    /// from the caller before we picked up the call.
148    /// In this case the call becomes "missed" early
149    /// before the ringing timeout.
150    async fn mark_as_canceled(&mut self, context: &Context) -> Result<()> {
151        let now = time();
152        self.msg.param.set_i64(CALL_ENDED_TIMESTAMP, now);
153        self.msg.param.set_i64(CALL_CANCELED_TIMESTAMP, now);
154        self.msg.update_param(context).await?;
155        Ok(())
156    }
157
158    /// Returns true if the call is ended.
159    pub fn is_ended(&self) -> bool {
160        self.msg.param.exists(CALL_ENDED_TIMESTAMP)
161    }
162
163    /// Returns call duration in seconds.
164    pub fn duration_seconds(&self) -> i64 {
165        if let (Some(start), Some(end)) = (
166            self.msg.param.get_i64(CALL_ACCEPTED_TIMESTAMP),
167            self.msg.param.get_i64(CALL_ENDED_TIMESTAMP),
168        ) {
169            let seconds = end - start;
170            if seconds <= 0 {
171                return 1;
172            }
173            return seconds;
174        }
175        0
176    }
177}
178
179impl Context {
180    /// Start an outgoing call.
181    pub async fn place_outgoing_call(
182        &self,
183        chat_id: ChatId,
184        place_call_info: String,
185    ) -> Result<MsgId> {
186        let chat = Chat::load_from_db(self, chat_id).await?;
187        ensure!(
188            chat.typ == Chattype::Single,
189            "Can only place calls in 1:1 chats"
190        );
191        ensure!(!chat.is_self_talk(), "Cannot call self");
192
193        let mut call = Message {
194            viewtype: Viewtype::Call,
195            text: "Outgoing call".into(),
196            ..Default::default()
197        };
198        call.param.set(Param::WebrtcRoom, &place_call_info);
199        call.id = send_msg(self, chat_id, &mut call).await?;
200
201        let wait = RINGING_SECONDS;
202        let context = self.get_weak_context();
203        task::spawn(Context::emit_end_call_if_unaccepted(
204            context,
205            wait.try_into()?,
206            call.id,
207        ));
208
209        Ok(call.id)
210    }
211
212    /// Accept an incoming call.
213    pub async fn accept_incoming_call(
214        &self,
215        call_id: MsgId,
216        accept_call_info: String,
217    ) -> Result<()> {
218        let mut call: CallInfo = self.load_call_by_id(call_id).await?.with_context(|| {
219            format!("accept_incoming_call is called with {call_id} which does not refer to a call")
220        })?;
221        ensure!(call.is_incoming());
222        if call.is_accepted() || call.is_ended() {
223            info!(self, "Call already accepted/ended");
224            return Ok(());
225        }
226
227        call.mark_as_accepted(self).await?;
228        let chat = Chat::load_from_db(self, call.msg.chat_id).await?;
229        if chat.is_contact_request() {
230            chat.id.accept(self).await?;
231        }
232
233        // send an acceptance message around: to the caller as well as to the other devices of the callee
234        let mut msg = Message {
235            viewtype: Viewtype::Text,
236            text: "[Call accepted]".into(),
237            ..Default::default()
238        };
239        msg.param.set_cmd(SystemMessage::CallAccepted);
240        msg.hidden = true;
241        msg.param
242            .set(Param::WebrtcAccepted, accept_call_info.to_string());
243        msg.set_quote(self, Some(&call.msg)).await?;
244        msg.id = send_msg(self, call.msg.chat_id, &mut msg).await?;
245        self.emit_event(EventType::IncomingCallAccepted {
246            msg_id: call.msg.id,
247            chat_id: call.msg.chat_id,
248        });
249        self.emit_msgs_changed(call.msg.chat_id, call_id);
250        Ok(())
251    }
252
253    /// Cancel, decline or hangup an incoming or outgoing call.
254    pub async fn end_call(&self, call_id: MsgId) -> Result<()> {
255        let mut call: CallInfo = self.load_call_by_id(call_id).await?.with_context(|| {
256            format!("end_call is called with {call_id} which does not refer to a call")
257        })?;
258        if call.is_ended() {
259            info!(self, "Call already ended");
260            return Ok(());
261        }
262
263        if !call.is_accepted() {
264            if call.is_incoming() {
265                call.mark_as_ended(self).await?;
266                call.update_text(self, "Declined call").await?;
267            } else {
268                call.mark_as_canceled(self).await?;
269                call.update_text(self, "Canceled call").await?;
270            }
271        } else {
272            call.mark_as_ended(self).await?;
273            call.update_text_duration(self).await?;
274        }
275
276        let mut msg = Message {
277            viewtype: Viewtype::Text,
278            text: "[Call ended]".into(),
279            ..Default::default()
280        };
281        msg.param.set_cmd(SystemMessage::CallEnded);
282        msg.hidden = true;
283        msg.set_quote(self, Some(&call.msg)).await?;
284        msg.id = send_msg(self, call.msg.chat_id, &mut msg).await?;
285
286        self.emit_event(EventType::CallEnded {
287            msg_id: call.msg.id,
288            chat_id: call.msg.chat_id,
289        });
290        self.emit_msgs_changed(call.msg.chat_id, call_id);
291        Ok(())
292    }
293
294    async fn emit_end_call_if_unaccepted(
295        context: WeakContext,
296        wait: u64,
297        call_id: MsgId,
298    ) -> Result<()> {
299        sleep(Duration::from_secs(wait)).await;
300        let context = context.upgrade()?;
301        let Some(mut call) = context.load_call_by_id(call_id).await? else {
302            warn!(
303                context,
304                "emit_end_call_if_unaccepted is called with {call_id} which does not refer to a call."
305            );
306            return Ok(());
307        };
308        if !call.is_accepted() && !call.is_ended() {
309            if call.is_incoming() {
310                call.mark_as_canceled(&context).await?;
311                call.update_text(&context, "Missed call").await?;
312            } else {
313                call.mark_as_ended(&context).await?;
314                call.update_text(&context, "Canceled call").await?;
315            }
316            context.emit_msgs_changed(call.msg.chat_id, call_id);
317            context.emit_event(EventType::CallEnded {
318                msg_id: call.msg.id,
319                chat_id: call.msg.chat_id,
320            });
321        }
322        Ok(())
323    }
324
    /// Updates the call state according to a received call-related message.
    ///
    /// Three kinds of messages are handled:
    /// - a call message itself (`mime_message.is_call()`),
    /// - a `SystemMessage::CallAccepted` message,
    /// - a `SystemMessage::CallEnded` message.
    ///
    /// Any other message is ignored.
    ///
    /// `call_id` must refer to the call message;
    /// if it refers to anything else, a warning is logged and `Ok(())` is returned.
    /// `from_id` is the sender of `mime_message` and is used
    /// to tell apart actions of the other side
    /// from actions of ourselves (`ContactId::SELF`),
    /// presumably performed on another device of ours.
    pub(crate) async fn handle_call_msg(
        &self,
        call_id: MsgId,
        mime_message: &MimeMessage,
        from_id: ContactId,
    ) -> Result<()> {
        if mime_message.is_call() {
            let Some(call) = self.load_call_by_id(call_id).await? else {
                warn!(self, "{call_id} does not refer to a call message");
                return Ok(());
            };

            if call.is_incoming() {
                if call.is_stale() {
                    // The call arrived too late to ring.
                    // Only the text is changed here, no timestamp is stored.
                    call.update_text(self, "Missed call").await?;
                    self.emit_incoming_msg(call.msg.chat_id, call_id); // notify missed call
                } else {
                    call.update_text(self, "Incoming call").await?;
                    self.emit_msgs_changed(call.msg.chat_id, call_id); // ringing calls are not additionally notified
                    // A failure to parse the offer is not fatal: the call is treated as audio-only.
                    let has_video = match sdp_has_video(&call.place_call_info) {
                        Ok(has_video) => has_video,
                        Err(err) => {
                            warn!(self, "Failed to determine if SDP offer has video: {err:#}.");
                            false
                        }
                    };
                    if let Some(chat_id_blocked) =
                        ChatIdBlocked::lookup_by_contact(self, from_id).await?
                    {
                        match chat_id_blocked.blocked {
                            Blocked::Not => {
                                self.emit_event(EventType::IncomingCall {
                                    msg_id: call.msg.id,
                                    chat_id: call.msg.chat_id,
                                    place_call_info: call.place_call_info.to_string(),
                                    has_video,
                                });
                            }
                            Blocked::Yes | Blocked::Request => {
                                // Do not notify about incoming calls
                                // from contact requests and blocked contacts.
                                //
                                // User can still access the call and accept it
                                // via the chat in case of contact requests.
                            }
                        }
                    }
                    // The timer is started regardless of the blocked state,
                    // so that the call turns into a missed call once ringing time is over.
                    let wait = call.remaining_ring_seconds();
                    let context = self.get_weak_context();
                    task::spawn(Context::emit_end_call_if_unaccepted(
                        context,
                        wait.try_into()?,
                        call.msg.id,
                    ));
                }
            } else {
                // Call message sent by ourselves; no event and no timer are needed here.
                call.update_text(self, "Outgoing call").await?;
                self.emit_msgs_changed(call.msg.chat_id, call_id);
            }
        } else {
            match mime_message.is_system_message {
                SystemMessage::CallAccepted => {
                    let Some(mut call) = self.load_call_by_id(call_id).await? else {
                        warn!(self, "{call_id} does not refer to a call message");
                        return Ok(());
                    };

                    if call.is_ended() || call.is_accepted() {
                        info!(self, "CallAccepted received for accepted/ended call");
                        return Ok(());
                    }

                    call.mark_as_accepted(self).await?;
                    self.emit_msgs_changed(call.msg.chat_id, call_id);
                    if call.is_incoming() {
                        // An incoming call got accepted by a message we received,
                        // i.e. not by accept_incoming_call() on this device.
                        self.emit_event(EventType::IncomingCallAccepted {
                            msg_id: call.msg.id,
                            chat_id: call.msg.chat_id,
                        });
                    } else {
                        // Our outgoing call was accepted; pass the answer on to the UI.
                        let accept_call_info = mime_message
                            .get_header(HeaderDef::ChatWebrtcAccepted)
                            .unwrap_or_default();
                        self.emit_event(EventType::OutgoingCallAccepted {
                            msg_id: call.msg.id,
                            chat_id: call.msg.chat_id,
                            accept_call_info: accept_call_info.to_string(),
                        });
                    }
                }
                SystemMessage::CallEnded => {
                    let Some(mut call) = self.load_call_by_id(call_id).await? else {
                        warn!(self, "{call_id} does not refer to a call message");
                        return Ok(());
                    };

                    if call.is_ended() {
                        // may happen e.g. if a message is missed
                        info!(self, "CallEnded received for ended call");
                        return Ok(());
                    }

                    if !call.is_accepted() {
                        // The call never became active:
                        // who ended it decides between declined, missed and canceled.
                        if call.is_incoming() {
                            if from_id == ContactId::SELF {
                                call.mark_as_ended(self).await?;
                                call.update_text(self, "Declined call").await?;
                            } else {
                                call.mark_as_canceled(self).await?;
                                call.update_text(self, "Missed call").await?;
                            }
                        } else {
                            // outgoing
                            if from_id == ContactId::SELF {
                                call.mark_as_canceled(self).await?;
                                call.update_text(self, "Canceled call").await?;
                            } else {
                                call.mark_as_ended(self).await?;
                                call.update_text(self, "Declined call").await?;
                            }
                        }
                    } else {
                        // Active call was hung up: show its duration.
                        call.mark_as_ended(self).await?;
                        call.update_text_duration(self).await?;
                    }

                    self.emit_msgs_changed(call.msg.chat_id, call_id);
                    self.emit_event(EventType::CallEnded {
                        msg_id: call.msg.id,
                        chat_id: call.msg.chat_id,
                    });
                }
                _ => {}
            }
        }
        Ok(())
    }
462
463    /// Loads information about the call given its ID.
464    ///
465    /// If the message referred to by ID is
466    /// not a call message, returns `None`.
467    pub async fn load_call_by_id(&self, call_id: MsgId) -> Result<Option<CallInfo>> {
468        let call = Message::load_from_db(self, call_id).await?;
469        Ok(self.load_call_by_message(call))
470    }
471
472    // Loads information about the call given the `Message`.
473    //
474    // If the `Message` is not a call message, returns `None`
475    fn load_call_by_message(&self, call: Message) -> Option<CallInfo> {
476        if call.viewtype != Viewtype::Call {
477            // This can happen e.g. if a "call accepted"
478            // or "call ended" message is received
479            // with `In-Reply-To` referring to non-call message.
480            return None;
481        }
482
483        Some(CallInfo {
484            place_call_info: call
485                .param
486                .get(Param::WebrtcRoom)
487                .unwrap_or_default()
488                .to_string(),
489            accept_call_info: call
490                .param
491                .get(Param::WebrtcAccepted)
492                .unwrap_or_default()
493                .to_string(),
494            msg: call,
495        })
496    }
497}
498
499/// Returns true if SDP offer has a video.
500pub fn sdp_has_video(sdp: &str) -> Result<bool> {
501    let mut cursor = Cursor::new(sdp);
502    let session_description =
503        SessionDescription::unmarshal(&mut cursor).context("Failed to parse SDP")?;
504    for media_description in &session_description.media_descriptions {
505        if media_description.media_name.media == "video" {
506            return Ok(true);
507        }
508    }
509    Ok(false)
510}
511
/// State of the call for display in the message bubble.
///
/// Computed by `call_state()` from the parameters persisted in the call message.
#[derive(Debug, PartialEq, Eq)]
pub enum CallState {
    /// Fresh incoming or outgoing call that is still ringing.
    ///
    /// There is no separate state for outgoing call
    /// that has been dialled but not ringing on the other side yet
    /// as we don't know whether the other side received our call.
    Alerting,

    /// Active call: accepted and not ended yet.
    Active,

    /// Completed call that was once active
    /// and then was terminated for any reason.
    Completed {
        /// Call duration in seconds.
        duration: i64,
    },

    /// Incoming call that was not picked up within a timeout
    /// or was explicitly ended by the caller before we picked up.
    Missed,

    /// Incoming call that was explicitly ended on our side
    /// before picking up or outgoing call
    /// that was declined before the timeout.
    ///
    /// NOTE(review): `call_state()` also reports this state
    /// for outgoing calls that ended or went stale
    /// without being accepted or marked as canceled,
    /// i.e. also for calls nobody answered within the ringing timeout.
    Declined,

    /// Outgoing call that has been canceled on our side
    /// before receiving a response.
    ///
    /// Incoming calls cannot be canceled,
    /// on the receiver side canceled calls
    /// usually result in missed calls.
    Canceled,
}
549
550/// Returns call state given the message ID.
551///
552/// Returns an error if the message is not a call message.
553pub async fn call_state(context: &Context, msg_id: MsgId) -> Result<CallState> {
554    let call = context
555        .load_call_by_id(msg_id)
556        .await?
557        .with_context(|| format!("{msg_id} is not a call message"))?;
558    let state = if call.is_incoming() {
559        if call.is_accepted() {
560            if call.is_ended() {
561                CallState::Completed {
562                    duration: call.duration_seconds(),
563                }
564            } else {
565                CallState::Active
566            }
567        } else if call.is_canceled() {
568            // Call was explicitly canceled
569            // by the caller before we picked it up.
570            CallState::Missed
571        } else if call.is_ended() {
572            CallState::Declined
573        } else if call.is_stale() {
574            CallState::Missed
575        } else {
576            CallState::Alerting
577        }
578    } else if call.is_accepted() {
579        if call.is_ended() {
580            CallState::Completed {
581                duration: call.duration_seconds(),
582            }
583        } else {
584            CallState::Active
585        }
586    } else if call.is_canceled() {
587        CallState::Canceled
588    } else if call.is_ended() || call.is_stale() {
589        CallState::Declined
590    } else {
591        CallState::Alerting
592    };
593    Ok(state)
594}
595
/// ICE server for JSON serialization.
///
/// The functions in this module serialize arrays of this structure
/// with `serde_json` to build the ICE server list.
/// The field names are part of the JSON output.
#[derive(Serialize, Debug, Clone, PartialEq)]
struct IceServer {
    /// STUN or TURN URLs.
    ///
    /// Built in this module as `stun:<address>` or `turn:<address>`.
    pub urls: Vec<String>,

    /// Username for TURN server authentication.
    ///
    /// `None` for the fallback STUN server.
    pub username: Option<String>,

    /// Password for logging into the server.
    ///
    /// `None` for the fallback STUN server.
    pub credential: Option<String>,
}
608
609/// Creates JSON with ICE servers.
610async fn create_ice_servers(
611    context: &Context,
612    hostname: &str,
613    port: u16,
614    username: &str,
615    password: &str,
616) -> Result<String> {
617    // Do not use cache because there is no TLS.
618    let load_cache = false;
619    let urls: Vec<String> = lookup_host_with_cache(context, hostname, port, "", load_cache)
620        .await?
621        .into_iter()
622        .map(|addr| format!("turn:{addr}"))
623        .collect();
624
625    let ice_server = IceServer {
626        urls,
627        username: Some(username.to_string()),
628        credential: Some(password.to_string()),
629    };
630
631    let json = serde_json::to_string(&[ice_server])?;
632    Ok(json)
633}
634
635/// Creates JSON with ICE servers from a line received over IMAP METADATA.
636///
637/// IMAP METADATA returns a line such as
638/// `example.com:3478:1758650868:8Dqkyyu11MVESBqjbIylmB06rv8=`
639///
640/// 1758650868 is the username and expiration timestamp
641/// at the same time,
642/// while `8Dqkyyu11MVESBqjbIylmB06rv8=`
643/// is the password.
644pub(crate) async fn create_ice_servers_from_metadata(
645    context: &Context,
646    metadata: &str,
647) -> Result<(i64, String)> {
648    let (hostname, rest) = metadata.split_once(':').context("Missing hostname")?;
649    let (port, rest) = rest.split_once(':').context("Missing port")?;
650    let port = u16::from_str(port).context("Failed to parse the port")?;
651    let (ts, password) = rest.split_once(':').context("Missing timestamp")?;
652    let expiration_timestamp = i64::from_str(ts).context("Failed to parse the timestamp")?;
653    let ice_servers = create_ice_servers(context, hostname, port, ts, password).await?;
654    Ok((expiration_timestamp, ice_servers))
655}
656
657/// Creates JSON with ICE servers when no TURN servers are known.
658pub(crate) async fn create_fallback_ice_servers(context: &Context) -> Result<String> {
659    // Do not use public STUN server from https://stunprotocol.org/.
660    // It changes the hostname every year
661    // (e.g. stunserver2025.stunprotocol.org
662    // which was previously stunserver2024.stunprotocol.org)
663    // because of bandwidth costs:
664    // <https://github.com/jselbie/stunserver/issues/50>
665
666    let hostname = "nine.testrun.org";
667    // Do not use cache because there is no TLS.
668    let load_cache = false;
669    let urls: Vec<String> = lookup_host_with_cache(context, hostname, STUN_PORT, "", load_cache)
670        .await?
671        .into_iter()
672        .map(|addr| format!("stun:{addr}"))
673        .collect();
674    let stun_server = IceServer {
675        urls,
676        username: None,
677        credential: None,
678    };
679
680    let hostname = "turn.delta.chat";
681    // Do not use cache because there is no TLS.
682    let load_cache = false;
683    let urls: Vec<String> = lookup_host_with_cache(context, hostname, STUN_PORT, "", load_cache)
684        .await?
685        .into_iter()
686        .map(|addr| format!("turn:{addr}"))
687        .collect();
688    let turn_server = IceServer {
689        urls,
690        username: Some("public".to_string()),
691        credential: Some("o4tR7yG4rG2slhXqRUf9zgmHz".to_string()),
692    };
693
694    let json = serde_json::to_string(&[stun_server, turn_server])?;
695    Ok(json)
696}
697
698/// Returns JSON with ICE servers.
699///
700/// <https://developer.mozilla.org/en-US/docs/Web/API/RTCPeerConnection/RTCPeerConnection#iceservers>
701///
702/// All returned servers are resolved to their IP addresses.
703/// The primary point of DNS lookup is that Delta Chat Desktop
704/// relies on the servers being specified by IP,
705/// because it itself cannot utilize DNS. See
706/// <https://github.com/deltachat/deltachat-desktop/issues/5447>.
707pub async fn ice_servers(context: &Context) -> Result<String> {
708    if let Some(ref metadata) = *context.metadata.read().await {
709        Ok(metadata.ice_servers.clone())
710    } else {
711        Ok("[]".to_string())
712    }
713}
714
715#[cfg(test)]
716mod calls_tests;