nautilus_common/actor/
data_actor.rs

1// -------------------------------------------------------------------------------------------------
2//  Copyright (C) 2015-2025 Posei Systems Pty Ltd. All rights reserved.
3//  https://poseitrader.io
4//
5//  Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
6//  You may not use this file except in compliance with the License.
7//  You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
8//
9//  Unless required by applicable law or agreed to in writing, software
10//  distributed under the License is distributed on an "AS IS" BASIS,
11//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12//  See the License for the specific language governing permissions and
13//  limitations under the License.
14// -------------------------------------------------------------------------------------------------
15
16// Under development
17#![allow(dead_code)]
18#![allow(unused_variables)]
19#![allow(unused_imports)]
20#![allow(clippy::too_many_arguments)]
21
22use std::{
23    any::{Any, TypeId},
24    cell::{Ref, RefCell, RefMut, UnsafeCell},
25    collections::HashSet,
26    fmt::Debug,
27    num::NonZeroUsize,
28    ops::{Deref, DerefMut},
29    rc::Rc,
30    sync::Arc,
31};
32
33use ahash::{AHashMap, AHashSet};
34#[cfg(feature = "defi")]
35use alloy_primitives::Address;
36use chrono::{DateTime, Utc};
37use indexmap::IndexMap;
38use nautilus_core::{UUID4, UnixNanos, correctness::check_predicate_true};
39#[cfg(feature = "defi")]
40use nautilus_model::defi::{Block, Blockchain, Pool, PoolLiquidityUpdate, PoolSwap};
41use nautilus_model::{
42    data::{
43        Bar, BarType, DataType, IndexPriceUpdate, InstrumentStatus, MarkPriceUpdate,
44        OrderBookDeltas, QuoteTick, TradeTick, close::InstrumentClose,
45    },
46    enums::BookType,
47    identifiers::{ActorId, ClientId, ComponentId, InstrumentId, TraderId, Venue},
48    instruments::{Instrument, InstrumentAny},
49    orderbook::OrderBook,
50};
51use ustr::Ustr;
52use uuid::Uuid;
53
54#[cfg(feature = "indicators")]
55use super::indicators::Indicators;
56use super::{
57    Actor,
58    registry::{get_actor, get_actor_unchecked, try_get_actor_unchecked},
59};
60#[cfg(feature = "defi")]
61use crate::msgbus::switchboard::{
62    get_defi_blocks_topic, get_defi_liquidity_topic, get_defi_pool_swaps_topic, get_defi_pool_topic,
63};
64use crate::{
65    cache::Cache,
66    clock::Clock,
67    component::Component,
68    enums::{ComponentState, ComponentTrigger},
69    logging::{CMD, RECV, REQ, SEND},
70    messages::{
71        data::{
72            BarsResponse, BookResponse, CustomDataResponse, DataCommand, InstrumentResponse,
73            InstrumentsResponse, QuotesResponse, RequestBars, RequestBookSnapshot, RequestCommand,
74            RequestCustomData, RequestInstrument, RequestInstruments, RequestQuotes, RequestTrades,
75            SubscribeBars, SubscribeBookDeltas, SubscribeBookSnapshots, SubscribeCommand,
76            SubscribeCustomData, SubscribeIndexPrices, SubscribeInstrument,
77            SubscribeInstrumentClose, SubscribeInstrumentStatus, SubscribeInstruments,
78            SubscribeMarkPrices, SubscribeQuotes, SubscribeTrades, TradesResponse, UnsubscribeBars,
79            UnsubscribeBookDeltas, UnsubscribeBookSnapshots, UnsubscribeCommand,
80            UnsubscribeCustomData, UnsubscribeIndexPrices, UnsubscribeInstrument,
81            UnsubscribeInstrumentClose, UnsubscribeInstrumentStatus, UnsubscribeInstruments,
82            UnsubscribeMarkPrices, UnsubscribeQuotes, UnsubscribeTrades,
83        },
84        system::ShutdownSystem,
85    },
86    msgbus::{
87        self, MStr, Pattern, Topic, get_message_bus,
88        handler::{MessageHandler, ShareableMessageHandler, TypedMessageHandler},
89        switchboard::{
90            self, MessagingSwitchboard, get_bars_topic, get_book_deltas_topic,
91            get_book_snapshots_topic, get_custom_topic, get_index_price_topic,
92            get_instrument_close_topic, get_instrument_status_topic, get_instrument_topic,
93            get_instruments_topic, get_mark_price_topic, get_quotes_topic, get_trades_topic,
94        },
95    },
96    signal::Signal,
97    timer::{TimeEvent, TimeEventCallback},
98};
99
100/// Common configuration for [`DataActor`] based components.
101#[derive(Debug, Clone)]
102pub struct DataActorConfig {
103    /// The custom identifier for the Actor.
104    pub actor_id: Option<ActorId>,
105    /// If events should be logged.
106    pub log_events: bool,
107    /// If commands should be logged.
108    pub log_commands: bool,
109}
110
111impl Default for DataActorConfig {
112    fn default() -> Self {
113        Self {
114            actor_id: None,
115            log_events: true,
116            log_commands: true,
117        }
118    }
119}
120
/// Boxed callback taking a [`UUID4`], required to be `Send + Sync`.
///
/// NOTE(review): presumably the argument is the ID of the originating request; the final
/// shape of this alias is still to be decided (see the TODO).
type RequestCallback = Box<dyn Fn(UUID4) + Send + Sync>; // TODO: TBD
122
123pub trait DataActor:
124    Component + Deref<Target = DataActorCore> + DerefMut<Target = DataActorCore>
125{
126    /// Actions to be performed when the actor state is saved.
127    ///
128    /// # Errors
129    ///
130    /// Returns an error if saving the actor state fails.
131    fn on_save(&self) -> anyhow::Result<IndexMap<String, Vec<u8>>> {
132        Ok(IndexMap::new())
133    }
134
135    /// Actions to be performed when the actor state is loaded.
136    ///
137    /// # Errors
138    ///
139    /// Returns an error if loading the actor state fails.
140    fn on_load(&mut self, state: IndexMap<String, Vec<u8>>) -> anyhow::Result<()> {
141        Ok(())
142    }
143
144    /// Actions to be performed on start.
145    ///
146    /// # Errors
147    ///
148    /// Returns an error if starting the actor fails.
149    fn on_start(&mut self) -> anyhow::Result<()> {
150        log::warn!(
151            "The `on_start` handler was called when not overridden, \
152            it's expected that any actions required when starting the actor \
153            occur here, such as subscribing/requesting data"
154        );
155        Ok(())
156    }
157
158    /// Actions to be performed on stop.
159    ///
160    /// # Errors
161    ///
162    /// Returns an error if stopping the actor fails.
163    fn on_stop(&mut self) -> anyhow::Result<()> {
164        log::warn!(
165            "The `on_stop` handler was called when not overridden, \
166            it's expected that any actions required when stopping the actor \
167            occur here, such as unsubscribing from data",
168        );
169        Ok(())
170    }
171
172    /// Actions to be performed on resume.
173    ///
174    /// # Errors
175    ///
176    /// Returns an error if resuming the actor fails.
177    fn on_resume(&mut self) -> anyhow::Result<()> {
178        log::warn!(
179            "The `on_resume` handler was called when not overridden, \
180            it's expected that any actions required when resuming the actor \
181            following a stop occur here"
182        );
183        Ok(())
184    }
185
186    /// Actions to be performed on reset.
187    ///
188    /// # Errors
189    ///
190    /// Returns an error if resetting the actor fails.
191    fn on_reset(&mut self) -> anyhow::Result<()> {
192        log::warn!(
193            "The `on_reset` handler was called when not overridden, \
194            it's expected that any actions required when resetting the actor \
195            occur here, such as resetting indicators and other state"
196        );
197        Ok(())
198    }
199
200    /// Actions to be performed on dispose.
201    ///
202    /// # Errors
203    ///
204    /// Returns an error if disposing the actor fails.
205    fn on_dispose(&mut self) -> anyhow::Result<()> {
206        Ok(())
207    }
208
209    /// Actions to be performed on degrade.
210    ///
211    /// # Errors
212    ///
213    /// Returns an error if degrading the actor fails.
214    fn on_degrade(&mut self) -> anyhow::Result<()> {
215        Ok(())
216    }
217
218    /// Actions to be performed on fault.
219    ///
220    /// # Errors
221    ///
222    /// Returns an error if faulting the actor fails.
223    fn on_fault(&mut self) -> anyhow::Result<()> {
224        Ok(())
225    }
226
227    /// Actions to be performed when receiving a time event.
228    ///
229    /// # Errors
230    ///
231    /// Returns an error if handling the time event fails.
232    fn on_time_event(&mut self, event: &TimeEvent) -> anyhow::Result<()> {
233        Ok(())
234    }
235
236    /// Actions to be performed when receiving custom data.
237    ///
238    /// # Errors
239    ///
240    /// Returns an error if handling the data fails.
241    fn on_data(&mut self, data: &dyn Any) -> anyhow::Result<()> {
242        Ok(())
243    }
244
245    /// Actions to be performed when receiving a signal.
246    ///
247    /// # Errors
248    ///
249    /// Returns an error if handling the signal fails.
250    fn on_signal(&mut self, signal: &Signal) -> anyhow::Result<()> {
251        Ok(())
252    }
253
254    /// Actions to be performed when receiving an instrument.
255    ///
256    /// # Errors
257    ///
258    /// Returns an error if handling the instrument fails.
259    fn on_instrument(&mut self, instrument: &InstrumentAny) -> anyhow::Result<()> {
260        Ok(())
261    }
262
263    /// Actions to be performed when receiving order book deltas.
264    ///
265    /// # Errors
266    ///
267    /// Returns an error if handling the book deltas fails.
268    fn on_book_deltas(&mut self, deltas: &OrderBookDeltas) -> anyhow::Result<()> {
269        Ok(())
270    }
271
272    /// Actions to be performed when receiving an order book.
273    ///
274    /// # Errors
275    ///
276    /// Returns an error if handling the book fails.
277    fn on_book(&mut self, order_book: &OrderBook) -> anyhow::Result<()> {
278        Ok(())
279    }
280
281    /// Actions to be performed when receiving a quote.
282    ///
283    /// # Errors
284    ///
285    /// Returns an error if handling the quote fails.
286    fn on_quote(&mut self, quote: &QuoteTick) -> anyhow::Result<()> {
287        Ok(())
288    }
289
290    /// Actions to be performed when receiving a trade.
291    ///
292    /// # Errors
293    ///
294    /// Returns an error if handling the trade fails.
295    fn on_trade(&mut self, tick: &TradeTick) -> anyhow::Result<()> {
296        Ok(())
297    }
298
299    /// Actions to be performed when receiving a bar.
300    ///
301    /// # Errors
302    ///
303    /// Returns an error if handling the bar fails.
304    fn on_bar(&mut self, bar: &Bar) -> anyhow::Result<()> {
305        Ok(())
306    }
307
308    /// Actions to be performed when receiving a mark price update.
309    ///
310    /// # Errors
311    ///
312    /// Returns an error if handling the mark price update fails.
313    fn on_mark_price(&mut self, mark_price: &MarkPriceUpdate) -> anyhow::Result<()> {
314        Ok(())
315    }
316
317    /// Actions to be performed when receiving an index price update.
318    ///
319    /// # Errors
320    ///
321    /// Returns an error if handling the index price update fails.
322    fn on_index_price(&mut self, index_price: &IndexPriceUpdate) -> anyhow::Result<()> {
323        Ok(())
324    }
325
326    /// Actions to be performed when receiving an instrument status update.
327    ///
328    /// # Errors
329    ///
330    /// Returns an error if handling the instrument status update fails.
331    fn on_instrument_status(&mut self, data: &InstrumentStatus) -> anyhow::Result<()> {
332        Ok(())
333    }
334
335    /// Actions to be performed when receiving an instrument close update.
336    ///
337    /// # Errors
338    ///
339    /// Returns an error if handling the instrument close update fails.
340    fn on_instrument_close(&mut self, update: &InstrumentClose) -> anyhow::Result<()> {
341        Ok(())
342    }
343
344    #[cfg(feature = "defi")]
345    /// Actions to be performed when receiving a block.
346    ///
347    /// # Errors
348    ///
349    /// Returns an error if handling the block fails.
350    fn on_block(&mut self, block: &Block) -> anyhow::Result<()> {
351        Ok(())
352    }
353
354    #[cfg(feature = "defi")]
355    /// Actions to be performed when receiving a pool.
356    ///
357    /// # Errors
358    ///
359    /// Returns an error if handling the pool fails.
360    fn on_pool(&mut self, pool: &Pool) -> anyhow::Result<()> {
361        Ok(())
362    }
363
364    #[cfg(feature = "defi")]
365    /// Actions to be performed when receiving a pool swap.
366    ///
367    /// # Errors
368    ///
369    /// Returns an error if handling the pool swap fails.
370    fn on_pool_swap(&mut self, swap: &PoolSwap) -> anyhow::Result<()> {
371        Ok(())
372    }
373
374    #[cfg(feature = "defi")]
375    /// Actions to be performed when receiving a pool liquidity update.
376    ///
377    /// # Errors
378    ///
379    /// Returns an error if handling the pool liquidity update fails.
380    fn on_pool_liquidity_update(&mut self, update: &PoolLiquidityUpdate) -> anyhow::Result<()> {
381        Ok(())
382    }
383
384    /// Actions to be performed when receiving historical data.
385    ///
386    /// # Errors
387    ///
388    /// Returns an error if handling the historical data fails.
389    fn on_historical_data(&mut self, data: &dyn Any) -> anyhow::Result<()> {
390        Ok(())
391    }
392
393    /// Actions to be performed when receiving historical quotes.
394    ///
395    /// # Errors
396    ///
397    /// Returns an error if handling the historical quotes fails.
398    fn on_historical_quotes(&mut self, quotes: &[QuoteTick]) -> anyhow::Result<()> {
399        Ok(())
400    }
401
402    /// Actions to be performed when receiving historical trades.
403    ///
404    /// # Errors
405    ///
406    /// Returns an error if handling the historical trades fails.
407    fn on_historical_trades(&mut self, trades: &[TradeTick]) -> anyhow::Result<()> {
408        Ok(())
409    }
410
411    /// Actions to be performed when receiving historical bars.
412    ///
413    /// # Errors
414    ///
415    /// Returns an error if handling the historical bars fails.
416    fn on_historical_bars(&mut self, bars: &[Bar]) -> anyhow::Result<()> {
417        Ok(())
418    }
419
420    /// Actions to be performed when receiving historical mark prices.
421    ///
422    /// # Errors
423    ///
424    /// Returns an error if handling the historical mark prices fails.
425    fn on_historical_mark_prices(&mut self, mark_prices: &[MarkPriceUpdate]) -> anyhow::Result<()> {
426        Ok(())
427    }
428
429    /// Actions to be performed when receiving historical index prices.
430    ///
431    /// # Errors
432    ///
433    /// Returns an error if handling the historical index prices fails.
434    fn on_historical_index_prices(
435        &mut self,
436        index_prices: &[IndexPriceUpdate],
437    ) -> anyhow::Result<()> {
438        Ok(())
439    }
440
441    /// Handles a received time event.
442    fn handle_time_event(&mut self, event: &TimeEvent) {
443        log_received(&event);
444
445        if let Err(e) = DataActor::on_time_event(self, event) {
446            log_error(&e);
447        }
448    }
449
450    /// Handles a received custom data point.
451    fn handle_data(&mut self, data: &dyn Any) {
452        log_received(&data);
453
454        if self.not_running() {
455            log_not_running(&data);
456            return;
457        }
458
459        if let Err(e) = self.on_data(data) {
460            log_error(&e);
461        }
462    }
463
464    /// Handles a received signal.
465    fn handle_signal(&mut self, signal: &Signal) {
466        log_received(&signal);
467
468        if self.not_running() {
469            log_not_running(&signal);
470            return;
471        }
472
473        if let Err(e) = self.on_signal(signal) {
474            log_error(&e);
475        }
476    }
477
478    /// Handles a received instrument.
479    fn handle_instrument(&mut self, instrument: &InstrumentAny) {
480        log_received(&instrument);
481
482        if self.not_running() {
483            log_not_running(&instrument);
484            return;
485        }
486
487        if let Err(e) = self.on_instrument(instrument) {
488            log_error(&e);
489        }
490    }
491
492    /// Handles received order book deltas.
493    fn handle_book_deltas(&mut self, deltas: &OrderBookDeltas) {
494        log_received(&deltas);
495
496        if self.not_running() {
497            log_not_running(&deltas);
498            return;
499        }
500
501        if let Err(e) = self.on_book_deltas(deltas) {
502            log_error(&e);
503        }
504    }
505
506    /// Handles a received order book reference.
507    fn handle_book(&mut self, book: &OrderBook) {
508        log_received(&book);
509
510        if self.not_running() {
511            log_not_running(&book);
512            return;
513        }
514
515        if let Err(e) = self.on_book(book) {
516            log_error(&e);
517        };
518    }
519
520    /// Handles a received quote.
521    fn handle_quote(&mut self, quote: &QuoteTick) {
522        log_received(&quote);
523
524        if self.not_running() {
525            log_not_running(&quote);
526            return;
527        }
528
529        if let Err(e) = self.on_quote(quote) {
530            log_error(&e);
531        }
532    }
533
534    /// Handles a received trade.
535    fn handle_trade(&mut self, trade: &TradeTick) {
536        log_received(&trade);
537
538        if self.not_running() {
539            log_not_running(&trade);
540            return;
541        }
542
543        if let Err(e) = self.on_trade(trade) {
544            log_error(&e);
545        }
546    }
547
548    /// Handles a receiving bar.
549    fn handle_bar(&mut self, bar: &Bar) {
550        log_received(&bar);
551
552        if self.not_running() {
553            log_not_running(&bar);
554            return;
555        }
556
557        if let Err(e) = self.on_bar(bar) {
558            log_error(&e);
559        }
560    }
561
562    /// Handles a received mark price update.
563    fn handle_mark_price(&mut self, mark_price: &MarkPriceUpdate) {
564        log_received(&mark_price);
565
566        if self.not_running() {
567            log_not_running(&mark_price);
568            return;
569        }
570
571        if let Err(e) = self.on_mark_price(mark_price) {
572            log_error(&e);
573        }
574    }
575
576    /// Handles a received index price update.
577    fn handle_index_price(&mut self, index_price: &IndexPriceUpdate) {
578        log_received(&index_price);
579
580        if self.not_running() {
581            log_not_running(&index_price);
582            return;
583        }
584
585        if let Err(e) = self.on_index_price(index_price) {
586            log_error(&e);
587        }
588    }
589
590    /// Handles a received instrument status.
591    fn handle_instrument_status(&mut self, status: &InstrumentStatus) {
592        log_received(&status);
593
594        if self.not_running() {
595            log_not_running(&status);
596            return;
597        }
598
599        if let Err(e) = self.on_instrument_status(status) {
600            log_error(&e);
601        }
602    }
603
604    /// Handles a received instrument close.
605    fn handle_instrument_close(&mut self, close: &InstrumentClose) {
606        log_received(&close);
607
608        if self.not_running() {
609            log_not_running(&close);
610            return;
611        }
612
613        if let Err(e) = self.on_instrument_close(close) {
614            log_error(&e);
615        }
616    }
617
618    #[cfg(feature = "defi")]
619    /// Handles a received block.
620    fn handle_block(&mut self, block: &Block) {
621        log_received(&block);
622
623        if self.not_running() {
624            log_not_running(&block);
625            return;
626        }
627
628        if let Err(e) = self.on_block(block) {
629            log_error(&e);
630        }
631    }
632
633    #[cfg(feature = "defi")]
634    /// Handles a received pool definition update.
635    fn handle_pool(&mut self, pool: &Pool) {
636        log_received(&pool);
637
638        if self.not_running() {
639            log_not_running(&pool);
640            return;
641        }
642
643        if let Err(e) = self.on_pool(pool) {
644            log_error(&e);
645        }
646    }
647
648    #[cfg(feature = "defi")]
649    /// Handles a received pool swap.
650    fn handle_pool_swap(&mut self, swap: &PoolSwap) {
651        log_received(&swap);
652
653        if self.not_running() {
654            log_not_running(&swap);
655            return;
656        }
657
658        if let Err(e) = self.on_pool_swap(swap) {
659            log_error(&e);
660        }
661    }
662
663    #[cfg(feature = "defi")]
664    /// Handles a received pool liquidity update.
665    fn handle_pool_liquidity_update(&mut self, update: &PoolLiquidityUpdate) {
666        log_received(&update);
667
668        if self.not_running() {
669            log_not_running(&update);
670            return;
671        }
672
673        if let Err(e) = self.on_pool_liquidity_update(update) {
674            log_error(&e);
675        }
676    }
677
678    /// Handles received historical data.
679    fn handle_historical_data(&mut self, data: &dyn Any) {
680        log_received(&data);
681
682        if let Err(e) = self.on_historical_data(data) {
683            log_error(&e);
684        }
685    }
686
687    /// Handles a data response.
688    fn handle_data_response(&mut self, resp: &CustomDataResponse) {
689        log_received(&resp);
690
691        if let Err(e) = self.on_historical_data(resp.data.as_ref()) {
692            log_error(&e);
693        }
694    }
695
696    /// Handles an instrument response.
697    fn handle_instrument_response(&mut self, resp: &InstrumentResponse) {
698        log_received(&resp);
699
700        if let Err(e) = self.on_instrument(&resp.data) {
701            log_error(&e);
702        }
703    }
704
705    /// Handles an instruments response.
706    fn handle_instruments_response(&mut self, resp: &InstrumentsResponse) {
707        log_received(&resp);
708
709        for inst in &resp.data {
710            if let Err(e) = self.on_instrument(inst) {
711                log_error(&e);
712            }
713        }
714    }
715
716    /// Handles a book response.
717    fn handle_book_response(&mut self, resp: &BookResponse) {
718        log_received(&resp);
719
720        if let Err(e) = self.on_book(&resp.data) {
721            log_error(&e);
722        }
723    }
724
725    /// Handles a quotes response.
726    fn handle_quotes_response(&mut self, resp: &QuotesResponse) {
727        log_received(&resp);
728
729        if let Err(e) = self.on_historical_quotes(&resp.data) {
730            log_error(&e);
731        }
732    }
733
734    /// Handles a trades response.
735    fn handle_trades_response(&mut self, resp: &TradesResponse) {
736        log_received(&resp);
737
738        if let Err(e) = self.on_historical_trades(&resp.data) {
739            log_error(&e);
740        }
741    }
742
743    /// Handles a bars response.
744    fn handle_bars_response(&mut self, resp: &BarsResponse) {
745        log_received(&resp);
746
747        if let Err(e) = self.on_historical_bars(&resp.data) {
748            log_error(&e);
749        }
750    }
751
752    /// Subscribe to streaming `data_type` data.
753    fn subscribe_data(
754        &mut self,
755        data_type: DataType,
756        client_id: Option<ClientId>,
757        params: Option<IndexMap<String, String>>,
758    ) where
759        Self: 'static + Debug + Sized,
760    {
761        let actor_id = self.actor_id().inner();
762        let handler = ShareableMessageHandler(Rc::new(TypedMessageHandler::with_any(
763            move |data: &dyn Any| {
764                get_actor_unchecked::<Self>(&actor_id).handle_data(data);
765            },
766        )));
767
768        DataActorCore::subscribe_data(self, handler, data_type, client_id, params);
769    }
770
771    /// Subscribe to streaming [`QuoteTick`] data for the `instrument_id`.
772    fn subscribe_quotes(
773        &mut self,
774        instrument_id: InstrumentId,
775        client_id: Option<ClientId>,
776        params: Option<IndexMap<String, String>>,
777    ) where
778        Self: 'static + Debug + Sized,
779    {
780        let actor_id = self.actor_id().inner();
781        let topic = get_quotes_topic(instrument_id);
782
783        let handler = ShareableMessageHandler(Rc::new(TypedMessageHandler::from(
784            move |quote: &QuoteTick| {
785                if let Some(actor) = try_get_actor_unchecked::<Self>(&actor_id) {
786                    actor.handle_quote(quote);
787                } else {
788                    log::error!("Actor {actor_id} not found for quote handling");
789                }
790            },
791        )));
792
793        DataActorCore::subscribe_quotes(self, topic, handler, instrument_id, client_id, params);
794    }
795
796    /// Subscribe to streaming [`InstrumentAny`] data for the `venue`.
797    fn subscribe_instruments(
798        &mut self,
799        venue: Venue,
800        client_id: Option<ClientId>,
801        params: Option<IndexMap<String, String>>,
802    ) where
803        Self: 'static + Debug + Sized,
804    {
805        let actor_id = self.actor_id().inner();
806        let topic = get_instruments_topic(venue);
807
808        let handler = ShareableMessageHandler(Rc::new(TypedMessageHandler::from(
809            move |instrument: &InstrumentAny| {
810                if let Some(actor) = try_get_actor_unchecked::<Self>(&actor_id) {
811                    actor.handle_instrument(instrument);
812                } else {
813                    log::error!("Actor {actor_id} not found for instruments handling");
814                }
815            },
816        )));
817
818        DataActorCore::subscribe_instruments(self, topic, handler, venue, client_id, params);
819    }
820
821    /// Subscribe to streaming [`InstrumentAny`] data for the `instrument_id`.
822    fn subscribe_instrument(
823        &mut self,
824        instrument_id: InstrumentId,
825        client_id: Option<ClientId>,
826        params: Option<IndexMap<String, String>>,
827    ) where
828        Self: 'static + Debug + Sized,
829    {
830        let actor_id = self.actor_id().inner();
831        let topic = get_instrument_topic(instrument_id);
832
833        let handler = ShareableMessageHandler(Rc::new(TypedMessageHandler::from(
834            move |instrument: &InstrumentAny| {
835                if let Some(actor) = try_get_actor_unchecked::<Self>(&actor_id) {
836                    actor.handle_instrument(instrument);
837                } else {
838                    log::error!("Actor {actor_id} not found for instrument handling");
839                }
840            },
841        )));
842
843        DataActorCore::subscribe_instrument(self, topic, handler, instrument_id, client_id, params);
844    }
845
846    /// Subscribe to streaming [`OrderBookDeltas`] data for the `instrument_id`.
847    fn subscribe_book_deltas(
848        &mut self,
849        instrument_id: InstrumentId,
850        book_type: BookType,
851        depth: Option<NonZeroUsize>,
852        client_id: Option<ClientId>,
853        managed: bool,
854        params: Option<IndexMap<String, String>>,
855    ) where
856        Self: 'static + Debug + Sized,
857    {
858        let actor_id = self.actor_id().inner();
859        let topic = get_book_deltas_topic(instrument_id);
860
861        let handler = ShareableMessageHandler(Rc::new(TypedMessageHandler::from(
862            move |deltas: &OrderBookDeltas| {
863                get_actor_unchecked::<Self>(&actor_id).handle_book_deltas(deltas);
864            },
865        )));
866
867        DataActorCore::subscribe_book_deltas(
868            self,
869            topic,
870            handler,
871            instrument_id,
872            book_type,
873            depth,
874            client_id,
875            managed,
876            params,
877        );
878    }
879
880    /// Subscribe to [`OrderBook`] snapshots at a specified interval for the `instrument_id`.
881    fn subscribe_book_at_interval(
882        &mut self,
883        instrument_id: InstrumentId,
884        book_type: BookType,
885        depth: Option<NonZeroUsize>,
886        interval_ms: NonZeroUsize,
887        client_id: Option<ClientId>,
888        params: Option<IndexMap<String, String>>,
889    ) where
890        Self: 'static + Debug + Sized,
891    {
892        let actor_id = self.actor_id().inner();
893        let topic = get_book_snapshots_topic(instrument_id);
894
895        let handler = ShareableMessageHandler(Rc::new(TypedMessageHandler::from(
896            move |book: &OrderBook| {
897                get_actor_unchecked::<Self>(&actor_id).handle_book(book);
898            },
899        )));
900
901        DataActorCore::subscribe_book_at_interval(
902            self,
903            topic,
904            handler,
905            instrument_id,
906            book_type,
907            depth,
908            interval_ms,
909            client_id,
910            params,
911        );
912    }
913
914    /// Subscribe to streaming [`TradeTick`] data for the `instrument_id`.
915    fn subscribe_trades(
916        &mut self,
917        instrument_id: InstrumentId,
918        client_id: Option<ClientId>,
919        params: Option<IndexMap<String, String>>,
920    ) where
921        Self: 'static + Debug + Sized,
922    {
923        let actor_id = self.actor_id().inner();
924        let topic = get_trades_topic(instrument_id);
925
926        let handler = ShareableMessageHandler(Rc::new(TypedMessageHandler::from(
927            move |trade: &TradeTick| {
928                get_actor_unchecked::<Self>(&actor_id).handle_trade(trade);
929            },
930        )));
931
932        DataActorCore::subscribe_trades(self, topic, handler, instrument_id, client_id, params);
933    }
934
935    /// Subscribe to streaming [`Bar`] data for the `bar_type`.
936    fn subscribe_bars(
937        &mut self,
938        bar_type: BarType,
939        client_id: Option<ClientId>,
940        await_partial: bool,
941        params: Option<IndexMap<String, String>>,
942    ) where
943        Self: 'static + Debug + Sized,
944    {
945        let actor_id = self.actor_id().inner();
946        let topic = get_bars_topic(bar_type);
947
948        let handler =
949            ShareableMessageHandler(Rc::new(TypedMessageHandler::from(move |bar: &Bar| {
950                get_actor_unchecked::<Self>(&actor_id).handle_bar(bar);
951            })));
952
953        DataActorCore::subscribe_bars(
954            self,
955            topic,
956            handler,
957            bar_type,
958            client_id,
959            await_partial,
960            params,
961        );
962    }
963
964    /// Subscribe to streaming [`MarkPriceUpdate`] data for the `instrument_id`.
965    fn subscribe_mark_prices(
966        &mut self,
967        instrument_id: InstrumentId,
968        client_id: Option<ClientId>,
969        params: Option<IndexMap<String, String>>,
970    ) where
971        Self: 'static + Debug + Sized,
972    {
973        let actor_id = self.actor_id().inner();
974        let topic = get_mark_price_topic(instrument_id);
975
976        let handler = ShareableMessageHandler(Rc::new(TypedMessageHandler::from(
977            move |mark_price: &MarkPriceUpdate| {
978                get_actor_unchecked::<Self>(&actor_id).handle_mark_price(mark_price);
979            },
980        )));
981
982        DataActorCore::subscribe_mark_prices(
983            self,
984            topic,
985            handler,
986            instrument_id,
987            client_id,
988            params,
989        );
990    }
991
992    /// Subscribe to streaming [`IndexPriceUpdate`] data for the `instrument_id`.
993    fn subscribe_index_prices(
994        &mut self,
995        instrument_id: InstrumentId,
996        client_id: Option<ClientId>,
997        params: Option<IndexMap<String, String>>,
998    ) where
999        Self: 'static + Debug + Sized,
1000    {
1001        let actor_id = self.actor_id().inner();
1002        let topic = get_index_price_topic(instrument_id);
1003
1004        let handler = ShareableMessageHandler(Rc::new(TypedMessageHandler::from(
1005            move |index_price: &IndexPriceUpdate| {
1006                get_actor_unchecked::<Self>(&actor_id).handle_index_price(index_price);
1007            },
1008        )));
1009
1010        DataActorCore::subscribe_index_prices(
1011            self,
1012            topic,
1013            handler,
1014            instrument_id,
1015            client_id,
1016            params,
1017        );
1018    }
1019
1020    /// Subscribe to streaming [`InstrumentStatus`] data for the `instrument_id`.
1021    fn subscribe_instrument_status(
1022        &mut self,
1023        instrument_id: InstrumentId,
1024        client_id: Option<ClientId>,
1025        params: Option<IndexMap<String, String>>,
1026    ) where
1027        Self: 'static + Debug + Sized,
1028    {
1029        let actor_id = self.actor_id().inner();
1030        let topic = get_instrument_status_topic(instrument_id);
1031
1032        let handler = ShareableMessageHandler(Rc::new(TypedMessageHandler::from(
1033            move |status: &InstrumentStatus| {
1034                get_actor_unchecked::<Self>(&actor_id).handle_instrument_status(status);
1035            },
1036        )));
1037
1038        DataActorCore::subscribe_instrument_status(
1039            self,
1040            topic,
1041            handler,
1042            instrument_id,
1043            client_id,
1044            params,
1045        );
1046    }
1047
1048    /// Subscribe to streaming [`InstrumentClose`] data for the `instrument_id`.
1049    fn subscribe_instrument_close(
1050        &mut self,
1051        instrument_id: InstrumentId,
1052        client_id: Option<ClientId>,
1053        params: Option<IndexMap<String, String>>,
1054    ) where
1055        Self: 'static + Debug + Sized,
1056    {
1057        let actor_id = self.actor_id().inner();
1058        let topic = get_instrument_close_topic(instrument_id);
1059
1060        let handler = ShareableMessageHandler(Rc::new(TypedMessageHandler::from(
1061            move |close: &InstrumentClose| {
1062                get_actor_unchecked::<Self>(&actor_id).handle_instrument_close(close);
1063            },
1064        )));
1065
1066        DataActorCore::subscribe_instrument_close(
1067            self,
1068            topic,
1069            handler,
1070            instrument_id,
1071            client_id,
1072            params,
1073        );
1074    }
1075
1076    #[cfg(feature = "defi")]
1077    /// Subscribe to streaming [`Block`] data for the `chain`.
1078    fn subscribe_blocks(
1079        &mut self,
1080        chain: Blockchain,
1081        client_id: Option<ClientId>,
1082        params: Option<IndexMap<String, String>>,
1083    ) where
1084        Self: 'static + Debug + Sized,
1085    {
1086        let actor_id = self.actor_id().inner();
1087        let topic = get_defi_blocks_topic(chain);
1088
1089        let handler =
1090            ShareableMessageHandler(Rc::new(TypedMessageHandler::from(move |block: &Block| {
1091                get_actor_unchecked::<Self>(&actor_id).handle_block(block);
1092            })));
1093
1094        DataActorCore::subscribe_blocks(self, topic, handler, chain, client_id, params);
1095    }
1096
1097    #[cfg(feature = "defi")]
1098    /// Subscribe to streaming [`Pool`] definition updates for the AMM pool at the `address`.
1099    fn subscribe_pool(
1100        &mut self,
1101        address: Address,
1102        client_id: Option<ClientId>,
1103        params: Option<IndexMap<String, String>>,
1104    ) where
1105        Self: 'static + Debug + Sized,
1106    {
1107        let actor_id = self.actor_id().inner();
1108        let topic = get_defi_pool_topic(address);
1109
1110        let handler =
1111            ShareableMessageHandler(Rc::new(TypedMessageHandler::from(move |pool: &Pool| {
1112                get_actor_unchecked::<Self>(&actor_id).handle_pool(pool);
1113            })));
1114
1115        DataActorCore::subscribe_pool(self, topic, handler, address, client_id, params);
1116    }
1117
1118    #[cfg(feature = "defi")]
1119    /// Subscribe to streaming [`PoolSwap`] data for the `address`.
1120    fn subscribe_pool_swaps(
1121        &mut self,
1122        address: Address,
1123        client_id: Option<ClientId>,
1124        params: Option<IndexMap<String, String>>,
1125    ) where
1126        Self: 'static + Debug + Sized,
1127    {
1128        let actor_id = self.actor_id().inner();
1129        let topic = get_defi_pool_swaps_topic(address);
1130
1131        let handler = ShareableMessageHandler(Rc::new(TypedMessageHandler::from(
1132            move |swap: &PoolSwap| {
1133                get_actor_unchecked::<Self>(&actor_id).handle_pool_swap(swap);
1134            },
1135        )));
1136
1137        DataActorCore::subscribe_pool_swaps(self, topic, handler, address, client_id, params);
1138    }
1139
1140    #[cfg(feature = "defi")]
1141    /// Subscribe to streaming [`PoolLiquidityUpdate`] data for the `address`.
1142    fn subscribe_pool_liquidity_updates(
1143        &mut self,
1144        address: Address,
1145        client_id: Option<ClientId>,
1146        params: Option<IndexMap<String, String>>,
1147    ) where
1148        Self: 'static + Debug + Sized,
1149    {
1150        let actor_id = self.actor_id().inner();
1151        let topic = get_defi_liquidity_topic(address);
1152
1153        let handler = ShareableMessageHandler(Rc::new(TypedMessageHandler::from(
1154            move |update: &PoolLiquidityUpdate| {
1155                get_actor_unchecked::<Self>(&actor_id).handle_pool_liquidity_update(update);
1156            },
1157        )));
1158
1159        DataActorCore::subscribe_pool_liquidity_updates(
1160            self, topic, handler, address, client_id, params,
1161        );
1162    }
1163
1164    /// Unsubscribe from streaming `data_type` data.
1165    fn unsubscribe_data(
1166        &mut self,
1167        data_type: DataType,
1168        client_id: Option<ClientId>,
1169        params: Option<IndexMap<String, String>>,
1170    ) where
1171        Self: 'static + Debug + Sized,
1172    {
1173        DataActorCore::unsubscribe_data(self, data_type, client_id, params);
1174    }
1175
1176    /// Unsubscribe from streaming [`InstrumentAny`] data for the `venue`.
1177    fn unsubscribe_instruments(
1178        &mut self,
1179        venue: Venue,
1180        client_id: Option<ClientId>,
1181        params: Option<IndexMap<String, String>>,
1182    ) where
1183        Self: 'static + Debug + Sized,
1184    {
1185        DataActorCore::unsubscribe_instruments(self, venue, client_id, params);
1186    }
1187
1188    /// Unsubscribe from streaming [`InstrumentAny`] data for the `instrument_id`.
1189    fn unsubscribe_instrument(
1190        &mut self,
1191        instrument_id: InstrumentId,
1192        client_id: Option<ClientId>,
1193        params: Option<IndexMap<String, String>>,
1194    ) where
1195        Self: 'static + Debug + Sized,
1196    {
1197        DataActorCore::unsubscribe_instrument(self, instrument_id, client_id, params);
1198    }
1199
1200    /// Unsubscribe from streaming [`OrderBookDeltas`] data for the `instrument_id`.
1201    fn unsubscribe_book_deltas(
1202        &mut self,
1203        instrument_id: InstrumentId,
1204        client_id: Option<ClientId>,
1205        params: Option<IndexMap<String, String>>,
1206    ) where
1207        Self: 'static + Debug + Sized,
1208    {
1209        DataActorCore::unsubscribe_book_deltas(self, instrument_id, client_id, params);
1210    }
1211
1212    /// Unsubscribe from [`OrderBook`] snapshots at a specified interval for the `instrument_id`.
1213    fn unsubscribe_book_at_interval(
1214        &mut self,
1215        instrument_id: InstrumentId,
1216        interval_ms: NonZeroUsize,
1217        client_id: Option<ClientId>,
1218        params: Option<IndexMap<String, String>>,
1219    ) where
1220        Self: 'static + Debug + Sized,
1221    {
1222        DataActorCore::unsubscribe_book_at_interval(
1223            self,
1224            instrument_id,
1225            interval_ms,
1226            client_id,
1227            params,
1228        );
1229    }
1230
1231    /// Unsubscribe from streaming [`QuoteTick`] data for the `instrument_id`.
1232    fn unsubscribe_quotes(
1233        &mut self,
1234        instrument_id: InstrumentId,
1235        client_id: Option<ClientId>,
1236        params: Option<IndexMap<String, String>>,
1237    ) where
1238        Self: 'static + Debug + Sized,
1239    {
1240        DataActorCore::unsubscribe_quotes(self, instrument_id, client_id, params);
1241    }
1242
1243    /// Unsubscribe from streaming [`TradeTick`] data for the `instrument_id`.
1244    fn unsubscribe_trades(
1245        &mut self,
1246        instrument_id: InstrumentId,
1247        client_id: Option<ClientId>,
1248        params: Option<IndexMap<String, String>>,
1249    ) where
1250        Self: 'static + Debug + Sized,
1251    {
1252        DataActorCore::unsubscribe_trades(self, instrument_id, client_id, params);
1253    }
1254
1255    /// Unsubscribe from streaming [`Bar`] data for the `bar_type`.
1256    fn unsubscribe_bars(
1257        &mut self,
1258        bar_type: BarType,
1259        client_id: Option<ClientId>,
1260        params: Option<IndexMap<String, String>>,
1261    ) where
1262        Self: 'static + Debug + Sized,
1263    {
1264        DataActorCore::unsubscribe_bars(self, bar_type, client_id, params);
1265    }
1266
1267    /// Unsubscribe from streaming [`MarkPriceUpdate`] data for the `instrument_id`.
1268    fn unsubscribe_mark_prices(
1269        &mut self,
1270        instrument_id: InstrumentId,
1271        client_id: Option<ClientId>,
1272        params: Option<IndexMap<String, String>>,
1273    ) where
1274        Self: 'static + Debug + Sized,
1275    {
1276        DataActorCore::unsubscribe_mark_prices(self, instrument_id, client_id, params);
1277    }
1278
1279    /// Unsubscribe from streaming [`IndexPriceUpdate`] data for the `instrument_id`.
1280    fn unsubscribe_index_prices(
1281        &mut self,
1282        instrument_id: InstrumentId,
1283        client_id: Option<ClientId>,
1284        params: Option<IndexMap<String, String>>,
1285    ) where
1286        Self: 'static + Debug + Sized,
1287    {
1288        DataActorCore::unsubscribe_index_prices(self, instrument_id, client_id, params);
1289    }
1290
1291    /// Unsubscribe from streaming [`InstrumentStatus`] data for the `instrument_id`.
1292    fn unsubscribe_instrument_status(
1293        &mut self,
1294        instrument_id: InstrumentId,
1295        client_id: Option<ClientId>,
1296        params: Option<IndexMap<String, String>>,
1297    ) where
1298        Self: 'static + Debug + Sized,
1299    {
1300        DataActorCore::unsubscribe_instrument_status(self, instrument_id, client_id, params);
1301    }
1302
1303    /// Unsubscribe from streaming [`InstrumentClose`] data for the `instrument_id`.
1304    fn unsubscribe_instrument_close(
1305        &mut self,
1306        instrument_id: InstrumentId,
1307        client_id: Option<ClientId>,
1308        params: Option<IndexMap<String, String>>,
1309    ) where
1310        Self: 'static + Debug + Sized,
1311    {
1312        DataActorCore::unsubscribe_instrument_close(self, instrument_id, client_id, params);
1313    }
1314
1315    #[cfg(feature = "defi")]
1316    /// Unsubscribe from streaming [`Block`] data for the `chain`.
1317    fn unsubscribe_blocks(
1318        &mut self,
1319        chain: Blockchain,
1320        client_id: Option<ClientId>,
1321        params: Option<IndexMap<String, String>>,
1322    ) where
1323        Self: 'static + Debug + Sized,
1324    {
1325        DataActorCore::unsubscribe_blocks(self, chain, client_id, params);
1326    }
1327
1328    #[cfg(feature = "defi")]
1329    /// Unsubscribe from streaming [`Pool`] definition updates for the AMM pool at the `address`.
1330    fn unsubscribe_pool(
1331        &mut self,
1332        address: Address,
1333        client_id: Option<ClientId>,
1334        params: Option<IndexMap<String, String>>,
1335    ) where
1336        Self: 'static + Debug + Sized,
1337    {
1338        DataActorCore::unsubscribe_pool(self, address, client_id, params);
1339    }
1340
1341    #[cfg(feature = "defi")]
1342    /// Unsubscribe from streaming [`PoolSwap`] data for the `address`.
1343    fn unsubscribe_pool_swaps(
1344        &mut self,
1345        address: Address,
1346        client_id: Option<ClientId>,
1347        params: Option<IndexMap<String, String>>,
1348    ) where
1349        Self: 'static + Debug + Sized,
1350    {
1351        DataActorCore::unsubscribe_pool_swaps(self, address, client_id, params);
1352    }
1353
1354    #[cfg(feature = "defi")]
1355    /// Unsubscribe from streaming [`PoolLiquidityUpdate`] data for the `address`.
1356    fn unsubscribe_pool_liquidity_updates(
1357        &mut self,
1358        address: Address,
1359        client_id: Option<ClientId>,
1360        params: Option<IndexMap<String, String>>,
1361    ) where
1362        Self: 'static + Debug + Sized,
1363    {
1364        DataActorCore::unsubscribe_pool_liquidity_updates(self, address, client_id, params);
1365    }
1366
1367    /// Request historical custom data of the given `data_type`.
1368    ///
1369    /// # Errors
1370    ///
1371    /// Returns an error if input parameters are invalid.
1372    fn request_data(
1373        &mut self,
1374        data_type: DataType,
1375        client_id: ClientId,
1376        start: Option<DateTime<Utc>>,
1377        end: Option<DateTime<Utc>>,
1378        limit: Option<NonZeroUsize>,
1379        params: Option<IndexMap<String, String>>,
1380    ) -> anyhow::Result<UUID4>
1381    where
1382        Self: 'static + Debug + Sized,
1383    {
1384        let actor_id = self.actor_id().inner();
1385        let handler = ShareableMessageHandler(Rc::new(TypedMessageHandler::from(
1386            move |resp: &CustomDataResponse| {
1387                get_actor_unchecked::<Self>(&actor_id).handle_data_response(resp);
1388            },
1389        )));
1390
1391        DataActorCore::request_data(
1392            self, data_type, client_id, start, end, limit, params, handler,
1393        )
1394    }
1395
1396    /// Request historical [`InstrumentResponse`] data for the given `instrument_id`.
1397    ///
1398    /// # Errors
1399    ///
1400    /// Returns an error if input parameters are invalid.
1401    fn request_instrument(
1402        &mut self,
1403        instrument_id: InstrumentId,
1404        start: Option<DateTime<Utc>>,
1405        end: Option<DateTime<Utc>>,
1406        client_id: Option<ClientId>,
1407        params: Option<IndexMap<String, String>>,
1408    ) -> anyhow::Result<UUID4>
1409    where
1410        Self: 'static + Debug + Sized,
1411    {
1412        let actor_id = self.actor_id().inner();
1413        let handler = ShareableMessageHandler(Rc::new(TypedMessageHandler::from(
1414            move |resp: &InstrumentResponse| {
1415                get_actor_unchecked::<Self>(&actor_id).handle_instrument_response(resp);
1416            },
1417        )));
1418
1419        DataActorCore::request_instrument(
1420            self,
1421            instrument_id,
1422            start,
1423            end,
1424            client_id,
1425            params,
1426            handler,
1427        )
1428    }
1429
1430    /// Request historical [`InstrumentsResponse`] definitions for the optional `venue`.
1431    ///
1432    /// # Errors
1433    ///
1434    /// Returns an error if input parameters are invalid.
1435    fn request_instruments(
1436        &mut self,
1437        venue: Option<Venue>,
1438        start: Option<DateTime<Utc>>,
1439        end: Option<DateTime<Utc>>,
1440        client_id: Option<ClientId>,
1441        params: Option<IndexMap<String, String>>,
1442    ) -> anyhow::Result<UUID4>
1443    where
1444        Self: 'static + Debug + Sized,
1445    {
1446        let actor_id = self.actor_id().inner();
1447        let handler = ShareableMessageHandler(Rc::new(TypedMessageHandler::from(
1448            move |resp: &InstrumentsResponse| {
1449                get_actor_unchecked::<Self>(&actor_id).handle_instruments_response(resp);
1450            },
1451        )));
1452
1453        DataActorCore::request_instruments(self, venue, start, end, client_id, params, handler)
1454    }
1455
1456    /// Request an [`OrderBook`] snapshot for the given `instrument_id`.
1457    ///
1458    /// # Errors
1459    ///
1460    /// Returns an error if input parameters are invalid.
1461    fn request_book_snapshot(
1462        &mut self,
1463        instrument_id: InstrumentId,
1464        depth: Option<NonZeroUsize>,
1465        client_id: Option<ClientId>,
1466        params: Option<IndexMap<String, String>>,
1467    ) -> anyhow::Result<UUID4>
1468    where
1469        Self: 'static + Debug + Sized,
1470    {
1471        let actor_id = self.actor_id().inner();
1472        let handler = ShareableMessageHandler(Rc::new(TypedMessageHandler::from(
1473            move |resp: &BookResponse| {
1474                get_actor_unchecked::<Self>(&actor_id).handle_book_response(resp);
1475            },
1476        )));
1477
1478        DataActorCore::request_book_snapshot(self, instrument_id, depth, client_id, params, handler)
1479    }
1480
1481    /// Request historical [`QuoteTick`] data for the given `instrument_id`.
1482    ///
1483    /// # Errors
1484    ///
1485    /// Returns an error if input parameters are invalid.
1486    fn request_quotes(
1487        &mut self,
1488        instrument_id: InstrumentId,
1489        start: Option<DateTime<Utc>>,
1490        end: Option<DateTime<Utc>>,
1491        limit: Option<NonZeroUsize>,
1492        client_id: Option<ClientId>,
1493        params: Option<IndexMap<String, String>>,
1494    ) -> anyhow::Result<UUID4>
1495    where
1496        Self: 'static + Debug + Sized,
1497    {
1498        let actor_id = self.actor_id().inner();
1499        let handler = ShareableMessageHandler(Rc::new(TypedMessageHandler::from(
1500            move |resp: &QuotesResponse| {
1501                get_actor_unchecked::<Self>(&actor_id).handle_quotes_response(resp);
1502            },
1503        )));
1504
1505        DataActorCore::request_quotes(
1506            self,
1507            instrument_id,
1508            start,
1509            end,
1510            limit,
1511            client_id,
1512            params,
1513            handler,
1514        )
1515    }
1516
1517    /// Request historical [`TradeTick`] data for the given `instrument_id`.
1518    ///
1519    /// # Errors
1520    ///
1521    /// Returns an error if input parameters are invalid.
1522    fn request_trades(
1523        &mut self,
1524        instrument_id: InstrumentId,
1525        start: Option<DateTime<Utc>>,
1526        end: Option<DateTime<Utc>>,
1527        limit: Option<NonZeroUsize>,
1528        client_id: Option<ClientId>,
1529        params: Option<IndexMap<String, String>>,
1530    ) -> anyhow::Result<UUID4>
1531    where
1532        Self: 'static + Debug + Sized,
1533    {
1534        let actor_id = self.actor_id().inner();
1535        let handler = ShareableMessageHandler(Rc::new(TypedMessageHandler::from(
1536            move |resp: &TradesResponse| {
1537                get_actor_unchecked::<Self>(&actor_id).handle_trades_response(resp);
1538            },
1539        )));
1540
1541        DataActorCore::request_trades(
1542            self,
1543            instrument_id,
1544            start,
1545            end,
1546            limit,
1547            client_id,
1548            params,
1549            handler,
1550        )
1551    }
1552
1553    /// Request historical [`Bar`] data for the given `bar_type`.
1554    ///
1555    /// # Errors
1556    ///
1557    /// Returns an error if input parameters are invalid.
1558    fn request_bars(
1559        &mut self,
1560        bar_type: BarType,
1561        start: Option<DateTime<Utc>>,
1562        end: Option<DateTime<Utc>>,
1563        limit: Option<NonZeroUsize>,
1564        client_id: Option<ClientId>,
1565        params: Option<IndexMap<String, String>>,
1566    ) -> anyhow::Result<UUID4>
1567    where
1568        Self: 'static + Debug + Sized,
1569    {
1570        let actor_id = self.actor_id().inner();
1571        let handler = ShareableMessageHandler(Rc::new(TypedMessageHandler::from(
1572            move |resp: &BarsResponse| {
1573                get_actor_unchecked::<Self>(&actor_id).handle_bars_response(resp);
1574            },
1575        )));
1576
1577        DataActorCore::request_bars(
1578            self, bar_type, start, end, limit, client_id, params, handler,
1579        )
1580    }
1581}
1582
1583// Blanket implementation: any DataActor automatically implements Actor
1584impl<T> Actor for T
1585where
1586    T: DataActor + Debug + 'static,
1587{
1588    fn id(&self) -> Ustr {
1589        self.actor_id.inner()
1590    }
1591
1592    fn handle(&mut self, msg: &dyn Any) {
1593        // Default empty implementation - concrete actors can override if needed
1594    }
1595
1596    fn as_any(&self) -> &dyn Any {
1597        self
1598    }
1599}
1600
1601// Blanket implementation: any DataActor automatically implements Component
1602impl<T> Component for T
1603where
1604    T: DataActor + Debug + 'static,
1605{
1606    fn component_id(&self) -> ComponentId {
1607        ComponentId::new(self.actor_id.inner().as_str())
1608    }
1609
1610    fn state(&self) -> ComponentState {
1611        self.state
1612    }
1613
1614    fn transition_state(&mut self, trigger: ComponentTrigger) -> anyhow::Result<()> {
1615        self.state = self.state.transition(&trigger)?;
1616        log::info!("{}", self.state);
1617        Ok(())
1618    }
1619
1620    fn register(
1621        &mut self,
1622        trader_id: TraderId,
1623        clock: Rc<RefCell<dyn Clock>>,
1624        cache: Rc<RefCell<Cache>>,
1625    ) -> anyhow::Result<()> {
1626        DataActorCore::register(self, trader_id, clock.clone(), cache)?;
1627
1628        // Register default time event handler for this actor
1629        let actor_id = self.actor_id().inner();
1630        let callback = TimeEventCallback::Rust(Rc::new(move |event: TimeEvent| {
1631            if let Some(actor) = try_get_actor_unchecked::<Self>(&actor_id) {
1632                actor.handle_time_event(&event);
1633            } else {
1634                log::error!("Actor {actor_id} not found for time event handling");
1635            }
1636        }));
1637
1638        clock.borrow_mut().register_default_handler(callback);
1639
1640        self.initialize()
1641    }
1642
1643    fn on_start(&mut self) -> anyhow::Result<()> {
1644        DataActor::on_start(self)
1645    }
1646
1647    fn on_stop(&mut self) -> anyhow::Result<()> {
1648        DataActor::on_stop(self)
1649    }
1650
1651    fn on_resume(&mut self) -> anyhow::Result<()> {
1652        DataActor::on_resume(self)
1653    }
1654
1655    fn on_degrade(&mut self) -> anyhow::Result<()> {
1656        DataActor::on_degrade(self)
1657    }
1658
1659    fn on_fault(&mut self) -> anyhow::Result<()> {
1660        DataActor::on_fault(self)
1661    }
1662
1663    fn on_reset(&mut self) -> anyhow::Result<()> {
1664        DataActor::on_reset(self)
1665    }
1666
1667    fn on_dispose(&mut self) -> anyhow::Result<()> {
1668        DataActor::on_dispose(self)
1669    }
1670}
1671
1672/// Core functionality for all actors.
1673pub struct DataActorCore {
1674    /// The actor identifier.
1675    pub actor_id: ActorId,
1676    /// The actors configuration.
1677    pub config: DataActorConfig,
1678    trader_id: Option<TraderId>,
1679    clock: Option<Rc<RefCell<dyn Clock>>>, // Wired up on registration
1680    cache: Option<Rc<RefCell<Cache>>>,     // Wired up on registration
1681    state: ComponentState,
1682    warning_events: AHashSet<String>, // TODO: TBD
1683    pending_requests: AHashMap<UUID4, Option<RequestCallback>>,
1684    signal_classes: AHashMap<String, String>,
1685    #[cfg(feature = "indicators")]
1686    indicators: Indicators,
1687    topic_handlers: AHashMap<MStr<Topic>, ShareableMessageHandler>,
1688}
1689
1690impl Debug for DataActorCore {
1691    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1692        f.debug_struct(stringify!(DataActorCore))
1693            .field("actor_id", &self.actor_id)
1694            .field("config", &self.config)
1695            .field("state", &self.state)
1696            .field("trader_id", &self.trader_id)
1697            .finish()
1698    }
1699}
1700
1701impl DataActorCore {
1702    /// Creates a new [`DataActorCore`] instance.
1703    pub fn new(config: DataActorConfig) -> Self {
1704        let actor_id = config
1705            .actor_id
1706            .unwrap_or_else(|| Self::default_actor_id(&config));
1707
1708        Self {
1709            actor_id,
1710            config,
1711            trader_id: None, // None until registered
1712            clock: None,     // None until registered
1713            cache: None,     // None until registered
1714            state: ComponentState::default(),
1715            warning_events: AHashSet::new(),
1716            pending_requests: AHashMap::new(),
1717            signal_classes: AHashMap::new(),
1718            #[cfg(feature = "indicators")]
1719            indicators: Indicators::default(),
1720            topic_handlers: AHashMap::new(),
1721        }
1722    }
1723
1724    /// Returns the trader ID this actor is registered to.
1725    pub fn trader_id(&self) -> Option<TraderId> {
1726        self.trader_id
1727    }
1728
1729    pub fn actor_id(&self) -> ActorId {
1730        self.actor_id
1731    }
1732
1733    fn default_actor_id(config: &DataActorConfig) -> ActorId {
1734        let memory_address = std::ptr::from_ref(config) as *const _ as usize;
1735        ActorId::from(format!("{}-{memory_address}", stringify!(DataActor)))
1736    }
1737
1738    pub fn timestamp_ns(&self) -> UnixNanos {
1739        self.clock_ref().timestamp_ns()
1740    }
1741
1742    /// Returns the clock for the actor (if registered).
1743    ///
1744    /// # Panics
1745    ///
1746    /// Panics if the actor has not been registered with a trader.
1747    pub fn clock(&mut self) -> RefMut<'_, dyn Clock> {
1748        self.clock
1749            .as_ref()
1750            .unwrap_or_else(|| {
1751                panic!(
1752                    "DataActor {} must be registered before calling `clock()` - trader_id: {:?}",
1753                    self.actor_id, self.trader_id
1754                )
1755            })
1756            .borrow_mut()
1757    }
1758
1759    fn clock_ref(&self) -> Ref<'_, dyn Clock> {
1760        self.clock
1761            .as_ref()
1762            .unwrap_or_else(|| {
1763                panic!(
1764                    "DataActor {} must be registered before calling `clock_ref()` - trader_id: {:?}",
1765                    self.actor_id, self.trader_id
1766                )
1767            })
1768            .borrow()
1769    }
1770
1771    // -- REGISTRATION ----------------------------------------------------------------------------
1772
1773    /// Register the data actor with a trader.
1774    ///
1775    /// # Errors
1776    ///
1777    /// Returns an error if the actor has already been registered with a trader
1778    /// or if the provided dependencies are invalid.
1779    pub fn register(
1780        &mut self,
1781        trader_id: TraderId,
1782        clock: Rc<RefCell<dyn Clock>>,
1783        cache: Rc<RefCell<Cache>>,
1784    ) -> anyhow::Result<()> {
1785        if let Some(existing_trader_id) = self.trader_id {
1786            anyhow::bail!(
1787                "DataActor {} already registered with trader {existing_trader_id}",
1788                self.actor_id
1789            );
1790        }
1791
1792        // Validate clock by attempting to access it
1793        {
1794            let _timestamp = clock.borrow().timestamp_ns();
1795        }
1796
1797        // Validate cache by attempting to access it
1798        {
1799            let _cache_borrow = cache.borrow();
1800        }
1801
1802        self.trader_id = Some(trader_id);
1803        self.clock = Some(clock);
1804        self.cache = Some(cache);
1805
1806        // Verify complete registration
1807        if !self.is_properly_registered() {
1808            anyhow::bail!(
1809                "DataActor {} registration incomplete - validation failed",
1810                self.actor_id
1811            );
1812        }
1813
1814        log::info!("Registered {} with trader {trader_id}", self.actor_id);
1815        Ok(())
1816    }
1817
1818    /// Register an event type for warning log levels.
1819    pub fn register_warning_event(&mut self, event_type: &str) {
1820        self.warning_events.insert(event_type.to_string());
1821        log::debug!("Registered event type '{event_type}' for warning logs")
1822    }
1823
1824    /// Deregister an event type from warning log levels.
1825    pub fn deregister_warning_event(&mut self, event_type: &str) {
1826        self.warning_events.remove(event_type);
1827        log::debug!("Deregistered event type '{event_type}' from warning logs")
1828    }
1829
1830    fn check_registered(&self) {
1831        assert!(
1832            self.trader_id.is_some(),
1833            "Actor has not been registered with a Trader"
1834        );
1835    }
1836
1837    /// Validates registration state without panicking.
1838    fn is_properly_registered(&self) -> bool {
1839        self.trader_id.is_some() && self.clock.is_some() && self.cache.is_some()
1840    }
1841
1842    fn send_data_cmd(&self, command: DataCommand) {
1843        if self.config.log_commands {
1844            log::info!("{CMD}{SEND} {command:?}");
1845        }
1846
1847        let endpoint = MessagingSwitchboard::data_engine_queue_execute();
1848        msgbus::send_any(endpoint, command.as_any())
1849    }
1850
1851    fn send_data_req(&self, request: RequestCommand) {
1852        if self.config.log_commands {
1853            log::info!("{REQ}{SEND} {request:?}");
1854        }
1855
1856        // For now, simplified approach - data requests without dynamic handlers
1857        // TODO: Implement proper dynamic dispatch for response handlers
1858        let endpoint = MessagingSwitchboard::data_engine_queue_execute();
1859        msgbus::send_any(endpoint, request.as_any())
1860    }
1861
1862    /// Sends a shutdown command to the system with an optional reason.
1863    ///
1864    /// # Panics
1865    ///
1866    /// Panics if the actor is not registered or has no trader ID.
1867    pub fn shutdown_system(&self, reason: Option<String>) {
1868        self.check_registered();
1869
1870        // SAFETY: Checked registered before unwrapping trader ID
1871        let command = ShutdownSystem::new(
1872            self.trader_id().unwrap(),
1873            self.actor_id.inner(),
1874            reason,
1875            UUID4::new(),
1876            self.timestamp_ns(),
1877        );
1878
1879        let endpoint = "command.system.shutdown".into();
1880        msgbus::send_any(endpoint, command.as_any());
1881    }
1882
1883    // -- SUBSCRIPTIONS ---------------------------------------------------------------------------
1884
1885    fn get_or_create_handler_for_topic<F>(
1886        &mut self,
1887        topic: MStr<Topic>,
1888        create_handler: F,
1889    ) -> ShareableMessageHandler
1890    where
1891        F: FnOnce() -> ShareableMessageHandler,
1892    {
1893        if let Some(existing_handler) = self.topic_handlers.get(&topic) {
1894            existing_handler.clone()
1895        } else {
1896            let new_handler = create_handler();
1897            self.topic_handlers.insert(topic, new_handler.clone());
1898            new_handler
1899        }
1900    }
1901
1902    fn get_handler_for_topic(&self, topic: MStr<Topic>) -> Option<ShareableMessageHandler> {
1903        self.topic_handlers.get(&topic).cloned()
1904    }
1905
1906    /// Helper method for registering data subscriptions from the trait.
1907    ///
1908    /// # Panics
1909    ///
1910    /// Panics if the actor is not properly registered.
1911    pub fn subscribe_data(
1912        &mut self,
1913        handler: ShareableMessageHandler,
1914        data_type: DataType,
1915        client_id: Option<ClientId>,
1916        params: Option<IndexMap<String, String>>,
1917    ) {
1918        if !self.is_properly_registered() {
1919            panic!(
1920                "DataActor {} is not properly registered - trader_id: {:?}, clock: {}, cache: {}",
1921                self.actor_id,
1922                self.trader_id,
1923                self.clock.is_some(),
1924                self.cache.is_some()
1925            );
1926        }
1927
1928        let topic = get_custom_topic(&data_type);
1929        self.topic_handlers.insert(topic, handler.clone());
1930        msgbus::subscribe_topic(topic, handler, None);
1931
1932        // If no client ID specified, just subscribe to the topic
1933        if client_id.is_none() {
1934            return;
1935        }
1936
1937        let command = SubscribeCommand::Data(SubscribeCustomData {
1938            data_type,
1939            client_id,
1940            venue: None,
1941            command_id: UUID4::new(),
1942            ts_init: self.timestamp_ns(),
1943            params,
1944        });
1945
1946        self.send_data_cmd(DataCommand::Subscribe(command));
1947    }
1948
1949    /// Helper method for registering quotes subscriptions from the trait.
1950    pub fn subscribe_quotes(
1951        &mut self,
1952        topic: MStr<Topic>,
1953        handler: ShareableMessageHandler,
1954        instrument_id: InstrumentId,
1955        client_id: Option<ClientId>,
1956        params: Option<IndexMap<String, String>>,
1957    ) {
1958        self.check_registered();
1959
1960        self.topic_handlers.insert(topic, handler.clone());
1961        msgbus::subscribe_topic(topic, handler, None);
1962
1963        let command = SubscribeCommand::Quotes(SubscribeQuotes {
1964            instrument_id,
1965            client_id,
1966            venue: Some(instrument_id.venue),
1967            command_id: UUID4::new(),
1968            ts_init: self.timestamp_ns(),
1969            params,
1970        });
1971
1972        self.send_data_cmd(DataCommand::Subscribe(command));
1973    }
1974
1975    /// Helper method for registering instruments subscriptions from the trait.
1976    pub fn subscribe_instruments(
1977        &mut self,
1978        topic: MStr<Topic>,
1979        handler: ShareableMessageHandler,
1980        venue: Venue,
1981        client_id: Option<ClientId>,
1982        params: Option<IndexMap<String, String>>,
1983    ) {
1984        self.check_registered();
1985
1986        self.topic_handlers.insert(topic, handler.clone());
1987        msgbus::subscribe_topic(topic, handler, None);
1988
1989        let command = SubscribeCommand::Instruments(SubscribeInstruments {
1990            client_id,
1991            venue,
1992            command_id: UUID4::new(),
1993            ts_init: self.timestamp_ns(),
1994            params,
1995        });
1996
1997        self.send_data_cmd(DataCommand::Subscribe(command));
1998    }
1999
2000    /// Helper method for registering instrument subscriptions from the trait.
2001    pub fn subscribe_instrument(
2002        &mut self,
2003        topic: MStr<Topic>,
2004        handler: ShareableMessageHandler,
2005        instrument_id: InstrumentId,
2006        client_id: Option<ClientId>,
2007        params: Option<IndexMap<String, String>>,
2008    ) {
2009        self.check_registered();
2010
2011        self.topic_handlers.insert(topic, handler.clone());
2012        msgbus::subscribe_topic(topic, handler, None);
2013
2014        let command = SubscribeCommand::Instrument(SubscribeInstrument {
2015            instrument_id,
2016            client_id,
2017            venue: Some(instrument_id.venue),
2018            command_id: UUID4::new(),
2019            ts_init: self.timestamp_ns(),
2020            params,
2021        });
2022
2023        self.send_data_cmd(DataCommand::Subscribe(command));
2024    }
2025
2026    /// Helper method for registering book deltas subscriptions from the trait.
2027    #[allow(clippy::too_many_arguments)]
2028    pub fn subscribe_book_deltas(
2029        &mut self,
2030        topic: MStr<Topic>,
2031        handler: ShareableMessageHandler,
2032        instrument_id: InstrumentId,
2033        book_type: BookType,
2034        depth: Option<NonZeroUsize>,
2035        client_id: Option<ClientId>,
2036        managed: bool,
2037        params: Option<IndexMap<String, String>>,
2038    ) {
2039        self.check_registered();
2040
2041        self.topic_handlers.insert(topic, handler.clone());
2042        msgbus::subscribe_topic(topic, handler, None);
2043
2044        let command = SubscribeCommand::BookDeltas(SubscribeBookDeltas {
2045            instrument_id,
2046            book_type,
2047            client_id,
2048            venue: Some(instrument_id.venue),
2049            command_id: UUID4::new(),
2050            ts_init: self.timestamp_ns(),
2051            depth,
2052            managed,
2053            params,
2054        });
2055
2056        self.send_data_cmd(DataCommand::Subscribe(command));
2057    }
2058
2059    /// Helper method for registering book snapshots subscriptions from the trait.
2060    pub fn subscribe_book_at_interval(
2061        &mut self,
2062        topic: MStr<Topic>,
2063        handler: ShareableMessageHandler,
2064        instrument_id: InstrumentId,
2065        book_type: BookType,
2066        depth: Option<NonZeroUsize>,
2067        interval_ms: NonZeroUsize,
2068        client_id: Option<ClientId>,
2069        params: Option<IndexMap<String, String>>,
2070    ) {
2071        self.check_registered();
2072
2073        self.topic_handlers.insert(topic, handler.clone());
2074        msgbus::subscribe_topic(topic, handler, None);
2075
2076        let command = SubscribeCommand::BookSnapshots(SubscribeBookSnapshots {
2077            instrument_id,
2078            book_type,
2079            client_id,
2080            venue: Some(instrument_id.venue),
2081            command_id: UUID4::new(),
2082            ts_init: self.timestamp_ns(),
2083            depth,
2084            interval_ms,
2085            params,
2086        });
2087
2088        self.send_data_cmd(DataCommand::Subscribe(command));
2089    }
2090
2091    /// Helper method for registering trades subscriptions from the trait.
2092    pub fn subscribe_trades(
2093        &mut self,
2094        topic: MStr<Topic>,
2095        handler: ShareableMessageHandler,
2096        instrument_id: InstrumentId,
2097        client_id: Option<ClientId>,
2098        params: Option<IndexMap<String, String>>,
2099    ) {
2100        self.check_registered();
2101
2102        self.topic_handlers.insert(topic, handler.clone());
2103        msgbus::subscribe_topic(topic, handler, None);
2104
2105        let command = SubscribeCommand::Trades(SubscribeTrades {
2106            instrument_id,
2107            client_id,
2108            venue: Some(instrument_id.venue),
2109            command_id: UUID4::new(),
2110            ts_init: self.timestamp_ns(),
2111            params,
2112        });
2113
2114        self.send_data_cmd(DataCommand::Subscribe(command));
2115    }
2116
2117    /// Helper method for registering bars subscriptions from the trait.
2118    pub fn subscribe_bars(
2119        &mut self,
2120        topic: MStr<Topic>,
2121        handler: ShareableMessageHandler,
2122        bar_type: BarType,
2123        client_id: Option<ClientId>,
2124        await_partial: bool,
2125        params: Option<IndexMap<String, String>>,
2126    ) {
2127        self.check_registered();
2128
2129        self.topic_handlers.insert(topic, handler.clone());
2130        msgbus::subscribe_topic(topic, handler, None);
2131
2132        let command = SubscribeCommand::Bars(SubscribeBars {
2133            bar_type,
2134            client_id,
2135            venue: Some(bar_type.instrument_id().venue),
2136            command_id: UUID4::new(),
2137            ts_init: self.timestamp_ns(),
2138            await_partial,
2139            params,
2140        });
2141
2142        self.send_data_cmd(DataCommand::Subscribe(command));
2143    }
2144
2145    /// Helper method for registering mark prices subscriptions from the trait.
2146    pub fn subscribe_mark_prices(
2147        &mut self,
2148        topic: MStr<Topic>,
2149        handler: ShareableMessageHandler,
2150        instrument_id: InstrumentId,
2151        client_id: Option<ClientId>,
2152        params: Option<IndexMap<String, String>>,
2153    ) {
2154        self.check_registered();
2155
2156        self.topic_handlers.insert(topic, handler.clone());
2157        msgbus::subscribe_topic(topic, handler, None);
2158
2159        let command = SubscribeCommand::MarkPrices(SubscribeMarkPrices {
2160            instrument_id,
2161            client_id,
2162            venue: Some(instrument_id.venue),
2163            command_id: UUID4::new(),
2164            ts_init: self.timestamp_ns(),
2165            params,
2166        });
2167
2168        self.send_data_cmd(DataCommand::Subscribe(command));
2169    }
2170
2171    /// Helper method for registering index prices subscriptions from the trait.
2172    pub fn subscribe_index_prices(
2173        &mut self,
2174        topic: MStr<Topic>,
2175        handler: ShareableMessageHandler,
2176        instrument_id: InstrumentId,
2177        client_id: Option<ClientId>,
2178        params: Option<IndexMap<String, String>>,
2179    ) {
2180        self.check_registered();
2181
2182        self.topic_handlers.insert(topic, handler.clone());
2183        msgbus::subscribe_topic(topic, handler, None);
2184
2185        let command = SubscribeCommand::IndexPrices(SubscribeIndexPrices {
2186            instrument_id,
2187            client_id,
2188            venue: Some(instrument_id.venue),
2189            command_id: UUID4::new(),
2190            ts_init: self.timestamp_ns(),
2191            params,
2192        });
2193
2194        self.send_data_cmd(DataCommand::Subscribe(command));
2195    }
2196
2197    /// Helper method for registering instrument status subscriptions from the trait.
2198    pub fn subscribe_instrument_status(
2199        &mut self,
2200        topic: MStr<Topic>,
2201        handler: ShareableMessageHandler,
2202        instrument_id: InstrumentId,
2203        client_id: Option<ClientId>,
2204        params: Option<IndexMap<String, String>>,
2205    ) {
2206        self.check_registered();
2207
2208        self.topic_handlers.insert(topic, handler.clone());
2209        msgbus::subscribe_topic(topic, handler, None);
2210
2211        let command = SubscribeCommand::InstrumentStatus(SubscribeInstrumentStatus {
2212            instrument_id,
2213            client_id,
2214            venue: Some(instrument_id.venue),
2215            command_id: UUID4::new(),
2216            ts_init: self.timestamp_ns(),
2217            params,
2218        });
2219
2220        self.send_data_cmd(DataCommand::Subscribe(command));
2221    }
2222
2223    /// Helper method for registering instrument close subscriptions from the trait.
2224    pub fn subscribe_instrument_close(
2225        &mut self,
2226        topic: MStr<Topic>,
2227        handler: ShareableMessageHandler,
2228        instrument_id: InstrumentId,
2229        client_id: Option<ClientId>,
2230        params: Option<IndexMap<String, String>>,
2231    ) {
2232        self.check_registered();
2233
2234        self.topic_handlers.insert(topic, handler.clone());
2235        msgbus::subscribe_topic(topic, handler, None);
2236
2237        let command = SubscribeCommand::InstrumentClose(SubscribeInstrumentClose {
2238            instrument_id,
2239            client_id,
2240            venue: Some(instrument_id.venue),
2241            command_id: UUID4::new(),
2242            ts_init: self.timestamp_ns(),
2243            params,
2244        });
2245
2246        self.send_data_cmd(DataCommand::Subscribe(command));
2247    }
2248
2249    #[cfg(feature = "defi")]
2250    /// Helper method for registering block subscriptions from the trait.
2251    pub fn subscribe_blocks(
2252        &mut self,
2253        topic: MStr<Topic>,
2254        handler: ShareableMessageHandler,
2255        chain: Blockchain,
2256        client_id: Option<ClientId>,
2257        params: Option<IndexMap<String, String>>,
2258    ) {
2259        use crate::messages::defi::{DefiSubscribeCommand, SubscribeBlocks};
2260
2261        self.check_registered();
2262
2263        self.topic_handlers.insert(topic, handler.clone());
2264        msgbus::subscribe_topic(topic, handler, None);
2265
2266        let command = DefiSubscribeCommand::Blocks(SubscribeBlocks {
2267            chain,
2268            client_id,
2269            command_id: UUID4::new(),
2270            ts_init: self.timestamp_ns(),
2271            params,
2272        });
2273
2274        self.send_data_cmd(DataCommand::DefiSubscribe(command));
2275    }
2276
2277    #[cfg(feature = "defi")]
2278    /// Helper method for registering pool subscriptions from the trait.
2279    pub fn subscribe_pool(
2280        &mut self,
2281        topic: MStr<Topic>,
2282        handler: ShareableMessageHandler,
2283        address: Address,
2284        client_id: Option<ClientId>,
2285        params: Option<IndexMap<String, String>>,
2286    ) {
2287        use crate::messages::defi::{DefiSubscribeCommand, SubscribePool};
2288
2289        self.check_registered();
2290
2291        self.topic_handlers.insert(topic, handler.clone());
2292        msgbus::subscribe_topic(topic, handler, None);
2293
2294        let command = DefiSubscribeCommand::Pool(SubscribePool {
2295            address,
2296            client_id,
2297            command_id: UUID4::new(),
2298            ts_init: self.timestamp_ns(),
2299            params,
2300        });
2301
2302        self.send_data_cmd(DataCommand::DefiSubscribe(command));
2303    }
2304
2305    #[cfg(feature = "defi")]
2306    /// Helper method for registering pool swap subscriptions from the trait.
2307    pub fn subscribe_pool_swaps(
2308        &mut self,
2309        topic: MStr<Topic>,
2310        handler: ShareableMessageHandler,
2311        address: Address,
2312        client_id: Option<ClientId>,
2313        params: Option<IndexMap<String, String>>,
2314    ) {
2315        use crate::messages::defi::{DefiSubscribeCommand, SubscribePoolSwaps};
2316
2317        self.check_registered();
2318
2319        self.topic_handlers.insert(topic, handler.clone());
2320        msgbus::subscribe_topic(topic, handler, None);
2321
2322        let command = DefiSubscribeCommand::PoolSwaps(SubscribePoolSwaps {
2323            address,
2324            client_id,
2325            command_id: UUID4::new(),
2326            ts_init: self.timestamp_ns(),
2327            params,
2328        });
2329
2330        self.send_data_cmd(DataCommand::DefiSubscribe(command));
2331    }
2332
2333    #[cfg(feature = "defi")]
2334    /// Helper method for registering pool liquidity update subscriptions from the trait.
2335    pub fn subscribe_pool_liquidity_updates(
2336        &mut self,
2337        topic: MStr<Topic>,
2338        handler: ShareableMessageHandler,
2339        address: Address,
2340        client_id: Option<ClientId>,
2341        params: Option<IndexMap<String, String>>,
2342    ) {
2343        use crate::messages::defi::{DefiSubscribeCommand, SubscribePoolLiquidityUpdates};
2344
2345        self.check_registered();
2346
2347        self.topic_handlers.insert(topic, handler.clone());
2348        msgbus::subscribe_topic(topic, handler, None);
2349
2350        let command = DefiSubscribeCommand::PoolLiquidityUpdates(SubscribePoolLiquidityUpdates {
2351            address,
2352            client_id,
2353            command_id: UUID4::new(),
2354            ts_init: self.timestamp_ns(),
2355            params,
2356        });
2357
2358        self.send_data_cmd(DataCommand::DefiSubscribe(command));
2359    }
2360
2361    /// Helper method for unsubscribing from data.
2362    pub fn unsubscribe_data(
2363        &self,
2364        data_type: DataType,
2365        client_id: Option<ClientId>,
2366        params: Option<IndexMap<String, String>>,
2367    ) {
2368        self.check_registered();
2369
2370        let topic = get_custom_topic(&data_type);
2371        if let Some(handler) = self.topic_handlers.get(&topic) {
2372            msgbus::unsubscribe_topic(topic, handler.clone());
2373        };
2374
2375        if client_id.is_none() {
2376            return;
2377        }
2378
2379        let command = UnsubscribeCommand::Data(UnsubscribeCustomData {
2380            data_type,
2381            client_id,
2382            venue: None,
2383            command_id: UUID4::new(),
2384            ts_init: self.timestamp_ns(),
2385            params,
2386        });
2387
2388        self.send_data_cmd(DataCommand::Unsubscribe(command));
2389    }
2390
2391    /// Helper method for unsubscribing from instruments.
2392    pub fn unsubscribe_instruments(
2393        &self,
2394        venue: Venue,
2395        client_id: Option<ClientId>,
2396        params: Option<IndexMap<String, String>>,
2397    ) {
2398        self.check_registered();
2399
2400        let topic = get_instruments_topic(venue);
2401        if let Some(handler) = self.topic_handlers.get(&topic) {
2402            msgbus::unsubscribe_topic(topic, handler.clone());
2403        };
2404
2405        let command = UnsubscribeCommand::Instruments(UnsubscribeInstruments {
2406            client_id,
2407            venue,
2408            command_id: UUID4::new(),
2409            ts_init: self.timestamp_ns(),
2410            params,
2411        });
2412
2413        self.send_data_cmd(DataCommand::Unsubscribe(command));
2414    }
2415
2416    /// Helper method for unsubscribing from instrument.
2417    pub fn unsubscribe_instrument(
2418        &self,
2419        instrument_id: InstrumentId,
2420        client_id: Option<ClientId>,
2421        params: Option<IndexMap<String, String>>,
2422    ) {
2423        self.check_registered();
2424
2425        let topic = get_instrument_topic(instrument_id);
2426        if let Some(handler) = self.topic_handlers.get(&topic) {
2427            msgbus::unsubscribe_topic(topic, handler.clone());
2428        };
2429
2430        let command = UnsubscribeCommand::Instrument(UnsubscribeInstrument {
2431            instrument_id,
2432            client_id,
2433            venue: Some(instrument_id.venue),
2434            command_id: UUID4::new(),
2435            ts_init: self.timestamp_ns(),
2436            params,
2437        });
2438
2439        self.send_data_cmd(DataCommand::Unsubscribe(command));
2440    }
2441
2442    /// Helper method for unsubscribing from book deltas.
2443    pub fn unsubscribe_book_deltas(
2444        &self,
2445        instrument_id: InstrumentId,
2446        client_id: Option<ClientId>,
2447        params: Option<IndexMap<String, String>>,
2448    ) {
2449        self.check_registered();
2450
2451        let topic = get_book_deltas_topic(instrument_id);
2452        if let Some(handler) = self.topic_handlers.get(&topic) {
2453            msgbus::unsubscribe_topic(topic, handler.clone());
2454        };
2455
2456        let command = UnsubscribeCommand::BookDeltas(UnsubscribeBookDeltas {
2457            instrument_id,
2458            client_id,
2459            venue: Some(instrument_id.venue),
2460            command_id: UUID4::new(),
2461            ts_init: self.timestamp_ns(),
2462            params,
2463        });
2464
2465        self.send_data_cmd(DataCommand::Unsubscribe(command));
2466    }
2467
2468    /// Helper method for unsubscribing from book snapshots at interval.
2469    pub fn unsubscribe_book_at_interval(
2470        &mut self,
2471        instrument_id: InstrumentId,
2472        interval_ms: NonZeroUsize,
2473        client_id: Option<ClientId>,
2474        params: Option<IndexMap<String, String>>,
2475    ) {
2476        self.check_registered();
2477
2478        let topic = get_book_snapshots_topic(instrument_id);
2479        if let Some(handler) = self.topic_handlers.get(&topic) {
2480            msgbus::unsubscribe_topic(topic, handler.clone());
2481        };
2482
2483        let command = UnsubscribeCommand::BookSnapshots(UnsubscribeBookSnapshots {
2484            instrument_id,
2485            client_id,
2486            venue: Some(instrument_id.venue),
2487            command_id: UUID4::new(),
2488            ts_init: self.timestamp_ns(),
2489            params,
2490        });
2491
2492        self.send_data_cmd(DataCommand::Unsubscribe(command));
2493    }
2494
2495    /// Helper method for unsubscribing from quotes.
2496    pub fn unsubscribe_quotes(
2497        &self,
2498        instrument_id: InstrumentId,
2499        client_id: Option<ClientId>,
2500        params: Option<IndexMap<String, String>>,
2501    ) {
2502        self.check_registered();
2503
2504        let topic = get_quotes_topic(instrument_id);
2505        if let Some(handler) = self.topic_handlers.get(&topic) {
2506            msgbus::unsubscribe_topic(topic, handler.clone());
2507        };
2508
2509        let command = UnsubscribeCommand::Quotes(UnsubscribeQuotes {
2510            instrument_id,
2511            client_id,
2512            venue: Some(instrument_id.venue),
2513            command_id: UUID4::new(),
2514            ts_init: self.timestamp_ns(),
2515            params,
2516        });
2517
2518        self.send_data_cmd(DataCommand::Unsubscribe(command));
2519    }
2520
2521    /// Helper method for unsubscribing from trades.
2522    pub fn unsubscribe_trades(
2523        &self,
2524        instrument_id: InstrumentId,
2525        client_id: Option<ClientId>,
2526        params: Option<IndexMap<String, String>>,
2527    ) {
2528        self.check_registered();
2529
2530        let topic = get_trades_topic(instrument_id);
2531        if let Some(handler) = self.topic_handlers.get(&topic) {
2532            msgbus::unsubscribe_topic(topic, handler.clone());
2533        };
2534
2535        let command = UnsubscribeCommand::Trades(UnsubscribeTrades {
2536            instrument_id,
2537            client_id,
2538            venue: Some(instrument_id.venue),
2539            command_id: UUID4::new(),
2540            ts_init: self.timestamp_ns(),
2541            params,
2542        });
2543
2544        self.send_data_cmd(DataCommand::Unsubscribe(command));
2545    }
2546
2547    /// Helper method for unsubscribing from bars.
2548    pub fn unsubscribe_bars(
2549        &mut self,
2550        bar_type: BarType,
2551        client_id: Option<ClientId>,
2552        params: Option<IndexMap<String, String>>,
2553    ) {
2554        self.check_registered();
2555
2556        let topic = get_bars_topic(bar_type);
2557        if let Some(handler) = self.topic_handlers.get(&topic) {
2558            msgbus::unsubscribe_topic(topic, handler.clone());
2559        };
2560
2561        let command = UnsubscribeCommand::Bars(UnsubscribeBars {
2562            bar_type,
2563            client_id,
2564            venue: Some(bar_type.instrument_id().venue),
2565            command_id: UUID4::new(),
2566            ts_init: self.timestamp_ns(),
2567            params,
2568        });
2569
2570        self.send_data_cmd(DataCommand::Unsubscribe(command));
2571    }
2572
2573    /// Helper method for unsubscribing from mark prices.
2574    pub fn unsubscribe_mark_prices(
2575        &self,
2576        instrument_id: InstrumentId,
2577        client_id: Option<ClientId>,
2578        params: Option<IndexMap<String, String>>,
2579    ) {
2580        self.check_registered();
2581
2582        let topic = get_mark_price_topic(instrument_id);
2583        if let Some(handler) = self.topic_handlers.get(&topic) {
2584            msgbus::unsubscribe_topic(topic, handler.clone());
2585        };
2586
2587        let command = UnsubscribeCommand::MarkPrices(UnsubscribeMarkPrices {
2588            instrument_id,
2589            client_id,
2590            venue: Some(instrument_id.venue),
2591            command_id: UUID4::new(),
2592            ts_init: self.timestamp_ns(),
2593            params,
2594        });
2595
2596        self.send_data_cmd(DataCommand::Unsubscribe(command));
2597    }
2598
2599    /// Helper method for unsubscribing from index prices.
2600    pub fn unsubscribe_index_prices(
2601        &self,
2602        instrument_id: InstrumentId,
2603        client_id: Option<ClientId>,
2604        params: Option<IndexMap<String, String>>,
2605    ) {
2606        self.check_registered();
2607
2608        let topic = get_index_price_topic(instrument_id);
2609        if let Some(handler) = self.topic_handlers.get(&topic) {
2610            msgbus::unsubscribe_topic(topic, handler.clone());
2611        };
2612
2613        let command = UnsubscribeCommand::IndexPrices(UnsubscribeIndexPrices {
2614            instrument_id,
2615            client_id,
2616            venue: Some(instrument_id.venue),
2617            command_id: UUID4::new(),
2618            ts_init: self.timestamp_ns(),
2619            params,
2620        });
2621
2622        self.send_data_cmd(DataCommand::Unsubscribe(command));
2623    }
2624
2625    /// Helper method for unsubscribing from instrument status.
2626    pub fn unsubscribe_instrument_status(
2627        &self,
2628        instrument_id: InstrumentId,
2629        client_id: Option<ClientId>,
2630        params: Option<IndexMap<String, String>>,
2631    ) {
2632        self.check_registered();
2633
2634        let topic = get_instrument_status_topic(instrument_id);
2635        if let Some(handler) = self.topic_handlers.get(&topic) {
2636            msgbus::unsubscribe_topic(topic, handler.clone());
2637        };
2638
2639        let command = UnsubscribeCommand::InstrumentStatus(UnsubscribeInstrumentStatus {
2640            instrument_id,
2641            client_id,
2642            venue: Some(instrument_id.venue),
2643            command_id: UUID4::new(),
2644            ts_init: self.timestamp_ns(),
2645            params,
2646        });
2647
2648        self.send_data_cmd(DataCommand::Unsubscribe(command));
2649    }
2650
2651    /// Helper method for unsubscribing from instrument close.
2652    pub fn unsubscribe_instrument_close(
2653        &self,
2654        instrument_id: InstrumentId,
2655        client_id: Option<ClientId>,
2656        params: Option<IndexMap<String, String>>,
2657    ) {
2658        self.check_registered();
2659
2660        let topic = get_instrument_close_topic(instrument_id);
2661        if let Some(handler) = self.topic_handlers.get(&topic) {
2662            msgbus::unsubscribe_topic(topic, handler.clone());
2663        };
2664
2665        let command = UnsubscribeCommand::InstrumentClose(UnsubscribeInstrumentClose {
2666            instrument_id,
2667            client_id,
2668            venue: Some(instrument_id.venue),
2669            command_id: UUID4::new(),
2670            ts_init: self.timestamp_ns(),
2671            params,
2672        });
2673
2674        self.send_data_cmd(DataCommand::Unsubscribe(command));
2675    }
2676
2677    #[cfg(feature = "defi")]
2678    /// Helper method for unsubscribing from blocks.
2679    pub fn unsubscribe_blocks(
2680        &mut self,
2681        chain: Blockchain,
2682        client_id: Option<ClientId>,
2683        params: Option<IndexMap<String, String>>,
2684    ) {
2685        use crate::messages::defi::{DefiUnsubscribeCommand, UnsubscribeBlocks};
2686
2687        self.check_registered();
2688
2689        let topic = get_defi_blocks_topic(chain);
2690        if let Some(handler) = self.topic_handlers.get(&topic) {
2691            msgbus::unsubscribe_topic(topic, handler.clone());
2692        };
2693
2694        let command = DefiUnsubscribeCommand::Blocks(UnsubscribeBlocks {
2695            chain,
2696            client_id,
2697            command_id: UUID4::new(),
2698            ts_init: self.timestamp_ns(),
2699            params,
2700        });
2701
2702        self.send_data_cmd(DataCommand::DefiUnsubscribe(command));
2703    }
2704
2705    #[cfg(feature = "defi")]
2706    /// Helper method for unsubscribing from pool definition updates.
2707    pub fn unsubscribe_pool(
2708        &mut self,
2709        address: Address,
2710        client_id: Option<ClientId>,
2711        params: Option<IndexMap<String, String>>,
2712    ) {
2713        use crate::messages::defi::{DefiUnsubscribeCommand, UnsubscribePool};
2714
2715        self.check_registered();
2716
2717        let topic = get_defi_pool_topic(address);
2718        if let Some(handler) = self.topic_handlers.get(&topic) {
2719            msgbus::unsubscribe_topic(topic, handler.clone());
2720        };
2721
2722        let command = DefiUnsubscribeCommand::Pool(UnsubscribePool {
2723            address,
2724            client_id,
2725            command_id: UUID4::new(),
2726            ts_init: self.timestamp_ns(),
2727            params,
2728        });
2729
2730        self.send_data_cmd(DataCommand::DefiUnsubscribe(command));
2731    }
2732
2733    #[cfg(feature = "defi")]
2734    /// Helper method for unsubscribing from pool swaps.
2735    pub fn unsubscribe_pool_swaps(
2736        &mut self,
2737        address: Address,
2738        client_id: Option<ClientId>,
2739        params: Option<IndexMap<String, String>>,
2740    ) {
2741        use crate::messages::defi::{DefiUnsubscribeCommand, UnsubscribePoolSwaps};
2742
2743        self.check_registered();
2744
2745        let topic = get_defi_pool_swaps_topic(address);
2746        if let Some(handler) = self.topic_handlers.get(&topic) {
2747            msgbus::unsubscribe_topic(topic, handler.clone());
2748        };
2749
2750        let command = DefiUnsubscribeCommand::PoolSwaps(UnsubscribePoolSwaps {
2751            address,
2752            client_id,
2753            command_id: UUID4::new(),
2754            ts_init: self.timestamp_ns(),
2755            params,
2756        });
2757
2758        self.send_data_cmd(DataCommand::DefiUnsubscribe(command));
2759    }
2760
2761    #[cfg(feature = "defi")]
2762    /// Helper method for unsubscribing from pool liquidity updates.
2763    pub fn unsubscribe_pool_liquidity_updates(
2764        &mut self,
2765        address: Address,
2766        client_id: Option<ClientId>,
2767        params: Option<IndexMap<String, String>>,
2768    ) {
2769        use crate::messages::defi::{DefiUnsubscribeCommand, UnsubscribePoolLiquidityUpdates};
2770
2771        self.check_registered();
2772
2773        let topic = get_defi_liquidity_topic(address);
2774        if let Some(handler) = self.topic_handlers.get(&topic) {
2775            msgbus::unsubscribe_topic(topic, handler.clone());
2776        };
2777
2778        let command =
2779            DefiUnsubscribeCommand::PoolLiquidityUpdates(UnsubscribePoolLiquidityUpdates {
2780                address,
2781                client_id,
2782                command_id: UUID4::new(),
2783                ts_init: self.timestamp_ns(),
2784                params,
2785            });
2786
2787        self.send_data_cmd(DataCommand::DefiUnsubscribe(command));
2788    }
2789
2790    /// Helper method for requesting data.
2791    ///
2792    /// # Errors
2793    ///
2794    /// Returns an error if input parameters are invalid.
2795    pub fn request_data(
2796        &self,
2797        data_type: DataType,
2798        client_id: ClientId,
2799        start: Option<DateTime<Utc>>,
2800        end: Option<DateTime<Utc>>,
2801        limit: Option<NonZeroUsize>,
2802        params: Option<IndexMap<String, String>>,
2803        handler: ShareableMessageHandler,
2804    ) -> anyhow::Result<UUID4> {
2805        self.check_registered();
2806
2807        let now = self.clock_ref().utc_now();
2808        check_timestamps(now, start, end)?;
2809
2810        let request_id = UUID4::new();
2811        let command = RequestCommand::Data(RequestCustomData {
2812            client_id,
2813            data_type,
2814            request_id,
2815            ts_init: self.timestamp_ns(),
2816            params,
2817        });
2818
2819        let msgbus = get_message_bus()
2820            .borrow_mut()
2821            .register_response_handler(command.request_id(), handler);
2822
2823        self.send_data_cmd(DataCommand::Request(command));
2824
2825        Ok(request_id)
2826    }
2827
2828    /// Helper method for requesting instrument.
2829    ///
2830    /// # Errors
2831    ///
2832    /// Returns an error if input parameters are invalid.
2833    pub fn request_instrument(
2834        &self,
2835        instrument_id: InstrumentId,
2836        start: Option<DateTime<Utc>>,
2837        end: Option<DateTime<Utc>>,
2838        client_id: Option<ClientId>,
2839        params: Option<IndexMap<String, String>>,
2840        handler: ShareableMessageHandler,
2841    ) -> anyhow::Result<UUID4> {
2842        self.check_registered();
2843
2844        let now = self.clock_ref().utc_now();
2845        check_timestamps(now, start, end)?;
2846
2847        let request_id = UUID4::new();
2848        let command = RequestCommand::Instrument(RequestInstrument {
2849            instrument_id,
2850            start,
2851            end,
2852            client_id,
2853            request_id,
2854            ts_init: now.into(),
2855            params,
2856        });
2857
2858        let msgbus = get_message_bus()
2859            .borrow_mut()
2860            .register_response_handler(command.request_id(), handler);
2861
2862        self.send_data_cmd(DataCommand::Request(command));
2863
2864        Ok(request_id)
2865    }
2866
2867    /// Helper method for requesting instruments.
2868    ///
2869    /// # Errors
2870    ///
2871    /// Returns an error if input parameters are invalid.
2872    pub fn request_instruments(
2873        &self,
2874        venue: Option<Venue>,
2875        start: Option<DateTime<Utc>>,
2876        end: Option<DateTime<Utc>>,
2877        client_id: Option<ClientId>,
2878        params: Option<IndexMap<String, String>>,
2879        handler: ShareableMessageHandler,
2880    ) -> anyhow::Result<UUID4> {
2881        self.check_registered();
2882
2883        let now = self.clock_ref().utc_now();
2884        check_timestamps(now, start, end)?;
2885
2886        let request_id = UUID4::new();
2887        let command = RequestCommand::Instruments(RequestInstruments {
2888            venue,
2889            start,
2890            end,
2891            client_id,
2892            request_id,
2893            ts_init: now.into(),
2894            params,
2895        });
2896
2897        let msgbus = get_message_bus()
2898            .borrow_mut()
2899            .register_response_handler(command.request_id(), handler);
2900
2901        self.send_data_cmd(DataCommand::Request(command));
2902
2903        Ok(request_id)
2904    }
2905
2906    /// Helper method for requesting book snapshot.
2907    ///
2908    /// # Errors
2909    ///
2910    /// Returns an error if input parameters are invalid.
2911    pub fn request_book_snapshot(
2912        &self,
2913        instrument_id: InstrumentId,
2914        depth: Option<NonZeroUsize>,
2915        client_id: Option<ClientId>,
2916        params: Option<IndexMap<String, String>>,
2917        handler: ShareableMessageHandler,
2918    ) -> anyhow::Result<UUID4> {
2919        self.check_registered();
2920
2921        let request_id = UUID4::new();
2922        let command = RequestCommand::BookSnapshot(RequestBookSnapshot {
2923            instrument_id,
2924            depth,
2925            client_id,
2926            request_id,
2927            ts_init: self.timestamp_ns(),
2928            params,
2929        });
2930
2931        let msgbus = get_message_bus()
2932            .borrow_mut()
2933            .register_response_handler(command.request_id(), handler);
2934
2935        self.send_data_cmd(DataCommand::Request(command));
2936
2937        Ok(request_id)
2938    }
2939
2940    /// Helper method for requesting quotes.
2941    ///
2942    /// # Errors
2943    ///
2944    /// Returns an error if input parameters are invalid.
2945    pub fn request_quotes(
2946        &self,
2947        instrument_id: InstrumentId,
2948        start: Option<DateTime<Utc>>,
2949        end: Option<DateTime<Utc>>,
2950        limit: Option<NonZeroUsize>,
2951        client_id: Option<ClientId>,
2952        params: Option<IndexMap<String, String>>,
2953        handler: ShareableMessageHandler,
2954    ) -> anyhow::Result<UUID4> {
2955        self.check_registered();
2956
2957        let now = self.clock_ref().utc_now();
2958        check_timestamps(now, start, end)?;
2959
2960        let request_id = UUID4::new();
2961        let command = RequestCommand::Quotes(RequestQuotes {
2962            instrument_id,
2963            start,
2964            end,
2965            limit,
2966            client_id,
2967            request_id,
2968            ts_init: now.into(),
2969            params,
2970        });
2971
2972        let msgbus = get_message_bus()
2973            .borrow_mut()
2974            .register_response_handler(command.request_id(), handler);
2975
2976        self.send_data_cmd(DataCommand::Request(command));
2977
2978        Ok(request_id)
2979    }
2980
2981    /// Helper method for requesting trades.
2982    ///
2983    /// # Errors
2984    ///
2985    /// Returns an error if input parameters are invalid.
2986    pub fn request_trades(
2987        &self,
2988        instrument_id: InstrumentId,
2989        start: Option<DateTime<Utc>>,
2990        end: Option<DateTime<Utc>>,
2991        limit: Option<NonZeroUsize>,
2992        client_id: Option<ClientId>,
2993        params: Option<IndexMap<String, String>>,
2994        handler: ShareableMessageHandler,
2995    ) -> anyhow::Result<UUID4> {
2996        self.check_registered();
2997
2998        let now = self.clock_ref().utc_now();
2999        check_timestamps(now, start, end)?;
3000
3001        let request_id = UUID4::new();
3002        let command = RequestCommand::Trades(RequestTrades {
3003            instrument_id,
3004            start,
3005            end,
3006            limit,
3007            client_id,
3008            request_id,
3009            ts_init: now.into(),
3010            params,
3011        });
3012
3013        let msgbus = get_message_bus()
3014            .borrow_mut()
3015            .register_response_handler(command.request_id(), handler);
3016
3017        self.send_data_cmd(DataCommand::Request(command));
3018
3019        Ok(request_id)
3020    }
3021
3022    /// Helper method for requesting bars.
3023    ///
3024    /// # Errors
3025    ///
3026    /// Returns an error if input parameters are invalid.
3027    pub fn request_bars(
3028        &self,
3029        bar_type: BarType,
3030        start: Option<DateTime<Utc>>,
3031        end: Option<DateTime<Utc>>,
3032        limit: Option<NonZeroUsize>,
3033        client_id: Option<ClientId>,
3034        params: Option<IndexMap<String, String>>,
3035        handler: ShareableMessageHandler,
3036    ) -> anyhow::Result<UUID4> {
3037        self.check_registered();
3038
3039        let now = self.clock_ref().utc_now();
3040        check_timestamps(now, start, end)?;
3041
3042        let request_id = UUID4::new();
3043        let command = RequestCommand::Bars(RequestBars {
3044            bar_type,
3045            start,
3046            end,
3047            limit,
3048            client_id,
3049            request_id,
3050            ts_init: now.into(),
3051            params,
3052        });
3053
3054        let msgbus = get_message_bus()
3055            .borrow_mut()
3056            .register_response_handler(command.request_id(), handler);
3057
3058        self.send_data_cmd(DataCommand::Request(command));
3059
3060        Ok(request_id)
3061    }
3062}
3063
3064fn check_timestamps(
3065    now: DateTime<Utc>,
3066    start: Option<DateTime<Utc>>,
3067    end: Option<DateTime<Utc>>,
3068) -> anyhow::Result<()> {
3069    if let Some(start) = start {
3070        check_predicate_true(start <= now, "start was > now")?
3071    }
3072    if let Some(end) = end {
3073        check_predicate_true(end <= now, "end was > now")?
3074    }
3075
3076    if let (Some(start), Some(end)) = (start, end) {
3077        check_predicate_true(start < end, "start was >= end")?
3078    }
3079
3080    Ok(())
3081}
3082
3083fn log_error(e: &anyhow::Error) {
3084    log::error!("{e}");
3085}
3086
3087fn log_not_running<T>(msg: &T)
3088where
3089    T: Debug,
3090{
3091    // TODO: Potentially temporary for development? drop level at some stage
3092    log::warn!("Received message when not running - skipping {msg:?}");
3093}
3094
3095fn log_received<T>(msg: &T)
3096where
3097    T: Debug,
3098{
3099    log::debug!("{RECV} {msg:?}");
3100}