deltachat/
scheduler.rs

1use std::cmp;
2use std::iter::{self, once};
3use std::num::NonZeroUsize;
4
5use anyhow::{Context as _, Error, Result, bail};
6use async_channel::{self as channel, Receiver, Sender};
7use futures::future::try_join_all;
8use futures_lite::FutureExt;
9use tokio::sync::{RwLock, oneshot};
10use tokio::task;
11use tokio_util::sync::CancellationToken;
12use tokio_util::task::TaskTracker;
13
14pub(crate) use self::connectivity::ConnectivityStore;
15use crate::config::{self, Config};
16use crate::contact::{ContactId, RecentlySeenLoop};
17use crate::context::Context;
18use crate::download::{DownloadState, download_msg};
19use crate::ephemeral::{self, delete_expired_imap_messages};
20use crate::events::EventType;
21use crate::imap::{FolderMeaning, Imap, session::Session};
22use crate::location;
23use crate::log::{LogExt, warn};
24use crate::message::MsgId;
25use crate::smtp::{Smtp, send_smtp_messages};
26use crate::sql;
27use crate::stats::maybe_send_stats;
28use crate::tools::{self, duration_to_str, maybe_add_time_based_warnings, time, time_elapsed};
29use crate::{constants, stats};
30
31pub(crate) mod connectivity;
32
33/// State of the IO scheduler, as stored on the [`Context`].
34///
35/// The IO scheduler can be stopped or started, but core can also pause it.  After pausing
36/// the IO scheduler will be restarted only if it was running before paused or
37/// [`Context::start_io`] was called in the meantime while it was paused.
38#[derive(Debug, Default)]
39pub(crate) struct SchedulerState {
40    inner: RwLock<InnerSchedulerState>,
41}
42
43impl SchedulerState {
44    pub(crate) fn new() -> Self {
45        Default::default()
46    }
47
48    /// Whether the scheduler is currently running.
49    pub(crate) async fn is_running(&self) -> bool {
50        let inner = self.inner.read().await;
51        matches!(*inner, InnerSchedulerState::Started(_))
52    }
53
54    /// Starts the scheduler if it is not yet started.
55    pub(crate) async fn start(&self, context: &Context) {
56        let mut inner = self.inner.write().await;
57        match *inner {
58            InnerSchedulerState::Started(_) => (),
59            InnerSchedulerState::Stopped => Self::do_start(&mut inner, context).await,
60            InnerSchedulerState::Paused {
61                ref mut started, ..
62            } => *started = true,
63        }
64        context.update_connectivities(&inner);
65    }
66
67    /// Starts the scheduler if it is not yet started.
68    async fn do_start(inner: &mut InnerSchedulerState, context: &Context) {
69        info!(context, "starting IO");
70
71        // Notify message processing loop
72        // to allow processing old messages after restart.
73        context.new_msgs_notify.notify_one();
74
75        match Scheduler::start(context).await {
76            Ok(scheduler) => {
77                *inner = InnerSchedulerState::Started(scheduler);
78                context.emit_event(EventType::ConnectivityChanged);
79            }
80            Err(err) => error!(context, "Failed to start IO: {:#}", err),
81        }
82    }
83
84    /// Stops the scheduler if it is currently running.
85    pub(crate) async fn stop(&self, context: &Context) {
86        let mut inner = self.inner.write().await;
87        match *inner {
88            InnerSchedulerState::Started(_) => {
89                Self::do_stop(&mut inner, context, InnerSchedulerState::Stopped).await
90            }
91            InnerSchedulerState::Stopped => (),
92            InnerSchedulerState::Paused {
93                ref mut started, ..
94            } => *started = false,
95        }
96        context.update_connectivities(&inner);
97    }
98
99    /// Stops the scheduler if it is currently running.
100    async fn do_stop(
101        inner: &mut InnerSchedulerState,
102        context: &Context,
103        new_state: InnerSchedulerState,
104    ) {
105        // Sending an event wakes up event pollers (get_next_event)
106        // so the caller of stop_io() can arrange for proper termination.
107        // For this, the caller needs to instruct the event poller
108        // to terminate on receiving the next event and then call stop_io()
109        // which will emit the below event(s)
110        info!(context, "stopping IO");
111
112        // Wake up message processing loop even if there are no messages
113        // to allow for clean shutdown.
114        context.new_msgs_notify.notify_one();
115
116        let debug_logging = context
117            .debug_logging
118            .write()
119            .expect("RwLock is poisoned")
120            .take();
121        if let Some(debug_logging) = debug_logging {
122            debug_logging.loop_handle.abort();
123            debug_logging.loop_handle.await.ok();
124        }
125        let prev_state = std::mem::replace(inner, new_state);
126        context.emit_event(EventType::ConnectivityChanged);
127        match prev_state {
128            InnerSchedulerState::Started(scheduler) => scheduler.stop(context).await,
129            InnerSchedulerState::Stopped | InnerSchedulerState::Paused { .. } => (),
130        }
131    }
132
133    /// Pauses the IO scheduler.
134    ///
135    /// If it is currently running the scheduler will be stopped.  When the
136    /// [`IoPausedGuard`] is dropped the scheduler is started again.
137    ///
138    /// If in the meantime [`SchedulerState::start`] or [`SchedulerState::stop`] is called
139    /// resume will do the right thing and restore the scheduler to the state requested by
140    /// the last call.
141    pub(crate) async fn pause(&'_ self, context: &Context) -> Result<IoPausedGuard> {
142        {
143            let mut inner = self.inner.write().await;
144            match *inner {
145                InnerSchedulerState::Started(_) => {
146                    let new_state = InnerSchedulerState::Paused {
147                        started: true,
148                        pause_guards_count: NonZeroUsize::new(1).unwrap(),
149                    };
150                    Self::do_stop(&mut inner, context, new_state).await;
151                }
152                InnerSchedulerState::Stopped => {
153                    *inner = InnerSchedulerState::Paused {
154                        started: false,
155                        pause_guards_count: NonZeroUsize::new(1).unwrap(),
156                    };
157                }
158                InnerSchedulerState::Paused {
159                    ref mut pause_guards_count,
160                    ..
161                } => {
162                    *pause_guards_count = pause_guards_count
163                        .checked_add(1)
164                        .ok_or_else(|| Error::msg("Too many pause guards active"))?
165                }
166            }
167            context.update_connectivities(&inner);
168        }
169
170        let (tx, rx) = oneshot::channel();
171        let context = context.clone();
172        tokio::spawn(async move {
173            rx.await.ok();
174            let mut inner = context.scheduler.inner.write().await;
175            match *inner {
176                InnerSchedulerState::Started(_) => {
177                    warn!(&context, "IoPausedGuard resume: started instead of paused");
178                }
179                InnerSchedulerState::Stopped => {
180                    warn!(&context, "IoPausedGuard resume: stopped instead of paused");
181                }
182                InnerSchedulerState::Paused {
183                    ref started,
184                    ref mut pause_guards_count,
185                } => {
186                    if *pause_guards_count == NonZeroUsize::new(1).unwrap() {
187                        match *started {
188                            true => SchedulerState::do_start(&mut inner, &context).await,
189                            false => *inner = InnerSchedulerState::Stopped,
190                        }
191                    } else {
192                        let new_count = pause_guards_count.get() - 1;
193                        // SAFETY: Value was >=2 before due to if condition
194                        *pause_guards_count = NonZeroUsize::new(new_count).unwrap();
195                    }
196                }
197            }
198            context.update_connectivities(&inner);
199        });
200        Ok(IoPausedGuard { sender: Some(tx) })
201    }
202
203    /// Restarts the scheduler, only if it is running.
204    pub(crate) async fn restart(&self, context: &Context) {
205        info!(context, "restarting IO");
206        if self.is_running().await {
207            self.stop(context).await;
208            self.start(context).await;
209        }
210    }
211
212    /// Indicate that the network likely has come back.
213    pub(crate) async fn maybe_network(&self) {
214        let inner = self.inner.read().await;
215        let (inbox, oboxes) = match *inner {
216            InnerSchedulerState::Started(ref scheduler) => {
217                scheduler.maybe_network();
218                let inbox = scheduler.inbox.conn_state.state.connectivity.clone();
219                let oboxes = scheduler
220                    .oboxes
221                    .iter()
222                    .map(|b| b.conn_state.state.connectivity.clone())
223                    .collect::<Vec<_>>();
224                (inbox, oboxes)
225            }
226            _ => return,
227        };
228        drop(inner);
229        connectivity::idle_interrupted(inbox, oboxes);
230    }
231
232    /// Indicate that the network likely is lost.
233    pub(crate) async fn maybe_network_lost(&self, context: &Context) {
234        let inner = self.inner.read().await;
235        let stores = match *inner {
236            InnerSchedulerState::Started(ref scheduler) => {
237                scheduler.maybe_network_lost();
238                scheduler
239                    .boxes()
240                    .map(|b| b.conn_state.state.connectivity.clone())
241                    .collect()
242            }
243            _ => return,
244        };
245        drop(inner);
246        connectivity::maybe_network_lost(context, stores);
247    }
248
249    pub(crate) async fn interrupt_inbox(&self) {
250        let inner = self.inner.read().await;
251        if let InnerSchedulerState::Started(ref scheduler) = *inner {
252            scheduler.interrupt_inbox();
253        }
254    }
255
256    /// Interrupt optional boxes (mvbox currently) loops.
257    pub(crate) async fn interrupt_oboxes(&self) {
258        let inner = self.inner.read().await;
259        if let InnerSchedulerState::Started(ref scheduler) = *inner {
260            scheduler.interrupt_oboxes();
261        }
262    }
263
264    pub(crate) async fn interrupt_smtp(&self) {
265        let inner = self.inner.read().await;
266        if let InnerSchedulerState::Started(ref scheduler) = *inner {
267            scheduler.interrupt_smtp();
268        }
269    }
270
271    pub(crate) async fn interrupt_ephemeral_task(&self) {
272        let inner = self.inner.read().await;
273        if let InnerSchedulerState::Started(ref scheduler) = *inner {
274            scheduler.interrupt_ephemeral_task();
275        }
276    }
277
278    pub(crate) async fn interrupt_location(&self) {
279        let inner = self.inner.read().await;
280        if let InnerSchedulerState::Started(ref scheduler) = *inner {
281            scheduler.interrupt_location();
282        }
283    }
284
285    pub(crate) async fn interrupt_recently_seen(&self, contact_id: ContactId, timestamp: i64) {
286        let inner = self.inner.read().await;
287        if let InnerSchedulerState::Started(ref scheduler) = *inner {
288            scheduler.interrupt_recently_seen(contact_id, timestamp);
289        }
290    }
291}
292
293#[derive(Debug, Default)]
294pub(crate) enum InnerSchedulerState {
295    Started(Scheduler),
296    #[default]
297    Stopped,
298    Paused {
299        started: bool,
300        pause_guards_count: NonZeroUsize,
301    },
302}
303
304/// Guard to make sure the IO Scheduler is resumed.
305///
306/// Returned by [`SchedulerState::pause`].  To resume the IO scheduler simply drop this
307/// guard.
308#[derive(Default, Debug)]
309pub(crate) struct IoPausedGuard {
310    sender: Option<oneshot::Sender<()>>,
311}
312
313impl Drop for IoPausedGuard {
314    fn drop(&mut self) {
315        if let Some(sender) = self.sender.take() {
316            // Can only fail if receiver is dropped, but then we're already resumed.
317            sender.send(()).ok();
318        }
319    }
320}
321
322#[derive(Debug)]
323struct SchedBox {
324    meaning: FolderMeaning,
325    conn_state: ImapConnectionState,
326
327    /// IMAP loop task handle.
328    handle: task::JoinHandle<()>,
329}
330
331/// Job and connection scheduler.
332#[derive(Debug)]
333pub(crate) struct Scheduler {
334    inbox: SchedBox,
335    /// Optional boxes -- mvbox.
336    oboxes: Vec<SchedBox>,
337    smtp: SmtpConnectionState,
338    smtp_handle: task::JoinHandle<()>,
339    ephemeral_handle: task::JoinHandle<()>,
340    ephemeral_interrupt_send: Sender<()>,
341    location_handle: task::JoinHandle<()>,
342    location_interrupt_send: Sender<()>,
343
344    recently_seen_loop: RecentlySeenLoop,
345}
346
347async fn download_msgs(context: &Context, session: &mut Session) -> Result<()> {
348    let msg_ids = context
349        .sql
350        .query_map_vec("SELECT msg_id FROM download", (), |row| {
351            let msg_id: MsgId = row.get(0)?;
352            Ok(msg_id)
353        })
354        .await?;
355
356    for msg_id in msg_ids {
357        if let Err(err) = download_msg(context, msg_id, session).await {
358            warn!(context, "Failed to download message {msg_id}: {:#}.", err);
359
360            // Update download state to failure
361            // so it can be retried.
362            //
363            // On success update_download_state() is not needed
364            // as receive_imf() already
365            // set the state and emitted the event.
366            msg_id
367                .update_download_state(context, DownloadState::Failure)
368                .await?;
369        }
370        context
371            .sql
372            .execute("DELETE FROM download WHERE msg_id=?", (msg_id,))
373            .await?;
374    }
375
376    Ok(())
377}
378
379async fn inbox_loop(
380    ctx: Context,
381    started: oneshot::Sender<()>,
382    inbox_handlers: ImapConnectionHandlers,
383) {
384    use futures::future::FutureExt;
385
386    info!(ctx, "Starting inbox loop.");
387    let ImapConnectionHandlers {
388        mut connection,
389        stop_token,
390    } = inbox_handlers;
391
392    let ctx1 = ctx.clone();
393    let fut = async move {
394        let ctx = ctx1;
395        if let Err(()) = started.send(()) {
396            warn!(ctx, "Inbox loop, missing started receiver.");
397            return;
398        };
399
400        let mut old_session: Option<Session> = None;
401        loop {
402            let session = if let Some(session) = old_session.take() {
403                session
404            } else {
405                info!(ctx, "Preparing new IMAP session for inbox.");
406                match connection.prepare(&ctx).await {
407                    Err(err) => {
408                        warn!(ctx, "Failed to prepare inbox connection: {err:#}.");
409                        continue;
410                    }
411                    Ok(session) => session,
412                }
413            };
414
415            match inbox_fetch_idle(&ctx, &mut connection, session).await {
416                Err(err) => warn!(ctx, "Failed inbox fetch_idle: {err:#}."),
417                Ok(session) => {
418                    info!(
419                        ctx,
420                        "IMAP loop iteration for inbox finished, keeping the session."
421                    );
422                    old_session = Some(session);
423                }
424            }
425        }
426    };
427
428    stop_token
429        .cancelled()
430        .map(|_| {
431            info!(ctx, "Shutting down inbox loop.");
432        })
433        .race(fut)
434        .await;
435}
436
437/// Convert folder meaning
438/// used internally by [fetch_idle] and [Context::background_fetch].
439///
440/// Returns folder configuration key and folder name
441/// if such folder is configured, `Ok(None)` otherwise.
442pub async fn convert_folder_meaning(
443    ctx: &Context,
444    folder_meaning: FolderMeaning,
445) -> Result<Option<(Config, String)>> {
446    let folder_config = match folder_meaning.to_config() {
447        Some(c) => c,
448        None => {
449            // Such folder cannot be configured,
450            // e.g. a `FolderMeaning::Spam` folder.
451            return Ok(None);
452        }
453    };
454
455    let folder = ctx
456        .get_config(folder_config)
457        .await
458        .with_context(|| format!("Failed to retrieve {folder_config} folder"))?;
459
460    if let Some(watch_folder) = folder {
461        Ok(Some((folder_config, watch_folder)))
462    } else {
463        Ok(None)
464    }
465}
466
467async fn inbox_fetch_idle(ctx: &Context, imap: &mut Imap, mut session: Session) -> Result<Session> {
468    if !ctx.get_config_bool(Config::FixIsChatmail).await? {
469        ctx.set_config_internal(
470            Config::IsChatmail,
471            crate::config::from_bool(session.is_chatmail()),
472        )
473        .await?;
474    }
475
476    // Update quota no more than once a minute.
477    if ctx.quota_needs_update(60).await {
478        if let Err(err) = ctx.update_recent_quota(&mut session).await {
479            warn!(ctx, "Failed to update quota: {:#}.", err);
480        }
481    }
482
483    if let Ok(()) = imap.resync_request_receiver.try_recv() {
484        if let Err(err) = session.resync_folders(ctx).await {
485            warn!(ctx, "Failed to resync folders: {:#}.", err);
486            imap.resync_request_sender.try_send(()).ok();
487        }
488    }
489
490    maybe_add_time_based_warnings(ctx).await;
491
492    match ctx.get_config_i64(Config::LastHousekeeping).await {
493        Ok(last_housekeeping_time) => {
494            let next_housekeeping_time =
495                last_housekeeping_time.saturating_add(constants::HOUSEKEEPING_PERIOD);
496            if next_housekeeping_time <= time() {
497                sql::housekeeping(ctx).await.log_err(ctx).ok();
498            }
499        }
500        Err(err) => {
501            warn!(ctx, "Failed to get last housekeeping time: {}", err);
502        }
503    };
504
505    maybe_send_stats(ctx).await.log_err(ctx).ok();
506    match ctx.get_config_bool(Config::FetchedExistingMsgs).await {
507        Ok(fetched_existing_msgs) => {
508            if !fetched_existing_msgs {
509                // Consider it done even if we fail.
510                //
511                // This operation is not critical enough to retry,
512                // especially if the error is persistent.
513                if let Err(err) = ctx
514                    .set_config_internal(Config::FetchedExistingMsgs, config::from_bool(true))
515                    .await
516                {
517                    warn!(ctx, "Can't set Config::FetchedExistingMsgs: {:#}", err);
518                }
519
520                if let Err(err) = imap.fetch_existing_msgs(ctx, &mut session).await {
521                    warn!(ctx, "Failed to fetch existing messages: {:#}", err);
522                }
523            }
524        }
525        Err(err) => {
526            warn!(ctx, "Can't get Config::FetchedExistingMsgs: {:#}", err);
527        }
528    }
529
530    download_msgs(ctx, &mut session)
531        .await
532        .context("Failed to download messages")?;
533    session
534        .fetch_metadata(ctx)
535        .await
536        .context("Failed to fetch metadata")?;
537    session
538        .register_token(ctx)
539        .await
540        .context("Failed to register push token")?;
541
542    let session = fetch_idle(ctx, imap, session, FolderMeaning::Inbox).await?;
543    Ok(session)
544}
545
546/// Implement a single iteration of IMAP loop.
547///
548/// This function performs all IMAP operations on a single folder, selecting it if necessary and
549/// handling all the errors. In case of an error, an error is returned and connection is dropped,
550/// otherwise connection is returned.
551async fn fetch_idle(
552    ctx: &Context,
553    connection: &mut Imap,
554    mut session: Session,
555    folder_meaning: FolderMeaning,
556) -> Result<Session> {
557    let Some((folder_config, watch_folder)) = convert_folder_meaning(ctx, folder_meaning).await?
558    else {
559        // The folder is not configured.
560        // For example, this happens if the server does not have Sent folder
561        // but watching Sent folder is enabled.
562        connection.connectivity.set_not_configured(ctx);
563        connection.idle_interrupt_receiver.recv().await.ok();
564        bail!("Cannot fetch folder {folder_meaning} because it is not configured");
565    };
566
567    if folder_config == Config::ConfiguredInboxFolder {
568        let mvbox;
569        let syncbox = match ctx.should_move_sync_msgs().await? {
570            false => &watch_folder,
571            true => {
572                mvbox = ctx.get_config(Config::ConfiguredMvboxFolder).await?;
573                mvbox.as_deref().unwrap_or(&watch_folder)
574            }
575        };
576        session
577            .send_sync_msgs(ctx, syncbox)
578            .await
579            .context("fetch_idle: send_sync_msgs")
580            .log_err(ctx)
581            .ok();
582
583        session
584            .store_seen_flags_on_imap(ctx)
585            .await
586            .context("store_seen_flags_on_imap")?;
587    }
588
589    if !ctx.should_delete_to_trash().await?
590        || ctx
591            .get_config(Config::ConfiguredTrashFolder)
592            .await?
593            .is_some()
594    {
595        // Fetch the watched folder.
596        connection
597            .fetch_move_delete(ctx, &mut session, &watch_folder, folder_meaning)
598            .await
599            .context("fetch_move_delete")?;
600
601        // Mark expired messages for deletion. Marked messages will be deleted from the server
602        // on the next iteration of `fetch_move_delete`. `delete_expired_imap_messages` is not
603        // called right before `fetch_move_delete` because it is not well optimized and would
604        // otherwise slow down message fetching.
605        delete_expired_imap_messages(ctx)
606            .await
607            .context("delete_expired_imap_messages")?;
608    } else if folder_config == Config::ConfiguredInboxFolder {
609        session.last_full_folder_scan.lock().await.take();
610    }
611
612    // Scan additional folders only after finishing fetching the watched folder.
613    //
614    // On iOS the application has strictly limited time to work in background, so we may not
615    // be able to scan all folders before time is up if there are many of them.
616    if folder_config == Config::ConfiguredInboxFolder {
617        // Only scan on the Inbox thread in order to prevent parallel scans, which might lead to duplicate messages
618        match connection
619            .scan_folders(ctx, &mut session)
620            .await
621            .context("scan_folders")
622        {
623            Err(err) => {
624                // Don't reconnect, if there is a problem with the connection we will realize this when IDLEing
625                // but maybe just one folder can't be selected or something
626                warn!(ctx, "{:#}", err);
627            }
628            Ok(true) => {
629                // Fetch the watched folder again in case scanning other folder moved messages
630                // there.
631                //
632                // In most cases this will select the watched folder and return because there are
633                // no new messages. We want to select the watched folder anyway before going IDLE
634                // there, so this does not take additional protocol round-trip.
635                connection
636                    .fetch_move_delete(ctx, &mut session, &watch_folder, folder_meaning)
637                    .await
638                    .context("fetch_move_delete after scan_folders")?;
639            }
640            Ok(false) => {}
641        }
642    }
643
644    // Synchronize Seen flags.
645    session
646        .sync_seen_flags(ctx, &watch_folder)
647        .await
648        .context("sync_seen_flags")
649        .log_err(ctx)
650        .ok();
651
652    connection.connectivity.set_idle(ctx);
653
654    ctx.emit_event(EventType::ImapInboxIdle);
655
656    if !session.can_idle() {
657        info!(
658            ctx,
659            "IMAP session does not support IDLE, going to fake idle."
660        );
661        connection.fake_idle(ctx, watch_folder).await?;
662        return Ok(session);
663    }
664
665    if ctx
666        .get_config_bool(Config::DisableIdle)
667        .await
668        .context("Failed to get disable_idle config")
669        .log_err(ctx)
670        .unwrap_or_default()
671    {
672        info!(ctx, "IMAP IDLE is disabled, going to fake idle.");
673        connection.fake_idle(ctx, watch_folder).await?;
674        return Ok(session);
675    }
676
677    info!(
678        ctx,
679        "IMAP session in folder {watch_folder:?} supports IDLE, using it."
680    );
681    let session = session
682        .idle(
683            ctx,
684            connection.idle_interrupt_receiver.clone(),
685            &watch_folder,
686        )
687        .await
688        .context("idle")?;
689
690    Ok(session)
691}
692
693async fn simple_imap_loop(
694    ctx: Context,
695    started: oneshot::Sender<()>,
696    inbox_handlers: ImapConnectionHandlers,
697    folder_meaning: FolderMeaning,
698) {
699    use futures::future::FutureExt;
700
701    info!(ctx, "Starting simple loop for {folder_meaning}.");
702    let ImapConnectionHandlers {
703        mut connection,
704        stop_token,
705    } = inbox_handlers;
706
707    let ctx1 = ctx.clone();
708
709    let fut = async move {
710        let ctx = ctx1;
711        if let Err(()) = started.send(()) {
712            warn!(
713                ctx,
714                "Simple imap loop for {folder_meaning}, missing started receiver."
715            );
716            return;
717        }
718
719        let mut old_session: Option<Session> = None;
720        loop {
721            let session = if let Some(session) = old_session.take() {
722                session
723            } else {
724                info!(ctx, "Preparing new IMAP session for {folder_meaning}.");
725                match connection.prepare(&ctx).await {
726                    Err(err) => {
727                        warn!(
728                            ctx,
729                            "Failed to prepare {folder_meaning} connection: {err:#}."
730                        );
731                        continue;
732                    }
733                    Ok(session) => session,
734                }
735            };
736
737            match fetch_idle(&ctx, &mut connection, session, folder_meaning).await {
738                Err(err) => warn!(ctx, "Failed fetch_idle: {err:#}"),
739                Ok(session) => {
740                    info!(
741                        ctx,
742                        "IMAP loop iteration for {folder_meaning} finished, keeping the session"
743                    );
744                    old_session = Some(session);
745                }
746            }
747        }
748    };
749
750    stop_token
751        .cancelled()
752        .map(|_| {
753            info!(ctx, "Shutting down IMAP loop for {folder_meaning}.");
754        })
755        .race(fut)
756        .await;
757}
758
759async fn smtp_loop(
760    ctx: Context,
761    started: oneshot::Sender<()>,
762    smtp_handlers: SmtpConnectionHandlers,
763) {
764    use futures::future::FutureExt;
765
766    info!(ctx, "Starting SMTP loop.");
767    let SmtpConnectionHandlers {
768        mut connection,
769        stop_token,
770        idle_interrupt_receiver,
771    } = smtp_handlers;
772
773    let ctx1 = ctx.clone();
774    let fut = async move {
775        let ctx = ctx1;
776        if let Err(()) = started.send(()) {
777            warn!(&ctx, "SMTP loop, missing started receiver.");
778            return;
779        }
780
781        let mut timeout = None;
782        loop {
783            if let Err(err) = send_smtp_messages(&ctx, &mut connection).await {
784                warn!(ctx, "send_smtp_messages failed: {:#}.", err);
785                timeout = Some(timeout.unwrap_or(30));
786            } else {
787                timeout = None;
788                let duration_until_can_send = ctx.ratelimit.read().await.until_can_send();
789                if !duration_until_can_send.is_zero() {
790                    info!(
791                        ctx,
792                        "smtp got rate limited, waiting for {} until can send again",
793                        duration_to_str(duration_until_can_send)
794                    );
795                    tokio::time::sleep(duration_until_can_send).await;
796                    continue;
797                }
798            }
799
800            stats::maybe_update_message_stats(&ctx)
801                .await
802                .log_err(&ctx)
803                .ok();
804
805            // Fake Idle
806            info!(ctx, "SMTP fake idle started.");
807            match &connection.last_send_error {
808                None => connection.connectivity.set_idle(&ctx),
809                Some(err) => connection.connectivity.set_err(&ctx, err),
810            }
811
812            // If send_smtp_messages() failed, we set a timeout for the fake-idle so that
813            // sending is retried (at the latest) after the timeout. If sending fails
814            // again, we increase the timeout exponentially, in order not to do lots of
815            // unnecessary retries.
816            if let Some(t) = timeout {
817                let now = tools::Time::now();
818                info!(
819                    ctx,
820                    "SMTP has messages to retry, planning to retry {t} seconds later."
821                );
822                let duration = std::time::Duration::from_secs(t);
823                tokio::time::timeout(duration, async {
824                    idle_interrupt_receiver.recv().await.unwrap_or_default()
825                })
826                .await
827                .unwrap_or_default();
828                let slept = time_elapsed(&now).as_secs();
829                timeout = Some(cmp::max(
830                    t,
831                    slept.saturating_add(rand::random_range((slept / 2)..=slept)),
832                ));
833            } else {
834                info!(ctx, "SMTP has no messages to retry, waiting for interrupt.");
835                idle_interrupt_receiver.recv().await.unwrap_or_default();
836            };
837
838            info!(ctx, "SMTP fake idle interrupted.")
839        }
840    };
841
842    stop_token
843        .cancelled()
844        .map(|_| {
845            info!(ctx, "Shutting down SMTP loop.");
846        })
847        .race(fut)
848        .await;
849}
850
851impl Scheduler {
852    /// Start the scheduler.
853    pub async fn start(ctx: &Context) -> Result<Self> {
854        let (smtp, smtp_handlers) = SmtpConnectionState::new();
855
856        let (smtp_start_send, smtp_start_recv) = oneshot::channel();
857        let (ephemeral_interrupt_send, ephemeral_interrupt_recv) = channel::bounded(1);
858        let (location_interrupt_send, location_interrupt_recv) = channel::bounded(1);
859
860        let mut oboxes = Vec::new();
861        let mut start_recvs = Vec::new();
862
863        let (conn_state, inbox_handlers) = ImapConnectionState::new(ctx).await?;
864        let (inbox_start_send, inbox_start_recv) = oneshot::channel();
865        let handle = {
866            let ctx = ctx.clone();
867            task::spawn(inbox_loop(ctx, inbox_start_send, inbox_handlers))
868        };
869        let inbox = SchedBox {
870            meaning: FolderMeaning::Inbox,
871            conn_state,
872            handle,
873        };
874        start_recvs.push(inbox_start_recv);
875
876        if ctx.should_watch_mvbox().await? {
877            let (conn_state, handlers) = ImapConnectionState::new(ctx).await?;
878            let (start_send, start_recv) = oneshot::channel();
879            let ctx = ctx.clone();
880            let meaning = FolderMeaning::Mvbox;
881            let handle = task::spawn(simple_imap_loop(ctx, start_send, handlers, meaning));
882            oboxes.push(SchedBox {
883                meaning,
884                conn_state,
885                handle,
886            });
887            start_recvs.push(start_recv);
888        }
889
890        let smtp_handle = {
891            let ctx = ctx.clone();
892            task::spawn(smtp_loop(ctx, smtp_start_send, smtp_handlers))
893        };
894        start_recvs.push(smtp_start_recv);
895
896        let ephemeral_handle = {
897            let ctx = ctx.clone();
898            task::spawn(async move {
899                ephemeral::ephemeral_loop(&ctx, ephemeral_interrupt_recv).await;
900            })
901        };
902
903        let location_handle = {
904            let ctx = ctx.clone();
905            task::spawn(async move {
906                location::location_loop(&ctx, location_interrupt_recv).await;
907            })
908        };
909
910        let recently_seen_loop = RecentlySeenLoop::new(ctx.clone());
911
912        let res = Self {
913            inbox,
914            oboxes,
915            smtp,
916            smtp_handle,
917            ephemeral_handle,
918            ephemeral_interrupt_send,
919            location_handle,
920            location_interrupt_send,
921            recently_seen_loop,
922        };
923
924        // wait for all loops to be started
925        if let Err(err) = try_join_all(start_recvs).await {
926            bail!("failed to start scheduler: {err}");
927        }
928
929        info!(ctx, "scheduler is running");
930        Ok(res)
931    }
932
933    fn boxes(&self) -> iter::Chain<iter::Once<&SchedBox>, std::slice::Iter<'_, SchedBox>> {
934        once(&self.inbox).chain(self.oboxes.iter())
935    }
936
937    fn maybe_network(&self) {
938        for b in self.boxes() {
939            b.conn_state.interrupt();
940        }
941        self.interrupt_smtp();
942    }
943
944    fn maybe_network_lost(&self) {
945        for b in self.boxes() {
946            b.conn_state.interrupt();
947        }
948        self.interrupt_smtp();
949    }
950
951    fn interrupt_inbox(&self) {
952        self.inbox.conn_state.interrupt();
953    }
954
955    fn interrupt_oboxes(&self) {
956        for b in &self.oboxes {
957            b.conn_state.interrupt();
958        }
959    }
960
961    fn interrupt_smtp(&self) {
962        self.smtp.interrupt();
963    }
964
965    fn interrupt_ephemeral_task(&self) {
966        self.ephemeral_interrupt_send.try_send(()).ok();
967    }
968
969    fn interrupt_location(&self) {
970        self.location_interrupt_send.try_send(()).ok();
971    }
972
973    fn interrupt_recently_seen(&self, contact_id: ContactId, timestamp: i64) {
974        self.recently_seen_loop.try_interrupt(contact_id, timestamp);
975    }
976
977    /// Halt the scheduler.
978    ///
979    /// It consumes the scheduler and never fails to stop it. In the worst case, long-running tasks
980    /// are forcefully terminated if they cannot shutdown within the timeout.
981    pub(crate) async fn stop(self, context: &Context) {
982        // Send stop signals to tasks so they can shutdown cleanly.
983        for b in self.boxes() {
984            b.conn_state.stop();
985        }
986        self.smtp.stop();
987
988        // Actually shutdown tasks.
989        let timeout_duration = std::time::Duration::from_secs(30);
990
991        let tracker = TaskTracker::new();
992        for b in once(self.inbox).chain(self.oboxes) {
993            let context = context.clone();
994            tracker.spawn(async move {
995                tokio::time::timeout(timeout_duration, b.handle)
996                    .await
997                    .log_err(&context)
998            });
999        }
1000        {
1001            let context = context.clone();
1002            tracker.spawn(async move {
1003                tokio::time::timeout(timeout_duration, self.smtp_handle)
1004                    .await
1005                    .log_err(&context)
1006            });
1007        }
1008        tracker.close();
1009        tracker.wait().await;
1010
1011        // Abort tasks, then await them to ensure the `Future` is dropped.
1012        // Just aborting the task may keep resources such as `Context` clone
1013        // moved into it indefinitely, resulting in database not being
1014        // closed etc.
1015        self.ephemeral_handle.abort();
1016        self.ephemeral_handle.await.ok();
1017        self.location_handle.abort();
1018        self.location_handle.await.ok();
1019        self.recently_seen_loop.abort().await;
1020    }
1021}
1022
1023/// Connection state logic shared between imap and smtp connections.
1024#[derive(Debug)]
1025struct ConnectionState {
1026    /// Cancellation token to interrupt the whole connection.
1027    stop_token: CancellationToken,
1028    /// Channel to interrupt idle.
1029    idle_interrupt_sender: Sender<()>,
1030    /// Mutex to pass connectivity info between IMAP/SMTP threads and the API
1031    connectivity: ConnectivityStore,
1032}
1033
1034impl ConnectionState {
1035    /// Shutdown this connection completely.
1036    fn stop(&self) {
1037        // Trigger shutdown of the run loop.
1038        self.stop_token.cancel();
1039    }
1040
1041    fn interrupt(&self) {
1042        // Use try_send to avoid blocking on interrupts.
1043        self.idle_interrupt_sender.try_send(()).ok();
1044    }
1045}
1046
1047#[derive(Debug)]
1048pub(crate) struct SmtpConnectionState {
1049    state: ConnectionState,
1050}
1051
1052impl SmtpConnectionState {
1053    fn new() -> (Self, SmtpConnectionHandlers) {
1054        let stop_token = CancellationToken::new();
1055        let (idle_interrupt_sender, idle_interrupt_receiver) = channel::bounded(1);
1056
1057        let handlers = SmtpConnectionHandlers {
1058            connection: Smtp::new(),
1059            stop_token: stop_token.clone(),
1060            idle_interrupt_receiver,
1061        };
1062
1063        let state = ConnectionState {
1064            stop_token,
1065            idle_interrupt_sender,
1066            connectivity: handlers.connection.connectivity.clone(),
1067        };
1068
1069        let conn = SmtpConnectionState { state };
1070
1071        (conn, handlers)
1072    }
1073
1074    /// Interrupt any form of idle.
1075    fn interrupt(&self) {
1076        self.state.interrupt();
1077    }
1078
1079    /// Shutdown this connection completely.
1080    fn stop(&self) {
1081        self.state.stop();
1082    }
1083}
1084
1085struct SmtpConnectionHandlers {
1086    connection: Smtp,
1087    stop_token: CancellationToken,
1088    idle_interrupt_receiver: Receiver<()>,
1089}
1090
1091#[derive(Debug)]
1092pub(crate) struct ImapConnectionState {
1093    state: ConnectionState,
1094}
1095
1096impl ImapConnectionState {
1097    /// Construct a new connection.
1098    async fn new(context: &Context) -> Result<(Self, ImapConnectionHandlers)> {
1099        let stop_token = CancellationToken::new();
1100        let (idle_interrupt_sender, idle_interrupt_receiver) = channel::bounded(1);
1101
1102        let handlers = ImapConnectionHandlers {
1103            connection: Imap::new_configured(context, idle_interrupt_receiver).await?,
1104            stop_token: stop_token.clone(),
1105        };
1106
1107        let state = ConnectionState {
1108            stop_token,
1109            idle_interrupt_sender,
1110            connectivity: handlers.connection.connectivity.clone(),
1111        };
1112
1113        let conn = ImapConnectionState { state };
1114
1115        Ok((conn, handlers))
1116    }
1117
1118    /// Interrupt any form of idle.
1119    fn interrupt(&self) {
1120        self.state.interrupt();
1121    }
1122
1123    /// Shutdown this connection completely.
1124    fn stop(&self) {
1125        self.state.stop();
1126    }
1127}
1128
1129#[derive(Debug)]
1130struct ImapConnectionHandlers {
1131    connection: Imap,
1132    stop_token: CancellationToken,
1133}