nautilus_blockchain/hypersync/
client.rs

1// -------------------------------------------------------------------------------------------------
2//  Copyright (C) 2015-2025 Posei Systems Pty Ltd. All rights reserved.
3//  https://poseitrader.io
4//
5//  Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
6//  You may not use this file except in compliance with the License.
7//  You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
8//
9//  Unless required by applicable law or agreed to in writing, software
10//  distributed under the License is distributed on an "AS IS" BASIS,
11//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12//  See the License for the specific language governing permissions and
13//  limitations under the License.
14// -------------------------------------------------------------------------------------------------
15
16use std::{collections::BTreeSet, sync::Arc};
17
18use alloy::primitives::keccak256;
19use hypersync_client::{
20    Client, ClientConfig,
21    net_types::{BlockSelection, FieldSelection, Query},
22};
23use nautilus_model::defi::chain::SharedChain;
24use reqwest::Url;
25
26use crate::{
27    events::pool_created::PoolCreated, exchanges::extended::DexExtended,
28    hypersync::transform::transform_hypersync_block, rpc::types::BlockchainMessage,
29};
30
/// Delay, in milliseconds, between successive chain-height checks while the
/// block subscription waits for HyperSync to index the next block.
const BLOCK_POLLING_INTERVAL_MS: u64 = 50;
34
35/// A client for interacting with a HyperSync API to retrieve blockchain data.
36#[derive(Debug)]
37pub struct HyperSyncClient {
38    /// The target blockchain identifier (e.g. Ethereum, Arbitrum).
39    chain: SharedChain,
40    /// The underlying HyperSync Rust client for making API requests.
41    client: Arc<Client>,
42    /// Background task handle for the block subscription task.
43    blocks_subscription_task: Option<tokio::task::JoinHandle<()>>,
44    /// Channel for sending blockchain messages to the adapter data client.
45    tx: tokio::sync::mpsc::UnboundedSender<BlockchainMessage>,
46}
47
48impl HyperSyncClient {
49    /// Creates a new [`HyperSyncClient`] instance for the given chain and message sender.
50    ///
51    /// # Panics
52    ///
53    /// Panics if the chain's `hypersync_url` is invalid or if the underlying client cannot be initialized.
54    #[must_use]
55    pub fn new(
56        chain: SharedChain,
57        tx: tokio::sync::mpsc::UnboundedSender<BlockchainMessage>,
58    ) -> Self {
59        let mut config = ClientConfig::default();
60        let hypersync_url =
61            Url::parse(chain.hypersync_url.as_str()).expect("Invalid HyperSync URL");
62        config.url = Some(hypersync_url);
63        let client = Client::new(config).unwrap();
64        Self {
65            chain,
66            client: Arc::new(client),
67            blocks_subscription_task: None,
68            tx,
69        }
70    }
71
72    /// Fetches historical pool creation events for a specific DEX from the HyperSync API.
73    pub async fn request_pool_created_events(
74        &mut self,
75        from_block: u32,
76        dex: &DexExtended,
77    ) -> Vec<PoolCreated> {
78        let factory_address = dex.factory.as_ref();
79        let pair_created_event = dex.pool_created_event.as_ref();
80        log::info!(
81            "Requesting pair created events from Hypersync emitted by factory {factory_address} and from block {from_block}"
82        );
83        let event_hash = keccak256(pair_created_event.as_bytes());
84        let topic0 = format!("0x{}", hex::encode(event_hash));
85
86        let query = serde_json::from_value(serde_json::json!({
87            "from_block": from_block,
88            "logs": [{
89                "topic": [
90                    topic0
91                ],
92                "address": [
93                    factory_address,
94                ]
95            }],
96            "field_selection": {
97                "log": [
98                    "block_number",
99                    "data",
100                    "topic0",
101                    "topic1",
102                    "topic2",
103                    "topic3",
104                ]
105        }}))
106        .unwrap();
107
108        let mut rx = self
109            .client
110            .clone()
111            .stream(query, Default::default())
112            .await
113            .unwrap();
114        let mut result = Vec::new();
115        while let Some(response) = rx.recv().await {
116            let response = response.unwrap();
117            for batch in response.data.logs {
118                for log in batch {
119                    let pool = dex.parse_pool_created_event(log).unwrap();
120                    result.push(pool);
121                }
122            }
123        }
124        result
125    }
126
127    /// Disconnects from the HyperSync service and stops all background tasks.
128    pub fn disconnect(&mut self) {
129        self.unsubscribe_blocks();
130    }
131
132    /// Starts a background task that continuously polls for new blockchain blocks.
133    pub fn subscribe_blocks(&mut self) {
134        let all_block_fields: BTreeSet<String> = hypersync_schema::block_header()
135            .fields
136            .iter()
137            .map(|x| x.name.clone())
138            .collect();
139        let client = self.client.clone();
140        let tx = self.tx.clone();
141        let chain = self.chain.clone();
142        let task = tokio::spawn(async move {
143            let current_block_height = client.get_height().await.unwrap();
144            let mut query = Query {
145                from_block: current_block_height,
146                blocks: vec![BlockSelection::default()],
147                field_selection: FieldSelection {
148                    block: all_block_fields,
149                    ..Default::default()
150                },
151                ..Default::default()
152            };
153
154            loop {
155                let response = client.get(&query).await.unwrap();
156                for batch in response.data.blocks {
157                    for received_block in batch {
158                        let mut block = transform_hypersync_block(received_block).unwrap();
159                        block.set_chain(chain.as_ref().clone());
160                        let msg = BlockchainMessage::Block(block);
161                        if let Err(e) = tx.send(msg) {
162                            log::error!("Error sending message: {e}");
163                        }
164                    }
165                }
166
167                if let Some(archive_block_height) = response.archive_height {
168                    if archive_block_height < response.next_block {
169                        while client.get_height().await.unwrap() < response.next_block {
170                            tokio::time::sleep(std::time::Duration::from_millis(
171                                BLOCK_POLLING_INTERVAL_MS,
172                            ))
173                            .await;
174                        }
175                    }
176                }
177
178                query.from_block = response.next_block;
179            }
180        });
181        self.blocks_subscription_task = Some(task);
182    }
183
184    /// Unsubscribes to the new blocks by stopping the background watch task.
185    pub fn unsubscribe_blocks(&mut self) {
186        if let Some(task) = self.blocks_subscription_task.take() {
187            task.abort();
188        }
189    }
190}