1#![allow(dead_code)]
20#![allow(unused_variables)]
21
22use std::{
23 cell::RefCell,
24 collections::{BinaryHeap, HashMap, VecDeque},
25 fmt::Debug,
26 rc::Rc,
27};
28
29use nautilus_common::{cache::Cache, clock::Clock, messages::execution::TradingCommand};
30use nautilus_core::{
31 UnixNanos,
32 correctness::{FAILED, check_equal},
33};
34use nautilus_execution::{
35 client::ExecutionClient,
36 matching_engine::{config::OrderMatchingEngineConfig, engine::OrderMatchingEngine},
37 models::{fee::FeeModelAny, fill::FillModel, latency::LatencyModel},
38};
39use nautilus_model::{
40 accounts::AccountAny,
41 data::{
42 Bar, Data, InstrumentStatus, OrderBookDelta, OrderBookDeltas, OrderBookDeltas_API,
43 QuoteTick, TradeTick,
44 },
45 enums::{AccountType, BookType, OmsType},
46 identifiers::{InstrumentId, Venue},
47 instruments::{Instrument, InstrumentAny},
48 orderbook::OrderBook,
49 orders::PassiveOrderAny,
50 types::{AccountBalance, Currency, Money, Price},
51};
52use rust_decimal::{Decimal, prelude::ToPrimitive};
53
54use crate::modules::SimulationModule;
55
/// A trading command that is held back until its simulated latency has elapsed.
///
/// Instances live in the exchange's `inflight_queue` heap. The `Ord` implementation in this
/// file compares `ts` and then `counter` in reverse, so the heap yields the earliest
/// command first.
#[derive(Debug, Eq, PartialEq)]
struct InflightCommand {
    /// Timestamp at which the command becomes eligible for processing.
    ts: UnixNanos,
    /// Sequence number distinguishing commands that share the same `ts`.
    counter: u32,
    /// The command to hand to the matching engine once `ts` is reached.
    command: TradingCommand,
}
65
66impl InflightCommand {
67 const fn new(ts: UnixNanos, counter: u32, command: TradingCommand) -> Self {
68 Self {
69 ts,
70 counter,
71 command,
72 }
73 }
74}
75
76impl Ord for InflightCommand {
77 fn cmp(&self, other: &Self) -> std::cmp::Ordering {
78 other
80 .ts
81 .cmp(&self.ts)
82 .then_with(|| other.counter.cmp(&self.counter))
83 }
84}
85
impl PartialOrd for InflightCommand {
    /// Delegates to [`Ord::cmp`], so the partial order is total and uses the same
    /// reversed `(ts, counter)` comparison.
    fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
        Some(self.cmp(other))
    }
}
91
/// A simulated trading venue used for backtesting.
///
/// Holds one [`OrderMatchingEngine`] per registered instrument, routes trading commands to
/// them (optionally via a message queue or a latency-delayed inflight queue) and generates
/// account state through the registered execution client.
pub struct SimulatedExchange {
    /// The venue this exchange simulates; instruments added must belong to it.
    pub id: Venue,
    /// Order management system type forwarded to every matching engine.
    pub oms_type: OmsType,
    /// Account type; `Cash` accounts cannot add crypto perpetuals or futures.
    pub account_type: AccountType,
    /// Balances used to build the initial account state.
    starting_balances: Vec<Money>,
    /// Order book type forwarded to every matching engine.
    book_type: BookType,
    /// Leverage applied as the margin account default when account state is regenerated.
    default_leverage: Decimal,
    /// Execution client set through `register_client`; `None` until registered.
    exec_client: Option<Rc<dyn ExecutionClient>>,
    /// When set, only a single starting balance is accepted by `new`.
    pub base_currency: Option<Currency>,
    /// Fee model cloned into each matching engine.
    fee_model: FeeModelAny,
    /// Fill model cloned into each matching engine.
    fill_model: FillModel,
    /// When present, `send` delays commands through the inflight queue.
    latency_model: Option<LatencyModel>,
    /// Instruments added to the exchange, keyed by instrument ID.
    instruments: HashMap<InstrumentId, InstrumentAny>,
    /// One matching engine per added instrument.
    matching_engines: HashMap<InstrumentId, OrderMatchingEngine>,
    /// Instrument-specific leverages applied when account state is regenerated.
    leverages: HashMap<InstrumentId, Decimal>,
    /// Simulation modules that see incoming market data before the matching engines.
    modules: Vec<Box<dyn SimulationModule>>,
    /// Clock used to timestamp generated account state and shared with matching engines.
    clock: Rc<RefCell<dyn Clock>>,
    /// Cache used to look up instruments and accounts.
    cache: Rc<RefCell<Cache>>,
    /// Commands queued by `send` (no latency model) until the next `process` call.
    message_queue: VecDeque<TradingCommand>,
    /// Latency-delayed commands ordered by release timestamp, then counter.
    inflight_queue: BinaryHeap<InflightCommand>,
    /// Number of inflight commands generated per release timestamp.
    inflight_counter: HashMap<UnixNanos, u32>,
    /// When `true`, `adjust_account` does nothing.
    frozen_account: bool,
    /// Forwarded to the matching engine configuration.
    bar_execution: bool,
    /// Forwarded to the matching engine configuration.
    reject_stop_orders: bool,
    /// Forwarded to the matching engine configuration.
    support_gtd_orders: bool,
    /// Forwarded to the matching engine configuration.
    support_contingent_orders: bool,
    /// Forwarded to the matching engine configuration.
    use_position_ids: bool,
    /// Forwarded to the matching engine configuration.
    use_random_ids: bool,
    /// Forwarded to the matching engine configuration.
    use_reduce_only: bool,
    /// When `false`, `send` processes commands immediately instead of queueing them.
    use_message_queue: bool,
}
138
139impl Debug for SimulatedExchange {
140 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
141 f.debug_struct(stringify!(SimulatedExchange))
142 .field("id", &self.id)
143 .field("account_type", &self.account_type)
144 .finish()
145 }
146}
147
148impl SimulatedExchange {
149 #[allow(clippy::too_many_arguments)]
157 pub fn new(
158 venue: Venue,
159 oms_type: OmsType,
160 account_type: AccountType,
161 starting_balances: Vec<Money>,
162 base_currency: Option<Currency>,
163 default_leverage: Decimal,
164 leverages: HashMap<InstrumentId, Decimal>,
165 modules: Vec<Box<dyn SimulationModule>>,
166 cache: Rc<RefCell<Cache>>,
167 clock: Rc<RefCell<dyn Clock>>,
168 fill_model: FillModel,
169 fee_model: FeeModelAny,
170 book_type: BookType,
171 latency_model: Option<LatencyModel>,
172 frozen_account: Option<bool>,
173 bar_execution: Option<bool>,
174 reject_stop_orders: Option<bool>,
175 support_gtd_orders: Option<bool>,
176 support_contingent_orders: Option<bool>,
177 use_position_ids: Option<bool>,
178 use_random_ids: Option<bool>,
179 use_reduce_only: Option<bool>,
180 use_message_queue: Option<bool>,
181 ) -> anyhow::Result<Self> {
182 if starting_balances.is_empty() {
183 anyhow::bail!("Starting balances must be provided")
184 }
185 if base_currency.is_some() && starting_balances.len() > 1 {
186 anyhow::bail!("single-currency account has multiple starting currencies")
187 }
188 Ok(Self {
190 id: venue,
191 oms_type,
192 account_type,
193 starting_balances,
194 book_type,
195 default_leverage,
196 exec_client: None,
197 base_currency,
198 fee_model,
199 fill_model,
200 latency_model,
201 instruments: HashMap::new(),
202 matching_engines: HashMap::new(),
203 leverages,
204 modules,
205 clock,
206 cache,
207 message_queue: VecDeque::new(),
208 inflight_queue: BinaryHeap::new(),
209 inflight_counter: HashMap::new(),
210 frozen_account: frozen_account.unwrap_or(false),
211 bar_execution: bar_execution.unwrap_or(true),
212 reject_stop_orders: reject_stop_orders.unwrap_or(true),
213 support_gtd_orders: support_gtd_orders.unwrap_or(true),
214 support_contingent_orders: support_contingent_orders.unwrap_or(true),
215 use_position_ids: use_position_ids.unwrap_or(true),
216 use_random_ids: use_random_ids.unwrap_or(false),
217 use_reduce_only: use_reduce_only.unwrap_or(true),
218 use_message_queue: use_message_queue.unwrap_or(true),
219 })
220 }
221
    /// Registers the execution client used for account lookups and account state
    /// generation, replacing any previously registered client.
    pub fn register_client(&mut self, client: Rc<dyn ExecutionClient>) {
        self.exec_client = Some(client);
    }
225
226 pub fn set_fill_model(&mut self, fill_model: FillModel) {
227 for matching_engine in self.matching_engines.values_mut() {
228 matching_engine.set_fill_model(fill_model.clone());
229 log::info!(
230 "Setting fill model for {} to {}",
231 matching_engine.venue,
232 self.fill_model
233 );
234 }
235 self.fill_model = fill_model;
236 }
237
    /// Sets the latency model; once set, `send` routes queued commands through the
    /// inflight queue instead of the plain message queue.
    pub const fn set_latency_model(&mut self, latency_model: LatencyModel) {
        self.latency_model = Some(latency_model);
    }
241
    /// Initializes the account by generating a fresh account state from the starting
    /// balances (see `generate_fresh_account_state`).
    pub fn initialize_account(&mut self) {
        self.generate_fresh_account_state();
    }
245
246 pub fn add_instrument(&mut self, instrument: InstrumentAny) -> anyhow::Result<()> {
256 check_equal(
257 &instrument.id().venue,
258 &self.id,
259 "Venue of instrument id",
260 "Venue of simulated exchange",
261 )
262 .expect(FAILED);
263
264 if self.account_type == AccountType::Cash
265 && (matches!(instrument, InstrumentAny::CryptoPerpetual(_))
266 || matches!(instrument, InstrumentAny::CryptoFuture(_)))
267 {
268 anyhow::bail!("Cash account cannot trade futures or perpetuals")
269 }
270
271 self.instruments.insert(instrument.id(), instrument.clone());
272
273 let matching_engine_config = OrderMatchingEngineConfig::new(
274 self.bar_execution,
275 self.reject_stop_orders,
276 self.support_gtd_orders,
277 self.support_contingent_orders,
278 self.use_position_ids,
279 self.use_random_ids,
280 self.use_reduce_only,
281 );
282 let instrument_id = instrument.id();
283 let matching_engine = OrderMatchingEngine::new(
284 instrument,
285 self.instruments.len() as u32,
286 self.fill_model.clone(),
287 self.fee_model.clone(),
288 self.book_type,
289 self.oms_type,
290 self.account_type,
291 self.clock.clone(),
292 Rc::clone(&self.cache),
293 matching_engine_config,
294 );
295 self.matching_engines.insert(instrument_id, matching_engine);
296
297 log::info!("Added instrument {instrument_id} and created matching engine");
298 Ok(())
299 }
300
301 #[must_use]
302 pub fn best_bid_price(&self, instrument_id: InstrumentId) -> Option<Price> {
303 self.matching_engines
304 .get(&instrument_id)
305 .and_then(OrderMatchingEngine::best_bid_price)
306 }
307
308 #[must_use]
309 pub fn best_ask_price(&self, instrument_id: InstrumentId) -> Option<Price> {
310 self.matching_engines
311 .get(&instrument_id)
312 .and_then(OrderMatchingEngine::best_ask_price)
313 }
314
315 pub fn get_book(&self, instrument_id: InstrumentId) -> Option<&OrderBook> {
316 self.matching_engines
317 .get(&instrument_id)
318 .map(OrderMatchingEngine::get_book)
319 }
320
    /// Returns the matching engine registered for `instrument_id`, or `None` if the
    /// instrument has not been added.
    #[must_use]
    pub fn get_matching_engine(
        &self,
        instrument_id: &InstrumentId,
    ) -> Option<&OrderMatchingEngine> {
        self.matching_engines.get(instrument_id)
    }
328
    /// Returns all matching engines keyed by instrument ID.
    #[must_use]
    pub const fn get_matching_engines(&self) -> &HashMap<InstrumentId, OrderMatchingEngine> {
        &self.matching_engines
    }
333
334 #[must_use]
335 pub fn get_books(&self) -> HashMap<InstrumentId, OrderBook> {
336 let mut books = HashMap::new();
337 for (instrument_id, matching_engine) in &self.matching_engines {
338 books.insert(*instrument_id, matching_engine.get_book().clone());
339 }
340 books
341 }
342
343 #[must_use]
344 pub fn get_open_orders(&self, instrument_id: Option<InstrumentId>) -> Vec<PassiveOrderAny> {
345 instrument_id
346 .and_then(|id| {
347 self.matching_engines
348 .get(&id)
349 .map(OrderMatchingEngine::get_open_orders)
350 })
351 .unwrap_or_else(|| {
352 self.matching_engines
353 .values()
354 .flat_map(OrderMatchingEngine::get_open_orders)
355 .collect()
356 })
357 }
358
359 #[must_use]
360 pub fn get_open_bid_orders(&self, instrument_id: Option<InstrumentId>) -> Vec<PassiveOrderAny> {
361 instrument_id
362 .and_then(|id| {
363 self.matching_engines
364 .get(&id)
365 .map(|engine| engine.get_open_bid_orders().to_vec())
366 })
367 .unwrap_or_else(|| {
368 self.matching_engines
369 .values()
370 .flat_map(|engine| engine.get_open_bid_orders().to_vec())
371 .collect()
372 })
373 }
374
375 #[must_use]
376 pub fn get_open_ask_orders(&self, instrument_id: Option<InstrumentId>) -> Vec<PassiveOrderAny> {
377 instrument_id
378 .and_then(|id| {
379 self.matching_engines
380 .get(&id)
381 .map(|engine| engine.get_open_ask_orders().to_vec())
382 })
383 .unwrap_or_else(|| {
384 self.matching_engines
385 .values()
386 .flat_map(|engine| engine.get_open_ask_orders().to_vec())
387 .collect()
388 })
389 }
390
391 #[must_use]
395 pub fn get_account(&self) -> Option<AccountAny> {
396 self.exec_client
397 .as_ref()
398 .map(|client| client.get_account().unwrap())
399 }
400
401 pub fn adjust_account(&mut self, adjustment: Money) {
405 if self.frozen_account {
406 return;
408 }
409
410 if let Some(exec_client) = &self.exec_client {
411 let venue = exec_client.venue();
412 println!("Adjusting account for venue {venue}");
413 if let Some(account) = self.cache.borrow().account_for_venue(&venue) {
414 match account.balance(Some(adjustment.currency)) {
415 Some(balance) => {
416 let mut current_balance = *balance;
417 current_balance.total += adjustment;
418 current_balance.free += adjustment;
419
420 let margins = match account {
421 AccountAny::Margin(margin_account) => margin_account.margins.clone(),
422 _ => HashMap::new(),
423 };
424
425 if let Some(exec_client) = &self.exec_client {
426 exec_client
427 .generate_account_state(
428 vec![current_balance],
429 margins.values().copied().collect(),
430 true,
431 self.clock.borrow().timestamp_ns(),
432 )
433 .unwrap();
434 }
435 }
436 None => {
437 log::error!(
438 "Cannot adjust account: no balance for currency {}",
439 adjustment.currency
440 );
441 }
442 }
443 } else {
444 log::error!("Cannot adjust account: no account for venue {venue}");
445 }
446 }
447 }
448
449 pub fn send(&mut self, command: TradingCommand) {
450 if !self.use_message_queue {
451 self.process_trading_command(command);
452 } else if self.latency_model.is_none() {
453 self.message_queue.push_back(command);
454 } else {
455 let (ts, counter) = self.generate_inflight_command(&command);
456 self.inflight_queue
457 .push(InflightCommand::new(ts, counter, command));
458 }
459 }
460
461 pub fn generate_inflight_command(&mut self, command: &TradingCommand) -> (UnixNanos, u32) {
465 if let Some(latency_model) = &self.latency_model {
466 let ts = match command {
467 TradingCommand::SubmitOrder(_) | TradingCommand::SubmitOrderList(_) => {
468 command.ts_init() + latency_model.insert_latency_nanos
469 }
470 TradingCommand::ModifyOrder(_) => {
471 command.ts_init() + latency_model.update_latency_nanos
472 }
473 TradingCommand::CancelOrder(_)
474 | TradingCommand::CancelAllOrders(_)
475 | TradingCommand::BatchCancelOrders(_) => {
476 command.ts_init() + latency_model.delete_latency_nanos
477 }
478 _ => panic!("Invalid command was {command}"),
479 };
480
481 let counter = self
482 .inflight_counter
483 .entry(ts)
484 .and_modify(|e| *e += 1)
485 .or_insert(1);
486
487 (ts, *counter)
488 } else {
489 panic!("Latency model should be initialized");
490 }
491 }
492
493 pub fn process_order_book_delta(&mut self, delta: OrderBookDelta) {
497 for module in &self.modules {
498 module.pre_process(Data::Delta(delta));
499 }
500
501 if !self.matching_engines.contains_key(&delta.instrument_id) {
502 let instrument = {
503 let cache = self.cache.as_ref().borrow();
504 cache.instrument(&delta.instrument_id).cloned()
505 };
506
507 if let Some(instrument) = instrument {
508 self.add_instrument(instrument).unwrap();
509 } else {
510 panic!(
511 "No matching engine found for instrument {}",
512 delta.instrument_id
513 );
514 }
515 }
516
517 if let Some(matching_engine) = self.matching_engines.get_mut(&delta.instrument_id) {
518 matching_engine.process_order_book_delta(&delta);
519 } else {
520 panic!("Matching engine should be initialized");
521 }
522 }
523
524 pub fn process_order_book_deltas(&mut self, deltas: OrderBookDeltas) {
528 for module in &self.modules {
529 module.pre_process(Data::Deltas(OrderBookDeltas_API::new(deltas.clone())));
530 }
531
532 if !self.matching_engines.contains_key(&deltas.instrument_id) {
533 let instrument = {
534 let cache = self.cache.as_ref().borrow();
535 cache.instrument(&deltas.instrument_id).cloned()
536 };
537
538 if let Some(instrument) = instrument {
539 self.add_instrument(instrument).unwrap();
540 } else {
541 panic!(
542 "No matching engine found for instrument {}",
543 deltas.instrument_id
544 );
545 }
546 }
547
548 if let Some(matching_engine) = self.matching_engines.get_mut(&deltas.instrument_id) {
549 matching_engine.process_order_book_deltas(&deltas);
550 } else {
551 panic!("Matching engine should be initialized");
552 }
553 }
554
555 pub fn process_quote_tick(&mut self, quote: &QuoteTick) {
559 for module in &self.modules {
560 module.pre_process(Data::Quote(quote.to_owned()));
561 }
562
563 if !self.matching_engines.contains_key("e.instrument_id) {
564 let instrument = {
565 let cache = self.cache.as_ref().borrow();
566 cache.instrument("e.instrument_id).cloned()
567 };
568
569 if let Some(instrument) = instrument {
570 self.add_instrument(instrument).unwrap();
571 } else {
572 panic!(
573 "No matching engine found for instrument {}",
574 quote.instrument_id
575 );
576 }
577 }
578
579 if let Some(matching_engine) = self.matching_engines.get_mut("e.instrument_id) {
580 matching_engine.process_quote_tick(quote);
581 } else {
582 panic!("Matching engine should be initialized");
583 }
584 }
585
586 pub fn process_trade_tick(&mut self, trade: &TradeTick) {
590 for module in &self.modules {
591 module.pre_process(Data::Trade(trade.to_owned()));
592 }
593
594 if !self.matching_engines.contains_key(&trade.instrument_id) {
595 let instrument = {
596 let cache = self.cache.as_ref().borrow();
597 cache.instrument(&trade.instrument_id).cloned()
598 };
599
600 if let Some(instrument) = instrument {
601 self.add_instrument(instrument).unwrap();
602 } else {
603 panic!(
604 "No matching engine found for instrument {}",
605 trade.instrument_id
606 );
607 }
608 }
609
610 if let Some(matching_engine) = self.matching_engines.get_mut(&trade.instrument_id) {
611 matching_engine.process_trade_tick(trade);
612 } else {
613 panic!("Matching engine should be initialized");
614 }
615 }
616
617 pub fn process_bar(&mut self, bar: Bar) {
621 for module in &self.modules {
622 module.pre_process(Data::Bar(bar));
623 }
624
625 if !self.matching_engines.contains_key(&bar.instrument_id()) {
626 let instrument = {
627 let cache = self.cache.as_ref().borrow();
628 cache.instrument(&bar.instrument_id()).cloned()
629 };
630
631 if let Some(instrument) = instrument {
632 self.add_instrument(instrument).unwrap();
633 } else {
634 panic!(
635 "No matching engine found for instrument {}",
636 bar.instrument_id()
637 );
638 }
639 }
640
641 if let Some(matching_engine) = self.matching_engines.get_mut(&bar.instrument_id()) {
642 matching_engine.process_bar(&bar);
643 } else {
644 panic!("Matching engine should be initialized");
645 }
646 }
647
648 pub fn process_instrument_status(&mut self, status: InstrumentStatus) {
652 if !self.matching_engines.contains_key(&status.instrument_id) {
655 let instrument = {
656 let cache = self.cache.as_ref().borrow();
657 cache.instrument(&status.instrument_id).cloned()
658 };
659
660 if let Some(instrument) = instrument {
661 self.add_instrument(instrument).unwrap();
662 } else {
663 panic!(
664 "No matching engine found for instrument {}",
665 status.instrument_id
666 );
667 }
668 }
669
670 if let Some(matching_engine) = self.matching_engines.get_mut(&status.instrument_id) {
671 matching_engine.process_status(status.action);
672 } else {
673 panic!("Matching engine should be initialized");
674 }
675 }
676
677 pub fn process(&mut self, ts_now: UnixNanos) {
681 while let Some(inflight) = self.inflight_queue.peek() {
685 if inflight.ts > ts_now {
686 break;
688 }
689 let inflight = self.inflight_queue.pop().unwrap();
691 self.process_trading_command(inflight.command);
692 }
693
694 while let Some(command) = self.message_queue.pop_front() {
696 self.process_trading_command(command);
697 }
698 }
699
700 pub fn reset(&mut self) {
701 for module in &self.modules {
702 module.reset();
703 }
704
705 self.generate_fresh_account_state();
706
707 for matching_engine in self.matching_engines.values_mut() {
708 matching_engine.reset();
709 }
710
711 log::info!("Resetting exchange state");
713 }
714
715 pub fn process_trading_command(&mut self, command: TradingCommand) {
719 if let Some(matching_engine) = self.matching_engines.get_mut(&command.instrument_id()) {
720 let account_id = if let Some(exec_client) = &self.exec_client {
721 exec_client.account_id()
722 } else {
723 panic!("Execution client should be initialized");
724 };
725 match command {
726 TradingCommand::SubmitOrder(mut command) => {
727 matching_engine.process_order(&mut command.order, account_id);
728 }
729 TradingCommand::ModifyOrder(ref command) => {
730 matching_engine.process_modify(command, account_id);
731 }
732 TradingCommand::CancelOrder(ref command) => {
733 matching_engine.process_cancel(command, account_id);
734 }
735 TradingCommand::CancelAllOrders(ref command) => {
736 matching_engine.process_cancel_all(command, account_id);
737 }
738 TradingCommand::BatchCancelOrders(ref command) => {
739 matching_engine.process_batch_cancel(command, account_id);
740 }
741 TradingCommand::SubmitOrderList(mut command) => {
742 for order in &mut command.order_list.orders {
743 matching_engine.process_order(order, account_id);
744 }
745 }
746 _ => {}
747 }
748 } else {
749 panic!("Matching engine should be initialized");
750 }
751 }
752
753 pub fn generate_fresh_account_state(&self) {
757 let balances: Vec<AccountBalance> = self
758 .starting_balances
759 .iter()
760 .map(|money| AccountBalance::new(*money, Money::zero(money.currency), *money))
761 .collect();
762
763 if let Some(exec_client) = &self.exec_client {
764 exec_client
765 .generate_account_state(balances, vec![], true, self.clock.borrow().timestamp_ns())
766 .unwrap();
767 }
768
769 if let Some(AccountAny::Margin(mut margin_account)) = self.get_account() {
771 margin_account.set_default_leverage(self.default_leverage.to_f64().unwrap());
772
773 for (instrument_id, leverage) in &self.leverages {
775 margin_account.set_leverage(*instrument_id, leverage.to_f64().unwrap());
776 }
777 }
778 }
779}
780
781#[cfg(test)]
785mod tests {
786 use std::{
787 cell::RefCell,
788 collections::{BinaryHeap, HashMap},
789 rc::Rc,
790 sync::LazyLock,
791 };
792
793 use nautilus_common::{
794 cache::Cache,
795 clock::TestClock,
796 messages::execution::{SubmitOrder, TradingCommand},
797 msgbus::{
798 self,
799 stubs::{get_message_saving_handler, get_saved_messages},
800 },
801 };
802 use nautilus_core::{AtomicTime, UUID4, UnixNanos};
803 use nautilus_execution::models::{
804 fee::{FeeModelAny, MakerTakerFeeModel},
805 fill::FillModel,
806 latency::LatencyModel,
807 };
808 use nautilus_model::{
809 accounts::{AccountAny, MarginAccount},
810 data::{
811 Bar, BarType, BookOrder, InstrumentStatus, OrderBookDelta, OrderBookDeltas, QuoteTick,
812 TradeTick,
813 },
814 enums::{
815 AccountType, AggressorSide, BookAction, BookType, MarketStatus, MarketStatusAction,
816 OmsType, OrderSide, OrderType,
817 },
818 events::AccountState,
819 identifiers::{
820 AccountId, ClientId, ClientOrderId, InstrumentId, StrategyId, TradeId, TraderId, Venue,
821 VenueOrderId,
822 },
823 instruments::{CryptoPerpetual, InstrumentAny, stubs::crypto_perpetual_ethusdt},
824 orders::OrderTestBuilder,
825 types::{AccountBalance, Currency, Money, Price, Quantity},
826 };
827 use rstest::rstest;
828
829 use crate::{
830 exchange::{InflightCommand, SimulatedExchange},
831 execution_client::BacktestExecutionClient,
832 };
833
    // Lazily initialized `AtomicTime` shared by the tests in this module, built from the
    // flag `true` and a default `UnixNanos`.
    // NOTE(review): not referenced by the tests visible in this chunk — confirm it is
    // still needed further down the file.
    static ATOMIC_TIME: LazyLock<AtomicTime> =
        LazyLock::new(|| AtomicTime::new(true, UnixNanos::default()));
836
837 fn get_exchange(
838 venue: Venue,
839 account_type: AccountType,
840 book_type: BookType,
841 cache: Option<Rc<RefCell<Cache>>>,
842 ) -> Rc<RefCell<SimulatedExchange>> {
843 let cache = cache.unwrap_or(Rc::new(RefCell::new(Cache::default())));
844 let clock = Rc::new(RefCell::new(TestClock::new()));
845 let exchange = Rc::new(RefCell::new(
846 SimulatedExchange::new(
847 venue,
848 OmsType::Netting,
849 account_type,
850 vec![Money::new(1000.0, Currency::USD())],
851 None,
852 1.into(),
853 HashMap::new(),
854 vec![],
855 cache.clone(),
856 clock,
857 FillModel::default(),
858 FeeModelAny::MakerTaker(MakerTakerFeeModel),
859 book_type,
860 None,
861 None,
862 None,
863 None,
864 None,
865 None,
866 None,
867 None,
868 None,
869 None,
870 )
871 .unwrap(),
872 ));
873
874 let clock = TestClock::new();
875 let execution_client = BacktestExecutionClient::new(
876 TraderId::default(),
877 AccountId::default(),
878 exchange.clone(),
879 cache,
880 Rc::new(RefCell::new(clock)),
881 None,
882 None,
883 );
884 exchange
885 .borrow_mut()
886 .register_client(Rc::new(execution_client));
887
888 exchange
889 }
890
891 fn create_submit_order_command(ts_init: UnixNanos) -> TradingCommand {
892 let instrument_id = InstrumentId::from("ETHUSDT-PERP.BINANCE");
893 let order = OrderTestBuilder::new(OrderType::Market)
894 .instrument_id(instrument_id)
895 .quantity(Quantity::from(1))
896 .build();
897 TradingCommand::SubmitOrder(
898 SubmitOrder::new(
899 TraderId::default(),
900 ClientId::default(),
901 StrategyId::default(),
902 instrument_id,
903 ClientOrderId::default(),
904 VenueOrderId::default(),
905 order,
906 None,
907 None,
908 UUID4::default(),
909 ts_init,
910 )
911 .unwrap(),
912 )
913 }
914
915 #[rstest]
916 #[should_panic(
917 expected = r#"Condition failed: 'Venue of instrument id' value of BINANCE was not equal to 'Venue of simulated exchange' value of SIM"#
918 )]
919 fn test_venue_mismatch_between_exchange_and_instrument(
920 crypto_perpetual_ethusdt: CryptoPerpetual,
921 ) {
922 let exchange = get_exchange(
923 Venue::new("SIM"),
924 AccountType::Margin,
925 BookType::L1_MBP,
926 None,
927 );
928 let instrument = InstrumentAny::CryptoPerpetual(crypto_perpetual_ethusdt);
929 exchange.borrow_mut().add_instrument(instrument).unwrap();
930 }
931
932 #[rstest]
933 #[should_panic(expected = "Cash account cannot trade futures or perpetuals")]
934 fn test_cash_account_trading_futures_or_perpetuals(crypto_perpetual_ethusdt: CryptoPerpetual) {
935 let exchange = get_exchange(
936 Venue::new("BINANCE"),
937 AccountType::Cash,
938 BookType::L1_MBP,
939 None,
940 );
941 let instrument = InstrumentAny::CryptoPerpetual(crypto_perpetual_ethusdt);
942 exchange.borrow_mut().add_instrument(instrument).unwrap();
943 }
944
945 #[rstest]
946 fn test_exchange_process_quote_tick(crypto_perpetual_ethusdt: CryptoPerpetual) {
947 let exchange = get_exchange(
948 Venue::new("BINANCE"),
949 AccountType::Margin,
950 BookType::L1_MBP,
951 None,
952 );
953 let instrument = InstrumentAny::CryptoPerpetual(crypto_perpetual_ethusdt);
954
955 exchange.borrow_mut().add_instrument(instrument).unwrap();
957
958 let quote_tick = QuoteTick::new(
960 crypto_perpetual_ethusdt.id,
961 Price::from("1000"),
962 Price::from("1001"),
963 Quantity::from(1),
964 Quantity::from(1),
965 UnixNanos::default(),
966 UnixNanos::default(),
967 );
968 exchange.borrow_mut().process_quote_tick("e_tick);
969
970 let best_bid_price = exchange
971 .borrow()
972 .best_bid_price(crypto_perpetual_ethusdt.id);
973 assert_eq!(best_bid_price, Some(Price::from("1000")));
974 let best_ask_price = exchange
975 .borrow()
976 .best_ask_price(crypto_perpetual_ethusdt.id);
977 assert_eq!(best_ask_price, Some(Price::from("1001")));
978 }
979
980 #[rstest]
981 fn test_exchange_process_trade_tick(crypto_perpetual_ethusdt: CryptoPerpetual) {
982 let exchange = get_exchange(
983 Venue::new("BINANCE"),
984 AccountType::Margin,
985 BookType::L1_MBP,
986 None,
987 );
988 let instrument = InstrumentAny::CryptoPerpetual(crypto_perpetual_ethusdt);
989
990 exchange.borrow_mut().add_instrument(instrument).unwrap();
992
993 let trade_tick = TradeTick::new(
995 crypto_perpetual_ethusdt.id,
996 Price::from("1000"),
997 Quantity::from(1),
998 AggressorSide::Buyer,
999 TradeId::from("1"),
1000 UnixNanos::default(),
1001 UnixNanos::default(),
1002 );
1003 exchange.borrow_mut().process_trade_tick(&trade_tick);
1004
1005 let best_bid_price = exchange
1006 .borrow()
1007 .best_bid_price(crypto_perpetual_ethusdt.id);
1008 assert_eq!(best_bid_price, Some(Price::from("1000")));
1009 let best_ask = exchange
1010 .borrow()
1011 .best_ask_price(crypto_perpetual_ethusdt.id);
1012 assert_eq!(best_ask, Some(Price::from("1000")));
1013 }
1014
1015 #[rstest]
1016 fn test_exchange_process_bar_last_bar_spec(crypto_perpetual_ethusdt: CryptoPerpetual) {
1017 let exchange = get_exchange(
1018 Venue::new("BINANCE"),
1019 AccountType::Margin,
1020 BookType::L1_MBP,
1021 None,
1022 );
1023 let instrument = InstrumentAny::CryptoPerpetual(crypto_perpetual_ethusdt);
1024
1025 exchange.borrow_mut().add_instrument(instrument).unwrap();
1027
1028 let bar = Bar::new(
1030 BarType::from("ETHUSDT-PERP.BINANCE-1-MINUTE-LAST-EXTERNAL"),
1031 Price::from("1500.00"),
1032 Price::from("1505.00"),
1033 Price::from("1490.00"),
1034 Price::from("1502.00"),
1035 Quantity::from(100),
1036 UnixNanos::default(),
1037 UnixNanos::default(),
1038 );
1039 exchange.borrow_mut().process_bar(bar);
1040
1041 let best_bid_price = exchange
1043 .borrow()
1044 .best_bid_price(crypto_perpetual_ethusdt.id);
1045 assert_eq!(best_bid_price, Some(Price::from("1502.00")));
1046 let best_ask_price = exchange
1047 .borrow()
1048 .best_ask_price(crypto_perpetual_ethusdt.id);
1049 assert_eq!(best_ask_price, Some(Price::from("1502.00")));
1050 }
1051
1052 #[rstest]
1053 fn test_exchange_process_bar_bid_ask_bar_spec(crypto_perpetual_ethusdt: CryptoPerpetual) {
1054 let exchange = get_exchange(
1055 Venue::new("BINANCE"),
1056 AccountType::Margin,
1057 BookType::L1_MBP,
1058 None,
1059 );
1060 let instrument = InstrumentAny::CryptoPerpetual(crypto_perpetual_ethusdt);
1061
1062 exchange.borrow_mut().add_instrument(instrument).unwrap();
1064
1065 let bar_bid = Bar::new(
1068 BarType::from("ETHUSDT-PERP.BINANCE-1-MINUTE-BID-EXTERNAL"),
1069 Price::from("1500.00"),
1070 Price::from("1505.00"),
1071 Price::from("1490.00"),
1072 Price::from("1502.00"),
1073 Quantity::from(100),
1074 UnixNanos::from(1),
1075 UnixNanos::from(1),
1076 );
1077 let bar_ask = Bar::new(
1078 BarType::from("ETHUSDT-PERP.BINANCE-1-MINUTE-ASK-EXTERNAL"),
1079 Price::from("1501.00"),
1080 Price::from("1506.00"),
1081 Price::from("1491.00"),
1082 Price::from("1503.00"),
1083 Quantity::from(100),
1084 UnixNanos::from(1),
1085 UnixNanos::from(1),
1086 );
1087
1088 exchange.borrow_mut().process_bar(bar_bid);
1090 exchange.borrow_mut().process_bar(bar_ask);
1091
1092 let best_bid_price = exchange
1094 .borrow()
1095 .best_bid_price(crypto_perpetual_ethusdt.id);
1096 assert_eq!(best_bid_price, Some(Price::from("1502.00")));
1097 let best_ask_price = exchange
1098 .borrow()
1099 .best_ask_price(crypto_perpetual_ethusdt.id);
1100 assert_eq!(best_ask_price, Some(Price::from("1503.00")));
1101 }
1102
1103 #[rstest]
1104 fn test_exchange_process_orderbook_delta(crypto_perpetual_ethusdt: CryptoPerpetual) {
1105 let exchange = get_exchange(
1106 Venue::new("BINANCE"),
1107 AccountType::Margin,
1108 BookType::L2_MBP,
1109 None,
1110 );
1111 let instrument = InstrumentAny::CryptoPerpetual(crypto_perpetual_ethusdt);
1112
1113 exchange.borrow_mut().add_instrument(instrument).unwrap();
1115
1116 let delta_buy = OrderBookDelta::new(
1118 crypto_perpetual_ethusdt.id,
1119 BookAction::Add,
1120 BookOrder::new(OrderSide::Buy, Price::from("1000.00"), Quantity::from(1), 1),
1121 0,
1122 0,
1123 UnixNanos::from(1),
1124 UnixNanos::from(1),
1125 );
1126 let delta_sell = OrderBookDelta::new(
1127 crypto_perpetual_ethusdt.id,
1128 BookAction::Add,
1129 BookOrder::new(
1130 OrderSide::Sell,
1131 Price::from("1001.00"),
1132 Quantity::from(1),
1133 1,
1134 ),
1135 0,
1136 1,
1137 UnixNanos::from(2),
1138 UnixNanos::from(2),
1139 );
1140
1141 exchange.borrow_mut().process_order_book_delta(delta_buy);
1143 exchange.borrow_mut().process_order_book_delta(delta_sell);
1144
1145 let book = exchange
1146 .borrow()
1147 .get_book(crypto_perpetual_ethusdt.id)
1148 .unwrap()
1149 .clone();
1150 assert_eq!(book.update_count, 2);
1151 assert_eq!(book.sequence, 1);
1152 assert_eq!(book.ts_last, UnixNanos::from(2));
1153 let best_bid_price = exchange
1154 .borrow()
1155 .best_bid_price(crypto_perpetual_ethusdt.id);
1156 assert_eq!(best_bid_price, Some(Price::from("1000.00")));
1157 let best_ask_price = exchange
1158 .borrow()
1159 .best_ask_price(crypto_perpetual_ethusdt.id);
1160 assert_eq!(best_ask_price, Some(Price::from("1001.00")));
1161 }
1162
1163 #[rstest]
1164 fn test_exchange_process_orderbook_deltas(crypto_perpetual_ethusdt: CryptoPerpetual) {
1165 let exchange = get_exchange(
1166 Venue::new("BINANCE"),
1167 AccountType::Margin,
1168 BookType::L2_MBP,
1169 None,
1170 );
1171 let instrument = InstrumentAny::CryptoPerpetual(crypto_perpetual_ethusdt);
1172
1173 exchange.borrow_mut().add_instrument(instrument).unwrap();
1175
1176 let delta_sell_1 = OrderBookDelta::new(
1178 crypto_perpetual_ethusdt.id,
1179 BookAction::Add,
1180 BookOrder::new(
1181 OrderSide::Sell,
1182 Price::from("1000.00"),
1183 Quantity::from(3),
1184 1,
1185 ),
1186 0,
1187 0,
1188 UnixNanos::from(1),
1189 UnixNanos::from(1),
1190 );
1191 let delta_sell_2 = OrderBookDelta::new(
1192 crypto_perpetual_ethusdt.id,
1193 BookAction::Add,
1194 BookOrder::new(
1195 OrderSide::Sell,
1196 Price::from("1001.00"),
1197 Quantity::from(1),
1198 1,
1199 ),
1200 0,
1201 1,
1202 UnixNanos::from(1),
1203 UnixNanos::from(1),
1204 );
1205 let orderbook_deltas = OrderBookDeltas::new(
1206 crypto_perpetual_ethusdt.id,
1207 vec![delta_sell_1, delta_sell_2],
1208 );
1209
1210 exchange
1212 .borrow_mut()
1213 .process_order_book_deltas(orderbook_deltas);
1214
1215 let book = exchange
1216 .borrow()
1217 .get_book(crypto_perpetual_ethusdt.id)
1218 .unwrap()
1219 .clone();
1220 assert_eq!(book.update_count, 2);
1221 assert_eq!(book.sequence, 1);
1222 assert_eq!(book.ts_last, UnixNanos::from(1));
1223 let best_bid_price = exchange
1224 .borrow()
1225 .best_bid_price(crypto_perpetual_ethusdt.id);
1226 assert_eq!(best_bid_price, None);
1228 let best_ask_price = exchange
1229 .borrow()
1230 .best_ask_price(crypto_perpetual_ethusdt.id);
1231 assert_eq!(best_ask_price, Some(Price::from("1000.00")));
1233 }
1234
1235 #[rstest]
1236 fn test_exchange_process_instrument_status(crypto_perpetual_ethusdt: CryptoPerpetual) {
1237 let exchange = get_exchange(
1238 Venue::new("BINANCE"),
1239 AccountType::Margin,
1240 BookType::L2_MBP,
1241 None,
1242 );
1243 let instrument = InstrumentAny::CryptoPerpetual(crypto_perpetual_ethusdt);
1244
1245 exchange.borrow_mut().add_instrument(instrument).unwrap();
1247
1248 let instrument_status = InstrumentStatus::new(
1249 crypto_perpetual_ethusdt.id,
1250 MarketStatusAction::Close, UnixNanos::from(1),
1252 UnixNanos::from(1),
1253 None,
1254 None,
1255 None,
1256 None,
1257 None,
1258 );
1259
1260 exchange
1261 .borrow_mut()
1262 .process_instrument_status(instrument_status);
1263
1264 let market_status = exchange
1265 .borrow()
1266 .get_matching_engine(&crypto_perpetual_ethusdt.id)
1267 .unwrap()
1268 .market_status;
1269 assert_eq!(market_status, MarketStatus::Closed);
1270 }
1271
1272 #[rstest]
1273 fn test_accounting() {
1274 let account_type = AccountType::Margin;
1275 let mut cache = Cache::default();
1276 let handler = get_message_saving_handler::<AccountState>(None);
1277 msgbus::register("Portfolio.update_account".into(), handler.clone());
1278 let margin_account = MarginAccount::new(
1279 AccountState::new(
1280 AccountId::from("SIM-001"),
1281 account_type,
1282 vec![AccountBalance::new(
1283 Money::from("1000 USD"),
1284 Money::from("0 USD"),
1285 Money::from("1000 USD"),
1286 )],
1287 vec![],
1288 false,
1289 UUID4::default(),
1290 UnixNanos::default(),
1291 UnixNanos::default(),
1292 None,
1293 ),
1294 false,
1295 );
1296 let () = cache
1297 .add_account(AccountAny::Margin(margin_account))
1298 .unwrap();
1299 cache.build_index();
1301
1302 let exchange = get_exchange(
1303 Venue::new("SIM"),
1304 account_type,
1305 BookType::L2_MBP,
1306 Some(Rc::new(RefCell::new(cache))),
1307 );
1308 exchange.borrow_mut().initialize_account();
1309
1310 exchange.borrow_mut().adjust_account(Money::from("500 USD"));
1312
1313 let messages = get_saved_messages::<AccountState>(handler);
1315 assert_eq!(messages.len(), 2);
1316 let account_state_first = messages.first().unwrap();
1317 let account_state_second = messages.last().unwrap();
1318
1319 assert_eq!(account_state_first.balances.len(), 1);
1320 let current_balance = account_state_first.balances[0];
1321 assert_eq!(current_balance.free, Money::new(1000.0, Currency::USD()));
1322 assert_eq!(current_balance.locked, Money::new(0.0, Currency::USD()));
1323 assert_eq!(current_balance.total, Money::new(1000.0, Currency::USD()));
1324
1325 assert_eq!(account_state_second.balances.len(), 1);
1326 let current_balance = account_state_second.balances[0];
1327 assert_eq!(current_balance.free, Money::new(1500.0, Currency::USD()));
1328 assert_eq!(current_balance.locked, Money::new(0.0, Currency::USD()));
1329 assert_eq!(current_balance.total, Money::new(1500.0, Currency::USD()));
1330 }
1331
1332 #[rstest]
1333 fn test_inflight_commands_binary_heap_ordering_respecting_timestamp_counter() {
1334 let inflight1 = InflightCommand::new(
1336 UnixNanos::from(100),
1337 1,
1338 create_submit_order_command(UnixNanos::from(100)),
1339 );
1340 let inflight2 = InflightCommand::new(
1341 UnixNanos::from(200),
1342 2,
1343 create_submit_order_command(UnixNanos::from(200)),
1344 );
1345 let inflight3 = InflightCommand::new(
1346 UnixNanos::from(100),
1347 2,
1348 create_submit_order_command(UnixNanos::from(100)),
1349 );
1350
1351 let mut inflight_heap = BinaryHeap::new();
1353 inflight_heap.push(inflight1);
1354 inflight_heap.push(inflight2);
1355 inflight_heap.push(inflight3);
1356
1357 let first = inflight_heap.pop().unwrap();
1360 let second = inflight_heap.pop().unwrap();
1361 let third = inflight_heap.pop().unwrap();
1362
1363 assert_eq!(first.ts, UnixNanos::from(100));
1364 assert_eq!(first.counter, 1);
1365 assert_eq!(second.ts, UnixNanos::from(100));
1366 assert_eq!(second.counter, 2);
1367 assert_eq!(third.ts, UnixNanos::from(200));
1368 assert_eq!(third.counter, 2);
1369 }
1370
1371 #[rstest]
1372 fn test_process_without_latency_model(crypto_perpetual_ethusdt: CryptoPerpetual) {
1373 let exchange = get_exchange(
1374 Venue::new("BINANCE"),
1375 AccountType::Margin,
1376 BookType::L2_MBP,
1377 None,
1378 );
1379
1380 let instrument = InstrumentAny::CryptoPerpetual(crypto_perpetual_ethusdt);
1381 exchange.borrow_mut().add_instrument(instrument).unwrap();
1382
1383 let command1 = create_submit_order_command(UnixNanos::from(100));
1384 let command2 = create_submit_order_command(UnixNanos::from(200));
1385
1386 exchange.borrow_mut().send(command1);
1387 exchange.borrow_mut().send(command2);
1388
1389 assert_eq!(exchange.borrow().message_queue.len(), 2);
1392 assert_eq!(exchange.borrow().inflight_queue.len(), 0);
1393
1394 exchange.borrow_mut().process(UnixNanos::from(300));
1396 assert_eq!(exchange.borrow().message_queue.len(), 0);
1397 assert_eq!(exchange.borrow().inflight_queue.len(), 0);
1398 }
1399
1400 #[rstest]
1401 fn test_process_with_latency_model(crypto_perpetual_ethusdt: CryptoPerpetual) {
1402 let latency_model = LatencyModel::new(
1403 UnixNanos::from(100),
1404 UnixNanos::from(200),
1405 UnixNanos::from(300),
1406 UnixNanos::from(100),
1407 );
1408 let exchange = get_exchange(
1409 Venue::new("BINANCE"),
1410 AccountType::Margin,
1411 BookType::L2_MBP,
1412 None,
1413 );
1414 exchange.borrow_mut().set_latency_model(latency_model);
1415
1416 let instrument = InstrumentAny::CryptoPerpetual(crypto_perpetual_ethusdt);
1417 exchange.borrow_mut().add_instrument(instrument).unwrap();
1418
1419 let command1 = create_submit_order_command(UnixNanos::from(100));
1420 let command2 = create_submit_order_command(UnixNanos::from(150));
1421 exchange.borrow_mut().send(command1);
1422 exchange.borrow_mut().send(command2);
1423
1424 assert_eq!(exchange.borrow().message_queue.len(), 0);
1426 assert_eq!(exchange.borrow().inflight_queue.len(), 2);
1427 assert_eq!(
1429 exchange.borrow().inflight_queue.iter().next().unwrap().ts,
1430 UnixNanos::from(300)
1431 );
1432 assert_eq!(
1434 exchange.borrow().inflight_queue.iter().nth(1).unwrap().ts,
1435 UnixNanos::from(350)
1436 );
1437
1438 exchange.borrow_mut().process(UnixNanos::from(320));
1440 assert_eq!(exchange.borrow().message_queue.len(), 0);
1441 assert_eq!(exchange.borrow().inflight_queue.len(), 1);
1442 assert_eq!(
1443 exchange.borrow().inflight_queue.iter().next().unwrap().ts,
1444 UnixNanos::from(350)
1445 );
1446 }
1447}