1use std::future::Future;
30use std::pin::Pin;
31use std::sync::Arc;
32use std::task::Poll;
33
34use anyhow::{Context as _, Result, bail, format_err};
35use futures_lite::FutureExt;
36use iroh::{Endpoint, RelayMode};
37use tokio::fs;
38use tokio::task::JoinHandle;
39use tokio_util::sync::CancellationToken;
40
41use crate::EventType;
42use crate::chat::add_device_msg;
43use crate::context::Context;
44use crate::imex::BlobDirContents;
45use crate::key;
46use crate::log::warn;
47use crate::message::Message;
48use crate::qr::Qr;
49use crate::stock_str::backup_transfer_msg_body;
50use crate::tools::{TempPathGuard, create_id, time};
51
52use super::{DBFILE_BACKUP_NAME, export_backup_stream, export_database, import_backup_stream};
53
/// ALPN protocol identifier used for backup transfer connections.
///
/// The provider registers it on its endpoint and the receiver passes it
/// when connecting.
const BACKUP_ALPN: &[u8] = b"/deltachat/backup";
56
/// Provides a backup of the local account to a second device.
///
/// Created with [`BackupProvider::prepare`]. The provider is itself a future:
/// awaiting it waits for the background task that serves the backup.
/// Dropping the provider cancels that task through `_drop_guard`.
#[derive(Debug)]
pub struct BackupProvider {
    /// iroh endpoint bound for serving the backup.
    ///
    /// A clone of it is handed to the accept loop task.
    _endpoint: Endpoint,

    /// Address of the endpoint, published to the receiver through [`BackupProvider::qr`].
    node_addr: iroh::NodeAddr,

    /// Token the receiver must send before the backup is streamed to it.
    auth_token: String,

    /// Handle of the spawned task that runs the accept loop.
    handle: JoinHandle<Result<()>>,

    /// Guard of the cancellation token watched by the accept loop.
    ///
    /// Dropping the provider drops the guard, which stops the accept loop.
    _drop_guard: tokio_util::sync::DropGuard,
}
85
86impl BackupProvider {
87 pub async fn prepare(context: &Context) -> Result<Self> {
98 let relay_mode = RelayMode::Disabled;
99 let endpoint = Endpoint::builder()
100 .tls_x509() .alpns(vec![BACKUP_ALPN.to_vec()])
102 .relay_mode(relay_mode)
103 .bind()
104 .await?;
105 let node_addr = endpoint.node_addr().await?;
106
107 let cancel_token = context.alloc_ongoing().await?;
109 let paused_guard = context.scheduler.pause(context).await?;
110 let context_dir = context
111 .get_blobdir()
112 .parent()
113 .context("Context dir not found")?;
114
115 key::ensure_secret_key_exists(context)
117 .await
118 .context("Cannot create private key or private key not available")?;
119
120 let dbfile = context_dir.join(DBFILE_BACKUP_NAME);
121 if fs::metadata(&dbfile).await.is_ok() {
122 fs::remove_file(&dbfile).await?;
123 warn!(context, "Previous database export deleted");
124 }
125 let dbfile = TempPathGuard::new(dbfile);
126
127 let auth_token = create_id();
129
130 let passphrase = String::new();
131
132 export_database(context, &dbfile, passphrase, time())
133 .await
134 .context("Database export failed")?;
135
136 let drop_token = CancellationToken::new();
137 let handle = {
138 let context = context.clone();
139 let drop_token = drop_token.clone();
140 let endpoint = endpoint.clone();
141 let auth_token = auth_token.clone();
142 tokio::spawn(async move {
143 Self::accept_loop(
144 context.clone(),
145 endpoint,
146 auth_token,
147 cancel_token,
148 drop_token,
149 dbfile,
150 )
151 .await;
152 info!(context, "Finished accept loop.");
153
154 context.free_ongoing().await;
155
156 drop(paused_guard);
158 Ok(())
159 })
160 };
161 Ok(Self {
162 _endpoint: endpoint,
163 node_addr,
164 auth_token,
165 handle,
166 _drop_guard: drop_token.drop_guard(),
167 })
168 }
169
170 async fn handle_connection(
171 context: Context,
172 conn: iroh::endpoint::Connecting,
173 auth_token: String,
174 dbfile: Arc<TempPathGuard>,
175 ) -> Result<()> {
176 let conn = conn.await?;
177 let (mut send_stream, mut recv_stream) = conn.accept_bi().await?;
178
179 let mut received_auth_token = vec![0u8; auth_token.len()];
181 recv_stream.read_exact(&mut received_auth_token).await?;
182 if received_auth_token.as_slice() != auth_token.as_bytes() {
183 warn!(context, "Received wrong backup authentication token.");
184 return Ok(());
185 }
186
187 info!(context, "Received valid backup authentication token.");
188 context.emit_event(EventType::ImexProgress(1));
190
191 let blobdir = BlobDirContents::new(&context).await?;
192
193 let mut file_size = dbfile.metadata()?.len();
194 for blob in blobdir.iter() {
195 file_size = file_size
196 .checked_add(blob.to_abs_path().metadata()?.len())
197 .context("File size overflow")?;
198 }
199
200 send_stream.write_all(&file_size.to_be_bytes()).await?;
201
202 export_backup_stream(&context, &dbfile, blobdir, send_stream, file_size)
203 .await
204 .context("Failed to write backup into QUIC stream")?;
205 info!(context, "Finished writing backup into QUIC stream.");
206 let mut buf = [0u8; 1];
207 info!(context, "Waiting for acknowledgment.");
208 recv_stream.read_exact(&mut buf).await?;
209 info!(context, "Received backup reception acknowledgement.");
210 context.emit_event(EventType::ImexProgress(1000));
211
212 let mut msg = Message::new_text(backup_transfer_msg_body(&context));
213 add_device_msg(&context, None, Some(&mut msg)).await?;
214
215 Ok(())
216 }
217
218 async fn accept_loop(
219 context: Context,
220 endpoint: Endpoint,
221 auth_token: String,
222 cancel_token: async_channel::Receiver<()>,
223 drop_token: CancellationToken,
224 dbfile: TempPathGuard,
225 ) {
226 let dbfile = Arc::new(dbfile);
227 loop {
228 tokio::select! {
229 biased;
230
231 conn = endpoint.accept() => {
232 if let Some(conn) = conn {
233 let conn = match conn.accept() {
234 Ok(conn) => conn,
235 Err(err) => {
236 warn!(context, "Failed to accept iroh connection: {err:#}.");
237 continue;
238 }
239 };
240 let context = context.clone();
242 let auth_token = auth_token.clone();
243 let dbfile = dbfile.clone();
244 if let Err(err) = Self::handle_connection(context.clone(), conn, auth_token, dbfile).race(
245 async {
246 cancel_token.recv().await.ok();
247 Err(format_err!("Backup transfer canceled"))
248 }
249 ).race(
250 async {
251 drop_token.cancelled().await;
252 Err(format_err!("Backup provider dropped"))
253 }
254 ).await {
255 error!(context, "Error while handling backup connection: {err:#}.");
256 context.emit_event(EventType::ImexProgress(0));
257 break;
258 } else {
259 info!(context, "Backup transfer finished successfully.");
260 break;
261 }
262 } else {
263 break;
264 }
265 },
266 _ = cancel_token.recv() => {
267 info!(context, "Backup transfer canceled by the user, stopping accept loop.");
268 context.emit_event(EventType::ImexProgress(0));
269 break;
270 }
271 _ = drop_token.cancelled() => {
272 info!(context, "Backup transfer canceled by dropping the provider, stopping accept loop.");
273 context.emit_event(EventType::ImexProgress(0));
274 break;
275 }
276 }
277 }
278 }
279
280 pub fn qr(&self) -> Qr {
284 Qr::Backup2 {
285 node_addr: self.node_addr.clone(),
286
287 auth_token: self.auth_token.clone(),
288 }
289 }
290}
291
292impl Future for BackupProvider {
293 type Output = Result<()>;
294
295 fn poll(mut self: Pin<&mut Self>, cx: &mut std::task::Context<'_>) -> Poll<Self::Output> {
297 Pin::new(&mut self.handle).poll(cx)?
298 }
299}
300
301pub async fn get_backup2(
302 context: &Context,
303 node_addr: iroh::NodeAddr,
304 auth_token: String,
305) -> Result<()> {
306 let relay_mode = RelayMode::Disabled;
307
308 let endpoint = Endpoint::builder()
309 .tls_x509() .relay_mode(relay_mode)
311 .bind()
312 .await?;
313
314 let conn = endpoint.connect(node_addr, BACKUP_ALPN).await?;
315 let (mut send_stream, mut recv_stream) = conn.open_bi().await?;
316 info!(context, "Sending backup authentication token.");
317 send_stream.write_all(auth_token.as_bytes()).await?;
318
319 let passphrase = String::new();
320 info!(context, "Starting to read backup from the stream.");
321
322 let mut file_size_buf = [0u8; 8];
323 recv_stream.read_exact(&mut file_size_buf).await?;
324 let file_size = u64::from_be_bytes(file_size_buf);
325 info!(context, "Received backup file size.");
326 context.emit_event(EventType::ImexProgress(1));
328
329 import_backup_stream(context, recv_stream, file_size, passphrase)
330 .await
331 .context("Failed to import backup from QUIC stream")?;
332 info!(context, "Finished importing backup from the stream.");
333 context.emit_event(EventType::ImexProgress(1000));
334
335 send_stream.write_all(b".").await.ok();
338 send_stream.finish().ok();
339 info!(context, "Sent backup reception acknowledgment.");
340
341 _ = send_stream.stopped().await;
344
345 Ok(())
346}
347
348pub async fn get_backup(context: &Context, qr: Qr) -> Result<()> {
360 match qr {
361 Qr::Backup2 {
362 node_addr,
363 auth_token,
364 } => {
365 let cancel_token = context.alloc_ongoing().await?;
366 let res = get_backup2(context, node_addr, auth_token)
367 .race(async {
368 cancel_token.recv().await.ok();
369 Err(format_err!("Backup reception canceled"))
370 })
371 .await;
372 if let Err(ref res) = res {
373 error!(context, "{:#}", res);
374 context.emit_event(EventType::ImexProgress(0));
375 }
376 context.free_ongoing().await;
377 res?;
378 }
379 _ => bail!("QR code for backup must be of type DCBACKUP2"),
380 }
381 Ok(())
382}
383
#[cfg(test)]
mod tests {
    use std::time::Duration;

    use crate::chat::{ChatItem, get_chat_msgs, send_msg};
    use crate::message::Viewtype;
    use crate::test_utils::TestContextManager;

    use super::*;

    /// Transfers a backup with one text message and one attachment from a
    /// configured account to an unconfigured one and checks the result.
    #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
    async fn test_send_receive() {
        let mut tcm = TestContextManager::new();

        // Sending side.
        let sender = tcm.alice().await;

        // A plain text message in the self chat.
        let sender_self_chat = sender.get_self_chat().await;
        let mut text_msg = Message::new_text("hi there".to_string());
        send_msg(&sender, sender_self_chat.id, &mut text_msg)
            .await
            .unwrap();

        // A message with an attachment in the self chat.
        let attachment = sender.get_blobdir().join("hello.txt");
        fs::write(&attachment, "i am attachment").await.unwrap();
        let mut file_msg = Message::new(Viewtype::File);
        file_msg
            .set_file_and_deduplicate(&sender, &attachment, Some("hello.txt"), Some("text/plain"))
            .unwrap();
        send_msg(&sender, sender_self_chat.id, &mut file_msg)
            .await
            .unwrap();

        let provider = BackupProvider::prepare(&sender).await.unwrap();

        // Receiving side.
        let receiver = tcm.unconfigured().await;
        get_backup(&receiver, provider.qr()).await.unwrap();

        // The provider task must finish on its own once the transfer is done.
        tokio::time::timeout(Duration::from_secs(30), provider)
            .await
            .expect("timed out")
            .expect("error in provider");

        // Both messages must have arrived in the receiver's self chat.
        let receiver_self_chat = receiver.get_self_chat().await;
        let items = get_chat_msgs(&receiver, receiver_self_chat.id)
            .await
            .unwrap();
        assert_eq!(items.len(), 2);
        let mut items = items.iter();

        let Some(ChatItem::Message { msg_id }) = items.next() else {
            panic!("wrong chat item");
        };
        let received_text = Message::load_from_db(&receiver, *msg_id).await.unwrap();
        assert_eq!(received_text.get_text(), "hi there");

        let Some(ChatItem::Message { msg_id }) = items.next() else {
            panic!("wrong chat item");
        };
        let received_file = Message::load_from_db(&receiver, *msg_id).await.unwrap();

        // The blob keeps its deduplicated file name while the message still
        // reports the original one.
        let blob_path = received_file.get_file(&receiver).unwrap();
        assert_eq!(
            blob_path.with_file_name("ac1d2d284757656a8d41dc40aae4136.txt"),
            blob_path
        );
        assert_eq!("hello.txt", received_file.get_filename().unwrap());
        let blob_text = fs::read_to_string(&blob_path).await.unwrap();
        assert_eq!(blob_text, "i am attachment");

        // Saving works once; saving to the same path again must fail.
        let saved_path = blob_path.with_file_name("saved.txt");
        received_file
            .save_file(&receiver, &saved_path)
            .await
            .unwrap();
        let saved_text = fs::read_to_string(&saved_path).await.unwrap();
        assert_eq!(saved_text, "i am attachment");
        assert!(
            received_file
                .save_file(&receiver, &saved_path)
                .await
                .is_err()
        );

        // Both sides must have reported the start and the end of the transfer.
        for account in [&sender, &receiver] {
            account
                .evtracker
                .get_matching(|event| matches!(event, EventType::ImexProgress(1)))
                .await;
            account
                .evtracker
                .get_matching(|event| matches!(event, EventType::ImexProgress(1000)))
                .await;
        }
    }

    /// Importing a backup into an account that is already in use must fail
    /// and leave that account untouched.
    #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
    async fn test_cant_overwrite_profile_in_use() -> Result<()> {
        let mut tcm = TestContextManager::new();
        let sender = &tcm.alice().await;
        let receiver = &tcm.bob().await;

        let provider = BackupProvider::prepare(sender).await?;

        // The receiver is already configured, so the import is rejected.
        let import_err = get_backup(receiver, provider.qr()).await.unwrap_err();
        assert!(format!("{import_err:#}").contains("Cannot import backups to accounts in use"));

        // The provider task itself still finishes without an error,
        // but the sender sees an error event.
        provider.await.unwrap();
        sender
            .evtracker
            .get_matching(|event| matches!(event, EventType::Error(_)))
            .await;

        assert_eq!(receiver.get_primary_self_addr().await?, "bob@example.net");

        Ok(())
    }

    /// Dropping the provider must stop the accept loop and report failure.
    #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
    async fn test_drop_provider() {
        let mut tcm = TestContextManager::new();
        let account = tcm.alice().await;

        let provider = BackupProvider::prepare(&account).await.unwrap();
        drop(provider);
        account
            .evtracker
            .get_matching(|event| matches!(event, EventType::ImexProgress(0)))
            .await;
    }
}