1use std::{str::FromStr, sync::Arc};
17
18use nautilus_core::{
19 UUID4, UnixNanos,
20 python::{IntoPyObjectPoseiExt, to_pyvalue_err},
21};
22use pyo3::{
23 IntoPyObjectExt,
24 basic::CompareOp,
25 prelude::*,
26 types::{PyInt, PyString, PyTuple},
27};
28use ustr::Ustr;
29
30use crate::timer::{TimeEvent, TimeEventCallback, TimeEventHandlerV2};
31
32#[pyo3::pyclass(
33 module = "posei_trader.core.nautilus_pyo3.common",
34 name = "TimeEventHandler"
35)]
36#[derive(Clone)]
37#[allow(non_camel_case_types)]
44#[derive(Debug)]
45pub struct TimeEventHandler_Py {
46 pub event: TimeEvent,
48 pub callback: Arc<PyObject>,
50}
51
52impl From<TimeEventHandlerV2> for TimeEventHandler_Py {
53 fn from(value: TimeEventHandlerV2) -> Self {
58 Self {
59 event: value.event,
60 callback: match value.callback {
61 TimeEventCallback::Python(callback) => callback,
62 TimeEventCallback::Rust(_) => {
63 panic!("Python time event handler is not supported for Rust callback")
64 }
65 },
66 }
67 }
68}
69
70#[pymethods]
71impl TimeEvent {
72 #[new]
73 fn py_new(name: &str, event_id: UUID4, ts_event: u64, ts_init: u64) -> Self {
74 Self::new(Ustr::from(name), event_id, ts_event.into(), ts_init.into())
75 }
76
77 fn __setstate__(&mut self, state: &Bound<'_, PyAny>) -> PyResult<()> {
78 let py_tuple: &Bound<'_, PyTuple> = state.downcast::<PyTuple>()?;
79
80 let ts_event = py_tuple
81 .get_item(2)?
82 .downcast::<PyInt>()?
83 .extract::<u64>()?;
84 let ts_init: u64 = py_tuple
85 .get_item(3)?
86 .downcast::<PyInt>()?
87 .extract::<u64>()?;
88
89 self.name = Ustr::from(
90 py_tuple
91 .get_item(0)?
92 .downcast::<PyString>()?
93 .extract::<&str>()?,
94 );
95 self.event_id = UUID4::from_str(
96 py_tuple
97 .get_item(1)?
98 .downcast::<PyString>()?
99 .extract::<&str>()?,
100 )
101 .map_err(to_pyvalue_err)?;
102 self.ts_event = ts_event.into();
103 self.ts_init = ts_init.into();
104
105 Ok(())
106 }
107
108 fn __getstate__(&self, py: Python) -> PyResult<PyObject> {
109 (
110 self.name.to_string(),
111 self.event_id.to_string(),
112 self.ts_event.as_u64(),
113 self.ts_init.as_u64(),
114 )
115 .into_py_any(py)
116 }
117
118 fn __reduce__(&self, py: Python) -> PyResult<PyObject> {
119 let safe_constructor = py.get_type::<Self>().getattr("_safe_constructor")?;
120 let state = self.__getstate__(py)?;
121 (safe_constructor, PyTuple::empty(py), state).into_py_any(py)
122 }
123
124 #[staticmethod]
125 fn _safe_constructor() -> Self {
126 Self::new(
127 Ustr::from("NULL"),
128 UUID4::new(),
129 UnixNanos::default(),
130 UnixNanos::default(),
131 )
132 }
133
134 fn __richcmp__(&self, other: &Self, op: CompareOp, py: Python<'_>) -> Py<PyAny> {
135 match op {
136 CompareOp::Eq => self.eq(other).into_py_any_unwrap(py),
137 CompareOp::Ne => self.ne(other).into_py_any_unwrap(py),
138 _ => py.NotImplemented(),
139 }
140 }
141
142 fn __repr__(&self) -> String {
143 format!("{}('{}')", stringify!(TimeEvent), self)
144 }
145
146 fn __str__(&self) -> String {
147 self.to_string()
148 }
149
150 #[getter]
151 #[pyo3(name = "name")]
152 fn py_name(&self) -> String {
153 self.name.to_string()
154 }
155
156 #[getter]
157 #[pyo3(name = "event_id")]
158 const fn py_event_id(&self) -> UUID4 {
159 self.event_id
160 }
161
162 #[getter]
163 #[pyo3(name = "ts_event")]
164 const fn py_ts_event(&self) -> u64 {
165 self.ts_event.as_u64()
166 }
167
168 #[getter]
169 #[pyo3(name = "ts_init")]
170 const fn py_ts_init(&self) -> u64 {
171 self.ts_init.as_u64()
172 }
173}
174
#[cfg(test)]
mod tests {
    #[rustfmt::skip]
    #[cfg(feature = "clock_v2")]
    use std::collections::BinaryHeap;

    use std::num::NonZeroU64;
    #[rustfmt::skip]
    #[cfg(feature = "clock_v2")]
    use std::sync::Arc;

    #[rustfmt::skip]
    #[cfg(feature = "clock_v2")]
    use tokio::sync::Mutex;

    use nautilus_core::{
        UnixNanos, datetime::NANOSECONDS_IN_MILLISECOND, python::IntoPyObjectPoseiExt,
        time::get_atomic_clock_realtime,
    };
    use pyo3::prelude::*;
    use tokio::time::Duration;
    use ustr::Ustr;

    use crate::{
        testing::wait_until,
        timer::{LiveTimer, TimeEvent, TimeEventCallback},
    };

    /// Python-callable sink that accepts a `TimeEvent` and does nothing with it.
    #[pyfunction]
    const fn receive_event(_py: Python, _event: TimeEvent) -> PyResult<()> {
        Ok(())
    }

    /// Wraps `receive_event` as a Python function object and converts it into
    /// a `TimeEventCallback`. The interpreter must already be initialized.
    fn python_callback() -> TimeEventCallback {
        Python::with_gil(|py| {
            let py_fn = wrap_pyfunction!(receive_event, py).unwrap();
            TimeEventCallback::from(py_fn.into_py_any_unwrap(py))
        })
    }

    #[tokio::test]
    async fn test_live_timer_starts_and_stops() {
        pyo3::prepare_freethreaded_python();

        let callback = python_callback();

        // 100 ms interval starting now, with no stop time.
        let clock = get_atomic_clock_realtime();
        let start_time = clock.get_time_ns();
        let interval_ns = NonZeroU64::new(100 * NANOSECONDS_IN_MILLISECOND).unwrap();

        #[cfg(not(feature = "clock_v2"))]
        let mut timer = LiveTimer::new(
            Ustr::from("TEST_TIMER"),
            interval_ns,
            start_time,
            None,
            callback,
        );

        #[cfg(feature = "clock_v2")]
        let (_heap, mut timer) = {
            let heap = Arc::new(Mutex::new(BinaryHeap::new()));
            (
                heap.clone(),
                LiveTimer::new(
                    "TEST_TIMER".into(),
                    interval_ns,
                    start_time,
                    None,
                    callback,
                    heap,
                ),
            )
        };

        // Remember the next scheduled time before starting so progress can be checked.
        let initial_next_time_ns = timer.next_time_ns();
        timer.start();

        // Let the timer run for a while before cancelling it.
        tokio::time::sleep(Duration::from_millis(300)).await;

        timer.cancel();
        wait_until(|| timer.is_expired(), Duration::from_secs(2));
        assert!(timer.next_time_ns() > initial_next_time_ns);
    }

    #[tokio::test]
    async fn test_live_timer_with_stop_time() {
        pyo3::prepare_freethreaded_python();

        let callback = python_callback();

        // 100 ms interval starting now, stopping 500 ms after the start.
        let clock = get_atomic_clock_realtime();
        let start_time = clock.get_time_ns();
        let interval_ns = NonZeroU64::new(100 * NANOSECONDS_IN_MILLISECOND).unwrap();
        let stop_time = start_time + 500 * NANOSECONDS_IN_MILLISECOND;

        #[cfg(not(feature = "clock_v2"))]
        let mut timer = LiveTimer::new(
            Ustr::from("TEST_TIMER"),
            interval_ns,
            start_time,
            Some(stop_time),
            callback,
        );

        #[cfg(feature = "clock_v2")]
        let (_heap, mut timer) = {
            let heap = Arc::new(Mutex::new(BinaryHeap::new()));
            (
                heap.clone(),
                LiveTimer::new(
                    "TEST_TIMER".into(),
                    interval_ns,
                    start_time,
                    Some(stop_time),
                    callback,
                    heap,
                ),
            )
        };

        let initial_next_time_ns = timer.next_time_ns();
        timer.start();

        // Sleep past the stop time; the timer is never cancelled explicitly here.
        tokio::time::sleep(Duration::from_secs(1)).await;

        wait_until(|| timer.is_expired(), Duration::from_secs(2));
        assert!(timer.next_time_ns() > initial_next_time_ns);
    }

    #[tokio::test]
    async fn test_live_timer_with_zero_interval_and_immediate_stop_time() {
        pyo3::prepare_freethreaded_python();

        let callback = python_callback();

        // `NonZeroU64` cannot hold zero, so the interval used is 1 ns; the
        // start time is the default `UnixNanos` and the stop time is "now".
        let clock = get_atomic_clock_realtime();
        let start_time = UnixNanos::default();
        let interval_ns = NonZeroU64::new(1).unwrap();
        let stop_time = clock.get_time_ns();

        #[cfg(not(feature = "clock_v2"))]
        let mut timer = LiveTimer::new(
            Ustr::from("TEST_TIMER"),
            interval_ns,
            start_time,
            Some(stop_time),
            callback,
        );

        #[cfg(feature = "clock_v2")]
        let (_heap, mut timer) = {
            let heap = Arc::new(Mutex::new(BinaryHeap::new()));
            (
                heap.clone(),
                LiveTimer::new(
                    "TEST_TIMER".into(),
                    interval_ns,
                    start_time,
                    Some(stop_time),
                    callback,
                    heap,
                ),
            )
        };

        timer.start();

        wait_until(|| timer.is_expired(), Duration::from_secs(2));
    }
}