nautilus_common/msgbus/
handler.rs

1// -------------------------------------------------------------------------------------------------
2//  Copyright (C) 2015-2025 Posei Systems Pty Ltd. All rights reserved.
3//  https://poseitrader.io
4//
5//  Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
6//  You may not use this file except in compliance with the License.
7//  You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
8//
9//  Unless required by applicable law or agreed to in writing, software
10//  distributed under the License is distributed on an "AS IS" BASIS,
11//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12//  See the License for the specific language governing permissions and
13//  limitations under the License.
14// -------------------------------------------------------------------------------------------------
15
16//! Message handler functionality for the message bus system.
17//!
18//! This module provides a trait and implementations for handling messages
19//! in a type-safe manner, enabling both typed and untyped message processing.
20
21use std::{
22    any::{Any, type_name},
23    fmt::Debug,
24    marker::PhantomData,
25    rc::Rc,
26};
27
28use nautilus_core::UUID4;
29use ustr::Ustr;
30
31pub trait MessageHandler: Any {
32    /// Returns the unique identifier for this handler.
33    fn id(&self) -> Ustr;
34    /// Handles a message of any type.
35    fn handle(&self, message: &dyn Any);
36    /// Returns this handler as a trait object.
37    fn as_any(&self) -> &dyn Any;
38}
39
40impl PartialEq for dyn MessageHandler {
41    fn eq(&self, other: &Self) -> bool {
42        self.id() == other.id()
43    }
44}
45
46impl Eq for dyn MessageHandler {}
47
48#[derive(Debug)]
49pub struct TypedMessageHandler<T: 'static + ?Sized, F: Fn(&T) + 'static> {
50    id: Ustr,
51    callback: F,
52    _phantom: PhantomData<T>,
53}
54
55impl<T: 'static, F: Fn(&T) + 'static> TypedMessageHandler<T, F> {
56    /// Creates a new handler with an optional custom ID.
57    pub fn new<S: AsRef<str>>(id: Option<S>, callback: F) -> Self {
58        let id_ustr = id
59            .map(|s| Ustr::from(s.as_ref()))
60            .unwrap_or_else(|| generate_handler_id(&callback));
61
62        Self {
63            id: id_ustr,
64            callback,
65            _phantom: PhantomData,
66        }
67    }
68
69    /// Creates a new handler with an auto-generated ID.
70    pub fn from(callback: F) -> Self {
71        Self::new::<Ustr>(None, callback)
72    }
73}
74
75impl<T: 'static, F: Fn(&T) + 'static> MessageHandler for TypedMessageHandler<T, F> {
76    fn id(&self) -> Ustr {
77        self.id
78    }
79
80    fn handle(&self, message: &dyn Any) {
81        if let Some(typed_msg) = message.downcast_ref::<T>() {
82            (self.callback)(typed_msg);
83        } else {
84            log::error!("Expected message of type {}", type_name::<T>());
85        }
86    }
87
88    fn as_any(&self) -> &dyn Any {
89        self
90    }
91}
92
93impl<F: Fn(&dyn Any) + 'static> TypedMessageHandler<dyn Any, F> {
94    /// Creates a new handler for dynamic Any messages with an optional custom ID.
95    pub fn new_any<S: AsRef<str>>(id: Option<S>, callback: F) -> Self {
96        let id_ustr = id
97            .map(|s| Ustr::from(s.as_ref()))
98            .unwrap_or_else(|| generate_handler_id(&callback));
99
100        Self {
101            id: id_ustr,
102            callback,
103            _phantom: PhantomData,
104        }
105    }
106
107    /// Creates a handler for Any messages with an optional ID.
108    pub fn from_any<S: AsRef<str>>(id_opt: Option<S>, callback: F) -> Self {
109        Self::new_any(id_opt, callback)
110    }
111
112    /// Creates a handler for Any messages with an auto-generated ID.
113    pub fn with_any(callback: F) -> Self {
114        Self::new_any::<&str>(None, callback)
115    }
116}
117
118impl<F: Fn(&dyn Any) + 'static> MessageHandler for TypedMessageHandler<dyn Any, F> {
119    fn id(&self) -> Ustr {
120        self.id
121    }
122
123    fn handle(&self, message: &dyn Any) {
124        (self.callback)(message);
125    }
126
127    fn as_any(&self) -> &dyn Any {
128        self
129    }
130}
131
132fn generate_handler_id<T: 'static + ?Sized, F: 'static + Fn(&T)>(callback: &F) -> Ustr {
133    let callback_ptr = std::ptr::from_ref(callback);
134    let uuid = UUID4::new();
135    Ustr::from(&format!("<{callback_ptr:?}>-{uuid}"))
136}
137
138#[repr(transparent)]
139#[derive(Clone)]
140pub struct ShareableMessageHandler(pub Rc<dyn MessageHandler>);
141
142impl ShareableMessageHandler {
143    pub fn id(&self) -> Ustr {
144        self.0.id()
145    }
146}
147
148impl Debug for ShareableMessageHandler {
149    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
150        f.debug_struct(stringify!(ShareableMessageHandler))
151            .field("id", &self.0.id())
152            .field("type", &std::any::type_name::<Self>().to_string())
153            .finish()
154    }
155}
156
157impl From<Rc<dyn MessageHandler>> for ShareableMessageHandler {
158    fn from(value: Rc<dyn MessageHandler>) -> Self {
159        Self(value)
160    }
161}
162
163// SAFETY: Message handlers cannot be sent across thread boundaries
164#[allow(unsafe_code)]
165unsafe impl Send for ShareableMessageHandler {}