nautilus_common/msgbus/
mod.rs

1// -------------------------------------------------------------------------------------------------
2//  Copyright (C) 2015-2025 Posei Systems Pty Ltd. All rights reserved.
3//  https://poseitrader.io
4//
5//  Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
6//  You may not use this file except in compliance with the License.
7//  You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
8//
9//  Unless required by applicable law or agreed to in writing, software
10//  distributed under the License is distributed on an "AS IS" BASIS,
11//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12//  See the License for the specific language governing permissions and
13//  limitations under the License.
14// -------------------------------------------------------------------------------------------------
15
16//! A common in-memory `MessageBus` supporting multiple messaging patterns:
17//!
18//! - Point-to-Point
19//! - Pub/Sub
20//! - Request/Response
21
22pub mod core;
23pub mod database;
24pub mod handler;
25pub mod listener;
26pub mod matching;
27pub mod message;
28pub mod stubs;
29pub mod switchboard;
30
31#[cfg(test)]
32mod tests;
33
34pub use core::MessageBus;
35use core::{Endpoint, Subscription};
36use std::{
37    self,
38    any::Any,
39    cell::{OnceCell, RefCell},
40    rc::Rc,
41};
42
43use handler::ShareableMessageHandler;
44use matching::is_matching_backtracking;
45use nautilus_core::UUID4;
46use nautilus_model::data::Data;
47use ustr::Ustr;
48
49use crate::messages::data::DataResponse;
50// Re-exports
51pub use crate::msgbus::core::{MStr, Pattern, Topic};
52pub use crate::msgbus::message::BusMessage;
53
// Thread-local storage for MessageBus instances. Each thread (including async runtimes)
// gets its own MessageBus instance, eliminating the need for unsafe Send/Sync implementations
// while maintaining the global singleton access pattern that the framework expects.
//
// The slot is a write-once `OnceCell`: it is filled either explicitly through
// `set_message_bus` or lazily with a default bus on the first `get_message_bus` call,
// whichever happens first on the thread.
thread_local! {
    // Starts empty; `const` initialization avoids a lazy-init check for the cell itself.
    static MESSAGE_BUS: OnceCell<Rc<RefCell<MessageBus>>> = const { OnceCell::new() };
}
60
61/// Sets the thread-local message bus.
62///
63/// # Panics
64///
65/// Panics if a message bus has already been set for this thread.
66pub fn set_message_bus(msgbus: Rc<RefCell<MessageBus>>) {
67    MESSAGE_BUS.with(|bus| {
68        if bus.set(msgbus).is_err() {
69            panic!("Failed to set MessageBus: already initialized for this thread");
70        }
71    });
72}
73
74/// Gets the thread-local message bus.
75///
76/// If no message bus has been set for this thread, a default one is created and initialized.
77/// This ensures each thread gets its own MessageBus instance, preventing data races while
78/// maintaining the singleton pattern that the codebase expects.
79pub fn get_message_bus() -> Rc<RefCell<MessageBus>> {
80    MESSAGE_BUS.with(|bus| {
81        bus.get_or_init(|| {
82            let msgbus = MessageBus::default();
83            Rc::new(RefCell::new(msgbus))
84        })
85        .clone()
86    })
87}
88
89/// Sends the `message` to the `endpoint`.
90pub fn send_any(endpoint: MStr<Endpoint>, message: &dyn Any) {
91    let handler = get_message_bus().borrow().get_endpoint(endpoint).cloned();
92    if let Some(handler) = handler {
93        handler.0.handle(message);
94    } else {
95        log::error!("send_any: no registered endpoint '{endpoint}'")
96    }
97}
98
99/// Sends the `message` to the `endpoint`.
100pub fn send<T: 'static>(endpoint: MStr<Endpoint>, message: T) {
101    let handler = get_message_bus().borrow().get_endpoint(endpoint).cloned();
102    if let Some(handler) = handler {
103        handler.0.handle(&message);
104    } else {
105        log::error!("send: no registered endpoint '{endpoint}'")
106    }
107}
108
109/// Sends the [`DataResponse`] to the registered correlation ID handler.
110pub fn send_response(correlation_id: &UUID4, message: &DataResponse) {
111    let handler = get_message_bus()
112        .borrow()
113        .get_response_handler(correlation_id)
114        .cloned();
115
116    if let Some(handler) = handler {
117        handler.0.handle(message);
118    } else {
119        log::error!("send_response: handler not found for correlation_id '{correlation_id}'")
120    }
121}
122
123/// Publish [`Data`] to a topic.
124pub fn publish_data(topic: &Ustr, message: Data) {
125    let matching_subs = get_message_bus().borrow_mut().matching_subscriptions(topic);
126
127    for sub in matching_subs {
128        sub.handler.0.handle(&message);
129    }
130}
131
132/// Sends the response to the handler registered for the `correlation_id` (if found).
133pub fn response(correlation_id: &UUID4, message: &dyn Any) {
134    let handler = get_message_bus()
135        .borrow()
136        .get_response_handler(correlation_id)
137        .cloned();
138    if let Some(handler) = handler {
139        handler.0.handle(message);
140    } else {
141        log::error!("response: handler not found for correlation_id '{correlation_id}'")
142    }
143}
144
145pub fn register_response_handler(correlation_id: &UUID4, handler: ShareableMessageHandler) {
146    if let Err(e) = get_message_bus()
147        .borrow_mut()
148        .register_response_handler(correlation_id, handler)
149    {
150        log::error!("Failed to register request handler: {e}");
151    }
152}
153
154/// Publishes the `message` to the `topic`.
155pub fn publish(topic: MStr<Topic>, message: &dyn Any) {
156    log::trace!("Publishing topic '{topic}' {message:?}");
157    let matching_subs = get_message_bus()
158        .borrow_mut()
159        .inner_matching_subscriptions(topic);
160
161    log::trace!("Matched {} subscriptions", matching_subs.len());
162
163    for sub in matching_subs {
164        log::trace!("Matched {sub:?}");
165        sub.handler.0.handle(message);
166    }
167}
168
169/// Registers the `handler` for the `endpoint` address.
170pub fn register(endpoint: MStr<Endpoint>, handler: ShareableMessageHandler) {
171    log::debug!(
172        "Registering endpoint '{endpoint}' with handler ID {}",
173        handler.0.id(),
174    );
175
176    // Updates value if key already exists
177    get_message_bus()
178        .borrow_mut()
179        .endpoints
180        .insert(endpoint, handler);
181}
182
183/// Deregisters the handler for the `endpoint` address.
184pub fn deregister(endpoint: MStr<Endpoint>) {
185    log::debug!("Deregistering endpoint '{endpoint}'");
186
187    // Removes entry if it exists for endpoint
188    get_message_bus()
189        .borrow_mut()
190        .endpoints
191        .shift_remove(&endpoint);
192}
193
194/// Subscribes the `handler` to the `pattern` with an optional `priority`.
195///
196/// # Warnings
197///
198/// Assigning priority handling is an advanced feature which *shouldn't
199/// normally be needed by most users*. **Only assign a higher priority to the
200/// subscription if you are certain of what you're doing**. If an inappropriate
201/// priority is assigned then the handler may receive messages before core
202/// system components have been able to process necessary calculations and
203/// produce potential side effects for logically sound behavior.
204pub fn subscribe(pattern: MStr<Pattern>, handler: ShareableMessageHandler, priority: Option<u8>) {
205    let msgbus = get_message_bus();
206    let mut msgbus_ref_mut = msgbus.borrow_mut();
207    let sub = core::Subscription::new(pattern, handler, priority);
208
209    log::debug!(
210        "Subscribing {:?} for pattern '{}'",
211        sub.handler,
212        sub.pattern
213    );
214
215    if msgbus_ref_mut.subscriptions.contains(&sub) {
216        log::warn!("{sub:?} already exists");
217        return;
218    }
219
220    // Find existing patterns which match this topic
221    for (topic, subs) in msgbus_ref_mut.topics.iter_mut() {
222        if is_matching_backtracking(*topic, sub.pattern) {
223            // TODO: Consider binary_search and then insert
224            subs.push(sub.clone());
225            subs.sort();
226            log::debug!("Added subscription for '{topic}'");
227        }
228    }
229
230    msgbus_ref_mut.subscriptions.insert(sub);
231}
232
233pub fn subscribe_topic(topic: MStr<Topic>, handler: ShareableMessageHandler, priority: Option<u8>) {
234    subscribe(topic.into(), handler, priority);
235}
236
237pub fn subscribe_str<T: AsRef<str>>(
238    pattern: T,
239    handler: ShareableMessageHandler,
240    priority: Option<u8>,
241) {
242    subscribe(MStr::from(pattern), handler, priority);
243}
244
245/// Unsubscribes the `handler` from the `pattern`.
246pub fn unsubscribe(pattern: MStr<Pattern>, handler: ShareableMessageHandler) {
247    log::debug!("Unsubscribing {handler:?} from pattern '{pattern}'");
248
249    let sub = core::Subscription::new(pattern, handler, None);
250
251    get_message_bus()
252        .borrow_mut()
253        .topics
254        .values_mut()
255        .for_each(|subs| {
256            if let Ok(index) = subs.binary_search(&sub) {
257                subs.remove(index);
258            }
259        });
260
261    let removed = get_message_bus().borrow_mut().subscriptions.remove(&sub);
262
263    if removed {
264        log::debug!("Handler for pattern '{pattern}' was removed");
265    } else {
266        log::debug!("No matching handler for pattern '{pattern}' was found");
267    }
268}
269
270pub fn unsubscribe_topic(topic: MStr<Topic>, handler: ShareableMessageHandler) {
271    unsubscribe(topic.into(), handler);
272}
273
274pub fn unsubscribe_str<T: AsRef<str>>(pattern: T, handler: ShareableMessageHandler) {
275    unsubscribe(MStr::from(pattern), handler);
276}
277
278pub fn is_subscribed<T: AsRef<str>>(pattern: T, handler: ShareableMessageHandler) -> bool {
279    let pattern = MStr::from(pattern.as_ref());
280    let sub = Subscription::new(pattern, handler, None);
281    get_message_bus().borrow().subscriptions.contains(&sub)
282}
283
284pub fn subscriptions_count<T: AsRef<str>>(topic: T) -> usize {
285    get_message_bus().borrow().subscriptions_count(topic)
286}