1#![allow(dead_code)]
18#![allow(unused_variables)]
19
20use std::{cell::RefCell, num::NonZeroUsize, rc::Rc};
21
22use indexmap::IndexMap;
23use nautilus_core::{
24 nanos::UnixNanos,
25 python::{to_pyruntime_err, to_pyvalue_err},
26};
27use nautilus_model::{
28 data::{BarType, DataType},
29 enums::BookType,
30 identifiers::{ClientId, InstrumentId, TraderId, Venue},
31};
32use pyo3::{
33 exceptions::{PyRuntimeError, PyValueError},
34 prelude::*,
35};
36use ustr::Ustr;
37
38use crate::{
39 actor::{
40 Actor,
41 data_actor::{DataActor, DataActorConfig, DataActorCore},
42 },
43 cache::Cache,
44 clock::Clock,
45 enums::ComponentState,
46};
47
48#[derive(Debug)]
53pub struct PyDataActorInner {
54 core: DataActorCore,
55}
56
57impl PyDataActorInner {
58 pub fn new(
59 config: DataActorConfig,
60 cache: Rc<RefCell<Cache>>,
61 clock: Rc<RefCell<dyn Clock>>,
62 ) -> Self {
63 Self {
64 core: DataActorCore::new(config, cache, clock),
65 }
66 }
67
68 pub fn core(&self) -> &DataActorCore {
69 &self.core
70 }
71
72 pub fn core_mut(&mut self) -> &mut DataActorCore {
73 &mut self.core
74 }
75}
76
77impl Actor for PyDataActorInner {
78 fn id(&self) -> ustr::Ustr {
79 self.core.actor_id.inner()
80 }
81
82 fn handle(&mut self, msg: &dyn std::any::Any) {
83 self.core.handle(msg)
84 }
85
86 fn as_any(&self) -> &dyn std::any::Any {
87 self
88 }
89}
90
91impl DataActor for PyDataActorInner {
92 fn state(&self) -> ComponentState {
93 self.core.state()
94 }
95}
96
97#[allow(non_camel_case_types)]
99#[pyo3::pyclass(
100 module = "posei_trader.core.nautilus_pyo3.common",
101 name = "DataActor",
102 unsendable
103)]
104#[derive(Debug)]
105pub struct PyDataActor {
106 inner: Option<PyDataActorInner>,
107 config: DataActorConfig,
108}
109
110impl PyDataActor {
111 fn inner(&self) -> PyResult<&PyDataActorInner> {
113 self.inner.as_ref().ok_or_else(|| {
114 PyErr::new::<PyRuntimeError, _>("DataActor has not been registered with a system")
115 })
116 }
117
118 fn inner_mut(&mut self) -> PyResult<&mut PyDataActorInner> {
120 self.inner.as_mut().ok_or_else(|| {
121 PyErr::new::<PyRuntimeError, _>("DataActor has not been registered with a system")
122 })
123 }
124
125 fn core(&self) -> PyResult<&DataActorCore> {
127 Ok(self.inner()?.core())
128 }
129
130 fn core_mut(&mut self) -> PyResult<&mut DataActorCore> {
132 Ok(self.inner_mut()?.core_mut())
133 }
134
135 pub fn register(
143 &mut self,
144 trader_id: TraderId,
145 cache: Rc<RefCell<Cache>>,
146 clock: Rc<RefCell<dyn Clock>>,
147 ) -> PyResult<()> {
148 if self.inner.is_some() {
149 return Err(PyErr::new::<PyRuntimeError, _>(
150 "DataActor has already been registered",
151 ));
152 }
153
154 let mut inner = PyDataActorInner::new(self.config.clone(), cache, clock);
156 inner.core_mut().set_trader_id(trader_id);
157 self.inner = Some(inner);
158
159 Ok(())
160 }
161}
162
163#[pymethods]
164impl PyDataActor {
165 #[new]
166 #[pyo3(signature = (_config=None))]
167 fn py_new(_config: Option<PyObject>) -> PyResult<Self> {
168 let config = DataActorConfig::default();
170
171 Ok(Self {
172 inner: None,
173 config,
174 })
175 }
176
177 #[getter]
178 fn actor_id(&self) -> PyResult<String> {
179 Ok(self.core()?.actor_id.to_string())
180 }
181
182 #[getter]
183 fn state(&self) -> PyResult<ComponentState> {
184 Ok(self.core()?.state())
185 }
186
187 #[getter]
188 fn trader_id(&self) -> PyResult<Option<String>> {
189 Ok(self.core()?.trader_id().map(|id| id.to_string()))
190 }
191
192 fn is_ready(&self) -> PyResult<bool> {
193 Ok(self.core()?.is_ready())
194 }
195
196 fn is_running(&self) -> PyResult<bool> {
197 Ok(self.core()?.is_running())
198 }
199
200 fn is_stopped(&self) -> PyResult<bool> {
201 Ok(self.core()?.is_stopped())
202 }
203
204 fn is_disposed(&self) -> PyResult<bool> {
205 Ok(self.core()?.is_disposed())
206 }
207
208 fn is_degraded(&self) -> PyResult<bool> {
209 Ok(self.core()?.is_degraded())
210 }
211
212 fn is_faulting(&self) -> PyResult<bool> {
213 Ok(self.core()?.is_faulted())
214 }
215
216 #[pyo3(name = "initialize")]
217 fn py_initialize(&mut self) -> PyResult<()> {
218 self.core_mut()?.initialize().map_err(to_pyruntime_err)
219 }
220
221 #[pyo3(name = "start")]
222 fn py_start(&mut self) -> PyResult<()> {
223 self.core_mut()?.start().map_err(to_pyruntime_err)
224 }
225
226 #[pyo3(name = "stop")]
227 fn py_stop(&mut self) -> PyResult<()> {
228 self.core_mut()?.stop().map_err(to_pyruntime_err)
229 }
230
231 #[pyo3(name = "resume")]
232 fn py_resume(&mut self) -> PyResult<()> {
233 self.core_mut()?.resume().map_err(to_pyruntime_err)
234 }
235
236 #[pyo3(name = "reset")]
237 fn py_reset(&mut self) -> PyResult<()> {
238 self.core_mut()?.reset().map_err(to_pyruntime_err)
239 }
240
241 #[pyo3(name = "dispose")]
242 fn py_dispose(&mut self) -> PyResult<()> {
243 self.core_mut()?.dispose().map_err(to_pyruntime_err)
244 }
245
246 #[pyo3(name = "degrade")]
247 fn py_degrade(&mut self) -> PyResult<()> {
248 self.core_mut()?.degrade().map_err(to_pyruntime_err)
249 }
250
251 #[pyo3(name = "fault")]
252 fn py_fault(&mut self) -> PyResult<()> {
253 self.core_mut()?.fault().map_err(to_pyruntime_err)
254 }
255
256 #[pyo3(name = "register_warning_event")]
257 fn py_register_warning_event(&mut self, event_type: &str) -> PyResult<()> {
258 self.core_mut()?.register_warning_event(event_type);
259 Ok(())
260 }
261
262 #[pyo3(name = "deregister_warning_event")]
263 fn py_deregister_warning_event(&mut self, event_type: &str) -> PyResult<()> {
264 self.core_mut()?.deregister_warning_event(event_type);
265 Ok(())
266 }
267
268 #[pyo3(name = "shutdown_system")]
269 #[pyo3(signature = (reason=None))]
270 fn py_shutdown_system(&self, reason: Option<String>) -> PyResult<()> {
271 self.core()?.shutdown_system(reason);
272 Ok(())
273 }
274
275 #[pyo3(name = "subscribe_data")]
276 #[pyo3(signature = (data_type, client_id=None, params=None))]
277 fn py_subscribe_data(
278 &mut self,
279 data_type: DataType,
280 client_id: Option<ClientId>,
281 params: Option<IndexMap<String, String>>,
282 ) -> PyResult<()> {
283 self.inner_mut()?
284 .core_mut()
285 .subscribe_data::<PyDataActorInner>(data_type, client_id, params);
286 Ok(())
287 }
288
289 #[pyo3(name = "subscribe_instruments")]
290 #[pyo3(signature = (venue, client_id=None, params=None))]
291 fn py_subscribe_instruments(
292 &mut self,
293 venue: Venue,
294 client_id: Option<ClientId>,
295 params: Option<IndexMap<String, String>>,
296 ) -> PyResult<()> {
297 self.inner_mut()?
298 .core_mut()
299 .subscribe_instruments::<PyDataActorInner>(venue, client_id, params);
300 Ok(())
301 }
302
303 #[pyo3(name = "subscribe_instrument")]
304 #[pyo3(signature = (instrument_id, client_id=None, params=None))]
305 fn py_subscribe_instrument(
306 &mut self,
307 instrument_id: InstrumentId,
308 client_id: Option<ClientId>,
309 params: Option<IndexMap<String, String>>,
310 ) -> PyResult<()> {
311 self.inner_mut()?
312 .core_mut()
313 .subscribe_instrument::<PyDataActorInner>(instrument_id, client_id, params);
314 Ok(())
315 }
316
317 #[pyo3(name = "subscribe_book_deltas")]
318 #[pyo3(signature = (instrument_id, book_type, depth=None, client_id=None, managed=false, params=None))]
319 fn py_subscribe_book_deltas(
320 &mut self,
321 instrument_id: InstrumentId,
322 book_type: BookType,
323 depth: Option<usize>,
324 client_id: Option<ClientId>,
325 managed: bool,
326 params: Option<IndexMap<String, String>>,
327 ) -> PyResult<()> {
328 let depth = depth.and_then(NonZeroUsize::new);
329 self.inner_mut()?
330 .core_mut()
331 .subscribe_book_deltas::<PyDataActorInner>(
332 instrument_id,
333 book_type,
334 depth,
335 client_id,
336 managed,
337 params,
338 );
339 Ok(())
340 }
341
342 #[pyo3(name = "subscribe_book_at_interval")]
343 #[pyo3(signature = (instrument_id, book_type, interval_ms, depth=None, client_id=None, params=None))]
344 fn py_subscribe_book_at_interval(
345 &mut self,
346 instrument_id: InstrumentId,
347 book_type: BookType,
348 interval_ms: usize,
349 depth: Option<usize>,
350 client_id: Option<ClientId>,
351 params: Option<IndexMap<String, String>>,
352 ) -> PyResult<()> {
353 let depth = depth.and_then(NonZeroUsize::new);
354 let interval_ms = NonZeroUsize::new(interval_ms)
355 .ok_or_else(|| PyErr::new::<PyValueError, _>("interval_ms must be > 0"))?;
356
357 self.inner_mut()?
358 .core_mut()
359 .subscribe_book_at_interval::<PyDataActorInner>(
360 instrument_id,
361 book_type,
362 depth,
363 interval_ms,
364 client_id,
365 params,
366 );
367 Ok(())
368 }
369
370 #[pyo3(name = "subscribe_quotes")]
371 #[pyo3(signature = (instrument_id, client_id=None, params=None))]
372 fn py_subscribe_quotes(
373 &mut self,
374 instrument_id: InstrumentId,
375 client_id: Option<ClientId>,
376 params: Option<IndexMap<String, String>>,
377 ) -> PyResult<()> {
378 self.inner_mut()?
379 .core_mut()
380 .subscribe_quotes::<PyDataActorInner>(instrument_id, client_id, params);
381 Ok(())
382 }
383
384 #[pyo3(name = "subscribe_trades")]
385 #[pyo3(signature = (instrument_id, client_id=None, params=None))]
386 fn py_subscribe_trades(
387 &mut self,
388 instrument_id: InstrumentId,
389 client_id: Option<ClientId>,
390 params: Option<IndexMap<String, String>>,
391 ) -> PyResult<()> {
392 self.inner_mut()?
393 .core_mut()
394 .subscribe_trades::<PyDataActorInner>(instrument_id, client_id, params);
395 Ok(())
396 }
397
398 #[pyo3(name = "subscribe_bars")]
399 #[pyo3(signature = (bar_type, client_id=None, await_partial=false, params=None))]
400 fn py_subscribe_bars(
401 &mut self,
402 bar_type: BarType,
403 client_id: Option<ClientId>,
404 await_partial: bool,
405 params: Option<IndexMap<String, String>>,
406 ) -> PyResult<()> {
407 self.inner_mut()?
408 .core_mut()
409 .subscribe_bars::<PyDataActorInner>(bar_type, client_id, await_partial, params);
410 Ok(())
411 }
412
413 #[pyo3(name = "subscribe_mark_prices")]
414 #[pyo3(signature = (instrument_id, client_id=None, params=None))]
415 fn py_subscribe_mark_prices(
416 &mut self,
417 instrument_id: InstrumentId,
418 client_id: Option<ClientId>,
419 params: Option<IndexMap<String, String>>,
420 ) -> PyResult<()> {
421 self.inner_mut()?
422 .core_mut()
423 .subscribe_mark_prices::<PyDataActorInner>(instrument_id, client_id, params);
424 Ok(())
425 }
426
427 #[pyo3(name = "subscribe_index_prices")]
428 #[pyo3(signature = (instrument_id, client_id=None, params=None))]
429 fn py_subscribe_index_prices(
430 &mut self,
431 instrument_id: InstrumentId,
432 client_id: Option<ClientId>,
433 params: Option<IndexMap<String, String>>,
434 ) -> PyResult<()> {
435 self.inner_mut()?
436 .core_mut()
437 .subscribe_index_prices::<PyDataActorInner>(instrument_id, client_id, params);
438 Ok(())
439 }
440
441 #[pyo3(name = "subscribe_instrument_status")]
442 #[pyo3(signature = (instrument_id, client_id=None, params=None))]
443 fn py_subscribe_instrument_status(
444 &mut self,
445 instrument_id: InstrumentId,
446 client_id: Option<ClientId>,
447 params: Option<IndexMap<String, String>>,
448 ) -> PyResult<()> {
449 self.inner_mut()?
450 .core_mut()
451 .subscribe_instrument_status::<PyDataActorInner>(instrument_id, client_id, params);
452 Ok(())
453 }
454
455 #[pyo3(name = "subscribe_instrument_close")]
456 #[pyo3(signature = (instrument_id, client_id=None, params=None))]
457 fn py_subscribe_instrument_close(
458 &mut self,
459 instrument_id: InstrumentId,
460 client_id: Option<ClientId>,
461 params: Option<IndexMap<String, String>>,
462 ) -> PyResult<()> {
463 self.inner_mut()?
464 .core_mut()
465 .subscribe_instrument_close::<PyDataActorInner>(instrument_id, client_id, params);
466 Ok(())
467 }
468
469 #[pyo3(name = "request_data")]
471 #[pyo3(signature = (data_type, client_id, start=None, end=None, limit=None, params=None))]
472 fn py_request_data(
473 &mut self,
474 data_type: DataType,
475 client_id: ClientId,
476 start: Option<u64>,
477 end: Option<u64>,
478 limit: Option<usize>,
479 params: Option<IndexMap<String, String>>,
480 ) -> PyResult<String> {
481 let limit = limit.and_then(NonZeroUsize::new);
482 let start = start.map(|ts| UnixNanos::from(ts).to_datetime_utc());
483 let end = end.map(|ts| UnixNanos::from(ts).to_datetime_utc());
484
485 let request_id = self
486 .inner_mut()?
487 .core_mut()
488 .request_data::<PyDataActorInner>(data_type, client_id, start, end, limit, params)
489 .map_err(to_pyvalue_err)?;
490 Ok(request_id.to_string())
491 }
492
493 #[pyo3(name = "request_instrument")]
494 #[pyo3(signature = (instrument_id, start=None, end=None, client_id=None, params=None))]
495 fn py_request_instrument(
496 &mut self,
497 instrument_id: InstrumentId,
498 start: Option<u64>,
499 end: Option<u64>,
500 client_id: Option<ClientId>,
501 params: Option<IndexMap<String, String>>,
502 ) -> PyResult<String> {
503 let start = start.map(|ts| UnixNanos::from(ts).to_datetime_utc());
504 let end = end.map(|ts| UnixNanos::from(ts).to_datetime_utc());
505
506 let request_id = self
507 .inner_mut()?
508 .core_mut()
509 .request_instrument::<PyDataActorInner>(instrument_id, start, end, client_id, params)
510 .map_err(to_pyvalue_err)?;
511 Ok(request_id.to_string())
512 }
513
514 #[pyo3(name = "request_instruments")]
515 #[pyo3(signature = (venue=None, start=None, end=None, client_id=None, params=None))]
516 fn py_request_instruments(
517 &mut self,
518 venue: Option<Venue>,
519 start: Option<u64>,
520 end: Option<u64>,
521 client_id: Option<ClientId>,
522 params: Option<IndexMap<String, String>>,
523 ) -> PyResult<String> {
524 let start = start.map(|ts| UnixNanos::from(ts).to_datetime_utc());
525 let end = end.map(|ts| UnixNanos::from(ts).to_datetime_utc());
526
527 let request_id = self
528 .inner_mut()?
529 .core_mut()
530 .request_instruments::<PyDataActorInner>(venue, start, end, client_id, params)
531 .map_err(to_pyvalue_err)?;
532 Ok(request_id.to_string())
533 }
534
535 #[pyo3(name = "request_book_snapshot")]
536 #[pyo3(signature = (instrument_id, depth=None, client_id=None, params=None))]
537 fn py_request_book_snapshot(
538 &mut self,
539 instrument_id: InstrumentId,
540 depth: Option<usize>,
541 client_id: Option<ClientId>,
542 params: Option<IndexMap<String, String>>,
543 ) -> PyResult<String> {
544 let depth = depth.and_then(NonZeroUsize::new);
545
546 let request_id = self
547 .inner_mut()?
548 .core_mut()
549 .request_book_snapshot::<PyDataActorInner>(instrument_id, depth, client_id, params)
550 .map_err(to_pyvalue_err)?;
551 Ok(request_id.to_string())
552 }
553
554 #[pyo3(name = "request_quotes")]
555 #[pyo3(signature = (instrument_id, start=None, end=None, limit=None, client_id=None, params=None))]
556 fn py_request_quotes(
557 &mut self,
558 instrument_id: InstrumentId,
559 start: Option<u64>,
560 end: Option<u64>,
561 limit: Option<usize>,
562 client_id: Option<ClientId>,
563 params: Option<IndexMap<String, String>>,
564 ) -> PyResult<String> {
565 let limit = limit.and_then(NonZeroUsize::new);
566 let start = start.map(|ts| UnixNanos::from(ts).to_datetime_utc());
567 let end = end.map(|ts| UnixNanos::from(ts).to_datetime_utc());
568
569 let request_id = self
570 .inner_mut()?
571 .core_mut()
572 .request_quotes::<PyDataActorInner>(instrument_id, start, end, limit, client_id, params)
573 .map_err(to_pyvalue_err)?;
574 Ok(request_id.to_string())
575 }
576
577 #[pyo3(name = "request_trades")]
578 #[pyo3(signature = (instrument_id, start=None, end=None, limit=None, client_id=None, params=None))]
579 fn py_request_trades(
580 &mut self,
581 instrument_id: InstrumentId,
582 start: Option<u64>,
583 end: Option<u64>,
584 limit: Option<usize>,
585 client_id: Option<ClientId>,
586 params: Option<IndexMap<String, String>>,
587 ) -> PyResult<String> {
588 let limit = limit.and_then(NonZeroUsize::new);
589 let start = start.map(|ts| UnixNanos::from(ts).to_datetime_utc());
590 let end = end.map(|ts| UnixNanos::from(ts).to_datetime_utc());
591
592 let request_id = self
593 .inner_mut()?
594 .core_mut()
595 .request_trades::<PyDataActorInner>(instrument_id, start, end, limit, client_id, params)
596 .map_err(to_pyvalue_err)?;
597 Ok(request_id.to_string())
598 }
599
600 #[pyo3(name = "request_bars")]
601 #[pyo3(signature = (bar_type, start=None, end=None, limit=None, client_id=None, params=None))]
602 fn py_request_bars(
603 &mut self,
604 bar_type: BarType,
605 start: Option<u64>,
606 end: Option<u64>,
607 limit: Option<usize>,
608 client_id: Option<ClientId>,
609 params: Option<IndexMap<String, String>>,
610 ) -> PyResult<String> {
611 let limit = limit.and_then(NonZeroUsize::new);
612 let start = start.map(|ts| UnixNanos::from(ts).to_datetime_utc());
613 let end = end.map(|ts| UnixNanos::from(ts).to_datetime_utc());
614
615 let request_id = self
616 .inner_mut()?
617 .core_mut()
618 .request_bars::<PyDataActorInner>(bar_type, start, end, limit, client_id, params)
619 .map_err(to_pyvalue_err)?;
620 Ok(request_id.to_string())
621 }
622
623 #[pyo3(name = "unsubscribe_data")]
625 #[pyo3(signature = (data_type, client_id=None, params=None))]
626 fn py_unsubscribe_data(
627 &mut self,
628 data_type: DataType,
629 client_id: Option<ClientId>,
630 params: Option<IndexMap<String, String>>,
631 ) -> PyResult<()> {
632 self.inner_mut()?
633 .core_mut()
634 .unsubscribe_data::<PyDataActorInner>(data_type, client_id, params);
635 Ok(())
636 }
637
638 #[pyo3(name = "unsubscribe_instruments")]
639 #[pyo3(signature = (venue, client_id=None, params=None))]
640 fn py_unsubscribe_instruments(
641 &mut self,
642 venue: Venue,
643 client_id: Option<ClientId>,
644 params: Option<IndexMap<String, String>>,
645 ) -> PyResult<()> {
646 self.inner_mut()?
647 .core_mut()
648 .unsubscribe_instruments::<PyDataActorInner>(venue, client_id, params);
649 Ok(())
650 }
651
652 #[pyo3(name = "unsubscribe_instrument")]
653 #[pyo3(signature = (instrument_id, client_id=None, params=None))]
654 fn py_unsubscribe_instrument(
655 &mut self,
656 instrument_id: InstrumentId,
657 client_id: Option<ClientId>,
658 params: Option<IndexMap<String, String>>,
659 ) -> PyResult<()> {
660 self.inner_mut()?
661 .core_mut()
662 .unsubscribe_instrument::<PyDataActorInner>(instrument_id, client_id, params);
663 Ok(())
664 }
665
666 #[pyo3(name = "unsubscribe_book_deltas")]
667 #[pyo3(signature = (instrument_id, client_id=None, params=None))]
668 fn py_unsubscribe_book_deltas(
669 &mut self,
670 instrument_id: InstrumentId,
671 client_id: Option<ClientId>,
672 params: Option<IndexMap<String, String>>,
673 ) -> PyResult<()> {
674 self.inner_mut()?
675 .core_mut()
676 .unsubscribe_book_deltas::<PyDataActorInner>(instrument_id, client_id, params);
677 Ok(())
678 }
679
680 #[pyo3(name = "unsubscribe_book_at_interval")]
681 #[pyo3(signature = (instrument_id, interval_ms, client_id=None, params=None))]
682 fn py_unsubscribe_book_at_interval(
683 &mut self,
684 instrument_id: InstrumentId,
685 interval_ms: usize,
686 client_id: Option<ClientId>,
687 params: Option<IndexMap<String, String>>,
688 ) -> PyResult<()> {
689 let interval_ms = NonZeroUsize::new(interval_ms)
690 .ok_or_else(|| PyErr::new::<PyValueError, _>("interval_ms must be > 0"))?;
691
692 self.inner_mut()?
693 .core_mut()
694 .unsubscribe_book_at_interval::<PyDataActorInner>(
695 instrument_id,
696 interval_ms,
697 client_id,
698 params,
699 );
700 Ok(())
701 }
702
703 #[pyo3(name = "unsubscribe_quotes")]
704 #[pyo3(signature = (instrument_id, client_id=None, params=None))]
705 fn py_unsubscribe_quotes(
706 &mut self,
707 instrument_id: InstrumentId,
708 client_id: Option<ClientId>,
709 params: Option<IndexMap<String, String>>,
710 ) -> PyResult<()> {
711 self.inner_mut()?
712 .core_mut()
713 .unsubscribe_quotes::<PyDataActorInner>(instrument_id, client_id, params);
714 Ok(())
715 }
716
717 #[pyo3(name = "unsubscribe_trades")]
718 #[pyo3(signature = (instrument_id, client_id=None, params=None))]
719 fn py_unsubscribe_trades(
720 &mut self,
721 instrument_id: InstrumentId,
722 client_id: Option<ClientId>,
723 params: Option<IndexMap<String, String>>,
724 ) -> PyResult<()> {
725 self.inner_mut()?
726 .core_mut()
727 .unsubscribe_trades::<PyDataActorInner>(instrument_id, client_id, params);
728 Ok(())
729 }
730
731 #[pyo3(name = "unsubscribe_bars")]
732 #[pyo3(signature = (bar_type, client_id=None, params=None))]
733 fn py_unsubscribe_bars(
734 &mut self,
735 bar_type: BarType,
736 client_id: Option<ClientId>,
737 params: Option<IndexMap<String, String>>,
738 ) -> PyResult<()> {
739 self.inner_mut()?
740 .core_mut()
741 .unsubscribe_bars::<PyDataActorInner>(bar_type, client_id, params);
742 Ok(())
743 }
744
745 #[pyo3(name = "unsubscribe_mark_prices")]
746 #[pyo3(signature = (instrument_id, client_id=None, params=None))]
747 fn py_unsubscribe_mark_prices(
748 &mut self,
749 instrument_id: InstrumentId,
750 client_id: Option<ClientId>,
751 params: Option<IndexMap<String, String>>,
752 ) -> PyResult<()> {
753 self.inner_mut()?
754 .core_mut()
755 .unsubscribe_mark_prices::<PyDataActorInner>(instrument_id, client_id, params);
756 Ok(())
757 }
758
759 #[pyo3(name = "unsubscribe_index_prices")]
760 #[pyo3(signature = (instrument_id, client_id=None, params=None))]
761 fn py_unsubscribe_index_prices(
762 &mut self,
763 instrument_id: InstrumentId,
764 client_id: Option<ClientId>,
765 params: Option<IndexMap<String, String>>,
766 ) -> PyResult<()> {
767 self.inner_mut()?
768 .core_mut()
769 .unsubscribe_index_prices::<PyDataActorInner>(instrument_id, client_id, params);
770 Ok(())
771 }
772
773 #[pyo3(name = "unsubscribe_instrument_status")]
774 #[pyo3(signature = (instrument_id, client_id=None, params=None))]
775 fn py_unsubscribe_instrument_status(
776 &mut self,
777 instrument_id: InstrumentId,
778 client_id: Option<ClientId>,
779 params: Option<IndexMap<String, String>>,
780 ) -> PyResult<()> {
781 self.inner_mut()?
782 .core_mut()
783 .unsubscribe_instrument_status::<PyDataActorInner>(instrument_id, client_id, params);
784 Ok(())
785 }
786
787 #[pyo3(name = "unsubscribe_instrument_close")]
788 #[pyo3(signature = (instrument_id, client_id=None, params=None))]
789 fn py_unsubscribe_instrument_close(
790 &mut self,
791 instrument_id: InstrumentId,
792 client_id: Option<ClientId>,
793 params: Option<IndexMap<String, String>>,
794 ) -> PyResult<()> {
795 self.inner_mut()?
796 .core_mut()
797 .unsubscribe_instrument_close::<PyDataActorInner>(instrument_id, client_id, params);
798 Ok(())
799 }
800}
801
802impl Actor for PyDataActor {
803 fn id(&self) -> Ustr {
804 self.inner
805 .as_ref()
806 .map(|a| a.id())
807 .unwrap_or_else(|| Ustr::from("PyDataActor-Unregistered"))
808 }
809
810 fn handle(&mut self, msg: &dyn std::any::Any) {
811 if let Some(inner) = &mut self.inner {
812 inner.handle(msg)
813 }
814 }
815
816 fn as_any(&self) -> &dyn std::any::Any {
817 self
818 }
819}
820
821impl DataActor for PyDataActor {
822 fn state(&self) -> ComponentState {
823 self.inner
824 .as_ref()
825 .map(|a| a.state())
826 .unwrap_or(ComponentState::PreInitialized)
827 }
828}
829
830#[cfg(test)]
834mod tests {
835 use std::{cell::RefCell, rc::Rc, str::FromStr};
836
837 use nautilus_model::{
838 data::{BarType, DataType},
839 enums::BookType,
840 identifiers::{ClientId, TraderId, Venue},
841 instruments::{CurrencyPair, stubs::audusd_sim},
842 };
843 use rstest::{fixture, rstest};
844
845 use super::PyDataActor;
846 use crate::{
847 actor::{Actor, DataActor, data_actor::DataActorConfig},
848 cache::Cache,
849 clock::TestClock,
850 enums::ComponentState,
851 };
852
853 #[fixture]
854 fn clock() -> Rc<RefCell<TestClock>> {
855 Rc::new(RefCell::new(TestClock::new()))
856 }
857
858 #[fixture]
859 fn cache() -> Rc<RefCell<Cache>> {
860 Rc::new(RefCell::new(Cache::new(None, None)))
861 }
862
863 #[fixture]
864 fn trader_id() -> TraderId {
865 TraderId::from("TRADER-001")
866 }
867
868 #[fixture]
869 fn client_id() -> ClientId {
870 ClientId::new("TestClient")
871 }
872
873 #[fixture]
874 fn venue() -> Venue {
875 Venue::from("SIM")
876 }
877
878 #[fixture]
879 fn data_type() -> DataType {
880 DataType::new("TestData", None)
881 }
882
883 #[fixture]
884 fn bar_type(audusd_sim: CurrencyPair) -> BarType {
885 BarType::from_str(&format!("{}-1-MINUTE-LAST-INTERNAL", audusd_sim.id)).unwrap()
886 }
887
888 fn create_unregistered_actor() -> PyDataActor {
889 let config = DataActorConfig::default();
890 PyDataActor {
891 inner: None,
892 config,
893 }
894 }
895
896 fn create_registered_actor(
897 clock: Rc<RefCell<TestClock>>,
898 cache: Rc<RefCell<Cache>>,
899 trader_id: TraderId,
900 ) -> PyDataActor {
901 let config = DataActorConfig::default();
902 let mut actor = PyDataActor {
903 inner: None,
904 config,
905 };
906 actor.register(trader_id, cache, clock).unwrap();
907 actor
908 }
909
910 #[rstest]
911 fn test_new_actor_creation() {
912 let actor = PyDataActor::py_new(None).unwrap();
913 assert!(actor.inner.is_none());
914 }
915
916 #[rstest]
917 fn test_unregistered_actor_errors(data_type: DataType, client_id: ClientId) {
918 let mut actor = create_unregistered_actor();
919
920 assert!(actor.actor_id().is_err());
921 assert!(actor.state().is_err());
922 assert!(actor.trader_id().is_err());
923 assert!(actor.is_ready().is_err());
924 assert!(actor.is_running().is_err());
925 assert!(actor.is_stopped().is_err());
926 assert!(actor.is_disposed().is_err());
927 assert!(actor.is_degraded().is_err());
928 assert!(actor.is_faulting().is_err());
929 assert!(
930 actor
931 .py_subscribe_data(data_type.clone(), Some(client_id.clone()), None)
932 .is_err()
933 );
934 assert!(
935 actor
936 .py_request_data(data_type, client_id, None, None, None, None)
937 .is_err()
938 );
939 assert!(actor.py_initialize().is_err());
940 assert!(actor.py_start().is_err());
941 }
942
943 #[rstest]
944 fn test_registration_success(
945 clock: Rc<RefCell<TestClock>>,
946 cache: Rc<RefCell<Cache>>,
947 trader_id: TraderId,
948 ) {
949 let mut actor = create_unregistered_actor();
950 let result = actor.register(trader_id, cache, clock);
951 assert!(result.is_ok());
952 assert!(actor.inner.is_some());
953 }
954
955 #[rstest]
956 fn test_double_registration_fails(
957 clock: Rc<RefCell<TestClock>>,
958 cache: Rc<RefCell<Cache>>,
959 trader_id: TraderId,
960 ) {
961 pyo3::prepare_freethreaded_python();
962
963 let mut actor = create_unregistered_actor();
964 actor
965 .register(trader_id, cache.clone(), clock.clone())
966 .unwrap();
967
968 let result = actor.register(trader_id, cache, clock);
969 assert!(result.is_err());
970 assert_eq!(
971 result.unwrap_err().to_string(),
972 "RuntimeError: DataActor has already been registered"
973 );
974 }
975
976 #[rstest]
977 fn test_registered_actor_basic_properties(
978 clock: Rc<RefCell<TestClock>>,
979 cache: Rc<RefCell<Cache>>,
980 trader_id: TraderId,
981 ) {
982 let actor = create_registered_actor(clock, cache, trader_id);
983
984 assert!(actor.actor_id().is_ok());
985 assert_eq!(actor.state().unwrap(), ComponentState::PreInitialized);
986 assert_eq!(actor.trader_id().unwrap(), Some(trader_id.to_string()));
987 assert_eq!(actor.is_ready().unwrap(), false);
988 assert_eq!(actor.is_running().unwrap(), false);
989 assert_eq!(actor.is_stopped().unwrap(), false);
990 assert_eq!(actor.is_disposed().unwrap(), false);
991 assert_eq!(actor.is_degraded().unwrap(), false);
992 assert_eq!(actor.is_faulting().unwrap(), false);
993 }
994
995 #[rstest]
996 fn test_basic_subscription_methods_compile(
997 clock: Rc<RefCell<TestClock>>,
998 cache: Rc<RefCell<Cache>>,
999 trader_id: TraderId,
1000 data_type: DataType,
1001 client_id: ClientId,
1002 audusd_sim: CurrencyPair,
1003 ) {
1004 let mut actor = create_registered_actor(clock, cache, trader_id);
1005
1006 let _ = actor.py_subscribe_data(data_type.clone(), Some(client_id.clone()), None);
1007 let _ = actor.py_subscribe_quotes(audusd_sim.id, Some(client_id.clone()), None);
1008 let _ = actor.py_unsubscribe_data(data_type, Some(client_id.clone()), None);
1009 let _ = actor.py_unsubscribe_quotes(audusd_sim.id, Some(client_id), None);
1010 }
1011
1012 #[rstest]
1013 fn test_lifecycle_methods_pass_through(
1014 clock: Rc<RefCell<TestClock>>,
1015 cache: Rc<RefCell<Cache>>,
1016 trader_id: TraderId,
1017 ) {
1018 let mut actor = create_registered_actor(clock, cache, trader_id);
1019
1020 assert!(actor.py_initialize().is_ok());
1021 assert!(actor.py_start().is_ok());
1022 assert!(actor.py_stop().is_ok());
1023 assert!(actor.py_dispose().is_ok());
1024 }
1025
1026 #[rstest]
1027 fn test_warning_event_methods_pass_through(
1028 clock: Rc<RefCell<TestClock>>,
1029 cache: Rc<RefCell<Cache>>,
1030 trader_id: TraderId,
1031 ) {
1032 let mut actor = create_registered_actor(clock, cache, trader_id);
1033
1034 assert!(actor.py_register_warning_event("TestWarning").is_ok());
1035 assert!(actor.py_deregister_warning_event("TestWarning").is_ok());
1036 }
1037
1038 #[rstest]
1039 fn test_shutdown_system_passes_through(
1040 clock: Rc<RefCell<TestClock>>,
1041 cache: Rc<RefCell<Cache>>,
1042 trader_id: TraderId,
1043 ) {
1044 let actor = create_registered_actor(clock, cache, trader_id);
1045
1046 assert!(
1047 actor
1048 .py_shutdown_system(Some("Test shutdown".to_string()))
1049 .is_ok()
1050 );
1051 assert!(actor.py_shutdown_system(None).is_ok());
1052 }
1053
1054 #[rstest]
1055 fn test_book_at_interval_invalid_interval_ms(
1056 clock: Rc<RefCell<TestClock>>,
1057 cache: Rc<RefCell<Cache>>,
1058 trader_id: TraderId,
1059 audusd_sim: CurrencyPair,
1060 ) {
1061 pyo3::prepare_freethreaded_python();
1062
1063 let mut actor = create_registered_actor(clock, cache, trader_id);
1064
1065 let result = actor.py_subscribe_book_at_interval(
1066 audusd_sim.id,
1067 BookType::L2_MBP,
1068 0,
1069 None,
1070 None,
1071 None,
1072 );
1073 assert!(result.is_err());
1074 assert_eq!(
1075 result.unwrap_err().to_string(),
1076 "ValueError: interval_ms must be > 0"
1077 );
1078
1079 let result = actor.py_unsubscribe_book_at_interval(audusd_sim.id, 0, None, None);
1080 assert!(result.is_err());
1081 assert_eq!(
1082 result.unwrap_err().to_string(),
1083 "ValueError: interval_ms must be > 0"
1084 );
1085 }
1086
1087 #[rstest]
1088 fn test_request_methods_signatures_exist() {
1089 let actor = create_unregistered_actor();
1090
1091 assert!(actor.inner.is_none()); }
1095
1096 #[rstest]
1097 fn test_actor_trait_implementation() {
1098 let actor = create_unregistered_actor();
1099
1100 let id = actor.id();
1102 assert_eq!(id.as_str(), "PyDataActor-Unregistered");
1103
1104 let dummy_msg = "test message";
1106 let mut actor = actor;
1107 actor.handle(&dummy_msg);
1108
1109 let any_ref = actor.as_any();
1111 assert!(any_ref.is::<PyDataActor>());
1112 }
1113
1114 #[rstest]
1115 fn test_data_actor_trait_implementation(
1116 clock: Rc<RefCell<TestClock>>,
1117 cache: Rc<RefCell<Cache>>,
1118 trader_id: TraderId,
1119 ) {
1120 let actor = create_registered_actor(clock, cache, trader_id);
1121
1122 let state = DataActor::state(&actor);
1124 assert_eq!(state, ComponentState::PreInitialized);
1125 }
1126}