sync_tokens_pools/
sync_tokens_pools.rs1use std::sync::Arc;
17
18use nautilus_blockchain::{config::BlockchainAdapterConfig, data::BlockchainDataClient, exchanges};
19use nautilus_common::logging::{
20 logger::{Logger, LoggerConfig},
21 writer::FileWriterConfig,
22};
23use nautilus_core::{UUID4, env::get_env_var};
24use nautilus_model::{
25 defi::chain::{Blockchain, Chain, chains},
26 identifiers::TraderId,
27};
28use tokio::sync::Notify;
29
30#[tokio::main]
31async fn main() -> Result<(), Box<dyn std::error::Error>> {
32 dotenvy::dotenv().ok();
33 let _logger_guard = Logger::init_with_config(
35 TraderId::default(),
36 UUID4::new(),
37 LoggerConfig::default(),
38 FileWriterConfig::new(None, None, None, None),
39 )?;
40
41 let notify = Arc::new(Notify::new());
43 let notifier = notify.clone();
44 tokio::spawn(async move {
45 let mut sigterm = tokio::signal::unix::signal(tokio::signal::unix::SignalKind::terminate())
46 .expect("Failed to create SIGTERM listener");
47 let mut sigint = tokio::signal::unix::signal(tokio::signal::unix::SignalKind::interrupt())
48 .expect("Failed to create SIGINT listener");
49 tokio::select! {
50 _ = sigterm.recv() => {}
51 _ = sigint.recv() => {}
52 }
53 log::info!("Shutdown signal received, shutting down...");
54 notifier.notify_one();
55 });
56
57 let chain: Chain = match std::env::var("CHAIN") {
59 Ok(chain_str) => {
60 if let Ok(blockchain) = chain_str.parse::<Blockchain>() {
61 match blockchain {
62 Blockchain::Ethereum => chains::ETHEREUM.clone(),
63 Blockchain::Base => chains::BASE.clone(),
64 Blockchain::Arbitrum => chains::ARBITRUM.clone(),
65 Blockchain::Polygon => chains::POLYGON.clone(),
66 _ => panic!("Invalid chain {chain_str}"),
67 }
68 } else {
69 panic!("Invalid chain {chain_str}");
70 }
71 }
72 Err(_) => chains::ETHEREUM.clone(), };
74 let chain = Arc::new(chain);
75 let http_rpc_url = get_env_var("RPC_HTTP_URL")?;
76 let blockchain_config = BlockchainAdapterConfig::new(http_rpc_url, Some(3), None, true);
77
78 let mut data_client = BlockchainDataClient::new(chain, blockchain_config);
79 data_client.initialize_cache_database(None).await;
80
81 let univ3 = exchanges::ethereum::UNISWAP_V3.clone();
82 let dex_id = univ3.id();
83 data_client.connect().await?;
84 data_client.register_exchange(univ3.clone()).await?;
85 let from_block = Some(22327045);
87
88 loop {
90 tokio::select! {
91 () = notify.notified() => break,
92 result = data_client.sync_exchange_pools(dex_id.as_str(), from_block) => {
93 match result {
94 Ok(()) => {
95 log::info!("Successfully synced tokens and pools");
97 break;
98 },
99 Err(e) => {
100 log::error!("Error syncing tokens and pools: {e}");
102 break;
103 }
104 }
105 }
106 }
107 }
108 data_client.disconnect()?;
109 Ok(())
110}