deltachat/
scheduler.rs

1use std::cmp;
2use std::num::NonZeroUsize;
3
4use anyhow::{Context as _, Error, Result, bail};
5use async_channel::{self as channel, Receiver, Sender};
6use futures::future::try_join_all;
7use futures_lite::FutureExt;
8use tokio::sync::{RwLock, oneshot};
9use tokio::task;
10use tokio_util::sync::CancellationToken;
11use tokio_util::task::TaskTracker;
12
13pub(crate) use self::connectivity::ConnectivityStore;
14use crate::config::Config;
15use crate::contact::{ContactId, RecentlySeenLoop};
16use crate::context::Context;
17use crate::download::{download_known_post_messages_without_pre_message, download_msgs};
18use crate::ephemeral::{self, delete_expired_imap_messages};
19use crate::events::EventType;
20use crate::imap::{FolderMeaning, Imap, session::Session};
21use crate::location;
22use crate::log::{LogExt, warn};
23use crate::smtp::{Smtp, send_smtp_messages};
24use crate::sql;
25use crate::stats::maybe_send_stats;
26use crate::tools::{self, duration_to_str, maybe_add_time_based_warnings, time, time_elapsed};
27use crate::transport::ConfiguredLoginParam;
28use crate::{constants, stats};
29
30pub(crate) mod connectivity;
31
/// State of the IO scheduler, as stored on the [`Context`].
///
/// The IO scheduler can be stopped or started, but core can also pause it.  After pausing
/// the IO scheduler will be restarted only if it was running before paused or
/// [`Context::start_io`] was called in the meantime while it was paused.
#[derive(Debug, Default)]
pub(crate) struct SchedulerState {
    /// Guarded state machine; all state transitions happen under this lock.
    inner: RwLock<InnerSchedulerState>,
}
41
impl SchedulerState {
    /// Creates a new scheduler state, initially stopped.
    pub(crate) fn new() -> Self {
        Default::default()
    }

    /// Whether the scheduler is currently running.
    pub(crate) async fn is_running(&self) -> bool {
        let inner = self.inner.read().await;
        matches!(*inner, InnerSchedulerState::Started(_))
    }

    /// Starts the scheduler if it is not yet started.
    pub(crate) async fn start(&self, context: &Context) {
        let mut inner = self.inner.write().await;
        match *inner {
            InnerSchedulerState::Started(_) => (),
            InnerSchedulerState::Stopped => Self::do_start(&mut inner, context).await,
            // While paused only record the wish to start;
            // the resume task spawned in `pause()` acts on it later.
            InnerSchedulerState::Paused {
                ref mut started, ..
            } => *started = true,
        }
        context.update_connectivities(&inner);
    }

    /// Starts the scheduler if it is not yet started.
    async fn do_start(inner: &mut InnerSchedulerState, context: &Context) {
        info!(context, "starting IO");

        // Notify message processing loop
        // to allow processing old messages after restart.
        context.new_msgs_notify.notify_one();

        match Scheduler::start(context).await {
            Ok(scheduler) => {
                *inner = InnerSchedulerState::Started(scheduler);
                context.emit_event(EventType::ConnectivityChanged);
            }
            // On failure the state is left unchanged so a later start may retry.
            Err(err) => error!(context, "Failed to start IO: {:#}", err),
        }
    }

    /// Stops the scheduler if it is currently running.
    pub(crate) async fn stop(&self, context: &Context) {
        let mut inner = self.inner.write().await;
        match *inner {
            InnerSchedulerState::Started(_) => {
                Self::do_stop(&mut inner, context, InnerSchedulerState::Stopped).await
            }
            InnerSchedulerState::Stopped => (),
            // While paused only record the wish to stop;
            // the resume task spawned in `pause()` acts on it later.
            InnerSchedulerState::Paused {
                ref mut started, ..
            } => *started = false,
        }
        context.update_connectivities(&inner);
    }

    /// Stops the scheduler if it is currently running.
    async fn do_stop(
        inner: &mut InnerSchedulerState,
        context: &Context,
        new_state: InnerSchedulerState,
    ) {
        // Sending an event wakes up event pollers (get_next_event)
        // so the caller of stop_io() can arrange for proper termination.
        // For this, the caller needs to instruct the event poller
        // to terminate on receiving the next event and then call stop_io()
        // which will emit the below event(s)
        info!(context, "stopping IO");

        // Wake up message processing loop even if there are no messages
        // to allow for clean shutdown.
        context.new_msgs_notify.notify_one();

        // Shut down the debug-logging task first so it does not outlive IO.
        let debug_logging = context
            .debug_logging
            .write()
            .expect("RwLock is poisoned")
            .take();
        if let Some(debug_logging) = debug_logging {
            debug_logging.loop_handle.abort();
            debug_logging.loop_handle.await.ok();
        }
        // Swap in the new state before actually stopping the old scheduler.
        let prev_state = std::mem::replace(inner, new_state);
        context.emit_event(EventType::ConnectivityChanged);
        match prev_state {
            InnerSchedulerState::Started(scheduler) => scheduler.stop(context).await,
            InnerSchedulerState::Stopped | InnerSchedulerState::Paused { .. } => (),
        }
    }

    /// Pauses the IO scheduler.
    ///
    /// If it is currently running the scheduler will be stopped.  When the
    /// [`IoPausedGuard`] is dropped the scheduler is started again.
    ///
    /// If in the meantime [`SchedulerState::start`] or [`SchedulerState::stop`] is called
    /// resume will do the right thing and restore the scheduler to the state requested by
    /// the last call.
    pub(crate) async fn pause(&'_ self, context: &Context) -> Result<IoPausedGuard> {
        {
            let mut inner = self.inner.write().await;
            match *inner {
                InnerSchedulerState::Started(_) => {
                    // Remember that IO was running so resume restarts it.
                    let new_state = InnerSchedulerState::Paused {
                        started: true,
                        pause_guards_count: NonZeroUsize::MIN,
                    };
                    Self::do_stop(&mut inner, context, new_state).await;
                }
                InnerSchedulerState::Stopped => {
                    *inner = InnerSchedulerState::Paused {
                        started: false,
                        pause_guards_count: NonZeroUsize::MIN,
                    };
                }
                // Already paused: just count one more active guard.
                InnerSchedulerState::Paused {
                    ref mut pause_guards_count,
                    ..
                } => {
                    *pause_guards_count = pause_guards_count
                        .checked_add(1)
                        .ok_or_else(|| Error::msg("Too many pause guards active"))?
                }
            }
            context.update_connectivities(&inner);
        }

        // The guard signals this task on drop; the task performs the resume.
        let (tx, rx) = oneshot::channel();
        let context = context.clone();
        tokio::spawn(async move {
            rx.await.ok();
            let mut inner = context.scheduler.inner.write().await;
            match *inner {
                InnerSchedulerState::Started(_) => {
                    warn!(&context, "IoPausedGuard resume: started instead of paused");
                }
                InnerSchedulerState::Stopped => {
                    warn!(&context, "IoPausedGuard resume: stopped instead of paused");
                }
                InnerSchedulerState::Paused {
                    ref started,
                    ref mut pause_guards_count,
                } => {
                    if *pause_guards_count == NonZeroUsize::MIN {
                        // Last guard gone: restore the state requested last.
                        match *started {
                            true => SchedulerState::do_start(&mut inner, &context).await,
                            false => *inner = InnerSchedulerState::Stopped,
                        }
                    } else {
                        let new_count = pause_guards_count.get() - 1;
                        // SAFETY: Value was >=2 before due to if condition
                        *pause_guards_count = NonZeroUsize::new(new_count).unwrap();
                    }
                }
            }
            context.update_connectivities(&inner);
        });
        Ok(IoPausedGuard { sender: Some(tx) })
    }

    /// Restarts the scheduler, only if it is running.
    pub(crate) async fn restart(&self, context: &Context) {
        info!(context, "restarting IO");
        if self.is_running().await {
            self.stop(context).await;
            self.start(context).await;
        }
    }

    /// Indicate that the network likely has come back.
    pub(crate) async fn maybe_network(&self) {
        let inner = self.inner.read().await;
        let (inboxes, oboxes) = match *inner {
            InnerSchedulerState::Started(ref scheduler) => {
                scheduler.maybe_network();
                let inboxes = scheduler
                    .inboxes
                    .iter()
                    .map(|b| b.conn_state.state.connectivity.clone())
                    .collect::<Vec<_>>();
                let oboxes = scheduler
                    .oboxes
                    .iter()
                    .map(|b| b.conn_state.state.connectivity.clone())
                    .collect::<Vec<_>>();
                (inboxes, oboxes)
            }
            _ => return,
        };
        // Release the scheduler lock before notifying connectivity stores.
        drop(inner);
        connectivity::idle_interrupted(inboxes, oboxes);
    }

    /// Indicate that the network likely is lost.
    pub(crate) async fn maybe_network_lost(&self, context: &Context) {
        let inner = self.inner.read().await;
        let stores = match *inner {
            InnerSchedulerState::Started(ref scheduler) => {
                scheduler.maybe_network_lost();
                scheduler
                    .boxes()
                    .map(|b| b.conn_state.state.connectivity.clone())
                    .collect()
            }
            _ => return,
        };
        // Release the scheduler lock before notifying connectivity stores.
        drop(inner);
        connectivity::maybe_network_lost(context, stores);
    }

    /// Interrupts the inbox connections, if the scheduler is running.
    pub(crate) async fn interrupt_inbox(&self) {
        let inner = self.inner.read().await;
        if let InnerSchedulerState::Started(ref scheduler) = *inner {
            scheduler.interrupt_inbox();
        }
    }

    /// Interrupts the SMTP loop, if the scheduler is running.
    pub(crate) async fn interrupt_smtp(&self) {
        let inner = self.inner.read().await;
        if let InnerSchedulerState::Started(ref scheduler) = *inner {
            scheduler.interrupt_smtp();
        }
    }

    /// Interrupts the ephemeral-messages task, if the scheduler is running.
    pub(crate) async fn interrupt_ephemeral_task(&self) {
        let inner = self.inner.read().await;
        if let InnerSchedulerState::Started(ref scheduler) = *inner {
            scheduler.interrupt_ephemeral_task();
        }
    }

    /// Interrupts the location-streaming task, if the scheduler is running.
    pub(crate) async fn interrupt_location(&self) {
        let inner = self.inner.read().await;
        if let InnerSchedulerState::Started(ref scheduler) = *inner {
            scheduler.interrupt_location();
        }
    }

    /// Forwards a recently-seen update to the corresponding loop,
    /// if the scheduler is running.
    pub(crate) async fn interrupt_recently_seen(&self, contact_id: ContactId, timestamp: i64) {
        let inner = self.inner.read().await;
        if let InnerSchedulerState::Started(ref scheduler) = *inner {
            scheduler.interrupt_recently_seen(contact_id, timestamp);
        }
    }
}
287
/// Internal state machine of [`SchedulerState`].
#[derive(Debug, Default)]
pub(crate) enum InnerSchedulerState {
    /// IO is running; holds the live scheduler with all its tasks.
    Started(Scheduler),
    /// IO is fully stopped (the initial state).
    #[default]
    Stopped,
    /// IO is temporarily paused by one or more [`IoPausedGuard`]s.
    Paused {
        /// Whether IO should be started again once the last guard is dropped.
        started: bool,
        /// Number of currently active pause guards.
        pause_guards_count: NonZeroUsize,
    },
}
298
/// Guard to make sure the IO Scheduler is resumed.
///
/// Returned by [`SchedulerState::pause`].  To resume the IO scheduler simply drop this
/// guard.
#[derive(Default, Debug)]
pub(crate) struct IoPausedGuard {
    // Signals the resume task spawned in `SchedulerState::pause` on drop.
    sender: Option<oneshot::Sender<()>>,
}
307
308impl Drop for IoPausedGuard {
309    fn drop(&mut self) {
310        if let Some(sender) = self.sender.take() {
311            // Can only fail if receiver is dropped, but then we're already resumed.
312            sender.send(()).ok();
313        }
314    }
315}
316
/// One watched IMAP folder: its connection state plus the task driving it.
#[derive(Debug)]
struct SchedBox {
    /// Address at the used chatmail/email relay
    addr: String,
    /// Which folder this connection watches (e.g. inbox or mvbox).
    meaning: FolderMeaning,
    /// Shared connection state used for interrupts and connectivity reporting.
    conn_state: ImapConnectionState,

    /// IMAP loop task handle.
    handle: task::JoinHandle<()>,
}
327
/// Job and connection scheduler.
#[derive(Debug)]
pub(crate) struct Scheduler {
    /// Inboxes, one per transport.
    inboxes: Vec<SchedBox>,
    /// Optional boxes -- mvbox.
    oboxes: Vec<SchedBox>,
    /// Shared state of the SMTP connection.
    smtp: SmtpConnectionState,
    /// Handle of the SMTP loop task.
    smtp_handle: task::JoinHandle<()>,
    /// Handle of the ephemeral-messages loop task.
    ephemeral_handle: task::JoinHandle<()>,
    /// Channel used to interrupt the ephemeral loop.
    ephemeral_interrupt_send: Sender<()>,
    /// Handle of the location-streaming loop task.
    location_handle: task::JoinHandle<()>,
    /// Channel used to interrupt the location loop.
    location_interrupt_send: Sender<()>,

    /// Loop receiving recently-seen updates for contacts.
    recently_seen_loop: RecentlySeenLoop,
}
344
/// Runs the IMAP loop for the inbox folder until `stop_token` is cancelled.
///
/// Signals readiness through `started`, then keeps a single IMAP session
/// alive across iterations, re-preparing the connection after failures.
async fn inbox_loop(
    ctx: Context,
    started: oneshot::Sender<()>,
    inbox_handlers: ImapConnectionHandlers,
) {
    use futures::future::FutureExt;

    info!(ctx, "Starting inbox loop.");
    let ImapConnectionHandlers {
        mut connection,
        stop_token,
    } = inbox_handlers;

    let ctx1 = ctx.clone();
    let fut = async move {
        let ctx = ctx1;
        if let Err(()) = started.send(()) {
            // Receiver was dropped: the scheduler gave up on starting.
            warn!(ctx, "Inbox loop, missing started receiver.");
            return;
        };

        // Session kept from the previous iteration, if it ended cleanly.
        let mut old_session: Option<Session> = None;
        loop {
            let session = if let Some(session) = old_session.take() {
                session
            } else {
                info!(ctx, "Preparing new IMAP session for inbox.");
                match connection.prepare(&ctx).await {
                    Err(err) => {
                        warn!(ctx, "Failed to prepare inbox connection: {err:#}.");
                        continue;
                    }
                    Ok(session) => session,
                }
            };

            match inbox_fetch_idle(&ctx, &mut connection, session).await {
                // On error the session is dropped and re-prepared next iteration.
                Err(err) => warn!(ctx, "Failed inbox fetch_idle: {err:#}."),
                Ok(session) => {
                    info!(
                        ctx,
                        "IMAP loop iteration for inbox finished, keeping the session."
                    );
                    old_session = Some(session);
                }
            }
        }
    };

    // Run the loop until cancellation wins the race.
    stop_token
        .cancelled()
        .map(|_| {
            info!(ctx, "Shutting down inbox loop.");
        })
        .race(fut)
        .await;
}
402
403/// Convert folder meaning
404/// used internally by [fetch_idle] and [Context::background_fetch].
405///
406/// Returns folder configuration key and folder name
407/// if such folder is configured, `Ok(None)` otherwise.
408pub async fn convert_folder_meaning(
409    ctx: &Context,
410    folder_meaning: FolderMeaning,
411) -> Result<Option<(Config, String)>> {
412    let folder_config = match folder_meaning.to_config() {
413        Some(c) => c,
414        None => {
415            // Such folder cannot be configured,
416            // e.g. a `FolderMeaning::Spam` folder.
417            return Ok(None);
418        }
419    };
420
421    let folder = ctx
422        .get_config(folder_config)
423        .await
424        .with_context(|| format!("Failed to retrieve {folder_config} folder"))?;
425
426    if let Some(watch_folder) = folder {
427        Ok(Some((folder_config, watch_folder)))
428    } else {
429        Ok(None)
430    }
431}
432
/// Performs one iteration of the inbox IMAP loop before going idle.
///
/// Runs inbox-specific maintenance (chatmail flag sync, quota update, folder
/// resync, time-based warnings, housekeeping, stats, metadata and push-token
/// registration) and then delegates to [`fetch_idle`] for the actual
/// fetch + IDLE on the inbox folder.
///
/// Returns the session so it can be reused on the next iteration.
async fn inbox_fetch_idle(ctx: &Context, imap: &mut Imap, mut session: Session) -> Result<Session> {
    if !ctx.get_config_bool(Config::FixIsChatmail).await? {
        ctx.set_config_internal(
            Config::IsChatmail,
            crate::config::from_bool(session.is_chatmail()),
        )
        .await?;
    }

    // Update quota no more than once a minute.
    if ctx.quota_needs_update(session.transport_id(), 60).await
        && let Err(err) = ctx.update_recent_quota(&mut session).await
    {
        warn!(ctx, "Failed to update quota: {:#}.", err);
    }

    if let Ok(()) = imap.resync_request_receiver.try_recv()
        && let Err(err) = session.resync_folders(ctx).await
    {
        warn!(ctx, "Failed to resync folders: {:#}.", err);
        // Re-queue the resync request so it is retried on a later iteration.
        imap.resync_request_sender.try_send(()).ok();
    }

    maybe_add_time_based_warnings(ctx).await;

    // Run housekeeping if the configured period has elapsed; failures are logged only.
    match ctx.get_config_i64(Config::LastHousekeeping).await {
        Ok(last_housekeeping_time) => {
            let next_housekeeping_time =
                last_housekeeping_time.saturating_add(constants::HOUSEKEEPING_PERIOD);
            if next_housekeeping_time <= time() {
                sql::housekeeping(ctx).await.log_err(ctx).ok();
            }
        }
        Err(err) => {
            warn!(ctx, "Failed to get last housekeeping time: {}", err);
        }
    };

    maybe_send_stats(ctx).await.log_err(ctx).ok();

    session
        .update_metadata(ctx)
        .await
        .context("update_metadata")?;
    session
        .register_token(ctx)
        .await
        .context("Failed to register push token")?;

    let session = fetch_idle(ctx, imap, session, FolderMeaning::Inbox).await?;
    Ok(session)
}
485
/// Implement a single iteration of IMAP loop.
///
/// This function performs all IMAP operations on a single folder, selecting it if necessary and
/// handling all the errors. In case of an error, an error is returned and connection is dropped,
/// otherwise connection is returned.
async fn fetch_idle(
    ctx: &Context,
    connection: &mut Imap,
    mut session: Session,
    folder_meaning: FolderMeaning,
) -> Result<Session> {
    let Some((folder_config, watch_folder)) = convert_folder_meaning(ctx, folder_meaning).await?
    else {
        // The folder is not configured.
        // For example, this happens if the server does not have Sent folder
        // but watching Sent folder is enabled.
        connection.connectivity.set_not_configured(ctx);
        // Wait for an interrupt so we do not busy-loop on a missing folder.
        connection.idle_interrupt_receiver.recv().await.ok();
        bail!("Cannot fetch folder {folder_meaning} because it is not configured");
    };

    if folder_config == Config::ConfiguredInboxFolder {
        session
            .store_seen_flags_on_imap(ctx)
            .await
            .context("store_seen_flags_on_imap")?;
    }

    // Fetch the watched folder.
    connection
        .fetch_move_delete(ctx, &mut session, &watch_folder, folder_meaning)
        .await
        .context("fetch_move_delete")?;

    // Mark expired messages for deletion. Marked messages will be deleted from the server
    // on the next iteration of `fetch_move_delete`. `delete_expired_imap_messages` is not
    // called right before `fetch_move_delete` because it is not well optimized and would
    // otherwise slow down message fetching.
    delete_expired_imap_messages(ctx)
        .await
        .context("delete_expired_imap_messages")?;

    download_known_post_messages_without_pre_message(ctx, &mut session).await?;
    download_msgs(ctx, &mut session)
        .await
        .context("download_msgs")?;

    // Scan additional folders only after finishing fetching the watched folder.
    //
    // On iOS the application has strictly limited time to work in background, so we may not
    // be able to scan all folders before time is up if there are many of them.
    if folder_config == Config::ConfiguredInboxFolder {
        // Only scan on the Inbox thread in order to prevent parallel scans, which might lead to duplicate messages
        match connection
            .scan_folders(ctx, &mut session)
            .await
            .context("scan_folders")
        {
            Err(err) => {
                // Don't reconnect, if there is a problem with the connection we will realize this when IDLEing
                // but maybe just one folder can't be selected or something
                warn!(ctx, "{:#}", err);
            }
            Ok(true) => {
                // Fetch the watched folder again in case scanning other folder moved messages
                // there.
                //
                // In most cases this will select the watched folder and return because there are
                // no new messages. We want to select the watched folder anyway before going IDLE
                // there, so this does not take additional protocol round-trip.
                connection
                    .fetch_move_delete(ctx, &mut session, &watch_folder, folder_meaning)
                    .await
                    .context("fetch_move_delete after scan_folders")?;
            }
            Ok(false) => {}
        }
    }

    // Synchronize Seen flags.
    session
        .sync_seen_flags(ctx, &watch_folder)
        .await
        .context("sync_seen_flags")
        .log_err(ctx)
        .ok();

    connection.connectivity.set_idle(ctx);

    ctx.emit_event(EventType::ImapInboxIdle);

    // Fall back to fake idle when real IDLE is unsupported or disabled.
    if !session.can_idle() {
        info!(
            ctx,
            "IMAP session does not support IDLE, going to fake idle."
        );
        connection.fake_idle(ctx, watch_folder).await?;
        return Ok(session);
    }

    if ctx
        .get_config_bool(Config::DisableIdle)
        .await
        .context("Failed to get disable_idle config")
        .log_err(ctx)
        .unwrap_or_default()
    {
        info!(ctx, "IMAP IDLE is disabled, going to fake idle.");
        connection.fake_idle(ctx, watch_folder).await?;
        return Ok(session);
    }

    info!(
        ctx,
        "IMAP session in folder {watch_folder:?} supports IDLE, using it."
    );
    let session = session
        .idle(
            ctx,
            connection.idle_interrupt_receiver.clone(),
            &watch_folder,
        )
        .await
        .context("idle")?;

    Ok(session)
}
613
/// Simplified IMAP loop to watch non-inbox folders.
///
/// Like [`inbox_loop`], but runs [`fetch_idle`] directly without
/// the inbox-specific maintenance done in [`inbox_fetch_idle`].
async fn simple_imap_loop(
    ctx: Context,
    started: oneshot::Sender<()>,
    inbox_handlers: ImapConnectionHandlers,
    folder_meaning: FolderMeaning,
) {
    use futures::future::FutureExt;

    info!(ctx, "Starting simple loop for {folder_meaning}.");
    let ImapConnectionHandlers {
        mut connection,
        stop_token,
    } = inbox_handlers;

    let ctx1 = ctx.clone();

    let fut = async move {
        let ctx = ctx1;
        if let Err(()) = started.send(()) {
            // Receiver was dropped: the scheduler gave up on starting.
            warn!(
                ctx,
                "Simple imap loop for {folder_meaning}, missing started receiver."
            );
            return;
        }

        // Session kept from the previous iteration, if it ended cleanly.
        let mut old_session: Option<Session> = None;
        loop {
            let session = if let Some(session) = old_session.take() {
                session
            } else {
                info!(ctx, "Preparing new IMAP session for {folder_meaning}.");
                match connection.prepare(&ctx).await {
                    Err(err) => {
                        warn!(
                            ctx,
                            "Failed to prepare {folder_meaning} connection: {err:#}."
                        );
                        continue;
                    }
                    Ok(session) => session,
                }
            };

            match fetch_idle(&ctx, &mut connection, session, folder_meaning).await {
                // On error the session is dropped and re-prepared next iteration.
                Err(err) => warn!(ctx, "Failed fetch_idle: {err:#}"),
                Ok(session) => {
                    info!(
                        ctx,
                        "IMAP loop iteration for {folder_meaning} finished, keeping the session"
                    );
                    old_session = Some(session);
                }
            }
        }
    };

    // Run the loop until cancellation wins the race.
    stop_token
        .cancelled()
        .map(|_| {
            info!(ctx, "Shutting down IMAP loop for {folder_meaning}.");
        })
        .race(fut)
        .await;
}
680
/// Runs the SMTP loop until `stop_token` is cancelled.
///
/// Sends queued messages, honors the rate limiter, retries failed sends
/// with a randomized growing timeout and otherwise fake-idles until
/// interrupted.
async fn smtp_loop(
    ctx: Context,
    started: oneshot::Sender<()>,
    smtp_handlers: SmtpConnectionHandlers,
) {
    use futures::future::FutureExt;

    info!(ctx, "Starting SMTP loop.");
    let SmtpConnectionHandlers {
        mut connection,
        stop_token,
        idle_interrupt_receiver,
    } = smtp_handlers;

    let ctx1 = ctx.clone();
    let fut = async move {
        let ctx = ctx1;
        if let Err(()) = started.send(()) {
            // Receiver was dropped: the scheduler gave up on starting.
            warn!(&ctx, "SMTP loop, missing started receiver.");
            return;
        }

        // Retry timeout in seconds; `None` when there is nothing to retry.
        let mut timeout = None;
        loop {
            if let Err(err) = send_smtp_messages(&ctx, &mut connection).await {
                warn!(ctx, "send_smtp_messages failed: {:#}.", err);
                timeout = Some(timeout.unwrap_or(30));
            } else {
                timeout = None;
                // Respect the rate limiter before attempting another send.
                let duration_until_can_send = ctx.ratelimit.read().await.until_can_send();
                if !duration_until_can_send.is_zero() {
                    info!(
                        ctx,
                        "smtp got rate limited, waiting for {} until can send again",
                        duration_to_str(duration_until_can_send)
                    );
                    tokio::time::sleep(duration_until_can_send).await;
                    continue;
                }
            }

            stats::maybe_update_message_stats(&ctx)
                .await
                .log_err(&ctx)
                .ok();

            // Fake Idle
            info!(ctx, "SMTP fake idle started.");
            match &connection.last_send_error {
                None => connection.connectivity.set_idle(&ctx),
                Some(err) => connection.connectivity.set_err(&ctx, err),
            }

            // If send_smtp_messages() failed, we set a timeout for the fake-idle so that
            // sending is retried (at the latest) after the timeout. If sending fails
            // again, we increase the timeout exponentially, in order not to do lots of
            // unnecessary retries.
            if let Some(t) = timeout {
                let now = tools::Time::now();
                info!(
                    ctx,
                    "SMTP has messages to retry, planning to retry {t} seconds later."
                );
                let duration = std::time::Duration::from_secs(t);
                // Wait for an interrupt, but at most `t` seconds.
                tokio::time::timeout(duration, async {
                    idle_interrupt_receiver.recv().await.unwrap_or_default()
                })
                .await
                .unwrap_or_default();
                // Grow the next timeout based on how long we actually slept,
                // randomized to spread retries out.
                let slept = time_elapsed(&now).as_secs();
                timeout = Some(cmp::max(
                    t,
                    slept.saturating_add(rand::random_range((slept / 2)..=slept)),
                ));
            } else {
                info!(ctx, "SMTP has no messages to retry, waiting for interrupt.");
                idle_interrupt_receiver.recv().await.unwrap_or_default();
            };

            info!(ctx, "SMTP fake idle interrupted.")
        }
    };

    // Run the loop until cancellation wins the race.
    stop_token
        .cancelled()
        .map(|_| {
            info!(ctx, "Shutting down SMTP loop.");
        })
        .race(fut)
        .await;
}
772
773impl Scheduler {
    /// Start the scheduler.
    ///
    /// Spawns one inbox loop (and, if configured, one mvbox loop) per
    /// transport, plus the SMTP, ephemeral, location and recently-seen
    /// loops, then waits until all spawned loops have signalled startup.
    ///
    /// # Errors
    ///
    /// Returns an error if loading login parameters or creating a
    /// connection state fails, or if any loop fails to start.
    pub async fn start(ctx: &Context) -> Result<Self> {
        let (smtp, smtp_handlers) = SmtpConnectionState::new();

        let (smtp_start_send, smtp_start_recv) = oneshot::channel();
        let (ephemeral_interrupt_send, ephemeral_interrupt_recv) = channel::bounded(1);
        let (location_interrupt_send, location_interrupt_recv) = channel::bounded(1);

        let mut inboxes = Vec::new();
        let mut oboxes = Vec::new();
        // Receivers used below to wait until every spawned loop has started.
        let mut start_recvs = Vec::new();

        for (transport_id, configured_login_param) in ConfiguredLoginParam::load_all(ctx).await? {
            let (conn_state, inbox_handlers) =
                ImapConnectionState::new(ctx, transport_id, configured_login_param.clone()).await?;
            let (inbox_start_send, inbox_start_recv) = oneshot::channel();
            let handle = {
                let ctx = ctx.clone();
                task::spawn(inbox_loop(ctx, inbox_start_send, inbox_handlers))
            };
            let addr = configured_login_param.addr.clone();
            let inbox = SchedBox {
                addr: addr.clone(),
                meaning: FolderMeaning::Inbox,
                conn_state,
                handle,
            };
            inboxes.push(inbox);
            start_recvs.push(inbox_start_recv);

            // Spawn an additional loop watching the mvbox when enabled.
            if ctx.should_watch_mvbox().await? {
                let (conn_state, handlers) =
                    ImapConnectionState::new(ctx, transport_id, configured_login_param).await?;
                let (start_send, start_recv) = oneshot::channel();
                let ctx = ctx.clone();
                let meaning = FolderMeaning::Mvbox;
                let handle = task::spawn(simple_imap_loop(ctx, start_send, handlers, meaning));
                oboxes.push(SchedBox {
                    addr,
                    meaning,
                    conn_state,
                    handle,
                });
                start_recvs.push(start_recv);
            }
        }

        let smtp_handle = {
            let ctx = ctx.clone();
            task::spawn(smtp_loop(ctx, smtp_start_send, smtp_handlers))
        };
        start_recvs.push(smtp_start_recv);

        let ephemeral_handle = {
            let ctx = ctx.clone();
            task::spawn(async move {
                ephemeral::ephemeral_loop(&ctx, ephemeral_interrupt_recv).await;
            })
        };

        let location_handle = {
            let ctx = ctx.clone();
            task::spawn(async move {
                location::location_loop(&ctx, location_interrupt_recv).await;
            })
        };

        let recently_seen_loop = RecentlySeenLoop::new(ctx.clone());

        let res = Self {
            inboxes,
            oboxes,
            smtp,
            smtp_handle,
            ephemeral_handle,
            ephemeral_interrupt_send,
            location_handle,
            location_interrupt_send,
            recently_seen_loop,
        };

        // wait for all loops to be started
        if let Err(err) = try_join_all(start_recvs).await {
            bail!("failed to start scheduler: {err}");
        }

        info!(ctx, "scheduler is running");
        Ok(res)
    }
863
864    fn boxes(&self) -> impl Iterator<Item = &SchedBox> {
865        self.inboxes.iter().chain(self.oboxes.iter())
866    }
867
868    fn maybe_network(&self) {
869        for b in self.boxes() {
870            b.conn_state.interrupt();
871        }
872        self.interrupt_smtp();
873    }
874
875    fn maybe_network_lost(&self) {
876        for b in self.boxes() {
877            b.conn_state.interrupt();
878        }
879        self.interrupt_smtp();
880    }
881
882    fn interrupt_inbox(&self) {
883        for b in &self.inboxes {
884            b.conn_state.interrupt();
885        }
886    }
887
    /// Interrupts idle of the SMTP loop so queued messages are sent out.
    fn interrupt_smtp(&self) {
        self.smtp.interrupt();
    }
891
892    fn interrupt_ephemeral_task(&self) {
893        self.ephemeral_interrupt_send.try_send(()).ok();
894    }
895
896    fn interrupt_location(&self) {
897        self.location_interrupt_send.try_send(()).ok();
898    }
899
    /// Notifies the "recently seen" loop that `contact_id` was seen at `timestamp`,
    /// without blocking.
    fn interrupt_recently_seen(&self, contact_id: ContactId, timestamp: i64) {
        self.recently_seen_loop.try_interrupt(contact_id, timestamp);
    }
903
    /// Halt the scheduler.
    ///
    /// It consumes the scheduler and never fails to stop it. In the worst case, long-running tasks
    /// are forcefully terminated if they cannot shutdown within the timeout.
    pub(crate) async fn stop(self, context: &Context) {
        // Send stop signals to tasks so they can shutdown cleanly.
        for b in self.boxes() {
            b.conn_state.stop();
        }
        self.smtp.stop();

        // Actually shutdown tasks.
        //
        // Each IMAP/SMTP task is given up to this long to finish on its own;
        // tasks exceeding the timeout are logged and abandoned.
        let timeout_duration = std::time::Duration::from_secs(30);

        // Join the IMAP and SMTP tasks concurrently via a tracker, each
        // wrapped in a timeout so one stuck loop cannot block shutdown forever.
        let tracker = TaskTracker::new();
        for b in self.inboxes.into_iter().chain(self.oboxes.into_iter()) {
            let context = context.clone();
            tracker.spawn(async move {
                tokio::time::timeout(timeout_duration, b.handle)
                    .await
                    .log_err(&context)
            });
        }
        {
            let context = context.clone();
            tracker.spawn(async move {
                tokio::time::timeout(timeout_duration, self.smtp_handle)
                    .await
                    .log_err(&context)
            });
        }
        tracker.close();
        tracker.wait().await;

        // Abort tasks, then await them to ensure the `Future` is dropped.
        // Just aborting the task may keep resources such as `Context` clone
        // moved into it indefinitely, resulting in database not being
        // closed etc.
        self.ephemeral_handle.abort();
        self.ephemeral_handle.await.ok();
        self.location_handle.abort();
        self.location_handle.await.ok();
        self.recently_seen_loop.abort().await;
    }
948}
949
/// Connection state logic shared between imap and smtp connections.
#[derive(Debug)]
struct ConnectionState {
    /// Cancellation token to interrupt the whole connection.
    stop_token: CancellationToken,
    /// Channel to interrupt idle; bounded, so interrupts are coalesced
    /// rather than queued.
    idle_interrupt_sender: Sender<()>,
    /// Mutex to pass connectivity info between IMAP/SMTP threads and the API.
    connectivity: ConnectivityStore,
}
960
961impl ConnectionState {
962    /// Shutdown this connection completely.
963    fn stop(&self) {
964        // Trigger shutdown of the run loop.
965        self.stop_token.cancel();
966    }
967
968    fn interrupt(&self) {
969        // Use try_send to avoid blocking on interrupts.
970        self.idle_interrupt_sender.try_send(()).ok();
971    }
972}
973
/// Handle to control the SMTP connection loop from outside,
/// wrapping the shared [`ConnectionState`].
#[derive(Debug)]
pub(crate) struct SmtpConnectionState {
    state: ConnectionState,
}
978
979impl SmtpConnectionState {
980    fn new() -> (Self, SmtpConnectionHandlers) {
981        let stop_token = CancellationToken::new();
982        let (idle_interrupt_sender, idle_interrupt_receiver) = channel::bounded(1);
983
984        let handlers = SmtpConnectionHandlers {
985            connection: Smtp::new(),
986            stop_token: stop_token.clone(),
987            idle_interrupt_receiver,
988        };
989
990        let state = ConnectionState {
991            stop_token,
992            idle_interrupt_sender,
993            connectivity: handlers.connection.connectivity.clone(),
994        };
995
996        let conn = SmtpConnectionState { state };
997
998        (conn, handlers)
999    }
1000
1001    /// Interrupt any form of idle.
1002    fn interrupt(&self) {
1003        self.state.interrupt();
1004    }
1005
1006    /// Shutdown this connection completely.
1007    fn stop(&self) {
1008        self.state.stop();
1009    }
1010}
1011
/// Loop-side counterpart of [`SmtpConnectionState`]: the connection and
/// signal receivers moved into the SMTP loop task.
struct SmtpConnectionHandlers {
    // The actual SMTP connection driven by the loop.
    connection: Smtp,
    // Cancelled when the connection should shut down completely.
    stop_token: CancellationToken,
    // Receives idle interrupts sent via `ConnectionState::interrupt`.
    idle_interrupt_receiver: Receiver<()>,
}
1017
/// Handle to control an IMAP connection loop from outside,
/// wrapping the shared [`ConnectionState`].
#[derive(Debug)]
pub(crate) struct ImapConnectionState {
    state: ConnectionState,
}
1022
1023impl ImapConnectionState {
1024    /// Construct a new connection.
1025    async fn new(
1026        context: &Context,
1027        transport_id: u32,
1028        login_param: ConfiguredLoginParam,
1029    ) -> Result<(Self, ImapConnectionHandlers)> {
1030        let stop_token = CancellationToken::new();
1031        let (idle_interrupt_sender, idle_interrupt_receiver) = channel::bounded(1);
1032
1033        let handlers = ImapConnectionHandlers {
1034            connection: Imap::new(context, transport_id, login_param, idle_interrupt_receiver)
1035                .await?,
1036            stop_token: stop_token.clone(),
1037        };
1038
1039        let state = ConnectionState {
1040            stop_token,
1041            idle_interrupt_sender,
1042            connectivity: handlers.connection.connectivity.clone(),
1043        };
1044
1045        let conn = ImapConnectionState { state };
1046
1047        Ok((conn, handlers))
1048    }
1049
1050    /// Interrupt any form of idle.
1051    fn interrupt(&self) {
1052        self.state.interrupt();
1053    }
1054
1055    /// Shutdown this connection completely.
1056    fn stop(&self) {
1057        self.state.stop();
1058    }
1059}
1060
/// Loop-side counterpart of [`ImapConnectionState`]: the connection and
/// stop signal moved into the IMAP loop task.
#[derive(Debug)]
struct ImapConnectionHandlers {
    // The actual IMAP connection driven by the loop; the idle-interrupt
    // receiver is held inside it (passed to `Imap::new`).
    connection: Imap,
    // Cancelled when the connection should shut down completely.
    stop_token: CancellationToken,
}