1use std::cmp;
2use std::num::NonZeroUsize;
3
4use anyhow::{Context as _, Error, Result, bail};
5use async_channel::{self as channel, Receiver, Sender};
6use futures::future::try_join_all;
7use futures_lite::FutureExt;
8use tokio::sync::{RwLock, oneshot};
9use tokio::task;
10use tokio_util::sync::CancellationToken;
11use tokio_util::task::TaskTracker;
12
13pub(crate) use self::connectivity::ConnectivityStore;
14use crate::config::{self, Config};
15use crate::contact::{ContactId, RecentlySeenLoop};
16use crate::context::Context;
17use crate::download::{DownloadState, download_msg};
18use crate::ephemeral::{self, delete_expired_imap_messages};
19use crate::events::EventType;
20use crate::imap::{FolderMeaning, Imap, session::Session};
21use crate::location;
22use crate::log::{LogExt, warn};
23use crate::message::MsgId;
24use crate::smtp::{Smtp, send_smtp_messages};
25use crate::sql;
26use crate::stats::maybe_send_stats;
27use crate::tools::{self, duration_to_str, maybe_add_time_based_warnings, time, time_elapsed};
28use crate::transport::ConfiguredLoginParam;
29use crate::{constants, stats};
30
31pub(crate) mod connectivity;
32
/// Shared, lock-protected state of the IO scheduler.
///
/// All access goes through the async `RwLock` around [`InnerSchedulerState`];
/// the methods on this type take the lock themselves.
#[derive(Debug, Default)]
pub(crate) struct SchedulerState {
    /// The actual state machine (started / stopped / paused).
    inner: RwLock<InnerSchedulerState>,
}
42
43impl SchedulerState {
44 pub(crate) fn new() -> Self {
45 Default::default()
46 }
47
48 pub(crate) async fn is_running(&self) -> bool {
50 let inner = self.inner.read().await;
51 matches!(*inner, InnerSchedulerState::Started(_))
52 }
53
54 pub(crate) async fn start(&self, context: &Context) {
56 let mut inner = self.inner.write().await;
57 match *inner {
58 InnerSchedulerState::Started(_) => (),
59 InnerSchedulerState::Stopped => Self::do_start(&mut inner, context).await,
60 InnerSchedulerState::Paused {
61 ref mut started, ..
62 } => *started = true,
63 }
64 context.update_connectivities(&inner);
65 }
66
67 async fn do_start(inner: &mut InnerSchedulerState, context: &Context) {
69 info!(context, "starting IO");
70
71 context.new_msgs_notify.notify_one();
74
75 match Scheduler::start(context).await {
76 Ok(scheduler) => {
77 *inner = InnerSchedulerState::Started(scheduler);
78 context.emit_event(EventType::ConnectivityChanged);
79 }
80 Err(err) => error!(context, "Failed to start IO: {:#}", err),
81 }
82 }
83
84 pub(crate) async fn stop(&self, context: &Context) {
86 let mut inner = self.inner.write().await;
87 match *inner {
88 InnerSchedulerState::Started(_) => {
89 Self::do_stop(&mut inner, context, InnerSchedulerState::Stopped).await
90 }
91 InnerSchedulerState::Stopped => (),
92 InnerSchedulerState::Paused {
93 ref mut started, ..
94 } => *started = false,
95 }
96 context.update_connectivities(&inner);
97 }
98
99 async fn do_stop(
101 inner: &mut InnerSchedulerState,
102 context: &Context,
103 new_state: InnerSchedulerState,
104 ) {
105 info!(context, "stopping IO");
111
112 context.new_msgs_notify.notify_one();
115
116 let debug_logging = context
117 .debug_logging
118 .write()
119 .expect("RwLock is poisoned")
120 .take();
121 if let Some(debug_logging) = debug_logging {
122 debug_logging.loop_handle.abort();
123 debug_logging.loop_handle.await.ok();
124 }
125 let prev_state = std::mem::replace(inner, new_state);
126 context.emit_event(EventType::ConnectivityChanged);
127 match prev_state {
128 InnerSchedulerState::Started(scheduler) => scheduler.stop(context).await,
129 InnerSchedulerState::Stopped | InnerSchedulerState::Paused { .. } => (),
130 }
131 }
132
133 pub(crate) async fn pause(&'_ self, context: &Context) -> Result<IoPausedGuard> {
142 {
143 let mut inner = self.inner.write().await;
144 match *inner {
145 InnerSchedulerState::Started(_) => {
146 let new_state = InnerSchedulerState::Paused {
147 started: true,
148 pause_guards_count: NonZeroUsize::MIN,
149 };
150 Self::do_stop(&mut inner, context, new_state).await;
151 }
152 InnerSchedulerState::Stopped => {
153 *inner = InnerSchedulerState::Paused {
154 started: false,
155 pause_guards_count: NonZeroUsize::MIN,
156 };
157 }
158 InnerSchedulerState::Paused {
159 ref mut pause_guards_count,
160 ..
161 } => {
162 *pause_guards_count = pause_guards_count
163 .checked_add(1)
164 .ok_or_else(|| Error::msg("Too many pause guards active"))?
165 }
166 }
167 context.update_connectivities(&inner);
168 }
169
170 let (tx, rx) = oneshot::channel();
171 let context = context.clone();
172 tokio::spawn(async move {
173 rx.await.ok();
174 let mut inner = context.scheduler.inner.write().await;
175 match *inner {
176 InnerSchedulerState::Started(_) => {
177 warn!(&context, "IoPausedGuard resume: started instead of paused");
178 }
179 InnerSchedulerState::Stopped => {
180 warn!(&context, "IoPausedGuard resume: stopped instead of paused");
181 }
182 InnerSchedulerState::Paused {
183 ref started,
184 ref mut pause_guards_count,
185 } => {
186 if *pause_guards_count == NonZeroUsize::MIN {
187 match *started {
188 true => SchedulerState::do_start(&mut inner, &context).await,
189 false => *inner = InnerSchedulerState::Stopped,
190 }
191 } else {
192 let new_count = pause_guards_count.get() - 1;
193 *pause_guards_count = NonZeroUsize::new(new_count).unwrap();
195 }
196 }
197 }
198 context.update_connectivities(&inner);
199 });
200 Ok(IoPausedGuard { sender: Some(tx) })
201 }
202
203 pub(crate) async fn restart(&self, context: &Context) {
205 info!(context, "restarting IO");
206 if self.is_running().await {
207 self.stop(context).await;
208 self.start(context).await;
209 }
210 }
211
212 pub(crate) async fn maybe_network(&self) {
214 let inner = self.inner.read().await;
215 let (inboxes, oboxes) = match *inner {
216 InnerSchedulerState::Started(ref scheduler) => {
217 scheduler.maybe_network();
218 let inboxes = scheduler
219 .inboxes
220 .iter()
221 .map(|b| b.conn_state.state.connectivity.clone())
222 .collect::<Vec<_>>();
223 let oboxes = scheduler
224 .oboxes
225 .iter()
226 .map(|b| b.conn_state.state.connectivity.clone())
227 .collect::<Vec<_>>();
228 (inboxes, oboxes)
229 }
230 _ => return,
231 };
232 drop(inner);
233 connectivity::idle_interrupted(inboxes, oboxes);
234 }
235
236 pub(crate) async fn maybe_network_lost(&self, context: &Context) {
238 let inner = self.inner.read().await;
239 let stores = match *inner {
240 InnerSchedulerState::Started(ref scheduler) => {
241 scheduler.maybe_network_lost();
242 scheduler
243 .boxes()
244 .map(|b| b.conn_state.state.connectivity.clone())
245 .collect()
246 }
247 _ => return,
248 };
249 drop(inner);
250 connectivity::maybe_network_lost(context, stores);
251 }
252
253 pub(crate) async fn interrupt_inbox(&self) {
254 let inner = self.inner.read().await;
255 if let InnerSchedulerState::Started(ref scheduler) = *inner {
256 scheduler.interrupt_inbox();
257 }
258 }
259
260 pub(crate) async fn interrupt_oboxes(&self) {
262 let inner = self.inner.read().await;
263 if let InnerSchedulerState::Started(ref scheduler) = *inner {
264 scheduler.interrupt_oboxes();
265 }
266 }
267
268 pub(crate) async fn interrupt_smtp(&self) {
269 let inner = self.inner.read().await;
270 if let InnerSchedulerState::Started(ref scheduler) = *inner {
271 scheduler.interrupt_smtp();
272 }
273 }
274
275 pub(crate) async fn interrupt_ephemeral_task(&self) {
276 let inner = self.inner.read().await;
277 if let InnerSchedulerState::Started(ref scheduler) = *inner {
278 scheduler.interrupt_ephemeral_task();
279 }
280 }
281
282 pub(crate) async fn interrupt_location(&self) {
283 let inner = self.inner.read().await;
284 if let InnerSchedulerState::Started(ref scheduler) = *inner {
285 scheduler.interrupt_location();
286 }
287 }
288
289 pub(crate) async fn interrupt_recently_seen(&self, contact_id: ContactId, timestamp: i64) {
290 let inner = self.inner.read().await;
291 if let InnerSchedulerState::Started(ref scheduler) = *inner {
292 scheduler.interrupt_recently_seen(contact_id, timestamp);
293 }
294 }
295}
296
/// The state machine behind [`SchedulerState`].
#[derive(Debug, Default)]
pub(crate) enum InnerSchedulerState {
    /// IO is running; holds the live [`Scheduler`].
    Started(Scheduler),
    /// IO is not running. This is the default state.
    #[default]
    Stopped,
    /// IO is temporarily halted while at least one [`IoPausedGuard`] is alive.
    Paused {
        /// Whether the scheduler should be started again once the last guard
        /// is dropped. Updated by `start()`/`stop()` calls made while paused.
        started: bool,
        /// Number of pause guards currently outstanding.
        pause_guards_count: NonZeroUsize,
    },
}
307
/// Guard returned by [`SchedulerState::pause`]; dropping it ends that pause.
#[derive(Default, Debug)]
pub(crate) struct IoPausedGuard {
    /// Channel used to notify the resume task on drop.
    /// `None` for a default-constructed guard, whose drop does nothing.
    sender: Option<oneshot::Sender<()>>,
}
316
317impl Drop for IoPausedGuard {
318 fn drop(&mut self) {
319 if let Some(sender) = self.sender.take() {
320 sender.send(()).ok();
322 }
323 }
324}
325
/// One IMAP folder loop managed by the [`Scheduler`].
#[derive(Debug)]
struct SchedBox {
    /// Part of the configured address after the last `@`.
    host: String,
    /// Which kind of folder this loop serves.
    meaning: FolderMeaning,
    /// Handle used to interrupt or stop the loop.
    conn_state: ImapConnectionState,

    /// Join handle of the spawned loop task.
    handle: task::JoinHandle<()>,
}
336
/// A running set of background IO tasks.
///
/// Created by [`Scheduler::start`] and torn down by [`Scheduler::stop`].
#[derive(Debug)]
pub(crate) struct Scheduler {
    /// Inbox loops, one per configured transport.
    inboxes: Vec<SchedBox>,
    /// Loops for other watched folders (the mvbox, when it is watched).
    oboxes: Vec<SchedBox>,
    /// Handle used to interrupt or stop the SMTP loop.
    smtp: SmtpConnectionState,
    /// Join handle of the SMTP loop task.
    smtp_handle: task::JoinHandle<()>,
    /// Join handle of the ephemeral loop task.
    ephemeral_handle: task::JoinHandle<()>,
    /// Sender used to interrupt the ephemeral loop.
    ephemeral_interrupt_send: Sender<()>,
    /// Join handle of the location loop task.
    location_handle: task::JoinHandle<()>,
    /// Sender used to interrupt the location loop.
    location_interrupt_send: Sender<()>,

    /// The recently-seen loop.
    recently_seen_loop: RecentlySeenLoop,
}
353
354async fn download_msgs(context: &Context, session: &mut Session) -> Result<()> {
355 let msg_ids = context
356 .sql
357 .query_map_vec("SELECT msg_id FROM download", (), |row| {
358 let msg_id: MsgId = row.get(0)?;
359 Ok(msg_id)
360 })
361 .await?;
362
363 for msg_id in msg_ids {
364 if let Err(err) = download_msg(context, msg_id, session).await {
365 warn!(context, "Failed to download message {msg_id}: {:#}.", err);
366
367 msg_id
374 .update_download_state(context, DownloadState::Failure)
375 .await?;
376 }
377 context
378 .sql
379 .execute("DELETE FROM download WHERE msg_id=?", (msg_id,))
380 .await?;
381 }
382
383 Ok(())
384}
385
386async fn inbox_loop(
387 ctx: Context,
388 started: oneshot::Sender<()>,
389 inbox_handlers: ImapConnectionHandlers,
390) {
391 use futures::future::FutureExt;
392
393 info!(ctx, "Starting inbox loop.");
394 let ImapConnectionHandlers {
395 mut connection,
396 stop_token,
397 } = inbox_handlers;
398
399 let ctx1 = ctx.clone();
400 let fut = async move {
401 let ctx = ctx1;
402 if let Err(()) = started.send(()) {
403 warn!(ctx, "Inbox loop, missing started receiver.");
404 return;
405 };
406
407 let mut old_session: Option<Session> = None;
408 loop {
409 let session = if let Some(session) = old_session.take() {
410 session
411 } else {
412 info!(ctx, "Preparing new IMAP session for inbox.");
413 match connection.prepare(&ctx).await {
414 Err(err) => {
415 warn!(ctx, "Failed to prepare inbox connection: {err:#}.");
416 continue;
417 }
418 Ok(session) => session,
419 }
420 };
421
422 match inbox_fetch_idle(&ctx, &mut connection, session).await {
423 Err(err) => warn!(ctx, "Failed inbox fetch_idle: {err:#}."),
424 Ok(session) => {
425 info!(
426 ctx,
427 "IMAP loop iteration for inbox finished, keeping the session."
428 );
429 old_session = Some(session);
430 }
431 }
432 }
433 };
434
435 stop_token
436 .cancelled()
437 .map(|_| {
438 info!(ctx, "Shutting down inbox loop.");
439 })
440 .race(fut)
441 .await;
442}
443
444pub async fn convert_folder_meaning(
450 ctx: &Context,
451 folder_meaning: FolderMeaning,
452) -> Result<Option<(Config, String)>> {
453 let folder_config = match folder_meaning.to_config() {
454 Some(c) => c,
455 None => {
456 return Ok(None);
459 }
460 };
461
462 let folder = ctx
463 .get_config(folder_config)
464 .await
465 .with_context(|| format!("Failed to retrieve {folder_config} folder"))?;
466
467 if let Some(watch_folder) = folder {
468 Ok(Some((folder_config, watch_folder)))
469 } else {
470 Ok(None)
471 }
472}
473
474async fn inbox_fetch_idle(ctx: &Context, imap: &mut Imap, mut session: Session) -> Result<Session> {
475 if !ctx.get_config_bool(Config::FixIsChatmail).await? {
476 ctx.set_config_internal(
477 Config::IsChatmail,
478 crate::config::from_bool(session.is_chatmail()),
479 )
480 .await?;
481 }
482
483 if ctx.quota_needs_update(session.transport_id(), 60).await
485 && let Err(err) = ctx.update_recent_quota(&mut session).await
486 {
487 warn!(ctx, "Failed to update quota: {:#}.", err);
488 }
489
490 if let Ok(()) = imap.resync_request_receiver.try_recv()
491 && let Err(err) = session.resync_folders(ctx).await
492 {
493 warn!(ctx, "Failed to resync folders: {:#}.", err);
494 imap.resync_request_sender.try_send(()).ok();
495 }
496
497 maybe_add_time_based_warnings(ctx).await;
498
499 match ctx.get_config_i64(Config::LastHousekeeping).await {
500 Ok(last_housekeeping_time) => {
501 let next_housekeeping_time =
502 last_housekeeping_time.saturating_add(constants::HOUSEKEEPING_PERIOD);
503 if next_housekeeping_time <= time() {
504 sql::housekeeping(ctx).await.log_err(ctx).ok();
505 }
506 }
507 Err(err) => {
508 warn!(ctx, "Failed to get last housekeeping time: {}", err);
509 }
510 };
511
512 maybe_send_stats(ctx).await.log_err(ctx).ok();
513 match ctx.get_config_bool(Config::FetchedExistingMsgs).await {
514 Ok(fetched_existing_msgs) => {
515 if !fetched_existing_msgs {
516 if let Err(err) = ctx
521 .set_config_internal(Config::FetchedExistingMsgs, config::from_bool(true))
522 .await
523 {
524 warn!(ctx, "Can't set Config::FetchedExistingMsgs: {:#}", err);
525 }
526
527 if let Err(err) = imap.fetch_existing_msgs(ctx, &mut session).await {
528 warn!(ctx, "Failed to fetch existing messages: {:#}", err);
529 }
530 }
531 }
532 Err(err) => {
533 warn!(ctx, "Can't get Config::FetchedExistingMsgs: {:#}", err);
534 }
535 }
536
537 download_msgs(ctx, &mut session)
538 .await
539 .context("Failed to download messages")?;
540 session
541 .update_metadata(ctx)
542 .await
543 .context("update_metadata")?;
544 session
545 .register_token(ctx)
546 .await
547 .context("Failed to register push token")?;
548
549 let session = fetch_idle(ctx, imap, session, FolderMeaning::Inbox).await?;
550 Ok(session)
551}
552
/// Runs one fetch-then-idle iteration for the folder denoted by `folder_meaning`.
///
/// Fetches/moves/deletes messages in the watched folder, synchronizes seen
/// flags and then waits for changes, using IMAP IDLE when the session supports
/// it and it is not disabled, or fake idle otherwise.
///
/// Returns the session so the caller can reuse it for the next iteration.
///
/// # Errors
///
/// Fails if the folder is not configured or if one of the steps propagated
/// with `?` fails. Failures of `send_sync_msgs`, `scan_folders` and
/// `sync_seen_flags` are only logged.
async fn fetch_idle(
    ctx: &Context,
    connection: &mut Imap,
    mut session: Session,
    folder_meaning: FolderMeaning,
) -> Result<Session> {
    let Some((folder_config, watch_folder)) = convert_folder_meaning(ctx, folder_meaning).await?
    else {
        // No folder configured: report that, then wait for an interrupt before
        // returning the error, so the caller's loop does not spin.
        connection.connectivity.set_not_configured(ctx);
        connection.idle_interrupt_receiver.recv().await.ok();
        bail!("Cannot fetch folder {folder_meaning} because it is not configured");
    };

    // Inbox-only work: sending sync messages and storing seen flags.
    if folder_config == Config::ConfiguredInboxFolder {
        // Declared up front so the borrowed `syncbox` can outlive the match arm.
        let mvbox;
        let syncbox = match ctx.should_move_sync_msgs().await? {
            false => &watch_folder,
            true => {
                mvbox = ctx.get_config(Config::ConfiguredMvboxFolder).await?;
                // Falls back to the watched folder if no mvbox is configured.
                mvbox.as_deref().unwrap_or(&watch_folder)
            }
        };
        // Sync messages are only sent over the connection whose address
        // equals the configured address.
        if ctx
            .get_config(Config::ConfiguredAddr)
            .await?
            .unwrap_or_default()
            == connection.addr
        {
            session
                .send_sync_msgs(ctx, syncbox)
                .await
                .context("fetch_idle: send_sync_msgs")
                .log_err(ctx)
                .ok();
        }

        session
            .store_seen_flags_on_imap(ctx)
            .await
            .context("store_seen_flags_on_imap")?;
    }

    // Fetch only if deletion does not go to trash, or a trash folder is known.
    if !ctx.should_delete_to_trash().await?
        || ctx
            .get_config(Config::ConfiguredTrashFolder)
            .await?
            .is_some()
    {
        connection
            .fetch_move_delete(ctx, &mut session, &watch_folder, folder_meaning)
            .await
            .context("fetch_move_delete")?;

        delete_expired_imap_messages(ctx)
            .await
            .context("delete_expired_imap_messages")?;
    } else if folder_config == Config::ConfiguredInboxFolder {
        // Clears the recorded time of the last full folder scan.
        // NOTE(review): presumably this forces `scan_folders` below to run a
        // full scan that can find the trash folder — confirm.
        session.last_full_folder_scan.lock().await.take();
    }

    if folder_config == Config::ConfiguredInboxFolder {
        match connection
            .scan_folders(ctx, &mut session)
            .await
            .context("scan_folders")
        {
            Err(err) => {
                // A failed scan is not fatal for this iteration.
                warn!(ctx, "{:#}", err);
            }
            Ok(true) => {
                // NOTE(review): `true` presumably means the scan makes another
                // fetch of the watched folder necessary — confirm against
                // `scan_folders`.
                connection
                    .fetch_move_delete(ctx, &mut session, &watch_folder, folder_meaning)
                    .await
                    .context("fetch_move_delete after scan_folders")?;
            }
            Ok(false) => {}
        }
    }

    session
        .sync_seen_flags(ctx, &watch_folder)
        .await
        .context("sync_seen_flags")
        .log_err(ctx)
        .ok();

    connection.connectivity.set_idle(ctx);

    ctx.emit_event(EventType::ImapInboxIdle);

    if !session.can_idle() {
        info!(
            ctx,
            "IMAP session does not support IDLE, going to fake idle."
        );
        connection.fake_idle(ctx, watch_folder).await?;
        return Ok(session);
    }

    // A failure to read the config is logged and treated as "not disabled".
    if ctx
        .get_config_bool(Config::DisableIdle)
        .await
        .context("Failed to get disable_idle config")
        .log_err(ctx)
        .unwrap_or_default()
    {
        info!(ctx, "IMAP IDLE is disabled, going to fake idle.");
        connection.fake_idle(ctx, watch_folder).await?;
        return Ok(session);
    }

    info!(
        ctx,
        "IMAP session in folder {watch_folder:?} supports IDLE, using it."
    );
    // `idle` consumes the session and hands it back once idling ends.
    let session = session
        .idle(
            ctx,
            connection.idle_interrupt_receiver.clone(),
            &watch_folder,
        )
        .await
        .context("idle")?;

    Ok(session)
}
706
707async fn simple_imap_loop(
708 ctx: Context,
709 started: oneshot::Sender<()>,
710 inbox_handlers: ImapConnectionHandlers,
711 folder_meaning: FolderMeaning,
712) {
713 use futures::future::FutureExt;
714
715 info!(ctx, "Starting simple loop for {folder_meaning}.");
716 let ImapConnectionHandlers {
717 mut connection,
718 stop_token,
719 } = inbox_handlers;
720
721 let ctx1 = ctx.clone();
722
723 let fut = async move {
724 let ctx = ctx1;
725 if let Err(()) = started.send(()) {
726 warn!(
727 ctx,
728 "Simple imap loop for {folder_meaning}, missing started receiver."
729 );
730 return;
731 }
732
733 let mut old_session: Option<Session> = None;
734 loop {
735 let session = if let Some(session) = old_session.take() {
736 session
737 } else {
738 info!(ctx, "Preparing new IMAP session for {folder_meaning}.");
739 match connection.prepare(&ctx).await {
740 Err(err) => {
741 warn!(
742 ctx,
743 "Failed to prepare {folder_meaning} connection: {err:#}."
744 );
745 continue;
746 }
747 Ok(session) => session,
748 }
749 };
750
751 match fetch_idle(&ctx, &mut connection, session, folder_meaning).await {
752 Err(err) => warn!(ctx, "Failed fetch_idle: {err:#}"),
753 Ok(session) => {
754 info!(
755 ctx,
756 "IMAP loop iteration for {folder_meaning} finished, keeping the session"
757 );
758 old_session = Some(session);
759 }
760 }
761 }
762 };
763
764 stop_token
765 .cancelled()
766 .map(|_| {
767 info!(ctx, "Shutting down IMAP loop for {folder_meaning}.");
768 })
769 .race(fut)
770 .await;
771}
772
/// Runs the SMTP sending loop until `stop_token` is cancelled.
///
/// Signals `started` first. Each iteration tries to send queued messages and
/// then waits ("fake idle") for an interrupt. After a failed send the wait is
/// bounded by a retry timeout that starts at 30 seconds and never shrinks until
/// a send succeeds.
async fn smtp_loop(
    ctx: Context,
    started: oneshot::Sender<()>,
    smtp_handlers: SmtpConnectionHandlers,
) {
    use futures::future::FutureExt;

    info!(ctx, "Starting SMTP loop.");
    let SmtpConnectionHandlers {
        mut connection,
        stop_token,
        idle_interrupt_receiver,
    } = smtp_handlers;

    let ctx1 = ctx.clone();
    let fut = async move {
        let ctx = ctx1;
        if let Err(()) = started.send(()) {
            warn!(&ctx, "SMTP loop, missing started receiver.");
            return;
        }

        // Retry timeout in seconds; `None` while there is nothing to retry.
        let mut timeout = None;
        loop {
            if let Err(err) = send_smtp_messages(&ctx, &mut connection).await {
                warn!(ctx, "send_smtp_messages failed: {:#}.", err);
                // Keep a previously grown timeout, otherwise start at 30 seconds.
                timeout = Some(timeout.unwrap_or(30));
            } else {
                timeout = None;
                let duration_until_can_send = ctx.ratelimit.read().await.until_can_send();
                if !duration_until_can_send.is_zero() {
                    info!(
                        ctx,
                        "smtp got rate limited, waiting for {} until can send again",
                        duration_to_str(duration_until_can_send)
                    );
                    // Sleep out the rate limit and go straight back to sending.
                    tokio::time::sleep(duration_until_can_send).await;
                    continue;
                }
            }

            stats::maybe_update_message_stats(&ctx)
                .await
                .log_err(&ctx)
                .ok();

            info!(ctx, "SMTP fake idle started.");
            // Publish the outcome of the last send attempt as connectivity.
            match &connection.last_send_error {
                None => connection.connectivity.set_idle(&ctx),
                Some(err) => connection.connectivity.set_err(&ctx, err),
            }

            if let Some(t) = timeout {
                let now = tools::Time::now();
                info!(
                    ctx,
                    "SMTP has messages to retry, planning to retry {t} seconds later."
                );
                let duration = std::time::Duration::from_secs(t);
                // Wait for an interrupt, but at most `t` seconds. Both a timeout
                // and a closed channel are treated like an interrupt.
                tokio::time::timeout(duration, async {
                    idle_interrupt_receiver.recv().await.unwrap_or_default()
                })
                .await
                .unwrap_or_default();
                let slept = time_elapsed(&now).as_secs();
                // The next timeout is at least `t`; it grows to between 1.5x and
                // 2x the time actually waited when that is larger.
                timeout = Some(cmp::max(
                    t,
                    slept.saturating_add(rand::random_range((slept / 2)..=slept)),
                ));
            } else {
                info!(ctx, "SMTP has no messages to retry, waiting for interrupt.");
                idle_interrupt_receiver.recv().await.unwrap_or_default();
            };

            info!(ctx, "SMTP fake idle interrupted.")
        }
    };

    // Whichever finishes first wins; the loop itself only ends early when the
    // `started` receiver is missing.
    stop_token
        .cancelled()
        .map(|_| {
            info!(ctx, "Shutting down SMTP loop.");
        })
        .race(fut)
        .await;
}
864
865impl Scheduler {
866 pub async fn start(ctx: &Context) -> Result<Self> {
868 let (smtp, smtp_handlers) = SmtpConnectionState::new();
869
870 let (smtp_start_send, smtp_start_recv) = oneshot::channel();
871 let (ephemeral_interrupt_send, ephemeral_interrupt_recv) = channel::bounded(1);
872 let (location_interrupt_send, location_interrupt_recv) = channel::bounded(1);
873
874 let mut inboxes = Vec::new();
875 let mut oboxes = Vec::new();
876 let mut start_recvs = Vec::new();
877
878 for (transport_id, configured_login_param) in ConfiguredLoginParam::load_all(ctx).await? {
879 let (conn_state, inbox_handlers) =
880 ImapConnectionState::new(ctx, transport_id, configured_login_param.clone()).await?;
881 let (inbox_start_send, inbox_start_recv) = oneshot::channel();
882 let handle = {
883 let ctx = ctx.clone();
884 task::spawn(inbox_loop(ctx, inbox_start_send, inbox_handlers))
885 };
886 let host = configured_login_param
887 .addr
888 .split("@")
889 .last()
890 .context("address has no host")?
891 .to_owned();
892 let inbox = SchedBox {
893 host: host.clone(),
894 meaning: FolderMeaning::Inbox,
895 conn_state,
896 handle,
897 };
898 inboxes.push(inbox);
899 start_recvs.push(inbox_start_recv);
900
901 if ctx.should_watch_mvbox().await? {
902 let (conn_state, handlers) =
903 ImapConnectionState::new(ctx, transport_id, configured_login_param).await?;
904 let (start_send, start_recv) = oneshot::channel();
905 let ctx = ctx.clone();
906 let meaning = FolderMeaning::Mvbox;
907 let handle = task::spawn(simple_imap_loop(ctx, start_send, handlers, meaning));
908 oboxes.push(SchedBox {
909 host,
910 meaning,
911 conn_state,
912 handle,
913 });
914 start_recvs.push(start_recv);
915 }
916 }
917
918 let smtp_handle = {
919 let ctx = ctx.clone();
920 task::spawn(smtp_loop(ctx, smtp_start_send, smtp_handlers))
921 };
922 start_recvs.push(smtp_start_recv);
923
924 let ephemeral_handle = {
925 let ctx = ctx.clone();
926 task::spawn(async move {
927 ephemeral::ephemeral_loop(&ctx, ephemeral_interrupt_recv).await;
928 })
929 };
930
931 let location_handle = {
932 let ctx = ctx.clone();
933 task::spawn(async move {
934 location::location_loop(&ctx, location_interrupt_recv).await;
935 })
936 };
937
938 let recently_seen_loop = RecentlySeenLoop::new(ctx.clone());
939
940 let res = Self {
941 inboxes,
942 oboxes,
943 smtp,
944 smtp_handle,
945 ephemeral_handle,
946 ephemeral_interrupt_send,
947 location_handle,
948 location_interrupt_send,
949 recently_seen_loop,
950 };
951
952 if let Err(err) = try_join_all(start_recvs).await {
954 bail!("failed to start scheduler: {err}");
955 }
956
957 info!(ctx, "scheduler is running");
958 Ok(res)
959 }
960
961 fn boxes(&self) -> impl Iterator<Item = &SchedBox> {
962 self.inboxes.iter().chain(self.oboxes.iter())
963 }
964
965 fn maybe_network(&self) {
966 for b in self.boxes() {
967 b.conn_state.interrupt();
968 }
969 self.interrupt_smtp();
970 }
971
972 fn maybe_network_lost(&self) {
973 for b in self.boxes() {
974 b.conn_state.interrupt();
975 }
976 self.interrupt_smtp();
977 }
978
979 fn interrupt_inbox(&self) {
980 for b in &self.inboxes {
981 b.conn_state.interrupt();
982 }
983 }
984
985 fn interrupt_oboxes(&self) {
986 for b in &self.oboxes {
987 b.conn_state.interrupt();
988 }
989 }
990
    /// Interrupts the SMTP loop.
    fn interrupt_smtp(&self) {
        self.smtp.interrupt();
    }
994
995 fn interrupt_ephemeral_task(&self) {
996 self.ephemeral_interrupt_send.try_send(()).ok();
997 }
998
999 fn interrupt_location(&self) {
1000 self.location_interrupt_send.try_send(()).ok();
1001 }
1002
    /// Forwards `contact_id` and `timestamp` to the recently-seen loop.
    fn interrupt_recently_seen(&self, contact_id: ContactId, timestamp: i64) {
        self.recently_seen_loop.try_interrupt(contact_id, timestamp);
    }
1006
1007 pub(crate) async fn stop(self, context: &Context) {
1012 for b in self.boxes() {
1014 b.conn_state.stop();
1015 }
1016 self.smtp.stop();
1017
1018 let timeout_duration = std::time::Duration::from_secs(30);
1020
1021 let tracker = TaskTracker::new();
1022 for b in self.inboxes.into_iter().chain(self.oboxes.into_iter()) {
1023 let context = context.clone();
1024 tracker.spawn(async move {
1025 tokio::time::timeout(timeout_duration, b.handle)
1026 .await
1027 .log_err(&context)
1028 });
1029 }
1030 {
1031 let context = context.clone();
1032 tracker.spawn(async move {
1033 tokio::time::timeout(timeout_duration, self.smtp_handle)
1034 .await
1035 .log_err(&context)
1036 });
1037 }
1038 tracker.close();
1039 tracker.wait().await;
1040
1041 self.ephemeral_handle.abort();
1046 self.ephemeral_handle.await.ok();
1047 self.location_handle.abort();
1048 self.location_handle.await.ok();
1049 self.recently_seen_loop.abort().await;
1050 }
1051}
1052
/// Scheduler-side handle for controlling one connection loop.
#[derive(Debug)]
struct ConnectionState {
    /// Token that is cancelled to make the loop shut down.
    stop_token: CancellationToken,
    /// Sender used to interrupt the loop while it is idling.
    idle_interrupt_sender: Sender<()>,
    /// Connectivity store cloned from the connection.
    connectivity: ConnectivityStore,
}
1063
1064impl ConnectionState {
1065 fn stop(&self) {
1067 self.stop_token.cancel();
1069 }
1070
1071 fn interrupt(&self) {
1072 self.idle_interrupt_sender.try_send(()).ok();
1074 }
1075}
1076
/// Scheduler-side handle for the SMTP loop.
#[derive(Debug)]
pub(crate) struct SmtpConnectionState {
    /// Stop token, interrupt sender and connectivity of the SMTP connection.
    state: ConnectionState,
}
1081
1082impl SmtpConnectionState {
1083 fn new() -> (Self, SmtpConnectionHandlers) {
1084 let stop_token = CancellationToken::new();
1085 let (idle_interrupt_sender, idle_interrupt_receiver) = channel::bounded(1);
1086
1087 let handlers = SmtpConnectionHandlers {
1088 connection: Smtp::new(),
1089 stop_token: stop_token.clone(),
1090 idle_interrupt_receiver,
1091 };
1092
1093 let state = ConnectionState {
1094 stop_token,
1095 idle_interrupt_sender,
1096 connectivity: handlers.connection.connectivity.clone(),
1097 };
1098
1099 let conn = SmtpConnectionState { state };
1100
1101 (conn, handlers)
1102 }
1103
1104 fn interrupt(&self) {
1106 self.state.interrupt();
1107 }
1108
1109 fn stop(&self) {
1111 self.state.stop();
1112 }
1113}
1114
/// Loop-side half of an SMTP connection; consumed by [`smtp_loop`].
struct SmtpConnectionHandlers {
    /// The SMTP connection used for sending.
    connection: Smtp,
    /// Token whose cancellation ends the loop.
    stop_token: CancellationToken,
    /// Receiver for idle interrupts.
    idle_interrupt_receiver: Receiver<()>,
}
1120
/// Scheduler-side handle for one IMAP loop.
#[derive(Debug)]
pub(crate) struct ImapConnectionState {
    /// Stop token, interrupt sender and connectivity of the IMAP connection.
    state: ConnectionState,
}
1125
1126impl ImapConnectionState {
1127 async fn new(
1129 context: &Context,
1130 transport_id: u32,
1131 login_param: ConfiguredLoginParam,
1132 ) -> Result<(Self, ImapConnectionHandlers)> {
1133 let stop_token = CancellationToken::new();
1134 let (idle_interrupt_sender, idle_interrupt_receiver) = channel::bounded(1);
1135
1136 let handlers = ImapConnectionHandlers {
1137 connection: Imap::new(context, transport_id, login_param, idle_interrupt_receiver)
1138 .await?,
1139 stop_token: stop_token.clone(),
1140 };
1141
1142 let state = ConnectionState {
1143 stop_token,
1144 idle_interrupt_sender,
1145 connectivity: handlers.connection.connectivity.clone(),
1146 };
1147
1148 let conn = ImapConnectionState { state };
1149
1150 Ok((conn, handlers))
1151 }
1152
1153 fn interrupt(&self) {
1155 self.state.interrupt();
1156 }
1157
1158 fn stop(&self) {
1160 self.state.stop();
1161 }
1162}
1163
/// Loop-side half of an IMAP connection; consumed by the IMAP loops.
#[derive(Debug)]
struct ImapConnectionHandlers {
    /// The IMAP connection used by the loop.
    connection: Imap,
    /// Token whose cancellation ends the loop.
    stop_token: CancellationToken,
}