sync_pool_events/
sync_pool_events.rs

1// -------------------------------------------------------------------------------------------------
2//  Copyright (C) 2015-2025 Posei Systems Pty Ltd. All rights reserved.
3//  https://poseitrader.io
4//
5//  Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
6//  You may not use this file except in compliance with the License.
7//  You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
8//
9//  Unless required by applicable law or agreed to in writing, software
10//  distributed under the License is distributed on an "AS IS" BASIS,
11//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12//  See the License for the specific language governing permissions and
13//  limitations under the License.
14// -------------------------------------------------------------------------------------------------
15
16use std::sync::Arc;
17
18use nautilus_blockchain::{
19    config::BlockchainDataClientConfig, data::BlockchainDataClient, exchanges,
20};
21use nautilus_common::logging::{
22    logger::{Logger, LoggerConfig},
23    writer::FileWriterConfig,
24};
25use nautilus_core::{UUID4, env::get_env_var};
26use posei_trader::DataClient;
27use nautilus_live::runner::AsyncRunner;
28use nautilus_model::{defi::chain::chains, identifiers::TraderId};
29use tokio::sync::Notify;
30
31// Run with `cargo run -p nautilus-blockchain --bin sync_pool_events --features hypersync`
32
33#[tokio::main]
34async fn main() -> Result<(), Box<dyn std::error::Error>> {
35    dotenvy::dotenv().ok();
36
37    let _logger_guard = Logger::init_with_config(
38        TraderId::default(),
39        UUID4::new(),
40        LoggerConfig::default(),
41        FileWriterConfig::new(None, None, None, None),
42    )?;
43
44    let _ = AsyncRunner::default(); // Needed for live channels
45
46    // Setup graceful shutdown with signal handling in different task
47    let notify = Arc::new(Notify::new());
48    let notifier = notify.clone();
49    tokio::spawn(async move {
50        let mut sigterm = tokio::signal::unix::signal(tokio::signal::unix::SignalKind::terminate())
51            .expect("Failed to create SIGTERM listener");
52        let mut sigint = tokio::signal::unix::signal(tokio::signal::unix::SignalKind::interrupt())
53            .expect("Failed to create SIGINT listener");
54        tokio::select! {
55            _ = sigterm.recv() => {}
56            _ = sigint.recv() => {}
57        }
58        log::info!("Shutdown signal received, shutting down...");
59        notifier.notify_one();
60    });
61
62    // Initialize the blockchain data client, connect and subscribe to live blocks with RPC
63    let chain = Arc::new(chains::ETHEREUM.clone());
64    // WETH/USDC Uniswap V3 pool
65    let weth_usdc_pool = "0x88e6A0c2dDD26FEEb64F039a2c41296FcB3f5640";
66    let pool_creation_block = 12376729;
67    let from_block = Some(22550000);
68
69    let http_rpc_url = get_env_var("RPC_HTTP_URL")?;
70    let blockchain_config = BlockchainDataClientConfig::new(
71        chain.clone(),
72        http_rpc_url,
73        Some(3), // RPC requests per second
74        None,    // WSS RPC URL
75        true,    // Use hypersync for live data
76        from_block,
77    );
78
79    let mut data_client = BlockchainDataClient::new(blockchain_config);
80    data_client.initialize_cache_database(None).await;
81
82    let univ3 = exchanges::ethereum::UNISWAP_V3.clone();
83    let dex_id = univ3.id();
84
85    data_client.connect().await?;
86    data_client.register_exchange(univ3.clone()).await?;
87
88    loop {
89        tokio::select! {
90            () = notify.notified() => break,
91             () = async {
92                data_client.sync_exchange_pools(
93                    dex_id.as_str(),
94                    Some(pool_creation_block),
95                    Some(pool_creation_block + 1),
96                ).await.unwrap();
97                data_client.sync_pool_swaps(
98                    dex_id.as_str(),
99                    weth_usdc_pool.to_string(),
100                    from_block,
101                    None,
102                ).await.unwrap();
103                data_client.sync_pool_mints(
104                    dex_id.as_str(),
105                    weth_usdc_pool.to_string(),
106                    from_block,
107                    None,
108                ).await.unwrap();
109                data_client.sync_pool_burns(
110                    dex_id.as_str(),
111                    weth_usdc_pool.to_string(),
112                    from_block,
113                    None,
114                ).await.unwrap();
115            } => break,
116        }
117    }
118
119    data_client.disconnect().await?;
120    Ok(())
121}