nautilus_common/msgbus/
switchboard.rs

1// -------------------------------------------------------------------------------------------------
2//  Copyright (C) 2015-2025 Posei Systems Pty Ltd. All rights reserved.
3//  https://poseitrader.io
4//
5//  Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
6//  You may not use this file except in compliance with the License.
7//  You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
8//
9//  Unless required by applicable law or agreed to in writing, software
10//  distributed under the License is distributed on an "AS IS" BASIS,
11//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12//  See the License for the specific language governing permissions and
13//  limitations under the License.
14// -------------------------------------------------------------------------------------------------
15
16use ahash::AHashMap;
17#[cfg(feature = "defi")]
18use alloy_primitives::Address;
19#[cfg(feature = "defi")]
20use nautilus_model::defi::Blockchain;
21use nautilus_model::{
22    data::{BarType, DataType},
23    identifiers::{ClientOrderId, InstrumentId, PositionId, StrategyId, Venue},
24};
25
26use super::core::{Endpoint, MStr, Topic};
27use crate::msgbus::get_message_bus;
28
29pub const CLOSE_TOPIC: &str = "CLOSE";
30
31#[must_use]
32pub fn get_custom_topic(data_type: &DataType) -> MStr<Topic> {
33    get_message_bus()
34        .borrow_mut()
35        .switchboard
36        .get_custom_topic(data_type)
37}
38
39#[must_use]
40pub fn get_instruments_topic(venue: Venue) -> MStr<Topic> {
41    get_message_bus()
42        .borrow_mut()
43        .switchboard
44        .get_instruments_topic(venue)
45}
46
47#[must_use]
48pub fn get_instrument_topic(instrument_id: InstrumentId) -> MStr<Topic> {
49    get_message_bus()
50        .borrow_mut()
51        .switchboard
52        .get_instrument_topic(instrument_id)
53}
54
55#[must_use]
56pub fn get_book_deltas_topic(instrument_id: InstrumentId) -> MStr<Topic> {
57    get_message_bus()
58        .borrow_mut()
59        .switchboard
60        .get_book_deltas_topic(instrument_id)
61}
62
63#[must_use]
64pub fn get_book_depth10_topic(instrument_id: InstrumentId) -> MStr<Topic> {
65    get_message_bus()
66        .borrow_mut()
67        .switchboard
68        .get_book_depth10_topic(instrument_id)
69}
70
71#[must_use]
72pub fn get_book_snapshots_topic(instrument_id: InstrumentId) -> MStr<Topic> {
73    get_message_bus()
74        .borrow_mut()
75        .switchboard
76        .get_book_snapshots_topic(instrument_id)
77}
78
79#[must_use]
80pub fn get_quotes_topic(instrument_id: InstrumentId) -> MStr<Topic> {
81    get_message_bus()
82        .borrow_mut()
83        .switchboard
84        .get_quotes_topic(instrument_id)
85}
86
87#[must_use]
88pub fn get_trades_topic(instrument_id: InstrumentId) -> MStr<Topic> {
89    get_message_bus()
90        .borrow_mut()
91        .switchboard
92        .get_trades_topic(instrument_id)
93}
94
95#[must_use]
96pub fn get_bars_topic(bar_type: BarType) -> MStr<Topic> {
97    get_message_bus()
98        .borrow_mut()
99        .switchboard
100        .get_bars_topic(bar_type)
101}
102
103#[must_use]
104pub fn get_mark_price_topic(instrument_id: InstrumentId) -> MStr<Topic> {
105    get_message_bus()
106        .borrow_mut()
107        .switchboard
108        .get_mark_price_topic(instrument_id)
109}
110
111#[must_use]
112pub fn get_index_price_topic(instrument_id: InstrumentId) -> MStr<Topic> {
113    get_message_bus()
114        .borrow_mut()
115        .switchboard
116        .get_index_price_topic(instrument_id)
117}
118
119#[must_use]
120pub fn get_instrument_status_topic(instrument_id: InstrumentId) -> MStr<Topic> {
121    get_message_bus()
122        .borrow_mut()
123        .switchboard
124        .get_instrument_status_topic(instrument_id)
125}
126
127#[must_use]
128pub fn get_instrument_close_topic(instrument_id: InstrumentId) -> MStr<Topic> {
129    get_message_bus()
130        .borrow_mut()
131        .switchboard
132        .get_instrument_close_topic(instrument_id)
133}
134
135#[must_use]
136pub fn get_order_snapshots_topic(client_order_id: ClientOrderId) -> MStr<Topic> {
137    get_message_bus()
138        .borrow_mut()
139        .switchboard
140        .get_order_snapshots_topic(client_order_id)
141}
142
143#[must_use]
144pub fn get_positions_snapshots_topic(position_id: PositionId) -> MStr<Topic> {
145    get_message_bus()
146        .borrow_mut()
147        .switchboard
148        .get_positions_snapshots_topic(position_id)
149}
150
151#[must_use]
152pub fn get_event_orders_topic(strategy_id: StrategyId) -> MStr<Topic> {
153    get_message_bus()
154        .borrow_mut()
155        .switchboard
156        .get_event_orders_topic(strategy_id)
157}
158
159#[must_use]
160pub fn get_event_positions_topic(strategy_id: StrategyId) -> MStr<Topic> {
161    get_message_bus()
162        .borrow_mut()
163        .switchboard
164        .get_event_positions_topic(strategy_id)
165}
166
167#[cfg(feature = "defi")]
168#[must_use]
169pub fn get_defi_blocks_topic(chain: Blockchain) -> MStr<Topic> {
170    get_message_bus()
171        .borrow_mut()
172        .switchboard
173        .get_defi_blocks_topic(chain)
174}
175
176#[cfg(feature = "defi")]
177#[must_use]
178pub fn get_defi_pool_topic(address: Address) -> MStr<Topic> {
179    get_message_bus()
180        .borrow_mut()
181        .switchboard
182        .get_defi_pool_topic(address)
183}
184
185#[cfg(feature = "defi")]
186#[must_use]
187pub fn get_defi_pool_swaps_topic(address: Address) -> MStr<Topic> {
188    get_message_bus()
189        .borrow_mut()
190        .switchboard
191        .get_defi_pool_swaps_topic(address)
192}
193
194#[cfg(feature = "defi")]
195#[must_use]
196pub fn get_defi_liquidity_topic(address: Address) -> MStr<Topic> {
197    get_message_bus()
198        .borrow_mut()
199        .switchboard
200        .get_defi_pool_liquidity_topic(address)
201}
202
203/// Represents a switchboard of built-in messaging endpoint names.
204#[derive(Clone, Debug)]
205pub struct MessagingSwitchboard {
206    custom_topics: AHashMap<DataType, MStr<Topic>>,
207    instruments_topics: AHashMap<Venue, MStr<Topic>>,
208    instrument_topics: AHashMap<InstrumentId, MStr<Topic>>,
209    book_deltas_topics: AHashMap<InstrumentId, MStr<Topic>>,
210    book_depth10_topics: AHashMap<InstrumentId, MStr<Topic>>,
211    book_snapshots_topics: AHashMap<InstrumentId, MStr<Topic>>,
212    quote_topics: AHashMap<InstrumentId, MStr<Topic>>,
213    trade_topics: AHashMap<InstrumentId, MStr<Topic>>,
214    bar_topics: AHashMap<BarType, MStr<Topic>>,
215    mark_price_topics: AHashMap<InstrumentId, MStr<Topic>>,
216    index_price_topics: AHashMap<InstrumentId, MStr<Topic>>,
217    instrument_status_topics: AHashMap<InstrumentId, MStr<Topic>>,
218    instrument_close_topics: AHashMap<InstrumentId, MStr<Topic>>,
219    event_orders_topics: AHashMap<StrategyId, MStr<Topic>>,
220    event_positions_topics: AHashMap<StrategyId, MStr<Topic>>,
221    order_snapshots_topics: AHashMap<ClientOrderId, MStr<Topic>>,
222    positions_snapshots_topics: AHashMap<PositionId, MStr<Topic>>,
223    #[cfg(feature = "defi")]
224    defi_block_topics: AHashMap<Blockchain, MStr<Topic>>,
225    #[cfg(feature = "defi")]
226    defi_pool_topics: AHashMap<Address, MStr<Topic>>,
227    #[cfg(feature = "defi")]
228    defi_pool_swap_topics: AHashMap<Address, MStr<Topic>>,
229    #[cfg(feature = "defi")]
230    defi_pool_liquidity_topics: AHashMap<Address, MStr<Topic>>,
231}
232
233impl Default for MessagingSwitchboard {
234    /// Creates a new default [`MessagingSwitchboard`] instance.
235    fn default() -> Self {
236        Self {
237            custom_topics: AHashMap::new(),
238            instruments_topics: AHashMap::new(),
239            instrument_topics: AHashMap::new(),
240            book_deltas_topics: AHashMap::new(),
241            book_snapshots_topics: AHashMap::new(),
242            book_depth10_topics: AHashMap::new(),
243            quote_topics: AHashMap::new(),
244            trade_topics: AHashMap::new(),
245            mark_price_topics: AHashMap::new(),
246            index_price_topics: AHashMap::new(),
247            bar_topics: AHashMap::new(),
248            instrument_status_topics: AHashMap::new(),
249            instrument_close_topics: AHashMap::new(),
250            order_snapshots_topics: AHashMap::new(),
251            event_orders_topics: AHashMap::new(),
252            event_positions_topics: AHashMap::new(),
253            positions_snapshots_topics: AHashMap::new(),
254            #[cfg(feature = "defi")]
255            defi_block_topics: AHashMap::new(),
256            #[cfg(feature = "defi")]
257            defi_pool_topics: AHashMap::new(),
258            #[cfg(feature = "defi")]
259            defi_pool_swap_topics: AHashMap::new(),
260            #[cfg(feature = "defi")]
261            defi_pool_liquidity_topics: AHashMap::new(),
262        }
263    }
264}
265
266impl MessagingSwitchboard {
267    #[must_use]
268    pub fn data_engine_queue_execute() -> MStr<Endpoint> {
269        "DataEngine.queue_execute".into()
270    }
271
272    #[must_use]
273    pub fn data_engine_execute() -> MStr<Endpoint> {
274        "DataEngine.execute".into()
275    }
276
277    #[must_use]
278    pub fn data_engine_process() -> MStr<Endpoint> {
279        "DataEngine.process".into()
280    }
281
282    #[must_use]
283    pub fn data_engine_response() -> MStr<Endpoint> {
284        "DataEngine.response".into()
285    }
286
287    #[must_use]
288    pub fn exec_engine_execute() -> MStr<Endpoint> {
289        "ExecEngine.execute".into()
290    }
291
292    #[must_use]
293    pub fn exec_engine_process() -> MStr<Endpoint> {
294        "ExecEngine.process".into()
295    }
296
297    #[must_use]
298    pub fn get_custom_topic(&mut self, data_type: &DataType) -> MStr<Topic> {
299        *self
300            .custom_topics
301            .entry(data_type.clone())
302            .or_insert_with(|| format!("data.{}", data_type.topic()).into())
303    }
304
305    #[must_use]
306    pub fn get_instruments_topic(&mut self, venue: Venue) -> MStr<Topic> {
307        *self
308            .instruments_topics
309            .entry(venue)
310            .or_insert_with(|| format!("data.instrument.{}", venue).into())
311    }
312
313    #[must_use]
314    pub fn get_instrument_topic(&mut self, instrument_id: InstrumentId) -> MStr<Topic> {
315        *self
316            .instrument_topics
317            .entry(instrument_id)
318            .or_insert_with(|| {
319                format!(
320                    "data.instrument.{}.{}",
321                    instrument_id.venue, instrument_id.symbol
322                )
323                .into()
324            })
325    }
326
327    #[must_use]
328    pub fn get_book_deltas_topic(&mut self, instrument_id: InstrumentId) -> MStr<Topic> {
329        *self
330            .book_deltas_topics
331            .entry(instrument_id)
332            .or_insert_with(|| {
333                format!(
334                    "data.book.deltas.{}.{}",
335                    instrument_id.venue, instrument_id.symbol
336                )
337                .into()
338            })
339    }
340
341    #[must_use]
342    pub fn get_book_depth10_topic(&mut self, instrument_id: InstrumentId) -> MStr<Topic> {
343        *self
344            .book_depth10_topics
345            .entry(instrument_id)
346            .or_insert_with(|| {
347                format!(
348                    "data.book.depth10.{}.{}",
349                    instrument_id.venue, instrument_id.symbol
350                )
351                .into()
352            })
353    }
354
355    #[must_use]
356    pub fn get_book_snapshots_topic(&mut self, instrument_id: InstrumentId) -> MStr<Topic> {
357        *self
358            .book_snapshots_topics
359            .entry(instrument_id)
360            .or_insert_with(|| {
361                format!(
362                    "data.book.snapshots.{}.{}",
363                    instrument_id.venue, instrument_id.symbol
364                )
365                .into()
366            })
367    }
368
369    #[must_use]
370    pub fn get_quotes_topic(&mut self, instrument_id: InstrumentId) -> MStr<Topic> {
371        *self.quote_topics.entry(instrument_id).or_insert_with(|| {
372            format!(
373                "data.quotes.{}.{}",
374                instrument_id.venue, instrument_id.symbol
375            )
376            .into()
377        })
378    }
379
380    #[must_use]
381    pub fn get_trades_topic(&mut self, instrument_id: InstrumentId) -> MStr<Topic> {
382        *self.trade_topics.entry(instrument_id).or_insert_with(|| {
383            format!(
384                "data.trades.{}.{}",
385                instrument_id.venue, instrument_id.symbol
386            )
387            .into()
388        })
389    }
390
391    #[must_use]
392    pub fn get_bars_topic(&mut self, bar_type: BarType) -> MStr<Topic> {
393        *self
394            .bar_topics
395            .entry(bar_type)
396            .or_insert_with(|| format!("data.bars.{bar_type}").into())
397    }
398
399    #[must_use]
400    pub fn get_mark_price_topic(&mut self, instrument_id: InstrumentId) -> MStr<Topic> {
401        *self
402            .mark_price_topics
403            .entry(instrument_id)
404            .or_insert_with(|| {
405                format!(
406                    "data.mark_prices.{}.{}",
407                    instrument_id.venue, instrument_id.symbol
408                )
409                .into()
410            })
411    }
412
413    #[must_use]
414    pub fn get_index_price_topic(&mut self, instrument_id: InstrumentId) -> MStr<Topic> {
415        *self
416            .index_price_topics
417            .entry(instrument_id)
418            .or_insert_with(|| {
419                format!(
420                    "data.index_prices.{}.{}",
421                    instrument_id.venue, instrument_id.symbol
422                )
423                .into()
424            })
425    }
426
427    #[must_use]
428    pub fn get_instrument_status_topic(&mut self, instrument_id: InstrumentId) -> MStr<Topic> {
429        *self
430            .instrument_status_topics
431            .entry(instrument_id)
432            .or_insert_with(|| {
433                format!(
434                    "data.status.{}.{}",
435                    instrument_id.venue, instrument_id.symbol
436                )
437                .into()
438            })
439    }
440
441    #[must_use]
442    pub fn get_instrument_close_topic(&mut self, instrument_id: InstrumentId) -> MStr<Topic> {
443        *self
444            .instrument_close_topics
445            .entry(instrument_id)
446            .or_insert_with(|| {
447                format!(
448                    "data.close.{}.{}",
449                    instrument_id.venue, instrument_id.symbol
450                )
451                .into()
452            })
453    }
454
455    #[must_use]
456    pub fn get_order_snapshots_topic(&mut self, client_order_id: ClientOrderId) -> MStr<Topic> {
457        *self
458            .order_snapshots_topics
459            .entry(client_order_id)
460            .or_insert_with(|| format!("order.snapshots.{client_order_id}").into())
461    }
462
463    #[must_use]
464    pub fn get_positions_snapshots_topic(&mut self, position_id: PositionId) -> MStr<Topic> {
465        *self
466            .positions_snapshots_topics
467            .entry(position_id)
468            .or_insert_with(|| format!("positions.snapshots.{position_id}").into())
469    }
470
471    #[must_use]
472    pub fn get_event_orders_topic(&mut self, strategy_id: StrategyId) -> MStr<Topic> {
473        *self
474            .event_orders_topics
475            .entry(strategy_id)
476            .or_insert_with(|| format!("events.order.{strategy_id}").into())
477    }
478
479    #[must_use]
480    pub fn get_event_positions_topic(&mut self, strategy_id: StrategyId) -> MStr<Topic> {
481        *self
482            .event_positions_topics
483            .entry(strategy_id)
484            .or_insert_with(|| format!("events.position.{strategy_id}").into())
485    }
486
487    #[cfg(feature = "defi")]
488    #[must_use]
489    pub fn get_defi_blocks_topic(&mut self, chain: Blockchain) -> MStr<Topic> {
490        *self
491            .defi_block_topics
492            .entry(chain)
493            .or_insert_with(|| format!("data.defi.blocks.{chain}").into())
494    }
495
496    #[cfg(feature = "defi")]
497    #[must_use]
498    pub fn get_defi_pool_topic(&mut self, address: Address) -> MStr<Topic> {
499        *self
500            .defi_pool_topics
501            .entry(address)
502            .or_insert_with(|| format!("data.defi.pool.{address}").into())
503    }
504
505    #[cfg(feature = "defi")]
506    #[must_use]
507    pub fn get_defi_pool_swaps_topic(&mut self, address: Address) -> MStr<Topic> {
508        *self
509            .defi_pool_swap_topics
510            .entry(address)
511            .or_insert_with(|| format!("data.defi.pool_swaps.{address}").into())
512    }
513
514    #[cfg(feature = "defi")]
515    #[must_use]
516    pub fn get_defi_pool_liquidity_topic(&mut self, address: Address) -> MStr<Topic> {
517        *self
518            .defi_pool_liquidity_topics
519            .entry(address)
520            .or_insert_with(|| format!("data.defi.pool_liquidity.{address}").into())
521    }
522}
523
524////////////////////////////////////////////////////////////////////////////////
525// Tests
526////////////////////////////////////////////////////////////////////////////////
527#[cfg(test)]
528mod tests {
529    use nautilus_model::{
530        data::{BarType, DataType},
531        identifiers::InstrumentId,
532    };
533    use rstest::*;
534
535    use super::*;
536
537    #[fixture]
538    fn switchboard() -> MessagingSwitchboard {
539        MessagingSwitchboard::default()
540    }
541
542    #[fixture]
543    fn instrument_id() -> InstrumentId {
544        InstrumentId::from("ESZ24.XCME")
545    }
546
547    #[rstest]
548    fn test_get_custom_topic(mut switchboard: MessagingSwitchboard) {
549        let data_type = DataType::new("ExampleDataType", None);
550        let expected_topic = "data.ExampleDataType".into();
551        let result = switchboard.get_custom_topic(&data_type);
552        assert_eq!(result, expected_topic);
553        assert!(switchboard.custom_topics.contains_key(&data_type));
554    }
555
556    #[rstest]
557    fn test_get_instrument_topic(
558        mut switchboard: MessagingSwitchboard,
559        instrument_id: InstrumentId,
560    ) {
561        let expected_topic = "data.instrument.XCME.ESZ24".into();
562        let result = switchboard.get_instrument_topic(instrument_id);
563        assert_eq!(result, expected_topic);
564        assert!(switchboard.instrument_topics.contains_key(&instrument_id));
565    }
566
567    #[rstest]
568    fn test_get_book_deltas_topic(
569        mut switchboard: MessagingSwitchboard,
570        instrument_id: InstrumentId,
571    ) {
572        let expected_topic = "data.book.deltas.XCME.ESZ24".into();
573        let result = switchboard.get_book_deltas_topic(instrument_id);
574        assert_eq!(result, expected_topic);
575        assert!(switchboard.book_deltas_topics.contains_key(&instrument_id));
576    }
577
578    #[rstest]
579    fn test_get_book_depth10_topic(
580        mut switchboard: MessagingSwitchboard,
581        instrument_id: InstrumentId,
582    ) {
583        let expected_topic = "data.book.depth10.XCME.ESZ24".into();
584        let result = switchboard.get_book_depth10_topic(instrument_id);
585        assert_eq!(result, expected_topic);
586        assert!(switchboard.book_depth10_topics.contains_key(&instrument_id));
587    }
588
589    #[rstest]
590    fn test_get_book_snapshots_topic(
591        mut switchboard: MessagingSwitchboard,
592        instrument_id: InstrumentId,
593    ) {
594        let expected_topic = "data.book.snapshots.XCME.ESZ24".into();
595        let result = switchboard.get_book_snapshots_topic(instrument_id);
596        assert_eq!(result, expected_topic);
597        assert!(
598            switchboard
599                .book_snapshots_topics
600                .contains_key(&instrument_id)
601        );
602    }
603
604    #[rstest]
605    fn test_get_quotes_topic(mut switchboard: MessagingSwitchboard, instrument_id: InstrumentId) {
606        let expected_topic = "data.quotes.XCME.ESZ24".into();
607        let result = switchboard.get_quotes_topic(instrument_id);
608        assert_eq!(result, expected_topic);
609        assert!(switchboard.quote_topics.contains_key(&instrument_id));
610    }
611
612    #[rstest]
613    fn test_get_trades_topic(mut switchboard: MessagingSwitchboard, instrument_id: InstrumentId) {
614        let expected_topic = "data.trades.XCME.ESZ24".into();
615        let result = switchboard.get_trades_topic(instrument_id);
616        assert_eq!(result, expected_topic);
617        assert!(switchboard.trade_topics.contains_key(&instrument_id));
618    }
619
620    #[rstest]
621    fn test_get_bars_topic(mut switchboard: MessagingSwitchboard) {
622        let bar_type = BarType::from("ESZ24.XCME-1-MINUTE-LAST-INTERNAL");
623        let expected_topic = format!("data.bars.{bar_type}").into();
624        let result = switchboard.get_bars_topic(bar_type);
625        assert_eq!(result, expected_topic);
626        assert!(switchboard.bar_topics.contains_key(&bar_type));
627    }
628
629    #[rstest]
630    fn test_get_order_snapshots_topic(mut switchboard: MessagingSwitchboard) {
631        let client_order_id = ClientOrderId::from("O-123456789");
632        let expected_topic = format!("order.snapshots.{client_order_id}").into();
633        let result = switchboard.get_order_snapshots_topic(client_order_id);
634        assert_eq!(result, expected_topic);
635        assert!(
636            switchboard
637                .order_snapshots_topics
638                .contains_key(&client_order_id)
639        );
640    }
641}