nautilus_persistence/python/catalog.rs
// -------------------------------------------------------------------------------------------------
// Copyright (C) 2015-2025 Posei Systems Pty Ltd. All rights reserved.
// https://poseitrader.io
//
// Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
// You may not use this file except in compliance with the License.
// You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
// -------------------------------------------------------------------------------------------------

use nautilus_core::UnixNanos;
use nautilus_model::data::{
    Bar, IndexPriceUpdate, MarkPriceUpdate, OrderBookDelta, OrderBookDepth10, QuoteTick, TradeTick,
};
use pyo3::{exceptions::PyIOError, prelude::*};

use crate::backend::catalog::ParquetDataCatalog;
/// A catalog for writing, querying, and consolidating Posei market data in Parquet files.
#[cfg_attr(
    feature = "python",
    pyo3::pyclass(module = "posei_trader.core.nautilus_pyo3.persistence")
)]
pub struct ParquetDataCatalogV2 {
    inner: ParquetDataCatalog,
}

#[pymethods]
impl ParquetDataCatalogV2 {
    /// Create a new `ParquetDataCatalogV2` with the given base path and optional parameters.
    ///
    /// # Parameters
    ///
    /// - `base_path`: The base path for the catalog
    /// - `storage_options`: Optional storage configuration for cloud backends
    /// - `batch_size`: Optional batch size for processing (default: 5000)
    /// - `compression`: Optional compression type (0=UNCOMPRESSED, 1=SNAPPY, 2=GZIP, 3=LZO, 4=BROTLI, 5=LZ4, 6=ZSTD; any other value falls back to SNAPPY)
    /// - `max_row_group_size`: Optional maximum row group size (default: 5000)
    ///
    /// # Errors
    ///
    /// Returns a `PyIOError` if the underlying catalog cannot be created from the given URI.
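    ///
    /// # Example
    ///
    /// A minimal usage sketch from Python (the import path follows the `pyclass`
    /// module declared above; the base path is illustrative):
    ///
    /// ```python
    /// from posei_trader.core.nautilus_pyo3.persistence import ParquetDataCatalogV2
    ///
    /// # compression=1 selects SNAPPY (see the numeric mapping above)
    /// catalog = ParquetDataCatalogV2("/tmp/catalog", compression=1)
    /// ```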
    #[new]
    #[pyo3(signature = (base_path, storage_options=None, batch_size=None, compression=None, max_row_group_size=None))]
    pub fn new(
        base_path: String,
        storage_options: Option<std::collections::HashMap<String, String>>,
        batch_size: Option<usize>,
        compression: Option<u8>,
        max_row_group_size: Option<usize>,
    ) -> PyResult<Self> {
        let compression = compression.map(|c| match c {
            0 => parquet::basic::Compression::UNCOMPRESSED,
            1 => parquet::basic::Compression::SNAPPY,
            // For GZIP, BROTLI, and ZSTD we use the default compression level,
            // since the level parameter cannot be passed through PyO3
            2 => {
                let level = Default::default();
                parquet::basic::Compression::GZIP(level)
            }
            3 => parquet::basic::Compression::LZO,
            4 => {
                let level = Default::default();
                parquet::basic::Compression::BROTLI(level)
            }
            5 => parquet::basic::Compression::LZ4,
            6 => {
                let level = Default::default();
                parquet::basic::Compression::ZSTD(level)
            }
            // Any unrecognized value falls back to SNAPPY
            _ => parquet::basic::Compression::SNAPPY,
        });

        // Surface construction failures as a Python exception rather than panicking
        let inner = ParquetDataCatalog::from_uri(
            &base_path,
            storage_options,
            batch_size,
            compression,
            max_row_group_size,
        )
        .map_err(|e| PyIOError::new_err(format!("Failed to create ParquetDataCatalog: {e}")))?;

        Ok(Self { inner })
    }

    // TODO: Cannot pass mixed data across pyo3 as a single type
    // pub fn write_data(mut slf: PyRefMut<'_, Self>, data_type: PoseiDataType, data: Vec<Data>) {}

    /// Write quote tick data to Parquet files.
    ///
    /// # Parameters
    ///
    /// - `data`: Vector of quote ticks to write
    /// - `start`: Optional start timestamp override (nanoseconds since Unix epoch)
    /// - `end`: Optional end timestamp override (nanoseconds since Unix epoch)
    ///
    /// # Returns
    ///
    /// Returns the path of the created file as a string.
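    ///
    /// # Example
    ///
    /// A sketch of the call from Python, assuming `catalog` was constructed as
    /// above and `quotes` is a list of `QuoteTick` objects built elsewhere
    /// (timestamps are nanoseconds since the Unix epoch):
    ///
    /// ```python
    /// path = catalog.write_quote_ticks(quotes)
    /// print(path)  # the Parquet file that was written
    /// ```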
    #[pyo3(signature = (data, start=None, end=None))]
    pub fn write_quote_ticks(
        &self,
        data: Vec<QuoteTick>,
        start: Option<u64>,
        end: Option<u64>,
    ) -> PyResult<String> {
        // Convert u64 timestamps to UnixNanos
        let start_nanos = start.map(UnixNanos::from);
        let end_nanos = end.map(UnixNanos::from);

        self.inner
            .write_to_parquet(data, start_nanos, end_nanos)
            .map(|path| path.to_string_lossy().to_string())
            .map_err(|e| PyIOError::new_err(format!("Failed to write quote ticks: {e}")))
    }

    /// Write trade tick data to Parquet files.
    ///
    /// # Parameters
    ///
    /// - `data`: Vector of trade ticks to write
    /// - `start`: Optional start timestamp override (nanoseconds since Unix epoch)
    /// - `end`: Optional end timestamp override (nanoseconds since Unix epoch)
    ///
    /// # Returns
    ///
    /// Returns the path of the created file as a string.
    #[pyo3(signature = (data, start=None, end=None))]
    pub fn write_trade_ticks(
        &self,
        data: Vec<TradeTick>,
        start: Option<u64>,
        end: Option<u64>,
    ) -> PyResult<String> {
        // Convert u64 timestamps to UnixNanos
        let start_nanos = start.map(UnixNanos::from);
        let end_nanos = end.map(UnixNanos::from);

        self.inner
            .write_to_parquet(data, start_nanos, end_nanos)
            .map(|path| path.to_string_lossy().to_string())
            .map_err(|e| PyIOError::new_err(format!("Failed to write trade ticks: {e}")))
    }

    /// Write order book delta data to Parquet files.
    ///
    /// # Parameters
    ///
    /// - `data`: Vector of order book deltas to write
    /// - `start`: Optional start timestamp override (nanoseconds since Unix epoch)
    /// - `end`: Optional end timestamp override (nanoseconds since Unix epoch)
    ///
    /// # Returns
    ///
    /// Returns the path of the created file as a string.
    #[pyo3(signature = (data, start=None, end=None))]
    pub fn write_order_book_deltas(
        &self,
        data: Vec<OrderBookDelta>,
        start: Option<u64>,
        end: Option<u64>,
    ) -> PyResult<String> {
        // Convert u64 timestamps to UnixNanos
        let start_nanos = start.map(UnixNanos::from);
        let end_nanos = end.map(UnixNanos::from);

        self.inner
            .write_to_parquet(data, start_nanos, end_nanos)
            .map(|path| path.to_string_lossy().to_string())
            .map_err(|e| PyIOError::new_err(format!("Failed to write order book deltas: {e}")))
    }

    /// Write bar data to Parquet files.
    ///
    /// # Parameters
    ///
    /// - `data`: Vector of bars to write
    /// - `start`: Optional start timestamp override (nanoseconds since Unix epoch)
    /// - `end`: Optional end timestamp override (nanoseconds since Unix epoch)
    ///
    /// # Returns
    ///
    /// Returns the path of the created file as a string.
    #[pyo3(signature = (data, start=None, end=None))]
    pub fn write_bars(
        &self,
        data: Vec<Bar>,
        start: Option<u64>,
        end: Option<u64>,
    ) -> PyResult<String> {
        // Convert u64 timestamps to UnixNanos
        let start_nanos = start.map(UnixNanos::from);
        let end_nanos = end.map(UnixNanos::from);

        self.inner
            .write_to_parquet(data, start_nanos, end_nanos)
            .map(|path| path.to_string_lossy().to_string())
            .map_err(|e| PyIOError::new_err(format!("Failed to write bars: {e}")))
    }

    /// Write order book depth data to Parquet files.
    ///
    /// # Parameters
    ///
    /// - `data`: Vector of order book depths to write
    /// - `start`: Optional start timestamp override (nanoseconds since Unix epoch)
    /// - `end`: Optional end timestamp override (nanoseconds since Unix epoch)
    ///
    /// # Returns
    ///
    /// Returns the path of the created file as a string.
    #[pyo3(signature = (data, start=None, end=None))]
    pub fn write_order_book_depths(
        &self,
        data: Vec<OrderBookDepth10>,
        start: Option<u64>,
        end: Option<u64>,
    ) -> PyResult<String> {
        // Convert u64 timestamps to UnixNanos
        let start_nanos = start.map(UnixNanos::from);
        let end_nanos = end.map(UnixNanos::from);

        self.inner
            .write_to_parquet(data, start_nanos, end_nanos)
            .map(|path| path.to_string_lossy().to_string())
            .map_err(|e| PyIOError::new_err(format!("Failed to write order book depths: {e}")))
    }

    /// Write mark price update data to Parquet files.
    ///
    /// # Parameters
    ///
    /// - `data`: Vector of mark price updates to write
    /// - `start`: Optional start timestamp override (nanoseconds since Unix epoch)
    /// - `end`: Optional end timestamp override (nanoseconds since Unix epoch)
    ///
    /// # Returns
    ///
    /// Returns the path of the created file as a string.
    #[pyo3(signature = (data, start=None, end=None))]
    pub fn write_mark_price_updates(
        &self,
        data: Vec<MarkPriceUpdate>,
        start: Option<u64>,
        end: Option<u64>,
    ) -> PyResult<String> {
        // Convert u64 timestamps to UnixNanos
        let start_nanos = start.map(UnixNanos::from);
        let end_nanos = end.map(UnixNanos::from);

        self.inner
            .write_to_parquet(data, start_nanos, end_nanos)
            .map(|path| path.to_string_lossy().to_string())
            .map_err(|e| PyIOError::new_err(format!("Failed to write mark price updates: {e}")))
    }

    /// Write index price update data to Parquet files.
    ///
    /// # Parameters
    ///
    /// - `data`: Vector of index price updates to write
    /// - `start`: Optional start timestamp override (nanoseconds since Unix epoch)
    /// - `end`: Optional end timestamp override (nanoseconds since Unix epoch)
    ///
    /// # Returns
    ///
    /// Returns the path of the created file as a string.
    #[pyo3(signature = (data, start=None, end=None))]
    pub fn write_index_price_updates(
        &self,
        data: Vec<IndexPriceUpdate>,
        start: Option<u64>,
        end: Option<u64>,
    ) -> PyResult<String> {
        // Convert u64 timestamps to UnixNanos
        let start_nanos = start.map(UnixNanos::from);
        let end_nanos = end.map(UnixNanos::from);

        self.inner
            .write_to_parquet(data, start_nanos, end_nanos)
            .map(|path| path.to_string_lossy().to_string())
            .map_err(|e| PyIOError::new_err(format!("Failed to write index price updates: {e}")))
    }

    /// Extend file names in the catalog with additional timestamp information.
    ///
    /// # Parameters
    ///
    /// - `data_cls`: The data class name
    /// - `instrument_id`: Optional instrument ID filter
    /// - `start`: Start timestamp (nanoseconds since Unix epoch)
    /// - `end`: End timestamp (nanoseconds since Unix epoch)
    #[pyo3(signature = (data_cls, instrument_id=None, *, start, end))]
    pub fn extend_file_name(
        &self,
        data_cls: &str,
        instrument_id: Option<String>,
        start: u64,
        end: u64,
    ) -> PyResult<()> {
        // Convert u64 timestamps to UnixNanos
        let start_nanos = UnixNanos::from(start);
        let end_nanos = UnixNanos::from(end);

        self.inner
            .extend_file_name(data_cls, instrument_id, start_nanos, end_nanos)
            .map_err(|e| PyIOError::new_err(format!("Failed to extend file name: {e}")))
    }

    /// Consolidate all data files in the catalog within the specified time range.
    ///
    /// # Parameters
    ///
    /// - `start`: Optional start timestamp (nanoseconds since Unix epoch)
    /// - `end`: Optional end timestamp (nanoseconds since Unix epoch)
    /// - `ensure_contiguous_files`: Optional flag to ensure files are contiguous
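    ///
    /// # Example
    ///
    /// A sketch from Python; omitting `start` and `end` consolidates the whole
    /// catalog, and the flag shown is optional:
    ///
    /// ```python
    /// catalog.consolidate_catalog(ensure_contiguous_files=True)
    /// ```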
    #[pyo3(signature = (start=None, end=None, ensure_contiguous_files=None))]
    pub fn consolidate_catalog(
        &self,
        start: Option<u64>,
        end: Option<u64>,
        ensure_contiguous_files: Option<bool>,
    ) -> PyResult<()> {
        // Convert u64 timestamps to UnixNanos
        let start_nanos = start.map(UnixNanos::from);
        let end_nanos = end.map(UnixNanos::from);

        self.inner
            .consolidate_catalog(start_nanos, end_nanos, ensure_contiguous_files)
            .map_err(|e| PyIOError::new_err(format!("Failed to consolidate catalog: {e}")))
    }

    /// Consolidate data files for a specific data type within the specified time range.
    ///
    /// # Parameters
    ///
    /// - `type_name`: The data type name to consolidate
    /// - `instrument_id`: Optional instrument ID filter
    /// - `start`: Optional start timestamp (nanoseconds since Unix epoch)
    /// - `end`: Optional end timestamp (nanoseconds since Unix epoch)
    /// - `ensure_contiguous_files`: Optional flag to ensure files are contiguous
    #[pyo3(signature = (type_name, instrument_id=None, start=None, end=None, ensure_contiguous_files=None))]
    pub fn consolidate_data(
        &self,
        type_name: &str,
        instrument_id: Option<String>,
        start: Option<u64>,
        end: Option<u64>,
        ensure_contiguous_files: Option<bool>,
    ) -> PyResult<()> {
        // Convert u64 timestamps to UnixNanos
        let start_nanos = start.map(UnixNanos::from);
        let end_nanos = end.map(UnixNanos::from);

        self.inner
            .consolidate_data(
                type_name,
                instrument_id,
                start_nanos,
                end_nanos,
                ensure_contiguous_files,
            )
            .map_err(|e| PyIOError::new_err(format!("Failed to consolidate data: {e}")))
    }

    /// Reset all catalog file names to their canonical form.
    pub fn reset_catalog_file_names(&self) -> PyResult<()> {
        self.inner
            .reset_catalog_file_names()
            .map_err(|e| PyIOError::new_err(format!("Failed to reset catalog file names: {e}")))
    }

    /// Reset data file names for a specific data class to their canonical form.
    ///
    /// # Parameters
    ///
    /// - `data_cls`: The data class name
    /// - `instrument_id`: Optional instrument ID filter
    #[pyo3(signature = (data_cls, instrument_id=None))]
    pub fn reset_data_file_names(
        &self,
        data_cls: &str,
        instrument_id: Option<String>,
    ) -> PyResult<()> {
        self.inner
            .reset_data_file_names(data_cls, instrument_id)
            .map_err(|e| PyIOError::new_err(format!("Failed to reset data file names: {e}")))
    }

    /// Query files in the catalog matching the specified criteria.
    ///
    /// # Parameters
    ///
    /// - `data_cls`: The data class name to query
    /// - `instrument_ids`: Optional list of instrument IDs to filter by
    /// - `start`: Optional start timestamp (nanoseconds since Unix epoch)
    /// - `end`: Optional end timestamp (nanoseconds since Unix epoch)
    ///
    /// # Returns
    ///
    /// Returns a list of file paths matching the criteria.
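    ///
    /// # Example
    ///
    /// A sketch from Python; the data class and instrument ID strings are
    /// illustrative placeholders:
    ///
    /// ```python
    /// files = catalog.query_files("quotes", instrument_ids=["EUR/USD.SIM"])
    /// for file_path in files:
    ///     print(file_path)
    /// ```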
    #[pyo3(signature = (data_cls, instrument_ids=None, start=None, end=None))]
    pub fn query_files(
        &self,
        data_cls: &str,
        instrument_ids: Option<Vec<String>>,
        start: Option<u64>,
        end: Option<u64>,
    ) -> PyResult<Vec<String>> {
        // Convert u64 timestamps to UnixNanos
        let start_nanos = start.map(UnixNanos::from);
        let end_nanos = end.map(UnixNanos::from);

        self.inner
            .query_files(data_cls, instrument_ids, start_nanos, end_nanos)
            .map_err(|e| PyIOError::new_err(format!("Failed to query files: {e}")))
    }

    /// Get missing time intervals for a data request.
    ///
    /// # Parameters
    ///
    /// - `start`: Start timestamp (nanoseconds since Unix epoch)
    /// - `end`: End timestamp (nanoseconds since Unix epoch)
    /// - `data_cls`: The data class name
    /// - `instrument_id`: Optional instrument ID filter
    ///
    /// # Returns
    ///
    /// Returns a list of (start, end) timestamp tuples representing missing intervals.
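    ///
    /// # Example
    ///
    /// A backfill sketch from Python; the data class name is an illustrative
    /// placeholder:
    ///
    /// ```python
    /// gaps = catalog.get_missing_intervals_for_request(
    ///     start=1_700_000_000_000_000_000,
    ///     end=1_700_003_600_000_000_000,
    ///     data_cls="quotes",
    /// )
    /// for gap_start, gap_end in gaps:
    ///     ...  # fetch and write only the missing ranges
    /// ```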
    #[pyo3(signature = (start, end, data_cls, instrument_id=None))]
    pub fn get_missing_intervals_for_request(
        &self,
        start: u64,
        end: u64,
        data_cls: &str,
        instrument_id: Option<String>,
    ) -> PyResult<Vec<(u64, u64)>> {
        self.inner
            .get_missing_intervals_for_request(start, end, data_cls, instrument_id)
            .map_err(|e| PyIOError::new_err(format!("Failed to get missing intervals: {e}")))
    }

    /// Query the last timestamp for a specific data class and instrument.
    ///
    /// # Parameters
    ///
    /// - `data_cls`: The data class name
    /// - `instrument_id`: Optional instrument ID filter
    ///
    /// # Returns
    ///
    /// Returns the last timestamp as nanoseconds since Unix epoch, or `None` if no data exists.
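    ///
    /// # Example
    ///
    /// A sketch for incremental appends from Python; the data class and
    /// instrument ID strings are illustrative:
    ///
    /// ```python
    /// last = catalog.query_last_timestamp("quotes", instrument_id="EUR/USD.SIM")
    /// if last is not None:
    ///     resume_from = last + 1  # continue from the next nanosecond
    /// ```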
    #[pyo3(signature = (data_cls, instrument_id=None))]
    pub fn query_last_timestamp(
        &self,
        data_cls: &str,
        instrument_id: Option<String>,
    ) -> PyResult<Option<u64>> {
        self.inner
            .query_last_timestamp(data_cls, instrument_id)
            .map_err(|e| PyIOError::new_err(format!("Failed to query last timestamp: {e}")))
    }

    /// Get time intervals covered by data for a specific data class and instrument.
    ///
    /// # Parameters
    ///
    /// - `data_cls`: The data class name
    /// - `instrument_id`: Optional instrument ID filter
    ///
    /// # Returns
    ///
    /// Returns a list of (start, end) timestamp tuples representing covered intervals.
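    ///
    /// # Example
    ///
    /// A sketch from Python; prints each covered interval as a pair of
    /// nanosecond timestamps (the data class name is illustrative):
    ///
    /// ```python
    /// for interval_start, interval_end in catalog.get_intervals("quotes"):
    ///     print(interval_start, interval_end)
    /// ```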
    #[pyo3(signature = (data_cls, instrument_id=None))]
    pub fn get_intervals(
        &self,
        data_cls: &str,
        instrument_id: Option<String>,
    ) -> PyResult<Vec<(u64, u64)>> {
        self.inner
            .get_intervals(data_cls, instrument_id)
            .map_err(|e| PyIOError::new_err(format!("Failed to get intervals: {e}")))
    }
}