use std::cmp;
use std::iter::{self, once};
use std::num::NonZeroUsize;
use std::sync::atomic::Ordering;

use anyhow::{bail, Context as _, Error, Result};
use async_channel::{self as channel, Receiver, Sender};
use futures::future::try_join_all;
use futures_lite::FutureExt;
use rand::Rng;
use tokio::sync::{oneshot, RwLock, RwLockWriteGuard};
use tokio::task;
use tokio_util::sync::CancellationToken;

use self::connectivity::ConnectivityStore;
use crate::config::{self, Config};
use crate::contact::{ContactId, RecentlySeenLoop};
use crate::context::Context;
use crate::download::{download_msg, DownloadState};
use crate::ephemeral::{self, delete_expired_imap_messages};
use crate::events::EventType;
use crate::imap::{session::Session, FolderMeaning, Imap};
use crate::location;
use crate::log::{error, info, warn, LogExt};
use crate::message::MsgId;
use crate::smtp::{send_smtp_messages, Smtp};
use crate::sql;
use crate::tools::{self, duration_to_str, maybe_add_time_based_warnings, time, time_elapsed};

pub(crate) mod connectivity;

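/// State of the IO scheduler: whether IO is started, stopped, or paused by guards.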
#[derive(Debug, Default)]
pub(crate) struct SchedulerState {
    inner: RwLock<InnerSchedulerState>,
}

impl SchedulerState {
    pub(crate) fn new() -> Self {
        Default::default()
    }

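    /// Whether the scheduler is currently running.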
    pub(crate) async fn is_running(&self) -> bool {
        let inner = self.inner.read().await;
        matches!(*inner, InnerSchedulerState::Started(_))
    }

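    /// Starts the scheduler if it is not yet started.
    ///
    /// If IO is currently paused, it is only marked to be started once the pause ends.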
    pub(crate) async fn start(&self, context: Context) {
        let mut inner = self.inner.write().await;
        match *inner {
            InnerSchedulerState::Started(_) => (),
            InnerSchedulerState::Stopped => Self::do_start(inner, context).await,
            InnerSchedulerState::Paused {
                ref mut started, ..
            } => *started = true,
        }
    }

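    /// Starts IO, replacing the state with `Started` on success.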
    async fn do_start(mut inner: RwLockWriteGuard<'_, InnerSchedulerState>, context: Context) {
        info!(context, "starting IO");

        // Wake up the message processing loop so it picks up pending messages.
        context.new_msgs_notify.notify_one();

        let ctx = context.clone();
        match Scheduler::start(&context).await {
            Ok(scheduler) => {
                *inner = InnerSchedulerState::Started(scheduler);
                context.emit_event(EventType::ConnectivityChanged);
            }
            Err(err) => error!(&ctx, "Failed to start IO: {:#}", err),
        }
    }

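    /// Stops the scheduler if it is currently running.
    ///
    /// If IO is currently paused, it is marked to stay stopped once the pause ends.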
    pub(crate) async fn stop(&self, context: &Context) {
        let mut inner = self.inner.write().await;
        match *inner {
            InnerSchedulerState::Started(_) => {
                Self::do_stop(inner, context, InnerSchedulerState::Stopped).await
            }
            InnerSchedulerState::Stopped => (),
            InnerSchedulerState::Paused {
                ref mut started, ..
            } => *started = false,
        }
    }

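    /// Stops IO and replaces the state with `new_state`.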
    async fn do_stop(
        mut inner: RwLockWriteGuard<'_, InnerSchedulerState>,
        context: &Context,
        new_state: InnerSchedulerState,
    ) {
        info!(context, "stopping IO");

        // Wake up the message processing loop.
        context.new_msgs_notify.notify_one();

        // Stop the debug logging loop, if any.
        let debug_logging = context
            .debug_logging
            .write()
            .expect("RwLock is poisoned")
            .take();
        if let Some(debug_logging) = debug_logging {
            debug_logging.loop_handle.abort();
            debug_logging.loop_handle.await.ok();
        }
        let prev_state = std::mem::replace(&mut *inner, new_state);
        context.emit_event(EventType::ConnectivityChanged);
        match prev_state {
            InnerSchedulerState::Started(scheduler) => scheduler.stop(context).await,
            InnerSchedulerState::Stopped | InnerSchedulerState::Paused { .. } => (),
        }
    }

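    /// Pauses the IO scheduler and returns a guard that resumes it when dropped.
    ///
    /// Several pause guards can be active at the same time;
    /// IO is only resumed once the last one is dropped.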
    pub(crate) async fn pause(&'_ self, context: Context) -> Result<IoPausedGuard> {
        {
            let mut inner = self.inner.write().await;
            match *inner {
                InnerSchedulerState::Started(_) => {
                    let new_state = InnerSchedulerState::Paused {
                        started: true,
                        pause_guards_count: NonZeroUsize::new(1).unwrap(),
                    };
                    Self::do_stop(inner, &context, new_state).await;
                }
                InnerSchedulerState::Stopped => {
                    *inner = InnerSchedulerState::Paused {
                        started: false,
                        pause_guards_count: NonZeroUsize::new(1).unwrap(),
                    };
                }
                InnerSchedulerState::Paused {
                    ref mut pause_guards_count,
                    ..
                } => {
                    *pause_guards_count = pause_guards_count
                        .checked_add(1)
                        .ok_or_else(|| Error::msg("Too many pause guards active"))?
                }
            }
        }

        let (tx, rx) = oneshot::channel();
        tokio::spawn(async move {
            rx.await.ok();
            let mut inner = context.scheduler.inner.write().await;
            match *inner {
                InnerSchedulerState::Started(_) => {
                    warn!(&context, "IoPausedGuard resume: started instead of paused");
                }
                InnerSchedulerState::Stopped => {
                    warn!(&context, "IoPausedGuard resume: stopped instead of paused");
                }
                InnerSchedulerState::Paused {
                    ref started,
                    ref mut pause_guards_count,
                } => {
                    if *pause_guards_count == NonZeroUsize::new(1).unwrap() {
                        match *started {
                            true => SchedulerState::do_start(inner, context.clone()).await,
                            false => *inner = InnerSchedulerState::Stopped,
                        }
                    } else {
                        let new_count = pause_guards_count.get() - 1;
                        // The count was greater than 1, so the new count is still nonzero.
                        *pause_guards_count = NonZeroUsize::new(new_count).unwrap();
                    }
                }
            }
        });
        Ok(IoPausedGuard { sender: Some(tx) })
    }

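    /// Restarts the scheduler if it is currently running.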
    pub(crate) async fn restart(&self, context: &Context) {
        info!(context, "restarting IO");
        if self.is_running().await {
            self.stop(context).await;
            self.start(context.clone()).await;
        }
    }

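    /// Interrupts all connections so they check for new work,
    /// e.g. after the network became available again.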
    pub(crate) async fn maybe_network(&self) {
        let inner = self.inner.read().await;
        let (inbox, oboxes) = match *inner {
            InnerSchedulerState::Started(ref scheduler) => {
                scheduler.maybe_network();
                let inbox = scheduler.inbox.conn_state.state.connectivity.clone();
                let oboxes = scheduler
                    .oboxes
                    .iter()
                    .map(|b| b.conn_state.state.connectivity.clone())
                    .collect::<Vec<_>>();
                (inbox, oboxes)
            }
            _ => return,
        };
        drop(inner);
        connectivity::idle_interrupted(inbox, oboxes).await;
    }

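    /// Indicates that the network connection was likely lost, updating the connectivity state.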
    pub(crate) async fn maybe_network_lost(&self, context: &Context) {
        let inner = self.inner.read().await;
        let stores = match *inner {
            InnerSchedulerState::Started(ref scheduler) => {
                scheduler.maybe_network_lost();
                scheduler
                    .boxes()
                    .map(|b| b.conn_state.state.connectivity.clone())
                    .collect()
            }
            _ => return,
        };
        drop(inner);
        connectivity::maybe_network_lost(context, stores).await;
    }

    /// Interrupts the IMAP inbox loop if the scheduler is running.
    pub(crate) async fn interrupt_inbox(&self) {
        let inner = self.inner.read().await;
        if let InnerSchedulerState::Started(ref scheduler) = *inner {
            scheduler.interrupt_inbox();
        }
    }

    /// Interrupts the IMAP loops watching folders other than the inbox.
    pub(crate) async fn interrupt_oboxes(&self) {
        let inner = self.inner.read().await;
        if let InnerSchedulerState::Started(ref scheduler) = *inner {
            scheduler.interrupt_oboxes();
        }
    }

    /// Interrupts the SMTP loop.
    pub(crate) async fn interrupt_smtp(&self) {
        let inner = self.inner.read().await;
        if let InnerSchedulerState::Started(ref scheduler) = *inner {
            scheduler.interrupt_smtp();
        }
    }

    /// Interrupts the ephemeral messages loop.
    pub(crate) async fn interrupt_ephemeral_task(&self) {
        let inner = self.inner.read().await;
        if let InnerSchedulerState::Started(ref scheduler) = *inner {
            scheduler.interrupt_ephemeral_task();
        }
    }

    /// Interrupts the location loop.
    pub(crate) async fn interrupt_location(&self) {
        let inner = self.inner.read().await;
        if let InnerSchedulerState::Started(ref scheduler) = *inner {
            scheduler.interrupt_location();
        }
    }

    /// Interrupts the recently-seen loop.
    pub(crate) async fn interrupt_recently_seen(&self, contact_id: ContactId, timestamp: i64) {
        let inner = self.inner.read().await;
        if let InnerSchedulerState::Started(ref scheduler) = *inner {
            scheduler.interrupt_recently_seen(contact_id, timestamp);
        }
    }
}

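/// Whether the IO scheduler is started, stopped, or paused.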
#[derive(Debug, Default)]
enum InnerSchedulerState {
    Started(Scheduler),
    #[default]
    Stopped,
    Paused {
        started: bool,
        pause_guards_count: NonZeroUsize,
    },
}

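/// Guard returned by [`SchedulerState::pause`].
///
/// Dropping the guard resumes IO, unless other pause guards are still active.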
#[derive(Default, Debug)]
pub(crate) struct IoPausedGuard {
    sender: Option<oneshot::Sender<()>>,
}

impl Drop for IoPausedGuard {
    fn drop(&mut self) {
        if let Some(sender) = self.sender.take() {
            // If sending fails, the resume task is already gone and there is nothing to do.
            sender.send(()).ok();
        }
    }
}

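/// A watched folder: its meaning, connection state and the task running its IMAP loop.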
#[derive(Debug)]
struct SchedBox {
    meaning: FolderMeaning,
    conn_state: ImapConnectionState,

    /// IMAP loop task handle.
    handle: task::JoinHandle<()>,
}

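/// Scheduler, owning the connection states and task handles of all IO loops.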
#[derive(Debug)]
pub(crate) struct Scheduler {
    inbox: SchedBox,
    /// Watched folders other than the inbox, e.g. the Mvbox and Sentbox.
    oboxes: Vec<SchedBox>,
    smtp: SmtpConnectionState,
    smtp_handle: task::JoinHandle<()>,
    ephemeral_handle: task::JoinHandle<()>,
    ephemeral_interrupt_send: Sender<()>,
    location_handle: task::JoinHandle<()>,
    location_interrupt_send: Sender<()>,

    recently_seen_loop: RecentlySeenLoop,
}

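/// Downloads the messages queued in the `download` table and removes them from the queue.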
async fn download_msgs(context: &Context, session: &mut Session) -> Result<()> {
    let msg_ids = context
        .sql
        .query_map(
            "SELECT msg_id FROM download",
            (),
            |row| {
                let msg_id: MsgId = row.get(0)?;
                Ok(msg_id)
            },
            |rowids| {
                rowids
                    .collect::<std::result::Result<Vec<_>, _>>()
                    .map_err(Into::into)
            },
        )
        .await?;

    for msg_id in msg_ids {
        if let Err(err) = download_msg(context, msg_id, session).await {
            warn!(context, "Failed to download message {msg_id}: {:#}.", err);

            // Mark the download as failed;
            // the message is removed from the download queue below.
            msg_id
                .update_download_state(context, DownloadState::Failure)
                .await?;
        }
        context
            .sql
            .execute("DELETE FROM download WHERE msg_id=?", (msg_id,))
            .await?;
    }

    Ok(())
}

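/// Runs the IMAP loop for the inbox until the stop token is cancelled.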
async fn inbox_loop(
    ctx: Context,
    started: oneshot::Sender<()>,
    inbox_handlers: ImapConnectionHandlers,
) {
    use futures::future::FutureExt;

    info!(ctx, "Starting inbox loop.");
    let ImapConnectionHandlers {
        mut connection,
        stop_token,
    } = inbox_handlers;

    let ctx1 = ctx.clone();
    let fut = async move {
        let ctx = ctx1;
        if let Err(()) = started.send(()) {
            warn!(ctx, "Inbox loop, missing started receiver.");
            return;
        };

        let mut old_session: Option<Session> = None;
        loop {
            let session = if let Some(session) = old_session.take() {
                session
            } else {
                info!(ctx, "Preparing new IMAP session for inbox.");
                match connection.prepare(&ctx).await {
                    Err(err) => {
                        warn!(ctx, "Failed to prepare inbox connection: {err:#}.");
                        continue;
                    }
                    Ok(session) => session,
                }
            };

            match inbox_fetch_idle(&ctx, &mut connection, session).await {
                Err(err) => warn!(ctx, "Failed inbox fetch_idle: {err:#}."),
                Ok(session) => {
                    info!(
                        ctx,
                        "IMAP loop iteration for inbox finished, keeping the session."
                    );
                    old_session = Some(session);
                }
            }
        }
    };

    stop_token
        .cancelled()
        .map(|_| {
            info!(ctx, "Shutting down inbox loop.");
        })
        .race(fut)
        .await;
}

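/// Returns the `Config` key and the name of the folder configured for the given folder meaning.
///
/// Returns `Ok(None)` if no folder is configured for this meaning.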
pub async fn convert_folder_meaning(
    ctx: &Context,
    folder_meaning: FolderMeaning,
) -> Result<Option<(Config, String)>> {
    let folder_config = match folder_meaning.to_config() {
        Some(c) => c,
        None => {
            return Ok(None);
        }
    };

    let folder = ctx
        .get_config(folder_config)
        .await
        .with_context(|| format!("Failed to retrieve {folder_config} folder"))?;

    if let Some(watch_folder) = folder {
        Ok(Some((folder_config, watch_folder)))
    } else {
        Ok(None)
    }
}

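/// Performs one fetch + IDLE iteration on the inbox,
/// including periodic tasks such as quota updates and housekeeping.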
async fn inbox_fetch_idle(ctx: &Context, imap: &mut Imap, mut session: Session) -> Result<Session> {
    if !ctx.get_config_bool(Config::FixIsChatmail).await? {
        ctx.set_config_internal(
            Config::IsChatmail,
            crate::config::from_bool(session.is_chatmail()),
        )
        .await?;
    }

    // Refresh quota information if it is outdated.
    if ctx.quota_needs_update(60).await {
        if let Err(err) = ctx.update_recent_quota(&mut session).await {
            warn!(ctx, "Failed to update quota: {:#}.", err);
        }
    }

    let resync_requested = ctx.resync_request.swap(false, Ordering::Relaxed);
    if resync_requested {
        if let Err(err) = session.resync_folders(ctx).await {
            warn!(ctx, "Failed to resync folders: {:#}.", err);
            ctx.resync_request.store(true, Ordering::Relaxed);
        }
    }

    maybe_add_time_based_warnings(ctx).await;

    match ctx.get_config_i64(Config::LastHousekeeping).await {
        Ok(last_housekeeping_time) => {
            let next_housekeeping_time = last_housekeeping_time.saturating_add(60 * 60 * 24);
            if next_housekeeping_time <= time() {
                sql::housekeeping(ctx).await.log_err(ctx).ok();
            }
        }
        Err(err) => {
            warn!(ctx, "Failed to get last housekeeping time: {}", err);
        }
    };

    match ctx.get_config_bool(Config::FetchedExistingMsgs).await {
        Ok(fetched_existing_msgs) => {
            if !fetched_existing_msgs {
                // Set the flag first so that a failure to fetch existing messages
                // is not retried on every loop iteration.
                if let Err(err) = ctx
                    .set_config_internal(Config::FetchedExistingMsgs, config::from_bool(true))
                    .await
                {
                    warn!(ctx, "Can't set Config::FetchedExistingMsgs: {:#}", err);
                }

                if let Err(err) = imap.fetch_existing_msgs(ctx, &mut session).await {
                    warn!(ctx, "Failed to fetch existing messages: {:#}", err);
                }
            }
        }
        Err(err) => {
            warn!(ctx, "Can't get Config::FetchedExistingMsgs: {:#}", err);
        }
    }

    download_msgs(ctx, &mut session)
        .await
        .context("Failed to download messages")?;
    session
        .fetch_metadata(ctx)
        .await
        .context("Failed to fetch metadata")?;
    session
        .register_token(ctx)
        .await
        .context("Failed to register push token")?;

    let session = fetch_idle(ctx, imap, session, FolderMeaning::Inbox).await?;
    Ok(session)
}

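/// Performs a single fetch + IDLE iteration of the IMAP loop for one folder.
///
/// Returns the session so that it can be reused for the next iteration.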
async fn fetch_idle(
    ctx: &Context,
    connection: &mut Imap,
    mut session: Session,
    folder_meaning: FolderMeaning,
) -> Result<Session> {
    let Some((folder_config, watch_folder)) = convert_folder_meaning(ctx, folder_meaning).await?
    else {
        // The folder is not configured.
        // Wait for an interrupt before returning to avoid a busy retry loop.
        connection.connectivity.set_not_configured(ctx).await;
        connection.idle_interrupt_receiver.recv().await.ok();
        bail!("Cannot fetch folder {folder_meaning} because it is not configured");
    };

    if folder_config == Config::ConfiguredInboxFolder {
        let mvbox;
        let syncbox = match ctx.should_move_sync_msgs().await? {
            false => &watch_folder,
            true => {
                mvbox = ctx.get_config(Config::ConfiguredMvboxFolder).await?;
                mvbox.as_deref().unwrap_or(&watch_folder)
            }
        };
        session
            .send_sync_msgs(ctx, syncbox)
            .await
            .context("fetch_idle: send_sync_msgs")
            .log_err(ctx)
            .ok();

        session
            .store_seen_flags_on_imap(ctx)
            .await
            .context("store_seen_flags_on_imap")?;
    }

    if !ctx.should_delete_to_trash().await?
        || ctx
            .get_config(Config::ConfiguredTrashFolder)
            .await?
            .is_some()
    {
        connection
            .fetch_move_delete(ctx, &mut session, &watch_folder, folder_meaning)
            .await
            .context("fetch_move_delete")?;

        delete_expired_imap_messages(ctx)
            .await
            .context("delete_expired_imap_messages")?;
    } else if folder_config == Config::ConfiguredInboxFolder {
        ctx.last_full_folder_scan.lock().await.take();
    }

    if folder_config == Config::ConfiguredInboxFolder {
        match connection
            .scan_folders(ctx, &mut session)
            .await
            .context("scan_folders")
        {
            Err(err) => {
                warn!(ctx, "{:#}", err);
            }
            Ok(true) => {
                // Scanning may have moved messages into the watched folder, fetch it again.
                connection
                    .fetch_move_delete(ctx, &mut session, &watch_folder, folder_meaning)
                    .await
                    .context("fetch_move_delete after scan_folders")?;
            }
            Ok(false) => {}
        }
    }

    session
        .sync_seen_flags(ctx, &watch_folder)
        .await
        .context("sync_seen_flags")
        .log_err(ctx)
        .ok();

    connection.connectivity.set_idle(ctx).await;

    ctx.emit_event(EventType::ImapInboxIdle);

    if !session.can_idle() {
        info!(
            ctx,
            "IMAP session does not support IDLE, going to fake idle."
        );
        connection.fake_idle(ctx, watch_folder).await?;
        return Ok(session);
    }

    if ctx
        .get_config_bool(Config::DisableIdle)
        .await
        .context("Failed to get disable_idle config")
        .log_err(ctx)
        .unwrap_or_default()
    {
        info!(ctx, "IMAP IDLE is disabled, going to fake idle.");
        connection.fake_idle(ctx, watch_folder).await?;
        return Ok(session);
    }

    info!(
        ctx,
        "IMAP session in folder {watch_folder:?} supports IDLE, using it."
    );
    let session = session
        .idle(
            ctx,
            connection.idle_interrupt_receiver.clone(),
            &watch_folder,
        )
        .await
        .context("idle")?;

    Ok(session)
}

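/// Runs the IMAP loop for a folder other than the inbox, e.g. the Mvbox or Sentbox,
/// until the stop token is cancelled.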
async fn simple_imap_loop(
    ctx: Context,
    started: oneshot::Sender<()>,
    inbox_handlers: ImapConnectionHandlers,
    folder_meaning: FolderMeaning,
) {
    use futures::future::FutureExt;

    info!(ctx, "Starting simple loop for {folder_meaning}.");
    let ImapConnectionHandlers {
        mut connection,
        stop_token,
    } = inbox_handlers;

    let ctx1 = ctx.clone();

    let fut = async move {
        let ctx = ctx1;
        if let Err(()) = started.send(()) {
            warn!(
                ctx,
                "Simple imap loop for {folder_meaning}, missing started receiver."
            );
            return;
        }

        let mut old_session: Option<Session> = None;
        loop {
            let session = if let Some(session) = old_session.take() {
                session
            } else {
                info!(ctx, "Preparing new IMAP session for {folder_meaning}.");
                match connection.prepare(&ctx).await {
                    Err(err) => {
                        warn!(
                            ctx,
                            "Failed to prepare {folder_meaning} connection: {err:#}."
                        );
                        continue;
                    }
                    Ok(session) => session,
                }
            };

            match fetch_idle(&ctx, &mut connection, session, folder_meaning).await {
                Err(err) => warn!(ctx, "Failed fetch_idle: {err:#}"),
                Ok(session) => {
                    info!(
                        ctx,
                        "IMAP loop iteration for {folder_meaning} finished, keeping the session"
                    );
                    old_session = Some(session);
                }
            }
        }
    };

    stop_token
        .cancelled()
        .map(|_| {
            info!(ctx, "Shutting down IMAP loop for {folder_meaning}.");
        })
        .race(fut)
        .await;
}

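/// Runs the SMTP loop, sending queued messages and retrying failed sends,
/// until the stop token is cancelled.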
async fn smtp_loop(
    ctx: Context,
    started: oneshot::Sender<()>,
    smtp_handlers: SmtpConnectionHandlers,
) {
    use futures::future::FutureExt;

    info!(ctx, "Starting SMTP loop.");
    let SmtpConnectionHandlers {
        mut connection,
        stop_token,
        idle_interrupt_receiver,
    } = smtp_handlers;

    let ctx1 = ctx.clone();
    let fut = async move {
        let ctx = ctx1;
        if let Err(()) = started.send(()) {
            warn!(&ctx, "SMTP loop, missing started receiver.");
            return;
        }

        let mut timeout = None;
        loop {
            if let Err(err) = send_smtp_messages(&ctx, &mut connection).await {
                warn!(ctx, "send_smtp_messages failed: {:#}.", err);
                timeout = Some(timeout.unwrap_or(30));
            } else {
                timeout = None;
                let duration_until_can_send = ctx.ratelimit.read().await.until_can_send();
                if !duration_until_can_send.is_zero() {
                    info!(
                        ctx,
                        "smtp got rate limited, waiting for {} until can send again",
                        duration_to_str(duration_until_can_send)
                    );
                    tokio::time::sleep(duration_until_can_send).await;
                    continue;
                }
            }

            info!(ctx, "SMTP fake idle started.");
            // Reflect the last SMTP error in the connectivity status.
            match &connection.last_send_error {
                None => connection.connectivity.set_idle(&ctx).await,
                Some(err) => connection.connectivity.set_err(&ctx, err).await,
            }

            if let Some(t) = timeout {
                // Wait up to `t` seconds for an interrupt,
                // then increase the retry timeout with a random component.
                let now = tools::Time::now();
                info!(
                    ctx,
                    "SMTP has messages to retry, planning to retry {t} seconds later."
                );
                let duration = std::time::Duration::from_secs(t);
                tokio::time::timeout(duration, async {
                    idle_interrupt_receiver.recv().await.unwrap_or_default()
                })
                .await
                .unwrap_or_default();
                let slept = time_elapsed(&now).as_secs();
                timeout = Some(cmp::max(
                    t,
                    slept.saturating_add(rand::thread_rng().gen_range((slept / 2)..=slept)),
                ));
            } else {
                info!(ctx, "SMTP has no messages to retry, waiting for interrupt.");
                idle_interrupt_receiver.recv().await.unwrap_or_default();
            };

            info!(ctx, "SMTP fake idle interrupted.")
        }
    };

    stop_token
        .cancelled()
        .map(|_| {
            info!(ctx, "Shutting down SMTP loop.");
        })
        .race(fut)
        .await;
}

impl Scheduler {
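    /// Starts the scheduler: spawns the IMAP, SMTP and other background loops
    /// and waits until the IMAP and SMTP loops report that they have started.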
    pub async fn start(ctx: &Context) -> Result<Self> {
        let (smtp, smtp_handlers) = SmtpConnectionState::new();

        let (smtp_start_send, smtp_start_recv) = oneshot::channel();
        let (ephemeral_interrupt_send, ephemeral_interrupt_recv) = channel::bounded(1);
        let (location_interrupt_send, location_interrupt_recv) = channel::bounded(1);

        let mut oboxes = Vec::new();
        let mut start_recvs = Vec::new();

        let (conn_state, inbox_handlers) = ImapConnectionState::new(ctx).await?;
        let (inbox_start_send, inbox_start_recv) = oneshot::channel();
        let handle = {
            let ctx = ctx.clone();
            task::spawn(inbox_loop(ctx, inbox_start_send, inbox_handlers))
        };
        let inbox = SchedBox {
            meaning: FolderMeaning::Inbox,
            conn_state,
            handle,
        };
        start_recvs.push(inbox_start_recv);

        for (meaning, should_watch) in [
            (FolderMeaning::Mvbox, ctx.should_watch_mvbox().await),
            (FolderMeaning::Sent, ctx.should_watch_sentbox().await),
        ] {
            if should_watch? {
                let (conn_state, handlers) = ImapConnectionState::new(ctx).await?;
                let (start_send, start_recv) = oneshot::channel();
                let ctx = ctx.clone();
                let handle = task::spawn(simple_imap_loop(ctx, start_send, handlers, meaning));
                oboxes.push(SchedBox {
                    meaning,
                    conn_state,
                    handle,
                });
                start_recvs.push(start_recv);
            }
        }

        let smtp_handle = {
            let ctx = ctx.clone();
            task::spawn(smtp_loop(ctx, smtp_start_send, smtp_handlers))
        };
        start_recvs.push(smtp_start_recv);

        let ephemeral_handle = {
            let ctx = ctx.clone();
            task::spawn(async move {
                ephemeral::ephemeral_loop(&ctx, ephemeral_interrupt_recv).await;
            })
        };

        let location_handle = {
            let ctx = ctx.clone();
            task::spawn(async move {
                location::location_loop(&ctx, location_interrupt_recv).await;
            })
        };

        let recently_seen_loop = RecentlySeenLoop::new(ctx.clone());

        let res = Self {
            inbox,
            oboxes,
            smtp,
            smtp_handle,
            ephemeral_handle,
            ephemeral_interrupt_send,
            location_handle,
            location_interrupt_send,
            recently_seen_loop,
        };

        // Wait for all loops to be running before reporting success.
        if let Err(err) = try_join_all(start_recvs).await {
            bail!("failed to start scheduler: {}", err);
        }

        info!(ctx, "scheduler is running");
        Ok(res)
    }

    /// Returns an iterator over the inbox and the other watched folders.
    fn boxes(&self) -> iter::Chain<iter::Once<&SchedBox>, std::slice::Iter<'_, SchedBox>> {
        once(&self.inbox).chain(self.oboxes.iter())
    }

    fn maybe_network(&self) {
        for b in self.boxes() {
            b.conn_state.interrupt();
        }
        self.interrupt_smtp();
    }

    fn maybe_network_lost(&self) {
        for b in self.boxes() {
            b.conn_state.interrupt();
        }
        self.interrupt_smtp();
    }

    fn interrupt_inbox(&self) {
        self.inbox.conn_state.interrupt();
    }

    fn interrupt_oboxes(&self) {
        for b in &self.oboxes {
            b.conn_state.interrupt();
        }
    }

    fn interrupt_smtp(&self) {
        self.smtp.interrupt();
    }

    fn interrupt_ephemeral_task(&self) {
        self.ephemeral_interrupt_send.try_send(()).ok();
    }

    fn interrupt_location(&self) {
        self.location_interrupt_send.try_send(()).ok();
    }

    fn interrupt_recently_seen(&self, contact_id: ContactId, timestamp: i64) {
        self.recently_seen_loop.try_interrupt(contact_id, timestamp);
    }

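    /// Stops the scheduler, waiting for the IMAP and SMTP loops to finish
    /// and aborting the remaining background tasks.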
    pub(crate) async fn stop(self, context: &Context) {
        // Signal all loops to stop before waiting on their handles.
        for b in self.boxes() {
            b.conn_state.stop();
        }
        self.smtp.stop();

        let timeout_duration = std::time::Duration::from_secs(30);
        for b in once(self.inbox).chain(self.oboxes) {
            tokio::time::timeout(timeout_duration, b.handle)
                .await
                .log_err(context)
                .ok();
        }
        tokio::time::timeout(timeout_duration, self.smtp_handle)
            .await
            .log_err(context)
            .ok();

        // Abort the remaining background tasks and wait for them to finish.
        self.ephemeral_handle.abort();
        self.ephemeral_handle.await.ok();
        self.location_handle.abort();
        self.location_handle.await.ok();
        self.recently_seen_loop.abort().await;
    }
}

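/// Scheduler-side state for controlling a single IMAP or SMTP connection.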
#[derive(Debug)]
struct ConnectionState {
    /// Token to signal the connection task to stop.
    stop_token: CancellationToken,
    /// Channel used to interrupt idle.
    idle_interrupt_sender: Sender<()>,
    /// Connectivity state of the connection.
    connectivity: ConnectivityStore,
}

impl ConnectionState {
    /// Shuts the connection down.
    fn stop(&self) {
        self.stop_token.cancel();
    }

    fn interrupt(&self) {
        // try_send() never blocks; if the channel is full, an interrupt is already queued.
        self.idle_interrupt_sender.try_send(()).ok();
    }
}

#[derive(Debug)]
pub(crate) struct SmtpConnectionState {
    state: ConnectionState,
}

impl SmtpConnectionState {
    fn new() -> (Self, SmtpConnectionHandlers) {
        let stop_token = CancellationToken::new();
        let (idle_interrupt_sender, idle_interrupt_receiver) = channel::bounded(1);

        let handlers = SmtpConnectionHandlers {
            connection: Smtp::new(),
            stop_token: stop_token.clone(),
            idle_interrupt_receiver,
        };

        let state = ConnectionState {
            stop_token,
            idle_interrupt_sender,
            connectivity: handlers.connection.connectivity.clone(),
        };

        let conn = SmtpConnectionState { state };

        (conn, handlers)
    }

    /// Interrupts SMTP fake idle.
    fn interrupt(&self) {
        self.state.interrupt();
    }

    /// Shuts the SMTP connection down.
    fn stop(&self) {
        self.state.stop();
    }
}

struct SmtpConnectionHandlers {
    connection: Smtp,
    stop_token: CancellationToken,
    idle_interrupt_receiver: Receiver<()>,
}

#[derive(Debug)]
pub(crate) struct ImapConnectionState {
    state: ConnectionState,
}

impl ImapConnectionState {
    /// Creates a new connection state and the handlers for the connection task.
    async fn new(context: &Context) -> Result<(Self, ImapConnectionHandlers)> {
        let stop_token = CancellationToken::new();
        let (idle_interrupt_sender, idle_interrupt_receiver) = channel::bounded(1);

        let handlers = ImapConnectionHandlers {
            connection: Imap::new_configured(context, idle_interrupt_receiver).await?,
            stop_token: stop_token.clone(),
        };

        let state = ConnectionState {
            stop_token,
            idle_interrupt_sender,
            connectivity: handlers.connection.connectivity.clone(),
        };

        let conn = ImapConnectionState { state };

        Ok((conn, handlers))
    }

    /// Interrupts IMAP IDLE.
    fn interrupt(&self) {
        self.state.interrupt();
    }

    /// Shuts the IMAP connection down.
    fn stop(&self) {
        self.state.stop();
    }
}

#[derive(Debug)]
struct ImapConnectionHandlers {
    connection: Imap,
    stop_token: CancellationToken,
}