1use std::cmp;
2use std::iter::{self, once};
3use std::num::NonZeroUsize;
4use std::sync::atomic::Ordering;
5
6use anyhow::{Context as _, Error, Result, bail};
7use async_channel::{self as channel, Receiver, Sender};
8use futures::future::try_join_all;
9use futures_lite::FutureExt;
10use rand::Rng;
11use tokio::sync::{RwLock, oneshot};
12use tokio::task;
13use tokio_util::sync::CancellationToken;
14use tokio_util::task::TaskTracker;
15
16pub(crate) use self::connectivity::ConnectivityStore;
17use crate::config::{self, Config};
18use crate::constants;
19use crate::contact::{ContactId, RecentlySeenLoop};
20use crate::context::Context;
21use crate::download::{DownloadState, download_msg};
22use crate::ephemeral::{self, delete_expired_imap_messages};
23use crate::events::EventType;
24use crate::imap::{FolderMeaning, Imap, session::Session};
25use crate::location;
26use crate::log::{LogExt, error, info, warn};
27use crate::message::MsgId;
28use crate::smtp::{Smtp, send_smtp_messages};
29use crate::sql;
30use crate::tools::{self, duration_to_str, maybe_add_time_based_warnings, time, time_elapsed};
31
32pub(crate) mod connectivity;
33
/// State of the IO scheduler, reachable as `context.scheduler`.
///
/// Wraps an [`InnerSchedulerState`] in an async `RwLock`; the methods of this type
/// take that lock before inspecting or changing the state.
#[derive(Debug, Default)]
pub(crate) struct SchedulerState {
    /// Current state: started, stopped or paused.
    inner: RwLock<InnerSchedulerState>,
}
43
44impl SchedulerState {
45 pub(crate) fn new() -> Self {
46 Default::default()
47 }
48
49 pub(crate) async fn is_running(&self) -> bool {
51 let inner = self.inner.read().await;
52 matches!(*inner, InnerSchedulerState::Started(_))
53 }
54
55 pub(crate) async fn start(&self, context: &Context) {
57 let mut inner = self.inner.write().await;
58 match *inner {
59 InnerSchedulerState::Started(_) => (),
60 InnerSchedulerState::Stopped => Self::do_start(&mut inner, context).await,
61 InnerSchedulerState::Paused {
62 ref mut started, ..
63 } => *started = true,
64 }
65 context.update_connectivities(&inner);
66 }
67
68 async fn do_start(inner: &mut InnerSchedulerState, context: &Context) {
70 info!(context, "starting IO");
71
72 context.new_msgs_notify.notify_one();
75
76 match Scheduler::start(context).await {
77 Ok(scheduler) => {
78 *inner = InnerSchedulerState::Started(scheduler);
79 context.emit_event(EventType::ConnectivityChanged);
80 }
81 Err(err) => error!(context, "Failed to start IO: {:#}", err),
82 }
83 }
84
85 pub(crate) async fn stop(&self, context: &Context) {
87 let mut inner = self.inner.write().await;
88 match *inner {
89 InnerSchedulerState::Started(_) => {
90 Self::do_stop(&mut inner, context, InnerSchedulerState::Stopped).await
91 }
92 InnerSchedulerState::Stopped => (),
93 InnerSchedulerState::Paused {
94 ref mut started, ..
95 } => *started = false,
96 }
97 context.update_connectivities(&inner);
98 }
99
100 async fn do_stop(
102 inner: &mut InnerSchedulerState,
103 context: &Context,
104 new_state: InnerSchedulerState,
105 ) {
106 info!(context, "stopping IO");
112
113 context.new_msgs_notify.notify_one();
116
117 let debug_logging = context
118 .debug_logging
119 .write()
120 .expect("RwLock is poisoned")
121 .take();
122 if let Some(debug_logging) = debug_logging {
123 debug_logging.loop_handle.abort();
124 debug_logging.loop_handle.await.ok();
125 }
126 let prev_state = std::mem::replace(inner, new_state);
127 context.emit_event(EventType::ConnectivityChanged);
128 match prev_state {
129 InnerSchedulerState::Started(scheduler) => scheduler.stop(context).await,
130 InnerSchedulerState::Stopped | InnerSchedulerState::Paused { .. } => (),
131 }
132 }
133
134 pub(crate) async fn pause(&'_ self, context: &Context) -> Result<IoPausedGuard> {
143 {
144 let mut inner = self.inner.write().await;
145 match *inner {
146 InnerSchedulerState::Started(_) => {
147 let new_state = InnerSchedulerState::Paused {
148 started: true,
149 pause_guards_count: NonZeroUsize::new(1).unwrap(),
150 };
151 Self::do_stop(&mut inner, context, new_state).await;
152 }
153 InnerSchedulerState::Stopped => {
154 *inner = InnerSchedulerState::Paused {
155 started: false,
156 pause_guards_count: NonZeroUsize::new(1).unwrap(),
157 };
158 }
159 InnerSchedulerState::Paused {
160 ref mut pause_guards_count,
161 ..
162 } => {
163 *pause_guards_count = pause_guards_count
164 .checked_add(1)
165 .ok_or_else(|| Error::msg("Too many pause guards active"))?
166 }
167 }
168 context.update_connectivities(&inner);
169 }
170
171 let (tx, rx) = oneshot::channel();
172 let context = context.clone();
173 tokio::spawn(async move {
174 rx.await.ok();
175 let mut inner = context.scheduler.inner.write().await;
176 match *inner {
177 InnerSchedulerState::Started(_) => {
178 warn!(&context, "IoPausedGuard resume: started instead of paused");
179 }
180 InnerSchedulerState::Stopped => {
181 warn!(&context, "IoPausedGuard resume: stopped instead of paused");
182 }
183 InnerSchedulerState::Paused {
184 ref started,
185 ref mut pause_guards_count,
186 } => {
187 if *pause_guards_count == NonZeroUsize::new(1).unwrap() {
188 match *started {
189 true => SchedulerState::do_start(&mut inner, &context).await,
190 false => *inner = InnerSchedulerState::Stopped,
191 }
192 } else {
193 let new_count = pause_guards_count.get() - 1;
194 *pause_guards_count = NonZeroUsize::new(new_count).unwrap();
196 }
197 }
198 }
199 context.update_connectivities(&inner);
200 });
201 Ok(IoPausedGuard { sender: Some(tx) })
202 }
203
204 pub(crate) async fn restart(&self, context: &Context) {
206 info!(context, "restarting IO");
207 if self.is_running().await {
208 self.stop(context).await;
209 self.start(context).await;
210 }
211 }
212
213 pub(crate) async fn maybe_network(&self) {
215 let inner = self.inner.read().await;
216 let (inbox, oboxes) = match *inner {
217 InnerSchedulerState::Started(ref scheduler) => {
218 scheduler.maybe_network();
219 let inbox = scheduler.inbox.conn_state.state.connectivity.clone();
220 let oboxes = scheduler
221 .oboxes
222 .iter()
223 .map(|b| b.conn_state.state.connectivity.clone())
224 .collect::<Vec<_>>();
225 (inbox, oboxes)
226 }
227 _ => return,
228 };
229 drop(inner);
230 connectivity::idle_interrupted(inbox, oboxes).await;
231 }
232
233 pub(crate) async fn maybe_network_lost(&self, context: &Context) {
235 let inner = self.inner.read().await;
236 let stores = match *inner {
237 InnerSchedulerState::Started(ref scheduler) => {
238 scheduler.maybe_network_lost();
239 scheduler
240 .boxes()
241 .map(|b| b.conn_state.state.connectivity.clone())
242 .collect()
243 }
244 _ => return,
245 };
246 drop(inner);
247 connectivity::maybe_network_lost(context, stores).await;
248 }
249
250 pub(crate) async fn interrupt_inbox(&self) {
251 let inner = self.inner.read().await;
252 if let InnerSchedulerState::Started(ref scheduler) = *inner {
253 scheduler.interrupt_inbox();
254 }
255 }
256
257 pub(crate) async fn interrupt_oboxes(&self) {
259 let inner = self.inner.read().await;
260 if let InnerSchedulerState::Started(ref scheduler) = *inner {
261 scheduler.interrupt_oboxes();
262 }
263 }
264
265 pub(crate) async fn interrupt_smtp(&self) {
266 let inner = self.inner.read().await;
267 if let InnerSchedulerState::Started(ref scheduler) = *inner {
268 scheduler.interrupt_smtp();
269 }
270 }
271
272 pub(crate) async fn interrupt_ephemeral_task(&self) {
273 let inner = self.inner.read().await;
274 if let InnerSchedulerState::Started(ref scheduler) = *inner {
275 scheduler.interrupt_ephemeral_task();
276 }
277 }
278
279 pub(crate) async fn interrupt_location(&self) {
280 let inner = self.inner.read().await;
281 if let InnerSchedulerState::Started(ref scheduler) = *inner {
282 scheduler.interrupt_location();
283 }
284 }
285
286 pub(crate) async fn interrupt_recently_seen(&self, contact_id: ContactId, timestamp: i64) {
287 let inner = self.inner.read().await;
288 if let InnerSchedulerState::Started(ref scheduler) = *inner {
289 scheduler.interrupt_recently_seen(contact_id, timestamp);
290 }
291 }
292}
293
/// Internal state of the scheduler, guarded by the lock in [`SchedulerState`].
#[derive(Debug, Default)]
pub(crate) enum InnerSchedulerState {
    /// IO is running; holds the running [`Scheduler`].
    Started(Scheduler),
    /// IO is not running. This is the default state.
    #[default]
    Stopped,
    /// IO is paused by at least one [`IoPausedGuard`].
    Paused {
        /// Whether the scheduler should be started once the last pause guard is gone.
        ///
        /// Set by `SchedulerState::start` and cleared by `SchedulerState::stop`
        /// while paused.
        started: bool,
        /// Number of pause guards that have not signalled their end yet.
        pause_guards_count: NonZeroUsize,
    },
}
304
/// Guard returned by [`SchedulerState::pause`].
///
/// Dropping the guard signals the task spawned by `pause` that this particular
/// pause is over.
#[derive(Default, Debug)]
pub(crate) struct IoPausedGuard {
    /// Sender used to signal the end of the pause; taken out (left as `None`) on drop.
    /// A default-constructed guard has no sender and signals nothing.
    sender: Option<oneshot::Sender<()>>,
}
313
314impl Drop for IoPausedGuard {
315 fn drop(&mut self) {
316 if let Some(sender) = self.sender.take() {
317 sender.send(()).ok();
319 }
320 }
321}
322
/// One IMAP folder loop managed by the [`Scheduler`].
#[derive(Debug)]
struct SchedBox {
    /// Meaning of the folder this loop was started for.
    meaning: FolderMeaning,
    /// Handle used to interrupt and stop the loop and to read its connectivity.
    conn_state: ImapConnectionState,

    /// Join handle of the spawned loop task.
    handle: task::JoinHandle<()>,
}
331
/// Handles of all background loops that run while IO is started.
#[derive(Debug)]
pub(crate) struct Scheduler {
    /// The inbox IMAP loop.
    inbox: SchedBox,
    /// Additional IMAP loops; `Scheduler::start` adds one for the mvbox and one for
    /// the sent folder when watching them is enabled.
    oboxes: Vec<SchedBox>,
    /// Handle used to interrupt and stop the SMTP loop.
    smtp: SmtpConnectionState,
    /// Join handle of the SMTP loop task.
    smtp_handle: task::JoinHandle<()>,
    /// Join handle of the ephemeral messages loop task.
    ephemeral_handle: task::JoinHandle<()>,
    /// Sender used to interrupt the ephemeral messages loop.
    ephemeral_interrupt_send: Sender<()>,
    /// Join handle of the location loop task.
    location_handle: task::JoinHandle<()>,
    /// Sender used to interrupt the location loop.
    location_interrupt_send: Sender<()>,

    /// The recently-seen loop.
    recently_seen_loop: RecentlySeenLoop,
}
347
348async fn download_msgs(context: &Context, session: &mut Session) -> Result<()> {
349 let msg_ids = context
350 .sql
351 .query_map(
352 "SELECT msg_id FROM download",
353 (),
354 |row| {
355 let msg_id: MsgId = row.get(0)?;
356 Ok(msg_id)
357 },
358 |rowids| {
359 rowids
360 .collect::<std::result::Result<Vec<_>, _>>()
361 .map_err(Into::into)
362 },
363 )
364 .await?;
365
366 for msg_id in msg_ids {
367 if let Err(err) = download_msg(context, msg_id, session).await {
368 warn!(context, "Failed to download message {msg_id}: {:#}.", err);
369
370 msg_id
377 .update_download_state(context, DownloadState::Failure)
378 .await?;
379 }
380 context
381 .sql
382 .execute("DELETE FROM download WHERE msg_id=?", (msg_id,))
383 .await?;
384 }
385
386 Ok(())
387}
388
389async fn inbox_loop(
390 ctx: Context,
391 started: oneshot::Sender<()>,
392 inbox_handlers: ImapConnectionHandlers,
393) {
394 use futures::future::FutureExt;
395
396 info!(ctx, "Starting inbox loop.");
397 let ImapConnectionHandlers {
398 mut connection,
399 stop_token,
400 } = inbox_handlers;
401
402 let ctx1 = ctx.clone();
403 let fut = async move {
404 let ctx = ctx1;
405 if let Err(()) = started.send(()) {
406 warn!(ctx, "Inbox loop, missing started receiver.");
407 return;
408 };
409
410 let mut old_session: Option<Session> = None;
411 loop {
412 let session = if let Some(session) = old_session.take() {
413 session
414 } else {
415 info!(ctx, "Preparing new IMAP session for inbox.");
416 match connection.prepare(&ctx).await {
417 Err(err) => {
418 warn!(ctx, "Failed to prepare inbox connection: {err:#}.");
419 continue;
420 }
421 Ok(session) => session,
422 }
423 };
424
425 match inbox_fetch_idle(&ctx, &mut connection, session).await {
426 Err(err) => warn!(ctx, "Failed inbox fetch_idle: {err:#}."),
427 Ok(session) => {
428 info!(
429 ctx,
430 "IMAP loop iteration for inbox finished, keeping the session."
431 );
432 old_session = Some(session);
433 }
434 }
435 }
436 };
437
438 stop_token
439 .cancelled()
440 .map(|_| {
441 info!(ctx, "Shutting down inbox loop.");
442 })
443 .race(fut)
444 .await;
445}
446
447pub async fn convert_folder_meaning(
453 ctx: &Context,
454 folder_meaning: FolderMeaning,
455) -> Result<Option<(Config, String)>> {
456 let folder_config = match folder_meaning.to_config() {
457 Some(c) => c,
458 None => {
459 return Ok(None);
462 }
463 };
464
465 let folder = ctx
466 .get_config(folder_config)
467 .await
468 .with_context(|| format!("Failed to retrieve {folder_config} folder"))?;
469
470 if let Some(watch_folder) = folder {
471 Ok(Some((folder_config, watch_folder)))
472 } else {
473 Ok(None)
474 }
475}
476
/// Runs one iteration of the inbox loop.
///
/// Performs the inbox-only maintenance work (chatmail flag, quota, folder resync,
/// time-based warnings, housekeeping, one-time fetch of existing messages, queued
/// downloads, metadata, push token registration) and then calls [`fetch_idle`]
/// for the inbox.
///
/// Returns the session so the caller can reuse it for the next iteration.
///
/// # Errors
///
/// Returns an error if one of the steps propagated with `?` fails; several steps
/// only log their failure instead.
async fn inbox_fetch_idle(ctx: &Context, imap: &mut Imap, mut session: Session) -> Result<Session> {
    // Unless `FixIsChatmail` is set, store what the session reports as `IsChatmail`.
    if !ctx.get_config_bool(Config::FixIsChatmail).await? {
        ctx.set_config_internal(
            Config::IsChatmail,
            crate::config::from_bool(session.is_chatmail()),
        )
        .await?;
    }

    // NOTE(review): `60` is presumably a rate limit in seconds — confirm against
    // `quota_needs_update`.
    if ctx.quota_needs_update(60).await {
        if let Err(err) = ctx.update_recent_quota(&mut session).await {
            warn!(ctx, "Failed to update quota: {:#}.", err);
        }
    }

    // Consume the resync request flag; set it again if resyncing fails.
    let resync_requested = ctx.resync_request.swap(false, Ordering::Relaxed);
    if resync_requested {
        if let Err(err) = session.resync_folders(ctx).await {
            warn!(ctx, "Failed to resync folders: {:#}.", err);
            ctx.resync_request.store(true, Ordering::Relaxed);
        }
    }

    maybe_add_time_based_warnings(ctx).await;

    // Run housekeeping once `HOUSEKEEPING_PERIOD` has passed since the last run.
    match ctx.get_config_i64(Config::LastHousekeeping).await {
        Ok(last_housekeeping_time) => {
            let next_housekeeping_time =
                last_housekeeping_time.saturating_add(constants::HOUSEKEEPING_PERIOD);
            if next_housekeeping_time <= time() {
                sql::housekeeping(ctx).await.log_err(ctx).ok();
            }
        }
        Err(err) => {
            warn!(ctx, "Failed to get last housekeeping time: {}", err);
        }
    };

    match ctx.get_config_bool(Config::FetchedExistingMsgs).await {
        Ok(fetched_existing_msgs) => {
            if !fetched_existing_msgs {
                // The flag is set before fetching, so the fetch is not repeated
                // even if it fails below.
                if let Err(err) = ctx
                    .set_config_internal(Config::FetchedExistingMsgs, config::from_bool(true))
                    .await
                {
                    warn!(ctx, "Can't set Config::FetchedExistingMsgs: {:#}", err);
                }

                if let Err(err) = imap.fetch_existing_msgs(ctx, &mut session).await {
                    warn!(ctx, "Failed to fetch existing messages: {:#}", err);
                }
            }
        }
        Err(err) => {
            warn!(ctx, "Can't get Config::FetchedExistingMsgs: {:#}", err);
        }
    }

    // Unlike the steps above, failures of the following steps abort the iteration.
    download_msgs(ctx, &mut session)
        .await
        .context("Failed to download messages")?;
    session
        .fetch_metadata(ctx)
        .await
        .context("Failed to fetch metadata")?;
    session
        .register_token(ctx)
        .await
        .context("Failed to register push token")?;

    let session = fetch_idle(ctx, imap, session, FolderMeaning::Inbox).await?;
    Ok(session)
}
555
/// Fetches the folder with the given meaning and then waits (IDLE or fake idle)
/// for new activity.
///
/// This function is used by the inbox loop (via `inbox_fetch_idle`) and by
/// `simple_imap_loop` for the other watched folders. Some steps are only executed
/// when the folder is the configured inbox folder.
///
/// Returns the session so the caller can reuse it for the next iteration.
///
/// # Errors
///
/// Returns an error if the folder is not configured, or if one of the steps
/// propagated with `?` fails; some steps only log their failure.
async fn fetch_idle(
    ctx: &Context,
    connection: &mut Imap,
    mut session: Session,
    folder_meaning: FolderMeaning,
) -> Result<Session> {
    let Some((folder_config, watch_folder)) = convert_folder_meaning(ctx, folder_meaning).await?
    else {
        // No folder is configured for this meaning: report it as not configured
        // and wait for an interrupt before returning the error.
        connection.connectivity.set_not_configured(ctx).await;
        connection.idle_interrupt_receiver.recv().await.ok();
        bail!("Cannot fetch folder {folder_meaning} because it is not configured");
    };

    if folder_config == Config::ConfiguredInboxFolder {
        // Sync messages go to the mvbox if `should_move_sync_msgs()` says so and an
        // mvbox is configured; otherwise to the watched folder.
        let mvbox;
        let syncbox = match ctx.should_move_sync_msgs().await? {
            false => &watch_folder,
            true => {
                mvbox = ctx.get_config(Config::ConfiguredMvboxFolder).await?;
                mvbox.as_deref().unwrap_or(&watch_folder)
            }
        };
        // Failure to send sync messages is logged but does not abort the iteration.
        session
            .send_sync_msgs(ctx, syncbox)
            .await
            .context("fetch_idle: send_sync_msgs")
            .log_err(ctx)
            .ok();

        session
            .store_seen_flags_on_imap(ctx)
            .await
            .context("store_seen_flags_on_imap")?;
    }

    // Fetch only if messages are not deleted to trash, or if a trash folder is configured.
    if !ctx.should_delete_to_trash().await?
        || ctx
            .get_config(Config::ConfiguredTrashFolder)
            .await?
            .is_some()
    {
        // Fetch the watched folder.
        connection
            .fetch_move_delete(ctx, &mut session, &watch_folder, folder_meaning)
            .await
            .context("fetch_move_delete")?;

        // Note that this runs after `fetch_move_delete`, not before it.
        delete_expired_imap_messages(ctx)
            .await
            .context("delete_expired_imap_messages")?;
    } else if folder_config == Config::ConfiguredInboxFolder {
        // Clear the time of the last full folder scan.
        // NOTE(review): presumably this forces the scan below to run — confirm
        // against `scan_folders`.
        ctx.last_full_folder_scan.lock().await.take();
    }

    // Other folders are scanned only from the inbox loop, after the watched folder
    // has been handled.
    if folder_config == Config::ConfiguredInboxFolder {
        match connection
            .scan_folders(ctx, &mut session)
            .await
            .context("scan_folders")
        {
            Err(err) => {
                // A failed scan is only logged; the iteration continues.
                warn!(ctx, "{:#}", err);
            }
            Ok(true) => {
                // Fetch the watched folder once more after `scan_folders` returned `true`.
                connection
                    .fetch_move_delete(ctx, &mut session, &watch_folder, folder_meaning)
                    .await
                    .context("fetch_move_delete after scan_folders")?;
            }
            Ok(false) => {}
        }
    }

    // Synchronize Seen flags; failure is logged but does not abort the iteration.
    session
        .sync_seen_flags(ctx, &watch_folder)
        .await
        .context("sync_seen_flags")
        .log_err(ctx)
        .ok();

    connection.connectivity.set_idle(ctx).await;

    ctx.emit_event(EventType::ImapInboxIdle);

    if !session.can_idle() {
        info!(
            ctx,
            "IMAP session does not support IDLE, going to fake idle."
        );
        connection.fake_idle(ctx, watch_folder).await?;
        return Ok(session);
    }

    // A failure to read `DisableIdle` is logged and treated as "not disabled".
    if ctx
        .get_config_bool(Config::DisableIdle)
        .await
        .context("Failed to get disable_idle config")
        .log_err(ctx)
        .unwrap_or_default()
    {
        info!(ctx, "IMAP IDLE is disabled, going to fake idle.");
        connection.fake_idle(ctx, watch_folder).await?;
        return Ok(session);
    }

    info!(
        ctx,
        "IMAP session in folder {watch_folder:?} supports IDLE, using it."
    );
    // `idle` consumes the session and returns it again on success.
    let session = session
        .idle(
            ctx,
            connection.idle_interrupt_receiver.clone(),
            &watch_folder,
        )
        .await
        .context("idle")?;

    Ok(session)
}
702
703async fn simple_imap_loop(
704 ctx: Context,
705 started: oneshot::Sender<()>,
706 inbox_handlers: ImapConnectionHandlers,
707 folder_meaning: FolderMeaning,
708) {
709 use futures::future::FutureExt;
710
711 info!(ctx, "Starting simple loop for {folder_meaning}.");
712 let ImapConnectionHandlers {
713 mut connection,
714 stop_token,
715 } = inbox_handlers;
716
717 let ctx1 = ctx.clone();
718
719 let fut = async move {
720 let ctx = ctx1;
721 if let Err(()) = started.send(()) {
722 warn!(
723 ctx,
724 "Simple imap loop for {folder_meaning}, missing started receiver."
725 );
726 return;
727 }
728
729 let mut old_session: Option<Session> = None;
730 loop {
731 let session = if let Some(session) = old_session.take() {
732 session
733 } else {
734 info!(ctx, "Preparing new IMAP session for {folder_meaning}.");
735 match connection.prepare(&ctx).await {
736 Err(err) => {
737 warn!(
738 ctx,
739 "Failed to prepare {folder_meaning} connection: {err:#}."
740 );
741 continue;
742 }
743 Ok(session) => session,
744 }
745 };
746
747 match fetch_idle(&ctx, &mut connection, session, folder_meaning).await {
748 Err(err) => warn!(ctx, "Failed fetch_idle: {err:#}"),
749 Ok(session) => {
750 info!(
751 ctx,
752 "IMAP loop iteration for {folder_meaning} finished, keeping the session"
753 );
754 old_session = Some(session);
755 }
756 }
757 }
758 };
759
760 stop_token
761 .cancelled()
762 .map(|_| {
763 info!(ctx, "Shutting down IMAP loop for {folder_meaning}.");
764 })
765 .race(fut)
766 .await;
767}
768
/// Runs the SMTP loop until the stop token in `smtp_handlers` is cancelled.
///
/// Signals `started` once, then alternates between sending queued messages with
/// `send_smtp_messages` and waiting ("fake idle") for an interrupt. After a failed
/// send the wait is bounded by a retry timeout; otherwise it lasts until the next
/// interrupt. Returns early if the `started` receiver is gone.
async fn smtp_loop(
    ctx: Context,
    started: oneshot::Sender<()>,
    smtp_handlers: SmtpConnectionHandlers,
) {
    use futures::future::FutureExt;

    info!(ctx, "Starting SMTP loop.");
    let SmtpConnectionHandlers {
        mut connection,
        stop_token,
        idle_interrupt_receiver,
    } = smtp_handlers;

    let ctx1 = ctx.clone();
    let fut = async move {
        let ctx = ctx1;
        if let Err(()) = started.send(()) {
            warn!(&ctx, "SMTP loop, missing started receiver.");
            return;
        }

        // Retry timeout in seconds; `None` while there is nothing to retry.
        let mut timeout = None;
        loop {
            if let Err(err) = send_smtp_messages(&ctx, &mut connection).await {
                warn!(ctx, "send_smtp_messages failed: {:#}.", err);
                // Keep the current retry timeout, or start with 30 seconds.
                timeout = Some(timeout.unwrap_or(30));
            } else {
                timeout = None;
                // If the rate limiter requires waiting, sleep and try sending again
                // without going to fake idle.
                let duration_until_can_send = ctx.ratelimit.read().await.until_can_send();
                if !duration_until_can_send.is_zero() {
                    info!(
                        ctx,
                        "smtp got rate limited, waiting for {} until can send again",
                        duration_to_str(duration_until_can_send)
                    );
                    tokio::time::sleep(duration_until_can_send).await;
                    continue;
                }
            }

            // Fake idle: report the connectivity, then wait for an interrupt.
            info!(ctx, "SMTP fake idle started.");
            match &connection.last_send_error {
                None => connection.connectivity.set_idle(&ctx).await,
                Some(err) => connection.connectivity.set_err(&ctx, err).await,
            }

            if let Some(t) = timeout {
                // Sending failed: wait for an interrupt, but at most `t` seconds.
                let now = tools::Time::now();
                info!(
                    ctx,
                    "SMTP has messages to retry, planning to retry {t} seconds later."
                );
                let duration = std::time::Duration::from_secs(t);
                tokio::time::timeout(duration, async {
                    idle_interrupt_receiver.recv().await.unwrap_or_default()
                })
                .await
                .unwrap_or_default();
                // The next timeout is never smaller than the current one: it is the
                // maximum of `t` and the time actually waited plus a random extra
                // between half of that time and all of it.
                let slept = time_elapsed(&now).as_secs();
                timeout = Some(cmp::max(
                    t,
                    slept.saturating_add(rand::thread_rng().gen_range((slept / 2)..=slept)),
                ));
            } else {
                info!(ctx, "SMTP has no messages to retry, waiting for interrupt.");
                idle_interrupt_receiver.recv().await.unwrap_or_default();
            };

            info!(ctx, "SMTP fake idle interrupted.")
        }
    };

    // Whichever finishes first wins: cancellation of the stop token or the loop itself.
    stop_token
        .cancelled()
        .map(|_| {
            info!(ctx, "Shutting down SMTP loop.");
        })
        .race(fut)
        .await;
}
855
856impl Scheduler {
857 pub async fn start(ctx: &Context) -> Result<Self> {
859 let (smtp, smtp_handlers) = SmtpConnectionState::new();
860
861 let (smtp_start_send, smtp_start_recv) = oneshot::channel();
862 let (ephemeral_interrupt_send, ephemeral_interrupt_recv) = channel::bounded(1);
863 let (location_interrupt_send, location_interrupt_recv) = channel::bounded(1);
864
865 let mut oboxes = Vec::new();
866 let mut start_recvs = Vec::new();
867
868 let (conn_state, inbox_handlers) = ImapConnectionState::new(ctx).await?;
869 let (inbox_start_send, inbox_start_recv) = oneshot::channel();
870 let handle = {
871 let ctx = ctx.clone();
872 task::spawn(inbox_loop(ctx, inbox_start_send, inbox_handlers))
873 };
874 let inbox = SchedBox {
875 meaning: FolderMeaning::Inbox,
876 conn_state,
877 handle,
878 };
879 start_recvs.push(inbox_start_recv);
880
881 for (meaning, should_watch) in [
882 (FolderMeaning::Mvbox, ctx.should_watch_mvbox().await),
883 (FolderMeaning::Sent, ctx.should_watch_sentbox().await),
884 ] {
885 if should_watch? {
886 let (conn_state, handlers) = ImapConnectionState::new(ctx).await?;
887 let (start_send, start_recv) = oneshot::channel();
888 let ctx = ctx.clone();
889 let handle = task::spawn(simple_imap_loop(ctx, start_send, handlers, meaning));
890 oboxes.push(SchedBox {
891 meaning,
892 conn_state,
893 handle,
894 });
895 start_recvs.push(start_recv);
896 }
897 }
898
899 let smtp_handle = {
900 let ctx = ctx.clone();
901 task::spawn(smtp_loop(ctx, smtp_start_send, smtp_handlers))
902 };
903 start_recvs.push(smtp_start_recv);
904
905 let ephemeral_handle = {
906 let ctx = ctx.clone();
907 task::spawn(async move {
908 ephemeral::ephemeral_loop(&ctx, ephemeral_interrupt_recv).await;
909 })
910 };
911
912 let location_handle = {
913 let ctx = ctx.clone();
914 task::spawn(async move {
915 location::location_loop(&ctx, location_interrupt_recv).await;
916 })
917 };
918
919 let recently_seen_loop = RecentlySeenLoop::new(ctx.clone());
920
921 let res = Self {
922 inbox,
923 oboxes,
924 smtp,
925 smtp_handle,
926 ephemeral_handle,
927 ephemeral_interrupt_send,
928 location_handle,
929 location_interrupt_send,
930 recently_seen_loop,
931 };
932
933 if let Err(err) = try_join_all(start_recvs).await {
935 bail!("failed to start scheduler: {}", err);
936 }
937
938 info!(ctx, "scheduler is running");
939 Ok(res)
940 }
941
942 fn boxes(&self) -> iter::Chain<iter::Once<&SchedBox>, std::slice::Iter<'_, SchedBox>> {
943 once(&self.inbox).chain(self.oboxes.iter())
944 }
945
946 fn maybe_network(&self) {
947 for b in self.boxes() {
948 b.conn_state.interrupt();
949 }
950 self.interrupt_smtp();
951 }
952
953 fn maybe_network_lost(&self) {
954 for b in self.boxes() {
955 b.conn_state.interrupt();
956 }
957 self.interrupt_smtp();
958 }
959
960 fn interrupt_inbox(&self) {
961 self.inbox.conn_state.interrupt();
962 }
963
964 fn interrupt_oboxes(&self) {
965 for b in &self.oboxes {
966 b.conn_state.interrupt();
967 }
968 }
969
970 fn interrupt_smtp(&self) {
971 self.smtp.interrupt();
972 }
973
974 fn interrupt_ephemeral_task(&self) {
975 self.ephemeral_interrupt_send.try_send(()).ok();
976 }
977
978 fn interrupt_location(&self) {
979 self.location_interrupt_send.try_send(()).ok();
980 }
981
982 fn interrupt_recently_seen(&self, contact_id: ContactId, timestamp: i64) {
983 self.recently_seen_loop.try_interrupt(contact_id, timestamp);
984 }
985
986 pub(crate) async fn stop(self, context: &Context) {
991 for b in self.boxes() {
993 b.conn_state.stop();
994 }
995 self.smtp.stop();
996
997 let timeout_duration = std::time::Duration::from_secs(30);
999
1000 let tracker = TaskTracker::new();
1001 for b in once(self.inbox).chain(self.oboxes) {
1002 let context = context.clone();
1003 tracker.spawn(async move {
1004 tokio::time::timeout(timeout_duration, b.handle)
1005 .await
1006 .log_err(&context)
1007 });
1008 }
1009 {
1010 let context = context.clone();
1011 tracker.spawn(async move {
1012 tokio::time::timeout(timeout_duration, self.smtp_handle)
1013 .await
1014 .log_err(&context)
1015 });
1016 }
1017 tracker.close();
1018 tracker.wait().await;
1019
1020 self.ephemeral_handle.abort();
1025 self.ephemeral_handle.await.ok();
1026 self.location_handle.abort();
1027 self.location_handle.await.ok();
1028 self.recently_seen_loop.abort().await;
1029 }
1030}
1031
/// Scheduler-side handle of a connection loop, shared by the IMAP and SMTP states.
#[derive(Debug)]
struct ConnectionState {
    /// Token cancelled by `stop()`; the loop holds a clone of it.
    stop_token: CancellationToken,
    /// Sender used by `interrupt()`; the receiving end is held by the loop's connection.
    idle_interrupt_sender: Sender<()>,
    /// Clone of the connectivity store of the loop's connection.
    connectivity: ConnectivityStore,
}
1042
1043impl ConnectionState {
1044 fn stop(&self) {
1046 self.stop_token.cancel();
1048 }
1049
1050 fn interrupt(&self) {
1051 self.idle_interrupt_sender.try_send(()).ok();
1053 }
1054}
1055
/// Scheduler-side handle of the SMTP loop.
#[derive(Debug)]
pub(crate) struct SmtpConnectionState {
    /// Stop token, interrupt sender and connectivity store of the SMTP loop.
    state: ConnectionState,
}
1060
1061impl SmtpConnectionState {
1062 fn new() -> (Self, SmtpConnectionHandlers) {
1063 let stop_token = CancellationToken::new();
1064 let (idle_interrupt_sender, idle_interrupt_receiver) = channel::bounded(1);
1065
1066 let handlers = SmtpConnectionHandlers {
1067 connection: Smtp::new(),
1068 stop_token: stop_token.clone(),
1069 idle_interrupt_receiver,
1070 };
1071
1072 let state = ConnectionState {
1073 stop_token,
1074 idle_interrupt_sender,
1075 connectivity: handlers.connection.connectivity.clone(),
1076 };
1077
1078 let conn = SmtpConnectionState { state };
1079
1080 (conn, handlers)
1081 }
1082
1083 fn interrupt(&self) {
1085 self.state.interrupt();
1086 }
1087
1088 fn stop(&self) {
1090 self.state.stop();
1091 }
1092}
1093
/// Loop-side half of the SMTP connection state; moved into `smtp_loop`.
struct SmtpConnectionHandlers {
    /// The SMTP connection used for sending.
    connection: Smtp,
    /// Token whose cancellation ends the loop.
    stop_token: CancellationToken,
    /// Receiver on which the loop waits for interrupts.
    idle_interrupt_receiver: Receiver<()>,
}
1099
/// Scheduler-side handle of an IMAP loop.
#[derive(Debug)]
pub(crate) struct ImapConnectionState {
    /// Stop token, interrupt sender and connectivity store of the IMAP loop.
    state: ConnectionState,
}
1104
1105impl ImapConnectionState {
1106 async fn new(context: &Context) -> Result<(Self, ImapConnectionHandlers)> {
1108 let stop_token = CancellationToken::new();
1109 let (idle_interrupt_sender, idle_interrupt_receiver) = channel::bounded(1);
1110
1111 let handlers = ImapConnectionHandlers {
1112 connection: Imap::new_configured(context, idle_interrupt_receiver).await?,
1113 stop_token: stop_token.clone(),
1114 };
1115
1116 let state = ConnectionState {
1117 stop_token,
1118 idle_interrupt_sender,
1119 connectivity: handlers.connection.connectivity.clone(),
1120 };
1121
1122 let conn = ImapConnectionState { state };
1123
1124 Ok((conn, handlers))
1125 }
1126
1127 fn interrupt(&self) {
1129 self.state.interrupt();
1130 }
1131
1132 fn stop(&self) {
1134 self.state.stop();
1135 }
1136}
1137
/// Loop-side half of the IMAP connection state; moved into an IMAP loop.
#[derive(Debug)]
struct ImapConnectionHandlers {
    /// The IMAP connection used by the loop.
    connection: Imap,
    /// Token whose cancellation ends the loop.
    stop_token: CancellationToken,
}