1use std::{
19 collections::{BTreeMap, BinaryHeap, HashMap},
20 fmt::Debug,
21 ops::Deref,
22 pin::Pin,
23 sync::Arc,
24 task::{Context, Poll},
25 time::Duration,
26};
27
28use chrono::{DateTime, Utc};
29use futures::Stream;
30use nautilus_core::{
31 AtomicTime, UnixNanos,
32 correctness::{check_positive_u64, check_predicate_true, check_valid_string},
33 time::get_atomic_clock_realtime,
34};
35use tokio::sync::Mutex;
36use ustr::Ustr;
37
38use crate::{
39 runner::{TimeEventSender, get_time_event_sender},
40 timer::{
41 LiveTimer, TestTimer, TimeEvent, TimeEventCallback, TimeEventHandlerV2,
42 create_valid_interval,
43 },
44};
45
46pub trait Clock: Debug {
52 fn utc_now(&self) -> DateTime<Utc> {
54 DateTime::from_timestamp_nanos(self.timestamp_ns().as_i64())
55 }
56
57 fn timestamp_ns(&self) -> UnixNanos;
59
60 fn timestamp_us(&self) -> u64;
62
63 fn timestamp_ms(&self) -> u64;
65
66 fn timestamp(&self) -> f64;
68
69 fn timer_names(&self) -> Vec<&str>;
71
72 fn timer_count(&self) -> usize;
74
75 fn register_default_handler(&mut self, callback: TimeEventCallback);
78
79 fn get_handler(&self, event: TimeEvent) -> TimeEventHandlerV2;
83
84 #[allow(clippy::too_many_arguments)]
98 fn set_time_alert(
99 &mut self,
100 name: &str,
101 alert_time: DateTime<Utc>,
102 callback: Option<TimeEventCallback>,
103 allow_past: Option<bool>,
104 ) -> anyhow::Result<()> {
105 self.set_time_alert_ns(name, alert_time.into(), callback, allow_past)
106 }
107
108 #[allow(clippy::too_many_arguments)]
127 fn set_time_alert_ns(
128 &mut self,
129 name: &str,
130 alert_time_ns: UnixNanos,
131 callback: Option<TimeEventCallback>,
132 allow_past: Option<bool>,
133 ) -> anyhow::Result<()>;
134
135 #[allow(clippy::too_many_arguments)]
149 fn set_timer(
150 &mut self,
151 name: &str,
152 interval: Duration,
153 start_time: Option<DateTime<Utc>>,
154 stop_time: Option<DateTime<Utc>>,
155 callback: Option<TimeEventCallback>,
156 allow_past: Option<bool>,
157 fire_immediately: Option<bool>,
158 ) -> anyhow::Result<()> {
159 self.set_timer_ns(
160 name,
161 interval.as_nanos() as u64,
162 start_time.map(UnixNanos::from),
163 stop_time.map(UnixNanos::from),
164 callback,
165 allow_past,
166 fire_immediately,
167 )
168 }
169
170 #[allow(clippy::too_many_arguments)]
191 fn set_timer_ns(
192 &mut self,
193 name: &str,
194 interval_ns: u64,
195 start_time_ns: Option<UnixNanos>,
196 stop_time_ns: Option<UnixNanos>,
197 callback: Option<TimeEventCallback>,
198 allow_past: Option<bool>,
199 fire_immediately: Option<bool>,
200 ) -> anyhow::Result<()>;
201
202 fn next_time_ns(&self, name: &str) -> Option<UnixNanos>;
206
207 fn cancel_timer(&mut self, name: &str);
209
210 fn cancel_timers(&mut self);
212
213 fn reset(&mut self);
215}
216
/// A clock whose time only changes when it is explicitly set or advanced,
/// intended for tests and other deterministic runs.
///
/// Dereferences to the inner `AtomicTime`.
#[derive(Debug)]
pub struct TestClock {
    /// Current time of the clock.
    time: AtomicTime,
    /// Registered timers, keyed by timer name.
    timers: BTreeMap<Ustr, TestTimer>,
    /// Fallback callback for events without a name-specific callback.
    default_callback: Option<TimeEventCallback>,
    /// Name-specific callbacks, keyed by timer name.
    callbacks: HashMap<Ustr, TimeEventCallback>,
    /// Events produced by `advance_to_time_on_heap`, drained by the `Iterator` impl.
    heap: BinaryHeap<TimeEvent>,
}
229
230impl TestClock {
231 #[must_use]
233 pub fn new() -> Self {
234 Self {
235 time: AtomicTime::new(false, UnixNanos::default()),
236 timers: BTreeMap::new(),
237 default_callback: None,
238 callbacks: HashMap::new(),
239 heap: BinaryHeap::new(),
240 }
241 }
242
243 #[must_use]
245 pub const fn get_timers(&self) -> &BTreeMap<Ustr, TestTimer> {
246 &self.timers
247 }
248
249 pub fn advance_time(&mut self, to_time_ns: UnixNanos, set_time: bool) -> Vec<TimeEvent> {
262 assert!(
264 to_time_ns >= self.time.get_time_ns(),
265 "`to_time_ns` {to_time_ns} was < `self.time.get_time_ns()` {}",
266 self.time.get_time_ns()
267 );
268
269 if set_time {
270 self.time.set_time(to_time_ns);
271 }
272
273 let mut events: Vec<TimeEvent> = Vec::new();
275 self.timers.retain(|_, timer| {
276 timer.advance(to_time_ns).for_each(|event| {
277 events.push(event);
278 });
279
280 !timer.is_expired()
281 });
282
283 events.sort_by(|a, b| a.ts_event.cmp(&b.ts_event));
284 events
285 }
286
287 pub fn advance_to_time_on_heap(&mut self, to_time_ns: UnixNanos) {
297 assert!(
299 to_time_ns >= self.time.get_time_ns(),
300 "`to_time_ns` {to_time_ns} was < `self.time.get_time_ns()` {}",
301 self.time.get_time_ns()
302 );
303
304 self.time.set_time(to_time_ns);
305
306 self.timers.retain(|_, timer| {
308 timer.advance(to_time_ns).for_each(|event| {
309 self.heap.push(event);
310 });
311
312 !timer.is_expired()
313 });
314 }
315
316 #[must_use]
326 pub fn match_handlers(&self, events: Vec<TimeEvent>) -> Vec<TimeEventHandlerV2> {
327 events
328 .into_iter()
329 .map(|event| {
330 let callback = self.callbacks.get(&event.name).cloned().unwrap_or_else(|| {
331 self.default_callback
334 .clone()
335 .expect("Default callback should exist")
336 });
337 TimeEventHandlerV2::new(event, callback)
338 })
339 .collect()
340 }
341}
342
343impl Iterator for TestClock {
344 type Item = TimeEventHandlerV2;
345
346 fn next(&mut self) -> Option<Self::Item> {
347 self.heap.pop().map(|event| self.get_handler(event))
348 }
349}
350
impl Default for TestClock {
    /// Creates a new default `TestClock`; equivalent to `TestClock::new()`.
    fn default() -> Self {
        Self::new()
    }
}
357
// Exposes the inner `AtomicTime` so its methods (e.g. `set_time`,
// `get_time_ns`) can be called directly on a `TestClock`.
impl Deref for TestClock {
    type Target = AtomicTime;

    fn deref(&self) -> &Self::Target {
        &self.time
    }
}
365
366impl Clock for TestClock {
367 fn timestamp_ns(&self) -> UnixNanos {
368 self.time.get_time_ns()
369 }
370
371 fn timestamp_us(&self) -> u64 {
372 self.time.get_time_us()
373 }
374
375 fn timestamp_ms(&self) -> u64 {
376 self.time.get_time_ms()
377 }
378
379 fn timestamp(&self) -> f64 {
380 self.time.get_time()
381 }
382
383 fn timer_names(&self) -> Vec<&str> {
384 self.timers
385 .iter()
386 .filter(|(_, timer)| !timer.is_expired())
387 .map(|(k, _)| k.as_str())
388 .collect()
389 }
390
391 fn timer_count(&self) -> usize {
392 self.timers
393 .iter()
394 .filter(|(_, timer)| !timer.is_expired())
395 .count()
396 }
397
398 fn register_default_handler(&mut self, callback: TimeEventCallback) {
399 self.default_callback = Some(callback);
400 }
401
402 fn get_handler(&self, event: TimeEvent) -> TimeEventHandlerV2 {
408 let callback = self
410 .callbacks
411 .get(&event.name)
412 .cloned()
413 .or_else(|| self.default_callback.clone())
414 .unwrap_or_else(|| panic!("Event '{}' should have associated handler", event.name));
415
416 TimeEventHandlerV2::new(event, callback)
417 }
418
419 fn set_time_alert_ns(
420 &mut self,
421 name: &str,
422 mut alert_time_ns: UnixNanos, callback: Option<TimeEventCallback>,
424 allow_past: Option<bool>,
425 ) -> anyhow::Result<()> {
426 check_valid_string(name, stringify!(name))?;
427
428 let name = Ustr::from(name);
429 let allow_past = allow_past.unwrap_or(true);
430
431 check_predicate_true(
432 callback.is_some()
433 | self.callbacks.contains_key(&name)
434 | self.default_callback.is_some(),
435 "No callbacks provided",
436 )?;
437
438 match callback {
439 Some(callback_py) => self.callbacks.insert(name, callback_py),
440 None => None,
441 };
442
443 self.cancel_timer(name.as_str());
445
446 let ts_now = self.get_time_ns();
447
448 if alert_time_ns < ts_now {
449 if allow_past {
450 alert_time_ns = ts_now;
451 log::warn!(
452 "Timer '{name}' alert time {} was in the past, adjusted to current time for immediate firing",
453 alert_time_ns.to_rfc3339(),
454 );
455 } else {
456 anyhow::bail!(
457 "Timer '{name}' alert time {} was in the past (current time is {})",
458 alert_time_ns.to_rfc3339(),
459 ts_now.to_rfc3339(),
460 );
461 }
462 }
463
464 let interval_ns = create_valid_interval((alert_time_ns - ts_now).into());
466
467 let timer = TestTimer::new(name, interval_ns, ts_now, Some(alert_time_ns), false);
468 self.timers.insert(name, timer);
469
470 Ok(())
471 }
472
473 fn set_timer_ns(
474 &mut self,
475 name: &str,
476 interval_ns: u64,
477 start_time_ns: Option<UnixNanos>,
478 stop_time_ns: Option<UnixNanos>,
479 callback: Option<TimeEventCallback>,
480 allow_past: Option<bool>,
481 fire_immediately: Option<bool>,
482 ) -> anyhow::Result<()> {
483 check_valid_string(name, stringify!(name))?;
484 check_positive_u64(interval_ns, stringify!(interval_ns))?;
485 check_predicate_true(
486 callback.is_some() | self.default_callback.is_some(),
487 "No callbacks provided",
488 )?;
489
490 let name = Ustr::from(name);
491 let allow_past = allow_past.unwrap_or(true);
492 let fire_immediately = fire_immediately.unwrap_or(false);
493
494 match callback {
495 Some(callback_py) => self.callbacks.insert(name, callback_py),
496 None => None,
497 };
498
499 let mut start_time_ns = start_time_ns.unwrap_or_default();
500 let ts_now = self.get_time_ns();
501
502 if start_time_ns == 0 {
503 start_time_ns = self.timestamp_ns();
505 } else if !allow_past {
506 let next_event_time = if fire_immediately {
508 start_time_ns
509 } else {
510 start_time_ns + interval_ns
511 };
512
513 if next_event_time < ts_now {
515 anyhow::bail!(
516 "Timer '{name}' next event time {} would be in the past (current time is {})",
517 next_event_time.to_rfc3339(),
518 ts_now.to_rfc3339(),
519 );
520 }
521 }
522
523 if let Some(stop_time) = stop_time_ns {
524 if stop_time <= start_time_ns {
525 anyhow::bail!(
526 "Timer '{name}' stop time {} must be after start time {}",
527 stop_time.to_rfc3339(),
528 start_time_ns.to_rfc3339(),
529 );
530 }
531 }
532
533 let interval_ns = create_valid_interval(interval_ns);
534
535 let timer = TestTimer::new(
536 name,
537 interval_ns,
538 start_time_ns,
539 stop_time_ns,
540 fire_immediately,
541 );
542 self.timers.insert(name, timer);
543
544 Ok(())
545 }
546
547 fn next_time_ns(&self, name: &str) -> Option<UnixNanos> {
548 self.timers
549 .get(&Ustr::from(name))
550 .map(|timer| timer.next_time_ns())
551 }
552
553 fn cancel_timer(&mut self, name: &str) {
554 let timer = self.timers.remove(&Ustr::from(name));
555 if let Some(mut timer) = timer {
556 timer.cancel();
557 }
558 }
559
560 fn cancel_timers(&mut self) {
561 for timer in &mut self.timers.values_mut() {
562 timer.cancel();
563 }
564
565 self.timers.clear();
566 }
567
568 fn reset(&mut self) {
569 self.time = AtomicTime::new(false, UnixNanos::default());
570 self.timers = BTreeMap::new();
571 self.heap = BinaryHeap::new();
572 self.callbacks = HashMap::new();
573 }
574}
575
/// A clock backed by the shared real-time `AtomicTime`, whose timers are
/// `LiveTimer` instances.
///
/// Dereferences to the underlying `AtomicTime`.
#[derive(Debug)]
pub struct LiveClock {
    /// Shared real-time atomic clock (obtained from `get_atomic_clock_realtime`).
    time: &'static AtomicTime,
    /// Live timers, keyed by timer name.
    timers: HashMap<Ustr, LiveTimer>,
    /// Fallback callback for timers without a name-specific callback.
    default_callback: Option<TimeEventCallback>,
    /// Name-specific callbacks, keyed by timer name.
    callbacks: HashMap<Ustr, TimeEventCallback>,
    /// Optional sender handed to every `LiveTimer` created by this clock.
    sender: Option<Arc<dyn TimeEventSender>>,
}
587
588impl LiveClock {
589 #[must_use]
591 pub fn new(sender: Option<Arc<dyn TimeEventSender>>) -> Self {
592 Self {
593 time: get_atomic_clock_realtime(),
594 timers: HashMap::new(),
595 default_callback: None,
596 callbacks: HashMap::new(),
597 sender,
598 }
599 }
600
601 #[must_use]
602 pub const fn get_timers(&self) -> &HashMap<Ustr, LiveTimer> {
603 &self.timers
604 }
605
606 fn clear_expired_timers(&mut self) {
608 self.timers.retain(|_, timer| !timer.is_expired());
609 }
610}
611
impl Default for LiveClock {
    /// Creates a new default `LiveClock` wired to the sender returned by
    /// `get_time_event_sender()`.
    fn default() -> Self {
        Self::new(Some(get_time_event_sender()))
    }
}
618
// Exposes the shared `AtomicTime` so its methods (e.g. `get_time_ns`) can be
// called directly on a `LiveClock`.
impl Deref for LiveClock {
    type Target = AtomicTime;

    fn deref(&self) -> &Self::Target {
        self.time
    }
}
626
627impl Clock for LiveClock {
628 fn timestamp_ns(&self) -> UnixNanos {
629 self.time.get_time_ns()
630 }
631
632 fn timestamp_us(&self) -> u64 {
633 self.time.get_time_us()
634 }
635
636 fn timestamp_ms(&self) -> u64 {
637 self.time.get_time_ms()
638 }
639
640 fn timestamp(&self) -> f64 {
641 self.time.get_time()
642 }
643
644 fn timer_names(&self) -> Vec<&str> {
645 self.timers
646 .iter()
647 .filter(|(_, timer)| !timer.is_expired())
648 .map(|(k, _)| k.as_str())
649 .collect()
650 }
651
652 fn timer_count(&self) -> usize {
653 self.timers
654 .iter()
655 .filter(|(_, timer)| !timer.is_expired())
656 .count()
657 }
658
659 fn register_default_handler(&mut self, handler: TimeEventCallback) {
660 self.default_callback = Some(handler);
661 }
662
663 #[allow(unused_variables)]
668 fn get_handler(&self, event: TimeEvent) -> TimeEventHandlerV2 {
669 let callback = self
671 .callbacks
672 .get(&event.name)
673 .cloned()
674 .or_else(|| self.default_callback.clone())
675 .unwrap_or_else(|| panic!("Event '{}' should have associated handler", event.name));
676
677 TimeEventHandlerV2::new(event, callback)
678 }
679
680 fn set_time_alert_ns(
681 &mut self,
682 name: &str,
683 mut alert_time_ns: UnixNanos, callback: Option<TimeEventCallback>,
685 allow_past: Option<bool>,
686 ) -> anyhow::Result<()> {
687 check_valid_string(name, stringify!(name))?;
688
689 let name = Ustr::from(name);
690 let allow_past = allow_past.unwrap_or(true);
691
692 check_predicate_true(
693 callback.is_some()
694 | self.callbacks.contains_key(&name)
695 | self.default_callback.is_some(),
696 "No callbacks provided",
697 )?;
698
699 let callback = match callback {
700 Some(callback) => callback,
701 None => {
702 if self.callbacks.contains_key(&name) {
703 self.callbacks.get(&name).unwrap().clone()
704 } else {
705 self.default_callback.clone().unwrap()
706 }
707 }
708 };
709
710 self.cancel_timer(name.as_str());
712
713 let ts_now = self.get_time_ns();
714
715 if alert_time_ns < ts_now {
717 if allow_past {
718 alert_time_ns = ts_now;
719 log::warn!(
720 "Timer '{name}' alert time {} was in the past, adjusted to current time for immediate firing",
721 alert_time_ns.to_rfc3339(),
722 );
723 } else {
724 anyhow::bail!(
725 "Timer '{name}' alert time {} was in the past (current time is {})",
726 alert_time_ns.to_rfc3339(),
727 ts_now.to_rfc3339(),
728 );
729 }
730 }
731
732 let interval_ns = create_valid_interval((alert_time_ns - ts_now).into());
734
735 let mut timer = LiveTimer::new(
736 name,
737 interval_ns,
738 ts_now,
739 Some(alert_time_ns),
740 callback,
741 false,
742 self.sender.clone(),
743 );
744
745 timer.start();
746
747 self.clear_expired_timers();
748 self.timers.insert(name, timer);
749
750 Ok(())
751 }
752
753 fn set_timer_ns(
754 &mut self,
755 name: &str,
756 interval_ns: u64,
757 start_time_ns: Option<UnixNanos>,
758 stop_time_ns: Option<UnixNanos>,
759 callback: Option<TimeEventCallback>,
760 allow_past: Option<bool>,
761 fire_immediately: Option<bool>,
762 ) -> anyhow::Result<()> {
763 check_valid_string(name, stringify!(name))?;
764 check_positive_u64(interval_ns, stringify!(interval_ns))?;
765 check_predicate_true(
766 callback.is_some() | self.default_callback.is_some(),
767 "No callbacks provided",
768 )?;
769
770 let name = Ustr::from(name);
771 let allow_past = allow_past.unwrap_or(true);
772 let fire_immediately = fire_immediately.unwrap_or(false);
773
774 let callback = match callback {
775 Some(callback) => callback,
776 None => self.default_callback.clone().unwrap(),
777 };
778
779 self.callbacks.insert(name, callback.clone());
780
781 let mut start_time_ns = start_time_ns.unwrap_or_default();
782 let ts_now = self.get_time_ns();
783
784 if start_time_ns == 0 {
785 start_time_ns = self.timestamp_ns();
787 } else if start_time_ns < ts_now && !allow_past {
788 anyhow::bail!(
789 "Timer '{name}' start time {} was in the past (current time is {})",
790 start_time_ns.to_rfc3339(),
791 ts_now.to_rfc3339(),
792 );
793 }
794
795 if let Some(stop_time) = stop_time_ns {
796 if stop_time <= start_time_ns {
797 anyhow::bail!(
798 "Timer '{name}' stop time {} must be after start time {}",
799 stop_time.to_rfc3339(),
800 start_time_ns.to_rfc3339(),
801 );
802 }
803 }
804
805 let interval_ns = create_valid_interval(interval_ns);
806
807 let mut timer = LiveTimer::new(
808 name,
809 interval_ns,
810 start_time_ns,
811 stop_time_ns,
812 callback,
813 fire_immediately,
814 self.sender.clone(),
815 );
816 timer.start();
817
818 self.clear_expired_timers();
819 self.timers.insert(name, timer);
820
821 Ok(())
822 }
823
824 fn next_time_ns(&self, name: &str) -> Option<UnixNanos> {
825 self.timers
826 .get(&Ustr::from(name))
827 .map(|timer| timer.next_time_ns())
828 }
829
830 fn cancel_timer(&mut self, name: &str) {
831 let timer = self.timers.remove(&Ustr::from(name));
832 if let Some(mut timer) = timer {
833 timer.cancel();
834 }
835 }
836
837 fn cancel_timers(&mut self) {
838 for timer in &mut self.timers.values_mut() {
839 timer.cancel();
840 }
841
842 self.timers.clear();
843 }
844
845 fn reset(&mut self) {
846 self.timers.clear();
847 self.callbacks.clear();
848 }
849}
850
/// A `Stream` of `TimeEvent`s popped from a shared, mutex-guarded binary heap.
#[derive(Debug)]
pub struct TimeEventStream {
    /// Shared heap the events are popped from.
    heap: Arc<Mutex<BinaryHeap<TimeEvent>>>,
}
856
impl TimeEventStream {
    /// Creates a stream that pops its events from the shared `heap`.
    pub const fn new(heap: Arc<Mutex<BinaryHeap<TimeEvent>>>) -> Self {
        Self { heap }
    }
}
862
863impl Stream for TimeEventStream {
864 type Item = TimeEvent;
865
866 fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
867 let mut heap = match self.heap.try_lock() {
868 Ok(guard) => guard,
869 Err(e) => {
870 tracing::error!("Unable to get LiveClock heap lock: {e}");
871 cx.waker().wake_by_ref();
872 return Poll::Pending;
873 }
874 };
875
876 if let Some(event) = heap.pop() {
877 Poll::Ready(Some(event))
878 } else {
879 cx.waker().wake_by_ref();
880 Poll::Pending
881 }
882 }
883}
884
885#[cfg(test)]
889mod tests {
890 use std::{cell::RefCell, rc::Rc};
891
892 use rstest::{fixture, rstest};
893
894 use super::*;
895
    /// Test helper that records whether its callback has been invoked.
    #[derive(Default)]
    struct TestCallback {
        /// Shared flag set to `true` when the callback runs.
        called: Rc<RefCell<bool>>,
    }
900
    impl TestCallback {
        /// Creates a callback helper that reports invocations through `called`.
        const fn new(called: Rc<RefCell<bool>>) -> Self {
            Self { called }
        }
    }
906
    // Wraps the helper in a Rust closure callback that ignores the event and
    // flips the shared `called` flag to `true`.
    impl From<TestCallback> for TimeEventCallback {
        fn from(callback: TestCallback) -> Self {
            Self::Rust(Rc::new(move |_event: TimeEvent| {
                *callback.called.borrow_mut() = true;
            }))
        }
    }
914
915 #[fixture]
916 pub fn test_clock() -> TestClock {
917 let mut clock = TestClock::new();
918 clock.register_default_handler(TestCallback::default().into());
919 clock
920 }
921
922 #[rstest]
923 fn test_time_monotonicity(mut test_clock: TestClock) {
924 let initial_time = test_clock.timestamp_ns();
925 test_clock.advance_time((*initial_time + 1000).into(), true);
926 assert!(test_clock.timestamp_ns() > initial_time);
927 }
928
929 #[rstest]
930 fn test_timer_registration(mut test_clock: TestClock) {
931 test_clock
932 .set_time_alert_ns(
933 "test_timer",
934 (*test_clock.timestamp_ns() + 1000).into(),
935 None,
936 None,
937 )
938 .unwrap();
939 assert_eq!(test_clock.timer_count(), 1);
940 assert_eq!(test_clock.timer_names(), vec!["test_timer"]);
941 }
942
943 #[rstest]
944 fn test_timer_expiration(mut test_clock: TestClock) {
945 let alert_time = (*test_clock.timestamp_ns() + 1000).into();
946 test_clock
947 .set_time_alert_ns("test_timer", alert_time, None, None)
948 .unwrap();
949 let events = test_clock.advance_time(alert_time, true);
950 assert_eq!(events.len(), 1);
951 assert_eq!(events[0].name.as_str(), "test_timer");
952 }
953
954 #[rstest]
955 fn test_timer_cancellation(mut test_clock: TestClock) {
956 test_clock
957 .set_time_alert_ns(
958 "test_timer",
959 (*test_clock.timestamp_ns() + 1000).into(),
960 None,
961 None,
962 )
963 .unwrap();
964 assert_eq!(test_clock.timer_count(), 1);
965 test_clock.cancel_timer("test_timer");
966 assert_eq!(test_clock.timer_count(), 0);
967 }
968
969 #[rstest]
970 fn test_time_advancement(mut test_clock: TestClock) {
971 let start_time = test_clock.timestamp_ns();
972 test_clock
973 .set_timer_ns("test_timer", 1000, Some(start_time), None, None, None, None)
974 .unwrap();
975 let events = test_clock.advance_time((*start_time + 2500).into(), true);
976 assert_eq!(events.len(), 2);
977 assert_eq!(*events[0].ts_event, *start_time + 1000);
978 assert_eq!(*events[1].ts_event, *start_time + 2000);
979 }
980
981 #[rstest]
982 fn test_default_and_custom_callbacks() {
983 let mut clock = TestClock::new();
984 let default_called = Rc::new(RefCell::new(false));
985 let custom_called = Rc::new(RefCell::new(false));
986
987 let default_callback = TestCallback::new(Rc::clone(&default_called));
988 let custom_callback = TestCallback::new(Rc::clone(&custom_called));
989
990 clock.register_default_handler(TimeEventCallback::from(default_callback));
991 clock
992 .set_time_alert_ns(
993 "default_timer",
994 (*clock.timestamp_ns() + 1000).into(),
995 None,
996 None,
997 )
998 .unwrap();
999 clock
1000 .set_time_alert_ns(
1001 "custom_timer",
1002 (*clock.timestamp_ns() + 1000).into(),
1003 Some(TimeEventCallback::from(custom_callback)),
1004 None,
1005 )
1006 .unwrap();
1007
1008 let events = clock.advance_time((*clock.timestamp_ns() + 1000).into(), true);
1009 let handlers = clock.match_handlers(events);
1010
1011 for handler in handlers {
1012 handler.callback.call(handler.event);
1013 }
1014
1015 assert!(*default_called.borrow());
1016 assert!(*custom_called.borrow());
1017 }
1018
1019 #[rstest]
1020 fn test_multiple_timers(mut test_clock: TestClock) {
1021 let start_time = test_clock.timestamp_ns();
1022 test_clock
1023 .set_timer_ns("timer1", 1000, Some(start_time), None, None, None, None)
1024 .unwrap();
1025 test_clock
1026 .set_timer_ns("timer2", 2000, Some(start_time), None, None, None, None)
1027 .unwrap();
1028 let events = test_clock.advance_time((*start_time + 2000).into(), true);
1029 assert_eq!(events.len(), 3);
1030 assert_eq!(events[0].name.as_str(), "timer1");
1031 assert_eq!(events[1].name.as_str(), "timer1");
1032 assert_eq!(events[2].name.as_str(), "timer2");
1033 }
1034
1035 #[rstest]
1036 fn test_allow_past_parameter_true(mut test_clock: TestClock) {
1037 test_clock.set_time(UnixNanos::from(2000));
1038 let current_time = test_clock.timestamp_ns();
1039 let past_time = UnixNanos::from(current_time.as_u64() - 1000);
1040
1041 test_clock
1043 .set_time_alert_ns("past_timer", past_time, None, Some(true))
1044 .unwrap();
1045
1046 assert_eq!(test_clock.timer_count(), 1);
1048 assert_eq!(test_clock.timer_names(), vec!["past_timer"]);
1049
1050 let next_time = test_clock.next_time_ns("past_timer").unwrap();
1052 assert!(next_time >= current_time);
1053 }
1054
1055 #[rstest]
1056 fn test_allow_past_parameter_false(mut test_clock: TestClock) {
1057 test_clock.set_time(UnixNanos::from(2000));
1058 let current_time = test_clock.timestamp_ns();
1059 let past_time = current_time - 1000;
1060
1061 let result = test_clock.set_time_alert_ns("past_timer", past_time, None, Some(false));
1063
1064 assert!(result.is_err());
1066 assert!(format!("{}", result.unwrap_err()).contains("was in the past"));
1067
1068 assert_eq!(test_clock.timer_count(), 0);
1070 assert!(test_clock.timer_names().is_empty());
1071 }
1072
1073 #[rstest]
1074 fn test_invalid_stop_time_validation(mut test_clock: TestClock) {
1075 test_clock.set_time(UnixNanos::from(2000));
1076 let current_time = test_clock.timestamp_ns();
1077 let start_time = current_time + 1000;
1078 let stop_time = current_time + 500; let result = test_clock.set_timer_ns(
1082 "invalid_timer",
1083 100,
1084 Some(start_time),
1085 Some(stop_time),
1086 None,
1087 None,
1088 None,
1089 );
1090
1091 assert!(result.is_err());
1093 assert!(format!("{}", result.unwrap_err()).contains("must be after start time"));
1094
1095 assert_eq!(test_clock.timer_count(), 0);
1097 }
1098
1099 #[rstest]
1100 fn test_set_timer_ns_fire_immediately_true(mut test_clock: TestClock) {
1101 let start_time = test_clock.timestamp_ns();
1102 let interval_ns = 1000;
1103
1104 test_clock
1105 .set_timer_ns(
1106 "fire_immediately_timer",
1107 interval_ns,
1108 Some(start_time),
1109 None,
1110 None,
1111 None,
1112 Some(true),
1113 )
1114 .unwrap();
1115
1116 let events = test_clock.advance_time((start_time + 2500).into(), true);
1118
1119 assert_eq!(events.len(), 3);
1121 assert_eq!(*events[0].ts_event, *start_time); assert_eq!(*events[1].ts_event, *start_time + 1000); assert_eq!(*events[2].ts_event, *start_time + 2000); }
1125
1126 #[rstest]
1127 fn test_set_timer_ns_fire_immediately_false(mut test_clock: TestClock) {
1128 let start_time = test_clock.timestamp_ns();
1129 let interval_ns = 1000;
1130
1131 test_clock
1132 .set_timer_ns(
1133 "normal_timer",
1134 interval_ns,
1135 Some(start_time),
1136 None,
1137 None,
1138 None,
1139 Some(false),
1140 )
1141 .unwrap();
1142
1143 let events = test_clock.advance_time((start_time + 2500).into(), true);
1145
1146 assert_eq!(events.len(), 2);
1148 assert_eq!(*events[0].ts_event, *start_time + 1000); assert_eq!(*events[1].ts_event, *start_time + 2000); }
1151
1152 #[rstest]
1153 fn test_set_timer_ns_fire_immediately_default_is_false(mut test_clock: TestClock) {
1154 let start_time = test_clock.timestamp_ns();
1155 let interval_ns = 1000;
1156
1157 test_clock
1159 .set_timer_ns(
1160 "default_timer",
1161 interval_ns,
1162 Some(start_time),
1163 None,
1164 None,
1165 None,
1166 None,
1167 )
1168 .unwrap();
1169
1170 let events = test_clock.advance_time((start_time + 1500).into(), true);
1171
1172 assert_eq!(events.len(), 1);
1174 assert_eq!(*events[0].ts_event, *start_time + 1000); }
1176
1177 #[rstest]
1178 fn test_set_timer_ns_fire_immediately_with_zero_start_time(mut test_clock: TestClock) {
1179 test_clock.set_time(5000.into());
1180 let interval_ns = 1000;
1181
1182 test_clock
1183 .set_timer_ns(
1184 "zero_start_timer",
1185 interval_ns,
1186 None,
1187 None,
1188 None,
1189 None,
1190 Some(true),
1191 )
1192 .unwrap();
1193
1194 let events = test_clock.advance_time(7000.into(), true);
1195
1196 assert_eq!(events.len(), 3);
1199 assert_eq!(*events[0].ts_event, 5000); assert_eq!(*events[1].ts_event, 6000);
1201 assert_eq!(*events[2].ts_event, 7000);
1202 }
1203
1204 #[rstest]
1205 fn test_multiple_timers_different_fire_immediately_settings(mut test_clock: TestClock) {
1206 let start_time = test_clock.timestamp_ns();
1207 let interval_ns = 1000;
1208
1209 test_clock
1211 .set_timer_ns(
1212 "immediate_timer",
1213 interval_ns,
1214 Some(start_time),
1215 None,
1216 None,
1217 None,
1218 Some(true),
1219 )
1220 .unwrap();
1221
1222 test_clock
1224 .set_timer_ns(
1225 "normal_timer",
1226 interval_ns,
1227 Some(start_time),
1228 None,
1229 None,
1230 None,
1231 Some(false),
1232 )
1233 .unwrap();
1234
1235 let events = test_clock.advance_time((start_time + 1500).into(), true);
1236
1237 assert_eq!(events.len(), 3);
1239
1240 let mut event_times: Vec<u64> = events.iter().map(|e| e.ts_event.as_u64()).collect();
1242 event_times.sort();
1243
1244 assert_eq!(event_times[0], start_time.as_u64()); assert_eq!(event_times[1], start_time.as_u64() + 1000); assert_eq!(event_times[2], start_time.as_u64() + 1000); }
1248
1249 #[rstest]
1250 fn test_timer_name_collision_overwrites(mut test_clock: TestClock) {
1251 let start_time = test_clock.timestamp_ns();
1252
1253 test_clock
1255 .set_timer_ns(
1256 "collision_timer",
1257 1000,
1258 Some(start_time),
1259 None,
1260 None,
1261 None,
1262 None,
1263 )
1264 .unwrap();
1265
1266 let result = test_clock.set_timer_ns(
1268 "collision_timer",
1269 2000,
1270 Some(start_time),
1271 None,
1272 None,
1273 None,
1274 None,
1275 );
1276
1277 assert!(result.is_ok());
1278 assert_eq!(test_clock.timer_count(), 1);
1280
1281 let next_time = test_clock.next_time_ns("collision_timer").unwrap();
1283 assert_eq!(next_time, start_time + 2000);
1285 }
1286
1287 #[rstest]
1288 fn test_timer_zero_interval_error(mut test_clock: TestClock) {
1289 let start_time = test_clock.timestamp_ns();
1290
1291 let result =
1293 test_clock.set_timer_ns("zero_interval", 0, Some(start_time), None, None, None, None);
1294
1295 assert!(result.is_err());
1296 assert_eq!(test_clock.timer_count(), 0);
1297 }
1298
1299 #[rstest]
1300 fn test_timer_empty_name_error(mut test_clock: TestClock) {
1301 let start_time = test_clock.timestamp_ns();
1302
1303 let result = test_clock.set_timer_ns("", 1000, Some(start_time), None, None, None, None);
1305
1306 assert!(result.is_err());
1307 assert_eq!(test_clock.timer_count(), 0);
1308 }
1309
1310 #[rstest]
1311 fn test_timer_fire_immediately_at_exact_stop_time(mut test_clock: TestClock) {
1312 let start_time = test_clock.timestamp_ns();
1313 let interval_ns = 1000;
1314 let stop_time = start_time + interval_ns; test_clock
1317 .set_timer_ns(
1318 "exact_stop",
1319 interval_ns,
1320 Some(start_time),
1321 Some(stop_time.into()),
1322 None,
1323 None,
1324 Some(true),
1325 )
1326 .unwrap();
1327
1328 let events = test_clock.advance_time(stop_time.into(), true);
1329
1330 assert_eq!(events.len(), 2);
1332 assert_eq!(*events[0].ts_event, *start_time); assert_eq!(*events[1].ts_event, *stop_time); }
1335
1336 #[rstest]
1337 fn test_timer_advance_to_exact_next_time(mut test_clock: TestClock) {
1338 let start_time = test_clock.timestamp_ns();
1339 let interval_ns = 1000;
1340
1341 test_clock
1342 .set_timer_ns(
1343 "exact_advance",
1344 interval_ns,
1345 Some(start_time),
1346 None,
1347 None,
1348 None,
1349 Some(false),
1350 )
1351 .unwrap();
1352
1353 let next_time = test_clock.next_time_ns("exact_advance").unwrap();
1355 let events = test_clock.advance_time(next_time, true);
1356
1357 assert_eq!(events.len(), 1);
1358 assert_eq!(*events[0].ts_event, *next_time);
1359 }
1360
1361 #[rstest]
1362 fn test_allow_past_bar_aggregation_use_case(mut test_clock: TestClock) {
1363 test_clock.set_time(UnixNanos::from(100_500)); let bar_start_time = UnixNanos::from(100_000); let interval_ns = 1000; let result = test_clock.set_timer_ns(
1373 "bar_timer",
1374 interval_ns,
1375 Some(bar_start_time),
1376 None,
1377 None,
1378 Some(false), Some(false), );
1381
1382 assert!(result.is_ok());
1384 assert_eq!(test_clock.timer_count(), 1);
1385
1386 let next_time = test_clock.next_time_ns("bar_timer").unwrap();
1388 assert_eq!(*next_time, 101_000);
1389 }
1390
1391 #[rstest]
1392 fn test_allow_past_false_rejects_when_next_event_in_past(mut test_clock: TestClock) {
1393 test_clock.set_time(UnixNanos::from(102_000)); let past_start_time = UnixNanos::from(100_000); let interval_ns = 1000; let result = test_clock.set_timer_ns(
1402 "past_event_timer",
1403 interval_ns,
1404 Some(past_start_time),
1405 None,
1406 None,
1407 Some(false), Some(false), );
1410
1411 assert!(result.is_err());
1413 assert!(
1414 result
1415 .unwrap_err()
1416 .to_string()
1417 .contains("would be in the past")
1418 );
1419 }
1420
1421 #[rstest]
1422 fn test_allow_past_false_with_fire_immediately_true(mut test_clock: TestClock) {
1423 test_clock.set_time(UnixNanos::from(100_500)); let past_start_time = UnixNanos::from(100_000); let interval_ns = 1000;
1427
1428 let result = test_clock.set_timer_ns(
1431 "immediate_past_timer",
1432 interval_ns,
1433 Some(past_start_time),
1434 None,
1435 None,
1436 Some(false), Some(true), );
1439
1440 assert!(result.is_err());
1442 assert!(
1443 result
1444 .unwrap_err()
1445 .to_string()
1446 .contains("would be in the past")
1447 );
1448 }
1449
1450 #[rstest]
1451 fn test_cancel_timer_during_execution(mut test_clock: TestClock) {
1452 let start_time = test_clock.timestamp_ns();
1453
1454 test_clock
1455 .set_timer_ns(
1456 "cancel_test",
1457 1000,
1458 Some(start_time),
1459 None,
1460 None,
1461 None,
1462 None,
1463 )
1464 .unwrap();
1465
1466 assert_eq!(test_clock.timer_count(), 1);
1467
1468 test_clock.cancel_timer("cancel_test");
1470
1471 assert_eq!(test_clock.timer_count(), 0);
1472
1473 let events = test_clock.advance_time((start_time + 2000).into(), true);
1475 assert_eq!(events.len(), 0);
1476 }
1477
1478 #[rstest]
1479 fn test_cancel_all_timers(mut test_clock: TestClock) {
1480 test_clock
1482 .set_timer_ns("timer1", 1000, None, None, None, None, None)
1483 .unwrap();
1484 test_clock
1485 .set_timer_ns("timer2", 1500, None, None, None, None, None)
1486 .unwrap();
1487 test_clock
1488 .set_timer_ns("timer3", 2000, None, None, None, None, None)
1489 .unwrap();
1490
1491 assert_eq!(test_clock.timer_count(), 3);
1492
1493 test_clock.cancel_timers();
1495
1496 assert_eq!(test_clock.timer_count(), 0);
1497
1498 let events = test_clock.advance_time(5000.into(), true);
1500 assert_eq!(events.len(), 0);
1501 }
1502
1503 #[rstest]
1504 fn test_clock_reset_clears_timers(mut test_clock: TestClock) {
1505 test_clock
1506 .set_timer_ns("reset_test", 1000, None, None, None, None, None)
1507 .unwrap();
1508
1509 assert_eq!(test_clock.timer_count(), 1);
1510
1511 test_clock.reset();
1513
1514 assert_eq!(test_clock.timer_count(), 0);
1515 assert_eq!(test_clock.timestamp_ns(), UnixNanos::default()); }
1517
1518 #[rstest]
1519 fn test_set_time_alert_default_impl(mut test_clock: TestClock) {
1520 let current_time = test_clock.utc_now();
1521 let alert_time = current_time + chrono::Duration::seconds(1);
1522
1523 test_clock
1525 .set_time_alert("alert_test", alert_time, None, None)
1526 .unwrap();
1527
1528 assert_eq!(test_clock.timer_count(), 1);
1529 assert_eq!(test_clock.timer_names(), vec!["alert_test"]);
1530
1531 let expected_ns = UnixNanos::from(alert_time);
1533 let next_time = test_clock.next_time_ns("alert_test").unwrap();
1534
1535 let diff = if next_time >= expected_ns {
1537 next_time.as_u64() - expected_ns.as_u64()
1538 } else {
1539 expected_ns.as_u64() - next_time.as_u64()
1540 };
1541 assert!(
1542 diff < 1000,
1543 "Timer should be set within 1 microsecond of expected time"
1544 );
1545 }
1546
1547 #[rstest]
1548 fn test_set_timer_default_impl(mut test_clock: TestClock) {
1549 let current_time = test_clock.utc_now();
1550 let start_time = current_time + chrono::Duration::seconds(1);
1551 let interval = Duration::from_millis(500);
1552
1553 test_clock
1555 .set_timer(
1556 "timer_test",
1557 interval,
1558 Some(start_time),
1559 None,
1560 None,
1561 None,
1562 None,
1563 )
1564 .unwrap();
1565
1566 assert_eq!(test_clock.timer_count(), 1);
1567 assert_eq!(test_clock.timer_names(), vec!["timer_test"]);
1568
1569 let start_ns = UnixNanos::from(start_time);
1571 let interval_ns = interval.as_nanos() as u64;
1572
1573 let events = test_clock.advance_time(start_ns + interval_ns * 3, true);
1574 assert_eq!(events.len(), 3); assert_eq!(*events[0].ts_event, *start_ns + interval_ns);
1578 assert_eq!(*events[1].ts_event, *start_ns + interval_ns * 2);
1579 assert_eq!(*events[2].ts_event, *start_ns + interval_ns * 3);
1580 }
1581
1582 #[rstest]
1583 fn test_set_timer_with_stop_time_default_impl(mut test_clock: TestClock) {
1584 let current_time = test_clock.utc_now();
1585 let start_time = current_time + chrono::Duration::seconds(1);
1586 let stop_time = current_time + chrono::Duration::seconds(3);
1587 let interval = Duration::from_secs(1);
1588
1589 test_clock
1591 .set_timer(
1592 "timer_with_stop",
1593 interval,
1594 Some(start_time),
1595 Some(stop_time),
1596 None,
1597 None,
1598 None,
1599 )
1600 .unwrap();
1601
1602 assert_eq!(test_clock.timer_count(), 1);
1603
1604 let stop_ns = UnixNanos::from(stop_time);
1606 let events = test_clock.advance_time(stop_ns + 1000, true);
1607
1608 assert_eq!(events.len(), 2);
1610
1611 let start_ns = UnixNanos::from(start_time);
1612 let interval_ns = interval.as_nanos() as u64;
1613 assert_eq!(*events[0].ts_event, *start_ns + interval_ns);
1614 assert_eq!(*events[1].ts_event, *start_ns + interval_ns * 2);
1615 }
1616
1617 #[rstest]
1618 fn test_set_timer_fire_immediately_default_impl(mut test_clock: TestClock) {
1619 let current_time = test_clock.utc_now();
1620 let start_time = current_time + chrono::Duration::seconds(1);
1621 let interval = Duration::from_millis(500);
1622
1623 test_clock
1625 .set_timer(
1626 "immediate_timer",
1627 interval,
1628 Some(start_time),
1629 None,
1630 None,
1631 None,
1632 Some(true),
1633 )
1634 .unwrap();
1635
1636 let start_ns = UnixNanos::from(start_time);
1637 let interval_ns = interval.as_nanos() as u64;
1638
1639 let events = test_clock.advance_time(start_ns + interval_ns, true);
1641
1642 assert_eq!(events.len(), 2);
1644 assert_eq!(*events[0].ts_event, *start_ns); assert_eq!(*events[1].ts_event, *start_ns + interval_ns); }
1647}