live_blocks_rpc/
watch_rpc_live_blocks.rs1use std::sync::Arc;
17
18use nautilus_blockchain::{config::BlockchainAdapterConfig, data::BlockchainDataClient};
19use nautilus_common::logging::{
20 logger::{Logger, LoggerConfig},
21 writer::FileWriterConfig,
22};
23use nautilus_core::{UUID4, env::get_env_var};
24use nautilus_model::{
25 defi::chain::{Blockchain, Chain, chains},
26 identifiers::TraderId,
27};
28use tokio::sync::Notify;
29
30#[tokio::main]
31async fn main() -> Result<(), Box<dyn std::error::Error>> {
32 dotenvy::dotenv().ok();
33 let _logger_guard = Logger::init_with_config(
35 TraderId::default(),
36 UUID4::new(),
37 LoggerConfig::default(),
38 FileWriterConfig::new(None, None, None, None),
39 )?;
40
41 let notify = Arc::new(Notify::new());
43 let notifier = notify.clone();
44 tokio::spawn(async move {
45 let mut sigterm = tokio::signal::unix::signal(tokio::signal::unix::SignalKind::terminate())
46 .expect("Failed to create SIGTERM listener");
47 let mut sigint = tokio::signal::unix::signal(tokio::signal::unix::SignalKind::interrupt())
48 .expect("Failed to create SIGINT listener");
49 tokio::select! {
50 _ = sigterm.recv() => {}
51 _ = sigint.recv() => {}
52 }
53 log::info!("Shutdown signal received, shutting down...");
54 notifier.notify_one();
55 });
56
57 let chain: Chain = match std::env::var("CHAIN") {
59 Ok(chain_str) => {
60 if let Ok(blockchain) = chain_str.parse::<Blockchain>() {
61 match blockchain {
62 Blockchain::Ethereum => chains::ETHEREUM.clone(),
63 Blockchain::Base => chains::BASE.clone(),
64 Blockchain::Arbitrum => chains::ARBITRUM.clone(),
65 Blockchain::Polygon => chains::POLYGON.clone(),
66 _ => panic!("Invalid chain {chain_str}"),
67 }
68 } else {
69 panic!("Invalid chain {chain_str}");
70 }
71 }
72 Err(_) => chains::ETHEREUM.clone(), };
74 let chain = Arc::new(chain);
75 let wss_rpc_url = get_env_var("RPC_WSS_URL")?;
76 let http_rpc_url = get_env_var("RPC_HTTP_URL")?;
77 let blockchain_config =
78 BlockchainAdapterConfig::new(http_rpc_url, None, Some(wss_rpc_url), false);
79 let mut data_client = BlockchainDataClient::new(chain.clone(), blockchain_config);
80 data_client.connect().await?;
81 data_client.subscribe_blocks().await;
82
83 loop {
85 tokio::select! {
86 () = notify.notified() => break,
87 () = data_client.process_rpc_message() => {}
88 }
89 }
90 data_client.disconnect()?;
91 Ok(())
92}