nautilus_blockchain/rpc/
core.rs1use std::{collections::HashMap, sync::Arc};
17
18use nautilus_core::consts::NAUTILUS_USER_AGENT;
19use nautilus_model::defi::{block::Block, chain::Chain, rpc::RpcNodeWssResponse};
20use nautilus_network::websocket::{Consumer, WebSocketClient, WebSocketConfig};
21use reqwest::header::USER_AGENT;
22use tokio_tungstenite::tungstenite::Message;
23
24use crate::rpc::{
25 error::BlockchainRpcClientError,
26 types::{BlockchainMessage, RpcEventType},
27 utils::{
28 extract_rpc_subscription_id, is_subscription_confirmation_response, is_subscription_event,
29 },
30};
31
32#[derive(Debug)]
39pub struct CoreBlockchainRpcClient {
40 chain: Chain,
42 wss_rpc_url: String,
44 request_id: u64,
46 pending_subscription_request: HashMap<u64, RpcEventType>,
48 subscription_event_types: HashMap<String, RpcEventType>,
51 wss_client: Option<Arc<WebSocketClient>>,
53 wss_consumer_rx: Option<tokio::sync::mpsc::Receiver<Message>>,
55}
56
57impl CoreBlockchainRpcClient {
58 #[must_use]
59 pub fn new(chain: Chain, wss_rpc_url: String) -> Self {
60 Self {
61 chain,
62 wss_rpc_url,
63 request_id: 1,
64 wss_client: None,
65 pending_subscription_request: HashMap::new(),
66 subscription_event_types: HashMap::new(),
67 wss_consumer_rx: None,
68 }
69 }
70
71 pub async fn connect(&mut self) -> anyhow::Result<()> {
77 let (tx, rx) = tokio::sync::mpsc::channel(100);
78 let user_agent = (USER_AGENT.to_string(), NAUTILUS_USER_AGENT.to_string());
79 let heartbeat_interval = 30;
81 let config = WebSocketConfig {
82 url: self.wss_rpc_url.clone(),
83 headers: vec![user_agent],
84 heartbeat: Some(heartbeat_interval),
85 heartbeat_msg: None,
86 handler: Consumer::Rust(tx),
87 #[cfg(feature = "python")]
88 ping_handler: None,
89 reconnect_timeout_ms: Some(5_000),
90 reconnect_delay_initial_ms: None,
91 reconnect_jitter_ms: None,
92 reconnect_backoff_factor: None,
93 reconnect_delay_max_ms: None,
94 };
95 let client = WebSocketClient::connect(
96 config,
97 #[cfg(feature = "python")]
98 None,
99 #[cfg(feature = "python")]
100 None,
101 #[cfg(feature = "python")]
102 None,
103 vec![],
104 None,
105 )
106 .await?;
107
108 self.wss_client = Some(Arc::new(client));
109 self.wss_consumer_rx = Some(rx);
110
111 Ok(())
112 }
113
114 async fn subscribe_events(
116 &mut self,
117 event_type: RpcEventType,
118 subscription_id: String,
119 ) -> Result<(), BlockchainRpcClientError> {
120 if let Some(client) = &self.wss_client {
121 log::info!("Subscribing to new blocks on chain {}", self.chain.name);
122 let msg = serde_json::json!({
123 "method": "eth_subscribe",
124 "id": self.request_id,
125 "jsonrpc": "2.0",
126 "params": [subscription_id]
127 });
128 self.pending_subscription_request
129 .insert(self.request_id, event_type);
130 self.request_id += 1;
131 if let Err(err) = client.send_text(msg.to_string(), None).await {
132 log::error!("Error sending subscribe message: {err:?}");
133 }
134 Ok(())
135 } else {
136 Err(BlockchainRpcClientError::ClientError(String::from(
137 "Client not connected",
138 )))
139 }
140 }
141
142 async fn unsubscribe_events(
144 &self,
145 subscription_id: String,
146 ) -> Result<(), BlockchainRpcClientError> {
147 if let Some(client) = &self.wss_client {
148 log::info!("Unsubscribing to new blocks on chain {}", self.chain.name);
149 let msg = serde_json::json!({
150 "method": "eth_unsubscribe",
151 "id": 1,
152 "jsonrpc": "2.0",
153 "params": [subscription_id]
154 });
155 if let Err(err) = client.send_text(msg.to_string(), None).await {
156 log::error!("Error sending unsubscribe message: {err:?}");
157 }
158 Ok(())
159 } else {
160 Err(BlockchainRpcClientError::ClientError(String::from(
161 "Client not connected",
162 )))
163 }
164 }
165
166 pub async fn wait_on_rpc_channel(&mut self) -> Option<Message> {
168 match &mut self.wss_consumer_rx {
169 Some(rx) => rx.recv().await,
170 None => None,
171 }
172 }
173
174 pub async fn next_rpc_message(
184 &mut self,
185 ) -> Result<BlockchainMessage, BlockchainRpcClientError> {
186 while let Some(msg) = self.wait_on_rpc_channel().await {
187 match msg {
188 Message::Text(text) => match serde_json::from_str::<serde_json::Value>(&text) {
189 Ok(json) => {
190 if is_subscription_confirmation_response(&json) {
191 let subscription_request_id = json.get("id").unwrap().as_u64().unwrap();
192 let result = json.get("result").unwrap().as_str().unwrap();
193 let event_type = self
194 .pending_subscription_request
195 .get(&subscription_request_id)
196 .unwrap();
197 self.subscription_event_types
198 .insert(result.to_string(), event_type.clone());
199 self.pending_subscription_request
200 .remove(&subscription_request_id);
201 continue;
202 } else if is_subscription_event(&json) {
203 let subscription_id = match extract_rpc_subscription_id(&json) {
204 Some(id) => id,
205 None => {
206 return Err(BlockchainRpcClientError::InternalRpcClientError(
207 "Error parsing subscription id from valid rpc response"
208 .to_string(),
209 ));
210 }
211 };
212 if let Some(event_type) =
213 self.subscription_event_types.get(subscription_id)
214 {
215 match event_type {
216 RpcEventType::NewBlock => {
217 return match serde_json::from_value::<
218 RpcNodeWssResponse<Block>,
219 >(json)
220 {
221 Ok(block_response) => {
222 let mut block = block_response.params.result;
223 block.set_chain(self.chain.clone());
224 Ok(BlockchainMessage::Block(block))
225 }
226 Err(e) => {
227 Err(BlockchainRpcClientError::MessageParsingError(
228 format!(
229 "Error parsing rpc response to block with error {e}"
230 ),
231 ))
232 }
233 };
234 }
235 }
236 }
237 return Err(BlockchainRpcClientError::InternalRpcClientError(format!(
238 "Event type not found for defined subscription id {subscription_id}"
239 )));
240 }
241 return Err(BlockchainRpcClientError::UnsupportedRpcResponseType(
242 json.to_string(),
243 ));
244 }
245 Err(e) => {
246 return Err(BlockchainRpcClientError::MessageParsingError(e.to_string()));
247 }
248 },
249 Message::Pong(_) => {
250 continue;
251 }
252 _ => {
253 return Err(BlockchainRpcClientError::UnsupportedRpcResponseType(
254 msg.to_string(),
255 ));
256 }
257 }
258 }
259
260 Err(BlockchainRpcClientError::NoMessageReceived)
261 }
262
263 pub async fn subscribe_blocks(&mut self) -> Result<(), BlockchainRpcClientError> {
269 self.subscribe_events(RpcEventType::NewBlock, String::from("newHeads"))
270 .await
271 }
272
273 pub async fn unsubscribe_blocks(&mut self) -> Result<(), BlockchainRpcClientError> {
279 self.unsubscribe_events(String::from("newHeads")).await?;
280
281 let subscription_ids_to_remove: Vec<String> = self
283 .subscription_event_types
284 .iter()
285 .filter(|(_, event_type)| **event_type == RpcEventType::NewBlock)
286 .map(|(id, _)| id.clone())
287 .collect();
288
289 for id in subscription_ids_to_remove {
290 self.subscription_event_types.remove(&id);
291 }
292 Ok(())
293 }
294}