nautilus_execution/engine/
mod.rs

1// -------------------------------------------------------------------------------------------------
2//  Copyright (C) 2015-2025 Posei Systems Pty Ltd. All rights reserved.
3//  https://poseitrader.io
4//
5//  Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
6//  You may not use this file except in compliance with the License.
7//  You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
8//
9//  Unless required by applicable law or agreed to in writing, software
10//  distributed under the License is distributed on an "AS IS" BASIS,
11//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12//  See the License for the specific language governing permissions and
13//  limitations under the License.
14// -------------------------------------------------------------------------------------------------
15
//! Provides a generic `ExecutionEngine` for all environments.
//!
//! The execution engine's primary responsibility is to orchestrate interactions
//! between the `ExecutionClient` instances and the rest of the platform. This
//! includes sending commands to, and receiving events from, the trading venue
//! endpoints via its registered execution clients.
22
23pub mod config;
24
25use std::{
26    cell::{RefCell, RefMut},
27    collections::{HashMap, HashSet},
28    fmt::Debug,
29    rc::Rc,
30    time::SystemTime,
31};
32
33use config::ExecutionEngineConfig;
34use nautilus_common::{
35    cache::Cache,
36    clock::Clock,
37    generators::position_id::PositionIdGenerator,
38    logging::{CMD, EVT, RECV},
39    messages::execution::{
40        BatchCancelOrders, CancelAllOrders, CancelOrder, ModifyOrder, QueryOrder, SubmitOrder,
41        SubmitOrderList, TradingCommand,
42    },
43    msgbus::{
44        self, get_message_bus,
45        switchboard::{self},
46    },
47};
48use nautilus_core::UUID4;
49use nautilus_model::{
50    enums::{ContingencyType, OmsType, OrderSide, PositionSide},
51    events::{
52        OrderDenied, OrderEvent, OrderEventAny, OrderFilled, PositionChanged, PositionClosed,
53        PositionOpened,
54    },
55    identifiers::{ClientId, InstrumentId, PositionId, StrategyId, Venue},
56    instruments::{Instrument, InstrumentAny},
57    orderbook::own::{OwnOrderBook, should_handle_own_book_order},
58    orders::{Order, OrderAny, OrderError},
59    position::Position,
60    types::{Money, Price, Quantity},
61};
62
63use crate::client::ExecutionClient;
64
65pub struct ExecutionEngine {
66    clock: Rc<RefCell<dyn Clock>>,
67    cache: Rc<RefCell<Cache>>,
68    clients: HashMap<ClientId, Rc<dyn ExecutionClient>>,
69    default_client: Option<Rc<dyn ExecutionClient>>,
70    routing_map: HashMap<Venue, ClientId>,
71    oms_overrides: HashMap<StrategyId, OmsType>,
72    external_order_claims: HashMap<InstrumentId, StrategyId>,
73    pos_id_generator: PositionIdGenerator,
74    config: ExecutionEngineConfig,
75}
76
77impl Debug for ExecutionEngine {
78    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
79        f.debug_struct(stringify!(ExecutionEngine))
80            .field("client_count", &self.clients.len())
81            .finish()
82    }
83}
84
85impl ExecutionEngine {
86    pub fn new(
87        clock: Rc<RefCell<dyn Clock>>,
88        cache: Rc<RefCell<Cache>>,
89        config: Option<ExecutionEngineConfig>,
90    ) -> Self {
91        let trader_id = get_message_bus().borrow().trader_id;
92        Self {
93            clock: clock.clone(),
94            cache,
95            clients: HashMap::new(),
96            default_client: None,
97            routing_map: HashMap::new(),
98            oms_overrides: HashMap::new(),
99            external_order_claims: HashMap::new(),
100            pos_id_generator: PositionIdGenerator::new(trader_id, clock),
101            config: config.unwrap_or_default(),
102        }
103    }
104
105    #[must_use]
106    pub fn position_id_count(&self, strategy_id: StrategyId) -> usize {
107        self.pos_id_generator.count(strategy_id)
108    }
109
110    #[must_use]
111    pub fn check_integrity(&self) -> bool {
112        self.cache.borrow_mut().check_integrity()
113    }
114
115    #[must_use]
116    pub fn check_connected(&self) -> bool {
117        self.clients.values().all(|c| c.is_connected())
118    }
119
120    #[must_use]
121    pub fn check_disconnected(&self) -> bool {
122        self.clients.values().all(|c| !c.is_connected())
123    }
124
125    #[must_use]
126    pub fn check_residuals(&self) -> bool {
127        self.cache.borrow().check_residuals()
128    }
129
130    #[must_use]
131    pub fn get_external_order_claims_instruments(&self) -> HashSet<InstrumentId> {
132        self.external_order_claims.keys().copied().collect()
133    }
134
135    // -- REGISTRATION ----------------------------------------------------------------------------
136
137    /// Registers a new execution client.
138    ///
139    /// # Errors
140    ///
141    /// Returns an error if a client with the same ID is already registered.
142    pub fn register_client(&mut self, client: Rc<dyn ExecutionClient>) -> anyhow::Result<()> {
143        if self.clients.contains_key(&client.client_id()) {
144            anyhow::bail!("Client already registered with ID {}", client.client_id());
145        }
146
147        // If client has venue, register routing
148        self.routing_map.insert(client.venue(), client.client_id());
149
150        log::info!("Registered client {}", client.client_id());
151        self.clients.insert(client.client_id(), client);
152        Ok(())
153    }
154
155    pub fn register_default_client(&mut self, client: Rc<dyn ExecutionClient>) {
156        log::info!("Registered default client {}", client.client_id());
157        self.default_client = Some(client);
158    }
159
160    #[must_use]
161    pub fn get_client(&self, client_id: &ClientId) -> Option<Rc<dyn ExecutionClient>> {
162        self.clients.get(client_id).cloned()
163    }
164
165    /// Sets routing for a specific venue to a given client ID.
166    ///
167    /// # Errors
168    ///
169    /// Returns an error if the client ID is not registered.
170    pub fn register_venue_routing(
171        &mut self,
172        client_id: ClientId,
173        venue: Venue,
174    ) -> anyhow::Result<()> {
175        if !self.clients.contains_key(&client_id) {
176            anyhow::bail!("No client registered with ID {client_id}");
177        }
178
179        self.routing_map.insert(venue, client_id);
180        log::info!("Set client {client_id} routing for {venue}");
181        Ok(())
182    }
183
184    // TODO: Implement `Strategy`
185    // pub fn register_external_order_claims(&mut self, strategy: Strategy) -> anyhow::Result<()> {
186    //     todo!();
187    // }
188
189    /// # Errors
190    ///
191    /// Returns an error if no client is registered with the given ID.
192    pub fn deregister_client(&mut self, client_id: ClientId) -> anyhow::Result<()> {
193        if self.clients.remove(&client_id).is_some() {
194            // Remove from routing map if present
195            self.routing_map
196                .retain(|_, mapped_id| mapped_id != &client_id);
197            log::info!("Deregistered client {client_id}");
198            Ok(())
199        } else {
200            anyhow::bail!("No client registered with ID {client_id}")
201        }
202    }
203
204    // -- COMMANDS --------------------------------------------------------------------------------
205
206    #[allow(clippy::await_holding_refcell_ref)]
207    /// Loads persistent state into cache and rebuilds indices.
208    ///
209    /// # Errors
210    ///
211    /// Returns an error if any cache operation fails.
212    pub async fn load_cache(&mut self) -> anyhow::Result<()> {
213        let ts = SystemTime::now();
214
215        {
216            let mut cache = self.cache.borrow_mut();
217            cache.clear_index();
218            cache.cache_general()?;
219            self.cache.borrow_mut().cache_all().await?;
220            cache.build_index();
221            let _ = cache.check_integrity();
222
223            if self.config.manage_own_order_books {
224                for order in cache.orders(None, None, None, None) {
225                    if order.is_closed() || !should_handle_own_book_order(order) {
226                        continue;
227                    }
228                    let mut own_book = self.get_or_init_own_order_book(&order.instrument_id());
229                    own_book.add(order.to_own_book_order());
230                }
231            }
232        }
233
234        self.set_position_id_counts();
235
236        log::info!(
237            "Loaded cache in {}ms",
238            SystemTime::now()
239                .duration_since(ts)
240                .map_err(|e| anyhow::anyhow!("Failed to calculate duration: {e}"))?
241                .as_millis()
242        );
243
244        Ok(())
245    }
246
247    pub fn flush_db(&self) {
248        self.cache.borrow_mut().flush_db();
249    }
250
251    pub fn process(&mut self, event: &OrderEventAny) {
252        self.handle_event(event);
253    }
254
255    pub fn execute(&self, command: &TradingCommand) {
256        self.execute_command(command);
257    }
258
259    // -- COMMAND HANDLERS ------------------------------------------------------------------------
260
261    fn execute_command(&self, command: &TradingCommand) {
262        if self.config.debug {
263            log::debug!("{RECV}{CMD} {command:?}");
264        }
265
266        let client: Rc<dyn ExecutionClient> = if let Some(client) = self
267            .clients
268            .get(&command.client_id())
269            .or_else(|| {
270                self.routing_map
271                    .get(&command.instrument_id().venue)
272                    .and_then(|client_id| self.clients.get(client_id))
273            })
274            .or(self.default_client.as_ref())
275        {
276            client.clone()
277        } else {
278            log::error!(
279                "No execution client found for command: client_id={:?}, venue={}, command={command:?}",
280                command.client_id(),
281                command.instrument_id().venue,
282            );
283            return;
284        };
285
286        match command {
287            TradingCommand::SubmitOrder(cmd) => self.handle_submit_order(client, cmd),
288            TradingCommand::SubmitOrderList(cmd) => self.handle_submit_order_list(client, cmd),
289            TradingCommand::ModifyOrder(cmd) => self.handle_modify_order(client, cmd),
290            TradingCommand::CancelOrder(cmd) => self.handle_cancel_order(client, cmd),
291            TradingCommand::CancelAllOrders(cmd) => self.handle_cancel_all_orders(client, cmd),
292            TradingCommand::BatchCancelOrders(cmd) => self.handle_batch_cancel_orders(client, cmd),
293            TradingCommand::QueryOrder(cmd) => self.handle_query_order(client, cmd),
294        }
295    }
296
297    fn handle_submit_order(&self, client: Rc<dyn ExecutionClient>, cmd: &SubmitOrder) {
298        let mut order = cmd.order.clone();
299        let client_order_id = order.client_order_id();
300        let instrument_id = order.instrument_id();
301
302        // Check if the order exists in the cache
303        if !self.cache.borrow().order_exists(&client_order_id) {
304            // Add order to cache in a separate scope to drop the mutable borrow
305            {
306                let mut cache = self.cache.borrow_mut();
307                if let Err(e) =
308                    cache.add_order(order.clone(), cmd.position_id, Some(cmd.client_id), true)
309                {
310                    log::error!("Error adding order to cache: {e}");
311                    return;
312                }
313            }
314
315            if self.config.snapshot_orders {
316                self.create_order_state_snapshot(&order);
317            }
318        }
319
320        // Get instrument in a separate scope to manage borrows
321        let instrument = {
322            let cache = self.cache.borrow();
323            if let Some(instrument) = cache.instrument(&instrument_id) {
324                instrument.clone()
325            } else {
326                log::error!(
327                    "Cannot handle submit order: no instrument found for {instrument_id}, {cmd}",
328                );
329                return;
330            }
331        };
332
333        // Handle quote quantity conversion
334        if !instrument.is_inverse() && order.is_quote_quantity() {
335            let last_px = self.last_px_for_conversion(&instrument_id, order.order_side());
336
337            if let Some(price) = last_px {
338                let base_qty = instrument.get_base_quantity(order.quantity(), price);
339                self.set_order_base_qty(&mut order, base_qty);
340            } else {
341                self.deny_order(
342                    &order,
343                    &format!("no-price-to-convert-quote-qty {instrument_id}"),
344                );
345                return;
346            }
347        }
348
349        if self.config.manage_own_order_books && should_handle_own_book_order(&order) {
350            let mut own_book = self.get_or_init_own_order_book(&order.instrument_id());
351            own_book.add(order.to_own_book_order());
352        }
353
354        // Send the order to the execution client
355        if let Err(e) = client.submit_order(cmd) {
356            log::error!("Error submitting order to client: {e}");
357            self.deny_order(
358                &cmd.order,
359                &format!("failed-to-submit-order-to-client: {e}"),
360            );
361        }
362    }
363
364    fn handle_submit_order_list(&self, client: Rc<dyn ExecutionClient>, cmd: &SubmitOrderList) {
365        let orders = cmd.order_list.orders.clone();
366
367        // Cache orders
368        let mut cache = self.cache.borrow_mut();
369        for order in &orders {
370            if !cache.order_exists(&order.client_order_id()) {
371                if let Err(e) =
372                    cache.add_order(order.clone(), cmd.position_id, Some(cmd.client_id), true)
373                {
374                    log::error!("Error adding order to cache: {e}");
375                    return;
376                }
377
378                if self.config.snapshot_orders {
379                    self.create_order_state_snapshot(order);
380                }
381            }
382        }
383        drop(cache);
384
385        // Get instrument from cache
386        let cache = self.cache.borrow();
387        let instrument = if let Some(instrument) = cache.instrument(&cmd.instrument_id) {
388            instrument
389        } else {
390            log::error!(
391                "Cannot handle submit order list: no instrument found for {}, {cmd}",
392                cmd.instrument_id,
393            );
394            return;
395        };
396
397        // Check if converting quote quantity
398        if !instrument.is_inverse() && cmd.order_list.orders[0].is_quote_quantity() {
399            let mut quote_qty = None;
400            let mut _last_px = None;
401
402            for order in &cmd.order_list.orders {
403                if !order.is_quote_quantity() {
404                    continue; // Base quantity already set
405                }
406
407                if Some(order.quantity()) != quote_qty {
408                    _last_px =
409                        self.last_px_for_conversion(&order.instrument_id(), order.order_side());
410                    quote_qty = Some(order.quantity());
411                }
412
413                // TODO: Pull order out of cache to modify
414                // if let Some(px) = last_px {
415                //     let base_qty = instrument.get_base_quantity(order.quantity(), px);
416                //     self.set_order_base_qty(order, base_qty);
417                // } else {
418                //     for order in &cmd.order_list.orders {
419                //         self.deny_order(
420                //             order,
421                //             &format!("no-price-to-convert-quote-qty {}", order.instrument_id()),
422                //         );
423                //     }
424                //     return; // Denied
425                // }
426            }
427        }
428
429        if self.config.manage_own_order_books {
430            let mut own_book = self.get_or_init_own_order_book(&cmd.instrument_id);
431            for order in &cmd.order_list.orders {
432                if should_handle_own_book_order(order) {
433                    own_book.add(order.to_own_book_order());
434                }
435            }
436        }
437
438        // Send to execution client
439        if let Err(e) = client.submit_order_list(cmd) {
440            log::error!("Error submitting order list to client: {e}");
441            for order in &orders {
442                self.deny_order(
443                    order,
444                    &format!("failed-to-submit-order-list-to-client: {e}"),
445                );
446            }
447        }
448    }
449
450    fn handle_modify_order(&self, client: Rc<dyn ExecutionClient>, cmd: &ModifyOrder) {
451        if let Err(e) = client.modify_order(cmd) {
452            log::error!("Error modifying order: {e}");
453        }
454    }
455
456    fn handle_cancel_order(&self, client: Rc<dyn ExecutionClient>, cmd: &CancelOrder) {
457        if let Err(e) = client.cancel_order(cmd) {
458            log::error!("Error canceling order: {e}");
459        }
460    }
461
462    fn handle_cancel_all_orders(&self, client: Rc<dyn ExecutionClient>, cmd: &CancelAllOrders) {
463        if let Err(e) = client.cancel_all_orders(cmd) {
464            log::error!("Error canceling all orders: {e}");
465        }
466    }
467
468    fn handle_batch_cancel_orders(&self, client: Rc<dyn ExecutionClient>, cmd: &BatchCancelOrders) {
469        if let Err(e) = client.batch_cancel_orders(cmd) {
470            log::error!("Error batch canceling orders: {e}");
471        }
472    }
473
474    fn handle_query_order(&self, client: Rc<dyn ExecutionClient>, cmd: &QueryOrder) {
475        if let Err(e) = client.query_order(cmd) {
476            log::error!("Error querying order: {e}");
477        }
478    }
479
480    fn create_order_state_snapshot(&self, order: &OrderAny) {
481        if self.config.debug {
482            log::debug!("Creating order state snapshot for {order}");
483        }
484
485        if self.cache.borrow().has_backing() {
486            if let Err(e) = self.cache.borrow().snapshot_order_state(order) {
487                log::error!("Failed to snapshot order state: {e}");
488                return;
489            }
490        }
491
492        if get_message_bus().borrow().has_backing {
493            let topic = switchboard::get_order_snapshots_topic(order.client_order_id());
494            msgbus::publish(topic, order);
495        }
496    }
497
498    fn create_position_state_snapshot(&self, position: &Position) {
499        if self.config.debug {
500            log::debug!("Creating position state snapshot for {position}");
501        }
502
503        // let mut position: Position = position.clone();
504        // if let Some(pnl) = self.cache.borrow().calculate_unrealized_pnl(&position) {
505        //     position.unrealized_pnl(last)
506        // }
507
508        let topic = switchboard::get_positions_snapshots_topic(position.id);
509        msgbus::publish(topic, position);
510    }
511
512    // -- EVENT HANDLERS --------------------------------------------------------------------------
513
514    fn handle_event(&mut self, event: &OrderEventAny) {
515        if self.config.debug {
516            log::debug!("{RECV}{EVT} {event:?}");
517        }
518
519        let client_order_id = event.client_order_id();
520        let cache = self.cache.borrow();
521        let mut order = if let Some(order) = cache.order(&client_order_id) {
522            order.clone()
523        } else {
524            log::warn!(
525                "Order with {} not found in the cache to apply {}",
526                event.client_order_id(),
527                event
528            );
529
530            // Try to find order by venue order ID if available
531            let venue_order_id = if let Some(id) = event.venue_order_id() {
532                id
533            } else {
534                log::error!(
535                    "Cannot apply event to any order: {} not found in the cache with no VenueOrderId",
536                    event.client_order_id()
537                );
538                return;
539            };
540
541            // Look up client order ID from venue order ID
542            let client_order_id = if let Some(id) = cache.client_order_id(&venue_order_id) {
543                id
544            } else {
545                log::error!(
546                    "Cannot apply event to any order: {} and {venue_order_id} not found in the cache",
547                    event.client_order_id(),
548                );
549                return;
550            };
551
552            // Get order using found client order ID
553            if let Some(order) = cache.order(client_order_id) {
554                log::info!("Order with {client_order_id} was found in the cache");
555                order.clone()
556            } else {
557                log::error!(
558                    "Cannot apply event to any order: {client_order_id} and {venue_order_id} not found in cache",
559                );
560                return;
561            }
562        };
563
564        drop(cache);
565        match event {
566            OrderEventAny::Filled(fill) => {
567                let oms_type = self.determine_oms_type(fill);
568                let position_id = self.determine_position_id(*fill, oms_type);
569
570                // Create a new fill with the determined position ID
571                let mut fill = *fill;
572                if fill.position_id.is_none() {
573                    fill.position_id = Some(position_id);
574                }
575
576                self.apply_event_to_order(&mut order, OrderEventAny::Filled(fill));
577                self.handle_order_fill(&order, fill, oms_type);
578            }
579            _ => {
580                self.apply_event_to_order(&mut order, event.clone());
581            }
582        }
583    }
584
585    fn determine_oms_type(&self, fill: &OrderFilled) -> OmsType {
586        // Check for strategy OMS override
587        if let Some(oms_type) = self.oms_overrides.get(&fill.strategy_id) {
588            return *oms_type;
589        }
590
591        // Use native venue OMS
592        if let Some(client_id) = self.routing_map.get(&fill.instrument_id.venue) {
593            if let Some(client) = self.clients.get(client_id) {
594                return client.oms_type();
595            }
596        }
597
598        if let Some(client) = &self.default_client {
599            return client.oms_type();
600        }
601
602        OmsType::Netting // Default fallback
603    }
604
605    fn determine_position_id(&mut self, fill: OrderFilled, oms_type: OmsType) -> PositionId {
606        match oms_type {
607            OmsType::Hedging => self.determine_hedging_position_id(fill),
608            OmsType::Netting => self.determine_netting_position_id(fill),
609            _ => self.determine_netting_position_id(fill), // Default to netting
610        }
611    }
612
613    fn determine_hedging_position_id(&mut self, fill: OrderFilled) -> PositionId {
614        // Check if position ID already exists
615        if let Some(position_id) = fill.position_id {
616            if self.config.debug {
617                log::debug!("Already had a position ID of: {position_id}");
618            }
619            return position_id;
620        }
621
622        // Check for order
623        let cache = self.cache.borrow();
624        let order = match cache.order(&fill.client_order_id()) {
625            Some(o) => o,
626            None => {
627                panic!(
628                    "Order for {} not found to determine position ID",
629                    fill.client_order_id()
630                );
631            }
632        };
633
634        // Check execution spawn orders
635        if let Some(spawn_id) = order.exec_spawn_id() {
636            let spawn_orders = cache.orders_for_exec_spawn(&spawn_id);
637            for spawned_order in spawn_orders {
638                if let Some(pos_id) = spawned_order.position_id() {
639                    if self.config.debug {
640                        log::debug!("Found spawned {} for {}", pos_id, fill.client_order_id());
641                    }
642                    return pos_id;
643                }
644            }
645        }
646
647        // Generate new position ID
648        let position_id = self.pos_id_generator.generate(fill.strategy_id, false);
649        if self.config.debug {
650            log::debug!("Generated {} for {}", position_id, fill.client_order_id());
651        }
652        position_id
653    }
654
655    fn determine_netting_position_id(&self, fill: OrderFilled) -> PositionId {
656        PositionId::new(format!("{}-{}", fill.instrument_id, fill.strategy_id))
657    }
658
659    fn apply_event_to_order(&self, order: &mut OrderAny, event: OrderEventAny) {
660        if let Err(e) = order.apply(event.clone()) {
661            match e {
662                OrderError::InvalidStateTransition => {
663                    log::warn!("InvalidStateTrigger: {e}, did not apply {event}");
664                }
665                _ => {
666                    log::error!("Error applying event: {e}, did not apply {event}");
667                }
668            }
669            return;
670        }
671
672        if let Err(e) = self.cache.borrow_mut().update_order(order) {
673            log::error!("Error updating order in cache: {e}");
674        }
675
676        let topic = switchboard::get_event_orders_topic(event.strategy_id());
677        msgbus::publish(topic, order);
678
679        if self.config.snapshot_orders {
680            self.create_order_state_snapshot(order);
681        }
682    }
683
684    fn handle_order_fill(&mut self, order: &OrderAny, fill: OrderFilled, oms_type: OmsType) {
685        let instrument =
686            if let Some(instrument) = self.cache.borrow().instrument(&fill.instrument_id) {
687                instrument.clone()
688            } else {
689                log::error!(
690                    "Cannot handle order fill: no instrument found for {}, {fill}",
691                    fill.instrument_id,
692                );
693                return;
694            };
695
696        if self.cache.borrow().account(&fill.account_id).is_none() {
697            log::error!(
698                "Cannot handle order fill: no account found for {}, {fill}",
699                fill.instrument_id.venue,
700            );
701            return;
702        }
703
704        let position_id = if let Some(position_id) = fill.position_id {
705            position_id
706        } else {
707            log::error!("Cannot handle order fill: no position ID found for fill {fill}");
708            return;
709        };
710
711        let mut position = match self.cache.borrow().position(&position_id) {
712            Some(pos) if !pos.is_closed() => pos.clone(),
713            _ => self
714                .open_position(instrument.clone(), None, fill, oms_type)
715                .unwrap(),
716        };
717
718        if self.will_flip_position(&position, fill) {
719            self.flip_position(instrument, &mut position, fill, oms_type);
720        } else {
721            self.update_position(&mut position, fill);
722        }
723
724        if matches!(order.contingency_type(), Some(ContingencyType::Oto)) && position.is_open() {
725            for client_order_id in order.linked_order_ids().unwrap_or_default() {
726                let mut cache = self.cache.borrow_mut();
727                let contingent_order = cache.mut_order(client_order_id);
728                if let Some(contingent_order) = contingent_order {
729                    if contingent_order.position_id().is_none() {
730                        contingent_order.set_position_id(Some(position_id));
731
732                        if let Err(e) = self.cache.borrow_mut().add_position_id(
733                            &position_id,
734                            &contingent_order.instrument_id().venue,
735                            &contingent_order.client_order_id(),
736                            &contingent_order.strategy_id(),
737                        ) {
738                            log::error!("Failed to add position ID: {e}");
739                        }
740                    }
741                }
742            }
743        }
744    }
745
746    fn open_position(
747        &self,
748        instrument: InstrumentAny,
749        position: Option<&Position>,
750        fill: OrderFilled,
751        oms_type: OmsType,
752    ) -> anyhow::Result<Position> {
753        let position = if let Some(position) = position {
754            // Always snapshot opening positions to handle NETTING OMS
755            self.cache.borrow_mut().snapshot_position(position)?;
756            let mut position = position.clone();
757            position.apply(&fill);
758            self.cache.borrow_mut().update_position(&position)?;
759            position
760        } else {
761            let position = Position::new(&instrument, fill);
762            self.cache
763                .borrow_mut()
764                .add_position(position.clone(), oms_type)?;
765            if self.config.snapshot_positions {
766                self.create_position_state_snapshot(&position);
767            }
768            position
769        };
770
771        let ts_init = self.clock.borrow().timestamp_ns();
772        let event = PositionOpened::create(&position, &fill, UUID4::new(), ts_init);
773        let topic = switchboard::get_event_positions_topic(event.strategy_id);
774        msgbus::publish(topic, &event);
775
776        Ok(position)
777    }
778
779    fn update_position(&self, position: &mut Position, fill: OrderFilled) {
780        position.apply(&fill);
781
782        if let Err(e) = self.cache.borrow_mut().update_position(position) {
783            log::error!("Failed to update position: {e:?}");
784            return;
785        }
786
787        if self.config.snapshot_positions {
788            self.create_position_state_snapshot(position);
789        }
790
791        let topic = switchboard::get_event_positions_topic(position.strategy_id);
792        let ts_init = self.clock.borrow().timestamp_ns();
793
794        if position.is_closed() {
795            let event = PositionClosed::create(position, &fill, UUID4::new(), ts_init);
796            msgbus::publish(topic, &event);
797        } else {
798            let event = PositionChanged::create(position, &fill, UUID4::new(), ts_init);
799            msgbus::publish(topic, &event);
800        }
801    }
802
803    fn will_flip_position(&self, position: &Position, fill: OrderFilled) -> bool {
804        position.is_opposite_side(fill.order_side) && (fill.last_qty.raw > position.quantity.raw)
805    }
806
807    fn flip_position(
808        &mut self,
809        instrument: InstrumentAny,
810        position: &mut Position,
811        fill: OrderFilled,
812        oms_type: OmsType,
813    ) {
814        let difference = match position.side {
815            PositionSide::Long => Quantity::from_raw(
816                fill.last_qty.raw - position.quantity.raw,
817                position.size_precision,
818            ),
819            PositionSide::Short => Quantity::from_raw(
820                position.quantity.raw - fill.last_qty.raw,
821                position.size_precision,
822            ),
823            _ => fill.last_qty,
824        };
825
826        // Split commission between two positions
827        let fill_percent = position.quantity.as_f64() / fill.last_qty.as_f64();
828        let (commission1, commission2) = if let Some(commission) = fill.commission {
829            let commission_currency = commission.currency;
830            let commission1 = Money::new(commission * fill_percent, commission_currency);
831            let commission2 = commission - commission1;
832            (Some(commission1), Some(commission2))
833        } else {
834            log::error!("Commission is not available.");
835            (None, None)
836        };
837
838        let mut fill_split1: Option<OrderFilled> = None;
839        if position.is_open() {
840            fill_split1 = Some(OrderFilled::new(
841                fill.trader_id,
842                fill.strategy_id,
843                fill.instrument_id,
844                fill.client_order_id,
845                fill.venue_order_id,
846                fill.account_id,
847                fill.trade_id,
848                fill.order_side,
849                fill.order_type,
850                position.quantity,
851                fill.last_px,
852                fill.currency,
853                fill.liquidity_side,
854                UUID4::new(),
855                fill.ts_event,
856                fill.ts_init,
857                fill.reconciliation,
858                fill.position_id,
859                commission1,
860            ));
861
862            self.update_position(position, fill_split1.unwrap());
863        }
864
865        // Guard against flipping a position with a zero fill size
866        if difference.raw == 0 {
867            log::warn!(
868                "Zero fill size during position flip calculation, this could be caused by a mismatch between instrument `size_precision` and a quantity `size_precision`"
869            );
870            return;
871        }
872
873        let position_id_flip = if oms_type == OmsType::Hedging {
874            if let Some(position_id) = fill.position_id {
875                if position_id.is_virtual() {
876                    // Generate new position ID for flipped virtual position
877                    Some(self.pos_id_generator.generate(fill.strategy_id, true))
878                } else {
879                    Some(position_id)
880                }
881            } else {
882                None
883            }
884        } else {
885            fill.position_id
886        };
887
888        let fill_split2 = OrderFilled::new(
889            fill.trader_id,
890            fill.strategy_id,
891            fill.instrument_id,
892            fill.client_order_id,
893            fill.venue_order_id,
894            fill.account_id,
895            fill.trade_id,
896            fill.order_side,
897            fill.order_type,
898            difference,
899            fill.last_px,
900            fill.currency,
901            fill.liquidity_side,
902            UUID4::new(),
903            fill.ts_event,
904            fill.ts_init,
905            fill.reconciliation,
906            position_id_flip,
907            commission2,
908        );
909
910        if oms_type == OmsType::Hedging {
911            if let Some(position_id) = fill.position_id {
912                if position_id.is_virtual() {
913                    log::warn!("Closing position {fill_split1:?}");
914                    log::warn!("Flipping position {fill_split2:?}");
915                }
916            }
917        }
918
919        // Open flipped position
920        if let Err(e) = self.open_position(instrument, None, fill_split2, oms_type) {
921            log::error!("Failed to open flipped position: {e:?}");
922        }
923    }
924
925    // -- INTERNAL --------------------------------------------------------------------------------
926
927    fn set_position_id_counts(&mut self) {
928        // For the internal position ID generator
929        let cache = self.cache.borrow();
930        let positions = cache.positions(None, None, None, None);
931
932        // Count positions per instrument_id using a HashMap
933        let mut counts: HashMap<StrategyId, usize> = HashMap::new();
934
935        for position in positions {
936            *counts.entry(position.strategy_id).or_insert(0) += 1;
937        }
938
939        self.pos_id_generator.reset();
940
941        for (strategy_id, count) in counts {
942            self.pos_id_generator.set_count(count, strategy_id);
943            log::info!("Set PositionId count for {strategy_id} to {count}");
944        }
945    }
946
947    fn last_px_for_conversion(
948        &self,
949        instrument_id: &InstrumentId,
950        side: OrderSide,
951    ) -> Option<Price> {
952        let cache = self.cache.borrow();
953
954        // Try to get last trade price
955        if let Some(trade) = cache.trade(instrument_id) {
956            return Some(trade.price);
957        }
958
959        // Fall back to quote if available
960        if let Some(quote) = cache.quote(instrument_id) {
961            match side {
962                OrderSide::Buy => Some(quote.ask_price),
963                OrderSide::Sell => Some(quote.bid_price),
964                OrderSide::NoOrderSide => None,
965            }
966        } else {
967            None
968        }
969    }
970
971    fn set_order_base_qty(&self, order: &mut OrderAny, base_qty: Quantity) {
972        log::info!(
973            "Setting {} order quote quantity {} to base quantity {}",
974            order.instrument_id(),
975            order.quantity(),
976            base_qty
977        );
978
979        let original_qty = order.quantity();
980        order.set_quantity(base_qty);
981        order.set_leaves_qty(base_qty);
982        order.set_is_quote_quantity(false);
983
984        if matches!(order.contingency_type(), Some(ContingencyType::Oto)) {
985            return;
986        }
987
988        if let Some(linked_order_ids) = order.linked_order_ids() {
989            for client_order_id in linked_order_ids {
990                match self.cache.borrow_mut().mut_order(client_order_id) {
991                    Some(contingent_order) => {
992                        if !contingent_order.is_quote_quantity() {
993                            continue; // Already base quantity
994                        }
995
996                        if contingent_order.quantity() != original_qty {
997                            log::warn!(
998                                "Contingent order quantity {} was not equal to the OTO parent original quantity {} when setting to base quantity of {}",
999                                contingent_order.quantity(),
1000                                original_qty,
1001                                base_qty
1002                            );
1003                        }
1004
1005                        log::info!(
1006                            "Setting {} order quote quantity {} to base quantity {}",
1007                            contingent_order.instrument_id(),
1008                            contingent_order.quantity(),
1009                            base_qty
1010                        );
1011
1012                        contingent_order.set_quantity(base_qty);
1013                        contingent_order.set_leaves_qty(base_qty);
1014                        contingent_order.set_is_quote_quantity(false);
1015                    }
1016                    None => {
1017                        log::error!("Contingency order {client_order_id} not found");
1018                    }
1019                }
1020            }
1021        } else {
1022            log::warn!(
1023                "No linked order IDs found for order {}",
1024                order.client_order_id()
1025            );
1026        }
1027    }
1028
1029    fn deny_order(&self, order: &OrderAny, reason: &str) {
1030        log::error!(
1031            "Order denied: {reason}, order ID: {}",
1032            order.client_order_id()
1033        );
1034
1035        let denied = OrderDenied::new(
1036            order.trader_id(),
1037            order.strategy_id(),
1038            order.instrument_id(),
1039            order.client_order_id(),
1040            reason.into(),
1041            UUID4::new(),
1042            self.clock.borrow().timestamp_ns(),
1043            self.clock.borrow().timestamp_ns(),
1044        );
1045
1046        let mut order = order.clone();
1047
1048        if let Err(e) = order.apply(OrderEventAny::Denied(denied)) {
1049            log::error!("Failed to apply denied event to order: {e}");
1050            return;
1051        }
1052
1053        if let Err(e) = self.cache.borrow_mut().update_order(&order) {
1054            log::error!("Failed to update order in cache: {e}");
1055            return;
1056        }
1057
1058        let topic = switchboard::get_event_orders_topic(order.strategy_id());
1059        msgbus::publish(topic, &denied);
1060
1061        if self.config.snapshot_orders {
1062            self.create_order_state_snapshot(&order);
1063        }
1064    }
1065
1066    fn get_or_init_own_order_book(&self, instrument_id: &InstrumentId) -> RefMut<OwnOrderBook> {
1067        let mut cache = self.cache.borrow_mut();
1068        if cache.own_order_book_mut(instrument_id).is_none() {
1069            let own_book = OwnOrderBook::new(*instrument_id);
1070            cache.add_own_order_book(own_book).unwrap();
1071        }
1072
1073        RefMut::map(cache, |c| c.own_order_book_mut(instrument_id).unwrap())
1074    }
1075}
1076
1077////////////////////////////////////////////////////////////////////////////////
1078// Tests
1079////////////////////////////////////////////////////////////////////////////////
#[cfg(test)]
mod tests {
    use std::{cell::RefCell, rc::Rc};

    use nautilus_common::{cache::Cache, clock::TestClock, msgbus::MessageBus};
    use rstest::fixture;

    use super::*;

    /// Fixture providing a message bus built from its `Default` implementation.
    #[fixture]
    fn msgbus() -> MessageBus {
        Default::default()
    }

    /// Fixture providing a cache constructed with both optional arguments left as `None`.
    #[fixture]
    fn simple_cache() -> Cache {
        let cache = Cache::new(None, None);
        cache
    }

    /// Fixture providing a new test clock.
    #[fixture]
    fn clock() -> TestClock {
        let test_clock = TestClock::new();
        test_clock
    }

    // Helpers

    /// Builds an `ExecutionEngine` from the shared cache and clock, with an optional config.
    fn _get_exec_engine(
        cache: Rc<RefCell<Cache>>,
        clock: Rc<RefCell<TestClock>>,
        config: Option<ExecutionEngineConfig>,
    ) -> ExecutionEngine {
        let engine = ExecutionEngine::new(clock, cache, config);
        engine
    }

    // TODO: After Implementing ExecutionClient & Strategy
}