deltachat/
peer_channels.rs

1//! Peer channels for realtime communication in webxdcs.
2//!
3//! We use Iroh as an ephemeral peer channels provider to create direct communication
4//! channels between webxdcs. See [here](https://webxdc.org/docs/spec/joinRealtimeChannel.html) for the webxdc specs.
5//!
6//! Ephemeral channels should be established lazily, to avoid bootstrapping p2p connectivity
7//! when it's not required. Only when a webxdc subscribes to realtime data or when a reatlime message is sent,
8//! the p2p machinery should be started.
9//!
10//! Adding peer channels to webxdc needs upfront negotiation of a topic and sharing of public keys so that
11//! nodes can connect to each other. The explicit approach is as follows:
12//!
13//! 1. We introduce a new [`IrohGossipTopic`](crate::headerdef::HeaderDef::IrohGossipTopic) message header with a random 32-byte TopicId,
14//!    securely generated on the initial webxdc sender's device. This message header is encrypted
15//!    and sent in the same message as the webxdc application.
16//! 2. Whenever `joinRealtimeChannel().setListener()` or `joinRealtimeChannel().send()` is called by the webxdc application,
17//!    we start a routine to establish p2p connectivity and join the gossip swarm with Iroh.
18//! 3. The first step of this routine is to introduce yourself with a regular message containing the [`IrohNodeAddr`](crate::headerdef::HeaderDef::IrohNodeAddr).
19//!    This message contains the users relay-server and public key.
20//!    Direct IP address is not included as this information can be persisted by email providers.
21//! 4. After the announcement, the sending peer joins the gossip swarm with an empty list of peer IDs (as they don't know anyone yet).
22//! 5. Upon receiving an announcement message, other peers store the sender's [NodeAddr] in the database
23//!    (scoped per WebXDC app instance/message-id). The other peers can then join the gossip with `joinRealtimeChannel().setListener()`
24//!    and `joinRealtimeChannel().send()` just like the other peers.
25
26use anyhow::{Context as _, Result, anyhow, bail};
27use data_encoding::BASE32_NOPAD;
28use futures_lite::StreamExt;
29use iroh::{Endpoint, NodeAddr, NodeId, PublicKey, RelayMode, RelayUrl, SecretKey};
30use iroh_gossip::net::{Event, GOSSIP_ALPN, Gossip, GossipEvent, JoinOptions};
31use iroh_gossip::proto::TopicId;
32use parking_lot::Mutex;
33use std::collections::{BTreeSet, HashMap};
34use std::env;
35use tokio::sync::{RwLock, oneshot};
36use tokio::task::JoinHandle;
37use url::Url;
38
39use crate::EventType;
40use crate::chat::send_msg;
41use crate::config::Config;
42use crate::context::Context;
43use crate::log::{info, warn};
44use crate::message::{Message, MsgId, Viewtype};
45use crate::mimeparser::SystemMessage;
46
47/// The length of an ed25519 `PublicKey`, in bytes.
48const PUBLIC_KEY_LENGTH: usize = 32;
49const PUBLIC_KEY_STUB: &[u8] = "static_string".as_bytes();
50
51/// Store Iroh peer channels for the context.
52#[derive(Debug)]
53pub struct Iroh {
54    /// Iroh router  needed for Iroh peer channels.
55    pub(crate) router: iroh::protocol::Router,
56
57    /// [Gossip] needed for Iroh peer channels.
58    pub(crate) gossip: Gossip,
59
60    /// Sequence numbers for gossip channels.
61    pub(crate) sequence_numbers: Mutex<HashMap<TopicId, i32>>,
62
63    /// Topics for which an advertisement has already been sent.
64    pub(crate) iroh_channels: RwLock<HashMap<TopicId, ChannelState>>,
65
66    /// Currently used Iroh public key.
67    ///
68    /// This is attached to every message to work around `iroh_gossip` deduplication.
69    pub(crate) public_key: PublicKey,
70}
71
72impl Iroh {
73    /// Notify the endpoint that the network has changed.
74    pub(crate) async fn network_change(&self) {
75        self.router.endpoint().network_change().await
76    }
77
78    /// Closes the QUIC endpoint.
79    pub(crate) async fn close(self) -> Result<()> {
80        self.router.shutdown().await.context("Closing iroh failed")
81    }
82
83    /// Join a topic and create the subscriber loop for it.
84    ///
85    /// If there is no gossip, create it.
86    ///
87    /// The returned future resolves when the swarm becomes operational.
88    async fn join_and_subscribe_gossip(
89        &self,
90        ctx: &Context,
91        msg_id: MsgId,
92    ) -> Result<Option<oneshot::Receiver<()>>> {
93        let topic = get_iroh_topic_for_msg(ctx, msg_id)
94            .await?
95            .with_context(|| format!("Message {msg_id} has no gossip topic"))?;
96
97        // Take exclusive lock to make sure
98        // no other thread can create a second gossip subscription
99        // after we check that it does not exist and before we create a new one.
100        // Otherwise we would receive every message twice or more times.
101        let mut iroh_channels = self.iroh_channels.write().await;
102
103        if iroh_channels.contains_key(&topic) {
104            return Ok(None);
105        }
106
107        let peers = get_iroh_gossip_peers(ctx, msg_id).await?;
108        let node_ids = peers.iter().map(|p| p.node_id).collect::<Vec<_>>();
109
110        info!(
111            ctx,
112            "IROH_REALTIME: Joining gossip {topic} with peers: {:?}.", node_ids,
113        );
114
115        // Inform iroh of potentially new node addresses
116        for node_addr in &peers {
117            if !node_addr.is_empty() {
118                self.router.endpoint().add_node_addr(node_addr.clone())?;
119            }
120        }
121
122        let (join_tx, join_rx) = oneshot::channel();
123
124        let (gossip_sender, gossip_receiver) = self
125            .gossip
126            .subscribe_with_opts(topic, JoinOptions::with_bootstrap(node_ids))
127            .split();
128
129        let ctx = ctx.clone();
130        let subscribe_loop = tokio::spawn(async move {
131            if let Err(e) = subscribe_loop(&ctx, gossip_receiver, topic, msg_id, join_tx).await {
132                warn!(ctx, "subscribe_loop failed: {e}")
133            }
134        });
135
136        iroh_channels.insert(topic, ChannelState::new(subscribe_loop, gossip_sender));
137
138        Ok(Some(join_rx))
139    }
140
141    /// Add gossip peer to realtime channel if it is already active.
142    pub async fn maybe_add_gossip_peer(&self, topic: TopicId, peer: NodeAddr) -> Result<()> {
143        if self.iroh_channels.read().await.get(&topic).is_some() {
144            self.router.endpoint().add_node_addr(peer.clone())?;
145            self.gossip.subscribe(topic, vec![peer.node_id])?;
146        }
147        Ok(())
148    }
149
150    /// Send realtime data to the gossip swarm.
151    pub async fn send_webxdc_realtime_data(
152        &self,
153        ctx: &Context,
154        msg_id: MsgId,
155        mut data: Vec<u8>,
156    ) -> Result<()> {
157        let topic = get_iroh_topic_for_msg(ctx, msg_id)
158            .await?
159            .with_context(|| format!("Message {msg_id} has no gossip topic"))?;
160        self.join_and_subscribe_gossip(ctx, msg_id).await?;
161
162        let seq_num = self.get_and_incr(&topic);
163
164        let mut iroh_channels = self.iroh_channels.write().await;
165        let state = iroh_channels
166            .get_mut(&topic)
167            .context("Just created state does not exist")?;
168        data.extend(seq_num.to_le_bytes());
169        data.extend(self.public_key.as_bytes());
170
171        state.sender.broadcast(data.into()).await?;
172
173        if env::var("REALTIME_DEBUG").is_ok() {
174            info!(ctx, "Sent realtime data");
175        }
176
177        Ok(())
178    }
179
180    fn get_and_incr(&self, topic: &TopicId) -> i32 {
181        let mut sequence_numbers = self.sequence_numbers.lock();
182        let entry = sequence_numbers.entry(*topic).or_default();
183        *entry = entry.wrapping_add(1);
184        *entry
185    }
186
187    /// Get the iroh [NodeAddr] without direct IP addresses.
188    pub(crate) async fn get_node_addr(&self) -> Result<NodeAddr> {
189        let mut addr = self.router.endpoint().node_addr().await?;
190        addr.direct_addresses = BTreeSet::new();
191        Ok(addr)
192    }
193
194    /// Leave the realtime channel for a given topic.
195    pub(crate) async fn leave_realtime(&self, topic: TopicId) -> Result<()> {
196        if let Some(channel) = self.iroh_channels.write().await.remove(&topic) {
197            // Dropping the last GossipTopic results in quitting the topic.
198            // It is split into GossipReceiver and GossipSender.
199            // GossipSender (`channel.sender`) is dropped automatically.
200
201            // Subscribe loop owns GossipReceiver.
202            // Aborting it and waiting for it to be dropped
203            // drops the receiver.
204            channel.subscribe_loop.abort();
205            let _ = channel.subscribe_loop.await;
206        }
207        Ok(())
208    }
209}
210
211/// Single gossip channel state.
212#[derive(Debug)]
213pub(crate) struct ChannelState {
214    /// The subscribe loop handle.
215    subscribe_loop: JoinHandle<()>,
216
217    sender: iroh_gossip::net::GossipSender,
218}
219
220impl ChannelState {
221    fn new(subscribe_loop: JoinHandle<()>, sender: iroh_gossip::net::GossipSender) -> Self {
222        Self {
223            subscribe_loop,
224            sender,
225        }
226    }
227}
228
229impl Context {
230    /// Create iroh endpoint and gossip.
231    async fn init_peer_channels(&self) -> Result<Iroh> {
232        info!(self, "Initializing peer channels.");
233        let secret_key = SecretKey::generate(rand::rngs::OsRng);
234        let public_key = secret_key.public();
235
236        let relay_mode = if let Some(relay_url) = self
237            .metadata
238            .read()
239            .await
240            .as_ref()
241            .and_then(|conf| conf.iroh_relay.clone())
242        {
243            RelayMode::Custom(RelayUrl::from(relay_url).into())
244        } else {
245            // FIXME: this should be RelayMode::Disabled instead.
246            // Currently using default relays because otherwise Rust tests fail.
247            RelayMode::Default
248        };
249
250        let endpoint = Endpoint::builder()
251            .tls_x509() // For compatibility with iroh <0.34.0
252            .secret_key(secret_key)
253            .alpns(vec![GOSSIP_ALPN.to_vec()])
254            .relay_mode(relay_mode)
255            .bind()
256            .await?;
257
258        // create gossip
259        // Allow messages up to 128 KB in size.
260        // We set the limit to 128 KiB to account for internal overhead,
261        // but only guarantee 128 KB of payload to WebXDC developers.
262
263        let gossip = Gossip::builder()
264            .max_message_size(128 * 1024)
265            .spawn(endpoint.clone())
266            .await?;
267
268        let router = iroh::protocol::Router::builder(endpoint)
269            .accept(GOSSIP_ALPN, gossip.clone())
270            .spawn();
271
272        Ok(Iroh {
273            router,
274            gossip,
275            sequence_numbers: Mutex::new(HashMap::new()),
276            iroh_channels: RwLock::new(HashMap::new()),
277            public_key,
278        })
279    }
280
281    /// Get or initialize the iroh peer channel.
282    pub async fn get_or_try_init_peer_channel(
283        &self,
284    ) -> Result<tokio::sync::RwLockReadGuard<'_, Iroh>> {
285        if !self.get_config_bool(Config::WebxdcRealtimeEnabled).await? {
286            bail!("Attempt to get Iroh when realtime is disabled");
287        }
288
289        if let Ok(lock) = tokio::sync::RwLockReadGuard::<'_, std::option::Option<Iroh>>::try_map(
290            self.iroh.read().await,
291            |opt_iroh| opt_iroh.as_ref(),
292        ) {
293            return Ok(lock);
294        }
295
296        let lock = self.iroh.write().await;
297        match tokio::sync::RwLockWriteGuard::<'_, std::option::Option<Iroh>>::try_downgrade_map(
298            lock,
299            |opt_iroh| opt_iroh.as_ref(),
300        ) {
301            Ok(lock) => Ok(lock),
302            Err(mut lock) => {
303                let iroh = self.init_peer_channels().await?;
304                *lock = Some(iroh);
305                tokio::sync::RwLockWriteGuard::<'_, std::option::Option<Iroh>>::try_downgrade_map(
306                    lock,
307                    |opt_iroh| opt_iroh.as_ref(),
308                )
309                .map_err(|_| anyhow!("Downgrade should succeed as we just stored `Some` value"))
310            }
311        }
312    }
313
314    pub(crate) async fn maybe_add_gossip_peer(&self, topic: TopicId, peer: NodeAddr) -> Result<()> {
315        if let Some(iroh) = &*self.iroh.read().await {
316            info!(
317                self,
318                "Adding (maybe existing) peer with id {} to {topic}.", peer.node_id
319            );
320            iroh.maybe_add_gossip_peer(topic, peer).await?;
321        }
322        Ok(())
323    }
324}
325
326/// Cache a peers [NodeId] for one topic.
327pub(crate) async fn iroh_add_peer_for_topic(
328    ctx: &Context,
329    msg_id: MsgId,
330    topic: TopicId,
331    peer: NodeId,
332    relay_server: Option<&str>,
333) -> Result<()> {
334    ctx.sql
335        .execute(
336            "INSERT OR REPLACE INTO iroh_gossip_peers (msg_id, public_key, topic, relay_server) VALUES (?, ?, ?, ?)",
337            (msg_id, peer.as_bytes(), topic.as_bytes(), relay_server),
338        )
339        .await?;
340    Ok(())
341}
342
343/// Add gossip peer from `Iroh-Node-Addr` header to WebXDC message identified by `instance_id`.
344pub async fn add_gossip_peer_from_header(
345    context: &Context,
346    instance_id: MsgId,
347    node_addr: &str,
348) -> Result<()> {
349    if !context
350        .get_config_bool(Config::WebxdcRealtimeEnabled)
351        .await?
352    {
353        return Ok(());
354    }
355
356    let node_addr =
357        serde_json::from_str::<NodeAddr>(node_addr).context("Failed to parse node address")?;
358
359    info!(
360        context,
361        "Adding iroh peer with node id {} to the topic of {instance_id}.", node_addr.node_id
362    );
363
364    context.emit_event(EventType::WebxdcRealtimeAdvertisementReceived {
365        msg_id: instance_id,
366    });
367
368    let Some(topic) = get_iroh_topic_for_msg(context, instance_id).await? else {
369        warn!(
370            context,
371            "Could not add iroh peer because {instance_id} has no topic."
372        );
373        return Ok(());
374    };
375
376    let node_id = node_addr.node_id;
377    let relay_server = node_addr.relay_url().map(|relay| relay.as_str());
378    iroh_add_peer_for_topic(context, instance_id, topic, node_id, relay_server).await?;
379
380    context.maybe_add_gossip_peer(topic, node_addr).await?;
381    Ok(())
382}
383
384/// Insert topicId into the database so that we can use it to retrieve the topic.
385pub(crate) async fn insert_topic_stub(ctx: &Context, msg_id: MsgId, topic: TopicId) -> Result<()> {
386    ctx.sql
387        .execute(
388            "INSERT OR REPLACE INTO iroh_gossip_peers (msg_id, public_key, topic, relay_server) VALUES (?, ?, ?, ?)",
389            (msg_id, PUBLIC_KEY_STUB, topic.as_bytes(), Option::<&str>::None),
390        )
391        .await?;
392    Ok(())
393}
394
395/// Get a list of [NodeAddr]s for one webxdc.
396async fn get_iroh_gossip_peers(ctx: &Context, msg_id: MsgId) -> Result<Vec<NodeAddr>> {
397    ctx.sql
398        .query_map(
399            "SELECT public_key, relay_server FROM iroh_gossip_peers WHERE msg_id = ? AND public_key != ?",
400            (msg_id, PUBLIC_KEY_STUB),
401            |row| {
402                let key:  Vec<u8> = row.get(0)?;
403                let server: Option<String> = row.get(1)?;
404                Ok((key, server))
405            },
406            |g| {
407                g.map(|data| {
408                    let (key, server) = data?;
409                    let server = server.map(|data| Ok::<_, url::ParseError>(RelayUrl::from(Url::parse(&data)?))).transpose()?;
410                    let id = NodeId::from_bytes(&key.try_into()
411                    .map_err(|_| anyhow!("Can't convert sql data to [u8; 32]"))?)?;
412                    Ok::<_, anyhow::Error>(NodeAddr::from_parts(
413                        id, server, vec![]
414                    ))
415                })
416                .collect::<std::result::Result<Vec<_>, _>>()
417            },
418        )
419        .await
420}
421
422/// Get the topic for a given [MsgId].
423pub(crate) async fn get_iroh_topic_for_msg(
424    ctx: &Context,
425    msg_id: MsgId,
426) -> Result<Option<TopicId>> {
427    if let Some(bytes) = ctx
428        .sql
429        .query_get_value::<Vec<u8>>(
430            "SELECT topic FROM iroh_gossip_peers WHERE msg_id = ? LIMIT 1",
431            (msg_id,),
432        )
433        .await
434        .context("Couldn't restore topic from db")?
435    {
436        let topic_id = TopicId::from_bytes(
437            bytes
438                .try_into()
439                .map_err(|_| anyhow!("Could not convert stored topic ID"))?,
440        );
441        Ok(Some(topic_id))
442    } else {
443        Ok(None)
444    }
445}
446
447/// Send a gossip advertisement to the chat that [MsgId] belongs to.
448/// This method should be called from the frontend when `joinRealtimeChannel` is called.
449pub async fn send_webxdc_realtime_advertisement(
450    ctx: &Context,
451    msg_id: MsgId,
452) -> Result<Option<oneshot::Receiver<()>>> {
453    if !ctx.get_config_bool(Config::WebxdcRealtimeEnabled).await? {
454        return Ok(None);
455    }
456
457    let iroh = ctx.get_or_try_init_peer_channel().await?;
458    let conn = iroh.join_and_subscribe_gossip(ctx, msg_id).await?;
459
460    let webxdc = Message::load_from_db(ctx, msg_id).await?;
461    let mut msg = Message::new(Viewtype::Text);
462    msg.hidden = true;
463    msg.param.set_cmd(SystemMessage::IrohNodeAddr);
464    msg.in_reply_to = Some(webxdc.rfc724_mid.clone());
465    send_msg(ctx, webxdc.chat_id, &mut msg).await?;
466    info!(ctx, "IROH_REALTIME: Sent realtime advertisement");
467    Ok(conn)
468}
469
470/// Send realtime data to other peers using iroh.
471pub async fn send_webxdc_realtime_data(ctx: &Context, msg_id: MsgId, data: Vec<u8>) -> Result<()> {
472    if !ctx.get_config_bool(Config::WebxdcRealtimeEnabled).await? {
473        return Ok(());
474    }
475
476    let iroh = ctx.get_or_try_init_peer_channel().await?;
477    iroh.send_webxdc_realtime_data(ctx, msg_id, data).await?;
478    Ok(())
479}
480
481/// Leave the gossip of the webxdc with given [MsgId].
482pub async fn leave_webxdc_realtime(ctx: &Context, msg_id: MsgId) -> Result<()> {
483    if !ctx.get_config_bool(Config::WebxdcRealtimeEnabled).await? {
484        return Ok(());
485    }
486    let topic = get_iroh_topic_for_msg(ctx, msg_id)
487        .await?
488        .with_context(|| format!("Message {msg_id} has no gossip topic"))?;
489    let iroh = ctx.get_or_try_init_peer_channel().await?;
490    iroh.leave_realtime(topic).await?;
491    info!(ctx, "IROH_REALTIME: Left gossip for message {msg_id}");
492
493    Ok(())
494}
495
496/// Creates a new random gossip topic.
497fn create_random_topic() -> TopicId {
498    TopicId::from_bytes(rand::random())
499}
500
501/// Creates `Iroh-Gossip-Header` with a new random topic
502/// and stores the topic for the message.
503pub(crate) async fn create_iroh_header(ctx: &Context, msg_id: MsgId) -> Result<String> {
504    let topic = create_random_topic();
505    insert_topic_stub(ctx, msg_id, topic).await?;
506    let topic_string = BASE32_NOPAD.encode(topic.as_bytes()).to_ascii_lowercase();
507    Ok(topic_string)
508}
509
510async fn subscribe_loop(
511    context: &Context,
512    mut stream: iroh_gossip::net::GossipReceiver,
513    topic: TopicId,
514    msg_id: MsgId,
515    join_tx: oneshot::Sender<()>,
516) -> Result<()> {
517    let mut join_tx = Some(join_tx);
518
519    while let Some(event) = stream.try_next().await? {
520        match event {
521            Event::Gossip(event) => match event {
522                GossipEvent::Joined(nodes) => {
523                    if let Some(join_tx) = join_tx.take() {
524                        // Try to notify that at least one peer joined,
525                        // but ignore the error if receiver is dropped and nobody listens.
526                        join_tx.send(()).ok();
527                    }
528
529                    for node in nodes {
530                        iroh_add_peer_for_topic(context, msg_id, topic, node, None).await?;
531                    }
532                }
533                GossipEvent::NeighborUp(node) => {
534                    info!(context, "IROH_REALTIME: NeighborUp: {}", node.to_string());
535                    iroh_add_peer_for_topic(context, msg_id, topic, node, None).await?;
536                }
537                GossipEvent::NeighborDown(_node) => {}
538                GossipEvent::Received(message) => {
539                    info!(context, "IROH_REALTIME: Received realtime data");
540                    context.emit_event(EventType::WebxdcRealtimeData {
541                        msg_id,
542                        data: message
543                            .content
544                            .get(0..message.content.len() - 4 - PUBLIC_KEY_LENGTH)
545                            .context("too few bytes in iroh message")?
546                            .into(),
547                    });
548                }
549            },
550            Event::Lagged => {
551                warn!(context, "Gossip lost some messages");
552            }
553        };
554    }
555    Ok(())
556}
557
558#[cfg(test)]
559mod tests {
560    use super::*;
561    use crate::{
562        EventType,
563        chat::{self, ChatId, ProtectionStatus, add_contact_to_chat, resend_msgs, send_msg},
564        message::{Message, Viewtype},
565        test_utils::{TestContext, TestContextManager},
566    };
567
568    #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
569    async fn test_can_communicate() {
570        let mut tcm = TestContextManager::new();
571        let alice = &mut tcm.alice().await;
572        let bob = &mut tcm.bob().await;
573
574        // Alice sends webxdc to bob
575        let alice_chat = alice.create_chat(bob).await;
576        let mut instance = Message::new(Viewtype::File);
577        instance
578            .set_file_from_bytes(
579                alice,
580                "minimal.xdc",
581                include_bytes!("../test-data/webxdc/minimal.xdc"),
582                None,
583            )
584            .unwrap();
585
586        send_msg(alice, alice_chat.id, &mut instance).await.unwrap();
587        let alice_webxdc = alice.get_last_msg().await;
588        assert_eq!(alice_webxdc.get_viewtype(), Viewtype::Webxdc);
589
590        let webxdc = alice.pop_sent_msg().await;
591        let bob_webxdc = bob.recv_msg(&webxdc).await;
592        assert_eq!(bob_webxdc.get_viewtype(), Viewtype::Webxdc);
593
594        bob_webxdc.chat_id.accept(bob).await.unwrap();
595
596        // Alice advertises herself.
597        send_webxdc_realtime_advertisement(alice, alice_webxdc.id)
598            .await
599            .unwrap();
600
601        bob.recv_msg_trash(&alice.pop_sent_msg().await).await;
602        loop {
603            let event = bob.evtracker.recv().await.unwrap();
604            if let EventType::WebxdcRealtimeAdvertisementReceived { msg_id } = event.typ {
605                assert!(msg_id == alice_webxdc.id);
606                break;
607            }
608        }
609
610        // Bob adds alice to gossip peers.
611        let members = get_iroh_gossip_peers(bob, bob_webxdc.id)
612            .await
613            .unwrap()
614            .into_iter()
615            .map(|addr| addr.node_id)
616            .collect::<Vec<_>>();
617
618        assert_eq!(
619            members,
620            vec![
621                alice
622                    .get_or_try_init_peer_channel()
623                    .await
624                    .unwrap()
625                    .get_node_addr()
626                    .await
627                    .unwrap()
628                    .node_id
629            ]
630        );
631
632        bob.get_or_try_init_peer_channel()
633            .await
634            .unwrap()
635            .join_and_subscribe_gossip(bob, bob_webxdc.id)
636            .await
637            .unwrap()
638            .unwrap()
639            .await
640            .unwrap();
641
642        // Alice sends ephemeral message
643        alice
644            .get_or_try_init_peer_channel()
645            .await
646            .unwrap()
647            .send_webxdc_realtime_data(alice, alice_webxdc.id, "alice -> bob".as_bytes().to_vec())
648            .await
649            .unwrap();
650
651        loop {
652            let event = bob.evtracker.recv().await.unwrap();
653            if let EventType::WebxdcRealtimeData { data, .. } = event.typ {
654                if data == "alice -> bob".as_bytes() {
655                    break;
656                } else {
657                    panic!(
658                        "Unexpected status update: {}",
659                        String::from_utf8_lossy(&data)
660                    );
661                }
662            }
663        }
664        // Bob sends ephemeral message
665        bob.get_or_try_init_peer_channel()
666            .await
667            .unwrap()
668            .send_webxdc_realtime_data(bob, bob_webxdc.id, "bob -> alice".as_bytes().to_vec())
669            .await
670            .unwrap();
671
672        loop {
673            let event = alice.evtracker.recv().await.unwrap();
674            if let EventType::WebxdcRealtimeData { data, .. } = event.typ {
675                if data == "bob -> alice".as_bytes() {
676                    break;
677                } else {
678                    panic!(
679                        "Unexpected status update: {}",
680                        String::from_utf8_lossy(&data)
681                    );
682                }
683            }
684        }
685
686        // Alice adds bob to gossip peers.
687        let members = get_iroh_gossip_peers(alice, alice_webxdc.id)
688            .await
689            .unwrap()
690            .into_iter()
691            .map(|addr| addr.node_id)
692            .collect::<Vec<_>>();
693
694        assert_eq!(
695            members,
696            vec![
697                bob.get_or_try_init_peer_channel()
698                    .await
699                    .unwrap()
700                    .get_node_addr()
701                    .await
702                    .unwrap()
703                    .node_id
704            ]
705        );
706
707        bob.get_or_try_init_peer_channel()
708            .await
709            .unwrap()
710            .send_webxdc_realtime_data(bob, bob_webxdc.id, "bob -> alice 2".as_bytes().to_vec())
711            .await
712            .unwrap();
713
714        loop {
715            let event = alice.evtracker.recv().await.unwrap();
716            if let EventType::WebxdcRealtimeData { data, .. } = event.typ {
717                if data == "bob -> alice 2".as_bytes() {
718                    break;
719                } else {
720                    panic!(
721                        "Unexpected status update: {}",
722                        String::from_utf8_lossy(&data)
723                    );
724                }
725            }
726        }
727
728        // Calling stop_io() closes iroh endpoint as well,
729        // even though I/O was not started in this test.
730        assert!(alice.iroh.read().await.is_some());
731        alice.stop_io().await;
732        assert!(alice.iroh.read().await.is_none());
733    }
734
735    #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
736    async fn test_can_reconnect() {
737        let mut tcm = TestContextManager::new();
738        let alice = &mut tcm.alice().await;
739        let bob = &mut tcm.bob().await;
740
741        assert!(
742            alice
743                .get_config_bool(Config::WebxdcRealtimeEnabled)
744                .await
745                .unwrap()
746        );
747        // Alice sends webxdc to bob
748        let alice_chat = alice.create_chat(bob).await;
749        let mut instance = Message::new(Viewtype::File);
750        instance
751            .set_file_from_bytes(
752                alice,
753                "minimal.xdc",
754                include_bytes!("../test-data/webxdc/minimal.xdc"),
755                None,
756            )
757            .unwrap();
758
759        send_msg(alice, alice_chat.id, &mut instance).await.unwrap();
760        let alice_webxdc = alice.get_last_msg().await;
761        assert_eq!(alice_webxdc.get_viewtype(), Viewtype::Webxdc);
762
763        let webxdc = alice.pop_sent_msg().await;
764        let bob_webxdc = bob.recv_msg(&webxdc).await;
765        assert_eq!(bob_webxdc.get_viewtype(), Viewtype::Webxdc);
766
767        bob_webxdc.chat_id.accept(bob).await.unwrap();
768
769        // Alice advertises herself.
770        send_webxdc_realtime_advertisement(alice, alice_webxdc.id)
771            .await
772            .unwrap();
773
774        bob.recv_msg_trash(&alice.pop_sent_msg().await).await;
775
776        // Bob adds alice to gossip peers.
777        let members = get_iroh_gossip_peers(bob, bob_webxdc.id)
778            .await
779            .unwrap()
780            .into_iter()
781            .map(|addr| addr.node_id)
782            .collect::<Vec<_>>();
783
784        assert_eq!(
785            members,
786            vec![
787                alice
788                    .get_or_try_init_peer_channel()
789                    .await
790                    .unwrap()
791                    .get_node_addr()
792                    .await
793                    .unwrap()
794                    .node_id
795            ]
796        );
797
798        bob.get_or_try_init_peer_channel()
799            .await
800            .unwrap()
801            .join_and_subscribe_gossip(bob, bob_webxdc.id)
802            .await
803            .unwrap()
804            .unwrap()
805            .await
806            .unwrap();
807
808        // Alice sends ephemeral message
809        alice
810            .get_or_try_init_peer_channel()
811            .await
812            .unwrap()
813            .send_webxdc_realtime_data(alice, alice_webxdc.id, "alice -> bob".as_bytes().to_vec())
814            .await
815            .unwrap();
816
817        loop {
818            let event = bob.evtracker.recv().await.unwrap();
819            if let EventType::WebxdcRealtimeData { data, .. } = event.typ {
820                if data == "alice -> bob".as_bytes() {
821                    break;
822                } else {
823                    panic!(
824                        "Unexpected status update: {}",
825                        String::from_utf8_lossy(&data)
826                    );
827                }
828            }
829        }
830
831        let bob_topic = get_iroh_topic_for_msg(bob, bob_webxdc.id)
832            .await
833            .unwrap()
834            .unwrap();
835        let bob_sequence_number = bob
836            .iroh
837            .read()
838            .await
839            .as_ref()
840            .unwrap()
841            .sequence_numbers
842            .lock()
843            .get(&bob_topic)
844            .copied();
845        leave_webxdc_realtime(bob, bob_webxdc.id).await.unwrap();
846        let bob_sequence_number_after = bob
847            .iroh
848            .read()
849            .await
850            .as_ref()
851            .unwrap()
852            .sequence_numbers
853            .lock()
854            .get(&bob_topic)
855            .copied();
856        // Check that sequence number is persisted when leaving the channel.
857        assert_eq!(bob_sequence_number, bob_sequence_number_after);
858
859        bob.get_or_try_init_peer_channel()
860            .await
861            .unwrap()
862            .join_and_subscribe_gossip(bob, bob_webxdc.id)
863            .await
864            .unwrap()
865            .unwrap()
866            .await
867            .unwrap();
868
869        bob.get_or_try_init_peer_channel()
870            .await
871            .unwrap()
872            .send_webxdc_realtime_data(bob, bob_webxdc.id, "bob -> alice".as_bytes().to_vec())
873            .await
874            .unwrap();
875
876        loop {
877            let event = alice.evtracker.recv().await.unwrap();
878            if let EventType::WebxdcRealtimeData { data, .. } = event.typ {
879                if data == "bob -> alice".as_bytes() {
880                    break;
881                } else {
882                    panic!(
883                        "Unexpected status update: {}",
884                        String::from_utf8_lossy(&data)
885                    );
886                }
887            }
888        }
889
890        // channel is only used to remember if an advertisement has been sent
891        // bob for example does not change the channels because he never sends an
892        // advertisement
893        assert_eq!(
894            alice
895                .iroh
896                .read()
897                .await
898                .as_ref()
899                .unwrap()
900                .iroh_channels
901                .read()
902                .await
903                .len(),
904            1
905        );
906        leave_webxdc_realtime(alice, alice_webxdc.id).await.unwrap();
907        let topic = get_iroh_topic_for_msg(alice, alice_webxdc.id)
908            .await
909            .unwrap()
910            .unwrap();
911        assert!(
912            alice
913                .iroh
914                .read()
915                .await
916                .as_ref()
917                .unwrap()
918                .iroh_channels
919                .read()
920                .await
921                .get(&topic)
922                .is_none()
923        );
924    }
925
926    #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
927    async fn test_parallel_connect() {
928        let mut tcm = TestContextManager::new();
929        let alice = &mut tcm.alice().await;
930        let bob = &mut tcm.bob().await;
931
932        let chat = alice.create_chat(bob).await.id;
933
934        let mut instance = Message::new(Viewtype::File);
935        instance
936            .set_file_from_bytes(
937                alice,
938                "minimal.xdc",
939                include_bytes!("../test-data/webxdc/minimal.xdc"),
940                None,
941            )
942            .unwrap();
943        connect_alice_bob(alice, chat, &mut instance, bob).await
944    }
945
946    #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
947    async fn test_webxdc_resend() {
948        let mut tcm = TestContextManager::new();
949        let alice = &mut tcm.alice().await;
950        let bob = &mut tcm.bob().await;
951        let group = chat::create_group_chat(alice, ProtectionStatus::Unprotected, "group chat")
952            .await
953            .unwrap();
954
955        // Alice sends webxdc to bob
956        let mut instance = Message::new(Viewtype::File);
957        instance
958            .set_file_from_bytes(
959                alice,
960                "minimal.xdc",
961                include_bytes!("../test-data/webxdc/minimal.xdc"),
962                None,
963            )
964            .unwrap();
965
966        add_contact_to_chat(alice, group, alice.add_or_lookup_contact_id(bob).await)
967            .await
968            .unwrap();
969
970        connect_alice_bob(alice, group, &mut instance, bob).await;
971
972        // fiona joins late
973        let fiona = &mut tcm.fiona().await;
974
975        add_contact_to_chat(alice, group, alice.add_or_lookup_contact_id(fiona).await)
976            .await
977            .unwrap();
978
979        resend_msgs(alice, &[instance.id]).await.unwrap();
980        let msg = alice.pop_sent_msg().await;
981        let fiona_instance = fiona.recv_msg(&msg).await;
982        fiona_instance.chat_id.accept(fiona).await.unwrap();
983        assert!(fiona.ctx.iroh.read().await.is_none());
984
985        let fiona_connect_future = send_webxdc_realtime_advertisement(fiona, fiona_instance.id)
986            .await
987            .unwrap()
988            .unwrap();
989        let fiona_advert = fiona.pop_sent_msg().await;
990        alice.recv_msg_trash(&fiona_advert).await;
991
992        fiona_connect_future.await.unwrap();
993
994        let realtime_send_loop = async {
995            // Keep sending in a loop because right after joining
996            // Fiona may miss messages.
997            loop {
998                send_webxdc_realtime_data(alice, instance.id, b"alice -> bob & fiona".into())
999                    .await
1000                    .unwrap();
1001                tokio::time::sleep(std::time::Duration::from_secs(1)).await;
1002            }
1003        };
1004
1005        let realtime_receive_loop = async {
1006            loop {
1007                let event = fiona.evtracker.recv().await.unwrap();
1008                if let EventType::WebxdcRealtimeData { data, .. } = event.typ {
1009                    if data == b"alice -> bob & fiona" {
1010                        break;
1011                    } else {
1012                        panic!(
1013                            "Unexpected status update: {}",
1014                            String::from_utf8_lossy(&data)
1015                        );
1016                    }
1017                }
1018            }
1019        };
1020        tokio::select!(
1021            _ = realtime_send_loop => {
1022                panic!("Send loop should never finish");
1023            },
1024            _ = realtime_receive_loop => {
1025                return;
1026            }
1027        );
1028    }
1029
1030    async fn connect_alice_bob(
1031        alice: &mut TestContext,
1032        alice_chat_id: ChatId,
1033        instance: &mut Message,
1034        bob: &mut TestContext,
1035    ) {
1036        send_msg(alice, alice_chat_id, instance).await.unwrap();
1037        let alice_webxdc = alice.get_last_msg().await;
1038
1039        let webxdc = alice.pop_sent_msg().await;
1040        let bob_webxdc = bob.recv_msg(&webxdc).await;
1041        assert_eq!(bob_webxdc.get_viewtype(), Viewtype::Webxdc);
1042
1043        bob_webxdc.chat_id.accept(bob).await.unwrap();
1044
1045        eprintln!("Sending advertisements");
1046        // Alice advertises herself.
1047        let alice_advertisement_future = send_webxdc_realtime_advertisement(alice, alice_webxdc.id)
1048            .await
1049            .unwrap()
1050            .unwrap();
1051        let alice_advertisement = alice.pop_sent_msg().await;
1052
1053        let bob_advertisement_future = send_webxdc_realtime_advertisement(bob, bob_webxdc.id)
1054            .await
1055            .unwrap()
1056            .unwrap();
1057        let bob_advertisement = bob.pop_sent_msg().await;
1058
1059        eprintln!("Receiving advertisements");
1060        bob.recv_msg_trash(&alice_advertisement).await;
1061        alice.recv_msg_trash(&bob_advertisement).await;
1062
1063        eprintln!("Alice and Bob wait for connection");
1064        alice_advertisement_future.await.unwrap();
1065        bob_advertisement_future.await.unwrap();
1066
1067        // Alice sends ephemeral message
1068        eprintln!("Sending ephemeral message");
1069        send_webxdc_realtime_data(alice, alice_webxdc.id, b"alice -> bob".into())
1070            .await
1071            .unwrap();
1072
1073        eprintln!("Waiting for ephemeral message");
1074        loop {
1075            let event = bob.evtracker.recv().await.unwrap();
1076            if let EventType::WebxdcRealtimeData { data, .. } = event.typ {
1077                if data == b"alice -> bob" {
1078                    break;
1079                } else {
1080                    panic!(
1081                        "Unexpected status update: {}",
1082                        String::from_utf8_lossy(&data)
1083                    );
1084                }
1085            }
1086        }
1087    }
1088
1089    #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
1090    async fn test_peer_channels_disabled() {
1091        let mut tcm = TestContextManager::new();
1092        let alice = &mut tcm.alice().await;
1093
1094        alice
1095            .set_config_bool(Config::WebxdcRealtimeEnabled, false)
1096            .await
1097            .unwrap();
1098
1099        // creates iroh endpoint as side effect
1100        send_webxdc_realtime_advertisement(alice, MsgId::new(1))
1101            .await
1102            .unwrap();
1103
1104        assert!(alice.ctx.iroh.read().await.is_none());
1105
1106        // creates iroh endpoint as side effect
1107        send_webxdc_realtime_data(alice, MsgId::new(1), vec![])
1108            .await
1109            .unwrap();
1110
1111        assert!(alice.ctx.iroh.read().await.is_none());
1112
1113        // creates iroh endpoint as side effect
1114        leave_webxdc_realtime(alice, MsgId::new(1)).await.unwrap();
1115
1116        assert!(alice.ctx.iroh.read().await.is_none());
1117
1118        // This internal function should return error
1119        // if accidentally called with the setting disabled.
1120        assert!(alice.ctx.get_or_try_init_peer_channel().await.is_err());
1121    }
1122}