1use crate::chat::{Chat, ChatId, send_msg};
6use crate::constants::Chattype;
7use crate::contact::ContactId;
8use crate::context::Context;
9use crate::events::EventType;
10use crate::headerdef::HeaderDef;
11use crate::log::info;
12use crate::message::{self, Message, MsgId, Viewtype};
13use crate::mimeparser::{MimeMessage, SystemMessage};
14use crate::param::Param;
15use crate::tools::time;
16use anyhow::{Result, ensure};
17use std::time::Duration;
18use tokio::task;
19use tokio::time::sleep;
20
21const RINGING_SECONDS: i64 = 60;
31
32const CALL_ACCEPTED_TIMESTAMP: Param = Param::Arg;
34const CALL_ENDED_TIMESTAMP: Param = Param::Arg4;
35
36#[derive(Debug, Default)]
38pub struct CallInfo {
39 pub place_call_info: String,
41
42 pub accept_call_info: String,
44
45 pub msg: Message,
48}
49
50impl CallInfo {
51 fn is_incoming(&self) -> bool {
52 self.msg.from_id != ContactId::SELF
53 }
54
55 fn is_stale(&self) -> bool {
56 self.remaining_ring_seconds() <= 0
57 }
58
59 fn remaining_ring_seconds(&self) -> i64 {
60 let remaining_seconds = self.msg.timestamp_sent + RINGING_SECONDS - time();
61 remaining_seconds.clamp(0, RINGING_SECONDS)
62 }
63
64 async fn update_text(&self, context: &Context, text: &str) -> Result<()> {
65 context
66 .sql
67 .execute(
68 "UPDATE msgs SET txt=?, txt_normalized=? WHERE id=?",
69 (text, message::normalize_text(text), self.msg.id),
70 )
71 .await?;
72 Ok(())
73 }
74
75 async fn update_text_duration(&self, context: &Context) -> Result<()> {
76 let minutes = self.get_duration_seconds() / 60;
77 let duration = match minutes {
78 0 => "<1 minute".to_string(),
79 1 => "1 minute".to_string(),
80 n => format!("{} minutes", n),
81 };
82
83 if self.is_incoming() {
84 self.update_text(context, &format!("Incoming call\n{duration}"))
85 .await?;
86 } else {
87 self.update_text(context, &format!("Outgoing call\n{duration}"))
88 .await?;
89 }
90 Ok(())
91 }
92
93 async fn mark_as_accepted(&mut self, context: &Context) -> Result<()> {
96 self.msg.param.set_i64(CALL_ACCEPTED_TIMESTAMP, time());
97 self.msg.update_param(context).await?;
98 Ok(())
99 }
100
101 fn is_accepted(&self) -> bool {
102 self.msg.param.exists(CALL_ACCEPTED_TIMESTAMP)
103 }
104
105 async fn mark_as_ended(&mut self, context: &Context) -> Result<()> {
106 self.msg.param.set_i64(CALL_ENDED_TIMESTAMP, time());
107 self.msg.update_param(context).await?;
108 Ok(())
109 }
110
111 fn is_ended(&self) -> bool {
112 self.msg.param.exists(CALL_ENDED_TIMESTAMP)
113 }
114
115 fn get_duration_seconds(&self) -> i64 {
116 if let (Some(start), Some(end)) = (
117 self.msg.param.get_i64(CALL_ACCEPTED_TIMESTAMP),
118 self.msg.param.get_i64(CALL_ENDED_TIMESTAMP),
119 ) {
120 let seconds = end - start;
121 if seconds <= 0 {
122 return 1;
123 }
124 return seconds;
125 }
126 0
127 }
128}
129
130impl Context {
131 pub async fn place_outgoing_call(
133 &self,
134 chat_id: ChatId,
135 place_call_info: String,
136 ) -> Result<MsgId> {
137 let chat = Chat::load_from_db(self, chat_id).await?;
138 ensure!(chat.typ == Chattype::Single && !chat.is_self_talk());
139
140 let mut call = Message {
141 viewtype: Viewtype::Call,
142 text: "Outgoing call".into(),
143 ..Default::default()
144 };
145 call.param.set(Param::WebrtcRoom, &place_call_info);
146 call.id = send_msg(self, chat_id, &mut call).await?;
147
148 let wait = RINGING_SECONDS;
149 task::spawn(Context::emit_end_call_if_unaccepted(
150 self.clone(),
151 wait.try_into()?,
152 call.id,
153 ));
154
155 Ok(call.id)
156 }
157
158 pub async fn accept_incoming_call(
160 &self,
161 call_id: MsgId,
162 accept_call_info: String,
163 ) -> Result<()> {
164 let mut call: CallInfo = self.load_call_by_id(call_id).await?;
165 ensure!(call.is_incoming());
166 if call.is_accepted() || call.is_ended() {
167 info!(self, "Call already accepted/ended");
168 return Ok(());
169 }
170
171 call.mark_as_accepted(self).await?;
172 let chat = Chat::load_from_db(self, call.msg.chat_id).await?;
173 if chat.is_contact_request() {
174 chat.id.accept(self).await?;
175 }
176
177 let mut msg = Message {
179 viewtype: Viewtype::Text,
180 text: "[Call accepted]".into(),
181 ..Default::default()
182 };
183 msg.param.set_cmd(SystemMessage::CallAccepted);
184 msg.hidden = true;
185 msg.param
186 .set(Param::WebrtcAccepted, accept_call_info.to_string());
187 msg.set_quote(self, Some(&call.msg)).await?;
188 msg.id = send_msg(self, call.msg.chat_id, &mut msg).await?;
189 self.emit_event(EventType::IncomingCallAccepted {
190 msg_id: call.msg.id,
191 });
192 self.emit_msgs_changed(call.msg.chat_id, call_id);
193 Ok(())
194 }
195
196 pub async fn end_call(&self, call_id: MsgId) -> Result<()> {
198 let mut call: CallInfo = self.load_call_by_id(call_id).await?;
199 if call.is_ended() {
200 info!(self, "Call already ended");
201 return Ok(());
202 }
203 call.mark_as_ended(self).await?;
204
205 if !call.is_accepted() {
206 if call.is_incoming() {
207 call.update_text(self, "Declined call").await?;
208 } else {
209 call.update_text(self, "Cancelled call").await?;
210 }
211 } else {
212 call.update_text_duration(self).await?;
213 }
214
215 let mut msg = Message {
216 viewtype: Viewtype::Text,
217 text: "[Call ended]".into(),
218 ..Default::default()
219 };
220 msg.param.set_cmd(SystemMessage::CallEnded);
221 msg.hidden = true;
222 msg.set_quote(self, Some(&call.msg)).await?;
223 msg.id = send_msg(self, call.msg.chat_id, &mut msg).await?;
224
225 self.emit_event(EventType::CallEnded {
226 msg_id: call.msg.id,
227 });
228 self.emit_msgs_changed(call.msg.chat_id, call_id);
229 Ok(())
230 }
231
232 async fn emit_end_call_if_unaccepted(
233 context: Context,
234 wait: u64,
235 call_id: MsgId,
236 ) -> Result<()> {
237 sleep(Duration::from_secs(wait)).await;
238 let mut call = context.load_call_by_id(call_id).await?;
239 if !call.is_accepted() && !call.is_ended() {
240 call.mark_as_ended(&context).await?;
241 if call.is_incoming() {
242 call.update_text(&context, "Missed call").await?;
243 } else {
244 call.update_text(&context, "Cancelled call").await?;
245 }
246 context.emit_msgs_changed(call.msg.chat_id, call_id);
247 context.emit_event(EventType::CallEnded {
248 msg_id: call.msg.id,
249 });
250 }
251 Ok(())
252 }
253
254 pub(crate) async fn handle_call_msg(
255 &self,
256 call_id: MsgId,
257 mime_message: &MimeMessage,
258 from_id: ContactId,
259 ) -> Result<()> {
260 if mime_message.is_call() {
261 let call = self.load_call_by_id(call_id).await?;
262 if call.is_incoming() {
263 if call.is_stale() {
264 call.update_text(self, "Missed call").await?;
265 self.emit_incoming_msg(call.msg.chat_id, call_id); } else {
267 call.update_text(self, "Incoming call").await?;
268 self.emit_msgs_changed(call.msg.chat_id, call_id); self.emit_event(EventType::IncomingCall {
270 msg_id: call.msg.id,
271 place_call_info: call.place_call_info.to_string(),
272 });
273 let wait = call.remaining_ring_seconds();
274 task::spawn(Context::emit_end_call_if_unaccepted(
275 self.clone(),
276 wait.try_into()?,
277 call.msg.id,
278 ));
279 }
280 } else {
281 call.update_text(self, "Outgoing call").await?;
282 self.emit_msgs_changed(call.msg.chat_id, call_id);
283 }
284 } else {
285 match mime_message.is_system_message {
286 SystemMessage::CallAccepted => {
287 let mut call = self.load_call_by_id(call_id).await?;
288 if call.is_ended() || call.is_accepted() {
289 info!(self, "CallAccepted received for accepted/ended call");
290 return Ok(());
291 }
292
293 call.mark_as_accepted(self).await?;
294 self.emit_msgs_changed(call.msg.chat_id, call_id);
295 if call.is_incoming() {
296 self.emit_event(EventType::IncomingCallAccepted {
297 msg_id: call.msg.id,
298 });
299 } else {
300 let accept_call_info = mime_message
301 .get_header(HeaderDef::ChatWebrtcAccepted)
302 .unwrap_or_default();
303 self.emit_event(EventType::OutgoingCallAccepted {
304 msg_id: call.msg.id,
305 accept_call_info: accept_call_info.to_string(),
306 });
307 }
308 }
309 SystemMessage::CallEnded => {
310 let mut call = self.load_call_by_id(call_id).await?;
311 if call.is_ended() {
312 info!(self, "CallEnded received for ended call");
314 return Ok(());
315 }
316
317 call.mark_as_ended(self).await?;
318 if !call.is_accepted() {
319 if call.is_incoming() {
320 if from_id == ContactId::SELF {
321 call.update_text(self, "Declined call").await?;
322 } else {
323 call.update_text(self, "Missed call").await?;
324 }
325 } else {
326 if from_id == ContactId::SELF {
328 call.update_text(self, "Cancelled call").await?;
329 } else {
330 call.update_text(self, "Declined call").await?;
331 }
332 }
333 } else {
334 call.update_text_duration(self).await?;
335 }
336
337 self.emit_msgs_changed(call.msg.chat_id, call_id);
338 self.emit_event(EventType::CallEnded {
339 msg_id: call.msg.id,
340 });
341 }
342 _ => {}
343 }
344 }
345 Ok(())
346 }
347
348 async fn load_call_by_id(&self, call_id: MsgId) -> Result<CallInfo> {
349 let call = Message::load_from_db(self, call_id).await?;
350 self.load_call_by_message(call)
351 }
352
353 fn load_call_by_message(&self, call: Message) -> Result<CallInfo> {
354 ensure!(call.viewtype == Viewtype::Call);
355
356 Ok(CallInfo {
357 place_call_info: call
358 .param
359 .get(Param::WebrtcRoom)
360 .unwrap_or_default()
361 .to_string(),
362 accept_call_info: call
363 .param
364 .get(Param::WebrtcAccepted)
365 .unwrap_or_default()
366 .to_string(),
367 msg: call,
368 })
369 }
370}
371
372#[cfg(test)]
373mod calls_tests;