1pub mod config;
19
20#[cfg(test)]
21mod tests;
22
23use std::{cell::RefCell, collections::HashMap, fmt::Debug, rc::Rc};
24
25use config::RiskEngineConfig;
26use nautilus_common::{
27 cache::Cache,
28 clock::Clock,
29 logging::{CMD, EVT, RECV},
30 messages::execution::{ModifyOrder, SubmitOrder, SubmitOrderList, TradingCommand},
31 msgbus,
32 throttler::Throttler,
33};
34use nautilus_core::UUID4;
35use nautilus_model::{
36 accounts::{Account, AccountAny},
37 enums::{InstrumentClass, OrderSide, OrderStatus, TimeInForce, TradingState},
38 events::{OrderDenied, OrderEventAny, OrderModifyRejected},
39 identifiers::InstrumentId,
40 instruments::{Instrument, InstrumentAny},
41 orders::{Order, OrderAny, OrderList},
42 types::{Currency, Money, Price, Quantity},
43};
44use nautilus_portfolio::Portfolio;
45use rust_decimal::{Decimal, prelude::ToPrimitive};
46use ustr::Ustr;
47
/// Boxed callback taking a [`SubmitOrder`] command; used as the handler type of the
/// submit-order throttler.
type SubmitOrderFn = Box<dyn Fn(SubmitOrder)>;
/// Boxed callback taking a [`ModifyOrder`] command; used as the handler type of the
/// modify-order throttler.
type ModifyOrderFn = Box<dyn Fn(ModifyOrder)>;
50
/// Pre-trade risk engine which validates trading commands (order submission and
/// modification) and emits denial/rejection events for commands failing a check.
// NOTE(review): `dead_code` is presumably allowed because some fields are not yet
// read by the code in this file — confirm before removing the attribute.
#[allow(dead_code)]
pub struct RiskEngine {
    /// Shared clock used to timestamp the events this engine generates.
    clock: Rc<RefCell<dyn Clock>>,
    /// Shared cache queried for orders, positions, instruments, accounts and prices.
    cache: Rc<RefCell<Cache>>,
    /// Portfolio queried for net long/short exposure per instrument.
    portfolio: Portfolio,
    /// Throttler built from `config.max_order_submit` for `SubmitOrder` commands.
    pub throttled_submit_order: Throttler<SubmitOrder, SubmitOrderFn>,
    /// Throttler built from `config.max_order_modify` for `ModifyOrder` commands.
    pub throttled_modify_order: Throttler<ModifyOrder, ModifyOrderFn>,
    /// Maximum notional value allowed per order, keyed by instrument ID.
    max_notional_per_order: HashMap<InstrumentId, Decimal>,
    /// Current trading state; starts as `TradingState::Active`.
    trading_state: TradingState,
    /// Engine configuration (`bypass`, `debug` and throttler rate settings are read).
    config: RiskEngineConfig,
}
68
69impl Debug for RiskEngine {
70 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
71 f.debug_struct(stringify!(RiskEngine)).finish()
72 }
73}
74
75impl RiskEngine {
76 pub fn new(
78 config: RiskEngineConfig,
79 portfolio: Portfolio,
80 clock: Rc<RefCell<dyn Clock>>,
81 cache: Rc<RefCell<Cache>>,
82 ) -> Self {
83 let throttled_submit_order =
84 Self::create_submit_order_throttler(&config, clock.clone(), cache.clone());
85
86 let throttled_modify_order =
87 Self::create_modify_order_throttler(&config, clock.clone(), cache.clone());
88
89 Self {
90 clock,
91 cache,
92 portfolio,
93 throttled_submit_order,
94 throttled_modify_order,
95 max_notional_per_order: HashMap::new(),
96 trading_state: TradingState::Active,
97 config,
98 }
99 }
100
101 fn create_submit_order_throttler(
102 config: &RiskEngineConfig,
103 clock: Rc<RefCell<dyn Clock>>,
104 cache: Rc<RefCell<Cache>>,
105 ) -> Throttler<SubmitOrder, SubmitOrderFn> {
106 let success_handler = {
107 Box::new(move |submit_order: SubmitOrder| {
108 msgbus::send_any(
109 "ExecEngine.execute".into(),
110 &TradingCommand::SubmitOrder(submit_order),
111 );
112 }) as Box<dyn Fn(SubmitOrder)>
113 };
114
115 let failure_handler = {
116 let cache = cache;
117 let clock = clock.clone();
118 Box::new(move |submit_order: SubmitOrder| {
119 let reason = "REJECTED BY THROTTLER";
120 log::warn!(
121 "SubmitOrder for {} DENIED: {}",
122 submit_order.client_order_id,
123 reason
124 );
125
126 Self::handle_submit_order_cache(&cache, &submit_order);
127
128 let denied = Self::create_order_denied(&submit_order, reason, &clock);
129
130 msgbus::send_any("ExecEngine.process".into(), &denied);
131 }) as Box<dyn Fn(SubmitOrder)>
132 };
133
134 Throttler::new(
135 config.max_order_submit.limit,
136 config.max_order_submit.interval_ns,
137 clock,
138 "ORDER_SUBMIT_THROTTLER".to_string(),
139 success_handler,
140 Some(failure_handler),
141 Ustr::from(&UUID4::new().to_string()),
142 )
143 }
144
145 fn create_modify_order_throttler(
146 config: &RiskEngineConfig,
147 clock: Rc<RefCell<dyn Clock>>,
148 cache: Rc<RefCell<Cache>>,
149 ) -> Throttler<ModifyOrder, ModifyOrderFn> {
150 let success_handler = {
151 Box::new(move |order: ModifyOrder| {
152 msgbus::send_any(
153 "ExecEngine.execute".into(),
154 &TradingCommand::ModifyOrder(order),
155 );
156 }) as Box<dyn Fn(ModifyOrder)>
157 };
158
159 let failure_handler = {
160 let cache = cache;
161 let clock = clock.clone();
162 Box::new(move |order: ModifyOrder| {
163 let reason = "Exceeded MAX_ORDER_MODIFY_RATE";
164 log::warn!(
165 "SubmitOrder for {} DENIED: {}",
166 order.client_order_id,
167 reason
168 );
169
170 let order = match Self::get_existing_order(&cache, &order) {
171 Some(order) => order,
172 None => return,
173 };
174
175 let rejected = Self::create_modify_rejected(&order, reason, &clock);
176
177 msgbus::send_any("ExecEngine.process".into(), &rejected);
178 }) as Box<dyn Fn(ModifyOrder)>
179 };
180
181 Throttler::new(
182 config.max_order_modify.limit,
183 config.max_order_modify.interval_ns,
184 clock,
185 "ORDER_MODIFY_THROTTLER".to_string(),
186 success_handler,
187 Some(failure_handler),
188 Ustr::from(&UUID4::new().to_string()),
189 )
190 }
191
192 fn handle_submit_order_cache(cache: &Rc<RefCell<Cache>>, submit_order: &SubmitOrder) {
193 let mut cache = cache.borrow_mut();
194 if !cache.order_exists(&submit_order.client_order_id) {
195 cache
196 .add_order(submit_order.order.clone(), None, None, false)
197 .map_err(|e| {
198 log::error!("Cannot add order to cache: {e}");
199 })
200 .unwrap();
201 }
202 }
203
204 fn get_existing_order(cache: &Rc<RefCell<Cache>>, order: &ModifyOrder) -> Option<OrderAny> {
205 let cache = cache.borrow();
206 if let Some(order) = cache.order(&order.client_order_id) {
207 Some(order.clone())
208 } else {
209 log::error!(
210 "Order with command.client_order_id: {} not found",
211 order.client_order_id
212 );
213 None
214 }
215 }
216
217 fn create_order_denied(
218 submit_order: &SubmitOrder,
219 reason: &str,
220 clock: &Rc<RefCell<dyn Clock>>,
221 ) -> OrderEventAny {
222 let timestamp = clock.borrow().timestamp_ns();
223 OrderEventAny::Denied(OrderDenied::new(
224 submit_order.trader_id,
225 submit_order.strategy_id,
226 submit_order.instrument_id,
227 submit_order.client_order_id,
228 reason.into(),
229 UUID4::new(),
230 timestamp,
231 timestamp,
232 ))
233 }
234
235 fn create_modify_rejected(
236 order: &OrderAny,
237 reason: &str,
238 clock: &Rc<RefCell<dyn Clock>>,
239 ) -> OrderEventAny {
240 let timestamp = clock.borrow().timestamp_ns();
241 OrderEventAny::ModifyRejected(OrderModifyRejected::new(
242 order.trader_id(),
243 order.strategy_id(),
244 order.instrument_id(),
245 order.client_order_id(),
246 reason.into(),
247 UUID4::new(),
248 timestamp,
249 timestamp,
250 false,
251 order.venue_order_id(),
252 None,
253 ))
254 }
255
/// Executes the given trading command by passing it to `handle_command`.
pub fn execute(&mut self, command: TradingCommand) {
    self.handle_command(command);
}
263
/// Processes the given order event by passing it to `handle_event`.
pub fn process(&mut self, event: OrderEventAny) {
    self.handle_event(event);
}
269
270 pub fn set_trading_state(&mut self, state: TradingState) {
272 if state == self.trading_state {
273 log::warn!("No change to trading state: already set to {state:?}");
274 return;
275 }
276
277 self.trading_state = state;
278
279 let _ts_now = self.clock.borrow().timestamp_ns();
280
281 msgbus::publish("events.risk".into(), &"message"); log::info!("Trading state set to {state:?}");
287 }
288
289 pub fn set_max_notional_per_order(&mut self, instrument_id: InstrumentId, new_value: Decimal) {
291 self.max_notional_per_order.insert(instrument_id, new_value);
292
293 let new_value_str = new_value.to_string();
294 log::info!("Set MAX_NOTIONAL_PER_ORDER: {instrument_id} {new_value_str}");
295 }
296
297 fn handle_command(&mut self, command: TradingCommand) {
301 if self.config.debug {
302 log::debug!("{CMD}{RECV} {command:?}");
303 }
304
305 match command {
306 TradingCommand::SubmitOrder(submit_order) => self.handle_submit_order(submit_order),
307 TradingCommand::SubmitOrderList(submit_order_list) => {
308 self.handle_submit_order_list(submit_order_list);
309 }
310 TradingCommand::ModifyOrder(modify_order) => self.handle_modify_order(modify_order),
311 _ => {
312 log::error!("Cannot handle command: {command}");
313 }
314 }
315 }
316
317 fn handle_submit_order(&self, command: SubmitOrder) {
318 if self.config.bypass {
319 self.send_to_execution(TradingCommand::SubmitOrder(command));
320 return;
321 }
322
323 let order = &command.order;
324 if let Some(position_id) = command.position_id {
325 if order.is_reduce_only() {
326 let position_exists = {
327 let cache = self.cache.borrow();
328 cache
329 .position(&position_id)
330 .map(|pos| (pos.side, pos.quantity))
331 };
332
333 if let Some((pos_side, pos_quantity)) = position_exists {
334 if !order.would_reduce_only(pos_side, pos_quantity) {
335 self.deny_command(
336 TradingCommand::SubmitOrder(command),
337 &format!("Reduce only order would increase position {position_id}"),
338 );
339 return; }
341 } else {
342 self.deny_command(
343 TradingCommand::SubmitOrder(command),
344 &format!("Position {position_id} not found for reduce-only order"),
345 );
346 return;
347 }
348 }
349 }
350
351 let instrument_exists = {
352 let cache = self.cache.borrow();
353 cache.instrument(&order.instrument_id()).cloned()
354 };
355
356 let instrument = if let Some(instrument) = instrument_exists {
357 instrument
358 } else {
359 self.deny_command(
360 TradingCommand::SubmitOrder(command.clone()),
361 &format!("Instrument for {} not found", command.instrument_id),
362 );
363 return; };
365
366 if !self.check_order(instrument.clone(), order.clone()) {
370 return; }
372
373 if !self.check_orders_risk(instrument.clone(), Vec::from([order.clone()])) {
374 return; }
376
377 self.execution_gateway(instrument, TradingCommand::SubmitOrder(command.clone()));
378 }
379
380 fn handle_submit_order_list(&self, command: SubmitOrderList) {
381 if self.config.bypass {
382 self.send_to_execution(TradingCommand::SubmitOrderList(command));
383 return;
384 }
385
386 let instrument_exists = {
387 let cache = self.cache.borrow();
388 cache.instrument(&command.instrument_id).cloned()
389 };
390
391 let instrument = if let Some(instrument) = instrument_exists {
392 instrument
393 } else {
394 self.deny_command(
395 TradingCommand::SubmitOrderList(command.clone()),
396 &format!("no instrument found for {}", command.instrument_id),
397 );
398 return; };
400
401 for order in command.order_list.orders.clone() {
405 if !self.check_order(instrument.clone(), order) {
406 return; }
408 }
409
410 if !self.check_orders_risk(instrument.clone(), command.order_list.clone().orders) {
411 self.deny_order_list(
412 command.order_list.clone(),
413 &format!("OrderList {} DENIED", command.order_list.id),
414 );
415 return; }
417
418 self.execution_gateway(instrument, TradingCommand::SubmitOrderList(command));
419 }
420
421 fn handle_modify_order(&self, command: ModifyOrder) {
422 let order_exists = {
426 let cache = self.cache.borrow();
427 cache.order(&command.client_order_id).cloned()
428 };
429
430 let order = if let Some(order) = order_exists {
431 order
432 } else {
433 log::error!(
434 "ModifyOrder DENIED: Order with command.client_order_id: {} not found",
435 command.client_order_id
436 );
437 return;
438 };
439
440 if order.is_closed() {
441 self.reject_modify_order(
442 order,
443 &format!(
444 "Order with command.client_order_id: {} already closed",
445 command.client_order_id
446 ),
447 );
448 return;
449 } else if order.status() == OrderStatus::PendingCancel {
450 self.reject_modify_order(
451 order,
452 &format!(
453 "Order with command.client_order_id: {} is already pending cancel",
454 command.client_order_id
455 ),
456 );
457 return;
458 }
459
460 let maybe_instrument = {
462 let cache = self.cache.borrow();
463 cache.instrument(&command.instrument_id).cloned()
464 };
465
466 let instrument = if let Some(instrument) = maybe_instrument {
467 instrument
468 } else {
469 self.reject_modify_order(
470 order,
471 &format!("no instrument found for {}", command.instrument_id),
472 );
473 return; };
475
476 let mut risk_msg = self.check_price(&instrument, command.price);
478 if let Some(risk_msg) = risk_msg {
479 self.reject_modify_order(order, &risk_msg);
480 return; }
482
483 risk_msg = self.check_price(&instrument, command.trigger_price);
485 if let Some(risk_msg) = risk_msg {
486 self.reject_modify_order(order, &risk_msg);
487 return; }
489
490 risk_msg = self.check_quantity(&instrument, command.quantity);
492 if let Some(risk_msg) = risk_msg {
493 self.reject_modify_order(order, &risk_msg);
494 return; }
496
497 match self.trading_state {
499 TradingState::Halted => {
500 self.reject_modify_order(order, "TradingState is HALTED: Cannot modify order");
501 }
502 TradingState::Reducing => {
503 if let Some(quantity) = command.quantity {
504 if quantity > order.quantity()
505 && ((order.is_buy() && self.portfolio.is_net_long(&instrument.id()))
506 || (order.is_sell() && self.portfolio.is_net_short(&instrument.id())))
507 {
508 self.reject_modify_order(
509 order,
510 &format!(
511 "TradingState is REDUCING and update will increase exposure {}",
512 instrument.id()
513 ),
514 );
515 }
516 }
517 }
518 _ => {}
519 }
520
521 }
524
525 fn check_order(&self, instrument: InstrumentAny, order: OrderAny) -> bool {
528 if order.time_in_force() == TimeInForce::Gtd {
532 let expire_time = order.expire_time().unwrap();
534 if expire_time <= self.clock.borrow().timestamp_ns() {
535 self.deny_order(
536 order,
537 &format!("GTD {} already past", expire_time.to_rfc3339()),
538 );
539 return false; }
541 }
542
543 if !self.check_order_price(instrument.clone(), order.clone())
544 || !self.check_order_quantity(instrument, order)
545 {
546 return false; }
548
549 true
550 }
551
552 fn check_order_price(&self, instrument: InstrumentAny, order: OrderAny) -> bool {
553 if order.price().is_some() {
557 let risk_msg = self.check_price(&instrument, order.price());
558 if let Some(risk_msg) = risk_msg {
559 self.deny_order(order, &risk_msg);
560 return false; }
562 }
563
564 if order.trigger_price().is_some() {
568 let risk_msg = self.check_price(&instrument, order.trigger_price());
569 if let Some(risk_msg) = risk_msg {
570 self.deny_order(order, &risk_msg);
571 return false; }
573 }
574
575 true
576 }
577
578 fn check_order_quantity(&self, instrument: InstrumentAny, order: OrderAny) -> bool {
579 let risk_msg = self.check_quantity(&instrument, Some(order.quantity()));
580 if let Some(risk_msg) = risk_msg {
581 self.deny_order(order, &risk_msg);
582 return false; }
584
585 true
586 }
587
588 fn check_orders_risk(&self, instrument: InstrumentAny, orders: Vec<OrderAny>) -> bool {
589 let mut last_px: Option<Price> = None;
593 let mut max_notional: Option<Money> = None;
594
595 let max_notional_setting = self.max_notional_per_order.get(&instrument.id());
597 if let Some(max_notional_setting_val) = max_notional_setting.copied() {
598 max_notional = Some(Money::new(
599 max_notional_setting_val
600 .to_f64()
601 .expect("Invalid decimal conversion"),
602 instrument.quote_currency(),
603 ));
604 }
605
606 let account_exists = {
608 let cache = self.cache.borrow();
609 cache.account_for_venue(&instrument.id().venue).cloned()
610 };
611
612 let account = if let Some(account) = account_exists {
613 account
614 } else {
615 log::debug!("Cannot find account for venue {}", instrument.id().venue);
616 return true; };
618 let cash_account = match account {
619 AccountAny::Cash(cash_account) => cash_account,
620 AccountAny::Margin(_) => return true, };
622 let free = cash_account.balance_free(Some(instrument.quote_currency()));
623 if self.config.debug {
624 log::debug!("Free cash: {free:?}");
625 }
626
627 let mut cum_notional_buy: Option<Money> = None;
628 let mut cum_notional_sell: Option<Money> = None;
629 let mut base_currency: Option<Currency> = None;
630 for order in &orders {
631 last_px = match order {
633 OrderAny::Market(_) | OrderAny::MarketToLimit(_) => {
634 if last_px.is_none() {
635 let cache = self.cache.borrow();
636 if let Some(last_quote) = cache.quote(&instrument.id()) {
637 match order.order_side() {
638 OrderSide::Buy => Some(last_quote.ask_price),
639 OrderSide::Sell => Some(last_quote.bid_price),
640 _ => panic!("Invalid order side"),
641 }
642 } else {
643 let cache = self.cache.borrow();
644 let last_trade = cache.trade(&instrument.id());
645
646 if let Some(last_trade) = last_trade {
647 Some(last_trade.price)
648 } else {
649 log::warn!(
650 "Cannot check MARKET order risk: no prices for {}",
651 instrument.id()
652 );
653 continue;
654 }
655 }
656 } else {
657 last_px
658 }
659 }
660 OrderAny::StopMarket(_) | OrderAny::MarketIfTouched(_) => order.trigger_price(),
661 OrderAny::TrailingStopMarket(_) | OrderAny::TrailingStopLimit(_) => {
662 if let Some(trigger_price) = order.trigger_price() {
663 Some(trigger_price)
664 } else {
665 log::warn!(
666 "Cannot check {} order risk: no trigger price was set", order.order_type()
668 );
669 continue;
670 }
671 }
672 _ => order.price(),
673 };
674
675 let last_px = if let Some(px) = last_px {
676 px
677 } else {
678 log::error!("Cannot check order risk: no price available");
679 continue;
680 };
681
682 let notional =
683 instrument.calculate_notional_value(order.quantity(), last_px, Some(true));
684
685 if self.config.debug {
686 log::debug!("Notional: {notional:?}");
687 }
688
689 if let Some(max_notional_value) = max_notional {
691 if notional > max_notional_value {
692 self.deny_order(
693 order.clone(),
694 &format!(
695 "NOTIONAL_EXCEEDS_MAX_PER_ORDER: max_notional={max_notional_value:?}, notional={notional:?}"
696 ),
697 );
698 return false; }
700 }
701
702 if let Some(min_notional) = instrument.min_notional() {
704 if notional.currency == min_notional.currency && notional < min_notional {
705 self.deny_order(
706 order.clone(),
707 &format!(
708 "NOTIONAL_LESS_THAN_MIN_FOR_INSTRUMENT: min_notional={min_notional:?}, notional={notional:?}"
709 ),
710 );
711 return false; }
713 }
714
715 if let Some(max_notional) = instrument.max_notional() {
717 if notional.currency == max_notional.currency && notional > max_notional {
718 self.deny_order(
719 order.clone(),
720 &format!(
721 "NOTIONAL_GREATER_THAN_MAX_FOR_INSTRUMENT: max_notional={max_notional:?}, notional={notional:?}"
722 ),
723 );
724 return false; }
726 }
727
728 let notional = instrument.calculate_notional_value(order.quantity(), last_px, None);
730 let order_balance_impact = match order.order_side() {
731 OrderSide::Buy => Money::from_raw(-notional.raw, notional.currency),
732 OrderSide::Sell => Money::from_raw(notional.raw, notional.currency),
733 OrderSide::NoOrderSide => {
734 panic!("invalid `OrderSide`, was {}", order.order_side());
735 }
736 };
737
738 if self.config.debug {
739 log::debug!("Balance impact: {order_balance_impact}");
740 }
741
742 if let Some(free_val) = free {
743 if (free_val.as_decimal() + order_balance_impact.as_decimal()) < Decimal::ZERO {
744 self.deny_order(
745 order.clone(),
746 &format!(
747 "NOTIONAL_EXCEEDS_FREE_BALANCE: free={free_val:?}, notional={notional:?}"
748 ),
749 );
750 return false;
751 }
752 }
753
754 if base_currency.is_none() {
755 base_currency = instrument.base_currency();
756 }
757 if order.is_buy() {
758 match cum_notional_buy.as_mut() {
759 Some(cum_notional_buy_val) => {
760 cum_notional_buy_val.raw += -order_balance_impact.raw;
761 }
762 None => {
763 cum_notional_buy = Some(Money::from_raw(
764 -order_balance_impact.raw,
765 order_balance_impact.currency,
766 ));
767 }
768 }
769
770 if self.config.debug {
771 log::debug!("Cumulative notional BUY: {cum_notional_buy:?}");
772 }
773
774 if let (Some(free), Some(cum_notional_buy)) = (free, cum_notional_buy) {
775 if cum_notional_buy > free {
776 self.deny_order(order.clone(), &format!("CUM_NOTIONAL_EXCEEDS_FREE_BALANCE: free={free}, cum_notional={cum_notional_buy}"));
777 return false; }
779 }
780 } else if order.is_sell() {
781 if cash_account.base_currency.is_some() {
782 match cum_notional_sell.as_mut() {
783 Some(cum_notional_buy_val) => {
784 cum_notional_buy_val.raw += order_balance_impact.raw;
785 }
786 None => {
787 cum_notional_sell = Some(Money::from_raw(
788 order_balance_impact.raw,
789 order_balance_impact.currency,
790 ));
791 }
792 }
793 if self.config.debug {
794 log::debug!("Cumulative notional SELL: {cum_notional_sell:?}");
795 }
796
797 if let (Some(free), Some(cum_notional_sell)) = (free, cum_notional_sell) {
798 if cum_notional_sell > free {
799 self.deny_order(order.clone(), &format!("CUM_NOTIONAL_EXCEEDS_FREE_BALANCE: free={free}, cum_notional={cum_notional_sell}"));
800 return false; }
802 }
803 }
804 else if let Some(base_currency) = base_currency {
806 let cash_value = Money::from_raw(
807 order
808 .quantity()
809 .raw
810 .try_into()
811 .map_err(|e| log::error!("Unable to convert Quantity to f64: {e}"))
812 .unwrap(),
813 base_currency,
814 );
815
816 if self.config.debug {
817 log::debug!("Cash value: {cash_value:?}");
818 log::debug!(
819 "Total: {:?}",
820 cash_account.balance_total(Some(base_currency))
821 );
822 log::debug!(
823 "Locked: {:?}",
824 cash_account.balance_locked(Some(base_currency))
825 );
826 log::debug!("Free: {:?}", cash_account.balance_free(Some(base_currency)));
827 }
828
829 match cum_notional_sell {
830 Some(mut cum_notional_sell) => {
831 cum_notional_sell.raw += cash_value.raw;
832 }
833 None => cum_notional_sell = Some(cash_value),
834 }
835
836 if self.config.debug {
837 log::debug!("Cumulative notional SELL: {cum_notional_sell:?}");
838 }
839 if let (Some(free), Some(cum_notional_sell)) = (free, cum_notional_sell) {
840 if cum_notional_sell.raw > free.raw {
841 self.deny_order(order.clone(), &format!("CUM_NOTIONAL_EXCEEDS_FREE_BALANCE: free={free}, cum_notional={cum_notional_sell}"));
842 return false; }
844 }
845 }
846 }
847 }
848
849 true }
852
853 fn check_price(&self, instrument: &InstrumentAny, price: Option<Price>) -> Option<String> {
854 let price_val = price?;
855
856 if price_val.precision > instrument.price_precision() {
857 return Some(format!(
858 "price {} invalid (precision {} > {})",
859 price_val,
860 price_val.precision,
861 instrument.price_precision()
862 ));
863 }
864
865 if instrument.instrument_class() != InstrumentClass::Option && price_val.raw <= 0 {
866 return Some(format!("price {price_val} invalid (<= 0)"));
867 }
868
869 None
870 }
871
872 fn check_quantity(
873 &self,
874 instrument: &InstrumentAny,
875 quantity: Option<Quantity>,
876 ) -> Option<String> {
877 let quantity_val = quantity?;
878
879 if quantity_val.precision > instrument.size_precision() {
881 return Some(format!(
882 "quantity {} invalid (precision {} > {})",
883 quantity_val,
884 quantity_val.precision,
885 instrument.size_precision()
886 ));
887 }
888
889 if let Some(max_quantity) = instrument.max_quantity() {
891 if quantity_val > max_quantity {
892 return Some(format!(
893 "quantity {quantity_val} invalid (> maximum trade size of {max_quantity})"
894 ));
895 }
896 }
897
898 if let Some(min_quantity) = instrument.min_quantity() {
900 if quantity_val < min_quantity {
901 return Some(format!(
902 "quantity {quantity_val} invalid (< minimum trade size of {min_quantity})"
903 ));
904 }
905 }
906
907 None
908 }
909
910 fn deny_command(&self, command: TradingCommand, reason: &str) {
913 match command {
914 TradingCommand::SubmitOrder(submit_order) => {
915 self.deny_order(submit_order.order, reason);
916 }
917 TradingCommand::SubmitOrderList(submit_order_list) => {
918 self.deny_order_list(submit_order_list.order_list, reason);
919 }
920 _ => {
921 panic!("Cannot deny command {command}");
922 }
923 }
924 }
925
926 fn deny_order(&self, order: OrderAny, reason: &str) {
927 log::warn!(
928 "SubmitOrder for {} DENIED: {}",
929 order.client_order_id(),
930 reason
931 );
932
933 if order.status() != OrderStatus::Initialized {
934 return;
935 }
936
937 let mut cache = self.cache.borrow_mut();
938 if !cache.order_exists(&order.client_order_id()) {
939 cache
940 .add_order(order.clone(), None, None, false)
941 .map_err(|e| {
942 log::error!("Cannot add order to cache: {e}");
943 })
944 .unwrap();
945 }
946
947 let denied = OrderEventAny::Denied(OrderDenied::new(
948 order.trader_id(),
949 order.strategy_id(),
950 order.instrument_id(),
951 order.client_order_id(),
952 reason.into(),
953 UUID4::new(),
954 self.clock.borrow().timestamp_ns(),
955 self.clock.borrow().timestamp_ns(),
956 ));
957
958 msgbus::send_any("ExecEngine.process".into(), &denied);
959 }
960
961 fn deny_order_list(&self, order_list: OrderList, reason: &str) {
962 for order in order_list.orders {
963 if !order.is_closed() {
964 self.deny_order(order, reason);
965 }
966 }
967 }
968
969 fn reject_modify_order(&self, order: OrderAny, reason: &str) {
970 let ts_event = self.clock.borrow().timestamp_ns();
971 let denied = OrderEventAny::ModifyRejected(OrderModifyRejected::new(
972 order.trader_id(),
973 order.strategy_id(),
974 order.instrument_id(),
975 order.client_order_id(),
976 reason.into(),
977 UUID4::new(),
978 ts_event,
979 ts_event,
980 false,
981 order.venue_order_id(),
982 order.account_id(),
983 ));
984
985 msgbus::send_any("ExecEngine.process".into(), &denied);
986 }
987
988 fn execution_gateway(&self, instrument: InstrumentAny, command: TradingCommand) {
991 match self.trading_state {
992 TradingState::Halted => match command {
993 TradingCommand::SubmitOrder(submit_order) => {
994 self.deny_order(submit_order.order, "TradingState::HALTED");
995 }
996 TradingCommand::SubmitOrderList(submit_order_list) => {
997 self.deny_order_list(submit_order_list.order_list, "TradingState::HALTED");
998 }
999 _ => {}
1000 },
1001 TradingState::Reducing => match command {
1002 TradingCommand::SubmitOrder(submit_order) => {
1003 let order = submit_order.order;
1004 if order.is_buy() && self.portfolio.is_net_long(&instrument.id()) {
1005 self.deny_order(
1006 order,
1007 &format!(
1008 "BUY when TradingState::REDUCING and LONG {}",
1009 instrument.id()
1010 ),
1011 );
1012 } else if order.is_sell() && self.portfolio.is_net_short(&instrument.id()) {
1013 self.deny_order(
1014 order,
1015 &format!(
1016 "SELL when TradingState::REDUCING and SHORT {}",
1017 instrument.id()
1018 ),
1019 );
1020 }
1021 }
1022 TradingCommand::SubmitOrderList(submit_order_list) => {
1023 let order_list = submit_order_list.order_list;
1024 for order in &order_list.orders {
1025 if order.is_buy() && self.portfolio.is_net_long(&instrument.id()) {
1026 self.deny_order_list(
1027 order_list,
1028 &format!(
1029 "BUY when TradingState::REDUCING and LONG {}",
1030 instrument.id()
1031 ),
1032 );
1033 return;
1034 } else if order.is_sell() && self.portfolio.is_net_short(&instrument.id()) {
1035 self.deny_order_list(
1036 order_list,
1037 &format!(
1038 "SELL when TradingState::REDUCING and SHORT {}",
1039 instrument.id()
1040 ),
1041 );
1042 return;
1043 }
1044 }
1045 }
1046 _ => {}
1047 },
1048 TradingState::Active => match command {
1049 TradingCommand::SubmitOrder(_submit_order) => {
1050 }
1053 TradingCommand::SubmitOrderList(_submit_order_list) => {
1054 todo!("NOT IMPLEMENTED");
1055 }
1056 _ => {}
1057 },
1058 }
1059 }
1060
/// Sends the command to the `ExecEngine.execute` message bus endpoint.
fn send_to_execution(&self, command: TradingCommand) {
    msgbus::send_any("ExecEngine.execute".into(), &command);
}
1064
/// Handles an order event; the event is only debug-logged, and only when
/// `config.debug` is set.
fn handle_event(&mut self, event: OrderEventAny) {
    if self.config.debug {
        log::debug!("{RECV}{EVT} {event:?}");
    }
}
1072}