sync_pool_events/
sync_pool_events.rs1use std::sync::Arc;
17
18use nautilus_blockchain::{
19 config::BlockchainDataClientConfig, data::BlockchainDataClient, exchanges,
20};
21use nautilus_common::logging::{
22 logger::{Logger, LoggerConfig},
23 writer::FileWriterConfig,
24};
25use nautilus_core::{UUID4, env::get_env_var};
26use posei_trader::DataClient;
27use nautilus_live::runner::AsyncRunner;
28use nautilus_model::{defi::chain::chains, identifiers::TraderId};
29use tokio::sync::Notify;
30
31#[tokio::main]
34async fn main() -> Result<(), Box<dyn std::error::Error>> {
35 dotenvy::dotenv().ok();
36
37 let _logger_guard = Logger::init_with_config(
38 TraderId::default(),
39 UUID4::new(),
40 LoggerConfig::default(),
41 FileWriterConfig::new(None, None, None, None),
42 )?;
43
44 let _ = AsyncRunner::default(); let notify = Arc::new(Notify::new());
48 let notifier = notify.clone();
49 tokio::spawn(async move {
50 let mut sigterm = tokio::signal::unix::signal(tokio::signal::unix::SignalKind::terminate())
51 .expect("Failed to create SIGTERM listener");
52 let mut sigint = tokio::signal::unix::signal(tokio::signal::unix::SignalKind::interrupt())
53 .expect("Failed to create SIGINT listener");
54 tokio::select! {
55 _ = sigterm.recv() => {}
56 _ = sigint.recv() => {}
57 }
58 log::info!("Shutdown signal received, shutting down...");
59 notifier.notify_one();
60 });
61
62 let chain = Arc::new(chains::ETHEREUM.clone());
64 let weth_usdc_pool = "0x88e6A0c2dDD26FEEb64F039a2c41296FcB3f5640";
66 let pool_creation_block = 12376729;
67 let from_block = Some(22550000);
68
69 let http_rpc_url = get_env_var("RPC_HTTP_URL")?;
70 let blockchain_config = BlockchainDataClientConfig::new(
71 chain.clone(),
72 http_rpc_url,
73 Some(3), None, true, from_block,
77 );
78
79 let mut data_client = BlockchainDataClient::new(blockchain_config);
80 data_client.initialize_cache_database(None).await;
81
82 let univ3 = exchanges::ethereum::UNISWAP_V3.clone();
83 let dex_id = univ3.id();
84
85 data_client.connect().await?;
86 data_client.register_exchange(univ3.clone()).await?;
87
88 loop {
89 tokio::select! {
90 () = notify.notified() => break,
91 () = async {
92 data_client.sync_exchange_pools(
93 dex_id.as_str(),
94 Some(pool_creation_block),
95 Some(pool_creation_block + 1),
96 ).await.unwrap();
97 data_client.sync_pool_swaps(
98 dex_id.as_str(),
99 weth_usdc_pool.to_string(),
100 from_block,
101 None,
102 ).await.unwrap();
103 data_client.sync_pool_mints(
104 dex_id.as_str(),
105 weth_usdc_pool.to_string(),
106 from_block,
107 None,
108 ).await.unwrap();
109 data_client.sync_pool_burns(
110 dex_id.as_str(),
111 weth_usdc_pool.to_string(),
112 from_block,
113 None,
114 ).await.unwrap();
115 } => break,
116 }
117 }
118
119 data_client.disconnect().await?;
120 Ok(())
121}