deltachat/
scheduler.rs

1use std::cmp;
2use std::num::NonZeroUsize;
3
4use anyhow::{Context as _, Error, Result, bail};
5use async_channel::{self as channel, Receiver, Sender};
6use futures::future::try_join_all;
7use futures_lite::FutureExt;
8use tokio::sync::{RwLock, oneshot};
9use tokio::task;
10use tokio_util::sync::CancellationToken;
11use tokio_util::task::TaskTracker;
12
13pub(crate) use self::connectivity::ConnectivityStore;
14use crate::config::{self, Config};
15use crate::contact::{ContactId, RecentlySeenLoop};
16use crate::context::Context;
17use crate::download::{DownloadState, download_msg};
18use crate::ephemeral::{self, delete_expired_imap_messages};
19use crate::events::EventType;
20use crate::imap::{FolderMeaning, Imap, session::Session};
21use crate::location;
22use crate::log::{LogExt, warn};
23use crate::message::MsgId;
24use crate::smtp::{Smtp, send_smtp_messages};
25use crate::sql;
26use crate::stats::maybe_send_stats;
27use crate::tools::{self, duration_to_str, maybe_add_time_based_warnings, time, time_elapsed};
28use crate::transport::ConfiguredLoginParam;
29use crate::{constants, stats};
30
31pub(crate) mod connectivity;
32
33/// State of the IO scheduler, as stored on the [`Context`].
34///
35/// The IO scheduler can be stopped or started, but core can also pause it.  After pausing
36/// the IO scheduler will be restarted only if it was running before paused or
37/// [`Context::start_io`] was called in the meantime while it was paused.
38#[derive(Debug, Default)]
39pub(crate) struct SchedulerState {
40    inner: RwLock<InnerSchedulerState>,
41}
42
43impl SchedulerState {
44    pub(crate) fn new() -> Self {
45        Default::default()
46    }
47
48    /// Whether the scheduler is currently running.
49    pub(crate) async fn is_running(&self) -> bool {
50        let inner = self.inner.read().await;
51        matches!(*inner, InnerSchedulerState::Started(_))
52    }
53
54    /// Starts the scheduler if it is not yet started.
55    pub(crate) async fn start(&self, context: &Context) {
56        let mut inner = self.inner.write().await;
57        match *inner {
58            InnerSchedulerState::Started(_) => (),
59            InnerSchedulerState::Stopped => Self::do_start(&mut inner, context).await,
60            InnerSchedulerState::Paused {
61                ref mut started, ..
62            } => *started = true,
63        }
64        context.update_connectivities(&inner);
65    }
66
67    /// Starts the scheduler if it is not yet started.
68    async fn do_start(inner: &mut InnerSchedulerState, context: &Context) {
69        info!(context, "starting IO");
70
71        // Notify message processing loop
72        // to allow processing old messages after restart.
73        context.new_msgs_notify.notify_one();
74
75        match Scheduler::start(context).await {
76            Ok(scheduler) => {
77                *inner = InnerSchedulerState::Started(scheduler);
78                context.emit_event(EventType::ConnectivityChanged);
79            }
80            Err(err) => error!(context, "Failed to start IO: {:#}", err),
81        }
82    }
83
84    /// Stops the scheduler if it is currently running.
85    pub(crate) async fn stop(&self, context: &Context) {
86        let mut inner = self.inner.write().await;
87        match *inner {
88            InnerSchedulerState::Started(_) => {
89                Self::do_stop(&mut inner, context, InnerSchedulerState::Stopped).await
90            }
91            InnerSchedulerState::Stopped => (),
92            InnerSchedulerState::Paused {
93                ref mut started, ..
94            } => *started = false,
95        }
96        context.update_connectivities(&inner);
97    }
98
99    /// Stops the scheduler if it is currently running.
100    async fn do_stop(
101        inner: &mut InnerSchedulerState,
102        context: &Context,
103        new_state: InnerSchedulerState,
104    ) {
105        // Sending an event wakes up event pollers (get_next_event)
106        // so the caller of stop_io() can arrange for proper termination.
107        // For this, the caller needs to instruct the event poller
108        // to terminate on receiving the next event and then call stop_io()
109        // which will emit the below event(s)
110        info!(context, "stopping IO");
111
112        // Wake up message processing loop even if there are no messages
113        // to allow for clean shutdown.
114        context.new_msgs_notify.notify_one();
115
116        let debug_logging = context
117            .debug_logging
118            .write()
119            .expect("RwLock is poisoned")
120            .take();
121        if let Some(debug_logging) = debug_logging {
122            debug_logging.loop_handle.abort();
123            debug_logging.loop_handle.await.ok();
124        }
125        let prev_state = std::mem::replace(inner, new_state);
126        context.emit_event(EventType::ConnectivityChanged);
127        match prev_state {
128            InnerSchedulerState::Started(scheduler) => scheduler.stop(context).await,
129            InnerSchedulerState::Stopped | InnerSchedulerState::Paused { .. } => (),
130        }
131    }
132
133    /// Pauses the IO scheduler.
134    ///
135    /// If it is currently running the scheduler will be stopped.  When the
136    /// [`IoPausedGuard`] is dropped the scheduler is started again.
137    ///
138    /// If in the meantime [`SchedulerState::start`] or [`SchedulerState::stop`] is called
139    /// resume will do the right thing and restore the scheduler to the state requested by
140    /// the last call.
141    pub(crate) async fn pause(&'_ self, context: &Context) -> Result<IoPausedGuard> {
142        {
143            let mut inner = self.inner.write().await;
144            match *inner {
145                InnerSchedulerState::Started(_) => {
146                    let new_state = InnerSchedulerState::Paused {
147                        started: true,
148                        pause_guards_count: NonZeroUsize::MIN,
149                    };
150                    Self::do_stop(&mut inner, context, new_state).await;
151                }
152                InnerSchedulerState::Stopped => {
153                    *inner = InnerSchedulerState::Paused {
154                        started: false,
155                        pause_guards_count: NonZeroUsize::MIN,
156                    };
157                }
158                InnerSchedulerState::Paused {
159                    ref mut pause_guards_count,
160                    ..
161                } => {
162                    *pause_guards_count = pause_guards_count
163                        .checked_add(1)
164                        .ok_or_else(|| Error::msg("Too many pause guards active"))?
165                }
166            }
167            context.update_connectivities(&inner);
168        }
169
170        let (tx, rx) = oneshot::channel();
171        let context = context.clone();
172        tokio::spawn(async move {
173            rx.await.ok();
174            let mut inner = context.scheduler.inner.write().await;
175            match *inner {
176                InnerSchedulerState::Started(_) => {
177                    warn!(&context, "IoPausedGuard resume: started instead of paused");
178                }
179                InnerSchedulerState::Stopped => {
180                    warn!(&context, "IoPausedGuard resume: stopped instead of paused");
181                }
182                InnerSchedulerState::Paused {
183                    ref started,
184                    ref mut pause_guards_count,
185                } => {
186                    if *pause_guards_count == NonZeroUsize::MIN {
187                        match *started {
188                            true => SchedulerState::do_start(&mut inner, &context).await,
189                            false => *inner = InnerSchedulerState::Stopped,
190                        }
191                    } else {
192                        let new_count = pause_guards_count.get() - 1;
193                        // SAFETY: Value was >=2 before due to if condition
194                        *pause_guards_count = NonZeroUsize::new(new_count).unwrap();
195                    }
196                }
197            }
198            context.update_connectivities(&inner);
199        });
200        Ok(IoPausedGuard { sender: Some(tx) })
201    }
202
203    /// Restarts the scheduler, only if it is running.
204    pub(crate) async fn restart(&self, context: &Context) {
205        info!(context, "restarting IO");
206        if self.is_running().await {
207            self.stop(context).await;
208            self.start(context).await;
209        }
210    }
211
212    /// Indicate that the network likely has come back.
213    pub(crate) async fn maybe_network(&self) {
214        let inner = self.inner.read().await;
215        let (inboxes, oboxes) = match *inner {
216            InnerSchedulerState::Started(ref scheduler) => {
217                scheduler.maybe_network();
218                let inboxes = scheduler
219                    .inboxes
220                    .iter()
221                    .map(|b| b.conn_state.state.connectivity.clone())
222                    .collect::<Vec<_>>();
223                let oboxes = scheduler
224                    .oboxes
225                    .iter()
226                    .map(|b| b.conn_state.state.connectivity.clone())
227                    .collect::<Vec<_>>();
228                (inboxes, oboxes)
229            }
230            _ => return,
231        };
232        drop(inner);
233        connectivity::idle_interrupted(inboxes, oboxes);
234    }
235
236    /// Indicate that the network likely is lost.
237    pub(crate) async fn maybe_network_lost(&self, context: &Context) {
238        let inner = self.inner.read().await;
239        let stores = match *inner {
240            InnerSchedulerState::Started(ref scheduler) => {
241                scheduler.maybe_network_lost();
242                scheduler
243                    .boxes()
244                    .map(|b| b.conn_state.state.connectivity.clone())
245                    .collect()
246            }
247            _ => return,
248        };
249        drop(inner);
250        connectivity::maybe_network_lost(context, stores);
251    }
252
253    pub(crate) async fn interrupt_inbox(&self) {
254        let inner = self.inner.read().await;
255        if let InnerSchedulerState::Started(ref scheduler) = *inner {
256            scheduler.interrupt_inbox();
257        }
258    }
259
260    /// Interrupt optional boxes (mvbox currently) loops.
261    pub(crate) async fn interrupt_oboxes(&self) {
262        let inner = self.inner.read().await;
263        if let InnerSchedulerState::Started(ref scheduler) = *inner {
264            scheduler.interrupt_oboxes();
265        }
266    }
267
268    pub(crate) async fn interrupt_smtp(&self) {
269        let inner = self.inner.read().await;
270        if let InnerSchedulerState::Started(ref scheduler) = *inner {
271            scheduler.interrupt_smtp();
272        }
273    }
274
275    pub(crate) async fn interrupt_ephemeral_task(&self) {
276        let inner = self.inner.read().await;
277        if let InnerSchedulerState::Started(ref scheduler) = *inner {
278            scheduler.interrupt_ephemeral_task();
279        }
280    }
281
282    pub(crate) async fn interrupt_location(&self) {
283        let inner = self.inner.read().await;
284        if let InnerSchedulerState::Started(ref scheduler) = *inner {
285            scheduler.interrupt_location();
286        }
287    }
288
289    pub(crate) async fn interrupt_recently_seen(&self, contact_id: ContactId, timestamp: i64) {
290        let inner = self.inner.read().await;
291        if let InnerSchedulerState::Started(ref scheduler) = *inner {
292            scheduler.interrupt_recently_seen(contact_id, timestamp);
293        }
294    }
295}
296
297#[derive(Debug, Default)]
298pub(crate) enum InnerSchedulerState {
299    Started(Scheduler),
300    #[default]
301    Stopped,
302    Paused {
303        started: bool,
304        pause_guards_count: NonZeroUsize,
305    },
306}
307
308/// Guard to make sure the IO Scheduler is resumed.
309///
310/// Returned by [`SchedulerState::pause`].  To resume the IO scheduler simply drop this
311/// guard.
312#[derive(Default, Debug)]
313pub(crate) struct IoPausedGuard {
314    sender: Option<oneshot::Sender<()>>,
315}
316
317impl Drop for IoPausedGuard {
318    fn drop(&mut self) {
319        if let Some(sender) = self.sender.take() {
320            // Can only fail if receiver is dropped, but then we're already resumed.
321            sender.send(()).ok();
322        }
323    }
324}
325
326#[derive(Debug)]
327struct SchedBox {
328    /// Hostname of used chatmail/email relay
329    host: String,
330    meaning: FolderMeaning,
331    conn_state: ImapConnectionState,
332
333    /// IMAP loop task handle.
334    handle: task::JoinHandle<()>,
335}
336
337/// Job and connection scheduler.
338#[derive(Debug)]
339pub(crate) struct Scheduler {
340    /// Inboxes, one per transport.
341    inboxes: Vec<SchedBox>,
342    /// Optional boxes -- mvbox.
343    oboxes: Vec<SchedBox>,
344    smtp: SmtpConnectionState,
345    smtp_handle: task::JoinHandle<()>,
346    ephemeral_handle: task::JoinHandle<()>,
347    ephemeral_interrupt_send: Sender<()>,
348    location_handle: task::JoinHandle<()>,
349    location_interrupt_send: Sender<()>,
350
351    recently_seen_loop: RecentlySeenLoop,
352}
353
354async fn download_msgs(context: &Context, session: &mut Session) -> Result<()> {
355    let msg_ids = context
356        .sql
357        .query_map_vec("SELECT msg_id FROM download", (), |row| {
358            let msg_id: MsgId = row.get(0)?;
359            Ok(msg_id)
360        })
361        .await?;
362
363    for msg_id in msg_ids {
364        if let Err(err) = download_msg(context, msg_id, session).await {
365            warn!(context, "Failed to download message {msg_id}: {:#}.", err);
366
367            // Update download state to failure
368            // so it can be retried.
369            //
370            // On success update_download_state() is not needed
371            // as receive_imf() already
372            // set the state and emitted the event.
373            msg_id
374                .update_download_state(context, DownloadState::Failure)
375                .await?;
376        }
377        context
378            .sql
379            .execute("DELETE FROM download WHERE msg_id=?", (msg_id,))
380            .await?;
381    }
382
383    Ok(())
384}
385
386async fn inbox_loop(
387    ctx: Context,
388    started: oneshot::Sender<()>,
389    inbox_handlers: ImapConnectionHandlers,
390) {
391    use futures::future::FutureExt;
392
393    info!(ctx, "Starting inbox loop.");
394    let ImapConnectionHandlers {
395        mut connection,
396        stop_token,
397    } = inbox_handlers;
398
399    let ctx1 = ctx.clone();
400    let fut = async move {
401        let ctx = ctx1;
402        if let Err(()) = started.send(()) {
403            warn!(ctx, "Inbox loop, missing started receiver.");
404            return;
405        };
406
407        let mut old_session: Option<Session> = None;
408        loop {
409            let session = if let Some(session) = old_session.take() {
410                session
411            } else {
412                info!(ctx, "Preparing new IMAP session for inbox.");
413                match connection.prepare(&ctx).await {
414                    Err(err) => {
415                        warn!(ctx, "Failed to prepare inbox connection: {err:#}.");
416                        continue;
417                    }
418                    Ok(session) => session,
419                }
420            };
421
422            match inbox_fetch_idle(&ctx, &mut connection, session).await {
423                Err(err) => warn!(ctx, "Failed inbox fetch_idle: {err:#}."),
424                Ok(session) => {
425                    info!(
426                        ctx,
427                        "IMAP loop iteration for inbox finished, keeping the session."
428                    );
429                    old_session = Some(session);
430                }
431            }
432        }
433    };
434
435    stop_token
436        .cancelled()
437        .map(|_| {
438            info!(ctx, "Shutting down inbox loop.");
439        })
440        .race(fut)
441        .await;
442}
443
444/// Convert folder meaning
445/// used internally by [fetch_idle] and [Context::background_fetch].
446///
447/// Returns folder configuration key and folder name
448/// if such folder is configured, `Ok(None)` otherwise.
449pub async fn convert_folder_meaning(
450    ctx: &Context,
451    folder_meaning: FolderMeaning,
452) -> Result<Option<(Config, String)>> {
453    let folder_config = match folder_meaning.to_config() {
454        Some(c) => c,
455        None => {
456            // Such folder cannot be configured,
457            // e.g. a `FolderMeaning::Spam` folder.
458            return Ok(None);
459        }
460    };
461
462    let folder = ctx
463        .get_config(folder_config)
464        .await
465        .with_context(|| format!("Failed to retrieve {folder_config} folder"))?;
466
467    if let Some(watch_folder) = folder {
468        Ok(Some((folder_config, watch_folder)))
469    } else {
470        Ok(None)
471    }
472}
473
474async fn inbox_fetch_idle(ctx: &Context, imap: &mut Imap, mut session: Session) -> Result<Session> {
475    if !ctx.get_config_bool(Config::FixIsChatmail).await? {
476        ctx.set_config_internal(
477            Config::IsChatmail,
478            crate::config::from_bool(session.is_chatmail()),
479        )
480        .await?;
481    }
482
483    // Update quota no more than once a minute.
484    if ctx.quota_needs_update(session.transport_id(), 60).await
485        && let Err(err) = ctx.update_recent_quota(&mut session).await
486    {
487        warn!(ctx, "Failed to update quota: {:#}.", err);
488    }
489
490    if let Ok(()) = imap.resync_request_receiver.try_recv()
491        && let Err(err) = session.resync_folders(ctx).await
492    {
493        warn!(ctx, "Failed to resync folders: {:#}.", err);
494        imap.resync_request_sender.try_send(()).ok();
495    }
496
497    maybe_add_time_based_warnings(ctx).await;
498
499    match ctx.get_config_i64(Config::LastHousekeeping).await {
500        Ok(last_housekeeping_time) => {
501            let next_housekeeping_time =
502                last_housekeeping_time.saturating_add(constants::HOUSEKEEPING_PERIOD);
503            if next_housekeeping_time <= time() {
504                sql::housekeeping(ctx).await.log_err(ctx).ok();
505            }
506        }
507        Err(err) => {
508            warn!(ctx, "Failed to get last housekeeping time: {}", err);
509        }
510    };
511
512    maybe_send_stats(ctx).await.log_err(ctx).ok();
513    match ctx.get_config_bool(Config::FetchedExistingMsgs).await {
514        Ok(fetched_existing_msgs) => {
515            if !fetched_existing_msgs {
516                // Consider it done even if we fail.
517                //
518                // This operation is not critical enough to retry,
519                // especially if the error is persistent.
520                if let Err(err) = ctx
521                    .set_config_internal(Config::FetchedExistingMsgs, config::from_bool(true))
522                    .await
523                {
524                    warn!(ctx, "Can't set Config::FetchedExistingMsgs: {:#}", err);
525                }
526
527                if let Err(err) = imap.fetch_existing_msgs(ctx, &mut session).await {
528                    warn!(ctx, "Failed to fetch existing messages: {:#}", err);
529                }
530            }
531        }
532        Err(err) => {
533            warn!(ctx, "Can't get Config::FetchedExistingMsgs: {:#}", err);
534        }
535    }
536
537    download_msgs(ctx, &mut session)
538        .await
539        .context("Failed to download messages")?;
540    session
541        .update_metadata(ctx)
542        .await
543        .context("update_metadata")?;
544    session
545        .register_token(ctx)
546        .await
547        .context("Failed to register push token")?;
548
549    let session = fetch_idle(ctx, imap, session, FolderMeaning::Inbox).await?;
550    Ok(session)
551}
552
553/// Implement a single iteration of IMAP loop.
554///
555/// This function performs all IMAP operations on a single folder, selecting it if necessary and
556/// handling all the errors. In case of an error, an error is returned and connection is dropped,
557/// otherwise connection is returned.
558async fn fetch_idle(
559    ctx: &Context,
560    connection: &mut Imap,
561    mut session: Session,
562    folder_meaning: FolderMeaning,
563) -> Result<Session> {
564    let Some((folder_config, watch_folder)) = convert_folder_meaning(ctx, folder_meaning).await?
565    else {
566        // The folder is not configured.
567        // For example, this happens if the server does not have Sent folder
568        // but watching Sent folder is enabled.
569        connection.connectivity.set_not_configured(ctx);
570        connection.idle_interrupt_receiver.recv().await.ok();
571        bail!("Cannot fetch folder {folder_meaning} because it is not configured");
572    };
573
574    if folder_config == Config::ConfiguredInboxFolder {
575        let mvbox;
576        let syncbox = match ctx.should_move_sync_msgs().await? {
577            false => &watch_folder,
578            true => {
579                mvbox = ctx.get_config(Config::ConfiguredMvboxFolder).await?;
580                mvbox.as_deref().unwrap_or(&watch_folder)
581            }
582        };
583        if ctx
584            .get_config(Config::ConfiguredAddr)
585            .await?
586            .unwrap_or_default()
587            == connection.addr
588        {
589            session
590                .send_sync_msgs(ctx, syncbox)
591                .await
592                .context("fetch_idle: send_sync_msgs")
593                .log_err(ctx)
594                .ok();
595        }
596
597        session
598            .store_seen_flags_on_imap(ctx)
599            .await
600            .context("store_seen_flags_on_imap")?;
601    }
602
603    if !ctx.should_delete_to_trash().await?
604        || ctx
605            .get_config(Config::ConfiguredTrashFolder)
606            .await?
607            .is_some()
608    {
609        // Fetch the watched folder.
610        connection
611            .fetch_move_delete(ctx, &mut session, &watch_folder, folder_meaning)
612            .await
613            .context("fetch_move_delete")?;
614
615        // Mark expired messages for deletion. Marked messages will be deleted from the server
616        // on the next iteration of `fetch_move_delete`. `delete_expired_imap_messages` is not
617        // called right before `fetch_move_delete` because it is not well optimized and would
618        // otherwise slow down message fetching.
619        delete_expired_imap_messages(ctx)
620            .await
621            .context("delete_expired_imap_messages")?;
622    } else if folder_config == Config::ConfiguredInboxFolder {
623        session.last_full_folder_scan.lock().await.take();
624    }
625
626    // Scan additional folders only after finishing fetching the watched folder.
627    //
628    // On iOS the application has strictly limited time to work in background, so we may not
629    // be able to scan all folders before time is up if there are many of them.
630    if folder_config == Config::ConfiguredInboxFolder {
631        // Only scan on the Inbox thread in order to prevent parallel scans, which might lead to duplicate messages
632        match connection
633            .scan_folders(ctx, &mut session)
634            .await
635            .context("scan_folders")
636        {
637            Err(err) => {
638                // Don't reconnect, if there is a problem with the connection we will realize this when IDLEing
639                // but maybe just one folder can't be selected or something
640                warn!(ctx, "{:#}", err);
641            }
642            Ok(true) => {
643                // Fetch the watched folder again in case scanning other folder moved messages
644                // there.
645                //
646                // In most cases this will select the watched folder and return because there are
647                // no new messages. We want to select the watched folder anyway before going IDLE
648                // there, so this does not take additional protocol round-trip.
649                connection
650                    .fetch_move_delete(ctx, &mut session, &watch_folder, folder_meaning)
651                    .await
652                    .context("fetch_move_delete after scan_folders")?;
653            }
654            Ok(false) => {}
655        }
656    }
657
658    // Synchronize Seen flags.
659    session
660        .sync_seen_flags(ctx, &watch_folder)
661        .await
662        .context("sync_seen_flags")
663        .log_err(ctx)
664        .ok();
665
666    connection.connectivity.set_idle(ctx);
667
668    ctx.emit_event(EventType::ImapInboxIdle);
669
670    if !session.can_idle() {
671        info!(
672            ctx,
673            "IMAP session does not support IDLE, going to fake idle."
674        );
675        connection.fake_idle(ctx, watch_folder).await?;
676        return Ok(session);
677    }
678
679    if ctx
680        .get_config_bool(Config::DisableIdle)
681        .await
682        .context("Failed to get disable_idle config")
683        .log_err(ctx)
684        .unwrap_or_default()
685    {
686        info!(ctx, "IMAP IDLE is disabled, going to fake idle.");
687        connection.fake_idle(ctx, watch_folder).await?;
688        return Ok(session);
689    }
690
691    info!(
692        ctx,
693        "IMAP session in folder {watch_folder:?} supports IDLE, using it."
694    );
695    let session = session
696        .idle(
697            ctx,
698            connection.idle_interrupt_receiver.clone(),
699            &watch_folder,
700        )
701        .await
702        .context("idle")?;
703
704    Ok(session)
705}
706
707async fn simple_imap_loop(
708    ctx: Context,
709    started: oneshot::Sender<()>,
710    inbox_handlers: ImapConnectionHandlers,
711    folder_meaning: FolderMeaning,
712) {
713    use futures::future::FutureExt;
714
715    info!(ctx, "Starting simple loop for {folder_meaning}.");
716    let ImapConnectionHandlers {
717        mut connection,
718        stop_token,
719    } = inbox_handlers;
720
721    let ctx1 = ctx.clone();
722
723    let fut = async move {
724        let ctx = ctx1;
725        if let Err(()) = started.send(()) {
726            warn!(
727                ctx,
728                "Simple imap loop for {folder_meaning}, missing started receiver."
729            );
730            return;
731        }
732
733        let mut old_session: Option<Session> = None;
734        loop {
735            let session = if let Some(session) = old_session.take() {
736                session
737            } else {
738                info!(ctx, "Preparing new IMAP session for {folder_meaning}.");
739                match connection.prepare(&ctx).await {
740                    Err(err) => {
741                        warn!(
742                            ctx,
743                            "Failed to prepare {folder_meaning} connection: {err:#}."
744                        );
745                        continue;
746                    }
747                    Ok(session) => session,
748                }
749            };
750
751            match fetch_idle(&ctx, &mut connection, session, folder_meaning).await {
752                Err(err) => warn!(ctx, "Failed fetch_idle: {err:#}"),
753                Ok(session) => {
754                    info!(
755                        ctx,
756                        "IMAP loop iteration for {folder_meaning} finished, keeping the session"
757                    );
758                    old_session = Some(session);
759                }
760            }
761        }
762    };
763
764    stop_token
765        .cancelled()
766        .map(|_| {
767            info!(ctx, "Shutting down IMAP loop for {folder_meaning}.");
768        })
769        .race(fut)
770        .await;
771}
772
773async fn smtp_loop(
774    ctx: Context,
775    started: oneshot::Sender<()>,
776    smtp_handlers: SmtpConnectionHandlers,
777) {
778    use futures::future::FutureExt;
779
780    info!(ctx, "Starting SMTP loop.");
781    let SmtpConnectionHandlers {
782        mut connection,
783        stop_token,
784        idle_interrupt_receiver,
785    } = smtp_handlers;
786
787    let ctx1 = ctx.clone();
788    let fut = async move {
789        let ctx = ctx1;
790        if let Err(()) = started.send(()) {
791            warn!(&ctx, "SMTP loop, missing started receiver.");
792            return;
793        }
794
795        let mut timeout = None;
796        loop {
797            if let Err(err) = send_smtp_messages(&ctx, &mut connection).await {
798                warn!(ctx, "send_smtp_messages failed: {:#}.", err);
799                timeout = Some(timeout.unwrap_or(30));
800            } else {
801                timeout = None;
802                let duration_until_can_send = ctx.ratelimit.read().await.until_can_send();
803                if !duration_until_can_send.is_zero() {
804                    info!(
805                        ctx,
806                        "smtp got rate limited, waiting for {} until can send again",
807                        duration_to_str(duration_until_can_send)
808                    );
809                    tokio::time::sleep(duration_until_can_send).await;
810                    continue;
811                }
812            }
813
814            stats::maybe_update_message_stats(&ctx)
815                .await
816                .log_err(&ctx)
817                .ok();
818
819            // Fake Idle
820            info!(ctx, "SMTP fake idle started.");
821            match &connection.last_send_error {
822                None => connection.connectivity.set_idle(&ctx),
823                Some(err) => connection.connectivity.set_err(&ctx, err),
824            }
825
826            // If send_smtp_messages() failed, we set a timeout for the fake-idle so that
827            // sending is retried (at the latest) after the timeout. If sending fails
828            // again, we increase the timeout exponentially, in order not to do lots of
829            // unnecessary retries.
830            if let Some(t) = timeout {
831                let now = tools::Time::now();
832                info!(
833                    ctx,
834                    "SMTP has messages to retry, planning to retry {t} seconds later."
835                );
836                let duration = std::time::Duration::from_secs(t);
837                tokio::time::timeout(duration, async {
838                    idle_interrupt_receiver.recv().await.unwrap_or_default()
839                })
840                .await
841                .unwrap_or_default();
842                let slept = time_elapsed(&now).as_secs();
843                timeout = Some(cmp::max(
844                    t,
845                    slept.saturating_add(rand::random_range((slept / 2)..=slept)),
846                ));
847            } else {
848                info!(ctx, "SMTP has no messages to retry, waiting for interrupt.");
849                idle_interrupt_receiver.recv().await.unwrap_or_default();
850            };
851
852            info!(ctx, "SMTP fake idle interrupted.")
853        }
854    };
855
856    stop_token
857        .cancelled()
858        .map(|_| {
859            info!(ctx, "Shutting down SMTP loop.");
860        })
861        .race(fut)
862        .await;
863}
864
865impl Scheduler {
866    /// Start the scheduler.
867    pub async fn start(ctx: &Context) -> Result<Self> {
868        let (smtp, smtp_handlers) = SmtpConnectionState::new();
869
870        let (smtp_start_send, smtp_start_recv) = oneshot::channel();
871        let (ephemeral_interrupt_send, ephemeral_interrupt_recv) = channel::bounded(1);
872        let (location_interrupt_send, location_interrupt_recv) = channel::bounded(1);
873
874        let mut inboxes = Vec::new();
875        let mut oboxes = Vec::new();
876        let mut start_recvs = Vec::new();
877
878        for (transport_id, configured_login_param) in ConfiguredLoginParam::load_all(ctx).await? {
879            let (conn_state, inbox_handlers) =
880                ImapConnectionState::new(ctx, transport_id, configured_login_param.clone()).await?;
881            let (inbox_start_send, inbox_start_recv) = oneshot::channel();
882            let handle = {
883                let ctx = ctx.clone();
884                task::spawn(inbox_loop(ctx, inbox_start_send, inbox_handlers))
885            };
886            let host = configured_login_param
887                .addr
888                .split("@")
889                .last()
890                .context("address has no host")?
891                .to_owned();
892            let inbox = SchedBox {
893                host: host.clone(),
894                meaning: FolderMeaning::Inbox,
895                conn_state,
896                handle,
897            };
898            inboxes.push(inbox);
899            start_recvs.push(inbox_start_recv);
900
901            if ctx.should_watch_mvbox().await? {
902                let (conn_state, handlers) =
903                    ImapConnectionState::new(ctx, transport_id, configured_login_param).await?;
904                let (start_send, start_recv) = oneshot::channel();
905                let ctx = ctx.clone();
906                let meaning = FolderMeaning::Mvbox;
907                let handle = task::spawn(simple_imap_loop(ctx, start_send, handlers, meaning));
908                oboxes.push(SchedBox {
909                    host,
910                    meaning,
911                    conn_state,
912                    handle,
913                });
914                start_recvs.push(start_recv);
915            }
916        }
917
918        let smtp_handle = {
919            let ctx = ctx.clone();
920            task::spawn(smtp_loop(ctx, smtp_start_send, smtp_handlers))
921        };
922        start_recvs.push(smtp_start_recv);
923
924        let ephemeral_handle = {
925            let ctx = ctx.clone();
926            task::spawn(async move {
927                ephemeral::ephemeral_loop(&ctx, ephemeral_interrupt_recv).await;
928            })
929        };
930
931        let location_handle = {
932            let ctx = ctx.clone();
933            task::spawn(async move {
934                location::location_loop(&ctx, location_interrupt_recv).await;
935            })
936        };
937
938        let recently_seen_loop = RecentlySeenLoop::new(ctx.clone());
939
940        let res = Self {
941            inboxes,
942            oboxes,
943            smtp,
944            smtp_handle,
945            ephemeral_handle,
946            ephemeral_interrupt_send,
947            location_handle,
948            location_interrupt_send,
949            recently_seen_loop,
950        };
951
952        // wait for all loops to be started
953        if let Err(err) = try_join_all(start_recvs).await {
954            bail!("failed to start scheduler: {err}");
955        }
956
957        info!(ctx, "scheduler is running");
958        Ok(res)
959    }
960
961    fn boxes(&self) -> impl Iterator<Item = &SchedBox> {
962        self.inboxes.iter().chain(self.oboxes.iter())
963    }
964
965    fn maybe_network(&self) {
966        for b in self.boxes() {
967            b.conn_state.interrupt();
968        }
969        self.interrupt_smtp();
970    }
971
972    fn maybe_network_lost(&self) {
973        for b in self.boxes() {
974            b.conn_state.interrupt();
975        }
976        self.interrupt_smtp();
977    }
978
979    fn interrupt_inbox(&self) {
980        for b in &self.inboxes {
981            b.conn_state.interrupt();
982        }
983    }
984
985    fn interrupt_oboxes(&self) {
986        for b in &self.oboxes {
987            b.conn_state.interrupt();
988        }
989    }
990
991    fn interrupt_smtp(&self) {
992        self.smtp.interrupt();
993    }
994
995    fn interrupt_ephemeral_task(&self) {
996        self.ephemeral_interrupt_send.try_send(()).ok();
997    }
998
999    fn interrupt_location(&self) {
1000        self.location_interrupt_send.try_send(()).ok();
1001    }
1002
1003    fn interrupt_recently_seen(&self, contact_id: ContactId, timestamp: i64) {
1004        self.recently_seen_loop.try_interrupt(contact_id, timestamp);
1005    }
1006
1007    /// Halt the scheduler.
1008    ///
1009    /// It consumes the scheduler and never fails to stop it. In the worst case, long-running tasks
1010    /// are forcefully terminated if they cannot shutdown within the timeout.
1011    pub(crate) async fn stop(self, context: &Context) {
1012        // Send stop signals to tasks so they can shutdown cleanly.
1013        for b in self.boxes() {
1014            b.conn_state.stop();
1015        }
1016        self.smtp.stop();
1017
1018        // Actually shutdown tasks.
1019        let timeout_duration = std::time::Duration::from_secs(30);
1020
1021        let tracker = TaskTracker::new();
1022        for b in self.inboxes.into_iter().chain(self.oboxes.into_iter()) {
1023            let context = context.clone();
1024            tracker.spawn(async move {
1025                tokio::time::timeout(timeout_duration, b.handle)
1026                    .await
1027                    .log_err(&context)
1028            });
1029        }
1030        {
1031            let context = context.clone();
1032            tracker.spawn(async move {
1033                tokio::time::timeout(timeout_duration, self.smtp_handle)
1034                    .await
1035                    .log_err(&context)
1036            });
1037        }
1038        tracker.close();
1039        tracker.wait().await;
1040
1041        // Abort tasks, then await them to ensure the `Future` is dropped.
1042        // Just aborting the task may keep resources such as `Context` clone
1043        // moved into it indefinitely, resulting in database not being
1044        // closed etc.
1045        self.ephemeral_handle.abort();
1046        self.ephemeral_handle.await.ok();
1047        self.location_handle.abort();
1048        self.location_handle.await.ok();
1049        self.recently_seen_loop.abort().await;
1050    }
1051}
1052
1053/// Connection state logic shared between imap and smtp connections.
1054#[derive(Debug)]
1055struct ConnectionState {
1056    /// Cancellation token to interrupt the whole connection.
1057    stop_token: CancellationToken,
1058    /// Channel to interrupt idle.
1059    idle_interrupt_sender: Sender<()>,
1060    /// Mutex to pass connectivity info between IMAP/SMTP threads and the API
1061    connectivity: ConnectivityStore,
1062}
1063
1064impl ConnectionState {
1065    /// Shutdown this connection completely.
1066    fn stop(&self) {
1067        // Trigger shutdown of the run loop.
1068        self.stop_token.cancel();
1069    }
1070
1071    fn interrupt(&self) {
1072        // Use try_send to avoid blocking on interrupts.
1073        self.idle_interrupt_sender.try_send(()).ok();
1074    }
1075}
1076
1077#[derive(Debug)]
1078pub(crate) struct SmtpConnectionState {
1079    state: ConnectionState,
1080}
1081
1082impl SmtpConnectionState {
1083    fn new() -> (Self, SmtpConnectionHandlers) {
1084        let stop_token = CancellationToken::new();
1085        let (idle_interrupt_sender, idle_interrupt_receiver) = channel::bounded(1);
1086
1087        let handlers = SmtpConnectionHandlers {
1088            connection: Smtp::new(),
1089            stop_token: stop_token.clone(),
1090            idle_interrupt_receiver,
1091        };
1092
1093        let state = ConnectionState {
1094            stop_token,
1095            idle_interrupt_sender,
1096            connectivity: handlers.connection.connectivity.clone(),
1097        };
1098
1099        let conn = SmtpConnectionState { state };
1100
1101        (conn, handlers)
1102    }
1103
1104    /// Interrupt any form of idle.
1105    fn interrupt(&self) {
1106        self.state.interrupt();
1107    }
1108
1109    /// Shutdown this connection completely.
1110    fn stop(&self) {
1111        self.state.stop();
1112    }
1113}
1114
1115struct SmtpConnectionHandlers {
1116    connection: Smtp,
1117    stop_token: CancellationToken,
1118    idle_interrupt_receiver: Receiver<()>,
1119}
1120
1121#[derive(Debug)]
1122pub(crate) struct ImapConnectionState {
1123    state: ConnectionState,
1124}
1125
1126impl ImapConnectionState {
1127    /// Construct a new connection.
1128    async fn new(
1129        context: &Context,
1130        transport_id: u32,
1131        login_param: ConfiguredLoginParam,
1132    ) -> Result<(Self, ImapConnectionHandlers)> {
1133        let stop_token = CancellationToken::new();
1134        let (idle_interrupt_sender, idle_interrupt_receiver) = channel::bounded(1);
1135
1136        let handlers = ImapConnectionHandlers {
1137            connection: Imap::new(context, transport_id, login_param, idle_interrupt_receiver)
1138                .await?,
1139            stop_token: stop_token.clone(),
1140        };
1141
1142        let state = ConnectionState {
1143            stop_token,
1144            idle_interrupt_sender,
1145            connectivity: handlers.connection.connectivity.clone(),
1146        };
1147
1148        let conn = ImapConnectionState { state };
1149
1150        Ok((conn, handlers))
1151    }
1152
1153    /// Interrupt any form of idle.
1154    fn interrupt(&self) {
1155        self.state.interrupt();
1156    }
1157
1158    /// Shutdown this connection completely.
1159    fn stop(&self) {
1160        self.state.stop();
1161    }
1162}
1163
1164#[derive(Debug)]
1165struct ImapConnectionHandlers {
1166    connection: Imap,
1167    stop_token: CancellationToken,
1168}