1use std::{str::FromStr, sync::Arc};
17
18use nautilus_core::{
19 UUID4, UnixNanos,
20 python::{IntoPyObjectPoseiExt, to_pyvalue_err},
21};
22use pyo3::{
23 IntoPyObjectExt,
24 basic::CompareOp,
25 prelude::*,
26 types::{PyInt, PyString, PyTuple},
27};
28use ustr::Ustr;
29
30use crate::timer::{TimeEvent, TimeEventCallback, TimeEventHandlerV2};
31
32#[pyo3::pyclass(
33 module = "posei_trader.core.nautilus_pyo3.common",
34 name = "TimeEventHandler"
35)]
36#[derive(Clone)]
37#[allow(non_camel_case_types)]
44#[derive(Debug)]
45pub struct TimeEventHandler_Py {
46 pub event: TimeEvent,
48 pub callback: Arc<PyObject>,
50}
51
52impl From<TimeEventHandlerV2> for TimeEventHandler_Py {
53 fn from(value: TimeEventHandlerV2) -> Self {
58 Self {
59 event: value.event,
60 callback: match value.callback {
61 #[cfg(feature = "python")]
62 TimeEventCallback::Python(callback) => callback,
63 TimeEventCallback::Rust(_) => {
64 panic!("Python time event handler is not supported for Rust callback")
65 }
66 },
67 }
68 }
69}
70
71#[pymethods]
72impl TimeEvent {
73 #[new]
74 fn py_new(name: &str, event_id: UUID4, ts_event: u64, ts_init: u64) -> Self {
75 Self::new(Ustr::from(name), event_id, ts_event.into(), ts_init.into())
76 }
77
78 fn __setstate__(&mut self, state: &Bound<'_, PyAny>) -> PyResult<()> {
79 let py_tuple: &Bound<'_, PyTuple> = state.downcast::<PyTuple>()?;
80
81 let ts_event = py_tuple
82 .get_item(2)?
83 .downcast::<PyInt>()?
84 .extract::<u64>()?;
85 let ts_init: u64 = py_tuple
86 .get_item(3)?
87 .downcast::<PyInt>()?
88 .extract::<u64>()?;
89
90 self.name = Ustr::from(
91 py_tuple
92 .get_item(0)?
93 .downcast::<PyString>()?
94 .extract::<&str>()?,
95 );
96 self.event_id = UUID4::from_str(
97 py_tuple
98 .get_item(1)?
99 .downcast::<PyString>()?
100 .extract::<&str>()?,
101 )
102 .map_err(to_pyvalue_err)?;
103 self.ts_event = ts_event.into();
104 self.ts_init = ts_init.into();
105
106 Ok(())
107 }
108
109 fn __getstate__(&self, py: Python) -> PyResult<PyObject> {
110 (
111 self.name.to_string(),
112 self.event_id.to_string(),
113 self.ts_event.as_u64(),
114 self.ts_init.as_u64(),
115 )
116 .into_py_any(py)
117 }
118
119 fn __reduce__(&self, py: Python) -> PyResult<PyObject> {
120 let safe_constructor = py.get_type::<Self>().getattr("_safe_constructor")?;
121 let state = self.__getstate__(py)?;
122 (safe_constructor, PyTuple::empty(py), state).into_py_any(py)
123 }
124
125 #[staticmethod]
126 fn _safe_constructor() -> Self {
127 Self::new(
128 Ustr::from("NULL"),
129 UUID4::new(),
130 UnixNanos::default(),
131 UnixNanos::default(),
132 )
133 }
134
135 fn __richcmp__(&self, other: &Self, op: CompareOp, py: Python<'_>) -> Py<PyAny> {
136 match op {
137 CompareOp::Eq => self.eq(other).into_py_any_unwrap(py),
138 CompareOp::Ne => self.ne(other).into_py_any_unwrap(py),
139 _ => py.NotImplemented(),
140 }
141 }
142
143 fn __repr__(&self) -> String {
144 format!("{}('{}')", stringify!(TimeEvent), self)
145 }
146
147 fn __str__(&self) -> String {
148 self.to_string()
149 }
150
151 #[getter]
152 #[pyo3(name = "name")]
153 fn py_name(&self) -> String {
154 self.name.to_string()
155 }
156
157 #[getter]
158 #[pyo3(name = "event_id")]
159 const fn py_event_id(&self) -> UUID4 {
160 self.event_id
161 }
162
163 #[getter]
164 #[pyo3(name = "ts_event")]
165 const fn py_ts_event(&self) -> u64 {
166 self.ts_event.as_u64()
167 }
168
169 #[getter]
170 #[pyo3(name = "ts_init")]
171 const fn py_ts_init(&self) -> u64 {
172 self.ts_init.as_u64()
173 }
174}
175
176#[cfg(test)]
177mod tests {
178 use std::{num::NonZeroU64, sync::Arc};
179
180 use nautilus_core::{
181 UnixNanos, datetime::NANOSECONDS_IN_MILLISECOND, python::IntoPyObjectPoseiExt,
182 time::get_atomic_clock_realtime,
183 };
184 use pyo3::prelude::*;
185 use tokio::time::Duration;
186
187 use crate::{
188 runner::{TimeEventSender, set_time_event_sender},
189 testing::wait_until,
190 timer::{LiveTimer, TimeEvent, TimeEventCallback},
191 };
192
193 #[pyfunction]
194 const fn receive_event(_py: Python, _event: TimeEvent) -> PyResult<()> {
195 Ok(())
197 }
198
199 #[derive(Debug)]
200 struct TestTimeEventSender;
201
202 impl TimeEventSender for TestTimeEventSender {
203 fn send(&self, _handler: crate::timer::TimeEventHandlerV2) {
204 }
206 }
207
208 #[tokio::test]
209 async fn test_live_timer_starts_and_stops() {
210 pyo3::prepare_freethreaded_python();
211
212 set_time_event_sender(Arc::new(TestTimeEventSender));
213
214 let callback = Python::with_gil(|py| {
215 let callable = wrap_pyfunction!(receive_event, py).unwrap();
216 let callable = callable.into_py_any_unwrap(py);
217 TimeEventCallback::from(callable)
218 });
219
220 let clock = get_atomic_clock_realtime();
222 let start_time = clock.get_time_ns();
223 let interval_ns = NonZeroU64::new(100 * NANOSECONDS_IN_MILLISECOND).unwrap();
224
225 let test_sender = Arc::new(TestTimeEventSender);
226 let mut timer = LiveTimer::new(
227 "TEST_TIMER".into(),
228 interval_ns,
229 start_time,
230 None,
231 callback,
232 false,
233 Some(test_sender),
234 );
235
236 let next_time_ns = timer.next_time_ns();
237 timer.start();
238
239 tokio::time::sleep(Duration::from_millis(300)).await;
241
242 timer.cancel();
243 wait_until(|| timer.is_expired(), Duration::from_secs(2));
244 assert!(timer.next_time_ns() > next_time_ns);
245 }
246
247 #[tokio::test]
248 async fn test_live_timer_with_stop_time() {
249 pyo3::prepare_freethreaded_python();
250
251 set_time_event_sender(Arc::new(TestTimeEventSender));
252
253 let callback = Python::with_gil(|py| {
254 let callable = wrap_pyfunction!(receive_event, py).unwrap();
255 let callable = callable.into_py_any_unwrap(py);
256 TimeEventCallback::from(callable)
257 });
258
259 let clock = get_atomic_clock_realtime();
261 let start_time = clock.get_time_ns();
262 let interval_ns = NonZeroU64::new(100 * NANOSECONDS_IN_MILLISECOND).unwrap();
263 let stop_time = start_time + 500 * NANOSECONDS_IN_MILLISECOND;
264
265 let test_sender = Arc::new(TestTimeEventSender);
266 let mut timer = LiveTimer::new(
267 "TEST_TIMER".into(),
268 interval_ns,
269 start_time,
270 Some(stop_time),
271 callback,
272 false,
273 Some(test_sender),
274 );
275
276 let next_time_ns = timer.next_time_ns();
277 timer.start();
278
279 tokio::time::sleep(Duration::from_secs(1)).await;
281
282 wait_until(|| timer.is_expired(), Duration::from_secs(2));
283 assert!(timer.next_time_ns() > next_time_ns);
284 }
285
286 #[tokio::test]
287 async fn test_live_timer_with_zero_interval_and_immediate_stop_time() {
288 pyo3::prepare_freethreaded_python();
289
290 set_time_event_sender(Arc::new(TestTimeEventSender));
291
292 let callback = Python::with_gil(|py| {
293 let callable = wrap_pyfunction!(receive_event, py).unwrap();
294 let callable = callable.into_py_any_unwrap(py);
295 TimeEventCallback::from(callable)
296 });
297
298 let clock = get_atomic_clock_realtime();
300 let start_time = UnixNanos::default();
301 let interval_ns = NonZeroU64::new(1).unwrap();
302 let stop_time = clock.get_time_ns();
303
304 let test_sender = Arc::new(TestTimeEventSender);
305 let mut timer = LiveTimer::new(
306 "TEST_TIMER".into(),
307 interval_ns,
308 start_time,
309 Some(stop_time),
310 callback,
311 false,
312 Some(test_sender),
313 );
314
315 timer.start();
316
317 wait_until(|| timer.is_expired(), Duration::from_secs(2));
318 }
319}