1use std::{net::SocketAddr, pin::Pin, sync::Arc};
17
18use futures::{Stream, StreamExt};
19use nautilus_common::{
20 messages::data::{
21 CustomDataResponse, DataResponse, RequestCustomData, SubscribeCustomData,
22 UnsubscribeCustomData,
23 },
24 runtime,
25};
26use nautilus_core::UnixNanos;
27use posei_trader::client::DataClient;
28use nautilus_model::{
29 data::DataType,
30 identifiers::{ClientId, Venue},
31};
32use nautilus_network::{
33 http::HttpClient,
34 websocket::{Consumer, WebSocketClient, WebSocketConfig},
35};
36use reqwest::Method;
37use tokio_stream::wrappers::{ReceiverStream, UnboundedReceiverStream};
38use tokio_tungstenite::tungstenite::Message;
39
40pub struct MockDataClient {
41 http_address: SocketAddr,
42 http_client: HttpClient,
43 websocket_client: Arc<WebSocketClient>,
44 http_tx: tokio::sync::mpsc::UnboundedSender<DataResponse>,
45}
46
47impl MockDataClient {
48 pub async fn start(
49 http_address: SocketAddr,
50 websocket_address: SocketAddr,
51 ) -> (
52 Self,
53 Pin<Box<dyn Stream<Item = DataResponse>>>,
54 Pin<Box<dyn Stream<Item = i32>>>,
55 ) {
56 let http_client = HttpClient::new(
58 std::collections::HashMap::new(), Vec::new(), Vec::new(), None, Some(5), );
64
65 println!("Started mock data client with HTTP endpoint: {http_address:?}");
66 println!("WebSocket endpoint: {websocket_address:?}");
67
68 let (tx, rx) = tokio::sync::mpsc::channel(100);
69 let (http_tx, http_rx) = tokio::sync::mpsc::unbounded_channel();
70
71 let config = WebSocketConfig {
72 url: format!("ws://{websocket_address}"),
73 headers: vec![],
74 handler: Consumer::Rust(tx),
75 heartbeat: None,
76 heartbeat_msg: None,
77 ping_handler: None,
78 reconnect_timeout_ms: None,
79 reconnect_delay_initial_ms: None,
80 reconnect_delay_max_ms: None,
81 reconnect_backoff_factor: None,
82 reconnect_jitter_ms: None,
83 };
84
85 let websocket_client = WebSocketClient::connect(config, None, None, None, Vec::new(), None)
86 .await
87 .unwrap();
88
89 let http_stream = UnboundedReceiverStream::new(http_rx);
90
91 let websocket_stream = ReceiverStream::new(rx).map(|message| match message {
92 Message::Text(text) => text.parse::<i32>().unwrap(),
93 _ => unreachable!("Expected Message::Text"),
94 });
95
96 (
97 Self {
98 http_address,
99 http_client,
100 http_tx,
101 websocket_client: Arc::new(websocket_client),
102 },
103 Box::pin(http_stream),
104 Box::pin(websocket_stream),
105 )
106 }
107
108 fn get_request(&self, req: &RequestCustomData) {
109 let req = req.clone();
110 let http_client = self.http_client.clone();
111 let http_tx = self.http_tx.clone();
112 let http_address = self.http_address;
113 runtime::get_runtime().spawn(async move {
114 let response = http_client
115 .request(
116 Method::GET,
117 format!("http://{http_address}/get"),
118 None,
119 None,
120 None,
121 None,
122 )
123 .await
124 .unwrap();
125
126 let value = String::from_utf8(response.body.to_vec())
127 .unwrap()
128 .parse::<i32>()
129 .unwrap();
130 println!("Received positive value: {value}");
131
132 let response = DataResponse::Data(CustomDataResponse::new(
133 req.request_id,
134 req.client_id,
135 Some(Venue::new("http positive stream")),
136 DataType::new("positive_stream", None),
137 Arc::new(value),
138 UnixNanos::new(0),
139 None,
140 ));
141 http_tx.send(response).unwrap();
142 });
143 }
144
145 fn skip_request(&self, req: &RequestCustomData) {
146 let req = req.clone();
147 let http_client = self.http_client.clone();
148 let http_tx = self.http_tx.clone();
149 let http_address = self.http_address;
150 runtime::get_runtime().spawn(async move {
151 let response = http_client
152 .request(
153 Method::GET,
154 format!("http://{http_address}/skip"),
155 None,
156 None,
157 None,
158 None,
159 )
160 .await
161 .unwrap();
162
163 let value = String::from_utf8(response.body.to_vec())
164 .unwrap()
165 .parse::<i32>()
166 .unwrap();
167 println!("Received positive value: {value}");
168
169 let response = DataResponse::Data(CustomDataResponse::new(
170 req.request_id,
171 req.client_id,
172 Some(Venue::new("http positive stream")),
173 DataType::new("positive_stream", None),
174 Arc::new(value),
175 UnixNanos::new(0),
176 None,
177 ));
178 http_tx.send(response).unwrap();
179 });
180 }
181}
182
183#[async_trait::async_trait]
184impl DataClient for MockDataClient {
185 fn client_id(&self) -> nautilus_model::identifiers::ClientId {
186 ClientId::new("mock_data_client")
187 }
188
189 fn request_data(&self, request: &RequestCustomData) -> anyhow::Result<()> {
190 if request.data_type.type_name() == "get" {
191 println!("Received get data request");
192 self.get_request(request);
193 } else if request.data_type.type_name() == "skip" {
194 println!("Received skip data request");
195 self.skip_request(request);
196 }
197
198 Ok(())
199 }
200
201 fn subscribe(&mut self, _cmd: &SubscribeCustomData) -> anyhow::Result<()> {
202 println!("Received subscribe command");
203 let websocket_client = self.websocket_client.clone();
204 runtime::get_runtime().spawn(async move {
205 if let Err(err) = websocket_client.send_text("SKIP".to_string(), None).await {
206 tracing::error!("Error sending SKIP message: {err:?}");
207 }
208 });
209 Ok(())
210 }
211
212 fn unsubscribe(&mut self, _cmd: &UnsubscribeCustomData) -> anyhow::Result<()> {
213 println!("Received unsubscribe command");
214 let websocket_client = self.websocket_client.clone();
215 runtime::get_runtime().spawn(async move {
216 if let Err(err) = websocket_client.send_text("STOP".to_string(), None).await {
217 tracing::error!("Error sending STOP message: {err:?}");
218 }
219 });
220 Ok(())
221 }
222
223 fn venue(&self) -> Option<Venue> {
224 None
225 }
226
227 fn start(&self) -> anyhow::Result<()> {
228 Ok(())
229 }
230
231 fn stop(&self) -> anyhow::Result<()> {
232 Ok(())
233 }
234
235 fn reset(&self) -> anyhow::Result<()> {
236 Ok(())
237 }
238
239 fn dispose(&self) -> anyhow::Result<()> {
240 Ok(())
241 }
242
243 async fn connect(&self) -> anyhow::Result<()> {
244 Ok(())
245 }
246
247 async fn disconnect(&self) -> anyhow::Result<()> {
248 Ok(())
249 }
250
251 fn is_connected(&self) -> bool {
252 true
253 }
254
255 fn is_disconnected(&self) -> bool {
256 false
257 }
258}