1use std::cmp;
2use std::num::NonZeroUsize;
3
4use anyhow::{Context as _, Error, Result, bail};
5use async_channel::{self as channel, Receiver, Sender};
6use futures::future::try_join_all;
7use futures_lite::FutureExt;
8use tokio::sync::{RwLock, oneshot};
9use tokio::task;
10use tokio_util::sync::CancellationToken;
11use tokio_util::task::TaskTracker;
12
13pub(crate) use self::connectivity::ConnectivityStore;
14use crate::config::Config;
15use crate::contact::{ContactId, RecentlySeenLoop};
16use crate::context::Context;
17use crate::download::{download_known_post_messages_without_pre_message, download_msgs};
18use crate::ephemeral::{self, delete_expired_imap_messages};
19use crate::events::EventType;
20use crate::imap::{Imap, session::Session};
21use crate::location;
22use crate::log::{LogExt, warn};
23use crate::smtp::{Smtp, send_smtp_messages};
24use crate::sql;
25use crate::stats::maybe_send_stats;
26use crate::tools::{self, duration_to_str, maybe_add_time_based_warnings, time, time_elapsed};
27use crate::transport::ConfiguredLoginParam;
28use crate::{constants, stats};
29
30pub(crate) mod connectivity;
31
32#[derive(Debug, Default)]
38pub(crate) struct SchedulerState {
39 inner: RwLock<InnerSchedulerState>,
40}
41
42impl SchedulerState {
43 pub(crate) fn new() -> Self {
44 Default::default()
45 }
46
47 pub(crate) async fn is_running(&self) -> bool {
49 let inner = self.inner.read().await;
50 matches!(*inner, InnerSchedulerState::Started(_))
51 }
52
53 pub(crate) async fn start(&self, context: &Context) {
55 let mut inner = self.inner.write().await;
56 match *inner {
57 InnerSchedulerState::Started(_) => (),
58 InnerSchedulerState::Stopped => Self::do_start(&mut inner, context).await,
59 InnerSchedulerState::Paused {
60 ref mut started, ..
61 } => *started = true,
62 }
63 context.update_connectivities(&inner);
64 }
65
66 async fn do_start(inner: &mut InnerSchedulerState, context: &Context) {
68 info!(context, "starting IO");
69
70 context.new_msgs_notify.notify_one();
73
74 match Scheduler::start(context).await {
75 Ok(scheduler) => {
76 *inner = InnerSchedulerState::Started(scheduler);
77 context.emit_event(EventType::ConnectivityChanged);
78 }
79 Err(err) => error!(context, "Failed to start IO: {:#}", err),
80 }
81 }
82
83 pub(crate) async fn stop(&self, context: &Context) {
85 let mut inner = self.inner.write().await;
86 match *inner {
87 InnerSchedulerState::Started(_) => {
88 Self::do_stop(&mut inner, context, InnerSchedulerState::Stopped).await
89 }
90 InnerSchedulerState::Stopped => (),
91 InnerSchedulerState::Paused {
92 ref mut started, ..
93 } => *started = false,
94 }
95 context.update_connectivities(&inner);
96 }
97
98 async fn do_stop(
100 inner: &mut InnerSchedulerState,
101 context: &Context,
102 new_state: InnerSchedulerState,
103 ) {
104 info!(context, "stopping IO");
110
111 context.new_msgs_notify.notify_one();
114
115 let debug_logging = context
116 .debug_logging
117 .write()
118 .expect("RwLock is poisoned")
119 .take();
120 if let Some(debug_logging) = debug_logging {
121 debug_logging.loop_handle.abort();
122 debug_logging.loop_handle.await.ok();
123 }
124 let prev_state = std::mem::replace(inner, new_state);
125 context.emit_event(EventType::ConnectivityChanged);
126 match prev_state {
127 InnerSchedulerState::Started(scheduler) => scheduler.stop(context).await,
128 InnerSchedulerState::Stopped | InnerSchedulerState::Paused { .. } => (),
129 }
130 }
131
132 pub(crate) async fn pause(&'_ self, context: &Context) -> Result<IoPausedGuard> {
141 {
142 let mut inner = self.inner.write().await;
143 match *inner {
144 InnerSchedulerState::Started(_) => {
145 let new_state = InnerSchedulerState::Paused {
146 started: true,
147 pause_guards_count: NonZeroUsize::MIN,
148 };
149 Self::do_stop(&mut inner, context, new_state).await;
150 }
151 InnerSchedulerState::Stopped => {
152 *inner = InnerSchedulerState::Paused {
153 started: false,
154 pause_guards_count: NonZeroUsize::MIN,
155 };
156 }
157 InnerSchedulerState::Paused {
158 ref mut pause_guards_count,
159 ..
160 } => {
161 *pause_guards_count = pause_guards_count
162 .checked_add(1)
163 .ok_or_else(|| Error::msg("Too many pause guards active"))?
164 }
165 }
166 context.update_connectivities(&inner);
167 }
168
169 let (tx, rx) = oneshot::channel();
170 let context = context.clone();
171 tokio::spawn(async move {
172 rx.await.ok();
173 let mut inner = context.scheduler.inner.write().await;
174 match *inner {
175 InnerSchedulerState::Started(_) => {
176 warn!(&context, "IoPausedGuard resume: started instead of paused");
177 }
178 InnerSchedulerState::Stopped => {
179 warn!(&context, "IoPausedGuard resume: stopped instead of paused");
180 }
181 InnerSchedulerState::Paused {
182 ref started,
183 ref mut pause_guards_count,
184 } => {
185 if *pause_guards_count == NonZeroUsize::MIN {
186 match *started {
187 true => SchedulerState::do_start(&mut inner, &context).await,
188 false => *inner = InnerSchedulerState::Stopped,
189 }
190 } else {
191 let new_count = pause_guards_count.get() - 1;
192 *pause_guards_count = NonZeroUsize::new(new_count).unwrap();
194 }
195 }
196 }
197 context.update_connectivities(&inner);
198 });
199 Ok(IoPausedGuard { sender: Some(tx) })
200 }
201
202 pub(crate) async fn restart(&self, context: &Context) {
204 info!(context, "restarting IO");
205 if self.is_running().await {
206 self.stop(context).await;
207 self.start(context).await;
208 }
209 }
210
211 pub(crate) async fn maybe_network(&self) {
213 let inner = self.inner.read().await;
214 let inboxes = match *inner {
215 InnerSchedulerState::Started(ref scheduler) => {
216 scheduler.maybe_network();
217 scheduler
218 .inboxes
219 .iter()
220 .map(|b| b.conn_state.state.connectivity.clone())
221 .collect::<Vec<_>>()
222 }
223 _ => return,
224 };
225 drop(inner);
226 connectivity::idle_interrupted(inboxes);
227 }
228
229 pub(crate) async fn maybe_network_lost(&self, context: &Context) {
231 let inner = self.inner.read().await;
232 let stores = match *inner {
233 InnerSchedulerState::Started(ref scheduler) => {
234 scheduler.maybe_network_lost();
235 scheduler
236 .boxes()
237 .map(|b| b.conn_state.state.connectivity.clone())
238 .collect()
239 }
240 _ => return,
241 };
242 drop(inner);
243 connectivity::maybe_network_lost(context, stores);
244 }
245
246 pub(crate) async fn interrupt_inbox(&self) {
247 let inner = self.inner.read().await;
248 if let InnerSchedulerState::Started(ref scheduler) = *inner {
249 scheduler.interrupt_inbox();
250 }
251 }
252
253 pub(crate) async fn interrupt_smtp(&self) {
254 let inner = self.inner.read().await;
255 if let InnerSchedulerState::Started(ref scheduler) = *inner {
256 scheduler.interrupt_smtp();
257 }
258 }
259
260 pub(crate) async fn interrupt_ephemeral_task(&self) {
261 let inner = self.inner.read().await;
262 if let InnerSchedulerState::Started(ref scheduler) = *inner {
263 scheduler.interrupt_ephemeral_task();
264 }
265 }
266
267 pub(crate) async fn interrupt_location(&self) {
268 let inner = self.inner.read().await;
269 if let InnerSchedulerState::Started(ref scheduler) = *inner {
270 scheduler.interrupt_location();
271 }
272 }
273
274 pub(crate) async fn interrupt_recently_seen(&self, contact_id: ContactId, timestamp: i64) {
275 let inner = self.inner.read().await;
276 if let InnerSchedulerState::Started(ref scheduler) = *inner {
277 scheduler.interrupt_recently_seen(contact_id, timestamp);
278 }
279 }
280}
281
/// The scheduler state proper, stored behind the lock in `SchedulerState`.
#[derive(Debug, Default)]
pub(crate) enum InnerSchedulerState {
    /// IO is running; holds the live [`Scheduler`].
    Started(Scheduler),
    /// IO is not running. This is the default state.
    #[default]
    Stopped,
    /// IO is paused while at least one `IoPausedGuard` is alive.
    Paused {
        /// Whether the scheduler should be started again once the last pause
        /// guard is released. Updated by `start()`/`stop()` calls made while
        /// paused.
        started: bool,
        /// Number of pause guards currently outstanding.
        pause_guards_count: NonZeroUsize,
    },
}
292
293#[derive(Default, Debug)]
298pub(crate) struct IoPausedGuard {
299 sender: Option<oneshot::Sender<()>>,
300}
301
302impl Drop for IoPausedGuard {
303 fn drop(&mut self) {
304 if let Some(sender) = self.sender.take() {
305 sender.send(()).ok();
307 }
308 }
309}
310
/// One running inbox loop together with the data needed to control it.
#[derive(Debug)]
struct SchedBox {
    /// Address taken from the configured login parameters of the transport.
    addr: String,

    /// IMAP folder of the transport; `"INBOX"` when none is configured.
    folder: String,

    /// Handle used to interrupt or stop the inbox loop.
    conn_state: ImapConnectionState,

    /// Join handle of the spawned `inbox_loop` task.
    handle: task::JoinHandle<()>,
}
324
/// The running IO scheduler: handles to every background loop it spawned.
#[derive(Debug)]
pub(crate) struct Scheduler {
    /// One entry per configured transport, each with its own inbox loop.
    inboxes: Vec<SchedBox>,
    /// Handle used to interrupt or stop the SMTP loop.
    smtp: SmtpConnectionState,
    /// Join handle of the spawned `smtp_loop` task.
    smtp_handle: task::JoinHandle<()>,
    /// Join handle of the task running `ephemeral::ephemeral_loop`.
    ephemeral_handle: task::JoinHandle<()>,
    /// Sender half of the channel whose receiver is given to the ephemeral loop.
    ephemeral_interrupt_send: Sender<()>,
    /// Join handle of the task running `location::location_loop`.
    location_handle: task::JoinHandle<()>,
    /// Sender half of the channel whose receiver is given to the location loop.
    location_interrupt_send: Sender<()>,

    /// Loop created via `RecentlySeenLoop::new`; aborted when the scheduler stops.
    recently_seen_loop: RecentlySeenLoop,
}
339
340async fn inbox_loop(
341 ctx: Context,
342 started: oneshot::Sender<()>,
343 inbox_handlers: ImapConnectionHandlers,
344) {
345 use futures::future::FutureExt;
346
347 info!(ctx, "Starting inbox loop.");
348 let ImapConnectionHandlers {
349 mut connection,
350 stop_token,
351 } = inbox_handlers;
352
353 let transport_id = connection.transport_id();
354 let ctx1 = ctx.clone();
355 let fut = async move {
356 let ctx = ctx1;
357 if let Err(()) = started.send(()) {
358 warn!(ctx, "Inbox loop, missing started receiver.");
359 return;
360 };
361
362 let mut old_session: Option<Session> = None;
363 loop {
364 let session = if let Some(session) = old_session.take() {
365 session
366 } else {
367 info!(
368 ctx,
369 "Transport {transport_id}: Preparing new IMAP session for inbox."
370 );
371 match connection.prepare(&ctx).await {
372 Err(err) => {
373 warn!(
374 ctx,
375 "Transport {transport_id}: Failed to prepare inbox connection: {err:#}."
376 );
377 continue;
378 }
379 Ok(session) => {
380 info!(
381 ctx,
382 "Transport {transport_id}: Prepared new IMAP session for inbox."
383 );
384 session
385 }
386 }
387 };
388
389 match inbox_fetch_idle(&ctx, &mut connection, session).await {
390 Err(err) => warn!(
391 ctx,
392 "Transport {transport_id}: Failed inbox fetch_idle: {err:#}."
393 ),
394 Ok(session) => {
395 old_session = Some(session);
396 }
397 }
398 }
399 };
400
401 stop_token
402 .cancelled()
403 .map(|_| {
404 info!(ctx, "Transport {transport_id}: Shutting down inbox loop.");
405 })
406 .race(fut)
407 .await;
408}
409
410async fn inbox_fetch_idle(ctx: &Context, imap: &mut Imap, mut session: Session) -> Result<Session> {
411 let transport_id = session.transport_id();
412
413 if ctx.quota_needs_update(session.transport_id(), 60).await
415 && let Err(err) = ctx.update_recent_quota(&mut session, &imap.folder).await
416 {
417 warn!(
418 ctx,
419 "Transport {transport_id}: Failed to update quota: {err:#}."
420 );
421 }
422
423 if let Ok(()) = imap.resync_request_receiver.try_recv()
424 && let Err(err) = session.resync_folders(ctx).await
425 {
426 warn!(
427 ctx,
428 "Transport {transport_id}: Failed to resync folders: {err:#}."
429 );
430 imap.resync_request_sender.try_send(()).ok();
431 }
432
433 maybe_add_time_based_warnings(ctx).await;
434
435 match ctx.get_config_i64(Config::LastHousekeeping).await {
436 Ok(last_housekeeping_time) => {
437 let next_housekeeping_time =
438 last_housekeeping_time.saturating_add(constants::HOUSEKEEPING_PERIOD);
439 if next_housekeeping_time <= time() {
440 sql::housekeeping(ctx).await.log_err(ctx).ok();
441 }
442 }
443 Err(err) => {
444 warn!(
445 ctx,
446 "Transport {transport_id}: Failed to get last housekeeping time: {err:#}"
447 );
448 }
449 };
450
451 maybe_send_stats(ctx).await.log_err(ctx).ok();
452
453 session
454 .update_metadata(ctx)
455 .await
456 .context("update_metadata")?;
457 session
458 .register_token(ctx)
459 .await
460 .context("Failed to register push token")?;
461
462 let session = fetch_idle(ctx, imap, session).await?;
463 Ok(session)
464}
465
466async fn fetch_idle(ctx: &Context, connection: &mut Imap, mut session: Session) -> Result<Session> {
472 let transport_id = session.transport_id();
473
474 let watch_folder = connection.folder.clone();
475
476 session
477 .store_seen_flags_on_imap(ctx)
478 .await
479 .context("store_seen_flags_on_imap")?;
480
481 connection
483 .fetch_move_delete(ctx, &mut session, &watch_folder)
484 .await
485 .context("fetch_move_delete")?;
486
487 delete_expired_imap_messages(ctx)
492 .await
493 .context("delete_expired_imap_messages")?;
494
495 download_known_post_messages_without_pre_message(ctx, &mut session).await?;
496 download_msgs(ctx, &mut session)
497 .await
498 .context("download_msgs")?;
499
500 session
502 .sync_seen_flags(ctx, &watch_folder)
503 .await
504 .context("sync_seen_flags")
505 .log_err(ctx)
506 .ok();
507
508 connection.connectivity.set_idle(ctx);
509
510 ctx.emit_event(EventType::ImapInboxIdle);
511
512 if !session.can_idle() {
513 info!(
514 ctx,
515 "Transport {transport_id}: IMAP session does not support IDLE, going to fake idle."
516 );
517 connection.fake_idle(ctx, &watch_folder).await?;
518 return Ok(session);
519 }
520
521 if ctx
522 .get_config_bool(Config::DisableIdle)
523 .await
524 .context("Failed to get disable_idle config")
525 .log_err(ctx)
526 .unwrap_or_default()
527 {
528 info!(
529 ctx,
530 "Transport {transport_id}: IMAP IDLE is disabled, going to fake idle."
531 );
532 connection.fake_idle(ctx, &watch_folder).await?;
533 return Ok(session);
534 }
535
536 let session = session
537 .idle(
538 ctx,
539 connection.idle_interrupt_receiver.clone(),
540 &watch_folder,
541 )
542 .await
543 .context("idle")?;
544
545 Ok(session)
546}
547
548async fn smtp_loop(
549 ctx: Context,
550 started: oneshot::Sender<()>,
551 smtp_handlers: SmtpConnectionHandlers,
552) {
553 use futures::future::FutureExt;
554
555 info!(ctx, "Starting SMTP loop.");
556 let SmtpConnectionHandlers {
557 mut connection,
558 stop_token,
559 idle_interrupt_receiver,
560 } = smtp_handlers;
561
562 let ctx1 = ctx.clone();
563 let fut = async move {
564 let ctx = ctx1;
565 if let Err(()) = started.send(()) {
566 warn!(&ctx, "SMTP loop, missing started receiver.");
567 return;
568 }
569
570 let mut timeout = None;
571 loop {
572 if let Err(err) = send_smtp_messages(&ctx, &mut connection).await {
573 warn!(ctx, "send_smtp_messages failed: {:#}.", err);
574 timeout = Some(timeout.unwrap_or(30));
575 } else {
576 timeout = None;
577 let duration_until_can_send = ctx.ratelimit.read().await.until_can_send();
578 if !duration_until_can_send.is_zero() {
579 info!(
580 ctx,
581 "smtp got rate limited, waiting for {} until can send again",
582 duration_to_str(duration_until_can_send)
583 );
584 tokio::time::sleep(duration_until_can_send).await;
585 continue;
586 }
587 }
588
589 stats::maybe_update_message_stats(&ctx)
590 .await
591 .log_err(&ctx)
592 .ok();
593
594 info!(ctx, "SMTP fake idle started.");
596 match &connection.last_send_error {
597 None => connection.connectivity.set_idle(&ctx),
598 Some(err) => connection.connectivity.set_err(&ctx, err.clone()),
599 }
600
601 if let Some(t) = timeout {
606 let now = tools::Time::now();
607 info!(
608 ctx,
609 "SMTP has messages to retry, planning to retry {t} seconds later."
610 );
611 let duration = std::time::Duration::from_secs(t);
612 tokio::time::timeout(duration, async {
613 idle_interrupt_receiver.recv().await.unwrap_or_default()
614 })
615 .await
616 .unwrap_or_default();
617 let slept = time_elapsed(&now).as_secs();
618 timeout = Some(cmp::max(
619 t,
620 slept.saturating_add(rand::random_range((slept / 2)..=slept)),
621 ));
622 } else {
623 info!(ctx, "SMTP has no messages to retry, waiting for interrupt.");
624 idle_interrupt_receiver.recv().await.unwrap_or_default();
625 };
626
627 info!(ctx, "SMTP fake idle interrupted.")
628 }
629 };
630
631 stop_token
632 .cancelled()
633 .map(|_| {
634 info!(ctx, "Shutting down SMTP loop.");
635 })
636 .race(fut)
637 .await;
638}
639
640impl Scheduler {
641 pub async fn start(ctx: &Context) -> Result<Self> {
643 let (smtp, smtp_handlers) = SmtpConnectionState::new();
644
645 let (smtp_start_send, smtp_start_recv) = oneshot::channel();
646 let (ephemeral_interrupt_send, ephemeral_interrupt_recv) = channel::bounded(1);
647 let (location_interrupt_send, location_interrupt_recv) = channel::bounded(1);
648
649 let mut inboxes = Vec::new();
650 let mut start_recvs = Vec::new();
651
652 for (transport_id, configured_login_param) in ConfiguredLoginParam::load_all(ctx).await? {
653 let (conn_state, inbox_handlers) =
654 ImapConnectionState::new(ctx, transport_id, configured_login_param.clone()).await?;
655 let (inbox_start_send, inbox_start_recv) = oneshot::channel();
656 let handle = {
657 let ctx = ctx.clone();
658 task::spawn(inbox_loop(ctx, inbox_start_send, inbox_handlers))
659 };
660 let addr = configured_login_param.addr.clone();
661 let folder = configured_login_param
662 .imap_folder
663 .unwrap_or_else(|| "INBOX".to_string());
664 let inbox = SchedBox {
665 addr: addr.clone(),
666 folder,
667 conn_state,
668 handle,
669 };
670 inboxes.push(inbox);
671 start_recvs.push(inbox_start_recv);
672 }
673
674 let smtp_handle = {
675 let ctx = ctx.clone();
676 task::spawn(smtp_loop(ctx, smtp_start_send, smtp_handlers))
677 };
678 start_recvs.push(smtp_start_recv);
679
680 let ephemeral_handle = {
681 let ctx = ctx.clone();
682 task::spawn(async move {
683 ephemeral::ephemeral_loop(&ctx, ephemeral_interrupt_recv).await;
684 })
685 };
686
687 let location_handle = {
688 let ctx = ctx.clone();
689 task::spawn(async move {
690 location::location_loop(&ctx, location_interrupt_recv).await;
691 })
692 };
693
694 let recently_seen_loop = RecentlySeenLoop::new(ctx.clone());
695
696 let res = Self {
697 inboxes,
698 smtp,
699 smtp_handle,
700 ephemeral_handle,
701 ephemeral_interrupt_send,
702 location_handle,
703 location_interrupt_send,
704 recently_seen_loop,
705 };
706
707 if let Err(err) = try_join_all(start_recvs).await {
709 bail!("failed to start scheduler: {err}");
710 }
711
712 info!(ctx, "scheduler is running");
713 Ok(res)
714 }
715
716 fn boxes(&self) -> impl Iterator<Item = &SchedBox> {
717 self.inboxes.iter()
718 }
719
720 fn maybe_network(&self) {
721 for b in self.boxes() {
722 b.conn_state.interrupt();
723 }
724 self.interrupt_smtp();
725 }
726
727 fn maybe_network_lost(&self) {
728 for b in self.boxes() {
729 b.conn_state.interrupt();
730 }
731 self.interrupt_smtp();
732 }
733
734 fn interrupt_inbox(&self) {
735 for b in &self.inboxes {
736 b.conn_state.interrupt();
737 }
738 }
739
740 fn interrupt_smtp(&self) {
741 self.smtp.interrupt();
742 }
743
744 fn interrupt_ephemeral_task(&self) {
745 self.ephemeral_interrupt_send.try_send(()).ok();
746 }
747
748 fn interrupt_location(&self) {
749 self.location_interrupt_send.try_send(()).ok();
750 }
751
752 fn interrupt_recently_seen(&self, contact_id: ContactId, timestamp: i64) {
753 self.recently_seen_loop.try_interrupt(contact_id, timestamp);
754 }
755
756 pub(crate) async fn stop(self, context: &Context) {
761 for b in self.boxes() {
763 b.conn_state.stop();
764 }
765 self.smtp.stop();
766
767 let timeout_duration = std::time::Duration::from_secs(30);
769
770 let tracker = TaskTracker::new();
771 for b in self.inboxes {
772 let context = context.clone();
773 tracker.spawn(async move {
774 tokio::time::timeout(timeout_duration, b.handle)
775 .await
776 .log_err(&context)
777 });
778 }
779 {
780 let context = context.clone();
781 tracker.spawn(async move {
782 tokio::time::timeout(timeout_duration, self.smtp_handle)
783 .await
784 .log_err(&context)
785 });
786 }
787 tracker.close();
788 tracker.wait().await;
789
790 self.ephemeral_handle.abort();
795 self.ephemeral_handle.await.ok();
796 self.location_handle.abort();
797 self.location_handle.await.ok();
798 self.recently_seen_loop.abort().await;
799 }
800}
801
/// Controller side of a connection loop, shared by the IMAP and SMTP state types.
#[derive(Debug)]
struct ConnectionState {
    /// Token whose cancellation makes the loop shut down.
    stop_token: CancellationToken,
    /// Sender half of the loop's idle interrupt channel.
    idle_interrupt_sender: Sender<()>,
    /// Clone of the connectivity store of the underlying connection.
    connectivity: ConnectivityStore,
}

impl ConnectionState {
    /// Requests shutdown of the loop by cancelling its stop token.
    fn stop(&self) {
        self.stop_token.cancel();
    }

    /// Sends an interrupt to the loop without blocking; a send failure is ignored.
    fn interrupt(&self) {
        self.idle_interrupt_sender.try_send(()).ok();
    }
}
825
826#[derive(Debug)]
827pub(crate) struct SmtpConnectionState {
828 state: ConnectionState,
829}
830
831impl SmtpConnectionState {
832 fn new() -> (Self, SmtpConnectionHandlers) {
833 let stop_token = CancellationToken::new();
834 let (idle_interrupt_sender, idle_interrupt_receiver) = channel::bounded(1);
835
836 let handlers = SmtpConnectionHandlers {
837 connection: Smtp::new(),
838 stop_token: stop_token.clone(),
839 idle_interrupt_receiver,
840 };
841
842 let state = ConnectionState {
843 stop_token,
844 idle_interrupt_sender,
845 connectivity: handlers.connection.connectivity.clone(),
846 };
847
848 let conn = SmtpConnectionState { state };
849
850 (conn, handlers)
851 }
852
853 fn interrupt(&self) {
855 self.state.interrupt();
856 }
857
858 fn stop(&self) {
860 self.state.stop();
861 }
862}
863
/// Loop side of the SMTP connection; moved into `smtp_loop`.
struct SmtpConnectionHandlers {
    /// The SMTP connection used for sending.
    connection: Smtp,
    /// Clone of the controller's stop token.
    stop_token: CancellationToken,
    /// Receiver half of the idle interrupt channel.
    idle_interrupt_receiver: Receiver<()>,
}
869
870#[derive(Debug)]
871pub(crate) struct ImapConnectionState {
872 state: ConnectionState,
873}
874
875impl ImapConnectionState {
876 async fn new(
878 context: &Context,
879 transport_id: u32,
880 login_param: ConfiguredLoginParam,
881 ) -> Result<(Self, ImapConnectionHandlers)> {
882 let stop_token = CancellationToken::new();
883 let (idle_interrupt_sender, idle_interrupt_receiver) = channel::bounded(1);
884
885 let handlers = ImapConnectionHandlers {
886 connection: Imap::new(context, transport_id, login_param, idle_interrupt_receiver)
887 .await?,
888 stop_token: stop_token.clone(),
889 };
890
891 let state = ConnectionState {
892 stop_token,
893 idle_interrupt_sender,
894 connectivity: handlers.connection.connectivity.clone(),
895 };
896
897 let conn = ImapConnectionState { state };
898
899 Ok((conn, handlers))
900 }
901
902 fn interrupt(&self) {
904 self.state.interrupt();
905 }
906
907 fn stop(&self) {
909 self.state.stop();
910 }
911}
912
/// Loop side of an IMAP connection; moved into `inbox_loop`.
#[derive(Debug)]
struct ImapConnectionHandlers {
    /// The IMAP connection used by the loop.
    connection: Imap,
    /// Clone of the controller's stop token.
    stop_token: CancellationToken,
}