1use anyhow::{Context as _, Result, anyhow, bail};
27use data_encoding::BASE32_NOPAD;
28use futures_lite::StreamExt;
29use iroh::{Endpoint, NodeAddr, NodeId, PublicKey, RelayMode, RelayUrl, SecretKey};
30use iroh_gossip::net::{Event, GOSSIP_ALPN, Gossip, GossipEvent, JoinOptions};
31use iroh_gossip::proto::TopicId;
32use parking_lot::Mutex;
33use std::collections::{BTreeSet, HashMap};
34use std::env;
35use tokio::sync::{RwLock, oneshot};
36use tokio::task::JoinHandle;
37use url::Url;
38
39use crate::EventType;
40use crate::chat::send_msg;
41use crate::config::Config;
42use crate::context::Context;
43use crate::log::{info, warn};
44use crate::message::{Message, MsgId, Viewtype};
45use crate::mimeparser::SystemMessage;
46
/// Number of public-key bytes that follow the sequence number at the end of
/// every realtime message; `subscribe_loop` strips them before emitting data.
const PUBLIC_KEY_LENGTH: usize = 32;
/// Placeholder stored in the `public_key` column by `insert_topic_stub` so that
/// a topic can be recorded for a message before any real peer is known.
/// `get_iroh_gossip_peers` filters rows carrying this value out.
const PUBLIC_KEY_STUB: &[u8] = "static_string".as_bytes();
50
/// State of the realtime (iroh gossip) subsystem of one [`Context`].
#[derive(Debug)]
pub struct Iroh {
    /// Iroh protocol router; also gives access to the underlying endpoint.
    pub(crate) router: iroh::protocol::Router,

    /// Gossip handle used to subscribe to topics.
    pub(crate) gossip: Gossip,

    /// Per-topic counter; it is bumped for every message sent to the topic
    /// and the new value is appended to the outgoing payload.
    pub(crate) sequence_numbers: Mutex<HashMap<TopicId, i32>>,

    /// Currently joined gossip channels, keyed by topic.
    pub(crate) iroh_channels: RwLock<HashMap<TopicId, ChannelState>>,

    /// Public key of this node; appended to every outgoing realtime message.
    pub(crate) public_key: PublicKey,
}
71
72impl Iroh {
73 pub(crate) async fn network_change(&self) {
75 self.router.endpoint().network_change().await
76 }
77
78 pub(crate) async fn close(self) -> Result<()> {
80 self.router.shutdown().await.context("Closing iroh failed")
81 }
82
83 async fn join_and_subscribe_gossip(
89 &self,
90 ctx: &Context,
91 msg_id: MsgId,
92 ) -> Result<Option<oneshot::Receiver<()>>> {
93 let topic = get_iroh_topic_for_msg(ctx, msg_id)
94 .await?
95 .with_context(|| format!("Message {msg_id} has no gossip topic"))?;
96
97 let mut iroh_channels = self.iroh_channels.write().await;
102
103 if iroh_channels.contains_key(&topic) {
104 return Ok(None);
105 }
106
107 let peers = get_iroh_gossip_peers(ctx, msg_id).await?;
108 let node_ids = peers.iter().map(|p| p.node_id).collect::<Vec<_>>();
109
110 info!(
111 ctx,
112 "IROH_REALTIME: Joining gossip {topic} with peers: {:?}.", node_ids,
113 );
114
115 for node_addr in &peers {
117 if !node_addr.is_empty() {
118 self.router.endpoint().add_node_addr(node_addr.clone())?;
119 }
120 }
121
122 let (join_tx, join_rx) = oneshot::channel();
123
124 let (gossip_sender, gossip_receiver) = self
125 .gossip
126 .subscribe_with_opts(topic, JoinOptions::with_bootstrap(node_ids))
127 .split();
128
129 let ctx = ctx.clone();
130 let subscribe_loop = tokio::spawn(async move {
131 if let Err(e) = subscribe_loop(&ctx, gossip_receiver, topic, msg_id, join_tx).await {
132 warn!(ctx, "subscribe_loop failed: {e}")
133 }
134 });
135
136 iroh_channels.insert(topic, ChannelState::new(subscribe_loop, gossip_sender));
137
138 Ok(Some(join_rx))
139 }
140
141 pub async fn maybe_add_gossip_peer(&self, topic: TopicId, peer: NodeAddr) -> Result<()> {
143 if self.iroh_channels.read().await.get(&topic).is_some() {
144 self.router.endpoint().add_node_addr(peer.clone())?;
145 self.gossip.subscribe(topic, vec![peer.node_id])?;
146 }
147 Ok(())
148 }
149
150 pub async fn send_webxdc_realtime_data(
152 &self,
153 ctx: &Context,
154 msg_id: MsgId,
155 mut data: Vec<u8>,
156 ) -> Result<()> {
157 let topic = get_iroh_topic_for_msg(ctx, msg_id)
158 .await?
159 .with_context(|| format!("Message {msg_id} has no gossip topic"))?;
160 self.join_and_subscribe_gossip(ctx, msg_id).await?;
161
162 let seq_num = self.get_and_incr(&topic);
163
164 let mut iroh_channels = self.iroh_channels.write().await;
165 let state = iroh_channels
166 .get_mut(&topic)
167 .context("Just created state does not exist")?;
168 data.extend(seq_num.to_le_bytes());
169 data.extend(self.public_key.as_bytes());
170
171 state.sender.broadcast(data.into()).await?;
172
173 if env::var("REALTIME_DEBUG").is_ok() {
174 info!(ctx, "Sent realtime data");
175 }
176
177 Ok(())
178 }
179
180 fn get_and_incr(&self, topic: &TopicId) -> i32 {
181 let mut sequence_numbers = self.sequence_numbers.lock();
182 let entry = sequence_numbers.entry(*topic).or_default();
183 *entry = entry.wrapping_add(1);
184 *entry
185 }
186
187 pub(crate) async fn get_node_addr(&self) -> Result<NodeAddr> {
189 let mut addr = self.router.endpoint().node_addr().await?;
190 addr.direct_addresses = BTreeSet::new();
191 Ok(addr)
192 }
193
194 pub(crate) async fn leave_realtime(&self, topic: TopicId) -> Result<()> {
196 if let Some(channel) = self.iroh_channels.write().await.remove(&topic) {
197 channel.subscribe_loop.abort();
205 let _ = channel.subscribe_loop.await;
206 }
207 Ok(())
208 }
209}
210
/// Per-topic state of a joined gossip channel.
#[derive(Debug)]
pub(crate) struct ChannelState {
    /// Handle of the spawned task that runs `subscribe_loop` for the topic.
    subscribe_loop: JoinHandle<()>,

    /// Sending half of the gossip subscription, used for broadcasting.
    sender: iroh_gossip::net::GossipSender,
}
219
220impl ChannelState {
221 fn new(subscribe_loop: JoinHandle<()>, sender: iroh_gossip::net::GossipSender) -> Self {
222 Self {
223 subscribe_loop,
224 sender,
225 }
226 }
227}
228
229impl Context {
230 async fn init_peer_channels(&self) -> Result<Iroh> {
232 info!(self, "Initializing peer channels.");
233 let secret_key = SecretKey::generate(rand::rngs::OsRng);
234 let public_key = secret_key.public();
235
236 let relay_mode = if let Some(relay_url) = self
237 .metadata
238 .read()
239 .await
240 .as_ref()
241 .and_then(|conf| conf.iroh_relay.clone())
242 {
243 RelayMode::Custom(RelayUrl::from(relay_url).into())
244 } else {
245 RelayMode::Default
248 };
249
250 let endpoint = Endpoint::builder()
251 .tls_x509() .secret_key(secret_key)
253 .alpns(vec![GOSSIP_ALPN.to_vec()])
254 .relay_mode(relay_mode)
255 .bind()
256 .await?;
257
258 let gossip = Gossip::builder()
264 .max_message_size(128 * 1024)
265 .spawn(endpoint.clone())
266 .await?;
267
268 let router = iroh::protocol::Router::builder(endpoint)
269 .accept(GOSSIP_ALPN, gossip.clone())
270 .spawn();
271
272 Ok(Iroh {
273 router,
274 gossip,
275 sequence_numbers: Mutex::new(HashMap::new()),
276 iroh_channels: RwLock::new(HashMap::new()),
277 public_key,
278 })
279 }
280
    /// Returns a read guard for the [`Iroh`] instance, creating the instance
    /// first if it does not exist yet.
    ///
    /// # Errors
    ///
    /// Fails if `Config::WebxdcRealtimeEnabled` is off or cannot be read, or
    /// if initializing the peer channels fails.
    pub async fn get_or_try_init_peer_channel(
        &self,
    ) -> Result<tokio::sync::RwLockReadGuard<'_, Iroh>> {
        if !self.get_config_bool(Config::WebxdcRealtimeEnabled).await? {
            bail!("Attempt to get Iroh when realtime is disabled");
        }

        // Fast path: Iroh already exists, so a read lock is enough.
        if let Ok(lock) = tokio::sync::RwLockReadGuard::<'_, std::option::Option<Iroh>>::try_map(
            self.iroh.read().await,
            |opt_iroh| opt_iroh.as_ref(),
        ) {
            return Ok(lock);
        }

        // Slow path: take the write lock and check again, because another
        // task may have initialized Iroh after the read lock was released.
        let lock = self.iroh.write().await;
        match tokio::sync::RwLockWriteGuard::<'_, std::option::Option<Iroh>>::try_downgrade_map(
            lock,
            |opt_iroh| opt_iroh.as_ref(),
        ) {
            Ok(lock) => Ok(lock),
            Err(mut lock) => {
                // Still empty: initialize while holding the write lock, then
                // downgrade to a read guard pointing at the stored value.
                let iroh = self.init_peer_channels().await?;
                *lock = Some(iroh);
                tokio::sync::RwLockWriteGuard::<'_, std::option::Option<Iroh>>::try_downgrade_map(
                    lock,
                    |opt_iroh| opt_iroh.as_ref(),
                )
                .map_err(|_| anyhow!("Downgrade should succeed as we just stored `Some` value"))
            }
        }
    }
313
314 pub(crate) async fn maybe_add_gossip_peer(&self, topic: TopicId, peer: NodeAddr) -> Result<()> {
315 if let Some(iroh) = &*self.iroh.read().await {
316 info!(
317 self,
318 "Adding (maybe existing) peer with id {} to {topic}.", peer.node_id
319 );
320 iroh.maybe_add_gossip_peer(topic, peer).await?;
321 }
322 Ok(())
323 }
324}
325
326pub(crate) async fn iroh_add_peer_for_topic(
328 ctx: &Context,
329 msg_id: MsgId,
330 topic: TopicId,
331 peer: NodeId,
332 relay_server: Option<&str>,
333) -> Result<()> {
334 ctx.sql
335 .execute(
336 "INSERT OR REPLACE INTO iroh_gossip_peers (msg_id, public_key, topic, relay_server) VALUES (?, ?, ?, ?)",
337 (msg_id, peer.as_bytes(), topic.as_bytes(), relay_server),
338 )
339 .await?;
340 Ok(())
341}
342
343pub async fn add_gossip_peer_from_header(
345 context: &Context,
346 instance_id: MsgId,
347 node_addr: &str,
348) -> Result<()> {
349 if !context
350 .get_config_bool(Config::WebxdcRealtimeEnabled)
351 .await?
352 {
353 return Ok(());
354 }
355
356 let node_addr =
357 serde_json::from_str::<NodeAddr>(node_addr).context("Failed to parse node address")?;
358
359 info!(
360 context,
361 "Adding iroh peer with node id {} to the topic of {instance_id}.", node_addr.node_id
362 );
363
364 context.emit_event(EventType::WebxdcRealtimeAdvertisementReceived {
365 msg_id: instance_id,
366 });
367
368 let Some(topic) = get_iroh_topic_for_msg(context, instance_id).await? else {
369 warn!(
370 context,
371 "Could not add iroh peer because {instance_id} has no topic."
372 );
373 return Ok(());
374 };
375
376 let node_id = node_addr.node_id;
377 let relay_server = node_addr.relay_url().map(|relay| relay.as_str());
378 iroh_add_peer_for_topic(context, instance_id, topic, node_id, relay_server).await?;
379
380 context.maybe_add_gossip_peer(topic, node_addr).await?;
381 Ok(())
382}
383
384pub(crate) async fn insert_topic_stub(ctx: &Context, msg_id: MsgId, topic: TopicId) -> Result<()> {
386 ctx.sql
387 .execute(
388 "INSERT OR REPLACE INTO iroh_gossip_peers (msg_id, public_key, topic, relay_server) VALUES (?, ?, ?, ?)",
389 (msg_id, PUBLIC_KEY_STUB, topic.as_bytes(), Option::<&str>::None),
390 )
391 .await?;
392 Ok(())
393}
394
395async fn get_iroh_gossip_peers(ctx: &Context, msg_id: MsgId) -> Result<Vec<NodeAddr>> {
397 ctx.sql
398 .query_map(
399 "SELECT public_key, relay_server FROM iroh_gossip_peers WHERE msg_id = ? AND public_key != ?",
400 (msg_id, PUBLIC_KEY_STUB),
401 |row| {
402 let key: Vec<u8> = row.get(0)?;
403 let server: Option<String> = row.get(1)?;
404 Ok((key, server))
405 },
406 |g| {
407 g.map(|data| {
408 let (key, server) = data?;
409 let server = server.map(|data| Ok::<_, url::ParseError>(RelayUrl::from(Url::parse(&data)?))).transpose()?;
410 let id = NodeId::from_bytes(&key.try_into()
411 .map_err(|_| anyhow!("Can't convert sql data to [u8; 32]"))?)?;
412 Ok::<_, anyhow::Error>(NodeAddr::from_parts(
413 id, server, vec![]
414 ))
415 })
416 .collect::<std::result::Result<Vec<_>, _>>()
417 },
418 )
419 .await
420}
421
422pub(crate) async fn get_iroh_topic_for_msg(
424 ctx: &Context,
425 msg_id: MsgId,
426) -> Result<Option<TopicId>> {
427 if let Some(bytes) = ctx
428 .sql
429 .query_get_value::<Vec<u8>>(
430 "SELECT topic FROM iroh_gossip_peers WHERE msg_id = ? LIMIT 1",
431 (msg_id,),
432 )
433 .await
434 .context("Couldn't restore topic from db")?
435 {
436 let topic_id = TopicId::from_bytes(
437 bytes
438 .try_into()
439 .map_err(|_| anyhow!("Could not convert stored topic ID"))?,
440 );
441 Ok(Some(topic_id))
442 } else {
443 Ok(None)
444 }
445}
446
447pub async fn send_webxdc_realtime_advertisement(
450 ctx: &Context,
451 msg_id: MsgId,
452) -> Result<Option<oneshot::Receiver<()>>> {
453 if !ctx.get_config_bool(Config::WebxdcRealtimeEnabled).await? {
454 return Ok(None);
455 }
456
457 let iroh = ctx.get_or_try_init_peer_channel().await?;
458 let conn = iroh.join_and_subscribe_gossip(ctx, msg_id).await?;
459
460 let webxdc = Message::load_from_db(ctx, msg_id).await?;
461 let mut msg = Message::new(Viewtype::Text);
462 msg.hidden = true;
463 msg.param.set_cmd(SystemMessage::IrohNodeAddr);
464 msg.in_reply_to = Some(webxdc.rfc724_mid.clone());
465 send_msg(ctx, webxdc.chat_id, &mut msg).await?;
466 info!(ctx, "IROH_REALTIME: Sent realtime advertisement");
467 Ok(conn)
468}
469
470pub async fn send_webxdc_realtime_data(ctx: &Context, msg_id: MsgId, data: Vec<u8>) -> Result<()> {
472 if !ctx.get_config_bool(Config::WebxdcRealtimeEnabled).await? {
473 return Ok(());
474 }
475
476 let iroh = ctx.get_or_try_init_peer_channel().await?;
477 iroh.send_webxdc_realtime_data(ctx, msg_id, data).await?;
478 Ok(())
479}
480
481pub async fn leave_webxdc_realtime(ctx: &Context, msg_id: MsgId) -> Result<()> {
483 if !ctx.get_config_bool(Config::WebxdcRealtimeEnabled).await? {
484 return Ok(());
485 }
486 let topic = get_iroh_topic_for_msg(ctx, msg_id)
487 .await?
488 .with_context(|| format!("Message {msg_id} has no gossip topic"))?;
489 let iroh = ctx.get_or_try_init_peer_channel().await?;
490 iroh.leave_realtime(topic).await?;
491 info!(ctx, "IROH_REALTIME: Left gossip for message {msg_id}");
492
493 Ok(())
494}
495
496fn create_random_topic() -> TopicId {
498 TopicId::from_bytes(rand::random())
499}
500
501pub(crate) async fn create_iroh_header(ctx: &Context, msg_id: MsgId) -> Result<String> {
504 let topic = create_random_topic();
505 insert_topic_stub(ctx, msg_id, topic).await?;
506 let topic_string = BASE32_NOPAD.encode(topic.as_bytes()).to_ascii_lowercase();
507 Ok(topic_string)
508}
509
510async fn subscribe_loop(
511 context: &Context,
512 mut stream: iroh_gossip::net::GossipReceiver,
513 topic: TopicId,
514 msg_id: MsgId,
515 join_tx: oneshot::Sender<()>,
516) -> Result<()> {
517 let mut join_tx = Some(join_tx);
518
519 while let Some(event) = stream.try_next().await? {
520 match event {
521 Event::Gossip(event) => match event {
522 GossipEvent::Joined(nodes) => {
523 if let Some(join_tx) = join_tx.take() {
524 join_tx.send(()).ok();
527 }
528
529 for node in nodes {
530 iroh_add_peer_for_topic(context, msg_id, topic, node, None).await?;
531 }
532 }
533 GossipEvent::NeighborUp(node) => {
534 info!(context, "IROH_REALTIME: NeighborUp: {}", node.to_string());
535 iroh_add_peer_for_topic(context, msg_id, topic, node, None).await?;
536 }
537 GossipEvent::NeighborDown(_node) => {}
538 GossipEvent::Received(message) => {
539 info!(context, "IROH_REALTIME: Received realtime data");
540 context.emit_event(EventType::WebxdcRealtimeData {
541 msg_id,
542 data: message
543 .content
544 .get(0..message.content.len() - 4 - PUBLIC_KEY_LENGTH)
545 .context("too few bytes in iroh message")?
546 .into(),
547 });
548 }
549 },
550 Event::Lagged => {
551 warn!(context, "Gossip lost some messages");
552 }
553 };
554 }
555 Ok(())
556}
557
558#[cfg(test)]
559mod tests {
560 use super::*;
561 use crate::{
562 EventType,
563 chat::{self, ChatId, ProtectionStatus, add_contact_to_chat, resend_msgs, send_msg},
564 message::{Message, Viewtype},
565 test_utils::{TestContext, TestContextManager},
566 };
567
568 #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
569 async fn test_can_communicate() {
570 let mut tcm = TestContextManager::new();
571 let alice = &mut tcm.alice().await;
572 let bob = &mut tcm.bob().await;
573
574 let alice_chat = alice.create_chat(bob).await;
576 let mut instance = Message::new(Viewtype::File);
577 instance
578 .set_file_from_bytes(
579 alice,
580 "minimal.xdc",
581 include_bytes!("../test-data/webxdc/minimal.xdc"),
582 None,
583 )
584 .unwrap();
585
586 send_msg(alice, alice_chat.id, &mut instance).await.unwrap();
587 let alice_webxdc = alice.get_last_msg().await;
588 assert_eq!(alice_webxdc.get_viewtype(), Viewtype::Webxdc);
589
590 let webxdc = alice.pop_sent_msg().await;
591 let bob_webxdc = bob.recv_msg(&webxdc).await;
592 assert_eq!(bob_webxdc.get_viewtype(), Viewtype::Webxdc);
593
594 bob_webxdc.chat_id.accept(bob).await.unwrap();
595
596 send_webxdc_realtime_advertisement(alice, alice_webxdc.id)
598 .await
599 .unwrap();
600
601 bob.recv_msg_trash(&alice.pop_sent_msg().await).await;
602 loop {
603 let event = bob.evtracker.recv().await.unwrap();
604 if let EventType::WebxdcRealtimeAdvertisementReceived { msg_id } = event.typ {
605 assert!(msg_id == alice_webxdc.id);
606 break;
607 }
608 }
609
610 let members = get_iroh_gossip_peers(bob, bob_webxdc.id)
612 .await
613 .unwrap()
614 .into_iter()
615 .map(|addr| addr.node_id)
616 .collect::<Vec<_>>();
617
618 assert_eq!(
619 members,
620 vec![
621 alice
622 .get_or_try_init_peer_channel()
623 .await
624 .unwrap()
625 .get_node_addr()
626 .await
627 .unwrap()
628 .node_id
629 ]
630 );
631
632 bob.get_or_try_init_peer_channel()
633 .await
634 .unwrap()
635 .join_and_subscribe_gossip(bob, bob_webxdc.id)
636 .await
637 .unwrap()
638 .unwrap()
639 .await
640 .unwrap();
641
642 alice
644 .get_or_try_init_peer_channel()
645 .await
646 .unwrap()
647 .send_webxdc_realtime_data(alice, alice_webxdc.id, "alice -> bob".as_bytes().to_vec())
648 .await
649 .unwrap();
650
651 loop {
652 let event = bob.evtracker.recv().await.unwrap();
653 if let EventType::WebxdcRealtimeData { data, .. } = event.typ {
654 if data == "alice -> bob".as_bytes() {
655 break;
656 } else {
657 panic!(
658 "Unexpected status update: {}",
659 String::from_utf8_lossy(&data)
660 );
661 }
662 }
663 }
664 bob.get_or_try_init_peer_channel()
666 .await
667 .unwrap()
668 .send_webxdc_realtime_data(bob, bob_webxdc.id, "bob -> alice".as_bytes().to_vec())
669 .await
670 .unwrap();
671
672 loop {
673 let event = alice.evtracker.recv().await.unwrap();
674 if let EventType::WebxdcRealtimeData { data, .. } = event.typ {
675 if data == "bob -> alice".as_bytes() {
676 break;
677 } else {
678 panic!(
679 "Unexpected status update: {}",
680 String::from_utf8_lossy(&data)
681 );
682 }
683 }
684 }
685
686 let members = get_iroh_gossip_peers(alice, alice_webxdc.id)
688 .await
689 .unwrap()
690 .into_iter()
691 .map(|addr| addr.node_id)
692 .collect::<Vec<_>>();
693
694 assert_eq!(
695 members,
696 vec![
697 bob.get_or_try_init_peer_channel()
698 .await
699 .unwrap()
700 .get_node_addr()
701 .await
702 .unwrap()
703 .node_id
704 ]
705 );
706
707 bob.get_or_try_init_peer_channel()
708 .await
709 .unwrap()
710 .send_webxdc_realtime_data(bob, bob_webxdc.id, "bob -> alice 2".as_bytes().to_vec())
711 .await
712 .unwrap();
713
714 loop {
715 let event = alice.evtracker.recv().await.unwrap();
716 if let EventType::WebxdcRealtimeData { data, .. } = event.typ {
717 if data == "bob -> alice 2".as_bytes() {
718 break;
719 } else {
720 panic!(
721 "Unexpected status update: {}",
722 String::from_utf8_lossy(&data)
723 );
724 }
725 }
726 }
727
728 assert!(alice.iroh.read().await.is_some());
731 alice.stop_io().await;
732 assert!(alice.iroh.read().await.is_none());
733 }
734
735 #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
736 async fn test_can_reconnect() {
737 let mut tcm = TestContextManager::new();
738 let alice = &mut tcm.alice().await;
739 let bob = &mut tcm.bob().await;
740
741 assert!(
742 alice
743 .get_config_bool(Config::WebxdcRealtimeEnabled)
744 .await
745 .unwrap()
746 );
747 let alice_chat = alice.create_chat(bob).await;
749 let mut instance = Message::new(Viewtype::File);
750 instance
751 .set_file_from_bytes(
752 alice,
753 "minimal.xdc",
754 include_bytes!("../test-data/webxdc/minimal.xdc"),
755 None,
756 )
757 .unwrap();
758
759 send_msg(alice, alice_chat.id, &mut instance).await.unwrap();
760 let alice_webxdc = alice.get_last_msg().await;
761 assert_eq!(alice_webxdc.get_viewtype(), Viewtype::Webxdc);
762
763 let webxdc = alice.pop_sent_msg().await;
764 let bob_webxdc = bob.recv_msg(&webxdc).await;
765 assert_eq!(bob_webxdc.get_viewtype(), Viewtype::Webxdc);
766
767 bob_webxdc.chat_id.accept(bob).await.unwrap();
768
769 send_webxdc_realtime_advertisement(alice, alice_webxdc.id)
771 .await
772 .unwrap();
773
774 bob.recv_msg_trash(&alice.pop_sent_msg().await).await;
775
776 let members = get_iroh_gossip_peers(bob, bob_webxdc.id)
778 .await
779 .unwrap()
780 .into_iter()
781 .map(|addr| addr.node_id)
782 .collect::<Vec<_>>();
783
784 assert_eq!(
785 members,
786 vec![
787 alice
788 .get_or_try_init_peer_channel()
789 .await
790 .unwrap()
791 .get_node_addr()
792 .await
793 .unwrap()
794 .node_id
795 ]
796 );
797
798 bob.get_or_try_init_peer_channel()
799 .await
800 .unwrap()
801 .join_and_subscribe_gossip(bob, bob_webxdc.id)
802 .await
803 .unwrap()
804 .unwrap()
805 .await
806 .unwrap();
807
808 alice
810 .get_or_try_init_peer_channel()
811 .await
812 .unwrap()
813 .send_webxdc_realtime_data(alice, alice_webxdc.id, "alice -> bob".as_bytes().to_vec())
814 .await
815 .unwrap();
816
817 loop {
818 let event = bob.evtracker.recv().await.unwrap();
819 if let EventType::WebxdcRealtimeData { data, .. } = event.typ {
820 if data == "alice -> bob".as_bytes() {
821 break;
822 } else {
823 panic!(
824 "Unexpected status update: {}",
825 String::from_utf8_lossy(&data)
826 );
827 }
828 }
829 }
830
831 let bob_topic = get_iroh_topic_for_msg(bob, bob_webxdc.id)
832 .await
833 .unwrap()
834 .unwrap();
835 let bob_sequence_number = bob
836 .iroh
837 .read()
838 .await
839 .as_ref()
840 .unwrap()
841 .sequence_numbers
842 .lock()
843 .get(&bob_topic)
844 .copied();
845 leave_webxdc_realtime(bob, bob_webxdc.id).await.unwrap();
846 let bob_sequence_number_after = bob
847 .iroh
848 .read()
849 .await
850 .as_ref()
851 .unwrap()
852 .sequence_numbers
853 .lock()
854 .get(&bob_topic)
855 .copied();
856 assert_eq!(bob_sequence_number, bob_sequence_number_after);
858
859 bob.get_or_try_init_peer_channel()
860 .await
861 .unwrap()
862 .join_and_subscribe_gossip(bob, bob_webxdc.id)
863 .await
864 .unwrap()
865 .unwrap()
866 .await
867 .unwrap();
868
869 bob.get_or_try_init_peer_channel()
870 .await
871 .unwrap()
872 .send_webxdc_realtime_data(bob, bob_webxdc.id, "bob -> alice".as_bytes().to_vec())
873 .await
874 .unwrap();
875
876 loop {
877 let event = alice.evtracker.recv().await.unwrap();
878 if let EventType::WebxdcRealtimeData { data, .. } = event.typ {
879 if data == "bob -> alice".as_bytes() {
880 break;
881 } else {
882 panic!(
883 "Unexpected status update: {}",
884 String::from_utf8_lossy(&data)
885 );
886 }
887 }
888 }
889
890 assert_eq!(
894 alice
895 .iroh
896 .read()
897 .await
898 .as_ref()
899 .unwrap()
900 .iroh_channels
901 .read()
902 .await
903 .len(),
904 1
905 );
906 leave_webxdc_realtime(alice, alice_webxdc.id).await.unwrap();
907 let topic = get_iroh_topic_for_msg(alice, alice_webxdc.id)
908 .await
909 .unwrap()
910 .unwrap();
911 assert!(
912 alice
913 .iroh
914 .read()
915 .await
916 .as_ref()
917 .unwrap()
918 .iroh_channels
919 .read()
920 .await
921 .get(&topic)
922 .is_none()
923 );
924 }
925
926 #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
927 async fn test_parallel_connect() {
928 let mut tcm = TestContextManager::new();
929 let alice = &mut tcm.alice().await;
930 let bob = &mut tcm.bob().await;
931
932 let chat = alice.create_chat(bob).await.id;
933
934 let mut instance = Message::new(Viewtype::File);
935 instance
936 .set_file_from_bytes(
937 alice,
938 "minimal.xdc",
939 include_bytes!("../test-data/webxdc/minimal.xdc"),
940 None,
941 )
942 .unwrap();
943 connect_alice_bob(alice, chat, &mut instance, bob).await
944 }
945
946 #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
947 async fn test_webxdc_resend() {
948 let mut tcm = TestContextManager::new();
949 let alice = &mut tcm.alice().await;
950 let bob = &mut tcm.bob().await;
951 let group = chat::create_group_chat(alice, ProtectionStatus::Unprotected, "group chat")
952 .await
953 .unwrap();
954
955 let mut instance = Message::new(Viewtype::File);
957 instance
958 .set_file_from_bytes(
959 alice,
960 "minimal.xdc",
961 include_bytes!("../test-data/webxdc/minimal.xdc"),
962 None,
963 )
964 .unwrap();
965
966 add_contact_to_chat(alice, group, alice.add_or_lookup_contact_id(bob).await)
967 .await
968 .unwrap();
969
970 connect_alice_bob(alice, group, &mut instance, bob).await;
971
972 let fiona = &mut tcm.fiona().await;
974
975 add_contact_to_chat(alice, group, alice.add_or_lookup_contact_id(fiona).await)
976 .await
977 .unwrap();
978
979 resend_msgs(alice, &[instance.id]).await.unwrap();
980 let msg = alice.pop_sent_msg().await;
981 let fiona_instance = fiona.recv_msg(&msg).await;
982 fiona_instance.chat_id.accept(fiona).await.unwrap();
983 assert!(fiona.ctx.iroh.read().await.is_none());
984
985 let fiona_connect_future = send_webxdc_realtime_advertisement(fiona, fiona_instance.id)
986 .await
987 .unwrap()
988 .unwrap();
989 let fiona_advert = fiona.pop_sent_msg().await;
990 alice.recv_msg_trash(&fiona_advert).await;
991
992 fiona_connect_future.await.unwrap();
993
994 let realtime_send_loop = async {
995 loop {
998 send_webxdc_realtime_data(alice, instance.id, b"alice -> bob & fiona".into())
999 .await
1000 .unwrap();
1001 tokio::time::sleep(std::time::Duration::from_secs(1)).await;
1002 }
1003 };
1004
1005 let realtime_receive_loop = async {
1006 loop {
1007 let event = fiona.evtracker.recv().await.unwrap();
1008 if let EventType::WebxdcRealtimeData { data, .. } = event.typ {
1009 if data == b"alice -> bob & fiona" {
1010 break;
1011 } else {
1012 panic!(
1013 "Unexpected status update: {}",
1014 String::from_utf8_lossy(&data)
1015 );
1016 }
1017 }
1018 }
1019 };
1020 tokio::select!(
1021 _ = realtime_send_loop => {
1022 panic!("Send loop should never finish");
1023 },
1024 _ = realtime_receive_loop => {
1025 return;
1026 }
1027 );
1028 }
1029
1030 async fn connect_alice_bob(
1031 alice: &mut TestContext,
1032 alice_chat_id: ChatId,
1033 instance: &mut Message,
1034 bob: &mut TestContext,
1035 ) {
1036 send_msg(alice, alice_chat_id, instance).await.unwrap();
1037 let alice_webxdc = alice.get_last_msg().await;
1038
1039 let webxdc = alice.pop_sent_msg().await;
1040 let bob_webxdc = bob.recv_msg(&webxdc).await;
1041 assert_eq!(bob_webxdc.get_viewtype(), Viewtype::Webxdc);
1042
1043 bob_webxdc.chat_id.accept(bob).await.unwrap();
1044
1045 eprintln!("Sending advertisements");
1046 let alice_advertisement_future = send_webxdc_realtime_advertisement(alice, alice_webxdc.id)
1048 .await
1049 .unwrap()
1050 .unwrap();
1051 let alice_advertisement = alice.pop_sent_msg().await;
1052
1053 let bob_advertisement_future = send_webxdc_realtime_advertisement(bob, bob_webxdc.id)
1054 .await
1055 .unwrap()
1056 .unwrap();
1057 let bob_advertisement = bob.pop_sent_msg().await;
1058
1059 eprintln!("Receiving advertisements");
1060 bob.recv_msg_trash(&alice_advertisement).await;
1061 alice.recv_msg_trash(&bob_advertisement).await;
1062
1063 eprintln!("Alice and Bob wait for connection");
1064 alice_advertisement_future.await.unwrap();
1065 bob_advertisement_future.await.unwrap();
1066
1067 eprintln!("Sending ephemeral message");
1069 send_webxdc_realtime_data(alice, alice_webxdc.id, b"alice -> bob".into())
1070 .await
1071 .unwrap();
1072
1073 eprintln!("Waiting for ephemeral message");
1074 loop {
1075 let event = bob.evtracker.recv().await.unwrap();
1076 if let EventType::WebxdcRealtimeData { data, .. } = event.typ {
1077 if data == b"alice -> bob" {
1078 break;
1079 } else {
1080 panic!(
1081 "Unexpected status update: {}",
1082 String::from_utf8_lossy(&data)
1083 );
1084 }
1085 }
1086 }
1087 }
1088
1089 #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
1090 async fn test_peer_channels_disabled() {
1091 let mut tcm = TestContextManager::new();
1092 let alice = &mut tcm.alice().await;
1093
1094 alice
1095 .set_config_bool(Config::WebxdcRealtimeEnabled, false)
1096 .await
1097 .unwrap();
1098
1099 send_webxdc_realtime_advertisement(alice, MsgId::new(1))
1101 .await
1102 .unwrap();
1103
1104 assert!(alice.ctx.iroh.read().await.is_none());
1105
1106 send_webxdc_realtime_data(alice, MsgId::new(1), vec![])
1108 .await
1109 .unwrap();
1110
1111 assert!(alice.ctx.iroh.read().await.is_none());
1112
1113 leave_webxdc_realtime(alice, MsgId::new(1)).await.unwrap();
1115
1116 assert!(alice.ctx.iroh.read().await.is_none());
1117
1118 assert!(alice.ctx.get_or_try_init_peer_channel().await.is_err());
1121 }
1122}