nautilus_risk/engine/
mod.rs

1// -------------------------------------------------------------------------------------------------
2//  Copyright (C) 2015-2025 Posei Systems Pty Ltd. All rights reserved.
3//  https://poseitrader.io
4//
5//  Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
6//  You may not use this file except in compliance with the License.
7//  You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
8//
9//  Unless required by applicable law or agreed to in writing, software
10//  distributed under the License is distributed on an "AS IS" BASIS,
11//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12//  See the License for the specific language governing permissions and
13//  limitations under the License.
14// -------------------------------------------------------------------------------------------------
15
16//! Risk management engine implementation.
17
18pub mod config;
19
20#[cfg(test)]
21mod tests;
22
23use std::{cell::RefCell, collections::HashMap, fmt::Debug, rc::Rc};
24
25use config::RiskEngineConfig;
26use nautilus_common::{
27    cache::Cache,
28    clock::Clock,
29    logging::{CMD, EVT, RECV},
30    messages::execution::{ModifyOrder, SubmitOrder, SubmitOrderList, TradingCommand},
31    msgbus,
32    throttler::Throttler,
33};
34use nautilus_core::UUID4;
35use nautilus_model::{
36    accounts::{Account, AccountAny},
37    enums::{InstrumentClass, OrderSide, OrderStatus, TimeInForce, TradingState},
38    events::{OrderDenied, OrderEventAny, OrderModifyRejected},
39    identifiers::InstrumentId,
40    instruments::{Instrument, InstrumentAny},
41    orders::{Order, OrderAny, OrderList},
42    types::{Currency, Money, Price, Quantity},
43};
44use nautilus_portfolio::Portfolio;
45use rust_decimal::{Decimal, prelude::ToPrimitive};
46use ustr::Ustr;
47
/// Boxed callback taking a [`SubmitOrder`] command; used as the handler type of the
/// submit-order throttler.
type SubmitOrderFn = Box<dyn Fn(SubmitOrder)>;
/// Boxed callback taking a [`ModifyOrder`] command; used as the handler type of the
/// modify-order throttler.
type ModifyOrderFn = Box<dyn Fn(ModifyOrder)>;
50
/// Central risk management engine that validates and controls trading operations.
///
/// The `RiskEngine` provides comprehensive pre-trade risk checks including order validation,
/// balance verification, position sizing limits, and trading state management. It acts as
/// a gateway between strategy orders and execution, ensuring all trades comply with
/// defined risk parameters and regulatory constraints.
// NOTE(review): `dead_code` is allowed for the whole struct; presumably some fields are not
// read on every code path yet — confirm whether the allowance is still required.
#[allow(dead_code)]
pub struct RiskEngine {
    /// Shared clock, read for the current timestamp (e.g. GTD expiry checks and events).
    clock: Rc<RefCell<dyn Clock>>,
    /// Shared cache used to look up orders, positions, instruments, accounts and prices.
    cache: Rc<RefCell<Cache>>,
    /// Portfolio queried for net long/short exposure when trading state is `Reducing`.
    portfolio: Portfolio,
    /// Throttler for `SubmitOrder` commands, built from `config.max_order_submit`.
    pub throttled_submit_order: Throttler<SubmitOrder, SubmitOrderFn>,
    /// Throttler for `ModifyOrder` commands, built from `config.max_order_modify`.
    pub throttled_modify_order: Throttler<ModifyOrder, ModifyOrderFn>,
    /// Per-instrument maximum notional per order; empty until set via
    /// `set_max_notional_per_order`.
    max_notional_per_order: HashMap<InstrumentId, Decimal>,
    /// Current trading state; starts as `TradingState::Active`.
    trading_state: TradingState,
    /// Engine configuration (e.g. the `bypass` and `debug` flags, throttler rates).
    config: RiskEngineConfig,
}
68
69impl Debug for RiskEngine {
70    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
71        f.debug_struct(stringify!(RiskEngine)).finish()
72    }
73}
74
75impl RiskEngine {
76    /// Creates a new [`RiskEngine`] instance.
77    pub fn new(
78        config: RiskEngineConfig,
79        portfolio: Portfolio,
80        clock: Rc<RefCell<dyn Clock>>,
81        cache: Rc<RefCell<Cache>>,
82    ) -> Self {
83        let throttled_submit_order =
84            Self::create_submit_order_throttler(&config, clock.clone(), cache.clone());
85
86        let throttled_modify_order =
87            Self::create_modify_order_throttler(&config, clock.clone(), cache.clone());
88
89        Self {
90            clock,
91            cache,
92            portfolio,
93            throttled_submit_order,
94            throttled_modify_order,
95            max_notional_per_order: HashMap::new(),
96            trading_state: TradingState::Active,
97            config,
98        }
99    }
100
101    fn create_submit_order_throttler(
102        config: &RiskEngineConfig,
103        clock: Rc<RefCell<dyn Clock>>,
104        cache: Rc<RefCell<Cache>>,
105    ) -> Throttler<SubmitOrder, SubmitOrderFn> {
106        let success_handler = {
107            Box::new(move |submit_order: SubmitOrder| {
108                msgbus::send_any(
109                    "ExecEngine.execute".into(),
110                    &TradingCommand::SubmitOrder(submit_order),
111                );
112            }) as Box<dyn Fn(SubmitOrder)>
113        };
114
115        let failure_handler = {
116            let cache = cache;
117            let clock = clock.clone();
118            Box::new(move |submit_order: SubmitOrder| {
119                let reason = "REJECTED BY THROTTLER";
120                log::warn!(
121                    "SubmitOrder for {} DENIED: {}",
122                    submit_order.client_order_id,
123                    reason
124                );
125
126                Self::handle_submit_order_cache(&cache, &submit_order);
127
128                let denied = Self::create_order_denied(&submit_order, reason, &clock);
129
130                msgbus::send_any("ExecEngine.process".into(), &denied);
131            }) as Box<dyn Fn(SubmitOrder)>
132        };
133
134        Throttler::new(
135            config.max_order_submit.limit,
136            config.max_order_submit.interval_ns,
137            clock,
138            "ORDER_SUBMIT_THROTTLER".to_string(),
139            success_handler,
140            Some(failure_handler),
141            Ustr::from(&UUID4::new().to_string()),
142        )
143    }
144
145    fn create_modify_order_throttler(
146        config: &RiskEngineConfig,
147        clock: Rc<RefCell<dyn Clock>>,
148        cache: Rc<RefCell<Cache>>,
149    ) -> Throttler<ModifyOrder, ModifyOrderFn> {
150        let success_handler = {
151            Box::new(move |order: ModifyOrder| {
152                msgbus::send_any(
153                    "ExecEngine.execute".into(),
154                    &TradingCommand::ModifyOrder(order),
155                );
156            }) as Box<dyn Fn(ModifyOrder)>
157        };
158
159        let failure_handler = {
160            let cache = cache;
161            let clock = clock.clone();
162            Box::new(move |order: ModifyOrder| {
163                let reason = "Exceeded MAX_ORDER_MODIFY_RATE";
164                log::warn!(
165                    "SubmitOrder for {} DENIED: {}",
166                    order.client_order_id,
167                    reason
168                );
169
170                let order = match Self::get_existing_order(&cache, &order) {
171                    Some(order) => order,
172                    None => return,
173                };
174
175                let rejected = Self::create_modify_rejected(&order, reason, &clock);
176
177                msgbus::send_any("ExecEngine.process".into(), &rejected);
178            }) as Box<dyn Fn(ModifyOrder)>
179        };
180
181        Throttler::new(
182            config.max_order_modify.limit,
183            config.max_order_modify.interval_ns,
184            clock,
185            "ORDER_MODIFY_THROTTLER".to_string(),
186            success_handler,
187            Some(failure_handler),
188            Ustr::from(&UUID4::new().to_string()),
189        )
190    }
191
192    fn handle_submit_order_cache(cache: &Rc<RefCell<Cache>>, submit_order: &SubmitOrder) {
193        let mut cache = cache.borrow_mut();
194        if !cache.order_exists(&submit_order.client_order_id) {
195            cache
196                .add_order(submit_order.order.clone(), None, None, false)
197                .map_err(|e| {
198                    log::error!("Cannot add order to cache: {e}");
199                })
200                .unwrap();
201        }
202    }
203
204    fn get_existing_order(cache: &Rc<RefCell<Cache>>, order: &ModifyOrder) -> Option<OrderAny> {
205        let cache = cache.borrow();
206        if let Some(order) = cache.order(&order.client_order_id) {
207            Some(order.clone())
208        } else {
209            log::error!(
210                "Order with command.client_order_id: {} not found",
211                order.client_order_id
212            );
213            None
214        }
215    }
216
217    fn create_order_denied(
218        submit_order: &SubmitOrder,
219        reason: &str,
220        clock: &Rc<RefCell<dyn Clock>>,
221    ) -> OrderEventAny {
222        let timestamp = clock.borrow().timestamp_ns();
223        OrderEventAny::Denied(OrderDenied::new(
224            submit_order.trader_id,
225            submit_order.strategy_id,
226            submit_order.instrument_id,
227            submit_order.client_order_id,
228            reason.into(),
229            UUID4::new(),
230            timestamp,
231            timestamp,
232        ))
233    }
234
235    fn create_modify_rejected(
236        order: &OrderAny,
237        reason: &str,
238        clock: &Rc<RefCell<dyn Clock>>,
239    ) -> OrderEventAny {
240        let timestamp = clock.borrow().timestamp_ns();
241        OrderEventAny::ModifyRejected(OrderModifyRejected::new(
242            order.trader_id(),
243            order.strategy_id(),
244            order.instrument_id(),
245            order.client_order_id(),
246            reason.into(),
247            UUID4::new(),
248            timestamp,
249            timestamp,
250            false,
251            order.venue_order_id(),
252            None,
253        ))
254    }
255
256    // -- COMMANDS --------------------------------------------------------------------------------
257
258    /// Executes a trading command through the risk management pipeline.
259    pub fn execute(&mut self, command: TradingCommand) {
260        // This will extend to other commands such as `RiskCommand`
261        self.handle_command(command);
262    }
263
264    /// Processes an order event for risk monitoring and state updates.
265    pub fn process(&mut self, event: OrderEventAny) {
266        // This will extend to other events such as `RiskEvent`
267        self.handle_event(event);
268    }
269
270    /// Sets the trading state for risk control enforcement.
271    pub fn set_trading_state(&mut self, state: TradingState) {
272        if state == self.trading_state {
273            log::warn!("No change to trading state: already set to {state:?}");
274            return;
275        }
276
277        self.trading_state = state;
278
279        let _ts_now = self.clock.borrow().timestamp_ns();
280
281        // TODO: Create a new Event "TradingStateChanged" in OrderEventAny enum.
282        // let event = OrderEventAny::TradingStateChanged(TradingStateChanged::new(..,self.trading_state,..));
283
284        msgbus::publish("events.risk".into(), &"message"); // TODO: Send the new Event here
285
286        log::info!("Trading state set to {state:?}");
287    }
288
289    /// Sets the maximum notional value per order for the specified instrument.
290    pub fn set_max_notional_per_order(&mut self, instrument_id: InstrumentId, new_value: Decimal) {
291        self.max_notional_per_order.insert(instrument_id, new_value);
292
293        let new_value_str = new_value.to_string();
294        log::info!("Set MAX_NOTIONAL_PER_ORDER: {instrument_id} {new_value_str}");
295    }
296
297    // -- COMMAND HANDLERS ------------------------------------------------------------------------
298
299    // Renamed from `execute_command`
300    fn handle_command(&mut self, command: TradingCommand) {
301        if self.config.debug {
302            log::debug!("{CMD}{RECV} {command:?}");
303        }
304
305        match command {
306            TradingCommand::SubmitOrder(submit_order) => self.handle_submit_order(submit_order),
307            TradingCommand::SubmitOrderList(submit_order_list) => {
308                self.handle_submit_order_list(submit_order_list);
309            }
310            TradingCommand::ModifyOrder(modify_order) => self.handle_modify_order(modify_order),
311            _ => {
312                log::error!("Cannot handle command: {command}");
313            }
314        }
315    }
316
317    fn handle_submit_order(&self, command: SubmitOrder) {
318        if self.config.bypass {
319            self.send_to_execution(TradingCommand::SubmitOrder(command));
320            return;
321        }
322
323        let order = &command.order;
324        if let Some(position_id) = command.position_id {
325            if order.is_reduce_only() {
326                let position_exists = {
327                    let cache = self.cache.borrow();
328                    cache
329                        .position(&position_id)
330                        .map(|pos| (pos.side, pos.quantity))
331                };
332
333                if let Some((pos_side, pos_quantity)) = position_exists {
334                    if !order.would_reduce_only(pos_side, pos_quantity) {
335                        self.deny_command(
336                            TradingCommand::SubmitOrder(command),
337                            &format!("Reduce only order would increase position {position_id}"),
338                        );
339                        return; // Denied
340                    }
341                } else {
342                    self.deny_command(
343                        TradingCommand::SubmitOrder(command),
344                        &format!("Position {position_id} not found for reduce-only order"),
345                    );
346                    return;
347                }
348            }
349        }
350
351        let instrument_exists = {
352            let cache = self.cache.borrow();
353            cache.instrument(&order.instrument_id()).cloned()
354        };
355
356        let instrument = if let Some(instrument) = instrument_exists {
357            instrument
358        } else {
359            self.deny_command(
360                TradingCommand::SubmitOrder(command.clone()),
361                &format!("Instrument for {} not found", command.instrument_id),
362            );
363            return; // Denied
364        };
365
366        ////////////////////////////////////////////////////////////////////////////////
367        // PRE-TRADE ORDER(S) CHECKS
368        ////////////////////////////////////////////////////////////////////////////////
369        if !self.check_order(instrument.clone(), order.clone()) {
370            return; // Denied
371        }
372
373        if !self.check_orders_risk(instrument.clone(), Vec::from([order.clone()])) {
374            return; // Denied
375        }
376
377        self.execution_gateway(instrument, TradingCommand::SubmitOrder(command.clone()));
378    }
379
380    fn handle_submit_order_list(&self, command: SubmitOrderList) {
381        if self.config.bypass {
382            self.send_to_execution(TradingCommand::SubmitOrderList(command));
383            return;
384        }
385
386        let instrument_exists = {
387            let cache = self.cache.borrow();
388            cache.instrument(&command.instrument_id).cloned()
389        };
390
391        let instrument = if let Some(instrument) = instrument_exists {
392            instrument
393        } else {
394            self.deny_command(
395                TradingCommand::SubmitOrderList(command.clone()),
396                &format!("no instrument found for {}", command.instrument_id),
397            );
398            return; // Denied
399        };
400
401        ////////////////////////////////////////////////////////////////////////////////
402        // PRE-TRADE ORDER(S) CHECKS
403        ////////////////////////////////////////////////////////////////////////////////
404        for order in command.order_list.orders.clone() {
405            if !self.check_order(instrument.clone(), order) {
406                return; // Denied
407            }
408        }
409
410        if !self.check_orders_risk(instrument.clone(), command.order_list.clone().orders) {
411            self.deny_order_list(
412                command.order_list.clone(),
413                &format!("OrderList {} DENIED", command.order_list.id),
414            );
415            return; // Denied
416        }
417
418        self.execution_gateway(instrument, TradingCommand::SubmitOrderList(command));
419    }
420
421    fn handle_modify_order(&self, command: ModifyOrder) {
422        ////////////////////////////////////////////////////////////////////////////////
423        // VALIDATE COMMAND
424        ////////////////////////////////////////////////////////////////////////////////
425        let order_exists = {
426            let cache = self.cache.borrow();
427            cache.order(&command.client_order_id).cloned()
428        };
429
430        let order = if let Some(order) = order_exists {
431            order
432        } else {
433            log::error!(
434                "ModifyOrder DENIED: Order with command.client_order_id: {} not found",
435                command.client_order_id
436            );
437            return;
438        };
439
440        if order.is_closed() {
441            self.reject_modify_order(
442                order,
443                &format!(
444                    "Order with command.client_order_id: {} already closed",
445                    command.client_order_id
446                ),
447            );
448            return;
449        } else if order.status() == OrderStatus::PendingCancel {
450            self.reject_modify_order(
451                order,
452                &format!(
453                    "Order with command.client_order_id: {} is already pending cancel",
454                    command.client_order_id
455                ),
456            );
457            return;
458        }
459
460        // Get instrument for orders
461        let maybe_instrument = {
462            let cache = self.cache.borrow();
463            cache.instrument(&command.instrument_id).cloned()
464        };
465
466        let instrument = if let Some(instrument) = maybe_instrument {
467            instrument
468        } else {
469            self.reject_modify_order(
470                order,
471                &format!("no instrument found for {}", command.instrument_id),
472            );
473            return; // Denied
474        };
475
476        // Check Price
477        let mut risk_msg = self.check_price(&instrument, command.price);
478        if let Some(risk_msg) = risk_msg {
479            self.reject_modify_order(order, &risk_msg);
480            return; // Denied
481        }
482
483        // Check Trigger
484        risk_msg = self.check_price(&instrument, command.trigger_price);
485        if let Some(risk_msg) = risk_msg {
486            self.reject_modify_order(order, &risk_msg);
487            return; // Denied
488        }
489
490        // Check Quantity
491        risk_msg = self.check_quantity(&instrument, command.quantity);
492        if let Some(risk_msg) = risk_msg {
493            self.reject_modify_order(order, &risk_msg);
494            return; // Denied
495        }
496
497        // Check TradingState
498        match self.trading_state {
499            TradingState::Halted => {
500                self.reject_modify_order(order, "TradingState is HALTED: Cannot modify order");
501            }
502            TradingState::Reducing => {
503                if let Some(quantity) = command.quantity {
504                    if quantity > order.quantity()
505                        && ((order.is_buy() && self.portfolio.is_net_long(&instrument.id()))
506                            || (order.is_sell() && self.portfolio.is_net_short(&instrument.id())))
507                    {
508                        self.reject_modify_order(
509                            order,
510                            &format!(
511                                "TradingState is REDUCING and update will increase exposure {}",
512                                instrument.id()
513                            ),
514                        );
515                    }
516                }
517            }
518            _ => {}
519        }
520
521        // TODO: Fix message bus usage
522        // self.throttled_modify_order.send(command);
523    }
524
525    // -- PRE-TRADE CHECKS ------------------------------------------------------------------------
526
527    fn check_order(&self, instrument: InstrumentAny, order: OrderAny) -> bool {
528        ////////////////////////////////////////////////////////////////////////////////
529        // VALIDATION CHECKS
530        ////////////////////////////////////////////////////////////////////////////////
531        if order.time_in_force() == TimeInForce::Gtd {
532            // SAFETY: GTD guarantees an expire time
533            let expire_time = order.expire_time().unwrap();
534            if expire_time <= self.clock.borrow().timestamp_ns() {
535                self.deny_order(
536                    order,
537                    &format!("GTD {} already past", expire_time.to_rfc3339()),
538                );
539                return false; // Denied
540            }
541        }
542
543        if !self.check_order_price(instrument.clone(), order.clone())
544            || !self.check_order_quantity(instrument, order)
545        {
546            return false; // Denied
547        }
548
549        true
550    }
551
552    fn check_order_price(&self, instrument: InstrumentAny, order: OrderAny) -> bool {
553        ////////////////////////////////////////////////////////////////////////////////
554        // CHECK PRICE
555        ////////////////////////////////////////////////////////////////////////////////
556        if order.price().is_some() {
557            let risk_msg = self.check_price(&instrument, order.price());
558            if let Some(risk_msg) = risk_msg {
559                self.deny_order(order, &risk_msg);
560                return false; // Denied
561            }
562        }
563
564        ////////////////////////////////////////////////////////////////////////////////
565        // CHECK TRIGGER
566        ////////////////////////////////////////////////////////////////////////////////
567        if order.trigger_price().is_some() {
568            let risk_msg = self.check_price(&instrument, order.trigger_price());
569            if let Some(risk_msg) = risk_msg {
570                self.deny_order(order, &risk_msg);
571                return false; // Denied
572            }
573        }
574
575        true
576    }
577
578    fn check_order_quantity(&self, instrument: InstrumentAny, order: OrderAny) -> bool {
579        let risk_msg = self.check_quantity(&instrument, Some(order.quantity()));
580        if let Some(risk_msg) = risk_msg {
581            self.deny_order(order, &risk_msg);
582            return false; // Denied
583        }
584
585        true
586    }
587
588    fn check_orders_risk(&self, instrument: InstrumentAny, orders: Vec<OrderAny>) -> bool {
589        ////////////////////////////////////////////////////////////////////////////////
590        // CHECK TRIGGER
591        ////////////////////////////////////////////////////////////////////////////////
592        let mut last_px: Option<Price> = None;
593        let mut max_notional: Option<Money> = None;
594
595        // Determine max notional
596        let max_notional_setting = self.max_notional_per_order.get(&instrument.id());
597        if let Some(max_notional_setting_val) = max_notional_setting.copied() {
598            max_notional = Some(Money::new(
599                max_notional_setting_val
600                    .to_f64()
601                    .expect("Invalid decimal conversion"),
602                instrument.quote_currency(),
603            ));
604        }
605
606        // Get account for risk checks
607        let account_exists = {
608            let cache = self.cache.borrow();
609            cache.account_for_venue(&instrument.id().venue).cloned()
610        };
611
612        let account = if let Some(account) = account_exists {
613            account
614        } else {
615            log::debug!("Cannot find account for venue {}", instrument.id().venue);
616            return true; // TODO: Temporary early return until handling routing/multiple venues
617        };
618        let cash_account = match account {
619            AccountAny::Cash(cash_account) => cash_account,
620            AccountAny::Margin(_) => return true, // TODO: Determine risk controls for margin
621        };
622        let free = cash_account.balance_free(Some(instrument.quote_currency()));
623        if self.config.debug {
624            log::debug!("Free cash: {free:?}");
625        }
626
627        let mut cum_notional_buy: Option<Money> = None;
628        let mut cum_notional_sell: Option<Money> = None;
629        let mut base_currency: Option<Currency> = None;
630        for order in &orders {
631            // Determine last price based on order type
632            last_px = match order {
633                OrderAny::Market(_) | OrderAny::MarketToLimit(_) => {
634                    if last_px.is_none() {
635                        let cache = self.cache.borrow();
636                        if let Some(last_quote) = cache.quote(&instrument.id()) {
637                            match order.order_side() {
638                                OrderSide::Buy => Some(last_quote.ask_price),
639                                OrderSide::Sell => Some(last_quote.bid_price),
640                                _ => panic!("Invalid order side"),
641                            }
642                        } else {
643                            let cache = self.cache.borrow();
644                            let last_trade = cache.trade(&instrument.id());
645
646                            if let Some(last_trade) = last_trade {
647                                Some(last_trade.price)
648                            } else {
649                                log::warn!(
650                                    "Cannot check MARKET order risk: no prices for {}",
651                                    instrument.id()
652                                );
653                                continue;
654                            }
655                        }
656                    } else {
657                        last_px
658                    }
659                }
660                OrderAny::StopMarket(_) | OrderAny::MarketIfTouched(_) => order.trigger_price(),
661                OrderAny::TrailingStopMarket(_) | OrderAny::TrailingStopLimit(_) => {
662                    if let Some(trigger_price) = order.trigger_price() {
663                        Some(trigger_price)
664                    } else {
665                        log::warn!(
666                            "Cannot check {} order risk: no trigger price was set", // TODO: Use last_trade += offset
667                            order.order_type()
668                        );
669                        continue;
670                    }
671                }
672                _ => order.price(),
673            };
674
675            let last_px = if let Some(px) = last_px {
676                px
677            } else {
678                log::error!("Cannot check order risk: no price available");
679                continue;
680            };
681
682            let notional =
683                instrument.calculate_notional_value(order.quantity(), last_px, Some(true));
684
685            if self.config.debug {
686                log::debug!("Notional: {notional:?}");
687            }
688
689            // Check MAX notional per order limit
690            if let Some(max_notional_value) = max_notional {
691                if notional > max_notional_value {
692                    self.deny_order(
693                        order.clone(),
694                        &format!(
695                            "NOTIONAL_EXCEEDS_MAX_PER_ORDER: max_notional={max_notional_value:?}, notional={notional:?}"
696                        ),
697                    );
698                    return false; // Denied
699                }
700            }
701
702            // Check MIN notional instrument limit
703            if let Some(min_notional) = instrument.min_notional() {
704                if notional.currency == min_notional.currency && notional < min_notional {
705                    self.deny_order(
706                        order.clone(),
707                        &format!(
708                            "NOTIONAL_LESS_THAN_MIN_FOR_INSTRUMENT: min_notional={min_notional:?}, notional={notional:?}"
709                        ),
710                    );
711                    return false; // Denied
712                }
713            }
714
715            // // Check MAX notional instrument limit
716            if let Some(max_notional) = instrument.max_notional() {
717                if notional.currency == max_notional.currency && notional > max_notional {
718                    self.deny_order(
719                        order.clone(),
720                        &format!(
721                            "NOTIONAL_GREATER_THAN_MAX_FOR_INSTRUMENT: max_notional={max_notional:?}, notional={notional:?}"
722                        ),
723                    );
724                    return false; // Denied
725                }
726            }
727
728            // Calculate OrderBalanceImpact (valid for CashAccount only)
729            let notional = instrument.calculate_notional_value(order.quantity(), last_px, None);
730            let order_balance_impact = match order.order_side() {
731                OrderSide::Buy => Money::from_raw(-notional.raw, notional.currency),
732                OrderSide::Sell => Money::from_raw(notional.raw, notional.currency),
733                OrderSide::NoOrderSide => {
734                    panic!("invalid `OrderSide`, was {}", order.order_side());
735                }
736            };
737
738            if self.config.debug {
739                log::debug!("Balance impact: {order_balance_impact}");
740            }
741
742            if let Some(free_val) = free {
743                if (free_val.as_decimal() + order_balance_impact.as_decimal()) < Decimal::ZERO {
744                    self.deny_order(
745                        order.clone(),
746                        &format!(
747                            "NOTIONAL_EXCEEDS_FREE_BALANCE: free={free_val:?}, notional={notional:?}"
748                        ),
749                    );
750                    return false;
751                }
752            }
753
754            if base_currency.is_none() {
755                base_currency = instrument.base_currency();
756            }
757            if order.is_buy() {
758                match cum_notional_buy.as_mut() {
759                    Some(cum_notional_buy_val) => {
760                        cum_notional_buy_val.raw += -order_balance_impact.raw;
761                    }
762                    None => {
763                        cum_notional_buy = Some(Money::from_raw(
764                            -order_balance_impact.raw,
765                            order_balance_impact.currency,
766                        ));
767                    }
768                }
769
770                if self.config.debug {
771                    log::debug!("Cumulative notional BUY: {cum_notional_buy:?}");
772                }
773
774                if let (Some(free), Some(cum_notional_buy)) = (free, cum_notional_buy) {
775                    if cum_notional_buy > free {
776                        self.deny_order(order.clone(), &format!("CUM_NOTIONAL_EXCEEDS_FREE_BALANCE: free={free}, cum_notional={cum_notional_buy}"));
777                        return false; // Denied
778                    }
779                }
780            } else if order.is_sell() {
781                if cash_account.base_currency.is_some() {
782                    match cum_notional_sell.as_mut() {
783                        Some(cum_notional_buy_val) => {
784                            cum_notional_buy_val.raw += order_balance_impact.raw;
785                        }
786                        None => {
787                            cum_notional_sell = Some(Money::from_raw(
788                                order_balance_impact.raw,
789                                order_balance_impact.currency,
790                            ));
791                        }
792                    }
793                    if self.config.debug {
794                        log::debug!("Cumulative notional SELL: {cum_notional_sell:?}");
795                    }
796
797                    if let (Some(free), Some(cum_notional_sell)) = (free, cum_notional_sell) {
798                        if cum_notional_sell > free {
799                            self.deny_order(order.clone(), &format!("CUM_NOTIONAL_EXCEEDS_FREE_BALANCE: free={free}, cum_notional={cum_notional_sell}"));
800                            return false; // Denied
801                        }
802                    }
803                }
804                // Account is already of type Cash, so no check
805                else if let Some(base_currency) = base_currency {
806                    let cash_value = Money::from_raw(
807                        order
808                            .quantity()
809                            .raw
810                            .try_into()
811                            .map_err(|e| log::error!("Unable to convert Quantity to f64: {e}"))
812                            .unwrap(),
813                        base_currency,
814                    );
815
816                    if self.config.debug {
817                        log::debug!("Cash value: {cash_value:?}");
818                        log::debug!(
819                            "Total: {:?}",
820                            cash_account.balance_total(Some(base_currency))
821                        );
822                        log::debug!(
823                            "Locked: {:?}",
824                            cash_account.balance_locked(Some(base_currency))
825                        );
826                        log::debug!("Free: {:?}", cash_account.balance_free(Some(base_currency)));
827                    }
828
829                    match cum_notional_sell {
830                        Some(mut cum_notional_sell) => {
831                            cum_notional_sell.raw += cash_value.raw;
832                        }
833                        None => cum_notional_sell = Some(cash_value),
834                    }
835
836                    if self.config.debug {
837                        log::debug!("Cumulative notional SELL: {cum_notional_sell:?}");
838                    }
839                    if let (Some(free), Some(cum_notional_sell)) = (free, cum_notional_sell) {
840                        if cum_notional_sell.raw > free.raw {
841                            self.deny_order(order.clone(), &format!("CUM_NOTIONAL_EXCEEDS_FREE_BALANCE: free={free}, cum_notional={cum_notional_sell}"));
842                            return false; // Denied
843                        }
844                    }
845                }
846            }
847        }
848
849        // Finally
850        true // Passed
851    }
852
853    fn check_price(&self, instrument: &InstrumentAny, price: Option<Price>) -> Option<String> {
854        let price_val = price?;
855
856        if price_val.precision > instrument.price_precision() {
857            return Some(format!(
858                "price {} invalid (precision {} > {})",
859                price_val,
860                price_val.precision,
861                instrument.price_precision()
862            ));
863        }
864
865        if instrument.instrument_class() != InstrumentClass::Option && price_val.raw <= 0 {
866            return Some(format!("price {price_val} invalid (<= 0)"));
867        }
868
869        None
870    }
871
872    fn check_quantity(
873        &self,
874        instrument: &InstrumentAny,
875        quantity: Option<Quantity>,
876    ) -> Option<String> {
877        let quantity_val = quantity?;
878
879        // Check precision
880        if quantity_val.precision > instrument.size_precision() {
881            return Some(format!(
882                "quantity {} invalid (precision {} > {})",
883                quantity_val,
884                quantity_val.precision,
885                instrument.size_precision()
886            ));
887        }
888
889        // Check maximum quantity
890        if let Some(max_quantity) = instrument.max_quantity() {
891            if quantity_val > max_quantity {
892                return Some(format!(
893                    "quantity {quantity_val} invalid (> maximum trade size of {max_quantity})"
894                ));
895            }
896        }
897
898        // // Check minimum quantity
899        if let Some(min_quantity) = instrument.min_quantity() {
900            if quantity_val < min_quantity {
901                return Some(format!(
902                    "quantity {quantity_val} invalid (< minimum trade size of {min_quantity})"
903                ));
904            }
905        }
906
907        None
908    }
909
910    // -- DENIALS ---------------------------------------------------------------------------------
911
912    fn deny_command(&self, command: TradingCommand, reason: &str) {
913        match command {
914            TradingCommand::SubmitOrder(submit_order) => {
915                self.deny_order(submit_order.order, reason);
916            }
917            TradingCommand::SubmitOrderList(submit_order_list) => {
918                self.deny_order_list(submit_order_list.order_list, reason);
919            }
920            _ => {
921                panic!("Cannot deny command {command}");
922            }
923        }
924    }
925
926    fn deny_order(&self, order: OrderAny, reason: &str) {
927        log::warn!(
928            "SubmitOrder for {} DENIED: {}",
929            order.client_order_id(),
930            reason
931        );
932
933        if order.status() != OrderStatus::Initialized {
934            return;
935        }
936
937        let mut cache = self.cache.borrow_mut();
938        if !cache.order_exists(&order.client_order_id()) {
939            cache
940                .add_order(order.clone(), None, None, false)
941                .map_err(|e| {
942                    log::error!("Cannot add order to cache: {e}");
943                })
944                .unwrap();
945        }
946
947        let denied = OrderEventAny::Denied(OrderDenied::new(
948            order.trader_id(),
949            order.strategy_id(),
950            order.instrument_id(),
951            order.client_order_id(),
952            reason.into(),
953            UUID4::new(),
954            self.clock.borrow().timestamp_ns(),
955            self.clock.borrow().timestamp_ns(),
956        ));
957
958        msgbus::send_any("ExecEngine.process".into(), &denied);
959    }
960
961    fn deny_order_list(&self, order_list: OrderList, reason: &str) {
962        for order in order_list.orders {
963            if !order.is_closed() {
964                self.deny_order(order, reason);
965            }
966        }
967    }
968
969    fn reject_modify_order(&self, order: OrderAny, reason: &str) {
970        let ts_event = self.clock.borrow().timestamp_ns();
971        let denied = OrderEventAny::ModifyRejected(OrderModifyRejected::new(
972            order.trader_id(),
973            order.strategy_id(),
974            order.instrument_id(),
975            order.client_order_id(),
976            reason.into(),
977            UUID4::new(),
978            ts_event,
979            ts_event,
980            false,
981            order.venue_order_id(),
982            order.account_id(),
983        ));
984
985        msgbus::send_any("ExecEngine.process".into(), &denied);
986    }
987
988    // -- EGRESS ----------------------------------------------------------------------------------
989
990    fn execution_gateway(&self, instrument: InstrumentAny, command: TradingCommand) {
991        match self.trading_state {
992            TradingState::Halted => match command {
993                TradingCommand::SubmitOrder(submit_order) => {
994                    self.deny_order(submit_order.order, "TradingState::HALTED");
995                }
996                TradingCommand::SubmitOrderList(submit_order_list) => {
997                    self.deny_order_list(submit_order_list.order_list, "TradingState::HALTED");
998                }
999                _ => {}
1000            },
1001            TradingState::Reducing => match command {
1002                TradingCommand::SubmitOrder(submit_order) => {
1003                    let order = submit_order.order;
1004                    if order.is_buy() && self.portfolio.is_net_long(&instrument.id()) {
1005                        self.deny_order(
1006                            order,
1007                            &format!(
1008                                "BUY when TradingState::REDUCING and LONG {}",
1009                                instrument.id()
1010                            ),
1011                        );
1012                    } else if order.is_sell() && self.portfolio.is_net_short(&instrument.id()) {
1013                        self.deny_order(
1014                            order,
1015                            &format!(
1016                                "SELL when TradingState::REDUCING and SHORT {}",
1017                                instrument.id()
1018                            ),
1019                        );
1020                    }
1021                }
1022                TradingCommand::SubmitOrderList(submit_order_list) => {
1023                    let order_list = submit_order_list.order_list;
1024                    for order in &order_list.orders {
1025                        if order.is_buy() && self.portfolio.is_net_long(&instrument.id()) {
1026                            self.deny_order_list(
1027                                order_list,
1028                                &format!(
1029                                    "BUY when TradingState::REDUCING and LONG {}",
1030                                    instrument.id()
1031                                ),
1032                            );
1033                            return;
1034                        } else if order.is_sell() && self.portfolio.is_net_short(&instrument.id()) {
1035                            self.deny_order_list(
1036                                order_list,
1037                                &format!(
1038                                    "SELL when TradingState::REDUCING and SHORT {}",
1039                                    instrument.id()
1040                                ),
1041                            );
1042                            return;
1043                        }
1044                    }
1045                }
1046                _ => {}
1047            },
1048            TradingState::Active => match command {
1049                TradingCommand::SubmitOrder(_submit_order) => {
1050                    // TODO: Fix message bus usage
1051                    // self.throttled_submit_order.send(submit_order);
1052                }
1053                TradingCommand::SubmitOrderList(_submit_order_list) => {
1054                    todo!("NOT IMPLEMENTED");
1055                }
1056                _ => {}
1057            },
1058        }
1059    }
1060
1061    fn send_to_execution(&self, command: TradingCommand) {
1062        msgbus::send_any("ExecEngine.execute".into(), &command);
1063    }
1064
1065    fn handle_event(&mut self, event: OrderEventAny) {
1066        // We intend to extend the risk engine to be able to handle additional events.
1067        // For now we just log.
1068        if self.config.debug {
1069            log::debug!("{RECV}{EVT} {event:?}");
1070        }
1071    }
1072}