1use std::{
19 cmp::Ordering,
20 fmt::{Debug, Display},
21 num::NonZeroU64,
22 rc::Rc,
23 sync::{
24 Arc,
25 atomic::{self, AtomicU64},
26 },
27};
28
29use nautilus_core::{
30 UUID4, UnixNanos,
31 correctness::{FAILED, check_valid_string},
32 datetime::floor_to_nearest_microsecond,
33 time::get_atomic_clock_realtime,
34};
35#[cfg(feature = "python")]
36use pyo3::{PyObject, Python};
37use tokio::{
38 task::JoinHandle,
39 time::{Duration, Instant},
40};
41use ustr::Ustr;
42
43use crate::{runner::TimeEventSender, runtime::get_runtime};
44
/// Builds a non-zero timer interval from a raw nanosecond count.
///
/// A zero `interval_ns` is clamped up to one nanosecond, so the returned value is never zero.
///
/// # Panics
///
/// The `expect` below guards the `NonZeroU64` conversion; because the input is clamped to at
/// least one beforehand it is not expected to fire.
#[must_use]
pub fn create_valid_interval(interval_ns: u64) -> NonZeroU64 {
    // Replace an invalid zero with the smallest valid interval.
    let clamped = if interval_ns == 0 { 1 } else { interval_ns };
    NonZeroU64::new(clamped).expect("`interval_ns` must be positive")
}
54
/// A named, timestamped event emitted by a timer.
///
/// NOTE(review): equality (`PartialEq`) compares `event_id` only, while ordering (`Ord`)
/// compares `ts_event` only, so two events can order as equal without being `==`. Confirm
/// this is intended before using the type as a key in ordered collections.
#[repr(C)]
#[derive(Clone, Debug)]
#[cfg_attr(
    feature = "python",
    pyo3::pyclass(module = "posei_trader.core.nautilus_pyo3.common")
)]
#[derive(Eq)]
pub struct TimeEvent {
    /// The event name; timers in this file copy their own name into it.
    pub name: Ustr,
    /// The identifier of this particular event.
    pub event_id: UUID4,
    /// The time the event is scheduled for; timers in this file set it to their next fire time.
    pub ts_event: UnixNanos,
    /// The time the event object was initialized.
    pub ts_init: UnixNanos,
}
76
77impl PartialOrd for TimeEvent {
79 fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
80 Some(self.cmp(other))
81 }
82}
83
84impl Ord for TimeEvent {
86 fn cmp(&self, other: &Self) -> Ordering {
87 other.ts_event.cmp(&self.ts_event)
88 }
89}
90
impl TimeEvent {
    /// Creates a new [`TimeEvent`] instance from the given field values.
    ///
    /// The arguments are stored as-is; no validation is performed here.
    #[must_use]
    pub const fn new(name: Ustr, event_id: UUID4, ts_event: UnixNanos, ts_init: UnixNanos) -> Self {
        Self {
            name,
            event_id,
            ts_event,
            ts_init,
        }
    }
}
107
108impl Display for TimeEvent {
109 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
110 write!(
111 f,
112 "TimeEvent(name={}, event_id={}, ts_event={}, ts_init={})",
113 self.name, self.event_id, self.ts_event, self.ts_init
114 )
115 }
116}
117
impl PartialEq for TimeEvent {
    /// Two events are equal when their `event_id` values are equal; the name and both
    /// timestamps are not considered.
    fn eq(&self, other: &Self) -> bool {
        self.event_id == other.event_id
    }
}
123
/// The unsized type of a Rust time event callback: any `Fn` that takes a [`TimeEvent`] by value.
pub type RustTimeEventCallback = dyn Fn(TimeEvent);

/// A callback invoked with a [`TimeEvent`], backed either by a Python object or by a Rust
/// closure.
#[derive(Clone)]
pub enum TimeEventCallback {
    /// A Python object to call (only compiled with the `python` feature).
    #[cfg(feature = "python")]
    Python(Arc<PyObject>),
    /// A reference-counted Rust closure.
    Rust(Rc<RustTimeEventCallback>),
}
132
133impl Debug for TimeEventCallback {
134 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
135 match self {
136 #[cfg(feature = "python")]
137 Self::Python(_) => f.write_str("Python callback"),
138 Self::Rust(_) => f.write_str("Rust callback"),
139 }
140 }
141}
142
143impl TimeEventCallback {
144 pub fn call(&self, event: TimeEvent) {
150 match self {
151 #[cfg(feature = "python")]
152 Self::Python(callback) => {
153 Python::with_gil(|py| {
154 callback.call1(py, (event,)).unwrap();
155 });
156 }
157 Self::Rust(callback) => callback(event),
158 }
159 }
160}
161
impl From<Rc<RustTimeEventCallback>> for TimeEventCallback {
    /// Wraps a reference-counted Rust closure in the `Rust` variant.
    fn from(value: Rc<RustTimeEventCallback>) -> Self {
        Self::Rust(value)
    }
}
167
#[cfg(feature = "python")]
impl From<PyObject> for TimeEventCallback {
    /// Wraps a Python object in the `Python` variant, placing it behind a new `Arc`.
    fn from(value: PyObject) -> Self {
        Self::Python(Arc::new(value))
    }
}
174
// SAFETY (review note): `TimeEventCallback` can hold an `Rc`, which is neither `Send` nor
// `Sync`, so these impls assert a guarantee the compiler cannot check on its own.
// `LiveTimer::start` moves a clone of the callback into a spawned task and clones it again
// there when building a handler.
// NOTE(review): presumably the Rust closure is only ever invoked and dropped on the thread
// that created it — confirm against the runtime configuration and callers.
#[allow(unsafe_code)]
unsafe impl Send for TimeEventCallback {}
#[allow(unsafe_code)]
unsafe impl Sync for TimeEventCallback {}
185
/// Pairs a [`TimeEvent`] with the [`TimeEventCallback`] that should receive it.
#[repr(C)]
#[derive(Clone, Debug)]
pub struct TimeEventHandlerV2 {
    /// The event to deliver.
    pub event: TimeEvent,
    /// The callback that is invoked with the event.
    pub callback: TimeEventCallback,
}
198
199impl TimeEventHandlerV2 {
200 #[must_use]
202 pub const fn new(event: TimeEvent, callback: TimeEventCallback) -> Self {
203 Self { event, callback }
204 }
205
206 pub fn run(self) {
212 let Self { event, callback } = self;
213 callback.call(event);
214 }
215}
216
217impl PartialOrd for TimeEventHandlerV2 {
218 fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
219 Some(self.cmp(other))
220 }
221}
222
impl PartialEq for TimeEventHandlerV2 {
    /// Two handlers are equal when their events carry the same `ts_event`; the callback and
    /// the remaining event fields are not compared.
    fn eq(&self, other: &Self) -> bool {
        self.event.ts_event == other.event.ts_event
    }
}

// Marker impl: the equality above is declared to be a full equivalence relation.
impl Eq for TimeEventHandlerV2 {}
230
231impl Ord for TimeEventHandlerV2 {
232 fn cmp(&self, other: &Self) -> Ordering {
233 self.event.ts_event.cmp(&other.event.ts_event)
234 }
235}
236
/// A timer that is advanced manually and yields its events through the `Iterator` interface
/// rather than by waiting on a clock.
#[derive(Clone, Copy, Debug)]
pub struct TestTimer {
    /// The timer name, copied into every event it produces.
    pub name: Ustr,
    /// The spacing between consecutive events.
    pub interval_ns: NonZeroU64,
    /// The time the timer starts from.
    pub start_time_ns: UnixNanos,
    /// The optional time after which no further events are produced.
    pub stop_time_ns: Option<UnixNanos>,
    /// Whether the first event is scheduled at `start_time_ns` instead of one interval later.
    pub fire_immediately: bool,
    // Time of the next event to be produced.
    next_time_ns: UnixNanos,
    // Set once the timer is cancelled or has passed its stop time.
    is_expired: bool,
}
256
257impl TestTimer {
258 #[must_use]
264 pub fn new(
265 name: Ustr,
266 interval_ns: NonZeroU64,
267 start_time_ns: UnixNanos,
268 stop_time_ns: Option<UnixNanos>,
269 fire_immediately: bool,
270 ) -> Self {
271 check_valid_string(name, stringify!(name)).expect(FAILED);
272
273 let next_time_ns = if fire_immediately {
274 start_time_ns
275 } else {
276 start_time_ns + interval_ns.get()
277 };
278
279 Self {
280 name,
281 interval_ns,
282 start_time_ns,
283 stop_time_ns,
284 fire_immediately,
285 next_time_ns,
286 is_expired: false,
287 }
288 }
289
290 #[must_use]
292 pub const fn next_time_ns(&self) -> UnixNanos {
293 self.next_time_ns
294 }
295
296 #[must_use]
298 pub const fn is_expired(&self) -> bool {
299 self.is_expired
300 }
301
302 #[must_use]
303 pub const fn pop_event(&self, event_id: UUID4, ts_init: UnixNanos) -> TimeEvent {
304 TimeEvent {
305 name: self.name,
306 event_id,
307 ts_event: self.next_time_ns,
308 ts_init,
309 }
310 }
311
312 pub fn advance(&mut self, to_time_ns: UnixNanos) -> impl Iterator<Item = TimeEvent> + '_ {
318 let advances = if self.next_time_ns <= to_time_ns {
320 (to_time_ns.as_u64() - self.next_time_ns.as_u64()) / self.interval_ns.get() + 1
321 } else {
322 0
323 };
324 self.take(advances as usize).map(|(event, _)| event)
325 }
326
327 pub const fn cancel(&mut self) {
331 self.is_expired = true;
332 }
333}
334
335impl Iterator for TestTimer {
336 type Item = (TimeEvent, UnixNanos);
337
338 fn next(&mut self) -> Option<Self::Item> {
339 if self.is_expired {
340 None
341 } else {
342 if let Some(stop_time_ns) = self.stop_time_ns {
344 if self.next_time_ns > stop_time_ns {
345 self.is_expired = true;
346 return None;
347 }
348 }
349
350 let item = (
351 TimeEvent {
352 name: self.name,
353 event_id: UUID4::new(),
354 ts_event: self.next_time_ns,
355 ts_init: self.next_time_ns,
356 },
357 self.next_time_ns,
358 );
359
360 if let Some(stop_time_ns) = self.stop_time_ns {
362 if self.next_time_ns == stop_time_ns {
363 self.is_expired = true;
364 }
365 }
366
367 self.next_time_ns += self.interval_ns;
368
369 Some(item)
370 }
371 }
372}
373
/// A timer that emits [`TimeEvent`]s from a task spawned on the shared runtime.
#[derive(Debug)]
pub struct LiveTimer {
    /// The timer name, copied into every event it produces.
    pub name: Ustr,
    /// The spacing between consecutive events.
    pub interval_ns: NonZeroU64,
    /// The time the timer starts from.
    pub start_time_ns: UnixNanos,
    /// The optional time at which the timer stops.
    pub stop_time_ns: Option<UnixNanos>,
    /// Whether the first event is scheduled at `start_time_ns` instead of one interval later.
    pub fire_immediately: bool,
    // Next scheduled time as raw nanoseconds; shared with the spawned task, which updates it.
    next_time_ns: Arc<AtomicU64>,
    // Callback that receives the events.
    callback: TimeEventCallback,
    // Handle of the spawned task; `None` until `start` has been called.
    task_handle: Option<JoinHandle<()>>,
    // Sender used to hand events over when the callback is a Rust closure.
    sender: Option<Arc<dyn TimeEventSender>>,
}
395
396impl LiveTimer {
397 #[allow(clippy::too_many_arguments)]
403 #[must_use]
404 pub fn new(
405 name: Ustr,
406 interval_ns: NonZeroU64,
407 start_time_ns: UnixNanos,
408 stop_time_ns: Option<UnixNanos>,
409 callback: TimeEventCallback,
410 fire_immediately: bool,
411 sender: Option<Arc<dyn TimeEventSender>>,
412 ) -> Self {
413 check_valid_string(name, stringify!(name)).expect(FAILED);
414
415 let next_time_ns = if fire_immediately {
416 start_time_ns.as_u64()
417 } else {
418 start_time_ns.as_u64() + interval_ns.get()
419 };
420
421 log::debug!("Creating timer '{name}'");
422
423 Self {
424 name,
425 interval_ns,
426 start_time_ns,
427 stop_time_ns,
428 fire_immediately,
429 next_time_ns: Arc::new(AtomicU64::new(next_time_ns)),
430 callback,
431 task_handle: None,
432 sender,
433 }
434 }
435
436 #[must_use]
440 pub fn next_time_ns(&self) -> UnixNanos {
441 UnixNanos::from(self.next_time_ns.load(atomic::Ordering::SeqCst))
442 }
443
444 #[must_use]
449 pub fn is_expired(&self) -> bool {
450 self.task_handle
451 .as_ref()
452 .is_some_and(tokio::task::JoinHandle::is_finished)
453 }
454
455 #[allow(unused_variables)] pub fn start(&mut self) {
465 let event_name = self.name;
466 let stop_time_ns = self.stop_time_ns;
467 let interval_ns = self.interval_ns.get();
468 let callback = self.callback.clone();
469
470 let clock = get_atomic_clock_realtime();
472 let now_ns = clock.get_time_ns();
473
474 let mut next_time_ns = self.next_time_ns.load(atomic::Ordering::SeqCst);
476 if next_time_ns <= now_ns {
477 log::warn!(
478 "Timer '{event_name}' alert time {next_time_ns} was in the past, adjusted to current time for immediate fire"
479 );
480 next_time_ns = now_ns.into();
481 self.next_time_ns
482 .store(now_ns.as_u64(), atomic::Ordering::SeqCst);
483 }
484
485 let mut next_time_ns = UnixNanos::from(floor_to_nearest_microsecond(next_time_ns));
487 let next_time_atomic = self.next_time_ns.clone();
488
489 let sender = self.sender.clone();
490
491 let rt = get_runtime();
492 let handle = rt.spawn(async move {
493 let clock = get_atomic_clock_realtime();
494
495 let overhead = Duration::from_millis(1);
497 let delay_ns = next_time_ns.saturating_sub(now_ns.as_u64());
498 let delay = Duration::from_nanos(delay_ns).saturating_sub(overhead);
499 let start = Instant::now() + delay;
500
501 let mut timer = tokio::time::interval_at(start, Duration::from_nanos(interval_ns));
502
503 loop {
504 timer.tick().await;
507 let now_ns = clock.get_time_ns();
508
509 let event = TimeEvent::new(event_name, UUID4::new(), next_time_ns, now_ns);
510
511 match callback {
512 #[cfg(feature = "python")]
513 TimeEventCallback::Python(ref callback) => {
514 call_python_with_time_event(event, callback);
515 }
516 TimeEventCallback::Rust(_) => {
517 let sender = sender
518 .as_ref()
519 .expect("timer event sender was unset for Rust callback system");
520 let handler = TimeEventHandlerV2::new(event, callback.clone());
521 sender.send(handler);
522 }
523 }
524
525 next_time_ns += interval_ns;
527 next_time_atomic.store(next_time_ns.as_u64(), atomic::Ordering::SeqCst);
528
529 if let Some(stop_time_ns) = stop_time_ns {
531 if std::cmp::max(next_time_ns, now_ns) >= stop_time_ns {
532 break; }
534 }
535 }
536 });
537
538 self.task_handle = Some(handle);
539 }
540
541 pub fn cancel(&mut self) {
545 log::debug!("Cancel timer '{}'", self.name);
546 if let Some(ref handle) = self.task_handle {
547 handle.abort();
548 }
549 }
550}
551
/// Calls a Python `callback` with `event` wrapped in a `PyCapsule`, while holding the GIL.
///
/// An error returned by the Python call is logged and otherwise ignored.
///
/// # Panics
///
/// Panics if the `PyCapsule` cannot be created.
#[cfg(feature = "python")]
fn call_python_with_time_event(event: TimeEvent, callback: &PyObject) {
    use nautilus_core::python::IntoPyObjectPoseiExt;
    use pyo3::types::PyCapsule;

    Python::with_gil(|py| {
        let capsule: PyObject = PyCapsule::new(py, event, None)
            .expect("Error creating `PyCapsule`")
            .into_py_any_unwrap(py);

        if let Err(e) = callback.call1(py, (capsule,)) {
            tracing::error!("Error on callback: {e:?}");
        }
    });
}
569
#[cfg(test)]
mod tests {
    use std::{num::NonZeroU64, rc::Rc};

    use nautilus_core::UnixNanos;
    use rstest::*;
    use ustr::Ustr;

    use super::{LiveTimer, TestTimer, TimeEvent, TimeEventCallback};

    /// Builds a `TestTimer` named `TEST_TIMER` from plain integer settings.
    fn make_test_timer(
        interval_ns: u64,
        start_time_ns: u64,
        stop_time_ns: Option<u64>,
        fire_immediately: bool,
    ) -> TestTimer {
        TestTimer::new(
            Ustr::from("TEST_TIMER"),
            NonZeroU64::new(interval_ns).unwrap(),
            UnixNanos::from(start_time_ns),
            stop_time_ns.map(UnixNanos::from),
            fire_immediately,
        )
    }

    /// Builds an unstarted `LiveTimer` (1000 ns interval, start at 100 ns, no stop time, no
    /// sender) with a no-op Rust callback.
    fn make_live_timer(fire_immediately: bool) -> LiveTimer {
        LiveTimer::new(
            Ustr::from("TEST_TIMER"),
            NonZeroU64::new(1000).unwrap(),
            UnixNanos::from(100),
            None,
            TimeEventCallback::Rust(Rc::new(|_| {})),
            fire_immediately,
            None,
        )
    }

    #[rstest]
    fn test_test_timer_pop_event() {
        let mut timer = make_test_timer(1, 1, None, false);

        assert!(timer.next().is_some());
        assert!(timer.next().is_some());
        // Once flagged as expired the iterator must stop yielding.
        timer.is_expired = true;
        assert!(timer.next().is_none());
    }

    #[rstest]
    fn test_test_timer_advance_within_next_time_ns() {
        let mut timer = make_test_timer(5, 0, None, false);

        // None of these advances reaches the first fire time at 5 ns.
        for to_time_ns in 1..=3u64 {
            let _: Vec<TimeEvent> = timer.advance(UnixNanos::from(to_time_ns)).collect();
        }
        assert_eq!(timer.advance(UnixNanos::from(4)).count(), 0);
        assert_eq!(timer.next_time_ns, 5);
        assert!(!timer.is_expired);
    }

    #[rstest]
    fn test_test_timer_advance_up_to_next_time_ns() {
        let mut timer = make_test_timer(1, 0, None, false);

        assert_eq!(timer.advance(UnixNanos::from(1)).count(), 1);
        assert!(!timer.is_expired);
    }

    #[rstest]
    fn test_test_timer_advance_up_to_next_time_ns_with_stop_time() {
        let mut timer = make_test_timer(1, 0, Some(2), false);

        assert_eq!(timer.advance(UnixNanos::from(2)).count(), 2);
        assert!(timer.is_expired);
    }

    #[rstest]
    fn test_test_timer_advance_beyond_next_time_ns() {
        let mut timer = make_test_timer(1, 0, Some(5), false);

        assert_eq!(timer.advance(UnixNanos::from(5)).count(), 5);
        assert!(timer.is_expired);
    }

    #[rstest]
    fn test_test_timer_advance_beyond_stop_time() {
        let mut timer = make_test_timer(1, 0, Some(5), false);

        // Advancing past the stop time still yields only the events up to it.
        assert_eq!(timer.advance(UnixNanos::from(10)).count(), 5);
        assert!(timer.is_expired);
    }

    #[rstest]
    fn test_test_timer_advance_exact_boundary() {
        let mut timer = make_test_timer(5, 0, None, false);

        let first_batch: Vec<TimeEvent> = timer.advance(UnixNanos::from(5)).collect();
        assert_eq!(
            first_batch.len(),
            1,
            "Expected one event at the 5 ns boundary"
        );

        let second_batch: Vec<TimeEvent> = timer.advance(UnixNanos::from(10)).collect();
        assert_eq!(
            second_batch.len(),
            1,
            "Expected one event at the 10 ns boundary"
        );
    }

    #[rstest]
    fn test_test_timer_fire_immediately_true() {
        let mut timer = make_test_timer(5, 10, None, true);

        // With immediate firing the first event sits on the start time itself.
        assert_eq!(timer.next_time_ns(), UnixNanos::from(10));

        let fired: Vec<TimeEvent> = timer.advance(UnixNanos::from(10)).collect();
        assert_eq!(fired.len(), 1);
        assert_eq!(fired[0].ts_event, UnixNanos::from(10));

        // The following event is one interval later.
        assert_eq!(timer.next_time_ns(), UnixNanos::from(15));
    }

    #[rstest]
    fn test_test_timer_fire_immediately_false() {
        let mut timer = make_test_timer(5, 10, None, false);

        // Without immediate firing the first event is one interval after the start time.
        assert_eq!(timer.next_time_ns(), UnixNanos::from(15));

        let at_start: Vec<TimeEvent> = timer.advance(UnixNanos::from(10)).collect();
        assert_eq!(at_start.len(), 0);

        let at_first_interval: Vec<TimeEvent> = timer.advance(UnixNanos::from(15)).collect();
        assert_eq!(at_first_interval.len(), 1);
        assert_eq!(at_first_interval[0].ts_event, UnixNanos::from(15));
    }

    #[rstest]
    fn test_live_timer_fire_immediately_field() {
        let timer = make_live_timer(true);

        assert!(timer.fire_immediately);

        // Scheduled on the start time.
        assert_eq!(timer.next_time_ns(), UnixNanos::from(100));
    }

    #[rstest]
    fn test_live_timer_fire_immediately_false_field() {
        let timer = make_live_timer(false);

        assert!(!timer.fire_immediately);

        // Scheduled one interval after the start time.
        assert_eq!(timer.next_time_ns(), UnixNanos::from(1100));
    }

    use proptest::prelude::*;

    /// A single step applied to a timer under property test.
    #[derive(Clone, Debug)]
    enum TimerOperation {
        AdvanceTime(u64),
        Cancel,
    }

    /// Generates operations weighted 8:2 towards advancing time by 1..=1000 ns over cancelling.
    fn timer_operation_strategy() -> impl Strategy<Value = TimerOperation> {
        prop_oneof![
            8 => prop::num::u64::ANY.prop_map(|v| TimerOperation::AdvanceTime(v % 1000 + 1)),
            2 => Just(TimerOperation::Cancel),
        ]
    }

    /// Generates `(interval_ns, start_time_ns, stop_time_ns, fire_immediately)` tuples.
    fn timer_config_strategy() -> impl Strategy<Value = (u64, u64, Option<u64>, bool)> {
        let interval_ns = 1u64..=100u64;
        let start_time_ns = 0u64..=50u64;
        let stop_time_ns = prop::option::of(51u64..=200u64);
        let fire_immediately = prop::bool::ANY;

        (interval_ns, start_time_ns, stop_time_ns, fire_immediately)
    }

    /// Generates a list of 5 to 50 operations together with a timer configuration.
    fn timer_test_strategy()
    -> impl Strategy<Value = (Vec<TimerOperation>, (u64, u64, Option<u64>, bool))> {
        let operations = prop::collection::vec(timer_operation_strategy(), 5..=50);
        (operations, timer_config_strategy())
    }

    /// Applies `operations` to a freshly built timer and checks the timer invariants after
    /// every step.
    fn test_timer_with_operations(
        operations: Vec<TimerOperation>,
        (interval_ns, start_time_ns, stop_time_ns, fire_immediately): (u64, u64, Option<u64>, bool),
    ) {
        let mut timer = TestTimer::new(
            Ustr::from("PROP_TEST_TIMER"),
            NonZeroU64::new(interval_ns).unwrap(),
            UnixNanos::from(start_time_ns),
            stop_time_ns.map(UnixNanos::from),
            fire_immediately,
        );

        // Earliest time the timer may ever be scheduled for.
        let first_fire_ns = if fire_immediately {
            start_time_ns
        } else {
            start_time_ns + interval_ns
        };
        let mut current_time = start_time_ns;

        for operation in operations {
            if timer.is_expired() {
                break;
            }

            match operation {
                TimerOperation::Cancel => {
                    timer.cancel();
                    assert!(timer.is_expired(), "Timer should be expired after cancel");
                }
                TimerOperation::AdvanceTime(delta) => {
                    let to_time = current_time + delta;
                    let events: Vec<TimeEvent> = timer.advance(UnixNanos::from(to_time)).collect();
                    current_time = to_time;

                    let mut previous_ts: Option<UnixNanos> = None;
                    for event in &events {
                        if let Some(prev) = previous_ts {
                            assert!(
                                event.ts_event >= prev,
                                "Events should be in chronological order"
                            );
                        }

                        let ts_event = event.ts_event.as_u64();
                        assert!(
                            ts_event >= start_time_ns,
                            "Event timestamp should not be before start time"
                        );
                        assert!(
                            ts_event <= to_time,
                            "Event timestamp should not be after advance time"
                        );

                        if let Some(stop_time_ns) = stop_time_ns {
                            assert!(
                                ts_event <= stop_time_ns,
                                "Event timestamp should not exceed stop time"
                            );
                        }

                        previous_ts = Some(event.ts_event);
                    }
                }
            }

            // The remaining checks only apply to a timer that is still live.
            if timer.is_expired() {
                continue;
            }

            assert!(
                timer.next_time_ns().as_u64() >= first_fire_ns,
                "Next time should respect interval spacing"
            );

            if let Some(stop_time_ns) = stop_time_ns {
                if timer.next_time_ns().as_u64() > stop_time_ns {
                    // Probe a copy so the timer under test is left untouched.
                    let mut probe = timer;
                    let events: Vec<TimeEvent> = probe
                        .advance(UnixNanos::from(stop_time_ns + 1))
                        .collect();
                    assert!(
                        events.is_empty() || probe.is_expired(),
                        "Timer should not generate events beyond stop time"
                    );
                }
            }
        }

        // A timer with a stop time must wind down once time moves well past it.
        if let Some(stop_time_ns) = stop_time_ns {
            if !timer.is_expired() {
                let events: Vec<TimeEvent> = timer
                    .advance(UnixNanos::from(stop_time_ns + 1000))
                    .collect();
                assert!(
                    timer.is_expired() || events.is_empty(),
                    "Timer should eventually expire or stop generating events"
                );
            }
        }
    }

    proptest! {
        #[test]
        fn prop_timer_advance_operations((operations, config) in timer_test_strategy()) {
            test_timer_with_operations(operations, config);
        }

        #[test]
        fn prop_timer_interval_consistency(
            interval_ns in 1u64..=100u64,
            start_time_ns in 0u64..=50u64,
            fire_immediately in prop::bool::ANY,
            advance_count in 1usize..=20usize,
        ) {
            let mut timer = TestTimer::new(
                Ustr::from("CONSISTENCY_TEST"),
                NonZeroU64::new(interval_ns).unwrap(),
                UnixNanos::from(start_time_ns),
                None,
                fire_immediately,
            );

            // Step exactly from one expected fire time to the next.
            let mut expected_fire_time = if fire_immediately {
                start_time_ns
            } else {
                start_time_ns + interval_ns
            };

            for _ in 0..advance_count {
                let events: Vec<TimeEvent> = timer
                    .advance(UnixNanos::from(expected_fire_time))
                    .collect();

                if let Some(first) = events.first() {
                    prop_assert_eq!(events.len(), 1);
                    prop_assert_eq!(first.ts_event.as_u64(), expected_fire_time);
                }

                expected_fire_time += interval_ns;
            }
        }
    }
}