nautilus_common/msgbus/
core.rs

1// -------------------------------------------------------------------------------------------------
2//  Copyright (C) 2015-2025 2Posei Systems Pty Ltd. All rights reserved.
3//  https://poseitrader.io
4//
5//  Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
6//  You may not use this file except in compliance with the License.
7//  You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
8//
9//  Unless required by applicable law or agreed to in writing, software
10//  distributed under the License is distributed on an "AS IS" BASIS,
11//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12//  See the License for the specific language governing permissions and
13//  limitations under the License.
14// -------------------------------------------------------------------------------------------------
15
16use std::{
17    cell::RefCell,
18    collections::HashMap,
19    fmt::{self, Display},
20    hash::{Hash, Hasher},
21    ops::Deref,
22    rc::Rc,
23};
24
25use ahash::{AHashMap, AHashSet};
26use handler::ShareableMessageHandler;
27use indexmap::IndexMap;
28use matching::is_matching_backtracking;
29use nautilus_core::{
30    UUID4,
31    correctness::{FAILED, check_predicate_true, check_valid_string},
32};
33use nautilus_model::identifiers::TraderId;
34use switchboard::MessagingSwitchboard;
35use ustr::Ustr;
36
37use super::{handler, matching, set_message_bus, switchboard};
38
39#[inline(always)]
40fn check_fully_qualified_string(value: &Ustr, key: &str) -> anyhow::Result<()> {
41    check_predicate_true(
42        !value.chars().any(|c| c == '*' || c == '?'),
43        &format!("{key} `value` contained invalid characters, was {value}"),
44    )
45}
46
/// Marker type for a subscription pattern: a string that may contain the
/// special pattern-matching characters.
#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash)]
pub struct Pattern;
51
/// Marker type for a topic: a fully qualified string that data is published to.
#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash)]
pub struct Topic;
55
/// Marker type for an endpoint: a fully qualified string that data is sent to.
#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash)]
pub struct Endpoint;
59
60/// A message bus string type. It can be a pattern or a topic.
61#[derive(Copy, Clone, Debug, PartialEq, Eq, Hash)]
62pub struct MStr<T> {
63    value: Ustr,
64    _marker: std::marker::PhantomData<T>,
65}
66
67impl<T> Display for MStr<T> {
68    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
69        write!(f, "{}", self.value)
70    }
71}
72
73impl<T> Deref for MStr<T> {
74    type Target = Ustr;
75
76    fn deref(&self) -> &Self::Target {
77        &self.value
78    }
79}
80
81impl MStr<Pattern> {
82    /// Create a new pattern from a string.
83    pub fn pattern<T: AsRef<str>>(value: T) -> Self {
84        let value = Ustr::from(value.as_ref());
85
86        Self {
87            value,
88            _marker: std::marker::PhantomData,
89        }
90    }
91}
92
93impl<T: AsRef<str>> From<T> for MStr<Pattern> {
94    fn from(value: T) -> Self {
95        Self::pattern(value)
96    }
97}
98
99impl From<MStr<Topic>> for MStr<Pattern> {
100    fn from(value: MStr<Topic>) -> Self {
101        Self {
102            value: value.value,
103            _marker: std::marker::PhantomData,
104        }
105    }
106}
107
108impl MStr<Topic> {
109    /// Create a new topic from a fully qualified string.
110    ///
111    /// # Errors
112    ///
113    /// Returns an error if the topic has white space or invalid characters.
114    pub fn topic<T: AsRef<str>>(value: T) -> anyhow::Result<Self> {
115        let topic = Ustr::from(value.as_ref());
116        check_valid_string(value, stringify!(value))?;
117        check_fully_qualified_string(&topic, stringify!(Topic))?;
118
119        Ok(Self {
120            value: topic,
121            _marker: std::marker::PhantomData,
122        })
123    }
124}
125
126impl<T: AsRef<str>> From<T> for MStr<Topic> {
127    fn from(value: T) -> Self {
128        Self::topic(value).expect(FAILED)
129    }
130}
131
132impl MStr<Endpoint> {
133    /// Create a new endpoint from a fully qualified string.
134    ///
135    /// # Errors
136    ///
137    /// Returns an error if the endpoint has white space or invalid characters.
138    pub fn endpoint<T: AsRef<str>>(value: T) -> anyhow::Result<Self> {
139        let endpoint = Ustr::from(value.as_ref());
140        check_valid_string(value, stringify!(value))?;
141        check_fully_qualified_string(&endpoint, stringify!(Endpoint))?;
142
143        Ok(Self {
144            value: endpoint,
145            _marker: std::marker::PhantomData,
146        })
147    }
148}
149
150impl<T: AsRef<str>> From<T> for MStr<Endpoint> {
151    fn from(value: T) -> Self {
152        Self::endpoint(value).expect(FAILED)
153    }
154}
155
156/// Represents a subscription to a particular topic.
157///
158/// This is an internal class intended to be used by the message bus to organize
159/// topics and their subscribers.
160///
161#[derive(Clone, Debug)]
162pub struct Subscription {
163    /// The shareable message handler for the subscription.
164    pub handler: ShareableMessageHandler,
165    /// Store a copy of the handler ID for faster equality checks.
166    pub handler_id: Ustr,
167    /// The pattern for the subscription.
168    pub pattern: MStr<Pattern>,
169    /// The priority for the subscription determines the ordering of handlers receiving
170    /// messages being processed, higher priority handlers will receive messages before
171    /// lower priority handlers.
172    pub priority: u8,
173}
174
175impl Subscription {
176    /// Creates a new [`Subscription`] instance.
177    #[must_use]
178    pub fn new(
179        pattern: MStr<Pattern>,
180        handler: ShareableMessageHandler,
181        priority: Option<u8>,
182    ) -> Self {
183        Self {
184            handler_id: handler.0.id(),
185            pattern,
186            handler,
187            priority: priority.unwrap_or(0),
188        }
189    }
190}
191
192impl PartialEq<Self> for Subscription {
193    fn eq(&self, other: &Self) -> bool {
194        self.pattern == other.pattern && self.handler_id == other.handler_id
195    }
196}
197
198impl Eq for Subscription {}
199
200impl PartialOrd for Subscription {
201    fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
202        Some(self.cmp(other))
203    }
204}
205
206impl Ord for Subscription {
207    fn cmp(&self, other: &Self) -> std::cmp::Ordering {
208        other
209            .priority
210            .cmp(&self.priority)
211            .then_with(|| self.pattern.cmp(&other.pattern))
212            .then_with(|| self.handler_id.cmp(&other.handler_id))
213    }
214}
215
216impl Hash for Subscription {
217    fn hash<H: Hasher>(&self, state: &mut H) {
218        self.pattern.hash(state);
219        self.handler_id.hash(state);
220    }
221}
222
223/// A generic message bus to facilitate various messaging patterns.
224///
225/// The bus provides both a producer and consumer API for Pub/Sub, Req/Rep, as
226/// well as direct point-to-point messaging to registered endpoints.
227///
228/// Pub/Sub wildcard patterns for hierarchical topics are possible:
229///  - `*` asterisk represents one or more characters in a pattern.
230///  - `?` question mark represents a single character in a pattern.
231///
232/// Given a topic and pattern potentially containing wildcard characters, i.e.
233/// `*` and `?`, where `?` can match any single character in the topic, and `*`
234/// can match any number of characters including zero characters.
235///
236/// The asterisk in a wildcard matches any character zero or more times. For
237/// example, `comp*` matches anything beginning with `comp` which means `comp`,
238/// `complete`, and `computer` are all matched.
239///
240/// A question mark matches a single character once. For example, `c?mp` matches
241/// `camp` and `comp`. The question mark can also be used more than once.
242/// For example, `c??p` would match both of the above examples and `coop`.
243#[cfg_attr(
244    feature = "python",
245    pyo3::pyclass(module = "posei_trader.core.nautilus_pyo3.common")
246)]
247#[derive(Debug)]
248pub struct MessageBus {
249    /// The trader ID associated with the message bus.
250    pub trader_id: TraderId,
251    /// The instance ID associated with the message bus.
252    pub instance_id: UUID4,
253    /// The name for the message bus.
254    pub name: String,
255    /// If the message bus is backed by a database.
256    pub has_backing: bool,
257    /// The switchboard for built-in endpoints.
258    pub switchboard: MessagingSwitchboard,
259    /// Active subscriptions.
260    pub subscriptions: AHashSet<Subscription>,
261    /// Maps a topic to all the handlers registered for it
262    /// this is updated whenever a new subscription is created.
263    pub topics: IndexMap<MStr<Topic>, Vec<Subscription>>,
264    /// Index of endpoint addresses and their handlers.
265    pub endpoints: IndexMap<MStr<Endpoint>, ShareableMessageHandler>,
266    /// Index of request correlation IDs and their response handlers.
267    pub correlation_index: AHashMap<UUID4, ShareableMessageHandler>,
268}
269
270// SAFETY: Message bus is not meant to be passed between threads
271#[allow(unsafe_code)]
272unsafe impl Send for MessageBus {}
273
274#[allow(unsafe_code)]
275unsafe impl Sync for MessageBus {}
276
277impl MessageBus {
278    /// Creates a new [`MessageBus`] instance.
279    #[must_use]
280    pub fn new(
281        trader_id: TraderId,
282        instance_id: UUID4,
283        name: Option<String>,
284        _config: Option<HashMap<String, serde_json::Value>>,
285    ) -> Self {
286        Self {
287            trader_id,
288            instance_id,
289            name: name.unwrap_or(stringify!(MessageBus).to_owned()),
290            switchboard: MessagingSwitchboard::default(),
291            subscriptions: AHashSet::new(),
292            topics: IndexMap::new(),
293            endpoints: IndexMap::new(),
294            correlation_index: AHashMap::new(),
295            has_backing: false,
296        }
297    }
298
299    /// Returns the message bus instances memory address.
300    #[must_use]
301    pub fn memory_address(&self) -> String {
302        format!("{:?}", std::ptr::from_ref(self))
303    }
304
305    /// Returns the registered endpoint addresses.
306    #[must_use]
307    pub fn endpoints(&self) -> Vec<&str> {
308        self.endpoints.iter().map(|e| e.0.as_str()).collect()
309    }
310
311    /// Returns actively subscribed patterns.
312    #[must_use]
313    pub fn patterns(&self) -> Vec<&str> {
314        self.subscriptions
315            .iter()
316            .map(|s| s.pattern.as_str())
317            .collect()
318    }
319
320    /// Returns whether there are subscribers for the `topic`.
321    pub fn has_subscribers<T: AsRef<str>>(&self, topic: T) -> bool {
322        self.subscriptions_count(topic) > 0
323    }
324
325    /// Returns the count of subscribers for the `topic`.
326    ///
327    /// # Panics
328    ///
329    /// Returns an error if the topic is not valid.
330    #[must_use]
331    pub fn subscriptions_count<T: AsRef<str>>(&self, topic: T) -> usize {
332        let topic = MStr::<Topic>::topic(topic).expect(FAILED);
333        self.topics
334            .get(&topic)
335            .map(|subs| subs.len())
336            .unwrap_or_else(|| self.find_topic_matches(topic).len())
337    }
338
339    /// Returns active subscriptions.
340    #[must_use]
341    pub fn subscriptions(&self) -> Vec<&Subscription> {
342        self.subscriptions.iter().collect()
343    }
344
345    /// Returns the handler IDs for actively subscribed patterns.
346    #[must_use]
347    pub fn subscription_handler_ids(&self) -> Vec<&str> {
348        self.subscriptions
349            .iter()
350            .map(|s| s.handler_id.as_str())
351            .collect()
352    }
353
354    /// Returns whether the endpoint is registered.
355    ///
356    /// # Panics
357    ///
358    /// Returns an error if the endpoint is not valid topic string.
359    #[must_use]
360    pub fn is_registered<T: AsRef<str>>(&self, endpoint: T) -> bool {
361        let endpoint: MStr<Endpoint> = endpoint.into();
362        self.endpoints.contains_key(&endpoint)
363    }
364
365    /// Returns whether the `handler` is subscribed to the `pattern`.
366    #[must_use]
367    pub fn is_subscribed<T: AsRef<str>>(
368        &self,
369        pattern: T,
370        handler: ShareableMessageHandler,
371    ) -> bool {
372        let pattern = MStr::<Pattern>::pattern(pattern);
373        let sub = Subscription::new(pattern, handler, None);
374        self.subscriptions.contains(&sub)
375    }
376
377    /// Close the message bus which will close the sender channel and join the thread.
378    ///
379    /// # Errors
380    ///
381    /// This function never returns an error (TBD once backing database added).
382    pub const fn close(&self) -> anyhow::Result<()> {
383        // TODO: Integrate the backing database
384        Ok(())
385    }
386
387    /// Returns the handler for the `endpoint`.
388    #[must_use]
389    pub fn get_endpoint(&self, endpoint: MStr<Endpoint>) -> Option<&ShareableMessageHandler> {
390        self.endpoints.get(&endpoint)
391    }
392
393    /// Returns the handler for the `correlation_id`.
394    #[must_use]
395    pub fn get_response_handler(&self, correlation_id: &UUID4) -> Option<&ShareableMessageHandler> {
396        self.correlation_index.get(correlation_id)
397    }
398
399    /// Finds the subscriptions with pattern matching the `topic`.
400    pub(crate) fn find_topic_matches(&self, topic: MStr<Topic>) -> Vec<Subscription> {
401        self.subscriptions
402            .iter()
403            .filter_map(|sub| {
404                if is_matching_backtracking(topic, sub.pattern) {
405                    Some(sub.clone())
406                } else {
407                    None
408                }
409            })
410            .collect()
411    }
412
413    /// Finds the subscriptions which match the `topic` and caches the
414    /// results in the `patterns` map.
415    #[must_use]
416    pub fn matching_subscriptions<T: AsRef<str>>(&mut self, topic: T) -> Vec<Subscription> {
417        let topic = MStr::<Topic>::from(topic);
418        self.inner_matching_subscriptions(topic)
419    }
420
421    pub(crate) fn inner_matching_subscriptions(&mut self, topic: MStr<Topic>) -> Vec<Subscription> {
422        self.topics.get(&topic).cloned().unwrap_or_else(|| {
423            let mut matches = self.find_topic_matches(topic);
424            matches.sort();
425            self.topics.insert(topic, matches.clone());
426            matches
427        })
428    }
429
430    /// Register a response handler for a specific correlation ID.
431    ///
432    /// # Errors
433    ///
434    /// Returns an error if `handler` is already registered for the `correlation_id`.
435    pub fn register_response_handler(
436        &mut self,
437        correlation_id: &UUID4,
438        handler: ShareableMessageHandler,
439    ) -> anyhow::Result<()> {
440        if self.correlation_index.contains_key(correlation_id) {
441            anyhow::bail!("Correlation ID <{correlation_id}> already has a registered handler");
442        }
443
444        self.correlation_index.insert(*correlation_id, handler);
445
446        Ok(())
447    }
448}
449
450/// Data specific functions.
451impl MessageBus {
452    // /// Send a [`DataRequest`] to an endpoint that must be a data client implementation.
453    // pub fn send_data_request(&self, message: DataRequest) {
454    //     // TODO: log error
455    //     if let Some(client) = self.get_client(&message.client_id, message.venue) {
456    //         let _ = client.request(message);
457    //     }
458    // }
459    //
460    // /// Send a [`SubscriptionCommand`] to an endpoint that must be a data client implementation.
461    // pub fn send_subscription_command(&self, message: SubscriptionCommand) {
462    //     if let Some(client) = self.get_client(&message.client_id, message.venue) {
463    //         client.through_execute(message);
464    //     }
465    // }
466
467    /// Register message bus globally
468    pub fn register_message_bus(self) -> Rc<RefCell<MessageBus>> {
469        let msgbus = Rc::new(RefCell::new(self));
470        set_message_bus(msgbus.clone());
471        msgbus
472    }
473}
474
475impl Default for MessageBus {
476    /// Creates a new default [`MessageBus`] instance.
477    fn default() -> Self {
478        Self::new(TraderId::from("TRADER-001"), UUID4::new(), None, None)
479    }
480}