1#![allow(dead_code)]
20#![allow(unused_variables)]
21
22use std::{
23 cell::RefCell,
24 collections::{BinaryHeap, HashMap, VecDeque},
25 fmt::Debug,
26 rc::Rc,
27};
28
29use nautilus_common::{cache::Cache, clock::Clock, messages::execution::TradingCommand};
30use nautilus_core::{
31 UnixNanos,
32 correctness::{FAILED, check_equal},
33};
34use nautilus_execution::{
35 client::ExecutionClient,
36 matching_engine::{config::OrderMatchingEngineConfig, engine::OrderMatchingEngine},
37 models::{fee::FeeModelAny, fill::FillModel, latency::LatencyModel},
38};
39use nautilus_model::{
40 accounts::AccountAny,
41 data::{
42 Bar, Data, InstrumentStatus, OrderBookDelta, OrderBookDeltas, OrderBookDeltas_API,
43 QuoteTick, TradeTick,
44 },
45 enums::{AccountType, BookType, OmsType},
46 identifiers::{InstrumentId, Venue},
47 instruments::{Instrument, InstrumentAny},
48 orderbook::OrderBook,
49 orders::PassiveOrderAny,
50 types::{AccountBalance, Currency, Money, Price},
51};
52use rust_decimal::{Decimal, prelude::ToPrimitive};
53
54use crate::modules::SimulationModule;
55
56#[derive(Debug, Eq, PartialEq)]
60struct InflightCommand {
61 ts: UnixNanos,
62 counter: u32,
63 command: TradingCommand,
64}
65
66impl InflightCommand {
67 const fn new(ts: UnixNanos, counter: u32, command: TradingCommand) -> Self {
68 Self {
69 ts,
70 counter,
71 command,
72 }
73 }
74}
75
76impl Ord for InflightCommand {
77 fn cmp(&self, other: &Self) -> std::cmp::Ordering {
78 other
80 .ts
81 .cmp(&self.ts)
82 .then_with(|| other.counter.cmp(&self.counter))
83 }
84}
85
86impl PartialOrd for InflightCommand {
87 fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
88 Some(self.cmp(other))
89 }
90}
91
92pub struct SimulatedExchange {
93 pub id: Venue,
94 pub oms_type: OmsType,
95 pub account_type: AccountType,
96 starting_balances: Vec<Money>,
97 book_type: BookType,
98 default_leverage: Decimal,
99 exec_client: Option<Rc<dyn ExecutionClient>>,
100 pub base_currency: Option<Currency>,
101 fee_model: FeeModelAny,
102 fill_model: FillModel,
103 latency_model: Option<LatencyModel>,
104 instruments: HashMap<InstrumentId, InstrumentAny>,
105 matching_engines: HashMap<InstrumentId, OrderMatchingEngine>,
106 leverages: HashMap<InstrumentId, Decimal>,
107 modules: Vec<Box<dyn SimulationModule>>,
108 clock: Rc<RefCell<dyn Clock>>,
109 cache: Rc<RefCell<Cache>>,
110 message_queue: VecDeque<TradingCommand>,
111 inflight_queue: BinaryHeap<InflightCommand>,
112 inflight_counter: HashMap<UnixNanos, u32>,
113 frozen_account: bool,
114 bar_execution: bool,
115 reject_stop_orders: bool,
116 support_gtd_orders: bool,
117 support_contingent_orders: bool,
118 use_position_ids: bool,
119 use_random_ids: bool,
120 use_reduce_only: bool,
121 use_message_queue: bool,
122}
123
124impl Debug for SimulatedExchange {
125 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
126 f.debug_struct(stringify!(SimulatedExchange))
127 .field("id", &self.id)
128 .field("account_type", &self.account_type)
129 .finish()
130 }
131}
132
133impl SimulatedExchange {
134 #[allow(clippy::too_many_arguments)]
142 pub fn new(
143 venue: Venue,
144 oms_type: OmsType,
145 account_type: AccountType,
146 starting_balances: Vec<Money>,
147 base_currency: Option<Currency>,
148 default_leverage: Decimal,
149 leverages: HashMap<InstrumentId, Decimal>,
150 modules: Vec<Box<dyn SimulationModule>>,
151 cache: Rc<RefCell<Cache>>,
152 clock: Rc<RefCell<dyn Clock>>,
153 fill_model: FillModel,
154 fee_model: FeeModelAny,
155 book_type: BookType,
156 latency_model: Option<LatencyModel>,
157 frozen_account: Option<bool>,
158 bar_execution: Option<bool>,
159 reject_stop_orders: Option<bool>,
160 support_gtd_orders: Option<bool>,
161 support_contingent_orders: Option<bool>,
162 use_position_ids: Option<bool>,
163 use_random_ids: Option<bool>,
164 use_reduce_only: Option<bool>,
165 use_message_queue: Option<bool>,
166 ) -> anyhow::Result<Self> {
167 if starting_balances.is_empty() {
168 anyhow::bail!("Starting balances must be provided")
169 }
170 if base_currency.is_some() && starting_balances.len() > 1 {
171 anyhow::bail!("single-currency account has multiple starting currencies")
172 }
173 Ok(Self {
175 id: venue,
176 oms_type,
177 account_type,
178 starting_balances,
179 book_type,
180 default_leverage,
181 exec_client: None,
182 base_currency,
183 fee_model,
184 fill_model,
185 latency_model,
186 instruments: HashMap::new(),
187 matching_engines: HashMap::new(),
188 leverages,
189 modules,
190 clock,
191 cache,
192 message_queue: VecDeque::new(),
193 inflight_queue: BinaryHeap::new(),
194 inflight_counter: HashMap::new(),
195 frozen_account: frozen_account.unwrap_or(false),
196 bar_execution: bar_execution.unwrap_or(true),
197 reject_stop_orders: reject_stop_orders.unwrap_or(true),
198 support_gtd_orders: support_gtd_orders.unwrap_or(true),
199 support_contingent_orders: support_contingent_orders.unwrap_or(true),
200 use_position_ids: use_position_ids.unwrap_or(true),
201 use_random_ids: use_random_ids.unwrap_or(false),
202 use_reduce_only: use_reduce_only.unwrap_or(true),
203 use_message_queue: use_message_queue.unwrap_or(true),
204 })
205 }
206
207 pub fn register_client(&mut self, client: Rc<dyn ExecutionClient>) {
208 self.exec_client = Some(client);
209 }
210
211 pub fn set_fill_model(&mut self, fill_model: FillModel) {
212 for matching_engine in self.matching_engines.values_mut() {
213 matching_engine.set_fill_model(fill_model.clone());
214 log::info!(
215 "Setting fill model for {} to {}",
216 matching_engine.venue,
217 self.fill_model
218 );
219 }
220 self.fill_model = fill_model;
221 }
222
223 pub const fn set_latency_model(&mut self, latency_model: LatencyModel) {
224 self.latency_model = Some(latency_model);
225 }
226
227 pub fn initialize_account(&mut self) {
228 self.generate_fresh_account_state();
229 }
230
231 pub fn add_instrument(&mut self, instrument: InstrumentAny) -> anyhow::Result<()> {
241 check_equal(
242 &instrument.id().venue,
243 &self.id,
244 "Venue of instrument id",
245 "Venue of simulated exchange",
246 )
247 .expect(FAILED);
248
249 if self.account_type == AccountType::Cash
250 && (matches!(instrument, InstrumentAny::CryptoPerpetual(_))
251 || matches!(instrument, InstrumentAny::CryptoFuture(_)))
252 {
253 anyhow::bail!("Cash account cannot trade futures or perpetuals")
254 }
255
256 self.instruments.insert(instrument.id(), instrument.clone());
257
258 let matching_engine_config = OrderMatchingEngineConfig::new(
259 self.bar_execution,
260 self.reject_stop_orders,
261 self.support_gtd_orders,
262 self.support_contingent_orders,
263 self.use_position_ids,
264 self.use_random_ids,
265 self.use_reduce_only,
266 );
267 let instrument_id = instrument.id();
268 let matching_engine = OrderMatchingEngine::new(
269 instrument,
270 self.instruments.len() as u32,
271 self.fill_model.clone(),
272 self.fee_model.clone(),
273 self.book_type,
274 self.oms_type,
275 self.account_type,
276 self.clock.clone(),
277 Rc::clone(&self.cache),
278 matching_engine_config,
279 );
280 self.matching_engines.insert(instrument_id, matching_engine);
281
282 log::info!("Added instrument {instrument_id} and created matching engine");
283 Ok(())
284 }
285
286 #[must_use]
287 pub fn best_bid_price(&self, instrument_id: InstrumentId) -> Option<Price> {
288 self.matching_engines
289 .get(&instrument_id)
290 .and_then(OrderMatchingEngine::best_bid_price)
291 }
292
293 #[must_use]
294 pub fn best_ask_price(&self, instrument_id: InstrumentId) -> Option<Price> {
295 self.matching_engines
296 .get(&instrument_id)
297 .and_then(OrderMatchingEngine::best_ask_price)
298 }
299
300 pub fn get_book(&self, instrument_id: InstrumentId) -> Option<&OrderBook> {
301 self.matching_engines
302 .get(&instrument_id)
303 .map(OrderMatchingEngine::get_book)
304 }
305
306 #[must_use]
307 pub fn get_matching_engine(
308 &self,
309 instrument_id: &InstrumentId,
310 ) -> Option<&OrderMatchingEngine> {
311 self.matching_engines.get(instrument_id)
312 }
313
314 #[must_use]
315 pub const fn get_matching_engines(&self) -> &HashMap<InstrumentId, OrderMatchingEngine> {
316 &self.matching_engines
317 }
318
319 #[must_use]
320 pub fn get_books(&self) -> HashMap<InstrumentId, OrderBook> {
321 let mut books = HashMap::new();
322 for (instrument_id, matching_engine) in &self.matching_engines {
323 books.insert(*instrument_id, matching_engine.get_book().clone());
324 }
325 books
326 }
327
328 #[must_use]
329 pub fn get_open_orders(&self, instrument_id: Option<InstrumentId>) -> Vec<PassiveOrderAny> {
330 instrument_id
331 .and_then(|id| {
332 self.matching_engines
333 .get(&id)
334 .map(OrderMatchingEngine::get_open_orders)
335 })
336 .unwrap_or_else(|| {
337 self.matching_engines
338 .values()
339 .flat_map(OrderMatchingEngine::get_open_orders)
340 .collect()
341 })
342 }
343
344 #[must_use]
345 pub fn get_open_bid_orders(&self, instrument_id: Option<InstrumentId>) -> Vec<PassiveOrderAny> {
346 instrument_id
347 .and_then(|id| {
348 self.matching_engines
349 .get(&id)
350 .map(|engine| engine.get_open_bid_orders().to_vec())
351 })
352 .unwrap_or_else(|| {
353 self.matching_engines
354 .values()
355 .flat_map(|engine| engine.get_open_bid_orders().to_vec())
356 .collect()
357 })
358 }
359
360 #[must_use]
361 pub fn get_open_ask_orders(&self, instrument_id: Option<InstrumentId>) -> Vec<PassiveOrderAny> {
362 instrument_id
363 .and_then(|id| {
364 self.matching_engines
365 .get(&id)
366 .map(|engine| engine.get_open_ask_orders().to_vec())
367 })
368 .unwrap_or_else(|| {
369 self.matching_engines
370 .values()
371 .flat_map(|engine| engine.get_open_ask_orders().to_vec())
372 .collect()
373 })
374 }
375
376 #[must_use]
380 pub fn get_account(&self) -> Option<AccountAny> {
381 self.exec_client
382 .as_ref()
383 .map(|client| client.get_account().unwrap())
384 }
385
386 pub fn adjust_account(&mut self, adjustment: Money) {
390 if self.frozen_account {
391 return;
393 }
394
395 if let Some(exec_client) = &self.exec_client {
396 let venue = exec_client.venue();
397 println!("Adjusting account for venue {venue}");
398 if let Some(account) = self.cache.borrow().account_for_venue(&venue) {
399 match account.balance(Some(adjustment.currency)) {
400 Some(balance) => {
401 let mut current_balance = *balance;
402 current_balance.total += adjustment;
403 current_balance.free += adjustment;
404
405 let margins = match account {
406 AccountAny::Margin(margin_account) => margin_account.margins.clone(),
407 _ => HashMap::new(),
408 };
409
410 if let Some(exec_client) = &self.exec_client {
411 exec_client
412 .generate_account_state(
413 vec![current_balance],
414 margins.values().copied().collect(),
415 true,
416 self.clock.borrow().timestamp_ns(),
417 )
418 .unwrap();
419 }
420 }
421 None => {
422 log::error!(
423 "Cannot adjust account: no balance for currency {}",
424 adjustment.currency
425 );
426 }
427 }
428 } else {
429 log::error!("Cannot adjust account: no account for venue {venue}");
430 }
431 }
432 }
433
434 pub fn send(&mut self, command: TradingCommand) {
435 if !self.use_message_queue {
436 self.process_trading_command(command);
437 } else if self.latency_model.is_none() {
438 self.message_queue.push_back(command);
439 } else {
440 let (ts, counter) = self.generate_inflight_command(&command);
441 self.inflight_queue
442 .push(InflightCommand::new(ts, counter, command));
443 }
444 }
445
446 pub fn generate_inflight_command(&mut self, command: &TradingCommand) -> (UnixNanos, u32) {
450 if let Some(latency_model) = &self.latency_model {
451 let ts = match command {
452 TradingCommand::SubmitOrder(_) | TradingCommand::SubmitOrderList(_) => {
453 command.ts_init() + latency_model.insert_latency_nanos
454 }
455 TradingCommand::ModifyOrder(_) => {
456 command.ts_init() + latency_model.update_latency_nanos
457 }
458 TradingCommand::CancelOrder(_)
459 | TradingCommand::CancelAllOrders(_)
460 | TradingCommand::BatchCancelOrders(_) => {
461 command.ts_init() + latency_model.delete_latency_nanos
462 }
463 _ => panic!("Invalid command was {command}"),
464 };
465
466 let counter = self
467 .inflight_counter
468 .entry(ts)
469 .and_modify(|e| *e += 1)
470 .or_insert(1);
471
472 (ts, *counter)
473 } else {
474 panic!("Latency model should be initialized");
475 }
476 }
477
478 pub fn process_order_book_delta(&mut self, delta: OrderBookDelta) {
482 for module in &self.modules {
483 module.pre_process(Data::Delta(delta));
484 }
485
486 if !self.matching_engines.contains_key(&delta.instrument_id) {
487 let instrument = {
488 let cache = self.cache.as_ref().borrow();
489 cache.instrument(&delta.instrument_id).cloned()
490 };
491
492 if let Some(instrument) = instrument {
493 self.add_instrument(instrument).unwrap();
494 } else {
495 panic!(
496 "No matching engine found for instrument {}",
497 delta.instrument_id
498 );
499 }
500 }
501
502 if let Some(matching_engine) = self.matching_engines.get_mut(&delta.instrument_id) {
503 matching_engine.process_order_book_delta(&delta);
504 } else {
505 panic!("Matching engine should be initialized");
506 }
507 }
508
509 pub fn process_order_book_deltas(&mut self, deltas: OrderBookDeltas) {
513 for module in &self.modules {
514 module.pre_process(Data::Deltas(OrderBookDeltas_API::new(deltas.clone())));
515 }
516
517 if !self.matching_engines.contains_key(&deltas.instrument_id) {
518 let instrument = {
519 let cache = self.cache.as_ref().borrow();
520 cache.instrument(&deltas.instrument_id).cloned()
521 };
522
523 if let Some(instrument) = instrument {
524 self.add_instrument(instrument).unwrap();
525 } else {
526 panic!(
527 "No matching engine found for instrument {}",
528 deltas.instrument_id
529 );
530 }
531 }
532
533 if let Some(matching_engine) = self.matching_engines.get_mut(&deltas.instrument_id) {
534 matching_engine.process_order_book_deltas(&deltas);
535 } else {
536 panic!("Matching engine should be initialized");
537 }
538 }
539
540 pub fn process_quote_tick(&mut self, quote: &QuoteTick) {
544 for module in &self.modules {
545 module.pre_process(Data::Quote(quote.to_owned()));
546 }
547
548 if !self.matching_engines.contains_key("e.instrument_id) {
549 let instrument = {
550 let cache = self.cache.as_ref().borrow();
551 cache.instrument("e.instrument_id).cloned()
552 };
553
554 if let Some(instrument) = instrument {
555 self.add_instrument(instrument).unwrap();
556 } else {
557 panic!(
558 "No matching engine found for instrument {}",
559 quote.instrument_id
560 );
561 }
562 }
563
564 if let Some(matching_engine) = self.matching_engines.get_mut("e.instrument_id) {
565 matching_engine.process_quote_tick(quote);
566 } else {
567 panic!("Matching engine should be initialized");
568 }
569 }
570
571 pub fn process_trade_tick(&mut self, trade: &TradeTick) {
575 for module in &self.modules {
576 module.pre_process(Data::Trade(trade.to_owned()));
577 }
578
579 if !self.matching_engines.contains_key(&trade.instrument_id) {
580 let instrument = {
581 let cache = self.cache.as_ref().borrow();
582 cache.instrument(&trade.instrument_id).cloned()
583 };
584
585 if let Some(instrument) = instrument {
586 self.add_instrument(instrument).unwrap();
587 } else {
588 panic!(
589 "No matching engine found for instrument {}",
590 trade.instrument_id
591 );
592 }
593 }
594
595 if let Some(matching_engine) = self.matching_engines.get_mut(&trade.instrument_id) {
596 matching_engine.process_trade_tick(trade);
597 } else {
598 panic!("Matching engine should be initialized");
599 }
600 }
601
602 pub fn process_bar(&mut self, bar: Bar) {
606 for module in &self.modules {
607 module.pre_process(Data::Bar(bar));
608 }
609
610 if !self.matching_engines.contains_key(&bar.instrument_id()) {
611 let instrument = {
612 let cache = self.cache.as_ref().borrow();
613 cache.instrument(&bar.instrument_id()).cloned()
614 };
615
616 if let Some(instrument) = instrument {
617 self.add_instrument(instrument).unwrap();
618 } else {
619 panic!(
620 "No matching engine found for instrument {}",
621 bar.instrument_id()
622 );
623 }
624 }
625
626 if let Some(matching_engine) = self.matching_engines.get_mut(&bar.instrument_id()) {
627 matching_engine.process_bar(&bar);
628 } else {
629 panic!("Matching engine should be initialized");
630 }
631 }
632
633 pub fn process_instrument_status(&mut self, status: InstrumentStatus) {
637 if !self.matching_engines.contains_key(&status.instrument_id) {
640 let instrument = {
641 let cache = self.cache.as_ref().borrow();
642 cache.instrument(&status.instrument_id).cloned()
643 };
644
645 if let Some(instrument) = instrument {
646 self.add_instrument(instrument).unwrap();
647 } else {
648 panic!(
649 "No matching engine found for instrument {}",
650 status.instrument_id
651 );
652 }
653 }
654
655 if let Some(matching_engine) = self.matching_engines.get_mut(&status.instrument_id) {
656 matching_engine.process_status(status.action);
657 } else {
658 panic!("Matching engine should be initialized");
659 }
660 }
661
662 pub fn process(&mut self, ts_now: UnixNanos) {
666 while let Some(inflight) = self.inflight_queue.peek() {
670 if inflight.ts > ts_now {
671 break;
673 }
674 let inflight = self.inflight_queue.pop().unwrap();
676 self.process_trading_command(inflight.command);
677 }
678
679 while let Some(command) = self.message_queue.pop_front() {
681 self.process_trading_command(command);
682 }
683 }
684
685 pub fn reset(&mut self) {
686 for module in &self.modules {
687 module.reset();
688 }
689
690 self.generate_fresh_account_state();
691
692 for matching_engine in self.matching_engines.values_mut() {
693 matching_engine.reset();
694 }
695
696 log::info!("Resetting exchange state");
698 }
699
700 pub fn process_trading_command(&mut self, command: TradingCommand) {
704 if let Some(matching_engine) = self.matching_engines.get_mut(&command.instrument_id()) {
705 let account_id = if let Some(exec_client) = &self.exec_client {
706 exec_client.account_id()
707 } else {
708 panic!("Execution client should be initialized");
709 };
710 match command {
711 TradingCommand::SubmitOrder(mut command) => {
712 matching_engine.process_order(&mut command.order, account_id);
713 }
714 TradingCommand::ModifyOrder(ref command) => {
715 matching_engine.process_modify(command, account_id);
716 }
717 TradingCommand::CancelOrder(ref command) => {
718 matching_engine.process_cancel(command, account_id);
719 }
720 TradingCommand::CancelAllOrders(ref command) => {
721 matching_engine.process_cancel_all(command, account_id);
722 }
723 TradingCommand::BatchCancelOrders(ref command) => {
724 matching_engine.process_batch_cancel(command, account_id);
725 }
726 TradingCommand::SubmitOrderList(mut command) => {
727 for order in &mut command.order_list.orders {
728 matching_engine.process_order(order, account_id);
729 }
730 }
731 _ => {}
732 }
733 } else {
734 panic!("Matching engine should be initialized");
735 }
736 }
737
738 pub fn generate_fresh_account_state(&self) {
742 let balances: Vec<AccountBalance> = self
743 .starting_balances
744 .iter()
745 .map(|money| AccountBalance::new(*money, Money::zero(money.currency), *money))
746 .collect();
747
748 if let Some(exec_client) = &self.exec_client {
749 exec_client
750 .generate_account_state(balances, vec![], true, self.clock.borrow().timestamp_ns())
751 .unwrap();
752 }
753
754 if let Some(AccountAny::Margin(mut margin_account)) = self.get_account() {
756 margin_account.set_default_leverage(self.default_leverage.to_f64().unwrap());
757
758 for (instrument_id, leverage) in &self.leverages {
760 margin_account.set_leverage(*instrument_id, leverage.to_f64().unwrap());
761 }
762 }
763 }
764}
765
766#[cfg(test)]
770mod tests {
771 use std::{
772 cell::RefCell,
773 collections::{BinaryHeap, HashMap},
774 rc::Rc,
775 sync::LazyLock,
776 };
777
778 use nautilus_common::{
779 cache::Cache,
780 clock::TestClock,
781 messages::execution::{SubmitOrder, TradingCommand},
782 msgbus::{
783 self,
784 stubs::{get_message_saving_handler, get_saved_messages},
785 },
786 };
787 use nautilus_core::{AtomicTime, UUID4, UnixNanos};
788 use nautilus_execution::models::{
789 fee::{FeeModelAny, MakerTakerFeeModel},
790 fill::FillModel,
791 latency::LatencyModel,
792 };
793 use nautilus_model::{
794 accounts::{AccountAny, MarginAccount},
795 data::{
796 Bar, BarType, BookOrder, InstrumentStatus, OrderBookDelta, OrderBookDeltas, QuoteTick,
797 TradeTick,
798 },
799 enums::{
800 AccountType, AggressorSide, BookAction, BookType, MarketStatus, MarketStatusAction,
801 OmsType, OrderSide, OrderType,
802 },
803 events::AccountState,
804 identifiers::{
805 AccountId, ClientId, ClientOrderId, InstrumentId, StrategyId, TradeId, TraderId, Venue,
806 VenueOrderId,
807 },
808 instruments::{CryptoPerpetual, InstrumentAny, stubs::crypto_perpetual_ethusdt},
809 orders::OrderTestBuilder,
810 types::{AccountBalance, Currency, Money, Price, Quantity},
811 };
812 use rstest::rstest;
813
814 use crate::{
815 exchange::{InflightCommand, SimulatedExchange},
816 execution_client::BacktestExecutionClient,
817 };
818
819 static ATOMIC_TIME: LazyLock<AtomicTime> =
820 LazyLock::new(|| AtomicTime::new(true, UnixNanos::default()));
821
822 fn get_exchange(
823 venue: Venue,
824 account_type: AccountType,
825 book_type: BookType,
826 cache: Option<Rc<RefCell<Cache>>>,
827 ) -> Rc<RefCell<SimulatedExchange>> {
828 let cache = cache.unwrap_or(Rc::new(RefCell::new(Cache::default())));
829 let clock = Rc::new(RefCell::new(TestClock::new()));
830 let exchange = Rc::new(RefCell::new(
831 SimulatedExchange::new(
832 venue,
833 OmsType::Netting,
834 account_type,
835 vec![Money::new(1000.0, Currency::USD())],
836 None,
837 1.into(),
838 HashMap::new(),
839 vec![],
840 cache.clone(),
841 clock,
842 FillModel::default(),
843 FeeModelAny::MakerTaker(MakerTakerFeeModel),
844 book_type,
845 None,
846 None,
847 None,
848 None,
849 None,
850 None,
851 None,
852 None,
853 None,
854 None,
855 )
856 .unwrap(),
857 ));
858
859 let clock = TestClock::new();
860 let execution_client = BacktestExecutionClient::new(
861 TraderId::default(),
862 AccountId::default(),
863 exchange.clone(),
864 cache,
865 Rc::new(RefCell::new(clock)),
866 None,
867 None,
868 );
869 exchange
870 .borrow_mut()
871 .register_client(Rc::new(execution_client));
872
873 exchange
874 }
875
876 fn create_submit_order_command(ts_init: UnixNanos) -> TradingCommand {
877 let instrument_id = InstrumentId::from("ETHUSDT-PERP.BINANCE");
878 let order = OrderTestBuilder::new(OrderType::Market)
879 .instrument_id(instrument_id)
880 .quantity(Quantity::from(1))
881 .build();
882 TradingCommand::SubmitOrder(
883 SubmitOrder::new(
884 TraderId::default(),
885 ClientId::default(),
886 StrategyId::default(),
887 instrument_id,
888 ClientOrderId::default(),
889 VenueOrderId::default(),
890 order,
891 None,
892 None,
893 UUID4::default(),
894 ts_init,
895 )
896 .unwrap(),
897 )
898 }
899
900 #[rstest]
901 #[should_panic(
902 expected = r#"Condition failed: 'Venue of instrument id' value of BINANCE was not equal to 'Venue of simulated exchange' value of SIM"#
903 )]
904 fn test_venue_mismatch_between_exchange_and_instrument(
905 crypto_perpetual_ethusdt: CryptoPerpetual,
906 ) {
907 let exchange = get_exchange(
908 Venue::new("SIM"),
909 AccountType::Margin,
910 BookType::L1_MBP,
911 None,
912 );
913 let instrument = InstrumentAny::CryptoPerpetual(crypto_perpetual_ethusdt);
914 exchange.borrow_mut().add_instrument(instrument).unwrap();
915 }
916
917 #[rstest]
918 #[should_panic(expected = "Cash account cannot trade futures or perpetuals")]
919 fn test_cash_account_trading_futures_or_perpetuals(crypto_perpetual_ethusdt: CryptoPerpetual) {
920 let exchange = get_exchange(
921 Venue::new("BINANCE"),
922 AccountType::Cash,
923 BookType::L1_MBP,
924 None,
925 );
926 let instrument = InstrumentAny::CryptoPerpetual(crypto_perpetual_ethusdt);
927 exchange.borrow_mut().add_instrument(instrument).unwrap();
928 }
929
930 #[rstest]
931 fn test_exchange_process_quote_tick(crypto_perpetual_ethusdt: CryptoPerpetual) {
932 let exchange = get_exchange(
933 Venue::new("BINANCE"),
934 AccountType::Margin,
935 BookType::L1_MBP,
936 None,
937 );
938 let instrument = InstrumentAny::CryptoPerpetual(crypto_perpetual_ethusdt);
939
940 exchange.borrow_mut().add_instrument(instrument).unwrap();
942
943 let quote_tick = QuoteTick::new(
945 crypto_perpetual_ethusdt.id,
946 Price::from("1000"),
947 Price::from("1001"),
948 Quantity::from(1),
949 Quantity::from(1),
950 UnixNanos::default(),
951 UnixNanos::default(),
952 );
953 exchange.borrow_mut().process_quote_tick("e_tick);
954
955 let best_bid_price = exchange
956 .borrow()
957 .best_bid_price(crypto_perpetual_ethusdt.id);
958 assert_eq!(best_bid_price, Some(Price::from("1000")));
959 let best_ask_price = exchange
960 .borrow()
961 .best_ask_price(crypto_perpetual_ethusdt.id);
962 assert_eq!(best_ask_price, Some(Price::from("1001")));
963 }
964
965 #[rstest]
966 fn test_exchange_process_trade_tick(crypto_perpetual_ethusdt: CryptoPerpetual) {
967 let exchange = get_exchange(
968 Venue::new("BINANCE"),
969 AccountType::Margin,
970 BookType::L1_MBP,
971 None,
972 );
973 let instrument = InstrumentAny::CryptoPerpetual(crypto_perpetual_ethusdt);
974
975 exchange.borrow_mut().add_instrument(instrument).unwrap();
977
978 let trade_tick = TradeTick::new(
980 crypto_perpetual_ethusdt.id,
981 Price::from("1000"),
982 Quantity::from(1),
983 AggressorSide::Buyer,
984 TradeId::from("1"),
985 UnixNanos::default(),
986 UnixNanos::default(),
987 );
988 exchange.borrow_mut().process_trade_tick(&trade_tick);
989
990 let best_bid_price = exchange
991 .borrow()
992 .best_bid_price(crypto_perpetual_ethusdt.id);
993 assert_eq!(best_bid_price, Some(Price::from("1000")));
994 let best_ask = exchange
995 .borrow()
996 .best_ask_price(crypto_perpetual_ethusdt.id);
997 assert_eq!(best_ask, Some(Price::from("1000")));
998 }
999
1000 #[rstest]
1001 fn test_exchange_process_bar_last_bar_spec(crypto_perpetual_ethusdt: CryptoPerpetual) {
1002 let exchange = get_exchange(
1003 Venue::new("BINANCE"),
1004 AccountType::Margin,
1005 BookType::L1_MBP,
1006 None,
1007 );
1008 let instrument = InstrumentAny::CryptoPerpetual(crypto_perpetual_ethusdt);
1009
1010 exchange.borrow_mut().add_instrument(instrument).unwrap();
1012
1013 let bar = Bar::new(
1015 BarType::from("ETHUSDT-PERP.BINANCE-1-MINUTE-LAST-EXTERNAL"),
1016 Price::from("1500.00"),
1017 Price::from("1505.00"),
1018 Price::from("1490.00"),
1019 Price::from("1502.00"),
1020 Quantity::from(100),
1021 UnixNanos::default(),
1022 UnixNanos::default(),
1023 );
1024 exchange.borrow_mut().process_bar(bar);
1025
1026 let best_bid_price = exchange
1028 .borrow()
1029 .best_bid_price(crypto_perpetual_ethusdt.id);
1030 assert_eq!(best_bid_price, Some(Price::from("1502.00")));
1031 let best_ask_price = exchange
1032 .borrow()
1033 .best_ask_price(crypto_perpetual_ethusdt.id);
1034 assert_eq!(best_ask_price, Some(Price::from("1502.00")));
1035 }
1036
1037 #[rstest]
1038 fn test_exchange_process_bar_bid_ask_bar_spec(crypto_perpetual_ethusdt: CryptoPerpetual) {
1039 let exchange = get_exchange(
1040 Venue::new("BINANCE"),
1041 AccountType::Margin,
1042 BookType::L1_MBP,
1043 None,
1044 );
1045 let instrument = InstrumentAny::CryptoPerpetual(crypto_perpetual_ethusdt);
1046
1047 exchange.borrow_mut().add_instrument(instrument).unwrap();
1049
1050 let bar_bid = Bar::new(
1053 BarType::from("ETHUSDT-PERP.BINANCE-1-MINUTE-BID-EXTERNAL"),
1054 Price::from("1500.00"),
1055 Price::from("1505.00"),
1056 Price::from("1490.00"),
1057 Price::from("1502.00"),
1058 Quantity::from(100),
1059 UnixNanos::from(1),
1060 UnixNanos::from(1),
1061 );
1062 let bar_ask = Bar::new(
1063 BarType::from("ETHUSDT-PERP.BINANCE-1-MINUTE-ASK-EXTERNAL"),
1064 Price::from("1501.00"),
1065 Price::from("1506.00"),
1066 Price::from("1491.00"),
1067 Price::from("1503.00"),
1068 Quantity::from(100),
1069 UnixNanos::from(1),
1070 UnixNanos::from(1),
1071 );
1072
1073 exchange.borrow_mut().process_bar(bar_bid);
1075 exchange.borrow_mut().process_bar(bar_ask);
1076
1077 let best_bid_price = exchange
1079 .borrow()
1080 .best_bid_price(crypto_perpetual_ethusdt.id);
1081 assert_eq!(best_bid_price, Some(Price::from("1502.00")));
1082 let best_ask_price = exchange
1083 .borrow()
1084 .best_ask_price(crypto_perpetual_ethusdt.id);
1085 assert_eq!(best_ask_price, Some(Price::from("1503.00")));
1086 }
1087
1088 #[rstest]
1089 fn test_exchange_process_orderbook_delta(crypto_perpetual_ethusdt: CryptoPerpetual) {
1090 let exchange = get_exchange(
1091 Venue::new("BINANCE"),
1092 AccountType::Margin,
1093 BookType::L2_MBP,
1094 None,
1095 );
1096 let instrument = InstrumentAny::CryptoPerpetual(crypto_perpetual_ethusdt);
1097
1098 exchange.borrow_mut().add_instrument(instrument).unwrap();
1100
1101 let delta_buy = OrderBookDelta::new(
1103 crypto_perpetual_ethusdt.id,
1104 BookAction::Add,
1105 BookOrder::new(OrderSide::Buy, Price::from("1000.00"), Quantity::from(1), 1),
1106 0,
1107 0,
1108 UnixNanos::from(1),
1109 UnixNanos::from(1),
1110 );
1111 let delta_sell = OrderBookDelta::new(
1112 crypto_perpetual_ethusdt.id,
1113 BookAction::Add,
1114 BookOrder::new(
1115 OrderSide::Sell,
1116 Price::from("1001.00"),
1117 Quantity::from(1),
1118 1,
1119 ),
1120 0,
1121 1,
1122 UnixNanos::from(2),
1123 UnixNanos::from(2),
1124 );
1125
1126 exchange.borrow_mut().process_order_book_delta(delta_buy);
1128 exchange.borrow_mut().process_order_book_delta(delta_sell);
1129
1130 let book = exchange
1131 .borrow()
1132 .get_book(crypto_perpetual_ethusdt.id)
1133 .unwrap()
1134 .clone();
1135 assert_eq!(book.update_count, 2);
1136 assert_eq!(book.sequence, 1);
1137 assert_eq!(book.ts_last, UnixNanos::from(2));
1138 let best_bid_price = exchange
1139 .borrow()
1140 .best_bid_price(crypto_perpetual_ethusdt.id);
1141 assert_eq!(best_bid_price, Some(Price::from("1000.00")));
1142 let best_ask_price = exchange
1143 .borrow()
1144 .best_ask_price(crypto_perpetual_ethusdt.id);
1145 assert_eq!(best_ask_price, Some(Price::from("1001.00")));
1146 }
1147
1148 #[rstest]
1149 fn test_exchange_process_orderbook_deltas(crypto_perpetual_ethusdt: CryptoPerpetual) {
1150 let exchange = get_exchange(
1151 Venue::new("BINANCE"),
1152 AccountType::Margin,
1153 BookType::L2_MBP,
1154 None,
1155 );
1156 let instrument = InstrumentAny::CryptoPerpetual(crypto_perpetual_ethusdt);
1157
1158 exchange.borrow_mut().add_instrument(instrument).unwrap();
1160
1161 let delta_sell_1 = OrderBookDelta::new(
1163 crypto_perpetual_ethusdt.id,
1164 BookAction::Add,
1165 BookOrder::new(
1166 OrderSide::Sell,
1167 Price::from("1000.00"),
1168 Quantity::from(3),
1169 1,
1170 ),
1171 0,
1172 0,
1173 UnixNanos::from(1),
1174 UnixNanos::from(1),
1175 );
1176 let delta_sell_2 = OrderBookDelta::new(
1177 crypto_perpetual_ethusdt.id,
1178 BookAction::Add,
1179 BookOrder::new(
1180 OrderSide::Sell,
1181 Price::from("1001.00"),
1182 Quantity::from(1),
1183 1,
1184 ),
1185 0,
1186 1,
1187 UnixNanos::from(1),
1188 UnixNanos::from(1),
1189 );
1190 let orderbook_deltas = OrderBookDeltas::new(
1191 crypto_perpetual_ethusdt.id,
1192 vec![delta_sell_1, delta_sell_2],
1193 );
1194
1195 exchange
1197 .borrow_mut()
1198 .process_order_book_deltas(orderbook_deltas);
1199
1200 let book = exchange
1201 .borrow()
1202 .get_book(crypto_perpetual_ethusdt.id)
1203 .unwrap()
1204 .clone();
1205 assert_eq!(book.update_count, 2);
1206 assert_eq!(book.sequence, 1);
1207 assert_eq!(book.ts_last, UnixNanos::from(1));
1208 let best_bid_price = exchange
1209 .borrow()
1210 .best_bid_price(crypto_perpetual_ethusdt.id);
1211 assert_eq!(best_bid_price, None);
1213 let best_ask_price = exchange
1214 .borrow()
1215 .best_ask_price(crypto_perpetual_ethusdt.id);
1216 assert_eq!(best_ask_price, Some(Price::from("1000.00")));
1218 }
1219
1220 #[rstest]
1221 fn test_exchange_process_instrument_status(crypto_perpetual_ethusdt: CryptoPerpetual) {
1222 let exchange = get_exchange(
1223 Venue::new("BINANCE"),
1224 AccountType::Margin,
1225 BookType::L2_MBP,
1226 None,
1227 );
1228 let instrument = InstrumentAny::CryptoPerpetual(crypto_perpetual_ethusdt);
1229
1230 exchange.borrow_mut().add_instrument(instrument).unwrap();
1232
1233 let instrument_status = InstrumentStatus::new(
1234 crypto_perpetual_ethusdt.id,
1235 MarketStatusAction::Close, UnixNanos::from(1),
1237 UnixNanos::from(1),
1238 None,
1239 None,
1240 None,
1241 None,
1242 None,
1243 );
1244
1245 exchange
1246 .borrow_mut()
1247 .process_instrument_status(instrument_status);
1248
1249 let market_status = exchange
1250 .borrow()
1251 .get_matching_engine(&crypto_perpetual_ethusdt.id)
1252 .unwrap()
1253 .market_status;
1254 assert_eq!(market_status, MarketStatus::Closed);
1255 }
1256
1257 #[rstest]
1258 fn test_accounting() {
1259 let account_type = AccountType::Margin;
1260 let mut cache = Cache::default();
1261 let handler = get_message_saving_handler::<AccountState>(None);
1262 msgbus::register("Portfolio.update_account".into(), handler.clone());
1263 let margin_account = MarginAccount::new(
1264 AccountState::new(
1265 AccountId::from("SIM-001"),
1266 account_type,
1267 vec![AccountBalance::new(
1268 Money::from("1000 USD"),
1269 Money::from("0 USD"),
1270 Money::from("1000 USD"),
1271 )],
1272 vec![],
1273 false,
1274 UUID4::default(),
1275 UnixNanos::default(),
1276 UnixNanos::default(),
1277 None,
1278 ),
1279 false,
1280 );
1281 let () = cache
1282 .add_account(AccountAny::Margin(margin_account))
1283 .unwrap();
1284 cache.build_index();
1286
1287 let exchange = get_exchange(
1288 Venue::new("SIM"),
1289 account_type,
1290 BookType::L2_MBP,
1291 Some(Rc::new(RefCell::new(cache))),
1292 );
1293 exchange.borrow_mut().initialize_account();
1294
1295 exchange.borrow_mut().adjust_account(Money::from("500 USD"));
1297
1298 let messages = get_saved_messages::<AccountState>(handler);
1300 assert_eq!(messages.len(), 2);
1301 let account_state_first = messages.first().unwrap();
1302 let account_state_second = messages.last().unwrap();
1303
1304 assert_eq!(account_state_first.balances.len(), 1);
1305 let current_balance = account_state_first.balances[0];
1306 assert_eq!(current_balance.free, Money::new(1000.0, Currency::USD()));
1307 assert_eq!(current_balance.locked, Money::new(0.0, Currency::USD()));
1308 assert_eq!(current_balance.total, Money::new(1000.0, Currency::USD()));
1309
1310 assert_eq!(account_state_second.balances.len(), 1);
1311 let current_balance = account_state_second.balances[0];
1312 assert_eq!(current_balance.free, Money::new(1500.0, Currency::USD()));
1313 assert_eq!(current_balance.locked, Money::new(0.0, Currency::USD()));
1314 assert_eq!(current_balance.total, Money::new(1500.0, Currency::USD()));
1315 }
1316
1317 #[rstest]
1318 fn test_inflight_commands_binary_heap_ordering_respecting_timestamp_counter() {
1319 let inflight1 = InflightCommand::new(
1321 UnixNanos::from(100),
1322 1,
1323 create_submit_order_command(UnixNanos::from(100)),
1324 );
1325 let inflight2 = InflightCommand::new(
1326 UnixNanos::from(200),
1327 2,
1328 create_submit_order_command(UnixNanos::from(200)),
1329 );
1330 let inflight3 = InflightCommand::new(
1331 UnixNanos::from(100),
1332 2,
1333 create_submit_order_command(UnixNanos::from(100)),
1334 );
1335
1336 let mut inflight_heap = BinaryHeap::new();
1338 inflight_heap.push(inflight1);
1339 inflight_heap.push(inflight2);
1340 inflight_heap.push(inflight3);
1341
1342 let first = inflight_heap.pop().unwrap();
1345 let second = inflight_heap.pop().unwrap();
1346 let third = inflight_heap.pop().unwrap();
1347
1348 assert_eq!(first.ts, UnixNanos::from(100));
1349 assert_eq!(first.counter, 1);
1350 assert_eq!(second.ts, UnixNanos::from(100));
1351 assert_eq!(second.counter, 2);
1352 assert_eq!(third.ts, UnixNanos::from(200));
1353 assert_eq!(third.counter, 2);
1354 }
1355
1356 #[rstest]
1357 fn test_process_without_latency_model(crypto_perpetual_ethusdt: CryptoPerpetual) {
1358 let exchange = get_exchange(
1359 Venue::new("BINANCE"),
1360 AccountType::Margin,
1361 BookType::L2_MBP,
1362 None,
1363 );
1364
1365 let instrument = InstrumentAny::CryptoPerpetual(crypto_perpetual_ethusdt);
1366 exchange.borrow_mut().add_instrument(instrument).unwrap();
1367
1368 let command1 = create_submit_order_command(UnixNanos::from(100));
1369 let command2 = create_submit_order_command(UnixNanos::from(200));
1370
1371 exchange.borrow_mut().send(command1);
1372 exchange.borrow_mut().send(command2);
1373
1374 assert_eq!(exchange.borrow().message_queue.len(), 2);
1377 assert_eq!(exchange.borrow().inflight_queue.len(), 0);
1378
1379 exchange.borrow_mut().process(UnixNanos::from(300));
1381 assert_eq!(exchange.borrow().message_queue.len(), 0);
1382 assert_eq!(exchange.borrow().inflight_queue.len(), 0);
1383 }
1384
1385 #[rstest]
1386 fn test_process_with_latency_model(crypto_perpetual_ethusdt: CryptoPerpetual) {
1387 let latency_model = LatencyModel::new(
1388 UnixNanos::from(100),
1389 UnixNanos::from(200),
1390 UnixNanos::from(300),
1391 UnixNanos::from(100),
1392 );
1393 let exchange = get_exchange(
1394 Venue::new("BINANCE"),
1395 AccountType::Margin,
1396 BookType::L2_MBP,
1397 None,
1398 );
1399 exchange.borrow_mut().set_latency_model(latency_model);
1400
1401 let instrument = InstrumentAny::CryptoPerpetual(crypto_perpetual_ethusdt);
1402 exchange.borrow_mut().add_instrument(instrument).unwrap();
1403
1404 let command1 = create_submit_order_command(UnixNanos::from(100));
1405 let command2 = create_submit_order_command(UnixNanos::from(150));
1406 exchange.borrow_mut().send(command1);
1407 exchange.borrow_mut().send(command2);
1408
1409 assert_eq!(exchange.borrow().message_queue.len(), 0);
1411 assert_eq!(exchange.borrow().inflight_queue.len(), 2);
1412 assert_eq!(
1414 exchange.borrow().inflight_queue.iter().next().unwrap().ts,
1415 UnixNanos::from(300)
1416 );
1417 assert_eq!(
1419 exchange.borrow().inflight_queue.iter().nth(1).unwrap().ts,
1420 UnixNanos::from(350)
1421 );
1422
1423 exchange.borrow_mut().process(UnixNanos::from(320));
1425 assert_eq!(exchange.borrow().message_queue.len(), 0);
1426 assert_eq!(exchange.borrow().inflight_queue.len(), 1);
1427 assert_eq!(
1428 exchange.borrow().inflight_queue.iter().next().unwrap().ts,
1429 UnixNanos::from(350)
1430 );
1431 }
1432}