1use anyhow::{anyhow, bail, Context as _, Result};
27use data_encoding::BASE32_NOPAD;
28use futures_lite::StreamExt;
29use iroh::{Endpoint, NodeAddr, NodeId, PublicKey, RelayMap, RelayMode, RelayUrl, SecretKey};
30use iroh_gossip::net::{Event, Gossip, GossipEvent, JoinOptions, GOSSIP_ALPN};
31use iroh_gossip::proto::TopicId;
32use parking_lot::Mutex;
33use std::collections::{BTreeSet, HashMap};
34use std::env;
35use tokio::sync::{oneshot, RwLock};
36use tokio::task::JoinHandle;
37use url::Url;
38
39use crate::chat::send_msg;
40use crate::config::Config;
41use crate::context::Context;
42use crate::message::{Message, MsgId, Viewtype};
43use crate::mimeparser::SystemMessage;
44use crate::EventType;
45
/// Length in bytes of the public key that is appended to every outgoing
/// realtime payload and stripped again from every received one.
const PUBLIC_KEY_LENGTH: usize = 32;
/// Placeholder written to the `public_key` column by `insert_topic_stub`,
/// so that a topic can be stored for a message before any real peer is known.
/// Rows carrying this value are filtered out when gossip peers are loaded.
const PUBLIC_KEY_STUB: &[u8] = "static_string".as_bytes();
49
/// State of the realtime (iroh gossip) subsystem of one `Context`.
#[derive(Debug)]
pub struct Iroh {
    /// Protocol router. Its endpoint is used to register peer addresses,
    /// to report network changes and to obtain our own node address;
    /// shutting it down is how `close` terminates iroh.
    pub(crate) router: iroh::protocol::Router,

    /// Gossip handle used to subscribe to topics.
    pub(crate) gossip: Gossip,

    /// Most recently handed out sequence number for each topic.
    /// The number is appended to every outgoing realtime payload.
    pub(crate) sequence_numbers: Mutex<HashMap<TopicId, i32>>,

    /// Currently subscribed topics together with their sender and
    /// the task running the receive loop.
    pub(crate) iroh_channels: RwLock<HashMap<TopicId, ChannelState>>,

    /// Public half of the secret key generated in `init_peer_channels`.
    /// It is appended to every outgoing realtime payload.
    pub(crate) public_key: PublicKey,
}
70
71impl Iroh {
72 pub(crate) async fn network_change(&self) {
74 self.router.endpoint().network_change().await
75 }
76
77 pub(crate) async fn close(self) -> Result<()> {
79 self.router.shutdown().await.context("Closing iroh failed")
80 }
81
    /// Subscribes to the gossip topic stored for message `msg_id`,
    /// unless a channel for that topic already exists.
    ///
    /// Returns `Ok(None)` if the topic already has a channel. Otherwise the
    /// peers stored for the message are used as bootstrap nodes, a task
    /// running `subscribe_loop` is spawned, and the returned receiver is
    /// signalled by that loop on the first `Joined` gossip event.
    ///
    /// # Errors
    ///
    /// Fails if the message has no stored topic, if the peers cannot be
    /// loaded, or if a peer address cannot be added to the endpoint.
    async fn join_and_subscribe_gossip(
        &self,
        ctx: &Context,
        msg_id: MsgId,
    ) -> Result<Option<oneshot::Receiver<()>>> {
        let topic = get_iroh_topic_for_msg(ctx, msg_id)
            .await?
            .with_context(|| format!("Message {msg_id} has no gossip topic"))?;

        // The write guard is kept until the new channel has been inserted,
        // so the existence check and the insertion happen under one lock.
        let mut iroh_channels = self.iroh_channels.write().await;

        if iroh_channels.contains_key(&topic) {
            return Ok(None);
        }

        let peers = get_iroh_gossip_peers(ctx, msg_id).await?;
        let node_ids = peers.iter().map(|p| p.node_id).collect::<Vec<_>>();

        info!(
            ctx,
            "IROH_REALTIME: Joining gossip with peers: {:?}", node_ids,
        );

        for node_addr in &peers {
            // Addresses for which `is_empty()` holds are not handed to the endpoint.
            // NOTE(review): presumably the endpoint rejects them with an error — confirm.
            if !node_addr.is_empty() {
                self.router.endpoint().add_node_addr(node_addr.clone())?;
            }
        }

        let (join_tx, join_rx) = oneshot::channel();

        let (gossip_sender, gossip_receiver) = self
            .gossip
            .subscribe_with_opts(topic, JoinOptions::with_bootstrap(node_ids))
            .split();

        // The spawned task needs its own `Context`; a failure of the loop is only logged.
        let ctx = ctx.clone();
        let subscribe_loop = tokio::spawn(async move {
            if let Err(e) = subscribe_loop(&ctx, gossip_receiver, topic, msg_id, join_tx).await {
                warn!(ctx, "subscribe_loop failed: {e}")
            }
        });

        iroh_channels.insert(topic, ChannelState::new(subscribe_loop, gossip_sender));

        Ok(Some(join_rx))
    }
139
140 pub async fn maybe_add_gossip_peers(&self, topic: TopicId, peers: Vec<NodeAddr>) -> Result<()> {
142 if self.iroh_channels.read().await.get(&topic).is_some() {
143 for peer in &peers {
144 self.router.endpoint().add_node_addr(peer.clone())?;
145 }
146
147 self.gossip.subscribe_with_opts(
148 topic,
149 JoinOptions::with_bootstrap(peers.into_iter().map(|peer| peer.node_id)),
150 );
151 }
152 Ok(())
153 }
154
155 pub async fn send_webxdc_realtime_data(
157 &self,
158 ctx: &Context,
159 msg_id: MsgId,
160 mut data: Vec<u8>,
161 ) -> Result<()> {
162 let topic = get_iroh_topic_for_msg(ctx, msg_id)
163 .await?
164 .with_context(|| format!("Message {msg_id} has no gossip topic"))?;
165 self.join_and_subscribe_gossip(ctx, msg_id).await?;
166
167 let seq_num = self.get_and_incr(&topic);
168
169 let mut iroh_channels = self.iroh_channels.write().await;
170 let state = iroh_channels
171 .get_mut(&topic)
172 .context("Just created state does not exist")?;
173 data.extend(seq_num.to_le_bytes());
174 data.extend(self.public_key.as_bytes());
175
176 state.sender.broadcast(data.into()).await?;
177
178 if env::var("REALTIME_DEBUG").is_ok() {
179 info!(ctx, "Sent realtime data");
180 }
181
182 Ok(())
183 }
184
185 fn get_and_incr(&self, topic: &TopicId) -> i32 {
186 let mut sequence_numbers = self.sequence_numbers.lock();
187 let entry = sequence_numbers.entry(*topic).or_default();
188 *entry = entry.wrapping_add(1);
189 *entry
190 }
191
192 pub(crate) async fn get_node_addr(&self) -> Result<NodeAddr> {
194 let mut addr = self.router.endpoint().node_addr().await?;
195 addr.direct_addresses = BTreeSet::new();
196 Ok(addr)
197 }
198
199 pub(crate) async fn leave_realtime(&self, topic: TopicId) -> Result<()> {
201 if let Some(channel) = self.iroh_channels.write().await.remove(&topic) {
202 channel.subscribe_loop.abort();
210 let _ = channel.subscribe_loop.await;
211 }
212 Ok(())
213 }
214}
215
/// Per-topic state of an active gossip subscription.
#[derive(Debug)]
pub(crate) struct ChannelState {
    /// Handle of the task running `subscribe_loop` for this topic.
    /// It is aborted and awaited when the topic is left.
    subscribe_loop: JoinHandle<()>,

    /// Sender half of the gossip subscription, used for broadcasting.
    sender: iroh_gossip::net::GossipSender,
}

impl ChannelState {
    /// Bundles the receive-loop task handle and the gossip sender of one topic.
    fn new(subscribe_loop: JoinHandle<()>, sender: iroh_gossip::net::GossipSender) -> Self {
        Self {
            subscribe_loop,
            sender,
        }
    }
}
233
234impl Context {
235 async fn init_peer_channels(&self) -> Result<Iroh> {
237 info!(self, "Initializing peer channels.");
238 let secret_key = SecretKey::generate(rand::rngs::OsRng);
239 let public_key = secret_key.public();
240
241 let relay_mode = if let Some(relay_url) = self
242 .metadata
243 .read()
244 .await
245 .as_ref()
246 .and_then(|conf| conf.iroh_relay.clone())
247 {
248 RelayMode::Custom(RelayMap::from_url(RelayUrl::from(relay_url)))
249 } else {
250 RelayMode::Default
253 };
254
255 let endpoint = Endpoint::builder()
256 .secret_key(secret_key)
257 .alpns(vec![GOSSIP_ALPN.to_vec()])
258 .relay_mode(relay_mode)
259 .bind()
260 .await?;
261
262 let gossip = Gossip::builder()
268 .max_message_size(128 * 1024)
269 .spawn(endpoint.clone())
270 .await?;
271
272 let router = iroh::protocol::Router::builder(endpoint)
273 .accept(GOSSIP_ALPN, gossip.clone())
274 .spawn()
275 .await?;
276
277 Ok(Iroh {
278 router,
279 gossip,
280 sequence_numbers: Mutex::new(HashMap::new()),
281 iroh_channels: RwLock::new(HashMap::new()),
282 public_key,
283 })
284 }
285
    /// Returns a read guard to the [`Iroh`] instance of this context,
    /// creating the instance first if it does not exist yet.
    ///
    /// # Errors
    ///
    /// Fails if `Config::WebxdcRealtimeEnabled` is off or if the
    /// initialization of the peer channels fails.
    pub async fn get_or_try_init_peer_channel(
        &self,
    ) -> Result<tokio::sync::RwLockReadGuard<'_, Iroh>> {
        if !self.get_config_bool(Config::WebxdcRealtimeEnabled).await? {
            bail!("Attempt to get Iroh when realtime is disabled");
        }

        // Fast path: the instance already exists, a read lock is enough.
        if let Ok(lock) = tokio::sync::RwLockReadGuard::<'_, std::option::Option<Iroh>>::try_map(
            self.iroh.read().await,
            |opt_iroh| opt_iroh.as_ref(),
        ) {
            return Ok(lock);
        }

        // Slow path: take the write lock and check again, because the
        // instance may have been created between the two lock acquisitions.
        let lock = self.iroh.write().await;
        match tokio::sync::RwLockWriteGuard::<'_, std::option::Option<Iroh>>::try_downgrade_map(
            lock,
            |opt_iroh| opt_iroh.as_ref(),
        ) {
            Ok(lock) => Ok(lock),
            // Still `None`: the write guard is handed back in the error
            // case, so the instance is created while holding it.
            Err(mut lock) => {
                let iroh = self.init_peer_channels().await?;
                *lock = Some(iroh);
                tokio::sync::RwLockWriteGuard::<'_, std::option::Option<Iroh>>::try_downgrade_map(
                    lock,
                    |opt_iroh| opt_iroh.as_ref(),
                )
                .map_err(|_| anyhow!("Downgrade should succeed as we just stored `Some` value"))
            }
        }
    }
318}
319
320pub(crate) async fn iroh_add_peer_for_topic(
322 ctx: &Context,
323 msg_id: MsgId,
324 topic: TopicId,
325 peer: NodeId,
326 relay_server: Option<&str>,
327) -> Result<()> {
328 ctx.sql
329 .execute(
330 "INSERT OR REPLACE INTO iroh_gossip_peers (msg_id, public_key, topic, relay_server) VALUES (?, ?, ?, ?)",
331 (msg_id, peer.as_bytes(), topic.as_bytes(), relay_server),
332 )
333 .await?;
334 Ok(())
335}
336
337pub async fn add_gossip_peer_from_header(
339 context: &Context,
340 instance_id: MsgId,
341 node_addr: &str,
342) -> Result<()> {
343 if !context
344 .get_config_bool(Config::WebxdcRealtimeEnabled)
345 .await?
346 {
347 return Ok(());
348 }
349
350 info!(
351 context,
352 "Adding iroh peer with address {node_addr:?} to the topic of {instance_id}."
353 );
354 let node_addr =
355 serde_json::from_str::<NodeAddr>(node_addr).context("Failed to parse node address")?;
356
357 context.emit_event(EventType::WebxdcRealtimeAdvertisementReceived {
358 msg_id: instance_id,
359 });
360
361 let Some(topic) = get_iroh_topic_for_msg(context, instance_id).await? else {
362 warn!(
363 context,
364 "Could not add iroh peer because {instance_id} has no topic."
365 );
366 return Ok(());
367 };
368
369 let node_id = node_addr.node_id;
370 let relay_server = node_addr.relay_url().map(|relay| relay.as_str());
371 iroh_add_peer_for_topic(context, instance_id, topic, node_id, relay_server).await?;
372
373 let iroh = context.get_or_try_init_peer_channel().await?;
374 iroh.maybe_add_gossip_peers(topic, vec![node_addr]).await?;
375 Ok(())
376}
377
378pub(crate) async fn insert_topic_stub(ctx: &Context, msg_id: MsgId, topic: TopicId) -> Result<()> {
380 ctx.sql
381 .execute(
382 "INSERT OR REPLACE INTO iroh_gossip_peers (msg_id, public_key, topic, relay_server) VALUES (?, ?, ?, ?)",
383 (msg_id, PUBLIC_KEY_STUB, topic.as_bytes(), Option::<&str>::None),
384 )
385 .await?;
386 Ok(())
387}
388
389async fn get_iroh_gossip_peers(ctx: &Context, msg_id: MsgId) -> Result<Vec<NodeAddr>> {
391 ctx.sql
392 .query_map(
393 "SELECT public_key, relay_server FROM iroh_gossip_peers WHERE msg_id = ? AND public_key != ?",
394 (msg_id, PUBLIC_KEY_STUB),
395 |row| {
396 let key: Vec<u8> = row.get(0)?;
397 let server: Option<String> = row.get(1)?;
398 Ok((key, server))
399 },
400 |g| {
401 g.map(|data| {
402 let (key, server) = data?;
403 let server = server.map(|data| Ok::<_, url::ParseError>(RelayUrl::from(Url::parse(&data)?))).transpose()?;
404 let id = NodeId::from_bytes(&key.try_into()
405 .map_err(|_| anyhow!("Can't convert sql data to [u8; 32]"))?)?;
406 Ok::<_, anyhow::Error>(NodeAddr::from_parts(
407 id, server, vec![]
408 ))
409 })
410 .collect::<std::result::Result<Vec<_>, _>>()
411 },
412 )
413 .await
414}
415
416pub(crate) async fn get_iroh_topic_for_msg(
418 ctx: &Context,
419 msg_id: MsgId,
420) -> Result<Option<TopicId>> {
421 if let Some(bytes) = ctx
422 .sql
423 .query_get_value::<Vec<u8>>(
424 "SELECT topic FROM iroh_gossip_peers WHERE msg_id = ? LIMIT 1",
425 (msg_id,),
426 )
427 .await
428 .context("Couldn't restore topic from db")?
429 {
430 let topic_id = TopicId::from_bytes(
431 bytes
432 .try_into()
433 .map_err(|_| anyhow!("Could not convert stored topic ID"))?,
434 );
435 Ok(Some(topic_id))
436 } else {
437 Ok(None)
438 }
439}
440
441pub async fn send_webxdc_realtime_advertisement(
444 ctx: &Context,
445 msg_id: MsgId,
446) -> Result<Option<oneshot::Receiver<()>>> {
447 if !ctx.get_config_bool(Config::WebxdcRealtimeEnabled).await? {
448 return Ok(None);
449 }
450
451 let iroh = ctx.get_or_try_init_peer_channel().await?;
452 let conn = iroh.join_and_subscribe_gossip(ctx, msg_id).await?;
453
454 let webxdc = Message::load_from_db(ctx, msg_id).await?;
455 let mut msg = Message::new(Viewtype::Text);
456 msg.hidden = true;
457 msg.param.set_cmd(SystemMessage::IrohNodeAddr);
458 msg.in_reply_to = Some(webxdc.rfc724_mid.clone());
459 send_msg(ctx, webxdc.chat_id, &mut msg).await?;
460 info!(ctx, "IROH_REALTIME: Sent realtime advertisement");
461 Ok(conn)
462}
463
464pub async fn send_webxdc_realtime_data(ctx: &Context, msg_id: MsgId, data: Vec<u8>) -> Result<()> {
466 if !ctx.get_config_bool(Config::WebxdcRealtimeEnabled).await? {
467 return Ok(());
468 }
469
470 let iroh = ctx.get_or_try_init_peer_channel().await?;
471 iroh.send_webxdc_realtime_data(ctx, msg_id, data).await?;
472 Ok(())
473}
474
475pub async fn leave_webxdc_realtime(ctx: &Context, msg_id: MsgId) -> Result<()> {
477 if !ctx.get_config_bool(Config::WebxdcRealtimeEnabled).await? {
478 return Ok(());
479 }
480 let topic = get_iroh_topic_for_msg(ctx, msg_id)
481 .await?
482 .with_context(|| format!("Message {msg_id} has no gossip topic"))?;
483 let iroh = ctx.get_or_try_init_peer_channel().await?;
484 iroh.leave_realtime(topic).await?;
485 info!(ctx, "IROH_REALTIME: Left gossip for message {msg_id}");
486
487 Ok(())
488}
489
490fn create_random_topic() -> TopicId {
492 TopicId::from_bytes(rand::random())
493}
494
495pub(crate) async fn create_iroh_header(ctx: &Context, msg_id: MsgId) -> Result<String> {
498 let topic = create_random_topic();
499 insert_topic_stub(ctx, msg_id, topic).await?;
500 let topic_string = BASE32_NOPAD.encode(topic.as_bytes()).to_ascii_lowercase();
501 Ok(topic_string)
502}
503
504async fn subscribe_loop(
505 context: &Context,
506 mut stream: iroh_gossip::net::GossipReceiver,
507 topic: TopicId,
508 msg_id: MsgId,
509 join_tx: oneshot::Sender<()>,
510) -> Result<()> {
511 let mut join_tx = Some(join_tx);
512
513 while let Some(event) = stream.try_next().await? {
514 match event {
515 Event::Gossip(event) => match event {
516 GossipEvent::Joined(nodes) => {
517 if let Some(join_tx) = join_tx.take() {
518 join_tx.send(()).ok();
521 }
522
523 for node in nodes {
524 iroh_add_peer_for_topic(context, msg_id, topic, node, None).await?;
525 }
526 }
527 GossipEvent::NeighborUp(node) => {
528 info!(context, "IROH_REALTIME: NeighborUp: {}", node.to_string());
529 iroh_add_peer_for_topic(context, msg_id, topic, node, None).await?;
530 }
531 GossipEvent::NeighborDown(_node) => {}
532 GossipEvent::Received(message) => {
533 info!(context, "IROH_REALTIME: Received realtime data");
534 context.emit_event(EventType::WebxdcRealtimeData {
535 msg_id,
536 data: message
537 .content
538 .get(0..message.content.len() - 4 - PUBLIC_KEY_LENGTH)
539 .context("too few bytes in iroh message")?
540 .into(),
541 });
542 }
543 },
544 Event::Lagged => {
545 warn!(context, "Gossip lost some messages");
546 }
547 };
548 }
549 Ok(())
550}
551
552#[cfg(test)]
553mod tests {
554 use super::*;
555 use crate::{
556 chat::send_msg,
557 message::{Message, Viewtype},
558 test_utils::TestContextManager,
559 EventType,
560 };
561
    /// Alice and Bob exchange realtime data in both directions after Alice
    /// has advertised herself, and the stored peers are checked on both sides.
    #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
    async fn test_can_communicate() {
        let mut tcm = TestContextManager::new();
        let alice = &mut tcm.alice().await;
        let bob = &mut tcm.bob().await;

        // Alice sends a webxdc instance to Bob.
        let alice_chat = alice.create_chat(bob).await;
        let mut instance = Message::new(Viewtype::File);
        instance
            .set_file_from_bytes(
                alice,
                "minimal.xdc",
                include_bytes!("../test-data/webxdc/minimal.xdc"),
                None,
            )
            .unwrap();

        send_msg(alice, alice_chat.id, &mut instance).await.unwrap();
        let alice_webxdc = alice.get_last_msg().await;
        assert_eq!(alice_webxdc.get_viewtype(), Viewtype::Webxdc);

        let webxdc = alice.pop_sent_msg().await;
        let bob_webxdc = bob.recv_msg(&webxdc).await;
        assert_eq!(bob_webxdc.get_viewtype(), Viewtype::Webxdc);

        bob_webxdc.chat_id.accept(bob).await.unwrap();

        // Alice advertises herself; the returned join receiver is not needed here.
        send_webxdc_realtime_advertisement(alice, alice_webxdc.id)
            .await
            .unwrap();

        // Bob processes the advertisement and waits for the corresponding event.
        bob.recv_msg_trash(&alice.pop_sent_msg().await).await;
        loop {
            let event = bob.evtracker.recv().await.unwrap();
            if let EventType::WebxdcRealtimeAdvertisementReceived { msg_id } = event.typ {
                assert!(msg_id == alice_webxdc.id);
                break;
            }
        }

        // Bob's stored peers for the instance must be exactly Alice's node.
        let members = get_iroh_gossip_peers(bob, bob_webxdc.id)
            .await
            .unwrap()
            .into_iter()
            .map(|addr| addr.node_id)
            .collect::<Vec<_>>();

        assert_eq!(
            members,
            vec![
                alice
                    .get_or_try_init_peer_channel()
                    .await
                    .unwrap()
                    .get_node_addr()
                    .await
                    .unwrap()
                    .node_id
            ]
        );

        // Bob joins the topic and waits until the join receiver is signalled.
        bob.get_or_try_init_peer_channel()
            .await
            .unwrap()
            .join_and_subscribe_gossip(bob, bob_webxdc.id)
            .await
            .unwrap()
            .unwrap()
            .await
            .unwrap();

        // Alice sends realtime data to Bob.
        alice
            .get_or_try_init_peer_channel()
            .await
            .unwrap()
            .send_webxdc_realtime_data(alice, alice_webxdc.id, "alice -> bob".as_bytes().to_vec())
            .await
            .unwrap();

        loop {
            let event = bob.evtracker.recv().await.unwrap();
            if let EventType::WebxdcRealtimeData { data, .. } = event.typ {
                if data == "alice -> bob".as_bytes() {
                    break;
                } else {
                    panic!(
                        "Unexpected status update: {}",
                        String::from_utf8_lossy(&data)
                    );
                }
            }
        }
        // Bob sends realtime data to Alice.
        bob.get_or_try_init_peer_channel()
            .await
            .unwrap()
            .send_webxdc_realtime_data(bob, bob_webxdc.id, "bob -> alice".as_bytes().to_vec())
            .await
            .unwrap();

        loop {
            let event = alice.evtracker.recv().await.unwrap();
            if let EventType::WebxdcRealtimeData { data, .. } = event.typ {
                if data == "bob -> alice".as_bytes() {
                    break;
                } else {
                    panic!(
                        "Unexpected status update: {}",
                        String::from_utf8_lossy(&data)
                    );
                }
            }
        }

        // By now Alice must have stored Bob as the only peer of the instance.
        let members = get_iroh_gossip_peers(alice, alice_webxdc.id)
            .await
            .unwrap()
            .into_iter()
            .map(|addr| addr.node_id)
            .collect::<Vec<_>>();

        assert_eq!(
            members,
            vec![
                bob.get_or_try_init_peer_channel()
                    .await
                    .unwrap()
                    .get_node_addr()
                    .await
                    .unwrap()
                    .node_id
            ]
        );

        // A second message from Bob must arrive as well.
        bob.get_or_try_init_peer_channel()
            .await
            .unwrap()
            .send_webxdc_realtime_data(bob, bob_webxdc.id, "bob -> alice 2".as_bytes().to_vec())
            .await
            .unwrap();

        loop {
            let event = alice.evtracker.recv().await.unwrap();
            if let EventType::WebxdcRealtimeData { data, .. } = event.typ {
                if data == "bob -> alice 2".as_bytes() {
                    break;
                } else {
                    panic!(
                        "Unexpected status update: {}",
                        String::from_utf8_lossy(&data)
                    );
                }
            }
        }

        // `stop_io()` is expected to remove the `Iroh` instance.
        assert!(alice.iroh.read().await.is_some());
        alice.stop_io().await;
        assert!(alice.iroh.read().await.is_none());
    }
728
    /// Bob leaves the realtime channel and joins it again; sending must
    /// still work afterwards and leaving must remove the channel state.
    #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
    async fn test_can_reconnect() {
        let mut tcm = TestContextManager::new();
        let alice = &mut tcm.alice().await;
        let bob = &mut tcm.bob().await;

        // Realtime is expected to be enabled without any explicit configuration.
        assert!(alice
            .get_config_bool(Config::WebxdcRealtimeEnabled)
            .await
            .unwrap());
        // Alice sends a webxdc instance to Bob.
        let alice_chat = alice.create_chat(bob).await;
        let mut instance = Message::new(Viewtype::File);
        instance
            .set_file_from_bytes(
                alice,
                "minimal.xdc",
                include_bytes!("../test-data/webxdc/minimal.xdc"),
                None,
            )
            .unwrap();

        send_msg(alice, alice_chat.id, &mut instance).await.unwrap();
        let alice_webxdc = alice.get_last_msg().await;
        assert_eq!(alice_webxdc.get_viewtype(), Viewtype::Webxdc);

        let webxdc = alice.pop_sent_msg().await;
        let bob_webxdc = bob.recv_msg(&webxdc).await;
        assert_eq!(bob_webxdc.get_viewtype(), Viewtype::Webxdc);

        bob_webxdc.chat_id.accept(bob).await.unwrap();

        // Alice advertises herself.
        send_webxdc_realtime_advertisement(alice, alice_webxdc.id)
            .await
            .unwrap();

        bob.recv_msg_trash(&alice.pop_sent_msg().await).await;

        // Bob's stored peers for the instance must be exactly Alice's node.
        let members = get_iroh_gossip_peers(bob, bob_webxdc.id)
            .await
            .unwrap()
            .into_iter()
            .map(|addr| addr.node_id)
            .collect::<Vec<_>>();

        assert_eq!(
            members,
            vec![
                alice
                    .get_or_try_init_peer_channel()
                    .await
                    .unwrap()
                    .get_node_addr()
                    .await
                    .unwrap()
                    .node_id
            ]
        );

        // Bob joins the topic and waits until the join receiver is signalled.
        bob.get_or_try_init_peer_channel()
            .await
            .unwrap()
            .join_and_subscribe_gossip(bob, bob_webxdc.id)
            .await
            .unwrap()
            .unwrap()
            .await
            .unwrap();

        // Alice sends realtime data to Bob.
        alice
            .get_or_try_init_peer_channel()
            .await
            .unwrap()
            .send_webxdc_realtime_data(alice, alice_webxdc.id, "alice -> bob".as_bytes().to_vec())
            .await
            .unwrap();

        loop {
            let event = bob.evtracker.recv().await.unwrap();
            if let EventType::WebxdcRealtimeData { data, .. } = event.typ {
                if data == "alice -> bob".as_bytes() {
                    break;
                } else {
                    panic!(
                        "Unexpected status update: {}",
                        String::from_utf8_lossy(&data)
                    );
                }
            }
        }

        // Leaving must not change Bob's sequence number entry for the topic.
        let bob_topic = get_iroh_topic_for_msg(bob, bob_webxdc.id)
            .await
            .unwrap()
            .unwrap();
        let bob_sequence_number = bob
            .iroh
            .read()
            .await
            .as_ref()
            .unwrap()
            .sequence_numbers
            .lock()
            .get(&bob_topic)
            .copied();
        leave_webxdc_realtime(bob, bob_webxdc.id).await.unwrap();
        let bob_sequence_number_after = bob
            .iroh
            .read()
            .await
            .as_ref()
            .unwrap()
            .sequence_numbers
            .lock()
            .get(&bob_topic)
            .copied();
        assert_eq!(bob_sequence_number, bob_sequence_number_after);

        // Bob joins again; a new channel is created because the old one was removed.
        bob.get_or_try_init_peer_channel()
            .await
            .unwrap()
            .join_and_subscribe_gossip(bob, bob_webxdc.id)
            .await
            .unwrap()
            .unwrap()
            .await
            .unwrap();

        // Bob sends realtime data to Alice after reconnecting.
        bob.get_or_try_init_peer_channel()
            .await
            .unwrap()
            .send_webxdc_realtime_data(bob, bob_webxdc.id, "bob -> alice".as_bytes().to_vec())
            .await
            .unwrap();

        loop {
            let event = alice.evtracker.recv().await.unwrap();
            if let EventType::WebxdcRealtimeData { data, .. } = event.typ {
                if data == "bob -> alice".as_bytes() {
                    break;
                } else {
                    panic!(
                        "Unexpected status update: {}",
                        String::from_utf8_lossy(&data)
                    );
                }
            }
        }

        // Alice has exactly one active channel, which disappears once she leaves.
        assert_eq!(
            alice
                .iroh
                .read()
                .await
                .as_ref()
                .unwrap()
                .iroh_channels
                .read()
                .await
                .len(),
            1
        );
        leave_webxdc_realtime(alice, alice_webxdc.id).await.unwrap();
        let topic = get_iroh_topic_for_msg(alice, alice_webxdc.id)
            .await
            .unwrap()
            .unwrap();
        assert!(alice
            .iroh
            .read()
            .await
            .as_ref()
            .unwrap()
            .iroh_channels
            .read()
            .await
            .get(&topic)
            .is_none());
    }
915
    /// Both sides send their advertisement before receiving the other
    /// one's; Alice must still get connected and be able to send data to Bob.
    #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
    async fn test_parallel_connect() {
        let mut tcm = TestContextManager::new();
        let alice = &mut tcm.alice().await;
        let bob = &mut tcm.bob().await;

        // Alice sends a webxdc instance to Bob.
        let alice_chat = alice.create_chat(bob).await;
        let mut instance = Message::new(Viewtype::File);
        instance
            .set_file_from_bytes(
                alice,
                "minimal.xdc",
                include_bytes!("../test-data/webxdc/minimal.xdc"),
                None,
            )
            .unwrap();
        send_msg(alice, alice_chat.id, &mut instance).await.unwrap();
        let alice_webxdc = alice.get_last_msg().await;

        let webxdc = alice.pop_sent_msg().await;
        let bob_webxdc = bob.recv_msg(&webxdc).await;
        assert_eq!(bob_webxdc.get_viewtype(), Viewtype::Webxdc);

        bob_webxdc.chat_id.accept(bob).await.unwrap();

        eprintln!("Sending advertisements");
        // Alice keeps the join receiver so that she can wait for the connection below.
        let alice_advertisement_future = send_webxdc_realtime_advertisement(alice, alice_webxdc.id)
            .await
            .unwrap()
            .unwrap();
        let alice_advertisement = alice.pop_sent_msg().await;

        send_webxdc_realtime_advertisement(bob, bob_webxdc.id)
            .await
            .unwrap();
        let bob_advertisement = bob.pop_sent_msg().await;

        // Only now, after both sides have joined, are the advertisements delivered.
        eprintln!("Receiving advertisements");
        bob.recv_msg_trash(&alice_advertisement).await;
        alice.recv_msg_trash(&bob_advertisement).await;

        eprintln!("Alice waits for connection");
        alice_advertisement_future.await.unwrap();

        eprintln!("Sending ephemeral message");
        // Uses the free function rather than the `Iroh` method.
        send_webxdc_realtime_data(alice, alice_webxdc.id, b"alice -> bob".into())
            .await
            .unwrap();

        eprintln!("Waiting for ephemeral message");
        loop {
            let event = bob.evtracker.recv().await.unwrap();
            if let EventType::WebxdcRealtimeData { data, .. } = event.typ {
                if data == b"alice -> bob" {
                    break;
                } else {
                    panic!(
                        "Unexpected status update: {}",
                        String::from_utf8_lossy(&data)
                    );
                }
            }
        }
    }
983
    /// With realtime disabled the public entry points succeed as no-ops,
    /// never create an `Iroh` instance, and requesting the instance
    /// directly fails.
    #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
    async fn test_peer_channels_disabled() {
        let mut tcm = TestContextManager::new();
        let alice = &mut tcm.alice().await;

        alice
            .set_config_bool(Config::WebxdcRealtimeEnabled, false)
            .await
            .unwrap();

        // Advertising returns early without initializing iroh.
        send_webxdc_realtime_advertisement(alice, MsgId::new(1))
            .await
            .unwrap();

        assert!(alice.ctx.iroh.read().await.is_none());

        // Sending data returns early without initializing iroh.
        send_webxdc_realtime_data(alice, MsgId::new(1), vec![])
            .await
            .unwrap();

        assert!(alice.ctx.iroh.read().await.is_none());

        // Leaving returns early without initializing iroh.
        leave_webxdc_realtime(alice, MsgId::new(1)).await.unwrap();

        assert!(alice.ctx.iroh.read().await.is_none());

        // Requesting the instance directly is an error while realtime is disabled.
        assert!(alice.ctx.get_or_try_init_peer_channel().await.is_err());
    }
1017}