nautilus_blockchain/rpc/
core.rs

1// -------------------------------------------------------------------------------------------------
2//  Copyright (C) 2015-2025 Posei Systems Pty Ltd. All rights reserved.
3//  https://poseitrader.io
4//
5//  Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
6//  You may not use this file except in compliance with the License.
7//  You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
8//
9//  Unless required by applicable law or agreed to in writing, software
10//  distributed under the License is distributed on an "AS IS" BASIS,
11//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12//  See the License for the specific language governing permissions and
13//  limitations under the License.
14// -------------------------------------------------------------------------------------------------
15
16use std::{collections::HashMap, sync::Arc};
17
18use nautilus_core::consts::NAUTILUS_USER_AGENT;
19use nautilus_model::defi::{block::Block, chain::Chain, rpc::RpcNodeWssResponse};
20use nautilus_network::websocket::{Consumer, WebSocketClient, WebSocketConfig};
21use reqwest::header::USER_AGENT;
22use tokio_tungstenite::tungstenite::Message;
23
24use crate::rpc::{
25    error::BlockchainRpcClientError,
26    types::{BlockchainMessage, RpcEventType},
27    utils::{
28        extract_rpc_subscription_id, is_subscription_confirmation_response, is_subscription_event,
29    },
30};
31
32/// Core implementation of a blockchain RPC client that serves as the base for all chain-specific clients.
33/// It provides a shared implementation of common blockchain RPC functionality. It handles:
34/// - WebSocket connection management with blockchain RPC node
35/// - Subscription lifecycle (creation, tracking, and termination)
36/// - Message serialization and deserialization of RPC messages
37/// - Event type mapping and dispatching
38#[derive(Debug)]
39pub struct CoreBlockchainRpcClient {
40    /// The blockchain network type this client connects to.
41    chain: Chain,
42    /// WebSocket secure URL for the blockchain node's RPC endpoint.
43    wss_rpc_url: String,
44    /// Auto-incrementing counter for generating unique RPC request IDs.
45    request_id: u64,
46    /// Tracks in-flight subscription requests by mapping request IDs to their event types.
47    pending_subscription_request: HashMap<u64, RpcEventType>,
48    /// Maps active subscription IDs to their corresponding event types for message
49    /// deserialization.
50    subscription_event_types: HashMap<String, RpcEventType>,
51    /// The active WebSocket client connection.
52    wss_client: Option<Arc<WebSocketClient>>,
53    /// Channel receiver for consuming WebSocket messages.
54    wss_consumer_rx: Option<tokio::sync::mpsc::Receiver<Message>>,
55}
56
57impl CoreBlockchainRpcClient {
58    #[must_use]
59    pub fn new(chain: Chain, wss_rpc_url: String) -> Self {
60        Self {
61            chain,
62            wss_rpc_url,
63            request_id: 1,
64            wss_client: None,
65            pending_subscription_request: HashMap::new(),
66            subscription_event_types: HashMap::new(),
67            wss_consumer_rx: None,
68        }
69    }
70
71    /// Establishes a WebSocket connection to the blockchain node and sets up the message channel.
72    ///
73    /// # Errors
74    ///
75    /// Returns an error if the WebSocket connection fails.
76    pub async fn connect(&mut self) -> anyhow::Result<()> {
77        let (tx, rx) = tokio::sync::mpsc::channel(100);
78        let user_agent = (USER_AGENT.to_string(), NAUTILUS_USER_AGENT.to_string());
79        // Most of the blockchain rpc nodes require a heartbeat to keep the connection alive
80        let heartbeat_interval = 30;
81        let config = WebSocketConfig {
82            url: self.wss_rpc_url.clone(),
83            headers: vec![user_agent],
84            heartbeat: Some(heartbeat_interval),
85            heartbeat_msg: None,
86            handler: Consumer::Rust(tx),
87            #[cfg(feature = "python")]
88            ping_handler: None,
89            reconnect_timeout_ms: Some(5_000),
90            reconnect_delay_initial_ms: None,
91            reconnect_jitter_ms: None,
92            reconnect_backoff_factor: None,
93            reconnect_delay_max_ms: None,
94        };
95        let client = WebSocketClient::connect(
96            config,
97            #[cfg(feature = "python")]
98            None,
99            #[cfg(feature = "python")]
100            None,
101            #[cfg(feature = "python")]
102            None,
103            vec![],
104            None,
105        )
106        .await?;
107
108        self.wss_client = Some(Arc::new(client));
109        self.wss_consumer_rx = Some(rx);
110
111        Ok(())
112    }
113
114    /// Registers a subscription for the specified event type and records it internally with the given ID.
115    async fn subscribe_events(
116        &mut self,
117        event_type: RpcEventType,
118        subscription_id: String,
119    ) -> Result<(), BlockchainRpcClientError> {
120        if let Some(client) = &self.wss_client {
121            log::info!("Subscribing to new blocks on chain {}", self.chain.name);
122            let msg = serde_json::json!({
123                "method": "eth_subscribe",
124                "id": self.request_id,
125                "jsonrpc": "2.0",
126                "params": [subscription_id]
127            });
128            self.pending_subscription_request
129                .insert(self.request_id, event_type);
130            self.request_id += 1;
131            if let Err(err) = client.send_text(msg.to_string(), None).await {
132                log::error!("Error sending subscribe message: {err:?}");
133            }
134            Ok(())
135        } else {
136            Err(BlockchainRpcClientError::ClientError(String::from(
137                "Client not connected",
138            )))
139        }
140    }
141
142    /// Terminates a subscription with the blockchain node using the provided subscription ID.
143    async fn unsubscribe_events(
144        &self,
145        subscription_id: String,
146    ) -> Result<(), BlockchainRpcClientError> {
147        if let Some(client) = &self.wss_client {
148            log::info!("Unsubscribing to new blocks on chain {}", self.chain.name);
149            let msg = serde_json::json!({
150                "method": "eth_unsubscribe",
151                "id": 1,
152                "jsonrpc": "2.0",
153                "params": [subscription_id]
154            });
155            if let Err(err) = client.send_text(msg.to_string(), None).await {
156                log::error!("Error sending unsubscribe message: {err:?}");
157            }
158            Ok(())
159        } else {
160            Err(BlockchainRpcClientError::ClientError(String::from(
161                "Client not connected",
162            )))
163        }
164    }
165
166    /// Waits for and returns the next available message from the WebSocket channel.
167    pub async fn wait_on_rpc_channel(&mut self) -> Option<Message> {
168        match &mut self.wss_consumer_rx {
169            Some(rx) => rx.recv().await,
170            None => None,
171        }
172    }
173
174    /// Retrieves, parses, and returns the next blockchain RPC message as a structured `BlockchainRpcMessage` type.
175    ///
176    /// # Panics
177    ///
178    /// Panics if expected fields (`id`, `result`) are missing or cannot be converted when handling subscription confirmations or events.
179    ///
180    /// # Errors
181    ///
182    /// Returns an error if the RPC channel encounters an error or if deserialization of the message fails.
183    pub async fn next_rpc_message(
184        &mut self,
185    ) -> Result<BlockchainMessage, BlockchainRpcClientError> {
186        while let Some(msg) = self.wait_on_rpc_channel().await {
187            match msg {
188                Message::Text(text) => match serde_json::from_str::<serde_json::Value>(&text) {
189                    Ok(json) => {
190                        if is_subscription_confirmation_response(&json) {
191                            let subscription_request_id = json.get("id").unwrap().as_u64().unwrap();
192                            let result = json.get("result").unwrap().as_str().unwrap();
193                            let event_type = self
194                                .pending_subscription_request
195                                .get(&subscription_request_id)
196                                .unwrap();
197                            self.subscription_event_types
198                                .insert(result.to_string(), event_type.clone());
199                            self.pending_subscription_request
200                                .remove(&subscription_request_id);
201                            continue;
202                        } else if is_subscription_event(&json) {
203                            let subscription_id = match extract_rpc_subscription_id(&json) {
204                                Some(id) => id,
205                                None => {
206                                    return Err(BlockchainRpcClientError::InternalRpcClientError(
207                                        "Error parsing subscription id from valid rpc response"
208                                            .to_string(),
209                                    ));
210                                }
211                            };
212                            if let Some(event_type) =
213                                self.subscription_event_types.get(subscription_id)
214                            {
215                                match event_type {
216                                    RpcEventType::NewBlock => {
217                                        return match serde_json::from_value::<
218                                            RpcNodeWssResponse<Block>,
219                                        >(json)
220                                        {
221                                            Ok(block_response) => {
222                                                let mut block = block_response.params.result;
223                                                block.set_chain(self.chain.clone());
224                                                Ok(BlockchainMessage::Block(block))
225                                            }
226                                            Err(e) => {
227                                                Err(BlockchainRpcClientError::MessageParsingError(
228                                                    format!(
229                                                        "Error parsing rpc response to block with error {e}"
230                                                    ),
231                                                ))
232                                            }
233                                        };
234                                    }
235                                }
236                            }
237                            return Err(BlockchainRpcClientError::InternalRpcClientError(format!(
238                                "Event type not found for defined subscription id {subscription_id}"
239                            )));
240                        }
241                        return Err(BlockchainRpcClientError::UnsupportedRpcResponseType(
242                            json.to_string(),
243                        ));
244                    }
245                    Err(e) => {
246                        return Err(BlockchainRpcClientError::MessageParsingError(e.to_string()));
247                    }
248                },
249                Message::Pong(_) => {
250                    continue;
251                }
252                _ => {
253                    return Err(BlockchainRpcClientError::UnsupportedRpcResponseType(
254                        msg.to_string(),
255                    ));
256                }
257            }
258        }
259
260        Err(BlockchainRpcClientError::NoMessageReceived)
261    }
262
263    /// Subscribes to real-time block updates from the blockchain node.
264    ///
265    /// # Errors
266    ///
267    /// Returns an error if the subscription request fails or if the client is not connected.
268    pub async fn subscribe_blocks(&mut self) -> Result<(), BlockchainRpcClientError> {
269        self.subscribe_events(RpcEventType::NewBlock, String::from("newHeads"))
270            .await
271    }
272
273    /// Cancels the subscription to real-time block updates.
274    ///
275    /// # Errors
276    ///
277    /// Returns an error if the unsubscription request fails or if the client is not connected.
278    pub async fn unsubscribe_blocks(&mut self) -> Result<(), BlockchainRpcClientError> {
279        self.unsubscribe_events(String::from("newHeads")).await?;
280
281        // Find and remove the subscription ID associated with the newBlock event type
282        let subscription_ids_to_remove: Vec<String> = self
283            .subscription_event_types
284            .iter()
285            .filter(|(_, event_type)| **event_type == RpcEventType::NewBlock)
286            .map(|(id, _)| id.clone())
287            .collect();
288
289        for id in subscription_ids_to_remove {
290            self.subscription_event_types.remove(&id);
291        }
292        Ok(())
293    }
294}