nautilus_live/
runner.rs

1// -------------------------------------------------------------------------------------------------
2//  Copyright (C) 2015-2025 Posei Systems Pty Ltd. All rights reserved.
3//  https://poseitrader.io
4//
5//  Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
6//  You may not use this file except in compliance with the License.
7//  You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
8//
9//  Unless required by applicable law or agreed to in writing, software
10//  distributed under the License is distributed on an "AS IS" BASIS,
11//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12//  See the License for the specific language governing permissions and
13//  limitations under the License.
14// -------------------------------------------------------------------------------------------------
15
16use std::{fmt::Debug, sync::Arc};
17
18use nautilus_common::{
19    messages::{DataEvent, data::DataCommand},
20    msgbus::{self, switchboard::MessagingSwitchboard},
21    runner::{
22        DataCommandSender, RunnerEvent, TimeEventSender, set_data_cmd_sender,
23        set_data_event_sender, set_time_event_sender,
24    },
25    timer::TimeEventHandlerV2,
26};
27use tokio::sync::mpsc::{UnboundedReceiver, UnboundedSender};
28
29/// Asynchronous implementation of `DataCommandSender` for live environments.
30#[derive(Debug)]
31pub struct AsyncDataCommandSender {
32    cmd_tx: UnboundedSender<DataCommand>,
33}
34
35impl AsyncDataCommandSender {
36    #[must_use]
37    pub const fn new(cmd_tx: UnboundedSender<DataCommand>) -> Self {
38        Self { cmd_tx }
39    }
40}
41
42impl DataCommandSender for AsyncDataCommandSender {
43    fn execute(&self, command: DataCommand) {
44        if let Err(e) = self.cmd_tx.send(command) {
45            log::error!("Failed to send data command: {e}");
46        }
47    }
48}
49
50/// Asynchronous implementation of `TimeEventSender` for live environments.
51#[derive(Debug, Clone)]
52pub struct AsyncTimeEventSender {
53    time_tx: UnboundedSender<TimeEventHandlerV2>,
54}
55
56impl AsyncTimeEventSender {
57    #[must_use]
58    pub const fn new(time_tx: UnboundedSender<TimeEventHandlerV2>) -> Self {
59        Self { time_tx }
60    }
61
62    /// Gets a clone of the underlying channel sender for async use.
63    ///
64    /// This allows async contexts to get a direct channel sender that
65    /// can be moved into async tasks without `RefCell` borrowing issues.
66    #[must_use]
67    pub fn get_channel_sender(&self) -> UnboundedSender<TimeEventHandlerV2> {
68        self.time_tx.clone()
69    }
70}
71
72impl TimeEventSender for AsyncTimeEventSender {
73    fn send(&self, handler: TimeEventHandlerV2) {
74        if let Err(e) = self.time_tx.send(handler) {
75            log::error!("Failed to send time event handler: {e}");
76        }
77    }
78}
79
80pub trait Runner {
81    fn run(&mut self);
82}
83
84pub struct AsyncRunner {
85    data_rx: UnboundedReceiver<DataEvent>,
86    cmd_rx: UnboundedReceiver<DataCommand>,
87    time_rx: UnboundedReceiver<TimeEventHandlerV2>,
88    signal_rx: UnboundedReceiver<()>,
89    signal_tx: UnboundedSender<()>,
90}
91
92impl Default for AsyncRunner {
93    fn default() -> Self {
94        Self::new()
95    }
96}
97
98impl Debug for AsyncRunner {
99    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
100        f.debug_struct(stringify!(AsyncRunner)).finish()
101    }
102}
103
104impl AsyncRunner {
105    #[must_use]
106    pub fn new() -> Self {
107        let (data_tx, data_rx) = tokio::sync::mpsc::unbounded_channel::<DataEvent>();
108        let (cmd_tx, cmd_rx) = tokio::sync::mpsc::unbounded_channel::<DataCommand>();
109        let (time_tx, time_rx) = tokio::sync::mpsc::unbounded_channel::<TimeEventHandlerV2>();
110        let (signal_tx, signal_rx) = tokio::sync::mpsc::unbounded_channel::<()>();
111
112        set_time_event_sender(Arc::new(AsyncTimeEventSender::new(time_tx)));
113        set_data_event_sender(data_tx);
114        set_data_cmd_sender(Arc::new(AsyncDataCommandSender::new(cmd_tx)));
115
116        Self {
117            data_rx,
118            cmd_rx,
119            time_rx,
120            signal_rx,
121            signal_tx,
122        }
123    }
124
125    /// Stops the runner with an internal shutdown signal.
126    pub fn stop(&self) {
127        if let Err(e) = self.signal_tx.send(()) {
128            log::error!("Failed to send shutdown signal: {e}");
129        }
130    }
131}
132
133impl AsyncRunner {
134    /// Runs the async runner event loop.
135    ///
136    /// This method processes data events, time events, and signal events in an async loop.
137    /// It will run until a signal is received or the event streams are closed.
138    pub async fn run(&mut self) {
139        log::info!("Starting AsyncRunner");
140
141        let data_engine_process = MessagingSwitchboard::data_engine_process();
142        let data_engine_response = MessagingSwitchboard::data_engine_response();
143        let data_engine_execute = MessagingSwitchboard::data_engine_execute();
144
145        loop {
146            // Collect the next message to process, including signal events
147            let next_msg = tokio::select! {
148                Some(resp) = self.data_rx.recv() => Some(RunnerEvent::Data(resp)),
149                Some(handler) = self.time_rx.recv() => Some(RunnerEvent::Time(handler)),
150                Some(cmd) = self.cmd_rx.recv() => {
151                    msgbus::send_any(data_engine_execute, &cmd);
152                    None // TODO: Refactor this
153                },
154                Some(()) = self.signal_rx.recv() => {
155                    tracing::info!("AsyncRunner received signal, shutting down");
156                    return; // Signal to stop
157                },
158                else => return, // Sentinel event ends run
159            };
160
161            tracing::trace!("Received {next_msg:?}");
162
163            if let Some(msg) = next_msg {
164                match msg {
165                    RunnerEvent::Time(handler) => handler.run(),
166                    RunnerEvent::Data(event) => {
167                        #[cfg(feature = "defi")]
168                        match event {
169                            DataEvent::Data(data) => msgbus::send_any(data_engine_process, &data),
170                            DataEvent::Response(resp) => {
171                                msgbus::send_any(data_engine_response, &resp);
172                            }
173                            DataEvent::DeFi(data) => {
174                                msgbus::send_any(data_engine_process, &data);
175                            }
176                        }
177                        #[cfg(not(feature = "defi"))]
178                        match event {
179                            DataEvent::Data(data) => msgbus::send_any(data_engine_process, &data),
180                            DataEvent::Response(resp) => {
181                                msgbus::send_any(data_engine_response, &resp)
182                            }
183                        }
184                    }
185                }
186            }
187        }
188    }
189}