live_blocks_rpc/
watch_rpc_live_blocks.rs

1// -------------------------------------------------------------------------------------------------
2//  Copyright (C) 2015-2025 Posei Systems Pty Ltd. All rights reserved.
3//  https://poseitrader.io
4//
5//  Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
6//  You may not use this file except in compliance with the License.
7//  You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
8//
9//  Unless required by applicable law or agreed to in writing, software
10//  distributed under the License is distributed on an "AS IS" BASIS,
11//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12//  See the License for the specific language governing permissions and
13//  limitations under the License.
14// -------------------------------------------------------------------------------------------------
15
16use std::sync::Arc;
17
18use nautilus_blockchain::{config::BlockchainDataClientConfig, data::BlockchainDataClient};
19use nautilus_common::logging::{
20    logger::{Logger, LoggerConfig},
21    writer::FileWriterConfig,
22};
23use nautilus_core::{UUID4, env::get_env_var};
24use posei_trader::DataClient;
25use nautilus_live::runner::AsyncRunner;
26use nautilus_model::{
27    defi::chain::{Blockchain, Chain, chains},
28    identifiers::TraderId,
29};
30use tokio::sync::Notify;
31
32// Run with `cargo run -p nautilus-blockchain --bin live_blocks_hypersync --features hypersync`
33
34#[tokio::main]
35async fn main() -> Result<(), Box<dyn std::error::Error>> {
36    dotenvy::dotenv().ok();
37
38    let _logger_guard = Logger::init_with_config(
39        TraderId::default(),
40        UUID4::new(),
41        LoggerConfig::default(),
42        FileWriterConfig::new(None, None, None, None),
43    )?;
44
45    let _ = AsyncRunner::default(); // Needed for live channels
46
47    // Setup graceful shutdown with signal handling in different task
48    let notify = Arc::new(Notify::new());
49    let notifier = notify.clone();
50    tokio::spawn(async move {
51        let mut sigterm = tokio::signal::unix::signal(tokio::signal::unix::SignalKind::terminate())
52            .expect("Failed to create SIGTERM listener");
53        let mut sigint = tokio::signal::unix::signal(tokio::signal::unix::SignalKind::interrupt())
54            .expect("Failed to create SIGINT listener");
55        tokio::select! {
56            _ = sigterm.recv() => {}
57            _ = sigint.recv() => {}
58        }
59        log::info!("Shutdown signal received, shutting down...");
60        notifier.notify_one();
61    });
62
63    // Initialize the blockchain data client, connect and subscribe to live blocks with RPC
64    let chain: Chain = match std::env::var("CHAIN") {
65        Ok(chain_str) => {
66            if let Ok(blockchain) = chain_str.parse::<Blockchain>() {
67                match blockchain {
68                    Blockchain::Ethereum => chains::ETHEREUM.clone(),
69                    Blockchain::Base => chains::BASE.clone(),
70                    Blockchain::Arbitrum => chains::ARBITRUM.clone(),
71                    Blockchain::Polygon => chains::POLYGON.clone(),
72                    _ => panic!("Invalid chain {chain_str}"),
73                }
74            } else {
75                panic!("Invalid chain {chain_str}");
76            }
77        }
78        Err(_) => chains::ETHEREUM.clone(), // default
79    };
80    let chain = Arc::new(chain);
81    let wss_rpc_url = get_env_var("RPC_WSS_URL")?;
82    let http_rpc_url = get_env_var("RPC_HTTP_URL")?;
83    let blockchain_config = BlockchainDataClientConfig::new(
84        chain.clone(),
85        http_rpc_url,
86        None, // RPC requests per second
87        Some(wss_rpc_url),
88        false, // Don't use hypersync for live data
89        None,  // from_block
90    );
91
92    let mut data_client = BlockchainDataClient::new(blockchain_config);
93
94    data_client.connect().await?;
95    data_client.subscribe_blocks_async().await?;
96
97    loop {
98        tokio::select! {
99            () = notify.notified() => break,
100            () = data_client.process_rpc_messages() => {}
101        }
102    }
103
104    data_client.disconnect().await?;
105    Ok(())
106}