deltachat/imex/
transfer.rs

//! Transfer a backup to another device.
2//!
3//! This module provides support for using [iroh](https://iroh.computer/)
4//! to initiate transfer of a backup to another device using a QR code.
5//!
6//! There are two parties to this:
7//! - The *Provider*, which starts a server and listens for connections.
8//! - The *Getter*, which connects to the server and retrieves the data.
9//!
10//! Both the provider and the getter are authenticated:
11//!
12//! - The provider is known by its *peer ID*.
13//! - The provider needs an *authentication token* from the getter before it accepts a
14//!   connection.
15//!
16//! Both these are transferred in the QR code offered to the getter.  This ensures that the
17//! getter can not connect to an impersonated provider and the provider does not offer the
18//! download to an impersonated getter.
19//!
//! The protocol starts with the getter opening a bidirectional QUIC stream
//! to the provider and sending the authentication token.
//! The provider verifies the received authentication token,
//! sends the total size of all files in the backup (database and all blobs)
//! as an unsigned 64-bit big-endian integer and streams the backup in tar format.
//! The getter receives the backup and acknowledges successful reception
//! by sending a single byte.
//! The provider closes the endpoint after receiving the acknowledgment.
28
29use std::future::Future;
30use std::pin::Pin;
31use std::sync::Arc;
32use std::task::Poll;
33
34use anyhow::{bail, format_err, Context as _, Result};
35use futures_lite::FutureExt;
36use iroh::{Endpoint, RelayMode};
37use tokio::fs;
38use tokio::task::JoinHandle;
39use tokio_util::sync::CancellationToken;
40
41use crate::chat::add_device_msg;
42use crate::context::Context;
43use crate::imex::BlobDirContents;
44use crate::message::Message;
45use crate::qr::Qr;
46use crate::stock_str::backup_transfer_msg_body;
47use crate::tools::{create_id, time, TempPathGuard};
48use crate::{e2ee, EventType};
49
50use super::{export_backup_stream, export_database, import_backup_stream, DBFILE_BACKUP_NAME};
51
/// ALPN protocol identifier for the backup transfer protocol.
///
/// The provider registers it on its endpoint and the getter passes it when connecting.
const BACKUP_ALPN: &[u8] = b"/deltachat/backup";
54
/// Provide or send a backup of this device.
///
/// This creates a backup of the current device and starts a service from which another
/// device can download this backup.
///
/// This does not make a full backup on disk, only the SQLite database is created on disk,
/// the blobs in the blob directory are not copied.
///
/// This starts a task which holds the global "ongoing" mutex until it finishes.  If you
/// need to stop the task use the [`Context::stop_ongoing`] mechanism.
///
/// The provider is itself a [`Future`]: awaiting it waits for that task to complete.
#[derive(Debug)]
pub struct BackupProvider {
    /// iroh endpoint.
    ///
    /// Not read after construction; the accept loop task works on its own clone.
    _endpoint: Endpoint,

    /// iroh address.
    ///
    /// Handed out to the getter as part of the QR code.
    node_addr: iroh::NodeAddr,

    /// Authentication token that should be submitted
    /// to retrieve the backup.
    auth_token: String,

    /// Handle for the task accepting backup transfer requests.
    handle: JoinHandle<Result<()>>,

    /// Guard to cancel the provider on drop.
    ///
    /// Dropping it cancels the token that the accept loop task waits on.
    _drop_guard: tokio_util::sync::DropGuard,
}
83
84impl BackupProvider {
85    /// Prepares for sending a backup to a second device.
86    ///
87    /// Before calling this function all I/O must be stopped so that no changes to the blobs
88    /// or database are happening, this is done by calling the [`Accounts::stop_io`] or
89    /// [`Context::stop_io`] APIs first.
90    ///
91    /// This will acquire the global "ongoing process" mutex, which can be used to cancel
92    /// the process.
93    ///
94    /// [`Accounts::stop_io`]: crate::accounts::Accounts::stop_io
95    pub async fn prepare(context: &Context) -> Result<Self> {
96        let relay_mode = RelayMode::Disabled;
97        let endpoint = Endpoint::builder()
98            .tls_x509() // For compatibility with iroh <0.34.0
99            .alpns(vec![BACKUP_ALPN.to_vec()])
100            .relay_mode(relay_mode)
101            .bind()
102            .await?;
103        let node_addr = endpoint.node_addr().await?;
104
105        // Acquire global "ongoing" mutex.
106        let cancel_token = context.alloc_ongoing().await?;
107        let paused_guard = context.scheduler.pause(context.clone()).await?;
108        let context_dir = context
109            .get_blobdir()
110            .parent()
111            .context("Context dir not found")?;
112
113        // before we export, make sure the private key exists
114        e2ee::ensure_secret_key_exists(context)
115            .await
116            .context("Cannot create private key or private key not available")?;
117
118        let dbfile = context_dir.join(DBFILE_BACKUP_NAME);
119        if fs::metadata(&dbfile).await.is_ok() {
120            fs::remove_file(&dbfile).await?;
121            warn!(context, "Previous database export deleted");
122        }
123        let dbfile = TempPathGuard::new(dbfile);
124
125        // Authentication token that receiver should send us to receive a backup.
126        let auth_token = create_id();
127
128        let passphrase = String::new();
129
130        export_database(context, &dbfile, passphrase, time())
131            .await
132            .context("Database export failed")?;
133
134        let drop_token = CancellationToken::new();
135        let handle = {
136            let context = context.clone();
137            let drop_token = drop_token.clone();
138            let endpoint = endpoint.clone();
139            let auth_token = auth_token.clone();
140            tokio::spawn(async move {
141                Self::accept_loop(
142                    context.clone(),
143                    endpoint,
144                    auth_token,
145                    cancel_token,
146                    drop_token,
147                    dbfile,
148                )
149                .await;
150                info!(context, "Finished accept loop.");
151
152                context.free_ongoing().await;
153
154                // Explicit drop to move the guards into this future
155                drop(paused_guard);
156                Ok(())
157            })
158        };
159        Ok(Self {
160            _endpoint: endpoint,
161            node_addr,
162            auth_token,
163            handle,
164            _drop_guard: drop_token.drop_guard(),
165        })
166    }
167
168    async fn handle_connection(
169        context: Context,
170        conn: iroh::endpoint::Connecting,
171        auth_token: String,
172        dbfile: Arc<TempPathGuard>,
173    ) -> Result<()> {
174        let conn = conn.await?;
175        let (mut send_stream, mut recv_stream) = conn.accept_bi().await?;
176
177        // Read authentication token from the stream.
178        let mut received_auth_token = vec![0u8; auth_token.len()];
179        recv_stream.read_exact(&mut received_auth_token).await?;
180        if received_auth_token.as_slice() != auth_token.as_bytes() {
181            warn!(context, "Received wrong backup authentication token.");
182            return Ok(());
183        }
184
185        info!(context, "Received valid backup authentication token.");
186        // Emit a nonzero progress so that UIs can display smth like "Transferring...".
187        context.emit_event(EventType::ImexProgress(1));
188
189        let blobdir = BlobDirContents::new(&context).await?;
190
191        let mut file_size = 0;
192        file_size += dbfile.metadata()?.len();
193        for blob in blobdir.iter() {
194            file_size += blob.to_abs_path().metadata()?.len()
195        }
196
197        send_stream.write_all(&file_size.to_be_bytes()).await?;
198
199        export_backup_stream(&context, &dbfile, blobdir, send_stream, file_size)
200            .await
201            .context("Failed to write backup into QUIC stream")?;
202        info!(context, "Finished writing backup into QUIC stream.");
203        let mut buf = [0u8; 1];
204        info!(context, "Waiting for acknowledgment.");
205        recv_stream.read_exact(&mut buf).await?;
206        info!(context, "Received backup reception acknowledgement.");
207        context.emit_event(EventType::ImexProgress(1000));
208
209        let mut msg = Message::new_text(backup_transfer_msg_body(&context).await);
210        add_device_msg(&context, None, Some(&mut msg)).await?;
211
212        Ok(())
213    }
214
215    async fn accept_loop(
216        context: Context,
217        endpoint: Endpoint,
218        auth_token: String,
219        cancel_token: async_channel::Receiver<()>,
220        drop_token: CancellationToken,
221        dbfile: TempPathGuard,
222    ) {
223        let dbfile = Arc::new(dbfile);
224        loop {
225            tokio::select! {
226                biased;
227
228                conn = endpoint.accept() => {
229                    if let Some(conn) = conn {
230                        let conn = match conn.accept() {
231                            Ok(conn) => conn,
232                            Err(err) => {
233                               warn!(context, "Failed to accept iroh connection: {err:#}.");
234                               continue;
235                            }
236                        };
237                        // Got a new in-progress connection.
238                        let context = context.clone();
239                        let auth_token = auth_token.clone();
240                        let dbfile = dbfile.clone();
241                        if let Err(err) = Self::handle_connection(context.clone(), conn, auth_token, dbfile).race(
242                            async {
243                                cancel_token.recv().await.ok();
244                                Err(format_err!("Backup transfer cancelled"))
245                            }
246                        ).race(
247                            async {
248                                drop_token.cancelled().await;
249                                Err(format_err!("Backup provider dropped"))
250                            }
251                        ).await {
252                            warn!(context, "Error while handling backup connection: {err:#}.");
253                            context.emit_event(EventType::ImexProgress(0));
254                            break;
255                        } else {
256                            info!(context, "Backup transfer finished successfully.");
257                            break;
258                        }
259                    } else {
260                        break;
261                    }
262                },
263                _ = cancel_token.recv() => {
264                    info!(context, "Backup transfer cancelled by the user, stopping accept loop.");
265                    context.emit_event(EventType::ImexProgress(0));
266                    break;
267                }
268                _ = drop_token.cancelled() => {
269                    info!(context, "Backup transfer cancelled by dropping the provider, stopping accept loop.");
270                    context.emit_event(EventType::ImexProgress(0));
271                    break;
272                }
273            }
274        }
275    }
276
277    /// Returns a QR code that allows fetching this backup.
278    ///
279    /// This QR code can be passed to [`get_backup`] on a (different) device.
280    pub fn qr(&self) -> Qr {
281        Qr::Backup2 {
282            node_addr: self.node_addr.clone(),
283
284            auth_token: self.auth_token.clone(),
285        }
286    }
287}
288
289impl Future for BackupProvider {
290    type Output = Result<()>;
291
292    /// Waits for the backup transfer to complete.
293    fn poll(mut self: Pin<&mut Self>, cx: &mut std::task::Context<'_>) -> Poll<Self::Output> {
294        Pin::new(&mut self.handle).poll(cx)?
295    }
296}
297
298pub async fn get_backup2(
299    context: &Context,
300    node_addr: iroh::NodeAddr,
301    auth_token: String,
302) -> Result<()> {
303    let relay_mode = RelayMode::Disabled;
304
305    let endpoint = Endpoint::builder()
306        .tls_x509() // For compatibility with iroh <0.34.0
307        .relay_mode(relay_mode)
308        .bind()
309        .await?;
310
311    let conn = endpoint.connect(node_addr, BACKUP_ALPN).await?;
312    let (mut send_stream, mut recv_stream) = conn.open_bi().await?;
313    info!(context, "Sending backup authentication token.");
314    send_stream.write_all(auth_token.as_bytes()).await?;
315
316    let passphrase = String::new();
317    info!(context, "Starting to read backup from the stream.");
318
319    let mut file_size_buf = [0u8; 8];
320    recv_stream.read_exact(&mut file_size_buf).await?;
321    let file_size = u64::from_be_bytes(file_size_buf);
322    info!(context, "Received backup file size.");
323    // Emit a nonzero progress so that UIs can display smth like "Transferring...".
324    context.emit_event(EventType::ImexProgress(1));
325
326    import_backup_stream(context, recv_stream, file_size, passphrase)
327        .await
328        .context("Failed to import backup from QUIC stream")?;
329    info!(context, "Finished importing backup from the stream.");
330    context.emit_event(EventType::ImexProgress(1000));
331
332    // Send an acknowledgement, but ignore the errors.
333    // We have imported backup successfully already.
334    send_stream.write_all(b".").await.ok();
335    send_stream.finish().ok();
336    info!(context, "Sent backup reception acknowledgment.");
337
338    // Wait for the peer to acknowledge reception of the acknowledgement
339    // before closing the connection.
340    _ = send_stream.stopped().await;
341
342    Ok(())
343}
344
345/// Contacts a backup provider and receives the backup from it.
346///
347/// This uses a QR code to contact another instance of deltachat which is providing a backup
348/// using the [`BackupProvider`].  Once connected it will authenticate using the secrets in
349/// the QR code and retrieve the backup.
350///
351/// This is a long running operation which will return only when completed.
352///
353/// Using [`Qr`] as argument is a bit odd as it only accepts specific variant of it.  It
354/// does avoid having [`iroh::NodeAddr`] in the primary API however, without
355/// having to revert to untyped bytes.
356pub async fn get_backup(context: &Context, qr: Qr) -> Result<()> {
357    match qr {
358        Qr::Backup2 {
359            node_addr,
360            auth_token,
361        } => {
362            let cancel_token = context.alloc_ongoing().await?;
363            let res = get_backup2(context, node_addr, auth_token)
364                .race(async {
365                    cancel_token.recv().await.ok();
366                    Err(format_err!("Backup reception cancelled"))
367                })
368                .await;
369            if res.is_err() {
370                context.emit_event(EventType::ImexProgress(0));
371            }
372            context.free_ongoing().await;
373            res?;
374        }
375        _ => bail!("QR code for backup must be of type DCBACKUP2"),
376    }
377    Ok(())
378}
379
#[cfg(test)]
mod tests {
    use std::time::Duration;

    use crate::chat::{get_chat_msgs, send_msg, ChatItem};
    use crate::message::Viewtype;
    use crate::test_utils::TestContextManager;

    use super::*;

    /// Transfers a backup from one device to an unconfigured second device and checks
    /// that the messages, the attachment and the progress events arrive.
    #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
    async fn test_send_receive() {
        let mut tcm = TestContextManager::new();

        // Create first device.
        let ctx0 = tcm.alice().await;

        // Write a message in the self chat
        let self_chat = ctx0.get_self_chat().await;
        let mut msg = Message::new_text("hi there".to_string());
        send_msg(&ctx0, self_chat.id, &mut msg).await.unwrap();

        // Send an attachment in the self chat
        let file = ctx0.get_blobdir().join("hello.txt");
        fs::write(&file, "i am attachment").await.unwrap();
        let mut msg = Message::new(Viewtype::File);
        msg.set_file_and_deduplicate(&ctx0, &file, Some("hello.txt"), Some("text/plain"))
            .unwrap();
        send_msg(&ctx0, self_chat.id, &mut msg).await.unwrap();

        // Prepare to transfer backup.
        let provider = BackupProvider::prepare(&ctx0).await.unwrap();

        // Set up second device.
        let ctx1 = tcm.unconfigured().await;
        get_backup(&ctx1, provider.qr()).await.unwrap();

        // Make sure the provider finishes without an error.
        tokio::time::timeout(Duration::from_secs(30), provider)
            .await
            .expect("timed out")
            .expect("error in provider");

        // Check that we have the self message.
        let self_chat = ctx1.get_self_chat().await;
        let msgs = get_chat_msgs(&ctx1, self_chat.id).await.unwrap();
        assert_eq!(msgs.len(), 2);
        let msgid = match msgs.first().unwrap() {
            ChatItem::Message { msg_id } => msg_id,
            _ => panic!("wrong chat item"),
        };
        let msg = Message::load_from_db(&ctx1, *msgid).await.unwrap();
        let text = msg.get_text();
        assert_eq!(text, "hi there");
        // The second chat item is the attachment message.
        let msgid = match msgs.get(1).unwrap() {
            ChatItem::Message { msg_id } => msg_id,
            _ => panic!("wrong chat item"),
        };
        let msg = Message::load_from_db(&ctx1, *msgid).await.unwrap();

        let path = msg.get_file(&ctx1).unwrap();
        assert_eq!(
            // That's the hash of the file:
            path.with_file_name("ac1d2d284757656a8d41dc40aae4136.txt"),
            path
        );
        assert_eq!("hello.txt", msg.get_filename().unwrap());
        let text = fs::read_to_string(&path).await.unwrap();
        assert_eq!(text, "i am attachment");

        // Saving the attachment works once; saving to the same path again must fail.
        let path = path.with_file_name("saved.txt");
        msg.save_file(&ctx1, &path).await.unwrap();
        let text = fs::read_to_string(&path).await.unwrap();
        assert_eq!(text, "i am attachment");
        assert!(msg.save_file(&ctx1, &path).await.is_err());

        // Check that both received the ImexProgress events.
        for ctx in [&ctx0, &ctx1] {
            ctx.evtracker
                .get_matching(|ev| matches!(ev, EventType::ImexProgress(1)))
                .await;
            ctx.evtracker
                .get_matching(|ev| matches!(ev, EventType::ImexProgress(1000)))
                .await;
        }
    }

    /// Dropping the provider must stop the accept loop, which reports `ImexProgress(0)`.
    #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
    async fn test_drop_provider() {
        let mut tcm = TestContextManager::new();
        let ctx = tcm.alice().await;

        let provider = BackupProvider::prepare(&ctx).await.unwrap();
        drop(provider);
        ctx.evtracker
            .get_matching(|ev| matches!(ev, EventType::ImexProgress(0)))
            .await;
    }
}