databento_node_test/
node_test.rs

1// -------------------------------------------------------------------------------------------------
2//  Copyright (C) 2015-2025 Posei Systems Pty Ltd. All rights reserved.
3//  https://poseitrader.io
4//
5//  Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
6//  You may not use this file except in compliance with the License.
7//  You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
8//
9//  Unless required by applicable law or agreed to in writing, software
10//  distributed under the License is distributed on an "AS IS" BASIS,
11//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12//  See the License for the specific language governing permissions and
13//  limitations under the License.
14// -------------------------------------------------------------------------------------------------
15
16use std::{
17    ops::{Deref, DerefMut},
18    path::PathBuf,
19    time::Duration,
20};
21
22use nautilus_common::{
23    actor::{DataActor, DataActorCore, data_actor::DataActorConfig},
24    enums::Environment,
25    timer::TimeEvent,
26};
27use nautilus_core::env::get_env_var;
28use posei_traderbento::factories::{DatabentoDataClientFactory, DatabentoLiveClientConfig};
29use nautilus_live::node::LiveNode;
30use nautilus_model::{
31    data::{QuoteTick, TradeTick},
32    identifiers::{ClientId, InstrumentId, TraderId},
33};
34
35// Run with `cargo run --bin databento-node-test --features high-precision`
36
37#[tokio::main]
38async fn main() -> Result<(), Box<dyn std::error::Error>> {
39    dotenvy::dotenv().ok();
40
41    let environment = Environment::Live;
42    let trader_id = TraderId::default();
43    let node_name = "DATABENTO-TESTER-001".to_string();
44
45    // Get Databento API key from environment
46    let api_key = get_env_var("DATABENTO_API_KEY").unwrap_or_else(|_| {
47        println!("⚠️  DATABENTO_API_KEY not found, using placeholder");
48        "db-placeholder-key".to_string()
49    });
50
51    // Determine publishers file path
52    let publishers_filepath = PathBuf::from(env!("CARGO_MANIFEST_DIR")).join("publishers.json");
53    if !publishers_filepath.exists() {
54        println!(
55            "⚠️  Publishers file not found at: {}",
56            publishers_filepath.display()
57        );
58    }
59
60    // Configure Databento client
61    let databento_config = DatabentoLiveClientConfig::new(
62        api_key,
63        publishers_filepath,
64        true, // use_exchange_as_venue
65        true, // bars_timestamp_on_close
66    );
67
68    let client_factory = DatabentoDataClientFactory::new();
69
70    // Create and register a Databento subscriber actor
71    let client_id = ClientId::new("DATABENTO");
72    let instrument_ids = vec![
73        InstrumentId::from("ESM5.XCME"),
74        // Add more instruments as needed
75    ];
76
77    // Build the live node with Databento data client
78    let mut node = LiveNode::builder(node_name, trader_id, environment)?
79        .with_load_state(false)
80        .with_save_state(false)
81        .add_data_client(None, client_factory, databento_config)?
82        .build()?;
83
84    let actor_config = DatabentoSubscriberActorConfig::new(client_id, instrument_ids);
85    let actor = DatabentoSubscriberActor::new(actor_config);
86
87    node.add_actor(actor)?;
88
89    node.run().await?;
90
91    Ok(())
92}
93
94/// Configuration for the Databento subscriber actor.
95#[derive(Debug, Clone)]
96pub struct DatabentoSubscriberActorConfig {
97    /// Base data actor configuration.
98    pub base: DataActorConfig,
99    /// Client ID to use for subscriptions.
100    pub client_id: ClientId,
101    /// Instrument IDs to subscribe to.
102    pub instrument_ids: Vec<InstrumentId>,
103}
104
105impl DatabentoSubscriberActorConfig {
106    /// Creates a new [`DatabentoSubscriberActorConfig`] instance.
107    #[must_use]
108    pub fn new(client_id: ClientId, instrument_ids: Vec<InstrumentId>) -> Self {
109        Self {
110            base: DataActorConfig::default(),
111            client_id,
112            instrument_ids,
113        }
114    }
115}
116
117/// A basic Databento subscriber actor that subscribes to quotes and trades.
118///
119/// This actor demonstrates how to use the `DataActor` trait to subscribe to market data
120/// from Databento for specified instruments. It logs received quotes and trades to
121/// demonstrate the data flow.
122#[derive(Debug)]
123pub struct DatabentoSubscriberActor {
124    core: DataActorCore,
125    config: DatabentoSubscriberActorConfig,
126    pub received_quotes: Vec<QuoteTick>,
127    pub received_trades: Vec<TradeTick>,
128}
129
130impl Deref for DatabentoSubscriberActor {
131    type Target = DataActorCore;
132
133    fn deref(&self) -> &Self::Target {
134        &self.core
135    }
136}
137
138impl DerefMut for DatabentoSubscriberActor {
139    fn deref_mut(&mut self) -> &mut Self::Target {
140        &mut self.core
141    }
142}
143
144impl DataActor for DatabentoSubscriberActor {
145    fn on_start(&mut self) -> anyhow::Result<()> {
146        let instrument_ids = self.config.instrument_ids.clone();
147        let client_id = self.config.client_id;
148
149        // Subscribe to quotes and trades for each instrument
150        for instrument_id in instrument_ids {
151            log::info!("Subscribing to quotes for {instrument_id}");
152            self.subscribe_quotes(instrument_id, Some(client_id), None);
153
154            log::info!("Subscribing to trades for {instrument_id}");
155            self.subscribe_trades(instrument_id, Some(client_id), None);
156        }
157
158        self.clock().set_timer(
159            "TEST-TIMER-1-SECOND",
160            Duration::from_secs(1),
161            None,
162            None,
163            None,
164            Some(true),
165            Some(false),
166        )?;
167
168        self.clock().set_timer(
169            "TEST-TIMER-2-SECOND",
170            Duration::from_secs(2),
171            None,
172            None,
173            None,
174            Some(true),
175            Some(false),
176        )?;
177
178        Ok(())
179    }
180
181    fn on_stop(&mut self) -> anyhow::Result<()> {
182        let instrument_ids = self.config.instrument_ids.clone();
183        let client_id = self.config.client_id;
184
185        // Unsubscribe to quotes and trades for each instrument
186        for instrument_id in instrument_ids {
187            log::info!("Unsubscribing from quotes for {instrument_id}");
188            self.unsubscribe_quotes(instrument_id, Some(client_id), None);
189
190            log::info!("Unsubscribing from trades for {instrument_id}");
191            self.unsubscribe_trades(instrument_id, Some(client_id), None);
192        }
193
194        Ok(())
195    }
196
197    fn on_time_event(&mut self, event: &TimeEvent) -> anyhow::Result<()> {
198        log::info!("Received time event: {event:?}");
199        Ok(())
200    }
201
202    fn on_quote(&mut self, quote: &QuoteTick) -> anyhow::Result<()> {
203        log::info!("Received quote: {quote:?}");
204        self.received_quotes.push(*quote);
205        Ok(())
206    }
207
208    fn on_trade(&mut self, trade: &TradeTick) -> anyhow::Result<()> {
209        log::info!("Received trade: {trade:?}");
210        self.received_trades.push(*trade);
211        Ok(())
212    }
213}
214
215impl DatabentoSubscriberActor {
216    /// Creates a new [`DatabentoSubscriberActor`] instance.
217    #[must_use]
218    pub fn new(config: DatabentoSubscriberActorConfig) -> Self {
219        Self {
220            core: DataActorCore::new(config.base.clone()),
221            config,
222            received_quotes: Vec::new(),
223            received_trades: Vec::new(),
224        }
225    }
226
227    /// Returns the number of quotes received by this actor.
228    #[must_use]
229    pub const fn quote_count(&self) -> usize {
230        self.received_quotes.len()
231    }
232
233    /// Returns the number of trades received by this actor.
234    #[must_use]
235    pub const fn trade_count(&self) -> usize {
236        self.received_trades.len()
237    }
238}