nautilus_core/
time.rs

1// -------------------------------------------------------------------------------------------------
2//  Copyright (C) 2015-2025 Posei Systems Pty Ltd. All rights reserved.
3//  https://poseitrader.io
4//
5//  Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
6//  You may not use this file except in compliance with the License.
7//  You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
8//
9//  Unless required by applicable law or agreed to in writing, software
10//  distributed under the License is distributed on an "AS IS" BASIS,
11//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12//  See the License for the specific language governing permissions and
13//  limitations under the License.
14// -------------------------------------------------------------------------------------------------
15
16//! The core `AtomicTime` for real-time and static clocks.
17//!
18//! This module provides an atomic time abstraction that supports both real-time and static
19//! clocks. It ensures thread-safe operations and monotonic time retrieval with nanosecond precision.
20//!
21//! # Modes
22//!
23//! - **Real-time mode:** The clock continuously syncs with system wall-clock time (via
24//!   [`SystemTime::now()`]). To ensure strict monotonic increments across multiple threads,
25//!   the internal updates use an atomic compare-and-exchange loop (`time_since_epoch`).
26//!   While this guarantees that every new timestamp is at least one nanosecond greater than the
27//!   last, it may introduce higher contention if many threads call it heavily.
28//!
29//! - **Static mode:** The clock is manually controlled via [`AtomicTime::set_time`] or [`AtomicTime::increment_time`],
30//!   which can be useful for simulations or backtesting. You can switch modes at runtime using
31//!   [`AtomicTime::make_realtime`] or [`AtomicTime::make_static`]. In **static mode**, we use
32//!   acquire/release semantics so that updates from one thread can be observed by another;
33//!   however, we do not enforce strict global ordering for manual updates. If you need strong,
34//!   multi-threaded ordering in **static mode**, you must coordinate higher-level synchronization yourself.
35
36use std::{
37    ops::Deref,
38    sync::{
39        OnceLock,
40        atomic::{AtomicBool, AtomicU64, Ordering},
41    },
42    time::{Duration, SystemTime, UNIX_EPOCH},
43};
44
45use crate::{
46    UnixNanos,
47    datetime::{NANOSECONDS_IN_MICROSECOND, NANOSECONDS_IN_MILLISECOND, NANOSECONDS_IN_SECOND},
48};
49
50/// Global atomic time in **real-time mode** for use across the system.
51///
52/// This clock operates in **real-time mode**, synchronizing with the system clock.
53/// It provides globally unique, strictly increasing timestamps across threads.
54pub static ATOMIC_CLOCK_REALTIME: OnceLock<AtomicTime> = OnceLock::new();
55
56/// Global atomic time in **static mode** for use across the system.
57///
58/// This clock operates in **static mode**, where the time value can be set or incremented
59/// manually. Useful for backtesting or simulated time control.
60pub static ATOMIC_CLOCK_STATIC: OnceLock<AtomicTime> = OnceLock::new();
61
62/// Returns a static reference to the global atomic clock in **real-time mode**.
63///
64/// This clock uses [`AtomicTime::time_since_epoch`] under the hood, ensuring strictly increasing
65/// timestamps across threads.
66pub fn get_atomic_clock_realtime() -> &'static AtomicTime {
67    ATOMIC_CLOCK_REALTIME.get_or_init(AtomicTime::default)
68}
69
70/// Returns a static reference to the global atomic clock in **static mode**.
71///
72/// This clock allows manual time control via [`AtomicTime::set_time`] or [`AtomicTime::increment_time`],
73/// and does not automatically sync with system time.
74pub fn get_atomic_clock_static() -> &'static AtomicTime {
75    ATOMIC_CLOCK_STATIC.get_or_init(|| AtomicTime::new(false, UnixNanos::default()))
76}
77
78/// Returns the duration since the UNIX epoch based on [`SystemTime::now()`].
79///
80/// # Panics
81///
82/// Panics if the system time is set before the UNIX epoch.
83#[inline(always)]
84#[must_use]
85pub fn duration_since_unix_epoch() -> Duration {
86    SystemTime::now()
87        .duration_since(UNIX_EPOCH)
88        .expect("Error calling `SystemTime`")
89}
90
91/// Returns the current UNIX time in nanoseconds, based on [`SystemTime::now()`].
92///
93/// # Panics
94///
95/// Panics if the duration in nanoseconds exceeds `u64::MAX`.
96#[inline(always)]
97#[must_use]
98pub fn nanos_since_unix_epoch() -> u64 {
99    let ns = duration_since_unix_epoch().as_nanos();
100    assert!(
101        (ns <= u128::from(u64::MAX)),
102        "System time overflow: value exceeds u64::MAX nanoseconds"
103    );
104    ns as u64
105}
106
107/// Represents an atomic timekeeping structure.
108///
109/// [`AtomicTime`] can act as a real-time clock or static clock based on its mode.
110/// It uses an [`AtomicU64`] to atomically update the value using only immutable
111/// references.
112///
113/// The `realtime` flag indicates which mode the clock is currently in.
114/// For concurrency, this struct uses atomic operations with appropriate memory orderings:
115/// - **Acquire/Release** for reading/writing in **static mode**,
116/// - **Compare-and-exchange (`AcqRel`)** in real-time mode to guarantee monotonic increments.
117#[repr(C)]
118#[derive(Debug)]
119pub struct AtomicTime {
120    /// Indicates whether the clock is operating in **real-time mode** (`true`) or **static mode** (`false`)
121    pub realtime: AtomicBool,
122    /// The last recorded time (in UNIX nanoseconds). Updated atomically with compare-and-exchange
123    /// in **real-time mode**, or simple store/fetch in **static mode**.
124    pub timestamp_ns: AtomicU64,
125}
126
127impl Deref for AtomicTime {
128    type Target = AtomicU64;
129
130    fn deref(&self) -> &Self::Target {
131        &self.timestamp_ns
132    }
133}
134
135impl Default for AtomicTime {
136    /// Creates a new default [`AtomicTime`] instance in **real-time mode**, starting at the current system time.
137    fn default() -> Self {
138        Self::new(true, UnixNanos::default())
139    }
140}
141
142impl AtomicTime {
143    /// Creates a new [`AtomicTime`] instance.
144    ///
145    /// - If `realtime` is `true`, the provided `time` is used only as an initial placeholder
146    ///   and will quickly be overridden by calls to [`AtomicTime::time_since_epoch`].
147    /// - If `realtime` is `false`, this clock starts in **static mode**, with the given `time`
148    ///   as its current value.
149    #[must_use]
150    pub fn new(realtime: bool, time: UnixNanos) -> Self {
151        Self {
152            realtime: AtomicBool::new(realtime),
153            timestamp_ns: AtomicU64::new(time.into()),
154        }
155    }
156
157    /// Returns the current time in nanoseconds, based on the clock’s mode.
158    ///
159    /// - In **real-time mode**, calls [`AtomicTime::time_since_epoch`], ensuring strictly increasing
160    ///   timestamps across threads, using `AcqRel` semantics for the underlying atomic.
161    /// - In **static mode**, reads the stored time using [`Ordering::Acquire`]. Updates by other
162    ///   threads using [`AtomicTime::set_time`] or [`AtomicTime::increment_time`] (Release/AcqRel)
163    ///   will be visible here.
164    #[must_use]
165    pub fn get_time_ns(&self) -> UnixNanos {
166        if self.realtime.load(Ordering::Acquire) {
167            self.time_since_epoch()
168        } else {
169            UnixNanos::from(self.timestamp_ns.load(Ordering::Acquire))
170        }
171    }
172
173    /// Returns the current time as microseconds.
174    #[must_use]
175    pub fn get_time_us(&self) -> u64 {
176        self.get_time_ns().as_u64() / NANOSECONDS_IN_MICROSECOND
177    }
178
179    /// Returns the current time as milliseconds.
180    #[must_use]
181    pub fn get_time_ms(&self) -> u64 {
182        self.get_time_ns().as_u64() / NANOSECONDS_IN_MILLISECOND
183    }
184
185    /// Returns the current time as seconds.
186    #[must_use]
187    #[allow(clippy::cast_precision_loss)]
188    pub fn get_time(&self) -> f64 {
189        self.get_time_ns().as_f64() / (NANOSECONDS_IN_SECOND as f64)
190    }
191
192    /// Manually sets a new time for the clock (only meaningful in **static mode**).
193    ///
194    /// This uses an atomic store with [`Ordering::Release`], so any thread reading with
195    /// [`Ordering::Acquire`] will see the updated time. This does *not* enforce a total ordering
196    /// among all threads, but is enough to ensure that once a thread sees this update, it also
197    /// sees all writes made before this call in the writing thread.
198    ///
199    /// Typically used in single-threaded scenarios or coordinated concurrency in **static mode**,
200    /// since there’s no global ordering across threads.
201    ///
202    /// # Panics
203    ///
204    /// Panics if invoked when in real-time mode.
205    pub fn set_time(&self, time: UnixNanos) {
206        assert!(
207            !self.realtime.load(Ordering::Acquire),
208            "Cannot set time while clock is in realtime mode"
209        );
210
211        self.store(time.into(), Ordering::Release);
212    }
213
214    /// Increments the current (static-mode) time by `delta` nanoseconds and returns the updated value.
215    ///
216    /// Internally this uses `fetch_add` with [`Ordering::AcqRel`] to ensure the increment is
217    /// atomic and visible to readers using `Acquire` loads.
218    ///
219    /// # Panics
220    ///
221    /// Panics if called while the clock is in real-time mode.
222    pub fn increment_time(&self, delta: u64) -> UnixNanos {
223        assert!(
224            !self.realtime.load(Ordering::Acquire),
225            "Cannot increment time while clock is in realtime mode"
226        );
227
228        let prev = self.fetch_add(delta, Ordering::AcqRel);
229        UnixNanos::from(prev + delta)
230    }
231
232    /// Retrieves and updates the current “real-time” clock, returning a strictly increasing
233    /// timestamp based on system time.
234    ///
235    /// Internally:
236    /// - We fetch `now` from [`SystemTime::now()`].
237    /// - We do an atomic compare-and-exchange (using [`Ordering::AcqRel`]) to ensure the stored
238    ///   timestamp is never less than the last timestamp.
239    ///
240    /// This ensures:
241    /// 1. **Monotonic increments**: The returned timestamp is strictly greater than the previous
242    ///    one (by at least 1 nanosecond).
243    /// 2. **No backward jumps**: If the OS time moves backward, we ignore that shift to preserve
244    ///    monotonicity.
245    /// 3. **Visibility**: In a multi-threaded environment, other threads see the updated value
246    ///    once this compare-and-exchange completes.
247    ///
248    /// Note that under heavy contention (many threads calling this in tight loops), the CAS loop
249    /// may increase latency. If you need extremely high-frequency, concurrent updates, consider
250    /// using a more specialized approach or relaxing some ordering requirements.
251    ///
252    /// # Panics
253    ///
254    /// Panics if the internal counter has reached `u64::MAX`, which would indicate the process has
255    /// been running for longer than the representable range (~584 years) *or* the clock was
256    /// manually corrupted.
257    pub fn time_since_epoch(&self) -> UnixNanos {
258        // This method guarantees strict consistency but may incur a performance cost under
259        // high contention due to retries in the `compare_exchange` loop.
260        let now = nanos_since_unix_epoch();
261        loop {
262            // Acquire to observe the latest stored value
263            let last = self.load(Ordering::Acquire);
264            // Ensure we never wrap past u64::MAX – treat that as a fatal error
265            let incremented = last
266                .checked_add(1)
267                .expect("AtomicTime overflow: reached u64::MAX");
268            let next = now.max(incremented);
269            // AcqRel on success ensures this new value is published,
270            // Acquire on failure reloads if we lost a CAS race.
271            if self
272                .compare_exchange(last, next, Ordering::AcqRel, Ordering::Acquire)
273                .is_ok()
274            {
275                return UnixNanos::from(next);
276            }
277        }
278    }
279
280    /// Switches the clock to **real-time mode** (`realtime = true`).
281    ///
282    /// Uses [`Ordering::SeqCst`] for the mode store, which ensures a global ordering for the
283    /// mode switch if other threads also do `SeqCst` loads/stores of `realtime`.
284    /// Typically, switching modes is done infrequently, so the performance impact of `SeqCst`
285    /// here is acceptable.
286    pub fn make_realtime(&self) {
287        self.realtime.store(true, Ordering::SeqCst);
288    }
289
290    /// Switches the clock to **static mode** (`realtime = false`).
291    ///
292    /// Uses [`Ordering::SeqCst`] for the mode store, which ensures a global ordering for the
293    /// mode switch if other threads also do `SeqCst` loads/stores of `realtime`.
294    pub fn make_static(&self) {
295        self.realtime.store(false, Ordering::SeqCst);
296    }
297}
298
299////////////////////////////////////////////////////////////////////////////////
300// Tests
301////////////////////////////////////////////////////////////////////////////////
302#[cfg(test)]
303mod tests {
304    use std::sync::Arc;
305
306    use rstest::*;
307
308    use super::*;
309
310    #[rstest]
311    fn test_global_clocks_initialization() {
312        let realtime_clock = get_atomic_clock_realtime();
313        assert!(realtime_clock.get_time_ns().as_u64() > 0);
314
315        let static_clock = get_atomic_clock_static();
316        static_clock.set_time(UnixNanos::from(500_000_000)); // 500 ms
317        assert_eq!(static_clock.get_time_ns().as_u64(), 500_000_000);
318    }
319
320    #[rstest]
321    fn test_mode_switching() {
322        let time = AtomicTime::new(true, UnixNanos::default());
323
324        // Verify real-time mode
325        let realtime_ns = time.get_time_ns();
326        assert!(realtime_ns.as_u64() > 0);
327
328        // Switch to static mode
329        time.make_static();
330        time.set_time(UnixNanos::from(1_000_000_000)); // 1 second
331        let static_ns = time.get_time_ns();
332        assert_eq!(static_ns.as_u64(), 1_000_000_000);
333
334        // Switch back to real-time mode
335        time.make_realtime();
336        let new_realtime_ns = time.get_time_ns();
337        assert!(new_realtime_ns.as_u64() > static_ns.as_u64());
338    }
339
340    #[rstest]
341    #[should_panic(expected = "Cannot set time while clock is in realtime mode")]
342    fn test_set_time_panics_in_realtime_mode() {
343        let clock = AtomicTime::new(true, UnixNanos::default());
344        clock.set_time(UnixNanos::from(123));
345    }
346
347    #[rstest]
348    #[should_panic(expected = "Cannot increment time while clock is in realtime mode")]
349    fn test_increment_time_panics_in_realtime_mode() {
350        let clock = AtomicTime::new(true, UnixNanos::default());
351        let _ = clock.increment_time(1);
352    }
353
354    #[rstest]
355    #[should_panic(expected = "AtomicTime overflow")]
356    fn test_time_since_epoch_overflow_panics() {
357        use std::sync::atomic::{AtomicBool, AtomicU64};
358
359        // Manually construct a clock with the counter already at u64::MAX
360        let clock = AtomicTime {
361            realtime: AtomicBool::new(true),
362            timestamp_ns: AtomicU64::new(u64::MAX),
363        };
364
365        // This call will attempt to add 1 and must panic
366        let _ = clock.time_since_epoch();
367    }
368
369    #[rstest]
370    fn test_mode_switching_concurrent() {
371        let clock = Arc::new(AtomicTime::new(true, UnixNanos::default()));
372        let num_threads = 4;
373        let iterations = 10000;
374        let mut handles = Vec::with_capacity(num_threads);
375
376        for _ in 0..num_threads {
377            let clock_clone = Arc::clone(&clock);
378            let handle = std::thread::spawn(move || {
379                for i in 0..iterations {
380                    if i % 2 == 0 {
381                        clock_clone.make_static();
382                    } else {
383                        clock_clone.make_realtime();
384                    }
385                    // Retrieve the time; we’re not asserting a particular value here,
386                    // but at least we’re exercising the mode switch logic under concurrency.
387                    let _ = clock_clone.get_time_ns();
388                }
389            });
390            handles.push(handle);
391        }
392
393        for handle in handles {
394            handle.join().unwrap();
395        }
396    }
397
398    #[rstest]
399    fn test_static_time_is_stable() {
400        // Create a clock in static mode with an initial value
401        let clock = AtomicTime::new(false, UnixNanos::from(42));
402        let time1 = clock.get_time_ns();
403
404        // Sleep a bit to give the system time to change, if the clock were using real-time
405        std::thread::sleep(std::time::Duration::from_millis(10));
406        let time2 = clock.get_time_ns();
407
408        // In static mode, the value should remain unchanged
409        assert_eq!(time1, time2);
410    }
411
412    #[rstest]
413    fn test_increment_time() {
414        // Start in static mode
415        let time = AtomicTime::new(false, UnixNanos::from(0));
416
417        let updated_time = time.increment_time(500);
418        assert_eq!(updated_time.as_u64(), 500);
419
420        let updated_time = time.increment_time(1_000);
421        assert_eq!(updated_time.as_u64(), 1_500);
422    }
423
424    #[rstest]
425    #[allow(clippy::cast_possible_truncation, clippy::cast_possible_wrap)]
426    fn test_nanos_since_unix_epoch_vs_system_time() {
427        let unix_nanos = nanos_since_unix_epoch();
428        let system_ns = duration_since_unix_epoch().as_nanos() as u64;
429        assert!((unix_nanos as i64 - system_ns as i64).abs() < NANOSECONDS_IN_SECOND as i64);
430    }
431
432    #[rstest]
433    fn test_time_since_epoch_monotonicity() {
434        let clock = get_atomic_clock_realtime();
435        let mut previous = clock.time_since_epoch();
436        for _ in 0..1_000_000 {
437            let current = clock.time_since_epoch();
438            assert!(current > previous);
439            previous = current;
440        }
441    }
442
443    #[rstest]
444    fn test_time_since_epoch_strictly_increasing_concurrent() {
445        let time = Arc::new(AtomicTime::new(true, UnixNanos::default()));
446        let num_threads = 4;
447        let iterations = 100_000;
448        let mut handles = Vec::with_capacity(num_threads);
449
450        for thread_id in 0..num_threads {
451            let time_clone = Arc::clone(&time);
452
453            let handle = std::thread::spawn(move || {
454                let mut previous = time_clone.time_since_epoch().as_u64();
455
456                for i in 0..iterations {
457                    let current = time_clone.time_since_epoch().as_u64();
458                    assert!(
459                        current > previous,
460                        "Thread {thread_id}: iteration {i}: time did not increase: previous={previous}, current={current}",
461                    );
462                    previous = current;
463                }
464            });
465
466            handles.push(handle);
467        }
468
469        for handle in handles {
470            handle.join().unwrap();
471        }
472    }
473
474    #[rstest]
475    fn test_duration_since_unix_epoch() {
476        let time = AtomicTime::new(true, UnixNanos::default());
477        let duration = Duration::from_nanos(time.get_time_ns().into());
478        let now = SystemTime::now();
479
480        // Check if the duration is close to the actual difference between now and UNIX_EPOCH
481        let delta = now
482            .duration_since(UNIX_EPOCH)
483            .unwrap()
484            .checked_sub(duration);
485        assert!(delta.unwrap_or_default() < Duration::from_millis(100));
486
487        // Check if the duration is greater than a certain value (assuming the test is run after that point)
488        assert!(duration > Duration::from_secs(1_650_000_000));
489    }
490
491    #[rstest]
492    fn test_unix_timestamp_is_monotonic_increasing() {
493        let time = AtomicTime::new(true, UnixNanos::default());
494        let result1 = time.get_time();
495        let result2 = time.get_time();
496        let result3 = time.get_time();
497        let result4 = time.get_time();
498        let result5 = time.get_time();
499
500        assert!(result2 >= result1);
501        assert!(result3 >= result2);
502        assert!(result4 >= result3);
503        assert!(result5 >= result4);
504        assert!(result1 > 1_650_000_000.0);
505    }
506
507    #[rstest]
508    fn test_unix_timestamp_ms_is_monotonic_increasing() {
509        let time = AtomicTime::new(true, UnixNanos::default());
510        let result1 = time.get_time_ms();
511        let result2 = time.get_time_ms();
512        let result3 = time.get_time_ms();
513        let result4 = time.get_time_ms();
514        let result5 = time.get_time_ms();
515
516        assert!(result2 >= result1);
517        assert!(result3 >= result2);
518        assert!(result4 >= result3);
519        assert!(result5 >= result4);
520        assert!(result1 >= 1_650_000_000_000);
521    }
522
523    #[rstest]
524    fn test_unix_timestamp_us_is_monotonic_increasing() {
525        let time = AtomicTime::new(true, UnixNanos::default());
526        let result1 = time.get_time_us();
527        let result2 = time.get_time_us();
528        let result3 = time.get_time_us();
529        let result4 = time.get_time_us();
530        let result5 = time.get_time_us();
531
532        assert!(result2 >= result1);
533        assert!(result3 >= result2);
534        assert!(result4 >= result3);
535        assert!(result5 >= result4);
536        assert!(result1 > 1_650_000_000_000_000);
537    }
538
539    #[rstest]
540    fn test_unix_timestamp_ns_is_monotonic_increasing() {
541        let time = AtomicTime::new(true, UnixNanos::default());
542        let result1 = time.get_time_ns();
543        let result2 = time.get_time_ns();
544        let result3 = time.get_time_ns();
545        let result4 = time.get_time_ns();
546        let result5 = time.get_time_ns();
547
548        assert!(result2 >= result1);
549        assert!(result3 >= result2);
550        assert!(result4 >= result3);
551        assert!(result5 >= result4);
552        assert!(result1.as_u64() > 1_650_000_000_000_000_000);
553    }
554}