1use std::{
17 ops::{Deref, DerefMut},
18 str::FromStr,
19 sync::Arc,
20 time::Duration,
21};
22
23use alloy::primitives::Address;
24use nautilus_blockchain::{
25 config::BlockchainDataClientConfig, factories::BlockchainDataClientFactory,
26};
27use nautilus_common::{
28 actor::{DataActor, DataActorCore, data_actor::DataActorConfig},
29 enums::{Environment, LogColor},
30 logging::log_info,
31};
32use nautilus_core::env::get_env_var;
33use nautilus_live::node::LiveNode;
34use nautilus_model::{
35 defi::{Block, Blockchain, Pool, PoolLiquidityUpdate, PoolSwap, chain::chains},
36 identifiers::{ClientId, TraderId},
37};
38
39#[tokio::main]
44async fn main() -> Result<(), Box<dyn std::error::Error>> {
45 dotenvy::dotenv().ok();
46
47 let environment = Environment::Live;
48 let trader_id = TraderId::default();
49 let node_name = "TESTER-001".to_string();
50
51 let chain = chains::ARBITRUM.clone();
52 let wss_rpc_url = get_env_var("RPC_WSS_URL")?;
53 let http_rpc_url = get_env_var("RPC_HTTP_URL")?;
54 let from_block = None; let client_factory = BlockchainDataClientFactory::new();
59 let client_config = BlockchainDataClientConfig::new(
60 Arc::new(chain.clone()),
61 http_rpc_url,
62 None, Some(wss_rpc_url),
64 true, from_block,
67 );
68
69 let mut node = LiveNode::builder(node_name, trader_id, environment)?
70 .with_load_state(false)
71 .with_save_state(false)
72 .add_data_client(
73 None, client_factory,
75 client_config,
76 )?
77 .build()?;
78
79 let client_id = ClientId::new(format!("BLOCKCHAIN-{}", chain.name));
81 let pools = vec![
82 Address::from_str("0xC31E54c7A869B9fCbECC14363CF510d1C41Fa443")?, ];
84
85 let actor_config = BlockchainSubscriberActorConfig::new(client_id, chain.name, pools);
86 let actor = BlockchainSubscriberActor::new(actor_config);
87
88 node.add_actor(actor)?;
89
90 node.run().await?;
91
92 Ok(())
93}
94
95#[derive(Debug, Clone)]
97pub struct BlockchainSubscriberActorConfig {
98 pub base: DataActorConfig,
100 pub client_id: ClientId,
102 pub chain: Blockchain,
104 pub pools: Vec<Address>,
106}
107
108impl BlockchainSubscriberActorConfig {
109 #[must_use]
111 pub fn new(client_id: ClientId, chain: Blockchain, pools: Vec<Address>) -> Self {
112 Self {
113 base: DataActorConfig::default(),
114 client_id,
115 chain,
116 pools,
117 }
118 }
119}
120
121#[derive(Debug)]
127pub struct BlockchainSubscriberActor {
128 core: DataActorCore,
129 config: BlockchainSubscriberActorConfig,
130 pub received_blocks: Vec<Block>,
131 pub received_pool_swaps: Vec<PoolSwap>,
132 pub received_pool_liquidity_updates: Vec<PoolLiquidityUpdate>,
133 pub received_pools: Vec<Pool>,
134}
135
136impl Deref for BlockchainSubscriberActor {
137 type Target = DataActorCore;
138
139 fn deref(&self) -> &Self::Target {
140 &self.core
141 }
142}
143
144impl DerefMut for BlockchainSubscriberActor {
145 fn deref_mut(&mut self) -> &mut Self::Target {
146 &mut self.core
147 }
148}
149
150impl DataActor for BlockchainSubscriberActor {
151 fn on_start(&mut self) -> anyhow::Result<()> {
152 let client_id = self.config.client_id;
153
154 self.subscribe_blocks(self.config.chain, Some(client_id), None);
155
156 let pool_addresses = self.config.pools.clone();
157 for address in pool_addresses {
158 self.subscribe_pool(address, Some(client_id), None);
159 self.subscribe_pool_swaps(address, Some(client_id), None);
160 self.subscribe_pool_liquidity_updates(address, Some(client_id), None);
161 }
162
163 self.clock().set_timer(
164 "TEST-TIMER-1-SECOND",
165 Duration::from_secs(1),
166 None,
167 None,
168 None,
169 Some(true),
170 Some(false),
171 )?;
172
173 self.clock().set_timer(
174 "TEST-TIMER-2-SECOND",
175 Duration::from_secs(2),
176 None,
177 None,
178 None,
179 Some(true),
180 Some(false),
181 )?;
182
183 Ok(())
184 }
185
186 fn on_stop(&mut self) -> anyhow::Result<()> {
187 let client_id = self.config.client_id;
188
189 self.unsubscribe_blocks(self.config.chain, Some(client_id), None);
190
191 let pool_addresses = self.config.pools.clone();
192 for address in pool_addresses {
193 self.unsubscribe_pool(address, Some(client_id), None);
194 self.unsubscribe_pool_swaps(address, Some(client_id), None);
195 self.unsubscribe_pool_liquidity_updates(address, Some(client_id), None);
196 }
197
198 Ok(())
199 }
200
201 fn on_block(&mut self, block: &Block) -> anyhow::Result<()> {
202 log_info!("Received {block}", color = LogColor::Cyan);
203
204 self.received_blocks.push(block.clone());
205 Ok(())
206 }
207
208 fn on_pool_swap(&mut self, swap: &PoolSwap) -> anyhow::Result<()> {
209 log_info!("Received {swap}", color = LogColor::Cyan);
210
211 self.received_pool_swaps.push(swap.clone());
212 Ok(())
213 }
214}
215
216impl BlockchainSubscriberActor {
217 #[must_use]
219 pub fn new(config: BlockchainSubscriberActorConfig) -> Self {
220 Self {
221 core: DataActorCore::new(config.base.clone()),
222 config,
223 received_blocks: Vec::new(),
224 received_pool_swaps: Vec::new(),
225 received_pool_liquidity_updates: Vec::new(),
226 received_pools: Vec::new(),
227 }
228 }
229
230 #[must_use]
232 pub const fn block_count(&self) -> usize {
233 self.received_blocks.len()
234 }
235
236 #[must_use]
238 pub const fn pool_count(&self) -> usize {
239 self.received_pools.len()
240 }
241
242 #[must_use]
244 pub const fn pool_swap_count(&self) -> usize {
245 self.received_pool_swaps.len()
246 }
247
248 #[must_use]
250 pub const fn pool_liquidity_update_count(&self) -> usize {
251 self.received_pool_liquidity_updates.len()
252 }
253}