use std::future::Future;
use std::pin::Pin;
use std::sync::Arc;
use std::task::Poll;
use anyhow::{bail, format_err, Context as _, Result};
use futures_lite::FutureExt;
use iroh_net::relay::RelayMode;
use iroh_net::Endpoint;
use tokio::fs;
use tokio::task::JoinHandle;
use tokio_util::sync::CancellationToken;
use crate::chat::add_device_msg;
use crate::context::Context;
use crate::imex::BlobDirContents;
use crate::message::Message;
use crate::qr::Qr;
use crate::stock_str::backup_transfer_msg_body;
use crate::tools::{create_id, time, TempPathGuard};
use crate::EventType;
use super::{export_backup_stream, export_database, import_backup_stream, DBFILE_BACKUP_NAME};
/// ALPN protocol identifier used for backup transfer connections.
///
/// The provider registers it on its endpoint and the receiver passes it
/// when connecting to the provider.
const BACKUP_ALPN: &[u8] = b"/deltachat/backup";
/// Provider side of a backup transfer to another device.
///
/// Created by [`BackupProvider::prepare`], which exports the database and spawns
/// a task accepting incoming connections. The provider is itself a future that
/// resolves with the result of that task.
#[derive(Debug)]
pub struct BackupProvider {
    /// Endpoint the provider listens on.
    ///
    /// Stored in the provider but not read through this field in the visible code.
    _endpoint: Endpoint,
    /// Address of the endpoint, handed out to the receiver via [`BackupProvider::qr`].
    node_addr: iroh_net::NodeAddr,
    /// Authentication token the receiver has to send before the backup is streamed.
    auth_token: String,
    /// Handle of the spawned task that runs the accept loop.
    handle: JoinHandle<Result<()>>,
    /// Drop guard of the cancellation token watched by the accept loop,
    /// so that dropping the provider stops the accept loop.
    _drop_guard: tokio_util::sync::DropGuard,
}
impl BackupProvider {
pub async fn prepare(context: &Context) -> Result<Self> {
let relay_mode = RelayMode::Disabled;
let endpoint = Endpoint::builder()
.alpns(vec![BACKUP_ALPN.to_vec()])
.relay_mode(relay_mode)
.bind()
.await?;
let node_addr = endpoint.node_addr().await?;
let cancel_token = context.alloc_ongoing().await?;
let paused_guard = context.scheduler.pause(context.clone()).await?;
let context_dir = context
.get_blobdir()
.parent()
.context("Context dir not found")?;
let dbfile = context_dir.join(DBFILE_BACKUP_NAME);
if fs::metadata(&dbfile).await.is_ok() {
fs::remove_file(&dbfile).await?;
warn!(context, "Previous database export deleted");
}
let dbfile = TempPathGuard::new(dbfile);
let auth_token = create_id();
let passphrase = String::new();
export_database(context, &dbfile, passphrase, time())
.await
.context("Database export failed")?;
let drop_token = CancellationToken::new();
let handle = {
let context = context.clone();
let drop_token = drop_token.clone();
let endpoint = endpoint.clone();
let auth_token = auth_token.clone();
tokio::spawn(async move {
Self::accept_loop(
context.clone(),
endpoint,
auth_token,
cancel_token,
drop_token,
dbfile,
)
.await;
info!(context, "Finished accept loop.");
context.free_ongoing().await;
drop(paused_guard);
Ok(())
})
};
Ok(Self {
_endpoint: endpoint,
node_addr,
auth_token,
handle,
_drop_guard: drop_token.drop_guard(),
})
}
async fn handle_connection(
context: Context,
conn: iroh_net::endpoint::Connecting,
auth_token: String,
dbfile: Arc<TempPathGuard>,
) -> Result<()> {
let conn = conn.await?;
let (mut send_stream, mut recv_stream) = conn.accept_bi().await?;
let mut received_auth_token = vec![0u8; auth_token.len()];
recv_stream.read_exact(&mut received_auth_token).await?;
if received_auth_token.as_slice() != auth_token.as_bytes() {
warn!(context, "Received wrong backup authentication token.");
return Ok(());
}
info!(context, "Received valid backup authentication token.");
context.emit_event(EventType::ImexProgress(1));
let blobdir = BlobDirContents::new(&context).await?;
let mut file_size = 0;
file_size += dbfile.metadata()?.len();
for blob in blobdir.iter() {
file_size += blob.to_abs_path().metadata()?.len()
}
send_stream.write_all(&file_size.to_be_bytes()).await?;
export_backup_stream(&context, &dbfile, blobdir, send_stream, file_size)
.await
.context("Failed to write backup into QUIC stream")?;
info!(context, "Finished writing backup into QUIC stream.");
let mut buf = [0u8; 1];
info!(context, "Waiting for acknowledgment.");
recv_stream.read_exact(&mut buf).await?;
info!(context, "Received backup reception acknowledgement.");
context.emit_event(EventType::ImexProgress(1000));
let mut msg = Message::new_text(backup_transfer_msg_body(&context).await);
add_device_msg(&context, None, Some(&mut msg)).await?;
Ok(())
}
/// Accepts incoming connections until one transfer attempt has been handled.
///
/// Connections that fail to be accepted are logged and skipped. The first
/// accepted connection is passed to `handle_connection`, raced against
/// cancellation via `cancel_token` and `drop_token`; the loop ends afterwards
/// regardless of the outcome. While waiting for a connection, a message on
/// `cancel_token` or cancellation of `drop_token` also ends the loop.
/// `ImexProgress(0)` is emitted whenever the loop ends because of an error or
/// a cancellation.
async fn accept_loop(
    context: Context,
    endpoint: Endpoint,
    auth_token: String,
    cancel_token: async_channel::Receiver<()>,
    drop_token: CancellationToken,
    dbfile: TempPathGuard,
) {
    // Shared pointer, so the export can be handed over to the connection handler.
    let dbfile = Arc::new(dbfile);
    loop {
        tokio::select! {
            biased;

            incoming = endpoint.accept() => {
                let incoming = match incoming {
                    Some(incoming) => incoming,
                    // The endpoint yields no more connections.
                    None => break,
                };
                let connecting = match incoming.accept() {
                    Ok(connecting) => connecting,
                    Err(err) => {
                        warn!(context, "Failed to accept iroh connection: {err:#}.");
                        continue;
                    }
                };

                let cancelled_by_user = async {
                    cancel_token.recv().await.ok();
                    Err(format_err!("Backup transfer cancelled"))
                };
                let provider_dropped = async {
                    drop_token.cancelled().await;
                    Err(format_err!("Backup provider dropped"))
                };
                let outcome = Self::handle_connection(
                    context.clone(),
                    connecting,
                    auth_token.clone(),
                    Arc::clone(&dbfile),
                )
                .race(cancelled_by_user)
                .race(provider_dropped)
                .await;

                match outcome {
                    Ok(()) => {
                        info!(context, "Backup transfer finished successfully.");
                    }
                    Err(err) => {
                        warn!(context, "Error while handling backup connection: {err:#}.");
                        context.emit_event(EventType::ImexProgress(0));
                    }
                }
                // Only a single transfer attempt is served.
                break;
            },
            _ = cancel_token.recv() => {
                info!(context, "Backup transfer cancelled by the user, stopping accept loop.");
                context.emit_event(EventType::ImexProgress(0));
                break;
            }
            _ = drop_token.cancelled() => {
                info!(context, "Backup transfer cancelled by dropping the provider, stopping accept loop.");
                context.emit_event(EventType::ImexProgress(0));
                break;
            }
        }
    }
}
/// Returns the QR code data a receiver needs to fetch this backup:
/// a copy of the provider's node address and of the authentication token.
pub fn qr(&self) -> Qr {
    let node_addr = self.node_addr.clone();
    let auth_token = self.auth_token.clone();
    Qr::Backup2 {
        node_addr,
        auth_token,
    }
}
}
/// Awaiting the provider waits for the spawned accept-loop task to finish.
impl Future for BackupProvider {
    type Output = Result<()>;

    /// Polls the task handle; a join error is converted into the returned error,
    /// otherwise the task's own result is passed through.
    fn poll(mut self: Pin<&mut Self>, cx: &mut std::task::Context<'_>) -> Poll<Self::Output> {
        match Pin::new(&mut self.handle).poll(cx) {
            Poll::Pending => Poll::Pending,
            Poll::Ready(Ok(task_result)) => Poll::Ready(task_result),
            Poll::Ready(Err(join_err)) => Poll::Ready(Err(join_err.into())),
        }
    }
}
/// Fetches a backup from the provider at `node_addr` and imports it.
///
/// Connects with relays disabled, sends `auth_token`, reads the backup size
/// (8 big-endian bytes) and imports the backup from the stream. After a
/// successful import a one-byte acknowledgment is sent; failures while sending
/// it are ignored.
///
/// Emits `ImexProgress(1)` once the size has been received and
/// `ImexProgress(1000)` once the import has finished.
///
/// # Errors
///
/// Returns an error if connecting, sending the token, reading the size or
/// importing the backup fails.
pub async fn get_backup2(
    context: &Context,
    node_addr: iroh_net::NodeAddr,
    auth_token: String,
) -> Result<()> {
    let endpoint = Endpoint::builder()
        .relay_mode(RelayMode::Disabled)
        .bind()
        .await?;
    let connection = endpoint.connect(node_addr, BACKUP_ALPN).await?;
    let (mut outgoing, mut incoming) = connection.open_bi().await?;

    info!(context, "Sending backup authentication token.");
    outgoing.write_all(auth_token.as_bytes()).await?;

    info!(context, "Starting to read backup from the stream.");
    let file_size = {
        let mut size_bytes = [0u8; 8];
        incoming.read_exact(&mut size_bytes).await?;
        u64::from_be_bytes(size_bytes)
    };
    info!(context, "Received backup file size.");
    context.emit_event(EventType::ImexProgress(1));

    // The transferred database is imported with an empty passphrase.
    import_backup_stream(context, incoming, file_size, String::new())
        .await
        .context("Failed to import backup from QUIC stream")?;
    info!(context, "Finished importing backup from the stream.");
    context.emit_event(EventType::ImexProgress(1000));

    // The import is already complete, so errors while acknowledging are ignored.
    outgoing.write_all(b".").await.ok();
    outgoing.finish().ok();
    info!(context, "Sent backup reception acknowledgment.");

    // Wait on the send stream before returning; the result is not needed.
    let _ = outgoing.stopped().await;
    Ok(())
}
/// Receives a backup described by the scanned `qr` code.
///
/// Allocates the ongoing process for the duration of the transfer, so the
/// transfer ends with an error when a message arrives on the cancellation
/// channel. On failure `ImexProgress(0)` is emitted. The ongoing process is
/// freed in either case.
///
/// # Errors
///
/// Returns an error if `qr` is not a `Qr::Backup2`, if the ongoing process
/// cannot be allocated, or if the transfer fails or is cancelled.
pub async fn get_backup(context: &Context, qr: Qr) -> Result<()> {
    let (node_addr, auth_token) = match qr {
        Qr::Backup2 {
            node_addr,
            auth_token,
        } => (node_addr, auth_token),
        _ => bail!("QR code for backup must be of type DCBACKUP2"),
    };

    let cancel_token = context.alloc_ongoing().await?;
    let cancelled = async {
        cancel_token.recv().await.ok();
        Err(format_err!("Backup reception cancelled"))
    };
    let outcome = get_backup2(context, node_addr, auth_token)
        .race(cancelled)
        .await;

    if outcome.is_err() {
        context.emit_event(EventType::ImexProgress(0));
    }
    context.free_ongoing().await;
    outcome
}
#[cfg(test)]
mod tests {
    use std::time::Duration;

    use crate::chat::{get_chat_msgs, send_msg, ChatItem};
    use crate::message::Viewtype;
    use crate::test_utils::TestContextManager;

    use super::*;

    /// Transfers a backup with one text and one file message between two
    /// contexts and checks messages, attachment and progress events.
    #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
    async fn test_send_receive() {
        let mut tcm = TestContextManager::new();

        // Sender with a text message and an attachment in the self chat.
        let sender = tcm.alice().await;
        let sender_self_chat = sender.get_self_chat().await;
        let mut text_msg = Message::new_text("hi there".to_string());
        send_msg(&sender, sender_self_chat.id, &mut text_msg)
            .await
            .unwrap();

        let attachment = sender.get_blobdir().join("hello.txt");
        fs::write(&attachment, "i am attachment").await.unwrap();
        let mut file_msg = Message::new(Viewtype::File);
        file_msg.set_file(attachment.to_str().unwrap(), Some("text/plain"));
        send_msg(&sender, sender_self_chat.id, &mut file_msg)
            .await
            .unwrap();

        // Provide the backup and fetch it into a fresh context.
        let provider = BackupProvider::prepare(&sender).await.unwrap();
        let receiver = tcm.unconfigured().await;
        get_backup(&receiver, provider.qr()).await.unwrap();

        // The provider task has to finish as well.
        tokio::time::timeout(Duration::from_secs(30), provider)
            .await
            .expect("timed out")
            .expect("error in provider");

        let receiver_self_chat = receiver.get_self_chat().await;
        let msgs = get_chat_msgs(&receiver, receiver_self_chat.id)
            .await
            .unwrap();
        assert_eq!(msgs.len(), 2);

        let id_of = |item: &ChatItem| match item {
            ChatItem::Message { msg_id } => *msg_id,
            _ => panic!("wrong chat item"),
        };

        // First message: the text.
        let received_text = Message::load_from_db(&receiver, id_of(msgs.first().unwrap()))
            .await
            .unwrap();
        assert_eq!(received_text.get_text(), "hi there");

        // Second message: the attachment.
        let received_file = Message::load_from_db(&receiver, id_of(msgs.get(1).unwrap()))
            .await
            .unwrap();
        let blob_path = received_file.get_file(&receiver).unwrap();
        assert_eq!(blob_path.with_file_name("hello.txt"), blob_path);
        let blob_content = fs::read_to_string(&blob_path).await.unwrap();
        assert_eq!(blob_content, "i am attachment");

        // Saving the attachment works once; a second save to the same path fails.
        let saved_path = blob_path.with_file_name("saved.txt");
        received_file.save_file(&receiver, &saved_path).await.unwrap();
        let saved_content = fs::read_to_string(&saved_path).await.unwrap();
        assert_eq!(saved_content, "i am attachment");
        assert!(received_file
            .save_file(&receiver, &saved_path)
            .await
            .is_err());

        // Both sides report the start and the end of the transfer.
        for ctx in [&sender, &receiver] {
            for expected in [1, 1000] {
                ctx.evtracker
                    .get_matching(|ev| matches!(ev, EventType::ImexProgress(p) if *p == expected))
                    .await;
            }
        }
    }

    /// Dropping the provider stops the accept loop, which reports `ImexProgress(0)`.
    #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
    async fn test_drop_provider() {
        let mut tcm = TestContextManager::new();
        let alice = tcm.alice().await;

        let provider = BackupProvider::prepare(&alice).await.unwrap();
        drop(provider);

        alice
            .evtracker
            .get_matching(|ev| matches!(ev, EventType::ImexProgress(0)))
            .await;
    }
}