nautilus_common/runner.rs
1// -------------------------------------------------------------------------------------------------
2// Copyright (C) 2015-2025 Posei Systems Pty Ltd. All rights reserved.
3// https://poseitrader.io
4//
5// Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
6// You may not use this file except in compliance with the License.
7// You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14// -------------------------------------------------------------------------------------------------
15
//! Global runtime machinery and thread-local storage.
//!
//! This module provides global access to shared runtime resources including clocks,
//! message queues, and time event channels. It manages thread-local storage for
//! system-wide components: each thread holds its own write-once slot per sender, so a
//! sender must be set on a given thread before it can be retrieved on that thread.
21
22use std::{cell::OnceCell, fmt::Debug, sync::Arc};
23
24use crate::{
25 messages::{DataEvent, data::DataCommand},
26 msgbus::{self, switchboard::MessagingSwitchboard},
27 timer::TimeEventHandlerV2,
28};
29
/// Represents the different event types handled by the runner.
// The `allow` silences clippy's `large_enum_variant` lint for this enum.
// NOTE(review): presumably the payloads are kept inline on purpose to avoid boxing — confirm.
#[allow(clippy::large_enum_variant)]
#[derive(Debug)]
pub enum RunnerEvent {
    /// A time event, carried as a `TimeEventHandlerV2`.
    Time(TimeEventHandlerV2),
    /// A data event, carried as a `DataEvent`.
    Data(DataEvent),
}
37
/// Trait for data command sending that can be implemented for both sync and async runners.
///
/// Note that, unlike `TimeEventSender` in this module, this trait declares no
/// `Debug`, `Send` or `Sync` supertraits.
pub trait DataCommandSender {
    /// Executes a data command.
    ///
    /// - **Sync runners** send the command to a queue for synchronous execution.
    /// - **Async runners** send the command to a channel for asynchronous execution.
    ///
    /// The `command` is taken by value, so the caller gives up ownership of it.
    fn execute(&self, command: DataCommand);
}
46
/// Synchronous implementation of DataCommandSender for backtest environments.
///
/// This is a stateless unit struct; all behavior lives in its `DataCommandSender` impl.
#[derive(Debug)]
pub struct SyncDataCommandSender;
50
51impl DataCommandSender for SyncDataCommandSender {
52 fn execute(&self, command: DataCommand) {
53 // TODO: Placeholder, we still need to queue and drain even for sync
54 let endpoint = MessagingSwitchboard::data_engine_execute();
55 msgbus::send_any(endpoint, &command);
56 }
57}
58
59/// Gets the global data command sender.
60///
61/// # Panics
62///
63/// Panics if the sender is uninitialized.
64#[must_use]
65pub fn get_data_cmd_sender() -> Arc<dyn DataCommandSender> {
66 DATA_CMD_SENDER.with(|sender| {
67 sender
68 .get()
69 .expect("Data command sender should be initialized by runner")
70 .clone()
71 })
72}
73
74/// Sets the global data command sender.
75///
76/// This should be called by the runner when it initializes.
77/// Can only be called once per thread.
78///
79/// # Panics
80///
81/// Panics if a sender has already been set.
82pub fn set_data_cmd_sender(sender: Arc<dyn DataCommandSender>) {
83 DATA_CMD_SENDER.with(|s| {
84 if s.set(sender).is_err() {
85 panic!("Data command sender can only be set once");
86 }
87 });
88}
89
/// Trait for time event sending that can be implemented for both sync and async runners.
///
/// Implementors must also be `Debug`, `Send` and `Sync`.
pub trait TimeEventSender: Debug + Send + Sync {
    /// Sends a time event handler.
    ///
    /// The `handler` is taken by value, so the caller gives up ownership of it.
    fn send(&self, handler: TimeEventHandlerV2);
}
95
96/// Gets the global time event sender.
97///
98/// # Panics
99///
100/// Panics if the sender is uninitialized.
101#[must_use]
102pub fn get_time_event_sender() -> Arc<dyn TimeEventSender> {
103 TIME_EVENT_SENDER.with(|sender| {
104 sender
105 .get()
106 .expect("Time event sender should be initialized by runner")
107 .clone()
108 })
109}
110
111/// Attempts to get the global time event sender without panicking.
112///
113/// Returns `None` if the sender is not initialized (e.g., in test environments).
114#[must_use]
115pub fn try_get_time_event_sender() -> Option<Arc<dyn TimeEventSender>> {
116 TIME_EVENT_SENDER.with(|sender| sender.get().cloned())
117}
118
119/// Sets the global time event sender.
120///
121/// Can only be called once per thread.
122///
123/// # Panics
124///
125/// Panics if a sender has already been set.
126pub fn set_time_event_sender(sender: Arc<dyn TimeEventSender>) {
127 TIME_EVENT_SENDER.with(|s| {
128 if s.set(sender).is_err() {
129 panic!("Time event sender can only be set once");
130 }
131 });
132}
133
134/// Gets the global data event sender.
135///
136/// # Panics
137///
138/// Panics if the sender is uninitialized.
139#[must_use]
140pub fn get_data_event_sender() -> tokio::sync::mpsc::UnboundedSender<DataEvent> {
141 DATA_EVENT_SENDER.with(|sender| {
142 sender
143 .get()
144 .expect("Data event sender should be initialized by runner")
145 .clone()
146 })
147}
148
149/// Sets the global data event sender.
150///
151/// Can only be called once per thread.
152///
153/// # Panics
154///
155/// Panics if a sender has already been set.
156pub fn set_data_event_sender(sender: tokio::sync::mpsc::UnboundedSender<DataEvent>) {
157 DATA_EVENT_SENDER.with(|s| {
158 if s.set(sender).is_err() {
159 panic!("Data event sender can only be set once");
160 }
161 });
162}
163
// TODO: We can refine this for the sync runner later, data event sender won't be required
//
// Per-thread, write-once slots backing the getter/setter functions in this module.
// Every slot starts out empty on each thread.
thread_local! {
    // Read by `get_time_event_sender` / `try_get_time_event_sender`, written by `set_time_event_sender`.
    static TIME_EVENT_SENDER: OnceCell<Arc<dyn TimeEventSender>> = const { OnceCell::new() };
    // Read by `get_data_event_sender`, written by `set_data_event_sender`.
    static DATA_EVENT_SENDER: OnceCell<tokio::sync::mpsc::UnboundedSender<DataEvent>> = const { OnceCell::new() };
    // Read by `get_data_cmd_sender`, written by `set_data_cmd_sender`.
    static DATA_CMD_SENDER: OnceCell<Arc<dyn DataCommandSender>> = const { OnceCell::new() };
}