1mod record;
17
18use std::{
19 error::Error,
20 ffi::OsStr,
21 fs::File,
22 io::{BufReader, Read, Seek, SeekFrom},
23 path::Path,
24 time::Duration,
25};
26
27use csv::{Reader, ReaderBuilder, StringRecord};
28use flate2::read::GzDecoder;
29use nautilus_core::UnixNanos;
30use nautilus_model::{
31 data::{
32 BookOrder, DEPTH10_LEN, NULL_ORDER, OrderBookDelta, OrderBookDepth10, QuoteTick, TradeTick,
33 },
34 enums::{BookAction, OrderSide, RecordFlag},
35 identifiers::{InstrumentId, TradeId},
36 types::{Quantity, fixed::FIXED_PRECISION},
37};
38
39use super::{
40 csv::record::{
41 TardisBookUpdateRecord, TardisOrderBookSnapshot5Record, TardisOrderBookSnapshot25Record,
42 TardisQuoteRecord, TardisTradeRecord,
43 },
44 parse::{
45 parse_aggressor_side, parse_book_action, parse_instrument_id, parse_order_side,
46 parse_timestamp,
47 },
48};
49use crate::parse::parse_price;
50
51fn infer_precision(value: f64) -> u8 {
52 let str_value = value.to_string(); match str_value.find('.') {
54 Some(decimal_idx) => (str_value.len() - decimal_idx - 1) as u8,
55 None => 0,
56 }
57}
58
59fn create_csv_reader<P: AsRef<Path>>(
60 filepath: P,
61) -> anyhow::Result<Reader<Box<dyn std::io::Read>>> {
62 let filepath_ref = filepath.as_ref();
63 const MAX_RETRIES: u8 = 3;
64 const DELAY_MS: u64 = 100;
65
66 fn open_file_with_retry<P: AsRef<Path>>(
67 path: P,
68 max_retries: u8,
69 delay_ms: u64,
70 ) -> anyhow::Result<File> {
71 let path_ref = path.as_ref();
72 for attempt in 1..=max_retries {
73 match File::open(path_ref) {
74 Ok(file) => return Ok(file),
75 Err(e) => {
76 if attempt == max_retries {
77 anyhow::bail!(
78 "Failed to open file '{path_ref:?}' after {max_retries} attempts: {e}"
79 );
80 }
81 eprintln!(
82 "Attempt {attempt}/{max_retries} failed to open file '{path_ref:?}': {e}. Retrying after {delay_ms}ms..."
83 );
84 std::thread::sleep(Duration::from_millis(delay_ms));
85 }
86 }
87 }
88 unreachable!("Loop should return either Ok or Err");
89 }
90
91 let mut file = open_file_with_retry(filepath_ref, MAX_RETRIES, DELAY_MS)?;
92
93 let is_gzipped = filepath_ref
94 .extension()
95 .and_then(OsStr::to_str)
96 .is_some_and(|ext| ext.eq_ignore_ascii_case("gz"));
97
98 if !is_gzipped {
99 let buf_reader = BufReader::new(file);
100 return Ok(ReaderBuilder::new()
101 .has_headers(true)
102 .from_reader(Box::new(buf_reader)));
103 }
104
105 let file_size = file.metadata()?.len();
106 if file_size < 2 {
107 anyhow::bail!("File too small to be a valid gzip file");
108 }
109
110 let mut header_buf = [0u8; 2];
111 for attempt in 1..=MAX_RETRIES {
112 match file.read_exact(&mut header_buf) {
113 Ok(()) => break,
114 Err(e) => {
115 if attempt == MAX_RETRIES {
116 anyhow::bail!(
117 "Failed to read gzip header from '{filepath_ref:?}' after {MAX_RETRIES} attempts: {e}"
118 );
119 }
120 eprintln!(
121 "Attempt {attempt}/{MAX_RETRIES} failed to read header from '{filepath_ref:?}': {e}. Retrying after {DELAY_MS}ms..."
122 );
123 std::thread::sleep(Duration::from_millis(DELAY_MS));
124 }
125 }
126 }
127
128 if header_buf[0] != 0x1f || header_buf[1] != 0x8b {
129 anyhow::bail!("File '{filepath_ref:?}' has .gz extension but invalid gzip header");
130 }
131
132 for attempt in 1..=MAX_RETRIES {
133 match file.seek(SeekFrom::Start(0)) {
134 Ok(_) => break,
135 Err(e) => {
136 if attempt == MAX_RETRIES {
137 anyhow::bail!(
138 "Failed to reset file position for '{filepath_ref:?}' after {MAX_RETRIES} attempts: {e}"
139 );
140 }
141 eprintln!(
142 "Attempt {attempt}/{MAX_RETRIES} failed to seek in '{filepath_ref:?}': {e}. Retrying after {DELAY_MS}ms..."
143 );
144 std::thread::sleep(Duration::from_millis(DELAY_MS));
145 }
146 }
147 }
148
149 let buf_reader = BufReader::new(file);
150 let decoder = GzDecoder::new(buf_reader);
151
152 Ok(ReaderBuilder::new()
153 .has_headers(true)
154 .from_reader(Box::new(decoder)))
155}
156
157pub fn load_deltas<P: AsRef<Path>>(
168 filepath: P,
169 price_precision: Option<u8>,
170 size_precision: Option<u8>,
171 instrument_id: Option<InstrumentId>,
172 limit: Option<usize>,
173) -> Result<Vec<OrderBookDelta>, Box<dyn Error>> {
174 let (price_precision, size_precision) = match (price_precision, size_precision) {
176 (Some(p), Some(s)) => (p, s),
177 (price_precision, size_precision) => {
178 let mut reader = create_csv_reader(&filepath)?;
179 let mut record = StringRecord::new();
180
181 let mut max_price_precision = 0u8;
182 let mut max_size_precision = 0u8;
183 let mut count = 0;
184
185 while reader.read_record(&mut record)? {
186 let parsed: TardisBookUpdateRecord = record.deserialize(None)?;
187
188 if price_precision.is_none() {
189 max_price_precision = infer_precision(parsed.price).max(max_price_precision);
190 }
191
192 if size_precision.is_none() {
193 max_size_precision = infer_precision(parsed.amount).max(max_size_precision);
194 }
195
196 if let Some(limit) = limit {
197 if count >= limit {
198 break;
199 }
200 count += 1;
201 }
202 }
203
204 drop(reader);
205
206 max_price_precision = max_price_precision.min(FIXED_PRECISION);
207 max_size_precision = max_size_precision.min(FIXED_PRECISION);
208
209 (
210 price_precision.unwrap_or(max_price_precision),
211 size_precision.unwrap_or(max_size_precision),
212 )
213 }
214 };
215
216 let mut deltas: Vec<OrderBookDelta> = Vec::new();
217 let mut last_ts_event = UnixNanos::default();
218
219 let mut reader = create_csv_reader(filepath)?;
220 let mut record = StringRecord::new();
221
222 while reader.read_record(&mut record)? {
223 let record: TardisBookUpdateRecord = record.deserialize(None)?;
224
225 let instrument_id = match &instrument_id {
226 Some(id) => *id,
227 None => parse_instrument_id(&record.exchange, record.symbol),
228 };
229 let side = parse_order_side(&record.side);
230 let price = parse_price(record.price, price_precision);
231 let size = Quantity::new(record.amount, size_precision);
232 let order_id = 0; let order = BookOrder::new(side, price, size, order_id);
234
235 let action = parse_book_action(record.is_snapshot, size.as_f64());
236 let flags = 0; let sequence = 0; let ts_event = parse_timestamp(record.timestamp);
239 let ts_init = parse_timestamp(record.local_timestamp);
240
241 if last_ts_event != ts_event {
243 if let Some(last_delta) = deltas.last_mut() {
244 last_delta.flags = RecordFlag::F_LAST.value();
246 }
247 }
248
249 assert!(
250 !(action != BookAction::Delete && size.is_zero()),
251 "Invalid delta: action {action} when size zero, check size_precision ({size_precision}) vs data; {record:?}"
252 );
253
254 last_ts_event = ts_event;
255
256 let delta = OrderBookDelta::new(
257 instrument_id,
258 action,
259 order,
260 flags,
261 sequence,
262 ts_event,
263 ts_init,
264 );
265
266 deltas.push(delta);
267
268 if let Some(limit) = limit {
269 if deltas.len() >= limit {
270 break;
271 }
272 }
273 }
274
275 if let Some(last_delta) = deltas.last_mut() {
277 last_delta.flags = RecordFlag::F_LAST.value();
278 }
279
280 Ok(deltas)
281}
282
283fn create_book_order(
284 side: OrderSide,
285 price: Option<f64>,
286 amount: Option<f64>,
287 price_precision: u8,
288 size_precision: u8,
289) -> (BookOrder, u32) {
290 match price {
291 Some(price) => (
292 BookOrder::new(
293 side,
294 parse_price(price, price_precision),
295 Quantity::new(amount.unwrap_or(0.0), size_precision),
296 0,
297 ),
298 1, ),
300 None => (NULL_ORDER, 0), }
302}
303
304pub fn load_depth10_from_snapshot5<P: AsRef<Path>>(
315 filepath: P,
316 price_precision: Option<u8>,
317 size_precision: Option<u8>,
318 instrument_id: Option<InstrumentId>,
319 limit: Option<usize>,
320) -> Result<Vec<OrderBookDepth10>, Box<dyn Error>> {
321 let (price_precision, size_precision) = match (price_precision, size_precision) {
323 (Some(p), Some(s)) => (p, s),
324 (price_precision, size_precision) => {
325 let mut reader = create_csv_reader(&filepath)?;
326 let mut record = StringRecord::new();
327
328 let mut max_price_precision = 0u8;
329 let mut max_size_precision = 0u8;
330 let mut count = 0;
331
332 while reader.read_record(&mut record)? {
333 let parsed: TardisOrderBookSnapshot5Record = record.deserialize(None)?;
334
335 if price_precision.is_none() {
336 if let Some(bid_price) = parsed.bids_0_price {
337 max_price_precision = infer_precision(bid_price).max(max_price_precision);
338 }
339 }
340
341 if size_precision.is_none() {
342 if let Some(bid_amount) = parsed.bids_0_amount {
343 max_size_precision = infer_precision(bid_amount).max(max_size_precision);
344 }
345 }
346
347 if let Some(limit) = limit {
348 if count >= limit {
349 break;
350 }
351 count += 1;
352 }
353 }
354
355 drop(reader);
356
357 max_price_precision = max_price_precision.min(FIXED_PRECISION);
358 max_size_precision = max_size_precision.min(FIXED_PRECISION);
359
360 (
361 price_precision.unwrap_or(max_price_precision),
362 size_precision.unwrap_or(max_size_precision),
363 )
364 }
365 };
366
367 let mut depths: Vec<OrderBookDepth10> = Vec::new();
368
369 let mut reader = create_csv_reader(filepath)?;
370 let mut record = StringRecord::new();
371 while reader.read_record(&mut record)? {
372 let record: TardisOrderBookSnapshot5Record = record.deserialize(None)?;
373 let instrument_id = match &instrument_id {
374 Some(id) => *id,
375 None => parse_instrument_id(&record.exchange, record.symbol),
376 };
377 let flags = RecordFlag::F_LAST.value();
378 let sequence = 0; let ts_event = parse_timestamp(record.timestamp);
380 let ts_init = parse_timestamp(record.local_timestamp);
381
382 let mut bids = [NULL_ORDER; DEPTH10_LEN];
384 let mut asks = [NULL_ORDER; DEPTH10_LEN];
385 let mut bid_counts = [0u32; DEPTH10_LEN];
386 let mut ask_counts = [0u32; DEPTH10_LEN];
387
388 for i in 0..=4 {
389 let (bid_order, bid_count) = create_book_order(
391 OrderSide::Buy,
392 match i {
393 0 => record.bids_0_price,
394 1 => record.bids_1_price,
395 2 => record.bids_2_price,
396 3 => record.bids_3_price,
397 4 => record.bids_4_price,
398 _ => panic!("Invalid level for snapshot5 -> depth10 parsing"),
399 },
400 match i {
401 0 => record.bids_0_amount,
402 1 => record.bids_1_amount,
403 2 => record.bids_2_amount,
404 3 => record.bids_3_amount,
405 4 => record.bids_4_amount,
406 _ => panic!("Invalid level for snapshot5 -> depth10 parsing"),
407 },
408 price_precision,
409 size_precision,
410 );
411 bids[i] = bid_order;
412 bid_counts[i] = bid_count;
413
414 let (ask_order, ask_count) = create_book_order(
416 OrderSide::Sell,
417 match i {
418 0 => record.asks_0_price,
419 1 => record.asks_1_price,
420 2 => record.asks_2_price,
421 3 => record.asks_3_price,
422 4 => record.asks_4_price,
423 _ => None, },
425 match i {
426 0 => record.asks_0_amount,
427 1 => record.asks_1_amount,
428 2 => record.asks_2_amount,
429 3 => record.asks_3_amount,
430 4 => record.asks_4_amount,
431 _ => None, },
433 price_precision,
434 size_precision,
435 );
436 asks[i] = ask_order;
437 ask_counts[i] = ask_count;
438 }
439
440 let depth = OrderBookDepth10::new(
441 instrument_id,
442 bids,
443 asks,
444 bid_counts,
445 ask_counts,
446 flags,
447 sequence,
448 ts_event,
449 ts_init,
450 );
451
452 depths.push(depth);
453
454 if let Some(limit) = limit {
455 if depths.len() >= limit {
456 break;
457 }
458 }
459 }
460
461 Ok(depths)
462}
463
464pub fn load_depth10_from_snapshot25<P: AsRef<Path>>(
475 filepath: P,
476 price_precision: Option<u8>,
477 size_precision: Option<u8>,
478 instrument_id: Option<InstrumentId>,
479 limit: Option<usize>,
480) -> Result<Vec<OrderBookDepth10>, Box<dyn Error>> {
481 let (price_precision, size_precision) = match (price_precision, size_precision) {
483 (Some(p), Some(s)) => (p, s),
484 (price_precision, size_precision) => {
485 let mut reader = create_csv_reader(&filepath)?;
486 let mut record = StringRecord::new();
487
488 let mut max_price_precision = 0u8;
489 let mut max_size_precision = 0u8;
490 let mut count = 0;
491
492 while reader.read_record(&mut record)? {
493 let parsed: TardisOrderBookSnapshot25Record = record.deserialize(None)?;
494
495 if price_precision.is_none() {
496 if let Some(bid_price) = parsed.bids_0_price {
497 max_price_precision = infer_precision(bid_price).max(max_price_precision);
498 }
499 }
500
501 if size_precision.is_none() {
502 if let Some(bid_amount) = parsed.bids_0_amount {
503 max_size_precision = infer_precision(bid_amount).max(max_size_precision);
504 }
505 }
506
507 if let Some(limit) = limit {
508 if count >= limit {
509 break;
510 }
511 count += 1;
512 }
513 }
514
515 drop(reader);
516
517 max_price_precision = max_price_precision.min(FIXED_PRECISION);
518 max_size_precision = max_size_precision.min(FIXED_PRECISION);
519
520 (
521 price_precision.unwrap_or(max_price_precision),
522 size_precision.unwrap_or(max_size_precision),
523 )
524 }
525 };
526
527 let mut depths: Vec<OrderBookDepth10> = Vec::new();
528 let mut reader = create_csv_reader(filepath)?;
529 let mut record = StringRecord::new();
530
531 while reader.read_record(&mut record)? {
532 let record: TardisOrderBookSnapshot25Record = record.deserialize(None)?;
533
534 let instrument_id = match &instrument_id {
535 Some(id) => *id,
536 None => parse_instrument_id(&record.exchange, record.symbol),
537 };
538 let flags = RecordFlag::F_LAST.value();
539 let sequence = 0; let ts_event = parse_timestamp(record.timestamp);
541 let ts_init = parse_timestamp(record.local_timestamp);
542
543 let mut bids = [NULL_ORDER; DEPTH10_LEN];
545 let mut asks = [NULL_ORDER; DEPTH10_LEN];
546 let mut bid_counts = [0u32; DEPTH10_LEN];
547 let mut ask_counts = [0u32; DEPTH10_LEN];
548
549 for i in 0..DEPTH10_LEN {
551 let (bid_order, bid_count) = create_book_order(
553 OrderSide::Buy,
554 match i {
555 0 => record.bids_0_price,
556 1 => record.bids_1_price,
557 2 => record.bids_2_price,
558 3 => record.bids_3_price,
559 4 => record.bids_4_price,
560 5 => record.bids_5_price,
561 6 => record.bids_6_price,
562 7 => record.bids_7_price,
563 8 => record.bids_8_price,
564 9 => record.bids_9_price,
565 _ => panic!("Invalid level for snapshot25 -> depth10 parsing"),
566 },
567 match i {
568 0 => record.bids_0_amount,
569 1 => record.bids_1_amount,
570 2 => record.bids_2_amount,
571 3 => record.bids_3_amount,
572 4 => record.bids_4_amount,
573 5 => record.bids_5_amount,
574 6 => record.bids_6_amount,
575 7 => record.bids_7_amount,
576 8 => record.bids_8_amount,
577 9 => record.bids_9_amount,
578 _ => panic!("Invalid level for snapshot25 -> depth10 parsing"),
579 },
580 price_precision,
581 size_precision,
582 );
583 bids[i] = bid_order;
584 bid_counts[i] = bid_count;
585
586 let (ask_order, ask_count) = create_book_order(
588 OrderSide::Sell,
589 match i {
590 0 => record.asks_0_price,
591 1 => record.asks_1_price,
592 2 => record.asks_2_price,
593 3 => record.asks_3_price,
594 4 => record.asks_4_price,
595 5 => record.asks_5_price,
596 6 => record.asks_6_price,
597 7 => record.asks_7_price,
598 8 => record.asks_8_price,
599 9 => record.asks_9_price,
600 _ => panic!("Invalid level for snapshot25 -> depth10 parsing"),
601 },
602 match i {
603 0 => record.asks_0_amount,
604 1 => record.asks_1_amount,
605 2 => record.asks_2_amount,
606 3 => record.asks_3_amount,
607 4 => record.asks_4_amount,
608 5 => record.asks_5_amount,
609 6 => record.asks_6_amount,
610 7 => record.asks_7_amount,
611 8 => record.asks_8_amount,
612 9 => record.asks_9_amount,
613 _ => panic!("Invalid level for snapshot25 -> depth10 parsing"),
614 },
615 price_precision,
616 size_precision,
617 );
618 asks[i] = ask_order;
619 ask_counts[i] = ask_count;
620 }
621
622 let depth = OrderBookDepth10::new(
623 instrument_id,
624 bids,
625 asks,
626 bid_counts,
627 ask_counts,
628 flags,
629 sequence,
630 ts_event,
631 ts_init,
632 );
633
634 depths.push(depth);
635
636 if let Some(limit) = limit {
637 if depths.len() >= limit {
638 break;
639 }
640 }
641 }
642
643 Ok(depths)
644}
645
646pub fn load_quote_ticks<P: AsRef<Path>>(
657 filepath: P,
658 price_precision: Option<u8>,
659 size_precision: Option<u8>,
660 instrument_id: Option<InstrumentId>,
661 limit: Option<usize>,
662) -> Result<Vec<QuoteTick>, Box<dyn Error>> {
663 let (price_precision, size_precision) = match (price_precision, size_precision) {
665 (Some(p), Some(s)) => (p, s),
666 (price_precision, size_precision) => {
667 let mut reader = create_csv_reader(&filepath)?;
668 let mut record = StringRecord::new();
669
670 let mut max_price_precision = 0u8;
671 let mut max_size_precision = 0u8;
672 let mut count = 0;
673
674 while reader.read_record(&mut record)? {
675 let parsed: TardisQuoteRecord = record.deserialize(None)?;
676
677 if price_precision.is_none() {
678 if let Some(bid_price) = parsed.bid_price {
679 max_price_precision = infer_precision(bid_price).max(max_price_precision);
680 }
681 }
682
683 if size_precision.is_none() {
684 if let Some(bid_amount) = parsed.bid_amount {
685 max_size_precision = infer_precision(bid_amount).max(max_size_precision);
686 }
687 }
688
689 if let Some(limit) = limit {
690 if count >= limit {
691 break;
692 }
693 count += 1;
694 }
695 }
696
697 drop(reader);
698
699 max_price_precision = max_price_precision.min(FIXED_PRECISION);
700 max_size_precision = max_size_precision.min(FIXED_PRECISION);
701
702 (
703 price_precision.unwrap_or(max_price_precision),
704 size_precision.unwrap_or(max_size_precision),
705 )
706 }
707 };
708
709 let mut quotes = Vec::new();
710 let mut reader = create_csv_reader(filepath)?;
711 let mut record = StringRecord::new();
712
713 while reader.read_record(&mut record)? {
714 let record: TardisQuoteRecord = record.deserialize(None)?;
715
716 let instrument_id = match &instrument_id {
717 Some(id) => *id,
718 None => parse_instrument_id(&record.exchange, record.symbol),
719 };
720 let bid_price = parse_price(record.bid_price.unwrap_or(0.0), price_precision);
721 let bid_size = Quantity::new(record.bid_amount.unwrap_or(0.0), size_precision);
722 let ask_price = parse_price(record.ask_price.unwrap_or(0.0), price_precision);
723 let ask_size = Quantity::new(record.ask_amount.unwrap_or(0.0), size_precision);
724 let ts_event = parse_timestamp(record.timestamp);
725 let ts_init = parse_timestamp(record.local_timestamp);
726
727 let quote = QuoteTick::new(
728 instrument_id,
729 bid_price,
730 ask_price,
731 bid_size,
732 ask_size,
733 ts_event,
734 ts_init,
735 );
736
737 quotes.push(quote);
738
739 if let Some(limit) = limit {
740 if quotes.len() >= limit {
741 break;
742 }
743 }
744 }
745
746 Ok(quotes)
747}
748
749pub fn load_trade_ticks<P: AsRef<Path>>(
760 filepath: P,
761 price_precision: Option<u8>,
762 size_precision: Option<u8>,
763 instrument_id: Option<InstrumentId>,
764 limit: Option<usize>,
765) -> Result<Vec<TradeTick>, Box<dyn Error>> {
766 let (price_precision, size_precision) = match (price_precision, size_precision) {
768 (Some(p), Some(s)) => (p, s),
769 (price_precision, size_precision) => {
770 let mut reader = create_csv_reader(&filepath)?;
771 let mut record = StringRecord::new();
772
773 let mut max_price_precision = 0u8;
774 let mut max_size_precision = 0u8;
775 let mut count = 0;
776
777 while reader.read_record(&mut record)? {
778 let parsed: TardisTradeRecord = record.deserialize(None)?;
779
780 if price_precision.is_none() {
781 max_price_precision = infer_precision(parsed.price).max(max_price_precision);
782 }
783
784 if size_precision.is_none() {
785 max_size_precision = infer_precision(parsed.amount).max(max_size_precision);
786 }
787
788 if let Some(limit) = limit {
789 if count >= limit {
790 break;
791 }
792 count += 1;
793 }
794 }
795
796 drop(reader);
797
798 max_price_precision = max_price_precision.min(FIXED_PRECISION);
799 max_size_precision = max_size_precision.min(FIXED_PRECISION);
800
801 (
802 price_precision.unwrap_or(max_price_precision),
803 size_precision.unwrap_or(max_size_precision),
804 )
805 }
806 };
807
808 let mut trades = Vec::new();
809 let mut reader = create_csv_reader(filepath)?;
810 let mut record = StringRecord::new();
811
812 while reader.read_record(&mut record)? {
813 let record: TardisTradeRecord = record.deserialize(None)?;
814
815 let instrument_id = match &instrument_id {
816 Some(id) => *id,
817 None => parse_instrument_id(&record.exchange, record.symbol),
818 };
819 let price = parse_price(record.price, price_precision);
820 let size = Quantity::non_zero_checked(record.amount, size_precision)
821 .unwrap_or_else(|e| panic!("Invalid {record:?}: size {e}"));
822 let aggressor_side = parse_aggressor_side(&record.side);
823 let trade_id = TradeId::new(&record.id);
824 let ts_event = parse_timestamp(record.timestamp);
825 let ts_init = parse_timestamp(record.local_timestamp);
826
827 let trade = TradeTick::new(
828 instrument_id,
829 price,
830 size,
831 aggressor_side,
832 trade_id,
833 ts_event,
834 ts_init,
835 );
836
837 trades.push(trade);
838
839 if let Some(limit) = limit {
840 if trades.len() >= limit {
841 break;
842 }
843 }
844 }
845
846 Ok(trades)
847}
848
849#[cfg(test)]
853mod tests {
854 use nautilus_model::{
855 enums::{AggressorSide, BookAction},
856 identifiers::InstrumentId,
857 types::Price,
858 };
859 use nautilus_testkit::common::{
860 ensure_data_exists_tardis_binance_snapshot5, ensure_data_exists_tardis_binance_snapshot25,
861 ensure_data_exists_tardis_bitmex_trades, ensure_data_exists_tardis_deribit_book_l2,
862 ensure_data_exists_tardis_huobi_quotes,
863 };
864 use rstest::*;
865
866 use super::*;
867
868 #[ignore = "Flakey test: called `Result::unwrap()` on an `Err` value: Error(Io(Kind(UnexpectedEof)))"]
870 #[rstest]
871 #[case(Some(1), Some(0))] #[case(None, None)] pub fn test_read_deltas(
874 #[case] price_precision: Option<u8>,
875 #[case] size_precision: Option<u8>,
876 ) {
877 let filepath = ensure_data_exists_tardis_deribit_book_l2();
878 let deltas = load_deltas(
879 filepath,
880 price_precision,
881 size_precision,
882 None,
883 Some(10_000),
884 )
885 .unwrap();
886
887 assert_eq!(deltas.len(), 10_000);
888 assert_eq!(
889 deltas[0].instrument_id,
890 InstrumentId::from("BTC-PERPETUAL.DERIBIT")
891 );
892 assert_eq!(deltas[0].action, BookAction::Add);
893 assert_eq!(deltas[0].order.side, OrderSide::Sell);
894 assert_eq!(deltas[0].order.price, Price::from("6421.5"));
895 assert_eq!(deltas[0].order.size, Quantity::from("18640"));
896 assert_eq!(deltas[0].flags, 0);
897 assert_eq!(deltas[0].sequence, 0);
898 assert_eq!(deltas[0].ts_event, 1585699200245000000);
899 assert_eq!(deltas[0].ts_init, 1585699200355684000);
900 }
901
902 #[ignore = "Flakey test: called `Result::unwrap()` on an `Err` value: Error(Io(Kind(UnexpectedEof)))"]
904 #[rstest]
905 #[case(Some(2), Some(3))] #[case(None, None)] pub fn test_read_depth10s_from_snapshot5(
908 #[case] price_precision: Option<u8>,
909 #[case] size_precision: Option<u8>,
910 ) {
911 let filepath = ensure_data_exists_tardis_binance_snapshot5();
912 let depths = load_depth10_from_snapshot5(
913 filepath,
914 price_precision,
915 size_precision,
916 None,
917 Some(10_000),
918 )
919 .unwrap();
920
921 assert_eq!(depths.len(), 10_000);
922 assert_eq!(
923 depths[0].instrument_id,
924 InstrumentId::from("BTCUSDT.BINANCE")
925 );
926 assert_eq!(depths[0].bids.len(), 10);
927 assert_eq!(depths[0].bids[0].price, Price::from("11657.07"));
928 assert_eq!(depths[0].bids[0].size, Quantity::from("10.896"));
929 assert_eq!(depths[0].bids[0].side, OrderSide::Buy);
930 assert_eq!(depths[0].bids[0].order_id, 0);
931 assert_eq!(depths[0].asks.len(), 10);
932 assert_eq!(depths[0].asks[0].price, Price::from("11657.08"));
933 assert_eq!(depths[0].asks[0].size, Quantity::from("1.714"));
934 assert_eq!(depths[0].asks[0].side, OrderSide::Sell);
935 assert_eq!(depths[0].asks[0].order_id, 0);
936 assert_eq!(depths[0].bid_counts[0], 1);
937 assert_eq!(depths[0].ask_counts[0], 1);
938 assert_eq!(depths[0].flags, 128);
939 assert_eq!(depths[0].ts_event, 1598918403696000000);
940 assert_eq!(depths[0].ts_init, 1598918403810979000);
941 assert_eq!(depths[0].sequence, 0);
942 }
943
944 #[ignore = "Flakey test: called `Result::unwrap()` on an `Err` value: Error(Io(Kind(UnexpectedEof)))"]
946 #[rstest]
947 #[case(Some(2), Some(3))] #[case(None, None)] pub fn test_read_depth10s_from_snapshot25(
950 #[case] price_precision: Option<u8>,
951 #[case] size_precision: Option<u8>,
952 ) {
953 let filepath = ensure_data_exists_tardis_binance_snapshot25();
954 let depths = load_depth10_from_snapshot25(
955 filepath,
956 price_precision,
957 size_precision,
958 None,
959 Some(10_000),
960 )
961 .unwrap();
962
963 assert_eq!(depths.len(), 10_000);
964 assert_eq!(
965 depths[0].instrument_id,
966 InstrumentId::from("BTCUSDT.BINANCE")
967 );
968 assert_eq!(depths[0].bids.len(), 10);
969 assert_eq!(depths[0].bids[0].price, Price::from("11657.07"));
970 assert_eq!(depths[0].bids[0].size, Quantity::from("10.896"));
971 assert_eq!(depths[0].bids[0].side, OrderSide::Buy);
972 assert_eq!(depths[0].bids[0].order_id, 0);
973 assert_eq!(depths[0].asks.len(), 10);
974 assert_eq!(depths[0].asks[0].price, Price::from("11657.08"));
975 assert_eq!(depths[0].asks[0].size, Quantity::from("1.714"));
976 assert_eq!(depths[0].asks[0].side, OrderSide::Sell);
977 assert_eq!(depths[0].asks[0].order_id, 0);
978 assert_eq!(depths[0].bid_counts[0], 1);
979 assert_eq!(depths[0].ask_counts[0], 1);
980 assert_eq!(depths[0].flags, 128);
981 assert_eq!(depths[0].ts_event, 1598918403696000000);
982 assert_eq!(depths[0].ts_init, 1598918403810979000);
983 assert_eq!(depths[0].sequence, 0);
984 }
985
986 #[ignore = "Flakey test: called `Result::unwrap()` on an `Err` value: Error(Io(Kind(UnexpectedEof)))"]
988 #[rstest]
989 #[case(Some(1), Some(0))] #[case(None, None)] pub fn test_read_quotes(
992 #[case] price_precision: Option<u8>,
993 #[case] size_precision: Option<u8>,
994 ) {
995 let filepath = ensure_data_exists_tardis_huobi_quotes();
996 let quotes = load_quote_ticks(
997 filepath,
998 price_precision,
999 size_precision,
1000 None,
1001 Some(10_000),
1002 )
1003 .unwrap();
1004
1005 assert_eq!(quotes.len(), 10_000);
1006 assert_eq!(
1007 quotes[0].instrument_id,
1008 InstrumentId::from("BTC-USD.HUOBI_DELIVERY")
1009 );
1010 assert_eq!(quotes[0].bid_price, Price::from("8629.2"));
1011 assert_eq!(quotes[0].bid_size, Quantity::from("806"));
1012 assert_eq!(quotes[0].ask_price, Price::from("8629.3"));
1013 assert_eq!(quotes[0].ask_size, Quantity::from("5494"));
1014 assert_eq!(quotes[0].ts_event, 1588291201099000000);
1015 assert_eq!(quotes[0].ts_init, 1588291201234268000);
1016 }
1017
1018 #[ignore = "Flakey test: called `Result::unwrap()` on an `Err` value: Error(Io(Kind(UnexpectedEof)))"]
1020 #[rstest]
1021 #[case(Some(1), Some(0))] #[case(None, None)] pub fn test_read_trades(
1024 #[case] price_precision: Option<u8>,
1025 #[case] size_precision: Option<u8>,
1026 ) {
1027 let filepath = ensure_data_exists_tardis_bitmex_trades();
1028 let trades = load_trade_ticks(
1029 filepath,
1030 price_precision,
1031 size_precision,
1032 None,
1033 Some(10_000),
1034 )
1035 .unwrap();
1036
1037 assert_eq!(trades.len(), 10_000);
1038 assert_eq!(trades[0].instrument_id, InstrumentId::from("XBTUSD.BITMEX"));
1039 assert_eq!(trades[0].price, Price::from("8531.5"));
1040 assert_eq!(trades[0].size, Quantity::from("2152"));
1041 assert_eq!(trades[0].aggressor_side, AggressorSide::Seller);
1042 assert_eq!(
1043 trades[0].trade_id,
1044 TradeId::new("ccc3c1fa-212c-e8b0-1706-9b9c4f3d5ecf")
1045 );
1046 assert_eq!(trades[0].ts_event, 1583020803145000000);
1047 assert_eq!(trades[0].ts_init, 1583020803307160000);
1048 }
1049}