Skip to main content

deltachat/
scheduler.rs

1use std::cmp;
2use std::num::NonZeroUsize;
3
4use anyhow::{Context as _, Error, Result, bail};
5use async_channel::{self as channel, Receiver, Sender};
6use futures::future::try_join_all;
7use futures_lite::FutureExt;
8use tokio::sync::{RwLock, oneshot};
9use tokio::task;
10use tokio_util::sync::CancellationToken;
11use tokio_util::task::TaskTracker;
12
13pub(crate) use self::connectivity::ConnectivityStore;
14use crate::config::Config;
15use crate::contact::{ContactId, RecentlySeenLoop};
16use crate::context::Context;
17use crate::download::{download_known_post_messages_without_pre_message, download_msgs};
18use crate::ephemeral;
19use crate::events::EventType;
20use crate::imap::{Imap, session::Session};
21use crate::location;
22use crate::log::{LogExt, warn};
23use crate::smtp::{Smtp, send_smtp_messages};
24use crate::sql;
25use crate::stats::maybe_send_stats;
26use crate::tools::{self, duration_to_str, maybe_add_time_based_warnings, time, time_elapsed};
27use crate::transport::ConfiguredLoginParam;
28use crate::{constants, stats};
29
30pub(crate) mod connectivity;
31
32/// State of the IO scheduler, as stored on the [`Context`].
33///
34/// The IO scheduler can be stopped or started, but core can also pause it.  After pausing
35/// the IO scheduler will be restarted only if it was running before paused or
36/// [`Context::start_io`] was called in the meantime while it was paused.
37#[derive(Debug, Default)]
38pub(crate) struct SchedulerState {
39    inner: RwLock<InnerSchedulerState>,
40}
41
42impl SchedulerState {
43    pub(crate) fn new() -> Self {
44        Default::default()
45    }
46
47    /// Whether the scheduler is currently running.
48    pub(crate) async fn is_running(&self) -> bool {
49        let inner = self.inner.read().await;
50        matches!(*inner, InnerSchedulerState::Started(_))
51    }
52
53    /// Starts the scheduler if it is not yet started.
54    pub(crate) async fn start(&self, context: &Context) {
55        let mut inner = self.inner.write().await;
56        match *inner {
57            InnerSchedulerState::Started(_) => (),
58            InnerSchedulerState::Stopped => Self::do_start(&mut inner, context).await,
59            InnerSchedulerState::Paused {
60                ref mut started, ..
61            } => *started = true,
62        }
63        context.update_connectivities(&inner);
64    }
65
66    /// Starts the scheduler if it is not yet started.
67    async fn do_start(inner: &mut InnerSchedulerState, context: &Context) {
68        info!(context, "starting IO");
69
70        // Notify message processing loop
71        // to allow processing old messages after restart.
72        context.new_msgs_notify.notify_one();
73
74        match Scheduler::start(context).await {
75            Ok(scheduler) => {
76                *inner = InnerSchedulerState::Started(scheduler);
77                context.emit_event(EventType::ConnectivityChanged);
78            }
79            Err(err) => error!(context, "Failed to start IO: {:#}", err),
80        }
81    }
82
83    /// Stops the scheduler if it is currently running.
84    pub(crate) async fn stop(&self, context: &Context) {
85        let mut inner = self.inner.write().await;
86        match *inner {
87            InnerSchedulerState::Started(_) => {
88                Self::do_stop(&mut inner, context, InnerSchedulerState::Stopped).await
89            }
90            InnerSchedulerState::Stopped => (),
91            InnerSchedulerState::Paused {
92                ref mut started, ..
93            } => *started = false,
94        }
95        context.update_connectivities(&inner);
96    }
97
98    /// Stops the scheduler if it is currently running.
99    async fn do_stop(
100        inner: &mut InnerSchedulerState,
101        context: &Context,
102        new_state: InnerSchedulerState,
103    ) {
104        // Sending an event wakes up event pollers (get_next_event)
105        // so the caller of stop_io() can arrange for proper termination.
106        // For this, the caller needs to instruct the event poller
107        // to terminate on receiving the next event and then call stop_io()
108        // which will emit the below event(s)
109        info!(context, "stopping IO");
110
111        // Wake up message processing loop even if there are no messages
112        // to allow for clean shutdown.
113        context.new_msgs_notify.notify_one();
114
115        let debug_logging = context
116            .debug_logging
117            .write()
118            .expect("RwLock is poisoned")
119            .take();
120        if let Some(debug_logging) = debug_logging {
121            debug_logging.loop_handle.abort();
122            debug_logging.loop_handle.await.ok();
123        }
124        let prev_state = std::mem::replace(inner, new_state);
125        context.emit_event(EventType::ConnectivityChanged);
126        match prev_state {
127            InnerSchedulerState::Started(scheduler) => scheduler.stop(context).await,
128            InnerSchedulerState::Stopped | InnerSchedulerState::Paused { .. } => (),
129        }
130    }
131
132    /// Pauses the IO scheduler.
133    ///
134    /// If it is currently running the scheduler will be stopped.  When the
135    /// [`IoPausedGuard`] is dropped the scheduler is started again.
136    ///
137    /// If in the meantime [`SchedulerState::start`] or [`SchedulerState::stop`] is called
138    /// resume will do the right thing and restore the scheduler to the state requested by
139    /// the last call.
140    pub(crate) async fn pause(&'_ self, context: &Context) -> Result<IoPausedGuard> {
141        {
142            let mut inner = self.inner.write().await;
143            match *inner {
144                InnerSchedulerState::Started(_) => {
145                    let new_state = InnerSchedulerState::Paused {
146                        started: true,
147                        pause_guards_count: NonZeroUsize::MIN,
148                    };
149                    Self::do_stop(&mut inner, context, new_state).await;
150                }
151                InnerSchedulerState::Stopped => {
152                    *inner = InnerSchedulerState::Paused {
153                        started: false,
154                        pause_guards_count: NonZeroUsize::MIN,
155                    };
156                }
157                InnerSchedulerState::Paused {
158                    ref mut pause_guards_count,
159                    ..
160                } => {
161                    *pause_guards_count = pause_guards_count
162                        .checked_add(1)
163                        .ok_or_else(|| Error::msg("Too many pause guards active"))?
164                }
165            }
166            context.update_connectivities(&inner);
167        }
168
169        let (tx, rx) = oneshot::channel();
170        let context = context.clone();
171        tokio::spawn(async move {
172            rx.await.ok();
173            let mut inner = context.scheduler.inner.write().await;
174            match *inner {
175                InnerSchedulerState::Started(_) => {
176                    warn!(&context, "IoPausedGuard resume: started instead of paused");
177                }
178                InnerSchedulerState::Stopped => {
179                    warn!(&context, "IoPausedGuard resume: stopped instead of paused");
180                }
181                InnerSchedulerState::Paused {
182                    ref started,
183                    ref mut pause_guards_count,
184                } => {
185                    if *pause_guards_count == NonZeroUsize::MIN {
186                        match *started {
187                            true => SchedulerState::do_start(&mut inner, &context).await,
188                            false => *inner = InnerSchedulerState::Stopped,
189                        }
190                    } else {
191                        let new_count = pause_guards_count.get() - 1;
192                        // SAFETY: Value was >=2 before due to if condition
193                        *pause_guards_count = NonZeroUsize::new(new_count).unwrap();
194                    }
195                }
196            }
197            context.update_connectivities(&inner);
198        });
199        Ok(IoPausedGuard { sender: Some(tx) })
200    }
201
202    /// Restarts the scheduler, only if it is running.
203    pub(crate) async fn restart(&self, context: &Context) {
204        info!(context, "restarting IO");
205        if self.is_running().await {
206            self.stop(context).await;
207            self.start(context).await;
208        }
209    }
210
211    /// Indicate that the network likely has come back.
212    pub(crate) async fn maybe_network(&self) {
213        let inner = self.inner.read().await;
214        let inboxes = match *inner {
215            InnerSchedulerState::Started(ref scheduler) => {
216                scheduler.maybe_network();
217                scheduler
218                    .inboxes
219                    .iter()
220                    .map(|b| b.conn_state.state.connectivity.clone())
221                    .collect::<Vec<_>>()
222            }
223            _ => return,
224        };
225        drop(inner);
226        connectivity::idle_interrupted(inboxes);
227    }
228
229    /// Indicate that the network likely is lost.
230    pub(crate) async fn maybe_network_lost(&self, context: &Context) {
231        let inner = self.inner.read().await;
232        let stores = match *inner {
233            InnerSchedulerState::Started(ref scheduler) => {
234                scheduler.maybe_network_lost();
235                scheduler
236                    .boxes()
237                    .map(|b| b.conn_state.state.connectivity.clone())
238                    .collect()
239            }
240            _ => return,
241        };
242        drop(inner);
243        connectivity::maybe_network_lost(context, stores);
244    }
245
246    pub(crate) async fn interrupt_inbox(&self) {
247        let inner = self.inner.read().await;
248        if let InnerSchedulerState::Started(ref scheduler) = *inner {
249            scheduler.interrupt_inbox();
250        }
251    }
252
253    pub(crate) async fn interrupt_smtp(&self) {
254        let inner = self.inner.read().await;
255        if let InnerSchedulerState::Started(ref scheduler) = *inner {
256            scheduler.interrupt_smtp();
257        }
258    }
259
260    pub(crate) async fn interrupt_ephemeral_task(&self) {
261        let inner = self.inner.read().await;
262        if let InnerSchedulerState::Started(ref scheduler) = *inner {
263            scheduler.interrupt_ephemeral_task();
264        }
265    }
266
267    pub(crate) async fn interrupt_location(&self) {
268        let inner = self.inner.read().await;
269        if let InnerSchedulerState::Started(ref scheduler) = *inner {
270            scheduler.interrupt_location();
271        }
272    }
273
274    pub(crate) async fn interrupt_recently_seen(&self, contact_id: ContactId, timestamp: i64) {
275        let inner = self.inner.read().await;
276        if let InnerSchedulerState::Started(ref scheduler) = *inner {
277            scheduler.interrupt_recently_seen(contact_id, timestamp);
278        }
279    }
280}
281
282#[derive(Debug, Default)]
283pub(crate) enum InnerSchedulerState {
284    Started(Scheduler),
285    #[default]
286    Stopped,
287    Paused {
288        started: bool,
289        pause_guards_count: NonZeroUsize,
290    },
291}
292
293/// Guard to make sure the IO Scheduler is resumed.
294///
295/// Returned by [`SchedulerState::pause`].  To resume the IO scheduler simply drop this
296/// guard.
297#[derive(Default, Debug)]
298pub(crate) struct IoPausedGuard {
299    sender: Option<oneshot::Sender<()>>,
300}
301
302impl Drop for IoPausedGuard {
303    fn drop(&mut self) {
304        if let Some(sender) = self.sender.take() {
305            // Can only fail if receiver is dropped, but then we're already resumed.
306            sender.send(()).ok();
307        }
308    }
309}
310
311#[derive(Debug)]
312struct SchedBox {
313    /// Address at the used chatmail/email relay
314    addr: String,
315
316    /// Folder name
317    folder: String,
318
319    conn_state: ImapConnectionState,
320
321    /// IMAP loop task handle.
322    handle: task::JoinHandle<()>,
323}
324
325/// Job and connection scheduler.
326#[derive(Debug)]
327pub(crate) struct Scheduler {
328    /// Inboxes, one per transport.
329    inboxes: Vec<SchedBox>,
330    smtp: SmtpConnectionState,
331    smtp_handle: task::JoinHandle<()>,
332    ephemeral_handle: task::JoinHandle<()>,
333    ephemeral_interrupt_send: Sender<()>,
334    location_handle: task::JoinHandle<()>,
335    location_interrupt_send: Sender<()>,
336
337    recently_seen_loop: RecentlySeenLoop,
338}
339
340async fn inbox_loop(
341    ctx: Context,
342    started: oneshot::Sender<()>,
343    inbox_handlers: ImapConnectionHandlers,
344) {
345    use futures::future::FutureExt;
346
347    info!(ctx, "Starting inbox loop.");
348    let ImapConnectionHandlers {
349        mut connection,
350        stop_token,
351    } = inbox_handlers;
352
353    let transport_id = connection.transport_id();
354    let ctx1 = ctx.clone();
355    let fut = async move {
356        let ctx = ctx1;
357        if let Err(()) = started.send(()) {
358            warn!(ctx, "Inbox loop, missing started receiver.");
359            return;
360        };
361
362        let mut old_session: Option<Session> = None;
363        loop {
364            let session = if let Some(session) = old_session.take() {
365                session
366            } else {
367                info!(
368                    ctx,
369                    "Transport {transport_id}: Preparing new IMAP session for inbox."
370                );
371                match connection.prepare(&ctx).await {
372                    Err(err) => {
373                        warn!(
374                            ctx,
375                            "Transport {transport_id}: Failed to prepare inbox connection: {err:#}."
376                        );
377                        continue;
378                    }
379                    Ok(session) => {
380                        info!(
381                            ctx,
382                            "Transport {transport_id}: Prepared new IMAP session for inbox."
383                        );
384                        session
385                    }
386                }
387            };
388
389            match inbox_fetch_idle(&ctx, &mut connection, session).await {
390                Err(err) => warn!(
391                    ctx,
392                    "Transport {transport_id}: Failed inbox fetch_idle: {err:#}."
393                ),
394                Ok(session) => {
395                    old_session = Some(session);
396                }
397            }
398        }
399    };
400
401    stop_token
402        .cancelled()
403        .map(|_| {
404            info!(ctx, "Transport {transport_id}: Shutting down inbox loop.");
405        })
406        .race(fut)
407        .await;
408}
409
410async fn inbox_fetch_idle(ctx: &Context, imap: &mut Imap, mut session: Session) -> Result<Session> {
411    let transport_id = session.transport_id();
412
413    // Update quota no more than once a minute.
414    if ctx.quota_needs_update(session.transport_id(), 60).await
415        && let Err(err) = ctx.update_recent_quota(&mut session, &imap.folder).await
416    {
417        warn!(
418            ctx,
419            "Transport {transport_id}: Failed to update quota: {err:#}."
420        );
421    }
422
423    if let Ok(()) = imap.resync_request_receiver.try_recv()
424        && let Err(err) = session.resync_folders(ctx).await
425    {
426        warn!(
427            ctx,
428            "Transport {transport_id}: Failed to resync folders: {err:#}."
429        );
430        imap.resync_request_sender.try_send(()).ok();
431    }
432
433    maybe_add_time_based_warnings(ctx).await;
434
435    match ctx.get_config_i64(Config::LastHousekeeping).await {
436        Ok(last_housekeeping_time) => {
437            let next_housekeeping_time =
438                last_housekeeping_time.saturating_add(constants::HOUSEKEEPING_PERIOD);
439            if next_housekeeping_time <= time() {
440                sql::housekeeping(ctx).await.log_err(ctx).ok();
441            }
442        }
443        Err(err) => {
444            warn!(
445                ctx,
446                "Transport {transport_id}: Failed to get last housekeeping time: {err:#}"
447            );
448        }
449    };
450
451    maybe_send_stats(ctx).await.log_err(ctx).ok();
452
453    session
454        .update_metadata(ctx)
455        .await
456        .context("update_metadata")?;
457    if let Err(err) = session.register_token(ctx).await {
458        warn!(
459            ctx,
460            "Transport {transport_id}: Failed to register push token: {err:#}."
461        );
462    }
463
464    let session = fetch_idle(ctx, imap, session).await?;
465    Ok(session)
466}
467
468/// Implement a single iteration of IMAP loop.
469///
470/// This function performs all IMAP operations on a single folder, selecting it if necessary and
471/// handling all the errors. In case of an error, an error is returned and connection is dropped,
472/// otherwise connection is returned.
473async fn fetch_idle(ctx: &Context, connection: &mut Imap, mut session: Session) -> Result<Session> {
474    let transport_id = session.transport_id();
475
476    let watch_folder = connection.folder.clone();
477
478    session
479        .store_seen_flags_on_imap(ctx)
480        .await
481        .context("store_seen_flags_on_imap")?;
482
483    // Fetch the watched folder.
484    connection
485        .fetch_move_delete(ctx, &mut session, &watch_folder)
486        .await
487        .context("fetch_move_delete")?;
488
489    download_known_post_messages_without_pre_message(ctx, &mut session).await?;
490    download_msgs(ctx, &mut session)
491        .await
492        .context("download_msgs")?;
493
494    connection.connectivity.set_idle(ctx);
495
496    ctx.emit_event(EventType::ImapInboxIdle);
497
498    if !session.can_idle() {
499        info!(
500            ctx,
501            "Transport {transport_id}: IMAP session does not support IDLE, going to fake idle."
502        );
503        connection.fake_idle(ctx, &watch_folder).await?;
504        return Ok(session);
505    }
506
507    if ctx
508        .get_config_bool(Config::DisableIdle)
509        .await
510        .context("Failed to get disable_idle config")
511        .log_err(ctx)
512        .unwrap_or_default()
513    {
514        info!(
515            ctx,
516            "Transport {transport_id}: IMAP IDLE is disabled, going to fake idle."
517        );
518        connection.fake_idle(ctx, &watch_folder).await?;
519        return Ok(session);
520    }
521
522    let session = session
523        .idle(
524            ctx,
525            connection.idle_interrupt_receiver.clone(),
526            &watch_folder,
527        )
528        .await
529        .context("idle")?;
530
531    Ok(session)
532}
533
534async fn smtp_loop(
535    ctx: Context,
536    started: oneshot::Sender<()>,
537    smtp_handlers: SmtpConnectionHandlers,
538) {
539    use futures::future::FutureExt;
540
541    info!(ctx, "Starting SMTP loop.");
542    let SmtpConnectionHandlers {
543        mut connection,
544        stop_token,
545        idle_interrupt_receiver,
546    } = smtp_handlers;
547
548    let ctx1 = ctx.clone();
549    let fut = async move {
550        let ctx = ctx1;
551        if let Err(()) = started.send(()) {
552            warn!(&ctx, "SMTP loop, missing started receiver.");
553            return;
554        }
555
556        let mut timeout = None;
557        loop {
558            if let Err(err) = send_smtp_messages(&ctx, &mut connection).await {
559                warn!(ctx, "send_smtp_messages failed: {:#}.", err);
560                timeout = Some(timeout.unwrap_or(30));
561            } else {
562                timeout = None;
563                let duration_until_can_send = ctx.ratelimit.read().await.until_can_send();
564                if !duration_until_can_send.is_zero() {
565                    info!(
566                        ctx,
567                        "smtp got rate limited, waiting for {} until can send again",
568                        duration_to_str(duration_until_can_send)
569                    );
570                    tokio::time::sleep(duration_until_can_send).await;
571                    continue;
572                }
573            }
574
575            stats::maybe_update_message_stats(&ctx)
576                .await
577                .log_err(&ctx)
578                .ok();
579
580            // Fake Idle
581            info!(ctx, "SMTP fake idle started.");
582            match &connection.last_send_error {
583                None => connection.connectivity.set_idle(&ctx),
584                Some(err) => connection.connectivity.set_err(&ctx, err.clone()),
585            }
586
587            // If send_smtp_messages() failed, we set a timeout for the fake-idle so that
588            // sending is retried (at the latest) after the timeout. If sending fails
589            // again, we increase the timeout exponentially, in order not to do lots of
590            // unnecessary retries.
591            if let Some(t) = timeout {
592                let now = tools::Time::now();
593                info!(
594                    ctx,
595                    "SMTP has messages to retry, planning to retry {t} seconds later."
596                );
597                let duration = std::time::Duration::from_secs(t);
598                tokio::time::timeout(duration, async {
599                    idle_interrupt_receiver.recv().await.unwrap_or_default()
600                })
601                .await
602                .unwrap_or_default();
603                let slept = time_elapsed(&now).as_secs();
604                timeout = Some(cmp::max(
605                    t,
606                    slept.saturating_add(rand::random_range((slept / 2)..=slept)),
607                ));
608            } else {
609                info!(ctx, "SMTP has no messages to retry, waiting for interrupt.");
610                idle_interrupt_receiver.recv().await.unwrap_or_default();
611            };
612
613            info!(ctx, "SMTP fake idle interrupted.")
614        }
615    };
616
617    stop_token
618        .cancelled()
619        .map(|_| {
620            info!(ctx, "Shutting down SMTP loop.");
621        })
622        .race(fut)
623        .await;
624}
625
626impl Scheduler {
627    /// Start the scheduler.
628    pub async fn start(ctx: &Context) -> Result<Self> {
629        let (smtp, smtp_handlers) = SmtpConnectionState::new();
630
631        let (smtp_start_send, smtp_start_recv) = oneshot::channel();
632        let (ephemeral_interrupt_send, ephemeral_interrupt_recv) = channel::bounded(1);
633        let (location_interrupt_send, location_interrupt_recv) = channel::bounded(1);
634
635        let mut inboxes = Vec::new();
636        let mut start_recvs = Vec::new();
637
638        for (transport_id, configured_login_param) in ConfiguredLoginParam::load_all(ctx).await? {
639            let (conn_state, inbox_handlers) =
640                ImapConnectionState::new(ctx, transport_id, configured_login_param.clone()).await?;
641            let (inbox_start_send, inbox_start_recv) = oneshot::channel();
642            let handle = {
643                let ctx = ctx.clone();
644                task::spawn(inbox_loop(ctx, inbox_start_send, inbox_handlers))
645            };
646            let addr = configured_login_param.addr.clone();
647            let folder = configured_login_param
648                .imap_folder
649                .unwrap_or_else(|| "INBOX".to_string());
650            let inbox = SchedBox {
651                addr: addr.clone(),
652                folder,
653                conn_state,
654                handle,
655            };
656            inboxes.push(inbox);
657            start_recvs.push(inbox_start_recv);
658        }
659
660        let smtp_handle = {
661            let ctx = ctx.clone();
662            task::spawn(smtp_loop(ctx, smtp_start_send, smtp_handlers))
663        };
664        start_recvs.push(smtp_start_recv);
665
666        let ephemeral_handle = {
667            let ctx = ctx.clone();
668            task::spawn(async move {
669                ephemeral::ephemeral_loop(&ctx, ephemeral_interrupt_recv).await;
670            })
671        };
672
673        let location_handle = {
674            let ctx = ctx.clone();
675            task::spawn(async move {
676                location::location_loop(&ctx, location_interrupt_recv).await;
677            })
678        };
679
680        let recently_seen_loop = RecentlySeenLoop::new(ctx.clone());
681
682        let res = Self {
683            inboxes,
684            smtp,
685            smtp_handle,
686            ephemeral_handle,
687            ephemeral_interrupt_send,
688            location_handle,
689            location_interrupt_send,
690            recently_seen_loop,
691        };
692
693        // wait for all loops to be started
694        if let Err(err) = try_join_all(start_recvs).await {
695            bail!("failed to start scheduler: {err}");
696        }
697
698        info!(ctx, "scheduler is running");
699        Ok(res)
700    }
701
702    fn boxes(&self) -> impl Iterator<Item = &SchedBox> {
703        self.inboxes.iter()
704    }
705
706    fn maybe_network(&self) {
707        for b in self.boxes() {
708            b.conn_state.interrupt();
709        }
710        self.interrupt_smtp();
711    }
712
713    fn maybe_network_lost(&self) {
714        for b in self.boxes() {
715            b.conn_state.interrupt();
716        }
717        self.interrupt_smtp();
718    }
719
720    fn interrupt_inbox(&self) {
721        for b in &self.inboxes {
722            b.conn_state.interrupt();
723        }
724    }
725
726    fn interrupt_smtp(&self) {
727        self.smtp.interrupt();
728    }
729
730    fn interrupt_ephemeral_task(&self) {
731        self.ephemeral_interrupt_send.try_send(()).ok();
732    }
733
734    fn interrupt_location(&self) {
735        self.location_interrupt_send.try_send(()).ok();
736    }
737
738    fn interrupt_recently_seen(&self, contact_id: ContactId, timestamp: i64) {
739        self.recently_seen_loop.try_interrupt(contact_id, timestamp);
740    }
741
742    /// Halt the scheduler.
743    ///
744    /// It consumes the scheduler and never fails to stop it. In the worst case, long-running tasks
745    /// are forcefully terminated if they cannot shutdown within the timeout.
746    pub(crate) async fn stop(self, context: &Context) {
747        // Send stop signals to tasks so they can shutdown cleanly.
748        for b in self.boxes() {
749            b.conn_state.stop();
750        }
751        self.smtp.stop();
752
753        // Actually shutdown tasks.
754        let timeout_duration = std::time::Duration::from_secs(30);
755
756        let tracker = TaskTracker::new();
757        for b in self.inboxes {
758            let context = context.clone();
759            tracker.spawn(async move {
760                tokio::time::timeout(timeout_duration, b.handle)
761                    .await
762                    .log_err(&context)
763            });
764        }
765        {
766            let context = context.clone();
767            tracker.spawn(async move {
768                tokio::time::timeout(timeout_duration, self.smtp_handle)
769                    .await
770                    .log_err(&context)
771            });
772        }
773        tracker.close();
774        tracker.wait().await;
775
776        // Abort tasks, then await them to ensure the `Future` is dropped.
777        // Just aborting the task may keep resources such as `Context` clone
778        // moved into it indefinitely, resulting in database not being
779        // closed etc.
780        self.ephemeral_handle.abort();
781        self.ephemeral_handle.await.ok();
782        self.location_handle.abort();
783        self.location_handle.await.ok();
784        self.recently_seen_loop.abort().await;
785    }
786}
787
788/// Connection state logic shared between imap and smtp connections.
789#[derive(Debug)]
790struct ConnectionState {
791    /// Cancellation token to interrupt the whole connection.
792    stop_token: CancellationToken,
793    /// Channel to interrupt idle.
794    idle_interrupt_sender: Sender<()>,
795    /// Mutex to pass connectivity info between IMAP/SMTP threads and the API
796    connectivity: ConnectivityStore,
797}
798
799impl ConnectionState {
800    /// Shutdown this connection completely.
801    fn stop(&self) {
802        // Trigger shutdown of the run loop.
803        self.stop_token.cancel();
804    }
805
806    fn interrupt(&self) {
807        // Use try_send to avoid blocking on interrupts.
808        self.idle_interrupt_sender.try_send(()).ok();
809    }
810}
811
812#[derive(Debug)]
813pub(crate) struct SmtpConnectionState {
814    state: ConnectionState,
815}
816
817impl SmtpConnectionState {
818    fn new() -> (Self, SmtpConnectionHandlers) {
819        let stop_token = CancellationToken::new();
820        let (idle_interrupt_sender, idle_interrupt_receiver) = channel::bounded(1);
821
822        let handlers = SmtpConnectionHandlers {
823            connection: Smtp::new(),
824            stop_token: stop_token.clone(),
825            idle_interrupt_receiver,
826        };
827
828        let state = ConnectionState {
829            stop_token,
830            idle_interrupt_sender,
831            connectivity: handlers.connection.connectivity.clone(),
832        };
833
834        let conn = SmtpConnectionState { state };
835
836        (conn, handlers)
837    }
838
839    /// Interrupt any form of idle.
840    fn interrupt(&self) {
841        self.state.interrupt();
842    }
843
844    /// Shutdown this connection completely.
845    fn stop(&self) {
846        self.state.stop();
847    }
848}
849
850struct SmtpConnectionHandlers {
851    connection: Smtp,
852    stop_token: CancellationToken,
853    idle_interrupt_receiver: Receiver<()>,
854}
855
856#[derive(Debug)]
857pub(crate) struct ImapConnectionState {
858    state: ConnectionState,
859}
860
861impl ImapConnectionState {
862    /// Construct a new connection.
863    async fn new(
864        context: &Context,
865        transport_id: u32,
866        login_param: ConfiguredLoginParam,
867    ) -> Result<(Self, ImapConnectionHandlers)> {
868        let stop_token = CancellationToken::new();
869        let (idle_interrupt_sender, idle_interrupt_receiver) = channel::bounded(1);
870
871        let handlers = ImapConnectionHandlers {
872            connection: Imap::new(context, transport_id, login_param, idle_interrupt_receiver)
873                .await?,
874            stop_token: stop_token.clone(),
875        };
876
877        let state = ConnectionState {
878            stop_token,
879            idle_interrupt_sender,
880            connectivity: handlers.connection.connectivity.clone(),
881        };
882
883        let conn = ImapConnectionState { state };
884
885        Ok((conn, handlers))
886    }
887
888    /// Interrupt any form of idle.
889    fn interrupt(&self) {
890        self.state.interrupt();
891    }
892
893    /// Shutdown this connection completely.
894    fn stop(&self) {
895        self.state.stop();
896    }
897}
898
899#[derive(Debug)]
900struct ImapConnectionHandlers {
901    connection: Imap,
902    stop_token: CancellationToken,
903}