nautilus_execution/order_manager/
manager.rs

1// -------------------------------------------------------------------------------------------------
2//  Copyright (C) 2015-2025 Posei Systems Pty Ltd. All rights reserved.
3//  https://poseitrader.io
4//
5//  Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
6//  You may not use this file except in compliance with the License.
7//  You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
8//
9//  Unless required by applicable law or agreed to in writing, software
10//  distributed under the License is distributed on an "AS IS" BASIS,
11//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12//  See the License for the specific language governing permissions and
13//  limitations under the License.
14// -------------------------------------------------------------------------------------------------
15
16// Under development
17#![allow(dead_code)]
18#![allow(unused_variables)]
19
20use std::{cell::RefCell, collections::HashMap, fmt::Debug, rc::Rc};
21
22use nautilus_common::{
23    cache::Cache,
24    clock::Clock,
25    logging::{CMD, EVT, SEND},
26    messages::execution::{SubmitOrder, TradingCommand},
27    msgbus,
28};
29use nautilus_core::UUID4;
30use nautilus_model::{
31    enums::{ContingencyType, TriggerType},
32    events::{
33        OrderCanceled, OrderEventAny, OrderExpired, OrderFilled, OrderRejected, OrderUpdated,
34    },
35    identifiers::{ClientId, ClientOrderId, ExecAlgorithmId, PositionId},
36    orders::{Order, OrderAny},
37    types::Quantity,
38};
39
40pub struct OrderManager {
41    clock: Rc<RefCell<dyn Clock>>,
42    cache: Rc<RefCell<Cache>>,
43    active_local: bool,
44    // submit_order_handler: Option<SubmitOrderHandlerAny>,
45    // cancel_order_handler: Option<CancelOrderHandlerAny>,
46    // modify_order_handler: Option<ModifyOrderHandlerAny>,
47    submit_order_commands: HashMap<ClientOrderId, SubmitOrder>,
48}
49
50impl Debug for OrderManager {
51    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
52        f.debug_struct(stringify!(OrderManager))
53            .field("pending_commands", &self.submit_order_commands.len())
54            .finish()
55    }
56}
57
58impl OrderManager {
59    pub fn new(
60        clock: Rc<RefCell<dyn Clock>>,
61        cache: Rc<RefCell<Cache>>,
62        active_local: bool,
63        // submit_order_handler: Option<SubmitOrderHandlerAny>,
64        // cancel_order_handler: Option<CancelOrderHandlerAny>,
65        // modify_order_handler: Option<ModifyOrderHandlerAny>,
66    ) -> Self {
67        Self {
68            clock,
69            cache,
70            active_local,
71            // submit_order_handler,
72            // cancel_order_handler,
73            // modify_order_handler,
74            submit_order_commands: HashMap::new(),
75        }
76    }
77
78    // pub fn set_submit_order_handler(&mut self, handler: SubmitOrderHandlerAny) {
79    //     self.submit_order_handler = Some(handler);
80    // }
81    //
82    // pub fn set_cancel_order_handler(&mut self, handler: CancelOrderHandlerAny) {
83    //     self.cancel_order_handler = Some(handler);
84    // }
85    //
86    // pub fn set_modify_order_handler(&mut self, handler: ModifyOrderHandlerAny) {
87    //     self.modify_order_handler = Some(handler);
88    // }
89
90    #[must_use]
91    pub fn get_submit_order_commands(&self) -> HashMap<ClientOrderId, SubmitOrder> {
92        self.submit_order_commands.clone()
93    }
94
95    pub fn cache_submit_order_command(&mut self, command: SubmitOrder) {
96        self.submit_order_commands
97            .insert(command.order.client_order_id(), command);
98    }
99
100    pub fn pop_submit_order_command(
101        &mut self,
102        client_order_id: ClientOrderId,
103    ) -> Option<SubmitOrder> {
104        self.submit_order_commands.remove(&client_order_id)
105    }
106
107    pub fn reset(&mut self) {
108        self.submit_order_commands.clear();
109    }
110
111    pub fn cancel_order(&mut self, order: &OrderAny) {
112        if self
113            .cache
114            .borrow()
115            .is_order_pending_cancel_local(&order.client_order_id())
116        {
117            return;
118        }
119
120        if order.is_closed() {
121            log::warn!("Cannot cancel order: already closed");
122            return;
123        }
124
125        self.submit_order_commands.remove(&order.client_order_id());
126
127        // if let Some(handler) = &self.cancel_order_handler {
128        //     handler.handle_cancel_order(order);
129        // }
130    }
131
132    pub const fn modify_order_quantity(&mut self, order: &mut OrderAny, new_quantity: Quantity) {
133        // if let Some(handler) = &self.modify_order_handler {
134        //     handler.handle_modify_order(order, new_quantity);
135        // }
136    }
137
138    /// # Errors
139    ///
140    /// Returns an error if creating a new submit order fails.
141    pub fn create_new_submit_order(
142        &mut self,
143        order: &OrderAny,
144        position_id: Option<PositionId>,
145        client_id: Option<ClientId>,
146    ) -> anyhow::Result<()> {
147        let client_id = client_id.ok_or_else(|| anyhow::anyhow!("Client ID is required"))?;
148        let venue_order_id = order
149            .venue_order_id()
150            .ok_or_else(|| anyhow::anyhow!("Venue order ID is required"))?;
151
152        let submit = SubmitOrder::new(
153            order.trader_id(),
154            client_id,
155            order.strategy_id(),
156            order.instrument_id(),
157            order.client_order_id(),
158            venue_order_id,
159            order.clone(),
160            order.exec_algorithm_id(),
161            position_id,
162            UUID4::new(),
163            self.clock.borrow().timestamp_ns(),
164        )?;
165
166        if order.emulation_trigger() == Some(TriggerType::NoTrigger) {
167            self.cache_submit_order_command(submit.clone());
168
169            match order.exec_algorithm_id() {
170                Some(exec_algorithm_id) => {
171                    self.send_algo_command(submit, exec_algorithm_id);
172                }
173                None => self.send_risk_command(TradingCommand::SubmitOrder(submit)),
174            }
175        } // else if let Some(handler) = &self.submit_order_handler {
176        //     handler.handle_submit_order(submit);
177        // }
178
179        Ok(())
180    }
181
182    #[must_use]
183    pub fn should_manage_order(&self, order: &OrderAny) -> bool {
184        self.active_local && order.is_active_local()
185    }
186
187    // Event Handlers
188    pub fn handle_event(&mut self, event: OrderEventAny) {
189        match event {
190            OrderEventAny::Rejected(event) => self.handle_order_rejected(event),
191            OrderEventAny::Canceled(event) => self.handle_order_canceled(event),
192            OrderEventAny::Expired(event) => self.handle_order_expired(event),
193            OrderEventAny::Updated(event) => self.handle_order_updated(event),
194            OrderEventAny::Filled(event) => self.handle_order_filled(event),
195            _ => self.handle_position_event(event),
196        }
197    }
198
199    pub fn handle_order_rejected(&mut self, rejected: OrderRejected) {
200        let cloned_order = self
201            .cache
202            .borrow()
203            .order(&rejected.client_order_id)
204            .cloned();
205        if let Some(order) = cloned_order {
206            if order.contingency_type() != Some(ContingencyType::NoContingency) {
207                self.handle_contingencies(order);
208            }
209        } else {
210            log::error!(
211                "Cannot handle `OrderRejected`: order for client_order_id: {} not found, {}",
212                rejected.client_order_id,
213                rejected
214            );
215        }
216    }
217
218    pub fn handle_order_canceled(&mut self, canceled: OrderCanceled) {
219        let cloned_order = self
220            .cache
221            .borrow()
222            .order(&canceled.client_order_id)
223            .cloned();
224        if let Some(order) = cloned_order {
225            if order.contingency_type() != Some(ContingencyType::NoContingency) {
226                self.handle_contingencies(order);
227            }
228        } else {
229            log::error!(
230                "Cannot handle `OrderCanceled`: order for client_order_id: {} not found, {}",
231                canceled.client_order_id,
232                canceled
233            );
234        }
235    }
236
237    pub fn handle_order_expired(&mut self, expired: OrderExpired) {
238        let cloned_order = self.cache.borrow().order(&expired.client_order_id).cloned();
239        if let Some(order) = cloned_order {
240            if order.contingency_type() != Some(ContingencyType::NoContingency) {
241                self.handle_contingencies(order);
242            }
243        } else {
244            log::error!(
245                "Cannot handle `OrderExpired`: order for client_order_id: {} not found, {}",
246                expired.client_order_id,
247                expired
248            );
249        }
250    }
251
252    pub fn handle_order_updated(&mut self, updated: OrderUpdated) {
253        let cloned_order = self.cache.borrow().order(&updated.client_order_id).cloned();
254        if let Some(order) = cloned_order {
255            if order.contingency_type() != Some(ContingencyType::NoContingency) {
256                self.handle_contingencies_update(order);
257            }
258        } else {
259            log::error!(
260                "Cannot handle `OrderUpdated`: order for client_order_id: {} not found, {}",
261                updated.client_order_id,
262                updated
263            );
264        }
265    }
266
267    /// # Panics
268    ///
269    /// Panics if the OTO child order cannot be found for the given client order ID.
270    pub fn handle_order_filled(&mut self, filled: OrderFilled) {
271        let order = if let Some(order) = self.cache.borrow().order(&filled.client_order_id).cloned()
272        {
273            order
274        } else {
275            log::error!(
276                "Cannot handle `OrderFilled`: order for client_order_id: {} not found, {}",
277                filled.client_order_id,
278                filled
279            );
280            return;
281        };
282
283        match order.contingency_type() {
284            Some(ContingencyType::Oto) => {
285                let position_id = self
286                    .cache
287                    .borrow()
288                    .position_id(&order.client_order_id())
289                    .copied();
290                let client_id = self
291                    .cache
292                    .borrow()
293                    .client_id(&order.client_order_id())
294                    .copied();
295
296                let parent_filled_qty = match order.exec_spawn_id() {
297                    Some(spawn_id) => {
298                        if let Some(qty) = self
299                            .cache
300                            .borrow()
301                            .exec_spawn_total_filled_qty(&spawn_id, true)
302                        {
303                            qty
304                        } else {
305                            log::error!("Failed to get spawn filled quantity for {spawn_id}");
306                            return;
307                        }
308                    }
309                    None => order.filled_qty(),
310                };
311
312                let linked_orders = if let Some(orders) = order.linked_order_ids() {
313                    orders
314                } else {
315                    log::error!("No linked orders found for OTO order");
316                    return;
317                };
318
319                for client_order_id in linked_orders {
320                    let mut child_order =
321                        if let Some(order) = self.cache.borrow().order(client_order_id).cloned() {
322                            order
323                        } else {
324                            panic!(
325                                "Cannot find OTO child order for client_order_id: {client_order_id}"
326                            );
327                        };
328
329                    if !self.should_manage_order(&child_order) {
330                        continue;
331                    }
332
333                    if child_order.position_id().is_none() {
334                        child_order.set_position_id(position_id);
335                    }
336
337                    if parent_filled_qty != child_order.leaves_qty() {
338                        self.modify_order_quantity(&mut child_order, parent_filled_qty);
339                    }
340
341                    // if self.submit_order_handler.is_none() {
342                    //     return;
343                    // }
344
345                    if !self
346                        .submit_order_commands
347                        .contains_key(&child_order.client_order_id())
348                    {
349                        if let Err(e) =
350                            self.create_new_submit_order(&child_order, position_id, client_id)
351                        {
352                            log::error!("Failed to create new submit order: {e}");
353                        }
354                    }
355                }
356            }
357            Some(ContingencyType::Oco) => {
358                let linked_orders = if let Some(orders) = order.linked_order_ids() {
359                    orders
360                } else {
361                    log::error!("No linked orders found for OCO order");
362                    return;
363                };
364
365                for client_order_id in linked_orders {
366                    let contingent_order = match self.cache.borrow().order(client_order_id).cloned()
367                    {
368                        Some(contingent_order) => contingent_order,
369                        None => {
370                            panic!(
371                                "Cannot find OCO contingent order for client_order_id: {client_order_id}"
372                            );
373                        }
374                    };
375
376                    // Not being managed || Already completed
377                    if !self.should_manage_order(&contingent_order) || contingent_order.is_closed()
378                    {
379                        continue;
380                    }
381                    if contingent_order.client_order_id() != order.client_order_id() {
382                        self.cancel_order(&contingent_order);
383                    }
384                }
385            }
386            Some(ContingencyType::Ouo) => self.handle_contingencies(order),
387            _ => {}
388        }
389    }
390
391    /// # Panics
392    ///
393    /// Panics if a contingent order cannot be found for the given client order ID.
394    pub fn handle_contingencies(&mut self, order: OrderAny) {
395        let (filled_qty, leaves_qty, is_spawn_active) =
396            if let Some(exec_spawn_id) = order.exec_spawn_id() {
397                if let (Some(filled), Some(leaves)) = (
398                    self.cache
399                        .borrow()
400                        .exec_spawn_total_filled_qty(&exec_spawn_id, true),
401                    self.cache
402                        .borrow()
403                        .exec_spawn_total_leaves_qty(&exec_spawn_id, true),
404                ) {
405                    (filled, leaves, leaves.raw > 0)
406                } else {
407                    log::error!("Failed to get spawn quantities for {exec_spawn_id}");
408                    return;
409                }
410            } else {
411                (order.filled_qty(), order.leaves_qty(), false)
412            };
413
414        let linked_orders = if let Some(orders) = order.linked_order_ids() {
415            orders
416        } else {
417            log::error!("No linked orders found");
418            return;
419        };
420
421        for client_order_id in linked_orders {
422            let mut contingent_order =
423                if let Some(order) = self.cache.borrow().order(client_order_id).cloned() {
424                    order
425                } else {
426                    panic!("Cannot find contingent order for client_order_id: {client_order_id}");
427                };
428
429            if !self.should_manage_order(&contingent_order)
430                || client_order_id == &order.client_order_id()
431            {
432                continue;
433            }
434
435            if contingent_order.is_closed() {
436                self.submit_order_commands.remove(&order.client_order_id());
437                continue;
438            }
439
440            match order.contingency_type() {
441                Some(ContingencyType::Oto) => {
442                    if order.is_closed()
443                        && filled_qty.raw == 0
444                        && (order.exec_spawn_id().is_none() || !is_spawn_active)
445                    {
446                        self.cancel_order(&contingent_order);
447                    } else if filled_qty.raw > 0 && filled_qty != contingent_order.quantity() {
448                        self.modify_order_quantity(&mut contingent_order, filled_qty);
449                    }
450                }
451                Some(ContingencyType::Oco) => {
452                    if order.is_closed() && (order.exec_spawn_id().is_none() || !is_spawn_active) {
453                        self.cancel_order(&contingent_order);
454                    }
455                }
456                Some(ContingencyType::Ouo) => {
457                    if (leaves_qty.raw == 0 && order.exec_spawn_id().is_some())
458                        || (order.is_closed()
459                            && (order.exec_spawn_id().is_none() || !is_spawn_active))
460                    {
461                        self.cancel_order(&contingent_order);
462                    } else if leaves_qty != contingent_order.leaves_qty() {
463                        self.modify_order_quantity(&mut contingent_order, leaves_qty);
464                    }
465                }
466                _ => {}
467            }
468        }
469    }
470
471    /// # Panics
472    ///
473    /// Panics if an OCO contingent order cannot be found for the given client order ID.
474    pub fn handle_contingencies_update(&mut self, order: OrderAny) {
475        let quantity = match order.exec_spawn_id() {
476            Some(exec_spawn_id) => {
477                if let Some(qty) = self
478                    .cache
479                    .borrow()
480                    .exec_spawn_total_quantity(&exec_spawn_id, true)
481                {
482                    qty
483                } else {
484                    log::error!("Failed to get spawn total quantity for {exec_spawn_id}");
485                    return;
486                }
487            }
488            None => order.quantity(),
489        };
490
491        if quantity.raw == 0 {
492            return;
493        }
494
495        let linked_orders = if let Some(orders) = order.linked_order_ids() {
496            orders
497        } else {
498            log::error!("No linked orders found for contingent order");
499            return;
500        };
501
502        for client_order_id in linked_orders {
503            let mut contingent_order = match self.cache.borrow().order(client_order_id).cloned() {
504                Some(contingent_order) => contingent_order,
505                None => panic!(
506                    "Cannot find OCO contingent order for client_order_id: {client_order_id}"
507                ),
508            };
509
510            if !self.should_manage_order(&contingent_order)
511                || client_order_id == &order.client_order_id()
512                || contingent_order.is_closed()
513            {
514                continue;
515            }
516
517            if let Some(contingency_type) = order.contingency_type() {
518                if matches!(
519                    contingency_type,
520                    ContingencyType::Oto | ContingencyType::Oco
521                ) && quantity != contingent_order.quantity()
522                {
523                    self.modify_order_quantity(&mut contingent_order, quantity);
524                }
525            }
526        }
527    }
528
529    pub fn handle_position_event(&mut self, _event: OrderEventAny) {
530        todo!()
531    }
532
533    // Message sending methods
534    pub fn send_emulator_command(&self, command: TradingCommand) {
535        log::info!("{CMD}{SEND} {command}");
536
537        msgbus::send("OrderEmulator.execute".into(), &command);
538    }
539
540    pub fn send_algo_command(&self, command: SubmitOrder, exec_algorithm_id: ExecAlgorithmId) {
541        log::info!("{CMD}{SEND} {command}");
542
543        let endpoint = format!("{exec_algorithm_id}.execute");
544        msgbus::send(endpoint.into(), &TradingCommand::SubmitOrder(command));
545    }
546
547    pub fn send_risk_command(&self, command: TradingCommand) {
548        log::info!("{CMD}{SEND} {command}");
549        msgbus::send("RiskEngine.execute".into(), &command);
550    }
551
552    pub fn send_exec_command(&self, command: TradingCommand) {
553        log::info!("{CMD}{SEND} {command}");
554        msgbus::send("ExecEngine.execute".into(), &command);
555    }
556
557    pub fn send_risk_event(&self, event: OrderEventAny) {
558        log::info!("{EVT}{SEND} {event}");
559        msgbus::send("RiskEngine.process".into(), &event);
560    }
561
562    pub fn send_exec_event(&self, event: OrderEventAny) {
563        log::info!("{EVT}{SEND} {event}");
564        msgbus::send("ExecEngine.process".into(), &event);
565    }
566}