deltachat/
peer_channels.rs

1//! Peer channels for realtime communication in webxdcs.
2//!
3//! We use Iroh as an ephemeral peer channels provider to create direct communication
4//! channels between webxdcs. See [here](https://webxdc.org/docs/spec/joinRealtimeChannel.html) for the webxdc specs.
5//!
6//! Ephemeral channels should be established lazily, to avoid bootstrapping p2p connectivity
//! when it's not required. Only when a webxdc subscribes to realtime data or when a realtime message is sent,
8//! the p2p machinery should be started.
9//!
10//! Adding peer channels to webxdc needs upfront negotiation of a topic and sharing of public keys so that
11//! nodes can connect to each other. The explicit approach is as follows:
12//!
13//! 1. We introduce a new [`IrohGossipTopic`](crate::headerdef::HeaderDef::IrohGossipTopic) message header with a random 32-byte TopicId,
14//!    securely generated on the initial webxdc sender's device. This message header is encrypted
15//!    and sent in the same message as the webxdc application.
16//! 2. Whenever `joinRealtimeChannel().setListener()` or `joinRealtimeChannel().send()` is called by the webxdc application,
17//!    we start a routine to establish p2p connectivity and join the gossip swarm with Iroh.
18//! 3. The first step of this routine is to introduce yourself with a regular message containing the [`IrohNodeAddr`](crate::headerdef::HeaderDef::IrohNodeAddr).
//!    This message contains the user's relay server and public key.
20//!    Direct IP address is not included as this information can be persisted by email providers.
21//! 4. After the announcement, the sending peer joins the gossip swarm with an empty list of peer IDs (as they don't know anyone yet).
22//! 5. Upon receiving an announcement message, other peers store the sender's [NodeAddr] in the database
23//!    (scoped per WebXDC app instance/message-id). The other peers can then join the gossip with `joinRealtimeChannel().setListener()`
24//!    and `joinRealtimeChannel().send()` just like the other peers.
25
26use anyhow::{Context as _, Result, anyhow, bail};
27use data_encoding::BASE32_NOPAD;
28use futures_lite::StreamExt;
29use iroh::{Endpoint, NodeAddr, NodeId, PublicKey, RelayMode, RelayUrl, SecretKey};
30use iroh_gossip::net::{Event, GOSSIP_ALPN, Gossip, GossipEvent, JoinOptions};
31use iroh_gossip::proto::TopicId;
32use parking_lot::Mutex;
33use std::collections::{BTreeSet, HashMap};
34use std::env;
35use tokio::sync::{RwLock, oneshot};
36use tokio::task::JoinHandle;
37use url::Url;
38
39use crate::EventType;
40use crate::chat::send_msg;
41use crate::config::Config;
42use crate::context::Context;
43use crate::log::warn;
44use crate::message::{Message, MsgId, Viewtype};
45use crate::mimeparser::SystemMessage;
46
/// The length of an ed25519 `PublicKey`, in bytes.
const PUBLIC_KEY_LENGTH: usize = 32;
/// Placeholder written to the `public_key` column by [`insert_topic_stub`],
/// so that a topic can be stored for a message before any real peer is known.
///
/// Rows carrying this value are excluded by the query in [`get_iroh_gossip_peers`].
const PUBLIC_KEY_STUB: &[u8] = "static_string".as_bytes();
50
/// Store Iroh peer channels for the context.
#[derive(Debug)]
pub struct Iroh {
    /// Iroh router needed for Iroh peer channels.
    pub(crate) router: iroh::protocol::Router,

    /// [Gossip] needed for Iroh peer channels.
    pub(crate) gossip: Gossip,

    /// Sequence numbers for gossip channels.
    ///
    /// The counter of a topic is incremented for every realtime message sent to it.
    pub(crate) sequence_numbers: Mutex<HashMap<TopicId, i32>>,

    /// State of the gossip channels that are currently joined, keyed by topic.
    ///
    /// An entry is inserted when a topic is joined and removed when it is left.
    pub(crate) iroh_channels: RwLock<HashMap<TopicId, ChannelState>>,

    /// Currently used Iroh public key.
    ///
    /// This is attached to every message to work around `iroh_gossip` deduplication.
    pub(crate) public_key: PublicKey,
}
71
72impl Iroh {
73    /// Notify the endpoint that the network has changed.
74    pub(crate) async fn network_change(&self) {
75        self.router.endpoint().network_change().await
76    }
77
78    /// Closes the QUIC endpoint.
79    pub(crate) async fn close(self) -> Result<()> {
80        self.router.shutdown().await.context("Closing iroh failed")
81    }
82
83    /// Join a topic and create the subscriber loop for it.
84    ///
85    /// If there is no gossip, create it.
86    ///
87    /// The returned future resolves when the swarm becomes operational.
88    async fn join_and_subscribe_gossip(
89        &self,
90        ctx: &Context,
91        msg_id: MsgId,
92    ) -> Result<Option<oneshot::Receiver<()>>> {
93        let topic = get_iroh_topic_for_msg(ctx, msg_id)
94            .await?
95            .with_context(|| format!("Message {msg_id} has no gossip topic"))?;
96
97        // Take exclusive lock to make sure
98        // no other thread can create a second gossip subscription
99        // after we check that it does not exist and before we create a new one.
100        // Otherwise we would receive every message twice or more times.
101        let mut iroh_channels = self.iroh_channels.write().await;
102
103        if iroh_channels.contains_key(&topic) {
104            return Ok(None);
105        }
106
107        let peers = get_iroh_gossip_peers(ctx, msg_id).await?;
108        let node_ids = peers.iter().map(|p| p.node_id).collect::<Vec<_>>();
109
110        info!(
111            ctx,
112            "IROH_REALTIME: Joining gossip {topic} with peers: {:?}.", node_ids,
113        );
114
115        // Inform iroh of potentially new node addresses
116        for node_addr in &peers {
117            if !node_addr.is_empty() {
118                self.router.endpoint().add_node_addr(node_addr.clone())?;
119            }
120        }
121
122        let (join_tx, join_rx) = oneshot::channel();
123
124        let (gossip_sender, gossip_receiver) = self
125            .gossip
126            .subscribe_with_opts(topic, JoinOptions::with_bootstrap(node_ids))
127            .split();
128
129        let ctx = ctx.clone();
130        let subscribe_loop = tokio::spawn(async move {
131            if let Err(e) = subscribe_loop(&ctx, gossip_receiver, topic, msg_id, join_tx).await {
132                warn!(ctx, "subscribe_loop failed: {e}")
133            }
134        });
135
136        iroh_channels.insert(topic, ChannelState::new(subscribe_loop, gossip_sender));
137
138        Ok(Some(join_rx))
139    }
140
141    /// Add gossip peer to realtime channel if it is already active.
142    pub async fn maybe_add_gossip_peer(&self, topic: TopicId, peer: NodeAddr) -> Result<()> {
143        if self.iroh_channels.read().await.get(&topic).is_some() {
144            self.router.endpoint().add_node_addr(peer.clone())?;
145            self.gossip.subscribe(topic, vec![peer.node_id])?;
146        }
147        Ok(())
148    }
149
150    /// Send realtime data to the gossip swarm.
151    pub async fn send_webxdc_realtime_data(
152        &self,
153        ctx: &Context,
154        msg_id: MsgId,
155        mut data: Vec<u8>,
156    ) -> Result<()> {
157        let topic = get_iroh_topic_for_msg(ctx, msg_id)
158            .await?
159            .with_context(|| format!("Message {msg_id} has no gossip topic"))?;
160        self.join_and_subscribe_gossip(ctx, msg_id).await?;
161
162        let seq_num = self.get_and_incr(&topic);
163
164        let mut iroh_channels = self.iroh_channels.write().await;
165        let state = iroh_channels
166            .get_mut(&topic)
167            .context("Just created state does not exist")?;
168        data.extend(seq_num.to_le_bytes());
169        data.extend(self.public_key.as_bytes());
170
171        state.sender.broadcast(data.into()).await?;
172
173        if env::var("REALTIME_DEBUG").is_ok() {
174            info!(ctx, "Sent realtime data");
175        }
176
177        Ok(())
178    }
179
180    fn get_and_incr(&self, topic: &TopicId) -> i32 {
181        let mut sequence_numbers = self.sequence_numbers.lock();
182        let entry = sequence_numbers.entry(*topic).or_default();
183        *entry = entry.wrapping_add(1);
184        *entry
185    }
186
187    /// Get the iroh [NodeAddr] without direct IP addresses.
188    ///
189    /// The address is guaranteed to have home relay URL set
190    /// as it is the only way to reach the node
191    /// without global discovery mechanisms.
192    pub(crate) async fn get_node_addr(&self) -> Result<NodeAddr> {
193        let mut addr = self.router.endpoint().node_addr().await?;
194        addr.direct_addresses = BTreeSet::new();
195        debug_assert!(addr.relay_url().is_some());
196        Ok(addr)
197    }
198
199    /// Leave the realtime channel for a given topic.
200    pub(crate) async fn leave_realtime(&self, topic: TopicId) -> Result<()> {
201        if let Some(channel) = self.iroh_channels.write().await.remove(&topic) {
202            // Dropping the last GossipTopic results in quitting the topic.
203            // It is split into GossipReceiver and GossipSender.
204            // GossipSender (`channel.sender`) is dropped automatically.
205
206            // Subscribe loop owns GossipReceiver.
207            // Aborting it and waiting for it to be dropped
208            // drops the receiver.
209            channel.subscribe_loop.abort();
210            let _ = channel.subscribe_loop.await;
211        }
212        Ok(())
213    }
214}
215
/// Single gossip channel state.
#[derive(Debug)]
pub(crate) struct ChannelState {
    /// The subscribe loop handle.
    subscribe_loop: JoinHandle<()>,

    /// Sending half of the gossip topic, used to broadcast realtime data.
    sender: iroh_gossip::net::GossipSender,
}
224
225impl ChannelState {
226    fn new(subscribe_loop: JoinHandle<()>, sender: iroh_gossip::net::GossipSender) -> Self {
227        Self {
228            subscribe_loop,
229            sender,
230        }
231    }
232}
233
234impl Context {
235    /// Create iroh endpoint and gossip.
236    async fn init_peer_channels(&self) -> Result<Iroh> {
237        info!(self, "Initializing peer channels.");
238        let secret_key = SecretKey::generate(rand_old::rngs::OsRng);
239        let public_key = secret_key.public();
240
241        let relay_mode = if let Some(relay_url) = self
242            .metadata
243            .read()
244            .await
245            .as_ref()
246            .and_then(|conf| conf.iroh_relay.clone())
247        {
248            RelayMode::Custom(RelayUrl::from(relay_url).into())
249        } else {
250            // FIXME: this should be RelayMode::Disabled instead.
251            // Currently using default relays because otherwise Rust tests fail.
252            RelayMode::Default
253        };
254
255        let endpoint = Endpoint::builder()
256            .tls_x509() // For compatibility with iroh <0.34.0
257            .secret_key(secret_key)
258            .alpns(vec![GOSSIP_ALPN.to_vec()])
259            .relay_mode(relay_mode)
260            .bind()
261            .await?;
262
263        // create gossip
264        // Allow messages up to 128 KB in size.
265        // We set the limit to 128 KiB to account for internal overhead,
266        // but only guarantee 128 KB of payload to WebXDC developers.
267
268        let gossip = Gossip::builder()
269            .max_message_size(128 * 1024)
270            .spawn(endpoint.clone())
271            .await?;
272
273        let router = iroh::protocol::Router::builder(endpoint)
274            .accept(GOSSIP_ALPN, gossip.clone())
275            .spawn();
276
277        Ok(Iroh {
278            router,
279            gossip,
280            sequence_numbers: Mutex::new(HashMap::new()),
281            iroh_channels: RwLock::new(HashMap::new()),
282            public_key,
283        })
284    }
285
286    /// Returns [`None`] if the peer channels has not been initialized.
287    pub async fn get_peer_channels(&self) -> Option<tokio::sync::RwLockReadGuard<'_, Iroh>> {
288        tokio::sync::RwLockReadGuard::<'_, std::option::Option<Iroh>>::try_map(
289            self.iroh.read().await,
290            |opt_iroh| opt_iroh.as_ref(),
291        )
292        .ok()
293    }
294
295    /// Get or initialize the iroh peer channel.
296    pub async fn get_or_try_init_peer_channel(
297        &self,
298    ) -> Result<tokio::sync::RwLockReadGuard<'_, Iroh>> {
299        if !self.get_config_bool(Config::WebxdcRealtimeEnabled).await? {
300            bail!("Attempt to initialize Iroh when realtime is disabled");
301        }
302
303        if let Some(lock) = self.get_peer_channels().await {
304            return Ok(lock);
305        }
306
307        let lock = self.iroh.write().await;
308        match tokio::sync::RwLockWriteGuard::<'_, std::option::Option<Iroh>>::try_downgrade_map(
309            lock,
310            |opt_iroh| opt_iroh.as_ref(),
311        ) {
312            Ok(lock) => Ok(lock),
313            Err(mut lock) => {
314                let iroh = self.init_peer_channels().await?;
315                *lock = Some(iroh);
316                tokio::sync::RwLockWriteGuard::<'_, std::option::Option<Iroh>>::try_downgrade_map(
317                    lock,
318                    |opt_iroh| opt_iroh.as_ref(),
319                )
320                .map_err(|_| anyhow!("Downgrade should succeed as we just stored `Some` value"))
321            }
322        }
323    }
324
325    pub(crate) async fn maybe_add_gossip_peer(&self, topic: TopicId, peer: NodeAddr) -> Result<()> {
326        if let Some(iroh) = &*self.iroh.read().await {
327            info!(
328                self,
329                "Adding (maybe existing) peer with id {} to {topic}.", peer.node_id
330            );
331            iroh.maybe_add_gossip_peer(topic, peer).await?;
332        }
333        Ok(())
334    }
335}
336
337/// Cache a peers [NodeId] for one topic.
338pub(crate) async fn iroh_add_peer_for_topic(
339    ctx: &Context,
340    msg_id: MsgId,
341    topic: TopicId,
342    peer: NodeId,
343    relay_server: Option<&str>,
344) -> Result<()> {
345    ctx.sql
346        .execute(
347            "INSERT OR REPLACE INTO iroh_gossip_peers (msg_id, public_key, topic, relay_server) VALUES (?, ?, ?, ?)",
348            (msg_id, peer.as_bytes(), topic.as_bytes(), relay_server),
349        )
350        .await?;
351    Ok(())
352}
353
354/// Add gossip peer from `Iroh-Node-Addr` header to WebXDC message identified by `instance_id`.
355pub async fn add_gossip_peer_from_header(
356    context: &Context,
357    instance_id: MsgId,
358    node_addr: &str,
359) -> Result<()> {
360    if !context
361        .get_config_bool(Config::WebxdcRealtimeEnabled)
362        .await?
363    {
364        return Ok(());
365    }
366
367    let node_addr =
368        serde_json::from_str::<NodeAddr>(node_addr).context("Failed to parse node address")?;
369
370    info!(
371        context,
372        "Adding iroh peer with node id {} to the topic of {instance_id}.", node_addr.node_id
373    );
374
375    context.emit_event(EventType::WebxdcRealtimeAdvertisementReceived {
376        msg_id: instance_id,
377    });
378
379    let Some(topic) = get_iroh_topic_for_msg(context, instance_id).await? else {
380        warn!(
381            context,
382            "Could not add iroh peer because {instance_id} has no topic."
383        );
384        return Ok(());
385    };
386
387    let node_id = node_addr.node_id;
388    let relay_server = node_addr.relay_url().map(|relay| relay.as_str());
389    iroh_add_peer_for_topic(context, instance_id, topic, node_id, relay_server).await?;
390
391    context.maybe_add_gossip_peer(topic, node_addr).await?;
392    Ok(())
393}
394
395/// Insert topicId into the database so that we can use it to retrieve the topic.
396pub(crate) async fn insert_topic_stub(ctx: &Context, msg_id: MsgId, topic: TopicId) -> Result<()> {
397    ctx.sql
398        .execute(
399            "INSERT OR REPLACE INTO iroh_gossip_peers (msg_id, public_key, topic, relay_server) VALUES (?, ?, ?, ?)",
400            (msg_id, PUBLIC_KEY_STUB, topic.as_bytes(), Option::<&str>::None),
401        )
402        .await?;
403    Ok(())
404}
405
406/// Get a list of [NodeAddr]s for one webxdc.
407async fn get_iroh_gossip_peers(ctx: &Context, msg_id: MsgId) -> Result<Vec<NodeAddr>> {
408    ctx.sql
409        .query_map(
410            "SELECT public_key, relay_server FROM iroh_gossip_peers WHERE msg_id = ? AND public_key != ?",
411            (msg_id, PUBLIC_KEY_STUB),
412            |row| {
413                let key:  Vec<u8> = row.get(0)?;
414                let server: Option<String> = row.get(1)?;
415                Ok((key, server))
416            },
417            |g| {
418                g.map(|data| {
419                    let (key, server) = data?;
420                    let server = server.map(|data| Ok::<_, url::ParseError>(RelayUrl::from(Url::parse(&data)?))).transpose()?;
421                    let id = NodeId::from_bytes(&key.try_into()
422                    .map_err(|_| anyhow!("Can't convert sql data to [u8; 32]"))?)?;
423                    Ok::<_, anyhow::Error>(NodeAddr::from_parts(
424                        id, server, vec![]
425                    ))
426                })
427                .collect::<std::result::Result<Vec<_>, _>>()
428            },
429        )
430        .await
431}
432
433/// Get the topic for a given [MsgId].
434pub(crate) async fn get_iroh_topic_for_msg(
435    ctx: &Context,
436    msg_id: MsgId,
437) -> Result<Option<TopicId>> {
438    if let Some(bytes) = ctx
439        .sql
440        .query_get_value::<Vec<u8>>(
441            "SELECT topic FROM iroh_gossip_peers WHERE msg_id = ? LIMIT 1",
442            (msg_id,),
443        )
444        .await
445        .context("Couldn't restore topic from db")?
446    {
447        let topic_id = TopicId::from_bytes(
448            bytes
449                .try_into()
450                .map_err(|_| anyhow!("Could not convert stored topic ID"))?,
451        );
452        Ok(Some(topic_id))
453    } else {
454        Ok(None)
455    }
456}
457
458/// Send a gossip advertisement to the chat that [MsgId] belongs to.
459/// This method should be called from the frontend when `joinRealtimeChannel` is called.
460pub async fn send_webxdc_realtime_advertisement(
461    ctx: &Context,
462    msg_id: MsgId,
463) -> Result<Option<oneshot::Receiver<()>>> {
464    if !ctx.get_config_bool(Config::WebxdcRealtimeEnabled).await? {
465        return Ok(None);
466    }
467
468    let iroh = ctx.get_or_try_init_peer_channel().await?;
469    let conn = iroh.join_and_subscribe_gossip(ctx, msg_id).await?;
470
471    let webxdc = Message::load_from_db(ctx, msg_id).await?;
472    let mut msg = Message::new(Viewtype::Text);
473    msg.hidden = true;
474    msg.param.set_cmd(SystemMessage::IrohNodeAddr);
475    msg.in_reply_to = Some(webxdc.rfc724_mid.clone());
476    send_msg(ctx, webxdc.chat_id, &mut msg).await?;
477    info!(ctx, "IROH_REALTIME: Sent realtime advertisement");
478    Ok(conn)
479}
480
481/// Send realtime data to other peers using iroh.
482pub async fn send_webxdc_realtime_data(ctx: &Context, msg_id: MsgId, data: Vec<u8>) -> Result<()> {
483    if !ctx.get_config_bool(Config::WebxdcRealtimeEnabled).await? {
484        return Ok(());
485    }
486
487    let iroh = ctx.get_or_try_init_peer_channel().await?;
488    iroh.send_webxdc_realtime_data(ctx, msg_id, data).await?;
489    Ok(())
490}
491
492/// Leave the gossip of the webxdc with given [MsgId].
493///
494/// NB: When this is called before closing a webxdc app in UIs, it must be guaranteed that
495/// `send_webxdc_realtime_*()` functions aren't called for the given `msg_id` anymore until the app
496/// is open again.
497pub async fn leave_webxdc_realtime(ctx: &Context, msg_id: MsgId) -> Result<()> {
498    let Some(iroh) = ctx.get_peer_channels().await else {
499        return Ok(());
500    };
501    let Some(topic) = get_iroh_topic_for_msg(ctx, msg_id).await? else {
502        return Ok(());
503    };
504    iroh.leave_realtime(topic).await?;
505    info!(ctx, "IROH_REALTIME: Left gossip for message {msg_id}");
506
507    Ok(())
508}
509
510/// Creates a new random gossip topic.
511fn create_random_topic() -> TopicId {
512    TopicId::from_bytes(rand::random())
513}
514
515/// Creates `Iroh-Gossip-Header` with a new random topic
516/// and stores the topic for the message.
517pub(crate) async fn create_iroh_header(ctx: &Context, msg_id: MsgId) -> Result<String> {
518    let topic = create_random_topic();
519    insert_topic_stub(ctx, msg_id, topic).await?;
520    let topic_string = BASE32_NOPAD.encode(topic.as_bytes()).to_ascii_lowercase();
521    Ok(topic_string)
522}
523
524/// Converts `Iroh-Gossip-Header` contents to iroh topic ID.
525pub(crate) fn iroh_topic_from_str(topic: &str) -> Result<TopicId> {
526    let mut topic_raw = [0u8; 32];
527    BASE32_NOPAD
528        .decode_mut(topic.to_ascii_uppercase().as_bytes(), &mut topic_raw)
529        .map_err(|e| e.error)
530        .context("Wrong gossip topic header")?;
531
532    let topic = TopicId::from_bytes(topic_raw);
533    Ok(topic)
534}
535
536#[expect(clippy::arithmetic_side_effects)]
537async fn subscribe_loop(
538    context: &Context,
539    mut stream: iroh_gossip::net::GossipReceiver,
540    topic: TopicId,
541    msg_id: MsgId,
542    join_tx: oneshot::Sender<()>,
543) -> Result<()> {
544    let mut join_tx = Some(join_tx);
545
546    while let Some(event) = stream.try_next().await? {
547        match event {
548            Event::Gossip(event) => match event {
549                GossipEvent::Joined(nodes) => {
550                    if let Some(join_tx) = join_tx.take() {
551                        // Try to notify that at least one peer joined,
552                        // but ignore the error if receiver is dropped and nobody listens.
553                        join_tx.send(()).ok();
554                    }
555
556                    for node in nodes {
557                        iroh_add_peer_for_topic(context, msg_id, topic, node, None).await?;
558                    }
559                }
560                GossipEvent::NeighborUp(node) => {
561                    info!(context, "IROH_REALTIME: NeighborUp: {}", node.to_string());
562                    iroh_add_peer_for_topic(context, msg_id, topic, node, None).await?;
563                }
564                GossipEvent::NeighborDown(_node) => {}
565                GossipEvent::Received(message) => {
566                    info!(context, "IROH_REALTIME: Received realtime data");
567                    context.emit_event(EventType::WebxdcRealtimeData {
568                        msg_id,
569                        data: message
570                            .content
571                            .get(0..message.content.len() - 4 - PUBLIC_KEY_LENGTH)
572                            .context("too few bytes in iroh message")?
573                            .into(),
574                    });
575                }
576            },
577            Event::Lagged => {
578                warn!(context, "Gossip lost some messages");
579            }
580        };
581    }
582    Ok(())
583}
584
585#[cfg(test)]
586mod tests {
587    use super::*;
588    use crate::{
589        EventType,
590        chat::{self, ChatId, add_contact_to_chat, resend_msgs, send_msg},
591        message::{Message, Viewtype},
592        test_utils::{TestContext, TestContextManager},
593    };
594
    /// End-to-end check that two contexts can exchange realtime data in both directions.
    ///
    /// Alice sends a webxdc to Bob and advertises herself; Bob joins the gossip;
    /// then payloads are sent alice -> bob and bob -> alice and matched against the
    /// `WebxdcRealtimeData` events. Finally `stop_io()` is checked to close iroh.
    #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
    async fn test_can_communicate() {
        let mut tcm = TestContextManager::new();
        let alice = &mut tcm.alice().await;
        let bob = &mut tcm.bob().await;

        // Alice sends webxdc to bob
        let alice_chat = alice.create_chat(bob).await;
        let mut instance = Message::new(Viewtype::File);
        instance
            .set_file_from_bytes(
                alice,
                "minimal.xdc",
                include_bytes!("../test-data/webxdc/minimal.xdc"),
                None,
            )
            .unwrap();

        send_msg(alice, alice_chat.id, &mut instance).await.unwrap();
        let alice_webxdc = alice.get_last_msg().await;
        assert_eq!(alice_webxdc.get_viewtype(), Viewtype::Webxdc);

        let webxdc = alice.pop_sent_msg().await;
        let bob_webxdc = bob.recv_msg(&webxdc).await;
        assert_eq!(bob_webxdc.get_viewtype(), Viewtype::Webxdc);

        bob_webxdc.chat_id.accept(bob).await.unwrap();

        // Alice advertises herself.
        send_webxdc_realtime_advertisement(alice, alice_webxdc.id)
            .await
            .unwrap();

        bob.recv_msg_trash(&alice.pop_sent_msg().await).await;
        // Skip unrelated events until the advertisement event for Bob's instance arrives.
        loop {
            let event = bob.evtracker.recv().await.unwrap();
            if let EventType::WebxdcRealtimeAdvertisementReceived { msg_id } = event.typ {
                assert!(msg_id == bob_webxdc.id);
                break;
            }
        }

        // Bob adds alice to gossip peers.
        let members = get_iroh_gossip_peers(bob, bob_webxdc.id)
            .await
            .unwrap()
            .into_iter()
            .map(|addr| addr.node_id)
            .collect::<Vec<_>>();

        assert_eq!(
            members,
            vec![
                alice
                    .get_or_try_init_peer_channel()
                    .await
                    .unwrap()
                    .get_node_addr()
                    .await
                    .unwrap()
                    .node_id
            ]
        );

        // Bob joins the gossip and waits for the join notification.
        bob.get_or_try_init_peer_channel()
            .await
            .unwrap()
            .join_and_subscribe_gossip(bob, bob_webxdc.id)
            .await
            .unwrap()
            .unwrap()
            .await
            .unwrap();

        // Alice sends ephemeral message
        alice
            .get_or_try_init_peer_channel()
            .await
            .unwrap()
            .send_webxdc_realtime_data(alice, alice_webxdc.id, "alice -> bob".as_bytes().to_vec())
            .await
            .unwrap();

        // The first realtime data event on Bob's side must carry exactly Alice's payload.
        loop {
            let event = bob.evtracker.recv().await.unwrap();
            if let EventType::WebxdcRealtimeData { data, .. } = event.typ {
                if data == "alice -> bob".as_bytes() {
                    break;
                } else {
                    panic!(
                        "Unexpected status update: {}",
                        String::from_utf8_lossy(&data)
                    );
                }
            }
        }
        // Bob sends ephemeral message
        bob.get_or_try_init_peer_channel()
            .await
            .unwrap()
            .send_webxdc_realtime_data(bob, bob_webxdc.id, "bob -> alice".as_bytes().to_vec())
            .await
            .unwrap();

        loop {
            let event = alice.evtracker.recv().await.unwrap();
            if let EventType::WebxdcRealtimeData { data, .. } = event.typ {
                if data == "bob -> alice".as_bytes() {
                    break;
                } else {
                    panic!(
                        "Unexpected status update: {}",
                        String::from_utf8_lossy(&data)
                    );
                }
            }
        }

        // Alice adds bob to gossip peers.
        let members = get_iroh_gossip_peers(alice, alice_webxdc.id)
            .await
            .unwrap()
            .into_iter()
            .map(|addr| addr.node_id)
            .collect::<Vec<_>>();

        assert_eq!(
            members,
            vec![
                bob.get_or_try_init_peer_channel()
                    .await
                    .unwrap()
                    .get_node_addr()
                    .await
                    .unwrap()
                    .node_id
            ]
        );

        // A second message from Bob must arrive as well.
        bob.get_or_try_init_peer_channel()
            .await
            .unwrap()
            .send_webxdc_realtime_data(bob, bob_webxdc.id, "bob -> alice 2".as_bytes().to_vec())
            .await
            .unwrap();

        loop {
            let event = alice.evtracker.recv().await.unwrap();
            if let EventType::WebxdcRealtimeData { data, .. } = event.typ {
                if data == "bob -> alice 2".as_bytes() {
                    break;
                } else {
                    panic!(
                        "Unexpected status update: {}",
                        String::from_utf8_lossy(&data)
                    );
                }
            }
        }

        // Calling stop_io() closes iroh endpoint as well,
        // even though I/O was not started in this test.
        assert!(alice.iroh.read().await.is_some());
        alice.stop_io().await;
        assert!(alice.iroh.read().await.is_none());
    }
761
762    #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
763    async fn test_can_reconnect() {
764        let mut tcm = TestContextManager::new();
765        let alice = &mut tcm.alice().await;
766        let bob = &mut tcm.bob().await;
767
768        assert!(
769            alice
770                .get_config_bool(Config::WebxdcRealtimeEnabled)
771                .await
772                .unwrap()
773        );
774        // Alice sends webxdc to bob
775        let alice_chat = alice.create_chat(bob).await;
776        let mut instance = Message::new(Viewtype::File);
777        instance
778            .set_file_from_bytes(
779                alice,
780                "minimal.xdc",
781                include_bytes!("../test-data/webxdc/minimal.xdc"),
782                None,
783            )
784            .unwrap();
785
786        send_msg(alice, alice_chat.id, &mut instance).await.unwrap();
787        let alice_webxdc = alice.get_last_msg().await;
788        assert_eq!(alice_webxdc.get_viewtype(), Viewtype::Webxdc);
789
790        let webxdc = alice.pop_sent_msg().await;
791        let bob_webxdc = bob.recv_msg(&webxdc).await;
792        assert_eq!(bob_webxdc.get_viewtype(), Viewtype::Webxdc);
793
794        bob_webxdc.chat_id.accept(bob).await.unwrap();
795
796        // Alice advertises herself.
797        send_webxdc_realtime_advertisement(alice, alice_webxdc.id)
798            .await
799            .unwrap();
800
801        bob.recv_msg_trash(&alice.pop_sent_msg().await).await;
802
803        // Bob adds alice to gossip peers.
804        let members = get_iroh_gossip_peers(bob, bob_webxdc.id)
805            .await
806            .unwrap()
807            .into_iter()
808            .map(|addr| addr.node_id)
809            .collect::<Vec<_>>();
810
811        assert_eq!(
812            members,
813            vec![
814                alice
815                    .get_or_try_init_peer_channel()
816                    .await
817                    .unwrap()
818                    .get_node_addr()
819                    .await
820                    .unwrap()
821                    .node_id
822            ]
823        );
824
825        bob.get_or_try_init_peer_channel()
826            .await
827            .unwrap()
828            .join_and_subscribe_gossip(bob, bob_webxdc.id)
829            .await
830            .unwrap()
831            .unwrap()
832            .await
833            .unwrap();
834
835        // Alice sends ephemeral message
836        alice
837            .get_or_try_init_peer_channel()
838            .await
839            .unwrap()
840            .send_webxdc_realtime_data(alice, alice_webxdc.id, "alice -> bob".as_bytes().to_vec())
841            .await
842            .unwrap();
843
844        loop {
845            let event = bob.evtracker.recv().await.unwrap();
846            if let EventType::WebxdcRealtimeData { data, .. } = event.typ {
847                if data == "alice -> bob".as_bytes() {
848                    break;
849                } else {
850                    panic!(
851                        "Unexpected status update: {}",
852                        String::from_utf8_lossy(&data)
853                    );
854                }
855            }
856        }
857
858        let bob_topic = get_iroh_topic_for_msg(bob, bob_webxdc.id)
859            .await
860            .unwrap()
861            .unwrap();
862        let bob_sequence_number = bob
863            .iroh
864            .read()
865            .await
866            .as_ref()
867            .unwrap()
868            .sequence_numbers
869            .lock()
870            .get(&bob_topic)
871            .copied();
872        leave_webxdc_realtime(bob, bob_webxdc.id).await.unwrap();
873        let bob_sequence_number_after = bob
874            .iroh
875            .read()
876            .await
877            .as_ref()
878            .unwrap()
879            .sequence_numbers
880            .lock()
881            .get(&bob_topic)
882            .copied();
883        // Check that sequence number is persisted when leaving the channel.
884        assert_eq!(bob_sequence_number, bob_sequence_number_after);
885
886        bob.get_or_try_init_peer_channel()
887            .await
888            .unwrap()
889            .join_and_subscribe_gossip(bob, bob_webxdc.id)
890            .await
891            .unwrap()
892            .unwrap()
893            .await
894            .unwrap();
895
896        bob.get_or_try_init_peer_channel()
897            .await
898            .unwrap()
899            .send_webxdc_realtime_data(bob, bob_webxdc.id, "bob -> alice".as_bytes().to_vec())
900            .await
901            .unwrap();
902
903        loop {
904            let event = alice.evtracker.recv().await.unwrap();
905            if let EventType::WebxdcRealtimeData { data, .. } = event.typ {
906                if data == "bob -> alice".as_bytes() {
907                    break;
908                } else {
909                    panic!(
910                        "Unexpected status update: {}",
911                        String::from_utf8_lossy(&data)
912                    );
913                }
914            }
915        }
916
917        // channel is only used to remember if an advertisement has been sent
918        // bob for example does not change the channels because he never sends an
919        // advertisement
920        assert_eq!(
921            alice
922                .iroh
923                .read()
924                .await
925                .as_ref()
926                .unwrap()
927                .iroh_channels
928                .read()
929                .await
930                .len(),
931            1
932        );
933        leave_webxdc_realtime(alice, alice_webxdc.id).await.unwrap();
934        let topic = get_iroh_topic_for_msg(alice, alice_webxdc.id)
935            .await
936            .unwrap()
937            .unwrap();
938        assert!(
939            alice
940                .iroh
941                .read()
942                .await
943                .as_ref()
944                .unwrap()
945                .iroh_channels
946                .read()
947                .await
948                .get(&topic)
949                .is_none()
950        );
951    }
952
953    #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
954    async fn test_parallel_connect() {
955        let mut tcm = TestContextManager::new();
956        let alice = &mut tcm.alice().await;
957        let bob = &mut tcm.bob().await;
958
959        let chat = alice.create_chat(bob).await.id;
960
961        let mut instance = Message::new(Viewtype::File);
962        instance
963            .set_file_from_bytes(
964                alice,
965                "minimal.xdc",
966                include_bytes!("../test-data/webxdc/minimal.xdc"),
967                None,
968            )
969            .unwrap();
970        connect_alice_bob(alice, chat, &mut instance, bob).await
971    }
972
973    #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
974    async fn test_webxdc_resend() {
975        let mut tcm = TestContextManager::new();
976        let alice = &mut tcm.alice().await;
977        let bob = &mut tcm.bob().await;
978        let group = chat::create_group(alice, "group chat").await.unwrap();
979
980        // Alice sends webxdc to bob
981        let mut instance = Message::new(Viewtype::File);
982        instance
983            .set_file_from_bytes(
984                alice,
985                "minimal.xdc",
986                include_bytes!("../test-data/webxdc/minimal.xdc"),
987                None,
988            )
989            .unwrap();
990
991        add_contact_to_chat(alice, group, alice.add_or_lookup_contact_id(bob).await)
992            .await
993            .unwrap();
994
995        connect_alice_bob(alice, group, &mut instance, bob).await;
996
997        // fiona joins late
998        let fiona = &mut tcm.fiona().await;
999
1000        add_contact_to_chat(alice, group, alice.add_or_lookup_contact_id(fiona).await)
1001            .await
1002            .unwrap();
1003
1004        resend_msgs(alice, &[instance.id]).await.unwrap();
1005        let msg = alice.pop_sent_msg().await;
1006        let fiona_instance = fiona.recv_msg(&msg).await;
1007        fiona_instance.chat_id.accept(fiona).await.unwrap();
1008        assert!(fiona.ctx.iroh.read().await.is_none());
1009
1010        let fiona_connect_future = send_webxdc_realtime_advertisement(fiona, fiona_instance.id)
1011            .await
1012            .unwrap()
1013            .unwrap();
1014        let fiona_advert = fiona.pop_sent_msg().await;
1015        alice.recv_msg_trash(&fiona_advert).await;
1016
1017        fiona_connect_future.await.unwrap();
1018
1019        let realtime_send_loop = async {
1020            // Keep sending in a loop because right after joining
1021            // Fiona may miss messages.
1022            loop {
1023                send_webxdc_realtime_data(alice, instance.id, b"alice -> bob & fiona".into())
1024                    .await
1025                    .unwrap();
1026                tokio::time::sleep(std::time::Duration::from_secs(1)).await;
1027            }
1028        };
1029
1030        let realtime_receive_loop = async {
1031            loop {
1032                let event = fiona.evtracker.recv().await.unwrap();
1033                if let EventType::WebxdcRealtimeData { data, .. } = event.typ {
1034                    if data == b"alice -> bob & fiona" {
1035                        break;
1036                    } else {
1037                        panic!(
1038                            "Unexpected status update: {}",
1039                            String::from_utf8_lossy(&data)
1040                        );
1041                    }
1042                }
1043            }
1044        };
1045        tokio::select!(
1046            _ = realtime_send_loop => {
1047                panic!("Send loop should never finish");
1048            },
1049            _ = realtime_receive_loop => {
1050                return;
1051            }
1052        );
1053    }
1054
1055    async fn connect_alice_bob(
1056        alice: &mut TestContext,
1057        alice_chat_id: ChatId,
1058        instance: &mut Message,
1059        bob: &mut TestContext,
1060    ) {
1061        send_msg(alice, alice_chat_id, instance).await.unwrap();
1062        let alice_webxdc = alice.get_last_msg().await;
1063
1064        let webxdc = alice.pop_sent_msg().await;
1065        let bob_webxdc = bob.recv_msg(&webxdc).await;
1066        assert_eq!(bob_webxdc.get_viewtype(), Viewtype::Webxdc);
1067
1068        bob_webxdc.chat_id.accept(bob).await.unwrap();
1069
1070        eprintln!("Sending advertisements");
1071        // Alice advertises herself.
1072        let alice_advertisement_future = send_webxdc_realtime_advertisement(alice, alice_webxdc.id)
1073            .await
1074            .unwrap()
1075            .unwrap();
1076        let alice_advertisement = alice.pop_sent_msg().await;
1077
1078        let bob_advertisement_future = send_webxdc_realtime_advertisement(bob, bob_webxdc.id)
1079            .await
1080            .unwrap()
1081            .unwrap();
1082        let bob_advertisement = bob.pop_sent_msg().await;
1083
1084        eprintln!("Receiving advertisements");
1085        bob.recv_msg_trash(&alice_advertisement).await;
1086        alice.recv_msg_trash(&bob_advertisement).await;
1087
1088        eprintln!("Alice and Bob wait for connection");
1089        alice_advertisement_future.await.unwrap();
1090        bob_advertisement_future.await.unwrap();
1091
1092        // Alice sends ephemeral message
1093        eprintln!("Sending ephemeral message");
1094        send_webxdc_realtime_data(alice, alice_webxdc.id, b"alice -> bob".into())
1095            .await
1096            .unwrap();
1097
1098        eprintln!("Waiting for ephemeral message");
1099        loop {
1100            let event = bob.evtracker.recv().await.unwrap();
1101            if let EventType::WebxdcRealtimeData { data, .. } = event.typ {
1102                if data == b"alice -> bob" {
1103                    break;
1104                } else {
1105                    panic!(
1106                        "Unexpected status update: {}",
1107                        String::from_utf8_lossy(&data)
1108                    );
1109                }
1110            }
1111        }
1112    }
1113
1114    #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
1115    async fn test_peer_channels_disabled() {
1116        let mut tcm = TestContextManager::new();
1117        let alice = &mut tcm.alice().await;
1118
1119        alice
1120            .set_config_bool(Config::WebxdcRealtimeEnabled, false)
1121            .await
1122            .unwrap();
1123
1124        // creates iroh endpoint as side effect
1125        send_webxdc_realtime_advertisement(alice, MsgId::new(1))
1126            .await
1127            .unwrap();
1128
1129        assert!(alice.ctx.iroh.read().await.is_none());
1130
1131        // creates iroh endpoint as side effect
1132        send_webxdc_realtime_data(alice, MsgId::new(1), vec![])
1133            .await
1134            .unwrap();
1135
1136        assert!(alice.ctx.iroh.read().await.is_none());
1137
1138        leave_webxdc_realtime(alice, MsgId::new(1)).await.unwrap();
1139
1140        assert!(alice.ctx.iroh.read().await.is_none());
1141
1142        // This internal function should return error
1143        // if accidentally called with the setting disabled.
1144        assert!(alice.ctx.get_or_try_init_peer_channel().await.is_err());
1145    }
1146
1147    #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
1148    async fn test_leave_webxdc_realtime_uninitialized() {
1149        let mut tcm = TestContextManager::new();
1150        let alice = &mut tcm.alice().await;
1151
1152        alice
1153            .set_config_bool(Config::WebxdcRealtimeEnabled, true)
1154            .await
1155            .unwrap();
1156
1157        assert!(alice.ctx.iroh.read().await.is_none());
1158        leave_webxdc_realtime(alice, MsgId::new(1)).await.unwrap();
1159        assert!(alice.ctx.iroh.read().await.is_none());
1160    }
1161}