1use std::{cmp::max, sync::Arc};
17
18use alloy::primitives::U256;
19use futures_util::StreamExt;
20use nautilus_common::{
21 messages::{
22 DataEvent,
23 defi::{
24 DefiDataCommand, DefiSubscribeCommand, DefiUnsubscribeCommand, SubscribeBlocks,
25 SubscribePool, SubscribePoolLiquidityUpdates, SubscribePoolSwaps, UnsubscribeBlocks,
26 UnsubscribePool, UnsubscribePoolLiquidityUpdates, UnsubscribePoolSwaps,
27 },
28 },
29 runner::get_data_event_sender,
30};
31use nautilus_core::UnixNanos;
32use posei_trader::client::DataClient;
33use nautilus_infrastructure::sql::pg::PostgresConnectOptions;
34use nautilus_model::{
35 defi::{
36 Block, Blockchain, DefiData, Dex, Pool, PoolLiquidityUpdate, PoolLiquidityUpdateType,
37 PoolSwap, SharedChain, SharedPool, Token,
38 },
39 identifiers::{ClientId, Venue},
40};
41use tokio::sync::mpsc::{UnboundedReceiver, UnboundedSender};
42
43use crate::{
44 cache::BlockchainCache,
45 config::BlockchainDataClientConfig,
46 contracts::erc20::Erc20Contract,
47 decode::u256_to_quantity,
48 events::pool_created::PoolCreatedEvent,
49 exchanges::extended::DexExtended,
50 hypersync::client::HyperSyncClient,
51 rpc::{
52 BlockchainRpcClient, BlockchainRpcClientAny,
53 chains::{
54 arbitrum::ArbitrumRpcClient, base::BaseRpcClient, ethereum::EthereumRpcClient,
55 polygon::PolygonRpcClient,
56 },
57 http::BlockchainHttpRpcClient,
58 types::BlockchainMessage,
59 },
60 validation::validate_address,
61};
62
63#[derive(Debug)]
73pub struct BlockchainDataClient {
74 pub chain: SharedChain,
76 pub config: BlockchainDataClientConfig,
78 cache: BlockchainCache,
80 rpc_client: Option<BlockchainRpcClientAny>,
82 tokens: Erc20Contract,
84 hypersync_client: HyperSyncClient,
86 hypersync_rx: Option<tokio::sync::mpsc::UnboundedReceiver<BlockchainMessage>>,
88 data_sender: UnboundedSender<DataEvent>,
90 command_tx: UnboundedSender<DefiDataCommand>,
92 command_rx: Option<UnboundedReceiver<DefiDataCommand>>,
94 process_task: Option<tokio::task::JoinHandle<()>>,
96}
97
98impl BlockchainDataClient {
99 #[must_use]
105 pub fn new(config: BlockchainDataClientConfig) -> Self {
106 let chain = config.chain.clone();
107 let rpc_client = if !config.use_hypersync_for_live_data && config.wss_rpc_url.is_some() {
108 let wss_rpc_url = config.wss_rpc_url.clone().expect("wss_rpc_url is required");
109 Some(Self::initialize_rpc_client(chain.name, wss_rpc_url))
110 } else {
111 None
112 };
113 let (hypersync_tx, hypersync_rx) = tokio::sync::mpsc::unbounded_channel();
114 let hypersync_client = HyperSyncClient::new(chain.clone(), hypersync_tx);
115 let http_rpc_client = Arc::new(BlockchainHttpRpcClient::new(
116 config.http_rpc_url.clone(),
117 config.rpc_requests_per_second,
118 ));
119 let erc20_contract = Erc20Contract::new(http_rpc_client);
120 let cache = BlockchainCache::new(chain.clone());
121 let data_sender = get_data_event_sender();
122 let (command_tx, command_rx) = tokio::sync::mpsc::unbounded_channel();
123
124 Self {
125 chain,
126 config,
127 cache,
128 rpc_client,
129 tokens: erc20_contract,
130 hypersync_client,
131 hypersync_rx: Some(hypersync_rx),
132 data_sender,
133 command_tx,
134 command_rx: Some(command_rx),
135 process_task: None,
136 }
137 }
138
139 fn initialize_rpc_client(
141 blockchain: Blockchain,
142 wss_rpc_url: String,
143 ) -> BlockchainRpcClientAny {
144 match blockchain {
145 Blockchain::Ethereum => {
146 BlockchainRpcClientAny::Ethereum(EthereumRpcClient::new(wss_rpc_url))
147 }
148 Blockchain::Polygon => {
149 BlockchainRpcClientAny::Polygon(PolygonRpcClient::new(wss_rpc_url))
150 }
151 Blockchain::Base => BlockchainRpcClientAny::Base(BaseRpcClient::new(wss_rpc_url)),
152 Blockchain::Arbitrum => {
153 BlockchainRpcClientAny::Arbitrum(ArbitrumRpcClient::new(wss_rpc_url))
154 }
155 _ => panic!("Unsupported blockchain {blockchain} for RPC connection"),
156 }
157 }
158
159 pub async fn initialize_cache_database(
161 &mut self,
162 pg_connect_options: Option<PostgresConnectOptions>,
163 ) {
164 let pg_connect_options = pg_connect_options.unwrap_or_default();
165 tracing::info!(
166 "Initializing blockchain cache on database '{}'",
167 pg_connect_options.database
168 );
169 self.cache
170 .initialize_database(pg_connect_options.into())
171 .await;
172 }
173
174 fn spawn_process_task(&mut self) {
177 let command_rx = if let Some(r) = self.command_rx.take() {
178 r
179 } else {
180 tracing::error!("Command receiver already taken, not spawning handler");
181 return;
182 };
183
184 let hypersync_rx = if let Some(r) = self.hypersync_rx.take() {
185 r
186 } else {
187 tracing::error!("HyperSync receiver already taken, not spawning handler");
188 return;
189 };
190
191 let mut hypersync_client = std::mem::replace(
192 &mut self.hypersync_client,
193 HyperSyncClient::new(self.chain.clone(), tokio::sync::mpsc::unbounded_channel().0),
194 );
195 let mut rpc_client = self.rpc_client.take();
196 let data_sender = self.data_sender.clone();
197
198 let handle = tokio::spawn(async move {
199 tracing::debug!("Started task 'process'");
200
201 let mut command_rx = command_rx;
202 let mut hypersync_rx = hypersync_rx;
203
204 loop {
205 tokio::select! {
206 command = command_rx.recv() => {
207 if let Some(cmd) = command {
208 if let Err(e) = Self::process_command(
209 cmd,
210 &mut hypersync_client,
211 rpc_client.as_mut()
212 ).await {
213 tracing::error!("Error processing command: {e}");
214 }
215 } else {
216 tracing::debug!("Command channel closed");
217 break;
218 }
219 }
220 data = hypersync_rx.recv() => {
221 if let Some(msg) = data {
222 let data_event = match msg {
223 BlockchainMessage::Block(block) => {
224 DataEvent::DeFi(DefiData::Block(block))
225 }
226 BlockchainMessage::Swap(swap) => {
227 DataEvent::DeFi(DefiData::PoolSwap(swap))
228 }
229 };
230
231 if let Err(e) = data_sender.send(data_event) {
232 tracing::error!("Failed to send data event: {e}");
233 break;
234 }
235 } else {
236 tracing::debug!("HyperSync data channel closed");
237 break;
238 }
239 }
240 }
241 }
242
243 tracing::debug!("Stopped task 'process'");
244 });
245
246 self.process_task = Some(handle);
247 }
248
249 async fn process_command(
250 command: DefiDataCommand,
251 hypersync_client: &mut HyperSyncClient,
252 rpc_client: Option<&mut BlockchainRpcClientAny>,
253 ) -> anyhow::Result<()> {
254 match command {
255 DefiDataCommand::Subscribe(cmd) => {
256 Self::handle_subscribe_command(cmd, hypersync_client, rpc_client).await
257 }
258 DefiDataCommand::Unsubscribe(cmd) => {
259 Self::handle_unsubscribe_command(cmd, hypersync_client, rpc_client).await
260 }
261 }
262 }
263
264 async fn handle_subscribe_command(
266 command: DefiSubscribeCommand,
267 hypersync_client: &mut HyperSyncClient,
268 mut rpc_client: Option<&mut BlockchainRpcClientAny>,
269 ) -> anyhow::Result<()> {
270 match command {
271 DefiSubscribeCommand::Blocks(_cmd) => {
272 tracing::info!("Processing subscribe blocks command");
273
274 if let Some(ref mut rpc) = rpc_client {
276 if let Err(e) = rpc.subscribe_blocks().await {
277 tracing::warn!(
278 "RPC blocks subscription failed: {e}, falling back to HyperSync"
279 );
280 hypersync_client.subscribe_blocks();
281 } else {
282 tracing::info!("Successfully subscribed to blocks via RPC");
283 }
284 } else {
285 tracing::info!("Subscribing to blocks via HyperSync");
286 hypersync_client.subscribe_blocks();
287 }
288
289 Ok(())
290 }
291 DefiSubscribeCommand::Pool(_cmd) => {
292 tracing::info!("Processing subscribe pool command");
293 tracing::warn!("Pool subscriptions are handled at application level");
296 Ok(())
297 }
298 DefiSubscribeCommand::PoolSwaps(cmd) => {
299 tracing::info!(
300 "Processing subscribe pool swaps command for address: {}",
301 cmd.address
302 );
303
304 if let Some(ref mut _rpc) = rpc_client {
305 tracing::warn!(
306 "RPC pool swaps subscription not yet implemented, using HyperSync"
307 );
308 }
309
310 hypersync_client.subscribe_pool_swaps(cmd.address);
311 tracing::info!("Subscribed to pool swaps for address: {}", cmd.address);
312
313 Ok(())
314 }
315 DefiSubscribeCommand::PoolLiquidityUpdates(_cmd) => {
316 tracing::info!("Processing subscribe pool liquidity updates command");
317 tracing::warn!("Pool liquidity updates subscription not yet implemented");
318 Ok(())
320 }
321 }
322 }
323
324 async fn handle_unsubscribe_command(
326 command: DefiUnsubscribeCommand,
327 hypersync_client: &mut HyperSyncClient,
328 rpc_client: Option<&mut BlockchainRpcClientAny>,
329 ) -> anyhow::Result<()> {
330 match command {
331 DefiUnsubscribeCommand::Blocks(_cmd) => {
332 tracing::info!("Processing unsubscribe blocks command");
333
334 if rpc_client.is_some() {
336 tracing::warn!("RPC blocks unsubscription not yet implemented");
337 }
338
339 hypersync_client.unsubscribe_blocks();
341 tracing::info!("Unsubscribed from blocks via HyperSync");
342
343 Ok(())
344 }
345 DefiUnsubscribeCommand::Pool(_cmd) => {
346 tracing::info!("Processing unsubscribe pool command");
347 tracing::warn!("Pool unsubscriptions are handled at application level");
349 Ok(())
350 }
351 DefiUnsubscribeCommand::PoolSwaps(_cmd) => {
352 tracing::info!("Processing unsubscribe pool swaps command");
353 tracing::warn!("Pool swaps unsubscription not yet implemented");
354 Ok(())
356 }
357 DefiUnsubscribeCommand::PoolLiquidityUpdates(_cmd) => {
358 tracing::info!("Processing unsubscribe pool liquidity updates command");
359 tracing::warn!("Pool liquidity updates unsubscription not yet implemented");
360 Ok(())
362 }
363 }
364 }
365
366 pub async fn sync_blocks(&mut self, from_block: Option<u64>) -> anyhow::Result<()> {
368 let from_block = if let Some(b) = from_block {
369 b
370 } else {
371 tracing::warn!("Skipping blocks sync: `from_block` not supplied");
372 return Ok(());
373 };
374
375 let from_block = match self.cache.last_cached_block_number() {
376 None => from_block,
377 Some(cached_block_number) => max(from_block, cached_block_number + 1),
378 };
379
380 let current_block = self.hypersync_client.current_block().await;
381 tracing::info!("Syncing blocks from {from_block} to {current_block}");
382
383 let blocks_stream = self
384 .hypersync_client
385 .request_blocks_stream(from_block, Some(current_block))
386 .await;
387
388 tokio::pin!(blocks_stream);
389
390 while let Some(block) = blocks_stream.next().await {
391 self.cache.add_block(block).await?;
392 }
393
394 tracing::info!("Finished syncing blocks");
395 Ok(())
396 }
397
398 pub async fn sync_pool_swaps(
400 &mut self,
401 dex_id: &str,
402 pool_address: String,
403 from_block: Option<u64>,
404 to_block: Option<u64>,
405 ) -> anyhow::Result<()> {
406 let dex_extended = self.get_dex(dex_id)?.clone();
407 let pool = self.get_pool(&pool_address)?;
408 let from_block =
409 from_block.map_or(pool.creation_block, |block| max(block, pool.creation_block));
410 tracing::info!(
411 "Syncing pool swaps for {} on Dex {} from block {}{}",
412 pool.ticker(),
413 dex_extended.name,
414 from_block,
415 to_block.map_or(String::new(), |block| format!(" to {block}"))
416 );
417 let swap_event_signature = dex_extended.swap_created_event.as_ref();
418 let stream = self
419 .hypersync_client
420 .request_contract_events_stream(
421 from_block,
422 to_block,
423 &pool.address.to_string(),
424 swap_event_signature,
425 Vec::new(),
426 )
427 .await;
428
429 tokio::pin!(stream);
430
431 while let Some(log) = stream.next().await {
432 let swap_event = dex_extended.parse_swap_event(log)?;
433 let Some(timestamp) = self.cache.get_block_timestamp(swap_event.block_number) else {
434 tracing::error!(
435 "Missing block timestamp in the cache for block {} while processing swap event",
436 swap_event.block_number
437 );
438 continue;
439 };
440 let (side, size, price) = dex_extended
441 .convert_to_trade_data(&pool.token0, &pool.token1, &swap_event)
442 .expect("Failed to convert swap event to trade data");
443
444 let swap = PoolSwap::new(
445 self.chain.clone(),
446 dex_extended.dex.clone(),
447 pool.clone(),
448 swap_event.block_number,
449 swap_event.transaction_hash,
450 swap_event.transaction_index,
451 swap_event.log_index,
452 *timestamp,
453 swap_event.sender,
454 side,
455 size,
456 price,
457 );
458
459 self.cache.add_pool_swap(&swap).await?;
460
461 self.send_swap(swap);
462 }
463
464 tracing::info!("Finished syncing pool swaps");
465 Ok(())
466 }
467
468 pub async fn sync_pool_mints(
470 &self,
471 dex_id: &str,
472 pool_address: String,
473 from_block: Option<u64>,
474 to_block: Option<u64>,
475 ) -> anyhow::Result<()> {
476 let dex_extended = self.get_dex(dex_id)?.clone();
477 let pool = self.get_pool(&pool_address)?.clone();
478 let from_block =
479 from_block.map_or(pool.creation_block, |block| max(block, pool.creation_block));
480 tracing::info!(
481 "Syncing pool mints for {} on Dex {} from block {from_block}{}",
482 pool.ticker(),
483 dex_extended.name,
484 to_block.map_or(String::new(), |block| format!(" to {block}"))
485 );
486 let mint_event_signature = dex_extended.mint_created_event.as_ref();
487 let stream = self
488 .hypersync_client
489 .request_contract_events_stream(
490 from_block,
491 to_block,
492 &pool.address.to_string(),
493 mint_event_signature,
494 Vec::new(),
495 )
496 .await;
497
498 tokio::pin!(stream);
499
500 while let Some(log) = stream.next().await {
501 let mint_event = dex_extended.parse_mint_event(log)?;
502 let Some(timestamp) = self.cache.get_block_timestamp(mint_event.block_number) else {
503 tracing::error!(
504 "Missing block timestamp in the cache for block {} while processing mint event",
505 mint_event.block_number
506 );
507 continue;
508 };
509 let liquidity = u256_to_quantity(
510 U256::from(mint_event.amount),
511 self.chain.native_currency_decimals,
512 )?;
513 let amount0 = u256_to_quantity(mint_event.amount0, pool.token0.decimals)?;
514 let amount1 = u256_to_quantity(mint_event.amount1, pool.token1.decimals)?;
515
516 let liquidity_update = PoolLiquidityUpdate::new(
517 self.chain.clone(),
518 dex_extended.dex.clone(),
519 pool.clone(),
520 PoolLiquidityUpdateType::Mint,
521 mint_event.block_number,
522 mint_event.transaction_hash,
523 mint_event.transaction_index,
524 mint_event.log_index,
525 Some(mint_event.sender),
526 mint_event.owner,
527 liquidity,
528 amount0,
529 amount1,
530 mint_event.tick_lower,
531 mint_event.tick_upper,
532 *timestamp,
533 );
534
535 self.cache.add_liquidity_update(&liquidity_update).await?;
536
537 self.send_liquidity_update(liquidity_update);
538 }
539
540 tracing::info!("Finished syncing pool mints");
541 Ok(())
542 }
543
544 pub async fn sync_pool_burns(
546 &self,
547 dex_id: &str,
548 pool_address: String,
549 from_block: Option<u64>,
550 to_block: Option<u64>,
551 ) -> anyhow::Result<()> {
552 let dex_extended = self.get_dex(dex_id)?.clone();
553 let pool = self.get_pool(&pool_address)?.clone();
554 let from_block =
555 from_block.map_or(pool.creation_block, |block| max(block, pool.creation_block));
556 tracing::info!(
557 "Syncing pool burns for {} on Dex {} from block {from_block}{}",
558 pool.ticker(),
559 dex_extended.name,
560 to_block.map_or(String::new(), |block| format!(" to {block}"))
561 );
562 let burn_event_signature = dex_extended.burn_created_event.as_ref();
563 let stream = self
564 .hypersync_client
565 .request_contract_events_stream(
566 from_block,
567 to_block,
568 &pool.address.to_string(),
569 burn_event_signature,
570 Vec::new(),
571 )
572 .await;
573
574 tokio::pin!(stream);
575
576 while let Some(log) = stream.next().await {
577 let burn_event = dex_extended.parse_burn_event(log)?;
578 let Some(timestamp) = self.cache.get_block_timestamp(burn_event.block_number) else {
579 tracing::error!(
580 "Missing block timestamp in the cache for block {} while processing burn event",
581 burn_event.block_number
582 );
583 continue;
584 };
585 let liquidity = u256_to_quantity(
586 U256::from(burn_event.amount),
587 self.chain.native_currency_decimals,
588 )?;
589 let amount0 = u256_to_quantity(burn_event.amount0, pool.token0.decimals)?;
590 let amount1 = u256_to_quantity(burn_event.amount1, pool.token1.decimals)?;
591
592 let liquidity_update = PoolLiquidityUpdate::new(
593 self.chain.clone(),
594 dex_extended.dex.clone(),
595 pool.clone(),
596 PoolLiquidityUpdateType::Burn,
597 burn_event.block_number,
598 burn_event.transaction_hash,
599 burn_event.transaction_index,
600 burn_event.log_index,
601 None,
602 burn_event.owner,
603 liquidity,
604 amount0,
605 amount1,
606 burn_event.tick_lower,
607 burn_event.tick_upper,
608 *timestamp,
609 );
610
611 self.cache.add_liquidity_update(&liquidity_update).await?;
612
613 self.send_liquidity_update(liquidity_update);
614 }
615
616 tracing::info!("Finished syncing pool burns");
617 Ok(())
618 }
619
620 pub async fn sync_exchange_pools(
622 &mut self,
623 dex_id: &str,
624 from_block: Option<u64>,
625 to_block: Option<u64>,
626 ) -> anyhow::Result<()> {
627 let from_block = from_block.unwrap_or(0);
628 tracing::info!(
629 "Syncing Dex exchange pools for {dex_id} from block {from_block}{}",
630 to_block.map_or(String::new(), |block| format!(" to {block}"))
631 );
632
633 let dex = self.get_dex(dex_id)?.clone();
634 let factory_address = dex.factory.as_ref();
635 let pair_created_event_signature = dex.pool_created_event.as_ref();
636 let pools_stream = self
637 .hypersync_client
638 .request_contract_events_stream(
639 from_block,
640 to_block,
641 factory_address,
642 pair_created_event_signature,
643 Vec::new(),
644 )
645 .await;
646
647 tokio::pin!(pools_stream);
648
649 while let Some(log) = pools_stream.next().await {
650 let pool = dex.parse_pool_created_event(log)?;
651 self.process_token(pool.token0.to_string()).await?;
652 self.process_token(pool.token1.to_string()).await?;
653 self.process_pool(&dex.dex, pool).await?;
654 }
655 Ok(())
656 }
657
658 pub async fn process_token(&mut self, token_address: String) -> anyhow::Result<()> {
664 let token_address = validate_address(&token_address)?;
665
666 if self.cache.get_token(&token_address).is_none() {
667 let token_info = self.tokens.fetch_token_info(&token_address).await?;
668 let token = Token::new(
669 self.chain.clone(),
670 token_address,
671 token_info.name,
672 token_info.symbol,
673 token_info.decimals,
674 );
675 tracing::info!("Saving fetched token {token} in the cache");
676 self.cache.add_token(token).await?;
677 }
678
679 Ok(())
680 }
681
682 async fn process_pool(&mut self, dex: &Dex, event: PoolCreatedEvent) -> anyhow::Result<()> {
684 let pool = Pool::new(
685 self.chain.clone(),
686 dex.clone(),
687 event.pool_address,
688 event.block_number,
689 self.cache.get_token(&event.token0).cloned().unwrap(),
690 self.cache.get_token(&event.token1).cloned().unwrap(),
691 event.fee,
692 event.tick_spacing,
693 UnixNanos::default(), );
695 self.cache.add_pool(pool.clone()).await?;
696
697 Ok(())
698 }
699
700 pub async fn register_exchange(&mut self, dex: DexExtended) -> anyhow::Result<()> {
702 let dex_id = dex.id();
703 tracing::info!("Registering blockchain exchange {dex_id}");
704 self.cache.add_dex(dex_id, dex).await?;
705 Ok(())
706 }
707
708 pub async fn process_hypersync_messages(&mut self) {
710 tracing::info!("Starting task 'process_hypersync_messages'");
711
712 let mut rx = if let Some(r) = self.hypersync_rx.take() {
713 r
714 } else {
715 tracing::warn!("HyperSync receiver already taken, not spawning forwarder");
716 return;
717 };
718
719 while let Some(msg) = rx.recv().await {
720 match msg {
721 BlockchainMessage::Block(block) => {
722 self.send_block(block);
723 }
724 BlockchainMessage::Swap(swap) => {
725 self.send_swap(swap);
726 }
727 }
728 }
729 }
730
731 pub async fn process_rpc_messages(&mut self) {
733 tracing::info!("Starting task 'process_rpc_messages'");
734
735 loop {
736 let msg = {
737 match self
738 .rpc_client
739 .as_mut()
740 .expect("process_rpc_messages: RPC client not initialised")
741 .next_rpc_message()
742 .await
743 {
744 Ok(m) => m,
745 Err(e) => {
746 tracing::error!("Error processing RPC message: {e}");
747 continue;
748 }
749 }
750 };
751
752 match msg {
753 BlockchainMessage::Block(block) => self.send_block(block),
754 BlockchainMessage::Swap(swap) => self.send_swap(swap),
755 }
756 }
757 }
758
759 pub async fn subscribe_blocks_async(&mut self) -> anyhow::Result<()> {
761 if let Some(rpc_client) = self.rpc_client.as_mut() {
762 rpc_client.subscribe_blocks().await?;
763 } else {
764 self.hypersync_client.subscribe_blocks();
765 }
766
767 Ok(())
768 }
769
770 pub async fn subscribe_pool_swaps_async(&mut self) -> anyhow::Result<()> {
772 if let Some(rpc_client) = self.rpc_client.as_mut() {
773 rpc_client.subscribe_swaps().await?;
774 } else {
775 todo!("Not implemented")
776 }
778
779 Ok(())
780 }
781
782 pub async fn unsubscribe_blocks_async(&mut self) -> anyhow::Result<()> {
784 if let Some(_rpc_client) = self.rpc_client.as_mut() {
785 todo!("Not implemented");
786 } else {
788 self.hypersync_client.unsubscribe_blocks();
789 }
790
791 Ok(())
792 }
793
794 pub async fn unsubscribe_pool_swaps_async(&mut self) -> anyhow::Result<()> {
796 if let Some(_rpc_client) = self.rpc_client.as_mut() {
797 todo!("Not implemented");
798 } else {
800 self.hypersync_client.unsubscribe_blocks();
801 }
802
803 Ok(())
804 }
805
806 fn get_dex(&self, dex_id: &str) -> anyhow::Result<&DexExtended> {
807 match self.cache.get_dex(dex_id) {
808 Some(dex) => Ok(dex),
809 None => anyhow::bail!("Dex {dex_id} is not registered"),
810 }
811 }
812
813 fn get_pool(&self, pool_address: &str) -> anyhow::Result<&SharedPool> {
814 let pool_address = validate_address(pool_address)?;
815 match self.cache.get_pool(&pool_address) {
816 Some(pool) => Ok(pool),
817 None => anyhow::bail!("Pool {pool_address} is not registered"),
818 }
819 }
820
821 fn send_block(&self, block: Block) {
822 let data = DataEvent::DeFi(DefiData::Block(block));
823 self.send_data(data);
824 }
825
826 fn send_swap(&self, swap: PoolSwap) {
827 let data = DataEvent::DeFi(DefiData::PoolSwap(swap));
828 self.send_data(data);
829 }
830
831 fn send_liquidity_update(&self, update: PoolLiquidityUpdate) {
832 let data = DataEvent::DeFi(DefiData::PoolLiquidityUpdate(update));
833 self.send_data(data);
834 }
835
836 fn send_data(&self, data: DataEvent) {
837 tracing::debug!("Sending {data}");
838
839 if let Err(e) = self.data_sender.send(data) {
840 tracing::error!("Failed to send data: {e}");
841 }
842 }
843}
844
845#[async_trait::async_trait]
846impl DataClient for BlockchainDataClient {
847 fn client_id(&self) -> ClientId {
848 ClientId::from(format!("BLOCKCHAIN-{}", self.chain.name).as_str())
849 }
850
851 fn venue(&self) -> Option<Venue> {
852 None
855 }
856
857 fn start(&mut self) -> anyhow::Result<()> {
858 tracing::info!("Starting blockchain data client for '{}'", self.chain.name);
859 Ok(())
860 }
861
862 fn stop(&mut self) -> anyhow::Result<()> {
863 tracing::info!("Stopping blockchain data client for '{}'", self.chain.name);
864 Ok(())
865 }
866
867 fn reset(&mut self) -> anyhow::Result<()> {
868 tracing::info!("Resetting blockchain data client for '{}'", self.chain.name);
869 Ok(())
870 }
871
872 fn dispose(&mut self) -> anyhow::Result<()> {
873 tracing::info!("Disposing blockchain data client for '{}'", self.chain.name);
874 Ok(())
875 }
876
877 async fn connect(&mut self) -> anyhow::Result<()> {
878 tracing::info!(
879 "Connecting blockchain data client for '{}'",
880 self.chain.name
881 );
882
883 if let Some(ref mut rpc_client) = self.rpc_client {
884 rpc_client.connect().await?;
885 }
886
887 let from_block = self.config.from_block.unwrap_or(0);
888 self.cache.connect(from_block).await?;
889 self.sync_blocks(self.config.from_block).await?;
890 if self.process_task.is_none() {
893 self.spawn_process_task();
894 }
895
896 Ok(())
897 }
898
899 async fn disconnect(&mut self) -> anyhow::Result<()> {
900 tracing::info!(
901 "Disconnecting blockchain data client for '{}'",
902 self.chain.name
903 );
904
905 if let Some(handle) = self.process_task.take() {
906 tracing::debug!("Aborting task 'process'");
907 handle.abort();
908 }
909
910 Ok(())
911 }
912
913 fn is_connected(&self) -> bool {
914 self.rpc_client.is_some() || true }
918
919 fn is_disconnected(&self) -> bool {
920 !self.is_connected()
921 }
922
923 fn subscribe_blocks(&mut self, cmd: &SubscribeBlocks) -> anyhow::Result<()> {
924 let command = DefiDataCommand::Subscribe(DefiSubscribeCommand::Blocks(cmd.clone()));
925 self.command_tx.send(command)?;
926 Ok(())
927 }
928
929 fn subscribe_pool(&mut self, cmd: &SubscribePool) -> anyhow::Result<()> {
930 let command = DefiDataCommand::Subscribe(DefiSubscribeCommand::Pool(cmd.clone()));
931 self.command_tx.send(command)?;
932 Ok(())
933 }
934
935 fn subscribe_pool_swaps(&mut self, cmd: &SubscribePoolSwaps) -> anyhow::Result<()> {
936 let command = DefiDataCommand::Subscribe(DefiSubscribeCommand::PoolSwaps(cmd.clone()));
937 self.command_tx.send(command)?;
938 Ok(())
939 }
940
941 fn subscribe_pool_liquidity_updates(
942 &mut self,
943 cmd: &SubscribePoolLiquidityUpdates,
944 ) -> anyhow::Result<()> {
945 let command =
946 DefiDataCommand::Subscribe(DefiSubscribeCommand::PoolLiquidityUpdates(cmd.clone()));
947 self.command_tx.send(command)?;
948 Ok(())
949 }
950
951 fn unsubscribe_blocks(&mut self, cmd: &UnsubscribeBlocks) -> anyhow::Result<()> {
952 let command = DefiDataCommand::Unsubscribe(DefiUnsubscribeCommand::Blocks(cmd.clone()));
953 self.command_tx.send(command)?;
954 Ok(())
955 }
956
957 fn unsubscribe_pool(&mut self, cmd: &UnsubscribePool) -> anyhow::Result<()> {
958 let command = DefiDataCommand::Unsubscribe(DefiUnsubscribeCommand::Pool(cmd.clone()));
959 self.command_tx.send(command)?;
960 Ok(())
961 }
962
963 fn unsubscribe_pool_swaps(&mut self, cmd: &UnsubscribePoolSwaps) -> anyhow::Result<()> {
964 let command = DefiDataCommand::Unsubscribe(DefiUnsubscribeCommand::PoolSwaps(cmd.clone()));
965 self.command_tx.send(command)?;
966 Ok(())
967 }
968
969 fn unsubscribe_pool_liquidity_updates(
970 &mut self,
971 cmd: &UnsubscribePoolLiquidityUpdates,
972 ) -> anyhow::Result<()> {
973 let command =
974 DefiDataCommand::Unsubscribe(DefiUnsubscribeCommand::PoolLiquidityUpdates(cmd.clone()));
975 self.command_tx.send(command)?;
976 Ok(())
977 }
978}