nautilus_blockchain/hypersync/
client.rs1use std::{collections::BTreeSet, sync::Arc};
17
18use alloy::primitives::keccak256;
19use hypersync_client::{
20 Client, ClientConfig,
21 net_types::{BlockSelection, FieldSelection, Query},
22};
23use nautilus_model::defi::chain::SharedChain;
24use reqwest::Url;
25
26use crate::{
27 events::pool_created::PoolCreated, exchanges::extended::DexExtended,
28 hypersync::transform::transform_hypersync_block, rpc::types::BlockchainMessage,
29};
30
31const BLOCK_POLLING_INTERVAL_MS: u64 = 50;
34
35#[derive(Debug)]
37pub struct HyperSyncClient {
38 chain: SharedChain,
40 client: Arc<Client>,
42 blocks_subscription_task: Option<tokio::task::JoinHandle<()>>,
44 tx: tokio::sync::mpsc::UnboundedSender<BlockchainMessage>,
46}
47
48impl HyperSyncClient {
49 #[must_use]
55 pub fn new(
56 chain: SharedChain,
57 tx: tokio::sync::mpsc::UnboundedSender<BlockchainMessage>,
58 ) -> Self {
59 let mut config = ClientConfig::default();
60 let hypersync_url =
61 Url::parse(chain.hypersync_url.as_str()).expect("Invalid HyperSync URL");
62 config.url = Some(hypersync_url);
63 let client = Client::new(config).unwrap();
64 Self {
65 chain,
66 client: Arc::new(client),
67 blocks_subscription_task: None,
68 tx,
69 }
70 }
71
72 pub async fn request_pool_created_events(
74 &mut self,
75 from_block: u32,
76 dex: &DexExtended,
77 ) -> Vec<PoolCreated> {
78 let factory_address = dex.factory.as_ref();
79 let pair_created_event = dex.pool_created_event.as_ref();
80 log::info!(
81 "Requesting pair created events from Hypersync emitted by factory {factory_address} and from block {from_block}"
82 );
83 let event_hash = keccak256(pair_created_event.as_bytes());
84 let topic0 = format!("0x{}", hex::encode(event_hash));
85
86 let query = serde_json::from_value(serde_json::json!({
87 "from_block": from_block,
88 "logs": [{
89 "topic": [
90 topic0
91 ],
92 "address": [
93 factory_address,
94 ]
95 }],
96 "field_selection": {
97 "log": [
98 "block_number",
99 "data",
100 "topic0",
101 "topic1",
102 "topic2",
103 "topic3",
104 ]
105 }}))
106 .unwrap();
107
108 let mut rx = self
109 .client
110 .clone()
111 .stream(query, Default::default())
112 .await
113 .unwrap();
114 let mut result = Vec::new();
115 while let Some(response) = rx.recv().await {
116 let response = response.unwrap();
117 for batch in response.data.logs {
118 for log in batch {
119 let pool = dex.parse_pool_created_event(log).unwrap();
120 result.push(pool);
121 }
122 }
123 }
124 result
125 }
126
127 pub fn disconnect(&mut self) {
129 self.unsubscribe_blocks();
130 }
131
132 pub fn subscribe_blocks(&mut self) {
134 let all_block_fields: BTreeSet<String> = hypersync_schema::block_header()
135 .fields
136 .iter()
137 .map(|x| x.name.clone())
138 .collect();
139 let client = self.client.clone();
140 let tx = self.tx.clone();
141 let chain = self.chain.clone();
142 let task = tokio::spawn(async move {
143 let current_block_height = client.get_height().await.unwrap();
144 let mut query = Query {
145 from_block: current_block_height,
146 blocks: vec![BlockSelection::default()],
147 field_selection: FieldSelection {
148 block: all_block_fields,
149 ..Default::default()
150 },
151 ..Default::default()
152 };
153
154 loop {
155 let response = client.get(&query).await.unwrap();
156 for batch in response.data.blocks {
157 for received_block in batch {
158 let mut block = transform_hypersync_block(received_block).unwrap();
159 block.set_chain(chain.as_ref().clone());
160 let msg = BlockchainMessage::Block(block);
161 if let Err(e) = tx.send(msg) {
162 log::error!("Error sending message: {e}");
163 }
164 }
165 }
166
167 if let Some(archive_block_height) = response.archive_height {
168 if archive_block_height < response.next_block {
169 while client.get_height().await.unwrap() < response.next_block {
170 tokio::time::sleep(std::time::Duration::from_millis(
171 BLOCK_POLLING_INTERVAL_MS,
172 ))
173 .await;
174 }
175 }
176 }
177
178 query.from_block = response.next_block;
179 }
180 });
181 self.blocks_subscription_task = Some(task);
182 }
183
184 pub fn unsubscribe_blocks(&mut self) {
186 if let Some(task) = self.blocks_subscription_task.take() {
187 task.abort();
188 }
189 }
190}