1use std::{fmt::Debug, sync::Arc};
17
18use nautilus_common::{
19 messages::{DataEvent, data::DataCommand},
20 msgbus::{self, switchboard::MessagingSwitchboard},
21 runner::{
22 DataCommandSender, RunnerEvent, TimeEventSender, set_data_cmd_sender,
23 set_data_event_sender, set_time_event_sender,
24 },
25 timer::TimeEventHandlerV2,
26};
27use tokio::sync::mpsc::{UnboundedReceiver, UnboundedSender};
28
29#[derive(Debug)]
31pub struct AsyncDataCommandSender {
32 cmd_tx: UnboundedSender<DataCommand>,
33}
34
35impl AsyncDataCommandSender {
36 #[must_use]
37 pub const fn new(cmd_tx: UnboundedSender<DataCommand>) -> Self {
38 Self { cmd_tx }
39 }
40}
41
42impl DataCommandSender for AsyncDataCommandSender {
43 fn execute(&self, command: DataCommand) {
44 if let Err(e) = self.cmd_tx.send(command) {
45 log::error!("Failed to send data command: {e}");
46 }
47 }
48}
49
50#[derive(Debug, Clone)]
52pub struct AsyncTimeEventSender {
53 time_tx: UnboundedSender<TimeEventHandlerV2>,
54}
55
56impl AsyncTimeEventSender {
57 #[must_use]
58 pub const fn new(time_tx: UnboundedSender<TimeEventHandlerV2>) -> Self {
59 Self { time_tx }
60 }
61
62 #[must_use]
67 pub fn get_channel_sender(&self) -> UnboundedSender<TimeEventHandlerV2> {
68 self.time_tx.clone()
69 }
70}
71
72impl TimeEventSender for AsyncTimeEventSender {
73 fn send(&self, handler: TimeEventHandlerV2) {
74 if let Err(e) = self.time_tx.send(handler) {
75 log::error!("Failed to send time event handler: {e}");
76 }
77 }
78}
79
80pub trait Runner {
81 fn run(&mut self);
82}
83
84pub struct AsyncRunner {
85 data_rx: UnboundedReceiver<DataEvent>,
86 cmd_rx: UnboundedReceiver<DataCommand>,
87 time_rx: UnboundedReceiver<TimeEventHandlerV2>,
88 signal_rx: UnboundedReceiver<()>,
89 signal_tx: UnboundedSender<()>,
90}
91
92impl Default for AsyncRunner {
93 fn default() -> Self {
94 Self::new()
95 }
96}
97
98impl Debug for AsyncRunner {
99 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
100 f.debug_struct(stringify!(AsyncRunner)).finish()
101 }
102}
103
104impl AsyncRunner {
105 #[must_use]
106 pub fn new() -> Self {
107 let (data_tx, data_rx) = tokio::sync::mpsc::unbounded_channel::<DataEvent>();
108 let (cmd_tx, cmd_rx) = tokio::sync::mpsc::unbounded_channel::<DataCommand>();
109 let (time_tx, time_rx) = tokio::sync::mpsc::unbounded_channel::<TimeEventHandlerV2>();
110 let (signal_tx, signal_rx) = tokio::sync::mpsc::unbounded_channel::<()>();
111
112 set_time_event_sender(Arc::new(AsyncTimeEventSender::new(time_tx)));
113 set_data_event_sender(data_tx);
114 set_data_cmd_sender(Arc::new(AsyncDataCommandSender::new(cmd_tx)));
115
116 Self {
117 data_rx,
118 cmd_rx,
119 time_rx,
120 signal_rx,
121 signal_tx,
122 }
123 }
124
125 pub fn stop(&self) {
127 if let Err(e) = self.signal_tx.send(()) {
128 log::error!("Failed to send shutdown signal: {e}");
129 }
130 }
131}
132
133impl AsyncRunner {
134 pub async fn run(&mut self) {
139 log::info!("Starting AsyncRunner");
140
141 let data_engine_process = MessagingSwitchboard::data_engine_process();
142 let data_engine_response = MessagingSwitchboard::data_engine_response();
143 let data_engine_execute = MessagingSwitchboard::data_engine_execute();
144
145 loop {
146 let next_msg = tokio::select! {
148 Some(resp) = self.data_rx.recv() => Some(RunnerEvent::Data(resp)),
149 Some(handler) = self.time_rx.recv() => Some(RunnerEvent::Time(handler)),
150 Some(cmd) = self.cmd_rx.recv() => {
151 msgbus::send_any(data_engine_execute, &cmd);
152 None },
154 Some(()) = self.signal_rx.recv() => {
155 tracing::info!("AsyncRunner received signal, shutting down");
156 return; },
158 else => return, };
160
161 tracing::trace!("Received {next_msg:?}");
162
163 if let Some(msg) = next_msg {
164 match msg {
165 RunnerEvent::Time(handler) => handler.run(),
166 RunnerEvent::Data(event) => {
167 #[cfg(feature = "defi")]
168 match event {
169 DataEvent::Data(data) => msgbus::send_any(data_engine_process, &data),
170 DataEvent::Response(resp) => {
171 msgbus::send_any(data_engine_response, &resp);
172 }
173 DataEvent::DeFi(data) => {
174 msgbus::send_any(data_engine_process, &data);
175 }
176 }
177 #[cfg(not(feature = "defi"))]
178 match event {
179 DataEvent::Data(data) => msgbus::send_any(data_engine_process, &data),
180 DataEvent::Response(resp) => {
181 msgbus::send_any(data_engine_response, &resp)
182 }
183 }
184 }
185 }
186 }
187 }
188 }
189}