1use std::cmp;
2use std::iter::{self, once};
3use std::num::NonZeroUsize;
4use std::sync::atomic::Ordering;
5
6use anyhow::{bail, Context as _, Error, Result};
7use async_channel::{self as channel, Receiver, Sender};
8use futures::future::try_join_all;
9use futures_lite::FutureExt;
10use rand::Rng;
11use tokio::sync::{oneshot, RwLock, RwLockWriteGuard};
12use tokio::task;
13
14use self::connectivity::ConnectivityStore;
15use crate::config::{self, Config};
16use crate::contact::{ContactId, RecentlySeenLoop};
17use crate::context::Context;
18use crate::download::{download_msg, DownloadState};
19use crate::ephemeral::{self, delete_expired_imap_messages};
20use crate::events::EventType;
21use crate::imap::{session::Session, FolderMeaning, Imap};
22use crate::location;
23use crate::log::LogExt;
24use crate::message::MsgId;
25use crate::smtp::{send_smtp_messages, Smtp};
26use crate::sql;
27use crate::tools::{self, duration_to_str, maybe_add_time_based_warnings, time, time_elapsed};
28
29pub(crate) mod connectivity;
30
31#[derive(Debug, Default)]
37pub(crate) struct SchedulerState {
38 inner: RwLock<InnerSchedulerState>,
39}
40
41impl SchedulerState {
42 pub(crate) fn new() -> Self {
43 Default::default()
44 }
45
46 pub(crate) async fn is_running(&self) -> bool {
48 let inner = self.inner.read().await;
49 matches!(*inner, InnerSchedulerState::Started(_))
50 }
51
52 pub(crate) async fn start(&self, context: Context) {
54 let mut inner = self.inner.write().await;
55 match *inner {
56 InnerSchedulerState::Started(_) => (),
57 InnerSchedulerState::Stopped => Self::do_start(inner, context).await,
58 InnerSchedulerState::Paused {
59 ref mut started, ..
60 } => *started = true,
61 }
62 }
63
64 async fn do_start(mut inner: RwLockWriteGuard<'_, InnerSchedulerState>, context: Context) {
66 info!(context, "starting IO");
67
68 context.new_msgs_notify.notify_one();
71
72 let ctx = context.clone();
73 match Scheduler::start(&context).await {
74 Ok(scheduler) => {
75 *inner = InnerSchedulerState::Started(scheduler);
76 context.emit_event(EventType::ConnectivityChanged);
77 }
78 Err(err) => error!(&ctx, "Failed to start IO: {:#}", err),
79 }
80 }
81
82 pub(crate) async fn stop(&self, context: &Context) {
84 let mut inner = self.inner.write().await;
85 match *inner {
86 InnerSchedulerState::Started(_) => {
87 Self::do_stop(inner, context, InnerSchedulerState::Stopped).await
88 }
89 InnerSchedulerState::Stopped => (),
90 InnerSchedulerState::Paused {
91 ref mut started, ..
92 } => *started = false,
93 }
94 }
95
96 async fn do_stop(
98 mut inner: RwLockWriteGuard<'_, InnerSchedulerState>,
99 context: &Context,
100 new_state: InnerSchedulerState,
101 ) {
102 info!(context, "stopping IO");
108
109 context.new_msgs_notify.notify_one();
112
113 let debug_logging = context
114 .debug_logging
115 .write()
116 .expect("RwLock is poisoned")
117 .take();
118 if let Some(debug_logging) = debug_logging {
119 debug_logging.loop_handle.abort();
120 debug_logging.loop_handle.await.ok();
121 }
122 let prev_state = std::mem::replace(&mut *inner, new_state);
123 context.emit_event(EventType::ConnectivityChanged);
124 match prev_state {
125 InnerSchedulerState::Started(scheduler) => scheduler.stop(context).await,
126 InnerSchedulerState::Stopped | InnerSchedulerState::Paused { .. } => (),
127 }
128 }
129
130 pub(crate) async fn pause(&'_ self, context: Context) -> Result<IoPausedGuard> {
139 {
140 let mut inner = self.inner.write().await;
141 match *inner {
142 InnerSchedulerState::Started(_) => {
143 let new_state = InnerSchedulerState::Paused {
144 started: true,
145 pause_guards_count: NonZeroUsize::new(1).unwrap(),
146 };
147 Self::do_stop(inner, &context, new_state).await;
148 }
149 InnerSchedulerState::Stopped => {
150 *inner = InnerSchedulerState::Paused {
151 started: false,
152 pause_guards_count: NonZeroUsize::new(1).unwrap(),
153 };
154 }
155 InnerSchedulerState::Paused {
156 ref mut pause_guards_count,
157 ..
158 } => {
159 *pause_guards_count = pause_guards_count
160 .checked_add(1)
161 .ok_or_else(|| Error::msg("Too many pause guards active"))?
162 }
163 }
164 }
165
166 let (tx, rx) = oneshot::channel();
167 tokio::spawn(async move {
168 rx.await.ok();
169 let mut inner = context.scheduler.inner.write().await;
170 match *inner {
171 InnerSchedulerState::Started(_) => {
172 warn!(&context, "IoPausedGuard resume: started instead of paused");
173 }
174 InnerSchedulerState::Stopped => {
175 warn!(&context, "IoPausedGuard resume: stopped instead of paused");
176 }
177 InnerSchedulerState::Paused {
178 ref started,
179 ref mut pause_guards_count,
180 } => {
181 if *pause_guards_count == NonZeroUsize::new(1).unwrap() {
182 match *started {
183 true => SchedulerState::do_start(inner, context.clone()).await,
184 false => *inner = InnerSchedulerState::Stopped,
185 }
186 } else {
187 let new_count = pause_guards_count.get() - 1;
188 *pause_guards_count = NonZeroUsize::new(new_count).unwrap();
190 }
191 }
192 }
193 });
194 Ok(IoPausedGuard { sender: Some(tx) })
195 }
196
197 pub(crate) async fn restart(&self, context: &Context) {
199 info!(context, "restarting IO");
200 if self.is_running().await {
201 self.stop(context).await;
202 self.start(context.clone()).await;
203 }
204 }
205
206 pub(crate) async fn maybe_network(&self) {
208 let inner = self.inner.read().await;
209 let (inbox, oboxes) = match *inner {
210 InnerSchedulerState::Started(ref scheduler) => {
211 scheduler.maybe_network();
212 let inbox = scheduler.inbox.conn_state.state.connectivity.clone();
213 let oboxes = scheduler
214 .oboxes
215 .iter()
216 .map(|b| b.conn_state.state.connectivity.clone())
217 .collect::<Vec<_>>();
218 (inbox, oboxes)
219 }
220 _ => return,
221 };
222 drop(inner);
223 connectivity::idle_interrupted(inbox, oboxes).await;
224 }
225
226 pub(crate) async fn maybe_network_lost(&self, context: &Context) {
228 let inner = self.inner.read().await;
229 let stores = match *inner {
230 InnerSchedulerState::Started(ref scheduler) => {
231 scheduler.maybe_network_lost();
232 scheduler
233 .boxes()
234 .map(|b| b.conn_state.state.connectivity.clone())
235 .collect()
236 }
237 _ => return,
238 };
239 drop(inner);
240 connectivity::maybe_network_lost(context, stores).await;
241 }
242
243 pub(crate) async fn interrupt_inbox(&self) {
244 let inner = self.inner.read().await;
245 if let InnerSchedulerState::Started(ref scheduler) = *inner {
246 scheduler.interrupt_inbox();
247 }
248 }
249
250 pub(crate) async fn interrupt_oboxes(&self) {
252 let inner = self.inner.read().await;
253 if let InnerSchedulerState::Started(ref scheduler) = *inner {
254 scheduler.interrupt_oboxes();
255 }
256 }
257
258 pub(crate) async fn interrupt_smtp(&self) {
259 let inner = self.inner.read().await;
260 if let InnerSchedulerState::Started(ref scheduler) = *inner {
261 scheduler.interrupt_smtp();
262 }
263 }
264
265 pub(crate) async fn interrupt_ephemeral_task(&self) {
266 let inner = self.inner.read().await;
267 if let InnerSchedulerState::Started(ref scheduler) = *inner {
268 scheduler.interrupt_ephemeral_task();
269 }
270 }
271
272 pub(crate) async fn interrupt_location(&self) {
273 let inner = self.inner.read().await;
274 if let InnerSchedulerState::Started(ref scheduler) = *inner {
275 scheduler.interrupt_location();
276 }
277 }
278
279 pub(crate) async fn interrupt_recently_seen(&self, contact_id: ContactId, timestamp: i64) {
280 let inner = self.inner.read().await;
281 if let InnerSchedulerState::Started(ref scheduler) = *inner {
282 scheduler.interrupt_recently_seen(contact_id, timestamp);
283 }
284 }
285}
286
287#[derive(Debug, Default)]
288enum InnerSchedulerState {
289 Started(Scheduler),
290 #[default]
291 Stopped,
292 Paused {
293 started: bool,
294 pause_guards_count: NonZeroUsize,
295 },
296}
297
298#[derive(Default, Debug)]
303pub(crate) struct IoPausedGuard {
304 sender: Option<oneshot::Sender<()>>,
305}
306
307impl Drop for IoPausedGuard {
308 fn drop(&mut self) {
309 if let Some(sender) = self.sender.take() {
310 sender.send(()).ok();
312 }
313 }
314}
315
316#[derive(Debug)]
317struct SchedBox {
318 meaning: FolderMeaning,
319 conn_state: ImapConnectionState,
320
321 handle: task::JoinHandle<()>,
323}
324
325#[derive(Debug)]
327pub(crate) struct Scheduler {
328 inbox: SchedBox,
329 oboxes: Vec<SchedBox>,
331 smtp: SmtpConnectionState,
332 smtp_handle: task::JoinHandle<()>,
333 ephemeral_handle: task::JoinHandle<()>,
334 ephemeral_interrupt_send: Sender<()>,
335 location_handle: task::JoinHandle<()>,
336 location_interrupt_send: Sender<()>,
337
338 recently_seen_loop: RecentlySeenLoop,
339}
340
341async fn download_msgs(context: &Context, session: &mut Session) -> Result<()> {
342 let msg_ids = context
343 .sql
344 .query_map(
345 "SELECT msg_id FROM download",
346 (),
347 |row| {
348 let msg_id: MsgId = row.get(0)?;
349 Ok(msg_id)
350 },
351 |rowids| {
352 rowids
353 .collect::<std::result::Result<Vec<_>, _>>()
354 .map_err(Into::into)
355 },
356 )
357 .await?;
358
359 for msg_id in msg_ids {
360 if let Err(err) = download_msg(context, msg_id, session).await {
361 warn!(context, "Failed to download message {msg_id}: {:#}.", err);
362
363 msg_id
370 .update_download_state(context, DownloadState::Failure)
371 .await?;
372 }
373 context
374 .sql
375 .execute("DELETE FROM download WHERE msg_id=?", (msg_id,))
376 .await?;
377 }
378
379 Ok(())
380}
381
382async fn inbox_loop(
383 ctx: Context,
384 started: oneshot::Sender<()>,
385 inbox_handlers: ImapConnectionHandlers,
386) {
387 use futures::future::FutureExt;
388
389 info!(ctx, "starting inbox loop");
390 let ImapConnectionHandlers {
391 mut connection,
392 stop_receiver,
393 } = inbox_handlers;
394
395 let ctx1 = ctx.clone();
396 let fut = async move {
397 let ctx = ctx1;
398 if let Err(()) = started.send(()) {
399 warn!(ctx, "inbox loop, missing started receiver");
400 return;
401 };
402
403 let mut old_session: Option<Session> = None;
404 loop {
405 let session = if let Some(session) = old_session.take() {
406 session
407 } else {
408 match connection.prepare(&ctx).await {
409 Err(err) => {
410 warn!(ctx, "Failed to prepare INBOX connection: {:#}.", err);
411 continue;
412 }
413 Ok(session) => session,
414 }
415 };
416
417 match inbox_fetch_idle(&ctx, &mut connection, session).await {
418 Err(err) => warn!(ctx, "Failed fetch_idle: {err:#}"),
419 Ok(session) => {
420 old_session = Some(session);
421 }
422 }
423 }
424 };
425
426 stop_receiver
427 .recv()
428 .map(|_| {
429 info!(ctx, "shutting down inbox loop");
430 })
431 .race(fut)
432 .await;
433}
434
435pub async fn convert_folder_meaning(
441 ctx: &Context,
442 folder_meaning: FolderMeaning,
443) -> Result<Option<(Config, String)>> {
444 let folder_config = match folder_meaning.to_config() {
445 Some(c) => c,
446 None => {
447 return Ok(None);
450 }
451 };
452
453 let folder = ctx
454 .get_config(folder_config)
455 .await
456 .with_context(|| format!("Failed to retrieve {folder_config} folder"))?;
457
458 if let Some(watch_folder) = folder {
459 Ok(Some((folder_config, watch_folder)))
460 } else {
461 Ok(None)
462 }
463}
464
465async fn inbox_fetch_idle(ctx: &Context, imap: &mut Imap, mut session: Session) -> Result<Session> {
466 if !ctx.get_config_bool(Config::FixIsChatmail).await? {
467 ctx.set_config_internal(
468 Config::IsChatmail,
469 crate::config::from_bool(session.is_chatmail()),
470 )
471 .await?;
472 }
473
474 if ctx.quota_needs_update(60).await {
476 if let Err(err) = ctx.update_recent_quota(&mut session).await {
477 warn!(ctx, "Failed to update quota: {:#}.", err);
478 }
479 }
480
481 let resync_requested = ctx.resync_request.swap(false, Ordering::Relaxed);
482 if resync_requested {
483 if let Err(err) = session.resync_folders(ctx).await {
484 warn!(ctx, "Failed to resync folders: {:#}.", err);
485 ctx.resync_request.store(true, Ordering::Relaxed);
486 }
487 }
488
489 maybe_add_time_based_warnings(ctx).await;
490
491 match ctx.get_config_i64(Config::LastHousekeeping).await {
492 Ok(last_housekeeping_time) => {
493 let next_housekeeping_time = last_housekeeping_time.saturating_add(60 * 60 * 24);
494 if next_housekeeping_time <= time() {
495 sql::housekeeping(ctx).await.log_err(ctx).ok();
496 }
497 }
498 Err(err) => {
499 warn!(ctx, "Failed to get last housekeeping time: {}", err);
500 }
501 };
502
503 match ctx.get_config_bool(Config::FetchedExistingMsgs).await {
504 Ok(fetched_existing_msgs) => {
505 if !fetched_existing_msgs {
506 if let Err(err) = ctx
511 .set_config_internal(Config::FetchedExistingMsgs, config::from_bool(true))
512 .await
513 {
514 warn!(ctx, "Can't set Config::FetchedExistingMsgs: {:#}", err);
515 }
516
517 if let Err(err) = imap.fetch_existing_msgs(ctx, &mut session).await {
518 warn!(ctx, "Failed to fetch existing messages: {:#}", err);
519 }
520 }
521 }
522 Err(err) => {
523 warn!(ctx, "Can't get Config::FetchedExistingMsgs: {:#}", err);
524 }
525 }
526
527 download_msgs(ctx, &mut session)
528 .await
529 .context("Failed to download messages")?;
530 session
531 .fetch_metadata(ctx)
532 .await
533 .context("Failed to fetch metadata")?;
534 session
535 .register_token(ctx)
536 .await
537 .context("Failed to register push token")?;
538
539 let session = fetch_idle(ctx, imap, session, FolderMeaning::Inbox).await?;
540 Ok(session)
541}
542
543async fn fetch_idle(
549 ctx: &Context,
550 connection: &mut Imap,
551 mut session: Session,
552 folder_meaning: FolderMeaning,
553) -> Result<Session> {
554 let Some((folder_config, watch_folder)) = convert_folder_meaning(ctx, folder_meaning).await?
555 else {
556 connection.connectivity.set_not_configured(ctx).await;
560 connection.idle_interrupt_receiver.recv().await.ok();
561 bail!("Cannot fetch folder {folder_meaning} because it is not configured");
562 };
563
564 if folder_config == Config::ConfiguredInboxFolder {
565 let mvbox;
566 let syncbox = match ctx.should_move_sync_msgs().await? {
567 false => &watch_folder,
568 true => {
569 mvbox = ctx.get_config(Config::ConfiguredMvboxFolder).await?;
570 mvbox.as_deref().unwrap_or(&watch_folder)
571 }
572 };
573 session
574 .send_sync_msgs(ctx, syncbox)
575 .await
576 .context("fetch_idle: send_sync_msgs")
577 .log_err(ctx)
578 .ok();
579
580 session
581 .store_seen_flags_on_imap(ctx)
582 .await
583 .context("store_seen_flags_on_imap")?;
584 }
585
586 if !ctx.should_delete_to_trash().await?
587 || ctx
588 .get_config(Config::ConfiguredTrashFolder)
589 .await?
590 .is_some()
591 {
592 connection
594 .fetch_move_delete(ctx, &mut session, &watch_folder, folder_meaning)
595 .await
596 .context("fetch_move_delete")?;
597
598 delete_expired_imap_messages(ctx)
603 .await
604 .context("delete_expired_imap_messages")?;
605 } else if folder_config == Config::ConfiguredInboxFolder {
606 ctx.last_full_folder_scan.lock().await.take();
607 }
608
609 if folder_config == Config::ConfiguredInboxFolder {
614 match connection
616 .scan_folders(ctx, &mut session)
617 .await
618 .context("scan_folders")
619 {
620 Err(err) => {
621 warn!(ctx, "{:#}", err);
624 }
625 Ok(true) => {
626 connection
633 .fetch_move_delete(ctx, &mut session, &watch_folder, folder_meaning)
634 .await
635 .context("fetch_move_delete after scan_folders")?;
636 }
637 Ok(false) => {}
638 }
639 }
640
641 session
643 .sync_seen_flags(ctx, &watch_folder)
644 .await
645 .context("sync_seen_flags")
646 .log_err(ctx)
647 .ok();
648
649 connection.connectivity.set_idle(ctx).await;
650
651 ctx.emit_event(EventType::ImapInboxIdle);
652
653 if !session.can_idle() {
654 info!(
655 ctx,
656 "IMAP session does not support IDLE, going to fake idle."
657 );
658 connection.fake_idle(ctx, watch_folder).await?;
659 return Ok(session);
660 }
661
662 if ctx
663 .get_config_bool(Config::DisableIdle)
664 .await
665 .context("Failed to get disable_idle config")
666 .log_err(ctx)
667 .unwrap_or_default()
668 {
669 info!(ctx, "IMAP IDLE is disabled, going to fake idle.");
670 connection.fake_idle(ctx, watch_folder).await?;
671 return Ok(session);
672 }
673
674 info!(
675 ctx,
676 "IMAP session in folder {watch_folder:?} supports IDLE, using it."
677 );
678 let session = session
679 .idle(
680 ctx,
681 connection.idle_interrupt_receiver.clone(),
682 &watch_folder,
683 )
684 .await
685 .context("idle")?;
686
687 Ok(session)
688}
689
690async fn simple_imap_loop(
691 ctx: Context,
692 started: oneshot::Sender<()>,
693 inbox_handlers: ImapConnectionHandlers,
694 folder_meaning: FolderMeaning,
695) {
696 use futures::future::FutureExt;
697
698 info!(ctx, "starting simple loop for {}", folder_meaning);
699 let ImapConnectionHandlers {
700 mut connection,
701 stop_receiver,
702 } = inbox_handlers;
703
704 let ctx1 = ctx.clone();
705
706 let fut = async move {
707 let ctx = ctx1;
708 if let Err(()) = started.send(()) {
709 warn!(&ctx, "simple imap loop, missing started receiver");
710 return;
711 }
712
713 let mut old_session: Option<Session> = None;
714 loop {
715 let session = if let Some(session) = old_session.take() {
716 session
717 } else {
718 match connection.prepare(&ctx).await {
719 Err(err) => {
720 warn!(
721 ctx,
722 "Failed to prepare {folder_meaning} connection: {err:#}."
723 );
724 continue;
725 }
726 Ok(session) => session,
727 }
728 };
729
730 match fetch_idle(&ctx, &mut connection, session, folder_meaning).await {
731 Err(err) => warn!(ctx, "Failed fetch_idle: {err:#}"),
732 Ok(session) => {
733 old_session = Some(session);
734 }
735 }
736 }
737 };
738
739 stop_receiver
740 .recv()
741 .map(|_| {
742 info!(ctx, "shutting down simple loop");
743 })
744 .race(fut)
745 .await;
746}
747
748async fn smtp_loop(
749 ctx: Context,
750 started: oneshot::Sender<()>,
751 smtp_handlers: SmtpConnectionHandlers,
752) {
753 use futures::future::FutureExt;
754
755 info!(ctx, "Starting SMTP loop.");
756 let SmtpConnectionHandlers {
757 mut connection,
758 stop_receiver,
759 idle_interrupt_receiver,
760 } = smtp_handlers;
761
762 let ctx1 = ctx.clone();
763 let fut = async move {
764 let ctx = ctx1;
765 if let Err(()) = started.send(()) {
766 warn!(&ctx, "SMTP loop, missing started receiver.");
767 return;
768 }
769
770 let mut timeout = None;
771 loop {
772 if let Err(err) = send_smtp_messages(&ctx, &mut connection).await {
773 warn!(ctx, "send_smtp_messages failed: {:#}.", err);
774 timeout = Some(timeout.unwrap_or(30));
775 } else {
776 timeout = None;
777 let duration_until_can_send = ctx.ratelimit.read().await.until_can_send();
778 if !duration_until_can_send.is_zero() {
779 info!(
780 ctx,
781 "smtp got rate limited, waiting for {} until can send again",
782 duration_to_str(duration_until_can_send)
783 );
784 tokio::time::sleep(duration_until_can_send).await;
785 continue;
786 }
787 }
788
789 info!(ctx, "SMTP fake idle started.");
791 match &connection.last_send_error {
792 None => connection.connectivity.set_idle(&ctx).await,
793 Some(err) => connection.connectivity.set_err(&ctx, err).await,
794 }
795
796 if let Some(t) = timeout {
801 let now = tools::Time::now();
802 info!(
803 ctx,
804 "SMTP has messages to retry, planning to retry {t} seconds later."
805 );
806 let duration = std::time::Duration::from_secs(t);
807 tokio::time::timeout(duration, async {
808 idle_interrupt_receiver.recv().await.unwrap_or_default()
809 })
810 .await
811 .unwrap_or_default();
812 let slept = time_elapsed(&now).as_secs();
813 timeout = Some(cmp::max(
814 t,
815 slept.saturating_add(rand::thread_rng().gen_range((slept / 2)..=slept)),
816 ));
817 } else {
818 info!(ctx, "SMTP has no messages to retry, waiting for interrupt.");
819 idle_interrupt_receiver.recv().await.unwrap_or_default();
820 };
821
822 info!(ctx, "SMTP fake idle interrupted.")
823 }
824 };
825
826 stop_receiver
827 .recv()
828 .map(|_| {
829 info!(ctx, "Shutting down SMTP loop.");
830 })
831 .race(fut)
832 .await;
833}
834
835impl Scheduler {
836 pub async fn start(ctx: &Context) -> Result<Self> {
838 let (smtp, smtp_handlers) = SmtpConnectionState::new();
839
840 let (smtp_start_send, smtp_start_recv) = oneshot::channel();
841 let (ephemeral_interrupt_send, ephemeral_interrupt_recv) = channel::bounded(1);
842 let (location_interrupt_send, location_interrupt_recv) = channel::bounded(1);
843
844 let mut oboxes = Vec::new();
845 let mut start_recvs = Vec::new();
846
847 let (conn_state, inbox_handlers) = ImapConnectionState::new(ctx).await?;
848 let (inbox_start_send, inbox_start_recv) = oneshot::channel();
849 let handle = {
850 let ctx = ctx.clone();
851 task::spawn(inbox_loop(ctx, inbox_start_send, inbox_handlers))
852 };
853 let inbox = SchedBox {
854 meaning: FolderMeaning::Inbox,
855 conn_state,
856 handle,
857 };
858 start_recvs.push(inbox_start_recv);
859
860 for (meaning, should_watch) in [
861 (FolderMeaning::Mvbox, ctx.should_watch_mvbox().await),
862 (FolderMeaning::Sent, ctx.should_watch_sentbox().await),
863 ] {
864 if should_watch? {
865 let (conn_state, handlers) = ImapConnectionState::new(ctx).await?;
866 let (start_send, start_recv) = oneshot::channel();
867 let ctx = ctx.clone();
868 let handle = task::spawn(simple_imap_loop(ctx, start_send, handlers, meaning));
869 oboxes.push(SchedBox {
870 meaning,
871 conn_state,
872 handle,
873 });
874 start_recvs.push(start_recv);
875 }
876 }
877
878 let smtp_handle = {
879 let ctx = ctx.clone();
880 task::spawn(smtp_loop(ctx, smtp_start_send, smtp_handlers))
881 };
882 start_recvs.push(smtp_start_recv);
883
884 let ephemeral_handle = {
885 let ctx = ctx.clone();
886 task::spawn(async move {
887 ephemeral::ephemeral_loop(&ctx, ephemeral_interrupt_recv).await;
888 })
889 };
890
891 let location_handle = {
892 let ctx = ctx.clone();
893 task::spawn(async move {
894 location::location_loop(&ctx, location_interrupt_recv).await;
895 })
896 };
897
898 let recently_seen_loop = RecentlySeenLoop::new(ctx.clone());
899
900 let res = Self {
901 inbox,
902 oboxes,
903 smtp,
904 smtp_handle,
905 ephemeral_handle,
906 ephemeral_interrupt_send,
907 location_handle,
908 location_interrupt_send,
909 recently_seen_loop,
910 };
911
912 if let Err(err) = try_join_all(start_recvs).await {
914 bail!("failed to start scheduler: {}", err);
915 }
916
917 info!(ctx, "scheduler is running");
918 Ok(res)
919 }
920
921 fn boxes(&self) -> iter::Chain<iter::Once<&SchedBox>, std::slice::Iter<'_, SchedBox>> {
922 once(&self.inbox).chain(self.oboxes.iter())
923 }
924
925 fn maybe_network(&self) {
926 for b in self.boxes() {
927 b.conn_state.interrupt();
928 }
929 self.interrupt_smtp();
930 }
931
932 fn maybe_network_lost(&self) {
933 for b in self.boxes() {
934 b.conn_state.interrupt();
935 }
936 self.interrupt_smtp();
937 }
938
939 fn interrupt_inbox(&self) {
940 self.inbox.conn_state.interrupt();
941 }
942
943 fn interrupt_oboxes(&self) {
944 for b in &self.oboxes {
945 b.conn_state.interrupt();
946 }
947 }
948
949 fn interrupt_smtp(&self) {
950 self.smtp.interrupt();
951 }
952
953 fn interrupt_ephemeral_task(&self) {
954 self.ephemeral_interrupt_send.try_send(()).ok();
955 }
956
957 fn interrupt_location(&self) {
958 self.location_interrupt_send.try_send(()).ok();
959 }
960
961 fn interrupt_recently_seen(&self, contact_id: ContactId, timestamp: i64) {
962 self.recently_seen_loop.try_interrupt(contact_id, timestamp);
963 }
964
965 pub(crate) async fn stop(self, context: &Context) {
970 for b in self.boxes() {
972 b.conn_state.stop().await.log_err(context).ok();
973 }
974 self.smtp.stop().await.log_err(context).ok();
975
976 let timeout_duration = std::time::Duration::from_secs(30);
978 for b in once(self.inbox).chain(self.oboxes) {
979 tokio::time::timeout(timeout_duration, b.handle)
980 .await
981 .log_err(context)
982 .ok();
983 }
984 tokio::time::timeout(timeout_duration, self.smtp_handle)
985 .await
986 .log_err(context)
987 .ok();
988
989 self.ephemeral_handle.abort();
994 self.ephemeral_handle.await.ok();
995 self.location_handle.abort();
996 self.location_handle.await.ok();
997 self.recently_seen_loop.abort().await;
998 }
999}
1000
1001#[derive(Debug)]
1003struct ConnectionState {
1004 stop_sender: Sender<()>,
1006 idle_interrupt_sender: Sender<()>,
1008 connectivity: ConnectivityStore,
1010}
1011
1012impl ConnectionState {
1013 async fn stop(&self) -> Result<()> {
1015 self.stop_sender
1017 .send(())
1018 .await
1019 .context("failed to stop, missing receiver")?;
1020 Ok(())
1021 }
1022
1023 fn interrupt(&self) {
1024 self.idle_interrupt_sender.try_send(()).ok();
1026 }
1027}
1028
1029#[derive(Debug)]
1030pub(crate) struct SmtpConnectionState {
1031 state: ConnectionState,
1032}
1033
1034impl SmtpConnectionState {
1035 fn new() -> (Self, SmtpConnectionHandlers) {
1036 let (stop_sender, stop_receiver) = channel::bounded(1);
1037 let (idle_interrupt_sender, idle_interrupt_receiver) = channel::bounded(1);
1038
1039 let handlers = SmtpConnectionHandlers {
1040 connection: Smtp::new(),
1041 stop_receiver,
1042 idle_interrupt_receiver,
1043 };
1044
1045 let state = ConnectionState {
1046 stop_sender,
1047 idle_interrupt_sender,
1048 connectivity: handlers.connection.connectivity.clone(),
1049 };
1050
1051 let conn = SmtpConnectionState { state };
1052
1053 (conn, handlers)
1054 }
1055
1056 fn interrupt(&self) {
1058 self.state.interrupt();
1059 }
1060
1061 async fn stop(&self) -> Result<()> {
1063 self.state.stop().await?;
1064 Ok(())
1065 }
1066}
1067
1068struct SmtpConnectionHandlers {
1069 connection: Smtp,
1070 stop_receiver: Receiver<()>,
1071 idle_interrupt_receiver: Receiver<()>,
1072}
1073
1074#[derive(Debug)]
1075pub(crate) struct ImapConnectionState {
1076 state: ConnectionState,
1077}
1078
1079impl ImapConnectionState {
1080 async fn new(context: &Context) -> Result<(Self, ImapConnectionHandlers)> {
1082 let (stop_sender, stop_receiver) = channel::bounded(1);
1083 let (idle_interrupt_sender, idle_interrupt_receiver) = channel::bounded(1);
1084
1085 let handlers = ImapConnectionHandlers {
1086 connection: Imap::new_configured(context, idle_interrupt_receiver).await?,
1087 stop_receiver,
1088 };
1089
1090 let state = ConnectionState {
1091 stop_sender,
1092 idle_interrupt_sender,
1093 connectivity: handlers.connection.connectivity.clone(),
1094 };
1095
1096 let conn = ImapConnectionState { state };
1097
1098 Ok((conn, handlers))
1099 }
1100
1101 fn interrupt(&self) {
1103 self.state.interrupt();
1104 }
1105
1106 async fn stop(&self) -> Result<()> {
1108 self.state.stop().await?;
1109 Ok(())
1110 }
1111}
1112
1113#[derive(Debug)]
1114struct ImapConnectionHandlers {
1115 connection: Imap,
1116 stop_receiver: Receiver<()>,
1117}