nautilus_common/
runner.rs

1// -------------------------------------------------------------------------------------------------
2//  Copyright (C) 2015-2025 Posei Systems Pty Ltd. All rights reserved.
3//  https://poseitrader.io
4//
5//  Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
6//  You may not use this file except in compliance with the License.
7//  You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
8//
9//  Unless required by applicable law or agreed to in writing, software
10//  distributed under the License is distributed on an "AS IS" BASIS,
11//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12//  See the License for the specific language governing permissions and
13//  limitations under the License.
14// -------------------------------------------------------------------------------------------------
15
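//! Global runtime machinery and thread-local storage.
//!
//! This module provides global access to shared runtime resources including clocks,
//! message queues, and time event channels. It manages thread-local storage for
//! system-wide components; each thread holds its own instances, which must be set
//! on that thread before they are read.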
20//! system-wide components that need to be accessible across threads.
21
22use std::{cell::OnceCell, fmt::Debug, sync::Arc};
23
24use crate::{
25    messages::{DataEvent, data::DataCommand},
26    msgbus::{self, switchboard::MessagingSwitchboard},
27    timer::TimeEventHandlerV2,
28};
29
30// Represents different event types for the runner.
31#[allow(clippy::large_enum_variant)]
32#[derive(Debug)]
33pub enum RunnerEvent {
34    Time(TimeEventHandlerV2),
35    Data(DataEvent),
36}
37
38/// Trait for data command sending that can be implemented for both sync and async runners.
39pub trait DataCommandSender {
40    /// Executes a data command.
41    ///
42    /// - **Sync runners** send the command to a queue for synchronous execution.
43    /// - **Async runners** send the command to a channel for asynchronous execution.
44    fn execute(&self, command: DataCommand);
45}
46
47/// Synchronous implementation of DataCommandSender for backtest environments.
48#[derive(Debug)]
49pub struct SyncDataCommandSender;
50
51impl DataCommandSender for SyncDataCommandSender {
52    fn execute(&self, command: DataCommand) {
53        // TODO: Placeholder, we still need to queue and drain even for sync
54        let endpoint = MessagingSwitchboard::data_engine_execute();
55        msgbus::send_any(endpoint, &command);
56    }
57}
58
59/// Gets the global data command sender.
60///
61/// # Panics
62///
63/// Panics if the sender is uninitialized.
64#[must_use]
65pub fn get_data_cmd_sender() -> Arc<dyn DataCommandSender> {
66    DATA_CMD_SENDER.with(|sender| {
67        sender
68            .get()
69            .expect("Data command sender should be initialized by runner")
70            .clone()
71    })
72}
73
74/// Sets the global data command sender.
75///
76/// This should be called by the runner when it initializes.
77/// Can only be called once per thread.
78///
79/// # Panics
80///
81/// Panics if a sender has already been set.
82pub fn set_data_cmd_sender(sender: Arc<dyn DataCommandSender>) {
83    DATA_CMD_SENDER.with(|s| {
84        if s.set(sender).is_err() {
85            panic!("Data command sender can only be set once");
86        }
87    });
88}
89
90/// Trait for time event sending that can be implemented for both sync and async runners.
91pub trait TimeEventSender: Debug + Send + Sync {
92    /// Sends a time event handler.
93    fn send(&self, handler: TimeEventHandlerV2);
94}
95
96/// Gets the global time event sender.
97///
98/// # Panics
99///
100/// Panics if the sender is uninitialized.
101#[must_use]
102pub fn get_time_event_sender() -> Arc<dyn TimeEventSender> {
103    TIME_EVENT_SENDER.with(|sender| {
104        sender
105            .get()
106            .expect("Time event sender should be initialized by runner")
107            .clone()
108    })
109}
110
111/// Attempts to get the global time event sender without panicking.
112///
113/// Returns `None` if the sender is not initialized (e.g., in test environments).
114#[must_use]
115pub fn try_get_time_event_sender() -> Option<Arc<dyn TimeEventSender>> {
116    TIME_EVENT_SENDER.with(|sender| sender.get().cloned())
117}
118
119/// Sets the global time event sender.
120///
121/// Can only be called once per thread.
122///
123/// # Panics
124///
125/// Panics if a sender has already been set.
126pub fn set_time_event_sender(sender: Arc<dyn TimeEventSender>) {
127    TIME_EVENT_SENDER.with(|s| {
128        if s.set(sender).is_err() {
129            panic!("Time event sender can only be set once");
130        }
131    });
132}
133
134/// Gets the global data event sender.
135///
136/// # Panics
137///
138/// Panics if the sender is uninitialized.
139#[must_use]
140pub fn get_data_event_sender() -> tokio::sync::mpsc::UnboundedSender<DataEvent> {
141    DATA_EVENT_SENDER.with(|sender| {
142        sender
143            .get()
144            .expect("Data event sender should be initialized by runner")
145            .clone()
146    })
147}
148
149/// Sets the global data event sender.
150///
151/// Can only be called once per thread.
152///
153/// # Panics
154///
155/// Panics if a sender has already been set.
156pub fn set_data_event_sender(sender: tokio::sync::mpsc::UnboundedSender<DataEvent>) {
157    DATA_EVENT_SENDER.with(|s| {
158        if s.set(sender).is_err() {
159            panic!("Data event sender can only be set once");
160        }
161    });
162}
163
164// TODO: We can refine this for the synch runner later, data event sender won't be required
165thread_local! {
166    static TIME_EVENT_SENDER: OnceCell<Arc<dyn TimeEventSender>> = const { OnceCell::new() };
167    static DATA_EVENT_SENDER: OnceCell<tokio::sync::mpsc::UnboundedSender<DataEvent>> = const { OnceCell::new() };
168    static DATA_CMD_SENDER: OnceCell<Arc<dyn DataCommandSender>> = const { OnceCell::new() };
169}