nautilus_common/python/
actor.rs

1// -------------------------------------------------------------------------------------------------
2//  Copyright (C) 2015-2025 Posei Systems Pty Ltd. All rights reserved.
3//  https://poseitrader.io
4//
5//  Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
6//  You may not use this file except in compliance with the License.
7//  You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
8//
9//  Unless required by applicable law or agreed to in writing, software
10//  distributed under the License is distributed on an "AS IS" BASIS,
11//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12//  See the License for the specific language governing permissions and
13//  limitations under the License.
14// -------------------------------------------------------------------------------------------------
15
16// Under development
17#![allow(dead_code)]
18#![allow(unused_variables)]
19
20use std::{cell::RefCell, num::NonZeroUsize, rc::Rc};
21
22use indexmap::IndexMap;
23use nautilus_core::{
24    nanos::UnixNanos,
25    python::{to_pyruntime_err, to_pyvalue_err},
26};
27use nautilus_model::{
28    data::{BarType, DataType},
29    enums::BookType,
30    identifiers::{ClientId, InstrumentId, TraderId, Venue},
31};
32use pyo3::{
33    exceptions::{PyRuntimeError, PyValueError},
34    prelude::*,
35};
36use ustr::Ustr;
37
38use crate::{
39    actor::{
40        Actor,
41        data_actor::{DataActor, DataActorConfig, DataActorCore},
42    },
43    cache::Cache,
44    clock::Clock,
45    enums::ComponentState,
46};
47
48/// Inner actor that implements `DataActor` and can be used as the generic type parameter.
49///
50/// Holds the `DataActorCore` and implements the `DataActor` trait, allowing it to be used
51/// with the generic methods on `DataActorCore`.
52#[derive(Debug)]
53pub struct PyDataActorInner {
54    core: DataActorCore,
55}
56
57impl PyDataActorInner {
58    pub fn new(
59        config: DataActorConfig,
60        cache: Rc<RefCell<Cache>>,
61        clock: Rc<RefCell<dyn Clock>>,
62    ) -> Self {
63        Self {
64            core: DataActorCore::new(config, cache, clock),
65        }
66    }
67
68    pub fn core(&self) -> &DataActorCore {
69        &self.core
70    }
71
72    pub fn core_mut(&mut self) -> &mut DataActorCore {
73        &mut self.core
74    }
75}
76
77impl Actor for PyDataActorInner {
78    fn id(&self) -> ustr::Ustr {
79        self.core.actor_id.inner()
80    }
81
82    fn handle(&mut self, msg: &dyn std::any::Any) {
83        self.core.handle(msg)
84    }
85
86    fn as_any(&self) -> &dyn std::any::Any {
87        self
88    }
89}
90
91impl DataActor for PyDataActorInner {
92    fn state(&self) -> ComponentState {
93        self.core.state()
94    }
95}
96
97/// Provides a generic `DataActor`.
98#[allow(non_camel_case_types)]
99#[pyo3::pyclass(
100    module = "posei_trader.core.nautilus_pyo3.common",
101    name = "DataActor",
102    unsendable
103)]
104#[derive(Debug)]
105pub struct PyDataActor {
106    inner: Option<PyDataActorInner>,
107    config: DataActorConfig,
108}
109
110impl PyDataActor {
111    /// Gets a reference to the inner actor, returning an error if not registered.
112    fn inner(&self) -> PyResult<&PyDataActorInner> {
113        self.inner.as_ref().ok_or_else(|| {
114            PyErr::new::<PyRuntimeError, _>("DataActor has not been registered with a system")
115        })
116    }
117
118    /// Gets a mutable reference to the inner actor, returning an error if not registered.
119    fn inner_mut(&mut self) -> PyResult<&mut PyDataActorInner> {
120        self.inner.as_mut().ok_or_else(|| {
121            PyErr::new::<PyRuntimeError, _>("DataActor has not been registered with a system")
122        })
123    }
124
125    /// Gets a reference to the core, returning an error if not registered.
126    fn core(&self) -> PyResult<&DataActorCore> {
127        Ok(self.inner()?.core())
128    }
129
130    /// Gets a mutable reference to the core, returning an error if not registered.
131    fn core_mut(&mut self) -> PyResult<&mut DataActorCore> {
132        Ok(self.inner_mut()?.core_mut())
133    }
134
135    /// TODO: WIP
136    /// This method should be called to properly initialize the actor
137    /// with cache, clock and other components.
138    ///
139    /// # Errors
140    ///
141    /// Returns an error if already registered.
142    pub fn register(
143        &mut self,
144        trader_id: TraderId,
145        cache: Rc<RefCell<Cache>>,
146        clock: Rc<RefCell<dyn Clock>>,
147    ) -> PyResult<()> {
148        if self.inner.is_some() {
149            return Err(PyErr::new::<PyRuntimeError, _>(
150                "DataActor has already been registered",
151            ));
152        }
153
154        // Create the inner actor with the components
155        let mut inner = PyDataActorInner::new(self.config.clone(), cache, clock);
156        inner.core_mut().set_trader_id(trader_id);
157        self.inner = Some(inner);
158
159        Ok(())
160    }
161}
162
163#[pymethods]
164impl PyDataActor {
165    #[new]
166    #[pyo3(signature = (_config=None))]
167    fn py_new(_config: Option<PyObject>) -> PyResult<Self> {
168        // TODO: Create with default config but no inner actor until registered
169        let config = DataActorConfig::default();
170
171        Ok(Self {
172            inner: None,
173            config,
174        })
175    }
176
177    #[getter]
178    fn actor_id(&self) -> PyResult<String> {
179        Ok(self.core()?.actor_id.to_string())
180    }
181
182    #[getter]
183    fn state(&self) -> PyResult<ComponentState> {
184        Ok(self.core()?.state())
185    }
186
187    #[getter]
188    fn trader_id(&self) -> PyResult<Option<String>> {
189        Ok(self.core()?.trader_id().map(|id| id.to_string()))
190    }
191
192    fn is_ready(&self) -> PyResult<bool> {
193        Ok(self.core()?.is_ready())
194    }
195
196    fn is_running(&self) -> PyResult<bool> {
197        Ok(self.core()?.is_running())
198    }
199
200    fn is_stopped(&self) -> PyResult<bool> {
201        Ok(self.core()?.is_stopped())
202    }
203
204    fn is_disposed(&self) -> PyResult<bool> {
205        Ok(self.core()?.is_disposed())
206    }
207
208    fn is_degraded(&self) -> PyResult<bool> {
209        Ok(self.core()?.is_degraded())
210    }
211
212    fn is_faulting(&self) -> PyResult<bool> {
213        Ok(self.core()?.is_faulted())
214    }
215
216    #[pyo3(name = "initialize")]
217    fn py_initialize(&mut self) -> PyResult<()> {
218        self.core_mut()?.initialize().map_err(to_pyruntime_err)
219    }
220
221    #[pyo3(name = "start")]
222    fn py_start(&mut self) -> PyResult<()> {
223        self.core_mut()?.start().map_err(to_pyruntime_err)
224    }
225
226    #[pyo3(name = "stop")]
227    fn py_stop(&mut self) -> PyResult<()> {
228        self.core_mut()?.stop().map_err(to_pyruntime_err)
229    }
230
231    #[pyo3(name = "resume")]
232    fn py_resume(&mut self) -> PyResult<()> {
233        self.core_mut()?.resume().map_err(to_pyruntime_err)
234    }
235
236    #[pyo3(name = "reset")]
237    fn py_reset(&mut self) -> PyResult<()> {
238        self.core_mut()?.reset().map_err(to_pyruntime_err)
239    }
240
241    #[pyo3(name = "dispose")]
242    fn py_dispose(&mut self) -> PyResult<()> {
243        self.core_mut()?.dispose().map_err(to_pyruntime_err)
244    }
245
246    #[pyo3(name = "degrade")]
247    fn py_degrade(&mut self) -> PyResult<()> {
248        self.core_mut()?.degrade().map_err(to_pyruntime_err)
249    }
250
251    #[pyo3(name = "fault")]
252    fn py_fault(&mut self) -> PyResult<()> {
253        self.core_mut()?.fault().map_err(to_pyruntime_err)
254    }
255
256    #[pyo3(name = "register_warning_event")]
257    fn py_register_warning_event(&mut self, event_type: &str) -> PyResult<()> {
258        self.core_mut()?.register_warning_event(event_type);
259        Ok(())
260    }
261
262    #[pyo3(name = "deregister_warning_event")]
263    fn py_deregister_warning_event(&mut self, event_type: &str) -> PyResult<()> {
264        self.core_mut()?.deregister_warning_event(event_type);
265        Ok(())
266    }
267
268    #[pyo3(name = "shutdown_system")]
269    #[pyo3(signature = (reason=None))]
270    fn py_shutdown_system(&self, reason: Option<String>) -> PyResult<()> {
271        self.core()?.shutdown_system(reason);
272        Ok(())
273    }
274
275    #[pyo3(name = "subscribe_data")]
276    #[pyo3(signature = (data_type, client_id=None, params=None))]
277    fn py_subscribe_data(
278        &mut self,
279        data_type: DataType,
280        client_id: Option<ClientId>,
281        params: Option<IndexMap<String, String>>,
282    ) -> PyResult<()> {
283        self.inner_mut()?
284            .core_mut()
285            .subscribe_data::<PyDataActorInner>(data_type, client_id, params);
286        Ok(())
287    }
288
289    #[pyo3(name = "subscribe_instruments")]
290    #[pyo3(signature = (venue, client_id=None, params=None))]
291    fn py_subscribe_instruments(
292        &mut self,
293        venue: Venue,
294        client_id: Option<ClientId>,
295        params: Option<IndexMap<String, String>>,
296    ) -> PyResult<()> {
297        self.inner_mut()?
298            .core_mut()
299            .subscribe_instruments::<PyDataActorInner>(venue, client_id, params);
300        Ok(())
301    }
302
303    #[pyo3(name = "subscribe_instrument")]
304    #[pyo3(signature = (instrument_id, client_id=None, params=None))]
305    fn py_subscribe_instrument(
306        &mut self,
307        instrument_id: InstrumentId,
308        client_id: Option<ClientId>,
309        params: Option<IndexMap<String, String>>,
310    ) -> PyResult<()> {
311        self.inner_mut()?
312            .core_mut()
313            .subscribe_instrument::<PyDataActorInner>(instrument_id, client_id, params);
314        Ok(())
315    }
316
317    #[pyo3(name = "subscribe_book_deltas")]
318    #[pyo3(signature = (instrument_id, book_type, depth=None, client_id=None, managed=false, params=None))]
319    fn py_subscribe_book_deltas(
320        &mut self,
321        instrument_id: InstrumentId,
322        book_type: BookType,
323        depth: Option<usize>,
324        client_id: Option<ClientId>,
325        managed: bool,
326        params: Option<IndexMap<String, String>>,
327    ) -> PyResult<()> {
328        let depth = depth.and_then(NonZeroUsize::new);
329        self.inner_mut()?
330            .core_mut()
331            .subscribe_book_deltas::<PyDataActorInner>(
332                instrument_id,
333                book_type,
334                depth,
335                client_id,
336                managed,
337                params,
338            );
339        Ok(())
340    }
341
342    #[pyo3(name = "subscribe_book_at_interval")]
343    #[pyo3(signature = (instrument_id, book_type, interval_ms, depth=None, client_id=None, params=None))]
344    fn py_subscribe_book_at_interval(
345        &mut self,
346        instrument_id: InstrumentId,
347        book_type: BookType,
348        interval_ms: usize,
349        depth: Option<usize>,
350        client_id: Option<ClientId>,
351        params: Option<IndexMap<String, String>>,
352    ) -> PyResult<()> {
353        let depth = depth.and_then(NonZeroUsize::new);
354        let interval_ms = NonZeroUsize::new(interval_ms)
355            .ok_or_else(|| PyErr::new::<PyValueError, _>("interval_ms must be > 0"))?;
356
357        self.inner_mut()?
358            .core_mut()
359            .subscribe_book_at_interval::<PyDataActorInner>(
360                instrument_id,
361                book_type,
362                depth,
363                interval_ms,
364                client_id,
365                params,
366            );
367        Ok(())
368    }
369
370    #[pyo3(name = "subscribe_quotes")]
371    #[pyo3(signature = (instrument_id, client_id=None, params=None))]
372    fn py_subscribe_quotes(
373        &mut self,
374        instrument_id: InstrumentId,
375        client_id: Option<ClientId>,
376        params: Option<IndexMap<String, String>>,
377    ) -> PyResult<()> {
378        self.inner_mut()?
379            .core_mut()
380            .subscribe_quotes::<PyDataActorInner>(instrument_id, client_id, params);
381        Ok(())
382    }
383
384    #[pyo3(name = "subscribe_trades")]
385    #[pyo3(signature = (instrument_id, client_id=None, params=None))]
386    fn py_subscribe_trades(
387        &mut self,
388        instrument_id: InstrumentId,
389        client_id: Option<ClientId>,
390        params: Option<IndexMap<String, String>>,
391    ) -> PyResult<()> {
392        self.inner_mut()?
393            .core_mut()
394            .subscribe_trades::<PyDataActorInner>(instrument_id, client_id, params);
395        Ok(())
396    }
397
398    #[pyo3(name = "subscribe_bars")]
399    #[pyo3(signature = (bar_type, client_id=None, await_partial=false, params=None))]
400    fn py_subscribe_bars(
401        &mut self,
402        bar_type: BarType,
403        client_id: Option<ClientId>,
404        await_partial: bool,
405        params: Option<IndexMap<String, String>>,
406    ) -> PyResult<()> {
407        self.inner_mut()?
408            .core_mut()
409            .subscribe_bars::<PyDataActorInner>(bar_type, client_id, await_partial, params);
410        Ok(())
411    }
412
413    #[pyo3(name = "subscribe_mark_prices")]
414    #[pyo3(signature = (instrument_id, client_id=None, params=None))]
415    fn py_subscribe_mark_prices(
416        &mut self,
417        instrument_id: InstrumentId,
418        client_id: Option<ClientId>,
419        params: Option<IndexMap<String, String>>,
420    ) -> PyResult<()> {
421        self.inner_mut()?
422            .core_mut()
423            .subscribe_mark_prices::<PyDataActorInner>(instrument_id, client_id, params);
424        Ok(())
425    }
426
427    #[pyo3(name = "subscribe_index_prices")]
428    #[pyo3(signature = (instrument_id, client_id=None, params=None))]
429    fn py_subscribe_index_prices(
430        &mut self,
431        instrument_id: InstrumentId,
432        client_id: Option<ClientId>,
433        params: Option<IndexMap<String, String>>,
434    ) -> PyResult<()> {
435        self.inner_mut()?
436            .core_mut()
437            .subscribe_index_prices::<PyDataActorInner>(instrument_id, client_id, params);
438        Ok(())
439    }
440
441    #[pyo3(name = "subscribe_instrument_status")]
442    #[pyo3(signature = (instrument_id, client_id=None, params=None))]
443    fn py_subscribe_instrument_status(
444        &mut self,
445        instrument_id: InstrumentId,
446        client_id: Option<ClientId>,
447        params: Option<IndexMap<String, String>>,
448    ) -> PyResult<()> {
449        self.inner_mut()?
450            .core_mut()
451            .subscribe_instrument_status::<PyDataActorInner>(instrument_id, client_id, params);
452        Ok(())
453    }
454
455    #[pyo3(name = "subscribe_instrument_close")]
456    #[pyo3(signature = (instrument_id, client_id=None, params=None))]
457    fn py_subscribe_instrument_close(
458        &mut self,
459        instrument_id: InstrumentId,
460        client_id: Option<ClientId>,
461        params: Option<IndexMap<String, String>>,
462    ) -> PyResult<()> {
463        self.inner_mut()?
464            .core_mut()
465            .subscribe_instrument_close::<PyDataActorInner>(instrument_id, client_id, params);
466        Ok(())
467    }
468
469    // Request methods
470    #[pyo3(name = "request_data")]
471    #[pyo3(signature = (data_type, client_id, start=None, end=None, limit=None, params=None))]
472    fn py_request_data(
473        &mut self,
474        data_type: DataType,
475        client_id: ClientId,
476        start: Option<u64>,
477        end: Option<u64>,
478        limit: Option<usize>,
479        params: Option<IndexMap<String, String>>,
480    ) -> PyResult<String> {
481        let limit = limit.and_then(NonZeroUsize::new);
482        let start = start.map(|ts| UnixNanos::from(ts).to_datetime_utc());
483        let end = end.map(|ts| UnixNanos::from(ts).to_datetime_utc());
484
485        let request_id = self
486            .inner_mut()?
487            .core_mut()
488            .request_data::<PyDataActorInner>(data_type, client_id, start, end, limit, params)
489            .map_err(to_pyvalue_err)?;
490        Ok(request_id.to_string())
491    }
492
493    #[pyo3(name = "request_instrument")]
494    #[pyo3(signature = (instrument_id, start=None, end=None, client_id=None, params=None))]
495    fn py_request_instrument(
496        &mut self,
497        instrument_id: InstrumentId,
498        start: Option<u64>,
499        end: Option<u64>,
500        client_id: Option<ClientId>,
501        params: Option<IndexMap<String, String>>,
502    ) -> PyResult<String> {
503        let start = start.map(|ts| UnixNanos::from(ts).to_datetime_utc());
504        let end = end.map(|ts| UnixNanos::from(ts).to_datetime_utc());
505
506        let request_id = self
507            .inner_mut()?
508            .core_mut()
509            .request_instrument::<PyDataActorInner>(instrument_id, start, end, client_id, params)
510            .map_err(to_pyvalue_err)?;
511        Ok(request_id.to_string())
512    }
513
514    #[pyo3(name = "request_instruments")]
515    #[pyo3(signature = (venue=None, start=None, end=None, client_id=None, params=None))]
516    fn py_request_instruments(
517        &mut self,
518        venue: Option<Venue>,
519        start: Option<u64>,
520        end: Option<u64>,
521        client_id: Option<ClientId>,
522        params: Option<IndexMap<String, String>>,
523    ) -> PyResult<String> {
524        let start = start.map(|ts| UnixNanos::from(ts).to_datetime_utc());
525        let end = end.map(|ts| UnixNanos::from(ts).to_datetime_utc());
526
527        let request_id = self
528            .inner_mut()?
529            .core_mut()
530            .request_instruments::<PyDataActorInner>(venue, start, end, client_id, params)
531            .map_err(to_pyvalue_err)?;
532        Ok(request_id.to_string())
533    }
534
535    #[pyo3(name = "request_book_snapshot")]
536    #[pyo3(signature = (instrument_id, depth=None, client_id=None, params=None))]
537    fn py_request_book_snapshot(
538        &mut self,
539        instrument_id: InstrumentId,
540        depth: Option<usize>,
541        client_id: Option<ClientId>,
542        params: Option<IndexMap<String, String>>,
543    ) -> PyResult<String> {
544        let depth = depth.and_then(NonZeroUsize::new);
545
546        let request_id = self
547            .inner_mut()?
548            .core_mut()
549            .request_book_snapshot::<PyDataActorInner>(instrument_id, depth, client_id, params)
550            .map_err(to_pyvalue_err)?;
551        Ok(request_id.to_string())
552    }
553
554    #[pyo3(name = "request_quotes")]
555    #[pyo3(signature = (instrument_id, start=None, end=None, limit=None, client_id=None, params=None))]
556    fn py_request_quotes(
557        &mut self,
558        instrument_id: InstrumentId,
559        start: Option<u64>,
560        end: Option<u64>,
561        limit: Option<usize>,
562        client_id: Option<ClientId>,
563        params: Option<IndexMap<String, String>>,
564    ) -> PyResult<String> {
565        let limit = limit.and_then(NonZeroUsize::new);
566        let start = start.map(|ts| UnixNanos::from(ts).to_datetime_utc());
567        let end = end.map(|ts| UnixNanos::from(ts).to_datetime_utc());
568
569        let request_id = self
570            .inner_mut()?
571            .core_mut()
572            .request_quotes::<PyDataActorInner>(instrument_id, start, end, limit, client_id, params)
573            .map_err(to_pyvalue_err)?;
574        Ok(request_id.to_string())
575    }
576
577    #[pyo3(name = "request_trades")]
578    #[pyo3(signature = (instrument_id, start=None, end=None, limit=None, client_id=None, params=None))]
579    fn py_request_trades(
580        &mut self,
581        instrument_id: InstrumentId,
582        start: Option<u64>,
583        end: Option<u64>,
584        limit: Option<usize>,
585        client_id: Option<ClientId>,
586        params: Option<IndexMap<String, String>>,
587    ) -> PyResult<String> {
588        let limit = limit.and_then(NonZeroUsize::new);
589        let start = start.map(|ts| UnixNanos::from(ts).to_datetime_utc());
590        let end = end.map(|ts| UnixNanos::from(ts).to_datetime_utc());
591
592        let request_id = self
593            .inner_mut()?
594            .core_mut()
595            .request_trades::<PyDataActorInner>(instrument_id, start, end, limit, client_id, params)
596            .map_err(to_pyvalue_err)?;
597        Ok(request_id.to_string())
598    }
599
600    #[pyo3(name = "request_bars")]
601    #[pyo3(signature = (bar_type, start=None, end=None, limit=None, client_id=None, params=None))]
602    fn py_request_bars(
603        &mut self,
604        bar_type: BarType,
605        start: Option<u64>,
606        end: Option<u64>,
607        limit: Option<usize>,
608        client_id: Option<ClientId>,
609        params: Option<IndexMap<String, String>>,
610    ) -> PyResult<String> {
611        let limit = limit.and_then(NonZeroUsize::new);
612        let start = start.map(|ts| UnixNanos::from(ts).to_datetime_utc());
613        let end = end.map(|ts| UnixNanos::from(ts).to_datetime_utc());
614
615        let request_id = self
616            .inner_mut()?
617            .core_mut()
618            .request_bars::<PyDataActorInner>(bar_type, start, end, limit, client_id, params)
619            .map_err(to_pyvalue_err)?;
620        Ok(request_id.to_string())
621    }
622
623    // Unsubscribe methods
624    #[pyo3(name = "unsubscribe_data")]
625    #[pyo3(signature = (data_type, client_id=None, params=None))]
626    fn py_unsubscribe_data(
627        &mut self,
628        data_type: DataType,
629        client_id: Option<ClientId>,
630        params: Option<IndexMap<String, String>>,
631    ) -> PyResult<()> {
632        self.inner_mut()?
633            .core_mut()
634            .unsubscribe_data::<PyDataActorInner>(data_type, client_id, params);
635        Ok(())
636    }
637
638    #[pyo3(name = "unsubscribe_instruments")]
639    #[pyo3(signature = (venue, client_id=None, params=None))]
640    fn py_unsubscribe_instruments(
641        &mut self,
642        venue: Venue,
643        client_id: Option<ClientId>,
644        params: Option<IndexMap<String, String>>,
645    ) -> PyResult<()> {
646        self.inner_mut()?
647            .core_mut()
648            .unsubscribe_instruments::<PyDataActorInner>(venue, client_id, params);
649        Ok(())
650    }
651
652    #[pyo3(name = "unsubscribe_instrument")]
653    #[pyo3(signature = (instrument_id, client_id=None, params=None))]
654    fn py_unsubscribe_instrument(
655        &mut self,
656        instrument_id: InstrumentId,
657        client_id: Option<ClientId>,
658        params: Option<IndexMap<String, String>>,
659    ) -> PyResult<()> {
660        self.inner_mut()?
661            .core_mut()
662            .unsubscribe_instrument::<PyDataActorInner>(instrument_id, client_id, params);
663        Ok(())
664    }
665
666    #[pyo3(name = "unsubscribe_book_deltas")]
667    #[pyo3(signature = (instrument_id, client_id=None, params=None))]
668    fn py_unsubscribe_book_deltas(
669        &mut self,
670        instrument_id: InstrumentId,
671        client_id: Option<ClientId>,
672        params: Option<IndexMap<String, String>>,
673    ) -> PyResult<()> {
674        self.inner_mut()?
675            .core_mut()
676            .unsubscribe_book_deltas::<PyDataActorInner>(instrument_id, client_id, params);
677        Ok(())
678    }
679
680    #[pyo3(name = "unsubscribe_book_at_interval")]
681    #[pyo3(signature = (instrument_id, interval_ms, client_id=None, params=None))]
682    fn py_unsubscribe_book_at_interval(
683        &mut self,
684        instrument_id: InstrumentId,
685        interval_ms: usize,
686        client_id: Option<ClientId>,
687        params: Option<IndexMap<String, String>>,
688    ) -> PyResult<()> {
689        let interval_ms = NonZeroUsize::new(interval_ms)
690            .ok_or_else(|| PyErr::new::<PyValueError, _>("interval_ms must be > 0"))?;
691
692        self.inner_mut()?
693            .core_mut()
694            .unsubscribe_book_at_interval::<PyDataActorInner>(
695                instrument_id,
696                interval_ms,
697                client_id,
698                params,
699            );
700        Ok(())
701    }
702
703    #[pyo3(name = "unsubscribe_quotes")]
704    #[pyo3(signature = (instrument_id, client_id=None, params=None))]
705    fn py_unsubscribe_quotes(
706        &mut self,
707        instrument_id: InstrumentId,
708        client_id: Option<ClientId>,
709        params: Option<IndexMap<String, String>>,
710    ) -> PyResult<()> {
711        self.inner_mut()?
712            .core_mut()
713            .unsubscribe_quotes::<PyDataActorInner>(instrument_id, client_id, params);
714        Ok(())
715    }
716
717    #[pyo3(name = "unsubscribe_trades")]
718    #[pyo3(signature = (instrument_id, client_id=None, params=None))]
719    fn py_unsubscribe_trades(
720        &mut self,
721        instrument_id: InstrumentId,
722        client_id: Option<ClientId>,
723        params: Option<IndexMap<String, String>>,
724    ) -> PyResult<()> {
725        self.inner_mut()?
726            .core_mut()
727            .unsubscribe_trades::<PyDataActorInner>(instrument_id, client_id, params);
728        Ok(())
729    }
730
731    #[pyo3(name = "unsubscribe_bars")]
732    #[pyo3(signature = (bar_type, client_id=None, params=None))]
733    fn py_unsubscribe_bars(
734        &mut self,
735        bar_type: BarType,
736        client_id: Option<ClientId>,
737        params: Option<IndexMap<String, String>>,
738    ) -> PyResult<()> {
739        self.inner_mut()?
740            .core_mut()
741            .unsubscribe_bars::<PyDataActorInner>(bar_type, client_id, params);
742        Ok(())
743    }
744
745    #[pyo3(name = "unsubscribe_mark_prices")]
746    #[pyo3(signature = (instrument_id, client_id=None, params=None))]
747    fn py_unsubscribe_mark_prices(
748        &mut self,
749        instrument_id: InstrumentId,
750        client_id: Option<ClientId>,
751        params: Option<IndexMap<String, String>>,
752    ) -> PyResult<()> {
753        self.inner_mut()?
754            .core_mut()
755            .unsubscribe_mark_prices::<PyDataActorInner>(instrument_id, client_id, params);
756        Ok(())
757    }
758
759    #[pyo3(name = "unsubscribe_index_prices")]
760    #[pyo3(signature = (instrument_id, client_id=None, params=None))]
761    fn py_unsubscribe_index_prices(
762        &mut self,
763        instrument_id: InstrumentId,
764        client_id: Option<ClientId>,
765        params: Option<IndexMap<String, String>>,
766    ) -> PyResult<()> {
767        self.inner_mut()?
768            .core_mut()
769            .unsubscribe_index_prices::<PyDataActorInner>(instrument_id, client_id, params);
770        Ok(())
771    }
772
773    #[pyo3(name = "unsubscribe_instrument_status")]
774    #[pyo3(signature = (instrument_id, client_id=None, params=None))]
775    fn py_unsubscribe_instrument_status(
776        &mut self,
777        instrument_id: InstrumentId,
778        client_id: Option<ClientId>,
779        params: Option<IndexMap<String, String>>,
780    ) -> PyResult<()> {
781        self.inner_mut()?
782            .core_mut()
783            .unsubscribe_instrument_status::<PyDataActorInner>(instrument_id, client_id, params);
784        Ok(())
785    }
786
787    #[pyo3(name = "unsubscribe_instrument_close")]
788    #[pyo3(signature = (instrument_id, client_id=None, params=None))]
789    fn py_unsubscribe_instrument_close(
790        &mut self,
791        instrument_id: InstrumentId,
792        client_id: Option<ClientId>,
793        params: Option<IndexMap<String, String>>,
794    ) -> PyResult<()> {
795        self.inner_mut()?
796            .core_mut()
797            .unsubscribe_instrument_close::<PyDataActorInner>(instrument_id, client_id, params);
798        Ok(())
799    }
800}
801
802impl Actor for PyDataActor {
803    fn id(&self) -> Ustr {
804        self.inner
805            .as_ref()
806            .map(|a| a.id())
807            .unwrap_or_else(|| Ustr::from("PyDataActor-Unregistered"))
808    }
809
810    fn handle(&mut self, msg: &dyn std::any::Any) {
811        if let Some(inner) = &mut self.inner {
812            inner.handle(msg)
813        }
814    }
815
816    fn as_any(&self) -> &dyn std::any::Any {
817        self
818    }
819}
820
821impl DataActor for PyDataActor {
822    fn state(&self) -> ComponentState {
823        self.inner
824            .as_ref()
825            .map(|a| a.state())
826            .unwrap_or(ComponentState::PreInitialized)
827    }
828}
829
830////////////////////////////////////////////////////////////////////////////////
831// Tests
832////////////////////////////////////////////////////////////////////////////////
833#[cfg(test)]
834mod tests {
835    use std::{cell::RefCell, rc::Rc, str::FromStr};
836
837    use nautilus_model::{
838        data::{BarType, DataType},
839        enums::BookType,
840        identifiers::{ClientId, TraderId, Venue},
841        instruments::{CurrencyPair, stubs::audusd_sim},
842    };
843    use rstest::{fixture, rstest};
844
845    use super::PyDataActor;
846    use crate::{
847        actor::{Actor, DataActor, data_actor::DataActorConfig},
848        cache::Cache,
849        clock::TestClock,
850        enums::ComponentState,
851    };
852
853    #[fixture]
854    fn clock() -> Rc<RefCell<TestClock>> {
855        Rc::new(RefCell::new(TestClock::new()))
856    }
857
858    #[fixture]
859    fn cache() -> Rc<RefCell<Cache>> {
860        Rc::new(RefCell::new(Cache::new(None, None)))
861    }
862
863    #[fixture]
864    fn trader_id() -> TraderId {
865        TraderId::from("TRADER-001")
866    }
867
868    #[fixture]
869    fn client_id() -> ClientId {
870        ClientId::new("TestClient")
871    }
872
873    #[fixture]
874    fn venue() -> Venue {
875        Venue::from("SIM")
876    }
877
878    #[fixture]
879    fn data_type() -> DataType {
880        DataType::new("TestData", None)
881    }
882
883    #[fixture]
884    fn bar_type(audusd_sim: CurrencyPair) -> BarType {
885        BarType::from_str(&format!("{}-1-MINUTE-LAST-INTERNAL", audusd_sim.id)).unwrap()
886    }
887
888    fn create_unregistered_actor() -> PyDataActor {
889        let config = DataActorConfig::default();
890        PyDataActor {
891            inner: None,
892            config,
893        }
894    }
895
896    fn create_registered_actor(
897        clock: Rc<RefCell<TestClock>>,
898        cache: Rc<RefCell<Cache>>,
899        trader_id: TraderId,
900    ) -> PyDataActor {
901        let config = DataActorConfig::default();
902        let mut actor = PyDataActor {
903            inner: None,
904            config,
905        };
906        actor.register(trader_id, cache, clock).unwrap();
907        actor
908    }
909
910    #[rstest]
911    fn test_new_actor_creation() {
912        let actor = PyDataActor::py_new(None).unwrap();
913        assert!(actor.inner.is_none());
914    }
915
916    #[rstest]
917    fn test_unregistered_actor_errors(data_type: DataType, client_id: ClientId) {
918        let mut actor = create_unregistered_actor();
919
920        assert!(actor.actor_id().is_err());
921        assert!(actor.state().is_err());
922        assert!(actor.trader_id().is_err());
923        assert!(actor.is_ready().is_err());
924        assert!(actor.is_running().is_err());
925        assert!(actor.is_stopped().is_err());
926        assert!(actor.is_disposed().is_err());
927        assert!(actor.is_degraded().is_err());
928        assert!(actor.is_faulting().is_err());
929        assert!(
930            actor
931                .py_subscribe_data(data_type.clone(), Some(client_id.clone()), None)
932                .is_err()
933        );
934        assert!(
935            actor
936                .py_request_data(data_type, client_id, None, None, None, None)
937                .is_err()
938        );
939        assert!(actor.py_initialize().is_err());
940        assert!(actor.py_start().is_err());
941    }
942
943    #[rstest]
944    fn test_registration_success(
945        clock: Rc<RefCell<TestClock>>,
946        cache: Rc<RefCell<Cache>>,
947        trader_id: TraderId,
948    ) {
949        let mut actor = create_unregistered_actor();
950        let result = actor.register(trader_id, cache, clock);
951        assert!(result.is_ok());
952        assert!(actor.inner.is_some());
953    }
954
955    #[rstest]
956    fn test_double_registration_fails(
957        clock: Rc<RefCell<TestClock>>,
958        cache: Rc<RefCell<Cache>>,
959        trader_id: TraderId,
960    ) {
961        pyo3::prepare_freethreaded_python();
962
963        let mut actor = create_unregistered_actor();
964        actor
965            .register(trader_id, cache.clone(), clock.clone())
966            .unwrap();
967
968        let result = actor.register(trader_id, cache, clock);
969        assert!(result.is_err());
970        assert_eq!(
971            result.unwrap_err().to_string(),
972            "RuntimeError: DataActor has already been registered"
973        );
974    }
975
976    #[rstest]
977    fn test_registered_actor_basic_properties(
978        clock: Rc<RefCell<TestClock>>,
979        cache: Rc<RefCell<Cache>>,
980        trader_id: TraderId,
981    ) {
982        let actor = create_registered_actor(clock, cache, trader_id);
983
984        assert!(actor.actor_id().is_ok());
985        assert_eq!(actor.state().unwrap(), ComponentState::PreInitialized);
986        assert_eq!(actor.trader_id().unwrap(), Some(trader_id.to_string()));
987        assert_eq!(actor.is_ready().unwrap(), false);
988        assert_eq!(actor.is_running().unwrap(), false);
989        assert_eq!(actor.is_stopped().unwrap(), false);
990        assert_eq!(actor.is_disposed().unwrap(), false);
991        assert_eq!(actor.is_degraded().unwrap(), false);
992        assert_eq!(actor.is_faulting().unwrap(), false);
993    }
994
995    #[rstest]
996    fn test_basic_subscription_methods_compile(
997        clock: Rc<RefCell<TestClock>>,
998        cache: Rc<RefCell<Cache>>,
999        trader_id: TraderId,
1000        data_type: DataType,
1001        client_id: ClientId,
1002        audusd_sim: CurrencyPair,
1003    ) {
1004        let mut actor = create_registered_actor(clock, cache, trader_id);
1005
1006        let _ = actor.py_subscribe_data(data_type.clone(), Some(client_id.clone()), None);
1007        let _ = actor.py_subscribe_quotes(audusd_sim.id, Some(client_id.clone()), None);
1008        let _ = actor.py_unsubscribe_data(data_type, Some(client_id.clone()), None);
1009        let _ = actor.py_unsubscribe_quotes(audusd_sim.id, Some(client_id), None);
1010    }
1011
1012    #[rstest]
1013    fn test_lifecycle_methods_pass_through(
1014        clock: Rc<RefCell<TestClock>>,
1015        cache: Rc<RefCell<Cache>>,
1016        trader_id: TraderId,
1017    ) {
1018        let mut actor = create_registered_actor(clock, cache, trader_id);
1019
1020        assert!(actor.py_initialize().is_ok());
1021        assert!(actor.py_start().is_ok());
1022        assert!(actor.py_stop().is_ok());
1023        assert!(actor.py_dispose().is_ok());
1024    }
1025
1026    #[rstest]
1027    fn test_warning_event_methods_pass_through(
1028        clock: Rc<RefCell<TestClock>>,
1029        cache: Rc<RefCell<Cache>>,
1030        trader_id: TraderId,
1031    ) {
1032        let mut actor = create_registered_actor(clock, cache, trader_id);
1033
1034        assert!(actor.py_register_warning_event("TestWarning").is_ok());
1035        assert!(actor.py_deregister_warning_event("TestWarning").is_ok());
1036    }
1037
1038    #[rstest]
1039    fn test_shutdown_system_passes_through(
1040        clock: Rc<RefCell<TestClock>>,
1041        cache: Rc<RefCell<Cache>>,
1042        trader_id: TraderId,
1043    ) {
1044        let actor = create_registered_actor(clock, cache, trader_id);
1045
1046        assert!(
1047            actor
1048                .py_shutdown_system(Some("Test shutdown".to_string()))
1049                .is_ok()
1050        );
1051        assert!(actor.py_shutdown_system(None).is_ok());
1052    }
1053
1054    #[rstest]
1055    fn test_book_at_interval_invalid_interval_ms(
1056        clock: Rc<RefCell<TestClock>>,
1057        cache: Rc<RefCell<Cache>>,
1058        trader_id: TraderId,
1059        audusd_sim: CurrencyPair,
1060    ) {
1061        pyo3::prepare_freethreaded_python();
1062
1063        let mut actor = create_registered_actor(clock, cache, trader_id);
1064
1065        let result = actor.py_subscribe_book_at_interval(
1066            audusd_sim.id,
1067            BookType::L2_MBP,
1068            0,
1069            None,
1070            None,
1071            None,
1072        );
1073        assert!(result.is_err());
1074        assert_eq!(
1075            result.unwrap_err().to_string(),
1076            "ValueError: interval_ms must be > 0"
1077        );
1078
1079        let result = actor.py_unsubscribe_book_at_interval(audusd_sim.id, 0, None, None);
1080        assert!(result.is_err());
1081        assert_eq!(
1082            result.unwrap_err().to_string(),
1083            "ValueError: interval_ms must be > 0"
1084        );
1085    }
1086
1087    #[rstest]
1088    fn test_request_methods_signatures_exist() {
1089        let actor = create_unregistered_actor();
1090
1091        // These calls will fail at runtime since the actor is unregistered,
1092        // but they prove the method signatures exist and compile correctly
1093        assert!(actor.inner.is_none()); // Verify it's unregistered
1094    }
1095
1096    #[rstest]
1097    fn test_actor_trait_implementation() {
1098        let actor = create_unregistered_actor();
1099
1100        // Test Actor trait methods
1101        let id = actor.id();
1102        assert_eq!(id.as_str(), "PyDataActor-Unregistered");
1103
1104        // Test that handle method doesn't panic
1105        let dummy_msg = "test message";
1106        let mut actor = actor;
1107        actor.handle(&dummy_msg);
1108
1109        // Test as_any returns the actor
1110        let any_ref = actor.as_any();
1111        assert!(any_ref.is::<PyDataActor>());
1112    }
1113
1114    #[rstest]
1115    fn test_data_actor_trait_implementation(
1116        clock: Rc<RefCell<TestClock>>,
1117        cache: Rc<RefCell<Cache>>,
1118        trader_id: TraderId,
1119    ) {
1120        let actor = create_registered_actor(clock, cache, trader_id);
1121
1122        // Test DataActor trait method (using the trait method directly)
1123        let state = DataActor::state(&actor);
1124        assert_eq!(state, ComponentState::PreInitialized);
1125    }
1126}