databento_node_test/
node_test.rs1use std::{
17 ops::{Deref, DerefMut},
18 path::PathBuf,
19 time::Duration,
20};
21
22use nautilus_common::{
23 actor::{DataActor, DataActorCore, data_actor::DataActorConfig},
24 enums::Environment,
25 timer::TimeEvent,
26};
27use nautilus_core::env::get_env_var;
28use posei_traderbento::factories::{DatabentoDataClientFactory, DatabentoLiveClientConfig};
29use nautilus_live::node::LiveNode;
30use nautilus_model::{
31 data::{QuoteTick, TradeTick},
32 identifiers::{ClientId, InstrumentId, TraderId},
33};
34
35#[tokio::main]
38async fn main() -> Result<(), Box<dyn std::error::Error>> {
39 dotenvy::dotenv().ok();
40
41 let environment = Environment::Live;
42 let trader_id = TraderId::default();
43 let node_name = "DATABENTO-TESTER-001".to_string();
44
45 let api_key = get_env_var("DATABENTO_API_KEY").unwrap_or_else(|_| {
47 println!("⚠️ DATABENTO_API_KEY not found, using placeholder");
48 "db-placeholder-key".to_string()
49 });
50
51 let publishers_filepath = PathBuf::from(env!("CARGO_MANIFEST_DIR")).join("publishers.json");
53 if !publishers_filepath.exists() {
54 println!(
55 "⚠️ Publishers file not found at: {}",
56 publishers_filepath.display()
57 );
58 }
59
60 let databento_config = DatabentoLiveClientConfig::new(
62 api_key,
63 publishers_filepath,
64 true, true, );
67
68 let client_factory = DatabentoDataClientFactory::new();
69
70 let client_id = ClientId::new("DATABENTO");
72 let instrument_ids = vec![
73 InstrumentId::from("ESM5.XCME"),
74 ];
76
77 let mut node = LiveNode::builder(node_name, trader_id, environment)?
79 .with_load_state(false)
80 .with_save_state(false)
81 .add_data_client(None, client_factory, databento_config)?
82 .build()?;
83
84 let actor_config = DatabentoSubscriberActorConfig::new(client_id, instrument_ids);
85 let actor = DatabentoSubscriberActor::new(actor_config);
86
87 node.add_actor(actor)?;
88
89 node.run().await?;
90
91 Ok(())
92}
93
94#[derive(Debug, Clone)]
96pub struct DatabentoSubscriberActorConfig {
97 pub base: DataActorConfig,
99 pub client_id: ClientId,
101 pub instrument_ids: Vec<InstrumentId>,
103}
104
105impl DatabentoSubscriberActorConfig {
106 #[must_use]
108 pub fn new(client_id: ClientId, instrument_ids: Vec<InstrumentId>) -> Self {
109 Self {
110 base: DataActorConfig::default(),
111 client_id,
112 instrument_ids,
113 }
114 }
115}
116
117#[derive(Debug)]
123pub struct DatabentoSubscriberActor {
124 core: DataActorCore,
125 config: DatabentoSubscriberActorConfig,
126 pub received_quotes: Vec<QuoteTick>,
127 pub received_trades: Vec<TradeTick>,
128}
129
130impl Deref for DatabentoSubscriberActor {
131 type Target = DataActorCore;
132
133 fn deref(&self) -> &Self::Target {
134 &self.core
135 }
136}
137
138impl DerefMut for DatabentoSubscriberActor {
139 fn deref_mut(&mut self) -> &mut Self::Target {
140 &mut self.core
141 }
142}
143
144impl DataActor for DatabentoSubscriberActor {
145 fn on_start(&mut self) -> anyhow::Result<()> {
146 let instrument_ids = self.config.instrument_ids.clone();
147 let client_id = self.config.client_id;
148
149 for instrument_id in instrument_ids {
151 log::info!("Subscribing to quotes for {instrument_id}");
152 self.subscribe_quotes(instrument_id, Some(client_id), None);
153
154 log::info!("Subscribing to trades for {instrument_id}");
155 self.subscribe_trades(instrument_id, Some(client_id), None);
156 }
157
158 self.clock().set_timer(
159 "TEST-TIMER-1-SECOND",
160 Duration::from_secs(1),
161 None,
162 None,
163 None,
164 Some(true),
165 Some(false),
166 )?;
167
168 self.clock().set_timer(
169 "TEST-TIMER-2-SECOND",
170 Duration::from_secs(2),
171 None,
172 None,
173 None,
174 Some(true),
175 Some(false),
176 )?;
177
178 Ok(())
179 }
180
181 fn on_stop(&mut self) -> anyhow::Result<()> {
182 let instrument_ids = self.config.instrument_ids.clone();
183 let client_id = self.config.client_id;
184
185 for instrument_id in instrument_ids {
187 log::info!("Unsubscribing from quotes for {instrument_id}");
188 self.unsubscribe_quotes(instrument_id, Some(client_id), None);
189
190 log::info!("Unsubscribing from trades for {instrument_id}");
191 self.unsubscribe_trades(instrument_id, Some(client_id), None);
192 }
193
194 Ok(())
195 }
196
197 fn on_time_event(&mut self, event: &TimeEvent) -> anyhow::Result<()> {
198 log::info!("Received time event: {event:?}");
199 Ok(())
200 }
201
202 fn on_quote(&mut self, quote: &QuoteTick) -> anyhow::Result<()> {
203 log::info!("Received quote: {quote:?}");
204 self.received_quotes.push(*quote);
205 Ok(())
206 }
207
208 fn on_trade(&mut self, trade: &TradeTick) -> anyhow::Result<()> {
209 log::info!("Received trade: {trade:?}");
210 self.received_trades.push(*trade);
211 Ok(())
212 }
213}
214
215impl DatabentoSubscriberActor {
216 #[must_use]
218 pub fn new(config: DatabentoSubscriberActorConfig) -> Self {
219 Self {
220 core: DataActorCore::new(config.base.clone()),
221 config,
222 received_quotes: Vec::new(),
223 received_trades: Vec::new(),
224 }
225 }
226
227 #[must_use]
229 pub const fn quote_count(&self) -> usize {
230 self.received_quotes.len()
231 }
232
233 #[must_use]
235 pub const fn trade_count(&self) -> usize {
236 self.received_trades.len()
237 }
238}