1pub mod config;
24
25use std::{
26 cell::{RefCell, RefMut},
27 collections::{HashMap, HashSet},
28 fmt::Debug,
29 rc::Rc,
30 time::SystemTime,
31};
32
33use config::ExecutionEngineConfig;
34use nautilus_common::{
35 cache::Cache,
36 clock::Clock,
37 generators::position_id::PositionIdGenerator,
38 logging::{CMD, EVT, RECV},
39 messages::execution::{
40 BatchCancelOrders, CancelAllOrders, CancelOrder, ModifyOrder, QueryOrder, SubmitOrder,
41 SubmitOrderList, TradingCommand,
42 },
43 msgbus::{
44 self, get_message_bus,
45 switchboard::{self},
46 },
47};
48use nautilus_core::UUID4;
49use nautilus_model::{
50 enums::{ContingencyType, OmsType, OrderSide, PositionSide},
51 events::{
52 OrderDenied, OrderEvent, OrderEventAny, OrderFilled, PositionChanged, PositionClosed,
53 PositionOpened,
54 },
55 identifiers::{ClientId, InstrumentId, PositionId, StrategyId, Venue},
56 instruments::{Instrument, InstrumentAny},
57 orderbook::own::{OwnOrderBook, should_handle_own_book_order},
58 orders::{Order, OrderAny, OrderError},
59 position::Position,
60 types::{Money, Price, Quantity},
61};
62
63use crate::client::ExecutionClient;
64
65pub struct ExecutionEngine {
72 clock: Rc<RefCell<dyn Clock>>,
73 cache: Rc<RefCell<Cache>>,
74 clients: HashMap<ClientId, Rc<dyn ExecutionClient>>,
75 default_client: Option<Rc<dyn ExecutionClient>>,
76 routing_map: HashMap<Venue, ClientId>,
77 oms_overrides: HashMap<StrategyId, OmsType>,
78 external_order_claims: HashMap<InstrumentId, StrategyId>,
79 pos_id_generator: PositionIdGenerator,
80 config: ExecutionEngineConfig,
81}
82
83impl Debug for ExecutionEngine {
84 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
85 f.debug_struct(stringify!(ExecutionEngine))
86 .field("client_count", &self.clients.len())
87 .finish()
88 }
89}
90
91impl ExecutionEngine {
92 pub fn new(
94 clock: Rc<RefCell<dyn Clock>>,
95 cache: Rc<RefCell<Cache>>,
96 config: Option<ExecutionEngineConfig>,
97 ) -> Self {
98 let trader_id = get_message_bus().borrow().trader_id;
99 Self {
100 clock: clock.clone(),
101 cache,
102 clients: HashMap::new(),
103 default_client: None,
104 routing_map: HashMap::new(),
105 oms_overrides: HashMap::new(),
106 external_order_claims: HashMap::new(),
107 pos_id_generator: PositionIdGenerator::new(trader_id, clock),
108 config: config.unwrap_or_default(),
109 }
110 }
111
112 #[must_use]
113 pub fn position_id_count(&self, strategy_id: StrategyId) -> usize {
115 self.pos_id_generator.count(strategy_id)
116 }
117
118 #[must_use]
119 pub fn check_integrity(&self) -> bool {
121 self.cache.borrow_mut().check_integrity()
122 }
123
124 #[must_use]
125 pub fn check_connected(&self) -> bool {
127 self.clients.values().all(|c| c.is_connected())
128 }
129
130 #[must_use]
131 pub fn check_disconnected(&self) -> bool {
133 self.clients.values().all(|c| !c.is_connected())
134 }
135
136 #[must_use]
137 pub fn check_residuals(&self) -> bool {
139 self.cache.borrow().check_residuals()
140 }
141
142 #[must_use]
143 pub fn get_external_order_claims_instruments(&self) -> HashSet<InstrumentId> {
145 self.external_order_claims.keys().copied().collect()
146 }
147
148 pub fn register_client(&mut self, client: Rc<dyn ExecutionClient>) -> anyhow::Result<()> {
156 if self.clients.contains_key(&client.client_id()) {
157 anyhow::bail!("Client already registered with ID {}", client.client_id());
158 }
159
160 self.routing_map.insert(client.venue(), client.client_id());
162
163 log::info!("Registered client {}", client.client_id());
164 self.clients.insert(client.client_id(), client);
165 Ok(())
166 }
167
168 pub fn register_default_client(&mut self, client: Rc<dyn ExecutionClient>) {
170 log::info!("Registered default client {}", client.client_id());
171 self.default_client = Some(client);
172 }
173
174 #[must_use]
175 pub fn get_client(&self, client_id: &ClientId) -> Option<Rc<dyn ExecutionClient>> {
177 self.clients.get(client_id).cloned()
178 }
179
180 pub fn register_venue_routing(
186 &mut self,
187 client_id: ClientId,
188 venue: Venue,
189 ) -> anyhow::Result<()> {
190 if !self.clients.contains_key(&client_id) {
191 anyhow::bail!("No client registered with ID {client_id}");
192 }
193
194 self.routing_map.insert(venue, client_id);
195 log::info!("Set client {client_id} routing for {venue}");
196 Ok(())
197 }
198
199 pub fn deregister_client(&mut self, client_id: ClientId) -> anyhow::Result<()> {
208 if self.clients.remove(&client_id).is_some() {
209 self.routing_map
211 .retain(|_, mapped_id| mapped_id != &client_id);
212 log::info!("Deregistered client {client_id}");
213 Ok(())
214 } else {
215 anyhow::bail!("No client registered with ID {client_id}")
216 }
217 }
218
219 #[allow(clippy::await_holding_refcell_ref)]
222 pub async fn load_cache(&mut self) -> anyhow::Result<()> {
228 let ts = SystemTime::now();
229
230 {
231 let mut cache = self.cache.borrow_mut();
232 cache.clear_index();
233 cache.cache_general()?;
234 self.cache.borrow_mut().cache_all().await?;
235 cache.build_index();
236 let _ = cache.check_integrity();
237
238 if self.config.manage_own_order_books {
239 for order in cache.orders(None, None, None, None) {
240 if order.is_closed() || !should_handle_own_book_order(order) {
241 continue;
242 }
243 let mut own_book = self.get_or_init_own_order_book(&order.instrument_id());
244 own_book.add(order.to_own_book_order());
245 }
246 }
247 }
248
249 self.set_position_id_counts();
250
251 log::info!(
252 "Loaded cache in {}ms",
253 SystemTime::now()
254 .duration_since(ts)
255 .map_err(|e| anyhow::anyhow!("Failed to calculate duration: {e}"))?
256 .as_millis()
257 );
258
259 Ok(())
260 }
261
262 pub fn flush_db(&self) {
264 self.cache.borrow_mut().flush_db();
265 }
266
267 pub fn process(&mut self, event: &OrderEventAny) {
269 self.handle_event(event);
270 }
271
272 pub fn execute(&self, command: &TradingCommand) {
274 self.execute_command(command);
275 }
276
277 fn execute_command(&self, command: &TradingCommand) {
280 if self.config.debug {
281 log::debug!("{RECV}{CMD} {command:?}");
282 }
283
284 let client: Rc<dyn ExecutionClient> = if let Some(client) = self
285 .clients
286 .get(&command.client_id())
287 .or_else(|| {
288 self.routing_map
289 .get(&command.instrument_id().venue)
290 .and_then(|client_id| self.clients.get(client_id))
291 })
292 .or(self.default_client.as_ref())
293 {
294 client.clone()
295 } else {
296 log::error!(
297 "No execution client found for command: client_id={:?}, venue={}, command={command:?}",
298 command.client_id(),
299 command.instrument_id().venue,
300 );
301 return;
302 };
303
304 match command {
305 TradingCommand::SubmitOrder(cmd) => self.handle_submit_order(client, cmd),
306 TradingCommand::SubmitOrderList(cmd) => self.handle_submit_order_list(client, cmd),
307 TradingCommand::ModifyOrder(cmd) => self.handle_modify_order(client, cmd),
308 TradingCommand::CancelOrder(cmd) => self.handle_cancel_order(client, cmd),
309 TradingCommand::CancelAllOrders(cmd) => self.handle_cancel_all_orders(client, cmd),
310 TradingCommand::BatchCancelOrders(cmd) => self.handle_batch_cancel_orders(client, cmd),
311 TradingCommand::QueryOrder(cmd) => self.handle_query_order(client, cmd),
312 }
313 }
314
315 fn handle_submit_order(&self, client: Rc<dyn ExecutionClient>, cmd: &SubmitOrder) {
316 let mut order = cmd.order.clone();
317 let client_order_id = order.client_order_id();
318 let instrument_id = order.instrument_id();
319
320 if !self.cache.borrow().order_exists(&client_order_id) {
322 {
324 let mut cache = self.cache.borrow_mut();
325 if let Err(e) =
326 cache.add_order(order.clone(), cmd.position_id, Some(cmd.client_id), true)
327 {
328 log::error!("Error adding order to cache: {e}");
329 return;
330 }
331 }
332
333 if self.config.snapshot_orders {
334 self.create_order_state_snapshot(&order);
335 }
336 }
337
338 let instrument = {
340 let cache = self.cache.borrow();
341 if let Some(instrument) = cache.instrument(&instrument_id) {
342 instrument.clone()
343 } else {
344 log::error!(
345 "Cannot handle submit order: no instrument found for {instrument_id}, {cmd}",
346 );
347 return;
348 }
349 };
350
351 if !instrument.is_inverse() && order.is_quote_quantity() {
353 let last_px = self.last_px_for_conversion(&instrument_id, order.order_side());
354
355 if let Some(price) = last_px {
356 let base_qty = instrument.get_base_quantity(order.quantity(), price);
357 self.set_order_base_qty(&mut order, base_qty);
358 } else {
359 self.deny_order(
360 &order,
361 &format!("no-price-to-convert-quote-qty {instrument_id}"),
362 );
363 return;
364 }
365 }
366
367 if self.config.manage_own_order_books && should_handle_own_book_order(&order) {
368 let mut own_book = self.get_or_init_own_order_book(&order.instrument_id());
369 own_book.add(order.to_own_book_order());
370 }
371
372 if let Err(e) = client.submit_order(cmd) {
374 log::error!("Error submitting order to client: {e}");
375 self.deny_order(
376 &cmd.order,
377 &format!("failed-to-submit-order-to-client: {e}"),
378 );
379 }
380 }
381
382 fn handle_submit_order_list(&self, client: Rc<dyn ExecutionClient>, cmd: &SubmitOrderList) {
383 let orders = cmd.order_list.orders.clone();
384
385 let mut cache = self.cache.borrow_mut();
387 for order in &orders {
388 if !cache.order_exists(&order.client_order_id()) {
389 if let Err(e) =
390 cache.add_order(order.clone(), cmd.position_id, Some(cmd.client_id), true)
391 {
392 log::error!("Error adding order to cache: {e}");
393 return;
394 }
395
396 if self.config.snapshot_orders {
397 self.create_order_state_snapshot(order);
398 }
399 }
400 }
401 drop(cache);
402
403 let cache = self.cache.borrow();
405 let instrument = if let Some(instrument) = cache.instrument(&cmd.instrument_id) {
406 instrument
407 } else {
408 log::error!(
409 "Cannot handle submit order list: no instrument found for {}, {cmd}",
410 cmd.instrument_id,
411 );
412 return;
413 };
414
415 if !instrument.is_inverse() && cmd.order_list.orders[0].is_quote_quantity() {
417 let mut quote_qty = None;
418 let mut _last_px = None;
419
420 for order in &cmd.order_list.orders {
421 if !order.is_quote_quantity() {
422 continue; }
424
425 if Some(order.quantity()) != quote_qty {
426 _last_px =
427 self.last_px_for_conversion(&order.instrument_id(), order.order_side());
428 quote_qty = Some(order.quantity());
429 }
430
431 }
445 }
446
447 if self.config.manage_own_order_books {
448 let mut own_book = self.get_or_init_own_order_book(&cmd.instrument_id);
449 for order in &cmd.order_list.orders {
450 if should_handle_own_book_order(order) {
451 own_book.add(order.to_own_book_order());
452 }
453 }
454 }
455
456 if let Err(e) = client.submit_order_list(cmd) {
458 log::error!("Error submitting order list to client: {e}");
459 for order in &orders {
460 self.deny_order(
461 order,
462 &format!("failed-to-submit-order-list-to-client: {e}"),
463 );
464 }
465 }
466 }
467
468 fn handle_modify_order(&self, client: Rc<dyn ExecutionClient>, cmd: &ModifyOrder) {
469 if let Err(e) = client.modify_order(cmd) {
470 log::error!("Error modifying order: {e}");
471 }
472 }
473
474 fn handle_cancel_order(&self, client: Rc<dyn ExecutionClient>, cmd: &CancelOrder) {
475 if let Err(e) = client.cancel_order(cmd) {
476 log::error!("Error canceling order: {e}");
477 }
478 }
479
480 fn handle_cancel_all_orders(&self, client: Rc<dyn ExecutionClient>, cmd: &CancelAllOrders) {
481 if let Err(e) = client.cancel_all_orders(cmd) {
482 log::error!("Error canceling all orders: {e}");
483 }
484 }
485
486 fn handle_batch_cancel_orders(&self, client: Rc<dyn ExecutionClient>, cmd: &BatchCancelOrders) {
487 if let Err(e) = client.batch_cancel_orders(cmd) {
488 log::error!("Error batch canceling orders: {e}");
489 }
490 }
491
492 fn handle_query_order(&self, client: Rc<dyn ExecutionClient>, cmd: &QueryOrder) {
493 if let Err(e) = client.query_order(cmd) {
494 log::error!("Error querying order: {e}");
495 }
496 }
497
498 fn create_order_state_snapshot(&self, order: &OrderAny) {
499 if self.config.debug {
500 log::debug!("Creating order state snapshot for {order}");
501 }
502
503 if self.cache.borrow().has_backing() {
504 if let Err(e) = self.cache.borrow().snapshot_order_state(order) {
505 log::error!("Failed to snapshot order state: {e}");
506 return;
507 }
508 }
509
510 if get_message_bus().borrow().has_backing {
511 let topic = switchboard::get_order_snapshots_topic(order.client_order_id());
512 msgbus::publish(topic, order);
513 }
514 }
515
516 fn create_position_state_snapshot(&self, position: &Position) {
517 if self.config.debug {
518 log::debug!("Creating position state snapshot for {position}");
519 }
520
521 let topic = switchboard::get_positions_snapshots_topic(position.id);
527 msgbus::publish(topic, position);
528 }
529
530 fn handle_event(&mut self, event: &OrderEventAny) {
533 if self.config.debug {
534 log::debug!("{RECV}{EVT} {event:?}");
535 }
536
537 let client_order_id = event.client_order_id();
538 let cache = self.cache.borrow();
539 let mut order = if let Some(order) = cache.order(&client_order_id) {
540 order.clone()
541 } else {
542 log::warn!(
543 "Order with {} not found in the cache to apply {}",
544 event.client_order_id(),
545 event
546 );
547
548 let venue_order_id = if let Some(id) = event.venue_order_id() {
550 id
551 } else {
552 log::error!(
553 "Cannot apply event to any order: {} not found in the cache with no VenueOrderId",
554 event.client_order_id()
555 );
556 return;
557 };
558
559 let client_order_id = if let Some(id) = cache.client_order_id(&venue_order_id) {
561 id
562 } else {
563 log::error!(
564 "Cannot apply event to any order: {} and {venue_order_id} not found in the cache",
565 event.client_order_id(),
566 );
567 return;
568 };
569
570 if let Some(order) = cache.order(client_order_id) {
572 log::info!("Order with {client_order_id} was found in the cache");
573 order.clone()
574 } else {
575 log::error!(
576 "Cannot apply event to any order: {client_order_id} and {venue_order_id} not found in cache",
577 );
578 return;
579 }
580 };
581
582 drop(cache);
583 match event {
584 OrderEventAny::Filled(fill) => {
585 let oms_type = self.determine_oms_type(fill);
586 let position_id = self.determine_position_id(*fill, oms_type);
587
588 let mut fill = *fill;
590 if fill.position_id.is_none() {
591 fill.position_id = Some(position_id);
592 }
593
594 self.apply_event_to_order(&mut order, OrderEventAny::Filled(fill));
595 self.handle_order_fill(&order, fill, oms_type);
596 }
597 _ => {
598 self.apply_event_to_order(&mut order, event.clone());
599 }
600 }
601 }
602
603 fn determine_oms_type(&self, fill: &OrderFilled) -> OmsType {
604 if let Some(oms_type) = self.oms_overrides.get(&fill.strategy_id) {
606 return *oms_type;
607 }
608
609 if let Some(client_id) = self.routing_map.get(&fill.instrument_id.venue) {
611 if let Some(client) = self.clients.get(client_id) {
612 return client.oms_type();
613 }
614 }
615
616 if let Some(client) = &self.default_client {
617 return client.oms_type();
618 }
619
620 OmsType::Netting }
622
623 fn determine_position_id(&mut self, fill: OrderFilled, oms_type: OmsType) -> PositionId {
624 match oms_type {
625 OmsType::Hedging => self.determine_hedging_position_id(fill),
626 OmsType::Netting => self.determine_netting_position_id(fill),
627 _ => self.determine_netting_position_id(fill), }
629 }
630
631 fn determine_hedging_position_id(&mut self, fill: OrderFilled) -> PositionId {
632 if let Some(position_id) = fill.position_id {
634 if self.config.debug {
635 log::debug!("Already had a position ID of: {position_id}");
636 }
637 return position_id;
638 }
639
640 let cache = self.cache.borrow();
642 let order = match cache.order(&fill.client_order_id()) {
643 Some(o) => o,
644 None => {
645 panic!(
646 "Order for {} not found to determine position ID",
647 fill.client_order_id()
648 );
649 }
650 };
651
652 if let Some(spawn_id) = order.exec_spawn_id() {
654 let spawn_orders = cache.orders_for_exec_spawn(&spawn_id);
655 for spawned_order in spawn_orders {
656 if let Some(pos_id) = spawned_order.position_id() {
657 if self.config.debug {
658 log::debug!("Found spawned {} for {}", pos_id, fill.client_order_id());
659 }
660 return pos_id;
661 }
662 }
663 }
664
665 let position_id = self.pos_id_generator.generate(fill.strategy_id, false);
667 if self.config.debug {
668 log::debug!("Generated {} for {}", position_id, fill.client_order_id());
669 }
670 position_id
671 }
672
673 fn determine_netting_position_id(&self, fill: OrderFilled) -> PositionId {
674 PositionId::new(format!("{}-{}", fill.instrument_id, fill.strategy_id))
675 }
676
677 fn apply_event_to_order(&self, order: &mut OrderAny, event: OrderEventAny) {
678 if let Err(e) = order.apply(event.clone()) {
679 match e {
680 OrderError::InvalidStateTransition => {
681 log::warn!("InvalidStateTrigger: {e}, did not apply {event}");
682 }
683 _ => {
684 log::error!("Error applying event: {e}, did not apply {event}");
685 }
686 }
687 return;
688 }
689
690 if let Err(e) = self.cache.borrow_mut().update_order(order) {
691 log::error!("Error updating order in cache: {e}");
692 }
693
694 let topic = switchboard::get_event_orders_topic(event.strategy_id());
695 msgbus::publish(topic, order);
696
697 if self.config.snapshot_orders {
698 self.create_order_state_snapshot(order);
699 }
700 }
701
702 fn handle_order_fill(&mut self, order: &OrderAny, fill: OrderFilled, oms_type: OmsType) {
703 let instrument =
704 if let Some(instrument) = self.cache.borrow().instrument(&fill.instrument_id) {
705 instrument.clone()
706 } else {
707 log::error!(
708 "Cannot handle order fill: no instrument found for {}, {fill}",
709 fill.instrument_id,
710 );
711 return;
712 };
713
714 if self.cache.borrow().account(&fill.account_id).is_none() {
715 log::error!(
716 "Cannot handle order fill: no account found for {}, {fill}",
717 fill.instrument_id.venue,
718 );
719 return;
720 }
721
722 let position_id = if let Some(position_id) = fill.position_id {
723 position_id
724 } else {
725 log::error!("Cannot handle order fill: no position ID found for fill {fill}");
726 return;
727 };
728
729 let mut position = match self.cache.borrow().position(&position_id) {
730 Some(pos) if !pos.is_closed() => pos.clone(),
731 _ => self
732 .open_position(instrument.clone(), None, fill, oms_type)
733 .unwrap(),
734 };
735
736 if self.will_flip_position(&position, fill) {
737 self.flip_position(instrument, &mut position, fill, oms_type);
738 } else {
739 self.update_position(&mut position, fill);
740 }
741
742 if matches!(order.contingency_type(), Some(ContingencyType::Oto)) && position.is_open() {
743 for client_order_id in order.linked_order_ids().unwrap_or_default() {
744 let mut cache = self.cache.borrow_mut();
745 let contingent_order = cache.mut_order(client_order_id);
746 if let Some(contingent_order) = contingent_order {
747 if contingent_order.position_id().is_none() {
748 contingent_order.set_position_id(Some(position_id));
749
750 if let Err(e) = self.cache.borrow_mut().add_position_id(
751 &position_id,
752 &contingent_order.instrument_id().venue,
753 &contingent_order.client_order_id(),
754 &contingent_order.strategy_id(),
755 ) {
756 log::error!("Failed to add position ID: {e}");
757 }
758 }
759 }
760 }
761 }
762 }
763
764 fn open_position(
765 &self,
766 instrument: InstrumentAny,
767 position: Option<&Position>,
768 fill: OrderFilled,
769 oms_type: OmsType,
770 ) -> anyhow::Result<Position> {
771 let position = if let Some(position) = position {
772 self.cache.borrow_mut().snapshot_position(position)?;
774 let mut position = position.clone();
775 position.apply(&fill);
776 self.cache.borrow_mut().update_position(&position)?;
777 position
778 } else {
779 let position = Position::new(&instrument, fill);
780 self.cache
781 .borrow_mut()
782 .add_position(position.clone(), oms_type)?;
783 if self.config.snapshot_positions {
784 self.create_position_state_snapshot(&position);
785 }
786 position
787 };
788
789 let ts_init = self.clock.borrow().timestamp_ns();
790 let event = PositionOpened::create(&position, &fill, UUID4::new(), ts_init);
791 let topic = switchboard::get_event_positions_topic(event.strategy_id);
792 msgbus::publish(topic, &event);
793
794 Ok(position)
795 }
796
797 fn update_position(&self, position: &mut Position, fill: OrderFilled) {
798 position.apply(&fill);
799
800 if let Err(e) = self.cache.borrow_mut().update_position(position) {
801 log::error!("Failed to update position: {e:?}");
802 return;
803 }
804
805 if self.config.snapshot_positions {
806 self.create_position_state_snapshot(position);
807 }
808
809 let topic = switchboard::get_event_positions_topic(position.strategy_id);
810 let ts_init = self.clock.borrow().timestamp_ns();
811
812 if position.is_closed() {
813 let event = PositionClosed::create(position, &fill, UUID4::new(), ts_init);
814 msgbus::publish(topic, &event);
815 } else {
816 let event = PositionChanged::create(position, &fill, UUID4::new(), ts_init);
817 msgbus::publish(topic, &event);
818 }
819 }
820
821 fn will_flip_position(&self, position: &Position, fill: OrderFilled) -> bool {
822 position.is_opposite_side(fill.order_side) && (fill.last_qty.raw > position.quantity.raw)
823 }
824
825 fn flip_position(
826 &mut self,
827 instrument: InstrumentAny,
828 position: &mut Position,
829 fill: OrderFilled,
830 oms_type: OmsType,
831 ) {
832 let difference = match position.side {
833 PositionSide::Long => Quantity::from_raw(
834 fill.last_qty.raw - position.quantity.raw,
835 position.size_precision,
836 ),
837 PositionSide::Short => Quantity::from_raw(
838 position.quantity.raw - fill.last_qty.raw,
839 position.size_precision,
840 ),
841 _ => fill.last_qty,
842 };
843
844 let fill_percent = position.quantity.as_f64() / fill.last_qty.as_f64();
846 let (commission1, commission2) = if let Some(commission) = fill.commission {
847 let commission_currency = commission.currency;
848 let commission1 = Money::new(commission * fill_percent, commission_currency);
849 let commission2 = commission - commission1;
850 (Some(commission1), Some(commission2))
851 } else {
852 log::error!("Commission is not available.");
853 (None, None)
854 };
855
856 let mut fill_split1: Option<OrderFilled> = None;
857 if position.is_open() {
858 fill_split1 = Some(OrderFilled::new(
859 fill.trader_id,
860 fill.strategy_id,
861 fill.instrument_id,
862 fill.client_order_id,
863 fill.venue_order_id,
864 fill.account_id,
865 fill.trade_id,
866 fill.order_side,
867 fill.order_type,
868 position.quantity,
869 fill.last_px,
870 fill.currency,
871 fill.liquidity_side,
872 UUID4::new(),
873 fill.ts_event,
874 fill.ts_init,
875 fill.reconciliation,
876 fill.position_id,
877 commission1,
878 ));
879
880 self.update_position(position, fill_split1.unwrap());
881 }
882
883 if difference.raw == 0 {
885 log::warn!(
886 "Zero fill size during position flip calculation, this could be caused by a mismatch between instrument `size_precision` and a quantity `size_precision`"
887 );
888 return;
889 }
890
891 let position_id_flip = if oms_type == OmsType::Hedging {
892 if let Some(position_id) = fill.position_id {
893 if position_id.is_virtual() {
894 Some(self.pos_id_generator.generate(fill.strategy_id, true))
896 } else {
897 Some(position_id)
898 }
899 } else {
900 None
901 }
902 } else {
903 fill.position_id
904 };
905
906 let fill_split2 = OrderFilled::new(
907 fill.trader_id,
908 fill.strategy_id,
909 fill.instrument_id,
910 fill.client_order_id,
911 fill.venue_order_id,
912 fill.account_id,
913 fill.trade_id,
914 fill.order_side,
915 fill.order_type,
916 difference,
917 fill.last_px,
918 fill.currency,
919 fill.liquidity_side,
920 UUID4::new(),
921 fill.ts_event,
922 fill.ts_init,
923 fill.reconciliation,
924 position_id_flip,
925 commission2,
926 );
927
928 if oms_type == OmsType::Hedging {
929 if let Some(position_id) = fill.position_id {
930 if position_id.is_virtual() {
931 log::warn!("Closing position {fill_split1:?}");
932 log::warn!("Flipping position {fill_split2:?}");
933 }
934 }
935 }
936
937 if let Err(e) = self.open_position(instrument, None, fill_split2, oms_type) {
939 log::error!("Failed to open flipped position: {e:?}");
940 }
941 }
942
943 fn set_position_id_counts(&mut self) {
946 let cache = self.cache.borrow();
948 let positions = cache.positions(None, None, None, None);
949
950 let mut counts: HashMap<StrategyId, usize> = HashMap::new();
952
953 for position in positions {
954 *counts.entry(position.strategy_id).or_insert(0) += 1;
955 }
956
957 self.pos_id_generator.reset();
958
959 for (strategy_id, count) in counts {
960 self.pos_id_generator.set_count(count, strategy_id);
961 log::info!("Set PositionId count for {strategy_id} to {count}");
962 }
963 }
964
965 fn last_px_for_conversion(
966 &self,
967 instrument_id: &InstrumentId,
968 side: OrderSide,
969 ) -> Option<Price> {
970 let cache = self.cache.borrow();
971
972 if let Some(trade) = cache.trade(instrument_id) {
974 return Some(trade.price);
975 }
976
977 if let Some(quote) = cache.quote(instrument_id) {
979 match side {
980 OrderSide::Buy => Some(quote.ask_price),
981 OrderSide::Sell => Some(quote.bid_price),
982 OrderSide::NoOrderSide => None,
983 }
984 } else {
985 None
986 }
987 }
988
989 fn set_order_base_qty(&self, order: &mut OrderAny, base_qty: Quantity) {
990 log::info!(
991 "Setting {} order quote quantity {} to base quantity {}",
992 order.instrument_id(),
993 order.quantity(),
994 base_qty
995 );
996
997 let original_qty = order.quantity();
998 order.set_quantity(base_qty);
999 order.set_leaves_qty(base_qty);
1000 order.set_is_quote_quantity(false);
1001
1002 if matches!(order.contingency_type(), Some(ContingencyType::Oto)) {
1003 return;
1004 }
1005
1006 if let Some(linked_order_ids) = order.linked_order_ids() {
1007 for client_order_id in linked_order_ids {
1008 match self.cache.borrow_mut().mut_order(client_order_id) {
1009 Some(contingent_order) => {
1010 if !contingent_order.is_quote_quantity() {
1011 continue; }
1013
1014 if contingent_order.quantity() != original_qty {
1015 log::warn!(
1016 "Contingent order quantity {} was not equal to the OTO parent original quantity {} when setting to base quantity of {}",
1017 contingent_order.quantity(),
1018 original_qty,
1019 base_qty
1020 );
1021 }
1022
1023 log::info!(
1024 "Setting {} order quote quantity {} to base quantity {}",
1025 contingent_order.instrument_id(),
1026 contingent_order.quantity(),
1027 base_qty
1028 );
1029
1030 contingent_order.set_quantity(base_qty);
1031 contingent_order.set_leaves_qty(base_qty);
1032 contingent_order.set_is_quote_quantity(false);
1033 }
1034 None => {
1035 log::error!("Contingency order {client_order_id} not found");
1036 }
1037 }
1038 }
1039 } else {
1040 log::warn!(
1041 "No linked order IDs found for order {}",
1042 order.client_order_id()
1043 );
1044 }
1045 }
1046
1047 fn deny_order(&self, order: &OrderAny, reason: &str) {
1048 log::error!(
1049 "Order denied: {reason}, order ID: {}",
1050 order.client_order_id()
1051 );
1052
1053 let denied = OrderDenied::new(
1054 order.trader_id(),
1055 order.strategy_id(),
1056 order.instrument_id(),
1057 order.client_order_id(),
1058 reason.into(),
1059 UUID4::new(),
1060 self.clock.borrow().timestamp_ns(),
1061 self.clock.borrow().timestamp_ns(),
1062 );
1063
1064 let mut order = order.clone();
1065
1066 if let Err(e) = order.apply(OrderEventAny::Denied(denied)) {
1067 log::error!("Failed to apply denied event to order: {e}");
1068 return;
1069 }
1070
1071 if let Err(e) = self.cache.borrow_mut().update_order(&order) {
1072 log::error!("Failed to update order in cache: {e}");
1073 return;
1074 }
1075
1076 let topic = switchboard::get_event_orders_topic(order.strategy_id());
1077 msgbus::publish(topic, &denied);
1078
1079 if self.config.snapshot_orders {
1080 self.create_order_state_snapshot(&order);
1081 }
1082 }
1083
1084 fn get_or_init_own_order_book(&self, instrument_id: &InstrumentId) -> RefMut<'_, OwnOrderBook> {
1085 let mut cache = self.cache.borrow_mut();
1086 if cache.own_order_book_mut(instrument_id).is_none() {
1087 let own_book = OwnOrderBook::new(*instrument_id);
1088 cache.add_own_order_book(own_book).unwrap();
1089 }
1090
1091 RefMut::map(cache, |c| c.own_order_book_mut(instrument_id).unwrap())
1092 }
1093}
1094
1095#[cfg(test)]
1099mod tests {
1100 use std::{cell::RefCell, rc::Rc};
1101
1102 use nautilus_common::{cache::Cache, clock::TestClock, msgbus::MessageBus};
1103 use rstest::fixture;
1104
1105 use super::*;
1106
1107 #[fixture]
1108 fn msgbus() -> MessageBus {
1109 MessageBus::default()
1110 }
1111
1112 #[fixture]
1113 fn simple_cache() -> Cache {
1114 Cache::new(None, None)
1115 }
1116
1117 #[fixture]
1118 fn clock() -> TestClock {
1119 TestClock::new()
1120 }
1121
1122 fn _get_exec_engine(
1124 cache: Rc<RefCell<Cache>>,
1125 clock: Rc<RefCell<TestClock>>,
1126 config: Option<ExecutionEngineConfig>,
1127 ) -> ExecutionEngine {
1128 ExecutionEngine::new(clock, cache, config)
1129 }
1130
1131 }