nautilus_common/python/
timer.rs

1// -------------------------------------------------------------------------------------------------
2//  Copyright (C) 2015-2025 Posei Systems Pty Ltd. All rights reserved.
3//  https://poseitrader.io
4//
5//  Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
6//  You may not use this file except in compliance with the License.
7//  You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
8//
9//  Unless required by applicable law or agreed to in writing, software
10//  distributed under the License is distributed on an "AS IS" BASIS,
11//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12//  See the License for the specific language governing permissions and
13//  limitations under the License.
14// -------------------------------------------------------------------------------------------------
15
16use std::{str::FromStr, sync::Arc};
17
18use nautilus_core::{
19    UUID4, UnixNanos,
20    python::{IntoPyObjectPoseiExt, to_pyvalue_err},
21};
22use pyo3::{
23    IntoPyObjectExt,
24    basic::CompareOp,
25    prelude::*,
26    types::{PyInt, PyString, PyTuple},
27};
28use ustr::Ustr;
29
30use crate::timer::{TimeEvent, TimeEventCallback, TimeEventHandlerV2};
31
32#[pyo3::pyclass(
33    module = "posei_trader.core.nautilus_pyo3.common",
34    name = "TimeEventHandler"
35)]
36#[derive(Clone)]
37/// Temporary time event handler for Python inter-operatbility
38///
39/// TODO: Remove once control flow moves into Rust
40///
41/// `TimeEventHandler` associates a `TimeEvent` with a callback function that is triggered
42/// when the event's timestamp is reached.
43#[allow(non_camel_case_types)]
44#[derive(Debug)]
45pub struct TimeEventHandler_Py {
46    /// The time event.
47    pub event: TimeEvent,
48    /// The callable python object.
49    pub callback: Arc<PyObject>,
50}
51
52impl From<TimeEventHandlerV2> for TimeEventHandler_Py {
53    /// # Panics
54    ///
55    /// Panics if the provided `TimeEventHandlerV2` contains a Rust callback,
56    /// since only Python callbacks are supported by this handler.
57    fn from(value: TimeEventHandlerV2) -> Self {
58        Self {
59            event: value.event,
60            callback: match value.callback {
61                TimeEventCallback::Python(callback) => callback,
62                TimeEventCallback::Rust(_) => {
63                    panic!("Python time event handler is not supported for Rust callback")
64                }
65            },
66        }
67    }
68}
69
70#[pymethods]
71impl TimeEvent {
72    #[new]
73    fn py_new(name: &str, event_id: UUID4, ts_event: u64, ts_init: u64) -> Self {
74        Self::new(Ustr::from(name), event_id, ts_event.into(), ts_init.into())
75    }
76
77    fn __setstate__(&mut self, state: &Bound<'_, PyAny>) -> PyResult<()> {
78        let py_tuple: &Bound<'_, PyTuple> = state.downcast::<PyTuple>()?;
79
80        let ts_event = py_tuple
81            .get_item(2)?
82            .downcast::<PyInt>()?
83            .extract::<u64>()?;
84        let ts_init: u64 = py_tuple
85            .get_item(3)?
86            .downcast::<PyInt>()?
87            .extract::<u64>()?;
88
89        self.name = Ustr::from(
90            py_tuple
91                .get_item(0)?
92                .downcast::<PyString>()?
93                .extract::<&str>()?,
94        );
95        self.event_id = UUID4::from_str(
96            py_tuple
97                .get_item(1)?
98                .downcast::<PyString>()?
99                .extract::<&str>()?,
100        )
101        .map_err(to_pyvalue_err)?;
102        self.ts_event = ts_event.into();
103        self.ts_init = ts_init.into();
104
105        Ok(())
106    }
107
108    fn __getstate__(&self, py: Python) -> PyResult<PyObject> {
109        (
110            self.name.to_string(),
111            self.event_id.to_string(),
112            self.ts_event.as_u64(),
113            self.ts_init.as_u64(),
114        )
115            .into_py_any(py)
116    }
117
118    fn __reduce__(&self, py: Python) -> PyResult<PyObject> {
119        let safe_constructor = py.get_type::<Self>().getattr("_safe_constructor")?;
120        let state = self.__getstate__(py)?;
121        (safe_constructor, PyTuple::empty(py), state).into_py_any(py)
122    }
123
124    #[staticmethod]
125    fn _safe_constructor() -> Self {
126        Self::new(
127            Ustr::from("NULL"),
128            UUID4::new(),
129            UnixNanos::default(),
130            UnixNanos::default(),
131        )
132    }
133
134    fn __richcmp__(&self, other: &Self, op: CompareOp, py: Python<'_>) -> Py<PyAny> {
135        match op {
136            CompareOp::Eq => self.eq(other).into_py_any_unwrap(py),
137            CompareOp::Ne => self.ne(other).into_py_any_unwrap(py),
138            _ => py.NotImplemented(),
139        }
140    }
141
142    fn __repr__(&self) -> String {
143        format!("{}('{}')", stringify!(TimeEvent), self)
144    }
145
146    fn __str__(&self) -> String {
147        self.to_string()
148    }
149
150    #[getter]
151    #[pyo3(name = "name")]
152    fn py_name(&self) -> String {
153        self.name.to_string()
154    }
155
156    #[getter]
157    #[pyo3(name = "event_id")]
158    const fn py_event_id(&self) -> UUID4 {
159        self.event_id
160    }
161
162    #[getter]
163    #[pyo3(name = "ts_event")]
164    const fn py_ts_event(&self) -> u64 {
165        self.ts_event.as_u64()
166    }
167
168    #[getter]
169    #[pyo3(name = "ts_init")]
170    const fn py_ts_init(&self) -> u64 {
171        self.ts_init.as_u64()
172    }
173}
174
#[cfg(test)]
mod tests {
    #[rustfmt::skip]
    #[cfg(feature = "clock_v2")]
    use std::collections::BinaryHeap;

    use std::num::NonZeroU64;
    #[rustfmt::skip]
    #[cfg(feature = "clock_v2")]
    use std::sync::Arc;

    #[rustfmt::skip]
    #[cfg(feature = "clock_v2")]
    use tokio::sync::Mutex;

    use nautilus_core::{
        UnixNanos, datetime::NANOSECONDS_IN_MILLISECOND, python::IntoPyObjectPoseiExt,
        time::get_atomic_clock_realtime,
    };
    use pyo3::prelude::*;
    use tokio::time::Duration;
    use ustr::Ustr; // Used by the non-`clock_v2` timer construction

    use crate::{
        testing::wait_until,
        timer::{LiveTimer, TimeEvent, TimeEventCallback},
    };

    /// No-op Python-callable sink for timer events.
    #[pyfunction]
    const fn receive_event(_py: Python, _event: TimeEvent) -> PyResult<()> {
        // TODO: Assert the length of a handler vec
        Ok(())
    }

    /// Initializes the embedded Python interpreter and wraps `receive_event` as a
    /// `TimeEventCallback`.
    fn noop_python_callback() -> TimeEventCallback {
        pyo3::prepare_freethreaded_python();

        Python::with_gil(|py| {
            let py_fn = wrap_pyfunction!(receive_event, py).unwrap();
            TimeEventCallback::from(py_fn.into_py_any_unwrap(py))
        })
    }

    #[tokio::test]
    async fn test_live_timer_starts_and_stops() {
        let callback = noop_python_callback();

        // Open-ended timer: 100 ms interval starting now, no stop time
        let start_time = get_atomic_clock_realtime().get_time_ns();
        let interval_ns = NonZeroU64::new(100 * NANOSECONDS_IN_MILLISECOND).unwrap();

        #[cfg(not(feature = "clock_v2"))]
        let mut timer = LiveTimer::new(
            Ustr::from("TEST_TIMER"),
            interval_ns,
            start_time,
            None,
            callback,
        );

        // The heap clone is bound to `_heap` so it stays alive for the whole test
        #[cfg(feature = "clock_v2")]
        let (_heap, mut timer) = {
            let heap = Arc::new(Mutex::new(BinaryHeap::new()));
            (
                heap.clone(),
                LiveTimer::new(
                    "TEST_TIMER".into(),
                    interval_ns,
                    start_time,
                    None,
                    callback,
                    heap,
                ),
            )
        };

        let initial_next_ns = timer.next_time_ns();
        timer.start();

        // Give the timer time to run
        tokio::time::sleep(Duration::from_millis(300)).await;

        timer.cancel();
        wait_until(|| timer.is_expired(), Duration::from_secs(2));

        // The next scheduled time must have moved forward while the timer ran
        assert!(timer.next_time_ns() > initial_next_ns);
    }

    #[tokio::test]
    async fn test_live_timer_with_stop_time() {
        let callback = noop_python_callback();

        // Bounded timer: 100 ms interval, stopping 500 ms after the start time
        let start_time = get_atomic_clock_realtime().get_time_ns();
        let stop_time = start_time + 500 * NANOSECONDS_IN_MILLISECOND;
        let interval_ns = NonZeroU64::new(100 * NANOSECONDS_IN_MILLISECOND).unwrap();

        #[cfg(not(feature = "clock_v2"))]
        let mut timer = LiveTimer::new(
            Ustr::from("TEST_TIMER"),
            interval_ns,
            start_time,
            Some(stop_time),
            callback,
        );

        // The heap clone is bound to `_heap` so it stays alive for the whole test
        #[cfg(feature = "clock_v2")]
        let (_heap, mut timer) = {
            let heap = Arc::new(Mutex::new(BinaryHeap::new()));
            (
                heap.clone(),
                LiveTimer::new(
                    "TEST_TIMER".into(),
                    interval_ns,
                    start_time,
                    Some(stop_time),
                    callback,
                    heap,
                ),
            )
        };

        let initial_next_ns = timer.next_time_ns();
        timer.start();

        // Sleep past the stop time so the timer can expire without being cancelled
        tokio::time::sleep(Duration::from_secs(1)).await;

        wait_until(|| timer.is_expired(), Duration::from_secs(2));
        assert!(timer.next_time_ns() > initial_next_ns);
    }

    #[tokio::test]
    async fn test_live_timer_with_zero_interval_and_immediate_stop_time() {
        let callback = noop_python_callback();

        // Start at the default `UnixNanos` value with a stop time of "now".
        // `NonZeroU64` cannot hold zero, so 1 ns is the smallest interval available.
        let start_time = UnixNanos::default();
        let interval_ns = NonZeroU64::new(1).unwrap();
        let stop_time = get_atomic_clock_realtime().get_time_ns();

        #[cfg(not(feature = "clock_v2"))]
        let mut timer = LiveTimer::new(
            Ustr::from("TEST_TIMER"),
            interval_ns,
            start_time,
            Some(stop_time),
            callback,
        );

        // The heap clone is bound to `_heap` so it stays alive for the whole test
        #[cfg(feature = "clock_v2")]
        let (_heap, mut timer) = {
            let heap = Arc::new(Mutex::new(BinaryHeap::new()));
            (
                heap.clone(),
                LiveTimer::new(
                    "TEST_TIMER".into(),
                    interval_ns,
                    start_time,
                    Some(stop_time),
                    callback,
                    heap,
                ),
            )
        };

        timer.start();

        wait_until(|| timer.is_expired(), Duration::from_secs(2));
    }
}