1use std::future::Future;
30use std::pin::Pin;
31use std::sync::Arc;
32use std::task::Poll;
33
34use anyhow::{bail, format_err, Context as _, Result};
35use futures_lite::FutureExt;
36use iroh::{Endpoint, RelayMode};
37use tokio::fs;
38use tokio::task::JoinHandle;
39use tokio_util::sync::CancellationToken;
40
41use crate::chat::add_device_msg;
42use crate::context::Context;
43use crate::imex::BlobDirContents;
44use crate::message::Message;
45use crate::qr::Qr;
46use crate::stock_str::backup_transfer_msg_body;
47use crate::tools::{create_id, time, TempPathGuard};
48use crate::{e2ee, EventType};
49
50use super::{export_backup_stream, export_database, import_backup_stream, DBFILE_BACKUP_NAME};
51
52const BACKUP_ALPN: &[u8] = b"/deltachat/backup";
54
55#[derive(Debug)]
66pub struct BackupProvider {
67 _endpoint: Endpoint,
69
70 node_addr: iroh::NodeAddr,
72
73 auth_token: String,
76
77 handle: JoinHandle<Result<()>>,
79
80 _drop_guard: tokio_util::sync::DropGuard,
82}
83
84impl BackupProvider {
85 pub async fn prepare(context: &Context) -> Result<Self> {
96 let relay_mode = RelayMode::Disabled;
97 let endpoint = Endpoint::builder()
98 .alpns(vec![BACKUP_ALPN.to_vec()])
99 .relay_mode(relay_mode)
100 .bind()
101 .await?;
102 let node_addr = endpoint.node_addr().await?;
103
104 let cancel_token = context.alloc_ongoing().await?;
106 let paused_guard = context.scheduler.pause(context.clone()).await?;
107 let context_dir = context
108 .get_blobdir()
109 .parent()
110 .context("Context dir not found")?;
111
112 e2ee::ensure_secret_key_exists(context)
114 .await
115 .context("Cannot create private key or private key not available")?;
116
117 let dbfile = context_dir.join(DBFILE_BACKUP_NAME);
118 if fs::metadata(&dbfile).await.is_ok() {
119 fs::remove_file(&dbfile).await?;
120 warn!(context, "Previous database export deleted");
121 }
122 let dbfile = TempPathGuard::new(dbfile);
123
124 let auth_token = create_id();
126
127 let passphrase = String::new();
128
129 export_database(context, &dbfile, passphrase, time())
130 .await
131 .context("Database export failed")?;
132
133 let drop_token = CancellationToken::new();
134 let handle = {
135 let context = context.clone();
136 let drop_token = drop_token.clone();
137 let endpoint = endpoint.clone();
138 let auth_token = auth_token.clone();
139 tokio::spawn(async move {
140 Self::accept_loop(
141 context.clone(),
142 endpoint,
143 auth_token,
144 cancel_token,
145 drop_token,
146 dbfile,
147 )
148 .await;
149 info!(context, "Finished accept loop.");
150
151 context.free_ongoing().await;
152
153 drop(paused_guard);
155 Ok(())
156 })
157 };
158 Ok(Self {
159 _endpoint: endpoint,
160 node_addr,
161 auth_token,
162 handle,
163 _drop_guard: drop_token.drop_guard(),
164 })
165 }
166
167 async fn handle_connection(
168 context: Context,
169 conn: iroh::endpoint::Connecting,
170 auth_token: String,
171 dbfile: Arc<TempPathGuard>,
172 ) -> Result<()> {
173 let conn = conn.await?;
174 let (mut send_stream, mut recv_stream) = conn.accept_bi().await?;
175
176 let mut received_auth_token = vec![0u8; auth_token.len()];
178 recv_stream.read_exact(&mut received_auth_token).await?;
179 if received_auth_token.as_slice() != auth_token.as_bytes() {
180 warn!(context, "Received wrong backup authentication token.");
181 return Ok(());
182 }
183
184 info!(context, "Received valid backup authentication token.");
185 context.emit_event(EventType::ImexProgress(1));
187
188 let blobdir = BlobDirContents::new(&context).await?;
189
190 let mut file_size = 0;
191 file_size += dbfile.metadata()?.len();
192 for blob in blobdir.iter() {
193 file_size += blob.to_abs_path().metadata()?.len()
194 }
195
196 send_stream.write_all(&file_size.to_be_bytes()).await?;
197
198 export_backup_stream(&context, &dbfile, blobdir, send_stream, file_size)
199 .await
200 .context("Failed to write backup into QUIC stream")?;
201 info!(context, "Finished writing backup into QUIC stream.");
202 let mut buf = [0u8; 1];
203 info!(context, "Waiting for acknowledgment.");
204 recv_stream.read_exact(&mut buf).await?;
205 info!(context, "Received backup reception acknowledgement.");
206 context.emit_event(EventType::ImexProgress(1000));
207
208 let mut msg = Message::new_text(backup_transfer_msg_body(&context).await);
209 add_device_msg(&context, None, Some(&mut msg)).await?;
210
211 Ok(())
212 }
213
214 async fn accept_loop(
215 context: Context,
216 endpoint: Endpoint,
217 auth_token: String,
218 cancel_token: async_channel::Receiver<()>,
219 drop_token: CancellationToken,
220 dbfile: TempPathGuard,
221 ) {
222 let dbfile = Arc::new(dbfile);
223 loop {
224 tokio::select! {
225 biased;
226
227 conn = endpoint.accept() => {
228 if let Some(conn) = conn {
229 let conn = match conn.accept() {
230 Ok(conn) => conn,
231 Err(err) => {
232 warn!(context, "Failed to accept iroh connection: {err:#}.");
233 continue;
234 }
235 };
236 let context = context.clone();
238 let auth_token = auth_token.clone();
239 let dbfile = dbfile.clone();
240 if let Err(err) = Self::handle_connection(context.clone(), conn, auth_token, dbfile).race(
241 async {
242 cancel_token.recv().await.ok();
243 Err(format_err!("Backup transfer cancelled"))
244 }
245 ).race(
246 async {
247 drop_token.cancelled().await;
248 Err(format_err!("Backup provider dropped"))
249 }
250 ).await {
251 warn!(context, "Error while handling backup connection: {err:#}.");
252 context.emit_event(EventType::ImexProgress(0));
253 break;
254 } else {
255 info!(context, "Backup transfer finished successfully.");
256 break;
257 }
258 } else {
259 break;
260 }
261 },
262 _ = cancel_token.recv() => {
263 info!(context, "Backup transfer cancelled by the user, stopping accept loop.");
264 context.emit_event(EventType::ImexProgress(0));
265 break;
266 }
267 _ = drop_token.cancelled() => {
268 info!(context, "Backup transfer cancelled by dropping the provider, stopping accept loop.");
269 context.emit_event(EventType::ImexProgress(0));
270 break;
271 }
272 }
273 }
274 }
275
276 pub fn qr(&self) -> Qr {
280 Qr::Backup2 {
281 node_addr: self.node_addr.clone(),
282
283 auth_token: self.auth_token.clone(),
284 }
285 }
286}
287
288impl Future for BackupProvider {
289 type Output = Result<()>;
290
291 fn poll(mut self: Pin<&mut Self>, cx: &mut std::task::Context<'_>) -> Poll<Self::Output> {
293 Pin::new(&mut self.handle).poll(cx)?
294 }
295}
296
297pub async fn get_backup2(
298 context: &Context,
299 node_addr: iroh::NodeAddr,
300 auth_token: String,
301) -> Result<()> {
302 let relay_mode = RelayMode::Disabled;
303
304 let endpoint = Endpoint::builder().relay_mode(relay_mode).bind().await?;
305
306 let conn = endpoint.connect(node_addr, BACKUP_ALPN).await?;
307 let (mut send_stream, mut recv_stream) = conn.open_bi().await?;
308 info!(context, "Sending backup authentication token.");
309 send_stream.write_all(auth_token.as_bytes()).await?;
310
311 let passphrase = String::new();
312 info!(context, "Starting to read backup from the stream.");
313
314 let mut file_size_buf = [0u8; 8];
315 recv_stream.read_exact(&mut file_size_buf).await?;
316 let file_size = u64::from_be_bytes(file_size_buf);
317 info!(context, "Received backup file size.");
318 context.emit_event(EventType::ImexProgress(1));
320
321 import_backup_stream(context, recv_stream, file_size, passphrase)
322 .await
323 .context("Failed to import backup from QUIC stream")?;
324 info!(context, "Finished importing backup from the stream.");
325 context.emit_event(EventType::ImexProgress(1000));
326
327 send_stream.write_all(b".").await.ok();
330 send_stream.finish().ok();
331 info!(context, "Sent backup reception acknowledgment.");
332
333 _ = send_stream.stopped().await;
336
337 Ok(())
338}
339
340pub async fn get_backup(context: &Context, qr: Qr) -> Result<()> {
352 match qr {
353 Qr::Backup2 {
354 node_addr,
355 auth_token,
356 } => {
357 let cancel_token = context.alloc_ongoing().await?;
358 let res = get_backup2(context, node_addr, auth_token)
359 .race(async {
360 cancel_token.recv().await.ok();
361 Err(format_err!("Backup reception cancelled"))
362 })
363 .await;
364 if res.is_err() {
365 context.emit_event(EventType::ImexProgress(0));
366 }
367 context.free_ongoing().await;
368 res?;
369 }
370 _ => bail!("QR code for backup must be of type DCBACKUP2"),
371 }
372 Ok(())
373}
374
375#[cfg(test)]
376mod tests {
377 use std::time::Duration;
378
379 use crate::chat::{get_chat_msgs, send_msg, ChatItem};
380 use crate::message::Viewtype;
381 use crate::test_utils::TestContextManager;
382
383 use super::*;
384
385 #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
386 async fn test_send_receive() {
387 let mut tcm = TestContextManager::new();
388
389 let ctx0 = tcm.alice().await;
391
392 let self_chat = ctx0.get_self_chat().await;
394 let mut msg = Message::new_text("hi there".to_string());
395 send_msg(&ctx0, self_chat.id, &mut msg).await.unwrap();
396
397 let file = ctx0.get_blobdir().join("hello.txt");
399 fs::write(&file, "i am attachment").await.unwrap();
400 let mut msg = Message::new(Viewtype::File);
401 msg.set_file_and_deduplicate(&ctx0, &file, Some("hello.txt"), Some("text/plain"))
402 .unwrap();
403 send_msg(&ctx0, self_chat.id, &mut msg).await.unwrap();
404
405 let provider = BackupProvider::prepare(&ctx0).await.unwrap();
407
408 let ctx1 = tcm.unconfigured().await;
410 get_backup(&ctx1, provider.qr()).await.unwrap();
411
412 tokio::time::timeout(Duration::from_secs(30), provider)
414 .await
415 .expect("timed out")
416 .expect("error in provider");
417
418 let self_chat = ctx1.get_self_chat().await;
420 let msgs = get_chat_msgs(&ctx1, self_chat.id).await.unwrap();
421 assert_eq!(msgs.len(), 2);
422 let msgid = match msgs.first().unwrap() {
423 ChatItem::Message { msg_id } => msg_id,
424 _ => panic!("wrong chat item"),
425 };
426 let msg = Message::load_from_db(&ctx1, *msgid).await.unwrap();
427 let text = msg.get_text();
428 assert_eq!(text, "hi there");
429 let msgid = match msgs.get(1).unwrap() {
430 ChatItem::Message { msg_id } => msg_id,
431 _ => panic!("wrong chat item"),
432 };
433 let msg = Message::load_from_db(&ctx1, *msgid).await.unwrap();
434
435 let path = msg.get_file(&ctx1).unwrap();
436 assert_eq!(
437 path.with_file_name("ac1d2d284757656a8d41dc40aae4136.txt"),
439 path
440 );
441 assert_eq!("hello.txt", msg.get_filename().unwrap());
442 let text = fs::read_to_string(&path).await.unwrap();
443 assert_eq!(text, "i am attachment");
444
445 let path = path.with_file_name("saved.txt");
446 msg.save_file(&ctx1, &path).await.unwrap();
447 let text = fs::read_to_string(&path).await.unwrap();
448 assert_eq!(text, "i am attachment");
449 assert!(msg.save_file(&ctx1, &path).await.is_err());
450
451 for ctx in [&ctx0, &ctx1] {
453 ctx.evtracker
454 .get_matching(|ev| matches!(ev, EventType::ImexProgress(1)))
455 .await;
456 ctx.evtracker
457 .get_matching(|ev| matches!(ev, EventType::ImexProgress(1000)))
458 .await;
459 }
460 }
461
462 #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
463 async fn test_drop_provider() {
464 let mut tcm = TestContextManager::new();
465 let ctx = tcm.alice().await;
466
467 let provider = BackupProvider::prepare(&ctx).await.unwrap();
468 drop(provider);
469 ctx.evtracker
470 .get_matching(|ev| matches!(ev, EventType::ImexProgress(0)))
471 .await;
472 }
473}