1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
//! # Time smearing.
//!
//! As e-mails typically only use a second-based-resolution for timestamps,
//! the order of two mails sent within one second is unclear.
//! This is bad e.g. when forwarding some messages from a chat -
//! these messages will appear at the recipient easily out of order.
//!
//! We work around this issue by not sending out two mails with the same timestamp.
//! For this purpose, in short, we track the last timestamp used in `last_smeared_timestamp`
//! when another timestamp is needed in the same second, we use `last_smeared_timestamp+1`
//! after some moments without messages sent out,
//! `last_smeared_timestamp` is again in sync with the normal time.
//!
//! However, we do not do all this for the far future,
//! but at max `MAX_SECONDS_TO_LEND_FROM_FUTURE`

use std::cmp::{max, min};
use std::sync::atomic::{AtomicI64, Ordering};

pub(crate) const MAX_SECONDS_TO_LEND_FROM_FUTURE: i64 = 5;

/// Smeared timestamp generator.
#[derive(Debug)]
pub struct SmearedTimestamp {
    /// Next timestamp available for allocation.
    smeared_timestamp: AtomicI64,
}

impl SmearedTimestamp {
    /// Creates a new smeared timestamp generator.
    pub fn new() -> Self {
        Self {
            smeared_timestamp: AtomicI64::new(0),
        }
    }

    /// Allocates `count` unique timestamps.
    ///
    /// Returns the first allocated timestamp.
    pub fn create_n(&self, now: i64, count: i64) -> i64 {
        let mut prev = self.smeared_timestamp.load(Ordering::Relaxed);
        loop {
            // Advance the timestamp if it is in the past,
            // but keep `count - 1` timestamps from the past if possible.
            let t = max(prev, now - count + 1);

            // Rewind the time back if there is no room
            // to allocate `count` timestamps without going too far into the future.
            // Not going too far into the future
            // is more important than generating unique timestamps.
            let first = min(t, now + MAX_SECONDS_TO_LEND_FROM_FUTURE - count + 1);

            // Allocate `count` timestamps by advancing the current timestamp.
            let next = first + count;

            if let Err(x) = self.smeared_timestamp.compare_exchange_weak(
                prev,
                next,
                Ordering::Relaxed,
                Ordering::Relaxed,
            ) {
                prev = x;
            } else {
                return first;
            }
        }
    }

    /// Creates a single timestamp.
    pub fn create(&self, now: i64) -> i64 {
        self.create_n(now, 1)
    }

    /// Returns the current smeared timestamp.
    pub fn current(&self) -> i64 {
        self.smeared_timestamp.load(Ordering::Relaxed)
    }
}

#[cfg(test)]
mod tests {
    use super::*;
    use crate::test_utils::TestContext;
    use crate::tools::{
        create_smeared_timestamp, create_smeared_timestamps, smeared_time, time, SystemTime,
    };

    #[test]
    fn test_smeared_timestamp() {
        let smeared_timestamp = SmearedTimestamp::new();
        let now = time();

        assert_eq!(smeared_timestamp.current(), 0);

        for i in 0..MAX_SECONDS_TO_LEND_FROM_FUTURE {
            assert_eq!(smeared_timestamp.create(now), now + i);
        }
        assert_eq!(
            smeared_timestamp.create(now),
            now + MAX_SECONDS_TO_LEND_FROM_FUTURE
        );
        assert_eq!(
            smeared_timestamp.create(now),
            now + MAX_SECONDS_TO_LEND_FROM_FUTURE
        );

        // System time rewinds back by 1000 seconds.
        let now = now - 1000;
        assert_eq!(
            smeared_timestamp.create(now),
            now + MAX_SECONDS_TO_LEND_FROM_FUTURE
        );
        assert_eq!(
            smeared_timestamp.create(now),
            now + MAX_SECONDS_TO_LEND_FROM_FUTURE
        );
        assert_eq!(
            smeared_timestamp.create(now + 1),
            now + MAX_SECONDS_TO_LEND_FROM_FUTURE + 1
        );
        assert_eq!(smeared_timestamp.create(now + 100), now + 100);
        assert_eq!(smeared_timestamp.create(now + 100), now + 101);
        assert_eq!(smeared_timestamp.create(now + 100), now + 102);
    }

    #[test]
    fn test_create_n_smeared_timestamps() {
        let smeared_timestamp = SmearedTimestamp::new();
        let now = time();

        // Create a single timestamp to initialize the generator.
        assert_eq!(smeared_timestamp.create(now), now);

        // Wait a minute.
        let now = now + 60;

        // Simulate forwarding 7 messages.
        let forwarded_messages = 7;

        // We have not sent anything for a minute,
        // so we can take the current timestamp and take 6 timestamps from the past.
        assert_eq!(smeared_timestamp.create_n(now, forwarded_messages), now - 6);

        assert_eq!(smeared_timestamp.current(), now + 1);

        // Wait 4 seconds.
        // Now we have 3 free timestamps in the past.
        let now = now + 4;

        assert_eq!(smeared_timestamp.current(), now - 3);

        // Forward another 7 messages.
        // We can only lend 3 timestamps from the past.
        assert_eq!(smeared_timestamp.create_n(now, forwarded_messages), now - 3);

        // We had to borrow 3 timestamps from the future
        // because there were not enough timestamps in the past.
        assert_eq!(smeared_timestamp.current(), now + 4);

        // Forward another 7 messages.
        // We cannot use more than 5 timestamps from the future,
        // so we use 5 timestamps from the future,
        // the current timestamp and one timestamp from the past.
        assert_eq!(smeared_timestamp.create_n(now, forwarded_messages), now - 1);
    }

    #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
    async fn test_create_smeared_timestamp() {
        let t = TestContext::new().await;
        assert_ne!(create_smeared_timestamp(&t), create_smeared_timestamp(&t));
        assert!(
            create_smeared_timestamp(&t)
                >= SystemTime::now()
                    .duration_since(SystemTime::UNIX_EPOCH)
                    .unwrap()
                    .as_secs() as i64
        );
    }

    #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
    async fn test_create_smeared_timestamps() {
        let t = TestContext::new().await;
        let count = MAX_SECONDS_TO_LEND_FROM_FUTURE - 1;
        let start = create_smeared_timestamps(&t, count as usize);
        let next = smeared_time(&t);
        assert!((start + count - 1) < next);

        let count = MAX_SECONDS_TO_LEND_FROM_FUTURE + 30;
        let start = create_smeared_timestamps(&t, count as usize);
        let next = smeared_time(&t);
        assert!((start + count - 1) < next);
    }
}