sync_tokens_pools/
sync_tokens_pools.rs

1// -------------------------------------------------------------------------------------------------
2//  Copyright (C) 2015-2025 Posei Systems Pty Ltd. All rights reserved.
3//  https://poseitrader.io
4//
5//  Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
6//  You may not use this file except in compliance with the License.
7//  You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
8//
9//  Unless required by applicable law or agreed to in writing, software
10//  distributed under the License is distributed on an "AS IS" BASIS,
11//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12//  See the License for the specific language governing permissions and
13//  limitations under the License.
14// -------------------------------------------------------------------------------------------------
15
16use std::sync::Arc;
17
18use nautilus_blockchain::{
19    config::BlockchainDataClientConfig, data::BlockchainDataClient, exchanges,
20};
21use nautilus_common::logging::{
22    logger::{Logger, LoggerConfig},
23    writer::FileWriterConfig,
24};
25use nautilus_core::{UUID4, env::get_env_var};
26use posei_trader::DataClient;
27use nautilus_live::runner::AsyncRunner;
28use nautilus_model::{defi::chain::chains, identifiers::TraderId};
29use tokio::sync::Notify;
30
31// Run with `cargo run -p nautilus-blockchain --bin sync_token_pool --features hypersync`
32
33#[tokio::main]
34async fn main() -> Result<(), Box<dyn std::error::Error>> {
35    dotenvy::dotenv().ok();
36
37    let _logger_guard = Logger::init_with_config(
38        TraderId::default(),
39        UUID4::new(),
40        LoggerConfig::default(),
41        FileWriterConfig::new(None, None, None, None),
42    )?;
43
44    let _ = AsyncRunner::default(); // Needed for live channels
45
46    // Setup graceful shutdown with signal handling in different task
47    let notify = Arc::new(Notify::new());
48    let notifier = notify.clone();
49    tokio::spawn(async move {
50        let mut sigterm = tokio::signal::unix::signal(tokio::signal::unix::SignalKind::terminate())
51            .expect("Failed to create SIGTERM listener");
52        let mut sigint = tokio::signal::unix::signal(tokio::signal::unix::SignalKind::interrupt())
53            .expect("Failed to create SIGINT listener");
54        tokio::select! {
55            _ = sigterm.recv() => {}
56            _ = sigint.recv() => {}
57        }
58        log::info!("Shutdown signal received, shutting down...");
59        notifier.notify_one();
60    });
61
62    // Initialize the blockchain data client, connect and subscribe to live blocks with RPC
63    let chain = Arc::new(chains::ETHEREUM.clone());
64    let http_rpc_url = get_env_var("RPC_HTTP_URL")?;
65    // Let's use block https://etherscan.io/block/22327045 from (Apr-22-2025 08:49:47 PM +UTC)
66    let from_block = Some(22327045);
67    let blockchain_config = BlockchainDataClientConfig::new(
68        chain.clone(),
69        http_rpc_url,
70        Some(3), // RPC requests per second
71        None,    // WSS RPC URL
72        true,    // Use hypersync for live data
73        from_block,
74    );
75
76    let mut data_client = BlockchainDataClient::new(blockchain_config);
77    data_client.initialize_cache_database(None).await;
78
79    let univ3 = exchanges::ethereum::UNISWAP_V3.clone();
80    let dex_id = univ3.id();
81    data_client.connect().await?;
82    data_client.register_exchange(univ3.clone()).await?;
83
84    loop {
85        tokio::select! {
86            () = notify.notified() => break,
87             result = data_client.sync_exchange_pools(dex_id.as_str(), from_block, None) => {
88                match result {
89                    Ok(()) => {
90                        // Exit after the tokens and pool are synced successfully
91                        log::info!("Successfully synced tokens and pools");
92                        break;
93                    },
94                    Err(e) => {
95                        // Handle error case
96                        log::error!("Error syncing tokens and pools: {e}");
97                        break;
98                    }
99                }
100            }
101        }
102    }
103
104    data_client.disconnect().await?;
105    Ok(())
106}