nautilus_backtest/
runner.rs

1// -------------------------------------------------------------------------------------------------
2//  Copyright (C) 2015-2025 Posei Systems Pty Ltd. All rights reserved.
3//  https://poseitrader.io
4//
5//  Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
6//  You may not use this file except in compliance with the License.
7//  You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
8//
9//  Unless required by applicable law or agreed to in writing, software
10//  distributed under the License is distributed on an "AS IS" BASIS,
11//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12//  See the License for the specific language governing permissions and
13//  limitations under the License.
14// -------------------------------------------------------------------------------------------------
15
16// Under development
17#![allow(dead_code)]
18#![allow(unused_variables)]
19
20use std::{
21    cell::{OnceCell, RefCell},
22    collections::VecDeque,
23    rc::Rc,
24};
25
26use nautilus_common::{
27    clock::{Clock, TestClock},
28    messages::data::SubscribeCommand,
29    runner::{DataQueue, GlobalDataQueue},
30};
31use posei_trader::engine::DataEngine;
32
33/// # Panics
34///
35/// Panics if the global data queue has not been initialized.
36#[must_use]
37pub fn get_data_queue() -> Rc<RefCell<dyn DataQueue>> {
38    DATA_QUEUE
39        .try_with(|dq| {
40            dq.get()
41                .expect("Data queue should be initialized by runner")
42                .clone()
43        })
44        .expect("Should be able to access thread local storage")
45}
46
47/// # Panics
48///
49/// Panics if setting the global data queue fails or it is already set.
50pub fn set_data_queue(dq: Rc<RefCell<dyn DataQueue>>) {
51    DATA_QUEUE
52        .try_with(|deque| {
53            assert!(deque.set(dq).is_ok(), "Global data queue already set");
54        })
55        .expect("Should be able to access thread local storage");
56}
57
/// Reference-counted, interior-mutable handle to a [`Clock`] trait object.
pub type GlobalClock = Rc<RefCell<dyn Clock>>;
59
60/// # Panics
61///
62/// Panics if the global clock has not been initialized.
63#[must_use]
64pub fn get_clock() -> Rc<RefCell<dyn Clock>> {
65    CLOCK
66        .try_with(|clock| {
67            clock
68                .get()
69                .expect("Clock should be initialized by runner")
70                .clone()
71        })
72        .expect("Should be able to access thread local storage")
73}
74
75/// # Panics
76///
77/// Panics if setting the global clock fails or it is already set.
78pub fn set_clock(c: Rc<RefCell<dyn Clock>>) {
79    CLOCK
80        .try_with(|clock| {
81            assert!(clock.set(c).is_ok(), "Global clock already set");
82        })
83        .expect("Should be able to access thread local clock");
84}
85
/// Reference-counted, interior-mutable FIFO queue of [`SubscribeCommand`]s.
pub type MessageBusCommands = Rc<RefCell<VecDeque<SubscribeCommand>>>;
87
88/// Get globally shared message bus command queue
89///
90/// # Panics
91///
92/// Panics if the global message bus commands have not been initialized.
93#[must_use]
94pub fn get_msgbus_cmd() -> MessageBusCommands {
95    MSGBUS_CMD
96        .try_with(std::clone::Clone::clone)
97        .expect("Should be able to access thread local storage")
98}
99
100thread_local! {
101    static CLOCK: OnceCell<GlobalClock> = OnceCell::new();
102    static DATA_QUEUE: OnceCell<GlobalDataQueue> = OnceCell::new();
103    static MSGBUS_CMD: MessageBusCommands = Rc::new(RefCell::new(VecDeque::new()));
104}
105
// TODO: Determine how to deduplicate trait
/// Interface for runners that operate on a [`DataEngine`].
pub trait Runner {
    /// Creates a new runner instance.
    fn new() -> Self;
    // NOTE(review): presumably drains queued data events into `engine`; no active
    // implementation is visible here, so confirm once one exists.
    /// Runs this runner against the given `engine`.
    fn run(&mut self, engine: &mut DataEngine);
}
111
/// Runner state holding a shared handle to a [`TestClock`].
#[derive(Debug)]
pub struct SyncRunner {
    /// Reference-counted, interior-mutable handle to the test clock.
    pub clock: Rc<RefCell<TestClock>>,
}
116
117// TODO: Untangle puzzle later (can use common trait once message bus wired up)
118// impl Runner for BacktestRunner {
119//     fn new() -> Self {
120//         let clock = Rc::new(RefCell::new(TestClock::new()));
121//         set_clock(clock.clone());
122//
123//         let dq = Rc::new(RefCell::new(SyncDataQueue(VecDeque::new())));
124//         set_data_queue(dq.clone());
125//         Self { dq, clock }
126//     }
127//
128//     fn run(&mut self, engine: &mut DataEngine) {
129//         let msgbus_cmd = get_msgbus_cmd();
130//
131//         while let Some(resp) = self.dq.as_ref().borrow_mut().0.pop_front() {
132//             match resp {
133//                 DataEvent::Response(resp) => engine.response(resp),
134//                 DataEvent::Data(data) => {
135//                     while let Some(sub_cmd) = msgbus_cmd.borrow_mut().pop_front() {
136//                         engine.execute(sub_cmd);
137//                     }
138//
139//                     // Advance clock time and collect all triggered events and handlers
140//                     let handlers: Vec<TimeEventHandlerV2> = {
141//                         let mut guard = self.clock.borrow_mut();
142//                         guard.advance_to_time_on_heap(data.ts_init());
143//                         guard.by_ref().collect()
144//                         // drop guard
145//                     };
146//
147//                     // Execute all handlers before processing the data
148//                     handlers.into_iter().for_each(TimeEventHandlerV2::run);
149//
150//                     engine.process_data(data);
151//                 }
152//             }
153//         }
154//     }
155// }
156
#[cfg(test)]
#[cfg(feature = "clock_v2")]
mod tests {
    use std::{cell::RefCell, rc::Rc};

    use nautilus_common::{
        clock::TestClock,
        timer::{TimeEvent, TimeEventCallback},
    };
    use rstest::rstest;

    use super::{get_clock, set_clock};

    /// Registers an alert through the global clock handle, then checks that the
    /// runner-side handle to the same clock yields an event once advanced past it.
    #[rstest]
    fn test_global_test_clock() {
        let runner_clock = Rc::new(RefCell::new(TestClock::new()));
        set_clock(runner_clock.clone());

        // A component/actor adds an alert via the globally registered clock.
        let callback = TimeEventCallback::Rust(Rc::new(|_event: TimeEvent| {}));
        let _ = get_clock()
            .borrow_mut()
            .set_time_alert_ns("hola", 2.into(), Some(callback), None);

        // The runner advances the clock, then pulls from the event stream.
        runner_clock.borrow_mut().advance_to_time_on_heap(3.into());
        let next_event = runner_clock.borrow_mut().next();
        assert!(next_event.is_some());
    }
}
187}