Skip to main content

deltachat/imex/
transfer.rs

//! Transfer a backup to another device.
2//!
3//! This module provides support for using [iroh](https://iroh.computer/)
4//! to initiate transfer of a backup to another device using a QR code.
5//!
6//! There are two parties to this:
7//! - The *Provider*, which starts a server and listens for connections.
8//! - The *Getter*, which connects to the server and retrieves the data.
9//!
10//! Both the provider and the getter are authenticated:
11//!
12//! - The provider is known by its *peer ID*.
13//! - The provider needs an *authentication token* from the getter before it accepts a
14//!   connection.
15//!
16//! Both these are transferred in the QR code offered to the getter.  This ensures that the
17//! getter can not connect to an impersonated provider and the provider does not offer the
18//! download to an impersonated getter.
19//!
//! The protocol starts with the getter opening a bidirectional QUIC stream
//! to the provider and sending the authentication token.
//! The provider verifies the received authentication token,
//! sends the total size of all files in the backup (database and all blobs)
//! as an unsigned 64-bit big-endian integer and streams the backup in tar format.
//! The getter receives the backup and acknowledges successful reception
//! by sending a single byte.
//! The provider closes the endpoint after receiving the acknowledgment.
28
29use std::future::Future;
30use std::pin::Pin;
31use std::sync::Arc;
32use std::task::Poll;
33
34use anyhow::{Context as _, Result, bail, format_err};
35use futures_lite::FutureExt;
36use iroh::{Endpoint, RelayMode};
37use tokio::fs;
38use tokio::task::JoinHandle;
39use tokio_util::sync::CancellationToken;
40
41use crate::EventType;
42use crate::chat::add_device_msg;
43use crate::context::Context;
44use crate::imex::BlobDirContents;
45use crate::key;
46use crate::log::warn;
47use crate::message::Message;
48use crate::qr::Qr;
49use crate::stock_str::backup_transfer_msg_body;
50use crate::tools::{TempPathGuard, create_id, time};
51
52use super::{DBFILE_BACKUP_NAME, export_backup_stream, export_database, import_backup_stream};
53
/// ALPN protocol identifier for the backup transfer protocol.
///
/// The provider registers it on its endpoint and the getter passes it when
/// connecting to the provider.
const BACKUP_ALPN: &[u8] = b"/deltachat/backup";
56
/// Provide or send a backup of this device.
///
/// This creates a backup of the current device and starts a service which offers another
/// device to download this backup.
///
/// This does not make a full backup on disk, only the SQLite database is created on disk,
/// the blobs in the blob directory are not copied.
///
/// This starts a task which acquires the global "ongoing" mutex.  If you need to stop the
/// task use the [`Context::stop_ongoing`] mechanism.
///
/// The provider is itself a future: awaiting it waits for the spawned task to finish.
/// Dropping the provider cancels the task.
#[derive(Debug)]
pub struct BackupProvider {
    /// iroh endpoint.
    ///
    /// Not read after construction; the accept task works on its own clone.
    _endpoint: Endpoint,

    /// iroh address of the endpoint, handed out in the QR code.
    node_addr: iroh::NodeAddr,

    /// Authentication token that should be submitted
    /// to retrieve the backup.
    auth_token: String,

    /// Handle for the task accepting backup transfer requests.
    ///
    /// Polled when the provider is awaited.
    handle: JoinHandle<Result<()>>,

    /// Guard to cancel the provider on drop.
    ///
    /// Belongs to the cancellation token the accept task is watching.
    _drop_guard: tokio_util::sync::DropGuard,
}
85
86impl BackupProvider {
87    /// Prepares for sending a backup to a second device.
88    ///
89    /// Before calling this function all I/O must be stopped so that no changes to the blobs
90    /// or database are happening, this is done by calling the [`Accounts::stop_io`] or
91    /// [`Context::stop_io`] APIs first.
92    ///
93    /// This will acquire the global "ongoing process" mutex, which can be used to cancel
94    /// the process.
95    ///
96    /// [`Accounts::stop_io`]: crate::accounts::Accounts::stop_io
97    pub async fn prepare(context: &Context) -> Result<Self> {
98        let relay_mode = RelayMode::Disabled;
99        let endpoint = Endpoint::builder()
100            .tls_x509() // For compatibility with iroh <0.34.0
101            .alpns(vec![BACKUP_ALPN.to_vec()])
102            .relay_mode(relay_mode)
103            .bind()
104            .await?;
105        let node_addr = endpoint.node_addr().await?;
106
107        // Acquire global "ongoing" mutex.
108        let cancel_token = context.alloc_ongoing().await?;
109        let paused_guard = context.scheduler.pause(context).await?;
110        let context_dir = context
111            .get_blobdir()
112            .parent()
113            .context("Context dir not found")?;
114
115        // before we export, make sure the private key exists
116        key::ensure_secret_key_exists(context)
117            .await
118            .context("Cannot create private key or private key not available")?;
119
120        let dbfile = context_dir.join(DBFILE_BACKUP_NAME);
121        if fs::metadata(&dbfile).await.is_ok() {
122            fs::remove_file(&dbfile).await?;
123            warn!(context, "Previous database export deleted");
124        }
125        let dbfile = TempPathGuard::new(dbfile);
126
127        // Authentication token that receiver should send us to receive a backup.
128        let auth_token = create_id();
129
130        let passphrase = String::new();
131
132        export_database(context, &dbfile, passphrase, time())
133            .await
134            .context("Database export failed")?;
135
136        let drop_token = CancellationToken::new();
137        let handle = {
138            let context = context.clone();
139            let drop_token = drop_token.clone();
140            let endpoint = endpoint.clone();
141            let auth_token = auth_token.clone();
142            tokio::spawn(async move {
143                Self::accept_loop(
144                    context.clone(),
145                    endpoint,
146                    auth_token,
147                    cancel_token,
148                    drop_token,
149                    dbfile,
150                )
151                .await;
152                info!(context, "Finished accept loop.");
153
154                context.free_ongoing().await;
155
156                // Explicit drop to move the guards into this future
157                drop(paused_guard);
158                Ok(())
159            })
160        };
161        Ok(Self {
162            _endpoint: endpoint,
163            node_addr,
164            auth_token,
165            handle,
166            _drop_guard: drop_token.drop_guard(),
167        })
168    }
169
170    async fn handle_connection(
171        context: Context,
172        conn: iroh::endpoint::Connecting,
173        auth_token: String,
174        dbfile: Arc<TempPathGuard>,
175    ) -> Result<()> {
176        let conn = conn.await?;
177        let (mut send_stream, mut recv_stream) = conn.accept_bi().await?;
178
179        // Read authentication token from the stream.
180        let mut received_auth_token = vec![0u8; auth_token.len()];
181        recv_stream.read_exact(&mut received_auth_token).await?;
182        if received_auth_token.as_slice() != auth_token.as_bytes() {
183            warn!(context, "Received wrong backup authentication token.");
184            return Ok(());
185        }
186
187        info!(context, "Received valid backup authentication token.");
188        // Emit a nonzero progress so that UIs can display smth like "Transferring...".
189        context.emit_event(EventType::ImexProgress(1));
190
191        let blobdir = BlobDirContents::new(&context).await?;
192
193        let mut file_size = dbfile.metadata()?.len();
194        for blob in blobdir.iter() {
195            file_size = file_size
196                .checked_add(blob.to_abs_path().metadata()?.len())
197                .context("File size overflow")?;
198        }
199
200        send_stream.write_all(&file_size.to_be_bytes()).await?;
201
202        export_backup_stream(&context, &dbfile, blobdir, send_stream, file_size)
203            .await
204            .context("Failed to write backup into QUIC stream")?;
205        info!(context, "Finished writing backup into QUIC stream.");
206        let mut buf = [0u8; 1];
207        info!(context, "Waiting for acknowledgment.");
208        recv_stream.read_exact(&mut buf).await?;
209        info!(context, "Received backup reception acknowledgement.");
210        context.emit_event(EventType::ImexProgress(1000));
211
212        let mut msg = Message::new_text(backup_transfer_msg_body(&context));
213        add_device_msg(&context, None, Some(&mut msg)).await?;
214
215        Ok(())
216    }
217
218    async fn accept_loop(
219        context: Context,
220        endpoint: Endpoint,
221        auth_token: String,
222        cancel_token: async_channel::Receiver<()>,
223        drop_token: CancellationToken,
224        dbfile: TempPathGuard,
225    ) {
226        let dbfile = Arc::new(dbfile);
227        loop {
228            tokio::select! {
229                biased;
230
231                conn = endpoint.accept() => {
232                    if let Some(conn) = conn {
233                        let conn = match conn.accept() {
234                            Ok(conn) => conn,
235                            Err(err) => {
236                               warn!(context, "Failed to accept iroh connection: {err:#}.");
237                               continue;
238                            }
239                        };
240                        // Got a new in-progress connection.
241                        let context = context.clone();
242                        let auth_token = auth_token.clone();
243                        let dbfile = dbfile.clone();
244                        if let Err(err) = Self::handle_connection(context.clone(), conn, auth_token, dbfile).race(
245                            async {
246                                cancel_token.recv().await.ok();
247                                Err(format_err!("Backup transfer canceled"))
248                            }
249                        ).race(
250                            async {
251                                drop_token.cancelled().await;
252                                Err(format_err!("Backup provider dropped"))
253                            }
254                        ).await {
255                            error!(context, "Error while handling backup connection: {err:#}.");
256                            context.emit_event(EventType::ImexProgress(0));
257                            break;
258                        } else {
259                            info!(context, "Backup transfer finished successfully.");
260                            break;
261                        }
262                    } else {
263                        break;
264                    }
265                },
266                _ = cancel_token.recv() => {
267                    info!(context, "Backup transfer canceled by the user, stopping accept loop.");
268                    context.emit_event(EventType::ImexProgress(0));
269                    break;
270                }
271                _ = drop_token.cancelled() => {
272                    info!(context, "Backup transfer canceled by dropping the provider, stopping accept loop.");
273                    context.emit_event(EventType::ImexProgress(0));
274                    break;
275                }
276            }
277        }
278    }
279
280    /// Returns a QR code that allows fetching this backup.
281    ///
282    /// This QR code can be passed to [`get_backup`] on a (different) device.
283    pub fn qr(&self) -> Qr {
284        Qr::Backup2 {
285            node_addr: self.node_addr.clone(),
286
287            auth_token: self.auth_token.clone(),
288        }
289    }
290}
291
292impl Future for BackupProvider {
293    type Output = Result<()>;
294
295    /// Waits for the backup transfer to complete.
296    fn poll(mut self: Pin<&mut Self>, cx: &mut std::task::Context<'_>) -> Poll<Self::Output> {
297        Pin::new(&mut self.handle).poll(cx)?
298    }
299}
300
301pub async fn get_backup2(
302    context: &Context,
303    node_addr: iroh::NodeAddr,
304    auth_token: String,
305) -> Result<()> {
306    let relay_mode = RelayMode::Disabled;
307
308    let endpoint = Endpoint::builder()
309        .tls_x509() // For compatibility with iroh <0.34.0
310        .relay_mode(relay_mode)
311        .bind()
312        .await?;
313
314    let conn = endpoint.connect(node_addr, BACKUP_ALPN).await?;
315    let (mut send_stream, mut recv_stream) = conn.open_bi().await?;
316    info!(context, "Sending backup authentication token.");
317    send_stream.write_all(auth_token.as_bytes()).await?;
318
319    let passphrase = String::new();
320    info!(context, "Starting to read backup from the stream.");
321
322    let mut file_size_buf = [0u8; 8];
323    recv_stream.read_exact(&mut file_size_buf).await?;
324    let file_size = u64::from_be_bytes(file_size_buf);
325    info!(context, "Received backup file size.");
326    // Emit a nonzero progress so that UIs can display smth like "Transferring...".
327    context.emit_event(EventType::ImexProgress(1));
328
329    import_backup_stream(context, recv_stream, file_size, passphrase)
330        .await
331        .context("Failed to import backup from QUIC stream")?;
332    info!(context, "Finished importing backup from the stream.");
333    context.emit_event(EventType::ImexProgress(1000));
334
335    // Send an acknowledgement, but ignore the errors.
336    // We have imported backup successfully already.
337    send_stream.write_all(b".").await.ok();
338    send_stream.finish().ok();
339    info!(context, "Sent backup reception acknowledgment.");
340
341    // Wait for the peer to acknowledge reception of the acknowledgement
342    // before closing the connection.
343    _ = send_stream.stopped().await;
344
345    Ok(())
346}
347
348/// Contacts a backup provider and receives the backup from it.
349///
350/// This uses a QR code to contact another instance of deltachat which is providing a backup
351/// using the [`BackupProvider`].  Once connected it will authenticate using the secrets in
352/// the QR code and retrieve the backup.
353///
354/// This is a long running operation which will return only when completed.
355///
356/// Using [`Qr`] as argument is a bit odd as it only accepts specific variant of it.  It
357/// does avoid having [`iroh::NodeAddr`] in the primary API however, without
358/// having to revert to untyped bytes.
359pub async fn get_backup(context: &Context, qr: Qr) -> Result<()> {
360    match qr {
361        Qr::Backup2 {
362            node_addr,
363            auth_token,
364        } => {
365            let cancel_token = context.alloc_ongoing().await?;
366            let res = get_backup2(context, node_addr, auth_token)
367                .race(async {
368                    cancel_token.recv().await.ok();
369                    Err(format_err!("Backup reception canceled"))
370                })
371                .await;
372            if let Err(ref res) = res {
373                error!(context, "{:#}", res);
374                context.emit_event(EventType::ImexProgress(0));
375            }
376            context.free_ongoing().await;
377            res?;
378        }
379        _ => bail!("QR code for backup must be of type DCBACKUP2"),
380    }
381    Ok(())
382}
383
#[cfg(test)]
mod tests {
    use std::time::Duration;

    use crate::chat::{ChatItem, get_chat_msgs, send_msg};
    use crate::message::Viewtype;
    use crate::test_utils::TestContextManager;

    use super::*;

    /// Transfers a profile with a text message and an attachment to a fresh device
    /// and checks that both arrive intact.
    #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
    async fn test_send_receive() {
        let mut tcm = TestContextManager::new();

        // The device that provides the backup.
        let sender = tcm.alice().await;

        // Put a text message into the self chat...
        let self_chat = sender.get_self_chat().await;
        let mut text_msg = Message::new_text("hi there".to_string());
        send_msg(&sender, self_chat.id, &mut text_msg).await.unwrap();

        // ...followed by an attachment.
        let attachment = sender.get_blobdir().join("hello.txt");
        fs::write(&attachment, "i am attachment").await.unwrap();
        let mut file_msg = Message::new(Viewtype::File);
        file_msg
            .set_file_and_deduplicate(&sender, &attachment, Some("hello.txt"), Some("text/plain"))
            .unwrap();
        send_msg(&sender, self_chat.id, &mut file_msg).await.unwrap();

        // Offer the backup.
        let provider = BackupProvider::prepare(&sender).await.unwrap();

        // The device that fetches the backup.
        let receiver = tcm.unconfigured().await;
        get_backup(&receiver, provider.qr()).await.unwrap();

        // The provider has to finish without an error as well.
        tokio::time::timeout(Duration::from_secs(30), provider)
            .await
            .expect("timed out")
            .expect("error in provider");

        // Both messages must be present in the self chat of the receiver.
        let self_chat = receiver.get_self_chat().await;
        let msgs = get_chat_msgs(&receiver, self_chat.id).await.unwrap();
        assert_eq!(msgs.len(), 2);
        let mut items = msgs.iter();

        let Some(ChatItem::Message { msg_id }) = items.next() else {
            panic!("wrong chat item");
        };
        let received_text = Message::load_from_db(&receiver, *msg_id).await.unwrap();
        assert_eq!(received_text.get_text(), "hi there");

        let Some(ChatItem::Message { msg_id }) = items.next() else {
            panic!("wrong chat item");
        };
        let received_file = Message::load_from_db(&receiver, *msg_id).await.unwrap();

        let blob_path = received_file.get_file(&receiver).unwrap();
        assert_eq!(
            // That's the hash of the file:
            blob_path.with_file_name("ac1d2d284757656a8d41dc40aae4136.txt"),
            blob_path
        );
        assert_eq!("hello.txt", received_file.get_filename().unwrap());
        let content = fs::read_to_string(&blob_path).await.unwrap();
        assert_eq!(content, "i am attachment");

        // Saving the attachment works once and refuses to overwrite the saved copy.
        let saved_path = blob_path.with_file_name("saved.txt");
        received_file.save_file(&receiver, &saved_path).await.unwrap();
        let content = fs::read_to_string(&saved_path).await.unwrap();
        assert_eq!(content, "i am attachment");
        assert!(received_file.save_file(&receiver, &saved_path).await.is_err());

        // Sender and receiver must both have reported the start and the end of the transfer.
        for ctx in [&sender, &receiver] {
            ctx.evtracker
                .get_matching(|ev| matches!(ev, EventType::ImexProgress(1)))
                .await;
            ctx.evtracker
                .get_matching(|ev| matches!(ev, EventType::ImexProgress(1000)))
                .await;
        }
    }

    /// Tests that trying to accidentally overwrite a profile
    /// that is in use will fail.
    #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
    async fn test_cant_overwrite_profile_in_use() -> Result<()> {
        let mut tcm = TestContextManager::new();
        let alice = tcm.alice().await;
        let bob = tcm.bob().await;

        // Alice offers her backup.
        let provider = BackupProvider::prepare(&alice).await?;

        // Bob already has a configured profile, so importing must be refused.
        let err = get_backup(&bob, provider.qr()).await.unwrap_err();
        assert!(format!("{err:#}").contains("Cannot import backups to accounts in use"));

        // The provider is supposed to finish too, and to emit an error:
        provider.await.unwrap();
        alice
            .evtracker
            .get_matching(|e| matches!(e, EventType::Error(_)))
            .await;

        // Bob's profile is untouched.
        assert_eq!(bob.get_primary_self_addr().await?, "bob@example.net");

        Ok(())
    }

    /// Dropping the provider must abort the transfer and report it as failed.
    #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
    async fn test_drop_provider() {
        let mut tcm = TestContextManager::new();
        let alice = tcm.alice().await;

        let provider = BackupProvider::prepare(&alice).await.unwrap();
        drop(provider);
        alice
            .evtracker
            .get_matching(|ev| matches!(ev, EventType::ImexProgress(0)))
            .await;
    }
}