1#![allow(dead_code)]
18#![allow(unused_variables)]
19#![allow(unused_imports)]
20#![allow(clippy::too_many_arguments)]
21
22use std::{
23 any::{Any, TypeId},
24 cell::{Ref, RefCell, RefMut, UnsafeCell},
25 collections::HashSet,
26 fmt::Debug,
27 num::NonZeroUsize,
28 ops::{Deref, DerefMut},
29 rc::Rc,
30 sync::Arc,
31};
32
33use ahash::{AHashMap, AHashSet};
34#[cfg(feature = "defi")]
35use alloy_primitives::Address;
36use chrono::{DateTime, Utc};
37use indexmap::IndexMap;
38use nautilus_core::{UUID4, UnixNanos, correctness::check_predicate_true};
39#[cfg(feature = "defi")]
40use nautilus_model::defi::{Block, Blockchain, Pool, PoolLiquidityUpdate, PoolSwap};
41use nautilus_model::{
42 data::{
43 Bar, BarType, DataType, IndexPriceUpdate, InstrumentStatus, MarkPriceUpdate,
44 OrderBookDeltas, QuoteTick, TradeTick, close::InstrumentClose,
45 },
46 enums::BookType,
47 identifiers::{ActorId, ClientId, ComponentId, InstrumentId, TraderId, Venue},
48 instruments::{Instrument, InstrumentAny},
49 orderbook::OrderBook,
50};
51use ustr::Ustr;
52use uuid::Uuid;
53
54#[cfg(feature = "indicators")]
55use super::indicators::Indicators;
56use super::{
57 Actor,
58 registry::{get_actor, get_actor_unchecked, try_get_actor_unchecked},
59};
60#[cfg(feature = "defi")]
61use crate::msgbus::switchboard::{
62 get_defi_blocks_topic, get_defi_liquidity_topic, get_defi_pool_swaps_topic, get_defi_pool_topic,
63};
64use crate::{
65 cache::Cache,
66 clock::Clock,
67 component::Component,
68 enums::{ComponentState, ComponentTrigger},
69 logging::{CMD, RECV, REQ, SEND},
70 messages::{
71 data::{
72 BarsResponse, BookResponse, CustomDataResponse, DataCommand, InstrumentResponse,
73 InstrumentsResponse, QuotesResponse, RequestBars, RequestBookSnapshot, RequestCommand,
74 RequestCustomData, RequestInstrument, RequestInstruments, RequestQuotes, RequestTrades,
75 SubscribeBars, SubscribeBookDeltas, SubscribeBookSnapshots, SubscribeCommand,
76 SubscribeCustomData, SubscribeIndexPrices, SubscribeInstrument,
77 SubscribeInstrumentClose, SubscribeInstrumentStatus, SubscribeInstruments,
78 SubscribeMarkPrices, SubscribeQuotes, SubscribeTrades, TradesResponse, UnsubscribeBars,
79 UnsubscribeBookDeltas, UnsubscribeBookSnapshots, UnsubscribeCommand,
80 UnsubscribeCustomData, UnsubscribeIndexPrices, UnsubscribeInstrument,
81 UnsubscribeInstrumentClose, UnsubscribeInstrumentStatus, UnsubscribeInstruments,
82 UnsubscribeMarkPrices, UnsubscribeQuotes, UnsubscribeTrades,
83 },
84 system::ShutdownSystem,
85 },
86 msgbus::{
87 self, MStr, Pattern, Topic, get_message_bus,
88 handler::{MessageHandler, ShareableMessageHandler, TypedMessageHandler},
89 switchboard::{
90 self, MessagingSwitchboard, get_bars_topic, get_book_deltas_topic,
91 get_book_snapshots_topic, get_custom_topic, get_index_price_topic,
92 get_instrument_close_topic, get_instrument_status_topic, get_instrument_topic,
93 get_instruments_topic, get_mark_price_topic, get_quotes_topic, get_trades_topic,
94 },
95 },
96 signal::Signal,
97 timer::{TimeEvent, TimeEventCallback},
98};
99
100#[derive(Debug, Clone)]
102pub struct DataActorConfig {
103 pub actor_id: Option<ActorId>,
105 pub log_events: bool,
107 pub log_commands: bool,
109}
110
111impl Default for DataActorConfig {
112 fn default() -> Self {
113 Self {
114 actor_id: None,
115 log_events: true,
116 log_commands: true,
117 }
118 }
119}
120
121type RequestCallback = Box<dyn Fn(UUID4) + Send + Sync>; pub trait DataActor:
124 Component + Deref<Target = DataActorCore> + DerefMut<Target = DataActorCore>
125{
126 fn on_save(&self) -> anyhow::Result<IndexMap<String, Vec<u8>>> {
132 Ok(IndexMap::new())
133 }
134
135 fn on_load(&mut self, state: IndexMap<String, Vec<u8>>) -> anyhow::Result<()> {
141 Ok(())
142 }
143
144 fn on_start(&mut self) -> anyhow::Result<()> {
150 log::warn!(
151 "The `on_start` handler was called when not overridden, \
152 it's expected that any actions required when starting the actor \
153 occur here, such as subscribing/requesting data"
154 );
155 Ok(())
156 }
157
158 fn on_stop(&mut self) -> anyhow::Result<()> {
164 log::warn!(
165 "The `on_stop` handler was called when not overridden, \
166 it's expected that any actions required when stopping the actor \
167 occur here, such as unsubscribing from data",
168 );
169 Ok(())
170 }
171
172 fn on_resume(&mut self) -> anyhow::Result<()> {
178 log::warn!(
179 "The `on_resume` handler was called when not overridden, \
180 it's expected that any actions required when resuming the actor \
181 following a stop occur here"
182 );
183 Ok(())
184 }
185
186 fn on_reset(&mut self) -> anyhow::Result<()> {
192 log::warn!(
193 "The `on_reset` handler was called when not overridden, \
194 it's expected that any actions required when resetting the actor \
195 occur here, such as resetting indicators and other state"
196 );
197 Ok(())
198 }
199
200 fn on_dispose(&mut self) -> anyhow::Result<()> {
206 Ok(())
207 }
208
209 fn on_degrade(&mut self) -> anyhow::Result<()> {
215 Ok(())
216 }
217
218 fn on_fault(&mut self) -> anyhow::Result<()> {
224 Ok(())
225 }
226
227 fn on_time_event(&mut self, event: &TimeEvent) -> anyhow::Result<()> {
233 Ok(())
234 }
235
236 fn on_data(&mut self, data: &dyn Any) -> anyhow::Result<()> {
242 Ok(())
243 }
244
245 fn on_signal(&mut self, signal: &Signal) -> anyhow::Result<()> {
251 Ok(())
252 }
253
254 fn on_instrument(&mut self, instrument: &InstrumentAny) -> anyhow::Result<()> {
260 Ok(())
261 }
262
263 fn on_book_deltas(&mut self, deltas: &OrderBookDeltas) -> anyhow::Result<()> {
269 Ok(())
270 }
271
272 fn on_book(&mut self, order_book: &OrderBook) -> anyhow::Result<()> {
278 Ok(())
279 }
280
281 fn on_quote(&mut self, quote: &QuoteTick) -> anyhow::Result<()> {
287 Ok(())
288 }
289
290 fn on_trade(&mut self, tick: &TradeTick) -> anyhow::Result<()> {
296 Ok(())
297 }
298
299 fn on_bar(&mut self, bar: &Bar) -> anyhow::Result<()> {
305 Ok(())
306 }
307
308 fn on_mark_price(&mut self, mark_price: &MarkPriceUpdate) -> anyhow::Result<()> {
314 Ok(())
315 }
316
317 fn on_index_price(&mut self, index_price: &IndexPriceUpdate) -> anyhow::Result<()> {
323 Ok(())
324 }
325
326 fn on_instrument_status(&mut self, data: &InstrumentStatus) -> anyhow::Result<()> {
332 Ok(())
333 }
334
335 fn on_instrument_close(&mut self, update: &InstrumentClose) -> anyhow::Result<()> {
341 Ok(())
342 }
343
/// Hook invoked by `handle_block` with the received block (only with the `defi` feature).
///
/// The default implementation ignores `block` and returns `Ok(())`.
#[cfg(feature = "defi")]
fn on_block(&mut self, block: &Block) -> anyhow::Result<()> {
    let _ = block;
    Ok(())
}
353
/// Hook invoked by `handle_pool` with the received pool (only with the `defi` feature).
///
/// The default implementation ignores `pool` and returns `Ok(())`.
#[cfg(feature = "defi")]
fn on_pool(&mut self, pool: &Pool) -> anyhow::Result<()> {
    let _ = pool;
    Ok(())
}
363
/// Hook invoked by `handle_pool_swap` with the received pool swap (only with the `defi`
/// feature).
///
/// The default implementation ignores `swap` and returns `Ok(())`.
#[cfg(feature = "defi")]
fn on_pool_swap(&mut self, swap: &PoolSwap) -> anyhow::Result<()> {
    let _ = swap;
    Ok(())
}
373
/// Hook invoked by `handle_pool_liquidity_update` with the received liquidity update (only
/// with the `defi` feature).
///
/// The default implementation ignores `update` and returns `Ok(())`.
#[cfg(feature = "defi")]
fn on_pool_liquidity_update(&mut self, update: &PoolLiquidityUpdate) -> anyhow::Result<()> {
    let _ = update;
    Ok(())
}
383
384 fn on_historical_data(&mut self, data: &dyn Any) -> anyhow::Result<()> {
390 Ok(())
391 }
392
393 fn on_historical_quotes(&mut self, quotes: &[QuoteTick]) -> anyhow::Result<()> {
399 Ok(())
400 }
401
402 fn on_historical_trades(&mut self, trades: &[TradeTick]) -> anyhow::Result<()> {
408 Ok(())
409 }
410
411 fn on_historical_bars(&mut self, bars: &[Bar]) -> anyhow::Result<()> {
417 Ok(())
418 }
419
420 fn on_historical_mark_prices(&mut self, mark_prices: &[MarkPriceUpdate]) -> anyhow::Result<()> {
426 Ok(())
427 }
428
429 fn on_historical_index_prices(
435 &mut self,
436 index_prices: &[IndexPriceUpdate],
437 ) -> anyhow::Result<()> {
438 Ok(())
439 }
440
441 fn handle_time_event(&mut self, event: &TimeEvent) {
443 log_received(&event);
444
445 if let Err(e) = DataActor::on_time_event(self, event) {
446 log_error(&e);
447 }
448 }
449
450 fn handle_data(&mut self, data: &dyn Any) {
452 log_received(&data);
453
454 if self.not_running() {
455 log_not_running(&data);
456 return;
457 }
458
459 if let Err(e) = self.on_data(data) {
460 log_error(&e);
461 }
462 }
463
464 fn handle_signal(&mut self, signal: &Signal) {
466 log_received(&signal);
467
468 if self.not_running() {
469 log_not_running(&signal);
470 return;
471 }
472
473 if let Err(e) = self.on_signal(signal) {
474 log_error(&e);
475 }
476 }
477
478 fn handle_instrument(&mut self, instrument: &InstrumentAny) {
480 log_received(&instrument);
481
482 if self.not_running() {
483 log_not_running(&instrument);
484 return;
485 }
486
487 if let Err(e) = self.on_instrument(instrument) {
488 log_error(&e);
489 }
490 }
491
492 fn handle_book_deltas(&mut self, deltas: &OrderBookDeltas) {
494 log_received(&deltas);
495
496 if self.not_running() {
497 log_not_running(&deltas);
498 return;
499 }
500
501 if let Err(e) = self.on_book_deltas(deltas) {
502 log_error(&e);
503 }
504 }
505
506 fn handle_book(&mut self, book: &OrderBook) {
508 log_received(&book);
509
510 if self.not_running() {
511 log_not_running(&book);
512 return;
513 }
514
515 if let Err(e) = self.on_book(book) {
516 log_error(&e);
517 };
518 }
519
520 fn handle_quote(&mut self, quote: &QuoteTick) {
522 log_received("e);
523
524 if self.not_running() {
525 log_not_running("e);
526 return;
527 }
528
529 if let Err(e) = self.on_quote(quote) {
530 log_error(&e);
531 }
532 }
533
534 fn handle_trade(&mut self, trade: &TradeTick) {
536 log_received(&trade);
537
538 if self.not_running() {
539 log_not_running(&trade);
540 return;
541 }
542
543 if let Err(e) = self.on_trade(trade) {
544 log_error(&e);
545 }
546 }
547
548 fn handle_bar(&mut self, bar: &Bar) {
550 log_received(&bar);
551
552 if self.not_running() {
553 log_not_running(&bar);
554 return;
555 }
556
557 if let Err(e) = self.on_bar(bar) {
558 log_error(&e);
559 }
560 }
561
562 fn handle_mark_price(&mut self, mark_price: &MarkPriceUpdate) {
564 log_received(&mark_price);
565
566 if self.not_running() {
567 log_not_running(&mark_price);
568 return;
569 }
570
571 if let Err(e) = self.on_mark_price(mark_price) {
572 log_error(&e);
573 }
574 }
575
576 fn handle_index_price(&mut self, index_price: &IndexPriceUpdate) {
578 log_received(&index_price);
579
580 if self.not_running() {
581 log_not_running(&index_price);
582 return;
583 }
584
585 if let Err(e) = self.on_index_price(index_price) {
586 log_error(&e);
587 }
588 }
589
590 fn handle_instrument_status(&mut self, status: &InstrumentStatus) {
592 log_received(&status);
593
594 if self.not_running() {
595 log_not_running(&status);
596 return;
597 }
598
599 if let Err(e) = self.on_instrument_status(status) {
600 log_error(&e);
601 }
602 }
603
604 fn handle_instrument_close(&mut self, close: &InstrumentClose) {
606 log_received(&close);
607
608 if self.not_running() {
609 log_not_running(&close);
610 return;
611 }
612
613 if let Err(e) = self.on_instrument_close(close) {
614 log_error(&e);
615 }
616 }
617
/// Entry point for a received block (only with the `defi` feature).
///
/// Passes the block to `log_received`. When `not_running()` is true it is handed to
/// `log_not_running` and dropped; otherwise it is forwarded to `on_block` and any error the
/// hook returns is passed to `log_error`.
#[cfg(feature = "defi")]
fn handle_block(&mut self, block: &Block) {
    log_received(&block);

    if self.not_running() {
        log_not_running(&block);
    } else if let Err(err) = self.on_block(block) {
        log_error(&err);
    }
}
632
/// Entry point for a received pool (only with the `defi` feature).
///
/// Passes the pool to `log_received`. When `not_running()` is true it is handed to
/// `log_not_running` and dropped; otherwise it is forwarded to `on_pool` and any error the
/// hook returns is passed to `log_error`.
#[cfg(feature = "defi")]
fn handle_pool(&mut self, pool: &Pool) {
    log_received(&pool);

    if self.not_running() {
        log_not_running(&pool);
    } else if let Err(err) = self.on_pool(pool) {
        log_error(&err);
    }
}
647
/// Entry point for a received pool swap (only with the `defi` feature).
///
/// Passes the swap to `log_received`. When `not_running()` is true it is handed to
/// `log_not_running` and dropped; otherwise it is forwarded to `on_pool_swap` and any error
/// the hook returns is passed to `log_error`.
#[cfg(feature = "defi")]
fn handle_pool_swap(&mut self, swap: &PoolSwap) {
    log_received(&swap);

    if self.not_running() {
        log_not_running(&swap);
    } else if let Err(err) = self.on_pool_swap(swap) {
        log_error(&err);
    }
}
662
/// Entry point for a received pool liquidity update (only with the `defi` feature).
///
/// Passes the update to `log_received`. When `not_running()` is true it is handed to
/// `log_not_running` and dropped; otherwise it is forwarded to `on_pool_liquidity_update`
/// and any error the hook returns is passed to `log_error`.
#[cfg(feature = "defi")]
fn handle_pool_liquidity_update(&mut self, update: &PoolLiquidityUpdate) {
    log_received(&update);

    if self.not_running() {
        log_not_running(&update);
    } else if let Err(err) = self.on_pool_liquidity_update(update) {
        log_error(&err);
    }
}
677
678 fn handle_historical_data(&mut self, data: &dyn Any) {
680 log_received(&data);
681
682 if let Err(e) = self.on_historical_data(data) {
683 log_error(&e);
684 }
685 }
686
687 fn handle_data_response(&mut self, resp: &CustomDataResponse) {
689 log_received(&resp);
690
691 if let Err(e) = self.on_historical_data(resp.data.as_ref()) {
692 log_error(&e);
693 }
694 }
695
696 fn handle_instrument_response(&mut self, resp: &InstrumentResponse) {
698 log_received(&resp);
699
700 if let Err(e) = self.on_instrument(&resp.data) {
701 log_error(&e);
702 }
703 }
704
705 fn handle_instruments_response(&mut self, resp: &InstrumentsResponse) {
707 log_received(&resp);
708
709 for inst in &resp.data {
710 if let Err(e) = self.on_instrument(inst) {
711 log_error(&e);
712 }
713 }
714 }
715
716 fn handle_book_response(&mut self, resp: &BookResponse) {
718 log_received(&resp);
719
720 if let Err(e) = self.on_book(&resp.data) {
721 log_error(&e);
722 }
723 }
724
725 fn handle_quotes_response(&mut self, resp: &QuotesResponse) {
727 log_received(&resp);
728
729 if let Err(e) = self.on_historical_quotes(&resp.data) {
730 log_error(&e);
731 }
732 }
733
734 fn handle_trades_response(&mut self, resp: &TradesResponse) {
736 log_received(&resp);
737
738 if let Err(e) = self.on_historical_trades(&resp.data) {
739 log_error(&e);
740 }
741 }
742
743 fn handle_bars_response(&mut self, resp: &BarsResponse) {
745 log_received(&resp);
746
747 if let Err(e) = self.on_historical_bars(&resp.data) {
748 log_error(&e);
749 }
750 }
751
752 fn subscribe_data(
754 &mut self,
755 data_type: DataType,
756 client_id: Option<ClientId>,
757 params: Option<IndexMap<String, String>>,
758 ) where
759 Self: 'static + Debug + Sized,
760 {
761 let actor_id = self.actor_id().inner();
762 let handler = ShareableMessageHandler(Rc::new(TypedMessageHandler::with_any(
763 move |data: &dyn Any| {
764 get_actor_unchecked::<Self>(&actor_id).handle_data(data);
765 },
766 )));
767
768 DataActorCore::subscribe_data(self, handler, data_type, client_id, params);
769 }
770
771 fn subscribe_quotes(
773 &mut self,
774 instrument_id: InstrumentId,
775 client_id: Option<ClientId>,
776 params: Option<IndexMap<String, String>>,
777 ) where
778 Self: 'static + Debug + Sized,
779 {
780 let actor_id = self.actor_id().inner();
781 let topic = get_quotes_topic(instrument_id);
782
783 let handler = ShareableMessageHandler(Rc::new(TypedMessageHandler::from(
784 move |quote: &QuoteTick| {
785 if let Some(actor) = try_get_actor_unchecked::<Self>(&actor_id) {
786 actor.handle_quote(quote);
787 } else {
788 log::error!("Actor {actor_id} not found for quote handling");
789 }
790 },
791 )));
792
793 DataActorCore::subscribe_quotes(self, topic, handler, instrument_id, client_id, params);
794 }
795
796 fn subscribe_instruments(
798 &mut self,
799 venue: Venue,
800 client_id: Option<ClientId>,
801 params: Option<IndexMap<String, String>>,
802 ) where
803 Self: 'static + Debug + Sized,
804 {
805 let actor_id = self.actor_id().inner();
806 let topic = get_instruments_topic(venue);
807
808 let handler = ShareableMessageHandler(Rc::new(TypedMessageHandler::from(
809 move |instrument: &InstrumentAny| {
810 if let Some(actor) = try_get_actor_unchecked::<Self>(&actor_id) {
811 actor.handle_instrument(instrument);
812 } else {
813 log::error!("Actor {actor_id} not found for instruments handling");
814 }
815 },
816 )));
817
818 DataActorCore::subscribe_instruments(self, topic, handler, venue, client_id, params);
819 }
820
821 fn subscribe_instrument(
823 &mut self,
824 instrument_id: InstrumentId,
825 client_id: Option<ClientId>,
826 params: Option<IndexMap<String, String>>,
827 ) where
828 Self: 'static + Debug + Sized,
829 {
830 let actor_id = self.actor_id().inner();
831 let topic = get_instrument_topic(instrument_id);
832
833 let handler = ShareableMessageHandler(Rc::new(TypedMessageHandler::from(
834 move |instrument: &InstrumentAny| {
835 if let Some(actor) = try_get_actor_unchecked::<Self>(&actor_id) {
836 actor.handle_instrument(instrument);
837 } else {
838 log::error!("Actor {actor_id} not found for instrument handling");
839 }
840 },
841 )));
842
843 DataActorCore::subscribe_instrument(self, topic, handler, instrument_id, client_id, params);
844 }
845
846 fn subscribe_book_deltas(
848 &mut self,
849 instrument_id: InstrumentId,
850 book_type: BookType,
851 depth: Option<NonZeroUsize>,
852 client_id: Option<ClientId>,
853 managed: bool,
854 params: Option<IndexMap<String, String>>,
855 ) where
856 Self: 'static + Debug + Sized,
857 {
858 let actor_id = self.actor_id().inner();
859 let topic = get_book_deltas_topic(instrument_id);
860
861 let handler = ShareableMessageHandler(Rc::new(TypedMessageHandler::from(
862 move |deltas: &OrderBookDeltas| {
863 get_actor_unchecked::<Self>(&actor_id).handle_book_deltas(deltas);
864 },
865 )));
866
867 DataActorCore::subscribe_book_deltas(
868 self,
869 topic,
870 handler,
871 instrument_id,
872 book_type,
873 depth,
874 client_id,
875 managed,
876 params,
877 );
878 }
879
880 fn subscribe_book_at_interval(
882 &mut self,
883 instrument_id: InstrumentId,
884 book_type: BookType,
885 depth: Option<NonZeroUsize>,
886 interval_ms: NonZeroUsize,
887 client_id: Option<ClientId>,
888 params: Option<IndexMap<String, String>>,
889 ) where
890 Self: 'static + Debug + Sized,
891 {
892 let actor_id = self.actor_id().inner();
893 let topic = get_book_snapshots_topic(instrument_id);
894
895 let handler = ShareableMessageHandler(Rc::new(TypedMessageHandler::from(
896 move |book: &OrderBook| {
897 get_actor_unchecked::<Self>(&actor_id).handle_book(book);
898 },
899 )));
900
901 DataActorCore::subscribe_book_at_interval(
902 self,
903 topic,
904 handler,
905 instrument_id,
906 book_type,
907 depth,
908 interval_ms,
909 client_id,
910 params,
911 );
912 }
913
914 fn subscribe_trades(
916 &mut self,
917 instrument_id: InstrumentId,
918 client_id: Option<ClientId>,
919 params: Option<IndexMap<String, String>>,
920 ) where
921 Self: 'static + Debug + Sized,
922 {
923 let actor_id = self.actor_id().inner();
924 let topic = get_trades_topic(instrument_id);
925
926 let handler = ShareableMessageHandler(Rc::new(TypedMessageHandler::from(
927 move |trade: &TradeTick| {
928 get_actor_unchecked::<Self>(&actor_id).handle_trade(trade);
929 },
930 )));
931
932 DataActorCore::subscribe_trades(self, topic, handler, instrument_id, client_id, params);
933 }
934
935 fn subscribe_bars(
937 &mut self,
938 bar_type: BarType,
939 client_id: Option<ClientId>,
940 await_partial: bool,
941 params: Option<IndexMap<String, String>>,
942 ) where
943 Self: 'static + Debug + Sized,
944 {
945 let actor_id = self.actor_id().inner();
946 let topic = get_bars_topic(bar_type);
947
948 let handler =
949 ShareableMessageHandler(Rc::new(TypedMessageHandler::from(move |bar: &Bar| {
950 get_actor_unchecked::<Self>(&actor_id).handle_bar(bar);
951 })));
952
953 DataActorCore::subscribe_bars(
954 self,
955 topic,
956 handler,
957 bar_type,
958 client_id,
959 await_partial,
960 params,
961 );
962 }
963
964 fn subscribe_mark_prices(
966 &mut self,
967 instrument_id: InstrumentId,
968 client_id: Option<ClientId>,
969 params: Option<IndexMap<String, String>>,
970 ) where
971 Self: 'static + Debug + Sized,
972 {
973 let actor_id = self.actor_id().inner();
974 let topic = get_mark_price_topic(instrument_id);
975
976 let handler = ShareableMessageHandler(Rc::new(TypedMessageHandler::from(
977 move |mark_price: &MarkPriceUpdate| {
978 get_actor_unchecked::<Self>(&actor_id).handle_mark_price(mark_price);
979 },
980 )));
981
982 DataActorCore::subscribe_mark_prices(
983 self,
984 topic,
985 handler,
986 instrument_id,
987 client_id,
988 params,
989 );
990 }
991
992 fn subscribe_index_prices(
994 &mut self,
995 instrument_id: InstrumentId,
996 client_id: Option<ClientId>,
997 params: Option<IndexMap<String, String>>,
998 ) where
999 Self: 'static + Debug + Sized,
1000 {
1001 let actor_id = self.actor_id().inner();
1002 let topic = get_index_price_topic(instrument_id);
1003
1004 let handler = ShareableMessageHandler(Rc::new(TypedMessageHandler::from(
1005 move |index_price: &IndexPriceUpdate| {
1006 get_actor_unchecked::<Self>(&actor_id).handle_index_price(index_price);
1007 },
1008 )));
1009
1010 DataActorCore::subscribe_index_prices(
1011 self,
1012 topic,
1013 handler,
1014 instrument_id,
1015 client_id,
1016 params,
1017 );
1018 }
1019
1020 fn subscribe_instrument_status(
1022 &mut self,
1023 instrument_id: InstrumentId,
1024 client_id: Option<ClientId>,
1025 params: Option<IndexMap<String, String>>,
1026 ) where
1027 Self: 'static + Debug + Sized,
1028 {
1029 let actor_id = self.actor_id().inner();
1030 let topic = get_instrument_status_topic(instrument_id);
1031
1032 let handler = ShareableMessageHandler(Rc::new(TypedMessageHandler::from(
1033 move |status: &InstrumentStatus| {
1034 get_actor_unchecked::<Self>(&actor_id).handle_instrument_status(status);
1035 },
1036 )));
1037
1038 DataActorCore::subscribe_instrument_status(
1039 self,
1040 topic,
1041 handler,
1042 instrument_id,
1043 client_id,
1044 params,
1045 );
1046 }
1047
1048 fn subscribe_instrument_close(
1050 &mut self,
1051 instrument_id: InstrumentId,
1052 client_id: Option<ClientId>,
1053 params: Option<IndexMap<String, String>>,
1054 ) where
1055 Self: 'static + Debug + Sized,
1056 {
1057 let actor_id = self.actor_id().inner();
1058 let topic = get_instrument_close_topic(instrument_id);
1059
1060 let handler = ShareableMessageHandler(Rc::new(TypedMessageHandler::from(
1061 move |close: &InstrumentClose| {
1062 get_actor_unchecked::<Self>(&actor_id).handle_instrument_close(close);
1063 },
1064 )));
1065
1066 DataActorCore::subscribe_instrument_close(
1067 self,
1068 topic,
1069 handler,
1070 instrument_id,
1071 client_id,
1072 params,
1073 );
1074 }
1075
/// Subscribes this actor to blocks for `chain` (only with the `defi` feature).
///
/// Builds a message handler that resolves this actor by ID through the actor registry and
/// forwards each received block to `handle_block`, then delegates to
/// `DataActorCore::subscribe_blocks` together with the blocks topic. `client_id` and
/// `params` are forwarded unchanged.
///
/// If the actor cannot be resolved when a message arrives, the handler logs an error and
/// drops the message, consistent with the `subscribe_quotes` handler.
#[cfg(feature = "defi")]
fn subscribe_blocks(
    &mut self,
    chain: Blockchain,
    client_id: Option<ClientId>,
    params: Option<IndexMap<String, String>>,
) where
    Self: 'static + Debug + Sized,
{
    let actor_id = self.actor_id().inner();
    let topic = get_defi_blocks_topic(chain);

    let handler =
        ShareableMessageHandler(Rc::new(TypedMessageHandler::from(move |block: &Block| {
            // Fallible lookup: a missing registry entry is reported, not fatal.
            if let Some(actor) = try_get_actor_unchecked::<Self>(&actor_id) {
                actor.handle_block(block);
            } else {
                log::error!("Actor {actor_id} not found for block handling");
            }
        })));

    DataActorCore::subscribe_blocks(self, topic, handler, chain, client_id, params);
}
1096
/// Subscribes this actor to pool updates for `address` (only with the `defi` feature).
///
/// Builds a message handler that resolves this actor by ID through the actor registry and
/// forwards each received pool to `handle_pool`, then delegates to
/// `DataActorCore::subscribe_pool` together with the pool topic. `client_id` and `params`
/// are forwarded unchanged.
///
/// If the actor cannot be resolved when a message arrives, the handler logs an error and
/// drops the message, consistent with the `subscribe_quotes` handler.
#[cfg(feature = "defi")]
fn subscribe_pool(
    &mut self,
    address: Address,
    client_id: Option<ClientId>,
    params: Option<IndexMap<String, String>>,
) where
    Self: 'static + Debug + Sized,
{
    let actor_id = self.actor_id().inner();
    let topic = get_defi_pool_topic(address);

    let handler =
        ShareableMessageHandler(Rc::new(TypedMessageHandler::from(move |pool: &Pool| {
            // Fallible lookup: a missing registry entry is reported, not fatal.
            if let Some(actor) = try_get_actor_unchecked::<Self>(&actor_id) {
                actor.handle_pool(pool);
            } else {
                log::error!("Actor {actor_id} not found for pool handling");
            }
        })));

    DataActorCore::subscribe_pool(self, topic, handler, address, client_id, params);
}
1117
/// Subscribes this actor to pool swaps for `address` (only with the `defi` feature).
///
/// Builds a message handler that resolves this actor by ID through the actor registry and
/// forwards each received swap to `handle_pool_swap`, then delegates to
/// `DataActorCore::subscribe_pool_swaps` together with the pool-swaps topic. `client_id`
/// and `params` are forwarded unchanged.
///
/// If the actor cannot be resolved when a message arrives, the handler logs an error and
/// drops the message, consistent with the `subscribe_quotes` handler.
#[cfg(feature = "defi")]
fn subscribe_pool_swaps(
    &mut self,
    address: Address,
    client_id: Option<ClientId>,
    params: Option<IndexMap<String, String>>,
) where
    Self: 'static + Debug + Sized,
{
    let actor_id = self.actor_id().inner();
    let topic = get_defi_pool_swaps_topic(address);

    let handler = ShareableMessageHandler(Rc::new(TypedMessageHandler::from(
        move |swap: &PoolSwap| {
            // Fallible lookup: a missing registry entry is reported, not fatal.
            if let Some(actor) = try_get_actor_unchecked::<Self>(&actor_id) {
                actor.handle_pool_swap(swap);
            } else {
                log::error!("Actor {actor_id} not found for pool swap handling");
            }
        },
    )));

    DataActorCore::subscribe_pool_swaps(self, topic, handler, address, client_id, params);
}
1139
/// Subscribes this actor to pool liquidity updates for `address` (only with the `defi`
/// feature).
///
/// Builds a message handler that resolves this actor by ID through the actor registry and
/// forwards each received update to `handle_pool_liquidity_update`, then delegates to
/// `DataActorCore::subscribe_pool_liquidity_updates` together with the liquidity topic.
/// `client_id` and `params` are forwarded unchanged.
///
/// If the actor cannot be resolved when a message arrives, the handler logs an error and
/// drops the message, consistent with the `subscribe_quotes` handler.
#[cfg(feature = "defi")]
fn subscribe_pool_liquidity_updates(
    &mut self,
    address: Address,
    client_id: Option<ClientId>,
    params: Option<IndexMap<String, String>>,
) where
    Self: 'static + Debug + Sized,
{
    let actor_id = self.actor_id().inner();
    let topic = get_defi_liquidity_topic(address);

    let handler = ShareableMessageHandler(Rc::new(TypedMessageHandler::from(
        move |update: &PoolLiquidityUpdate| {
            // Fallible lookup: a missing registry entry is reported, not fatal.
            if let Some(actor) = try_get_actor_unchecked::<Self>(&actor_id) {
                actor.handle_pool_liquidity_update(update);
            } else {
                log::error!("Actor {actor_id} not found for pool liquidity update handling");
            }
        },
    )));

    DataActorCore::subscribe_pool_liquidity_updates(
        self, topic, handler, address, client_id, params,
    );
}
1163
1164 fn unsubscribe_data(
1166 &mut self,
1167 data_type: DataType,
1168 client_id: Option<ClientId>,
1169 params: Option<IndexMap<String, String>>,
1170 ) where
1171 Self: 'static + Debug + Sized,
1172 {
1173 DataActorCore::unsubscribe_data(self, data_type, client_id, params);
1174 }
1175
1176 fn unsubscribe_instruments(
1178 &mut self,
1179 venue: Venue,
1180 client_id: Option<ClientId>,
1181 params: Option<IndexMap<String, String>>,
1182 ) where
1183 Self: 'static + Debug + Sized,
1184 {
1185 DataActorCore::unsubscribe_instruments(self, venue, client_id, params);
1186 }
1187
1188 fn unsubscribe_instrument(
1190 &mut self,
1191 instrument_id: InstrumentId,
1192 client_id: Option<ClientId>,
1193 params: Option<IndexMap<String, String>>,
1194 ) where
1195 Self: 'static + Debug + Sized,
1196 {
1197 DataActorCore::unsubscribe_instrument(self, instrument_id, client_id, params);
1198 }
1199
1200 fn unsubscribe_book_deltas(
1202 &mut self,
1203 instrument_id: InstrumentId,
1204 client_id: Option<ClientId>,
1205 params: Option<IndexMap<String, String>>,
1206 ) where
1207 Self: 'static + Debug + Sized,
1208 {
1209 DataActorCore::unsubscribe_book_deltas(self, instrument_id, client_id, params);
1210 }
1211
1212 fn unsubscribe_book_at_interval(
1214 &mut self,
1215 instrument_id: InstrumentId,
1216 interval_ms: NonZeroUsize,
1217 client_id: Option<ClientId>,
1218 params: Option<IndexMap<String, String>>,
1219 ) where
1220 Self: 'static + Debug + Sized,
1221 {
1222 DataActorCore::unsubscribe_book_at_interval(
1223 self,
1224 instrument_id,
1225 interval_ms,
1226 client_id,
1227 params,
1228 );
1229 }
1230
1231 fn unsubscribe_quotes(
1233 &mut self,
1234 instrument_id: InstrumentId,
1235 client_id: Option<ClientId>,
1236 params: Option<IndexMap<String, String>>,
1237 ) where
1238 Self: 'static + Debug + Sized,
1239 {
1240 DataActorCore::unsubscribe_quotes(self, instrument_id, client_id, params);
1241 }
1242
1243 fn unsubscribe_trades(
1245 &mut self,
1246 instrument_id: InstrumentId,
1247 client_id: Option<ClientId>,
1248 params: Option<IndexMap<String, String>>,
1249 ) where
1250 Self: 'static + Debug + Sized,
1251 {
1252 DataActorCore::unsubscribe_trades(self, instrument_id, client_id, params);
1253 }
1254
1255 fn unsubscribe_bars(
1257 &mut self,
1258 bar_type: BarType,
1259 client_id: Option<ClientId>,
1260 params: Option<IndexMap<String, String>>,
1261 ) where
1262 Self: 'static + Debug + Sized,
1263 {
1264 DataActorCore::unsubscribe_bars(self, bar_type, client_id, params);
1265 }
1266
1267 fn unsubscribe_mark_prices(
1269 &mut self,
1270 instrument_id: InstrumentId,
1271 client_id: Option<ClientId>,
1272 params: Option<IndexMap<String, String>>,
1273 ) where
1274 Self: 'static + Debug + Sized,
1275 {
1276 DataActorCore::unsubscribe_mark_prices(self, instrument_id, client_id, params);
1277 }
1278
1279 fn unsubscribe_index_prices(
1281 &mut self,
1282 instrument_id: InstrumentId,
1283 client_id: Option<ClientId>,
1284 params: Option<IndexMap<String, String>>,
1285 ) where
1286 Self: 'static + Debug + Sized,
1287 {
1288 DataActorCore::unsubscribe_index_prices(self, instrument_id, client_id, params);
1289 }
1290
1291 fn unsubscribe_instrument_status(
1293 &mut self,
1294 instrument_id: InstrumentId,
1295 client_id: Option<ClientId>,
1296 params: Option<IndexMap<String, String>>,
1297 ) where
1298 Self: 'static + Debug + Sized,
1299 {
1300 DataActorCore::unsubscribe_instrument_status(self, instrument_id, client_id, params);
1301 }
1302
1303 fn unsubscribe_instrument_close(
1305 &mut self,
1306 instrument_id: InstrumentId,
1307 client_id: Option<ClientId>,
1308 params: Option<IndexMap<String, String>>,
1309 ) where
1310 Self: 'static + Debug + Sized,
1311 {
1312 DataActorCore::unsubscribe_instrument_close(self, instrument_id, client_id, params);
1313 }
1314
/// Unsubscribes this actor from blocks for `chain` (only with the `defi` feature).
///
/// Thin wrapper: dereferences to the underlying `DataActorCore` and forwards all arguments
/// unchanged to `DataActorCore::unsubscribe_blocks`.
#[cfg(feature = "defi")]
fn unsubscribe_blocks(
    &mut self,
    chain: Blockchain,
    client_id: Option<ClientId>,
    params: Option<IndexMap<String, String>>,
) where
    Self: 'static + Debug + Sized,
{
    // Explicit deref coercion to the core the trait requires via `DerefMut`.
    let actor_core: &mut DataActorCore = self;
    DataActorCore::unsubscribe_blocks(actor_core, chain, client_id, params);
}
1327
/// Unsubscribes this actor from pool updates for `address` (only with the `defi` feature).
///
/// Thin wrapper: dereferences to the underlying `DataActorCore` and forwards all arguments
/// unchanged to `DataActorCore::unsubscribe_pool`.
#[cfg(feature = "defi")]
fn unsubscribe_pool(
    &mut self,
    address: Address,
    client_id: Option<ClientId>,
    params: Option<IndexMap<String, String>>,
) where
    Self: 'static + Debug + Sized,
{
    // Explicit deref coercion to the core the trait requires via `DerefMut`.
    let actor_core: &mut DataActorCore = self;
    DataActorCore::unsubscribe_pool(actor_core, address, client_id, params);
}
1340
/// Unsubscribes this actor from pool swaps for `address` (only with the `defi` feature).
///
/// Thin wrapper: dereferences to the underlying `DataActorCore` and forwards all arguments
/// unchanged to `DataActorCore::unsubscribe_pool_swaps`.
#[cfg(feature = "defi")]
fn unsubscribe_pool_swaps(
    &mut self,
    address: Address,
    client_id: Option<ClientId>,
    params: Option<IndexMap<String, String>>,
) where
    Self: 'static + Debug + Sized,
{
    // Explicit deref coercion to the core the trait requires via `DerefMut`.
    let actor_core: &mut DataActorCore = self;
    DataActorCore::unsubscribe_pool_swaps(actor_core, address, client_id, params);
}
1353
/// Unsubscribes this actor from pool liquidity updates for `address` (only with the `defi`
/// feature).
///
/// Thin wrapper: dereferences to the underlying `DataActorCore` and forwards all arguments
/// unchanged to `DataActorCore::unsubscribe_pool_liquidity_updates`.
#[cfg(feature = "defi")]
fn unsubscribe_pool_liquidity_updates(
    &mut self,
    address: Address,
    client_id: Option<ClientId>,
    params: Option<IndexMap<String, String>>,
) where
    Self: 'static + Debug + Sized,
{
    // Explicit deref coercion to the core the trait requires via `DerefMut`.
    let actor_core: &mut DataActorCore = self;
    DataActorCore::unsubscribe_pool_liquidity_updates(actor_core, address, client_id, params);
}
1366
1367 fn request_data(
1373 &mut self,
1374 data_type: DataType,
1375 client_id: ClientId,
1376 start: Option<DateTime<Utc>>,
1377 end: Option<DateTime<Utc>>,
1378 limit: Option<NonZeroUsize>,
1379 params: Option<IndexMap<String, String>>,
1380 ) -> anyhow::Result<UUID4>
1381 where
1382 Self: 'static + Debug + Sized,
1383 {
1384 let actor_id = self.actor_id().inner();
1385 let handler = ShareableMessageHandler(Rc::new(TypedMessageHandler::from(
1386 move |resp: &CustomDataResponse| {
1387 get_actor_unchecked::<Self>(&actor_id).handle_data_response(resp);
1388 },
1389 )));
1390
1391 DataActorCore::request_data(
1392 self, data_type, client_id, start, end, limit, params, handler,
1393 )
1394 }
1395
1396 fn request_instrument(
1402 &mut self,
1403 instrument_id: InstrumentId,
1404 start: Option<DateTime<Utc>>,
1405 end: Option<DateTime<Utc>>,
1406 client_id: Option<ClientId>,
1407 params: Option<IndexMap<String, String>>,
1408 ) -> anyhow::Result<UUID4>
1409 where
1410 Self: 'static + Debug + Sized,
1411 {
1412 let actor_id = self.actor_id().inner();
1413 let handler = ShareableMessageHandler(Rc::new(TypedMessageHandler::from(
1414 move |resp: &InstrumentResponse| {
1415 get_actor_unchecked::<Self>(&actor_id).handle_instrument_response(resp);
1416 },
1417 )));
1418
1419 DataActorCore::request_instrument(
1420 self,
1421 instrument_id,
1422 start,
1423 end,
1424 client_id,
1425 params,
1426 handler,
1427 )
1428 }
1429
1430 fn request_instruments(
1436 &mut self,
1437 venue: Option<Venue>,
1438 start: Option<DateTime<Utc>>,
1439 end: Option<DateTime<Utc>>,
1440 client_id: Option<ClientId>,
1441 params: Option<IndexMap<String, String>>,
1442 ) -> anyhow::Result<UUID4>
1443 where
1444 Self: 'static + Debug + Sized,
1445 {
1446 let actor_id = self.actor_id().inner();
1447 let handler = ShareableMessageHandler(Rc::new(TypedMessageHandler::from(
1448 move |resp: &InstrumentsResponse| {
1449 get_actor_unchecked::<Self>(&actor_id).handle_instruments_response(resp);
1450 },
1451 )));
1452
1453 DataActorCore::request_instruments(self, venue, start, end, client_id, params, handler)
1454 }
1455
1456 fn request_book_snapshot(
1462 &mut self,
1463 instrument_id: InstrumentId,
1464 depth: Option<NonZeroUsize>,
1465 client_id: Option<ClientId>,
1466 params: Option<IndexMap<String, String>>,
1467 ) -> anyhow::Result<UUID4>
1468 where
1469 Self: 'static + Debug + Sized,
1470 {
1471 let actor_id = self.actor_id().inner();
1472 let handler = ShareableMessageHandler(Rc::new(TypedMessageHandler::from(
1473 move |resp: &BookResponse| {
1474 get_actor_unchecked::<Self>(&actor_id).handle_book_response(resp);
1475 },
1476 )));
1477
1478 DataActorCore::request_book_snapshot(self, instrument_id, depth, client_id, params, handler)
1479 }
1480
1481 fn request_quotes(
1487 &mut self,
1488 instrument_id: InstrumentId,
1489 start: Option<DateTime<Utc>>,
1490 end: Option<DateTime<Utc>>,
1491 limit: Option<NonZeroUsize>,
1492 client_id: Option<ClientId>,
1493 params: Option<IndexMap<String, String>>,
1494 ) -> anyhow::Result<UUID4>
1495 where
1496 Self: 'static + Debug + Sized,
1497 {
1498 let actor_id = self.actor_id().inner();
1499 let handler = ShareableMessageHandler(Rc::new(TypedMessageHandler::from(
1500 move |resp: &QuotesResponse| {
1501 get_actor_unchecked::<Self>(&actor_id).handle_quotes_response(resp);
1502 },
1503 )));
1504
1505 DataActorCore::request_quotes(
1506 self,
1507 instrument_id,
1508 start,
1509 end,
1510 limit,
1511 client_id,
1512 params,
1513 handler,
1514 )
1515 }
1516
1517 fn request_trades(
1523 &mut self,
1524 instrument_id: InstrumentId,
1525 start: Option<DateTime<Utc>>,
1526 end: Option<DateTime<Utc>>,
1527 limit: Option<NonZeroUsize>,
1528 client_id: Option<ClientId>,
1529 params: Option<IndexMap<String, String>>,
1530 ) -> anyhow::Result<UUID4>
1531 where
1532 Self: 'static + Debug + Sized,
1533 {
1534 let actor_id = self.actor_id().inner();
1535 let handler = ShareableMessageHandler(Rc::new(TypedMessageHandler::from(
1536 move |resp: &TradesResponse| {
1537 get_actor_unchecked::<Self>(&actor_id).handle_trades_response(resp);
1538 },
1539 )));
1540
1541 DataActorCore::request_trades(
1542 self,
1543 instrument_id,
1544 start,
1545 end,
1546 limit,
1547 client_id,
1548 params,
1549 handler,
1550 )
1551 }
1552
1553 fn request_bars(
1559 &mut self,
1560 bar_type: BarType,
1561 start: Option<DateTime<Utc>>,
1562 end: Option<DateTime<Utc>>,
1563 limit: Option<NonZeroUsize>,
1564 client_id: Option<ClientId>,
1565 params: Option<IndexMap<String, String>>,
1566 ) -> anyhow::Result<UUID4>
1567 where
1568 Self: 'static + Debug + Sized,
1569 {
1570 let actor_id = self.actor_id().inner();
1571 let handler = ShareableMessageHandler(Rc::new(TypedMessageHandler::from(
1572 move |resp: &BarsResponse| {
1573 get_actor_unchecked::<Self>(&actor_id).handle_bars_response(resp);
1574 },
1575 )));
1576
1577 DataActorCore::request_bars(
1578 self, bar_type, start, end, limit, client_id, params, handler,
1579 )
1580 }
1581}
1582
1583impl<T> Actor for T
1585where
1586 T: DataActor + Debug + 'static,
1587{
1588 fn id(&self) -> Ustr {
1589 self.actor_id.inner()
1590 }
1591
1592 fn handle(&mut self, msg: &dyn Any) {
1593 }
1595
1596 fn as_any(&self) -> &dyn Any {
1597 self
1598 }
1599}
1600
1601impl<T> Component for T
1603where
1604 T: DataActor + Debug + 'static,
1605{
1606 fn component_id(&self) -> ComponentId {
1607 ComponentId::new(self.actor_id.inner().as_str())
1608 }
1609
1610 fn state(&self) -> ComponentState {
1611 self.state
1612 }
1613
1614 fn transition_state(&mut self, trigger: ComponentTrigger) -> anyhow::Result<()> {
1615 self.state = self.state.transition(&trigger)?;
1616 log::info!("{}", self.state);
1617 Ok(())
1618 }
1619
1620 fn register(
1621 &mut self,
1622 trader_id: TraderId,
1623 clock: Rc<RefCell<dyn Clock>>,
1624 cache: Rc<RefCell<Cache>>,
1625 ) -> anyhow::Result<()> {
1626 DataActorCore::register(self, trader_id, clock.clone(), cache)?;
1627
1628 let actor_id = self.actor_id().inner();
1630 let callback = TimeEventCallback::Rust(Rc::new(move |event: TimeEvent| {
1631 if let Some(actor) = try_get_actor_unchecked::<Self>(&actor_id) {
1632 actor.handle_time_event(&event);
1633 } else {
1634 log::error!("Actor {actor_id} not found for time event handling");
1635 }
1636 }));
1637
1638 clock.borrow_mut().register_default_handler(callback);
1639
1640 self.initialize()
1641 }
1642
1643 fn on_start(&mut self) -> anyhow::Result<()> {
1644 DataActor::on_start(self)
1645 }
1646
1647 fn on_stop(&mut self) -> anyhow::Result<()> {
1648 DataActor::on_stop(self)
1649 }
1650
1651 fn on_resume(&mut self) -> anyhow::Result<()> {
1652 DataActor::on_resume(self)
1653 }
1654
1655 fn on_degrade(&mut self) -> anyhow::Result<()> {
1656 DataActor::on_degrade(self)
1657 }
1658
1659 fn on_fault(&mut self) -> anyhow::Result<()> {
1660 DataActor::on_fault(self)
1661 }
1662
1663 fn on_reset(&mut self) -> anyhow::Result<()> {
1664 DataActor::on_reset(self)
1665 }
1666
1667 fn on_dispose(&mut self) -> anyhow::Result<()> {
1668 DataActor::on_dispose(self)
1669 }
1670}
1671
1672pub struct DataActorCore {
1674 pub actor_id: ActorId,
1676 pub config: DataActorConfig,
1678 trader_id: Option<TraderId>,
1679 clock: Option<Rc<RefCell<dyn Clock>>>, cache: Option<Rc<RefCell<Cache>>>, state: ComponentState,
1682 warning_events: AHashSet<String>, pending_requests: AHashMap<UUID4, Option<RequestCallback>>,
1684 signal_classes: AHashMap<String, String>,
1685 #[cfg(feature = "indicators")]
1686 indicators: Indicators,
1687 topic_handlers: AHashMap<MStr<Topic>, ShareableMessageHandler>,
1688}
1689
1690impl Debug for DataActorCore {
1691 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1692 f.debug_struct(stringify!(DataActorCore))
1693 .field("actor_id", &self.actor_id)
1694 .field("config", &self.config)
1695 .field("state", &self.state)
1696 .field("trader_id", &self.trader_id)
1697 .finish()
1698 }
1699}
1700
1701impl DataActorCore {
1702 pub fn new(config: DataActorConfig) -> Self {
1704 let actor_id = config
1705 .actor_id
1706 .unwrap_or_else(|| Self::default_actor_id(&config));
1707
1708 Self {
1709 actor_id,
1710 config,
1711 trader_id: None, clock: None, cache: None, state: ComponentState::default(),
1715 warning_events: AHashSet::new(),
1716 pending_requests: AHashMap::new(),
1717 signal_classes: AHashMap::new(),
1718 #[cfg(feature = "indicators")]
1719 indicators: Indicators::default(),
1720 topic_handlers: AHashMap::new(),
1721 }
1722 }
1723
/// Returns the trader ID this actor is registered with, or `None` if `register`
/// has not been called successfully yet.
pub fn trader_id(&self) -> Option<TraderId> {
    self.trader_id
}
1728
/// Returns the actor ID.
pub fn actor_id(&self) -> ActorId {
    self.actor_id
}
1732
1733 fn default_actor_id(config: &DataActorConfig) -> ActorId {
1734 let memory_address = std::ptr::from_ref(config) as *const _ as usize;
1735 ActorId::from(format!("{}-{memory_address}", stringify!(DataActor)))
1736 }
1737
1738 pub fn timestamp_ns(&self) -> UnixNanos {
1739 self.clock_ref().timestamp_ns()
1740 }
1741
1742 pub fn clock(&mut self) -> RefMut<'_, dyn Clock> {
1748 self.clock
1749 .as_ref()
1750 .unwrap_or_else(|| {
1751 panic!(
1752 "DataActor {} must be registered before calling `clock()` - trader_id: {:?}",
1753 self.actor_id, self.trader_id
1754 )
1755 })
1756 .borrow_mut()
1757 }
1758
1759 fn clock_ref(&self) -> Ref<'_, dyn Clock> {
1760 self.clock
1761 .as_ref()
1762 .unwrap_or_else(|| {
1763 panic!(
1764 "DataActor {} must be registered before calling `clock_ref()` - trader_id: {:?}",
1765 self.actor_id, self.trader_id
1766 )
1767 })
1768 .borrow()
1769 }
1770
1771 pub fn register(
1780 &mut self,
1781 trader_id: TraderId,
1782 clock: Rc<RefCell<dyn Clock>>,
1783 cache: Rc<RefCell<Cache>>,
1784 ) -> anyhow::Result<()> {
1785 if let Some(existing_trader_id) = self.trader_id {
1786 anyhow::bail!(
1787 "DataActor {} already registered with trader {existing_trader_id}",
1788 self.actor_id
1789 );
1790 }
1791
1792 {
1794 let _timestamp = clock.borrow().timestamp_ns();
1795 }
1796
1797 {
1799 let _cache_borrow = cache.borrow();
1800 }
1801
1802 self.trader_id = Some(trader_id);
1803 self.clock = Some(clock);
1804 self.cache = Some(cache);
1805
1806 if !self.is_properly_registered() {
1808 anyhow::bail!(
1809 "DataActor {} registration incomplete - validation failed",
1810 self.actor_id
1811 );
1812 }
1813
1814 log::info!("Registered {} with trader {trader_id}", self.actor_id);
1815 Ok(())
1816 }
1817
1818 pub fn register_warning_event(&mut self, event_type: &str) {
1820 self.warning_events.insert(event_type.to_string());
1821 log::debug!("Registered event type '{event_type}' for warning logs")
1822 }
1823
1824 pub fn deregister_warning_event(&mut self, event_type: &str) {
1826 self.warning_events.remove(event_type);
1827 log::debug!("Deregistered event type '{event_type}' from warning logs")
1828 }
1829
1830 fn check_registered(&self) {
1831 assert!(
1832 self.trader_id.is_some(),
1833 "Actor has not been registered with a Trader"
1834 );
1835 }
1836
1837 fn is_properly_registered(&self) -> bool {
1839 self.trader_id.is_some() && self.clock.is_some() && self.cache.is_some()
1840 }
1841
1842 fn send_data_cmd(&self, command: DataCommand) {
1843 if self.config.log_commands {
1844 log::info!("{CMD}{SEND} {command:?}");
1845 }
1846
1847 let endpoint = MessagingSwitchboard::data_engine_queue_execute();
1848 msgbus::send_any(endpoint, command.as_any())
1849 }
1850
1851 fn send_data_req(&self, request: RequestCommand) {
1852 if self.config.log_commands {
1853 log::info!("{REQ}{SEND} {request:?}");
1854 }
1855
1856 let endpoint = MessagingSwitchboard::data_engine_queue_execute();
1859 msgbus::send_any(endpoint, request.as_any())
1860 }
1861
1862 pub fn shutdown_system(&self, reason: Option<String>) {
1868 self.check_registered();
1869
1870 let command = ShutdownSystem::new(
1872 self.trader_id().unwrap(),
1873 self.actor_id.inner(),
1874 reason,
1875 UUID4::new(),
1876 self.timestamp_ns(),
1877 );
1878
1879 let endpoint = "command.system.shutdown".into();
1880 msgbus::send_any(endpoint, command.as_any());
1881 }
1882
1883 fn get_or_create_handler_for_topic<F>(
1886 &mut self,
1887 topic: MStr<Topic>,
1888 create_handler: F,
1889 ) -> ShareableMessageHandler
1890 where
1891 F: FnOnce() -> ShareableMessageHandler,
1892 {
1893 if let Some(existing_handler) = self.topic_handlers.get(&topic) {
1894 existing_handler.clone()
1895 } else {
1896 let new_handler = create_handler();
1897 self.topic_handlers.insert(topic, new_handler.clone());
1898 new_handler
1899 }
1900 }
1901
1902 fn get_handler_for_topic(&self, topic: MStr<Topic>) -> Option<ShareableMessageHandler> {
1903 self.topic_handlers.get(&topic).cloned()
1904 }
1905
1906 pub fn subscribe_data(
1912 &mut self,
1913 handler: ShareableMessageHandler,
1914 data_type: DataType,
1915 client_id: Option<ClientId>,
1916 params: Option<IndexMap<String, String>>,
1917 ) {
1918 if !self.is_properly_registered() {
1919 panic!(
1920 "DataActor {} is not properly registered - trader_id: {:?}, clock: {}, cache: {}",
1921 self.actor_id,
1922 self.trader_id,
1923 self.clock.is_some(),
1924 self.cache.is_some()
1925 );
1926 }
1927
1928 let topic = get_custom_topic(&data_type);
1929 self.topic_handlers.insert(topic, handler.clone());
1930 msgbus::subscribe_topic(topic, handler, None);
1931
1932 if client_id.is_none() {
1934 return;
1935 }
1936
1937 let command = SubscribeCommand::Data(SubscribeCustomData {
1938 data_type,
1939 client_id,
1940 venue: None,
1941 command_id: UUID4::new(),
1942 ts_init: self.timestamp_ns(),
1943 params,
1944 });
1945
1946 self.send_data_cmd(DataCommand::Subscribe(command));
1947 }
1948
1949 pub fn subscribe_quotes(
1951 &mut self,
1952 topic: MStr<Topic>,
1953 handler: ShareableMessageHandler,
1954 instrument_id: InstrumentId,
1955 client_id: Option<ClientId>,
1956 params: Option<IndexMap<String, String>>,
1957 ) {
1958 self.check_registered();
1959
1960 self.topic_handlers.insert(topic, handler.clone());
1961 msgbus::subscribe_topic(topic, handler, None);
1962
1963 let command = SubscribeCommand::Quotes(SubscribeQuotes {
1964 instrument_id,
1965 client_id,
1966 venue: Some(instrument_id.venue),
1967 command_id: UUID4::new(),
1968 ts_init: self.timestamp_ns(),
1969 params,
1970 });
1971
1972 self.send_data_cmd(DataCommand::Subscribe(command));
1973 }
1974
1975 pub fn subscribe_instruments(
1977 &mut self,
1978 topic: MStr<Topic>,
1979 handler: ShareableMessageHandler,
1980 venue: Venue,
1981 client_id: Option<ClientId>,
1982 params: Option<IndexMap<String, String>>,
1983 ) {
1984 self.check_registered();
1985
1986 self.topic_handlers.insert(topic, handler.clone());
1987 msgbus::subscribe_topic(topic, handler, None);
1988
1989 let command = SubscribeCommand::Instruments(SubscribeInstruments {
1990 client_id,
1991 venue,
1992 command_id: UUID4::new(),
1993 ts_init: self.timestamp_ns(),
1994 params,
1995 });
1996
1997 self.send_data_cmd(DataCommand::Subscribe(command));
1998 }
1999
2000 pub fn subscribe_instrument(
2002 &mut self,
2003 topic: MStr<Topic>,
2004 handler: ShareableMessageHandler,
2005 instrument_id: InstrumentId,
2006 client_id: Option<ClientId>,
2007 params: Option<IndexMap<String, String>>,
2008 ) {
2009 self.check_registered();
2010
2011 self.topic_handlers.insert(topic, handler.clone());
2012 msgbus::subscribe_topic(topic, handler, None);
2013
2014 let command = SubscribeCommand::Instrument(SubscribeInstrument {
2015 instrument_id,
2016 client_id,
2017 venue: Some(instrument_id.venue),
2018 command_id: UUID4::new(),
2019 ts_init: self.timestamp_ns(),
2020 params,
2021 });
2022
2023 self.send_data_cmd(DataCommand::Subscribe(command));
2024 }
2025
2026 #[allow(clippy::too_many_arguments)]
2028 pub fn subscribe_book_deltas(
2029 &mut self,
2030 topic: MStr<Topic>,
2031 handler: ShareableMessageHandler,
2032 instrument_id: InstrumentId,
2033 book_type: BookType,
2034 depth: Option<NonZeroUsize>,
2035 client_id: Option<ClientId>,
2036 managed: bool,
2037 params: Option<IndexMap<String, String>>,
2038 ) {
2039 self.check_registered();
2040
2041 self.topic_handlers.insert(topic, handler.clone());
2042 msgbus::subscribe_topic(topic, handler, None);
2043
2044 let command = SubscribeCommand::BookDeltas(SubscribeBookDeltas {
2045 instrument_id,
2046 book_type,
2047 client_id,
2048 venue: Some(instrument_id.venue),
2049 command_id: UUID4::new(),
2050 ts_init: self.timestamp_ns(),
2051 depth,
2052 managed,
2053 params,
2054 });
2055
2056 self.send_data_cmd(DataCommand::Subscribe(command));
2057 }
2058
2059 pub fn subscribe_book_at_interval(
2061 &mut self,
2062 topic: MStr<Topic>,
2063 handler: ShareableMessageHandler,
2064 instrument_id: InstrumentId,
2065 book_type: BookType,
2066 depth: Option<NonZeroUsize>,
2067 interval_ms: NonZeroUsize,
2068 client_id: Option<ClientId>,
2069 params: Option<IndexMap<String, String>>,
2070 ) {
2071 self.check_registered();
2072
2073 self.topic_handlers.insert(topic, handler.clone());
2074 msgbus::subscribe_topic(topic, handler, None);
2075
2076 let command = SubscribeCommand::BookSnapshots(SubscribeBookSnapshots {
2077 instrument_id,
2078 book_type,
2079 client_id,
2080 venue: Some(instrument_id.venue),
2081 command_id: UUID4::new(),
2082 ts_init: self.timestamp_ns(),
2083 depth,
2084 interval_ms,
2085 params,
2086 });
2087
2088 self.send_data_cmd(DataCommand::Subscribe(command));
2089 }
2090
2091 pub fn subscribe_trades(
2093 &mut self,
2094 topic: MStr<Topic>,
2095 handler: ShareableMessageHandler,
2096 instrument_id: InstrumentId,
2097 client_id: Option<ClientId>,
2098 params: Option<IndexMap<String, String>>,
2099 ) {
2100 self.check_registered();
2101
2102 self.topic_handlers.insert(topic, handler.clone());
2103 msgbus::subscribe_topic(topic, handler, None);
2104
2105 let command = SubscribeCommand::Trades(SubscribeTrades {
2106 instrument_id,
2107 client_id,
2108 venue: Some(instrument_id.venue),
2109 command_id: UUID4::new(),
2110 ts_init: self.timestamp_ns(),
2111 params,
2112 });
2113
2114 self.send_data_cmd(DataCommand::Subscribe(command));
2115 }
2116
2117 pub fn subscribe_bars(
2119 &mut self,
2120 topic: MStr<Topic>,
2121 handler: ShareableMessageHandler,
2122 bar_type: BarType,
2123 client_id: Option<ClientId>,
2124 await_partial: bool,
2125 params: Option<IndexMap<String, String>>,
2126 ) {
2127 self.check_registered();
2128
2129 self.topic_handlers.insert(topic, handler.clone());
2130 msgbus::subscribe_topic(topic, handler, None);
2131
2132 let command = SubscribeCommand::Bars(SubscribeBars {
2133 bar_type,
2134 client_id,
2135 venue: Some(bar_type.instrument_id().venue),
2136 command_id: UUID4::new(),
2137 ts_init: self.timestamp_ns(),
2138 await_partial,
2139 params,
2140 });
2141
2142 self.send_data_cmd(DataCommand::Subscribe(command));
2143 }
2144
2145 pub fn subscribe_mark_prices(
2147 &mut self,
2148 topic: MStr<Topic>,
2149 handler: ShareableMessageHandler,
2150 instrument_id: InstrumentId,
2151 client_id: Option<ClientId>,
2152 params: Option<IndexMap<String, String>>,
2153 ) {
2154 self.check_registered();
2155
2156 self.topic_handlers.insert(topic, handler.clone());
2157 msgbus::subscribe_topic(topic, handler, None);
2158
2159 let command = SubscribeCommand::MarkPrices(SubscribeMarkPrices {
2160 instrument_id,
2161 client_id,
2162 venue: Some(instrument_id.venue),
2163 command_id: UUID4::new(),
2164 ts_init: self.timestamp_ns(),
2165 params,
2166 });
2167
2168 self.send_data_cmd(DataCommand::Subscribe(command));
2169 }
2170
2171 pub fn subscribe_index_prices(
2173 &mut self,
2174 topic: MStr<Topic>,
2175 handler: ShareableMessageHandler,
2176 instrument_id: InstrumentId,
2177 client_id: Option<ClientId>,
2178 params: Option<IndexMap<String, String>>,
2179 ) {
2180 self.check_registered();
2181
2182 self.topic_handlers.insert(topic, handler.clone());
2183 msgbus::subscribe_topic(topic, handler, None);
2184
2185 let command = SubscribeCommand::IndexPrices(SubscribeIndexPrices {
2186 instrument_id,
2187 client_id,
2188 venue: Some(instrument_id.venue),
2189 command_id: UUID4::new(),
2190 ts_init: self.timestamp_ns(),
2191 params,
2192 });
2193
2194 self.send_data_cmd(DataCommand::Subscribe(command));
2195 }
2196
2197 pub fn subscribe_instrument_status(
2199 &mut self,
2200 topic: MStr<Topic>,
2201 handler: ShareableMessageHandler,
2202 instrument_id: InstrumentId,
2203 client_id: Option<ClientId>,
2204 params: Option<IndexMap<String, String>>,
2205 ) {
2206 self.check_registered();
2207
2208 self.topic_handlers.insert(topic, handler.clone());
2209 msgbus::subscribe_topic(topic, handler, None);
2210
2211 let command = SubscribeCommand::InstrumentStatus(SubscribeInstrumentStatus {
2212 instrument_id,
2213 client_id,
2214 venue: Some(instrument_id.venue),
2215 command_id: UUID4::new(),
2216 ts_init: self.timestamp_ns(),
2217 params,
2218 });
2219
2220 self.send_data_cmd(DataCommand::Subscribe(command));
2221 }
2222
2223 pub fn subscribe_instrument_close(
2225 &mut self,
2226 topic: MStr<Topic>,
2227 handler: ShareableMessageHandler,
2228 instrument_id: InstrumentId,
2229 client_id: Option<ClientId>,
2230 params: Option<IndexMap<String, String>>,
2231 ) {
2232 self.check_registered();
2233
2234 self.topic_handlers.insert(topic, handler.clone());
2235 msgbus::subscribe_topic(topic, handler, None);
2236
2237 let command = SubscribeCommand::InstrumentClose(SubscribeInstrumentClose {
2238 instrument_id,
2239 client_id,
2240 venue: Some(instrument_id.venue),
2241 command_id: UUID4::new(),
2242 ts_init: self.timestamp_ns(),
2243 params,
2244 });
2245
2246 self.send_data_cmd(DataCommand::Subscribe(command));
2247 }
2248
/// Subscribes `handler` to `topic` and sends a `SubscribeBlocks` command for `chain`.
///
/// # Panics
///
/// Panics if the actor has not been registered with a trader.
#[cfg(feature = "defi")]
pub fn subscribe_blocks(
    &mut self,
    topic: MStr<Topic>,
    handler: ShareableMessageHandler,
    chain: Blockchain,
    client_id: Option<ClientId>,
    params: Option<IndexMap<String, String>>,
) {
    use crate::messages::defi::{DefiSubscribeCommand, SubscribeBlocks};

    self.check_registered();

    // Track the handler per topic, then wire it into the message bus.
    self.topic_handlers.insert(topic, handler.clone());
    msgbus::subscribe_topic(topic, handler, None);

    let request = SubscribeBlocks {
        chain,
        client_id,
        command_id: UUID4::new(),
        ts_init: self.timestamp_ns(),
        params,
    };
    let command = DataCommand::DefiSubscribe(DefiSubscribeCommand::Blocks(request));
    self.send_data_cmd(command);
}
2276
/// Subscribes `handler` to `topic` and sends a `SubscribePool` command for `address`.
///
/// # Panics
///
/// Panics if the actor has not been registered with a trader.
#[cfg(feature = "defi")]
pub fn subscribe_pool(
    &mut self,
    topic: MStr<Topic>,
    handler: ShareableMessageHandler,
    address: Address,
    client_id: Option<ClientId>,
    params: Option<IndexMap<String, String>>,
) {
    use crate::messages::defi::{DefiSubscribeCommand, SubscribePool};

    self.check_registered();

    // Track the handler per topic, then wire it into the message bus.
    self.topic_handlers.insert(topic, handler.clone());
    msgbus::subscribe_topic(topic, handler, None);

    let request = SubscribePool {
        address,
        client_id,
        command_id: UUID4::new(),
        ts_init: self.timestamp_ns(),
        params,
    };
    let command = DataCommand::DefiSubscribe(DefiSubscribeCommand::Pool(request));
    self.send_data_cmd(command);
}
2304
/// Subscribes `handler` to `topic` and sends a `SubscribePoolSwaps` command for `address`.
///
/// # Panics
///
/// Panics if the actor has not been registered with a trader.
#[cfg(feature = "defi")]
pub fn subscribe_pool_swaps(
    &mut self,
    topic: MStr<Topic>,
    handler: ShareableMessageHandler,
    address: Address,
    client_id: Option<ClientId>,
    params: Option<IndexMap<String, String>>,
) {
    use crate::messages::defi::{DefiSubscribeCommand, SubscribePoolSwaps};

    self.check_registered();

    // Track the handler per topic, then wire it into the message bus.
    self.topic_handlers.insert(topic, handler.clone());
    msgbus::subscribe_topic(topic, handler, None);

    let request = SubscribePoolSwaps {
        address,
        client_id,
        command_id: UUID4::new(),
        ts_init: self.timestamp_ns(),
        params,
    };
    let command = DataCommand::DefiSubscribe(DefiSubscribeCommand::PoolSwaps(request));
    self.send_data_cmd(command);
}
2332
/// Subscribes `handler` to `topic` and sends a `SubscribePoolLiquidityUpdates` command
/// for `address`.
///
/// # Panics
///
/// Panics if the actor has not been registered with a trader.
#[cfg(feature = "defi")]
pub fn subscribe_pool_liquidity_updates(
    &mut self,
    topic: MStr<Topic>,
    handler: ShareableMessageHandler,
    address: Address,
    client_id: Option<ClientId>,
    params: Option<IndexMap<String, String>>,
) {
    use crate::messages::defi::{DefiSubscribeCommand, SubscribePoolLiquidityUpdates};

    self.check_registered();

    // Track the handler per topic, then wire it into the message bus.
    self.topic_handlers.insert(topic, handler.clone());
    msgbus::subscribe_topic(topic, handler, None);

    let request = SubscribePoolLiquidityUpdates {
        address,
        client_id,
        command_id: UUID4::new(),
        ts_init: self.timestamp_ns(),
        params,
    };
    let command = DataCommand::DefiSubscribe(DefiSubscribeCommand::PoolLiquidityUpdates(request));
    self.send_data_cmd(command);
}
2360
2361 pub fn unsubscribe_data(
2363 &self,
2364 data_type: DataType,
2365 client_id: Option<ClientId>,
2366 params: Option<IndexMap<String, String>>,
2367 ) {
2368 self.check_registered();
2369
2370 let topic = get_custom_topic(&data_type);
2371 if let Some(handler) = self.topic_handlers.get(&topic) {
2372 msgbus::unsubscribe_topic(topic, handler.clone());
2373 };
2374
2375 if client_id.is_none() {
2376 return;
2377 }
2378
2379 let command = UnsubscribeCommand::Data(UnsubscribeCustomData {
2380 data_type,
2381 client_id,
2382 venue: None,
2383 command_id: UUID4::new(),
2384 ts_init: self.timestamp_ns(),
2385 params,
2386 });
2387
2388 self.send_data_cmd(DataCommand::Unsubscribe(command));
2389 }
2390
2391 pub fn unsubscribe_instruments(
2393 &self,
2394 venue: Venue,
2395 client_id: Option<ClientId>,
2396 params: Option<IndexMap<String, String>>,
2397 ) {
2398 self.check_registered();
2399
2400 let topic = get_instruments_topic(venue);
2401 if let Some(handler) = self.topic_handlers.get(&topic) {
2402 msgbus::unsubscribe_topic(topic, handler.clone());
2403 };
2404
2405 let command = UnsubscribeCommand::Instruments(UnsubscribeInstruments {
2406 client_id,
2407 venue,
2408 command_id: UUID4::new(),
2409 ts_init: self.timestamp_ns(),
2410 params,
2411 });
2412
2413 self.send_data_cmd(DataCommand::Unsubscribe(command));
2414 }
2415
2416 pub fn unsubscribe_instrument(
2418 &self,
2419 instrument_id: InstrumentId,
2420 client_id: Option<ClientId>,
2421 params: Option<IndexMap<String, String>>,
2422 ) {
2423 self.check_registered();
2424
2425 let topic = get_instrument_topic(instrument_id);
2426 if let Some(handler) = self.topic_handlers.get(&topic) {
2427 msgbus::unsubscribe_topic(topic, handler.clone());
2428 };
2429
2430 let command = UnsubscribeCommand::Instrument(UnsubscribeInstrument {
2431 instrument_id,
2432 client_id,
2433 venue: Some(instrument_id.venue),
2434 command_id: UUID4::new(),
2435 ts_init: self.timestamp_ns(),
2436 params,
2437 });
2438
2439 self.send_data_cmd(DataCommand::Unsubscribe(command));
2440 }
2441
2442 pub fn unsubscribe_book_deltas(
2444 &self,
2445 instrument_id: InstrumentId,
2446 client_id: Option<ClientId>,
2447 params: Option<IndexMap<String, String>>,
2448 ) {
2449 self.check_registered();
2450
2451 let topic = get_book_deltas_topic(instrument_id);
2452 if let Some(handler) = self.topic_handlers.get(&topic) {
2453 msgbus::unsubscribe_topic(topic, handler.clone());
2454 };
2455
2456 let command = UnsubscribeCommand::BookDeltas(UnsubscribeBookDeltas {
2457 instrument_id,
2458 client_id,
2459 venue: Some(instrument_id.venue),
2460 command_id: UUID4::new(),
2461 ts_init: self.timestamp_ns(),
2462 params,
2463 });
2464
2465 self.send_data_cmd(DataCommand::Unsubscribe(command));
2466 }
2467
2468 pub fn unsubscribe_book_at_interval(
2470 &mut self,
2471 instrument_id: InstrumentId,
2472 interval_ms: NonZeroUsize,
2473 client_id: Option<ClientId>,
2474 params: Option<IndexMap<String, String>>,
2475 ) {
2476 self.check_registered();
2477
2478 let topic = get_book_snapshots_topic(instrument_id);
2479 if let Some(handler) = self.topic_handlers.get(&topic) {
2480 msgbus::unsubscribe_topic(topic, handler.clone());
2481 };
2482
2483 let command = UnsubscribeCommand::BookSnapshots(UnsubscribeBookSnapshots {
2484 instrument_id,
2485 client_id,
2486 venue: Some(instrument_id.venue),
2487 command_id: UUID4::new(),
2488 ts_init: self.timestamp_ns(),
2489 params,
2490 });
2491
2492 self.send_data_cmd(DataCommand::Unsubscribe(command));
2493 }
2494
2495 pub fn unsubscribe_quotes(
2497 &self,
2498 instrument_id: InstrumentId,
2499 client_id: Option<ClientId>,
2500 params: Option<IndexMap<String, String>>,
2501 ) {
2502 self.check_registered();
2503
2504 let topic = get_quotes_topic(instrument_id);
2505 if let Some(handler) = self.topic_handlers.get(&topic) {
2506 msgbus::unsubscribe_topic(topic, handler.clone());
2507 };
2508
2509 let command = UnsubscribeCommand::Quotes(UnsubscribeQuotes {
2510 instrument_id,
2511 client_id,
2512 venue: Some(instrument_id.venue),
2513 command_id: UUID4::new(),
2514 ts_init: self.timestamp_ns(),
2515 params,
2516 });
2517
2518 self.send_data_cmd(DataCommand::Unsubscribe(command));
2519 }
2520
2521 pub fn unsubscribe_trades(
2523 &self,
2524 instrument_id: InstrumentId,
2525 client_id: Option<ClientId>,
2526 params: Option<IndexMap<String, String>>,
2527 ) {
2528 self.check_registered();
2529
2530 let topic = get_trades_topic(instrument_id);
2531 if let Some(handler) = self.topic_handlers.get(&topic) {
2532 msgbus::unsubscribe_topic(topic, handler.clone());
2533 };
2534
2535 let command = UnsubscribeCommand::Trades(UnsubscribeTrades {
2536 instrument_id,
2537 client_id,
2538 venue: Some(instrument_id.venue),
2539 command_id: UUID4::new(),
2540 ts_init: self.timestamp_ns(),
2541 params,
2542 });
2543
2544 self.send_data_cmd(DataCommand::Unsubscribe(command));
2545 }
2546
2547 pub fn unsubscribe_bars(
2549 &mut self,
2550 bar_type: BarType,
2551 client_id: Option<ClientId>,
2552 params: Option<IndexMap<String, String>>,
2553 ) {
2554 self.check_registered();
2555
2556 let topic = get_bars_topic(bar_type);
2557 if let Some(handler) = self.topic_handlers.get(&topic) {
2558 msgbus::unsubscribe_topic(topic, handler.clone());
2559 };
2560
2561 let command = UnsubscribeCommand::Bars(UnsubscribeBars {
2562 bar_type,
2563 client_id,
2564 venue: Some(bar_type.instrument_id().venue),
2565 command_id: UUID4::new(),
2566 ts_init: self.timestamp_ns(),
2567 params,
2568 });
2569
2570 self.send_data_cmd(DataCommand::Unsubscribe(command));
2571 }
2572
2573 pub fn unsubscribe_mark_prices(
2575 &self,
2576 instrument_id: InstrumentId,
2577 client_id: Option<ClientId>,
2578 params: Option<IndexMap<String, String>>,
2579 ) {
2580 self.check_registered();
2581
2582 let topic = get_mark_price_topic(instrument_id);
2583 if let Some(handler) = self.topic_handlers.get(&topic) {
2584 msgbus::unsubscribe_topic(topic, handler.clone());
2585 };
2586
2587 let command = UnsubscribeCommand::MarkPrices(UnsubscribeMarkPrices {
2588 instrument_id,
2589 client_id,
2590 venue: Some(instrument_id.venue),
2591 command_id: UUID4::new(),
2592 ts_init: self.timestamp_ns(),
2593 params,
2594 });
2595
2596 self.send_data_cmd(DataCommand::Unsubscribe(command));
2597 }
2598
2599 pub fn unsubscribe_index_prices(
2601 &self,
2602 instrument_id: InstrumentId,
2603 client_id: Option<ClientId>,
2604 params: Option<IndexMap<String, String>>,
2605 ) {
2606 self.check_registered();
2607
2608 let topic = get_index_price_topic(instrument_id);
2609 if let Some(handler) = self.topic_handlers.get(&topic) {
2610 msgbus::unsubscribe_topic(topic, handler.clone());
2611 };
2612
2613 let command = UnsubscribeCommand::IndexPrices(UnsubscribeIndexPrices {
2614 instrument_id,
2615 client_id,
2616 venue: Some(instrument_id.venue),
2617 command_id: UUID4::new(),
2618 ts_init: self.timestamp_ns(),
2619 params,
2620 });
2621
2622 self.send_data_cmd(DataCommand::Unsubscribe(command));
2623 }
2624
2625 pub fn unsubscribe_instrument_status(
2627 &self,
2628 instrument_id: InstrumentId,
2629 client_id: Option<ClientId>,
2630 params: Option<IndexMap<String, String>>,
2631 ) {
2632 self.check_registered();
2633
2634 let topic = get_instrument_status_topic(instrument_id);
2635 if let Some(handler) = self.topic_handlers.get(&topic) {
2636 msgbus::unsubscribe_topic(topic, handler.clone());
2637 };
2638
2639 let command = UnsubscribeCommand::InstrumentStatus(UnsubscribeInstrumentStatus {
2640 instrument_id,
2641 client_id,
2642 venue: Some(instrument_id.venue),
2643 command_id: UUID4::new(),
2644 ts_init: self.timestamp_ns(),
2645 params,
2646 });
2647
2648 self.send_data_cmd(DataCommand::Unsubscribe(command));
2649 }
2650
2651 pub fn unsubscribe_instrument_close(
2653 &self,
2654 instrument_id: InstrumentId,
2655 client_id: Option<ClientId>,
2656 params: Option<IndexMap<String, String>>,
2657 ) {
2658 self.check_registered();
2659
2660 let topic = get_instrument_close_topic(instrument_id);
2661 if let Some(handler) = self.topic_handlers.get(&topic) {
2662 msgbus::unsubscribe_topic(topic, handler.clone());
2663 };
2664
2665 let command = UnsubscribeCommand::InstrumentClose(UnsubscribeInstrumentClose {
2666 instrument_id,
2667 client_id,
2668 venue: Some(instrument_id.venue),
2669 command_id: UUID4::new(),
2670 ts_init: self.timestamp_ns(),
2671 params,
2672 });
2673
2674 self.send_data_cmd(DataCommand::Unsubscribe(command));
2675 }
2676
/// Unsubscribes the tracked handler (if any) from the blocks topic for `chain`
/// and sends an `UnsubscribeBlocks` command.
///
/// The handler entry itself stays in `topic_handlers`.
///
/// # Panics
///
/// Panics if the actor has not been registered with a trader.
#[cfg(feature = "defi")]
pub fn unsubscribe_blocks(
    &mut self,
    chain: Blockchain,
    client_id: Option<ClientId>,
    params: Option<IndexMap<String, String>>,
) {
    use crate::messages::defi::{DefiUnsubscribeCommand, UnsubscribeBlocks};

    self.check_registered();

    let topic = get_defi_blocks_topic(chain);
    let tracked = self.topic_handlers.get(&topic).cloned();
    if let Some(handler) = tracked {
        msgbus::unsubscribe_topic(topic, handler);
    }

    let request = UnsubscribeBlocks {
        chain,
        client_id,
        command_id: UUID4::new(),
        ts_init: self.timestamp_ns(),
        params,
    };
    let command = DataCommand::DefiUnsubscribe(DefiUnsubscribeCommand::Blocks(request));
    self.send_data_cmd(command);
}
2704
/// Unsubscribes the tracked handler (if any) from the pool topic for `address`
/// and sends an `UnsubscribePool` command.
///
/// The handler entry itself stays in `topic_handlers`.
///
/// # Panics
///
/// Panics if the actor has not been registered with a trader.
#[cfg(feature = "defi")]
pub fn unsubscribe_pool(
    &mut self,
    address: Address,
    client_id: Option<ClientId>,
    params: Option<IndexMap<String, String>>,
) {
    use crate::messages::defi::{DefiUnsubscribeCommand, UnsubscribePool};

    self.check_registered();

    let topic = get_defi_pool_topic(address);
    let tracked = self.topic_handlers.get(&topic).cloned();
    if let Some(handler) = tracked {
        msgbus::unsubscribe_topic(topic, handler);
    }

    let request = UnsubscribePool {
        address,
        client_id,
        command_id: UUID4::new(),
        ts_init: self.timestamp_ns(),
        params,
    };
    let command = DataCommand::DefiUnsubscribe(DefiUnsubscribeCommand::Pool(request));
    self.send_data_cmd(command);
}
2732
/// Unsubscribes the tracked handler (if any) from the pool swaps topic for `address`
/// and sends an `UnsubscribePoolSwaps` command.
///
/// The handler entry itself stays in `topic_handlers`.
///
/// # Panics
///
/// Panics if the actor has not been registered with a trader.
#[cfg(feature = "defi")]
pub fn unsubscribe_pool_swaps(
    &mut self,
    address: Address,
    client_id: Option<ClientId>,
    params: Option<IndexMap<String, String>>,
) {
    use crate::messages::defi::{DefiUnsubscribeCommand, UnsubscribePoolSwaps};

    self.check_registered();

    let topic = get_defi_pool_swaps_topic(address);
    let tracked = self.topic_handlers.get(&topic).cloned();
    if let Some(handler) = tracked {
        msgbus::unsubscribe_topic(topic, handler);
    }

    let request = UnsubscribePoolSwaps {
        address,
        client_id,
        command_id: UUID4::new(),
        ts_init: self.timestamp_ns(),
        params,
    };
    let command = DataCommand::DefiUnsubscribe(DefiUnsubscribeCommand::PoolSwaps(request));
    self.send_data_cmd(command);
}
2760
/// Unsubscribes from liquidity-update data for the pool at `address`.
///
/// If a handler is stored for the liquidity topic it is unsubscribed from that topic via
/// `msgbus::unsubscribe_topic`; a [`DefiUnsubscribeCommand::PoolLiquidityUpdates`] command
/// is then sent whether or not a handler was found.
#[cfg(feature = "defi")]
pub fn unsubscribe_pool_liquidity_updates(
    &mut self,
    address: Address,
    client_id: Option<ClientId>,
    params: Option<IndexMap<String, String>>,
) {
    use crate::messages::defi::{DefiUnsubscribeCommand, UnsubscribePoolLiquidityUpdates};

    self.check_registered();

    // NOTE(review): the handler is only looked up here, not removed from
    // `topic_handlers` - confirm the entry is meant to outlive the unsubscribe.
    let liquidity_topic = get_defi_liquidity_topic(address);
    if let Some(handler) = self.topic_handlers.get(&liquidity_topic) {
        msgbus::unsubscribe_topic(liquidity_topic, handler.clone());
    }

    let payload = UnsubscribePoolLiquidityUpdates {
        address,
        client_id,
        command_id: UUID4::new(),
        ts_init: self.timestamp_ns(),
        params,
    };
    let unsubscribe = DefiUnsubscribeCommand::PoolLiquidityUpdates(payload);

    self.send_data_cmd(DataCommand::DefiUnsubscribe(unsubscribe));
}
2789
2790 pub fn request_data(
2796 &self,
2797 data_type: DataType,
2798 client_id: ClientId,
2799 start: Option<DateTime<Utc>>,
2800 end: Option<DateTime<Utc>>,
2801 limit: Option<NonZeroUsize>,
2802 params: Option<IndexMap<String, String>>,
2803 handler: ShareableMessageHandler,
2804 ) -> anyhow::Result<UUID4> {
2805 self.check_registered();
2806
2807 let now = self.clock_ref().utc_now();
2808 check_timestamps(now, start, end)?;
2809
2810 let request_id = UUID4::new();
2811 let command = RequestCommand::Data(RequestCustomData {
2812 client_id,
2813 data_type,
2814 request_id,
2815 ts_init: self.timestamp_ns(),
2816 params,
2817 });
2818
2819 let msgbus = get_message_bus()
2820 .borrow_mut()
2821 .register_response_handler(command.request_id(), handler);
2822
2823 self.send_data_cmd(DataCommand::Request(command));
2824
2825 Ok(request_id)
2826 }
2827
2828 pub fn request_instrument(
2834 &self,
2835 instrument_id: InstrumentId,
2836 start: Option<DateTime<Utc>>,
2837 end: Option<DateTime<Utc>>,
2838 client_id: Option<ClientId>,
2839 params: Option<IndexMap<String, String>>,
2840 handler: ShareableMessageHandler,
2841 ) -> anyhow::Result<UUID4> {
2842 self.check_registered();
2843
2844 let now = self.clock_ref().utc_now();
2845 check_timestamps(now, start, end)?;
2846
2847 let request_id = UUID4::new();
2848 let command = RequestCommand::Instrument(RequestInstrument {
2849 instrument_id,
2850 start,
2851 end,
2852 client_id,
2853 request_id,
2854 ts_init: now.into(),
2855 params,
2856 });
2857
2858 let msgbus = get_message_bus()
2859 .borrow_mut()
2860 .register_response_handler(command.request_id(), handler);
2861
2862 self.send_data_cmd(DataCommand::Request(command));
2863
2864 Ok(request_id)
2865 }
2866
2867 pub fn request_instruments(
2873 &self,
2874 venue: Option<Venue>,
2875 start: Option<DateTime<Utc>>,
2876 end: Option<DateTime<Utc>>,
2877 client_id: Option<ClientId>,
2878 params: Option<IndexMap<String, String>>,
2879 handler: ShareableMessageHandler,
2880 ) -> anyhow::Result<UUID4> {
2881 self.check_registered();
2882
2883 let now = self.clock_ref().utc_now();
2884 check_timestamps(now, start, end)?;
2885
2886 let request_id = UUID4::new();
2887 let command = RequestCommand::Instruments(RequestInstruments {
2888 venue,
2889 start,
2890 end,
2891 client_id,
2892 request_id,
2893 ts_init: now.into(),
2894 params,
2895 });
2896
2897 let msgbus = get_message_bus()
2898 .borrow_mut()
2899 .register_response_handler(command.request_id(), handler);
2900
2901 self.send_data_cmd(DataCommand::Request(command));
2902
2903 Ok(request_id)
2904 }
2905
2906 pub fn request_book_snapshot(
2912 &self,
2913 instrument_id: InstrumentId,
2914 depth: Option<NonZeroUsize>,
2915 client_id: Option<ClientId>,
2916 params: Option<IndexMap<String, String>>,
2917 handler: ShareableMessageHandler,
2918 ) -> anyhow::Result<UUID4> {
2919 self.check_registered();
2920
2921 let request_id = UUID4::new();
2922 let command = RequestCommand::BookSnapshot(RequestBookSnapshot {
2923 instrument_id,
2924 depth,
2925 client_id,
2926 request_id,
2927 ts_init: self.timestamp_ns(),
2928 params,
2929 });
2930
2931 let msgbus = get_message_bus()
2932 .borrow_mut()
2933 .register_response_handler(command.request_id(), handler);
2934
2935 self.send_data_cmd(DataCommand::Request(command));
2936
2937 Ok(request_id)
2938 }
2939
2940 pub fn request_quotes(
2946 &self,
2947 instrument_id: InstrumentId,
2948 start: Option<DateTime<Utc>>,
2949 end: Option<DateTime<Utc>>,
2950 limit: Option<NonZeroUsize>,
2951 client_id: Option<ClientId>,
2952 params: Option<IndexMap<String, String>>,
2953 handler: ShareableMessageHandler,
2954 ) -> anyhow::Result<UUID4> {
2955 self.check_registered();
2956
2957 let now = self.clock_ref().utc_now();
2958 check_timestamps(now, start, end)?;
2959
2960 let request_id = UUID4::new();
2961 let command = RequestCommand::Quotes(RequestQuotes {
2962 instrument_id,
2963 start,
2964 end,
2965 limit,
2966 client_id,
2967 request_id,
2968 ts_init: now.into(),
2969 params,
2970 });
2971
2972 let msgbus = get_message_bus()
2973 .borrow_mut()
2974 .register_response_handler(command.request_id(), handler);
2975
2976 self.send_data_cmd(DataCommand::Request(command));
2977
2978 Ok(request_id)
2979 }
2980
2981 pub fn request_trades(
2987 &self,
2988 instrument_id: InstrumentId,
2989 start: Option<DateTime<Utc>>,
2990 end: Option<DateTime<Utc>>,
2991 limit: Option<NonZeroUsize>,
2992 client_id: Option<ClientId>,
2993 params: Option<IndexMap<String, String>>,
2994 handler: ShareableMessageHandler,
2995 ) -> anyhow::Result<UUID4> {
2996 self.check_registered();
2997
2998 let now = self.clock_ref().utc_now();
2999 check_timestamps(now, start, end)?;
3000
3001 let request_id = UUID4::new();
3002 let command = RequestCommand::Trades(RequestTrades {
3003 instrument_id,
3004 start,
3005 end,
3006 limit,
3007 client_id,
3008 request_id,
3009 ts_init: now.into(),
3010 params,
3011 });
3012
3013 let msgbus = get_message_bus()
3014 .borrow_mut()
3015 .register_response_handler(command.request_id(), handler);
3016
3017 self.send_data_cmd(DataCommand::Request(command));
3018
3019 Ok(request_id)
3020 }
3021
3022 pub fn request_bars(
3028 &self,
3029 bar_type: BarType,
3030 start: Option<DateTime<Utc>>,
3031 end: Option<DateTime<Utc>>,
3032 limit: Option<NonZeroUsize>,
3033 client_id: Option<ClientId>,
3034 params: Option<IndexMap<String, String>>,
3035 handler: ShareableMessageHandler,
3036 ) -> anyhow::Result<UUID4> {
3037 self.check_registered();
3038
3039 let now = self.clock_ref().utc_now();
3040 check_timestamps(now, start, end)?;
3041
3042 let request_id = UUID4::new();
3043 let command = RequestCommand::Bars(RequestBars {
3044 bar_type,
3045 start,
3046 end,
3047 limit,
3048 client_id,
3049 request_id,
3050 ts_init: now.into(),
3051 params,
3052 });
3053
3054 let msgbus = get_message_bus()
3055 .borrow_mut()
3056 .register_response_handler(command.request_id(), handler);
3057
3058 self.send_data_cmd(DataCommand::Request(command));
3059
3060 Ok(request_id)
3061 }
3062}
3063
3064fn check_timestamps(
3065 now: DateTime<Utc>,
3066 start: Option<DateTime<Utc>>,
3067 end: Option<DateTime<Utc>>,
3068) -> anyhow::Result<()> {
3069 if let Some(start) = start {
3070 check_predicate_true(start <= now, "start was > now")?
3071 }
3072 if let Some(end) = end {
3073 check_predicate_true(end <= now, "end was > now")?
3074 }
3075
3076 if let (Some(start), Some(end)) = (start, end) {
3077 check_predicate_true(start < end, "start was >= end")?
3078 }
3079
3080 Ok(())
3081}
3082
3083fn log_error(e: &anyhow::Error) {
3084 log::error!("{e}");
3085}
3086
3087fn log_not_running<T>(msg: &T)
3088where
3089 T: Debug,
3090{
3091 log::warn!("Received message when not running - skipping {msg:?}");
3093}
3094
3095fn log_received<T>(msg: &T)
3096where
3097 T: Debug,
3098{
3099 log::debug!("{RECV} {msg:?}");
3100}