1use std::future::Future;
30use std::pin::Pin;
31use std::sync::Arc;
32use std::task::Poll;
33
34use anyhow::{bail, format_err, Context as _, Result};
35use futures_lite::FutureExt;
36use iroh::{Endpoint, RelayMode};
37use tokio::fs;
38use tokio::task::JoinHandle;
39use tokio_util::sync::CancellationToken;
40
41use crate::chat::add_device_msg;
42use crate::context::Context;
43use crate::imex::BlobDirContents;
44use crate::log::{info, warn};
45use crate::message::Message;
46use crate::qr::Qr;
47use crate::stock_str::backup_transfer_msg_body;
48use crate::tools::{create_id, time, TempPathGuard};
49use crate::{e2ee, EventType};
50
51use super::{export_backup_stream, export_database, import_backup_stream, DBFILE_BACKUP_NAME};
52
53const BACKUP_ALPN: &[u8] = b"/deltachat/backup";
55
56#[derive(Debug)]
67pub struct BackupProvider {
68 _endpoint: Endpoint,
70
71 node_addr: iroh::NodeAddr,
73
74 auth_token: String,
77
78 handle: JoinHandle<Result<()>>,
80
81 _drop_guard: tokio_util::sync::DropGuard,
83}
84
85impl BackupProvider {
86 pub async fn prepare(context: &Context) -> Result<Self> {
97 let relay_mode = RelayMode::Disabled;
98 let endpoint = Endpoint::builder()
99 .tls_x509() .alpns(vec![BACKUP_ALPN.to_vec()])
101 .relay_mode(relay_mode)
102 .bind()
103 .await?;
104 let node_addr = endpoint.node_addr().await?;
105
106 let cancel_token = context.alloc_ongoing().await?;
108 let paused_guard = context.scheduler.pause(context.clone()).await?;
109 let context_dir = context
110 .get_blobdir()
111 .parent()
112 .context("Context dir not found")?;
113
114 e2ee::ensure_secret_key_exists(context)
116 .await
117 .context("Cannot create private key or private key not available")?;
118
119 let dbfile = context_dir.join(DBFILE_BACKUP_NAME);
120 if fs::metadata(&dbfile).await.is_ok() {
121 fs::remove_file(&dbfile).await?;
122 warn!(context, "Previous database export deleted");
123 }
124 let dbfile = TempPathGuard::new(dbfile);
125
126 let auth_token = create_id();
128
129 let passphrase = String::new();
130
131 export_database(context, &dbfile, passphrase, time())
132 .await
133 .context("Database export failed")?;
134
135 let drop_token = CancellationToken::new();
136 let handle = {
137 let context = context.clone();
138 let drop_token = drop_token.clone();
139 let endpoint = endpoint.clone();
140 let auth_token = auth_token.clone();
141 tokio::spawn(async move {
142 Self::accept_loop(
143 context.clone(),
144 endpoint,
145 auth_token,
146 cancel_token,
147 drop_token,
148 dbfile,
149 )
150 .await;
151 info!(context, "Finished accept loop.");
152
153 context.free_ongoing().await;
154
155 drop(paused_guard);
157 Ok(())
158 })
159 };
160 Ok(Self {
161 _endpoint: endpoint,
162 node_addr,
163 auth_token,
164 handle,
165 _drop_guard: drop_token.drop_guard(),
166 })
167 }
168
169 async fn handle_connection(
170 context: Context,
171 conn: iroh::endpoint::Connecting,
172 auth_token: String,
173 dbfile: Arc<TempPathGuard>,
174 ) -> Result<()> {
175 let conn = conn.await?;
176 let (mut send_stream, mut recv_stream) = conn.accept_bi().await?;
177
178 let mut received_auth_token = vec![0u8; auth_token.len()];
180 recv_stream.read_exact(&mut received_auth_token).await?;
181 if received_auth_token.as_slice() != auth_token.as_bytes() {
182 warn!(context, "Received wrong backup authentication token.");
183 return Ok(());
184 }
185
186 info!(context, "Received valid backup authentication token.");
187 context.emit_event(EventType::ImexProgress(1));
189
190 let blobdir = BlobDirContents::new(&context).await?;
191
192 let mut file_size = 0;
193 file_size += dbfile.metadata()?.len();
194 for blob in blobdir.iter() {
195 file_size += blob.to_abs_path().metadata()?.len()
196 }
197
198 send_stream.write_all(&file_size.to_be_bytes()).await?;
199
200 export_backup_stream(&context, &dbfile, blobdir, send_stream, file_size)
201 .await
202 .context("Failed to write backup into QUIC stream")?;
203 info!(context, "Finished writing backup into QUIC stream.");
204 let mut buf = [0u8; 1];
205 info!(context, "Waiting for acknowledgment.");
206 recv_stream.read_exact(&mut buf).await?;
207 info!(context, "Received backup reception acknowledgement.");
208 context.emit_event(EventType::ImexProgress(1000));
209
210 let mut msg = Message::new_text(backup_transfer_msg_body(&context).await);
211 add_device_msg(&context, None, Some(&mut msg)).await?;
212
213 Ok(())
214 }
215
216 async fn accept_loop(
217 context: Context,
218 endpoint: Endpoint,
219 auth_token: String,
220 cancel_token: async_channel::Receiver<()>,
221 drop_token: CancellationToken,
222 dbfile: TempPathGuard,
223 ) {
224 let dbfile = Arc::new(dbfile);
225 loop {
226 tokio::select! {
227 biased;
228
229 conn = endpoint.accept() => {
230 if let Some(conn) = conn {
231 let conn = match conn.accept() {
232 Ok(conn) => conn,
233 Err(err) => {
234 warn!(context, "Failed to accept iroh connection: {err:#}.");
235 continue;
236 }
237 };
238 let context = context.clone();
240 let auth_token = auth_token.clone();
241 let dbfile = dbfile.clone();
242 if let Err(err) = Self::handle_connection(context.clone(), conn, auth_token, dbfile).race(
243 async {
244 cancel_token.recv().await.ok();
245 Err(format_err!("Backup transfer cancelled"))
246 }
247 ).race(
248 async {
249 drop_token.cancelled().await;
250 Err(format_err!("Backup provider dropped"))
251 }
252 ).await {
253 warn!(context, "Error while handling backup connection: {err:#}.");
254 context.emit_event(EventType::ImexProgress(0));
255 break;
256 } else {
257 info!(context, "Backup transfer finished successfully.");
258 break;
259 }
260 } else {
261 break;
262 }
263 },
264 _ = cancel_token.recv() => {
265 info!(context, "Backup transfer cancelled by the user, stopping accept loop.");
266 context.emit_event(EventType::ImexProgress(0));
267 break;
268 }
269 _ = drop_token.cancelled() => {
270 info!(context, "Backup transfer cancelled by dropping the provider, stopping accept loop.");
271 context.emit_event(EventType::ImexProgress(0));
272 break;
273 }
274 }
275 }
276 }
277
278 pub fn qr(&self) -> Qr {
282 Qr::Backup2 {
283 node_addr: self.node_addr.clone(),
284
285 auth_token: self.auth_token.clone(),
286 }
287 }
288}
289
290impl Future for BackupProvider {
291 type Output = Result<()>;
292
293 fn poll(mut self: Pin<&mut Self>, cx: &mut std::task::Context<'_>) -> Poll<Self::Output> {
295 Pin::new(&mut self.handle).poll(cx)?
296 }
297}
298
299pub async fn get_backup2(
300 context: &Context,
301 node_addr: iroh::NodeAddr,
302 auth_token: String,
303) -> Result<()> {
304 let relay_mode = RelayMode::Disabled;
305
306 let endpoint = Endpoint::builder()
307 .tls_x509() .relay_mode(relay_mode)
309 .bind()
310 .await?;
311
312 let conn = endpoint.connect(node_addr, BACKUP_ALPN).await?;
313 let (mut send_stream, mut recv_stream) = conn.open_bi().await?;
314 info!(context, "Sending backup authentication token.");
315 send_stream.write_all(auth_token.as_bytes()).await?;
316
317 let passphrase = String::new();
318 info!(context, "Starting to read backup from the stream.");
319
320 let mut file_size_buf = [0u8; 8];
321 recv_stream.read_exact(&mut file_size_buf).await?;
322 let file_size = u64::from_be_bytes(file_size_buf);
323 info!(context, "Received backup file size.");
324 context.emit_event(EventType::ImexProgress(1));
326
327 import_backup_stream(context, recv_stream, file_size, passphrase)
328 .await
329 .context("Failed to import backup from QUIC stream")?;
330 info!(context, "Finished importing backup from the stream.");
331 context.emit_event(EventType::ImexProgress(1000));
332
333 send_stream.write_all(b".").await.ok();
336 send_stream.finish().ok();
337 info!(context, "Sent backup reception acknowledgment.");
338
339 _ = send_stream.stopped().await;
342
343 Ok(())
344}
345
346pub async fn get_backup(context: &Context, qr: Qr) -> Result<()> {
358 match qr {
359 Qr::Backup2 {
360 node_addr,
361 auth_token,
362 } => {
363 let cancel_token = context.alloc_ongoing().await?;
364 let res = get_backup2(context, node_addr, auth_token)
365 .race(async {
366 cancel_token.recv().await.ok();
367 Err(format_err!("Backup reception cancelled"))
368 })
369 .await;
370 if res.is_err() {
371 context.emit_event(EventType::ImexProgress(0));
372 }
373 context.free_ongoing().await;
374 res?;
375 }
376 _ => bail!("QR code for backup must be of type DCBACKUP2"),
377 }
378 Ok(())
379}
380
381#[cfg(test)]
382mod tests {
383 use std::time::Duration;
384
385 use crate::chat::{get_chat_msgs, send_msg, ChatItem};
386 use crate::message::Viewtype;
387 use crate::test_utils::TestContextManager;
388
389 use super::*;
390
391 #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
392 async fn test_send_receive() {
393 let mut tcm = TestContextManager::new();
394
395 let ctx0 = tcm.alice().await;
397
398 let self_chat = ctx0.get_self_chat().await;
400 let mut msg = Message::new_text("hi there".to_string());
401 send_msg(&ctx0, self_chat.id, &mut msg).await.unwrap();
402
403 let file = ctx0.get_blobdir().join("hello.txt");
405 fs::write(&file, "i am attachment").await.unwrap();
406 let mut msg = Message::new(Viewtype::File);
407 msg.set_file_and_deduplicate(&ctx0, &file, Some("hello.txt"), Some("text/plain"))
408 .unwrap();
409 send_msg(&ctx0, self_chat.id, &mut msg).await.unwrap();
410
411 let provider = BackupProvider::prepare(&ctx0).await.unwrap();
413
414 let ctx1 = tcm.unconfigured().await;
416 get_backup(&ctx1, provider.qr()).await.unwrap();
417
418 tokio::time::timeout(Duration::from_secs(30), provider)
420 .await
421 .expect("timed out")
422 .expect("error in provider");
423
424 let self_chat = ctx1.get_self_chat().await;
426 let msgs = get_chat_msgs(&ctx1, self_chat.id).await.unwrap();
427 assert_eq!(msgs.len(), 2);
428 let msgid = match msgs.first().unwrap() {
429 ChatItem::Message { msg_id } => msg_id,
430 _ => panic!("wrong chat item"),
431 };
432 let msg = Message::load_from_db(&ctx1, *msgid).await.unwrap();
433 let text = msg.get_text();
434 assert_eq!(text, "hi there");
435 let msgid = match msgs.get(1).unwrap() {
436 ChatItem::Message { msg_id } => msg_id,
437 _ => panic!("wrong chat item"),
438 };
439 let msg = Message::load_from_db(&ctx1, *msgid).await.unwrap();
440
441 let path = msg.get_file(&ctx1).unwrap();
442 assert_eq!(
443 path.with_file_name("ac1d2d284757656a8d41dc40aae4136.txt"),
445 path
446 );
447 assert_eq!("hello.txt", msg.get_filename().unwrap());
448 let text = fs::read_to_string(&path).await.unwrap();
449 assert_eq!(text, "i am attachment");
450
451 let path = path.with_file_name("saved.txt");
452 msg.save_file(&ctx1, &path).await.unwrap();
453 let text = fs::read_to_string(&path).await.unwrap();
454 assert_eq!(text, "i am attachment");
455 assert!(msg.save_file(&ctx1, &path).await.is_err());
456
457 for ctx in [&ctx0, &ctx1] {
459 ctx.evtracker
460 .get_matching(|ev| matches!(ev, EventType::ImexProgress(1)))
461 .await;
462 ctx.evtracker
463 .get_matching(|ev| matches!(ev, EventType::ImexProgress(1000)))
464 .await;
465 }
466 }
467
468 #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
469 async fn test_drop_provider() {
470 let mut tcm = TestContextManager::new();
471 let ctx = tcm.alice().await;
472
473 let provider = BackupProvider::prepare(&ctx).await.unwrap();
474 drop(provider);
475 ctx.evtracker
476 .get_matching(|ev| matches!(ev, EventType::ImexProgress(0)))
477 .await;
478 }
479}