1use anyhow::{Context as _, Result, anyhow, bail};
27use data_encoding::BASE32_NOPAD;
28use futures_lite::StreamExt;
29use iroh::{Endpoint, NodeAddr, NodeId, PublicKey, RelayMode, RelayUrl, SecretKey};
30use iroh_gossip::net::{Event, GOSSIP_ALPN, Gossip, GossipEvent, JoinOptions};
31use iroh_gossip::proto::TopicId;
32use parking_lot::Mutex;
33use std::collections::{BTreeSet, HashMap};
34use std::env;
35use tokio::sync::{RwLock, oneshot};
36use tokio::task::JoinHandle;
37use url::Url;
38
39use crate::EventType;
40use crate::chat::send_msg;
41use crate::config::Config;
42use crate::context::Context;
43use crate::log::{info, warn};
44use crate::message::{Message, MsgId, Viewtype};
45use crate::mimeparser::SystemMessage;
46
/// Length in bytes of the sender's public key that is appended
/// to every realtime message and stripped again on reception.
const PUBLIC_KEY_LENGTH: usize = 32;
/// Placeholder written to the `public_key` column of `iroh_gossip_peers`
/// when only the topic of a message is known.
/// Rows carrying this value are filtered out when peers are loaded.
const PUBLIC_KEY_STUB: &[u8] = "static_string".as_bytes();
50
/// Store for the iroh peer-channel state of one context.
#[derive(Debug)]
pub struct Iroh {
    /// Router whose endpoint is used to add node addresses and which is shut down on close.
    pub(crate) router: iroh::protocol::Router,

    /// Gossip handle used to subscribe to topics.
    pub(crate) gossip: Gossip,

    /// Per-topic counter; incremented for every realtime message sent on the topic.
    pub(crate) sequence_numbers: Mutex<HashMap<TopicId, i32>>,

    /// State of every topic that is currently joined.
    pub(crate) iroh_channels: RwLock<HashMap<TopicId, ChannelState>>,

    /// Public key belonging to the secret key of the endpoint.
    /// It is appended to every outgoing realtime message.
    pub(crate) public_key: PublicKey,
}
71
72impl Iroh {
73 pub(crate) async fn network_change(&self) {
75 self.router.endpoint().network_change().await
76 }
77
78 pub(crate) async fn close(self) -> Result<()> {
80 self.router.shutdown().await.context("Closing iroh failed")
81 }
82
83 async fn join_and_subscribe_gossip(
89 &self,
90 ctx: &Context,
91 msg_id: MsgId,
92 ) -> Result<Option<oneshot::Receiver<()>>> {
93 let topic = get_iroh_topic_for_msg(ctx, msg_id)
94 .await?
95 .with_context(|| format!("Message {msg_id} has no gossip topic"))?;
96
97 let mut iroh_channels = self.iroh_channels.write().await;
102
103 if iroh_channels.contains_key(&topic) {
104 return Ok(None);
105 }
106
107 let peers = get_iroh_gossip_peers(ctx, msg_id).await?;
108 let node_ids = peers.iter().map(|p| p.node_id).collect::<Vec<_>>();
109
110 info!(
111 ctx,
112 "IROH_REALTIME: Joining gossip with peers: {:?}", node_ids,
113 );
114
115 for node_addr in &peers {
117 if !node_addr.is_empty() {
118 self.router.endpoint().add_node_addr(node_addr.clone())?;
119 }
120 }
121
122 let (join_tx, join_rx) = oneshot::channel();
123
124 let (gossip_sender, gossip_receiver) = self
125 .gossip
126 .subscribe_with_opts(topic, JoinOptions::with_bootstrap(node_ids))
127 .split();
128
129 let ctx = ctx.clone();
130 let subscribe_loop = tokio::spawn(async move {
131 if let Err(e) = subscribe_loop(&ctx, gossip_receiver, topic, msg_id, join_tx).await {
132 warn!(ctx, "subscribe_loop failed: {e}")
133 }
134 });
135
136 iroh_channels.insert(topic, ChannelState::new(subscribe_loop, gossip_sender));
137
138 Ok(Some(join_rx))
139 }
140
141 pub async fn maybe_add_gossip_peers(&self, topic: TopicId, peers: Vec<NodeAddr>) -> Result<()> {
143 if self.iroh_channels.read().await.get(&topic).is_some() {
144 for peer in &peers {
145 self.router.endpoint().add_node_addr(peer.clone())?;
146 }
147
148 self.gossip.subscribe_with_opts(
149 topic,
150 JoinOptions::with_bootstrap(peers.into_iter().map(|peer| peer.node_id)),
151 );
152 }
153 Ok(())
154 }
155
156 pub async fn send_webxdc_realtime_data(
158 &self,
159 ctx: &Context,
160 msg_id: MsgId,
161 mut data: Vec<u8>,
162 ) -> Result<()> {
163 let topic = get_iroh_topic_for_msg(ctx, msg_id)
164 .await?
165 .with_context(|| format!("Message {msg_id} has no gossip topic"))?;
166 self.join_and_subscribe_gossip(ctx, msg_id).await?;
167
168 let seq_num = self.get_and_incr(&topic);
169
170 let mut iroh_channels = self.iroh_channels.write().await;
171 let state = iroh_channels
172 .get_mut(&topic)
173 .context("Just created state does not exist")?;
174 data.extend(seq_num.to_le_bytes());
175 data.extend(self.public_key.as_bytes());
176
177 state.sender.broadcast(data.into()).await?;
178
179 if env::var("REALTIME_DEBUG").is_ok() {
180 info!(ctx, "Sent realtime data");
181 }
182
183 Ok(())
184 }
185
186 fn get_and_incr(&self, topic: &TopicId) -> i32 {
187 let mut sequence_numbers = self.sequence_numbers.lock();
188 let entry = sequence_numbers.entry(*topic).or_default();
189 *entry = entry.wrapping_add(1);
190 *entry
191 }
192
193 pub(crate) async fn get_node_addr(&self) -> Result<NodeAddr> {
195 let mut addr = self.router.endpoint().node_addr().await?;
196 addr.direct_addresses = BTreeSet::new();
197 Ok(addr)
198 }
199
200 pub(crate) async fn leave_realtime(&self, topic: TopicId) -> Result<()> {
202 if let Some(channel) = self.iroh_channels.write().await.remove(&topic) {
203 channel.subscribe_loop.abort();
211 let _ = channel.subscribe_loop.await;
212 }
213 Ok(())
214 }
215}
216
/// State kept for one joined gossip topic.
#[derive(Debug)]
pub(crate) struct ChannelState {
    /// Handle of the spawned task running `subscribe_loop` for the topic.
    subscribe_loop: JoinHandle<()>,

    /// Sender half of the topic, used to broadcast realtime data.
    sender: iroh_gossip::net::GossipSender,
}
225
226impl ChannelState {
227 fn new(subscribe_loop: JoinHandle<()>, sender: iroh_gossip::net::GossipSender) -> Self {
228 Self {
229 subscribe_loop,
230 sender,
231 }
232 }
233}
234
235impl Context {
236 async fn init_peer_channels(&self) -> Result<Iroh> {
238 info!(self, "Initializing peer channels.");
239 let secret_key = SecretKey::generate(rand::rngs::OsRng);
240 let public_key = secret_key.public();
241
242 let relay_mode = if let Some(relay_url) = self
243 .metadata
244 .read()
245 .await
246 .as_ref()
247 .and_then(|conf| conf.iroh_relay.clone())
248 {
249 RelayMode::Custom(RelayUrl::from(relay_url).into())
250 } else {
251 RelayMode::Default
254 };
255
256 let endpoint = Endpoint::builder()
257 .tls_x509() .secret_key(secret_key)
259 .alpns(vec![GOSSIP_ALPN.to_vec()])
260 .relay_mode(relay_mode)
261 .bind()
262 .await?;
263
264 let gossip = Gossip::builder()
270 .max_message_size(128 * 1024)
271 .spawn(endpoint.clone())
272 .await?;
273
274 let router = iroh::protocol::Router::builder(endpoint)
275 .accept(GOSSIP_ALPN, gossip.clone())
276 .spawn();
277
278 Ok(Iroh {
279 router,
280 gossip,
281 sequence_numbers: Mutex::new(HashMap::new()),
282 iroh_channels: RwLock::new(HashMap::new()),
283 public_key,
284 })
285 }
286
    /// Returns a read guard to the [`Iroh`] instance, initializing it on first use.
    ///
    /// # Errors
    ///
    /// Fails if `Config::WebxdcRealtimeEnabled` is off, if the config cannot be read
    /// or if the initialization of the peer channels fails.
    pub async fn get_or_try_init_peer_channel(
        &self,
    ) -> Result<tokio::sync::RwLockReadGuard<'_, Iroh>> {
        if !self.get_config_bool(Config::WebxdcRealtimeEnabled).await? {
            bail!("Attempt to get Iroh when realtime is disabled");
        }

        // Fast path: if an instance is already stored, a read lock is all that is needed.
        if let Ok(lock) = tokio::sync::RwLockReadGuard::<'_, std::option::Option<Iroh>>::try_map(
            self.iroh.read().await,
            |opt_iroh| opt_iroh.as_ref(),
        ) {
            return Ok(lock);
        }

        // Slow path: take the write lock and check again, because an instance
        // may have been stored between the read attempt above and this point.
        let lock = self.iroh.write().await;
        match tokio::sync::RwLockWriteGuard::<'_, std::option::Option<Iroh>>::try_downgrade_map(
            lock,
            |opt_iroh| opt_iroh.as_ref(),
        ) {
            Ok(lock) => Ok(lock),
            Err(mut lock) => {
                // Still `None`: initialize while the write lock is held,
                // store the instance and downgrade to a read guard.
                let iroh = self.init_peer_channels().await?;
                *lock = Some(iroh);
                tokio::sync::RwLockWriteGuard::<'_, std::option::Option<Iroh>>::try_downgrade_map(
                    lock,
                    |opt_iroh| opt_iroh.as_ref(),
                )
                .map_err(|_| anyhow!("Downgrade should succeed as we just stored `Some` value"))
            }
        }
    }
319}
320
321pub(crate) async fn iroh_add_peer_for_topic(
323 ctx: &Context,
324 msg_id: MsgId,
325 topic: TopicId,
326 peer: NodeId,
327 relay_server: Option<&str>,
328) -> Result<()> {
329 ctx.sql
330 .execute(
331 "INSERT OR REPLACE INTO iroh_gossip_peers (msg_id, public_key, topic, relay_server) VALUES (?, ?, ?, ?)",
332 (msg_id, peer.as_bytes(), topic.as_bytes(), relay_server),
333 )
334 .await?;
335 Ok(())
336}
337
338pub async fn add_gossip_peer_from_header(
340 context: &Context,
341 instance_id: MsgId,
342 node_addr: &str,
343) -> Result<()> {
344 if !context
345 .get_config_bool(Config::WebxdcRealtimeEnabled)
346 .await?
347 {
348 return Ok(());
349 }
350
351 info!(
352 context,
353 "Adding iroh peer with address {node_addr:?} to the topic of {instance_id}."
354 );
355 let node_addr =
356 serde_json::from_str::<NodeAddr>(node_addr).context("Failed to parse node address")?;
357
358 context.emit_event(EventType::WebxdcRealtimeAdvertisementReceived {
359 msg_id: instance_id,
360 });
361
362 let Some(topic) = get_iroh_topic_for_msg(context, instance_id).await? else {
363 warn!(
364 context,
365 "Could not add iroh peer because {instance_id} has no topic."
366 );
367 return Ok(());
368 };
369
370 let node_id = node_addr.node_id;
371 let relay_server = node_addr.relay_url().map(|relay| relay.as_str());
372 iroh_add_peer_for_topic(context, instance_id, topic, node_id, relay_server).await?;
373
374 let iroh = context.get_or_try_init_peer_channel().await?;
375 iroh.maybe_add_gossip_peers(topic, vec![node_addr]).await?;
376 Ok(())
377}
378
379pub(crate) async fn insert_topic_stub(ctx: &Context, msg_id: MsgId, topic: TopicId) -> Result<()> {
381 ctx.sql
382 .execute(
383 "INSERT OR REPLACE INTO iroh_gossip_peers (msg_id, public_key, topic, relay_server) VALUES (?, ?, ?, ?)",
384 (msg_id, PUBLIC_KEY_STUB, topic.as_bytes(), Option::<&str>::None),
385 )
386 .await?;
387 Ok(())
388}
389
390async fn get_iroh_gossip_peers(ctx: &Context, msg_id: MsgId) -> Result<Vec<NodeAddr>> {
392 ctx.sql
393 .query_map(
394 "SELECT public_key, relay_server FROM iroh_gossip_peers WHERE msg_id = ? AND public_key != ?",
395 (msg_id, PUBLIC_KEY_STUB),
396 |row| {
397 let key: Vec<u8> = row.get(0)?;
398 let server: Option<String> = row.get(1)?;
399 Ok((key, server))
400 },
401 |g| {
402 g.map(|data| {
403 let (key, server) = data?;
404 let server = server.map(|data| Ok::<_, url::ParseError>(RelayUrl::from(Url::parse(&data)?))).transpose()?;
405 let id = NodeId::from_bytes(&key.try_into()
406 .map_err(|_| anyhow!("Can't convert sql data to [u8; 32]"))?)?;
407 Ok::<_, anyhow::Error>(NodeAddr::from_parts(
408 id, server, vec![]
409 ))
410 })
411 .collect::<std::result::Result<Vec<_>, _>>()
412 },
413 )
414 .await
415}
416
417pub(crate) async fn get_iroh_topic_for_msg(
419 ctx: &Context,
420 msg_id: MsgId,
421) -> Result<Option<TopicId>> {
422 if let Some(bytes) = ctx
423 .sql
424 .query_get_value::<Vec<u8>>(
425 "SELECT topic FROM iroh_gossip_peers WHERE msg_id = ? LIMIT 1",
426 (msg_id,),
427 )
428 .await
429 .context("Couldn't restore topic from db")?
430 {
431 let topic_id = TopicId::from_bytes(
432 bytes
433 .try_into()
434 .map_err(|_| anyhow!("Could not convert stored topic ID"))?,
435 );
436 Ok(Some(topic_id))
437 } else {
438 Ok(None)
439 }
440}
441
442pub async fn send_webxdc_realtime_advertisement(
445 ctx: &Context,
446 msg_id: MsgId,
447) -> Result<Option<oneshot::Receiver<()>>> {
448 if !ctx.get_config_bool(Config::WebxdcRealtimeEnabled).await? {
449 return Ok(None);
450 }
451
452 let iroh = ctx.get_or_try_init_peer_channel().await?;
453 let conn = iroh.join_and_subscribe_gossip(ctx, msg_id).await?;
454
455 let webxdc = Message::load_from_db(ctx, msg_id).await?;
456 let mut msg = Message::new(Viewtype::Text);
457 msg.hidden = true;
458 msg.param.set_cmd(SystemMessage::IrohNodeAddr);
459 msg.in_reply_to = Some(webxdc.rfc724_mid.clone());
460 send_msg(ctx, webxdc.chat_id, &mut msg).await?;
461 info!(ctx, "IROH_REALTIME: Sent realtime advertisement");
462 Ok(conn)
463}
464
465pub async fn send_webxdc_realtime_data(ctx: &Context, msg_id: MsgId, data: Vec<u8>) -> Result<()> {
467 if !ctx.get_config_bool(Config::WebxdcRealtimeEnabled).await? {
468 return Ok(());
469 }
470
471 let iroh = ctx.get_or_try_init_peer_channel().await?;
472 iroh.send_webxdc_realtime_data(ctx, msg_id, data).await?;
473 Ok(())
474}
475
476pub async fn leave_webxdc_realtime(ctx: &Context, msg_id: MsgId) -> Result<()> {
478 if !ctx.get_config_bool(Config::WebxdcRealtimeEnabled).await? {
479 return Ok(());
480 }
481 let topic = get_iroh_topic_for_msg(ctx, msg_id)
482 .await?
483 .with_context(|| format!("Message {msg_id} has no gossip topic"))?;
484 let iroh = ctx.get_or_try_init_peer_channel().await?;
485 iroh.leave_realtime(topic).await?;
486 info!(ctx, "IROH_REALTIME: Left gossip for message {msg_id}");
487
488 Ok(())
489}
490
491fn create_random_topic() -> TopicId {
493 TopicId::from_bytes(rand::random())
494}
495
496pub(crate) async fn create_iroh_header(ctx: &Context, msg_id: MsgId) -> Result<String> {
499 let topic = create_random_topic();
500 insert_topic_stub(ctx, msg_id, topic).await?;
501 let topic_string = BASE32_NOPAD.encode(topic.as_bytes()).to_ascii_lowercase();
502 Ok(topic_string)
503}
504
505async fn subscribe_loop(
506 context: &Context,
507 mut stream: iroh_gossip::net::GossipReceiver,
508 topic: TopicId,
509 msg_id: MsgId,
510 join_tx: oneshot::Sender<()>,
511) -> Result<()> {
512 let mut join_tx = Some(join_tx);
513
514 while let Some(event) = stream.try_next().await? {
515 match event {
516 Event::Gossip(event) => match event {
517 GossipEvent::Joined(nodes) => {
518 if let Some(join_tx) = join_tx.take() {
519 join_tx.send(()).ok();
522 }
523
524 for node in nodes {
525 iroh_add_peer_for_topic(context, msg_id, topic, node, None).await?;
526 }
527 }
528 GossipEvent::NeighborUp(node) => {
529 info!(context, "IROH_REALTIME: NeighborUp: {}", node.to_string());
530 iroh_add_peer_for_topic(context, msg_id, topic, node, None).await?;
531 }
532 GossipEvent::NeighborDown(_node) => {}
533 GossipEvent::Received(message) => {
534 info!(context, "IROH_REALTIME: Received realtime data");
535 context.emit_event(EventType::WebxdcRealtimeData {
536 msg_id,
537 data: message
538 .content
539 .get(0..message.content.len() - 4 - PUBLIC_KEY_LENGTH)
540 .context("too few bytes in iroh message")?
541 .into(),
542 });
543 }
544 },
545 Event::Lagged => {
546 warn!(context, "Gossip lost some messages");
547 }
548 };
549 }
550 Ok(())
551}
552
553#[cfg(test)]
554mod tests {
555 use super::*;
556 use crate::{
557 EventType,
558 chat::send_msg,
559 message::{Message, Viewtype},
560 test_utils::TestContextManager,
561 };
562
    /// Alice and Bob exchange realtime data over a shared webxdc instance
    /// in both directions, and the Iroh instance is gone after IO is stopped.
    #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
    async fn test_can_communicate() {
        let mut tcm = TestContextManager::new();
        let alice = &mut tcm.alice().await;
        let bob = &mut tcm.bob().await;

        // Alice sends a webxdc instance to Bob.
        let alice_chat = alice.create_chat(bob).await;
        let mut instance = Message::new(Viewtype::File);
        instance
            .set_file_from_bytes(
                alice,
                "minimal.xdc",
                include_bytes!("../test-data/webxdc/minimal.xdc"),
                None,
            )
            .unwrap();

        send_msg(alice, alice_chat.id, &mut instance).await.unwrap();
        let alice_webxdc = alice.get_last_msg().await;
        assert_eq!(alice_webxdc.get_viewtype(), Viewtype::Webxdc);

        let webxdc = alice.pop_sent_msg().await;
        let bob_webxdc = bob.recv_msg(&webxdc).await;
        assert_eq!(bob_webxdc.get_viewtype(), Viewtype::Webxdc);

        bob_webxdc.chat_id.accept(bob).await.unwrap();

        // Alice advertises herself; the returned join receiver is not awaited here.
        send_webxdc_realtime_advertisement(alice, alice_webxdc.id)
            .await
            .unwrap();

        // Bob receives the advertisement and waits for the corresponding event.
        bob.recv_msg_trash(&alice.pop_sent_msg().await).await;
        loop {
            let event = bob.evtracker.recv().await.unwrap();
            if let EventType::WebxdcRealtimeAdvertisementReceived { msg_id } = event.typ {
                // NOTE(review): compares Bob's event ID with Alice's message ID;
                // presumably both are equal in fresh test accounts - confirm.
                assert!(msg_id == alice_webxdc.id);
                break;
            }
        }

        // Bob now knows exactly one peer: Alice.
        let members = get_iroh_gossip_peers(bob, bob_webxdc.id)
            .await
            .unwrap()
            .into_iter()
            .map(|addr| addr.node_id)
            .collect::<Vec<_>>();

        assert_eq!(
            members,
            vec![
                alice
                    .get_or_try_init_peer_channel()
                    .await
                    .unwrap()
                    .get_node_addr()
                    .await
                    .unwrap()
                    .node_id
            ]
        );

        // Bob joins the gossip and waits until the join is signalled.
        bob.get_or_try_init_peer_channel()
            .await
            .unwrap()
            .join_and_subscribe_gossip(bob, bob_webxdc.id)
            .await
            .unwrap()
            .unwrap()
            .await
            .unwrap();

        // Alice -> Bob.
        alice
            .get_or_try_init_peer_channel()
            .await
            .unwrap()
            .send_webxdc_realtime_data(alice, alice_webxdc.id, "alice -> bob".as_bytes().to_vec())
            .await
            .unwrap();

        loop {
            let event = bob.evtracker.recv().await.unwrap();
            if let EventType::WebxdcRealtimeData { data, .. } = event.typ {
                if data == "alice -> bob".as_bytes() {
                    break;
                } else {
                    panic!(
                        "Unexpected status update: {}",
                        String::from_utf8_lossy(&data)
                    );
                }
            }
        }
        // Bob -> Alice.
        bob.get_or_try_init_peer_channel()
            .await
            .unwrap()
            .send_webxdc_realtime_data(bob, bob_webxdc.id, "bob -> alice".as_bytes().to_vec())
            .await
            .unwrap();

        loop {
            let event = alice.evtracker.recv().await.unwrap();
            if let EventType::WebxdcRealtimeData { data, .. } = event.typ {
                if data == "bob -> alice".as_bytes() {
                    break;
                } else {
                    panic!(
                        "Unexpected status update: {}",
                        String::from_utf8_lossy(&data)
                    );
                }
            }
        }

        // Alice has stored Bob as her only peer.
        let members = get_iroh_gossip_peers(alice, alice_webxdc.id)
            .await
            .unwrap()
            .into_iter()
            .map(|addr| addr.node_id)
            .collect::<Vec<_>>();

        assert_eq!(
            members,
            vec![
                bob.get_or_try_init_peer_channel()
                    .await
                    .unwrap()
                    .get_node_addr()
                    .await
                    .unwrap()
                    .node_id
            ]
        );

        // A second message from Bob also arrives.
        bob.get_or_try_init_peer_channel()
            .await
            .unwrap()
            .send_webxdc_realtime_data(bob, bob_webxdc.id, "bob -> alice 2".as_bytes().to_vec())
            .await
            .unwrap();

        loop {
            let event = alice.evtracker.recv().await.unwrap();
            if let EventType::WebxdcRealtimeData { data, .. } = event.typ {
                if data == "bob -> alice 2".as_bytes() {
                    break;
                } else {
                    panic!(
                        "Unexpected status update: {}",
                        String::from_utf8_lossy(&data)
                    );
                }
            }
        }

        // Stopping IO is expected to remove the Iroh instance.
        assert!(alice.iroh.read().await.is_some());
        alice.stop_io().await;
        assert!(alice.iroh.read().await.is_none());
    }
729
    /// Bob leaves the realtime channel and joins it again; sequence numbers
    /// survive leaving, and leaving removes the channel state.
    #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
    async fn test_can_reconnect() {
        let mut tcm = TestContextManager::new();
        let alice = &mut tcm.alice().await;
        let bob = &mut tcm.bob().await;

        // Realtime is expected to be enabled in test contexts.
        assert!(
            alice
                .get_config_bool(Config::WebxdcRealtimeEnabled)
                .await
                .unwrap()
        );
        // Alice sends a webxdc instance to Bob.
        let alice_chat = alice.create_chat(bob).await;
        let mut instance = Message::new(Viewtype::File);
        instance
            .set_file_from_bytes(
                alice,
                "minimal.xdc",
                include_bytes!("../test-data/webxdc/minimal.xdc"),
                None,
            )
            .unwrap();

        send_msg(alice, alice_chat.id, &mut instance).await.unwrap();
        let alice_webxdc = alice.get_last_msg().await;
        assert_eq!(alice_webxdc.get_viewtype(), Viewtype::Webxdc);

        let webxdc = alice.pop_sent_msg().await;
        let bob_webxdc = bob.recv_msg(&webxdc).await;
        assert_eq!(bob_webxdc.get_viewtype(), Viewtype::Webxdc);

        bob_webxdc.chat_id.accept(bob).await.unwrap();

        // Alice advertises herself and Bob receives the advertisement.
        send_webxdc_realtime_advertisement(alice, alice_webxdc.id)
            .await
            .unwrap();

        bob.recv_msg_trash(&alice.pop_sent_msg().await).await;

        // Bob now knows exactly one peer: Alice.
        let members = get_iroh_gossip_peers(bob, bob_webxdc.id)
            .await
            .unwrap()
            .into_iter()
            .map(|addr| addr.node_id)
            .collect::<Vec<_>>();

        assert_eq!(
            members,
            vec![
                alice
                    .get_or_try_init_peer_channel()
                    .await
                    .unwrap()
                    .get_node_addr()
                    .await
                    .unwrap()
                    .node_id
            ]
        );

        // Bob joins the gossip and waits until the join is signalled.
        bob.get_or_try_init_peer_channel()
            .await
            .unwrap()
            .join_and_subscribe_gossip(bob, bob_webxdc.id)
            .await
            .unwrap()
            .unwrap()
            .await
            .unwrap();

        // Alice -> Bob.
        alice
            .get_or_try_init_peer_channel()
            .await
            .unwrap()
            .send_webxdc_realtime_data(alice, alice_webxdc.id, "alice -> bob".as_bytes().to_vec())
            .await
            .unwrap();

        loop {
            let event = bob.evtracker.recv().await.unwrap();
            if let EventType::WebxdcRealtimeData { data, .. } = event.typ {
                if data == "alice -> bob".as_bytes() {
                    break;
                } else {
                    panic!(
                        "Unexpected status update: {}",
                        String::from_utf8_lossy(&data)
                    );
                }
            }
        }

        // Leaving must not touch Bob's sequence number for the topic.
        let bob_topic = get_iroh_topic_for_msg(bob, bob_webxdc.id)
            .await
            .unwrap()
            .unwrap();
        let bob_sequence_number = bob
            .iroh
            .read()
            .await
            .as_ref()
            .unwrap()
            .sequence_numbers
            .lock()
            .get(&bob_topic)
            .copied();
        leave_webxdc_realtime(bob, bob_webxdc.id).await.unwrap();
        let bob_sequence_number_after = bob
            .iroh
            .read()
            .await
            .as_ref()
            .unwrap()
            .sequence_numbers
            .lock()
            .get(&bob_topic)
            .copied();
        assert_eq!(bob_sequence_number, bob_sequence_number_after);

        // Bob joins again; a new join receiver is returned because the state was removed.
        bob.get_or_try_init_peer_channel()
            .await
            .unwrap()
            .join_and_subscribe_gossip(bob, bob_webxdc.id)
            .await
            .unwrap()
            .unwrap()
            .await
            .unwrap();

        // Bob -> Alice after reconnecting.
        bob.get_or_try_init_peer_channel()
            .await
            .unwrap()
            .send_webxdc_realtime_data(bob, bob_webxdc.id, "bob -> alice".as_bytes().to_vec())
            .await
            .unwrap();

        loop {
            let event = alice.evtracker.recv().await.unwrap();
            if let EventType::WebxdcRealtimeData { data, .. } = event.typ {
                if data == "bob -> alice".as_bytes() {
                    break;
                } else {
                    panic!(
                        "Unexpected status update: {}",
                        String::from_utf8_lossy(&data)
                    );
                }
            }
        }

        // Alice has exactly one joined channel, which disappears after leaving.
        assert_eq!(
            alice
                .iroh
                .read()
                .await
                .as_ref()
                .unwrap()
                .iroh_channels
                .read()
                .await
                .len(),
            1
        );
        leave_webxdc_realtime(alice, alice_webxdc.id).await.unwrap();
        let topic = get_iroh_topic_for_msg(alice, alice_webxdc.id)
            .await
            .unwrap()
            .unwrap();
        assert!(
            alice
                .iroh
                .read()
                .await
                .as_ref()
                .unwrap()
                .iroh_channels
                .read()
                .await
                .get(&topic)
                .is_none()
        );
    }
920
    /// Alice and Bob both send advertisements before receiving the other one;
    /// the connection is still established and data from Alice reaches Bob.
    #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
    async fn test_parallel_connect() {
        let mut tcm = TestContextManager::new();
        let alice = &mut tcm.alice().await;
        let bob = &mut tcm.bob().await;

        // Alice sends a webxdc instance to Bob.
        let alice_chat = alice.create_chat(bob).await;
        let mut instance = Message::new(Viewtype::File);
        instance
            .set_file_from_bytes(
                alice,
                "minimal.xdc",
                include_bytes!("../test-data/webxdc/minimal.xdc"),
                None,
            )
            .unwrap();
        send_msg(alice, alice_chat.id, &mut instance).await.unwrap();
        let alice_webxdc = alice.get_last_msg().await;

        let webxdc = alice.pop_sent_msg().await;
        let bob_webxdc = bob.recv_msg(&webxdc).await;
        assert_eq!(bob_webxdc.get_viewtype(), Viewtype::Webxdc);

        bob_webxdc.chat_id.accept(bob).await.unwrap();

        // Both sides advertise before either has received the other's advertisement.
        eprintln!("Sending advertisements");
        let alice_advertisement_future = send_webxdc_realtime_advertisement(alice, alice_webxdc.id)
            .await
            .unwrap()
            .unwrap();
        let alice_advertisement = alice.pop_sent_msg().await;

        send_webxdc_realtime_advertisement(bob, bob_webxdc.id)
            .await
            .unwrap();
        let bob_advertisement = bob.pop_sent_msg().await;

        eprintln!("Receiving advertisements");
        bob.recv_msg_trash(&alice_advertisement).await;
        alice.recv_msg_trash(&bob_advertisement).await;

        // The join receiver resolves once Alice's subscribe loop has seen `Joined`.
        eprintln!("Alice waits for connection");
        alice_advertisement_future.await.unwrap();

        eprintln!("Sending ephemeral message");
        send_webxdc_realtime_data(alice, alice_webxdc.id, b"alice -> bob".into())
            .await
            .unwrap();

        eprintln!("Waiting for ephemeral message");
        loop {
            let event = bob.evtracker.recv().await.unwrap();
            if let EventType::WebxdcRealtimeData { data, .. } = event.typ {
                if data == b"alice -> bob" {
                    break;
                } else {
                    panic!(
                        "Unexpected status update: {}",
                        String::from_utf8_lossy(&data)
                    );
                }
            }
        }
    }
988
    /// With realtime disabled, the public entry points succeed without
    /// creating an Iroh instance, and requesting the instance directly fails.
    #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
    async fn test_peer_channels_disabled() {
        let mut tcm = TestContextManager::new();
        let alice = &mut tcm.alice().await;

        alice
            .set_config_bool(Config::WebxdcRealtimeEnabled, false)
            .await
            .unwrap();

        // Each call below returns early, so Iroh must stay uninitialized.
        send_webxdc_realtime_advertisement(alice, MsgId::new(1))
            .await
            .unwrap();

        assert!(alice.ctx.iroh.read().await.is_none());

        send_webxdc_realtime_data(alice, MsgId::new(1), vec![])
            .await
            .unwrap();

        assert!(alice.ctx.iroh.read().await.is_none());

        leave_webxdc_realtime(alice, MsgId::new(1)).await.unwrap();

        assert!(alice.ctx.iroh.read().await.is_none());

        // Asking for the instance directly is an error while realtime is disabled.
        assert!(alice.ctx.get_or_try_init_peer_channel().await.is_err());
    }
1022}