1use std::cmp;
2use std::num::NonZeroUsize;
3
4use anyhow::{Context as _, Error, Result, bail};
5use async_channel::{self as channel, Receiver, Sender};
6use futures::future::try_join_all;
7use futures_lite::FutureExt;
8use tokio::sync::{RwLock, oneshot};
9use tokio::task;
10use tokio_util::sync::CancellationToken;
11use tokio_util::task::TaskTracker;
12
13pub(crate) use self::connectivity::ConnectivityStore;
14use crate::config::{self, Config};
15use crate::contact::{ContactId, RecentlySeenLoop};
16use crate::context::Context;
17use crate::download::{download_known_post_messages_without_pre_message, download_msgs};
18use crate::ephemeral::{self, delete_expired_imap_messages};
19use crate::events::EventType;
20use crate::imap::{FolderMeaning, Imap, session::Session};
21use crate::location;
22use crate::log::{LogExt, warn};
23use crate::smtp::{Smtp, send_smtp_messages};
24use crate::sql;
25use crate::stats::maybe_send_stats;
26use crate::tools::{self, duration_to_str, maybe_add_time_based_warnings, time, time_elapsed};
27use crate::transport::ConfiguredLoginParam;
28use crate::{constants, stats};
29
30pub(crate) mod connectivity;
31
32#[derive(Debug, Default)]
38pub(crate) struct SchedulerState {
39 inner: RwLock<InnerSchedulerState>,
40}
41
42impl SchedulerState {
43 pub(crate) fn new() -> Self {
44 Default::default()
45 }
46
47 pub(crate) async fn is_running(&self) -> bool {
49 let inner = self.inner.read().await;
50 matches!(*inner, InnerSchedulerState::Started(_))
51 }
52
53 pub(crate) async fn start(&self, context: &Context) {
55 let mut inner = self.inner.write().await;
56 match *inner {
57 InnerSchedulerState::Started(_) => (),
58 InnerSchedulerState::Stopped => Self::do_start(&mut inner, context).await,
59 InnerSchedulerState::Paused {
60 ref mut started, ..
61 } => *started = true,
62 }
63 context.update_connectivities(&inner);
64 }
65
66 async fn do_start(inner: &mut InnerSchedulerState, context: &Context) {
68 info!(context, "starting IO");
69
70 context.new_msgs_notify.notify_one();
73
74 match Scheduler::start(context).await {
75 Ok(scheduler) => {
76 *inner = InnerSchedulerState::Started(scheduler);
77 context.emit_event(EventType::ConnectivityChanged);
78 }
79 Err(err) => error!(context, "Failed to start IO: {:#}", err),
80 }
81 }
82
83 pub(crate) async fn stop(&self, context: &Context) {
85 let mut inner = self.inner.write().await;
86 match *inner {
87 InnerSchedulerState::Started(_) => {
88 Self::do_stop(&mut inner, context, InnerSchedulerState::Stopped).await
89 }
90 InnerSchedulerState::Stopped => (),
91 InnerSchedulerState::Paused {
92 ref mut started, ..
93 } => *started = false,
94 }
95 context.update_connectivities(&inner);
96 }
97
98 async fn do_stop(
100 inner: &mut InnerSchedulerState,
101 context: &Context,
102 new_state: InnerSchedulerState,
103 ) {
104 info!(context, "stopping IO");
110
111 context.new_msgs_notify.notify_one();
114
115 let debug_logging = context
116 .debug_logging
117 .write()
118 .expect("RwLock is poisoned")
119 .take();
120 if let Some(debug_logging) = debug_logging {
121 debug_logging.loop_handle.abort();
122 debug_logging.loop_handle.await.ok();
123 }
124 let prev_state = std::mem::replace(inner, new_state);
125 context.emit_event(EventType::ConnectivityChanged);
126 match prev_state {
127 InnerSchedulerState::Started(scheduler) => scheduler.stop(context).await,
128 InnerSchedulerState::Stopped | InnerSchedulerState::Paused { .. } => (),
129 }
130 }
131
132 pub(crate) async fn pause(&'_ self, context: &Context) -> Result<IoPausedGuard> {
141 {
142 let mut inner = self.inner.write().await;
143 match *inner {
144 InnerSchedulerState::Started(_) => {
145 let new_state = InnerSchedulerState::Paused {
146 started: true,
147 pause_guards_count: NonZeroUsize::MIN,
148 };
149 Self::do_stop(&mut inner, context, new_state).await;
150 }
151 InnerSchedulerState::Stopped => {
152 *inner = InnerSchedulerState::Paused {
153 started: false,
154 pause_guards_count: NonZeroUsize::MIN,
155 };
156 }
157 InnerSchedulerState::Paused {
158 ref mut pause_guards_count,
159 ..
160 } => {
161 *pause_guards_count = pause_guards_count
162 .checked_add(1)
163 .ok_or_else(|| Error::msg("Too many pause guards active"))?
164 }
165 }
166 context.update_connectivities(&inner);
167 }
168
169 let (tx, rx) = oneshot::channel();
170 let context = context.clone();
171 tokio::spawn(async move {
172 rx.await.ok();
173 let mut inner = context.scheduler.inner.write().await;
174 match *inner {
175 InnerSchedulerState::Started(_) => {
176 warn!(&context, "IoPausedGuard resume: started instead of paused");
177 }
178 InnerSchedulerState::Stopped => {
179 warn!(&context, "IoPausedGuard resume: stopped instead of paused");
180 }
181 InnerSchedulerState::Paused {
182 ref started,
183 ref mut pause_guards_count,
184 } => {
185 if *pause_guards_count == NonZeroUsize::MIN {
186 match *started {
187 true => SchedulerState::do_start(&mut inner, &context).await,
188 false => *inner = InnerSchedulerState::Stopped,
189 }
190 } else {
191 let new_count = pause_guards_count.get() - 1;
192 *pause_guards_count = NonZeroUsize::new(new_count).unwrap();
194 }
195 }
196 }
197 context.update_connectivities(&inner);
198 });
199 Ok(IoPausedGuard { sender: Some(tx) })
200 }
201
202 pub(crate) async fn restart(&self, context: &Context) {
204 info!(context, "restarting IO");
205 if self.is_running().await {
206 self.stop(context).await;
207 self.start(context).await;
208 }
209 }
210
211 pub(crate) async fn maybe_network(&self) {
213 let inner = self.inner.read().await;
214 let (inboxes, oboxes) = match *inner {
215 InnerSchedulerState::Started(ref scheduler) => {
216 scheduler.maybe_network();
217 let inboxes = scheduler
218 .inboxes
219 .iter()
220 .map(|b| b.conn_state.state.connectivity.clone())
221 .collect::<Vec<_>>();
222 let oboxes = scheduler
223 .oboxes
224 .iter()
225 .map(|b| b.conn_state.state.connectivity.clone())
226 .collect::<Vec<_>>();
227 (inboxes, oboxes)
228 }
229 _ => return,
230 };
231 drop(inner);
232 connectivity::idle_interrupted(inboxes, oboxes);
233 }
234
235 pub(crate) async fn maybe_network_lost(&self, context: &Context) {
237 let inner = self.inner.read().await;
238 let stores = match *inner {
239 InnerSchedulerState::Started(ref scheduler) => {
240 scheduler.maybe_network_lost();
241 scheduler
242 .boxes()
243 .map(|b| b.conn_state.state.connectivity.clone())
244 .collect()
245 }
246 _ => return,
247 };
248 drop(inner);
249 connectivity::maybe_network_lost(context, stores);
250 }
251
252 pub(crate) async fn interrupt_inbox(&self) {
253 let inner = self.inner.read().await;
254 if let InnerSchedulerState::Started(ref scheduler) = *inner {
255 scheduler.interrupt_inbox();
256 }
257 }
258
259 pub(crate) async fn interrupt_oboxes(&self) {
261 let inner = self.inner.read().await;
262 if let InnerSchedulerState::Started(ref scheduler) = *inner {
263 scheduler.interrupt_oboxes();
264 }
265 }
266
267 pub(crate) async fn interrupt_smtp(&self) {
268 let inner = self.inner.read().await;
269 if let InnerSchedulerState::Started(ref scheduler) = *inner {
270 scheduler.interrupt_smtp();
271 }
272 }
273
274 pub(crate) async fn interrupt_ephemeral_task(&self) {
275 let inner = self.inner.read().await;
276 if let InnerSchedulerState::Started(ref scheduler) = *inner {
277 scheduler.interrupt_ephemeral_task();
278 }
279 }
280
281 pub(crate) async fn interrupt_location(&self) {
282 let inner = self.inner.read().await;
283 if let InnerSchedulerState::Started(ref scheduler) = *inner {
284 scheduler.interrupt_location();
285 }
286 }
287
288 pub(crate) async fn interrupt_recently_seen(&self, contact_id: ContactId, timestamp: i64) {
289 let inner = self.inner.read().await;
290 if let InnerSchedulerState::Started(ref scheduler) = *inner {
291 scheduler.interrupt_recently_seen(contact_id, timestamp);
292 }
293 }
294}
295
296#[derive(Debug, Default)]
297pub(crate) enum InnerSchedulerState {
298 Started(Scheduler),
299 #[default]
300 Stopped,
301 Paused {
302 started: bool,
303 pause_guards_count: NonZeroUsize,
304 },
305}
306
307#[derive(Default, Debug)]
312pub(crate) struct IoPausedGuard {
313 sender: Option<oneshot::Sender<()>>,
314}
315
316impl Drop for IoPausedGuard {
317 fn drop(&mut self) {
318 if let Some(sender) = self.sender.take() {
319 sender.send(()).ok();
321 }
322 }
323}
324
325#[derive(Debug)]
326struct SchedBox {
327 host: String,
329 meaning: FolderMeaning,
330 conn_state: ImapConnectionState,
331
332 handle: task::JoinHandle<()>,
334}
335
336#[derive(Debug)]
338pub(crate) struct Scheduler {
339 inboxes: Vec<SchedBox>,
341 oboxes: Vec<SchedBox>,
343 smtp: SmtpConnectionState,
344 smtp_handle: task::JoinHandle<()>,
345 ephemeral_handle: task::JoinHandle<()>,
346 ephemeral_interrupt_send: Sender<()>,
347 location_handle: task::JoinHandle<()>,
348 location_interrupt_send: Sender<()>,
349
350 recently_seen_loop: RecentlySeenLoop,
351}
352
353async fn inbox_loop(
354 ctx: Context,
355 started: oneshot::Sender<()>,
356 inbox_handlers: ImapConnectionHandlers,
357) {
358 use futures::future::FutureExt;
359
360 info!(ctx, "Starting inbox loop.");
361 let ImapConnectionHandlers {
362 mut connection,
363 stop_token,
364 } = inbox_handlers;
365
366 let ctx1 = ctx.clone();
367 let fut = async move {
368 let ctx = ctx1;
369 if let Err(()) = started.send(()) {
370 warn!(ctx, "Inbox loop, missing started receiver.");
371 return;
372 };
373
374 let mut old_session: Option<Session> = None;
375 loop {
376 let session = if let Some(session) = old_session.take() {
377 session
378 } else {
379 info!(ctx, "Preparing new IMAP session for inbox.");
380 match connection.prepare(&ctx).await {
381 Err(err) => {
382 warn!(ctx, "Failed to prepare inbox connection: {err:#}.");
383 continue;
384 }
385 Ok(session) => session,
386 }
387 };
388
389 match inbox_fetch_idle(&ctx, &mut connection, session).await {
390 Err(err) => warn!(ctx, "Failed inbox fetch_idle: {err:#}."),
391 Ok(session) => {
392 info!(
393 ctx,
394 "IMAP loop iteration for inbox finished, keeping the session."
395 );
396 old_session = Some(session);
397 }
398 }
399 }
400 };
401
402 stop_token
403 .cancelled()
404 .map(|_| {
405 info!(ctx, "Shutting down inbox loop.");
406 })
407 .race(fut)
408 .await;
409}
410
411pub async fn convert_folder_meaning(
417 ctx: &Context,
418 folder_meaning: FolderMeaning,
419) -> Result<Option<(Config, String)>> {
420 let folder_config = match folder_meaning.to_config() {
421 Some(c) => c,
422 None => {
423 return Ok(None);
426 }
427 };
428
429 let folder = ctx
430 .get_config(folder_config)
431 .await
432 .with_context(|| format!("Failed to retrieve {folder_config} folder"))?;
433
434 if let Some(watch_folder) = folder {
435 Ok(Some((folder_config, watch_folder)))
436 } else {
437 Ok(None)
438 }
439}
440
441async fn inbox_fetch_idle(ctx: &Context, imap: &mut Imap, mut session: Session) -> Result<Session> {
442 if !ctx.get_config_bool(Config::FixIsChatmail).await? {
443 ctx.set_config_internal(
444 Config::IsChatmail,
445 crate::config::from_bool(session.is_chatmail()),
446 )
447 .await?;
448 }
449
450 if ctx.quota_needs_update(session.transport_id(), 60).await
452 && let Err(err) = ctx.update_recent_quota(&mut session).await
453 {
454 warn!(ctx, "Failed to update quota: {:#}.", err);
455 }
456
457 if let Ok(()) = imap.resync_request_receiver.try_recv()
458 && let Err(err) = session.resync_folders(ctx).await
459 {
460 warn!(ctx, "Failed to resync folders: {:#}.", err);
461 imap.resync_request_sender.try_send(()).ok();
462 }
463
464 maybe_add_time_based_warnings(ctx).await;
465
466 match ctx.get_config_i64(Config::LastHousekeeping).await {
467 Ok(last_housekeeping_time) => {
468 let next_housekeeping_time =
469 last_housekeeping_time.saturating_add(constants::HOUSEKEEPING_PERIOD);
470 if next_housekeeping_time <= time() {
471 sql::housekeeping(ctx).await.log_err(ctx).ok();
472 }
473 }
474 Err(err) => {
475 warn!(ctx, "Failed to get last housekeeping time: {}", err);
476 }
477 };
478
479 maybe_send_stats(ctx).await.log_err(ctx).ok();
480 match ctx.get_config_bool(Config::FetchedExistingMsgs).await {
481 Ok(fetched_existing_msgs) => {
482 if !fetched_existing_msgs {
483 if let Err(err) = ctx
488 .set_config_internal(Config::FetchedExistingMsgs, config::from_bool(true))
489 .await
490 {
491 warn!(ctx, "Can't set Config::FetchedExistingMsgs: {:#}", err);
492 }
493
494 if let Err(err) = imap.fetch_existing_msgs(ctx, &mut session).await {
495 warn!(ctx, "Failed to fetch existing messages: {:#}", err);
496 }
497 }
498 }
499 Err(err) => {
500 warn!(ctx, "Can't get Config::FetchedExistingMsgs: {:#}", err);
501 }
502 }
503
504 session
505 .update_metadata(ctx)
506 .await
507 .context("update_metadata")?;
508 session
509 .register_token(ctx)
510 .await
511 .context("Failed to register push token")?;
512
513 let session = fetch_idle(ctx, imap, session, FolderMeaning::Inbox).await?;
514 Ok(session)
515}
516
517async fn fetch_idle(
523 ctx: &Context,
524 connection: &mut Imap,
525 mut session: Session,
526 folder_meaning: FolderMeaning,
527) -> Result<Session> {
528 let Some((folder_config, watch_folder)) = convert_folder_meaning(ctx, folder_meaning).await?
529 else {
530 connection.connectivity.set_not_configured(ctx);
534 connection.idle_interrupt_receiver.recv().await.ok();
535 bail!("Cannot fetch folder {folder_meaning} because it is not configured");
536 };
537
538 if folder_config == Config::ConfiguredInboxFolder {
539 session
540 .store_seen_flags_on_imap(ctx)
541 .await
542 .context("store_seen_flags_on_imap")?;
543 }
544
545 if !ctx.should_delete_to_trash().await?
546 || ctx
547 .get_config(Config::ConfiguredTrashFolder)
548 .await?
549 .is_some()
550 {
551 connection
553 .fetch_move_delete(ctx, &mut session, &watch_folder, folder_meaning)
554 .await
555 .context("fetch_move_delete")?;
556
557 delete_expired_imap_messages(ctx)
562 .await
563 .context("delete_expired_imap_messages")?;
564
565 download_known_post_messages_without_pre_message(ctx, &mut session).await?;
566 download_msgs(ctx, &mut session)
567 .await
568 .context("download_msgs")?;
569 } else if folder_config == Config::ConfiguredInboxFolder {
570 session.last_full_folder_scan.lock().await.take();
571 }
572
573 if folder_config == Config::ConfiguredInboxFolder {
578 match connection
580 .scan_folders(ctx, &mut session)
581 .await
582 .context("scan_folders")
583 {
584 Err(err) => {
585 warn!(ctx, "{:#}", err);
588 }
589 Ok(true) => {
590 connection
597 .fetch_move_delete(ctx, &mut session, &watch_folder, folder_meaning)
598 .await
599 .context("fetch_move_delete after scan_folders")?;
600 }
601 Ok(false) => {}
602 }
603 }
604
605 session
607 .sync_seen_flags(ctx, &watch_folder)
608 .await
609 .context("sync_seen_flags")
610 .log_err(ctx)
611 .ok();
612
613 connection.connectivity.set_idle(ctx);
614
615 ctx.emit_event(EventType::ImapInboxIdle);
616
617 if !session.can_idle() {
618 info!(
619 ctx,
620 "IMAP session does not support IDLE, going to fake idle."
621 );
622 connection.fake_idle(ctx, watch_folder).await?;
623 return Ok(session);
624 }
625
626 if ctx
627 .get_config_bool(Config::DisableIdle)
628 .await
629 .context("Failed to get disable_idle config")
630 .log_err(ctx)
631 .unwrap_or_default()
632 {
633 info!(ctx, "IMAP IDLE is disabled, going to fake idle.");
634 connection.fake_idle(ctx, watch_folder).await?;
635 return Ok(session);
636 }
637
638 info!(
639 ctx,
640 "IMAP session in folder {watch_folder:?} supports IDLE, using it."
641 );
642 let session = session
643 .idle(
644 ctx,
645 connection.idle_interrupt_receiver.clone(),
646 &watch_folder,
647 )
648 .await
649 .context("idle")?;
650
651 Ok(session)
652}
653
654async fn simple_imap_loop(
656 ctx: Context,
657 started: oneshot::Sender<()>,
658 inbox_handlers: ImapConnectionHandlers,
659 folder_meaning: FolderMeaning,
660) {
661 use futures::future::FutureExt;
662
663 info!(ctx, "Starting simple loop for {folder_meaning}.");
664 let ImapConnectionHandlers {
665 mut connection,
666 stop_token,
667 } = inbox_handlers;
668
669 let ctx1 = ctx.clone();
670
671 let fut = async move {
672 let ctx = ctx1;
673 if let Err(()) = started.send(()) {
674 warn!(
675 ctx,
676 "Simple imap loop for {folder_meaning}, missing started receiver."
677 );
678 return;
679 }
680
681 let mut old_session: Option<Session> = None;
682 loop {
683 let session = if let Some(session) = old_session.take() {
684 session
685 } else {
686 info!(ctx, "Preparing new IMAP session for {folder_meaning}.");
687 match connection.prepare(&ctx).await {
688 Err(err) => {
689 warn!(
690 ctx,
691 "Failed to prepare {folder_meaning} connection: {err:#}."
692 );
693 continue;
694 }
695 Ok(session) => session,
696 }
697 };
698
699 match fetch_idle(&ctx, &mut connection, session, folder_meaning).await {
700 Err(err) => warn!(ctx, "Failed fetch_idle: {err:#}"),
701 Ok(session) => {
702 info!(
703 ctx,
704 "IMAP loop iteration for {folder_meaning} finished, keeping the session"
705 );
706 old_session = Some(session);
707 }
708 }
709 }
710 };
711
712 stop_token
713 .cancelled()
714 .map(|_| {
715 info!(ctx, "Shutting down IMAP loop for {folder_meaning}.");
716 })
717 .race(fut)
718 .await;
719}
720
721async fn smtp_loop(
722 ctx: Context,
723 started: oneshot::Sender<()>,
724 smtp_handlers: SmtpConnectionHandlers,
725) {
726 use futures::future::FutureExt;
727
728 info!(ctx, "Starting SMTP loop.");
729 let SmtpConnectionHandlers {
730 mut connection,
731 stop_token,
732 idle_interrupt_receiver,
733 } = smtp_handlers;
734
735 let ctx1 = ctx.clone();
736 let fut = async move {
737 let ctx = ctx1;
738 if let Err(()) = started.send(()) {
739 warn!(&ctx, "SMTP loop, missing started receiver.");
740 return;
741 }
742
743 let mut timeout = None;
744 loop {
745 if let Err(err) = send_smtp_messages(&ctx, &mut connection).await {
746 warn!(ctx, "send_smtp_messages failed: {:#}.", err);
747 timeout = Some(timeout.unwrap_or(30));
748 } else {
749 timeout = None;
750 let duration_until_can_send = ctx.ratelimit.read().await.until_can_send();
751 if !duration_until_can_send.is_zero() {
752 info!(
753 ctx,
754 "smtp got rate limited, waiting for {} until can send again",
755 duration_to_str(duration_until_can_send)
756 );
757 tokio::time::sleep(duration_until_can_send).await;
758 continue;
759 }
760 }
761
762 stats::maybe_update_message_stats(&ctx)
763 .await
764 .log_err(&ctx)
765 .ok();
766
767 info!(ctx, "SMTP fake idle started.");
769 match &connection.last_send_error {
770 None => connection.connectivity.set_idle(&ctx),
771 Some(err) => connection.connectivity.set_err(&ctx, err),
772 }
773
774 if let Some(t) = timeout {
779 let now = tools::Time::now();
780 info!(
781 ctx,
782 "SMTP has messages to retry, planning to retry {t} seconds later."
783 );
784 let duration = std::time::Duration::from_secs(t);
785 tokio::time::timeout(duration, async {
786 idle_interrupt_receiver.recv().await.unwrap_or_default()
787 })
788 .await
789 .unwrap_or_default();
790 let slept = time_elapsed(&now).as_secs();
791 timeout = Some(cmp::max(
792 t,
793 slept.saturating_add(rand::random_range((slept / 2)..=slept)),
794 ));
795 } else {
796 info!(ctx, "SMTP has no messages to retry, waiting for interrupt.");
797 idle_interrupt_receiver.recv().await.unwrap_or_default();
798 };
799
800 info!(ctx, "SMTP fake idle interrupted.")
801 }
802 };
803
804 stop_token
805 .cancelled()
806 .map(|_| {
807 info!(ctx, "Shutting down SMTP loop.");
808 })
809 .race(fut)
810 .await;
811}
812
813impl Scheduler {
814 pub async fn start(ctx: &Context) -> Result<Self> {
816 let (smtp, smtp_handlers) = SmtpConnectionState::new();
817
818 let (smtp_start_send, smtp_start_recv) = oneshot::channel();
819 let (ephemeral_interrupt_send, ephemeral_interrupt_recv) = channel::bounded(1);
820 let (location_interrupt_send, location_interrupt_recv) = channel::bounded(1);
821
822 let mut inboxes = Vec::new();
823 let mut oboxes = Vec::new();
824 let mut start_recvs = Vec::new();
825
826 for (transport_id, configured_login_param) in ConfiguredLoginParam::load_all(ctx).await? {
827 let (conn_state, inbox_handlers) =
828 ImapConnectionState::new(ctx, transport_id, configured_login_param.clone()).await?;
829 let (inbox_start_send, inbox_start_recv) = oneshot::channel();
830 let handle = {
831 let ctx = ctx.clone();
832 task::spawn(inbox_loop(ctx, inbox_start_send, inbox_handlers))
833 };
834 let host = configured_login_param
835 .addr
836 .split("@")
837 .last()
838 .context("address has no host")?
839 .to_owned();
840 let inbox = SchedBox {
841 host: host.clone(),
842 meaning: FolderMeaning::Inbox,
843 conn_state,
844 handle,
845 };
846 inboxes.push(inbox);
847 start_recvs.push(inbox_start_recv);
848
849 if ctx.should_watch_mvbox().await? {
850 let (conn_state, handlers) =
851 ImapConnectionState::new(ctx, transport_id, configured_login_param).await?;
852 let (start_send, start_recv) = oneshot::channel();
853 let ctx = ctx.clone();
854 let meaning = FolderMeaning::Mvbox;
855 let handle = task::spawn(simple_imap_loop(ctx, start_send, handlers, meaning));
856 oboxes.push(SchedBox {
857 host,
858 meaning,
859 conn_state,
860 handle,
861 });
862 start_recvs.push(start_recv);
863 }
864 }
865
866 let smtp_handle = {
867 let ctx = ctx.clone();
868 task::spawn(smtp_loop(ctx, smtp_start_send, smtp_handlers))
869 };
870 start_recvs.push(smtp_start_recv);
871
872 let ephemeral_handle = {
873 let ctx = ctx.clone();
874 task::spawn(async move {
875 ephemeral::ephemeral_loop(&ctx, ephemeral_interrupt_recv).await;
876 })
877 };
878
879 let location_handle = {
880 let ctx = ctx.clone();
881 task::spawn(async move {
882 location::location_loop(&ctx, location_interrupt_recv).await;
883 })
884 };
885
886 let recently_seen_loop = RecentlySeenLoop::new(ctx.clone());
887
888 let res = Self {
889 inboxes,
890 oboxes,
891 smtp,
892 smtp_handle,
893 ephemeral_handle,
894 ephemeral_interrupt_send,
895 location_handle,
896 location_interrupt_send,
897 recently_seen_loop,
898 };
899
900 if let Err(err) = try_join_all(start_recvs).await {
902 bail!("failed to start scheduler: {err}");
903 }
904
905 info!(ctx, "scheduler is running");
906 Ok(res)
907 }
908
909 fn boxes(&self) -> impl Iterator<Item = &SchedBox> {
910 self.inboxes.iter().chain(self.oboxes.iter())
911 }
912
913 fn maybe_network(&self) {
914 for b in self.boxes() {
915 b.conn_state.interrupt();
916 }
917 self.interrupt_smtp();
918 }
919
920 fn maybe_network_lost(&self) {
921 for b in self.boxes() {
922 b.conn_state.interrupt();
923 }
924 self.interrupt_smtp();
925 }
926
927 fn interrupt_inbox(&self) {
928 for b in &self.inboxes {
929 b.conn_state.interrupt();
930 }
931 }
932
933 fn interrupt_oboxes(&self) {
934 for b in &self.oboxes {
935 b.conn_state.interrupt();
936 }
937 }
938
939 fn interrupt_smtp(&self) {
940 self.smtp.interrupt();
941 }
942
943 fn interrupt_ephemeral_task(&self) {
944 self.ephemeral_interrupt_send.try_send(()).ok();
945 }
946
947 fn interrupt_location(&self) {
948 self.location_interrupt_send.try_send(()).ok();
949 }
950
951 fn interrupt_recently_seen(&self, contact_id: ContactId, timestamp: i64) {
952 self.recently_seen_loop.try_interrupt(contact_id, timestamp);
953 }
954
955 pub(crate) async fn stop(self, context: &Context) {
960 for b in self.boxes() {
962 b.conn_state.stop();
963 }
964 self.smtp.stop();
965
966 let timeout_duration = std::time::Duration::from_secs(30);
968
969 let tracker = TaskTracker::new();
970 for b in self.inboxes.into_iter().chain(self.oboxes.into_iter()) {
971 let context = context.clone();
972 tracker.spawn(async move {
973 tokio::time::timeout(timeout_duration, b.handle)
974 .await
975 .log_err(&context)
976 });
977 }
978 {
979 let context = context.clone();
980 tracker.spawn(async move {
981 tokio::time::timeout(timeout_duration, self.smtp_handle)
982 .await
983 .log_err(&context)
984 });
985 }
986 tracker.close();
987 tracker.wait().await;
988
989 self.ephemeral_handle.abort();
994 self.ephemeral_handle.await.ok();
995 self.location_handle.abort();
996 self.location_handle.await.ok();
997 self.recently_seen_loop.abort().await;
998 }
999}
1000
1001#[derive(Debug)]
1003struct ConnectionState {
1004 stop_token: CancellationToken,
1006 idle_interrupt_sender: Sender<()>,
1008 connectivity: ConnectivityStore,
1010}
1011
1012impl ConnectionState {
1013 fn stop(&self) {
1015 self.stop_token.cancel();
1017 }
1018
1019 fn interrupt(&self) {
1020 self.idle_interrupt_sender.try_send(()).ok();
1022 }
1023}
1024
1025#[derive(Debug)]
1026pub(crate) struct SmtpConnectionState {
1027 state: ConnectionState,
1028}
1029
1030impl SmtpConnectionState {
1031 fn new() -> (Self, SmtpConnectionHandlers) {
1032 let stop_token = CancellationToken::new();
1033 let (idle_interrupt_sender, idle_interrupt_receiver) = channel::bounded(1);
1034
1035 let handlers = SmtpConnectionHandlers {
1036 connection: Smtp::new(),
1037 stop_token: stop_token.clone(),
1038 idle_interrupt_receiver,
1039 };
1040
1041 let state = ConnectionState {
1042 stop_token,
1043 idle_interrupt_sender,
1044 connectivity: handlers.connection.connectivity.clone(),
1045 };
1046
1047 let conn = SmtpConnectionState { state };
1048
1049 (conn, handlers)
1050 }
1051
1052 fn interrupt(&self) {
1054 self.state.interrupt();
1055 }
1056
1057 fn stop(&self) {
1059 self.state.stop();
1060 }
1061}
1062
1063struct SmtpConnectionHandlers {
1064 connection: Smtp,
1065 stop_token: CancellationToken,
1066 idle_interrupt_receiver: Receiver<()>,
1067}
1068
1069#[derive(Debug)]
1070pub(crate) struct ImapConnectionState {
1071 state: ConnectionState,
1072}
1073
1074impl ImapConnectionState {
1075 async fn new(
1077 context: &Context,
1078 transport_id: u32,
1079 login_param: ConfiguredLoginParam,
1080 ) -> Result<(Self, ImapConnectionHandlers)> {
1081 let stop_token = CancellationToken::new();
1082 let (idle_interrupt_sender, idle_interrupt_receiver) = channel::bounded(1);
1083
1084 let handlers = ImapConnectionHandlers {
1085 connection: Imap::new(context, transport_id, login_param, idle_interrupt_receiver)
1086 .await?,
1087 stop_token: stop_token.clone(),
1088 };
1089
1090 let state = ConnectionState {
1091 stop_token,
1092 idle_interrupt_sender,
1093 connectivity: handlers.connection.connectivity.clone(),
1094 };
1095
1096 let conn = ImapConnectionState { state };
1097
1098 Ok((conn, handlers))
1099 }
1100
1101 fn interrupt(&self) {
1103 self.state.interrupt();
1104 }
1105
1106 fn stop(&self) {
1108 self.state.stop();
1109 }
1110}
1111
1112#[derive(Debug)]
1113struct ImapConnectionHandlers {
1114 connection: Imap,
1115 stop_token: CancellationToken,
1116}