nautilus_demo/
data_client.rs

1// -------------------------------------------------------------------------------------------------
2//  Copyright (C) 2015-2025 Posei Systems Pty Ltd. All rights reserved.
3//  https://poseitrader.io
4//
5//  Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
6//  You may not use this file except in compliance with the License.
7//  You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
8//
9//  Unless required by applicable law or agreed to in writing, software
10//  distributed under the License is distributed on an "AS IS" BASIS,
11//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12//  See the License for the specific language governing permissions and
13//  limitations under the License.
14// -------------------------------------------------------------------------------------------------
15
16use std::{net::SocketAddr, pin::Pin, sync::Arc};
17
18use futures::{Stream, StreamExt};
19use nautilus_common::{
20    messages::data::{
21        CustomDataResponse, DataResponse, RequestCustomData, SubscribeCustomData,
22        UnsubscribeCustomData,
23    },
24    runtime,
25};
26use nautilus_core::UnixNanos;
27use posei_trader::client::DataClient;
28use nautilus_model::{
29    data::DataType,
30    identifiers::{ClientId, Venue},
31};
32use nautilus_network::{
33    http::HttpClient,
34    websocket::{Consumer, WebSocketClient, WebSocketConfig},
35};
36use reqwest::Method;
37use tokio_stream::wrappers::{ReceiverStream, UnboundedReceiverStream};
38use tokio_tungstenite::tungstenite::Message;
39
40pub struct MockDataClient {
41    http_address: SocketAddr,
42    http_client: HttpClient,
43    websocket_client: Arc<WebSocketClient>,
44    http_tx: tokio::sync::mpsc::UnboundedSender<DataResponse>,
45}
46
47impl MockDataClient {
48    pub async fn start(
49        http_address: SocketAddr,
50        websocket_address: SocketAddr,
51    ) -> (
52        Self,
53        Pin<Box<dyn Stream<Item = DataResponse>>>,
54        Pin<Box<dyn Stream<Item = i32>>>,
55    ) {
56        // Create HTTP client with default settings
57        let http_client = HttpClient::new(
58            std::collections::HashMap::new(), // empty headers
59            Vec::new(),                       // no header keys
60            Vec::new(),                       // no keyed quotas
61            None,                             // no default quota
62            Some(5),                          // 30 second timeout
63        );
64
65        println!("Started mock data client with HTTP endpoint: {http_address:?}");
66        println!("WebSocket endpoint: {websocket_address:?}");
67
68        let (tx, rx) = tokio::sync::mpsc::channel(100);
69        let (http_tx, http_rx) = tokio::sync::mpsc::unbounded_channel();
70
71        let config = WebSocketConfig {
72            url: format!("ws://{websocket_address}"),
73            headers: vec![],
74            handler: Consumer::Rust(tx),
75            heartbeat: None,
76            heartbeat_msg: None,
77            ping_handler: None,
78            reconnect_timeout_ms: None,
79            reconnect_delay_initial_ms: None,
80            reconnect_delay_max_ms: None,
81            reconnect_backoff_factor: None,
82            reconnect_jitter_ms: None,
83        };
84
85        let websocket_client = WebSocketClient::connect(config, None, None, None, Vec::new(), None)
86            .await
87            .unwrap();
88
89        let http_stream = UnboundedReceiverStream::new(http_rx);
90
91        let websocket_stream = ReceiverStream::new(rx).map(|message| match message {
92            Message::Text(text) => text.parse::<i32>().unwrap(),
93            _ => unreachable!("Expected Message::Text"),
94        });
95
96        (
97            Self {
98                http_address,
99                http_client,
100                http_tx,
101                websocket_client: Arc::new(websocket_client),
102            },
103            Box::pin(http_stream),
104            Box::pin(websocket_stream),
105        )
106    }
107
108    fn get_request(&self, req: &RequestCustomData) {
109        let req = req.clone();
110        let http_client = self.http_client.clone();
111        let http_tx = self.http_tx.clone();
112        let http_address = self.http_address;
113        runtime::get_runtime().spawn(async move {
114            let response = http_client
115                .request(
116                    Method::GET,
117                    format!("http://{http_address}/get"),
118                    None,
119                    None,
120                    None,
121                    None,
122                )
123                .await
124                .unwrap();
125
126            let value = String::from_utf8(response.body.to_vec())
127                .unwrap()
128                .parse::<i32>()
129                .unwrap();
130            println!("Received positive value: {value}");
131
132            let response = DataResponse::Data(CustomDataResponse::new(
133                req.request_id,
134                req.client_id,
135                Some(Venue::new("http positive stream")),
136                DataType::new("positive_stream", None),
137                Arc::new(value),
138                UnixNanos::new(0),
139                None,
140            ));
141            http_tx.send(response).unwrap();
142        });
143    }
144
145    fn skip_request(&self, req: &RequestCustomData) {
146        let req = req.clone();
147        let http_client = self.http_client.clone();
148        let http_tx = self.http_tx.clone();
149        let http_address = self.http_address;
150        runtime::get_runtime().spawn(async move {
151            let response = http_client
152                .request(
153                    Method::GET,
154                    format!("http://{http_address}/skip"),
155                    None,
156                    None,
157                    None,
158                    None,
159                )
160                .await
161                .unwrap();
162
163            let value = String::from_utf8(response.body.to_vec())
164                .unwrap()
165                .parse::<i32>()
166                .unwrap();
167            println!("Received positive value: {value}");
168
169            let response = DataResponse::Data(CustomDataResponse::new(
170                req.request_id,
171                req.client_id,
172                Some(Venue::new("http positive stream")),
173                DataType::new("positive_stream", None),
174                Arc::new(value),
175                UnixNanos::new(0),
176                None,
177            ));
178            http_tx.send(response).unwrap();
179        });
180    }
181}
182
183#[async_trait::async_trait]
184impl DataClient for MockDataClient {
185    fn client_id(&self) -> nautilus_model::identifiers::ClientId {
186        ClientId::new("mock_data_client")
187    }
188
189    fn request_data(&self, request: &RequestCustomData) -> anyhow::Result<()> {
190        if request.data_type.type_name() == "get" {
191            println!("Received get data request");
192            self.get_request(request);
193        } else if request.data_type.type_name() == "skip" {
194            println!("Received skip data request");
195            self.skip_request(request);
196        }
197
198        Ok(())
199    }
200
201    fn subscribe(&mut self, _cmd: &SubscribeCustomData) -> anyhow::Result<()> {
202        println!("Received subscribe command");
203        let websocket_client = self.websocket_client.clone();
204        runtime::get_runtime().spawn(async move {
205            if let Err(err) = websocket_client.send_text("SKIP".to_string(), None).await {
206                tracing::error!("Error sending SKIP message: {err:?}");
207            }
208        });
209        Ok(())
210    }
211
212    fn unsubscribe(&mut self, _cmd: &UnsubscribeCustomData) -> anyhow::Result<()> {
213        println!("Received unsubscribe command");
214        let websocket_client = self.websocket_client.clone();
215        runtime::get_runtime().spawn(async move {
216            if let Err(err) = websocket_client.send_text("STOP".to_string(), None).await {
217                tracing::error!("Error sending STOP message: {err:?}");
218            }
219        });
220        Ok(())
221    }
222
223    fn venue(&self) -> Option<Venue> {
224        None
225    }
226
227    fn start(&self) -> anyhow::Result<()> {
228        Ok(())
229    }
230
231    fn stop(&self) -> anyhow::Result<()> {
232        Ok(())
233    }
234
235    fn reset(&self) -> anyhow::Result<()> {
236        Ok(())
237    }
238
239    fn dispose(&self) -> anyhow::Result<()> {
240        Ok(())
241    }
242
243    async fn connect(&self) -> anyhow::Result<()> {
244        Ok(())
245    }
246
247    async fn disconnect(&self) -> anyhow::Result<()> {
248        Ok(())
249    }
250
251    fn is_connected(&self) -> bool {
252        true
253    }
254
255    fn is_disconnected(&self) -> bool {
256        false
257    }
258}