nautilus_demo/
lib.rs

1// -------------------------------------------------------------------------------------------------
2//  Copyright (C) 2015-2025 Posei Systems Pty Ltd. All rights reserved.
3//  https://poseitrader.io
4//
5//  Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
6//  You may not use this file except in compliance with the License.
7//  You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
8//
9//  Unless required by applicable law or agreed to in writing, software
10//  distributed under the License is distributed on an "AS IS" BASIS,
11//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12//  See the License for the specific language governing permissions and
13//  limitations under the License.
14// -------------------------------------------------------------------------------------------------
15
16use std::{cell::RefCell, net::SocketAddr, pin::Pin, rc::Rc};
17
18use data_client::MockDataClient;
19use futures::{Stream, stream::SelectAll};
20use nautilus_common::{
21    cache::Cache,
22    clock::{Clock, LiveClock},
23    messages::data::{DataCommand, DataResponse},
24    msgbus::{
25        self,
26        handler::{ShareableMessageHandler, TypedMessageHandler},
27    },
28};
29use posei_trader::{
30    client::{DataClient, DataClientAdapter},
31    engine::DataEngine,
32};
33use nautilus_model::identifiers::Venue;
34use tokio_stream::StreamExt;
35
36pub mod big_brain_actor;
37pub mod data_client;
38pub mod http_server;
39pub mod websocket_server;
40
41pub async fn init_data_engine(
42    http_address: SocketAddr,
43    websocket_address: SocketAddr,
44) -> (
45    Pin<Box<dyn Stream<Item = DataResponse>>>,
46    Pin<Box<dyn Stream<Item = i32>>>,
47) {
48    let (client, http_stream, websocket_stream) =
49        MockDataClient::start(http_address, websocket_address).await;
50    let client: Box<dyn DataClient> = Box::new(client);
51    let clock: Rc<RefCell<dyn Clock>> = Rc::new(RefCell::new(LiveClock::new()));
52
53    let adapter = DataClientAdapter::new(
54        client.client_id(),
55        Some(Venue::from_str_unchecked("DEMO")),
56        false,
57        false,
58        client,
59    );
60    let cache = Rc::new(RefCell::new(Cache::new(None, None)));
61
62    let mut data_engine = DataEngine::new(clock, cache, None);
63    data_engine.register_client(adapter, None);
64    let data_engine = Rc::new(RefCell::new(data_engine));
65
66    let data_engine_clone = data_engine;
67    let _handler = ShareableMessageHandler(Rc::new(TypedMessageHandler::from(
68        move |cmd: &DataCommand| data_engine_clone.borrow_mut().execute(cmd),
69    )));
70
71    (http_stream, websocket_stream)
72}
73
74#[derive(Default)]
75pub struct LiveRunner {
76    data_response_stream: SelectAll<Pin<Box<dyn Stream<Item = DataResponse>>>>,
77    message_stream: SelectAll<Pin<Box<dyn Stream<Item = i32>>>>,
78}
79
80impl LiveRunner {
81    pub fn new_add_data_response_stream(
82        &mut self,
83        stream: Pin<Box<dyn Stream<Item = DataResponse>>>,
84    ) {
85        self.data_response_stream.push(stream);
86    }
87
88    pub fn new_message_stream(&mut self, stream: Pin<Box<dyn Stream<Item = i32>>>) {
89        self.message_stream.push(stream);
90    }
91
92    pub async fn run(&mut self) {
93        let endpoint = "negative_stream".into();
94
95        loop {
96            // TODO: push decoding logic into data client
97            tokio::select! {
98                data_response = self.data_response_stream.next() => {
99                    if let Some(DataResponse::Data(custom_data_response)) = data_response {
100                            println!("Received custom data response: {custom_data_response:?}");
101                            let value = custom_data_response.data.downcast_ref::<i32>().copied().unwrap();
102                            msgbus::response(&custom_data_response.correlation_id, &value);
103                    }
104                }
105                message = self.message_stream.next() => {
106                    if let Some(message) = message {
107                        msgbus::send(endpoint, &message);
108                    }
109                }
110            }
111        }
112    }
113}