nautilus_common/python/
timer.rs

1// -------------------------------------------------------------------------------------------------
2//  Copyright (C) 2015-2025 Posei Systems Pty Ltd. All rights reserved.
3//  https://poseitrader.io
4//
5//  Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
6//  You may not use this file except in compliance with the License.
7//  You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
8//
9//  Unless required by applicable law or agreed to in writing, software
10//  distributed under the License is distributed on an "AS IS" BASIS,
11//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12//  See the License for the specific language governing permissions and
13//  limitations under the License.
14// -------------------------------------------------------------------------------------------------
15
16use std::{str::FromStr, sync::Arc};
17
18use nautilus_core::{
19    UUID4, UnixNanos,
20    python::{IntoPyObjectPoseiExt, to_pyvalue_err},
21};
22use pyo3::{
23    IntoPyObjectExt,
24    basic::CompareOp,
25    prelude::*,
26    types::{PyInt, PyString, PyTuple},
27};
28use ustr::Ustr;
29
30use crate::timer::{TimeEvent, TimeEventCallback, TimeEventHandlerV2};
31
32#[pyo3::pyclass(
33    module = "posei_trader.core.nautilus_pyo3.common",
34    name = "TimeEventHandler"
35)]
36#[derive(Clone)]
37/// Temporary time event handler for Python inter-operatbility
38///
39/// TODO: Remove once control flow moves into Rust
40///
41/// `TimeEventHandler` associates a `TimeEvent` with a callback function that is triggered
42/// when the event's timestamp is reached.
43#[allow(non_camel_case_types)]
44#[derive(Debug)]
45pub struct TimeEventHandler_Py {
46    /// The time event.
47    pub event: TimeEvent,
48    /// The callable python object.
49    pub callback: Arc<PyObject>,
50}
51
52impl From<TimeEventHandlerV2> for TimeEventHandler_Py {
53    /// # Panics
54    ///
55    /// Panics if the provided `TimeEventHandlerV2` contains a Rust callback,
56    /// since only Python callbacks are supported by this handler.
57    fn from(value: TimeEventHandlerV2) -> Self {
58        Self {
59            event: value.event,
60            callback: match value.callback {
61                #[cfg(feature = "python")]
62                TimeEventCallback::Python(callback) => callback,
63                TimeEventCallback::Rust(_) => {
64                    panic!("Python time event handler is not supported for Rust callback")
65                }
66            },
67        }
68    }
69}
70
71#[pymethods]
72impl TimeEvent {
73    #[new]
74    fn py_new(name: &str, event_id: UUID4, ts_event: u64, ts_init: u64) -> Self {
75        Self::new(Ustr::from(name), event_id, ts_event.into(), ts_init.into())
76    }
77
78    fn __setstate__(&mut self, state: &Bound<'_, PyAny>) -> PyResult<()> {
79        let py_tuple: &Bound<'_, PyTuple> = state.downcast::<PyTuple>()?;
80
81        let ts_event = py_tuple
82            .get_item(2)?
83            .downcast::<PyInt>()?
84            .extract::<u64>()?;
85        let ts_init: u64 = py_tuple
86            .get_item(3)?
87            .downcast::<PyInt>()?
88            .extract::<u64>()?;
89
90        self.name = Ustr::from(
91            py_tuple
92                .get_item(0)?
93                .downcast::<PyString>()?
94                .extract::<&str>()?,
95        );
96        self.event_id = UUID4::from_str(
97            py_tuple
98                .get_item(1)?
99                .downcast::<PyString>()?
100                .extract::<&str>()?,
101        )
102        .map_err(to_pyvalue_err)?;
103        self.ts_event = ts_event.into();
104        self.ts_init = ts_init.into();
105
106        Ok(())
107    }
108
109    fn __getstate__(&self, py: Python) -> PyResult<PyObject> {
110        (
111            self.name.to_string(),
112            self.event_id.to_string(),
113            self.ts_event.as_u64(),
114            self.ts_init.as_u64(),
115        )
116            .into_py_any(py)
117    }
118
119    fn __reduce__(&self, py: Python) -> PyResult<PyObject> {
120        let safe_constructor = py.get_type::<Self>().getattr("_safe_constructor")?;
121        let state = self.__getstate__(py)?;
122        (safe_constructor, PyTuple::empty(py), state).into_py_any(py)
123    }
124
125    #[staticmethod]
126    fn _safe_constructor() -> Self {
127        Self::new(
128            Ustr::from("NULL"),
129            UUID4::new(),
130            UnixNanos::default(),
131            UnixNanos::default(),
132        )
133    }
134
135    fn __richcmp__(&self, other: &Self, op: CompareOp, py: Python<'_>) -> Py<PyAny> {
136        match op {
137            CompareOp::Eq => self.eq(other).into_py_any_unwrap(py),
138            CompareOp::Ne => self.ne(other).into_py_any_unwrap(py),
139            _ => py.NotImplemented(),
140        }
141    }
142
143    fn __repr__(&self) -> String {
144        format!("{}('{}')", stringify!(TimeEvent), self)
145    }
146
147    fn __str__(&self) -> String {
148        self.to_string()
149    }
150
151    #[getter]
152    #[pyo3(name = "name")]
153    fn py_name(&self) -> String {
154        self.name.to_string()
155    }
156
157    #[getter]
158    #[pyo3(name = "event_id")]
159    const fn py_event_id(&self) -> UUID4 {
160        self.event_id
161    }
162
163    #[getter]
164    #[pyo3(name = "ts_event")]
165    const fn py_ts_event(&self) -> u64 {
166        self.ts_event.as_u64()
167    }
168
169    #[getter]
170    #[pyo3(name = "ts_init")]
171    const fn py_ts_init(&self) -> u64 {
172        self.ts_init.as_u64()
173    }
174}
175
#[cfg(test)]
mod tests {
    use std::{num::NonZeroU64, sync::Arc};

    use nautilus_core::{
        UnixNanos, datetime::NANOSECONDS_IN_MILLISECOND, python::IntoPyObjectPoseiExt,
        time::get_atomic_clock_realtime,
    };
    use pyo3::prelude::*;
    use tokio::time::Duration;

    use crate::{
        runner::{TimeEventSender, set_time_event_sender},
        testing::wait_until,
        timer::{LiveTimer, TimeEvent, TimeEventCallback, TimeEventHandlerV2},
    };

    /// Python-callable stand-in for a timer callback; accepts an event and does nothing.
    #[pyfunction]
    const fn receive_event(_py: Python, _event: TimeEvent) -> PyResult<()> {
        // TODO: Assert the length of a handler vec
        Ok(())
    }

    /// Sender that discards every handler it is given.
    #[derive(Debug)]
    struct TestTimeEventSender;

    impl TimeEventSender for TestTimeEventSender {
        fn send(&self, _handler: TimeEventHandlerV2) {
            // Test implementation - just ignore the events
        }
    }

    /// Wraps `receive_event` as a Python callable and converts it into a `TimeEventCallback`.
    fn python_callback() -> TimeEventCallback {
        Python::with_gil(|py| {
            let function = wrap_pyfunction!(receive_event, py).unwrap();
            TimeEventCallback::from(function.into_py_any_unwrap(py))
        })
    }

    #[tokio::test]
    async fn test_live_timer_starts_and_stops() {
        pyo3::prepare_freethreaded_python();

        set_time_event_sender(Arc::new(TestTimeEventSender));

        let callback = python_callback();

        // 100 ms timer starting now, with no stop time
        let start_time = get_atomic_clock_realtime().get_time_ns();
        let interval_ns = NonZeroU64::new(100 * NANOSECONDS_IN_MILLISECOND).unwrap();

        let sender = Arc::new(TestTimeEventSender);
        let mut timer = LiveTimer::new(
            "TEST_TIMER".into(),
            interval_ns,
            start_time,
            None,
            callback,
            false,
            Some(sender),
        );

        let initial_next_time_ns = timer.next_time_ns();
        timer.start();

        // Give the timer time to run before cancelling it
        tokio::time::sleep(Duration::from_millis(300)).await;

        timer.cancel();
        wait_until(|| timer.is_expired(), Duration::from_secs(2));
        assert!(timer.next_time_ns() > initial_next_time_ns);
    }

    #[tokio::test]
    async fn test_live_timer_with_stop_time() {
        pyo3::prepare_freethreaded_python();

        set_time_event_sender(Arc::new(TestTimeEventSender));

        let callback = python_callback();

        // 100 ms timer starting now, stopping 500 ms later
        let start_time = get_atomic_clock_realtime().get_time_ns();
        let interval_ns = NonZeroU64::new(100 * NANOSECONDS_IN_MILLISECOND).unwrap();
        let stop_time = start_time + 500 * NANOSECONDS_IN_MILLISECOND;

        let sender = Arc::new(TestTimeEventSender);
        let mut timer = LiveTimer::new(
            "TEST_TIMER".into(),
            interval_ns,
            start_time,
            Some(stop_time),
            callback,
            false,
            Some(sender),
        );

        let initial_next_time_ns = timer.next_time_ns();
        timer.start();

        // Sleep past the stop time
        tokio::time::sleep(Duration::from_secs(1)).await;

        wait_until(|| timer.is_expired(), Duration::from_secs(2));
        assert!(timer.next_time_ns() > initial_next_time_ns);
    }

    #[tokio::test]
    async fn test_live_timer_with_zero_interval_and_immediate_stop_time() {
        pyo3::prepare_freethreaded_python();

        set_time_event_sender(Arc::new(TestTimeEventSender));

        let callback = python_callback();

        // 1 ns interval, default start time, stop time set to the current clock reading
        let start_time = UnixNanos::default();
        let interval_ns = NonZeroU64::new(1).unwrap();
        let stop_time = get_atomic_clock_realtime().get_time_ns();

        let sender = Arc::new(TestTimeEventSender);
        let mut timer = LiveTimer::new(
            "TEST_TIMER".into(),
            interval_ns,
            start_time,
            Some(stop_time),
            callback,
            false,
            Some(sender),
        );

        timer.start();

        wait_until(|| timer.is_expired(), Duration::from_secs(2));
    }
}