1use std::{
17 cell::RefCell,
18 collections::HashMap,
19 fmt::{self, Display},
20 hash::{Hash, Hasher},
21 ops::Deref,
22 rc::Rc,
23};
24
25use ahash::{AHashMap, AHashSet};
26use handler::ShareableMessageHandler;
27use indexmap::IndexMap;
28use matching::is_matching_backtracking;
29use nautilus_core::{
30 UUID4,
31 correctness::{FAILED, check_predicate_true, check_valid_string},
32};
33use nautilus_model::identifiers::TraderId;
34use switchboard::MessagingSwitchboard;
35use ustr::Ustr;
36
37use super::{handler, matching, set_message_bus, switchboard};
38
39#[inline(always)]
40fn check_fully_qualified_string(value: &Ustr, key: &str) -> anyhow::Result<()> {
41 check_predicate_true(
42 !value.chars().any(|c| c == '*' || c == '?'),
43 &format!("{key} `value` contained invalid characters, was {value}"),
44 )
45}
46
/// Zero-sized marker type used as the `T` parameter of `MStr<T>` for pattern strings.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub struct Pattern;
51
/// Zero-sized marker type used as the `T` parameter of `MStr<T>` for topic strings.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub struct Topic;
55
/// Zero-sized marker type used as the `T` parameter of `MStr<T>` for endpoint strings.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub struct Endpoint;
59
60#[derive(Copy, Clone, Debug, PartialEq, Eq, Hash)]
62pub struct MStr<T> {
63 value: Ustr,
64 _marker: std::marker::PhantomData<T>,
65}
66
67impl<T> Display for MStr<T> {
68 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
69 write!(f, "{}", self.value)
70 }
71}
72
73impl<T> Deref for MStr<T> {
74 type Target = Ustr;
75
76 fn deref(&self) -> &Self::Target {
77 &self.value
78 }
79}
80
81impl MStr<Pattern> {
82 pub fn pattern<T: AsRef<str>>(value: T) -> Self {
84 let value = Ustr::from(value.as_ref());
85
86 Self {
87 value,
88 _marker: std::marker::PhantomData,
89 }
90 }
91}
92
93impl<T: AsRef<str>> From<T> for MStr<Pattern> {
94 fn from(value: T) -> Self {
95 Self::pattern(value)
96 }
97}
98
99impl From<MStr<Topic>> for MStr<Pattern> {
100 fn from(value: MStr<Topic>) -> Self {
101 Self {
102 value: value.value,
103 _marker: std::marker::PhantomData,
104 }
105 }
106}
107
108impl MStr<Topic> {
109 pub fn topic<T: AsRef<str>>(value: T) -> anyhow::Result<Self> {
115 let topic = Ustr::from(value.as_ref());
116 check_valid_string(value, stringify!(value))?;
117 check_fully_qualified_string(&topic, stringify!(Topic))?;
118
119 Ok(Self {
120 value: topic,
121 _marker: std::marker::PhantomData,
122 })
123 }
124}
125
126impl<T: AsRef<str>> From<T> for MStr<Topic> {
127 fn from(value: T) -> Self {
128 Self::topic(value).expect(FAILED)
129 }
130}
131
132impl MStr<Endpoint> {
133 pub fn endpoint<T: AsRef<str>>(value: T) -> anyhow::Result<Self> {
139 let endpoint = Ustr::from(value.as_ref());
140 check_valid_string(value, stringify!(value))?;
141 check_fully_qualified_string(&endpoint, stringify!(Endpoint))?;
142
143 Ok(Self {
144 value: endpoint,
145 _marker: std::marker::PhantomData,
146 })
147 }
148}
149
150impl<T: AsRef<str>> From<T> for MStr<Endpoint> {
151 fn from(value: T) -> Self {
152 Self::endpoint(value).expect(FAILED)
153 }
154}
155
/// A subscription binding a message handler to a topic pattern.
///
/// Equality and hashing (see the trait impls in this file) consider only `pattern`
/// and `handler_id`; ordering additionally considers `priority`.
#[derive(Clone, Debug)]
pub struct Subscription {
    /// The shareable message handler for the subscription.
    pub handler: ShareableMessageHandler,
    /// The handler's ID, captured from `handler.0.id()` in [`Subscription::new`].
    pub handler_id: Ustr,
    /// The pattern this subscription is registered for.
    pub pattern: MStr<Pattern>,
    /// The priority of the subscription; higher values sort first (see the `Ord` impl).
    /// Defaults to `0` when not supplied to [`Subscription::new`].
    pub priority: u8,
}
174
175impl Subscription {
176 #[must_use]
178 pub fn new(
179 pattern: MStr<Pattern>,
180 handler: ShareableMessageHandler,
181 priority: Option<u8>,
182 ) -> Self {
183 Self {
184 handler_id: handler.0.id(),
185 pattern,
186 handler,
187 priority: priority.unwrap_or(0),
188 }
189 }
190}
191
192impl PartialEq<Self> for Subscription {
193 fn eq(&self, other: &Self) -> bool {
194 self.pattern == other.pattern && self.handler_id == other.handler_id
195 }
196}
197
// Marker impl: equality for `Subscription` is defined by its `PartialEq` impl.
impl Eq for Subscription {}
199
200impl PartialOrd for Subscription {
201 fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
202 Some(self.cmp(other))
203 }
204}
205
206impl Ord for Subscription {
207 fn cmp(&self, other: &Self) -> std::cmp::Ordering {
208 other
209 .priority
210 .cmp(&self.priority)
211 .then_with(|| self.pattern.cmp(&other.pattern))
212 .then_with(|| self.handler_id.cmp(&other.handler_id))
213 }
214}
215
216impl Hash for Subscription {
217 fn hash<H: Hasher>(&self, state: &mut H) {
218 self.pattern.hash(state);
219 self.handler_id.hash(state);
220 }
221}
222
/// A message bus holding subscriptions, registered endpoints and response handlers.
///
/// Exposed as a Python class when the `python` feature is enabled.
#[cfg_attr(
    feature = "python",
    pyo3::pyclass(module = "posei_trader.core.nautilus_pyo3.common")
)]
#[derive(Debug)]
pub struct MessageBus {
    /// The trader ID associated with the message bus.
    pub trader_id: TraderId,
    /// The instance ID associated with the message bus.
    pub instance_id: UUID4,
    /// The name of the message bus; `"MessageBus"` when none is supplied to `new`.
    pub name: String,
    /// Initialized to `false` by `new`.
    /// NOTE(review): presumably flags an attached backing store — confirm.
    pub has_backing: bool,
    /// The switchboard, default-constructed by `new`.
    pub switchboard: MessagingSwitchboard,
    /// All active subscriptions (unique by pattern and handler ID).
    pub subscriptions: AHashSet<Subscription>,
    /// Cache from a topic to its sorted matching subscriptions; filled lazily by
    /// `inner_matching_subscriptions`.
    pub topics: IndexMap<MStr<Topic>, Vec<Subscription>>,
    /// Maps each registered endpoint to its handler.
    pub endpoints: IndexMap<MStr<Endpoint>, ShareableMessageHandler>,
    /// Maps a correlation ID to the handler registered for its response.
    pub correlation_index: AHashMap<UUID4, ShareableMessageHandler>,
}
269
// SAFETY: NOTE(review): no justification is stated for this impl. `register_message_bus`
// shares the bus through `Rc<RefCell<_>>`, which suggests single-threaded use — confirm
// that a `MessageBus` is never actually moved to another thread.
#[allow(unsafe_code)]
unsafe impl Send for MessageBus {}
273
// SAFETY: NOTE(review): no justification is stated for this impl. `register_message_bus`
// shares the bus through `Rc<RefCell<_>>`, which suggests single-threaded use — confirm
// that a `MessageBus` is never actually accessed from more than one thread.
#[allow(unsafe_code)]
unsafe impl Sync for MessageBus {}
276
277impl MessageBus {
278 #[must_use]
280 pub fn new(
281 trader_id: TraderId,
282 instance_id: UUID4,
283 name: Option<String>,
284 _config: Option<HashMap<String, serde_json::Value>>,
285 ) -> Self {
286 Self {
287 trader_id,
288 instance_id,
289 name: name.unwrap_or(stringify!(MessageBus).to_owned()),
290 switchboard: MessagingSwitchboard::default(),
291 subscriptions: AHashSet::new(),
292 topics: IndexMap::new(),
293 endpoints: IndexMap::new(),
294 correlation_index: AHashMap::new(),
295 has_backing: false,
296 }
297 }
298
299 #[must_use]
301 pub fn memory_address(&self) -> String {
302 format!("{:?}", std::ptr::from_ref(self))
303 }
304
305 #[must_use]
307 pub fn endpoints(&self) -> Vec<&str> {
308 self.endpoints.iter().map(|e| e.0.as_str()).collect()
309 }
310
311 #[must_use]
313 pub fn patterns(&self) -> Vec<&str> {
314 self.subscriptions
315 .iter()
316 .map(|s| s.pattern.as_str())
317 .collect()
318 }
319
320 pub fn has_subscribers<T: AsRef<str>>(&self, topic: T) -> bool {
322 self.subscriptions_count(topic) > 0
323 }
324
325 #[must_use]
331 pub fn subscriptions_count<T: AsRef<str>>(&self, topic: T) -> usize {
332 let topic = MStr::<Topic>::topic(topic).expect(FAILED);
333 self.topics
334 .get(&topic)
335 .map(|subs| subs.len())
336 .unwrap_or_else(|| self.find_topic_matches(topic).len())
337 }
338
339 #[must_use]
341 pub fn subscriptions(&self) -> Vec<&Subscription> {
342 self.subscriptions.iter().collect()
343 }
344
345 #[must_use]
347 pub fn subscription_handler_ids(&self) -> Vec<&str> {
348 self.subscriptions
349 .iter()
350 .map(|s| s.handler_id.as_str())
351 .collect()
352 }
353
354 #[must_use]
360 pub fn is_registered<T: AsRef<str>>(&self, endpoint: T) -> bool {
361 let endpoint: MStr<Endpoint> = endpoint.into();
362 self.endpoints.contains_key(&endpoint)
363 }
364
365 #[must_use]
367 pub fn is_subscribed<T: AsRef<str>>(
368 &self,
369 pattern: T,
370 handler: ShareableMessageHandler,
371 ) -> bool {
372 let pattern = MStr::<Pattern>::pattern(pattern);
373 let sub = Subscription::new(pattern, handler, None);
374 self.subscriptions.contains(&sub)
375 }
376
377 pub const fn close(&self) -> anyhow::Result<()> {
383 Ok(())
385 }
386
387 #[must_use]
389 pub fn get_endpoint(&self, endpoint: MStr<Endpoint>) -> Option<&ShareableMessageHandler> {
390 self.endpoints.get(&endpoint)
391 }
392
393 #[must_use]
395 pub fn get_response_handler(&self, correlation_id: &UUID4) -> Option<&ShareableMessageHandler> {
396 self.correlation_index.get(correlation_id)
397 }
398
399 pub(crate) fn find_topic_matches(&self, topic: MStr<Topic>) -> Vec<Subscription> {
401 self.subscriptions
402 .iter()
403 .filter_map(|sub| {
404 if is_matching_backtracking(topic, sub.pattern) {
405 Some(sub.clone())
406 } else {
407 None
408 }
409 })
410 .collect()
411 }
412
413 #[must_use]
416 pub fn matching_subscriptions<T: AsRef<str>>(&mut self, topic: T) -> Vec<Subscription> {
417 let topic = MStr::<Topic>::from(topic);
418 self.inner_matching_subscriptions(topic)
419 }
420
421 pub(crate) fn inner_matching_subscriptions(&mut self, topic: MStr<Topic>) -> Vec<Subscription> {
422 self.topics.get(&topic).cloned().unwrap_or_else(|| {
423 let mut matches = self.find_topic_matches(topic);
424 matches.sort();
425 self.topics.insert(topic, matches.clone());
426 matches
427 })
428 }
429
430 pub fn register_response_handler(
436 &mut self,
437 correlation_id: &UUID4,
438 handler: ShareableMessageHandler,
439 ) -> anyhow::Result<()> {
440 if self.correlation_index.contains_key(correlation_id) {
441 anyhow::bail!("Correlation ID <{correlation_id}> already has a registered handler");
442 }
443
444 self.correlation_index.insert(*correlation_id, handler);
445
446 Ok(())
447 }
448}
449
450impl MessageBus {
452 pub fn register_message_bus(self) -> Rc<RefCell<MessageBus>> {
469 let msgbus = Rc::new(RefCell::new(self));
470 set_message_bus(msgbus.clone());
471 msgbus
472 }
473}
474
475impl Default for MessageBus {
476 fn default() -> Self {
478 Self::new(TraderId::from("TRADER-001"), UUID4::new(), None, None)
479 }
480}