nautilus_blockchain/
data.rs

1// -------------------------------------------------------------------------------------------------
2//  Copyright (C) 2015-2025 Posei Systems Pty Ltd. All rights reserved.
3//  https://poseitrader.io
4//
5//  Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
6//  You may not use this file except in compliance with the License.
7//  You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
8//
9//  Unless required by applicable law or agreed to in writing, software
10//  distributed under the License is distributed on an "AS IS" BASIS,
11//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12//  See the License for the specific language governing permissions and
13//  limitations under the License.
14// -------------------------------------------------------------------------------------------------
15
16use std::{cmp::max, sync::Arc};
17
18use alloy::primitives::U256;
19use futures_util::StreamExt;
20use nautilus_common::{
21    messages::{
22        DataEvent,
23        defi::{
24            DefiDataCommand, DefiSubscribeCommand, DefiUnsubscribeCommand, SubscribeBlocks,
25            SubscribePool, SubscribePoolLiquidityUpdates, SubscribePoolSwaps, UnsubscribeBlocks,
26            UnsubscribePool, UnsubscribePoolLiquidityUpdates, UnsubscribePoolSwaps,
27        },
28    },
29    runner::get_data_event_sender,
30};
31use nautilus_core::UnixNanos;
32use posei_trader::client::DataClient;
33use nautilus_infrastructure::sql::pg::PostgresConnectOptions;
34use nautilus_model::{
35    defi::{
36        Block, Blockchain, DefiData, Dex, Pool, PoolLiquidityUpdate, PoolLiquidityUpdateType,
37        PoolSwap, SharedChain, SharedPool, Token,
38    },
39    identifiers::{ClientId, Venue},
40};
41use tokio::sync::mpsc::{UnboundedReceiver, UnboundedSender};
42
43use crate::{
44    cache::BlockchainCache,
45    config::BlockchainDataClientConfig,
46    contracts::erc20::Erc20Contract,
47    decode::u256_to_quantity,
48    events::pool_created::PoolCreatedEvent,
49    exchanges::extended::DexExtended,
50    hypersync::client::HyperSyncClient,
51    rpc::{
52        BlockchainRpcClient, BlockchainRpcClientAny,
53        chains::{
54            arbitrum::ArbitrumRpcClient, base::BaseRpcClient, ethereum::EthereumRpcClient,
55            polygon::PolygonRpcClient,
56        },
57        http::BlockchainHttpRpcClient,
58        types::BlockchainMessage,
59    },
60    validation::validate_address,
61};
62
63/// A comprehensive client for interacting with blockchain data from multiple sources.
64///
65/// The `BlockchainDataClient` serves as a facade that coordinates between different blockchain
66/// data providers, caching mechanisms, and contract interactions. It provides a unified interface
67/// for retrieving and processing blockchain data, particularly focused on DeFi protocols.
68///
69/// This client supports two primary data sources:
70/// 1. Direct RPC connections to blockchain nodes (via WebSocket).
71/// 2. HyperSync API for efficient historical data queries.
72#[derive(Debug)]
73pub struct BlockchainDataClient {
74    /// The blockchain being targeted by this client instance.
75    pub chain: SharedChain,
76    /// The configuration for the data client.
77    pub config: BlockchainDataClientConfig,
78    /// Local cache for blockchain entities.
79    cache: BlockchainCache,
80    /// Optional WebSocket RPC client for direct blockchain node communication.
81    rpc_client: Option<BlockchainRpcClientAny>,
82    /// Interface for interacting with ERC20 token contracts.
83    tokens: Erc20Contract,
84    /// Client for the HyperSync data indexing service.
85    hypersync_client: HyperSyncClient,
86    /// Channel receiver for messages from the HyperSync client.
87    hypersync_rx: Option<tokio::sync::mpsc::UnboundedReceiver<BlockchainMessage>>,
88    /// Channel sender for publishing data events to the `AsyncRunner`.
89    data_sender: UnboundedSender<DataEvent>,
90    /// Channel sender for commands to be processed asynchronously.
91    command_tx: UnboundedSender<DefiDataCommand>,
92    /// Channel receiver for commands to be processed asynchronously.
93    command_rx: Option<UnboundedReceiver<DefiDataCommand>>,
94    /// Background task for processing messages.
95    process_task: Option<tokio::task::JoinHandle<()>>,
96}
97
98impl BlockchainDataClient {
99    /// Creates a new [`BlockchainDataClient`] instance for the specified configuration.
100    ///
101    /// # Panics
102    ///
103    /// Panics if `use_hypersync_for_live_data` is false and `wss_rpc_url` is `None` in the provided config.
104    #[must_use]
105    pub fn new(config: BlockchainDataClientConfig) -> Self {
106        let chain = config.chain.clone();
107        let rpc_client = if !config.use_hypersync_for_live_data && config.wss_rpc_url.is_some() {
108            let wss_rpc_url = config.wss_rpc_url.clone().expect("wss_rpc_url is required");
109            Some(Self::initialize_rpc_client(chain.name, wss_rpc_url))
110        } else {
111            None
112        };
113        let (hypersync_tx, hypersync_rx) = tokio::sync::mpsc::unbounded_channel();
114        let hypersync_client = HyperSyncClient::new(chain.clone(), hypersync_tx);
115        let http_rpc_client = Arc::new(BlockchainHttpRpcClient::new(
116            config.http_rpc_url.clone(),
117            config.rpc_requests_per_second,
118        ));
119        let erc20_contract = Erc20Contract::new(http_rpc_client);
120        let cache = BlockchainCache::new(chain.clone());
121        let data_sender = get_data_event_sender();
122        let (command_tx, command_rx) = tokio::sync::mpsc::unbounded_channel();
123
124        Self {
125            chain,
126            config,
127            cache,
128            rpc_client,
129            tokens: erc20_contract,
130            hypersync_client,
131            hypersync_rx: Some(hypersync_rx),
132            data_sender,
133            command_tx,
134            command_rx: Some(command_rx),
135            process_task: None,
136        }
137    }
138
139    /// Creates an appropriate blockchain RPC client for the specified blockchain.
140    fn initialize_rpc_client(
141        blockchain: Blockchain,
142        wss_rpc_url: String,
143    ) -> BlockchainRpcClientAny {
144        match blockchain {
145            Blockchain::Ethereum => {
146                BlockchainRpcClientAny::Ethereum(EthereumRpcClient::new(wss_rpc_url))
147            }
148            Blockchain::Polygon => {
149                BlockchainRpcClientAny::Polygon(PolygonRpcClient::new(wss_rpc_url))
150            }
151            Blockchain::Base => BlockchainRpcClientAny::Base(BaseRpcClient::new(wss_rpc_url)),
152            Blockchain::Arbitrum => {
153                BlockchainRpcClientAny::Arbitrum(ArbitrumRpcClient::new(wss_rpc_url))
154            }
155            _ => panic!("Unsupported blockchain {blockchain} for RPC connection"),
156        }
157    }
158
159    /// Initializes the database connection for the blockchain cache.
160    pub async fn initialize_cache_database(
161        &mut self,
162        pg_connect_options: Option<PostgresConnectOptions>,
163    ) {
164        let pg_connect_options = pg_connect_options.unwrap_or_default();
165        tracing::info!(
166            "Initializing blockchain cache on database '{}'",
167            pg_connect_options.database
168        );
169        self.cache
170            .initialize_database(pg_connect_options.into())
171            .await;
172    }
173
174    /// Spawns a unified task that handles both commands and data from the same client instances.
175    /// This replaces both the command processor and hypersync forwarder with a single unified handler.
176    fn spawn_process_task(&mut self) {
177        let command_rx = if let Some(r) = self.command_rx.take() {
178            r
179        } else {
180            tracing::error!("Command receiver already taken, not spawning handler");
181            return;
182        };
183
184        let hypersync_rx = if let Some(r) = self.hypersync_rx.take() {
185            r
186        } else {
187            tracing::error!("HyperSync receiver already taken, not spawning handler");
188            return;
189        };
190
191        let mut hypersync_client = std::mem::replace(
192            &mut self.hypersync_client,
193            HyperSyncClient::new(self.chain.clone(), tokio::sync::mpsc::unbounded_channel().0),
194        );
195        let mut rpc_client = self.rpc_client.take();
196        let data_sender = self.data_sender.clone();
197
198        let handle = tokio::spawn(async move {
199            tracing::debug!("Started task 'process'");
200
201            let mut command_rx = command_rx;
202            let mut hypersync_rx = hypersync_rx;
203
204            loop {
205                tokio::select! {
206                    command = command_rx.recv() => {
207                        if let Some(cmd) = command {
208                            if let Err(e) = Self::process_command(
209                                cmd,
210                                &mut hypersync_client,
211                                rpc_client.as_mut()
212                            ).await {
213                                tracing::error!("Error processing command: {e}");
214                            }
215                        } else {
216                            tracing::debug!("Command channel closed");
217                            break;
218                        }
219                    }
220                    data = hypersync_rx.recv() => {
221                        if let Some(msg) = data {
222                            let data_event = match msg {
223                                BlockchainMessage::Block(block) => {
224                                    DataEvent::DeFi(DefiData::Block(block))
225                                }
226                                BlockchainMessage::Swap(swap) => {
227                                    DataEvent::DeFi(DefiData::PoolSwap(swap))
228                                }
229                            };
230
231                            if let Err(e) = data_sender.send(data_event) {
232                                tracing::error!("Failed to send data event: {e}");
233                                break;
234                            }
235                        } else {
236                            tracing::debug!("HyperSync data channel closed");
237                            break;
238                        }
239                    }
240                }
241            }
242
243            tracing::debug!("Stopped task 'process'");
244        });
245
246        self.process_task = Some(handle);
247    }
248
249    async fn process_command(
250        command: DefiDataCommand,
251        hypersync_client: &mut HyperSyncClient,
252        rpc_client: Option<&mut BlockchainRpcClientAny>,
253    ) -> anyhow::Result<()> {
254        match command {
255            DefiDataCommand::Subscribe(cmd) => {
256                Self::handle_subscribe_command(cmd, hypersync_client, rpc_client).await
257            }
258            DefiDataCommand::Unsubscribe(cmd) => {
259                Self::handle_unsubscribe_command(cmd, hypersync_client, rpc_client).await
260            }
261        }
262    }
263
264    /// Handles DeFi subscribe commands with access to mutable client instances.
265    async fn handle_subscribe_command(
266        command: DefiSubscribeCommand,
267        hypersync_client: &mut HyperSyncClient,
268        mut rpc_client: Option<&mut BlockchainRpcClientAny>,
269    ) -> anyhow::Result<()> {
270        match command {
271            DefiSubscribeCommand::Blocks(_cmd) => {
272                tracing::info!("Processing subscribe blocks command");
273
274                // Try RPC client first if available, otherwise use HyperSync
275                if let Some(ref mut rpc) = rpc_client {
276                    if let Err(e) = rpc.subscribe_blocks().await {
277                        tracing::warn!(
278                            "RPC blocks subscription failed: {e}, falling back to HyperSync"
279                        );
280                        hypersync_client.subscribe_blocks();
281                    } else {
282                        tracing::info!("Successfully subscribed to blocks via RPC");
283                    }
284                } else {
285                    tracing::info!("Subscribing to blocks via HyperSync");
286                    hypersync_client.subscribe_blocks();
287                }
288
289                Ok(())
290            }
291            DefiSubscribeCommand::Pool(_cmd) => {
292                tracing::info!("Processing subscribe pool command");
293                // Pool subscriptions are typically handled at the application level
294                // as they involve specific pool addresses and don't require blockchain streaming
295                tracing::warn!("Pool subscriptions are handled at application level");
296                Ok(())
297            }
298            DefiSubscribeCommand::PoolSwaps(cmd) => {
299                tracing::info!(
300                    "Processing subscribe pool swaps command for address: {}",
301                    cmd.address
302                );
303
304                if let Some(ref mut _rpc) = rpc_client {
305                    tracing::warn!(
306                        "RPC pool swaps subscription not yet implemented, using HyperSync"
307                    );
308                }
309
310                hypersync_client.subscribe_pool_swaps(cmd.address);
311                tracing::info!("Subscribed to pool swaps for address: {}", cmd.address);
312
313                Ok(())
314            }
315            DefiSubscribeCommand::PoolLiquidityUpdates(_cmd) => {
316                tracing::info!("Processing subscribe pool liquidity updates command");
317                tracing::warn!("Pool liquidity updates subscription not yet implemented");
318                // TODO: Implement actual pool liquidity updates subscription logic
319                Ok(())
320            }
321        }
322    }
323
324    /// Handles DeFi unsubscribe commands with access to mutable client instances.
325    async fn handle_unsubscribe_command(
326        command: DefiUnsubscribeCommand,
327        hypersync_client: &mut HyperSyncClient,
328        rpc_client: Option<&mut BlockchainRpcClientAny>,
329    ) -> anyhow::Result<()> {
330        match command {
331            DefiUnsubscribeCommand::Blocks(_cmd) => {
332                tracing::info!("Processing unsubscribe blocks command");
333
334                // TODO: Implement RPC unsubscription when available
335                if rpc_client.is_some() {
336                    tracing::warn!("RPC blocks unsubscription not yet implemented");
337                }
338
339                // Use HyperSync client for unsubscription
340                hypersync_client.unsubscribe_blocks();
341                tracing::info!("Unsubscribed from blocks via HyperSync");
342
343                Ok(())
344            }
345            DefiUnsubscribeCommand::Pool(_cmd) => {
346                tracing::info!("Processing unsubscribe pool command");
347                // Pool unsubscriptions are typically handled at the application level
348                tracing::warn!("Pool unsubscriptions are handled at application level");
349                Ok(())
350            }
351            DefiUnsubscribeCommand::PoolSwaps(_cmd) => {
352                tracing::info!("Processing unsubscribe pool swaps command");
353                tracing::warn!("Pool swaps unsubscription not yet implemented");
354                // TODO: Implement pool swaps unsubscription logic
355                Ok(())
356            }
357            DefiUnsubscribeCommand::PoolLiquidityUpdates(_cmd) => {
358                tracing::info!("Processing unsubscribe pool liquidity updates command");
359                tracing::warn!("Pool liquidity updates unsubscription not yet implemented");
360                // TODO: Implement pool liquidity updates unsubscription logic
361                Ok(())
362            }
363        }
364    }
365
366    /// Synchronizes blockchain data by fetching and caching all blocks from the starting block to the current chain head.
367    pub async fn sync_blocks(&mut self, from_block: Option<u64>) -> anyhow::Result<()> {
368        let from_block = if let Some(b) = from_block {
369            b
370        } else {
371            tracing::warn!("Skipping blocks sync: `from_block` not supplied");
372            return Ok(());
373        };
374
375        let from_block = match self.cache.last_cached_block_number() {
376            None => from_block,
377            Some(cached_block_number) => max(from_block, cached_block_number + 1),
378        };
379
380        let current_block = self.hypersync_client.current_block().await;
381        tracing::info!("Syncing blocks from {from_block} to {current_block}");
382
383        let blocks_stream = self
384            .hypersync_client
385            .request_blocks_stream(from_block, Some(current_block))
386            .await;
387
388        tokio::pin!(blocks_stream);
389
390        while let Some(block) = blocks_stream.next().await {
391            self.cache.add_block(block).await?;
392        }
393
394        tracing::info!("Finished syncing blocks");
395        Ok(())
396    }
397
398    /// Fetches and caches all swap events for a specific liquidity pool within the given block range.
399    pub async fn sync_pool_swaps(
400        &mut self,
401        dex_id: &str,
402        pool_address: String,
403        from_block: Option<u64>,
404        to_block: Option<u64>,
405    ) -> anyhow::Result<()> {
406        let dex_extended = self.get_dex(dex_id)?.clone();
407        let pool = self.get_pool(&pool_address)?;
408        let from_block =
409            from_block.map_or(pool.creation_block, |block| max(block, pool.creation_block));
410        tracing::info!(
411            "Syncing pool swaps for {} on Dex {} from block {}{}",
412            pool.ticker(),
413            dex_extended.name,
414            from_block,
415            to_block.map_or(String::new(), |block| format!(" to {block}"))
416        );
417        let swap_event_signature = dex_extended.swap_created_event.as_ref();
418        let stream = self
419            .hypersync_client
420            .request_contract_events_stream(
421                from_block,
422                to_block,
423                &pool.address.to_string(),
424                swap_event_signature,
425                Vec::new(),
426            )
427            .await;
428
429        tokio::pin!(stream);
430
431        while let Some(log) = stream.next().await {
432            let swap_event = dex_extended.parse_swap_event(log)?;
433            let Some(timestamp) = self.cache.get_block_timestamp(swap_event.block_number) else {
434                tracing::error!(
435                    "Missing block timestamp in the cache for block {} while processing swap event",
436                    swap_event.block_number
437                );
438                continue;
439            };
440            let (side, size, price) = dex_extended
441                .convert_to_trade_data(&pool.token0, &pool.token1, &swap_event)
442                .expect("Failed to convert swap event to trade data");
443
444            let swap = PoolSwap::new(
445                self.chain.clone(),
446                dex_extended.dex.clone(),
447                pool.clone(),
448                swap_event.block_number,
449                swap_event.transaction_hash,
450                swap_event.transaction_index,
451                swap_event.log_index,
452                *timestamp,
453                swap_event.sender,
454                side,
455                size,
456                price,
457            );
458
459            self.cache.add_pool_swap(&swap).await?;
460
461            self.send_swap(swap);
462        }
463
464        tracing::info!("Finished syncing pool swaps");
465        Ok(())
466    }
467
468    /// Fetches and caches all mint events for a specific liquidity pool within the given block range.
469    pub async fn sync_pool_mints(
470        &self,
471        dex_id: &str,
472        pool_address: String,
473        from_block: Option<u64>,
474        to_block: Option<u64>,
475    ) -> anyhow::Result<()> {
476        let dex_extended = self.get_dex(dex_id)?.clone();
477        let pool = self.get_pool(&pool_address)?.clone();
478        let from_block =
479            from_block.map_or(pool.creation_block, |block| max(block, pool.creation_block));
480        tracing::info!(
481            "Syncing pool mints for {} on Dex {} from block {from_block}{}",
482            pool.ticker(),
483            dex_extended.name,
484            to_block.map_or(String::new(), |block| format!(" to {block}"))
485        );
486        let mint_event_signature = dex_extended.mint_created_event.as_ref();
487        let stream = self
488            .hypersync_client
489            .request_contract_events_stream(
490                from_block,
491                to_block,
492                &pool.address.to_string(),
493                mint_event_signature,
494                Vec::new(),
495            )
496            .await;
497
498        tokio::pin!(stream);
499
500        while let Some(log) = stream.next().await {
501            let mint_event = dex_extended.parse_mint_event(log)?;
502            let Some(timestamp) = self.cache.get_block_timestamp(mint_event.block_number) else {
503                tracing::error!(
504                    "Missing block timestamp in the cache for block {} while processing mint event",
505                    mint_event.block_number
506                );
507                continue;
508            };
509            let liquidity = u256_to_quantity(
510                U256::from(mint_event.amount),
511                self.chain.native_currency_decimals,
512            )?;
513            let amount0 = u256_to_quantity(mint_event.amount0, pool.token0.decimals)?;
514            let amount1 = u256_to_quantity(mint_event.amount1, pool.token1.decimals)?;
515
516            let liquidity_update = PoolLiquidityUpdate::new(
517                self.chain.clone(),
518                dex_extended.dex.clone(),
519                pool.clone(),
520                PoolLiquidityUpdateType::Mint,
521                mint_event.block_number,
522                mint_event.transaction_hash,
523                mint_event.transaction_index,
524                mint_event.log_index,
525                Some(mint_event.sender),
526                mint_event.owner,
527                liquidity,
528                amount0,
529                amount1,
530                mint_event.tick_lower,
531                mint_event.tick_upper,
532                *timestamp,
533            );
534
535            self.cache.add_liquidity_update(&liquidity_update).await?;
536
537            self.send_liquidity_update(liquidity_update);
538        }
539
540        tracing::info!("Finished syncing pool mints");
541        Ok(())
542    }
543
544    /// Fetches and caches all burn events for a specific liquidity pool within the given block range.
545    pub async fn sync_pool_burns(
546        &self,
547        dex_id: &str,
548        pool_address: String,
549        from_block: Option<u64>,
550        to_block: Option<u64>,
551    ) -> anyhow::Result<()> {
552        let dex_extended = self.get_dex(dex_id)?.clone();
553        let pool = self.get_pool(&pool_address)?.clone();
554        let from_block =
555            from_block.map_or(pool.creation_block, |block| max(block, pool.creation_block));
556        tracing::info!(
557            "Syncing pool burns for {} on Dex {} from block {from_block}{}",
558            pool.ticker(),
559            dex_extended.name,
560            to_block.map_or(String::new(), |block| format!(" to {block}"))
561        );
562        let burn_event_signature = dex_extended.burn_created_event.as_ref();
563        let stream = self
564            .hypersync_client
565            .request_contract_events_stream(
566                from_block,
567                to_block,
568                &pool.address.to_string(),
569                burn_event_signature,
570                Vec::new(),
571            )
572            .await;
573
574        tokio::pin!(stream);
575
576        while let Some(log) = stream.next().await {
577            let burn_event = dex_extended.parse_burn_event(log)?;
578            let Some(timestamp) = self.cache.get_block_timestamp(burn_event.block_number) else {
579                tracing::error!(
580                    "Missing block timestamp in the cache for block {} while processing burn event",
581                    burn_event.block_number
582                );
583                continue;
584            };
585            let liquidity = u256_to_quantity(
586                U256::from(burn_event.amount),
587                self.chain.native_currency_decimals,
588            )?;
589            let amount0 = u256_to_quantity(burn_event.amount0, pool.token0.decimals)?;
590            let amount1 = u256_to_quantity(burn_event.amount1, pool.token1.decimals)?;
591
592            let liquidity_update = PoolLiquidityUpdate::new(
593                self.chain.clone(),
594                dex_extended.dex.clone(),
595                pool.clone(),
596                PoolLiquidityUpdateType::Burn,
597                burn_event.block_number,
598                burn_event.transaction_hash,
599                burn_event.transaction_index,
600                burn_event.log_index,
601                None,
602                burn_event.owner,
603                liquidity,
604                amount0,
605                amount1,
606                burn_event.tick_lower,
607                burn_event.tick_upper,
608                *timestamp,
609            );
610
611            self.cache.add_liquidity_update(&liquidity_update).await?;
612
613            self.send_liquidity_update(liquidity_update);
614        }
615
616        tracing::info!("Finished syncing pool burns");
617        Ok(())
618    }
619
620    /// Synchronizes token and pool data for a specific DEX from the specified block.
621    pub async fn sync_exchange_pools(
622        &mut self,
623        dex_id: &str,
624        from_block: Option<u64>,
625        to_block: Option<u64>,
626    ) -> anyhow::Result<()> {
627        let from_block = from_block.unwrap_or(0);
628        tracing::info!(
629            "Syncing Dex exchange pools for {dex_id} from block {from_block}{}",
630            to_block.map_or(String::new(), |block| format!(" to {block}"))
631        );
632
633        let dex = self.get_dex(dex_id)?.clone();
634        let factory_address = dex.factory.as_ref();
635        let pair_created_event_signature = dex.pool_created_event.as_ref();
636        let pools_stream = self
637            .hypersync_client
638            .request_contract_events_stream(
639                from_block,
640                to_block,
641                factory_address,
642                pair_created_event_signature,
643                Vec::new(),
644            )
645            .await;
646
647        tokio::pin!(pools_stream);
648
649        while let Some(log) = pools_stream.next().await {
650            let pool = dex.parse_pool_created_event(log)?;
651            self.process_token(pool.token0.to_string()).await?;
652            self.process_token(pool.token1.to_string()).await?;
653            self.process_pool(&dex.dex, pool).await?;
654        }
655        Ok(())
656    }
657
658    /// Processes a token by address, fetching and caching its metadata if not already cached.
659    ///
660    /// # Errors
661    ///
662    /// Returns an error if fetching token info or adding to cache fails.
663    pub async fn process_token(&mut self, token_address: String) -> anyhow::Result<()> {
664        let token_address = validate_address(&token_address)?;
665
666        if self.cache.get_token(&token_address).is_none() {
667            let token_info = self.tokens.fetch_token_info(&token_address).await?;
668            let token = Token::new(
669                self.chain.clone(),
670                token_address,
671                token_info.name,
672                token_info.symbol,
673                token_info.decimals,
674            );
675            tracing::info!("Saving fetched token {token} in the cache");
676            self.cache.add_token(token).await?;
677        }
678
679        Ok(())
680    }
681
682    /// Processes a pool creation event by creating and caching a `Pool` entity.
683    async fn process_pool(&mut self, dex: &Dex, event: PoolCreatedEvent) -> anyhow::Result<()> {
684        let pool = Pool::new(
685            self.chain.clone(),
686            dex.clone(),
687            event.pool_address,
688            event.block_number,
689            self.cache.get_token(&event.token0).cloned().unwrap(),
690            self.cache.get_token(&event.token1).cloned().unwrap(),
691            event.fee,
692            event.tick_spacing,
693            UnixNanos::default(), // TODO: Use default timestamp for now
694        );
695        self.cache.add_pool(pool.clone()).await?;
696
697        Ok(())
698    }
699
700    /// Registers a decentralized exchange with the client.
701    pub async fn register_exchange(&mut self, dex: DexExtended) -> anyhow::Result<()> {
702        let dex_id = dex.id();
703        tracing::info!("Registering blockchain exchange {dex_id}");
704        self.cache.add_dex(dex_id, dex).await?;
705        Ok(())
706    }
707
708    /// Processes incoming messages from the HyperSync client.
709    pub async fn process_hypersync_messages(&mut self) {
710        tracing::info!("Starting task 'process_hypersync_messages'");
711
712        let mut rx = if let Some(r) = self.hypersync_rx.take() {
713            r
714        } else {
715            tracing::warn!("HyperSync receiver already taken, not spawning forwarder");
716            return;
717        };
718
719        while let Some(msg) = rx.recv().await {
720            match msg {
721                BlockchainMessage::Block(block) => {
722                    self.send_block(block);
723                }
724                BlockchainMessage::Swap(swap) => {
725                    self.send_swap(swap);
726                }
727            }
728        }
729    }
730
731    /// Processes incoming messages from the RPC client.
732    pub async fn process_rpc_messages(&mut self) {
733        tracing::info!("Starting task 'process_rpc_messages'");
734
735        loop {
736            let msg = {
737                match self
738                    .rpc_client
739                    .as_mut()
740                    .expect("process_rpc_messages: RPC client not initialised")
741                    .next_rpc_message()
742                    .await
743                {
744                    Ok(m) => m,
745                    Err(e) => {
746                        tracing::error!("Error processing RPC message: {e}");
747                        continue;
748                    }
749                }
750            };
751
752            match msg {
753                BlockchainMessage::Block(block) => self.send_block(block),
754                BlockchainMessage::Swap(swap) => self.send_swap(swap),
755            }
756        }
757    }
758
759    /// Subscribes to new blockchain blocks from the available data source.
760    pub async fn subscribe_blocks_async(&mut self) -> anyhow::Result<()> {
761        if let Some(rpc_client) = self.rpc_client.as_mut() {
762            rpc_client.subscribe_blocks().await?;
763        } else {
764            self.hypersync_client.subscribe_blocks();
765        }
766
767        Ok(())
768    }
769
770    /// Subscribes to new blockchain blocks from the available data source.
771    pub async fn subscribe_pool_swaps_async(&mut self) -> anyhow::Result<()> {
772        if let Some(rpc_client) = self.rpc_client.as_mut() {
773            rpc_client.subscribe_swaps().await?;
774        } else {
775            todo!("Not implemented")
776            // self.hypersync_client.subscribe_swaps();
777        }
778
779        Ok(())
780    }
781
782    /// Unsubscribes from block events.
783    pub async fn unsubscribe_blocks_async(&mut self) -> anyhow::Result<()> {
784        if let Some(_rpc_client) = self.rpc_client.as_mut() {
785            todo!("Not implemented");
786            // rpc_client.unsubscribe_blocks().await?;
787        } else {
788            self.hypersync_client.unsubscribe_blocks();
789        }
790
791        Ok(())
792    }
793
794    /// Unsubscribes from swap events.
795    pub async fn unsubscribe_pool_swaps_async(&mut self) -> anyhow::Result<()> {
796        if let Some(_rpc_client) = self.rpc_client.as_mut() {
797            todo!("Not implemented");
798            // rpc_client.unsubscribe_blocks().await?;
799        } else {
800            self.hypersync_client.unsubscribe_blocks();
801        }
802
803        Ok(())
804    }
805
806    fn get_dex(&self, dex_id: &str) -> anyhow::Result<&DexExtended> {
807        match self.cache.get_dex(dex_id) {
808            Some(dex) => Ok(dex),
809            None => anyhow::bail!("Dex {dex_id} is not registered"),
810        }
811    }
812
813    fn get_pool(&self, pool_address: &str) -> anyhow::Result<&SharedPool> {
814        let pool_address = validate_address(pool_address)?;
815        match self.cache.get_pool(&pool_address) {
816            Some(pool) => Ok(pool),
817            None => anyhow::bail!("Pool {pool_address} is not registered"),
818        }
819    }
820
821    fn send_block(&self, block: Block) {
822        let data = DataEvent::DeFi(DefiData::Block(block));
823        self.send_data(data);
824    }
825
826    fn send_swap(&self, swap: PoolSwap) {
827        let data = DataEvent::DeFi(DefiData::PoolSwap(swap));
828        self.send_data(data);
829    }
830
831    fn send_liquidity_update(&self, update: PoolLiquidityUpdate) {
832        let data = DataEvent::DeFi(DefiData::PoolLiquidityUpdate(update));
833        self.send_data(data);
834    }
835
836    fn send_data(&self, data: DataEvent) {
837        tracing::debug!("Sending {data}");
838
839        if let Err(e) = self.data_sender.send(data) {
840            tracing::error!("Failed to send data: {e}");
841        }
842    }
843}
844
845#[async_trait::async_trait]
846impl DataClient for BlockchainDataClient {
847    fn client_id(&self) -> ClientId {
848        ClientId::from(format!("BLOCKCHAIN-{}", self.chain.name).as_str())
849    }
850
851    fn venue(&self) -> Option<Venue> {
852        // Blockchain data clients don't map to a single venue since they can provide
853        // data for multiple DEXs across the blockchain
854        None
855    }
856
857    fn start(&mut self) -> anyhow::Result<()> {
858        tracing::info!("Starting blockchain data client for '{}'", self.chain.name);
859        Ok(())
860    }
861
862    fn stop(&mut self) -> anyhow::Result<()> {
863        tracing::info!("Stopping blockchain data client for '{}'", self.chain.name);
864        Ok(())
865    }
866
867    fn reset(&mut self) -> anyhow::Result<()> {
868        tracing::info!("Resetting blockchain data client for '{}'", self.chain.name);
869        Ok(())
870    }
871
872    fn dispose(&mut self) -> anyhow::Result<()> {
873        tracing::info!("Disposing blockchain data client for '{}'", self.chain.name);
874        Ok(())
875    }
876
877    async fn connect(&mut self) -> anyhow::Result<()> {
878        tracing::info!(
879            "Connecting blockchain data client for '{}'",
880            self.chain.name
881        );
882
883        if let Some(ref mut rpc_client) = self.rpc_client {
884            rpc_client.connect().await?;
885        }
886
887        let from_block = self.config.from_block.unwrap_or(0);
888        self.cache.connect(from_block).await?;
889        self.sync_blocks(self.config.from_block).await?;
890        // self.subscribe_blocks().await?;
891
892        if self.process_task.is_none() {
893            self.spawn_process_task();
894        }
895
896        Ok(())
897    }
898
899    async fn disconnect(&mut self) -> anyhow::Result<()> {
900        tracing::info!(
901            "Disconnecting blockchain data client for '{}'",
902            self.chain.name
903        );
904
905        if let Some(handle) = self.process_task.take() {
906            tracing::debug!("Aborting task 'process'");
907            handle.abort();
908        }
909
910        Ok(())
911    }
912
913    fn is_connected(&self) -> bool {
914        // TODO: Improve connection detection
915        // For now, we'll assume connected if we have either RPC or HyperSync configured
916        self.rpc_client.is_some() || true // HyperSync is always available
917    }
918
919    fn is_disconnected(&self) -> bool {
920        !self.is_connected()
921    }
922
923    fn subscribe_blocks(&mut self, cmd: &SubscribeBlocks) -> anyhow::Result<()> {
924        let command = DefiDataCommand::Subscribe(DefiSubscribeCommand::Blocks(cmd.clone()));
925        self.command_tx.send(command)?;
926        Ok(())
927    }
928
929    fn subscribe_pool(&mut self, cmd: &SubscribePool) -> anyhow::Result<()> {
930        let command = DefiDataCommand::Subscribe(DefiSubscribeCommand::Pool(cmd.clone()));
931        self.command_tx.send(command)?;
932        Ok(())
933    }
934
935    fn subscribe_pool_swaps(&mut self, cmd: &SubscribePoolSwaps) -> anyhow::Result<()> {
936        let command = DefiDataCommand::Subscribe(DefiSubscribeCommand::PoolSwaps(cmd.clone()));
937        self.command_tx.send(command)?;
938        Ok(())
939    }
940
941    fn subscribe_pool_liquidity_updates(
942        &mut self,
943        cmd: &SubscribePoolLiquidityUpdates,
944    ) -> anyhow::Result<()> {
945        let command =
946            DefiDataCommand::Subscribe(DefiSubscribeCommand::PoolLiquidityUpdates(cmd.clone()));
947        self.command_tx.send(command)?;
948        Ok(())
949    }
950
951    fn unsubscribe_blocks(&mut self, cmd: &UnsubscribeBlocks) -> anyhow::Result<()> {
952        let command = DefiDataCommand::Unsubscribe(DefiUnsubscribeCommand::Blocks(cmd.clone()));
953        self.command_tx.send(command)?;
954        Ok(())
955    }
956
957    fn unsubscribe_pool(&mut self, cmd: &UnsubscribePool) -> anyhow::Result<()> {
958        let command = DefiDataCommand::Unsubscribe(DefiUnsubscribeCommand::Pool(cmd.clone()));
959        self.command_tx.send(command)?;
960        Ok(())
961    }
962
963    fn unsubscribe_pool_swaps(&mut self, cmd: &UnsubscribePoolSwaps) -> anyhow::Result<()> {
964        let command = DefiDataCommand::Unsubscribe(DefiUnsubscribeCommand::PoolSwaps(cmd.clone()));
965        self.command_tx.send(command)?;
966        Ok(())
967    }
968
969    fn unsubscribe_pool_liquidity_updates(
970        &mut self,
971        cmd: &UnsubscribePoolLiquidityUpdates,
972    ) -> anyhow::Result<()> {
973        let command =
974            DefiDataCommand::Unsubscribe(DefiUnsubscribeCommand::PoolLiquidityUpdates(cmd.clone()));
975        self.command_tx.send(command)?;
976        Ok(())
977    }
978}