nautilus_common/
component.rs

1// -------------------------------------------------------------------------------------------------
2//  Copyright (C) 2015-2025 Posei Systems Pty Ltd. All rights reserved.
3//  https://poseitrader.io
4//
5//  Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
6//  You may not use this file except in compliance with the License.
7//  You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
8//
9//  Unless required by applicable law or agreed to in writing, software
10//  distributed under the License is distributed on an "AS IS" BASIS,
11//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12//  See the License for the specific language governing permissions and
13//  limitations under the License.
14// -------------------------------------------------------------------------------------------------
15
16//! Component system for managing stateful system entities.
17//!
18//! This module provides the component framework for managing the lifecycle and state
19//! of system entities. Components have defined states (pre-initialized, ready, running,
20//! stopped, etc.) and provide a consistent interface for state management and transitions.
21
22#![allow(unsafe_code)]
23
24use std::{
25    cell::{RefCell, UnsafeCell},
26    collections::HashMap,
27    fmt::Debug,
28    rc::Rc,
29};
30
31use nautilus_model::identifiers::{ComponentId, TraderId};
32use ustr::Ustr;
33
34use crate::{
35    actor::{Actor, registry::get_actor_registry},
36    cache::Cache,
37    clock::Clock,
38    enums::{ComponentState, ComponentTrigger},
39};
40
41/// Components have state and lifecycle management capabilities.
42pub trait Component {
43    /// Returns the unique identifier for this component.
44    fn component_id(&self) -> ComponentId;
45
46    /// Returns the current state of the component.
47    fn state(&self) -> ComponentState;
48
49    /// Transition the component with the state trigger.
50    ///
51    /// # Errors
52    ///
53    /// Returns an error if the `trigger` is an invalid transition from the current state.
54    fn transition_state(&mut self, trigger: ComponentTrigger) -> anyhow::Result<()>;
55
56    /// Returns whether the component is ready.
57    fn is_ready(&self) -> bool {
58        self.state() == ComponentState::Ready
59    }
60
61    /// Returns whether the component is *not* running.
62    fn not_running(&self) -> bool {
63        !self.is_running()
64    }
65
66    /// Returns whether the component is running.
67    fn is_running(&self) -> bool {
68        self.state() == ComponentState::Running
69    }
70
71    /// Returns whether the component is stopped.
72    fn is_stopped(&self) -> bool {
73        self.state() == ComponentState::Stopped
74    }
75
76    /// Returns whether the component has been degraded.
77    fn is_degraded(&self) -> bool {
78        self.state() == ComponentState::Degraded
79    }
80
81    /// Returns whether the component has been faulted.
82    fn is_faulted(&self) -> bool {
83        self.state() == ComponentState::Faulted
84    }
85
86    /// Returns whether the component has been disposed.
87    fn is_disposed(&self) -> bool {
88        self.state() == ComponentState::Disposed
89    }
90
91    /// Registers the component with a system.
92    ///
93    /// # Errors
94    ///
95    /// Returns an error if the component fails to register.
96    fn register(
97        &mut self,
98        trader_id: TraderId,
99        clock: Rc<RefCell<dyn Clock>>,
100        cache: Rc<RefCell<Cache>>,
101    ) -> anyhow::Result<()>;
102
103    /// Initializes the component.
104    ///
105    /// # Errors
106    ///
107    /// Returns an error if the initialization state transition fails.
108    fn initialize(&mut self) -> anyhow::Result<()> {
109        self.transition_state(ComponentTrigger::Initialize)
110    }
111
112    /// Starts the component.
113    ///
114    /// # Errors
115    ///
116    /// Returns an error if the component fails to start.
117    fn start(&mut self) -> anyhow::Result<()> {
118        self.transition_state(ComponentTrigger::Start)?; // -> Starting
119
120        if let Err(e) = self.on_start() {
121            log_error(&e);
122            return Err(e); // Halt state transition
123        }
124
125        self.transition_state(ComponentTrigger::StartCompleted)?;
126
127        Ok(())
128    }
129
130    /// Stops the component.
131    ///
132    /// # Errors
133    ///
134    /// Returns an error if the component fails to stop.
135    fn stop(&mut self) -> anyhow::Result<()> {
136        self.transition_state(ComponentTrigger::Stop)?; // -> Stopping
137
138        if let Err(e) = self.on_stop() {
139            log_error(&e);
140            return Err(e); // Halt state transition
141        }
142
143        self.transition_state(ComponentTrigger::StopCompleted)?;
144
145        Ok(())
146    }
147
148    /// Resumes the component.
149    ///
150    /// # Errors
151    ///
152    /// Returns an error if the component fails to resume.
153    fn resume(&mut self) -> anyhow::Result<()> {
154        self.transition_state(ComponentTrigger::Resume)?; // -> Resuming
155
156        if let Err(e) = self.on_resume() {
157            log_error(&e);
158            return Err(e); // Halt state transition
159        }
160
161        self.transition_state(ComponentTrigger::ResumeCompleted)?;
162
163        Ok(())
164    }
165
166    /// Degrades the component.
167    ///
168    /// # Errors
169    ///
170    /// Returns an error if the component fails to degrade.
171    fn degrade(&mut self) -> anyhow::Result<()> {
172        self.transition_state(ComponentTrigger::Degrade)?; // -> Degrading
173
174        if let Err(e) = self.on_degrade() {
175            log_error(&e);
176            return Err(e); // Halt state transition
177        }
178
179        self.transition_state(ComponentTrigger::DegradeCompleted)?;
180
181        Ok(())
182    }
183
184    /// Faults the component.
185    ///
186    /// # Errors
187    ///
188    /// Returns an error if the component fails to fault.
189    fn fault(&mut self) -> anyhow::Result<()> {
190        self.transition_state(ComponentTrigger::Fault)?; // -> Faulting
191
192        if let Err(e) = self.on_fault() {
193            log_error(&e);
194            return Err(e); // Halt state transition
195        }
196
197        self.transition_state(ComponentTrigger::FaultCompleted)?;
198
199        Ok(())
200    }
201
202    /// Resets the component to its initial state.
203    ///
204    /// # Errors
205    ///
206    /// Returns an error if the component fails to reset.
207    fn reset(&mut self) -> anyhow::Result<()> {
208        self.transition_state(ComponentTrigger::Reset)?; // -> Resetting
209
210        if let Err(e) = self.on_reset() {
211            log_error(&e);
212            return Err(e); // Halt state transition
213        }
214
215        self.transition_state(ComponentTrigger::ResetCompleted)?;
216
217        Ok(())
218    }
219
220    /// Disposes of the component, releasing any resources.
221    ///
222    /// # Errors
223    ///
224    /// Returns an error if the component fails to dispose.
225    fn dispose(&mut self) -> anyhow::Result<()> {
226        self.transition_state(ComponentTrigger::Dispose)?; // -> Disposing
227
228        if let Err(e) = self.on_dispose() {
229            log_error(&e);
230            return Err(e); // Halt state transition
231        }
232
233        self.transition_state(ComponentTrigger::DisposeCompleted)?;
234
235        Ok(())
236    }
237
238    /// Actions to be performed on start.
239    ///
240    /// # Errors
241    ///
242    /// Returns an error if starting the actor fails.
243    fn on_start(&mut self) -> anyhow::Result<()> {
244        log::warn!(
245            "The `on_start` handler was called when not overridden, \
246            it's expected that any actions required when stopping the component \
247            occur here, such as unsubscribing from data",
248        );
249        Ok(())
250    }
251
252    /// Actions to be performed on stop.
253    ///
254    /// # Errors
255    ///
256    /// Returns an error if stopping the actor fails.
257    fn on_stop(&mut self) -> anyhow::Result<()> {
258        log::warn!(
259            "The `on_stop` handler was called when not overridden, \
260            it's expected that any actions required when stopping the component \
261            occur here, such as unsubscribing from data",
262        );
263        Ok(())
264    }
265
266    /// Actions to be performed on resume.
267    ///
268    /// # Errors
269    ///
270    /// Returns an error if resuming the actor fails.
271    fn on_resume(&mut self) -> anyhow::Result<()> {
272        log::warn!(
273            "The `on_resume` handler was called when not overridden, \
274            it's expected that any actions required when resuming the component \
275            following a stop occur here"
276        );
277        Ok(())
278    }
279
280    /// Actions to be performed on reset.
281    ///
282    /// # Errors
283    ///
284    /// Returns an error if resetting the actor fails.
285    fn on_reset(&mut self) -> anyhow::Result<()> {
286        log::warn!(
287            "The `on_reset` handler was called when not overridden, \
288            it's expected that any actions required when resetting the component \
289            occur here, such as resetting indicators and other state"
290        );
291        Ok(())
292    }
293
294    /// Actions to be performed on dispose.
295    ///
296    /// # Errors
297    ///
298    /// Returns an error if disposing the actor fails.
299    fn on_dispose(&mut self) -> anyhow::Result<()> {
300        Ok(())
301    }
302
303    /// Actions to be performed on degrade.
304    ///
305    /// # Errors
306    ///
307    /// Returns an error if degrading the actor fails.
308    fn on_degrade(&mut self) -> anyhow::Result<()> {
309        Ok(())
310    }
311
312    /// Actions to be performed on fault.
313    ///
314    /// # Errors
315    ///
316    /// Returns an error if faulting the actor fails.
317    fn on_fault(&mut self) -> anyhow::Result<()> {
318        Ok(())
319    }
320}
321
322fn log_error(e: &anyhow::Error) {
323    log::error!("{e}");
324}
325
326#[rustfmt::skip]
327impl ComponentState {
328    /// Transition the state machine with the component `trigger`.
329    ///
330    /// # Errors
331    ///
332    /// Returns an error if `trigger` is invalid for the current state.
333    pub fn transition(&mut self, trigger: &ComponentTrigger) -> anyhow::Result<Self> {
334        let new_state = match (&self, trigger) {
335            (Self::PreInitialized, ComponentTrigger::Initialize) => Self::Ready,
336            (Self::Ready, ComponentTrigger::Reset) => Self::Resetting,
337            (Self::Ready, ComponentTrigger::Start) => Self::Starting,
338            (Self::Ready, ComponentTrigger::Dispose) => Self::Disposing,
339            (Self::Resetting, ComponentTrigger::ResetCompleted) => Self::Ready,
340            (Self::Starting, ComponentTrigger::StartCompleted) => Self::Running,
341            (Self::Starting, ComponentTrigger::Stop) => Self::Stopping,
342            (Self::Starting, ComponentTrigger::Fault) => Self::Faulting,
343            (Self::Running, ComponentTrigger::Stop) => Self::Stopping,
344            (Self::Running, ComponentTrigger::Degrade) => Self::Degrading,
345            (Self::Running, ComponentTrigger::Fault) => Self::Faulting,
346            (Self::Resuming, ComponentTrigger::Stop) => Self::Stopping,
347            (Self::Resuming, ComponentTrigger::ResumeCompleted) => Self::Running,
348            (Self::Resuming, ComponentTrigger::Fault) => Self::Faulting,
349            (Self::Stopping, ComponentTrigger::StopCompleted) => Self::Stopped,
350            (Self::Stopping, ComponentTrigger::Fault) => Self::Faulting,
351            (Self::Stopped, ComponentTrigger::Reset) => Self::Resetting,
352            (Self::Stopped, ComponentTrigger::Resume) => Self::Resuming,
353            (Self::Stopped, ComponentTrigger::Dispose) => Self::Disposing,
354            (Self::Stopped, ComponentTrigger::Fault) => Self::Faulting,
355            (Self::Degrading, ComponentTrigger::DegradeCompleted) => Self::Degraded,
356            (Self::Degraded, ComponentTrigger::Resume) => Self::Resuming,
357            (Self::Degraded, ComponentTrigger::Stop) => Self::Stopping,
358            (Self::Degraded, ComponentTrigger::Fault) => Self::Faulting,
359            (Self::Disposing, ComponentTrigger::DisposeCompleted) => Self::Disposed,
360            (Self::Faulting, ComponentTrigger::FaultCompleted) => Self::Faulted,
361            _ => anyhow::bail!("Invalid state trigger {self} -> {trigger}"),
362        };
363        Ok(new_state)
364    }
365}
366
thread_local! {
    // Component registry for the current thread; `thread_local!` gives every thread
    // its own independently constructed instance.
    static COMPONENT_REGISTRY: ComponentRegistry = ComponentRegistry::new();
}
370
/// Registry for storing components.
///
/// Maps an identifier to a shared, type-erased handle on a [`Component`].
pub struct ComponentRegistry {
    // `RefCell` provides the interior mutability that lets `insert` take `&self`.
    components: RefCell<HashMap<Ustr, Rc<UnsafeCell<dyn Component>>>>,
}
375
376impl Debug for ComponentRegistry {
377    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
378        let components_ref = self.components.borrow();
379        let keys: Vec<&Ustr> = components_ref.keys().collect();
380        f.debug_struct(stringify!(ComponentRegistry))
381            .field("components", &keys)
382            .finish()
383    }
384}
385
386impl Default for ComponentRegistry {
387    fn default() -> Self {
388        Self::new()
389    }
390}
391
392impl ComponentRegistry {
393    pub fn new() -> Self {
394        Self {
395            components: RefCell::new(HashMap::new()),
396        }
397    }
398
399    pub fn insert(&self, id: Ustr, component: Rc<UnsafeCell<dyn Component>>) {
400        self.components.borrow_mut().insert(id, component);
401    }
402
403    pub fn get(&self, id: &Ustr) -> Option<Rc<UnsafeCell<dyn Component>>> {
404        self.components.borrow().get(id).cloned()
405    }
406}
407
408/// Returns a reference to the global component registry.
409pub fn get_component_registry() -> &'static ComponentRegistry {
410    COMPONENT_REGISTRY.with(|registry| unsafe {
411        // SAFETY: We return a static reference that lives for the lifetime of the thread.
412        // Since this is thread_local storage, each thread has its own instance.
413        std::mem::transmute::<&ComponentRegistry, &'static ComponentRegistry>(registry)
414    })
415}
416
417/// Registers a component.
418pub fn register_component<T>(component: T) -> Rc<UnsafeCell<T>>
419where
420    T: Component + 'static,
421{
422    let component_id = component.component_id().inner();
423    let component_ref = Rc::new(UnsafeCell::new(component));
424
425    // Register in component registry
426    let component_trait_ref: Rc<UnsafeCell<dyn Component>> = component_ref.clone();
427    get_component_registry().insert(component_id, component_trait_ref);
428
429    component_ref
430}
431
432/// Registers a component that also implements Actor.
433pub fn register_component_actor<T>(component: T) -> Rc<UnsafeCell<T>>
434where
435    T: Component + Actor + 'static,
436{
437    let component_id = component.component_id().inner();
438    let actor_id = component.id();
439    let component_ref = Rc::new(UnsafeCell::new(component));
440
441    // Register in component registry
442    let component_trait_ref: Rc<UnsafeCell<dyn Component>> = component_ref.clone();
443    get_component_registry().insert(component_id, component_trait_ref);
444
445    // Register in actor registry
446    let actor_trait_ref: Rc<UnsafeCell<dyn Actor>> = component_ref.clone();
447    get_actor_registry().insert(actor_id, actor_trait_ref);
448
449    component_ref
450}
451
452/// Safely calls start() on a component in the global registry.
453///
454/// # Errors
455///
456/// Returns an error if the component is not found or if start() fails.
457pub fn start_component(id: &Ustr) -> anyhow::Result<()> {
458    if let Some(component_ref) = get_component_registry().get(id) {
459        // SAFETY: We have exclusive access to the component and are calling start() which takes &mut self
460        unsafe {
461            let component = &mut *component_ref.get();
462            component.start()
463        }
464    } else {
465        anyhow::bail!("Component '{id}' not found in global registry");
466    }
467}
468
469/// Safely calls stop() on a component in the global registry.
470///
471/// # Errors
472///
473/// Returns an error if the component is not found or if stop() fails.
474pub fn stop_component(id: &Ustr) -> anyhow::Result<()> {
475    if let Some(component_ref) = get_component_registry().get(id) {
476        unsafe {
477            let component = &mut *component_ref.get();
478            component.stop()
479        }
480    } else {
481        anyhow::bail!("Component '{id}' not found in global registry");
482    }
483}
484
485/// Safely calls reset() on a component in the global registry.
486///
487/// # Errors
488///
489/// Returns an error if the component is not found or if reset() fails.
490pub fn reset_component(id: &Ustr) -> anyhow::Result<()> {
491    if let Some(component_ref) = get_component_registry().get(id) {
492        unsafe {
493            let component = &mut *component_ref.get();
494            component.reset()
495        }
496    } else {
497        anyhow::bail!("Component '{id}' not found in global registry");
498    }
499}
500
501/// Safely calls dispose() on a component in the global registry.
502///
503/// # Errors
504///
505/// Returns an error if the component is not found or if dispose() fails.
506pub fn dispose_component(id: &Ustr) -> anyhow::Result<()> {
507    if let Some(component_ref) = get_component_registry().get(id) {
508        unsafe {
509            let component = &mut *component_ref.get();
510            component.dispose()
511        }
512    } else {
513        anyhow::bail!("Component '{id}' not found in global registry");
514    }
515}
516
517/// Returns a component from the global registry by ID.
518pub fn get_component(id: &Ustr) -> Option<Rc<UnsafeCell<dyn Component>>> {
519    get_component_registry().get(id)
520}
521
#[cfg(test)]
/// Removes every entry from the component registry (for test isolation).
pub fn clear_component_registry() {
    // The registry is thread-local, so only the calling thread's entries are removed.
    let registry = get_component_registry();
    registry.components.borrow_mut().clear();
}