deltachat/
scheduler.rs

1use std::cmp;
2use std::iter::{self, once};
3use std::num::NonZeroUsize;
4use std::sync::atomic::Ordering;
5
6use anyhow::{bail, Context as _, Error, Result};
7use async_channel::{self as channel, Receiver, Sender};
8use futures::future::try_join_all;
9use futures_lite::FutureExt;
10use rand::Rng;
11use tokio::sync::{oneshot, RwLock, RwLockWriteGuard};
12use tokio::task;
13use tokio_util::sync::CancellationToken;
14
15use self::connectivity::ConnectivityStore;
16use crate::config::{self, Config};
17use crate::contact::{ContactId, RecentlySeenLoop};
18use crate::context::Context;
19use crate::download::{download_msg, DownloadState};
20use crate::ephemeral::{self, delete_expired_imap_messages};
21use crate::events::EventType;
22use crate::imap::{session::Session, FolderMeaning, Imap};
23use crate::location;
24use crate::log::{error, info, warn, LogExt};
25use crate::message::MsgId;
26use crate::smtp::{send_smtp_messages, Smtp};
27use crate::sql;
28use crate::tools::{self, duration_to_str, maybe_add_time_based_warnings, time, time_elapsed};
29
30pub(crate) mod connectivity;
31
32/// State of the IO scheduler, as stored on the [`Context`].
33///
34/// The IO scheduler can be stopped or started, but core can also pause it.  After pausing
35/// the IO scheduler will be restarted only if it was running before paused or
36/// [`Context::start_io`] was called in the meantime while it was paused.
37#[derive(Debug, Default)]
38pub(crate) struct SchedulerState {
39    inner: RwLock<InnerSchedulerState>,
40}
41
42impl SchedulerState {
43    pub(crate) fn new() -> Self {
44        Default::default()
45    }
46
47    /// Whether the scheduler is currently running.
48    pub(crate) async fn is_running(&self) -> bool {
49        let inner = self.inner.read().await;
50        matches!(*inner, InnerSchedulerState::Started(_))
51    }
52
53    /// Starts the scheduler if it is not yet started.
54    pub(crate) async fn start(&self, context: Context) {
55        let mut inner = self.inner.write().await;
56        match *inner {
57            InnerSchedulerState::Started(_) => (),
58            InnerSchedulerState::Stopped => Self::do_start(inner, context).await,
59            InnerSchedulerState::Paused {
60                ref mut started, ..
61            } => *started = true,
62        }
63    }
64
65    /// Starts the scheduler if it is not yet started.
66    async fn do_start(mut inner: RwLockWriteGuard<'_, InnerSchedulerState>, context: Context) {
67        info!(context, "starting IO");
68
69        // Notify message processing loop
70        // to allow processing old messages after restart.
71        context.new_msgs_notify.notify_one();
72
73        let ctx = context.clone();
74        match Scheduler::start(&context).await {
75            Ok(scheduler) => {
76                *inner = InnerSchedulerState::Started(scheduler);
77                context.emit_event(EventType::ConnectivityChanged);
78            }
79            Err(err) => error!(&ctx, "Failed to start IO: {:#}", err),
80        }
81    }
82
83    /// Stops the scheduler if it is currently running.
84    pub(crate) async fn stop(&self, context: &Context) {
85        let mut inner = self.inner.write().await;
86        match *inner {
87            InnerSchedulerState::Started(_) => {
88                Self::do_stop(inner, context, InnerSchedulerState::Stopped).await
89            }
90            InnerSchedulerState::Stopped => (),
91            InnerSchedulerState::Paused {
92                ref mut started, ..
93            } => *started = false,
94        }
95    }
96
97    /// Stops the scheduler if it is currently running.
98    async fn do_stop(
99        mut inner: RwLockWriteGuard<'_, InnerSchedulerState>,
100        context: &Context,
101        new_state: InnerSchedulerState,
102    ) {
103        // Sending an event wakes up event pollers (get_next_event)
104        // so the caller of stop_io() can arrange for proper termination.
105        // For this, the caller needs to instruct the event poller
106        // to terminate on receiving the next event and then call stop_io()
107        // which will emit the below event(s)
108        info!(context, "stopping IO");
109
110        // Wake up message processing loop even if there are no messages
111        // to allow for clean shutdown.
112        context.new_msgs_notify.notify_one();
113
114        let debug_logging = context
115            .debug_logging
116            .write()
117            .expect("RwLock is poisoned")
118            .take();
119        if let Some(debug_logging) = debug_logging {
120            debug_logging.loop_handle.abort();
121            debug_logging.loop_handle.await.ok();
122        }
123        let prev_state = std::mem::replace(&mut *inner, new_state);
124        context.emit_event(EventType::ConnectivityChanged);
125        match prev_state {
126            InnerSchedulerState::Started(scheduler) => scheduler.stop(context).await,
127            InnerSchedulerState::Stopped | InnerSchedulerState::Paused { .. } => (),
128        }
129    }
130
131    /// Pauses the IO scheduler.
132    ///
133    /// If it is currently running the scheduler will be stopped.  When the
134    /// [`IoPausedGuard`] is dropped the scheduler is started again.
135    ///
136    /// If in the meantime [`SchedulerState::start`] or [`SchedulerState::stop`] is called
137    /// resume will do the right thing and restore the scheduler to the state requested by
138    /// the last call.
139    pub(crate) async fn pause(&'_ self, context: Context) -> Result<IoPausedGuard> {
140        {
141            let mut inner = self.inner.write().await;
142            match *inner {
143                InnerSchedulerState::Started(_) => {
144                    let new_state = InnerSchedulerState::Paused {
145                        started: true,
146                        pause_guards_count: NonZeroUsize::new(1).unwrap(),
147                    };
148                    Self::do_stop(inner, &context, new_state).await;
149                }
150                InnerSchedulerState::Stopped => {
151                    *inner = InnerSchedulerState::Paused {
152                        started: false,
153                        pause_guards_count: NonZeroUsize::new(1).unwrap(),
154                    };
155                }
156                InnerSchedulerState::Paused {
157                    ref mut pause_guards_count,
158                    ..
159                } => {
160                    *pause_guards_count = pause_guards_count
161                        .checked_add(1)
162                        .ok_or_else(|| Error::msg("Too many pause guards active"))?
163                }
164            }
165        }
166
167        let (tx, rx) = oneshot::channel();
168        tokio::spawn(async move {
169            rx.await.ok();
170            let mut inner = context.scheduler.inner.write().await;
171            match *inner {
172                InnerSchedulerState::Started(_) => {
173                    warn!(&context, "IoPausedGuard resume: started instead of paused");
174                }
175                InnerSchedulerState::Stopped => {
176                    warn!(&context, "IoPausedGuard resume: stopped instead of paused");
177                }
178                InnerSchedulerState::Paused {
179                    ref started,
180                    ref mut pause_guards_count,
181                } => {
182                    if *pause_guards_count == NonZeroUsize::new(1).unwrap() {
183                        match *started {
184                            true => SchedulerState::do_start(inner, context.clone()).await,
185                            false => *inner = InnerSchedulerState::Stopped,
186                        }
187                    } else {
188                        let new_count = pause_guards_count.get() - 1;
189                        // SAFETY: Value was >=2 before due to if condition
190                        *pause_guards_count = NonZeroUsize::new(new_count).unwrap();
191                    }
192                }
193            }
194        });
195        Ok(IoPausedGuard { sender: Some(tx) })
196    }
197
198    /// Restarts the scheduler, only if it is running.
199    pub(crate) async fn restart(&self, context: &Context) {
200        info!(context, "restarting IO");
201        if self.is_running().await {
202            self.stop(context).await;
203            self.start(context.clone()).await;
204        }
205    }
206
207    /// Indicate that the network likely has come back.
208    pub(crate) async fn maybe_network(&self) {
209        let inner = self.inner.read().await;
210        let (inbox, oboxes) = match *inner {
211            InnerSchedulerState::Started(ref scheduler) => {
212                scheduler.maybe_network();
213                let inbox = scheduler.inbox.conn_state.state.connectivity.clone();
214                let oboxes = scheduler
215                    .oboxes
216                    .iter()
217                    .map(|b| b.conn_state.state.connectivity.clone())
218                    .collect::<Vec<_>>();
219                (inbox, oboxes)
220            }
221            _ => return,
222        };
223        drop(inner);
224        connectivity::idle_interrupted(inbox, oboxes).await;
225    }
226
227    /// Indicate that the network likely is lost.
228    pub(crate) async fn maybe_network_lost(&self, context: &Context) {
229        let inner = self.inner.read().await;
230        let stores = match *inner {
231            InnerSchedulerState::Started(ref scheduler) => {
232                scheduler.maybe_network_lost();
233                scheduler
234                    .boxes()
235                    .map(|b| b.conn_state.state.connectivity.clone())
236                    .collect()
237            }
238            _ => return,
239        };
240        drop(inner);
241        connectivity::maybe_network_lost(context, stores).await;
242    }
243
244    pub(crate) async fn interrupt_inbox(&self) {
245        let inner = self.inner.read().await;
246        if let InnerSchedulerState::Started(ref scheduler) = *inner {
247            scheduler.interrupt_inbox();
248        }
249    }
250
251    /// Interrupt optional boxes (mvbox, sentbox) loops.
252    pub(crate) async fn interrupt_oboxes(&self) {
253        let inner = self.inner.read().await;
254        if let InnerSchedulerState::Started(ref scheduler) = *inner {
255            scheduler.interrupt_oboxes();
256        }
257    }
258
259    pub(crate) async fn interrupt_smtp(&self) {
260        let inner = self.inner.read().await;
261        if let InnerSchedulerState::Started(ref scheduler) = *inner {
262            scheduler.interrupt_smtp();
263        }
264    }
265
266    pub(crate) async fn interrupt_ephemeral_task(&self) {
267        let inner = self.inner.read().await;
268        if let InnerSchedulerState::Started(ref scheduler) = *inner {
269            scheduler.interrupt_ephemeral_task();
270        }
271    }
272
273    pub(crate) async fn interrupt_location(&self) {
274        let inner = self.inner.read().await;
275        if let InnerSchedulerState::Started(ref scheduler) = *inner {
276            scheduler.interrupt_location();
277        }
278    }
279
280    pub(crate) async fn interrupt_recently_seen(&self, contact_id: ContactId, timestamp: i64) {
281        let inner = self.inner.read().await;
282        if let InnerSchedulerState::Started(ref scheduler) = *inner {
283            scheduler.interrupt_recently_seen(contact_id, timestamp);
284        }
285    }
286}
287
288#[derive(Debug, Default)]
289enum InnerSchedulerState {
290    Started(Scheduler),
291    #[default]
292    Stopped,
293    Paused {
294        started: bool,
295        pause_guards_count: NonZeroUsize,
296    },
297}
298
299/// Guard to make sure the IO Scheduler is resumed.
300///
301/// Returned by [`SchedulerState::pause`].  To resume the IO scheduler simply drop this
302/// guard.
303#[derive(Default, Debug)]
304pub(crate) struct IoPausedGuard {
305    sender: Option<oneshot::Sender<()>>,
306}
307
308impl Drop for IoPausedGuard {
309    fn drop(&mut self) {
310        if let Some(sender) = self.sender.take() {
311            // Can only fail if receiver is dropped, but then we're already resumed.
312            sender.send(()).ok();
313        }
314    }
315}
316
317#[derive(Debug)]
318struct SchedBox {
319    meaning: FolderMeaning,
320    conn_state: ImapConnectionState,
321
322    /// IMAP loop task handle.
323    handle: task::JoinHandle<()>,
324}
325
326/// Job and connection scheduler.
327#[derive(Debug)]
328pub(crate) struct Scheduler {
329    inbox: SchedBox,
330    /// Optional boxes -- mvbox, sentbox.
331    oboxes: Vec<SchedBox>,
332    smtp: SmtpConnectionState,
333    smtp_handle: task::JoinHandle<()>,
334    ephemeral_handle: task::JoinHandle<()>,
335    ephemeral_interrupt_send: Sender<()>,
336    location_handle: task::JoinHandle<()>,
337    location_interrupt_send: Sender<()>,
338
339    recently_seen_loop: RecentlySeenLoop,
340}
341
342async fn download_msgs(context: &Context, session: &mut Session) -> Result<()> {
343    let msg_ids = context
344        .sql
345        .query_map(
346            "SELECT msg_id FROM download",
347            (),
348            |row| {
349                let msg_id: MsgId = row.get(0)?;
350                Ok(msg_id)
351            },
352            |rowids| {
353                rowids
354                    .collect::<std::result::Result<Vec<_>, _>>()
355                    .map_err(Into::into)
356            },
357        )
358        .await?;
359
360    for msg_id in msg_ids {
361        if let Err(err) = download_msg(context, msg_id, session).await {
362            warn!(context, "Failed to download message {msg_id}: {:#}.", err);
363
364            // Update download state to failure
365            // so it can be retried.
366            //
367            // On success update_download_state() is not needed
368            // as receive_imf() already
369            // set the state and emitted the event.
370            msg_id
371                .update_download_state(context, DownloadState::Failure)
372                .await?;
373        }
374        context
375            .sql
376            .execute("DELETE FROM download WHERE msg_id=?", (msg_id,))
377            .await?;
378    }
379
380    Ok(())
381}
382
383async fn inbox_loop(
384    ctx: Context,
385    started: oneshot::Sender<()>,
386    inbox_handlers: ImapConnectionHandlers,
387) {
388    use futures::future::FutureExt;
389
390    info!(ctx, "Starting inbox loop.");
391    let ImapConnectionHandlers {
392        mut connection,
393        stop_token,
394    } = inbox_handlers;
395
396    let ctx1 = ctx.clone();
397    let fut = async move {
398        let ctx = ctx1;
399        if let Err(()) = started.send(()) {
400            warn!(ctx, "Inbox loop, missing started receiver.");
401            return;
402        };
403
404        let mut old_session: Option<Session> = None;
405        loop {
406            let session = if let Some(session) = old_session.take() {
407                session
408            } else {
409                info!(ctx, "Preparing new IMAP session for inbox.");
410                match connection.prepare(&ctx).await {
411                    Err(err) => {
412                        warn!(ctx, "Failed to prepare inbox connection: {err:#}.");
413                        continue;
414                    }
415                    Ok(session) => session,
416                }
417            };
418
419            match inbox_fetch_idle(&ctx, &mut connection, session).await {
420                Err(err) => warn!(ctx, "Failed inbox fetch_idle: {err:#}."),
421                Ok(session) => {
422                    info!(
423                        ctx,
424                        "IMAP loop iteration for inbox finished, keeping the session."
425                    );
426                    old_session = Some(session);
427                }
428            }
429        }
430    };
431
432    stop_token
433        .cancelled()
434        .map(|_| {
435            info!(ctx, "Shutting down inbox loop.");
436        })
437        .race(fut)
438        .await;
439}
440
441/// Convert folder meaning
442/// used internally by [fetch_idle] and [Context::background_fetch].
443///
444/// Returns folder configuration key and folder name
445/// if such folder is configured, `Ok(None)` otherwise.
446pub async fn convert_folder_meaning(
447    ctx: &Context,
448    folder_meaning: FolderMeaning,
449) -> Result<Option<(Config, String)>> {
450    let folder_config = match folder_meaning.to_config() {
451        Some(c) => c,
452        None => {
453            // Such folder cannot be configured,
454            // e.g. a `FolderMeaning::Spam` folder.
455            return Ok(None);
456        }
457    };
458
459    let folder = ctx
460        .get_config(folder_config)
461        .await
462        .with_context(|| format!("Failed to retrieve {folder_config} folder"))?;
463
464    if let Some(watch_folder) = folder {
465        Ok(Some((folder_config, watch_folder)))
466    } else {
467        Ok(None)
468    }
469}
470
471async fn inbox_fetch_idle(ctx: &Context, imap: &mut Imap, mut session: Session) -> Result<Session> {
472    if !ctx.get_config_bool(Config::FixIsChatmail).await? {
473        ctx.set_config_internal(
474            Config::IsChatmail,
475            crate::config::from_bool(session.is_chatmail()),
476        )
477        .await?;
478    }
479
480    // Update quota no more than once a minute.
481    if ctx.quota_needs_update(60).await {
482        if let Err(err) = ctx.update_recent_quota(&mut session).await {
483            warn!(ctx, "Failed to update quota: {:#}.", err);
484        }
485    }
486
487    let resync_requested = ctx.resync_request.swap(false, Ordering::Relaxed);
488    if resync_requested {
489        if let Err(err) = session.resync_folders(ctx).await {
490            warn!(ctx, "Failed to resync folders: {:#}.", err);
491            ctx.resync_request.store(true, Ordering::Relaxed);
492        }
493    }
494
495    maybe_add_time_based_warnings(ctx).await;
496
497    match ctx.get_config_i64(Config::LastHousekeeping).await {
498        Ok(last_housekeeping_time) => {
499            let next_housekeeping_time = last_housekeeping_time.saturating_add(60 * 60 * 24);
500            if next_housekeeping_time <= time() {
501                sql::housekeeping(ctx).await.log_err(ctx).ok();
502            }
503        }
504        Err(err) => {
505            warn!(ctx, "Failed to get last housekeeping time: {}", err);
506        }
507    };
508
509    match ctx.get_config_bool(Config::FetchedExistingMsgs).await {
510        Ok(fetched_existing_msgs) => {
511            if !fetched_existing_msgs {
512                // Consider it done even if we fail.
513                //
514                // This operation is not critical enough to retry,
515                // especially if the error is persistent.
516                if let Err(err) = ctx
517                    .set_config_internal(Config::FetchedExistingMsgs, config::from_bool(true))
518                    .await
519                {
520                    warn!(ctx, "Can't set Config::FetchedExistingMsgs: {:#}", err);
521                }
522
523                if let Err(err) = imap.fetch_existing_msgs(ctx, &mut session).await {
524                    warn!(ctx, "Failed to fetch existing messages: {:#}", err);
525                }
526            }
527        }
528        Err(err) => {
529            warn!(ctx, "Can't get Config::FetchedExistingMsgs: {:#}", err);
530        }
531    }
532
533    download_msgs(ctx, &mut session)
534        .await
535        .context("Failed to download messages")?;
536    session
537        .fetch_metadata(ctx)
538        .await
539        .context("Failed to fetch metadata")?;
540    session
541        .register_token(ctx)
542        .await
543        .context("Failed to register push token")?;
544
545    let session = fetch_idle(ctx, imap, session, FolderMeaning::Inbox).await?;
546    Ok(session)
547}
548
549/// Implement a single iteration of IMAP loop.
550///
551/// This function performs all IMAP operations on a single folder, selecting it if necessary and
552/// handling all the errors. In case of an error, an error is returned and connection is dropped,
553/// otherwise connection is returned.
554async fn fetch_idle(
555    ctx: &Context,
556    connection: &mut Imap,
557    mut session: Session,
558    folder_meaning: FolderMeaning,
559) -> Result<Session> {
560    let Some((folder_config, watch_folder)) = convert_folder_meaning(ctx, folder_meaning).await?
561    else {
562        // The folder is not configured.
563        // For example, this happens if the server does not have Sent folder
564        // but watching Sent folder is enabled.
565        connection.connectivity.set_not_configured(ctx).await;
566        connection.idle_interrupt_receiver.recv().await.ok();
567        bail!("Cannot fetch folder {folder_meaning} because it is not configured");
568    };
569
570    if folder_config == Config::ConfiguredInboxFolder {
571        let mvbox;
572        let syncbox = match ctx.should_move_sync_msgs().await? {
573            false => &watch_folder,
574            true => {
575                mvbox = ctx.get_config(Config::ConfiguredMvboxFolder).await?;
576                mvbox.as_deref().unwrap_or(&watch_folder)
577            }
578        };
579        session
580            .send_sync_msgs(ctx, syncbox)
581            .await
582            .context("fetch_idle: send_sync_msgs")
583            .log_err(ctx)
584            .ok();
585
586        session
587            .store_seen_flags_on_imap(ctx)
588            .await
589            .context("store_seen_flags_on_imap")?;
590    }
591
592    if !ctx.should_delete_to_trash().await?
593        || ctx
594            .get_config(Config::ConfiguredTrashFolder)
595            .await?
596            .is_some()
597    {
598        // Fetch the watched folder.
599        connection
600            .fetch_move_delete(ctx, &mut session, &watch_folder, folder_meaning)
601            .await
602            .context("fetch_move_delete")?;
603
604        // Mark expired messages for deletion. Marked messages will be deleted from the server
605        // on the next iteration of `fetch_move_delete`. `delete_expired_imap_messages` is not
606        // called right before `fetch_move_delete` because it is not well optimized and would
607        // otherwise slow down message fetching.
608        delete_expired_imap_messages(ctx)
609            .await
610            .context("delete_expired_imap_messages")?;
611    } else if folder_config == Config::ConfiguredInboxFolder {
612        ctx.last_full_folder_scan.lock().await.take();
613    }
614
615    // Scan additional folders only after finishing fetching the watched folder.
616    //
617    // On iOS the application has strictly limited time to work in background, so we may not
618    // be able to scan all folders before time is up if there are many of them.
619    if folder_config == Config::ConfiguredInboxFolder {
620        // Only scan on the Inbox thread in order to prevent parallel scans, which might lead to duplicate messages
621        match connection
622            .scan_folders(ctx, &mut session)
623            .await
624            .context("scan_folders")
625        {
626            Err(err) => {
627                // Don't reconnect, if there is a problem with the connection we will realize this when IDLEing
628                // but maybe just one folder can't be selected or something
629                warn!(ctx, "{:#}", err);
630            }
631            Ok(true) => {
632                // Fetch the watched folder again in case scanning other folder moved messages
633                // there.
634                //
635                // In most cases this will select the watched folder and return because there are
636                // no new messages. We want to select the watched folder anyway before going IDLE
637                // there, so this does not take additional protocol round-trip.
638                connection
639                    .fetch_move_delete(ctx, &mut session, &watch_folder, folder_meaning)
640                    .await
641                    .context("fetch_move_delete after scan_folders")?;
642            }
643            Ok(false) => {}
644        }
645    }
646
647    // Synchronize Seen flags.
648    session
649        .sync_seen_flags(ctx, &watch_folder)
650        .await
651        .context("sync_seen_flags")
652        .log_err(ctx)
653        .ok();
654
655    connection.connectivity.set_idle(ctx).await;
656
657    ctx.emit_event(EventType::ImapInboxIdle);
658
659    if !session.can_idle() {
660        info!(
661            ctx,
662            "IMAP session does not support IDLE, going to fake idle."
663        );
664        connection.fake_idle(ctx, watch_folder).await?;
665        return Ok(session);
666    }
667
668    if ctx
669        .get_config_bool(Config::DisableIdle)
670        .await
671        .context("Failed to get disable_idle config")
672        .log_err(ctx)
673        .unwrap_or_default()
674    {
675        info!(ctx, "IMAP IDLE is disabled, going to fake idle.");
676        connection.fake_idle(ctx, watch_folder).await?;
677        return Ok(session);
678    }
679
680    info!(
681        ctx,
682        "IMAP session in folder {watch_folder:?} supports IDLE, using it."
683    );
684    let session = session
685        .idle(
686            ctx,
687            connection.idle_interrupt_receiver.clone(),
688            &watch_folder,
689        )
690        .await
691        .context("idle")?;
692
693    Ok(session)
694}
695
696async fn simple_imap_loop(
697    ctx: Context,
698    started: oneshot::Sender<()>,
699    inbox_handlers: ImapConnectionHandlers,
700    folder_meaning: FolderMeaning,
701) {
702    use futures::future::FutureExt;
703
704    info!(ctx, "Starting simple loop for {folder_meaning}.");
705    let ImapConnectionHandlers {
706        mut connection,
707        stop_token,
708    } = inbox_handlers;
709
710    let ctx1 = ctx.clone();
711
712    let fut = async move {
713        let ctx = ctx1;
714        if let Err(()) = started.send(()) {
715            warn!(
716                ctx,
717                "Simple imap loop for {folder_meaning}, missing started receiver."
718            );
719            return;
720        }
721
722        let mut old_session: Option<Session> = None;
723        loop {
724            let session = if let Some(session) = old_session.take() {
725                session
726            } else {
727                info!(ctx, "Preparing new IMAP session for {folder_meaning}.");
728                match connection.prepare(&ctx).await {
729                    Err(err) => {
730                        warn!(
731                            ctx,
732                            "Failed to prepare {folder_meaning} connection: {err:#}."
733                        );
734                        continue;
735                    }
736                    Ok(session) => session,
737                }
738            };
739
740            match fetch_idle(&ctx, &mut connection, session, folder_meaning).await {
741                Err(err) => warn!(ctx, "Failed fetch_idle: {err:#}"),
742                Ok(session) => {
743                    info!(
744                        ctx,
745                        "IMAP loop iteration for {folder_meaning} finished, keeping the session"
746                    );
747                    old_session = Some(session);
748                }
749            }
750        }
751    };
752
753    stop_token
754        .cancelled()
755        .map(|_| {
756            info!(ctx, "Shutting down IMAP loop for {folder_meaning}.");
757        })
758        .race(fut)
759        .await;
760}
761
762async fn smtp_loop(
763    ctx: Context,
764    started: oneshot::Sender<()>,
765    smtp_handlers: SmtpConnectionHandlers,
766) {
767    use futures::future::FutureExt;
768
769    info!(ctx, "Starting SMTP loop.");
770    let SmtpConnectionHandlers {
771        mut connection,
772        stop_token,
773        idle_interrupt_receiver,
774    } = smtp_handlers;
775
776    let ctx1 = ctx.clone();
777    let fut = async move {
778        let ctx = ctx1;
779        if let Err(()) = started.send(()) {
780            warn!(&ctx, "SMTP loop, missing started receiver.");
781            return;
782        }
783
784        let mut timeout = None;
785        loop {
786            if let Err(err) = send_smtp_messages(&ctx, &mut connection).await {
787                warn!(ctx, "send_smtp_messages failed: {:#}.", err);
788                timeout = Some(timeout.unwrap_or(30));
789            } else {
790                timeout = None;
791                let duration_until_can_send = ctx.ratelimit.read().await.until_can_send();
792                if !duration_until_can_send.is_zero() {
793                    info!(
794                        ctx,
795                        "smtp got rate limited, waiting for {} until can send again",
796                        duration_to_str(duration_until_can_send)
797                    );
798                    tokio::time::sleep(duration_until_can_send).await;
799                    continue;
800                }
801            }
802
803            // Fake Idle
804            info!(ctx, "SMTP fake idle started.");
805            match &connection.last_send_error {
806                None => connection.connectivity.set_idle(&ctx).await,
807                Some(err) => connection.connectivity.set_err(&ctx, err).await,
808            }
809
810            // If send_smtp_messages() failed, we set a timeout for the fake-idle so that
811            // sending is retried (at the latest) after the timeout. If sending fails
812            // again, we increase the timeout exponentially, in order not to do lots of
813            // unnecessary retries.
814            if let Some(t) = timeout {
815                let now = tools::Time::now();
816                info!(
817                    ctx,
818                    "SMTP has messages to retry, planning to retry {t} seconds later."
819                );
820                let duration = std::time::Duration::from_secs(t);
821                tokio::time::timeout(duration, async {
822                    idle_interrupt_receiver.recv().await.unwrap_or_default()
823                })
824                .await
825                .unwrap_or_default();
826                let slept = time_elapsed(&now).as_secs();
827                timeout = Some(cmp::max(
828                    t,
829                    slept.saturating_add(rand::thread_rng().gen_range((slept / 2)..=slept)),
830                ));
831            } else {
832                info!(ctx, "SMTP has no messages to retry, waiting for interrupt.");
833                idle_interrupt_receiver.recv().await.unwrap_or_default();
834            };
835
836            info!(ctx, "SMTP fake idle interrupted.")
837        }
838    };
839
840    stop_token
841        .cancelled()
842        .map(|_| {
843            info!(ctx, "Shutting down SMTP loop.");
844        })
845        .race(fut)
846        .await;
847}
848
849impl Scheduler {
850    /// Start the scheduler.
851    pub async fn start(ctx: &Context) -> Result<Self> {
852        let (smtp, smtp_handlers) = SmtpConnectionState::new();
853
854        let (smtp_start_send, smtp_start_recv) = oneshot::channel();
855        let (ephemeral_interrupt_send, ephemeral_interrupt_recv) = channel::bounded(1);
856        let (location_interrupt_send, location_interrupt_recv) = channel::bounded(1);
857
858        let mut oboxes = Vec::new();
859        let mut start_recvs = Vec::new();
860
861        let (conn_state, inbox_handlers) = ImapConnectionState::new(ctx).await?;
862        let (inbox_start_send, inbox_start_recv) = oneshot::channel();
863        let handle = {
864            let ctx = ctx.clone();
865            task::spawn(inbox_loop(ctx, inbox_start_send, inbox_handlers))
866        };
867        let inbox = SchedBox {
868            meaning: FolderMeaning::Inbox,
869            conn_state,
870            handle,
871        };
872        start_recvs.push(inbox_start_recv);
873
874        for (meaning, should_watch) in [
875            (FolderMeaning::Mvbox, ctx.should_watch_mvbox().await),
876            (FolderMeaning::Sent, ctx.should_watch_sentbox().await),
877        ] {
878            if should_watch? {
879                let (conn_state, handlers) = ImapConnectionState::new(ctx).await?;
880                let (start_send, start_recv) = oneshot::channel();
881                let ctx = ctx.clone();
882                let handle = task::spawn(simple_imap_loop(ctx, start_send, handlers, meaning));
883                oboxes.push(SchedBox {
884                    meaning,
885                    conn_state,
886                    handle,
887                });
888                start_recvs.push(start_recv);
889            }
890        }
891
892        let smtp_handle = {
893            let ctx = ctx.clone();
894            task::spawn(smtp_loop(ctx, smtp_start_send, smtp_handlers))
895        };
896        start_recvs.push(smtp_start_recv);
897
898        let ephemeral_handle = {
899            let ctx = ctx.clone();
900            task::spawn(async move {
901                ephemeral::ephemeral_loop(&ctx, ephemeral_interrupt_recv).await;
902            })
903        };
904
905        let location_handle = {
906            let ctx = ctx.clone();
907            task::spawn(async move {
908                location::location_loop(&ctx, location_interrupt_recv).await;
909            })
910        };
911
912        let recently_seen_loop = RecentlySeenLoop::new(ctx.clone());
913
914        let res = Self {
915            inbox,
916            oboxes,
917            smtp,
918            smtp_handle,
919            ephemeral_handle,
920            ephemeral_interrupt_send,
921            location_handle,
922            location_interrupt_send,
923            recently_seen_loop,
924        };
925
926        // wait for all loops to be started
927        if let Err(err) = try_join_all(start_recvs).await {
928            bail!("failed to start scheduler: {}", err);
929        }
930
931        info!(ctx, "scheduler is running");
932        Ok(res)
933    }
934
935    fn boxes(&self) -> iter::Chain<iter::Once<&SchedBox>, std::slice::Iter<'_, SchedBox>> {
936        once(&self.inbox).chain(self.oboxes.iter())
937    }
938
939    fn maybe_network(&self) {
940        for b in self.boxes() {
941            b.conn_state.interrupt();
942        }
943        self.interrupt_smtp();
944    }
945
946    fn maybe_network_lost(&self) {
947        for b in self.boxes() {
948            b.conn_state.interrupt();
949        }
950        self.interrupt_smtp();
951    }
952
953    fn interrupt_inbox(&self) {
954        self.inbox.conn_state.interrupt();
955    }
956
957    fn interrupt_oboxes(&self) {
958        for b in &self.oboxes {
959            b.conn_state.interrupt();
960        }
961    }
962
963    fn interrupt_smtp(&self) {
964        self.smtp.interrupt();
965    }
966
967    fn interrupt_ephemeral_task(&self) {
968        self.ephemeral_interrupt_send.try_send(()).ok();
969    }
970
971    fn interrupt_location(&self) {
972        self.location_interrupt_send.try_send(()).ok();
973    }
974
975    fn interrupt_recently_seen(&self, contact_id: ContactId, timestamp: i64) {
976        self.recently_seen_loop.try_interrupt(contact_id, timestamp);
977    }
978
979    /// Halt the scheduler.
980    ///
981    /// It consumes the scheduler and never fails to stop it. In the worst case, long-running tasks
982    /// are forcefully terminated if they cannot shutdown within the timeout.
983    pub(crate) async fn stop(self, context: &Context) {
984        // Send stop signals to tasks so they can shutdown cleanly.
985        for b in self.boxes() {
986            b.conn_state.stop();
987        }
988        self.smtp.stop();
989
990        // Actually shutdown tasks.
991        let timeout_duration = std::time::Duration::from_secs(30);
992        for b in once(self.inbox).chain(self.oboxes) {
993            tokio::time::timeout(timeout_duration, b.handle)
994                .await
995                .log_err(context)
996                .ok();
997        }
998        tokio::time::timeout(timeout_duration, self.smtp_handle)
999            .await
1000            .log_err(context)
1001            .ok();
1002
1003        // Abort tasks, then await them to ensure the `Future` is dropped.
1004        // Just aborting the task may keep resources such as `Context` clone
1005        // moved into it indefinitely, resulting in database not being
1006        // closed etc.
1007        self.ephemeral_handle.abort();
1008        self.ephemeral_handle.await.ok();
1009        self.location_handle.abort();
1010        self.location_handle.await.ok();
1011        self.recently_seen_loop.abort().await;
1012    }
1013}
1014
1015/// Connection state logic shared between imap and smtp connections.
1016#[derive(Debug)]
1017struct ConnectionState {
1018    /// Cancellation token to interrupt the whole connection.
1019    stop_token: CancellationToken,
1020    /// Channel to interrupt idle.
1021    idle_interrupt_sender: Sender<()>,
1022    /// Mutex to pass connectivity info between IMAP/SMTP threads and the API
1023    connectivity: ConnectivityStore,
1024}
1025
1026impl ConnectionState {
1027    /// Shutdown this connection completely.
1028    fn stop(&self) {
1029        // Trigger shutdown of the run loop.
1030        self.stop_token.cancel();
1031    }
1032
1033    fn interrupt(&self) {
1034        // Use try_send to avoid blocking on interrupts.
1035        self.idle_interrupt_sender.try_send(()).ok();
1036    }
1037}
1038
1039#[derive(Debug)]
1040pub(crate) struct SmtpConnectionState {
1041    state: ConnectionState,
1042}
1043
1044impl SmtpConnectionState {
1045    fn new() -> (Self, SmtpConnectionHandlers) {
1046        let stop_token = CancellationToken::new();
1047        let (idle_interrupt_sender, idle_interrupt_receiver) = channel::bounded(1);
1048
1049        let handlers = SmtpConnectionHandlers {
1050            connection: Smtp::new(),
1051            stop_token: stop_token.clone(),
1052            idle_interrupt_receiver,
1053        };
1054
1055        let state = ConnectionState {
1056            stop_token,
1057            idle_interrupt_sender,
1058            connectivity: handlers.connection.connectivity.clone(),
1059        };
1060
1061        let conn = SmtpConnectionState { state };
1062
1063        (conn, handlers)
1064    }
1065
1066    /// Interrupt any form of idle.
1067    fn interrupt(&self) {
1068        self.state.interrupt();
1069    }
1070
1071    /// Shutdown this connection completely.
1072    fn stop(&self) {
1073        self.state.stop();
1074    }
1075}
1076
1077struct SmtpConnectionHandlers {
1078    connection: Smtp,
1079    stop_token: CancellationToken,
1080    idle_interrupt_receiver: Receiver<()>,
1081}
1082
1083#[derive(Debug)]
1084pub(crate) struct ImapConnectionState {
1085    state: ConnectionState,
1086}
1087
1088impl ImapConnectionState {
1089    /// Construct a new connection.
1090    async fn new(context: &Context) -> Result<(Self, ImapConnectionHandlers)> {
1091        let stop_token = CancellationToken::new();
1092        let (idle_interrupt_sender, idle_interrupt_receiver) = channel::bounded(1);
1093
1094        let handlers = ImapConnectionHandlers {
1095            connection: Imap::new_configured(context, idle_interrupt_receiver).await?,
1096            stop_token: stop_token.clone(),
1097        };
1098
1099        let state = ConnectionState {
1100            stop_token,
1101            idle_interrupt_sender,
1102            connectivity: handlers.connection.connectivity.clone(),
1103        };
1104
1105        let conn = ImapConnectionState { state };
1106
1107        Ok((conn, handlers))
1108    }
1109
1110    /// Interrupt any form of idle.
1111    fn interrupt(&self) {
1112        self.state.interrupt();
1113    }
1114
1115    /// Shutdown this connection completely.
1116    fn stop(&self) {
1117        self.state.stop();
1118    }
1119}
1120
1121#[derive(Debug)]
1122struct ImapConnectionHandlers {
1123    connection: Imap,
1124    stop_token: CancellationToken,
1125}