nautilus_execution/order_manager/
manager.rs1#![allow(dead_code)]
18#![allow(unused_variables)]
19
20use std::{cell::RefCell, collections::HashMap, fmt::Debug, rc::Rc};
21
22use nautilus_common::{
23 cache::Cache,
24 clock::Clock,
25 logging::{CMD, EVT, SEND},
26 messages::execution::{SubmitOrder, TradingCommand},
27 msgbus,
28};
29use nautilus_core::UUID4;
30use nautilus_model::{
31 enums::{ContingencyType, TriggerType},
32 events::{
33 OrderCanceled, OrderEventAny, OrderExpired, OrderFilled, OrderRejected, OrderUpdated,
34 },
35 identifiers::{ClientId, ClientOrderId, ExecAlgorithmId, PositionId},
36 orders::{Order, OrderAny},
37 types::Quantity,
38};
39
40pub struct OrderManager {
41 clock: Rc<RefCell<dyn Clock>>,
42 cache: Rc<RefCell<Cache>>,
43 active_local: bool,
44 submit_order_commands: HashMap<ClientOrderId, SubmitOrder>,
48}
49
50impl Debug for OrderManager {
51 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
52 f.debug_struct(stringify!(OrderManager))
53 .field("pending_commands", &self.submit_order_commands.len())
54 .finish()
55 }
56}
57
58impl OrderManager {
59 pub fn new(
60 clock: Rc<RefCell<dyn Clock>>,
61 cache: Rc<RefCell<Cache>>,
62 active_local: bool,
63 ) -> Self {
67 Self {
68 clock,
69 cache,
70 active_local,
71 submit_order_commands: HashMap::new(),
75 }
76 }
77
78 #[must_use]
91 pub fn get_submit_order_commands(&self) -> HashMap<ClientOrderId, SubmitOrder> {
92 self.submit_order_commands.clone()
93 }
94
95 pub fn cache_submit_order_command(&mut self, command: SubmitOrder) {
96 self.submit_order_commands
97 .insert(command.order.client_order_id(), command);
98 }
99
100 pub fn pop_submit_order_command(
101 &mut self,
102 client_order_id: ClientOrderId,
103 ) -> Option<SubmitOrder> {
104 self.submit_order_commands.remove(&client_order_id)
105 }
106
107 pub fn reset(&mut self) {
108 self.submit_order_commands.clear();
109 }
110
111 pub fn cancel_order(&mut self, order: &OrderAny) {
112 if self
113 .cache
114 .borrow()
115 .is_order_pending_cancel_local(&order.client_order_id())
116 {
117 return;
118 }
119
120 if order.is_closed() {
121 log::warn!("Cannot cancel order: already closed");
122 return;
123 }
124
125 self.submit_order_commands.remove(&order.client_order_id());
126
127 }
131
132 pub const fn modify_order_quantity(&mut self, order: &mut OrderAny, new_quantity: Quantity) {
133 }
137
138 pub fn create_new_submit_order(
142 &mut self,
143 order: &OrderAny,
144 position_id: Option<PositionId>,
145 client_id: Option<ClientId>,
146 ) -> anyhow::Result<()> {
147 let client_id = client_id.ok_or_else(|| anyhow::anyhow!("Client ID is required"))?;
148 let venue_order_id = order
149 .venue_order_id()
150 .ok_or_else(|| anyhow::anyhow!("Venue order ID is required"))?;
151
152 let submit = SubmitOrder::new(
153 order.trader_id(),
154 client_id,
155 order.strategy_id(),
156 order.instrument_id(),
157 order.client_order_id(),
158 venue_order_id,
159 order.clone(),
160 order.exec_algorithm_id(),
161 position_id,
162 UUID4::new(),
163 self.clock.borrow().timestamp_ns(),
164 )?;
165
166 if order.emulation_trigger() == Some(TriggerType::NoTrigger) {
167 self.cache_submit_order_command(submit.clone());
168
169 match order.exec_algorithm_id() {
170 Some(exec_algorithm_id) => {
171 self.send_algo_command(submit, exec_algorithm_id);
172 }
173 None => self.send_risk_command(TradingCommand::SubmitOrder(submit)),
174 }
175 } Ok(())
180 }
181
182 #[must_use]
183 pub fn should_manage_order(&self, order: &OrderAny) -> bool {
184 self.active_local && order.is_active_local()
185 }
186
187 pub fn handle_event(&mut self, event: OrderEventAny) {
189 match event {
190 OrderEventAny::Rejected(event) => self.handle_order_rejected(event),
191 OrderEventAny::Canceled(event) => self.handle_order_canceled(event),
192 OrderEventAny::Expired(event) => self.handle_order_expired(event),
193 OrderEventAny::Updated(event) => self.handle_order_updated(event),
194 OrderEventAny::Filled(event) => self.handle_order_filled(event),
195 _ => self.handle_position_event(event),
196 }
197 }
198
199 pub fn handle_order_rejected(&mut self, rejected: OrderRejected) {
200 let cloned_order = self
201 .cache
202 .borrow()
203 .order(&rejected.client_order_id)
204 .cloned();
205 if let Some(order) = cloned_order {
206 if order.contingency_type() != Some(ContingencyType::NoContingency) {
207 self.handle_contingencies(order);
208 }
209 } else {
210 log::error!(
211 "Cannot handle `OrderRejected`: order for client_order_id: {} not found, {}",
212 rejected.client_order_id,
213 rejected
214 );
215 }
216 }
217
218 pub fn handle_order_canceled(&mut self, canceled: OrderCanceled) {
219 let cloned_order = self
220 .cache
221 .borrow()
222 .order(&canceled.client_order_id)
223 .cloned();
224 if let Some(order) = cloned_order {
225 if order.contingency_type() != Some(ContingencyType::NoContingency) {
226 self.handle_contingencies(order);
227 }
228 } else {
229 log::error!(
230 "Cannot handle `OrderCanceled`: order for client_order_id: {} not found, {}",
231 canceled.client_order_id,
232 canceled
233 );
234 }
235 }
236
237 pub fn handle_order_expired(&mut self, expired: OrderExpired) {
238 let cloned_order = self.cache.borrow().order(&expired.client_order_id).cloned();
239 if let Some(order) = cloned_order {
240 if order.contingency_type() != Some(ContingencyType::NoContingency) {
241 self.handle_contingencies(order);
242 }
243 } else {
244 log::error!(
245 "Cannot handle `OrderExpired`: order for client_order_id: {} not found, {}",
246 expired.client_order_id,
247 expired
248 );
249 }
250 }
251
252 pub fn handle_order_updated(&mut self, updated: OrderUpdated) {
253 let cloned_order = self.cache.borrow().order(&updated.client_order_id).cloned();
254 if let Some(order) = cloned_order {
255 if order.contingency_type() != Some(ContingencyType::NoContingency) {
256 self.handle_contingencies_update(order);
257 }
258 } else {
259 log::error!(
260 "Cannot handle `OrderUpdated`: order for client_order_id: {} not found, {}",
261 updated.client_order_id,
262 updated
263 );
264 }
265 }
266
267 pub fn handle_order_filled(&mut self, filled: OrderFilled) {
271 let order = if let Some(order) = self.cache.borrow().order(&filled.client_order_id).cloned()
272 {
273 order
274 } else {
275 log::error!(
276 "Cannot handle `OrderFilled`: order for client_order_id: {} not found, {}",
277 filled.client_order_id,
278 filled
279 );
280 return;
281 };
282
283 match order.contingency_type() {
284 Some(ContingencyType::Oto) => {
285 let position_id = self
286 .cache
287 .borrow()
288 .position_id(&order.client_order_id())
289 .copied();
290 let client_id = self
291 .cache
292 .borrow()
293 .client_id(&order.client_order_id())
294 .copied();
295
296 let parent_filled_qty = match order.exec_spawn_id() {
297 Some(spawn_id) => {
298 if let Some(qty) = self
299 .cache
300 .borrow()
301 .exec_spawn_total_filled_qty(&spawn_id, true)
302 {
303 qty
304 } else {
305 log::error!("Failed to get spawn filled quantity for {spawn_id}");
306 return;
307 }
308 }
309 None => order.filled_qty(),
310 };
311
312 let linked_orders = if let Some(orders) = order.linked_order_ids() {
313 orders
314 } else {
315 log::error!("No linked orders found for OTO order");
316 return;
317 };
318
319 for client_order_id in linked_orders {
320 let mut child_order =
321 if let Some(order) = self.cache.borrow().order(client_order_id).cloned() {
322 order
323 } else {
324 panic!(
325 "Cannot find OTO child order for client_order_id: {client_order_id}"
326 );
327 };
328
329 if !self.should_manage_order(&child_order) {
330 continue;
331 }
332
333 if child_order.position_id().is_none() {
334 child_order.set_position_id(position_id);
335 }
336
337 if parent_filled_qty != child_order.leaves_qty() {
338 self.modify_order_quantity(&mut child_order, parent_filled_qty);
339 }
340
341 if !self
346 .submit_order_commands
347 .contains_key(&child_order.client_order_id())
348 {
349 if let Err(e) =
350 self.create_new_submit_order(&child_order, position_id, client_id)
351 {
352 log::error!("Failed to create new submit order: {e}");
353 }
354 }
355 }
356 }
357 Some(ContingencyType::Oco) => {
358 let linked_orders = if let Some(orders) = order.linked_order_ids() {
359 orders
360 } else {
361 log::error!("No linked orders found for OCO order");
362 return;
363 };
364
365 for client_order_id in linked_orders {
366 let contingent_order = match self.cache.borrow().order(client_order_id).cloned()
367 {
368 Some(contingent_order) => contingent_order,
369 None => {
370 panic!(
371 "Cannot find OCO contingent order for client_order_id: {client_order_id}"
372 );
373 }
374 };
375
376 if !self.should_manage_order(&contingent_order) || contingent_order.is_closed()
378 {
379 continue;
380 }
381 if contingent_order.client_order_id() != order.client_order_id() {
382 self.cancel_order(&contingent_order);
383 }
384 }
385 }
386 Some(ContingencyType::Ouo) => self.handle_contingencies(order),
387 _ => {}
388 }
389 }
390
391 pub fn handle_contingencies(&mut self, order: OrderAny) {
395 let (filled_qty, leaves_qty, is_spawn_active) =
396 if let Some(exec_spawn_id) = order.exec_spawn_id() {
397 if let (Some(filled), Some(leaves)) = (
398 self.cache
399 .borrow()
400 .exec_spawn_total_filled_qty(&exec_spawn_id, true),
401 self.cache
402 .borrow()
403 .exec_spawn_total_leaves_qty(&exec_spawn_id, true),
404 ) {
405 (filled, leaves, leaves.raw > 0)
406 } else {
407 log::error!("Failed to get spawn quantities for {exec_spawn_id}");
408 return;
409 }
410 } else {
411 (order.filled_qty(), order.leaves_qty(), false)
412 };
413
414 let linked_orders = if let Some(orders) = order.linked_order_ids() {
415 orders
416 } else {
417 log::error!("No linked orders found");
418 return;
419 };
420
421 for client_order_id in linked_orders {
422 let mut contingent_order =
423 if let Some(order) = self.cache.borrow().order(client_order_id).cloned() {
424 order
425 } else {
426 panic!("Cannot find contingent order for client_order_id: {client_order_id}");
427 };
428
429 if !self.should_manage_order(&contingent_order)
430 || client_order_id == &order.client_order_id()
431 {
432 continue;
433 }
434
435 if contingent_order.is_closed() {
436 self.submit_order_commands.remove(&order.client_order_id());
437 continue;
438 }
439
440 match order.contingency_type() {
441 Some(ContingencyType::Oto) => {
442 if order.is_closed()
443 && filled_qty.raw == 0
444 && (order.exec_spawn_id().is_none() || !is_spawn_active)
445 {
446 self.cancel_order(&contingent_order);
447 } else if filled_qty.raw > 0 && filled_qty != contingent_order.quantity() {
448 self.modify_order_quantity(&mut contingent_order, filled_qty);
449 }
450 }
451 Some(ContingencyType::Oco) => {
452 if order.is_closed() && (order.exec_spawn_id().is_none() || !is_spawn_active) {
453 self.cancel_order(&contingent_order);
454 }
455 }
456 Some(ContingencyType::Ouo) => {
457 if (leaves_qty.raw == 0 && order.exec_spawn_id().is_some())
458 || (order.is_closed()
459 && (order.exec_spawn_id().is_none() || !is_spawn_active))
460 {
461 self.cancel_order(&contingent_order);
462 } else if leaves_qty != contingent_order.leaves_qty() {
463 self.modify_order_quantity(&mut contingent_order, leaves_qty);
464 }
465 }
466 _ => {}
467 }
468 }
469 }
470
471 pub fn handle_contingencies_update(&mut self, order: OrderAny) {
475 let quantity = match order.exec_spawn_id() {
476 Some(exec_spawn_id) => {
477 if let Some(qty) = self
478 .cache
479 .borrow()
480 .exec_spawn_total_quantity(&exec_spawn_id, true)
481 {
482 qty
483 } else {
484 log::error!("Failed to get spawn total quantity for {exec_spawn_id}");
485 return;
486 }
487 }
488 None => order.quantity(),
489 };
490
491 if quantity.raw == 0 {
492 return;
493 }
494
495 let linked_orders = if let Some(orders) = order.linked_order_ids() {
496 orders
497 } else {
498 log::error!("No linked orders found for contingent order");
499 return;
500 };
501
502 for client_order_id in linked_orders {
503 let mut contingent_order = match self.cache.borrow().order(client_order_id).cloned() {
504 Some(contingent_order) => contingent_order,
505 None => panic!(
506 "Cannot find OCO contingent order for client_order_id: {client_order_id}"
507 ),
508 };
509
510 if !self.should_manage_order(&contingent_order)
511 || client_order_id == &order.client_order_id()
512 || contingent_order.is_closed()
513 {
514 continue;
515 }
516
517 if let Some(contingency_type) = order.contingency_type() {
518 if matches!(
519 contingency_type,
520 ContingencyType::Oto | ContingencyType::Oco
521 ) && quantity != contingent_order.quantity()
522 {
523 self.modify_order_quantity(&mut contingent_order, quantity);
524 }
525 }
526 }
527 }
528
529 pub fn handle_position_event(&mut self, _event: OrderEventAny) {
530 todo!()
531 }
532
533 pub fn send_emulator_command(&self, command: TradingCommand) {
535 log::info!("{CMD}{SEND} {command}");
536
537 msgbus::send("OrderEmulator.execute".into(), &command);
538 }
539
540 pub fn send_algo_command(&self, command: SubmitOrder, exec_algorithm_id: ExecAlgorithmId) {
541 log::info!("{CMD}{SEND} {command}");
542
543 let endpoint = format!("{exec_algorithm_id}.execute");
544 msgbus::send(endpoint.into(), &TradingCommand::SubmitOrder(command));
545 }
546
547 pub fn send_risk_command(&self, command: TradingCommand) {
548 log::info!("{CMD}{SEND} {command}");
549 msgbus::send("RiskEngine.execute".into(), &command);
550 }
551
552 pub fn send_exec_command(&self, command: TradingCommand) {
553 log::info!("{CMD}{SEND} {command}");
554 msgbus::send("ExecEngine.execute".into(), &command);
555 }
556
557 pub fn send_risk_event(&self, event: OrderEventAny) {
558 log::info!("{EVT}{SEND} {event}");
559 msgbus::send("RiskEngine.process".into(), &event);
560 }
561
562 pub fn send_exec_event(&self, event: OrderEventAny) {
563 log::info!("{EVT}{SEND} {event}");
564 msgbus::send("ExecEngine.process".into(), &event);
565 }
566}