use std::{
    collections::HashMap,
    fs,
    path::{Path, PathBuf},
};

use anyhow::Context;
use arrow::array::RecordBatch;
use chrono::{DateTime, Duration, NaiveDate};
use futures_util::{StreamExt, future::join_all, pin_mut};
use heck::ToSnakeCase;
use nautilus_core::{UnixNanos, parsing::precision_from_str};
use nautilus_model::{
    data::{
        Bar, BarType, Data, OrderBookDelta, OrderBookDeltas_API, OrderBookDepth10, QuoteTick,
        TradeTick,
    },
    identifiers::InstrumentId,
};
use nautilus_serialization::{
    arrow::{
        bars_to_arrow_record_batch_bytes, book_deltas_to_arrow_record_batch_bytes,
        book_depth10_to_arrow_record_batch_bytes, quotes_to_arrow_record_batch_bytes,
        trades_to_arrow_record_batch_bytes,
    },
    parquet::write_batch_to_parquet,
};
use thousands::Separable;
use ustr::Ustr;

use super::{enums::Exchange, http::models::InstrumentInfo};
use crate::{
    config::TardisReplayConfig,
    http::TardisHttpClient,
    machine::{TardisMachineClient, types::InstrumentMiniInfo},
    parse::{normalize_instrument_id, parse_instrument_id},
};

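/// Tracks the current UTC date for a data stream and the nanosecond timestamp at which
/// that date ends (23:59:59.999999999 UTC), so buffered data can be flushed to a new
/// file when a message arrives on the following day.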
struct DateCursor {
    date_utc: NaiveDate,
    end_ns: UnixNanos,
}

impl DateCursor {
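    /// Creates a cursor for the UTC date containing `current_ns`, with `end_ns` set to
    /// the final nanosecond of that date.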
    fn new(current_ns: UnixNanos) -> Self {
        let current_utc = DateTime::from_timestamp_nanos(current_ns.as_i64());
        let date_utc = current_utc.date_naive();

        let end_utc =
            date_utc.and_hms_opt(23, 59, 59).unwrap() + Duration::nanoseconds(999_999_999);
        let end_ns = UnixNanos::from(end_utc.and_utc().timestamp_nanos_opt().unwrap() as u64);

        Self { date_utc, end_ns }
    }
}

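/// Requests instrument definitions for every exchange in the replay options, in
/// parallel; exchanges whose request fails are logged and omitted from the result.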
async fn gather_instruments_info(
    config: &TardisReplayConfig,
    http_client: &TardisHttpClient,
) -> HashMap<Exchange, Vec<InstrumentInfo>> {
    let futures = config.options.iter().map(|options| {
        let exchange = options.exchange.clone();
        let client = &http_client;

        tracing::info!("Requesting instruments for {exchange}");

        async move {
            match client.instruments_info(exchange.clone(), None, None).await {
                Ok(instruments) => Some((exchange, instruments)),
                Err(e) => {
                    tracing::error!("Error fetching instruments for {exchange}: {e}");
                    None
                }
            }
        }
    });

    let results: Vec<(Exchange, Vec<InstrumentInfo>)> =
        join_all(futures).await.into_iter().flatten().collect();

    tracing::info!("Received all instruments");

    results.into_iter().collect()
}

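/// Runs a tardis-machine replay using the JSON configuration at `config_filepath`,
/// streaming normalized data and writing it to per-day Parquet files under the
/// resolved output path.
///
/// The output path is taken from `output_path` in the config, falling back to the
/// `NAUTILUS_CATALOG_PATH` environment variable (joined with `data`), and finally to
/// the current working directory.
///
/// An illustrative config sketch, assuming the struct's default serde field naming and
/// showing only the fields read by this module (any per-stream replay option fields
/// beyond `exchange` are omitted here):
///
/// ```json
/// {
///     "tardis_ws_url": "ws://localhost:8001",
///     "output_path": "./catalog/data",
///     "normalize_symbols": true,
///     "options": [
///         { "exchange": "bitmex" }
///     ]
/// }
/// ```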
pub async fn run_tardis_machine_replay_from_config(config_filepath: &Path) -> anyhow::Result<()> {
    tracing::info!("Starting replay");
    tracing::info!("Config filepath: {config_filepath:?}");

    let config_data = fs::read_to_string(config_filepath)
        .with_context(|| format!("Failed to read config file: {config_filepath:?}"))?;
    let config: TardisReplayConfig = serde_json::from_str(&config_data)
        .context("Failed to parse config JSON into TardisReplayConfig")?;

    let path = config
        .output_path
        .as_deref()
        .map(Path::new)
        .map(Path::to_path_buf)
        .or_else(|| {
            std::env::var("NAUTILUS_CATALOG_PATH")
                .ok()
                .map(|env_path| PathBuf::from(env_path).join("data"))
        })
        .unwrap_or_else(|| std::env::current_dir().expect("Failed to get current directory"));

    tracing::info!("Output path: {path:?}");

    let normalize_symbols = config.normalize_symbols.unwrap_or(true);
    tracing::info!("normalize_symbols={normalize_symbols}");

    let http_client = TardisHttpClient::new(None, None, None, normalize_symbols)?;
    let mut machine_client =
        TardisMachineClient::new(config.tardis_ws_url.as_deref(), normalize_symbols)?;

    let info_map = gather_instruments_info(&config, &http_client).await;

    for (exchange, instruments) in &info_map {
        for inst in instruments {
            let instrument_type = inst.instrument_type.clone();
            let price_precision = precision_from_str(&inst.price_increment.to_string());
            let size_precision = precision_from_str(&inst.amount_increment.to_string());

            let instrument_id = if normalize_symbols {
                normalize_instrument_id(exchange, inst.id, &instrument_type, inst.inverse)
            } else {
                parse_instrument_id(exchange, inst.id)
            };

            let info = InstrumentMiniInfo::new(
                instrument_id,
                Some(Ustr::from(&inst.id)),
                exchange.clone(),
                price_precision,
                size_precision,
            );
            machine_client.add_instrument_info(info);
        }
    }

    tracing::info!("Starting tardis-machine stream");
    let stream = machine_client.replay(config.options).await;
    pin_mut!(stream);

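    // One `DateCursor` per stream (keyed by instrument ID or bar type) tracking the
    // UTC day currently being buffered.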
    let mut deltas_cursors: HashMap<InstrumentId, DateCursor> = HashMap::new();
    let mut depths_cursors: HashMap<InstrumentId, DateCursor> = HashMap::new();
    let mut quotes_cursors: HashMap<InstrumentId, DateCursor> = HashMap::new();
    let mut trades_cursors: HashMap<InstrumentId, DateCursor> = HashMap::new();
    let mut bars_cursors: HashMap<BarType, DateCursor> = HashMap::new();

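    // In-memory buffers holding the current day's data until the cursor rolls over
    // (or the stream ends) and the buffer is written to Parquet.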
    let mut deltas_map: HashMap<InstrumentId, Vec<OrderBookDelta>> = HashMap::new();
    let mut depths_map: HashMap<InstrumentId, Vec<OrderBookDepth10>> = HashMap::new();
    let mut quotes_map: HashMap<InstrumentId, Vec<QuoteTick>> = HashMap::new();
    let mut trades_map: HashMap<InstrumentId, Vec<TradeTick>> = HashMap::new();
    let mut bars_map: HashMap<BarType, Vec<Bar>> = HashMap::new();

    let mut msg_count = 0;

    while let Some(msg) = stream.next().await {
        match msg {
            Data::Deltas(msg) => {
                handle_deltas_msg(msg, &mut deltas_map, &mut deltas_cursors, &path);
            }
            Data::Depth10(msg) => {
                handle_depth10_msg(*msg, &mut depths_map, &mut depths_cursors, &path);
            }
            Data::Quote(msg) => handle_quote_msg(msg, &mut quotes_map, &mut quotes_cursors, &path),
            Data::Trade(msg) => handle_trade_msg(msg, &mut trades_map, &mut trades_cursors, &path),
            Data::Bar(msg) => handle_bar_msg(msg, &mut bars_map, &mut bars_cursors, &path),
            Data::Delta(_) => panic!("Individual delta message not implemented (or required)"),
            _ => panic!("Not implemented"),
        }

        msg_count += 1;
        if msg_count % 100_000 == 0 {
            tracing::debug!("Processed {} messages", msg_count.separate_with_commas());
        }
    }

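    // The stream has ended: flush whatever remains buffered for the final (possibly
    // partial) day.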
    for (instrument_id, deltas) in deltas_map {
        let cursor = deltas_cursors.get(&instrument_id).expect("Expected cursor");
        batch_and_write_deltas(deltas, &instrument_id, cursor.date_utc, &path);
    }

    for (instrument_id, depths) in depths_map {
        let cursor = depths_cursors.get(&instrument_id).expect("Expected cursor");
        batch_and_write_depths(depths, &instrument_id, cursor.date_utc, &path);
    }

    for (instrument_id, quotes) in quotes_map {
        let cursor = quotes_cursors.get(&instrument_id).expect("Expected cursor");
        batch_and_write_quotes(quotes, &instrument_id, cursor.date_utc, &path);
    }

    for (instrument_id, trades) in trades_map {
        let cursor = trades_cursors.get(&instrument_id).expect("Expected cursor");
        batch_and_write_trades(trades, &instrument_id, cursor.date_utc, &path);
    }

    for (bar_type, bars) in bars_map {
        let cursor = bars_cursors.get(&bar_type).expect("Expected cursor");
        batch_and_write_bars(bars, &bar_type, cursor.date_utc, &path);
    }

    tracing::info!(
        "Replay completed after {} messages",
        msg_count.separate_with_commas()
    );
    Ok(())
}

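/// Buffers the book deltas for their instrument, first flushing the previous day's
/// buffer to Parquet if `ts_init` falls past the current date cursor.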
fn handle_deltas_msg(
    deltas: OrderBookDeltas_API,
    map: &mut HashMap<InstrumentId, Vec<OrderBookDelta>>,
    cursors: &mut HashMap<InstrumentId, DateCursor>,
    path: &Path,
) {
    let cursor = cursors
        .entry(deltas.instrument_id)
        .or_insert_with(|| DateCursor::new(deltas.ts_init));

    if deltas.ts_init > cursor.end_ns {
        if let Some(deltas_vec) = map.remove(&deltas.instrument_id) {
            batch_and_write_deltas(deltas_vec, &deltas.instrument_id, cursor.date_utc, path);
        }
        *cursor = DateCursor::new(deltas.ts_init);
    }

    map.entry(deltas.instrument_id)
        .or_insert_with(|| Vec::with_capacity(1_000_000))
        .extend(&*deltas.deltas);
}

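/// Buffers a depth-10 snapshot for its instrument, flushing the previous day's buffer
/// to Parquet when the date cursor rolls over.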
fn handle_depth10_msg(
    depth10: OrderBookDepth10,
    map: &mut HashMap<InstrumentId, Vec<OrderBookDepth10>>,
    cursors: &mut HashMap<InstrumentId, DateCursor>,
    path: &Path,
) {
    let cursor = cursors
        .entry(depth10.instrument_id)
        .or_insert_with(|| DateCursor::new(depth10.ts_init));

    if depth10.ts_init > cursor.end_ns {
        if let Some(depths_vec) = map.remove(&depth10.instrument_id) {
            batch_and_write_depths(depths_vec, &depth10.instrument_id, cursor.date_utc, path);
        }
        *cursor = DateCursor::new(depth10.ts_init);
    }

    map.entry(depth10.instrument_id)
        .or_insert_with(|| Vec::with_capacity(1_000_000))
        .push(depth10);
}

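/// Buffers a quote tick for its instrument, flushing the previous day's buffer to
/// Parquet when the date cursor rolls over.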
fn handle_quote_msg(
    quote: QuoteTick,
    map: &mut HashMap<InstrumentId, Vec<QuoteTick>>,
    cursors: &mut HashMap<InstrumentId, DateCursor>,
    path: &Path,
) {
    let cursor = cursors
        .entry(quote.instrument_id)
        .or_insert_with(|| DateCursor::new(quote.ts_init));

    if quote.ts_init > cursor.end_ns {
        if let Some(quotes_vec) = map.remove(&quote.instrument_id) {
            batch_and_write_quotes(quotes_vec, &quote.instrument_id, cursor.date_utc, path);
        }
        *cursor = DateCursor::new(quote.ts_init);
    }

    map.entry(quote.instrument_id)
        .or_insert_with(|| Vec::with_capacity(1_000_000))
        .push(quote);
}

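/// Buffers a trade tick for its instrument, flushing the previous day's buffer to
/// Parquet when the date cursor rolls over.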
fn handle_trade_msg(
    trade: TradeTick,
    map: &mut HashMap<InstrumentId, Vec<TradeTick>>,
    cursors: &mut HashMap<InstrumentId, DateCursor>,
    path: &Path,
) {
    let cursor = cursors
        .entry(trade.instrument_id)
        .or_insert_with(|| DateCursor::new(trade.ts_init));

    if trade.ts_init > cursor.end_ns {
        if let Some(trades_vec) = map.remove(&trade.instrument_id) {
            batch_and_write_trades(trades_vec, &trade.instrument_id, cursor.date_utc, path);
        }
        *cursor = DateCursor::new(trade.ts_init);
    }

    map.entry(trade.instrument_id)
        .or_insert_with(|| Vec::with_capacity(1_000_000))
        .push(trade);
}

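/// Buffers a bar for its bar type, flushing the previous day's buffer to Parquet when
/// the date cursor rolls over.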
fn handle_bar_msg(
    bar: Bar,
    map: &mut HashMap<BarType, Vec<Bar>>,
    cursors: &mut HashMap<BarType, DateCursor>,
    path: &Path,
) {
    let cursor = cursors
        .entry(bar.bar_type)
        .or_insert_with(|| DateCursor::new(bar.ts_init));

    if bar.ts_init > cursor.end_ns {
        if let Some(bars_vec) = map.remove(&bar.bar_type) {
            batch_and_write_bars(bars_vec, &bar.bar_type, cursor.date_utc, path);
        }
        *cursor = DateCursor::new(bar.ts_init);
    }

    map.entry(bar.bar_type)
        .or_insert_with(|| Vec::with_capacity(1_000_000))
        .push(bar);
}

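/// Converts the deltas to an Arrow record batch and writes it to the dated Parquet file
/// for the instrument; conversion errors are logged and the batch is dropped.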
fn batch_and_write_deltas(
    deltas: Vec<OrderBookDelta>,
    instrument_id: &InstrumentId,
    date: NaiveDate,
    path: &Path,
) {
    let typename = stringify!(OrderBookDeltas);
    match book_deltas_to_arrow_record_batch_bytes(deltas) {
        Ok(batch) => write_batch(batch, typename, instrument_id, date, path),
        Err(e) => {
            tracing::error!("Error converting `{typename}` to Arrow: {e:?}");
        }
    }
}

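/// Converts the depth-10 snapshots to an Arrow record batch and writes it to the dated
/// Parquet file for the instrument; conversion errors are logged and the batch is dropped.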
fn batch_and_write_depths(
    depths: Vec<OrderBookDepth10>,
    instrument_id: &InstrumentId,
    date: NaiveDate,
    path: &Path,
) {
    let typename = stringify!(OrderBookDepth10);
    match book_depth10_to_arrow_record_batch_bytes(depths) {
        Ok(batch) => write_batch(batch, typename, instrument_id, date, path),
        Err(e) => {
            tracing::error!("Error converting `{typename}` to Arrow: {e:?}");
        }
    }
}

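/// Converts the quotes to an Arrow record batch and writes it to the dated Parquet file
/// for the instrument; conversion errors are logged and the batch is dropped.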
fn batch_and_write_quotes(
    quotes: Vec<QuoteTick>,
    instrument_id: &InstrumentId,
    date: NaiveDate,
    path: &Path,
) {
    let typename = stringify!(QuoteTick);
    match quotes_to_arrow_record_batch_bytes(quotes) {
        Ok(batch) => write_batch(batch, typename, instrument_id, date, path),
        Err(e) => {
            tracing::error!("Error converting `{typename}` to Arrow: {e:?}");
        }
    }
}

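/// Converts the trades to an Arrow record batch and writes it to the dated Parquet file
/// for the instrument; conversion errors are logged and the batch is dropped.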
fn batch_and_write_trades(
    trades: Vec<TradeTick>,
    instrument_id: &InstrumentId,
    date: NaiveDate,
    path: &Path,
) {
    let typename = stringify!(TradeTick);
    match trades_to_arrow_record_batch_bytes(trades) {
        Ok(batch) => write_batch(batch, typename, instrument_id, date, path),
        Err(e) => {
            tracing::error!("Error converting `{typename}` to Arrow: {e:?}");
        }
    }
}

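/// Converts the bars to an Arrow record batch and writes it to the dated Parquet file
/// for the bar type; conversion and write errors are logged rather than propagated.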
fn batch_and_write_bars(bars: Vec<Bar>, bar_type: &BarType, date: NaiveDate, path: &Path) {
    let typename = stringify!(Bar);
    let batch = match bars_to_arrow_record_batch_bytes(bars) {
        Ok(batch) => batch,
        Err(e) => {
            tracing::error!("Error converting `{typename}` to Arrow: {e:?}");
            return;
        }
    };

    let filepath = path.join(parquet_filepath_bars(bar_type, date));
    match write_batch_to_parquet(batch, &filepath, None, None, None) {
        Ok(()) => tracing::info!("File written: {filepath:?}"),
        Err(e) => tracing::error!("Error writing {filepath:?}: {e:?}"),
    }
}

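/// Builds the relative Parquet path `<typename>/<instrument_id>/<YYYYMMDD>.parquet`,
/// with the type name snake-cased and any `/` removed from the instrument ID.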
fn parquet_filepath(typename: &str, instrument_id: &InstrumentId, date: NaiveDate) -> PathBuf {
    let typename = typename.to_snake_case();
    let instrument_id_str = instrument_id.to_string().replace('/', "");
    let date_str = date.to_string().replace('-', "");
    PathBuf::new()
        .join(typename)
        .join(instrument_id_str)
        .join(format!("{date_str}.parquet"))
}

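/// Builds the relative Parquet path `bar/<bar_type>/<YYYYMMDD>.parquet`, with any `/`
/// removed from the bar type string.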
fn parquet_filepath_bars(bar_type: &BarType, date: NaiveDate) -> PathBuf {
    let bar_type_str = bar_type.to_string().replace('/', "");
    let date_str = date.to_string().replace('-', "");
    PathBuf::new()
        .join("bar")
        .join(bar_type_str)
        .join(format!("{date_str}.parquet"))
}

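/// Writes the record batch to its dated Parquet file under `path`, logging success or
/// failure rather than returning an error.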
fn write_batch(
    batch: RecordBatch,
    typename: &str,
    instrument_id: &InstrumentId,
    date: NaiveDate,
    path: &Path,
) {
    let filepath = path.join(parquet_filepath(typename, instrument_id, date));
    match write_batch_to_parquet(batch, &filepath, None, None, None) {
        Ok(()) => tracing::info!("File written: {filepath:?}"),
        Err(e) => tracing::error!("Error writing {filepath:?}: {e:?}"),
    }
}

#[cfg(test)]
mod tests {
    use chrono::{TimeZone, Utc};
    use rstest::rstest;

    use super::*;

    #[rstest]
    #[case(
        Utc.with_ymd_and_hms(2024, 1, 1, 0, 0, 0).unwrap().timestamp_nanos_opt().unwrap() as u64,
        NaiveDate::from_ymd_opt(2024, 1, 1).unwrap(),
        Utc.with_ymd_and_hms(2024, 1, 1, 23, 59, 59).unwrap().timestamp_nanos_opt().unwrap() as u64 + 999_999_999
    )]
    #[case(
        Utc.with_ymd_and_hms(2024, 1, 1, 12, 0, 0).unwrap().timestamp_nanos_opt().unwrap() as u64,
        NaiveDate::from_ymd_opt(2024, 1, 1).unwrap(),
        Utc.with_ymd_and_hms(2024, 1, 1, 23, 59, 59).unwrap().timestamp_nanos_opt().unwrap() as u64 + 999_999_999
    )]
    #[case(
        Utc.with_ymd_and_hms(2024, 1, 1, 23, 59, 59).unwrap().timestamp_nanos_opt().unwrap() as u64 + 999_999_999,
        NaiveDate::from_ymd_opt(2024, 1, 1).unwrap(),
        Utc.with_ymd_and_hms(2024, 1, 1, 23, 59, 59).unwrap().timestamp_nanos_opt().unwrap() as u64 + 999_999_999
    )]
    #[case(
        Utc.with_ymd_and_hms(2024, 1, 2, 0, 0, 0).unwrap().timestamp_nanos_opt().unwrap() as u64,
        NaiveDate::from_ymd_opt(2024, 1, 2).unwrap(),
        Utc.with_ymd_and_hms(2024, 1, 2, 23, 59, 59).unwrap().timestamp_nanos_opt().unwrap() as u64 + 999_999_999
    )]
    fn test_date_cursor(
        #[case] timestamp: u64,
        #[case] expected_date: NaiveDate,
        #[case] expected_end_ns: u64,
    ) {
        let unix_nanos = UnixNanos::from(timestamp);
        let cursor = DateCursor::new(unix_nanos);

        assert_eq!(cursor.date_utc, expected_date);
        assert_eq!(cursor.end_ns, UnixNanos::from(expected_end_ns));
    }
}