1use std::future::Future;
30use std::pin::Pin;
31use std::sync::Arc;
32use std::task::Poll;
33
34use anyhow::{Context as _, Result, bail, format_err};
35use futures_lite::FutureExt;
36use iroh::{Endpoint, RelayMode};
37use tokio::fs;
38use tokio::task::JoinHandle;
39use tokio_util::sync::CancellationToken;
40
41use crate::chat::add_device_msg;
42use crate::context::Context;
43use crate::imex::BlobDirContents;
44use crate::log::{info, warn};
45use crate::message::Message;
46use crate::qr::Qr;
47use crate::stock_str::backup_transfer_msg_body;
48use crate::tools::{TempPathGuard, create_id, time};
49use crate::{EventType, e2ee};
50
51use super::{DBFILE_BACKUP_NAME, export_backup_stream, export_database, import_backup_stream};
52
/// ALPN protocol identifier of the backup transfer.
///
/// The provider registers it on its endpoint and the receiver passes it
/// when connecting to the provider.
const BACKUP_ALPN: &[u8] = b"/deltachat/backup";
55
/// Sending side of a backup transfer.
///
/// Created with [`BackupProvider::prepare`], which exports the database and
/// spawns a task accepting a connection from the receiving device.
/// The provider is itself a [`Future`] that resolves when that task finishes.
/// Dropping the provider cancels the spawned task via `_drop_guard`.
#[derive(Debug)]
pub struct BackupProvider {
    /// iroh endpoint on which the spawned task accepts connections.
    _endpoint: Endpoint,

    /// Address of `_endpoint`, handed out to the receiver via [`BackupProvider::qr`].
    node_addr: iroh::NodeAddr,

    /// Token the receiver has to send first on the stream before the backup is sent.
    ///
    /// Generated with `create_id()` and handed out via [`BackupProvider::qr`].
    auth_token: String,

    /// Handle of the spawned task running the accept loop.
    handle: JoinHandle<Result<()>>,

    /// Drop guard of the cancellation token watched by the accept loop.
    _drop_guard: tokio_util::sync::DropGuard,
}
84
85impl BackupProvider {
86 pub async fn prepare(context: &Context) -> Result<Self> {
97 let relay_mode = RelayMode::Disabled;
98 let endpoint = Endpoint::builder()
99 .tls_x509() .alpns(vec![BACKUP_ALPN.to_vec()])
101 .relay_mode(relay_mode)
102 .bind()
103 .await?;
104 let node_addr = endpoint.node_addr().await?;
105
106 let cancel_token = context.alloc_ongoing().await?;
108 let paused_guard = context.scheduler.pause(context.clone()).await?;
109 let context_dir = context
110 .get_blobdir()
111 .parent()
112 .context("Context dir not found")?;
113
114 e2ee::ensure_secret_key_exists(context)
116 .await
117 .context("Cannot create private key or private key not available")?;
118
119 let dbfile = context_dir.join(DBFILE_BACKUP_NAME);
120 if fs::metadata(&dbfile).await.is_ok() {
121 fs::remove_file(&dbfile).await?;
122 warn!(context, "Previous database export deleted");
123 }
124 let dbfile = TempPathGuard::new(dbfile);
125
126 let auth_token = create_id();
128
129 let passphrase = String::new();
130
131 export_database(context, &dbfile, passphrase, time())
132 .await
133 .context("Database export failed")?;
134
135 let drop_token = CancellationToken::new();
136 let handle = {
137 let context = context.clone();
138 let drop_token = drop_token.clone();
139 let endpoint = endpoint.clone();
140 let auth_token = auth_token.clone();
141 tokio::spawn(async move {
142 Self::accept_loop(
143 context.clone(),
144 endpoint,
145 auth_token,
146 cancel_token,
147 drop_token,
148 dbfile,
149 )
150 .await;
151 info!(context, "Finished accept loop.");
152
153 context.free_ongoing().await;
154
155 drop(paused_guard);
157 Ok(())
158 })
159 };
160 Ok(Self {
161 _endpoint: endpoint,
162 node_addr,
163 auth_token,
164 handle,
165 _drop_guard: drop_token.drop_guard(),
166 })
167 }
168
169 async fn handle_connection(
170 context: Context,
171 conn: iroh::endpoint::Connecting,
172 auth_token: String,
173 dbfile: Arc<TempPathGuard>,
174 ) -> Result<()> {
175 let conn = conn.await?;
176 let (mut send_stream, mut recv_stream) = conn.accept_bi().await?;
177
178 let mut received_auth_token = vec![0u8; auth_token.len()];
180 recv_stream.read_exact(&mut received_auth_token).await?;
181 if received_auth_token.as_slice() != auth_token.as_bytes() {
182 warn!(context, "Received wrong backup authentication token.");
183 return Ok(());
184 }
185
186 info!(context, "Received valid backup authentication token.");
187 context.emit_event(EventType::ImexProgress(1));
189
190 let blobdir = BlobDirContents::new(&context).await?;
191
192 let mut file_size = 0;
193 file_size += dbfile.metadata()?.len();
194 for blob in blobdir.iter() {
195 file_size += blob.to_abs_path().metadata()?.len()
196 }
197
198 send_stream.write_all(&file_size.to_be_bytes()).await?;
199
200 export_backup_stream(&context, &dbfile, blobdir, send_stream, file_size)
201 .await
202 .context("Failed to write backup into QUIC stream")?;
203 info!(context, "Finished writing backup into QUIC stream.");
204 let mut buf = [0u8; 1];
205 info!(context, "Waiting for acknowledgment.");
206 recv_stream.read_exact(&mut buf).await?;
207 info!(context, "Received backup reception acknowledgement.");
208 context.emit_event(EventType::ImexProgress(1000));
209
210 let mut msg = Message::new_text(backup_transfer_msg_body(&context).await);
211 add_device_msg(&context, None, Some(&mut msg)).await?;
212
213 Ok(())
214 }
215
216 async fn accept_loop(
217 context: Context,
218 endpoint: Endpoint,
219 auth_token: String,
220 cancel_token: async_channel::Receiver<()>,
221 drop_token: CancellationToken,
222 dbfile: TempPathGuard,
223 ) {
224 let dbfile = Arc::new(dbfile);
225 loop {
226 tokio::select! {
227 biased;
228
229 conn = endpoint.accept() => {
230 if let Some(conn) = conn {
231 let conn = match conn.accept() {
232 Ok(conn) => conn,
233 Err(err) => {
234 warn!(context, "Failed to accept iroh connection: {err:#}.");
235 continue;
236 }
237 };
238 let context = context.clone();
240 let auth_token = auth_token.clone();
241 let dbfile = dbfile.clone();
242 if let Err(err) = Self::handle_connection(context.clone(), conn, auth_token, dbfile).race(
243 async {
244 cancel_token.recv().await.ok();
245 Err(format_err!("Backup transfer cancelled"))
246 }
247 ).race(
248 async {
249 drop_token.cancelled().await;
250 Err(format_err!("Backup provider dropped"))
251 }
252 ).await {
253 error!(context, "Error while handling backup connection: {err:#}.");
254 context.emit_event(EventType::ImexProgress(0));
255 break;
256 } else {
257 info!(context, "Backup transfer finished successfully.");
258 break;
259 }
260 } else {
261 break;
262 }
263 },
264 _ = cancel_token.recv() => {
265 info!(context, "Backup transfer cancelled by the user, stopping accept loop.");
266 context.emit_event(EventType::ImexProgress(0));
267 break;
268 }
269 _ = drop_token.cancelled() => {
270 info!(context, "Backup transfer cancelled by dropping the provider, stopping accept loop.");
271 context.emit_event(EventType::ImexProgress(0));
272 break;
273 }
274 }
275 }
276 }
277
278 pub fn qr(&self) -> Qr {
282 Qr::Backup2 {
283 node_addr: self.node_addr.clone(),
284
285 auth_token: self.auth_token.clone(),
286 }
287 }
288}
289
290impl Future for BackupProvider {
291 type Output = Result<()>;
292
293 fn poll(mut self: Pin<&mut Self>, cx: &mut std::task::Context<'_>) -> Poll<Self::Output> {
295 Pin::new(&mut self.handle).poll(cx)?
296 }
297}
298
299pub async fn get_backup2(
300 context: &Context,
301 node_addr: iroh::NodeAddr,
302 auth_token: String,
303) -> Result<()> {
304 let relay_mode = RelayMode::Disabled;
305
306 let endpoint = Endpoint::builder()
307 .tls_x509() .relay_mode(relay_mode)
309 .bind()
310 .await?;
311
312 let conn = endpoint.connect(node_addr, BACKUP_ALPN).await?;
313 let (mut send_stream, mut recv_stream) = conn.open_bi().await?;
314 info!(context, "Sending backup authentication token.");
315 send_stream.write_all(auth_token.as_bytes()).await?;
316
317 let passphrase = String::new();
318 info!(context, "Starting to read backup from the stream.");
319
320 let mut file_size_buf = [0u8; 8];
321 recv_stream.read_exact(&mut file_size_buf).await?;
322 let file_size = u64::from_be_bytes(file_size_buf);
323 info!(context, "Received backup file size.");
324 context.emit_event(EventType::ImexProgress(1));
326
327 import_backup_stream(context, recv_stream, file_size, passphrase)
328 .await
329 .context("Failed to import backup from QUIC stream")?;
330 info!(context, "Finished importing backup from the stream.");
331 context.emit_event(EventType::ImexProgress(1000));
332
333 send_stream.write_all(b".").await.ok();
336 send_stream.finish().ok();
337 info!(context, "Sent backup reception acknowledgment.");
338
339 _ = send_stream.stopped().await;
342
343 Ok(())
344}
345
346pub async fn get_backup(context: &Context, qr: Qr) -> Result<()> {
358 match qr {
359 Qr::Backup2 {
360 node_addr,
361 auth_token,
362 } => {
363 let cancel_token = context.alloc_ongoing().await?;
364 let res = get_backup2(context, node_addr, auth_token)
365 .race(async {
366 cancel_token.recv().await.ok();
367 Err(format_err!("Backup reception cancelled"))
368 })
369 .await;
370 if let Err(ref res) = res {
371 error!(context, "{:#}", res);
372 context.emit_event(EventType::ImexProgress(0));
373 }
374 context.free_ongoing().await;
375 res?;
376 }
377 _ => bail!("QR code for backup must be of type DCBACKUP2"),
378 }
379 Ok(())
380}
381
#[cfg(test)]
mod tests {
    use std::time::Duration;

    use crate::chat::{ChatItem, get_chat_msgs, send_msg};
    use crate::message::Viewtype;
    use crate::test_utils::TestContextManager;

    use super::*;

    /// Transfers a backup with one text message and one file message from a
    /// configured context to an unconfigured one and checks what arrived.
    #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
    async fn test_send_receive() {
        let mut tcm = TestContextManager::new();

        // Sending side.
        let sender = tcm.alice().await;

        // A plain text message in the self chat.
        let sender_self_chat = sender.get_self_chat().await;
        let mut text_msg = Message::new_text("hi there".to_string());
        send_msg(&sender, sender_self_chat.id, &mut text_msg)
            .await
            .unwrap();

        // A message with an attachment in the self chat.
        let attachment = sender.get_blobdir().join("hello.txt");
        fs::write(&attachment, "i am attachment").await.unwrap();
        let mut file_msg = Message::new(Viewtype::File);
        file_msg
            .set_file_and_deduplicate(&sender, &attachment, Some("hello.txt"), Some("text/plain"))
            .unwrap();
        send_msg(&sender, sender_self_chat.id, &mut file_msg)
            .await
            .unwrap();

        let provider = BackupProvider::prepare(&sender).await.unwrap();

        // Receiving side.
        let receiver = tcm.unconfigured().await;
        get_backup(&receiver, provider.qr()).await.unwrap();

        // The provider must finish on its own after the transfer.
        tokio::time::timeout(Duration::from_secs(30), provider)
            .await
            .expect("timed out")
            .expect("error in provider");

        // Both messages must be present on the receiver.
        let receiver_self_chat = receiver.get_self_chat().await;
        let items = get_chat_msgs(&receiver, receiver_self_chat.id)
            .await
            .unwrap();
        assert_eq!(items.len(), 2);

        let ChatItem::Message { msg_id } = items.first().unwrap() else {
            panic!("wrong chat item");
        };
        let received_text = Message::load_from_db(&receiver, *msg_id).await.unwrap();
        assert_eq!(received_text.get_text(), "hi there");

        let ChatItem::Message { msg_id } = items.get(1).unwrap() else {
            panic!("wrong chat item");
        };
        let received_file = Message::load_from_db(&receiver, *msg_id).await.unwrap();

        // The blob is expected under this fixed file name on the receiver.
        let blob_path = received_file.get_file(&receiver).unwrap();
        assert_eq!(
            blob_path.with_file_name("ac1d2d284757656a8d41dc40aae4136.txt"),
            blob_path
        );
        assert_eq!("hello.txt", received_file.get_filename().unwrap());
        let contents = fs::read_to_string(&blob_path).await.unwrap();
        assert_eq!(contents, "i am attachment");

        // Saving works once; saving to the same destination again must fail.
        let saved_path = blob_path.with_file_name("saved.txt");
        received_file
            .save_file(&receiver, &saved_path)
            .await
            .unwrap();
        let contents = fs::read_to_string(&saved_path).await.unwrap();
        assert_eq!(contents, "i am attachment");
        assert!(
            received_file
                .save_file(&receiver, &saved_path)
                .await
                .is_err()
        );

        // Both sides must have reported the first and the final progress event.
        for ctx in [&sender, &receiver] {
            ctx.evtracker
                .get_matching(|ev| matches!(ev, EventType::ImexProgress(1)))
                .await;
            ctx.evtracker
                .get_matching(|ev| matches!(ev, EventType::ImexProgress(1000)))
                .await;
        }
    }

    /// Dropping a prepared provider must result in an `ImexProgress(0)` event.
    #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
    async fn test_drop_provider() {
        let mut tcm = TestContextManager::new();
        let alice = tcm.alice().await;

        let provider = BackupProvider::prepare(&alice).await.unwrap();
        drop(provider);
        alice
            .evtracker
            .get_matching(|ev| matches!(ev, EventType::ImexProgress(0)))
            .await;
    }
}