nautilus_common/python/
msgbus.rs1use std::rc::Rc;
17
18use nautilus_core::python::to_pyvalue_err;
19use pyo3::{PyObject, PyResult, pyfunction, pymethods};
20
21use super::handler::PythonMessageHandler;
22use crate::msgbus::{
23 BusMessage, MStr, Topic, core::Endpoint, deregister, get_message_bus,
24 handler::ShareableMessageHandler, publish, register, send_any, subscribe, unsubscribe,
25};
26
27#[pymethods]
28impl BusMessage {
29 #[getter]
30 #[pyo3(name = "topic")]
31 fn py_topic(&mut self) -> String {
32 self.topic.to_string()
33 }
34
35 #[getter]
36 #[pyo3(name = "payload")]
37 fn py_payload(&mut self) -> &[u8] {
38 self.payload.as_ref()
39 }
40
41 fn __repr__(&self) -> String {
42 format!("{}('{}')", stringify!(BusMessage), self)
43 }
44
45 fn __str__(&self) -> String {
46 self.to_string()
47 }
48}
49
50#[pyfunction]
56#[pyo3(name = "msgbus_send")]
57pub fn py_msgbus_send(endpoint: &str, message: PyObject) -> PyResult<()> {
58 let endpoint = MStr::<Endpoint>::endpoint(endpoint).map_err(to_pyvalue_err)?;
59 send_any(endpoint, &message);
60 Ok(())
61}
62
63#[pyfunction]
65#[pyo3(name = "msgbus_is_subscribed")]
66pub fn py_msgbus_is_subscribed(topic: &str, handler: PythonMessageHandler) -> bool {
67 let handler = ShareableMessageHandler(Rc::new(handler));
68 get_message_bus().borrow().is_subscribed(topic, handler)
69}
70
71#[pyfunction]
73#[pyo3(name = "msgbus_is_registered")]
74pub fn py_msgbus_is_registered(endpoint: &str) -> bool {
75 get_message_bus().borrow().is_registered(endpoint)
76}
77
78#[pyfunction]
84#[pyo3(name = "msgbus_publish")]
85pub fn py_msgbus_publish(topic: &str, message: PyObject) -> PyResult<()> {
86 let topic = MStr::<Topic>::topic(topic).map_err(to_pyvalue_err)?;
87 publish(topic, &message);
88 Ok(())
89}
90
91#[pyfunction]
99#[pyo3(name = "msgbus_register")]
100pub fn py_msgbus_register(endpoint: &str, handler: PythonMessageHandler) -> PyResult<()> {
101 let endpoint = MStr::<Endpoint>::endpoint(endpoint).map_err(to_pyvalue_err)?;
102 let handler = ShareableMessageHandler(Rc::new(handler));
103 register(endpoint, handler);
104 Ok(())
105}
106
107#[pyfunction]
126#[pyo3(name = "msgbus_subscribe")]
127#[pyo3(signature = (topic, handler, priority=None))]
128pub fn py_msgbus_subscribe(topic: &str, handler: PythonMessageHandler, priority: Option<u8>) {
129 let pattern = topic.into();
130 let handler = ShareableMessageHandler(Rc::new(handler));
131 subscribe(pattern, handler, priority);
132}
133
134#[pyfunction]
136#[pyo3(name = "msgbus_unsubscribe")]
137pub fn py_msgbus_unsubscribe(topic: &str, handler: PythonMessageHandler) {
138 let pattern = topic.into();
139 let handler = ShareableMessageHandler(Rc::new(handler));
140 unsubscribe(pattern, handler);
141}
142
143#[pyfunction]
149#[pyo3(name = "msgbus_deregister")]
150pub fn py_msgbus_deregister(endpoint: &str) -> PyResult<()> {
151 let endpoint = MStr::<Endpoint>::endpoint(endpoint).map_err(to_pyvalue_err)?;
152 deregister(endpoint);
153 Ok(())
154}