1use std::cmp;
2use std::iter::{self, once};
3use std::num::NonZeroUsize;
4
5use anyhow::{Context as _, Error, Result, bail};
6use async_channel::{self as channel, Receiver, Sender};
7use futures::future::try_join_all;
8use futures_lite::FutureExt;
9use tokio::sync::{RwLock, oneshot};
10use tokio::task;
11use tokio_util::sync::CancellationToken;
12use tokio_util::task::TaskTracker;
13
14pub(crate) use self::connectivity::ConnectivityStore;
15use crate::config::{self, Config};
16use crate::contact::{ContactId, RecentlySeenLoop};
17use crate::context::Context;
18use crate::download::{DownloadState, download_msg};
19use crate::ephemeral::{self, delete_expired_imap_messages};
20use crate::events::EventType;
21use crate::imap::{FolderMeaning, Imap, session::Session};
22use crate::location;
23use crate::log::{LogExt, warn};
24use crate::message::MsgId;
25use crate::smtp::{Smtp, send_smtp_messages};
26use crate::sql;
27use crate::stats::maybe_send_stats;
28use crate::tools::{self, duration_to_str, maybe_add_time_based_warnings, time, time_elapsed};
29use crate::{constants, stats};
30
31pub(crate) mod connectivity;
32
33#[derive(Debug, Default)]
39pub(crate) struct SchedulerState {
40 inner: RwLock<InnerSchedulerState>,
41}
42
43impl SchedulerState {
44 pub(crate) fn new() -> Self {
45 Default::default()
46 }
47
48 pub(crate) async fn is_running(&self) -> bool {
50 let inner = self.inner.read().await;
51 matches!(*inner, InnerSchedulerState::Started(_))
52 }
53
54 pub(crate) async fn start(&self, context: &Context) {
56 let mut inner = self.inner.write().await;
57 match *inner {
58 InnerSchedulerState::Started(_) => (),
59 InnerSchedulerState::Stopped => Self::do_start(&mut inner, context).await,
60 InnerSchedulerState::Paused {
61 ref mut started, ..
62 } => *started = true,
63 }
64 context.update_connectivities(&inner);
65 }
66
67 async fn do_start(inner: &mut InnerSchedulerState, context: &Context) {
69 info!(context, "starting IO");
70
71 context.new_msgs_notify.notify_one();
74
75 match Scheduler::start(context).await {
76 Ok(scheduler) => {
77 *inner = InnerSchedulerState::Started(scheduler);
78 context.emit_event(EventType::ConnectivityChanged);
79 }
80 Err(err) => error!(context, "Failed to start IO: {:#}", err),
81 }
82 }
83
84 pub(crate) async fn stop(&self, context: &Context) {
86 let mut inner = self.inner.write().await;
87 match *inner {
88 InnerSchedulerState::Started(_) => {
89 Self::do_stop(&mut inner, context, InnerSchedulerState::Stopped).await
90 }
91 InnerSchedulerState::Stopped => (),
92 InnerSchedulerState::Paused {
93 ref mut started, ..
94 } => *started = false,
95 }
96 context.update_connectivities(&inner);
97 }
98
99 async fn do_stop(
101 inner: &mut InnerSchedulerState,
102 context: &Context,
103 new_state: InnerSchedulerState,
104 ) {
105 info!(context, "stopping IO");
111
112 context.new_msgs_notify.notify_one();
115
116 let debug_logging = context
117 .debug_logging
118 .write()
119 .expect("RwLock is poisoned")
120 .take();
121 if let Some(debug_logging) = debug_logging {
122 debug_logging.loop_handle.abort();
123 debug_logging.loop_handle.await.ok();
124 }
125 let prev_state = std::mem::replace(inner, new_state);
126 context.emit_event(EventType::ConnectivityChanged);
127 match prev_state {
128 InnerSchedulerState::Started(scheduler) => scheduler.stop(context).await,
129 InnerSchedulerState::Stopped | InnerSchedulerState::Paused { .. } => (),
130 }
131 }
132
133 pub(crate) async fn pause(&'_ self, context: &Context) -> Result<IoPausedGuard> {
142 {
143 let mut inner = self.inner.write().await;
144 match *inner {
145 InnerSchedulerState::Started(_) => {
146 let new_state = InnerSchedulerState::Paused {
147 started: true,
148 pause_guards_count: NonZeroUsize::new(1).unwrap(),
149 };
150 Self::do_stop(&mut inner, context, new_state).await;
151 }
152 InnerSchedulerState::Stopped => {
153 *inner = InnerSchedulerState::Paused {
154 started: false,
155 pause_guards_count: NonZeroUsize::new(1).unwrap(),
156 };
157 }
158 InnerSchedulerState::Paused {
159 ref mut pause_guards_count,
160 ..
161 } => {
162 *pause_guards_count = pause_guards_count
163 .checked_add(1)
164 .ok_or_else(|| Error::msg("Too many pause guards active"))?
165 }
166 }
167 context.update_connectivities(&inner);
168 }
169
170 let (tx, rx) = oneshot::channel();
171 let context = context.clone();
172 tokio::spawn(async move {
173 rx.await.ok();
174 let mut inner = context.scheduler.inner.write().await;
175 match *inner {
176 InnerSchedulerState::Started(_) => {
177 warn!(&context, "IoPausedGuard resume: started instead of paused");
178 }
179 InnerSchedulerState::Stopped => {
180 warn!(&context, "IoPausedGuard resume: stopped instead of paused");
181 }
182 InnerSchedulerState::Paused {
183 ref started,
184 ref mut pause_guards_count,
185 } => {
186 if *pause_guards_count == NonZeroUsize::new(1).unwrap() {
187 match *started {
188 true => SchedulerState::do_start(&mut inner, &context).await,
189 false => *inner = InnerSchedulerState::Stopped,
190 }
191 } else {
192 let new_count = pause_guards_count.get() - 1;
193 *pause_guards_count = NonZeroUsize::new(new_count).unwrap();
195 }
196 }
197 }
198 context.update_connectivities(&inner);
199 });
200 Ok(IoPausedGuard { sender: Some(tx) })
201 }
202
203 pub(crate) async fn restart(&self, context: &Context) {
205 info!(context, "restarting IO");
206 if self.is_running().await {
207 self.stop(context).await;
208 self.start(context).await;
209 }
210 }
211
212 pub(crate) async fn maybe_network(&self) {
214 let inner = self.inner.read().await;
215 let (inbox, oboxes) = match *inner {
216 InnerSchedulerState::Started(ref scheduler) => {
217 scheduler.maybe_network();
218 let inbox = scheduler.inbox.conn_state.state.connectivity.clone();
219 let oboxes = scheduler
220 .oboxes
221 .iter()
222 .map(|b| b.conn_state.state.connectivity.clone())
223 .collect::<Vec<_>>();
224 (inbox, oboxes)
225 }
226 _ => return,
227 };
228 drop(inner);
229 connectivity::idle_interrupted(inbox, oboxes);
230 }
231
232 pub(crate) async fn maybe_network_lost(&self, context: &Context) {
234 let inner = self.inner.read().await;
235 let stores = match *inner {
236 InnerSchedulerState::Started(ref scheduler) => {
237 scheduler.maybe_network_lost();
238 scheduler
239 .boxes()
240 .map(|b| b.conn_state.state.connectivity.clone())
241 .collect()
242 }
243 _ => return,
244 };
245 drop(inner);
246 connectivity::maybe_network_lost(context, stores);
247 }
248
249 pub(crate) async fn interrupt_inbox(&self) {
250 let inner = self.inner.read().await;
251 if let InnerSchedulerState::Started(ref scheduler) = *inner {
252 scheduler.interrupt_inbox();
253 }
254 }
255
256 pub(crate) async fn interrupt_oboxes(&self) {
258 let inner = self.inner.read().await;
259 if let InnerSchedulerState::Started(ref scheduler) = *inner {
260 scheduler.interrupt_oboxes();
261 }
262 }
263
264 pub(crate) async fn interrupt_smtp(&self) {
265 let inner = self.inner.read().await;
266 if let InnerSchedulerState::Started(ref scheduler) = *inner {
267 scheduler.interrupt_smtp();
268 }
269 }
270
271 pub(crate) async fn interrupt_ephemeral_task(&self) {
272 let inner = self.inner.read().await;
273 if let InnerSchedulerState::Started(ref scheduler) = *inner {
274 scheduler.interrupt_ephemeral_task();
275 }
276 }
277
278 pub(crate) async fn interrupt_location(&self) {
279 let inner = self.inner.read().await;
280 if let InnerSchedulerState::Started(ref scheduler) = *inner {
281 scheduler.interrupt_location();
282 }
283 }
284
285 pub(crate) async fn interrupt_recently_seen(&self, contact_id: ContactId, timestamp: i64) {
286 let inner = self.inner.read().await;
287 if let InnerSchedulerState::Started(ref scheduler) = *inner {
288 scheduler.interrupt_recently_seen(contact_id, timestamp);
289 }
290 }
291}
292
293#[derive(Debug, Default)]
294pub(crate) enum InnerSchedulerState {
295 Started(Scheduler),
296 #[default]
297 Stopped,
298 Paused {
299 started: bool,
300 pause_guards_count: NonZeroUsize,
301 },
302}
303
304#[derive(Default, Debug)]
309pub(crate) struct IoPausedGuard {
310 sender: Option<oneshot::Sender<()>>,
311}
312
313impl Drop for IoPausedGuard {
314 fn drop(&mut self) {
315 if let Some(sender) = self.sender.take() {
316 sender.send(()).ok();
318 }
319 }
320}
321
322#[derive(Debug)]
323struct SchedBox {
324 meaning: FolderMeaning,
325 conn_state: ImapConnectionState,
326
327 handle: task::JoinHandle<()>,
329}
330
331#[derive(Debug)]
333pub(crate) struct Scheduler {
334 inbox: SchedBox,
335 oboxes: Vec<SchedBox>,
337 smtp: SmtpConnectionState,
338 smtp_handle: task::JoinHandle<()>,
339 ephemeral_handle: task::JoinHandle<()>,
340 ephemeral_interrupt_send: Sender<()>,
341 location_handle: task::JoinHandle<()>,
342 location_interrupt_send: Sender<()>,
343
344 recently_seen_loop: RecentlySeenLoop,
345}
346
347async fn download_msgs(context: &Context, session: &mut Session) -> Result<()> {
348 let msg_ids = context
349 .sql
350 .query_map_vec("SELECT msg_id FROM download", (), |row| {
351 let msg_id: MsgId = row.get(0)?;
352 Ok(msg_id)
353 })
354 .await?;
355
356 for msg_id in msg_ids {
357 if let Err(err) = download_msg(context, msg_id, session).await {
358 warn!(context, "Failed to download message {msg_id}: {:#}.", err);
359
360 msg_id
367 .update_download_state(context, DownloadState::Failure)
368 .await?;
369 }
370 context
371 .sql
372 .execute("DELETE FROM download WHERE msg_id=?", (msg_id,))
373 .await?;
374 }
375
376 Ok(())
377}
378
379async fn inbox_loop(
380 ctx: Context,
381 started: oneshot::Sender<()>,
382 inbox_handlers: ImapConnectionHandlers,
383) {
384 use futures::future::FutureExt;
385
386 info!(ctx, "Starting inbox loop.");
387 let ImapConnectionHandlers {
388 mut connection,
389 stop_token,
390 } = inbox_handlers;
391
392 let ctx1 = ctx.clone();
393 let fut = async move {
394 let ctx = ctx1;
395 if let Err(()) = started.send(()) {
396 warn!(ctx, "Inbox loop, missing started receiver.");
397 return;
398 };
399
400 let mut old_session: Option<Session> = None;
401 loop {
402 let session = if let Some(session) = old_session.take() {
403 session
404 } else {
405 info!(ctx, "Preparing new IMAP session for inbox.");
406 match connection.prepare(&ctx).await {
407 Err(err) => {
408 warn!(ctx, "Failed to prepare inbox connection: {err:#}.");
409 continue;
410 }
411 Ok(session) => session,
412 }
413 };
414
415 match inbox_fetch_idle(&ctx, &mut connection, session).await {
416 Err(err) => warn!(ctx, "Failed inbox fetch_idle: {err:#}."),
417 Ok(session) => {
418 info!(
419 ctx,
420 "IMAP loop iteration for inbox finished, keeping the session."
421 );
422 old_session = Some(session);
423 }
424 }
425 }
426 };
427
428 stop_token
429 .cancelled()
430 .map(|_| {
431 info!(ctx, "Shutting down inbox loop.");
432 })
433 .race(fut)
434 .await;
435}
436
437pub async fn convert_folder_meaning(
443 ctx: &Context,
444 folder_meaning: FolderMeaning,
445) -> Result<Option<(Config, String)>> {
446 let folder_config = match folder_meaning.to_config() {
447 Some(c) => c,
448 None => {
449 return Ok(None);
452 }
453 };
454
455 let folder = ctx
456 .get_config(folder_config)
457 .await
458 .with_context(|| format!("Failed to retrieve {folder_config} folder"))?;
459
460 if let Some(watch_folder) = folder {
461 Ok(Some((folder_config, watch_folder)))
462 } else {
463 Ok(None)
464 }
465}
466
467async fn inbox_fetch_idle(ctx: &Context, imap: &mut Imap, mut session: Session) -> Result<Session> {
468 if !ctx.get_config_bool(Config::FixIsChatmail).await? {
469 ctx.set_config_internal(
470 Config::IsChatmail,
471 crate::config::from_bool(session.is_chatmail()),
472 )
473 .await?;
474 }
475
476 if ctx.quota_needs_update(60).await {
478 if let Err(err) = ctx.update_recent_quota(&mut session).await {
479 warn!(ctx, "Failed to update quota: {:#}.", err);
480 }
481 }
482
483 if let Ok(()) = imap.resync_request_receiver.try_recv() {
484 if let Err(err) = session.resync_folders(ctx).await {
485 warn!(ctx, "Failed to resync folders: {:#}.", err);
486 imap.resync_request_sender.try_send(()).ok();
487 }
488 }
489
490 maybe_add_time_based_warnings(ctx).await;
491
492 match ctx.get_config_i64(Config::LastHousekeeping).await {
493 Ok(last_housekeeping_time) => {
494 let next_housekeeping_time =
495 last_housekeeping_time.saturating_add(constants::HOUSEKEEPING_PERIOD);
496 if next_housekeeping_time <= time() {
497 sql::housekeeping(ctx).await.log_err(ctx).ok();
498 }
499 }
500 Err(err) => {
501 warn!(ctx, "Failed to get last housekeeping time: {}", err);
502 }
503 };
504
505 maybe_send_stats(ctx).await.log_err(ctx).ok();
506 match ctx.get_config_bool(Config::FetchedExistingMsgs).await {
507 Ok(fetched_existing_msgs) => {
508 if !fetched_existing_msgs {
509 if let Err(err) = ctx
514 .set_config_internal(Config::FetchedExistingMsgs, config::from_bool(true))
515 .await
516 {
517 warn!(ctx, "Can't set Config::FetchedExistingMsgs: {:#}", err);
518 }
519
520 if let Err(err) = imap.fetch_existing_msgs(ctx, &mut session).await {
521 warn!(ctx, "Failed to fetch existing messages: {:#}", err);
522 }
523 }
524 }
525 Err(err) => {
526 warn!(ctx, "Can't get Config::FetchedExistingMsgs: {:#}", err);
527 }
528 }
529
530 download_msgs(ctx, &mut session)
531 .await
532 .context("Failed to download messages")?;
533 session
534 .fetch_metadata(ctx)
535 .await
536 .context("Failed to fetch metadata")?;
537 session
538 .register_token(ctx)
539 .await
540 .context("Failed to register push token")?;
541
542 let session = fetch_idle(ctx, imap, session, FolderMeaning::Inbox).await?;
543 Ok(session)
544}
545
546async fn fetch_idle(
552 ctx: &Context,
553 connection: &mut Imap,
554 mut session: Session,
555 folder_meaning: FolderMeaning,
556) -> Result<Session> {
557 let Some((folder_config, watch_folder)) = convert_folder_meaning(ctx, folder_meaning).await?
558 else {
559 connection.connectivity.set_not_configured(ctx);
563 connection.idle_interrupt_receiver.recv().await.ok();
564 bail!("Cannot fetch folder {folder_meaning} because it is not configured");
565 };
566
567 if folder_config == Config::ConfiguredInboxFolder {
568 let mvbox;
569 let syncbox = match ctx.should_move_sync_msgs().await? {
570 false => &watch_folder,
571 true => {
572 mvbox = ctx.get_config(Config::ConfiguredMvboxFolder).await?;
573 mvbox.as_deref().unwrap_or(&watch_folder)
574 }
575 };
576 session
577 .send_sync_msgs(ctx, syncbox)
578 .await
579 .context("fetch_idle: send_sync_msgs")
580 .log_err(ctx)
581 .ok();
582
583 session
584 .store_seen_flags_on_imap(ctx)
585 .await
586 .context("store_seen_flags_on_imap")?;
587 }
588
589 if !ctx.should_delete_to_trash().await?
590 || ctx
591 .get_config(Config::ConfiguredTrashFolder)
592 .await?
593 .is_some()
594 {
595 connection
597 .fetch_move_delete(ctx, &mut session, &watch_folder, folder_meaning)
598 .await
599 .context("fetch_move_delete")?;
600
601 delete_expired_imap_messages(ctx)
606 .await
607 .context("delete_expired_imap_messages")?;
608 } else if folder_config == Config::ConfiguredInboxFolder {
609 session.last_full_folder_scan.lock().await.take();
610 }
611
612 if folder_config == Config::ConfiguredInboxFolder {
617 match connection
619 .scan_folders(ctx, &mut session)
620 .await
621 .context("scan_folders")
622 {
623 Err(err) => {
624 warn!(ctx, "{:#}", err);
627 }
628 Ok(true) => {
629 connection
636 .fetch_move_delete(ctx, &mut session, &watch_folder, folder_meaning)
637 .await
638 .context("fetch_move_delete after scan_folders")?;
639 }
640 Ok(false) => {}
641 }
642 }
643
644 session
646 .sync_seen_flags(ctx, &watch_folder)
647 .await
648 .context("sync_seen_flags")
649 .log_err(ctx)
650 .ok();
651
652 connection.connectivity.set_idle(ctx);
653
654 ctx.emit_event(EventType::ImapInboxIdle);
655
656 if !session.can_idle() {
657 info!(
658 ctx,
659 "IMAP session does not support IDLE, going to fake idle."
660 );
661 connection.fake_idle(ctx, watch_folder).await?;
662 return Ok(session);
663 }
664
665 if ctx
666 .get_config_bool(Config::DisableIdle)
667 .await
668 .context("Failed to get disable_idle config")
669 .log_err(ctx)
670 .unwrap_or_default()
671 {
672 info!(ctx, "IMAP IDLE is disabled, going to fake idle.");
673 connection.fake_idle(ctx, watch_folder).await?;
674 return Ok(session);
675 }
676
677 info!(
678 ctx,
679 "IMAP session in folder {watch_folder:?} supports IDLE, using it."
680 );
681 let session = session
682 .idle(
683 ctx,
684 connection.idle_interrupt_receiver.clone(),
685 &watch_folder,
686 )
687 .await
688 .context("idle")?;
689
690 Ok(session)
691}
692
693async fn simple_imap_loop(
694 ctx: Context,
695 started: oneshot::Sender<()>,
696 inbox_handlers: ImapConnectionHandlers,
697 folder_meaning: FolderMeaning,
698) {
699 use futures::future::FutureExt;
700
701 info!(ctx, "Starting simple loop for {folder_meaning}.");
702 let ImapConnectionHandlers {
703 mut connection,
704 stop_token,
705 } = inbox_handlers;
706
707 let ctx1 = ctx.clone();
708
709 let fut = async move {
710 let ctx = ctx1;
711 if let Err(()) = started.send(()) {
712 warn!(
713 ctx,
714 "Simple imap loop for {folder_meaning}, missing started receiver."
715 );
716 return;
717 }
718
719 let mut old_session: Option<Session> = None;
720 loop {
721 let session = if let Some(session) = old_session.take() {
722 session
723 } else {
724 info!(ctx, "Preparing new IMAP session for {folder_meaning}.");
725 match connection.prepare(&ctx).await {
726 Err(err) => {
727 warn!(
728 ctx,
729 "Failed to prepare {folder_meaning} connection: {err:#}."
730 );
731 continue;
732 }
733 Ok(session) => session,
734 }
735 };
736
737 match fetch_idle(&ctx, &mut connection, session, folder_meaning).await {
738 Err(err) => warn!(ctx, "Failed fetch_idle: {err:#}"),
739 Ok(session) => {
740 info!(
741 ctx,
742 "IMAP loop iteration for {folder_meaning} finished, keeping the session"
743 );
744 old_session = Some(session);
745 }
746 }
747 }
748 };
749
750 stop_token
751 .cancelled()
752 .map(|_| {
753 info!(ctx, "Shutting down IMAP loop for {folder_meaning}.");
754 })
755 .race(fut)
756 .await;
757}
758
759async fn smtp_loop(
760 ctx: Context,
761 started: oneshot::Sender<()>,
762 smtp_handlers: SmtpConnectionHandlers,
763) {
764 use futures::future::FutureExt;
765
766 info!(ctx, "Starting SMTP loop.");
767 let SmtpConnectionHandlers {
768 mut connection,
769 stop_token,
770 idle_interrupt_receiver,
771 } = smtp_handlers;
772
773 let ctx1 = ctx.clone();
774 let fut = async move {
775 let ctx = ctx1;
776 if let Err(()) = started.send(()) {
777 warn!(&ctx, "SMTP loop, missing started receiver.");
778 return;
779 }
780
781 let mut timeout = None;
782 loop {
783 if let Err(err) = send_smtp_messages(&ctx, &mut connection).await {
784 warn!(ctx, "send_smtp_messages failed: {:#}.", err);
785 timeout = Some(timeout.unwrap_or(30));
786 } else {
787 timeout = None;
788 let duration_until_can_send = ctx.ratelimit.read().await.until_can_send();
789 if !duration_until_can_send.is_zero() {
790 info!(
791 ctx,
792 "smtp got rate limited, waiting for {} until can send again",
793 duration_to_str(duration_until_can_send)
794 );
795 tokio::time::sleep(duration_until_can_send).await;
796 continue;
797 }
798 }
799
800 stats::maybe_update_message_stats(&ctx)
801 .await
802 .log_err(&ctx)
803 .ok();
804
805 info!(ctx, "SMTP fake idle started.");
807 match &connection.last_send_error {
808 None => connection.connectivity.set_idle(&ctx),
809 Some(err) => connection.connectivity.set_err(&ctx, err),
810 }
811
812 if let Some(t) = timeout {
817 let now = tools::Time::now();
818 info!(
819 ctx,
820 "SMTP has messages to retry, planning to retry {t} seconds later."
821 );
822 let duration = std::time::Duration::from_secs(t);
823 tokio::time::timeout(duration, async {
824 idle_interrupt_receiver.recv().await.unwrap_or_default()
825 })
826 .await
827 .unwrap_or_default();
828 let slept = time_elapsed(&now).as_secs();
829 timeout = Some(cmp::max(
830 t,
831 slept.saturating_add(rand::random_range((slept / 2)..=slept)),
832 ));
833 } else {
834 info!(ctx, "SMTP has no messages to retry, waiting for interrupt.");
835 idle_interrupt_receiver.recv().await.unwrap_or_default();
836 };
837
838 info!(ctx, "SMTP fake idle interrupted.")
839 }
840 };
841
842 stop_token
843 .cancelled()
844 .map(|_| {
845 info!(ctx, "Shutting down SMTP loop.");
846 })
847 .race(fut)
848 .await;
849}
850
851impl Scheduler {
852 pub async fn start(ctx: &Context) -> Result<Self> {
854 let (smtp, smtp_handlers) = SmtpConnectionState::new();
855
856 let (smtp_start_send, smtp_start_recv) = oneshot::channel();
857 let (ephemeral_interrupt_send, ephemeral_interrupt_recv) = channel::bounded(1);
858 let (location_interrupt_send, location_interrupt_recv) = channel::bounded(1);
859
860 let mut oboxes = Vec::new();
861 let mut start_recvs = Vec::new();
862
863 let (conn_state, inbox_handlers) = ImapConnectionState::new(ctx).await?;
864 let (inbox_start_send, inbox_start_recv) = oneshot::channel();
865 let handle = {
866 let ctx = ctx.clone();
867 task::spawn(inbox_loop(ctx, inbox_start_send, inbox_handlers))
868 };
869 let inbox = SchedBox {
870 meaning: FolderMeaning::Inbox,
871 conn_state,
872 handle,
873 };
874 start_recvs.push(inbox_start_recv);
875
876 if ctx.should_watch_mvbox().await? {
877 let (conn_state, handlers) = ImapConnectionState::new(ctx).await?;
878 let (start_send, start_recv) = oneshot::channel();
879 let ctx = ctx.clone();
880 let meaning = FolderMeaning::Mvbox;
881 let handle = task::spawn(simple_imap_loop(ctx, start_send, handlers, meaning));
882 oboxes.push(SchedBox {
883 meaning,
884 conn_state,
885 handle,
886 });
887 start_recvs.push(start_recv);
888 }
889
890 let smtp_handle = {
891 let ctx = ctx.clone();
892 task::spawn(smtp_loop(ctx, smtp_start_send, smtp_handlers))
893 };
894 start_recvs.push(smtp_start_recv);
895
896 let ephemeral_handle = {
897 let ctx = ctx.clone();
898 task::spawn(async move {
899 ephemeral::ephemeral_loop(&ctx, ephemeral_interrupt_recv).await;
900 })
901 };
902
903 let location_handle = {
904 let ctx = ctx.clone();
905 task::spawn(async move {
906 location::location_loop(&ctx, location_interrupt_recv).await;
907 })
908 };
909
910 let recently_seen_loop = RecentlySeenLoop::new(ctx.clone());
911
912 let res = Self {
913 inbox,
914 oboxes,
915 smtp,
916 smtp_handle,
917 ephemeral_handle,
918 ephemeral_interrupt_send,
919 location_handle,
920 location_interrupt_send,
921 recently_seen_loop,
922 };
923
924 if let Err(err) = try_join_all(start_recvs).await {
926 bail!("failed to start scheduler: {err}");
927 }
928
929 info!(ctx, "scheduler is running");
930 Ok(res)
931 }
932
933 fn boxes(&self) -> iter::Chain<iter::Once<&SchedBox>, std::slice::Iter<'_, SchedBox>> {
934 once(&self.inbox).chain(self.oboxes.iter())
935 }
936
937 fn maybe_network(&self) {
938 for b in self.boxes() {
939 b.conn_state.interrupt();
940 }
941 self.interrupt_smtp();
942 }
943
944 fn maybe_network_lost(&self) {
945 for b in self.boxes() {
946 b.conn_state.interrupt();
947 }
948 self.interrupt_smtp();
949 }
950
951 fn interrupt_inbox(&self) {
952 self.inbox.conn_state.interrupt();
953 }
954
955 fn interrupt_oboxes(&self) {
956 for b in &self.oboxes {
957 b.conn_state.interrupt();
958 }
959 }
960
961 fn interrupt_smtp(&self) {
962 self.smtp.interrupt();
963 }
964
965 fn interrupt_ephemeral_task(&self) {
966 self.ephemeral_interrupt_send.try_send(()).ok();
967 }
968
969 fn interrupt_location(&self) {
970 self.location_interrupt_send.try_send(()).ok();
971 }
972
973 fn interrupt_recently_seen(&self, contact_id: ContactId, timestamp: i64) {
974 self.recently_seen_loop.try_interrupt(contact_id, timestamp);
975 }
976
977 pub(crate) async fn stop(self, context: &Context) {
982 for b in self.boxes() {
984 b.conn_state.stop();
985 }
986 self.smtp.stop();
987
988 let timeout_duration = std::time::Duration::from_secs(30);
990
991 let tracker = TaskTracker::new();
992 for b in once(self.inbox).chain(self.oboxes) {
993 let context = context.clone();
994 tracker.spawn(async move {
995 tokio::time::timeout(timeout_duration, b.handle)
996 .await
997 .log_err(&context)
998 });
999 }
1000 {
1001 let context = context.clone();
1002 tracker.spawn(async move {
1003 tokio::time::timeout(timeout_duration, self.smtp_handle)
1004 .await
1005 .log_err(&context)
1006 });
1007 }
1008 tracker.close();
1009 tracker.wait().await;
1010
1011 self.ephemeral_handle.abort();
1016 self.ephemeral_handle.await.ok();
1017 self.location_handle.abort();
1018 self.location_handle.await.ok();
1019 self.recently_seen_loop.abort().await;
1020 }
1021}
1022
1023#[derive(Debug)]
1025struct ConnectionState {
1026 stop_token: CancellationToken,
1028 idle_interrupt_sender: Sender<()>,
1030 connectivity: ConnectivityStore,
1032}
1033
1034impl ConnectionState {
1035 fn stop(&self) {
1037 self.stop_token.cancel();
1039 }
1040
1041 fn interrupt(&self) {
1042 self.idle_interrupt_sender.try_send(()).ok();
1044 }
1045}
1046
1047#[derive(Debug)]
1048pub(crate) struct SmtpConnectionState {
1049 state: ConnectionState,
1050}
1051
1052impl SmtpConnectionState {
1053 fn new() -> (Self, SmtpConnectionHandlers) {
1054 let stop_token = CancellationToken::new();
1055 let (idle_interrupt_sender, idle_interrupt_receiver) = channel::bounded(1);
1056
1057 let handlers = SmtpConnectionHandlers {
1058 connection: Smtp::new(),
1059 stop_token: stop_token.clone(),
1060 idle_interrupt_receiver,
1061 };
1062
1063 let state = ConnectionState {
1064 stop_token,
1065 idle_interrupt_sender,
1066 connectivity: handlers.connection.connectivity.clone(),
1067 };
1068
1069 let conn = SmtpConnectionState { state };
1070
1071 (conn, handlers)
1072 }
1073
1074 fn interrupt(&self) {
1076 self.state.interrupt();
1077 }
1078
1079 fn stop(&self) {
1081 self.state.stop();
1082 }
1083}
1084
1085struct SmtpConnectionHandlers {
1086 connection: Smtp,
1087 stop_token: CancellationToken,
1088 idle_interrupt_receiver: Receiver<()>,
1089}
1090
1091#[derive(Debug)]
1092pub(crate) struct ImapConnectionState {
1093 state: ConnectionState,
1094}
1095
1096impl ImapConnectionState {
1097 async fn new(context: &Context) -> Result<(Self, ImapConnectionHandlers)> {
1099 let stop_token = CancellationToken::new();
1100 let (idle_interrupt_sender, idle_interrupt_receiver) = channel::bounded(1);
1101
1102 let handlers = ImapConnectionHandlers {
1103 connection: Imap::new_configured(context, idle_interrupt_receiver).await?,
1104 stop_token: stop_token.clone(),
1105 };
1106
1107 let state = ConnectionState {
1108 stop_token,
1109 idle_interrupt_sender,
1110 connectivity: handlers.connection.connectivity.clone(),
1111 };
1112
1113 let conn = ImapConnectionState { state };
1114
1115 Ok((conn, handlers))
1116 }
1117
1118 fn interrupt(&self) {
1120 self.state.interrupt();
1121 }
1122
1123 fn stop(&self) {
1125 self.state.stop();
1126 }
1127}
1128
1129#[derive(Debug)]
1130struct ImapConnectionHandlers {
1131 connection: Imap,
1132 stop_token: CancellationToken,
1133}