nautilus_system/
kernel.rs

1// -------------------------------------------------------------------------------------------------
2//  Copyright (C) 2015-2025 Posei Systems Pty Ltd. All rights reserved.
3//  https://poseitrader.io
4//
5//  Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
6//  You may not use this file except in compliance with the License.
7//  You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
8//
9//  Unless required by applicable law or agreed to in writing, software
10//  distributed under the License is distributed on an "AS IS" BASIS,
11//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12//  See the License for the specific language governing permissions and
13//  limitations under the License.
14// -------------------------------------------------------------------------------------------------
15
16// Under development
17#![allow(dead_code)]
18#![allow(unused_variables)]
19
20use std::{
21    any::Any,
22    cell::{Ref, RefCell},
23    rc::Rc,
24};
25
26use futures::future::join_all;
27use nautilus_common::{
28    cache::{Cache, CacheConfig, database::CacheDatabaseAdapter},
29    clock::{Clock, LiveClock, TestClock},
30    component::Component,
31    enums::Environment,
32    logging::{
33        headers, init_logging, init_tracing,
34        logger::{LogGuard, LoggerConfig},
35        writer::FileWriterConfig,
36    },
37    messages::{DataResponse, data::DataCommand},
38    msgbus::{
39        self, MessageBus, get_message_bus,
40        handler::{ShareableMessageHandler, TypedMessageHandler},
41        set_message_bus,
42        switchboard::MessagingSwitchboard,
43    },
44    runner::get_data_cmd_sender,
45};
46use nautilus_core::{UUID4, UnixNanos};
47use posei_trader::engine::DataEngine;
48use nautilus_execution::engine::ExecutionEngine;
49use nautilus_model::identifiers::TraderId;
50use nautilus_portfolio::portfolio::Portfolio;
51use nautilus_risk::engine::RiskEngine;
52use ustr::Ustr;
53
54use crate::{builder::PoseiKernelBuilder, config::PoseiKernelConfig, trader::Trader};
55
/// Core Posei system kernel.
///
/// Orchestrates data and execution engines, cache, clock, and messaging across environments.
///
/// The cache, clock and data engine are held behind `Rc<RefCell<_>>` so they can be shared
/// with the other components built in [`PoseiKernel::new`]; the message bus is not stored
/// here and is reached through the global accessor instead.
#[derive(Debug)]
pub struct PoseiKernel {
    /// The kernel name (for logging and identification).
    pub name: String,
    /// The unique instance identifier for this kernel.
    pub instance_id: UUID4,
    /// The machine identifier (the system hostname, lossily converted to UTF-8).
    pub machine_id: String,
    /// The kernel configuration the kernel was built from.
    pub config: Box<dyn PoseiKernelConfig>,
    /// The shared in-memory cache.
    pub cache: Rc<RefCell<Cache>>,
    /// The clock driving the kernel (test clock for backtest, live clock otherwise).
    pub clock: Rc<RefCell<dyn Clock>>,
    /// The portfolio manager.
    pub portfolio: Portfolio,
    /// Guard returned by logging initialization; held for the lifetime of the kernel.
    pub log_guard: LogGuard,
    /// The data engine instance (shared with the message bus handlers registered in `new`).
    pub data_engine: Rc<RefCell<DataEngine>>,
    /// The risk engine instance.
    pub risk_engine: RiskEngine,
    /// The execution engine instance.
    pub exec_engine: ExecutionEngine,
    /// The trader component.
    pub trader: Trader,
    /// The UNIX timestamp (nanoseconds) when the kernel was created.
    pub ts_created: UnixNanos,
    /// The UNIX timestamp (nanoseconds) when the kernel was last started (`None` if never started or after a reset).
    pub ts_started: Option<UnixNanos>,
    /// The UNIX timestamp (nanoseconds) when the kernel was last shutdown (`None` if never stopped or after a reset).
    pub ts_shutdown: Option<UnixNanos>,
}
92
93impl PoseiKernel {
94    /// Create a new [`PoseiKernelBuilder`] for fluent configuration.
95    #[must_use]
96    pub const fn builder(
97        name: String,
98        trader_id: TraderId,
99        environment: nautilus_common::enums::Environment,
100    ) -> PoseiKernelBuilder {
101        PoseiKernelBuilder::new(name, trader_id, environment)
102    }
103
    /// Create a new [`PoseiKernel`] instance.
    ///
    /// Construction proceeds in this order:
    /// 1. Resolve the instance ID (from `config`, else the `UUID4` default) and the machine ID.
    /// 2. Initialize logging and emit the log header.
    /// 3. Create the clock (selected by `config.environment()`) and the cache.
    /// 4. Create the message bus and install it via `set_message_bus`.
    /// 5. Build the portfolio, risk engine, execution engine and data engine.
    /// 6. Register the data engine's execute, queue-execute, process and response endpoints.
    /// 7. Build the trader and record `ts_created` from the clock.
    ///
    /// # Errors
    ///
    /// Returns an error if the machine hostname cannot be determined or if logging
    /// initialization fails.
    pub fn new<T: PoseiKernelConfig + 'static>(name: String, config: T) -> anyhow::Result<Self> {
        let instance_id = config.instance_id().unwrap_or_default();
        let machine_id = Self::determine_machine_id()?;

        let logger_config = config.logging();
        let log_guard = Self::initialize_logging(config.trader_id(), instance_id, logger_config)?;
        // NOTE(review): the header component name is hard-coded to `LiveNode` regardless of
        // the configured environment — confirm this is intended.
        headers::log_header(
            config.trader_id(),
            &machine_id,
            instance_id,
            Ustr::from(stringify!(LiveNode)),
        );

        log::info!("Building system kernel");

        let clock = Self::initialize_clock(&config.environment());
        let cache = Self::initialize_cache(config.cache());

        // The message bus is installed globally before any handler is registered below
        let msgbus = Rc::new(RefCell::new(MessageBus::new(
            config.trader_id(),
            instance_id,
            Some(name.to_string()),
            None,
        )));
        set_message_bus(msgbus);

        // NOTE(review): the risk engine receives its own `Portfolio` instance, separate from
        // the one stored on the kernel — confirm this is intended.
        let portfolio = Portfolio::new(cache.clone(), clock.clone(), config.portfolio());
        let risk_engine = RiskEngine::new(
            config.risk_engine().unwrap_or_default(),
            Portfolio::new(cache.clone(), clock.clone(), config.portfolio()),
            clock.clone(),
            cache.clone(),
        );
        let exec_engine = ExecutionEngine::new(clock.clone(), cache.clone(), config.exec_engine());

        // The data engine is shared (`Rc<RefCell<_>>`) because each handler closure below
        // captures its own clone
        let data_engine = DataEngine::new(clock.clone(), cache.clone(), config.data_engine());
        let data_engine = Rc::new(RefCell::new(data_engine));

        // Register DataEngine command execution
        let data_engine_ref = data_engine.clone();
        let endpoint = MessagingSwitchboard::data_engine_execute();
        let handler = ShareableMessageHandler(Rc::new(TypedMessageHandler::from(
            move |cmd: &DataCommand| data_engine_ref.borrow_mut().execute(cmd),
        )));
        msgbus::register(endpoint, handler);

        // Register DataEngine command queueing (forwards a clone of the command to the data
        // command sender instead of executing it directly)
        let endpoint = MessagingSwitchboard::data_engine_queue_execute();
        let handler = ShareableMessageHandler(Rc::new(TypedMessageHandler::from(
            move |cmd: &DataCommand| get_data_cmd_sender().clone().execute(cmd.clone()), // TODO:
        )));
        msgbus::register(endpoint, handler);

        // Register DataEngine process handler
        let data_engine_ref = data_engine.clone();
        let endpoint = MessagingSwitchboard::data_engine_process();
        // TODO: Optimize this back to a typed handler
        let handler = ShareableMessageHandler(Rc::new(TypedMessageHandler::with_any(
            move |data: &dyn Any| {
                data_engine_ref.borrow_mut().process(data);
            },
        )));
        msgbus::register(endpoint, handler);

        // Register DataEngine response handler
        let data_engine_ref = data_engine.clone();
        let endpoint = MessagingSwitchboard::data_engine_response();
        let handler = ShareableMessageHandler(Rc::new(TypedMessageHandler::from(
            move |resp: &DataResponse| data_engine_ref.borrow_mut().response(resp.clone()),
        )));
        msgbus::register(endpoint, handler);

        let trader = Trader::new(
            config.trader_id(),
            instance_id,
            config.environment(),
            clock.clone(),
            cache.clone(),
        );

        // Creation time is read from the kernel's own clock
        let ts_created = clock.borrow().timestamp_ns();

        Ok(Self {
            name,
            instance_id,
            machine_id,
            config: Box::new(config),
            cache,
            clock,
            portfolio,
            log_guard,
            data_engine,
            risk_engine,
            exec_engine,
            trader,
            ts_created,
            ts_started: None,
            ts_shutdown: None,
        })
    }
209
210    fn determine_machine_id() -> anyhow::Result<String> {
211        Ok(hostname::get()?.to_string_lossy().into_owned())
212    }
213
214    fn initialize_logging(
215        trader_id: TraderId,
216        instance_id: UUID4,
217        config: LoggerConfig,
218    ) -> anyhow::Result<LogGuard> {
219        init_tracing()?;
220
221        let log_guard = init_logging(
222            trader_id,
223            instance_id,
224            config,
225            FileWriterConfig::default(), // TODO: Properly incorporate file writer config
226        )?;
227
228        Ok(log_guard)
229    }
230
231    fn initialize_clock(environment: &Environment) -> Rc<RefCell<dyn Clock>> {
232        match environment {
233            Environment::Backtest => {
234                let test_clock = TestClock::new();
235                Rc::new(RefCell::new(test_clock))
236            }
237            Environment::Live | Environment::Sandbox => {
238                let live_clock = LiveClock::default();
239                Rc::new(RefCell::new(live_clock))
240            }
241        }
242    }
243
244    fn initialize_cache(cache_config: Option<CacheConfig>) -> Rc<RefCell<Cache>> {
245        let cache_config = cache_config.unwrap_or_default();
246
247        // TODO: Placeholder: persistent database adapter can be initialized here (e.g., Redis)
248        let cache_database: Option<Box<dyn CacheDatabaseAdapter>> = None;
249        let cache = Cache::new(Some(cache_config), cache_database);
250
251        Rc::new(RefCell::new(cache))
252    }
253
254    fn cancel_timers(&self) {
255        self.clock.borrow_mut().cancel_timers();
256    }
257
258    #[must_use]
259    pub fn generate_timestamp_ns(&self) -> UnixNanos {
260        self.clock.borrow().timestamp_ns()
261    }
262
    /// Returns the kernel's environment context (Backtest, Sandbox, Live).
    ///
    /// The value is read from the kernel configuration on each call.
    #[must_use]
    pub fn environment(&self) -> Environment {
        self.config.environment()
    }
268
    /// Returns the kernel's name as given at construction.
    #[must_use]
    pub const fn name(&self) -> &str {
        self.name.as_str()
    }
274
    /// Returns the kernel's trader ID.
    ///
    /// The value is read from the kernel configuration on each call.
    #[must_use]
    pub fn trader_id(&self) -> TraderId {
        self.config.trader_id()
    }
280
281    /// Returns the kernel's machine ID.
282    #[must_use]
283    pub fn machine_id(&self) -> &str {
284        &self.machine_id
285    }
286
    /// Returns the kernel's instance ID.
    ///
    /// This is the ID supplied by the configuration, or the `UUID4` default when the
    /// configuration did not provide one.
    #[must_use]
    pub const fn instance_id(&self) -> UUID4 {
        self.instance_id
    }
292
    /// Returns the UNIX timestamp (ns) when the kernel was created.
    ///
    /// Read from the kernel's clock at the end of construction.
    #[must_use]
    pub const fn ts_created(&self) -> UnixNanos {
        self.ts_created
    }
298
    /// Returns the UNIX timestamp (ns) when the kernel was last started.
    ///
    /// `None` until a start sequence has run to completion, and again after a reset.
    #[must_use]
    pub const fn ts_started(&self) -> Option<UnixNanos> {
        self.ts_started
    }
304
    /// Returns the UNIX timestamp (ns) when the kernel was last shutdown.
    ///
    /// `None` until the kernel has been stopped, and again after a reset.
    #[must_use]
    pub const fn ts_shutdown(&self) -> Option<UnixNanos> {
        self.ts_shutdown
    }
310
    /// Returns whether the kernel has been configured to load state.
    ///
    /// The flag is read from the kernel configuration on each call.
    #[must_use]
    pub fn load_state(&self) -> bool {
        self.config.load_state()
    }
316
    /// Returns whether the kernel has been configured to save state.
    ///
    /// The flag is read from the kernel configuration on each call.
    #[must_use]
    pub fn save_state(&self) -> bool {
        self.config.save_state()
    }
322
323    /// Returns the kernel's clock.
324    #[must_use]
325    pub fn clock(&self) -> Rc<RefCell<dyn Clock>> {
326        self.clock.clone()
327    }
328
329    /// Returns the kernel's cache.
330    #[must_use]
331    pub fn cache(&self) -> Rc<RefCell<Cache>> {
332        self.cache.clone()
333    }
334
    /// Returns the kernel's message bus.
    ///
    /// The bus is not stored on the kernel; this returns whatever `get_message_bus` yields.
    // TODO: TBD if this is necessary
    #[must_use]
    pub fn msgbus(&self) -> Rc<RefCell<MessageBus>> {
        get_message_bus()
    }
340
    /// Returns a reference to the kernel's portfolio.
    #[must_use]
    pub const fn portfolio(&self) -> &Portfolio {
        &self.portfolio
    }
346
347    /// Returns the kernel's data engine.
348    #[must_use]
349    pub fn data_engine(&self) -> Ref<'_, DataEngine> {
350        self.data_engine.borrow()
351    }
352
    /// Returns a reference to the kernel's risk engine.
    #[must_use]
    pub const fn risk_engine(&self) -> &RiskEngine {
        &self.risk_engine
    }
358
    /// Returns a reference to the kernel's execution engine.
    #[must_use]
    pub const fn exec_engine(&self) -> &ExecutionEngine {
        &self.exec_engine
    }
364
    /// Returns a reference to the kernel's trader.
    #[must_use]
    pub const fn trader(&self) -> &Trader {
        &self.trader
    }
370
371    /// Starts the Posei system kernel.
372    pub async fn start_async(&mut self) {
373        log::info!("Starting engines...");
374        self.start_engines();
375
376        log::info!("Initializing trader...");
377        if let Err(e) = self.trader.initialize() {
378            log::error!("Error initializing trader: {e:?}");
379            return;
380        }
381
382        log::info!("Connecting clients...");
383        if let Err(e) = self.connect_clients().await {
384            log::error!("Error connecting clients: {e:?}");
385        }
386        log::info!("Clients connected");
387
388        log::info!("Starting trader...");
389        if let Err(e) = self.trader.start() {
390            log::error!("Error starting trader: {e:?}");
391        }
392        log::info!("Trader started");
393
394        self.ts_started = Some(self.clock.borrow().timestamp_ns());
395        log::info!("Posei system kernel started");
396    }
397
398    /// Stops the Posei system kernel.
399    pub async fn stop_async(&mut self) {
400        log::info!("Stopping Posei system kernel");
401
402        // Stop the trader (it will stop all registered components)
403        if let Err(e) = self.trader.stop() {
404            log::error!("Error stopping trader: {e:?}");
405        }
406
407        // Disconnect all adapter clients
408        if let Err(e) = self.disconnect_clients().await {
409            log::error!("Error disconnecting clients: {e:?}");
410        }
411
412        self.stop_engines();
413        self.cancel_timers();
414
415        self.ts_shutdown = Some(self.clock.borrow().timestamp_ns());
416        log::info!("Posei system kernel stopped");
417    }
418
419    /// Resets the Posei system kernel to its initial state.
420    pub fn reset(&mut self) {
421        if let Err(e) = self.trader.reset() {
422            log::error!("Error resetting trader: {e:?}");
423        }
424
425        // Reset engines
426        self.data_engine.borrow_mut().reset();
427        // TODO: Reset other engines when reset methods are available
428
429        self.ts_started = None;
430        self.ts_shutdown = None;
431
432        log::info!("Posei system kernel reset");
433    }
434
435    /// Disposes of the Posei system kernel, releasing resources.
436    pub fn dispose(&mut self) {
437        if let Err(e) = self.trader.dispose() {
438            log::error!("Error disposing trader: {e:?}");
439        }
440
441        self.stop_engines();
442
443        self.data_engine.borrow_mut().dispose();
444        // TODO: Implement dispose methods for other engines
445
446        log::info!("Posei system kernel disposed");
447    }
448
    /// Cancels all tasks currently running under the kernel.
    ///
    /// Intended for cleanup during shutdown. Currently a no-op placeholder: the body is
    /// empty until task cancellation is implemented.
    const fn cancel_all_tasks(&self) {
        // TODO: implement task cancellation logic for async contexts
    }
455
456    /// Starts all engine components.
457    fn start_engines(&self) {
458        self.data_engine.borrow_mut().start();
459        // TODO: Start other engines when methods are available
460    }
461
462    /// Stops all engine components.
463    fn stop_engines(&self) {
464        self.data_engine.borrow_mut().stop();
465        // TODO: Stop other engines when methods are available
466    }
467
468    /// Connects all engine clients.
469    #[allow(clippy::await_holding_refcell_ref)]
470    async fn connect_clients(&mut self) -> Result<(), Vec<anyhow::Error>> {
471        let mut data_engine = self.data_engine.borrow_mut();
472        let mut data_adapters = data_engine.get_clients_mut();
473        let mut futures = Vec::with_capacity(data_adapters.len());
474
475        for adapter in &mut data_adapters {
476            futures.push(adapter.connect());
477        }
478
479        let results = join_all(futures).await;
480        let errors: Vec<anyhow::Error> = results.into_iter().filter_map(Result::err).collect();
481
482        if errors.is_empty() {
483            Ok(())
484        } else {
485            Err(errors)
486        }
487    }
488
489    /// Disconnects all engine clients.
490    #[allow(clippy::await_holding_refcell_ref)]
491    async fn disconnect_clients(&mut self) -> Result<(), Vec<anyhow::Error>> {
492        let mut data_engine = self.data_engine.borrow_mut();
493        let mut data_adapters = data_engine.get_clients_mut();
494        let mut futures = Vec::with_capacity(data_adapters.len());
495
496        for adapter in &mut data_adapters {
497            futures.push(adapter.disconnect());
498        }
499
500        let results = join_all(futures).await;
501        let errors: Vec<anyhow::Error> = results.into_iter().filter_map(Result::err).collect();
502
503        if errors.is_empty() {
504            Ok(())
505        } else {
506            Err(errors)
507        }
508    }
509
510    /// Stops engine clients.
511    fn stop_clients(&self) {
512        self.data_engine.borrow_mut().stop();
513    }
514
    /// Initializes the portfolio (orders & positions).
    ///
    /// Currently a no-op placeholder: the body is empty until portfolio initialization is
    /// implemented.
    const fn initialize_portfolio(&self) {
        // TODO: Placeholder: portfolio initialization to be implemented in next pass
    }
519
    /// Awaits engine clients to connect and initialize.
    ///
    /// Intended to block until connected or timeout. Currently a no-op placeholder that
    /// returns immediately.
    const fn await_engines_connected(&self) {
        // TODO: await engine connections with timeout
    }
526
    /// Awaits execution engine state reconciliation.
    ///
    /// Intended to block until executions are reconciled or timeout. Currently a no-op
    /// placeholder that returns immediately.
    const fn await_execution_reconciliation(&self) {
        // TODO: await execution reconciliation with timeout
    }
533
    /// Awaits portfolio initialization.
    ///
    /// Intended to block until the portfolio is initialized or timeout. Currently a no-op
    /// placeholder that returns immediately.
    const fn await_portfolio_initialized(&self) {
        // TODO: await portfolio initialization with timeout
    }
540
    /// Awaits post-stop trader residual events.
    ///
    /// Intended to allow final cleanup before full shutdown. Currently a no-op placeholder
    /// that returns immediately.
    const fn await_trader_residuals(&self) {
        // TODO: await trader residual events after stop
    }
547
    /// Checks if engine clients are connected.
    ///
    /// Currently a no-op placeholder: no check is performed and nothing is returned.
    const fn check_engines_connected(&self) {
        // TODO: check engine connection status
    }
552
    /// Checks if engine clients are disconnected.
    ///
    /// Currently a no-op placeholder: no check is performed and nothing is returned.
    const fn check_engines_disconnected(&self) {
        // TODO: check engine disconnection status
    }
557
    /// Checks if the portfolio has been initialized.
    ///
    /// Currently a no-op placeholder: no check is performed and nothing is returned.
    const fn check_portfolio_initialized(&self) {
        // TODO: check portfolio initialized status
    }
562
    /// Flushes the stream writer.
    ///
    /// Currently a no-op placeholder: this kernel has no writer to flush.
    const fn flush_writer(&self) {
        // TODO: No writer in this kernel version; placeholder for future streaming
    }
567}