deltachat/
scheduler.rs

1use std::cmp;
2use std::iter::{self, once};
3use std::num::NonZeroUsize;
4use std::sync::atomic::Ordering;
5
6use anyhow::{Context as _, Error, Result, bail};
7use async_channel::{self as channel, Receiver, Sender};
8use futures::future::try_join_all;
9use futures_lite::FutureExt;
10use rand::Rng;
11use tokio::sync::{RwLock, oneshot};
12use tokio::task;
13use tokio_util::sync::CancellationToken;
14use tokio_util::task::TaskTracker;
15
16pub(crate) use self::connectivity::ConnectivityStore;
17use crate::config::{self, Config};
18use crate::constants;
19use crate::contact::{ContactId, RecentlySeenLoop};
20use crate::context::Context;
21use crate::download::{DownloadState, download_msg};
22use crate::ephemeral::{self, delete_expired_imap_messages};
23use crate::events::EventType;
24use crate::imap::{FolderMeaning, Imap, session::Session};
25use crate::location;
26use crate::log::{LogExt, error, info, warn};
27use crate::message::MsgId;
28use crate::smtp::{Smtp, send_smtp_messages};
29use crate::sql;
30use crate::tools::{self, duration_to_str, maybe_add_time_based_warnings, time, time_elapsed};
31
32pub(crate) mod connectivity;
33
34/// State of the IO scheduler, as stored on the [`Context`].
35///
36/// The IO scheduler can be stopped or started, but core can also pause it.  After pausing
37/// the IO scheduler will be restarted only if it was running before paused or
38/// [`Context::start_io`] was called in the meantime while it was paused.
39#[derive(Debug, Default)]
40pub(crate) struct SchedulerState {
41    inner: RwLock<InnerSchedulerState>,
42}
43
44impl SchedulerState {
45    pub(crate) fn new() -> Self {
46        Default::default()
47    }
48
49    /// Whether the scheduler is currently running.
50    pub(crate) async fn is_running(&self) -> bool {
51        let inner = self.inner.read().await;
52        matches!(*inner, InnerSchedulerState::Started(_))
53    }
54
55    /// Starts the scheduler if it is not yet started.
56    pub(crate) async fn start(&self, context: &Context) {
57        let mut inner = self.inner.write().await;
58        match *inner {
59            InnerSchedulerState::Started(_) => (),
60            InnerSchedulerState::Stopped => Self::do_start(&mut inner, context).await,
61            InnerSchedulerState::Paused {
62                ref mut started, ..
63            } => *started = true,
64        }
65        context.update_connectivities(&inner);
66    }
67
68    /// Starts the scheduler if it is not yet started.
69    async fn do_start(inner: &mut InnerSchedulerState, context: &Context) {
70        info!(context, "starting IO");
71
72        // Notify message processing loop
73        // to allow processing old messages after restart.
74        context.new_msgs_notify.notify_one();
75
76        match Scheduler::start(context).await {
77            Ok(scheduler) => {
78                *inner = InnerSchedulerState::Started(scheduler);
79                context.emit_event(EventType::ConnectivityChanged);
80            }
81            Err(err) => error!(context, "Failed to start IO: {:#}", err),
82        }
83    }
84
85    /// Stops the scheduler if it is currently running.
86    pub(crate) async fn stop(&self, context: &Context) {
87        let mut inner = self.inner.write().await;
88        match *inner {
89            InnerSchedulerState::Started(_) => {
90                Self::do_stop(&mut inner, context, InnerSchedulerState::Stopped).await
91            }
92            InnerSchedulerState::Stopped => (),
93            InnerSchedulerState::Paused {
94                ref mut started, ..
95            } => *started = false,
96        }
97        context.update_connectivities(&inner);
98    }
99
100    /// Stops the scheduler if it is currently running.
101    async fn do_stop(
102        inner: &mut InnerSchedulerState,
103        context: &Context,
104        new_state: InnerSchedulerState,
105    ) {
106        // Sending an event wakes up event pollers (get_next_event)
107        // so the caller of stop_io() can arrange for proper termination.
108        // For this, the caller needs to instruct the event poller
109        // to terminate on receiving the next event and then call stop_io()
110        // which will emit the below event(s)
111        info!(context, "stopping IO");
112
113        // Wake up message processing loop even if there are no messages
114        // to allow for clean shutdown.
115        context.new_msgs_notify.notify_one();
116
117        let debug_logging = context
118            .debug_logging
119            .write()
120            .expect("RwLock is poisoned")
121            .take();
122        if let Some(debug_logging) = debug_logging {
123            debug_logging.loop_handle.abort();
124            debug_logging.loop_handle.await.ok();
125        }
126        let prev_state = std::mem::replace(inner, new_state);
127        context.emit_event(EventType::ConnectivityChanged);
128        match prev_state {
129            InnerSchedulerState::Started(scheduler) => scheduler.stop(context).await,
130            InnerSchedulerState::Stopped | InnerSchedulerState::Paused { .. } => (),
131        }
132    }
133
134    /// Pauses the IO scheduler.
135    ///
136    /// If it is currently running the scheduler will be stopped.  When the
137    /// [`IoPausedGuard`] is dropped the scheduler is started again.
138    ///
139    /// If in the meantime [`SchedulerState::start`] or [`SchedulerState::stop`] is called
140    /// resume will do the right thing and restore the scheduler to the state requested by
141    /// the last call.
142    pub(crate) async fn pause(&'_ self, context: &Context) -> Result<IoPausedGuard> {
143        {
144            let mut inner = self.inner.write().await;
145            match *inner {
146                InnerSchedulerState::Started(_) => {
147                    let new_state = InnerSchedulerState::Paused {
148                        started: true,
149                        pause_guards_count: NonZeroUsize::new(1).unwrap(),
150                    };
151                    Self::do_stop(&mut inner, context, new_state).await;
152                }
153                InnerSchedulerState::Stopped => {
154                    *inner = InnerSchedulerState::Paused {
155                        started: false,
156                        pause_guards_count: NonZeroUsize::new(1).unwrap(),
157                    };
158                }
159                InnerSchedulerState::Paused {
160                    ref mut pause_guards_count,
161                    ..
162                } => {
163                    *pause_guards_count = pause_guards_count
164                        .checked_add(1)
165                        .ok_or_else(|| Error::msg("Too many pause guards active"))?
166                }
167            }
168            context.update_connectivities(&inner);
169        }
170
171        let (tx, rx) = oneshot::channel();
172        let context = context.clone();
173        tokio::spawn(async move {
174            rx.await.ok();
175            let mut inner = context.scheduler.inner.write().await;
176            match *inner {
177                InnerSchedulerState::Started(_) => {
178                    warn!(&context, "IoPausedGuard resume: started instead of paused");
179                }
180                InnerSchedulerState::Stopped => {
181                    warn!(&context, "IoPausedGuard resume: stopped instead of paused");
182                }
183                InnerSchedulerState::Paused {
184                    ref started,
185                    ref mut pause_guards_count,
186                } => {
187                    if *pause_guards_count == NonZeroUsize::new(1).unwrap() {
188                        match *started {
189                            true => SchedulerState::do_start(&mut inner, &context).await,
190                            false => *inner = InnerSchedulerState::Stopped,
191                        }
192                    } else {
193                        let new_count = pause_guards_count.get() - 1;
194                        // SAFETY: Value was >=2 before due to if condition
195                        *pause_guards_count = NonZeroUsize::new(new_count).unwrap();
196                    }
197                }
198            }
199            context.update_connectivities(&inner);
200        });
201        Ok(IoPausedGuard { sender: Some(tx) })
202    }
203
204    /// Restarts the scheduler, only if it is running.
205    pub(crate) async fn restart(&self, context: &Context) {
206        info!(context, "restarting IO");
207        if self.is_running().await {
208            self.stop(context).await;
209            self.start(context).await;
210        }
211    }
212
213    /// Indicate that the network likely has come back.
214    pub(crate) async fn maybe_network(&self) {
215        let inner = self.inner.read().await;
216        let (inbox, oboxes) = match *inner {
217            InnerSchedulerState::Started(ref scheduler) => {
218                scheduler.maybe_network();
219                let inbox = scheduler.inbox.conn_state.state.connectivity.clone();
220                let oboxes = scheduler
221                    .oboxes
222                    .iter()
223                    .map(|b| b.conn_state.state.connectivity.clone())
224                    .collect::<Vec<_>>();
225                (inbox, oboxes)
226            }
227            _ => return,
228        };
229        drop(inner);
230        connectivity::idle_interrupted(inbox, oboxes).await;
231    }
232
233    /// Indicate that the network likely is lost.
234    pub(crate) async fn maybe_network_lost(&self, context: &Context) {
235        let inner = self.inner.read().await;
236        let stores = match *inner {
237            InnerSchedulerState::Started(ref scheduler) => {
238                scheduler.maybe_network_lost();
239                scheduler
240                    .boxes()
241                    .map(|b| b.conn_state.state.connectivity.clone())
242                    .collect()
243            }
244            _ => return,
245        };
246        drop(inner);
247        connectivity::maybe_network_lost(context, stores).await;
248    }
249
250    pub(crate) async fn interrupt_inbox(&self) {
251        let inner = self.inner.read().await;
252        if let InnerSchedulerState::Started(ref scheduler) = *inner {
253            scheduler.interrupt_inbox();
254        }
255    }
256
257    /// Interrupt optional boxes (mvbox, sentbox) loops.
258    pub(crate) async fn interrupt_oboxes(&self) {
259        let inner = self.inner.read().await;
260        if let InnerSchedulerState::Started(ref scheduler) = *inner {
261            scheduler.interrupt_oboxes();
262        }
263    }
264
265    pub(crate) async fn interrupt_smtp(&self) {
266        let inner = self.inner.read().await;
267        if let InnerSchedulerState::Started(ref scheduler) = *inner {
268            scheduler.interrupt_smtp();
269        }
270    }
271
272    pub(crate) async fn interrupt_ephemeral_task(&self) {
273        let inner = self.inner.read().await;
274        if let InnerSchedulerState::Started(ref scheduler) = *inner {
275            scheduler.interrupt_ephemeral_task();
276        }
277    }
278
279    pub(crate) async fn interrupt_location(&self) {
280        let inner = self.inner.read().await;
281        if let InnerSchedulerState::Started(ref scheduler) = *inner {
282            scheduler.interrupt_location();
283        }
284    }
285
286    pub(crate) async fn interrupt_recently_seen(&self, contact_id: ContactId, timestamp: i64) {
287        let inner = self.inner.read().await;
288        if let InnerSchedulerState::Started(ref scheduler) = *inner {
289            scheduler.interrupt_recently_seen(contact_id, timestamp);
290        }
291    }
292}
293
294#[derive(Debug, Default)]
295pub(crate) enum InnerSchedulerState {
296    Started(Scheduler),
297    #[default]
298    Stopped,
299    Paused {
300        started: bool,
301        pause_guards_count: NonZeroUsize,
302    },
303}
304
305/// Guard to make sure the IO Scheduler is resumed.
306///
307/// Returned by [`SchedulerState::pause`].  To resume the IO scheduler simply drop this
308/// guard.
309#[derive(Default, Debug)]
310pub(crate) struct IoPausedGuard {
311    sender: Option<oneshot::Sender<()>>,
312}
313
314impl Drop for IoPausedGuard {
315    fn drop(&mut self) {
316        if let Some(sender) = self.sender.take() {
317            // Can only fail if receiver is dropped, but then we're already resumed.
318            sender.send(()).ok();
319        }
320    }
321}
322
323#[derive(Debug)]
324struct SchedBox {
325    meaning: FolderMeaning,
326    conn_state: ImapConnectionState,
327
328    /// IMAP loop task handle.
329    handle: task::JoinHandle<()>,
330}
331
332/// Job and connection scheduler.
333#[derive(Debug)]
334pub(crate) struct Scheduler {
335    inbox: SchedBox,
336    /// Optional boxes -- mvbox, sentbox.
337    oboxes: Vec<SchedBox>,
338    smtp: SmtpConnectionState,
339    smtp_handle: task::JoinHandle<()>,
340    ephemeral_handle: task::JoinHandle<()>,
341    ephemeral_interrupt_send: Sender<()>,
342    location_handle: task::JoinHandle<()>,
343    location_interrupt_send: Sender<()>,
344
345    recently_seen_loop: RecentlySeenLoop,
346}
347
348async fn download_msgs(context: &Context, session: &mut Session) -> Result<()> {
349    let msg_ids = context
350        .sql
351        .query_map(
352            "SELECT msg_id FROM download",
353            (),
354            |row| {
355                let msg_id: MsgId = row.get(0)?;
356                Ok(msg_id)
357            },
358            |rowids| {
359                rowids
360                    .collect::<std::result::Result<Vec<_>, _>>()
361                    .map_err(Into::into)
362            },
363        )
364        .await?;
365
366    for msg_id in msg_ids {
367        if let Err(err) = download_msg(context, msg_id, session).await {
368            warn!(context, "Failed to download message {msg_id}: {:#}.", err);
369
370            // Update download state to failure
371            // so it can be retried.
372            //
373            // On success update_download_state() is not needed
374            // as receive_imf() already
375            // set the state and emitted the event.
376            msg_id
377                .update_download_state(context, DownloadState::Failure)
378                .await?;
379        }
380        context
381            .sql
382            .execute("DELETE FROM download WHERE msg_id=?", (msg_id,))
383            .await?;
384    }
385
386    Ok(())
387}
388
389async fn inbox_loop(
390    ctx: Context,
391    started: oneshot::Sender<()>,
392    inbox_handlers: ImapConnectionHandlers,
393) {
394    use futures::future::FutureExt;
395
396    info!(ctx, "Starting inbox loop.");
397    let ImapConnectionHandlers {
398        mut connection,
399        stop_token,
400    } = inbox_handlers;
401
402    let ctx1 = ctx.clone();
403    let fut = async move {
404        let ctx = ctx1;
405        if let Err(()) = started.send(()) {
406            warn!(ctx, "Inbox loop, missing started receiver.");
407            return;
408        };
409
410        let mut old_session: Option<Session> = None;
411        loop {
412            let session = if let Some(session) = old_session.take() {
413                session
414            } else {
415                info!(ctx, "Preparing new IMAP session for inbox.");
416                match connection.prepare(&ctx).await {
417                    Err(err) => {
418                        warn!(ctx, "Failed to prepare inbox connection: {err:#}.");
419                        continue;
420                    }
421                    Ok(session) => session,
422                }
423            };
424
425            match inbox_fetch_idle(&ctx, &mut connection, session).await {
426                Err(err) => warn!(ctx, "Failed inbox fetch_idle: {err:#}."),
427                Ok(session) => {
428                    info!(
429                        ctx,
430                        "IMAP loop iteration for inbox finished, keeping the session."
431                    );
432                    old_session = Some(session);
433                }
434            }
435        }
436    };
437
438    stop_token
439        .cancelled()
440        .map(|_| {
441            info!(ctx, "Shutting down inbox loop.");
442        })
443        .race(fut)
444        .await;
445}
446
447/// Convert folder meaning
448/// used internally by [fetch_idle] and [Context::background_fetch].
449///
450/// Returns folder configuration key and folder name
451/// if such folder is configured, `Ok(None)` otherwise.
452pub async fn convert_folder_meaning(
453    ctx: &Context,
454    folder_meaning: FolderMeaning,
455) -> Result<Option<(Config, String)>> {
456    let folder_config = match folder_meaning.to_config() {
457        Some(c) => c,
458        None => {
459            // Such folder cannot be configured,
460            // e.g. a `FolderMeaning::Spam` folder.
461            return Ok(None);
462        }
463    };
464
465    let folder = ctx
466        .get_config(folder_config)
467        .await
468        .with_context(|| format!("Failed to retrieve {folder_config} folder"))?;
469
470    if let Some(watch_folder) = folder {
471        Ok(Some((folder_config, watch_folder)))
472    } else {
473        Ok(None)
474    }
475}
476
477async fn inbox_fetch_idle(ctx: &Context, imap: &mut Imap, mut session: Session) -> Result<Session> {
478    if !ctx.get_config_bool(Config::FixIsChatmail).await? {
479        ctx.set_config_internal(
480            Config::IsChatmail,
481            crate::config::from_bool(session.is_chatmail()),
482        )
483        .await?;
484    }
485
486    // Update quota no more than once a minute.
487    if ctx.quota_needs_update(60).await {
488        if let Err(err) = ctx.update_recent_quota(&mut session).await {
489            warn!(ctx, "Failed to update quota: {:#}.", err);
490        }
491    }
492
493    let resync_requested = ctx.resync_request.swap(false, Ordering::Relaxed);
494    if resync_requested {
495        if let Err(err) = session.resync_folders(ctx).await {
496            warn!(ctx, "Failed to resync folders: {:#}.", err);
497            ctx.resync_request.store(true, Ordering::Relaxed);
498        }
499    }
500
501    maybe_add_time_based_warnings(ctx).await;
502
503    match ctx.get_config_i64(Config::LastHousekeeping).await {
504        Ok(last_housekeeping_time) => {
505            let next_housekeeping_time =
506                last_housekeeping_time.saturating_add(constants::HOUSEKEEPING_PERIOD);
507            if next_housekeeping_time <= time() {
508                sql::housekeeping(ctx).await.log_err(ctx).ok();
509            }
510        }
511        Err(err) => {
512            warn!(ctx, "Failed to get last housekeeping time: {}", err);
513        }
514    };
515
516    match ctx.get_config_bool(Config::FetchedExistingMsgs).await {
517        Ok(fetched_existing_msgs) => {
518            if !fetched_existing_msgs {
519                // Consider it done even if we fail.
520                //
521                // This operation is not critical enough to retry,
522                // especially if the error is persistent.
523                if let Err(err) = ctx
524                    .set_config_internal(Config::FetchedExistingMsgs, config::from_bool(true))
525                    .await
526                {
527                    warn!(ctx, "Can't set Config::FetchedExistingMsgs: {:#}", err);
528                }
529
530                if let Err(err) = imap.fetch_existing_msgs(ctx, &mut session).await {
531                    warn!(ctx, "Failed to fetch existing messages: {:#}", err);
532                }
533            }
534        }
535        Err(err) => {
536            warn!(ctx, "Can't get Config::FetchedExistingMsgs: {:#}", err);
537        }
538    }
539
540    download_msgs(ctx, &mut session)
541        .await
542        .context("Failed to download messages")?;
543    session
544        .fetch_metadata(ctx)
545        .await
546        .context("Failed to fetch metadata")?;
547    session
548        .register_token(ctx)
549        .await
550        .context("Failed to register push token")?;
551
552    let session = fetch_idle(ctx, imap, session, FolderMeaning::Inbox).await?;
553    Ok(session)
554}
555
556/// Implement a single iteration of IMAP loop.
557///
558/// This function performs all IMAP operations on a single folder, selecting it if necessary and
559/// handling all the errors. In case of an error, an error is returned and connection is dropped,
560/// otherwise connection is returned.
561async fn fetch_idle(
562    ctx: &Context,
563    connection: &mut Imap,
564    mut session: Session,
565    folder_meaning: FolderMeaning,
566) -> Result<Session> {
567    let Some((folder_config, watch_folder)) = convert_folder_meaning(ctx, folder_meaning).await?
568    else {
569        // The folder is not configured.
570        // For example, this happens if the server does not have Sent folder
571        // but watching Sent folder is enabled.
572        connection.connectivity.set_not_configured(ctx).await;
573        connection.idle_interrupt_receiver.recv().await.ok();
574        bail!("Cannot fetch folder {folder_meaning} because it is not configured");
575    };
576
577    if folder_config == Config::ConfiguredInboxFolder {
578        let mvbox;
579        let syncbox = match ctx.should_move_sync_msgs().await? {
580            false => &watch_folder,
581            true => {
582                mvbox = ctx.get_config(Config::ConfiguredMvboxFolder).await?;
583                mvbox.as_deref().unwrap_or(&watch_folder)
584            }
585        };
586        session
587            .send_sync_msgs(ctx, syncbox)
588            .await
589            .context("fetch_idle: send_sync_msgs")
590            .log_err(ctx)
591            .ok();
592
593        session
594            .store_seen_flags_on_imap(ctx)
595            .await
596            .context("store_seen_flags_on_imap")?;
597    }
598
599    if !ctx.should_delete_to_trash().await?
600        || ctx
601            .get_config(Config::ConfiguredTrashFolder)
602            .await?
603            .is_some()
604    {
605        // Fetch the watched folder.
606        connection
607            .fetch_move_delete(ctx, &mut session, &watch_folder, folder_meaning)
608            .await
609            .context("fetch_move_delete")?;
610
611        // Mark expired messages for deletion. Marked messages will be deleted from the server
612        // on the next iteration of `fetch_move_delete`. `delete_expired_imap_messages` is not
613        // called right before `fetch_move_delete` because it is not well optimized and would
614        // otherwise slow down message fetching.
615        delete_expired_imap_messages(ctx)
616            .await
617            .context("delete_expired_imap_messages")?;
618    } else if folder_config == Config::ConfiguredInboxFolder {
619        ctx.last_full_folder_scan.lock().await.take();
620    }
621
622    // Scan additional folders only after finishing fetching the watched folder.
623    //
624    // On iOS the application has strictly limited time to work in background, so we may not
625    // be able to scan all folders before time is up if there are many of them.
626    if folder_config == Config::ConfiguredInboxFolder {
627        // Only scan on the Inbox thread in order to prevent parallel scans, which might lead to duplicate messages
628        match connection
629            .scan_folders(ctx, &mut session)
630            .await
631            .context("scan_folders")
632        {
633            Err(err) => {
634                // Don't reconnect, if there is a problem with the connection we will realize this when IDLEing
635                // but maybe just one folder can't be selected or something
636                warn!(ctx, "{:#}", err);
637            }
638            Ok(true) => {
639                // Fetch the watched folder again in case scanning other folder moved messages
640                // there.
641                //
642                // In most cases this will select the watched folder and return because there are
643                // no new messages. We want to select the watched folder anyway before going IDLE
644                // there, so this does not take additional protocol round-trip.
645                connection
646                    .fetch_move_delete(ctx, &mut session, &watch_folder, folder_meaning)
647                    .await
648                    .context("fetch_move_delete after scan_folders")?;
649            }
650            Ok(false) => {}
651        }
652    }
653
654    // Synchronize Seen flags.
655    session
656        .sync_seen_flags(ctx, &watch_folder)
657        .await
658        .context("sync_seen_flags")
659        .log_err(ctx)
660        .ok();
661
662    connection.connectivity.set_idle(ctx).await;
663
664    ctx.emit_event(EventType::ImapInboxIdle);
665
666    if !session.can_idle() {
667        info!(
668            ctx,
669            "IMAP session does not support IDLE, going to fake idle."
670        );
671        connection.fake_idle(ctx, watch_folder).await?;
672        return Ok(session);
673    }
674
675    if ctx
676        .get_config_bool(Config::DisableIdle)
677        .await
678        .context("Failed to get disable_idle config")
679        .log_err(ctx)
680        .unwrap_or_default()
681    {
682        info!(ctx, "IMAP IDLE is disabled, going to fake idle.");
683        connection.fake_idle(ctx, watch_folder).await?;
684        return Ok(session);
685    }
686
687    info!(
688        ctx,
689        "IMAP session in folder {watch_folder:?} supports IDLE, using it."
690    );
691    let session = session
692        .idle(
693            ctx,
694            connection.idle_interrupt_receiver.clone(),
695            &watch_folder,
696        )
697        .await
698        .context("idle")?;
699
700    Ok(session)
701}
702
703async fn simple_imap_loop(
704    ctx: Context,
705    started: oneshot::Sender<()>,
706    inbox_handlers: ImapConnectionHandlers,
707    folder_meaning: FolderMeaning,
708) {
709    use futures::future::FutureExt;
710
711    info!(ctx, "Starting simple loop for {folder_meaning}.");
712    let ImapConnectionHandlers {
713        mut connection,
714        stop_token,
715    } = inbox_handlers;
716
717    let ctx1 = ctx.clone();
718
719    let fut = async move {
720        let ctx = ctx1;
721        if let Err(()) = started.send(()) {
722            warn!(
723                ctx,
724                "Simple imap loop for {folder_meaning}, missing started receiver."
725            );
726            return;
727        }
728
729        let mut old_session: Option<Session> = None;
730        loop {
731            let session = if let Some(session) = old_session.take() {
732                session
733            } else {
734                info!(ctx, "Preparing new IMAP session for {folder_meaning}.");
735                match connection.prepare(&ctx).await {
736                    Err(err) => {
737                        warn!(
738                            ctx,
739                            "Failed to prepare {folder_meaning} connection: {err:#}."
740                        );
741                        continue;
742                    }
743                    Ok(session) => session,
744                }
745            };
746
747            match fetch_idle(&ctx, &mut connection, session, folder_meaning).await {
748                Err(err) => warn!(ctx, "Failed fetch_idle: {err:#}"),
749                Ok(session) => {
750                    info!(
751                        ctx,
752                        "IMAP loop iteration for {folder_meaning} finished, keeping the session"
753                    );
754                    old_session = Some(session);
755                }
756            }
757        }
758    };
759
760    stop_token
761        .cancelled()
762        .map(|_| {
763            info!(ctx, "Shutting down IMAP loop for {folder_meaning}.");
764        })
765        .race(fut)
766        .await;
767}
768
769async fn smtp_loop(
770    ctx: Context,
771    started: oneshot::Sender<()>,
772    smtp_handlers: SmtpConnectionHandlers,
773) {
774    use futures::future::FutureExt;
775
776    info!(ctx, "Starting SMTP loop.");
777    let SmtpConnectionHandlers {
778        mut connection,
779        stop_token,
780        idle_interrupt_receiver,
781    } = smtp_handlers;
782
783    let ctx1 = ctx.clone();
784    let fut = async move {
785        let ctx = ctx1;
786        if let Err(()) = started.send(()) {
787            warn!(&ctx, "SMTP loop, missing started receiver.");
788            return;
789        }
790
791        let mut timeout = None;
792        loop {
793            if let Err(err) = send_smtp_messages(&ctx, &mut connection).await {
794                warn!(ctx, "send_smtp_messages failed: {:#}.", err);
795                timeout = Some(timeout.unwrap_or(30));
796            } else {
797                timeout = None;
798                let duration_until_can_send = ctx.ratelimit.read().await.until_can_send();
799                if !duration_until_can_send.is_zero() {
800                    info!(
801                        ctx,
802                        "smtp got rate limited, waiting for {} until can send again",
803                        duration_to_str(duration_until_can_send)
804                    );
805                    tokio::time::sleep(duration_until_can_send).await;
806                    continue;
807                }
808            }
809
810            // Fake Idle
811            info!(ctx, "SMTP fake idle started.");
812            match &connection.last_send_error {
813                None => connection.connectivity.set_idle(&ctx).await,
814                Some(err) => connection.connectivity.set_err(&ctx, err).await,
815            }
816
817            // If send_smtp_messages() failed, we set a timeout for the fake-idle so that
818            // sending is retried (at the latest) after the timeout. If sending fails
819            // again, we increase the timeout exponentially, in order not to do lots of
820            // unnecessary retries.
821            if let Some(t) = timeout {
822                let now = tools::Time::now();
823                info!(
824                    ctx,
825                    "SMTP has messages to retry, planning to retry {t} seconds later."
826                );
827                let duration = std::time::Duration::from_secs(t);
828                tokio::time::timeout(duration, async {
829                    idle_interrupt_receiver.recv().await.unwrap_or_default()
830                })
831                .await
832                .unwrap_or_default();
833                let slept = time_elapsed(&now).as_secs();
834                timeout = Some(cmp::max(
835                    t,
836                    slept.saturating_add(rand::thread_rng().gen_range((slept / 2)..=slept)),
837                ));
838            } else {
839                info!(ctx, "SMTP has no messages to retry, waiting for interrupt.");
840                idle_interrupt_receiver.recv().await.unwrap_or_default();
841            };
842
843            info!(ctx, "SMTP fake idle interrupted.")
844        }
845    };
846
847    stop_token
848        .cancelled()
849        .map(|_| {
850            info!(ctx, "Shutting down SMTP loop.");
851        })
852        .race(fut)
853        .await;
854}
855
856impl Scheduler {
857    /// Start the scheduler.
858    pub async fn start(ctx: &Context) -> Result<Self> {
859        let (smtp, smtp_handlers) = SmtpConnectionState::new();
860
861        let (smtp_start_send, smtp_start_recv) = oneshot::channel();
862        let (ephemeral_interrupt_send, ephemeral_interrupt_recv) = channel::bounded(1);
863        let (location_interrupt_send, location_interrupt_recv) = channel::bounded(1);
864
865        let mut oboxes = Vec::new();
866        let mut start_recvs = Vec::new();
867
868        let (conn_state, inbox_handlers) = ImapConnectionState::new(ctx).await?;
869        let (inbox_start_send, inbox_start_recv) = oneshot::channel();
870        let handle = {
871            let ctx = ctx.clone();
872            task::spawn(inbox_loop(ctx, inbox_start_send, inbox_handlers))
873        };
874        let inbox = SchedBox {
875            meaning: FolderMeaning::Inbox,
876            conn_state,
877            handle,
878        };
879        start_recvs.push(inbox_start_recv);
880
881        for (meaning, should_watch) in [
882            (FolderMeaning::Mvbox, ctx.should_watch_mvbox().await),
883            (FolderMeaning::Sent, ctx.should_watch_sentbox().await),
884        ] {
885            if should_watch? {
886                let (conn_state, handlers) = ImapConnectionState::new(ctx).await?;
887                let (start_send, start_recv) = oneshot::channel();
888                let ctx = ctx.clone();
889                let handle = task::spawn(simple_imap_loop(ctx, start_send, handlers, meaning));
890                oboxes.push(SchedBox {
891                    meaning,
892                    conn_state,
893                    handle,
894                });
895                start_recvs.push(start_recv);
896            }
897        }
898
899        let smtp_handle = {
900            let ctx = ctx.clone();
901            task::spawn(smtp_loop(ctx, smtp_start_send, smtp_handlers))
902        };
903        start_recvs.push(smtp_start_recv);
904
905        let ephemeral_handle = {
906            let ctx = ctx.clone();
907            task::spawn(async move {
908                ephemeral::ephemeral_loop(&ctx, ephemeral_interrupt_recv).await;
909            })
910        };
911
912        let location_handle = {
913            let ctx = ctx.clone();
914            task::spawn(async move {
915                location::location_loop(&ctx, location_interrupt_recv).await;
916            })
917        };
918
919        let recently_seen_loop = RecentlySeenLoop::new(ctx.clone());
920
921        let res = Self {
922            inbox,
923            oboxes,
924            smtp,
925            smtp_handle,
926            ephemeral_handle,
927            ephemeral_interrupt_send,
928            location_handle,
929            location_interrupt_send,
930            recently_seen_loop,
931        };
932
933        // wait for all loops to be started
934        if let Err(err) = try_join_all(start_recvs).await {
935            bail!("failed to start scheduler: {}", err);
936        }
937
938        info!(ctx, "scheduler is running");
939        Ok(res)
940    }
941
942    fn boxes(&self) -> iter::Chain<iter::Once<&SchedBox>, std::slice::Iter<'_, SchedBox>> {
943        once(&self.inbox).chain(self.oboxes.iter())
944    }
945
946    fn maybe_network(&self) {
947        for b in self.boxes() {
948            b.conn_state.interrupt();
949        }
950        self.interrupt_smtp();
951    }
952
953    fn maybe_network_lost(&self) {
954        for b in self.boxes() {
955            b.conn_state.interrupt();
956        }
957        self.interrupt_smtp();
958    }
959
960    fn interrupt_inbox(&self) {
961        self.inbox.conn_state.interrupt();
962    }
963
964    fn interrupt_oboxes(&self) {
965        for b in &self.oboxes {
966            b.conn_state.interrupt();
967        }
968    }
969
970    fn interrupt_smtp(&self) {
971        self.smtp.interrupt();
972    }
973
974    fn interrupt_ephemeral_task(&self) {
975        self.ephemeral_interrupt_send.try_send(()).ok();
976    }
977
978    fn interrupt_location(&self) {
979        self.location_interrupt_send.try_send(()).ok();
980    }
981
982    fn interrupt_recently_seen(&self, contact_id: ContactId, timestamp: i64) {
983        self.recently_seen_loop.try_interrupt(contact_id, timestamp);
984    }
985
986    /// Halt the scheduler.
987    ///
988    /// It consumes the scheduler and never fails to stop it. In the worst case, long-running tasks
989    /// are forcefully terminated if they cannot shutdown within the timeout.
990    pub(crate) async fn stop(self, context: &Context) {
991        // Send stop signals to tasks so they can shutdown cleanly.
992        for b in self.boxes() {
993            b.conn_state.stop();
994        }
995        self.smtp.stop();
996
997        // Actually shutdown tasks.
998        let timeout_duration = std::time::Duration::from_secs(30);
999
1000        let tracker = TaskTracker::new();
1001        for b in once(self.inbox).chain(self.oboxes) {
1002            let context = context.clone();
1003            tracker.spawn(async move {
1004                tokio::time::timeout(timeout_duration, b.handle)
1005                    .await
1006                    .log_err(&context)
1007            });
1008        }
1009        {
1010            let context = context.clone();
1011            tracker.spawn(async move {
1012                tokio::time::timeout(timeout_duration, self.smtp_handle)
1013                    .await
1014                    .log_err(&context)
1015            });
1016        }
1017        tracker.close();
1018        tracker.wait().await;
1019
1020        // Abort tasks, then await them to ensure the `Future` is dropped.
1021        // Just aborting the task may keep resources such as `Context` clone
1022        // moved into it indefinitely, resulting in database not being
1023        // closed etc.
1024        self.ephemeral_handle.abort();
1025        self.ephemeral_handle.await.ok();
1026        self.location_handle.abort();
1027        self.location_handle.await.ok();
1028        self.recently_seen_loop.abort().await;
1029    }
1030}
1031
1032/// Connection state logic shared between imap and smtp connections.
1033#[derive(Debug)]
1034struct ConnectionState {
1035    /// Cancellation token to interrupt the whole connection.
1036    stop_token: CancellationToken,
1037    /// Channel to interrupt idle.
1038    idle_interrupt_sender: Sender<()>,
1039    /// Mutex to pass connectivity info between IMAP/SMTP threads and the API
1040    connectivity: ConnectivityStore,
1041}
1042
1043impl ConnectionState {
1044    /// Shutdown this connection completely.
1045    fn stop(&self) {
1046        // Trigger shutdown of the run loop.
1047        self.stop_token.cancel();
1048    }
1049
1050    fn interrupt(&self) {
1051        // Use try_send to avoid blocking on interrupts.
1052        self.idle_interrupt_sender.try_send(()).ok();
1053    }
1054}
1055
1056#[derive(Debug)]
1057pub(crate) struct SmtpConnectionState {
1058    state: ConnectionState,
1059}
1060
1061impl SmtpConnectionState {
1062    fn new() -> (Self, SmtpConnectionHandlers) {
1063        let stop_token = CancellationToken::new();
1064        let (idle_interrupt_sender, idle_interrupt_receiver) = channel::bounded(1);
1065
1066        let handlers = SmtpConnectionHandlers {
1067            connection: Smtp::new(),
1068            stop_token: stop_token.clone(),
1069            idle_interrupt_receiver,
1070        };
1071
1072        let state = ConnectionState {
1073            stop_token,
1074            idle_interrupt_sender,
1075            connectivity: handlers.connection.connectivity.clone(),
1076        };
1077
1078        let conn = SmtpConnectionState { state };
1079
1080        (conn, handlers)
1081    }
1082
1083    /// Interrupt any form of idle.
1084    fn interrupt(&self) {
1085        self.state.interrupt();
1086    }
1087
1088    /// Shutdown this connection completely.
1089    fn stop(&self) {
1090        self.state.stop();
1091    }
1092}
1093
1094struct SmtpConnectionHandlers {
1095    connection: Smtp,
1096    stop_token: CancellationToken,
1097    idle_interrupt_receiver: Receiver<()>,
1098}
1099
1100#[derive(Debug)]
1101pub(crate) struct ImapConnectionState {
1102    state: ConnectionState,
1103}
1104
1105impl ImapConnectionState {
1106    /// Construct a new connection.
1107    async fn new(context: &Context) -> Result<(Self, ImapConnectionHandlers)> {
1108        let stop_token = CancellationToken::new();
1109        let (idle_interrupt_sender, idle_interrupt_receiver) = channel::bounded(1);
1110
1111        let handlers = ImapConnectionHandlers {
1112            connection: Imap::new_configured(context, idle_interrupt_receiver).await?,
1113            stop_token: stop_token.clone(),
1114        };
1115
1116        let state = ConnectionState {
1117            stop_token,
1118            idle_interrupt_sender,
1119            connectivity: handlers.connection.connectivity.clone(),
1120        };
1121
1122        let conn = ImapConnectionState { state };
1123
1124        Ok((conn, handlers))
1125    }
1126
1127    /// Interrupt any form of idle.
1128    fn interrupt(&self) {
1129        self.state.interrupt();
1130    }
1131
1132    /// Shutdown this connection completely.
1133    fn stop(&self) {
1134        self.state.stop();
1135    }
1136}
1137
1138#[derive(Debug)]
1139struct ImapConnectionHandlers {
1140    connection: Imap,
1141    stop_token: CancellationToken,
1142}