deltachat/
peer_channels.rs

1//! Peer channels for realtime communication in webxdcs.
2//!
3//! We use Iroh as an ephemeral peer channels provider to create direct communication
4//! channels between webxdcs. See [here](https://webxdc.org/docs/spec/joinRealtimeChannel.html) for the webxdc specs.
5//!
6//! Ephemeral channels should be established lazily, to avoid bootstrapping p2p connectivity
7//! when it's not required. Only when a webxdc subscribes to realtime data or when a reatlime message is sent,
8//! the p2p machinery should be started.
9//!
10//! Adding peer channels to webxdc needs upfront negotiation of a topic and sharing of public keys so that
11//! nodes can connect to each other. The explicit approach is as follows:
12//!
13//! 1. We introduce a new [`IrohGossipTopic`](crate::headerdef::HeaderDef::IrohGossipTopic) message header with a random 32-byte TopicId,
14//!    securely generated on the initial webxdc sender's device. This message header is encrypted
15//!    and sent in the same message as the webxdc application.
16//! 2. Whenever `joinRealtimeChannel().setListener()` or `joinRealtimeChannel().send()` is called by the webxdc application,
17//!    we start a routine to establish p2p connectivity and join the gossip swarm with Iroh.
18//! 3. The first step of this routine is to introduce yourself with a regular message containing the [`IrohNodeAddr`](crate::headerdef::HeaderDef::IrohNodeAddr).
19//!    This message contains the users relay-server and public key.
20//!    Direct IP address is not included as this information can be persisted by email providers.
21//! 4. After the announcement, the sending peer joins the gossip swarm with an empty list of peer IDs (as they don't know anyone yet).
22//! 5. Upon receiving an announcement message, other peers store the sender's [NodeAddr] in the database
23//!    (scoped per WebXDC app instance/message-id). The other peers can then join the gossip with `joinRealtimeChannel().setListener()`
24//!    and `joinRealtimeChannel().send()` just like the other peers.
25
26use anyhow::{anyhow, bail, Context as _, Result};
27use data_encoding::BASE32_NOPAD;
28use futures_lite::StreamExt;
29use iroh::{Endpoint, NodeAddr, NodeId, PublicKey, RelayMap, RelayMode, RelayUrl, SecretKey};
30use iroh_gossip::net::{Event, Gossip, GossipEvent, JoinOptions, GOSSIP_ALPN};
31use iroh_gossip::proto::TopicId;
32use parking_lot::Mutex;
33use std::collections::{BTreeSet, HashMap};
34use std::env;
35use tokio::sync::{oneshot, RwLock};
36use tokio::task::JoinHandle;
37use url::Url;
38
39use crate::chat::send_msg;
40use crate::config::Config;
41use crate::context::Context;
42use crate::message::{Message, MsgId, Viewtype};
43use crate::mimeparser::SystemMessage;
44use crate::EventType;
45
46/// The length of an ed25519 `PublicKey`, in bytes.
47const PUBLIC_KEY_LENGTH: usize = 32;
48const PUBLIC_KEY_STUB: &[u8] = "static_string".as_bytes();
49
50/// Store iroh peer channels for the context.
51#[derive(Debug)]
52pub struct Iroh {
53    /// iroh router  needed for iroh peer channels.
54    pub(crate) router: iroh::protocol::Router,
55
56    /// [Gossip] needed for iroh peer channels.
57    pub(crate) gossip: Gossip,
58
59    /// Sequence numbers for gossip channels.
60    pub(crate) sequence_numbers: Mutex<HashMap<TopicId, i32>>,
61
62    /// Topics for which an advertisement has already been sent.
63    pub(crate) iroh_channels: RwLock<HashMap<TopicId, ChannelState>>,
64
65    /// Currently used Iroh public key.
66    ///
67    /// This is attached to every message to work around `iroh_gossip` deduplication.
68    pub(crate) public_key: PublicKey,
69}
70
71impl Iroh {
72    /// Notify the endpoint that the network has changed.
73    pub(crate) async fn network_change(&self) {
74        self.router.endpoint().network_change().await
75    }
76
77    /// Closes the QUIC endpoint.
78    pub(crate) async fn close(self) -> Result<()> {
79        self.router.shutdown().await.context("Closing iroh failed")
80    }
81
82    /// Join a topic and create the subscriber loop for it.
83    ///
84    /// If there is no gossip, create it.
85    ///
86    /// The returned future resolves when the swarm becomes operational.
87    async fn join_and_subscribe_gossip(
88        &self,
89        ctx: &Context,
90        msg_id: MsgId,
91    ) -> Result<Option<oneshot::Receiver<()>>> {
92        let topic = get_iroh_topic_for_msg(ctx, msg_id)
93            .await?
94            .with_context(|| format!("Message {msg_id} has no gossip topic"))?;
95
96        // Take exclusive lock to make sure
97        // no other thread can create a second gossip subscription
98        // after we check that it does not exist and before we create a new one.
99        // Otherwise we would receive every message twice or more times.
100        let mut iroh_channels = self.iroh_channels.write().await;
101
102        if iroh_channels.contains_key(&topic) {
103            return Ok(None);
104        }
105
106        let peers = get_iroh_gossip_peers(ctx, msg_id).await?;
107        let node_ids = peers.iter().map(|p| p.node_id).collect::<Vec<_>>();
108
109        info!(
110            ctx,
111            "IROH_REALTIME: Joining gossip with peers: {:?}", node_ids,
112        );
113
114        // Inform iroh of potentially new node addresses
115        for node_addr in &peers {
116            if !node_addr.is_empty() {
117                self.router.endpoint().add_node_addr(node_addr.clone())?;
118            }
119        }
120
121        let (join_tx, join_rx) = oneshot::channel();
122
123        let (gossip_sender, gossip_receiver) = self
124            .gossip
125            .subscribe_with_opts(topic, JoinOptions::with_bootstrap(node_ids))
126            .split();
127
128        let ctx = ctx.clone();
129        let subscribe_loop = tokio::spawn(async move {
130            if let Err(e) = subscribe_loop(&ctx, gossip_receiver, topic, msg_id, join_tx).await {
131                warn!(ctx, "subscribe_loop failed: {e}")
132            }
133        });
134
135        iroh_channels.insert(topic, ChannelState::new(subscribe_loop, gossip_sender));
136
137        Ok(Some(join_rx))
138    }
139
140    /// Add gossip peers to realtime channel if it is already active.
141    pub async fn maybe_add_gossip_peers(&self, topic: TopicId, peers: Vec<NodeAddr>) -> Result<()> {
142        if self.iroh_channels.read().await.get(&topic).is_some() {
143            for peer in &peers {
144                self.router.endpoint().add_node_addr(peer.clone())?;
145            }
146
147            self.gossip.subscribe_with_opts(
148                topic,
149                JoinOptions::with_bootstrap(peers.into_iter().map(|peer| peer.node_id)),
150            );
151        }
152        Ok(())
153    }
154
155    /// Send realtime data to the gossip swarm.
156    pub async fn send_webxdc_realtime_data(
157        &self,
158        ctx: &Context,
159        msg_id: MsgId,
160        mut data: Vec<u8>,
161    ) -> Result<()> {
162        let topic = get_iroh_topic_for_msg(ctx, msg_id)
163            .await?
164            .with_context(|| format!("Message {msg_id} has no gossip topic"))?;
165        self.join_and_subscribe_gossip(ctx, msg_id).await?;
166
167        let seq_num = self.get_and_incr(&topic);
168
169        let mut iroh_channels = self.iroh_channels.write().await;
170        let state = iroh_channels
171            .get_mut(&topic)
172            .context("Just created state does not exist")?;
173        data.extend(seq_num.to_le_bytes());
174        data.extend(self.public_key.as_bytes());
175
176        state.sender.broadcast(data.into()).await?;
177
178        if env::var("REALTIME_DEBUG").is_ok() {
179            info!(ctx, "Sent realtime data");
180        }
181
182        Ok(())
183    }
184
185    fn get_and_incr(&self, topic: &TopicId) -> i32 {
186        let mut sequence_numbers = self.sequence_numbers.lock();
187        let entry = sequence_numbers.entry(*topic).or_default();
188        *entry = entry.wrapping_add(1);
189        *entry
190    }
191
192    /// Get the iroh [NodeAddr] without direct IP addresses.
193    pub(crate) async fn get_node_addr(&self) -> Result<NodeAddr> {
194        let mut addr = self.router.endpoint().node_addr().await?;
195        addr.direct_addresses = BTreeSet::new();
196        Ok(addr)
197    }
198
199    /// Leave the realtime channel for a given topic.
200    pub(crate) async fn leave_realtime(&self, topic: TopicId) -> Result<()> {
201        if let Some(channel) = self.iroh_channels.write().await.remove(&topic) {
202            // Dropping the last GossipTopic results in quitting the topic.
203            // It is split into GossipReceiver and GossipSender.
204            // GossipSender (`channel.sender`) is dropped automatically.
205
206            // Subscribe loop owns GossipReceiver.
207            // Aborting it and waiting for it to be dropped
208            // drops the receiver.
209            channel.subscribe_loop.abort();
210            let _ = channel.subscribe_loop.await;
211        }
212        Ok(())
213    }
214}
215
216/// Single gossip channel state.
217#[derive(Debug)]
218pub(crate) struct ChannelState {
219    /// The subscribe loop handle.
220    subscribe_loop: JoinHandle<()>,
221
222    sender: iroh_gossip::net::GossipSender,
223}
224
225impl ChannelState {
226    fn new(subscribe_loop: JoinHandle<()>, sender: iroh_gossip::net::GossipSender) -> Self {
227        Self {
228            subscribe_loop,
229            sender,
230        }
231    }
232}
233
234impl Context {
235    /// Create iroh endpoint and gossip.
236    async fn init_peer_channels(&self) -> Result<Iroh> {
237        info!(self, "Initializing peer channels.");
238        let secret_key = SecretKey::generate(rand::rngs::OsRng);
239        let public_key = secret_key.public();
240
241        let relay_mode = if let Some(relay_url) = self
242            .metadata
243            .read()
244            .await
245            .as_ref()
246            .and_then(|conf| conf.iroh_relay.clone())
247        {
248            RelayMode::Custom(RelayMap::from_url(RelayUrl::from(relay_url)))
249        } else {
250            // FIXME: this should be RelayMode::Disabled instead.
251            // Currently using default relays because otherwise Rust tests fail.
252            RelayMode::Default
253        };
254
255        let endpoint = Endpoint::builder()
256            .secret_key(secret_key)
257            .alpns(vec![GOSSIP_ALPN.to_vec()])
258            .relay_mode(relay_mode)
259            .bind()
260            .await?;
261
262        // create gossip
263        // Allow messages up to 128 KB in size.
264        // We set the limit to 128 KiB to account for internal overhead,
265        // but only guarantee 128 KB of payload to WebXDC developers.
266
267        let gossip = Gossip::builder()
268            .max_message_size(128 * 1024)
269            .spawn(endpoint.clone())
270            .await?;
271
272        let router = iroh::protocol::Router::builder(endpoint)
273            .accept(GOSSIP_ALPN, gossip.clone())
274            .spawn()
275            .await?;
276
277        Ok(Iroh {
278            router,
279            gossip,
280            sequence_numbers: Mutex::new(HashMap::new()),
281            iroh_channels: RwLock::new(HashMap::new()),
282            public_key,
283        })
284    }
285
286    /// Get or initialize the iroh peer channel.
287    pub async fn get_or_try_init_peer_channel(
288        &self,
289    ) -> Result<tokio::sync::RwLockReadGuard<'_, Iroh>> {
290        if !self.get_config_bool(Config::WebxdcRealtimeEnabled).await? {
291            bail!("Attempt to get Iroh when realtime is disabled");
292        }
293
294        if let Ok(lock) = tokio::sync::RwLockReadGuard::<'_, std::option::Option<Iroh>>::try_map(
295            self.iroh.read().await,
296            |opt_iroh| opt_iroh.as_ref(),
297        ) {
298            return Ok(lock);
299        }
300
301        let lock = self.iroh.write().await;
302        match tokio::sync::RwLockWriteGuard::<'_, std::option::Option<Iroh>>::try_downgrade_map(
303            lock,
304            |opt_iroh| opt_iroh.as_ref(),
305        ) {
306            Ok(lock) => Ok(lock),
307            Err(mut lock) => {
308                let iroh = self.init_peer_channels().await?;
309                *lock = Some(iroh);
310                tokio::sync::RwLockWriteGuard::<'_, std::option::Option<Iroh>>::try_downgrade_map(
311                    lock,
312                    |opt_iroh| opt_iroh.as_ref(),
313                )
314                .map_err(|_| anyhow!("Downgrade should succeed as we just stored `Some` value"))
315            }
316        }
317    }
318}
319
320/// Cache a peers [NodeId] for one topic.
321pub(crate) async fn iroh_add_peer_for_topic(
322    ctx: &Context,
323    msg_id: MsgId,
324    topic: TopicId,
325    peer: NodeId,
326    relay_server: Option<&str>,
327) -> Result<()> {
328    ctx.sql
329        .execute(
330            "INSERT OR REPLACE INTO iroh_gossip_peers (msg_id, public_key, topic, relay_server) VALUES (?, ?, ?, ?)",
331            (msg_id, peer.as_bytes(), topic.as_bytes(), relay_server),
332        )
333        .await?;
334    Ok(())
335}
336
337/// Add gossip peer from `Iroh-Node-Addr` header to WebXDC message identified by `instance_id`.
338pub async fn add_gossip_peer_from_header(
339    context: &Context,
340    instance_id: MsgId,
341    node_addr: &str,
342) -> Result<()> {
343    if !context
344        .get_config_bool(Config::WebxdcRealtimeEnabled)
345        .await?
346    {
347        return Ok(());
348    }
349
350    info!(
351        context,
352        "Adding iroh peer with address {node_addr:?} to the topic of {instance_id}."
353    );
354    let node_addr =
355        serde_json::from_str::<NodeAddr>(node_addr).context("Failed to parse node address")?;
356
357    context.emit_event(EventType::WebxdcRealtimeAdvertisementReceived {
358        msg_id: instance_id,
359    });
360
361    let Some(topic) = get_iroh_topic_for_msg(context, instance_id).await? else {
362        warn!(
363            context,
364            "Could not add iroh peer because {instance_id} has no topic."
365        );
366        return Ok(());
367    };
368
369    let node_id = node_addr.node_id;
370    let relay_server = node_addr.relay_url().map(|relay| relay.as_str());
371    iroh_add_peer_for_topic(context, instance_id, topic, node_id, relay_server).await?;
372
373    let iroh = context.get_or_try_init_peer_channel().await?;
374    iroh.maybe_add_gossip_peers(topic, vec![node_addr]).await?;
375    Ok(())
376}
377
378/// Insert topicId into the database so that we can use it to retrieve the topic.
379pub(crate) async fn insert_topic_stub(ctx: &Context, msg_id: MsgId, topic: TopicId) -> Result<()> {
380    ctx.sql
381        .execute(
382            "INSERT OR REPLACE INTO iroh_gossip_peers (msg_id, public_key, topic, relay_server) VALUES (?, ?, ?, ?)",
383            (msg_id, PUBLIC_KEY_STUB, topic.as_bytes(), Option::<&str>::None),
384        )
385        .await?;
386    Ok(())
387}
388
389/// Get a list of [NodeAddr]s for one webxdc.
390async fn get_iroh_gossip_peers(ctx: &Context, msg_id: MsgId) -> Result<Vec<NodeAddr>> {
391    ctx.sql
392        .query_map(
393            "SELECT public_key, relay_server FROM iroh_gossip_peers WHERE msg_id = ? AND public_key != ?",
394            (msg_id, PUBLIC_KEY_STUB),
395            |row| {
396                let key:  Vec<u8> = row.get(0)?;
397                let server: Option<String> = row.get(1)?;
398                Ok((key, server))
399            },
400            |g| {
401                g.map(|data| {
402                    let (key, server) = data?;
403                    let server = server.map(|data| Ok::<_, url::ParseError>(RelayUrl::from(Url::parse(&data)?))).transpose()?;
404                    let id = NodeId::from_bytes(&key.try_into()
405                    .map_err(|_| anyhow!("Can't convert sql data to [u8; 32]"))?)?;
406                    Ok::<_, anyhow::Error>(NodeAddr::from_parts(
407                        id, server, vec![]
408                    ))
409                })
410                .collect::<std::result::Result<Vec<_>, _>>()
411            },
412        )
413        .await
414}
415
416/// Get the topic for a given [MsgId].
417pub(crate) async fn get_iroh_topic_for_msg(
418    ctx: &Context,
419    msg_id: MsgId,
420) -> Result<Option<TopicId>> {
421    if let Some(bytes) = ctx
422        .sql
423        .query_get_value::<Vec<u8>>(
424            "SELECT topic FROM iroh_gossip_peers WHERE msg_id = ? LIMIT 1",
425            (msg_id,),
426        )
427        .await
428        .context("Couldn't restore topic from db")?
429    {
430        let topic_id = TopicId::from_bytes(
431            bytes
432                .try_into()
433                .map_err(|_| anyhow!("Could not convert stored topic ID"))?,
434        );
435        Ok(Some(topic_id))
436    } else {
437        Ok(None)
438    }
439}
440
441/// Send a gossip advertisement to the chat that [MsgId] belongs to.
442/// This method should be called from the frontend when `joinRealtimeChannel` is called.
443pub async fn send_webxdc_realtime_advertisement(
444    ctx: &Context,
445    msg_id: MsgId,
446) -> Result<Option<oneshot::Receiver<()>>> {
447    if !ctx.get_config_bool(Config::WebxdcRealtimeEnabled).await? {
448        return Ok(None);
449    }
450
451    let iroh = ctx.get_or_try_init_peer_channel().await?;
452    let conn = iroh.join_and_subscribe_gossip(ctx, msg_id).await?;
453
454    let webxdc = Message::load_from_db(ctx, msg_id).await?;
455    let mut msg = Message::new(Viewtype::Text);
456    msg.hidden = true;
457    msg.param.set_cmd(SystemMessage::IrohNodeAddr);
458    msg.in_reply_to = Some(webxdc.rfc724_mid.clone());
459    send_msg(ctx, webxdc.chat_id, &mut msg).await?;
460    info!(ctx, "IROH_REALTIME: Sent realtime advertisement");
461    Ok(conn)
462}
463
464/// Send realtime data to other peers using iroh.
465pub async fn send_webxdc_realtime_data(ctx: &Context, msg_id: MsgId, data: Vec<u8>) -> Result<()> {
466    if !ctx.get_config_bool(Config::WebxdcRealtimeEnabled).await? {
467        return Ok(());
468    }
469
470    let iroh = ctx.get_or_try_init_peer_channel().await?;
471    iroh.send_webxdc_realtime_data(ctx, msg_id, data).await?;
472    Ok(())
473}
474
475/// Leave the gossip of the webxdc with given [MsgId].
476pub async fn leave_webxdc_realtime(ctx: &Context, msg_id: MsgId) -> Result<()> {
477    if !ctx.get_config_bool(Config::WebxdcRealtimeEnabled).await? {
478        return Ok(());
479    }
480    let topic = get_iroh_topic_for_msg(ctx, msg_id)
481        .await?
482        .with_context(|| format!("Message {msg_id} has no gossip topic"))?;
483    let iroh = ctx.get_or_try_init_peer_channel().await?;
484    iroh.leave_realtime(topic).await?;
485    info!(ctx, "IROH_REALTIME: Left gossip for message {msg_id}");
486
487    Ok(())
488}
489
490/// Creates a new random gossip topic.
491fn create_random_topic() -> TopicId {
492    TopicId::from_bytes(rand::random())
493}
494
495/// Creates `Iroh-Gossip-Header` with a new random topic
496/// and stores the topic for the message.
497pub(crate) async fn create_iroh_header(ctx: &Context, msg_id: MsgId) -> Result<String> {
498    let topic = create_random_topic();
499    insert_topic_stub(ctx, msg_id, topic).await?;
500    let topic_string = BASE32_NOPAD.encode(topic.as_bytes()).to_ascii_lowercase();
501    Ok(topic_string)
502}
503
504async fn subscribe_loop(
505    context: &Context,
506    mut stream: iroh_gossip::net::GossipReceiver,
507    topic: TopicId,
508    msg_id: MsgId,
509    join_tx: oneshot::Sender<()>,
510) -> Result<()> {
511    let mut join_tx = Some(join_tx);
512
513    while let Some(event) = stream.try_next().await? {
514        match event {
515            Event::Gossip(event) => match event {
516                GossipEvent::Joined(nodes) => {
517                    if let Some(join_tx) = join_tx.take() {
518                        // Try to notify that at least one peer joined,
519                        // but ignore the error if receiver is dropped and nobody listens.
520                        join_tx.send(()).ok();
521                    }
522
523                    for node in nodes {
524                        iroh_add_peer_for_topic(context, msg_id, topic, node, None).await?;
525                    }
526                }
527                GossipEvent::NeighborUp(node) => {
528                    info!(context, "IROH_REALTIME: NeighborUp: {}", node.to_string());
529                    iroh_add_peer_for_topic(context, msg_id, topic, node, None).await?;
530                }
531                GossipEvent::NeighborDown(_node) => {}
532                GossipEvent::Received(message) => {
533                    info!(context, "IROH_REALTIME: Received realtime data");
534                    context.emit_event(EventType::WebxdcRealtimeData {
535                        msg_id,
536                        data: message
537                            .content
538                            .get(0..message.content.len() - 4 - PUBLIC_KEY_LENGTH)
539                            .context("too few bytes in iroh message")?
540                            .into(),
541                    });
542                }
543            },
544            Event::Lagged => {
545                warn!(context, "Gossip lost some messages");
546            }
547        };
548    }
549    Ok(())
550}
551
552#[cfg(test)]
553mod tests {
554    use super::*;
555    use crate::{
556        chat::send_msg,
557        message::{Message, Viewtype},
558        test_utils::TestContextManager,
559        EventType,
560    };
561
562    #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
563    async fn test_can_communicate() {
564        let mut tcm = TestContextManager::new();
565        let alice = &mut tcm.alice().await;
566        let bob = &mut tcm.bob().await;
567
568        // Alice sends webxdc to bob
569        let alice_chat = alice.create_chat(bob).await;
570        let mut instance = Message::new(Viewtype::File);
571        instance
572            .set_file_from_bytes(
573                alice,
574                "minimal.xdc",
575                include_bytes!("../test-data/webxdc/minimal.xdc"),
576                None,
577            )
578            .unwrap();
579
580        send_msg(alice, alice_chat.id, &mut instance).await.unwrap();
581        let alice_webxdc = alice.get_last_msg().await;
582        assert_eq!(alice_webxdc.get_viewtype(), Viewtype::Webxdc);
583
584        let webxdc = alice.pop_sent_msg().await;
585        let bob_webxdc = bob.recv_msg(&webxdc).await;
586        assert_eq!(bob_webxdc.get_viewtype(), Viewtype::Webxdc);
587
588        bob_webxdc.chat_id.accept(bob).await.unwrap();
589
590        // Alice advertises herself.
591        send_webxdc_realtime_advertisement(alice, alice_webxdc.id)
592            .await
593            .unwrap();
594
595        bob.recv_msg_trash(&alice.pop_sent_msg().await).await;
596        loop {
597            let event = bob.evtracker.recv().await.unwrap();
598            if let EventType::WebxdcRealtimeAdvertisementReceived { msg_id } = event.typ {
599                assert!(msg_id == alice_webxdc.id);
600                break;
601            }
602        }
603
604        // Bob adds alice to gossip peers.
605        let members = get_iroh_gossip_peers(bob, bob_webxdc.id)
606            .await
607            .unwrap()
608            .into_iter()
609            .map(|addr| addr.node_id)
610            .collect::<Vec<_>>();
611
612        assert_eq!(
613            members,
614            vec![
615                alice
616                    .get_or_try_init_peer_channel()
617                    .await
618                    .unwrap()
619                    .get_node_addr()
620                    .await
621                    .unwrap()
622                    .node_id
623            ]
624        );
625
626        bob.get_or_try_init_peer_channel()
627            .await
628            .unwrap()
629            .join_and_subscribe_gossip(bob, bob_webxdc.id)
630            .await
631            .unwrap()
632            .unwrap()
633            .await
634            .unwrap();
635
636        // Alice sends ephemeral message
637        alice
638            .get_or_try_init_peer_channel()
639            .await
640            .unwrap()
641            .send_webxdc_realtime_data(alice, alice_webxdc.id, "alice -> bob".as_bytes().to_vec())
642            .await
643            .unwrap();
644
645        loop {
646            let event = bob.evtracker.recv().await.unwrap();
647            if let EventType::WebxdcRealtimeData { data, .. } = event.typ {
648                if data == "alice -> bob".as_bytes() {
649                    break;
650                } else {
651                    panic!(
652                        "Unexpected status update: {}",
653                        String::from_utf8_lossy(&data)
654                    );
655                }
656            }
657        }
658        // Bob sends ephemeral message
659        bob.get_or_try_init_peer_channel()
660            .await
661            .unwrap()
662            .send_webxdc_realtime_data(bob, bob_webxdc.id, "bob -> alice".as_bytes().to_vec())
663            .await
664            .unwrap();
665
666        loop {
667            let event = alice.evtracker.recv().await.unwrap();
668            if let EventType::WebxdcRealtimeData { data, .. } = event.typ {
669                if data == "bob -> alice".as_bytes() {
670                    break;
671                } else {
672                    panic!(
673                        "Unexpected status update: {}",
674                        String::from_utf8_lossy(&data)
675                    );
676                }
677            }
678        }
679
680        // Alice adds bob to gossip peers.
681        let members = get_iroh_gossip_peers(alice, alice_webxdc.id)
682            .await
683            .unwrap()
684            .into_iter()
685            .map(|addr| addr.node_id)
686            .collect::<Vec<_>>();
687
688        assert_eq!(
689            members,
690            vec![
691                bob.get_or_try_init_peer_channel()
692                    .await
693                    .unwrap()
694                    .get_node_addr()
695                    .await
696                    .unwrap()
697                    .node_id
698            ]
699        );
700
701        bob.get_or_try_init_peer_channel()
702            .await
703            .unwrap()
704            .send_webxdc_realtime_data(bob, bob_webxdc.id, "bob -> alice 2".as_bytes().to_vec())
705            .await
706            .unwrap();
707
708        loop {
709            let event = alice.evtracker.recv().await.unwrap();
710            if let EventType::WebxdcRealtimeData { data, .. } = event.typ {
711                if data == "bob -> alice 2".as_bytes() {
712                    break;
713                } else {
714                    panic!(
715                        "Unexpected status update: {}",
716                        String::from_utf8_lossy(&data)
717                    );
718                }
719            }
720        }
721
722        // Calling stop_io() closes iroh endpoint as well,
723        // even though I/O was not started in this test.
724        assert!(alice.iroh.read().await.is_some());
725        alice.stop_io().await;
726        assert!(alice.iroh.read().await.is_none());
727    }
728
729    #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
730    async fn test_can_reconnect() {
731        let mut tcm = TestContextManager::new();
732        let alice = &mut tcm.alice().await;
733        let bob = &mut tcm.bob().await;
734
735        assert!(alice
736            .get_config_bool(Config::WebxdcRealtimeEnabled)
737            .await
738            .unwrap());
739        // Alice sends webxdc to bob
740        let alice_chat = alice.create_chat(bob).await;
741        let mut instance = Message::new(Viewtype::File);
742        instance
743            .set_file_from_bytes(
744                alice,
745                "minimal.xdc",
746                include_bytes!("../test-data/webxdc/minimal.xdc"),
747                None,
748            )
749            .unwrap();
750
751        send_msg(alice, alice_chat.id, &mut instance).await.unwrap();
752        let alice_webxdc = alice.get_last_msg().await;
753        assert_eq!(alice_webxdc.get_viewtype(), Viewtype::Webxdc);
754
755        let webxdc = alice.pop_sent_msg().await;
756        let bob_webxdc = bob.recv_msg(&webxdc).await;
757        assert_eq!(bob_webxdc.get_viewtype(), Viewtype::Webxdc);
758
759        bob_webxdc.chat_id.accept(bob).await.unwrap();
760
761        // Alice advertises herself.
762        send_webxdc_realtime_advertisement(alice, alice_webxdc.id)
763            .await
764            .unwrap();
765
766        bob.recv_msg_trash(&alice.pop_sent_msg().await).await;
767
768        // Bob adds alice to gossip peers.
769        let members = get_iroh_gossip_peers(bob, bob_webxdc.id)
770            .await
771            .unwrap()
772            .into_iter()
773            .map(|addr| addr.node_id)
774            .collect::<Vec<_>>();
775
776        assert_eq!(
777            members,
778            vec![
779                alice
780                    .get_or_try_init_peer_channel()
781                    .await
782                    .unwrap()
783                    .get_node_addr()
784                    .await
785                    .unwrap()
786                    .node_id
787            ]
788        );
789
790        bob.get_or_try_init_peer_channel()
791            .await
792            .unwrap()
793            .join_and_subscribe_gossip(bob, bob_webxdc.id)
794            .await
795            .unwrap()
796            .unwrap()
797            .await
798            .unwrap();
799
800        // Alice sends ephemeral message
801        alice
802            .get_or_try_init_peer_channel()
803            .await
804            .unwrap()
805            .send_webxdc_realtime_data(alice, alice_webxdc.id, "alice -> bob".as_bytes().to_vec())
806            .await
807            .unwrap();
808
809        loop {
810            let event = bob.evtracker.recv().await.unwrap();
811            if let EventType::WebxdcRealtimeData { data, .. } = event.typ {
812                if data == "alice -> bob".as_bytes() {
813                    break;
814                } else {
815                    panic!(
816                        "Unexpected status update: {}",
817                        String::from_utf8_lossy(&data)
818                    );
819                }
820            }
821        }
822
823        let bob_topic = get_iroh_topic_for_msg(bob, bob_webxdc.id)
824            .await
825            .unwrap()
826            .unwrap();
827        let bob_sequence_number = bob
828            .iroh
829            .read()
830            .await
831            .as_ref()
832            .unwrap()
833            .sequence_numbers
834            .lock()
835            .get(&bob_topic)
836            .copied();
837        leave_webxdc_realtime(bob, bob_webxdc.id).await.unwrap();
838        let bob_sequence_number_after = bob
839            .iroh
840            .read()
841            .await
842            .as_ref()
843            .unwrap()
844            .sequence_numbers
845            .lock()
846            .get(&bob_topic)
847            .copied();
848        // Check that sequence number is persisted when leaving the channel.
849        assert_eq!(bob_sequence_number, bob_sequence_number_after);
850
851        bob.get_or_try_init_peer_channel()
852            .await
853            .unwrap()
854            .join_and_subscribe_gossip(bob, bob_webxdc.id)
855            .await
856            .unwrap()
857            .unwrap()
858            .await
859            .unwrap();
860
861        bob.get_or_try_init_peer_channel()
862            .await
863            .unwrap()
864            .send_webxdc_realtime_data(bob, bob_webxdc.id, "bob -> alice".as_bytes().to_vec())
865            .await
866            .unwrap();
867
868        loop {
869            let event = alice.evtracker.recv().await.unwrap();
870            if let EventType::WebxdcRealtimeData { data, .. } = event.typ {
871                if data == "bob -> alice".as_bytes() {
872                    break;
873                } else {
874                    panic!(
875                        "Unexpected status update: {}",
876                        String::from_utf8_lossy(&data)
877                    );
878                }
879            }
880        }
881
882        // channel is only used to remember if an advertisement has been sent
883        // bob for example does not change the channels because he never sends an
884        // advertisement
885        assert_eq!(
886            alice
887                .iroh
888                .read()
889                .await
890                .as_ref()
891                .unwrap()
892                .iroh_channels
893                .read()
894                .await
895                .len(),
896            1
897        );
898        leave_webxdc_realtime(alice, alice_webxdc.id).await.unwrap();
899        let topic = get_iroh_topic_for_msg(alice, alice_webxdc.id)
900            .await
901            .unwrap()
902            .unwrap();
903        assert!(alice
904            .iroh
905            .read()
906            .await
907            .as_ref()
908            .unwrap()
909            .iroh_channels
910            .read()
911            .await
912            .get(&topic)
913            .is_none());
914    }
915
916    #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
917    async fn test_parallel_connect() {
918        let mut tcm = TestContextManager::new();
919        let alice = &mut tcm.alice().await;
920        let bob = &mut tcm.bob().await;
921
922        // Alice sends webxdc to bob
923        let alice_chat = alice.create_chat(bob).await;
924        let mut instance = Message::new(Viewtype::File);
925        instance
926            .set_file_from_bytes(
927                alice,
928                "minimal.xdc",
929                include_bytes!("../test-data/webxdc/minimal.xdc"),
930                None,
931            )
932            .unwrap();
933        send_msg(alice, alice_chat.id, &mut instance).await.unwrap();
934        let alice_webxdc = alice.get_last_msg().await;
935
936        let webxdc = alice.pop_sent_msg().await;
937        let bob_webxdc = bob.recv_msg(&webxdc).await;
938        assert_eq!(bob_webxdc.get_viewtype(), Viewtype::Webxdc);
939
940        bob_webxdc.chat_id.accept(bob).await.unwrap();
941
942        eprintln!("Sending advertisements");
943        // Alice advertises herself.
944        let alice_advertisement_future = send_webxdc_realtime_advertisement(alice, alice_webxdc.id)
945            .await
946            .unwrap()
947            .unwrap();
948        let alice_advertisement = alice.pop_sent_msg().await;
949
950        send_webxdc_realtime_advertisement(bob, bob_webxdc.id)
951            .await
952            .unwrap();
953        let bob_advertisement = bob.pop_sent_msg().await;
954
955        eprintln!("Receiving advertisements");
956        bob.recv_msg_trash(&alice_advertisement).await;
957        alice.recv_msg_trash(&bob_advertisement).await;
958
959        eprintln!("Alice waits for connection");
960        alice_advertisement_future.await.unwrap();
961
962        // Alice sends ephemeral message
963        eprintln!("Sending ephemeral message");
964        send_webxdc_realtime_data(alice, alice_webxdc.id, b"alice -> bob".into())
965            .await
966            .unwrap();
967
968        eprintln!("Waiting for ephemeral message");
969        loop {
970            let event = bob.evtracker.recv().await.unwrap();
971            if let EventType::WebxdcRealtimeData { data, .. } = event.typ {
972                if data == b"alice -> bob" {
973                    break;
974                } else {
975                    panic!(
976                        "Unexpected status update: {}",
977                        String::from_utf8_lossy(&data)
978                    );
979                }
980            }
981        }
982    }
983
984    #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
985    async fn test_peer_channels_disabled() {
986        let mut tcm = TestContextManager::new();
987        let alice = &mut tcm.alice().await;
988
989        alice
990            .set_config_bool(Config::WebxdcRealtimeEnabled, false)
991            .await
992            .unwrap();
993
994        // creates iroh endpoint as side effect
995        send_webxdc_realtime_advertisement(alice, MsgId::new(1))
996            .await
997            .unwrap();
998
999        assert!(alice.ctx.iroh.read().await.is_none());
1000
1001        // creates iroh endpoint as side effect
1002        send_webxdc_realtime_data(alice, MsgId::new(1), vec![])
1003            .await
1004            .unwrap();
1005
1006        assert!(alice.ctx.iroh.read().await.is_none());
1007
1008        // creates iroh endpoint as side effect
1009        leave_webxdc_realtime(alice, MsgId::new(1)).await.unwrap();
1010
1011        assert!(alice.ctx.iroh.read().await.is_none());
1012
1013        // This internal function should return error
1014        // if accidentally called with the setting disabled.
1015        assert!(alice.ctx.get_or_try_init_peer_channel().await.is_err());
1016    }
1017}