1pub mod config;
21pub mod database;
22
23mod index;
24
25#[cfg(test)]
26mod tests;
27
28use std::{
29 collections::{HashMap, HashSet, VecDeque},
30 fmt::Debug,
31 time::{SystemTime, UNIX_EPOCH},
32};
33
34use bytes::Bytes;
35pub use config::CacheConfig; use database::{CacheDatabaseAdapter, CacheMap};
37use index::CacheIndex;
38use nautilus_core::{
39 UUID4, UnixNanos,
40 correctness::{
41 check_key_not_in_map, check_predicate_false, check_slice_not_empty, check_valid_string,
42 },
43 datetime::secs_to_nanos,
44};
45use nautilus_model::{
46 accounts::{Account, AccountAny},
47 data::{
48 Bar, BarType, GreeksData, QuoteTick, TradeTick, YieldCurveData,
49 prices::{IndexPriceUpdate, MarkPriceUpdate},
50 },
51 enums::{AggregationSource, OmsType, OrderSide, PositionSide, PriceType, TriggerType},
52 identifiers::{
53 AccountId, ClientId, ClientOrderId, ComponentId, ExecAlgorithmId, InstrumentId,
54 OrderListId, PositionId, StrategyId, Venue, VenueOrderId,
55 },
56 instruments::{Instrument, InstrumentAny, SyntheticInstrument},
57 orderbook::{OrderBook, own::OwnOrderBook},
58 orders::{Order, OrderAny, OrderList},
59 position::Position,
60 types::{Currency, Money, Price, Quantity},
61};
62use ustr::Ustr;
63
64use crate::xrate::get_exchange_rate;
65
66pub struct Cache {
68 config: CacheConfig,
69 index: CacheIndex,
70 database: Option<Box<dyn CacheDatabaseAdapter>>,
71 general: HashMap<String, Bytes>,
72 currencies: HashMap<Ustr, Currency>,
73 instruments: HashMap<InstrumentId, InstrumentAny>,
74 synthetics: HashMap<InstrumentId, SyntheticInstrument>,
75 books: HashMap<InstrumentId, OrderBook>,
76 own_books: HashMap<InstrumentId, OwnOrderBook>,
77 quotes: HashMap<InstrumentId, VecDeque<QuoteTick>>,
78 trades: HashMap<InstrumentId, VecDeque<TradeTick>>,
79 mark_xrates: HashMap<(Currency, Currency), f64>,
80 mark_prices: HashMap<InstrumentId, VecDeque<MarkPriceUpdate>>,
81 index_prices: HashMap<InstrumentId, VecDeque<IndexPriceUpdate>>,
82 bars: HashMap<BarType, VecDeque<Bar>>,
83 greeks: HashMap<InstrumentId, GreeksData>,
84 yield_curves: HashMap<String, YieldCurveData>,
85 accounts: HashMap<AccountId, AccountAny>,
86 orders: HashMap<ClientOrderId, OrderAny>,
87 order_lists: HashMap<OrderListId, OrderList>,
88 positions: HashMap<PositionId, Position>,
89 position_snapshots: HashMap<PositionId, Bytes>,
90}
91
92impl Debug for Cache {
93 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
94 f.debug_struct(stringify!(Cache))
95 .field("config", &self.config)
96 .field("index", &self.index)
97 .field("general", &self.general)
98 .field("currencies", &self.currencies)
99 .field("instruments", &self.instruments)
100 .field("synthetics", &self.synthetics)
101 .field("books", &self.books)
102 .field("own_books", &self.own_books)
103 .field("quotes", &self.quotes)
104 .field("trades", &self.trades)
105 .field("mark_xrates", &self.mark_xrates)
106 .field("mark_prices", &self.mark_prices)
107 .field("index_prices", &self.index_prices)
108 .field("bars", &self.bars)
109 .field("greeks", &self.greeks)
110 .field("yield_curves", &self.yield_curves)
111 .field("accounts", &self.accounts)
112 .field("orders", &self.orders)
113 .field("order_lists", &self.order_lists)
114 .field("positions", &self.positions)
115 .field("position_snapshots", &self.position_snapshots)
116 .finish()
117 }
118}
119
120impl Default for Cache {
121 fn default() -> Self {
123 Self::new(Some(CacheConfig::default()), None)
124 }
125}
126
127impl Cache {
128 #[must_use]
130 pub fn new(
134 config: Option<CacheConfig>,
135 database: Option<Box<dyn CacheDatabaseAdapter>>,
136 ) -> Self {
137 Self {
138 config: config.unwrap_or_default(),
139 index: CacheIndex::default(),
140 database,
141 general: HashMap::new(),
142 currencies: HashMap::new(),
143 instruments: HashMap::new(),
144 synthetics: HashMap::new(),
145 books: HashMap::new(),
146 own_books: HashMap::new(),
147 quotes: HashMap::new(),
148 trades: HashMap::new(),
149 mark_xrates: HashMap::new(),
150 mark_prices: HashMap::new(),
151 index_prices: HashMap::new(),
152 bars: HashMap::new(),
153 greeks: HashMap::new(),
154 yield_curves: HashMap::new(),
155 accounts: HashMap::new(),
156 orders: HashMap::new(),
157 order_lists: HashMap::new(),
158 positions: HashMap::new(),
159 position_snapshots: HashMap::new(),
160 }
161 }
162
163 #[must_use]
165 pub fn memory_address(&self) -> String {
166 format!("{:?}", std::ptr::from_ref(self))
167 }
168
169 pub fn cache_general(&mut self) -> anyhow::Result<()> {
177 self.general = match &mut self.database {
178 Some(db) => db.load()?,
179 None => HashMap::new(),
180 };
181
182 log::info!(
183 "Cached {} general object(s) from database",
184 self.general.len()
185 );
186 Ok(())
187 }
188
189 pub async fn cache_all(&mut self) -> anyhow::Result<()> {
195 let cache_map = match &self.database {
196 Some(db) => db.load_all().await?,
197 None => CacheMap::default(),
198 };
199
200 self.currencies = cache_map.currencies;
201 self.instruments = cache_map.instruments;
202 self.synthetics = cache_map.synthetics;
203 self.accounts = cache_map.accounts;
204 self.orders = cache_map.orders;
205 self.positions = cache_map.positions;
206 Ok(())
207 }
208
209 pub async fn cache_currencies(&mut self) -> anyhow::Result<()> {
215 self.currencies = match &mut self.database {
216 Some(db) => db.load_currencies().await?,
217 None => HashMap::new(),
218 };
219
220 log::info!("Cached {} currencies from database", self.general.len());
221 Ok(())
222 }
223
224 pub async fn cache_instruments(&mut self) -> anyhow::Result<()> {
230 self.instruments = match &mut self.database {
231 Some(db) => db.load_instruments().await?,
232 None => HashMap::new(),
233 };
234
235 log::info!("Cached {} instruments from database", self.general.len());
236 Ok(())
237 }
238
239 pub async fn cache_synthetics(&mut self) -> anyhow::Result<()> {
245 self.synthetics = match &mut self.database {
246 Some(db) => db.load_synthetics().await?,
247 None => HashMap::new(),
248 };
249
250 log::info!(
251 "Cached {} synthetic instruments from database",
252 self.general.len()
253 );
254 Ok(())
255 }
256
257 pub async fn cache_accounts(&mut self) -> anyhow::Result<()> {
263 self.accounts = match &mut self.database {
264 Some(db) => db.load_accounts().await?,
265 None => HashMap::new(),
266 };
267
268 log::info!(
269 "Cached {} synthetic instruments from database",
270 self.general.len()
271 );
272 Ok(())
273 }
274
275 pub async fn cache_orders(&mut self) -> anyhow::Result<()> {
281 self.orders = match &mut self.database {
282 Some(db) => db.load_orders().await?,
283 None => HashMap::new(),
284 };
285
286 log::info!("Cached {} orders from database", self.general.len());
287 Ok(())
288 }
289
290 pub async fn cache_positions(&mut self) -> anyhow::Result<()> {
296 self.positions = match &mut self.database {
297 Some(db) => db.load_positions().await?,
298 None => HashMap::new(),
299 };
300
301 log::info!("Cached {} positions from database", self.general.len());
302 Ok(())
303 }
304
305 pub fn build_index(&mut self) {
307 log::debug!("Building index");
308
309 for account_id in self.accounts.keys() {
311 self.index
312 .venue_account
313 .insert(account_id.get_issuer(), *account_id);
314 }
315
316 for (client_order_id, order) in &self.orders {
318 let instrument_id = order.instrument_id();
319 let venue = instrument_id.venue;
320 let strategy_id = order.strategy_id();
321
322 self.index
324 .venue_orders
325 .entry(venue)
326 .or_default()
327 .insert(*client_order_id);
328
329 if let Some(venue_order_id) = order.venue_order_id() {
331 self.index
332 .venue_order_ids
333 .insert(venue_order_id, *client_order_id);
334 }
335
336 if let Some(position_id) = order.position_id() {
338 self.index
339 .order_position
340 .insert(*client_order_id, position_id);
341 }
342
343 self.index
345 .order_strategy
346 .insert(*client_order_id, order.strategy_id());
347
348 self.index
350 .instrument_orders
351 .entry(instrument_id)
352 .or_default()
353 .insert(*client_order_id);
354
355 self.index
357 .strategy_orders
358 .entry(strategy_id)
359 .or_default()
360 .insert(*client_order_id);
361
362 if let Some(exec_algorithm_id) = order.exec_algorithm_id() {
364 self.index
365 .exec_algorithm_orders
366 .entry(exec_algorithm_id)
367 .or_default()
368 .insert(*client_order_id);
369 }
370
371 if let Some(exec_spawn_id) = order.exec_spawn_id() {
373 self.index
374 .exec_spawn_orders
375 .entry(exec_spawn_id)
376 .or_default()
377 .insert(*client_order_id);
378 }
379
380 self.index.orders.insert(*client_order_id);
382
383 if order.is_open() {
385 self.index.orders_open.insert(*client_order_id);
386 }
387
388 if order.is_closed() {
390 self.index.orders_closed.insert(*client_order_id);
391 }
392
393 if let Some(emulation_trigger) = order.emulation_trigger() {
395 if emulation_trigger != TriggerType::NoTrigger && !order.is_closed() {
396 self.index.orders_emulated.insert(*client_order_id);
397 }
398 }
399
400 if order.is_inflight() {
402 self.index.orders_inflight.insert(*client_order_id);
403 }
404
405 self.index.strategies.insert(strategy_id);
407
408 if let Some(exec_algorithm_id) = order.exec_algorithm_id() {
410 self.index.exec_algorithms.insert(exec_algorithm_id);
411 }
412 }
413
414 for (position_id, position) in &self.positions {
416 let instrument_id = position.instrument_id;
417 let venue = instrument_id.venue;
418 let strategy_id = position.strategy_id;
419
420 self.index
422 .venue_positions
423 .entry(venue)
424 .or_default()
425 .insert(*position_id);
426
427 self.index
429 .position_strategy
430 .insert(*position_id, position.strategy_id);
431
432 self.index
434 .position_orders
435 .entry(*position_id)
436 .or_default()
437 .extend(position.client_order_ids().into_iter());
438
439 self.index
441 .instrument_positions
442 .entry(instrument_id)
443 .or_default()
444 .insert(*position_id);
445
446 self.index
448 .strategy_positions
449 .entry(strategy_id)
450 .or_default()
451 .insert(*position_id);
452
453 self.index.positions.insert(*position_id);
455
456 if position.is_open() {
458 self.index.positions_open.insert(*position_id);
459 }
460
461 if position.is_closed() {
463 self.index.positions_closed.insert(*position_id);
464 }
465
466 self.index.strategies.insert(strategy_id);
468 }
469 }
470
471 #[must_use]
473 pub const fn has_backing(&self) -> bool {
474 self.config.database.is_some()
475 }
476
477 #[must_use]
479 pub fn calculate_unrealized_pnl(&self, position: &Position) -> Option<Money> {
480 let quote = if let Some(quote) = self.quote(&position.instrument_id) {
481 quote
482 } else {
483 log::warn!(
484 "Cannot calculate unrealized PnL for {}, no quotes for {}",
485 position.id,
486 position.instrument_id
487 );
488 return None;
489 };
490
491 let last = match position.side {
492 PositionSide::Flat | PositionSide::NoPositionSide => {
493 return Some(Money::new(0.0, position.settlement_currency));
494 }
495 PositionSide::Long => quote.ask_price,
496 PositionSide::Short => quote.bid_price,
497 };
498
499 Some(position.unrealized_pnl(last))
500 }
501
502 #[must_use]
511 pub fn check_integrity(&mut self) -> bool {
512 let mut error_count = 0;
513 let failure = "Integrity failure";
514
515 let timestamp_us = SystemTime::now()
517 .duration_since(UNIX_EPOCH)
518 .expect("Time went backwards")
519 .as_micros();
520
521 log::info!("Checking data integrity");
522
523 for account_id in self.accounts.keys() {
525 if !self
526 .index
527 .venue_account
528 .contains_key(&account_id.get_issuer())
529 {
530 log::error!(
531 "{failure} in accounts: {account_id} not found in `self.index.venue_account`",
532 );
533 error_count += 1;
534 }
535 }
536
537 for (client_order_id, order) in &self.orders {
538 if !self.index.order_strategy.contains_key(client_order_id) {
539 log::error!(
540 "{failure} in orders: {client_order_id} not found in `self.index.order_strategy`"
541 );
542 error_count += 1;
543 }
544 if !self.index.orders.contains(client_order_id) {
545 log::error!(
546 "{failure} in orders: {client_order_id} not found in `self.index.orders`",
547 );
548 error_count += 1;
549 }
550 if order.is_inflight() && !self.index.orders_inflight.contains(client_order_id) {
551 log::error!(
552 "{failure} in orders: {client_order_id} not found in `self.index.orders_inflight`",
553 );
554 error_count += 1;
555 }
556 if order.is_open() && !self.index.orders_open.contains(client_order_id) {
557 log::error!(
558 "{failure} in orders: {client_order_id} not found in `self.index.orders_open`",
559 );
560 error_count += 1;
561 }
562 if order.is_closed() && !self.index.orders_closed.contains(client_order_id) {
563 log::error!(
564 "{failure} in orders: {client_order_id} not found in `self.index.orders_closed`",
565 );
566 error_count += 1;
567 }
568 if let Some(exec_algorithm_id) = order.exec_algorithm_id() {
569 if !self
570 .index
571 .exec_algorithm_orders
572 .contains_key(&exec_algorithm_id)
573 {
574 log::error!(
575 "{failure} in orders: {client_order_id} not found in `self.index.exec_algorithm_orders`",
576 );
577 error_count += 1;
578 }
579 if order.exec_spawn_id().is_none()
580 && !self.index.exec_spawn_orders.contains_key(client_order_id)
581 {
582 log::error!(
583 "{failure} in orders: {client_order_id} not found in `self.index.exec_spawn_orders`",
584 );
585 error_count += 1;
586 }
587 }
588 }
589
590 for (position_id, position) in &self.positions {
591 if !self.index.position_strategy.contains_key(position_id) {
592 log::error!(
593 "{failure} in positions: {position_id} not found in `self.index.position_strategy`",
594 );
595 error_count += 1;
596 }
597 if !self.index.position_orders.contains_key(position_id) {
598 log::error!(
599 "{failure} in positions: {position_id} not found in `self.index.position_orders`",
600 );
601 error_count += 1;
602 }
603 if !self.index.positions.contains(position_id) {
604 log::error!(
605 "{failure} in positions: {position_id} not found in `self.index.positions`",
606 );
607 error_count += 1;
608 }
609 if position.is_open() && !self.index.positions_open.contains(position_id) {
610 log::error!(
611 "{failure} in positions: {position_id} not found in `self.index.positions_open`",
612 );
613 error_count += 1;
614 }
615 if position.is_closed() && !self.index.positions_closed.contains(position_id) {
616 log::error!(
617 "{failure} in positions: {position_id} not found in `self.index.positions_closed`",
618 );
619 error_count += 1;
620 }
621 }
622
623 for account_id in self.index.venue_account.values() {
625 if !self.accounts.contains_key(account_id) {
626 log::error!(
627 "{failure} in `index.venue_account`: {account_id} not found in `self.accounts`",
628 );
629 error_count += 1;
630 }
631 }
632
633 for client_order_id in self.index.venue_order_ids.values() {
634 if !self.orders.contains_key(client_order_id) {
635 log::error!(
636 "{failure} in `index.venue_order_ids`: {client_order_id} not found in `self.orders`",
637 );
638 error_count += 1;
639 }
640 }
641
642 for client_order_id in self.index.client_order_ids.keys() {
643 if !self.orders.contains_key(client_order_id) {
644 log::error!(
645 "{failure} in `index.client_order_ids`: {client_order_id} not found in `self.orders`",
646 );
647 error_count += 1;
648 }
649 }
650
651 for client_order_id in self.index.order_position.keys() {
652 if !self.orders.contains_key(client_order_id) {
653 log::error!(
654 "{failure} in `index.order_position`: {client_order_id} not found in `self.orders`",
655 );
656 error_count += 1;
657 }
658 }
659
660 for client_order_id in self.index.order_strategy.keys() {
662 if !self.orders.contains_key(client_order_id) {
663 log::error!(
664 "{failure} in `index.order_strategy`: {client_order_id} not found in `self.orders`",
665 );
666 error_count += 1;
667 }
668 }
669
670 for position_id in self.index.position_strategy.keys() {
671 if !self.positions.contains_key(position_id) {
672 log::error!(
673 "{failure} in `index.position_strategy`: {position_id} not found in `self.positions`",
674 );
675 error_count += 1;
676 }
677 }
678
679 for position_id in self.index.position_orders.keys() {
680 if !self.positions.contains_key(position_id) {
681 log::error!(
682 "{failure} in `index.position_orders`: {position_id} not found in `self.positions`",
683 );
684 error_count += 1;
685 }
686 }
687
688 for (instrument_id, client_order_ids) in &self.index.instrument_orders {
689 for client_order_id in client_order_ids {
690 if !self.orders.contains_key(client_order_id) {
691 log::error!(
692 "{failure} in `index.instrument_orders`: {instrument_id} not found in `self.orders`",
693 );
694 error_count += 1;
695 }
696 }
697 }
698
699 for instrument_id in self.index.instrument_positions.keys() {
700 if !self.index.instrument_orders.contains_key(instrument_id) {
701 log::error!(
702 "{failure} in `index.instrument_positions`: {instrument_id} not found in `index.instrument_orders`",
703 );
704 error_count += 1;
705 }
706 }
707
708 for client_order_ids in self.index.strategy_orders.values() {
709 for client_order_id in client_order_ids {
710 if !self.orders.contains_key(client_order_id) {
711 log::error!(
712 "{failure} in `index.strategy_orders`: {client_order_id} not found in `self.orders`",
713 );
714 error_count += 1;
715 }
716 }
717 }
718
719 for position_ids in self.index.strategy_positions.values() {
720 for position_id in position_ids {
721 if !self.positions.contains_key(position_id) {
722 log::error!(
723 "{failure} in `index.strategy_positions`: {position_id} not found in `self.positions`",
724 );
725 error_count += 1;
726 }
727 }
728 }
729
730 for client_order_id in &self.index.orders {
731 if !self.orders.contains_key(client_order_id) {
732 log::error!(
733 "{failure} in `index.orders`: {client_order_id} not found in `self.orders`",
734 );
735 error_count += 1;
736 }
737 }
738
739 for client_order_id in &self.index.orders_emulated {
740 if !self.orders.contains_key(client_order_id) {
741 log::error!(
742 "{failure} in `index.orders_emulated`: {client_order_id} not found in `self.orders`",
743 );
744 error_count += 1;
745 }
746 }
747
748 for client_order_id in &self.index.orders_inflight {
749 if !self.orders.contains_key(client_order_id) {
750 log::error!(
751 "{failure} in `index.orders_inflight`: {client_order_id} not found in `self.orders`",
752 );
753 error_count += 1;
754 }
755 }
756
757 for client_order_id in &self.index.orders_open {
758 if !self.orders.contains_key(client_order_id) {
759 log::error!(
760 "{failure} in `index.orders_open`: {client_order_id} not found in `self.orders`",
761 );
762 error_count += 1;
763 }
764 }
765
766 for client_order_id in &self.index.orders_closed {
767 if !self.orders.contains_key(client_order_id) {
768 log::error!(
769 "{failure} in `index.orders_closed`: {client_order_id} not found in `self.orders`",
770 );
771 error_count += 1;
772 }
773 }
774
775 for position_id in &self.index.positions {
776 if !self.positions.contains_key(position_id) {
777 log::error!(
778 "{failure} in `index.positions`: {position_id} not found in `self.positions`",
779 );
780 error_count += 1;
781 }
782 }
783
784 for position_id in &self.index.positions_open {
785 if !self.positions.contains_key(position_id) {
786 log::error!(
787 "{failure} in `index.positions_open`: {position_id} not found in `self.positions`",
788 );
789 error_count += 1;
790 }
791 }
792
793 for position_id in &self.index.positions_closed {
794 if !self.positions.contains_key(position_id) {
795 log::error!(
796 "{failure} in `index.positions_closed`: {position_id} not found in `self.positions`",
797 );
798 error_count += 1;
799 }
800 }
801
802 for strategy_id in &self.index.strategies {
803 if !self.index.strategy_orders.contains_key(strategy_id) {
804 log::error!(
805 "{failure} in `index.strategies`: {strategy_id} not found in `index.strategy_orders`",
806 );
807 error_count += 1;
808 }
809 }
810
811 for exec_algorithm_id in &self.index.exec_algorithms {
812 if !self
813 .index
814 .exec_algorithm_orders
815 .contains_key(exec_algorithm_id)
816 {
817 log::error!(
818 "{failure} in `index.exec_algorithms`: {exec_algorithm_id} not found in `index.exec_algorithm_orders`",
819 );
820 error_count += 1;
821 }
822 }
823
824 let total_us = SystemTime::now()
825 .duration_since(UNIX_EPOCH)
826 .expect("Time went backwards")
827 .as_micros()
828 - timestamp_us;
829
830 if error_count == 0 {
831 log::info!("Integrity check passed in {total_us}μs");
832 true
833 } else {
834 log::error!(
835 "Integrity check failed with {error_count} error{} in {total_us}μs",
836 if error_count == 1 { "" } else { "s" },
837 );
838 false
839 }
840 }
841
842 #[must_use]
846 pub fn check_residuals(&self) -> bool {
847 log::debug!("Checking residuals");
848
849 let mut residuals = false;
850
851 for order in self.orders_open(None, None, None, None) {
853 residuals = true;
854 log::warn!("Residual {order:?}");
855 }
856
857 for position in self.positions_open(None, None, None, None) {
859 residuals = true;
860 log::warn!("Residual {position}");
861 }
862
863 residuals
864 }
865
866 pub fn purge_closed_orders(&mut self, ts_now: UnixNanos, buffer_secs: u64) {
872 log::debug!(
873 "Purging closed orders{}",
874 if buffer_secs > 0 {
875 format!(" with buffer_secs={buffer_secs}")
876 } else {
877 String::new()
878 }
879 );
880
881 let buffer_ns = secs_to_nanos(buffer_secs as f64);
882
883 for client_order_id in self.index.orders_closed.clone() {
884 if let Some(order) = self.orders.get(&client_order_id) {
885 if let Some(ts_closed) = order.ts_closed() {
886 if ts_closed + buffer_ns <= ts_now {
887 self.purge_order(client_order_id);
888 }
889 }
890 }
891 }
892 }
893
894 pub fn purge_closed_positions(&mut self, ts_now: UnixNanos, buffer_secs: u64) {
896 log::debug!(
897 "Purging closed positions{}",
898 if buffer_secs > 0 {
899 format!(" with buffer_secs={buffer_secs}")
900 } else {
901 String::new()
902 }
903 );
904
905 let buffer_ns = secs_to_nanos(buffer_secs as f64);
906
907 for position_id in self.index.positions_closed.clone() {
908 if let Some(position) = self.positions.get(&position_id) {
909 if let Some(ts_closed) = position.ts_closed {
910 if ts_closed + buffer_ns <= ts_now {
911 self.purge_position(position_id);
912 }
913 }
914 }
915 }
916 }
917
918 pub fn purge_order(&mut self, client_order_id: ClientOrderId) {
922 if let Some(position_id) = self.index.order_position.get(&client_order_id) {
924 if let Some(position) = self.positions.get_mut(position_id) {
925 position.purge_events_for_order(client_order_id);
926 }
927 }
928
929 if let Some(order) = self.orders.remove(&client_order_id) {
930 if let Some(venue_orders) = self
932 .index
933 .venue_orders
934 .get_mut(&order.instrument_id().venue)
935 {
936 venue_orders.remove(&client_order_id);
937 }
938
939 if let Some(venue_order_id) = order.venue_order_id() {
941 self.index.venue_order_ids.remove(&venue_order_id);
942 }
943
944 if let Some(instrument_orders) =
946 self.index.instrument_orders.get_mut(&order.instrument_id())
947 {
948 instrument_orders.remove(&client_order_id);
949 }
950
951 if let Some(position_id) = order.position_id() {
953 if let Some(position_orders) = self.index.position_orders.get_mut(&position_id) {
954 position_orders.remove(&client_order_id);
955 }
956 }
957
958 if let Some(exec_algorithm_id) = order.exec_algorithm_id() {
960 if let Some(exec_algorithm_orders) =
961 self.index.exec_algorithm_orders.get_mut(&exec_algorithm_id)
962 {
963 exec_algorithm_orders.remove(&client_order_id);
964 }
965 }
966
967 log::info!("Purged order {client_order_id}");
968 } else {
969 log::warn!("Order {client_order_id} not found when purging");
970 }
971
972 self.index.order_position.remove(&client_order_id);
974 self.index.order_strategy.remove(&client_order_id);
975 self.index.order_client.remove(&client_order_id);
976 self.index.client_order_ids.remove(&client_order_id);
977 self.index.exec_spawn_orders.remove(&client_order_id);
978 self.index.orders.remove(&client_order_id);
979 self.index.orders_closed.remove(&client_order_id);
980 self.index.orders_emulated.remove(&client_order_id);
981 self.index.orders_inflight.remove(&client_order_id);
982 self.index.orders_pending_cancel.remove(&client_order_id);
983 }
984
985 pub fn purge_position(&mut self, position_id: PositionId) {
987 if let Some(position) = self.positions.remove(&position_id) {
988 if let Some(venue_positions) = self
990 .index
991 .venue_positions
992 .get_mut(&position.instrument_id.venue)
993 {
994 venue_positions.remove(&position_id);
995 }
996
997 if let Some(instrument_positions) = self
999 .index
1000 .instrument_positions
1001 .get_mut(&position.instrument_id)
1002 {
1003 instrument_positions.remove(&position_id);
1004 }
1005
1006 if let Some(strategy_positions) =
1008 self.index.strategy_positions.get_mut(&position.strategy_id)
1009 {
1010 strategy_positions.remove(&position_id);
1011 }
1012
1013 for client_order_id in position.client_order_ids() {
1015 self.index.order_position.remove(&client_order_id);
1016 }
1017
1018 log::info!("Purged position {position_id}");
1019 } else {
1020 log::warn!("Position {position_id} not found when purging");
1021 }
1022
1023 self.index.position_strategy.remove(&position_id);
1025 self.index.position_orders.remove(&position_id);
1026 self.index.positions.remove(&position_id);
1027 self.index.positions_open.remove(&position_id);
1028 self.index.positions_closed.remove(&position_id);
1029 }
1030
1031 pub fn purge_account_events(&mut self, ts_now: UnixNanos, lookback_secs: u64) {
1036 log::debug!(
1037 "Purging account events{}",
1038 if lookback_secs > 0 {
1039 format!(" with lookback_secs={lookback_secs}")
1040 } else {
1041 String::new()
1042 }
1043 );
1044
1045 for account in self.accounts.values_mut() {
1046 let event_count = account.event_count();
1047 account.purge_account_events(ts_now, lookback_secs);
1048 let count_diff = event_count - account.event_count();
1049 if count_diff > 0 {
1050 log::info!(
1051 "Purged {} event(s) from account {}",
1052 count_diff,
1053 account.id()
1054 );
1055 }
1056 }
1057 }
1058
1059 pub fn clear_index(&mut self) {
1061 self.index.clear();
1062 log::debug!("Cleared index");
1063 }
1064
1065 pub fn reset(&mut self) {
1069 log::debug!("Resetting cache");
1070
1071 self.general.clear();
1072 self.currencies.clear();
1073 self.instruments.clear();
1074 self.synthetics.clear();
1075 self.books.clear();
1076 self.own_books.clear();
1077 self.quotes.clear();
1078 self.trades.clear();
1079 self.mark_xrates.clear();
1080 self.mark_prices.clear();
1081 self.index_prices.clear();
1082 self.bars.clear();
1083 self.accounts.clear();
1084 self.orders.clear();
1085 self.order_lists.clear();
1086 self.positions.clear();
1087 self.position_snapshots.clear();
1088 self.greeks.clear();
1089 self.yield_curves.clear();
1090
1091 self.clear_index();
1092
1093 log::info!("Reset cache");
1094 }
1095
1096 pub fn dispose(&mut self) {
1102 if let Some(database) = &mut self.database {
1103 database.close().expect("Failed to close database");
1104 }
1105 }
1106
1107 pub fn flush_db(&mut self) {
1113 if let Some(database) = &mut self.database {
1114 database.flush().expect("Failed to flush database");
1115 }
1116 }
1117
1118 pub fn add(&mut self, key: &str, value: Bytes) -> anyhow::Result<()> {
1126 check_valid_string(key, stringify!(key))?;
1127 check_predicate_false(value.is_empty(), stringify!(value))?;
1128
1129 log::debug!("Adding general {key}");
1130 self.general.insert(key.to_string(), value.clone());
1131
1132 if let Some(database) = &mut self.database {
1133 database.add(key.to_string(), value)?;
1134 }
1135 Ok(())
1136 }
1137
1138 pub fn add_order_book(&mut self, book: OrderBook) -> anyhow::Result<()> {
1144 log::debug!("Adding `OrderBook` {}", book.instrument_id);
1145
1146 if self.config.save_market_data {
1147 if let Some(database) = &mut self.database {
1148 database.add_order_book(&book)?;
1149 }
1150 }
1151
1152 self.books.insert(book.instrument_id, book);
1153 Ok(())
1154 }
1155
1156 pub fn add_own_order_book(&mut self, own_book: OwnOrderBook) -> anyhow::Result<()> {
1162 log::debug!("Adding `OwnOrderBook` {}", own_book.instrument_id);
1163
1164 self.own_books.insert(own_book.instrument_id, own_book);
1165 Ok(())
1166 }
1167
1168 pub fn add_mark_price(&mut self, mark_price: MarkPriceUpdate) -> anyhow::Result<()> {
1174 log::debug!("Adding `MarkPriceUpdate` for {}", mark_price.instrument_id);
1175
1176 if self.config.save_market_data {
1177 }
1179
1180 let mark_prices_deque = self
1181 .mark_prices
1182 .entry(mark_price.instrument_id)
1183 .or_insert_with(|| VecDeque::with_capacity(self.config.tick_capacity));
1184 mark_prices_deque.push_front(mark_price);
1185 Ok(())
1186 }
1187
1188 pub fn add_index_price(&mut self, index_price: IndexPriceUpdate) -> anyhow::Result<()> {
1194 log::debug!(
1195 "Adding `IndexPriceUpdate` for {}",
1196 index_price.instrument_id
1197 );
1198
1199 if self.config.save_market_data {
1200 }
1202
1203 let index_prices_deque = self
1204 .index_prices
1205 .entry(index_price.instrument_id)
1206 .or_insert_with(|| VecDeque::with_capacity(self.config.tick_capacity));
1207 index_prices_deque.push_front(index_price);
1208 Ok(())
1209 }
1210
1211 pub fn add_quote(&mut self, quote: QuoteTick) -> anyhow::Result<()> {
1217 log::debug!("Adding `QuoteTick` {}", quote.instrument_id);
1218
1219 if self.config.save_market_data {
1220 if let Some(database) = &mut self.database {
1221 database.add_quote("e)?;
1222 }
1223 }
1224
1225 let quotes_deque = self
1226 .quotes
1227 .entry(quote.instrument_id)
1228 .or_insert_with(|| VecDeque::with_capacity(self.config.tick_capacity));
1229 quotes_deque.push_front(quote);
1230 Ok(())
1231 }
1232
1233 pub fn add_quotes(&mut self, quotes: &[QuoteTick]) -> anyhow::Result<()> {
1239 check_slice_not_empty(quotes, stringify!(quotes))?;
1240
1241 let instrument_id = quotes[0].instrument_id;
1242 log::debug!("Adding `QuoteTick`[{}] {instrument_id}", quotes.len());
1243
1244 if self.config.save_market_data {
1245 if let Some(database) = &mut self.database {
1246 for quote in quotes {
1247 database.add_quote(quote)?;
1248 }
1249 }
1250 }
1251
1252 let quotes_deque = self
1253 .quotes
1254 .entry(instrument_id)
1255 .or_insert_with(|| VecDeque::with_capacity(self.config.tick_capacity));
1256
1257 for quote in quotes {
1258 quotes_deque.push_front(*quote);
1259 }
1260 Ok(())
1261 }
1262
1263 pub fn add_trade(&mut self, trade: TradeTick) -> anyhow::Result<()> {
1269 log::debug!("Adding `TradeTick` {}", trade.instrument_id);
1270
1271 if self.config.save_market_data {
1272 if let Some(database) = &mut self.database {
1273 database.add_trade(&trade)?;
1274 }
1275 }
1276
1277 let trades_deque = self
1278 .trades
1279 .entry(trade.instrument_id)
1280 .or_insert_with(|| VecDeque::with_capacity(self.config.tick_capacity));
1281 trades_deque.push_front(trade);
1282 Ok(())
1283 }
1284
1285 pub fn add_trades(&mut self, trades: &[TradeTick]) -> anyhow::Result<()> {
1291 check_slice_not_empty(trades, stringify!(trades))?;
1292
1293 let instrument_id = trades[0].instrument_id;
1294 log::debug!("Adding `TradeTick`[{}] {instrument_id}", trades.len());
1295
1296 if self.config.save_market_data {
1297 if let Some(database) = &mut self.database {
1298 for trade in trades {
1299 database.add_trade(trade)?;
1300 }
1301 }
1302 }
1303
1304 let trades_deque = self
1305 .trades
1306 .entry(instrument_id)
1307 .or_insert_with(|| VecDeque::with_capacity(self.config.tick_capacity));
1308
1309 for trade in trades {
1310 trades_deque.push_front(*trade);
1311 }
1312 Ok(())
1313 }
1314
1315 pub fn add_bar(&mut self, bar: Bar) -> anyhow::Result<()> {
1321 log::debug!("Adding `Bar` {}", bar.bar_type);
1322
1323 if self.config.save_market_data {
1324 if let Some(database) = &mut self.database {
1325 database.add_bar(&bar)?;
1326 }
1327 }
1328
1329 let bars = self
1330 .bars
1331 .entry(bar.bar_type)
1332 .or_insert_with(|| VecDeque::with_capacity(self.config.bar_capacity));
1333 bars.push_front(bar);
1334 Ok(())
1335 }
1336
1337 pub fn add_bars(&mut self, bars: &[Bar]) -> anyhow::Result<()> {
1343 check_slice_not_empty(bars, stringify!(bars))?;
1344
1345 let bar_type = bars[0].bar_type;
1346 log::debug!("Adding `Bar`[{}] {bar_type}", bars.len());
1347
1348 if self.config.save_market_data {
1349 if let Some(database) = &mut self.database {
1350 for bar in bars {
1351 database.add_bar(bar)?;
1352 }
1353 }
1354 }
1355
1356 let bars_deque = self
1357 .bars
1358 .entry(bar_type)
1359 .or_insert_with(|| VecDeque::with_capacity(self.config.tick_capacity));
1360
1361 for bar in bars {
1362 bars_deque.push_front(*bar);
1363 }
1364 Ok(())
1365 }
1366
1367 pub fn add_greeks(&mut self, greeks: GreeksData) -> anyhow::Result<()> {
1373 log::debug!("Adding `GreeksData` {}", greeks.instrument_id);
1374
1375 if self.config.save_market_data {
1376 if let Some(_database) = &mut self.database {
1377 }
1379 }
1380
1381 self.greeks.insert(greeks.instrument_id, greeks);
1382 Ok(())
1383 }
1384
1385 pub fn greeks(&self, instrument_id: &InstrumentId) -> Option<GreeksData> {
1387 self.greeks.get(instrument_id).cloned()
1388 }
1389
1390 pub fn add_yield_curve(&mut self, yield_curve: YieldCurveData) -> anyhow::Result<()> {
1396 log::debug!("Adding `YieldCurveData` {}", yield_curve.curve_name);
1397
1398 if self.config.save_market_data {
1399 if let Some(_database) = &mut self.database {
1400 }
1402 }
1403
1404 self.yield_curves
1405 .insert(yield_curve.curve_name.clone(), yield_curve);
1406 Ok(())
1407 }
1408
1409 pub fn yield_curve(&self, key: &str) -> Option<Box<dyn Fn(f64) -> f64>> {
1411 self.yield_curves.get(key).map(|curve| {
1412 let curve_clone = curve.clone();
1413 Box::new(move |expiry_in_years: f64| curve_clone.get_rate(expiry_in_years))
1414 as Box<dyn Fn(f64) -> f64>
1415 })
1416 }
1417
1418 pub fn add_currency(&mut self, currency: Currency) -> anyhow::Result<()> {
1424 log::debug!("Adding `Currency` {}", currency.code);
1425
1426 if let Some(database) = &mut self.database {
1427 database.add_currency(¤cy)?;
1428 }
1429
1430 self.currencies.insert(currency.code, currency);
1431 Ok(())
1432 }
1433
1434 pub fn add_instrument(&mut self, instrument: InstrumentAny) -> anyhow::Result<()> {
1440 log::debug!("Adding `Instrument` {}", instrument.id());
1441
1442 if let Some(database) = &mut self.database {
1443 database.add_instrument(&instrument)?;
1444 }
1445
1446 self.instruments.insert(instrument.id(), instrument);
1447 Ok(())
1448 }
1449
1450 pub fn add_synthetic(&mut self, synthetic: SyntheticInstrument) -> anyhow::Result<()> {
1456 log::debug!("Adding `SyntheticInstrument` {}", synthetic.id);
1457
1458 if let Some(database) = &mut self.database {
1459 database.add_synthetic(&synthetic)?;
1460 }
1461
1462 self.synthetics.insert(synthetic.id, synthetic);
1463 Ok(())
1464 }
1465
1466 pub fn add_account(&mut self, account: AccountAny) -> anyhow::Result<()> {
1472 log::debug!("Adding `Account` {}", account.id());
1473
1474 if let Some(database) = &mut self.database {
1475 database.add_account(&account)?;
1476 }
1477
1478 let account_id = account.id();
1479 self.accounts.insert(account_id, account);
1480 self.index
1481 .venue_account
1482 .insert(account_id.get_issuer(), account_id);
1483 Ok(())
1484 }
1485
1486 pub fn add_venue_order_id(
1494 &mut self,
1495 client_order_id: &ClientOrderId,
1496 venue_order_id: &VenueOrderId,
1497 overwrite: bool,
1498 ) -> anyhow::Result<()> {
1499 if let Some(existing_venue_order_id) = self.index.client_order_ids.get(client_order_id) {
1500 if !overwrite && existing_venue_order_id != venue_order_id {
1501 anyhow::bail!(
1502 "Existing {existing_venue_order_id} for {client_order_id}
1503 did not match the given {venue_order_id}.
1504 If you are writing a test then try a different `venue_order_id`,
1505 otherwise this is probably a bug."
1506 );
1507 }
1508 }
1509
1510 self.index
1511 .client_order_ids
1512 .insert(*client_order_id, *venue_order_id);
1513 self.index
1514 .venue_order_ids
1515 .insert(*venue_order_id, *client_order_id);
1516
1517 Ok(())
1518 }
1519
1520 pub fn add_order(
1532 &mut self,
1533 order: OrderAny,
1534 position_id: Option<PositionId>,
1535 client_id: Option<ClientId>,
1536 replace_existing: bool,
1537 ) -> anyhow::Result<()> {
1538 let instrument_id = order.instrument_id();
1539 let venue = instrument_id.venue;
1540 let client_order_id = order.client_order_id();
1541 let strategy_id = order.strategy_id();
1542 let exec_algorithm_id = order.exec_algorithm_id();
1543 let exec_spawn_id = order.exec_spawn_id();
1544
1545 if !replace_existing {
1546 check_key_not_in_map(
1547 &client_order_id,
1548 &self.orders,
1549 stringify!(client_order_id),
1550 stringify!(orders),
1551 )?;
1552 }
1553
1554 log::debug!("Adding {order:?}");
1555
1556 self.index.orders.insert(client_order_id);
1557 self.index
1558 .order_strategy
1559 .insert(client_order_id, strategy_id);
1560 self.index.strategies.insert(strategy_id);
1561
1562 self.index
1564 .venue_orders
1565 .entry(venue)
1566 .or_default()
1567 .insert(client_order_id);
1568
1569 self.index
1571 .instrument_orders
1572 .entry(instrument_id)
1573 .or_default()
1574 .insert(client_order_id);
1575
1576 self.index
1578 .strategy_orders
1579 .entry(strategy_id)
1580 .or_default()
1581 .insert(client_order_id);
1582
1583 if let (Some(exec_algorithm_id), Some(exec_spawn_id)) = (exec_algorithm_id, exec_spawn_id) {
1586 self.index.exec_algorithms.insert(exec_algorithm_id);
1587
1588 self.index
1589 .exec_algorithm_orders
1590 .entry(exec_algorithm_id)
1591 .or_default()
1592 .insert(client_order_id);
1593
1594 self.index
1595 .exec_spawn_orders
1596 .entry(exec_spawn_id)
1597 .or_default()
1598 .insert(client_order_id);
1599 }
1600
1601 match order.emulation_trigger() {
1603 Some(_) => {
1604 self.index.orders_emulated.remove(&client_order_id);
1605 }
1606 None => {
1607 self.index.orders_emulated.insert(client_order_id);
1608 }
1609 }
1610
1611 if let Some(position_id) = position_id {
1613 self.add_position_id(
1614 &position_id,
1615 &order.instrument_id().venue,
1616 &client_order_id,
1617 &strategy_id,
1618 )?;
1619 }
1620
1621 if let Some(client_id) = client_id {
1623 self.index.order_client.insert(client_order_id, client_id);
1624 log::debug!("Indexed {client_id:?}");
1625 }
1626
1627 if let Some(database) = &mut self.database {
1628 database.add_order(&order, client_id)?;
1629 }
1634
1635 self.orders.insert(client_order_id, order);
1636
1637 Ok(())
1638 }
1639
1640 pub fn add_position_id(
1646 &mut self,
1647 position_id: &PositionId,
1648 venue: &Venue,
1649 client_order_id: &ClientOrderId,
1650 strategy_id: &StrategyId,
1651 ) -> anyhow::Result<()> {
1652 self.index
1653 .order_position
1654 .insert(*client_order_id, *position_id);
1655
1656 if let Some(database) = &mut self.database {
1658 database.index_order_position(*client_order_id, *position_id)?;
1659 }
1660
1661 self.index
1663 .position_strategy
1664 .insert(*position_id, *strategy_id);
1665
1666 self.index
1668 .position_orders
1669 .entry(*position_id)
1670 .or_default()
1671 .insert(*client_order_id);
1672
1673 self.index
1675 .strategy_positions
1676 .entry(*strategy_id)
1677 .or_default()
1678 .insert(*position_id);
1679
1680 self.index
1682 .venue_positions
1683 .entry(*venue)
1684 .or_default()
1685 .insert(*position_id);
1686
1687 Ok(())
1688 }
1689
1690 pub fn add_position(&mut self, position: Position, _oms_type: OmsType) -> anyhow::Result<()> {
1696 self.positions.insert(position.id, position.clone());
1697 self.index.positions.insert(position.id);
1698 self.index.positions_open.insert(position.id);
1699
1700 log::debug!("Adding {position}");
1701
1702 self.add_position_id(
1703 &position.id,
1704 &position.instrument_id.venue,
1705 &position.opening_order_id,
1706 &position.strategy_id,
1707 )?;
1708
1709 let venue = position.instrument_id.venue;
1710 let venue_positions = self.index.venue_positions.entry(venue).or_default();
1711 venue_positions.insert(position.id);
1712
1713 let instrument_id = position.instrument_id;
1715 let instrument_positions = self
1716 .index
1717 .instrument_positions
1718 .entry(instrument_id)
1719 .or_default();
1720 instrument_positions.insert(position.id);
1721
1722 if let Some(database) = &mut self.database {
1723 database.add_position(&position)?;
1724 }
1733
1734 Ok(())
1735 }
1736
1737 pub fn update_account(&mut self, account: AccountAny) -> anyhow::Result<()> {
1743 if let Some(database) = &mut self.database {
1744 database.update_account(&account)?;
1745 }
1746 Ok(())
1747 }
1748
1749 pub fn update_order(&mut self, order: &OrderAny) -> anyhow::Result<()> {
1755 let client_order_id = order.client_order_id();
1756
1757 if let Some(venue_order_id) = order.venue_order_id() {
1759 if !self.index.venue_order_ids.contains_key(&venue_order_id) {
1762 self.add_venue_order_id(&order.client_order_id(), &venue_order_id, false)?;
1764 }
1765 }
1766
1767 if order.is_inflight() {
1769 self.index.orders_inflight.insert(client_order_id);
1770 } else {
1771 self.index.orders_inflight.remove(&client_order_id);
1772 }
1773
1774 if order.is_open() {
1776 self.index.orders_closed.remove(&client_order_id);
1777 self.index.orders_open.insert(client_order_id);
1778 } else if order.is_closed() {
1779 self.index.orders_open.remove(&client_order_id);
1780 self.index.orders_pending_cancel.remove(&client_order_id);
1781 self.index.orders_closed.insert(client_order_id);
1782 }
1783
1784 if let Some(emulation_trigger) = order.emulation_trigger() {
1786 match emulation_trigger {
1787 TriggerType::NoTrigger => self.index.orders_emulated.remove(&client_order_id),
1788 _ => self.index.orders_emulated.insert(client_order_id),
1789 };
1790 }
1791
1792 if let Some(database) = &mut self.database {
1793 database.update_order(order.last_event())?;
1794 }
1799
1800 self.orders.insert(client_order_id, order.clone());
1802
1803 Ok(())
1804 }
1805
1806 pub fn update_order_pending_cancel_local(&mut self, order: &OrderAny) {
1808 self.index
1809 .orders_pending_cancel
1810 .insert(order.client_order_id());
1811 }
1812
1813 pub fn update_position(&mut self, position: &Position) -> anyhow::Result<()> {
1819 if position.is_open() {
1821 self.index.positions_open.insert(position.id);
1822 self.index.positions_closed.remove(&position.id);
1823 } else {
1824 self.index.positions_closed.insert(position.id);
1825 self.index.positions_open.remove(&position.id);
1826 }
1827
1828 if let Some(database) = &mut self.database {
1829 database.update_position(position)?;
1830 }
1835 Ok(())
1836 }
1837
1838 pub fn snapshot_position(&mut self, position: &Position) -> anyhow::Result<()> {
1845 let position_id = position.id;
1846
1847 let mut copied_position = position.clone();
1848 let new_id = format!("{}-{}", position_id.as_str(), UUID4::new());
1849 copied_position.id = PositionId::new(new_id);
1850
1851 let position_serialized = serde_json::to_vec(&copied_position)?;
1853
1854 let snapshots: Option<&Bytes> = self.position_snapshots.get(&position_id);
1855 let new_snapshots = match snapshots {
1856 Some(existing_snapshots) => {
1857 let mut combined = existing_snapshots.to_vec();
1858 combined.extend(position_serialized);
1859 Bytes::from(combined)
1860 }
1861 None => Bytes::from(position_serialized),
1862 };
1863 self.position_snapshots.insert(position_id, new_snapshots);
1864
1865 log::debug!("Snapshot {}", copied_position);
1866 Ok(())
1867 }
1868
1869 pub fn snapshot_position_state(
1875 &mut self,
1876 position: &Position,
1877 open_only: Option<bool>,
1880 ) -> anyhow::Result<()> {
1881 let open_only = open_only.unwrap_or(true);
1882
1883 if open_only && !position.is_open() {
1884 return Ok(());
1885 }
1886
1887 if let Some(database) = &mut self.database {
1888 database.snapshot_position_state(position).map_err(|e| {
1889 log::error!(
1890 "Failed to snapshot position state for {}: {e:?}",
1891 position.id
1892 );
1893 e
1894 })?;
1895 } else {
1896 log::warn!(
1897 "Cannot snapshot position state for {} (no database configured)",
1898 position.id
1899 );
1900 }
1901
1902 todo!()
1904 }
1905
1906 pub fn snapshot_order_state(&self, order: &OrderAny) -> anyhow::Result<()> {
1912 let database = if let Some(database) = &self.database {
1913 database
1914 } else {
1915 log::warn!(
1916 "Cannot snapshot order state for {} (no database configured)",
1917 order.client_order_id()
1918 );
1919 return Ok(());
1920 };
1921
1922 database.snapshot_order_state(order)
1923 }
1924
1925 fn build_order_query_filter_set(
1928 &self,
1929 venue: Option<&Venue>,
1930 instrument_id: Option<&InstrumentId>,
1931 strategy_id: Option<&StrategyId>,
1932 ) -> Option<HashSet<ClientOrderId>> {
1933 let mut query: Option<HashSet<ClientOrderId>> = None;
1934
1935 if let Some(venue) = venue {
1936 query = Some(
1937 self.index
1938 .venue_orders
1939 .get(venue)
1940 .cloned()
1941 .unwrap_or_default(),
1942 );
1943 }
1944
1945 if let Some(instrument_id) = instrument_id {
1946 let instrument_orders = self
1947 .index
1948 .instrument_orders
1949 .get(instrument_id)
1950 .cloned()
1951 .unwrap_or_default();
1952
1953 if let Some(existing_query) = &mut query {
1954 *existing_query = existing_query
1955 .intersection(&instrument_orders)
1956 .copied()
1957 .collect();
1958 } else {
1959 query = Some(instrument_orders);
1960 }
1961 }
1962
1963 if let Some(strategy_id) = strategy_id {
1964 let strategy_orders = self
1965 .index
1966 .strategy_orders
1967 .get(strategy_id)
1968 .cloned()
1969 .unwrap_or_default();
1970
1971 if let Some(existing_query) = &mut query {
1972 *existing_query = existing_query
1973 .intersection(&strategy_orders)
1974 .copied()
1975 .collect();
1976 } else {
1977 query = Some(strategy_orders);
1978 }
1979 }
1980
1981 query
1982 }
1983
1984 fn build_position_query_filter_set(
1985 &self,
1986 venue: Option<&Venue>,
1987 instrument_id: Option<&InstrumentId>,
1988 strategy_id: Option<&StrategyId>,
1989 ) -> Option<HashSet<PositionId>> {
1990 let mut query: Option<HashSet<PositionId>> = None;
1991
1992 if let Some(venue) = venue {
1993 query = Some(
1994 self.index
1995 .venue_positions
1996 .get(venue)
1997 .cloned()
1998 .unwrap_or_default(),
1999 );
2000 }
2001
2002 if let Some(instrument_id) = instrument_id {
2003 let instrument_positions = self
2004 .index
2005 .instrument_positions
2006 .get(instrument_id)
2007 .cloned()
2008 .unwrap_or_default();
2009
2010 if let Some(existing_query) = query {
2011 query = Some(
2012 existing_query
2013 .intersection(&instrument_positions)
2014 .copied()
2015 .collect(),
2016 );
2017 } else {
2018 query = Some(instrument_positions);
2019 }
2020 }
2021
2022 if let Some(strategy_id) = strategy_id {
2023 let strategy_positions = self
2024 .index
2025 .strategy_positions
2026 .get(strategy_id)
2027 .cloned()
2028 .unwrap_or_default();
2029
2030 if let Some(existing_query) = query {
2031 query = Some(
2032 existing_query
2033 .intersection(&strategy_positions)
2034 .copied()
2035 .collect(),
2036 );
2037 } else {
2038 query = Some(strategy_positions);
2039 }
2040 }
2041
2042 query
2043 }
2044
2045 fn get_orders_for_ids(
2051 &self,
2052 client_order_ids: &HashSet<ClientOrderId>,
2053 side: Option<OrderSide>,
2054 ) -> Vec<&OrderAny> {
2055 let side = side.unwrap_or(OrderSide::NoOrderSide);
2056 let mut orders = Vec::new();
2057
2058 for client_order_id in client_order_ids {
2059 let order = self
2060 .orders
2061 .get(client_order_id)
2062 .unwrap_or_else(|| panic!("Order {client_order_id} not found"));
2063 if side == OrderSide::NoOrderSide || side == order.order_side() {
2064 orders.push(order);
2065 }
2066 }
2067
2068 orders
2069 }
2070
2071 fn get_positions_for_ids(
2077 &self,
2078 position_ids: &HashSet<PositionId>,
2079 side: Option<PositionSide>,
2080 ) -> Vec<&Position> {
2081 let side = side.unwrap_or(PositionSide::NoPositionSide);
2082 let mut positions = Vec::new();
2083
2084 for position_id in position_ids {
2085 let position = self
2086 .positions
2087 .get(position_id)
2088 .unwrap_or_else(|| panic!("Position {position_id} not found"));
2089 if side == PositionSide::NoPositionSide || side == position.side {
2090 positions.push(position);
2091 }
2092 }
2093
2094 positions
2095 }
2096
2097 #[must_use]
2099 pub fn client_order_ids(
2100 &self,
2101 venue: Option<&Venue>,
2102 instrument_id: Option<&InstrumentId>,
2103 strategy_id: Option<&StrategyId>,
2104 ) -> HashSet<ClientOrderId> {
2105 let query = self.build_order_query_filter_set(venue, instrument_id, strategy_id);
2106 match query {
2107 Some(query) => self.index.orders.intersection(&query).copied().collect(),
2108 None => self.index.orders.clone(),
2109 }
2110 }
2111
2112 #[must_use]
2114 pub fn client_order_ids_open(
2115 &self,
2116 venue: Option<&Venue>,
2117 instrument_id: Option<&InstrumentId>,
2118 strategy_id: Option<&StrategyId>,
2119 ) -> HashSet<ClientOrderId> {
2120 let query = self.build_order_query_filter_set(venue, instrument_id, strategy_id);
2121 match query {
2122 Some(query) => self
2123 .index
2124 .orders_open
2125 .intersection(&query)
2126 .copied()
2127 .collect(),
2128 None => self.index.orders_open.clone(),
2129 }
2130 }
2131
2132 #[must_use]
2134 pub fn client_order_ids_closed(
2135 &self,
2136 venue: Option<&Venue>,
2137 instrument_id: Option<&InstrumentId>,
2138 strategy_id: Option<&StrategyId>,
2139 ) -> HashSet<ClientOrderId> {
2140 let query = self.build_order_query_filter_set(venue, instrument_id, strategy_id);
2141 match query {
2142 Some(query) => self
2143 .index
2144 .orders_closed
2145 .intersection(&query)
2146 .copied()
2147 .collect(),
2148 None => self.index.orders_closed.clone(),
2149 }
2150 }
2151
2152 #[must_use]
2154 pub fn client_order_ids_emulated(
2155 &self,
2156 venue: Option<&Venue>,
2157 instrument_id: Option<&InstrumentId>,
2158 strategy_id: Option<&StrategyId>,
2159 ) -> HashSet<ClientOrderId> {
2160 let query = self.build_order_query_filter_set(venue, instrument_id, strategy_id);
2161 match query {
2162 Some(query) => self
2163 .index
2164 .orders_emulated
2165 .intersection(&query)
2166 .copied()
2167 .collect(),
2168 None => self.index.orders_emulated.clone(),
2169 }
2170 }
2171
2172 #[must_use]
2174 pub fn client_order_ids_inflight(
2175 &self,
2176 venue: Option<&Venue>,
2177 instrument_id: Option<&InstrumentId>,
2178 strategy_id: Option<&StrategyId>,
2179 ) -> HashSet<ClientOrderId> {
2180 let query = self.build_order_query_filter_set(venue, instrument_id, strategy_id);
2181 match query {
2182 Some(query) => self
2183 .index
2184 .orders_inflight
2185 .intersection(&query)
2186 .copied()
2187 .collect(),
2188 None => self.index.orders_inflight.clone(),
2189 }
2190 }
2191
2192 #[must_use]
2194 pub fn position_ids(
2195 &self,
2196 venue: Option<&Venue>,
2197 instrument_id: Option<&InstrumentId>,
2198 strategy_id: Option<&StrategyId>,
2199 ) -> HashSet<PositionId> {
2200 let query = self.build_position_query_filter_set(venue, instrument_id, strategy_id);
2201 match query {
2202 Some(query) => self.index.positions.intersection(&query).copied().collect(),
2203 None => self.index.positions.clone(),
2204 }
2205 }
2206
2207 #[must_use]
2209 pub fn position_open_ids(
2210 &self,
2211 venue: Option<&Venue>,
2212 instrument_id: Option<&InstrumentId>,
2213 strategy_id: Option<&StrategyId>,
2214 ) -> HashSet<PositionId> {
2215 let query = self.build_position_query_filter_set(venue, instrument_id, strategy_id);
2216 match query {
2217 Some(query) => self
2218 .index
2219 .positions_open
2220 .intersection(&query)
2221 .copied()
2222 .collect(),
2223 None => self.index.positions_open.clone(),
2224 }
2225 }
2226
2227 #[must_use]
2229 pub fn position_closed_ids(
2230 &self,
2231 venue: Option<&Venue>,
2232 instrument_id: Option<&InstrumentId>,
2233 strategy_id: Option<&StrategyId>,
2234 ) -> HashSet<PositionId> {
2235 let query = self.build_position_query_filter_set(venue, instrument_id, strategy_id);
2236 match query {
2237 Some(query) => self
2238 .index
2239 .positions_closed
2240 .intersection(&query)
2241 .copied()
2242 .collect(),
2243 None => self.index.positions_closed.clone(),
2244 }
2245 }
2246
2247 #[must_use]
2249 pub fn actor_ids(&self) -> HashSet<ComponentId> {
2250 self.index.actors.clone()
2251 }
2252
2253 #[must_use]
2255 pub fn strategy_ids(&self) -> HashSet<StrategyId> {
2256 self.index.strategies.clone()
2257 }
2258
2259 #[must_use]
2261 pub fn exec_algorithm_ids(&self) -> HashSet<ExecAlgorithmId> {
2262 self.index.exec_algorithms.clone()
2263 }
2264
2265 #[must_use]
2269 pub fn order(&self, client_order_id: &ClientOrderId) -> Option<&OrderAny> {
2270 self.orders.get(client_order_id)
2271 }
2272
2273 #[must_use]
2275 pub fn mut_order(&mut self, client_order_id: &ClientOrderId) -> Option<&mut OrderAny> {
2276 self.orders.get_mut(client_order_id)
2277 }
2278
2279 #[must_use]
2281 pub fn client_order_id(&self, venue_order_id: &VenueOrderId) -> Option<&ClientOrderId> {
2282 self.index.venue_order_ids.get(venue_order_id)
2283 }
2284
2285 #[must_use]
2287 pub fn venue_order_id(&self, client_order_id: &ClientOrderId) -> Option<&VenueOrderId> {
2288 self.index.client_order_ids.get(client_order_id)
2289 }
2290
2291 #[must_use]
2293 pub fn client_id(&self, client_order_id: &ClientOrderId) -> Option<&ClientId> {
2294 self.index.order_client.get(client_order_id)
2295 }
2296
2297 #[must_use]
2299 pub fn orders(
2300 &self,
2301 venue: Option<&Venue>,
2302 instrument_id: Option<&InstrumentId>,
2303 strategy_id: Option<&StrategyId>,
2304 side: Option<OrderSide>,
2305 ) -> Vec<&OrderAny> {
2306 let client_order_ids = self.client_order_ids(venue, instrument_id, strategy_id);
2307 self.get_orders_for_ids(&client_order_ids, side)
2308 }
2309
2310 #[must_use]
2312 pub fn orders_open(
2313 &self,
2314 venue: Option<&Venue>,
2315 instrument_id: Option<&InstrumentId>,
2316 strategy_id: Option<&StrategyId>,
2317 side: Option<OrderSide>,
2318 ) -> Vec<&OrderAny> {
2319 let client_order_ids = self.client_order_ids_open(venue, instrument_id, strategy_id);
2320 self.get_orders_for_ids(&client_order_ids, side)
2321 }
2322
2323 #[must_use]
2325 pub fn orders_closed(
2326 &self,
2327 venue: Option<&Venue>,
2328 instrument_id: Option<&InstrumentId>,
2329 strategy_id: Option<&StrategyId>,
2330 side: Option<OrderSide>,
2331 ) -> Vec<&OrderAny> {
2332 let client_order_ids = self.client_order_ids_closed(venue, instrument_id, strategy_id);
2333 self.get_orders_for_ids(&client_order_ids, side)
2334 }
2335
2336 #[must_use]
2338 pub fn orders_emulated(
2339 &self,
2340 venue: Option<&Venue>,
2341 instrument_id: Option<&InstrumentId>,
2342 strategy_id: Option<&StrategyId>,
2343 side: Option<OrderSide>,
2344 ) -> Vec<&OrderAny> {
2345 let client_order_ids = self.client_order_ids_emulated(venue, instrument_id, strategy_id);
2346 self.get_orders_for_ids(&client_order_ids, side)
2347 }
2348
2349 #[must_use]
2351 pub fn orders_inflight(
2352 &self,
2353 venue: Option<&Venue>,
2354 instrument_id: Option<&InstrumentId>,
2355 strategy_id: Option<&StrategyId>,
2356 side: Option<OrderSide>,
2357 ) -> Vec<&OrderAny> {
2358 let client_order_ids = self.client_order_ids_inflight(venue, instrument_id, strategy_id);
2359 self.get_orders_for_ids(&client_order_ids, side)
2360 }
2361
2362 #[must_use]
2364 pub fn orders_for_position(&self, position_id: &PositionId) -> Vec<&OrderAny> {
2365 let client_order_ids = self.index.position_orders.get(position_id);
2366 match client_order_ids {
2367 Some(client_order_ids) => {
2368 self.get_orders_for_ids(&client_order_ids.iter().copied().collect(), None)
2369 }
2370 None => Vec::new(),
2371 }
2372 }
2373
2374 #[must_use]
2376 pub fn order_exists(&self, client_order_id: &ClientOrderId) -> bool {
2377 self.index.orders.contains(client_order_id)
2378 }
2379
2380 #[must_use]
2382 pub fn is_order_open(&self, client_order_id: &ClientOrderId) -> bool {
2383 self.index.orders_open.contains(client_order_id)
2384 }
2385
2386 #[must_use]
2388 pub fn is_order_closed(&self, client_order_id: &ClientOrderId) -> bool {
2389 self.index.orders_closed.contains(client_order_id)
2390 }
2391
2392 #[must_use]
2394 pub fn is_order_emulated(&self, client_order_id: &ClientOrderId) -> bool {
2395 self.index.orders_emulated.contains(client_order_id)
2396 }
2397
2398 #[must_use]
2400 pub fn is_order_inflight(&self, client_order_id: &ClientOrderId) -> bool {
2401 self.index.orders_inflight.contains(client_order_id)
2402 }
2403
2404 #[must_use]
2406 pub fn is_order_pending_cancel_local(&self, client_order_id: &ClientOrderId) -> bool {
2407 self.index.orders_pending_cancel.contains(client_order_id)
2408 }
2409
2410 #[must_use]
2412 pub fn orders_open_count(
2413 &self,
2414 venue: Option<&Venue>,
2415 instrument_id: Option<&InstrumentId>,
2416 strategy_id: Option<&StrategyId>,
2417 side: Option<OrderSide>,
2418 ) -> usize {
2419 self.orders_open(venue, instrument_id, strategy_id, side)
2420 .len()
2421 }
2422
2423 #[must_use]
2425 pub fn orders_closed_count(
2426 &self,
2427 venue: Option<&Venue>,
2428 instrument_id: Option<&InstrumentId>,
2429 strategy_id: Option<&StrategyId>,
2430 side: Option<OrderSide>,
2431 ) -> usize {
2432 self.orders_closed(venue, instrument_id, strategy_id, side)
2433 .len()
2434 }
2435
2436 #[must_use]
2438 pub fn orders_emulated_count(
2439 &self,
2440 venue: Option<&Venue>,
2441 instrument_id: Option<&InstrumentId>,
2442 strategy_id: Option<&StrategyId>,
2443 side: Option<OrderSide>,
2444 ) -> usize {
2445 self.orders_emulated(venue, instrument_id, strategy_id, side)
2446 .len()
2447 }
2448
2449 #[must_use]
2451 pub fn orders_inflight_count(
2452 &self,
2453 venue: Option<&Venue>,
2454 instrument_id: Option<&InstrumentId>,
2455 strategy_id: Option<&StrategyId>,
2456 side: Option<OrderSide>,
2457 ) -> usize {
2458 self.orders_inflight(venue, instrument_id, strategy_id, side)
2459 .len()
2460 }
2461
2462 #[must_use]
2464 pub fn orders_total_count(
2465 &self,
2466 venue: Option<&Venue>,
2467 instrument_id: Option<&InstrumentId>,
2468 strategy_id: Option<&StrategyId>,
2469 side: Option<OrderSide>,
2470 ) -> usize {
2471 self.orders(venue, instrument_id, strategy_id, side).len()
2472 }
2473
2474 #[must_use]
2476 pub fn order_list(&self, order_list_id: &OrderListId) -> Option<&OrderList> {
2477 self.order_lists.get(order_list_id)
2478 }
2479
2480 #[must_use]
2482 pub fn order_lists(
2483 &self,
2484 venue: Option<&Venue>,
2485 instrument_id: Option<&InstrumentId>,
2486 strategy_id: Option<&StrategyId>,
2487 ) -> Vec<&OrderList> {
2488 let mut order_lists = self.order_lists.values().collect::<Vec<&OrderList>>();
2489
2490 if let Some(venue) = venue {
2491 order_lists.retain(|ol| &ol.instrument_id.venue == venue);
2492 }
2493
2494 if let Some(instrument_id) = instrument_id {
2495 order_lists.retain(|ol| &ol.instrument_id == instrument_id);
2496 }
2497
2498 if let Some(strategy_id) = strategy_id {
2499 order_lists.retain(|ol| &ol.strategy_id == strategy_id);
2500 }
2501
2502 order_lists
2503 }
2504
2505 #[must_use]
2507 pub fn order_list_exists(&self, order_list_id: &OrderListId) -> bool {
2508 self.order_lists.contains_key(order_list_id)
2509 }
2510
2511 #[must_use]
2516 pub fn orders_for_exec_algorithm(
2517 &self,
2518 exec_algorithm_id: &ExecAlgorithmId,
2519 venue: Option<&Venue>,
2520 instrument_id: Option<&InstrumentId>,
2521 strategy_id: Option<&StrategyId>,
2522 side: Option<OrderSide>,
2523 ) -> Vec<&OrderAny> {
2524 let query = self.build_order_query_filter_set(venue, instrument_id, strategy_id);
2525 let exec_algorithm_order_ids = self.index.exec_algorithm_orders.get(exec_algorithm_id);
2526
2527 if let Some(query) = query {
2528 if let Some(exec_algorithm_order_ids) = exec_algorithm_order_ids {
2529 let _exec_algorithm_order_ids = exec_algorithm_order_ids.intersection(&query);
2530 }
2531 }
2532
2533 if let Some(exec_algorithm_order_ids) = exec_algorithm_order_ids {
2534 self.get_orders_for_ids(exec_algorithm_order_ids, side)
2535 } else {
2536 Vec::new()
2537 }
2538 }
2539
2540 #[must_use]
2542 pub fn orders_for_exec_spawn(&self, exec_spawn_id: &ClientOrderId) -> Vec<&OrderAny> {
2543 self.get_orders_for_ids(
2544 self.index
2545 .exec_spawn_orders
2546 .get(exec_spawn_id)
2547 .unwrap_or(&HashSet::new()),
2548 None,
2549 )
2550 }
2551
2552 #[must_use]
2554 pub fn exec_spawn_total_quantity(
2555 &self,
2556 exec_spawn_id: &ClientOrderId,
2557 active_only: bool,
2558 ) -> Option<Quantity> {
2559 let exec_spawn_orders = self.orders_for_exec_spawn(exec_spawn_id);
2560
2561 let mut total_quantity: Option<Quantity> = None;
2562
2563 for spawn_order in exec_spawn_orders {
2564 if !active_only || !spawn_order.is_closed() {
2565 if let Some(mut total_quantity) = total_quantity {
2566 total_quantity += spawn_order.quantity();
2567 }
2568 } else {
2569 total_quantity = Some(spawn_order.quantity());
2570 }
2571 }
2572
2573 total_quantity
2574 }
2575
2576 #[must_use]
2578 pub fn exec_spawn_total_filled_qty(
2579 &self,
2580 exec_spawn_id: &ClientOrderId,
2581 active_only: bool,
2582 ) -> Option<Quantity> {
2583 let exec_spawn_orders = self.orders_for_exec_spawn(exec_spawn_id);
2584
2585 let mut total_quantity: Option<Quantity> = None;
2586
2587 for spawn_order in exec_spawn_orders {
2588 if !active_only || !spawn_order.is_closed() {
2589 if let Some(mut total_quantity) = total_quantity {
2590 total_quantity += spawn_order.filled_qty();
2591 }
2592 } else {
2593 total_quantity = Some(spawn_order.filled_qty());
2594 }
2595 }
2596
2597 total_quantity
2598 }
2599
2600 #[must_use]
2602 pub fn exec_spawn_total_leaves_qty(
2603 &self,
2604 exec_spawn_id: &ClientOrderId,
2605 active_only: bool,
2606 ) -> Option<Quantity> {
2607 let exec_spawn_orders = self.orders_for_exec_spawn(exec_spawn_id);
2608
2609 let mut total_quantity: Option<Quantity> = None;
2610
2611 for spawn_order in exec_spawn_orders {
2612 if !active_only || !spawn_order.is_closed() {
2613 if let Some(mut total_quantity) = total_quantity {
2614 total_quantity += spawn_order.leaves_qty();
2615 }
2616 } else {
2617 total_quantity = Some(spawn_order.leaves_qty());
2618 }
2619 }
2620
2621 total_quantity
2622 }
2623
2624 #[must_use]
2628 pub fn position(&self, position_id: &PositionId) -> Option<&Position> {
2629 self.positions.get(position_id)
2630 }
2631
2632 #[must_use]
2634 pub fn position_for_order(&self, client_order_id: &ClientOrderId) -> Option<&Position> {
2635 self.index
2636 .order_position
2637 .get(client_order_id)
2638 .and_then(|position_id| self.positions.get(position_id))
2639 }
2640
2641 #[must_use]
2643 pub fn position_id(&self, client_order_id: &ClientOrderId) -> Option<&PositionId> {
2644 self.index.order_position.get(client_order_id)
2645 }
2646
2647 #[must_use]
2649 pub fn positions(
2650 &self,
2651 venue: Option<&Venue>,
2652 instrument_id: Option<&InstrumentId>,
2653 strategy_id: Option<&StrategyId>,
2654 side: Option<PositionSide>,
2655 ) -> Vec<&Position> {
2656 let position_ids = self.position_ids(venue, instrument_id, strategy_id);
2657 self.get_positions_for_ids(&position_ids, side)
2658 }
2659
2660 #[must_use]
2662 pub fn positions_open(
2663 &self,
2664 venue: Option<&Venue>,
2665 instrument_id: Option<&InstrumentId>,
2666 strategy_id: Option<&StrategyId>,
2667 side: Option<PositionSide>,
2668 ) -> Vec<&Position> {
2669 let position_ids = self.position_open_ids(venue, instrument_id, strategy_id);
2670 self.get_positions_for_ids(&position_ids, side)
2671 }
2672
2673 #[must_use]
2675 pub fn positions_closed(
2676 &self,
2677 venue: Option<&Venue>,
2678 instrument_id: Option<&InstrumentId>,
2679 strategy_id: Option<&StrategyId>,
2680 side: Option<PositionSide>,
2681 ) -> Vec<&Position> {
2682 let position_ids = self.position_closed_ids(venue, instrument_id, strategy_id);
2683 self.get_positions_for_ids(&position_ids, side)
2684 }
2685
2686 #[must_use]
2688 pub fn position_exists(&self, position_id: &PositionId) -> bool {
2689 self.index.positions.contains(position_id)
2690 }
2691
2692 #[must_use]
2694 pub fn is_position_open(&self, position_id: &PositionId) -> bool {
2695 self.index.positions_open.contains(position_id)
2696 }
2697
2698 #[must_use]
2700 pub fn is_position_closed(&self, position_id: &PositionId) -> bool {
2701 self.index.positions_closed.contains(position_id)
2702 }
2703
2704 #[must_use]
2706 pub fn positions_open_count(
2707 &self,
2708 venue: Option<&Venue>,
2709 instrument_id: Option<&InstrumentId>,
2710 strategy_id: Option<&StrategyId>,
2711 side: Option<PositionSide>,
2712 ) -> usize {
2713 self.positions_open(venue, instrument_id, strategy_id, side)
2714 .len()
2715 }
2716
2717 #[must_use]
2719 pub fn positions_closed_count(
2720 &self,
2721 venue: Option<&Venue>,
2722 instrument_id: Option<&InstrumentId>,
2723 strategy_id: Option<&StrategyId>,
2724 side: Option<PositionSide>,
2725 ) -> usize {
2726 self.positions_closed(venue, instrument_id, strategy_id, side)
2727 .len()
2728 }
2729
2730 #[must_use]
2732 pub fn positions_total_count(
2733 &self,
2734 venue: Option<&Venue>,
2735 instrument_id: Option<&InstrumentId>,
2736 strategy_id: Option<&StrategyId>,
2737 side: Option<PositionSide>,
2738 ) -> usize {
2739 self.positions(venue, instrument_id, strategy_id, side)
2740 .len()
2741 }
2742
2743 #[must_use]
2747 pub fn strategy_id_for_order(&self, client_order_id: &ClientOrderId) -> Option<&StrategyId> {
2748 self.index.order_strategy.get(client_order_id)
2749 }
2750
2751 #[must_use]
2753 pub fn strategy_id_for_position(&self, position_id: &PositionId) -> Option<&StrategyId> {
2754 self.index.position_strategy.get(position_id)
2755 }
2756
2757 pub fn get(&self, key: &str) -> anyhow::Result<Option<&Bytes>> {
2765 check_valid_string(key, stringify!(key))?;
2766
2767 Ok(self.general.get(key))
2768 }
2769
2770 #[must_use]
2774 pub fn price(&self, instrument_id: &InstrumentId, price_type: PriceType) -> Option<Price> {
2775 match price_type {
2776 PriceType::Bid => self
2777 .quotes
2778 .get(instrument_id)
2779 .and_then(|quotes| quotes.front().map(|quote| quote.bid_price)),
2780 PriceType::Ask => self
2781 .quotes
2782 .get(instrument_id)
2783 .and_then(|quotes| quotes.front().map(|quote| quote.ask_price)),
2784 PriceType::Mid => self.quotes.get(instrument_id).and_then(|quotes| {
2785 quotes.front().map(|quote| {
2786 Price::new(
2787 f64::midpoint(quote.ask_price.as_f64(), quote.bid_price.as_f64()),
2788 quote.bid_price.precision + 1,
2789 )
2790 })
2791 }),
2792 PriceType::Last => self
2793 .trades
2794 .get(instrument_id)
2795 .and_then(|trades| trades.front().map(|trade| trade.price)),
2796 PriceType::Mark => self
2797 .mark_prices
2798 .get(instrument_id)
2799 .and_then(|marks| marks.front().map(|mark| mark.value)),
2800 }
2801 }
2802
2803 #[must_use]
2805 pub fn quotes(&self, instrument_id: &InstrumentId) -> Option<Vec<QuoteTick>> {
2806 self.quotes
2807 .get(instrument_id)
2808 .map(|quotes| quotes.iter().copied().collect())
2809 }
2810
2811 #[must_use]
2813 pub fn trades(&self, instrument_id: &InstrumentId) -> Option<Vec<TradeTick>> {
2814 self.trades
2815 .get(instrument_id)
2816 .map(|trades| trades.iter().copied().collect())
2817 }
2818
2819 #[must_use]
2821 pub fn mark_prices(&self, instrument_id: &InstrumentId) -> Option<Vec<MarkPriceUpdate>> {
2822 self.mark_prices
2823 .get(instrument_id)
2824 .map(|mark_prices| mark_prices.iter().copied().collect())
2825 }
2826
2827 #[must_use]
2829 pub fn index_prices(&self, instrument_id: &InstrumentId) -> Option<Vec<IndexPriceUpdate>> {
2830 self.index_prices
2831 .get(instrument_id)
2832 .map(|index_prices| index_prices.iter().copied().collect())
2833 }
2834
2835 #[must_use]
2837 pub fn bars(&self, bar_type: &BarType) -> Option<Vec<Bar>> {
2838 self.bars
2839 .get(bar_type)
2840 .map(|bars| bars.iter().copied().collect())
2841 }
2842
2843 #[must_use]
2845 pub fn order_book(&self, instrument_id: &InstrumentId) -> Option<&OrderBook> {
2846 self.books.get(instrument_id)
2847 }
2848
    /// Returns a mutable reference to the cached order book for the given `instrument_id`,
    /// if any.
    #[must_use]
    pub fn order_book_mut(&mut self, instrument_id: &InstrumentId) -> Option<&mut OrderBook> {
        self.books.get_mut(instrument_id)
    }
2854
2855 #[must_use]
2857 pub fn own_order_book(&self, instrument_id: &InstrumentId) -> Option<&OwnOrderBook> {
2858 self.own_books.get(instrument_id)
2859 }
2860
    /// Returns a mutable reference to the cached own order book for the given
    /// `instrument_id`, if any.
    #[must_use]
    pub fn own_order_book_mut(
        &mut self,
        instrument_id: &InstrumentId,
    ) -> Option<&mut OwnOrderBook> {
        self.own_books.get_mut(instrument_id)
    }
2869
2870 #[must_use]
2872 pub fn quote(&self, instrument_id: &InstrumentId) -> Option<&QuoteTick> {
2873 self.quotes
2874 .get(instrument_id)
2875 .and_then(|quotes| quotes.front())
2876 }
2877
2878 #[must_use]
2880 pub fn trade(&self, instrument_id: &InstrumentId) -> Option<&TradeTick> {
2881 self.trades
2882 .get(instrument_id)
2883 .and_then(|trades| trades.front())
2884 }
2885
2886 #[must_use]
2888 pub fn mark_price(&self, instrument_id: &InstrumentId) -> Option<&MarkPriceUpdate> {
2889 self.mark_prices
2890 .get(instrument_id)
2891 .and_then(|mark_prices| mark_prices.front())
2892 }
2893
2894 #[must_use]
2896 pub fn index_price(&self, instrument_id: &InstrumentId) -> Option<&IndexPriceUpdate> {
2897 self.index_prices
2898 .get(instrument_id)
2899 .and_then(|index_prices| index_prices.front())
2900 }
2901
2902 #[must_use]
2904 pub fn bar(&self, bar_type: &BarType) -> Option<&Bar> {
2905 self.bars.get(bar_type).and_then(|bars| bars.front())
2906 }
2907
2908 #[must_use]
2910 pub fn book_update_count(&self, instrument_id: &InstrumentId) -> usize {
2911 self.books
2912 .get(instrument_id)
2913 .map_or(0, |book| book.update_count) as usize
2914 }
2915
2916 #[must_use]
2918 pub fn quote_count(&self, instrument_id: &InstrumentId) -> usize {
2919 self.quotes
2920 .get(instrument_id)
2921 .map_or(0, std::collections::VecDeque::len)
2922 }
2923
2924 #[must_use]
2926 pub fn trade_count(&self, instrument_id: &InstrumentId) -> usize {
2927 self.trades
2928 .get(instrument_id)
2929 .map_or(0, std::collections::VecDeque::len)
2930 }
2931
2932 #[must_use]
2934 pub fn bar_count(&self, bar_type: &BarType) -> usize {
2935 self.bars
2936 .get(bar_type)
2937 .map_or(0, std::collections::VecDeque::len)
2938 }
2939
2940 #[must_use]
2942 pub fn has_order_book(&self, instrument_id: &InstrumentId) -> bool {
2943 self.books.contains_key(instrument_id)
2944 }
2945
2946 #[must_use]
2948 pub fn has_quote_ticks(&self, instrument_id: &InstrumentId) -> bool {
2949 self.quote_count(instrument_id) > 0
2950 }
2951
2952 #[must_use]
2954 pub fn has_trade_ticks(&self, instrument_id: &InstrumentId) -> bool {
2955 self.trade_count(instrument_id) > 0
2956 }
2957
2958 #[must_use]
2960 pub fn has_bars(&self, bar_type: &BarType) -> bool {
2961 self.bar_count(bar_type) > 0
2962 }
2963
2964 #[must_use]
2965 pub fn get_xrate(
2966 &self,
2967 venue: Venue,
2968 from_currency: Currency,
2969 to_currency: Currency,
2970 price_type: PriceType,
2971 ) -> Option<f64> {
2972 if from_currency == to_currency {
2973 return Some(1.0);
2976 }
2977
2978 let (bid_quote, ask_quote) = self.build_quote_table(&venue);
2979
2980 match get_exchange_rate(
2981 from_currency.code,
2982 to_currency.code,
2983 price_type,
2984 bid_quote,
2985 ask_quote,
2986 ) {
2987 Ok(rate) => rate,
2988 Err(e) => {
2989 log::error!("Failed to calculate xrate: {e}");
2990 None
2991 }
2992 }
2993 }
2994
2995 fn build_quote_table(&self, venue: &Venue) -> (HashMap<String, f64>, HashMap<String, f64>) {
2996 let mut bid_quotes = HashMap::new();
2997 let mut ask_quotes = HashMap::new();
2998
2999 for instrument_id in self.instruments.keys() {
3000 if instrument_id.venue != *venue {
3001 continue;
3002 }
3003
3004 let (bid_price, ask_price) = if let Some(ticks) = self.quotes.get(instrument_id) {
3005 if let Some(tick) = ticks.front() {
3006 (tick.bid_price, tick.ask_price)
3007 } else {
3008 continue; }
3010 } else {
3011 let bid_bar = self
3012 .bars
3013 .iter()
3014 .find(|(k, _)| {
3015 k.instrument_id() == *instrument_id
3016 && matches!(k.spec().price_type, PriceType::Bid)
3017 })
3018 .map(|(_, v)| v);
3019
3020 let ask_bar = self
3021 .bars
3022 .iter()
3023 .find(|(k, _)| {
3024 k.instrument_id() == *instrument_id
3025 && matches!(k.spec().price_type, PriceType::Ask)
3026 })
3027 .map(|(_, v)| v);
3028
3029 match (bid_bar, ask_bar) {
3030 (Some(bid), Some(ask)) => {
3031 let bid_price = bid.front().unwrap().close;
3032 let ask_price = ask.front().unwrap().close;
3033
3034 (bid_price, ask_price)
3035 }
3036 _ => continue,
3037 }
3038 };
3039
3040 bid_quotes.insert(instrument_id.symbol.to_string(), bid_price.as_f64());
3041 ask_quotes.insert(instrument_id.symbol.to_string(), ask_price.as_f64());
3042 }
3043
3044 (bid_quotes, ask_quotes)
3045 }
3046
3047 #[must_use]
3049 pub fn get_mark_xrate(&self, from_currency: Currency, to_currency: Currency) -> Option<f64> {
3050 self.mark_xrates.get(&(from_currency, to_currency)).copied()
3051 }
3052
3053 pub fn set_mark_xrate(&mut self, from_currency: Currency, to_currency: Currency, xrate: f64) {
3059 assert!(xrate > 0.0, "xrate was zero");
3060 self.mark_xrates.insert((from_currency, to_currency), xrate);
3061 self.mark_xrates
3062 .insert((to_currency, from_currency), 1.0 / xrate);
3063 }
3064
3065 pub fn clear_mark_xrate(&mut self, from_currency: Currency, to_currency: Currency) {
3067 let _ = self.mark_xrates.remove(&(from_currency, to_currency));
3068 }
3069
    /// Removes every stored mark exchange rate.
    pub fn clear_mark_xrates(&mut self) {
        self.mark_xrates.clear();
    }
3074
3075 #[must_use]
3079 pub fn instrument(&self, instrument_id: &InstrumentId) -> Option<&InstrumentAny> {
3080 self.instruments.get(instrument_id)
3081 }
3082
3083 #[must_use]
3085 pub fn instrument_ids(&self, venue: Option<&Venue>) -> Vec<&InstrumentId> {
3086 match venue {
3087 Some(v) => self.instruments.keys().filter(|i| &i.venue == v).collect(),
3088 None => self.instruments.keys().collect(),
3089 }
3090 }
3091
3092 #[must_use]
3094 pub fn instruments(&self, venue: &Venue, underlying: Option<&Ustr>) -> Vec<&InstrumentAny> {
3095 self.instruments
3096 .values()
3097 .filter(|i| &i.id().venue == venue)
3098 .filter(|i| underlying.is_none_or(|u| i.underlying() == Some(*u)))
3099 .collect()
3100 }
3101
3102 #[must_use]
3104 pub fn bar_types(
3105 &self,
3106 instrument_id: Option<&InstrumentId>,
3107 price_type: Option<&PriceType>,
3108 aggregation_source: AggregationSource,
3109 ) -> Vec<&BarType> {
3110 let mut bar_types = self
3111 .bars
3112 .keys()
3113 .filter(|bar_type| bar_type.aggregation_source() == aggregation_source)
3114 .collect::<Vec<&BarType>>();
3115
3116 if let Some(instrument_id) = instrument_id {
3117 bar_types.retain(|bar_type| bar_type.instrument_id() == *instrument_id);
3118 }
3119
3120 if let Some(price_type) = price_type {
3121 bar_types.retain(|bar_type| &bar_type.spec().price_type == price_type);
3122 }
3123
3124 bar_types
3125 }
3126
3127 #[must_use]
3131 pub fn synthetic(&self, instrument_id: &InstrumentId) -> Option<&SyntheticInstrument> {
3132 self.synthetics.get(instrument_id)
3133 }
3134
3135 #[must_use]
3137 pub fn synthetic_ids(&self) -> Vec<&InstrumentId> {
3138 self.synthetics.keys().collect()
3139 }
3140
3141 #[must_use]
3143 pub fn synthetics(&self) -> Vec<&SyntheticInstrument> {
3144 self.synthetics.values().collect()
3145 }
3146
3147 #[must_use]
3151 pub fn account(&self, account_id: &AccountId) -> Option<&AccountAny> {
3152 self.accounts.get(account_id)
3153 }
3154
3155 #[must_use]
3157 pub fn account_for_venue(&self, venue: &Venue) -> Option<&AccountAny> {
3158 self.index
3159 .venue_account
3160 .get(venue)
3161 .and_then(|account_id| self.accounts.get(account_id))
3162 }
3163
3164 #[must_use]
3166 pub fn account_id(&self, venue: &Venue) -> Option<&AccountId> {
3167 self.index.venue_account.get(venue)
3168 }
3169
3170 #[must_use]
3172 pub fn accounts(&self, account_id: &AccountId) -> Vec<&AccountAny> {
3173 self.accounts
3174 .values()
3175 .filter(|account| &account.id() == account_id)
3176 .collect()
3177 }
3178}