nautilus_common/msgbus/
mod.rs

1// -------------------------------------------------------------------------------------------------
2//  Copyright (C) 2015-2025 Posei Systems Pty Ltd. All rights reserved.
3//  https://poseitrader.io
4//
5//  Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
6//  You may not use this file except in compliance with the License.
7//  You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
8//
9//  Unless required by applicable law or agreed to in writing, software
10//  distributed under the License is distributed on an "AS IS" BASIS,
11//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12//  See the License for the specific language governing permissions and
13//  limitations under the License.
14// -------------------------------------------------------------------------------------------------
15
16//! A common in-memory `MessageBus` supporting multiple messaging patterns:
17//!
18//! - Point-to-Point
19//! - Pub/Sub
20//! - Request/Response
21
22pub mod core;
23pub mod database;
24pub mod handler;
25pub mod listener;
26pub mod matching;
27pub mod message;
28pub mod stubs;
29pub mod switchboard;
30
31#[cfg(test)]
32mod tests;
33
34pub use core::MessageBus;
35use core::{Endpoint, Subscription};
36use std::{self, any::Any, cell::RefCell, fmt::Debug, rc::Rc, sync::OnceLock};
37
38use handler::ShareableMessageHandler;
39use matching::is_matching_backtracking;
40use nautilus_core::UUID4;
41use nautilus_model::data::Data;
42use ustr::Ustr;
43
44use crate::messages::data::DataResponse;
45// Re-exports
46pub use crate::msgbus::core::{MStr, Pattern, Topic};
47pub use crate::msgbus::message::BusMessage;
48
49#[derive(Debug)]
50pub struct ShareableMessageBus(Rc<RefCell<MessageBus>>);
51
52// SAFETY: Cannot be sent across thread boundaries
53#[allow(unsafe_code)]
54unsafe impl Send for ShareableMessageBus {}
55#[allow(unsafe_code)]
56unsafe impl Sync for ShareableMessageBus {}
57
58static MESSAGE_BUS: OnceLock<ShareableMessageBus> = OnceLock::new();
59
60/// Sets the global message bus.
61///
62/// # Panics
63///
64/// Panics if a message bus has already been set.
65pub fn set_message_bus(msgbus: Rc<RefCell<MessageBus>>) {
66    if MESSAGE_BUS.set(ShareableMessageBus(msgbus)).is_err() {
67        panic!("Failed to set MessageBus");
68    }
69}
70
71/// Gets the global message bus.
72///
73/// # Panics
74///
75/// Panics if the global message bus is uninitialized.
76pub fn get_message_bus() -> Rc<RefCell<MessageBus>> {
77    if MESSAGE_BUS.get().is_none() {
78        // Initialize default message bus
79        let msgbus = MessageBus::default();
80        let msgbus = Rc::new(RefCell::new(msgbus));
81        let _ = MESSAGE_BUS.set(ShareableMessageBus(msgbus.clone()));
82        msgbus
83    } else {
84        MESSAGE_BUS.get().unwrap().0.clone()
85    }
86}
87
88/// Sends the `message` to the `endpoint`.
89pub fn send(endpoint: MStr<Endpoint>, message: &dyn Any) {
90    // TODO: This should return a Result (in case endpoint doesn't exist)
91    let handler = get_message_bus().borrow().get_endpoint(endpoint).cloned();
92    if let Some(handler) = handler {
93        handler.0.handle(message);
94    }
95}
96
97/// Sends the [`DataResponse`] to the registered correlation ID handler.
98pub fn send_response(correlation_id: &UUID4, message: &DataResponse) {
99    let handler = get_message_bus()
100        .borrow()
101        .get_response_handler(correlation_id)
102        .cloned();
103
104    if let Some(handler) = handler {
105        handler.0.handle(message);
106    }
107}
108
109/// Publish [`Data`] to a topic.
110pub fn publish_data(topic: &Ustr, message: Data) {
111    let matching_subs = get_message_bus().borrow_mut().matching_subscriptions(topic);
112
113    for sub in matching_subs {
114        sub.handler.0.handle(&message);
115    }
116}
117
118/// Sends the response to the handler registered for the `correlation_id` (if found).
119pub fn response(correlation_id: &UUID4, message: &dyn Any) {
120    let handler = get_message_bus()
121        .borrow()
122        .get_response_handler(correlation_id)
123        .cloned();
124    if let Some(handler) = handler {
125        handler.0.handle(message);
126    } else {
127        log::error!(
128            "Failed to handle response: handler not found for correlation_id {correlation_id}"
129        )
130    }
131}
132
133pub fn register_response_handler(correlation_id: &UUID4, handler: ShareableMessageHandler) {
134    if let Err(e) = get_message_bus()
135        .borrow_mut()
136        .register_response_handler(correlation_id, handler)
137    {
138        log::error!("Failed to register request handler: {e}");
139    }
140}
141
142/// Publishes the `message` to the `topic`.
143pub fn publish(topic: MStr<Topic>, message: &dyn Any) {
144    log::trace!("Publishing topic '{topic}' {message:?}");
145    let matching_subs = get_message_bus()
146        .borrow_mut()
147        .inner_matching_subscriptions(topic);
148
149    log::trace!("Matched {} subscriptions", matching_subs.len());
150
151    for sub in matching_subs {
152        log::trace!("Matched {sub:?}");
153        sub.handler.0.handle(message);
154    }
155}
156
157/// Registers the `handler` for the `endpoint` address.
158pub fn register(endpoint: MStr<Endpoint>, handler: ShareableMessageHandler) {
159    log::debug!(
160        "Registering endpoint '{endpoint}' with handler ID {}",
161        handler.0.id(),
162    );
163
164    // Updates value if key already exists
165    get_message_bus()
166        .borrow_mut()
167        .endpoints
168        .insert(endpoint, handler);
169}
170
171/// Deregisters the handler for the `endpoint` address.
172pub fn deregister(endpoint: MStr<Endpoint>) {
173    log::debug!("Deregistering endpoint '{endpoint}'");
174
175    // Removes entry if it exists for endpoint
176    get_message_bus()
177        .borrow_mut()
178        .endpoints
179        .shift_remove(&endpoint);
180}
181
182/// Subscribes the `handler` to the `pattern` with an optional `priority`.
183///
184/// # Warnings
185///
186/// Assigning priority handling is an advanced feature which *shouldn't
187/// normally be needed by most users*. **Only assign a higher priority to the
188/// subscription if you are certain of what you're doing**. If an inappropriate
189/// priority is assigned then the handler may receive messages before core
190/// system components have been able to process necessary calculations and
191/// produce potential side effects for logically sound behavior.
192pub fn subscribe(pattern: MStr<Pattern>, handler: ShareableMessageHandler, priority: Option<u8>) {
193    let msgbus = get_message_bus();
194    let mut msgbus_ref_mut = msgbus.borrow_mut();
195    let sub = core::Subscription::new(pattern, handler, priority);
196
197    log::debug!(
198        "Subscribing {:?} for pattern '{}'",
199        sub.handler,
200        sub.pattern
201    );
202
203    if msgbus_ref_mut.subscriptions.contains(&sub) {
204        log::warn!("{sub:?} already exists");
205        return;
206    }
207
208    // Find existing patterns which match this topic
209    for (topic, subs) in msgbus_ref_mut.topics.iter_mut() {
210        if is_matching_backtracking(*topic, sub.pattern) {
211            // TODO: Consider binary_search and then insert
212            subs.push(sub.clone());
213            subs.sort();
214            log::debug!("Added subscription for '{topic}'");
215        }
216    }
217
218    msgbus_ref_mut.subscriptions.insert(sub);
219}
220
221pub fn subscribe_topic(topic: MStr<Topic>, handler: ShareableMessageHandler, priority: Option<u8>) {
222    subscribe(topic.into(), handler, priority);
223}
224
225pub fn subscribe_str<T: AsRef<str>>(
226    pattern: T,
227    handler: ShareableMessageHandler,
228    priority: Option<u8>,
229) {
230    subscribe(MStr::from(pattern), handler, priority);
231}
232
233/// Unsubscribes the `handler` from the `pattern`.
234pub fn unsubscribe(pattern: MStr<Pattern>, handler: ShareableMessageHandler) {
235    log::debug!("Unsubscribing {handler:?} from pattern '{pattern}'");
236
237    let sub = core::Subscription::new(pattern, handler, None);
238
239    get_message_bus()
240        .borrow_mut()
241        .topics
242        .values_mut()
243        .for_each(|subs| {
244            if let Ok(index) = subs.binary_search(&sub) {
245                subs.remove(index);
246            }
247        });
248
249    let removed = get_message_bus().borrow_mut().subscriptions.remove(&sub);
250
251    if removed {
252        log::debug!("Handler for pattern '{pattern}' was removed");
253    } else {
254        log::debug!("No matching handler for pattern '{pattern}' was found");
255    }
256}
257
258pub fn unsubscribe_topic(topic: MStr<Topic>, handler: ShareableMessageHandler) {
259    unsubscribe(topic.into(), handler);
260}
261
262pub fn unsubscribe_str<T: AsRef<str>>(pattern: T, handler: ShareableMessageHandler) {
263    unsubscribe(MStr::from(pattern), handler);
264}
265
266pub fn is_subscribed<T: AsRef<str>>(pattern: T, handler: ShareableMessageHandler) -> bool {
267    let pattern = MStr::from(pattern.as_ref());
268    let sub = Subscription::new(pattern, handler, None);
269    get_message_bus().borrow().subscriptions.contains(&sub)
270}
271
272pub fn subscriptions_count<T: AsRef<str>>(topic: T) -> usize {
273    get_message_bus().borrow().subscriptions_count(topic)
274}