1use anyhow::{Context as _, Result, anyhow, bail};
27use data_encoding::BASE32_NOPAD;
28use futures_lite::StreamExt;
29use iroh::{Endpoint, NodeAddr, NodeId, PublicKey, RelayMode, RelayUrl, SecretKey};
30use iroh_gossip::net::{Event, GOSSIP_ALPN, Gossip, GossipEvent, JoinOptions};
31use iroh_gossip::proto::TopicId;
32use parking_lot::Mutex;
33use std::collections::{BTreeSet, HashMap};
34use std::env;
35use tokio::sync::{RwLock, oneshot};
36use tokio::task::JoinHandle;
37use url::Url;
38
39use crate::EventType;
40use crate::chat::send_msg;
41use crate::config::Config;
42use crate::context::Context;
43use crate::log::{info, warn};
44use crate::message::{Message, MsgId, Viewtype};
45use crate::mimeparser::SystemMessage;
46
47const PUBLIC_KEY_LENGTH: usize = 32;
49const PUBLIC_KEY_STUB: &[u8] = "static_string".as_bytes();
50
51#[derive(Debug)]
53pub struct Iroh {
54 pub(crate) router: iroh::protocol::Router,
56
57 pub(crate) gossip: Gossip,
59
60 pub(crate) sequence_numbers: Mutex<HashMap<TopicId, i32>>,
62
63 pub(crate) iroh_channels: RwLock<HashMap<TopicId, ChannelState>>,
65
66 pub(crate) public_key: PublicKey,
70}
71
72impl Iroh {
73 pub(crate) async fn network_change(&self) {
75 self.router.endpoint().network_change().await
76 }
77
78 pub(crate) async fn close(self) -> Result<()> {
80 self.router.shutdown().await.context("Closing iroh failed")
81 }
82
83 async fn join_and_subscribe_gossip(
89 &self,
90 ctx: &Context,
91 msg_id: MsgId,
92 ) -> Result<Option<oneshot::Receiver<()>>> {
93 let topic = get_iroh_topic_for_msg(ctx, msg_id)
94 .await?
95 .with_context(|| format!("Message {msg_id} has no gossip topic"))?;
96
97 let mut iroh_channels = self.iroh_channels.write().await;
102
103 if iroh_channels.contains_key(&topic) {
104 return Ok(None);
105 }
106
107 let peers = get_iroh_gossip_peers(ctx, msg_id).await?;
108 let node_ids = peers.iter().map(|p| p.node_id).collect::<Vec<_>>();
109
110 info!(
111 ctx,
112 "IROH_REALTIME: Joining gossip {topic} with peers: {:?}.", node_ids,
113 );
114
115 for node_addr in &peers {
117 if !node_addr.is_empty() {
118 self.router.endpoint().add_node_addr(node_addr.clone())?;
119 }
120 }
121
122 let (join_tx, join_rx) = oneshot::channel();
123
124 let (gossip_sender, gossip_receiver) = self
125 .gossip
126 .subscribe_with_opts(topic, JoinOptions::with_bootstrap(node_ids))
127 .split();
128
129 let ctx = ctx.clone();
130 let subscribe_loop = tokio::spawn(async move {
131 if let Err(e) = subscribe_loop(&ctx, gossip_receiver, topic, msg_id, join_tx).await {
132 warn!(ctx, "subscribe_loop failed: {e}")
133 }
134 });
135
136 iroh_channels.insert(topic, ChannelState::new(subscribe_loop, gossip_sender));
137
138 Ok(Some(join_rx))
139 }
140
141 pub async fn maybe_add_gossip_peer(&self, topic: TopicId, peer: NodeAddr) -> Result<()> {
143 if self.iroh_channels.read().await.get(&topic).is_some() {
144 self.router.endpoint().add_node_addr(peer.clone())?;
145 self.gossip.subscribe(topic, vec![peer.node_id])?;
146 }
147 Ok(())
148 }
149
150 pub async fn send_webxdc_realtime_data(
152 &self,
153 ctx: &Context,
154 msg_id: MsgId,
155 mut data: Vec<u8>,
156 ) -> Result<()> {
157 let topic = get_iroh_topic_for_msg(ctx, msg_id)
158 .await?
159 .with_context(|| format!("Message {msg_id} has no gossip topic"))?;
160 self.join_and_subscribe_gossip(ctx, msg_id).await?;
161
162 let seq_num = self.get_and_incr(&topic);
163
164 let mut iroh_channels = self.iroh_channels.write().await;
165 let state = iroh_channels
166 .get_mut(&topic)
167 .context("Just created state does not exist")?;
168 data.extend(seq_num.to_le_bytes());
169 data.extend(self.public_key.as_bytes());
170
171 state.sender.broadcast(data.into()).await?;
172
173 if env::var("REALTIME_DEBUG").is_ok() {
174 info!(ctx, "Sent realtime data");
175 }
176
177 Ok(())
178 }
179
180 fn get_and_incr(&self, topic: &TopicId) -> i32 {
181 let mut sequence_numbers = self.sequence_numbers.lock();
182 let entry = sequence_numbers.entry(*topic).or_default();
183 *entry = entry.wrapping_add(1);
184 *entry
185 }
186
187 pub(crate) async fn get_node_addr(&self) -> Result<NodeAddr> {
193 let mut addr = self.router.endpoint().node_addr().await?;
194 addr.direct_addresses = BTreeSet::new();
195 debug_assert!(addr.relay_url().is_some());
196 Ok(addr)
197 }
198
199 pub(crate) async fn leave_realtime(&self, topic: TopicId) -> Result<()> {
201 if let Some(channel) = self.iroh_channels.write().await.remove(&topic) {
202 channel.subscribe_loop.abort();
210 let _ = channel.subscribe_loop.await;
211 }
212 Ok(())
213 }
214}
215
216#[derive(Debug)]
218pub(crate) struct ChannelState {
219 subscribe_loop: JoinHandle<()>,
221
222 sender: iroh_gossip::net::GossipSender,
223}
224
225impl ChannelState {
226 fn new(subscribe_loop: JoinHandle<()>, sender: iroh_gossip::net::GossipSender) -> Self {
227 Self {
228 subscribe_loop,
229 sender,
230 }
231 }
232}
233
234impl Context {
235 async fn init_peer_channels(&self) -> Result<Iroh> {
237 info!(self, "Initializing peer channels.");
238 let secret_key = SecretKey::generate(rand::rngs::OsRng);
239 let public_key = secret_key.public();
240
241 let relay_mode = if let Some(relay_url) = self
242 .metadata
243 .read()
244 .await
245 .as_ref()
246 .and_then(|conf| conf.iroh_relay.clone())
247 {
248 RelayMode::Custom(RelayUrl::from(relay_url).into())
249 } else {
250 RelayMode::Default
253 };
254
255 let endpoint = Endpoint::builder()
256 .tls_x509() .secret_key(secret_key)
258 .alpns(vec![GOSSIP_ALPN.to_vec()])
259 .relay_mode(relay_mode)
260 .bind()
261 .await?;
262
263 let gossip = Gossip::builder()
269 .max_message_size(128 * 1024)
270 .spawn(endpoint.clone())
271 .await?;
272
273 let router = iroh::protocol::Router::builder(endpoint)
274 .accept(GOSSIP_ALPN, gossip.clone())
275 .spawn();
276
277 Ok(Iroh {
278 router,
279 gossip,
280 sequence_numbers: Mutex::new(HashMap::new()),
281 iroh_channels: RwLock::new(HashMap::new()),
282 public_key,
283 })
284 }
285
286 pub async fn get_peer_channels(&self) -> Option<tokio::sync::RwLockReadGuard<'_, Iroh>> {
288 tokio::sync::RwLockReadGuard::<'_, std::option::Option<Iroh>>::try_map(
289 self.iroh.read().await,
290 |opt_iroh| opt_iroh.as_ref(),
291 )
292 .ok()
293 }
294
295 pub async fn get_or_try_init_peer_channel(
297 &self,
298 ) -> Result<tokio::sync::RwLockReadGuard<'_, Iroh>> {
299 if !self.get_config_bool(Config::WebxdcRealtimeEnabled).await? {
300 bail!("Attempt to initialize Iroh when realtime is disabled");
301 }
302
303 if let Some(lock) = self.get_peer_channels().await {
304 return Ok(lock);
305 }
306
307 let lock = self.iroh.write().await;
308 match tokio::sync::RwLockWriteGuard::<'_, std::option::Option<Iroh>>::try_downgrade_map(
309 lock,
310 |opt_iroh| opt_iroh.as_ref(),
311 ) {
312 Ok(lock) => Ok(lock),
313 Err(mut lock) => {
314 let iroh = self.init_peer_channels().await?;
315 *lock = Some(iroh);
316 tokio::sync::RwLockWriteGuard::<'_, std::option::Option<Iroh>>::try_downgrade_map(
317 lock,
318 |opt_iroh| opt_iroh.as_ref(),
319 )
320 .map_err(|_| anyhow!("Downgrade should succeed as we just stored `Some` value"))
321 }
322 }
323 }
324
325 pub(crate) async fn maybe_add_gossip_peer(&self, topic: TopicId, peer: NodeAddr) -> Result<()> {
326 if let Some(iroh) = &*self.iroh.read().await {
327 info!(
328 self,
329 "Adding (maybe existing) peer with id {} to {topic}.", peer.node_id
330 );
331 iroh.maybe_add_gossip_peer(topic, peer).await?;
332 }
333 Ok(())
334 }
335}
336
337pub(crate) async fn iroh_add_peer_for_topic(
339 ctx: &Context,
340 msg_id: MsgId,
341 topic: TopicId,
342 peer: NodeId,
343 relay_server: Option<&str>,
344) -> Result<()> {
345 ctx.sql
346 .execute(
347 "INSERT OR REPLACE INTO iroh_gossip_peers (msg_id, public_key, topic, relay_server) VALUES (?, ?, ?, ?)",
348 (msg_id, peer.as_bytes(), topic.as_bytes(), relay_server),
349 )
350 .await?;
351 Ok(())
352}
353
354pub async fn add_gossip_peer_from_header(
356 context: &Context,
357 instance_id: MsgId,
358 node_addr: &str,
359) -> Result<()> {
360 if !context
361 .get_config_bool(Config::WebxdcRealtimeEnabled)
362 .await?
363 {
364 return Ok(());
365 }
366
367 let node_addr =
368 serde_json::from_str::<NodeAddr>(node_addr).context("Failed to parse node address")?;
369
370 info!(
371 context,
372 "Adding iroh peer with node id {} to the topic of {instance_id}.", node_addr.node_id
373 );
374
375 context.emit_event(EventType::WebxdcRealtimeAdvertisementReceived {
376 msg_id: instance_id,
377 });
378
379 let Some(topic) = get_iroh_topic_for_msg(context, instance_id).await? else {
380 warn!(
381 context,
382 "Could not add iroh peer because {instance_id} has no topic."
383 );
384 return Ok(());
385 };
386
387 let node_id = node_addr.node_id;
388 let relay_server = node_addr.relay_url().map(|relay| relay.as_str());
389 iroh_add_peer_for_topic(context, instance_id, topic, node_id, relay_server).await?;
390
391 context.maybe_add_gossip_peer(topic, node_addr).await?;
392 Ok(())
393}
394
395pub(crate) async fn insert_topic_stub(ctx: &Context, msg_id: MsgId, topic: TopicId) -> Result<()> {
397 ctx.sql
398 .execute(
399 "INSERT OR REPLACE INTO iroh_gossip_peers (msg_id, public_key, topic, relay_server) VALUES (?, ?, ?, ?)",
400 (msg_id, PUBLIC_KEY_STUB, topic.as_bytes(), Option::<&str>::None),
401 )
402 .await?;
403 Ok(())
404}
405
406async fn get_iroh_gossip_peers(ctx: &Context, msg_id: MsgId) -> Result<Vec<NodeAddr>> {
408 ctx.sql
409 .query_map(
410 "SELECT public_key, relay_server FROM iroh_gossip_peers WHERE msg_id = ? AND public_key != ?",
411 (msg_id, PUBLIC_KEY_STUB),
412 |row| {
413 let key: Vec<u8> = row.get(0)?;
414 let server: Option<String> = row.get(1)?;
415 Ok((key, server))
416 },
417 |g| {
418 g.map(|data| {
419 let (key, server) = data?;
420 let server = server.map(|data| Ok::<_, url::ParseError>(RelayUrl::from(Url::parse(&data)?))).transpose()?;
421 let id = NodeId::from_bytes(&key.try_into()
422 .map_err(|_| anyhow!("Can't convert sql data to [u8; 32]"))?)?;
423 Ok::<_, anyhow::Error>(NodeAddr::from_parts(
424 id, server, vec![]
425 ))
426 })
427 .collect::<std::result::Result<Vec<_>, _>>()
428 },
429 )
430 .await
431}
432
433pub(crate) async fn get_iroh_topic_for_msg(
435 ctx: &Context,
436 msg_id: MsgId,
437) -> Result<Option<TopicId>> {
438 if let Some(bytes) = ctx
439 .sql
440 .query_get_value::<Vec<u8>>(
441 "SELECT topic FROM iroh_gossip_peers WHERE msg_id = ? LIMIT 1",
442 (msg_id,),
443 )
444 .await
445 .context("Couldn't restore topic from db")?
446 {
447 let topic_id = TopicId::from_bytes(
448 bytes
449 .try_into()
450 .map_err(|_| anyhow!("Could not convert stored topic ID"))?,
451 );
452 Ok(Some(topic_id))
453 } else {
454 Ok(None)
455 }
456}
457
458pub async fn send_webxdc_realtime_advertisement(
461 ctx: &Context,
462 msg_id: MsgId,
463) -> Result<Option<oneshot::Receiver<()>>> {
464 if !ctx.get_config_bool(Config::WebxdcRealtimeEnabled).await? {
465 return Ok(None);
466 }
467
468 let iroh = ctx.get_or_try_init_peer_channel().await?;
469 let conn = iroh.join_and_subscribe_gossip(ctx, msg_id).await?;
470
471 let webxdc = Message::load_from_db(ctx, msg_id).await?;
472 let mut msg = Message::new(Viewtype::Text);
473 msg.hidden = true;
474 msg.param.set_cmd(SystemMessage::IrohNodeAddr);
475 msg.in_reply_to = Some(webxdc.rfc724_mid.clone());
476 send_msg(ctx, webxdc.chat_id, &mut msg).await?;
477 info!(ctx, "IROH_REALTIME: Sent realtime advertisement");
478 Ok(conn)
479}
480
481pub async fn send_webxdc_realtime_data(ctx: &Context, msg_id: MsgId, data: Vec<u8>) -> Result<()> {
483 if !ctx.get_config_bool(Config::WebxdcRealtimeEnabled).await? {
484 return Ok(());
485 }
486
487 let iroh = ctx.get_or_try_init_peer_channel().await?;
488 iroh.send_webxdc_realtime_data(ctx, msg_id, data).await?;
489 Ok(())
490}
491
492pub async fn leave_webxdc_realtime(ctx: &Context, msg_id: MsgId) -> Result<()> {
498 let Some(iroh) = ctx.get_peer_channels().await else {
499 return Ok(());
500 };
501 let Some(topic) = get_iroh_topic_for_msg(ctx, msg_id).await? else {
502 return Ok(());
503 };
504 iroh.leave_realtime(topic).await?;
505 info!(ctx, "IROH_REALTIME: Left gossip for message {msg_id}");
506
507 Ok(())
508}
509
510fn create_random_topic() -> TopicId {
512 TopicId::from_bytes(rand::random())
513}
514
515pub(crate) async fn create_iroh_header(ctx: &Context, msg_id: MsgId) -> Result<String> {
518 let topic = create_random_topic();
519 insert_topic_stub(ctx, msg_id, topic).await?;
520 let topic_string = BASE32_NOPAD.encode(topic.as_bytes()).to_ascii_lowercase();
521 Ok(topic_string)
522}
523
524async fn subscribe_loop(
525 context: &Context,
526 mut stream: iroh_gossip::net::GossipReceiver,
527 topic: TopicId,
528 msg_id: MsgId,
529 join_tx: oneshot::Sender<()>,
530) -> Result<()> {
531 let mut join_tx = Some(join_tx);
532
533 while let Some(event) = stream.try_next().await? {
534 match event {
535 Event::Gossip(event) => match event {
536 GossipEvent::Joined(nodes) => {
537 if let Some(join_tx) = join_tx.take() {
538 join_tx.send(()).ok();
541 }
542
543 for node in nodes {
544 iroh_add_peer_for_topic(context, msg_id, topic, node, None).await?;
545 }
546 }
547 GossipEvent::NeighborUp(node) => {
548 info!(context, "IROH_REALTIME: NeighborUp: {}", node.to_string());
549 iroh_add_peer_for_topic(context, msg_id, topic, node, None).await?;
550 }
551 GossipEvent::NeighborDown(_node) => {}
552 GossipEvent::Received(message) => {
553 info!(context, "IROH_REALTIME: Received realtime data");
554 context.emit_event(EventType::WebxdcRealtimeData {
555 msg_id,
556 data: message
557 .content
558 .get(0..message.content.len() - 4 - PUBLIC_KEY_LENGTH)
559 .context("too few bytes in iroh message")?
560 .into(),
561 });
562 }
563 },
564 Event::Lagged => {
565 warn!(context, "Gossip lost some messages");
566 }
567 };
568 }
569 Ok(())
570}
571
572#[cfg(test)]
573mod tests {
574 use super::*;
575 use crate::{
576 EventType,
577 chat::{self, ChatId, ProtectionStatus, add_contact_to_chat, resend_msgs, send_msg},
578 message::{Message, Viewtype},
579 test_utils::{TestContext, TestContextManager},
580 };
581
582 #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
583 async fn test_can_communicate() {
584 let mut tcm = TestContextManager::new();
585 let alice = &mut tcm.alice().await;
586 let bob = &mut tcm.bob().await;
587
588 let alice_chat = alice.create_chat(bob).await;
590 let mut instance = Message::new(Viewtype::File);
591 instance
592 .set_file_from_bytes(
593 alice,
594 "minimal.xdc",
595 include_bytes!("../test-data/webxdc/minimal.xdc"),
596 None,
597 )
598 .unwrap();
599
600 send_msg(alice, alice_chat.id, &mut instance).await.unwrap();
601 let alice_webxdc = alice.get_last_msg().await;
602 assert_eq!(alice_webxdc.get_viewtype(), Viewtype::Webxdc);
603
604 let webxdc = alice.pop_sent_msg().await;
605 let bob_webxdc = bob.recv_msg(&webxdc).await;
606 assert_eq!(bob_webxdc.get_viewtype(), Viewtype::Webxdc);
607
608 bob_webxdc.chat_id.accept(bob).await.unwrap();
609
610 send_webxdc_realtime_advertisement(alice, alice_webxdc.id)
612 .await
613 .unwrap();
614
615 bob.recv_msg_trash(&alice.pop_sent_msg().await).await;
616 loop {
617 let event = bob.evtracker.recv().await.unwrap();
618 if let EventType::WebxdcRealtimeAdvertisementReceived { msg_id } = event.typ {
619 assert!(msg_id == alice_webxdc.id);
620 break;
621 }
622 }
623
624 let members = get_iroh_gossip_peers(bob, bob_webxdc.id)
626 .await
627 .unwrap()
628 .into_iter()
629 .map(|addr| addr.node_id)
630 .collect::<Vec<_>>();
631
632 assert_eq!(
633 members,
634 vec![
635 alice
636 .get_or_try_init_peer_channel()
637 .await
638 .unwrap()
639 .get_node_addr()
640 .await
641 .unwrap()
642 .node_id
643 ]
644 );
645
646 bob.get_or_try_init_peer_channel()
647 .await
648 .unwrap()
649 .join_and_subscribe_gossip(bob, bob_webxdc.id)
650 .await
651 .unwrap()
652 .unwrap()
653 .await
654 .unwrap();
655
656 alice
658 .get_or_try_init_peer_channel()
659 .await
660 .unwrap()
661 .send_webxdc_realtime_data(alice, alice_webxdc.id, "alice -> bob".as_bytes().to_vec())
662 .await
663 .unwrap();
664
665 loop {
666 let event = bob.evtracker.recv().await.unwrap();
667 if let EventType::WebxdcRealtimeData { data, .. } = event.typ {
668 if data == "alice -> bob".as_bytes() {
669 break;
670 } else {
671 panic!(
672 "Unexpected status update: {}",
673 String::from_utf8_lossy(&data)
674 );
675 }
676 }
677 }
678 bob.get_or_try_init_peer_channel()
680 .await
681 .unwrap()
682 .send_webxdc_realtime_data(bob, bob_webxdc.id, "bob -> alice".as_bytes().to_vec())
683 .await
684 .unwrap();
685
686 loop {
687 let event = alice.evtracker.recv().await.unwrap();
688 if let EventType::WebxdcRealtimeData { data, .. } = event.typ {
689 if data == "bob -> alice".as_bytes() {
690 break;
691 } else {
692 panic!(
693 "Unexpected status update: {}",
694 String::from_utf8_lossy(&data)
695 );
696 }
697 }
698 }
699
700 let members = get_iroh_gossip_peers(alice, alice_webxdc.id)
702 .await
703 .unwrap()
704 .into_iter()
705 .map(|addr| addr.node_id)
706 .collect::<Vec<_>>();
707
708 assert_eq!(
709 members,
710 vec![
711 bob.get_or_try_init_peer_channel()
712 .await
713 .unwrap()
714 .get_node_addr()
715 .await
716 .unwrap()
717 .node_id
718 ]
719 );
720
721 bob.get_or_try_init_peer_channel()
722 .await
723 .unwrap()
724 .send_webxdc_realtime_data(bob, bob_webxdc.id, "bob -> alice 2".as_bytes().to_vec())
725 .await
726 .unwrap();
727
728 loop {
729 let event = alice.evtracker.recv().await.unwrap();
730 if let EventType::WebxdcRealtimeData { data, .. } = event.typ {
731 if data == "bob -> alice 2".as_bytes() {
732 break;
733 } else {
734 panic!(
735 "Unexpected status update: {}",
736 String::from_utf8_lossy(&data)
737 );
738 }
739 }
740 }
741
742 assert!(alice.iroh.read().await.is_some());
745 alice.stop_io().await;
746 assert!(alice.iroh.read().await.is_none());
747 }
748
749 #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
750 async fn test_can_reconnect() {
751 let mut tcm = TestContextManager::new();
752 let alice = &mut tcm.alice().await;
753 let bob = &mut tcm.bob().await;
754
755 assert!(
756 alice
757 .get_config_bool(Config::WebxdcRealtimeEnabled)
758 .await
759 .unwrap()
760 );
761 let alice_chat = alice.create_chat(bob).await;
763 let mut instance = Message::new(Viewtype::File);
764 instance
765 .set_file_from_bytes(
766 alice,
767 "minimal.xdc",
768 include_bytes!("../test-data/webxdc/minimal.xdc"),
769 None,
770 )
771 .unwrap();
772
773 send_msg(alice, alice_chat.id, &mut instance).await.unwrap();
774 let alice_webxdc = alice.get_last_msg().await;
775 assert_eq!(alice_webxdc.get_viewtype(), Viewtype::Webxdc);
776
777 let webxdc = alice.pop_sent_msg().await;
778 let bob_webxdc = bob.recv_msg(&webxdc).await;
779 assert_eq!(bob_webxdc.get_viewtype(), Viewtype::Webxdc);
780
781 bob_webxdc.chat_id.accept(bob).await.unwrap();
782
783 send_webxdc_realtime_advertisement(alice, alice_webxdc.id)
785 .await
786 .unwrap();
787
788 bob.recv_msg_trash(&alice.pop_sent_msg().await).await;
789
790 let members = get_iroh_gossip_peers(bob, bob_webxdc.id)
792 .await
793 .unwrap()
794 .into_iter()
795 .map(|addr| addr.node_id)
796 .collect::<Vec<_>>();
797
798 assert_eq!(
799 members,
800 vec![
801 alice
802 .get_or_try_init_peer_channel()
803 .await
804 .unwrap()
805 .get_node_addr()
806 .await
807 .unwrap()
808 .node_id
809 ]
810 );
811
812 bob.get_or_try_init_peer_channel()
813 .await
814 .unwrap()
815 .join_and_subscribe_gossip(bob, bob_webxdc.id)
816 .await
817 .unwrap()
818 .unwrap()
819 .await
820 .unwrap();
821
822 alice
824 .get_or_try_init_peer_channel()
825 .await
826 .unwrap()
827 .send_webxdc_realtime_data(alice, alice_webxdc.id, "alice -> bob".as_bytes().to_vec())
828 .await
829 .unwrap();
830
831 loop {
832 let event = bob.evtracker.recv().await.unwrap();
833 if let EventType::WebxdcRealtimeData { data, .. } = event.typ {
834 if data == "alice -> bob".as_bytes() {
835 break;
836 } else {
837 panic!(
838 "Unexpected status update: {}",
839 String::from_utf8_lossy(&data)
840 );
841 }
842 }
843 }
844
845 let bob_topic = get_iroh_topic_for_msg(bob, bob_webxdc.id)
846 .await
847 .unwrap()
848 .unwrap();
849 let bob_sequence_number = bob
850 .iroh
851 .read()
852 .await
853 .as_ref()
854 .unwrap()
855 .sequence_numbers
856 .lock()
857 .get(&bob_topic)
858 .copied();
859 leave_webxdc_realtime(bob, bob_webxdc.id).await.unwrap();
860 let bob_sequence_number_after = bob
861 .iroh
862 .read()
863 .await
864 .as_ref()
865 .unwrap()
866 .sequence_numbers
867 .lock()
868 .get(&bob_topic)
869 .copied();
870 assert_eq!(bob_sequence_number, bob_sequence_number_after);
872
873 bob.get_or_try_init_peer_channel()
874 .await
875 .unwrap()
876 .join_and_subscribe_gossip(bob, bob_webxdc.id)
877 .await
878 .unwrap()
879 .unwrap()
880 .await
881 .unwrap();
882
883 bob.get_or_try_init_peer_channel()
884 .await
885 .unwrap()
886 .send_webxdc_realtime_data(bob, bob_webxdc.id, "bob -> alice".as_bytes().to_vec())
887 .await
888 .unwrap();
889
890 loop {
891 let event = alice.evtracker.recv().await.unwrap();
892 if let EventType::WebxdcRealtimeData { data, .. } = event.typ {
893 if data == "bob -> alice".as_bytes() {
894 break;
895 } else {
896 panic!(
897 "Unexpected status update: {}",
898 String::from_utf8_lossy(&data)
899 );
900 }
901 }
902 }
903
904 assert_eq!(
908 alice
909 .iroh
910 .read()
911 .await
912 .as_ref()
913 .unwrap()
914 .iroh_channels
915 .read()
916 .await
917 .len(),
918 1
919 );
920 leave_webxdc_realtime(alice, alice_webxdc.id).await.unwrap();
921 let topic = get_iroh_topic_for_msg(alice, alice_webxdc.id)
922 .await
923 .unwrap()
924 .unwrap();
925 assert!(
926 alice
927 .iroh
928 .read()
929 .await
930 .as_ref()
931 .unwrap()
932 .iroh_channels
933 .read()
934 .await
935 .get(&topic)
936 .is_none()
937 );
938 }
939
940 #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
941 async fn test_parallel_connect() {
942 let mut tcm = TestContextManager::new();
943 let alice = &mut tcm.alice().await;
944 let bob = &mut tcm.bob().await;
945
946 let chat = alice.create_chat(bob).await.id;
947
948 let mut instance = Message::new(Viewtype::File);
949 instance
950 .set_file_from_bytes(
951 alice,
952 "minimal.xdc",
953 include_bytes!("../test-data/webxdc/minimal.xdc"),
954 None,
955 )
956 .unwrap();
957 connect_alice_bob(alice, chat, &mut instance, bob).await
958 }
959
960 #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
961 async fn test_webxdc_resend() {
962 let mut tcm = TestContextManager::new();
963 let alice = &mut tcm.alice().await;
964 let bob = &mut tcm.bob().await;
965 let group = chat::create_group_chat(alice, ProtectionStatus::Unprotected, "group chat")
966 .await
967 .unwrap();
968
969 let mut instance = Message::new(Viewtype::File);
971 instance
972 .set_file_from_bytes(
973 alice,
974 "minimal.xdc",
975 include_bytes!("../test-data/webxdc/minimal.xdc"),
976 None,
977 )
978 .unwrap();
979
980 add_contact_to_chat(alice, group, alice.add_or_lookup_contact_id(bob).await)
981 .await
982 .unwrap();
983
984 connect_alice_bob(alice, group, &mut instance, bob).await;
985
986 let fiona = &mut tcm.fiona().await;
988
989 add_contact_to_chat(alice, group, alice.add_or_lookup_contact_id(fiona).await)
990 .await
991 .unwrap();
992
993 resend_msgs(alice, &[instance.id]).await.unwrap();
994 let msg = alice.pop_sent_msg().await;
995 let fiona_instance = fiona.recv_msg(&msg).await;
996 fiona_instance.chat_id.accept(fiona).await.unwrap();
997 assert!(fiona.ctx.iroh.read().await.is_none());
998
999 let fiona_connect_future = send_webxdc_realtime_advertisement(fiona, fiona_instance.id)
1000 .await
1001 .unwrap()
1002 .unwrap();
1003 let fiona_advert = fiona.pop_sent_msg().await;
1004 alice.recv_msg_trash(&fiona_advert).await;
1005
1006 fiona_connect_future.await.unwrap();
1007
1008 let realtime_send_loop = async {
1009 loop {
1012 send_webxdc_realtime_data(alice, instance.id, b"alice -> bob & fiona".into())
1013 .await
1014 .unwrap();
1015 tokio::time::sleep(std::time::Duration::from_secs(1)).await;
1016 }
1017 };
1018
1019 let realtime_receive_loop = async {
1020 loop {
1021 let event = fiona.evtracker.recv().await.unwrap();
1022 if let EventType::WebxdcRealtimeData { data, .. } = event.typ {
1023 if data == b"alice -> bob & fiona" {
1024 break;
1025 } else {
1026 panic!(
1027 "Unexpected status update: {}",
1028 String::from_utf8_lossy(&data)
1029 );
1030 }
1031 }
1032 }
1033 };
1034 tokio::select!(
1035 _ = realtime_send_loop => {
1036 panic!("Send loop should never finish");
1037 },
1038 _ = realtime_receive_loop => {
1039 return;
1040 }
1041 );
1042 }
1043
1044 async fn connect_alice_bob(
1045 alice: &mut TestContext,
1046 alice_chat_id: ChatId,
1047 instance: &mut Message,
1048 bob: &mut TestContext,
1049 ) {
1050 send_msg(alice, alice_chat_id, instance).await.unwrap();
1051 let alice_webxdc = alice.get_last_msg().await;
1052
1053 let webxdc = alice.pop_sent_msg().await;
1054 let bob_webxdc = bob.recv_msg(&webxdc).await;
1055 assert_eq!(bob_webxdc.get_viewtype(), Viewtype::Webxdc);
1056
1057 bob_webxdc.chat_id.accept(bob).await.unwrap();
1058
1059 eprintln!("Sending advertisements");
1060 let alice_advertisement_future = send_webxdc_realtime_advertisement(alice, alice_webxdc.id)
1062 .await
1063 .unwrap()
1064 .unwrap();
1065 let alice_advertisement = alice.pop_sent_msg().await;
1066
1067 let bob_advertisement_future = send_webxdc_realtime_advertisement(bob, bob_webxdc.id)
1068 .await
1069 .unwrap()
1070 .unwrap();
1071 let bob_advertisement = bob.pop_sent_msg().await;
1072
1073 eprintln!("Receiving advertisements");
1074 bob.recv_msg_trash(&alice_advertisement).await;
1075 alice.recv_msg_trash(&bob_advertisement).await;
1076
1077 eprintln!("Alice and Bob wait for connection");
1078 alice_advertisement_future.await.unwrap();
1079 bob_advertisement_future.await.unwrap();
1080
1081 eprintln!("Sending ephemeral message");
1083 send_webxdc_realtime_data(alice, alice_webxdc.id, b"alice -> bob".into())
1084 .await
1085 .unwrap();
1086
1087 eprintln!("Waiting for ephemeral message");
1088 loop {
1089 let event = bob.evtracker.recv().await.unwrap();
1090 if let EventType::WebxdcRealtimeData { data, .. } = event.typ {
1091 if data == b"alice -> bob" {
1092 break;
1093 } else {
1094 panic!(
1095 "Unexpected status update: {}",
1096 String::from_utf8_lossy(&data)
1097 );
1098 }
1099 }
1100 }
1101 }
1102
1103 #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
1104 async fn test_peer_channels_disabled() {
1105 let mut tcm = TestContextManager::new();
1106 let alice = &mut tcm.alice().await;
1107
1108 alice
1109 .set_config_bool(Config::WebxdcRealtimeEnabled, false)
1110 .await
1111 .unwrap();
1112
1113 send_webxdc_realtime_advertisement(alice, MsgId::new(1))
1115 .await
1116 .unwrap();
1117
1118 assert!(alice.ctx.iroh.read().await.is_none());
1119
1120 send_webxdc_realtime_data(alice, MsgId::new(1), vec![])
1122 .await
1123 .unwrap();
1124
1125 assert!(alice.ctx.iroh.read().await.is_none());
1126
1127 leave_webxdc_realtime(alice, MsgId::new(1)).await.unwrap();
1128
1129 assert!(alice.ctx.iroh.read().await.is_none());
1130
1131 assert!(alice.ctx.get_or_try_init_peer_channel().await.is_err());
1134 }
1135
1136 #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
1137 async fn test_leave_webxdc_realtime_uninitialized() {
1138 let mut tcm = TestContextManager::new();
1139 let alice = &mut tcm.alice().await;
1140
1141 alice
1142 .set_config_bool(Config::WebxdcRealtimeEnabled, true)
1143 .await
1144 .unwrap();
1145
1146 assert!(alice.ctx.iroh.read().await.is_none());
1147 leave_webxdc_realtime(alice, MsgId::new(1)).await.unwrap();
1148 assert!(alice.ctx.iroh.read().await.is_none());
1149 }
1150}