deltachat/imex/
transfer.rs

//! Transfer a backup to another device.
//!
//! This module provides support for using [iroh](https://iroh.computer/)
//! to initiate transfer of a backup to another device using a QR code.
//!
//! There are two parties to this:
//! - The *Provider*, which starts a server and listens for connections.
//! - The *Getter*, which connects to the server and retrieves the data.
//!
//! Both the provider and the getter are authenticated:
//!
//! - The provider is known by its *peer ID*.
//! - The provider needs an *authentication token* from the getter before it accepts a
//!   connection.
//!
//! Both of these are transferred in the QR code offered to the getter.  This ensures that the
//! getter cannot connect to an impersonated provider and the provider does not offer the
//! download to an impersonated getter.
//!
//! The protocol starts with the getter opening a bidirectional QUIC stream
//! to the provider and sending the authentication token.
//! The provider verifies the received authentication token,
//! sends the size of all files in the backup (database and all blobs)
//! as an unsigned 64-bit big-endian integer and streams the backup in tar format.
//! The getter receives the backup and acknowledges successful reception
//! by sending a single byte.
//! The provider closes the endpoint after receiving the acknowledgment.
28
29use std::future::Future;
30use std::pin::Pin;
31use std::sync::Arc;
32use std::task::Poll;
33
34use anyhow::{Context as _, Result, bail, format_err};
35use futures_lite::FutureExt;
36use iroh::{Endpoint, RelayMode};
37use tokio::fs;
38use tokio::task::JoinHandle;
39use tokio_util::sync::CancellationToken;
40
41use crate::chat::add_device_msg;
42use crate::context::Context;
43use crate::imex::BlobDirContents;
44use crate::log::warn;
45use crate::message::Message;
46use crate::qr::Qr;
47use crate::stock_str::backup_transfer_msg_body;
48use crate::tools::{TempPathGuard, create_id, time};
49use crate::{EventType, e2ee};
50
51use super::{DBFILE_BACKUP_NAME, export_backup_stream, export_database, import_backup_stream};
52
/// ALPN protocol identifier both sides use for the backup transfer protocol.
const BACKUP_ALPN: &[u8] = "/deltachat/backup".as_bytes();
55
56/// Provide or send a backup of this device.
57///
58/// This creates a backup of the current device and starts a service which offers another
59/// device to download this backup.
60///
61/// This does not make a full backup on disk, only the SQLite database is created on disk,
62/// the blobs in the blob directory are not copied.
63///
64/// This starts a task which acquires the global "ongoing" mutex.  If you need to stop the
65/// task use the [`Context::stop_ongoing`] mechanism.
66#[derive(Debug)]
67pub struct BackupProvider {
68    /// iroh endpoint.
69    _endpoint: Endpoint,
70
71    /// iroh address.
72    node_addr: iroh::NodeAddr,
73
74    /// Authentication token that should be submitted
75    /// to retrieve the backup.
76    auth_token: String,
77
78    /// Handle for the task accepting backup transfer requests.
79    handle: JoinHandle<Result<()>>,
80
81    /// Guard to cancel the provider on drop.
82    _drop_guard: tokio_util::sync::DropGuard,
83}
84
85impl BackupProvider {
86    /// Prepares for sending a backup to a second device.
87    ///
88    /// Before calling this function all I/O must be stopped so that no changes to the blobs
89    /// or database are happening, this is done by calling the [`Accounts::stop_io`] or
90    /// [`Context::stop_io`] APIs first.
91    ///
92    /// This will acquire the global "ongoing process" mutex, which can be used to cancel
93    /// the process.
94    ///
95    /// [`Accounts::stop_io`]: crate::accounts::Accounts::stop_io
96    pub async fn prepare(context: &Context) -> Result<Self> {
97        let relay_mode = RelayMode::Disabled;
98        let endpoint = Endpoint::builder()
99            .tls_x509() // For compatibility with iroh <0.34.0
100            .alpns(vec![BACKUP_ALPN.to_vec()])
101            .relay_mode(relay_mode)
102            .bind()
103            .await?;
104        let node_addr = endpoint.node_addr().await?;
105
106        // Acquire global "ongoing" mutex.
107        let cancel_token = context.alloc_ongoing().await?;
108        let paused_guard = context.scheduler.pause(context).await?;
109        let context_dir = context
110            .get_blobdir()
111            .parent()
112            .context("Context dir not found")?;
113
114        // before we export, make sure the private key exists
115        e2ee::ensure_secret_key_exists(context)
116            .await
117            .context("Cannot create private key or private key not available")?;
118
119        let dbfile = context_dir.join(DBFILE_BACKUP_NAME);
120        if fs::metadata(&dbfile).await.is_ok() {
121            fs::remove_file(&dbfile).await?;
122            warn!(context, "Previous database export deleted");
123        }
124        let dbfile = TempPathGuard::new(dbfile);
125
126        // Authentication token that receiver should send us to receive a backup.
127        let auth_token = create_id();
128
129        let passphrase = String::new();
130
131        export_database(context, &dbfile, passphrase, time())
132            .await
133            .context("Database export failed")?;
134
135        let drop_token = CancellationToken::new();
136        let handle = {
137            let context = context.clone();
138            let drop_token = drop_token.clone();
139            let endpoint = endpoint.clone();
140            let auth_token = auth_token.clone();
141            tokio::spawn(async move {
142                Self::accept_loop(
143                    context.clone(),
144                    endpoint,
145                    auth_token,
146                    cancel_token,
147                    drop_token,
148                    dbfile,
149                )
150                .await;
151                info!(context, "Finished accept loop.");
152
153                context.free_ongoing().await;
154
155                // Explicit drop to move the guards into this future
156                drop(paused_guard);
157                Ok(())
158            })
159        };
160        Ok(Self {
161            _endpoint: endpoint,
162            node_addr,
163            auth_token,
164            handle,
165            _drop_guard: drop_token.drop_guard(),
166        })
167    }
168
169    async fn handle_connection(
170        context: Context,
171        conn: iroh::endpoint::Connecting,
172        auth_token: String,
173        dbfile: Arc<TempPathGuard>,
174    ) -> Result<()> {
175        let conn = conn.await?;
176        let (mut send_stream, mut recv_stream) = conn.accept_bi().await?;
177
178        // Read authentication token from the stream.
179        let mut received_auth_token = vec![0u8; auth_token.len()];
180        recv_stream.read_exact(&mut received_auth_token).await?;
181        if received_auth_token.as_slice() != auth_token.as_bytes() {
182            warn!(context, "Received wrong backup authentication token.");
183            return Ok(());
184        }
185
186        info!(context, "Received valid backup authentication token.");
187        // Emit a nonzero progress so that UIs can display smth like "Transferring...".
188        context.emit_event(EventType::ImexProgress(1));
189
190        let blobdir = BlobDirContents::new(&context).await?;
191
192        let mut file_size = dbfile.metadata()?.len();
193        for blob in blobdir.iter() {
194            file_size = file_size
195                .checked_add(blob.to_abs_path().metadata()?.len())
196                .context("File size overflow")?;
197        }
198
199        send_stream.write_all(&file_size.to_be_bytes()).await?;
200
201        export_backup_stream(&context, &dbfile, blobdir, send_stream, file_size)
202            .await
203            .context("Failed to write backup into QUIC stream")?;
204        info!(context, "Finished writing backup into QUIC stream.");
205        let mut buf = [0u8; 1];
206        info!(context, "Waiting for acknowledgment.");
207        recv_stream.read_exact(&mut buf).await?;
208        info!(context, "Received backup reception acknowledgement.");
209        context.emit_event(EventType::ImexProgress(1000));
210
211        let mut msg = Message::new_text(backup_transfer_msg_body(&context).await);
212        add_device_msg(&context, None, Some(&mut msg)).await?;
213
214        Ok(())
215    }
216
217    async fn accept_loop(
218        context: Context,
219        endpoint: Endpoint,
220        auth_token: String,
221        cancel_token: async_channel::Receiver<()>,
222        drop_token: CancellationToken,
223        dbfile: TempPathGuard,
224    ) {
225        let dbfile = Arc::new(dbfile);
226        loop {
227            tokio::select! {
228                biased;
229
230                conn = endpoint.accept() => {
231                    if let Some(conn) = conn {
232                        let conn = match conn.accept() {
233                            Ok(conn) => conn,
234                            Err(err) => {
235                               warn!(context, "Failed to accept iroh connection: {err:#}.");
236                               continue;
237                            }
238                        };
239                        // Got a new in-progress connection.
240                        let context = context.clone();
241                        let auth_token = auth_token.clone();
242                        let dbfile = dbfile.clone();
243                        if let Err(err) = Self::handle_connection(context.clone(), conn, auth_token, dbfile).race(
244                            async {
245                                cancel_token.recv().await.ok();
246                                Err(format_err!("Backup transfer canceled"))
247                            }
248                        ).race(
249                            async {
250                                drop_token.cancelled().await;
251                                Err(format_err!("Backup provider dropped"))
252                            }
253                        ).await {
254                            error!(context, "Error while handling backup connection: {err:#}.");
255                            context.emit_event(EventType::ImexProgress(0));
256                            break;
257                        } else {
258                            info!(context, "Backup transfer finished successfully.");
259                            break;
260                        }
261                    } else {
262                        break;
263                    }
264                },
265                _ = cancel_token.recv() => {
266                    info!(context, "Backup transfer canceled by the user, stopping accept loop.");
267                    context.emit_event(EventType::ImexProgress(0));
268                    break;
269                }
270                _ = drop_token.cancelled() => {
271                    info!(context, "Backup transfer canceled by dropping the provider, stopping accept loop.");
272                    context.emit_event(EventType::ImexProgress(0));
273                    break;
274                }
275            }
276        }
277    }
278
279    /// Returns a QR code that allows fetching this backup.
280    ///
281    /// This QR code can be passed to [`get_backup`] on a (different) device.
282    pub fn qr(&self) -> Qr {
283        Qr::Backup2 {
284            node_addr: self.node_addr.clone(),
285
286            auth_token: self.auth_token.clone(),
287        }
288    }
289}
290
291impl Future for BackupProvider {
292    type Output = Result<()>;
293
294    /// Waits for the backup transfer to complete.
295    fn poll(mut self: Pin<&mut Self>, cx: &mut std::task::Context<'_>) -> Poll<Self::Output> {
296        Pin::new(&mut self.handle).poll(cx)?
297    }
298}
299
300pub async fn get_backup2(
301    context: &Context,
302    node_addr: iroh::NodeAddr,
303    auth_token: String,
304) -> Result<()> {
305    let relay_mode = RelayMode::Disabled;
306
307    let endpoint = Endpoint::builder()
308        .tls_x509() // For compatibility with iroh <0.34.0
309        .relay_mode(relay_mode)
310        .bind()
311        .await?;
312
313    let conn = endpoint.connect(node_addr, BACKUP_ALPN).await?;
314    let (mut send_stream, mut recv_stream) = conn.open_bi().await?;
315    info!(context, "Sending backup authentication token.");
316    send_stream.write_all(auth_token.as_bytes()).await?;
317
318    let passphrase = String::new();
319    info!(context, "Starting to read backup from the stream.");
320
321    let mut file_size_buf = [0u8; 8];
322    recv_stream.read_exact(&mut file_size_buf).await?;
323    let file_size = u64::from_be_bytes(file_size_buf);
324    info!(context, "Received backup file size.");
325    // Emit a nonzero progress so that UIs can display smth like "Transferring...".
326    context.emit_event(EventType::ImexProgress(1));
327
328    import_backup_stream(context, recv_stream, file_size, passphrase)
329        .await
330        .context("Failed to import backup from QUIC stream")?;
331    info!(context, "Finished importing backup from the stream.");
332    context.emit_event(EventType::ImexProgress(1000));
333
334    // Send an acknowledgement, but ignore the errors.
335    // We have imported backup successfully already.
336    send_stream.write_all(b".").await.ok();
337    send_stream.finish().ok();
338    info!(context, "Sent backup reception acknowledgment.");
339
340    // Wait for the peer to acknowledge reception of the acknowledgement
341    // before closing the connection.
342    _ = send_stream.stopped().await;
343
344    Ok(())
345}
346
347/// Contacts a backup provider and receives the backup from it.
348///
349/// This uses a QR code to contact another instance of deltachat which is providing a backup
350/// using the [`BackupProvider`].  Once connected it will authenticate using the secrets in
351/// the QR code and retrieve the backup.
352///
353/// This is a long running operation which will return only when completed.
354///
355/// Using [`Qr`] as argument is a bit odd as it only accepts specific variant of it.  It
356/// does avoid having [`iroh::NodeAddr`] in the primary API however, without
357/// having to revert to untyped bytes.
358pub async fn get_backup(context: &Context, qr: Qr) -> Result<()> {
359    match qr {
360        Qr::Backup2 {
361            node_addr,
362            auth_token,
363        } => {
364            let cancel_token = context.alloc_ongoing().await?;
365            let res = get_backup2(context, node_addr, auth_token)
366                .race(async {
367                    cancel_token.recv().await.ok();
368                    Err(format_err!("Backup reception canceled"))
369                })
370                .await;
371            if let Err(ref res) = res {
372                error!(context, "{:#}", res);
373                context.emit_event(EventType::ImexProgress(0));
374            }
375            context.free_ongoing().await;
376            res?;
377        }
378        _ => bail!("QR code for backup must be of type DCBACKUP2"),
379    }
380    Ok(())
381}
382
383#[cfg(test)]
384mod tests {
385    use std::time::Duration;
386
387    use crate::chat::{ChatItem, get_chat_msgs, send_msg};
388    use crate::message::Viewtype;
389    use crate::test_utils::TestContextManager;
390
391    use super::*;
392
393    #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
394    async fn test_send_receive() {
395        let mut tcm = TestContextManager::new();
396
397        // Create first device.
398        let ctx0 = tcm.alice().await;
399
400        // Write a message in the self chat
401        let self_chat = ctx0.get_self_chat().await;
402        let mut msg = Message::new_text("hi there".to_string());
403        send_msg(&ctx0, self_chat.id, &mut msg).await.unwrap();
404
405        // Send an attachment in the self chat
406        let file = ctx0.get_blobdir().join("hello.txt");
407        fs::write(&file, "i am attachment").await.unwrap();
408        let mut msg = Message::new(Viewtype::File);
409        msg.set_file_and_deduplicate(&ctx0, &file, Some("hello.txt"), Some("text/plain"))
410            .unwrap();
411        send_msg(&ctx0, self_chat.id, &mut msg).await.unwrap();
412
413        // Prepare to transfer backup.
414        let provider = BackupProvider::prepare(&ctx0).await.unwrap();
415
416        // Set up second device.
417        let ctx1 = tcm.unconfigured().await;
418        get_backup(&ctx1, provider.qr()).await.unwrap();
419
420        // Make sure the provider finishes without an error.
421        tokio::time::timeout(Duration::from_secs(30), provider)
422            .await
423            .expect("timed out")
424            .expect("error in provider");
425
426        // Check that we have the self message.
427        let self_chat = ctx1.get_self_chat().await;
428        let msgs = get_chat_msgs(&ctx1, self_chat.id).await.unwrap();
429        assert_eq!(msgs.len(), 2);
430        let msgid = match msgs.first().unwrap() {
431            ChatItem::Message { msg_id } => msg_id,
432            _ => panic!("wrong chat item"),
433        };
434        let msg = Message::load_from_db(&ctx1, *msgid).await.unwrap();
435        let text = msg.get_text();
436        assert_eq!(text, "hi there");
437        let msgid = match msgs.get(1).unwrap() {
438            ChatItem::Message { msg_id } => msg_id,
439            _ => panic!("wrong chat item"),
440        };
441        let msg = Message::load_from_db(&ctx1, *msgid).await.unwrap();
442
443        let path = msg.get_file(&ctx1).unwrap();
444        assert_eq!(
445            // That's the hash of the file:
446            path.with_file_name("ac1d2d284757656a8d41dc40aae4136.txt"),
447            path
448        );
449        assert_eq!("hello.txt", msg.get_filename().unwrap());
450        let text = fs::read_to_string(&path).await.unwrap();
451        assert_eq!(text, "i am attachment");
452
453        let path = path.with_file_name("saved.txt");
454        msg.save_file(&ctx1, &path).await.unwrap();
455        let text = fs::read_to_string(&path).await.unwrap();
456        assert_eq!(text, "i am attachment");
457        assert!(msg.save_file(&ctx1, &path).await.is_err());
458
459        // Check that both received the ImexProgress events.
460        for ctx in [&ctx0, &ctx1] {
461            ctx.evtracker
462                .get_matching(|ev| matches!(ev, EventType::ImexProgress(1)))
463                .await;
464            ctx.evtracker
465                .get_matching(|ev| matches!(ev, EventType::ImexProgress(1000)))
466                .await;
467        }
468    }
469
470    /// Tests that trying to accidentally overwrite a profile
471    /// that is in use will fail.
472    #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
473    async fn test_cant_overwrite_profile_in_use() -> Result<()> {
474        let mut tcm = TestContextManager::new();
475        let ctx0 = &tcm.alice().await;
476        let ctx1 = &tcm.bob().await;
477
478        // Prepare to transfer backup.
479        let provider = BackupProvider::prepare(ctx0).await?;
480
481        // Try to overwrite an existing profile.
482        let err = get_backup(ctx1, provider.qr()).await.unwrap_err();
483        assert!(format!("{err:#}").contains("Cannot import backups to accounts in use"));
484
485        // ctx0 is supposed to also finish, and emit an error:
486        provider.await.unwrap();
487        ctx0.evtracker
488            .get_matching(|e| matches!(e, EventType::Error(_)))
489            .await;
490
491        assert_eq!(ctx1.get_primary_self_addr().await?, "bob@example.net");
492
493        Ok(())
494    }
495
496    #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
497    async fn test_drop_provider() {
498        let mut tcm = TestContextManager::new();
499        let ctx = tcm.alice().await;
500
501        let provider = BackupProvider::prepare(&ctx).await.unwrap();
502        drop(provider);
503        ctx.evtracker
504            .get_matching(|ev| matches!(ev, EventType::ImexProgress(0)))
505            .await;
506    }
507}