nautilus_execution/engine/
mod.rs

1// -------------------------------------------------------------------------------------------------
2//  Copyright (C) 2015-2025 Posei Systems Pty Ltd. All rights reserved.
3//  https://poseitrader.io
4//
5//  Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
6//  You may not use this file except in compliance with the License.
7//  You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
8//
9//  Unless required by applicable law or agreed to in writing, software
10//  distributed under the License is distributed on an "AS IS" BASIS,
11//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12//  See the License for the specific language governing permissions and
13//  limitations under the License.
14// -------------------------------------------------------------------------------------------------
15
16//! Provides a generic `ExecutionEngine` for all environments.
17//!
18//! The execution engines primary responsibility is to orchestrate interactions
19//! between the `ExecutionClient` instances, and the rest of the platform. This
20//! includes sending commands to, and receiving events from, the trading venue
21//! endpoints via its registered execution clients.
22
23pub mod config;
24
25use std::{
26    cell::{RefCell, RefMut},
27    collections::{HashMap, HashSet},
28    fmt::Debug,
29    rc::Rc,
30    time::SystemTime,
31};
32
33use config::ExecutionEngineConfig;
34use nautilus_common::{
35    cache::Cache,
36    clock::Clock,
37    generators::position_id::PositionIdGenerator,
38    logging::{CMD, EVT, RECV},
39    messages::execution::{
40        BatchCancelOrders, CancelAllOrders, CancelOrder, ModifyOrder, QueryOrder, SubmitOrder,
41        SubmitOrderList, TradingCommand,
42    },
43    msgbus::{
44        self, get_message_bus,
45        switchboard::{self},
46    },
47};
48use nautilus_core::UUID4;
49use nautilus_model::{
50    enums::{ContingencyType, OmsType, OrderSide, PositionSide},
51    events::{
52        OrderDenied, OrderEvent, OrderEventAny, OrderFilled, PositionChanged, PositionClosed,
53        PositionOpened,
54    },
55    identifiers::{ClientId, InstrumentId, PositionId, StrategyId, Venue},
56    instruments::{Instrument, InstrumentAny},
57    orderbook::own::{OwnOrderBook, should_handle_own_book_order},
58    orders::{Order, OrderAny, OrderError},
59    position::Position,
60    types::{Money, Price, Quantity},
61};
62
63use crate::client::ExecutionClient;
64
65/// Central execution engine responsible for orchestrating order routing and execution.
66///
67/// The execution engine manages the entire order lifecycle from submission to completion,
68/// handling routing to appropriate execution clients, position management, and event
69/// processing. It supports multiple execution venues through registered clients and
70/// provides sophisticated order management capabilities.
71pub struct ExecutionEngine {
72    clock: Rc<RefCell<dyn Clock>>,
73    cache: Rc<RefCell<Cache>>,
74    clients: HashMap<ClientId, Rc<dyn ExecutionClient>>,
75    default_client: Option<Rc<dyn ExecutionClient>>,
76    routing_map: HashMap<Venue, ClientId>,
77    oms_overrides: HashMap<StrategyId, OmsType>,
78    external_order_claims: HashMap<InstrumentId, StrategyId>,
79    pos_id_generator: PositionIdGenerator,
80    config: ExecutionEngineConfig,
81}
82
83impl Debug for ExecutionEngine {
84    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
85        f.debug_struct(stringify!(ExecutionEngine))
86            .field("client_count", &self.clients.len())
87            .finish()
88    }
89}
90
91impl ExecutionEngine {
92    /// Creates a new [`ExecutionEngine`] instance.
93    pub fn new(
94        clock: Rc<RefCell<dyn Clock>>,
95        cache: Rc<RefCell<Cache>>,
96        config: Option<ExecutionEngineConfig>,
97    ) -> Self {
98        let trader_id = get_message_bus().borrow().trader_id;
99        Self {
100            clock: clock.clone(),
101            cache,
102            clients: HashMap::new(),
103            default_client: None,
104            routing_map: HashMap::new(),
105            oms_overrides: HashMap::new(),
106            external_order_claims: HashMap::new(),
107            pos_id_generator: PositionIdGenerator::new(trader_id, clock),
108            config: config.unwrap_or_default(),
109        }
110    }
111
112    #[must_use]
113    /// Returns the position ID count for the specified strategy.
114    pub fn position_id_count(&self, strategy_id: StrategyId) -> usize {
115        self.pos_id_generator.count(strategy_id)
116    }
117
118    #[must_use]
119    /// Checks the integrity of cached execution data.
120    pub fn check_integrity(&self) -> bool {
121        self.cache.borrow_mut().check_integrity()
122    }
123
124    #[must_use]
125    /// Returns true if all registered execution clients are connected.
126    pub fn check_connected(&self) -> bool {
127        self.clients.values().all(|c| c.is_connected())
128    }
129
130    #[must_use]
131    /// Returns true if all registered execution clients are disconnected.
132    pub fn check_disconnected(&self) -> bool {
133        self.clients.values().all(|c| !c.is_connected())
134    }
135
136    #[must_use]
137    /// Checks for residual positions and orders in the cache.
138    pub fn check_residuals(&self) -> bool {
139        self.cache.borrow().check_residuals()
140    }
141
142    #[must_use]
143    /// Returns the set of instruments that have external order claims.
144    pub fn get_external_order_claims_instruments(&self) -> HashSet<InstrumentId> {
145        self.external_order_claims.keys().copied().collect()
146    }
147
148    // -- REGISTRATION ----------------------------------------------------------------------------
149
150    /// Registers a new execution client.
151    ///
152    /// # Errors
153    ///
154    /// Returns an error if a client with the same ID is already registered.
155    pub fn register_client(&mut self, client: Rc<dyn ExecutionClient>) -> anyhow::Result<()> {
156        if self.clients.contains_key(&client.client_id()) {
157            anyhow::bail!("Client already registered with ID {}", client.client_id());
158        }
159
160        // If client has venue, register routing
161        self.routing_map.insert(client.venue(), client.client_id());
162
163        log::info!("Registered client {}", client.client_id());
164        self.clients.insert(client.client_id(), client);
165        Ok(())
166    }
167
168    /// Registers a default execution client for fallback routing.
169    pub fn register_default_client(&mut self, client: Rc<dyn ExecutionClient>) {
170        log::info!("Registered default client {}", client.client_id());
171        self.default_client = Some(client);
172    }
173
174    #[must_use]
175    /// Returns the execution client registered with the given ID.
176    pub fn get_client(&self, client_id: &ClientId) -> Option<Rc<dyn ExecutionClient>> {
177        self.clients.get(client_id).cloned()
178    }
179
180    /// Sets routing for a specific venue to a given client ID.
181    ///
182    /// # Errors
183    ///
184    /// Returns an error if the client ID is not registered.
185    pub fn register_venue_routing(
186        &mut self,
187        client_id: ClientId,
188        venue: Venue,
189    ) -> anyhow::Result<()> {
190        if !self.clients.contains_key(&client_id) {
191            anyhow::bail!("No client registered with ID {client_id}");
192        }
193
194        self.routing_map.insert(venue, client_id);
195        log::info!("Set client {client_id} routing for {venue}");
196        Ok(())
197    }
198
199    // TODO: Implement `Strategy`
200    // pub fn register_external_order_claims(&mut self, strategy: Strategy) -> anyhow::Result<()> {
201    //     todo!();
202    // }
203
204    /// # Errors
205    ///
206    /// Returns an error if no client is registered with the given ID.
207    pub fn deregister_client(&mut self, client_id: ClientId) -> anyhow::Result<()> {
208        if self.clients.remove(&client_id).is_some() {
209            // Remove from routing map if present
210            self.routing_map
211                .retain(|_, mapped_id| mapped_id != &client_id);
212            log::info!("Deregistered client {client_id}");
213            Ok(())
214        } else {
215            anyhow::bail!("No client registered with ID {client_id}")
216        }
217    }
218
219    // -- COMMANDS --------------------------------------------------------------------------------
220
221    #[allow(clippy::await_holding_refcell_ref)]
222    /// Loads persistent state into cache and rebuilds indices.
223    ///
224    /// # Errors
225    ///
226    /// Returns an error if any cache operation fails.
227    pub async fn load_cache(&mut self) -> anyhow::Result<()> {
228        let ts = SystemTime::now();
229
230        {
231            let mut cache = self.cache.borrow_mut();
232            cache.clear_index();
233            cache.cache_general()?;
234            self.cache.borrow_mut().cache_all().await?;
235            cache.build_index();
236            let _ = cache.check_integrity();
237
238            if self.config.manage_own_order_books {
239                for order in cache.orders(None, None, None, None) {
240                    if order.is_closed() || !should_handle_own_book_order(order) {
241                        continue;
242                    }
243                    let mut own_book = self.get_or_init_own_order_book(&order.instrument_id());
244                    own_book.add(order.to_own_book_order());
245                }
246            }
247        }
248
249        self.set_position_id_counts();
250
251        log::info!(
252            "Loaded cache in {}ms",
253            SystemTime::now()
254                .duration_since(ts)
255                .map_err(|e| anyhow::anyhow!("Failed to calculate duration: {e}"))?
256                .as_millis()
257        );
258
259        Ok(())
260    }
261
262    /// Flushes the database to persist all cached data.
263    pub fn flush_db(&self) {
264        self.cache.borrow_mut().flush_db();
265    }
266
267    /// Processes an order event, updating internal state and routing as needed.
268    pub fn process(&mut self, event: &OrderEventAny) {
269        self.handle_event(event);
270    }
271
272    /// Executes a trading command by routing it to the appropriate execution client.
273    pub fn execute(&self, command: &TradingCommand) {
274        self.execute_command(command);
275    }
276
277    // -- COMMAND HANDLERS ------------------------------------------------------------------------
278
279    fn execute_command(&self, command: &TradingCommand) {
280        if self.config.debug {
281            log::debug!("{RECV}{CMD} {command:?}");
282        }
283
284        let client: Rc<dyn ExecutionClient> = if let Some(client) = self
285            .clients
286            .get(&command.client_id())
287            .or_else(|| {
288                self.routing_map
289                    .get(&command.instrument_id().venue)
290                    .and_then(|client_id| self.clients.get(client_id))
291            })
292            .or(self.default_client.as_ref())
293        {
294            client.clone()
295        } else {
296            log::error!(
297                "No execution client found for command: client_id={:?}, venue={}, command={command:?}",
298                command.client_id(),
299                command.instrument_id().venue,
300            );
301            return;
302        };
303
304        match command {
305            TradingCommand::SubmitOrder(cmd) => self.handle_submit_order(client, cmd),
306            TradingCommand::SubmitOrderList(cmd) => self.handle_submit_order_list(client, cmd),
307            TradingCommand::ModifyOrder(cmd) => self.handle_modify_order(client, cmd),
308            TradingCommand::CancelOrder(cmd) => self.handle_cancel_order(client, cmd),
309            TradingCommand::CancelAllOrders(cmd) => self.handle_cancel_all_orders(client, cmd),
310            TradingCommand::BatchCancelOrders(cmd) => self.handle_batch_cancel_orders(client, cmd),
311            TradingCommand::QueryOrder(cmd) => self.handle_query_order(client, cmd),
312        }
313    }
314
315    fn handle_submit_order(&self, client: Rc<dyn ExecutionClient>, cmd: &SubmitOrder) {
316        let mut order = cmd.order.clone();
317        let client_order_id = order.client_order_id();
318        let instrument_id = order.instrument_id();
319
320        // Check if the order exists in the cache
321        if !self.cache.borrow().order_exists(&client_order_id) {
322            // Add order to cache in a separate scope to drop the mutable borrow
323            {
324                let mut cache = self.cache.borrow_mut();
325                if let Err(e) =
326                    cache.add_order(order.clone(), cmd.position_id, Some(cmd.client_id), true)
327                {
328                    log::error!("Error adding order to cache: {e}");
329                    return;
330                }
331            }
332
333            if self.config.snapshot_orders {
334                self.create_order_state_snapshot(&order);
335            }
336        }
337
338        // Get instrument in a separate scope to manage borrows
339        let instrument = {
340            let cache = self.cache.borrow();
341            if let Some(instrument) = cache.instrument(&instrument_id) {
342                instrument.clone()
343            } else {
344                log::error!(
345                    "Cannot handle submit order: no instrument found for {instrument_id}, {cmd}",
346                );
347                return;
348            }
349        };
350
351        // Handle quote quantity conversion
352        if !instrument.is_inverse() && order.is_quote_quantity() {
353            let last_px = self.last_px_for_conversion(&instrument_id, order.order_side());
354
355            if let Some(price) = last_px {
356                let base_qty = instrument.get_base_quantity(order.quantity(), price);
357                self.set_order_base_qty(&mut order, base_qty);
358            } else {
359                self.deny_order(
360                    &order,
361                    &format!("no-price-to-convert-quote-qty {instrument_id}"),
362                );
363                return;
364            }
365        }
366
367        if self.config.manage_own_order_books && should_handle_own_book_order(&order) {
368            let mut own_book = self.get_or_init_own_order_book(&order.instrument_id());
369            own_book.add(order.to_own_book_order());
370        }
371
372        // Send the order to the execution client
373        if let Err(e) = client.submit_order(cmd) {
374            log::error!("Error submitting order to client: {e}");
375            self.deny_order(
376                &cmd.order,
377                &format!("failed-to-submit-order-to-client: {e}"),
378            );
379        }
380    }
381
382    fn handle_submit_order_list(&self, client: Rc<dyn ExecutionClient>, cmd: &SubmitOrderList) {
383        let orders = cmd.order_list.orders.clone();
384
385        // Cache orders
386        let mut cache = self.cache.borrow_mut();
387        for order in &orders {
388            if !cache.order_exists(&order.client_order_id()) {
389                if let Err(e) =
390                    cache.add_order(order.clone(), cmd.position_id, Some(cmd.client_id), true)
391                {
392                    log::error!("Error adding order to cache: {e}");
393                    return;
394                }
395
396                if self.config.snapshot_orders {
397                    self.create_order_state_snapshot(order);
398                }
399            }
400        }
401        drop(cache);
402
403        // Get instrument from cache
404        let cache = self.cache.borrow();
405        let instrument = if let Some(instrument) = cache.instrument(&cmd.instrument_id) {
406            instrument
407        } else {
408            log::error!(
409                "Cannot handle submit order list: no instrument found for {}, {cmd}",
410                cmd.instrument_id,
411            );
412            return;
413        };
414
415        // Check if converting quote quantity
416        if !instrument.is_inverse() && cmd.order_list.orders[0].is_quote_quantity() {
417            let mut quote_qty = None;
418            let mut _last_px = None;
419
420            for order in &cmd.order_list.orders {
421                if !order.is_quote_quantity() {
422                    continue; // Base quantity already set
423                }
424
425                if Some(order.quantity()) != quote_qty {
426                    _last_px =
427                        self.last_px_for_conversion(&order.instrument_id(), order.order_side());
428                    quote_qty = Some(order.quantity());
429                }
430
431                // TODO: Pull order out of cache to modify
432                // if let Some(px) = last_px {
433                //     let base_qty = instrument.get_base_quantity(order.quantity(), px);
434                //     self.set_order_base_qty(order, base_qty);
435                // } else {
436                //     for order in &cmd.order_list.orders {
437                //         self.deny_order(
438                //             order,
439                //             &format!("no-price-to-convert-quote-qty {}", order.instrument_id()),
440                //         );
441                //     }
442                //     return; // Denied
443                // }
444            }
445        }
446
447        if self.config.manage_own_order_books {
448            let mut own_book = self.get_or_init_own_order_book(&cmd.instrument_id);
449            for order in &cmd.order_list.orders {
450                if should_handle_own_book_order(order) {
451                    own_book.add(order.to_own_book_order());
452                }
453            }
454        }
455
456        // Send to execution client
457        if let Err(e) = client.submit_order_list(cmd) {
458            log::error!("Error submitting order list to client: {e}");
459            for order in &orders {
460                self.deny_order(
461                    order,
462                    &format!("failed-to-submit-order-list-to-client: {e}"),
463                );
464            }
465        }
466    }
467
468    fn handle_modify_order(&self, client: Rc<dyn ExecutionClient>, cmd: &ModifyOrder) {
469        if let Err(e) = client.modify_order(cmd) {
470            log::error!("Error modifying order: {e}");
471        }
472    }
473
474    fn handle_cancel_order(&self, client: Rc<dyn ExecutionClient>, cmd: &CancelOrder) {
475        if let Err(e) = client.cancel_order(cmd) {
476            log::error!("Error canceling order: {e}");
477        }
478    }
479
480    fn handle_cancel_all_orders(&self, client: Rc<dyn ExecutionClient>, cmd: &CancelAllOrders) {
481        if let Err(e) = client.cancel_all_orders(cmd) {
482            log::error!("Error canceling all orders: {e}");
483        }
484    }
485
486    fn handle_batch_cancel_orders(&self, client: Rc<dyn ExecutionClient>, cmd: &BatchCancelOrders) {
487        if let Err(e) = client.batch_cancel_orders(cmd) {
488            log::error!("Error batch canceling orders: {e}");
489        }
490    }
491
492    fn handle_query_order(&self, client: Rc<dyn ExecutionClient>, cmd: &QueryOrder) {
493        if let Err(e) = client.query_order(cmd) {
494            log::error!("Error querying order: {e}");
495        }
496    }
497
498    fn create_order_state_snapshot(&self, order: &OrderAny) {
499        if self.config.debug {
500            log::debug!("Creating order state snapshot for {order}");
501        }
502
503        if self.cache.borrow().has_backing() {
504            if let Err(e) = self.cache.borrow().snapshot_order_state(order) {
505                log::error!("Failed to snapshot order state: {e}");
506                return;
507            }
508        }
509
510        if get_message_bus().borrow().has_backing {
511            let topic = switchboard::get_order_snapshots_topic(order.client_order_id());
512            msgbus::publish(topic, order);
513        }
514    }
515
516    fn create_position_state_snapshot(&self, position: &Position) {
517        if self.config.debug {
518            log::debug!("Creating position state snapshot for {position}");
519        }
520
521        // let mut position: Position = position.clone();
522        // if let Some(pnl) = self.cache.borrow().calculate_unrealized_pnl(&position) {
523        //     position.unrealized_pnl(last)
524        // }
525
526        let topic = switchboard::get_positions_snapshots_topic(position.id);
527        msgbus::publish(topic, position);
528    }
529
530    // -- EVENT HANDLERS --------------------------------------------------------------------------
531
532    fn handle_event(&mut self, event: &OrderEventAny) {
533        if self.config.debug {
534            log::debug!("{RECV}{EVT} {event:?}");
535        }
536
537        let client_order_id = event.client_order_id();
538        let cache = self.cache.borrow();
539        let mut order = if let Some(order) = cache.order(&client_order_id) {
540            order.clone()
541        } else {
542            log::warn!(
543                "Order with {} not found in the cache to apply {}",
544                event.client_order_id(),
545                event
546            );
547
548            // Try to find order by venue order ID if available
549            let venue_order_id = if let Some(id) = event.venue_order_id() {
550                id
551            } else {
552                log::error!(
553                    "Cannot apply event to any order: {} not found in the cache with no VenueOrderId",
554                    event.client_order_id()
555                );
556                return;
557            };
558
559            // Look up client order ID from venue order ID
560            let client_order_id = if let Some(id) = cache.client_order_id(&venue_order_id) {
561                id
562            } else {
563                log::error!(
564                    "Cannot apply event to any order: {} and {venue_order_id} not found in the cache",
565                    event.client_order_id(),
566                );
567                return;
568            };
569
570            // Get order using found client order ID
571            if let Some(order) = cache.order(client_order_id) {
572                log::info!("Order with {client_order_id} was found in the cache");
573                order.clone()
574            } else {
575                log::error!(
576                    "Cannot apply event to any order: {client_order_id} and {venue_order_id} not found in cache",
577                );
578                return;
579            }
580        };
581
582        drop(cache);
583        match event {
584            OrderEventAny::Filled(fill) => {
585                let oms_type = self.determine_oms_type(fill);
586                let position_id = self.determine_position_id(*fill, oms_type);
587
588                // Create a new fill with the determined position ID
589                let mut fill = *fill;
590                if fill.position_id.is_none() {
591                    fill.position_id = Some(position_id);
592                }
593
594                self.apply_event_to_order(&mut order, OrderEventAny::Filled(fill));
595                self.handle_order_fill(&order, fill, oms_type);
596            }
597            _ => {
598                self.apply_event_to_order(&mut order, event.clone());
599            }
600        }
601    }
602
603    fn determine_oms_type(&self, fill: &OrderFilled) -> OmsType {
604        // Check for strategy OMS override
605        if let Some(oms_type) = self.oms_overrides.get(&fill.strategy_id) {
606            return *oms_type;
607        }
608
609        // Use native venue OMS
610        if let Some(client_id) = self.routing_map.get(&fill.instrument_id.venue) {
611            if let Some(client) = self.clients.get(client_id) {
612                return client.oms_type();
613            }
614        }
615
616        if let Some(client) = &self.default_client {
617            return client.oms_type();
618        }
619
620        OmsType::Netting // Default fallback
621    }
622
623    fn determine_position_id(&mut self, fill: OrderFilled, oms_type: OmsType) -> PositionId {
624        match oms_type {
625            OmsType::Hedging => self.determine_hedging_position_id(fill),
626            OmsType::Netting => self.determine_netting_position_id(fill),
627            _ => self.determine_netting_position_id(fill), // Default to netting
628        }
629    }
630
631    fn determine_hedging_position_id(&mut self, fill: OrderFilled) -> PositionId {
632        // Check if position ID already exists
633        if let Some(position_id) = fill.position_id {
634            if self.config.debug {
635                log::debug!("Already had a position ID of: {position_id}");
636            }
637            return position_id;
638        }
639
640        // Check for order
641        let cache = self.cache.borrow();
642        let order = match cache.order(&fill.client_order_id()) {
643            Some(o) => o,
644            None => {
645                panic!(
646                    "Order for {} not found to determine position ID",
647                    fill.client_order_id()
648                );
649            }
650        };
651
652        // Check execution spawn orders
653        if let Some(spawn_id) = order.exec_spawn_id() {
654            let spawn_orders = cache.orders_for_exec_spawn(&spawn_id);
655            for spawned_order in spawn_orders {
656                if let Some(pos_id) = spawned_order.position_id() {
657                    if self.config.debug {
658                        log::debug!("Found spawned {} for {}", pos_id, fill.client_order_id());
659                    }
660                    return pos_id;
661                }
662            }
663        }
664
665        // Generate new position ID
666        let position_id = self.pos_id_generator.generate(fill.strategy_id, false);
667        if self.config.debug {
668            log::debug!("Generated {} for {}", position_id, fill.client_order_id());
669        }
670        position_id
671    }
672
673    fn determine_netting_position_id(&self, fill: OrderFilled) -> PositionId {
674        PositionId::new(format!("{}-{}", fill.instrument_id, fill.strategy_id))
675    }
676
677    fn apply_event_to_order(&self, order: &mut OrderAny, event: OrderEventAny) {
678        if let Err(e) = order.apply(event.clone()) {
679            match e {
680                OrderError::InvalidStateTransition => {
681                    log::warn!("InvalidStateTrigger: {e}, did not apply {event}");
682                }
683                _ => {
684                    log::error!("Error applying event: {e}, did not apply {event}");
685                }
686            }
687            return;
688        }
689
690        if let Err(e) = self.cache.borrow_mut().update_order(order) {
691            log::error!("Error updating order in cache: {e}");
692        }
693
694        let topic = switchboard::get_event_orders_topic(event.strategy_id());
695        msgbus::publish(topic, order);
696
697        if self.config.snapshot_orders {
698            self.create_order_state_snapshot(order);
699        }
700    }
701
702    fn handle_order_fill(&mut self, order: &OrderAny, fill: OrderFilled, oms_type: OmsType) {
703        let instrument =
704            if let Some(instrument) = self.cache.borrow().instrument(&fill.instrument_id) {
705                instrument.clone()
706            } else {
707                log::error!(
708                    "Cannot handle order fill: no instrument found for {}, {fill}",
709                    fill.instrument_id,
710                );
711                return;
712            };
713
714        if self.cache.borrow().account(&fill.account_id).is_none() {
715            log::error!(
716                "Cannot handle order fill: no account found for {}, {fill}",
717                fill.instrument_id.venue,
718            );
719            return;
720        }
721
722        let position_id = if let Some(position_id) = fill.position_id {
723            position_id
724        } else {
725            log::error!("Cannot handle order fill: no position ID found for fill {fill}");
726            return;
727        };
728
729        let mut position = match self.cache.borrow().position(&position_id) {
730            Some(pos) if !pos.is_closed() => pos.clone(),
731            _ => self
732                .open_position(instrument.clone(), None, fill, oms_type)
733                .unwrap(),
734        };
735
736        if self.will_flip_position(&position, fill) {
737            self.flip_position(instrument, &mut position, fill, oms_type);
738        } else {
739            self.update_position(&mut position, fill);
740        }
741
742        if matches!(order.contingency_type(), Some(ContingencyType::Oto)) && position.is_open() {
743            for client_order_id in order.linked_order_ids().unwrap_or_default() {
744                let mut cache = self.cache.borrow_mut();
745                let contingent_order = cache.mut_order(client_order_id);
746                if let Some(contingent_order) = contingent_order {
747                    if contingent_order.position_id().is_none() {
748                        contingent_order.set_position_id(Some(position_id));
749
750                        if let Err(e) = self.cache.borrow_mut().add_position_id(
751                            &position_id,
752                            &contingent_order.instrument_id().venue,
753                            &contingent_order.client_order_id(),
754                            &contingent_order.strategy_id(),
755                        ) {
756                            log::error!("Failed to add position ID: {e}");
757                        }
758                    }
759                }
760            }
761        }
762    }
763
764    fn open_position(
765        &self,
766        instrument: InstrumentAny,
767        position: Option<&Position>,
768        fill: OrderFilled,
769        oms_type: OmsType,
770    ) -> anyhow::Result<Position> {
771        let position = if let Some(position) = position {
772            // Always snapshot opening positions to handle NETTING OMS
773            self.cache.borrow_mut().snapshot_position(position)?;
774            let mut position = position.clone();
775            position.apply(&fill);
776            self.cache.borrow_mut().update_position(&position)?;
777            position
778        } else {
779            let position = Position::new(&instrument, fill);
780            self.cache
781                .borrow_mut()
782                .add_position(position.clone(), oms_type)?;
783            if self.config.snapshot_positions {
784                self.create_position_state_snapshot(&position);
785            }
786            position
787        };
788
789        let ts_init = self.clock.borrow().timestamp_ns();
790        let event = PositionOpened::create(&position, &fill, UUID4::new(), ts_init);
791        let topic = switchboard::get_event_positions_topic(event.strategy_id);
792        msgbus::publish(topic, &event);
793
794        Ok(position)
795    }
796
797    fn update_position(&self, position: &mut Position, fill: OrderFilled) {
798        position.apply(&fill);
799
800        if let Err(e) = self.cache.borrow_mut().update_position(position) {
801            log::error!("Failed to update position: {e:?}");
802            return;
803        }
804
805        if self.config.snapshot_positions {
806            self.create_position_state_snapshot(position);
807        }
808
809        let topic = switchboard::get_event_positions_topic(position.strategy_id);
810        let ts_init = self.clock.borrow().timestamp_ns();
811
812        if position.is_closed() {
813            let event = PositionClosed::create(position, &fill, UUID4::new(), ts_init);
814            msgbus::publish(topic, &event);
815        } else {
816            let event = PositionChanged::create(position, &fill, UUID4::new(), ts_init);
817            msgbus::publish(topic, &event);
818        }
819    }
820
821    fn will_flip_position(&self, position: &Position, fill: OrderFilled) -> bool {
822        position.is_opposite_side(fill.order_side) && (fill.last_qty.raw > position.quantity.raw)
823    }
824
825    fn flip_position(
826        &mut self,
827        instrument: InstrumentAny,
828        position: &mut Position,
829        fill: OrderFilled,
830        oms_type: OmsType,
831    ) {
832        let difference = match position.side {
833            PositionSide::Long => Quantity::from_raw(
834                fill.last_qty.raw - position.quantity.raw,
835                position.size_precision,
836            ),
837            PositionSide::Short => Quantity::from_raw(
838                position.quantity.raw - fill.last_qty.raw,
839                position.size_precision,
840            ),
841            _ => fill.last_qty,
842        };
843
844        // Split commission between two positions
845        let fill_percent = position.quantity.as_f64() / fill.last_qty.as_f64();
846        let (commission1, commission2) = if let Some(commission) = fill.commission {
847            let commission_currency = commission.currency;
848            let commission1 = Money::new(commission * fill_percent, commission_currency);
849            let commission2 = commission - commission1;
850            (Some(commission1), Some(commission2))
851        } else {
852            log::error!("Commission is not available.");
853            (None, None)
854        };
855
856        let mut fill_split1: Option<OrderFilled> = None;
857        if position.is_open() {
858            fill_split1 = Some(OrderFilled::new(
859                fill.trader_id,
860                fill.strategy_id,
861                fill.instrument_id,
862                fill.client_order_id,
863                fill.venue_order_id,
864                fill.account_id,
865                fill.trade_id,
866                fill.order_side,
867                fill.order_type,
868                position.quantity,
869                fill.last_px,
870                fill.currency,
871                fill.liquidity_side,
872                UUID4::new(),
873                fill.ts_event,
874                fill.ts_init,
875                fill.reconciliation,
876                fill.position_id,
877                commission1,
878            ));
879
880            self.update_position(position, fill_split1.unwrap());
881        }
882
883        // Guard against flipping a position with a zero fill size
884        if difference.raw == 0 {
885            log::warn!(
886                "Zero fill size during position flip calculation, this could be caused by a mismatch between instrument `size_precision` and a quantity `size_precision`"
887            );
888            return;
889        }
890
891        let position_id_flip = if oms_type == OmsType::Hedging {
892            if let Some(position_id) = fill.position_id {
893                if position_id.is_virtual() {
894                    // Generate new position ID for flipped virtual position
895                    Some(self.pos_id_generator.generate(fill.strategy_id, true))
896                } else {
897                    Some(position_id)
898                }
899            } else {
900                None
901            }
902        } else {
903            fill.position_id
904        };
905
906        let fill_split2 = OrderFilled::new(
907            fill.trader_id,
908            fill.strategy_id,
909            fill.instrument_id,
910            fill.client_order_id,
911            fill.venue_order_id,
912            fill.account_id,
913            fill.trade_id,
914            fill.order_side,
915            fill.order_type,
916            difference,
917            fill.last_px,
918            fill.currency,
919            fill.liquidity_side,
920            UUID4::new(),
921            fill.ts_event,
922            fill.ts_init,
923            fill.reconciliation,
924            position_id_flip,
925            commission2,
926        );
927
928        if oms_type == OmsType::Hedging {
929            if let Some(position_id) = fill.position_id {
930                if position_id.is_virtual() {
931                    log::warn!("Closing position {fill_split1:?}");
932                    log::warn!("Flipping position {fill_split2:?}");
933                }
934            }
935        }
936
937        // Open flipped position
938        if let Err(e) = self.open_position(instrument, None, fill_split2, oms_type) {
939            log::error!("Failed to open flipped position: {e:?}");
940        }
941    }
942
943    // -- INTERNAL --------------------------------------------------------------------------------
944
945    fn set_position_id_counts(&mut self) {
946        // For the internal position ID generator
947        let cache = self.cache.borrow();
948        let positions = cache.positions(None, None, None, None);
949
950        // Count positions per instrument_id using a HashMap
951        let mut counts: HashMap<StrategyId, usize> = HashMap::new();
952
953        for position in positions {
954            *counts.entry(position.strategy_id).or_insert(0) += 1;
955        }
956
957        self.pos_id_generator.reset();
958
959        for (strategy_id, count) in counts {
960            self.pos_id_generator.set_count(count, strategy_id);
961            log::info!("Set PositionId count for {strategy_id} to {count}");
962        }
963    }
964
965    fn last_px_for_conversion(
966        &self,
967        instrument_id: &InstrumentId,
968        side: OrderSide,
969    ) -> Option<Price> {
970        let cache = self.cache.borrow();
971
972        // Try to get last trade price
973        if let Some(trade) = cache.trade(instrument_id) {
974            return Some(trade.price);
975        }
976
977        // Fall back to quote if available
978        if let Some(quote) = cache.quote(instrument_id) {
979            match side {
980                OrderSide::Buy => Some(quote.ask_price),
981                OrderSide::Sell => Some(quote.bid_price),
982                OrderSide::NoOrderSide => None,
983            }
984        } else {
985            None
986        }
987    }
988
989    fn set_order_base_qty(&self, order: &mut OrderAny, base_qty: Quantity) {
990        log::info!(
991            "Setting {} order quote quantity {} to base quantity {}",
992            order.instrument_id(),
993            order.quantity(),
994            base_qty
995        );
996
997        let original_qty = order.quantity();
998        order.set_quantity(base_qty);
999        order.set_leaves_qty(base_qty);
1000        order.set_is_quote_quantity(false);
1001
1002        if matches!(order.contingency_type(), Some(ContingencyType::Oto)) {
1003            return;
1004        }
1005
1006        if let Some(linked_order_ids) = order.linked_order_ids() {
1007            for client_order_id in linked_order_ids {
1008                match self.cache.borrow_mut().mut_order(client_order_id) {
1009                    Some(contingent_order) => {
1010                        if !contingent_order.is_quote_quantity() {
1011                            continue; // Already base quantity
1012                        }
1013
1014                        if contingent_order.quantity() != original_qty {
1015                            log::warn!(
1016                                "Contingent order quantity {} was not equal to the OTO parent original quantity {} when setting to base quantity of {}",
1017                                contingent_order.quantity(),
1018                                original_qty,
1019                                base_qty
1020                            );
1021                        }
1022
1023                        log::info!(
1024                            "Setting {} order quote quantity {} to base quantity {}",
1025                            contingent_order.instrument_id(),
1026                            contingent_order.quantity(),
1027                            base_qty
1028                        );
1029
1030                        contingent_order.set_quantity(base_qty);
1031                        contingent_order.set_leaves_qty(base_qty);
1032                        contingent_order.set_is_quote_quantity(false);
1033                    }
1034                    None => {
1035                        log::error!("Contingency order {client_order_id} not found");
1036                    }
1037                }
1038            }
1039        } else {
1040            log::warn!(
1041                "No linked order IDs found for order {}",
1042                order.client_order_id()
1043            );
1044        }
1045    }
1046
1047    fn deny_order(&self, order: &OrderAny, reason: &str) {
1048        log::error!(
1049            "Order denied: {reason}, order ID: {}",
1050            order.client_order_id()
1051        );
1052
1053        let denied = OrderDenied::new(
1054            order.trader_id(),
1055            order.strategy_id(),
1056            order.instrument_id(),
1057            order.client_order_id(),
1058            reason.into(),
1059            UUID4::new(),
1060            self.clock.borrow().timestamp_ns(),
1061            self.clock.borrow().timestamp_ns(),
1062        );
1063
1064        let mut order = order.clone();
1065
1066        if let Err(e) = order.apply(OrderEventAny::Denied(denied)) {
1067            log::error!("Failed to apply denied event to order: {e}");
1068            return;
1069        }
1070
1071        if let Err(e) = self.cache.borrow_mut().update_order(&order) {
1072            log::error!("Failed to update order in cache: {e}");
1073            return;
1074        }
1075
1076        let topic = switchboard::get_event_orders_topic(order.strategy_id());
1077        msgbus::publish(topic, &denied);
1078
1079        if self.config.snapshot_orders {
1080            self.create_order_state_snapshot(&order);
1081        }
1082    }
1083
1084    fn get_or_init_own_order_book(&self, instrument_id: &InstrumentId) -> RefMut<'_, OwnOrderBook> {
1085        let mut cache = self.cache.borrow_mut();
1086        if cache.own_order_book_mut(instrument_id).is_none() {
1087            let own_book = OwnOrderBook::new(*instrument_id);
1088            cache.add_own_order_book(own_book).unwrap();
1089        }
1090
1091        RefMut::map(cache, |c| c.own_order_book_mut(instrument_id).unwrap())
1092    }
1093}
1094
1095////////////////////////////////////////////////////////////////////////////////
1096// Tests
1097////////////////////////////////////////////////////////////////////////////////
1098#[cfg(test)]
1099mod tests {
1100    use std::{cell::RefCell, rc::Rc};
1101
1102    use nautilus_common::{cache::Cache, clock::TestClock, msgbus::MessageBus};
1103    use rstest::fixture;
1104
1105    use super::*;
1106
1107    #[fixture]
1108    fn msgbus() -> MessageBus {
1109        MessageBus::default()
1110    }
1111
1112    #[fixture]
1113    fn simple_cache() -> Cache {
1114        Cache::new(None, None)
1115    }
1116
1117    #[fixture]
1118    fn clock() -> TestClock {
1119        TestClock::new()
1120    }
1121
1122    // Helpers
1123    fn _get_exec_engine(
1124        cache: Rc<RefCell<Cache>>,
1125        clock: Rc<RefCell<TestClock>>,
1126        config: Option<ExecutionEngineConfig>,
1127    ) -> ExecutionEngine {
1128        ExecutionEngine::new(clock, cache, config)
1129    }
1130
1131    // TODO: After Implementing ExecutionClient & Strategy
1132}