1use std::{cell::RefCell, net::SocketAddr, pin::Pin, rc::Rc};
17
18use data_client::MockDataClient;
19use futures::{Stream, stream::SelectAll};
20use nautilus_common::{
21 cache::Cache,
22 clock::{Clock, LiveClock},
23 messages::data::{DataCommand, DataResponse},
24 msgbus::{
25 self,
26 handler::{ShareableMessageHandler, TypedMessageHandler},
27 },
28};
29use posei_trader::{
30 client::{DataClient, DataClientAdapter},
31 engine::DataEngine,
32};
33use nautilus_model::identifiers::Venue;
34use tokio_stream::StreamExt;
35
36pub mod big_brain_actor;
37pub mod data_client;
38pub mod http_server;
39pub mod websocket_server;
40
41pub async fn init_data_engine(
42 http_address: SocketAddr,
43 websocket_address: SocketAddr,
44) -> (
45 Pin<Box<dyn Stream<Item = DataResponse>>>,
46 Pin<Box<dyn Stream<Item = i32>>>,
47) {
48 let (client, http_stream, websocket_stream) =
49 MockDataClient::start(http_address, websocket_address).await;
50 let client: Box<dyn DataClient> = Box::new(client);
51 let clock: Rc<RefCell<dyn Clock>> = Rc::new(RefCell::new(LiveClock::new()));
52
53 let adapter = DataClientAdapter::new(
54 client.client_id(),
55 Some(Venue::from_str_unchecked("DEMO")),
56 false,
57 false,
58 client,
59 );
60 let cache = Rc::new(RefCell::new(Cache::new(None, None)));
61
62 let mut data_engine = DataEngine::new(clock, cache, None);
63 data_engine.register_client(adapter, None);
64 let data_engine = Rc::new(RefCell::new(data_engine));
65
66 let data_engine_clone = data_engine;
67 let _handler = ShareableMessageHandler(Rc::new(TypedMessageHandler::from(
68 move |cmd: &DataCommand| data_engine_clone.borrow_mut().execute(cmd),
69 )));
70
71 (http_stream, websocket_stream)
72}
73
74#[derive(Default)]
75pub struct LiveRunner {
76 data_response_stream: SelectAll<Pin<Box<dyn Stream<Item = DataResponse>>>>,
77 message_stream: SelectAll<Pin<Box<dyn Stream<Item = i32>>>>,
78}
79
80impl LiveRunner {
81 pub fn new_add_data_response_stream(
82 &mut self,
83 stream: Pin<Box<dyn Stream<Item = DataResponse>>>,
84 ) {
85 self.data_response_stream.push(stream);
86 }
87
88 pub fn new_message_stream(&mut self, stream: Pin<Box<dyn Stream<Item = i32>>>) {
89 self.message_stream.push(stream);
90 }
91
92 pub async fn run(&mut self) {
93 let endpoint = "negative_stream".into();
94
95 loop {
96 tokio::select! {
98 data_response = self.data_response_stream.next() => {
99 if let Some(DataResponse::Data(custom_data_response)) = data_response {
100 println!("Received custom data response: {custom_data_response:?}");
101 let value = custom_data_response.data.downcast_ref::<i32>().copied().unwrap();
102 msgbus::response(&custom_data_response.correlation_id, &value);
103 }
104 }
105 message = self.message_stream.next() => {
106 if let Some(message) = message {
107 msgbus::send(endpoint, &message);
108 }
109 }
110 }
111 }
112 }
113}