1use anyhow::{anyhow, bail, Context as _, Result};
27use data_encoding::BASE32_NOPAD;
28use futures_lite::StreamExt;
29use iroh::{Endpoint, NodeAddr, NodeId, PublicKey, RelayMode, RelayUrl, SecretKey};
30use iroh_gossip::net::{Event, Gossip, GossipEvent, JoinOptions, GOSSIP_ALPN};
31use iroh_gossip::proto::TopicId;
32use parking_lot::Mutex;
33use std::collections::{BTreeSet, HashMap};
34use std::env;
35use tokio::sync::{oneshot, RwLock};
36use tokio::task::JoinHandle;
37use url::Url;
38
39use crate::chat::send_msg;
40use crate::config::Config;
41use crate::context::Context;
42use crate::log::{info, warn};
43use crate::message::{Message, MsgId, Viewtype};
44use crate::mimeparser::SystemMessage;
45use crate::EventType;
46
47const PUBLIC_KEY_LENGTH: usize = 32;
49const PUBLIC_KEY_STUB: &[u8] = "static_string".as_bytes();
50
51#[derive(Debug)]
53pub struct Iroh {
54 pub(crate) router: iroh::protocol::Router,
56
57 pub(crate) gossip: Gossip,
59
60 pub(crate) sequence_numbers: Mutex<HashMap<TopicId, i32>>,
62
63 pub(crate) iroh_channels: RwLock<HashMap<TopicId, ChannelState>>,
65
66 pub(crate) public_key: PublicKey,
70}
71
72impl Iroh {
73 pub(crate) async fn network_change(&self) {
75 self.router.endpoint().network_change().await
76 }
77
78 pub(crate) async fn close(self) -> Result<()> {
80 self.router.shutdown().await.context("Closing iroh failed")
81 }
82
83 async fn join_and_subscribe_gossip(
89 &self,
90 ctx: &Context,
91 msg_id: MsgId,
92 ) -> Result<Option<oneshot::Receiver<()>>> {
93 let topic = get_iroh_topic_for_msg(ctx, msg_id)
94 .await?
95 .with_context(|| format!("Message {msg_id} has no gossip topic"))?;
96
97 let mut iroh_channels = self.iroh_channels.write().await;
102
103 if iroh_channels.contains_key(&topic) {
104 return Ok(None);
105 }
106
107 let peers = get_iroh_gossip_peers(ctx, msg_id).await?;
108 let node_ids = peers.iter().map(|p| p.node_id).collect::<Vec<_>>();
109
110 info!(
111 ctx,
112 "IROH_REALTIME: Joining gossip with peers: {:?}", node_ids,
113 );
114
115 for node_addr in &peers {
117 if !node_addr.is_empty() {
118 self.router.endpoint().add_node_addr(node_addr.clone())?;
119 }
120 }
121
122 let (join_tx, join_rx) = oneshot::channel();
123
124 let (gossip_sender, gossip_receiver) = self
125 .gossip
126 .subscribe_with_opts(topic, JoinOptions::with_bootstrap(node_ids))
127 .split();
128
129 let ctx = ctx.clone();
130 let subscribe_loop = tokio::spawn(async move {
131 if let Err(e) = subscribe_loop(&ctx, gossip_receiver, topic, msg_id, join_tx).await {
132 warn!(ctx, "subscribe_loop failed: {e}")
133 }
134 });
135
136 iroh_channels.insert(topic, ChannelState::new(subscribe_loop, gossip_sender));
137
138 Ok(Some(join_rx))
139 }
140
141 pub async fn maybe_add_gossip_peers(&self, topic: TopicId, peers: Vec<NodeAddr>) -> Result<()> {
143 if self.iroh_channels.read().await.get(&topic).is_some() {
144 for peer in &peers {
145 self.router.endpoint().add_node_addr(peer.clone())?;
146 }
147
148 self.gossip.subscribe_with_opts(
149 topic,
150 JoinOptions::with_bootstrap(peers.into_iter().map(|peer| peer.node_id)),
151 );
152 }
153 Ok(())
154 }
155
156 pub async fn send_webxdc_realtime_data(
158 &self,
159 ctx: &Context,
160 msg_id: MsgId,
161 mut data: Vec<u8>,
162 ) -> Result<()> {
163 let topic = get_iroh_topic_for_msg(ctx, msg_id)
164 .await?
165 .with_context(|| format!("Message {msg_id} has no gossip topic"))?;
166 self.join_and_subscribe_gossip(ctx, msg_id).await?;
167
168 let seq_num = self.get_and_incr(&topic);
169
170 let mut iroh_channels = self.iroh_channels.write().await;
171 let state = iroh_channels
172 .get_mut(&topic)
173 .context("Just created state does not exist")?;
174 data.extend(seq_num.to_le_bytes());
175 data.extend(self.public_key.as_bytes());
176
177 state.sender.broadcast(data.into()).await?;
178
179 if env::var("REALTIME_DEBUG").is_ok() {
180 info!(ctx, "Sent realtime data");
181 }
182
183 Ok(())
184 }
185
186 fn get_and_incr(&self, topic: &TopicId) -> i32 {
187 let mut sequence_numbers = self.sequence_numbers.lock();
188 let entry = sequence_numbers.entry(*topic).or_default();
189 *entry = entry.wrapping_add(1);
190 *entry
191 }
192
193 pub(crate) async fn get_node_addr(&self) -> Result<NodeAddr> {
195 let mut addr = self.router.endpoint().node_addr().await?;
196 addr.direct_addresses = BTreeSet::new();
197 Ok(addr)
198 }
199
200 pub(crate) async fn leave_realtime(&self, topic: TopicId) -> Result<()> {
202 if let Some(channel) = self.iroh_channels.write().await.remove(&topic) {
203 channel.subscribe_loop.abort();
211 let _ = channel.subscribe_loop.await;
212 }
213 Ok(())
214 }
215}
216
217#[derive(Debug)]
219pub(crate) struct ChannelState {
220 subscribe_loop: JoinHandle<()>,
222
223 sender: iroh_gossip::net::GossipSender,
224}
225
226impl ChannelState {
227 fn new(subscribe_loop: JoinHandle<()>, sender: iroh_gossip::net::GossipSender) -> Self {
228 Self {
229 subscribe_loop,
230 sender,
231 }
232 }
233}
234
235impl Context {
236 async fn init_peer_channels(&self) -> Result<Iroh> {
238 info!(self, "Initializing peer channels.");
239 let secret_key = SecretKey::generate(rand::rngs::OsRng);
240 let public_key = secret_key.public();
241
242 let relay_mode = if let Some(relay_url) = self
243 .metadata
244 .read()
245 .await
246 .as_ref()
247 .and_then(|conf| conf.iroh_relay.clone())
248 {
249 RelayMode::Custom(RelayUrl::from(relay_url).into())
250 } else {
251 RelayMode::Default
254 };
255
256 let endpoint = Endpoint::builder()
257 .tls_x509() .secret_key(secret_key)
259 .alpns(vec![GOSSIP_ALPN.to_vec()])
260 .relay_mode(relay_mode)
261 .bind()
262 .await?;
263
264 let gossip = Gossip::builder()
270 .max_message_size(128 * 1024)
271 .spawn(endpoint.clone())
272 .await?;
273
274 let router = iroh::protocol::Router::builder(endpoint)
275 .accept(GOSSIP_ALPN, gossip.clone())
276 .spawn();
277
278 Ok(Iroh {
279 router,
280 gossip,
281 sequence_numbers: Mutex::new(HashMap::new()),
282 iroh_channels: RwLock::new(HashMap::new()),
283 public_key,
284 })
285 }
286
287 pub async fn get_or_try_init_peer_channel(
289 &self,
290 ) -> Result<tokio::sync::RwLockReadGuard<'_, Iroh>> {
291 if !self.get_config_bool(Config::WebxdcRealtimeEnabled).await? {
292 bail!("Attempt to get Iroh when realtime is disabled");
293 }
294
295 if let Ok(lock) = tokio::sync::RwLockReadGuard::<'_, std::option::Option<Iroh>>::try_map(
296 self.iroh.read().await,
297 |opt_iroh| opt_iroh.as_ref(),
298 ) {
299 return Ok(lock);
300 }
301
302 let lock = self.iroh.write().await;
303 match tokio::sync::RwLockWriteGuard::<'_, std::option::Option<Iroh>>::try_downgrade_map(
304 lock,
305 |opt_iroh| opt_iroh.as_ref(),
306 ) {
307 Ok(lock) => Ok(lock),
308 Err(mut lock) => {
309 let iroh = self.init_peer_channels().await?;
310 *lock = Some(iroh);
311 tokio::sync::RwLockWriteGuard::<'_, std::option::Option<Iroh>>::try_downgrade_map(
312 lock,
313 |opt_iroh| opt_iroh.as_ref(),
314 )
315 .map_err(|_| anyhow!("Downgrade should succeed as we just stored `Some` value"))
316 }
317 }
318 }
319}
320
321pub(crate) async fn iroh_add_peer_for_topic(
323 ctx: &Context,
324 msg_id: MsgId,
325 topic: TopicId,
326 peer: NodeId,
327 relay_server: Option<&str>,
328) -> Result<()> {
329 ctx.sql
330 .execute(
331 "INSERT OR REPLACE INTO iroh_gossip_peers (msg_id, public_key, topic, relay_server) VALUES (?, ?, ?, ?)",
332 (msg_id, peer.as_bytes(), topic.as_bytes(), relay_server),
333 )
334 .await?;
335 Ok(())
336}
337
338pub async fn add_gossip_peer_from_header(
340 context: &Context,
341 instance_id: MsgId,
342 node_addr: &str,
343) -> Result<()> {
344 if !context
345 .get_config_bool(Config::WebxdcRealtimeEnabled)
346 .await?
347 {
348 return Ok(());
349 }
350
351 info!(
352 context,
353 "Adding iroh peer with address {node_addr:?} to the topic of {instance_id}."
354 );
355 let node_addr =
356 serde_json::from_str::<NodeAddr>(node_addr).context("Failed to parse node address")?;
357
358 context.emit_event(EventType::WebxdcRealtimeAdvertisementReceived {
359 msg_id: instance_id,
360 });
361
362 let Some(topic) = get_iroh_topic_for_msg(context, instance_id).await? else {
363 warn!(
364 context,
365 "Could not add iroh peer because {instance_id} has no topic."
366 );
367 return Ok(());
368 };
369
370 let node_id = node_addr.node_id;
371 let relay_server = node_addr.relay_url().map(|relay| relay.as_str());
372 iroh_add_peer_for_topic(context, instance_id, topic, node_id, relay_server).await?;
373
374 let iroh = context.get_or_try_init_peer_channel().await?;
375 iroh.maybe_add_gossip_peers(topic, vec![node_addr]).await?;
376 Ok(())
377}
378
379pub(crate) async fn insert_topic_stub(ctx: &Context, msg_id: MsgId, topic: TopicId) -> Result<()> {
381 ctx.sql
382 .execute(
383 "INSERT OR REPLACE INTO iroh_gossip_peers (msg_id, public_key, topic, relay_server) VALUES (?, ?, ?, ?)",
384 (msg_id, PUBLIC_KEY_STUB, topic.as_bytes(), Option::<&str>::None),
385 )
386 .await?;
387 Ok(())
388}
389
390async fn get_iroh_gossip_peers(ctx: &Context, msg_id: MsgId) -> Result<Vec<NodeAddr>> {
392 ctx.sql
393 .query_map(
394 "SELECT public_key, relay_server FROM iroh_gossip_peers WHERE msg_id = ? AND public_key != ?",
395 (msg_id, PUBLIC_KEY_STUB),
396 |row| {
397 let key: Vec<u8> = row.get(0)?;
398 let server: Option<String> = row.get(1)?;
399 Ok((key, server))
400 },
401 |g| {
402 g.map(|data| {
403 let (key, server) = data?;
404 let server = server.map(|data| Ok::<_, url::ParseError>(RelayUrl::from(Url::parse(&data)?))).transpose()?;
405 let id = NodeId::from_bytes(&key.try_into()
406 .map_err(|_| anyhow!("Can't convert sql data to [u8; 32]"))?)?;
407 Ok::<_, anyhow::Error>(NodeAddr::from_parts(
408 id, server, vec![]
409 ))
410 })
411 .collect::<std::result::Result<Vec<_>, _>>()
412 },
413 )
414 .await
415}
416
417pub(crate) async fn get_iroh_topic_for_msg(
419 ctx: &Context,
420 msg_id: MsgId,
421) -> Result<Option<TopicId>> {
422 if let Some(bytes) = ctx
423 .sql
424 .query_get_value::<Vec<u8>>(
425 "SELECT topic FROM iroh_gossip_peers WHERE msg_id = ? LIMIT 1",
426 (msg_id,),
427 )
428 .await
429 .context("Couldn't restore topic from db")?
430 {
431 let topic_id = TopicId::from_bytes(
432 bytes
433 .try_into()
434 .map_err(|_| anyhow!("Could not convert stored topic ID"))?,
435 );
436 Ok(Some(topic_id))
437 } else {
438 Ok(None)
439 }
440}
441
442pub async fn send_webxdc_realtime_advertisement(
445 ctx: &Context,
446 msg_id: MsgId,
447) -> Result<Option<oneshot::Receiver<()>>> {
448 if !ctx.get_config_bool(Config::WebxdcRealtimeEnabled).await? {
449 return Ok(None);
450 }
451
452 let iroh = ctx.get_or_try_init_peer_channel().await?;
453 let conn = iroh.join_and_subscribe_gossip(ctx, msg_id).await?;
454
455 let webxdc = Message::load_from_db(ctx, msg_id).await?;
456 let mut msg = Message::new(Viewtype::Text);
457 msg.hidden = true;
458 msg.param.set_cmd(SystemMessage::IrohNodeAddr);
459 msg.in_reply_to = Some(webxdc.rfc724_mid.clone());
460 send_msg(ctx, webxdc.chat_id, &mut msg).await?;
461 info!(ctx, "IROH_REALTIME: Sent realtime advertisement");
462 Ok(conn)
463}
464
465pub async fn send_webxdc_realtime_data(ctx: &Context, msg_id: MsgId, data: Vec<u8>) -> Result<()> {
467 if !ctx.get_config_bool(Config::WebxdcRealtimeEnabled).await? {
468 return Ok(());
469 }
470
471 let iroh = ctx.get_or_try_init_peer_channel().await?;
472 iroh.send_webxdc_realtime_data(ctx, msg_id, data).await?;
473 Ok(())
474}
475
476pub async fn leave_webxdc_realtime(ctx: &Context, msg_id: MsgId) -> Result<()> {
478 if !ctx.get_config_bool(Config::WebxdcRealtimeEnabled).await? {
479 return Ok(());
480 }
481 let topic = get_iroh_topic_for_msg(ctx, msg_id)
482 .await?
483 .with_context(|| format!("Message {msg_id} has no gossip topic"))?;
484 let iroh = ctx.get_or_try_init_peer_channel().await?;
485 iroh.leave_realtime(topic).await?;
486 info!(ctx, "IROH_REALTIME: Left gossip for message {msg_id}");
487
488 Ok(())
489}
490
491fn create_random_topic() -> TopicId {
493 TopicId::from_bytes(rand::random())
494}
495
496pub(crate) async fn create_iroh_header(ctx: &Context, msg_id: MsgId) -> Result<String> {
499 let topic = create_random_topic();
500 insert_topic_stub(ctx, msg_id, topic).await?;
501 let topic_string = BASE32_NOPAD.encode(topic.as_bytes()).to_ascii_lowercase();
502 Ok(topic_string)
503}
504
505async fn subscribe_loop(
506 context: &Context,
507 mut stream: iroh_gossip::net::GossipReceiver,
508 topic: TopicId,
509 msg_id: MsgId,
510 join_tx: oneshot::Sender<()>,
511) -> Result<()> {
512 let mut join_tx = Some(join_tx);
513
514 while let Some(event) = stream.try_next().await? {
515 match event {
516 Event::Gossip(event) => match event {
517 GossipEvent::Joined(nodes) => {
518 if let Some(join_tx) = join_tx.take() {
519 join_tx.send(()).ok();
522 }
523
524 for node in nodes {
525 iroh_add_peer_for_topic(context, msg_id, topic, node, None).await?;
526 }
527 }
528 GossipEvent::NeighborUp(node) => {
529 info!(context, "IROH_REALTIME: NeighborUp: {}", node.to_string());
530 iroh_add_peer_for_topic(context, msg_id, topic, node, None).await?;
531 }
532 GossipEvent::NeighborDown(_node) => {}
533 GossipEvent::Received(message) => {
534 info!(context, "IROH_REALTIME: Received realtime data");
535 context.emit_event(EventType::WebxdcRealtimeData {
536 msg_id,
537 data: message
538 .content
539 .get(0..message.content.len() - 4 - PUBLIC_KEY_LENGTH)
540 .context("too few bytes in iroh message")?
541 .into(),
542 });
543 }
544 },
545 Event::Lagged => {
546 warn!(context, "Gossip lost some messages");
547 }
548 };
549 }
550 Ok(())
551}
552
553#[cfg(test)]
554mod tests {
555 use super::*;
556 use crate::{
557 chat::send_msg,
558 message::{Message, Viewtype},
559 test_utils::TestContextManager,
560 EventType,
561 };
562
563 #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
564 async fn test_can_communicate() {
565 let mut tcm = TestContextManager::new();
566 let alice = &mut tcm.alice().await;
567 let bob = &mut tcm.bob().await;
568
569 let alice_chat = alice.create_chat(bob).await;
571 let mut instance = Message::new(Viewtype::File);
572 instance
573 .set_file_from_bytes(
574 alice,
575 "minimal.xdc",
576 include_bytes!("../test-data/webxdc/minimal.xdc"),
577 None,
578 )
579 .unwrap();
580
581 send_msg(alice, alice_chat.id, &mut instance).await.unwrap();
582 let alice_webxdc = alice.get_last_msg().await;
583 assert_eq!(alice_webxdc.get_viewtype(), Viewtype::Webxdc);
584
585 let webxdc = alice.pop_sent_msg().await;
586 let bob_webxdc = bob.recv_msg(&webxdc).await;
587 assert_eq!(bob_webxdc.get_viewtype(), Viewtype::Webxdc);
588
589 bob_webxdc.chat_id.accept(bob).await.unwrap();
590
591 send_webxdc_realtime_advertisement(alice, alice_webxdc.id)
593 .await
594 .unwrap();
595
596 bob.recv_msg_trash(&alice.pop_sent_msg().await).await;
597 loop {
598 let event = bob.evtracker.recv().await.unwrap();
599 if let EventType::WebxdcRealtimeAdvertisementReceived { msg_id } = event.typ {
600 assert!(msg_id == alice_webxdc.id);
601 break;
602 }
603 }
604
605 let members = get_iroh_gossip_peers(bob, bob_webxdc.id)
607 .await
608 .unwrap()
609 .into_iter()
610 .map(|addr| addr.node_id)
611 .collect::<Vec<_>>();
612
613 assert_eq!(
614 members,
615 vec![
616 alice
617 .get_or_try_init_peer_channel()
618 .await
619 .unwrap()
620 .get_node_addr()
621 .await
622 .unwrap()
623 .node_id
624 ]
625 );
626
627 bob.get_or_try_init_peer_channel()
628 .await
629 .unwrap()
630 .join_and_subscribe_gossip(bob, bob_webxdc.id)
631 .await
632 .unwrap()
633 .unwrap()
634 .await
635 .unwrap();
636
637 alice
639 .get_or_try_init_peer_channel()
640 .await
641 .unwrap()
642 .send_webxdc_realtime_data(alice, alice_webxdc.id, "alice -> bob".as_bytes().to_vec())
643 .await
644 .unwrap();
645
646 loop {
647 let event = bob.evtracker.recv().await.unwrap();
648 if let EventType::WebxdcRealtimeData { data, .. } = event.typ {
649 if data == "alice -> bob".as_bytes() {
650 break;
651 } else {
652 panic!(
653 "Unexpected status update: {}",
654 String::from_utf8_lossy(&data)
655 );
656 }
657 }
658 }
659 bob.get_or_try_init_peer_channel()
661 .await
662 .unwrap()
663 .send_webxdc_realtime_data(bob, bob_webxdc.id, "bob -> alice".as_bytes().to_vec())
664 .await
665 .unwrap();
666
667 loop {
668 let event = alice.evtracker.recv().await.unwrap();
669 if let EventType::WebxdcRealtimeData { data, .. } = event.typ {
670 if data == "bob -> alice".as_bytes() {
671 break;
672 } else {
673 panic!(
674 "Unexpected status update: {}",
675 String::from_utf8_lossy(&data)
676 );
677 }
678 }
679 }
680
681 let members = get_iroh_gossip_peers(alice, alice_webxdc.id)
683 .await
684 .unwrap()
685 .into_iter()
686 .map(|addr| addr.node_id)
687 .collect::<Vec<_>>();
688
689 assert_eq!(
690 members,
691 vec![
692 bob.get_or_try_init_peer_channel()
693 .await
694 .unwrap()
695 .get_node_addr()
696 .await
697 .unwrap()
698 .node_id
699 ]
700 );
701
702 bob.get_or_try_init_peer_channel()
703 .await
704 .unwrap()
705 .send_webxdc_realtime_data(bob, bob_webxdc.id, "bob -> alice 2".as_bytes().to_vec())
706 .await
707 .unwrap();
708
709 loop {
710 let event = alice.evtracker.recv().await.unwrap();
711 if let EventType::WebxdcRealtimeData { data, .. } = event.typ {
712 if data == "bob -> alice 2".as_bytes() {
713 break;
714 } else {
715 panic!(
716 "Unexpected status update: {}",
717 String::from_utf8_lossy(&data)
718 );
719 }
720 }
721 }
722
723 assert!(alice.iroh.read().await.is_some());
726 alice.stop_io().await;
727 assert!(alice.iroh.read().await.is_none());
728 }
729
730 #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
731 async fn test_can_reconnect() {
732 let mut tcm = TestContextManager::new();
733 let alice = &mut tcm.alice().await;
734 let bob = &mut tcm.bob().await;
735
736 assert!(alice
737 .get_config_bool(Config::WebxdcRealtimeEnabled)
738 .await
739 .unwrap());
740 let alice_chat = alice.create_chat(bob).await;
742 let mut instance = Message::new(Viewtype::File);
743 instance
744 .set_file_from_bytes(
745 alice,
746 "minimal.xdc",
747 include_bytes!("../test-data/webxdc/minimal.xdc"),
748 None,
749 )
750 .unwrap();
751
752 send_msg(alice, alice_chat.id, &mut instance).await.unwrap();
753 let alice_webxdc = alice.get_last_msg().await;
754 assert_eq!(alice_webxdc.get_viewtype(), Viewtype::Webxdc);
755
756 let webxdc = alice.pop_sent_msg().await;
757 let bob_webxdc = bob.recv_msg(&webxdc).await;
758 assert_eq!(bob_webxdc.get_viewtype(), Viewtype::Webxdc);
759
760 bob_webxdc.chat_id.accept(bob).await.unwrap();
761
762 send_webxdc_realtime_advertisement(alice, alice_webxdc.id)
764 .await
765 .unwrap();
766
767 bob.recv_msg_trash(&alice.pop_sent_msg().await).await;
768
769 let members = get_iroh_gossip_peers(bob, bob_webxdc.id)
771 .await
772 .unwrap()
773 .into_iter()
774 .map(|addr| addr.node_id)
775 .collect::<Vec<_>>();
776
777 assert_eq!(
778 members,
779 vec![
780 alice
781 .get_or_try_init_peer_channel()
782 .await
783 .unwrap()
784 .get_node_addr()
785 .await
786 .unwrap()
787 .node_id
788 ]
789 );
790
791 bob.get_or_try_init_peer_channel()
792 .await
793 .unwrap()
794 .join_and_subscribe_gossip(bob, bob_webxdc.id)
795 .await
796 .unwrap()
797 .unwrap()
798 .await
799 .unwrap();
800
801 alice
803 .get_or_try_init_peer_channel()
804 .await
805 .unwrap()
806 .send_webxdc_realtime_data(alice, alice_webxdc.id, "alice -> bob".as_bytes().to_vec())
807 .await
808 .unwrap();
809
810 loop {
811 let event = bob.evtracker.recv().await.unwrap();
812 if let EventType::WebxdcRealtimeData { data, .. } = event.typ {
813 if data == "alice -> bob".as_bytes() {
814 break;
815 } else {
816 panic!(
817 "Unexpected status update: {}",
818 String::from_utf8_lossy(&data)
819 );
820 }
821 }
822 }
823
824 let bob_topic = get_iroh_topic_for_msg(bob, bob_webxdc.id)
825 .await
826 .unwrap()
827 .unwrap();
828 let bob_sequence_number = bob
829 .iroh
830 .read()
831 .await
832 .as_ref()
833 .unwrap()
834 .sequence_numbers
835 .lock()
836 .get(&bob_topic)
837 .copied();
838 leave_webxdc_realtime(bob, bob_webxdc.id).await.unwrap();
839 let bob_sequence_number_after = bob
840 .iroh
841 .read()
842 .await
843 .as_ref()
844 .unwrap()
845 .sequence_numbers
846 .lock()
847 .get(&bob_topic)
848 .copied();
849 assert_eq!(bob_sequence_number, bob_sequence_number_after);
851
852 bob.get_or_try_init_peer_channel()
853 .await
854 .unwrap()
855 .join_and_subscribe_gossip(bob, bob_webxdc.id)
856 .await
857 .unwrap()
858 .unwrap()
859 .await
860 .unwrap();
861
862 bob.get_or_try_init_peer_channel()
863 .await
864 .unwrap()
865 .send_webxdc_realtime_data(bob, bob_webxdc.id, "bob -> alice".as_bytes().to_vec())
866 .await
867 .unwrap();
868
869 loop {
870 let event = alice.evtracker.recv().await.unwrap();
871 if let EventType::WebxdcRealtimeData { data, .. } = event.typ {
872 if data == "bob -> alice".as_bytes() {
873 break;
874 } else {
875 panic!(
876 "Unexpected status update: {}",
877 String::from_utf8_lossy(&data)
878 );
879 }
880 }
881 }
882
883 assert_eq!(
887 alice
888 .iroh
889 .read()
890 .await
891 .as_ref()
892 .unwrap()
893 .iroh_channels
894 .read()
895 .await
896 .len(),
897 1
898 );
899 leave_webxdc_realtime(alice, alice_webxdc.id).await.unwrap();
900 let topic = get_iroh_topic_for_msg(alice, alice_webxdc.id)
901 .await
902 .unwrap()
903 .unwrap();
904 assert!(alice
905 .iroh
906 .read()
907 .await
908 .as_ref()
909 .unwrap()
910 .iroh_channels
911 .read()
912 .await
913 .get(&topic)
914 .is_none());
915 }
916
917 #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
918 async fn test_parallel_connect() {
919 let mut tcm = TestContextManager::new();
920 let alice = &mut tcm.alice().await;
921 let bob = &mut tcm.bob().await;
922
923 let alice_chat = alice.create_chat(bob).await;
925 let mut instance = Message::new(Viewtype::File);
926 instance
927 .set_file_from_bytes(
928 alice,
929 "minimal.xdc",
930 include_bytes!("../test-data/webxdc/minimal.xdc"),
931 None,
932 )
933 .unwrap();
934 send_msg(alice, alice_chat.id, &mut instance).await.unwrap();
935 let alice_webxdc = alice.get_last_msg().await;
936
937 let webxdc = alice.pop_sent_msg().await;
938 let bob_webxdc = bob.recv_msg(&webxdc).await;
939 assert_eq!(bob_webxdc.get_viewtype(), Viewtype::Webxdc);
940
941 bob_webxdc.chat_id.accept(bob).await.unwrap();
942
943 eprintln!("Sending advertisements");
944 let alice_advertisement_future = send_webxdc_realtime_advertisement(alice, alice_webxdc.id)
946 .await
947 .unwrap()
948 .unwrap();
949 let alice_advertisement = alice.pop_sent_msg().await;
950
951 send_webxdc_realtime_advertisement(bob, bob_webxdc.id)
952 .await
953 .unwrap();
954 let bob_advertisement = bob.pop_sent_msg().await;
955
956 eprintln!("Receiving advertisements");
957 bob.recv_msg_trash(&alice_advertisement).await;
958 alice.recv_msg_trash(&bob_advertisement).await;
959
960 eprintln!("Alice waits for connection");
961 alice_advertisement_future.await.unwrap();
962
963 eprintln!("Sending ephemeral message");
965 send_webxdc_realtime_data(alice, alice_webxdc.id, b"alice -> bob".into())
966 .await
967 .unwrap();
968
969 eprintln!("Waiting for ephemeral message");
970 loop {
971 let event = bob.evtracker.recv().await.unwrap();
972 if let EventType::WebxdcRealtimeData { data, .. } = event.typ {
973 if data == b"alice -> bob" {
974 break;
975 } else {
976 panic!(
977 "Unexpected status update: {}",
978 String::from_utf8_lossy(&data)
979 );
980 }
981 }
982 }
983 }
984
985 #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
986 async fn test_peer_channels_disabled() {
987 let mut tcm = TestContextManager::new();
988 let alice = &mut tcm.alice().await;
989
990 alice
991 .set_config_bool(Config::WebxdcRealtimeEnabled, false)
992 .await
993 .unwrap();
994
995 send_webxdc_realtime_advertisement(alice, MsgId::new(1))
997 .await
998 .unwrap();
999
1000 assert!(alice.ctx.iroh.read().await.is_none());
1001
1002 send_webxdc_realtime_data(alice, MsgId::new(1), vec![])
1004 .await
1005 .unwrap();
1006
1007 assert!(alice.ctx.iroh.read().await.is_none());
1008
1009 leave_webxdc_realtime(alice, MsgId::new(1)).await.unwrap();
1011
1012 assert!(alice.ctx.iroh.read().await.is_none());
1013
1014 assert!(alice.ctx.get_or_try_init_peer_channel().await.is_err());
1017 }
1018}