nautilus_common/msgbus/
handler.rs1use std::{
22 any::{Any, type_name},
23 fmt::Debug,
24 marker::PhantomData,
25 rc::Rc,
26};
27
28use nautilus_core::UUID4;
29use ustr::Ustr;
30
31pub trait MessageHandler: Any {
32 fn id(&self) -> Ustr;
34 fn handle(&self, message: &dyn Any);
36 fn as_any(&self) -> &dyn Any;
38}
39
40impl PartialEq for dyn MessageHandler {
41 fn eq(&self, other: &Self) -> bool {
42 self.id() == other.id()
43 }
44}
45
46impl Eq for dyn MessageHandler {}
47
48#[derive(Debug)]
49pub struct TypedMessageHandler<T: 'static + ?Sized, F: Fn(&T) + 'static> {
50 id: Ustr,
51 callback: F,
52 _phantom: PhantomData<T>,
53}
54
55impl<T: 'static, F: Fn(&T) + 'static> TypedMessageHandler<T, F> {
56 pub fn new<S: AsRef<str>>(id: Option<S>, callback: F) -> Self {
58 let id_ustr = id
59 .map(|s| Ustr::from(s.as_ref()))
60 .unwrap_or_else(|| generate_handler_id(&callback));
61
62 Self {
63 id: id_ustr,
64 callback,
65 _phantom: PhantomData,
66 }
67 }
68
69 pub fn from(callback: F) -> Self {
71 Self::new::<Ustr>(None, callback)
72 }
73}
74
75impl<T: 'static, F: Fn(&T) + 'static> MessageHandler for TypedMessageHandler<T, F> {
76 fn id(&self) -> Ustr {
77 self.id
78 }
79
80 fn handle(&self, message: &dyn Any) {
81 if let Some(typed_msg) = message.downcast_ref::<T>() {
82 (self.callback)(typed_msg);
83 } else {
84 log::error!("Expected message of type {}", type_name::<T>());
85 }
86 }
87
88 fn as_any(&self) -> &dyn Any {
89 self
90 }
91}
92
93impl<F: Fn(&dyn Any) + 'static> TypedMessageHandler<dyn Any, F> {
94 pub fn new_any<S: AsRef<str>>(id: Option<S>, callback: F) -> Self {
96 let id_ustr = id
97 .map(|s| Ustr::from(s.as_ref()))
98 .unwrap_or_else(|| generate_handler_id(&callback));
99
100 Self {
101 id: id_ustr,
102 callback,
103 _phantom: PhantomData,
104 }
105 }
106
107 pub fn from_any<S: AsRef<str>>(id_opt: Option<S>, callback: F) -> Self {
109 Self::new_any(id_opt, callback)
110 }
111
112 pub fn with_any(callback: F) -> Self {
114 Self::new_any::<&str>(None, callback)
115 }
116}
117
118impl<F: Fn(&dyn Any) + 'static> MessageHandler for TypedMessageHandler<dyn Any, F> {
119 fn id(&self) -> Ustr {
120 self.id
121 }
122
123 fn handle(&self, message: &dyn Any) {
124 (self.callback)(message);
125 }
126
127 fn as_any(&self) -> &dyn Any {
128 self
129 }
130}
131
132fn generate_handler_id<T: 'static + ?Sized, F: 'static + Fn(&T)>(callback: &F) -> Ustr {
133 let callback_ptr = std::ptr::from_ref(callback);
134 let uuid = UUID4::new();
135 Ustr::from(&format!("<{callback_ptr:?}>-{uuid}"))
136}
137
138#[repr(transparent)]
139#[derive(Clone)]
140pub struct ShareableMessageHandler(pub Rc<dyn MessageHandler>);
141
142impl ShareableMessageHandler {
143 pub fn id(&self) -> Ustr {
144 self.0.id()
145 }
146}
147
148impl Debug for ShareableMessageHandler {
149 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
150 f.debug_struct(stringify!(ShareableMessageHandler))
151 .field("id", &self.0.id())
152 .field("type", &std::any::type_name::<Self>().to_string())
153 .finish()
154 }
155}
156
157impl From<Rc<dyn MessageHandler>> for ShareableMessageHandler {
158 fn from(value: Rc<dyn MessageHandler>) -> Self {
159 Self(value)
160 }
161}
162
163#[allow(unsafe_code)]
165unsafe impl Send for ShareableMessageHandler {}