nautilus_persistence/python/catalog.rs

// -------------------------------------------------------------------------------------------------
//  Copyright (C) 2015-2025 Posei Systems Pty Ltd. All rights reserved.
//  https://poseitrader.io
//
//  Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
//  You may not use this file except in compliance with the License.
//  You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
//
//  Unless required by applicable law or agreed to in writing, software
//  distributed under the License is distributed on an "AS IS" BASIS,
//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
//  See the License for the specific language governing permissions and
//  limitations under the License.
// -------------------------------------------------------------------------------------------------

use nautilus_core::UnixNanos;
use nautilus_model::data::{
    Bar, IndexPriceUpdate, MarkPriceUpdate, OrderBookDelta, OrderBookDepth10, QuoteTick, TradeTick,
};
use pyo3::{exceptions::PyIOError, prelude::*};

use crate::backend::catalog::ParquetDataCatalog;

/// A catalog for writing, querying, and consolidating Nautilus data in Parquet files.
#[cfg_attr(
    feature = "python",
    pyo3::pyclass(module = "posei_trader.core.nautilus_pyo3.persistence")
)]
pub struct ParquetDataCatalogV2 {
    inner: ParquetDataCatalog,
}

#[pymethods]
impl ParquetDataCatalogV2 {
    /// Create a new `ParquetDataCatalogV2` with the given base path and optional parameters.
    ///
    /// # Parameters
    ///
    /// - `base_path`: The base path for the catalog
    /// - `storage_options`: Optional storage configuration for cloud backends
    /// - `batch_size`: Optional batch size for processing (default: 5000)
    /// - `compression`: Optional compression type (0=UNCOMPRESSED, 1=SNAPPY, 2=GZIP, 3=LZO, 4=BROTLI, 5=LZ4, 6=ZSTD; any other value falls back to SNAPPY)
    /// - `max_row_group_size`: Optional maximum row group size (default: 5000)
    ///
    /// # Panics
    ///
    /// Panics if the underlying `ParquetDataCatalog` cannot be created from the given
    /// base path and options.
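    ///
    /// # Examples
    ///
    /// A minimal sketch of constructing a local catalog; the import path and the
    /// values shown are illustrative assumptions, not part of this API:
    ///
    /// ```ignore
    /// // Hypothetical crate path, for illustration only
    /// use nautilus_persistence::python::catalog::ParquetDataCatalogV2;
    ///
    /// // Local catalog with ZSTD compression (code 6) and defaults elsewhere
    /// let catalog = ParquetDataCatalogV2::new(
    ///     "/tmp/catalog".to_string(), // base_path (illustrative)
    ///     None,                       // storage_options: local filesystem
    ///     None,                       // batch_size: default 5000
    ///     Some(6),                    // compression: ZSTD
    ///     None,                       // max_row_group_size: default 5000
    /// );
    /// ```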
    #[new]
    #[pyo3(signature = (base_path, storage_options=None, batch_size=None, compression=None, max_row_group_size=None))]
    #[must_use]
    pub fn new(
        base_path: String,
        storage_options: Option<std::collections::HashMap<String, String>>,
        batch_size: Option<usize>,
        compression: Option<u8>,
        max_row_group_size: Option<usize>,
    ) -> Self {
        let compression = compression.map(|c| match c {
            0 => parquet::basic::Compression::UNCOMPRESSED,
            1 => parquet::basic::Compression::SNAPPY,
            // GZIP, BROTLI, and ZSTD take a compression level; use each codec's
            // default level, since a level cannot be passed through PyO3
            2 => {
                let level = Default::default();
                parquet::basic::Compression::GZIP(level)
            }
            3 => parquet::basic::Compression::LZO,
            4 => {
                let level = Default::default();
                parquet::basic::Compression::BROTLI(level)
            }
            5 => parquet::basic::Compression::LZ4,
            6 => {
                let level = Default::default();
                parquet::basic::Compression::ZSTD(level)
            }
            _ => parquet::basic::Compression::SNAPPY,
        });

        Self {
            inner: ParquetDataCatalog::from_uri(
                &base_path,
                storage_options,
                batch_size,
                compression,
                max_row_group_size,
            )
            .expect("Failed to create ParquetDataCatalog"),
        }
    }

    // TODO: Cannot pass mixed data across pyo3 as a single type
    // pub fn write_data(mut slf: PyRefMut<'_, Self>, data_type: PoseiDataType, data: Vec<Data>) {}

    /// Write quote tick data to Parquet files.
    ///
    /// # Parameters
    ///
    /// - `data`: Vector of quote ticks to write
    /// - `start`: Optional start timestamp override (nanoseconds since Unix epoch)
    /// - `end`: Optional end timestamp override (nanoseconds since Unix epoch)
    ///
    /// # Returns
    ///
    /// Returns the path of the created file as a string.
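    ///
    /// # Examples
    ///
    /// A minimal sketch; `catalog` and `quotes` (a `Vec<QuoteTick>`) are assumed
    /// to have been built elsewhere:
    ///
    /// ```ignore
    /// // Write with the time range inferred from the data itself
    /// let path = catalog.write_quote_ticks(quotes, None, None)?;
    /// println!("Wrote quotes to {path}");
    /// ```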
    #[pyo3(signature = (data, start=None, end=None))]
    pub fn write_quote_ticks(
        &self,
        data: Vec<QuoteTick>,
        start: Option<u64>,
        end: Option<u64>,
    ) -> PyResult<String> {
        // Convert u64 timestamps to UnixNanos
        let start_nanos = start.map(UnixNanos::from);
        let end_nanos = end.map(UnixNanos::from);

        self.inner
            .write_to_parquet(data, start_nanos, end_nanos)
            .map(|path| path.to_string_lossy().to_string())
            .map_err(|e| PyIOError::new_err(format!("Failed to write quote ticks: {e}")))
    }

    /// Write trade tick data to Parquet files.
    ///
    /// # Parameters
    ///
    /// - `data`: Vector of trade ticks to write
    /// - `start`: Optional start timestamp override (nanoseconds since Unix epoch)
    /// - `end`: Optional end timestamp override (nanoseconds since Unix epoch)
    ///
    /// # Returns
    ///
    /// Returns the path of the created file as a string.
    #[pyo3(signature = (data, start=None, end=None))]
    pub fn write_trade_ticks(
        &self,
        data: Vec<TradeTick>,
        start: Option<u64>,
        end: Option<u64>,
    ) -> PyResult<String> {
        // Convert u64 timestamps to UnixNanos
        let start_nanos = start.map(UnixNanos::from);
        let end_nanos = end.map(UnixNanos::from);

        self.inner
            .write_to_parquet(data, start_nanos, end_nanos)
            .map(|path| path.to_string_lossy().to_string())
            .map_err(|e| PyIOError::new_err(format!("Failed to write trade ticks: {e}")))
    }

    /// Write order book delta data to Parquet files.
    ///
    /// # Parameters
    ///
    /// - `data`: Vector of order book deltas to write
    /// - `start`: Optional start timestamp override (nanoseconds since Unix epoch)
    /// - `end`: Optional end timestamp override (nanoseconds since Unix epoch)
    ///
    /// # Returns
    ///
    /// Returns the path of the created file as a string.
    #[pyo3(signature = (data, start=None, end=None))]
    pub fn write_order_book_deltas(
        &self,
        data: Vec<OrderBookDelta>,
        start: Option<u64>,
        end: Option<u64>,
    ) -> PyResult<String> {
        // Convert u64 timestamps to UnixNanos
        let start_nanos = start.map(UnixNanos::from);
        let end_nanos = end.map(UnixNanos::from);

        self.inner
            .write_to_parquet(data, start_nanos, end_nanos)
            .map(|path| path.to_string_lossy().to_string())
            .map_err(|e| PyIOError::new_err(format!("Failed to write order book deltas: {e}")))
    }

    /// Write bar data to Parquet files.
    ///
    /// # Parameters
    ///
    /// - `data`: Vector of bars to write
    /// - `start`: Optional start timestamp override (nanoseconds since Unix epoch)
    /// - `end`: Optional end timestamp override (nanoseconds since Unix epoch)
    ///
    /// # Returns
    ///
    /// Returns the path of the created file as a string.
    #[pyo3(signature = (data, start=None, end=None))]
    pub fn write_bars(
        &self,
        data: Vec<Bar>,
        start: Option<u64>,
        end: Option<u64>,
    ) -> PyResult<String> {
        // Convert u64 timestamps to UnixNanos
        let start_nanos = start.map(UnixNanos::from);
        let end_nanos = end.map(UnixNanos::from);

        self.inner
            .write_to_parquet(data, start_nanos, end_nanos)
            .map(|path| path.to_string_lossy().to_string())
            .map_err(|e| PyIOError::new_err(format!("Failed to write bars: {e}")))
    }

    /// Write order book depth data to Parquet files.
    ///
    /// # Parameters
    ///
    /// - `data`: Vector of order book depths to write
    /// - `start`: Optional start timestamp override (nanoseconds since Unix epoch)
    /// - `end`: Optional end timestamp override (nanoseconds since Unix epoch)
    ///
    /// # Returns
    ///
    /// Returns the path of the created file as a string.
    #[pyo3(signature = (data, start=None, end=None))]
    pub fn write_order_book_depths(
        &self,
        data: Vec<OrderBookDepth10>,
        start: Option<u64>,
        end: Option<u64>,
    ) -> PyResult<String> {
        // Convert u64 timestamps to UnixNanos
        let start_nanos = start.map(UnixNanos::from);
        let end_nanos = end.map(UnixNanos::from);

        self.inner
            .write_to_parquet(data, start_nanos, end_nanos)
            .map(|path| path.to_string_lossy().to_string())
            .map_err(|e| PyIOError::new_err(format!("Failed to write order book depths: {e}")))
    }

    /// Write mark price update data to Parquet files.
    ///
    /// # Parameters
    ///
    /// - `data`: Vector of mark price updates to write
    /// - `start`: Optional start timestamp override (nanoseconds since Unix epoch)
    /// - `end`: Optional end timestamp override (nanoseconds since Unix epoch)
    ///
    /// # Returns
    ///
    /// Returns the path of the created file as a string.
    #[pyo3(signature = (data, start=None, end=None))]
    pub fn write_mark_price_updates(
        &self,
        data: Vec<MarkPriceUpdate>,
        start: Option<u64>,
        end: Option<u64>,
    ) -> PyResult<String> {
        // Convert u64 timestamps to UnixNanos
        let start_nanos = start.map(UnixNanos::from);
        let end_nanos = end.map(UnixNanos::from);

        self.inner
            .write_to_parquet(data, start_nanos, end_nanos)
            .map(|path| path.to_string_lossy().to_string())
            .map_err(|e| PyIOError::new_err(format!("Failed to write mark price updates: {e}")))
    }

    /// Write index price update data to Parquet files.
    ///
    /// # Parameters
    ///
    /// - `data`: Vector of index price updates to write
    /// - `start`: Optional start timestamp override (nanoseconds since Unix epoch)
    /// - `end`: Optional end timestamp override (nanoseconds since Unix epoch)
    ///
    /// # Returns
    ///
    /// Returns the path of the created file as a string.
    #[pyo3(signature = (data, start=None, end=None))]
    pub fn write_index_price_updates(
        &self,
        data: Vec<IndexPriceUpdate>,
        start: Option<u64>,
        end: Option<u64>,
    ) -> PyResult<String> {
        // Convert u64 timestamps to UnixNanos
        let start_nanos = start.map(UnixNanos::from);
        let end_nanos = end.map(UnixNanos::from);

        self.inner
            .write_to_parquet(data, start_nanos, end_nanos)
            .map(|path| path.to_string_lossy().to_string())
            .map_err(|e| PyIOError::new_err(format!("Failed to write index price updates: {e}")))
    }

    /// Extend file names in the catalog with additional timestamp information.
    ///
    /// # Parameters
    ///
    /// - `data_cls`: The data class name
    /// - `instrument_id`: Optional instrument ID filter
    /// - `start`: Start timestamp (nanoseconds since Unix epoch)
    /// - `end`: End timestamp (nanoseconds since Unix epoch)
    #[pyo3(signature = (data_cls, instrument_id=None, *, start, end))]
    pub fn extend_file_name(
        &self,
        data_cls: &str,
        instrument_id: Option<String>,
        start: u64,
        end: u64,
    ) -> PyResult<()> {
        // Convert u64 timestamps to UnixNanos
        let start_nanos = UnixNanos::from(start);
        let end_nanos = UnixNanos::from(end);

        self.inner
            .extend_file_name(data_cls, instrument_id, start_nanos, end_nanos)
            .map_err(|e| PyIOError::new_err(format!("Failed to extend file name: {e}")))
    }

    /// Consolidate all data files in the catalog within the specified time range.
    ///
    /// # Parameters
    ///
    /// - `start`: Optional start timestamp (nanoseconds since Unix epoch)
    /// - `end`: Optional end timestamp (nanoseconds since Unix epoch)
    /// - `ensure_contiguous_files`: Optional flag to ensure files are contiguous
    #[pyo3(signature = (start=None, end=None, ensure_contiguous_files=None))]
    pub fn consolidate_catalog(
        &self,
        start: Option<u64>,
        end: Option<u64>,
        ensure_contiguous_files: Option<bool>,
    ) -> PyResult<()> {
        // Convert u64 timestamps to UnixNanos
        let start_nanos = start.map(UnixNanos::from);
        let end_nanos = end.map(UnixNanos::from);

        self.inner
            .consolidate_catalog(start_nanos, end_nanos, ensure_contiguous_files)
            .map_err(|e| PyIOError::new_err(format!("Failed to consolidate catalog: {e}")))
    }

    /// Consolidate data files for a specific data type within the specified time range.
    ///
    /// # Parameters
    ///
    /// - `type_name`: The data type name to consolidate
    /// - `instrument_id`: Optional instrument ID filter
    /// - `start`: Optional start timestamp (nanoseconds since Unix epoch)
    /// - `end`: Optional end timestamp (nanoseconds since Unix epoch)
    /// - `ensure_contiguous_files`: Optional flag to ensure files are contiguous
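    ///
    /// # Examples
    ///
    /// A minimal sketch; the `type_name` and instrument ID values shown are
    /// illustrative assumptions, and `catalog` is assumed to exist:
    ///
    /// ```ignore
    /// // Consolidate quote files for one instrument over a fixed window,
    /// // requiring the files in that range to be contiguous
    /// catalog.consolidate_data(
    ///     "quotes",                        // type_name (assumed)
    ///     Some("EUR/USD.SIM".to_string()), // instrument_id (illustrative)
    ///     Some(1_704_067_200_000_000_000), // start: 2024-01-01 (ns)
    ///     Some(1_706_745_600_000_000_000), // end: 2024-02-01 (ns)
    ///     Some(true),
    /// )?;
    /// ```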
    #[pyo3(signature = (type_name, instrument_id=None, start=None, end=None, ensure_contiguous_files=None))]
    pub fn consolidate_data(
        &self,
        type_name: &str,
        instrument_id: Option<String>,
        start: Option<u64>,
        end: Option<u64>,
        ensure_contiguous_files: Option<bool>,
    ) -> PyResult<()> {
        // Convert u64 timestamps to UnixNanos
        let start_nanos = start.map(UnixNanos::from);
        let end_nanos = end.map(UnixNanos::from);

        self.inner
            .consolidate_data(
                type_name,
                instrument_id,
                start_nanos,
                end_nanos,
                ensure_contiguous_files,
            )
            .map_err(|e| PyIOError::new_err(format!("Failed to consolidate data: {e}")))
    }

    /// Reset all catalog file names to their canonical form.
    pub fn reset_catalog_file_names(&self) -> PyResult<()> {
        self.inner
            .reset_catalog_file_names()
            .map_err(|e| PyIOError::new_err(format!("Failed to reset catalog file names: {e}")))
    }

    /// Reset data file names for a specific data class to their canonical form.
    ///
    /// # Parameters
    ///
    /// - `data_cls`: The data class name
    /// - `instrument_id`: Optional instrument ID filter
    #[pyo3(signature = (data_cls, instrument_id=None))]
    pub fn reset_data_file_names(
        &self,
        data_cls: &str,
        instrument_id: Option<String>,
    ) -> PyResult<()> {
        self.inner
            .reset_data_file_names(data_cls, instrument_id)
            .map_err(|e| PyIOError::new_err(format!("Failed to reset data file names: {e}")))
    }

    /// Query files in the catalog matching the specified criteria.
    ///
    /// # Parameters
    ///
    /// - `data_cls`: The data class name to query
    /// - `instrument_ids`: Optional list of instrument IDs to filter by
    /// - `start`: Optional start timestamp (nanoseconds since Unix epoch)
    /// - `end`: Optional end timestamp (nanoseconds since Unix epoch)
    ///
    /// # Returns
    ///
    /// Returns a list of file paths matching the criteria.
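    ///
    /// # Examples
    ///
    /// A minimal sketch; the data class and instrument ID values are
    /// illustrative assumptions:
    ///
    /// ```ignore
    /// let files = catalog.query_files(
    ///     "quotes",                              // data_cls (assumed)
    ///     Some(vec!["EUR/USD.SIM".to_string()]), // instrument filter (illustrative)
    ///     None,                                  // start: unbounded
    ///     None,                                  // end: unbounded
    /// )?;
    /// for file in files {
    ///     println!("{file}");
    /// }
    /// ```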
    #[pyo3(signature = (data_cls, instrument_ids=None, start=None, end=None))]
    pub fn query_files(
        &self,
        data_cls: &str,
        instrument_ids: Option<Vec<String>>,
        start: Option<u64>,
        end: Option<u64>,
    ) -> PyResult<Vec<String>> {
        // Convert u64 timestamps to UnixNanos
        let start_nanos = start.map(UnixNanos::from);
        let end_nanos = end.map(UnixNanos::from);

        self.inner
            .query_files(data_cls, instrument_ids, start_nanos, end_nanos)
            .map_err(|e| PyIOError::new_err(format!("Failed to query files: {e}")))
    }

    /// Get missing time intervals for a data request.
    ///
    /// # Parameters
    ///
    /// - `start`: Start timestamp (nanoseconds since Unix epoch)
    /// - `end`: End timestamp (nanoseconds since Unix epoch)
    /// - `data_cls`: The data class name
    /// - `instrument_id`: Optional instrument ID filter
    ///
    /// # Returns
    ///
    /// Returns a list of (start, end) timestamp tuples representing missing intervals.
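    ///
    /// # Examples
    ///
    /// A minimal sketch of probing for gaps to backfill; the range and data
    /// class are illustrative assumptions:
    ///
    /// ```ignore
    /// let gaps = catalog.get_missing_intervals_for_request(
    ///     1_704_067_200_000_000_000, // start: 2024-01-01 (ns)
    ///     1_706_745_600_000_000_000, // end: 2024-02-01 (ns)
    ///     "quotes",                  // data_cls (assumed)
    ///     None,
    /// )?;
    /// for (gap_start, gap_end) in gaps {
    ///     println!("Missing {gap_start}..{gap_end}");
    /// }
    /// ```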
    #[pyo3(signature = (start, end, data_cls, instrument_id=None))]
    pub fn get_missing_intervals_for_request(
        &self,
        start: u64,
        end: u64,
        data_cls: &str,
        instrument_id: Option<String>,
    ) -> PyResult<Vec<(u64, u64)>> {
        self.inner
            .get_missing_intervals_for_request(start, end, data_cls, instrument_id)
            .map_err(|e| PyIOError::new_err(format!("Failed to get missing intervals: {e}")))
    }

    /// Query the last timestamp for a specific data class and instrument.
    ///
    /// # Parameters
    ///
    /// - `data_cls`: The data class name
    /// - `instrument_id`: Optional instrument ID filter
    ///
    /// # Returns
    ///
    /// Returns the last timestamp as nanoseconds since Unix epoch, or None if no data exists.
    #[pyo3(signature = (data_cls, instrument_id=None))]
    pub fn query_last_timestamp(
        &self,
        data_cls: &str,
        instrument_id: Option<String>,
    ) -> PyResult<Option<u64>> {
        self.inner
            .query_last_timestamp(data_cls, instrument_id)
            .map_err(|e| PyIOError::new_err(format!("Failed to query last timestamp: {e}")))
    }

    /// Get time intervals covered by data for a specific data class and instrument.
    ///
    /// # Parameters
    ///
    /// - `data_cls`: The data class name
    /// - `instrument_id`: Optional instrument ID filter
    ///
    /// # Returns
    ///
    /// Returns a list of (start, end) timestamp tuples representing covered intervals.
    #[pyo3(signature = (data_cls, instrument_id=None))]
    pub fn get_intervals(
        &self,
        data_cls: &str,
        instrument_id: Option<String>,
    ) -> PyResult<Vec<(u64, u64)>> {
        self.inner
            .get_intervals(data_cls, instrument_id)
            .map_err(|e| PyIOError::new_err(format!("Failed to get intervals: {e}")))
    }
}