1use anyhow::{Context as _, Result, anyhow, bail};
27use data_encoding::BASE32_NOPAD;
28use futures_lite::StreamExt;
29use iroh::{Endpoint, NodeAddr, NodeId, PublicKey, RelayMode, RelayUrl, SecretKey};
30use iroh_gossip::net::{Event, GOSSIP_ALPN, Gossip, GossipEvent, JoinOptions};
31use iroh_gossip::proto::TopicId;
32use parking_lot::Mutex;
33use std::collections::{BTreeSet, HashMap};
34use std::env;
35use tokio::sync::{RwLock, oneshot};
36use tokio::task::JoinHandle;
37use url::Url;
38
39use crate::EventType;
40use crate::chat::send_msg;
41use crate::config::Config;
42use crate::context::Context;
43use crate::log::warn;
44use crate::message::{Message, MsgId, Viewtype};
45use crate::mimeparser::SystemMessage;
46
/// Number of public-key bytes appended to every outgoing realtime payload.
///
/// `subscribe_loop` strips this many bytes (plus the 4-byte sequence number)
/// from the end of each received message.
const PUBLIC_KEY_LENGTH: usize = 32;
/// Placeholder written into the `public_key` column by `insert_topic_stub`
/// so that a topic can be stored before any real peer is known.
///
/// `get_iroh_gossip_peers` excludes rows carrying this value.
const PUBLIC_KEY_STUB: &[u8] = "static_string".as_bytes();
50
/// Iroh state of a context: the router, the gossip handle and the
/// per-topic bookkeeping for realtime channels.
#[derive(Debug)]
pub struct Iroh {
    /// Iroh router; gives access to the endpoint and is shut down by `close`.
    pub(crate) router: iroh::protocol::Router,

    /// Gossip handle used to subscribe to topics.
    pub(crate) gossip: Gossip,

    /// Per-topic counter that `get_and_incr` bumps for every sent message.
    pub(crate) sequence_numbers: Mutex<HashMap<TopicId, i32>>,

    /// Topics that are currently joined, with their subscribe task and sender.
    pub(crate) iroh_channels: RwLock<HashMap<TopicId, ChannelState>>,

    /// Public key belonging to the endpoint's secret key;
    /// its bytes are appended to every outgoing realtime payload.
    pub(crate) public_key: PublicKey,
}
71
72impl Iroh {
73 pub(crate) async fn network_change(&self) {
75 self.router.endpoint().network_change().await
76 }
77
78 pub(crate) async fn close(self) -> Result<()> {
80 self.router.shutdown().await.context("Closing iroh failed")
81 }
82
83 async fn join_and_subscribe_gossip(
89 &self,
90 ctx: &Context,
91 msg_id: MsgId,
92 ) -> Result<Option<oneshot::Receiver<()>>> {
93 let topic = get_iroh_topic_for_msg(ctx, msg_id)
94 .await?
95 .with_context(|| format!("Message {msg_id} has no gossip topic"))?;
96
97 let mut iroh_channels = self.iroh_channels.write().await;
102
103 if iroh_channels.contains_key(&topic) {
104 return Ok(None);
105 }
106
107 let peers = get_iroh_gossip_peers(ctx, msg_id).await?;
108 let node_ids = peers.iter().map(|p| p.node_id).collect::<Vec<_>>();
109
110 info!(
111 ctx,
112 "IROH_REALTIME: Joining gossip {topic} with peers: {:?}.", node_ids,
113 );
114
115 for node_addr in &peers {
117 if !node_addr.is_empty() {
118 self.router.endpoint().add_node_addr(node_addr.clone())?;
119 }
120 }
121
122 let (join_tx, join_rx) = oneshot::channel();
123
124 let (gossip_sender, gossip_receiver) = self
125 .gossip
126 .subscribe_with_opts(topic, JoinOptions::with_bootstrap(node_ids))
127 .split();
128
129 let ctx = ctx.clone();
130 let subscribe_loop = tokio::spawn(async move {
131 if let Err(e) = subscribe_loop(&ctx, gossip_receiver, topic, msg_id, join_tx).await {
132 warn!(ctx, "subscribe_loop failed: {e}")
133 }
134 });
135
136 iroh_channels.insert(topic, ChannelState::new(subscribe_loop, gossip_sender));
137
138 Ok(Some(join_rx))
139 }
140
141 pub async fn maybe_add_gossip_peer(&self, topic: TopicId, peer: NodeAddr) -> Result<()> {
143 if self.iroh_channels.read().await.get(&topic).is_some() {
144 self.router.endpoint().add_node_addr(peer.clone())?;
145 self.gossip.subscribe(topic, vec![peer.node_id])?;
146 }
147 Ok(())
148 }
149
150 pub async fn send_webxdc_realtime_data(
152 &self,
153 ctx: &Context,
154 msg_id: MsgId,
155 mut data: Vec<u8>,
156 ) -> Result<()> {
157 let topic = get_iroh_topic_for_msg(ctx, msg_id)
158 .await?
159 .with_context(|| format!("Message {msg_id} has no gossip topic"))?;
160 self.join_and_subscribe_gossip(ctx, msg_id).await?;
161
162 let seq_num = self.get_and_incr(&topic);
163
164 let mut iroh_channels = self.iroh_channels.write().await;
165 let state = iroh_channels
166 .get_mut(&topic)
167 .context("Just created state does not exist")?;
168 data.extend(seq_num.to_le_bytes());
169 data.extend(self.public_key.as_bytes());
170
171 state.sender.broadcast(data.into()).await?;
172
173 if env::var("REALTIME_DEBUG").is_ok() {
174 info!(ctx, "Sent realtime data");
175 }
176
177 Ok(())
178 }
179
180 fn get_and_incr(&self, topic: &TopicId) -> i32 {
181 let mut sequence_numbers = self.sequence_numbers.lock();
182 let entry = sequence_numbers.entry(*topic).or_default();
183 *entry = entry.wrapping_add(1);
184 *entry
185 }
186
187 pub(crate) async fn get_node_addr(&self) -> Result<NodeAddr> {
193 let mut addr = self.router.endpoint().node_addr().await?;
194 addr.direct_addresses = BTreeSet::new();
195 debug_assert!(addr.relay_url().is_some());
196 Ok(addr)
197 }
198
    /// Leaves the realtime channel of `topic`.
    ///
    /// Does nothing if no channel is stored for the topic. Always returns `Ok(())`.
    pub(crate) async fn leave_realtime(&self, topic: TopicId) -> Result<()> {
        // Removing the entry takes ownership of the `ChannelState`;
        // its `sender` is dropped when `channel` goes out of scope.
        if let Some(channel) = self.iroh_channels.write().await.remove(&topic) {
            // Stop the task that runs `subscribe_loop` for this topic.
            channel.subscribe_loop.abort();
            // Wait for the aborted task to finish. Its result is deliberately
            // ignored (presumably a cancellation error — TODO confirm).
            let _ = channel.subscribe_loop.await;
        }
        Ok(())
    }
214}
215
/// State kept for one joined gossip topic.
#[derive(Debug)]
pub(crate) struct ChannelState {
    /// Handle of the spawned task that runs `subscribe_loop`;
    /// aborted and awaited in `Iroh::leave_realtime`.
    subscribe_loop: JoinHandle<()>,

    /// Sending half of the gossip topic, used to broadcast realtime data.
    sender: iroh_gossip::net::GossipSender,
}
224
225impl ChannelState {
226 fn new(subscribe_loop: JoinHandle<()>, sender: iroh_gossip::net::GossipSender) -> Self {
227 Self {
228 subscribe_loop,
229 sender,
230 }
231 }
232}
233
234impl Context {
235 async fn init_peer_channels(&self) -> Result<Iroh> {
237 info!(self, "Initializing peer channels.");
238 let secret_key = SecretKey::generate(rand_old::rngs::OsRng);
239 let public_key = secret_key.public();
240
241 let relay_mode = if let Some(relay_url) = self
242 .metadata
243 .read()
244 .await
245 .as_ref()
246 .and_then(|conf| conf.iroh_relay.clone())
247 {
248 RelayMode::Custom(RelayUrl::from(relay_url).into())
249 } else {
250 RelayMode::Default
253 };
254
255 let endpoint = Endpoint::builder()
256 .tls_x509() .secret_key(secret_key)
258 .alpns(vec![GOSSIP_ALPN.to_vec()])
259 .relay_mode(relay_mode)
260 .bind()
261 .await?;
262
263 let gossip = Gossip::builder()
269 .max_message_size(128 * 1024)
270 .spawn(endpoint.clone())
271 .await?;
272
273 let router = iroh::protocol::Router::builder(endpoint)
274 .accept(GOSSIP_ALPN, gossip.clone())
275 .spawn();
276
277 Ok(Iroh {
278 router,
279 gossip,
280 sequence_numbers: Mutex::new(HashMap::new()),
281 iroh_channels: RwLock::new(HashMap::new()),
282 public_key,
283 })
284 }
285
286 pub async fn get_peer_channels(&self) -> Option<tokio::sync::RwLockReadGuard<'_, Iroh>> {
288 tokio::sync::RwLockReadGuard::<'_, std::option::Option<Iroh>>::try_map(
289 self.iroh.read().await,
290 |opt_iroh| opt_iroh.as_ref(),
291 )
292 .ok()
293 }
294
295 pub async fn get_or_try_init_peer_channel(
297 &self,
298 ) -> Result<tokio::sync::RwLockReadGuard<'_, Iroh>> {
299 if !self.get_config_bool(Config::WebxdcRealtimeEnabled).await? {
300 bail!("Attempt to initialize Iroh when realtime is disabled");
301 }
302
303 if let Some(lock) = self.get_peer_channels().await {
304 return Ok(lock);
305 }
306
307 let lock = self.iroh.write().await;
308 match tokio::sync::RwLockWriteGuard::<'_, std::option::Option<Iroh>>::try_downgrade_map(
309 lock,
310 |opt_iroh| opt_iroh.as_ref(),
311 ) {
312 Ok(lock) => Ok(lock),
313 Err(mut lock) => {
314 let iroh = self.init_peer_channels().await?;
315 *lock = Some(iroh);
316 tokio::sync::RwLockWriteGuard::<'_, std::option::Option<Iroh>>::try_downgrade_map(
317 lock,
318 |opt_iroh| opt_iroh.as_ref(),
319 )
320 .map_err(|_| anyhow!("Downgrade should succeed as we just stored `Some` value"))
321 }
322 }
323 }
324
325 pub(crate) async fn maybe_add_gossip_peer(&self, topic: TopicId, peer: NodeAddr) -> Result<()> {
326 if let Some(iroh) = &*self.iroh.read().await {
327 info!(
328 self,
329 "Adding (maybe existing) peer with id {} to {topic}.", peer.node_id
330 );
331 iroh.maybe_add_gossip_peer(topic, peer).await?;
332 }
333 Ok(())
334 }
335}
336
337pub(crate) async fn iroh_add_peer_for_topic(
339 ctx: &Context,
340 msg_id: MsgId,
341 topic: TopicId,
342 peer: NodeId,
343 relay_server: Option<&str>,
344) -> Result<()> {
345 ctx.sql
346 .execute(
347 "INSERT OR REPLACE INTO iroh_gossip_peers (msg_id, public_key, topic, relay_server) VALUES (?, ?, ?, ?)",
348 (msg_id, peer.as_bytes(), topic.as_bytes(), relay_server),
349 )
350 .await?;
351 Ok(())
352}
353
354pub async fn add_gossip_peer_from_header(
356 context: &Context,
357 instance_id: MsgId,
358 node_addr: &str,
359) -> Result<()> {
360 if !context
361 .get_config_bool(Config::WebxdcRealtimeEnabled)
362 .await?
363 {
364 return Ok(());
365 }
366
367 let node_addr =
368 serde_json::from_str::<NodeAddr>(node_addr).context("Failed to parse node address")?;
369
370 info!(
371 context,
372 "Adding iroh peer with node id {} to the topic of {instance_id}.", node_addr.node_id
373 );
374
375 context.emit_event(EventType::WebxdcRealtimeAdvertisementReceived {
376 msg_id: instance_id,
377 });
378
379 let Some(topic) = get_iroh_topic_for_msg(context, instance_id).await? else {
380 warn!(
381 context,
382 "Could not add iroh peer because {instance_id} has no topic."
383 );
384 return Ok(());
385 };
386
387 let node_id = node_addr.node_id;
388 let relay_server = node_addr.relay_url().map(|relay| relay.as_str());
389 iroh_add_peer_for_topic(context, instance_id, topic, node_id, relay_server).await?;
390
391 context.maybe_add_gossip_peer(topic, node_addr).await?;
392 Ok(())
393}
394
395pub(crate) async fn insert_topic_stub(ctx: &Context, msg_id: MsgId, topic: TopicId) -> Result<()> {
397 ctx.sql
398 .execute(
399 "INSERT OR REPLACE INTO iroh_gossip_peers (msg_id, public_key, topic, relay_server) VALUES (?, ?, ?, ?)",
400 (msg_id, PUBLIC_KEY_STUB, topic.as_bytes(), Option::<&str>::None),
401 )
402 .await?;
403 Ok(())
404}
405
/// Loads all gossip peers stored for `msg_id`, skipping the `PUBLIC_KEY_STUB` row.
///
/// Each returned [`NodeAddr`] consists of the stored node ID and the optional
/// relay URL; the list of direct addresses is always empty.
///
/// # Errors
///
/// Fails if the query fails, a stored relay URL does not parse,
/// or a stored key cannot be converted into a node ID.
async fn get_iroh_gossip_peers(ctx: &Context, msg_id: MsgId) -> Result<Vec<NodeAddr>> {
    ctx.sql
        .query_map(
            "SELECT public_key, relay_server FROM iroh_gossip_peers WHERE msg_id = ? AND public_key != ?",
            (msg_id, PUBLIC_KEY_STUB),
            // Row mapper: extract the raw key bytes and the optional relay URL string.
            |row| {
                let key: Vec<u8> = row.get(0)?;
                let server: Option<String> = row.get(1)?;
                Ok((key, server))
            },
            // Collector: turn every row into a `NodeAddr`, stopping at the first error.
            |g| {
                g.map(|data| {
                    let (key, server) = data?;
                    // `None` stays `None`; a present string must be a valid URL.
                    let server = server.map(|data| Ok::<_, url::ParseError>(RelayUrl::from(Url::parse(&data)?))).transpose()?;
                    // The key must have the exact length expected by `NodeId::from_bytes`.
                    let id = NodeId::from_bytes(&key.try_into()
                        .map_err(|_| anyhow!("Can't convert sql data to [u8; 32]"))?)?;
                    Ok::<_, anyhow::Error>(NodeAddr::from_parts(
                        id, server, vec![]
                    ))
                })
                .collect::<std::result::Result<Vec<_>, _>>()
            },
        )
        .await
}
432
433pub(crate) async fn get_iroh_topic_for_msg(
435 ctx: &Context,
436 msg_id: MsgId,
437) -> Result<Option<TopicId>> {
438 if let Some(bytes) = ctx
439 .sql
440 .query_get_value::<Vec<u8>>(
441 "SELECT topic FROM iroh_gossip_peers WHERE msg_id = ? LIMIT 1",
442 (msg_id,),
443 )
444 .await
445 .context("Couldn't restore topic from db")?
446 {
447 let topic_id = TopicId::from_bytes(
448 bytes
449 .try_into()
450 .map_err(|_| anyhow!("Could not convert stored topic ID"))?,
451 );
452 Ok(Some(topic_id))
453 } else {
454 Ok(None)
455 }
456}
457
458pub async fn send_webxdc_realtime_advertisement(
461 ctx: &Context,
462 msg_id: MsgId,
463) -> Result<Option<oneshot::Receiver<()>>> {
464 if !ctx.get_config_bool(Config::WebxdcRealtimeEnabled).await? {
465 return Ok(None);
466 }
467
468 let iroh = ctx.get_or_try_init_peer_channel().await?;
469 let conn = iroh.join_and_subscribe_gossip(ctx, msg_id).await?;
470
471 let webxdc = Message::load_from_db(ctx, msg_id).await?;
472 let mut msg = Message::new(Viewtype::Text);
473 msg.hidden = true;
474 msg.param.set_cmd(SystemMessage::IrohNodeAddr);
475 msg.in_reply_to = Some(webxdc.rfc724_mid.clone());
476 send_msg(ctx, webxdc.chat_id, &mut msg).await?;
477 info!(ctx, "IROH_REALTIME: Sent realtime advertisement");
478 Ok(conn)
479}
480
481pub async fn send_webxdc_realtime_data(ctx: &Context, msg_id: MsgId, data: Vec<u8>) -> Result<()> {
483 if !ctx.get_config_bool(Config::WebxdcRealtimeEnabled).await? {
484 return Ok(());
485 }
486
487 let iroh = ctx.get_or_try_init_peer_channel().await?;
488 iroh.send_webxdc_realtime_data(ctx, msg_id, data).await?;
489 Ok(())
490}
491
492pub async fn leave_webxdc_realtime(ctx: &Context, msg_id: MsgId) -> Result<()> {
498 let Some(iroh) = ctx.get_peer_channels().await else {
499 return Ok(());
500 };
501 let Some(topic) = get_iroh_topic_for_msg(ctx, msg_id).await? else {
502 return Ok(());
503 };
504 iroh.leave_realtime(topic).await?;
505 info!(ctx, "IROH_REALTIME: Left gossip for message {msg_id}");
506
507 Ok(())
508}
509
510fn create_random_topic() -> TopicId {
512 TopicId::from_bytes(rand::random())
513}
514
515pub(crate) async fn create_iroh_header(ctx: &Context, msg_id: MsgId) -> Result<String> {
518 let topic = create_random_topic();
519 insert_topic_stub(ctx, msg_id, topic).await?;
520 let topic_string = BASE32_NOPAD.encode(topic.as_bytes()).to_ascii_lowercase();
521 Ok(topic_string)
522}
523
524pub(crate) fn iroh_topic_from_str(topic: &str) -> Result<TopicId> {
526 let mut topic_raw = [0u8; 32];
527 BASE32_NOPAD
528 .decode_mut(topic.to_ascii_uppercase().as_bytes(), &mut topic_raw)
529 .map_err(|e| e.error)
530 .context("Wrong gossip topic header")?;
531
532 let topic = TopicId::from_bytes(topic_raw);
533 Ok(topic)
534}
535
536#[expect(clippy::arithmetic_side_effects)]
537async fn subscribe_loop(
538 context: &Context,
539 mut stream: iroh_gossip::net::GossipReceiver,
540 topic: TopicId,
541 msg_id: MsgId,
542 join_tx: oneshot::Sender<()>,
543) -> Result<()> {
544 let mut join_tx = Some(join_tx);
545
546 while let Some(event) = stream.try_next().await? {
547 match event {
548 Event::Gossip(event) => match event {
549 GossipEvent::Joined(nodes) => {
550 if let Some(join_tx) = join_tx.take() {
551 join_tx.send(()).ok();
554 }
555
556 for node in nodes {
557 iroh_add_peer_for_topic(context, msg_id, topic, node, None).await?;
558 }
559 }
560 GossipEvent::NeighborUp(node) => {
561 info!(context, "IROH_REALTIME: NeighborUp: {}", node.to_string());
562 iroh_add_peer_for_topic(context, msg_id, topic, node, None).await?;
563 }
564 GossipEvent::NeighborDown(_node) => {}
565 GossipEvent::Received(message) => {
566 info!(context, "IROH_REALTIME: Received realtime data");
567 context.emit_event(EventType::WebxdcRealtimeData {
568 msg_id,
569 data: message
570 .content
571 .get(0..message.content.len() - 4 - PUBLIC_KEY_LENGTH)
572 .context("too few bytes in iroh message")?
573 .into(),
574 });
575 }
576 },
577 Event::Lagged => {
578 warn!(context, "Gossip lost some messages");
579 }
580 };
581 }
582 Ok(())
583}
584
585#[cfg(test)]
586mod tests {
587 use super::*;
588 use crate::{
589 EventType,
590 chat::{self, ChatId, add_contact_to_chat, resend_msgs, send_msg},
591 message::{Message, Viewtype},
592 test_utils::{TestContext, TestContextManager},
593 };
594
    /// Alice and Bob exchange realtime data in both directions over a shared
    /// webxdc instance; afterwards stopping IO must clear Alice's iroh state.
    #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
    async fn test_can_communicate() {
        let mut tcm = TestContextManager::new();
        let alice = &mut tcm.alice().await;
        let bob = &mut tcm.bob().await;

        // Alice sends a webxdc instance to Bob.
        let alice_chat = alice.create_chat(bob).await;
        let mut instance = Message::new(Viewtype::File);
        instance
            .set_file_from_bytes(
                alice,
                "minimal.xdc",
                include_bytes!("../test-data/webxdc/minimal.xdc"),
                None,
            )
            .unwrap();

        send_msg(alice, alice_chat.id, &mut instance).await.unwrap();
        let alice_webxdc = alice.get_last_msg().await;
        assert_eq!(alice_webxdc.get_viewtype(), Viewtype::Webxdc);

        let webxdc = alice.pop_sent_msg().await;
        let bob_webxdc = bob.recv_msg(&webxdc).await;
        assert_eq!(bob_webxdc.get_viewtype(), Viewtype::Webxdc);

        bob_webxdc.chat_id.accept(bob).await.unwrap();

        // Alice advertises herself.
        send_webxdc_realtime_advertisement(alice, alice_webxdc.id)
            .await
            .unwrap();

        // Bob receives the advertisement and must get the corresponding event.
        bob.recv_msg_trash(&alice.pop_sent_msg().await).await;
        loop {
            let event = bob.evtracker.recv().await.unwrap();
            if let EventType::WebxdcRealtimeAdvertisementReceived { msg_id } = event.typ {
                assert!(msg_id == bob_webxdc.id);
                break;
            }
        }

        // Bob's stored peers for the instance must consist of exactly Alice.
        let members = get_iroh_gossip_peers(bob, bob_webxdc.id)
            .await
            .unwrap()
            .into_iter()
            .map(|addr| addr.node_id)
            .collect::<Vec<_>>();

        assert_eq!(
            members,
            vec![
                alice
                    .get_or_try_init_peer_channel()
                    .await
                    .unwrap()
                    .get_node_addr()
                    .await
                    .unwrap()
                    .node_id
            ]
        );

        // Bob joins the gossip and waits for the join notification.
        bob.get_or_try_init_peer_channel()
            .await
            .unwrap()
            .join_and_subscribe_gossip(bob, bob_webxdc.id)
            .await
            .unwrap()
            .unwrap()
            .await
            .unwrap();

        // Alice sends realtime data to Bob.
        alice
            .get_or_try_init_peer_channel()
            .await
            .unwrap()
            .send_webxdc_realtime_data(alice, alice_webxdc.id, "alice -> bob".as_bytes().to_vec())
            .await
            .unwrap();

        loop {
            let event = bob.evtracker.recv().await.unwrap();
            if let EventType::WebxdcRealtimeData { data, .. } = event.typ {
                if data == "alice -> bob".as_bytes() {
                    break;
                } else {
                    panic!(
                        "Unexpected status update: {}",
                        String::from_utf8_lossy(&data)
                    );
                }
            }
        }
        // Bob sends realtime data to Alice.
        bob.get_or_try_init_peer_channel()
            .await
            .unwrap()
            .send_webxdc_realtime_data(bob, bob_webxdc.id, "bob -> alice".as_bytes().to_vec())
            .await
            .unwrap();

        loop {
            let event = alice.evtracker.recv().await.unwrap();
            if let EventType::WebxdcRealtimeData { data, .. } = event.typ {
                if data == "bob -> alice".as_bytes() {
                    break;
                } else {
                    panic!(
                        "Unexpected status update: {}",
                        String::from_utf8_lossy(&data)
                    );
                }
            }
        }

        // Alice's stored peers for the instance must now consist of exactly Bob.
        let members = get_iroh_gossip_peers(alice, alice_webxdc.id)
            .await
            .unwrap()
            .into_iter()
            .map(|addr| addr.node_id)
            .collect::<Vec<_>>();

        assert_eq!(
            members,
            vec![
                bob.get_or_try_init_peer_channel()
                    .await
                    .unwrap()
                    .get_node_addr()
                    .await
                    .unwrap()
                    .node_id
            ]
        );

        // A second message from Bob must arrive as well.
        bob.get_or_try_init_peer_channel()
            .await
            .unwrap()
            .send_webxdc_realtime_data(bob, bob_webxdc.id, "bob -> alice 2".as_bytes().to_vec())
            .await
            .unwrap();

        loop {
            let event = alice.evtracker.recv().await.unwrap();
            if let EventType::WebxdcRealtimeData { data, .. } = event.typ {
                if data == "bob -> alice 2".as_bytes() {
                    break;
                } else {
                    panic!(
                        "Unexpected status update: {}",
                        String::from_utf8_lossy(&data)
                    );
                }
            }
        }

        // Stopping IO is expected to reset the iroh state to `None`.
        assert!(alice.iroh.read().await.is_some());
        alice.stop_io().await;
        assert!(alice.iroh.read().await.is_none());
    }
761
    /// Bob leaves the realtime channel and joins again; sequence numbers must
    /// survive leaving, and leaving must remove the channel state.
    #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
    async fn test_can_reconnect() {
        let mut tcm = TestContextManager::new();
        let alice = &mut tcm.alice().await;
        let bob = &mut tcm.bob().await;

        // Realtime is expected to be enabled in a fresh test context.
        assert!(
            alice
                .get_config_bool(Config::WebxdcRealtimeEnabled)
                .await
                .unwrap()
        );
        // Alice sends a webxdc instance to Bob.
        let alice_chat = alice.create_chat(bob).await;
        let mut instance = Message::new(Viewtype::File);
        instance
            .set_file_from_bytes(
                alice,
                "minimal.xdc",
                include_bytes!("../test-data/webxdc/minimal.xdc"),
                None,
            )
            .unwrap();

        send_msg(alice, alice_chat.id, &mut instance).await.unwrap();
        let alice_webxdc = alice.get_last_msg().await;
        assert_eq!(alice_webxdc.get_viewtype(), Viewtype::Webxdc);

        let webxdc = alice.pop_sent_msg().await;
        let bob_webxdc = bob.recv_msg(&webxdc).await;
        assert_eq!(bob_webxdc.get_viewtype(), Viewtype::Webxdc);

        bob_webxdc.chat_id.accept(bob).await.unwrap();

        // Alice advertises herself.
        send_webxdc_realtime_advertisement(alice, alice_webxdc.id)
            .await
            .unwrap();

        bob.recv_msg_trash(&alice.pop_sent_msg().await).await;

        // Bob's stored peers for the instance must consist of exactly Alice.
        let members = get_iroh_gossip_peers(bob, bob_webxdc.id)
            .await
            .unwrap()
            .into_iter()
            .map(|addr| addr.node_id)
            .collect::<Vec<_>>();

        assert_eq!(
            members,
            vec![
                alice
                    .get_or_try_init_peer_channel()
                    .await
                    .unwrap()
                    .get_node_addr()
                    .await
                    .unwrap()
                    .node_id
            ]
        );

        // Bob joins the gossip and waits for the join notification.
        bob.get_or_try_init_peer_channel()
            .await
            .unwrap()
            .join_and_subscribe_gossip(bob, bob_webxdc.id)
            .await
            .unwrap()
            .unwrap()
            .await
            .unwrap();

        // Alice sends realtime data to Bob.
        alice
            .get_or_try_init_peer_channel()
            .await
            .unwrap()
            .send_webxdc_realtime_data(alice, alice_webxdc.id, "alice -> bob".as_bytes().to_vec())
            .await
            .unwrap();

        loop {
            let event = bob.evtracker.recv().await.unwrap();
            if let EventType::WebxdcRealtimeData { data, .. } = event.typ {
                if data == "alice -> bob".as_bytes() {
                    break;
                } else {
                    panic!(
                        "Unexpected status update: {}",
                        String::from_utf8_lossy(&data)
                    );
                }
            }
        }

        // Leaving the channel must not touch Bob's sequence number for the topic.
        let bob_topic = get_iroh_topic_for_msg(bob, bob_webxdc.id)
            .await
            .unwrap()
            .unwrap();
        let bob_sequence_number = bob
            .iroh
            .read()
            .await
            .as_ref()
            .unwrap()
            .sequence_numbers
            .lock()
            .get(&bob_topic)
            .copied();
        leave_webxdc_realtime(bob, bob_webxdc.id).await.unwrap();
        let bob_sequence_number_after = bob
            .iroh
            .read()
            .await
            .as_ref()
            .unwrap()
            .sequence_numbers
            .lock()
            .get(&bob_topic)
            .copied();
        assert_eq!(bob_sequence_number, bob_sequence_number_after);

        // Bob joins again; a new subscription must be created (`Some` receiver).
        bob.get_or_try_init_peer_channel()
            .await
            .unwrap()
            .join_and_subscribe_gossip(bob, bob_webxdc.id)
            .await
            .unwrap()
            .unwrap()
            .await
            .unwrap();

        bob.get_or_try_init_peer_channel()
            .await
            .unwrap()
            .send_webxdc_realtime_data(bob, bob_webxdc.id, "bob -> alice".as_bytes().to_vec())
            .await
            .unwrap();

        loop {
            let event = alice.evtracker.recv().await.unwrap();
            if let EventType::WebxdcRealtimeData { data, .. } = event.typ {
                if data == "bob -> alice".as_bytes() {
                    break;
                } else {
                    panic!(
                        "Unexpected status update: {}",
                        String::from_utf8_lossy(&data)
                    );
                }
            }
        }

        // Alice has exactly one channel; leaving must remove it.
        assert_eq!(
            alice
                .iroh
                .read()
                .await
                .as_ref()
                .unwrap()
                .iroh_channels
                .read()
                .await
                .len(),
            1
        );
        leave_webxdc_realtime(alice, alice_webxdc.id).await.unwrap();
        let topic = get_iroh_topic_for_msg(alice, alice_webxdc.id)
            .await
            .unwrap()
            .unwrap();
        assert!(
            alice
                .iroh
                .read()
                .await
                .as_ref()
                .unwrap()
                .iroh_channels
                .read()
                .await
                .get(&topic)
                .is_none()
        );
    }
952
    /// Alice and Bob both advertise before receiving the other's advertisement
    /// (see `connect_alice_bob`) and must still be able to connect.
    #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
    async fn test_parallel_connect() {
        let mut tcm = TestContextManager::new();
        let alice = &mut tcm.alice().await;
        let bob = &mut tcm.bob().await;

        let chat = alice.create_chat(bob).await.id;

        // Prepare the webxdc instance; it is sent inside `connect_alice_bob`.
        let mut instance = Message::new(Viewtype::File);
        instance
            .set_file_from_bytes(
                alice,
                "minimal.xdc",
                include_bytes!("../test-data/webxdc/minimal.xdc"),
                None,
            )
            .unwrap();
        connect_alice_bob(alice, chat, &mut instance, bob).await
    }
972
    /// A member added later (Fiona) receives the resent webxdc instance
    /// and must be able to join the realtime channel and receive data.
    #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
    async fn test_webxdc_resend() {
        let mut tcm = TestContextManager::new();
        let alice = &mut tcm.alice().await;
        let bob = &mut tcm.bob().await;
        let group = chat::create_group(alice, "group chat").await.unwrap();

        // Prepare the webxdc instance; it is sent inside `connect_alice_bob`.
        let mut instance = Message::new(Viewtype::File);
        instance
            .set_file_from_bytes(
                alice,
                "minimal.xdc",
                include_bytes!("../test-data/webxdc/minimal.xdc"),
                None,
            )
            .unwrap();

        add_contact_to_chat(alice, group, alice.add_or_lookup_contact_id(bob).await)
            .await
            .unwrap();

        connect_alice_bob(alice, group, &mut instance, bob).await;

        // Fiona joins the group after Alice and Bob are already connected.
        let fiona = &mut tcm.fiona().await;

        add_contact_to_chat(alice, group, alice.add_or_lookup_contact_id(fiona).await)
            .await
            .unwrap();

        // Alice resends the instance so that Fiona receives it.
        resend_msgs(alice, &[instance.id]).await.unwrap();
        let msg = alice.pop_sent_msg().await;
        let fiona_instance = fiona.recv_msg(&msg).await;
        fiona_instance.chat_id.accept(fiona).await.unwrap();
        // Receiving the instance alone must not initialize iroh.
        assert!(fiona.ctx.iroh.read().await.is_none());

        // Fiona advertises herself and Alice receives the advertisement.
        let fiona_connect_future = send_webxdc_realtime_advertisement(fiona, fiona_instance.id)
            .await
            .unwrap()
            .unwrap();
        let fiona_advert = fiona.pop_sent_msg().await;
        alice.recv_msg_trash(&fiona_advert).await;

        fiona_connect_future.await.unwrap();

        // Alice keeps sending once per second until Fiona has received the data.
        let realtime_send_loop = async {
            loop {
                send_webxdc_realtime_data(alice, instance.id, b"alice -> bob & fiona".into())
                    .await
                    .unwrap();
                tokio::time::sleep(std::time::Duration::from_secs(1)).await;
            }
        };

        let realtime_receive_loop = async {
            loop {
                let event = fiona.evtracker.recv().await.unwrap();
                if let EventType::WebxdcRealtimeData { data, .. } = event.typ {
                    if data == b"alice -> bob & fiona" {
                        break;
                    } else {
                        panic!(
                            "Unexpected status update: {}",
                            String::from_utf8_lossy(&data)
                        );
                    }
                }
            }
        };
        // The test passes as soon as the receive loop finishes.
        tokio::select!(
            _ = realtime_send_loop => {
                panic!("Send loop should never finish");
            },
            _ = realtime_receive_loop => {
                return;
            }
        );
    }
1054
    /// Test helper: Alice sends `instance` to Bob, both advertise before
    /// receiving the other's advertisement, both wait for the join
    /// notification, and finally Alice sends realtime data that Bob must receive.
    async fn connect_alice_bob(
        alice: &mut TestContext,
        alice_chat_id: ChatId,
        instance: &mut Message,
        bob: &mut TestContext,
    ) {
        send_msg(alice, alice_chat_id, instance).await.unwrap();
        let alice_webxdc = alice.get_last_msg().await;

        let webxdc = alice.pop_sent_msg().await;
        let bob_webxdc = bob.recv_msg(&webxdc).await;
        assert_eq!(bob_webxdc.get_viewtype(), Viewtype::Webxdc);

        bob_webxdc.chat_id.accept(bob).await.unwrap();

        eprintln!("Sending advertisements");
        // Both sides send their advertisement before either one is delivered.
        let alice_advertisement_future = send_webxdc_realtime_advertisement(alice, alice_webxdc.id)
            .await
            .unwrap()
            .unwrap();
        let alice_advertisement = alice.pop_sent_msg().await;

        let bob_advertisement_future = send_webxdc_realtime_advertisement(bob, bob_webxdc.id)
            .await
            .unwrap()
            .unwrap();
        let bob_advertisement = bob.pop_sent_msg().await;

        eprintln!("Receiving advertisements");
        bob.recv_msg_trash(&alice_advertisement).await;
        alice.recv_msg_trash(&bob_advertisement).await;

        eprintln!("Alice and Bob wait for connection");
        alice_advertisement_future.await.unwrap();
        bob_advertisement_future.await.unwrap();

        eprintln!("Sending ephemeral message");
        send_webxdc_realtime_data(alice, alice_webxdc.id, b"alice -> bob".into())
            .await
            .unwrap();

        eprintln!("Waiting for ephemeral message");
        // Any realtime payload other than the expected one fails the test.
        loop {
            let event = bob.evtracker.recv().await.unwrap();
            if let EventType::WebxdcRealtimeData { data, .. } = event.typ {
                if data == b"alice -> bob" {
                    break;
                } else {
                    panic!(
                        "Unexpected status update: {}",
                        String::from_utf8_lossy(&data)
                    );
                }
            }
        }
    }
1113
    /// With realtime disabled, none of the public entry points may initialize
    /// iroh, and explicit initialization must fail.
    #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
    async fn test_peer_channels_disabled() {
        let mut tcm = TestContextManager::new();
        let alice = &mut tcm.alice().await;

        alice
            .set_config_bool(Config::WebxdcRealtimeEnabled, false)
            .await
            .unwrap();

        // Advertising succeeds as a no-op.
        send_webxdc_realtime_advertisement(alice, MsgId::new(1))
            .await
            .unwrap();

        assert!(alice.ctx.iroh.read().await.is_none());

        // Sending data succeeds as a no-op.
        send_webxdc_realtime_data(alice, MsgId::new(1), vec![])
            .await
            .unwrap();

        assert!(alice.ctx.iroh.read().await.is_none());

        // Leaving succeeds as a no-op.
        leave_webxdc_realtime(alice, MsgId::new(1)).await.unwrap();

        assert!(alice.ctx.iroh.read().await.is_none());

        // Direct initialization is refused while realtime is disabled.
        assert!(alice.ctx.get_or_try_init_peer_channel().await.is_err());
    }
1146
    /// Leaving a realtime channel while iroh is uninitialized must succeed
    /// and must not initialize iroh, even with realtime enabled.
    #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
    async fn test_leave_webxdc_realtime_uninitialized() {
        let mut tcm = TestContextManager::new();
        let alice = &mut tcm.alice().await;

        alice
            .set_config_bool(Config::WebxdcRealtimeEnabled, true)
            .await
            .unwrap();

        assert!(alice.ctx.iroh.read().await.is_none());
        leave_webxdc_realtime(alice, MsgId::new(1)).await.unwrap();
        assert!(alice.ctx.iroh.read().await.is_none());
    }
1161}