1use std::cmp;
2use std::num::NonZeroUsize;
3
4use anyhow::{Context as _, Error, Result, bail};
5use async_channel::{self as channel, Receiver, Sender};
6use futures::future::try_join_all;
7use futures_lite::FutureExt;
8use tokio::sync::{RwLock, oneshot};
9use tokio::task;
10use tokio_util::sync::CancellationToken;
11use tokio_util::task::TaskTracker;
12
13pub(crate) use self::connectivity::ConnectivityStore;
14use crate::config::Config;
15use crate::contact::{ContactId, RecentlySeenLoop};
16use crate::context::Context;
17use crate::download::{download_known_post_messages_without_pre_message, download_msgs};
18use crate::ephemeral::{self, delete_expired_imap_messages};
19use crate::events::EventType;
20use crate::imap::{FolderMeaning, Imap, session::Session};
21use crate::location;
22use crate::log::{LogExt, warn};
23use crate::smtp::{Smtp, send_smtp_messages};
24use crate::sql;
25use crate::stats::maybe_send_stats;
26use crate::tools::{self, duration_to_str, maybe_add_time_based_warnings, time, time_elapsed};
27use crate::transport::ConfiguredLoginParam;
28use crate::{constants, stats};
29
30pub(crate) mod connectivity;
31
/// Shared handle to the current state of the IO scheduler.
///
/// The state itself is an [`InnerSchedulerState`] kept behind an async `RwLock`:
/// state transitions take the write lock, while interrupt requests only need
/// the read lock.
#[derive(Debug, Default)]
pub(crate) struct SchedulerState {
    /// Current state; the default is [`InnerSchedulerState::Stopped`].
    inner: RwLock<InnerSchedulerState>,
}
41
42impl SchedulerState {
43 pub(crate) fn new() -> Self {
44 Default::default()
45 }
46
47 pub(crate) async fn is_running(&self) -> bool {
49 let inner = self.inner.read().await;
50 matches!(*inner, InnerSchedulerState::Started(_))
51 }
52
53 pub(crate) async fn start(&self, context: &Context) {
55 let mut inner = self.inner.write().await;
56 match *inner {
57 InnerSchedulerState::Started(_) => (),
58 InnerSchedulerState::Stopped => Self::do_start(&mut inner, context).await,
59 InnerSchedulerState::Paused {
60 ref mut started, ..
61 } => *started = true,
62 }
63 context.update_connectivities(&inner);
64 }
65
66 async fn do_start(inner: &mut InnerSchedulerState, context: &Context) {
68 info!(context, "starting IO");
69
70 context.new_msgs_notify.notify_one();
73
74 match Scheduler::start(context).await {
75 Ok(scheduler) => {
76 *inner = InnerSchedulerState::Started(scheduler);
77 context.emit_event(EventType::ConnectivityChanged);
78 }
79 Err(err) => error!(context, "Failed to start IO: {:#}", err),
80 }
81 }
82
83 pub(crate) async fn stop(&self, context: &Context) {
85 let mut inner = self.inner.write().await;
86 match *inner {
87 InnerSchedulerState::Started(_) => {
88 Self::do_stop(&mut inner, context, InnerSchedulerState::Stopped).await
89 }
90 InnerSchedulerState::Stopped => (),
91 InnerSchedulerState::Paused {
92 ref mut started, ..
93 } => *started = false,
94 }
95 context.update_connectivities(&inner);
96 }
97
98 async fn do_stop(
100 inner: &mut InnerSchedulerState,
101 context: &Context,
102 new_state: InnerSchedulerState,
103 ) {
104 info!(context, "stopping IO");
110
111 context.new_msgs_notify.notify_one();
114
115 let debug_logging = context
116 .debug_logging
117 .write()
118 .expect("RwLock is poisoned")
119 .take();
120 if let Some(debug_logging) = debug_logging {
121 debug_logging.loop_handle.abort();
122 debug_logging.loop_handle.await.ok();
123 }
124 let prev_state = std::mem::replace(inner, new_state);
125 context.emit_event(EventType::ConnectivityChanged);
126 match prev_state {
127 InnerSchedulerState::Started(scheduler) => scheduler.stop(context).await,
128 InnerSchedulerState::Stopped | InnerSchedulerState::Paused { .. } => (),
129 }
130 }
131
132 pub(crate) async fn pause(&'_ self, context: &Context) -> Result<IoPausedGuard> {
141 {
142 let mut inner = self.inner.write().await;
143 match *inner {
144 InnerSchedulerState::Started(_) => {
145 let new_state = InnerSchedulerState::Paused {
146 started: true,
147 pause_guards_count: NonZeroUsize::MIN,
148 };
149 Self::do_stop(&mut inner, context, new_state).await;
150 }
151 InnerSchedulerState::Stopped => {
152 *inner = InnerSchedulerState::Paused {
153 started: false,
154 pause_guards_count: NonZeroUsize::MIN,
155 };
156 }
157 InnerSchedulerState::Paused {
158 ref mut pause_guards_count,
159 ..
160 } => {
161 *pause_guards_count = pause_guards_count
162 .checked_add(1)
163 .ok_or_else(|| Error::msg("Too many pause guards active"))?
164 }
165 }
166 context.update_connectivities(&inner);
167 }
168
169 let (tx, rx) = oneshot::channel();
170 let context = context.clone();
171 tokio::spawn(async move {
172 rx.await.ok();
173 let mut inner = context.scheduler.inner.write().await;
174 match *inner {
175 InnerSchedulerState::Started(_) => {
176 warn!(&context, "IoPausedGuard resume: started instead of paused");
177 }
178 InnerSchedulerState::Stopped => {
179 warn!(&context, "IoPausedGuard resume: stopped instead of paused");
180 }
181 InnerSchedulerState::Paused {
182 ref started,
183 ref mut pause_guards_count,
184 } => {
185 if *pause_guards_count == NonZeroUsize::MIN {
186 match *started {
187 true => SchedulerState::do_start(&mut inner, &context).await,
188 false => *inner = InnerSchedulerState::Stopped,
189 }
190 } else {
191 let new_count = pause_guards_count.get() - 1;
192 *pause_guards_count = NonZeroUsize::new(new_count).unwrap();
194 }
195 }
196 }
197 context.update_connectivities(&inner);
198 });
199 Ok(IoPausedGuard { sender: Some(tx) })
200 }
201
202 pub(crate) async fn restart(&self, context: &Context) {
204 info!(context, "restarting IO");
205 if self.is_running().await {
206 self.stop(context).await;
207 self.start(context).await;
208 }
209 }
210
211 pub(crate) async fn maybe_network(&self) {
213 let inner = self.inner.read().await;
214 let (inboxes, oboxes) = match *inner {
215 InnerSchedulerState::Started(ref scheduler) => {
216 scheduler.maybe_network();
217 let inboxes = scheduler
218 .inboxes
219 .iter()
220 .map(|b| b.conn_state.state.connectivity.clone())
221 .collect::<Vec<_>>();
222 let oboxes = scheduler
223 .oboxes
224 .iter()
225 .map(|b| b.conn_state.state.connectivity.clone())
226 .collect::<Vec<_>>();
227 (inboxes, oboxes)
228 }
229 _ => return,
230 };
231 drop(inner);
232 connectivity::idle_interrupted(inboxes, oboxes);
233 }
234
235 pub(crate) async fn maybe_network_lost(&self, context: &Context) {
237 let inner = self.inner.read().await;
238 let stores = match *inner {
239 InnerSchedulerState::Started(ref scheduler) => {
240 scheduler.maybe_network_lost();
241 scheduler
242 .boxes()
243 .map(|b| b.conn_state.state.connectivity.clone())
244 .collect()
245 }
246 _ => return,
247 };
248 drop(inner);
249 connectivity::maybe_network_lost(context, stores);
250 }
251
252 pub(crate) async fn interrupt_inbox(&self) {
253 let inner = self.inner.read().await;
254 if let InnerSchedulerState::Started(ref scheduler) = *inner {
255 scheduler.interrupt_inbox();
256 }
257 }
258
259 pub(crate) async fn interrupt_smtp(&self) {
260 let inner = self.inner.read().await;
261 if let InnerSchedulerState::Started(ref scheduler) = *inner {
262 scheduler.interrupt_smtp();
263 }
264 }
265
266 pub(crate) async fn interrupt_ephemeral_task(&self) {
267 let inner = self.inner.read().await;
268 if let InnerSchedulerState::Started(ref scheduler) = *inner {
269 scheduler.interrupt_ephemeral_task();
270 }
271 }
272
273 pub(crate) async fn interrupt_location(&self) {
274 let inner = self.inner.read().await;
275 if let InnerSchedulerState::Started(ref scheduler) = *inner {
276 scheduler.interrupt_location();
277 }
278 }
279
280 pub(crate) async fn interrupt_recently_seen(&self, contact_id: ContactId, timestamp: i64) {
281 let inner = self.inner.read().await;
282 if let InnerSchedulerState::Started(ref scheduler) = *inner {
283 scheduler.interrupt_recently_seen(contact_id, timestamp);
284 }
285 }
286}
287
/// The scheduler state proper, guarded by the lock inside [`SchedulerState`].
#[derive(Debug, Default)]
pub(crate) enum InnerSchedulerState {
    /// IO is running; holds the live [`Scheduler`].
    Started(Scheduler),
    /// IO is not running. This is the default state.
    #[default]
    Stopped,
    /// IO is suspended by one or more [`IoPausedGuard`]s.
    Paused {
        /// Whether IO should be started once the last guard is released.
        started: bool,
        /// Number of pause guards that have not been released yet.
        pause_guards_count: NonZeroUsize,
    },
}
298
/// Guard returned by [`SchedulerState::pause`]; dropping it ends that pause.
#[derive(Default, Debug)]
pub(crate) struct IoPausedGuard {
    /// Sender used to announce the drop; `None` once taken, and in a
    /// default-constructed guard.
    sender: Option<oneshot::Sender<()>>,
}
307
308impl Drop for IoPausedGuard {
309 fn drop(&mut self) {
310 if let Some(sender) = self.sender.take() {
311 sender.send(()).ok();
313 }
314 }
315}
316
/// One spawned IMAP loop together with the handles needed to control it.
#[derive(Debug)]
struct SchedBox {
    /// Address taken from the login parameters of the transport this loop serves.
    addr: String,
    /// Folder meaning this loop was started for.
    meaning: FolderMeaning,
    /// Control side of the loop's IMAP connection.
    conn_state: ImapConnectionState,

    /// Join handle of the spawned loop task.
    handle: task::JoinHandle<()>,
}
327
/// Handles to all IO tasks of one started scheduler.
#[derive(Debug)]
pub(crate) struct Scheduler {
    /// Inbox loops, one per configured transport.
    inboxes: Vec<SchedBox>,
    /// Loops for other folders; `Scheduler::start` only adds mvbox loops here.
    oboxes: Vec<SchedBox>,
    /// Control side of the SMTP loop.
    smtp: SmtpConnectionState,
    /// Join handle of the SMTP loop task.
    smtp_handle: task::JoinHandle<()>,
    /// Join handle of the task running `ephemeral::ephemeral_loop`.
    ephemeral_handle: task::JoinHandle<()>,
    /// Sender used to interrupt the ephemeral loop.
    ephemeral_interrupt_send: Sender<()>,
    /// Join handle of the task running `location::location_loop`.
    location_handle: task::JoinHandle<()>,
    /// Sender used to interrupt the location loop.
    location_interrupt_send: Sender<()>,

    /// Loop that receives recently-seen interrupts.
    recently_seen_loop: RecentlySeenLoop,
}
344
345async fn inbox_loop(
346 ctx: Context,
347 started: oneshot::Sender<()>,
348 inbox_handlers: ImapConnectionHandlers,
349) {
350 use futures::future::FutureExt;
351
352 info!(ctx, "Starting inbox loop.");
353 let ImapConnectionHandlers {
354 mut connection,
355 stop_token,
356 } = inbox_handlers;
357
358 let ctx1 = ctx.clone();
359 let fut = async move {
360 let ctx = ctx1;
361 if let Err(()) = started.send(()) {
362 warn!(ctx, "Inbox loop, missing started receiver.");
363 return;
364 };
365
366 let mut old_session: Option<Session> = None;
367 loop {
368 let session = if let Some(session) = old_session.take() {
369 session
370 } else {
371 info!(ctx, "Preparing new IMAP session for inbox.");
372 match connection.prepare(&ctx).await {
373 Err(err) => {
374 warn!(ctx, "Failed to prepare inbox connection: {err:#}.");
375 continue;
376 }
377 Ok(session) => session,
378 }
379 };
380
381 match inbox_fetch_idle(&ctx, &mut connection, session).await {
382 Err(err) => warn!(ctx, "Failed inbox fetch_idle: {err:#}."),
383 Ok(session) => {
384 info!(
385 ctx,
386 "IMAP loop iteration for inbox finished, keeping the session."
387 );
388 old_session = Some(session);
389 }
390 }
391 }
392 };
393
394 stop_token
395 .cancelled()
396 .map(|_| {
397 info!(ctx, "Shutting down inbox loop.");
398 })
399 .race(fut)
400 .await;
401}
402
403pub async fn convert_folder_meaning(
409 ctx: &Context,
410 folder_meaning: FolderMeaning,
411) -> Result<Option<(Config, String)>> {
412 let folder_config = match folder_meaning.to_config() {
413 Some(c) => c,
414 None => {
415 return Ok(None);
418 }
419 };
420
421 let folder = ctx
422 .get_config(folder_config)
423 .await
424 .with_context(|| format!("Failed to retrieve {folder_config} folder"))?;
425
426 if let Some(watch_folder) = folder {
427 Ok(Some((folder_config, watch_folder)))
428 } else {
429 Ok(None)
430 }
431}
432
433async fn inbox_fetch_idle(ctx: &Context, imap: &mut Imap, mut session: Session) -> Result<Session> {
434 if !ctx.get_config_bool(Config::FixIsChatmail).await? {
435 ctx.set_config_internal(
436 Config::IsChatmail,
437 crate::config::from_bool(session.is_chatmail()),
438 )
439 .await?;
440 }
441
442 if ctx.quota_needs_update(session.transport_id(), 60).await
444 && let Err(err) = ctx.update_recent_quota(&mut session).await
445 {
446 warn!(ctx, "Failed to update quota: {:#}.", err);
447 }
448
449 if let Ok(()) = imap.resync_request_receiver.try_recv()
450 && let Err(err) = session.resync_folders(ctx).await
451 {
452 warn!(ctx, "Failed to resync folders: {:#}.", err);
453 imap.resync_request_sender.try_send(()).ok();
454 }
455
456 maybe_add_time_based_warnings(ctx).await;
457
458 match ctx.get_config_i64(Config::LastHousekeeping).await {
459 Ok(last_housekeeping_time) => {
460 let next_housekeeping_time =
461 last_housekeeping_time.saturating_add(constants::HOUSEKEEPING_PERIOD);
462 if next_housekeeping_time <= time() {
463 sql::housekeeping(ctx).await.log_err(ctx).ok();
464 }
465 }
466 Err(err) => {
467 warn!(ctx, "Failed to get last housekeeping time: {}", err);
468 }
469 };
470
471 maybe_send_stats(ctx).await.log_err(ctx).ok();
472
473 session
474 .update_metadata(ctx)
475 .await
476 .context("update_metadata")?;
477 session
478 .register_token(ctx)
479 .await
480 .context("Failed to register push token")?;
481
482 let session = fetch_idle(ctx, imap, session, FolderMeaning::Inbox).await?;
483 Ok(session)
484}
485
486async fn fetch_idle(
492 ctx: &Context,
493 connection: &mut Imap,
494 mut session: Session,
495 folder_meaning: FolderMeaning,
496) -> Result<Session> {
497 let Some((folder_config, watch_folder)) = convert_folder_meaning(ctx, folder_meaning).await?
498 else {
499 connection.connectivity.set_not_configured(ctx);
503 connection.idle_interrupt_receiver.recv().await.ok();
504 bail!("Cannot fetch folder {folder_meaning} because it is not configured");
505 };
506
507 if folder_config == Config::ConfiguredInboxFolder {
508 session
509 .store_seen_flags_on_imap(ctx)
510 .await
511 .context("store_seen_flags_on_imap")?;
512 }
513
514 connection
516 .fetch_move_delete(ctx, &mut session, &watch_folder, folder_meaning)
517 .await
518 .context("fetch_move_delete")?;
519
520 delete_expired_imap_messages(ctx)
525 .await
526 .context("delete_expired_imap_messages")?;
527
528 download_known_post_messages_without_pre_message(ctx, &mut session).await?;
529 download_msgs(ctx, &mut session)
530 .await
531 .context("download_msgs")?;
532
533 if folder_config == Config::ConfiguredInboxFolder {
538 match connection
540 .scan_folders(ctx, &mut session)
541 .await
542 .context("scan_folders")
543 {
544 Err(err) => {
545 warn!(ctx, "{:#}", err);
548 }
549 Ok(true) => {
550 connection
557 .fetch_move_delete(ctx, &mut session, &watch_folder, folder_meaning)
558 .await
559 .context("fetch_move_delete after scan_folders")?;
560 }
561 Ok(false) => {}
562 }
563 }
564
565 session
567 .sync_seen_flags(ctx, &watch_folder)
568 .await
569 .context("sync_seen_flags")
570 .log_err(ctx)
571 .ok();
572
573 connection.connectivity.set_idle(ctx);
574
575 ctx.emit_event(EventType::ImapInboxIdle);
576
577 if !session.can_idle() {
578 info!(
579 ctx,
580 "IMAP session does not support IDLE, going to fake idle."
581 );
582 connection.fake_idle(ctx, watch_folder).await?;
583 return Ok(session);
584 }
585
586 if ctx
587 .get_config_bool(Config::DisableIdle)
588 .await
589 .context("Failed to get disable_idle config")
590 .log_err(ctx)
591 .unwrap_or_default()
592 {
593 info!(ctx, "IMAP IDLE is disabled, going to fake idle.");
594 connection.fake_idle(ctx, watch_folder).await?;
595 return Ok(session);
596 }
597
598 info!(
599 ctx,
600 "IMAP session in folder {watch_folder:?} supports IDLE, using it."
601 );
602 let session = session
603 .idle(
604 ctx,
605 connection.idle_interrupt_receiver.clone(),
606 &watch_folder,
607 )
608 .await
609 .context("idle")?;
610
611 Ok(session)
612}
613
614async fn simple_imap_loop(
616 ctx: Context,
617 started: oneshot::Sender<()>,
618 inbox_handlers: ImapConnectionHandlers,
619 folder_meaning: FolderMeaning,
620) {
621 use futures::future::FutureExt;
622
623 info!(ctx, "Starting simple loop for {folder_meaning}.");
624 let ImapConnectionHandlers {
625 mut connection,
626 stop_token,
627 } = inbox_handlers;
628
629 let ctx1 = ctx.clone();
630
631 let fut = async move {
632 let ctx = ctx1;
633 if let Err(()) = started.send(()) {
634 warn!(
635 ctx,
636 "Simple imap loop for {folder_meaning}, missing started receiver."
637 );
638 return;
639 }
640
641 let mut old_session: Option<Session> = None;
642 loop {
643 let session = if let Some(session) = old_session.take() {
644 session
645 } else {
646 info!(ctx, "Preparing new IMAP session for {folder_meaning}.");
647 match connection.prepare(&ctx).await {
648 Err(err) => {
649 warn!(
650 ctx,
651 "Failed to prepare {folder_meaning} connection: {err:#}."
652 );
653 continue;
654 }
655 Ok(session) => session,
656 }
657 };
658
659 match fetch_idle(&ctx, &mut connection, session, folder_meaning).await {
660 Err(err) => warn!(ctx, "Failed fetch_idle: {err:#}"),
661 Ok(session) => {
662 info!(
663 ctx,
664 "IMAP loop iteration for {folder_meaning} finished, keeping the session"
665 );
666 old_session = Some(session);
667 }
668 }
669 }
670 };
671
672 stop_token
673 .cancelled()
674 .map(|_| {
675 info!(ctx, "Shutting down IMAP loop for {folder_meaning}.");
676 })
677 .race(fut)
678 .await;
679}
680
681async fn smtp_loop(
682 ctx: Context,
683 started: oneshot::Sender<()>,
684 smtp_handlers: SmtpConnectionHandlers,
685) {
686 use futures::future::FutureExt;
687
688 info!(ctx, "Starting SMTP loop.");
689 let SmtpConnectionHandlers {
690 mut connection,
691 stop_token,
692 idle_interrupt_receiver,
693 } = smtp_handlers;
694
695 let ctx1 = ctx.clone();
696 let fut = async move {
697 let ctx = ctx1;
698 if let Err(()) = started.send(()) {
699 warn!(&ctx, "SMTP loop, missing started receiver.");
700 return;
701 }
702
703 let mut timeout = None;
704 loop {
705 if let Err(err) = send_smtp_messages(&ctx, &mut connection).await {
706 warn!(ctx, "send_smtp_messages failed: {:#}.", err);
707 timeout = Some(timeout.unwrap_or(30));
708 } else {
709 timeout = None;
710 let duration_until_can_send = ctx.ratelimit.read().await.until_can_send();
711 if !duration_until_can_send.is_zero() {
712 info!(
713 ctx,
714 "smtp got rate limited, waiting for {} until can send again",
715 duration_to_str(duration_until_can_send)
716 );
717 tokio::time::sleep(duration_until_can_send).await;
718 continue;
719 }
720 }
721
722 stats::maybe_update_message_stats(&ctx)
723 .await
724 .log_err(&ctx)
725 .ok();
726
727 info!(ctx, "SMTP fake idle started.");
729 match &connection.last_send_error {
730 None => connection.connectivity.set_idle(&ctx),
731 Some(err) => connection.connectivity.set_err(&ctx, err),
732 }
733
734 if let Some(t) = timeout {
739 let now = tools::Time::now();
740 info!(
741 ctx,
742 "SMTP has messages to retry, planning to retry {t} seconds later."
743 );
744 let duration = std::time::Duration::from_secs(t);
745 tokio::time::timeout(duration, async {
746 idle_interrupt_receiver.recv().await.unwrap_or_default()
747 })
748 .await
749 .unwrap_or_default();
750 let slept = time_elapsed(&now).as_secs();
751 timeout = Some(cmp::max(
752 t,
753 slept.saturating_add(rand::random_range((slept / 2)..=slept)),
754 ));
755 } else {
756 info!(ctx, "SMTP has no messages to retry, waiting for interrupt.");
757 idle_interrupt_receiver.recv().await.unwrap_or_default();
758 };
759
760 info!(ctx, "SMTP fake idle interrupted.")
761 }
762 };
763
764 stop_token
765 .cancelled()
766 .map(|_| {
767 info!(ctx, "Shutting down SMTP loop.");
768 })
769 .race(fut)
770 .await;
771}
772
773impl Scheduler {
774 pub async fn start(ctx: &Context) -> Result<Self> {
776 let (smtp, smtp_handlers) = SmtpConnectionState::new();
777
778 let (smtp_start_send, smtp_start_recv) = oneshot::channel();
779 let (ephemeral_interrupt_send, ephemeral_interrupt_recv) = channel::bounded(1);
780 let (location_interrupt_send, location_interrupt_recv) = channel::bounded(1);
781
782 let mut inboxes = Vec::new();
783 let mut oboxes = Vec::new();
784 let mut start_recvs = Vec::new();
785
786 for (transport_id, configured_login_param) in ConfiguredLoginParam::load_all(ctx).await? {
787 let (conn_state, inbox_handlers) =
788 ImapConnectionState::new(ctx, transport_id, configured_login_param.clone()).await?;
789 let (inbox_start_send, inbox_start_recv) = oneshot::channel();
790 let handle = {
791 let ctx = ctx.clone();
792 task::spawn(inbox_loop(ctx, inbox_start_send, inbox_handlers))
793 };
794 let addr = configured_login_param.addr.clone();
795 let inbox = SchedBox {
796 addr: addr.clone(),
797 meaning: FolderMeaning::Inbox,
798 conn_state,
799 handle,
800 };
801 inboxes.push(inbox);
802 start_recvs.push(inbox_start_recv);
803
804 if ctx.should_watch_mvbox().await? {
805 let (conn_state, handlers) =
806 ImapConnectionState::new(ctx, transport_id, configured_login_param).await?;
807 let (start_send, start_recv) = oneshot::channel();
808 let ctx = ctx.clone();
809 let meaning = FolderMeaning::Mvbox;
810 let handle = task::spawn(simple_imap_loop(ctx, start_send, handlers, meaning));
811 oboxes.push(SchedBox {
812 addr,
813 meaning,
814 conn_state,
815 handle,
816 });
817 start_recvs.push(start_recv);
818 }
819 }
820
821 let smtp_handle = {
822 let ctx = ctx.clone();
823 task::spawn(smtp_loop(ctx, smtp_start_send, smtp_handlers))
824 };
825 start_recvs.push(smtp_start_recv);
826
827 let ephemeral_handle = {
828 let ctx = ctx.clone();
829 task::spawn(async move {
830 ephemeral::ephemeral_loop(&ctx, ephemeral_interrupt_recv).await;
831 })
832 };
833
834 let location_handle = {
835 let ctx = ctx.clone();
836 task::spawn(async move {
837 location::location_loop(&ctx, location_interrupt_recv).await;
838 })
839 };
840
841 let recently_seen_loop = RecentlySeenLoop::new(ctx.clone());
842
843 let res = Self {
844 inboxes,
845 oboxes,
846 smtp,
847 smtp_handle,
848 ephemeral_handle,
849 ephemeral_interrupt_send,
850 location_handle,
851 location_interrupt_send,
852 recently_seen_loop,
853 };
854
855 if let Err(err) = try_join_all(start_recvs).await {
857 bail!("failed to start scheduler: {err}");
858 }
859
860 info!(ctx, "scheduler is running");
861 Ok(res)
862 }
863
864 fn boxes(&self) -> impl Iterator<Item = &SchedBox> {
865 self.inboxes.iter().chain(self.oboxes.iter())
866 }
867
868 fn maybe_network(&self) {
869 for b in self.boxes() {
870 b.conn_state.interrupt();
871 }
872 self.interrupt_smtp();
873 }
874
875 fn maybe_network_lost(&self) {
876 for b in self.boxes() {
877 b.conn_state.interrupt();
878 }
879 self.interrupt_smtp();
880 }
881
882 fn interrupt_inbox(&self) {
883 for b in &self.inboxes {
884 b.conn_state.interrupt();
885 }
886 }
887
888 fn interrupt_smtp(&self) {
889 self.smtp.interrupt();
890 }
891
892 fn interrupt_ephemeral_task(&self) {
893 self.ephemeral_interrupt_send.try_send(()).ok();
894 }
895
896 fn interrupt_location(&self) {
897 self.location_interrupt_send.try_send(()).ok();
898 }
899
900 fn interrupt_recently_seen(&self, contact_id: ContactId, timestamp: i64) {
901 self.recently_seen_loop.try_interrupt(contact_id, timestamp);
902 }
903
904 pub(crate) async fn stop(self, context: &Context) {
909 for b in self.boxes() {
911 b.conn_state.stop();
912 }
913 self.smtp.stop();
914
915 let timeout_duration = std::time::Duration::from_secs(30);
917
918 let tracker = TaskTracker::new();
919 for b in self.inboxes.into_iter().chain(self.oboxes.into_iter()) {
920 let context = context.clone();
921 tracker.spawn(async move {
922 tokio::time::timeout(timeout_duration, b.handle)
923 .await
924 .log_err(&context)
925 });
926 }
927 {
928 let context = context.clone();
929 tracker.spawn(async move {
930 tokio::time::timeout(timeout_duration, self.smtp_handle)
931 .await
932 .log_err(&context)
933 });
934 }
935 tracker.close();
936 tracker.wait().await;
937
938 self.ephemeral_handle.abort();
943 self.ephemeral_handle.await.ok();
944 self.location_handle.abort();
945 self.location_handle.await.ok();
946 self.recently_seen_loop.abort().await;
947 }
948}
949
/// Control side of a connection loop, shared by the IMAP and SMTP connection states.
#[derive(Debug)]
struct ConnectionState {
    /// Token whose cancellation tells the loop to shut down.
    stop_token: CancellationToken,
    /// Sender used to interrupt the loop while it is waiting.
    idle_interrupt_sender: Sender<()>,
    /// Clone of the connection's connectivity store.
    connectivity: ConnectivityStore,
}
960
961impl ConnectionState {
962 fn stop(&self) {
964 self.stop_token.cancel();
966 }
967
968 fn interrupt(&self) {
969 self.idle_interrupt_sender.try_send(()).ok();
971 }
972}
973
/// Control side of the SMTP loop.
#[derive(Debug)]
pub(crate) struct SmtpConnectionState {
    /// Stop token, interrupt sender and connectivity store of the SMTP connection.
    state: ConnectionState,
}
978
979impl SmtpConnectionState {
980 fn new() -> (Self, SmtpConnectionHandlers) {
981 let stop_token = CancellationToken::new();
982 let (idle_interrupt_sender, idle_interrupt_receiver) = channel::bounded(1);
983
984 let handlers = SmtpConnectionHandlers {
985 connection: Smtp::new(),
986 stop_token: stop_token.clone(),
987 idle_interrupt_receiver,
988 };
989
990 let state = ConnectionState {
991 stop_token,
992 idle_interrupt_sender,
993 connectivity: handlers.connection.connectivity.clone(),
994 };
995
996 let conn = SmtpConnectionState { state };
997
998 (conn, handlers)
999 }
1000
1001 fn interrupt(&self) {
1003 self.state.interrupt();
1004 }
1005
1006 fn stop(&self) {
1008 self.state.stop();
1009 }
1010}
1011
/// Loop side of the SMTP connection; consumed by `smtp_loop`.
struct SmtpConnectionHandlers {
    /// The SMTP connection used for sending.
    connection: Smtp,
    /// Token whose cancellation ends the loop.
    stop_token: CancellationToken,
    /// Receiver on which the loop waits for interrupts.
    idle_interrupt_receiver: Receiver<()>,
}
1017
/// Control side of an IMAP loop.
#[derive(Debug)]
pub(crate) struct ImapConnectionState {
    /// Stop token, interrupt sender and connectivity store of the IMAP connection.
    state: ConnectionState,
}
1022
1023impl ImapConnectionState {
1024 async fn new(
1026 context: &Context,
1027 transport_id: u32,
1028 login_param: ConfiguredLoginParam,
1029 ) -> Result<(Self, ImapConnectionHandlers)> {
1030 let stop_token = CancellationToken::new();
1031 let (idle_interrupt_sender, idle_interrupt_receiver) = channel::bounded(1);
1032
1033 let handlers = ImapConnectionHandlers {
1034 connection: Imap::new(context, transport_id, login_param, idle_interrupt_receiver)
1035 .await?,
1036 stop_token: stop_token.clone(),
1037 };
1038
1039 let state = ConnectionState {
1040 stop_token,
1041 idle_interrupt_sender,
1042 connectivity: handlers.connection.connectivity.clone(),
1043 };
1044
1045 let conn = ImapConnectionState { state };
1046
1047 Ok((conn, handlers))
1048 }
1049
1050 fn interrupt(&self) {
1052 self.state.interrupt();
1053 }
1054
1055 fn stop(&self) {
1057 self.state.stop();
1058 }
1059}
1060
/// Loop side of an IMAP connection; consumed by `inbox_loop` and `simple_imap_loop`.
#[derive(Debug)]
struct ImapConnectionHandlers {
    /// The IMAP connection used by the loop.
    connection: Imap,
    /// Token whose cancellation ends the loop.
    stop_token: CancellationToken,
}
1065}