sync_tokens_pools/
sync_tokens_pools.rs

1// -------------------------------------------------------------------------------------------------
2//  Copyright (C) 2015-2025 Posei Systems Pty Ltd. All rights reserved.
3//  https://poseitrader.io
4//
5//  Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
6//  You may not use this file except in compliance with the License.
7//  You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
8//
9//  Unless required by applicable law or agreed to in writing, software
10//  distributed under the License is distributed on an "AS IS" BASIS,
11//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12//  See the License for the specific language governing permissions and
13//  limitations under the License.
14// -------------------------------------------------------------------------------------------------
15
16use std::sync::Arc;
17
18use nautilus_blockchain::{config::BlockchainAdapterConfig, data::BlockchainDataClient, exchanges};
19use nautilus_common::logging::{
20    logger::{Logger, LoggerConfig},
21    writer::FileWriterConfig,
22};
23use nautilus_core::{UUID4, env::get_env_var};
24use nautilus_model::{
25    defi::chain::{Blockchain, Chain, chains},
26    identifiers::TraderId,
27};
28use tokio::sync::Notify;
29
30#[tokio::main]
31async fn main() -> Result<(), Box<dyn std::error::Error>> {
32    dotenvy::dotenv().ok();
33    // Setup logger
34    let _logger_guard = Logger::init_with_config(
35        TraderId::default(),
36        UUID4::new(),
37        LoggerConfig::default(),
38        FileWriterConfig::new(None, None, None, None),
39    )?;
40
41    // Setup graceful shutdown with signal handling in different task
42    let notify = Arc::new(Notify::new());
43    let notifier = notify.clone();
44    tokio::spawn(async move {
45        let mut sigterm = tokio::signal::unix::signal(tokio::signal::unix::SignalKind::terminate())
46            .expect("Failed to create SIGTERM listener");
47        let mut sigint = tokio::signal::unix::signal(tokio::signal::unix::SignalKind::interrupt())
48            .expect("Failed to create SIGINT listener");
49        tokio::select! {
50            _ = sigterm.recv() => {}
51            _ = sigint.recv() => {}
52        }
53        log::info!("Shutdown signal received, shutting down...");
54        notifier.notify_one();
55    });
56
57    // Initialize the blockchain data client, connect and subscribe to live blocks with RPC
58    let chain: Chain = match std::env::var("CHAIN") {
59        Ok(chain_str) => {
60            if let Ok(blockchain) = chain_str.parse::<Blockchain>() {
61                match blockchain {
62                    Blockchain::Ethereum => chains::ETHEREUM.clone(),
63                    Blockchain::Base => chains::BASE.clone(),
64                    Blockchain::Arbitrum => chains::ARBITRUM.clone(),
65                    Blockchain::Polygon => chains::POLYGON.clone(),
66                    _ => panic!("Invalid chain {chain_str}"),
67                }
68            } else {
69                panic!("Invalid chain {chain_str}");
70            }
71        }
72        Err(_) => chains::ETHEREUM.clone(), // default
73    };
74    let chain = Arc::new(chain);
75    let http_rpc_url = get_env_var("RPC_HTTP_URL")?;
76    let blockchain_config = BlockchainAdapterConfig::new(http_rpc_url, Some(3), None, true);
77
78    let mut data_client = BlockchainDataClient::new(chain, blockchain_config);
79    data_client.initialize_cache_database(None).await;
80
81    let univ3 = exchanges::ethereum::UNISWAP_V3.clone();
82    let dex_id = univ3.id();
83    data_client.connect().await?;
84    data_client.register_exchange(univ3.clone()).await?;
85    // Lets use block https://etherscan.io/block/22327045 from (Apr-22-2025 08:49:47 PM +UTC)
86    let from_block = Some(22327045);
87
88    // Main loop to keep the app running
89    loop {
90        tokio::select! {
91            () = notify.notified() => break,
92             result = data_client.sync_exchange_pools(dex_id.as_str(), from_block) => {
93                match result {
94                    Ok(()) => {
95                        // Exit after the tokens and pool are synced successfully
96                        log::info!("Successfully synced tokens and pools");
97                        break;
98                    },
99                    Err(e) => {
100                        // Handle error case
101                        log::error!("Error syncing tokens and pools: {e}");
102                        break;
103                    }
104                }
105            }
106        }
107    }
108    data_client.disconnect()?;
109    Ok(())
110}