1use std::cmp;
2use std::num::NonZeroUsize;
3
4use anyhow::{Context as _, Error, Result, bail};
5use async_channel::{self as channel, Receiver, Sender};
6use futures::future::try_join_all;
7use futures_lite::FutureExt;
8use tokio::sync::{RwLock, oneshot};
9use tokio::task;
10use tokio_util::sync::CancellationToken;
11use tokio_util::task::TaskTracker;
12
13pub(crate) use self::connectivity::ConnectivityStore;
14use crate::config::Config;
15use crate::contact::{ContactId, RecentlySeenLoop};
16use crate::context::Context;
17use crate::download::{download_known_post_messages_without_pre_message, download_msgs};
18use crate::ephemeral::{self, delete_expired_imap_messages};
19use crate::events::EventType;
20use crate::imap::{Imap, session::Session};
21use crate::location;
22use crate::log::{LogExt, warn};
23use crate::smtp::{Smtp, send_smtp_messages};
24use crate::sql;
25use crate::stats::maybe_send_stats;
26use crate::tools::{self, duration_to_str, maybe_add_time_based_warnings, time, time_elapsed};
27use crate::transport::ConfiguredLoginParam;
28use crate::{constants, stats};
29
30pub(crate) mod connectivity;
31
/// Lock-protected holder of the scheduler's lifecycle state.
///
/// All reads and transitions go through the async `RwLock` wrapping
/// [`InnerSchedulerState`].
#[derive(Debug, Default)]
pub(crate) struct SchedulerState {
    /// Current lifecycle state; the default is [`InnerSchedulerState::Stopped`].
    inner: RwLock<InnerSchedulerState>,
}
41
42impl SchedulerState {
43 pub(crate) fn new() -> Self {
44 Default::default()
45 }
46
47 pub(crate) async fn is_running(&self) -> bool {
49 let inner = self.inner.read().await;
50 matches!(*inner, InnerSchedulerState::Started(_))
51 }
52
53 pub(crate) async fn start(&self, context: &Context) {
55 let mut inner = self.inner.write().await;
56 match *inner {
57 InnerSchedulerState::Started(_) => (),
58 InnerSchedulerState::Stopped => Self::do_start(&mut inner, context).await,
59 InnerSchedulerState::Paused {
60 ref mut started, ..
61 } => *started = true,
62 }
63 context.update_connectivities(&inner);
64 }
65
66 async fn do_start(inner: &mut InnerSchedulerState, context: &Context) {
68 info!(context, "starting IO");
69
70 context.new_msgs_notify.notify_one();
73
74 match Scheduler::start(context).await {
75 Ok(scheduler) => {
76 *inner = InnerSchedulerState::Started(scheduler);
77 context.emit_event(EventType::ConnectivityChanged);
78 }
79 Err(err) => error!(context, "Failed to start IO: {:#}", err),
80 }
81 }
82
83 pub(crate) async fn stop(&self, context: &Context) {
85 let mut inner = self.inner.write().await;
86 match *inner {
87 InnerSchedulerState::Started(_) => {
88 Self::do_stop(&mut inner, context, InnerSchedulerState::Stopped).await
89 }
90 InnerSchedulerState::Stopped => (),
91 InnerSchedulerState::Paused {
92 ref mut started, ..
93 } => *started = false,
94 }
95 context.update_connectivities(&inner);
96 }
97
98 async fn do_stop(
100 inner: &mut InnerSchedulerState,
101 context: &Context,
102 new_state: InnerSchedulerState,
103 ) {
104 info!(context, "stopping IO");
110
111 context.new_msgs_notify.notify_one();
114
115 let debug_logging = context
116 .debug_logging
117 .write()
118 .expect("RwLock is poisoned")
119 .take();
120 if let Some(debug_logging) = debug_logging {
121 debug_logging.loop_handle.abort();
122 debug_logging.loop_handle.await.ok();
123 }
124 let prev_state = std::mem::replace(inner, new_state);
125 context.emit_event(EventType::ConnectivityChanged);
126 match prev_state {
127 InnerSchedulerState::Started(scheduler) => scheduler.stop(context).await,
128 InnerSchedulerState::Stopped | InnerSchedulerState::Paused { .. } => (),
129 }
130 }
131
132 pub(crate) async fn pause(&'_ self, context: &Context) -> Result<IoPausedGuard> {
141 {
142 let mut inner = self.inner.write().await;
143 match *inner {
144 InnerSchedulerState::Started(_) => {
145 let new_state = InnerSchedulerState::Paused {
146 started: true,
147 pause_guards_count: NonZeroUsize::MIN,
148 };
149 Self::do_stop(&mut inner, context, new_state).await;
150 }
151 InnerSchedulerState::Stopped => {
152 *inner = InnerSchedulerState::Paused {
153 started: false,
154 pause_guards_count: NonZeroUsize::MIN,
155 };
156 }
157 InnerSchedulerState::Paused {
158 ref mut pause_guards_count,
159 ..
160 } => {
161 *pause_guards_count = pause_guards_count
162 .checked_add(1)
163 .ok_or_else(|| Error::msg("Too many pause guards active"))?
164 }
165 }
166 context.update_connectivities(&inner);
167 }
168
169 let (tx, rx) = oneshot::channel();
170 let context = context.clone();
171 tokio::spawn(async move {
172 rx.await.ok();
173 let mut inner = context.scheduler.inner.write().await;
174 match *inner {
175 InnerSchedulerState::Started(_) => {
176 warn!(&context, "IoPausedGuard resume: started instead of paused");
177 }
178 InnerSchedulerState::Stopped => {
179 warn!(&context, "IoPausedGuard resume: stopped instead of paused");
180 }
181 InnerSchedulerState::Paused {
182 ref started,
183 ref mut pause_guards_count,
184 } => {
185 if *pause_guards_count == NonZeroUsize::MIN {
186 match *started {
187 true => SchedulerState::do_start(&mut inner, &context).await,
188 false => *inner = InnerSchedulerState::Stopped,
189 }
190 } else {
191 let new_count = pause_guards_count.get() - 1;
192 *pause_guards_count = NonZeroUsize::new(new_count).unwrap();
194 }
195 }
196 }
197 context.update_connectivities(&inner);
198 });
199 Ok(IoPausedGuard { sender: Some(tx) })
200 }
201
202 pub(crate) async fn restart(&self, context: &Context) {
204 info!(context, "restarting IO");
205 if self.is_running().await {
206 self.stop(context).await;
207 self.start(context).await;
208 }
209 }
210
211 pub(crate) async fn maybe_network(&self) {
213 let inner = self.inner.read().await;
214 let inboxes = match *inner {
215 InnerSchedulerState::Started(ref scheduler) => {
216 scheduler.maybe_network();
217 scheduler
218 .inboxes
219 .iter()
220 .map(|b| b.conn_state.state.connectivity.clone())
221 .collect::<Vec<_>>()
222 }
223 _ => return,
224 };
225 drop(inner);
226 connectivity::idle_interrupted(inboxes);
227 }
228
229 pub(crate) async fn maybe_network_lost(&self, context: &Context) {
231 let inner = self.inner.read().await;
232 let stores = match *inner {
233 InnerSchedulerState::Started(ref scheduler) => {
234 scheduler.maybe_network_lost();
235 scheduler
236 .boxes()
237 .map(|b| b.conn_state.state.connectivity.clone())
238 .collect()
239 }
240 _ => return,
241 };
242 drop(inner);
243 connectivity::maybe_network_lost(context, stores);
244 }
245
246 pub(crate) async fn interrupt_inbox(&self) {
247 let inner = self.inner.read().await;
248 if let InnerSchedulerState::Started(ref scheduler) = *inner {
249 scheduler.interrupt_inbox();
250 }
251 }
252
253 pub(crate) async fn clear_all_relay_storage(&self) -> Result<()> {
254 let inner = self.inner.read().await;
255 if let InnerSchedulerState::Started(ref scheduler) = *inner {
256 scheduler.clear_all_relay_storage();
257 Ok(())
258 } else {
259 bail!("IO is not started");
260 }
261 }
262
263 pub(crate) async fn interrupt_smtp(&self) {
264 let inner = self.inner.read().await;
265 if let InnerSchedulerState::Started(ref scheduler) = *inner {
266 scheduler.interrupt_smtp();
267 }
268 }
269
270 pub(crate) async fn interrupt_ephemeral_task(&self) {
271 let inner = self.inner.read().await;
272 if let InnerSchedulerState::Started(ref scheduler) = *inner {
273 scheduler.interrupt_ephemeral_task();
274 }
275 }
276
277 pub(crate) async fn interrupt_location(&self) {
278 let inner = self.inner.read().await;
279 if let InnerSchedulerState::Started(ref scheduler) = *inner {
280 scheduler.interrupt_location();
281 }
282 }
283
284 pub(crate) async fn interrupt_recently_seen(&self, contact_id: ContactId, timestamp: i64) {
285 let inner = self.inner.read().await;
286 if let InnerSchedulerState::Started(ref scheduler) = *inner {
287 scheduler.interrupt_recently_seen(contact_id, timestamp);
288 }
289 }
290}
291
/// Lifecycle state of the scheduler, stored inside [`SchedulerState`].
#[derive(Debug, Default)]
pub(crate) enum InnerSchedulerState {
    /// IO is running; holds the running [`Scheduler`].
    Started(Scheduler),
    /// IO is not running. This is the default state.
    #[default]
    Stopped,
    /// IO is paused by at least one [`IoPausedGuard`].
    Paused {
        /// Whether IO is to be started once the last pause guard is released.
        started: bool,
        /// Number of pause guards currently alive; never zero.
        pause_guards_count: NonZeroUsize,
    },
}
302
/// Guard returned by `SchedulerState::pause`.
///
/// Dropping the guard sends a signal over `sender`. A default-constructed
/// guard has no sender and does nothing on drop.
#[derive(Default, Debug)]
pub(crate) struct IoPausedGuard {
    /// Sending half of the oneshot channel signalled on drop; `None` once used
    /// or when the guard was default-constructed.
    sender: Option<oneshot::Sender<()>>,
}
311
312impl Drop for IoPausedGuard {
313 fn drop(&mut self) {
314 if let Some(sender) = self.sender.take() {
315 sender.send(()).ok();
317 }
318 }
319}
320
/// One inbox watched by the scheduler, together with its loop task.
#[derive(Debug)]
struct SchedBox {
    /// Address taken from the transport's configured login parameters.
    addr: String,

    /// IMAP folder of the transport; `"INBOX"` when none is configured.
    folder: String,

    /// Handle used to interrupt and stop the inbox loop.
    conn_state: ImapConnectionState,

    /// Join handle of the spawned inbox loop task.
    handle: task::JoinHandle<()>,
}
334
/// A running scheduler: the spawned IO loops and the handles used to
/// interrupt and stop them.
#[derive(Debug)]
pub(crate) struct Scheduler {
    /// One entry per configured transport.
    inboxes: Vec<SchedBox>,
    /// Handle used to interrupt and stop the SMTP loop.
    smtp: SmtpConnectionState,
    /// Join handle of the SMTP loop task.
    smtp_handle: task::JoinHandle<()>,
    /// Join handle of the ephemeral loop task.
    ephemeral_handle: task::JoinHandle<()>,
    /// Sender used to interrupt the ephemeral loop.
    ephemeral_interrupt_send: Sender<()>,
    /// Join handle of the location loop task.
    location_handle: task::JoinHandle<()>,
    /// Sender used to interrupt the location loop.
    location_interrupt_send: Sender<()>,

    /// Loop handling "recently seen" updates for contacts.
    recently_seen_loop: RecentlySeenLoop,
}
349
350async fn inbox_loop(
351 ctx: Context,
352 started: oneshot::Sender<()>,
353 inbox_handlers: ImapConnectionHandlers,
354) {
355 use futures::future::FutureExt;
356
357 info!(ctx, "Starting inbox loop.");
358 let ImapConnectionHandlers {
359 mut connection,
360 stop_token,
361 clear_storage_request_receiver,
362 } = inbox_handlers;
363
364 let transport_id = connection.transport_id();
365 let ctx1 = ctx.clone();
366 let fut = async move {
367 let ctx = ctx1;
368 if let Err(()) = started.send(()) {
369 warn!(ctx, "Inbox loop, missing started receiver.");
370 return;
371 };
372
373 let mut old_session: Option<Session> = None;
374 loop {
375 let session = if let Some(session) = old_session.take() {
376 session
377 } else {
378 info!(
379 ctx,
380 "Transport {transport_id}: Preparing new IMAP session for inbox."
381 );
382 match connection.prepare(&ctx).await {
383 Err(err) => {
384 warn!(
385 ctx,
386 "Transport {transport_id}: Failed to prepare inbox connection: {err:#}."
387 );
388 continue;
389 }
390 Ok(session) => {
391 info!(
392 ctx,
393 "Transport {transport_id}: Prepared new IMAP session for inbox."
394 );
395 session
396 }
397 }
398 };
399
400 match inbox_fetch_idle(
401 &ctx,
402 &mut connection,
403 session,
404 &clear_storage_request_receiver,
405 )
406 .await
407 {
408 Err(err) => warn!(
409 ctx,
410 "Transport {transport_id}: Failed inbox fetch_idle: {err:#}."
411 ),
412 Ok(session) => {
413 old_session = Some(session);
414 }
415 }
416 }
417 };
418
419 stop_token
420 .cancelled()
421 .map(|_| {
422 info!(ctx, "Transport {transport_id}: Shutting down inbox loop.");
423 })
424 .race(fut)
425 .await;
426}
427
428async fn inbox_fetch_idle(
429 ctx: &Context,
430 imap: &mut Imap,
431 mut session: Session,
432 clear_storage_request_receiver: &Receiver<()>,
433) -> Result<Session> {
434 let transport_id = session.transport_id();
435
436 let should_clear_imap_storage =
441 clear_storage_request_receiver.try_recv().is_ok() && session.is_chatmail();
442 if should_clear_imap_storage {
443 info!(ctx, "Transport {transport_id}: Clearing IMAP storage.");
444 session.delete_all_messages(ctx, &imap.folder).await?;
445 }
446
447 if (ctx.quota_needs_update(session.transport_id(), 60).await || should_clear_imap_storage)
451 && let Err(err) = ctx.update_recent_quota(&mut session, &imap.folder).await
452 {
453 warn!(
454 ctx,
455 "Transport {transport_id}: Failed to update quota: {err:#}."
456 );
457 }
458
459 if let Ok(()) = imap.resync_request_receiver.try_recv()
460 && let Err(err) = session.resync_folders(ctx).await
461 {
462 warn!(
463 ctx,
464 "Transport {transport_id}: Failed to resync folders: {err:#}."
465 );
466 imap.resync_request_sender.try_send(()).ok();
467 }
468
469 maybe_add_time_based_warnings(ctx).await;
470
471 match ctx.get_config_i64(Config::LastHousekeeping).await {
472 Ok(last_housekeeping_time) => {
473 let next_housekeeping_time =
474 last_housekeeping_time.saturating_add(constants::HOUSEKEEPING_PERIOD);
475 if next_housekeeping_time <= time() {
476 sql::housekeeping(ctx).await.log_err(ctx).ok();
477 }
478 }
479 Err(err) => {
480 warn!(
481 ctx,
482 "Transport {transport_id}: Failed to get last housekeeping time: {err:#}"
483 );
484 }
485 };
486
487 maybe_send_stats(ctx).await.log_err(ctx).ok();
488
489 session
490 .update_metadata(ctx)
491 .await
492 .context("update_metadata")?;
493 session
494 .register_token(ctx)
495 .await
496 .context("Failed to register push token")?;
497
498 let session = fetch_idle(ctx, imap, session).await?;
499 Ok(session)
500}
501
502async fn fetch_idle(ctx: &Context, connection: &mut Imap, mut session: Session) -> Result<Session> {
508 let transport_id = session.transport_id();
509
510 let watch_folder = connection.folder.clone();
511
512 session
513 .store_seen_flags_on_imap(ctx)
514 .await
515 .context("store_seen_flags_on_imap")?;
516
517 connection
519 .fetch_move_delete(ctx, &mut session, &watch_folder)
520 .await
521 .context("fetch_move_delete")?;
522
523 delete_expired_imap_messages(ctx)
528 .await
529 .context("delete_expired_imap_messages")?;
530
531 download_known_post_messages_without_pre_message(ctx, &mut session).await?;
532 download_msgs(ctx, &mut session)
533 .await
534 .context("download_msgs")?;
535
536 session
538 .sync_seen_flags(ctx, &watch_folder)
539 .await
540 .context("sync_seen_flags")
541 .log_err(ctx)
542 .ok();
543
544 connection.connectivity.set_idle(ctx);
545
546 ctx.emit_event(EventType::ImapInboxIdle);
547
548 if !session.can_idle() {
549 info!(
550 ctx,
551 "Transport {transport_id}: IMAP session does not support IDLE, going to fake idle."
552 );
553 connection.fake_idle(ctx, &watch_folder).await?;
554 return Ok(session);
555 }
556
557 if ctx
558 .get_config_bool(Config::DisableIdle)
559 .await
560 .context("Failed to get disable_idle config")
561 .log_err(ctx)
562 .unwrap_or_default()
563 {
564 info!(
565 ctx,
566 "Transport {transport_id}: IMAP IDLE is disabled, going to fake idle."
567 );
568 connection.fake_idle(ctx, &watch_folder).await?;
569 return Ok(session);
570 }
571
572 let session = session
573 .idle(
574 ctx,
575 connection.idle_interrupt_receiver.clone(),
576 &watch_folder,
577 )
578 .await
579 .context("idle")?;
580
581 Ok(session)
582}
583
584async fn smtp_loop(
585 ctx: Context,
586 started: oneshot::Sender<()>,
587 smtp_handlers: SmtpConnectionHandlers,
588) {
589 use futures::future::FutureExt;
590
591 info!(ctx, "Starting SMTP loop.");
592 let SmtpConnectionHandlers {
593 mut connection,
594 stop_token,
595 idle_interrupt_receiver,
596 } = smtp_handlers;
597
598 let ctx1 = ctx.clone();
599 let fut = async move {
600 let ctx = ctx1;
601 if let Err(()) = started.send(()) {
602 warn!(&ctx, "SMTP loop, missing started receiver.");
603 return;
604 }
605
606 let mut timeout = None;
607 loop {
608 if let Err(err) = send_smtp_messages(&ctx, &mut connection).await {
609 warn!(ctx, "send_smtp_messages failed: {:#}.", err);
610 timeout = Some(timeout.unwrap_or(30));
611 } else {
612 timeout = None;
613 let duration_until_can_send = ctx.ratelimit.read().await.until_can_send();
614 if !duration_until_can_send.is_zero() {
615 info!(
616 ctx,
617 "smtp got rate limited, waiting for {} until can send again",
618 duration_to_str(duration_until_can_send)
619 );
620 tokio::time::sleep(duration_until_can_send).await;
621 continue;
622 }
623 }
624
625 stats::maybe_update_message_stats(&ctx)
626 .await
627 .log_err(&ctx)
628 .ok();
629
630 info!(ctx, "SMTP fake idle started.");
632 match &connection.last_send_error {
633 None => connection.connectivity.set_idle(&ctx),
634 Some(err) => connection.connectivity.set_err(&ctx, err),
635 }
636
637 if let Some(t) = timeout {
642 let now = tools::Time::now();
643 info!(
644 ctx,
645 "SMTP has messages to retry, planning to retry {t} seconds later."
646 );
647 let duration = std::time::Duration::from_secs(t);
648 tokio::time::timeout(duration, async {
649 idle_interrupt_receiver.recv().await.unwrap_or_default()
650 })
651 .await
652 .unwrap_or_default();
653 let slept = time_elapsed(&now).as_secs();
654 timeout = Some(cmp::max(
655 t,
656 slept.saturating_add(rand::random_range((slept / 2)..=slept)),
657 ));
658 } else {
659 info!(ctx, "SMTP has no messages to retry, waiting for interrupt.");
660 idle_interrupt_receiver.recv().await.unwrap_or_default();
661 };
662
663 info!(ctx, "SMTP fake idle interrupted.")
664 }
665 };
666
667 stop_token
668 .cancelled()
669 .map(|_| {
670 info!(ctx, "Shutting down SMTP loop.");
671 })
672 .race(fut)
673 .await;
674}
675
676impl Scheduler {
677 pub async fn start(ctx: &Context) -> Result<Self> {
679 let (smtp, smtp_handlers) = SmtpConnectionState::new();
680
681 let (smtp_start_send, smtp_start_recv) = oneshot::channel();
682 let (ephemeral_interrupt_send, ephemeral_interrupt_recv) = channel::bounded(1);
683 let (location_interrupt_send, location_interrupt_recv) = channel::bounded(1);
684
685 let mut inboxes = Vec::new();
686 let mut start_recvs = Vec::new();
687
688 for (transport_id, configured_login_param) in ConfiguredLoginParam::load_all(ctx).await? {
689 let (conn_state, inbox_handlers) =
690 ImapConnectionState::new(ctx, transport_id, configured_login_param.clone()).await?;
691 let (inbox_start_send, inbox_start_recv) = oneshot::channel();
692 let handle = {
693 let ctx = ctx.clone();
694 task::spawn(inbox_loop(ctx, inbox_start_send, inbox_handlers))
695 };
696 let addr = configured_login_param.addr.clone();
697 let folder = configured_login_param
698 .imap_folder
699 .unwrap_or_else(|| "INBOX".to_string());
700 let inbox = SchedBox {
701 addr: addr.clone(),
702 folder,
703 conn_state,
704 handle,
705 };
706 inboxes.push(inbox);
707 start_recvs.push(inbox_start_recv);
708 }
709
710 let smtp_handle = {
711 let ctx = ctx.clone();
712 task::spawn(smtp_loop(ctx, smtp_start_send, smtp_handlers))
713 };
714 start_recvs.push(smtp_start_recv);
715
716 let ephemeral_handle = {
717 let ctx = ctx.clone();
718 task::spawn(async move {
719 ephemeral::ephemeral_loop(&ctx, ephemeral_interrupt_recv).await;
720 })
721 };
722
723 let location_handle = {
724 let ctx = ctx.clone();
725 task::spawn(async move {
726 location::location_loop(&ctx, location_interrupt_recv).await;
727 })
728 };
729
730 let recently_seen_loop = RecentlySeenLoop::new(ctx.clone());
731
732 let res = Self {
733 inboxes,
734 smtp,
735 smtp_handle,
736 ephemeral_handle,
737 ephemeral_interrupt_send,
738 location_handle,
739 location_interrupt_send,
740 recently_seen_loop,
741 };
742
743 if let Err(err) = try_join_all(start_recvs).await {
745 bail!("failed to start scheduler: {err}");
746 }
747
748 info!(ctx, "scheduler is running");
749 Ok(res)
750 }
751
752 fn boxes(&self) -> impl Iterator<Item = &SchedBox> {
753 self.inboxes.iter()
754 }
755
756 fn maybe_network(&self) {
757 for b in self.boxes() {
758 b.conn_state.interrupt();
759 }
760 self.interrupt_smtp();
761 }
762
763 fn maybe_network_lost(&self) {
764 for b in self.boxes() {
765 b.conn_state.interrupt();
766 }
767 self.interrupt_smtp();
768 }
769
770 fn interrupt_inbox(&self) {
771 for b in &self.inboxes {
772 b.conn_state.interrupt();
773 }
774 }
775
776 fn clear_all_relay_storage(&self) {
777 for b in &self.inboxes {
778 b.conn_state.clear_relay_storage();
779 }
780 }
781
782 fn interrupt_smtp(&self) {
783 self.smtp.interrupt();
784 }
785
786 fn interrupt_ephemeral_task(&self) {
787 self.ephemeral_interrupt_send.try_send(()).ok();
788 }
789
790 fn interrupt_location(&self) {
791 self.location_interrupt_send.try_send(()).ok();
792 }
793
794 fn interrupt_recently_seen(&self, contact_id: ContactId, timestamp: i64) {
795 self.recently_seen_loop.try_interrupt(contact_id, timestamp);
796 }
797
798 pub(crate) async fn stop(self, context: &Context) {
803 for b in self.boxes() {
805 b.conn_state.stop();
806 }
807 self.smtp.stop();
808
809 let timeout_duration = std::time::Duration::from_secs(30);
811
812 let tracker = TaskTracker::new();
813 for b in self.inboxes {
814 let context = context.clone();
815 tracker.spawn(async move {
816 tokio::time::timeout(timeout_duration, b.handle)
817 .await
818 .log_err(&context)
819 });
820 }
821 {
822 let context = context.clone();
823 tracker.spawn(async move {
824 tokio::time::timeout(timeout_duration, self.smtp_handle)
825 .await
826 .log_err(&context)
827 });
828 }
829 tracker.close();
830 tracker.wait().await;
831
832 self.ephemeral_handle.abort();
837 self.ephemeral_handle.await.ok();
838 self.location_handle.abort();
839 self.location_handle.await.ok();
840 self.recently_seen_loop.abort().await;
841 }
842}
843
/// Handles shared by the IMAP and SMTP connection states for controlling a loop.
#[derive(Debug)]
struct ConnectionState {
    /// Token cancelled to request shutdown of the loop.
    stop_token: CancellationToken,
    /// Sender used to interrupt the loop while it is idle.
    idle_interrupt_sender: Sender<()>,
    /// Connectivity store cloned from the connection.
    connectivity: ConnectivityStore,
}
854
855impl ConnectionState {
856 fn stop(&self) {
858 self.stop_token.cancel();
860 }
861
862 fn interrupt(&self) {
863 self.idle_interrupt_sender.try_send(()).ok();
865 }
866}
867
/// Control handle for the SMTP loop.
#[derive(Debug)]
pub(crate) struct SmtpConnectionState {
    /// Stop token, interrupt sender and connectivity store of the SMTP connection.
    state: ConnectionState,
}
872
873impl SmtpConnectionState {
874 fn new() -> (Self, SmtpConnectionHandlers) {
875 let stop_token = CancellationToken::new();
876 let (idle_interrupt_sender, idle_interrupt_receiver) = channel::bounded(1);
877
878 let handlers = SmtpConnectionHandlers {
879 connection: Smtp::new(),
880 stop_token: stop_token.clone(),
881 idle_interrupt_receiver,
882 };
883
884 let state = ConnectionState {
885 stop_token,
886 idle_interrupt_sender,
887 connectivity: handlers.connection.connectivity.clone(),
888 };
889
890 let conn = SmtpConnectionState { state };
891
892 (conn, handlers)
893 }
894
895 fn interrupt(&self) {
897 self.state.interrupt();
898 }
899
900 fn stop(&self) {
902 self.state.stop();
903 }
904}
905
/// Loop-side counterpart of [`SmtpConnectionState`], moved into the SMTP loop.
struct SmtpConnectionHandlers {
    /// The SMTP connection used for sending.
    connection: Smtp,
    /// Token whose cancellation ends the loop.
    stop_token: CancellationToken,
    /// Receiver for interrupts that wake the loop from its idle wait.
    idle_interrupt_receiver: Receiver<()>,
}
911
/// Control handle for one inbox loop.
#[derive(Debug)]
pub(crate) struct ImapConnectionState {
    /// Stop token, interrupt sender and connectivity store of the IMAP connection.
    state: ConnectionState,

    /// Sender used to request clearing of the relay storage.
    clear_storage_request_sender: Sender<()>,
}
923
924impl ImapConnectionState {
925 async fn new(
927 context: &Context,
928 transport_id: u32,
929 login_param: ConfiguredLoginParam,
930 ) -> Result<(Self, ImapConnectionHandlers)> {
931 let stop_token = CancellationToken::new();
932 let (idle_interrupt_sender, idle_interrupt_receiver) = channel::bounded(1);
933 let (clear_storage_request_sender, clear_storage_request_receiver) = channel::bounded(1);
934
935 let handlers = ImapConnectionHandlers {
936 connection: Imap::new(context, transport_id, login_param, idle_interrupt_receiver)
937 .await?,
938 stop_token: stop_token.clone(),
939 clear_storage_request_receiver,
940 };
941
942 let state = ConnectionState {
943 stop_token,
944 idle_interrupt_sender,
945 connectivity: handlers.connection.connectivity.clone(),
946 };
947
948 let conn = ImapConnectionState {
949 state,
950 clear_storage_request_sender,
951 };
952
953 Ok((conn, handlers))
954 }
955
956 fn interrupt(&self) {
958 self.state.interrupt();
959 }
960
961 fn stop(&self) {
963 self.state.stop();
964 }
965
966 fn clear_relay_storage(&self) {
968 self.clear_storage_request_sender.try_send(()).ok();
969 self.state.interrupt();
970 }
971}
972
/// Loop-side counterpart of [`ImapConnectionState`], moved into the inbox loop.
#[derive(Debug)]
struct ImapConnectionHandlers {
    /// The IMAP connection used by the loop.
    connection: Imap,
    /// Token whose cancellation ends the loop.
    stop_token: CancellationToken,

    /// Receiver for requests to clear the relay storage.
    pub(crate) clear_storage_request_receiver: Receiver<()>,
}