deltachat/
peer_channels.rs

1//! Peer channels for realtime communication in webxdcs.
2//!
3//! We use Iroh as an ephemeral peer channels provider to create direct communication
4//! channels between webxdcs. See [here](https://webxdc.org/docs/spec/joinRealtimeChannel.html) for the webxdc specs.
5//!
6//! Ephemeral channels should be established lazily, to avoid bootstrapping p2p connectivity
//! when it's not required. Only when a webxdc subscribes to realtime data or when a realtime message is sent,
8//! the p2p machinery should be started.
9//!
10//! Adding peer channels to webxdc needs upfront negotiation of a topic and sharing of public keys so that
11//! nodes can connect to each other. The explicit approach is as follows:
12//!
13//! 1. We introduce a new [`IrohGossipTopic`](crate::headerdef::HeaderDef::IrohGossipTopic) message header with a random 32-byte TopicId,
14//!    securely generated on the initial webxdc sender's device. This message header is encrypted
15//!    and sent in the same message as the webxdc application.
16//! 2. Whenever `joinRealtimeChannel().setListener()` or `joinRealtimeChannel().send()` is called by the webxdc application,
17//!    we start a routine to establish p2p connectivity and join the gossip swarm with Iroh.
18//! 3. The first step of this routine is to introduce yourself with a regular message containing the [`IrohNodeAddr`](crate::headerdef::HeaderDef::IrohNodeAddr).
//!    This message contains the user's relay server and public key.
20//!    Direct IP address is not included as this information can be persisted by email providers.
21//! 4. After the announcement, the sending peer joins the gossip swarm with an empty list of peer IDs (as they don't know anyone yet).
22//! 5. Upon receiving an announcement message, other peers store the sender's [NodeAddr] in the database
23//!    (scoped per WebXDC app instance/message-id). The other peers can then join the gossip with `joinRealtimeChannel().setListener()`
24//!    and `joinRealtimeChannel().send()` just like the other peers.
25
26use anyhow::{Context as _, Result, anyhow, bail};
27use data_encoding::BASE32_NOPAD;
28use futures_lite::StreamExt;
29use iroh::{Endpoint, NodeAddr, NodeId, PublicKey, RelayMode, RelayUrl, SecretKey};
30use iroh_gossip::net::{Event, GOSSIP_ALPN, Gossip, GossipEvent, JoinOptions};
31use iroh_gossip::proto::TopicId;
32use parking_lot::Mutex;
33use std::collections::{BTreeSet, HashMap};
34use std::env;
35use tokio::sync::{RwLock, oneshot};
36use tokio::task::JoinHandle;
37use url::Url;
38
39use crate::EventType;
40use crate::chat::send_msg;
41use crate::config::Config;
42use crate::context::Context;
43use crate::log::{info, warn};
44use crate::message::{Message, MsgId, Viewtype};
45use crate::mimeparser::SystemMessage;
46
/// Size of an ed25519 `PublicKey`, in bytes.
const PUBLIC_KEY_LENGTH: usize = 32;
/// Placeholder written into the `public_key` column by `insert_topic_stub`
/// and filtered out again by `get_iroh_gossip_peers`.
const PUBLIC_KEY_STUB: &[u8] = b"static_string";
50
/// Store iroh peer channels for the context.
#[derive(Debug)]
pub struct Iroh {
    /// iroh router needed for iroh peer channels.
    pub(crate) router: iroh::protocol::Router,

    /// [Gossip] needed for iroh peer channels.
    pub(crate) gossip: Gossip,

    /// Sequence numbers for gossip channels.
    ///
    /// One counter per topic. The current value is appended to every
    /// outgoing realtime message.
    pub(crate) sequence_numbers: Mutex<HashMap<TopicId, i32>>,

    /// State of the gossip channels that are currently joined, keyed by topic.
    ///
    /// An entry is inserted when a topic is joined and removed again
    /// when the realtime channel is left.
    pub(crate) iroh_channels: RwLock<HashMap<TopicId, ChannelState>>,

    /// Currently used Iroh public key.
    ///
    /// This is attached to every message to work around `iroh_gossip` deduplication.
    pub(crate) public_key: PublicKey,
}
71
72impl Iroh {
73    /// Notify the endpoint that the network has changed.
74    pub(crate) async fn network_change(&self) {
75        self.router.endpoint().network_change().await
76    }
77
78    /// Closes the QUIC endpoint.
79    pub(crate) async fn close(self) -> Result<()> {
80        self.router.shutdown().await.context("Closing iroh failed")
81    }
82
83    /// Join a topic and create the subscriber loop for it.
84    ///
85    /// If there is no gossip, create it.
86    ///
87    /// The returned future resolves when the swarm becomes operational.
88    async fn join_and_subscribe_gossip(
89        &self,
90        ctx: &Context,
91        msg_id: MsgId,
92    ) -> Result<Option<oneshot::Receiver<()>>> {
93        let topic = get_iroh_topic_for_msg(ctx, msg_id)
94            .await?
95            .with_context(|| format!("Message {msg_id} has no gossip topic"))?;
96
97        // Take exclusive lock to make sure
98        // no other thread can create a second gossip subscription
99        // after we check that it does not exist and before we create a new one.
100        // Otherwise we would receive every message twice or more times.
101        let mut iroh_channels = self.iroh_channels.write().await;
102
103        if iroh_channels.contains_key(&topic) {
104            return Ok(None);
105        }
106
107        let peers = get_iroh_gossip_peers(ctx, msg_id).await?;
108        let node_ids = peers.iter().map(|p| p.node_id).collect::<Vec<_>>();
109
110        info!(
111            ctx,
112            "IROH_REALTIME: Joining gossip with peers: {:?}", node_ids,
113        );
114
115        // Inform iroh of potentially new node addresses
116        for node_addr in &peers {
117            if !node_addr.is_empty() {
118                self.router.endpoint().add_node_addr(node_addr.clone())?;
119            }
120        }
121
122        let (join_tx, join_rx) = oneshot::channel();
123
124        let (gossip_sender, gossip_receiver) = self
125            .gossip
126            .subscribe_with_opts(topic, JoinOptions::with_bootstrap(node_ids))
127            .split();
128
129        let ctx = ctx.clone();
130        let subscribe_loop = tokio::spawn(async move {
131            if let Err(e) = subscribe_loop(&ctx, gossip_receiver, topic, msg_id, join_tx).await {
132                warn!(ctx, "subscribe_loop failed: {e}")
133            }
134        });
135
136        iroh_channels.insert(topic, ChannelState::new(subscribe_loop, gossip_sender));
137
138        Ok(Some(join_rx))
139    }
140
141    /// Add gossip peers to realtime channel if it is already active.
142    pub async fn maybe_add_gossip_peers(&self, topic: TopicId, peers: Vec<NodeAddr>) -> Result<()> {
143        if self.iroh_channels.read().await.get(&topic).is_some() {
144            for peer in &peers {
145                self.router.endpoint().add_node_addr(peer.clone())?;
146            }
147
148            self.gossip.subscribe_with_opts(
149                topic,
150                JoinOptions::with_bootstrap(peers.into_iter().map(|peer| peer.node_id)),
151            );
152        }
153        Ok(())
154    }
155
156    /// Send realtime data to the gossip swarm.
157    pub async fn send_webxdc_realtime_data(
158        &self,
159        ctx: &Context,
160        msg_id: MsgId,
161        mut data: Vec<u8>,
162    ) -> Result<()> {
163        let topic = get_iroh_topic_for_msg(ctx, msg_id)
164            .await?
165            .with_context(|| format!("Message {msg_id} has no gossip topic"))?;
166        self.join_and_subscribe_gossip(ctx, msg_id).await?;
167
168        let seq_num = self.get_and_incr(&topic);
169
170        let mut iroh_channels = self.iroh_channels.write().await;
171        let state = iroh_channels
172            .get_mut(&topic)
173            .context("Just created state does not exist")?;
174        data.extend(seq_num.to_le_bytes());
175        data.extend(self.public_key.as_bytes());
176
177        state.sender.broadcast(data.into()).await?;
178
179        if env::var("REALTIME_DEBUG").is_ok() {
180            info!(ctx, "Sent realtime data");
181        }
182
183        Ok(())
184    }
185
186    fn get_and_incr(&self, topic: &TopicId) -> i32 {
187        let mut sequence_numbers = self.sequence_numbers.lock();
188        let entry = sequence_numbers.entry(*topic).or_default();
189        *entry = entry.wrapping_add(1);
190        *entry
191    }
192
193    /// Get the iroh [NodeAddr] without direct IP addresses.
194    pub(crate) async fn get_node_addr(&self) -> Result<NodeAddr> {
195        let mut addr = self.router.endpoint().node_addr().await?;
196        addr.direct_addresses = BTreeSet::new();
197        Ok(addr)
198    }
199
200    /// Leave the realtime channel for a given topic.
201    pub(crate) async fn leave_realtime(&self, topic: TopicId) -> Result<()> {
202        if let Some(channel) = self.iroh_channels.write().await.remove(&topic) {
203            // Dropping the last GossipTopic results in quitting the topic.
204            // It is split into GossipReceiver and GossipSender.
205            // GossipSender (`channel.sender`) is dropped automatically.
206
207            // Subscribe loop owns GossipReceiver.
208            // Aborting it and waiting for it to be dropped
209            // drops the receiver.
210            channel.subscribe_loop.abort();
211            let _ = channel.subscribe_loop.await;
212        }
213        Ok(())
214    }
215}
216
/// Single gossip channel state.
///
/// One value is stored per joined topic.
#[derive(Debug)]
pub(crate) struct ChannelState {
    /// The subscribe loop handle.
    ///
    /// Used to abort the loop and wait for it when the channel is left.
    subscribe_loop: JoinHandle<()>,

    /// Sending half of the gossip topic, used to broadcast realtime data.
    sender: iroh_gossip::net::GossipSender,
}
225
impl ChannelState {
    /// Bundles the handle of a spawned subscribe loop with the sender of the same topic.
    fn new(subscribe_loop: JoinHandle<()>, sender: iroh_gossip::net::GossipSender) -> Self {
        Self {
            subscribe_loop,
            sender,
        }
    }
}
234
235impl Context {
236    /// Create iroh endpoint and gossip.
237    async fn init_peer_channels(&self) -> Result<Iroh> {
238        info!(self, "Initializing peer channels.");
239        let secret_key = SecretKey::generate(rand::rngs::OsRng);
240        let public_key = secret_key.public();
241
242        let relay_mode = if let Some(relay_url) = self
243            .metadata
244            .read()
245            .await
246            .as_ref()
247            .and_then(|conf| conf.iroh_relay.clone())
248        {
249            RelayMode::Custom(RelayUrl::from(relay_url).into())
250        } else {
251            // FIXME: this should be RelayMode::Disabled instead.
252            // Currently using default relays because otherwise Rust tests fail.
253            RelayMode::Default
254        };
255
256        let endpoint = Endpoint::builder()
257            .tls_x509() // For compatibility with iroh <0.34.0
258            .secret_key(secret_key)
259            .alpns(vec![GOSSIP_ALPN.to_vec()])
260            .relay_mode(relay_mode)
261            .bind()
262            .await?;
263
264        // create gossip
265        // Allow messages up to 128 KB in size.
266        // We set the limit to 128 KiB to account for internal overhead,
267        // but only guarantee 128 KB of payload to WebXDC developers.
268
269        let gossip = Gossip::builder()
270            .max_message_size(128 * 1024)
271            .spawn(endpoint.clone())
272            .await?;
273
274        let router = iroh::protocol::Router::builder(endpoint)
275            .accept(GOSSIP_ALPN, gossip.clone())
276            .spawn();
277
278        Ok(Iroh {
279            router,
280            gossip,
281            sequence_numbers: Mutex::new(HashMap::new()),
282            iroh_channels: RwLock::new(HashMap::new()),
283            public_key,
284        })
285    }
286
    /// Get or initialize the iroh peer channel.
    ///
    /// Returns a read guard mapped to the stored [`Iroh`]. The guard keeps
    /// the context's iroh slot read-locked for as long as it is alive.
    ///
    /// Fails if realtime channels are disabled in the configuration
    /// or if initializing iroh fails.
    pub async fn get_or_try_init_peer_channel(
        &self,
    ) -> Result<tokio::sync::RwLockReadGuard<'_, Iroh>> {
        if !self.get_config_bool(Config::WebxdcRealtimeEnabled).await? {
            bail!("Attempt to get Iroh when realtime is disabled");
        }

        // Fast path: iroh already exists, a read lock is sufficient.
        if let Ok(lock) = tokio::sync::RwLockReadGuard::<'_, std::option::Option<Iroh>>::try_map(
            self.iroh.read().await,
            |opt_iroh| opt_iroh.as_ref(),
        ) {
            return Ok(lock);
        }

        // Slow path: take the write lock and check again, because iroh
        // may have been stored between releasing the read lock above
        // and acquiring the write lock here.
        let lock = self.iroh.write().await;
        match tokio::sync::RwLockWriteGuard::<'_, std::option::Option<Iroh>>::try_downgrade_map(
            lock,
            |opt_iroh| opt_iroh.as_ref(),
        ) {
            Ok(lock) => Ok(lock),
            Err(mut lock) => {
                // Still `None`: initialize while holding the write lock,
                // then downgrade to a read guard on the stored value.
                let iroh = self.init_peer_channels().await?;
                *lock = Some(iroh);
                tokio::sync::RwLockWriteGuard::<'_, std::option::Option<Iroh>>::try_downgrade_map(
                    lock,
                    |opt_iroh| opt_iroh.as_ref(),
                )
                .map_err(|_| anyhow!("Downgrade should succeed as we just stored `Some` value"))
            }
        }
    }
319}
320
321/// Cache a peers [NodeId] for one topic.
322pub(crate) async fn iroh_add_peer_for_topic(
323    ctx: &Context,
324    msg_id: MsgId,
325    topic: TopicId,
326    peer: NodeId,
327    relay_server: Option<&str>,
328) -> Result<()> {
329    ctx.sql
330        .execute(
331            "INSERT OR REPLACE INTO iroh_gossip_peers (msg_id, public_key, topic, relay_server) VALUES (?, ?, ?, ?)",
332            (msg_id, peer.as_bytes(), topic.as_bytes(), relay_server),
333        )
334        .await?;
335    Ok(())
336}
337
338/// Add gossip peer from `Iroh-Node-Addr` header to WebXDC message identified by `instance_id`.
339pub async fn add_gossip_peer_from_header(
340    context: &Context,
341    instance_id: MsgId,
342    node_addr: &str,
343) -> Result<()> {
344    if !context
345        .get_config_bool(Config::WebxdcRealtimeEnabled)
346        .await?
347    {
348        return Ok(());
349    }
350
351    info!(
352        context,
353        "Adding iroh peer with address {node_addr:?} to the topic of {instance_id}."
354    );
355    let node_addr =
356        serde_json::from_str::<NodeAddr>(node_addr).context("Failed to parse node address")?;
357
358    context.emit_event(EventType::WebxdcRealtimeAdvertisementReceived {
359        msg_id: instance_id,
360    });
361
362    let Some(topic) = get_iroh_topic_for_msg(context, instance_id).await? else {
363        warn!(
364            context,
365            "Could not add iroh peer because {instance_id} has no topic."
366        );
367        return Ok(());
368    };
369
370    let node_id = node_addr.node_id;
371    let relay_server = node_addr.relay_url().map(|relay| relay.as_str());
372    iroh_add_peer_for_topic(context, instance_id, topic, node_id, relay_server).await?;
373
374    let iroh = context.get_or_try_init_peer_channel().await?;
375    iroh.maybe_add_gossip_peers(topic, vec![node_addr]).await?;
376    Ok(())
377}
378
379/// Insert topicId into the database so that we can use it to retrieve the topic.
380pub(crate) async fn insert_topic_stub(ctx: &Context, msg_id: MsgId, topic: TopicId) -> Result<()> {
381    ctx.sql
382        .execute(
383            "INSERT OR REPLACE INTO iroh_gossip_peers (msg_id, public_key, topic, relay_server) VALUES (?, ?, ?, ?)",
384            (msg_id, PUBLIC_KEY_STUB, topic.as_bytes(), Option::<&str>::None),
385        )
386        .await?;
387    Ok(())
388}
389
390/// Get a list of [NodeAddr]s for one webxdc.
391async fn get_iroh_gossip_peers(ctx: &Context, msg_id: MsgId) -> Result<Vec<NodeAddr>> {
392    ctx.sql
393        .query_map(
394            "SELECT public_key, relay_server FROM iroh_gossip_peers WHERE msg_id = ? AND public_key != ?",
395            (msg_id, PUBLIC_KEY_STUB),
396            |row| {
397                let key:  Vec<u8> = row.get(0)?;
398                let server: Option<String> = row.get(1)?;
399                Ok((key, server))
400            },
401            |g| {
402                g.map(|data| {
403                    let (key, server) = data?;
404                    let server = server.map(|data| Ok::<_, url::ParseError>(RelayUrl::from(Url::parse(&data)?))).transpose()?;
405                    let id = NodeId::from_bytes(&key.try_into()
406                    .map_err(|_| anyhow!("Can't convert sql data to [u8; 32]"))?)?;
407                    Ok::<_, anyhow::Error>(NodeAddr::from_parts(
408                        id, server, vec![]
409                    ))
410                })
411                .collect::<std::result::Result<Vec<_>, _>>()
412            },
413        )
414        .await
415}
416
417/// Get the topic for a given [MsgId].
418pub(crate) async fn get_iroh_topic_for_msg(
419    ctx: &Context,
420    msg_id: MsgId,
421) -> Result<Option<TopicId>> {
422    if let Some(bytes) = ctx
423        .sql
424        .query_get_value::<Vec<u8>>(
425            "SELECT topic FROM iroh_gossip_peers WHERE msg_id = ? LIMIT 1",
426            (msg_id,),
427        )
428        .await
429        .context("Couldn't restore topic from db")?
430    {
431        let topic_id = TopicId::from_bytes(
432            bytes
433                .try_into()
434                .map_err(|_| anyhow!("Could not convert stored topic ID"))?,
435        );
436        Ok(Some(topic_id))
437    } else {
438        Ok(None)
439    }
440}
441
442/// Send a gossip advertisement to the chat that [MsgId] belongs to.
443/// This method should be called from the frontend when `joinRealtimeChannel` is called.
444pub async fn send_webxdc_realtime_advertisement(
445    ctx: &Context,
446    msg_id: MsgId,
447) -> Result<Option<oneshot::Receiver<()>>> {
448    if !ctx.get_config_bool(Config::WebxdcRealtimeEnabled).await? {
449        return Ok(None);
450    }
451
452    let iroh = ctx.get_or_try_init_peer_channel().await?;
453    let conn = iroh.join_and_subscribe_gossip(ctx, msg_id).await?;
454
455    let webxdc = Message::load_from_db(ctx, msg_id).await?;
456    let mut msg = Message::new(Viewtype::Text);
457    msg.hidden = true;
458    msg.param.set_cmd(SystemMessage::IrohNodeAddr);
459    msg.in_reply_to = Some(webxdc.rfc724_mid.clone());
460    send_msg(ctx, webxdc.chat_id, &mut msg).await?;
461    info!(ctx, "IROH_REALTIME: Sent realtime advertisement");
462    Ok(conn)
463}
464
465/// Send realtime data to other peers using iroh.
466pub async fn send_webxdc_realtime_data(ctx: &Context, msg_id: MsgId, data: Vec<u8>) -> Result<()> {
467    if !ctx.get_config_bool(Config::WebxdcRealtimeEnabled).await? {
468        return Ok(());
469    }
470
471    let iroh = ctx.get_or_try_init_peer_channel().await?;
472    iroh.send_webxdc_realtime_data(ctx, msg_id, data).await?;
473    Ok(())
474}
475
476/// Leave the gossip of the webxdc with given [MsgId].
477pub async fn leave_webxdc_realtime(ctx: &Context, msg_id: MsgId) -> Result<()> {
478    if !ctx.get_config_bool(Config::WebxdcRealtimeEnabled).await? {
479        return Ok(());
480    }
481    let topic = get_iroh_topic_for_msg(ctx, msg_id)
482        .await?
483        .with_context(|| format!("Message {msg_id} has no gossip topic"))?;
484    let iroh = ctx.get_or_try_init_peer_channel().await?;
485    iroh.leave_realtime(topic).await?;
486    info!(ctx, "IROH_REALTIME: Left gossip for message {msg_id}");
487
488    Ok(())
489}
490
491/// Creates a new random gossip topic.
492fn create_random_topic() -> TopicId {
493    TopicId::from_bytes(rand::random())
494}
495
496/// Creates `Iroh-Gossip-Header` with a new random topic
497/// and stores the topic for the message.
498pub(crate) async fn create_iroh_header(ctx: &Context, msg_id: MsgId) -> Result<String> {
499    let topic = create_random_topic();
500    insert_topic_stub(ctx, msg_id, topic).await?;
501    let topic_string = BASE32_NOPAD.encode(topic.as_bytes()).to_ascii_lowercase();
502    Ok(topic_string)
503}
504
505async fn subscribe_loop(
506    context: &Context,
507    mut stream: iroh_gossip::net::GossipReceiver,
508    topic: TopicId,
509    msg_id: MsgId,
510    join_tx: oneshot::Sender<()>,
511) -> Result<()> {
512    let mut join_tx = Some(join_tx);
513
514    while let Some(event) = stream.try_next().await? {
515        match event {
516            Event::Gossip(event) => match event {
517                GossipEvent::Joined(nodes) => {
518                    if let Some(join_tx) = join_tx.take() {
519                        // Try to notify that at least one peer joined,
520                        // but ignore the error if receiver is dropped and nobody listens.
521                        join_tx.send(()).ok();
522                    }
523
524                    for node in nodes {
525                        iroh_add_peer_for_topic(context, msg_id, topic, node, None).await?;
526                    }
527                }
528                GossipEvent::NeighborUp(node) => {
529                    info!(context, "IROH_REALTIME: NeighborUp: {}", node.to_string());
530                    iroh_add_peer_for_topic(context, msg_id, topic, node, None).await?;
531                }
532                GossipEvent::NeighborDown(_node) => {}
533                GossipEvent::Received(message) => {
534                    info!(context, "IROH_REALTIME: Received realtime data");
535                    context.emit_event(EventType::WebxdcRealtimeData {
536                        msg_id,
537                        data: message
538                            .content
539                            .get(0..message.content.len() - 4 - PUBLIC_KEY_LENGTH)
540                            .context("too few bytes in iroh message")?
541                            .into(),
542                    });
543                }
544            },
545            Event::Lagged => {
546                warn!(context, "Gossip lost some messages");
547            }
548        };
549    }
550    Ok(())
551}
552
553#[cfg(test)]
554mod tests {
555    use super::*;
556    use crate::{
557        EventType,
558        chat::send_msg,
559        message::{Message, Viewtype},
560        test_utils::TestContextManager,
561    };
562
563    #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
564    async fn test_can_communicate() {
565        let mut tcm = TestContextManager::new();
566        let alice = &mut tcm.alice().await;
567        let bob = &mut tcm.bob().await;
568
569        // Alice sends webxdc to bob
570        let alice_chat = alice.create_chat(bob).await;
571        let mut instance = Message::new(Viewtype::File);
572        instance
573            .set_file_from_bytes(
574                alice,
575                "minimal.xdc",
576                include_bytes!("../test-data/webxdc/minimal.xdc"),
577                None,
578            )
579            .unwrap();
580
581        send_msg(alice, alice_chat.id, &mut instance).await.unwrap();
582        let alice_webxdc = alice.get_last_msg().await;
583        assert_eq!(alice_webxdc.get_viewtype(), Viewtype::Webxdc);
584
585        let webxdc = alice.pop_sent_msg().await;
586        let bob_webxdc = bob.recv_msg(&webxdc).await;
587        assert_eq!(bob_webxdc.get_viewtype(), Viewtype::Webxdc);
588
589        bob_webxdc.chat_id.accept(bob).await.unwrap();
590
591        // Alice advertises herself.
592        send_webxdc_realtime_advertisement(alice, alice_webxdc.id)
593            .await
594            .unwrap();
595
596        bob.recv_msg_trash(&alice.pop_sent_msg().await).await;
597        loop {
598            let event = bob.evtracker.recv().await.unwrap();
599            if let EventType::WebxdcRealtimeAdvertisementReceived { msg_id } = event.typ {
600                assert!(msg_id == alice_webxdc.id);
601                break;
602            }
603        }
604
605        // Bob adds alice to gossip peers.
606        let members = get_iroh_gossip_peers(bob, bob_webxdc.id)
607            .await
608            .unwrap()
609            .into_iter()
610            .map(|addr| addr.node_id)
611            .collect::<Vec<_>>();
612
613        assert_eq!(
614            members,
615            vec![
616                alice
617                    .get_or_try_init_peer_channel()
618                    .await
619                    .unwrap()
620                    .get_node_addr()
621                    .await
622                    .unwrap()
623                    .node_id
624            ]
625        );
626
627        bob.get_or_try_init_peer_channel()
628            .await
629            .unwrap()
630            .join_and_subscribe_gossip(bob, bob_webxdc.id)
631            .await
632            .unwrap()
633            .unwrap()
634            .await
635            .unwrap();
636
637        // Alice sends ephemeral message
638        alice
639            .get_or_try_init_peer_channel()
640            .await
641            .unwrap()
642            .send_webxdc_realtime_data(alice, alice_webxdc.id, "alice -> bob".as_bytes().to_vec())
643            .await
644            .unwrap();
645
646        loop {
647            let event = bob.evtracker.recv().await.unwrap();
648            if let EventType::WebxdcRealtimeData { data, .. } = event.typ {
649                if data == "alice -> bob".as_bytes() {
650                    break;
651                } else {
652                    panic!(
653                        "Unexpected status update: {}",
654                        String::from_utf8_lossy(&data)
655                    );
656                }
657            }
658        }
659        // Bob sends ephemeral message
660        bob.get_or_try_init_peer_channel()
661            .await
662            .unwrap()
663            .send_webxdc_realtime_data(bob, bob_webxdc.id, "bob -> alice".as_bytes().to_vec())
664            .await
665            .unwrap();
666
667        loop {
668            let event = alice.evtracker.recv().await.unwrap();
669            if let EventType::WebxdcRealtimeData { data, .. } = event.typ {
670                if data == "bob -> alice".as_bytes() {
671                    break;
672                } else {
673                    panic!(
674                        "Unexpected status update: {}",
675                        String::from_utf8_lossy(&data)
676                    );
677                }
678            }
679        }
680
681        // Alice adds bob to gossip peers.
682        let members = get_iroh_gossip_peers(alice, alice_webxdc.id)
683            .await
684            .unwrap()
685            .into_iter()
686            .map(|addr| addr.node_id)
687            .collect::<Vec<_>>();
688
689        assert_eq!(
690            members,
691            vec![
692                bob.get_or_try_init_peer_channel()
693                    .await
694                    .unwrap()
695                    .get_node_addr()
696                    .await
697                    .unwrap()
698                    .node_id
699            ]
700        );
701
702        bob.get_or_try_init_peer_channel()
703            .await
704            .unwrap()
705            .send_webxdc_realtime_data(bob, bob_webxdc.id, "bob -> alice 2".as_bytes().to_vec())
706            .await
707            .unwrap();
708
709        loop {
710            let event = alice.evtracker.recv().await.unwrap();
711            if let EventType::WebxdcRealtimeData { data, .. } = event.typ {
712                if data == "bob -> alice 2".as_bytes() {
713                    break;
714                } else {
715                    panic!(
716                        "Unexpected status update: {}",
717                        String::from_utf8_lossy(&data)
718                    );
719                }
720            }
721        }
722
723        // Calling stop_io() closes iroh endpoint as well,
724        // even though I/O was not started in this test.
725        assert!(alice.iroh.read().await.is_some());
726        alice.stop_io().await;
727        assert!(alice.iroh.read().await.is_none());
728    }
729
    /// Bob leaves the realtime channel, joins it again
    /// and is still able to send data to Alice afterwards.
    #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
    async fn test_can_reconnect() {
        let mut tcm = TestContextManager::new();
        let alice = &mut tcm.alice().await;
        let bob = &mut tcm.bob().await;

        assert!(
            alice
                .get_config_bool(Config::WebxdcRealtimeEnabled)
                .await
                .unwrap()
        );
        // Alice sends webxdc to bob
        let alice_chat = alice.create_chat(bob).await;
        let mut instance = Message::new(Viewtype::File);
        instance
            .set_file_from_bytes(
                alice,
                "minimal.xdc",
                include_bytes!("../test-data/webxdc/minimal.xdc"),
                None,
            )
            .unwrap();

        send_msg(alice, alice_chat.id, &mut instance).await.unwrap();
        let alice_webxdc = alice.get_last_msg().await;
        assert_eq!(alice_webxdc.get_viewtype(), Viewtype::Webxdc);

        let webxdc = alice.pop_sent_msg().await;
        let bob_webxdc = bob.recv_msg(&webxdc).await;
        assert_eq!(bob_webxdc.get_viewtype(), Viewtype::Webxdc);

        bob_webxdc.chat_id.accept(bob).await.unwrap();

        // Alice advertises herself.
        send_webxdc_realtime_advertisement(alice, alice_webxdc.id)
            .await
            .unwrap();

        bob.recv_msg_trash(&alice.pop_sent_msg().await).await;

        // Bob adds alice to gossip peers.
        let members = get_iroh_gossip_peers(bob, bob_webxdc.id)
            .await
            .unwrap()
            .into_iter()
            .map(|addr| addr.node_id)
            .collect::<Vec<_>>();

        assert_eq!(
            members,
            vec![
                alice
                    .get_or_try_init_peer_channel()
                    .await
                    .unwrap()
                    .get_node_addr()
                    .await
                    .unwrap()
                    .node_id
            ]
        );

        bob.get_or_try_init_peer_channel()
            .await
            .unwrap()
            .join_and_subscribe_gossip(bob, bob_webxdc.id)
            .await
            .unwrap()
            .unwrap()
            .await
            .unwrap();

        // Alice sends ephemeral message
        alice
            .get_or_try_init_peer_channel()
            .await
            .unwrap()
            .send_webxdc_realtime_data(alice, alice_webxdc.id, "alice -> bob".as_bytes().to_vec())
            .await
            .unwrap();

        loop {
            let event = bob.evtracker.recv().await.unwrap();
            if let EventType::WebxdcRealtimeData { data, .. } = event.typ {
                if data == "alice -> bob".as_bytes() {
                    break;
                } else {
                    panic!(
                        "Unexpected status update: {}",
                        String::from_utf8_lossy(&data)
                    );
                }
            }
        }

        let bob_topic = get_iroh_topic_for_msg(bob, bob_webxdc.id)
            .await
            .unwrap()
            .unwrap();
        let bob_sequence_number = bob
            .iroh
            .read()
            .await
            .as_ref()
            .unwrap()
            .sequence_numbers
            .lock()
            .get(&bob_topic)
            .copied();
        leave_webxdc_realtime(bob, bob_webxdc.id).await.unwrap();
        let bob_sequence_number_after = bob
            .iroh
            .read()
            .await
            .as_ref()
            .unwrap()
            .sequence_numbers
            .lock()
            .get(&bob_topic)
            .copied();
        // Check that sequence number is persisted when leaving the channel.
        assert_eq!(bob_sequence_number, bob_sequence_number_after);

        // Bob joins the channel again after having left it.
        bob.get_or_try_init_peer_channel()
            .await
            .unwrap()
            .join_and_subscribe_gossip(bob, bob_webxdc.id)
            .await
            .unwrap()
            .unwrap()
            .await
            .unwrap();

        bob.get_or_try_init_peer_channel()
            .await
            .unwrap()
            .send_webxdc_realtime_data(bob, bob_webxdc.id, "bob -> alice".as_bytes().to_vec())
            .await
            .unwrap();

        loop {
            let event = alice.evtracker.recv().await.unwrap();
            if let EventType::WebxdcRealtimeData { data, .. } = event.typ {
                if data == "bob -> alice".as_bytes() {
                    break;
                } else {
                    panic!(
                        "Unexpected status update: {}",
                        String::from_utf8_lossy(&data)
                    );
                }
            }
        }

        // Alice joined exactly one topic, so exactly one channel state is stored for her.
        assert_eq!(
            alice
                .iroh
                .read()
                .await
                .as_ref()
                .unwrap()
                .iroh_channels
                .read()
                .await
                .len(),
            1
        );
        // Leaving the channel removes the state of its topic again.
        leave_webxdc_realtime(alice, alice_webxdc.id).await.unwrap();
        let topic = get_iroh_topic_for_msg(alice, alice_webxdc.id)
            .await
            .unwrap()
            .unwrap();
        assert!(
            alice
                .iroh
                .read()
                .await
                .as_ref()
                .unwrap()
                .iroh_channels
                .read()
                .await
                .get(&topic)
                .is_none()
        );
    }
920
921    #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
922    async fn test_parallel_connect() {
923        let mut tcm = TestContextManager::new();
924        let alice = &mut tcm.alice().await;
925        let bob = &mut tcm.bob().await;
926
927        // Alice sends webxdc to bob
928        let alice_chat = alice.create_chat(bob).await;
929        let mut instance = Message::new(Viewtype::File);
930        instance
931            .set_file_from_bytes(
932                alice,
933                "minimal.xdc",
934                include_bytes!("../test-data/webxdc/minimal.xdc"),
935                None,
936            )
937            .unwrap();
938        send_msg(alice, alice_chat.id, &mut instance).await.unwrap();
939        let alice_webxdc = alice.get_last_msg().await;
940
941        let webxdc = alice.pop_sent_msg().await;
942        let bob_webxdc = bob.recv_msg(&webxdc).await;
943        assert_eq!(bob_webxdc.get_viewtype(), Viewtype::Webxdc);
944
945        bob_webxdc.chat_id.accept(bob).await.unwrap();
946
947        eprintln!("Sending advertisements");
948        // Alice advertises herself.
949        let alice_advertisement_future = send_webxdc_realtime_advertisement(alice, alice_webxdc.id)
950            .await
951            .unwrap()
952            .unwrap();
953        let alice_advertisement = alice.pop_sent_msg().await;
954
955        send_webxdc_realtime_advertisement(bob, bob_webxdc.id)
956            .await
957            .unwrap();
958        let bob_advertisement = bob.pop_sent_msg().await;
959
960        eprintln!("Receiving advertisements");
961        bob.recv_msg_trash(&alice_advertisement).await;
962        alice.recv_msg_trash(&bob_advertisement).await;
963
964        eprintln!("Alice waits for connection");
965        alice_advertisement_future.await.unwrap();
966
967        // Alice sends ephemeral message
968        eprintln!("Sending ephemeral message");
969        send_webxdc_realtime_data(alice, alice_webxdc.id, b"alice -> bob".into())
970            .await
971            .unwrap();
972
973        eprintln!("Waiting for ephemeral message");
974        loop {
975            let event = bob.evtracker.recv().await.unwrap();
976            if let EventType::WebxdcRealtimeData { data, .. } = event.typ {
977                if data == b"alice -> bob" {
978                    break;
979                } else {
980                    panic!(
981                        "Unexpected status update: {}",
982                        String::from_utf8_lossy(&data)
983                    );
984                }
985            }
986        }
987    }
988
989    #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
990    async fn test_peer_channels_disabled() {
991        let mut tcm = TestContextManager::new();
992        let alice = &mut tcm.alice().await;
993
994        alice
995            .set_config_bool(Config::WebxdcRealtimeEnabled, false)
996            .await
997            .unwrap();
998
999        // creates iroh endpoint as side effect
1000        send_webxdc_realtime_advertisement(alice, MsgId::new(1))
1001            .await
1002            .unwrap();
1003
1004        assert!(alice.ctx.iroh.read().await.is_none());
1005
1006        // creates iroh endpoint as side effect
1007        send_webxdc_realtime_data(alice, MsgId::new(1), vec![])
1008            .await
1009            .unwrap();
1010
1011        assert!(alice.ctx.iroh.read().await.is_none());
1012
1013        // creates iroh endpoint as side effect
1014        leave_webxdc_realtime(alice, MsgId::new(1)).await.unwrap();
1015
1016        assert!(alice.ctx.iroh.read().await.is_none());
1017
1018        // This internal function should return error
1019        // if accidentally called with the setting disabled.
1020        assert!(alice.ctx.get_or_try_init_peer_channel().await.is_err());
1021    }
1022}