1#![allow(dead_code)]
18#![allow(unused_variables)]
19
20use std::{cell::RefCell, collections::HashMap, rc::Rc};
21
22use nautilus_common::{
23 actor::{Actor, DataActor},
24 clock::LiveClock,
25 component::Component,
26 enums::Environment,
27};
28use nautilus_core::UUID4;
29use posei_trader::client::DataClientAdapter;
30use nautilus_model::identifiers::TraderId;
31use nautilus_system::{
32 config::PoseiKernelConfig,
33 factories::{ClientConfig, DataClientFactory, ExecutionClientFactory},
34 kernel::PoseiKernel,
35};
36
37use crate::{config::LiveNodeConfig, runner::AsyncRunner};
38
/// A live node that bundles a [`PoseiKernel`] with the [`AsyncRunner`] that drives it.
///
/// Instances are created through [`LiveNode::build`] or a [`LiveNodeBuilder`].
#[derive(Debug)]
pub struct LiveNode {
    /// Live clock created with `LiveClock::default()` when the node is built.
    clock: Rc<RefCell<LiveClock>>,
    /// Kernel that the lifecycle methods and accessors delegate to.
    kernel: PoseiKernel,
    /// Runner awaited by `run` until it finishes or a shutdown signal arrives.
    runner: AsyncRunner,
    /// Configuration the kernel was constructed from (a clone was handed to the kernel).
    config: LiveNodeConfig,
    /// `true` between a successful `start` and the matching `stop`.
    is_running: bool,
}
51
52impl LiveNode {
53 pub fn builder(
59 name: String,
60 trader_id: TraderId,
61 environment: Environment,
62 ) -> anyhow::Result<LiveNodeBuilder> {
63 LiveNodeBuilder::new(name, trader_id, environment)
64 }
65
66 pub fn build(name: String, config: Option<LiveNodeConfig>) -> anyhow::Result<Self> {
76 let mut config = config.unwrap_or_default();
77 config.environment = Environment::Live;
78
79 match config.environment() {
81 Environment::Sandbox | Environment::Live => {}
82 Environment::Backtest => {
83 anyhow::bail!("LiveNode cannot be used with Backtest environment");
84 }
85 }
86
87 let runner = AsyncRunner::new();
88 let clock = Rc::new(RefCell::new(LiveClock::default()));
89 let kernel = PoseiKernel::new(name, config.clone())?;
90
91 log::info!("LiveNode built successfully with kernel config");
92
93 Ok(Self {
94 clock,
95 kernel,
96 runner,
97 config,
98 is_running: false,
99 })
100 }
101
102 pub async fn start(&mut self) -> anyhow::Result<()> {
108 if self.is_running {
109 anyhow::bail!("LiveNode is already running");
110 }
111
112 log::info!("Starting LiveNode");
113
114 self.kernel.start_async().await;
115 self.is_running = true;
116
117 log::info!("LiveNode started successfully");
118 Ok(())
119 }
120
121 pub async fn stop(&mut self) -> anyhow::Result<()> {
127 if !self.is_running {
128 anyhow::bail!("LiveNode is not running");
129 }
130
131 log::info!("Stopping LiveNode");
132
133 self.kernel.stop_async().await;
134 self.is_running = false;
135
136 log::info!("LiveNode stopped successfully");
137 Ok(())
138 }
139
140 pub async fn run(&mut self) -> anyhow::Result<()> {
149 self.start().await?;
150
151 tokio::select! {
152 () = self.runner.run() => {
154 log::info!("AsyncRunner finished");
155 }
156 result = tokio::signal::ctrl_c() => {
158 match result {
159 Ok(()) => {
160 log::info!("Received SIGINT, shutting down");
161 self.runner.stop();
162 tokio::time::sleep(tokio::time::Duration::from_millis(100)).await;
164 }
165 Err(e) => {
166 log::error!("Failed to listen for SIGINT: {e}");
167 }
168 }
169 }
170 }
171
172 log::debug!("AsyncRunner and signal handling finished"); self.stop().await?;
175 Ok(())
176 }
177
178 #[must_use]
180 pub fn environment(&self) -> Environment {
181 self.kernel.environment()
182 }
183
184 #[must_use]
186 pub const fn kernel(&self) -> &PoseiKernel {
187 &self.kernel
188 }
189
190 #[must_use]
192 pub fn trader_id(&self) -> TraderId {
193 self.kernel.trader_id()
194 }
195
196 #[must_use]
198 pub const fn instance_id(&self) -> UUID4 {
199 self.kernel.instance_id()
200 }
201
202 #[must_use]
204 pub const fn is_running(&self) -> bool {
205 self.is_running
206 }
207
208 pub fn add_actor<T>(&mut self, actor: T) -> anyhow::Result<()>
221 where
222 T: DataActor + Component + Actor + 'static,
223 {
224 if self.is_running {
225 anyhow::bail!(
226 "Cannot add actor while node is running. Add actors before calling start()."
227 );
228 }
229
230 self.kernel.trader.add_actor(actor)
231 }
232}
233
234#[derive(Debug)]
239pub struct LiveNodeBuilder {
240 config: LiveNodeConfig,
241 data_client_factories: HashMap<String, Box<dyn DataClientFactory>>,
242 exec_client_factories: HashMap<String, Box<dyn ExecutionClientFactory>>,
243 data_client_configs: HashMap<String, Box<dyn ClientConfig>>,
244 exec_client_configs: HashMap<String, Box<dyn ClientConfig>>,
245}
246
247impl LiveNodeBuilder {
248 pub fn new(
254 name: String,
255 trader_id: TraderId,
256 environment: Environment,
257 ) -> anyhow::Result<Self> {
258 match environment {
259 Environment::Sandbox | Environment::Live => {}
260 Environment::Backtest => {
261 anyhow::bail!("LiveNode cannot be used with Backtest environment");
262 }
263 }
264
265 let config = LiveNodeConfig {
266 environment,
267 trader_id,
268 ..Default::default()
269 };
270
271 Ok(Self {
272 config,
273 data_client_factories: HashMap::new(),
274 exec_client_factories: HashMap::new(),
275 data_client_configs: HashMap::new(),
276 exec_client_configs: HashMap::new(),
277 })
278 }
279
280 #[must_use]
282 pub const fn with_instance_id(mut self, instance_id: UUID4) -> Self {
283 self.config.instance_id = Some(instance_id);
284 self
285 }
286
287 #[must_use]
289 pub const fn with_load_state(mut self, load_state: bool) -> Self {
290 self.config.load_state = load_state;
291 self
292 }
293
294 #[must_use]
296 pub const fn with_save_state(mut self, save_state: bool) -> Self {
297 self.config.save_state = save_state;
298 self
299 }
300
301 #[must_use]
303 pub const fn with_timeout_connection(mut self, timeout: u32) -> Self {
304 self.config.timeout_connection = timeout;
305 self
306 }
307
308 #[must_use]
310 pub const fn with_timeout_reconciliation(mut self, timeout: u32) -> Self {
311 self.config.timeout_reconciliation = timeout;
312 self
313 }
314
315 #[must_use]
317 pub const fn with_timeout_portfolio(mut self, timeout: u32) -> Self {
318 self.config.timeout_portfolio = timeout;
319 self
320 }
321
322 #[must_use]
324 pub const fn with_timeout_disconnection(mut self, timeout: u32) -> Self {
325 self.config.timeout_disconnection = timeout;
326 self
327 }
328
329 #[must_use]
331 pub const fn with_timeout_post_stop(mut self, timeout: u32) -> Self {
332 self.config.timeout_post_stop = timeout;
333 self
334 }
335
336 #[must_use]
338 pub const fn with_timeout_shutdown(mut self, timeout: u32) -> Self {
339 self.config.timeout_shutdown = timeout;
340 self
341 }
342
343 pub fn add_data_client<F, C>(
349 mut self,
350 name: Option<String>,
351 factory: F,
352 config: C,
353 ) -> anyhow::Result<Self>
354 where
355 F: DataClientFactory + 'static,
356 C: ClientConfig + 'static,
357 {
358 let name = name.unwrap_or_else(|| factory.name().to_string());
359
360 if self.data_client_factories.contains_key(&name) {
361 anyhow::bail!("Data client '{name}' is already registered");
362 }
363
364 self.data_client_factories
365 .insert(name.clone(), Box::new(factory));
366 self.data_client_configs.insert(name, Box::new(config));
367 Ok(self)
368 }
369
370 pub fn add_exec_client<F, C>(
376 mut self,
377 name: Option<String>,
378 factory: F,
379 config: C,
380 ) -> anyhow::Result<Self>
381 where
382 F: ExecutionClientFactory + 'static,
383 C: ClientConfig + 'static,
384 {
385 let name = name.unwrap_or_else(|| factory.name().to_string());
386
387 if self.exec_client_factories.contains_key(&name) {
388 anyhow::bail!("Execution client '{name}' is already registered");
389 }
390
391 self.exec_client_factories
392 .insert(name.clone(), Box::new(factory));
393 self.exec_client_configs.insert(name, Box::new(config));
394 Ok(self)
395 }
396
397 pub fn build(mut self) -> anyhow::Result<LiveNode> {
408 log::info!(
409 "Building LiveNode with {} data clients",
410 self.data_client_factories.len()
411 );
412
413 let runner = AsyncRunner::new();
414 let clock = Rc::new(RefCell::new(LiveClock::default()));
415 let kernel = PoseiKernel::new("LiveNode".to_string(), self.config.clone())?;
416
417 for (name, factory) in self.data_client_factories {
419 if let Some(config) = self.data_client_configs.remove(&name) {
420 log::info!("Creating data client '{name}'");
421
422 let client =
423 factory.create(&name, config.as_ref(), kernel.cache(), kernel.clock())?;
424
425 log::info!("Registering data client '{name}' with data engine");
426
427 let client_id = client.client_id();
428 let venue = client.venue();
429 let adapter = DataClientAdapter::new(
430 client_id, venue, true, true, client,
433 );
434
435 kernel
436 .data_engine
437 .borrow_mut()
438 .register_client(adapter, venue);
439
440 log::info!("Successfully registered data client '{name}' ({client_id})");
441 } else {
442 log::warn!("No config found for data client factory '{name}'");
443 }
444 }
445
446 for (name, factory) in self.exec_client_factories {
448 if let Some(config) = self.exec_client_configs.remove(&name) {
449 log::info!("Creating execution client '{name}'");
450
451 let client =
452 factory.create(&name, config.as_ref(), kernel.cache(), kernel.clock())?;
453
454 log::info!("Registering execution client '{name}' with execution engine");
455
456 } else {
459 log::warn!("No config found for execution client factory '{name}'");
460 }
461 }
462
463 log::info!("LiveNode built successfully");
464
465 Ok(LiveNode {
466 clock,
467 kernel,
468 runner,
469 config: self.config,
470 is_running: false,
471 })
472 }
473}
474
#[cfg(test)]
mod tests {
    use nautilus_model::identifiers::TraderId;
    use rstest::*;

    use super::*;

    /// Requests a builder for node `TestNode` / trader `TRADER-001` in `environment`.
    fn builder_for(environment: Environment) -> anyhow::Result<LiveNodeBuilder> {
        LiveNode::builder(
            "TestNode".to_string(),
            TraderId::from("TRADER-001"),
            environment,
        )
    }

    /// A sandbox builder can be created.
    #[rstest]
    fn test_trading_node_builder_creation() {
        let outcome = builder_for(Environment::Sandbox);

        assert!(outcome.is_ok());
    }

    /// The backtest environment is refused with a descriptive message.
    #[rstest]
    fn test_trading_node_builder_rejects_backtest() {
        let outcome = builder_for(Environment::Backtest);

        assert!(outcome.is_err());
        let message = outcome.unwrap_err().to_string();
        assert!(message.contains("Backtest environment"));
    }

    /// Setter calls can be chained on a freshly created builder.
    #[rstest]
    fn test_trading_node_builder_fluent_api() {
        let outcome = builder_for(Environment::Live);

        assert!(outcome.is_ok());
        let _builder = outcome
            .unwrap()
            .with_timeout_connection(30)
            .with_load_state(false);
    }

    /// A sandbox builder produces a stopped node that reports the sandbox environment.
    #[rstest]
    fn test_trading_node_build() {
        #[cfg(feature = "python")]
        pyo3::prepare_freethreaded_python();

        let builder_outcome = builder_for(Environment::Sandbox);
        assert!(builder_outcome.is_ok());

        let node_outcome = builder_outcome.unwrap().build();
        assert!(node_outcome.is_ok());

        let node = node_outcome.unwrap();
        assert!(!node.is_running());
        assert_eq!(node.environment(), Environment::Sandbox);
    }

    /// Same check as `test_trading_node_builder_rejects_backtest`, kept under its own name.
    #[rstest]
    fn test_builder_rejects_backtest_environment() {
        let outcome = builder_for(Environment::Backtest);

        assert!(outcome.is_err());
        let message = outcome.unwrap_err().to_string();
        assert!(message.contains("Backtest environment"));
    }
}