deltachat/
peer_channels.rs

1//! Peer channels for realtime communication in webxdcs.
2//!
3//! We use Iroh as an ephemeral peer channels provider to create direct communication
4//! channels between webxdcs. See [here](https://webxdc.org/docs/spec/joinRealtimeChannel.html) for the webxdc specs.
5//!
6//! Ephemeral channels should be established lazily, to avoid bootstrapping p2p connectivity
7//! when it's not required. Only when a webxdc subscribes to realtime data or when a reatlime message is sent,
8//! the p2p machinery should be started.
9//!
10//! Adding peer channels to webxdc needs upfront negotiation of a topic and sharing of public keys so that
11//! nodes can connect to each other. The explicit approach is as follows:
12//!
13//! 1. We introduce a new [`IrohGossipTopic`](crate::headerdef::HeaderDef::IrohGossipTopic) message header with a random 32-byte TopicId,
14//!    securely generated on the initial webxdc sender's device. This message header is encrypted
15//!    and sent in the same message as the webxdc application.
16//! 2. Whenever `joinRealtimeChannel().setListener()` or `joinRealtimeChannel().send()` is called by the webxdc application,
17//!    we start a routine to establish p2p connectivity and join the gossip swarm with Iroh.
18//! 3. The first step of this routine is to introduce yourself with a regular message containing the [`IrohNodeAddr`](crate::headerdef::HeaderDef::IrohNodeAddr).
19//!    This message contains the users relay-server and public key.
20//!    Direct IP address is not included as this information can be persisted by email providers.
21//! 4. After the announcement, the sending peer joins the gossip swarm with an empty list of peer IDs (as they don't know anyone yet).
22//! 5. Upon receiving an announcement message, other peers store the sender's [NodeAddr] in the database
23//!    (scoped per WebXDC app instance/message-id). The other peers can then join the gossip with `joinRealtimeChannel().setListener()`
24//!    and `joinRealtimeChannel().send()` just like the other peers.
25
26use anyhow::{Context as _, Result, anyhow, bail};
27use data_encoding::BASE32_NOPAD;
28use futures_lite::StreamExt;
29use iroh::{Endpoint, NodeAddr, NodeId, PublicKey, RelayMode, RelayUrl, SecretKey};
30use iroh_gossip::net::{Event, GOSSIP_ALPN, Gossip, GossipEvent, JoinOptions};
31use iroh_gossip::proto::TopicId;
32use parking_lot::Mutex;
33use std::collections::{BTreeSet, HashMap};
34use std::env;
35use tokio::sync::{RwLock, oneshot};
36use tokio::task::JoinHandle;
37use url::Url;
38
39use crate::EventType;
40use crate::chat::send_msg;
41use crate::config::Config;
42use crate::context::Context;
43use crate::log::{info, warn};
44use crate::message::{Message, MsgId, Viewtype};
45use crate::mimeparser::SystemMessage;
46
47/// The length of an ed25519 `PublicKey`, in bytes.
48const PUBLIC_KEY_LENGTH: usize = 32;
49const PUBLIC_KEY_STUB: &[u8] = "static_string".as_bytes();
50
51/// Store Iroh peer channels for the context.
52#[derive(Debug)]
53pub struct Iroh {
54    /// Iroh router  needed for Iroh peer channels.
55    pub(crate) router: iroh::protocol::Router,
56
57    /// [Gossip] needed for Iroh peer channels.
58    pub(crate) gossip: Gossip,
59
60    /// Sequence numbers for gossip channels.
61    pub(crate) sequence_numbers: Mutex<HashMap<TopicId, i32>>,
62
63    /// Topics for which an advertisement has already been sent.
64    pub(crate) iroh_channels: RwLock<HashMap<TopicId, ChannelState>>,
65
66    /// Currently used Iroh public key.
67    ///
68    /// This is attached to every message to work around `iroh_gossip` deduplication.
69    pub(crate) public_key: PublicKey,
70}
71
72impl Iroh {
73    /// Notify the endpoint that the network has changed.
74    pub(crate) async fn network_change(&self) {
75        self.router.endpoint().network_change().await
76    }
77
78    /// Closes the QUIC endpoint.
79    pub(crate) async fn close(self) -> Result<()> {
80        self.router.shutdown().await.context("Closing iroh failed")
81    }
82
83    /// Join a topic and create the subscriber loop for it.
84    ///
85    /// If there is no gossip, create it.
86    ///
87    /// The returned future resolves when the swarm becomes operational.
88    async fn join_and_subscribe_gossip(
89        &self,
90        ctx: &Context,
91        msg_id: MsgId,
92    ) -> Result<Option<oneshot::Receiver<()>>> {
93        let topic = get_iroh_topic_for_msg(ctx, msg_id)
94            .await?
95            .with_context(|| format!("Message {msg_id} has no gossip topic"))?;
96
97        // Take exclusive lock to make sure
98        // no other thread can create a second gossip subscription
99        // after we check that it does not exist and before we create a new one.
100        // Otherwise we would receive every message twice or more times.
101        let mut iroh_channels = self.iroh_channels.write().await;
102
103        if iroh_channels.contains_key(&topic) {
104            return Ok(None);
105        }
106
107        let peers = get_iroh_gossip_peers(ctx, msg_id).await?;
108        let node_ids = peers.iter().map(|p| p.node_id).collect::<Vec<_>>();
109
110        info!(
111            ctx,
112            "IROH_REALTIME: Joining gossip {topic} with peers: {:?}.", node_ids,
113        );
114
115        // Inform iroh of potentially new node addresses
116        for node_addr in &peers {
117            if !node_addr.is_empty() {
118                self.router.endpoint().add_node_addr(node_addr.clone())?;
119            }
120        }
121
122        let (join_tx, join_rx) = oneshot::channel();
123
124        let (gossip_sender, gossip_receiver) = self
125            .gossip
126            .subscribe_with_opts(topic, JoinOptions::with_bootstrap(node_ids))
127            .split();
128
129        let ctx = ctx.clone();
130        let subscribe_loop = tokio::spawn(async move {
131            if let Err(e) = subscribe_loop(&ctx, gossip_receiver, topic, msg_id, join_tx).await {
132                warn!(ctx, "subscribe_loop failed: {e}")
133            }
134        });
135
136        iroh_channels.insert(topic, ChannelState::new(subscribe_loop, gossip_sender));
137
138        Ok(Some(join_rx))
139    }
140
141    /// Add gossip peer to realtime channel if it is already active.
142    pub async fn maybe_add_gossip_peer(&self, topic: TopicId, peer: NodeAddr) -> Result<()> {
143        if self.iroh_channels.read().await.get(&topic).is_some() {
144            self.router.endpoint().add_node_addr(peer.clone())?;
145            self.gossip.subscribe(topic, vec![peer.node_id])?;
146        }
147        Ok(())
148    }
149
150    /// Send realtime data to the gossip swarm.
151    pub async fn send_webxdc_realtime_data(
152        &self,
153        ctx: &Context,
154        msg_id: MsgId,
155        mut data: Vec<u8>,
156    ) -> Result<()> {
157        let topic = get_iroh_topic_for_msg(ctx, msg_id)
158            .await?
159            .with_context(|| format!("Message {msg_id} has no gossip topic"))?;
160        self.join_and_subscribe_gossip(ctx, msg_id).await?;
161
162        let seq_num = self.get_and_incr(&topic);
163
164        let mut iroh_channels = self.iroh_channels.write().await;
165        let state = iroh_channels
166            .get_mut(&topic)
167            .context("Just created state does not exist")?;
168        data.extend(seq_num.to_le_bytes());
169        data.extend(self.public_key.as_bytes());
170
171        state.sender.broadcast(data.into()).await?;
172
173        if env::var("REALTIME_DEBUG").is_ok() {
174            info!(ctx, "Sent realtime data");
175        }
176
177        Ok(())
178    }
179
180    fn get_and_incr(&self, topic: &TopicId) -> i32 {
181        let mut sequence_numbers = self.sequence_numbers.lock();
182        let entry = sequence_numbers.entry(*topic).or_default();
183        *entry = entry.wrapping_add(1);
184        *entry
185    }
186
187    /// Get the iroh [NodeAddr] without direct IP addresses.
188    ///
189    /// The address is guaranteed to have home relay URL set
190    /// as it is the only way to reach the node
191    /// without global discovery mechanisms.
192    pub(crate) async fn get_node_addr(&self) -> Result<NodeAddr> {
193        let mut addr = self.router.endpoint().node_addr().await?;
194        addr.direct_addresses = BTreeSet::new();
195        debug_assert!(addr.relay_url().is_some());
196        Ok(addr)
197    }
198
199    /// Leave the realtime channel for a given topic.
200    pub(crate) async fn leave_realtime(&self, topic: TopicId) -> Result<()> {
201        if let Some(channel) = self.iroh_channels.write().await.remove(&topic) {
202            // Dropping the last GossipTopic results in quitting the topic.
203            // It is split into GossipReceiver and GossipSender.
204            // GossipSender (`channel.sender`) is dropped automatically.
205
206            // Subscribe loop owns GossipReceiver.
207            // Aborting it and waiting for it to be dropped
208            // drops the receiver.
209            channel.subscribe_loop.abort();
210            let _ = channel.subscribe_loop.await;
211        }
212        Ok(())
213    }
214}
215
216/// Single gossip channel state.
217#[derive(Debug)]
218pub(crate) struct ChannelState {
219    /// The subscribe loop handle.
220    subscribe_loop: JoinHandle<()>,
221
222    sender: iroh_gossip::net::GossipSender,
223}
224
225impl ChannelState {
226    fn new(subscribe_loop: JoinHandle<()>, sender: iroh_gossip::net::GossipSender) -> Self {
227        Self {
228            subscribe_loop,
229            sender,
230        }
231    }
232}
233
234impl Context {
235    /// Create iroh endpoint and gossip.
236    async fn init_peer_channels(&self) -> Result<Iroh> {
237        info!(self, "Initializing peer channels.");
238        let secret_key = SecretKey::generate(rand::rngs::OsRng);
239        let public_key = secret_key.public();
240
241        let relay_mode = if let Some(relay_url) = self
242            .metadata
243            .read()
244            .await
245            .as_ref()
246            .and_then(|conf| conf.iroh_relay.clone())
247        {
248            RelayMode::Custom(RelayUrl::from(relay_url).into())
249        } else {
250            // FIXME: this should be RelayMode::Disabled instead.
251            // Currently using default relays because otherwise Rust tests fail.
252            RelayMode::Default
253        };
254
255        let endpoint = Endpoint::builder()
256            .tls_x509() // For compatibility with iroh <0.34.0
257            .secret_key(secret_key)
258            .alpns(vec![GOSSIP_ALPN.to_vec()])
259            .relay_mode(relay_mode)
260            .bind()
261            .await?;
262
263        // create gossip
264        // Allow messages up to 128 KB in size.
265        // We set the limit to 128 KiB to account for internal overhead,
266        // but only guarantee 128 KB of payload to WebXDC developers.
267
268        let gossip = Gossip::builder()
269            .max_message_size(128 * 1024)
270            .spawn(endpoint.clone())
271            .await?;
272
273        let router = iroh::protocol::Router::builder(endpoint)
274            .accept(GOSSIP_ALPN, gossip.clone())
275            .spawn();
276
277        Ok(Iroh {
278            router,
279            gossip,
280            sequence_numbers: Mutex::new(HashMap::new()),
281            iroh_channels: RwLock::new(HashMap::new()),
282            public_key,
283        })
284    }
285
286    /// Returns [`None`] if the peer channels has not been initialized.
287    pub async fn get_peer_channels(&self) -> Option<tokio::sync::RwLockReadGuard<'_, Iroh>> {
288        tokio::sync::RwLockReadGuard::<'_, std::option::Option<Iroh>>::try_map(
289            self.iroh.read().await,
290            |opt_iroh| opt_iroh.as_ref(),
291        )
292        .ok()
293    }
294
295    /// Get or initialize the iroh peer channel.
296    pub async fn get_or_try_init_peer_channel(
297        &self,
298    ) -> Result<tokio::sync::RwLockReadGuard<'_, Iroh>> {
299        if !self.get_config_bool(Config::WebxdcRealtimeEnabled).await? {
300            bail!("Attempt to initialize Iroh when realtime is disabled");
301        }
302
303        if let Some(lock) = self.get_peer_channels().await {
304            return Ok(lock);
305        }
306
307        let lock = self.iroh.write().await;
308        match tokio::sync::RwLockWriteGuard::<'_, std::option::Option<Iroh>>::try_downgrade_map(
309            lock,
310            |opt_iroh| opt_iroh.as_ref(),
311        ) {
312            Ok(lock) => Ok(lock),
313            Err(mut lock) => {
314                let iroh = self.init_peer_channels().await?;
315                *lock = Some(iroh);
316                tokio::sync::RwLockWriteGuard::<'_, std::option::Option<Iroh>>::try_downgrade_map(
317                    lock,
318                    |opt_iroh| opt_iroh.as_ref(),
319                )
320                .map_err(|_| anyhow!("Downgrade should succeed as we just stored `Some` value"))
321            }
322        }
323    }
324
325    pub(crate) async fn maybe_add_gossip_peer(&self, topic: TopicId, peer: NodeAddr) -> Result<()> {
326        if let Some(iroh) = &*self.iroh.read().await {
327            info!(
328                self,
329                "Adding (maybe existing) peer with id {} to {topic}.", peer.node_id
330            );
331            iroh.maybe_add_gossip_peer(topic, peer).await?;
332        }
333        Ok(())
334    }
335}
336
337/// Cache a peers [NodeId] for one topic.
338pub(crate) async fn iroh_add_peer_for_topic(
339    ctx: &Context,
340    msg_id: MsgId,
341    topic: TopicId,
342    peer: NodeId,
343    relay_server: Option<&str>,
344) -> Result<()> {
345    ctx.sql
346        .execute(
347            "INSERT OR REPLACE INTO iroh_gossip_peers (msg_id, public_key, topic, relay_server) VALUES (?, ?, ?, ?)",
348            (msg_id, peer.as_bytes(), topic.as_bytes(), relay_server),
349        )
350        .await?;
351    Ok(())
352}
353
354/// Add gossip peer from `Iroh-Node-Addr` header to WebXDC message identified by `instance_id`.
355pub async fn add_gossip_peer_from_header(
356    context: &Context,
357    instance_id: MsgId,
358    node_addr: &str,
359) -> Result<()> {
360    if !context
361        .get_config_bool(Config::WebxdcRealtimeEnabled)
362        .await?
363    {
364        return Ok(());
365    }
366
367    let node_addr =
368        serde_json::from_str::<NodeAddr>(node_addr).context("Failed to parse node address")?;
369
370    info!(
371        context,
372        "Adding iroh peer with node id {} to the topic of {instance_id}.", node_addr.node_id
373    );
374
375    context.emit_event(EventType::WebxdcRealtimeAdvertisementReceived {
376        msg_id: instance_id,
377    });
378
379    let Some(topic) = get_iroh_topic_for_msg(context, instance_id).await? else {
380        warn!(
381            context,
382            "Could not add iroh peer because {instance_id} has no topic."
383        );
384        return Ok(());
385    };
386
387    let node_id = node_addr.node_id;
388    let relay_server = node_addr.relay_url().map(|relay| relay.as_str());
389    iroh_add_peer_for_topic(context, instance_id, topic, node_id, relay_server).await?;
390
391    context.maybe_add_gossip_peer(topic, node_addr).await?;
392    Ok(())
393}
394
395/// Insert topicId into the database so that we can use it to retrieve the topic.
396pub(crate) async fn insert_topic_stub(ctx: &Context, msg_id: MsgId, topic: TopicId) -> Result<()> {
397    ctx.sql
398        .execute(
399            "INSERT OR REPLACE INTO iroh_gossip_peers (msg_id, public_key, topic, relay_server) VALUES (?, ?, ?, ?)",
400            (msg_id, PUBLIC_KEY_STUB, topic.as_bytes(), Option::<&str>::None),
401        )
402        .await?;
403    Ok(())
404}
405
406/// Get a list of [NodeAddr]s for one webxdc.
407async fn get_iroh_gossip_peers(ctx: &Context, msg_id: MsgId) -> Result<Vec<NodeAddr>> {
408    ctx.sql
409        .query_map(
410            "SELECT public_key, relay_server FROM iroh_gossip_peers WHERE msg_id = ? AND public_key != ?",
411            (msg_id, PUBLIC_KEY_STUB),
412            |row| {
413                let key:  Vec<u8> = row.get(0)?;
414                let server: Option<String> = row.get(1)?;
415                Ok((key, server))
416            },
417            |g| {
418                g.map(|data| {
419                    let (key, server) = data?;
420                    let server = server.map(|data| Ok::<_, url::ParseError>(RelayUrl::from(Url::parse(&data)?))).transpose()?;
421                    let id = NodeId::from_bytes(&key.try_into()
422                    .map_err(|_| anyhow!("Can't convert sql data to [u8; 32]"))?)?;
423                    Ok::<_, anyhow::Error>(NodeAddr::from_parts(
424                        id, server, vec![]
425                    ))
426                })
427                .collect::<std::result::Result<Vec<_>, _>>()
428            },
429        )
430        .await
431}
432
433/// Get the topic for a given [MsgId].
434pub(crate) async fn get_iroh_topic_for_msg(
435    ctx: &Context,
436    msg_id: MsgId,
437) -> Result<Option<TopicId>> {
438    if let Some(bytes) = ctx
439        .sql
440        .query_get_value::<Vec<u8>>(
441            "SELECT topic FROM iroh_gossip_peers WHERE msg_id = ? LIMIT 1",
442            (msg_id,),
443        )
444        .await
445        .context("Couldn't restore topic from db")?
446    {
447        let topic_id = TopicId::from_bytes(
448            bytes
449                .try_into()
450                .map_err(|_| anyhow!("Could not convert stored topic ID"))?,
451        );
452        Ok(Some(topic_id))
453    } else {
454        Ok(None)
455    }
456}
457
458/// Send a gossip advertisement to the chat that [MsgId] belongs to.
459/// This method should be called from the frontend when `joinRealtimeChannel` is called.
460pub async fn send_webxdc_realtime_advertisement(
461    ctx: &Context,
462    msg_id: MsgId,
463) -> Result<Option<oneshot::Receiver<()>>> {
464    if !ctx.get_config_bool(Config::WebxdcRealtimeEnabled).await? {
465        return Ok(None);
466    }
467
468    let iroh = ctx.get_or_try_init_peer_channel().await?;
469    let conn = iroh.join_and_subscribe_gossip(ctx, msg_id).await?;
470
471    let webxdc = Message::load_from_db(ctx, msg_id).await?;
472    let mut msg = Message::new(Viewtype::Text);
473    msg.hidden = true;
474    msg.param.set_cmd(SystemMessage::IrohNodeAddr);
475    msg.in_reply_to = Some(webxdc.rfc724_mid.clone());
476    send_msg(ctx, webxdc.chat_id, &mut msg).await?;
477    info!(ctx, "IROH_REALTIME: Sent realtime advertisement");
478    Ok(conn)
479}
480
481/// Send realtime data to other peers using iroh.
482pub async fn send_webxdc_realtime_data(ctx: &Context, msg_id: MsgId, data: Vec<u8>) -> Result<()> {
483    if !ctx.get_config_bool(Config::WebxdcRealtimeEnabled).await? {
484        return Ok(());
485    }
486
487    let iroh = ctx.get_or_try_init_peer_channel().await?;
488    iroh.send_webxdc_realtime_data(ctx, msg_id, data).await?;
489    Ok(())
490}
491
492/// Leave the gossip of the webxdc with given [MsgId].
493///
494/// NB: When this is called before closing a webxdc app in UIs, it must be guaranteed that
495/// `send_webxdc_realtime_*()` functions aren't called for the given `msg_id` anymore until the app
496/// is open again.
497pub async fn leave_webxdc_realtime(ctx: &Context, msg_id: MsgId) -> Result<()> {
498    let Some(iroh) = ctx.get_peer_channels().await else {
499        return Ok(());
500    };
501    let Some(topic) = get_iroh_topic_for_msg(ctx, msg_id).await? else {
502        return Ok(());
503    };
504    iroh.leave_realtime(topic).await?;
505    info!(ctx, "IROH_REALTIME: Left gossip for message {msg_id}");
506
507    Ok(())
508}
509
510/// Creates a new random gossip topic.
511fn create_random_topic() -> TopicId {
512    TopicId::from_bytes(rand::random())
513}
514
515/// Creates `Iroh-Gossip-Header` with a new random topic
516/// and stores the topic for the message.
517pub(crate) async fn create_iroh_header(ctx: &Context, msg_id: MsgId) -> Result<String> {
518    let topic = create_random_topic();
519    insert_topic_stub(ctx, msg_id, topic).await?;
520    let topic_string = BASE32_NOPAD.encode(topic.as_bytes()).to_ascii_lowercase();
521    Ok(topic_string)
522}
523
524async fn subscribe_loop(
525    context: &Context,
526    mut stream: iroh_gossip::net::GossipReceiver,
527    topic: TopicId,
528    msg_id: MsgId,
529    join_tx: oneshot::Sender<()>,
530) -> Result<()> {
531    let mut join_tx = Some(join_tx);
532
533    while let Some(event) = stream.try_next().await? {
534        match event {
535            Event::Gossip(event) => match event {
536                GossipEvent::Joined(nodes) => {
537                    if let Some(join_tx) = join_tx.take() {
538                        // Try to notify that at least one peer joined,
539                        // but ignore the error if receiver is dropped and nobody listens.
540                        join_tx.send(()).ok();
541                    }
542
543                    for node in nodes {
544                        iroh_add_peer_for_topic(context, msg_id, topic, node, None).await?;
545                    }
546                }
547                GossipEvent::NeighborUp(node) => {
548                    info!(context, "IROH_REALTIME: NeighborUp: {}", node.to_string());
549                    iroh_add_peer_for_topic(context, msg_id, topic, node, None).await?;
550                }
551                GossipEvent::NeighborDown(_node) => {}
552                GossipEvent::Received(message) => {
553                    info!(context, "IROH_REALTIME: Received realtime data");
554                    context.emit_event(EventType::WebxdcRealtimeData {
555                        msg_id,
556                        data: message
557                            .content
558                            .get(0..message.content.len() - 4 - PUBLIC_KEY_LENGTH)
559                            .context("too few bytes in iroh message")?
560                            .into(),
561                    });
562                }
563            },
564            Event::Lagged => {
565                warn!(context, "Gossip lost some messages");
566            }
567        };
568    }
569    Ok(())
570}
571
572#[cfg(test)]
573mod tests {
574    use super::*;
575    use crate::{
576        EventType,
577        chat::{self, ChatId, ProtectionStatus, add_contact_to_chat, resend_msgs, send_msg},
578        message::{Message, Viewtype},
579        test_utils::{TestContext, TestContextManager},
580    };
581
582    #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
583    async fn test_can_communicate() {
584        let mut tcm = TestContextManager::new();
585        let alice = &mut tcm.alice().await;
586        let bob = &mut tcm.bob().await;
587
588        // Alice sends webxdc to bob
589        let alice_chat = alice.create_chat(bob).await;
590        let mut instance = Message::new(Viewtype::File);
591        instance
592            .set_file_from_bytes(
593                alice,
594                "minimal.xdc",
595                include_bytes!("../test-data/webxdc/minimal.xdc"),
596                None,
597            )
598            .unwrap();
599
600        send_msg(alice, alice_chat.id, &mut instance).await.unwrap();
601        let alice_webxdc = alice.get_last_msg().await;
602        assert_eq!(alice_webxdc.get_viewtype(), Viewtype::Webxdc);
603
604        let webxdc = alice.pop_sent_msg().await;
605        let bob_webxdc = bob.recv_msg(&webxdc).await;
606        assert_eq!(bob_webxdc.get_viewtype(), Viewtype::Webxdc);
607
608        bob_webxdc.chat_id.accept(bob).await.unwrap();
609
610        // Alice advertises herself.
611        send_webxdc_realtime_advertisement(alice, alice_webxdc.id)
612            .await
613            .unwrap();
614
615        bob.recv_msg_trash(&alice.pop_sent_msg().await).await;
616        loop {
617            let event = bob.evtracker.recv().await.unwrap();
618            if let EventType::WebxdcRealtimeAdvertisementReceived { msg_id } = event.typ {
619                assert!(msg_id == alice_webxdc.id);
620                break;
621            }
622        }
623
624        // Bob adds alice to gossip peers.
625        let members = get_iroh_gossip_peers(bob, bob_webxdc.id)
626            .await
627            .unwrap()
628            .into_iter()
629            .map(|addr| addr.node_id)
630            .collect::<Vec<_>>();
631
632        assert_eq!(
633            members,
634            vec![
635                alice
636                    .get_or_try_init_peer_channel()
637                    .await
638                    .unwrap()
639                    .get_node_addr()
640                    .await
641                    .unwrap()
642                    .node_id
643            ]
644        );
645
646        bob.get_or_try_init_peer_channel()
647            .await
648            .unwrap()
649            .join_and_subscribe_gossip(bob, bob_webxdc.id)
650            .await
651            .unwrap()
652            .unwrap()
653            .await
654            .unwrap();
655
656        // Alice sends ephemeral message
657        alice
658            .get_or_try_init_peer_channel()
659            .await
660            .unwrap()
661            .send_webxdc_realtime_data(alice, alice_webxdc.id, "alice -> bob".as_bytes().to_vec())
662            .await
663            .unwrap();
664
665        loop {
666            let event = bob.evtracker.recv().await.unwrap();
667            if let EventType::WebxdcRealtimeData { data, .. } = event.typ {
668                if data == "alice -> bob".as_bytes() {
669                    break;
670                } else {
671                    panic!(
672                        "Unexpected status update: {}",
673                        String::from_utf8_lossy(&data)
674                    );
675                }
676            }
677        }
678        // Bob sends ephemeral message
679        bob.get_or_try_init_peer_channel()
680            .await
681            .unwrap()
682            .send_webxdc_realtime_data(bob, bob_webxdc.id, "bob -> alice".as_bytes().to_vec())
683            .await
684            .unwrap();
685
686        loop {
687            let event = alice.evtracker.recv().await.unwrap();
688            if let EventType::WebxdcRealtimeData { data, .. } = event.typ {
689                if data == "bob -> alice".as_bytes() {
690                    break;
691                } else {
692                    panic!(
693                        "Unexpected status update: {}",
694                        String::from_utf8_lossy(&data)
695                    );
696                }
697            }
698        }
699
700        // Alice adds bob to gossip peers.
701        let members = get_iroh_gossip_peers(alice, alice_webxdc.id)
702            .await
703            .unwrap()
704            .into_iter()
705            .map(|addr| addr.node_id)
706            .collect::<Vec<_>>();
707
708        assert_eq!(
709            members,
710            vec![
711                bob.get_or_try_init_peer_channel()
712                    .await
713                    .unwrap()
714                    .get_node_addr()
715                    .await
716                    .unwrap()
717                    .node_id
718            ]
719        );
720
721        bob.get_or_try_init_peer_channel()
722            .await
723            .unwrap()
724            .send_webxdc_realtime_data(bob, bob_webxdc.id, "bob -> alice 2".as_bytes().to_vec())
725            .await
726            .unwrap();
727
728        loop {
729            let event = alice.evtracker.recv().await.unwrap();
730            if let EventType::WebxdcRealtimeData { data, .. } = event.typ {
731                if data == "bob -> alice 2".as_bytes() {
732                    break;
733                } else {
734                    panic!(
735                        "Unexpected status update: {}",
736                        String::from_utf8_lossy(&data)
737                    );
738                }
739            }
740        }
741
742        // Calling stop_io() closes iroh endpoint as well,
743        // even though I/O was not started in this test.
744        assert!(alice.iroh.read().await.is_some());
745        alice.stop_io().await;
746        assert!(alice.iroh.read().await.is_none());
747    }
748
749    #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
750    async fn test_can_reconnect() {
751        let mut tcm = TestContextManager::new();
752        let alice = &mut tcm.alice().await;
753        let bob = &mut tcm.bob().await;
754
755        assert!(
756            alice
757                .get_config_bool(Config::WebxdcRealtimeEnabled)
758                .await
759                .unwrap()
760        );
761        // Alice sends webxdc to bob
762        let alice_chat = alice.create_chat(bob).await;
763        let mut instance = Message::new(Viewtype::File);
764        instance
765            .set_file_from_bytes(
766                alice,
767                "minimal.xdc",
768                include_bytes!("../test-data/webxdc/minimal.xdc"),
769                None,
770            )
771            .unwrap();
772
773        send_msg(alice, alice_chat.id, &mut instance).await.unwrap();
774        let alice_webxdc = alice.get_last_msg().await;
775        assert_eq!(alice_webxdc.get_viewtype(), Viewtype::Webxdc);
776
777        let webxdc = alice.pop_sent_msg().await;
778        let bob_webxdc = bob.recv_msg(&webxdc).await;
779        assert_eq!(bob_webxdc.get_viewtype(), Viewtype::Webxdc);
780
781        bob_webxdc.chat_id.accept(bob).await.unwrap();
782
783        // Alice advertises herself.
784        send_webxdc_realtime_advertisement(alice, alice_webxdc.id)
785            .await
786            .unwrap();
787
788        bob.recv_msg_trash(&alice.pop_sent_msg().await).await;
789
790        // Bob adds alice to gossip peers.
791        let members = get_iroh_gossip_peers(bob, bob_webxdc.id)
792            .await
793            .unwrap()
794            .into_iter()
795            .map(|addr| addr.node_id)
796            .collect::<Vec<_>>();
797
798        assert_eq!(
799            members,
800            vec![
801                alice
802                    .get_or_try_init_peer_channel()
803                    .await
804                    .unwrap()
805                    .get_node_addr()
806                    .await
807                    .unwrap()
808                    .node_id
809            ]
810        );
811
812        bob.get_or_try_init_peer_channel()
813            .await
814            .unwrap()
815            .join_and_subscribe_gossip(bob, bob_webxdc.id)
816            .await
817            .unwrap()
818            .unwrap()
819            .await
820            .unwrap();
821
822        // Alice sends ephemeral message
823        alice
824            .get_or_try_init_peer_channel()
825            .await
826            .unwrap()
827            .send_webxdc_realtime_data(alice, alice_webxdc.id, "alice -> bob".as_bytes().to_vec())
828            .await
829            .unwrap();
830
831        loop {
832            let event = bob.evtracker.recv().await.unwrap();
833            if let EventType::WebxdcRealtimeData { data, .. } = event.typ {
834                if data == "alice -> bob".as_bytes() {
835                    break;
836                } else {
837                    panic!(
838                        "Unexpected status update: {}",
839                        String::from_utf8_lossy(&data)
840                    );
841                }
842            }
843        }
844
845        let bob_topic = get_iroh_topic_for_msg(bob, bob_webxdc.id)
846            .await
847            .unwrap()
848            .unwrap();
849        let bob_sequence_number = bob
850            .iroh
851            .read()
852            .await
853            .as_ref()
854            .unwrap()
855            .sequence_numbers
856            .lock()
857            .get(&bob_topic)
858            .copied();
859        leave_webxdc_realtime(bob, bob_webxdc.id).await.unwrap();
860        let bob_sequence_number_after = bob
861            .iroh
862            .read()
863            .await
864            .as_ref()
865            .unwrap()
866            .sequence_numbers
867            .lock()
868            .get(&bob_topic)
869            .copied();
870        // Check that sequence number is persisted when leaving the channel.
871        assert_eq!(bob_sequence_number, bob_sequence_number_after);
872
873        bob.get_or_try_init_peer_channel()
874            .await
875            .unwrap()
876            .join_and_subscribe_gossip(bob, bob_webxdc.id)
877            .await
878            .unwrap()
879            .unwrap()
880            .await
881            .unwrap();
882
883        bob.get_or_try_init_peer_channel()
884            .await
885            .unwrap()
886            .send_webxdc_realtime_data(bob, bob_webxdc.id, "bob -> alice".as_bytes().to_vec())
887            .await
888            .unwrap();
889
890        loop {
891            let event = alice.evtracker.recv().await.unwrap();
892            if let EventType::WebxdcRealtimeData { data, .. } = event.typ {
893                if data == "bob -> alice".as_bytes() {
894                    break;
895                } else {
896                    panic!(
897                        "Unexpected status update: {}",
898                        String::from_utf8_lossy(&data)
899                    );
900                }
901            }
902        }
903
904        // channel is only used to remember if an advertisement has been sent
905        // bob for example does not change the channels because he never sends an
906        // advertisement
907        assert_eq!(
908            alice
909                .iroh
910                .read()
911                .await
912                .as_ref()
913                .unwrap()
914                .iroh_channels
915                .read()
916                .await
917                .len(),
918            1
919        );
920        leave_webxdc_realtime(alice, alice_webxdc.id).await.unwrap();
921        let topic = get_iroh_topic_for_msg(alice, alice_webxdc.id)
922            .await
923            .unwrap()
924            .unwrap();
925        assert!(
926            alice
927                .iroh
928                .read()
929                .await
930                .as_ref()
931                .unwrap()
932                .iroh_channels
933                .read()
934                .await
935                .get(&topic)
936                .is_none()
937        );
938    }
939
940    #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
941    async fn test_parallel_connect() {
942        let mut tcm = TestContextManager::new();
943        let alice = &mut tcm.alice().await;
944        let bob = &mut tcm.bob().await;
945
946        let chat = alice.create_chat(bob).await.id;
947
948        let mut instance = Message::new(Viewtype::File);
949        instance
950            .set_file_from_bytes(
951                alice,
952                "minimal.xdc",
953                include_bytes!("../test-data/webxdc/minimal.xdc"),
954                None,
955            )
956            .unwrap();
957        connect_alice_bob(alice, chat, &mut instance, bob).await
958    }
959
960    #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
961    async fn test_webxdc_resend() {
962        let mut tcm = TestContextManager::new();
963        let alice = &mut tcm.alice().await;
964        let bob = &mut tcm.bob().await;
965        let group = chat::create_group_chat(alice, ProtectionStatus::Unprotected, "group chat")
966            .await
967            .unwrap();
968
969        // Alice sends webxdc to bob
970        let mut instance = Message::new(Viewtype::File);
971        instance
972            .set_file_from_bytes(
973                alice,
974                "minimal.xdc",
975                include_bytes!("../test-data/webxdc/minimal.xdc"),
976                None,
977            )
978            .unwrap();
979
980        add_contact_to_chat(alice, group, alice.add_or_lookup_contact_id(bob).await)
981            .await
982            .unwrap();
983
984        connect_alice_bob(alice, group, &mut instance, bob).await;
985
986        // fiona joins late
987        let fiona = &mut tcm.fiona().await;
988
989        add_contact_to_chat(alice, group, alice.add_or_lookup_contact_id(fiona).await)
990            .await
991            .unwrap();
992
993        resend_msgs(alice, &[instance.id]).await.unwrap();
994        let msg = alice.pop_sent_msg().await;
995        let fiona_instance = fiona.recv_msg(&msg).await;
996        fiona_instance.chat_id.accept(fiona).await.unwrap();
997        assert!(fiona.ctx.iroh.read().await.is_none());
998
999        let fiona_connect_future = send_webxdc_realtime_advertisement(fiona, fiona_instance.id)
1000            .await
1001            .unwrap()
1002            .unwrap();
1003        let fiona_advert = fiona.pop_sent_msg().await;
1004        alice.recv_msg_trash(&fiona_advert).await;
1005
1006        fiona_connect_future.await.unwrap();
1007
1008        let realtime_send_loop = async {
1009            // Keep sending in a loop because right after joining
1010            // Fiona may miss messages.
1011            loop {
1012                send_webxdc_realtime_data(alice, instance.id, b"alice -> bob & fiona".into())
1013                    .await
1014                    .unwrap();
1015                tokio::time::sleep(std::time::Duration::from_secs(1)).await;
1016            }
1017        };
1018
1019        let realtime_receive_loop = async {
1020            loop {
1021                let event = fiona.evtracker.recv().await.unwrap();
1022                if let EventType::WebxdcRealtimeData { data, .. } = event.typ {
1023                    if data == b"alice -> bob & fiona" {
1024                        break;
1025                    } else {
1026                        panic!(
1027                            "Unexpected status update: {}",
1028                            String::from_utf8_lossy(&data)
1029                        );
1030                    }
1031                }
1032            }
1033        };
1034        tokio::select!(
1035            _ = realtime_send_loop => {
1036                panic!("Send loop should never finish");
1037            },
1038            _ = realtime_receive_loop => {
1039                return;
1040            }
1041        );
1042    }
1043
1044    async fn connect_alice_bob(
1045        alice: &mut TestContext,
1046        alice_chat_id: ChatId,
1047        instance: &mut Message,
1048        bob: &mut TestContext,
1049    ) {
1050        send_msg(alice, alice_chat_id, instance).await.unwrap();
1051        let alice_webxdc = alice.get_last_msg().await;
1052
1053        let webxdc = alice.pop_sent_msg().await;
1054        let bob_webxdc = bob.recv_msg(&webxdc).await;
1055        assert_eq!(bob_webxdc.get_viewtype(), Viewtype::Webxdc);
1056
1057        bob_webxdc.chat_id.accept(bob).await.unwrap();
1058
1059        eprintln!("Sending advertisements");
1060        // Alice advertises herself.
1061        let alice_advertisement_future = send_webxdc_realtime_advertisement(alice, alice_webxdc.id)
1062            .await
1063            .unwrap()
1064            .unwrap();
1065        let alice_advertisement = alice.pop_sent_msg().await;
1066
1067        let bob_advertisement_future = send_webxdc_realtime_advertisement(bob, bob_webxdc.id)
1068            .await
1069            .unwrap()
1070            .unwrap();
1071        let bob_advertisement = bob.pop_sent_msg().await;
1072
1073        eprintln!("Receiving advertisements");
1074        bob.recv_msg_trash(&alice_advertisement).await;
1075        alice.recv_msg_trash(&bob_advertisement).await;
1076
1077        eprintln!("Alice and Bob wait for connection");
1078        alice_advertisement_future.await.unwrap();
1079        bob_advertisement_future.await.unwrap();
1080
1081        // Alice sends ephemeral message
1082        eprintln!("Sending ephemeral message");
1083        send_webxdc_realtime_data(alice, alice_webxdc.id, b"alice -> bob".into())
1084            .await
1085            .unwrap();
1086
1087        eprintln!("Waiting for ephemeral message");
1088        loop {
1089            let event = bob.evtracker.recv().await.unwrap();
1090            if let EventType::WebxdcRealtimeData { data, .. } = event.typ {
1091                if data == b"alice -> bob" {
1092                    break;
1093                } else {
1094                    panic!(
1095                        "Unexpected status update: {}",
1096                        String::from_utf8_lossy(&data)
1097                    );
1098                }
1099            }
1100        }
1101    }
1102
1103    #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
1104    async fn test_peer_channels_disabled() {
1105        let mut tcm = TestContextManager::new();
1106        let alice = &mut tcm.alice().await;
1107
1108        alice
1109            .set_config_bool(Config::WebxdcRealtimeEnabled, false)
1110            .await
1111            .unwrap();
1112
1113        // creates iroh endpoint as side effect
1114        send_webxdc_realtime_advertisement(alice, MsgId::new(1))
1115            .await
1116            .unwrap();
1117
1118        assert!(alice.ctx.iroh.read().await.is_none());
1119
1120        // creates iroh endpoint as side effect
1121        send_webxdc_realtime_data(alice, MsgId::new(1), vec![])
1122            .await
1123            .unwrap();
1124
1125        assert!(alice.ctx.iroh.read().await.is_none());
1126
1127        leave_webxdc_realtime(alice, MsgId::new(1)).await.unwrap();
1128
1129        assert!(alice.ctx.iroh.read().await.is_none());
1130
1131        // This internal function should return error
1132        // if accidentally called with the setting disabled.
1133        assert!(alice.ctx.get_or_try_init_peer_channel().await.is_err());
1134    }
1135
1136    #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
1137    async fn test_leave_webxdc_realtime_uninitialized() {
1138        let mut tcm = TestContextManager::new();
1139        let alice = &mut tcm.alice().await;
1140
1141        alice
1142            .set_config_bool(Config::WebxdcRealtimeEnabled, true)
1143            .await
1144            .unwrap();
1145
1146        assert!(alice.ctx.iroh.read().await.is_none());
1147        leave_webxdc_realtime(alice, MsgId::new(1)).await.unwrap();
1148        assert!(alice.ctx.iroh.read().await.is_none());
1149    }
1150}