nautilus_common/
timer.rs

1// -------------------------------------------------------------------------------------------------
2//  Copyright (C) 2015-2025 Posei Systems Pty Ltd. All rights reserved.
3//  https://poseitrader.io
4//
5//  Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
6//  You may not use this file except in compliance with the License.
7//  You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
8//
9//  Unless required by applicable law or agreed to in writing, software
10//  distributed under the License is distributed on an "AS IS" BASIS,
11//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12//  See the License for the specific language governing permissions and
13//  limitations under the License.
14// -------------------------------------------------------------------------------------------------
15
16//! Real-time and test timers for use with `Clock` implementations.
17
18#[rustfmt::skip]
19#[cfg(feature = "clock_v2")]
20use std::collections::BinaryHeap;
21
22#[rustfmt::skip]
23#[cfg(feature = "clock_v2")]
24use tokio::sync::Mutex;
25
26use std::{
27    cmp::Ordering,
28    fmt::{Debug, Display},
29    num::NonZeroU64,
30    rc::Rc,
31    sync::{
32        Arc,
33        atomic::{self, AtomicU64},
34    },
35};
36
37use nautilus_core::{
38    UUID4, UnixNanos,
39    correctness::{FAILED, check_valid_string},
40    datetime::floor_to_nearest_microsecond,
41    time::get_atomic_clock_realtime,
42};
43#[cfg(feature = "python")]
44use pyo3::{PyObject, Python};
45use tokio::{
46    task::JoinHandle,
47    time::{Duration, Instant},
48};
49use ustr::Ustr;
50
51use crate::runtime::get_runtime;
52
/// Creates a valid nanoseconds interval that is guaranteed to be positive.
///
/// A zero `interval_ns` is clamped up to one nanosecond; every other value is
/// returned unchanged. This function never panics.
#[must_use]
pub fn create_valid_interval(interval_ns: u64) -> NonZeroU64 {
    // `NonZeroU64::new` yields `None` only for zero, which is exactly the input
    // that must be clamped to the smallest valid interval.
    NonZeroU64::new(interval_ns).unwrap_or(NonZeroU64::MIN)
}
62
63#[repr(C)]
64#[derive(Clone, Debug)]
65#[cfg_attr(
66    feature = "python",
67    pyo3::pyclass(module = "posei_trader.core.nautilus_pyo3.common")
68)]
69/// Represents a time event occurring at the event timestamp.
70///
71/// A `TimeEvent` carries metadata such as the event's name, a unique event ID,
72/// and timestamps indicating when the event was scheduled to occur and when it was initialized.
73#[derive(Eq)]
74pub struct TimeEvent {
75    /// The event name, identifying the nature or purpose of the event.
76    pub name: Ustr,
77    /// The unique identifier for the event.
78    pub event_id: UUID4,
79    /// UNIX timestamp (nanoseconds) when the event occurred.
80    pub ts_event: UnixNanos,
81    /// UNIX timestamp (nanoseconds) when the instance was initialized.
82    pub ts_init: UnixNanos,
83}
84
85/// Reverse order for `TimeEvent` comparison to be used in max heap.
86impl PartialOrd for TimeEvent {
87    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
88        Some(self.cmp(other))
89    }
90}
91
92/// Reverse order for `TimeEvent` comparison to be used in max heap.
93impl Ord for TimeEvent {
94    fn cmp(&self, other: &Self) -> Ordering {
95        other.ts_event.cmp(&self.ts_event)
96    }
97}
98
impl TimeEvent {
    /// Creates a new [`TimeEvent`] instance.
    ///
    /// The arguments are stored as given; no validation is performed here, so the
    /// caller is responsible for passing a meaningful `name`.
    #[must_use]
    pub const fn new(name: Ustr, event_id: UUID4, ts_event: UnixNanos, ts_init: UnixNanos) -> Self {
        Self {
            name,
            event_id,
            ts_event,
            ts_init,
        }
    }
}
115
116impl Display for TimeEvent {
117    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
118        write!(
119            f,
120            "TimeEvent(name={}, event_id={}, ts_event={}, ts_init={})",
121            self.name, self.event_id, self.ts_event, self.ts_init
122        )
123    }
124}
125
126impl PartialEq for TimeEvent {
127    fn eq(&self, other: &Self) -> bool {
128        self.event_id == other.event_id
129    }
130}
131
/// The unsized closure type of a Rust time event callback: any `Fn` that takes a
/// [`TimeEvent`] by value and returns nothing.
pub type RustTimeEventCallback = dyn Fn(TimeEvent);
133
/// A callback that can be invoked with a [`TimeEvent`].
///
/// Cloning is cheap: both variants hold their callable behind a reference-counted pointer.
#[derive(Clone)]
pub enum TimeEventCallback {
    /// A Python callable (only available with the `python` feature).
    #[cfg(feature = "python")]
    Python(Arc<PyObject>),
    /// A Rust closure held behind an `Rc`.
    Rust(Rc<RustTimeEventCallback>),
}
140
141impl Debug for TimeEventCallback {
142    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
143        match self {
144            #[cfg(feature = "python")]
145            Self::Python(_) => f.write_str("Python callback"),
146            Self::Rust(_) => f.write_str("Rust callback"),
147        }
148    }
149}
150
151impl TimeEventCallback {
152    /// Invokes the callback for the given `TimeEvent`.
153    ///
154    /// # Panics
155    ///
156    /// Panics if the underlying Python callback invocation fails (e.g., raises an exception).
157    pub fn call(&self, event: TimeEvent) {
158        match self {
159            #[cfg(feature = "python")]
160            Self::Python(callback) => {
161                Python::with_gil(|py| {
162                    callback.call1(py, (event,)).unwrap();
163                });
164            }
165            Self::Rust(callback) => callback(event),
166        }
167    }
168}
169
170impl From<Rc<RustTimeEventCallback>> for TimeEventCallback {
171    fn from(value: Rc<RustTimeEventCallback>) -> Self {
172        Self::Rust(value)
173    }
174}
175
/// Wraps a Python callable as a [`TimeEventCallback::Python`], placing it behind an `Arc`.
#[cfg(feature = "python")]
impl From<PyObject> for TimeEventCallback {
    fn from(callable: PyObject) -> Self {
        let shared = Arc::new(callable);
        Self::Python(shared)
    }
}
182
// SAFETY: These impls are asserted manually; the compiler cannot verify them.
// The stated rationale is that message handlers are not sent across thread boundaries.
// NOTE(review): the `Rust` variant holds an `Rc`, which is neither `Send` nor `Sync`,
// so soundness relies on a `Rust` callback never being cloned, dropped, or invoked from
// more than one thread. `LiveTimer::start` moves a clone of its callback into a spawned
// task — confirm that only `Python` callbacks reach that path.
#[allow(unsafe_code)]
unsafe impl Send for TimeEventCallback {}
#[allow(unsafe_code)]
unsafe impl Sync for TimeEventCallback {}
188
/// Represents a time event and its associated handler.
///
/// `TimeEventHandlerV2` pairs a [`TimeEvent`] with the [`TimeEventCallback`] that
/// should receive it; the callback is invoked with the event when the handler is run.
#[repr(C)]
#[derive(Clone, Debug)]
pub struct TimeEventHandlerV2 {
    /// The time event.
    pub event: TimeEvent,
    /// The callable handler for the event.
    pub callback: TimeEventCallback,
}
201
202impl TimeEventHandlerV2 {
203    /// Creates a new [`TimeEventHandlerV2`] instance.
204    #[must_use]
205    pub const fn new(event: TimeEvent, callback: TimeEventCallback) -> Self {
206        Self { event, callback }
207    }
208
209    /// Executes the handler by invoking its callback for the associated event.
210    ///
211    /// # Panics
212    ///
213    /// Panics if the underlying callback invocation fails (e.g., a Python callback raises an exception).
214    pub fn run(self) {
215        let Self { event, callback } = self;
216        callback.call(event);
217    }
218}
219
220impl PartialOrd for TimeEventHandlerV2 {
221    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
222        Some(self.cmp(other))
223    }
224}
225
226impl PartialEq for TimeEventHandlerV2 {
227    fn eq(&self, other: &Self) -> bool {
228        self.event.ts_event == other.event.ts_event
229    }
230}
231
232impl Eq for TimeEventHandlerV2 {}
233
234impl Ord for TimeEventHandlerV2 {
235    fn cmp(&self, other: &Self) -> Ordering {
236        self.event.ts_event.cmp(&other.event.ts_event)
237    }
238}
239
/// A test timer for use with a `TestClock`.
///
/// `TestTimer` simulates time progression in a controlled environment,
/// allowing for precise control over event generation in test scenarios.
#[derive(Clone, Copy, Debug)]
pub struct TestTimer {
    /// The name of the timer.
    pub name: Ustr,
    /// The interval between timer events in nanoseconds.
    pub interval_ns: NonZeroU64,
    /// The start time of the timer in UNIX nanoseconds.
    pub start_time_ns: UnixNanos,
    /// The optional stop time of the timer in UNIX nanoseconds.
    pub stop_time_ns: Option<UnixNanos>,
    /// The time of the next event in UNIX nanoseconds; starts at
    /// `start_time_ns + interval_ns` and grows by `interval_ns` per event.
    next_time_ns: UnixNanos,
    /// Whether the timer has stopped producing events (stop time reached or cancelled).
    is_expired: bool,
}
257
258impl TestTimer {
259    /// Creates a new [`TestTimer`] instance.
260    ///
261    /// # Panics
262    ///
263    /// Panics if `name` is not a valid string.
264    #[must_use]
265    pub fn new(
266        name: Ustr,
267        interval_ns: NonZeroU64,
268        start_time_ns: UnixNanos,
269        stop_time_ns: Option<UnixNanos>,
270    ) -> Self {
271        check_valid_string(name, stringify!(name)).expect(FAILED);
272
273        Self {
274            name,
275            interval_ns,
276            start_time_ns,
277            stop_time_ns,
278            next_time_ns: start_time_ns + interval_ns.get(),
279            is_expired: false,
280        }
281    }
282
283    /// Returns the next time in UNIX nanoseconds when the timer will fire.
284    #[must_use]
285    pub const fn next_time_ns(&self) -> UnixNanos {
286        self.next_time_ns
287    }
288
289    /// Returns whether the timer is expired.
290    #[must_use]
291    pub const fn is_expired(&self) -> bool {
292        self.is_expired
293    }
294
295    #[must_use]
296    pub const fn pop_event(&self, event_id: UUID4, ts_init: UnixNanos) -> TimeEvent {
297        TimeEvent {
298            name: self.name,
299            event_id,
300            ts_event: self.next_time_ns,
301            ts_init,
302        }
303    }
304
305    /// Advance the test timer forward to the given time, generating a sequence
306    /// of events. A [`TimeEvent`] is appended for each time a next event is
307    /// <= the given `to_time_ns`.
308    ///
309    /// This allows testing of multiple time intervals within a single step.
310    pub fn advance(&mut self, to_time_ns: UnixNanos) -> impl Iterator<Item = TimeEvent> + '_ {
311        let advances = to_time_ns
312            .saturating_sub(self.next_time_ns.as_u64() - self.interval_ns.get())
313            / self.interval_ns.get();
314        self.take(advances as usize).map(|(event, _)| event)
315    }
316
317    /// Cancels the timer (the timer will not generate an event).
318    ///
319    /// Used to stop the timer before its scheduled stop time.
320    pub const fn cancel(&mut self) {
321        self.is_expired = true;
322    }
323}
324
325impl Iterator for TestTimer {
326    type Item = (TimeEvent, UnixNanos);
327
328    fn next(&mut self) -> Option<Self::Item> {
329        if self.is_expired {
330            None
331        } else {
332            let item = (
333                TimeEvent {
334                    name: self.name,
335                    event_id: UUID4::new(),
336                    ts_event: self.next_time_ns,
337                    ts_init: self.next_time_ns,
338                },
339                self.next_time_ns,
340            );
341
342            // If current next event time has exceeded stop time, then expire timer
343            if let Some(stop_time_ns) = self.stop_time_ns {
344                if self.next_time_ns >= stop_time_ns {
345                    self.is_expired = true;
346                }
347            }
348
349            self.next_time_ns += self.interval_ns;
350
351            Some(item)
352        }
353    }
354}
355
/// A live timer for use with a `LiveClock`.
///
/// `LiveTimer` triggers events at specified intervals in a real-time environment,
/// using Tokio's async runtime to handle scheduling and execution.
#[derive(Debug)]
pub struct LiveTimer {
    /// The name of the timer.
    pub name: Ustr,
    /// The interval between timer events in nanoseconds.
    pub interval_ns: NonZeroU64,
    /// The start time of the timer in UNIX nanoseconds.
    pub start_time_ns: UnixNanos,
    /// The optional stop time of the timer in UNIX nanoseconds.
    pub stop_time_ns: Option<UnixNanos>,
    /// The next scheduled event time in UNIX nanoseconds, shared with the spawned timer task.
    next_time_ns: Arc<AtomicU64>,
    /// The callback associated with this timer.
    callback: TimeEventCallback,
    /// Handle of the spawned timer task; `None` until the timer has been started.
    task_handle: Option<JoinHandle<()>>,
    /// Shared heap onto which the timer task pushes generated events (`clock_v2` only).
    #[cfg(feature = "clock_v2")]
    heap: Arc<Mutex<BinaryHeap<TimeEvent>>>,
}
376
impl LiveTimer {
    /// Creates a new [`LiveTimer`] instance.
    ///
    /// The timer is created idle: no task is spawned until [`LiveTimer::start`] is called.
    /// The first event is scheduled for `start_time_ns + interval_ns`.
    ///
    /// # Panics
    ///
    /// Panics if `name` is not a valid string.
    #[must_use]
    #[cfg(not(feature = "clock_v2"))]
    pub fn new(
        name: Ustr,
        interval_ns: NonZeroU64,
        start_time_ns: UnixNanos,
        stop_time_ns: Option<UnixNanos>,
        callback: TimeEventCallback,
    ) -> Self {
        check_valid_string(name, stringify!(name)).expect(FAILED);

        log::debug!("Creating timer '{name}'");
        Self {
            name,
            interval_ns,
            start_time_ns,
            stop_time_ns,
            next_time_ns: Arc::new(AtomicU64::new(start_time_ns.as_u64() + interval_ns.get())),
            callback,
            task_handle: None,
        }
    }

    /// Creates a new [`LiveTimer`] instance.
    ///
    /// The timer is created idle: no task is spawned until [`LiveTimer::start`] is called.
    /// The first event is scheduled for `start_time_ns + interval_ns`. Events generated
    /// by the running timer are pushed onto the shared `heap`.
    ///
    /// # Panics
    ///
    /// Panics if `name` is not a valid string.
    #[must_use]
    #[cfg(feature = "clock_v2")]
    pub fn new(
        name: Ustr,
        interval_ns: NonZeroU64,
        start_time_ns: UnixNanos,
        stop_time_ns: Option<UnixNanos>,
        callback: TimeEventCallback,
        heap: Arc<Mutex<BinaryHeap<TimeEvent>>>,
    ) -> Self {
        check_valid_string(name, stringify!(name)).expect(FAILED);

        log::debug!("Creating timer '{name}'");
        Self {
            name,
            interval_ns,
            start_time_ns,
            stop_time_ns,
            next_time_ns: Arc::new(AtomicU64::new(start_time_ns.as_u64() + interval_ns.get())),
            callback,
            heap,
            task_handle: None,
        }
    }

    /// Returns the next time in UNIX nanoseconds when the timer will fire.
    ///
    /// Provides the scheduled time for the next event based on the current state of the timer.
    /// The value is read from the atomic that the running timer task updates after each event.
    #[must_use]
    pub fn next_time_ns(&self) -> UnixNanos {
        UnixNanos::from(self.next_time_ns.load(atomic::Ordering::SeqCst))
    }

    /// Returns whether the timer is expired.
    ///
    /// An expired timer will not trigger any further events.
    /// A timer that has not been started is not expired.
    /// Once started, the timer counts as expired when its spawned task has finished.
    #[must_use]
    pub fn is_expired(&self) -> bool {
        self.task_handle
            .as_ref()
            .is_some_and(tokio::task::JoinHandle::is_finished)
    }

    /// Starts the timer.
    ///
    /// Spawns a task on the shared runtime that ticks every `interval_ns`. On each tick:
    /// with the `python` feature a `Python` callback is invoked with a new event, and with
    /// the `clock_v2` feature a new [`TimeEvent`] is pushed onto the shared heap. A `Rust`
    /// callback is not invoked by this task.
    ///
    /// If the next scheduled time is already in the past it is moved to the current time,
    /// so the first event fires immediately. The task ends once the stop time (if any) is
    /// reached.
    ///
    /// NOTE(review): the new task handle replaces any previously stored handle without
    /// aborting the earlier task, so calling `start` twice leaves the first task running
    /// with no way to cancel it through this timer.
    #[allow(unused_variables)] // `callback` is only read inside the `python` feature block
    pub fn start(&mut self) {
        let event_name = self.name;
        let stop_time_ns = self.stop_time_ns;
        let interval_ns = self.interval_ns.get();
        let callback = self.callback.clone();

        // Get current time
        let clock = get_atomic_clock_realtime();
        let now_ns = clock.get_time_ns();

        // Check if the timer's alert time is in the past and adjust if needed
        let mut next_time_ns = self.next_time_ns.load(atomic::Ordering::SeqCst);
        if next_time_ns <= now_ns {
            log::warn!(
                "Timer '{}' alert time {} was in the past, adjusted to current time for immediate fire",
                event_name,
                next_time_ns,
            );
            next_time_ns = now_ns.into();
            // Keep the externally visible next time in step with the adjustment
            self.next_time_ns
                .store(now_ns.as_u64(), atomic::Ordering::SeqCst);
        }

        // Floor the next time to the nearest microsecond which is within the timer's accuracy
        let mut next_time_ns = UnixNanos::from(floor_to_nearest_microsecond(next_time_ns));
        // Shared handle so the task can publish each newly scheduled time
        let next_time_atomic = self.next_time_ns.clone();

        #[cfg(feature = "clock_v2")]
        let heap = self.heap.clone();

        let rt = get_runtime();
        let handle = rt.spawn(async move {
            let clock = get_atomic_clock_realtime();

            // Begin 1 millisecond early to account for the overhead of initializing a tokio
            // timer; the delay saturates at zero when the event is due within that window
            let overhead = Duration::from_millis(1);
            let delay_ns = next_time_ns.saturating_sub(now_ns.as_u64());
            let delay = Duration::from_nanos(delay_ns).saturating_sub(overhead);
            let start = Instant::now() + delay;

            let mut timer = tokio::time::interval_at(start, Duration::from_nanos(interval_ns));

            loop {
                // Wait for the next tick. The timer is cancelled by aborting this task
                // (see `cancel`), which takes effect at an `.await` point such as this one.
                timer.tick().await;
                let now_ns = clock.get_time_ns();

                #[cfg(feature = "python")]
                {
                    match callback {
                        TimeEventCallback::Python(ref callback) => {
                            call_python_with_time_event(event_name, next_time_ns, now_ns, callback);
                        }
                        // Note: Clock v1 style path should not be called with Rust callback
                        TimeEventCallback::Rust(_) => {}
                    }
                }

                #[cfg(feature = "clock_v2")]
                {
                    // `ts_event` is the scheduled time, `ts_init` the actual clock reading
                    let event = TimeEvent::new(event_name, UUID4::new(), next_time_ns, now_ns);
                    heap.lock().await.push(event);
                }

                // Prepare next time interval
                next_time_ns += interval_ns;
                next_time_atomic.store(next_time_ns.as_u64(), atomic::Ordering::SeqCst);

                // Check if expired: stop as soon as either the next scheduled time or the
                // current clock reading has reached the stop time
                if let Some(stop_time_ns) = stop_time_ns {
                    if std::cmp::max(next_time_ns, now_ns) >= stop_time_ns {
                        break; // Timer expired
                    }
                }
            }
        });

        self.task_handle = Some(handle);
    }

    /// Cancels the timer.
    ///
    /// The timer will not generate a final event.
    /// Aborts the spawned task if the timer has been started; otherwise only logs.
    pub fn cancel(&mut self) {
        log::debug!("Cancel timer '{}'", self.name);
        if let Some(ref handle) = self.task_handle {
            handle.abort();
        }
    }
}
551
/// Builds a new [`TimeEvent`] and passes it, wrapped in a `PyCapsule`, to a Python callback.
///
/// An error raised by the callback is logged rather than propagated.
///
/// # Panics
///
/// Panics if the `PyCapsule` cannot be created.
#[cfg(feature = "python")]
fn call_python_with_time_event(
    name: Ustr,
    ts_event: UnixNanos,
    ts_init: UnixNanos,
    callback: &PyObject,
) {
    use nautilus_core::python::IntoPyObjectPoseiExt;
    use pyo3::types::PyCapsule;

    Python::with_gil(|py| {
        // Each invocation gets a fresh event with its own ID
        let time_event = TimeEvent::new(name, UUID4::new(), ts_event, ts_init);
        let py_capsule: PyObject = PyCapsule::new(py, time_event, None)
            .expect("Error creating `PyCapsule`")
            .into_py_any_unwrap(py);

        if let Err(e) = callback.call1(py, (py_capsule,)) {
            tracing::error!("Error on callback: {e:?}");
        }
    });
}
575
576////////////////////////////////////////////////////////////////////////////////
577// Tests
578////////////////////////////////////////////////////////////////////////////////
#[cfg(test)]
mod tests {
    use std::num::NonZeroU64;

    use nautilus_core::UnixNanos;
    use rstest::*;
    use ustr::Ustr;

    use super::{TestTimer, TimeEvent};

    /// Builds a `TestTimer` named `TEST_TIMER` with the given schedule.
    fn test_timer(
        interval_ns: u64,
        start_time_ns: UnixNanos,
        stop_time_ns: Option<UnixNanos>,
    ) -> TestTimer {
        TestTimer::new(
            Ustr::from("TEST_TIMER"),
            NonZeroU64::new(interval_ns).unwrap(),
            start_time_ns,
            stop_time_ns,
        )
    }

    // Iteration yields events until the timer is marked expired.
    #[rstest]
    fn test_test_timer_pop_event() {
        let mut timer = test_timer(1, UnixNanos::from(1), None);

        assert!(timer.next().is_some());
        assert!(timer.next().is_some());
        timer.is_expired = true;
        assert!(timer.next().is_none());
    }

    // Advancing to times before the first scheduled event yields nothing.
    #[rstest]
    fn test_test_timer_advance_within_next_time_ns() {
        let mut timer = test_timer(5, UnixNanos::default(), None);

        for to_time_ns in 1..=3_u64 {
            let _: Vec<TimeEvent> = timer.advance(UnixNanos::from(to_time_ns)).collect();
        }

        assert_eq!(timer.advance(UnixNanos::from(4)).count(), 0);
        assert_eq!(timer.next_time_ns, 5);
        assert!(!timer.is_expired);
    }

    #[rstest]
    fn test_test_timer_advance_up_to_next_time_ns() {
        let mut timer = test_timer(1, UnixNanos::default(), None);

        assert_eq!(timer.advance(UnixNanos::from(1)).count(), 1);
        assert!(!timer.is_expired);
    }

    #[rstest]
    fn test_test_timer_advance_up_to_next_time_ns_with_stop_time() {
        let mut timer = test_timer(1, UnixNanos::default(), Some(UnixNanos::from(2)));

        assert_eq!(timer.advance(UnixNanos::from(2)).count(), 2);
        assert!(timer.is_expired);
    }

    #[rstest]
    fn test_test_timer_advance_beyond_next_time_ns() {
        let mut timer = test_timer(1, UnixNanos::default(), Some(UnixNanos::from(5)));

        assert_eq!(timer.advance(UnixNanos::from(5)).count(), 5);
        assert!(timer.is_expired);
    }

    // Advancing past the stop time yields only the events up to the stop time.
    #[rstest]
    fn test_test_timer_advance_beyond_stop_time() {
        let mut timer = test_timer(1, UnixNanos::default(), Some(UnixNanos::from(5)));

        assert_eq!(timer.advance(UnixNanos::from(10)).count(), 5);
        assert!(timer.is_expired);
    }

    // An event scheduled exactly at the target time is included.
    #[rstest]
    fn test_test_timer_advance_exact_boundary() {
        let mut timer = test_timer(5, UnixNanos::from(0), None);

        let first: Vec<TimeEvent> = timer.advance(UnixNanos::from(5)).collect();
        assert_eq!(first.len(), 1, "Expected one event at the 5 ns boundary");

        let second: Vec<TimeEvent> = timer.advance(UnixNanos::from(10)).collect();
        assert_eq!(second.len(), 1, "Expected one event at the 10 ns boundary");
    }
}