nautilus_demo/
big_brain_actor.rs1use std::{any::Any, rc::Rc};
17
18use nautilus_common::{
19 actor::{Actor, registry::get_actor_unchecked},
20 messages::data::{
21 DataCommand, RequestCommand, RequestCustomData, SubscribeCommand, SubscribeCustomData,
22 UnsubscribeCommand, UnsubscribeCustomData,
23 },
24 msgbus::{
25 handler::{MessageHandler, ShareableMessageHandler, TypedMessageHandler},
26 register, register_response_handler, send,
27 },
28};
29use nautilus_core::{UUID4, UnixNanos};
30use nautilus_model::{data::DataType, identifiers::ClientId};
31use ustr::Ustr;
32
/// Demo actor that remembers the most recent values delivered to its two handlers.
///
/// The fields are written by the free functions [`negative_handler`] and
/// [`positive_handler`], which look the actor up by its identifier and store the
/// incoming message value.
#[derive(Debug)]
pub struct BigBrainActor {
    /// Last value stored by [`positive_handler`].
    pub pos_val: i32,
    /// Last value stored by [`negative_handler`].
    pub neg_val: i32,
}
50
51impl Default for BigBrainActor {
52 fn default() -> Self {
53 Self::new()
54 }
55}
56
57impl BigBrainActor {
58 #[must_use]
59 pub const fn new() -> Self {
60 Self {
61 pos_val: 0,
62 neg_val: 0,
63 }
64 }
65
66 pub fn register_message_handlers() {
67 let handler = TypedMessageHandler::from(negative_handler);
68 let handler = ShareableMessageHandler::from(Rc::new(handler) as Rc<dyn MessageHandler>);
69 let endpoint = "negative_stream".into();
70 register(endpoint, handler);
71 }
72}
73
74impl Actor for BigBrainActor {
75 fn id(&self) -> Ustr {
76 Ustr::from("big_brain_actor")
77 }
78
79 fn handle(&mut self, _msg: &dyn Any) {
80 todo!()
81 }
82
83 fn as_any(&self) -> &dyn Any {
84 self
85 }
86}
87
88pub fn negative_handler(msg: &i32) {
94 let actor_id = Ustr::from("big_brain_actor");
95 let big_brain_actor = get_actor_unchecked::<BigBrainActor>(&actor_id);
96 big_brain_actor.neg_val = *msg;
97
98 println!("Received negative value: {}", big_brain_actor.neg_val);
99
100 let correlation_id = UUID4::new();
101 let handler = TypedMessageHandler::from(positive_handler);
102 let handler = ShareableMessageHandler::from(Rc::new(handler) as Rc<dyn MessageHandler>);
103 register_response_handler(&correlation_id, handler);
104
105 let data_type = if big_brain_actor.neg_val == -3 {
106 DataType::new("skip", None)
107 } else {
108 DataType::new("get", None)
109 };
110
111 let request = RequestCustomData {
112 client_id: ClientId::new("mock_data_client"),
113 data_type,
114 request_id: correlation_id,
115 ts_init: UnixNanos::new(0),
116 params: None,
117 };
118 let cmd = DataCommand::Request(RequestCommand::Data(request));
119
120 send("data_engine".into(), &cmd);
121}
122
123pub fn positive_handler(msg: &i32) {
129 let actor_id = Ustr::from("big_brain_actor");
130 let big_brain_actor = get_actor_unchecked::<BigBrainActor>(&actor_id);
131 big_brain_actor.pos_val = *msg;
132
133 println!("Received positive value: {}", big_brain_actor.pos_val);
134
135 let data_type = DataType::new("blah", None);
136
137 if big_brain_actor.pos_val == 3 {
138 let data = SubscribeCustomData::new(
139 Some(ClientId::new("mock_data_client")),
140 None,
141 data_type.clone(),
142 UUID4::new(),
143 UnixNanos::new(0),
144 None,
145 );
146 let cmd = DataCommand::Subscribe(SubscribeCommand::Data(data));
147 send("data_engine".into(), &cmd);
148 }
149
150 if big_brain_actor.pos_val > 8 {
151 let data = UnsubscribeCustomData::new(
152 Some(ClientId::new("mock_data_client")),
153 None,
154 data_type,
155 UUID4::new(),
156 UnixNanos::new(0),
157 None,
158 );
159 let cmd = DataCommand::Unsubscribe(UnsubscribeCommand::Data(data));
160 send("data_engine".into(), &cmd);
161 }
162}