1#![allow(dead_code)]
18#![allow(unused_variables)]
19
20use std::{
21 any::Any,
22 cell::RefCell,
23 cmp::min,
24 collections::HashMap,
25 fmt::Debug,
26 ops::{Add, Sub},
27 rc::Rc,
28};
29
30use chrono::TimeDelta;
31use nautilus_common::{
32 cache::Cache,
33 clock::Clock,
34 messages::execution::{BatchCancelOrders, CancelAllOrders, CancelOrder, ModifyOrder},
35 msgbus,
36};
37use nautilus_core::{UUID4, UnixNanos};
38use nautilus_model::{
39 data::{Bar, BarType, OrderBookDelta, OrderBookDeltas, QuoteTick, TradeTick, order::BookOrder},
40 enums::{
41 AccountType, AggregationSource, AggressorSide, BarAggregation, BookType, ContingencyType,
42 LiquiditySide, MarketStatus, MarketStatusAction, OmsType, OrderSide, OrderSideSpecified,
43 OrderStatus, OrderType, PriceType, TimeInForce,
44 },
45 events::{
46 OrderAccepted, OrderCancelRejected, OrderCanceled, OrderEventAny, OrderExpired,
47 OrderFilled, OrderModifyRejected, OrderRejected, OrderTriggered, OrderUpdated,
48 },
49 identifiers::{
50 AccountId, ClientOrderId, InstrumentId, PositionId, StrategyId, TraderId, Venue,
51 VenueOrderId,
52 },
53 instruments::{EXPIRING_INSTRUMENT_TYPES, Instrument, InstrumentAny},
54 orderbook::OrderBook,
55 orders::{Order, OrderAny, PassiveOrderAny, StopOrderAny},
56 position::Position,
57 types::{Currency, Money, Price, Quantity, fixed::FIXED_PRECISION},
58};
59use ustr::Ustr;
60
61use crate::{
62 matching_core::OrderMatchingCore,
63 matching_engine::{config::OrderMatchingEngineConfig, ids_generator::IdsGenerator},
64 models::{
65 fee::{FeeModel, FeeModelAny},
66 fill::FillModel,
67 },
68 trailing::trailing_stop_calculate,
69};
70
71pub struct OrderMatchingEngine {
73 pub venue: Venue,
75 pub instrument: InstrumentAny,
77 pub raw_id: u32,
79 pub book_type: BookType,
81 pub oms_type: OmsType,
83 pub account_type: AccountType,
85 pub market_status: MarketStatus,
87 pub config: OrderMatchingEngineConfig,
89 clock: Rc<RefCell<dyn Clock>>,
90 cache: Rc<RefCell<Cache>>,
91 book: OrderBook,
92 pub core: OrderMatchingCore,
93 fill_model: FillModel,
94 fee_model: FeeModelAny,
95 target_bid: Option<Price>,
96 target_ask: Option<Price>,
97 target_last: Option<Price>,
98 last_bar_bid: Option<Bar>,
99 last_bar_ask: Option<Bar>,
100 execution_bar_types: HashMap<InstrumentId, BarType>,
101 execution_bar_deltas: HashMap<BarType, TimeDelta>,
102 account_ids: HashMap<TraderId, AccountId>,
103 cached_filled_qty: HashMap<ClientOrderId, Quantity>,
104 ids_generator: IdsGenerator,
105}
106
107impl Debug for OrderMatchingEngine {
108 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
109 f.debug_struct(stringify!(OrderMatchingEngine))
110 .field("venue", &self.venue)
111 .field("instrument", &self.instrument.id())
112 .finish()
113 }
114}
115
116impl OrderMatchingEngine {
117 #[allow(clippy::too_many_arguments)]
118 pub fn new(
119 instrument: InstrumentAny,
120 raw_id: u32,
121 fill_model: FillModel,
122 fee_model: FeeModelAny,
123 book_type: BookType,
124 oms_type: OmsType,
125 account_type: AccountType,
126 clock: Rc<RefCell<dyn Clock>>,
127 cache: Rc<RefCell<Cache>>,
128 config: OrderMatchingEngineConfig,
129 ) -> Self {
130 let book = OrderBook::new(instrument.id(), book_type);
131 let core = OrderMatchingCore::new(
132 instrument.id(),
133 instrument.price_increment(),
134 None, None, None, );
138 let ids_generator = IdsGenerator::new(
139 instrument.id().venue,
140 oms_type,
141 raw_id,
142 config.use_random_ids,
143 config.use_position_ids,
144 cache.clone(),
145 );
146
147 Self {
148 venue: instrument.id().venue,
149 instrument,
150 raw_id,
151 fill_model,
152 fee_model,
153 book_type,
154 oms_type,
155 account_type,
156 clock,
157 cache,
158 book,
159 core,
160 market_status: MarketStatus::Open,
161 config,
162 target_bid: None,
163 target_ask: None,
164 target_last: None,
165 last_bar_bid: None,
166 last_bar_ask: None,
167 execution_bar_types: HashMap::new(),
168 execution_bar_deltas: HashMap::new(),
169 account_ids: HashMap::new(),
170 cached_filled_qty: HashMap::new(),
171 ids_generator,
172 }
173 }
174
175 pub fn reset(&mut self) {
176 self.book.clear(0, UnixNanos::default());
177 self.execution_bar_types.clear();
178 self.execution_bar_deltas.clear();
179 self.account_ids.clear();
180 self.cached_filled_qty.clear();
181 self.core.reset();
182 self.target_bid = None;
183 self.target_ask = None;
184 self.target_last = None;
185 self.ids_generator.reset();
186
187 log::info!("Reset {}", self.instrument.id());
188 }
189
190 pub const fn set_fill_model(&mut self, fill_model: FillModel) {
191 self.fill_model = fill_model;
192 }
193
194 #[must_use]
195 pub fn best_bid_price(&self) -> Option<Price> {
196 self.book.best_bid_price()
197 }
198
199 #[must_use]
200 pub fn best_ask_price(&self) -> Option<Price> {
201 self.book.best_ask_price()
202 }
203
204 #[must_use]
205 pub const fn get_book(&self) -> &OrderBook {
206 &self.book
207 }
208
209 #[must_use]
210 pub const fn get_open_bid_orders(&self) -> &[PassiveOrderAny] {
211 self.core.get_orders_bid()
212 }
213
214 #[must_use]
215 pub const fn get_open_ask_orders(&self) -> &[PassiveOrderAny] {
216 self.core.get_orders_ask()
217 }
218
219 #[must_use]
220 pub fn get_open_orders(&self) -> Vec<PassiveOrderAny> {
221 let mut orders = Vec::new();
223 orders.extend_from_slice(self.core.get_orders_bid());
224 orders.extend_from_slice(self.core.get_orders_ask());
225 orders
226 }
227
228 #[must_use]
229 pub fn order_exists(&self, client_order_id: ClientOrderId) -> bool {
230 self.core.order_exists(client_order_id)
231 }
232
233 pub fn process_order_book_delta(&mut self, delta: &OrderBookDelta) {
237 log::debug!("Processing {delta}");
238
239 if self.book_type == BookType::L2_MBP || self.book_type == BookType::L3_MBO {
240 self.book.apply_delta(delta);
241 }
242
243 self.iterate(delta.ts_event);
244 }
245
246 pub fn process_order_book_deltas(&mut self, deltas: &OrderBookDeltas) {
247 log::debug!("Processing {deltas}");
248
249 if self.book_type == BookType::L2_MBP || self.book_type == BookType::L3_MBO {
250 self.book.apply_deltas(deltas);
251 }
252
253 self.iterate(deltas.ts_event);
254 }
255
256 pub fn process_quote_tick(&mut self, quote: &QuoteTick) {
260 log::debug!("Processing {quote}");
261
262 if self.book_type == BookType::L1_MBP {
263 self.book.update_quote_tick(quote).unwrap();
264 }
265
266 self.iterate(quote.ts_event);
267 }
268
269 pub fn process_bar(&mut self, bar: &Bar) {
273 log::debug!("Processing {bar}");
274
275 if !self.config.bar_execution || self.book_type != BookType::L1_MBP {
277 return;
278 }
279
280 let bar_type = bar.bar_type;
281 if bar_type.aggregation_source() == AggregationSource::Internal {
283 return;
284 }
285
286 if bar_type.spec().aggregation == BarAggregation::Month {
288 return;
289 }
290
291 let execution_bar_type =
292 if let Some(execution_bar_type) = self.execution_bar_types.get(&bar.instrument_id()) {
293 execution_bar_type.to_owned()
294 } else {
295 self.execution_bar_types
296 .insert(bar.instrument_id(), bar_type);
297 self.execution_bar_deltas
298 .insert(bar_type, bar_type.spec().timedelta());
299 bar_type
300 };
301
302 if execution_bar_type != bar_type {
303 let mut bar_type_timedelta = self.execution_bar_deltas.get(&bar_type).copied();
304 if bar_type_timedelta.is_none() {
305 bar_type_timedelta = Some(bar_type.spec().timedelta());
306 self.execution_bar_deltas
307 .insert(bar_type, bar_type_timedelta.unwrap());
308 }
309 if self.execution_bar_deltas.get(&execution_bar_type).unwrap()
310 >= &bar_type_timedelta.unwrap()
311 {
312 self.execution_bar_types
313 .insert(bar_type.instrument_id(), bar_type);
314 } else {
315 return;
316 }
317 }
318
319 match bar_type.spec().price_type {
320 PriceType::Last | PriceType::Mid => self.process_trade_ticks_from_bar(bar),
321 PriceType::Bid => {
322 self.last_bar_bid = Some(bar.to_owned());
323 self.process_quote_ticks_from_bar(bar);
324 }
325 PriceType::Ask => {
326 self.last_bar_ask = Some(bar.to_owned());
327 self.process_quote_ticks_from_bar(bar);
328 }
329 PriceType::Mark => panic!("Not implemented"),
330 }
331 }
332
333 fn process_trade_ticks_from_bar(&mut self, bar: &Bar) {
334 let size = Quantity::new(bar.volume.as_f64() / 4.0, bar.volume.precision);
336 let aggressor_side = if !self.core.is_last_initialized || bar.open > self.core.last.unwrap()
337 {
338 AggressorSide::Buyer
339 } else {
340 AggressorSide::Seller
341 };
342
343 let mut trade_tick = TradeTick::new(
345 bar.instrument_id(),
346 bar.open,
347 size,
348 aggressor_side,
349 self.ids_generator.generate_trade_id(),
350 bar.ts_event,
351 bar.ts_event,
352 );
353
354 if !self.core.is_last_initialized {
357 self.book.update_trade_tick(&trade_tick).unwrap();
358 self.iterate(trade_tick.ts_init);
359 self.core.set_last_raw(trade_tick.price);
360 }
361
362 if self.core.last.is_some_and(|last| bar.high > last) {
365 trade_tick.price = bar.high;
366 trade_tick.aggressor_side = AggressorSide::Buyer;
367 trade_tick.trade_id = self.ids_generator.generate_trade_id();
368
369 self.book.update_trade_tick(&trade_tick).unwrap();
370 self.iterate(trade_tick.ts_init);
371
372 self.core.set_last_raw(trade_tick.price);
373 }
374
375 if self.core.last.is_some_and(|last| bar.low < last) {
379 trade_tick.price = bar.low;
380 trade_tick.aggressor_side = AggressorSide::Seller;
381 trade_tick.trade_id = self.ids_generator.generate_trade_id();
382
383 self.book.update_trade_tick(&trade_tick).unwrap();
384 self.iterate(trade_tick.ts_init);
385
386 self.core.set_last_raw(trade_tick.price);
387 }
388
389 if self.core.last.is_some_and(|last| bar.close != last) {
394 trade_tick.price = bar.close;
395 trade_tick.aggressor_side = if bar.close > self.core.last.unwrap() {
396 AggressorSide::Buyer
397 } else {
398 AggressorSide::Seller
399 };
400 trade_tick.trade_id = self.ids_generator.generate_trade_id();
401
402 self.book.update_trade_tick(&trade_tick).unwrap();
403 self.iterate(trade_tick.ts_init);
404
405 self.core.set_last_raw(trade_tick.price);
406 }
407 }
408
409 fn process_quote_ticks_from_bar(&mut self, bar: &Bar) {
410 if self.last_bar_bid.is_none()
412 || self.last_bar_ask.is_none()
413 || self.last_bar_bid.unwrap().ts_event != self.last_bar_ask.unwrap().ts_event
414 {
415 return;
416 }
417 let bid_bar = self.last_bar_bid.unwrap();
418 let ask_bar = self.last_bar_ask.unwrap();
419 let bid_size = Quantity::new(bid_bar.volume.as_f64() / 4.0, bar.volume.precision);
420 let ask_size = Quantity::new(ask_bar.volume.as_f64() / 4.0, bar.volume.precision);
421
422 let mut quote_tick = QuoteTick::new(
424 self.book.instrument_id,
425 bid_bar.open,
426 ask_bar.open,
427 bid_size,
428 ask_size,
429 bid_bar.ts_init,
430 bid_bar.ts_init,
431 );
432
433 self.book.update_quote_tick("e_tick).unwrap();
435 self.iterate(quote_tick.ts_init);
436
437 quote_tick.bid_price = bid_bar.high;
439 quote_tick.ask_price = ask_bar.high;
440 self.book.update_quote_tick("e_tick).unwrap();
441 self.iterate(quote_tick.ts_init);
442
443 quote_tick.bid_price = bid_bar.low;
445 quote_tick.ask_price = ask_bar.low;
446 self.book.update_quote_tick("e_tick).unwrap();
447 self.iterate(quote_tick.ts_init);
448
449 quote_tick.bid_price = bid_bar.close;
451 quote_tick.ask_price = ask_bar.close;
452 self.book.update_quote_tick("e_tick).unwrap();
453 self.iterate(quote_tick.ts_init);
454
455 self.last_bar_bid = None;
457 self.last_bar_ask = None;
458 }
459
460 pub fn process_trade_tick(&mut self, trade: &TradeTick) {
464 log::debug!("Processing {trade}");
465
466 if self.book_type == BookType::L1_MBP {
467 self.book.update_trade_tick(trade).unwrap();
468 }
469 self.core.set_last_raw(trade.price);
470
471 self.iterate(trade.ts_event);
472 }
473
474 pub fn process_status(&mut self, action: MarketStatusAction) {
475 log::debug!("Processing {action}");
476
477 if self.market_status == MarketStatus::Closed
479 && (action == MarketStatusAction::Trading || action == MarketStatusAction::PreOpen)
480 {
481 self.market_status = MarketStatus::Open;
482 }
483 if self.market_status == MarketStatus::Open && action == MarketStatusAction::Pause {
485 self.market_status = MarketStatus::Paused;
486 }
487 if self.market_status == MarketStatus::Open && action == MarketStatusAction::Suspend {
489 self.market_status = MarketStatus::Suspended;
490 }
491 if self.market_status == MarketStatus::Open
493 && (action == MarketStatusAction::Halt || action == MarketStatusAction::Close)
494 {
495 self.market_status = MarketStatus::Closed;
496 }
497 }
498
499 #[allow(clippy::needless_return)]
505 pub fn process_order(&mut self, order: &mut OrderAny, account_id: AccountId) {
506 {
508 let cache_borrow = self.cache.as_ref().borrow();
509
510 if self.core.order_exists(order.client_order_id()) {
511 self.generate_order_rejected(order, "Order already exists".into());
512 return;
513 }
514
515 self.account_ids.insert(order.trader_id(), account_id);
517
518 if EXPIRING_INSTRUMENT_TYPES.contains(&self.instrument.instrument_class()) {
520 if let Some(activation_ns) = self.instrument.activation_ns() {
521 if self.clock.borrow().timestamp_ns() < activation_ns {
522 self.generate_order_rejected(
523 order,
524 format!(
525 "Contract {} is not yet active, activation {}",
526 self.instrument.id(),
527 self.instrument.activation_ns().unwrap()
528 )
529 .into(),
530 );
531 return;
532 }
533 }
534 if let Some(expiration_ns) = self.instrument.expiration_ns() {
535 if self.clock.borrow().timestamp_ns() >= expiration_ns {
536 self.generate_order_rejected(
537 order,
538 format!(
539 "Contract {} has expired, expiration {}",
540 self.instrument.id(),
541 self.instrument.expiration_ns().unwrap()
542 )
543 .into(),
544 );
545 return;
546 }
547 }
548 }
549
550 if self.config.support_contingent_orders {
552 if let Some(parent_order_id) = order.parent_order_id() {
553 let parent_order = cache_borrow.order(&parent_order_id);
554 if parent_order.is_none()
555 || parent_order.unwrap().contingency_type().unwrap() != ContingencyType::Oto
556 {
557 panic!("OTO parent not found");
558 }
559 if let Some(parent_order) = parent_order {
560 let parent_order_status = parent_order.status();
561 let order_is_open = order.is_open();
562 if parent_order.status() == OrderStatus::Rejected && order.is_open() {
563 self.generate_order_rejected(
564 order,
565 format!("Rejected OTO order from {parent_order_id}").into(),
566 );
567 return;
568 } else if parent_order.status() == OrderStatus::Accepted
569 && parent_order.status() == OrderStatus::Triggered
570 {
571 log::info!(
572 "Pending OTO order {} triggers from {parent_order_id}",
573 order.client_order_id(),
574 );
575 return;
576 }
577 }
578 }
579
580 if let Some(linked_order_ids) = order.linked_order_ids() {
581 for client_order_id in linked_order_ids {
582 match cache_borrow.order(client_order_id) {
583 Some(contingent_order)
584 if (order.contingency_type().unwrap() == ContingencyType::Oco
585 || order.contingency_type().unwrap()
586 == ContingencyType::Ouo)
587 && !order.is_closed()
588 && contingent_order.is_closed() =>
589 {
590 self.generate_order_rejected(
591 order,
592 format!("Contingent order {client_order_id} already closed")
593 .into(),
594 );
595 return;
596 }
597 None => panic!("Cannot find contingent order for {client_order_id}"),
598 _ => {}
599 }
600 }
601 }
602 }
603
604 if order.quantity().precision != self.instrument.size_precision() {
606 self.generate_order_rejected(
607 order,
608 format!(
609 "Invalid order quantity precision for order {}, was {} when {} size precision is {}",
610 order.client_order_id(),
611 order.quantity().precision,
612 self.instrument.id(),
613 self.instrument.size_precision()
614 )
615 .into(),
616 );
617 return;
618 }
619
620 if let Some(price) = order.price() {
622 if price.precision != self.instrument.price_precision() {
623 self.generate_order_rejected(
624 order,
625 format!(
626 "Invalid order price precision for order {}, was {} when {} price precision is {}",
627 order.client_order_id(),
628 price.precision,
629 self.instrument.id(),
630 self.instrument.price_precision()
631 )
632 .into(),
633 );
634 return;
635 }
636 }
637
638 if let Some(trigger_price) = order.trigger_price() {
640 if trigger_price.precision != self.instrument.price_precision() {
641 self.generate_order_rejected(
642 order,
643 format!(
644 "Invalid order trigger price precision for order {}, was {} when {} price precision is {}",
645 order.client_order_id(),
646 trigger_price.precision,
647 self.instrument.id(),
648 self.instrument.price_precision()
649 )
650 .into(),
651 );
652 return;
653 }
654 }
655
656 let position: Option<&Position> = cache_borrow
658 .position_for_order(&order.client_order_id())
659 .or_else(|| {
660 if self.oms_type == OmsType::Netting {
661 let position_id = PositionId::new(
662 format!("{}-{}", order.instrument_id(), order.strategy_id()).as_str(),
663 );
664 cache_borrow.position(&position_id)
665 } else {
666 None
667 }
668 });
669
670 if order.order_side() == OrderSide::Sell
672 && self.account_type != AccountType::Margin
673 && matches!(self.instrument, InstrumentAny::Equity(_))
674 && (position.is_none()
675 || !order.would_reduce_only(position.unwrap().side, position.unwrap().quantity))
676 {
677 let position_string = position.map_or("None".to_string(), |pos| pos.id.to_string());
678 self.generate_order_rejected(
679 order,
680 format!(
681 "Short selling not permitted on a CASH account with position {position_string} and order {order}",
682 )
683 .into(),
684 );
685 return;
686 }
687
688 if self.config.use_reduce_only
690 && order.is_reduce_only()
691 && !order.is_closed()
692 && position.is_none_or(|pos| {
693 pos.is_closed()
694 || (order.is_buy() && pos.is_long())
695 || (order.is_sell() && pos.is_short())
696 })
697 {
698 self.generate_order_rejected(
699 order,
700 format!(
701 "Reduce-only order {} ({}-{}) would have increased position",
702 order.client_order_id(),
703 order.order_type().to_string().to_uppercase(),
704 order.order_side().to_string().to_uppercase()
705 )
706 .into(),
707 );
708 return;
709 }
710 }
711
712 match order.order_type() {
713 OrderType::Market => self.process_market_order(order),
714 OrderType::Limit => self.process_limit_order(order),
715 OrderType::MarketToLimit => self.process_market_to_limit_order(order),
716 OrderType::StopMarket => self.process_stop_market_order(order),
717 OrderType::StopLimit => self.process_stop_limit_order(order),
718 OrderType::MarketIfTouched => self.process_market_if_touched_order(order),
719 OrderType::LimitIfTouched => self.process_limit_if_touched_order(order),
720 OrderType::TrailingStopMarket => self.process_trailing_stop_order(order),
721 OrderType::TrailingStopLimit => self.process_trailing_stop_order(order),
722 }
723 }
724
725 pub fn process_modify(&mut self, command: &ModifyOrder, account_id: AccountId) {
726 if let Some(order) = self.core.get_order(command.client_order_id) {
727 self.update_order(
728 &mut order.to_any(),
729 command.quantity,
730 command.price,
731 command.trigger_price,
732 None,
733 );
734 } else {
735 self.generate_order_modify_rejected(
736 command.trader_id,
737 command.strategy_id,
738 command.instrument_id,
739 command.client_order_id,
740 Ustr::from(format!("Order {} not found", command.client_order_id).as_str()),
741 Some(command.venue_order_id),
742 Some(account_id),
743 );
744 }
745 }
746
747 pub fn process_cancel(&mut self, command: &CancelOrder, account_id: AccountId) {
748 match self.core.get_order(command.client_order_id) {
749 Some(passive_order) => {
750 if passive_order.is_inflight() || passive_order.is_open() {
751 self.cancel_order(&OrderAny::from(passive_order.to_owned()), None);
752 }
753 }
754 None => self.generate_order_cancel_rejected(
755 command.trader_id,
756 command.strategy_id,
757 account_id,
758 command.instrument_id,
759 command.client_order_id,
760 command.venue_order_id,
761 Ustr::from(format!("Order {} not found", command.client_order_id).as_str()),
762 ),
763 }
764 }
765
766 pub fn process_cancel_all(&mut self, command: &CancelAllOrders, account_id: AccountId) {
767 let open_orders = self
768 .cache
769 .borrow()
770 .orders_open(None, Some(&command.instrument_id), None, None)
771 .into_iter()
772 .cloned()
773 .collect::<Vec<OrderAny>>();
774 for order in open_orders {
775 if command.order_side != OrderSide::NoOrderSide
776 && command.order_side != order.order_side()
777 {
778 continue;
779 }
780 if order.is_inflight() || order.is_open() {
781 self.cancel_order(&order, None);
782 }
783 }
784 }
785
786 pub fn process_batch_cancel(&mut self, command: &BatchCancelOrders, account_id: AccountId) {
787 for order in &command.cancels {
788 self.process_cancel(order, account_id);
789 }
790 }
791
792 fn process_market_order(&mut self, order: &mut OrderAny) {
793 if order.time_in_force() == TimeInForce::AtTheOpen
794 || order.time_in_force() == TimeInForce::AtTheClose
795 {
796 log::error!(
797 "Market auction for the time in force {} is currently not supported",
798 order.time_in_force()
799 );
800 return;
801 }
802
803 let order_side = order.order_side();
805 let is_ask_initialized = self.core.is_ask_initialized;
806 let is_bid_initialized = self.core.is_bid_initialized;
807 if (order.order_side() == OrderSide::Buy && !self.core.is_ask_initialized)
808 || (order.order_side() == OrderSide::Sell && !self.core.is_bid_initialized)
809 {
810 self.generate_order_rejected(
811 order,
812 format!("No market for {}", order.instrument_id()).into(),
813 );
814 return;
815 }
816
817 self.fill_market_order(order);
818 }
819
820 fn process_limit_order(&mut self, order: &mut OrderAny) {
821 let limit_px = order.price().expect("Limit order must have a price");
822 if order.is_post_only()
823 && self
824 .core
825 .is_limit_matched(order.order_side_specified(), limit_px)
826 {
827 self.generate_order_rejected(
828 order,
829 format!(
830 "POST_ONLY {} {} order limit px of {} would have been a TAKER: bid={}, ask={}",
831 order.order_type(),
832 order.order_side(),
833 order.price().unwrap(),
834 self.core
835 .bid
836 .map_or_else(|| "None".to_string(), |p| p.to_string()),
837 self.core
838 .ask
839 .map_or_else(|| "None".to_string(), |p| p.to_string())
840 )
841 .into(),
842 );
843 return;
844 }
845
846 self.accept_order(order);
848
849 if self
851 .core
852 .is_limit_matched(order.order_side_specified(), limit_px)
853 {
854 if order.liquidity_side().is_some()
856 && order.liquidity_side().unwrap() == LiquiditySide::NoLiquiditySide
857 {
858 order.set_liquidity_side(LiquiditySide::Taker);
859 }
860 self.fill_limit_order(order);
861 } else if matches!(order.time_in_force(), TimeInForce::Fok | TimeInForce::Ioc) {
862 self.cancel_order(order, None);
863 }
864 }
865
866 fn process_market_to_limit_order(&mut self, order: &mut OrderAny) {
867 if (order.order_side() == OrderSide::Buy && !self.core.is_ask_initialized)
869 || (order.order_side() == OrderSide::Sell && !self.core.is_bid_initialized)
870 {
871 self.generate_order_rejected(
872 order,
873 format!("No market for {}", order.instrument_id()).into(),
874 );
875 return;
876 }
877
878 self.fill_market_order(order);
880
881 if order.is_open() {
882 self.accept_order(order);
883 }
884 }
885
886 fn process_stop_market_order(&mut self, order: &mut OrderAny) {
887 let stop_px = order
888 .trigger_price()
889 .expect("Stop order must have a trigger price");
890 if self
891 .core
892 .is_stop_matched(order.order_side_specified(), stop_px)
893 {
894 if self.config.reject_stop_orders {
895 self.generate_order_rejected(
896 order,
897 format!(
898 "{} {} order stop px of {} was in the market: bid={}, ask={}, but rejected because of configuration",
899 order.order_type(),
900 order.order_side(),
901 order.trigger_price().unwrap(),
902 self.core
903 .bid
904 .map_or_else(|| "None".to_string(), |p| p.to_string()),
905 self.core
906 .ask
907 .map_or_else(|| "None".to_string(), |p| p.to_string())
908 ).into(),
909 );
910 return;
911 }
912 self.fill_market_order(order);
913 return;
914 }
915
916 self.accept_order(order);
918 }
919
920 fn process_stop_limit_order(&mut self, order: &mut OrderAny) {
921 let stop_px = order
922 .trigger_price()
923 .expect("Stop order must have a trigger price");
924 if self
925 .core
926 .is_stop_matched(order.order_side_specified(), stop_px)
927 {
928 if self.config.reject_stop_orders {
929 self.generate_order_rejected(
930 order,
931 format!(
932 "{} {} order stop px of {} was in the market: bid={}, ask={}, but rejected because of configuration",
933 order.order_type(),
934 order.order_side(),
935 order.trigger_price().unwrap(),
936 self.core
937 .bid
938 .map_or_else(|| "None".to_string(), |p| p.to_string()),
939 self.core
940 .ask
941 .map_or_else(|| "None".to_string(), |p| p.to_string())
942 ).into(),
943 );
944 return;
945 }
946
947 self.accept_order(order);
948 self.generate_order_triggered(order);
949
950 let limit_px = order.price().expect("Stop limit order must have a price");
952 if self
953 .core
954 .is_limit_matched(order.order_side_specified(), limit_px)
955 {
956 order.set_liquidity_side(LiquiditySide::Taker);
957 self.fill_limit_order(order);
958 }
959 }
960
961 self.accept_order(order);
963 }
964
965 fn process_market_if_touched_order(&mut self, order: &mut OrderAny) {
966 if self
967 .core
968 .is_touch_triggered(order.order_side_specified(), order.trigger_price().unwrap())
969 {
970 if self.config.reject_stop_orders {
971 self.generate_order_rejected(
972 order,
973 format!(
974 "{} {} order trigger px of {} was in the market: bid={}, ask={}, but rejected because of configuration",
975 order.order_type(),
976 order.order_side(),
977 order.trigger_price().unwrap(),
978 self.core
979 .bid
980 .map_or_else(|| "None".to_string(), |p| p.to_string()),
981 self.core
982 .ask
983 .map_or_else(|| "None".to_string(), |p| p.to_string())
984 ).into(),
985 );
986 return;
987 }
988 self.fill_market_order(order);
989 return;
990 }
991
992 self.accept_order(order);
994 }
995
996 fn process_limit_if_touched_order(&mut self, order: &mut OrderAny) {
997 if self
998 .core
999 .is_touch_triggered(order.order_side_specified(), order.trigger_price().unwrap())
1000 {
1001 if self.config.reject_stop_orders {
1002 self.generate_order_rejected(
1003 order,
1004 format!(
1005 "{} {} order trigger px of {} was in the market: bid={}, ask={}, but rejected because of configuration",
1006 order.order_type(),
1007 order.order_side(),
1008 order.trigger_price().unwrap(),
1009 self.core
1010 .bid
1011 .map_or_else(|| "None".to_string(), |p| p.to_string()),
1012 self.core
1013 .ask
1014 .map_or_else(|| "None".to_string(), |p| p.to_string())
1015 ).into(),
1016 );
1017 return;
1018 }
1019 self.accept_order(order);
1020 self.generate_order_triggered(order);
1021
1022 if self
1024 .core
1025 .is_limit_matched(order.order_side_specified(), order.price().unwrap())
1026 {
1027 order.set_liquidity_side(LiquiditySide::Taker);
1028 self.fill_limit_order(order);
1029 }
1030 return;
1031 }
1032
1033 self.accept_order(order);
1035 }
1036
1037 fn process_trailing_stop_order(&mut self, order: &mut OrderAny) {
1038 if let Some(trigger_price) = order.trigger_price() {
1039 if self
1040 .core
1041 .is_stop_matched(order.order_side_specified(), trigger_price)
1042 {
1043 self.generate_order_rejected(
1044 order,
1045 format!(
1046 "{} {} order trigger px of {} was in the market: bid={}, ask={}, but rejected because of configuration",
1047 order.order_type(),
1048 order.order_side(),
1049 trigger_price,
1050 self.core
1051 .bid
1052 .map_or_else(|| "None".to_string(), |p| p.to_string()),
1053 self.core
1054 .ask
1055 .map_or_else(|| "None".to_string(), |p| p.to_string())
1056 ).into(),
1057 );
1058 return;
1059 }
1060 }
1061
1062 self.accept_order(order);
1064 }
1065
1066 pub fn iterate(&mut self, timestamp_ns: UnixNanos) {
1075 if self.book.has_bid() {
1079 self.core.set_bid_raw(self.book.best_bid_price().unwrap());
1080 }
1081 if self.book.has_ask() {
1082 self.core.set_ask_raw(self.book.best_ask_price().unwrap());
1083 }
1084 self.core.iterate();
1085
1086 self.core.bid = self.book.best_bid_price();
1087 self.core.ask = self.book.best_ask_price();
1088
1089 let orders_bid = self.core.get_orders_bid().to_vec();
1090 let orders_ask = self.core.get_orders_ask().to_vec();
1091
1092 self.iterate_orders(timestamp_ns, &orders_bid);
1093 self.iterate_orders(timestamp_ns, &orders_ask);
1094 }
1095
1096 fn iterate_orders(&mut self, timestamp_ns: UnixNanos, orders: &[PassiveOrderAny]) {
1097 for order in orders {
1098 if order.is_closed() {
1099 continue;
1100 }
1101
1102 if self.config.support_gtd_orders {
1104 if let Some(expire_time) = order.expire_time() {
1105 if timestamp_ns >= expire_time {
1106 self.core.delete_order(order).unwrap();
1108 self.cached_filled_qty.remove(&order.client_order_id());
1109 self.expire_order(order);
1110 }
1111 }
1112 }
1113
1114 if let PassiveOrderAny::Stop(o) = order {
1116 if let PassiveOrderAny::Stop(
1117 StopOrderAny::TrailingStopMarket(_) | StopOrderAny::TrailingStopLimit(_),
1118 ) = order
1119 {
1120 let mut order = OrderAny::from(o.to_owned());
1121 self.update_trailing_stop_order(&mut order);
1122 }
1123 }
1124
1125 if let Some(target_bid) = self.target_bid {
1127 self.core.bid = Some(target_bid);
1128 self.target_bid = None;
1129 }
1130 if let Some(target_ask) = self.target_ask {
1131 self.core.ask = Some(target_ask);
1132 self.target_ask = None;
1133 }
1134 if let Some(target_last) = self.target_last {
1135 self.core.last = Some(target_last);
1136 self.target_last = None;
1137 }
1138 }
1139
1140 self.target_bid = None;
1142 self.target_ask = None;
1143 self.target_last = None;
1144 }
1145
1146 fn determine_limit_price_and_volume(&mut self, order: &OrderAny) -> Vec<(Price, Quantity)> {
1147 match order.price() {
1148 Some(order_price) => {
1149 let book_order =
1151 BookOrder::new(order.order_side(), order_price, order.quantity(), 1);
1152
1153 let mut fills = self.book.simulate_fills(&book_order);
1154
1155 if fills.is_empty() {
1157 return fills;
1158 }
1159
1160 if let Some(triggered_price) = order.trigger_price() {
1162 if order
1164 .liquidity_side()
1165 .is_some_and(|liquidity_side| liquidity_side == LiquiditySide::Taker)
1166 {
1167 if order.order_side() == OrderSide::Sell && order_price > triggered_price {
1168 let first_fill = fills.first().unwrap();
1170 let triggered_qty = first_fill.1;
1171 fills[0] = (triggered_price, triggered_qty);
1172 self.target_bid = self.core.bid;
1173 self.target_ask = self.core.ask;
1174 self.target_last = self.core.last;
1175 self.core.set_ask_raw(order_price);
1176 self.core.set_last_raw(order_price);
1177 } else if order.order_side() == OrderSide::Buy
1178 && order_price < triggered_price
1179 {
1180 let first_fill = fills.first().unwrap();
1182 let triggered_qty = first_fill.1;
1183 fills[0] = (triggered_price, triggered_qty);
1184 self.target_bid = self.core.bid;
1185 self.target_ask = self.core.ask;
1186 self.target_last = self.core.last;
1187 self.core.set_bid_raw(order_price);
1188 self.core.set_last_raw(order_price);
1189 }
1190 }
1191 }
1192
1193 if order
1195 .liquidity_side()
1196 .is_some_and(|liquidity_side| liquidity_side == LiquiditySide::Maker)
1197 {
1198 match order.order_side().as_specified() {
1199 OrderSideSpecified::Buy => {
1200 let target_price = if order
1201 .trigger_price()
1202 .is_some_and(|trigger_price| order_price > trigger_price)
1203 {
1204 order.trigger_price().unwrap()
1205 } else {
1206 order_price
1207 };
1208 for fill in &fills {
1209 let last_px = fill.0;
1210 if last_px < order_price {
1211 self.target_bid = self.core.bid;
1213 self.target_ask = self.core.ask;
1214 self.target_last = self.core.last;
1215 self.core.set_ask_raw(target_price);
1216 self.core.set_last_raw(target_price);
1217 }
1218 }
1219 }
1220 OrderSideSpecified::Sell => {
1221 let target_price = if order
1222 .trigger_price()
1223 .is_some_and(|trigger_price| order_price < trigger_price)
1224 {
1225 order.trigger_price().unwrap()
1226 } else {
1227 order_price
1228 };
1229 for fill in &fills {
1230 let last_px = fill.0;
1231 if last_px > order_price {
1232 self.target_bid = self.core.bid;
1234 self.target_ask = self.core.ask;
1235 self.target_last = self.core.last;
1236 self.core.set_bid_raw(target_price);
1237 self.core.set_last_raw(target_price);
1238 }
1239 }
1240 }
1241 }
1242 }
1243
1244 fills
1245 }
1246 None => panic!("Limit order must have a price"),
1247 }
1248 }
1249
1250 fn determine_market_price_and_volume(&self, order: &OrderAny) -> Vec<(Price, Quantity)> {
1251 let price = match order.order_side().as_specified() {
1253 OrderSideSpecified::Buy => Price::max(FIXED_PRECISION),
1254 OrderSideSpecified::Sell => Price::min(FIXED_PRECISION),
1255 };
1256
1257 let book_order = BookOrder::new(order.order_side(), price, order.quantity(), 0);
1259 self.book.simulate_fills(&book_order)
1260 }
1261
1262 pub fn fill_market_order(&mut self, order: &mut OrderAny) {
1263 if let Some(filled_qty) = self.cached_filled_qty.get(&order.client_order_id()) {
1264 if filled_qty >= &order.quantity() {
1265 log::info!(
1266 "Ignoring fill as already filled pending application of events: {:?}, {:?}, {:?}, {:?}",
1267 filled_qty,
1268 order.quantity(),
1269 order.filled_qty(),
1270 order.quantity()
1271 );
1272 return;
1273 }
1274 }
1275
1276 let venue_position_id = self.ids_generator.get_position_id(order, Some(true));
1277 let position: Option<Position> = if let Some(venue_position_id) = venue_position_id {
1278 let cache = self.cache.as_ref().borrow();
1279 cache.position(&venue_position_id).cloned()
1280 } else {
1281 None
1282 };
1283
1284 if self.config.use_reduce_only && order.is_reduce_only() && position.is_none() {
1285 log::warn!(
1286 "Canceling REDUCE_ONLY {} as would increase position",
1287 order.order_type()
1288 );
1289 self.cancel_order(order, None);
1290 return;
1291 }
1292 order.set_liquidity_side(LiquiditySide::Taker);
1294 let fills = self.determine_market_price_and_volume(order);
1295 self.apply_fills(order, fills, LiquiditySide::Taker, None, position);
1296 }
1297
1298 pub fn fill_limit_order(&mut self, order: &mut OrderAny) {
1302 match order.price() {
1303 Some(order_price) => {
1304 let cached_filled_qty = self.cached_filled_qty.get(&order.client_order_id());
1305 if cached_filled_qty.is_some() && *cached_filled_qty.unwrap() >= order.quantity() {
1306 log::debug!(
1307 "Ignoring fill as already filled pending pending application of events: {}, {}, {}, {}",
1308 cached_filled_qty.unwrap(),
1309 order.quantity(),
1310 order.filled_qty(),
1311 order.leaves_qty(),
1312 );
1313 return;
1314 }
1315
1316 if order
1317 .liquidity_side()
1318 .is_some_and(|liquidity_side| liquidity_side == LiquiditySide::Maker)
1319 {
1320 if order.order_side() == OrderSide::Buy
1321 && self.core.bid.is_some_and(|bid| bid == order_price)
1322 && !self.fill_model.is_limit_filled()
1323 {
1324 return;
1326 }
1327 if order.order_side() == OrderSide::Sell
1328 && self.core.ask.is_some_and(|ask| ask == order_price)
1329 && !self.fill_model.is_limit_filled()
1330 {
1331 return;
1333 }
1334 }
1335
1336 let venue_position_id = self.ids_generator.get_position_id(order, None);
1337 let position = if let Some(venue_position_id) = venue_position_id {
1338 let cache = self.cache.as_ref().borrow();
1339 cache.position(&venue_position_id).cloned()
1340 } else {
1341 None
1342 };
1343
1344 if self.config.use_reduce_only && order.is_reduce_only() && position.is_none() {
1345 log::warn!(
1346 "Canceling REDUCE_ONLY {} as would increase position",
1347 order.order_type()
1348 );
1349 self.cancel_order(order, None);
1350 return;
1351 }
1352
1353 let fills = self.determine_limit_price_and_volume(order);
1354
1355 self.apply_fills(
1356 order,
1357 fills,
1358 order.liquidity_side().unwrap(),
1359 venue_position_id,
1360 position,
1361 );
1362 }
1363 None => panic!("Limit order must have a price"),
1364 }
1365 }
1366
1367 fn apply_fills(
1368 &mut self,
1369 order: &mut OrderAny,
1370 fills: Vec<(Price, Quantity)>,
1371 liquidity_side: LiquiditySide,
1372 venue_position_id: Option<PositionId>,
1373 position: Option<Position>,
1374 ) {
1375 if order.time_in_force() == TimeInForce::Fok {
1376 let mut total_size = Quantity::zero(order.quantity().precision);
1377 for (fill_px, fill_qty) in &fills {
1378 total_size = total_size.add(*fill_qty);
1379 }
1380
1381 if order.leaves_qty() > total_size {
1382 self.cancel_order(order, None);
1383 return;
1384 }
1385 }
1386
1387 if fills.is_empty() {
1388 if order.status() == OrderStatus::Submitted {
1389 self.generate_order_rejected(
1390 order,
1391 format!("No market for {}", order.instrument_id()).into(),
1392 );
1393 } else {
1394 log::error!(
1395 "Cannot fill order: no fills from book when fills were expected (check size in data)"
1396 );
1397 return;
1398 }
1399 }
1400
1401 if self.oms_type == OmsType::Netting {
1402 let venue_position_id: Option<PositionId> = None;
1403 }
1404
1405 let mut initial_market_to_limit_fill = false;
1406 for &(mut fill_px, ref fill_qty) in &fills {
1407 assert!(
1409 (fill_px.precision == self.instrument.price_precision()),
1410 "Invalid price precision for fill price {} when instrument price precision is {}.\
1411 Check that the data price precision matches the {} instrument",
1412 fill_px.precision,
1413 self.instrument.price_precision(),
1414 self.instrument.id()
1415 );
1416
1417 assert!(
1419 (fill_qty.precision == self.instrument.size_precision()),
1420 "Invalid quantity precision for fill quantity {} when instrument size precision is {}.\
1421 Check that the data quantity precision matches the {} instrument",
1422 fill_qty.precision,
1423 self.instrument.size_precision(),
1424 self.instrument.id()
1425 );
1426
1427 if order.filled_qty() == Quantity::zero(order.filled_qty().precision)
1428 && order.order_type() == OrderType::MarketToLimit
1429 {
1430 self.generate_order_updated(order, order.quantity(), Some(fill_px), None);
1431 initial_market_to_limit_fill = true;
1432 }
1433
1434 if self.book_type == BookType::L1_MBP && self.fill_model.is_slipped() {
1435 fill_px = match order.order_side().as_specified() {
1436 OrderSideSpecified::Buy => fill_px.add(self.instrument.price_increment()),
1437 OrderSideSpecified::Sell => fill_px.sub(self.instrument.price_increment()),
1438 }
1439 }
1440
1441 if self.config.use_reduce_only && order.is_reduce_only() {
1443 if let Some(position) = &position {
1444 if *fill_qty > position.quantity {
1445 if position.quantity == Quantity::zero(position.quantity.precision) {
1446 return;
1448 }
1449
1450 let adjusted_fill_qty =
1452 Quantity::from_raw(position.quantity.raw, fill_qty.precision);
1453
1454 self.generate_order_updated(order, adjusted_fill_qty, None, None);
1455 }
1456 }
1457 }
1458
1459 if fill_qty.is_zero() {
1460 if fills.len() == 1 && order.status() == OrderStatus::Submitted {
1461 self.generate_order_rejected(
1462 order,
1463 format!("No market for {}", order.instrument_id()).into(),
1464 );
1465 }
1466 return;
1467 }
1468
1469 self.fill_order(
1470 order,
1471 fill_px,
1472 *fill_qty,
1473 liquidity_side,
1474 venue_position_id,
1475 position.clone(),
1476 );
1477
1478 if order.order_type() == OrderType::MarketToLimit && initial_market_to_limit_fill {
1479 return;
1481 }
1482 }
1483
1484 if order.time_in_force() == TimeInForce::Ioc && order.is_open() {
1485 self.cancel_order(order, None);
1487 return;
1488 }
1489
1490 if order.is_open()
1491 && self.book_type == BookType::L1_MBP
1492 && matches!(
1493 order.order_type(),
1494 OrderType::Market
1495 | OrderType::MarketIfTouched
1496 | OrderType::StopMarket
1497 | OrderType::TrailingStopMarket
1498 )
1499 {
1500 todo!("Exhausted simulated book volume")
1504 }
1505 }
1506
1507 fn fill_order(
1508 &mut self,
1509 order: &mut OrderAny,
1510 last_px: Price,
1511 last_qty: Quantity,
1512 liquidity_side: LiquiditySide,
1513 venue_position_id: Option<PositionId>,
1514 position: Option<Position>,
1515 ) {
1516 match self.cached_filled_qty.get(&order.client_order_id()) {
1517 Some(filled_qty) => {
1518 let leaves_qty = order.quantity() - *filled_qty;
1519 let last_qty = min(last_qty, leaves_qty);
1520 let new_filled_qty = *filled_qty + last_qty;
1521 self.cached_filled_qty
1523 .insert(order.client_order_id(), new_filled_qty);
1524 }
1525 None => {
1526 self.cached_filled_qty
1527 .insert(order.client_order_id(), last_qty);
1528 }
1529 }
1530
1531 let commission = self
1533 .fee_model
1534 .get_commission(order, last_qty, last_px, &self.instrument)
1535 .unwrap();
1536
1537 let venue_order_id = self.ids_generator.get_venue_order_id(order).unwrap();
1538 self.generate_order_filled(
1539 order,
1540 venue_order_id,
1541 venue_position_id,
1542 last_qty,
1543 last_px,
1544 self.instrument.quote_currency(),
1545 commission,
1546 liquidity_side,
1547 );
1548
1549 if order.is_passive() && order.is_closed() {
1550 if self.core.order_exists(order.client_order_id()) {
1552 let _ = self
1553 .core
1554 .delete_order(&PassiveOrderAny::from(order.clone()));
1555 }
1556 self.cached_filled_qty.remove(&order.client_order_id());
1557 }
1558
1559 if !self.config.support_contingent_orders {
1560 return;
1561 }
1562
1563 if let Some(contingency_type) = order.contingency_type() {
1564 match contingency_type {
1565 ContingencyType::Oto => {
1566 if let Some(linked_orders_ids) = order.linked_order_ids() {
1567 for client_order_id in linked_orders_ids {
1568 let mut child_order = match self.cache.borrow().order(client_order_id) {
1569 Some(child_order) => child_order.clone(),
1570 None => panic!("Order {client_order_id} not found in cache"),
1571 };
1572
1573 if child_order.is_closed() || child_order.is_active_local() {
1574 continue;
1575 }
1576
1577 if let (None, Some(position_id)) =
1579 (child_order.position_id(), order.position_id())
1580 {
1581 self.cache
1582 .borrow_mut()
1583 .add_position_id(
1584 &position_id,
1585 &self.venue,
1586 client_order_id,
1587 &child_order.strategy_id(),
1588 )
1589 .unwrap();
1590 log::debug!(
1591 "Added position id {position_id} to cache for order {client_order_id}"
1592 );
1593 }
1594
1595 if (!child_order.is_open())
1596 || (matches!(child_order.status(), OrderStatus::PendingUpdate)
1597 && child_order
1598 .previous_status()
1599 .is_some_and(|s| matches!(s, OrderStatus::Submitted)))
1600 {
1601 let account_id = order.account_id().unwrap_or_else(|| {
1602 *self.account_ids.get(&order.trader_id()).unwrap_or_else(|| {
1603 panic!(
1604 "Account ID not found for trader {}",
1605 order.trader_id()
1606 )
1607 })
1608 });
1609 self.process_order(&mut child_order, account_id);
1610 }
1611 }
1612 } else {
1613 log::error!(
1614 "OTO order {} does not have linked orders",
1615 order.client_order_id()
1616 );
1617 }
1618 }
1619 ContingencyType::Oco => {
1620 if let Some(linked_orders_ids) = order.linked_order_ids() {
1621 for client_order_id in linked_orders_ids {
1622 let child_order = match self.cache.borrow().order(client_order_id) {
1623 Some(child_order) => child_order.clone(),
1624 None => panic!("Order {client_order_id} not found in cache"),
1625 };
1626
1627 if child_order.is_closed() || child_order.is_active_local() {
1628 continue;
1629 }
1630
1631 self.cancel_order(&child_order, None);
1632 }
1633 } else {
1634 log::error!(
1635 "OCO order {} does not have linked orders",
1636 order.client_order_id()
1637 );
1638 }
1639 }
1640 ContingencyType::Ouo => {
1641 if let Some(linked_orders_ids) = order.linked_order_ids() {
1642 for client_order_id in linked_orders_ids {
1643 let mut child_order = match self.cache.borrow().order(client_order_id) {
1644 Some(child_order) => child_order.clone(),
1645 None => panic!("Order {client_order_id} not found in cache"),
1646 };
1647
1648 if child_order.is_active_local() {
1649 continue;
1650 }
1651
1652 if order.is_closed() && child_order.is_open() {
1653 self.cancel_order(&child_order, None);
1654 } else if !order.leaves_qty().is_zero()
1655 && order.leaves_qty() != child_order.leaves_qty()
1656 {
1657 let price = child_order.price();
1658 let trigger_price = child_order.trigger_price();
1659 self.update_order(
1660 &mut child_order,
1661 Some(order.leaves_qty()),
1662 price,
1663 trigger_price,
1664 Some(false),
1665 );
1666 }
1667 }
1668 } else {
1669 log::error!(
1670 "OUO order {} does not have linked orders",
1671 order.client_order_id()
1672 );
1673 }
1674 }
1675 _ => {}
1676 }
1677 }
1678 }
1679
1680 fn update_limit_order(&mut self, order: &mut OrderAny, quantity: Quantity, price: Price) {
1681 if self
1682 .core
1683 .is_limit_matched(order.order_side_specified(), price)
1684 {
1685 if order.is_post_only() {
1686 self.generate_order_modify_rejected(
1687 order.trader_id(),
1688 order.strategy_id(),
1689 order.instrument_id(),
1690 order.client_order_id(),
1691 Ustr::from(format!(
1692 "POST_ONLY {} {} order with new limit px of {} would have been a TAKER: bid={}, ask={}",
1693 order.order_type(),
1694 order.order_side(),
1695 price,
1696 self.core.bid.map_or_else(|| "None".to_string(), |p| p.to_string()),
1697 self.core.ask.map_or_else(|| "None".to_string(), |p| p.to_string())
1698 ).as_str()),
1699 order.venue_order_id(),
1700 order.account_id(),
1701 );
1702 return;
1703 }
1704
1705 self.generate_order_updated(order, quantity, Some(price), None);
1706 order.set_liquidity_side(LiquiditySide::Taker);
1707 self.fill_limit_order(order);
1708 return;
1709 }
1710 self.generate_order_updated(order, quantity, Some(price), None);
1711 }
1712
1713 fn update_stop_market_order(
1714 &mut self,
1715 order: &mut OrderAny,
1716 quantity: Quantity,
1717 trigger_price: Price,
1718 ) {
1719 if self
1720 .core
1721 .is_stop_matched(order.order_side_specified(), trigger_price)
1722 {
1723 self.generate_order_modify_rejected(
1724 order.trader_id(),
1725 order.strategy_id(),
1726 order.instrument_id(),
1727 order.client_order_id(),
1728 Ustr::from(
1729 format!(
1730 "{} {} order new stop px of {} was in the market: bid={}, ask={}",
1731 order.order_type(),
1732 order.order_side(),
1733 trigger_price,
1734 self.core
1735 .bid
1736 .map_or_else(|| "None".to_string(), |p| p.to_string()),
1737 self.core
1738 .ask
1739 .map_or_else(|| "None".to_string(), |p| p.to_string())
1740 )
1741 .as_str(),
1742 ),
1743 order.venue_order_id(),
1744 order.account_id(),
1745 );
1746 return;
1747 }
1748
1749 self.generate_order_updated(order, quantity, None, Some(trigger_price));
1750 }
1751
1752 fn update_stop_limit_order(
1753 &mut self,
1754 order: &mut OrderAny,
1755 quantity: Quantity,
1756 price: Price,
1757 trigger_price: Price,
1758 ) {
1759 if order.is_triggered().is_some_and(|t| t) {
1760 if self
1762 .core
1763 .is_limit_matched(order.order_side_specified(), price)
1764 {
1765 if order.is_post_only() {
1766 self.generate_order_modify_rejected(
1767 order.trader_id(),
1768 order.strategy_id(),
1769 order.instrument_id(),
1770 order.client_order_id(),
1771 Ustr::from(format!(
1772 "POST_ONLY {} {} order with new limit px of {} would have been a TAKER: bid={}, ask={}",
1773 order.order_type(),
1774 order.order_side(),
1775 price,
1776 self.core.bid.map_or_else(|| "None".to_string(), |p| p.to_string()),
1777 self.core.ask.map_or_else(|| "None".to_string(), |p| p.to_string())
1778 ).as_str()),
1779 order.venue_order_id(),
1780 order.account_id(),
1781 );
1782 return;
1783 }
1784 self.generate_order_updated(order, quantity, Some(price), None);
1785 order.set_liquidity_side(LiquiditySide::Taker);
1786 self.fill_limit_order(order);
1787 return; }
1789 } else {
1790 if self
1792 .core
1793 .is_stop_matched(order.order_side_specified(), trigger_price)
1794 {
1795 self.generate_order_modify_rejected(
1796 order.trader_id(),
1797 order.strategy_id(),
1798 order.instrument_id(),
1799 order.client_order_id(),
1800 Ustr::from(
1801 format!(
1802 "{} {} order new stop px of {} was in the market: bid={}, ask={}",
1803 order.order_type(),
1804 order.order_side(),
1805 trigger_price,
1806 self.core
1807 .bid
1808 .map_or_else(|| "None".to_string(), |p| p.to_string()),
1809 self.core
1810 .ask
1811 .map_or_else(|| "None".to_string(), |p| p.to_string())
1812 )
1813 .as_str(),
1814 ),
1815 order.venue_order_id(),
1816 order.account_id(),
1817 );
1818 return;
1819 }
1820 }
1821
1822 self.generate_order_updated(order, quantity, Some(price), Some(trigger_price));
1823 }
1824
1825 fn update_market_if_touched_order(
1826 &mut self,
1827 order: &mut OrderAny,
1828 quantity: Quantity,
1829 trigger_price: Price,
1830 ) {
1831 if self
1832 .core
1833 .is_touch_triggered(order.order_side_specified(), trigger_price)
1834 {
1835 self.generate_order_modify_rejected(
1836 order.trader_id(),
1837 order.strategy_id(),
1838 order.instrument_id(),
1839 order.client_order_id(),
1840 Ustr::from(
1841 format!(
1842 "{} {} order new trigger px of {} was in the market: bid={}, ask={}",
1843 order.order_type(),
1844 order.order_side(),
1845 trigger_price,
1846 self.core
1847 .bid
1848 .map_or_else(|| "None".to_string(), |p| p.to_string()),
1849 self.core
1850 .ask
1851 .map_or_else(|| "None".to_string(), |p| p.to_string())
1852 )
1853 .as_str(),
1854 ),
1855 order.venue_order_id(),
1856 order.account_id(),
1857 );
1858 return;
1860 }
1861
1862 self.generate_order_updated(order, quantity, None, Some(trigger_price));
1863 }
1864
1865 fn update_limit_if_touched_order(
1866 &mut self,
1867 order: &mut OrderAny,
1868 quantity: Quantity,
1869 price: Price,
1870 trigger_price: Price,
1871 ) {
1872 if order.is_triggered().is_some_and(|t| t) {
1873 if self
1875 .core
1876 .is_limit_matched(order.order_side_specified(), price)
1877 {
1878 if order.is_post_only() {
1879 self.generate_order_modify_rejected(
1880 order.trader_id(),
1881 order.strategy_id(),
1882 order.instrument_id(),
1883 order.client_order_id(),
1884 Ustr::from(format!(
1885 "POST_ONLY {} {} order with new limit px of {} would have been a TAKER: bid={}, ask={}",
1886 order.order_type(),
1887 order.order_side(),
1888 price,
1889 self.core.bid.map_or_else(|| "None".to_string(), |p| p.to_string()),
1890 self.core.ask.map_or_else(|| "None".to_string(), |p| p.to_string())
1891 ).as_str()),
1892 order.venue_order_id(),
1893 order.account_id(),
1894 );
1895 return;
1897 }
1898 self.generate_order_updated(order, quantity, Some(price), None);
1899 order.set_liquidity_side(LiquiditySide::Taker);
1900 self.fill_limit_order(order);
1901 return;
1902 }
1903 } else {
1904 if self
1906 .core
1907 .is_touch_triggered(order.order_side_specified(), trigger_price)
1908 {
1909 self.generate_order_modify_rejected(
1910 order.trader_id(),
1911 order.strategy_id(),
1912 order.instrument_id(),
1913 order.client_order_id(),
1914 Ustr::from(
1915 format!(
1916 "{} {} order new trigger px of {} was in the market: bid={}, ask={}",
1917 order.order_type(),
1918 order.order_side(),
1919 trigger_price,
1920 self.core
1921 .bid
1922 .map_or_else(|| "None".to_string(), |p| p.to_string()),
1923 self.core
1924 .ask
1925 .map_or_else(|| "None".to_string(), |p| p.to_string())
1926 )
1927 .as_str(),
1928 ),
1929 order.venue_order_id(),
1930 order.account_id(),
1931 );
1932 return;
1933 }
1934 }
1935
1936 self.generate_order_updated(order, quantity, Some(price), Some(trigger_price));
1937 }
1938
1939 fn update_trailing_stop_order(&mut self, order: &mut OrderAny) {
1940 let (new_trigger_price, new_price) = trailing_stop_calculate(
1941 self.instrument.price_increment(),
1942 order,
1943 self.core.bid,
1944 self.core.ask,
1945 self.core.last,
1946 )
1947 .unwrap();
1948
1949 if new_trigger_price.is_none() && new_price.is_none() {
1950 return;
1951 }
1952
1953 self.generate_order_updated(order, order.quantity(), new_price, new_trigger_price);
1954 }
1955
1956 fn accept_order(&mut self, order: &mut OrderAny) {
1959 if order.is_closed() {
1960 return;
1962 }
1963 if order.status() != OrderStatus::Accepted {
1964 let venue_order_id = self.ids_generator.get_venue_order_id(order).unwrap();
1965 self.generate_order_accepted(order, venue_order_id);
1966
1967 if matches!(
1968 order.order_type(),
1969 OrderType::TrailingStopLimit | OrderType::TrailingStopMarket
1970 ) && order.trigger_price().is_none()
1971 {
1972 self.update_trailing_stop_order(order);
1973 }
1974 }
1975
1976 let _ = self.core.add_order(order.to_owned().into());
1977 }
1978
1979 fn expire_order(&mut self, order: &PassiveOrderAny) {
1980 if self.config.support_contingent_orders
1981 && order
1982 .contingency_type()
1983 .is_some_and(|c| c != ContingencyType::NoContingency)
1984 {
1985 self.cancel_contingent_orders(&OrderAny::from(order.clone()));
1986 }
1987
1988 self.generate_order_expired(&order.to_any());
1989 }
1990
1991 fn cancel_order(&mut self, order: &OrderAny, cancel_contingencies: Option<bool>) {
1992 let cancel_contingencies = cancel_contingencies.unwrap_or(true);
1993 if order.is_active_local() {
1994 log::error!(
1995 "Cannot cancel an order with {} from the matching engine",
1996 order.status()
1997 );
1998 return;
1999 }
2000
2001 if self.core.order_exists(order.client_order_id()) {
2003 let _ = self
2004 .core
2005 .delete_order(&PassiveOrderAny::from(order.clone()));
2006 }
2007 self.cached_filled_qty.remove(&order.client_order_id());
2008
2009 let venue_order_id = self.ids_generator.get_venue_order_id(order).unwrap();
2010 self.generate_order_canceled(order, venue_order_id);
2011
2012 if self.config.support_contingent_orders
2013 && order.contingency_type().is_some()
2014 && order.contingency_type().unwrap() != ContingencyType::NoContingency
2015 && cancel_contingencies
2016 {
2017 self.cancel_contingent_orders(order);
2018 }
2019 }
2020
2021 fn update_order(
2022 &mut self,
2023 order: &mut OrderAny,
2024 quantity: Option<Quantity>,
2025 price: Option<Price>,
2026 trigger_price: Option<Price>,
2027 update_contingencies: Option<bool>,
2028 ) {
2029 let update_contingencies = update_contingencies.unwrap_or(true);
2030 let quantity = quantity.unwrap_or(order.quantity());
2031
2032 match order {
2033 OrderAny::Limit(_) | OrderAny::MarketToLimit(_) => {
2034 let price = price.unwrap_or(order.price().unwrap());
2035 self.update_limit_order(order, quantity, price);
2036 }
2037 OrderAny::StopMarket(_) => {
2038 let trigger_price = trigger_price.unwrap_or(order.trigger_price().unwrap());
2039 self.update_stop_market_order(order, quantity, trigger_price);
2040 }
2041 OrderAny::StopLimit(_) => {
2042 let price = price.unwrap_or(order.price().unwrap());
2043 let trigger_price = trigger_price.unwrap_or(order.trigger_price().unwrap());
2044 self.update_stop_limit_order(order, quantity, price, trigger_price);
2045 }
2046 OrderAny::MarketIfTouched(_) => {
2047 let trigger_price = trigger_price.unwrap_or(order.trigger_price().unwrap());
2048 self.update_market_if_touched_order(order, quantity, trigger_price);
2049 }
2050 OrderAny::LimitIfTouched(_) => {
2051 let price = price.unwrap_or(order.price().unwrap());
2052 let trigger_price = trigger_price.unwrap_or(order.trigger_price().unwrap());
2053 self.update_limit_if_touched_order(order, quantity, price, trigger_price);
2054 }
2055 OrderAny::TrailingStopMarket(_) => {
2056 let trigger_price = trigger_price.unwrap_or(order.trigger_price().unwrap());
2057 self.update_market_if_touched_order(order, quantity, trigger_price);
2058 }
2059 OrderAny::TrailingStopLimit(trailing_stop_limit_order) => {
2060 let price = price.unwrap_or(trailing_stop_limit_order.price().unwrap());
2061 let trigger_price =
2062 trigger_price.unwrap_or(trailing_stop_limit_order.trigger_price().unwrap());
2063 self.update_limit_if_touched_order(order, quantity, price, trigger_price);
2064 }
2065 _ => {
2066 panic!(
2067 "Unsupported order type {} for update_order",
2068 order.order_type()
2069 );
2070 }
2071 }
2072
2073 if self.config.support_contingent_orders
2074 && order
2075 .contingency_type()
2076 .is_some_and(|c| c != ContingencyType::NoContingency)
2077 && update_contingencies
2078 {
2079 self.update_contingent_order(order);
2080 }
2081 }
2082
2083 pub fn trigger_stop_order(&mut self, order: &mut OrderAny) {
2084 todo!("trigger_stop_order")
2085 }
2086
2087 fn update_contingent_order(&mut self, order: &OrderAny) {
2088 log::debug!("Updating OUO orders from {}", order.client_order_id());
2089 if let Some(linked_order_ids) = order.linked_order_ids() {
2090 for client_order_id in linked_order_ids {
2091 let mut child_order = match self.cache.borrow().order(client_order_id) {
2092 Some(order) => order.clone(),
2093 None => panic!("Order {client_order_id} not found in cache."),
2094 };
2095
2096 if child_order.is_active_local() {
2097 continue;
2098 }
2099
2100 if order.leaves_qty().is_zero() {
2101 self.cancel_order(&child_order, None);
2102 } else if child_order.leaves_qty() != order.leaves_qty() {
2103 let price = child_order.price();
2104 let trigger_price = child_order.trigger_price();
2105 self.update_order(
2106 &mut child_order,
2107 Some(order.leaves_qty()),
2108 price,
2109 trigger_price,
2110 Some(false),
2111 );
2112 }
2113 }
2114 }
2115 }
2116
2117 fn cancel_contingent_orders(&mut self, order: &OrderAny) {
2118 if let Some(linked_order_ids) = order.linked_order_ids() {
2119 for client_order_id in linked_order_ids {
2120 let contingent_order = match self.cache.borrow().order(client_order_id) {
2121 Some(order) => order.clone(),
2122 None => panic!("Cannot find contingent order for {client_order_id}"),
2123 };
2124 if contingent_order.is_active_local() {
2125 continue;
2127 }
2128 if !contingent_order.is_closed() {
2129 self.cancel_order(&contingent_order, Some(false));
2130 }
2131 }
2132 }
2133 }
2134
2135 fn generate_order_rejected(&self, order: &OrderAny, reason: Ustr) {
2138 let ts_now = self.clock.borrow().timestamp_ns();
2139 let account_id = order
2140 .account_id()
2141 .unwrap_or(self.account_ids.get(&order.trader_id()).unwrap().to_owned());
2142
2143 let event = OrderEventAny::Rejected(OrderRejected::new(
2144 order.trader_id(),
2145 order.strategy_id(),
2146 order.instrument_id(),
2147 order.client_order_id(),
2148 account_id,
2149 reason,
2150 UUID4::new(),
2151 ts_now,
2152 ts_now,
2153 false,
2154 ));
2155 msgbus::send("ExecEngine.process".into(), &event as &dyn Any);
2156 }
2157
2158 fn generate_order_accepted(&self, order: &mut OrderAny, venue_order_id: VenueOrderId) {
2159 let ts_now = self.clock.borrow().timestamp_ns();
2160 let account_id = order
2161 .account_id()
2162 .unwrap_or(self.account_ids.get(&order.trader_id()).unwrap().to_owned());
2163 let event = OrderEventAny::Accepted(OrderAccepted::new(
2164 order.trader_id(),
2165 order.strategy_id(),
2166 order.instrument_id(),
2167 order.client_order_id(),
2168 venue_order_id,
2169 account_id,
2170 UUID4::new(),
2171 ts_now,
2172 ts_now,
2173 false,
2174 ));
2175 msgbus::send("ExecEngine.process".into(), &event as &dyn Any);
2176
2177 order.apply(event).expect("Failed to apply order event");
2179 }
2180
2181 #[allow(clippy::too_many_arguments)]
2182 fn generate_order_modify_rejected(
2183 &self,
2184 trader_id: TraderId,
2185 strategy_id: StrategyId,
2186 instrument_id: InstrumentId,
2187 client_order_id: ClientOrderId,
2188 reason: Ustr,
2189 venue_order_id: Option<VenueOrderId>,
2190 account_id: Option<AccountId>,
2191 ) {
2192 let ts_now = self.clock.borrow().timestamp_ns();
2193 let event = OrderEventAny::ModifyRejected(OrderModifyRejected::new(
2194 trader_id,
2195 strategy_id,
2196 instrument_id,
2197 client_order_id,
2198 reason,
2199 UUID4::new(),
2200 ts_now,
2201 ts_now,
2202 false,
2203 venue_order_id,
2204 account_id,
2205 ));
2206 msgbus::send("ExecEngine.process".into(), &event as &dyn Any);
2207 }
2208
2209 #[allow(clippy::too_many_arguments)]
2210 fn generate_order_cancel_rejected(
2211 &self,
2212 trader_id: TraderId,
2213 strategy_id: StrategyId,
2214 account_id: AccountId,
2215 instrument_id: InstrumentId,
2216 client_order_id: ClientOrderId,
2217 venue_order_id: VenueOrderId,
2218 reason: Ustr,
2219 ) {
2220 let ts_now = self.clock.borrow().timestamp_ns();
2221 let event = OrderEventAny::CancelRejected(OrderCancelRejected::new(
2222 trader_id,
2223 strategy_id,
2224 instrument_id,
2225 client_order_id,
2226 reason,
2227 UUID4::new(),
2228 ts_now,
2229 ts_now,
2230 false,
2231 Some(venue_order_id),
2232 Some(account_id),
2233 ));
2234 msgbus::send("ExecEngine.process".into(), &event as &dyn Any);
2235 }
2236
2237 fn generate_order_updated(
2238 &self,
2239 order: &mut OrderAny,
2240 quantity: Quantity,
2241 price: Option<Price>,
2242 trigger_price: Option<Price>,
2243 ) {
2244 let ts_now = self.clock.borrow().timestamp_ns();
2245 let event = OrderEventAny::Updated(OrderUpdated::new(
2246 order.trader_id(),
2247 order.strategy_id(),
2248 order.instrument_id(),
2249 order.client_order_id(),
2250 quantity,
2251 UUID4::new(),
2252 ts_now,
2253 ts_now,
2254 false,
2255 order.venue_order_id(),
2256 order.account_id(),
2257 price,
2258 trigger_price,
2259 ));
2260 msgbus::send("ExecEngine.process".into(), &event as &dyn Any);
2261
2262 order.apply(event).expect("Failed to apply order event");
2264 }
2265
2266 fn generate_order_canceled(&self, order: &OrderAny, venue_order_id: VenueOrderId) {
2267 let ts_now = self.clock.borrow().timestamp_ns();
2268 let event = OrderEventAny::Canceled(OrderCanceled::new(
2269 order.trader_id(),
2270 order.strategy_id(),
2271 order.instrument_id(),
2272 order.client_order_id(),
2273 UUID4::new(),
2274 ts_now,
2275 ts_now,
2276 false,
2277 Some(venue_order_id),
2278 order.account_id(),
2279 ));
2280 msgbus::send("ExecEngine.process".into(), &event as &dyn Any);
2281 }
2282
2283 fn generate_order_triggered(&self, order: &OrderAny) {
2284 let ts_now = self.clock.borrow().timestamp_ns();
2285 let event = OrderEventAny::Triggered(OrderTriggered::new(
2286 order.trader_id(),
2287 order.strategy_id(),
2288 order.instrument_id(),
2289 order.client_order_id(),
2290 UUID4::new(),
2291 ts_now,
2292 ts_now,
2293 false,
2294 order.venue_order_id(),
2295 order.account_id(),
2296 ));
2297 msgbus::send("ExecEngine.process".into(), &event as &dyn Any);
2298 }
2299
2300 fn generate_order_expired(&self, order: &OrderAny) {
2301 let ts_now = self.clock.borrow().timestamp_ns();
2302 let event = OrderEventAny::Expired(OrderExpired::new(
2303 order.trader_id(),
2304 order.strategy_id(),
2305 order.instrument_id(),
2306 order.client_order_id(),
2307 UUID4::new(),
2308 ts_now,
2309 ts_now,
2310 false,
2311 order.venue_order_id(),
2312 order.account_id(),
2313 ));
2314 msgbus::send("ExecEngine.process".into(), &event as &dyn Any);
2315 }
2316
2317 #[allow(clippy::too_many_arguments)]
2318 fn generate_order_filled(
2319 &mut self,
2320 order: &mut OrderAny,
2321 venue_order_id: VenueOrderId,
2322 venue_position_id: Option<PositionId>,
2323 last_qty: Quantity,
2324 last_px: Price,
2325 quote_currency: Currency,
2326 commission: Money,
2327 liquidity_side: LiquiditySide,
2328 ) {
2329 let ts_now = self.clock.borrow().timestamp_ns();
2330 let account_id = order
2331 .account_id()
2332 .unwrap_or(self.account_ids.get(&order.trader_id()).unwrap().to_owned());
2333 let event = OrderEventAny::Filled(OrderFilled::new(
2334 order.trader_id(),
2335 order.strategy_id(),
2336 order.instrument_id(),
2337 order.client_order_id(),
2338 venue_order_id,
2339 account_id,
2340 self.ids_generator.generate_trade_id(),
2341 order.order_side(),
2342 order.order_type(),
2343 last_qty,
2344 last_px,
2345 quote_currency,
2346 liquidity_side,
2347 UUID4::new(),
2348 ts_now,
2349 ts_now,
2350 false,
2351 venue_position_id,
2352 Some(commission),
2353 ));
2354 msgbus::send("ExecEngine.process".into(), &event as &dyn Any);
2355
2356 order.apply(event).expect("Failed to apply order event");
2358 }
2359}