nautilus_live/
node.rs

1// -------------------------------------------------------------------------------------------------
2//  Copyright (C) 2015-2025 Posei Systems Pty Ltd. All rights reserved.
3//  https://poseitrader.io
4//
5//  Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
6//  You may not use this file except in compliance with the License.
7//  You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
8//
9//  Unless required by applicable law or agreed to in writing, software
10//  distributed under the License is distributed on an "AS IS" BASIS,
11//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12//  See the License for the specific language governing permissions and
13//  limitations under the License.
14// -------------------------------------------------------------------------------------------------
15
16// Under development
17#![allow(dead_code)]
18#![allow(unused_variables)]
19
20use std::{cell::RefCell, collections::HashMap, rc::Rc};
21
22use nautilus_common::{
23    actor::{Actor, DataActor},
24    clock::LiveClock,
25    component::Component,
26    enums::Environment,
27};
28use nautilus_core::UUID4;
29use posei_trader::client::DataClientAdapter;
30use nautilus_model::identifiers::TraderId;
31use nautilus_system::{
32    config::PoseiKernelConfig,
33    factories::{ClientConfig, DataClientFactory, ExecutionClientFactory},
34    kernel::PoseiKernel,
35};
36
37use crate::{config::LiveNodeConfig, runner::AsyncRunner};
38
39/// High-level abstraction for a live Posei system node.
40///
41/// Provides a simplified interface for running live systems
42/// with automatic client management and lifecycle handling.
43#[derive(Debug)]
44pub struct LiveNode {
45    clock: Rc<RefCell<LiveClock>>,
46    kernel: PoseiKernel,
47    runner: AsyncRunner,
48    config: LiveNodeConfig,
49    is_running: bool,
50}
51
52impl LiveNode {
53    /// Creates a new [`LiveNodeBuilder`] for fluent configuration.
54    ///
55    /// # Errors
56    ///
57    /// Returns an error if the environment is invalid for live trading.
58    pub fn builder(
59        name: String,
60        trader_id: TraderId,
61        environment: Environment,
62    ) -> anyhow::Result<LiveNodeBuilder> {
63        LiveNodeBuilder::new(name, trader_id, environment)
64    }
65
66    /// Creates a new [`LiveNode`] directly from a kernel name and optional configuration.
67    ///
68    /// This is a convenience method for creating a live node with a pre-configured
69    /// kernel configuration, bypassing the builder pattern. If no config is provided,
70    /// a default configuration will be used.
71    ///
72    /// # Errors
73    ///
74    /// Returns an error if kernel construction fails.
75    pub fn build(name: String, config: Option<LiveNodeConfig>) -> anyhow::Result<Self> {
76        let mut config = config.unwrap_or_default();
77        config.environment = Environment::Live;
78
79        // Validate environment for live trading
80        match config.environment() {
81            Environment::Sandbox | Environment::Live => {}
82            Environment::Backtest => {
83                anyhow::bail!("LiveNode cannot be used with Backtest environment");
84            }
85        }
86
87        let runner = AsyncRunner::new();
88        let clock = Rc::new(RefCell::new(LiveClock::default()));
89        let kernel = PoseiKernel::new(name, config.clone())?;
90
91        log::info!("LiveNode built successfully with kernel config");
92
93        Ok(Self {
94            clock,
95            kernel,
96            runner,
97            config,
98            is_running: false,
99        })
100    }
101
102    /// Starts the live node.
103    ///
104    /// # Errors
105    ///
106    /// Returns an error if startup fails.
107    pub async fn start(&mut self) -> anyhow::Result<()> {
108        if self.is_running {
109            anyhow::bail!("LiveNode is already running");
110        }
111
112        log::info!("Starting LiveNode");
113
114        self.kernel.start_async().await;
115        self.is_running = true;
116
117        log::info!("LiveNode started successfully");
118        Ok(())
119    }
120
121    /// Stop the live node.
122    ///
123    /// # Errors
124    ///
125    /// Returns an error if shutdown fails.
126    pub async fn stop(&mut self) -> anyhow::Result<()> {
127        if !self.is_running {
128            anyhow::bail!("LiveNode is not running");
129        }
130
131        log::info!("Stopping LiveNode");
132
133        self.kernel.stop_async().await;
134        self.is_running = false;
135
136        log::info!("LiveNode stopped successfully");
137        Ok(())
138    }
139
140    /// Run the live node with automatic shutdown handling.
141    ///
142    /// This method will start the node, run indefinitely, and handle
143    /// graceful shutdown on interrupt signals.
144    ///
145    /// # Errors
146    ///
147    /// Returns an error if the node fails to start or encounters a runtime error.
148    pub async fn run(&mut self) -> anyhow::Result<()> {
149        self.start().await?;
150
151        tokio::select! {
152            // Run on main thread
153            () = self.runner.run() => {
154                log::info!("AsyncRunner finished");
155            }
156            // Handle SIGINT signal
157            result = tokio::signal::ctrl_c() => {
158                match result {
159                    Ok(()) => {
160                        log::info!("Received SIGINT, shutting down");
161                        self.runner.stop();
162                        // Give the AsyncRunner a moment to process the shutdown signal
163                        tokio::time::sleep(tokio::time::Duration::from_millis(100)).await;
164                    }
165                    Err(e) => {
166                        log::error!("Failed to listen for SIGINT: {e}");
167                    }
168                }
169            }
170        }
171
172        log::debug!("AsyncRunner and signal handling finished"); // TODO: Temp logging
173
174        self.stop().await?;
175        Ok(())
176    }
177
178    /// Gets the node's environment.
179    #[must_use]
180    pub fn environment(&self) -> Environment {
181        self.kernel.environment()
182    }
183
184    /// Gets a reference to the underlying kernel.
185    #[must_use]
186    pub const fn kernel(&self) -> &PoseiKernel {
187        &self.kernel
188    }
189
190    /// Gets the node's trader ID.
191    #[must_use]
192    pub fn trader_id(&self) -> TraderId {
193        self.kernel.trader_id()
194    }
195
196    /// Gets the node's instance ID.
197    #[must_use]
198    pub const fn instance_id(&self) -> UUID4 {
199        self.kernel.instance_id()
200    }
201
202    /// Checks if the live node is currently running.
203    #[must_use]
204    pub const fn is_running(&self) -> bool {
205        self.is_running
206    }
207
208    /// Adds an actor to the trader.
209    ///
210    /// This method provides a high-level interface for adding actors to the underlying
211    /// trader without requiring direct access to the kernel. Actors should be added
212    /// after the node is built but before starting the node.
213    ///
214    /// # Errors
215    ///
216    /// Returns an error if:
217    /// - The trader is not in a valid state for adding components.
218    /// - An actor with the same ID is already registered.
219    /// - The node is currently running.
220    pub fn add_actor<T>(&mut self, actor: T) -> anyhow::Result<()>
221    where
222        T: DataActor + Component + Actor + 'static,
223    {
224        if self.is_running {
225            anyhow::bail!(
226                "Cannot add actor while node is running. Add actors before calling start()."
227            );
228        }
229
230        self.kernel.trader.add_actor(actor)
231    }
232}
233
234/// Builder for constructing a [`LiveNode`] with a fluent API.
235///
236/// Provides configuration options specific to live nodes,
237/// including client factory registration and timeout settings.
238#[derive(Debug)]
239pub struct LiveNodeBuilder {
240    config: LiveNodeConfig,
241    data_client_factories: HashMap<String, Box<dyn DataClientFactory>>,
242    exec_client_factories: HashMap<String, Box<dyn ExecutionClientFactory>>,
243    data_client_configs: HashMap<String, Box<dyn ClientConfig>>,
244    exec_client_configs: HashMap<String, Box<dyn ClientConfig>>,
245}
246
247impl LiveNodeBuilder {
248    /// Creates a new [`LiveNodeBuilder`] with required parameters.
249    ///
250    /// # Errors
251    ///
252    /// Returns an error if `environment` is invalid (BACKTEST).
253    pub fn new(
254        name: String,
255        trader_id: TraderId,
256        environment: Environment,
257    ) -> anyhow::Result<Self> {
258        match environment {
259            Environment::Sandbox | Environment::Live => {}
260            Environment::Backtest => {
261                anyhow::bail!("LiveNode cannot be used with Backtest environment");
262            }
263        }
264
265        let config = LiveNodeConfig {
266            environment,
267            trader_id,
268            ..Default::default()
269        };
270
271        Ok(Self {
272            config,
273            data_client_factories: HashMap::new(),
274            exec_client_factories: HashMap::new(),
275            data_client_configs: HashMap::new(),
276            exec_client_configs: HashMap::new(),
277        })
278    }
279
280    /// Set the instance ID for the node.
281    #[must_use]
282    pub const fn with_instance_id(mut self, instance_id: UUID4) -> Self {
283        self.config.instance_id = Some(instance_id);
284        self
285    }
286
287    /// Configure whether to load state on startup.
288    #[must_use]
289    pub const fn with_load_state(mut self, load_state: bool) -> Self {
290        self.config.load_state = load_state;
291        self
292    }
293
294    /// Configure whether to save state on shutdown.
295    #[must_use]
296    pub const fn with_save_state(mut self, save_state: bool) -> Self {
297        self.config.save_state = save_state;
298        self
299    }
300
301    /// Set the connection timeout in seconds.
302    #[must_use]
303    pub const fn with_timeout_connection(mut self, timeout: u32) -> Self {
304        self.config.timeout_connection = timeout;
305        self
306    }
307
308    /// Set the reconciliation timeout in seconds.
309    #[must_use]
310    pub const fn with_timeout_reconciliation(mut self, timeout: u32) -> Self {
311        self.config.timeout_reconciliation = timeout;
312        self
313    }
314
315    /// Set the portfolio initialization timeout in seconds.
316    #[must_use]
317    pub const fn with_timeout_portfolio(mut self, timeout: u32) -> Self {
318        self.config.timeout_portfolio = timeout;
319        self
320    }
321
322    /// Set the disconnection timeout in seconds.
323    #[must_use]
324    pub const fn with_timeout_disconnection(mut self, timeout: u32) -> Self {
325        self.config.timeout_disconnection = timeout;
326        self
327    }
328
329    /// Set the post-stop timeout in seconds.
330    #[must_use]
331    pub const fn with_timeout_post_stop(mut self, timeout: u32) -> Self {
332        self.config.timeout_post_stop = timeout;
333        self
334    }
335
336    /// Set the shutdown timeout in seconds.
337    #[must_use]
338    pub const fn with_timeout_shutdown(mut self, timeout: u32) -> Self {
339        self.config.timeout_shutdown = timeout;
340        self
341    }
342
343    /// Adds a data client with both factory and configuration.
344    ///
345    /// # Errors
346    ///
347    /// Returns an error if a client with the same name is already registered.
348    pub fn add_data_client<F, C>(
349        mut self,
350        name: Option<String>,
351        factory: F,
352        config: C,
353    ) -> anyhow::Result<Self>
354    where
355        F: DataClientFactory + 'static,
356        C: ClientConfig + 'static,
357    {
358        let name = name.unwrap_or_else(|| factory.name().to_string());
359
360        if self.data_client_factories.contains_key(&name) {
361            anyhow::bail!("Data client '{name}' is already registered");
362        }
363
364        self.data_client_factories
365            .insert(name.clone(), Box::new(factory));
366        self.data_client_configs.insert(name, Box::new(config));
367        Ok(self)
368    }
369
370    /// Adds an execution client with both factory and configuration.
371    ///
372    /// # Errors
373    ///
374    /// Returns an error if a client with the same name is already registered.
375    pub fn add_exec_client<F, C>(
376        mut self,
377        name: Option<String>,
378        factory: F,
379        config: C,
380    ) -> anyhow::Result<Self>
381    where
382        F: ExecutionClientFactory + 'static,
383        C: ClientConfig + 'static,
384    {
385        let name = name.unwrap_or_else(|| factory.name().to_string());
386
387        if self.exec_client_factories.contains_key(&name) {
388            anyhow::bail!("Execution client '{name}' is already registered");
389        }
390
391        self.exec_client_factories
392            .insert(name.clone(), Box::new(factory));
393        self.exec_client_configs.insert(name, Box::new(config));
394        Ok(self)
395    }
396
397    /// Build the [`LiveNode`] with the configured settings.
398    ///
399    /// This will:
400    /// 1. Build the underlying kernel.
401    /// 2. Register all client factories.
402    /// 3. Create and register all clients.
403    ///
404    /// # Errors
405    ///
406    /// Returns an error if node construction fails.
407    pub fn build(mut self) -> anyhow::Result<LiveNode> {
408        log::info!(
409            "Building LiveNode with {} data clients",
410            self.data_client_factories.len()
411        );
412
413        let runner = AsyncRunner::new();
414        let clock = Rc::new(RefCell::new(LiveClock::default()));
415        let kernel = PoseiKernel::new("LiveNode".to_string(), self.config.clone())?;
416
417        // Create and register data clients
418        for (name, factory) in self.data_client_factories {
419            if let Some(config) = self.data_client_configs.remove(&name) {
420                log::info!("Creating data client '{name}'");
421
422                let client =
423                    factory.create(&name, config.as_ref(), kernel.cache(), kernel.clock())?;
424
425                log::info!("Registering data client '{name}' with data engine");
426
427                let client_id = client.client_id();
428                let venue = client.venue();
429                let adapter = DataClientAdapter::new(
430                    client_id, venue, true, // handles_order_book_deltas
431                    true, // handles_order_book_snapshots
432                    client,
433                );
434
435                kernel
436                    .data_engine
437                    .borrow_mut()
438                    .register_client(adapter, venue);
439
440                log::info!("Successfully registered data client '{name}' ({client_id})");
441            } else {
442                log::warn!("No config found for data client factory '{name}'");
443            }
444        }
445
446        // Create and register execution clients
447        for (name, factory) in self.exec_client_factories {
448            if let Some(config) = self.exec_client_configs.remove(&name) {
449                log::info!("Creating execution client '{name}'");
450
451                let client =
452                    factory.create(&name, config.as_ref(), kernel.cache(), kernel.clock())?;
453
454                log::info!("Registering execution client '{name}' with execution engine");
455
456                // TODO: Implement when ExecutionEngine has a register_client method
457                // kernel.exec_engine().register_client(client);
458            } else {
459                log::warn!("No config found for execution client factory '{name}'");
460            }
461        }
462
463        log::info!("LiveNode built successfully");
464
465        Ok(LiveNode {
466            clock,
467            kernel,
468            runner,
469            config: self.config,
470            is_running: false,
471        })
472    }
473}
474
475////////////////////////////////////////////////////////////////////////////////
476// Tests
477////////////////////////////////////////////////////////////////////////////////
478
479#[cfg(test)]
480mod tests {
481    use nautilus_model::identifiers::TraderId;
482    use rstest::*;
483
484    use super::*;
485
486    #[rstest]
487    fn test_trading_node_builder_creation() {
488        let result = LiveNode::builder(
489            "TestNode".to_string(),
490            TraderId::from("TRADER-001"),
491            Environment::Sandbox,
492        );
493
494        assert!(result.is_ok());
495    }
496
497    #[rstest]
498    fn test_trading_node_builder_rejects_backtest() {
499        let result = LiveNode::builder(
500            "TestNode".to_string(),
501            TraderId::from("TRADER-001"),
502            Environment::Backtest,
503        );
504
505        assert!(result.is_err());
506        assert!(
507            result
508                .unwrap_err()
509                .to_string()
510                .contains("Backtest environment")
511        );
512    }
513
514    #[rstest]
515    fn test_trading_node_builder_fluent_api() {
516        let result = LiveNode::builder(
517            "TestNode".to_string(),
518            TraderId::from("TRADER-001"),
519            Environment::Live,
520        );
521
522        assert!(result.is_ok());
523        let _builder = result
524            .unwrap()
525            .with_timeout_connection(30)
526            .with_load_state(false);
527
528        // Should not panic and methods should chain
529    }
530
531    #[rstest]
532    fn test_trading_node_build() {
533        #[cfg(feature = "python")]
534        pyo3::prepare_freethreaded_python();
535
536        let builder_result = LiveNode::builder(
537            "TestNode".to_string(),
538            TraderId::from("TRADER-001"),
539            Environment::Sandbox,
540        );
541
542        assert!(builder_result.is_ok());
543        let build_result = builder_result.unwrap().build();
544
545        assert!(build_result.is_ok());
546        let node = build_result.unwrap();
547        assert!(!node.is_running());
548        assert_eq!(node.environment(), Environment::Sandbox);
549    }
550
551    #[rstest]
552    fn test_builder_rejects_backtest_environment() {
553        let result = LiveNode::builder(
554            "TestNode".to_string(),
555            TraderId::from("TRADER-001"),
556            Environment::Backtest,
557        );
558
559        assert!(result.is_err());
560        assert!(
561            result
562                .unwrap_err()
563                .to_string()
564                .contains("Backtest environment")
565        );
566    }
567}