deltachat/
scheduler.rs

1use std::cmp;
2use std::iter::{self, once};
3use std::num::NonZeroUsize;
4use std::sync::atomic::Ordering;
5
6use anyhow::{bail, Context as _, Error, Result};
7use async_channel::{self as channel, Receiver, Sender};
8use futures::future::try_join_all;
9use futures_lite::FutureExt;
10use rand::Rng;
11use tokio::sync::{oneshot, RwLock, RwLockWriteGuard};
12use tokio::task;
13
14use self::connectivity::ConnectivityStore;
15use crate::config::{self, Config};
16use crate::contact::{ContactId, RecentlySeenLoop};
17use crate::context::Context;
18use crate::download::{download_msg, DownloadState};
19use crate::ephemeral::{self, delete_expired_imap_messages};
20use crate::events::EventType;
21use crate::imap::{session::Session, FolderMeaning, Imap};
22use crate::location;
23use crate::log::LogExt;
24use crate::message::MsgId;
25use crate::smtp::{send_smtp_messages, Smtp};
26use crate::sql;
27use crate::tools::{self, duration_to_str, maybe_add_time_based_warnings, time, time_elapsed};
28
29pub(crate) mod connectivity;
30
/// State of the IO scheduler, as stored on the [`Context`].
///
/// The IO scheduler can be stopped or started, but core can also pause it.  After pausing
/// the IO scheduler will be restarted only if it was running before paused or
/// [`Context::start_io`] was called in the meantime while it was paused.
#[derive(Debug, Default)]
pub(crate) struct SchedulerState {
    /// Current state; see [`InnerSchedulerState`] for the possible values.
    inner: RwLock<InnerSchedulerState>,
}
40
impl SchedulerState {
    /// Creates a new, initially stopped, scheduler state.
    pub(crate) fn new() -> Self {
        Default::default()
    }

    /// Whether the scheduler is currently running.
    pub(crate) async fn is_running(&self) -> bool {
        let inner = self.inner.read().await;
        matches!(*inner, InnerSchedulerState::Started(_))
    }

    /// Starts the scheduler if it is not yet started.
    pub(crate) async fn start(&self, context: Context) {
        let mut inner = self.inner.write().await;
        match *inner {
            InnerSchedulerState::Started(_) => (),
            InnerSchedulerState::Stopped => Self::do_start(inner, context).await,
            // While paused, only record the start request; the scheduler is
            // actually started when the last `IoPausedGuard` is dropped.
            InnerSchedulerState::Paused {
                ref mut started, ..
            } => *started = true,
        }
    }

    /// Starts the scheduler if it is not yet started.
    ///
    /// Takes the write guard by value so the whole state transition happens
    /// while the lock is held.
    async fn do_start(mut inner: RwLockWriteGuard<'_, InnerSchedulerState>, context: Context) {
        info!(context, "starting IO");

        // Notify message processing loop
        // to allow processing old messages after restart.
        context.new_msgs_notify.notify_one();

        let ctx = context.clone();
        match Scheduler::start(&context).await {
            Ok(scheduler) => {
                *inner = InnerSchedulerState::Started(scheduler);
                context.emit_event(EventType::ConnectivityChanged);
            }
            // On failure the state is left unchanged.
            Err(err) => error!(&ctx, "Failed to start IO: {:#}", err),
        }
    }

    /// Stops the scheduler if it is currently running.
    pub(crate) async fn stop(&self, context: &Context) {
        let mut inner = self.inner.write().await;
        match *inner {
            InnerSchedulerState::Started(_) => {
                Self::do_stop(inner, context, InnerSchedulerState::Stopped).await
            }
            InnerSchedulerState::Stopped => (),
            // While paused, only record the stop request; see `start()`.
            InnerSchedulerState::Paused {
                ref mut started, ..
            } => *started = false,
        }
    }

    /// Stops the scheduler if it is currently running.
    ///
    /// `new_state` is the state left behind after stopping, either
    /// `Stopped` or `Paused` (used by [`SchedulerState::pause`]).
    async fn do_stop(
        mut inner: RwLockWriteGuard<'_, InnerSchedulerState>,
        context: &Context,
        new_state: InnerSchedulerState,
    ) {
        // Sending an event wakes up event pollers (get_next_event)
        // so the caller of stop_io() can arrange for proper termination.
        // For this, the caller needs to instruct the event poller
        // to terminate on receiving the next event and then call stop_io()
        // which will emit the below event(s)
        info!(context, "stopping IO");

        // Wake up message processing loop even if there are no messages
        // to allow for clean shutdown.
        context.new_msgs_notify.notify_one();

        // Stop the debug-logging task, if any, before tearing the scheduler down.
        let debug_logging = context
            .debug_logging
            .write()
            .expect("RwLock is poisoned")
            .take();
        if let Some(debug_logging) = debug_logging {
            debug_logging.loop_handle.abort();
            debug_logging.loop_handle.await.ok();
        }
        let prev_state = std::mem::replace(&mut *inner, new_state);
        context.emit_event(EventType::ConnectivityChanged);
        match prev_state {
            InnerSchedulerState::Started(scheduler) => scheduler.stop(context).await,
            InnerSchedulerState::Stopped | InnerSchedulerState::Paused { .. } => (),
        }
    }

    /// Pauses the IO scheduler.
    ///
    /// If it is currently running the scheduler will be stopped.  When the
    /// [`IoPausedGuard`] is dropped the scheduler is started again.
    ///
    /// If in the meantime [`SchedulerState::start`] or [`SchedulerState::stop`] is called
    /// resume will do the right thing and restore the scheduler to the state requested by
    /// the last call.
    pub(crate) async fn pause(&'_ self, context: Context) -> Result<IoPausedGuard> {
        {
            let mut inner = self.inner.write().await;
            match *inner {
                InnerSchedulerState::Started(_) => {
                    // Remember that the scheduler was running so it is
                    // restarted when the last guard is dropped.
                    let new_state = InnerSchedulerState::Paused {
                        started: true,
                        pause_guards_count: NonZeroUsize::new(1).unwrap(),
                    };
                    Self::do_stop(inner, &context, new_state).await;
                }
                InnerSchedulerState::Stopped => {
                    *inner = InnerSchedulerState::Paused {
                        started: false,
                        pause_guards_count: NonZeroUsize::new(1).unwrap(),
                    };
                }
                // Already paused: just count one more guard.
                InnerSchedulerState::Paused {
                    ref mut pause_guards_count,
                    ..
                } => {
                    *pause_guards_count = pause_guards_count
                        .checked_add(1)
                        .ok_or_else(|| Error::msg("Too many pause guards active"))?
                }
            }
        }

        // Spawn a resume task that waits for the guard to be dropped
        // (or for its sender to be closed) and then undoes this pause.
        let (tx, rx) = oneshot::channel();
        tokio::spawn(async move {
            rx.await.ok();
            let mut inner = context.scheduler.inner.write().await;
            match *inner {
                InnerSchedulerState::Started(_) => {
                    warn!(&context, "IoPausedGuard resume: started instead of paused");
                }
                InnerSchedulerState::Stopped => {
                    warn!(&context, "IoPausedGuard resume: stopped instead of paused");
                }
                InnerSchedulerState::Paused {
                    ref started,
                    ref mut pause_guards_count,
                } => {
                    // Last guard: restore the state requested by the most
                    // recent start()/stop() call while paused.
                    if *pause_guards_count == NonZeroUsize::new(1).unwrap() {
                        match *started {
                            true => SchedulerState::do_start(inner, context.clone()).await,
                            false => *inner = InnerSchedulerState::Stopped,
                        }
                    } else {
                        let new_count = pause_guards_count.get() - 1;
                        // SAFETY: Value was >=2 before due to if condition
                        *pause_guards_count = NonZeroUsize::new(new_count).unwrap();
                    }
                }
            }
        });
        Ok(IoPausedGuard { sender: Some(tx) })
    }

    /// Restarts the scheduler, only if it is running.
    pub(crate) async fn restart(&self, context: &Context) {
        info!(context, "restarting IO");
        if self.is_running().await {
            self.stop(context).await;
            self.start(context.clone()).await;
        }
    }

    /// Indicate that the network likely has come back.
    pub(crate) async fn maybe_network(&self) {
        let inner = self.inner.read().await;
        let (inbox, oboxes) = match *inner {
            InnerSchedulerState::Started(ref scheduler) => {
                scheduler.maybe_network();
                // Clone the connectivity stores so the lock can be released
                // before awaiting below.
                let inbox = scheduler.inbox.conn_state.state.connectivity.clone();
                let oboxes = scheduler
                    .oboxes
                    .iter()
                    .map(|b| b.conn_state.state.connectivity.clone())
                    .collect::<Vec<_>>();
                (inbox, oboxes)
            }
            _ => return,
        };
        drop(inner);
        connectivity::idle_interrupted(inbox, oboxes).await;
    }

    /// Indicate that the network likely is lost.
    pub(crate) async fn maybe_network_lost(&self, context: &Context) {
        let inner = self.inner.read().await;
        let stores = match *inner {
            InnerSchedulerState::Started(ref scheduler) => {
                scheduler.maybe_network_lost();
                // Clone the connectivity stores so the lock can be released
                // before awaiting below.
                scheduler
                    .boxes()
                    .map(|b| b.conn_state.state.connectivity.clone())
                    .collect()
            }
            _ => return,
        };
        drop(inner);
        connectivity::maybe_network_lost(context, stores).await;
    }

    /// Interrupts the IMAP inbox loop, if the scheduler is running.
    pub(crate) async fn interrupt_inbox(&self) {
        let inner = self.inner.read().await;
        if let InnerSchedulerState::Started(ref scheduler) = *inner {
            scheduler.interrupt_inbox();
        }
    }

    /// Interrupt optional boxes (mvbox, sentbox) loops.
    pub(crate) async fn interrupt_oboxes(&self) {
        let inner = self.inner.read().await;
        if let InnerSchedulerState::Started(ref scheduler) = *inner {
            scheduler.interrupt_oboxes();
        }
    }

    /// Interrupts the SMTP loop, if the scheduler is running.
    pub(crate) async fn interrupt_smtp(&self) {
        let inner = self.inner.read().await;
        if let InnerSchedulerState::Started(ref scheduler) = *inner {
            scheduler.interrupt_smtp();
        }
    }

    /// Interrupts the ephemeral-messages task, if the scheduler is running.
    pub(crate) async fn interrupt_ephemeral_task(&self) {
        let inner = self.inner.read().await;
        if let InnerSchedulerState::Started(ref scheduler) = *inner {
            scheduler.interrupt_ephemeral_task();
        }
    }

    /// Interrupts the location-streaming task, if the scheduler is running.
    pub(crate) async fn interrupt_location(&self) {
        let inner = self.inner.read().await;
        if let InnerSchedulerState::Started(ref scheduler) = *inner {
            scheduler.interrupt_location();
        }
    }

    /// Forwards a "contact was seen" notification to the recently-seen loop,
    /// if the scheduler is running.
    pub(crate) async fn interrupt_recently_seen(&self, contact_id: ContactId, timestamp: i64) {
        let inner = self.inner.read().await;
        if let InnerSchedulerState::Started(ref scheduler) = *inner {
            scheduler.interrupt_recently_seen(contact_id, timestamp);
        }
    }
}
286
/// Internal state of [`SchedulerState`].
#[derive(Debug, Default)]
enum InnerSchedulerState {
    /// The scheduler and its IO tasks are running.
    Started(Scheduler),
    /// The scheduler is not running.
    #[default]
    Stopped,
    /// The scheduler is paused by one or more [`IoPausedGuard`]s.
    Paused {
        // Whether the scheduler should be started again
        // once the last pause guard is dropped.
        started: bool,
        // Number of currently active pause guards.
        pause_guards_count: NonZeroUsize,
    },
}
297
/// Guard to make sure the IO Scheduler is resumed.
///
/// Returned by [`SchedulerState::pause`].  To resume the IO scheduler simply drop this
/// guard.
#[derive(Default, Debug)]
pub(crate) struct IoPausedGuard {
    // Channel used by `Drop` to notify the resume task spawned in
    // `SchedulerState::pause`; `None` once the guard has been dropped.
    sender: Option<oneshot::Sender<()>>,
}
306
307impl Drop for IoPausedGuard {
308    fn drop(&mut self) {
309        if let Some(sender) = self.sender.take() {
310            // Can only fail if receiver is dropped, but then we're already resumed.
311            sender.send(()).ok();
312        }
313    }
314}
315
/// A single watched IMAP folder together with its connection state and loop task.
#[derive(Debug)]
struct SchedBox {
    // Which folder this box handles, e.g. `FolderMeaning::Inbox`.
    meaning: FolderMeaning,
    // IMAP connection state shared with the loop task.
    conn_state: ImapConnectionState,

    /// IMAP loop task handle.
    handle: task::JoinHandle<()>,
}
324
/// Job and connection scheduler.
#[derive(Debug)]
pub(crate) struct Scheduler {
    // IMAP loop for the inbox folder.
    inbox: SchedBox,
    /// Optional boxes -- mvbox, sentbox.
    oboxes: Vec<SchedBox>,
    // SMTP connection state shared with the SMTP loop.
    smtp: SmtpConnectionState,
    // Handle of the SMTP loop task.
    smtp_handle: task::JoinHandle<()>,
    // Handle of the ephemeral-messages loop task.
    ephemeral_handle: task::JoinHandle<()>,
    // Channel to interrupt the ephemeral loop.
    ephemeral_interrupt_send: Sender<()>,
    // Handle of the location loop task.
    location_handle: task::JoinHandle<()>,
    // Channel to interrupt the location loop.
    location_interrupt_send: Sender<()>,

    // Loop tracking recently seen contacts.
    recently_seen_loop: RecentlySeenLoop,
}
340
341async fn download_msgs(context: &Context, session: &mut Session) -> Result<()> {
342    let msg_ids = context
343        .sql
344        .query_map(
345            "SELECT msg_id FROM download",
346            (),
347            |row| {
348                let msg_id: MsgId = row.get(0)?;
349                Ok(msg_id)
350            },
351            |rowids| {
352                rowids
353                    .collect::<std::result::Result<Vec<_>, _>>()
354                    .map_err(Into::into)
355            },
356        )
357        .await?;
358
359    for msg_id in msg_ids {
360        if let Err(err) = download_msg(context, msg_id, session).await {
361            warn!(context, "Failed to download message {msg_id}: {:#}.", err);
362
363            // Update download state to failure
364            // so it can be retried.
365            //
366            // On success update_download_state() is not needed
367            // as receive_imf() already
368            // set the state and emitted the event.
369            msg_id
370                .update_download_state(context, DownloadState::Failure)
371                .await?;
372        }
373        context
374            .sql
375            .execute("DELETE FROM download WHERE msg_id=?", (msg_id,))
376            .await?;
377    }
378
379    Ok(())
380}
381
382async fn inbox_loop(
383    ctx: Context,
384    started: oneshot::Sender<()>,
385    inbox_handlers: ImapConnectionHandlers,
386) {
387    use futures::future::FutureExt;
388
389    info!(ctx, "starting inbox loop");
390    let ImapConnectionHandlers {
391        mut connection,
392        stop_receiver,
393    } = inbox_handlers;
394
395    let ctx1 = ctx.clone();
396    let fut = async move {
397        let ctx = ctx1;
398        if let Err(()) = started.send(()) {
399            warn!(ctx, "inbox loop, missing started receiver");
400            return;
401        };
402
403        let mut old_session: Option<Session> = None;
404        loop {
405            let session = if let Some(session) = old_session.take() {
406                session
407            } else {
408                match connection.prepare(&ctx).await {
409                    Err(err) => {
410                        warn!(ctx, "Failed to prepare INBOX connection: {:#}.", err);
411                        continue;
412                    }
413                    Ok(session) => session,
414                }
415            };
416
417            match inbox_fetch_idle(&ctx, &mut connection, session).await {
418                Err(err) => warn!(ctx, "Failed fetch_idle: {err:#}"),
419                Ok(session) => {
420                    old_session = Some(session);
421                }
422            }
423        }
424    };
425
426    stop_receiver
427        .recv()
428        .map(|_| {
429            info!(ctx, "shutting down inbox loop");
430        })
431        .race(fut)
432        .await;
433}
434
435/// Convert folder meaning
436/// used internally by [fetch_idle] and [Context::background_fetch].
437///
438/// Returns folder configuration key and folder name
439/// if such folder is configured, `Ok(None)` otherwise.
440pub async fn convert_folder_meaning(
441    ctx: &Context,
442    folder_meaning: FolderMeaning,
443) -> Result<Option<(Config, String)>> {
444    let folder_config = match folder_meaning.to_config() {
445        Some(c) => c,
446        None => {
447            // Such folder cannot be configured,
448            // e.g. a `FolderMeaning::Spam` folder.
449            return Ok(None);
450        }
451    };
452
453    let folder = ctx
454        .get_config(folder_config)
455        .await
456        .with_context(|| format!("Failed to retrieve {folder_config} folder"))?;
457
458    if let Some(watch_folder) = folder {
459        Ok(Some((folder_config, watch_folder)))
460    } else {
461        Ok(None)
462    }
463}
464
/// Performs inbox-specific bookkeeping and then runs one [`fetch_idle`]
/// iteration on the inbox folder.
///
/// Returns the session for reuse in the next iteration; on error the caller
/// drops the connection and prepares a new one.
async fn inbox_fetch_idle(ctx: &Context, imap: &mut Imap, mut session: Session) -> Result<Session> {
    // Refresh `IsChatmail` from the session unless it is pinned via `FixIsChatmail`.
    if !ctx.get_config_bool(Config::FixIsChatmail).await? {
        ctx.set_config_internal(
            Config::IsChatmail,
            crate::config::from_bool(session.is_chatmail()),
        )
        .await?;
    }

    // Update quota no more than once a minute.
    if ctx.quota_needs_update(60).await {
        if let Err(err) = ctx.update_recent_quota(&mut session).await {
            warn!(ctx, "Failed to update quota: {:#}.", err);
        }
    }

    // Take the resync request flag; if resyncing fails, set it back
    // so the resync is retried on the next iteration.
    let resync_requested = ctx.resync_request.swap(false, Ordering::Relaxed);
    if resync_requested {
        if let Err(err) = session.resync_folders(ctx).await {
            warn!(ctx, "Failed to resync folders: {:#}.", err);
            ctx.resync_request.store(true, Ordering::Relaxed);
        }
    }

    maybe_add_time_based_warnings(ctx).await;

    // Run housekeeping at most once per day.
    match ctx.get_config_i64(Config::LastHousekeeping).await {
        Ok(last_housekeeping_time) => {
            let next_housekeeping_time = last_housekeeping_time.saturating_add(60 * 60 * 24);
            if next_housekeeping_time <= time() {
                sql::housekeeping(ctx).await.log_err(ctx).ok();
            }
        }
        Err(err) => {
            warn!(ctx, "Failed to get last housekeeping time: {}", err);
        }
    };

    // One-time fetch of messages that existed before configuration.
    match ctx.get_config_bool(Config::FetchedExistingMsgs).await {
        Ok(fetched_existing_msgs) => {
            if !fetched_existing_msgs {
                // Consider it done even if we fail.
                //
                // This operation is not critical enough to retry,
                // especially if the error is persistent.
                if let Err(err) = ctx
                    .set_config_internal(Config::FetchedExistingMsgs, config::from_bool(true))
                    .await
                {
                    warn!(ctx, "Can't set Config::FetchedExistingMsgs: {:#}", err);
                }

                if let Err(err) = imap.fetch_existing_msgs(ctx, &mut session).await {
                    warn!(ctx, "Failed to fetch existing messages: {:#}", err);
                }
            }
        }
        Err(err) => {
            warn!(ctx, "Can't get Config::FetchedExistingMsgs: {:#}", err);
        }
    }

    download_msgs(ctx, &mut session)
        .await
        .context("Failed to download messages")?;
    session
        .fetch_metadata(ctx)
        .await
        .context("Failed to fetch metadata")?;
    session
        .register_token(ctx)
        .await
        .context("Failed to register push token")?;

    let session = fetch_idle(ctx, imap, session, FolderMeaning::Inbox).await?;
    Ok(session)
}
542
/// Implement a single iteration of IMAP loop.
///
/// This function performs all IMAP operations on a single folder, selecting it if necessary and
/// handling all the errors. In case of an error, an error is returned and connection is dropped,
/// otherwise connection is returned.
async fn fetch_idle(
    ctx: &Context,
    connection: &mut Imap,
    mut session: Session,
    folder_meaning: FolderMeaning,
) -> Result<Session> {
    let Some((folder_config, watch_folder)) = convert_folder_meaning(ctx, folder_meaning).await?
    else {
        // The folder is not configured.
        // For example, this happens if the server does not have Sent folder
        // but watching Sent folder is enabled.
        connection.connectivity.set_not_configured(ctx).await;
        connection.idle_interrupt_receiver.recv().await.ok();
        bail!("Cannot fetch folder {folder_meaning} because it is not configured");
    };

    if folder_config == Config::ConfiguredInboxFolder {
        // `mvbox` is declared here so the `Option<String>` it may hold
        // outlives `syncbox`, which borrows from it.
        let mvbox;
        let syncbox = match ctx.should_move_sync_msgs().await? {
            false => &watch_folder,
            true => {
                mvbox = ctx.get_config(Config::ConfiguredMvboxFolder).await?;
                mvbox.as_deref().unwrap_or(&watch_folder)
            }
        };
        session
            .send_sync_msgs(ctx, syncbox)
            .await
            .context("fetch_idle: send_sync_msgs")
            .log_err(ctx)
            .ok();

        session
            .store_seen_flags_on_imap(ctx)
            .await
            .context("store_seen_flags_on_imap")?;
    }

    if !ctx.should_delete_to_trash().await?
        || ctx
            .get_config(Config::ConfiguredTrashFolder)
            .await?
            .is_some()
    {
        // Fetch the watched folder.
        connection
            .fetch_move_delete(ctx, &mut session, &watch_folder, folder_meaning)
            .await
            .context("fetch_move_delete")?;

        // Mark expired messages for deletion. Marked messages will be deleted from the server
        // on the next iteration of `fetch_move_delete`. `delete_expired_imap_messages` is not
        // called right before `fetch_move_delete` because it is not well optimized and would
        // otherwise slow down message fetching.
        delete_expired_imap_messages(ctx)
            .await
            .context("delete_expired_imap_messages")?;
    } else if folder_config == Config::ConfiguredInboxFolder {
        // Delete-to-trash is enabled but no Trash folder is configured:
        // clear the timestamp of the last full folder scan.
        // NOTE(review): presumably to force a rescan that discovers the
        // Trash folder — confirm against scan_folders.
        ctx.last_full_folder_scan.lock().await.take();
    }

    // Scan additional folders only after finishing fetching the watched folder.
    //
    // On iOS the application has strictly limited time to work in background, so we may not
    // be able to scan all folders before time is up if there are many of them.
    if folder_config == Config::ConfiguredInboxFolder {
        // Only scan on the Inbox thread in order to prevent parallel scans, which might lead to duplicate messages
        match connection
            .scan_folders(ctx, &mut session)
            .await
            .context("scan_folders")
        {
            Err(err) => {
                // Don't reconnect, if there is a problem with the connection we will realize this when IDLEing
                // but maybe just one folder can't be selected or something
                warn!(ctx, "{:#}", err);
            }
            Ok(true) => {
                // Fetch the watched folder again in case scanning other folder moved messages
                // there.
                //
                // In most cases this will select the watched folder and return because there are
                // no new messages. We want to select the watched folder anyway before going IDLE
                // there, so this does not take additional protocol round-trip.
                connection
                    .fetch_move_delete(ctx, &mut session, &watch_folder, folder_meaning)
                    .await
                    .context("fetch_move_delete after scan_folders")?;
            }
            Ok(false) => {}
        }
    }

    // Synchronize Seen flags.
    session
        .sync_seen_flags(ctx, &watch_folder)
        .await
        .context("sync_seen_flags")
        .log_err(ctx)
        .ok();

    connection.connectivity.set_idle(ctx).await;

    ctx.emit_event(EventType::ImapInboxIdle);

    // Fall back to fake idle (polling) when real IDLE is unavailable or disabled.
    if !session.can_idle() {
        info!(
            ctx,
            "IMAP session does not support IDLE, going to fake idle."
        );
        connection.fake_idle(ctx, watch_folder).await?;
        return Ok(session);
    }

    if ctx
        .get_config_bool(Config::DisableIdle)
        .await
        .context("Failed to get disable_idle config")
        .log_err(ctx)
        .unwrap_or_default()
    {
        info!(ctx, "IMAP IDLE is disabled, going to fake idle.");
        connection.fake_idle(ctx, watch_folder).await?;
        return Ok(session);
    }

    info!(
        ctx,
        "IMAP session in folder {watch_folder:?} supports IDLE, using it."
    );
    let session = session
        .idle(
            ctx,
            connection.idle_interrupt_receiver.clone(),
            &watch_folder,
        )
        .await
        .context("idle")?;

    Ok(session)
}
689
690async fn simple_imap_loop(
691    ctx: Context,
692    started: oneshot::Sender<()>,
693    inbox_handlers: ImapConnectionHandlers,
694    folder_meaning: FolderMeaning,
695) {
696    use futures::future::FutureExt;
697
698    info!(ctx, "starting simple loop for {}", folder_meaning);
699    let ImapConnectionHandlers {
700        mut connection,
701        stop_receiver,
702    } = inbox_handlers;
703
704    let ctx1 = ctx.clone();
705
706    let fut = async move {
707        let ctx = ctx1;
708        if let Err(()) = started.send(()) {
709            warn!(&ctx, "simple imap loop, missing started receiver");
710            return;
711        }
712
713        let mut old_session: Option<Session> = None;
714        loop {
715            let session = if let Some(session) = old_session.take() {
716                session
717            } else {
718                match connection.prepare(&ctx).await {
719                    Err(err) => {
720                        warn!(
721                            ctx,
722                            "Failed to prepare {folder_meaning} connection: {err:#}."
723                        );
724                        continue;
725                    }
726                    Ok(session) => session,
727                }
728            };
729
730            match fetch_idle(&ctx, &mut connection, session, folder_meaning).await {
731                Err(err) => warn!(ctx, "Failed fetch_idle: {err:#}"),
732                Ok(session) => {
733                    old_session = Some(session);
734                }
735            }
736        }
737    };
738
739    stop_receiver
740        .recv()
741        .map(|_| {
742            info!(ctx, "shutting down simple loop");
743        })
744        .race(fut)
745        .await;
746}
747
/// Runs the SMTP send loop until `stop_receiver` fires.
///
/// Sends queued messages, then fake-idles waiting for an interrupt,
/// retrying failed sends with a growing, jittered timeout.
async fn smtp_loop(
    ctx: Context,
    started: oneshot::Sender<()>,
    smtp_handlers: SmtpConnectionHandlers,
) {
    use futures::future::FutureExt;

    info!(ctx, "Starting SMTP loop.");
    let SmtpConnectionHandlers {
        mut connection,
        stop_receiver,
        idle_interrupt_receiver,
    } = smtp_handlers;

    let ctx1 = ctx.clone();
    let fut = async move {
        let ctx = ctx1;
        if let Err(()) = started.send(()) {
            warn!(&ctx, "SMTP loop, missing started receiver.");
            return;
        }

        // Retry timeout in seconds; `None` while there is nothing to retry.
        let mut timeout = None;
        loop {
            if let Err(err) = send_smtp_messages(&ctx, &mut connection).await {
                warn!(ctx, "send_smtp_messages failed: {:#}.", err);
                timeout = Some(timeout.unwrap_or(30));
            } else {
                timeout = None;
                // Respect the rate limiter before attempting another send.
                let duration_until_can_send = ctx.ratelimit.read().await.until_can_send();
                if !duration_until_can_send.is_zero() {
                    info!(
                        ctx,
                        "smtp got rate limited, waiting for {} until can send again",
                        duration_to_str(duration_until_can_send)
                    );
                    tokio::time::sleep(duration_until_can_send).await;
                    continue;
                }
            }

            // Fake Idle
            info!(ctx, "SMTP fake idle started.");
            match &connection.last_send_error {
                None => connection.connectivity.set_idle(&ctx).await,
                Some(err) => connection.connectivity.set_err(&ctx, err).await,
            }

            // If send_smtp_messages() failed, we set a timeout for the fake-idle so that
            // sending is retried (at the latest) after the timeout. If sending fails
            // again, we increase the timeout exponentially, in order not to do lots of
            // unnecessary retries.
            if let Some(t) = timeout {
                let now = tools::Time::now();
                info!(
                    ctx,
                    "SMTP has messages to retry, planning to retry {t} seconds later."
                );
                let duration = std::time::Duration::from_secs(t);
                // Wait for an interrupt, but at most `t` seconds.
                tokio::time::timeout(duration, async {
                    idle_interrupt_receiver.recv().await.unwrap_or_default()
                })
                .await
                .unwrap_or_default();
                // Grow the next timeout based on how long we actually waited,
                // adding random jitter to avoid synchronized retries.
                let slept = time_elapsed(&now).as_secs();
                timeout = Some(cmp::max(
                    t,
                    slept.saturating_add(rand::thread_rng().gen_range((slept / 2)..=slept)),
                ));
            } else {
                info!(ctx, "SMTP has no messages to retry, waiting for interrupt.");
                idle_interrupt_receiver.recv().await.unwrap_or_default();
            };

            info!(ctx, "SMTP fake idle interrupted.")
        }
    };

    // Run until either the loop future or the stop signal wins the race.
    stop_receiver
        .recv()
        .map(|_| {
            info!(ctx, "Shutting down SMTP loop.");
        })
        .race(fut)
        .await;
}
834
835impl Scheduler {
    /// Start the scheduler.
    ///
    /// Spawns the inbox, optional-folder, SMTP, ephemeral and location loops,
    /// then waits until every loop has reported that it started.
    pub async fn start(ctx: &Context) -> Result<Self> {
        let (smtp, smtp_handlers) = SmtpConnectionState::new();

        let (smtp_start_send, smtp_start_recv) = oneshot::channel();
        let (ephemeral_interrupt_send, ephemeral_interrupt_recv) = channel::bounded(1);
        let (location_interrupt_send, location_interrupt_recv) = channel::bounded(1);

        let mut oboxes = Vec::new();
        // One "started" receiver per spawned loop, awaited together below.
        let mut start_recvs = Vec::new();

        // Spawn the inbox loop.
        let (conn_state, inbox_handlers) = ImapConnectionState::new(ctx).await?;
        let (inbox_start_send, inbox_start_recv) = oneshot::channel();
        let handle = {
            let ctx = ctx.clone();
            task::spawn(inbox_loop(ctx, inbox_start_send, inbox_handlers))
        };
        let inbox = SchedBox {
            meaning: FolderMeaning::Inbox,
            conn_state,
            handle,
        };
        start_recvs.push(inbox_start_recv);

        // Spawn loops for the optional boxes (mvbox, sentbox) that are watched.
        for (meaning, should_watch) in [
            (FolderMeaning::Mvbox, ctx.should_watch_mvbox().await),
            (FolderMeaning::Sent, ctx.should_watch_sentbox().await),
        ] {
            if should_watch? {
                let (conn_state, handlers) = ImapConnectionState::new(ctx).await?;
                let (start_send, start_recv) = oneshot::channel();
                let ctx = ctx.clone();
                let handle = task::spawn(simple_imap_loop(ctx, start_send, handlers, meaning));
                oboxes.push(SchedBox {
                    meaning,
                    conn_state,
                    handle,
                });
                start_recvs.push(start_recv);
            }
        }

        // Spawn the SMTP loop.
        let smtp_handle = {
            let ctx = ctx.clone();
            task::spawn(smtp_loop(ctx, smtp_start_send, smtp_handlers))
        };
        start_recvs.push(smtp_start_recv);

        // Spawn the ephemeral-messages loop.
        let ephemeral_handle = {
            let ctx = ctx.clone();
            task::spawn(async move {
                ephemeral::ephemeral_loop(&ctx, ephemeral_interrupt_recv).await;
            })
        };

        // Spawn the location loop.
        let location_handle = {
            let ctx = ctx.clone();
            task::spawn(async move {
                location::location_loop(&ctx, location_interrupt_recv).await;
            })
        };

        let recently_seen_loop = RecentlySeenLoop::new(ctx.clone());

        let res = Self {
            inbox,
            oboxes,
            smtp,
            smtp_handle,
            ephemeral_handle,
            ephemeral_interrupt_send,
            location_handle,
            location_interrupt_send,
            recently_seen_loop,
        };

        // wait for all loops to be started
        if let Err(err) = try_join_all(start_recvs).await {
            bail!("failed to start scheduler: {}", err);
        }

        info!(ctx, "scheduler is running");
        Ok(res)
    }
920
921    fn boxes(&self) -> iter::Chain<iter::Once<&SchedBox>, std::slice::Iter<'_, SchedBox>> {
922        once(&self.inbox).chain(self.oboxes.iter())
923    }
924
925    fn maybe_network(&self) {
926        for b in self.boxes() {
927            b.conn_state.interrupt();
928        }
929        self.interrupt_smtp();
930    }
931
932    fn maybe_network_lost(&self) {
933        for b in self.boxes() {
934            b.conn_state.interrupt();
935        }
936        self.interrupt_smtp();
937    }
938
939    fn interrupt_inbox(&self) {
940        self.inbox.conn_state.interrupt();
941    }
942
943    fn interrupt_oboxes(&self) {
944        for b in &self.oboxes {
945            b.conn_state.interrupt();
946        }
947    }
948
949    fn interrupt_smtp(&self) {
950        self.smtp.interrupt();
951    }
952
953    fn interrupt_ephemeral_task(&self) {
954        self.ephemeral_interrupt_send.try_send(()).ok();
955    }
956
957    fn interrupt_location(&self) {
958        self.location_interrupt_send.try_send(()).ok();
959    }
960
961    fn interrupt_recently_seen(&self, contact_id: ContactId, timestamp: i64) {
962        self.recently_seen_loop.try_interrupt(contact_id, timestamp);
963    }
964
965    /// Halt the scheduler.
966    ///
967    /// It consumes the scheduler and never fails to stop it. In the worst case, long-running tasks
968    /// are forcefully terminated if they cannot shutdown within the timeout.
969    pub(crate) async fn stop(self, context: &Context) {
970        // Send stop signals to tasks so they can shutdown cleanly.
971        for b in self.boxes() {
972            b.conn_state.stop().await.log_err(context).ok();
973        }
974        self.smtp.stop().await.log_err(context).ok();
975
976        // Actually shutdown tasks.
977        let timeout_duration = std::time::Duration::from_secs(30);
978        for b in once(self.inbox).chain(self.oboxes) {
979            tokio::time::timeout(timeout_duration, b.handle)
980                .await
981                .log_err(context)
982                .ok();
983        }
984        tokio::time::timeout(timeout_duration, self.smtp_handle)
985            .await
986            .log_err(context)
987            .ok();
988
989        // Abort tasks, then await them to ensure the `Future` is dropped.
990        // Just aborting the task may keep resources such as `Context` clone
991        // moved into it indefinitely, resulting in database not being
992        // closed etc.
993        self.ephemeral_handle.abort();
994        self.ephemeral_handle.await.ok();
995        self.location_handle.abort();
996        self.location_handle.await.ok();
997        self.recently_seen_loop.abort().await;
998    }
999}
1000
/// Connection state logic shared between imap and smtp connections.
#[derive(Debug)]
struct ConnectionState {
    /// Channel to interrupt the whole connection, i.e. to shut down its run loop.
    stop_sender: Sender<()>,
    /// Channel to interrupt idle; a pending message wakes the loop early.
    idle_interrupt_sender: Sender<()>,
    /// Mutex to pass connectivity info between IMAP/SMTP threads and the API.
    connectivity: ConnectivityStore,
}
1011
1012impl ConnectionState {
1013    /// Shutdown this connection completely.
1014    async fn stop(&self) -> Result<()> {
1015        // Trigger shutdown of the run loop.
1016        self.stop_sender
1017            .send(())
1018            .await
1019            .context("failed to stop, missing receiver")?;
1020        Ok(())
1021    }
1022
1023    fn interrupt(&self) {
1024        // Use try_send to avoid blocking on interrupts.
1025        self.idle_interrupt_sender.try_send(()).ok();
1026    }
1027}
1028
/// Handle to the SMTP connection's shared state, used by the scheduler
/// to interrupt or stop the SMTP loop.
#[derive(Debug)]
pub(crate) struct SmtpConnectionState {
    /// Stop/interrupt channels and connectivity shared with the SMTP loop.
    state: ConnectionState,
}
1033
1034impl SmtpConnectionState {
1035    fn new() -> (Self, SmtpConnectionHandlers) {
1036        let (stop_sender, stop_receiver) = channel::bounded(1);
1037        let (idle_interrupt_sender, idle_interrupt_receiver) = channel::bounded(1);
1038
1039        let handlers = SmtpConnectionHandlers {
1040            connection: Smtp::new(),
1041            stop_receiver,
1042            idle_interrupt_receiver,
1043        };
1044
1045        let state = ConnectionState {
1046            stop_sender,
1047            idle_interrupt_sender,
1048            connectivity: handlers.connection.connectivity.clone(),
1049        };
1050
1051        let conn = SmtpConnectionState { state };
1052
1053        (conn, handlers)
1054    }
1055
1056    /// Interrupt any form of idle.
1057    fn interrupt(&self) {
1058        self.state.interrupt();
1059    }
1060
1061    /// Shutdown this connection completely.
1062    async fn stop(&self) -> Result<()> {
1063        self.state.stop().await?;
1064        Ok(())
1065    }
1066}
1067
/// Receiver ends and connection owned by the SMTP loop task.
struct SmtpConnectionHandlers {
    /// The SMTP connection itself.
    connection: Smtp,
    /// Receives the signal to shut the loop down.
    stop_receiver: Receiver<()>,
    /// Receives idle interrupts to wake the loop early.
    idle_interrupt_receiver: Receiver<()>,
}
1073
/// Handle to an IMAP connection's shared state, used by the scheduler
/// to interrupt or stop the corresponding IMAP loop.
#[derive(Debug)]
pub(crate) struct ImapConnectionState {
    /// Stop/interrupt channels and connectivity shared with the IMAP loop.
    state: ConnectionState,
}
1078
1079impl ImapConnectionState {
1080    /// Construct a new connection.
1081    async fn new(context: &Context) -> Result<(Self, ImapConnectionHandlers)> {
1082        let (stop_sender, stop_receiver) = channel::bounded(1);
1083        let (idle_interrupt_sender, idle_interrupt_receiver) = channel::bounded(1);
1084
1085        let handlers = ImapConnectionHandlers {
1086            connection: Imap::new_configured(context, idle_interrupt_receiver).await?,
1087            stop_receiver,
1088        };
1089
1090        let state = ConnectionState {
1091            stop_sender,
1092            idle_interrupt_sender,
1093            connectivity: handlers.connection.connectivity.clone(),
1094        };
1095
1096        let conn = ImapConnectionState { state };
1097
1098        Ok((conn, handlers))
1099    }
1100
1101    /// Interrupt any form of idle.
1102    fn interrupt(&self) {
1103        self.state.interrupt();
1104    }
1105
1106    /// Shutdown this connection completely.
1107    async fn stop(&self) -> Result<()> {
1108        self.state.stop().await?;
1109        Ok(())
1110    }
1111}
1112
/// Receiver end and connection owned by an IMAP loop task.
#[derive(Debug)]
struct ImapConnectionHandlers {
    /// The IMAP connection itself (holds the idle-interrupt receiver).
    connection: Imap,
    /// Receives the signal to shut the loop down.
    stop_receiver: Receiver<()>,
}