nautilus_common/python/msgbus.rs
1// -------------------------------------------------------------------------------------------------
2// Copyright (C) 2015-2025 2Posei Systems Pty Ltd. All rights reserved.
3// https://poseitrader.io
4//
5// Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
6// You may not use this file except in compliance with the License.
7// You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14// -------------------------------------------------------------------------------------------------
15
16use std::rc::Rc;
17
18use nautilus_core::python::to_pyvalue_err;
19use pyo3::{PyObject, PyResult, pymethods};
20
21use super::handler::PythonMessageHandler;
22use crate::msgbus::{
23 BusMessage, MStr, MessageBus, Topic, core::Endpoint, deregister,
24 handler::ShareableMessageHandler, publish, register, send, subscribe, unsubscribe,
25};
26
27#[pymethods]
28impl BusMessage {
29 #[getter]
30 #[pyo3(name = "topic")]
31 fn py_topic(&mut self) -> String {
32 self.topic.to_string()
33 }
34
35 #[getter]
36 #[pyo3(name = "payload")]
37 fn py_payload(&mut self) -> &[u8] {
38 self.payload.as_ref()
39 }
40
41 fn __repr__(&self) -> String {
42 format!("{}('{}')", stringify!(BusMessage), self)
43 }
44
45 fn __str__(&self) -> String {
46 self.to_string()
47 }
48}
49
50#[pymethods]
51impl MessageBus {
52 /// Sends the `message` to the `endpoint`.
53 ///
54 /// # Errors
55 ///
56 /// Returns an error if `endpoint` is invalid.
57 #[pyo3(name = "send")]
58 pub fn py_send(&self, endpoint: &str, message: PyObject) -> PyResult<()> {
59 let endpoint = MStr::<Endpoint>::endpoint(endpoint).map_err(to_pyvalue_err)?;
60 send(endpoint, &message);
61 Ok(())
62 }
63
64 /// Publishes the `message` to the `topic`.
65 ///
66 /// # Errors
67 ///
68 /// Returns an error if `topic` is invalid.
69 #[pyo3(name = "publish")]
70 #[staticmethod]
71 pub fn py_publish(topic: &str, message: PyObject) -> PyResult<()> {
72 let topic = MStr::<Topic>::topic(topic).map_err(to_pyvalue_err)?;
73 publish(topic, &message);
74 Ok(())
75 }
76
77 /// Registers the given `handler` for the `endpoint` address.
78 ///
79 /// Updates endpoint handler if already exists.
80 ///
81 /// # Errors
82 ///
83 /// Returns an error if `endpoint` is invalid.
84 #[pyo3(name = "register")]
85 #[staticmethod]
86 pub fn py_register(endpoint: &str, handler: PythonMessageHandler) -> PyResult<()> {
87 let endpoint = MStr::<Endpoint>::endpoint(endpoint).map_err(to_pyvalue_err)?;
88 let handler = ShareableMessageHandler(Rc::new(handler));
89 register(endpoint, handler);
90 Ok(())
91 }
92
93 /// Subscribes the given `handler` to the `topic`.
94 ///
95 /// The priority for the subscription determines the ordering of
96 /// handlers receiving messages being processed, higher priority
97 /// handlers will receive messages before lower priority handlers.
98 ///
99 /// Safety: Priority should be between 0 and 255
100 ///
101 /// Updates topic handler if already exists.
102 ///
103 /// # Warnings
104 ///
105 /// Assigning priority handling is an advanced feature which *shouldn't
106 /// normally be needed by most users*. **Only assign a higher priority to the
107 /// subscription if you are certain of what you're doing**. If an inappropriate
108 /// priority is assigned then the handler may receive messages before core
109 /// system components have been able to process necessary calculations and
110 /// produce potential side effects for logically sound behavior.
111 #[pyo3(name = "subscribe")]
112 #[pyo3(signature = (topic, handler, priority=None))]
113 #[staticmethod]
114 pub fn py_subscribe(topic: &str, handler: PythonMessageHandler, priority: Option<u8>) {
115 let pattern = topic.into();
116 let handler = ShareableMessageHandler(Rc::new(handler));
117 subscribe(pattern, handler, priority);
118 }
119
120 /// Returns whether there are subscribers for the given `pattern`.
121 #[must_use]
122 #[pyo3(name = "is_subscribed")]
123 pub fn py_is_subscribed(&self, topic: &str, handler: PythonMessageHandler) -> bool {
124 let handler = ShareableMessageHandler(Rc::new(handler));
125 self.is_subscribed(topic, handler)
126 }
127
128 /// Unsubscribes the given `handler` from the `topic`.
129 #[pyo3(name = "unsubscribe")]
130 #[staticmethod]
131 pub fn py_unsubscribe(topic: &str, handler: PythonMessageHandler) {
132 let pattern = topic.into();
133 let handler = ShareableMessageHandler(Rc::new(handler));
134 unsubscribe(pattern, handler);
135 }
136
137 /// Returns whether there are subscribers for the given `pattern`.
138 #[must_use]
139 #[pyo3(name = "is_registered")]
140 pub fn py_is_registered(&self, endpoint: &str) -> bool {
141 self.is_registered(endpoint)
142 }
143
144 /// Deregisters the given `handler` for the `endpoint` address.
145 ///
146 /// # Errors
147 ///
148 /// Returns an error if `endpoint` is invalid.
149 #[pyo3(name = "deregister")]
150 #[staticmethod]
151 pub fn py_deregister(endpoint: &str) -> PyResult<()> {
152 let endpoint = MStr::<Endpoint>::endpoint(endpoint).map_err(to_pyvalue_err)?;
153 deregister(endpoint);
154 Ok(())
155 }
156}