nautilus_system/
kernel.rs

1// -------------------------------------------------------------------------------------------------
2//  Copyright (C) 2015-2025 Posei Systems Pty Ltd. All rights reserved.
3//  https://poseitrader.io
4//
5//  Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
6//  You may not use this file except in compliance with the License.
7//  You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
8//
9//  Unless required by applicable law or agreed to in writing, software
10//  distributed under the License is distributed on an "AS IS" BASIS,
11//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12//  See the License for the specific language governing permissions and
13//  limitations under the License.
14// -------------------------------------------------------------------------------------------------
15
16// Under development
17#![allow(dead_code)]
18#![allow(unused_variables)]
19
20use std::{cell::RefCell, rc::Rc};
21
22use futures::future::join_all;
23use nautilus_common::{
24    cache::{Cache, CacheConfig, database::CacheDatabaseAdapter},
25    clock::{Clock, LiveClock, TestClock},
26    component::Component,
27    enums::Environment,
28    logging::{
29        headers, init_logging, init_tracing,
30        logger::{LogGuard, LoggerConfig},
31        writer::FileWriterConfig,
32    },
33    msgbus::{MessageBus, get_message_bus, set_message_bus},
34    runtime::get_runtime,
35};
36use nautilus_core::{UUID4, UnixNanos};
37use posei_trader::engine::DataEngine;
38use nautilus_execution::engine::ExecutionEngine;
39use nautilus_model::identifiers::TraderId;
40use nautilus_portfolio::portfolio::Portfolio;
41use nautilus_risk::engine::RiskEngine;
42use nautilus_trading::trader::Trader;
43use ustr::Ustr;
44
45use crate::{builder::PoseiKernelBuilder, config::PoseiKernelConfig};
46
47/// Core Posei system kernel.
48///
49/// Orchestrates data and execution engines, cache, clock, and messaging across environments.
50#[derive(Debug)]
51pub struct PoseiKernel {
52    /// The kernel name (for logging and identification).
53    pub name: String,
54    /// The unique instance identifier for this kernel.
55    pub instance_id: UUID4,
56    /// The machine identifier (hostname or similar).
57    pub machine_id: String,
58    /// The kernel configuration.
59    pub config: Box<dyn PoseiKernelConfig>,
60    /// The shared in-memory cache.
61    pub cache: Rc<RefCell<Cache>>,
62    /// The clock driving the kernel.
63    pub clock: Rc<RefCell<dyn Clock>>,
64    /// The portfolio manager.
65    pub portfolio: Portfolio,
66    /// Guard for the logging subsystem (keeps logger thread alive).
67    pub log_guard: LogGuard,
68    /// The data engine instance.
69    pub data_engine: DataEngine,
70    /// The risk engine instance.
71    pub risk_engine: RiskEngine,
72    /// The execution engine instance.
73    pub exec_engine: ExecutionEngine,
74    /// The trader component.
75    pub trader: Trader,
76    /// The UNIX timestamp (nanoseconds) when the kernel was created.
77    pub ts_created: UnixNanos,
78    /// The UNIX timestamp (nanoseconds) when the kernel was last started.
79    pub ts_started: Option<UnixNanos>,
80    /// The UNIX timestamp (nanoseconds) when the kernel was last shutdown.
81    pub ts_shutdown: Option<UnixNanos>,
82}
83
84impl PoseiKernel {
85    /// Create a new [`PoseiKernelBuilder`] for fluent configuration.
86    #[must_use]
87    pub const fn builder(
88        name: String,
89        trader_id: TraderId,
90        environment: nautilus_common::enums::Environment,
91    ) -> PoseiKernelBuilder {
92        PoseiKernelBuilder::new(name, trader_id, environment)
93    }
94
95    /// Create a new [`PoseiKernel`] instance.
96    ///
97    /// # Errors
98    ///
99    /// Returns an error if the kernel fails to initialize.
100    pub fn new<T: PoseiKernelConfig + 'static>(name: String, config: T) -> anyhow::Result<Self> {
101        let instance_id = config.instance_id().unwrap_or_default();
102        let machine_id = Self::determine_machine_id()?;
103
104        let logger_config = config.logging();
105        let log_guard = Self::initialize_logging(config.trader_id(), instance_id, logger_config)?;
106        headers::log_header(
107            config.trader_id(),
108            &machine_id,
109            instance_id,
110            Ustr::from(stringify!(LiveNode)),
111        );
112
113        log::info!("Building system kernel");
114
115        let clock = Self::initialize_clock(&config.environment());
116        let cache = Self::initialize_cache(config.cache());
117
118        let msgbus = Rc::new(RefCell::new(MessageBus::new(
119            config.trader_id(),
120            instance_id,
121            Some(name.to_string()),
122            None,
123        )));
124        set_message_bus(msgbus);
125
126        let portfolio = Portfolio::new(cache.clone(), clock.clone(), config.portfolio());
127        let data_engine = DataEngine::new(clock.clone(), cache.clone(), config.data_engine());
128        let risk_engine = RiskEngine::new(
129            config.risk_engine().unwrap_or_default(),
130            Portfolio::new(cache.clone(), clock.clone(), config.portfolio()),
131            clock.clone(),
132            cache.clone(),
133        );
134        let exec_engine = ExecutionEngine::new(clock.clone(), cache.clone(), config.exec_engine());
135
136        let trader = Trader::new(
137            config.trader_id(),
138            instance_id,
139            config.environment(),
140            clock.clone(),
141        );
142
143        let ts_created = clock.borrow().timestamp_ns();
144
145        Ok(Self {
146            name,
147            instance_id,
148            machine_id,
149            config: Box::new(config),
150            cache,
151            clock,
152            portfolio,
153            log_guard,
154            data_engine,
155            risk_engine,
156            exec_engine,
157            trader,
158            ts_created,
159            ts_started: None,
160            ts_shutdown: None,
161        })
162    }
163
164    fn determine_machine_id() -> anyhow::Result<String> {
165        Ok(hostname::get()?.to_string_lossy().into_owned())
166    }
167
168    fn initialize_logging(
169        trader_id: TraderId,
170        instance_id: UUID4,
171        config: LoggerConfig,
172    ) -> anyhow::Result<LogGuard> {
173        init_tracing()?;
174
175        let log_guard = init_logging(
176            trader_id,
177            instance_id,
178            config,
179            FileWriterConfig::default(), // TODO: Properly incorporate file writer config
180        )?;
181
182        Ok(log_guard)
183    }
184
185    fn initialize_clock(environment: &Environment) -> Rc<RefCell<dyn Clock>> {
186        match environment {
187            Environment::Backtest => {
188                let test_clock = TestClock::new();
189                Rc::new(RefCell::new(test_clock))
190            }
191            Environment::Live | Environment::Sandbox => {
192                let live_clock = LiveClock::new();
193                Rc::new(RefCell::new(live_clock))
194            }
195        }
196    }
197
198    fn initialize_cache(cache_config: Option<CacheConfig>) -> Rc<RefCell<Cache>> {
199        let cache_config = cache_config.unwrap_or_default();
200
201        // TODO: Placeholder: persistent database adapter can be initialized here (e.g., Redis)
202        let cache_database: Option<Box<dyn CacheDatabaseAdapter>> = None;
203        let cache = Cache::new(Some(cache_config), cache_database);
204
205        Rc::new(RefCell::new(cache))
206    }
207
208    fn cancel_timers(&self) {
209        self.clock.borrow_mut().cancel_timers();
210    }
211
212    #[must_use]
213    pub fn generate_timestamp_ns(&self) -> UnixNanos {
214        self.clock.borrow().timestamp_ns()
215    }
216
217    /// Returns the kernel's environment context (Backtest, Sandbox, Live).
218    #[must_use]
219    pub fn environment(&self) -> Environment {
220        self.config.environment()
221    }
222
223    /// Returns the kernel's name.
224    #[must_use]
225    pub const fn name(&self) -> &str {
226        self.name.as_str()
227    }
228
229    /// Returns the kernel's trader ID.
230    #[must_use]
231    pub fn trader_id(&self) -> TraderId {
232        self.config.trader_id()
233    }
234
235    /// Returns the kernel's machine ID.
236    #[must_use]
237    pub fn machine_id(&self) -> &str {
238        &self.machine_id
239    }
240
241    /// Returns the kernel's instance ID.
242    #[must_use]
243    pub const fn instance_id(&self) -> UUID4 {
244        self.instance_id
245    }
246
247    /// Returns the UNIX timestamp (ns) when the kernel was created.
248    #[must_use]
249    pub const fn ts_created(&self) -> UnixNanos {
250        self.ts_created
251    }
252
253    /// Returns the UNIX timestamp (ns) when the kernel was last started.
254    #[must_use]
255    pub const fn ts_started(&self) -> Option<UnixNanos> {
256        self.ts_started
257    }
258
259    /// Returns the UNIX timestamp (ns) when the kernel was last shutdown.
260    #[must_use]
261    pub const fn ts_shutdown(&self) -> Option<UnixNanos> {
262        self.ts_shutdown
263    }
264
265    /// Returns whether the kernel has been configured to load state.
266    #[must_use]
267    pub fn load_state(&self) -> bool {
268        self.config.load_state()
269    }
270
271    /// Returns whether the kernel has been configured to save state.
272    #[must_use]
273    pub fn save_state(&self) -> bool {
274        self.config.save_state()
275    }
276
277    /// Returns the kernel's clock.
278    #[must_use]
279    pub fn clock(&self) -> Rc<RefCell<dyn Clock>> {
280        self.clock.clone()
281    }
282
283    /// Returns the kernel's cache.
284    #[must_use]
285    pub fn cache(&self) -> Rc<RefCell<Cache>> {
286        self.cache.clone()
287    }
288
289    /// Returns the kernel's message bus.  // TODO: TBD if this is necessary
290    #[must_use]
291    pub fn msgbus(&self) -> Rc<RefCell<MessageBus>> {
292        get_message_bus()
293    }
294
295    /// Returns the kernel's portfolio.
296    #[must_use]
297    pub const fn portfolio(&self) -> &Portfolio {
298        &self.portfolio
299    }
300
301    /// Returns the kernel's data engine.
302    #[must_use]
303    pub const fn data_engine(&self) -> &DataEngine {
304        &self.data_engine
305    }
306
307    /// Returns the kernel's risk engine.
308    #[must_use]
309    pub const fn risk_engine(&self) -> &RiskEngine {
310        &self.risk_engine
311    }
312
313    /// Returns the kernel's execution engine.
314    #[must_use]
315    pub const fn exec_engine(&self) -> &ExecutionEngine {
316        &self.exec_engine
317    }
318
319    /// Returns the kernel's trader.
320    #[must_use]
321    pub const fn trader(&self) -> &Trader {
322        &self.trader
323    }
324
325    /// Starts the Posei system kernel.
326    pub fn start(&mut self) {
327        self.start_engines();
328
329        // Initialize the trader first
330        if let Err(e) = self.trader.initialize() {
331            log::error!("Error initializing trader: {e:?}");
332            return;
333        }
334
335        // Start the trader (it will start all registered components)
336        if let Err(e) = self.trader.start() {
337            log::error!("Error starting trader: {e:?}");
338        }
339
340        // Connect all adapter clients
341        tokio::task::block_in_place(|| {
342            get_runtime().block_on(async {
343                if let Err(e) = self.connect_clients().await {
344                    log::error!("Error connecting clients: {e:?}");
345                }
346            });
347        });
348
349        self.ts_started = Some(self.clock.borrow().timestamp_ns());
350        log::info!("Posei system kernel started");
351    }
352
353    /// Stops the Posei system kernel.
354    pub fn stop(&mut self) {
355        log::info!("Stopping Posei system kernel");
356
357        // Stop the trader (it will stop all registered components)
358        if let Err(e) = self.trader.stop() {
359            log::error!("Error stopping trader: {e:?}");
360        }
361
362        // Disconnect all adapter clients
363        tokio::task::block_in_place(|| {
364            get_runtime().block_on(async {
365                if let Err(e) = self.disconnect_clients().await {
366                    log::error!("Error disconnecting clients: {e:?}");
367                }
368            });
369        });
370
371        self.stop_engines();
372        self.cancel_timers();
373
374        self.ts_shutdown = Some(self.clock.borrow().timestamp_ns());
375        log::info!("Posei system kernel stopped");
376    }
377
378    /// Resets the Posei system kernel to its initial state.
379    pub fn reset(&mut self) {
380        if let Err(e) = self.trader.reset() {
381            log::error!("Error resetting trader: {e:?}");
382        }
383
384        // Reset engines
385        self.data_engine.reset();
386        // TODO: Reset other engines when reset methods are available
387
388        self.ts_started = None;
389        self.ts_shutdown = None;
390
391        log::info!("Posei system kernel reset");
392    }
393
394    /// Disposes of the Posei system kernel, releasing resources.
395    pub fn dispose(&mut self) {
396        if let Err(e) = self.trader.dispose() {
397            log::error!("Error disposing trader: {e:?}");
398        }
399
400        self.stop_engines();
401
402        self.data_engine.dispose();
403        // TODO: Implement dispose methods for other engines
404
405        log::info!("Posei system kernel disposed");
406    }
407
408    /// Cancels all tasks currently running under the kernel.
409    ///
410    /// Intended for cleanup during shutdown.
411    const fn cancel_all_tasks(&self) {
412        // TODO: implement task cancellation logic for async contexts
413    }
414
415    /// Starts all engine components.
416    fn start_engines(&self) {
417        self.data_engine.start();
418        // TODO: Start other engines when methods are available
419    }
420
421    /// Stops all engine components.
422    fn stop_engines(&self) {
423        self.data_engine.stop();
424        // TODO: Stop other engines when methods are available
425    }
426
427    /// Connects all engine clients.
428    async fn connect_clients(&mut self) -> Result<(), Vec<anyhow::Error>> {
429        let data_adapters = self.data_engine.get_clients_mut();
430        let mut futures = Vec::with_capacity(data_adapters.len());
431
432        for adapter in data_adapters {
433            futures.push(adapter.get_client().connect());
434        }
435
436        let results = join_all(futures).await;
437        let errors: Vec<anyhow::Error> = results.into_iter().filter_map(Result::err).collect();
438
439        if errors.is_empty() {
440            Ok(())
441        } else {
442            Err(errors)
443        }
444    }
445
446    /// Disconnects all engine clients.
447    async fn disconnect_clients(&mut self) -> Result<(), Vec<anyhow::Error>> {
448        let data_adapters = self.data_engine.get_clients_mut();
449        let mut futures = Vec::with_capacity(data_adapters.len());
450
451        for adapter in data_adapters {
452            futures.push(adapter.get_client().disconnect());
453        }
454
455        let results = join_all(futures).await;
456        let errors: Vec<anyhow::Error> = results.into_iter().filter_map(Result::err).collect();
457
458        if errors.is_empty() {
459            Ok(())
460        } else {
461            Err(errors)
462        }
463    }
464
465    /// Stops engine clients.
466    fn stop_clients(&self) {
467        self.data_engine.stop();
468    }
469
470    /// Initializes the portfolio (orders & positions).
471    const fn initialize_portfolio(&self) {
472        // TODO: Placeholder: portfolio initialization to be implemented in next pass
473    }
474
475    /// Awaits engine clients to connect and initialize.
476    ///
477    /// Blocks until connected or timeout.
478    const fn await_engines_connected(&self) {
479        // TODO: await engine connections with timeout
480    }
481
482    /// Awaits execution engine state reconciliation.
483    ///
484    /// Blocks until executions are reconciled or timeout.
485    const fn await_execution_reconciliation(&self) {
486        // TODO: await execution reconciliation with timeout
487    }
488
489    /// Awaits portfolio initialization.
490    ///
491    /// Blocks until portfolio is initialized or timeout.
492    const fn await_portfolio_initialized(&self) {
493        // TODO: await portfolio initialization with timeout
494    }
495
496    /// Awaits post-stop trader residual events.
497    ///
498    /// Allows final cleanup before full shutdown.
499    const fn await_trader_residuals(&self) {
500        // TODO: await trader residual events after stop
501    }
502
503    /// Checks if engine clients are connected.
504    const fn check_engines_connected(&self) {
505        // TODO: check engine connection status
506    }
507
508    /// Checks if engine clients are disconnected.
509    const fn check_engines_disconnected(&self) {
510        // TODO: check engine disconnection status
511    }
512
513    /// Checks if the portfolio has been initialized.
514    const fn check_portfolio_initialized(&self) {
515        // TODO: check portfolio initialized status
516    }
517
518    /// Flushes the stream writer.
519    const fn flush_writer(&self) {
520        // TODO: No writer in this kernel version; placeholder for future streaming
521    }
522}