1#![allow(unsafe_code)]
23
24use std::{
25 cell::{RefCell, UnsafeCell},
26 collections::HashMap,
27 fmt::Debug,
28 rc::Rc,
29};
30
31use nautilus_model::identifiers::{ComponentId, TraderId};
32use ustr::Ustr;
33
34use crate::{
35 actor::{Actor, registry::get_actor_registry},
36 cache::Cache,
37 clock::Clock,
38 enums::{ComponentState, ComponentTrigger},
39};
40
41pub trait Component {
43 fn component_id(&self) -> ComponentId;
45
46 fn state(&self) -> ComponentState;
48
49 fn transition_state(&mut self, trigger: ComponentTrigger) -> anyhow::Result<()>;
55
56 fn is_ready(&self) -> bool {
58 self.state() == ComponentState::Ready
59 }
60
61 fn not_running(&self) -> bool {
63 !self.is_running()
64 }
65
66 fn is_running(&self) -> bool {
68 self.state() == ComponentState::Running
69 }
70
71 fn is_stopped(&self) -> bool {
73 self.state() == ComponentState::Stopped
74 }
75
76 fn is_degraded(&self) -> bool {
78 self.state() == ComponentState::Degraded
79 }
80
81 fn is_faulted(&self) -> bool {
83 self.state() == ComponentState::Faulted
84 }
85
86 fn is_disposed(&self) -> bool {
88 self.state() == ComponentState::Disposed
89 }
90
91 fn register(
97 &mut self,
98 trader_id: TraderId,
99 clock: Rc<RefCell<dyn Clock>>,
100 cache: Rc<RefCell<Cache>>,
101 ) -> anyhow::Result<()>;
102
103 fn initialize(&mut self) -> anyhow::Result<()> {
109 self.transition_state(ComponentTrigger::Initialize)
110 }
111
112 fn start(&mut self) -> anyhow::Result<()> {
118 self.transition_state(ComponentTrigger::Start)?; if let Err(e) = self.on_start() {
121 log_error(&e);
122 return Err(e); }
124
125 self.transition_state(ComponentTrigger::StartCompleted)?;
126
127 Ok(())
128 }
129
130 fn stop(&mut self) -> anyhow::Result<()> {
136 self.transition_state(ComponentTrigger::Stop)?; if let Err(e) = self.on_stop() {
139 log_error(&e);
140 return Err(e); }
142
143 self.transition_state(ComponentTrigger::StopCompleted)?;
144
145 Ok(())
146 }
147
148 fn resume(&mut self) -> anyhow::Result<()> {
154 self.transition_state(ComponentTrigger::Resume)?; if let Err(e) = self.on_resume() {
157 log_error(&e);
158 return Err(e); }
160
161 self.transition_state(ComponentTrigger::ResumeCompleted)?;
162
163 Ok(())
164 }
165
166 fn degrade(&mut self) -> anyhow::Result<()> {
172 self.transition_state(ComponentTrigger::Degrade)?; if let Err(e) = self.on_degrade() {
175 log_error(&e);
176 return Err(e); }
178
179 self.transition_state(ComponentTrigger::DegradeCompleted)?;
180
181 Ok(())
182 }
183
184 fn fault(&mut self) -> anyhow::Result<()> {
190 self.transition_state(ComponentTrigger::Fault)?; if let Err(e) = self.on_fault() {
193 log_error(&e);
194 return Err(e); }
196
197 self.transition_state(ComponentTrigger::FaultCompleted)?;
198
199 Ok(())
200 }
201
202 fn reset(&mut self) -> anyhow::Result<()> {
208 self.transition_state(ComponentTrigger::Reset)?; if let Err(e) = self.on_reset() {
211 log_error(&e);
212 return Err(e); }
214
215 self.transition_state(ComponentTrigger::ResetCompleted)?;
216
217 Ok(())
218 }
219
220 fn dispose(&mut self) -> anyhow::Result<()> {
226 self.transition_state(ComponentTrigger::Dispose)?; if let Err(e) = self.on_dispose() {
229 log_error(&e);
230 return Err(e); }
232
233 self.transition_state(ComponentTrigger::DisposeCompleted)?;
234
235 Ok(())
236 }
237
238 fn on_start(&mut self) -> anyhow::Result<()> {
244 log::warn!(
245 "The `on_start` handler was called when not overridden, \
246 it's expected that any actions required when stopping the component \
247 occur here, such as unsubscribing from data",
248 );
249 Ok(())
250 }
251
252 fn on_stop(&mut self) -> anyhow::Result<()> {
258 log::warn!(
259 "The `on_stop` handler was called when not overridden, \
260 it's expected that any actions required when stopping the component \
261 occur here, such as unsubscribing from data",
262 );
263 Ok(())
264 }
265
266 fn on_resume(&mut self) -> anyhow::Result<()> {
272 log::warn!(
273 "The `on_resume` handler was called when not overridden, \
274 it's expected that any actions required when resuming the component \
275 following a stop occur here"
276 );
277 Ok(())
278 }
279
280 fn on_reset(&mut self) -> anyhow::Result<()> {
286 log::warn!(
287 "The `on_reset` handler was called when not overridden, \
288 it's expected that any actions required when resetting the component \
289 occur here, such as resetting indicators and other state"
290 );
291 Ok(())
292 }
293
294 fn on_dispose(&mut self) -> anyhow::Result<()> {
300 Ok(())
301 }
302
303 fn on_degrade(&mut self) -> anyhow::Result<()> {
309 Ok(())
310 }
311
312 fn on_fault(&mut self) -> anyhow::Result<()> {
318 Ok(())
319 }
320}
321
322fn log_error(e: &anyhow::Error) {
323 log::error!("{e}");
324}
325
326#[rustfmt::skip]
327impl ComponentState {
328 pub fn transition(&mut self, trigger: &ComponentTrigger) -> anyhow::Result<Self> {
334 let new_state = match (&self, trigger) {
335 (Self::PreInitialized, ComponentTrigger::Initialize) => Self::Ready,
336 (Self::Ready, ComponentTrigger::Reset) => Self::Resetting,
337 (Self::Ready, ComponentTrigger::Start) => Self::Starting,
338 (Self::Ready, ComponentTrigger::Dispose) => Self::Disposing,
339 (Self::Resetting, ComponentTrigger::ResetCompleted) => Self::Ready,
340 (Self::Starting, ComponentTrigger::StartCompleted) => Self::Running,
341 (Self::Starting, ComponentTrigger::Stop) => Self::Stopping,
342 (Self::Starting, ComponentTrigger::Fault) => Self::Faulting,
343 (Self::Running, ComponentTrigger::Stop) => Self::Stopping,
344 (Self::Running, ComponentTrigger::Degrade) => Self::Degrading,
345 (Self::Running, ComponentTrigger::Fault) => Self::Faulting,
346 (Self::Resuming, ComponentTrigger::Stop) => Self::Stopping,
347 (Self::Resuming, ComponentTrigger::ResumeCompleted) => Self::Running,
348 (Self::Resuming, ComponentTrigger::Fault) => Self::Faulting,
349 (Self::Stopping, ComponentTrigger::StopCompleted) => Self::Stopped,
350 (Self::Stopping, ComponentTrigger::Fault) => Self::Faulting,
351 (Self::Stopped, ComponentTrigger::Reset) => Self::Resetting,
352 (Self::Stopped, ComponentTrigger::Resume) => Self::Resuming,
353 (Self::Stopped, ComponentTrigger::Dispose) => Self::Disposing,
354 (Self::Stopped, ComponentTrigger::Fault) => Self::Faulting,
355 (Self::Degrading, ComponentTrigger::DegradeCompleted) => Self::Degraded,
356 (Self::Degraded, ComponentTrigger::Resume) => Self::Resuming,
357 (Self::Degraded, ComponentTrigger::Stop) => Self::Stopping,
358 (Self::Degraded, ComponentTrigger::Fault) => Self::Faulting,
359 (Self::Disposing, ComponentTrigger::DisposeCompleted) => Self::Disposed,
360 (Self::Faulting, ComponentTrigger::FaultCompleted) => Self::Faulted,
361 _ => anyhow::bail!("Invalid state trigger {self} -> {trigger}"),
362 };
363 Ok(new_state)
364 }
365}
366
// Thread-local component registry: each thread that touches it gets its own
// instance, built with `ComponentRegistry::new()`.
thread_local! {
    static COMPONENT_REGISTRY: ComponentRegistry = ComponentRegistry::new();
}
370
/// Registry of components keyed by ID, each held as a shared
/// `Rc<UnsafeCell<dyn Component>>` handle.
pub struct ComponentRegistry {
    /// Registered components; the `RefCell` lets the map be modified through `&self`.
    components: RefCell<HashMap<Ustr, Rc<UnsafeCell<dyn Component>>>>,
}
375
376impl Debug for ComponentRegistry {
377 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
378 let components_ref = self.components.borrow();
379 let keys: Vec<&Ustr> = components_ref.keys().collect();
380 f.debug_struct(stringify!(ComponentRegistry))
381 .field("components", &keys)
382 .finish()
383 }
384}
385
386impl Default for ComponentRegistry {
387 fn default() -> Self {
388 Self::new()
389 }
390}
391
392impl ComponentRegistry {
393 pub fn new() -> Self {
394 Self {
395 components: RefCell::new(HashMap::new()),
396 }
397 }
398
399 pub fn insert(&self, id: Ustr, component: Rc<UnsafeCell<dyn Component>>) {
400 self.components.borrow_mut().insert(id, component);
401 }
402
403 pub fn get(&self, id: &Ustr) -> Option<Rc<UnsafeCell<dyn Component>>> {
404 self.components.borrow().get(id).cloned()
405 }
406}
407
408pub fn get_component_registry() -> &'static ComponentRegistry {
410 COMPONENT_REGISTRY.with(|registry| unsafe {
411 std::mem::transmute::<&ComponentRegistry, &'static ComponentRegistry>(registry)
414 })
415}
416
417pub fn register_component<T>(component: T) -> Rc<UnsafeCell<T>>
419where
420 T: Component + 'static,
421{
422 let component_id = component.component_id().inner();
423 let component_ref = Rc::new(UnsafeCell::new(component));
424
425 let component_trait_ref: Rc<UnsafeCell<dyn Component>> = component_ref.clone();
427 get_component_registry().insert(component_id, component_trait_ref);
428
429 component_ref
430}
431
432pub fn register_component_actor<T>(component: T) -> Rc<UnsafeCell<T>>
434where
435 T: Component + Actor + 'static,
436{
437 let component_id = component.component_id().inner();
438 let actor_id = component.id();
439 let component_ref = Rc::new(UnsafeCell::new(component));
440
441 let component_trait_ref: Rc<UnsafeCell<dyn Component>> = component_ref.clone();
443 get_component_registry().insert(component_id, component_trait_ref);
444
445 let actor_trait_ref: Rc<UnsafeCell<dyn Actor>> = component_ref.clone();
447 get_actor_registry().insert(actor_id, actor_trait_ref);
448
449 component_ref
450}
451
452pub fn start_component(id: &Ustr) -> anyhow::Result<()> {
458 if let Some(component_ref) = get_component_registry().get(id) {
459 unsafe {
461 let component = &mut *component_ref.get();
462 component.start()
463 }
464 } else {
465 anyhow::bail!("Component '{id}' not found in global registry");
466 }
467}
468
469pub fn stop_component(id: &Ustr) -> anyhow::Result<()> {
475 if let Some(component_ref) = get_component_registry().get(id) {
476 unsafe {
477 let component = &mut *component_ref.get();
478 component.stop()
479 }
480 } else {
481 anyhow::bail!("Component '{id}' not found in global registry");
482 }
483}
484
485pub fn reset_component(id: &Ustr) -> anyhow::Result<()> {
491 if let Some(component_ref) = get_component_registry().get(id) {
492 unsafe {
493 let component = &mut *component_ref.get();
494 component.reset()
495 }
496 } else {
497 anyhow::bail!("Component '{id}' not found in global registry");
498 }
499}
500
501pub fn dispose_component(id: &Ustr) -> anyhow::Result<()> {
507 if let Some(component_ref) = get_component_registry().get(id) {
508 unsafe {
509 let component = &mut *component_ref.get();
510 component.dispose()
511 }
512 } else {
513 anyhow::bail!("Component '{id}' not found in global registry");
514 }
515}
516
517pub fn get_component(id: &Ustr) -> Option<Rc<UnsafeCell<dyn Component>>> {
519 get_component_registry().get(id)
520}
521
/// Removes every component from the calling thread's registry (test builds only).
#[cfg(test)]
pub fn clear_component_registry() {
    let registry = get_component_registry();
    let mut registered = registry.components.borrow_mut();
    registered.clear();
}