node_test/
node_test.rs

1// -------------------------------------------------------------------------------------------------
2//  Copyright (C) 2015-2025 Posei Systems Pty Ltd. All rights reserved.
3//  https://poseitrader.io
4//
5//  Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
6//  You may not use this file except in compliance with the License.
7//  You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
8//
9//  Unless required by applicable law or agreed to in writing, software
10//  distributed under the License is distributed on an "AS IS" BASIS,
11//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12//  See the License for the specific language governing permissions and
13//  limitations under the License.
14// -------------------------------------------------------------------------------------------------
15
16use std::{
17    ops::{Deref, DerefMut},
18    str::FromStr,
19    sync::Arc,
20    time::Duration,
21};
22
23use alloy::primitives::Address;
24use nautilus_blockchain::{
25    config::BlockchainDataClientConfig, factories::BlockchainDataClientFactory,
26};
27use nautilus_common::{
28    actor::{DataActor, DataActorCore, data_actor::DataActorConfig},
29    enums::{Environment, LogColor},
30    logging::log_info,
31};
32use nautilus_core::env::get_env_var;
33use nautilus_live::node::LiveNode;
34use nautilus_model::{
35    defi::{Block, Blockchain, Pool, PoolLiquidityUpdate, PoolSwap, chain::chains},
36    identifiers::{ClientId, TraderId},
37};
38
// Requires `capnp` to be installed on the machine.
// Run with: `cargo run -p nautilus-blockchain --bin node_test --features hypersync`
// To see additional tracing logs, first run `export RUST_LOG=debug,h2=off`.
42
43#[tokio::main]
44async fn main() -> Result<(), Box<dyn std::error::Error>> {
45    dotenvy::dotenv().ok();
46
47    let environment = Environment::Live;
48    let trader_id = TraderId::default();
49    let node_name = "TESTER-001".to_string();
50
51    let chain = chains::ARBITRUM.clone();
52    let wss_rpc_url = get_env_var("RPC_WSS_URL")?;
53    let http_rpc_url = get_env_var("RPC_HTTP_URL")?;
54    // let from_block = Some(22_735_000_u64); // Ethereum
55    // let from_block = Some(348_860_000_u64); // Arbitrum
56    let from_block = None; // No sync
57
58    let client_factory = BlockchainDataClientFactory::new();
59    let client_config = BlockchainDataClientConfig::new(
60        Arc::new(chain.clone()),
61        http_rpc_url,
62        None, // RPC requests per second
63        Some(wss_rpc_url),
64        true, // Use HyperSync for live data
65        // Some(from_block), // from_block
66        from_block,
67    );
68
69    let mut node = LiveNode::builder(node_name, trader_id, environment)?
70        .with_load_state(false)
71        .with_save_state(false)
72        .add_data_client(
73            None, // Use factory name
74            client_factory,
75            client_config,
76        )?
77        .build()?;
78
79    // Create and register a blockchain subscriber actor
80    let client_id = ClientId::new(format!("BLOCKCHAIN-{}", chain.name));
81    let pools = vec![
82        Address::from_str("0xC31E54c7A869B9fCbECC14363CF510d1C41Fa443")?, // WETH/USDC Arbitrum One
83    ];
84
85    let actor_config = BlockchainSubscriberActorConfig::new(client_id, chain.name, pools);
86    let actor = BlockchainSubscriberActor::new(actor_config);
87
88    node.add_actor(actor)?;
89
90    node.run().await?;
91
92    Ok(())
93}
94
95/// Configuration for the blockchain subscriber actor.
96#[derive(Debug, Clone)]
97pub struct BlockchainSubscriberActorConfig {
98    /// Base data actor configuration.
99    pub base: DataActorConfig,
100    /// Client ID to use for subscriptions.
101    pub client_id: ClientId,
102    /// The blockchain to subscribe for.
103    pub chain: Blockchain,
104    /// Pool addresses to monitor for swaps and liquidity updates.
105    pub pools: Vec<Address>,
106}
107
108impl BlockchainSubscriberActorConfig {
109    /// Creates a new [`BlockchainSubscriberActorConfig`] instance.
110    #[must_use]
111    pub fn new(client_id: ClientId, chain: Blockchain, pools: Vec<Address>) -> Self {
112        Self {
113            base: DataActorConfig::default(),
114            client_id,
115            chain,
116            pools,
117        }
118    }
119}
120
121/// A basic blockchain subscriber actor that monitors DeFi activities.
122///
123/// This actor demonstrates how to use the `DataActor` trait to monitor blockchain data
124/// from DEXs, pools, and other DeFi protocols. It logs received blocks and swaps
125/// to demonstrate the data flow.
126#[derive(Debug)]
127pub struct BlockchainSubscriberActor {
128    core: DataActorCore,
129    config: BlockchainSubscriberActorConfig,
130    pub received_blocks: Vec<Block>,
131    pub received_pool_swaps: Vec<PoolSwap>,
132    pub received_pool_liquidity_updates: Vec<PoolLiquidityUpdate>,
133    pub received_pools: Vec<Pool>,
134}
135
136impl Deref for BlockchainSubscriberActor {
137    type Target = DataActorCore;
138
139    fn deref(&self) -> &Self::Target {
140        &self.core
141    }
142}
143
144impl DerefMut for BlockchainSubscriberActor {
145    fn deref_mut(&mut self) -> &mut Self::Target {
146        &mut self.core
147    }
148}
149
150impl DataActor for BlockchainSubscriberActor {
151    fn on_start(&mut self) -> anyhow::Result<()> {
152        let client_id = self.config.client_id;
153
154        self.subscribe_blocks(self.config.chain, Some(client_id), None);
155
156        let pool_addresses = self.config.pools.clone();
157        for address in pool_addresses {
158            self.subscribe_pool(address, Some(client_id), None);
159            self.subscribe_pool_swaps(address, Some(client_id), None);
160            self.subscribe_pool_liquidity_updates(address, Some(client_id), None);
161        }
162
163        self.clock().set_timer(
164            "TEST-TIMER-1-SECOND",
165            Duration::from_secs(1),
166            None,
167            None,
168            None,
169            Some(true),
170            Some(false),
171        )?;
172
173        self.clock().set_timer(
174            "TEST-TIMER-2-SECOND",
175            Duration::from_secs(2),
176            None,
177            None,
178            None,
179            Some(true),
180            Some(false),
181        )?;
182
183        Ok(())
184    }
185
186    fn on_stop(&mut self) -> anyhow::Result<()> {
187        let client_id = self.config.client_id;
188
189        self.unsubscribe_blocks(self.config.chain, Some(client_id), None);
190
191        let pool_addresses = self.config.pools.clone();
192        for address in pool_addresses {
193            self.unsubscribe_pool(address, Some(client_id), None);
194            self.unsubscribe_pool_swaps(address, Some(client_id), None);
195            self.unsubscribe_pool_liquidity_updates(address, Some(client_id), None);
196        }
197
198        Ok(())
199    }
200
201    fn on_block(&mut self, block: &Block) -> anyhow::Result<()> {
202        log_info!("Received {block}", color = LogColor::Cyan);
203
204        self.received_blocks.push(block.clone());
205        Ok(())
206    }
207
208    fn on_pool_swap(&mut self, swap: &PoolSwap) -> anyhow::Result<()> {
209        log_info!("Received {swap}", color = LogColor::Cyan);
210
211        self.received_pool_swaps.push(swap.clone());
212        Ok(())
213    }
214}
215
216impl BlockchainSubscriberActor {
217    /// Creates a new [`BlockchainSubscriberActor`] instance.
218    #[must_use]
219    pub fn new(config: BlockchainSubscriberActorConfig) -> Self {
220        Self {
221            core: DataActorCore::new(config.base.clone()),
222            config,
223            received_blocks: Vec::new(),
224            received_pool_swaps: Vec::new(),
225            received_pool_liquidity_updates: Vec::new(),
226            received_pools: Vec::new(),
227        }
228    }
229
230    /// Returns the number of pools received by this actor.
231    #[must_use]
232    pub const fn block_count(&self) -> usize {
233        self.received_blocks.len()
234    }
235
236    /// Returns the number of pools received by this actor.
237    #[must_use]
238    pub const fn pool_count(&self) -> usize {
239        self.received_pools.len()
240    }
241
242    /// Returns the number of swaps received by this actor.
243    #[must_use]
244    pub const fn pool_swap_count(&self) -> usize {
245        self.received_pool_swaps.len()
246    }
247
248    /// Returns the number of liquidity updates received by this actor.
249    #[must_use]
250    pub const fn pool_liquidity_update_count(&self) -> usize {
251        self.received_pool_liquidity_updates.len()
252    }
253}