// deltachat/scheduler.rs

1use std::cmp;
2use std::num::NonZeroUsize;
3
4use anyhow::{Context as _, Error, Result, bail};
5use async_channel::{self as channel, Receiver, Sender};
6use futures::future::try_join_all;
7use futures_lite::FutureExt;
8use tokio::sync::{RwLock, oneshot};
9use tokio::task;
10use tokio_util::sync::CancellationToken;
11use tokio_util::task::TaskTracker;
12
13pub(crate) use self::connectivity::ConnectivityStore;
14use crate::config::Config;
15use crate::contact::{ContactId, RecentlySeenLoop};
16use crate::context::Context;
17use crate::download::{download_known_post_messages_without_pre_message, download_msgs};
18use crate::ephemeral::{self, delete_expired_imap_messages};
19use crate::events::EventType;
20use crate::imap::{Imap, session::Session};
21use crate::location;
22use crate::log::{LogExt, warn};
23use crate::smtp::{Smtp, send_smtp_messages};
24use crate::sql;
25use crate::stats::maybe_send_stats;
26use crate::tools::{self, duration_to_str, maybe_add_time_based_warnings, time, time_elapsed};
27use crate::transport::ConfiguredLoginParam;
28use crate::{constants, stats};
29
30pub(crate) mod connectivity;
31
/// State of the IO scheduler, as stored on the [`Context`].
///
/// The IO scheduler can be stopped or started, but core can also pause it.  After pausing
/// the IO scheduler will be restarted only if it was running before paused or
/// [`Context::start_io`] was called in the meantime while it was paused.
#[derive(Debug, Default)]
pub(crate) struct SchedulerState {
    /// Guarded state machine; see [`InnerSchedulerState`] for the possible states.
    inner: RwLock<InnerSchedulerState>,
}
41
impl SchedulerState {
    /// Creates a new, stopped scheduler state.
    pub(crate) fn new() -> Self {
        Default::default()
    }

    /// Whether the scheduler is currently running.
    pub(crate) async fn is_running(&self) -> bool {
        let inner = self.inner.read().await;
        matches!(*inner, InnerSchedulerState::Started(_))
    }

    /// Starts the scheduler if it is not yet started.
    pub(crate) async fn start(&self, context: &Context) {
        let mut inner = self.inner.write().await;
        match *inner {
            InnerSchedulerState::Started(_) => (),
            InnerSchedulerState::Stopped => Self::do_start(&mut inner, context).await,
            // While paused, only record that a start was requested;
            // the resume task spawned by `pause()` performs the actual start.
            InnerSchedulerState::Paused {
                ref mut started, ..
            } => *started = true,
        }
        context.update_connectivities(&inner);
    }

    /// Actually starts the scheduler, storing it into `inner` on success.
    ///
    /// Must be called with the state write lock held.
    async fn do_start(inner: &mut InnerSchedulerState, context: &Context) {
        info!(context, "starting IO");

        // Notify message processing loop
        // to allow processing old messages after restart.
        context.new_msgs_notify.notify_one();

        match Scheduler::start(context).await {
            Ok(scheduler) => {
                *inner = InnerSchedulerState::Started(scheduler);
                context.emit_event(EventType::ConnectivityChanged);
            }
            // On failure the state stays `Stopped`; the error is only logged.
            Err(err) => error!(context, "Failed to start IO: {:#}", err),
        }
    }

    /// Stops the scheduler if it is currently running.
    pub(crate) async fn stop(&self, context: &Context) {
        let mut inner = self.inner.write().await;
        match *inner {
            InnerSchedulerState::Started(_) => {
                Self::do_stop(&mut inner, context, InnerSchedulerState::Stopped).await
            }
            InnerSchedulerState::Stopped => (),
            // While paused, only record that IO should stay stopped after resume.
            InnerSchedulerState::Paused {
                ref mut started, ..
            } => *started = false,
        }
        context.update_connectivities(&inner);
    }

    /// Actually stops a running scheduler, replacing the state with `new_state`.
    ///
    /// Must be called with the state write lock held.
    async fn do_stop(
        inner: &mut InnerSchedulerState,
        context: &Context,
        new_state: InnerSchedulerState,
    ) {
        // Sending an event wakes up event pollers (get_next_event)
        // so the caller of stop_io() can arrange for proper termination.
        // For this, the caller needs to instruct the event poller
        // to terminate on receiving the next event and then call stop_io()
        // which will emit the below event(s)
        info!(context, "stopping IO");

        // Wake up message processing loop even if there are no messages
        // to allow for clean shutdown.
        context.new_msgs_notify.notify_one();

        // Tear down the debug logging task first so it does not outlive IO.
        let debug_logging = context
            .debug_logging
            .write()
            .expect("RwLock is poisoned")
            .take();
        if let Some(debug_logging) = debug_logging {
            debug_logging.loop_handle.abort();
            // Await the aborted task so its resources are actually dropped.
            debug_logging.loop_handle.await.ok();
        }
        let prev_state = std::mem::replace(inner, new_state);
        context.emit_event(EventType::ConnectivityChanged);
        match prev_state {
            InnerSchedulerState::Started(scheduler) => scheduler.stop(context).await,
            InnerSchedulerState::Stopped | InnerSchedulerState::Paused { .. } => (),
        }
    }

    /// Pauses the IO scheduler.
    ///
    /// If it is currently running the scheduler will be stopped.  When the
    /// [`IoPausedGuard`] is dropped the scheduler is started again.
    ///
    /// If in the meantime [`SchedulerState::start`] or [`SchedulerState::stop`] is called
    /// resume will do the right thing and restore the scheduler to the state requested by
    /// the last call.
    pub(crate) async fn pause(&'_ self, context: &Context) -> Result<IoPausedGuard> {
        {
            let mut inner = self.inner.write().await;
            match *inner {
                InnerSchedulerState::Started(_) => {
                    let new_state = InnerSchedulerState::Paused {
                        started: true,
                        pause_guards_count: NonZeroUsize::MIN,
                    };
                    Self::do_stop(&mut inner, context, new_state).await;
                }
                InnerSchedulerState::Stopped => {
                    *inner = InnerSchedulerState::Paused {
                        started: false,
                        pause_guards_count: NonZeroUsize::MIN,
                    };
                }
                // Already paused: just count one more guard.
                InnerSchedulerState::Paused {
                    ref mut pause_guards_count,
                    ..
                } => {
                    *pause_guards_count = pause_guards_count
                        .checked_add(1)
                        .ok_or_else(|| Error::msg("Too many pause guards active"))?
                }
            }
            context.update_connectivities(&inner);
        }

        // Spawn the resume task.  It waits until the guard is dropped (or its
        // sender half is dropped) and then undoes this guard's pause.
        let (tx, rx) = oneshot::channel();
        let context = context.clone();
        tokio::spawn(async move {
            rx.await.ok();
            let mut inner = context.scheduler.inner.write().await;
            match *inner {
                InnerSchedulerState::Started(_) => {
                    warn!(&context, "IoPausedGuard resume: started instead of paused");
                }
                InnerSchedulerState::Stopped => {
                    warn!(&context, "IoPausedGuard resume: stopped instead of paused");
                }
                InnerSchedulerState::Paused {
                    ref started,
                    ref mut pause_guards_count,
                } => {
                    if *pause_guards_count == NonZeroUsize::MIN {
                        // Last guard: restore the state requested by the most
                        // recent start()/stop() call while paused.
                        match *started {
                            true => SchedulerState::do_start(&mut inner, &context).await,
                            false => *inner = InnerSchedulerState::Stopped,
                        }
                    } else {
                        let new_count = pause_guards_count.get() - 1;
                        // Invariant: the count was >= 2 due to the check above,
                        // so `new_count` is nonzero and `unwrap()` cannot fail.
                        *pause_guards_count = NonZeroUsize::new(new_count).unwrap();
                    }
                }
            }
            context.update_connectivities(&inner);
        });
        Ok(IoPausedGuard { sender: Some(tx) })
    }

    /// Restarts the scheduler, only if it is running.
    pub(crate) async fn restart(&self, context: &Context) {
        info!(context, "restarting IO");
        if self.is_running().await {
            self.stop(context).await;
            self.start(context).await;
        }
    }

    /// Indicate that the network likely has come back.
    pub(crate) async fn maybe_network(&self) {
        let inner = self.inner.read().await;
        let inboxes = match *inner {
            InnerSchedulerState::Started(ref scheduler) => {
                scheduler.maybe_network();
                scheduler
                    .inboxes
                    .iter()
                    .map(|b| b.conn_state.state.connectivity.clone())
                    .collect::<Vec<_>>()
            }
            _ => return,
        };
        // Release the lock before touching connectivity stores.
        drop(inner);
        connectivity::idle_interrupted(inboxes);
    }

    /// Indicate that the network likely is lost.
    pub(crate) async fn maybe_network_lost(&self, context: &Context) {
        let inner = self.inner.read().await;
        let stores = match *inner {
            InnerSchedulerState::Started(ref scheduler) => {
                scheduler.maybe_network_lost();
                scheduler
                    .boxes()
                    .map(|b| b.conn_state.state.connectivity.clone())
                    .collect()
            }
            _ => return,
        };
        // Release the lock before touching connectivity stores.
        drop(inner);
        connectivity::maybe_network_lost(context, stores);
    }

    /// Interrupts all inbox loops if the scheduler is running.
    pub(crate) async fn interrupt_inbox(&self) {
        let inner = self.inner.read().await;
        if let InnerSchedulerState::Started(ref scheduler) = *inner {
            scheduler.interrupt_inbox();
        }
    }

    /// Requests clearing relay storage; fails if IO is not running.
    pub(crate) async fn clear_all_relay_storage(&self) -> Result<()> {
        let inner = self.inner.read().await;
        if let InnerSchedulerState::Started(ref scheduler) = *inner {
            scheduler.clear_all_relay_storage();
            Ok(())
        } else {
            bail!("IO is not started");
        }
    }

    /// Interrupts the SMTP loop if the scheduler is running.
    pub(crate) async fn interrupt_smtp(&self) {
        let inner = self.inner.read().await;
        if let InnerSchedulerState::Started(ref scheduler) = *inner {
            scheduler.interrupt_smtp();
        }
    }

    /// Interrupts the ephemeral-messages task if the scheduler is running.
    pub(crate) async fn interrupt_ephemeral_task(&self) {
        let inner = self.inner.read().await;
        if let InnerSchedulerState::Started(ref scheduler) = *inner {
            scheduler.interrupt_ephemeral_task();
        }
    }

    /// Interrupts the location streaming task if the scheduler is running.
    pub(crate) async fn interrupt_location(&self) {
        let inner = self.inner.read().await;
        if let InnerSchedulerState::Started(ref scheduler) = *inner {
            scheduler.interrupt_location();
        }
    }

    /// Notifies the recently-seen loop about a contact seen at `timestamp`.
    pub(crate) async fn interrupt_recently_seen(&self, contact_id: ContactId, timestamp: i64) {
        let inner = self.inner.read().await;
        if let InnerSchedulerState::Started(ref scheduler) = *inner {
            scheduler.interrupt_recently_seen(contact_id, timestamp);
        }
    }
}
291
/// The three states of the IO scheduler state machine.
#[derive(Debug, Default)]
pub(crate) enum InnerSchedulerState {
    /// IO is running; owns the running [`Scheduler`].
    Started(Scheduler),
    /// IO is stopped (also the initial state).
    #[default]
    Stopped,
    /// IO is paused by one or more [`IoPausedGuard`]s.
    Paused {
        /// Whether IO should be started again once the last guard is dropped.
        started: bool,
        /// Number of currently active pause guards (always at least one).
        pause_guards_count: NonZeroUsize,
    },
}
302
/// Guard to make sure the IO Scheduler is resumed.
///
/// Returned by [`SchedulerState::pause`].  To resume the IO scheduler simply drop this
/// guard.
#[derive(Default, Debug)]
pub(crate) struct IoPausedGuard {
    // Oneshot sender signalling the resume task spawned by `pause()`;
    // taken (set to `None`) exactly once on drop.
    sender: Option<oneshot::Sender<()>>,
}
311
312impl Drop for IoPausedGuard {
313    fn drop(&mut self) {
314        if let Some(sender) = self.sender.take() {
315            // Can only fail if receiver is dropped, but then we're already resumed.
316            sender.send(()).ok();
317        }
318    }
319}
320
/// Per-transport IMAP inbox: connection state plus its running loop task.
#[derive(Debug)]
struct SchedBox {
    /// Address at the used chatmail/email relay
    addr: String,

    /// Folder name
    folder: String,

    /// Shared connection state used to interrupt or stop the IMAP loop.
    conn_state: ImapConnectionState,

    /// IMAP loop task handle.
    handle: task::JoinHandle<()>,
}
334
/// Job and connection scheduler.
#[derive(Debug)]
pub(crate) struct Scheduler {
    /// Inboxes, one per transport.
    inboxes: Vec<SchedBox>,
    /// SMTP connection state used to interrupt or stop the SMTP loop.
    smtp: SmtpConnectionState,
    /// SMTP loop task handle.
    smtp_handle: task::JoinHandle<()>,
    /// Ephemeral-messages loop task handle.
    ephemeral_handle: task::JoinHandle<()>,
    /// Channel to interrupt the ephemeral-messages loop.
    ephemeral_interrupt_send: Sender<()>,
    /// Location streaming loop task handle.
    location_handle: task::JoinHandle<()>,
    /// Channel to interrupt the location streaming loop.
    location_interrupt_send: Sender<()>,

    /// Loop tracking when contacts were recently seen.
    recently_seen_loop: RecentlySeenLoop,
}
349
/// Runs the IMAP inbox loop for a single transport until `stop_token` is cancelled.
///
/// Signals `started` once the loop is up, then repeatedly prepares an IMAP
/// session (reusing the previous one when possible) and runs
/// [`inbox_fetch_idle`] on it.  Errors are logged and the loop continues.
async fn inbox_loop(
    ctx: Context,
    started: oneshot::Sender<()>,
    inbox_handlers: ImapConnectionHandlers,
) {
    use futures::future::FutureExt;

    info!(ctx, "Starting inbox loop.");
    let ImapConnectionHandlers {
        mut connection,
        stop_token,
        clear_storage_request_receiver,
    } = inbox_handlers;

    let transport_id = connection.transport_id();
    let ctx1 = ctx.clone();
    let fut = async move {
        let ctx = ctx1;
        if let Err(()) = started.send(()) {
            // Scheduler::start() dropped the receiver; nothing to run for.
            warn!(ctx, "Inbox loop, missing started receiver.");
            return;
        };

        // Session carried over from the previous iteration, if it ended cleanly.
        let mut old_session: Option<Session> = None;
        loop {
            let session = if let Some(session) = old_session.take() {
                session
            } else {
                info!(
                    ctx,
                    "Transport {transport_id}: Preparing new IMAP session for inbox."
                );
                match connection.prepare(&ctx).await {
                    Err(err) => {
                        warn!(
                            ctx,
                            "Transport {transport_id}: Failed to prepare inbox connection: {err:#}."
                        );
                        continue;
                    }
                    Ok(session) => {
                        info!(
                            ctx,
                            "Transport {transport_id}: Prepared new IMAP session for inbox."
                        );
                        session
                    }
                }
            };

            match inbox_fetch_idle(
                &ctx,
                &mut connection,
                session,
                &clear_storage_request_receiver,
            )
            .await
            {
                // On error the session is dropped and a fresh one is prepared.
                Err(err) => warn!(
                    ctx,
                    "Transport {transport_id}: Failed inbox fetch_idle: {err:#}."
                ),
                Ok(session) => {
                    old_session = Some(session);
                }
            }
        }
    };

    // Race the loop against cancellation so `stop()` terminates it promptly.
    stop_token
        .cancelled()
        .map(|_| {
            info!(ctx, "Transport {transport_id}: Shutting down inbox loop.");
        })
        .race(fut)
        .await;
}
427
/// Performs per-iteration inbox maintenance, then runs [`fetch_idle`].
///
/// Maintenance includes clearing IMAP storage on request (chatmail only),
/// quota updates, folder resync, housekeeping, statistics and metadata/push
/// token registration.  Returns the session for reuse on success.
async fn inbox_fetch_idle(
    ctx: &Context,
    imap: &mut Imap,
    mut session: Session,
    clear_storage_request_receiver: &Receiver<()>,
) -> Result<Session> {
    let transport_id = session.transport_id();

    // Clear IMAP storage on request.
    //
    // Only doing this for chatmail relays to avoid
    // accidentally deleting all emails in a shared mailbox.
    let should_clear_imap_storage =
        clear_storage_request_receiver.try_recv().is_ok() && session.is_chatmail();
    if should_clear_imap_storage {
        info!(ctx, "Transport {transport_id}: Clearing IMAP storage.");
        session.delete_all_messages(ctx, &imap.folder).await?;
    }

    // Update quota no more than once a minute.
    //
    // Always update if we just cleared IMAP storage.
    if (ctx.quota_needs_update(session.transport_id(), 60).await || should_clear_imap_storage)
        && let Err(err) = ctx.update_recent_quota(&mut session, &imap.folder).await
    {
        warn!(
            ctx,
            "Transport {transport_id}: Failed to update quota: {err:#}."
        );
    }

    // Resync folders if requested; on failure re-queue the request so the
    // resync is retried on the next iteration.
    if let Ok(()) = imap.resync_request_receiver.try_recv()
        && let Err(err) = session.resync_folders(ctx).await
    {
        warn!(
            ctx,
            "Transport {transport_id}: Failed to resync folders: {err:#}."
        );
        imap.resync_request_sender.try_send(()).ok();
    }

    maybe_add_time_based_warnings(ctx).await;

    // Run database housekeeping if the configured period has elapsed.
    match ctx.get_config_i64(Config::LastHousekeeping).await {
        Ok(last_housekeeping_time) => {
            let next_housekeeping_time =
                last_housekeeping_time.saturating_add(constants::HOUSEKEEPING_PERIOD);
            if next_housekeeping_time <= time() {
                sql::housekeeping(ctx).await.log_err(ctx).ok();
            }
        }
        Err(err) => {
            warn!(
                ctx,
                "Transport {transport_id}: Failed to get last housekeeping time: {err:#}"
            );
        }
    };

    maybe_send_stats(ctx).await.log_err(ctx).ok();

    session
        .update_metadata(ctx)
        .await
        .context("update_metadata")?;
    session
        .register_token(ctx)
        .await
        .context("Failed to register push token")?;

    let session = fetch_idle(ctx, imap, session).await?;
    Ok(session)
}
501
/// Implement a single iteration of IMAP loop.
///
/// This function performs all IMAP operations on a single folder, selecting it if necessary and
/// handling all the errors. In case of an error, an error is returned and connection is dropped,
/// otherwise connection is returned.
async fn fetch_idle(ctx: &Context, connection: &mut Imap, mut session: Session) -> Result<Session> {
    let transport_id = session.transport_id();

    let watch_folder = connection.folder.clone();

    // Upload locally recorded \Seen flags before fetching.
    session
        .store_seen_flags_on_imap(ctx)
        .await
        .context("store_seen_flags_on_imap")?;

    // Fetch the watched folder.
    connection
        .fetch_move_delete(ctx, &mut session, &watch_folder)
        .await
        .context("fetch_move_delete")?;

    // Mark expired messages for deletion. Marked messages will be deleted from the server
    // on the next iteration of `fetch_move_delete`. `delete_expired_imap_messages` is not
    // called right before `fetch_move_delete` because it is not well optimized and would
    // otherwise slow down message fetching.
    delete_expired_imap_messages(ctx)
        .await
        .context("delete_expired_imap_messages")?;

    download_known_post_messages_without_pre_message(ctx, &mut session).await?;
    download_msgs(ctx, &mut session)
        .await
        .context("download_msgs")?;

    // Synchronize Seen flags.
    session
        .sync_seen_flags(ctx, &watch_folder)
        .await
        .context("sync_seen_flags")
        .log_err(ctx)
        .ok();

    connection.connectivity.set_idle(ctx);

    ctx.emit_event(EventType::ImapInboxIdle);

    // Fall back to fake (polling) idle when real IDLE is unavailable.
    if !session.can_idle() {
        info!(
            ctx,
            "Transport {transport_id}: IMAP session does not support IDLE, going to fake idle."
        );
        connection.fake_idle(ctx, &watch_folder).await?;
        return Ok(session);
    }

    // Fake idle is also used when IDLE is disabled via config.
    if ctx
        .get_config_bool(Config::DisableIdle)
        .await
        .context("Failed to get disable_idle config")
        .log_err(ctx)
        .unwrap_or_default()
    {
        info!(
            ctx,
            "Transport {transport_id}: IMAP IDLE is disabled, going to fake idle."
        );
        connection.fake_idle(ctx, &watch_folder).await?;
        return Ok(session);
    }

    // Real IDLE; returns when interrupted or the server reports changes.
    let session = session
        .idle(
            ctx,
            connection.idle_interrupt_receiver.clone(),
            &watch_folder,
        )
        .await
        .context("idle")?;

    Ok(session)
}
583
/// Runs the SMTP send loop until `stop_token` is cancelled.
///
/// Signals `started` once the loop is up, then repeatedly sends queued
/// messages.  On failure it fake-idles with a growing timeout so sending is
/// retried; on success it waits for an interrupt (or a rate-limit delay).
async fn smtp_loop(
    ctx: Context,
    started: oneshot::Sender<()>,
    smtp_handlers: SmtpConnectionHandlers,
) {
    use futures::future::FutureExt;

    info!(ctx, "Starting SMTP loop.");
    let SmtpConnectionHandlers {
        mut connection,
        stop_token,
        idle_interrupt_receiver,
    } = smtp_handlers;

    let ctx1 = ctx.clone();
    let fut = async move {
        let ctx = ctx1;
        if let Err(()) = started.send(()) {
            // Scheduler::start() dropped the receiver; nothing to run for.
            warn!(&ctx, "SMTP loop, missing started receiver.");
            return;
        }

        // Retry timeout in seconds; `None` means the last send succeeded.
        let mut timeout = None;
        loop {
            if let Err(err) = send_smtp_messages(&ctx, &mut connection).await {
                warn!(ctx, "send_smtp_messages failed: {:#}.", err);
                timeout = Some(timeout.unwrap_or(30));
            } else {
                timeout = None;
                // Respect the rate limiter before attempting the next send.
                let duration_until_can_send = ctx.ratelimit.read().await.until_can_send();
                if !duration_until_can_send.is_zero() {
                    info!(
                        ctx,
                        "smtp got rate limited, waiting for {} until can send again",
                        duration_to_str(duration_until_can_send)
                    );
                    tokio::time::sleep(duration_until_can_send).await;
                    continue;
                }
            }

            stats::maybe_update_message_stats(&ctx)
                .await
                .log_err(&ctx)
                .ok();

            // Fake Idle
            info!(ctx, "SMTP fake idle started.");
            match &connection.last_send_error {
                None => connection.connectivity.set_idle(&ctx),
                Some(err) => connection.connectivity.set_err(&ctx, err),
            }

            // If send_smtp_messages() failed, we set a timeout for the fake-idle so that
            // sending is retried (at the latest) after the timeout. If sending fails
            // again, we increase the timeout exponentially, in order not to do lots of
            // unnecessary retries.
            if let Some(t) = timeout {
                let now = tools::Time::now();
                info!(
                    ctx,
                    "SMTP has messages to retry, planning to retry {t} seconds later."
                );
                let duration = std::time::Duration::from_secs(t);
                // Wait for an interrupt, but at most `duration`.
                tokio::time::timeout(duration, async {
                    idle_interrupt_receiver.recv().await.unwrap_or_default()
                })
                .await
                .unwrap_or_default();
                // Grow the timeout based on how long we actually slept, with
                // jitter, so repeated failures back off exponentially.
                let slept = time_elapsed(&now).as_secs();
                timeout = Some(cmp::max(
                    t,
                    slept.saturating_add(rand::random_range((slept / 2)..=slept)),
                ));
            } else {
                info!(ctx, "SMTP has no messages to retry, waiting for interrupt.");
                idle_interrupt_receiver.recv().await.unwrap_or_default();
            };

            info!(ctx, "SMTP fake idle interrupted.")
        }
    };

    // Race the loop against cancellation so `stop()` terminates it promptly.
    stop_token
        .cancelled()
        .map(|_| {
            info!(ctx, "Shutting down SMTP loop.");
        })
        .race(fut)
        .await;
}
675
676impl Scheduler {
677    /// Start the scheduler.
678    pub async fn start(ctx: &Context) -> Result<Self> {
679        let (smtp, smtp_handlers) = SmtpConnectionState::new();
680
681        let (smtp_start_send, smtp_start_recv) = oneshot::channel();
682        let (ephemeral_interrupt_send, ephemeral_interrupt_recv) = channel::bounded(1);
683        let (location_interrupt_send, location_interrupt_recv) = channel::bounded(1);
684
685        let mut inboxes = Vec::new();
686        let mut start_recvs = Vec::new();
687
688        for (transport_id, configured_login_param) in ConfiguredLoginParam::load_all(ctx).await? {
689            let (conn_state, inbox_handlers) =
690                ImapConnectionState::new(ctx, transport_id, configured_login_param.clone()).await?;
691            let (inbox_start_send, inbox_start_recv) = oneshot::channel();
692            let handle = {
693                let ctx = ctx.clone();
694                task::spawn(inbox_loop(ctx, inbox_start_send, inbox_handlers))
695            };
696            let addr = configured_login_param.addr.clone();
697            let folder = configured_login_param
698                .imap_folder
699                .unwrap_or_else(|| "INBOX".to_string());
700            let inbox = SchedBox {
701                addr: addr.clone(),
702                folder,
703                conn_state,
704                handle,
705            };
706            inboxes.push(inbox);
707            start_recvs.push(inbox_start_recv);
708        }
709
710        let smtp_handle = {
711            let ctx = ctx.clone();
712            task::spawn(smtp_loop(ctx, smtp_start_send, smtp_handlers))
713        };
714        start_recvs.push(smtp_start_recv);
715
716        let ephemeral_handle = {
717            let ctx = ctx.clone();
718            task::spawn(async move {
719                ephemeral::ephemeral_loop(&ctx, ephemeral_interrupt_recv).await;
720            })
721        };
722
723        let location_handle = {
724            let ctx = ctx.clone();
725            task::spawn(async move {
726                location::location_loop(&ctx, location_interrupt_recv).await;
727            })
728        };
729
730        let recently_seen_loop = RecentlySeenLoop::new(ctx.clone());
731
732        let res = Self {
733            inboxes,
734            smtp,
735            smtp_handle,
736            ephemeral_handle,
737            ephemeral_interrupt_send,
738            location_handle,
739            location_interrupt_send,
740            recently_seen_loop,
741        };
742
743        // wait for all loops to be started
744        if let Err(err) = try_join_all(start_recvs).await {
745            bail!("failed to start scheduler: {err}");
746        }
747
748        info!(ctx, "scheduler is running");
749        Ok(res)
750    }
751
752    fn boxes(&self) -> impl Iterator<Item = &SchedBox> {
753        self.inboxes.iter()
754    }
755
756    fn maybe_network(&self) {
757        for b in self.boxes() {
758            b.conn_state.interrupt();
759        }
760        self.interrupt_smtp();
761    }
762
763    fn maybe_network_lost(&self) {
764        for b in self.boxes() {
765            b.conn_state.interrupt();
766        }
767        self.interrupt_smtp();
768    }
769
770    fn interrupt_inbox(&self) {
771        for b in &self.inboxes {
772            b.conn_state.interrupt();
773        }
774    }
775
776    fn clear_all_relay_storage(&self) {
777        for b in &self.inboxes {
778            b.conn_state.clear_relay_storage();
779        }
780    }
781
782    fn interrupt_smtp(&self) {
783        self.smtp.interrupt();
784    }
785
786    fn interrupt_ephemeral_task(&self) {
787        self.ephemeral_interrupt_send.try_send(()).ok();
788    }
789
790    fn interrupt_location(&self) {
791        self.location_interrupt_send.try_send(()).ok();
792    }
793
794    fn interrupt_recently_seen(&self, contact_id: ContactId, timestamp: i64) {
795        self.recently_seen_loop.try_interrupt(contact_id, timestamp);
796    }
797
798    /// Halt the scheduler.
799    ///
800    /// It consumes the scheduler and never fails to stop it. In the worst case, long-running tasks
801    /// are forcefully terminated if they cannot shutdown within the timeout.
802    pub(crate) async fn stop(self, context: &Context) {
803        // Send stop signals to tasks so they can shutdown cleanly.
804        for b in self.boxes() {
805            b.conn_state.stop();
806        }
807        self.smtp.stop();
808
809        // Actually shutdown tasks.
810        let timeout_duration = std::time::Duration::from_secs(30);
811
812        let tracker = TaskTracker::new();
813        for b in self.inboxes {
814            let context = context.clone();
815            tracker.spawn(async move {
816                tokio::time::timeout(timeout_duration, b.handle)
817                    .await
818                    .log_err(&context)
819            });
820        }
821        {
822            let context = context.clone();
823            tracker.spawn(async move {
824                tokio::time::timeout(timeout_duration, self.smtp_handle)
825                    .await
826                    .log_err(&context)
827            });
828        }
829        tracker.close();
830        tracker.wait().await;
831
832        // Abort tasks, then await them to ensure the `Future` is dropped.
833        // Just aborting the task may keep resources such as `Context` clone
834        // moved into it indefinitely, resulting in database not being
835        // closed etc.
836        self.ephemeral_handle.abort();
837        self.ephemeral_handle.await.ok();
838        self.location_handle.abort();
839        self.location_handle.await.ok();
840        self.recently_seen_loop.abort().await;
841    }
842}
843
/// Connection state logic shared between imap and smtp connections.
#[derive(Debug)]
struct ConnectionState {
    /// Cancellation token to interrupt the whole connection.
    stop_token: CancellationToken,
    /// Channel to interrupt idle.
    idle_interrupt_sender: Sender<()>,
    /// Mutex to pass connectivity info between IMAP/SMTP threads and the API.
    connectivity: ConnectivityStore,
}
854
855impl ConnectionState {
856    /// Shutdown this connection completely.
857    fn stop(&self) {
858        // Trigger shutdown of the run loop.
859        self.stop_token.cancel();
860    }
861
862    fn interrupt(&self) {
863        // Use try_send to avoid blocking on interrupts.
864        self.idle_interrupt_sender.try_send(()).ok();
865    }
866}
867
/// State of the SMTP connection, used to interrupt or stop the SMTP loop.
#[derive(Debug)]
pub(crate) struct SmtpConnectionState {
    /// Shared connection state (stop token, idle interrupt, connectivity).
    state: ConnectionState,
}
872
873impl SmtpConnectionState {
874    fn new() -> (Self, SmtpConnectionHandlers) {
875        let stop_token = CancellationToken::new();
876        let (idle_interrupt_sender, idle_interrupt_receiver) = channel::bounded(1);
877
878        let handlers = SmtpConnectionHandlers {
879            connection: Smtp::new(),
880            stop_token: stop_token.clone(),
881            idle_interrupt_receiver,
882        };
883
884        let state = ConnectionState {
885            stop_token,
886            idle_interrupt_sender,
887            connectivity: handlers.connection.connectivity.clone(),
888        };
889
890        let conn = SmtpConnectionState { state };
891
892        (conn, handlers)
893    }
894
895    /// Interrupt any form of idle.
896    fn interrupt(&self) {
897        self.state.interrupt();
898    }
899
900    /// Shutdown this connection completely.
901    fn stop(&self) {
902        self.state.stop();
903    }
904}
905
/// Handles moved into the SMTP loop task.
struct SmtpConnectionHandlers {
    /// The SMTP connection owned by the loop.
    connection: Smtp,
    /// Token cancelled to shut the loop down.
    stop_token: CancellationToken,
    /// Receiver for idle interrupt requests.
    idle_interrupt_receiver: Receiver<()>,
}
911
/// State of an IMAP connection, used to interrupt or stop its inbox loop.
#[derive(Debug)]
pub(crate) struct ImapConnectionState {
    /// Shared connection state (stop token, idle interrupt, connectivity).
    state: ConnectionState,

    /// Channel to request clearing the folder.
    ///
    /// IMAP loop receiving this should clear the folder
    /// on the next iteration if IMAP server is a chatmail relay
    /// and otherwise ignore the request.
    clear_storage_request_sender: Sender<()>,
}
923
924impl ImapConnectionState {
925    /// Construct a new connection.
926    async fn new(
927        context: &Context,
928        transport_id: u32,
929        login_param: ConfiguredLoginParam,
930    ) -> Result<(Self, ImapConnectionHandlers)> {
931        let stop_token = CancellationToken::new();
932        let (idle_interrupt_sender, idle_interrupt_receiver) = channel::bounded(1);
933        let (clear_storage_request_sender, clear_storage_request_receiver) = channel::bounded(1);
934
935        let handlers = ImapConnectionHandlers {
936            connection: Imap::new(context, transport_id, login_param, idle_interrupt_receiver)
937                .await?,
938            stop_token: stop_token.clone(),
939            clear_storage_request_receiver,
940        };
941
942        let state = ConnectionState {
943            stop_token,
944            idle_interrupt_sender,
945            connectivity: handlers.connection.connectivity.clone(),
946        };
947
948        let conn = ImapConnectionState {
949            state,
950            clear_storage_request_sender,
951        };
952
953        Ok((conn, handlers))
954    }
955
956    /// Interrupt any form of idle.
957    fn interrupt(&self) {
958        self.state.interrupt();
959    }
960
961    /// Shutdown this connection completely.
962    fn stop(&self) {
963        self.state.stop();
964    }
965
966    /// Requests clearing relay storage and interrupts the inbox.
967    fn clear_relay_storage(&self) {
968        self.clear_storage_request_sender.try_send(()).ok();
969        self.state.interrupt();
970    }
971}
972
/// Handles moved into the IMAP inbox loop task.
#[derive(Debug)]
struct ImapConnectionHandlers {
    /// The IMAP connection owned by the loop.
    connection: Imap,
    /// Token cancelled to shut the loop down.
    stop_token: CancellationToken,

    /// Channel receiver to get requests to clear IMAP storage.
    pub(crate) clear_storage_request_receiver: Receiver<()>,
}