1use ahash::AHashMap;
17#[cfg(feature = "defi")]
18use alloy_primitives::Address;
19#[cfg(feature = "defi")]
20use nautilus_model::defi::Blockchain;
21use nautilus_model::{
22 data::{BarType, DataType},
23 identifiers::{ClientOrderId, InstrumentId, PositionId, StrategyId, Venue},
24};
25
26use super::core::{Endpoint, MStr, Topic};
27use crate::msgbus::get_message_bus;
28
29pub const CLOSE_TOPIC: &str = "CLOSE";
30
31#[must_use]
32pub fn get_custom_topic(data_type: &DataType) -> MStr<Topic> {
33 get_message_bus()
34 .borrow_mut()
35 .switchboard
36 .get_custom_topic(data_type)
37}
38
39#[must_use]
40pub fn get_instruments_topic(venue: Venue) -> MStr<Topic> {
41 get_message_bus()
42 .borrow_mut()
43 .switchboard
44 .get_instruments_topic(venue)
45}
46
47#[must_use]
48pub fn get_instrument_topic(instrument_id: InstrumentId) -> MStr<Topic> {
49 get_message_bus()
50 .borrow_mut()
51 .switchboard
52 .get_instrument_topic(instrument_id)
53}
54
55#[must_use]
56pub fn get_book_deltas_topic(instrument_id: InstrumentId) -> MStr<Topic> {
57 get_message_bus()
58 .borrow_mut()
59 .switchboard
60 .get_book_deltas_topic(instrument_id)
61}
62
63#[must_use]
64pub fn get_book_depth10_topic(instrument_id: InstrumentId) -> MStr<Topic> {
65 get_message_bus()
66 .borrow_mut()
67 .switchboard
68 .get_book_depth10_topic(instrument_id)
69}
70
71#[must_use]
72pub fn get_book_snapshots_topic(instrument_id: InstrumentId) -> MStr<Topic> {
73 get_message_bus()
74 .borrow_mut()
75 .switchboard
76 .get_book_snapshots_topic(instrument_id)
77}
78
79#[must_use]
80pub fn get_quotes_topic(instrument_id: InstrumentId) -> MStr<Topic> {
81 get_message_bus()
82 .borrow_mut()
83 .switchboard
84 .get_quotes_topic(instrument_id)
85}
86
87#[must_use]
88pub fn get_trades_topic(instrument_id: InstrumentId) -> MStr<Topic> {
89 get_message_bus()
90 .borrow_mut()
91 .switchboard
92 .get_trades_topic(instrument_id)
93}
94
95#[must_use]
96pub fn get_bars_topic(bar_type: BarType) -> MStr<Topic> {
97 get_message_bus()
98 .borrow_mut()
99 .switchboard
100 .get_bars_topic(bar_type)
101}
102
103#[must_use]
104pub fn get_mark_price_topic(instrument_id: InstrumentId) -> MStr<Topic> {
105 get_message_bus()
106 .borrow_mut()
107 .switchboard
108 .get_mark_price_topic(instrument_id)
109}
110
111#[must_use]
112pub fn get_index_price_topic(instrument_id: InstrumentId) -> MStr<Topic> {
113 get_message_bus()
114 .borrow_mut()
115 .switchboard
116 .get_index_price_topic(instrument_id)
117}
118
119#[must_use]
120pub fn get_instrument_status_topic(instrument_id: InstrumentId) -> MStr<Topic> {
121 get_message_bus()
122 .borrow_mut()
123 .switchboard
124 .get_instrument_status_topic(instrument_id)
125}
126
127#[must_use]
128pub fn get_instrument_close_topic(instrument_id: InstrumentId) -> MStr<Topic> {
129 get_message_bus()
130 .borrow_mut()
131 .switchboard
132 .get_instrument_close_topic(instrument_id)
133}
134
135#[must_use]
136pub fn get_order_snapshots_topic(client_order_id: ClientOrderId) -> MStr<Topic> {
137 get_message_bus()
138 .borrow_mut()
139 .switchboard
140 .get_order_snapshots_topic(client_order_id)
141}
142
143#[must_use]
144pub fn get_positions_snapshots_topic(position_id: PositionId) -> MStr<Topic> {
145 get_message_bus()
146 .borrow_mut()
147 .switchboard
148 .get_positions_snapshots_topic(position_id)
149}
150
151#[must_use]
152pub fn get_event_orders_topic(strategy_id: StrategyId) -> MStr<Topic> {
153 get_message_bus()
154 .borrow_mut()
155 .switchboard
156 .get_event_orders_topic(strategy_id)
157}
158
159#[must_use]
160pub fn get_event_positions_topic(strategy_id: StrategyId) -> MStr<Topic> {
161 get_message_bus()
162 .borrow_mut()
163 .switchboard
164 .get_event_positions_topic(strategy_id)
165}
166
167#[cfg(feature = "defi")]
168#[must_use]
169pub fn get_defi_blocks_topic(chain: Blockchain) -> MStr<Topic> {
170 get_message_bus()
171 .borrow_mut()
172 .switchboard
173 .get_defi_blocks_topic(chain)
174}
175
176#[cfg(feature = "defi")]
177#[must_use]
178pub fn get_defi_pool_topic(address: Address) -> MStr<Topic> {
179 get_message_bus()
180 .borrow_mut()
181 .switchboard
182 .get_defi_pool_topic(address)
183}
184
185#[cfg(feature = "defi")]
186#[must_use]
187pub fn get_defi_pool_swaps_topic(address: Address) -> MStr<Topic> {
188 get_message_bus()
189 .borrow_mut()
190 .switchboard
191 .get_defi_pool_swaps_topic(address)
192}
193
194#[cfg(feature = "defi")]
195#[must_use]
196pub fn get_defi_liquidity_topic(address: Address) -> MStr<Topic> {
197 get_message_bus()
198 .borrow_mut()
199 .switchboard
200 .get_defi_pool_liquidity_topic(address)
201}
202
203#[derive(Clone, Debug)]
205pub struct MessagingSwitchboard {
206 custom_topics: AHashMap<DataType, MStr<Topic>>,
207 instruments_topics: AHashMap<Venue, MStr<Topic>>,
208 instrument_topics: AHashMap<InstrumentId, MStr<Topic>>,
209 book_deltas_topics: AHashMap<InstrumentId, MStr<Topic>>,
210 book_depth10_topics: AHashMap<InstrumentId, MStr<Topic>>,
211 book_snapshots_topics: AHashMap<InstrumentId, MStr<Topic>>,
212 quote_topics: AHashMap<InstrumentId, MStr<Topic>>,
213 trade_topics: AHashMap<InstrumentId, MStr<Topic>>,
214 bar_topics: AHashMap<BarType, MStr<Topic>>,
215 mark_price_topics: AHashMap<InstrumentId, MStr<Topic>>,
216 index_price_topics: AHashMap<InstrumentId, MStr<Topic>>,
217 instrument_status_topics: AHashMap<InstrumentId, MStr<Topic>>,
218 instrument_close_topics: AHashMap<InstrumentId, MStr<Topic>>,
219 event_orders_topics: AHashMap<StrategyId, MStr<Topic>>,
220 event_positions_topics: AHashMap<StrategyId, MStr<Topic>>,
221 order_snapshots_topics: AHashMap<ClientOrderId, MStr<Topic>>,
222 positions_snapshots_topics: AHashMap<PositionId, MStr<Topic>>,
223 #[cfg(feature = "defi")]
224 defi_block_topics: AHashMap<Blockchain, MStr<Topic>>,
225 #[cfg(feature = "defi")]
226 defi_pool_topics: AHashMap<Address, MStr<Topic>>,
227 #[cfg(feature = "defi")]
228 defi_pool_swap_topics: AHashMap<Address, MStr<Topic>>,
229 #[cfg(feature = "defi")]
230 defi_pool_liquidity_topics: AHashMap<Address, MStr<Topic>>,
231}
232
233impl Default for MessagingSwitchboard {
234 fn default() -> Self {
236 Self {
237 custom_topics: AHashMap::new(),
238 instruments_topics: AHashMap::new(),
239 instrument_topics: AHashMap::new(),
240 book_deltas_topics: AHashMap::new(),
241 book_snapshots_topics: AHashMap::new(),
242 book_depth10_topics: AHashMap::new(),
243 quote_topics: AHashMap::new(),
244 trade_topics: AHashMap::new(),
245 mark_price_topics: AHashMap::new(),
246 index_price_topics: AHashMap::new(),
247 bar_topics: AHashMap::new(),
248 instrument_status_topics: AHashMap::new(),
249 instrument_close_topics: AHashMap::new(),
250 order_snapshots_topics: AHashMap::new(),
251 event_orders_topics: AHashMap::new(),
252 event_positions_topics: AHashMap::new(),
253 positions_snapshots_topics: AHashMap::new(),
254 #[cfg(feature = "defi")]
255 defi_block_topics: AHashMap::new(),
256 #[cfg(feature = "defi")]
257 defi_pool_topics: AHashMap::new(),
258 #[cfg(feature = "defi")]
259 defi_pool_swap_topics: AHashMap::new(),
260 #[cfg(feature = "defi")]
261 defi_pool_liquidity_topics: AHashMap::new(),
262 }
263 }
264}
265
266impl MessagingSwitchboard {
267 #[must_use]
268 pub fn data_engine_queue_execute() -> MStr<Endpoint> {
269 "DataEngine.queue_execute".into()
270 }
271
272 #[must_use]
273 pub fn data_engine_execute() -> MStr<Endpoint> {
274 "DataEngine.execute".into()
275 }
276
277 #[must_use]
278 pub fn data_engine_process() -> MStr<Endpoint> {
279 "DataEngine.process".into()
280 }
281
282 #[must_use]
283 pub fn data_engine_response() -> MStr<Endpoint> {
284 "DataEngine.response".into()
285 }
286
287 #[must_use]
288 pub fn exec_engine_execute() -> MStr<Endpoint> {
289 "ExecEngine.execute".into()
290 }
291
292 #[must_use]
293 pub fn exec_engine_process() -> MStr<Endpoint> {
294 "ExecEngine.process".into()
295 }
296
297 #[must_use]
298 pub fn get_custom_topic(&mut self, data_type: &DataType) -> MStr<Topic> {
299 *self
300 .custom_topics
301 .entry(data_type.clone())
302 .or_insert_with(|| format!("data.{}", data_type.topic()).into())
303 }
304
305 #[must_use]
306 pub fn get_instruments_topic(&mut self, venue: Venue) -> MStr<Topic> {
307 *self
308 .instruments_topics
309 .entry(venue)
310 .or_insert_with(|| format!("data.instrument.{}", venue).into())
311 }
312
313 #[must_use]
314 pub fn get_instrument_topic(&mut self, instrument_id: InstrumentId) -> MStr<Topic> {
315 *self
316 .instrument_topics
317 .entry(instrument_id)
318 .or_insert_with(|| {
319 format!(
320 "data.instrument.{}.{}",
321 instrument_id.venue, instrument_id.symbol
322 )
323 .into()
324 })
325 }
326
327 #[must_use]
328 pub fn get_book_deltas_topic(&mut self, instrument_id: InstrumentId) -> MStr<Topic> {
329 *self
330 .book_deltas_topics
331 .entry(instrument_id)
332 .or_insert_with(|| {
333 format!(
334 "data.book.deltas.{}.{}",
335 instrument_id.venue, instrument_id.symbol
336 )
337 .into()
338 })
339 }
340
341 #[must_use]
342 pub fn get_book_depth10_topic(&mut self, instrument_id: InstrumentId) -> MStr<Topic> {
343 *self
344 .book_depth10_topics
345 .entry(instrument_id)
346 .or_insert_with(|| {
347 format!(
348 "data.book.depth10.{}.{}",
349 instrument_id.venue, instrument_id.symbol
350 )
351 .into()
352 })
353 }
354
355 #[must_use]
356 pub fn get_book_snapshots_topic(&mut self, instrument_id: InstrumentId) -> MStr<Topic> {
357 *self
358 .book_snapshots_topics
359 .entry(instrument_id)
360 .or_insert_with(|| {
361 format!(
362 "data.book.snapshots.{}.{}",
363 instrument_id.venue, instrument_id.symbol
364 )
365 .into()
366 })
367 }
368
369 #[must_use]
370 pub fn get_quotes_topic(&mut self, instrument_id: InstrumentId) -> MStr<Topic> {
371 *self.quote_topics.entry(instrument_id).or_insert_with(|| {
372 format!(
373 "data.quotes.{}.{}",
374 instrument_id.venue, instrument_id.symbol
375 )
376 .into()
377 })
378 }
379
380 #[must_use]
381 pub fn get_trades_topic(&mut self, instrument_id: InstrumentId) -> MStr<Topic> {
382 *self.trade_topics.entry(instrument_id).or_insert_with(|| {
383 format!(
384 "data.trades.{}.{}",
385 instrument_id.venue, instrument_id.symbol
386 )
387 .into()
388 })
389 }
390
391 #[must_use]
392 pub fn get_bars_topic(&mut self, bar_type: BarType) -> MStr<Topic> {
393 *self
394 .bar_topics
395 .entry(bar_type)
396 .or_insert_with(|| format!("data.bars.{bar_type}").into())
397 }
398
399 #[must_use]
400 pub fn get_mark_price_topic(&mut self, instrument_id: InstrumentId) -> MStr<Topic> {
401 *self
402 .mark_price_topics
403 .entry(instrument_id)
404 .or_insert_with(|| {
405 format!(
406 "data.mark_prices.{}.{}",
407 instrument_id.venue, instrument_id.symbol
408 )
409 .into()
410 })
411 }
412
413 #[must_use]
414 pub fn get_index_price_topic(&mut self, instrument_id: InstrumentId) -> MStr<Topic> {
415 *self
416 .index_price_topics
417 .entry(instrument_id)
418 .or_insert_with(|| {
419 format!(
420 "data.index_prices.{}.{}",
421 instrument_id.venue, instrument_id.symbol
422 )
423 .into()
424 })
425 }
426
427 #[must_use]
428 pub fn get_instrument_status_topic(&mut self, instrument_id: InstrumentId) -> MStr<Topic> {
429 *self
430 .instrument_status_topics
431 .entry(instrument_id)
432 .or_insert_with(|| {
433 format!(
434 "data.status.{}.{}",
435 instrument_id.venue, instrument_id.symbol
436 )
437 .into()
438 })
439 }
440
441 #[must_use]
442 pub fn get_instrument_close_topic(&mut self, instrument_id: InstrumentId) -> MStr<Topic> {
443 *self
444 .instrument_close_topics
445 .entry(instrument_id)
446 .or_insert_with(|| {
447 format!(
448 "data.close.{}.{}",
449 instrument_id.venue, instrument_id.symbol
450 )
451 .into()
452 })
453 }
454
455 #[must_use]
456 pub fn get_order_snapshots_topic(&mut self, client_order_id: ClientOrderId) -> MStr<Topic> {
457 *self
458 .order_snapshots_topics
459 .entry(client_order_id)
460 .or_insert_with(|| format!("order.snapshots.{client_order_id}").into())
461 }
462
463 #[must_use]
464 pub fn get_positions_snapshots_topic(&mut self, position_id: PositionId) -> MStr<Topic> {
465 *self
466 .positions_snapshots_topics
467 .entry(position_id)
468 .or_insert_with(|| format!("positions.snapshots.{position_id}").into())
469 }
470
471 #[must_use]
472 pub fn get_event_orders_topic(&mut self, strategy_id: StrategyId) -> MStr<Topic> {
473 *self
474 .event_orders_topics
475 .entry(strategy_id)
476 .or_insert_with(|| format!("events.order.{strategy_id}").into())
477 }
478
479 #[must_use]
480 pub fn get_event_positions_topic(&mut self, strategy_id: StrategyId) -> MStr<Topic> {
481 *self
482 .event_positions_topics
483 .entry(strategy_id)
484 .or_insert_with(|| format!("events.position.{strategy_id}").into())
485 }
486
487 #[cfg(feature = "defi")]
488 #[must_use]
489 pub fn get_defi_blocks_topic(&mut self, chain: Blockchain) -> MStr<Topic> {
490 *self
491 .defi_block_topics
492 .entry(chain)
493 .or_insert_with(|| format!("data.defi.blocks.{chain}").into())
494 }
495
496 #[cfg(feature = "defi")]
497 #[must_use]
498 pub fn get_defi_pool_topic(&mut self, address: Address) -> MStr<Topic> {
499 *self
500 .defi_pool_topics
501 .entry(address)
502 .or_insert_with(|| format!("data.defi.pool.{address}").into())
503 }
504
505 #[cfg(feature = "defi")]
506 #[must_use]
507 pub fn get_defi_pool_swaps_topic(&mut self, address: Address) -> MStr<Topic> {
508 *self
509 .defi_pool_swap_topics
510 .entry(address)
511 .or_insert_with(|| format!("data.defi.pool_swaps.{address}").into())
512 }
513
514 #[cfg(feature = "defi")]
515 #[must_use]
516 pub fn get_defi_pool_liquidity_topic(&mut self, address: Address) -> MStr<Topic> {
517 *self
518 .defi_pool_liquidity_topics
519 .entry(address)
520 .or_insert_with(|| format!("data.defi.pool_liquidity.{address}").into())
521 }
522}
523
524#[cfg(test)]
528mod tests {
529 use nautilus_model::{
530 data::{BarType, DataType},
531 identifiers::InstrumentId,
532 };
533 use rstest::*;
534
535 use super::*;
536
537 #[fixture]
538 fn switchboard() -> MessagingSwitchboard {
539 MessagingSwitchboard::default()
540 }
541
542 #[fixture]
543 fn instrument_id() -> InstrumentId {
544 InstrumentId::from("ESZ24.XCME")
545 }
546
547 #[rstest]
548 fn test_get_custom_topic(mut switchboard: MessagingSwitchboard) {
549 let data_type = DataType::new("ExampleDataType", None);
550 let expected_topic = "data.ExampleDataType".into();
551 let result = switchboard.get_custom_topic(&data_type);
552 assert_eq!(result, expected_topic);
553 assert!(switchboard.custom_topics.contains_key(&data_type));
554 }
555
556 #[rstest]
557 fn test_get_instrument_topic(
558 mut switchboard: MessagingSwitchboard,
559 instrument_id: InstrumentId,
560 ) {
561 let expected_topic = "data.instrument.XCME.ESZ24".into();
562 let result = switchboard.get_instrument_topic(instrument_id);
563 assert_eq!(result, expected_topic);
564 assert!(switchboard.instrument_topics.contains_key(&instrument_id));
565 }
566
567 #[rstest]
568 fn test_get_book_deltas_topic(
569 mut switchboard: MessagingSwitchboard,
570 instrument_id: InstrumentId,
571 ) {
572 let expected_topic = "data.book.deltas.XCME.ESZ24".into();
573 let result = switchboard.get_book_deltas_topic(instrument_id);
574 assert_eq!(result, expected_topic);
575 assert!(switchboard.book_deltas_topics.contains_key(&instrument_id));
576 }
577
578 #[rstest]
579 fn test_get_book_depth10_topic(
580 mut switchboard: MessagingSwitchboard,
581 instrument_id: InstrumentId,
582 ) {
583 let expected_topic = "data.book.depth10.XCME.ESZ24".into();
584 let result = switchboard.get_book_depth10_topic(instrument_id);
585 assert_eq!(result, expected_topic);
586 assert!(switchboard.book_depth10_topics.contains_key(&instrument_id));
587 }
588
589 #[rstest]
590 fn test_get_book_snapshots_topic(
591 mut switchboard: MessagingSwitchboard,
592 instrument_id: InstrumentId,
593 ) {
594 let expected_topic = "data.book.snapshots.XCME.ESZ24".into();
595 let result = switchboard.get_book_snapshots_topic(instrument_id);
596 assert_eq!(result, expected_topic);
597 assert!(
598 switchboard
599 .book_snapshots_topics
600 .contains_key(&instrument_id)
601 );
602 }
603
604 #[rstest]
605 fn test_get_quotes_topic(mut switchboard: MessagingSwitchboard, instrument_id: InstrumentId) {
606 let expected_topic = "data.quotes.XCME.ESZ24".into();
607 let result = switchboard.get_quotes_topic(instrument_id);
608 assert_eq!(result, expected_topic);
609 assert!(switchboard.quote_topics.contains_key(&instrument_id));
610 }
611
612 #[rstest]
613 fn test_get_trades_topic(mut switchboard: MessagingSwitchboard, instrument_id: InstrumentId) {
614 let expected_topic = "data.trades.XCME.ESZ24".into();
615 let result = switchboard.get_trades_topic(instrument_id);
616 assert_eq!(result, expected_topic);
617 assert!(switchboard.trade_topics.contains_key(&instrument_id));
618 }
619
620 #[rstest]
621 fn test_get_bars_topic(mut switchboard: MessagingSwitchboard) {
622 let bar_type = BarType::from("ESZ24.XCME-1-MINUTE-LAST-INTERNAL");
623 let expected_topic = format!("data.bars.{bar_type}").into();
624 let result = switchboard.get_bars_topic(bar_type);
625 assert_eq!(result, expected_topic);
626 assert!(switchboard.bar_topics.contains_key(&bar_type));
627 }
628
629 #[rstest]
630 fn test_get_order_snapshots_topic(mut switchboard: MessagingSwitchboard) {
631 let client_order_id = ClientOrderId::from("O-123456789");
632 let expected_topic = format!("order.snapshots.{client_order_id}").into();
633 let result = switchboard.get_order_snapshots_topic(client_order_id);
634 assert_eq!(result, expected_topic);
635 assert!(
636 switchboard
637 .order_snapshots_topics
638 .contains_key(&client_order_id)
639 );
640 }
641}