1use std::cmp;
2use std::iter::{self, once};
3use std::num::NonZeroUsize;
4use std::sync::atomic::Ordering;
5
6use anyhow::{bail, Context as _, Error, Result};
7use async_channel::{self as channel, Receiver, Sender};
8use futures::future::try_join_all;
9use futures_lite::FutureExt;
10use rand::Rng;
11use tokio::sync::{oneshot, RwLock, RwLockWriteGuard};
12use tokio::task;
13
14use self::connectivity::ConnectivityStore;
15use crate::config::{self, Config};
16use crate::contact::{ContactId, RecentlySeenLoop};
17use crate::context::Context;
18use crate::download::{download_msg, DownloadState};
19use crate::ephemeral::{self, delete_expired_imap_messages};
20use crate::events::EventType;
21use crate::imap::{session::Session, FolderMeaning, Imap};
22use crate::location;
23use crate::log::LogExt;
24use crate::message::MsgId;
25use crate::smtp::{send_smtp_messages, Smtp};
26use crate::sql;
27use crate::tools::{self, duration_to_str, maybe_add_time_based_warnings, time, time_elapsed};
28
29pub(crate) mod connectivity;
30
/// Holder of the IO scheduler's current state.
///
/// All operations go through the async read-write lock around the inner
/// state: queries and interrupts take a read lock, state transitions
/// (start, stop, pause) take a write lock.
#[derive(Debug, Default)]
pub(crate) struct SchedulerState {
    /// Current state of the scheduler, guarded by an async `RwLock`.
    inner: RwLock<InnerSchedulerState>,
}
40
41impl SchedulerState {
42 pub(crate) fn new() -> Self {
43 Default::default()
44 }
45
46 pub(crate) async fn is_running(&self) -> bool {
48 let inner = self.inner.read().await;
49 matches!(*inner, InnerSchedulerState::Started(_))
50 }
51
52 pub(crate) async fn start(&self, context: Context) {
54 let mut inner = self.inner.write().await;
55 match *inner {
56 InnerSchedulerState::Started(_) => (),
57 InnerSchedulerState::Stopped => Self::do_start(inner, context).await,
58 InnerSchedulerState::Paused {
59 ref mut started, ..
60 } => *started = true,
61 }
62 }
63
64 async fn do_start(mut inner: RwLockWriteGuard<'_, InnerSchedulerState>, context: Context) {
66 info!(context, "starting IO");
67
68 context.new_msgs_notify.notify_one();
71
72 let ctx = context.clone();
73 match Scheduler::start(&context).await {
74 Ok(scheduler) => {
75 *inner = InnerSchedulerState::Started(scheduler);
76 context.emit_event(EventType::ConnectivityChanged);
77 }
78 Err(err) => error!(&ctx, "Failed to start IO: {:#}", err),
79 }
80 }
81
82 pub(crate) async fn stop(&self, context: &Context) {
84 let mut inner = self.inner.write().await;
85 match *inner {
86 InnerSchedulerState::Started(_) => {
87 Self::do_stop(inner, context, InnerSchedulerState::Stopped).await
88 }
89 InnerSchedulerState::Stopped => (),
90 InnerSchedulerState::Paused {
91 ref mut started, ..
92 } => *started = false,
93 }
94 }
95
    /// Replaces the current state with `new_state` and shuts down the
    /// previously running scheduler, if there was one.
    ///
    /// Takes ownership of the write guard, so the state lock is held until
    /// the shutdown has finished.
    async fn do_stop(
        mut inner: RwLockWriteGuard<'_, InnerSchedulerState>,
        context: &Context,
        new_state: InnerSchedulerState,
    ) {
        info!(context, "stopping IO");

        // Signal `new_msgs_notify` before shutting anything down.
        // NOTE(review): presumably this wakes the message processing loop — confirm.
        context.new_msgs_notify.notify_one();

        // Take the debug logging state out of the context (leaving `None`
        // behind), abort its loop task and wait until the task has ended.
        let debug_logging = context
            .debug_logging
            .write()
            .expect("RwLock is poisoned")
            .take();
        if let Some(debug_logging) = debug_logging {
            debug_logging.loop_handle.abort();
            debug_logging.loop_handle.await.ok();
        }
        // Install the new state first; the event is emitted before the old
        // scheduler (if any) is actually stopped.
        let prev_state = std::mem::replace(&mut *inner, new_state);
        context.emit_event(EventType::ConnectivityChanged);
        match prev_state {
            InnerSchedulerState::Started(scheduler) => scheduler.stop(context).await,
            InnerSchedulerState::Stopped | InnerSchedulerState::Paused { .. } => (),
        }
    }
129
130 pub(crate) async fn pause(&'_ self, context: Context) -> Result<IoPausedGuard> {
139 {
140 let mut inner = self.inner.write().await;
141 match *inner {
142 InnerSchedulerState::Started(_) => {
143 let new_state = InnerSchedulerState::Paused {
144 started: true,
145 pause_guards_count: NonZeroUsize::new(1).unwrap(),
146 };
147 Self::do_stop(inner, &context, new_state).await;
148 }
149 InnerSchedulerState::Stopped => {
150 *inner = InnerSchedulerState::Paused {
151 started: false,
152 pause_guards_count: NonZeroUsize::new(1).unwrap(),
153 };
154 }
155 InnerSchedulerState::Paused {
156 ref mut pause_guards_count,
157 ..
158 } => {
159 *pause_guards_count = pause_guards_count
160 .checked_add(1)
161 .ok_or_else(|| Error::msg("Too many pause guards active"))?
162 }
163 }
164 }
165
166 let (tx, rx) = oneshot::channel();
167 tokio::spawn(async move {
168 rx.await.ok();
169 let mut inner = context.scheduler.inner.write().await;
170 match *inner {
171 InnerSchedulerState::Started(_) => {
172 warn!(&context, "IoPausedGuard resume: started instead of paused");
173 }
174 InnerSchedulerState::Stopped => {
175 warn!(&context, "IoPausedGuard resume: stopped instead of paused");
176 }
177 InnerSchedulerState::Paused {
178 ref started,
179 ref mut pause_guards_count,
180 } => {
181 if *pause_guards_count == NonZeroUsize::new(1).unwrap() {
182 match *started {
183 true => SchedulerState::do_start(inner, context.clone()).await,
184 false => *inner = InnerSchedulerState::Stopped,
185 }
186 } else {
187 let new_count = pause_guards_count.get() - 1;
188 *pause_guards_count = NonZeroUsize::new(new_count).unwrap();
190 }
191 }
192 }
193 });
194 Ok(IoPausedGuard { sender: Some(tx) })
195 }
196
197 pub(crate) async fn restart(&self, context: &Context) {
199 info!(context, "restarting IO");
200 if self.is_running().await {
201 self.stop(context).await;
202 self.start(context.clone()).await;
203 }
204 }
205
206 pub(crate) async fn maybe_network(&self) {
208 let inner = self.inner.read().await;
209 let (inbox, oboxes) = match *inner {
210 InnerSchedulerState::Started(ref scheduler) => {
211 scheduler.maybe_network();
212 let inbox = scheduler.inbox.conn_state.state.connectivity.clone();
213 let oboxes = scheduler
214 .oboxes
215 .iter()
216 .map(|b| b.conn_state.state.connectivity.clone())
217 .collect::<Vec<_>>();
218 (inbox, oboxes)
219 }
220 _ => return,
221 };
222 drop(inner);
223 connectivity::idle_interrupted(inbox, oboxes).await;
224 }
225
226 pub(crate) async fn maybe_network_lost(&self, context: &Context) {
228 let inner = self.inner.read().await;
229 let stores = match *inner {
230 InnerSchedulerState::Started(ref scheduler) => {
231 scheduler.maybe_network_lost();
232 scheduler
233 .boxes()
234 .map(|b| b.conn_state.state.connectivity.clone())
235 .collect()
236 }
237 _ => return,
238 };
239 drop(inner);
240 connectivity::maybe_network_lost(context, stores).await;
241 }
242
243 pub(crate) async fn interrupt_inbox(&self) {
244 let inner = self.inner.read().await;
245 if let InnerSchedulerState::Started(ref scheduler) = *inner {
246 scheduler.interrupt_inbox();
247 }
248 }
249
250 pub(crate) async fn interrupt_oboxes(&self) {
252 let inner = self.inner.read().await;
253 if let InnerSchedulerState::Started(ref scheduler) = *inner {
254 scheduler.interrupt_oboxes();
255 }
256 }
257
258 pub(crate) async fn interrupt_smtp(&self) {
259 let inner = self.inner.read().await;
260 if let InnerSchedulerState::Started(ref scheduler) = *inner {
261 scheduler.interrupt_smtp();
262 }
263 }
264
265 pub(crate) async fn interrupt_ephemeral_task(&self) {
266 let inner = self.inner.read().await;
267 if let InnerSchedulerState::Started(ref scheduler) = *inner {
268 scheduler.interrupt_ephemeral_task();
269 }
270 }
271
272 pub(crate) async fn interrupt_location(&self) {
273 let inner = self.inner.read().await;
274 if let InnerSchedulerState::Started(ref scheduler) = *inner {
275 scheduler.interrupt_location();
276 }
277 }
278
279 pub(crate) async fn interrupt_recently_seen(&self, contact_id: ContactId, timestamp: i64) {
280 let inner = self.inner.read().await;
281 if let InnerSchedulerState::Started(ref scheduler) = *inner {
282 scheduler.interrupt_recently_seen(contact_id, timestamp);
283 }
284 }
285}
286
/// The states the IO scheduler can be in.
#[derive(Debug, Default)]
enum InnerSchedulerState {
    /// IO is running; holds the running scheduler.
    Started(Scheduler),
    /// IO is not running. This is the default state.
    #[default]
    Stopped,
    /// IO is paused while at least one pause guard is outstanding.
    Paused {
        /// Whether IO should be running once the last pause guard is gone.
        /// Updated by start and stop requests made while paused.
        started: bool,
        /// Number of pause guards that have not been released yet.
        pause_guards_count: NonZeroUsize,
    },
}
297
/// Guard handed out when IO is paused.
///
/// Dropping the guard sends a signal through `sender`, if one is present.
#[derive(Default, Debug)]
pub(crate) struct IoPausedGuard {
    /// Channel used to signal that the guard was dropped.
    /// `None` for a default-constructed guard or after the signal was sent.
    sender: Option<oneshot::Sender<()>>,
}
306
307impl Drop for IoPausedGuard {
308 fn drop(&mut self) {
309 if let Some(sender) = self.sender.take() {
310 sender.send(()).ok();
312 }
313 }
314}
315
/// A watched IMAP folder together with the task serving it.
#[derive(Debug)]
struct SchedBox {
    /// Which kind of folder this box watches.
    meaning: FolderMeaning,
    /// Handle used to interrupt or stop the connection of this box.
    conn_state: ImapConnectionState,

    /// Join handle of the spawned loop task for this box.
    handle: task::JoinHandle<()>,
}
324
/// A running scheduler: the spawned IO tasks plus the handles needed to
/// interrupt and stop them.
#[derive(Debug)]
pub(crate) struct Scheduler {
    /// The inbox box; always present.
    inbox: SchedBox,
    /// Additional watched boxes (e.g. mvbox and sentbox, when enabled).
    oboxes: Vec<SchedBox>,
    /// Handle used to interrupt or stop the SMTP loop.
    smtp: SmtpConnectionState,
    /// Join handle of the SMTP loop task.
    smtp_handle: task::JoinHandle<()>,
    /// Join handle of the ephemeral loop task.
    ephemeral_handle: task::JoinHandle<()>,
    /// Sender used to interrupt the ephemeral loop.
    ephemeral_interrupt_send: Sender<()>,
    /// Join handle of the location loop task.
    location_handle: task::JoinHandle<()>,
    /// Sender used to interrupt the location loop.
    location_interrupt_send: Sender<()>,

    /// The recently-seen loop.
    recently_seen_loop: RecentlySeenLoop,
}
340
341async fn download_msgs(context: &Context, session: &mut Session) -> Result<()> {
342 let msg_ids = context
343 .sql
344 .query_map(
345 "SELECT msg_id FROM download",
346 (),
347 |row| {
348 let msg_id: MsgId = row.get(0)?;
349 Ok(msg_id)
350 },
351 |rowids| {
352 rowids
353 .collect::<std::result::Result<Vec<_>, _>>()
354 .map_err(Into::into)
355 },
356 )
357 .await?;
358
359 for msg_id in msg_ids {
360 if let Err(err) = download_msg(context, msg_id, session).await {
361 warn!(context, "Failed to download message {msg_id}: {:#}.", err);
362
363 msg_id
370 .update_download_state(context, DownloadState::Failure)
371 .await?;
372 }
373 context
374 .sql
375 .execute("DELETE FROM download WHERE msg_id=?", (msg_id,))
376 .await?;
377 }
378
379 Ok(())
380}
381
382async fn inbox_loop(
383 ctx: Context,
384 started: oneshot::Sender<()>,
385 inbox_handlers: ImapConnectionHandlers,
386) {
387 use futures::future::FutureExt;
388
389 info!(ctx, "Starting inbox loop.");
390 let ImapConnectionHandlers {
391 mut connection,
392 stop_receiver,
393 } = inbox_handlers;
394
395 let ctx1 = ctx.clone();
396 let fut = async move {
397 let ctx = ctx1;
398 if let Err(()) = started.send(()) {
399 warn!(ctx, "Inbox loop, missing started receiver.");
400 return;
401 };
402
403 let mut old_session: Option<Session> = None;
404 loop {
405 let session = if let Some(session) = old_session.take() {
406 session
407 } else {
408 info!(ctx, "Preparing new IMAP session for inbox.");
409 match connection.prepare(&ctx).await {
410 Err(err) => {
411 warn!(ctx, "Failed to prepare inbox connection: {err:#}.");
412 continue;
413 }
414 Ok(session) => session,
415 }
416 };
417
418 match inbox_fetch_idle(&ctx, &mut connection, session).await {
419 Err(err) => warn!(ctx, "Failed inbox fetch_idle: {err:#}."),
420 Ok(session) => {
421 info!(
422 ctx,
423 "IMAP loop iteration for inbox finished, keeping the session."
424 );
425 old_session = Some(session);
426 }
427 }
428 }
429 };
430
431 stop_receiver
432 .recv()
433 .map(|_| {
434 info!(ctx, "Shutting down inbox loop.");
435 })
436 .race(fut)
437 .await;
438}
439
440pub async fn convert_folder_meaning(
446 ctx: &Context,
447 folder_meaning: FolderMeaning,
448) -> Result<Option<(Config, String)>> {
449 let folder_config = match folder_meaning.to_config() {
450 Some(c) => c,
451 None => {
452 return Ok(None);
455 }
456 };
457
458 let folder = ctx
459 .get_config(folder_config)
460 .await
461 .with_context(|| format!("Failed to retrieve {folder_config} folder"))?;
462
463 if let Some(watch_folder) = folder {
464 Ok(Some((folder_config, watch_folder)))
465 } else {
466 Ok(None)
467 }
468}
469
/// Performs one iteration of the inbox loop.
///
/// Runs the inbox-only maintenance steps (chatmail flag, quota, folder
/// resync, time based warnings, housekeeping, one-time fetch of existing
/// messages, queued downloads, metadata, push token) and then hands over to
/// [`fetch_idle`] for the inbox folder.
///
/// Returns the session so the caller can reuse it for the next iteration.
///
/// # Errors
///
/// Failures of the best-effort steps are only logged. Errors from reading or
/// writing the chatmail configuration, downloading messages, fetching
/// metadata, registering the token and from `fetch_idle` are returned.
async fn inbox_fetch_idle(ctx: &Context, imap: &mut Imap, mut session: Session) -> Result<Session> {
    // Store whether the server is a chatmail server, unless `FixIsChatmail` is set.
    if !ctx.get_config_bool(Config::FixIsChatmail).await? {
        ctx.set_config_internal(
            Config::IsChatmail,
            crate::config::from_bool(session.is_chatmail()),
        )
        .await?;
    }

    // Refresh the quota if needed; failure is not fatal.
    // NOTE(review): the argument 60 is presumably a rate limit in seconds — confirm.
    if ctx.quota_needs_update(60).await {
        if let Err(err) = ctx.update_recent_quota(&mut session).await {
            warn!(ctx, "Failed to update quota: {:#}.", err);
        }
    }

    // Consume the resync request flag; it is set again if the resync fails
    // so that the resync is retried later.
    let resync_requested = ctx.resync_request.swap(false, Ordering::Relaxed);
    if resync_requested {
        if let Err(err) = session.resync_folders(ctx).await {
            warn!(ctx, "Failed to resync folders: {:#}.", err);
            ctx.resync_request.store(true, Ordering::Relaxed);
        }
    }

    maybe_add_time_based_warnings(ctx).await;

    // Run housekeeping once `60 * 60 * 24` time units have passed since the
    // last run (one day, assuming `time()` returns seconds — TODO confirm).
    match ctx.get_config_i64(Config::LastHousekeeping).await {
        Ok(last_housekeeping_time) => {
            let next_housekeeping_time = last_housekeeping_time.saturating_add(60 * 60 * 24);
            if next_housekeeping_time <= time() {
                sql::housekeeping(ctx).await.log_err(ctx).ok();
            }
        }
        Err(err) => {
            warn!(ctx, "Failed to get last housekeeping time: {}", err);
        }
    };

    match ctx.get_config_bool(Config::FetchedExistingMsgs).await {
        Ok(fetched_existing_msgs) => {
            if !fetched_existing_msgs {
                // The flag is set before fetching, so the fetch is not
                // repeated even if it fails below.
                if let Err(err) = ctx
                    .set_config_internal(Config::FetchedExistingMsgs, config::from_bool(true))
                    .await
                {
                    warn!(ctx, "Can't set Config::FetchedExistingMsgs: {:#}", err);
                }

                if let Err(err) = imap.fetch_existing_msgs(ctx, &mut session).await {
                    warn!(ctx, "Failed to fetch existing messages: {:#}", err);
                }
            }
        }
        Err(err) => {
            warn!(ctx, "Can't get Config::FetchedExistingMsgs: {:#}", err);
        }
    }

    // From here on, failures abort the iteration.
    download_msgs(ctx, &mut session)
        .await
        .context("Failed to download messages")?;
    session
        .fetch_metadata(ctx)
        .await
        .context("Failed to fetch metadata")?;
    session
        .register_token(ctx)
        .await
        .context("Failed to register push token")?;

    let session = fetch_idle(ctx, imap, session, FolderMeaning::Inbox).await?;
    Ok(session)
}
547
/// Fetches the folder identified by `folder_meaning` and then waits (IDLE or
/// fake idle) for an interrupt or new activity.
///
/// Returns the session so the caller can reuse it for the next iteration.
///
/// # Errors
///
/// Returns an error if the folder is not configured (after waiting for an
/// interrupt), or if one of the non-best-effort steps fails. Steps followed
/// by `.log_err(ctx).ok()` only log their failure.
async fn fetch_idle(
    ctx: &Context,
    connection: &mut Imap,
    mut session: Session,
    folder_meaning: FolderMeaning,
) -> Result<Session> {
    let Some((folder_config, watch_folder)) = convert_folder_meaning(ctx, folder_meaning).await?
    else {
        // No folder is configured for this meaning: record that in the
        // connectivity state and wait for an interrupt before failing, so
        // the caller's loop does not spin.
        connection.connectivity.set_not_configured(ctx).await;
        connection.idle_interrupt_receiver.recv().await.ok();
        bail!("Cannot fetch folder {folder_meaning} because it is not configured");
    };

    if folder_config == Config::ConfiguredInboxFolder {
        // Sync messages go to the mvbox if moving is enabled and an mvbox is
        // configured; otherwise they go to the watched folder.
        let mvbox;
        let syncbox = match ctx.should_move_sync_msgs().await? {
            false => &watch_folder,
            true => {
                mvbox = ctx.get_config(Config::ConfiguredMvboxFolder).await?;
                mvbox.as_deref().unwrap_or(&watch_folder)
            }
        };
        // Best effort: a failure here is only logged.
        session
            .send_sync_msgs(ctx, syncbox)
            .await
            .context("fetch_idle: send_sync_msgs")
            .log_err(ctx)
            .ok();

        session
            .store_seen_flags_on_imap(ctx)
            .await
            .context("store_seen_flags_on_imap")?;
    }

    // Fetch unless messages should be deleted to trash while no trash folder
    // is configured yet.
    if !ctx.should_delete_to_trash().await?
        || ctx
            .get_config(Config::ConfiguredTrashFolder)
            .await?
            .is_some()
    {
        // Fetch the watched folder.
        connection
            .fetch_move_delete(ctx, &mut session, &watch_folder, folder_meaning)
            .await
            .context("fetch_move_delete")?;

        // Runs after the fetch, not before it.
        delete_expired_imap_messages(ctx)
            .await
            .context("delete_expired_imap_messages")?;
    } else if folder_config == Config::ConfiguredInboxFolder {
        // Clear the time of the last full folder scan.
        // NOTE(review): presumably forces a full scan below so the trash
        // folder can be discovered — confirm.
        ctx.last_full_folder_scan.lock().await.take();
    }

    // Other folders are scanned only from the inbox loop and only after the
    // watched folder has been handled.
    if folder_config == Config::ConfiguredInboxFolder {
        match connection
            .scan_folders(ctx, &mut session)
            .await
            .context("scan_folders")
        {
            Err(err) => {
                // A scan failure does not abort the iteration.
                warn!(ctx, "{:#}", err);
            }
            Ok(true) => {
                // Fetch the watched folder once more after a scan that
                // returned `true`.
                connection
                    .fetch_move_delete(ctx, &mut session, &watch_folder, folder_meaning)
                    .await
                    .context("fetch_move_delete after scan_folders")?;
            }
            Ok(false) => {}
        }
    }

    // Synchronize Seen flags; best effort.
    session
        .sync_seen_flags(ctx, &watch_folder)
        .await
        .context("sync_seen_flags")
        .log_err(ctx)
        .ok();

    connection.connectivity.set_idle(ctx).await;

    ctx.emit_event(EventType::ImapInboxIdle);

    // Fall back to fake idle if the session cannot IDLE ...
    if !session.can_idle() {
        info!(
            ctx,
            "IMAP session does not support IDLE, going to fake idle."
        );
        connection.fake_idle(ctx, watch_folder).await?;
        return Ok(session);
    }

    // ... or if IDLE is disabled by configuration. A failure to read the
    // setting is logged and treated as "not disabled".
    if ctx
        .get_config_bool(Config::DisableIdle)
        .await
        .context("Failed to get disable_idle config")
        .log_err(ctx)
        .unwrap_or_default()
    {
        info!(ctx, "IMAP IDLE is disabled, going to fake idle.");
        connection.fake_idle(ctx, watch_folder).await?;
        return Ok(session);
    }

    info!(
        ctx,
        "IMAP session in folder {watch_folder:?} supports IDLE, using it."
    );
    // `idle` consumes the session and gives it back on success.
    let session = session
        .idle(
            ctx,
            connection.idle_interrupt_receiver.clone(),
            &watch_folder,
        )
        .await
        .context("idle")?;

    Ok(session)
}
694
695async fn simple_imap_loop(
696 ctx: Context,
697 started: oneshot::Sender<()>,
698 inbox_handlers: ImapConnectionHandlers,
699 folder_meaning: FolderMeaning,
700) {
701 use futures::future::FutureExt;
702
703 info!(ctx, "Starting simple loop for {folder_meaning}.");
704 let ImapConnectionHandlers {
705 mut connection,
706 stop_receiver,
707 } = inbox_handlers;
708
709 let ctx1 = ctx.clone();
710
711 let fut = async move {
712 let ctx = ctx1;
713 if let Err(()) = started.send(()) {
714 warn!(
715 ctx,
716 "Simple imap loop for {folder_meaning}, missing started receiver."
717 );
718 return;
719 }
720
721 let mut old_session: Option<Session> = None;
722 loop {
723 let session = if let Some(session) = old_session.take() {
724 session
725 } else {
726 info!(ctx, "Preparing new IMAP session for {folder_meaning}.");
727 match connection.prepare(&ctx).await {
728 Err(err) => {
729 warn!(
730 ctx,
731 "Failed to prepare {folder_meaning} connection: {err:#}."
732 );
733 continue;
734 }
735 Ok(session) => session,
736 }
737 };
738
739 match fetch_idle(&ctx, &mut connection, session, folder_meaning).await {
740 Err(err) => warn!(ctx, "Failed fetch_idle: {err:#}"),
741 Ok(session) => {
742 info!(
743 ctx,
744 "IMAP loop iteration for {folder_meaning} finished, keeping the session"
745 );
746 old_session = Some(session);
747 }
748 }
749 }
750 };
751
752 stop_receiver
753 .recv()
754 .map(|_| {
755 info!(ctx, "Shutting down IMAP loop for {folder_meaning}.");
756 })
757 .race(fut)
758 .await;
759}
760
/// Runs the SMTP loop until a stop signal arrives.
///
/// `started` is signalled once the loop begins. Each iteration tries to send
/// messages and then "fake idles": it waits for an interrupt, or, if sending
/// failed, for an interrupt or a retry timeout, whichever comes first.
async fn smtp_loop(
    ctx: Context,
    started: oneshot::Sender<()>,
    smtp_handlers: SmtpConnectionHandlers,
) {
    use futures::future::FutureExt;

    info!(ctx, "Starting SMTP loop.");
    let SmtpConnectionHandlers {
        mut connection,
        stop_receiver,
        idle_interrupt_receiver,
    } = smtp_handlers;

    let ctx1 = ctx.clone();
    let fut = async move {
        let ctx = ctx1;
        if let Err(()) = started.send(()) {
            warn!(&ctx, "SMTP loop, missing started receiver.");
            return;
        }

        // Retry timeout in seconds; `None` while there is nothing to retry.
        let mut timeout = None;
        loop {
            if let Err(err) = send_smtp_messages(&ctx, &mut connection).await {
                warn!(ctx, "send_smtp_messages failed: {:#}.", err);
                // Keep an existing (possibly increased) timeout, otherwise
                // start with 30 seconds.
                timeout = Some(timeout.unwrap_or(30));
            } else {
                timeout = None;
                // If the rate limiter asks to wait, sleep and go straight
                // back to sending without idling.
                let duration_until_can_send = ctx.ratelimit.read().await.until_can_send();
                if !duration_until_can_send.is_zero() {
                    info!(
                        ctx,
                        "smtp got rate limited, waiting for {} until can send again",
                        duration_to_str(duration_until_can_send)
                    );
                    tokio::time::sleep(duration_until_can_send).await;
                    continue;
                }
            }

            // Fake idle: publish the connectivity state, then wait.
            info!(ctx, "SMTP fake idle started.");
            match &connection.last_send_error {
                None => connection.connectivity.set_idle(&ctx).await,
                Some(err) => connection.connectivity.set_err(&ctx, err).await,
            }

            if let Some(t) = timeout {
                let now = tools::Time::now();
                info!(
                    ctx,
                    "SMTP has messages to retry, planning to retry {t} seconds later."
                );
                // Wait for an interrupt, but at most `t` seconds.
                let duration = std::time::Duration::from_secs(t);
                tokio::time::timeout(duration, async {
                    idle_interrupt_receiver.recv().await.unwrap_or_default()
                })
                .await
                .unwrap_or_default();
                // Next timeout: never below the current one, otherwise the
                // time actually waited plus a random 50%..=100% of it.
                let slept = time_elapsed(&now).as_secs();
                timeout = Some(cmp::max(
                    t,
                    slept.saturating_add(rand::thread_rng().gen_range((slept / 2)..=slept)),
                ));
            } else {
                info!(ctx, "SMTP has no messages to retry, waiting for interrupt.");
                idle_interrupt_receiver.recv().await.unwrap_or_default();
            };

            info!(ctx, "SMTP fake idle interrupted.")
        }
    };

    // Whichever finishes first wins: the stop signal or the loop itself.
    stop_receiver
        .recv()
        .map(|_| {
            info!(ctx, "Shutting down SMTP loop.");
        })
        .race(fut)
        .await;
}
847
848impl Scheduler {
    /// Spawns all IO tasks and returns the running scheduler.
    ///
    /// Spawns the inbox loop, one loop per additionally watched folder
    /// (mvbox, sentbox), the SMTP loop, the ephemeral loop and the location
    /// loop, creates the recently-seen loop, and then waits until the IMAP
    /// and SMTP loops have signalled that they started.
    ///
    /// # Errors
    ///
    /// Returns an error if an IMAP connection state cannot be created, if a
    /// "should watch" setting cannot be read, or if one of the loops drops
    /// its start signal without sending it.
    ///
    /// NOTE(review): on these error paths the tasks spawned so far are not
    /// stopped by this function — confirm whether that is intended.
    pub async fn start(ctx: &Context) -> Result<Self> {
        let (smtp, smtp_handlers) = SmtpConnectionState::new();

        let (smtp_start_send, smtp_start_recv) = oneshot::channel();
        let (ephemeral_interrupt_send, ephemeral_interrupt_recv) = channel::bounded(1);
        let (location_interrupt_send, location_interrupt_recv) = channel::bounded(1);

        let mut oboxes = Vec::new();
        // Receivers of the "loop has started" signals, awaited at the end.
        let mut start_recvs = Vec::new();

        // The inbox is always watched.
        let (conn_state, inbox_handlers) = ImapConnectionState::new(ctx).await?;
        let (inbox_start_send, inbox_start_recv) = oneshot::channel();
        let handle = {
            let ctx = ctx.clone();
            task::spawn(inbox_loop(ctx, inbox_start_send, inbox_handlers))
        };
        let inbox = SchedBox {
            meaning: FolderMeaning::Inbox,
            conn_state,
            handle,
        };
        start_recvs.push(inbox_start_recv);

        // Mvbox and sentbox are watched only if enabled.
        for (meaning, should_watch) in [
            (FolderMeaning::Mvbox, ctx.should_watch_mvbox().await),
            (FolderMeaning::Sent, ctx.should_watch_sentbox().await),
        ] {
            if should_watch? {
                let (conn_state, handlers) = ImapConnectionState::new(ctx).await?;
                let (start_send, start_recv) = oneshot::channel();
                let ctx = ctx.clone();
                let handle = task::spawn(simple_imap_loop(ctx, start_send, handlers, meaning));
                oboxes.push(SchedBox {
                    meaning,
                    conn_state,
                    handle,
                });
                start_recvs.push(start_recv);
            }
        }

        let smtp_handle = {
            let ctx = ctx.clone();
            task::spawn(smtp_loop(ctx, smtp_start_send, smtp_handlers))
        };
        start_recvs.push(smtp_start_recv);

        // The ephemeral and location loops have no start signal.
        let ephemeral_handle = {
            let ctx = ctx.clone();
            task::spawn(async move {
                ephemeral::ephemeral_loop(&ctx, ephemeral_interrupt_recv).await;
            })
        };

        let location_handle = {
            let ctx = ctx.clone();
            task::spawn(async move {
                location::location_loop(&ctx, location_interrupt_recv).await;
            })
        };

        let recently_seen_loop = RecentlySeenLoop::new(ctx.clone());

        let res = Self {
            inbox,
            oboxes,
            smtp,
            smtp_handle,
            ephemeral_handle,
            ephemeral_interrupt_send,
            location_handle,
            location_interrupt_send,
            recently_seen_loop,
        };

        // Wait until every IMAP loop and the SMTP loop reported its start.
        if let Err(err) = try_join_all(start_recvs).await {
            bail!("failed to start scheduler: {}", err);
        }

        info!(ctx, "scheduler is running");
        Ok(res)
    }
933
934 fn boxes(&self) -> iter::Chain<iter::Once<&SchedBox>, std::slice::Iter<'_, SchedBox>> {
935 once(&self.inbox).chain(self.oboxes.iter())
936 }
937
938 fn maybe_network(&self) {
939 for b in self.boxes() {
940 b.conn_state.interrupt();
941 }
942 self.interrupt_smtp();
943 }
944
945 fn maybe_network_lost(&self) {
946 for b in self.boxes() {
947 b.conn_state.interrupt();
948 }
949 self.interrupt_smtp();
950 }
951
952 fn interrupt_inbox(&self) {
953 self.inbox.conn_state.interrupt();
954 }
955
956 fn interrupt_oboxes(&self) {
957 for b in &self.oboxes {
958 b.conn_state.interrupt();
959 }
960 }
961
962 fn interrupt_smtp(&self) {
963 self.smtp.interrupt();
964 }
965
966 fn interrupt_ephemeral_task(&self) {
967 self.ephemeral_interrupt_send.try_send(()).ok();
968 }
969
970 fn interrupt_location(&self) {
971 self.location_interrupt_send.try_send(()).ok();
972 }
973
974 fn interrupt_recently_seen(&self, contact_id: ContactId, timestamp: i64) {
975 self.recently_seen_loop.try_interrupt(contact_id, timestamp);
976 }
977
    /// Shuts down all tasks of the scheduler, consuming it.
    ///
    /// IMAP and SMTP loops are asked to stop and then awaited with a timeout;
    /// the ephemeral, location and recently-seen loops are aborted. Errors
    /// along the way are logged and otherwise ignored.
    pub(crate) async fn stop(self, context: &Context) {
        // First send the stop signal to every IMAP loop and the SMTP loop.
        for b in self.boxes() {
            b.conn_state.stop().await.log_err(context).ok();
        }
        self.smtp.stop().await.log_err(context).ok();

        // Then wait for each of these tasks to finish, at most 30 seconds each.
        let timeout_duration = std::time::Duration::from_secs(30);
        for b in once(self.inbox).chain(self.oboxes) {
            tokio::time::timeout(timeout_duration, b.handle)
                .await
                .log_err(context)
                .ok();
        }
        tokio::time::timeout(timeout_duration, self.smtp_handle)
            .await
            .log_err(context)
            .ok();

        // The remaining tasks get no stop signal; abort them and wait until
        // they are gone.
        self.ephemeral_handle.abort();
        self.ephemeral_handle.await.ok();
        self.location_handle.abort();
        self.location_handle.await.ok();
        self.recently_seen_loop.abort().await;
    }
1012}
1013
/// Scheduler-side handle to a connection loop.
#[derive(Debug)]
struct ConnectionState {
    /// Sender for the stop signal of the loop.
    stop_sender: Sender<()>,
    /// Sender for interrupting the loop's idle wait.
    idle_interrupt_sender: Sender<()>,
    /// Connectivity store cloned from the connection this state belongs to.
    connectivity: ConnectivityStore,
}
1024
1025impl ConnectionState {
1026 async fn stop(&self) -> Result<()> {
1028 self.stop_sender
1030 .send(())
1031 .await
1032 .context("failed to stop, missing receiver")?;
1033 Ok(())
1034 }
1035
1036 fn interrupt(&self) {
1037 self.idle_interrupt_sender.try_send(()).ok();
1039 }
1040}
1041
/// Scheduler-side handle to the SMTP loop.
#[derive(Debug)]
pub(crate) struct SmtpConnectionState {
    /// Shared stop/interrupt/connectivity handle.
    state: ConnectionState,
}
1046
1047impl SmtpConnectionState {
1048 fn new() -> (Self, SmtpConnectionHandlers) {
1049 let (stop_sender, stop_receiver) = channel::bounded(1);
1050 let (idle_interrupt_sender, idle_interrupt_receiver) = channel::bounded(1);
1051
1052 let handlers = SmtpConnectionHandlers {
1053 connection: Smtp::new(),
1054 stop_receiver,
1055 idle_interrupt_receiver,
1056 };
1057
1058 let state = ConnectionState {
1059 stop_sender,
1060 idle_interrupt_sender,
1061 connectivity: handlers.connection.connectivity.clone(),
1062 };
1063
1064 let conn = SmtpConnectionState { state };
1065
1066 (conn, handlers)
1067 }
1068
1069 fn interrupt(&self) {
1071 self.state.interrupt();
1072 }
1073
1074 async fn stop(&self) -> Result<()> {
1076 self.state.stop().await?;
1077 Ok(())
1078 }
1079}
1080
/// Loop-side counterpart of [`SmtpConnectionState`], moved into the SMTP loop.
struct SmtpConnectionHandlers {
    /// The SMTP connection used by the loop.
    connection: Smtp,
    /// Receives the stop signal.
    stop_receiver: Receiver<()>,
    /// Receives idle interrupts.
    idle_interrupt_receiver: Receiver<()>,
}
1086
/// Scheduler-side handle to an IMAP loop.
#[derive(Debug)]
pub(crate) struct ImapConnectionState {
    /// Shared stop/interrupt/connectivity handle.
    state: ConnectionState,
}
1091
1092impl ImapConnectionState {
1093 async fn new(context: &Context) -> Result<(Self, ImapConnectionHandlers)> {
1095 let (stop_sender, stop_receiver) = channel::bounded(1);
1096 let (idle_interrupt_sender, idle_interrupt_receiver) = channel::bounded(1);
1097
1098 let handlers = ImapConnectionHandlers {
1099 connection: Imap::new_configured(context, idle_interrupt_receiver).await?,
1100 stop_receiver,
1101 };
1102
1103 let state = ConnectionState {
1104 stop_sender,
1105 idle_interrupt_sender,
1106 connectivity: handlers.connection.connectivity.clone(),
1107 };
1108
1109 let conn = ImapConnectionState { state };
1110
1111 Ok((conn, handlers))
1112 }
1113
1114 fn interrupt(&self) {
1116 self.state.interrupt();
1117 }
1118
1119 async fn stop(&self) -> Result<()> {
1121 self.state.stop().await?;
1122 Ok(())
1123 }
1124}
1125
/// Loop-side counterpart of [`ImapConnectionState`], moved into an IMAP loop.
#[derive(Debug)]
struct ImapConnectionHandlers {
    /// The IMAP connection used by the loop.
    connection: Imap,
    /// Receives the stop signal.
    stop_receiver: Receiver<()>,
}