nautilus_common/cache/
mod.rs

1// -------------------------------------------------------------------------------------------------
2//  Copyright (C) 2015-2025 Posei Systems Pty Ltd. All rights reserved.
3//  https://poseitrader.io
4//
5//  Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
6//  You may not use this file except in compliance with the License.
7//  You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
8//
9//  Unless required by applicable law or agreed to in writing, software
10//  distributed under the License is distributed on an "AS IS" BASIS,
11//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12//  See the License for the specific language governing permissions and
13//  limitations under the License.
14// -------------------------------------------------------------------------------------------------
15
16//! In-memory cache for market and execution data, with optional persistent backing.
17//!
18//! Provides methods to load, query, and update cached data such as instruments, orders, and prices.
19
20pub mod config;
21pub mod database;
22
23mod index;
24
25#[cfg(test)]
26mod tests;
27
28use std::{
29    collections::{HashMap, HashSet, VecDeque},
30    fmt::Debug,
31    time::{SystemTime, UNIX_EPOCH},
32};
33
34use bytes::Bytes;
35pub use config::CacheConfig; // Re-export
36use database::{CacheDatabaseAdapter, CacheMap};
37use index::CacheIndex;
38use nautilus_core::{
39    UUID4, UnixNanos,
40    correctness::{
41        check_key_not_in_map, check_predicate_false, check_slice_not_empty, check_valid_string,
42    },
43    datetime::secs_to_nanos,
44};
45use nautilus_model::{
46    accounts::{Account, AccountAny},
47    data::{
48        Bar, BarType, GreeksData, QuoteTick, TradeTick, YieldCurveData,
49        prices::{IndexPriceUpdate, MarkPriceUpdate},
50    },
51    enums::{AggregationSource, OmsType, OrderSide, PositionSide, PriceType, TriggerType},
52    identifiers::{
53        AccountId, ClientId, ClientOrderId, ComponentId, ExecAlgorithmId, InstrumentId,
54        OrderListId, PositionId, StrategyId, Venue, VenueOrderId,
55    },
56    instruments::{Instrument, InstrumentAny, SyntheticInstrument},
57    orderbook::{OrderBook, own::OwnOrderBook},
58    orders::{Order, OrderAny, OrderList},
59    position::Position,
60    types::{Currency, Money, Price, Quantity},
61};
62use ustr::Ustr;
63
64use crate::xrate::get_exchange_rate;
65
66/// A common in-memory `Cache` for market and execution related data.
67pub struct Cache {
68    config: CacheConfig,
69    index: CacheIndex,
70    database: Option<Box<dyn CacheDatabaseAdapter>>,
71    general: HashMap<String, Bytes>,
72    currencies: HashMap<Ustr, Currency>,
73    instruments: HashMap<InstrumentId, InstrumentAny>,
74    synthetics: HashMap<InstrumentId, SyntheticInstrument>,
75    books: HashMap<InstrumentId, OrderBook>,
76    own_books: HashMap<InstrumentId, OwnOrderBook>,
77    quotes: HashMap<InstrumentId, VecDeque<QuoteTick>>,
78    trades: HashMap<InstrumentId, VecDeque<TradeTick>>,
79    mark_xrates: HashMap<(Currency, Currency), f64>,
80    mark_prices: HashMap<InstrumentId, VecDeque<MarkPriceUpdate>>,
81    index_prices: HashMap<InstrumentId, VecDeque<IndexPriceUpdate>>,
82    bars: HashMap<BarType, VecDeque<Bar>>,
83    greeks: HashMap<InstrumentId, GreeksData>,
84    yield_curves: HashMap<String, YieldCurveData>,
85    accounts: HashMap<AccountId, AccountAny>,
86    orders: HashMap<ClientOrderId, OrderAny>,
87    order_lists: HashMap<OrderListId, OrderList>,
88    positions: HashMap<PositionId, Position>,
89    position_snapshots: HashMap<PositionId, Bytes>,
90}
91
92impl Debug for Cache {
93    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
94        f.debug_struct(stringify!(Cache))
95            .field("config", &self.config)
96            .field("index", &self.index)
97            .field("general", &self.general)
98            .field("currencies", &self.currencies)
99            .field("instruments", &self.instruments)
100            .field("synthetics", &self.synthetics)
101            .field("books", &self.books)
102            .field("own_books", &self.own_books)
103            .field("quotes", &self.quotes)
104            .field("trades", &self.trades)
105            .field("mark_xrates", &self.mark_xrates)
106            .field("mark_prices", &self.mark_prices)
107            .field("index_prices", &self.index_prices)
108            .field("bars", &self.bars)
109            .field("greeks", &self.greeks)
110            .field("yield_curves", &self.yield_curves)
111            .field("accounts", &self.accounts)
112            .field("orders", &self.orders)
113            .field("order_lists", &self.order_lists)
114            .field("positions", &self.positions)
115            .field("position_snapshots", &self.position_snapshots)
116            .finish()
117    }
118}
119
120impl Default for Cache {
121    /// Creates a new default [`Cache`] instance.
122    fn default() -> Self {
123        Self::new(Some(CacheConfig::default()), None)
124    }
125}
126
127impl Cache {
128    /// Creates a new [`Cache`] instance with optional configuration and database adapter.
129    #[must_use]
130    /// # Note
131    ///
132    /// Uses provided `CacheConfig` or defaults, and optional `CacheDatabaseAdapter` for persistence.
133    pub fn new(
134        config: Option<CacheConfig>,
135        database: Option<Box<dyn CacheDatabaseAdapter>>,
136    ) -> Self {
137        Self {
138            config: config.unwrap_or_default(),
139            index: CacheIndex::default(),
140            database,
141            general: HashMap::new(),
142            currencies: HashMap::new(),
143            instruments: HashMap::new(),
144            synthetics: HashMap::new(),
145            books: HashMap::new(),
146            own_books: HashMap::new(),
147            quotes: HashMap::new(),
148            trades: HashMap::new(),
149            mark_xrates: HashMap::new(),
150            mark_prices: HashMap::new(),
151            index_prices: HashMap::new(),
152            bars: HashMap::new(),
153            greeks: HashMap::new(),
154            yield_curves: HashMap::new(),
155            accounts: HashMap::new(),
156            orders: HashMap::new(),
157            order_lists: HashMap::new(),
158            positions: HashMap::new(),
159            position_snapshots: HashMap::new(),
160        }
161    }
162
163    /// Returns the cache instances memory address.
164    #[must_use]
165    pub fn memory_address(&self) -> String {
166        format!("{:?}", std::ptr::from_ref(self))
167    }
168
169    // -- COMMANDS --------------------------------------------------------------------------------
170
171    /// Clears and reloads general entries from the database into the cache.
172    ///
173    /// # Errors
174    ///
175    /// Returns an error if loading general cache data fails.
176    pub fn cache_general(&mut self) -> anyhow::Result<()> {
177        self.general = match &mut self.database {
178            Some(db) => db.load()?,
179            None => HashMap::new(),
180        };
181
182        log::info!(
183            "Cached {} general object(s) from database",
184            self.general.len()
185        );
186        Ok(())
187    }
188
189    /// Loads all core caches (currencies, instruments, accounts, orders, positions) from the database.
190    ///
191    /// # Errors
192    ///
193    /// Returns an error if loading all cache data fails.
194    pub async fn cache_all(&mut self) -> anyhow::Result<()> {
195        let cache_map = match &self.database {
196            Some(db) => db.load_all().await?,
197            None => CacheMap::default(),
198        };
199
200        self.currencies = cache_map.currencies;
201        self.instruments = cache_map.instruments;
202        self.synthetics = cache_map.synthetics;
203        self.accounts = cache_map.accounts;
204        self.orders = cache_map.orders;
205        self.positions = cache_map.positions;
206        Ok(())
207    }
208
209    /// Clears and reloads the currency cache from the database.
210    ///
211    /// # Errors
212    ///
213    /// Returns an error if loading currencies cache fails.
214    pub async fn cache_currencies(&mut self) -> anyhow::Result<()> {
215        self.currencies = match &mut self.database {
216            Some(db) => db.load_currencies().await?,
217            None => HashMap::new(),
218        };
219
220        log::info!("Cached {} currencies from database", self.general.len());
221        Ok(())
222    }
223
224    /// Clears and reloads the instrument cache from the database.
225    ///
226    /// # Errors
227    ///
228    /// Returns an error if loading instruments cache fails.
229    pub async fn cache_instruments(&mut self) -> anyhow::Result<()> {
230        self.instruments = match &mut self.database {
231            Some(db) => db.load_instruments().await?,
232            None => HashMap::new(),
233        };
234
235        log::info!("Cached {} instruments from database", self.general.len());
236        Ok(())
237    }
238
239    /// Clears and reloads the synthetic instrument cache from the database.
240    ///
241    /// # Errors
242    ///
243    /// Returns an error if loading synthetic instruments cache fails.
244    pub async fn cache_synthetics(&mut self) -> anyhow::Result<()> {
245        self.synthetics = match &mut self.database {
246            Some(db) => db.load_synthetics().await?,
247            None => HashMap::new(),
248        };
249
250        log::info!(
251            "Cached {} synthetic instruments from database",
252            self.general.len()
253        );
254        Ok(())
255    }
256
257    /// Clears and reloads the account cache from the database.
258    ///
259    /// # Errors
260    ///
261    /// Returns an error if loading accounts cache fails.
262    pub async fn cache_accounts(&mut self) -> anyhow::Result<()> {
263        self.accounts = match &mut self.database {
264            Some(db) => db.load_accounts().await?,
265            None => HashMap::new(),
266        };
267
268        log::info!(
269            "Cached {} synthetic instruments from database",
270            self.general.len()
271        );
272        Ok(())
273    }
274
275    /// Clears and reloads the order cache from the database.
276    ///
277    /// # Errors
278    ///
279    /// Returns an error if loading orders cache fails.
280    pub async fn cache_orders(&mut self) -> anyhow::Result<()> {
281        self.orders = match &mut self.database {
282            Some(db) => db.load_orders().await?,
283            None => HashMap::new(),
284        };
285
286        log::info!("Cached {} orders from database", self.general.len());
287        Ok(())
288    }
289
290    /// Clears and reloads the position cache from the database.
291    ///
292    /// # Errors
293    ///
294    /// Returns an error if loading positions cache fails.
295    pub async fn cache_positions(&mut self) -> anyhow::Result<()> {
296        self.positions = match &mut self.database {
297            Some(db) => db.load_positions().await?,
298            None => HashMap::new(),
299        };
300
301        log::info!("Cached {} positions from database", self.general.len());
302        Ok(())
303    }
304
305    /// Clears the current cache index and re-build.
306    pub fn build_index(&mut self) {
307        log::debug!("Building index");
308
309        // Index accounts
310        for account_id in self.accounts.keys() {
311            self.index
312                .venue_account
313                .insert(account_id.get_issuer(), *account_id);
314        }
315
316        // Index orders
317        for (client_order_id, order) in &self.orders {
318            let instrument_id = order.instrument_id();
319            let venue = instrument_id.venue;
320            let strategy_id = order.strategy_id();
321
322            // 1: Build index.venue_orders -> {Venue, {ClientOrderId}}
323            self.index
324                .venue_orders
325                .entry(venue)
326                .or_default()
327                .insert(*client_order_id);
328
329            // 2: Build index.order_ids -> {VenueOrderId, ClientOrderId}
330            if let Some(venue_order_id) = order.venue_order_id() {
331                self.index
332                    .venue_order_ids
333                    .insert(venue_order_id, *client_order_id);
334            }
335
336            // 3: Build index.order_position -> {ClientOrderId, PositionId}
337            if let Some(position_id) = order.position_id() {
338                self.index
339                    .order_position
340                    .insert(*client_order_id, position_id);
341            }
342
343            // 4: Build index.order_strategy -> {ClientOrderId, StrategyId}
344            self.index
345                .order_strategy
346                .insert(*client_order_id, order.strategy_id());
347
348            // 5: Build index.instrument_orders -> {InstrumentId, {ClientOrderId}}
349            self.index
350                .instrument_orders
351                .entry(instrument_id)
352                .or_default()
353                .insert(*client_order_id);
354
355            // 6: Build index.strategy_orders -> {StrategyId, {ClientOrderId}}
356            self.index
357                .strategy_orders
358                .entry(strategy_id)
359                .or_default()
360                .insert(*client_order_id);
361
362            // 7: Build index.exec_algorithm_orders -> {ExecAlgorithmId, {ClientOrderId}}
363            if let Some(exec_algorithm_id) = order.exec_algorithm_id() {
364                self.index
365                    .exec_algorithm_orders
366                    .entry(exec_algorithm_id)
367                    .or_default()
368                    .insert(*client_order_id);
369            }
370
371            // 8: Build index.exec_spawn_orders -> {ClientOrderId, {ClientOrderId}}
372            if let Some(exec_spawn_id) = order.exec_spawn_id() {
373                self.index
374                    .exec_spawn_orders
375                    .entry(exec_spawn_id)
376                    .or_default()
377                    .insert(*client_order_id);
378            }
379
380            // 9: Build index.orders -> {ClientOrderId}
381            self.index.orders.insert(*client_order_id);
382
383            // 10: Build index.orders_open -> {ClientOrderId}
384            if order.is_open() {
385                self.index.orders_open.insert(*client_order_id);
386            }
387
388            // 11: Build index.orders_closed -> {ClientOrderId}
389            if order.is_closed() {
390                self.index.orders_closed.insert(*client_order_id);
391            }
392
393            // 12: Build index.orders_emulated -> {ClientOrderId}
394            if let Some(emulation_trigger) = order.emulation_trigger() {
395                if emulation_trigger != TriggerType::NoTrigger && !order.is_closed() {
396                    self.index.orders_emulated.insert(*client_order_id);
397                }
398            }
399
400            // 13: Build index.orders_inflight -> {ClientOrderId}
401            if order.is_inflight() {
402                self.index.orders_inflight.insert(*client_order_id);
403            }
404
405            // 14: Build index.strategies -> {StrategyId}
406            self.index.strategies.insert(strategy_id);
407
408            // 15: Build index.strategies -> {ExecAlgorithmId}
409            if let Some(exec_algorithm_id) = order.exec_algorithm_id() {
410                self.index.exec_algorithms.insert(exec_algorithm_id);
411            }
412        }
413
414        // Index positions
415        for (position_id, position) in &self.positions {
416            let instrument_id = position.instrument_id;
417            let venue = instrument_id.venue;
418            let strategy_id = position.strategy_id;
419
420            // 1: Build index.venue_positions -> {Venue, {PositionId}}
421            self.index
422                .venue_positions
423                .entry(venue)
424                .or_default()
425                .insert(*position_id);
426
427            // 2: Build index.position_strategy -> {PositionId, StrategyId}
428            self.index
429                .position_strategy
430                .insert(*position_id, position.strategy_id);
431
432            // 3: Build index.position_orders -> {PositionId, {ClientOrderId}}
433            self.index
434                .position_orders
435                .entry(*position_id)
436                .or_default()
437                .extend(position.client_order_ids().into_iter());
438
439            // 4: Build index.instrument_positions -> {InstrumentId, {PositionId}}
440            self.index
441                .instrument_positions
442                .entry(instrument_id)
443                .or_default()
444                .insert(*position_id);
445
446            // 5: Build index.strategy_positions -> {StrategyId, {PositionId}}
447            self.index
448                .strategy_positions
449                .entry(strategy_id)
450                .or_default()
451                .insert(*position_id);
452
453            // 6: Build index.positions -> {PositionId}
454            self.index.positions.insert(*position_id);
455
456            // 7: Build index.positions_open -> {PositionId}
457            if position.is_open() {
458                self.index.positions_open.insert(*position_id);
459            }
460
461            // 8: Build index.positions_closed -> {PositionId}
462            if position.is_closed() {
463                self.index.positions_closed.insert(*position_id);
464            }
465
466            // 9: Build index.strategies -> {StrategyId}
467            self.index.strategies.insert(strategy_id);
468        }
469    }
470
471    /// Returns whether the cache has a backing database.
472    #[must_use]
473    pub const fn has_backing(&self) -> bool {
474        self.config.database.is_some()
475    }
476
477    // Calculate the unrealized profit and loss (PnL) for a given position.
478    #[must_use]
479    pub fn calculate_unrealized_pnl(&self, position: &Position) -> Option<Money> {
480        let quote = if let Some(quote) = self.quote(&position.instrument_id) {
481            quote
482        } else {
483            log::warn!(
484                "Cannot calculate unrealized PnL for {}, no quotes for {}",
485                position.id,
486                position.instrument_id
487            );
488            return None;
489        };
490
491        let last = match position.side {
492            PositionSide::Flat | PositionSide::NoPositionSide => {
493                return Some(Money::new(0.0, position.settlement_currency));
494            }
495            PositionSide::Long => quote.ask_price,
496            PositionSide::Short => quote.bid_price,
497        };
498
499        Some(position.unrealized_pnl(last))
500    }
501
502    /// Checks integrity of data within the cache.
503    ///
504    /// All data should be loaded from the database prior to this call.
505    /// If an error is found then a log error message will also be produced.
506    ///
507    /// # Panics
508    ///
509    /// Panics if failure calling system clock.
510    #[must_use]
511    pub fn check_integrity(&mut self) -> bool {
512        let mut error_count = 0;
513        let failure = "Integrity failure";
514
515        // Get current timestamp in microseconds
516        let timestamp_us = SystemTime::now()
517            .duration_since(UNIX_EPOCH)
518            .expect("Time went backwards")
519            .as_micros();
520
521        log::info!("Checking data integrity");
522
523        // Check object caches
524        for account_id in self.accounts.keys() {
525            if !self
526                .index
527                .venue_account
528                .contains_key(&account_id.get_issuer())
529            {
530                log::error!(
531                    "{failure} in accounts: {account_id} not found in `self.index.venue_account`",
532                );
533                error_count += 1;
534            }
535        }
536
537        for (client_order_id, order) in &self.orders {
538            if !self.index.order_strategy.contains_key(client_order_id) {
539                log::error!(
540                    "{failure} in orders: {client_order_id} not found in `self.index.order_strategy`"
541                );
542                error_count += 1;
543            }
544            if !self.index.orders.contains(client_order_id) {
545                log::error!(
546                    "{failure} in orders: {client_order_id} not found in `self.index.orders`",
547                );
548                error_count += 1;
549            }
550            if order.is_inflight() && !self.index.orders_inflight.contains(client_order_id) {
551                log::error!(
552                    "{failure} in orders: {client_order_id} not found in `self.index.orders_inflight`",
553                );
554                error_count += 1;
555            }
556            if order.is_open() && !self.index.orders_open.contains(client_order_id) {
557                log::error!(
558                    "{failure} in orders: {client_order_id} not found in `self.index.orders_open`",
559                );
560                error_count += 1;
561            }
562            if order.is_closed() && !self.index.orders_closed.contains(client_order_id) {
563                log::error!(
564                    "{failure} in orders: {client_order_id} not found in `self.index.orders_closed`",
565                );
566                error_count += 1;
567            }
568            if let Some(exec_algorithm_id) = order.exec_algorithm_id() {
569                if !self
570                    .index
571                    .exec_algorithm_orders
572                    .contains_key(&exec_algorithm_id)
573                {
574                    log::error!(
575                        "{failure} in orders: {client_order_id} not found in `self.index.exec_algorithm_orders`",
576                    );
577                    error_count += 1;
578                }
579                if order.exec_spawn_id().is_none()
580                    && !self.index.exec_spawn_orders.contains_key(client_order_id)
581                {
582                    log::error!(
583                        "{failure} in orders: {client_order_id} not found in `self.index.exec_spawn_orders`",
584                    );
585                    error_count += 1;
586                }
587            }
588        }
589
590        for (position_id, position) in &self.positions {
591            if !self.index.position_strategy.contains_key(position_id) {
592                log::error!(
593                    "{failure} in positions: {position_id} not found in `self.index.position_strategy`",
594                );
595                error_count += 1;
596            }
597            if !self.index.position_orders.contains_key(position_id) {
598                log::error!(
599                    "{failure} in positions: {position_id} not found in `self.index.position_orders`",
600                );
601                error_count += 1;
602            }
603            if !self.index.positions.contains(position_id) {
604                log::error!(
605                    "{failure} in positions: {position_id} not found in `self.index.positions`",
606                );
607                error_count += 1;
608            }
609            if position.is_open() && !self.index.positions_open.contains(position_id) {
610                log::error!(
611                    "{failure} in positions: {position_id} not found in `self.index.positions_open`",
612                );
613                error_count += 1;
614            }
615            if position.is_closed() && !self.index.positions_closed.contains(position_id) {
616                log::error!(
617                    "{failure} in positions: {position_id} not found in `self.index.positions_closed`",
618                );
619                error_count += 1;
620            }
621        }
622
623        // Check indexes
624        for account_id in self.index.venue_account.values() {
625            if !self.accounts.contains_key(account_id) {
626                log::error!(
627                    "{failure} in `index.venue_account`: {account_id} not found in `self.accounts`",
628                );
629                error_count += 1;
630            }
631        }
632
633        for client_order_id in self.index.venue_order_ids.values() {
634            if !self.orders.contains_key(client_order_id) {
635                log::error!(
636                    "{failure} in `index.venue_order_ids`: {client_order_id} not found in `self.orders`",
637                );
638                error_count += 1;
639            }
640        }
641
642        for client_order_id in self.index.client_order_ids.keys() {
643            if !self.orders.contains_key(client_order_id) {
644                log::error!(
645                    "{failure} in `index.client_order_ids`: {client_order_id} not found in `self.orders`",
646                );
647                error_count += 1;
648            }
649        }
650
651        for client_order_id in self.index.order_position.keys() {
652            if !self.orders.contains_key(client_order_id) {
653                log::error!(
654                    "{failure} in `index.order_position`: {client_order_id} not found in `self.orders`",
655                );
656                error_count += 1;
657            }
658        }
659
660        // Check indexes
661        for client_order_id in self.index.order_strategy.keys() {
662            if !self.orders.contains_key(client_order_id) {
663                log::error!(
664                    "{failure} in `index.order_strategy`: {client_order_id} not found in `self.orders`",
665                );
666                error_count += 1;
667            }
668        }
669
670        for position_id in self.index.position_strategy.keys() {
671            if !self.positions.contains_key(position_id) {
672                log::error!(
673                    "{failure} in `index.position_strategy`: {position_id} not found in `self.positions`",
674                );
675                error_count += 1;
676            }
677        }
678
679        for position_id in self.index.position_orders.keys() {
680            if !self.positions.contains_key(position_id) {
681                log::error!(
682                    "{failure} in `index.position_orders`: {position_id} not found in `self.positions`",
683                );
684                error_count += 1;
685            }
686        }
687
688        for (instrument_id, client_order_ids) in &self.index.instrument_orders {
689            for client_order_id in client_order_ids {
690                if !self.orders.contains_key(client_order_id) {
691                    log::error!(
692                        "{failure} in `index.instrument_orders`: {instrument_id} not found in `self.orders`",
693                    );
694                    error_count += 1;
695                }
696            }
697        }
698
699        for instrument_id in self.index.instrument_positions.keys() {
700            if !self.index.instrument_orders.contains_key(instrument_id) {
701                log::error!(
702                    "{failure} in `index.instrument_positions`: {instrument_id} not found in `index.instrument_orders`",
703                );
704                error_count += 1;
705            }
706        }
707
708        for client_order_ids in self.index.strategy_orders.values() {
709            for client_order_id in client_order_ids {
710                if !self.orders.contains_key(client_order_id) {
711                    log::error!(
712                        "{failure} in `index.strategy_orders`: {client_order_id} not found in `self.orders`",
713                    );
714                    error_count += 1;
715                }
716            }
717        }
718
719        for position_ids in self.index.strategy_positions.values() {
720            for position_id in position_ids {
721                if !self.positions.contains_key(position_id) {
722                    log::error!(
723                        "{failure} in `index.strategy_positions`: {position_id} not found in `self.positions`",
724                    );
725                    error_count += 1;
726                }
727            }
728        }
729
730        for client_order_id in &self.index.orders {
731            if !self.orders.contains_key(client_order_id) {
732                log::error!(
733                    "{failure} in `index.orders`: {client_order_id} not found in `self.orders`",
734                );
735                error_count += 1;
736            }
737        }
738
739        for client_order_id in &self.index.orders_emulated {
740            if !self.orders.contains_key(client_order_id) {
741                log::error!(
742                    "{failure} in `index.orders_emulated`: {client_order_id} not found in `self.orders`",
743                );
744                error_count += 1;
745            }
746        }
747
748        for client_order_id in &self.index.orders_inflight {
749            if !self.orders.contains_key(client_order_id) {
750                log::error!(
751                    "{failure} in `index.orders_inflight`: {client_order_id} not found in `self.orders`",
752                );
753                error_count += 1;
754            }
755        }
756
757        for client_order_id in &self.index.orders_open {
758            if !self.orders.contains_key(client_order_id) {
759                log::error!(
760                    "{failure} in `index.orders_open`: {client_order_id} not found in `self.orders`",
761                );
762                error_count += 1;
763            }
764        }
765
766        for client_order_id in &self.index.orders_closed {
767            if !self.orders.contains_key(client_order_id) {
768                log::error!(
769                    "{failure} in `index.orders_closed`: {client_order_id} not found in `self.orders`",
770                );
771                error_count += 1;
772            }
773        }
774
775        for position_id in &self.index.positions {
776            if !self.positions.contains_key(position_id) {
777                log::error!(
778                    "{failure} in `index.positions`: {position_id} not found in `self.positions`",
779                );
780                error_count += 1;
781            }
782        }
783
784        for position_id in &self.index.positions_open {
785            if !self.positions.contains_key(position_id) {
786                log::error!(
787                    "{failure} in `index.positions_open`: {position_id} not found in `self.positions`",
788                );
789                error_count += 1;
790            }
791        }
792
793        for position_id in &self.index.positions_closed {
794            if !self.positions.contains_key(position_id) {
795                log::error!(
796                    "{failure} in `index.positions_closed`: {position_id} not found in `self.positions`",
797                );
798                error_count += 1;
799            }
800        }
801
802        for strategy_id in &self.index.strategies {
803            if !self.index.strategy_orders.contains_key(strategy_id) {
804                log::error!(
805                    "{failure} in `index.strategies`: {strategy_id} not found in `index.strategy_orders`",
806                );
807                error_count += 1;
808            }
809        }
810
811        for exec_algorithm_id in &self.index.exec_algorithms {
812            if !self
813                .index
814                .exec_algorithm_orders
815                .contains_key(exec_algorithm_id)
816            {
817                log::error!(
818                    "{failure} in `index.exec_algorithms`: {exec_algorithm_id} not found in `index.exec_algorithm_orders`",
819                );
820                error_count += 1;
821            }
822        }
823
824        let total_us = SystemTime::now()
825            .duration_since(UNIX_EPOCH)
826            .expect("Time went backwards")
827            .as_micros()
828            - timestamp_us;
829
830        if error_count == 0 {
831            log::info!("Integrity check passed in {total_us}μs");
832            true
833        } else {
834            log::error!(
835                "Integrity check failed with {error_count} error{} in {total_us}μs",
836                if error_count == 1 { "" } else { "s" },
837            );
838            false
839        }
840    }
841
842    /// Checks for any residual open state and log warnings if any are found.
843    ///
844    ///'Open state' is considered to be open orders and open positions.
845    #[must_use]
846    pub fn check_residuals(&self) -> bool {
847        log::debug!("Checking residuals");
848
849        let mut residuals = false;
850
851        // Check for any open orders
852        for order in self.orders_open(None, None, None, None) {
853            residuals = true;
854            log::warn!("Residual {order:?}");
855        }
856
857        // Check for any open positions
858        for position in self.positions_open(None, None, None, None) {
859            residuals = true;
860            log::warn!("Residual {position}");
861        }
862
863        residuals
864    }
865
866    /// Purges all closed orders from the cache that are older than the given buffer time.
867    ///
868    ///
869    /// Only orders that have been closed for at least this amount of time will be purged.
870    /// A value of 0 means purge all closed orders regardless of when they were closed.
871    pub fn purge_closed_orders(&mut self, ts_now: UnixNanos, buffer_secs: u64) {
872        log::debug!(
873            "Purging closed orders{}",
874            if buffer_secs > 0 {
875                format!(" with buffer_secs={buffer_secs}")
876            } else {
877                String::new()
878            }
879        );
880
881        let buffer_ns = secs_to_nanos(buffer_secs as f64);
882
883        for client_order_id in self.index.orders_closed.clone() {
884            if let Some(order) = self.orders.get(&client_order_id) {
885                if let Some(ts_closed) = order.ts_closed() {
886                    if ts_closed + buffer_ns <= ts_now {
887                        self.purge_order(client_order_id);
888                    }
889                }
890            }
891        }
892    }
893
894    /// Purges all closed positions from the cache that are older than the given buffer time.
895    pub fn purge_closed_positions(&mut self, ts_now: UnixNanos, buffer_secs: u64) {
896        log::debug!(
897            "Purging closed positions{}",
898            if buffer_secs > 0 {
899                format!(" with buffer_secs={buffer_secs}")
900            } else {
901                String::new()
902            }
903        );
904
905        let buffer_ns = secs_to_nanos(buffer_secs as f64);
906
907        for position_id in self.index.positions_closed.clone() {
908            if let Some(position) = self.positions.get(&position_id) {
909                if let Some(ts_closed) = position.ts_closed {
910                    if ts_closed + buffer_ns <= ts_now {
911                        self.purge_position(position_id);
912                    }
913                }
914            }
915        }
916    }
917
918    /// Purges the order with the given client order ID from the cache (if found).
919    ///
920    /// All `OrderFilled` events for the order will also be purged from any associated position.
921    pub fn purge_order(&mut self, client_order_id: ClientOrderId) {
922        // Purge events from associated position if exists
923        if let Some(position_id) = self.index.order_position.get(&client_order_id) {
924            if let Some(position) = self.positions.get_mut(position_id) {
925                position.purge_events_for_order(client_order_id);
926            }
927        }
928
929        if let Some(order) = self.orders.remove(&client_order_id) {
930            // Remove order from venue index
931            if let Some(venue_orders) = self
932                .index
933                .venue_orders
934                .get_mut(&order.instrument_id().venue)
935            {
936                venue_orders.remove(&client_order_id);
937            }
938
939            // Remove venue order ID index if exists
940            if let Some(venue_order_id) = order.venue_order_id() {
941                self.index.venue_order_ids.remove(&venue_order_id);
942            }
943
944            // Remove from instrument orders index
945            if let Some(instrument_orders) =
946                self.index.instrument_orders.get_mut(&order.instrument_id())
947            {
948                instrument_orders.remove(&client_order_id);
949            }
950
951            // Remove from position orders index if associated with a position
952            if let Some(position_id) = order.position_id() {
953                if let Some(position_orders) = self.index.position_orders.get_mut(&position_id) {
954                    position_orders.remove(&client_order_id);
955                }
956            }
957
958            // Remove from exec algorithm orders index if it has an exec algorithm
959            if let Some(exec_algorithm_id) = order.exec_algorithm_id() {
960                if let Some(exec_algorithm_orders) =
961                    self.index.exec_algorithm_orders.get_mut(&exec_algorithm_id)
962                {
963                    exec_algorithm_orders.remove(&client_order_id);
964                }
965            }
966
967            log::info!("Purged order {client_order_id}");
968        } else {
969            log::warn!("Order {client_order_id} not found when purging");
970        }
971
972        // Remove from all other index collections regardless of whether order was found
973        self.index.order_position.remove(&client_order_id);
974        self.index.order_strategy.remove(&client_order_id);
975        self.index.order_client.remove(&client_order_id);
976        self.index.client_order_ids.remove(&client_order_id);
977        self.index.exec_spawn_orders.remove(&client_order_id);
978        self.index.orders.remove(&client_order_id);
979        self.index.orders_closed.remove(&client_order_id);
980        self.index.orders_emulated.remove(&client_order_id);
981        self.index.orders_inflight.remove(&client_order_id);
982        self.index.orders_pending_cancel.remove(&client_order_id);
983    }
984
985    /// Purges the position with the given position ID from the cache (if found).
986    pub fn purge_position(&mut self, position_id: PositionId) {
987        if let Some(position) = self.positions.remove(&position_id) {
988            // Remove from venue positions index
989            if let Some(venue_positions) = self
990                .index
991                .venue_positions
992                .get_mut(&position.instrument_id.venue)
993            {
994                venue_positions.remove(&position_id);
995            }
996
997            // Remove from instrument positions index
998            if let Some(instrument_positions) = self
999                .index
1000                .instrument_positions
1001                .get_mut(&position.instrument_id)
1002            {
1003                instrument_positions.remove(&position_id);
1004            }
1005
1006            // Remove from strategy positions index
1007            if let Some(strategy_positions) =
1008                self.index.strategy_positions.get_mut(&position.strategy_id)
1009            {
1010                strategy_positions.remove(&position_id);
1011            }
1012
1013            // Remove position ID from orders that reference it
1014            for client_order_id in position.client_order_ids() {
1015                self.index.order_position.remove(&client_order_id);
1016            }
1017
1018            log::info!("Purged position {position_id}");
1019        } else {
1020            log::warn!("Position {position_id} not found when purging");
1021        }
1022
1023        // Remove from all other index collections regardless of whether position was found
1024        self.index.position_strategy.remove(&position_id);
1025        self.index.position_orders.remove(&position_id);
1026        self.index.positions.remove(&position_id);
1027        self.index.positions_open.remove(&position_id);
1028        self.index.positions_closed.remove(&position_id);
1029    }
1030
1031    /// Purges all account state events which are outside the lookback window.
1032    ///
1033    /// Only events which are outside the lookback window will be purged.
1034    /// A value of 0 means purge all account state events.
1035    pub fn purge_account_events(&mut self, ts_now: UnixNanos, lookback_secs: u64) {
1036        log::debug!(
1037            "Purging account events{}",
1038            if lookback_secs > 0 {
1039                format!(" with lookback_secs={lookback_secs}")
1040            } else {
1041                String::new()
1042            }
1043        );
1044
1045        for account in self.accounts.values_mut() {
1046            let event_count = account.event_count();
1047            account.purge_account_events(ts_now, lookback_secs);
1048            let count_diff = event_count - account.event_count();
1049            if count_diff > 0 {
1050                log::info!(
1051                    "Purged {} event(s) from account {}",
1052                    count_diff,
1053                    account.id()
1054                );
1055            }
1056        }
1057    }
1058
1059    /// Clears the caches index.
1060    pub fn clear_index(&mut self) {
1061        self.index.clear();
1062        log::debug!("Cleared index");
1063    }
1064
1065    /// Resets the cache.
1066    ///
1067    /// All stateful fields are reset to their initial value.
1068    pub fn reset(&mut self) {
1069        log::debug!("Resetting cache");
1070
1071        self.general.clear();
1072        self.currencies.clear();
1073        self.instruments.clear();
1074        self.synthetics.clear();
1075        self.books.clear();
1076        self.own_books.clear();
1077        self.quotes.clear();
1078        self.trades.clear();
1079        self.mark_xrates.clear();
1080        self.mark_prices.clear();
1081        self.index_prices.clear();
1082        self.bars.clear();
1083        self.accounts.clear();
1084        self.orders.clear();
1085        self.order_lists.clear();
1086        self.positions.clear();
1087        self.position_snapshots.clear();
1088        self.greeks.clear();
1089        self.yield_curves.clear();
1090
1091        self.clear_index();
1092
1093        log::info!("Reset cache");
1094    }
1095
1096    /// Dispose of the cache which will close any underlying database adapter.
1097    ///
1098    /// # Panics
1099    ///
1100    /// Panics if closing the database connection fails.
1101    pub fn dispose(&mut self) {
1102        if let Some(database) = &mut self.database {
1103            database.close().expect("Failed to close database");
1104        }
1105    }
1106
1107    /// Flushes the caches database which permanently removes all persisted data.
1108    ///
1109    /// # Panics
1110    ///
1111    /// Panics if flushing the database connection fails.
1112    pub fn flush_db(&mut self) {
1113        if let Some(database) = &mut self.database {
1114            database.flush().expect("Failed to flush database");
1115        }
1116    }
1117
1118    /// Adds a raw bytes entry to the cache under the given key.
1119    ///
1120    /// The cache stores only raw bytes; interpretation is the caller's responsibility.
1121    ///
1122    /// # Errors
1123    ///
1124    /// Returns an error if persisting the entry to the backing database fails.
1125    pub fn add(&mut self, key: &str, value: Bytes) -> anyhow::Result<()> {
1126        check_valid_string(key, stringify!(key))?;
1127        check_predicate_false(value.is_empty(), stringify!(value))?;
1128
1129        log::debug!("Adding general {key}");
1130        self.general.insert(key.to_string(), value.clone());
1131
1132        if let Some(database) = &mut self.database {
1133            database.add(key.to_string(), value)?;
1134        }
1135        Ok(())
1136    }
1137
1138    /// Adds an `OrderBook` to the cache.
1139    ///
1140    /// # Errors
1141    ///
1142    /// Returns an error if persisting the order book to the backing database fails.
1143    pub fn add_order_book(&mut self, book: OrderBook) -> anyhow::Result<()> {
1144        log::debug!("Adding `OrderBook` {}", book.instrument_id);
1145
1146        if self.config.save_market_data {
1147            if let Some(database) = &mut self.database {
1148                database.add_order_book(&book)?;
1149            }
1150        }
1151
1152        self.books.insert(book.instrument_id, book);
1153        Ok(())
1154    }
1155
1156    /// Adds an `OwnOrderBook` to the cache.
1157    ///
1158    /// # Errors
1159    ///
1160    /// Returns an error if persisting the own order book fails.
1161    pub fn add_own_order_book(&mut self, own_book: OwnOrderBook) -> anyhow::Result<()> {
1162        log::debug!("Adding `OwnOrderBook` {}", own_book.instrument_id);
1163
1164        self.own_books.insert(own_book.instrument_id, own_book);
1165        Ok(())
1166    }
1167
1168    /// Adds the given `mark_price` update for the given `instrument_id` to the cache.
1169    ///
1170    /// # Errors
1171    ///
1172    /// Returns an error if persisting the mark price to the backing database fails.
1173    pub fn add_mark_price(&mut self, mark_price: MarkPriceUpdate) -> anyhow::Result<()> {
1174        log::debug!("Adding `MarkPriceUpdate` for {}", mark_price.instrument_id);
1175
1176        if self.config.save_market_data {
1177            // TODO: Placeholder and return Result for consistency
1178        }
1179
1180        let mark_prices_deque = self
1181            .mark_prices
1182            .entry(mark_price.instrument_id)
1183            .or_insert_with(|| VecDeque::with_capacity(self.config.tick_capacity));
1184        mark_prices_deque.push_front(mark_price);
1185        Ok(())
1186    }
1187
1188    /// Adds the given `index_price` update for the given `instrument_id` to the cache.
1189    ///
1190    /// # Errors
1191    ///
1192    /// Returns an error if persisting the index price to the backing database fails.
1193    pub fn add_index_price(&mut self, index_price: IndexPriceUpdate) -> anyhow::Result<()> {
1194        log::debug!(
1195            "Adding `IndexPriceUpdate` for {}",
1196            index_price.instrument_id
1197        );
1198
1199        if self.config.save_market_data {
1200            // TODO: Placeholder and return Result for consistency
1201        }
1202
1203        let index_prices_deque = self
1204            .index_prices
1205            .entry(index_price.instrument_id)
1206            .or_insert_with(|| VecDeque::with_capacity(self.config.tick_capacity));
1207        index_prices_deque.push_front(index_price);
1208        Ok(())
1209    }
1210
1211    /// Adds the given `quote` tick to the cache.
1212    ///
1213    /// # Errors
1214    ///
1215    /// Returns an error if persisting the quote tick to the backing database fails.
1216    pub fn add_quote(&mut self, quote: QuoteTick) -> anyhow::Result<()> {
1217        log::debug!("Adding `QuoteTick` {}", quote.instrument_id);
1218
1219        if self.config.save_market_data {
1220            if let Some(database) = &mut self.database {
1221                database.add_quote(&quote)?;
1222            }
1223        }
1224
1225        let quotes_deque = self
1226            .quotes
1227            .entry(quote.instrument_id)
1228            .or_insert_with(|| VecDeque::with_capacity(self.config.tick_capacity));
1229        quotes_deque.push_front(quote);
1230        Ok(())
1231    }
1232
1233    /// Adds the given `quotes` to the cache.
1234    ///
1235    /// # Errors
1236    ///
1237    /// Returns an error if persisting the quote ticks to the backing database fails.
1238    pub fn add_quotes(&mut self, quotes: &[QuoteTick]) -> anyhow::Result<()> {
1239        check_slice_not_empty(quotes, stringify!(quotes))?;
1240
1241        let instrument_id = quotes[0].instrument_id;
1242        log::debug!("Adding `QuoteTick`[{}] {instrument_id}", quotes.len());
1243
1244        if self.config.save_market_data {
1245            if let Some(database) = &mut self.database {
1246                for quote in quotes {
1247                    database.add_quote(quote)?;
1248                }
1249            }
1250        }
1251
1252        let quotes_deque = self
1253            .quotes
1254            .entry(instrument_id)
1255            .or_insert_with(|| VecDeque::with_capacity(self.config.tick_capacity));
1256
1257        for quote in quotes {
1258            quotes_deque.push_front(*quote);
1259        }
1260        Ok(())
1261    }
1262
1263    /// Adds the given `trade` tick to the cache.
1264    ///
1265    /// # Errors
1266    ///
1267    /// Returns an error if persisting the trade tick to the backing database fails.
1268    pub fn add_trade(&mut self, trade: TradeTick) -> anyhow::Result<()> {
1269        log::debug!("Adding `TradeTick` {}", trade.instrument_id);
1270
1271        if self.config.save_market_data {
1272            if let Some(database) = &mut self.database {
1273                database.add_trade(&trade)?;
1274            }
1275        }
1276
1277        let trades_deque = self
1278            .trades
1279            .entry(trade.instrument_id)
1280            .or_insert_with(|| VecDeque::with_capacity(self.config.tick_capacity));
1281        trades_deque.push_front(trade);
1282        Ok(())
1283    }
1284
1285    /// Adds the give `trades` to the cache.
1286    ///
1287    /// # Errors
1288    ///
1289    /// Returns an error if persisting the trade ticks to the backing database fails.
1290    pub fn add_trades(&mut self, trades: &[TradeTick]) -> anyhow::Result<()> {
1291        check_slice_not_empty(trades, stringify!(trades))?;
1292
1293        let instrument_id = trades[0].instrument_id;
1294        log::debug!("Adding `TradeTick`[{}] {instrument_id}", trades.len());
1295
1296        if self.config.save_market_data {
1297            if let Some(database) = &mut self.database {
1298                for trade in trades {
1299                    database.add_trade(trade)?;
1300                }
1301            }
1302        }
1303
1304        let trades_deque = self
1305            .trades
1306            .entry(instrument_id)
1307            .or_insert_with(|| VecDeque::with_capacity(self.config.tick_capacity));
1308
1309        for trade in trades {
1310            trades_deque.push_front(*trade);
1311        }
1312        Ok(())
1313    }
1314
1315    /// Adds the given `bar` to the cache.
1316    ///
1317    /// # Errors
1318    ///
1319    /// Returns an error if persisting the bar to the backing database fails.
1320    pub fn add_bar(&mut self, bar: Bar) -> anyhow::Result<()> {
1321        log::debug!("Adding `Bar` {}", bar.bar_type);
1322
1323        if self.config.save_market_data {
1324            if let Some(database) = &mut self.database {
1325                database.add_bar(&bar)?;
1326            }
1327        }
1328
1329        let bars = self
1330            .bars
1331            .entry(bar.bar_type)
1332            .or_insert_with(|| VecDeque::with_capacity(self.config.bar_capacity));
1333        bars.push_front(bar);
1334        Ok(())
1335    }
1336
1337    /// Adds the given `bars` to the cache.
1338    ///
1339    /// # Errors
1340    ///
1341    /// Returns an error if persisting the bars to the backing database fails.
1342    pub fn add_bars(&mut self, bars: &[Bar]) -> anyhow::Result<()> {
1343        check_slice_not_empty(bars, stringify!(bars))?;
1344
1345        let bar_type = bars[0].bar_type;
1346        log::debug!("Adding `Bar`[{}] {bar_type}", bars.len());
1347
1348        if self.config.save_market_data {
1349            if let Some(database) = &mut self.database {
1350                for bar in bars {
1351                    database.add_bar(bar)?;
1352                }
1353            }
1354        }
1355
1356        let bars_deque = self
1357            .bars
1358            .entry(bar_type)
1359            .or_insert_with(|| VecDeque::with_capacity(self.config.tick_capacity));
1360
1361        for bar in bars {
1362            bars_deque.push_front(*bar);
1363        }
1364        Ok(())
1365    }
1366
1367    /// Adds the given `greeks` data to the cache.
1368    ///
1369    /// # Errors
1370    ///
1371    /// Returns an error if persisting the greeks data to the backing database fails.
1372    pub fn add_greeks(&mut self, greeks: GreeksData) -> anyhow::Result<()> {
1373        log::debug!("Adding `GreeksData` {}", greeks.instrument_id);
1374
1375        if self.config.save_market_data {
1376            if let Some(_database) = &mut self.database {
1377                // TODO: Implement database.add_greeks(&greeks) when database adapter is updated
1378            }
1379        }
1380
1381        self.greeks.insert(greeks.instrument_id, greeks);
1382        Ok(())
1383    }
1384
1385    /// Gets the greeks data for the given instrument ID.
1386    pub fn greeks(&self, instrument_id: &InstrumentId) -> Option<GreeksData> {
1387        self.greeks.get(instrument_id).cloned()
1388    }
1389
1390    /// Adds the given `yield_curve` data to the cache.
1391    ///
1392    /// # Errors
1393    ///
1394    /// Returns an error if persisting the yield curve data to the backing database fails.
1395    pub fn add_yield_curve(&mut self, yield_curve: YieldCurveData) -> anyhow::Result<()> {
1396        log::debug!("Adding `YieldCurveData` {}", yield_curve.curve_name);
1397
1398        if self.config.save_market_data {
1399            if let Some(_database) = &mut self.database {
1400                // TODO: Implement database.add_yield_curve(&yield_curve) when database adapter is updated
1401            }
1402        }
1403
1404        self.yield_curves
1405            .insert(yield_curve.curve_name.clone(), yield_curve);
1406        Ok(())
1407    }
1408
1409    /// Gets the yield curve for the given key.
1410    pub fn yield_curve(&self, key: &str) -> Option<Box<dyn Fn(f64) -> f64>> {
1411        self.yield_curves.get(key).map(|curve| {
1412            let curve_clone = curve.clone();
1413            Box::new(move |expiry_in_years: f64| curve_clone.get_rate(expiry_in_years))
1414                as Box<dyn Fn(f64) -> f64>
1415        })
1416    }
1417
1418    /// Adds the given `currency` to the cache.
1419    ///
1420    /// # Errors
1421    ///
1422    /// Returns an error if persisting the currency to the backing database fails.
1423    pub fn add_currency(&mut self, currency: Currency) -> anyhow::Result<()> {
1424        log::debug!("Adding `Currency` {}", currency.code);
1425
1426        if let Some(database) = &mut self.database {
1427            database.add_currency(&currency)?;
1428        }
1429
1430        self.currencies.insert(currency.code, currency);
1431        Ok(())
1432    }
1433
1434    /// Adds the given `instrument` to the cache.
1435    ///
1436    /// # Errors
1437    ///
1438    /// Returns an error if persisting the instrument to the backing database fails.
1439    pub fn add_instrument(&mut self, instrument: InstrumentAny) -> anyhow::Result<()> {
1440        log::debug!("Adding `Instrument` {}", instrument.id());
1441
1442        if let Some(database) = &mut self.database {
1443            database.add_instrument(&instrument)?;
1444        }
1445
1446        self.instruments.insert(instrument.id(), instrument);
1447        Ok(())
1448    }
1449
1450    /// Adds the given `synthetic` instrument to the cache.
1451    ///
1452    /// # Errors
1453    ///
1454    /// Returns an error if persisting the synthetic instrument to the backing database fails.
1455    pub fn add_synthetic(&mut self, synthetic: SyntheticInstrument) -> anyhow::Result<()> {
1456        log::debug!("Adding `SyntheticInstrument` {}", synthetic.id);
1457
1458        if let Some(database) = &mut self.database {
1459            database.add_synthetic(&synthetic)?;
1460        }
1461
1462        self.synthetics.insert(synthetic.id, synthetic);
1463        Ok(())
1464    }
1465
1466    /// Adds the given `account` to the cache.
1467    ///
1468    /// # Errors
1469    ///
1470    /// Returns an error if persisting the account to the backing database fails.
1471    pub fn add_account(&mut self, account: AccountAny) -> anyhow::Result<()> {
1472        log::debug!("Adding `Account` {}", account.id());
1473
1474        if let Some(database) = &mut self.database {
1475            database.add_account(&account)?;
1476        }
1477
1478        let account_id = account.id();
1479        self.accounts.insert(account_id, account);
1480        self.index
1481            .venue_account
1482            .insert(account_id.get_issuer(), account_id);
1483        Ok(())
1484    }
1485
1486    /// Indexes the given `client_order_id` with the given `venue_order_id`.
1487    ///
1488    /// The `overwrite` parameter determines whether to overwrite any existing cached identifier.
1489    ///
1490    /// # Errors
1491    ///
1492    /// Returns an error if the existing venue order ID conflicts and overwrite is false.
1493    pub fn add_venue_order_id(
1494        &mut self,
1495        client_order_id: &ClientOrderId,
1496        venue_order_id: &VenueOrderId,
1497        overwrite: bool,
1498    ) -> anyhow::Result<()> {
1499        if let Some(existing_venue_order_id) = self.index.client_order_ids.get(client_order_id) {
1500            if !overwrite && existing_venue_order_id != venue_order_id {
1501                anyhow::bail!(
1502                    "Existing {existing_venue_order_id} for {client_order_id}
1503                    did not match the given {venue_order_id}.
1504                    If you are writing a test then try a different `venue_order_id`,
1505                    otherwise this is probably a bug."
1506                );
1507            }
1508        }
1509
1510        self.index
1511            .client_order_ids
1512            .insert(*client_order_id, *venue_order_id);
1513        self.index
1514            .venue_order_ids
1515            .insert(*venue_order_id, *client_order_id);
1516
1517        Ok(())
1518    }
1519
1520    /// Adds the given `order` to the cache indexed with any given identifiers.
1521    ///
1522    /// # Parameters
1523    ///
1524    /// `override_existing`: If the added order should 'override' any existing order and replace
1525    /// it in the cache. This is currently used for emulated orders which are
1526    /// being released and transformed into another type.
1527    ///
1528    /// # Errors
1529    ///
1530    /// Returns an error if not `replace_existing` and the `order.client_order_id` is already contained in the cache.
1531    pub fn add_order(
1532        &mut self,
1533        order: OrderAny,
1534        position_id: Option<PositionId>,
1535        client_id: Option<ClientId>,
1536        replace_existing: bool,
1537    ) -> anyhow::Result<()> {
1538        let instrument_id = order.instrument_id();
1539        let venue = instrument_id.venue;
1540        let client_order_id = order.client_order_id();
1541        let strategy_id = order.strategy_id();
1542        let exec_algorithm_id = order.exec_algorithm_id();
1543        let exec_spawn_id = order.exec_spawn_id();
1544
1545        if !replace_existing {
1546            check_key_not_in_map(
1547                &client_order_id,
1548                &self.orders,
1549                stringify!(client_order_id),
1550                stringify!(orders),
1551            )?;
1552        }
1553
1554        log::debug!("Adding {order:?}");
1555
1556        self.index.orders.insert(client_order_id);
1557        self.index
1558            .order_strategy
1559            .insert(client_order_id, strategy_id);
1560        self.index.strategies.insert(strategy_id);
1561
1562        // Update venue -> orders index
1563        self.index
1564            .venue_orders
1565            .entry(venue)
1566            .or_default()
1567            .insert(client_order_id);
1568
1569        // Update instrument -> orders index
1570        self.index
1571            .instrument_orders
1572            .entry(instrument_id)
1573            .or_default()
1574            .insert(client_order_id);
1575
1576        // Update strategy -> orders index
1577        self.index
1578            .strategy_orders
1579            .entry(strategy_id)
1580            .or_default()
1581            .insert(client_order_id);
1582
1583        // Update exec_algorithm -> orders index
1584        // Update exec_algorithm -> orders index
1585        if let (Some(exec_algorithm_id), Some(exec_spawn_id)) = (exec_algorithm_id, exec_spawn_id) {
1586            self.index.exec_algorithms.insert(exec_algorithm_id);
1587
1588            self.index
1589                .exec_algorithm_orders
1590                .entry(exec_algorithm_id)
1591                .or_default()
1592                .insert(client_order_id);
1593
1594            self.index
1595                .exec_spawn_orders
1596                .entry(exec_spawn_id)
1597                .or_default()
1598                .insert(client_order_id);
1599        }
1600
1601        // Update emulation index
1602        match order.emulation_trigger() {
1603            Some(_) => {
1604                self.index.orders_emulated.remove(&client_order_id);
1605            }
1606            None => {
1607                self.index.orders_emulated.insert(client_order_id);
1608            }
1609        }
1610
1611        // Index position ID if provided
1612        if let Some(position_id) = position_id {
1613            self.add_position_id(
1614                &position_id,
1615                &order.instrument_id().venue,
1616                &client_order_id,
1617                &strategy_id,
1618            )?;
1619        }
1620
1621        // Index client ID if provided
1622        if let Some(client_id) = client_id {
1623            self.index.order_client.insert(client_order_id, client_id);
1624            log::debug!("Indexed {client_id:?}");
1625        }
1626
1627        if let Some(database) = &mut self.database {
1628            database.add_order(&order, client_id)?;
1629            // TODO: Implement
1630            // if self.config.snapshot_orders {
1631            //     database.snapshot_order_state(order)?;
1632            // }
1633        }
1634
1635        self.orders.insert(client_order_id, order);
1636
1637        Ok(())
1638    }
1639
1640    /// Indexes the given `position_id` with the other given IDs.
1641    ///
1642    /// # Errors
1643    ///
1644    /// Returns an error if indexing position ID in the backing database fails.
1645    pub fn add_position_id(
1646        &mut self,
1647        position_id: &PositionId,
1648        venue: &Venue,
1649        client_order_id: &ClientOrderId,
1650        strategy_id: &StrategyId,
1651    ) -> anyhow::Result<()> {
1652        self.index
1653            .order_position
1654            .insert(*client_order_id, *position_id);
1655
1656        // Index: ClientOrderId -> PositionId
1657        if let Some(database) = &mut self.database {
1658            database.index_order_position(*client_order_id, *position_id)?;
1659        }
1660
1661        // Index: PositionId -> StrategyId
1662        self.index
1663            .position_strategy
1664            .insert(*position_id, *strategy_id);
1665
1666        // Index: PositionId -> set[ClientOrderId]
1667        self.index
1668            .position_orders
1669            .entry(*position_id)
1670            .or_default()
1671            .insert(*client_order_id);
1672
1673        // Index: StrategyId -> set[PositionId]
1674        self.index
1675            .strategy_positions
1676            .entry(*strategy_id)
1677            .or_default()
1678            .insert(*position_id);
1679
1680        // Index: Venue -> set[PositionId]
1681        self.index
1682            .venue_positions
1683            .entry(*venue)
1684            .or_default()
1685            .insert(*position_id);
1686
1687        Ok(())
1688    }
1689
1690    /// Adds the given `position` to the cache.
1691    ///
1692    /// # Errors
1693    ///
1694    /// Returns an error if persisting the position to the backing database fails.
1695    pub fn add_position(&mut self, position: Position, _oms_type: OmsType) -> anyhow::Result<()> {
1696        self.positions.insert(position.id, position.clone());
1697        self.index.positions.insert(position.id);
1698        self.index.positions_open.insert(position.id);
1699
1700        log::debug!("Adding {position}");
1701
1702        self.add_position_id(
1703            &position.id,
1704            &position.instrument_id.venue,
1705            &position.opening_order_id,
1706            &position.strategy_id,
1707        )?;
1708
1709        let venue = position.instrument_id.venue;
1710        let venue_positions = self.index.venue_positions.entry(venue).or_default();
1711        venue_positions.insert(position.id);
1712
1713        // Index: InstrumentId -> HashSet
1714        let instrument_id = position.instrument_id;
1715        let instrument_positions = self
1716            .index
1717            .instrument_positions
1718            .entry(instrument_id)
1719            .or_default();
1720        instrument_positions.insert(position.id);
1721
1722        if let Some(database) = &mut self.database {
1723            database.add_position(&position)?;
1724            // TODO: Implement position snapshots
1725            // if self.snapshot_positions {
1726            //     database.snapshot_position_state(
1727            //         position,
1728            //         position.ts_last,
1729            //         self.calculate_unrealized_pnl(&position),
1730            //     )?;
1731            // }
1732        }
1733
1734        Ok(())
1735    }
1736
1737    /// Updates the given `account` in the cache.
1738    ///
1739    /// # Errors
1740    ///
1741    /// Returns an error if updating the account in the database fails.
1742    pub fn update_account(&mut self, account: AccountAny) -> anyhow::Result<()> {
1743        if let Some(database) = &mut self.database {
1744            database.update_account(&account)?;
1745        }
1746        Ok(())
1747    }
1748
1749    /// Updates the given `order` in the cache.
1750    ///
1751    /// # Errors
1752    ///
1753    /// Returns an error if updating the order in the database fails.
1754    pub fn update_order(&mut self, order: &OrderAny) -> anyhow::Result<()> {
1755        let client_order_id = order.client_order_id();
1756
1757        // Update venue order ID
1758        if let Some(venue_order_id) = order.venue_order_id() {
1759            // If the order is being modified then we allow a changing `VenueOrderId` to accommodate
1760            // venues which use a cancel+replace update strategy.
1761            if !self.index.venue_order_ids.contains_key(&venue_order_id) {
1762                // TODO: If the last event was `OrderUpdated` then overwrite should be true
1763                self.add_venue_order_id(&order.client_order_id(), &venue_order_id, false)?;
1764            }
1765        }
1766
1767        // Update in-flight state
1768        if order.is_inflight() {
1769            self.index.orders_inflight.insert(client_order_id);
1770        } else {
1771            self.index.orders_inflight.remove(&client_order_id);
1772        }
1773
1774        // Update open/closed state
1775        if order.is_open() {
1776            self.index.orders_closed.remove(&client_order_id);
1777            self.index.orders_open.insert(client_order_id);
1778        } else if order.is_closed() {
1779            self.index.orders_open.remove(&client_order_id);
1780            self.index.orders_pending_cancel.remove(&client_order_id);
1781            self.index.orders_closed.insert(client_order_id);
1782        }
1783
1784        // Update emulation
1785        if let Some(emulation_trigger) = order.emulation_trigger() {
1786            match emulation_trigger {
1787                TriggerType::NoTrigger => self.index.orders_emulated.remove(&client_order_id),
1788                _ => self.index.orders_emulated.insert(client_order_id),
1789            };
1790        }
1791
1792        if let Some(database) = &mut self.database {
1793            database.update_order(order.last_event())?;
1794            // TODO: Implement order snapshots
1795            // if self.snapshot_orders {
1796            //     database.snapshot_order_state(order)?;
1797            // }
1798        }
1799
1800        // update the order in the cache
1801        self.orders.insert(client_order_id, order.clone());
1802
1803        Ok(())
1804    }
1805
1806    /// Updates the given `order` as pending cancel locally.
1807    pub fn update_order_pending_cancel_local(&mut self, order: &OrderAny) {
1808        self.index
1809            .orders_pending_cancel
1810            .insert(order.client_order_id());
1811    }
1812
1813    /// Updates the given `position` in the cache.
1814    ///
1815    /// # Errors
1816    ///
1817    /// Returns an error if updating the position in the database fails.
1818    pub fn update_position(&mut self, position: &Position) -> anyhow::Result<()> {
1819        // Update open/closed state
1820        if position.is_open() {
1821            self.index.positions_open.insert(position.id);
1822            self.index.positions_closed.remove(&position.id);
1823        } else {
1824            self.index.positions_closed.insert(position.id);
1825            self.index.positions_open.remove(&position.id);
1826        }
1827
1828        if let Some(database) = &mut self.database {
1829            database.update_position(position)?;
1830            // TODO: Implement order snapshots
1831            // if self.snapshot_orders {
1832            //     database.snapshot_order_state(order)?;
1833            // }
1834        }
1835        Ok(())
1836    }
1837
1838    /// Creates a snapshot of the given position by cloning it, assigning a new ID,
1839    /// serializing it, and storing it in the position snapshots.
1840    ///
1841    /// # Errors
1842    ///
1843    /// Returns an error if serializing or storing the position snapshot fails.
1844    pub fn snapshot_position(&mut self, position: &Position) -> anyhow::Result<()> {
1845        let position_id = position.id;
1846
1847        let mut copied_position = position.clone();
1848        let new_id = format!("{}-{}", position_id.as_str(), UUID4::new());
1849        copied_position.id = PositionId::new(new_id);
1850
1851        // Serialize the position (TODO: temporily just to JSON to remove a dependency)
1852        let position_serialized = serde_json::to_vec(&copied_position)?;
1853
1854        let snapshots: Option<&Bytes> = self.position_snapshots.get(&position_id);
1855        let new_snapshots = match snapshots {
1856            Some(existing_snapshots) => {
1857                let mut combined = existing_snapshots.to_vec();
1858                combined.extend(position_serialized);
1859                Bytes::from(combined)
1860            }
1861            None => Bytes::from(position_serialized),
1862        };
1863        self.position_snapshots.insert(position_id, new_snapshots);
1864
1865        log::debug!("Snapshot {}", copied_position);
1866        Ok(())
1867    }
1868
1869    /// Creates a snapshot of the given position state in the database.
1870    ///
1871    /// # Errors
1872    ///
1873    /// Returns an error if snapshotting the position state fails.
1874    pub fn snapshot_position_state(
1875        &mut self,
1876        position: &Position,
1877        // ts_snapshot: u64,
1878        // unrealized_pnl: Option<Money>,
1879        open_only: Option<bool>,
1880    ) -> anyhow::Result<()> {
1881        let open_only = open_only.unwrap_or(true);
1882
1883        if open_only && !position.is_open() {
1884            return Ok(());
1885        }
1886
1887        if let Some(database) = &mut self.database {
1888            database.snapshot_position_state(position).map_err(|e| {
1889                log::error!(
1890                    "Failed to snapshot position state for {}: {e:?}",
1891                    position.id
1892                );
1893                e
1894            })?;
1895        } else {
1896            log::warn!(
1897                "Cannot snapshot position state for {} (no database configured)",
1898                position.id
1899            );
1900        }
1901
1902        // Ok(())
1903        todo!()
1904    }
1905
1906    /// Snapshots the given order state in the database.
1907    ///
1908    /// # Errors
1909    ///
1910    /// Returns an error if snapshotting the order state fails.
1911    pub fn snapshot_order_state(&self, order: &OrderAny) -> anyhow::Result<()> {
1912        let database = if let Some(database) = &self.database {
1913            database
1914        } else {
1915            log::warn!(
1916                "Cannot snapshot order state for {} (no database configured)",
1917                order.client_order_id()
1918            );
1919            return Ok(());
1920        };
1921
1922        database.snapshot_order_state(order)
1923    }
1924
1925    // -- IDENTIFIER QUERIES ----------------------------------------------------------------------
1926
1927    fn build_order_query_filter_set(
1928        &self,
1929        venue: Option<&Venue>,
1930        instrument_id: Option<&InstrumentId>,
1931        strategy_id: Option<&StrategyId>,
1932    ) -> Option<HashSet<ClientOrderId>> {
1933        let mut query: Option<HashSet<ClientOrderId>> = None;
1934
1935        if let Some(venue) = venue {
1936            query = Some(
1937                self.index
1938                    .venue_orders
1939                    .get(venue)
1940                    .cloned()
1941                    .unwrap_or_default(),
1942            );
1943        }
1944
1945        if let Some(instrument_id) = instrument_id {
1946            let instrument_orders = self
1947                .index
1948                .instrument_orders
1949                .get(instrument_id)
1950                .cloned()
1951                .unwrap_or_default();
1952
1953            if let Some(existing_query) = &mut query {
1954                *existing_query = existing_query
1955                    .intersection(&instrument_orders)
1956                    .copied()
1957                    .collect();
1958            } else {
1959                query = Some(instrument_orders);
1960            }
1961        }
1962
1963        if let Some(strategy_id) = strategy_id {
1964            let strategy_orders = self
1965                .index
1966                .strategy_orders
1967                .get(strategy_id)
1968                .cloned()
1969                .unwrap_or_default();
1970
1971            if let Some(existing_query) = &mut query {
1972                *existing_query = existing_query
1973                    .intersection(&strategy_orders)
1974                    .copied()
1975                    .collect();
1976            } else {
1977                query = Some(strategy_orders);
1978            }
1979        }
1980
1981        query
1982    }
1983
1984    fn build_position_query_filter_set(
1985        &self,
1986        venue: Option<&Venue>,
1987        instrument_id: Option<&InstrumentId>,
1988        strategy_id: Option<&StrategyId>,
1989    ) -> Option<HashSet<PositionId>> {
1990        let mut query: Option<HashSet<PositionId>> = None;
1991
1992        if let Some(venue) = venue {
1993            query = Some(
1994                self.index
1995                    .venue_positions
1996                    .get(venue)
1997                    .cloned()
1998                    .unwrap_or_default(),
1999            );
2000        }
2001
2002        if let Some(instrument_id) = instrument_id {
2003            let instrument_positions = self
2004                .index
2005                .instrument_positions
2006                .get(instrument_id)
2007                .cloned()
2008                .unwrap_or_default();
2009
2010            if let Some(existing_query) = query {
2011                query = Some(
2012                    existing_query
2013                        .intersection(&instrument_positions)
2014                        .copied()
2015                        .collect(),
2016                );
2017            } else {
2018                query = Some(instrument_positions);
2019            }
2020        }
2021
2022        if let Some(strategy_id) = strategy_id {
2023            let strategy_positions = self
2024                .index
2025                .strategy_positions
2026                .get(strategy_id)
2027                .cloned()
2028                .unwrap_or_default();
2029
2030            if let Some(existing_query) = query {
2031                query = Some(
2032                    existing_query
2033                        .intersection(&strategy_positions)
2034                        .copied()
2035                        .collect(),
2036                );
2037            } else {
2038                query = Some(strategy_positions);
2039            }
2040        }
2041
2042        query
2043    }
2044
2045    /// Retrieves orders corresponding to the given `client_order_ids`, optionally filtering by side.
2046    ///
2047    /// # Panics
2048    ///
2049    /// Panics if any `client_order_id` in the set is not found in the cache.
2050    fn get_orders_for_ids(
2051        &self,
2052        client_order_ids: &HashSet<ClientOrderId>,
2053        side: Option<OrderSide>,
2054    ) -> Vec<&OrderAny> {
2055        let side = side.unwrap_or(OrderSide::NoOrderSide);
2056        let mut orders = Vec::new();
2057
2058        for client_order_id in client_order_ids {
2059            let order = self
2060                .orders
2061                .get(client_order_id)
2062                .unwrap_or_else(|| panic!("Order {client_order_id} not found"));
2063            if side == OrderSide::NoOrderSide || side == order.order_side() {
2064                orders.push(order);
2065            }
2066        }
2067
2068        orders
2069    }
2070
2071    /// Retrieves positions corresponding to the given `position_ids`, optionally filtering by side.
2072    ///
2073    /// # Panics
2074    ///
2075    /// Panics if any `position_id` in the set is not found in the cache.
2076    fn get_positions_for_ids(
2077        &self,
2078        position_ids: &HashSet<PositionId>,
2079        side: Option<PositionSide>,
2080    ) -> Vec<&Position> {
2081        let side = side.unwrap_or(PositionSide::NoPositionSide);
2082        let mut positions = Vec::new();
2083
2084        for position_id in position_ids {
2085            let position = self
2086                .positions
2087                .get(position_id)
2088                .unwrap_or_else(|| panic!("Position {position_id} not found"));
2089            if side == PositionSide::NoPositionSide || side == position.side {
2090                positions.push(position);
2091            }
2092        }
2093
2094        positions
2095    }
2096
2097    /// Returns the `ClientOrderId`s of all orders.
2098    #[must_use]
2099    pub fn client_order_ids(
2100        &self,
2101        venue: Option<&Venue>,
2102        instrument_id: Option<&InstrumentId>,
2103        strategy_id: Option<&StrategyId>,
2104    ) -> HashSet<ClientOrderId> {
2105        let query = self.build_order_query_filter_set(venue, instrument_id, strategy_id);
2106        match query {
2107            Some(query) => self.index.orders.intersection(&query).copied().collect(),
2108            None => self.index.orders.clone(),
2109        }
2110    }
2111
2112    /// Returns the `ClientOrderId`s of all open orders.
2113    #[must_use]
2114    pub fn client_order_ids_open(
2115        &self,
2116        venue: Option<&Venue>,
2117        instrument_id: Option<&InstrumentId>,
2118        strategy_id: Option<&StrategyId>,
2119    ) -> HashSet<ClientOrderId> {
2120        let query = self.build_order_query_filter_set(venue, instrument_id, strategy_id);
2121        match query {
2122            Some(query) => self
2123                .index
2124                .orders_open
2125                .intersection(&query)
2126                .copied()
2127                .collect(),
2128            None => self.index.orders_open.clone(),
2129        }
2130    }
2131
2132    /// Returns the `ClientOrderId`s of all closed orders.
2133    #[must_use]
2134    pub fn client_order_ids_closed(
2135        &self,
2136        venue: Option<&Venue>,
2137        instrument_id: Option<&InstrumentId>,
2138        strategy_id: Option<&StrategyId>,
2139    ) -> HashSet<ClientOrderId> {
2140        let query = self.build_order_query_filter_set(venue, instrument_id, strategy_id);
2141        match query {
2142            Some(query) => self
2143                .index
2144                .orders_closed
2145                .intersection(&query)
2146                .copied()
2147                .collect(),
2148            None => self.index.orders_closed.clone(),
2149        }
2150    }
2151
2152    /// Returns the `ClientOrderId`s of all emulated orders.
2153    #[must_use]
2154    pub fn client_order_ids_emulated(
2155        &self,
2156        venue: Option<&Venue>,
2157        instrument_id: Option<&InstrumentId>,
2158        strategy_id: Option<&StrategyId>,
2159    ) -> HashSet<ClientOrderId> {
2160        let query = self.build_order_query_filter_set(venue, instrument_id, strategy_id);
2161        match query {
2162            Some(query) => self
2163                .index
2164                .orders_emulated
2165                .intersection(&query)
2166                .copied()
2167                .collect(),
2168            None => self.index.orders_emulated.clone(),
2169        }
2170    }
2171
2172    /// Returns the `ClientOrderId`s of all in-flight orders.
2173    #[must_use]
2174    pub fn client_order_ids_inflight(
2175        &self,
2176        venue: Option<&Venue>,
2177        instrument_id: Option<&InstrumentId>,
2178        strategy_id: Option<&StrategyId>,
2179    ) -> HashSet<ClientOrderId> {
2180        let query = self.build_order_query_filter_set(venue, instrument_id, strategy_id);
2181        match query {
2182            Some(query) => self
2183                .index
2184                .orders_inflight
2185                .intersection(&query)
2186                .copied()
2187                .collect(),
2188            None => self.index.orders_inflight.clone(),
2189        }
2190    }
2191
2192    /// Returns `PositionId`s of all positions.
2193    #[must_use]
2194    pub fn position_ids(
2195        &self,
2196        venue: Option<&Venue>,
2197        instrument_id: Option<&InstrumentId>,
2198        strategy_id: Option<&StrategyId>,
2199    ) -> HashSet<PositionId> {
2200        let query = self.build_position_query_filter_set(venue, instrument_id, strategy_id);
2201        match query {
2202            Some(query) => self.index.positions.intersection(&query).copied().collect(),
2203            None => self.index.positions.clone(),
2204        }
2205    }
2206
2207    /// Returns the `PositionId`s of all open positions.
2208    #[must_use]
2209    pub fn position_open_ids(
2210        &self,
2211        venue: Option<&Venue>,
2212        instrument_id: Option<&InstrumentId>,
2213        strategy_id: Option<&StrategyId>,
2214    ) -> HashSet<PositionId> {
2215        let query = self.build_position_query_filter_set(venue, instrument_id, strategy_id);
2216        match query {
2217            Some(query) => self
2218                .index
2219                .positions_open
2220                .intersection(&query)
2221                .copied()
2222                .collect(),
2223            None => self.index.positions_open.clone(),
2224        }
2225    }
2226
2227    /// Returns the `PositionId`s of all closed positions.
2228    #[must_use]
2229    pub fn position_closed_ids(
2230        &self,
2231        venue: Option<&Venue>,
2232        instrument_id: Option<&InstrumentId>,
2233        strategy_id: Option<&StrategyId>,
2234    ) -> HashSet<PositionId> {
2235        let query = self.build_position_query_filter_set(venue, instrument_id, strategy_id);
2236        match query {
2237            Some(query) => self
2238                .index
2239                .positions_closed
2240                .intersection(&query)
2241                .copied()
2242                .collect(),
2243            None => self.index.positions_closed.clone(),
2244        }
2245    }
2246
2247    /// Returns the `ComponentId`s of all actors.
2248    #[must_use]
2249    pub fn actor_ids(&self) -> HashSet<ComponentId> {
2250        self.index.actors.clone()
2251    }
2252
2253    /// Returns the `StrategyId`s of all strategies.
2254    #[must_use]
2255    pub fn strategy_ids(&self) -> HashSet<StrategyId> {
2256        self.index.strategies.clone()
2257    }
2258
2259    /// Returns the `ExecAlgorithmId`s of all execution algorithms.
2260    #[must_use]
2261    pub fn exec_algorithm_ids(&self) -> HashSet<ExecAlgorithmId> {
2262        self.index.exec_algorithms.clone()
2263    }
2264
2265    // -- ORDER QUERIES ---------------------------------------------------------------------------
2266
2267    /// Gets a reference to the order with the given `client_order_id` (if found).
2268    #[must_use]
2269    pub fn order(&self, client_order_id: &ClientOrderId) -> Option<&OrderAny> {
2270        self.orders.get(client_order_id)
2271    }
2272
2273    /// Gets a reference to the order with the given `client_order_id` (if found).
2274    #[must_use]
2275    pub fn mut_order(&mut self, client_order_id: &ClientOrderId) -> Option<&mut OrderAny> {
2276        self.orders.get_mut(client_order_id)
2277    }
2278
2279    /// Gets a reference to the client order ID for given `venue_order_id` (if found).
2280    #[must_use]
2281    pub fn client_order_id(&self, venue_order_id: &VenueOrderId) -> Option<&ClientOrderId> {
2282        self.index.venue_order_ids.get(venue_order_id)
2283    }
2284
2285    /// Gets a reference to the venue order ID for given `client_order_id` (if found).
2286    #[must_use]
2287    pub fn venue_order_id(&self, client_order_id: &ClientOrderId) -> Option<&VenueOrderId> {
2288        self.index.client_order_ids.get(client_order_id)
2289    }
2290
2291    /// Gets a reference to the client ID indexed for given `client_order_id` (if found).
2292    #[must_use]
2293    pub fn client_id(&self, client_order_id: &ClientOrderId) -> Option<&ClientId> {
2294        self.index.order_client.get(client_order_id)
2295    }
2296
2297    /// Returns references to all orders matching the given optional filter parameters.
2298    #[must_use]
2299    pub fn orders(
2300        &self,
2301        venue: Option<&Venue>,
2302        instrument_id: Option<&InstrumentId>,
2303        strategy_id: Option<&StrategyId>,
2304        side: Option<OrderSide>,
2305    ) -> Vec<&OrderAny> {
2306        let client_order_ids = self.client_order_ids(venue, instrument_id, strategy_id);
2307        self.get_orders_for_ids(&client_order_ids, side)
2308    }
2309
2310    /// Returns references to all open orders matching the given optional filter parameters.
2311    #[must_use]
2312    pub fn orders_open(
2313        &self,
2314        venue: Option<&Venue>,
2315        instrument_id: Option<&InstrumentId>,
2316        strategy_id: Option<&StrategyId>,
2317        side: Option<OrderSide>,
2318    ) -> Vec<&OrderAny> {
2319        let client_order_ids = self.client_order_ids_open(venue, instrument_id, strategy_id);
2320        self.get_orders_for_ids(&client_order_ids, side)
2321    }
2322
2323    /// Returns references to all closed orders matching the given optional filter parameters.
2324    #[must_use]
2325    pub fn orders_closed(
2326        &self,
2327        venue: Option<&Venue>,
2328        instrument_id: Option<&InstrumentId>,
2329        strategy_id: Option<&StrategyId>,
2330        side: Option<OrderSide>,
2331    ) -> Vec<&OrderAny> {
2332        let client_order_ids = self.client_order_ids_closed(venue, instrument_id, strategy_id);
2333        self.get_orders_for_ids(&client_order_ids, side)
2334    }
2335
2336    /// Returns references to all emulated orders matching the given optional filter parameters.
2337    #[must_use]
2338    pub fn orders_emulated(
2339        &self,
2340        venue: Option<&Venue>,
2341        instrument_id: Option<&InstrumentId>,
2342        strategy_id: Option<&StrategyId>,
2343        side: Option<OrderSide>,
2344    ) -> Vec<&OrderAny> {
2345        let client_order_ids = self.client_order_ids_emulated(venue, instrument_id, strategy_id);
2346        self.get_orders_for_ids(&client_order_ids, side)
2347    }
2348
2349    /// Returns references to all in-flight orders matching the given optional filter parameters.
2350    #[must_use]
2351    pub fn orders_inflight(
2352        &self,
2353        venue: Option<&Venue>,
2354        instrument_id: Option<&InstrumentId>,
2355        strategy_id: Option<&StrategyId>,
2356        side: Option<OrderSide>,
2357    ) -> Vec<&OrderAny> {
2358        let client_order_ids = self.client_order_ids_inflight(venue, instrument_id, strategy_id);
2359        self.get_orders_for_ids(&client_order_ids, side)
2360    }
2361
2362    /// Returns references to all orders for the given `position_id`.
2363    #[must_use]
2364    pub fn orders_for_position(&self, position_id: &PositionId) -> Vec<&OrderAny> {
2365        let client_order_ids = self.index.position_orders.get(position_id);
2366        match client_order_ids {
2367            Some(client_order_ids) => {
2368                self.get_orders_for_ids(&client_order_ids.iter().copied().collect(), None)
2369            }
2370            None => Vec::new(),
2371        }
2372    }
2373
2374    /// Returns whether an order with the given `client_order_id` exists.
2375    #[must_use]
2376    pub fn order_exists(&self, client_order_id: &ClientOrderId) -> bool {
2377        self.index.orders.contains(client_order_id)
2378    }
2379
2380    /// Returns whether an order with the given `client_order_id` is open.
2381    #[must_use]
2382    pub fn is_order_open(&self, client_order_id: &ClientOrderId) -> bool {
2383        self.index.orders_open.contains(client_order_id)
2384    }
2385
2386    /// Returns whether an order with the given `client_order_id` is closed.
2387    #[must_use]
2388    pub fn is_order_closed(&self, client_order_id: &ClientOrderId) -> bool {
2389        self.index.orders_closed.contains(client_order_id)
2390    }
2391
2392    /// Returns whether an order with the given `client_order_id` is emulated.
2393    #[must_use]
2394    pub fn is_order_emulated(&self, client_order_id: &ClientOrderId) -> bool {
2395        self.index.orders_emulated.contains(client_order_id)
2396    }
2397
2398    /// Returns whether an order with the given `client_order_id` is in-flight.
2399    #[must_use]
2400    pub fn is_order_inflight(&self, client_order_id: &ClientOrderId) -> bool {
2401        self.index.orders_inflight.contains(client_order_id)
2402    }
2403
2404    /// Returns whether an order with the given `client_order_id` is `PENDING_CANCEL` locally.
2405    #[must_use]
2406    pub fn is_order_pending_cancel_local(&self, client_order_id: &ClientOrderId) -> bool {
2407        self.index.orders_pending_cancel.contains(client_order_id)
2408    }
2409
2410    /// Returns the count of all open orders.
2411    #[must_use]
2412    pub fn orders_open_count(
2413        &self,
2414        venue: Option<&Venue>,
2415        instrument_id: Option<&InstrumentId>,
2416        strategy_id: Option<&StrategyId>,
2417        side: Option<OrderSide>,
2418    ) -> usize {
2419        self.orders_open(venue, instrument_id, strategy_id, side)
2420            .len()
2421    }
2422
2423    /// Returns the count of all closed orders.
2424    #[must_use]
2425    pub fn orders_closed_count(
2426        &self,
2427        venue: Option<&Venue>,
2428        instrument_id: Option<&InstrumentId>,
2429        strategy_id: Option<&StrategyId>,
2430        side: Option<OrderSide>,
2431    ) -> usize {
2432        self.orders_closed(venue, instrument_id, strategy_id, side)
2433            .len()
2434    }
2435
2436    /// Returns the count of all emulated orders.
2437    #[must_use]
2438    pub fn orders_emulated_count(
2439        &self,
2440        venue: Option<&Venue>,
2441        instrument_id: Option<&InstrumentId>,
2442        strategy_id: Option<&StrategyId>,
2443        side: Option<OrderSide>,
2444    ) -> usize {
2445        self.orders_emulated(venue, instrument_id, strategy_id, side)
2446            .len()
2447    }
2448
2449    /// Returns the count of all in-flight orders.
2450    #[must_use]
2451    pub fn orders_inflight_count(
2452        &self,
2453        venue: Option<&Venue>,
2454        instrument_id: Option<&InstrumentId>,
2455        strategy_id: Option<&StrategyId>,
2456        side: Option<OrderSide>,
2457    ) -> usize {
2458        self.orders_inflight(venue, instrument_id, strategy_id, side)
2459            .len()
2460    }
2461
2462    /// Returns the count of all orders.
2463    #[must_use]
2464    pub fn orders_total_count(
2465        &self,
2466        venue: Option<&Venue>,
2467        instrument_id: Option<&InstrumentId>,
2468        strategy_id: Option<&StrategyId>,
2469        side: Option<OrderSide>,
2470    ) -> usize {
2471        self.orders(venue, instrument_id, strategy_id, side).len()
2472    }
2473
2474    /// Returns the order list for the given `order_list_id`.
2475    #[must_use]
2476    pub fn order_list(&self, order_list_id: &OrderListId) -> Option<&OrderList> {
2477        self.order_lists.get(order_list_id)
2478    }
2479
2480    /// Returns all order lists matching the given optional filter parameters.
2481    #[must_use]
2482    pub fn order_lists(
2483        &self,
2484        venue: Option<&Venue>,
2485        instrument_id: Option<&InstrumentId>,
2486        strategy_id: Option<&StrategyId>,
2487    ) -> Vec<&OrderList> {
2488        let mut order_lists = self.order_lists.values().collect::<Vec<&OrderList>>();
2489
2490        if let Some(venue) = venue {
2491            order_lists.retain(|ol| &ol.instrument_id.venue == venue);
2492        }
2493
2494        if let Some(instrument_id) = instrument_id {
2495            order_lists.retain(|ol| &ol.instrument_id == instrument_id);
2496        }
2497
2498        if let Some(strategy_id) = strategy_id {
2499            order_lists.retain(|ol| &ol.strategy_id == strategy_id);
2500        }
2501
2502        order_lists
2503    }
2504
2505    /// Returns whether an order list with the given `order_list_id` exists.
2506    #[must_use]
2507    pub fn order_list_exists(&self, order_list_id: &OrderListId) -> bool {
2508        self.order_lists.contains_key(order_list_id)
2509    }
2510
2511    // -- EXEC ALGORITHM QUERIES ------------------------------------------------------------------
2512
2513    /// Returns references to all orders associated with the given `exec_algorithm_id` matching the given
2514    /// optional filter parameters.
2515    #[must_use]
2516    pub fn orders_for_exec_algorithm(
2517        &self,
2518        exec_algorithm_id: &ExecAlgorithmId,
2519        venue: Option<&Venue>,
2520        instrument_id: Option<&InstrumentId>,
2521        strategy_id: Option<&StrategyId>,
2522        side: Option<OrderSide>,
2523    ) -> Vec<&OrderAny> {
2524        let query = self.build_order_query_filter_set(venue, instrument_id, strategy_id);
2525        let exec_algorithm_order_ids = self.index.exec_algorithm_orders.get(exec_algorithm_id);
2526
2527        if let Some(query) = query {
2528            if let Some(exec_algorithm_order_ids) = exec_algorithm_order_ids {
2529                let _exec_algorithm_order_ids = exec_algorithm_order_ids.intersection(&query);
2530            }
2531        }
2532
2533        if let Some(exec_algorithm_order_ids) = exec_algorithm_order_ids {
2534            self.get_orders_for_ids(exec_algorithm_order_ids, side)
2535        } else {
2536            Vec::new()
2537        }
2538    }
2539
2540    /// Returns references to all orders with the given `exec_spawn_id`.
2541    #[must_use]
2542    pub fn orders_for_exec_spawn(&self, exec_spawn_id: &ClientOrderId) -> Vec<&OrderAny> {
2543        self.get_orders_for_ids(
2544            self.index
2545                .exec_spawn_orders
2546                .get(exec_spawn_id)
2547                .unwrap_or(&HashSet::new()),
2548            None,
2549        )
2550    }
2551
2552    /// Returns the total order quantity for the given `exec_spawn_id`.
2553    #[must_use]
2554    pub fn exec_spawn_total_quantity(
2555        &self,
2556        exec_spawn_id: &ClientOrderId,
2557        active_only: bool,
2558    ) -> Option<Quantity> {
2559        let exec_spawn_orders = self.orders_for_exec_spawn(exec_spawn_id);
2560
2561        let mut total_quantity: Option<Quantity> = None;
2562
2563        for spawn_order in exec_spawn_orders {
2564            if !active_only || !spawn_order.is_closed() {
2565                if let Some(mut total_quantity) = total_quantity {
2566                    total_quantity += spawn_order.quantity();
2567                }
2568            } else {
2569                total_quantity = Some(spawn_order.quantity());
2570            }
2571        }
2572
2573        total_quantity
2574    }
2575
2576    /// Returns the total filled quantity for all orders with the given `exec_spawn_id`.
2577    #[must_use]
2578    pub fn exec_spawn_total_filled_qty(
2579        &self,
2580        exec_spawn_id: &ClientOrderId,
2581        active_only: bool,
2582    ) -> Option<Quantity> {
2583        let exec_spawn_orders = self.orders_for_exec_spawn(exec_spawn_id);
2584
2585        let mut total_quantity: Option<Quantity> = None;
2586
2587        for spawn_order in exec_spawn_orders {
2588            if !active_only || !spawn_order.is_closed() {
2589                if let Some(mut total_quantity) = total_quantity {
2590                    total_quantity += spawn_order.filled_qty();
2591                }
2592            } else {
2593                total_quantity = Some(spawn_order.filled_qty());
2594            }
2595        }
2596
2597        total_quantity
2598    }
2599
2600    /// Returns the total leaves quantity for all orders with the given `exec_spawn_id`.
2601    #[must_use]
2602    pub fn exec_spawn_total_leaves_qty(
2603        &self,
2604        exec_spawn_id: &ClientOrderId,
2605        active_only: bool,
2606    ) -> Option<Quantity> {
2607        let exec_spawn_orders = self.orders_for_exec_spawn(exec_spawn_id);
2608
2609        let mut total_quantity: Option<Quantity> = None;
2610
2611        for spawn_order in exec_spawn_orders {
2612            if !active_only || !spawn_order.is_closed() {
2613                if let Some(mut total_quantity) = total_quantity {
2614                    total_quantity += spawn_order.leaves_qty();
2615                }
2616            } else {
2617                total_quantity = Some(spawn_order.leaves_qty());
2618            }
2619        }
2620
2621        total_quantity
2622    }
2623
2624    // -- POSITION QUERIES ------------------------------------------------------------------------
2625
2626    /// Returns a reference to the position with the given `position_id` (if found).
2627    #[must_use]
2628    pub fn position(&self, position_id: &PositionId) -> Option<&Position> {
2629        self.positions.get(position_id)
2630    }
2631
2632    /// Returns a reference to the position for the given `client_order_id` (if found).
2633    #[must_use]
2634    pub fn position_for_order(&self, client_order_id: &ClientOrderId) -> Option<&Position> {
2635        self.index
2636            .order_position
2637            .get(client_order_id)
2638            .and_then(|position_id| self.positions.get(position_id))
2639    }
2640
2641    /// Returns a reference to the position ID for the given `client_order_id` (if found).
2642    #[must_use]
2643    pub fn position_id(&self, client_order_id: &ClientOrderId) -> Option<&PositionId> {
2644        self.index.order_position.get(client_order_id)
2645    }
2646
2647    /// Returns a reference to all positions matching the given optional filter parameters.
2648    #[must_use]
2649    pub fn positions(
2650        &self,
2651        venue: Option<&Venue>,
2652        instrument_id: Option<&InstrumentId>,
2653        strategy_id: Option<&StrategyId>,
2654        side: Option<PositionSide>,
2655    ) -> Vec<&Position> {
2656        let position_ids = self.position_ids(venue, instrument_id, strategy_id);
2657        self.get_positions_for_ids(&position_ids, side)
2658    }
2659
2660    /// Returns a reference to all open positions matching the given optional filter parameters.
2661    #[must_use]
2662    pub fn positions_open(
2663        &self,
2664        venue: Option<&Venue>,
2665        instrument_id: Option<&InstrumentId>,
2666        strategy_id: Option<&StrategyId>,
2667        side: Option<PositionSide>,
2668    ) -> Vec<&Position> {
2669        let position_ids = self.position_open_ids(venue, instrument_id, strategy_id);
2670        self.get_positions_for_ids(&position_ids, side)
2671    }
2672
2673    /// Returns a reference to all closed positions matching the given optional filter parameters.
2674    #[must_use]
2675    pub fn positions_closed(
2676        &self,
2677        venue: Option<&Venue>,
2678        instrument_id: Option<&InstrumentId>,
2679        strategy_id: Option<&StrategyId>,
2680        side: Option<PositionSide>,
2681    ) -> Vec<&Position> {
2682        let position_ids = self.position_closed_ids(venue, instrument_id, strategy_id);
2683        self.get_positions_for_ids(&position_ids, side)
2684    }
2685
2686    /// Returns whether a position with the given `position_id` exists.
2687    #[must_use]
2688    pub fn position_exists(&self, position_id: &PositionId) -> bool {
2689        self.index.positions.contains(position_id)
2690    }
2691
2692    /// Returns whether a position with the given `position_id` is open.
2693    #[must_use]
2694    pub fn is_position_open(&self, position_id: &PositionId) -> bool {
2695        self.index.positions_open.contains(position_id)
2696    }
2697
2698    /// Returns whether a position with the given `position_id` is closed.
2699    #[must_use]
2700    pub fn is_position_closed(&self, position_id: &PositionId) -> bool {
2701        self.index.positions_closed.contains(position_id)
2702    }
2703
2704    /// Returns the count of all open positions.
2705    #[must_use]
2706    pub fn positions_open_count(
2707        &self,
2708        venue: Option<&Venue>,
2709        instrument_id: Option<&InstrumentId>,
2710        strategy_id: Option<&StrategyId>,
2711        side: Option<PositionSide>,
2712    ) -> usize {
2713        self.positions_open(venue, instrument_id, strategy_id, side)
2714            .len()
2715    }
2716
2717    /// Returns the count of all closed positions.
2718    #[must_use]
2719    pub fn positions_closed_count(
2720        &self,
2721        venue: Option<&Venue>,
2722        instrument_id: Option<&InstrumentId>,
2723        strategy_id: Option<&StrategyId>,
2724        side: Option<PositionSide>,
2725    ) -> usize {
2726        self.positions_closed(venue, instrument_id, strategy_id, side)
2727            .len()
2728    }
2729
2730    /// Returns the count of all positions.
2731    #[must_use]
2732    pub fn positions_total_count(
2733        &self,
2734        venue: Option<&Venue>,
2735        instrument_id: Option<&InstrumentId>,
2736        strategy_id: Option<&StrategyId>,
2737        side: Option<PositionSide>,
2738    ) -> usize {
2739        self.positions(venue, instrument_id, strategy_id, side)
2740            .len()
2741    }
2742
2743    // -- STRATEGY QUERIES ------------------------------------------------------------------------
2744
2745    /// Gets a reference to the strategy ID for the given `client_order_id` (if found).
2746    #[must_use]
2747    pub fn strategy_id_for_order(&self, client_order_id: &ClientOrderId) -> Option<&StrategyId> {
2748        self.index.order_strategy.get(client_order_id)
2749    }
2750
2751    /// Gets a reference to the strategy ID for the given `position_id` (if found).
2752    #[must_use]
2753    pub fn strategy_id_for_position(&self, position_id: &PositionId) -> Option<&StrategyId> {
2754        self.index.position_strategy.get(position_id)
2755    }
2756
2757    // -- GENERAL ---------------------------------------------------------------------------------
2758
2759    /// Gets a reference to the general object value for the given `key` (if found).
2760    ///
2761    /// # Errors
2762    ///
2763    /// Returns an error if the `key` is invalid.
2764    pub fn get(&self, key: &str) -> anyhow::Result<Option<&Bytes>> {
2765        check_valid_string(key, stringify!(key))?;
2766
2767        Ok(self.general.get(key))
2768    }
2769
2770    // -- DATA QUERIES ----------------------------------------------------------------------------
2771
2772    /// Returns the price for the given `instrument_id` and `price_type` (if found).
2773    #[must_use]
2774    pub fn price(&self, instrument_id: &InstrumentId, price_type: PriceType) -> Option<Price> {
2775        match price_type {
2776            PriceType::Bid => self
2777                .quotes
2778                .get(instrument_id)
2779                .and_then(|quotes| quotes.front().map(|quote| quote.bid_price)),
2780            PriceType::Ask => self
2781                .quotes
2782                .get(instrument_id)
2783                .and_then(|quotes| quotes.front().map(|quote| quote.ask_price)),
2784            PriceType::Mid => self.quotes.get(instrument_id).and_then(|quotes| {
2785                quotes.front().map(|quote| {
2786                    Price::new(
2787                        f64::midpoint(quote.ask_price.as_f64(), quote.bid_price.as_f64()),
2788                        quote.bid_price.precision + 1,
2789                    )
2790                })
2791            }),
2792            PriceType::Last => self
2793                .trades
2794                .get(instrument_id)
2795                .and_then(|trades| trades.front().map(|trade| trade.price)),
2796            PriceType::Mark => self
2797                .mark_prices
2798                .get(instrument_id)
2799                .and_then(|marks| marks.front().map(|mark| mark.value)),
2800        }
2801    }
2802
2803    /// Gets all quotes for the given `instrument_id`.
2804    #[must_use]
2805    pub fn quotes(&self, instrument_id: &InstrumentId) -> Option<Vec<QuoteTick>> {
2806        self.quotes
2807            .get(instrument_id)
2808            .map(|quotes| quotes.iter().copied().collect())
2809    }
2810
2811    /// Gets all trades for the given `instrument_id`.
2812    #[must_use]
2813    pub fn trades(&self, instrument_id: &InstrumentId) -> Option<Vec<TradeTick>> {
2814        self.trades
2815            .get(instrument_id)
2816            .map(|trades| trades.iter().copied().collect())
2817    }
2818
2819    /// Gets all mark price updates for the given `instrument_id`.
2820    #[must_use]
2821    pub fn mark_prices(&self, instrument_id: &InstrumentId) -> Option<Vec<MarkPriceUpdate>> {
2822        self.mark_prices
2823            .get(instrument_id)
2824            .map(|mark_prices| mark_prices.iter().copied().collect())
2825    }
2826
2827    /// Gets all index price updates for the given `instrument_id`.
2828    #[must_use]
2829    pub fn index_prices(&self, instrument_id: &InstrumentId) -> Option<Vec<IndexPriceUpdate>> {
2830        self.index_prices
2831            .get(instrument_id)
2832            .map(|index_prices| index_prices.iter().copied().collect())
2833    }
2834
2835    /// Gets all bars for the given `bar_type`.
2836    #[must_use]
2837    pub fn bars(&self, bar_type: &BarType) -> Option<Vec<Bar>> {
2838        self.bars
2839            .get(bar_type)
2840            .map(|bars| bars.iter().copied().collect())
2841    }
2842
2843    /// Gets a reference to the order book for the given `instrument_id`.
2844    #[must_use]
2845    pub fn order_book(&self, instrument_id: &InstrumentId) -> Option<&OrderBook> {
2846        self.books.get(instrument_id)
2847    }
2848
2849    /// Gets a reference to the order book for the given `instrument_id`.
2850    #[must_use]
2851    pub fn order_book_mut(&mut self, instrument_id: &InstrumentId) -> Option<&mut OrderBook> {
2852        self.books.get_mut(instrument_id)
2853    }
2854
2855    /// Gets a reference to the own order book for the given `instrument_id`.
2856    #[must_use]
2857    pub fn own_order_book(&self, instrument_id: &InstrumentId) -> Option<&OwnOrderBook> {
2858        self.own_books.get(instrument_id)
2859    }
2860
2861    /// Gets a reference to the own order book for the given `instrument_id`.
2862    #[must_use]
2863    pub fn own_order_book_mut(
2864        &mut self,
2865        instrument_id: &InstrumentId,
2866    ) -> Option<&mut OwnOrderBook> {
2867        self.own_books.get_mut(instrument_id)
2868    }
2869
2870    /// Gets a reference to the latest quote tick for the given `instrument_id`.
2871    #[must_use]
2872    pub fn quote(&self, instrument_id: &InstrumentId) -> Option<&QuoteTick> {
2873        self.quotes
2874            .get(instrument_id)
2875            .and_then(|quotes| quotes.front())
2876    }
2877
2878    /// Gets a reference to the latest trade tick for the given `instrument_id`.
2879    #[must_use]
2880    pub fn trade(&self, instrument_id: &InstrumentId) -> Option<&TradeTick> {
2881        self.trades
2882            .get(instrument_id)
2883            .and_then(|trades| trades.front())
2884    }
2885
2886    /// Gets a referenece to the latest mark price update for the given `instrument_id`.
2887    #[must_use]
2888    pub fn mark_price(&self, instrument_id: &InstrumentId) -> Option<&MarkPriceUpdate> {
2889        self.mark_prices
2890            .get(instrument_id)
2891            .and_then(|mark_prices| mark_prices.front())
2892    }
2893
2894    /// Gets a referenece to the latest index price update for the given `instrument_id`.
2895    #[must_use]
2896    pub fn index_price(&self, instrument_id: &InstrumentId) -> Option<&IndexPriceUpdate> {
2897        self.index_prices
2898            .get(instrument_id)
2899            .and_then(|index_prices| index_prices.front())
2900    }
2901
2902    /// Gets a reference to the latest bar for the given `bar_type`.
2903    #[must_use]
2904    pub fn bar(&self, bar_type: &BarType) -> Option<&Bar> {
2905        self.bars.get(bar_type).and_then(|bars| bars.front())
2906    }
2907
2908    /// Gets the order book update count for the given `instrument_id`.
2909    #[must_use]
2910    pub fn book_update_count(&self, instrument_id: &InstrumentId) -> usize {
2911        self.books
2912            .get(instrument_id)
2913            .map_or(0, |book| book.update_count) as usize
2914    }
2915
2916    /// Gets the quote tick count for the given `instrument_id`.
2917    #[must_use]
2918    pub fn quote_count(&self, instrument_id: &InstrumentId) -> usize {
2919        self.quotes
2920            .get(instrument_id)
2921            .map_or(0, std::collections::VecDeque::len)
2922    }
2923
2924    /// Gets the trade tick count for the given `instrument_id`.
2925    #[must_use]
2926    pub fn trade_count(&self, instrument_id: &InstrumentId) -> usize {
2927        self.trades
2928            .get(instrument_id)
2929            .map_or(0, std::collections::VecDeque::len)
2930    }
2931
2932    /// Gets the bar count for the given `instrument_id`.
2933    #[must_use]
2934    pub fn bar_count(&self, bar_type: &BarType) -> usize {
2935        self.bars
2936            .get(bar_type)
2937            .map_or(0, std::collections::VecDeque::len)
2938    }
2939
2940    /// Returns whether the cache contains an order book for the given `instrument_id`.
2941    #[must_use]
2942    pub fn has_order_book(&self, instrument_id: &InstrumentId) -> bool {
2943        self.books.contains_key(instrument_id)
2944    }
2945
2946    /// Returns whether the cache contains quotes for the given `instrument_id`.
2947    #[must_use]
2948    pub fn has_quote_ticks(&self, instrument_id: &InstrumentId) -> bool {
2949        self.quote_count(instrument_id) > 0
2950    }
2951
2952    /// Returns whether the cache contains trades for the given `instrument_id`.
2953    #[must_use]
2954    pub fn has_trade_ticks(&self, instrument_id: &InstrumentId) -> bool {
2955        self.trade_count(instrument_id) > 0
2956    }
2957
2958    /// Returns whether the cache contains bars for the given `bar_type`.
2959    #[must_use]
2960    pub fn has_bars(&self, bar_type: &BarType) -> bool {
2961        self.bar_count(bar_type) > 0
2962    }
2963
2964    #[must_use]
2965    pub fn get_xrate(
2966        &self,
2967        venue: Venue,
2968        from_currency: Currency,
2969        to_currency: Currency,
2970        price_type: PriceType,
2971    ) -> Option<f64> {
2972        if from_currency == to_currency {
2973            // When the source and target currencies are identical,
2974            // no conversion is needed; return an exchange rate of 1.0.
2975            return Some(1.0);
2976        }
2977
2978        let (bid_quote, ask_quote) = self.build_quote_table(&venue);
2979
2980        match get_exchange_rate(
2981            from_currency.code,
2982            to_currency.code,
2983            price_type,
2984            bid_quote,
2985            ask_quote,
2986        ) {
2987            Ok(rate) => rate,
2988            Err(e) => {
2989                log::error!("Failed to calculate xrate: {e}");
2990                None
2991            }
2992        }
2993    }
2994
2995    fn build_quote_table(&self, venue: &Venue) -> (HashMap<String, f64>, HashMap<String, f64>) {
2996        let mut bid_quotes = HashMap::new();
2997        let mut ask_quotes = HashMap::new();
2998
2999        for instrument_id in self.instruments.keys() {
3000            if instrument_id.venue != *venue {
3001                continue;
3002            }
3003
3004            let (bid_price, ask_price) = if let Some(ticks) = self.quotes.get(instrument_id) {
3005                if let Some(tick) = ticks.front() {
3006                    (tick.bid_price, tick.ask_price)
3007                } else {
3008                    continue; // Empty ticks vector
3009                }
3010            } else {
3011                let bid_bar = self
3012                    .bars
3013                    .iter()
3014                    .find(|(k, _)| {
3015                        k.instrument_id() == *instrument_id
3016                            && matches!(k.spec().price_type, PriceType::Bid)
3017                    })
3018                    .map(|(_, v)| v);
3019
3020                let ask_bar = self
3021                    .bars
3022                    .iter()
3023                    .find(|(k, _)| {
3024                        k.instrument_id() == *instrument_id
3025                            && matches!(k.spec().price_type, PriceType::Ask)
3026                    })
3027                    .map(|(_, v)| v);
3028
3029                match (bid_bar, ask_bar) {
3030                    (Some(bid), Some(ask)) => {
3031                        let bid_price = bid.front().unwrap().close;
3032                        let ask_price = ask.front().unwrap().close;
3033
3034                        (bid_price, ask_price)
3035                    }
3036                    _ => continue,
3037                }
3038            };
3039
3040            bid_quotes.insert(instrument_id.symbol.to_string(), bid_price.as_f64());
3041            ask_quotes.insert(instrument_id.symbol.to_string(), ask_price.as_f64());
3042        }
3043
3044        (bid_quotes, ask_quotes)
3045    }
3046
3047    /// Returns the mark exchange rate for the given currency pair, or `None` if not set.
3048    #[must_use]
3049    pub fn get_mark_xrate(&self, from_currency: Currency, to_currency: Currency) -> Option<f64> {
3050        self.mark_xrates.get(&(from_currency, to_currency)).copied()
3051    }
3052
3053    /// Sets the mark exchange rate for the given currency pair and automatically sets the inverse rate.
3054    ///
3055    /// # Panics
3056    ///
3057    /// Panics if `xrate` is not positive.
3058    pub fn set_mark_xrate(&mut self, from_currency: Currency, to_currency: Currency, xrate: f64) {
3059        assert!(xrate > 0.0, "xrate was zero");
3060        self.mark_xrates.insert((from_currency, to_currency), xrate);
3061        self.mark_xrates
3062            .insert((to_currency, from_currency), 1.0 / xrate);
3063    }
3064
3065    /// Clears the mark exchange rate for the given currency pair.
3066    pub fn clear_mark_xrate(&mut self, from_currency: Currency, to_currency: Currency) {
3067        let _ = self.mark_xrates.remove(&(from_currency, to_currency));
3068    }
3069
3070    /// Clears all mark exchange rates.
3071    pub fn clear_mark_xrates(&mut self) {
3072        self.mark_xrates.clear();
3073    }
3074
3075    // -- INSTRUMENT QUERIES ----------------------------------------------------------------------
3076
3077    /// Returns a reference to the instrument for the given `instrument_id` (if found).
3078    #[must_use]
3079    pub fn instrument(&self, instrument_id: &InstrumentId) -> Option<&InstrumentAny> {
3080        self.instruments.get(instrument_id)
3081    }
3082
3083    /// Returns references to all instrument IDs for the given `venue`.
3084    #[must_use]
3085    pub fn instrument_ids(&self, venue: Option<&Venue>) -> Vec<&InstrumentId> {
3086        match venue {
3087            Some(v) => self.instruments.keys().filter(|i| &i.venue == v).collect(),
3088            None => self.instruments.keys().collect(),
3089        }
3090    }
3091
3092    /// Returns references to all instruments for the given `venue`.
3093    #[must_use]
3094    pub fn instruments(&self, venue: &Venue, underlying: Option<&Ustr>) -> Vec<&InstrumentAny> {
3095        self.instruments
3096            .values()
3097            .filter(|i| &i.id().venue == venue)
3098            .filter(|i| underlying.is_none_or(|u| i.underlying() == Some(*u)))
3099            .collect()
3100    }
3101
3102    /// Returns references to all bar types contained in the cache.
3103    #[must_use]
3104    pub fn bar_types(
3105        &self,
3106        instrument_id: Option<&InstrumentId>,
3107        price_type: Option<&PriceType>,
3108        aggregation_source: AggregationSource,
3109    ) -> Vec<&BarType> {
3110        let mut bar_types = self
3111            .bars
3112            .keys()
3113            .filter(|bar_type| bar_type.aggregation_source() == aggregation_source)
3114            .collect::<Vec<&BarType>>();
3115
3116        if let Some(instrument_id) = instrument_id {
3117            bar_types.retain(|bar_type| bar_type.instrument_id() == *instrument_id);
3118        }
3119
3120        if let Some(price_type) = price_type {
3121            bar_types.retain(|bar_type| &bar_type.spec().price_type == price_type);
3122        }
3123
3124        bar_types
3125    }
3126
3127    // -- SYNTHETIC QUERIES -----------------------------------------------------------------------
3128
3129    /// Returns a reference to the synthetic instrument for the given `instrument_id` (if found).
3130    #[must_use]
3131    pub fn synthetic(&self, instrument_id: &InstrumentId) -> Option<&SyntheticInstrument> {
3132        self.synthetics.get(instrument_id)
3133    }
3134
3135    /// Returns references to instrument IDs for all synthetic instruments contained in the cache.
3136    #[must_use]
3137    pub fn synthetic_ids(&self) -> Vec<&InstrumentId> {
3138        self.synthetics.keys().collect()
3139    }
3140
3141    /// Returns references to all synthetic instruments contained in the cache.
3142    #[must_use]
3143    pub fn synthetics(&self) -> Vec<&SyntheticInstrument> {
3144        self.synthetics.values().collect()
3145    }
3146
3147    // -- ACCOUNT QUERIES -----------------------------------------------------------------------
3148
3149    /// Returns a reference to the account for the given `account_id` (if found).
3150    #[must_use]
3151    pub fn account(&self, account_id: &AccountId) -> Option<&AccountAny> {
3152        self.accounts.get(account_id)
3153    }
3154
3155    /// Returns a reference to the account for the given `venue` (if found).
3156    #[must_use]
3157    pub fn account_for_venue(&self, venue: &Venue) -> Option<&AccountAny> {
3158        self.index
3159            .venue_account
3160            .get(venue)
3161            .and_then(|account_id| self.accounts.get(account_id))
3162    }
3163
3164    /// Returns a reference to the account ID for the given `venue` (if found).
3165    #[must_use]
3166    pub fn account_id(&self, venue: &Venue) -> Option<&AccountId> {
3167        self.index.venue_account.get(venue)
3168    }
3169
3170    /// Returns references to all accounts for the given `account_id`.
3171    #[must_use]
3172    pub fn accounts(&self, account_id: &AccountId) -> Vec<&AccountAny> {
3173        self.accounts
3174            .values()
3175            .filter(|account| &account.id() == account_id)
3176            .collect()
3177    }
3178}