live_blocks_hypersync/
watch_hypersync_live_blocks.rs

1// -------------------------------------------------------------------------------------------------
2//  Copyright (C) 2015-2025 Posei Systems Pty Ltd. All rights reserved.
3//  https://poseitrader.io
4//
5//  Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
6//  You may not use this file except in compliance with the License.
7//  You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
8//
9//  Unless required by applicable law or agreed to in writing, software
10//  distributed under the License is distributed on an "AS IS" BASIS,
11//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12//  See the License for the specific language governing permissions and
13//  limitations under the License.
14// -------------------------------------------------------------------------------------------------
15
16use std::sync::Arc;
17
18use nautilus_blockchain::{config::BlockchainAdapterConfig, data::BlockchainDataClient};
19use nautilus_common::logging::{
20    logger::{Logger, LoggerConfig},
21    writer::FileWriterConfig,
22};
23use nautilus_core::{UUID4, env::get_env_var};
24use nautilus_model::{
25    defi::chain::{Blockchain, Chain, chains},
26    identifiers::TraderId,
27};
28use tokio::sync::Notify;
29
30#[tokio::main]
31async fn main() -> Result<(), Box<dyn std::error::Error>> {
32    dotenvy::dotenv().ok();
33    // Setup logger
34    let _logger_guard = Logger::init_with_config(
35        TraderId::default(),
36        UUID4::new(),
37        LoggerConfig::default(),
38        FileWriterConfig::new(None, None, None, None),
39    )?;
40
41    // Setup graceful shutdown with signal handling in a different task
42    let notify = Arc::new(Notify::new());
43    let notifier = notify.clone();
44    tokio::spawn(async move {
45        let mut sigterm = tokio::signal::unix::signal(tokio::signal::unix::SignalKind::terminate())
46            .expect("Failed to create SIGTERM listener");
47        let mut sigint = tokio::signal::unix::signal(tokio::signal::unix::SignalKind::interrupt())
48            .expect("Failed to create SIGINT listener");
49        tokio::select! {
50            _ = sigterm.recv() => {}
51            _ = sigint.recv() => {}
52        }
53        log::info!("Shutdown signal received, shutting down...");
54        notifier.notify_one();
55    });
56
57    // Initialize the blockchain data client, connect and subscribe to live blocks with HyperSync watch
58    let chain: Chain = match std::env::var("CHAIN") {
59        Ok(chain_str) => {
60            if let Ok(blockchain) = chain_str.parse::<Blockchain>() {
61                match blockchain {
62                    Blockchain::Ethereum => chains::ETHEREUM.clone(),
63                    Blockchain::Base => chains::BASE.clone(),
64                    Blockchain::Arbitrum => chains::ARBITRUM.clone(),
65                    Blockchain::Polygon => chains::POLYGON.clone(),
66                    _ => panic!("Invalid chain {chain_str}"),
67                }
68            } else {
69                panic!("Invalid chain {chain_str}");
70            }
71        }
72        Err(_) => chains::ETHEREUM.clone(), // default
73    };
74    let chain = Arc::new(chain);
75    let http_rpc_url = get_env_var("RPC_HTTP_URL")?;
76    let blockchain_config = BlockchainAdapterConfig::new(http_rpc_url, None, None, true);
77    let mut data_client = BlockchainDataClient::new(chain.clone(), blockchain_config);
78    data_client.connect().await?;
79    data_client.subscribe_blocks().await;
80
81    // Main loop to keep the app running
82    loop {
83        tokio::select! {
84            () = notify.notified() => break,
85            () = data_client.process_hypersync_message() => {}
86        }
87    }
88    data_client.disconnect()?;
89    Ok(())
90}