deltachat/scheduler.rs

use std::cmp;
use std::iter::{self, once};
use std::num::NonZeroUsize;
use std::sync::atomic::Ordering;

use anyhow::{Context as _, Error, Result, bail};
use async_channel::{self as channel, Receiver, Sender};
use futures::future::try_join_all;
use futures_lite::FutureExt;
use rand::Rng;
use tokio::sync::{RwLock, RwLockWriteGuard, oneshot};
use tokio::task;
use tokio_util::sync::CancellationToken;
use tokio_util::task::TaskTracker;

use self::connectivity::ConnectivityStore;
use crate::config::{self, Config};
use crate::constants;
use crate::contact::{ContactId, RecentlySeenLoop};
use crate::context::Context;
use crate::download::{DownloadState, download_msg};
use crate::ephemeral::{self, delete_expired_imap_messages};
use crate::events::EventType;
use crate::imap::{FolderMeaning, Imap, session::Session};
use crate::location;
use crate::log::{LogExt, error, info, warn};
use crate::message::MsgId;
use crate::smtp::{Smtp, send_smtp_messages};
use crate::sql;
use crate::tools::{self, duration_to_str, maybe_add_time_based_warnings, time, time_elapsed};

pub(crate) mod connectivity;

/// State of the IO scheduler, as stored on the [`Context`].
///
/// The IO scheduler can be stopped or started, but core can also pause it.  After pausing,
/// the IO scheduler will be restarted only if it was running before being paused or if
/// [`Context::start_io`] was called while it was paused.
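///
/// A minimal usage sketch (illustrative only; assumes a configured `Context` named `ctx`
/// whose `scheduler` field holds this state):
///
/// ```ignore
/// // Start IO and check that the scheduler picked it up.
/// ctx.scheduler.start(ctx.clone()).await;
/// assert!(ctx.scheduler.is_running().await);
///
/// // Stop IO again; `is_running` reflects the new state.
/// ctx.scheduler.stop(&ctx).await;
/// assert!(!ctx.scheduler.is_running().await);
/// ```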
#[derive(Debug, Default)]
pub(crate) struct SchedulerState {
    inner: RwLock<InnerSchedulerState>,
}

impl SchedulerState {
    pub(crate) fn new() -> Self {
        Default::default()
    }

    /// Whether the scheduler is currently running.
    pub(crate) async fn is_running(&self) -> bool {
        let inner = self.inner.read().await;
        matches!(*inner, InnerSchedulerState::Started(_))
    }

    /// Starts the scheduler if it is not yet started.
    pub(crate) async fn start(&self, context: Context) {
        let mut inner = self.inner.write().await;
        match *inner {
            InnerSchedulerState::Started(_) => (),
            InnerSchedulerState::Stopped => Self::do_start(inner, context).await,
            InnerSchedulerState::Paused {
                ref mut started, ..
            } => *started = true,
        }
    }

    /// Starts the scheduler.
    async fn do_start(mut inner: RwLockWriteGuard<'_, InnerSchedulerState>, context: Context) {
        info!(context, "starting IO");

        // Notify message processing loop
        // to allow processing old messages after restart.
        context.new_msgs_notify.notify_one();

        let ctx = context.clone();
        match Scheduler::start(&context).await {
            Ok(scheduler) => {
                *inner = InnerSchedulerState::Started(scheduler);
                context.emit_event(EventType::ConnectivityChanged);
            }
            Err(err) => error!(&ctx, "Failed to start IO: {:#}", err),
        }
    }

    /// Stops the scheduler if it is currently running.
    pub(crate) async fn stop(&self, context: &Context) {
        let mut inner = self.inner.write().await;
        match *inner {
            InnerSchedulerState::Started(_) => {
                Self::do_stop(inner, context, InnerSchedulerState::Stopped).await
            }
            InnerSchedulerState::Stopped => (),
            InnerSchedulerState::Paused {
                ref mut started, ..
            } => *started = false,
        }
    }

    /// Stops the scheduler and sets the state to `new_state`.
    async fn do_stop(
        mut inner: RwLockWriteGuard<'_, InnerSchedulerState>,
        context: &Context,
        new_state: InnerSchedulerState,
    ) {
        // Sending an event wakes up event pollers (get_next_event)
        // so the caller of stop_io() can arrange for proper termination.
        // For this, the caller needs to instruct the event poller
        // to terminate on receiving the next event and then call stop_io(),
        // which will emit the event(s) below.
        info!(context, "stopping IO");

        // Wake up message processing loop even if there are no messages
        // to allow for clean shutdown.
        context.new_msgs_notify.notify_one();

        let debug_logging = context
            .debug_logging
            .write()
            .expect("RwLock is poisoned")
            .take();
        if let Some(debug_logging) = debug_logging {
            debug_logging.loop_handle.abort();
            debug_logging.loop_handle.await.ok();
        }
        let prev_state = std::mem::replace(&mut *inner, new_state);
        context.emit_event(EventType::ConnectivityChanged);
        match prev_state {
            InnerSchedulerState::Started(scheduler) => scheduler.stop(context).await,
            InnerSchedulerState::Stopped | InnerSchedulerState::Paused { .. } => (),
        }
    }

    /// Pauses the IO scheduler.
    ///
    /// If it is currently running, the scheduler will be stopped.  When the
    /// [`IoPausedGuard`] is dropped the scheduler is started again.
    ///
    /// If [`SchedulerState::start`] or [`SchedulerState::stop`] is called in the meantime,
    /// resume will do the right thing and restore the scheduler to the state requested by
    /// the last call.
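    ///
    /// A minimal usage sketch (illustrative only; assumes a configured `Context` named `ctx`):
    ///
    /// ```ignore
    /// {
    ///     // IO is paused while the guard is alive.
    ///     let _guard = ctx.scheduler.pause(ctx.clone()).await?;
    ///     // ... do work that needs IO to be quiet ...
    /// } // Dropping the guard resumes the scheduler if it was running before.
    /// ```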
    pub(crate) async fn pause(&'_ self, context: Context) -> Result<IoPausedGuard> {
        {
            let mut inner = self.inner.write().await;
            match *inner {
                InnerSchedulerState::Started(_) => {
                    let new_state = InnerSchedulerState::Paused {
                        started: true,
                        pause_guards_count: NonZeroUsize::new(1).unwrap(),
                    };
                    Self::do_stop(inner, &context, new_state).await;
                }
                InnerSchedulerState::Stopped => {
                    *inner = InnerSchedulerState::Paused {
                        started: false,
                        pause_guards_count: NonZeroUsize::new(1).unwrap(),
                    };
                }
                InnerSchedulerState::Paused {
                    ref mut pause_guards_count,
                    ..
                } => {
                    *pause_guards_count = pause_guards_count
                        .checked_add(1)
                        .ok_or_else(|| Error::msg("Too many pause guards active"))?
                }
            }
        }

        let (tx, rx) = oneshot::channel();
        tokio::spawn(async move {
            rx.await.ok();
            let mut inner = context.scheduler.inner.write().await;
            match *inner {
                InnerSchedulerState::Started(_) => {
                    warn!(&context, "IoPausedGuard resume: started instead of paused");
                }
                InnerSchedulerState::Stopped => {
                    warn!(&context, "IoPausedGuard resume: stopped instead of paused");
                }
                InnerSchedulerState::Paused {
                    ref started,
                    ref mut pause_guards_count,
                } => {
                    if *pause_guards_count == NonZeroUsize::new(1).unwrap() {
                        match *started {
                            true => SchedulerState::do_start(inner, context.clone()).await,
                            false => *inner = InnerSchedulerState::Stopped,
                        }
                    } else {
                        let new_count = pause_guards_count.get() - 1;
                        // SAFETY: Value was >=2 before due to if condition
                        *pause_guards_count = NonZeroUsize::new(new_count).unwrap();
                    }
                }
            }
        });
        Ok(IoPausedGuard { sender: Some(tx) })
    }

    /// Restarts the scheduler if it is currently running.
    pub(crate) async fn restart(&self, context: &Context) {
        info!(context, "restarting IO");
        if self.is_running().await {
            self.stop(context).await;
            self.start(context.clone()).await;
        }
    }

    /// Indicate that the network likely has come back.
    pub(crate) async fn maybe_network(&self) {
        let inner = self.inner.read().await;
        let (inbox, oboxes) = match *inner {
            InnerSchedulerState::Started(ref scheduler) => {
                scheduler.maybe_network();
                let inbox = scheduler.inbox.conn_state.state.connectivity.clone();
                let oboxes = scheduler
                    .oboxes
                    .iter()
                    .map(|b| b.conn_state.state.connectivity.clone())
                    .collect::<Vec<_>>();
                (inbox, oboxes)
            }
            _ => return,
        };
        drop(inner);
        connectivity::idle_interrupted(inbox, oboxes).await;
    }

    /// Indicate that the network is likely lost.
    pub(crate) async fn maybe_network_lost(&self, context: &Context) {
        let inner = self.inner.read().await;
        let stores = match *inner {
            InnerSchedulerState::Started(ref scheduler) => {
                scheduler.maybe_network_lost();
                scheduler
                    .boxes()
                    .map(|b| b.conn_state.state.connectivity.clone())
                    .collect()
            }
            _ => return,
        };
        drop(inner);
        connectivity::maybe_network_lost(context, stores).await;
    }

    pub(crate) async fn interrupt_inbox(&self) {
        let inner = self.inner.read().await;
        if let InnerSchedulerState::Started(ref scheduler) = *inner {
            scheduler.interrupt_inbox();
        }
    }

    /// Interrupt optional boxes (mvbox, sentbox) loops.
    pub(crate) async fn interrupt_oboxes(&self) {
        let inner = self.inner.read().await;
        if let InnerSchedulerState::Started(ref scheduler) = *inner {
            scheduler.interrupt_oboxes();
        }
    }

    pub(crate) async fn interrupt_smtp(&self) {
        let inner = self.inner.read().await;
        if let InnerSchedulerState::Started(ref scheduler) = *inner {
            scheduler.interrupt_smtp();
        }
    }

    pub(crate) async fn interrupt_ephemeral_task(&self) {
        let inner = self.inner.read().await;
        if let InnerSchedulerState::Started(ref scheduler) = *inner {
            scheduler.interrupt_ephemeral_task();
        }
    }

    pub(crate) async fn interrupt_location(&self) {
        let inner = self.inner.read().await;
        if let InnerSchedulerState::Started(ref scheduler) = *inner {
            scheduler.interrupt_location();
        }
    }

    pub(crate) async fn interrupt_recently_seen(&self, contact_id: ContactId, timestamp: i64) {
        let inner = self.inner.read().await;
        if let InnerSchedulerState::Started(ref scheduler) = *inner {
            scheduler.interrupt_recently_seen(contact_id, timestamp);
        }
    }
}

#[derive(Debug, Default)]
enum InnerSchedulerState {
    Started(Scheduler),
    #[default]
    Stopped,
    Paused {
        started: bool,
        pause_guards_count: NonZeroUsize,
    },
}

/// Guard to make sure the IO Scheduler is resumed.
///
/// Returned by [`SchedulerState::pause`].  To resume the IO scheduler simply drop this
/// guard.
#[derive(Default, Debug)]
pub(crate) struct IoPausedGuard {
    sender: Option<oneshot::Sender<()>>,
}

impl Drop for IoPausedGuard {
    fn drop(&mut self) {
        if let Some(sender) = self.sender.take() {
            // Can only fail if receiver is dropped, but then we're already resumed.
            sender.send(()).ok();
        }
    }
}

#[derive(Debug)]
struct SchedBox {
    meaning: FolderMeaning,
    conn_state: ImapConnectionState,

    /// IMAP loop task handle.
    handle: task::JoinHandle<()>,
}

/// Job and connection scheduler.
#[derive(Debug)]
pub(crate) struct Scheduler {
    inbox: SchedBox,
    /// Optional boxes -- mvbox, sentbox.
    oboxes: Vec<SchedBox>,
    smtp: SmtpConnectionState,
    smtp_handle: task::JoinHandle<()>,
    ephemeral_handle: task::JoinHandle<()>,
    ephemeral_interrupt_send: Sender<()>,
    location_handle: task::JoinHandle<()>,
    location_interrupt_send: Sender<()>,

    recently_seen_loop: RecentlySeenLoop,
}

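/// Downloads messages which were queued for download in the `download` table.
///
/// Messages that fail to download are marked with [`DownloadState::Failure`]
/// so the download can be retried later; the queue entry is removed in both cases.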
async fn download_msgs(context: &Context, session: &mut Session) -> Result<()> {
    let msg_ids = context
        .sql
        .query_map(
            "SELECT msg_id FROM download",
            (),
            |row| {
                let msg_id: MsgId = row.get(0)?;
                Ok(msg_id)
            },
            |rowids| {
                rowids
                    .collect::<std::result::Result<Vec<_>, _>>()
                    .map_err(Into::into)
            },
        )
        .await?;

    for msg_id in msg_ids {
        if let Err(err) = download_msg(context, msg_id, session).await {
            warn!(context, "Failed to download message {msg_id}: {:#}.", err);

            // Update download state to failure
            // so it can be retried.
            //
            // On success update_download_state() is not needed
            // as receive_imf() already
            // set the state and emitted the event.
            msg_id
                .update_download_state(context, DownloadState::Failure)
                .await?;
        }
        context
            .sql
            .execute("DELETE FROM download WHERE msg_id=?", (msg_id,))
            .await?;
    }

    Ok(())
}

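/// Implements the inbox IMAP loop.
///
/// Prepares an IMAP session for the inbox and then repeatedly runs
/// [`inbox_fetch_idle`], keeping the session across iterations.
/// Terminates when the stop token is cancelled.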
async fn inbox_loop(
    ctx: Context,
    started: oneshot::Sender<()>,
    inbox_handlers: ImapConnectionHandlers,
) {
    use futures::future::FutureExt;

    info!(ctx, "Starting inbox loop.");
    let ImapConnectionHandlers {
        mut connection,
        stop_token,
    } = inbox_handlers;

    let ctx1 = ctx.clone();
    let fut = async move {
        let ctx = ctx1;
        if let Err(()) = started.send(()) {
            warn!(ctx, "Inbox loop, missing started receiver.");
            return;
        };

        let mut old_session: Option<Session> = None;
        loop {
            let session = if let Some(session) = old_session.take() {
                session
            } else {
                info!(ctx, "Preparing new IMAP session for inbox.");
                match connection.prepare(&ctx).await {
                    Err(err) => {
                        warn!(ctx, "Failed to prepare inbox connection: {err:#}.");
                        continue;
                    }
                    Ok(session) => session,
                }
            };

            match inbox_fetch_idle(&ctx, &mut connection, session).await {
                Err(err) => warn!(ctx, "Failed inbox fetch_idle: {err:#}."),
                Ok(session) => {
                    info!(
                        ctx,
                        "IMAP loop iteration for inbox finished, keeping the session."
                    );
                    old_session = Some(session);
                }
            }
        }
    };

    stop_token
        .cancelled()
        .map(|_| {
            info!(ctx, "Shutting down inbox loop.");
        })
        .race(fut)
        .await;
}

/// Converts a folder meaning to the corresponding folder configuration;
/// used internally by [`fetch_idle`] and [`Context::background_fetch`].
///
/// Returns the folder configuration key and folder name
/// if such a folder is configured, `Ok(None)` otherwise.
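///
/// A minimal usage sketch (illustrative only; assumes a configured `Context` named `ctx`):
///
/// ```ignore
/// if let Some((folder_config, folder)) =
///     convert_folder_meaning(&ctx, FolderMeaning::Mvbox).await?
/// {
///     info!(ctx, "{folder_config} is configured as folder {folder:?}.");
/// }
/// ```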
pub async fn convert_folder_meaning(
    ctx: &Context,
    folder_meaning: FolderMeaning,
) -> Result<Option<(Config, String)>> {
    let folder_config = match folder_meaning.to_config() {
        Some(c) => c,
        None => {
            // Such a folder cannot be configured,
            // e.g. a `FolderMeaning::Spam` folder.
            return Ok(None);
        }
    };

    let folder = ctx
        .get_config(folder_config)
        .await
        .with_context(|| format!("Failed to retrieve {folder_config} folder"))?;

    if let Some(watch_folder) = folder {
        Ok(Some((folder_config, watch_folder)))
    } else {
        Ok(None)
    }
}

async fn inbox_fetch_idle(ctx: &Context, imap: &mut Imap, mut session: Session) -> Result<Session> {
    if !ctx.get_config_bool(Config::FixIsChatmail).await? {
        ctx.set_config_internal(
            Config::IsChatmail,
            crate::config::from_bool(session.is_chatmail()),
        )
        .await?;
    }

    // Update quota no more than once a minute.
    if ctx.quota_needs_update(60).await {
        if let Err(err) = ctx.update_recent_quota(&mut session).await {
            warn!(ctx, "Failed to update quota: {:#}.", err);
        }
    }

    let resync_requested = ctx.resync_request.swap(false, Ordering::Relaxed);
    if resync_requested {
        if let Err(err) = session.resync_folders(ctx).await {
            warn!(ctx, "Failed to resync folders: {:#}.", err);
            ctx.resync_request.store(true, Ordering::Relaxed);
        }
    }

    maybe_add_time_based_warnings(ctx).await;

    match ctx.get_config_i64(Config::LastHousekeeping).await {
        Ok(last_housekeeping_time) => {
            let next_housekeeping_time =
                last_housekeeping_time.saturating_add(constants::HOUSEKEEPING_PERIOD);
            if next_housekeeping_time <= time() {
                sql::housekeeping(ctx).await.log_err(ctx).ok();
            }
        }
        Err(err) => {
            warn!(ctx, "Failed to get last housekeeping time: {}", err);
        }
    };

    match ctx.get_config_bool(Config::FetchedExistingMsgs).await {
        Ok(fetched_existing_msgs) => {
            if !fetched_existing_msgs {
                // Consider it done even if we fail.
                //
                // This operation is not critical enough to retry,
                // especially if the error is persistent.
                if let Err(err) = ctx
                    .set_config_internal(Config::FetchedExistingMsgs, config::from_bool(true))
                    .await
                {
                    warn!(ctx, "Can't set Config::FetchedExistingMsgs: {:#}", err);
                }

                if let Err(err) = imap.fetch_existing_msgs(ctx, &mut session).await {
                    warn!(ctx, "Failed to fetch existing messages: {:#}", err);
                }
            }
        }
        Err(err) => {
            warn!(ctx, "Can't get Config::FetchedExistingMsgs: {:#}", err);
        }
    }

    download_msgs(ctx, &mut session)
        .await
        .context("Failed to download messages")?;
    session
        .fetch_metadata(ctx)
        .await
        .context("Failed to fetch metadata")?;
    session
        .register_token(ctx)
        .await
        .context("Failed to register push token")?;

    let session = fetch_idle(ctx, imap, session, FolderMeaning::Inbox).await?;
    Ok(session)
}

/// Implements a single iteration of the IMAP loop.
///
/// This function performs all IMAP operations on a single folder, selecting it if necessary and
/// handling all the errors. In case of an error, the error is returned and the connection is
/// dropped; otherwise the session is returned.
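///
/// Roughly: for the inbox this also sends sync messages, uploads Seen flags and scans
/// the other folders; for every folder it fetches/moves/deletes messages, synchronizes
/// Seen flags and finally goes IDLE (or fake idle if IDLE is unsupported or disabled).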
async fn fetch_idle(
    ctx: &Context,
    connection: &mut Imap,
    mut session: Session,
    folder_meaning: FolderMeaning,
) -> Result<Session> {
    let Some((folder_config, watch_folder)) = convert_folder_meaning(ctx, folder_meaning).await?
    else {
        // The folder is not configured.
        // For example, this happens if the server does not have a Sent folder
        // but watching the Sent folder is enabled.
        connection.connectivity.set_not_configured(ctx).await;
        connection.idle_interrupt_receiver.recv().await.ok();
        bail!("Cannot fetch folder {folder_meaning} because it is not configured");
    };

    if folder_config == Config::ConfiguredInboxFolder {
        let mvbox;
        let syncbox = match ctx.should_move_sync_msgs().await? {
            false => &watch_folder,
            true => {
                mvbox = ctx.get_config(Config::ConfiguredMvboxFolder).await?;
                mvbox.as_deref().unwrap_or(&watch_folder)
            }
        };
        session
            .send_sync_msgs(ctx, syncbox)
            .await
            .context("fetch_idle: send_sync_msgs")
            .log_err(ctx)
            .ok();

        session
            .store_seen_flags_on_imap(ctx)
            .await
            .context("store_seen_flags_on_imap")?;
    }

    if !ctx.should_delete_to_trash().await?
        || ctx
            .get_config(Config::ConfiguredTrashFolder)
            .await?
            .is_some()
    {
        // Fetch the watched folder.
        connection
            .fetch_move_delete(ctx, &mut session, &watch_folder, folder_meaning)
            .await
            .context("fetch_move_delete")?;

        // Mark expired messages for deletion. Marked messages will be deleted from the server
        // on the next iteration of `fetch_move_delete`. `delete_expired_imap_messages` is not
        // called right before `fetch_move_delete` because it is not well optimized and would
        // otherwise slow down message fetching.
        delete_expired_imap_messages(ctx)
            .await
            .context("delete_expired_imap_messages")?;
    } else if folder_config == Config::ConfiguredInboxFolder {
        ctx.last_full_folder_scan.lock().await.take();
    }

    // Scan additional folders only after finishing fetching the watched folder.
    //
    // On iOS the application has strictly limited time to work in the background, so we may not
    // be able to scan all folders before time is up if there are many of them.
    if folder_config == Config::ConfiguredInboxFolder {
        // Only scan on the Inbox thread in order to prevent parallel scans, which might lead to
        // duplicate messages.
        match connection
            .scan_folders(ctx, &mut session)
            .await
            .context("scan_folders")
        {
            Err(err) => {
                // Don't reconnect: if there is a problem with the connection, we will notice it
                // when IDLEing; maybe it is just one folder that can't be selected.
                warn!(ctx, "{:#}", err);
            }
            Ok(true) => {
                // Fetch the watched folder again in case scanning other folders moved messages
                // there.
                //
                // In most cases this will select the watched folder and return because there are
                // no new messages. We want to select the watched folder anyway before going IDLE
                // there, so this does not take an additional protocol round-trip.
                connection
                    .fetch_move_delete(ctx, &mut session, &watch_folder, folder_meaning)
                    .await
                    .context("fetch_move_delete after scan_folders")?;
            }
            Ok(false) => {}
        }
    }

    // Synchronize Seen flags.
    session
        .sync_seen_flags(ctx, &watch_folder)
        .await
        .context("sync_seen_flags")
        .log_err(ctx)
        .ok();

    connection.connectivity.set_idle(ctx).await;

    ctx.emit_event(EventType::ImapInboxIdle);

    if !session.can_idle() {
        info!(
            ctx,
            "IMAP session does not support IDLE, going to fake idle."
        );
        connection.fake_idle(ctx, watch_folder).await?;
        return Ok(session);
    }

    if ctx
        .get_config_bool(Config::DisableIdle)
        .await
        .context("Failed to get disable_idle config")
        .log_err(ctx)
        .unwrap_or_default()
    {
        info!(ctx, "IMAP IDLE is disabled, going to fake idle.");
        connection.fake_idle(ctx, watch_folder).await?;
        return Ok(session);
    }

    info!(
        ctx,
        "IMAP session in folder {watch_folder:?} supports IDLE, using it."
    );
    let session = session
        .idle(
            ctx,
            connection.idle_interrupt_receiver.clone(),
            &watch_folder,
        )
        .await
        .context("idle")?;

    Ok(session)
}

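/// Implements the IMAP loop for an additional folder (mvbox or sentbox).
///
/// Prepares an IMAP session for the given folder and then repeatedly runs
/// [`fetch_idle`], keeping the session across iterations.
/// Terminates when the stop token is cancelled.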
async fn simple_imap_loop(
    ctx: Context,
    started: oneshot::Sender<()>,
    inbox_handlers: ImapConnectionHandlers,
    folder_meaning: FolderMeaning,
) {
    use futures::future::FutureExt;

    info!(ctx, "Starting simple loop for {folder_meaning}.");
    let ImapConnectionHandlers {
        mut connection,
        stop_token,
    } = inbox_handlers;

    let ctx1 = ctx.clone();

    let fut = async move {
        let ctx = ctx1;
        if let Err(()) = started.send(()) {
            warn!(
                ctx,
                "Simple imap loop for {folder_meaning}, missing started receiver."
            );
            return;
        }

        let mut old_session: Option<Session> = None;
        loop {
            let session = if let Some(session) = old_session.take() {
                session
            } else {
                info!(ctx, "Preparing new IMAP session for {folder_meaning}.");
                match connection.prepare(&ctx).await {
                    Err(err) => {
                        warn!(
                            ctx,
                            "Failed to prepare {folder_meaning} connection: {err:#}."
                        );
                        continue;
                    }
                    Ok(session) => session,
                }
            };

            match fetch_idle(&ctx, &mut connection, session, folder_meaning).await {
                Err(err) => warn!(ctx, "Failed fetch_idle: {err:#}"),
                Ok(session) => {
                    info!(
                        ctx,
                        "IMAP loop iteration for {folder_meaning} finished, keeping the session"
                    );
                    old_session = Some(session);
                }
            }
        }
    };

    stop_token
        .cancelled()
        .map(|_| {
            info!(ctx, "Shutting down IMAP loop for {folder_meaning}.");
        })
        .race(fut)
        .await;
}

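/// Implements the SMTP loop.
///
/// Sends queued SMTP messages and then waits in fake idle until interrupted.
/// If sending fails, it retries with an increasing timeout.
/// Terminates when the stop token is cancelled.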
async fn smtp_loop(
    ctx: Context,
    started: oneshot::Sender<()>,
    smtp_handlers: SmtpConnectionHandlers,
) {
    use futures::future::FutureExt;

    info!(ctx, "Starting SMTP loop.");
    let SmtpConnectionHandlers {
        mut connection,
        stop_token,
        idle_interrupt_receiver,
    } = smtp_handlers;

    let ctx1 = ctx.clone();
    let fut = async move {
        let ctx = ctx1;
        if let Err(()) = started.send(()) {
            warn!(&ctx, "SMTP loop, missing started receiver.");
            return;
        }

        let mut timeout = None;
        loop {
            if let Err(err) = send_smtp_messages(&ctx, &mut connection).await {
                warn!(ctx, "send_smtp_messages failed: {:#}.", err);
                timeout = Some(timeout.unwrap_or(30));
            } else {
                timeout = None;
                let duration_until_can_send = ctx.ratelimit.read().await.until_can_send();
                if !duration_until_can_send.is_zero() {
                    info!(
                        ctx,
                        "SMTP got rate limited, waiting for {} until we can send again",
                        duration_to_str(duration_until_can_send)
                    );
                    tokio::time::sleep(duration_until_can_send).await;
                    continue;
                }
            }

            // Fake idle.
            info!(ctx, "SMTP fake idle started.");
            match &connection.last_send_error {
                None => connection.connectivity.set_idle(&ctx).await,
                Some(err) => connection.connectivity.set_err(&ctx, err).await,
            }

            // If send_smtp_messages() failed, we set a timeout for the fake-idle so that
            // sending is retried (at the latest) after the timeout. If sending fails
            // again, we increase the timeout exponentially, in order not to do lots of
            // unnecessary retries.
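            //
            // Illustrative numbers (not part of the original code): with t = 30 and an
            // uninterrupted sleep of about 30 seconds, the next timeout becomes
            // max(30, 30 + rand(15..=30)), i.e. roughly 45 to 60 seconds.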
            if let Some(t) = timeout {
                let now = tools::Time::now();
                info!(
                    ctx,
                    "SMTP has messages to retry, planning to retry {t} seconds later."
                );
                let duration = std::time::Duration::from_secs(t);
                tokio::time::timeout(duration, async {
                    idle_interrupt_receiver.recv().await.unwrap_or_default()
                })
                .await
                .unwrap_or_default();
                let slept = time_elapsed(&now).as_secs();
                timeout = Some(cmp::max(
                    t,
                    slept.saturating_add(rand::thread_rng().gen_range((slept / 2)..=slept)),
                ));
            } else {
                info!(ctx, "SMTP has no messages to retry, waiting for interrupt.");
                idle_interrupt_receiver.recv().await.unwrap_or_default();
            };

            info!(ctx, "SMTP fake idle interrupted.")
        }
    };

    stop_token
        .cancelled()
        .map(|_| {
            info!(ctx, "Shutting down SMTP loop.");
        })
        .race(fut)
        .await;
}

impl Scheduler {
    /// Start the scheduler.
    pub async fn start(ctx: &Context) -> Result<Self> {
        let (smtp, smtp_handlers) = SmtpConnectionState::new();

        let (smtp_start_send, smtp_start_recv) = oneshot::channel();
        let (ephemeral_interrupt_send, ephemeral_interrupt_recv) = channel::bounded(1);
        let (location_interrupt_send, location_interrupt_recv) = channel::bounded(1);

        let mut oboxes = Vec::new();
        let mut start_recvs = Vec::new();

        let (conn_state, inbox_handlers) = ImapConnectionState::new(ctx).await?;
        let (inbox_start_send, inbox_start_recv) = oneshot::channel();
        let handle = {
            let ctx = ctx.clone();
            task::spawn(inbox_loop(ctx, inbox_start_send, inbox_handlers))
        };
        let inbox = SchedBox {
            meaning: FolderMeaning::Inbox,
            conn_state,
            handle,
        };
        start_recvs.push(inbox_start_recv);

        for (meaning, should_watch) in [
            (FolderMeaning::Mvbox, ctx.should_watch_mvbox().await),
            (FolderMeaning::Sent, ctx.should_watch_sentbox().await),
        ] {
            if should_watch? {
                let (conn_state, handlers) = ImapConnectionState::new(ctx).await?;
                let (start_send, start_recv) = oneshot::channel();
                let ctx = ctx.clone();
                let handle = task::spawn(simple_imap_loop(ctx, start_send, handlers, meaning));
                oboxes.push(SchedBox {
                    meaning,
                    conn_state,
                    handle,
                });
                start_recvs.push(start_recv);
            }
        }

        let smtp_handle = {
            let ctx = ctx.clone();
            task::spawn(smtp_loop(ctx, smtp_start_send, smtp_handlers))
        };
        start_recvs.push(smtp_start_recv);

        let ephemeral_handle = {
            let ctx = ctx.clone();
            task::spawn(async move {
                ephemeral::ephemeral_loop(&ctx, ephemeral_interrupt_recv).await;
            })
        };

        let location_handle = {
            let ctx = ctx.clone();
            task::spawn(async move {
                location::location_loop(&ctx, location_interrupt_recv).await;
            })
        };

        let recently_seen_loop = RecentlySeenLoop::new(ctx.clone());

        let res = Self {
            inbox,
            oboxes,
            smtp,
            smtp_handle,
            ephemeral_handle,
            ephemeral_interrupt_send,
            location_handle,
            location_interrupt_send,
            recently_seen_loop,
        };

        // Wait for all loops to be started.
        if let Err(err) = try_join_all(start_recvs).await {
            bail!("failed to start scheduler: {}", err);
        }

        info!(ctx, "scheduler is running");
        Ok(res)
    }

    fn boxes(&self) -> iter::Chain<iter::Once<&SchedBox>, std::slice::Iter<'_, SchedBox>> {
        once(&self.inbox).chain(self.oboxes.iter())
    }

    fn maybe_network(&self) {
        for b in self.boxes() {
            b.conn_state.interrupt();
        }
        self.interrupt_smtp();
    }

    fn maybe_network_lost(&self) {
        for b in self.boxes() {
            b.conn_state.interrupt();
        }
        self.interrupt_smtp();
    }

    fn interrupt_inbox(&self) {
        self.inbox.conn_state.interrupt();
    }

    fn interrupt_oboxes(&self) {
        for b in &self.oboxes {
            b.conn_state.interrupt();
        }
    }

    fn interrupt_smtp(&self) {
        self.smtp.interrupt();
    }

    fn interrupt_ephemeral_task(&self) {
        self.ephemeral_interrupt_send.try_send(()).ok();
    }

    fn interrupt_location(&self) {
        self.location_interrupt_send.try_send(()).ok();
    }

    fn interrupt_recently_seen(&self, contact_id: ContactId, timestamp: i64) {
        self.recently_seen_loop.try_interrupt(contact_id, timestamp);
    }

    /// Halt the scheduler.
    ///
    /// It consumes the scheduler and never fails to stop it. In the worst case, long-running tasks
    /// are forcefully terminated if they cannot shut down within the timeout.
    pub(crate) async fn stop(self, context: &Context) {
        // Send stop signals to tasks so they can shut down cleanly.
        for b in self.boxes() {
            b.conn_state.stop();
        }
        self.smtp.stop();

        // Actually shut down tasks.
        let timeout_duration = std::time::Duration::from_secs(30);

        let tracker = TaskTracker::new();
        for b in once(self.inbox).chain(self.oboxes) {
            let context = context.clone();
            tracker.spawn(async move {
                tokio::time::timeout(timeout_duration, b.handle)
                    .await
                    .log_err(&context)
            });
        }
        {
            let context = context.clone();
            tracker.spawn(async move {
                tokio::time::timeout(timeout_duration, self.smtp_handle)
                    .await
                    .log_err(&context)
            });
        }
        tracker.close();
        tracker.wait().await;

        // Abort tasks, then await them to ensure the `Future` is dropped.
        // Just aborting the task may keep resources such as a `Context` clone
        // moved into it alive indefinitely, resulting in the database not being
        // closed etc.
        self.ephemeral_handle.abort();
        self.ephemeral_handle.await.ok();
        self.location_handle.abort();
        self.location_handle.await.ok();
        self.recently_seen_loop.abort().await;
    }
}

/// Connection state logic shared between IMAP and SMTP connections.
#[derive(Debug)]
struct ConnectionState {
    /// Cancellation token to interrupt the whole connection.
    stop_token: CancellationToken,
    /// Channel to interrupt idle.
    idle_interrupt_sender: Sender<()>,
    /// Mutex to pass connectivity info between IMAP/SMTP threads and the API.
    connectivity: ConnectivityStore,
}

impl ConnectionState {
    /// Shut down this connection completely.
    fn stop(&self) {
        // Trigger shutdown of the run loop.
        self.stop_token.cancel();
    }

    fn interrupt(&self) {
        // Use try_send to avoid blocking on interrupts.
        self.idle_interrupt_sender.try_send(()).ok();
    }
}

#[derive(Debug)]
pub(crate) struct SmtpConnectionState {
    state: ConnectionState,
}

impl SmtpConnectionState {
    fn new() -> (Self, SmtpConnectionHandlers) {
        let stop_token = CancellationToken::new();
        let (idle_interrupt_sender, idle_interrupt_receiver) = channel::bounded(1);

        let handlers = SmtpConnectionHandlers {
            connection: Smtp::new(),
            stop_token: stop_token.clone(),
            idle_interrupt_receiver,
        };

        let state = ConnectionState {
            stop_token,
            idle_interrupt_sender,
            connectivity: handlers.connection.connectivity.clone(),
        };

        let conn = SmtpConnectionState { state };

        (conn, handlers)
    }

    /// Interrupt any form of idle.
    fn interrupt(&self) {
        self.state.interrupt();
    }

    /// Shut down this connection completely.
    fn stop(&self) {
        self.state.stop();
    }
}

struct SmtpConnectionHandlers {
    connection: Smtp,
    stop_token: CancellationToken,
    idle_interrupt_receiver: Receiver<()>,
}

#[derive(Debug)]
pub(crate) struct ImapConnectionState {
    state: ConnectionState,
}

impl ImapConnectionState {
    /// Construct a new connection.
    async fn new(context: &Context) -> Result<(Self, ImapConnectionHandlers)> {
        let stop_token = CancellationToken::new();
        let (idle_interrupt_sender, idle_interrupt_receiver) = channel::bounded(1);

        let handlers = ImapConnectionHandlers {
            connection: Imap::new_configured(context, idle_interrupt_receiver).await?,
            stop_token: stop_token.clone(),
        };

        let state = ConnectionState {
            stop_token,
            idle_interrupt_sender,
            connectivity: handlers.connection.connectivity.clone(),
        };

        let conn = ImapConnectionState { state };

        Ok((conn, handlers))
    }

    /// Interrupt any form of idle.
    fn interrupt(&self) {
        self.state.interrupt();
    }

    /// Shut down this connection completely.
    fn stop(&self) {
        self.state.stop();
    }
}

#[derive(Debug)]
struct ImapConnectionHandlers {
    connection: Imap,
    stop_token: CancellationToken,
}