nautilus_backtest/
exchange.rs

1// -------------------------------------------------------------------------------------------------
2//  Copyright (C) 2015-2025 Posei Systems Pty Ltd. All rights reserved.
3//  https://poseitrader.io
4//
5//  Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
6//  You may not use this file except in compliance with the License.
7//  You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
8//
9//  Unless required by applicable law or agreed to in writing, software
10//  distributed under the License is distributed on an "AS IS" BASIS,
11//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12//  See the License for the specific language governing permissions and
13//  limitations under the License.
14// -------------------------------------------------------------------------------------------------
15
16//! Provides a `SimulatedExchange` venue for backtesting on historical data.
17
18// Under development
19#![allow(dead_code)]
20#![allow(unused_variables)]
21
22use std::{
23    cell::RefCell,
24    collections::{BinaryHeap, HashMap, VecDeque},
25    fmt::Debug,
26    rc::Rc,
27};
28
29use nautilus_common::{cache::Cache, clock::Clock, messages::execution::TradingCommand};
30use nautilus_core::{
31    UnixNanos,
32    correctness::{FAILED, check_equal},
33};
34use nautilus_execution::{
35    client::ExecutionClient,
36    matching_engine::{config::OrderMatchingEngineConfig, engine::OrderMatchingEngine},
37    models::{fee::FeeModelAny, fill::FillModel, latency::LatencyModel},
38};
39use nautilus_model::{
40    accounts::AccountAny,
41    data::{
42        Bar, Data, InstrumentStatus, OrderBookDelta, OrderBookDeltas, OrderBookDeltas_API,
43        QuoteTick, TradeTick,
44    },
45    enums::{AccountType, BookType, OmsType},
46    identifiers::{InstrumentId, Venue},
47    instruments::{Instrument, InstrumentAny},
48    orderbook::OrderBook,
49    orders::PassiveOrderAny,
50    types::{AccountBalance, Currency, Money, Price},
51};
52use rust_decimal::{Decimal, prelude::ToPrimitive};
53
54use crate::modules::SimulationModule;
55
56/// Represents commands with simulated network latency in a min-heap priority queue.
57/// The commands are ordered by timestamp for FIFO processing, with the
58/// earliest timestamp having the highest priority in the queue.
59#[derive(Debug, Eq, PartialEq)]
60struct InflightCommand {
61    ts: UnixNanos,
62    counter: u32,
63    command: TradingCommand,
64}
65
66impl InflightCommand {
67    const fn new(ts: UnixNanos, counter: u32, command: TradingCommand) -> Self {
68        Self {
69            ts,
70            counter,
71            command,
72        }
73    }
74}
75
76impl Ord for InflightCommand {
77    fn cmp(&self, other: &Self) -> std::cmp::Ordering {
78        // Reverse ordering for min-heap (earliest timestamp first then lowest counter)
79        other
80            .ts
81            .cmp(&self.ts)
82            .then_with(|| other.counter.cmp(&self.counter))
83    }
84}
85
86impl PartialOrd for InflightCommand {
87    fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
88        Some(self.cmp(other))
89    }
90}
91
92pub struct SimulatedExchange {
93    pub id: Venue,
94    pub oms_type: OmsType,
95    pub account_type: AccountType,
96    starting_balances: Vec<Money>,
97    book_type: BookType,
98    default_leverage: Decimal,
99    exec_client: Option<Rc<dyn ExecutionClient>>,
100    pub base_currency: Option<Currency>,
101    fee_model: FeeModelAny,
102    fill_model: FillModel,
103    latency_model: Option<LatencyModel>,
104    instruments: HashMap<InstrumentId, InstrumentAny>,
105    matching_engines: HashMap<InstrumentId, OrderMatchingEngine>,
106    leverages: HashMap<InstrumentId, Decimal>,
107    modules: Vec<Box<dyn SimulationModule>>,
108    clock: Rc<RefCell<dyn Clock>>,
109    cache: Rc<RefCell<Cache>>,
110    message_queue: VecDeque<TradingCommand>,
111    inflight_queue: BinaryHeap<InflightCommand>,
112    inflight_counter: HashMap<UnixNanos, u32>,
113    frozen_account: bool,
114    bar_execution: bool,
115    reject_stop_orders: bool,
116    support_gtd_orders: bool,
117    support_contingent_orders: bool,
118    use_position_ids: bool,
119    use_random_ids: bool,
120    use_reduce_only: bool,
121    use_message_queue: bool,
122}
123
124impl Debug for SimulatedExchange {
125    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
126        f.debug_struct(stringify!(SimulatedExchange))
127            .field("id", &self.id)
128            .field("account_type", &self.account_type)
129            .finish()
130    }
131}
132
133impl SimulatedExchange {
134    /// Creates a new [`SimulatedExchange`] instance.
135    ///
136    /// # Errors
137    ///
138    /// Returns an error if:
139    /// - `starting_balances` is empty.
140    /// - `base_currency` is `Some` but `starting_balances` contains multiple currencies.
141    #[allow(clippy::too_many_arguments)]
142    pub fn new(
143        venue: Venue,
144        oms_type: OmsType,
145        account_type: AccountType,
146        starting_balances: Vec<Money>,
147        base_currency: Option<Currency>,
148        default_leverage: Decimal,
149        leverages: HashMap<InstrumentId, Decimal>,
150        modules: Vec<Box<dyn SimulationModule>>,
151        cache: Rc<RefCell<Cache>>,
152        clock: Rc<RefCell<dyn Clock>>,
153        fill_model: FillModel,
154        fee_model: FeeModelAny,
155        book_type: BookType,
156        latency_model: Option<LatencyModel>,
157        frozen_account: Option<bool>,
158        bar_execution: Option<bool>,
159        reject_stop_orders: Option<bool>,
160        support_gtd_orders: Option<bool>,
161        support_contingent_orders: Option<bool>,
162        use_position_ids: Option<bool>,
163        use_random_ids: Option<bool>,
164        use_reduce_only: Option<bool>,
165        use_message_queue: Option<bool>,
166    ) -> anyhow::Result<Self> {
167        if starting_balances.is_empty() {
168            anyhow::bail!("Starting balances must be provided")
169        }
170        if base_currency.is_some() && starting_balances.len() > 1 {
171            anyhow::bail!("single-currency account has multiple starting currencies")
172        }
173        // TODO register and load modules
174        Ok(Self {
175            id: venue,
176            oms_type,
177            account_type,
178            starting_balances,
179            book_type,
180            default_leverage,
181            exec_client: None,
182            base_currency,
183            fee_model,
184            fill_model,
185            latency_model,
186            instruments: HashMap::new(),
187            matching_engines: HashMap::new(),
188            leverages,
189            modules,
190            clock,
191            cache,
192            message_queue: VecDeque::new(),
193            inflight_queue: BinaryHeap::new(),
194            inflight_counter: HashMap::new(),
195            frozen_account: frozen_account.unwrap_or(false),
196            bar_execution: bar_execution.unwrap_or(true),
197            reject_stop_orders: reject_stop_orders.unwrap_or(true),
198            support_gtd_orders: support_gtd_orders.unwrap_or(true),
199            support_contingent_orders: support_contingent_orders.unwrap_or(true),
200            use_position_ids: use_position_ids.unwrap_or(true),
201            use_random_ids: use_random_ids.unwrap_or(false),
202            use_reduce_only: use_reduce_only.unwrap_or(true),
203            use_message_queue: use_message_queue.unwrap_or(true),
204        })
205    }
206
207    pub fn register_client(&mut self, client: Rc<dyn ExecutionClient>) {
208        self.exec_client = Some(client);
209    }
210
211    pub fn set_fill_model(&mut self, fill_model: FillModel) {
212        for matching_engine in self.matching_engines.values_mut() {
213            matching_engine.set_fill_model(fill_model.clone());
214            log::info!(
215                "Setting fill model for {} to {}",
216                matching_engine.venue,
217                self.fill_model
218            );
219        }
220        self.fill_model = fill_model;
221    }
222
223    pub const fn set_latency_model(&mut self, latency_model: LatencyModel) {
224        self.latency_model = Some(latency_model);
225    }
226
227    pub fn initialize_account(&mut self) {
228        self.generate_fresh_account_state();
229    }
230
231    /// Adds an instrument to the simulated exchange and initializes its matching engine.
232    ///
233    /// # Errors
234    ///
235    /// Returns an error if the exchange account type is `Cash` and the instrument is a `CryptoPerpetual` or `CryptoFuture`.
236    ///
237    /// # Panics
238    ///
239    /// Panics if the instrument cannot be added to the exchange.
240    pub fn add_instrument(&mut self, instrument: InstrumentAny) -> anyhow::Result<()> {
241        check_equal(
242            &instrument.id().venue,
243            &self.id,
244            "Venue of instrument id",
245            "Venue of simulated exchange",
246        )
247        .expect(FAILED);
248
249        if self.account_type == AccountType::Cash
250            && (matches!(instrument, InstrumentAny::CryptoPerpetual(_))
251                || matches!(instrument, InstrumentAny::CryptoFuture(_)))
252        {
253            anyhow::bail!("Cash account cannot trade futures or perpetuals")
254        }
255
256        self.instruments.insert(instrument.id(), instrument.clone());
257
258        let matching_engine_config = OrderMatchingEngineConfig::new(
259            self.bar_execution,
260            self.reject_stop_orders,
261            self.support_gtd_orders,
262            self.support_contingent_orders,
263            self.use_position_ids,
264            self.use_random_ids,
265            self.use_reduce_only,
266        );
267        let instrument_id = instrument.id();
268        let matching_engine = OrderMatchingEngine::new(
269            instrument,
270            self.instruments.len() as u32,
271            self.fill_model.clone(),
272            self.fee_model.clone(),
273            self.book_type,
274            self.oms_type,
275            self.account_type,
276            self.clock.clone(),
277            Rc::clone(&self.cache),
278            matching_engine_config,
279        );
280        self.matching_engines.insert(instrument_id, matching_engine);
281
282        log::info!("Added instrument {instrument_id} and created matching engine");
283        Ok(())
284    }
285
286    #[must_use]
287    pub fn best_bid_price(&self, instrument_id: InstrumentId) -> Option<Price> {
288        self.matching_engines
289            .get(&instrument_id)
290            .and_then(OrderMatchingEngine::best_bid_price)
291    }
292
293    #[must_use]
294    pub fn best_ask_price(&self, instrument_id: InstrumentId) -> Option<Price> {
295        self.matching_engines
296            .get(&instrument_id)
297            .and_then(OrderMatchingEngine::best_ask_price)
298    }
299
300    pub fn get_book(&self, instrument_id: InstrumentId) -> Option<&OrderBook> {
301        self.matching_engines
302            .get(&instrument_id)
303            .map(OrderMatchingEngine::get_book)
304    }
305
306    #[must_use]
307    pub fn get_matching_engine(
308        &self,
309        instrument_id: &InstrumentId,
310    ) -> Option<&OrderMatchingEngine> {
311        self.matching_engines.get(instrument_id)
312    }
313
314    #[must_use]
315    pub const fn get_matching_engines(&self) -> &HashMap<InstrumentId, OrderMatchingEngine> {
316        &self.matching_engines
317    }
318
319    #[must_use]
320    pub fn get_books(&self) -> HashMap<InstrumentId, OrderBook> {
321        let mut books = HashMap::new();
322        for (instrument_id, matching_engine) in &self.matching_engines {
323            books.insert(*instrument_id, matching_engine.get_book().clone());
324        }
325        books
326    }
327
328    #[must_use]
329    pub fn get_open_orders(&self, instrument_id: Option<InstrumentId>) -> Vec<PassiveOrderAny> {
330        instrument_id
331            .and_then(|id| {
332                self.matching_engines
333                    .get(&id)
334                    .map(OrderMatchingEngine::get_open_orders)
335            })
336            .unwrap_or_else(|| {
337                self.matching_engines
338                    .values()
339                    .flat_map(OrderMatchingEngine::get_open_orders)
340                    .collect()
341            })
342    }
343
344    #[must_use]
345    pub fn get_open_bid_orders(&self, instrument_id: Option<InstrumentId>) -> Vec<PassiveOrderAny> {
346        instrument_id
347            .and_then(|id| {
348                self.matching_engines
349                    .get(&id)
350                    .map(|engine| engine.get_open_bid_orders().to_vec())
351            })
352            .unwrap_or_else(|| {
353                self.matching_engines
354                    .values()
355                    .flat_map(|engine| engine.get_open_bid_orders().to_vec())
356                    .collect()
357            })
358    }
359
360    #[must_use]
361    pub fn get_open_ask_orders(&self, instrument_id: Option<InstrumentId>) -> Vec<PassiveOrderAny> {
362        instrument_id
363            .and_then(|id| {
364                self.matching_engines
365                    .get(&id)
366                    .map(|engine| engine.get_open_ask_orders().to_vec())
367            })
368            .unwrap_or_else(|| {
369                self.matching_engines
370                    .values()
371                    .flat_map(|engine| engine.get_open_ask_orders().to_vec())
372                    .collect()
373            })
374    }
375
376    /// # Panics
377    ///
378    /// Panics if retrieving the account from the execution client fails.
379    #[must_use]
380    pub fn get_account(&self) -> Option<AccountAny> {
381        self.exec_client
382            .as_ref()
383            .map(|client| client.get_account().unwrap())
384    }
385
386    /// # Panics
387    ///
388    /// Panics if generating account state fails during adjustment.
389    pub fn adjust_account(&mut self, adjustment: Money) {
390        if self.frozen_account {
391            // Nothing to adjust
392            return;
393        }
394
395        if let Some(exec_client) = &self.exec_client {
396            let venue = exec_client.venue();
397            println!("Adjusting account for venue {venue}");
398            if let Some(account) = self.cache.borrow().account_for_venue(&venue) {
399                match account.balance(Some(adjustment.currency)) {
400                    Some(balance) => {
401                        let mut current_balance = *balance;
402                        current_balance.total += adjustment;
403                        current_balance.free += adjustment;
404
405                        let margins = match account {
406                            AccountAny::Margin(margin_account) => margin_account.margins.clone(),
407                            _ => HashMap::new(),
408                        };
409
410                        if let Some(exec_client) = &self.exec_client {
411                            exec_client
412                                .generate_account_state(
413                                    vec![current_balance],
414                                    margins.values().copied().collect(),
415                                    true,
416                                    self.clock.borrow().timestamp_ns(),
417                                )
418                                .unwrap();
419                        }
420                    }
421                    None => {
422                        log::error!(
423                            "Cannot adjust account: no balance for currency {}",
424                            adjustment.currency
425                        );
426                    }
427                }
428            } else {
429                log::error!("Cannot adjust account: no account for venue {venue}");
430            }
431        }
432    }
433
434    pub fn send(&mut self, command: TradingCommand) {
435        if !self.use_message_queue {
436            self.process_trading_command(command);
437        } else if self.latency_model.is_none() {
438            self.message_queue.push_back(command);
439        } else {
440            let (ts, counter) = self.generate_inflight_command(&command);
441            self.inflight_queue
442                .push(InflightCommand::new(ts, counter, command));
443        }
444    }
445
446    /// # Panics
447    ///
448    /// Panics if the command is invalid when generating inflight command.
449    pub fn generate_inflight_command(&mut self, command: &TradingCommand) -> (UnixNanos, u32) {
450        if let Some(latency_model) = &self.latency_model {
451            let ts = match command {
452                TradingCommand::SubmitOrder(_) | TradingCommand::SubmitOrderList(_) => {
453                    command.ts_init() + latency_model.insert_latency_nanos
454                }
455                TradingCommand::ModifyOrder(_) => {
456                    command.ts_init() + latency_model.update_latency_nanos
457                }
458                TradingCommand::CancelOrder(_)
459                | TradingCommand::CancelAllOrders(_)
460                | TradingCommand::BatchCancelOrders(_) => {
461                    command.ts_init() + latency_model.delete_latency_nanos
462                }
463                _ => panic!("Invalid command was {command}"),
464            };
465
466            let counter = self
467                .inflight_counter
468                .entry(ts)
469                .and_modify(|e| *e += 1)
470                .or_insert(1);
471
472            (ts, *counter)
473        } else {
474            panic!("Latency model should be initialized");
475        }
476    }
477
478    /// # Panics
479    ///
480    /// Panics if adding a missing instrument during delta processing fails.
481    pub fn process_order_book_delta(&mut self, delta: OrderBookDelta) {
482        for module in &self.modules {
483            module.pre_process(Data::Delta(delta));
484        }
485
486        if !self.matching_engines.contains_key(&delta.instrument_id) {
487            let instrument = {
488                let cache = self.cache.as_ref().borrow();
489                cache.instrument(&delta.instrument_id).cloned()
490            };
491
492            if let Some(instrument) = instrument {
493                self.add_instrument(instrument).unwrap();
494            } else {
495                panic!(
496                    "No matching engine found for instrument {}",
497                    delta.instrument_id
498                );
499            }
500        }
501
502        if let Some(matching_engine) = self.matching_engines.get_mut(&delta.instrument_id) {
503            matching_engine.process_order_book_delta(&delta);
504        } else {
505            panic!("Matching engine should be initialized");
506        }
507    }
508
509    /// # Panics
510    ///
511    /// Panics if adding a missing instrument during deltas processing fails.
512    pub fn process_order_book_deltas(&mut self, deltas: OrderBookDeltas) {
513        for module in &self.modules {
514            module.pre_process(Data::Deltas(OrderBookDeltas_API::new(deltas.clone())));
515        }
516
517        if !self.matching_engines.contains_key(&deltas.instrument_id) {
518            let instrument = {
519                let cache = self.cache.as_ref().borrow();
520                cache.instrument(&deltas.instrument_id).cloned()
521            };
522
523            if let Some(instrument) = instrument {
524                self.add_instrument(instrument).unwrap();
525            } else {
526                panic!(
527                    "No matching engine found for instrument {}",
528                    deltas.instrument_id
529                );
530            }
531        }
532
533        if let Some(matching_engine) = self.matching_engines.get_mut(&deltas.instrument_id) {
534            matching_engine.process_order_book_deltas(&deltas);
535        } else {
536            panic!("Matching engine should be initialized");
537        }
538    }
539
540    /// # Panics
541    ///
542    /// Panics if adding a missing instrument during quote tick processing fails.
543    pub fn process_quote_tick(&mut self, quote: &QuoteTick) {
544        for module in &self.modules {
545            module.pre_process(Data::Quote(quote.to_owned()));
546        }
547
548        if !self.matching_engines.contains_key(&quote.instrument_id) {
549            let instrument = {
550                let cache = self.cache.as_ref().borrow();
551                cache.instrument(&quote.instrument_id).cloned()
552            };
553
554            if let Some(instrument) = instrument {
555                self.add_instrument(instrument).unwrap();
556            } else {
557                panic!(
558                    "No matching engine found for instrument {}",
559                    quote.instrument_id
560                );
561            }
562        }
563
564        if let Some(matching_engine) = self.matching_engines.get_mut(&quote.instrument_id) {
565            matching_engine.process_quote_tick(quote);
566        } else {
567            panic!("Matching engine should be initialized");
568        }
569    }
570
571    /// # Panics
572    ///
573    /// Panics if adding a missing instrument during trade tick processing fails.
574    pub fn process_trade_tick(&mut self, trade: &TradeTick) {
575        for module in &self.modules {
576            module.pre_process(Data::Trade(trade.to_owned()));
577        }
578
579        if !self.matching_engines.contains_key(&trade.instrument_id) {
580            let instrument = {
581                let cache = self.cache.as_ref().borrow();
582                cache.instrument(&trade.instrument_id).cloned()
583            };
584
585            if let Some(instrument) = instrument {
586                self.add_instrument(instrument).unwrap();
587            } else {
588                panic!(
589                    "No matching engine found for instrument {}",
590                    trade.instrument_id
591                );
592            }
593        }
594
595        if let Some(matching_engine) = self.matching_engines.get_mut(&trade.instrument_id) {
596            matching_engine.process_trade_tick(trade);
597        } else {
598            panic!("Matching engine should be initialized");
599        }
600    }
601
602    /// # Panics
603    ///
604    /// Panics if adding a missing instrument during bar processing fails.
605    pub fn process_bar(&mut self, bar: Bar) {
606        for module in &self.modules {
607            module.pre_process(Data::Bar(bar));
608        }
609
610        if !self.matching_engines.contains_key(&bar.instrument_id()) {
611            let instrument = {
612                let cache = self.cache.as_ref().borrow();
613                cache.instrument(&bar.instrument_id()).cloned()
614            };
615
616            if let Some(instrument) = instrument {
617                self.add_instrument(instrument).unwrap();
618            } else {
619                panic!(
620                    "No matching engine found for instrument {}",
621                    bar.instrument_id()
622                );
623            }
624        }
625
626        if let Some(matching_engine) = self.matching_engines.get_mut(&bar.instrument_id()) {
627            matching_engine.process_bar(&bar);
628        } else {
629            panic!("Matching engine should be initialized");
630        }
631    }
632
633    /// # Panics
634    ///
635    /// Panics if adding a missing instrument during instrument status processing fails.
636    pub fn process_instrument_status(&mut self, status: InstrumentStatus) {
637        // TODO add module preprocessing
638
639        if !self.matching_engines.contains_key(&status.instrument_id) {
640            let instrument = {
641                let cache = self.cache.as_ref().borrow();
642                cache.instrument(&status.instrument_id).cloned()
643            };
644
645            if let Some(instrument) = instrument {
646                self.add_instrument(instrument).unwrap();
647            } else {
648                panic!(
649                    "No matching engine found for instrument {}",
650                    status.instrument_id
651                );
652            }
653        }
654
655        if let Some(matching_engine) = self.matching_engines.get_mut(&status.instrument_id) {
656            matching_engine.process_status(status.action);
657        } else {
658            panic!("Matching engine should be initialized");
659        }
660    }
661
662    /// # Panics
663    ///
664    /// Panics if popping an inflight command fails during processing.
665    pub fn process(&mut self, ts_now: UnixNanos) {
666        // TODO implement correct clock fixed time setting self.clock.set_time(ts_now);
667
668        // Process inflight commands
669        while let Some(inflight) = self.inflight_queue.peek() {
670            if inflight.ts > ts_now {
671                // Future commands remain in the queue
672                break;
673            }
674            // We get the inflight command, remove it from the queue and process it
675            let inflight = self.inflight_queue.pop().unwrap();
676            self.process_trading_command(inflight.command);
677        }
678
679        // Process regular message queue
680        while let Some(command) = self.message_queue.pop_front() {
681            self.process_trading_command(command);
682        }
683    }
684
685    pub fn reset(&mut self) {
686        for module in &self.modules {
687            module.reset();
688        }
689
690        self.generate_fresh_account_state();
691
692        for matching_engine in self.matching_engines.values_mut() {
693            matching_engine.reset();
694        }
695
696        // TODO Clear the inflight and message queues
697        log::info!("Resetting exchange state");
698    }
699
700    /// # Panics
701    ///
702    /// Panics if execution client is uninitialized when processing trading command.
703    pub fn process_trading_command(&mut self, command: TradingCommand) {
704        if let Some(matching_engine) = self.matching_engines.get_mut(&command.instrument_id()) {
705            let account_id = if let Some(exec_client) = &self.exec_client {
706                exec_client.account_id()
707            } else {
708                panic!("Execution client should be initialized");
709            };
710            match command {
711                TradingCommand::SubmitOrder(mut command) => {
712                    matching_engine.process_order(&mut command.order, account_id);
713                }
714                TradingCommand::ModifyOrder(ref command) => {
715                    matching_engine.process_modify(command, account_id);
716                }
717                TradingCommand::CancelOrder(ref command) => {
718                    matching_engine.process_cancel(command, account_id);
719                }
720                TradingCommand::CancelAllOrders(ref command) => {
721                    matching_engine.process_cancel_all(command, account_id);
722                }
723                TradingCommand::BatchCancelOrders(ref command) => {
724                    matching_engine.process_batch_cancel(command, account_id);
725                }
726                TradingCommand::SubmitOrderList(mut command) => {
727                    for order in &mut command.order_list.orders {
728                        matching_engine.process_order(order, account_id);
729                    }
730                }
731                _ => {}
732            }
733        } else {
734            panic!("Matching engine should be initialized");
735        }
736    }
737
738    /// # Panics
739    ///
740    /// Panics if generating fresh account state fails.
741    pub fn generate_fresh_account_state(&self) {
742        let balances: Vec<AccountBalance> = self
743            .starting_balances
744            .iter()
745            .map(|money| AccountBalance::new(*money, Money::zero(money.currency), *money))
746            .collect();
747
748        if let Some(exec_client) = &self.exec_client {
749            exec_client
750                .generate_account_state(balances, vec![], true, self.clock.borrow().timestamp_ns())
751                .unwrap();
752        }
753
754        // Set leverages
755        if let Some(AccountAny::Margin(mut margin_account)) = self.get_account() {
756            margin_account.set_default_leverage(self.default_leverage.to_f64().unwrap());
757
758            // Set instrument specific leverages
759            for (instrument_id, leverage) in &self.leverages {
760                margin_account.set_leverage(*instrument_id, leverage.to_f64().unwrap());
761            }
762        }
763    }
764}
765
766////////////////////////////////////////////////////////////////////////////////
767// Tests
768////////////////////////////////////////////////////////////////////////////////
769#[cfg(test)]
770mod tests {
771    use std::{
772        cell::RefCell,
773        collections::{BinaryHeap, HashMap},
774        rc::Rc,
775        sync::LazyLock,
776    };
777
778    use nautilus_common::{
779        cache::Cache,
780        clock::TestClock,
781        messages::execution::{SubmitOrder, TradingCommand},
782        msgbus::{
783            self,
784            stubs::{get_message_saving_handler, get_saved_messages},
785        },
786    };
787    use nautilus_core::{AtomicTime, UUID4, UnixNanos};
788    use nautilus_execution::models::{
789        fee::{FeeModelAny, MakerTakerFeeModel},
790        fill::FillModel,
791        latency::LatencyModel,
792    };
793    use nautilus_model::{
794        accounts::{AccountAny, MarginAccount},
795        data::{
796            Bar, BarType, BookOrder, InstrumentStatus, OrderBookDelta, OrderBookDeltas, QuoteTick,
797            TradeTick,
798        },
799        enums::{
800            AccountType, AggressorSide, BookAction, BookType, MarketStatus, MarketStatusAction,
801            OmsType, OrderSide, OrderType,
802        },
803        events::AccountState,
804        identifiers::{
805            AccountId, ClientId, ClientOrderId, InstrumentId, StrategyId, TradeId, TraderId, Venue,
806            VenueOrderId,
807        },
808        instruments::{CryptoPerpetual, InstrumentAny, stubs::crypto_perpetual_ethusdt},
809        orders::OrderTestBuilder,
810        types::{AccountBalance, Currency, Money, Price, Quantity},
811    };
812    use rstest::rstest;
813
814    use crate::{
815        exchange::{InflightCommand, SimulatedExchange},
816        execution_client::BacktestExecutionClient,
817    };
818
819    static ATOMIC_TIME: LazyLock<AtomicTime> =
820        LazyLock::new(|| AtomicTime::new(true, UnixNanos::default()));
821
822    fn get_exchange(
823        venue: Venue,
824        account_type: AccountType,
825        book_type: BookType,
826        cache: Option<Rc<RefCell<Cache>>>,
827    ) -> Rc<RefCell<SimulatedExchange>> {
828        let cache = cache.unwrap_or(Rc::new(RefCell::new(Cache::default())));
829        let clock = Rc::new(RefCell::new(TestClock::new()));
830        let exchange = Rc::new(RefCell::new(
831            SimulatedExchange::new(
832                venue,
833                OmsType::Netting,
834                account_type,
835                vec![Money::new(1000.0, Currency::USD())],
836                None,
837                1.into(),
838                HashMap::new(),
839                vec![],
840                cache.clone(),
841                clock,
842                FillModel::default(),
843                FeeModelAny::MakerTaker(MakerTakerFeeModel),
844                book_type,
845                None,
846                None,
847                None,
848                None,
849                None,
850                None,
851                None,
852                None,
853                None,
854                None,
855            )
856            .unwrap(),
857        ));
858
859        let clock = TestClock::new();
860        let execution_client = BacktestExecutionClient::new(
861            TraderId::default(),
862            AccountId::default(),
863            exchange.clone(),
864            cache,
865            Rc::new(RefCell::new(clock)),
866            None,
867            None,
868        );
869        exchange
870            .borrow_mut()
871            .register_client(Rc::new(execution_client));
872
873        exchange
874    }
875
876    fn create_submit_order_command(ts_init: UnixNanos) -> TradingCommand {
877        let instrument_id = InstrumentId::from("ETHUSDT-PERP.BINANCE");
878        let order = OrderTestBuilder::new(OrderType::Market)
879            .instrument_id(instrument_id)
880            .quantity(Quantity::from(1))
881            .build();
882        TradingCommand::SubmitOrder(
883            SubmitOrder::new(
884                TraderId::default(),
885                ClientId::default(),
886                StrategyId::default(),
887                instrument_id,
888                ClientOrderId::default(),
889                VenueOrderId::default(),
890                order,
891                None,
892                None,
893                UUID4::default(),
894                ts_init,
895            )
896            .unwrap(),
897        )
898    }
899
900    #[rstest]
901    #[should_panic(
902        expected = r#"Condition failed: 'Venue of instrument id' value of BINANCE was not equal to 'Venue of simulated exchange' value of SIM"#
903    )]
904    fn test_venue_mismatch_between_exchange_and_instrument(
905        crypto_perpetual_ethusdt: CryptoPerpetual,
906    ) {
907        let exchange = get_exchange(
908            Venue::new("SIM"),
909            AccountType::Margin,
910            BookType::L1_MBP,
911            None,
912        );
913        let instrument = InstrumentAny::CryptoPerpetual(crypto_perpetual_ethusdt);
914        exchange.borrow_mut().add_instrument(instrument).unwrap();
915    }
916
917    #[rstest]
918    #[should_panic(expected = "Cash account cannot trade futures or perpetuals")]
919    fn test_cash_account_trading_futures_or_perpetuals(crypto_perpetual_ethusdt: CryptoPerpetual) {
920        let exchange = get_exchange(
921            Venue::new("BINANCE"),
922            AccountType::Cash,
923            BookType::L1_MBP,
924            None,
925        );
926        let instrument = InstrumentAny::CryptoPerpetual(crypto_perpetual_ethusdt);
927        exchange.borrow_mut().add_instrument(instrument).unwrap();
928    }
929
930    #[rstest]
931    fn test_exchange_process_quote_tick(crypto_perpetual_ethusdt: CryptoPerpetual) {
932        let exchange = get_exchange(
933            Venue::new("BINANCE"),
934            AccountType::Margin,
935            BookType::L1_MBP,
936            None,
937        );
938        let instrument = InstrumentAny::CryptoPerpetual(crypto_perpetual_ethusdt);
939
940        // register instrument
941        exchange.borrow_mut().add_instrument(instrument).unwrap();
942
943        // process tick
944        let quote_tick = QuoteTick::new(
945            crypto_perpetual_ethusdt.id,
946            Price::from("1000"),
947            Price::from("1001"),
948            Quantity::from(1),
949            Quantity::from(1),
950            UnixNanos::default(),
951            UnixNanos::default(),
952        );
953        exchange.borrow_mut().process_quote_tick(&quote_tick);
954
955        let best_bid_price = exchange
956            .borrow()
957            .best_bid_price(crypto_perpetual_ethusdt.id);
958        assert_eq!(best_bid_price, Some(Price::from("1000")));
959        let best_ask_price = exchange
960            .borrow()
961            .best_ask_price(crypto_perpetual_ethusdt.id);
962        assert_eq!(best_ask_price, Some(Price::from("1001")));
963    }
964
965    #[rstest]
966    fn test_exchange_process_trade_tick(crypto_perpetual_ethusdt: CryptoPerpetual) {
967        let exchange = get_exchange(
968            Venue::new("BINANCE"),
969            AccountType::Margin,
970            BookType::L1_MBP,
971            None,
972        );
973        let instrument = InstrumentAny::CryptoPerpetual(crypto_perpetual_ethusdt);
974
975        // register instrument
976        exchange.borrow_mut().add_instrument(instrument).unwrap();
977
978        // process tick
979        let trade_tick = TradeTick::new(
980            crypto_perpetual_ethusdt.id,
981            Price::from("1000"),
982            Quantity::from(1),
983            AggressorSide::Buyer,
984            TradeId::from("1"),
985            UnixNanos::default(),
986            UnixNanos::default(),
987        );
988        exchange.borrow_mut().process_trade_tick(&trade_tick);
989
990        let best_bid_price = exchange
991            .borrow()
992            .best_bid_price(crypto_perpetual_ethusdt.id);
993        assert_eq!(best_bid_price, Some(Price::from("1000")));
994        let best_ask = exchange
995            .borrow()
996            .best_ask_price(crypto_perpetual_ethusdt.id);
997        assert_eq!(best_ask, Some(Price::from("1000")));
998    }
999
1000    #[rstest]
1001    fn test_exchange_process_bar_last_bar_spec(crypto_perpetual_ethusdt: CryptoPerpetual) {
1002        let exchange = get_exchange(
1003            Venue::new("BINANCE"),
1004            AccountType::Margin,
1005            BookType::L1_MBP,
1006            None,
1007        );
1008        let instrument = InstrumentAny::CryptoPerpetual(crypto_perpetual_ethusdt);
1009
1010        // register instrument
1011        exchange.borrow_mut().add_instrument(instrument).unwrap();
1012
1013        // process bar
1014        let bar = Bar::new(
1015            BarType::from("ETHUSDT-PERP.BINANCE-1-MINUTE-LAST-EXTERNAL"),
1016            Price::from("1500.00"),
1017            Price::from("1505.00"),
1018            Price::from("1490.00"),
1019            Price::from("1502.00"),
1020            Quantity::from(100),
1021            UnixNanos::default(),
1022            UnixNanos::default(),
1023        );
1024        exchange.borrow_mut().process_bar(bar);
1025
1026        // this will be processed as ticks so both bid and ask will be the same as close of the bar
1027        let best_bid_price = exchange
1028            .borrow()
1029            .best_bid_price(crypto_perpetual_ethusdt.id);
1030        assert_eq!(best_bid_price, Some(Price::from("1502.00")));
1031        let best_ask_price = exchange
1032            .borrow()
1033            .best_ask_price(crypto_perpetual_ethusdt.id);
1034        assert_eq!(best_ask_price, Some(Price::from("1502.00")));
1035    }
1036
1037    #[rstest]
1038    fn test_exchange_process_bar_bid_ask_bar_spec(crypto_perpetual_ethusdt: CryptoPerpetual) {
1039        let exchange = get_exchange(
1040            Venue::new("BINANCE"),
1041            AccountType::Margin,
1042            BookType::L1_MBP,
1043            None,
1044        );
1045        let instrument = InstrumentAny::CryptoPerpetual(crypto_perpetual_ethusdt);
1046
1047        // register instrument
1048        exchange.borrow_mut().add_instrument(instrument).unwrap();
1049
1050        // create both bid and ask based bars
1051        // add +1 on ask to make sure it is different from bid
1052        let bar_bid = Bar::new(
1053            BarType::from("ETHUSDT-PERP.BINANCE-1-MINUTE-BID-EXTERNAL"),
1054            Price::from("1500.00"),
1055            Price::from("1505.00"),
1056            Price::from("1490.00"),
1057            Price::from("1502.00"),
1058            Quantity::from(100),
1059            UnixNanos::from(1),
1060            UnixNanos::from(1),
1061        );
1062        let bar_ask = Bar::new(
1063            BarType::from("ETHUSDT-PERP.BINANCE-1-MINUTE-ASK-EXTERNAL"),
1064            Price::from("1501.00"),
1065            Price::from("1506.00"),
1066            Price::from("1491.00"),
1067            Price::from("1503.00"),
1068            Quantity::from(100),
1069            UnixNanos::from(1),
1070            UnixNanos::from(1),
1071        );
1072
1073        // process them
1074        exchange.borrow_mut().process_bar(bar_bid);
1075        exchange.borrow_mut().process_bar(bar_ask);
1076
1077        // current bid and ask prices will be the corresponding close of the ask and bid bar
1078        let best_bid_price = exchange
1079            .borrow()
1080            .best_bid_price(crypto_perpetual_ethusdt.id);
1081        assert_eq!(best_bid_price, Some(Price::from("1502.00")));
1082        let best_ask_price = exchange
1083            .borrow()
1084            .best_ask_price(crypto_perpetual_ethusdt.id);
1085        assert_eq!(best_ask_price, Some(Price::from("1503.00")));
1086    }
1087
1088    #[rstest]
1089    fn test_exchange_process_orderbook_delta(crypto_perpetual_ethusdt: CryptoPerpetual) {
1090        let exchange = get_exchange(
1091            Venue::new("BINANCE"),
1092            AccountType::Margin,
1093            BookType::L2_MBP,
1094            None,
1095        );
1096        let instrument = InstrumentAny::CryptoPerpetual(crypto_perpetual_ethusdt);
1097
1098        // register instrument
1099        exchange.borrow_mut().add_instrument(instrument).unwrap();
1100
1101        // create order book delta at both bid and ask with incremented ts init and sequence
1102        let delta_buy = OrderBookDelta::new(
1103            crypto_perpetual_ethusdt.id,
1104            BookAction::Add,
1105            BookOrder::new(OrderSide::Buy, Price::from("1000.00"), Quantity::from(1), 1),
1106            0,
1107            0,
1108            UnixNanos::from(1),
1109            UnixNanos::from(1),
1110        );
1111        let delta_sell = OrderBookDelta::new(
1112            crypto_perpetual_ethusdt.id,
1113            BookAction::Add,
1114            BookOrder::new(
1115                OrderSide::Sell,
1116                Price::from("1001.00"),
1117                Quantity::from(1),
1118                1,
1119            ),
1120            0,
1121            1,
1122            UnixNanos::from(2),
1123            UnixNanos::from(2),
1124        );
1125
1126        // process both deltas
1127        exchange.borrow_mut().process_order_book_delta(delta_buy);
1128        exchange.borrow_mut().process_order_book_delta(delta_sell);
1129
1130        let book = exchange
1131            .borrow()
1132            .get_book(crypto_perpetual_ethusdt.id)
1133            .unwrap()
1134            .clone();
1135        assert_eq!(book.update_count, 2);
1136        assert_eq!(book.sequence, 1);
1137        assert_eq!(book.ts_last, UnixNanos::from(2));
1138        let best_bid_price = exchange
1139            .borrow()
1140            .best_bid_price(crypto_perpetual_ethusdt.id);
1141        assert_eq!(best_bid_price, Some(Price::from("1000.00")));
1142        let best_ask_price = exchange
1143            .borrow()
1144            .best_ask_price(crypto_perpetual_ethusdt.id);
1145        assert_eq!(best_ask_price, Some(Price::from("1001.00")));
1146    }
1147
1148    #[rstest]
1149    fn test_exchange_process_orderbook_deltas(crypto_perpetual_ethusdt: CryptoPerpetual) {
1150        let exchange = get_exchange(
1151            Venue::new("BINANCE"),
1152            AccountType::Margin,
1153            BookType::L2_MBP,
1154            None,
1155        );
1156        let instrument = InstrumentAny::CryptoPerpetual(crypto_perpetual_ethusdt);
1157
1158        // register instrument
1159        exchange.borrow_mut().add_instrument(instrument).unwrap();
1160
1161        // create two sell order book deltas with same timestamps and higher sequence
1162        let delta_sell_1 = OrderBookDelta::new(
1163            crypto_perpetual_ethusdt.id,
1164            BookAction::Add,
1165            BookOrder::new(
1166                OrderSide::Sell,
1167                Price::from("1000.00"),
1168                Quantity::from(3),
1169                1,
1170            ),
1171            0,
1172            0,
1173            UnixNanos::from(1),
1174            UnixNanos::from(1),
1175        );
1176        let delta_sell_2 = OrderBookDelta::new(
1177            crypto_perpetual_ethusdt.id,
1178            BookAction::Add,
1179            BookOrder::new(
1180                OrderSide::Sell,
1181                Price::from("1001.00"),
1182                Quantity::from(1),
1183                1,
1184            ),
1185            0,
1186            1,
1187            UnixNanos::from(1),
1188            UnixNanos::from(1),
1189        );
1190        let orderbook_deltas = OrderBookDeltas::new(
1191            crypto_perpetual_ethusdt.id,
1192            vec![delta_sell_1, delta_sell_2],
1193        );
1194
1195        // process both deltas
1196        exchange
1197            .borrow_mut()
1198            .process_order_book_deltas(orderbook_deltas);
1199
1200        let book = exchange
1201            .borrow()
1202            .get_book(crypto_perpetual_ethusdt.id)
1203            .unwrap()
1204            .clone();
1205        assert_eq!(book.update_count, 2);
1206        assert_eq!(book.sequence, 1);
1207        assert_eq!(book.ts_last, UnixNanos::from(1));
1208        let best_bid_price = exchange
1209            .borrow()
1210            .best_bid_price(crypto_perpetual_ethusdt.id);
1211        // no bid orders in orderbook deltas
1212        assert_eq!(best_bid_price, None);
1213        let best_ask_price = exchange
1214            .borrow()
1215            .best_ask_price(crypto_perpetual_ethusdt.id);
1216        // best ask price is the first order in orderbook deltas
1217        assert_eq!(best_ask_price, Some(Price::from("1000.00")));
1218    }
1219
1220    #[rstest]
1221    fn test_exchange_process_instrument_status(crypto_perpetual_ethusdt: CryptoPerpetual) {
1222        let exchange = get_exchange(
1223            Venue::new("BINANCE"),
1224            AccountType::Margin,
1225            BookType::L2_MBP,
1226            None,
1227        );
1228        let instrument = InstrumentAny::CryptoPerpetual(crypto_perpetual_ethusdt);
1229
1230        // register instrument
1231        exchange.borrow_mut().add_instrument(instrument).unwrap();
1232
1233        let instrument_status = InstrumentStatus::new(
1234            crypto_perpetual_ethusdt.id,
1235            MarketStatusAction::Close, // close the market
1236            UnixNanos::from(1),
1237            UnixNanos::from(1),
1238            None,
1239            None,
1240            None,
1241            None,
1242            None,
1243        );
1244
1245        exchange
1246            .borrow_mut()
1247            .process_instrument_status(instrument_status);
1248
1249        let market_status = exchange
1250            .borrow()
1251            .get_matching_engine(&crypto_perpetual_ethusdt.id)
1252            .unwrap()
1253            .market_status;
1254        assert_eq!(market_status, MarketStatus::Closed);
1255    }
1256
1257    #[rstest]
1258    fn test_accounting() {
1259        let account_type = AccountType::Margin;
1260        let mut cache = Cache::default();
1261        let handler = get_message_saving_handler::<AccountState>(None);
1262        msgbus::register("Portfolio.update_account".into(), handler.clone());
1263        let margin_account = MarginAccount::new(
1264            AccountState::new(
1265                AccountId::from("SIM-001"),
1266                account_type,
1267                vec![AccountBalance::new(
1268                    Money::from("1000 USD"),
1269                    Money::from("0 USD"),
1270                    Money::from("1000 USD"),
1271                )],
1272                vec![],
1273                false,
1274                UUID4::default(),
1275                UnixNanos::default(),
1276                UnixNanos::default(),
1277                None,
1278            ),
1279            false,
1280        );
1281        let () = cache
1282            .add_account(AccountAny::Margin(margin_account))
1283            .unwrap();
1284        // build indexes
1285        cache.build_index();
1286
1287        let exchange = get_exchange(
1288            Venue::new("SIM"),
1289            account_type,
1290            BookType::L2_MBP,
1291            Some(Rc::new(RefCell::new(cache))),
1292        );
1293        exchange.borrow_mut().initialize_account();
1294
1295        // Test adjust account, increase balance by 500 USD
1296        exchange.borrow_mut().adjust_account(Money::from("500 USD"));
1297
1298        // Check if we received two messages, one for initial account state and one for adjusted account state
1299        let messages = get_saved_messages::<AccountState>(handler);
1300        assert_eq!(messages.len(), 2);
1301        let account_state_first = messages.first().unwrap();
1302        let account_state_second = messages.last().unwrap();
1303
1304        assert_eq!(account_state_first.balances.len(), 1);
1305        let current_balance = account_state_first.balances[0];
1306        assert_eq!(current_balance.free, Money::new(1000.0, Currency::USD()));
1307        assert_eq!(current_balance.locked, Money::new(0.0, Currency::USD()));
1308        assert_eq!(current_balance.total, Money::new(1000.0, Currency::USD()));
1309
1310        assert_eq!(account_state_second.balances.len(), 1);
1311        let current_balance = account_state_second.balances[0];
1312        assert_eq!(current_balance.free, Money::new(1500.0, Currency::USD()));
1313        assert_eq!(current_balance.locked, Money::new(0.0, Currency::USD()));
1314        assert_eq!(current_balance.total, Money::new(1500.0, Currency::USD()));
1315    }
1316
1317    #[rstest]
1318    fn test_inflight_commands_binary_heap_ordering_respecting_timestamp_counter() {
1319        // Create 3 inflight commands with different timestamps and counters
1320        let inflight1 = InflightCommand::new(
1321            UnixNanos::from(100),
1322            1,
1323            create_submit_order_command(UnixNanos::from(100)),
1324        );
1325        let inflight2 = InflightCommand::new(
1326            UnixNanos::from(200),
1327            2,
1328            create_submit_order_command(UnixNanos::from(200)),
1329        );
1330        let inflight3 = InflightCommand::new(
1331            UnixNanos::from(100),
1332            2,
1333            create_submit_order_command(UnixNanos::from(100)),
1334        );
1335
1336        // Create a binary heap and push the inflight commands
1337        let mut inflight_heap = BinaryHeap::new();
1338        inflight_heap.push(inflight1);
1339        inflight_heap.push(inflight2);
1340        inflight_heap.push(inflight3);
1341
1342        // Pop the inflight commands and check if they are in the correct order
1343        // by our custom ordering with counter and timestamp
1344        let first = inflight_heap.pop().unwrap();
1345        let second = inflight_heap.pop().unwrap();
1346        let third = inflight_heap.pop().unwrap();
1347
1348        assert_eq!(first.ts, UnixNanos::from(100));
1349        assert_eq!(first.counter, 1);
1350        assert_eq!(second.ts, UnixNanos::from(100));
1351        assert_eq!(second.counter, 2);
1352        assert_eq!(third.ts, UnixNanos::from(200));
1353        assert_eq!(third.counter, 2);
1354    }
1355
1356    #[rstest]
1357    fn test_process_without_latency_model(crypto_perpetual_ethusdt: CryptoPerpetual) {
1358        let exchange = get_exchange(
1359            Venue::new("BINANCE"),
1360            AccountType::Margin,
1361            BookType::L2_MBP,
1362            None,
1363        );
1364
1365        let instrument = InstrumentAny::CryptoPerpetual(crypto_perpetual_ethusdt);
1366        exchange.borrow_mut().add_instrument(instrument).unwrap();
1367
1368        let command1 = create_submit_order_command(UnixNanos::from(100));
1369        let command2 = create_submit_order_command(UnixNanos::from(200));
1370
1371        exchange.borrow_mut().send(command1);
1372        exchange.borrow_mut().send(command2);
1373
1374        // Verify that message queue has 2 commands and inflight queue is empty
1375        // as we are not using latency model
1376        assert_eq!(exchange.borrow().message_queue.len(), 2);
1377        assert_eq!(exchange.borrow().inflight_queue.len(), 0);
1378
1379        // Process command and check that queues is empty
1380        exchange.borrow_mut().process(UnixNanos::from(300));
1381        assert_eq!(exchange.borrow().message_queue.len(), 0);
1382        assert_eq!(exchange.borrow().inflight_queue.len(), 0);
1383    }
1384
1385    #[rstest]
1386    fn test_process_with_latency_model(crypto_perpetual_ethusdt: CryptoPerpetual) {
1387        let latency_model = LatencyModel::new(
1388            UnixNanos::from(100),
1389            UnixNanos::from(200),
1390            UnixNanos::from(300),
1391            UnixNanos::from(100),
1392        );
1393        let exchange = get_exchange(
1394            Venue::new("BINANCE"),
1395            AccountType::Margin,
1396            BookType::L2_MBP,
1397            None,
1398        );
1399        exchange.borrow_mut().set_latency_model(latency_model);
1400
1401        let instrument = InstrumentAny::CryptoPerpetual(crypto_perpetual_ethusdt);
1402        exchange.borrow_mut().add_instrument(instrument).unwrap();
1403
1404        let command1 = create_submit_order_command(UnixNanos::from(100));
1405        let command2 = create_submit_order_command(UnixNanos::from(150));
1406        exchange.borrow_mut().send(command1);
1407        exchange.borrow_mut().send(command2);
1408
1409        // Verify that inflight queue has 2 commands and message queue is empty
1410        assert_eq!(exchange.borrow().message_queue.len(), 0);
1411        assert_eq!(exchange.borrow().inflight_queue.len(), 2);
1412        // First inflight command should have timestamp at 100 and 200 insert latency
1413        assert_eq!(
1414            exchange.borrow().inflight_queue.iter().next().unwrap().ts,
1415            UnixNanos::from(300)
1416        );
1417        // Second inflight command should have timestamp at 150 and 200 insert latency
1418        assert_eq!(
1419            exchange.borrow().inflight_queue.iter().nth(1).unwrap().ts,
1420            UnixNanos::from(350)
1421        );
1422
1423        // Process at timestamp 350, and test that only first command is processed
1424        exchange.borrow_mut().process(UnixNanos::from(320));
1425        assert_eq!(exchange.borrow().message_queue.len(), 0);
1426        assert_eq!(exchange.borrow().inflight_queue.len(), 1);
1427        assert_eq!(
1428            exchange.borrow().inflight_queue.iter().next().unwrap().ts,
1429            UnixNanos::from(350)
1430        );
1431    }
1432}