nautilus_persistence/backend/catalog.rs

// -------------------------------------------------------------------------------------------------
//  Copyright (C) 2015-2025 Posei Systems Pty Ltd. All rights reserved.
//  https://poseitrader.io
//
//  Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
//  You may not use this file except in compliance with the License.
//  You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
//
//  Unless required by applicable law or agreed to in writing, software
//  distributed under the License is distributed on an "AS IS" BASIS,
//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
//  See the License for the specific language governing permissions and
//  limitations under the License.
// -------------------------------------------------------------------------------------------------

use std::{fmt::Debug, path::PathBuf};

use datafusion::arrow::record_batch::RecordBatch;
use heck::ToSnakeCase;
use itertools::Itertools;
use log::info;
use nautilus_core::UnixNanos;
use nautilus_model::data::{
    Bar, Data, GetTsInit, IndexPriceUpdate, MarkPriceUpdate, OrderBookDelta, OrderBookDepth10,
    QuoteTick, TradeTick, close::InstrumentClose,
};
use nautilus_serialization::{
    arrow::{DecodeDataFromRecordBatch, EncodeToRecordBatch},
    enums::ParquetWriteMode,
    parquet::{combine_data_files, min_max_from_parquet_metadata, write_batches_to_parquet},
};
use serde::Serialize;

use super::session::{DataBackendSession, QueryResult, build_query};

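/// A catalog that persists Nautilus market data as Parquet files and queries them
/// through a DataFusion-backed [`DataBackendSession`].
///
/// Files are laid out as `<base_path>/data/<type>/<instrument_id>/part-<n>.parquet`.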
pub struct ParquetDataCatalog {
    base_path: PathBuf,
    batch_size: usize,
    session: DataBackendSession,
}

impl Debug for ParquetDataCatalog {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.debug_struct(stringify!(ParquetDataCatalog))
            .field("base_path", &self.base_path)
            .finish()
    }
}

impl ParquetDataCatalog {
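    /// Creates a new [`ParquetDataCatalog`] rooted at `base_path`.
    ///
    /// `batch_size` sets how many records are encoded per Arrow record batch,
    /// defaulting to 5000.
    ///
    /// # Example
    ///
    /// A minimal usage sketch; the catalog root path is hypothetical and the module
    /// path is assumed from this file's location in the crate:
    ///
    /// ```no_run
    /// use std::path::PathBuf;
    ///
    /// use nautilus_persistence::backend::catalog::ParquetDataCatalog;
    ///
    /// let catalog = ParquetDataCatalog::new(PathBuf::from("/tmp/catalog"), None);
    /// ```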
    #[must_use]
    pub fn new(base_path: PathBuf, batch_size: Option<usize>) -> Self {
        let batch_size = batch_size.unwrap_or(5000);
        Self {
            base_path,
            batch_size,
            session: DataBackendSession::new(batch_size),
        }
    }

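    /// Splits a heterogeneous stream of [`Data`] by variant and writes each data type
    /// to its own Parquet location via [`Self::write_to_parquet`].
    ///
    /// `Data::Deltas` wrappers are skipped, and errors from individual writes are discarded.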
    pub fn write_data_enum(&self, data: Vec<Data>, write_mode: Option<ParquetWriteMode>) {
        let mut deltas: Vec<OrderBookDelta> = Vec::new();
        let mut depth10s: Vec<OrderBookDepth10> = Vec::new();
        let mut quotes: Vec<QuoteTick> = Vec::new();
        let mut trades: Vec<TradeTick> = Vec::new();
        let mut bars: Vec<Bar> = Vec::new();
        let mut mark_prices: Vec<MarkPriceUpdate> = Vec::new();
        let mut index_prices: Vec<IndexPriceUpdate> = Vec::new();
        let mut closes: Vec<InstrumentClose> = Vec::new();

        for d in data {
            match d {
                Data::Deltas(_) => continue, // Batched deltas are not written individually
                Data::Delta(d) => deltas.push(d),
                Data::Depth10(d) => depth10s.push(*d),
                Data::Quote(d) => quotes.push(d),
                Data::Trade(d) => trades.push(d),
                Data::Bar(d) => bars.push(d),
                Data::MarkPriceUpdate(p) => mark_prices.push(p),
                Data::IndexPriceUpdate(p) => index_prices.push(p),
                Data::InstrumentClose(c) => closes.push(c),
            }
        }

        // Errors from individual writes are discarded; each write is independent
        let _ = self.write_to_parquet(deltas, None, None, None, write_mode);
        let _ = self.write_to_parquet(depth10s, None, None, None, write_mode);
        let _ = self.write_to_parquet(quotes, None, None, None, write_mode);
        let _ = self.write_to_parquet(trades, None, None, None, write_mode);
        let _ = self.write_to_parquet(bars, None, None, None, write_mode);
        let _ = self.write_to_parquet(mark_prices, None, None, None, write_mode);
        let _ = self.write_to_parquet(index_prices, None, None, None, write_mode);
        let _ = self.write_to_parquet(closes, None, None, None, write_mode);
    }

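    /// Writes typed data to a Parquet file, returning the path written.
    ///
    /// Timestamps must be ascending by `ts_init`. The target path is derived from the
    /// type's [`CatalogPathPrefix`] and any `instrument_id` found in the batch schema
    /// metadata, unless an explicit `path` is provided.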
    pub fn write_to_parquet<T>(
        &self,
        data: Vec<T>,
        path: Option<PathBuf>,
        compression: Option<parquet::basic::Compression>,
        max_row_group_size: Option<usize>,
        write_mode: Option<ParquetWriteMode>,
    ) -> anyhow::Result<PathBuf>
    where
        T: GetTsInit + EncodeToRecordBatch + CatalogPathPrefix,
    {
        let type_name = std::any::type_name::<T>().to_snake_case();
        Self::check_ascending_timestamps(&data, &type_name);
        let batches = self.data_to_record_batches(data)?;
        let schema = batches
            .first()
            .ok_or_else(|| anyhow::anyhow!("Expected at least one record batch"))?
            .schema();
        let instrument_id = schema.metadata.get("instrument_id").cloned();
        let new_path = self.make_path(T::path_prefix(), instrument_id, write_mode)?;
        let path = path.unwrap_or(new_path);

        // Write all batches to a single Parquet file
        info!(
            "Writing {} batches of {type_name} data to {path:?}",
            batches.len()
        );

        write_batches_to_parquet(&batches, &path, compression, max_row_group_size, write_mode)?;

        Ok(path)
    }

    fn check_ascending_timestamps<T: GetTsInit>(data: &[T], type_name: &str) {
        assert!(
            data.windows(2).all(|w| w[0].ts_init() <= w[1].ts_init()),
            "{type_name} timestamps must be in ascending order"
        );
    }

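    /// Encodes data into Arrow record batches, chunking by the configured `batch_size`.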
    pub fn data_to_record_batches<T>(&self, data: Vec<T>) -> anyhow::Result<Vec<RecordBatch>>
    where
        T: GetTsInit + EncodeToRecordBatch,
    {
        let mut batches = Vec::new();

        for chunk in &data.into_iter().chunks(self.batch_size) {
            let data = chunk.collect_vec();
            let metadata = EncodeToRecordBatch::chunk_metadata(&data);
            let record_batch = T::encode_batch(&metadata, &data)?;
            batches.push(record_batch);
        }

        Ok(batches)
    }

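    /// Resolves the target file path as `part-<n>.parquet` under the data directory.
    ///
    /// `ParquetWriteMode::NewFile` selects the next unused `part-<n>` name; other modes
    /// reuse `part-0.parquet` and are rejected if the directory already holds several files.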
    fn make_path(
        &self,
        type_name: &str,
        instrument_id: Option<String>,
        write_mode: Option<ParquetWriteMode>,
    ) -> anyhow::Result<PathBuf> {
        let path = self.make_directory_path(type_name, instrument_id);
        std::fs::create_dir_all(&path)?;
        let used_write_mode = write_mode.unwrap_or(ParquetWriteMode::Overwrite);
        let mut file_path = path.join("part-0.parquet");
        let mut empty_path = file_path.clone();
        let mut i = 0;

        // Find the first `part-<i>.parquet` name not already taken
        while empty_path.exists() {
            i += 1;
            let name = format!("part-{i}.parquet");
            empty_path = path.join(name);
        }

        if i > 1 && used_write_mode != ParquetWriteMode::NewFile {
            anyhow::bail!(
                "Only ParquetWriteMode::NewFile is allowed for a directory containing several parquet files."
            );
        } else if used_write_mode == ParquetWriteMode::NewFile {
            file_path = empty_path;
        }

        info!("Resolved file path: {file_path:?}");

        Ok(file_path)
    }

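    /// Builds the directory path `<base_path>/data/<type_name>[/<instrument_id>]`.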
    fn make_directory_path(&self, type_name: &str, instrument_id: Option<String>) -> PathBuf {
        let mut path = self.base_path.join("data").join(type_name);

        if let Some(id) = instrument_id {
            path = path.join(id.replace('/', "")); // Strip '/' from FX symbols like EUR/USD
        }

        path
    }

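    /// Writes data to a JSON file alongside the Parquet layout, returning the path written.
    ///
    /// When `write_metadata` is set, chunk metadata is also written to a sibling
    /// `*.metadata.json` file.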
    pub fn write_to_json<T>(
        &self,
        data: Vec<T>,
        path: Option<PathBuf>,
        write_metadata: bool,
    ) -> anyhow::Result<PathBuf>
    where
        T: GetTsInit + Serialize + CatalogPathPrefix + EncodeToRecordBatch,
    {
        let type_name = std::any::type_name::<T>().to_snake_case();
        Self::check_ascending_timestamps(&data, &type_name);
        let new_path = self.make_path(T::path_prefix(), None, None)?;
        let json_path = path.unwrap_or(new_path.with_extension("json"));

        info!(
            "Writing {} records of {type_name} data to {json_path:?}",
            data.len()
        );

        if write_metadata {
            let metadata = T::chunk_metadata(&data);
            let metadata_path = json_path.with_extension("metadata.json");
            info!("Writing metadata to {metadata_path:?}");
            let metadata_file = std::fs::File::create(&metadata_path)?;
            serde_json::to_writer_pretty(metadata_file, &metadata)?;
        }

        let file = std::fs::File::create(&json_path)?;
        serde_json::to_writer_pretty(file, &serde_json::to_value(data)?)?;

        Ok(json_path)
    }

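    /// Combines all Parquet files for one data type (and optional instrument) into a single file.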
    pub fn consolidate_data(
        &self,
        type_name: &str,
        instrument_id: Option<String>,
    ) -> anyhow::Result<()> {
        let parquet_files = self.query_parquet_files(type_name, instrument_id)?;

        if !parquet_files.is_empty() {
            combine_data_files(parquet_files, "ts_init", None, None)?;
        }

        Ok(())
    }

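    /// Combines the Parquet files in every leaf data directory of the catalog.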
    pub fn consolidate_catalog(&self) -> anyhow::Result<()> {
        let leaf_directories = self.find_leaf_data_directories()?;

        for directory in leaf_directories {
            let parquet_files: Vec<PathBuf> = std::fs::read_dir(directory)?
                .filter_map(|entry| {
                    let path = entry.ok()?.path();

                    if path.extension().and_then(|s| s.to_str()) == Some("parquet") {
                        Some(path)
                    } else {
                        None
                    }
                })
                .collect();

            if !parquet_files.is_empty() {
                combine_data_files(parquet_files, "ts_init", None, None)?;
            }
        }

        Ok(())
    }

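    /// Returns all data directories that contain files but no subdirectories.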
    pub fn find_leaf_data_directories(&self) -> anyhow::Result<Vec<PathBuf>> {
        let mut all_paths: Vec<PathBuf> = Vec::new();
        let data_dir = self.base_path.join("data");

        for entry in walkdir::WalkDir::new(data_dir) {
            all_paths.push(entry?.path().to_path_buf());
        }

        let all_dirs = all_paths
            .iter()
            .filter(|p| p.is_dir())
            .cloned()
            .collect::<Vec<PathBuf>>();
        let mut leaf_dirs = Vec::new();

        for directory in all_dirs {
            let mut has_subdirs = false;
            let mut has_files = false;

            // Single pass over the entries; IO errors propagate instead of panicking
            for entry in std::fs::read_dir(&directory)? {
                let path = entry?.path();
                if path.is_dir() {
                    has_subdirs = true;
                } else if path.is_file() {
                    has_files = true;
                }
            }

            if has_files && !has_subdirs {
                leaf_dirs.push(directory);
            }
        }

        Ok(leaf_dirs)
    }

    /// Queries a single data file loaded in the catalog and returns its decoded contents.
    pub fn query_file<T>(
        &mut self,
        path: PathBuf,
        start: Option<UnixNanos>,
        end: Option<UnixNanos>,
        where_clause: Option<&str>,
    ) -> anyhow::Result<QueryResult>
    where
        T: DecodeDataFromRecordBatch + CatalogPathPrefix,
    {
        let path_str = path.to_str().expect("Failed to convert path to string");
        let table_name = path
            .file_stem()
            .expect("Failed to get file stem from path")
            .to_str()
            .expect("Failed to convert file stem to string");
        let query = build_query(table_name, start, end, where_clause);
        self.session
            .add_file::<T>(table_name, path_str, Some(&query))?;

        Ok(self.session.get_query_result())
    }

    /// Queries all catalog files for the given instrument IDs and returns the decoded contents.
    pub fn query_directory<T>(
        &mut self,
        instrument_ids: Vec<String>,
        start: Option<UnixNanos>,
        end: Option<UnixNanos>,
        where_clause: Option<&str>,
    ) -> anyhow::Result<QueryResult>
    where
        T: DecodeDataFromRecordBatch + CatalogPathPrefix,
    {
        let mut paths = Vec::new();

        for instrument_id in instrument_ids {
            paths.extend(self.query_parquet_files(T::path_prefix(), Some(instrument_id))?);
        }

        // If no specific instrument_id is selected, query all files for the data type
        if paths.is_empty() {
            paths.push(self.make_path(T::path_prefix(), None, None)?);
        }

        for path in &paths {
            let path = path.to_str().expect("Failed to convert path to string");
            let query = build_query(path, start, end, where_clause);
            self.session.add_file::<T>(path, path, Some(&query))?;
        }

        Ok(self.session.get_query_result())
    }

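    /// Returns the maximum (or minimum, when `is_last` is `false`) `ts_init` across all
    /// Parquet files for the given data type and optional instrument, read from the
    /// files' Parquet metadata.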
    #[allow(dead_code)]
    pub fn query_timestamp_bound(
        &self,
        data_cls: &str,
        instrument_id: Option<String>,
        is_last: Option<bool>,
    ) -> anyhow::Result<Option<i64>> {
        let is_last = is_last.unwrap_or(true);
        let parquet_files = self.query_parquet_files(data_cls, instrument_id)?;

        if parquet_files.is_empty() {
            return Ok(None);
        }

        let min_max_per_file: Vec<(i64, i64)> = parquet_files
            .iter()
            .map(|file| min_max_from_parquet_metadata(file, "ts_init"))
            .collect::<Result<Vec<_>, _>>()?;

        // Take the per-file max when querying the last timestamp, otherwise the per-file min
        let bound = if is_last {
            min_max_per_file.iter().map(|(_, max)| *max).max()
        } else {
            min_max_per_file.iter().map(|(min, _)| *min).min()
        };

        Ok(bound)
    }

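    /// Lists the Parquet files in the directory for the given data type and optional instrument.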
    pub fn query_parquet_files(
        &self,
        type_name: &str,
        instrument_id: Option<String>,
    ) -> anyhow::Result<Vec<PathBuf>> {
        let path = self.make_directory_path(type_name, instrument_id);
        let mut files = Vec::new();

        if path.exists() {
            for entry in std::fs::read_dir(path)? {
                let path = entry?.path();
                // Use `and_then` so files without an extension don't panic
                if path.is_file() && path.extension().and_then(|s| s.to_str()) == Some("parquet") {
                    files.push(path);
                }
            }
        }

        Ok(files)
    }
}

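/// Associates a data type with its directory name under the catalog's `data` tree.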
pub trait CatalogPathPrefix {
    fn path_prefix() -> &'static str;
}

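/// Implements [`CatalogPathPrefix`] for a type with the given path prefix.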
macro_rules! impl_catalog_path_prefix {
    ($type:ty, $path:expr) => {
        impl CatalogPathPrefix for $type {
            fn path_prefix() -> &'static str {
                $path
            }
        }
    };
}

impl_catalog_path_prefix!(QuoteTick, "quotes");
impl_catalog_path_prefix!(TradeTick, "trades");
impl_catalog_path_prefix!(OrderBookDelta, "order_book_deltas");
impl_catalog_path_prefix!(OrderBookDepth10, "order_book_depths");
impl_catalog_path_prefix!(Bar, "bars");
impl_catalog_path_prefix!(IndexPriceUpdate, "index_prices");
impl_catalog_path_prefix!(MarkPriceUpdate, "mark_prices");
impl_catalog_path_prefix!(InstrumentClose, "instrument_closes");