1use std::future::Future;
30use std::pin::Pin;
31use std::sync::Arc;
32use std::task::Poll;
33
34use anyhow::{bail, format_err, Context as _, Result};
35use futures_lite::FutureExt;
36use iroh::{Endpoint, RelayMode};
37use tokio::fs;
38use tokio::task::JoinHandle;
39use tokio_util::sync::CancellationToken;
40
41use crate::chat::add_device_msg;
42use crate::context::Context;
43use crate::imex::BlobDirContents;
44use crate::message::Message;
45use crate::qr::Qr;
46use crate::stock_str::backup_transfer_msg_body;
47use crate::tools::{create_id, time, TempPathGuard};
48use crate::{e2ee, EventType};
49
50use super::{export_backup_stream, export_database, import_backup_stream, DBFILE_BACKUP_NAME};
51
/// ALPN protocol identifier for backup transfer connections.
///
/// The provider advertises it on its endpoint and the receiver passes it when connecting.
const BACKUP_ALPN: &[u8] = b"/deltachat/backup";
54
/// Provider (sending) side of a backup transfer.
///
/// Created with [`BackupProvider::prepare`], which exports the database and spawns the task
/// that accepts a connection from the receiving device. The provider is itself a
/// [`Future`] which resolves once that task has finished. Dropping the provider cancels
/// the accept loop through the drop guard.
#[derive(Debug)]
pub struct BackupProvider {
    /// Endpoint the backup is served from.
    ///
    /// Not read after construction; presumably held so it lives as long as the provider.
    _endpoint: Endpoint,

    /// Address of the endpoint, handed out to the receiver via [`BackupProvider::qr`].
    node_addr: iroh::NodeAddr,

    /// Token the receiver has to send before it is given the backup.
    auth_token: String,

    /// Handle of the spawned task running the accept loop.
    handle: JoinHandle<Result<()>>,

    /// Guard which cancels the accept loop's drop token when the provider is dropped.
    _drop_guard: tokio_util::sync::DropGuard,
}
83
84impl BackupProvider {
85 pub async fn prepare(context: &Context) -> Result<Self> {
96 let relay_mode = RelayMode::Disabled;
97 let endpoint = Endpoint::builder()
98 .tls_x509() .alpns(vec![BACKUP_ALPN.to_vec()])
100 .relay_mode(relay_mode)
101 .bind()
102 .await?;
103 let node_addr = endpoint.node_addr().await?;
104
105 let cancel_token = context.alloc_ongoing().await?;
107 let paused_guard = context.scheduler.pause(context.clone()).await?;
108 let context_dir = context
109 .get_blobdir()
110 .parent()
111 .context("Context dir not found")?;
112
113 e2ee::ensure_secret_key_exists(context)
115 .await
116 .context("Cannot create private key or private key not available")?;
117
118 let dbfile = context_dir.join(DBFILE_BACKUP_NAME);
119 if fs::metadata(&dbfile).await.is_ok() {
120 fs::remove_file(&dbfile).await?;
121 warn!(context, "Previous database export deleted");
122 }
123 let dbfile = TempPathGuard::new(dbfile);
124
125 let auth_token = create_id();
127
128 let passphrase = String::new();
129
130 export_database(context, &dbfile, passphrase, time())
131 .await
132 .context("Database export failed")?;
133
134 let drop_token = CancellationToken::new();
135 let handle = {
136 let context = context.clone();
137 let drop_token = drop_token.clone();
138 let endpoint = endpoint.clone();
139 let auth_token = auth_token.clone();
140 tokio::spawn(async move {
141 Self::accept_loop(
142 context.clone(),
143 endpoint,
144 auth_token,
145 cancel_token,
146 drop_token,
147 dbfile,
148 )
149 .await;
150 info!(context, "Finished accept loop.");
151
152 context.free_ongoing().await;
153
154 drop(paused_guard);
156 Ok(())
157 })
158 };
159 Ok(Self {
160 _endpoint: endpoint,
161 node_addr,
162 auth_token,
163 handle,
164 _drop_guard: drop_token.drop_guard(),
165 })
166 }
167
168 async fn handle_connection(
169 context: Context,
170 conn: iroh::endpoint::Connecting,
171 auth_token: String,
172 dbfile: Arc<TempPathGuard>,
173 ) -> Result<()> {
174 let conn = conn.await?;
175 let (mut send_stream, mut recv_stream) = conn.accept_bi().await?;
176
177 let mut received_auth_token = vec![0u8; auth_token.len()];
179 recv_stream.read_exact(&mut received_auth_token).await?;
180 if received_auth_token.as_slice() != auth_token.as_bytes() {
181 warn!(context, "Received wrong backup authentication token.");
182 return Ok(());
183 }
184
185 info!(context, "Received valid backup authentication token.");
186 context.emit_event(EventType::ImexProgress(1));
188
189 let blobdir = BlobDirContents::new(&context).await?;
190
191 let mut file_size = 0;
192 file_size += dbfile.metadata()?.len();
193 for blob in blobdir.iter() {
194 file_size += blob.to_abs_path().metadata()?.len()
195 }
196
197 send_stream.write_all(&file_size.to_be_bytes()).await?;
198
199 export_backup_stream(&context, &dbfile, blobdir, send_stream, file_size)
200 .await
201 .context("Failed to write backup into QUIC stream")?;
202 info!(context, "Finished writing backup into QUIC stream.");
203 let mut buf = [0u8; 1];
204 info!(context, "Waiting for acknowledgment.");
205 recv_stream.read_exact(&mut buf).await?;
206 info!(context, "Received backup reception acknowledgement.");
207 context.emit_event(EventType::ImexProgress(1000));
208
209 let mut msg = Message::new_text(backup_transfer_msg_body(&context).await);
210 add_device_msg(&context, None, Some(&mut msg)).await?;
211
212 Ok(())
213 }
214
215 async fn accept_loop(
216 context: Context,
217 endpoint: Endpoint,
218 auth_token: String,
219 cancel_token: async_channel::Receiver<()>,
220 drop_token: CancellationToken,
221 dbfile: TempPathGuard,
222 ) {
223 let dbfile = Arc::new(dbfile);
224 loop {
225 tokio::select! {
226 biased;
227
228 conn = endpoint.accept() => {
229 if let Some(conn) = conn {
230 let conn = match conn.accept() {
231 Ok(conn) => conn,
232 Err(err) => {
233 warn!(context, "Failed to accept iroh connection: {err:#}.");
234 continue;
235 }
236 };
237 let context = context.clone();
239 let auth_token = auth_token.clone();
240 let dbfile = dbfile.clone();
241 if let Err(err) = Self::handle_connection(context.clone(), conn, auth_token, dbfile).race(
242 async {
243 cancel_token.recv().await.ok();
244 Err(format_err!("Backup transfer cancelled"))
245 }
246 ).race(
247 async {
248 drop_token.cancelled().await;
249 Err(format_err!("Backup provider dropped"))
250 }
251 ).await {
252 warn!(context, "Error while handling backup connection: {err:#}.");
253 context.emit_event(EventType::ImexProgress(0));
254 break;
255 } else {
256 info!(context, "Backup transfer finished successfully.");
257 break;
258 }
259 } else {
260 break;
261 }
262 },
263 _ = cancel_token.recv() => {
264 info!(context, "Backup transfer cancelled by the user, stopping accept loop.");
265 context.emit_event(EventType::ImexProgress(0));
266 break;
267 }
268 _ = drop_token.cancelled() => {
269 info!(context, "Backup transfer cancelled by dropping the provider, stopping accept loop.");
270 context.emit_event(EventType::ImexProgress(0));
271 break;
272 }
273 }
274 }
275 }
276
277 pub fn qr(&self) -> Qr {
281 Qr::Backup2 {
282 node_addr: self.node_addr.clone(),
283
284 auth_token: self.auth_token.clone(),
285 }
286 }
287}
288
289impl Future for BackupProvider {
290 type Output = Result<()>;
291
292 fn poll(mut self: Pin<&mut Self>, cx: &mut std::task::Context<'_>) -> Poll<Self::Output> {
294 Pin::new(&mut self.handle).poll(cx)?
295 }
296}
297
298pub async fn get_backup2(
299 context: &Context,
300 node_addr: iroh::NodeAddr,
301 auth_token: String,
302) -> Result<()> {
303 let relay_mode = RelayMode::Disabled;
304
305 let endpoint = Endpoint::builder()
306 .tls_x509() .relay_mode(relay_mode)
308 .bind()
309 .await?;
310
311 let conn = endpoint.connect(node_addr, BACKUP_ALPN).await?;
312 let (mut send_stream, mut recv_stream) = conn.open_bi().await?;
313 info!(context, "Sending backup authentication token.");
314 send_stream.write_all(auth_token.as_bytes()).await?;
315
316 let passphrase = String::new();
317 info!(context, "Starting to read backup from the stream.");
318
319 let mut file_size_buf = [0u8; 8];
320 recv_stream.read_exact(&mut file_size_buf).await?;
321 let file_size = u64::from_be_bytes(file_size_buf);
322 info!(context, "Received backup file size.");
323 context.emit_event(EventType::ImexProgress(1));
325
326 import_backup_stream(context, recv_stream, file_size, passphrase)
327 .await
328 .context("Failed to import backup from QUIC stream")?;
329 info!(context, "Finished importing backup from the stream.");
330 context.emit_event(EventType::ImexProgress(1000));
331
332 send_stream.write_all(b".").await.ok();
335 send_stream.finish().ok();
336 info!(context, "Sent backup reception acknowledgment.");
337
338 _ = send_stream.stopped().await;
341
342 Ok(())
343}
344
345pub async fn get_backup(context: &Context, qr: Qr) -> Result<()> {
357 match qr {
358 Qr::Backup2 {
359 node_addr,
360 auth_token,
361 } => {
362 let cancel_token = context.alloc_ongoing().await?;
363 let res = get_backup2(context, node_addr, auth_token)
364 .race(async {
365 cancel_token.recv().await.ok();
366 Err(format_err!("Backup reception cancelled"))
367 })
368 .await;
369 if res.is_err() {
370 context.emit_event(EventType::ImexProgress(0));
371 }
372 context.free_ongoing().await;
373 res?;
374 }
375 _ => bail!("QR code for backup must be of type DCBACKUP2"),
376 }
377 Ok(())
378}
379
380#[cfg(test)]
381mod tests {
382 use std::time::Duration;
383
384 use crate::chat::{get_chat_msgs, send_msg, ChatItem};
385 use crate::message::Viewtype;
386 use crate::test_utils::TestContextManager;
387
388 use super::*;
389
    /// Transfers a backup with a text message and a file attachment from one context to
    /// another and checks that both messages, the file and the progress events arrive.
    #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
    async fn test_send_receive() {
        let mut tcm = TestContextManager::new();

        // Sending side.
        let ctx0 = tcm.alice().await;

        // Write a text message into the self chat so there is something to transfer.
        let self_chat = ctx0.get_self_chat().await;
        let mut msg = Message::new_text("hi there".to_string());
        send_msg(&ctx0, self_chat.id, &mut msg).await.unwrap();

        // Send an attachment as well, so the backup contains a blob.
        let file = ctx0.get_blobdir().join("hello.txt");
        fs::write(&file, "i am attachment").await.unwrap();
        let mut msg = Message::new(Viewtype::File);
        msg.set_file_and_deduplicate(&ctx0, &file, Some("hello.txt"), Some("text/plain"))
            .unwrap();
        send_msg(&ctx0, self_chat.id, &mut msg).await.unwrap();

        // Prepare to transfer the backup.
        let provider = BackupProvider::prepare(&ctx0).await.unwrap();

        // Receiving side: an unconfigured context fetches the backup.
        let ctx1 = tcm.unconfigured().await;
        get_backup(&ctx1, provider.qr()).await.unwrap();

        // The provider task has to finish by itself after the transfer.
        tokio::time::timeout(Duration::from_secs(30), provider)
            .await
            .expect("timed out")
            .expect("error in provider");

        // Both messages must be present in the self chat of the receiver.
        let self_chat = ctx1.get_self_chat().await;
        let msgs = get_chat_msgs(&ctx1, self_chat.id).await.unwrap();
        assert_eq!(msgs.len(), 2);
        let msgid = match msgs.first().unwrap() {
            ChatItem::Message { msg_id } => msg_id,
            _ => panic!("wrong chat item"),
        };
        let msg = Message::load_from_db(&ctx1, *msgid).await.unwrap();
        let text = msg.get_text();
        assert_eq!(text, "hi there");
        let msgid = match msgs.get(1).unwrap() {
            ChatItem::Message { msg_id } => msg_id,
            _ => panic!("wrong chat item"),
        };
        let msg = Message::load_from_db(&ctx1, *msgid).await.unwrap();

        // The blob is stored under this name on disk (presumably derived from the file
        // content by deduplication), while the user-visible filename stays `hello.txt`.
        let path = msg.get_file(&ctx1).unwrap();
        assert_eq!(
            path.with_file_name("ac1d2d284757656a8d41dc40aae4136.txt"),
            path
        );
        assert_eq!("hello.txt", msg.get_filename().unwrap());
        let text = fs::read_to_string(&path).await.unwrap();
        assert_eq!(text, "i am attachment");

        // Saving the attachment works once; saving to the same path again must fail.
        let path = path.with_file_name("saved.txt");
        msg.save_file(&ctx1, &path).await.unwrap();
        let text = fs::read_to_string(&path).await.unwrap();
        assert_eq!(text, "i am attachment");
        assert!(msg.save_file(&ctx1, &path).await.is_err());

        // Both sides must have reported the start and the end of the transfer.
        for ctx in [&ctx0, &ctx1] {
            ctx.evtracker
                .get_matching(|ev| matches!(ev, EventType::ImexProgress(1)))
                .await;
            ctx.evtracker
                .get_matching(|ev| matches!(ev, EventType::ImexProgress(1000)))
                .await;
        }
    }
466
    /// Dropping the provider before any receiver connects must result in an
    /// `ImexProgress(0)` event.
    #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
    async fn test_drop_provider() {
        let mut tcm = TestContextManager::new();
        let ctx = tcm.alice().await;

        let provider = BackupProvider::prepare(&ctx).await.unwrap();
        drop(provider);
        // Waits until the failure/cancellation event shows up.
        ctx.evtracker
            .get_matching(|ev| matches!(ev, EventType::ImexProgress(0)))
            .await;
    }
478}