nautilus_common/
clock.rs

1// -------------------------------------------------------------------------------------------------
2//  Copyright (C) 2015-2025 Posei Systems Pty Ltd. All rights reserved.
3//  https://poseitrader.io
4//
5//  Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
6//  You may not use this file except in compliance with the License.
7//  You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
8//
9//  Unless required by applicable law or agreed to in writing, software
10//  distributed under the License is distributed on an "AS IS" BASIS,
11//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12//  See the License for the specific language governing permissions and
13//  limitations under the License.
14// -------------------------------------------------------------------------------------------------
15
16//! Real-time and static test `Clock` implementations.
17
18use std::{
19    collections::{BTreeMap, BinaryHeap, HashMap},
20    fmt::Debug,
21    ops::Deref,
22    pin::Pin,
23    sync::Arc,
24    task::{Context, Poll},
25};
26
27use chrono::{DateTime, Utc};
28use futures::Stream;
29use nautilus_core::{
30    AtomicTime, UnixNanos,
31    correctness::{check_positive_u64, check_predicate_true, check_valid_string},
32    time::get_atomic_clock_realtime,
33};
34use tokio::sync::Mutex;
35use ustr::Ustr;
36
37use crate::timer::{
38    LiveTimer, TestTimer, TimeEvent, TimeEventCallback, TimeEventHandlerV2, create_valid_interval,
39};
40
41/// Represents a type of clock.
42///
43/// # Notes
44///
45/// An active timer is one which has not expired (`timer.is_expired == False`).
46pub trait Clock: Debug {
47    /// Returns the current date and time as a timezone-aware `DateTime<UTC>`.
48    fn utc_now(&self) -> DateTime<Utc> {
49        DateTime::from_timestamp_nanos(self.timestamp_ns().as_i64())
50    }
51
52    /// Returns the current UNIX timestamp in nanoseconds (ns).
53    fn timestamp_ns(&self) -> UnixNanos;
54
55    /// Returns the current UNIX timestamp in microseconds (μs).
56    fn timestamp_us(&self) -> u64;
57
58    /// Returns the current UNIX timestamp in milliseconds (ms).
59    fn timestamp_ms(&self) -> u64;
60
61    /// Returns the current UNIX timestamp in seconds.
62    fn timestamp(&self) -> f64;
63
64    /// Returns the names of active timers in the clock.
65    fn timer_names(&self) -> Vec<&str>;
66
67    /// Returns the count of active timers in the clock.
68    fn timer_count(&self) -> usize;
69
70    /// Register a default event handler for the clock. If a `Timer`
71    /// does not have an event handler, then this handler is used.
72    fn register_default_handler(&mut self, callback: TimeEventCallback);
73
74    /// Get handler for [`TimeEvent`]
75    ///
76    /// Note: Panics if the event does not have an associated handler
77    fn get_handler(&self, event: TimeEvent) -> TimeEventHandlerV2;
78
79    /// Set a `Timer` to alert at a particular time. Optional
80    /// callback gets used to handle generated events.
81    /// # Errors
82    ///
83    /// Returns an error if `name` is invalid, `alert_time_ns` is non-positive when not allowed,
84    /// or any predicate check fails.
85    fn set_time_alert_ns(
86        &mut self,
87        name: &str,
88        alert_time_ns: UnixNanos,
89        callback: Option<TimeEventCallback>,
90        allow_past: Option<bool>,
91    ) -> anyhow::Result<()>;
92
93    /// Set a `Timer` to start alerting at every interval
94    /// between start and stop time. Optional callback gets
95    /// used to handle generated event.
96    /// # Errors
97    ///
98    /// Returns an error if `name` is invalid, `interval_ns` is not positive,
99    /// or if any predicate check fails.
100    fn set_timer_ns(
101        &mut self,
102        name: &str,
103        interval_ns: u64,
104        start_time_ns: UnixNanos,
105        stop_time_ns: Option<UnixNanos>,
106        callback: Option<TimeEventCallback>,
107        allow_past: Option<bool>,
108    ) -> anyhow::Result<()>;
109
110    /// Returns the time interval in which the timer `name` is triggered.
111    ///
112    /// If the timer doesn't exist `None` is returned.
113    fn next_time_ns(&self, name: &str) -> Option<UnixNanos>;
114
115    /// Cancels the timer with `name`.
116    fn cancel_timer(&mut self, name: &str);
117
118    /// Cancels all timers.
119    fn cancel_timers(&mut self);
120
121    /// Resets the clock by clearing it's internal state.
122    fn reset(&mut self);
123}
124
125/// A static test clock.
126///
127/// Stores the current timestamp internally which can be advanced.
128#[derive(Debug)]
129pub struct TestClock {
130    time: AtomicTime,
131    // Use btree map to ensure stable ordering when scanning for timers in `advance_time`
132    timers: BTreeMap<Ustr, TestTimer>,
133    default_callback: Option<TimeEventCallback>,
134    callbacks: HashMap<Ustr, TimeEventCallback>,
135    heap: BinaryHeap<TimeEvent>,
136}
137
138impl TestClock {
139    /// Creates a new [`TestClock`] instance.
140    #[must_use]
141    pub fn new() -> Self {
142        Self {
143            time: AtomicTime::new(false, UnixNanos::default()),
144            timers: BTreeMap::new(),
145            default_callback: None,
146            callbacks: HashMap::new(),
147            heap: BinaryHeap::new(),
148        }
149    }
150
151    /// Returns a reference to the internal timers for the clock.
152    #[must_use]
153    pub const fn get_timers(&self) -> &BTreeMap<Ustr, TestTimer> {
154        &self.timers
155    }
156
157    /// Advances the internal clock to the specified `to_time_ns` and optionally sets the clock to that time.
158    ///
159    /// This function ensures that the clock behaves in a non-decreasing manner. If `set_time` is `true`,
160    /// the internal clock will be updated to the value of `to_time_ns`. Otherwise, the clock will advance
161    /// without explicitly setting the time.
162    ///
163    /// The method processes active timers, advancing them to `to_time_ns`, and collects any `TimeEvent`
164    /// objects that are triggered as a result. Only timers that are not expired are processed.
165    ///
166    /// # Panics
167    ///
168    /// Panics if `to_time_ns` is less than the current internal clock time.
169    pub fn advance_time(&mut self, to_time_ns: UnixNanos, set_time: bool) -> Vec<TimeEvent> {
170        // Time should be non-decreasing
171        assert!(
172            to_time_ns >= self.time.get_time_ns(),
173            "`to_time_ns` {to_time_ns} was < `self.time.get_time_ns()` {}",
174            self.time.get_time_ns()
175        );
176
177        if set_time {
178            self.time.set_time(to_time_ns);
179        }
180
181        // Iterate and advance timers and collect events. Only retain alive timers.
182        let mut events: Vec<TimeEvent> = Vec::new();
183        self.timers.retain(|_, timer| {
184            timer.advance(to_time_ns).for_each(|event| {
185                events.push(event);
186            });
187
188            !timer.is_expired()
189        });
190
191        events.sort_by(|a, b| a.ts_event.cmp(&b.ts_event));
192        events
193    }
194
195    /// Advances the internal clock to the specified `to_time_ns` and optionally sets the clock to that time.
196    ///
197    /// Pushes the [`TimeEvent`]s on the heap to ensure ordering
198    ///
199    /// Note: `set_time` is not used but present to keep backward compatible api call
200    ///
201    /// # Panics
202    ///
203    /// Panics if `to_time_ns` is less than the current internal clock time.
204    pub fn advance_to_time_on_heap(&mut self, to_time_ns: UnixNanos) {
205        // Time should be non-decreasing
206        assert!(
207            to_time_ns >= self.time.get_time_ns(),
208            "`to_time_ns` {to_time_ns} was < `self.time.get_time_ns()` {}",
209            self.time.get_time_ns()
210        );
211
212        self.time.set_time(to_time_ns);
213
214        // Iterate and advance timers and push events to heap. Only retain alive timers.
215        self.timers.retain(|_, timer| {
216            timer.advance(to_time_ns).for_each(|event| {
217                self.heap.push(event);
218            });
219
220            !timer.is_expired()
221        });
222    }
223
224    /// Matches `TimeEvent` objects with their corresponding event handlers.
225    ///
226    /// This function takes an `events` vector of `TimeEvent` objects, assumes they are already sorted
227    /// by their `ts_event`, and matches them with the appropriate callback handler from the internal
228    /// registry of callbacks. If no specific callback is found for an event, the default callback is used.
229    ///
230    /// # Panics
231    ///
232    /// Panics if the default callback is not set for the clock when matching handlers.
233    #[must_use]
234    pub fn match_handlers(&self, events: Vec<TimeEvent>) -> Vec<TimeEventHandlerV2> {
235        events
236            .into_iter()
237            .map(|event| {
238                let callback = self.callbacks.get(&event.name).cloned().unwrap_or_else(|| {
239                    // If callback_py is None, use the default_callback_py
240                    // TODO: clone for now
241                    self.default_callback
242                        .clone()
243                        .expect("Default callback should exist")
244                });
245                TimeEventHandlerV2::new(event, callback)
246            })
247            .collect()
248    }
249}
250
251impl Iterator for TestClock {
252    type Item = TimeEventHandlerV2;
253
254    fn next(&mut self) -> Option<Self::Item> {
255        self.heap.pop().map(|event| self.get_handler(event))
256    }
257}
258
259impl Default for TestClock {
260    /// Creates a new default [`TestClock`] instance.
261    fn default() -> Self {
262        Self::new()
263    }
264}
265
266impl Deref for TestClock {
267    type Target = AtomicTime;
268
269    fn deref(&self) -> &Self::Target {
270        &self.time
271    }
272}
273
274impl Clock for TestClock {
275    fn timestamp_ns(&self) -> UnixNanos {
276        self.time.get_time_ns()
277    }
278
279    fn timestamp_us(&self) -> u64 {
280        self.time.get_time_us()
281    }
282
283    fn timestamp_ms(&self) -> u64 {
284        self.time.get_time_ms()
285    }
286
287    fn timestamp(&self) -> f64 {
288        self.time.get_time()
289    }
290
291    fn timer_names(&self) -> Vec<&str> {
292        self.timers
293            .iter()
294            .filter(|(_, timer)| !timer.is_expired())
295            .map(|(k, _)| k.as_str())
296            .collect()
297    }
298
299    fn timer_count(&self) -> usize {
300        self.timers
301            .iter()
302            .filter(|(_, timer)| !timer.is_expired())
303            .count()
304    }
305
306    fn register_default_handler(&mut self, callback: TimeEventCallback) {
307        self.default_callback = Some(callback);
308    }
309
310    /// Returns the handler for the given `TimeEvent`.
311    ///
312    /// # Panics
313    ///
314    /// Panics if no event-specific or default callback has been registered for the event.
315    fn get_handler(&self, event: TimeEvent) -> TimeEventHandlerV2 {
316        // Get the callback from either the event-specific callbacks or default callback
317        let callback = self
318            .callbacks
319            .get(&event.name)
320            .cloned()
321            .or_else(|| self.default_callback.clone())
322            .unwrap_or_else(|| panic!("Event '{}' should have associated handler", event.name));
323
324        TimeEventHandlerV2::new(event, callback)
325    }
326
327    fn set_time_alert_ns(
328        &mut self,
329        name: &str,
330        mut alert_time_ns: UnixNanos, // mut allows adjustment based on allow_past
331        callback: Option<TimeEventCallback>,
332        allow_past: Option<bool>,
333    ) -> anyhow::Result<()> {
334        check_valid_string(name, stringify!(name))?;
335
336        let name = Ustr::from(name);
337        let allow_past = allow_past.unwrap_or(true);
338
339        check_predicate_true(
340            callback.is_some()
341                | self.callbacks.contains_key(&name)
342                | self.default_callback.is_some(),
343            "No callbacks provided",
344        )?;
345
346        match callback {
347            Some(callback_py) => self.callbacks.insert(name, callback_py),
348            None => None,
349        };
350
351        // This allows to reuse a time alert without updating the callback, for example for non regular monthly alerts
352        self.cancel_timer(name.as_str());
353
354        let ts_now = self.get_time_ns();
355
356        if alert_time_ns < ts_now {
357            if allow_past {
358                alert_time_ns = ts_now;
359                log::warn!(
360                    "Timer '{name}' alert time {} was in the past, adjusted to current time for immediate firing",
361                    alert_time_ns.to_rfc3339(),
362                );
363            } else {
364                anyhow::bail!(
365                    "Timer '{name}' alert time {} was in the past (current time is {})",
366                    alert_time_ns.to_rfc3339(),
367                    ts_now.to_rfc3339(),
368                );
369            }
370        }
371
372        // Safe to calculate interval now that we've ensured alert_time_ns >= ts_now
373        let interval_ns = create_valid_interval((alert_time_ns - ts_now).into());
374
375        let timer = TestTimer::new(name, interval_ns, ts_now, Some(alert_time_ns));
376        self.timers.insert(name, timer);
377
378        Ok(())
379    }
380
381    fn set_timer_ns(
382        &mut self,
383        name: &str,
384        interval_ns: u64,
385        start_time_ns: UnixNanos,
386        stop_time_ns: Option<UnixNanos>,
387        callback: Option<TimeEventCallback>,
388        allow_past: Option<bool>,
389    ) -> anyhow::Result<()> {
390        check_valid_string(name, stringify!(name))?;
391        check_positive_u64(interval_ns, stringify!(interval_ns))?;
392        check_predicate_true(
393            callback.is_some() | self.default_callback.is_some(),
394            "No callbacks provided",
395        )?;
396
397        let name = Ustr::from(name);
398        let allow_past = allow_past.unwrap_or(true);
399
400        match callback {
401            Some(callback_py) => self.callbacks.insert(name, callback_py),
402            None => None,
403        };
404
405        let mut start_time_ns = start_time_ns;
406        let ts_now = self.get_time_ns();
407
408        if start_time_ns == 0 {
409            // Zero start time indicates no explicit start; we use the current time
410            start_time_ns = self.timestamp_ns();
411        } else if start_time_ns < ts_now && !allow_past {
412            anyhow::bail!(
413                "Timer '{name}' start time {} was in the past (current time is {})",
414                start_time_ns.to_rfc3339(),
415                ts_now.to_rfc3339(),
416            );
417        }
418
419        if let Some(stop_time) = stop_time_ns {
420            if stop_time <= start_time_ns {
421                anyhow::bail!(
422                    "Timer '{name}' stop time {} must be after start time {}",
423                    stop_time.to_rfc3339(),
424                    start_time_ns.to_rfc3339(),
425                );
426            }
427        }
428
429        let interval_ns = create_valid_interval(interval_ns);
430
431        let timer = TestTimer::new(name, interval_ns, start_time_ns, stop_time_ns);
432        self.timers.insert(name, timer);
433
434        Ok(())
435    }
436
437    fn next_time_ns(&self, name: &str) -> Option<UnixNanos> {
438        self.timers
439            .get(&Ustr::from(name))
440            .map(|timer| timer.next_time_ns())
441    }
442
443    fn cancel_timer(&mut self, name: &str) {
444        let timer = self.timers.remove(&Ustr::from(name));
445        if let Some(mut timer) = timer {
446            timer.cancel();
447        }
448    }
449
450    fn cancel_timers(&mut self) {
451        for timer in &mut self.timers.values_mut() {
452            timer.cancel();
453        }
454
455        self.timers.clear();
456    }
457
458    fn reset(&mut self) {
459        self.time = AtomicTime::new(false, UnixNanos::default());
460        self.timers = BTreeMap::new();
461        self.heap = BinaryHeap::new();
462        self.callbacks = HashMap::new();
463    }
464}
465
466/// A real-time clock which uses system time.
467///
468/// Timestamps are guaranteed to be unique and monotonically increasing.
469#[derive(Debug)]
470pub struct LiveClock {
471    time: &'static AtomicTime,
472    timers: HashMap<Ustr, LiveTimer>,
473    default_callback: Option<TimeEventCallback>,
474    pub heap: Arc<Mutex<BinaryHeap<TimeEvent>>>,
475    #[allow(dead_code)]
476    callbacks: HashMap<Ustr, TimeEventCallback>,
477}
478
479impl LiveClock {
480    /// Creates a new [`LiveClock`] instance.
481    #[must_use]
482    pub fn new() -> Self {
483        Self {
484            time: get_atomic_clock_realtime(),
485            timers: HashMap::new(),
486            default_callback: None,
487            heap: Arc::new(Mutex::new(BinaryHeap::new())),
488            callbacks: HashMap::new(),
489        }
490    }
491
492    #[must_use]
493    pub fn get_event_stream(&self) -> TimeEventStream {
494        TimeEventStream::new(self.heap.clone())
495    }
496
497    #[must_use]
498    pub const fn get_timers(&self) -> &HashMap<Ustr, LiveTimer> {
499        &self.timers
500    }
501
502    // Clean up expired timers. Retain only live ones
503    fn clear_expired_timers(&mut self) {
504        self.timers.retain(|_, timer| !timer.is_expired());
505    }
506}
507
508impl Default for LiveClock {
509    /// Creates a new default [`LiveClock`] instance.
510    fn default() -> Self {
511        Self::new()
512    }
513}
514
515impl Deref for LiveClock {
516    type Target = AtomicTime;
517
518    fn deref(&self) -> &Self::Target {
519        self.time
520    }
521}
522
523impl Clock for LiveClock {
524    fn timestamp_ns(&self) -> UnixNanos {
525        self.time.get_time_ns()
526    }
527
528    fn timestamp_us(&self) -> u64 {
529        self.time.get_time_us()
530    }
531
532    fn timestamp_ms(&self) -> u64 {
533        self.time.get_time_ms()
534    }
535
536    fn timestamp(&self) -> f64 {
537        self.time.get_time()
538    }
539
540    fn timer_names(&self) -> Vec<&str> {
541        self.timers
542            .iter()
543            .filter(|(_, timer)| !timer.is_expired())
544            .map(|(k, _)| k.as_str())
545            .collect()
546    }
547
548    fn timer_count(&self) -> usize {
549        self.timers
550            .iter()
551            .filter(|(_, timer)| !timer.is_expired())
552            .count()
553    }
554
555    fn register_default_handler(&mut self, handler: TimeEventCallback) {
556        self.default_callback = Some(handler);
557    }
558
559    /// # Panics
560    ///
561    /// This function panics if:
562    /// - The `clock_v2` feature is not enabled.
563    /// - The event does not have an associated handler (see trait documentation).
564    #[allow(unused_variables)]
565    fn get_handler(&self, event: TimeEvent) -> TimeEventHandlerV2 {
566        #[cfg(not(feature = "clock_v2"))]
567        panic!("Cannot get live clock handler without 'clock_v2' feature");
568
569        // Get the callback from either the event-specific callbacks or default callback
570        #[cfg(feature = "clock_v2")]
571        {
572            let callback = self
573                .callbacks
574                .get(&event.name)
575                .cloned()
576                .or_else(|| self.default_callback.clone())
577                .unwrap_or_else(|| panic!("Event '{}' should have associated handler", event.name));
578
579            TimeEventHandlerV2::new(event, callback)
580        }
581    }
582
583    fn set_time_alert_ns(
584        &mut self,
585        name: &str,
586        mut alert_time_ns: UnixNanos, // mut allows adjustment based on allow_past
587        callback: Option<TimeEventCallback>,
588        allow_past: Option<bool>,
589    ) -> anyhow::Result<()> {
590        check_valid_string(name, stringify!(name))?;
591
592        let name = Ustr::from(name);
593        let allow_past = allow_past.unwrap_or(true);
594
595        check_predicate_true(
596            callback.is_some()
597                | self.callbacks.contains_key(&name)
598                | self.default_callback.is_some(),
599            "No callbacks provided",
600        )?;
601
602        #[cfg(feature = "clock_v2")]
603        {
604            match callback.clone() {
605                Some(callback) => self.callbacks.insert(name, callback),
606                None => None,
607            };
608        }
609
610        let callback = match callback {
611            Some(callback) => callback,
612            None => {
613                if self.callbacks.contains_key(&name) {
614                    self.callbacks.get(&name).unwrap().clone()
615                } else {
616                    self.default_callback.clone().unwrap()
617                }
618            }
619        };
620
621        // This allows to reuse a time alert without updating the callback, for example for non regular monthly alerts
622        self.cancel_timer(name.as_str());
623
624        let ts_now = self.get_time_ns();
625
626        // Handle past timestamps based on flag
627        if alert_time_ns < ts_now {
628            if allow_past {
629                alert_time_ns = ts_now;
630                log::warn!(
631                    "Timer '{name}' alert time {} was in the past, adjusted to current time for immediate firing",
632                    alert_time_ns.to_rfc3339(),
633                );
634            } else {
635                anyhow::bail!(
636                    "Timer '{name}' alert time {} was in the past (current time is {})",
637                    alert_time_ns.to_rfc3339(),
638                    ts_now.to_rfc3339(),
639                );
640            }
641        }
642
643        // Safe to calculate interval now that we've ensured alert_time_ns >= ts_now
644        let interval_ns = create_valid_interval((alert_time_ns - ts_now).into());
645
646        #[cfg(not(feature = "clock_v2"))]
647        let mut timer = LiveTimer::new(name, interval_ns, ts_now, Some(alert_time_ns), callback);
648
649        #[cfg(feature = "clock_v2")]
650        let mut timer = LiveTimer::new(
651            name,
652            interval_ns,
653            ts_now,
654            Some(alert_time_ns),
655            callback,
656            self.heap.clone(),
657        );
658
659        timer.start();
660
661        self.clear_expired_timers();
662        self.timers.insert(name, timer);
663
664        Ok(())
665    }
666
667    fn set_timer_ns(
668        &mut self,
669        name: &str,
670        interval_ns: u64,
671        start_time_ns: UnixNanos,
672        stop_time_ns: Option<UnixNanos>,
673        callback: Option<TimeEventCallback>,
674        allow_past: Option<bool>,
675    ) -> anyhow::Result<()> {
676        check_valid_string(name, stringify!(name))?;
677        check_positive_u64(interval_ns, stringify!(interval_ns))?;
678        check_predicate_true(
679            callback.is_some() | self.default_callback.is_some(),
680            "No callbacks provided",
681        )?;
682
683        let name = Ustr::from(name);
684        let allow_past = allow_past.unwrap_or(true);
685
686        let callback = match callback {
687            Some(callback) => callback,
688            None => self.default_callback.clone().unwrap(),
689        };
690
691        #[cfg(feature = "clock_v2")]
692        {
693            self.callbacks.insert(name, callback.clone());
694        }
695
696        let mut start_time_ns = start_time_ns;
697        let ts_now = self.get_time_ns();
698
699        if start_time_ns == 0 {
700            // Zero start time indicates no explicit start; we use the current time
701            start_time_ns = self.timestamp_ns();
702        } else if start_time_ns < ts_now && !allow_past {
703            anyhow::bail!(
704                "Timer '{name}' start time {} was in the past (current time is {})",
705                start_time_ns.to_rfc3339(),
706                ts_now.to_rfc3339(),
707            );
708        }
709
710        if let Some(stop_time) = stop_time_ns {
711            if stop_time <= start_time_ns {
712                anyhow::bail!(
713                    "Timer '{name}' stop time {} must be after start time {}",
714                    stop_time.to_rfc3339(),
715                    start_time_ns.to_rfc3339(),
716                );
717            }
718        }
719
720        let interval_ns = create_valid_interval(interval_ns);
721
722        #[cfg(not(feature = "clock_v2"))]
723        let mut timer = LiveTimer::new(name, interval_ns, start_time_ns, stop_time_ns, callback);
724
725        #[cfg(feature = "clock_v2")]
726        let mut timer = LiveTimer::new(
727            name,
728            interval_ns,
729            start_time_ns,
730            stop_time_ns,
731            callback,
732            self.heap.clone(),
733        );
734        timer.start();
735
736        self.clear_expired_timers();
737        self.timers.insert(name, timer);
738
739        Ok(())
740    }
741
742    fn next_time_ns(&self, name: &str) -> Option<UnixNanos> {
743        self.timers
744            .get(&Ustr::from(name))
745            .map(|timer| timer.next_time_ns())
746    }
747
748    fn cancel_timer(&mut self, name: &str) {
749        let timer = self.timers.remove(&Ustr::from(name));
750        if let Some(mut timer) = timer {
751            timer.cancel();
752        }
753    }
754
755    fn cancel_timers(&mut self) {
756        for timer in &mut self.timers.values_mut() {
757            timer.cancel();
758        }
759
760        self.timers.clear();
761    }
762
763    fn reset(&mut self) {
764        self.timers = HashMap::new();
765        self.heap = Arc::new(Mutex::new(BinaryHeap::new()));
766        self.callbacks = HashMap::new();
767    }
768}
769
770// Helper struct to stream events from the heap
771#[derive(Debug)]
772pub struct TimeEventStream {
773    heap: Arc<Mutex<BinaryHeap<TimeEvent>>>,
774}
775
776impl TimeEventStream {
777    pub const fn new(heap: Arc<Mutex<BinaryHeap<TimeEvent>>>) -> Self {
778        Self { heap }
779    }
780}
781
782impl Stream for TimeEventStream {
783    type Item = TimeEvent;
784
785    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
786        let mut heap = match self.heap.try_lock() {
787            Ok(guard) => guard,
788            Err(e) => {
789                tracing::error!("Unable to get LiveClock heap lock: {e}");
790                cx.waker().wake_by_ref();
791                return Poll::Pending;
792            }
793        };
794
795        if let Some(event) = heap.pop() {
796            Poll::Ready(Some(event))
797        } else {
798            cx.waker().wake_by_ref();
799            Poll::Pending
800        }
801    }
802}
803
804////////////////////////////////////////////////////////////////////////////////
805// Tests
806////////////////////////////////////////////////////////////////////////////////
807#[cfg(test)]
808mod tests {
809    use std::{cell::RefCell, rc::Rc};
810
811    use rstest::{fixture, rstest};
812
813    use super::*;
814
815    #[derive(Default)]
816    struct TestCallback {
817        called: Rc<RefCell<bool>>,
818    }
819
820    impl TestCallback {
821        const fn new(called: Rc<RefCell<bool>>) -> Self {
822            Self { called }
823        }
824    }
825
826    impl From<TestCallback> for TimeEventCallback {
827        fn from(callback: TestCallback) -> Self {
828            Self::Rust(Rc::new(move |_event: TimeEvent| {
829                *callback.called.borrow_mut() = true;
830            }))
831        }
832    }
833
834    #[fixture]
835    pub fn test_clock() -> TestClock {
836        let mut clock = TestClock::new();
837        clock.register_default_handler(TestCallback::default().into());
838        clock
839    }
840
841    #[rstest]
842    fn test_time_monotonicity(mut test_clock: TestClock) {
843        let initial_time = test_clock.timestamp_ns();
844        test_clock.advance_time((*initial_time + 1000).into(), true);
845        assert!(test_clock.timestamp_ns() > initial_time);
846    }
847
848    #[rstest]
849    fn test_timer_registration(mut test_clock: TestClock) {
850        test_clock
851            .set_time_alert_ns(
852                "test_timer",
853                (*test_clock.timestamp_ns() + 1000).into(),
854                None,
855                None,
856            )
857            .unwrap();
858        assert_eq!(test_clock.timer_count(), 1);
859        assert_eq!(test_clock.timer_names(), vec!["test_timer"]);
860    }
861
862    #[rstest]
863    fn test_timer_expiration(mut test_clock: TestClock) {
864        let alert_time = (*test_clock.timestamp_ns() + 1000).into();
865        test_clock
866            .set_time_alert_ns("test_timer", alert_time, None, None)
867            .unwrap();
868        let events = test_clock.advance_time(alert_time, true);
869        assert_eq!(events.len(), 1);
870        assert_eq!(events[0].name.as_str(), "test_timer");
871    }
872
873    #[rstest]
874    fn test_timer_cancellation(mut test_clock: TestClock) {
875        test_clock
876            .set_time_alert_ns(
877                "test_timer",
878                (*test_clock.timestamp_ns() + 1000).into(),
879                None,
880                None,
881            )
882            .unwrap();
883        assert_eq!(test_clock.timer_count(), 1);
884        test_clock.cancel_timer("test_timer");
885        assert_eq!(test_clock.timer_count(), 0);
886    }
887
888    #[rstest]
889    fn test_time_advancement(mut test_clock: TestClock) {
890        let start_time = test_clock.timestamp_ns();
891        test_clock
892            .set_timer_ns("test_timer", 1000, start_time, None, None, None)
893            .unwrap();
894        let events = test_clock.advance_time((*start_time + 2500).into(), true);
895        assert_eq!(events.len(), 2);
896        assert_eq!(*events[0].ts_event, *start_time + 1000);
897        assert_eq!(*events[1].ts_event, *start_time + 2000);
898    }
899
900    #[rstest]
901    fn test_default_and_custom_callbacks() {
902        let mut clock = TestClock::new();
903        let default_called = Rc::new(RefCell::new(false));
904        let custom_called = Rc::new(RefCell::new(false));
905
906        let default_callback = TestCallback::new(Rc::clone(&default_called));
907        let custom_callback = TestCallback::new(Rc::clone(&custom_called));
908
909        clock.register_default_handler(TimeEventCallback::from(default_callback));
910        clock
911            .set_time_alert_ns(
912                "default_timer",
913                (*clock.timestamp_ns() + 1000).into(),
914                None,
915                None,
916            )
917            .unwrap();
918        clock
919            .set_time_alert_ns(
920                "custom_timer",
921                (*clock.timestamp_ns() + 1000).into(),
922                Some(TimeEventCallback::from(custom_callback)),
923                None,
924            )
925            .unwrap();
926
927        let events = clock.advance_time((*clock.timestamp_ns() + 1000).into(), true);
928        let handlers = clock.match_handlers(events);
929
930        for handler in handlers {
931            handler.callback.call(handler.event);
932        }
933
934        assert!(*default_called.borrow());
935        assert!(*custom_called.borrow());
936    }
937
938    #[rstest]
939    fn test_multiple_timers(mut test_clock: TestClock) {
940        let start_time = test_clock.timestamp_ns();
941        test_clock
942            .set_timer_ns("timer1", 1000, start_time, None, None, None)
943            .unwrap();
944        test_clock
945            .set_timer_ns("timer2", 2000, start_time, None, None, None)
946            .unwrap();
947        let events = test_clock.advance_time((*start_time + 2000).into(), true);
948        assert_eq!(events.len(), 3);
949        assert_eq!(events[0].name.as_str(), "timer1");
950        assert_eq!(events[1].name.as_str(), "timer1");
951        assert_eq!(events[2].name.as_str(), "timer2");
952    }
953
954    #[rstest]
955    fn test_allow_past_parameter_true(mut test_clock: TestClock) {
956        test_clock.set_time(UnixNanos::from(2000));
957        let current_time = test_clock.timestamp_ns();
958        let past_time = UnixNanos::from(current_time.as_u64() - 1000);
959
960        // With allow_past=true (default), should adjust to current time and succeed
961        test_clock
962            .set_time_alert_ns("past_timer", past_time, None, Some(true))
963            .unwrap();
964
965        // Verify timer was created with adjusted time
966        assert_eq!(test_clock.timer_count(), 1);
967        assert_eq!(test_clock.timer_names(), vec!["past_timer"]);
968
969        // Next time should be at or after current time, not in the past
970        let next_time = test_clock.next_time_ns("past_timer").unwrap();
971        assert!(next_time >= current_time);
972    }
973
974    #[rstest]
975    fn test_allow_past_parameter_false(mut test_clock: TestClock) {
976        test_clock.set_time(UnixNanos::from(2000));
977        let current_time = test_clock.timestamp_ns();
978        let past_time = current_time - 1000;
979
980        // With allow_past=false, should fail for past times
981        let result = test_clock.set_time_alert_ns("past_timer", past_time, None, Some(false));
982
983        // Verify the operation failed with appropriate error
984        assert!(result.is_err());
985        assert!(format!("{}", result.unwrap_err()).contains("was in the past"));
986
987        // Verify no timer was created
988        assert_eq!(test_clock.timer_count(), 0);
989        assert!(test_clock.timer_names().is_empty());
990    }
991
992    #[rstest]
993    fn test_invalid_stop_time_validation(mut test_clock: TestClock) {
994        test_clock.set_time(UnixNanos::from(2000));
995        let current_time = test_clock.timestamp_ns();
996        let start_time = current_time + 1000;
997        let stop_time = current_time + 500; // Stop time before start time
998
999        // Should fail because stop_time < start_time
1000        let result = test_clock.set_timer_ns(
1001            "invalid_timer",
1002            100,
1003            start_time,
1004            Some(stop_time),
1005            None,
1006            None,
1007        );
1008
1009        // Verify the operation failed with appropriate error
1010        assert!(result.is_err());
1011        assert!(format!("{}", result.unwrap_err()).contains("must be after start time"));
1012
1013        // Verify no timer was created
1014        assert_eq!(test_clock.timer_count(), 0);
1015    }
1016}