nautilus_live/
node.rs

1// -------------------------------------------------------------------------------------------------
2//  Copyright (C) 2015-2025 Posei Systems Pty Ltd. All rights reserved.
3//  https://poseitrader.io
4//
5//  Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
6//  You may not use this file except in compliance with the License.
7//  You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
8//
9//  Unless required by applicable law or agreed to in writing, software
10//  distributed under the License is distributed on an "AS IS" BASIS,
11//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12//  See the License for the specific language governing permissions and
13//  limitations under the License.
14// -------------------------------------------------------------------------------------------------
15
16// Under development
17#![allow(dead_code)]
18#![allow(unused_variables)]
19
20use std::{cell::RefCell, collections::HashMap, rc::Rc};
21
22use nautilus_common::{clock::LiveClock, component::Component, enums::Environment};
23use nautilus_core::UUID4;
24use nautilus_model::identifiers::TraderId;
25use nautilus_system::{
26    config::PoseiKernelConfig,
27    factories::{ClientConfig, DataClientFactory, ExecutionClientFactory},
28    kernel::PoseiKernel,
29};
30
31use crate::{config::LiveNodeConfig, runner::AsyncRunner};
32
33/// High-level abstraction for a live Posei system node.
34///
35/// Provides a simplified interface for running live systems
36/// with automatic client management and lifecycle handling.
37#[derive(Debug)]
38pub struct LiveNode {
39    clock: Rc<RefCell<LiveClock>>,
40    kernel: PoseiKernel,
41    runner: AsyncRunner,
42    config: LiveNodeConfig,
43    is_running: bool,
44}
45
46impl LiveNode {
47    /// Creates a new [`LiveNodeBuilder`] for fluent configuration.
48    ///
49    /// # Errors
50    ///
51    /// Returns an error if the environment is invalid for live trading.
52    pub fn builder(
53        name: String,
54        trader_id: TraderId,
55        environment: Environment,
56    ) -> anyhow::Result<LiveNodeBuilder> {
57        LiveNodeBuilder::new(name, trader_id, environment)
58    }
59
60    /// Creates a new [`LiveNode`] directly from a kernel name and optional configuration.
61    ///
62    /// This is a convenience method for creating a live node with a pre-configured
63    /// kernel configuration, bypassing the builder pattern. If no config is provided,
64    /// a default configuration will be used.
65    ///
66    /// # Errors
67    ///
68    /// Returns an error if kernel construction fails.
69    pub fn build(name: String, config: Option<LiveNodeConfig>) -> anyhow::Result<Self> {
70        let mut config = config.unwrap_or_default();
71        config.environment = Environment::Live;
72
73        // Validate environment for live trading
74        match config.environment() {
75            Environment::Sandbox | Environment::Live => {}
76            Environment::Backtest => {
77                anyhow::bail!("LiveNode cannot be used with Backtest environment");
78            }
79        }
80
81        let clock = Rc::new(RefCell::new(LiveClock::new()));
82        let kernel = PoseiKernel::new(name, config.clone())?;
83        let runner = AsyncRunner::new(clock.clone());
84
85        log::info!("LiveNode built successfully with kernel config");
86
87        Ok(Self {
88            clock,
89            kernel,
90            runner,
91            config,
92            is_running: false,
93        })
94    }
95
96    /// Starts the live node.
97    ///
98    /// # Errors
99    ///
100    /// Returns an error if startup fails.
101    pub async fn start(&mut self) -> anyhow::Result<()> {
102        if self.is_running {
103            anyhow::bail!("LiveNode is already running");
104        }
105
106        log::info!("Starting live node");
107        self.kernel.start();
108        self.is_running = true;
109
110        log::info!("LiveNode started successfully");
111        Ok(())
112    }
113
114    /// Stop the live node.
115    ///
116    /// # Errors
117    ///
118    /// Returns an error if shutdown fails.
119    pub async fn stop(&mut self) -> anyhow::Result<()> {
120        if !self.is_running {
121            anyhow::bail!("LiveNode is not running");
122        }
123
124        log::info!("Stopping live node");
125        self.kernel.stop();
126        self.is_running = false;
127
128        log::info!("LiveNode stopped successfully");
129        Ok(())
130    }
131
132    /// Run the live node with automatic shutdown handling.
133    ///
134    /// This method will start the node, run indefinitely, and handle
135    /// graceful shutdown on interrupt signals.
136    ///
137    /// # Errors
138    ///
139    /// Returns an error if the node fails to start or encounters a runtime error.
140    pub async fn run_async(&mut self) -> anyhow::Result<()> {
141        self.start().await?;
142
143        // Set up signal handling
144        let sigint = tokio::signal::ctrl_c();
145
146        tokio::select! {
147            _ = sigint => {
148                log::info!("Received SIGINT, shutting down...");
149            }
150        }
151
152        self.stop().await?;
153        Ok(())
154    }
155
156    /// Gets the node's environment.
157    #[must_use]
158    pub fn environment(&self) -> Environment {
159        self.kernel.environment()
160    }
161
162    /// Gets a reference to the underlying kernel.
163    #[must_use]
164    pub const fn kernel(&self) -> &PoseiKernel {
165        &self.kernel
166    }
167
168    /// Gets the node's trader ID.
169    #[must_use]
170    pub fn trader_id(&self) -> TraderId {
171        self.kernel.trader_id()
172    }
173
174    /// Gets the node's instance ID.
175    #[must_use]
176    pub const fn instance_id(&self) -> UUID4 {
177        self.kernel.instance_id()
178    }
179
180    /// Checks if the live node is currently running.
181    #[must_use]
182    pub const fn is_running(&self) -> bool {
183        self.is_running
184    }
185
186    /// Adds an actor to the trader.
187    ///
188    /// This method provides a high-level interface for adding actors to the underlying
189    /// trader without requiring direct access to the kernel. Actors should be added
190    /// after the node is built but before starting the node.
191    ///
192    /// # Errors
193    ///
194    /// Returns an error if:
195    /// - The trader is not in a valid state for adding components.
196    /// - An actor with the same ID is already registered.
197    /// - The node is currently running.
198    pub fn add_actor(&mut self, actor: Box<dyn Component>) -> anyhow::Result<()> {
199        if self.is_running {
200            anyhow::bail!(
201                "Cannot add actor while node is running. Add actors before calling start()."
202            );
203        }
204
205        self.kernel.trader.add_actor(actor)
206    }
207}
208
209/// Builder for constructing a [`LiveNode`] with a fluent API.
210///
211/// Provides configuration options specific to live nodes,
212/// including client factory registration and timeout settings.
213#[derive(Debug)]
214pub struct LiveNodeBuilder {
215    config: LiveNodeConfig,
216    data_client_factories: HashMap<String, Box<dyn DataClientFactory>>,
217    exec_client_factories: HashMap<String, Box<dyn ExecutionClientFactory>>,
218    data_client_configs: HashMap<String, Box<dyn ClientConfig>>,
219    exec_client_configs: HashMap<String, Box<dyn ClientConfig>>,
220}
221
222impl LiveNodeBuilder {
223    /// Creates a new [`LiveNodeBuilder`] with required parameters.
224    ///
225    /// # Errors
226    ///
227    /// Returns an error if `environment` is invalid (BACKTEST).
228    pub fn new(
229        name: String,
230        trader_id: TraderId,
231        environment: Environment,
232    ) -> anyhow::Result<Self> {
233        match environment {
234            Environment::Sandbox | Environment::Live => {}
235            Environment::Backtest => {
236                anyhow::bail!("LiveNode cannot be used with Backtest environment");
237            }
238        }
239
240        let config = LiveNodeConfig {
241            environment,
242            trader_id,
243            ..Default::default()
244        };
245
246        Ok(Self {
247            config,
248            data_client_factories: HashMap::new(),
249            exec_client_factories: HashMap::new(),
250            data_client_configs: HashMap::new(),
251            exec_client_configs: HashMap::new(),
252        })
253    }
254
255    /// Set the instance ID for the node.
256    #[must_use]
257    pub const fn with_instance_id(mut self, instance_id: UUID4) -> Self {
258        self.config.instance_id = Some(instance_id);
259        self
260    }
261
262    /// Configure whether to load state on startup.
263    #[must_use]
264    pub const fn with_load_state(mut self, load_state: bool) -> Self {
265        self.config.load_state = load_state;
266        self
267    }
268
269    /// Configure whether to save state on shutdown.
270    #[must_use]
271    pub const fn with_save_state(mut self, save_state: bool) -> Self {
272        self.config.save_state = save_state;
273        self
274    }
275
276    /// Set the connection timeout in seconds.
277    #[must_use]
278    pub const fn with_timeout_connection(mut self, timeout: u32) -> Self {
279        self.config.timeout_connection = timeout;
280        self
281    }
282
283    /// Set the reconciliation timeout in seconds.
284    #[must_use]
285    pub const fn with_timeout_reconciliation(mut self, timeout: u32) -> Self {
286        self.config.timeout_reconciliation = timeout;
287        self
288    }
289
290    /// Set the portfolio initialization timeout in seconds.
291    #[must_use]
292    pub const fn with_timeout_portfolio(mut self, timeout: u32) -> Self {
293        self.config.timeout_portfolio = timeout;
294        self
295    }
296
297    /// Set the disconnection timeout in seconds.
298    #[must_use]
299    pub const fn with_timeout_disconnection(mut self, timeout: u32) -> Self {
300        self.config.timeout_disconnection = timeout;
301        self
302    }
303
304    /// Set the post-stop timeout in seconds.
305    #[must_use]
306    pub const fn with_timeout_post_stop(mut self, timeout: u32) -> Self {
307        self.config.timeout_post_stop = timeout;
308        self
309    }
310
311    /// Set the shutdown timeout in seconds.
312    #[must_use]
313    pub const fn with_timeout_shutdown(mut self, timeout: u32) -> Self {
314        self.config.timeout_shutdown = timeout;
315        self
316    }
317
318    /// Adds a data client with both factory and configuration.
319    ///
320    /// # Errors
321    ///
322    /// Returns an error if a client with the same name is already registered.
323    pub fn add_data_client(
324        mut self,
325        name: Option<String>,
326        factory: Box<dyn DataClientFactory>,
327        config: Box<dyn ClientConfig>,
328    ) -> anyhow::Result<Self> {
329        let name = name.unwrap_or_else(|| factory.name().to_string());
330
331        if self.data_client_factories.contains_key(&name) {
332            anyhow::bail!("Data client '{name}' is already registered");
333        }
334
335        self.data_client_factories.insert(name.clone(), factory);
336        self.data_client_configs.insert(name, config);
337        Ok(self)
338    }
339
340    /// Adds an execution client with both factory and configuration.
341    ///
342    /// # Errors
343    ///
344    /// Returns an error if a client with the same name is already registered.
345    pub fn add_exec_client(
346        mut self,
347        name: Option<String>,
348        factory: Box<dyn ExecutionClientFactory>,
349        config: Box<dyn ClientConfig>,
350    ) -> anyhow::Result<Self> {
351        let name = name.unwrap_or_else(|| factory.name().to_string());
352
353        if self.exec_client_factories.contains_key(&name) {
354            anyhow::bail!("Execution client '{name}' is already registered");
355        }
356
357        self.exec_client_factories.insert(name.clone(), factory);
358        self.exec_client_configs.insert(name, config);
359        Ok(self)
360    }
361
362    /// Build the [`LiveNode`] with the configured settings.
363    ///
364    /// This will:
365    /// 1. Build the underlying kernel
366    /// 2. Register all client factories
367    /// 3. Create and register all clients
368    ///
369    /// # Errors
370    ///
371    /// Returns an error if node construction fails.
372    pub fn build(self) -> anyhow::Result<LiveNode> {
373        // TODO: Register client factories and create clients
374        // This would involve:
375        // 1. Creating clients using factories and configs
376        // 2. Registering clients with the data/execution engines
377        // 3. Setting up routing configurations
378
379        log::info!("LiveNode built successfully");
380
381        // Create kernel directly with the config
382        let clock = Rc::new(RefCell::new(LiveClock::new()));
383        let kernel = PoseiKernel::new("LiveNode".to_string(), self.config.clone())?;
384        let runner = AsyncRunner::new(clock.clone());
385
386        Ok(LiveNode {
387            clock,
388            kernel,
389            runner,
390            config: self.config,
391            is_running: false,
392        })
393    }
394}
395
396////////////////////////////////////////////////////////////////////////////////
397// Tests
398////////////////////////////////////////////////////////////////////////////////
399
#[cfg(test)]
mod tests {
    use nautilus_model::identifiers::TraderId;
    use rstest::*;

    use super::*;

    /// Requests a builder for the shared test node in the given environment.
    fn request_builder(environment: Environment) -> anyhow::Result<LiveNodeBuilder> {
        LiveNode::builder(
            "TestNode".to_string(),
            TraderId::from("TRADER-001"),
            environment,
        )
    }

    #[rstest]
    fn test_trading_node_builder_creation() {
        let outcome = request_builder(Environment::Sandbox);

        assert!(outcome.is_ok());
    }

    #[rstest]
    fn test_trading_node_builder_rejects_backtest() {
        let outcome = request_builder(Environment::Backtest);

        assert!(outcome.is_err());
        let message = outcome.unwrap_err().to_string();
        assert!(message.contains("Backtest environment"));
    }

    #[rstest]
    fn test_trading_node_builder_fluent_api() {
        let outcome = request_builder(Environment::Live);

        assert!(outcome.is_ok());
        // Setters consume and return the builder, so they must chain without panicking
        let builder = outcome.unwrap();
        let _builder = builder.with_timeout_connection(30).with_load_state(false);
    }

    #[rstest]
    fn test_trading_node_build() {
        #[cfg(feature = "python")]
        pyo3::prepare_freethreaded_python();

        let builder_outcome = request_builder(Environment::Sandbox);
        assert!(builder_outcome.is_ok());

        let node_outcome = builder_outcome.unwrap().build();
        assert!(node_outcome.is_ok());

        // A freshly built node is idle and reports the requested environment
        let node = node_outcome.unwrap();
        assert!(!node.is_running());
        assert_eq!(node.environment(), Environment::Sandbox);
    }

    #[rstest]
    fn test_builder_rejects_backtest_environment() {
        let outcome = request_builder(Environment::Backtest);

        assert!(outcome.is_err());
        let message = outcome.unwrap_err().to_string();
        assert!(message.contains("Backtest environment"));
    }
}
486        );
487    }
488}