1#![allow(dead_code)]
18#![allow(unused_variables)]
19
20use std::{cell::RefCell, collections::HashMap, rc::Rc};
21
22use nautilus_common::{clock::LiveClock, component::Component, enums::Environment};
23use nautilus_core::UUID4;
24use nautilus_model::identifiers::TraderId;
25use nautilus_system::{
26 config::PoseiKernelConfig,
27 factories::{ClientConfig, DataClientFactory, ExecutionClientFactory},
28 kernel::PoseiKernel,
29};
30
31use crate::{config::LiveNodeConfig, runner::AsyncRunner};
32
33#[derive(Debug)]
38pub struct LiveNode {
39 clock: Rc<RefCell<LiveClock>>,
40 kernel: PoseiKernel,
41 runner: AsyncRunner,
42 config: LiveNodeConfig,
43 is_running: bool,
44}
45
46impl LiveNode {
47 pub fn builder(
53 name: String,
54 trader_id: TraderId,
55 environment: Environment,
56 ) -> anyhow::Result<LiveNodeBuilder> {
57 LiveNodeBuilder::new(name, trader_id, environment)
58 }
59
60 pub fn build(name: String, config: Option<LiveNodeConfig>) -> anyhow::Result<Self> {
70 let mut config = config.unwrap_or_default();
71 config.environment = Environment::Live;
72
73 match config.environment() {
75 Environment::Sandbox | Environment::Live => {}
76 Environment::Backtest => {
77 anyhow::bail!("LiveNode cannot be used with Backtest environment");
78 }
79 }
80
81 let clock = Rc::new(RefCell::new(LiveClock::new()));
82 let kernel = PoseiKernel::new(name, config.clone())?;
83 let runner = AsyncRunner::new(clock.clone());
84
85 log::info!("LiveNode built successfully with kernel config");
86
87 Ok(Self {
88 clock,
89 kernel,
90 runner,
91 config,
92 is_running: false,
93 })
94 }
95
96 pub async fn start(&mut self) -> anyhow::Result<()> {
102 if self.is_running {
103 anyhow::bail!("LiveNode is already running");
104 }
105
106 log::info!("Starting live node");
107 self.kernel.start();
108 self.is_running = true;
109
110 log::info!("LiveNode started successfully");
111 Ok(())
112 }
113
114 pub async fn stop(&mut self) -> anyhow::Result<()> {
120 if !self.is_running {
121 anyhow::bail!("LiveNode is not running");
122 }
123
124 log::info!("Stopping live node");
125 self.kernel.stop();
126 self.is_running = false;
127
128 log::info!("LiveNode stopped successfully");
129 Ok(())
130 }
131
132 pub async fn run_async(&mut self) -> anyhow::Result<()> {
141 self.start().await?;
142
143 let sigint = tokio::signal::ctrl_c();
145
146 tokio::select! {
147 _ = sigint => {
148 log::info!("Received SIGINT, shutting down...");
149 }
150 }
151
152 self.stop().await?;
153 Ok(())
154 }
155
156 #[must_use]
158 pub fn environment(&self) -> Environment {
159 self.kernel.environment()
160 }
161
162 #[must_use]
164 pub const fn kernel(&self) -> &PoseiKernel {
165 &self.kernel
166 }
167
168 #[must_use]
170 pub fn trader_id(&self) -> TraderId {
171 self.kernel.trader_id()
172 }
173
174 #[must_use]
176 pub const fn instance_id(&self) -> UUID4 {
177 self.kernel.instance_id()
178 }
179
180 #[must_use]
182 pub const fn is_running(&self) -> bool {
183 self.is_running
184 }
185
186 pub fn add_actor(&mut self, actor: Box<dyn Component>) -> anyhow::Result<()> {
199 if self.is_running {
200 anyhow::bail!(
201 "Cannot add actor while node is running. Add actors before calling start()."
202 );
203 }
204
205 self.kernel.trader.add_actor(actor)
206 }
207}
208
209#[derive(Debug)]
214pub struct LiveNodeBuilder {
215 config: LiveNodeConfig,
216 data_client_factories: HashMap<String, Box<dyn DataClientFactory>>,
217 exec_client_factories: HashMap<String, Box<dyn ExecutionClientFactory>>,
218 data_client_configs: HashMap<String, Box<dyn ClientConfig>>,
219 exec_client_configs: HashMap<String, Box<dyn ClientConfig>>,
220}
221
222impl LiveNodeBuilder {
223 pub fn new(
229 name: String,
230 trader_id: TraderId,
231 environment: Environment,
232 ) -> anyhow::Result<Self> {
233 match environment {
234 Environment::Sandbox | Environment::Live => {}
235 Environment::Backtest => {
236 anyhow::bail!("LiveNode cannot be used with Backtest environment");
237 }
238 }
239
240 let config = LiveNodeConfig {
241 environment,
242 trader_id,
243 ..Default::default()
244 };
245
246 Ok(Self {
247 config,
248 data_client_factories: HashMap::new(),
249 exec_client_factories: HashMap::new(),
250 data_client_configs: HashMap::new(),
251 exec_client_configs: HashMap::new(),
252 })
253 }
254
255 #[must_use]
257 pub const fn with_instance_id(mut self, instance_id: UUID4) -> Self {
258 self.config.instance_id = Some(instance_id);
259 self
260 }
261
262 #[must_use]
264 pub const fn with_load_state(mut self, load_state: bool) -> Self {
265 self.config.load_state = load_state;
266 self
267 }
268
269 #[must_use]
271 pub const fn with_save_state(mut self, save_state: bool) -> Self {
272 self.config.save_state = save_state;
273 self
274 }
275
276 #[must_use]
278 pub const fn with_timeout_connection(mut self, timeout: u32) -> Self {
279 self.config.timeout_connection = timeout;
280 self
281 }
282
283 #[must_use]
285 pub const fn with_timeout_reconciliation(mut self, timeout: u32) -> Self {
286 self.config.timeout_reconciliation = timeout;
287 self
288 }
289
290 #[must_use]
292 pub const fn with_timeout_portfolio(mut self, timeout: u32) -> Self {
293 self.config.timeout_portfolio = timeout;
294 self
295 }
296
297 #[must_use]
299 pub const fn with_timeout_disconnection(mut self, timeout: u32) -> Self {
300 self.config.timeout_disconnection = timeout;
301 self
302 }
303
304 #[must_use]
306 pub const fn with_timeout_post_stop(mut self, timeout: u32) -> Self {
307 self.config.timeout_post_stop = timeout;
308 self
309 }
310
311 #[must_use]
313 pub const fn with_timeout_shutdown(mut self, timeout: u32) -> Self {
314 self.config.timeout_shutdown = timeout;
315 self
316 }
317
318 pub fn add_data_client(
324 mut self,
325 name: Option<String>,
326 factory: Box<dyn DataClientFactory>,
327 config: Box<dyn ClientConfig>,
328 ) -> anyhow::Result<Self> {
329 let name = name.unwrap_or_else(|| factory.name().to_string());
330
331 if self.data_client_factories.contains_key(&name) {
332 anyhow::bail!("Data client '{name}' is already registered");
333 }
334
335 self.data_client_factories.insert(name.clone(), factory);
336 self.data_client_configs.insert(name, config);
337 Ok(self)
338 }
339
340 pub fn add_exec_client(
346 mut self,
347 name: Option<String>,
348 factory: Box<dyn ExecutionClientFactory>,
349 config: Box<dyn ClientConfig>,
350 ) -> anyhow::Result<Self> {
351 let name = name.unwrap_or_else(|| factory.name().to_string());
352
353 if self.exec_client_factories.contains_key(&name) {
354 anyhow::bail!("Execution client '{name}' is already registered");
355 }
356
357 self.exec_client_factories.insert(name.clone(), factory);
358 self.exec_client_configs.insert(name, config);
359 Ok(self)
360 }
361
362 pub fn build(self) -> anyhow::Result<LiveNode> {
373 log::info!("LiveNode built successfully");
380
381 let clock = Rc::new(RefCell::new(LiveClock::new()));
383 let kernel = PoseiKernel::new("LiveNode".to_string(), self.config.clone())?;
384 let runner = AsyncRunner::new(clock.clone());
385
386 Ok(LiveNode {
387 clock,
388 kernel,
389 runner,
390 config: self.config,
391 is_running: false,
392 })
393 }
394}
395
#[cfg(test)]
mod tests {
    use nautilus_model::identifiers::TraderId;
    use rstest::*;

    use super::*;

    /// A builder can be created for the sandbox environment.
    #[rstest]
    fn test_trading_node_builder_creation() {
        let result = LiveNode::builder(
            "TestNode".to_string(),
            TraderId::from("TRADER-001"),
            Environment::Sandbox,
        );

        assert!(result.is_ok());
    }

    /// `LiveNode::builder` refuses the backtest environment with a descriptive error.
    #[rstest]
    fn test_trading_node_builder_rejects_backtest() {
        let result = LiveNode::builder(
            "TestNode".to_string(),
            TraderId::from("TRADER-001"),
            Environment::Backtest,
        );

        assert!(result.is_err());
        assert!(
            result
                .unwrap_err()
                .to_string()
                .contains("Backtest environment")
        );
    }

    /// Chained setters compose and actually land in the builder's configuration.
    #[rstest]
    fn test_trading_node_builder_fluent_api() {
        let result = LiveNode::builder(
            "TestNode".to_string(),
            TraderId::from("TRADER-001"),
            Environment::Live,
        );

        assert!(result.is_ok());
        let builder = result
            .unwrap()
            .with_timeout_connection(30)
            .with_load_state(false);

        assert_eq!(builder.config.timeout_connection, 30);
        assert!(!builder.config.load_state);
    }

    /// A built node starts out stopped and reports the requested environment.
    #[rstest]
    fn test_trading_node_build() {
        #[cfg(feature = "python")]
        pyo3::prepare_freethreaded_python();

        let builder_result = LiveNode::builder(
            "TestNode".to_string(),
            TraderId::from("TRADER-001"),
            Environment::Sandbox,
        );

        assert!(builder_result.is_ok());
        let build_result = builder_result.unwrap().build();

        assert!(build_result.is_ok());
        let node = build_result.unwrap();
        assert!(!node.is_running());
        assert_eq!(node.environment(), Environment::Sandbox);
    }

    /// `LiveNodeBuilder::new`, called directly, also refuses the backtest environment.
    #[rstest]
    fn test_builder_rejects_backtest_environment() {
        let result = LiveNodeBuilder::new(
            "TestNode".to_string(),
            TraderId::from("TRADER-001"),
            Environment::Backtest,
        );

        assert!(result.is_err());
        assert!(
            result
                .unwrap_err()
                .to_string()
                .contains("Backtest environment")
        );
    }
}