1#![allow(dead_code)]
18#![allow(unused_variables)]
19
20use std::{cell::RefCell, rc::Rc};
21
22use futures::future::join_all;
23use nautilus_common::{
24 cache::{Cache, CacheConfig, database::CacheDatabaseAdapter},
25 clock::{Clock, LiveClock, TestClock},
26 component::Component,
27 enums::Environment,
28 logging::{
29 headers, init_logging, init_tracing,
30 logger::{LogGuard, LoggerConfig},
31 writer::FileWriterConfig,
32 },
33 msgbus::{MessageBus, get_message_bus, set_message_bus},
34 runtime::get_runtime,
35};
36use nautilus_core::{UUID4, UnixNanos};
37use posei_trader::engine::DataEngine;
38use nautilus_execution::engine::ExecutionEngine;
39use nautilus_model::identifiers::TraderId;
40use nautilus_portfolio::portfolio::Portfolio;
41use nautilus_risk::engine::RiskEngine;
42use nautilus_trading::trader::Trader;
43use ustr::Ustr;
44
45use crate::{builder::PoseiKernelBuilder, config::PoseiKernelConfig};
46
/// Core system kernel that owns the components assembled by [`PoseiKernel::new`].
///
/// Holds the shared cache and clock handles, the portfolio, the data, risk and
/// execution engines, the trader, and the kernel's lifecycle timestamps.
#[derive(Debug)]
pub struct PoseiKernel {
    /// Name given to the kernel at construction.
    pub name: String,
    /// Instance identifier taken from the configuration, or the default value when none is set.
    pub instance_id: UUID4,
    /// Machine identifier; `new` fills this from the host name.
    pub machine_id: String,
    /// Boxed configuration the kernel was built from.
    pub config: Box<dyn PoseiKernelConfig>,
    /// Cache handle; clones of it are passed to the portfolio and the engines.
    pub cache: Rc<RefCell<Cache>>,
    /// Clock handle; clones of it are passed to the portfolio, the engines and the trader.
    pub clock: Rc<RefCell<dyn Clock>>,
    /// Portfolio built over the shared cache and clock.
    pub portfolio: Portfolio,
    /// Guard returned by logging initialization, held for the kernel's lifetime.
    // NOTE(review): presumably dropping this guard shuts logging down — confirm.
    pub log_guard: LogGuard,
    /// Data engine instance.
    pub data_engine: DataEngine,
    /// Risk engine instance.
    pub risk_engine: RiskEngine,
    /// Execution engine instance.
    pub exec_engine: ExecutionEngine,
    /// Trader instance.
    pub trader: Trader,
    /// Clock timestamp taken when the kernel was constructed.
    pub ts_created: UnixNanos,
    /// Clock timestamp recorded by `start`; `None` before the first start and after `reset`.
    pub ts_started: Option<UnixNanos>,
    /// Clock timestamp recorded by `stop`; `None` before the first stop and after `reset`.
    pub ts_shutdown: Option<UnixNanos>,
}
83
84impl PoseiKernel {
    /// Returns a [`PoseiKernelBuilder`] created from the given arguments.
    ///
    /// `name`, `trader_id` and `environment` are forwarded unchanged to
    /// `PoseiKernelBuilder::new`.
    #[must_use]
    pub const fn builder(
        name: String,
        trader_id: TraderId,
        environment: nautilus_common::enums::Environment,
    ) -> PoseiKernelBuilder {
        PoseiKernelBuilder::new(name, trader_id, environment)
    }
94
95 pub fn new<T: PoseiKernelConfig + 'static>(name: String, config: T) -> anyhow::Result<Self> {
101 let instance_id = config.instance_id().unwrap_or_default();
102 let machine_id = Self::determine_machine_id()?;
103
104 let logger_config = config.logging();
105 let log_guard = Self::initialize_logging(config.trader_id(), instance_id, logger_config)?;
106 headers::log_header(
107 config.trader_id(),
108 &machine_id,
109 instance_id,
110 Ustr::from(stringify!(LiveNode)),
111 );
112
113 log::info!("Building system kernel");
114
115 let clock = Self::initialize_clock(&config.environment());
116 let cache = Self::initialize_cache(config.cache());
117
118 let msgbus = Rc::new(RefCell::new(MessageBus::new(
119 config.trader_id(),
120 instance_id,
121 Some(name.to_string()),
122 None,
123 )));
124 set_message_bus(msgbus);
125
126 let portfolio = Portfolio::new(cache.clone(), clock.clone(), config.portfolio());
127 let data_engine = DataEngine::new(clock.clone(), cache.clone(), config.data_engine());
128 let risk_engine = RiskEngine::new(
129 config.risk_engine().unwrap_or_default(),
130 Portfolio::new(cache.clone(), clock.clone(), config.portfolio()),
131 clock.clone(),
132 cache.clone(),
133 );
134 let exec_engine = ExecutionEngine::new(clock.clone(), cache.clone(), config.exec_engine());
135
136 let trader = Trader::new(
137 config.trader_id(),
138 instance_id,
139 config.environment(),
140 clock.clone(),
141 );
142
143 let ts_created = clock.borrow().timestamp_ns();
144
145 Ok(Self {
146 name,
147 instance_id,
148 machine_id,
149 config: Box::new(config),
150 cache,
151 clock,
152 portfolio,
153 log_guard,
154 data_engine,
155 risk_engine,
156 exec_engine,
157 trader,
158 ts_created,
159 ts_started: None,
160 ts_shutdown: None,
161 })
162 }
163
164 fn determine_machine_id() -> anyhow::Result<String> {
165 Ok(hostname::get()?.to_string_lossy().into_owned())
166 }
167
168 fn initialize_logging(
169 trader_id: TraderId,
170 instance_id: UUID4,
171 config: LoggerConfig,
172 ) -> anyhow::Result<LogGuard> {
173 init_tracing()?;
174
175 let log_guard = init_logging(
176 trader_id,
177 instance_id,
178 config,
179 FileWriterConfig::default(), )?;
181
182 Ok(log_guard)
183 }
184
185 fn initialize_clock(environment: &Environment) -> Rc<RefCell<dyn Clock>> {
186 match environment {
187 Environment::Backtest => {
188 let test_clock = TestClock::new();
189 Rc::new(RefCell::new(test_clock))
190 }
191 Environment::Live | Environment::Sandbox => {
192 let live_clock = LiveClock::new();
193 Rc::new(RefCell::new(live_clock))
194 }
195 }
196 }
197
198 fn initialize_cache(cache_config: Option<CacheConfig>) -> Rc<RefCell<Cache>> {
199 let cache_config = cache_config.unwrap_or_default();
200
201 let cache_database: Option<Box<dyn CacheDatabaseAdapter>> = None;
203 let cache = Cache::new(Some(cache_config), cache_database);
204
205 Rc::new(RefCell::new(cache))
206 }
207
208 fn cancel_timers(&self) {
209 self.clock.borrow_mut().cancel_timers();
210 }
211
212 #[must_use]
213 pub fn generate_timestamp_ns(&self) -> UnixNanos {
214 self.clock.borrow().timestamp_ns()
215 }
216
    /// Returns the environment reported by the kernel configuration.
    #[must_use]
    pub fn environment(&self) -> Environment {
        self.config.environment()
    }
222
    /// Returns the kernel name as a string slice.
    #[must_use]
    pub const fn name(&self) -> &str {
        self.name.as_str()
    }
228
    /// Returns the trader identifier reported by the kernel configuration.
    #[must_use]
    pub fn trader_id(&self) -> TraderId {
        self.config.trader_id()
    }
234
235 #[must_use]
237 pub fn machine_id(&self) -> &str {
238 &self.machine_id
239 }
240
    /// Returns the kernel's instance identifier.
    #[must_use]
    pub const fn instance_id(&self) -> UUID4 {
        self.instance_id
    }
246
    /// Returns the timestamp recorded when the kernel was constructed.
    #[must_use]
    pub const fn ts_created(&self) -> UnixNanos {
        self.ts_created
    }
252
    /// Returns the timestamp recorded by the last `start`, or `None` if no
    /// start timestamp is currently set.
    #[must_use]
    pub const fn ts_started(&self) -> Option<UnixNanos> {
        self.ts_started
    }
258
    /// Returns the timestamp recorded by the last `stop`, or `None` if no
    /// shutdown timestamp is currently set.
    #[must_use]
    pub const fn ts_shutdown(&self) -> Option<UnixNanos> {
        self.ts_shutdown
    }
264
    /// Returns the `load_state` flag reported by the kernel configuration.
    #[must_use]
    pub fn load_state(&self) -> bool {
        self.config.load_state()
    }
270
    /// Returns the `save_state` flag reported by the kernel configuration.
    #[must_use]
    pub fn save_state(&self) -> bool {
        self.config.save_state()
    }
276
277 #[must_use]
279 pub fn clock(&self) -> Rc<RefCell<dyn Clock>> {
280 self.clock.clone()
281 }
282
283 #[must_use]
285 pub fn cache(&self) -> Rc<RefCell<Cache>> {
286 self.cache.clone()
287 }
288
    /// Returns the message bus handle obtained from `get_message_bus`.
    ///
    /// The bus is not a field of the kernel; this simply forwards to the
    /// accessor function.
    #[must_use]
    pub fn msgbus(&self) -> Rc<RefCell<MessageBus>> {
        get_message_bus()
    }
294
    /// Returns a shared reference to the kernel's portfolio.
    #[must_use]
    pub const fn portfolio(&self) -> &Portfolio {
        &self.portfolio
    }
300
    /// Returns a shared reference to the kernel's data engine.
    #[must_use]
    pub const fn data_engine(&self) -> &DataEngine {
        &self.data_engine
    }
306
    /// Returns a shared reference to the kernel's risk engine.
    #[must_use]
    pub const fn risk_engine(&self) -> &RiskEngine {
        &self.risk_engine
    }
312
    /// Returns a shared reference to the kernel's execution engine.
    #[must_use]
    pub const fn exec_engine(&self) -> &ExecutionEngine {
        &self.exec_engine
    }
318
    /// Returns a shared reference to the kernel's trader.
    #[must_use]
    pub const fn trader(&self) -> &Trader {
        &self.trader
    }
324
325 pub fn start(&mut self) {
327 self.start_engines();
328
329 if let Err(e) = self.trader.initialize() {
331 log::error!("Error initializing trader: {e:?}");
332 return;
333 }
334
335 if let Err(e) = self.trader.start() {
337 log::error!("Error starting trader: {e:?}");
338 }
339
340 tokio::task::block_in_place(|| {
342 get_runtime().block_on(async {
343 if let Err(e) = self.connect_clients().await {
344 log::error!("Error connecting clients: {e:?}");
345 }
346 });
347 });
348
349 self.ts_started = Some(self.clock.borrow().timestamp_ns());
350 log::info!("Posei system kernel started");
351 }
352
353 pub fn stop(&mut self) {
355 log::info!("Stopping Posei system kernel");
356
357 if let Err(e) = self.trader.stop() {
359 log::error!("Error stopping trader: {e:?}");
360 }
361
362 tokio::task::block_in_place(|| {
364 get_runtime().block_on(async {
365 if let Err(e) = self.disconnect_clients().await {
366 log::error!("Error disconnecting clients: {e:?}");
367 }
368 });
369 });
370
371 self.stop_engines();
372 self.cancel_timers();
373
374 self.ts_shutdown = Some(self.clock.borrow().timestamp_ns());
375 log::info!("Posei system kernel stopped");
376 }
377
378 pub fn reset(&mut self) {
380 if let Err(e) = self.trader.reset() {
381 log::error!("Error resetting trader: {e:?}");
382 }
383
384 self.data_engine.reset();
386 self.ts_started = None;
389 self.ts_shutdown = None;
390
391 log::info!("Posei system kernel reset");
392 }
393
394 pub fn dispose(&mut self) {
396 if let Err(e) = self.trader.dispose() {
397 log::error!("Error disposing trader: {e:?}");
398 }
399
400 self.stop_engines();
401
402 self.data_engine.dispose();
403 log::info!("Posei system kernel disposed");
406 }
407
    /// Placeholder with an empty body; calling it currently has no effect.
    // NOTE(review): by its name this is meant to cancel outstanding tasks — not implemented yet.
    const fn cancel_all_tasks(&self) {
    }
414
    /// Starts the data engine.
    ///
    /// Only the data engine is started here; the risk and execution engines
    /// are not touched by this method.
    fn start_engines(&self) {
        self.data_engine.start();
    }
420
    /// Stops the data engine.
    ///
    /// Only the data engine is stopped here; the risk and execution engines
    /// are not touched by this method.
    fn stop_engines(&self) {
        self.data_engine.stop();
    }
426
427 async fn connect_clients(&mut self) -> Result<(), Vec<anyhow::Error>> {
429 let data_adapters = self.data_engine.get_clients_mut();
430 let mut futures = Vec::with_capacity(data_adapters.len());
431
432 for adapter in data_adapters {
433 futures.push(adapter.get_client().connect());
434 }
435
436 let results = join_all(futures).await;
437 let errors: Vec<anyhow::Error> = results.into_iter().filter_map(Result::err).collect();
438
439 if errors.is_empty() {
440 Ok(())
441 } else {
442 Err(errors)
443 }
444 }
445
446 async fn disconnect_clients(&mut self) -> Result<(), Vec<anyhow::Error>> {
448 let data_adapters = self.data_engine.get_clients_mut();
449 let mut futures = Vec::with_capacity(data_adapters.len());
450
451 for adapter in data_adapters {
452 futures.push(adapter.get_client().disconnect());
453 }
454
455 let results = join_all(futures).await;
456 let errors: Vec<anyhow::Error> = results.into_iter().filter_map(Result::err).collect();
457
458 if errors.is_empty() {
459 Ok(())
460 } else {
461 Err(errors)
462 }
463 }
464
    /// Calls `stop` on the data engine.
    ///
    /// No client-specific call is made here; the body is the same as
    /// `stop_engines`.
    fn stop_clients(&self) {
        self.data_engine.stop();
    }
469
    /// Placeholder with an empty body; calling it currently has no effect.
    // NOTE(review): by its name this is meant to initialize the portfolio — not implemented yet.
    const fn initialize_portfolio(&self) {
    }
474
    /// Placeholder with an empty body; calling it currently has no effect.
    // NOTE(review): by its name this is meant to wait for engine connections — not implemented yet.
    const fn await_engines_connected(&self) {
    }
481
    /// Placeholder with an empty body; calling it currently has no effect.
    // NOTE(review): by its name this is meant to wait for execution reconciliation — not implemented yet.
    const fn await_execution_reconciliation(&self) {
    }
488
    /// Placeholder with an empty body; calling it currently has no effect.
    // NOTE(review): by its name this is meant to wait for portfolio initialization — not implemented yet.
    const fn await_portfolio_initialized(&self) {
    }
495
    /// Placeholder with an empty body; calling it currently has no effect.
    // NOTE(review): by its name this is meant to wait for trader residuals — not implemented yet.
    const fn await_trader_residuals(&self) {
    }
502
    /// Placeholder with an empty body; calling it currently has no effect.
    // NOTE(review): by its name this is meant to check that engines are connected — not implemented yet.
    const fn check_engines_connected(&self) {
    }
507
    /// Placeholder with an empty body; calling it currently has no effect.
    // NOTE(review): by its name this is meant to check that engines are disconnected — not implemented yet.
    const fn check_engines_disconnected(&self) {
    }
512
    /// Placeholder with an empty body; calling it currently has no effect.
    // NOTE(review): by its name this is meant to check that the portfolio is initialized — not implemented yet.
    const fn check_portfolio_initialized(&self) {
    }
517
    /// Placeholder with an empty body; calling it currently has no effect.
    // NOTE(review): by its name this is meant to flush a writer — not implemented yet.
    const fn flush_writer(&self) {
    }
522}