nautilus_backtest/runner.rs
1// -------------------------------------------------------------------------------------------------
2// Copyright (C) 2015-2025 Posei Systems Pty Ltd. All rights reserved.
3// https://poseitrader.io
4//
5// Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
6// You may not use this file except in compliance with the License.
7// You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14// -------------------------------------------------------------------------------------------------
15
16// Under development
17#![allow(dead_code)]
18#![allow(unused_variables)]
19
20use std::{
21 cell::{OnceCell, RefCell},
22 collections::VecDeque,
23 rc::Rc,
24};
25
26use nautilus_common::{
27 clock::{Clock, TestClock},
28 messages::data::SubscribeCommand,
29 runner::{DataQueue, GlobalDataQueue},
30};
31use posei_trader::engine::DataEngine;
32
33/// # Panics
34///
35/// Panics if the global data queue has not been initialized.
36#[must_use]
37pub fn get_data_queue() -> Rc<RefCell<dyn DataQueue>> {
38 DATA_QUEUE
39 .try_with(|dq| {
40 dq.get()
41 .expect("Data queue should be initialized by runner")
42 .clone()
43 })
44 .expect("Should be able to access thread local storage")
45}
46
47/// # Panics
48///
49/// Panics if setting the global data queue fails or it is already set.
50pub fn set_data_queue(dq: Rc<RefCell<dyn DataQueue>>) {
51 DATA_QUEUE
52 .try_with(|deque| {
53 assert!(deque.set(dq).is_ok(), "Global data queue already set");
54 })
55 .expect("Should be able to access thread local storage");
56}
57
58pub type GlobalClock = Rc<RefCell<dyn Clock>>;
59
60/// # Panics
61///
62/// Panics if the global clock has not been initialized.
63#[must_use]
64pub fn get_clock() -> Rc<RefCell<dyn Clock>> {
65 CLOCK
66 .try_with(|clock| {
67 clock
68 .get()
69 .expect("Clock should be initialized by runner")
70 .clone()
71 })
72 .expect("Should be able to access thread local storage")
73}
74
75/// # Panics
76///
77/// Panics if setting the global clock fails or it is already set.
78pub fn set_clock(c: Rc<RefCell<dyn Clock>>) {
79 CLOCK
80 .try_with(|clock| {
81 assert!(clock.set(c).is_ok(), "Global clock already set");
82 })
83 .expect("Should be able to access thread local clock");
84}
85
86pub type MessageBusCommands = Rc<RefCell<VecDeque<SubscribeCommand>>>;
87
88/// Get globally shared message bus command queue
89///
90/// # Panics
91///
92/// Panics if the global message bus commands have not been initialized.
93#[must_use]
94pub fn get_msgbus_cmd() -> MessageBusCommands {
95 MSGBUS_CMD
96 .try_with(std::clone::Clone::clone)
97 .expect("Should be able to access thread local storage")
98}
99
100thread_local! {
101 static CLOCK: OnceCell<GlobalClock> = OnceCell::new();
102 static DATA_QUEUE: OnceCell<GlobalDataQueue> = OnceCell::new();
103 static MSGBUS_CMD: MessageBusCommands = Rc::new(RefCell::new(VecDeque::new()));
104}
105
106// TODO: Determine how to deduplicate trait
107pub trait Runner {
108 fn new() -> Self;
109 fn run(&mut self, engine: &mut DataEngine);
110}
111
112#[derive(Debug)]
113pub struct SyncRunner {
114 pub clock: Rc<RefCell<TestClock>>,
115}
116
117// TODO: Untangle puzzle later (can use common trait once message bus wired up)
118// impl Runner for BacktestRunner {
119// fn new() -> Self {
120// let clock = Rc::new(RefCell::new(TestClock::new()));
121// set_clock(clock.clone());
122//
123// let dq = Rc::new(RefCell::new(SyncDataQueue(VecDeque::new())));
124// set_data_queue(dq.clone());
125// Self { dq, clock }
126// }
127//
128// fn run(&mut self, engine: &mut DataEngine) {
129// let msgbus_cmd = get_msgbus_cmd();
130//
131// while let Some(resp) = self.dq.as_ref().borrow_mut().0.pop_front() {
132// match resp {
133// DataEvent::Response(resp) => engine.response(resp),
134// DataEvent::Data(data) => {
135// while let Some(sub_cmd) = msgbus_cmd.borrow_mut().pop_front() {
136// engine.execute(sub_cmd);
137// }
138//
139// // Advance clock time and collect all triggered events and handlers
140// let handlers: Vec<TimeEventHandlerV2> = {
141// let mut guard = self.clock.borrow_mut();
142// guard.advance_to_time_on_heap(data.ts_init());
143// guard.by_ref().collect()
144// // drop guard
145// };
146//
147// // Execute all handlers before processing the data
148// handlers.into_iter().for_each(TimeEventHandlerV2::run);
149//
150// engine.process_data(data);
151// }
152// }
153// }
154// }
155// }
156
157#[cfg(test)]
158#[cfg(feature = "clock_v2")]
159mod tests {
160 use std::{cell::RefCell, rc::Rc};
161
162 use nautilus_common::{
163 clock::TestClock,
164 timer::{TimeEvent, TimeEventCallback},
165 };
166 use rstest::rstest;
167
168 use super::{get_clock, set_clock};
169
170 #[rstest]
171 fn test_global_test_clock() {
172 let test_clock = Rc::new(RefCell::new(TestClock::new()));
173 set_clock(test_clock.clone());
174
175 // component/actor adding an alert
176 let _ = get_clock().borrow_mut().set_time_alert_ns(
177 "hola",
178 2.into(),
179 Some(TimeEventCallback::Rust(Rc::new(|event: TimeEvent| {}))),
180 None,
181 );
182
183 // runner pulling advancing and pulling from event stream
184 test_clock.borrow_mut().advance_to_time_on_heap(3.into());
185 assert!(test_clock.borrow_mut().next().is_some());
186 }
187}