deltachat/
scheduler.rs

1use std::cmp;
2use std::iter::{self, once};
3use std::num::NonZeroUsize;
4use std::sync::atomic::Ordering;
5
6use anyhow::{bail, Context as _, Error, Result};
7use async_channel::{self as channel, Receiver, Sender};
8use futures::future::try_join_all;
9use futures_lite::FutureExt;
10use rand::Rng;
11use tokio::sync::{oneshot, RwLock, RwLockWriteGuard};
12use tokio::task;
13
14use self::connectivity::ConnectivityStore;
15use crate::config::{self, Config};
16use crate::contact::{ContactId, RecentlySeenLoop};
17use crate::context::Context;
18use crate::download::{download_msg, DownloadState};
19use crate::ephemeral::{self, delete_expired_imap_messages};
20use crate::events::EventType;
21use crate::imap::{session::Session, FolderMeaning, Imap};
22use crate::location;
23use crate::log::LogExt;
24use crate::message::MsgId;
25use crate::smtp::{send_smtp_messages, Smtp};
26use crate::sql;
27use crate::tools::{self, duration_to_str, maybe_add_time_based_warnings, time, time_elapsed};
28
29pub(crate) mod connectivity;
30
31/// State of the IO scheduler, as stored on the [`Context`].
32///
33/// The IO scheduler can be stopped or started, but core can also pause it.  After pausing
34/// the IO scheduler will be restarted only if it was running before paused or
35/// [`Context::start_io`] was called in the meantime while it was paused.
36#[derive(Debug, Default)]
37pub(crate) struct SchedulerState {
38    inner: RwLock<InnerSchedulerState>,
39}
40
41impl SchedulerState {
42    pub(crate) fn new() -> Self {
43        Default::default()
44    }
45
46    /// Whether the scheduler is currently running.
47    pub(crate) async fn is_running(&self) -> bool {
48        let inner = self.inner.read().await;
49        matches!(*inner, InnerSchedulerState::Started(_))
50    }
51
52    /// Starts the scheduler if it is not yet started.
53    pub(crate) async fn start(&self, context: Context) {
54        let mut inner = self.inner.write().await;
55        match *inner {
56            InnerSchedulerState::Started(_) => (),
57            InnerSchedulerState::Stopped => Self::do_start(inner, context).await,
58            InnerSchedulerState::Paused {
59                ref mut started, ..
60            } => *started = true,
61        }
62    }
63
64    /// Starts the scheduler if it is not yet started.
65    async fn do_start(mut inner: RwLockWriteGuard<'_, InnerSchedulerState>, context: Context) {
66        info!(context, "starting IO");
67
68        // Notify message processing loop
69        // to allow processing old messages after restart.
70        context.new_msgs_notify.notify_one();
71
72        let ctx = context.clone();
73        match Scheduler::start(&context).await {
74            Ok(scheduler) => {
75                *inner = InnerSchedulerState::Started(scheduler);
76                context.emit_event(EventType::ConnectivityChanged);
77            }
78            Err(err) => error!(&ctx, "Failed to start IO: {:#}", err),
79        }
80    }
81
82    /// Stops the scheduler if it is currently running.
83    pub(crate) async fn stop(&self, context: &Context) {
84        let mut inner = self.inner.write().await;
85        match *inner {
86            InnerSchedulerState::Started(_) => {
87                Self::do_stop(inner, context, InnerSchedulerState::Stopped).await
88            }
89            InnerSchedulerState::Stopped => (),
90            InnerSchedulerState::Paused {
91                ref mut started, ..
92            } => *started = false,
93        }
94    }
95
96    /// Stops the scheduler if it is currently running.
97    async fn do_stop(
98        mut inner: RwLockWriteGuard<'_, InnerSchedulerState>,
99        context: &Context,
100        new_state: InnerSchedulerState,
101    ) {
102        // Sending an event wakes up event pollers (get_next_event)
103        // so the caller of stop_io() can arrange for proper termination.
104        // For this, the caller needs to instruct the event poller
105        // to terminate on receiving the next event and then call stop_io()
106        // which will emit the below event(s)
107        info!(context, "stopping IO");
108
109        // Wake up message processing loop even if there are no messages
110        // to allow for clean shutdown.
111        context.new_msgs_notify.notify_one();
112
113        let debug_logging = context
114            .debug_logging
115            .write()
116            .expect("RwLock is poisoned")
117            .take();
118        if let Some(debug_logging) = debug_logging {
119            debug_logging.loop_handle.abort();
120            debug_logging.loop_handle.await.ok();
121        }
122        let prev_state = std::mem::replace(&mut *inner, new_state);
123        context.emit_event(EventType::ConnectivityChanged);
124        match prev_state {
125            InnerSchedulerState::Started(scheduler) => scheduler.stop(context).await,
126            InnerSchedulerState::Stopped | InnerSchedulerState::Paused { .. } => (),
127        }
128    }
129
130    /// Pauses the IO scheduler.
131    ///
132    /// If it is currently running the scheduler will be stopped.  When the
133    /// [`IoPausedGuard`] is dropped the scheduler is started again.
134    ///
135    /// If in the meantime [`SchedulerState::start`] or [`SchedulerState::stop`] is called
136    /// resume will do the right thing and restore the scheduler to the state requested by
137    /// the last call.
138    pub(crate) async fn pause(&'_ self, context: Context) -> Result<IoPausedGuard> {
139        {
140            let mut inner = self.inner.write().await;
141            match *inner {
142                InnerSchedulerState::Started(_) => {
143                    let new_state = InnerSchedulerState::Paused {
144                        started: true,
145                        pause_guards_count: NonZeroUsize::new(1).unwrap(),
146                    };
147                    Self::do_stop(inner, &context, new_state).await;
148                }
149                InnerSchedulerState::Stopped => {
150                    *inner = InnerSchedulerState::Paused {
151                        started: false,
152                        pause_guards_count: NonZeroUsize::new(1).unwrap(),
153                    };
154                }
155                InnerSchedulerState::Paused {
156                    ref mut pause_guards_count,
157                    ..
158                } => {
159                    *pause_guards_count = pause_guards_count
160                        .checked_add(1)
161                        .ok_or_else(|| Error::msg("Too many pause guards active"))?
162                }
163            }
164        }
165
166        let (tx, rx) = oneshot::channel();
167        tokio::spawn(async move {
168            rx.await.ok();
169            let mut inner = context.scheduler.inner.write().await;
170            match *inner {
171                InnerSchedulerState::Started(_) => {
172                    warn!(&context, "IoPausedGuard resume: started instead of paused");
173                }
174                InnerSchedulerState::Stopped => {
175                    warn!(&context, "IoPausedGuard resume: stopped instead of paused");
176                }
177                InnerSchedulerState::Paused {
178                    ref started,
179                    ref mut pause_guards_count,
180                } => {
181                    if *pause_guards_count == NonZeroUsize::new(1).unwrap() {
182                        match *started {
183                            true => SchedulerState::do_start(inner, context.clone()).await,
184                            false => *inner = InnerSchedulerState::Stopped,
185                        }
186                    } else {
187                        let new_count = pause_guards_count.get() - 1;
188                        // SAFETY: Value was >=2 before due to if condition
189                        *pause_guards_count = NonZeroUsize::new(new_count).unwrap();
190                    }
191                }
192            }
193        });
194        Ok(IoPausedGuard { sender: Some(tx) })
195    }
196
197    /// Restarts the scheduler, only if it is running.
198    pub(crate) async fn restart(&self, context: &Context) {
199        info!(context, "restarting IO");
200        if self.is_running().await {
201            self.stop(context).await;
202            self.start(context.clone()).await;
203        }
204    }
205
206    /// Indicate that the network likely has come back.
207    pub(crate) async fn maybe_network(&self) {
208        let inner = self.inner.read().await;
209        let (inbox, oboxes) = match *inner {
210            InnerSchedulerState::Started(ref scheduler) => {
211                scheduler.maybe_network();
212                let inbox = scheduler.inbox.conn_state.state.connectivity.clone();
213                let oboxes = scheduler
214                    .oboxes
215                    .iter()
216                    .map(|b| b.conn_state.state.connectivity.clone())
217                    .collect::<Vec<_>>();
218                (inbox, oboxes)
219            }
220            _ => return,
221        };
222        drop(inner);
223        connectivity::idle_interrupted(inbox, oboxes).await;
224    }
225
226    /// Indicate that the network likely is lost.
227    pub(crate) async fn maybe_network_lost(&self, context: &Context) {
228        let inner = self.inner.read().await;
229        let stores = match *inner {
230            InnerSchedulerState::Started(ref scheduler) => {
231                scheduler.maybe_network_lost();
232                scheduler
233                    .boxes()
234                    .map(|b| b.conn_state.state.connectivity.clone())
235                    .collect()
236            }
237            _ => return,
238        };
239        drop(inner);
240        connectivity::maybe_network_lost(context, stores).await;
241    }
242
243    pub(crate) async fn interrupt_inbox(&self) {
244        let inner = self.inner.read().await;
245        if let InnerSchedulerState::Started(ref scheduler) = *inner {
246            scheduler.interrupt_inbox();
247        }
248    }
249
250    /// Interrupt optional boxes (mvbox, sentbox) loops.
251    pub(crate) async fn interrupt_oboxes(&self) {
252        let inner = self.inner.read().await;
253        if let InnerSchedulerState::Started(ref scheduler) = *inner {
254            scheduler.interrupt_oboxes();
255        }
256    }
257
258    pub(crate) async fn interrupt_smtp(&self) {
259        let inner = self.inner.read().await;
260        if let InnerSchedulerState::Started(ref scheduler) = *inner {
261            scheduler.interrupt_smtp();
262        }
263    }
264
265    pub(crate) async fn interrupt_ephemeral_task(&self) {
266        let inner = self.inner.read().await;
267        if let InnerSchedulerState::Started(ref scheduler) = *inner {
268            scheduler.interrupt_ephemeral_task();
269        }
270    }
271
272    pub(crate) async fn interrupt_location(&self) {
273        let inner = self.inner.read().await;
274        if let InnerSchedulerState::Started(ref scheduler) = *inner {
275            scheduler.interrupt_location();
276        }
277    }
278
279    pub(crate) async fn interrupt_recently_seen(&self, contact_id: ContactId, timestamp: i64) {
280        let inner = self.inner.read().await;
281        if let InnerSchedulerState::Started(ref scheduler) = *inner {
282            scheduler.interrupt_recently_seen(contact_id, timestamp);
283        }
284    }
285}
286
287#[derive(Debug, Default)]
288enum InnerSchedulerState {
289    Started(Scheduler),
290    #[default]
291    Stopped,
292    Paused {
293        started: bool,
294        pause_guards_count: NonZeroUsize,
295    },
296}
297
298/// Guard to make sure the IO Scheduler is resumed.
299///
300/// Returned by [`SchedulerState::pause`].  To resume the IO scheduler simply drop this
301/// guard.
302#[derive(Default, Debug)]
303pub(crate) struct IoPausedGuard {
304    sender: Option<oneshot::Sender<()>>,
305}
306
307impl Drop for IoPausedGuard {
308    fn drop(&mut self) {
309        if let Some(sender) = self.sender.take() {
310            // Can only fail if receiver is dropped, but then we're already resumed.
311            sender.send(()).ok();
312        }
313    }
314}
315
316#[derive(Debug)]
317struct SchedBox {
318    meaning: FolderMeaning,
319    conn_state: ImapConnectionState,
320
321    /// IMAP loop task handle.
322    handle: task::JoinHandle<()>,
323}
324
325/// Job and connection scheduler.
326#[derive(Debug)]
327pub(crate) struct Scheduler {
328    inbox: SchedBox,
329    /// Optional boxes -- mvbox, sentbox.
330    oboxes: Vec<SchedBox>,
331    smtp: SmtpConnectionState,
332    smtp_handle: task::JoinHandle<()>,
333    ephemeral_handle: task::JoinHandle<()>,
334    ephemeral_interrupt_send: Sender<()>,
335    location_handle: task::JoinHandle<()>,
336    location_interrupt_send: Sender<()>,
337
338    recently_seen_loop: RecentlySeenLoop,
339}
340
341async fn download_msgs(context: &Context, session: &mut Session) -> Result<()> {
342    let msg_ids = context
343        .sql
344        .query_map(
345            "SELECT msg_id FROM download",
346            (),
347            |row| {
348                let msg_id: MsgId = row.get(0)?;
349                Ok(msg_id)
350            },
351            |rowids| {
352                rowids
353                    .collect::<std::result::Result<Vec<_>, _>>()
354                    .map_err(Into::into)
355            },
356        )
357        .await?;
358
359    for msg_id in msg_ids {
360        if let Err(err) = download_msg(context, msg_id, session).await {
361            warn!(context, "Failed to download message {msg_id}: {:#}.", err);
362
363            // Update download state to failure
364            // so it can be retried.
365            //
366            // On success update_download_state() is not needed
367            // as receive_imf() already
368            // set the state and emitted the event.
369            msg_id
370                .update_download_state(context, DownloadState::Failure)
371                .await?;
372        }
373        context
374            .sql
375            .execute("DELETE FROM download WHERE msg_id=?", (msg_id,))
376            .await?;
377    }
378
379    Ok(())
380}
381
382async fn inbox_loop(
383    ctx: Context,
384    started: oneshot::Sender<()>,
385    inbox_handlers: ImapConnectionHandlers,
386) {
387    use futures::future::FutureExt;
388
389    info!(ctx, "Starting inbox loop.");
390    let ImapConnectionHandlers {
391        mut connection,
392        stop_receiver,
393    } = inbox_handlers;
394
395    let ctx1 = ctx.clone();
396    let fut = async move {
397        let ctx = ctx1;
398        if let Err(()) = started.send(()) {
399            warn!(ctx, "Inbox loop, missing started receiver.");
400            return;
401        };
402
403        let mut old_session: Option<Session> = None;
404        loop {
405            let session = if let Some(session) = old_session.take() {
406                session
407            } else {
408                info!(ctx, "Preparing new IMAP session for inbox.");
409                match connection.prepare(&ctx).await {
410                    Err(err) => {
411                        warn!(ctx, "Failed to prepare inbox connection: {err:#}.");
412                        continue;
413                    }
414                    Ok(session) => session,
415                }
416            };
417
418            match inbox_fetch_idle(&ctx, &mut connection, session).await {
419                Err(err) => warn!(ctx, "Failed inbox fetch_idle: {err:#}."),
420                Ok(session) => {
421                    info!(
422                        ctx,
423                        "IMAP loop iteration for inbox finished, keeping the session."
424                    );
425                    old_session = Some(session);
426                }
427            }
428        }
429    };
430
431    stop_receiver
432        .recv()
433        .map(|_| {
434            info!(ctx, "Shutting down inbox loop.");
435        })
436        .race(fut)
437        .await;
438}
439
440/// Convert folder meaning
441/// used internally by [fetch_idle] and [Context::background_fetch].
442///
443/// Returns folder configuration key and folder name
444/// if such folder is configured, `Ok(None)` otherwise.
445pub async fn convert_folder_meaning(
446    ctx: &Context,
447    folder_meaning: FolderMeaning,
448) -> Result<Option<(Config, String)>> {
449    let folder_config = match folder_meaning.to_config() {
450        Some(c) => c,
451        None => {
452            // Such folder cannot be configured,
453            // e.g. a `FolderMeaning::Spam` folder.
454            return Ok(None);
455        }
456    };
457
458    let folder = ctx
459        .get_config(folder_config)
460        .await
461        .with_context(|| format!("Failed to retrieve {folder_config} folder"))?;
462
463    if let Some(watch_folder) = folder {
464        Ok(Some((folder_config, watch_folder)))
465    } else {
466        Ok(None)
467    }
468}
469
470async fn inbox_fetch_idle(ctx: &Context, imap: &mut Imap, mut session: Session) -> Result<Session> {
471    if !ctx.get_config_bool(Config::FixIsChatmail).await? {
472        ctx.set_config_internal(
473            Config::IsChatmail,
474            crate::config::from_bool(session.is_chatmail()),
475        )
476        .await?;
477    }
478
479    // Update quota no more than once a minute.
480    if ctx.quota_needs_update(60).await {
481        if let Err(err) = ctx.update_recent_quota(&mut session).await {
482            warn!(ctx, "Failed to update quota: {:#}.", err);
483        }
484    }
485
486    let resync_requested = ctx.resync_request.swap(false, Ordering::Relaxed);
487    if resync_requested {
488        if let Err(err) = session.resync_folders(ctx).await {
489            warn!(ctx, "Failed to resync folders: {:#}.", err);
490            ctx.resync_request.store(true, Ordering::Relaxed);
491        }
492    }
493
494    maybe_add_time_based_warnings(ctx).await;
495
496    match ctx.get_config_i64(Config::LastHousekeeping).await {
497        Ok(last_housekeeping_time) => {
498            let next_housekeeping_time = last_housekeeping_time.saturating_add(60 * 60 * 24);
499            if next_housekeeping_time <= time() {
500                sql::housekeeping(ctx).await.log_err(ctx).ok();
501            }
502        }
503        Err(err) => {
504            warn!(ctx, "Failed to get last housekeeping time: {}", err);
505        }
506    };
507
508    match ctx.get_config_bool(Config::FetchedExistingMsgs).await {
509        Ok(fetched_existing_msgs) => {
510            if !fetched_existing_msgs {
511                // Consider it done even if we fail.
512                //
513                // This operation is not critical enough to retry,
514                // especially if the error is persistent.
515                if let Err(err) = ctx
516                    .set_config_internal(Config::FetchedExistingMsgs, config::from_bool(true))
517                    .await
518                {
519                    warn!(ctx, "Can't set Config::FetchedExistingMsgs: {:#}", err);
520                }
521
522                if let Err(err) = imap.fetch_existing_msgs(ctx, &mut session).await {
523                    warn!(ctx, "Failed to fetch existing messages: {:#}", err);
524                }
525            }
526        }
527        Err(err) => {
528            warn!(ctx, "Can't get Config::FetchedExistingMsgs: {:#}", err);
529        }
530    }
531
532    download_msgs(ctx, &mut session)
533        .await
534        .context("Failed to download messages")?;
535    session
536        .fetch_metadata(ctx)
537        .await
538        .context("Failed to fetch metadata")?;
539    session
540        .register_token(ctx)
541        .await
542        .context("Failed to register push token")?;
543
544    let session = fetch_idle(ctx, imap, session, FolderMeaning::Inbox).await?;
545    Ok(session)
546}
547
548/// Implement a single iteration of IMAP loop.
549///
550/// This function performs all IMAP operations on a single folder, selecting it if necessary and
551/// handling all the errors. In case of an error, an error is returned and connection is dropped,
552/// otherwise connection is returned.
553async fn fetch_idle(
554    ctx: &Context,
555    connection: &mut Imap,
556    mut session: Session,
557    folder_meaning: FolderMeaning,
558) -> Result<Session> {
559    let Some((folder_config, watch_folder)) = convert_folder_meaning(ctx, folder_meaning).await?
560    else {
561        // The folder is not configured.
562        // For example, this happens if the server does not have Sent folder
563        // but watching Sent folder is enabled.
564        connection.connectivity.set_not_configured(ctx).await;
565        connection.idle_interrupt_receiver.recv().await.ok();
566        bail!("Cannot fetch folder {folder_meaning} because it is not configured");
567    };
568
569    if folder_config == Config::ConfiguredInboxFolder {
570        let mvbox;
571        let syncbox = match ctx.should_move_sync_msgs().await? {
572            false => &watch_folder,
573            true => {
574                mvbox = ctx.get_config(Config::ConfiguredMvboxFolder).await?;
575                mvbox.as_deref().unwrap_or(&watch_folder)
576            }
577        };
578        session
579            .send_sync_msgs(ctx, syncbox)
580            .await
581            .context("fetch_idle: send_sync_msgs")
582            .log_err(ctx)
583            .ok();
584
585        session
586            .store_seen_flags_on_imap(ctx)
587            .await
588            .context("store_seen_flags_on_imap")?;
589    }
590
591    if !ctx.should_delete_to_trash().await?
592        || ctx
593            .get_config(Config::ConfiguredTrashFolder)
594            .await?
595            .is_some()
596    {
597        // Fetch the watched folder.
598        connection
599            .fetch_move_delete(ctx, &mut session, &watch_folder, folder_meaning)
600            .await
601            .context("fetch_move_delete")?;
602
603        // Mark expired messages for deletion. Marked messages will be deleted from the server
604        // on the next iteration of `fetch_move_delete`. `delete_expired_imap_messages` is not
605        // called right before `fetch_move_delete` because it is not well optimized and would
606        // otherwise slow down message fetching.
607        delete_expired_imap_messages(ctx)
608            .await
609            .context("delete_expired_imap_messages")?;
610    } else if folder_config == Config::ConfiguredInboxFolder {
611        ctx.last_full_folder_scan.lock().await.take();
612    }
613
614    // Scan additional folders only after finishing fetching the watched folder.
615    //
616    // On iOS the application has strictly limited time to work in background, so we may not
617    // be able to scan all folders before time is up if there are many of them.
618    if folder_config == Config::ConfiguredInboxFolder {
619        // Only scan on the Inbox thread in order to prevent parallel scans, which might lead to duplicate messages
620        match connection
621            .scan_folders(ctx, &mut session)
622            .await
623            .context("scan_folders")
624        {
625            Err(err) => {
626                // Don't reconnect, if there is a problem with the connection we will realize this when IDLEing
627                // but maybe just one folder can't be selected or something
628                warn!(ctx, "{:#}", err);
629            }
630            Ok(true) => {
631                // Fetch the watched folder again in case scanning other folder moved messages
632                // there.
633                //
634                // In most cases this will select the watched folder and return because there are
635                // no new messages. We want to select the watched folder anyway before going IDLE
636                // there, so this does not take additional protocol round-trip.
637                connection
638                    .fetch_move_delete(ctx, &mut session, &watch_folder, folder_meaning)
639                    .await
640                    .context("fetch_move_delete after scan_folders")?;
641            }
642            Ok(false) => {}
643        }
644    }
645
646    // Synchronize Seen flags.
647    session
648        .sync_seen_flags(ctx, &watch_folder)
649        .await
650        .context("sync_seen_flags")
651        .log_err(ctx)
652        .ok();
653
654    connection.connectivity.set_idle(ctx).await;
655
656    ctx.emit_event(EventType::ImapInboxIdle);
657
658    if !session.can_idle() {
659        info!(
660            ctx,
661            "IMAP session does not support IDLE, going to fake idle."
662        );
663        connection.fake_idle(ctx, watch_folder).await?;
664        return Ok(session);
665    }
666
667    if ctx
668        .get_config_bool(Config::DisableIdle)
669        .await
670        .context("Failed to get disable_idle config")
671        .log_err(ctx)
672        .unwrap_or_default()
673    {
674        info!(ctx, "IMAP IDLE is disabled, going to fake idle.");
675        connection.fake_idle(ctx, watch_folder).await?;
676        return Ok(session);
677    }
678
679    info!(
680        ctx,
681        "IMAP session in folder {watch_folder:?} supports IDLE, using it."
682    );
683    let session = session
684        .idle(
685            ctx,
686            connection.idle_interrupt_receiver.clone(),
687            &watch_folder,
688        )
689        .await
690        .context("idle")?;
691
692    Ok(session)
693}
694
695async fn simple_imap_loop(
696    ctx: Context,
697    started: oneshot::Sender<()>,
698    inbox_handlers: ImapConnectionHandlers,
699    folder_meaning: FolderMeaning,
700) {
701    use futures::future::FutureExt;
702
703    info!(ctx, "Starting simple loop for {folder_meaning}.");
704    let ImapConnectionHandlers {
705        mut connection,
706        stop_receiver,
707    } = inbox_handlers;
708
709    let ctx1 = ctx.clone();
710
711    let fut = async move {
712        let ctx = ctx1;
713        if let Err(()) = started.send(()) {
714            warn!(
715                ctx,
716                "Simple imap loop for {folder_meaning}, missing started receiver."
717            );
718            return;
719        }
720
721        let mut old_session: Option<Session> = None;
722        loop {
723            let session = if let Some(session) = old_session.take() {
724                session
725            } else {
726                info!(ctx, "Preparing new IMAP session for {folder_meaning}.");
727                match connection.prepare(&ctx).await {
728                    Err(err) => {
729                        warn!(
730                            ctx,
731                            "Failed to prepare {folder_meaning} connection: {err:#}."
732                        );
733                        continue;
734                    }
735                    Ok(session) => session,
736                }
737            };
738
739            match fetch_idle(&ctx, &mut connection, session, folder_meaning).await {
740                Err(err) => warn!(ctx, "Failed fetch_idle: {err:#}"),
741                Ok(session) => {
742                    info!(
743                        ctx,
744                        "IMAP loop iteration for {folder_meaning} finished, keeping the session"
745                    );
746                    old_session = Some(session);
747                }
748            }
749        }
750    };
751
752    stop_receiver
753        .recv()
754        .map(|_| {
755            info!(ctx, "Shutting down IMAP loop for {folder_meaning}.");
756        })
757        .race(fut)
758        .await;
759}
760
761async fn smtp_loop(
762    ctx: Context,
763    started: oneshot::Sender<()>,
764    smtp_handlers: SmtpConnectionHandlers,
765) {
766    use futures::future::FutureExt;
767
768    info!(ctx, "Starting SMTP loop.");
769    let SmtpConnectionHandlers {
770        mut connection,
771        stop_receiver,
772        idle_interrupt_receiver,
773    } = smtp_handlers;
774
775    let ctx1 = ctx.clone();
776    let fut = async move {
777        let ctx = ctx1;
778        if let Err(()) = started.send(()) {
779            warn!(&ctx, "SMTP loop, missing started receiver.");
780            return;
781        }
782
783        let mut timeout = None;
784        loop {
785            if let Err(err) = send_smtp_messages(&ctx, &mut connection).await {
786                warn!(ctx, "send_smtp_messages failed: {:#}.", err);
787                timeout = Some(timeout.unwrap_or(30));
788            } else {
789                timeout = None;
790                let duration_until_can_send = ctx.ratelimit.read().await.until_can_send();
791                if !duration_until_can_send.is_zero() {
792                    info!(
793                        ctx,
794                        "smtp got rate limited, waiting for {} until can send again",
795                        duration_to_str(duration_until_can_send)
796                    );
797                    tokio::time::sleep(duration_until_can_send).await;
798                    continue;
799                }
800            }
801
802            // Fake Idle
803            info!(ctx, "SMTP fake idle started.");
804            match &connection.last_send_error {
805                None => connection.connectivity.set_idle(&ctx).await,
806                Some(err) => connection.connectivity.set_err(&ctx, err).await,
807            }
808
809            // If send_smtp_messages() failed, we set a timeout for the fake-idle so that
810            // sending is retried (at the latest) after the timeout. If sending fails
811            // again, we increase the timeout exponentially, in order not to do lots of
812            // unnecessary retries.
813            if let Some(t) = timeout {
814                let now = tools::Time::now();
815                info!(
816                    ctx,
817                    "SMTP has messages to retry, planning to retry {t} seconds later."
818                );
819                let duration = std::time::Duration::from_secs(t);
820                tokio::time::timeout(duration, async {
821                    idle_interrupt_receiver.recv().await.unwrap_or_default()
822                })
823                .await
824                .unwrap_or_default();
825                let slept = time_elapsed(&now).as_secs();
826                timeout = Some(cmp::max(
827                    t,
828                    slept.saturating_add(rand::thread_rng().gen_range((slept / 2)..=slept)),
829                ));
830            } else {
831                info!(ctx, "SMTP has no messages to retry, waiting for interrupt.");
832                idle_interrupt_receiver.recv().await.unwrap_or_default();
833            };
834
835            info!(ctx, "SMTP fake idle interrupted.")
836        }
837    };
838
839    stop_receiver
840        .recv()
841        .map(|_| {
842            info!(ctx, "Shutting down SMTP loop.");
843        })
844        .race(fut)
845        .await;
846}
847
848impl Scheduler {
849    /// Start the scheduler.
850    pub async fn start(ctx: &Context) -> Result<Self> {
851        let (smtp, smtp_handlers) = SmtpConnectionState::new();
852
853        let (smtp_start_send, smtp_start_recv) = oneshot::channel();
854        let (ephemeral_interrupt_send, ephemeral_interrupt_recv) = channel::bounded(1);
855        let (location_interrupt_send, location_interrupt_recv) = channel::bounded(1);
856
857        let mut oboxes = Vec::new();
858        let mut start_recvs = Vec::new();
859
860        let (conn_state, inbox_handlers) = ImapConnectionState::new(ctx).await?;
861        let (inbox_start_send, inbox_start_recv) = oneshot::channel();
862        let handle = {
863            let ctx = ctx.clone();
864            task::spawn(inbox_loop(ctx, inbox_start_send, inbox_handlers))
865        };
866        let inbox = SchedBox {
867            meaning: FolderMeaning::Inbox,
868            conn_state,
869            handle,
870        };
871        start_recvs.push(inbox_start_recv);
872
873        for (meaning, should_watch) in [
874            (FolderMeaning::Mvbox, ctx.should_watch_mvbox().await),
875            (FolderMeaning::Sent, ctx.should_watch_sentbox().await),
876        ] {
877            if should_watch? {
878                let (conn_state, handlers) = ImapConnectionState::new(ctx).await?;
879                let (start_send, start_recv) = oneshot::channel();
880                let ctx = ctx.clone();
881                let handle = task::spawn(simple_imap_loop(ctx, start_send, handlers, meaning));
882                oboxes.push(SchedBox {
883                    meaning,
884                    conn_state,
885                    handle,
886                });
887                start_recvs.push(start_recv);
888            }
889        }
890
891        let smtp_handle = {
892            let ctx = ctx.clone();
893            task::spawn(smtp_loop(ctx, smtp_start_send, smtp_handlers))
894        };
895        start_recvs.push(smtp_start_recv);
896
897        let ephemeral_handle = {
898            let ctx = ctx.clone();
899            task::spawn(async move {
900                ephemeral::ephemeral_loop(&ctx, ephemeral_interrupt_recv).await;
901            })
902        };
903
904        let location_handle = {
905            let ctx = ctx.clone();
906            task::spawn(async move {
907                location::location_loop(&ctx, location_interrupt_recv).await;
908            })
909        };
910
911        let recently_seen_loop = RecentlySeenLoop::new(ctx.clone());
912
913        let res = Self {
914            inbox,
915            oboxes,
916            smtp,
917            smtp_handle,
918            ephemeral_handle,
919            ephemeral_interrupt_send,
920            location_handle,
921            location_interrupt_send,
922            recently_seen_loop,
923        };
924
925        // wait for all loops to be started
926        if let Err(err) = try_join_all(start_recvs).await {
927            bail!("failed to start scheduler: {}", err);
928        }
929
930        info!(ctx, "scheduler is running");
931        Ok(res)
932    }
933
934    fn boxes(&self) -> iter::Chain<iter::Once<&SchedBox>, std::slice::Iter<'_, SchedBox>> {
935        once(&self.inbox).chain(self.oboxes.iter())
936    }
937
938    fn maybe_network(&self) {
939        for b in self.boxes() {
940            b.conn_state.interrupt();
941        }
942        self.interrupt_smtp();
943    }
944
945    fn maybe_network_lost(&self) {
946        for b in self.boxes() {
947            b.conn_state.interrupt();
948        }
949        self.interrupt_smtp();
950    }
951
952    fn interrupt_inbox(&self) {
953        self.inbox.conn_state.interrupt();
954    }
955
956    fn interrupt_oboxes(&self) {
957        for b in &self.oboxes {
958            b.conn_state.interrupt();
959        }
960    }
961
962    fn interrupt_smtp(&self) {
963        self.smtp.interrupt();
964    }
965
966    fn interrupt_ephemeral_task(&self) {
967        self.ephemeral_interrupt_send.try_send(()).ok();
968    }
969
970    fn interrupt_location(&self) {
971        self.location_interrupt_send.try_send(()).ok();
972    }
973
974    fn interrupt_recently_seen(&self, contact_id: ContactId, timestamp: i64) {
975        self.recently_seen_loop.try_interrupt(contact_id, timestamp);
976    }
977
978    /// Halt the scheduler.
979    ///
980    /// It consumes the scheduler and never fails to stop it. In the worst case, long-running tasks
981    /// are forcefully terminated if they cannot shutdown within the timeout.
982    pub(crate) async fn stop(self, context: &Context) {
983        // Send stop signals to tasks so they can shutdown cleanly.
984        for b in self.boxes() {
985            b.conn_state.stop().await.log_err(context).ok();
986        }
987        self.smtp.stop().await.log_err(context).ok();
988
989        // Actually shutdown tasks.
990        let timeout_duration = std::time::Duration::from_secs(30);
991        for b in once(self.inbox).chain(self.oboxes) {
992            tokio::time::timeout(timeout_duration, b.handle)
993                .await
994                .log_err(context)
995                .ok();
996        }
997        tokio::time::timeout(timeout_duration, self.smtp_handle)
998            .await
999            .log_err(context)
1000            .ok();
1001
1002        // Abort tasks, then await them to ensure the `Future` is dropped.
1003        // Just aborting the task may keep resources such as `Context` clone
1004        // moved into it indefinitely, resulting in database not being
1005        // closed etc.
1006        self.ephemeral_handle.abort();
1007        self.ephemeral_handle.await.ok();
1008        self.location_handle.abort();
1009        self.location_handle.await.ok();
1010        self.recently_seen_loop.abort().await;
1011    }
1012}
1013
1014/// Connection state logic shared between imap and smtp connections.
1015#[derive(Debug)]
1016struct ConnectionState {
1017    /// Channel to interrupt the whole connection.
1018    stop_sender: Sender<()>,
1019    /// Channel to interrupt idle.
1020    idle_interrupt_sender: Sender<()>,
1021    /// Mutex to pass connectivity info between IMAP/SMTP threads and the API
1022    connectivity: ConnectivityStore,
1023}
1024
1025impl ConnectionState {
1026    /// Shutdown this connection completely.
1027    async fn stop(&self) -> Result<()> {
1028        // Trigger shutdown of the run loop.
1029        self.stop_sender
1030            .send(())
1031            .await
1032            .context("failed to stop, missing receiver")?;
1033        Ok(())
1034    }
1035
1036    fn interrupt(&self) {
1037        // Use try_send to avoid blocking on interrupts.
1038        self.idle_interrupt_sender.try_send(()).ok();
1039    }
1040}
1041
1042#[derive(Debug)]
1043pub(crate) struct SmtpConnectionState {
1044    state: ConnectionState,
1045}
1046
1047impl SmtpConnectionState {
1048    fn new() -> (Self, SmtpConnectionHandlers) {
1049        let (stop_sender, stop_receiver) = channel::bounded(1);
1050        let (idle_interrupt_sender, idle_interrupt_receiver) = channel::bounded(1);
1051
1052        let handlers = SmtpConnectionHandlers {
1053            connection: Smtp::new(),
1054            stop_receiver,
1055            idle_interrupt_receiver,
1056        };
1057
1058        let state = ConnectionState {
1059            stop_sender,
1060            idle_interrupt_sender,
1061            connectivity: handlers.connection.connectivity.clone(),
1062        };
1063
1064        let conn = SmtpConnectionState { state };
1065
1066        (conn, handlers)
1067    }
1068
1069    /// Interrupt any form of idle.
1070    fn interrupt(&self) {
1071        self.state.interrupt();
1072    }
1073
1074    /// Shutdown this connection completely.
1075    async fn stop(&self) -> Result<()> {
1076        self.state.stop().await?;
1077        Ok(())
1078    }
1079}
1080
1081struct SmtpConnectionHandlers {
1082    connection: Smtp,
1083    stop_receiver: Receiver<()>,
1084    idle_interrupt_receiver: Receiver<()>,
1085}
1086
1087#[derive(Debug)]
1088pub(crate) struct ImapConnectionState {
1089    state: ConnectionState,
1090}
1091
1092impl ImapConnectionState {
1093    /// Construct a new connection.
1094    async fn new(context: &Context) -> Result<(Self, ImapConnectionHandlers)> {
1095        let (stop_sender, stop_receiver) = channel::bounded(1);
1096        let (idle_interrupt_sender, idle_interrupt_receiver) = channel::bounded(1);
1097
1098        let handlers = ImapConnectionHandlers {
1099            connection: Imap::new_configured(context, idle_interrupt_receiver).await?,
1100            stop_receiver,
1101        };
1102
1103        let state = ConnectionState {
1104            stop_sender,
1105            idle_interrupt_sender,
1106            connectivity: handlers.connection.connectivity.clone(),
1107        };
1108
1109        let conn = ImapConnectionState { state };
1110
1111        Ok((conn, handlers))
1112    }
1113
1114    /// Interrupt any form of idle.
1115    fn interrupt(&self) {
1116        self.state.interrupt();
1117    }
1118
1119    /// Shutdown this connection completely.
1120    async fn stop(&self) -> Result<()> {
1121        self.state.stop().await?;
1122        Ok(())
1123    }
1124}
1125
1126#[derive(Debug)]
1127struct ImapConnectionHandlers {
1128    connection: Imap,
1129    stop_receiver: Receiver<()>,
1130}