nautilus_backtest/
exchange.rs

1// -------------------------------------------------------------------------------------------------
2//  Copyright (C) 2015-2025 Posei Systems Pty Ltd. All rights reserved.
3//  https://poseitrader.io
4//
5//  Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
6//  You may not use this file except in compliance with the License.
7//  You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
8//
9//  Unless required by applicable law or agreed to in writing, software
10//  distributed under the License is distributed on an "AS IS" BASIS,
11//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12//  See the License for the specific language governing permissions and
13//  limitations under the License.
14// -------------------------------------------------------------------------------------------------
15
16//! Provides a `SimulatedExchange` venue for backtesting on historical data.
17
18// Under development
19#![allow(dead_code)]
20#![allow(unused_variables)]
21
22use std::{
23    cell::RefCell,
24    collections::{BinaryHeap, HashMap, VecDeque},
25    fmt::Debug,
26    rc::Rc,
27};
28
29use nautilus_common::{cache::Cache, clock::Clock, messages::execution::TradingCommand};
30use nautilus_core::{
31    UnixNanos,
32    correctness::{FAILED, check_equal},
33};
34use nautilus_execution::{
35    client::ExecutionClient,
36    matching_engine::{config::OrderMatchingEngineConfig, engine::OrderMatchingEngine},
37    models::{fee::FeeModelAny, fill::FillModel, latency::LatencyModel},
38};
39use nautilus_model::{
40    accounts::AccountAny,
41    data::{
42        Bar, Data, InstrumentStatus, OrderBookDelta, OrderBookDeltas, OrderBookDeltas_API,
43        QuoteTick, TradeTick,
44    },
45    enums::{AccountType, BookType, OmsType},
46    identifiers::{InstrumentId, Venue},
47    instruments::{Instrument, InstrumentAny},
48    orderbook::OrderBook,
49    orders::PassiveOrderAny,
50    types::{AccountBalance, Currency, Money, Price},
51};
52use rust_decimal::{Decimal, prelude::ToPrimitive};
53
54use crate::modules::SimulationModule;
55
56/// Represents commands with simulated network latency in a min-heap priority queue.
57/// The commands are ordered by timestamp for FIFO processing, with the
58/// earliest timestamp having the highest priority in the queue.
59#[derive(Debug, Eq, PartialEq)]
60struct InflightCommand {
61    ts: UnixNanos,
62    counter: u32,
63    command: TradingCommand,
64}
65
66impl InflightCommand {
67    const fn new(ts: UnixNanos, counter: u32, command: TradingCommand) -> Self {
68        Self {
69            ts,
70            counter,
71            command,
72        }
73    }
74}
75
76impl Ord for InflightCommand {
77    fn cmp(&self, other: &Self) -> std::cmp::Ordering {
78        // Reverse ordering for min-heap (earliest timestamp first then lowest counter)
79        other
80            .ts
81            .cmp(&self.ts)
82            .then_with(|| other.counter.cmp(&self.counter))
83    }
84}
85
86impl PartialOrd for InflightCommand {
87    fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
88        Some(self.cmp(other))
89    }
90}
91
92/// Simulated exchange venue for realistic trading execution during backtesting.
93///
94/// The `SimulatedExchange` provides a comprehensive simulation of a trading venue,
95/// including order matching engines, account management, and realistic execution
96/// models. It maintains order books, processes market data, and executes trades
97/// with configurable latency and fill models to accurately simulate real market
98/// conditions during backtesting.
99///
100/// Key features:
101/// - Multi-instrument order matching with realistic execution
102/// - Configurable fee, fill, and latency models
103/// - Support for various order types and execution options
104/// - Account balance and position management
105/// - Market data processing and order book maintenance
106/// - Simulation modules for custom venue behaviors
107pub struct SimulatedExchange {
108    pub id: Venue,
109    pub oms_type: OmsType,
110    pub account_type: AccountType,
111    starting_balances: Vec<Money>,
112    book_type: BookType,
113    default_leverage: Decimal,
114    exec_client: Option<Rc<dyn ExecutionClient>>,
115    pub base_currency: Option<Currency>,
116    fee_model: FeeModelAny,
117    fill_model: FillModel,
118    latency_model: Option<LatencyModel>,
119    instruments: HashMap<InstrumentId, InstrumentAny>,
120    matching_engines: HashMap<InstrumentId, OrderMatchingEngine>,
121    leverages: HashMap<InstrumentId, Decimal>,
122    modules: Vec<Box<dyn SimulationModule>>,
123    clock: Rc<RefCell<dyn Clock>>,
124    cache: Rc<RefCell<Cache>>,
125    message_queue: VecDeque<TradingCommand>,
126    inflight_queue: BinaryHeap<InflightCommand>,
127    inflight_counter: HashMap<UnixNanos, u32>,
128    frozen_account: bool,
129    bar_execution: bool,
130    reject_stop_orders: bool,
131    support_gtd_orders: bool,
132    support_contingent_orders: bool,
133    use_position_ids: bool,
134    use_random_ids: bool,
135    use_reduce_only: bool,
136    use_message_queue: bool,
137}
138
139impl Debug for SimulatedExchange {
140    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
141        f.debug_struct(stringify!(SimulatedExchange))
142            .field("id", &self.id)
143            .field("account_type", &self.account_type)
144            .finish()
145    }
146}
147
148impl SimulatedExchange {
149    /// Creates a new [`SimulatedExchange`] instance.
150    ///
151    /// # Errors
152    ///
153    /// Returns an error if:
154    /// - `starting_balances` is empty.
155    /// - `base_currency` is `Some` but `starting_balances` contains multiple currencies.
156    #[allow(clippy::too_many_arguments)]
157    pub fn new(
158        venue: Venue,
159        oms_type: OmsType,
160        account_type: AccountType,
161        starting_balances: Vec<Money>,
162        base_currency: Option<Currency>,
163        default_leverage: Decimal,
164        leverages: HashMap<InstrumentId, Decimal>,
165        modules: Vec<Box<dyn SimulationModule>>,
166        cache: Rc<RefCell<Cache>>,
167        clock: Rc<RefCell<dyn Clock>>,
168        fill_model: FillModel,
169        fee_model: FeeModelAny,
170        book_type: BookType,
171        latency_model: Option<LatencyModel>,
172        frozen_account: Option<bool>,
173        bar_execution: Option<bool>,
174        reject_stop_orders: Option<bool>,
175        support_gtd_orders: Option<bool>,
176        support_contingent_orders: Option<bool>,
177        use_position_ids: Option<bool>,
178        use_random_ids: Option<bool>,
179        use_reduce_only: Option<bool>,
180        use_message_queue: Option<bool>,
181    ) -> anyhow::Result<Self> {
182        if starting_balances.is_empty() {
183            anyhow::bail!("Starting balances must be provided")
184        }
185        if base_currency.is_some() && starting_balances.len() > 1 {
186            anyhow::bail!("single-currency account has multiple starting currencies")
187        }
188        // TODO register and load modules
189        Ok(Self {
190            id: venue,
191            oms_type,
192            account_type,
193            starting_balances,
194            book_type,
195            default_leverage,
196            exec_client: None,
197            base_currency,
198            fee_model,
199            fill_model,
200            latency_model,
201            instruments: HashMap::new(),
202            matching_engines: HashMap::new(),
203            leverages,
204            modules,
205            clock,
206            cache,
207            message_queue: VecDeque::new(),
208            inflight_queue: BinaryHeap::new(),
209            inflight_counter: HashMap::new(),
210            frozen_account: frozen_account.unwrap_or(false),
211            bar_execution: bar_execution.unwrap_or(true),
212            reject_stop_orders: reject_stop_orders.unwrap_or(true),
213            support_gtd_orders: support_gtd_orders.unwrap_or(true),
214            support_contingent_orders: support_contingent_orders.unwrap_or(true),
215            use_position_ids: use_position_ids.unwrap_or(true),
216            use_random_ids: use_random_ids.unwrap_or(false),
217            use_reduce_only: use_reduce_only.unwrap_or(true),
218            use_message_queue: use_message_queue.unwrap_or(true),
219        })
220    }
221
222    pub fn register_client(&mut self, client: Rc<dyn ExecutionClient>) {
223        self.exec_client = Some(client);
224    }
225
226    pub fn set_fill_model(&mut self, fill_model: FillModel) {
227        for matching_engine in self.matching_engines.values_mut() {
228            matching_engine.set_fill_model(fill_model.clone());
229            log::info!(
230                "Setting fill model for {} to {}",
231                matching_engine.venue,
232                self.fill_model
233            );
234        }
235        self.fill_model = fill_model;
236    }
237
238    pub const fn set_latency_model(&mut self, latency_model: LatencyModel) {
239        self.latency_model = Some(latency_model);
240    }
241
242    pub fn initialize_account(&mut self) {
243        self.generate_fresh_account_state();
244    }
245
246    /// Adds an instrument to the simulated exchange and initializes its matching engine.
247    ///
248    /// # Errors
249    ///
250    /// Returns an error if the exchange account type is `Cash` and the instrument is a `CryptoPerpetual` or `CryptoFuture`.
251    ///
252    /// # Panics
253    ///
254    /// Panics if the instrument cannot be added to the exchange.
255    pub fn add_instrument(&mut self, instrument: InstrumentAny) -> anyhow::Result<()> {
256        check_equal(
257            &instrument.id().venue,
258            &self.id,
259            "Venue of instrument id",
260            "Venue of simulated exchange",
261        )
262        .expect(FAILED);
263
264        if self.account_type == AccountType::Cash
265            && (matches!(instrument, InstrumentAny::CryptoPerpetual(_))
266                || matches!(instrument, InstrumentAny::CryptoFuture(_)))
267        {
268            anyhow::bail!("Cash account cannot trade futures or perpetuals")
269        }
270
271        self.instruments.insert(instrument.id(), instrument.clone());
272
273        let matching_engine_config = OrderMatchingEngineConfig::new(
274            self.bar_execution,
275            self.reject_stop_orders,
276            self.support_gtd_orders,
277            self.support_contingent_orders,
278            self.use_position_ids,
279            self.use_random_ids,
280            self.use_reduce_only,
281        );
282        let instrument_id = instrument.id();
283        let matching_engine = OrderMatchingEngine::new(
284            instrument,
285            self.instruments.len() as u32,
286            self.fill_model.clone(),
287            self.fee_model.clone(),
288            self.book_type,
289            self.oms_type,
290            self.account_type,
291            self.clock.clone(),
292            Rc::clone(&self.cache),
293            matching_engine_config,
294        );
295        self.matching_engines.insert(instrument_id, matching_engine);
296
297        log::info!("Added instrument {instrument_id} and created matching engine");
298        Ok(())
299    }
300
301    #[must_use]
302    pub fn best_bid_price(&self, instrument_id: InstrumentId) -> Option<Price> {
303        self.matching_engines
304            .get(&instrument_id)
305            .and_then(OrderMatchingEngine::best_bid_price)
306    }
307
308    #[must_use]
309    pub fn best_ask_price(&self, instrument_id: InstrumentId) -> Option<Price> {
310        self.matching_engines
311            .get(&instrument_id)
312            .and_then(OrderMatchingEngine::best_ask_price)
313    }
314
315    pub fn get_book(&self, instrument_id: InstrumentId) -> Option<&OrderBook> {
316        self.matching_engines
317            .get(&instrument_id)
318            .map(OrderMatchingEngine::get_book)
319    }
320
321    #[must_use]
322    pub fn get_matching_engine(
323        &self,
324        instrument_id: &InstrumentId,
325    ) -> Option<&OrderMatchingEngine> {
326        self.matching_engines.get(instrument_id)
327    }
328
329    #[must_use]
330    pub const fn get_matching_engines(&self) -> &HashMap<InstrumentId, OrderMatchingEngine> {
331        &self.matching_engines
332    }
333
334    #[must_use]
335    pub fn get_books(&self) -> HashMap<InstrumentId, OrderBook> {
336        let mut books = HashMap::new();
337        for (instrument_id, matching_engine) in &self.matching_engines {
338            books.insert(*instrument_id, matching_engine.get_book().clone());
339        }
340        books
341    }
342
343    #[must_use]
344    pub fn get_open_orders(&self, instrument_id: Option<InstrumentId>) -> Vec<PassiveOrderAny> {
345        instrument_id
346            .and_then(|id| {
347                self.matching_engines
348                    .get(&id)
349                    .map(OrderMatchingEngine::get_open_orders)
350            })
351            .unwrap_or_else(|| {
352                self.matching_engines
353                    .values()
354                    .flat_map(OrderMatchingEngine::get_open_orders)
355                    .collect()
356            })
357    }
358
359    #[must_use]
360    pub fn get_open_bid_orders(&self, instrument_id: Option<InstrumentId>) -> Vec<PassiveOrderAny> {
361        instrument_id
362            .and_then(|id| {
363                self.matching_engines
364                    .get(&id)
365                    .map(|engine| engine.get_open_bid_orders().to_vec())
366            })
367            .unwrap_or_else(|| {
368                self.matching_engines
369                    .values()
370                    .flat_map(|engine| engine.get_open_bid_orders().to_vec())
371                    .collect()
372            })
373    }
374
375    #[must_use]
376    pub fn get_open_ask_orders(&self, instrument_id: Option<InstrumentId>) -> Vec<PassiveOrderAny> {
377        instrument_id
378            .and_then(|id| {
379                self.matching_engines
380                    .get(&id)
381                    .map(|engine| engine.get_open_ask_orders().to_vec())
382            })
383            .unwrap_or_else(|| {
384                self.matching_engines
385                    .values()
386                    .flat_map(|engine| engine.get_open_ask_orders().to_vec())
387                    .collect()
388            })
389    }
390
391    /// # Panics
392    ///
393    /// Panics if retrieving the account from the execution client fails.
394    #[must_use]
395    pub fn get_account(&self) -> Option<AccountAny> {
396        self.exec_client
397            .as_ref()
398            .map(|client| client.get_account().unwrap())
399    }
400
401    /// # Panics
402    ///
403    /// Panics if generating account state fails during adjustment.
404    pub fn adjust_account(&mut self, adjustment: Money) {
405        if self.frozen_account {
406            // Nothing to adjust
407            return;
408        }
409
410        if let Some(exec_client) = &self.exec_client {
411            let venue = exec_client.venue();
412            println!("Adjusting account for venue {venue}");
413            if let Some(account) = self.cache.borrow().account_for_venue(&venue) {
414                match account.balance(Some(adjustment.currency)) {
415                    Some(balance) => {
416                        let mut current_balance = *balance;
417                        current_balance.total += adjustment;
418                        current_balance.free += adjustment;
419
420                        let margins = match account {
421                            AccountAny::Margin(margin_account) => margin_account.margins.clone(),
422                            _ => HashMap::new(),
423                        };
424
425                        if let Some(exec_client) = &self.exec_client {
426                            exec_client
427                                .generate_account_state(
428                                    vec![current_balance],
429                                    margins.values().copied().collect(),
430                                    true,
431                                    self.clock.borrow().timestamp_ns(),
432                                )
433                                .unwrap();
434                        }
435                    }
436                    None => {
437                        log::error!(
438                            "Cannot adjust account: no balance for currency {}",
439                            adjustment.currency
440                        );
441                    }
442                }
443            } else {
444                log::error!("Cannot adjust account: no account for venue {venue}");
445            }
446        }
447    }
448
449    pub fn send(&mut self, command: TradingCommand) {
450        if !self.use_message_queue {
451            self.process_trading_command(command);
452        } else if self.latency_model.is_none() {
453            self.message_queue.push_back(command);
454        } else {
455            let (ts, counter) = self.generate_inflight_command(&command);
456            self.inflight_queue
457                .push(InflightCommand::new(ts, counter, command));
458        }
459    }
460
461    /// # Panics
462    ///
463    /// Panics if the command is invalid when generating inflight command.
464    pub fn generate_inflight_command(&mut self, command: &TradingCommand) -> (UnixNanos, u32) {
465        if let Some(latency_model) = &self.latency_model {
466            let ts = match command {
467                TradingCommand::SubmitOrder(_) | TradingCommand::SubmitOrderList(_) => {
468                    command.ts_init() + latency_model.insert_latency_nanos
469                }
470                TradingCommand::ModifyOrder(_) => {
471                    command.ts_init() + latency_model.update_latency_nanos
472                }
473                TradingCommand::CancelOrder(_)
474                | TradingCommand::CancelAllOrders(_)
475                | TradingCommand::BatchCancelOrders(_) => {
476                    command.ts_init() + latency_model.delete_latency_nanos
477                }
478                _ => panic!("Invalid command was {command}"),
479            };
480
481            let counter = self
482                .inflight_counter
483                .entry(ts)
484                .and_modify(|e| *e += 1)
485                .or_insert(1);
486
487            (ts, *counter)
488        } else {
489            panic!("Latency model should be initialized");
490        }
491    }
492
493    /// # Panics
494    ///
495    /// Panics if adding a missing instrument during delta processing fails.
496    pub fn process_order_book_delta(&mut self, delta: OrderBookDelta) {
497        for module in &self.modules {
498            module.pre_process(Data::Delta(delta));
499        }
500
501        if !self.matching_engines.contains_key(&delta.instrument_id) {
502            let instrument = {
503                let cache = self.cache.as_ref().borrow();
504                cache.instrument(&delta.instrument_id).cloned()
505            };
506
507            if let Some(instrument) = instrument {
508                self.add_instrument(instrument).unwrap();
509            } else {
510                panic!(
511                    "No matching engine found for instrument {}",
512                    delta.instrument_id
513                );
514            }
515        }
516
517        if let Some(matching_engine) = self.matching_engines.get_mut(&delta.instrument_id) {
518            matching_engine.process_order_book_delta(&delta);
519        } else {
520            panic!("Matching engine should be initialized");
521        }
522    }
523
524    /// # Panics
525    ///
526    /// Panics if adding a missing instrument during deltas processing fails.
527    pub fn process_order_book_deltas(&mut self, deltas: OrderBookDeltas) {
528        for module in &self.modules {
529            module.pre_process(Data::Deltas(OrderBookDeltas_API::new(deltas.clone())));
530        }
531
532        if !self.matching_engines.contains_key(&deltas.instrument_id) {
533            let instrument = {
534                let cache = self.cache.as_ref().borrow();
535                cache.instrument(&deltas.instrument_id).cloned()
536            };
537
538            if let Some(instrument) = instrument {
539                self.add_instrument(instrument).unwrap();
540            } else {
541                panic!(
542                    "No matching engine found for instrument {}",
543                    deltas.instrument_id
544                );
545            }
546        }
547
548        if let Some(matching_engine) = self.matching_engines.get_mut(&deltas.instrument_id) {
549            matching_engine.process_order_book_deltas(&deltas);
550        } else {
551            panic!("Matching engine should be initialized");
552        }
553    }
554
555    /// # Panics
556    ///
557    /// Panics if adding a missing instrument during quote tick processing fails.
558    pub fn process_quote_tick(&mut self, quote: &QuoteTick) {
559        for module in &self.modules {
560            module.pre_process(Data::Quote(quote.to_owned()));
561        }
562
563        if !self.matching_engines.contains_key(&quote.instrument_id) {
564            let instrument = {
565                let cache = self.cache.as_ref().borrow();
566                cache.instrument(&quote.instrument_id).cloned()
567            };
568
569            if let Some(instrument) = instrument {
570                self.add_instrument(instrument).unwrap();
571            } else {
572                panic!(
573                    "No matching engine found for instrument {}",
574                    quote.instrument_id
575                );
576            }
577        }
578
579        if let Some(matching_engine) = self.matching_engines.get_mut(&quote.instrument_id) {
580            matching_engine.process_quote_tick(quote);
581        } else {
582            panic!("Matching engine should be initialized");
583        }
584    }
585
586    /// # Panics
587    ///
588    /// Panics if adding a missing instrument during trade tick processing fails.
589    pub fn process_trade_tick(&mut self, trade: &TradeTick) {
590        for module in &self.modules {
591            module.pre_process(Data::Trade(trade.to_owned()));
592        }
593
594        if !self.matching_engines.contains_key(&trade.instrument_id) {
595            let instrument = {
596                let cache = self.cache.as_ref().borrow();
597                cache.instrument(&trade.instrument_id).cloned()
598            };
599
600            if let Some(instrument) = instrument {
601                self.add_instrument(instrument).unwrap();
602            } else {
603                panic!(
604                    "No matching engine found for instrument {}",
605                    trade.instrument_id
606                );
607            }
608        }
609
610        if let Some(matching_engine) = self.matching_engines.get_mut(&trade.instrument_id) {
611            matching_engine.process_trade_tick(trade);
612        } else {
613            panic!("Matching engine should be initialized");
614        }
615    }
616
617    /// # Panics
618    ///
619    /// Panics if adding a missing instrument during bar processing fails.
620    pub fn process_bar(&mut self, bar: Bar) {
621        for module in &self.modules {
622            module.pre_process(Data::Bar(bar));
623        }
624
625        if !self.matching_engines.contains_key(&bar.instrument_id()) {
626            let instrument = {
627                let cache = self.cache.as_ref().borrow();
628                cache.instrument(&bar.instrument_id()).cloned()
629            };
630
631            if let Some(instrument) = instrument {
632                self.add_instrument(instrument).unwrap();
633            } else {
634                panic!(
635                    "No matching engine found for instrument {}",
636                    bar.instrument_id()
637                );
638            }
639        }
640
641        if let Some(matching_engine) = self.matching_engines.get_mut(&bar.instrument_id()) {
642            matching_engine.process_bar(&bar);
643        } else {
644            panic!("Matching engine should be initialized");
645        }
646    }
647
648    /// # Panics
649    ///
650    /// Panics if adding a missing instrument during instrument status processing fails.
651    pub fn process_instrument_status(&mut self, status: InstrumentStatus) {
652        // TODO add module preprocessing
653
654        if !self.matching_engines.contains_key(&status.instrument_id) {
655            let instrument = {
656                let cache = self.cache.as_ref().borrow();
657                cache.instrument(&status.instrument_id).cloned()
658            };
659
660            if let Some(instrument) = instrument {
661                self.add_instrument(instrument).unwrap();
662            } else {
663                panic!(
664                    "No matching engine found for instrument {}",
665                    status.instrument_id
666                );
667            }
668        }
669
670        if let Some(matching_engine) = self.matching_engines.get_mut(&status.instrument_id) {
671            matching_engine.process_status(status.action);
672        } else {
673            panic!("Matching engine should be initialized");
674        }
675    }
676
677    /// # Panics
678    ///
679    /// Panics if popping an inflight command fails during processing.
680    pub fn process(&mut self, ts_now: UnixNanos) {
681        // TODO implement correct clock fixed time setting self.clock.set_time(ts_now);
682
683        // Process inflight commands
684        while let Some(inflight) = self.inflight_queue.peek() {
685            if inflight.ts > ts_now {
686                // Future commands remain in the queue
687                break;
688            }
689            // We get the inflight command, remove it from the queue and process it
690            let inflight = self.inflight_queue.pop().unwrap();
691            self.process_trading_command(inflight.command);
692        }
693
694        // Process regular message queue
695        while let Some(command) = self.message_queue.pop_front() {
696            self.process_trading_command(command);
697        }
698    }
699
700    pub fn reset(&mut self) {
701        for module in &self.modules {
702            module.reset();
703        }
704
705        self.generate_fresh_account_state();
706
707        for matching_engine in self.matching_engines.values_mut() {
708            matching_engine.reset();
709        }
710
711        // TODO Clear the inflight and message queues
712        log::info!("Resetting exchange state");
713    }
714
715    /// # Panics
716    ///
717    /// Panics if execution client is uninitialized when processing trading command.
718    pub fn process_trading_command(&mut self, command: TradingCommand) {
719        if let Some(matching_engine) = self.matching_engines.get_mut(&command.instrument_id()) {
720            let account_id = if let Some(exec_client) = &self.exec_client {
721                exec_client.account_id()
722            } else {
723                panic!("Execution client should be initialized");
724            };
725            match command {
726                TradingCommand::SubmitOrder(mut command) => {
727                    matching_engine.process_order(&mut command.order, account_id);
728                }
729                TradingCommand::ModifyOrder(ref command) => {
730                    matching_engine.process_modify(command, account_id);
731                }
732                TradingCommand::CancelOrder(ref command) => {
733                    matching_engine.process_cancel(command, account_id);
734                }
735                TradingCommand::CancelAllOrders(ref command) => {
736                    matching_engine.process_cancel_all(command, account_id);
737                }
738                TradingCommand::BatchCancelOrders(ref command) => {
739                    matching_engine.process_batch_cancel(command, account_id);
740                }
741                TradingCommand::SubmitOrderList(mut command) => {
742                    for order in &mut command.order_list.orders {
743                        matching_engine.process_order(order, account_id);
744                    }
745                }
746                _ => {}
747            }
748        } else {
749            panic!("Matching engine should be initialized");
750        }
751    }
752
753    /// # Panics
754    ///
755    /// Panics if generating fresh account state fails.
756    pub fn generate_fresh_account_state(&self) {
757        let balances: Vec<AccountBalance> = self
758            .starting_balances
759            .iter()
760            .map(|money| AccountBalance::new(*money, Money::zero(money.currency), *money))
761            .collect();
762
763        if let Some(exec_client) = &self.exec_client {
764            exec_client
765                .generate_account_state(balances, vec![], true, self.clock.borrow().timestamp_ns())
766                .unwrap();
767        }
768
769        // Set leverages
770        if let Some(AccountAny::Margin(mut margin_account)) = self.get_account() {
771            margin_account.set_default_leverage(self.default_leverage.to_f64().unwrap());
772
773            // Set instrument specific leverages
774            for (instrument_id, leverage) in &self.leverages {
775                margin_account.set_leverage(*instrument_id, leverage.to_f64().unwrap());
776            }
777        }
778    }
779}
780
781////////////////////////////////////////////////////////////////////////////////
782// Tests
783////////////////////////////////////////////////////////////////////////////////
784#[cfg(test)]
785mod tests {
786    use std::{
787        cell::RefCell,
788        collections::{BinaryHeap, HashMap},
789        rc::Rc,
790        sync::LazyLock,
791    };
792
793    use nautilus_common::{
794        cache::Cache,
795        clock::TestClock,
796        messages::execution::{SubmitOrder, TradingCommand},
797        msgbus::{
798            self,
799            stubs::{get_message_saving_handler, get_saved_messages},
800        },
801    };
802    use nautilus_core::{AtomicTime, UUID4, UnixNanos};
803    use nautilus_execution::models::{
804        fee::{FeeModelAny, MakerTakerFeeModel},
805        fill::FillModel,
806        latency::LatencyModel,
807    };
808    use nautilus_model::{
809        accounts::{AccountAny, MarginAccount},
810        data::{
811            Bar, BarType, BookOrder, InstrumentStatus, OrderBookDelta, OrderBookDeltas, QuoteTick,
812            TradeTick,
813        },
814        enums::{
815            AccountType, AggressorSide, BookAction, BookType, MarketStatus, MarketStatusAction,
816            OmsType, OrderSide, OrderType,
817        },
818        events::AccountState,
819        identifiers::{
820            AccountId, ClientId, ClientOrderId, InstrumentId, StrategyId, TradeId, TraderId, Venue,
821            VenueOrderId,
822        },
823        instruments::{CryptoPerpetual, InstrumentAny, stubs::crypto_perpetual_ethusdt},
824        orders::OrderTestBuilder,
825        types::{AccountBalance, Currency, Money, Price, Quantity},
826    };
827    use rstest::rstest;
828
829    use crate::{
830        exchange::{InflightCommand, SimulatedExchange},
831        execution_client::BacktestExecutionClient,
832    };
833
    /// Lazily initialised `AtomicTime` shared by the test module, built on first access
    /// from the flag `true` and the default `UnixNanos` value.
    ///
    /// NOTE(review): the `true` flag presumably selects real-time mode — confirm against
    /// `AtomicTime::new`. No test visible in this module references this static.
    static ATOMIC_TIME: LazyLock<AtomicTime> =
        LazyLock::new(|| AtomicTime::new(true, UnixNanos::default()));
836
837    fn get_exchange(
838        venue: Venue,
839        account_type: AccountType,
840        book_type: BookType,
841        cache: Option<Rc<RefCell<Cache>>>,
842    ) -> Rc<RefCell<SimulatedExchange>> {
843        let cache = cache.unwrap_or(Rc::new(RefCell::new(Cache::default())));
844        let clock = Rc::new(RefCell::new(TestClock::new()));
845        let exchange = Rc::new(RefCell::new(
846            SimulatedExchange::new(
847                venue,
848                OmsType::Netting,
849                account_type,
850                vec![Money::new(1000.0, Currency::USD())],
851                None,
852                1.into(),
853                HashMap::new(),
854                vec![],
855                cache.clone(),
856                clock,
857                FillModel::default(),
858                FeeModelAny::MakerTaker(MakerTakerFeeModel),
859                book_type,
860                None,
861                None,
862                None,
863                None,
864                None,
865                None,
866                None,
867                None,
868                None,
869                None,
870            )
871            .unwrap(),
872        ));
873
874        let clock = TestClock::new();
875        let execution_client = BacktestExecutionClient::new(
876            TraderId::default(),
877            AccountId::default(),
878            exchange.clone(),
879            cache,
880            Rc::new(RefCell::new(clock)),
881            None,
882            None,
883        );
884        exchange
885            .borrow_mut()
886            .register_client(Rc::new(execution_client));
887
888        exchange
889    }
890
891    fn create_submit_order_command(ts_init: UnixNanos) -> TradingCommand {
892        let instrument_id = InstrumentId::from("ETHUSDT-PERP.BINANCE");
893        let order = OrderTestBuilder::new(OrderType::Market)
894            .instrument_id(instrument_id)
895            .quantity(Quantity::from(1))
896            .build();
897        TradingCommand::SubmitOrder(
898            SubmitOrder::new(
899                TraderId::default(),
900                ClientId::default(),
901                StrategyId::default(),
902                instrument_id,
903                ClientOrderId::default(),
904                VenueOrderId::default(),
905                order,
906                None,
907                None,
908                UUID4::default(),
909                ts_init,
910            )
911            .unwrap(),
912        )
913    }
914
915    #[rstest]
916    #[should_panic(
917        expected = r#"Condition failed: 'Venue of instrument id' value of BINANCE was not equal to 'Venue of simulated exchange' value of SIM"#
918    )]
919    fn test_venue_mismatch_between_exchange_and_instrument(
920        crypto_perpetual_ethusdt: CryptoPerpetual,
921    ) {
922        let exchange = get_exchange(
923            Venue::new("SIM"),
924            AccountType::Margin,
925            BookType::L1_MBP,
926            None,
927        );
928        let instrument = InstrumentAny::CryptoPerpetual(crypto_perpetual_ethusdt);
929        exchange.borrow_mut().add_instrument(instrument).unwrap();
930    }
931
932    #[rstest]
933    #[should_panic(expected = "Cash account cannot trade futures or perpetuals")]
934    fn test_cash_account_trading_futures_or_perpetuals(crypto_perpetual_ethusdt: CryptoPerpetual) {
935        let exchange = get_exchange(
936            Venue::new("BINANCE"),
937            AccountType::Cash,
938            BookType::L1_MBP,
939            None,
940        );
941        let instrument = InstrumentAny::CryptoPerpetual(crypto_perpetual_ethusdt);
942        exchange.borrow_mut().add_instrument(instrument).unwrap();
943    }
944
945    #[rstest]
946    fn test_exchange_process_quote_tick(crypto_perpetual_ethusdt: CryptoPerpetual) {
947        let exchange = get_exchange(
948            Venue::new("BINANCE"),
949            AccountType::Margin,
950            BookType::L1_MBP,
951            None,
952        );
953        let instrument = InstrumentAny::CryptoPerpetual(crypto_perpetual_ethusdt);
954
955        // register instrument
956        exchange.borrow_mut().add_instrument(instrument).unwrap();
957
958        // process tick
959        let quote_tick = QuoteTick::new(
960            crypto_perpetual_ethusdt.id,
961            Price::from("1000"),
962            Price::from("1001"),
963            Quantity::from(1),
964            Quantity::from(1),
965            UnixNanos::default(),
966            UnixNanos::default(),
967        );
968        exchange.borrow_mut().process_quote_tick(&quote_tick);
969
970        let best_bid_price = exchange
971            .borrow()
972            .best_bid_price(crypto_perpetual_ethusdt.id);
973        assert_eq!(best_bid_price, Some(Price::from("1000")));
974        let best_ask_price = exchange
975            .borrow()
976            .best_ask_price(crypto_perpetual_ethusdt.id);
977        assert_eq!(best_ask_price, Some(Price::from("1001")));
978    }
979
980    #[rstest]
981    fn test_exchange_process_trade_tick(crypto_perpetual_ethusdt: CryptoPerpetual) {
982        let exchange = get_exchange(
983            Venue::new("BINANCE"),
984            AccountType::Margin,
985            BookType::L1_MBP,
986            None,
987        );
988        let instrument = InstrumentAny::CryptoPerpetual(crypto_perpetual_ethusdt);
989
990        // register instrument
991        exchange.borrow_mut().add_instrument(instrument).unwrap();
992
993        // process tick
994        let trade_tick = TradeTick::new(
995            crypto_perpetual_ethusdt.id,
996            Price::from("1000"),
997            Quantity::from(1),
998            AggressorSide::Buyer,
999            TradeId::from("1"),
1000            UnixNanos::default(),
1001            UnixNanos::default(),
1002        );
1003        exchange.borrow_mut().process_trade_tick(&trade_tick);
1004
1005        let best_bid_price = exchange
1006            .borrow()
1007            .best_bid_price(crypto_perpetual_ethusdt.id);
1008        assert_eq!(best_bid_price, Some(Price::from("1000")));
1009        let best_ask = exchange
1010            .borrow()
1011            .best_ask_price(crypto_perpetual_ethusdt.id);
1012        assert_eq!(best_ask, Some(Price::from("1000")));
1013    }
1014
1015    #[rstest]
1016    fn test_exchange_process_bar_last_bar_spec(crypto_perpetual_ethusdt: CryptoPerpetual) {
1017        let exchange = get_exchange(
1018            Venue::new("BINANCE"),
1019            AccountType::Margin,
1020            BookType::L1_MBP,
1021            None,
1022        );
1023        let instrument = InstrumentAny::CryptoPerpetual(crypto_perpetual_ethusdt);
1024
1025        // register instrument
1026        exchange.borrow_mut().add_instrument(instrument).unwrap();
1027
1028        // process bar
1029        let bar = Bar::new(
1030            BarType::from("ETHUSDT-PERP.BINANCE-1-MINUTE-LAST-EXTERNAL"),
1031            Price::from("1500.00"),
1032            Price::from("1505.00"),
1033            Price::from("1490.00"),
1034            Price::from("1502.00"),
1035            Quantity::from(100),
1036            UnixNanos::default(),
1037            UnixNanos::default(),
1038        );
1039        exchange.borrow_mut().process_bar(bar);
1040
1041        // this will be processed as ticks so both bid and ask will be the same as close of the bar
1042        let best_bid_price = exchange
1043            .borrow()
1044            .best_bid_price(crypto_perpetual_ethusdt.id);
1045        assert_eq!(best_bid_price, Some(Price::from("1502.00")));
1046        let best_ask_price = exchange
1047            .borrow()
1048            .best_ask_price(crypto_perpetual_ethusdt.id);
1049        assert_eq!(best_ask_price, Some(Price::from("1502.00")));
1050    }
1051
1052    #[rstest]
1053    fn test_exchange_process_bar_bid_ask_bar_spec(crypto_perpetual_ethusdt: CryptoPerpetual) {
1054        let exchange = get_exchange(
1055            Venue::new("BINANCE"),
1056            AccountType::Margin,
1057            BookType::L1_MBP,
1058            None,
1059        );
1060        let instrument = InstrumentAny::CryptoPerpetual(crypto_perpetual_ethusdt);
1061
1062        // register instrument
1063        exchange.borrow_mut().add_instrument(instrument).unwrap();
1064
1065        // create both bid and ask based bars
1066        // add +1 on ask to make sure it is different from bid
1067        let bar_bid = Bar::new(
1068            BarType::from("ETHUSDT-PERP.BINANCE-1-MINUTE-BID-EXTERNAL"),
1069            Price::from("1500.00"),
1070            Price::from("1505.00"),
1071            Price::from("1490.00"),
1072            Price::from("1502.00"),
1073            Quantity::from(100),
1074            UnixNanos::from(1),
1075            UnixNanos::from(1),
1076        );
1077        let bar_ask = Bar::new(
1078            BarType::from("ETHUSDT-PERP.BINANCE-1-MINUTE-ASK-EXTERNAL"),
1079            Price::from("1501.00"),
1080            Price::from("1506.00"),
1081            Price::from("1491.00"),
1082            Price::from("1503.00"),
1083            Quantity::from(100),
1084            UnixNanos::from(1),
1085            UnixNanos::from(1),
1086        );
1087
1088        // process them
1089        exchange.borrow_mut().process_bar(bar_bid);
1090        exchange.borrow_mut().process_bar(bar_ask);
1091
1092        // current bid and ask prices will be the corresponding close of the ask and bid bar
1093        let best_bid_price = exchange
1094            .borrow()
1095            .best_bid_price(crypto_perpetual_ethusdt.id);
1096        assert_eq!(best_bid_price, Some(Price::from("1502.00")));
1097        let best_ask_price = exchange
1098            .borrow()
1099            .best_ask_price(crypto_perpetual_ethusdt.id);
1100        assert_eq!(best_ask_price, Some(Price::from("1503.00")));
1101    }
1102
1103    #[rstest]
1104    fn test_exchange_process_orderbook_delta(crypto_perpetual_ethusdt: CryptoPerpetual) {
1105        let exchange = get_exchange(
1106            Venue::new("BINANCE"),
1107            AccountType::Margin,
1108            BookType::L2_MBP,
1109            None,
1110        );
1111        let instrument = InstrumentAny::CryptoPerpetual(crypto_perpetual_ethusdt);
1112
1113        // register instrument
1114        exchange.borrow_mut().add_instrument(instrument).unwrap();
1115
1116        // create order book delta at both bid and ask with incremented ts init and sequence
1117        let delta_buy = OrderBookDelta::new(
1118            crypto_perpetual_ethusdt.id,
1119            BookAction::Add,
1120            BookOrder::new(OrderSide::Buy, Price::from("1000.00"), Quantity::from(1), 1),
1121            0,
1122            0,
1123            UnixNanos::from(1),
1124            UnixNanos::from(1),
1125        );
1126        let delta_sell = OrderBookDelta::new(
1127            crypto_perpetual_ethusdt.id,
1128            BookAction::Add,
1129            BookOrder::new(
1130                OrderSide::Sell,
1131                Price::from("1001.00"),
1132                Quantity::from(1),
1133                1,
1134            ),
1135            0,
1136            1,
1137            UnixNanos::from(2),
1138            UnixNanos::from(2),
1139        );
1140
1141        // process both deltas
1142        exchange.borrow_mut().process_order_book_delta(delta_buy);
1143        exchange.borrow_mut().process_order_book_delta(delta_sell);
1144
1145        let book = exchange
1146            .borrow()
1147            .get_book(crypto_perpetual_ethusdt.id)
1148            .unwrap()
1149            .clone();
1150        assert_eq!(book.update_count, 2);
1151        assert_eq!(book.sequence, 1);
1152        assert_eq!(book.ts_last, UnixNanos::from(2));
1153        let best_bid_price = exchange
1154            .borrow()
1155            .best_bid_price(crypto_perpetual_ethusdt.id);
1156        assert_eq!(best_bid_price, Some(Price::from("1000.00")));
1157        let best_ask_price = exchange
1158            .borrow()
1159            .best_ask_price(crypto_perpetual_ethusdt.id);
1160        assert_eq!(best_ask_price, Some(Price::from("1001.00")));
1161    }
1162
1163    #[rstest]
1164    fn test_exchange_process_orderbook_deltas(crypto_perpetual_ethusdt: CryptoPerpetual) {
1165        let exchange = get_exchange(
1166            Venue::new("BINANCE"),
1167            AccountType::Margin,
1168            BookType::L2_MBP,
1169            None,
1170        );
1171        let instrument = InstrumentAny::CryptoPerpetual(crypto_perpetual_ethusdt);
1172
1173        // register instrument
1174        exchange.borrow_mut().add_instrument(instrument).unwrap();
1175
1176        // create two sell order book deltas with same timestamps and higher sequence
1177        let delta_sell_1 = OrderBookDelta::new(
1178            crypto_perpetual_ethusdt.id,
1179            BookAction::Add,
1180            BookOrder::new(
1181                OrderSide::Sell,
1182                Price::from("1000.00"),
1183                Quantity::from(3),
1184                1,
1185            ),
1186            0,
1187            0,
1188            UnixNanos::from(1),
1189            UnixNanos::from(1),
1190        );
1191        let delta_sell_2 = OrderBookDelta::new(
1192            crypto_perpetual_ethusdt.id,
1193            BookAction::Add,
1194            BookOrder::new(
1195                OrderSide::Sell,
1196                Price::from("1001.00"),
1197                Quantity::from(1),
1198                1,
1199            ),
1200            0,
1201            1,
1202            UnixNanos::from(1),
1203            UnixNanos::from(1),
1204        );
1205        let orderbook_deltas = OrderBookDeltas::new(
1206            crypto_perpetual_ethusdt.id,
1207            vec![delta_sell_1, delta_sell_2],
1208        );
1209
1210        // process both deltas
1211        exchange
1212            .borrow_mut()
1213            .process_order_book_deltas(orderbook_deltas);
1214
1215        let book = exchange
1216            .borrow()
1217            .get_book(crypto_perpetual_ethusdt.id)
1218            .unwrap()
1219            .clone();
1220        assert_eq!(book.update_count, 2);
1221        assert_eq!(book.sequence, 1);
1222        assert_eq!(book.ts_last, UnixNanos::from(1));
1223        let best_bid_price = exchange
1224            .borrow()
1225            .best_bid_price(crypto_perpetual_ethusdt.id);
1226        // no bid orders in orderbook deltas
1227        assert_eq!(best_bid_price, None);
1228        let best_ask_price = exchange
1229            .borrow()
1230            .best_ask_price(crypto_perpetual_ethusdt.id);
1231        // best ask price is the first order in orderbook deltas
1232        assert_eq!(best_ask_price, Some(Price::from("1000.00")));
1233    }
1234
1235    #[rstest]
1236    fn test_exchange_process_instrument_status(crypto_perpetual_ethusdt: CryptoPerpetual) {
1237        let exchange = get_exchange(
1238            Venue::new("BINANCE"),
1239            AccountType::Margin,
1240            BookType::L2_MBP,
1241            None,
1242        );
1243        let instrument = InstrumentAny::CryptoPerpetual(crypto_perpetual_ethusdt);
1244
1245        // register instrument
1246        exchange.borrow_mut().add_instrument(instrument).unwrap();
1247
1248        let instrument_status = InstrumentStatus::new(
1249            crypto_perpetual_ethusdt.id,
1250            MarketStatusAction::Close, // close the market
1251            UnixNanos::from(1),
1252            UnixNanos::from(1),
1253            None,
1254            None,
1255            None,
1256            None,
1257            None,
1258        );
1259
1260        exchange
1261            .borrow_mut()
1262            .process_instrument_status(instrument_status);
1263
1264        let market_status = exchange
1265            .borrow()
1266            .get_matching_engine(&crypto_perpetual_ethusdt.id)
1267            .unwrap()
1268            .market_status;
1269        assert_eq!(market_status, MarketStatus::Closed);
1270    }
1271
1272    #[rstest]
1273    fn test_accounting() {
1274        let account_type = AccountType::Margin;
1275        let mut cache = Cache::default();
1276        let handler = get_message_saving_handler::<AccountState>(None);
1277        msgbus::register("Portfolio.update_account".into(), handler.clone());
1278        let margin_account = MarginAccount::new(
1279            AccountState::new(
1280                AccountId::from("SIM-001"),
1281                account_type,
1282                vec![AccountBalance::new(
1283                    Money::from("1000 USD"),
1284                    Money::from("0 USD"),
1285                    Money::from("1000 USD"),
1286                )],
1287                vec![],
1288                false,
1289                UUID4::default(),
1290                UnixNanos::default(),
1291                UnixNanos::default(),
1292                None,
1293            ),
1294            false,
1295        );
1296        let () = cache
1297            .add_account(AccountAny::Margin(margin_account))
1298            .unwrap();
1299        // build indexes
1300        cache.build_index();
1301
1302        let exchange = get_exchange(
1303            Venue::new("SIM"),
1304            account_type,
1305            BookType::L2_MBP,
1306            Some(Rc::new(RefCell::new(cache))),
1307        );
1308        exchange.borrow_mut().initialize_account();
1309
1310        // Test adjust account, increase balance by 500 USD
1311        exchange.borrow_mut().adjust_account(Money::from("500 USD"));
1312
1313        // Check if we received two messages, one for initial account state and one for adjusted account state
1314        let messages = get_saved_messages::<AccountState>(handler);
1315        assert_eq!(messages.len(), 2);
1316        let account_state_first = messages.first().unwrap();
1317        let account_state_second = messages.last().unwrap();
1318
1319        assert_eq!(account_state_first.balances.len(), 1);
1320        let current_balance = account_state_first.balances[0];
1321        assert_eq!(current_balance.free, Money::new(1000.0, Currency::USD()));
1322        assert_eq!(current_balance.locked, Money::new(0.0, Currency::USD()));
1323        assert_eq!(current_balance.total, Money::new(1000.0, Currency::USD()));
1324
1325        assert_eq!(account_state_second.balances.len(), 1);
1326        let current_balance = account_state_second.balances[0];
1327        assert_eq!(current_balance.free, Money::new(1500.0, Currency::USD()));
1328        assert_eq!(current_balance.locked, Money::new(0.0, Currency::USD()));
1329        assert_eq!(current_balance.total, Money::new(1500.0, Currency::USD()));
1330    }
1331
1332    #[rstest]
1333    fn test_inflight_commands_binary_heap_ordering_respecting_timestamp_counter() {
1334        // Create 3 inflight commands with different timestamps and counters
1335        let inflight1 = InflightCommand::new(
1336            UnixNanos::from(100),
1337            1,
1338            create_submit_order_command(UnixNanos::from(100)),
1339        );
1340        let inflight2 = InflightCommand::new(
1341            UnixNanos::from(200),
1342            2,
1343            create_submit_order_command(UnixNanos::from(200)),
1344        );
1345        let inflight3 = InflightCommand::new(
1346            UnixNanos::from(100),
1347            2,
1348            create_submit_order_command(UnixNanos::from(100)),
1349        );
1350
1351        // Create a binary heap and push the inflight commands
1352        let mut inflight_heap = BinaryHeap::new();
1353        inflight_heap.push(inflight1);
1354        inflight_heap.push(inflight2);
1355        inflight_heap.push(inflight3);
1356
1357        // Pop the inflight commands and check if they are in the correct order
1358        // by our custom ordering with counter and timestamp
1359        let first = inflight_heap.pop().unwrap();
1360        let second = inflight_heap.pop().unwrap();
1361        let third = inflight_heap.pop().unwrap();
1362
1363        assert_eq!(first.ts, UnixNanos::from(100));
1364        assert_eq!(first.counter, 1);
1365        assert_eq!(second.ts, UnixNanos::from(100));
1366        assert_eq!(second.counter, 2);
1367        assert_eq!(third.ts, UnixNanos::from(200));
1368        assert_eq!(third.counter, 2);
1369    }
1370
1371    #[rstest]
1372    fn test_process_without_latency_model(crypto_perpetual_ethusdt: CryptoPerpetual) {
1373        let exchange = get_exchange(
1374            Venue::new("BINANCE"),
1375            AccountType::Margin,
1376            BookType::L2_MBP,
1377            None,
1378        );
1379
1380        let instrument = InstrumentAny::CryptoPerpetual(crypto_perpetual_ethusdt);
1381        exchange.borrow_mut().add_instrument(instrument).unwrap();
1382
1383        let command1 = create_submit_order_command(UnixNanos::from(100));
1384        let command2 = create_submit_order_command(UnixNanos::from(200));
1385
1386        exchange.borrow_mut().send(command1);
1387        exchange.borrow_mut().send(command2);
1388
1389        // Verify that message queue has 2 commands and inflight queue is empty
1390        // as we are not using latency model
1391        assert_eq!(exchange.borrow().message_queue.len(), 2);
1392        assert_eq!(exchange.borrow().inflight_queue.len(), 0);
1393
1394        // Process command and check that queues is empty
1395        exchange.borrow_mut().process(UnixNanos::from(300));
1396        assert_eq!(exchange.borrow().message_queue.len(), 0);
1397        assert_eq!(exchange.borrow().inflight_queue.len(), 0);
1398    }
1399
1400    #[rstest]
1401    fn test_process_with_latency_model(crypto_perpetual_ethusdt: CryptoPerpetual) {
1402        let latency_model = LatencyModel::new(
1403            UnixNanos::from(100),
1404            UnixNanos::from(200),
1405            UnixNanos::from(300),
1406            UnixNanos::from(100),
1407        );
1408        let exchange = get_exchange(
1409            Venue::new("BINANCE"),
1410            AccountType::Margin,
1411            BookType::L2_MBP,
1412            None,
1413        );
1414        exchange.borrow_mut().set_latency_model(latency_model);
1415
1416        let instrument = InstrumentAny::CryptoPerpetual(crypto_perpetual_ethusdt);
1417        exchange.borrow_mut().add_instrument(instrument).unwrap();
1418
1419        let command1 = create_submit_order_command(UnixNanos::from(100));
1420        let command2 = create_submit_order_command(UnixNanos::from(150));
1421        exchange.borrow_mut().send(command1);
1422        exchange.borrow_mut().send(command2);
1423
1424        // Verify that inflight queue has 2 commands and message queue is empty
1425        assert_eq!(exchange.borrow().message_queue.len(), 0);
1426        assert_eq!(exchange.borrow().inflight_queue.len(), 2);
1427        // First inflight command should have timestamp at 100 and 200 insert latency
1428        assert_eq!(
1429            exchange.borrow().inflight_queue.iter().next().unwrap().ts,
1430            UnixNanos::from(300)
1431        );
1432        // Second inflight command should have timestamp at 150 and 200 insert latency
1433        assert_eq!(
1434            exchange.borrow().inflight_queue.iter().nth(1).unwrap().ts,
1435            UnixNanos::from(350)
1436        );
1437
1438        // Process at timestamp 350, and test that only first command is processed
1439        exchange.borrow_mut().process(UnixNanos::from(320));
1440        assert_eq!(exchange.borrow().message_queue.len(), 0);
1441        assert_eq!(exchange.borrow().inflight_queue.len(), 1);
1442        assert_eq!(
1443            exchange.borrow().inflight_queue.iter().next().unwrap().ts,
1444            UnixNanos::from(350)
1445        );
1446    }
1447}