nautilus_demo/
big_brain_actor.rs

1// -------------------------------------------------------------------------------------------------
2//  Copyright (C) 2015-2025 Posei Systems Pty Ltd. All rights reserved.
3//  https://poseitrader.io
4//
5//  Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
6//  You may not use this file except in compliance with the License.
7//  You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
8//
9//  Unless required by applicable law or agreed to in writing, software
10//  distributed under the License is distributed on an "AS IS" BASIS,
11//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12//  See the License for the specific language governing permissions and
13//  limitations under the License.
14// -------------------------------------------------------------------------------------------------
15
16use std::{any::Any, rc::Rc};
17
18use nautilus_common::{
19    actor::{Actor, registry::get_actor_unchecked},
20    messages::data::{
21        DataCommand, RequestCommand, RequestCustomData, SubscribeCommand, SubscribeCustomData,
22        UnsubscribeCommand, UnsubscribeCustomData,
23    },
24    msgbus::{
25        handler::{MessageHandler, ShareableMessageHandler, TypedMessageHandler},
26        register, register_response_handler, send,
27    },
28};
29use nautilus_core::{UUID4, UnixNanos};
30use nautilus_model::{data::DataType, identifiers::ClientId};
31use ustr::Ustr;
32
33/// Big brain actor receives positive and negative streams of numbers
34///
35/// The negative drives the positive stream and the postive after reaching
36/// 10 issues a stop command. The actor should ideally behave like this
37///
38/// -1 -> get request
39/// 1
40/// -2 -> get request
41/// 2
42/// -3 -> skip request
43/// 7 -> skip command
44/// -8 -> get request
45/// 8 -> stop command
46pub struct BigBrainActor {
47    pub pos_val: i32,
48    pub neg_val: i32,
49}
50
51impl Default for BigBrainActor {
52    fn default() -> Self {
53        Self::new()
54    }
55}
56
57impl BigBrainActor {
58    #[must_use]
59    pub const fn new() -> Self {
60        Self {
61            pos_val: 0,
62            neg_val: 0,
63        }
64    }
65
66    pub fn register_message_handlers() {
67        let handler = TypedMessageHandler::from(negative_handler);
68        let handler = ShareableMessageHandler::from(Rc::new(handler) as Rc<dyn MessageHandler>);
69        let endpoint = "negative_stream".into();
70        register(endpoint, handler);
71    }
72}
73
74impl Actor for BigBrainActor {
75    fn id(&self) -> Ustr {
76        Ustr::from("big_brain_actor")
77    }
78
79    fn handle(&mut self, _msg: &dyn Any) {
80        todo!()
81    }
82
83    fn as_any(&self) -> &dyn Any {
84        self
85    }
86}
87
88/// Negative integer stream handler
89///
90/// It prints each positive number it receives. For each negative number
91/// it makes requests a positive number. When negative number is equal to -3
92/// it issues a skipped positive number request instead.
93pub fn negative_handler(msg: &i32) {
94    let actor_id = Ustr::from("big_brain_actor");
95    let big_brain_actor = get_actor_unchecked::<BigBrainActor>(&actor_id);
96    big_brain_actor.neg_val = *msg;
97
98    println!("Received negative value: {}", big_brain_actor.neg_val);
99
100    let correlation_id = UUID4::new();
101    let handler = TypedMessageHandler::from(positive_handler);
102    let handler = ShareableMessageHandler::from(Rc::new(handler) as Rc<dyn MessageHandler>);
103    register_response_handler(&correlation_id, handler);
104
105    let data_type = if big_brain_actor.neg_val == -3 {
106        DataType::new("skip", None)
107    } else {
108        DataType::new("get", None)
109    };
110
111    let request = RequestCustomData {
112        client_id: ClientId::new("mock_data_client"),
113        data_type,
114        request_id: correlation_id,
115        ts_init: UnixNanos::new(0),
116        params: None,
117    };
118    let cmd = DataCommand::Request(RequestCommand::Data(request));
119
120    send("data_engine".into(), &cmd);
121}
122
123/// Positive integer stream handler
124///
125/// It prints each positive number it receives. When the positive value
126/// exceeds 3, it issues a skip command for the negative stream. When it exceeds
127/// 8 it issues a stop command for the negative stream
128pub fn positive_handler(msg: &i32) {
129    let actor_id = Ustr::from("big_brain_actor");
130    let big_brain_actor = get_actor_unchecked::<BigBrainActor>(&actor_id);
131    big_brain_actor.pos_val = *msg;
132
133    println!("Received positive value: {}", big_brain_actor.pos_val);
134
135    let data_type = DataType::new("blah", None);
136
137    if big_brain_actor.pos_val == 3 {
138        let data = SubscribeCustomData::new(
139            Some(ClientId::new("mock_data_client")),
140            None,
141            data_type.clone(),
142            UUID4::new(),
143            UnixNanos::new(0),
144            None,
145        );
146        let cmd = DataCommand::Subscribe(SubscribeCommand::Data(data));
147        send("data_engine".into(), &cmd);
148    }
149
150    if big_brain_actor.pos_val > 8 {
151        let data = UnsubscribeCustomData::new(
152            Some(ClientId::new("mock_data_client")),
153            None,
154            data_type,
155            UUID4::new(),
156            UnixNanos::new(0),
157            None,
158        );
159        let cmd = DataCommand::Unsubscribe(UnsubscribeCommand::Data(data));
160        send("data_engine".into(), &cmd);
161    }
162}