1#[rustfmt::skip]
19#[cfg(feature = "clock_v2")]
20use std::collections::BinaryHeap;
21
22#[rustfmt::skip]
23#[cfg(feature = "clock_v2")]
24use tokio::sync::Mutex;
25
26use std::{
27 cmp::Ordering,
28 fmt::{Debug, Display},
29 num::NonZeroU64,
30 rc::Rc,
31 sync::{
32 Arc,
33 atomic::{self, AtomicU64},
34 },
35};
36
37use nautilus_core::{
38 UUID4, UnixNanos,
39 correctness::{FAILED, check_valid_string},
40 datetime::floor_to_nearest_microsecond,
41 time::get_atomic_clock_realtime,
42};
43#[cfg(feature = "python")]
44use pyo3::{PyObject, Python};
45use tokio::{
46 task::JoinHandle,
47 time::{Duration, Instant},
48};
49use ustr::Ustr;
50
51use crate::runtime::get_runtime;
52
53#[must_use]
59pub fn create_valid_interval(interval_ns: u64) -> NonZeroU64 {
60 NonZeroU64::new(std::cmp::max(interval_ns, 1)).expect("`interval_ns` must be positive")
61}
62
63#[repr(C)]
64#[derive(Clone, Debug)]
65#[cfg_attr(
66 feature = "python",
67 pyo3::pyclass(module = "posei_trader.core.nautilus_pyo3.common")
68)]
69#[derive(Eq)]
74pub struct TimeEvent {
75 pub name: Ustr,
77 pub event_id: UUID4,
79 pub ts_event: UnixNanos,
81 pub ts_init: UnixNanos,
83}
84
85impl PartialOrd for TimeEvent {
87 fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
88 Some(self.cmp(other))
89 }
90}
91
92impl Ord for TimeEvent {
94 fn cmp(&self, other: &Self) -> Ordering {
95 other.ts_event.cmp(&self.ts_event)
96 }
97}
98
99impl TimeEvent {
100 #[must_use]
106 pub const fn new(name: Ustr, event_id: UUID4, ts_event: UnixNanos, ts_init: UnixNanos) -> Self {
107 Self {
108 name,
109 event_id,
110 ts_event,
111 ts_init,
112 }
113 }
114}
115
116impl Display for TimeEvent {
117 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
118 write!(
119 f,
120 "TimeEvent(name={}, event_id={}, ts_event={}, ts_init={})",
121 self.name, self.event_id, self.ts_event, self.ts_init
122 )
123 }
124}
125
126impl PartialEq for TimeEvent {
127 fn eq(&self, other: &Self) -> bool {
128 self.event_id == other.event_id
129 }
130}
131
132pub type RustTimeEventCallback = dyn Fn(TimeEvent);
133
134#[derive(Clone)]
135pub enum TimeEventCallback {
136 #[cfg(feature = "python")]
137 Python(Arc<PyObject>),
138 Rust(Rc<RustTimeEventCallback>),
139}
140
141impl Debug for TimeEventCallback {
142 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
143 match self {
144 #[cfg(feature = "python")]
145 Self::Python(_) => f.write_str("Python callback"),
146 Self::Rust(_) => f.write_str("Rust callback"),
147 }
148 }
149}
150
151impl TimeEventCallback {
152 pub fn call(&self, event: TimeEvent) {
158 match self {
159 #[cfg(feature = "python")]
160 Self::Python(callback) => {
161 Python::with_gil(|py| {
162 callback.call1(py, (event,)).unwrap();
163 });
164 }
165 Self::Rust(callback) => callback(event),
166 }
167 }
168}
169
170impl From<Rc<RustTimeEventCallback>> for TimeEventCallback {
171 fn from(value: Rc<RustTimeEventCallback>) -> Self {
172 Self::Rust(value)
173 }
174}
175
176#[cfg(feature = "python")]
177impl From<PyObject> for TimeEventCallback {
178 fn from(value: PyObject) -> Self {
179 Self::Python(Arc::new(value))
180 }
181}
182
183#[allow(unsafe_code)]
185unsafe impl Send for TimeEventCallback {}
186#[allow(unsafe_code)]
187unsafe impl Sync for TimeEventCallback {}
188
189#[repr(C)]
190#[derive(Clone, Debug)]
191pub struct TimeEventHandlerV2 {
196 pub event: TimeEvent,
198 pub callback: TimeEventCallback,
200}
201
202impl TimeEventHandlerV2 {
203 #[must_use]
205 pub const fn new(event: TimeEvent, callback: TimeEventCallback) -> Self {
206 Self { event, callback }
207 }
208
209 pub fn run(self) {
215 let Self { event, callback } = self;
216 callback.call(event);
217 }
218}
219
220impl PartialOrd for TimeEventHandlerV2 {
221 fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
222 Some(self.cmp(other))
223 }
224}
225
226impl PartialEq for TimeEventHandlerV2 {
227 fn eq(&self, other: &Self) -> bool {
228 self.event.ts_event == other.event.ts_event
229 }
230}
231
232impl Eq for TimeEventHandlerV2 {}
233
234impl Ord for TimeEventHandlerV2 {
235 fn cmp(&self, other: &Self) -> Ordering {
236 self.event.ts_event.cmp(&other.event.ts_event)
237 }
238}
239
240#[derive(Clone, Copy, Debug)]
245pub struct TestTimer {
246 pub name: Ustr,
248 pub interval_ns: NonZeroU64,
250 pub start_time_ns: UnixNanos,
252 pub stop_time_ns: Option<UnixNanos>,
254 next_time_ns: UnixNanos,
255 is_expired: bool,
256}
257
258impl TestTimer {
259 #[must_use]
265 pub fn new(
266 name: Ustr,
267 interval_ns: NonZeroU64,
268 start_time_ns: UnixNanos,
269 stop_time_ns: Option<UnixNanos>,
270 ) -> Self {
271 check_valid_string(name, stringify!(name)).expect(FAILED);
272
273 Self {
274 name,
275 interval_ns,
276 start_time_ns,
277 stop_time_ns,
278 next_time_ns: start_time_ns + interval_ns.get(),
279 is_expired: false,
280 }
281 }
282
283 #[must_use]
285 pub const fn next_time_ns(&self) -> UnixNanos {
286 self.next_time_ns
287 }
288
289 #[must_use]
291 pub const fn is_expired(&self) -> bool {
292 self.is_expired
293 }
294
295 #[must_use]
296 pub const fn pop_event(&self, event_id: UUID4, ts_init: UnixNanos) -> TimeEvent {
297 TimeEvent {
298 name: self.name,
299 event_id,
300 ts_event: self.next_time_ns,
301 ts_init,
302 }
303 }
304
305 pub fn advance(&mut self, to_time_ns: UnixNanos) -> impl Iterator<Item = TimeEvent> + '_ {
311 let advances = to_time_ns
312 .saturating_sub(self.next_time_ns.as_u64() - self.interval_ns.get())
313 / self.interval_ns.get();
314 self.take(advances as usize).map(|(event, _)| event)
315 }
316
317 pub const fn cancel(&mut self) {
321 self.is_expired = true;
322 }
323}
324
325impl Iterator for TestTimer {
326 type Item = (TimeEvent, UnixNanos);
327
328 fn next(&mut self) -> Option<Self::Item> {
329 if self.is_expired {
330 None
331 } else {
332 let item = (
333 TimeEvent {
334 name: self.name,
335 event_id: UUID4::new(),
336 ts_event: self.next_time_ns,
337 ts_init: self.next_time_ns,
338 },
339 self.next_time_ns,
340 );
341
342 if let Some(stop_time_ns) = self.stop_time_ns {
344 if self.next_time_ns >= stop_time_ns {
345 self.is_expired = true;
346 }
347 }
348
349 self.next_time_ns += self.interval_ns;
350
351 Some(item)
352 }
353 }
354}
355
356#[derive(Debug)]
361pub struct LiveTimer {
362 pub name: Ustr,
364 pub interval_ns: NonZeroU64,
366 pub start_time_ns: UnixNanos,
368 pub stop_time_ns: Option<UnixNanos>,
370 next_time_ns: Arc<AtomicU64>,
371 callback: TimeEventCallback,
372 task_handle: Option<JoinHandle<()>>,
373 #[cfg(feature = "clock_v2")]
374 heap: Arc<Mutex<BinaryHeap<TimeEvent>>>,
375}
376
377impl LiveTimer {
378 #[must_use]
384 #[cfg(not(feature = "clock_v2"))]
385 pub fn new(
386 name: Ustr,
387 interval_ns: NonZeroU64,
388 start_time_ns: UnixNanos,
389 stop_time_ns: Option<UnixNanos>,
390 callback: TimeEventCallback,
391 ) -> Self {
392 check_valid_string(name, stringify!(name)).expect(FAILED);
393
394 log::debug!("Creating timer '{name}'");
395 Self {
396 name,
397 interval_ns,
398 start_time_ns,
399 stop_time_ns,
400 next_time_ns: Arc::new(AtomicU64::new(start_time_ns.as_u64() + interval_ns.get())),
401 callback,
402 task_handle: None,
403 }
404 }
405
406 #[must_use]
412 #[cfg(feature = "clock_v2")]
413 pub fn new(
414 name: Ustr,
415 interval_ns: NonZeroU64,
416 start_time_ns: UnixNanos,
417 stop_time_ns: Option<UnixNanos>,
418 callback: TimeEventCallback,
419 heap: Arc<Mutex<BinaryHeap<TimeEvent>>>,
420 ) -> Self {
421 check_valid_string(name, stringify!(name)).expect(FAILED);
422
423 log::debug!("Creating timer '{name}'");
424 Self {
425 name,
426 interval_ns,
427 start_time_ns,
428 stop_time_ns,
429 next_time_ns: Arc::new(AtomicU64::new(start_time_ns.as_u64() + interval_ns.get())),
430 callback,
431 heap,
432 task_handle: None,
433 }
434 }
435
436 #[must_use]
440 pub fn next_time_ns(&self) -> UnixNanos {
441 UnixNanos::from(self.next_time_ns.load(atomic::Ordering::SeqCst))
442 }
443
444 #[must_use]
449 pub fn is_expired(&self) -> bool {
450 self.task_handle
451 .as_ref()
452 .is_some_and(tokio::task::JoinHandle::is_finished)
453 }
454
455 #[allow(unused_variables)] pub fn start(&mut self) {
461 let event_name = self.name;
462 let stop_time_ns = self.stop_time_ns;
463 let interval_ns = self.interval_ns.get();
464 let callback = self.callback.clone();
465
466 let clock = get_atomic_clock_realtime();
468 let now_ns = clock.get_time_ns();
469
470 let mut next_time_ns = self.next_time_ns.load(atomic::Ordering::SeqCst);
472 if next_time_ns <= now_ns {
473 log::warn!(
474 "Timer '{}' alert time {} was in the past, adjusted to current time for immediate fire",
475 event_name,
476 next_time_ns,
477 );
478 next_time_ns = now_ns.into();
479 self.next_time_ns
480 .store(now_ns.as_u64(), atomic::Ordering::SeqCst);
481 }
482
483 let mut next_time_ns = UnixNanos::from(floor_to_nearest_microsecond(next_time_ns));
485 let next_time_atomic = self.next_time_ns.clone();
486
487 #[cfg(feature = "clock_v2")]
488 let heap = self.heap.clone();
489
490 let rt = get_runtime();
491 let handle = rt.spawn(async move {
492 let clock = get_atomic_clock_realtime();
493
494 let overhead = Duration::from_millis(1);
496 let delay_ns = next_time_ns.saturating_sub(now_ns.as_u64());
497 let delay = Duration::from_nanos(delay_ns).saturating_sub(overhead);
498 let start = Instant::now() + delay;
499
500 let mut timer = tokio::time::interval_at(start, Duration::from_nanos(interval_ns));
501
502 loop {
503 timer.tick().await;
506 let now_ns = clock.get_time_ns();
507
508 #[cfg(feature = "python")]
509 {
510 match callback {
511 TimeEventCallback::Python(ref callback) => {
512 call_python_with_time_event(event_name, next_time_ns, now_ns, callback);
513 }
514 TimeEventCallback::Rust(_) => {}
516 }
517 }
518
519 #[cfg(feature = "clock_v2")]
520 {
521 let event = TimeEvent::new(event_name, UUID4::new(), next_time_ns, now_ns);
522 heap.lock().await.push(event);
523 }
524
525 next_time_ns += interval_ns;
527 next_time_atomic.store(next_time_ns.as_u64(), atomic::Ordering::SeqCst);
528
529 if let Some(stop_time_ns) = stop_time_ns {
531 if std::cmp::max(next_time_ns, now_ns) >= stop_time_ns {
532 break; }
534 }
535 }
536 });
537
538 self.task_handle = Some(handle);
539 }
540
541 pub fn cancel(&mut self) {
545 log::debug!("Cancel timer '{}'", self.name);
546 if let Some(ref handle) = self.task_handle {
547 handle.abort();
548 }
549 }
550}
551
552#[cfg(feature = "python")]
553fn call_python_with_time_event(
554 name: Ustr,
555 ts_event: UnixNanos,
556 ts_init: UnixNanos,
557 callback: &PyObject,
558) {
559 use nautilus_core::python::IntoPyObjectPoseiExt;
560 use pyo3::types::PyCapsule;
561
562 Python::with_gil(|py| {
563 let event = TimeEvent::new(name, UUID4::new(), ts_event, ts_init);
565 let capsule: PyObject = PyCapsule::new(py, event, None)
566 .expect("Error creating `PyCapsule`")
567 .into_py_any_unwrap(py);
568
569 match callback.call1(py, (capsule,)) {
570 Ok(_) => {}
571 Err(e) => tracing::error!("Error on callback: {e:?}"),
572 }
573 });
574}
575
576#[cfg(test)]
580mod tests {
581 use std::num::NonZeroU64;
582
583 use nautilus_core::UnixNanos;
584 use rstest::*;
585 use ustr::Ustr;
586
587 use super::{TestTimer, TimeEvent};
588
589 #[rstest]
590 fn test_test_timer_pop_event() {
591 let mut timer = TestTimer::new(
592 Ustr::from("TEST_TIMER"),
593 NonZeroU64::new(1).unwrap(),
594 UnixNanos::from(1),
595 None,
596 );
597
598 assert!(timer.next().is_some());
599 assert!(timer.next().is_some());
600 timer.is_expired = true;
601 assert!(timer.next().is_none());
602 }
603
604 #[rstest]
605 fn test_test_timer_advance_within_next_time_ns() {
606 let mut timer = TestTimer::new(
607 Ustr::from("TEST_TIMER"),
608 NonZeroU64::new(5).unwrap(),
609 UnixNanos::default(),
610 None,
611 );
612 let _: Vec<TimeEvent> = timer.advance(UnixNanos::from(1)).collect();
613 let _: Vec<TimeEvent> = timer.advance(UnixNanos::from(2)).collect();
614 let _: Vec<TimeEvent> = timer.advance(UnixNanos::from(3)).collect();
615 assert_eq!(timer.advance(UnixNanos::from(4)).count(), 0);
616 assert_eq!(timer.next_time_ns, 5);
617 assert!(!timer.is_expired);
618 }
619
620 #[rstest]
621 fn test_test_timer_advance_up_to_next_time_ns() {
622 let mut timer = TestTimer::new(
623 Ustr::from("TEST_TIMER"),
624 NonZeroU64::new(1).unwrap(),
625 UnixNanos::default(),
626 None,
627 );
628 assert_eq!(timer.advance(UnixNanos::from(1)).count(), 1);
629 assert!(!timer.is_expired);
630 }
631
632 #[rstest]
633 fn test_test_timer_advance_up_to_next_time_ns_with_stop_time() {
634 let mut timer = TestTimer::new(
635 Ustr::from("TEST_TIMER"),
636 NonZeroU64::new(1).unwrap(),
637 UnixNanos::default(),
638 Some(UnixNanos::from(2)),
639 );
640 assert_eq!(timer.advance(UnixNanos::from(2)).count(), 2);
641 assert!(timer.is_expired);
642 }
643
644 #[rstest]
645 fn test_test_timer_advance_beyond_next_time_ns() {
646 let mut timer = TestTimer::new(
647 Ustr::from("TEST_TIMER"),
648 NonZeroU64::new(1).unwrap(),
649 UnixNanos::default(),
650 Some(UnixNanos::from(5)),
651 );
652 assert_eq!(timer.advance(UnixNanos::from(5)).count(), 5);
653 assert!(timer.is_expired);
654 }
655
656 #[rstest]
657 fn test_test_timer_advance_beyond_stop_time() {
658 let mut timer = TestTimer::new(
659 Ustr::from("TEST_TIMER"),
660 NonZeroU64::new(1).unwrap(),
661 UnixNanos::default(),
662 Some(UnixNanos::from(5)),
663 );
664 assert_eq!(timer.advance(UnixNanos::from(10)).count(), 5);
665 assert!(timer.is_expired);
666 }
667
668 #[rstest]
669 fn test_test_timer_advance_exact_boundary() {
670 let mut timer = TestTimer::new(
671 Ustr::from("TEST_TIMER"),
672 NonZeroU64::new(5).unwrap(),
673 UnixNanos::from(0),
674 None,
675 );
676 let events: Vec<TimeEvent> = timer.advance(UnixNanos::from(5)).collect();
677 assert_eq!(events.len(), 1, "Expected one event at the 5 ns boundary");
678
679 let events: Vec<TimeEvent> = timer.advance(UnixNanos::from(10)).collect();
680 assert_eq!(events.len(), 1, "Expected one event at the 10 ns boundary");
681 }
682}