1use std::cmp;
2use std::iter::{self, once};
3use std::num::NonZeroUsize;
4use std::sync::atomic::Ordering;
5
6use anyhow::{Context as _, Error, Result, bail};
7use async_channel::{self as channel, Receiver, Sender};
8use futures::future::try_join_all;
9use futures_lite::FutureExt;
10use rand::Rng;
11use tokio::sync::{RwLock, RwLockWriteGuard, oneshot};
12use tokio::task;
13use tokio_util::sync::CancellationToken;
14use tokio_util::task::TaskTracker;
15
16use self::connectivity::ConnectivityStore;
17use crate::config::{self, Config};
18use crate::constants;
19use crate::contact::{ContactId, RecentlySeenLoop};
20use crate::context::Context;
21use crate::download::{DownloadState, download_msg};
22use crate::ephemeral::{self, delete_expired_imap_messages};
23use crate::events::EventType;
24use crate::imap::{FolderMeaning, Imap, session::Session};
25use crate::location;
26use crate::log::{LogExt, error, info, warn};
27use crate::message::MsgId;
28use crate::smtp::{Smtp, send_smtp_messages};
29use crate::sql;
30use crate::tools::{self, duration_to_str, maybe_add_time_based_warnings, time, time_elapsed};
31
32pub(crate) mod connectivity;
33
34#[derive(Debug, Default)]
40pub(crate) struct SchedulerState {
41 inner: RwLock<InnerSchedulerState>,
42}
43
44impl SchedulerState {
45 pub(crate) fn new() -> Self {
46 Default::default()
47 }
48
49 pub(crate) async fn is_running(&self) -> bool {
51 let inner = self.inner.read().await;
52 matches!(*inner, InnerSchedulerState::Started(_))
53 }
54
55 pub(crate) async fn start(&self, context: Context) {
57 let mut inner = self.inner.write().await;
58 match *inner {
59 InnerSchedulerState::Started(_) => (),
60 InnerSchedulerState::Stopped => Self::do_start(inner, context).await,
61 InnerSchedulerState::Paused {
62 ref mut started, ..
63 } => *started = true,
64 }
65 }
66
67 async fn do_start(mut inner: RwLockWriteGuard<'_, InnerSchedulerState>, context: Context) {
69 info!(context, "starting IO");
70
71 context.new_msgs_notify.notify_one();
74
75 let ctx = context.clone();
76 match Scheduler::start(&context).await {
77 Ok(scheduler) => {
78 *inner = InnerSchedulerState::Started(scheduler);
79 context.emit_event(EventType::ConnectivityChanged);
80 }
81 Err(err) => error!(&ctx, "Failed to start IO: {:#}", err),
82 }
83 }
84
85 pub(crate) async fn stop(&self, context: &Context) {
87 let mut inner = self.inner.write().await;
88 match *inner {
89 InnerSchedulerState::Started(_) => {
90 Self::do_stop(inner, context, InnerSchedulerState::Stopped).await
91 }
92 InnerSchedulerState::Stopped => (),
93 InnerSchedulerState::Paused {
94 ref mut started, ..
95 } => *started = false,
96 }
97 }
98
99 async fn do_stop(
101 mut inner: RwLockWriteGuard<'_, InnerSchedulerState>,
102 context: &Context,
103 new_state: InnerSchedulerState,
104 ) {
105 info!(context, "stopping IO");
111
112 context.new_msgs_notify.notify_one();
115
116 let debug_logging = context
117 .debug_logging
118 .write()
119 .expect("RwLock is poisoned")
120 .take();
121 if let Some(debug_logging) = debug_logging {
122 debug_logging.loop_handle.abort();
123 debug_logging.loop_handle.await.ok();
124 }
125 let prev_state = std::mem::replace(&mut *inner, new_state);
126 context.emit_event(EventType::ConnectivityChanged);
127 match prev_state {
128 InnerSchedulerState::Started(scheduler) => scheduler.stop(context).await,
129 InnerSchedulerState::Stopped | InnerSchedulerState::Paused { .. } => (),
130 }
131 }
132
133 pub(crate) async fn pause(&'_ self, context: Context) -> Result<IoPausedGuard> {
142 {
143 let mut inner = self.inner.write().await;
144 match *inner {
145 InnerSchedulerState::Started(_) => {
146 let new_state = InnerSchedulerState::Paused {
147 started: true,
148 pause_guards_count: NonZeroUsize::new(1).unwrap(),
149 };
150 Self::do_stop(inner, &context, new_state).await;
151 }
152 InnerSchedulerState::Stopped => {
153 *inner = InnerSchedulerState::Paused {
154 started: false,
155 pause_guards_count: NonZeroUsize::new(1).unwrap(),
156 };
157 }
158 InnerSchedulerState::Paused {
159 ref mut pause_guards_count,
160 ..
161 } => {
162 *pause_guards_count = pause_guards_count
163 .checked_add(1)
164 .ok_or_else(|| Error::msg("Too many pause guards active"))?
165 }
166 }
167 }
168
169 let (tx, rx) = oneshot::channel();
170 tokio::spawn(async move {
171 rx.await.ok();
172 let mut inner = context.scheduler.inner.write().await;
173 match *inner {
174 InnerSchedulerState::Started(_) => {
175 warn!(&context, "IoPausedGuard resume: started instead of paused");
176 }
177 InnerSchedulerState::Stopped => {
178 warn!(&context, "IoPausedGuard resume: stopped instead of paused");
179 }
180 InnerSchedulerState::Paused {
181 ref started,
182 ref mut pause_guards_count,
183 } => {
184 if *pause_guards_count == NonZeroUsize::new(1).unwrap() {
185 match *started {
186 true => SchedulerState::do_start(inner, context.clone()).await,
187 false => *inner = InnerSchedulerState::Stopped,
188 }
189 } else {
190 let new_count = pause_guards_count.get() - 1;
191 *pause_guards_count = NonZeroUsize::new(new_count).unwrap();
193 }
194 }
195 }
196 });
197 Ok(IoPausedGuard { sender: Some(tx) })
198 }
199
200 pub(crate) async fn restart(&self, context: &Context) {
202 info!(context, "restarting IO");
203 if self.is_running().await {
204 self.stop(context).await;
205 self.start(context.clone()).await;
206 }
207 }
208
209 pub(crate) async fn maybe_network(&self) {
211 let inner = self.inner.read().await;
212 let (inbox, oboxes) = match *inner {
213 InnerSchedulerState::Started(ref scheduler) => {
214 scheduler.maybe_network();
215 let inbox = scheduler.inbox.conn_state.state.connectivity.clone();
216 let oboxes = scheduler
217 .oboxes
218 .iter()
219 .map(|b| b.conn_state.state.connectivity.clone())
220 .collect::<Vec<_>>();
221 (inbox, oboxes)
222 }
223 _ => return,
224 };
225 drop(inner);
226 connectivity::idle_interrupted(inbox, oboxes).await;
227 }
228
229 pub(crate) async fn maybe_network_lost(&self, context: &Context) {
231 let inner = self.inner.read().await;
232 let stores = match *inner {
233 InnerSchedulerState::Started(ref scheduler) => {
234 scheduler.maybe_network_lost();
235 scheduler
236 .boxes()
237 .map(|b| b.conn_state.state.connectivity.clone())
238 .collect()
239 }
240 _ => return,
241 };
242 drop(inner);
243 connectivity::maybe_network_lost(context, stores).await;
244 }
245
246 pub(crate) async fn interrupt_inbox(&self) {
247 let inner = self.inner.read().await;
248 if let InnerSchedulerState::Started(ref scheduler) = *inner {
249 scheduler.interrupt_inbox();
250 }
251 }
252
253 pub(crate) async fn interrupt_oboxes(&self) {
255 let inner = self.inner.read().await;
256 if let InnerSchedulerState::Started(ref scheduler) = *inner {
257 scheduler.interrupt_oboxes();
258 }
259 }
260
261 pub(crate) async fn interrupt_smtp(&self) {
262 let inner = self.inner.read().await;
263 if let InnerSchedulerState::Started(ref scheduler) = *inner {
264 scheduler.interrupt_smtp();
265 }
266 }
267
268 pub(crate) async fn interrupt_ephemeral_task(&self) {
269 let inner = self.inner.read().await;
270 if let InnerSchedulerState::Started(ref scheduler) = *inner {
271 scheduler.interrupt_ephemeral_task();
272 }
273 }
274
275 pub(crate) async fn interrupt_location(&self) {
276 let inner = self.inner.read().await;
277 if let InnerSchedulerState::Started(ref scheduler) = *inner {
278 scheduler.interrupt_location();
279 }
280 }
281
282 pub(crate) async fn interrupt_recently_seen(&self, contact_id: ContactId, timestamp: i64) {
283 let inner = self.inner.read().await;
284 if let InnerSchedulerState::Started(ref scheduler) = *inner {
285 scheduler.interrupt_recently_seen(contact_id, timestamp);
286 }
287 }
288}
289
290#[derive(Debug, Default)]
291enum InnerSchedulerState {
292 Started(Scheduler),
293 #[default]
294 Stopped,
295 Paused {
296 started: bool,
297 pause_guards_count: NonZeroUsize,
298 },
299}
300
301#[derive(Default, Debug)]
306pub(crate) struct IoPausedGuard {
307 sender: Option<oneshot::Sender<()>>,
308}
309
310impl Drop for IoPausedGuard {
311 fn drop(&mut self) {
312 if let Some(sender) = self.sender.take() {
313 sender.send(()).ok();
315 }
316 }
317}
318
319#[derive(Debug)]
320struct SchedBox {
321 meaning: FolderMeaning,
322 conn_state: ImapConnectionState,
323
324 handle: task::JoinHandle<()>,
326}
327
328#[derive(Debug)]
330pub(crate) struct Scheduler {
331 inbox: SchedBox,
332 oboxes: Vec<SchedBox>,
334 smtp: SmtpConnectionState,
335 smtp_handle: task::JoinHandle<()>,
336 ephemeral_handle: task::JoinHandle<()>,
337 ephemeral_interrupt_send: Sender<()>,
338 location_handle: task::JoinHandle<()>,
339 location_interrupt_send: Sender<()>,
340
341 recently_seen_loop: RecentlySeenLoop,
342}
343
344async fn download_msgs(context: &Context, session: &mut Session) -> Result<()> {
345 let msg_ids = context
346 .sql
347 .query_map(
348 "SELECT msg_id FROM download",
349 (),
350 |row| {
351 let msg_id: MsgId = row.get(0)?;
352 Ok(msg_id)
353 },
354 |rowids| {
355 rowids
356 .collect::<std::result::Result<Vec<_>, _>>()
357 .map_err(Into::into)
358 },
359 )
360 .await?;
361
362 for msg_id in msg_ids {
363 if let Err(err) = download_msg(context, msg_id, session).await {
364 warn!(context, "Failed to download message {msg_id}: {:#}.", err);
365
366 msg_id
373 .update_download_state(context, DownloadState::Failure)
374 .await?;
375 }
376 context
377 .sql
378 .execute("DELETE FROM download WHERE msg_id=?", (msg_id,))
379 .await?;
380 }
381
382 Ok(())
383}
384
385async fn inbox_loop(
386 ctx: Context,
387 started: oneshot::Sender<()>,
388 inbox_handlers: ImapConnectionHandlers,
389) {
390 use futures::future::FutureExt;
391
392 info!(ctx, "Starting inbox loop.");
393 let ImapConnectionHandlers {
394 mut connection,
395 stop_token,
396 } = inbox_handlers;
397
398 let ctx1 = ctx.clone();
399 let fut = async move {
400 let ctx = ctx1;
401 if let Err(()) = started.send(()) {
402 warn!(ctx, "Inbox loop, missing started receiver.");
403 return;
404 };
405
406 let mut old_session: Option<Session> = None;
407 loop {
408 let session = if let Some(session) = old_session.take() {
409 session
410 } else {
411 info!(ctx, "Preparing new IMAP session for inbox.");
412 match connection.prepare(&ctx).await {
413 Err(err) => {
414 warn!(ctx, "Failed to prepare inbox connection: {err:#}.");
415 continue;
416 }
417 Ok(session) => session,
418 }
419 };
420
421 match inbox_fetch_idle(&ctx, &mut connection, session).await {
422 Err(err) => warn!(ctx, "Failed inbox fetch_idle: {err:#}."),
423 Ok(session) => {
424 info!(
425 ctx,
426 "IMAP loop iteration for inbox finished, keeping the session."
427 );
428 old_session = Some(session);
429 }
430 }
431 }
432 };
433
434 stop_token
435 .cancelled()
436 .map(|_| {
437 info!(ctx, "Shutting down inbox loop.");
438 })
439 .race(fut)
440 .await;
441}
442
443pub async fn convert_folder_meaning(
449 ctx: &Context,
450 folder_meaning: FolderMeaning,
451) -> Result<Option<(Config, String)>> {
452 let folder_config = match folder_meaning.to_config() {
453 Some(c) => c,
454 None => {
455 return Ok(None);
458 }
459 };
460
461 let folder = ctx
462 .get_config(folder_config)
463 .await
464 .with_context(|| format!("Failed to retrieve {folder_config} folder"))?;
465
466 if let Some(watch_folder) = folder {
467 Ok(Some((folder_config, watch_folder)))
468 } else {
469 Ok(None)
470 }
471}
472
473async fn inbox_fetch_idle(ctx: &Context, imap: &mut Imap, mut session: Session) -> Result<Session> {
474 if !ctx.get_config_bool(Config::FixIsChatmail).await? {
475 ctx.set_config_internal(
476 Config::IsChatmail,
477 crate::config::from_bool(session.is_chatmail()),
478 )
479 .await?;
480 }
481
482 if ctx.quota_needs_update(60).await {
484 if let Err(err) = ctx.update_recent_quota(&mut session).await {
485 warn!(ctx, "Failed to update quota: {:#}.", err);
486 }
487 }
488
489 let resync_requested = ctx.resync_request.swap(false, Ordering::Relaxed);
490 if resync_requested {
491 if let Err(err) = session.resync_folders(ctx).await {
492 warn!(ctx, "Failed to resync folders: {:#}.", err);
493 ctx.resync_request.store(true, Ordering::Relaxed);
494 }
495 }
496
497 maybe_add_time_based_warnings(ctx).await;
498
499 match ctx.get_config_i64(Config::LastHousekeeping).await {
500 Ok(last_housekeeping_time) => {
501 let next_housekeeping_time =
502 last_housekeeping_time.saturating_add(constants::HOUSEKEEPING_PERIOD);
503 if next_housekeeping_time <= time() {
504 sql::housekeeping(ctx).await.log_err(ctx).ok();
505 }
506 }
507 Err(err) => {
508 warn!(ctx, "Failed to get last housekeeping time: {}", err);
509 }
510 };
511
512 match ctx.get_config_bool(Config::FetchedExistingMsgs).await {
513 Ok(fetched_existing_msgs) => {
514 if !fetched_existing_msgs {
515 if let Err(err) = ctx
520 .set_config_internal(Config::FetchedExistingMsgs, config::from_bool(true))
521 .await
522 {
523 warn!(ctx, "Can't set Config::FetchedExistingMsgs: {:#}", err);
524 }
525
526 if let Err(err) = imap.fetch_existing_msgs(ctx, &mut session).await {
527 warn!(ctx, "Failed to fetch existing messages: {:#}", err);
528 }
529 }
530 }
531 Err(err) => {
532 warn!(ctx, "Can't get Config::FetchedExistingMsgs: {:#}", err);
533 }
534 }
535
536 download_msgs(ctx, &mut session)
537 .await
538 .context("Failed to download messages")?;
539 session
540 .fetch_metadata(ctx)
541 .await
542 .context("Failed to fetch metadata")?;
543 session
544 .register_token(ctx)
545 .await
546 .context("Failed to register push token")?;
547
548 let session = fetch_idle(ctx, imap, session, FolderMeaning::Inbox).await?;
549 Ok(session)
550}
551
552async fn fetch_idle(
558 ctx: &Context,
559 connection: &mut Imap,
560 mut session: Session,
561 folder_meaning: FolderMeaning,
562) -> Result<Session> {
563 let Some((folder_config, watch_folder)) = convert_folder_meaning(ctx, folder_meaning).await?
564 else {
565 connection.connectivity.set_not_configured(ctx).await;
569 connection.idle_interrupt_receiver.recv().await.ok();
570 bail!("Cannot fetch folder {folder_meaning} because it is not configured");
571 };
572
573 if folder_config == Config::ConfiguredInboxFolder {
574 let mvbox;
575 let syncbox = match ctx.should_move_sync_msgs().await? {
576 false => &watch_folder,
577 true => {
578 mvbox = ctx.get_config(Config::ConfiguredMvboxFolder).await?;
579 mvbox.as_deref().unwrap_or(&watch_folder)
580 }
581 };
582 session
583 .send_sync_msgs(ctx, syncbox)
584 .await
585 .context("fetch_idle: send_sync_msgs")
586 .log_err(ctx)
587 .ok();
588
589 session
590 .store_seen_flags_on_imap(ctx)
591 .await
592 .context("store_seen_flags_on_imap")?;
593 }
594
595 if !ctx.should_delete_to_trash().await?
596 || ctx
597 .get_config(Config::ConfiguredTrashFolder)
598 .await?
599 .is_some()
600 {
601 connection
603 .fetch_move_delete(ctx, &mut session, &watch_folder, folder_meaning)
604 .await
605 .context("fetch_move_delete")?;
606
607 delete_expired_imap_messages(ctx)
612 .await
613 .context("delete_expired_imap_messages")?;
614 } else if folder_config == Config::ConfiguredInboxFolder {
615 ctx.last_full_folder_scan.lock().await.take();
616 }
617
618 if folder_config == Config::ConfiguredInboxFolder {
623 match connection
625 .scan_folders(ctx, &mut session)
626 .await
627 .context("scan_folders")
628 {
629 Err(err) => {
630 warn!(ctx, "{:#}", err);
633 }
634 Ok(true) => {
635 connection
642 .fetch_move_delete(ctx, &mut session, &watch_folder, folder_meaning)
643 .await
644 .context("fetch_move_delete after scan_folders")?;
645 }
646 Ok(false) => {}
647 }
648 }
649
650 session
652 .sync_seen_flags(ctx, &watch_folder)
653 .await
654 .context("sync_seen_flags")
655 .log_err(ctx)
656 .ok();
657
658 connection.connectivity.set_idle(ctx).await;
659
660 ctx.emit_event(EventType::ImapInboxIdle);
661
662 if !session.can_idle() {
663 info!(
664 ctx,
665 "IMAP session does not support IDLE, going to fake idle."
666 );
667 connection.fake_idle(ctx, watch_folder).await?;
668 return Ok(session);
669 }
670
671 if ctx
672 .get_config_bool(Config::DisableIdle)
673 .await
674 .context("Failed to get disable_idle config")
675 .log_err(ctx)
676 .unwrap_or_default()
677 {
678 info!(ctx, "IMAP IDLE is disabled, going to fake idle.");
679 connection.fake_idle(ctx, watch_folder).await?;
680 return Ok(session);
681 }
682
683 info!(
684 ctx,
685 "IMAP session in folder {watch_folder:?} supports IDLE, using it."
686 );
687 let session = session
688 .idle(
689 ctx,
690 connection.idle_interrupt_receiver.clone(),
691 &watch_folder,
692 )
693 .await
694 .context("idle")?;
695
696 Ok(session)
697}
698
699async fn simple_imap_loop(
700 ctx: Context,
701 started: oneshot::Sender<()>,
702 inbox_handlers: ImapConnectionHandlers,
703 folder_meaning: FolderMeaning,
704) {
705 use futures::future::FutureExt;
706
707 info!(ctx, "Starting simple loop for {folder_meaning}.");
708 let ImapConnectionHandlers {
709 mut connection,
710 stop_token,
711 } = inbox_handlers;
712
713 let ctx1 = ctx.clone();
714
715 let fut = async move {
716 let ctx = ctx1;
717 if let Err(()) = started.send(()) {
718 warn!(
719 ctx,
720 "Simple imap loop for {folder_meaning}, missing started receiver."
721 );
722 return;
723 }
724
725 let mut old_session: Option<Session> = None;
726 loop {
727 let session = if let Some(session) = old_session.take() {
728 session
729 } else {
730 info!(ctx, "Preparing new IMAP session for {folder_meaning}.");
731 match connection.prepare(&ctx).await {
732 Err(err) => {
733 warn!(
734 ctx,
735 "Failed to prepare {folder_meaning} connection: {err:#}."
736 );
737 continue;
738 }
739 Ok(session) => session,
740 }
741 };
742
743 match fetch_idle(&ctx, &mut connection, session, folder_meaning).await {
744 Err(err) => warn!(ctx, "Failed fetch_idle: {err:#}"),
745 Ok(session) => {
746 info!(
747 ctx,
748 "IMAP loop iteration for {folder_meaning} finished, keeping the session"
749 );
750 old_session = Some(session);
751 }
752 }
753 }
754 };
755
756 stop_token
757 .cancelled()
758 .map(|_| {
759 info!(ctx, "Shutting down IMAP loop for {folder_meaning}.");
760 })
761 .race(fut)
762 .await;
763}
764
765async fn smtp_loop(
766 ctx: Context,
767 started: oneshot::Sender<()>,
768 smtp_handlers: SmtpConnectionHandlers,
769) {
770 use futures::future::FutureExt;
771
772 info!(ctx, "Starting SMTP loop.");
773 let SmtpConnectionHandlers {
774 mut connection,
775 stop_token,
776 idle_interrupt_receiver,
777 } = smtp_handlers;
778
779 let ctx1 = ctx.clone();
780 let fut = async move {
781 let ctx = ctx1;
782 if let Err(()) = started.send(()) {
783 warn!(&ctx, "SMTP loop, missing started receiver.");
784 return;
785 }
786
787 let mut timeout = None;
788 loop {
789 if let Err(err) = send_smtp_messages(&ctx, &mut connection).await {
790 warn!(ctx, "send_smtp_messages failed: {:#}.", err);
791 timeout = Some(timeout.unwrap_or(30));
792 } else {
793 timeout = None;
794 let duration_until_can_send = ctx.ratelimit.read().await.until_can_send();
795 if !duration_until_can_send.is_zero() {
796 info!(
797 ctx,
798 "smtp got rate limited, waiting for {} until can send again",
799 duration_to_str(duration_until_can_send)
800 );
801 tokio::time::sleep(duration_until_can_send).await;
802 continue;
803 }
804 }
805
806 info!(ctx, "SMTP fake idle started.");
808 match &connection.last_send_error {
809 None => connection.connectivity.set_idle(&ctx).await,
810 Some(err) => connection.connectivity.set_err(&ctx, err).await,
811 }
812
813 if let Some(t) = timeout {
818 let now = tools::Time::now();
819 info!(
820 ctx,
821 "SMTP has messages to retry, planning to retry {t} seconds later."
822 );
823 let duration = std::time::Duration::from_secs(t);
824 tokio::time::timeout(duration, async {
825 idle_interrupt_receiver.recv().await.unwrap_or_default()
826 })
827 .await
828 .unwrap_or_default();
829 let slept = time_elapsed(&now).as_secs();
830 timeout = Some(cmp::max(
831 t,
832 slept.saturating_add(rand::thread_rng().gen_range((slept / 2)..=slept)),
833 ));
834 } else {
835 info!(ctx, "SMTP has no messages to retry, waiting for interrupt.");
836 idle_interrupt_receiver.recv().await.unwrap_or_default();
837 };
838
839 info!(ctx, "SMTP fake idle interrupted.")
840 }
841 };
842
843 stop_token
844 .cancelled()
845 .map(|_| {
846 info!(ctx, "Shutting down SMTP loop.");
847 })
848 .race(fut)
849 .await;
850}
851
852impl Scheduler {
853 pub async fn start(ctx: &Context) -> Result<Self> {
855 let (smtp, smtp_handlers) = SmtpConnectionState::new();
856
857 let (smtp_start_send, smtp_start_recv) = oneshot::channel();
858 let (ephemeral_interrupt_send, ephemeral_interrupt_recv) = channel::bounded(1);
859 let (location_interrupt_send, location_interrupt_recv) = channel::bounded(1);
860
861 let mut oboxes = Vec::new();
862 let mut start_recvs = Vec::new();
863
864 let (conn_state, inbox_handlers) = ImapConnectionState::new(ctx).await?;
865 let (inbox_start_send, inbox_start_recv) = oneshot::channel();
866 let handle = {
867 let ctx = ctx.clone();
868 task::spawn(inbox_loop(ctx, inbox_start_send, inbox_handlers))
869 };
870 let inbox = SchedBox {
871 meaning: FolderMeaning::Inbox,
872 conn_state,
873 handle,
874 };
875 start_recvs.push(inbox_start_recv);
876
877 for (meaning, should_watch) in [
878 (FolderMeaning::Mvbox, ctx.should_watch_mvbox().await),
879 (FolderMeaning::Sent, ctx.should_watch_sentbox().await),
880 ] {
881 if should_watch? {
882 let (conn_state, handlers) = ImapConnectionState::new(ctx).await?;
883 let (start_send, start_recv) = oneshot::channel();
884 let ctx = ctx.clone();
885 let handle = task::spawn(simple_imap_loop(ctx, start_send, handlers, meaning));
886 oboxes.push(SchedBox {
887 meaning,
888 conn_state,
889 handle,
890 });
891 start_recvs.push(start_recv);
892 }
893 }
894
895 let smtp_handle = {
896 let ctx = ctx.clone();
897 task::spawn(smtp_loop(ctx, smtp_start_send, smtp_handlers))
898 };
899 start_recvs.push(smtp_start_recv);
900
901 let ephemeral_handle = {
902 let ctx = ctx.clone();
903 task::spawn(async move {
904 ephemeral::ephemeral_loop(&ctx, ephemeral_interrupt_recv).await;
905 })
906 };
907
908 let location_handle = {
909 let ctx = ctx.clone();
910 task::spawn(async move {
911 location::location_loop(&ctx, location_interrupt_recv).await;
912 })
913 };
914
915 let recently_seen_loop = RecentlySeenLoop::new(ctx.clone());
916
917 let res = Self {
918 inbox,
919 oboxes,
920 smtp,
921 smtp_handle,
922 ephemeral_handle,
923 ephemeral_interrupt_send,
924 location_handle,
925 location_interrupt_send,
926 recently_seen_loop,
927 };
928
929 if let Err(err) = try_join_all(start_recvs).await {
931 bail!("failed to start scheduler: {}", err);
932 }
933
934 info!(ctx, "scheduler is running");
935 Ok(res)
936 }
937
938 fn boxes(&self) -> iter::Chain<iter::Once<&SchedBox>, std::slice::Iter<'_, SchedBox>> {
939 once(&self.inbox).chain(self.oboxes.iter())
940 }
941
942 fn maybe_network(&self) {
943 for b in self.boxes() {
944 b.conn_state.interrupt();
945 }
946 self.interrupt_smtp();
947 }
948
949 fn maybe_network_lost(&self) {
950 for b in self.boxes() {
951 b.conn_state.interrupt();
952 }
953 self.interrupt_smtp();
954 }
955
956 fn interrupt_inbox(&self) {
957 self.inbox.conn_state.interrupt();
958 }
959
960 fn interrupt_oboxes(&self) {
961 for b in &self.oboxes {
962 b.conn_state.interrupt();
963 }
964 }
965
966 fn interrupt_smtp(&self) {
967 self.smtp.interrupt();
968 }
969
970 fn interrupt_ephemeral_task(&self) {
971 self.ephemeral_interrupt_send.try_send(()).ok();
972 }
973
974 fn interrupt_location(&self) {
975 self.location_interrupt_send.try_send(()).ok();
976 }
977
978 fn interrupt_recently_seen(&self, contact_id: ContactId, timestamp: i64) {
979 self.recently_seen_loop.try_interrupt(contact_id, timestamp);
980 }
981
982 pub(crate) async fn stop(self, context: &Context) {
987 for b in self.boxes() {
989 b.conn_state.stop();
990 }
991 self.smtp.stop();
992
993 let timeout_duration = std::time::Duration::from_secs(30);
995
996 let tracker = TaskTracker::new();
997 for b in once(self.inbox).chain(self.oboxes) {
998 let context = context.clone();
999 tracker.spawn(async move {
1000 tokio::time::timeout(timeout_duration, b.handle)
1001 .await
1002 .log_err(&context)
1003 });
1004 }
1005 {
1006 let context = context.clone();
1007 tracker.spawn(async move {
1008 tokio::time::timeout(timeout_duration, self.smtp_handle)
1009 .await
1010 .log_err(&context)
1011 });
1012 }
1013 tracker.close();
1014 tracker.wait().await;
1015
1016 self.ephemeral_handle.abort();
1021 self.ephemeral_handle.await.ok();
1022 self.location_handle.abort();
1023 self.location_handle.await.ok();
1024 self.recently_seen_loop.abort().await;
1025 }
1026}
1027
1028#[derive(Debug)]
1030struct ConnectionState {
1031 stop_token: CancellationToken,
1033 idle_interrupt_sender: Sender<()>,
1035 connectivity: ConnectivityStore,
1037}
1038
1039impl ConnectionState {
1040 fn stop(&self) {
1042 self.stop_token.cancel();
1044 }
1045
1046 fn interrupt(&self) {
1047 self.idle_interrupt_sender.try_send(()).ok();
1049 }
1050}
1051
1052#[derive(Debug)]
1053pub(crate) struct SmtpConnectionState {
1054 state: ConnectionState,
1055}
1056
1057impl SmtpConnectionState {
1058 fn new() -> (Self, SmtpConnectionHandlers) {
1059 let stop_token = CancellationToken::new();
1060 let (idle_interrupt_sender, idle_interrupt_receiver) = channel::bounded(1);
1061
1062 let handlers = SmtpConnectionHandlers {
1063 connection: Smtp::new(),
1064 stop_token: stop_token.clone(),
1065 idle_interrupt_receiver,
1066 };
1067
1068 let state = ConnectionState {
1069 stop_token,
1070 idle_interrupt_sender,
1071 connectivity: handlers.connection.connectivity.clone(),
1072 };
1073
1074 let conn = SmtpConnectionState { state };
1075
1076 (conn, handlers)
1077 }
1078
1079 fn interrupt(&self) {
1081 self.state.interrupt();
1082 }
1083
1084 fn stop(&self) {
1086 self.state.stop();
1087 }
1088}
1089
1090struct SmtpConnectionHandlers {
1091 connection: Smtp,
1092 stop_token: CancellationToken,
1093 idle_interrupt_receiver: Receiver<()>,
1094}
1095
1096#[derive(Debug)]
1097pub(crate) struct ImapConnectionState {
1098 state: ConnectionState,
1099}
1100
1101impl ImapConnectionState {
1102 async fn new(context: &Context) -> Result<(Self, ImapConnectionHandlers)> {
1104 let stop_token = CancellationToken::new();
1105 let (idle_interrupt_sender, idle_interrupt_receiver) = channel::bounded(1);
1106
1107 let handlers = ImapConnectionHandlers {
1108 connection: Imap::new_configured(context, idle_interrupt_receiver).await?,
1109 stop_token: stop_token.clone(),
1110 };
1111
1112 let state = ConnectionState {
1113 stop_token,
1114 idle_interrupt_sender,
1115 connectivity: handlers.connection.connectivity.clone(),
1116 };
1117
1118 let conn = ImapConnectionState { state };
1119
1120 Ok((conn, handlers))
1121 }
1122
1123 fn interrupt(&self) {
1125 self.state.interrupt();
1126 }
1127
1128 fn stop(&self) {
1130 self.state.stop();
1131 }
1132}
1133
1134#[derive(Debug)]
1135struct ImapConnectionHandlers {
1136 connection: Imap,
1137 stop_token: CancellationToken,
1138}