deltachat/
scheduler.rs

1use std::cmp;
2use std::num::NonZeroUsize;
3
4use anyhow::{Context as _, Error, Result, bail};
5use async_channel::{self as channel, Receiver, Sender};
6use futures::future::try_join_all;
7use futures_lite::FutureExt;
8use tokio::sync::{RwLock, oneshot};
9use tokio::task;
10use tokio_util::sync::CancellationToken;
11use tokio_util::task::TaskTracker;
12
13pub(crate) use self::connectivity::ConnectivityStore;
14use crate::config::{self, Config};
15use crate::contact::{ContactId, RecentlySeenLoop};
16use crate::context::Context;
17use crate::download::{download_known_post_messages_without_pre_message, download_msgs};
18use crate::ephemeral::{self, delete_expired_imap_messages};
19use crate::events::EventType;
20use crate::imap::{FolderMeaning, Imap, session::Session};
21use crate::location;
22use crate::log::{LogExt, warn};
23use crate::smtp::{Smtp, send_smtp_messages};
24use crate::sql;
25use crate::stats::maybe_send_stats;
26use crate::tools::{self, duration_to_str, maybe_add_time_based_warnings, time, time_elapsed};
27use crate::transport::ConfiguredLoginParam;
28use crate::{constants, stats};
29
30pub(crate) mod connectivity;
31
32/// State of the IO scheduler, as stored on the [`Context`].
33///
34/// The IO scheduler can be stopped or started, but core can also pause it.  After pausing
35/// the IO scheduler will be restarted only if it was running before paused or
36/// [`Context::start_io`] was called in the meantime while it was paused.
37#[derive(Debug, Default)]
38pub(crate) struct SchedulerState {
39    inner: RwLock<InnerSchedulerState>,
40}
41
42impl SchedulerState {
43    pub(crate) fn new() -> Self {
44        Default::default()
45    }
46
47    /// Whether the scheduler is currently running.
48    pub(crate) async fn is_running(&self) -> bool {
49        let inner = self.inner.read().await;
50        matches!(*inner, InnerSchedulerState::Started(_))
51    }
52
53    /// Starts the scheduler if it is not yet started.
54    pub(crate) async fn start(&self, context: &Context) {
55        let mut inner = self.inner.write().await;
56        match *inner {
57            InnerSchedulerState::Started(_) => (),
58            InnerSchedulerState::Stopped => Self::do_start(&mut inner, context).await,
59            InnerSchedulerState::Paused {
60                ref mut started, ..
61            } => *started = true,
62        }
63        context.update_connectivities(&inner);
64    }
65
66    /// Starts the scheduler if it is not yet started.
67    async fn do_start(inner: &mut InnerSchedulerState, context: &Context) {
68        info!(context, "starting IO");
69
70        // Notify message processing loop
71        // to allow processing old messages after restart.
72        context.new_msgs_notify.notify_one();
73
74        match Scheduler::start(context).await {
75            Ok(scheduler) => {
76                *inner = InnerSchedulerState::Started(scheduler);
77                context.emit_event(EventType::ConnectivityChanged);
78            }
79            Err(err) => error!(context, "Failed to start IO: {:#}", err),
80        }
81    }
82
83    /// Stops the scheduler if it is currently running.
84    pub(crate) async fn stop(&self, context: &Context) {
85        let mut inner = self.inner.write().await;
86        match *inner {
87            InnerSchedulerState::Started(_) => {
88                Self::do_stop(&mut inner, context, InnerSchedulerState::Stopped).await
89            }
90            InnerSchedulerState::Stopped => (),
91            InnerSchedulerState::Paused {
92                ref mut started, ..
93            } => *started = false,
94        }
95        context.update_connectivities(&inner);
96    }
97
98    /// Stops the scheduler if it is currently running.
99    async fn do_stop(
100        inner: &mut InnerSchedulerState,
101        context: &Context,
102        new_state: InnerSchedulerState,
103    ) {
104        // Sending an event wakes up event pollers (get_next_event)
105        // so the caller of stop_io() can arrange for proper termination.
106        // For this, the caller needs to instruct the event poller
107        // to terminate on receiving the next event and then call stop_io()
108        // which will emit the below event(s)
109        info!(context, "stopping IO");
110
111        // Wake up message processing loop even if there are no messages
112        // to allow for clean shutdown.
113        context.new_msgs_notify.notify_one();
114
115        let debug_logging = context
116            .debug_logging
117            .write()
118            .expect("RwLock is poisoned")
119            .take();
120        if let Some(debug_logging) = debug_logging {
121            debug_logging.loop_handle.abort();
122            debug_logging.loop_handle.await.ok();
123        }
124        let prev_state = std::mem::replace(inner, new_state);
125        context.emit_event(EventType::ConnectivityChanged);
126        match prev_state {
127            InnerSchedulerState::Started(scheduler) => scheduler.stop(context).await,
128            InnerSchedulerState::Stopped | InnerSchedulerState::Paused { .. } => (),
129        }
130    }
131
132    /// Pauses the IO scheduler.
133    ///
134    /// If it is currently running the scheduler will be stopped.  When the
135    /// [`IoPausedGuard`] is dropped the scheduler is started again.
136    ///
137    /// If in the meantime [`SchedulerState::start`] or [`SchedulerState::stop`] is called
138    /// resume will do the right thing and restore the scheduler to the state requested by
139    /// the last call.
140    pub(crate) async fn pause(&'_ self, context: &Context) -> Result<IoPausedGuard> {
141        {
142            let mut inner = self.inner.write().await;
143            match *inner {
144                InnerSchedulerState::Started(_) => {
145                    let new_state = InnerSchedulerState::Paused {
146                        started: true,
147                        pause_guards_count: NonZeroUsize::MIN,
148                    };
149                    Self::do_stop(&mut inner, context, new_state).await;
150                }
151                InnerSchedulerState::Stopped => {
152                    *inner = InnerSchedulerState::Paused {
153                        started: false,
154                        pause_guards_count: NonZeroUsize::MIN,
155                    };
156                }
157                InnerSchedulerState::Paused {
158                    ref mut pause_guards_count,
159                    ..
160                } => {
161                    *pause_guards_count = pause_guards_count
162                        .checked_add(1)
163                        .ok_or_else(|| Error::msg("Too many pause guards active"))?
164                }
165            }
166            context.update_connectivities(&inner);
167        }
168
169        let (tx, rx) = oneshot::channel();
170        let context = context.clone();
171        tokio::spawn(async move {
172            rx.await.ok();
173            let mut inner = context.scheduler.inner.write().await;
174            match *inner {
175                InnerSchedulerState::Started(_) => {
176                    warn!(&context, "IoPausedGuard resume: started instead of paused");
177                }
178                InnerSchedulerState::Stopped => {
179                    warn!(&context, "IoPausedGuard resume: stopped instead of paused");
180                }
181                InnerSchedulerState::Paused {
182                    ref started,
183                    ref mut pause_guards_count,
184                } => {
185                    if *pause_guards_count == NonZeroUsize::MIN {
186                        match *started {
187                            true => SchedulerState::do_start(&mut inner, &context).await,
188                            false => *inner = InnerSchedulerState::Stopped,
189                        }
190                    } else {
191                        let new_count = pause_guards_count.get() - 1;
192                        // SAFETY: Value was >=2 before due to if condition
193                        *pause_guards_count = NonZeroUsize::new(new_count).unwrap();
194                    }
195                }
196            }
197            context.update_connectivities(&inner);
198        });
199        Ok(IoPausedGuard { sender: Some(tx) })
200    }
201
202    /// Restarts the scheduler, only if it is running.
203    pub(crate) async fn restart(&self, context: &Context) {
204        info!(context, "restarting IO");
205        if self.is_running().await {
206            self.stop(context).await;
207            self.start(context).await;
208        }
209    }
210
211    /// Indicate that the network likely has come back.
212    pub(crate) async fn maybe_network(&self) {
213        let inner = self.inner.read().await;
214        let (inboxes, oboxes) = match *inner {
215            InnerSchedulerState::Started(ref scheduler) => {
216                scheduler.maybe_network();
217                let inboxes = scheduler
218                    .inboxes
219                    .iter()
220                    .map(|b| b.conn_state.state.connectivity.clone())
221                    .collect::<Vec<_>>();
222                let oboxes = scheduler
223                    .oboxes
224                    .iter()
225                    .map(|b| b.conn_state.state.connectivity.clone())
226                    .collect::<Vec<_>>();
227                (inboxes, oboxes)
228            }
229            _ => return,
230        };
231        drop(inner);
232        connectivity::idle_interrupted(inboxes, oboxes);
233    }
234
235    /// Indicate that the network likely is lost.
236    pub(crate) async fn maybe_network_lost(&self, context: &Context) {
237        let inner = self.inner.read().await;
238        let stores = match *inner {
239            InnerSchedulerState::Started(ref scheduler) => {
240                scheduler.maybe_network_lost();
241                scheduler
242                    .boxes()
243                    .map(|b| b.conn_state.state.connectivity.clone())
244                    .collect()
245            }
246            _ => return,
247        };
248        drop(inner);
249        connectivity::maybe_network_lost(context, stores);
250    }
251
252    pub(crate) async fn interrupt_inbox(&self) {
253        let inner = self.inner.read().await;
254        if let InnerSchedulerState::Started(ref scheduler) = *inner {
255            scheduler.interrupt_inbox();
256        }
257    }
258
259    /// Interrupt optional boxes (mvbox currently) loops.
260    pub(crate) async fn interrupt_oboxes(&self) {
261        let inner = self.inner.read().await;
262        if let InnerSchedulerState::Started(ref scheduler) = *inner {
263            scheduler.interrupt_oboxes();
264        }
265    }
266
267    pub(crate) async fn interrupt_smtp(&self) {
268        let inner = self.inner.read().await;
269        if let InnerSchedulerState::Started(ref scheduler) = *inner {
270            scheduler.interrupt_smtp();
271        }
272    }
273
274    pub(crate) async fn interrupt_ephemeral_task(&self) {
275        let inner = self.inner.read().await;
276        if let InnerSchedulerState::Started(ref scheduler) = *inner {
277            scheduler.interrupt_ephemeral_task();
278        }
279    }
280
281    pub(crate) async fn interrupt_location(&self) {
282        let inner = self.inner.read().await;
283        if let InnerSchedulerState::Started(ref scheduler) = *inner {
284            scheduler.interrupt_location();
285        }
286    }
287
288    pub(crate) async fn interrupt_recently_seen(&self, contact_id: ContactId, timestamp: i64) {
289        let inner = self.inner.read().await;
290        if let InnerSchedulerState::Started(ref scheduler) = *inner {
291            scheduler.interrupt_recently_seen(contact_id, timestamp);
292        }
293    }
294}
295
296#[derive(Debug, Default)]
297pub(crate) enum InnerSchedulerState {
298    Started(Scheduler),
299    #[default]
300    Stopped,
301    Paused {
302        started: bool,
303        pause_guards_count: NonZeroUsize,
304    },
305}
306
307/// Guard to make sure the IO Scheduler is resumed.
308///
309/// Returned by [`SchedulerState::pause`].  To resume the IO scheduler simply drop this
310/// guard.
311#[derive(Default, Debug)]
312pub(crate) struct IoPausedGuard {
313    sender: Option<oneshot::Sender<()>>,
314}
315
316impl Drop for IoPausedGuard {
317    fn drop(&mut self) {
318        if let Some(sender) = self.sender.take() {
319            // Can only fail if receiver is dropped, but then we're already resumed.
320            sender.send(()).ok();
321        }
322    }
323}
324
325#[derive(Debug)]
326struct SchedBox {
327    /// Hostname of used chatmail/email relay
328    host: String,
329    meaning: FolderMeaning,
330    conn_state: ImapConnectionState,
331
332    /// IMAP loop task handle.
333    handle: task::JoinHandle<()>,
334}
335
336/// Job and connection scheduler.
337#[derive(Debug)]
338pub(crate) struct Scheduler {
339    /// Inboxes, one per transport.
340    inboxes: Vec<SchedBox>,
341    /// Optional boxes -- mvbox.
342    oboxes: Vec<SchedBox>,
343    smtp: SmtpConnectionState,
344    smtp_handle: task::JoinHandle<()>,
345    ephemeral_handle: task::JoinHandle<()>,
346    ephemeral_interrupt_send: Sender<()>,
347    location_handle: task::JoinHandle<()>,
348    location_interrupt_send: Sender<()>,
349
350    recently_seen_loop: RecentlySeenLoop,
351}
352
353async fn inbox_loop(
354    ctx: Context,
355    started: oneshot::Sender<()>,
356    inbox_handlers: ImapConnectionHandlers,
357) {
358    use futures::future::FutureExt;
359
360    info!(ctx, "Starting inbox loop.");
361    let ImapConnectionHandlers {
362        mut connection,
363        stop_token,
364    } = inbox_handlers;
365
366    let ctx1 = ctx.clone();
367    let fut = async move {
368        let ctx = ctx1;
369        if let Err(()) = started.send(()) {
370            warn!(ctx, "Inbox loop, missing started receiver.");
371            return;
372        };
373
374        let mut old_session: Option<Session> = None;
375        loop {
376            let session = if let Some(session) = old_session.take() {
377                session
378            } else {
379                info!(ctx, "Preparing new IMAP session for inbox.");
380                match connection.prepare(&ctx).await {
381                    Err(err) => {
382                        warn!(ctx, "Failed to prepare inbox connection: {err:#}.");
383                        continue;
384                    }
385                    Ok(session) => session,
386                }
387            };
388
389            match inbox_fetch_idle(&ctx, &mut connection, session).await {
390                Err(err) => warn!(ctx, "Failed inbox fetch_idle: {err:#}."),
391                Ok(session) => {
392                    info!(
393                        ctx,
394                        "IMAP loop iteration for inbox finished, keeping the session."
395                    );
396                    old_session = Some(session);
397                }
398            }
399        }
400    };
401
402    stop_token
403        .cancelled()
404        .map(|_| {
405            info!(ctx, "Shutting down inbox loop.");
406        })
407        .race(fut)
408        .await;
409}
410
411/// Convert folder meaning
412/// used internally by [fetch_idle] and [Context::background_fetch].
413///
414/// Returns folder configuration key and folder name
415/// if such folder is configured, `Ok(None)` otherwise.
416pub async fn convert_folder_meaning(
417    ctx: &Context,
418    folder_meaning: FolderMeaning,
419) -> Result<Option<(Config, String)>> {
420    let folder_config = match folder_meaning.to_config() {
421        Some(c) => c,
422        None => {
423            // Such folder cannot be configured,
424            // e.g. a `FolderMeaning::Spam` folder.
425            return Ok(None);
426        }
427    };
428
429    let folder = ctx
430        .get_config(folder_config)
431        .await
432        .with_context(|| format!("Failed to retrieve {folder_config} folder"))?;
433
434    if let Some(watch_folder) = folder {
435        Ok(Some((folder_config, watch_folder)))
436    } else {
437        Ok(None)
438    }
439}
440
441async fn inbox_fetch_idle(ctx: &Context, imap: &mut Imap, mut session: Session) -> Result<Session> {
442    if !ctx.get_config_bool(Config::FixIsChatmail).await? {
443        ctx.set_config_internal(
444            Config::IsChatmail,
445            crate::config::from_bool(session.is_chatmail()),
446        )
447        .await?;
448    }
449
450    // Update quota no more than once a minute.
451    if ctx.quota_needs_update(session.transport_id(), 60).await
452        && let Err(err) = ctx.update_recent_quota(&mut session).await
453    {
454        warn!(ctx, "Failed to update quota: {:#}.", err);
455    }
456
457    if let Ok(()) = imap.resync_request_receiver.try_recv()
458        && let Err(err) = session.resync_folders(ctx).await
459    {
460        warn!(ctx, "Failed to resync folders: {:#}.", err);
461        imap.resync_request_sender.try_send(()).ok();
462    }
463
464    maybe_add_time_based_warnings(ctx).await;
465
466    match ctx.get_config_i64(Config::LastHousekeeping).await {
467        Ok(last_housekeeping_time) => {
468            let next_housekeeping_time =
469                last_housekeeping_time.saturating_add(constants::HOUSEKEEPING_PERIOD);
470            if next_housekeeping_time <= time() {
471                sql::housekeeping(ctx).await.log_err(ctx).ok();
472            }
473        }
474        Err(err) => {
475            warn!(ctx, "Failed to get last housekeeping time: {}", err);
476        }
477    };
478
479    maybe_send_stats(ctx).await.log_err(ctx).ok();
480    match ctx.get_config_bool(Config::FetchedExistingMsgs).await {
481        Ok(fetched_existing_msgs) => {
482            if !fetched_existing_msgs {
483                // Consider it done even if we fail.
484                //
485                // This operation is not critical enough to retry,
486                // especially if the error is persistent.
487                if let Err(err) = ctx
488                    .set_config_internal(Config::FetchedExistingMsgs, config::from_bool(true))
489                    .await
490                {
491                    warn!(ctx, "Can't set Config::FetchedExistingMsgs: {:#}", err);
492                }
493
494                if let Err(err) = imap.fetch_existing_msgs(ctx, &mut session).await {
495                    warn!(ctx, "Failed to fetch existing messages: {:#}", err);
496                }
497            }
498        }
499        Err(err) => {
500            warn!(ctx, "Can't get Config::FetchedExistingMsgs: {:#}", err);
501        }
502    }
503
504    session
505        .update_metadata(ctx)
506        .await
507        .context("update_metadata")?;
508    session
509        .register_token(ctx)
510        .await
511        .context("Failed to register push token")?;
512
513    let session = fetch_idle(ctx, imap, session, FolderMeaning::Inbox).await?;
514    Ok(session)
515}
516
517/// Implement a single iteration of IMAP loop.
518///
519/// This function performs all IMAP operations on a single folder, selecting it if necessary and
520/// handling all the errors. In case of an error, an error is returned and connection is dropped,
521/// otherwise connection is returned.
522async fn fetch_idle(
523    ctx: &Context,
524    connection: &mut Imap,
525    mut session: Session,
526    folder_meaning: FolderMeaning,
527) -> Result<Session> {
528    let Some((folder_config, watch_folder)) = convert_folder_meaning(ctx, folder_meaning).await?
529    else {
530        // The folder is not configured.
531        // For example, this happens if the server does not have Sent folder
532        // but watching Sent folder is enabled.
533        connection.connectivity.set_not_configured(ctx);
534        connection.idle_interrupt_receiver.recv().await.ok();
535        bail!("Cannot fetch folder {folder_meaning} because it is not configured");
536    };
537
538    if folder_config == Config::ConfiguredInboxFolder {
539        session
540            .store_seen_flags_on_imap(ctx)
541            .await
542            .context("store_seen_flags_on_imap")?;
543    }
544
545    if !ctx.should_delete_to_trash().await?
546        || ctx
547            .get_config(Config::ConfiguredTrashFolder)
548            .await?
549            .is_some()
550    {
551        // Fetch the watched folder.
552        connection
553            .fetch_move_delete(ctx, &mut session, &watch_folder, folder_meaning)
554            .await
555            .context("fetch_move_delete")?;
556
557        // Mark expired messages for deletion. Marked messages will be deleted from the server
558        // on the next iteration of `fetch_move_delete`. `delete_expired_imap_messages` is not
559        // called right before `fetch_move_delete` because it is not well optimized and would
560        // otherwise slow down message fetching.
561        delete_expired_imap_messages(ctx)
562            .await
563            .context("delete_expired_imap_messages")?;
564
565        download_known_post_messages_without_pre_message(ctx, &mut session).await?;
566        download_msgs(ctx, &mut session)
567            .await
568            .context("download_msgs")?;
569    } else if folder_config == Config::ConfiguredInboxFolder {
570        session.last_full_folder_scan.lock().await.take();
571    }
572
573    // Scan additional folders only after finishing fetching the watched folder.
574    //
575    // On iOS the application has strictly limited time to work in background, so we may not
576    // be able to scan all folders before time is up if there are many of them.
577    if folder_config == Config::ConfiguredInboxFolder {
578        // Only scan on the Inbox thread in order to prevent parallel scans, which might lead to duplicate messages
579        match connection
580            .scan_folders(ctx, &mut session)
581            .await
582            .context("scan_folders")
583        {
584            Err(err) => {
585                // Don't reconnect, if there is a problem with the connection we will realize this when IDLEing
586                // but maybe just one folder can't be selected or something
587                warn!(ctx, "{:#}", err);
588            }
589            Ok(true) => {
590                // Fetch the watched folder again in case scanning other folder moved messages
591                // there.
592                //
593                // In most cases this will select the watched folder and return because there are
594                // no new messages. We want to select the watched folder anyway before going IDLE
595                // there, so this does not take additional protocol round-trip.
596                connection
597                    .fetch_move_delete(ctx, &mut session, &watch_folder, folder_meaning)
598                    .await
599                    .context("fetch_move_delete after scan_folders")?;
600            }
601            Ok(false) => {}
602        }
603    }
604
605    // Synchronize Seen flags.
606    session
607        .sync_seen_flags(ctx, &watch_folder)
608        .await
609        .context("sync_seen_flags")
610        .log_err(ctx)
611        .ok();
612
613    connection.connectivity.set_idle(ctx);
614
615    ctx.emit_event(EventType::ImapInboxIdle);
616
617    if !session.can_idle() {
618        info!(
619            ctx,
620            "IMAP session does not support IDLE, going to fake idle."
621        );
622        connection.fake_idle(ctx, watch_folder).await?;
623        return Ok(session);
624    }
625
626    if ctx
627        .get_config_bool(Config::DisableIdle)
628        .await
629        .context("Failed to get disable_idle config")
630        .log_err(ctx)
631        .unwrap_or_default()
632    {
633        info!(ctx, "IMAP IDLE is disabled, going to fake idle.");
634        connection.fake_idle(ctx, watch_folder).await?;
635        return Ok(session);
636    }
637
638    info!(
639        ctx,
640        "IMAP session in folder {watch_folder:?} supports IDLE, using it."
641    );
642    let session = session
643        .idle(
644            ctx,
645            connection.idle_interrupt_receiver.clone(),
646            &watch_folder,
647        )
648        .await
649        .context("idle")?;
650
651    Ok(session)
652}
653
654/// Simplified IMAP loop to watch non-inbox folders.
655async fn simple_imap_loop(
656    ctx: Context,
657    started: oneshot::Sender<()>,
658    inbox_handlers: ImapConnectionHandlers,
659    folder_meaning: FolderMeaning,
660) {
661    use futures::future::FutureExt;
662
663    info!(ctx, "Starting simple loop for {folder_meaning}.");
664    let ImapConnectionHandlers {
665        mut connection,
666        stop_token,
667    } = inbox_handlers;
668
669    let ctx1 = ctx.clone();
670
671    let fut = async move {
672        let ctx = ctx1;
673        if let Err(()) = started.send(()) {
674            warn!(
675                ctx,
676                "Simple imap loop for {folder_meaning}, missing started receiver."
677            );
678            return;
679        }
680
681        let mut old_session: Option<Session> = None;
682        loop {
683            let session = if let Some(session) = old_session.take() {
684                session
685            } else {
686                info!(ctx, "Preparing new IMAP session for {folder_meaning}.");
687                match connection.prepare(&ctx).await {
688                    Err(err) => {
689                        warn!(
690                            ctx,
691                            "Failed to prepare {folder_meaning} connection: {err:#}."
692                        );
693                        continue;
694                    }
695                    Ok(session) => session,
696                }
697            };
698
699            match fetch_idle(&ctx, &mut connection, session, folder_meaning).await {
700                Err(err) => warn!(ctx, "Failed fetch_idle: {err:#}"),
701                Ok(session) => {
702                    info!(
703                        ctx,
704                        "IMAP loop iteration for {folder_meaning} finished, keeping the session"
705                    );
706                    old_session = Some(session);
707                }
708            }
709        }
710    };
711
712    stop_token
713        .cancelled()
714        .map(|_| {
715            info!(ctx, "Shutting down IMAP loop for {folder_meaning}.");
716        })
717        .race(fut)
718        .await;
719}
720
721async fn smtp_loop(
722    ctx: Context,
723    started: oneshot::Sender<()>,
724    smtp_handlers: SmtpConnectionHandlers,
725) {
726    use futures::future::FutureExt;
727
728    info!(ctx, "Starting SMTP loop.");
729    let SmtpConnectionHandlers {
730        mut connection,
731        stop_token,
732        idle_interrupt_receiver,
733    } = smtp_handlers;
734
735    let ctx1 = ctx.clone();
736    let fut = async move {
737        let ctx = ctx1;
738        if let Err(()) = started.send(()) {
739            warn!(&ctx, "SMTP loop, missing started receiver.");
740            return;
741        }
742
743        let mut timeout = None;
744        loop {
745            if let Err(err) = send_smtp_messages(&ctx, &mut connection).await {
746                warn!(ctx, "send_smtp_messages failed: {:#}.", err);
747                timeout = Some(timeout.unwrap_or(30));
748            } else {
749                timeout = None;
750                let duration_until_can_send = ctx.ratelimit.read().await.until_can_send();
751                if !duration_until_can_send.is_zero() {
752                    info!(
753                        ctx,
754                        "smtp got rate limited, waiting for {} until can send again",
755                        duration_to_str(duration_until_can_send)
756                    );
757                    tokio::time::sleep(duration_until_can_send).await;
758                    continue;
759                }
760            }
761
762            stats::maybe_update_message_stats(&ctx)
763                .await
764                .log_err(&ctx)
765                .ok();
766
767            // Fake Idle
768            info!(ctx, "SMTP fake idle started.");
769            match &connection.last_send_error {
770                None => connection.connectivity.set_idle(&ctx),
771                Some(err) => connection.connectivity.set_err(&ctx, err),
772            }
773
774            // If send_smtp_messages() failed, we set a timeout for the fake-idle so that
775            // sending is retried (at the latest) after the timeout. If sending fails
776            // again, we increase the timeout exponentially, in order not to do lots of
777            // unnecessary retries.
778            if let Some(t) = timeout {
779                let now = tools::Time::now();
780                info!(
781                    ctx,
782                    "SMTP has messages to retry, planning to retry {t} seconds later."
783                );
784                let duration = std::time::Duration::from_secs(t);
785                tokio::time::timeout(duration, async {
786                    idle_interrupt_receiver.recv().await.unwrap_or_default()
787                })
788                .await
789                .unwrap_or_default();
790                let slept = time_elapsed(&now).as_secs();
791                timeout = Some(cmp::max(
792                    t,
793                    slept.saturating_add(rand::random_range((slept / 2)..=slept)),
794                ));
795            } else {
796                info!(ctx, "SMTP has no messages to retry, waiting for interrupt.");
797                idle_interrupt_receiver.recv().await.unwrap_or_default();
798            };
799
800            info!(ctx, "SMTP fake idle interrupted.")
801        }
802    };
803
804    stop_token
805        .cancelled()
806        .map(|_| {
807            info!(ctx, "Shutting down SMTP loop.");
808        })
809        .race(fut)
810        .await;
811}
812
813impl Scheduler {
814    /// Start the scheduler.
815    pub async fn start(ctx: &Context) -> Result<Self> {
816        let (smtp, smtp_handlers) = SmtpConnectionState::new();
817
818        let (smtp_start_send, smtp_start_recv) = oneshot::channel();
819        let (ephemeral_interrupt_send, ephemeral_interrupt_recv) = channel::bounded(1);
820        let (location_interrupt_send, location_interrupt_recv) = channel::bounded(1);
821
822        let mut inboxes = Vec::new();
823        let mut oboxes = Vec::new();
824        let mut start_recvs = Vec::new();
825
826        for (transport_id, configured_login_param) in ConfiguredLoginParam::load_all(ctx).await? {
827            let (conn_state, inbox_handlers) =
828                ImapConnectionState::new(ctx, transport_id, configured_login_param.clone()).await?;
829            let (inbox_start_send, inbox_start_recv) = oneshot::channel();
830            let handle = {
831                let ctx = ctx.clone();
832                task::spawn(inbox_loop(ctx, inbox_start_send, inbox_handlers))
833            };
834            let host = configured_login_param
835                .addr
836                .split("@")
837                .last()
838                .context("address has no host")?
839                .to_owned();
840            let inbox = SchedBox {
841                host: host.clone(),
842                meaning: FolderMeaning::Inbox,
843                conn_state,
844                handle,
845            };
846            inboxes.push(inbox);
847            start_recvs.push(inbox_start_recv);
848
849            if ctx.should_watch_mvbox().await? {
850                let (conn_state, handlers) =
851                    ImapConnectionState::new(ctx, transport_id, configured_login_param).await?;
852                let (start_send, start_recv) = oneshot::channel();
853                let ctx = ctx.clone();
854                let meaning = FolderMeaning::Mvbox;
855                let handle = task::spawn(simple_imap_loop(ctx, start_send, handlers, meaning));
856                oboxes.push(SchedBox {
857                    host,
858                    meaning,
859                    conn_state,
860                    handle,
861                });
862                start_recvs.push(start_recv);
863            }
864        }
865
866        let smtp_handle = {
867            let ctx = ctx.clone();
868            task::spawn(smtp_loop(ctx, smtp_start_send, smtp_handlers))
869        };
870        start_recvs.push(smtp_start_recv);
871
872        let ephemeral_handle = {
873            let ctx = ctx.clone();
874            task::spawn(async move {
875                ephemeral::ephemeral_loop(&ctx, ephemeral_interrupt_recv).await;
876            })
877        };
878
879        let location_handle = {
880            let ctx = ctx.clone();
881            task::spawn(async move {
882                location::location_loop(&ctx, location_interrupt_recv).await;
883            })
884        };
885
886        let recently_seen_loop = RecentlySeenLoop::new(ctx.clone());
887
888        let res = Self {
889            inboxes,
890            oboxes,
891            smtp,
892            smtp_handle,
893            ephemeral_handle,
894            ephemeral_interrupt_send,
895            location_handle,
896            location_interrupt_send,
897            recently_seen_loop,
898        };
899
900        // wait for all loops to be started
901        if let Err(err) = try_join_all(start_recvs).await {
902            bail!("failed to start scheduler: {err}");
903        }
904
905        info!(ctx, "scheduler is running");
906        Ok(res)
907    }
908
909    fn boxes(&self) -> impl Iterator<Item = &SchedBox> {
910        self.inboxes.iter().chain(self.oboxes.iter())
911    }
912
913    fn maybe_network(&self) {
914        for b in self.boxes() {
915            b.conn_state.interrupt();
916        }
917        self.interrupt_smtp();
918    }
919
920    fn maybe_network_lost(&self) {
921        for b in self.boxes() {
922            b.conn_state.interrupt();
923        }
924        self.interrupt_smtp();
925    }
926
927    fn interrupt_inbox(&self) {
928        for b in &self.inboxes {
929            b.conn_state.interrupt();
930        }
931    }
932
933    fn interrupt_oboxes(&self) {
934        for b in &self.oboxes {
935            b.conn_state.interrupt();
936        }
937    }
938
939    fn interrupt_smtp(&self) {
940        self.smtp.interrupt();
941    }
942
943    fn interrupt_ephemeral_task(&self) {
944        self.ephemeral_interrupt_send.try_send(()).ok();
945    }
946
947    fn interrupt_location(&self) {
948        self.location_interrupt_send.try_send(()).ok();
949    }
950
951    fn interrupt_recently_seen(&self, contact_id: ContactId, timestamp: i64) {
952        self.recently_seen_loop.try_interrupt(contact_id, timestamp);
953    }
954
955    /// Halt the scheduler.
956    ///
957    /// It consumes the scheduler and never fails to stop it. In the worst case, long-running tasks
958    /// are forcefully terminated if they cannot shutdown within the timeout.
959    pub(crate) async fn stop(self, context: &Context) {
960        // Send stop signals to tasks so they can shutdown cleanly.
961        for b in self.boxes() {
962            b.conn_state.stop();
963        }
964        self.smtp.stop();
965
966        // Actually shutdown tasks.
967        let timeout_duration = std::time::Duration::from_secs(30);
968
969        let tracker = TaskTracker::new();
970        for b in self.inboxes.into_iter().chain(self.oboxes.into_iter()) {
971            let context = context.clone();
972            tracker.spawn(async move {
973                tokio::time::timeout(timeout_duration, b.handle)
974                    .await
975                    .log_err(&context)
976            });
977        }
978        {
979            let context = context.clone();
980            tracker.spawn(async move {
981                tokio::time::timeout(timeout_duration, self.smtp_handle)
982                    .await
983                    .log_err(&context)
984            });
985        }
986        tracker.close();
987        tracker.wait().await;
988
989        // Abort tasks, then await them to ensure the `Future` is dropped.
990        // Just aborting the task may keep resources such as `Context` clone
991        // moved into it indefinitely, resulting in database not being
992        // closed etc.
993        self.ephemeral_handle.abort();
994        self.ephemeral_handle.await.ok();
995        self.location_handle.abort();
996        self.location_handle.await.ok();
997        self.recently_seen_loop.abort().await;
998    }
999}
1000
1001/// Connection state logic shared between imap and smtp connections.
1002#[derive(Debug)]
1003struct ConnectionState {
1004    /// Cancellation token to interrupt the whole connection.
1005    stop_token: CancellationToken,
1006    /// Channel to interrupt idle.
1007    idle_interrupt_sender: Sender<()>,
1008    /// Mutex to pass connectivity info between IMAP/SMTP threads and the API
1009    connectivity: ConnectivityStore,
1010}
1011
1012impl ConnectionState {
1013    /// Shutdown this connection completely.
1014    fn stop(&self) {
1015        // Trigger shutdown of the run loop.
1016        self.stop_token.cancel();
1017    }
1018
1019    fn interrupt(&self) {
1020        // Use try_send to avoid blocking on interrupts.
1021        self.idle_interrupt_sender.try_send(()).ok();
1022    }
1023}
1024
1025#[derive(Debug)]
1026pub(crate) struct SmtpConnectionState {
1027    state: ConnectionState,
1028}
1029
1030impl SmtpConnectionState {
1031    fn new() -> (Self, SmtpConnectionHandlers) {
1032        let stop_token = CancellationToken::new();
1033        let (idle_interrupt_sender, idle_interrupt_receiver) = channel::bounded(1);
1034
1035        let handlers = SmtpConnectionHandlers {
1036            connection: Smtp::new(),
1037            stop_token: stop_token.clone(),
1038            idle_interrupt_receiver,
1039        };
1040
1041        let state = ConnectionState {
1042            stop_token,
1043            idle_interrupt_sender,
1044            connectivity: handlers.connection.connectivity.clone(),
1045        };
1046
1047        let conn = SmtpConnectionState { state };
1048
1049        (conn, handlers)
1050    }
1051
1052    /// Interrupt any form of idle.
1053    fn interrupt(&self) {
1054        self.state.interrupt();
1055    }
1056
1057    /// Shutdown this connection completely.
1058    fn stop(&self) {
1059        self.state.stop();
1060    }
1061}
1062
1063struct SmtpConnectionHandlers {
1064    connection: Smtp,
1065    stop_token: CancellationToken,
1066    idle_interrupt_receiver: Receiver<()>,
1067}
1068
1069#[derive(Debug)]
1070pub(crate) struct ImapConnectionState {
1071    state: ConnectionState,
1072}
1073
1074impl ImapConnectionState {
1075    /// Construct a new connection.
1076    async fn new(
1077        context: &Context,
1078        transport_id: u32,
1079        login_param: ConfiguredLoginParam,
1080    ) -> Result<(Self, ImapConnectionHandlers)> {
1081        let stop_token = CancellationToken::new();
1082        let (idle_interrupt_sender, idle_interrupt_receiver) = channel::bounded(1);
1083
1084        let handlers = ImapConnectionHandlers {
1085            connection: Imap::new(context, transport_id, login_param, idle_interrupt_receiver)
1086                .await?,
1087            stop_token: stop_token.clone(),
1088        };
1089
1090        let state = ConnectionState {
1091            stop_token,
1092            idle_interrupt_sender,
1093            connectivity: handlers.connection.connectivity.clone(),
1094        };
1095
1096        let conn = ImapConnectionState { state };
1097
1098        Ok((conn, handlers))
1099    }
1100
1101    /// Interrupt any form of idle.
1102    fn interrupt(&self) {
1103        self.state.interrupt();
1104    }
1105
1106    /// Shutdown this connection completely.
1107    fn stop(&self) {
1108        self.state.stop();
1109    }
1110}
1111
1112#[derive(Debug)]
1113struct ImapConnectionHandlers {
1114    connection: Imap,
1115    stop_token: CancellationToken,
1116}