nautilus_common/msgbus/
mod.rs1pub mod core;
23pub mod database;
24pub mod handler;
25pub mod listener;
26pub mod matching;
27pub mod message;
28pub mod stubs;
29pub mod switchboard;
30
31#[cfg(test)]
32mod tests;
33
34pub use core::MessageBus;
35use core::{Endpoint, Subscription};
36use std::{self, any::Any, cell::RefCell, fmt::Debug, rc::Rc, sync::OnceLock};
37
38use handler::ShareableMessageHandler;
39use matching::is_matching_backtracking;
40use nautilus_core::UUID4;
41use nautilus_model::data::Data;
42use ustr::Ustr;
43
44use crate::messages::data::DataResponse;
45pub use crate::msgbus::core::{MStr, Pattern, Topic};
47pub use crate::msgbus::message::BusMessage;
48
49#[derive(Debug)]
50pub struct ShareableMessageBus(Rc<RefCell<MessageBus>>);
51
52#[allow(unsafe_code)]
54unsafe impl Send for ShareableMessageBus {}
55#[allow(unsafe_code)]
56unsafe impl Sync for ShareableMessageBus {}
57
58static MESSAGE_BUS: OnceLock<ShareableMessageBus> = OnceLock::new();
59
60pub fn set_message_bus(msgbus: Rc<RefCell<MessageBus>>) {
66 if MESSAGE_BUS.set(ShareableMessageBus(msgbus)).is_err() {
67 panic!("Failed to set MessageBus");
68 }
69}
70
71pub fn get_message_bus() -> Rc<RefCell<MessageBus>> {
77 if MESSAGE_BUS.get().is_none() {
78 let msgbus = MessageBus::default();
80 let msgbus = Rc::new(RefCell::new(msgbus));
81 let _ = MESSAGE_BUS.set(ShareableMessageBus(msgbus.clone()));
82 msgbus
83 } else {
84 MESSAGE_BUS.get().unwrap().0.clone()
85 }
86}
87
88pub fn send(endpoint: MStr<Endpoint>, message: &dyn Any) {
90 let handler = get_message_bus().borrow().get_endpoint(endpoint).cloned();
92 if let Some(handler) = handler {
93 handler.0.handle(message);
94 }
95}
96
97pub fn send_response(correlation_id: &UUID4, message: &DataResponse) {
99 let handler = get_message_bus()
100 .borrow()
101 .get_response_handler(correlation_id)
102 .cloned();
103
104 if let Some(handler) = handler {
105 handler.0.handle(message);
106 }
107}
108
109pub fn publish_data(topic: &Ustr, message: Data) {
111 let matching_subs = get_message_bus().borrow_mut().matching_subscriptions(topic);
112
113 for sub in matching_subs {
114 sub.handler.0.handle(&message);
115 }
116}
117
118pub fn response(correlation_id: &UUID4, message: &dyn Any) {
120 let handler = get_message_bus()
121 .borrow()
122 .get_response_handler(correlation_id)
123 .cloned();
124 if let Some(handler) = handler {
125 handler.0.handle(message);
126 } else {
127 log::error!(
128 "Failed to handle response: handler not found for correlation_id {correlation_id}"
129 )
130 }
131}
132
133pub fn register_response_handler(correlation_id: &UUID4, handler: ShareableMessageHandler) {
134 if let Err(e) = get_message_bus()
135 .borrow_mut()
136 .register_response_handler(correlation_id, handler)
137 {
138 log::error!("Failed to register request handler: {e}");
139 }
140}
141
142pub fn publish(topic: MStr<Topic>, message: &dyn Any) {
144 log::trace!("Publishing topic '{topic}' {message:?}");
145 let matching_subs = get_message_bus()
146 .borrow_mut()
147 .inner_matching_subscriptions(topic);
148
149 log::trace!("Matched {} subscriptions", matching_subs.len());
150
151 for sub in matching_subs {
152 log::trace!("Matched {sub:?}");
153 sub.handler.0.handle(message);
154 }
155}
156
157pub fn register(endpoint: MStr<Endpoint>, handler: ShareableMessageHandler) {
159 log::debug!(
160 "Registering endpoint '{endpoint}' with handler ID {}",
161 handler.0.id(),
162 );
163
164 get_message_bus()
166 .borrow_mut()
167 .endpoints
168 .insert(endpoint, handler);
169}
170
171pub fn deregister(endpoint: MStr<Endpoint>) {
173 log::debug!("Deregistering endpoint '{endpoint}'");
174
175 get_message_bus()
177 .borrow_mut()
178 .endpoints
179 .shift_remove(&endpoint);
180}
181
182pub fn subscribe(pattern: MStr<Pattern>, handler: ShareableMessageHandler, priority: Option<u8>) {
193 let msgbus = get_message_bus();
194 let mut msgbus_ref_mut = msgbus.borrow_mut();
195 let sub = core::Subscription::new(pattern, handler, priority);
196
197 log::debug!(
198 "Subscribing {:?} for pattern '{}'",
199 sub.handler,
200 sub.pattern
201 );
202
203 if msgbus_ref_mut.subscriptions.contains(&sub) {
204 log::warn!("{sub:?} already exists");
205 return;
206 }
207
208 for (topic, subs) in msgbus_ref_mut.topics.iter_mut() {
210 if is_matching_backtracking(*topic, sub.pattern) {
211 subs.push(sub.clone());
213 subs.sort();
214 log::debug!("Added subscription for '{topic}'");
215 }
216 }
217
218 msgbus_ref_mut.subscriptions.insert(sub);
219}
220
221pub fn subscribe_topic(topic: MStr<Topic>, handler: ShareableMessageHandler, priority: Option<u8>) {
222 subscribe(topic.into(), handler, priority);
223}
224
225pub fn subscribe_str<T: AsRef<str>>(
226 pattern: T,
227 handler: ShareableMessageHandler,
228 priority: Option<u8>,
229) {
230 subscribe(MStr::from(pattern), handler, priority);
231}
232
233pub fn unsubscribe(pattern: MStr<Pattern>, handler: ShareableMessageHandler) {
235 log::debug!("Unsubscribing {handler:?} from pattern '{pattern}'");
236
237 let sub = core::Subscription::new(pattern, handler, None);
238
239 get_message_bus()
240 .borrow_mut()
241 .topics
242 .values_mut()
243 .for_each(|subs| {
244 if let Ok(index) = subs.binary_search(&sub) {
245 subs.remove(index);
246 }
247 });
248
249 let removed = get_message_bus().borrow_mut().subscriptions.remove(&sub);
250
251 if removed {
252 log::debug!("Handler for pattern '{pattern}' was removed");
253 } else {
254 log::debug!("No matching handler for pattern '{pattern}' was found");
255 }
256}
257
258pub fn unsubscribe_topic(topic: MStr<Topic>, handler: ShareableMessageHandler) {
259 unsubscribe(topic.into(), handler);
260}
261
262pub fn unsubscribe_str<T: AsRef<str>>(pattern: T, handler: ShareableMessageHandler) {
263 unsubscribe(MStr::from(pattern), handler);
264}
265
266pub fn is_subscribed<T: AsRef<str>>(pattern: T, handler: ShareableMessageHandler) -> bool {
267 let pattern = MStr::from(pattern.as_ref());
268 let sub = Subscription::new(pattern, handler, None);
269 get_message_bus().borrow().subscriptions.contains(&sub)
270}
271
272pub fn subscriptions_count<T: AsRef<str>>(topic: T) -> usize {
273 get_message_bus().borrow().subscriptions_count(topic)
274}