1pub mod config;
24
25use std::{
26 cell::{RefCell, RefMut},
27 collections::{HashMap, HashSet},
28 fmt::Debug,
29 rc::Rc,
30 time::SystemTime,
31};
32
33use config::ExecutionEngineConfig;
34use nautilus_common::{
35 cache::Cache,
36 clock::Clock,
37 generators::position_id::PositionIdGenerator,
38 logging::{CMD, EVT, RECV},
39 messages::execution::{
40 BatchCancelOrders, CancelAllOrders, CancelOrder, ModifyOrder, QueryOrder, SubmitOrder,
41 SubmitOrderList, TradingCommand,
42 },
43 msgbus::{
44 self, get_message_bus,
45 switchboard::{self},
46 },
47};
48use nautilus_core::UUID4;
49use nautilus_model::{
50 enums::{ContingencyType, OmsType, OrderSide, PositionSide},
51 events::{
52 OrderDenied, OrderEvent, OrderEventAny, OrderFilled, PositionChanged, PositionClosed,
53 PositionOpened,
54 },
55 identifiers::{ClientId, InstrumentId, PositionId, StrategyId, Venue},
56 instruments::{Instrument, InstrumentAny},
57 orderbook::own::{OwnOrderBook, should_handle_own_book_order},
58 orders::{Order, OrderAny, OrderError},
59 position::Position,
60 types::{Money, Price, Quantity},
61};
62
63use crate::client::ExecutionClient;
64
/// Execution engine which routes trading commands to execution clients and
/// applies order events to the orders and positions held in the cache.
pub struct ExecutionEngine {
    /// Shared clock; read when timestamping position and order-denied events.
    clock: Rc<RefCell<dyn Clock>>,
    /// Shared cache holding orders, positions, instruments and accounts.
    cache: Rc<RefCell<Cache>>,
    /// Registered execution clients keyed by their client ID.
    clients: HashMap<ClientId, Rc<dyn ExecutionClient>>,
    /// Fallback client used when neither the command's client ID nor venue routing matches.
    default_client: Option<Rc<dyn ExecutionClient>>,
    /// Venue -> client ID routing table.
    routing_map: HashMap<Venue, ClientId>,
    /// Per-strategy OMS type overrides, consulted before any client's OMS type.
    oms_overrides: HashMap<StrategyId, OmsType>,
    /// Instrument ID -> strategy ID map; only its keys are read in this file.
    external_order_claims: HashMap<InstrumentId, StrategyId>,
    /// Generator for new position IDs.
    pos_id_generator: PositionIdGenerator,
    /// Engine configuration flags (debug logging, snapshots, own order books).
    config: ExecutionEngineConfig,
}
76
77impl Debug for ExecutionEngine {
78 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
79 f.debug_struct(stringify!(ExecutionEngine))
80 .field("client_count", &self.clients.len())
81 .finish()
82 }
83}
84
85impl ExecutionEngine {
86 pub fn new(
87 clock: Rc<RefCell<dyn Clock>>,
88 cache: Rc<RefCell<Cache>>,
89 config: Option<ExecutionEngineConfig>,
90 ) -> Self {
91 let trader_id = get_message_bus().borrow().trader_id;
92 Self {
93 clock: clock.clone(),
94 cache,
95 clients: HashMap::new(),
96 default_client: None,
97 routing_map: HashMap::new(),
98 oms_overrides: HashMap::new(),
99 external_order_claims: HashMap::new(),
100 pos_id_generator: PositionIdGenerator::new(trader_id, clock),
101 config: config.unwrap_or_default(),
102 }
103 }
104
105 #[must_use]
106 pub fn position_id_count(&self, strategy_id: StrategyId) -> usize {
107 self.pos_id_generator.count(strategy_id)
108 }
109
110 #[must_use]
111 pub fn check_integrity(&self) -> bool {
112 self.cache.borrow_mut().check_integrity()
113 }
114
115 #[must_use]
116 pub fn check_connected(&self) -> bool {
117 self.clients.values().all(|c| c.is_connected())
118 }
119
120 #[must_use]
121 pub fn check_disconnected(&self) -> bool {
122 self.clients.values().all(|c| !c.is_connected())
123 }
124
125 #[must_use]
126 pub fn check_residuals(&self) -> bool {
127 self.cache.borrow().check_residuals()
128 }
129
130 #[must_use]
131 pub fn get_external_order_claims_instruments(&self) -> HashSet<InstrumentId> {
132 self.external_order_claims.keys().copied().collect()
133 }
134
135 pub fn register_client(&mut self, client: Rc<dyn ExecutionClient>) -> anyhow::Result<()> {
143 if self.clients.contains_key(&client.client_id()) {
144 anyhow::bail!("Client already registered with ID {}", client.client_id());
145 }
146
147 self.routing_map.insert(client.venue(), client.client_id());
149
150 log::info!("Registered client {}", client.client_id());
151 self.clients.insert(client.client_id(), client);
152 Ok(())
153 }
154
155 pub fn register_default_client(&mut self, client: Rc<dyn ExecutionClient>) {
156 log::info!("Registered default client {}", client.client_id());
157 self.default_client = Some(client);
158 }
159
160 #[must_use]
161 pub fn get_client(&self, client_id: &ClientId) -> Option<Rc<dyn ExecutionClient>> {
162 self.clients.get(client_id).cloned()
163 }
164
165 pub fn register_venue_routing(
171 &mut self,
172 client_id: ClientId,
173 venue: Venue,
174 ) -> anyhow::Result<()> {
175 if !self.clients.contains_key(&client_id) {
176 anyhow::bail!("No client registered with ID {client_id}");
177 }
178
179 self.routing_map.insert(venue, client_id);
180 log::info!("Set client {client_id} routing for {venue}");
181 Ok(())
182 }
183
184 pub fn deregister_client(&mut self, client_id: ClientId) -> anyhow::Result<()> {
193 if self.clients.remove(&client_id).is_some() {
194 self.routing_map
196 .retain(|_, mapped_id| mapped_id != &client_id);
197 log::info!("Deregistered client {client_id}");
198 Ok(())
199 } else {
200 anyhow::bail!("No client registered with ID {client_id}")
201 }
202 }
203
204 #[allow(clippy::await_holding_refcell_ref)]
207 pub async fn load_cache(&mut self) -> anyhow::Result<()> {
213 let ts = SystemTime::now();
214
215 {
216 let mut cache = self.cache.borrow_mut();
217 cache.clear_index();
218 cache.cache_general()?;
219 self.cache.borrow_mut().cache_all().await?;
220 cache.build_index();
221 let _ = cache.check_integrity();
222
223 if self.config.manage_own_order_books {
224 for order in cache.orders(None, None, None, None) {
225 if order.is_closed() || !should_handle_own_book_order(order) {
226 continue;
227 }
228 let mut own_book = self.get_or_init_own_order_book(&order.instrument_id());
229 own_book.add(order.to_own_book_order());
230 }
231 }
232 }
233
234 self.set_position_id_counts();
235
236 log::info!(
237 "Loaded cache in {}ms",
238 SystemTime::now()
239 .duration_since(ts)
240 .map_err(|e| anyhow::anyhow!("Failed to calculate duration: {e}"))?
241 .as_millis()
242 );
243
244 Ok(())
245 }
246
247 pub fn flush_db(&self) {
248 self.cache.borrow_mut().flush_db();
249 }
250
251 pub fn process(&mut self, event: &OrderEventAny) {
252 self.handle_event(event);
253 }
254
255 pub fn execute(&self, command: &TradingCommand) {
256 self.execute_command(command);
257 }
258
259 fn execute_command(&self, command: &TradingCommand) {
262 if self.config.debug {
263 log::debug!("{RECV}{CMD} {command:?}");
264 }
265
266 let client: Rc<dyn ExecutionClient> = if let Some(client) = self
267 .clients
268 .get(&command.client_id())
269 .or_else(|| {
270 self.routing_map
271 .get(&command.instrument_id().venue)
272 .and_then(|client_id| self.clients.get(client_id))
273 })
274 .or(self.default_client.as_ref())
275 {
276 client.clone()
277 } else {
278 log::error!(
279 "No execution client found for command: client_id={:?}, venue={}, command={command:?}",
280 command.client_id(),
281 command.instrument_id().venue,
282 );
283 return;
284 };
285
286 match command {
287 TradingCommand::SubmitOrder(cmd) => self.handle_submit_order(client, cmd),
288 TradingCommand::SubmitOrderList(cmd) => self.handle_submit_order_list(client, cmd),
289 TradingCommand::ModifyOrder(cmd) => self.handle_modify_order(client, cmd),
290 TradingCommand::CancelOrder(cmd) => self.handle_cancel_order(client, cmd),
291 TradingCommand::CancelAllOrders(cmd) => self.handle_cancel_all_orders(client, cmd),
292 TradingCommand::BatchCancelOrders(cmd) => self.handle_batch_cancel_orders(client, cmd),
293 TradingCommand::QueryOrder(cmd) => self.handle_query_order(client, cmd),
294 }
295 }
296
297 fn handle_submit_order(&self, client: Rc<dyn ExecutionClient>, cmd: &SubmitOrder) {
298 let mut order = cmd.order.clone();
299 let client_order_id = order.client_order_id();
300 let instrument_id = order.instrument_id();
301
302 if !self.cache.borrow().order_exists(&client_order_id) {
304 {
306 let mut cache = self.cache.borrow_mut();
307 if let Err(e) =
308 cache.add_order(order.clone(), cmd.position_id, Some(cmd.client_id), true)
309 {
310 log::error!("Error adding order to cache: {e}");
311 return;
312 }
313 }
314
315 if self.config.snapshot_orders {
316 self.create_order_state_snapshot(&order);
317 }
318 }
319
320 let instrument = {
322 let cache = self.cache.borrow();
323 if let Some(instrument) = cache.instrument(&instrument_id) {
324 instrument.clone()
325 } else {
326 log::error!(
327 "Cannot handle submit order: no instrument found for {instrument_id}, {cmd}",
328 );
329 return;
330 }
331 };
332
333 if !instrument.is_inverse() && order.is_quote_quantity() {
335 let last_px = self.last_px_for_conversion(&instrument_id, order.order_side());
336
337 if let Some(price) = last_px {
338 let base_qty = instrument.get_base_quantity(order.quantity(), price);
339 self.set_order_base_qty(&mut order, base_qty);
340 } else {
341 self.deny_order(
342 &order,
343 &format!("no-price-to-convert-quote-qty {instrument_id}"),
344 );
345 return;
346 }
347 }
348
349 if self.config.manage_own_order_books && should_handle_own_book_order(&order) {
350 let mut own_book = self.get_or_init_own_order_book(&order.instrument_id());
351 own_book.add(order.to_own_book_order());
352 }
353
354 if let Err(e) = client.submit_order(cmd) {
356 log::error!("Error submitting order to client: {e}");
357 self.deny_order(
358 &cmd.order,
359 &format!("failed-to-submit-order-to-client: {e}"),
360 );
361 }
362 }
363
364 fn handle_submit_order_list(&self, client: Rc<dyn ExecutionClient>, cmd: &SubmitOrderList) {
365 let orders = cmd.order_list.orders.clone();
366
367 let mut cache = self.cache.borrow_mut();
369 for order in &orders {
370 if !cache.order_exists(&order.client_order_id()) {
371 if let Err(e) =
372 cache.add_order(order.clone(), cmd.position_id, Some(cmd.client_id), true)
373 {
374 log::error!("Error adding order to cache: {e}");
375 return;
376 }
377
378 if self.config.snapshot_orders {
379 self.create_order_state_snapshot(order);
380 }
381 }
382 }
383 drop(cache);
384
385 let cache = self.cache.borrow();
387 let instrument = if let Some(instrument) = cache.instrument(&cmd.instrument_id) {
388 instrument
389 } else {
390 log::error!(
391 "Cannot handle submit order list: no instrument found for {}, {cmd}",
392 cmd.instrument_id,
393 );
394 return;
395 };
396
397 if !instrument.is_inverse() && cmd.order_list.orders[0].is_quote_quantity() {
399 let mut quote_qty = None;
400 let mut _last_px = None;
401
402 for order in &cmd.order_list.orders {
403 if !order.is_quote_quantity() {
404 continue; }
406
407 if Some(order.quantity()) != quote_qty {
408 _last_px =
409 self.last_px_for_conversion(&order.instrument_id(), order.order_side());
410 quote_qty = Some(order.quantity());
411 }
412
413 }
427 }
428
429 if self.config.manage_own_order_books {
430 let mut own_book = self.get_or_init_own_order_book(&cmd.instrument_id);
431 for order in &cmd.order_list.orders {
432 if should_handle_own_book_order(order) {
433 own_book.add(order.to_own_book_order());
434 }
435 }
436 }
437
438 if let Err(e) = client.submit_order_list(cmd) {
440 log::error!("Error submitting order list to client: {e}");
441 for order in &orders {
442 self.deny_order(
443 order,
444 &format!("failed-to-submit-order-list-to-client: {e}"),
445 );
446 }
447 }
448 }
449
450 fn handle_modify_order(&self, client: Rc<dyn ExecutionClient>, cmd: &ModifyOrder) {
451 if let Err(e) = client.modify_order(cmd) {
452 log::error!("Error modifying order: {e}");
453 }
454 }
455
456 fn handle_cancel_order(&self, client: Rc<dyn ExecutionClient>, cmd: &CancelOrder) {
457 if let Err(e) = client.cancel_order(cmd) {
458 log::error!("Error canceling order: {e}");
459 }
460 }
461
462 fn handle_cancel_all_orders(&self, client: Rc<dyn ExecutionClient>, cmd: &CancelAllOrders) {
463 if let Err(e) = client.cancel_all_orders(cmd) {
464 log::error!("Error canceling all orders: {e}");
465 }
466 }
467
468 fn handle_batch_cancel_orders(&self, client: Rc<dyn ExecutionClient>, cmd: &BatchCancelOrders) {
469 if let Err(e) = client.batch_cancel_orders(cmd) {
470 log::error!("Error batch canceling orders: {e}");
471 }
472 }
473
474 fn handle_query_order(&self, client: Rc<dyn ExecutionClient>, cmd: &QueryOrder) {
475 if let Err(e) = client.query_order(cmd) {
476 log::error!("Error querying order: {e}");
477 }
478 }
479
480 fn create_order_state_snapshot(&self, order: &OrderAny) {
481 if self.config.debug {
482 log::debug!("Creating order state snapshot for {order}");
483 }
484
485 if self.cache.borrow().has_backing() {
486 if let Err(e) = self.cache.borrow().snapshot_order_state(order) {
487 log::error!("Failed to snapshot order state: {e}");
488 return;
489 }
490 }
491
492 if get_message_bus().borrow().has_backing {
493 let topic = switchboard::get_order_snapshots_topic(order.client_order_id());
494 msgbus::publish(topic, order);
495 }
496 }
497
498 fn create_position_state_snapshot(&self, position: &Position) {
499 if self.config.debug {
500 log::debug!("Creating position state snapshot for {position}");
501 }
502
503 let topic = switchboard::get_positions_snapshots_topic(position.id);
509 msgbus::publish(topic, position);
510 }
511
512 fn handle_event(&mut self, event: &OrderEventAny) {
515 if self.config.debug {
516 log::debug!("{RECV}{EVT} {event:?}");
517 }
518
519 let client_order_id = event.client_order_id();
520 let cache = self.cache.borrow();
521 let mut order = if let Some(order) = cache.order(&client_order_id) {
522 order.clone()
523 } else {
524 log::warn!(
525 "Order with {} not found in the cache to apply {}",
526 event.client_order_id(),
527 event
528 );
529
530 let venue_order_id = if let Some(id) = event.venue_order_id() {
532 id
533 } else {
534 log::error!(
535 "Cannot apply event to any order: {} not found in the cache with no VenueOrderId",
536 event.client_order_id()
537 );
538 return;
539 };
540
541 let client_order_id = if let Some(id) = cache.client_order_id(&venue_order_id) {
543 id
544 } else {
545 log::error!(
546 "Cannot apply event to any order: {} and {venue_order_id} not found in the cache",
547 event.client_order_id(),
548 );
549 return;
550 };
551
552 if let Some(order) = cache.order(client_order_id) {
554 log::info!("Order with {client_order_id} was found in the cache");
555 order.clone()
556 } else {
557 log::error!(
558 "Cannot apply event to any order: {client_order_id} and {venue_order_id} not found in cache",
559 );
560 return;
561 }
562 };
563
564 drop(cache);
565 match event {
566 OrderEventAny::Filled(fill) => {
567 let oms_type = self.determine_oms_type(fill);
568 let position_id = self.determine_position_id(*fill, oms_type);
569
570 let mut fill = *fill;
572 if fill.position_id.is_none() {
573 fill.position_id = Some(position_id);
574 }
575
576 self.apply_event_to_order(&mut order, OrderEventAny::Filled(fill));
577 self.handle_order_fill(&order, fill, oms_type);
578 }
579 _ => {
580 self.apply_event_to_order(&mut order, event.clone());
581 }
582 }
583 }
584
585 fn determine_oms_type(&self, fill: &OrderFilled) -> OmsType {
586 if let Some(oms_type) = self.oms_overrides.get(&fill.strategy_id) {
588 return *oms_type;
589 }
590
591 if let Some(client_id) = self.routing_map.get(&fill.instrument_id.venue) {
593 if let Some(client) = self.clients.get(client_id) {
594 return client.oms_type();
595 }
596 }
597
598 if let Some(client) = &self.default_client {
599 return client.oms_type();
600 }
601
602 OmsType::Netting }
604
605 fn determine_position_id(&mut self, fill: OrderFilled, oms_type: OmsType) -> PositionId {
606 match oms_type {
607 OmsType::Hedging => self.determine_hedging_position_id(fill),
608 OmsType::Netting => self.determine_netting_position_id(fill),
609 _ => self.determine_netting_position_id(fill), }
611 }
612
613 fn determine_hedging_position_id(&mut self, fill: OrderFilled) -> PositionId {
614 if let Some(position_id) = fill.position_id {
616 if self.config.debug {
617 log::debug!("Already had a position ID of: {position_id}");
618 }
619 return position_id;
620 }
621
622 let cache = self.cache.borrow();
624 let order = match cache.order(&fill.client_order_id()) {
625 Some(o) => o,
626 None => {
627 panic!(
628 "Order for {} not found to determine position ID",
629 fill.client_order_id()
630 );
631 }
632 };
633
634 if let Some(spawn_id) = order.exec_spawn_id() {
636 let spawn_orders = cache.orders_for_exec_spawn(&spawn_id);
637 for spawned_order in spawn_orders {
638 if let Some(pos_id) = spawned_order.position_id() {
639 if self.config.debug {
640 log::debug!("Found spawned {} for {}", pos_id, fill.client_order_id());
641 }
642 return pos_id;
643 }
644 }
645 }
646
647 let position_id = self.pos_id_generator.generate(fill.strategy_id, false);
649 if self.config.debug {
650 log::debug!("Generated {} for {}", position_id, fill.client_order_id());
651 }
652 position_id
653 }
654
655 fn determine_netting_position_id(&self, fill: OrderFilled) -> PositionId {
656 PositionId::new(format!("{}-{}", fill.instrument_id, fill.strategy_id))
657 }
658
659 fn apply_event_to_order(&self, order: &mut OrderAny, event: OrderEventAny) {
660 if let Err(e) = order.apply(event.clone()) {
661 match e {
662 OrderError::InvalidStateTransition => {
663 log::warn!("InvalidStateTrigger: {e}, did not apply {event}");
664 }
665 _ => {
666 log::error!("Error applying event: {e}, did not apply {event}");
667 }
668 }
669 return;
670 }
671
672 if let Err(e) = self.cache.borrow_mut().update_order(order) {
673 log::error!("Error updating order in cache: {e}");
674 }
675
676 let topic = switchboard::get_event_orders_topic(event.strategy_id());
677 msgbus::publish(topic, order);
678
679 if self.config.snapshot_orders {
680 self.create_order_state_snapshot(order);
681 }
682 }
683
684 fn handle_order_fill(&mut self, order: &OrderAny, fill: OrderFilled, oms_type: OmsType) {
685 let instrument =
686 if let Some(instrument) = self.cache.borrow().instrument(&fill.instrument_id) {
687 instrument.clone()
688 } else {
689 log::error!(
690 "Cannot handle order fill: no instrument found for {}, {fill}",
691 fill.instrument_id,
692 );
693 return;
694 };
695
696 if self.cache.borrow().account(&fill.account_id).is_none() {
697 log::error!(
698 "Cannot handle order fill: no account found for {}, {fill}",
699 fill.instrument_id.venue,
700 );
701 return;
702 }
703
704 let position_id = if let Some(position_id) = fill.position_id {
705 position_id
706 } else {
707 log::error!("Cannot handle order fill: no position ID found for fill {fill}");
708 return;
709 };
710
711 let mut position = match self.cache.borrow().position(&position_id) {
712 Some(pos) if !pos.is_closed() => pos.clone(),
713 _ => self
714 .open_position(instrument.clone(), None, fill, oms_type)
715 .unwrap(),
716 };
717
718 if self.will_flip_position(&position, fill) {
719 self.flip_position(instrument, &mut position, fill, oms_type);
720 } else {
721 self.update_position(&mut position, fill);
722 }
723
724 if matches!(order.contingency_type(), Some(ContingencyType::Oto)) && position.is_open() {
725 for client_order_id in order.linked_order_ids().unwrap_or_default() {
726 let mut cache = self.cache.borrow_mut();
727 let contingent_order = cache.mut_order(client_order_id);
728 if let Some(contingent_order) = contingent_order {
729 if contingent_order.position_id().is_none() {
730 contingent_order.set_position_id(Some(position_id));
731
732 if let Err(e) = self.cache.borrow_mut().add_position_id(
733 &position_id,
734 &contingent_order.instrument_id().venue,
735 &contingent_order.client_order_id(),
736 &contingent_order.strategy_id(),
737 ) {
738 log::error!("Failed to add position ID: {e}");
739 }
740 }
741 }
742 }
743 }
744 }
745
746 fn open_position(
747 &self,
748 instrument: InstrumentAny,
749 position: Option<&Position>,
750 fill: OrderFilled,
751 oms_type: OmsType,
752 ) -> anyhow::Result<Position> {
753 let position = if let Some(position) = position {
754 self.cache.borrow_mut().snapshot_position(position)?;
756 let mut position = position.clone();
757 position.apply(&fill);
758 self.cache.borrow_mut().update_position(&position)?;
759 position
760 } else {
761 let position = Position::new(&instrument, fill);
762 self.cache
763 .borrow_mut()
764 .add_position(position.clone(), oms_type)?;
765 if self.config.snapshot_positions {
766 self.create_position_state_snapshot(&position);
767 }
768 position
769 };
770
771 let ts_init = self.clock.borrow().timestamp_ns();
772 let event = PositionOpened::create(&position, &fill, UUID4::new(), ts_init);
773 let topic = switchboard::get_event_positions_topic(event.strategy_id);
774 msgbus::publish(topic, &event);
775
776 Ok(position)
777 }
778
779 fn update_position(&self, position: &mut Position, fill: OrderFilled) {
780 position.apply(&fill);
781
782 if let Err(e) = self.cache.borrow_mut().update_position(position) {
783 log::error!("Failed to update position: {e:?}");
784 return;
785 }
786
787 if self.config.snapshot_positions {
788 self.create_position_state_snapshot(position);
789 }
790
791 let topic = switchboard::get_event_positions_topic(position.strategy_id);
792 let ts_init = self.clock.borrow().timestamp_ns();
793
794 if position.is_closed() {
795 let event = PositionClosed::create(position, &fill, UUID4::new(), ts_init);
796 msgbus::publish(topic, &event);
797 } else {
798 let event = PositionChanged::create(position, &fill, UUID4::new(), ts_init);
799 msgbus::publish(topic, &event);
800 }
801 }
802
803 fn will_flip_position(&self, position: &Position, fill: OrderFilled) -> bool {
804 position.is_opposite_side(fill.order_side) && (fill.last_qty.raw > position.quantity.raw)
805 }
806
807 fn flip_position(
808 &mut self,
809 instrument: InstrumentAny,
810 position: &mut Position,
811 fill: OrderFilled,
812 oms_type: OmsType,
813 ) {
814 let difference = match position.side {
815 PositionSide::Long => Quantity::from_raw(
816 fill.last_qty.raw - position.quantity.raw,
817 position.size_precision,
818 ),
819 PositionSide::Short => Quantity::from_raw(
820 position.quantity.raw - fill.last_qty.raw,
821 position.size_precision,
822 ),
823 _ => fill.last_qty,
824 };
825
826 let fill_percent = position.quantity.as_f64() / fill.last_qty.as_f64();
828 let (commission1, commission2) = if let Some(commission) = fill.commission {
829 let commission_currency = commission.currency;
830 let commission1 = Money::new(commission * fill_percent, commission_currency);
831 let commission2 = commission - commission1;
832 (Some(commission1), Some(commission2))
833 } else {
834 log::error!("Commission is not available.");
835 (None, None)
836 };
837
838 let mut fill_split1: Option<OrderFilled> = None;
839 if position.is_open() {
840 fill_split1 = Some(OrderFilled::new(
841 fill.trader_id,
842 fill.strategy_id,
843 fill.instrument_id,
844 fill.client_order_id,
845 fill.venue_order_id,
846 fill.account_id,
847 fill.trade_id,
848 fill.order_side,
849 fill.order_type,
850 position.quantity,
851 fill.last_px,
852 fill.currency,
853 fill.liquidity_side,
854 UUID4::new(),
855 fill.ts_event,
856 fill.ts_init,
857 fill.reconciliation,
858 fill.position_id,
859 commission1,
860 ));
861
862 self.update_position(position, fill_split1.unwrap());
863 }
864
865 if difference.raw == 0 {
867 log::warn!(
868 "Zero fill size during position flip calculation, this could be caused by a mismatch between instrument `size_precision` and a quantity `size_precision`"
869 );
870 return;
871 }
872
873 let position_id_flip = if oms_type == OmsType::Hedging {
874 if let Some(position_id) = fill.position_id {
875 if position_id.is_virtual() {
876 Some(self.pos_id_generator.generate(fill.strategy_id, true))
878 } else {
879 Some(position_id)
880 }
881 } else {
882 None
883 }
884 } else {
885 fill.position_id
886 };
887
888 let fill_split2 = OrderFilled::new(
889 fill.trader_id,
890 fill.strategy_id,
891 fill.instrument_id,
892 fill.client_order_id,
893 fill.venue_order_id,
894 fill.account_id,
895 fill.trade_id,
896 fill.order_side,
897 fill.order_type,
898 difference,
899 fill.last_px,
900 fill.currency,
901 fill.liquidity_side,
902 UUID4::new(),
903 fill.ts_event,
904 fill.ts_init,
905 fill.reconciliation,
906 position_id_flip,
907 commission2,
908 );
909
910 if oms_type == OmsType::Hedging {
911 if let Some(position_id) = fill.position_id {
912 if position_id.is_virtual() {
913 log::warn!("Closing position {fill_split1:?}");
914 log::warn!("Flipping position {fill_split2:?}");
915 }
916 }
917 }
918
919 if let Err(e) = self.open_position(instrument, None, fill_split2, oms_type) {
921 log::error!("Failed to open flipped position: {e:?}");
922 }
923 }
924
925 fn set_position_id_counts(&mut self) {
928 let cache = self.cache.borrow();
930 let positions = cache.positions(None, None, None, None);
931
932 let mut counts: HashMap<StrategyId, usize> = HashMap::new();
934
935 for position in positions {
936 *counts.entry(position.strategy_id).or_insert(0) += 1;
937 }
938
939 self.pos_id_generator.reset();
940
941 for (strategy_id, count) in counts {
942 self.pos_id_generator.set_count(count, strategy_id);
943 log::info!("Set PositionId count for {strategy_id} to {count}");
944 }
945 }
946
947 fn last_px_for_conversion(
948 &self,
949 instrument_id: &InstrumentId,
950 side: OrderSide,
951 ) -> Option<Price> {
952 let cache = self.cache.borrow();
953
954 if let Some(trade) = cache.trade(instrument_id) {
956 return Some(trade.price);
957 }
958
959 if let Some(quote) = cache.quote(instrument_id) {
961 match side {
962 OrderSide::Buy => Some(quote.ask_price),
963 OrderSide::Sell => Some(quote.bid_price),
964 OrderSide::NoOrderSide => None,
965 }
966 } else {
967 None
968 }
969 }
970
971 fn set_order_base_qty(&self, order: &mut OrderAny, base_qty: Quantity) {
972 log::info!(
973 "Setting {} order quote quantity {} to base quantity {}",
974 order.instrument_id(),
975 order.quantity(),
976 base_qty
977 );
978
979 let original_qty = order.quantity();
980 order.set_quantity(base_qty);
981 order.set_leaves_qty(base_qty);
982 order.set_is_quote_quantity(false);
983
984 if matches!(order.contingency_type(), Some(ContingencyType::Oto)) {
985 return;
986 }
987
988 if let Some(linked_order_ids) = order.linked_order_ids() {
989 for client_order_id in linked_order_ids {
990 match self.cache.borrow_mut().mut_order(client_order_id) {
991 Some(contingent_order) => {
992 if !contingent_order.is_quote_quantity() {
993 continue; }
995
996 if contingent_order.quantity() != original_qty {
997 log::warn!(
998 "Contingent order quantity {} was not equal to the OTO parent original quantity {} when setting to base quantity of {}",
999 contingent_order.quantity(),
1000 original_qty,
1001 base_qty
1002 );
1003 }
1004
1005 log::info!(
1006 "Setting {} order quote quantity {} to base quantity {}",
1007 contingent_order.instrument_id(),
1008 contingent_order.quantity(),
1009 base_qty
1010 );
1011
1012 contingent_order.set_quantity(base_qty);
1013 contingent_order.set_leaves_qty(base_qty);
1014 contingent_order.set_is_quote_quantity(false);
1015 }
1016 None => {
1017 log::error!("Contingency order {client_order_id} not found");
1018 }
1019 }
1020 }
1021 } else {
1022 log::warn!(
1023 "No linked order IDs found for order {}",
1024 order.client_order_id()
1025 );
1026 }
1027 }
1028
1029 fn deny_order(&self, order: &OrderAny, reason: &str) {
1030 log::error!(
1031 "Order denied: {reason}, order ID: {}",
1032 order.client_order_id()
1033 );
1034
1035 let denied = OrderDenied::new(
1036 order.trader_id(),
1037 order.strategy_id(),
1038 order.instrument_id(),
1039 order.client_order_id(),
1040 reason.into(),
1041 UUID4::new(),
1042 self.clock.borrow().timestamp_ns(),
1043 self.clock.borrow().timestamp_ns(),
1044 );
1045
1046 let mut order = order.clone();
1047
1048 if let Err(e) = order.apply(OrderEventAny::Denied(denied)) {
1049 log::error!("Failed to apply denied event to order: {e}");
1050 return;
1051 }
1052
1053 if let Err(e) = self.cache.borrow_mut().update_order(&order) {
1054 log::error!("Failed to update order in cache: {e}");
1055 return;
1056 }
1057
1058 let topic = switchboard::get_event_orders_topic(order.strategy_id());
1059 msgbus::publish(topic, &denied);
1060
1061 if self.config.snapshot_orders {
1062 self.create_order_state_snapshot(&order);
1063 }
1064 }
1065
1066 fn get_or_init_own_order_book(&self, instrument_id: &InstrumentId) -> RefMut<OwnOrderBook> {
1067 let mut cache = self.cache.borrow_mut();
1068 if cache.own_order_book_mut(instrument_id).is_none() {
1069 let own_book = OwnOrderBook::new(*instrument_id);
1070 cache.add_own_order_book(own_book).unwrap();
1071 }
1072
1073 RefMut::map(cache, |c| c.own_order_book_mut(instrument_id).unwrap())
1074 }
1075}
1076
#[cfg(test)]
mod tests {
    use std::{cell::RefCell, rc::Rc};

    use nautilus_common::{cache::Cache, clock::TestClock, msgbus::MessageBus};
    use rstest::fixture;

    use super::*;

    /// Fixture: a message bus built from its default configuration.
    #[fixture]
    fn msgbus() -> MessageBus {
        Default::default()
    }

    /// Fixture: a cache constructed with both optional arguments left as `None`.
    #[fixture]
    fn simple_cache() -> Cache {
        let cache = Cache::new(None, None);
        cache
    }

    /// Fixture: a fresh test clock.
    #[fixture]
    fn clock() -> TestClock {
        let test_clock = TestClock::new();
        test_clock
    }

    /// Helper building an engine from a shared cache, a shared test clock and
    /// an optional configuration.
    fn _get_exec_engine(
        cache: Rc<RefCell<Cache>>,
        clock: Rc<RefCell<TestClock>>,
        config: Option<ExecutionEngineConfig>,
    ) -> ExecutionEngine {
        // The concrete test clock is coerced to the trait object the engine expects
        let clock: Rc<RefCell<dyn Clock>> = clock;
        ExecutionEngine::new(clock, cache, config)
    }
}