nautilus_tardis/
replay.rs

1// -------------------------------------------------------------------------------------------------
2//  Copyright (C) 2015-2025 Posei Systems Pty Ltd. All rights reserved.
3//  https://poseitrader.io
4//
5//  Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
6//  You may not use this file except in compliance with the License.
7//  You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
8//
9//  Unless required by applicable law or agreed to in writing, software
10//  distributed under the License is distributed on an "AS IS" BASIS,
11//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12//  See the License for the specific language governing permissions and
13//  limitations under the License.
14// -------------------------------------------------------------------------------------------------
15
16use std::{
17    collections::HashMap,
18    fs,
19    path::{Path, PathBuf},
20};
21
22use anyhow::Context;
23use arrow::array::RecordBatch;
24use chrono::{DateTime, Duration, NaiveDate};
25use futures_util::{StreamExt, future::join_all, pin_mut};
26use heck::ToSnakeCase;
27use nautilus_core::{UnixNanos, parsing::precision_from_str};
28use nautilus_model::{
29    data::{
30        Bar, BarType, Data, OrderBookDelta, OrderBookDeltas_API, OrderBookDepth10, QuoteTick,
31        TradeTick,
32    },
33    identifiers::InstrumentId,
34};
35use nautilus_serialization::{
36    arrow::{
37        bars_to_arrow_record_batch_bytes, book_deltas_to_arrow_record_batch_bytes,
38        book_depth10_to_arrow_record_batch_bytes, quotes_to_arrow_record_batch_bytes,
39        trades_to_arrow_record_batch_bytes,
40    },
41    parquet::write_batch_to_parquet,
42};
43use thousands::Separable;
44use ustr::Ustr;
45
46use super::{enums::Exchange, http::models::InstrumentInfo};
47use crate::{
48    config::TardisReplayConfig,
49    http::TardisHttpClient,
50    machine::{TardisMachineClient, types::InstrumentMiniInfo},
51    parse::{normalize_instrument_id, parse_instrument_id},
52};
53
54struct DateCursor {
55    /// Cursor date UTC.
56    date_utc: NaiveDate,
57    /// Cursor end timestamp UNIX nanoseconds.
58    end_ns: UnixNanos,
59}
60
61impl DateCursor {
62    /// Creates a new [`DateCursor`] instance.
63    fn new(current_ns: UnixNanos) -> Self {
64        let current_utc = DateTime::from_timestamp_nanos(current_ns.as_i64());
65        let date_utc = current_utc.date_naive();
66
67        // Calculate end of the current UTC day
68        // SAFETY: Known safe input values
69        let end_utc =
70            date_utc.and_hms_opt(23, 59, 59).unwrap() + Duration::nanoseconds(999_999_999);
71        let end_ns = UnixNanos::from(end_utc.and_utc().timestamp_nanos_opt().unwrap() as u64);
72
73        Self { date_utc, end_ns }
74    }
75}
76
77async fn gather_instruments_info(
78    config: &TardisReplayConfig,
79    http_client: &TardisHttpClient,
80) -> HashMap<Exchange, Vec<InstrumentInfo>> {
81    let futures = config.options.iter().map(|options| {
82        let exchange = options.exchange.clone();
83        let client = &http_client;
84
85        tracing::info!("Requesting instruments for {exchange}");
86
87        async move {
88            match client.instruments_info(exchange.clone(), None, None).await {
89                Ok(instruments) => Some((exchange, instruments)),
90                Err(e) => {
91                    tracing::error!("Error fetching instruments for {exchange}: {e}");
92                    None
93                }
94            }
95        }
96    });
97
98    let results: Vec<(Exchange, Vec<InstrumentInfo>)> =
99        join_all(futures).await.into_iter().flatten().collect();
100
101    tracing::info!("Received all instruments");
102
103    results.into_iter().collect()
104}
105
/// Run the Tardis Machine replay from a JSON configuration file.
///
/// # Errors
///
/// Returns an error if reading or parsing the config file fails,
/// or if any downstream replay operation fails.
///
/// # Panics
///
/// Panics if unable to determine the output path (current directory fallback fails).
pub async fn run_tardis_machine_replay_from_config(config_filepath: &Path) -> anyhow::Result<()> {
    tracing::info!("Starting replay");
    tracing::info!("Config filepath: {config_filepath:?}");

    // Load and parse the replay configuration
    let config_data = fs::read_to_string(config_filepath)
        .with_context(|| format!("Failed to read config file: {config_filepath:?}"))?;
    let config: TardisReplayConfig = serde_json::from_str(&config_data)
        .context("Failed to parse config JSON into TardisReplayConfig")?;

    // Resolve the output path: explicit config value, then the
    // NAUTILUS_CATALOG_PATH environment variable (with "data" appended),
    // then the current working directory as a last resort
    let path = config
        .output_path
        .as_deref()
        .map(Path::new)
        .map(Path::to_path_buf)
        .or_else(|| {
            std::env::var("NAUTILUS_CATALOG_PATH")
                .ok()
                .map(|env_path| PathBuf::from(env_path).join("data"))
        })
        .unwrap_or_else(|| std::env::current_dir().expect("Failed to get current directory"));

    tracing::info!("Output path: {path:?}");

    let normalize_symbols = config.normalize_symbols.unwrap_or(true);
    tracing::info!("normalize_symbols={normalize_symbols}");

    let http_client = TardisHttpClient::new(None, None, None, normalize_symbols)?;
    let mut machine_client =
        TardisMachineClient::new(config.tardis_ws_url.as_deref(), normalize_symbols)?;

    let info_map = gather_instruments_info(&config, &http_client).await;

    // Register precision/identifier info for every instrument so the machine
    // client can parse incoming messages
    for (exchange, instruments) in &info_map {
        for inst in instruments {
            let instrument_type = inst.instrument_type.clone();
            // Derive price/size precisions from the instrument's increments
            let price_precision = precision_from_str(&inst.price_increment.to_string());
            let size_precision = precision_from_str(&inst.amount_increment.to_string());

            let instrument_id = if normalize_symbols {
                normalize_instrument_id(exchange, inst.id, &instrument_type, inst.inverse)
            } else {
                parse_instrument_id(exchange, inst.id)
            };

            let info = InstrumentMiniInfo::new(
                instrument_id,
                Some(Ustr::from(&inst.id)),
                exchange.clone(),
                price_precision,
                size_precision,
            );
            machine_client.add_instrument_info(info);
        }
    }

    tracing::info!("Starting tardis-machine stream");
    let stream = machine_client.replay(config.options).await;
    pin_mut!(stream);

    // Initialize date cursors (per-instrument/per-bar-type UTC date tracking)
    let mut deltas_cursors: HashMap<InstrumentId, DateCursor> = HashMap::new();
    let mut depths_cursors: HashMap<InstrumentId, DateCursor> = HashMap::new();
    let mut quotes_cursors: HashMap<InstrumentId, DateCursor> = HashMap::new();
    let mut trades_cursors: HashMap<InstrumentId, DateCursor> = HashMap::new();
    let mut bars_cursors: HashMap<BarType, DateCursor> = HashMap::new();

    // Initialize date collection maps (accumulate one UTC day of data before
    // each Parquet write)
    let mut deltas_map: HashMap<InstrumentId, Vec<OrderBookDelta>> = HashMap::new();
    let mut depths_map: HashMap<InstrumentId, Vec<OrderBookDepth10>> = HashMap::new();
    let mut quotes_map: HashMap<InstrumentId, Vec<QuoteTick>> = HashMap::new();
    let mut trades_map: HashMap<InstrumentId, Vec<TradeTick>> = HashMap::new();
    let mut bars_map: HashMap<BarType, Vec<Bar>> = HashMap::new();

    let mut msg_count = 0;

    while let Some(msg) = stream.next().await {
        match msg {
            Data::Deltas(msg) => {
                handle_deltas_msg(msg, &mut deltas_map, &mut deltas_cursors, &path);
            }
            Data::Depth10(msg) => {
                handle_depth10_msg(*msg, &mut depths_map, &mut depths_cursors, &path);
            }
            Data::Quote(msg) => handle_quote_msg(msg, &mut quotes_map, &mut quotes_cursors, &path),
            Data::Trade(msg) => handle_trade_msg(msg, &mut trades_map, &mut trades_cursors, &path),
            Data::Bar(msg) => handle_bar_msg(msg, &mut bars_map, &mut bars_cursors, &path),
            Data::Delta(_) => panic!("Individual delta message not implemented (or required)"),
            _ => panic!("Not implemented"),
        }

        msg_count += 1;
        if msg_count % 100_000 == 0 {
            tracing::debug!("Processed {} messages", msg_count.separate_with_commas());
        }
    }

    // Iterate through every remaining type and instrument sequentially,
    // flushing partial days that never crossed a date boundary

    for (instrument_id, deltas) in deltas_map {
        let cursor = deltas_cursors.get(&instrument_id).expect("Expected cursor");
        batch_and_write_deltas(deltas, &instrument_id, cursor.date_utc, &path);
    }

    for (instrument_id, depths) in depths_map {
        let cursor = depths_cursors.get(&instrument_id).expect("Expected cursor");
        batch_and_write_depths(depths, &instrument_id, cursor.date_utc, &path);
    }

    for (instrument_id, quotes) in quotes_map {
        let cursor = quotes_cursors.get(&instrument_id).expect("Expected cursor");
        batch_and_write_quotes(quotes, &instrument_id, cursor.date_utc, &path);
    }

    for (instrument_id, trades) in trades_map {
        let cursor = trades_cursors.get(&instrument_id).expect("Expected cursor");
        batch_and_write_trades(trades, &instrument_id, cursor.date_utc, &path);
    }

    for (bar_type, bars) in bars_map {
        let cursor = bars_cursors.get(&bar_type).expect("Expected cursor");
        batch_and_write_bars(bars, &bar_type, cursor.date_utc, &path);
    }

    tracing::info!(
        "Replay completed after {} messages",
        msg_count.separate_with_commas()
    );
    Ok(())
}
247
248fn handle_deltas_msg(
249    deltas: OrderBookDeltas_API,
250    map: &mut HashMap<InstrumentId, Vec<OrderBookDelta>>,
251    cursors: &mut HashMap<InstrumentId, DateCursor>,
252    path: &Path,
253) {
254    let cursor = cursors
255        .entry(deltas.instrument_id)
256        .or_insert_with(|| DateCursor::new(deltas.ts_init));
257
258    if deltas.ts_init > cursor.end_ns {
259        if let Some(deltas_vec) = map.remove(&deltas.instrument_id) {
260            batch_and_write_deltas(deltas_vec, &deltas.instrument_id, cursor.date_utc, path);
261        }
262        // Update cursor
263        *cursor = DateCursor::new(deltas.ts_init);
264    }
265
266    map.entry(deltas.instrument_id)
267        .or_insert_with(|| Vec::with_capacity(1_000_000))
268        .extend(&*deltas.deltas);
269}
270
271fn handle_depth10_msg(
272    depth10: OrderBookDepth10,
273    map: &mut HashMap<InstrumentId, Vec<OrderBookDepth10>>,
274    cursors: &mut HashMap<InstrumentId, DateCursor>,
275    path: &Path,
276) {
277    let cursor = cursors
278        .entry(depth10.instrument_id)
279        .or_insert_with(|| DateCursor::new(depth10.ts_init));
280
281    if depth10.ts_init > cursor.end_ns {
282        if let Some(depths_vec) = map.remove(&depth10.instrument_id) {
283            batch_and_write_depths(depths_vec, &depth10.instrument_id, cursor.date_utc, path);
284        }
285        // Update cursor
286        *cursor = DateCursor::new(depth10.ts_init);
287    }
288
289    map.entry(depth10.instrument_id)
290        .or_insert_with(|| Vec::with_capacity(1_000_000))
291        .push(depth10);
292}
293
294fn handle_quote_msg(
295    quote: QuoteTick,
296    map: &mut HashMap<InstrumentId, Vec<QuoteTick>>,
297    cursors: &mut HashMap<InstrumentId, DateCursor>,
298    path: &Path,
299) {
300    let cursor = cursors
301        .entry(quote.instrument_id)
302        .or_insert_with(|| DateCursor::new(quote.ts_init));
303
304    if quote.ts_init > cursor.end_ns {
305        if let Some(quotes_vec) = map.remove(&quote.instrument_id) {
306            batch_and_write_quotes(quotes_vec, &quote.instrument_id, cursor.date_utc, path);
307        }
308        // Update cursor
309        *cursor = DateCursor::new(quote.ts_init);
310    }
311
312    map.entry(quote.instrument_id)
313        .or_insert_with(|| Vec::with_capacity(1_000_000))
314        .push(quote);
315}
316
317fn handle_trade_msg(
318    trade: TradeTick,
319    map: &mut HashMap<InstrumentId, Vec<TradeTick>>,
320    cursors: &mut HashMap<InstrumentId, DateCursor>,
321    path: &Path,
322) {
323    let cursor = cursors
324        .entry(trade.instrument_id)
325        .or_insert_with(|| DateCursor::new(trade.ts_init));
326
327    if trade.ts_init > cursor.end_ns {
328        if let Some(trades_vec) = map.remove(&trade.instrument_id) {
329            batch_and_write_trades(trades_vec, &trade.instrument_id, cursor.date_utc, path);
330        }
331        // Update cursor
332        *cursor = DateCursor::new(trade.ts_init);
333    }
334
335    map.entry(trade.instrument_id)
336        .or_insert_with(|| Vec::with_capacity(1_000_000))
337        .push(trade);
338}
339
340fn handle_bar_msg(
341    bar: Bar,
342    map: &mut HashMap<BarType, Vec<Bar>>,
343    cursors: &mut HashMap<BarType, DateCursor>,
344    path: &Path,
345) {
346    let cursor = cursors
347        .entry(bar.bar_type)
348        .or_insert_with(|| DateCursor::new(bar.ts_init));
349
350    if bar.ts_init > cursor.end_ns {
351        if let Some(bars_vec) = map.remove(&bar.bar_type) {
352            batch_and_write_bars(bars_vec, &bar.bar_type, cursor.date_utc, path);
353        }
354        // Update cursor
355        *cursor = DateCursor::new(bar.ts_init);
356    }
357
358    map.entry(bar.bar_type)
359        .or_insert_with(|| Vec::with_capacity(1_000_000))
360        .push(bar);
361}
362
363fn batch_and_write_deltas(
364    deltas: Vec<OrderBookDelta>,
365    instrument_id: &InstrumentId,
366    date: NaiveDate,
367    path: &Path,
368) {
369    let typename = stringify!(OrderBookDeltas);
370    match book_deltas_to_arrow_record_batch_bytes(deltas) {
371        Ok(batch) => write_batch(batch, typename, instrument_id, date, path),
372        Err(e) => {
373            tracing::error!("Error converting `{typename}` to Arrow: {e:?}");
374        }
375    }
376}
377
378fn batch_and_write_depths(
379    depths: Vec<OrderBookDepth10>,
380    instrument_id: &InstrumentId,
381    date: NaiveDate,
382    path: &Path,
383) {
384    let typename = stringify!(OrderBookDepth10);
385    match book_depth10_to_arrow_record_batch_bytes(depths) {
386        Ok(batch) => write_batch(batch, typename, instrument_id, date, path),
387        Err(e) => {
388            tracing::error!("Error converting `{typename}` to Arrow: {e:?}");
389        }
390    }
391}
392
393fn batch_and_write_quotes(
394    quotes: Vec<QuoteTick>,
395    instrument_id: &InstrumentId,
396    date: NaiveDate,
397    path: &Path,
398) {
399    let typename = stringify!(QuoteTick);
400    match quotes_to_arrow_record_batch_bytes(quotes) {
401        Ok(batch) => write_batch(batch, typename, instrument_id, date, path),
402        Err(e) => {
403            tracing::error!("Error converting `{typename}` to Arrow: {e:?}");
404        }
405    }
406}
407
408fn batch_and_write_trades(
409    trades: Vec<TradeTick>,
410    instrument_id: &InstrumentId,
411    date: NaiveDate,
412    path: &Path,
413) {
414    let typename = stringify!(TradeTick);
415    match trades_to_arrow_record_batch_bytes(trades) {
416        Ok(batch) => write_batch(batch, typename, instrument_id, date, path),
417        Err(e) => {
418            tracing::error!("Error converting `{typename}` to Arrow: {e:?}");
419        }
420    }
421}
422
423fn batch_and_write_bars(bars: Vec<Bar>, bar_type: &BarType, date: NaiveDate, path: &Path) {
424    let typename = stringify!(Bar);
425    let batch = match bars_to_arrow_record_batch_bytes(bars) {
426        Ok(batch) => batch,
427        Err(e) => {
428            tracing::error!("Error converting `{typename}` to Arrow: {e:?}");
429            return;
430        }
431    };
432
433    let filepath = path.join(parquet_filepath_bars(bar_type, date));
434    match write_batch_to_parquet(batch, &filepath, None, None, None) {
435        Ok(()) => tracing::info!("File written: {filepath:?}"),
436        Err(e) => tracing::error!("Error writing {filepath:?}: {e:?}"),
437    }
438}
439
440fn parquet_filepath(typename: &str, instrument_id: &InstrumentId, date: NaiveDate) -> PathBuf {
441    let typename = typename.to_snake_case();
442    let instrument_id_str = instrument_id.to_string().replace('/', "");
443    let date_str = date.to_string().replace('-', "");
444    PathBuf::new()
445        .join(typename)
446        .join(instrument_id_str)
447        .join(format!("{date_str}.parquet"))
448}
449
450fn parquet_filepath_bars(bar_type: &BarType, date: NaiveDate) -> PathBuf {
451    let bar_type_str = bar_type.to_string().replace('/', "");
452    let date_str = date.to_string().replace('-', "");
453    PathBuf::new()
454        .join("bar")
455        .join(bar_type_str)
456        .join(format!("{date_str}.parquet"))
457}
458
459fn write_batch(
460    batch: RecordBatch,
461    typename: &str,
462    instrument_id: &InstrumentId,
463    date: NaiveDate,
464    path: &Path,
465) {
466    let filepath = path.join(parquet_filepath(typename, instrument_id, date));
467    match write_batch_to_parquet(batch, &filepath, None, None, None) {
468        Ok(()) => tracing::info!("File written: {filepath:?}"),
469        Err(e) => tracing::error!("Error writing {filepath:?}: {e:?}"),
470    }
471}
472
473///////////////////////////////////////////////////////////////////////////////////////////////////
474// Tests
475///////////////////////////////////////////////////////////////////////////////////////////////////
#[cfg(test)]
mod tests {
    use chrono::{TimeZone, Utc};
    use rstest::rstest;

    use super::*;

    /// Returns the UNIX-nanosecond timestamp for the given UTC date/time.
    fn ns(y: i32, mo: u32, d: u32, h: u32, mi: u32, s: u32) -> u64 {
        Utc.with_ymd_and_hms(y, mo, d, h, mi, s)
            .unwrap()
            .timestamp_nanos_opt()
            .unwrap() as u64
    }

    #[rstest]
    fn test_date_cursor() {
        let jan1 = NaiveDate::from_ymd_opt(2024, 1, 1).unwrap();
        let jan2 = NaiveDate::from_ymd_opt(2024, 1, 2).unwrap();
        let end_jan1 = ns(2024, 1, 1, 23, 59, 59) + 999_999_999;
        let end_jan2 = ns(2024, 1, 2, 23, 59, 59) + 999_999_999;

        let cases: &[(u64, NaiveDate, u64)] = &[
            // Start of day: 2024-01-01 00:00:00 UTC
            (ns(2024, 1, 1, 0, 0, 0), jan1, end_jan1),
            // Midday: 2024-01-01 12:00:00 UTC
            (ns(2024, 1, 1, 12, 0, 0), jan1, end_jan1),
            // End of day: 2024-01-01 23:59:59.999999999 UTC
            (end_jan1, jan1, end_jan1),
            // Start of new day: 2024-01-02 00:00:00 UTC
            (ns(2024, 1, 2, 0, 0, 0), jan2, end_jan2),
        ];

        for &(timestamp, expected_date, expected_end_ns) in cases {
            let cursor = DateCursor::new(UnixNanos::from(timestamp));
            assert_eq!(cursor.date_utc, expected_date);
            assert_eq!(cursor.end_ns, UnixNanos::from(expected_end_ns));
        }
    }
}