1use std::future::Future;
30use std::pin::Pin;
31use std::sync::Arc;
32use std::task::Poll;
33
34use anyhow::{Context as _, Result, bail, format_err};
35use futures_lite::FutureExt;
36use iroh::{Endpoint, RelayMode};
37use tokio::fs;
38use tokio::task::JoinHandle;
39use tokio_util::sync::CancellationToken;
40
41use crate::chat::add_device_msg;
42use crate::context::Context;
43use crate::imex::BlobDirContents;
44use crate::log::warn;
45use crate::message::Message;
46use crate::qr::Qr;
47use crate::stock_str::backup_transfer_msg_body;
48use crate::tools::{TempPathGuard, create_id, time};
49use crate::{EventType, e2ee};
50
51use super::{DBFILE_BACKUP_NAME, export_backup_stream, export_database, import_backup_stream};
52
53const BACKUP_ALPN: &[u8] = b"/deltachat/backup";
55
56#[derive(Debug)]
67pub struct BackupProvider {
68 _endpoint: Endpoint,
70
71 node_addr: iroh::NodeAddr,
73
74 auth_token: String,
77
78 handle: JoinHandle<Result<()>>,
80
81 _drop_guard: tokio_util::sync::DropGuard,
83}
84
85impl BackupProvider {
86 pub async fn prepare(context: &Context) -> Result<Self> {
97 let relay_mode = RelayMode::Disabled;
98 let endpoint = Endpoint::builder()
99 .tls_x509() .alpns(vec![BACKUP_ALPN.to_vec()])
101 .relay_mode(relay_mode)
102 .bind()
103 .await?;
104 let node_addr = endpoint.node_addr().await?;
105
106 let cancel_token = context.alloc_ongoing().await?;
108 let paused_guard = context.scheduler.pause(context).await?;
109 let context_dir = context
110 .get_blobdir()
111 .parent()
112 .context("Context dir not found")?;
113
114 e2ee::ensure_secret_key_exists(context)
116 .await
117 .context("Cannot create private key or private key not available")?;
118
119 let dbfile = context_dir.join(DBFILE_BACKUP_NAME);
120 if fs::metadata(&dbfile).await.is_ok() {
121 fs::remove_file(&dbfile).await?;
122 warn!(context, "Previous database export deleted");
123 }
124 let dbfile = TempPathGuard::new(dbfile);
125
126 let auth_token = create_id();
128
129 let passphrase = String::new();
130
131 export_database(context, &dbfile, passphrase, time())
132 .await
133 .context("Database export failed")?;
134
135 let drop_token = CancellationToken::new();
136 let handle = {
137 let context = context.clone();
138 let drop_token = drop_token.clone();
139 let endpoint = endpoint.clone();
140 let auth_token = auth_token.clone();
141 tokio::spawn(async move {
142 Self::accept_loop(
143 context.clone(),
144 endpoint,
145 auth_token,
146 cancel_token,
147 drop_token,
148 dbfile,
149 )
150 .await;
151 info!(context, "Finished accept loop.");
152
153 context.free_ongoing().await;
154
155 drop(paused_guard);
157 Ok(())
158 })
159 };
160 Ok(Self {
161 _endpoint: endpoint,
162 node_addr,
163 auth_token,
164 handle,
165 _drop_guard: drop_token.drop_guard(),
166 })
167 }
168
169 async fn handle_connection(
170 context: Context,
171 conn: iroh::endpoint::Connecting,
172 auth_token: String,
173 dbfile: Arc<TempPathGuard>,
174 ) -> Result<()> {
175 let conn = conn.await?;
176 let (mut send_stream, mut recv_stream) = conn.accept_bi().await?;
177
178 let mut received_auth_token = vec![0u8; auth_token.len()];
180 recv_stream.read_exact(&mut received_auth_token).await?;
181 if received_auth_token.as_slice() != auth_token.as_bytes() {
182 warn!(context, "Received wrong backup authentication token.");
183 return Ok(());
184 }
185
186 info!(context, "Received valid backup authentication token.");
187 context.emit_event(EventType::ImexProgress(1));
189
190 let blobdir = BlobDirContents::new(&context).await?;
191
192 let mut file_size = dbfile.metadata()?.len();
193 for blob in blobdir.iter() {
194 file_size = file_size
195 .checked_add(blob.to_abs_path().metadata()?.len())
196 .context("File size overflow")?;
197 }
198
199 send_stream.write_all(&file_size.to_be_bytes()).await?;
200
201 export_backup_stream(&context, &dbfile, blobdir, send_stream, file_size)
202 .await
203 .context("Failed to write backup into QUIC stream")?;
204 info!(context, "Finished writing backup into QUIC stream.");
205 let mut buf = [0u8; 1];
206 info!(context, "Waiting for acknowledgment.");
207 recv_stream.read_exact(&mut buf).await?;
208 info!(context, "Received backup reception acknowledgement.");
209 context.emit_event(EventType::ImexProgress(1000));
210
211 let mut msg = Message::new_text(backup_transfer_msg_body(&context).await);
212 add_device_msg(&context, None, Some(&mut msg)).await?;
213
214 Ok(())
215 }
216
217 async fn accept_loop(
218 context: Context,
219 endpoint: Endpoint,
220 auth_token: String,
221 cancel_token: async_channel::Receiver<()>,
222 drop_token: CancellationToken,
223 dbfile: TempPathGuard,
224 ) {
225 let dbfile = Arc::new(dbfile);
226 loop {
227 tokio::select! {
228 biased;
229
230 conn = endpoint.accept() => {
231 if let Some(conn) = conn {
232 let conn = match conn.accept() {
233 Ok(conn) => conn,
234 Err(err) => {
235 warn!(context, "Failed to accept iroh connection: {err:#}.");
236 continue;
237 }
238 };
239 let context = context.clone();
241 let auth_token = auth_token.clone();
242 let dbfile = dbfile.clone();
243 if let Err(err) = Self::handle_connection(context.clone(), conn, auth_token, dbfile).race(
244 async {
245 cancel_token.recv().await.ok();
246 Err(format_err!("Backup transfer canceled"))
247 }
248 ).race(
249 async {
250 drop_token.cancelled().await;
251 Err(format_err!("Backup provider dropped"))
252 }
253 ).await {
254 error!(context, "Error while handling backup connection: {err:#}.");
255 context.emit_event(EventType::ImexProgress(0));
256 break;
257 } else {
258 info!(context, "Backup transfer finished successfully.");
259 break;
260 }
261 } else {
262 break;
263 }
264 },
265 _ = cancel_token.recv() => {
266 info!(context, "Backup transfer canceled by the user, stopping accept loop.");
267 context.emit_event(EventType::ImexProgress(0));
268 break;
269 }
270 _ = drop_token.cancelled() => {
271 info!(context, "Backup transfer canceled by dropping the provider, stopping accept loop.");
272 context.emit_event(EventType::ImexProgress(0));
273 break;
274 }
275 }
276 }
277 }
278
279 pub fn qr(&self) -> Qr {
283 Qr::Backup2 {
284 node_addr: self.node_addr.clone(),
285
286 auth_token: self.auth_token.clone(),
287 }
288 }
289}
290
291impl Future for BackupProvider {
292 type Output = Result<()>;
293
294 fn poll(mut self: Pin<&mut Self>, cx: &mut std::task::Context<'_>) -> Poll<Self::Output> {
296 Pin::new(&mut self.handle).poll(cx)?
297 }
298}
299
300pub async fn get_backup2(
301 context: &Context,
302 node_addr: iroh::NodeAddr,
303 auth_token: String,
304) -> Result<()> {
305 let relay_mode = RelayMode::Disabled;
306
307 let endpoint = Endpoint::builder()
308 .tls_x509() .relay_mode(relay_mode)
310 .bind()
311 .await?;
312
313 let conn = endpoint.connect(node_addr, BACKUP_ALPN).await?;
314 let (mut send_stream, mut recv_stream) = conn.open_bi().await?;
315 info!(context, "Sending backup authentication token.");
316 send_stream.write_all(auth_token.as_bytes()).await?;
317
318 let passphrase = String::new();
319 info!(context, "Starting to read backup from the stream.");
320
321 let mut file_size_buf = [0u8; 8];
322 recv_stream.read_exact(&mut file_size_buf).await?;
323 let file_size = u64::from_be_bytes(file_size_buf);
324 info!(context, "Received backup file size.");
325 context.emit_event(EventType::ImexProgress(1));
327
328 import_backup_stream(context, recv_stream, file_size, passphrase)
329 .await
330 .context("Failed to import backup from QUIC stream")?;
331 info!(context, "Finished importing backup from the stream.");
332 context.emit_event(EventType::ImexProgress(1000));
333
334 send_stream.write_all(b".").await.ok();
337 send_stream.finish().ok();
338 info!(context, "Sent backup reception acknowledgment.");
339
340 _ = send_stream.stopped().await;
343
344 Ok(())
345}
346
347pub async fn get_backup(context: &Context, qr: Qr) -> Result<()> {
359 match qr {
360 Qr::Backup2 {
361 node_addr,
362 auth_token,
363 } => {
364 let cancel_token = context.alloc_ongoing().await?;
365 let res = get_backup2(context, node_addr, auth_token)
366 .race(async {
367 cancel_token.recv().await.ok();
368 Err(format_err!("Backup reception canceled"))
369 })
370 .await;
371 if let Err(ref res) = res {
372 error!(context, "{:#}", res);
373 context.emit_event(EventType::ImexProgress(0));
374 }
375 context.free_ongoing().await;
376 res?;
377 }
378 _ => bail!("QR code for backup must be of type DCBACKUP2"),
379 }
380 Ok(())
381}
382
383#[cfg(test)]
384mod tests {
385 use std::time::Duration;
386
387 use crate::chat::{ChatItem, get_chat_msgs, send_msg};
388 use crate::message::Viewtype;
389 use crate::test_utils::TestContextManager;
390
391 use super::*;
392
393 #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
394 async fn test_send_receive() {
395 let mut tcm = TestContextManager::new();
396
397 let ctx0 = tcm.alice().await;
399
400 let self_chat = ctx0.get_self_chat().await;
402 let mut msg = Message::new_text("hi there".to_string());
403 send_msg(&ctx0, self_chat.id, &mut msg).await.unwrap();
404
405 let file = ctx0.get_blobdir().join("hello.txt");
407 fs::write(&file, "i am attachment").await.unwrap();
408 let mut msg = Message::new(Viewtype::File);
409 msg.set_file_and_deduplicate(&ctx0, &file, Some("hello.txt"), Some("text/plain"))
410 .unwrap();
411 send_msg(&ctx0, self_chat.id, &mut msg).await.unwrap();
412
413 let provider = BackupProvider::prepare(&ctx0).await.unwrap();
415
416 let ctx1 = tcm.unconfigured().await;
418 get_backup(&ctx1, provider.qr()).await.unwrap();
419
420 tokio::time::timeout(Duration::from_secs(30), provider)
422 .await
423 .expect("timed out")
424 .expect("error in provider");
425
426 let self_chat = ctx1.get_self_chat().await;
428 let msgs = get_chat_msgs(&ctx1, self_chat.id).await.unwrap();
429 assert_eq!(msgs.len(), 2);
430 let msgid = match msgs.first().unwrap() {
431 ChatItem::Message { msg_id } => msg_id,
432 _ => panic!("wrong chat item"),
433 };
434 let msg = Message::load_from_db(&ctx1, *msgid).await.unwrap();
435 let text = msg.get_text();
436 assert_eq!(text, "hi there");
437 let msgid = match msgs.get(1).unwrap() {
438 ChatItem::Message { msg_id } => msg_id,
439 _ => panic!("wrong chat item"),
440 };
441 let msg = Message::load_from_db(&ctx1, *msgid).await.unwrap();
442
443 let path = msg.get_file(&ctx1).unwrap();
444 assert_eq!(
445 path.with_file_name("ac1d2d284757656a8d41dc40aae4136.txt"),
447 path
448 );
449 assert_eq!("hello.txt", msg.get_filename().unwrap());
450 let text = fs::read_to_string(&path).await.unwrap();
451 assert_eq!(text, "i am attachment");
452
453 let path = path.with_file_name("saved.txt");
454 msg.save_file(&ctx1, &path).await.unwrap();
455 let text = fs::read_to_string(&path).await.unwrap();
456 assert_eq!(text, "i am attachment");
457 assert!(msg.save_file(&ctx1, &path).await.is_err());
458
459 for ctx in [&ctx0, &ctx1] {
461 ctx.evtracker
462 .get_matching(|ev| matches!(ev, EventType::ImexProgress(1)))
463 .await;
464 ctx.evtracker
465 .get_matching(|ev| matches!(ev, EventType::ImexProgress(1000)))
466 .await;
467 }
468 }
469
470 #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
473 async fn test_cant_overwrite_profile_in_use() -> Result<()> {
474 let mut tcm = TestContextManager::new();
475 let ctx0 = &tcm.alice().await;
476 let ctx1 = &tcm.bob().await;
477
478 let provider = BackupProvider::prepare(ctx0).await?;
480
481 let err = get_backup(ctx1, provider.qr()).await.unwrap_err();
483 assert!(format!("{err:#}").contains("Cannot import backups to accounts in use"));
484
485 provider.await.unwrap();
487 ctx0.evtracker
488 .get_matching(|e| matches!(e, EventType::Error(_)))
489 .await;
490
491 assert_eq!(ctx1.get_primary_self_addr().await?, "bob@example.net");
492
493 Ok(())
494 }
495
496 #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
497 async fn test_drop_provider() {
498 let mut tcm = TestContextManager::new();
499 let ctx = tcm.alice().await;
500
501 let provider = BackupProvider::prepare(&ctx).await.unwrap();
502 drop(provider);
503 ctx.evtracker
504 .get_matching(|ev| matches!(ev, EventType::ImexProgress(0)))
505 .await;
506 }
507}