1#![allow(dead_code)]
18#![allow(unused_variables)]
19
20use std::{
21 any::Any,
22 cell::{Ref, RefCell},
23 rc::Rc,
24};
25
26use futures::future::join_all;
27use nautilus_common::{
28 cache::{Cache, CacheConfig, database::CacheDatabaseAdapter},
29 clock::{Clock, LiveClock, TestClock},
30 component::Component,
31 enums::Environment,
32 logging::{
33 headers, init_logging, init_tracing,
34 logger::{LogGuard, LoggerConfig},
35 writer::FileWriterConfig,
36 },
37 messages::{DataResponse, data::DataCommand},
38 msgbus::{
39 self, MessageBus, get_message_bus,
40 handler::{ShareableMessageHandler, TypedMessageHandler},
41 set_message_bus,
42 switchboard::MessagingSwitchboard,
43 },
44 runner::get_data_cmd_sender,
45};
46use nautilus_core::{UUID4, UnixNanos};
47use posei_trader::engine::DataEngine;
48use nautilus_execution::engine::ExecutionEngine;
49use nautilus_model::identifiers::TraderId;
50use nautilus_portfolio::portfolio::Portfolio;
51use nautilus_risk::engine::RiskEngine;
52use ustr::Ustr;
53
54use crate::{builder::PoseiKernelBuilder, config::PoseiKernelConfig, trader::Trader};
55
56#[derive(Debug)]
60pub struct PoseiKernel {
61 pub name: String,
63 pub instance_id: UUID4,
65 pub machine_id: String,
67 pub config: Box<dyn PoseiKernelConfig>,
69 pub cache: Rc<RefCell<Cache>>,
71 pub clock: Rc<RefCell<dyn Clock>>,
73 pub portfolio: Portfolio,
75 pub log_guard: LogGuard,
77 pub data_engine: Rc<RefCell<DataEngine>>,
79 pub risk_engine: RiskEngine,
81 pub exec_engine: ExecutionEngine,
83 pub trader: Trader,
85 pub ts_created: UnixNanos,
87 pub ts_started: Option<UnixNanos>,
89 pub ts_shutdown: Option<UnixNanos>,
91}
92
93impl PoseiKernel {
94 #[must_use]
96 pub const fn builder(
97 name: String,
98 trader_id: TraderId,
99 environment: nautilus_common::enums::Environment,
100 ) -> PoseiKernelBuilder {
101 PoseiKernelBuilder::new(name, trader_id, environment)
102 }
103
104 pub fn new<T: PoseiKernelConfig + 'static>(name: String, config: T) -> anyhow::Result<Self> {
110 let instance_id = config.instance_id().unwrap_or_default();
111 let machine_id = Self::determine_machine_id()?;
112
113 let logger_config = config.logging();
114 let log_guard = Self::initialize_logging(config.trader_id(), instance_id, logger_config)?;
115 headers::log_header(
116 config.trader_id(),
117 &machine_id,
118 instance_id,
119 Ustr::from(stringify!(LiveNode)),
120 );
121
122 log::info!("Building system kernel");
123
124 let clock = Self::initialize_clock(&config.environment());
125 let cache = Self::initialize_cache(config.cache());
126
127 let msgbus = Rc::new(RefCell::new(MessageBus::new(
128 config.trader_id(),
129 instance_id,
130 Some(name.to_string()),
131 None,
132 )));
133 set_message_bus(msgbus);
134
135 let portfolio = Portfolio::new(cache.clone(), clock.clone(), config.portfolio());
136 let risk_engine = RiskEngine::new(
137 config.risk_engine().unwrap_or_default(),
138 Portfolio::new(cache.clone(), clock.clone(), config.portfolio()),
139 clock.clone(),
140 cache.clone(),
141 );
142 let exec_engine = ExecutionEngine::new(clock.clone(), cache.clone(), config.exec_engine());
143
144 let data_engine = DataEngine::new(clock.clone(), cache.clone(), config.data_engine());
145 let data_engine = Rc::new(RefCell::new(data_engine));
146
147 let data_engine_ref = data_engine.clone();
149 let endpoint = MessagingSwitchboard::data_engine_execute();
150 let handler = ShareableMessageHandler(Rc::new(TypedMessageHandler::from(
151 move |cmd: &DataCommand| data_engine_ref.borrow_mut().execute(cmd),
152 )));
153 msgbus::register(endpoint, handler);
154
155 let endpoint = MessagingSwitchboard::data_engine_queue_execute();
157 let handler = ShareableMessageHandler(Rc::new(TypedMessageHandler::from(
158 move |cmd: &DataCommand| get_data_cmd_sender().clone().execute(cmd.clone()), )));
160 msgbus::register(endpoint, handler);
161
162 let data_engine_ref = data_engine.clone();
164 let endpoint = MessagingSwitchboard::data_engine_process();
165 let handler = ShareableMessageHandler(Rc::new(TypedMessageHandler::with_any(
167 move |data: &dyn Any| {
168 data_engine_ref.borrow_mut().process(data);
169 },
170 )));
171 msgbus::register(endpoint, handler);
172
173 let data_engine_ref = data_engine.clone();
175 let endpoint = MessagingSwitchboard::data_engine_response();
176 let handler = ShareableMessageHandler(Rc::new(TypedMessageHandler::from(
177 move |resp: &DataResponse| data_engine_ref.borrow_mut().response(resp.clone()),
178 )));
179 msgbus::register(endpoint, handler);
180
181 let trader = Trader::new(
182 config.trader_id(),
183 instance_id,
184 config.environment(),
185 clock.clone(),
186 cache.clone(),
187 );
188
189 let ts_created = clock.borrow().timestamp_ns();
190
191 Ok(Self {
192 name,
193 instance_id,
194 machine_id,
195 config: Box::new(config),
196 cache,
197 clock,
198 portfolio,
199 log_guard,
200 data_engine,
201 risk_engine,
202 exec_engine,
203 trader,
204 ts_created,
205 ts_started: None,
206 ts_shutdown: None,
207 })
208 }
209
210 fn determine_machine_id() -> anyhow::Result<String> {
211 Ok(hostname::get()?.to_string_lossy().into_owned())
212 }
213
214 fn initialize_logging(
215 trader_id: TraderId,
216 instance_id: UUID4,
217 config: LoggerConfig,
218 ) -> anyhow::Result<LogGuard> {
219 init_tracing()?;
220
221 let log_guard = init_logging(
222 trader_id,
223 instance_id,
224 config,
225 FileWriterConfig::default(), )?;
227
228 Ok(log_guard)
229 }
230
231 fn initialize_clock(environment: &Environment) -> Rc<RefCell<dyn Clock>> {
232 match environment {
233 Environment::Backtest => {
234 let test_clock = TestClock::new();
235 Rc::new(RefCell::new(test_clock))
236 }
237 Environment::Live | Environment::Sandbox => {
238 let live_clock = LiveClock::default();
239 Rc::new(RefCell::new(live_clock))
240 }
241 }
242 }
243
244 fn initialize_cache(cache_config: Option<CacheConfig>) -> Rc<RefCell<Cache>> {
245 let cache_config = cache_config.unwrap_or_default();
246
247 let cache_database: Option<Box<dyn CacheDatabaseAdapter>> = None;
249 let cache = Cache::new(Some(cache_config), cache_database);
250
251 Rc::new(RefCell::new(cache))
252 }
253
254 fn cancel_timers(&self) {
255 self.clock.borrow_mut().cancel_timers();
256 }
257
258 #[must_use]
259 pub fn generate_timestamp_ns(&self) -> UnixNanos {
260 self.clock.borrow().timestamp_ns()
261 }
262
263 #[must_use]
265 pub fn environment(&self) -> Environment {
266 self.config.environment()
267 }
268
269 #[must_use]
271 pub const fn name(&self) -> &str {
272 self.name.as_str()
273 }
274
275 #[must_use]
277 pub fn trader_id(&self) -> TraderId {
278 self.config.trader_id()
279 }
280
281 #[must_use]
283 pub fn machine_id(&self) -> &str {
284 &self.machine_id
285 }
286
287 #[must_use]
289 pub const fn instance_id(&self) -> UUID4 {
290 self.instance_id
291 }
292
293 #[must_use]
295 pub const fn ts_created(&self) -> UnixNanos {
296 self.ts_created
297 }
298
299 #[must_use]
301 pub const fn ts_started(&self) -> Option<UnixNanos> {
302 self.ts_started
303 }
304
305 #[must_use]
307 pub const fn ts_shutdown(&self) -> Option<UnixNanos> {
308 self.ts_shutdown
309 }
310
311 #[must_use]
313 pub fn load_state(&self) -> bool {
314 self.config.load_state()
315 }
316
317 #[must_use]
319 pub fn save_state(&self) -> bool {
320 self.config.save_state()
321 }
322
323 #[must_use]
325 pub fn clock(&self) -> Rc<RefCell<dyn Clock>> {
326 self.clock.clone()
327 }
328
329 #[must_use]
331 pub fn cache(&self) -> Rc<RefCell<Cache>> {
332 self.cache.clone()
333 }
334
335 #[must_use]
337 pub fn msgbus(&self) -> Rc<RefCell<MessageBus>> {
338 get_message_bus()
339 }
340
341 #[must_use]
343 pub const fn portfolio(&self) -> &Portfolio {
344 &self.portfolio
345 }
346
347 #[must_use]
349 pub fn data_engine(&self) -> Ref<'_, DataEngine> {
350 self.data_engine.borrow()
351 }
352
353 #[must_use]
355 pub const fn risk_engine(&self) -> &RiskEngine {
356 &self.risk_engine
357 }
358
359 #[must_use]
361 pub const fn exec_engine(&self) -> &ExecutionEngine {
362 &self.exec_engine
363 }
364
365 #[must_use]
367 pub const fn trader(&self) -> &Trader {
368 &self.trader
369 }
370
371 pub async fn start_async(&mut self) {
373 log::info!("Starting engines...");
374 self.start_engines();
375
376 log::info!("Initializing trader...");
377 if let Err(e) = self.trader.initialize() {
378 log::error!("Error initializing trader: {e:?}");
379 return;
380 }
381
382 log::info!("Connecting clients...");
383 if let Err(e) = self.connect_clients().await {
384 log::error!("Error connecting clients: {e:?}");
385 }
386 log::info!("Clients connected");
387
388 log::info!("Starting trader...");
389 if let Err(e) = self.trader.start() {
390 log::error!("Error starting trader: {e:?}");
391 }
392 log::info!("Trader started");
393
394 self.ts_started = Some(self.clock.borrow().timestamp_ns());
395 log::info!("Posei system kernel started");
396 }
397
398 pub async fn stop_async(&mut self) {
400 log::info!("Stopping Posei system kernel");
401
402 if let Err(e) = self.trader.stop() {
404 log::error!("Error stopping trader: {e:?}");
405 }
406
407 if let Err(e) = self.disconnect_clients().await {
409 log::error!("Error disconnecting clients: {e:?}");
410 }
411
412 self.stop_engines();
413 self.cancel_timers();
414
415 self.ts_shutdown = Some(self.clock.borrow().timestamp_ns());
416 log::info!("Posei system kernel stopped");
417 }
418
419 pub fn reset(&mut self) {
421 if let Err(e) = self.trader.reset() {
422 log::error!("Error resetting trader: {e:?}");
423 }
424
425 self.data_engine.borrow_mut().reset();
427 self.ts_started = None;
430 self.ts_shutdown = None;
431
432 log::info!("Posei system kernel reset");
433 }
434
435 pub fn dispose(&mut self) {
437 if let Err(e) = self.trader.dispose() {
438 log::error!("Error disposing trader: {e:?}");
439 }
440
441 self.stop_engines();
442
443 self.data_engine.borrow_mut().dispose();
444 log::info!("Posei system kernel disposed");
447 }
448
449 const fn cancel_all_tasks(&self) {
453 }
455
456 fn start_engines(&self) {
458 self.data_engine.borrow_mut().start();
459 }
461
462 fn stop_engines(&self) {
464 self.data_engine.borrow_mut().stop();
465 }
467
468 #[allow(clippy::await_holding_refcell_ref)]
470 async fn connect_clients(&mut self) -> Result<(), Vec<anyhow::Error>> {
471 let mut data_engine = self.data_engine.borrow_mut();
472 let mut data_adapters = data_engine.get_clients_mut();
473 let mut futures = Vec::with_capacity(data_adapters.len());
474
475 for adapter in &mut data_adapters {
476 futures.push(adapter.connect());
477 }
478
479 let results = join_all(futures).await;
480 let errors: Vec<anyhow::Error> = results.into_iter().filter_map(Result::err).collect();
481
482 if errors.is_empty() {
483 Ok(())
484 } else {
485 Err(errors)
486 }
487 }
488
489 #[allow(clippy::await_holding_refcell_ref)]
491 async fn disconnect_clients(&mut self) -> Result<(), Vec<anyhow::Error>> {
492 let mut data_engine = self.data_engine.borrow_mut();
493 let mut data_adapters = data_engine.get_clients_mut();
494 let mut futures = Vec::with_capacity(data_adapters.len());
495
496 for adapter in &mut data_adapters {
497 futures.push(adapter.disconnect());
498 }
499
500 let results = join_all(futures).await;
501 let errors: Vec<anyhow::Error> = results.into_iter().filter_map(Result::err).collect();
502
503 if errors.is_empty() {
504 Ok(())
505 } else {
506 Err(errors)
507 }
508 }
509
510 fn stop_clients(&self) {
512 self.data_engine.borrow_mut().stop();
513 }
514
515 const fn initialize_portfolio(&self) {
517 }
519
520 const fn await_engines_connected(&self) {
524 }
526
527 const fn await_execution_reconciliation(&self) {
531 }
533
534 const fn await_portfolio_initialized(&self) {
538 }
540
541 const fn await_trader_residuals(&self) {
545 }
547
548 const fn check_engines_connected(&self) {
550 }
552
553 const fn check_engines_disconnected(&self) {
555 }
557
558 const fn check_portfolio_initialized(&self) {
560 }
562
563 const fn flush_writer(&self) {
565 }
567}