nautilus_common/msgbus/
mod.rs1pub mod core;
23pub mod database;
24pub mod handler;
25pub mod listener;
26pub mod matching;
27pub mod message;
28pub mod stubs;
29pub mod switchboard;
30
31#[cfg(test)]
32mod tests;
33
34pub use core::MessageBus;
35use core::{Endpoint, Subscription};
36use std::{
37 self,
38 any::Any,
39 cell::{OnceCell, RefCell},
40 rc::Rc,
41};
42
43use handler::ShareableMessageHandler;
44use matching::is_matching_backtracking;
45use nautilus_core::UUID4;
46use nautilus_model::data::Data;
47use ustr::Ustr;
48
49use crate::messages::data::DataResponse;
50pub use crate::msgbus::core::{MStr, Pattern, Topic};
52pub use crate::msgbus::message::BusMessage;
53
54thread_local! {
58 static MESSAGE_BUS: OnceCell<Rc<RefCell<MessageBus>>> = const { OnceCell::new() };
59}
60
61pub fn set_message_bus(msgbus: Rc<RefCell<MessageBus>>) {
67 MESSAGE_BUS.with(|bus| {
68 if bus.set(msgbus).is_err() {
69 panic!("Failed to set MessageBus: already initialized for this thread");
70 }
71 });
72}
73
74pub fn get_message_bus() -> Rc<RefCell<MessageBus>> {
80 MESSAGE_BUS.with(|bus| {
81 bus.get_or_init(|| {
82 let msgbus = MessageBus::default();
83 Rc::new(RefCell::new(msgbus))
84 })
85 .clone()
86 })
87}
88
89pub fn send_any(endpoint: MStr<Endpoint>, message: &dyn Any) {
91 let handler = get_message_bus().borrow().get_endpoint(endpoint).cloned();
92 if let Some(handler) = handler {
93 handler.0.handle(message);
94 } else {
95 log::error!("send_any: no registered endpoint '{endpoint}'")
96 }
97}
98
99pub fn send<T: 'static>(endpoint: MStr<Endpoint>, message: T) {
101 let handler = get_message_bus().borrow().get_endpoint(endpoint).cloned();
102 if let Some(handler) = handler {
103 handler.0.handle(&message);
104 } else {
105 log::error!("send: no registered endpoint '{endpoint}'")
106 }
107}
108
109pub fn send_response(correlation_id: &UUID4, message: &DataResponse) {
111 let handler = get_message_bus()
112 .borrow()
113 .get_response_handler(correlation_id)
114 .cloned();
115
116 if let Some(handler) = handler {
117 handler.0.handle(message);
118 } else {
119 log::error!("send_response: handler not found for correlation_id '{correlation_id}'")
120 }
121}
122
123pub fn publish_data(topic: &Ustr, message: Data) {
125 let matching_subs = get_message_bus().borrow_mut().matching_subscriptions(topic);
126
127 for sub in matching_subs {
128 sub.handler.0.handle(&message);
129 }
130}
131
132pub fn response(correlation_id: &UUID4, message: &dyn Any) {
134 let handler = get_message_bus()
135 .borrow()
136 .get_response_handler(correlation_id)
137 .cloned();
138 if let Some(handler) = handler {
139 handler.0.handle(message);
140 } else {
141 log::error!("response: handler not found for correlation_id '{correlation_id}'")
142 }
143}
144
145pub fn register_response_handler(correlation_id: &UUID4, handler: ShareableMessageHandler) {
146 if let Err(e) = get_message_bus()
147 .borrow_mut()
148 .register_response_handler(correlation_id, handler)
149 {
150 log::error!("Failed to register request handler: {e}");
151 }
152}
153
154pub fn publish(topic: MStr<Topic>, message: &dyn Any) {
156 log::trace!("Publishing topic '{topic}' {message:?}");
157 let matching_subs = get_message_bus()
158 .borrow_mut()
159 .inner_matching_subscriptions(topic);
160
161 log::trace!("Matched {} subscriptions", matching_subs.len());
162
163 for sub in matching_subs {
164 log::trace!("Matched {sub:?}");
165 sub.handler.0.handle(message);
166 }
167}
168
169pub fn register(endpoint: MStr<Endpoint>, handler: ShareableMessageHandler) {
171 log::debug!(
172 "Registering endpoint '{endpoint}' with handler ID {}",
173 handler.0.id(),
174 );
175
176 get_message_bus()
178 .borrow_mut()
179 .endpoints
180 .insert(endpoint, handler);
181}
182
183pub fn deregister(endpoint: MStr<Endpoint>) {
185 log::debug!("Deregistering endpoint '{endpoint}'");
186
187 get_message_bus()
189 .borrow_mut()
190 .endpoints
191 .shift_remove(&endpoint);
192}
193
194pub fn subscribe(pattern: MStr<Pattern>, handler: ShareableMessageHandler, priority: Option<u8>) {
205 let msgbus = get_message_bus();
206 let mut msgbus_ref_mut = msgbus.borrow_mut();
207 let sub = core::Subscription::new(pattern, handler, priority);
208
209 log::debug!(
210 "Subscribing {:?} for pattern '{}'",
211 sub.handler,
212 sub.pattern
213 );
214
215 if msgbus_ref_mut.subscriptions.contains(&sub) {
216 log::warn!("{sub:?} already exists");
217 return;
218 }
219
220 for (topic, subs) in msgbus_ref_mut.topics.iter_mut() {
222 if is_matching_backtracking(*topic, sub.pattern) {
223 subs.push(sub.clone());
225 subs.sort();
226 log::debug!("Added subscription for '{topic}'");
227 }
228 }
229
230 msgbus_ref_mut.subscriptions.insert(sub);
231}
232
233pub fn subscribe_topic(topic: MStr<Topic>, handler: ShareableMessageHandler, priority: Option<u8>) {
234 subscribe(topic.into(), handler, priority);
235}
236
237pub fn subscribe_str<T: AsRef<str>>(
238 pattern: T,
239 handler: ShareableMessageHandler,
240 priority: Option<u8>,
241) {
242 subscribe(MStr::from(pattern), handler, priority);
243}
244
245pub fn unsubscribe(pattern: MStr<Pattern>, handler: ShareableMessageHandler) {
247 log::debug!("Unsubscribing {handler:?} from pattern '{pattern}'");
248
249 let sub = core::Subscription::new(pattern, handler, None);
250
251 get_message_bus()
252 .borrow_mut()
253 .topics
254 .values_mut()
255 .for_each(|subs| {
256 if let Ok(index) = subs.binary_search(&sub) {
257 subs.remove(index);
258 }
259 });
260
261 let removed = get_message_bus().borrow_mut().subscriptions.remove(&sub);
262
263 if removed {
264 log::debug!("Handler for pattern '{pattern}' was removed");
265 } else {
266 log::debug!("No matching handler for pattern '{pattern}' was found");
267 }
268}
269
270pub fn unsubscribe_topic(topic: MStr<Topic>, handler: ShareableMessageHandler) {
271 unsubscribe(topic.into(), handler);
272}
273
274pub fn unsubscribe_str<T: AsRef<str>>(pattern: T, handler: ShareableMessageHandler) {
275 unsubscribe(MStr::from(pattern), handler);
276}
277
278pub fn is_subscribed<T: AsRef<str>>(pattern: T, handler: ShareableMessageHandler) -> bool {
279 let pattern = MStr::from(pattern.as_ref());
280 let sub = Subscription::new(pattern, handler, None);
281 get_message_bus().borrow().subscriptions.contains(&sub)
282}
283
284pub fn subscriptions_count<T: AsRef<str>>(topic: T) -> usize {
285 get_message_bus().borrow().subscriptions_count(topic)
286}