nautilus_persistence/backend/catalog.rs
// -------------------------------------------------------------------------------------------------
// Copyright (C) 2015-2025 Posei Systems Pty Ltd. All rights reserved.
// https://poseitrader.io
//
// Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
// You may not use this file except in compliance with the License.
// You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
// -------------------------------------------------------------------------------------------------

//! Parquet data catalog for efficient storage and retrieval of financial market data.
//!
//! This module provides a comprehensive data catalog implementation that uses Apache Parquet
//! format for storing financial market data with object store backends. The catalog supports
//! various data types including quotes, trades, bars, order book data, and other market events.
//!
//! # Key Features
//!
//! - **Object Store Integration**: Works with local filesystems, S3, and other object stores
//! - **Data Type Support**: Handles all major financial data types (quotes, trades, bars, etc.)
//! - **Time-based Organization**: Organizes data by timestamp ranges for efficient querying
//! - **Consolidation**: Merges multiple files to optimize storage and query performance
//! - **Validation**: Ensures data integrity with timestamp ordering and interval validation
//!
//! # Architecture
//!
//! The catalog organizes data in a hierarchical structure:
//! ```text
//! data/
//! ├── quotes/
//! │   └── INSTRUMENT_ID/
//! │       └── start_ts_end_ts.parquet
//! ├── trades/
//! │   └── INSTRUMENT_ID/
//! │       └── start_ts_end_ts.parquet
//! └── bars/
//!     └── INSTRUMENT_ID/
//!         └── start_ts_end_ts.parquet
//! ```
//!
//! # Usage
//!
//! ```rust,no_run
//! use std::path::PathBuf;
//! use nautilus_persistence::backend::catalog::ParquetDataCatalog;
//!
//! // Create a new catalog
//! let catalog = ParquetDataCatalog::new(
//!     PathBuf::from("/path/to/data"),
//!     None,       // storage_options
//!     Some(5000), // batch_size
//!     None,       // compression (defaults to SNAPPY)
//!     None,       // max_row_group_size (defaults to 5000)
//! );
//!
//! // Write data to the catalog
//! // catalog.write_to_parquet(data, None, None)?;
//! ```

65use std::{
66 fmt::Debug,
67 ops::Bound,
68 path::{Path, PathBuf},
69 sync::Arc,
70};
71
72use datafusion::arrow::record_batch::RecordBatch;
73use futures::StreamExt;
74use heck::ToSnakeCase;
75use itertools::Itertools;
76use log::info;
77use nautilus_core::{
78 UnixNanos,
79 datetime::{iso8601_to_unix_nanos, unix_nanos_to_iso8601},
80};
81use nautilus_model::data::{
82 Bar, Data, HasTsInit, IndexPriceUpdate, MarkPriceUpdate, OrderBookDelta, OrderBookDepth10,
83 QuoteTick, TradeTick, close::InstrumentClose,
84};
85use nautilus_serialization::arrow::{DecodeDataFromRecordBatch, EncodeToRecordBatch};
86use object_store::{ObjectStore, path::Path as ObjectPath};
87use serde::Serialize;
88use unbounded_interval_tree::interval_tree::IntervalTree;
89
90use super::session::{self, DataBackendSession, QueryResult, build_query};
91use crate::parquet::{
92 combine_parquet_files_from_object_store, min_max_from_parquet_metadata_object_store,
93 write_batches_to_object_store,
94};
95
96/// A high-performance data catalog for storing and retrieving financial market data using Apache Parquet format.
97///
98/// The `ParquetDataCatalog` provides a comprehensive solution for managing large volumes of financial
99/// market data with efficient storage, querying, and consolidation capabilities. It supports various
100/// object store backends including local filesystems, AWS S3, and other cloud storage providers.
101///
102/// # Features
103///
104/// - **Efficient Storage**: Uses Apache Parquet format with configurable compression
105/// - **Object Store Backend**: Supports multiple storage backends through the `object_store` crate
106/// - **Time-based Organization**: Organizes data by timestamp ranges for optimal query performance
107/// - **Data Validation**: Ensures timestamp ordering and interval consistency
108/// - **Consolidation**: Merges multiple files to reduce storage overhead and improve query speed
109/// - **Type Safety**: Strongly typed data handling with compile-time guarantees
110///
111/// # Data Organization
112///
113/// Data is organized hierarchically by data type and instrument:
/// - `data/{data_type}/{instrument_id}/{start_ts}_{end_ts}.parquet` (ISO 8601 timestamps)
115/// - Files are named with their timestamp ranges for efficient range queries
116/// - Intervals are validated to be disjoint to prevent data overlap
117///
118/// # Performance Considerations
119///
120/// - **Batch Size**: Controls memory usage during data processing
121/// - **Compression**: SNAPPY compression provides good balance of speed and size
122/// - **Row Group Size**: Affects query performance and memory usage
123/// - **File Consolidation**: Reduces the number of files for better query performance
124pub struct ParquetDataCatalog {
125 /// The base path for data storage within the object store.
126 base_path: String,
127 /// The original URI provided when creating the catalog.
128 original_uri: String,
129 /// The object store backend for data persistence.
130 object_store: Arc<dyn ObjectStore>,
131 /// The DataFusion session for query execution.
132 session: DataBackendSession,
133 /// The number of records to process in each batch.
134 batch_size: usize,
135 /// The compression algorithm used for Parquet files.
136 compression: parquet::basic::Compression,
137 /// The maximum number of rows in each Parquet row group.
138 max_row_group_size: usize,
139}
140
141impl Debug for ParquetDataCatalog {
142 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
143 f.debug_struct(stringify!(ParquetDataCatalog))
144 .field("base_path", &self.base_path)
145 .finish()
146 }
147}
148
149impl ParquetDataCatalog {
150 /// Creates a new [`ParquetDataCatalog`] instance from a local file path.
151 ///
152 /// This is a convenience constructor that converts a local path to a URI format
153 /// and delegates to [`Self::from_uri`].
154 ///
155 /// # Parameters
156 ///
157 /// - `base_path`: The base directory path for data storage.
158 /// - `storage_options`: Optional `HashMap` containing storage-specific configuration options.
159 /// - `batch_size`: Number of records to process in each batch (default: 5000).
160 /// - `compression`: Parquet compression algorithm (default: SNAPPY).
161 /// - `max_row_group_size`: Maximum rows per Parquet row group (default: 5000).
162 ///
163 /// # Panics
164 ///
165 /// Panics if the path cannot be converted to a valid URI or if the object store
166 /// cannot be created from the path.
167 ///
168 /// # Examples
169 ///
170 /// ```rust,no_run
171 /// use std::path::PathBuf;
172 /// use nautilus_persistence::backend::catalog::ParquetDataCatalog;
173 ///
174 /// let catalog = ParquetDataCatalog::new(
175 /// PathBuf::from("/tmp/posei_trader"),
176 /// None, // no storage options
177 /// Some(1000), // smaller batch size
178 /// None, // default compression
179 /// None, // default row group size
180 /// );
181 /// ```
182 #[must_use]
183 pub fn new(
184 base_path: PathBuf,
185 storage_options: Option<std::collections::HashMap<String, String>>,
186 batch_size: Option<usize>,
187 compression: Option<parquet::basic::Compression>,
188 max_row_group_size: Option<usize>,
189 ) -> Self {
190 let path_str = base_path.to_string_lossy().to_string();
191 Self::from_uri(
192 &path_str,
193 storage_options,
194 batch_size,
195 compression,
196 max_row_group_size,
197 )
198 .expect("Failed to create catalog from path")
199 }
200
201 /// Creates a new [`ParquetDataCatalog`] instance from a URI with optional storage options.
202 ///
203 /// Supports various URI schemes including local file paths and multiple cloud storage backends
204 /// supported by the `object_store` crate.
205 ///
206 /// # Supported URI Schemes
207 ///
208 /// - **AWS S3**: `s3://bucket/path`
209 /// - **Google Cloud Storage**: `gs://bucket/path` or `gcs://bucket/path`
210 /// - **Azure Blob Storage**: `azure://account/container/path` or `abfs://container@account.dfs.core.windows.net/path`
211 /// - **HTTP/WebDAV**: `http://` or `https://`
212 /// - **Local files**: `file://path` or plain paths
213 ///
214 /// # Parameters
215 ///
216 /// - `uri`: The URI for the data storage location.
217 /// - `storage_options`: Optional `HashMap` containing storage-specific configuration options:
218 /// - For S3: `endpoint_url`, region, `access_key_id`, `secret_access_key`, `session_token`, etc.
219 /// - For GCS: `service_account_path`, `service_account_key`, `project_id`, etc.
220 /// - For Azure: `account_name`, `account_key`, `sas_token`, etc.
221 /// - `batch_size`: Number of records to process in each batch (default: 5000).
222 /// - `compression`: Parquet compression algorithm (default: SNAPPY).
223 /// - `max_row_group_size`: Maximum rows per Parquet row group (default: 5000).
224 ///
225 /// # Errors
226 ///
227 /// Returns an error if:
228 /// - The URI format is invalid or unsupported.
229 /// - The object store cannot be created or accessed.
230 /// - Authentication fails for cloud storage backends.
231 ///
232 /// # Examples
233 ///
234 /// ```rust,no_run
235 /// use std::collections::HashMap;
236 /// use nautilus_persistence::backend::catalog::ParquetDataCatalog;
237 ///
238 /// // Local filesystem
239 /// let local_catalog = ParquetDataCatalog::from_uri(
240 /// "/tmp/posei_trader",
241 /// None, None, None, None
242 /// )?;
243 ///
244 /// // S3 bucket
245 /// let s3_catalog = ParquetDataCatalog::from_uri(
246 /// "s3://my-bucket/nautilus-data",
247 /// None, None, None, None
248 /// )?;
249 ///
250 /// // Google Cloud Storage
251 /// let gcs_catalog = ParquetDataCatalog::from_uri(
252 /// "gs://my-bucket/nautilus-data",
253 /// None, None, None, None
254 /// )?;
255 ///
256 /// // Azure Blob Storage
257 /// let azure_catalog = ParquetDataCatalog::from_uri(
258 /// "azure://account/container/nautilus-data",
259 /// None, None, None, None
260 /// )?;
261 ///
262 /// // S3 with custom endpoint and credentials
263 /// let mut storage_options = HashMap::new();
264 /// storage_options.insert("endpoint_url".to_string(), "https://my-s3-endpoint.com".to_string());
265 /// storage_options.insert("access_key_id".to_string(), "my-key".to_string());
266 /// storage_options.insert("secret_access_key".to_string(), "my-secret".to_string());
267 ///
268 /// let s3_catalog = ParquetDataCatalog::from_uri(
269 /// "s3://my-bucket/nautilus-data",
270 /// Some(storage_options),
271 /// None, None, None,
272 /// )?;
273 /// # Ok::<(), anyhow::Error>(())
274 /// ```
275 pub fn from_uri(
276 uri: &str,
277 storage_options: Option<std::collections::HashMap<String, String>>,
278 batch_size: Option<usize>,
279 compression: Option<parquet::basic::Compression>,
280 max_row_group_size: Option<usize>,
281 ) -> anyhow::Result<Self> {
282 let batch_size = batch_size.unwrap_or(5000);
283 let compression = compression.unwrap_or(parquet::basic::Compression::SNAPPY);
284 let max_row_group_size = max_row_group_size.unwrap_or(5000);
285
286 let (object_store, base_path, original_uri) =
287 crate::parquet::create_object_store_from_path(uri, storage_options)?;
288
289 Ok(Self {
290 base_path,
291 original_uri,
292 object_store,
293 session: session::DataBackendSession::new(batch_size),
294 batch_size,
295 compression,
296 max_row_group_size,
297 })
298 }
299
300 /// Writes mixed data types to the catalog by separating them into type-specific collections.
301 ///
302 /// This method takes a heterogeneous collection of market data and separates it by type,
303 /// then writes each type to its appropriate location in the catalog. This is useful when
304 /// processing mixed data streams or bulk data imports.
305 ///
306 /// # Parameters
307 ///
308 /// - `data`: A vector of mixed [`Data`] enum variants.
309 /// - `start`: Optional start timestamp to override the data's natural range.
310 /// - `end`: Optional end timestamp to override the data's natural range.
311 ///
312 /// # Notes
313 ///
314 /// - Data is automatically sorted by type before writing.
315 /// - Each data type is written to its own directory structure.
316 /// - Instrument data handling is not yet implemented (TODO).
317 ///
318 /// # Examples
319 ///
320 /// ```rust,no_run
321 /// use nautilus_model::data::Data;
322 /// use nautilus_persistence::backend::catalog::ParquetDataCatalog;
323 ///
324 /// let catalog = ParquetDataCatalog::new(/* ... */);
325 /// let mixed_data: Vec<Data> = vec![/* mixed data types */];
326 ///
327 /// catalog.write_data_enum(mixed_data, None, None)?;
328 /// ```
329 pub fn write_data_enum(
330 &self,
331 data: Vec<Data>,
332 start: Option<UnixNanos>,
333 end: Option<UnixNanos>,
334 ) -> anyhow::Result<()> {
335 let mut deltas: Vec<OrderBookDelta> = Vec::new();
336 let mut depth10s: Vec<OrderBookDepth10> = Vec::new();
337 let mut quotes: Vec<QuoteTick> = Vec::new();
338 let mut trades: Vec<TradeTick> = Vec::new();
339 let mut bars: Vec<Bar> = Vec::new();
340 let mut mark_prices: Vec<MarkPriceUpdate> = Vec::new();
341 let mut index_prices: Vec<IndexPriceUpdate> = Vec::new();
342 let mut closes: Vec<InstrumentClose> = Vec::new();
343
344 for d in data.iter().cloned() {
345 match d {
346 Data::Deltas(_) => continue,
347 Data::Delta(d) => {
348 deltas.push(d);
349 }
350 Data::Depth10(d) => {
351 depth10s.push(*d);
352 }
353 Data::Quote(d) => {
354 quotes.push(d);
355 }
356 Data::Trade(d) => {
357 trades.push(d);
358 }
359 Data::Bar(d) => {
360 bars.push(d);
361 }
362 Data::MarkPriceUpdate(p) => {
363 mark_prices.push(p);
364 }
365 Data::IndexPriceUpdate(p) => {
366 index_prices.push(p);
367 }
368 Data::InstrumentClose(c) => {
369 closes.push(c);
370 }
371 }
372 }
373
374 // TODO: need to handle instruments here
375
376 self.write_to_parquet(deltas, start, end)?;
377 self.write_to_parquet(depth10s, start, end)?;
378 self.write_to_parquet(quotes, start, end)?;
379 self.write_to_parquet(trades, start, end)?;
380 self.write_to_parquet(bars, start, end)?;
381 self.write_to_parquet(mark_prices, start, end)?;
382 self.write_to_parquet(index_prices, start, end)?;
383 self.write_to_parquet(closes, start, end)?;
384
385 Ok(())
386 }
387
388 /// Writes typed data to a Parquet file in the catalog.
389 ///
390 /// This is the core method for persisting market data to the catalog. It handles data
391 /// validation, batching, compression, and ensures proper file organization with
392 /// timestamp-based naming.
393 ///
394 /// # Type Parameters
395 ///
396 /// - `T`: The data type to write, must implement required traits for serialization and cataloging.
397 ///
398 /// # Parameters
399 ///
400 /// - `data`: Vector of data records to write (must be in ascending timestamp order).
401 /// - `start`: Optional start timestamp to override the natural data range.
402 /// - `end`: Optional end timestamp to override the natural data range.
403 ///
404 /// # Returns
405 ///
406 /// Returns the [`PathBuf`] of the created file, or an empty path if no data was provided.
407 ///
    /// # Errors
    ///
    /// This function will return an error if:
    /// - Data timestamps are not in ascending order
    /// - Data serialization to Arrow record batches fails
    /// - Object store write operations fail
    /// - File path construction fails
    /// - Timestamp interval validation fails after writing
    ///
    /// # Panics
    ///
    /// Panics if:
    /// - Record batches are empty after conversion
    /// - Required metadata is missing from the schema
422 ///
423 /// # Examples
424 ///
425 /// ```rust,no_run
426 /// use nautilus_model::data::QuoteTick;
427 /// use nautilus_persistence::backend::catalog::ParquetDataCatalog;
428 ///
429 /// let catalog = ParquetDataCatalog::new(/* ... */);
430 /// let quotes: Vec<QuoteTick> = vec![/* quote data */];
431 ///
432 /// let path = catalog.write_to_parquet(quotes, None, None)?;
433 /// println!("Data written to: {:?}", path);
434 /// # Ok::<(), anyhow::Error>(())
435 /// ```
436 pub fn write_to_parquet<T>(
437 &self,
438 data: Vec<T>,
439 start: Option<UnixNanos>,
440 end: Option<UnixNanos>,
441 ) -> anyhow::Result<PathBuf>
442 where
443 T: HasTsInit + EncodeToRecordBatch + CatalogPathPrefix,
444 {
445 if data.is_empty() {
446 return Ok(PathBuf::new());
447 }
448
449 let type_name = std::any::type_name::<T>().to_snake_case();
450 Self::check_ascending_timestamps(&data, &type_name)?;
451
452 let start_ts = start.unwrap_or(data.first().unwrap().ts_init());
453 let end_ts = end.unwrap_or(data.last().unwrap().ts_init());
454
455 let batches = self.data_to_record_batches(data)?;
456 let schema = batches.first().expect("Batches are empty.").schema();
457 let instrument_id = schema.metadata.get("instrument_id").cloned();
458
459 let directory = self.make_path(T::path_prefix(), instrument_id)?;
460 let filename = timestamps_to_filename(start_ts, end_ts);
461 let path = PathBuf::from(format!("{directory}/{filename}"));
462
463 // Write all batches to parquet file
464 info!(
465 "Writing {} batches of {type_name} data to {path:?}",
466 batches.len()
467 );
468
469 // Convert path to object store path
470 let object_path = self.to_object_path(&path.to_string_lossy());
471
472 self.execute_async(async {
473 write_batches_to_object_store(
474 &batches,
475 self.object_store.clone(),
476 &object_path,
477 Some(self.compression),
478 Some(self.max_row_group_size),
479 )
480 .await
481 })?;
482 let intervals = self.get_directory_intervals(&directory)?;
483
484 if !are_intervals_disjoint(&intervals) {
485 anyhow::bail!("Intervals are not disjoint after writing a new file");
486 }
487
488 Ok(path)
489 }
490
491 /// Writes typed data to a JSON file in the catalog.
492 ///
493 /// This method provides an alternative to Parquet format for data export and debugging.
494 /// JSON files are human-readable but less efficient for large datasets.
495 ///
496 /// # Type Parameters
497 ///
498 /// - `T`: The data type to write, must implement serialization and cataloging traits.
499 ///
500 /// # Parameters
501 ///
502 /// - `data`: Vector of data records to write (must be in ascending timestamp order).
503 /// - `path`: Optional custom directory path (defaults to catalog's standard structure).
504 /// - `write_metadata`: Whether to write a separate metadata file alongside the data.
505 ///
506 /// # Returns
507 ///
508 /// Returns the [`PathBuf`] of the created JSON file.
509 ///
    /// # Errors
    ///
    /// This function will return an error if:
    /// - Data timestamps are not in ascending order
    /// - JSON serialization fails
    /// - Object store write operations fail
    /// - File path construction fails
    ///
    /// # Panics
    ///
    /// Panics if the default directory path cannot be constructed when `path` is `None`.
520 ///
521 /// # Examples
522 ///
523 /// ```rust,no_run
524 /// use std::path::PathBuf;
525 /// use nautilus_model::data::TradeTick;
526 /// use nautilus_persistence::backend::catalog::ParquetDataCatalog;
527 ///
528 /// let catalog = ParquetDataCatalog::new(/* ... */);
529 /// let trades: Vec<TradeTick> = vec![/* trade data */];
530 ///
531 /// let path = catalog.write_to_json(
532 /// trades,
533 /// Some(PathBuf::from("/custom/path")),
534 /// true // write metadata
535 /// )?;
536 /// # Ok::<(), anyhow::Error>(())
537 /// ```
538 pub fn write_to_json<T>(
539 &self,
540 data: Vec<T>,
541 path: Option<PathBuf>,
542 write_metadata: bool,
543 ) -> anyhow::Result<PathBuf>
544 where
545 T: HasTsInit + Serialize + CatalogPathPrefix + EncodeToRecordBatch,
546 {
547 if data.is_empty() {
548 return Ok(PathBuf::new());
549 }
550
551 let type_name = std::any::type_name::<T>().to_snake_case();
552 Self::check_ascending_timestamps(&data, &type_name)?;
553
554 let start_ts = data.first().unwrap().ts_init();
555 let end_ts = data.last().unwrap().ts_init();
556
557 let directory =
558 path.unwrap_or_else(|| PathBuf::from(self.make_path(T::path_prefix(), None).unwrap()));
559 let filename = timestamps_to_filename(start_ts, end_ts).replace(".parquet", ".json");
560 let json_path = directory.join(&filename);
561
562 info!(
563 "Writing {} records of {type_name} data to {json_path:?}",
564 data.len()
565 );
566
567 if write_metadata {
568 let metadata = T::chunk_metadata(&data);
569 let metadata_path = json_path.with_extension("metadata.json");
570 info!("Writing metadata to {metadata_path:?}");
571
572 // Use object store for metadata file
573 let metadata_object_path = ObjectPath::from(metadata_path.to_string_lossy().as_ref());
574 let metadata_json = serde_json::to_vec_pretty(&metadata)?;
575 self.execute_async(async {
576 self.object_store
577 .put(&metadata_object_path, metadata_json.into())
578 .await
579 .map_err(anyhow::Error::from)
580 })?;
581 }
582
583 // Use object store for main JSON file
584 let json_object_path = ObjectPath::from(json_path.to_string_lossy().as_ref());
585 let json_data = serde_json::to_vec_pretty(&serde_json::to_value(data)?)?;
586 self.execute_async(async {
587 self.object_store
588 .put(&json_object_path, json_data.into())
589 .await
590 .map_err(anyhow::Error::from)
591 })?;
592
593 Ok(json_path)
594 }
595
596 /// Validates that data timestamps are in ascending order.
597 ///
598 /// # Parameters
599 ///
600 /// - `data`: Slice of data records to validate.
601 /// - `type_name`: Name of the data type for error messages.
602 ///
    /// # Errors
    ///
    /// Returns an error if any timestamp is less than the previous timestamp.
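    ///
    /// A minimal sketch (illustrative only; this is a private helper, and `quotes` is assumed
    /// to be a `Vec<QuoteTick>` built elsewhere):
    ///
    /// ```rust,ignore
    /// // Ascending (or equal) `ts_init` values pass; any decrease produces an error.
    /// Self::check_ascending_timestamps(&quotes, "quote_tick")?;
    /// ```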
606 fn check_ascending_timestamps<T: HasTsInit>(data: &[T], type_name: &str) -> anyhow::Result<()> {
607 if !data.windows(2).all(|w| w[0].ts_init() <= w[1].ts_init()) {
608 anyhow::bail!("{type_name} timestamps must be in ascending order");
609 }
610
611 Ok(())
612 }
613
614 /// Converts data into Arrow record batches for Parquet serialization.
615 ///
616 /// This method chunks the data according to the configured batch size and converts
617 /// each chunk into an Arrow record batch with appropriate metadata.
618 ///
619 /// # Type Parameters
620 ///
621 /// - `T`: The data type to convert, must implement required encoding traits.
622 ///
623 /// # Parameters
624 ///
625 /// - `data`: Vector of data records to convert
626 ///
627 /// # Returns
628 ///
629 /// Returns a vector of Arrow [`RecordBatch`] instances ready for Parquet serialization.
630 ///
631 /// # Errors
632 ///
633 /// Returns an error if record batch encoding fails for any chunk.
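    ///
    /// # Examples
    ///
    /// A minimal sketch (illustrative only; assumes a constructed `catalog` and a `Vec<QuoteTick>`
    /// named `quotes` that is already sorted by `ts_init`):
    ///
    /// ```rust,ignore
    /// let batches = catalog.data_to_record_batches(quotes)?;
    /// // Each batch holds at most `batch_size` rows (5000 by default).
    /// assert!(batches.iter().all(|batch| batch.num_rows() <= 5000));
    /// ```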
634 pub fn data_to_record_batches<T>(&self, data: Vec<T>) -> anyhow::Result<Vec<RecordBatch>>
635 where
636 T: HasTsInit + EncodeToRecordBatch,
637 {
638 let mut batches = Vec::new();
639
640 for chunk in &data.into_iter().chunks(self.batch_size) {
641 let data = chunk.collect_vec();
642 let metadata = EncodeToRecordBatch::chunk_metadata(&data);
643 let record_batch = T::encode_batch(&metadata, &data)?;
644 batches.push(record_batch);
645 }
646
647 Ok(batches)
648 }
649
650 /// Extends the timestamp range of an existing Parquet file by renaming it.
651 ///
652 /// This method finds an existing file that is adjacent to the specified time range
653 /// and renames it to include the new range. This is useful when appending data
654 /// that extends the time coverage of existing files.
655 ///
656 /// # Parameters
657 ///
658 /// - `data_cls`: The data type directory name (e.g., "quotes", "trades").
659 /// - `instrument_id`: Optional instrument ID to target a specific instrument's data.
660 /// - `start`: Start timestamp of the new range to extend to.
661 /// - `end`: End timestamp of the new range to extend to.
662 ///
663 /// # Returns
664 ///
665 /// Returns `Ok(())` on success, or an error if the operation fails.
666 ///
667 /// # Errors
668 ///
669 /// This function will return an error if:
670 /// - The directory path cannot be constructed.
671 /// - No adjacent file is found to extend.
672 /// - File rename operations fail.
673 /// - Interval validation fails after extension.
674 ///
675 /// # Examples
676 ///
677 /// ```rust,no_run
678 /// use nautilus_persistence::backend::catalog::ParquetDataCatalog;
679 /// use nautilus_core::UnixNanos;
680 ///
681 /// let catalog = ParquetDataCatalog::new(/* ... */);
682 ///
683 /// // Extend a file's range backwards or forwards
684 /// catalog.extend_file_name(
685 /// "quotes",
686 /// Some("BTCUSD".to_string()),
687 /// UnixNanos::from(1609459200000000000),
688 /// UnixNanos::from(1609545600000000000)
689 /// )?;
690 /// # Ok::<(), anyhow::Error>(())
691 /// ```
692 pub fn extend_file_name(
693 &self,
694 data_cls: &str,
695 instrument_id: Option<String>,
696 start: UnixNanos,
697 end: UnixNanos,
698 ) -> anyhow::Result<()> {
699 let directory = self.make_path(data_cls, instrument_id)?;
700 let intervals = self.get_directory_intervals(&directory)?;
701
702 let start = start.as_u64();
703 let end = end.as_u64();
704
705 for interval in intervals {
706 if interval.0 == end + 1 {
707 // Extend backwards: new file covers [start, interval.1]
708 self.rename_parquet_file(&directory, interval.0, interval.1, start, interval.1)?;
709 break;
            } else if start > 0 && interval.1 == start - 1 {
711 // Extend forwards: new file covers [interval.0, end]
712 self.rename_parquet_file(&directory, interval.0, interval.1, interval.0, end)?;
713 break;
714 }
715 }
716
717 let intervals = self.get_directory_intervals(&directory)?;
718
719 if !are_intervals_disjoint(&intervals) {
720 anyhow::bail!("Intervals are not disjoint after extending a file");
721 }
722
723 Ok(())
724 }
725
726 /// Helper method to list parquet files in a directory
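    ///
    /// A minimal sketch (illustrative only; this is a private helper and the directory is hypothetical):
    ///
    /// ```rust,ignore
    /// // Returns the object-store paths of all ".parquet" objects under the directory prefix.
    /// let files = self.list_parquet_files("data/quotes/EURUSD.SIM")?;
    /// ```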
727 fn list_parquet_files(&self, directory: &str) -> anyhow::Result<Vec<String>> {
728 self.execute_async(async {
729 let prefix = ObjectPath::from(format!("{directory}/"));
730 let mut stream = self.object_store.list(Some(&prefix));
731 let mut files = Vec::new();
732
733 while let Some(object) = stream.next().await {
734 let object = object?;
735 if object.location.as_ref().ends_with(".parquet") {
736 files.push(object.location.to_string());
737 }
738 }
739 Ok::<Vec<String>, anyhow::Error>(files)
740 })
741 }
742
743 /// Helper method to reconstruct full URI for remote object store paths
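    ///
    /// A minimal sketch (illustrative values; this is a private helper):
    ///
    /// ```rust,ignore
    /// // With `original_uri = "s3://my-bucket/nautilus-data"`, a listed object path is rebuilt
    /// // into a full URI; local paths are returned unchanged.
    /// let uri = self.reconstruct_full_uri("nautilus-data/data/quotes/EURUSD.SIM/file.parquet");
    /// assert_eq!(uri, "s3://my-bucket/nautilus-data/data/quotes/EURUSD.SIM/file.parquet");
    /// ```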
744 fn reconstruct_full_uri(&self, path_str: &str) -> String {
745 // Check if this is a remote URI scheme that needs reconstruction
746 if self.is_remote_uri() {
747 // Extract the base URL (scheme + host) from the original URI
748 if let Ok(url) = url::Url::parse(&self.original_uri) {
749 if let Some(host) = url.host_str() {
750 return format!("{}://{}/{}", url.scheme(), host, path_str);
751 }
752 }
753 }
754
755 // For local paths or if URL parsing fails, return the path as-is
756 path_str.to_string()
757 }
758
759 /// Helper method to check if the original URI uses a remote object store scheme
760 fn is_remote_uri(&self) -> bool {
761 self.original_uri.starts_with("s3://")
762 || self.original_uri.starts_with("gs://")
763 || self.original_uri.starts_with("gcs://")
764 || self.original_uri.starts_with("azure://")
765 || self.original_uri.starts_with("abfs://")
766 || self.original_uri.starts_with("http://")
767 || self.original_uri.starts_with("https://")
768 }
769
770 /// Consolidates all data files in the catalog by merging multiple files into single files per directory.
771 ///
772 /// This method finds all leaf data directories in the catalog and consolidates the Parquet files
773 /// within each directory. Consolidation improves query performance by reducing the number of files
774 /// that need to be read and can also reduce storage overhead.
775 ///
776 /// # Parameters
777 ///
778 /// - `start`: Optional start timestamp to limit consolidation to files within this range.
779 /// - `end`: Optional end timestamp to limit consolidation to files within this range.
780 /// - `ensure_contiguous_files`: Whether to validate that consolidated intervals are contiguous (default: true).
781 ///
782 /// # Returns
783 ///
784 /// Returns `Ok(())` on success, or an error if consolidation fails for any directory.
785 ///
786 /// # Errors
787 ///
788 /// This function will return an error if:
789 /// - Directory listing fails.
790 /// - File consolidation operations fail.
791 /// - Interval validation fails (when `ensure_contiguous_files` is true).
792 ///
793 /// # Examples
794 ///
795 /// ```rust,no_run
796 /// use nautilus_persistence::backend::catalog::ParquetDataCatalog;
797 /// use nautilus_core::UnixNanos;
798 ///
799 /// let catalog = ParquetDataCatalog::new(/* ... */);
800 ///
801 /// // Consolidate all files in the catalog
802 /// catalog.consolidate_catalog(None, None, None)?;
803 ///
804 /// // Consolidate only files within a specific time range
805 /// catalog.consolidate_catalog(
806 /// Some(UnixNanos::from(1609459200000000000)),
807 /// Some(UnixNanos::from(1609545600000000000)),
808 /// Some(true)
809 /// )?;
810 /// # Ok::<(), anyhow::Error>(())
811 /// ```
812 pub fn consolidate_catalog(
813 &self,
814 start: Option<UnixNanos>,
815 end: Option<UnixNanos>,
816 ensure_contiguous_files: Option<bool>,
817 ) -> anyhow::Result<()> {
818 let leaf_directories = self.find_leaf_data_directories()?;
819
820 for directory in leaf_directories {
821 self.consolidate_directory(&directory, start, end, ensure_contiguous_files)?;
822 }
823
824 Ok(())
825 }
826
827 /// Consolidates data files for a specific data type and instrument.
828 ///
829 /// This method consolidates Parquet files within a specific directory (defined by data type
830 /// and optional instrument ID) by merging multiple files into a single file. This improves
831 /// query performance and can reduce storage overhead.
832 ///
833 /// # Parameters
834 ///
835 /// - `type_name`: The data type directory name (e.g., "quotes", "trades", "bars").
836 /// - `instrument_id`: Optional instrument ID to target a specific instrument's data.
837 /// - `start`: Optional start timestamp to limit consolidation to files within this range.
838 /// - `end`: Optional end timestamp to limit consolidation to files within this range.
839 /// - `ensure_contiguous_files`: Whether to validate that consolidated intervals are contiguous (default: true).
840 ///
841 /// # Returns
842 ///
843 /// Returns `Ok(())` on success, or an error if consolidation fails.
844 ///
845 /// # Errors
846 ///
847 /// This function will return an error if:
848 /// - The directory path cannot be constructed
849 /// - File consolidation operations fail
850 /// - Interval validation fails (when `ensure_contiguous_files` is true)
851 ///
852 /// # Examples
853 ///
854 /// ```rust,no_run
855 /// use nautilus_persistence::backend::catalog::ParquetDataCatalog;
856 /// use nautilus_core::UnixNanos;
857 ///
858 /// let catalog = ParquetDataCatalog::new(/* ... */);
859 ///
860 /// // Consolidate all quote files for a specific instrument
861 /// catalog.consolidate_data(
862 /// "quotes",
863 /// Some("BTCUSD".to_string()),
864 /// None,
865 /// None,
866 /// None
867 /// )?;
868 ///
869 /// // Consolidate trade files within a time range
870 /// catalog.consolidate_data(
871 /// "trades",
872 /// None,
873 /// Some(UnixNanos::from(1609459200000000000)),
874 /// Some(UnixNanos::from(1609545600000000000)),
875 /// Some(true)
876 /// )?;
877 /// # Ok::<(), anyhow::Error>(())
878 /// ```
879 pub fn consolidate_data(
880 &self,
881 type_name: &str,
882 instrument_id: Option<String>,
883 start: Option<UnixNanos>,
884 end: Option<UnixNanos>,
885 ensure_contiguous_files: Option<bool>,
886 ) -> anyhow::Result<()> {
887 let directory = self.make_path(type_name, instrument_id)?;
888 self.consolidate_directory(&directory, start, end, ensure_contiguous_files)
889 }
890
891 fn consolidate_directory(
892 &self,
893 directory: &str,
894 start: Option<UnixNanos>,
895 end: Option<UnixNanos>,
896 ensure_contiguous_files: Option<bool>,
897 ) -> anyhow::Result<()> {
898 let parquet_files = self.list_parquet_files(directory)?;
899
900 if parquet_files.len() <= 1 {
901 return Ok(());
902 }
903
904 let mut files_to_consolidate = Vec::new();
905 let mut intervals = Vec::new();
906 let start = start.map(|t| t.as_u64());
907 let end = end.map(|t| t.as_u64());
908
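        // Only files whose [start, end] interval lies fully inside the requested range are
        // consolidated, e.g. (illustrative values) with start=100 and end=200 a file covering
        // 150-180 is included, while one covering 90-120 is left untouched.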
909 for file in parquet_files {
910 if let Some(interval) = parse_filename_timestamps(&file) {
911 let (interval_start, interval_end) = interval;
912 let include_file = match (start, end) {
913 (Some(s), Some(e)) => interval_start >= s && interval_end <= e,
914 (Some(s), None) => interval_start >= s,
915 (None, Some(e)) => interval_end <= e,
916 (None, None) => true,
917 };
918
919 if include_file {
920 files_to_consolidate.push(file);
921 intervals.push(interval);
922 }
923 }
924 }
925
926 intervals.sort_by_key(|&(start, _)| start);
927
928 if !intervals.is_empty() {
929 let file_name = timestamps_to_filename(
930 UnixNanos::from(intervals[0].0),
931 UnixNanos::from(intervals.last().unwrap().1),
932 );
933 let path = format!("{directory}/{file_name}");
934
935 // Convert string paths to ObjectPath for the function call
936 let object_paths: Vec<ObjectPath> = files_to_consolidate
937 .iter()
938 .map(|path| ObjectPath::from(path.as_str()))
939 .collect();
940
941 self.execute_async(async {
942 combine_parquet_files_from_object_store(
943 self.object_store.clone(),
944 object_paths,
945 &ObjectPath::from(path),
946 Some(self.compression),
947 Some(self.max_row_group_size),
948 )
949 .await
950 })?;
951 }
952
        if ensure_contiguous_files.unwrap_or(true) && !are_intervals_contiguous(&intervals) {
            anyhow::bail!("Intervals are not contiguous after consolidating a directory");
        }
956
957 Ok(())
958 }
959
960 /// Resets the filenames of all Parquet files in the catalog to match their actual content timestamps.
961 ///
962 /// This method scans all leaf data directories in the catalog and renames files based on
963 /// the actual timestamp range of their content. This is useful when files have been
964 /// modified or when filename conventions have changed.
965 ///
966 /// # Returns
967 ///
968 /// Returns `Ok(())` on success, or an error if the operation fails.
969 ///
970 /// # Errors
971 ///
972 /// This function will return an error if:
973 /// - Directory listing fails
974 /// - File metadata reading fails
975 /// - File rename operations fail
976 /// - Interval validation fails after renaming
977 ///
978 /// # Examples
979 ///
980 /// ```rust,no_run
981 /// use nautilus_persistence::backend::catalog::ParquetDataCatalog;
982 ///
983 /// let catalog = ParquetDataCatalog::new(/* ... */);
984 ///
985 /// // Reset all filenames in the catalog
986 /// catalog.reset_catalog_file_names()?;
987 /// # Ok::<(), anyhow::Error>(())
988 /// ```
989 pub fn reset_catalog_file_names(&self) -> anyhow::Result<()> {
990 let leaf_directories = self.find_leaf_data_directories()?;
991
992 for directory in leaf_directories {
993 self.reset_file_names(&directory)?;
994 }
995
996 Ok(())
997 }
998
999 /// Resets the filenames of Parquet files for a specific data type and instrument ID.
1000 ///
1001 /// This method renames files in a specific directory based on the actual timestamp
1002 /// range of their content. This is useful for correcting filenames after data
1003 /// modifications or when filename conventions have changed.
1004 ///
1005 /// # Parameters
1006 ///
1007 /// - `data_cls`: The data type directory name (e.g., "quotes", "trades").
1008 /// - `instrument_id`: Optional instrument ID to target a specific instrument's data.
1009 ///
1010 /// # Returns
1011 ///
1012 /// Returns `Ok(())` on success, or an error if the operation fails.
1013 ///
1014 /// # Errors
1015 ///
1016 /// This function will return an error if:
1017 /// - The directory path cannot be constructed
1018 /// - File metadata reading fails
1019 /// - File rename operations fail
1020 /// - Interval validation fails after renaming
1021 ///
1022 /// # Examples
1023 ///
1024 /// ```rust,no_run
1025 /// use nautilus_persistence::backend::catalog::ParquetDataCatalog;
1026 ///
1027 /// let catalog = ParquetDataCatalog::new(/* ... */);
1028 ///
1029 /// // Reset filenames for all quote files
1030 /// catalog.reset_data_file_names("quotes", None)?;
1031 ///
1032 /// // Reset filenames for a specific instrument's trade files
1033 /// catalog.reset_data_file_names("trades", Some("BTCUSD".to_string()))?;
1034 /// # Ok::<(), anyhow::Error>(())
1035 /// ```
1036 pub fn reset_data_file_names(
1037 &self,
1038 data_cls: &str,
1039 instrument_id: Option<String>,
1040 ) -> anyhow::Result<()> {
1041 let directory = self.make_path(data_cls, instrument_id)?;
1042 self.reset_file_names(&directory)
1043 }
1044
1045 /// Reset the filenames of parquet files in a directory
1046 fn reset_file_names(&self, directory: &str) -> anyhow::Result<()> {
1047 let parquet_files = self.list_parquet_files(directory)?;
1048
1049 for file in parquet_files {
1050 let object_path = ObjectPath::from(file.as_str());
1051 let (first_ts, last_ts) = self.execute_async(async {
1052 min_max_from_parquet_metadata_object_store(
1053 self.object_store.clone(),
1054 &object_path,
1055 "ts_init",
1056 )
1057 .await
1058 })?;
1059
1060 let new_filename =
1061 timestamps_to_filename(UnixNanos::from(first_ts), UnixNanos::from(last_ts));
1062 let new_file_path = format!("{directory}/{new_filename}");
1063 let new_object_path = ObjectPath::from(new_file_path);
1064
1065 self.move_file(&object_path, &new_object_path)?;
1066 }
1067
1068 let intervals = self.get_directory_intervals(directory)?;
1069
1070 if !are_intervals_disjoint(&intervals) {
1071 anyhow::bail!("Intervals are not disjoint after resetting file names");
1072 }
1073
1074 Ok(())
1075 }
1076
1077 /// Finds all leaf data directories in the catalog.
1078 ///
1079 /// A leaf directory is one that contains data files but no subdirectories.
1080 /// This method is used to identify directories that can be processed for
1081 /// consolidation or other operations.
1082 ///
1083 /// # Returns
1084 ///
1085 /// Returns a vector of directory path strings representing leaf directories,
1086 /// or an error if directory traversal fails.
1087 ///
1088 /// # Errors
1089 ///
1090 /// This function will return an error if:
1091 /// - Object store listing operations fail
1092 /// - Directory structure cannot be analyzed
1093 ///
1094 /// # Examples
1095 ///
1096 /// ```rust,no_run
1097 /// use nautilus_persistence::backend::catalog::ParquetDataCatalog;
1098 ///
1099 /// let catalog = ParquetDataCatalog::new(/* ... */);
1100 ///
1101 /// let leaf_dirs = catalog.find_leaf_data_directories()?;
1102 /// for dir in leaf_dirs {
1103 /// println!("Found leaf directory: {}", dir);
1104 /// }
1105 /// # Ok::<(), anyhow::Error>(())
1106 /// ```
1107 pub fn find_leaf_data_directories(&self) -> anyhow::Result<Vec<String>> {
1108 let data_dir = if self.base_path.is_empty() {
1109 "data".to_string()
1110 } else {
1111 format!("{}/data", self.base_path)
1112 };
1113
1114 let leaf_dirs = self.execute_async(async {
1115 let mut all_paths = std::collections::HashSet::new();
1116 let mut directories = std::collections::HashSet::new();
1117 let mut files_in_dirs = std::collections::HashMap::new();
1118
1119 // List all objects under the data directory
1120 let prefix = ObjectPath::from(format!("{data_dir}/"));
1121 let mut stream = self.object_store.list(Some(&prefix));
1122
1123 while let Some(object) = stream.next().await {
1124 let object = object?;
1125 let path_str = object.location.to_string();
1126 all_paths.insert(path_str.clone());
1127
1128 // Extract directory path
1129 if let Some(parent) = std::path::Path::new(&path_str).parent() {
1130 let parent_str = parent.to_string_lossy().to_string();
1131 directories.insert(parent_str.clone());
1132
1133 // Track files in each directory
1134 files_in_dirs
1135 .entry(parent_str)
1136 .or_insert_with(Vec::new)
1137 .push(path_str);
1138 }
1139 }
1140
1141 // Find leaf directories (directories with files but no subdirectories)
1142 let mut leaf_dirs = Vec::new();
1143 for dir in &directories {
1144 let has_files = files_in_dirs
1145 .get(dir)
1146 .is_some_and(|files| !files.is_empty());
1147 let has_subdirs = directories
1148 .iter()
1149 .any(|d| d.starts_with(&format!("{dir}/")) && d != dir);
1150
1151 if has_files && !has_subdirs {
1152 leaf_dirs.push(dir.clone());
1153 }
1154 }
1155
1156 Ok::<Vec<String>, anyhow::Error>(leaf_dirs)
1157 })?;
1158
1159 Ok(leaf_dirs)
1160 }
1161
    /// Queries data of a specific type from the catalog, returning a [`QueryResult`].
    ///
    /// Registers the catalog's object store with the DataFusion session (for remote URIs),
    /// locates matching Parquet files via [`Self::query_files`], and adds each file to the
    /// session with an optional SQL `where_clause` filter.
    ///
    /// # Errors
    ///
    /// Returns an error if the remote URI cannot be parsed, if file listing fails, or if
    /// registering a file with the DataFusion session fails.
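    ///
    /// # Examples
    ///
    /// A minimal sketch (illustrative only; assumes a constructed, mutable `catalog`):
    ///
    /// ```rust,ignore
    /// use nautilus_model::data::QuoteTick;
    ///
    /// let result = catalog.query::<QuoteTick>(
    ///     Some(vec!["EURUSD.SIM".to_string()]), // instrument filter
    ///     None,                                 // start
    ///     None,                                 // end
    ///     None,                                 // where_clause
    /// )?;
    /// ```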
1163 pub fn query<T>(
1164 &mut self,
1165 instrument_ids: Option<Vec<String>>,
1166 start: Option<UnixNanos>,
1167 end: Option<UnixNanos>,
1168 where_clause: Option<&str>,
1169 ) -> anyhow::Result<QueryResult>
1170 where
1171 T: DecodeDataFromRecordBatch + CatalogPathPrefix,
1172 {
1173 // Register the object store with the session for remote URIs
1174 if self.is_remote_uri() {
1175 let url = url::Url::parse(&self.original_uri)?;
1176 let host = url
1177 .host_str()
1178 .ok_or_else(|| anyhow::anyhow!("Remote URI missing host/bucket name"))?;
1179 let base_url = url::Url::parse(&format!("{}://{}", url.scheme(), host))?;
1180 self.session
1181 .register_object_store(&base_url, self.object_store.clone());
1182 }
1183
1184 let files_list = self.query_files(T::path_prefix(), instrument_ids, start, end)?;
1185
1186 for (idx, file_uri) in files_list.iter().enumerate() {
1187 let table_name = format!("{}_{}", T::path_prefix(), idx);
1188 let query = build_query(&table_name, start, end, where_clause);
1189
1190 self.session
1191 .add_file::<T>(&table_name, file_uri, Some(&query))?;
1192 }
1193
1194 Ok(self.session.get_query_result())
1195 }
1196
1197 /// Queries all Parquet files for a specific data type and optional instrument IDs.
1198 ///
1199 /// This method finds all Parquet files that match the specified criteria and returns
1200 /// their full URIs. The files are filtered by data type, instrument IDs (if provided),
1201 /// and timestamp range (if provided).
1202 ///
1203 /// # Parameters
1204 ///
1205 /// - `data_cls`: The data type directory name (e.g., "quotes", "trades").
1206 /// - `instrument_ids`: Optional list of instrument IDs to filter by.
1207 /// - `start`: Optional start timestamp to filter files by their time range.
1208 /// - `end`: Optional end timestamp to filter files by their time range.
1209 ///
1210 /// # Returns
1211 ///
1212 /// Returns a vector of file URI strings that match the query criteria,
1213 /// or an error if the query fails.
1214 ///
1215 /// # Errors
1216 ///
1217 /// This function will return an error if:
1218 /// - The directory path cannot be constructed.
1219 /// - Object store listing operations fail.
1220 /// - URI reconstruction fails.
1221 ///
1222 /// # Examples
1223 ///
1224 /// ```rust,no_run
1225 /// use nautilus_persistence::backend::catalog::ParquetDataCatalog;
1226 /// use nautilus_core::UnixNanos;
1227 ///
1228 /// let catalog = ParquetDataCatalog::new(/* ... */);
1229 ///
1230 /// // Query all quote files
1231 /// let files = catalog.query_files("quotes", None, None, None)?;
1232 ///
1233 /// // Query trade files for specific instruments within a time range
1234 /// let files = catalog.query_files(
1235 /// "trades",
1236 /// Some(vec!["BTCUSD".to_string(), "ETHUSD".to_string()]),
1237 /// Some(UnixNanos::from(1609459200000000000)),
1238 /// Some(UnixNanos::from(1609545600000000000))
1239 /// )?;
1240 /// # Ok::<(), anyhow::Error>(())
1241 /// ```
1242 pub fn query_files(
1243 &self,
1244 data_cls: &str,
1245 instrument_ids: Option<Vec<String>>,
1246 start: Option<UnixNanos>,
1247 end: Option<UnixNanos>,
1248 ) -> anyhow::Result<Vec<String>> {
1249 let mut files = Vec::new();
1250
1251 let start_u64 = start.map(|s| s.as_u64());
1252 let end_u64 = end.map(|e| e.as_u64());
1253
1254 let base_dir = self.make_path(data_cls, None)?;
1255
1256 // Use recursive listing to match Python's glob behavior
1257 let list_result = self.execute_async(async {
1258 let prefix = ObjectPath::from(format!("{base_dir}/"));
1259 let mut stream = self.object_store.list(Some(&prefix));
1260 let mut objects = Vec::new();
1261 while let Some(object) = stream.next().await {
1262 objects.push(object?);
1263 }
1264 Ok::<Vec<_>, anyhow::Error>(objects)
1265 })?;
1266
1267 let mut file_paths: Vec<String> = list_result
1268 .into_iter()
1269 .filter_map(|object| {
1270 let path_str = object.location.to_string();
1271 if path_str.ends_with(".parquet") {
1272 Some(path_str)
1273 } else {
1274 None
1275 }
1276 })
1277 .collect();
1278
1279 // Apply identifier filtering if provided
1280 if let Some(identifiers) = instrument_ids {
1281 let safe_identifiers: Vec<String> = identifiers
1282 .iter()
1283 .map(|id| urisafe_instrument_id(id))
1284 .collect();
1285
1286 // Exact match by default for instrument_ids or bar_types
1287 let exact_match_file_paths: Vec<String> = file_paths
1288 .iter()
1289 .filter(|file_path| {
1290 // Extract the directory name (second to last path component)
1291 let path_parts: Vec<&str> = file_path.split('/').collect();
1292 if path_parts.len() >= 2 {
1293 let dir_name = path_parts[path_parts.len() - 2];
1294 safe_identifiers.iter().any(|safe_id| safe_id == dir_name)
1295 } else {
1296 false
1297 }
1298 })
1299 .cloned()
1300 .collect();
1301
1302 if exact_match_file_paths.is_empty() && data_cls == "bars" {
1303 // Partial match of instrument_ids in bar_types for bars
1304 file_paths.retain(|file_path| {
1305 let path_parts: Vec<&str> = file_path.split('/').collect();
1306 if path_parts.len() >= 2 {
1307 let dir_name = path_parts[path_parts.len() - 2];
1308 safe_identifiers
1309 .iter()
1310 .any(|safe_id| dir_name.starts_with(&format!("{safe_id}-")))
1311 } else {
1312 false
1313 }
1314 });
1315 } else {
1316 file_paths = exact_match_file_paths;
1317 }
1318 }
1319
1320 // Apply timestamp filtering
1321 file_paths.retain(|file_path| query_intersects_filename(file_path, start_u64, end_u64));
1322
1323 // Convert to full URIs
1324 for file_path in file_paths {
1325 let full_uri = self.reconstruct_full_uri(&file_path);
1326 files.push(full_uri);
1327 }
1328
1329 Ok(files)
1330 }
1331
1332 /// Finds the missing time intervals for a specific data type and instrument ID.
1333 ///
1334 /// This method compares a requested time range against the existing data coverage
1335 /// and returns the gaps that need to be filled. This is useful for determining
1336 /// what data needs to be fetched or backfilled.
1337 ///
1338 /// # Parameters
1339 ///
1340 /// - `start`: Start timestamp of the requested range (Unix nanoseconds).
1341 /// - `end`: End timestamp of the requested range (Unix nanoseconds).
1342 /// - `data_cls`: The data type directory name (e.g., "quotes", "trades").
1343 /// - `instrument_id`: Optional instrument ID to target a specific instrument's data.
1344 ///
1345 /// # Returns
1346 ///
1347 /// Returns a vector of (start, end) tuples representing the missing intervals,
1348 /// or an error if the operation fails.
1349 ///
1350 /// # Errors
1351 ///
1352 /// This function will return an error if:
1353 /// - The directory path cannot be constructed
1354 /// - Interval retrieval fails
1355 /// - Gap calculation fails
1356 ///
1357 /// # Examples
1358 ///
1359 /// ```rust,no_run
1360 /// use nautilus_persistence::backend::catalog::ParquetDataCatalog;
1361 ///
1362 /// let catalog = ParquetDataCatalog::new(/* ... */);
1363 ///
1364 /// // Find missing intervals for quote data
1365 /// let missing = catalog.get_missing_intervals_for_request(
1366 /// 1609459200000000000, // start
1367 /// 1609545600000000000, // end
1368 /// "quotes",
1369 /// Some("BTCUSD".to_string())
1370 /// )?;
1371 ///
1372 /// for (start, end) in missing {
1373 /// println!("Missing data from {} to {}", start, end);
1374 /// }
1375 /// # Ok::<(), anyhow::Error>(())
1376 /// ```
1377 pub fn get_missing_intervals_for_request(
1378 &self,
1379 start: u64,
1380 end: u64,
1381 data_cls: &str,
1382 instrument_id: Option<String>,
1383 ) -> anyhow::Result<Vec<(u64, u64)>> {
1384 let intervals = self.get_intervals(data_cls, instrument_id)?;
1385
1386 Ok(query_interval_diff(start, end, &intervals))
1387 }
1388
1389 /// Gets the last (most recent) timestamp for a specific data type and instrument ID.
1390 ///
1391 /// This method finds the latest timestamp covered by existing data files for
1392 /// the specified data type and instrument. This is useful for determining
1393 /// the most recent data available or for incremental data updates.
1394 ///
1395 /// # Parameters
1396 ///
1397 /// - `data_cls`: The data type directory name (e.g., "quotes", "trades").
1398 /// - `instrument_id`: Optional instrument ID to target a specific instrument's data.
1399 ///
1400 /// # Returns
1401 ///
1402 /// Returns `Some(timestamp)` if data exists, `None` if no data is found,
1403 /// or an error if the operation fails.
1404 ///
1405 /// # Errors
1406 ///
1407 /// This function will return an error if:
1408 /// - The directory path cannot be constructed
1409 /// - Interval retrieval fails
1410 ///
1411 /// # Examples
1412 ///
1413 /// ```rust,no_run
1414 /// use nautilus_persistence::backend::catalog::ParquetDataCatalog;
1415 ///
1416 /// let catalog = ParquetDataCatalog::new(/* ... */);
1417 ///
1418 /// // Get the last timestamp for quote data
1419 /// if let Some(last_ts) = catalog.query_last_timestamp("quotes", Some("BTCUSD".to_string()))? {
1420 /// println!("Last quote timestamp: {}", last_ts);
1421 /// } else {
1422 /// println!("No quote data found");
1423 /// }
1424 /// # Ok::<(), anyhow::Error>(())
1425 /// ```
1426 pub fn query_last_timestamp(
1427 &self,
1428 data_cls: &str,
1429 instrument_id: Option<String>,
1430 ) -> anyhow::Result<Option<u64>> {
1431 let intervals = self.get_intervals(data_cls, instrument_id)?;
1432
1433 if intervals.is_empty() {
1434 return Ok(None);
1435 }
1436
1437 Ok(Some(intervals.last().unwrap().1))
1438 }
1439
1440 /// Gets the time intervals covered by Parquet files for a specific data type and instrument ID.
1441 ///
1442 /// This method returns all time intervals covered by existing data files for the
1443 /// specified data type and instrument. The intervals are sorted by start time and
1444 /// represent the complete data coverage available.
1445 ///
1446 /// # Parameters
1447 ///
1448 /// - `data_cls`: The data type directory name (e.g., "quotes", "trades").
1449 /// - `instrument_id`: Optional instrument ID to target a specific instrument's data.
1450 ///
1451 /// # Returns
1452 ///
1453 /// Returns a vector of (start, end) tuples representing the covered intervals,
1454 /// sorted by start time, or an error if the operation fails.
1455 ///
1456 /// # Errors
1457 ///
1458 /// This function will return an error if:
1459 /// - The directory path cannot be constructed.
1460 /// - Directory listing fails.
1461 /// - Filename parsing fails.
1462 ///
1463 /// # Examples
1464 ///
1465 /// ```rust,no_run
1466 /// use nautilus_persistence::backend::catalog::ParquetDataCatalog;
1467 ///
1468 /// let catalog = ParquetDataCatalog::new(/* ... */);
1469 ///
1470 /// // Get all intervals for quote data
1471 /// let intervals = catalog.get_intervals("quotes", Some("BTCUSD".to_string()))?;
1472 /// for (start, end) in intervals {
1473 /// println!("Data available from {} to {}", start, end);
1474 /// }
1475 /// # Ok::<(), anyhow::Error>(())
1476 /// ```
1477 pub fn get_intervals(
1478 &self,
1479 data_cls: &str,
1480 instrument_id: Option<String>,
1481 ) -> anyhow::Result<Vec<(u64, u64)>> {
1482 let directory = self.make_path(data_cls, instrument_id)?;
1483
1484 self.get_directory_intervals(&directory)
1485 }
1486
1487 /// Get the time intervals covered by parquet files in a directory
1488 fn get_directory_intervals(&self, directory: &str) -> anyhow::Result<Vec<(u64, u64)>> {
1489 let mut intervals = Vec::new();
1490
1491 // Use object store for all operations
1492 let list_result = self.execute_async(async {
1493 let path = object_store::path::Path::from(directory);
1494 Ok(self
1495 .object_store
1496 .list(Some(&path))
1497 .collect::<Vec<_>>()
1498 .await)
1499 })?;
1500
1501 for result in list_result {
1502 match result {
1503 Ok(object) => {
1504 let path_str = object.location.to_string();
1505 if path_str.ends_with(".parquet") {
1506 if let Some(interval) = parse_filename_timestamps(&path_str) {
1507 intervals.push(interval);
1508 }
1509 }
1510 }
1511 Err(_) => {
1512 // Directory doesn't exist or is empty, which is fine
1513 break;
1514 }
1515 }
1516 }
1517
1518 intervals.sort_by_key(|&(start, _)| start);
1519
1520 Ok(intervals)
1521 }
1522
1523 /// Create a directory path for a data type and instrument ID
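    ///
    /// A minimal sketch (illustrative values; this is a private helper):
    ///
    /// ```rust,ignore
    /// // With an empty `base_path`:
    /// assert_eq!(self.make_path("quotes", None)?, "data/quotes");
    /// // With `base_path = "catalog"` (instrument IDs are made URI-safe before appending):
    /// assert_eq!(self.make_path("quotes", None)?, "catalog/data/quotes");
    /// ```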
1524 fn make_path(&self, type_name: &str, instrument_id: Option<String>) -> anyhow::Result<String> {
1525 let mut path = if self.base_path.is_empty() {
1526 format!("data/{type_name}")
1527 } else {
1528 // Remove trailing slash from base_path to avoid double slashes
1529 let base_path = self.base_path.trim_end_matches('/');
1530 format!("{base_path}/data/{type_name}")
1531 };
1532
1533 if let Some(id) = instrument_id {
1534 path = format!("{}/{}", path, urisafe_instrument_id(&id));
1535 }
1536
1537 Ok(path)
1538 }
1539
1540 /// Helper method to rename a parquet file by moving it via object store operations
1541 fn rename_parquet_file(
1542 &self,
1543 directory: &str,
1544 old_start: u64,
1545 old_end: u64,
1546 new_start: u64,
1547 new_end: u64,
1548 ) -> anyhow::Result<()> {
1549 let old_filename =
1550 timestamps_to_filename(UnixNanos::from(old_start), UnixNanos::from(old_end));
1551 let old_path = format!("{directory}/{old_filename}");
1552 let old_object_path = self.to_object_path(&old_path);
1553
1554 let new_filename =
1555 timestamps_to_filename(UnixNanos::from(new_start), UnixNanos::from(new_end));
1556 let new_path = format!("{directory}/{new_filename}");
1557 let new_object_path = self.to_object_path(&new_path);
1558
1559 self.move_file(&old_object_path, &new_object_path)
1560 }
1561
1562 /// Helper method to convert a path string to `ObjectPath`, handling `base_path`
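    ///
    /// A minimal sketch (illustrative values; this is a private helper):
    ///
    /// ```rust,ignore
    /// // With `base_path = "catalog"`, the catalog prefix is stripped before building the
    /// // object-store path; with an empty `base_path` the path is used as-is.
    /// let object_path = self.to_object_path("catalog/data/quotes/file.parquet");
    /// assert_eq!(object_path.as_ref(), "data/quotes/file.parquet");
    /// ```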
1563 fn to_object_path(&self, path: &str) -> ObjectPath {
1564 if self.base_path.is_empty() {
1565 return ObjectPath::from(path);
1566 }
1567
1568 let base = self.base_path.trim_end_matches('/');
1569
1570 // Remove the catalog base prefix if present
1571 let without_base = path
1572 .strip_prefix(&format!("{base}/"))
1573 .or_else(|| path.strip_prefix(base))
1574 .unwrap_or(path);
1575
1576 ObjectPath::from(without_base)
1577 }
1578
1579 /// Helper method to move a file using object store rename operation
1580 fn move_file(&self, old_path: &ObjectPath, new_path: &ObjectPath) -> anyhow::Result<()> {
1581 self.execute_async(async {
1582 self.object_store
1583 .rename(old_path, new_path)
1584 .await
1585 .map_err(anyhow::Error::from)
1586 })
1587 }
1588
1589 /// Helper method to execute async operations with a runtime
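    ///
    /// A minimal sketch (illustrative only; this is a private helper): blocks the current thread
    /// on the shared Tokio runtime so async object-store calls can be made from sync methods.
    ///
    /// ```rust,ignore
    /// let exists = self.execute_async(async {
    ///     self.object_store
    ///         .head(&object_path)
    ///         .await
    ///         .map(|_| true)
    ///         .map_err(anyhow::Error::from)
    /// })?;
    /// ```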
1590 fn execute_async<F, R>(&self, future: F) -> anyhow::Result<R>
1591 where
1592 F: std::future::Future<Output = anyhow::Result<R>>,
1593 {
1594 let rt = nautilus_common::runtime::get_runtime();
1595 rt.block_on(future)
1596 }
1597}
1598
1599/// Trait for providing catalog path prefixes for different data types.
1600///
1601/// This trait enables type-safe organization of data within the catalog by providing
1602/// a standardized way to determine the directory structure for each data type.
1603/// Each data type maps to a specific subdirectory within the catalog's data folder.
1604///
1605/// # Implementation
1606///
1607/// Types implementing this trait should return a static string that represents
1608/// the directory name where data of that type should be stored.
1609///
1610/// # Examples
1611///
1612/// ```rust
1613/// use nautilus_persistence::backend::catalog::CatalogPathPrefix;
1614/// use nautilus_model::data::QuoteTick;
1615///
1616/// assert_eq!(QuoteTick::path_prefix(), "quotes");
1617/// ```
1618pub trait CatalogPathPrefix {
1619 /// Returns the path prefix (directory name) for this data type.
1620 ///
1621 /// # Returns
1622 ///
1623 /// A static string representing the directory name where this data type is stored.
1624 fn path_prefix() -> &'static str;
1625}
1626
1627/// Macro for implementing [`CatalogPathPrefix`] for data types.
1628///
1629/// This macro provides a convenient way to implement the trait for multiple types
1630/// with their corresponding path prefixes.
1631///
1632/// # Parameters
1633///
1634/// - `$type`: The data type to implement the trait for.
1635/// - `$path`: The path prefix string for that type.
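///
/// # Examples
///
/// A minimal sketch (illustrative only; `MyEvent` is a hypothetical type implementing the
/// required data traits):
///
/// ```rust,ignore
/// impl_catalog_path_prefix!(MyEvent, "my_events");
/// assert_eq!(MyEvent::path_prefix(), "my_events");
/// ```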
macro_rules! impl_catalog_path_prefix {
    ($type:ty, $path:expr) => {
        impl CatalogPathPrefix for $type {
            fn path_prefix() -> &'static str {
                $path
            }
        }
    };
}

// Standard implementations for financial data types
impl_catalog_path_prefix!(QuoteTick, "quotes");
impl_catalog_path_prefix!(TradeTick, "trades");
impl_catalog_path_prefix!(OrderBookDelta, "order_book_deltas");
impl_catalog_path_prefix!(OrderBookDepth10, "order_book_depths");
impl_catalog_path_prefix!(Bar, "bars");
impl_catalog_path_prefix!(IndexPriceUpdate, "index_prices");
impl_catalog_path_prefix!(MarkPriceUpdate, "mark_prices");
impl_catalog_path_prefix!(InstrumentClose, "instrument_closes");

////////////////////////////////////////////////////////////////////////////////
// Helper functions for filename operations
////////////////////////////////////////////////////////////////////////////////

/// Converts timestamps to a filename using ISO 8601 format.
///
/// This function converts two Unix nanosecond timestamps to a filename that uses
/// ISO 8601 format with filesystem-safe characters. The format matches the Python
/// implementation for consistency.
///
/// # Parameters
///
/// - `timestamp_1`: First timestamp in Unix nanoseconds.
/// - `timestamp_2`: Second timestamp in Unix nanoseconds.
///
/// # Returns
///
/// Returns a filename string in the format: "`iso_timestamp_1_iso_timestamp_2.parquet`".
///
/// # Examples
///
/// ```rust
/// # use nautilus_persistence::backend::catalog::timestamps_to_filename;
/// # use nautilus_core::UnixNanos;
/// let filename = timestamps_to_filename(
///     UnixNanos::from(1609459200000000000),
///     UnixNanos::from(1609545600000000000)
/// );
/// // Returns something like: "2021-01-01T00-00-00-000000000Z_2021-01-02T00-00-00-000000000Z.parquet"
/// ```
fn timestamps_to_filename(timestamp_1: UnixNanos, timestamp_2: UnixNanos) -> String {
    let datetime_1 = iso_timestamp_to_file_timestamp(&unix_nanos_to_iso8601(timestamp_1));
    let datetime_2 = iso_timestamp_to_file_timestamp(&unix_nanos_to_iso8601(timestamp_2));

    format!("{datetime_1}_{datetime_2}.parquet")
}

/// Converts an ISO 8601 timestamp to a filesystem-safe format.
///
/// This function replaces colons and dots with hyphens to make the timestamp
/// safe for use in filenames across different filesystems.
///
/// # Parameters
///
/// - `iso_timestamp`: ISO 8601 timestamp string (e.g., "2023-10-26T07:30:50.123456789Z").
///
/// # Returns
///
/// Returns a filesystem-safe timestamp string (e.g., "2023-10-26T07-30-50-123456789Z").
///
/// # Examples
///
/// ```rust
/// # use nautilus_persistence::backend::catalog::iso_timestamp_to_file_timestamp;
/// let safe_timestamp = iso_timestamp_to_file_timestamp("2023-10-26T07:30:50.123456789Z");
/// assert_eq!(safe_timestamp, "2023-10-26T07-30-50-123456789Z");
/// ```
fn iso_timestamp_to_file_timestamp(iso_timestamp: &str) -> String {
    iso_timestamp.replace([':', '.'], "-")
}

/// Converts a filesystem-safe timestamp back to ISO 8601 format.
///
/// This function reverses the transformation done by `iso_timestamp_to_file_timestamp`,
/// converting filesystem-safe timestamps back to standard ISO 8601 format.
///
/// # Parameters
///
/// - `file_timestamp`: Filesystem-safe timestamp string (e.g., "2023-10-26T07-30-50-123456789Z").
///
/// # Returns
///
/// Returns an ISO 8601 timestamp string (e.g., "2023-10-26T07:30:50.123456789Z").
///
/// # Examples
///
/// ```rust
/// # use nautilus_persistence::backend::catalog::file_timestamp_to_iso_timestamp;
/// let iso_timestamp = file_timestamp_to_iso_timestamp("2023-10-26T07-30-50-123456789Z");
/// assert_eq!(iso_timestamp, "2023-10-26T07:30:50.123456789Z");
/// ```
fn file_timestamp_to_iso_timestamp(file_timestamp: &str) -> String {
    let (date_part, time_part) = file_timestamp
        .split_once('T')
        .unwrap_or((file_timestamp, ""));
    let time_part = time_part.strip_suffix('Z').unwrap_or(time_part);

    // Find the last hyphen to separate nanoseconds
    if let Some(last_hyphen_idx) = time_part.rfind('-') {
        let time_with_dot_for_nanos = format!(
            "{}.{}",
            &time_part[..last_hyphen_idx],
            &time_part[last_hyphen_idx + 1..]
        );
        let final_time_part = time_with_dot_for_nanos.replace('-', ":");
        format!("{date_part}T{final_time_part}Z")
    } else {
        // Fallback if no nanoseconds part found
        let final_time_part = time_part.replace('-', ":");
        format!("{date_part}T{final_time_part}Z")
    }
}

/// Converts an ISO 8601 timestamp string to Unix nanoseconds.
///
/// This function parses an ISO 8601 timestamp and converts it to Unix nanoseconds.
/// It's used to convert parsed timestamps back to the internal representation.
///
/// # Parameters
///
/// - `iso_timestamp`: ISO 8601 timestamp string (e.g., "2023-10-26T07:30:50.123456789Z").
///
/// # Returns
///
/// Returns `Ok(u64)` with the Unix nanoseconds timestamp, or an error if parsing fails.
///
/// # Examples
///
/// ```rust
/// # use nautilus_persistence::backend::catalog::iso_to_unix_nanos;
/// let nanos = iso_to_unix_nanos("2021-01-01T00:00:00.000000000Z").unwrap();
/// assert_eq!(nanos, 1609459200000000000);
/// ```
fn iso_to_unix_nanos(iso_timestamp: &str) -> anyhow::Result<u64> {
    Ok(iso8601_to_unix_nanos(iso_timestamp.to_string())?.into())
}

////////////////////////////////////////////////////////////////////////////////
// Helper functions for path and interval operations
////////////////////////////////////////////////////////////////////////////////

/// Converts an instrument ID to a URI-safe format by removing forward slashes.
///
/// Some instrument IDs contain forward slashes (e.g., "BTC/USD") which are not
/// suitable for use in file paths. This function removes these characters to
/// create a safe directory name.
///
/// # Parameters
///
/// - `instrument_id`: The original instrument ID string.
///
/// # Returns
///
/// A URI-safe version of the instrument ID with forward slashes removed.
///
/// # Examples
///
/// ```rust
/// # use nautilus_persistence::backend::catalog::urisafe_instrument_id;
/// assert_eq!(urisafe_instrument_id("BTC/USD"), "BTCUSD");
/// assert_eq!(urisafe_instrument_id("EUR-USD"), "EUR-USD");
/// ```
fn urisafe_instrument_id(instrument_id: &str) -> String {
    instrument_id.replace('/', "")
}

/// Checks if a filename's timestamp range intersects with a query interval.
///
/// This function determines whether a Parquet file (identified by its timestamp-based
/// filename) contains data that falls within the specified query time range.
///
/// # Parameters
///
/// - `filename`: The filename to check (format: "`iso_timestamp_1_iso_timestamp_2.parquet`").
/// - `start`: Optional start timestamp for the query range.
/// - `end`: Optional end timestamp for the query range.
///
/// # Returns
///
/// Returns `true` if the file's time range intersects with the query range,
/// `false` otherwise. Returns `true` if the filename cannot be parsed.
///
/// # Examples
///
/// ```rust
/// # use nautilus_persistence::backend::catalog::query_intersects_filename;
/// // Example with ISO format filenames
/// assert!(query_intersects_filename(
///     "2021-01-01T00-00-00-000000000Z_2021-01-02T00-00-00-000000000Z.parquet",
///     Some(1609459200000000000),
///     Some(1609545600000000000)
/// ));
/// ```
fn query_intersects_filename(filename: &str, start: Option<u64>, end: Option<u64>) -> bool {
    if let Some((file_start, file_end)) = parse_filename_timestamps(filename) {
        (start.is_none() || start.unwrap() <= file_end)
            && (end.is_none() || file_start <= end.unwrap())
    } else {
        true
    }
}

/// Parses timestamps from a Parquet filename.
///
/// Extracts the start and end timestamps from filenames that follow the ISO 8601 format:
/// "`iso_timestamp_1_iso_timestamp_2.parquet`" (e.g., "2021-01-01T00-00-00-000000000Z_2021-01-02T00-00-00-000000000Z.parquet")
///
/// # Parameters
///
/// - `filename`: The filename to parse (can be a full path).
///
/// # Returns
///
/// Returns `Some((start_ts, end_ts))` if the filename matches the expected format,
/// `None` otherwise.
///
/// # Examples
///
/// ```rust
/// # use nautilus_persistence::backend::catalog::parse_filename_timestamps;
/// assert!(parse_filename_timestamps("2021-01-01T00-00-00-000000000Z_2021-01-02T00-00-00-000000000Z.parquet").is_some());
/// assert_eq!(parse_filename_timestamps("invalid.parquet"), None);
/// ```
fn parse_filename_timestamps(filename: &str) -> Option<(u64, u64)> {
    let path = Path::new(filename);
    let base_name = path.file_name()?.to_str()?;
    let base_filename = base_name.strip_suffix(".parquet")?;
    let (first_part, second_part) = base_filename.split_once('_')?;

    let first_iso = file_timestamp_to_iso_timestamp(first_part);
    let second_iso = file_timestamp_to_iso_timestamp(second_part);

    let first_ts = iso_to_unix_nanos(&first_iso).ok()?;
    let second_ts = iso_to_unix_nanos(&second_iso).ok()?;

    Some((first_ts, second_ts))
}

/// Checks if all closed integer intervals in a list are mutually disjoint.
///
/// Two intervals are disjoint if they do not overlap. This function validates that
/// all intervals in the list are non-overlapping, which is a requirement for
/// maintaining data integrity in the catalog.
///
/// # Parameters
///
/// - `intervals`: A slice of timestamp intervals as (start, end) tuples.
///
/// # Returns
///
/// Returns `true` if all intervals are disjoint, `false` if any overlap is found.
/// Returns `true` for empty lists or lists with a single interval.
///
/// # Examples
///
/// ```rust
/// # use nautilus_persistence::backend::catalog::are_intervals_disjoint;
/// // Disjoint intervals
/// assert!(are_intervals_disjoint(&[(1, 5), (10, 15), (20, 25)]));
///
/// // Overlapping intervals
/// assert!(!are_intervals_disjoint(&[(1, 10), (5, 15)]));
/// ```
fn are_intervals_disjoint(intervals: &[(u64, u64)]) -> bool {
    let n = intervals.len();

    if n <= 1 {
        return true;
    }

    let mut sorted_intervals: Vec<(u64, u64)> = intervals.to_vec();
    sorted_intervals.sort_by_key(|&(start, _)| start);

    for i in 0..(n - 1) {
        let (_, end1) = sorted_intervals[i];
        let (start2, _) = sorted_intervals[i + 1];

        if end1 >= start2 {
            return false;
        }
    }

    true
}

/// Checks if intervals are contiguous (adjacent with no gaps).
///
/// Intervals are contiguous if, when sorted by start time, each interval's start
/// timestamp is exactly one more than the previous interval's end timestamp.
/// This ensures complete coverage of a time range with no gaps.
///
/// # Parameters
///
/// - `intervals`: A slice of timestamp intervals as (start, end) tuples.
///
/// # Returns
///
/// Returns `true` if all intervals are contiguous, `false` if any gaps are found.
/// Returns `true` for empty lists or lists with a single interval.
///
/// # Examples
///
/// ```rust
/// # use nautilus_persistence::backend::catalog::are_intervals_contiguous;
/// // Contiguous intervals
/// assert!(are_intervals_contiguous(&[(1, 5), (6, 10), (11, 15)]));
///
/// // Non-contiguous intervals (gap between 5 and 8)
/// assert!(!are_intervals_contiguous(&[(1, 5), (8, 10)]));
/// ```
fn are_intervals_contiguous(intervals: &[(u64, u64)]) -> bool {
    let n = intervals.len();
    if n <= 1 {
        return true;
    }

    let mut sorted_intervals: Vec<(u64, u64)> = intervals.to_vec();
    sorted_intervals.sort_by_key(|&(start, _)| start);

    for i in 0..(n - 1) {
        let (_, end1) = sorted_intervals[i];
        let (start2, _) = sorted_intervals[i + 1];

        if end1 + 1 != start2 {
            return false;
        }
    }

    true
}

/// Finds the parts of a query interval that are not covered by existing data intervals.
///
/// This function calculates the "gaps" in data coverage by comparing a requested
/// time range against the intervals covered by existing data files. It's used to
/// determine what data needs to be fetched or backfilled.
///
/// # Parameters
///
/// - `start`: Start timestamp of the query interval (inclusive).
/// - `end`: End timestamp of the query interval (inclusive).
/// - `closed_intervals`: Existing data intervals as (start, end) tuples.
///
/// # Returns
///
/// Returns a vector of (start, end) tuples representing the gaps in coverage.
/// Returns an empty vector if the query range is invalid or fully covered.
///
/// # Examples
///
/// ```rust
/// # use nautilus_persistence::backend::catalog::query_interval_diff;
/// // Query 1-100, have data for 10-30 and 60-80
/// let gaps = query_interval_diff(1, 100, &[(10, 30), (60, 80)]);
/// assert_eq!(gaps, vec![(1, 9), (31, 59), (81, 100)]);
/// ```
fn query_interval_diff(start: u64, end: u64, closed_intervals: &[(u64, u64)]) -> Vec<(u64, u64)> {
    if start > end {
        return Vec::new();
    }

    let interval_set = get_interval_set(closed_intervals);
    let query_range = (Bound::Included(start), Bound::Included(end));
    let query_diff = interval_set.get_interval_difference(&query_range);
    let mut result: Vec<(u64, u64)> = Vec::new();

    for interval in query_diff {
        if let Some(tuple) = interval_to_tuple(interval, start, end) {
            result.push(tuple);
        }
    }

    result
}

/// Creates an interval tree from closed integer intervals.
///
/// This function converts closed intervals [a, b] into half-open intervals [a, b+1)
/// for use with the interval tree data structure, which is used for efficient
/// interval operations and gap detection.
///
/// # Parameters
///
/// - `intervals`: A slice of closed intervals as (start, end) tuples.
///
/// # Returns
///
/// Returns an [`IntervalTree`] containing the converted intervals.
///
/// # Notes
///
/// - Invalid intervals (where start > end) are skipped
/// - Uses saturating addition to prevent overflow when converting to half-open intervals
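///
/// # Examples
///
/// An illustrative sketch only (marked `ignore`): the closed inputs `[3, 7]` and
/// `[10, 12]` are stored as the half-open ranges `[3, 8)` and `[10, 13)` in the
/// returned tree.
///
/// ```rust,ignore
/// let tree = get_interval_set(&[(3, 7), (10, 12)]);
/// ```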
fn get_interval_set(intervals: &[(u64, u64)]) -> IntervalTree<u64> {
    let mut tree = IntervalTree::default();

    if intervals.is_empty() {
        return tree;
    }

    for &(start, end) in intervals {
        if start > end {
            continue;
        }

        tree.insert((
            Bound::Included(start),
            Bound::Excluded(end.saturating_add(1)),
        ));
    }

    tree
}

/// Converts an interval tree result back to a closed interval tuple.
///
/// This helper function converts the bounded interval representation used by
/// the interval tree back into the (start, end) tuple format used throughout
/// the catalog.
///
/// # Parameters
///
/// - `interval`: The bounded interval from the interval tree.
/// - `query_start`: The start of the original query range.
/// - `query_end`: The end of the original query range.
///
/// # Returns
///
/// Returns `Some((start, end))` for valid intervals, `None` for empty intervals.
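///
/// # Examples
///
/// An illustrative sketch only (marked `ignore`): a gap reported by the interval
/// tree as `[Included(5), Excluded(10))` maps back to the closed tuple `(5, 9)`,
/// while unbounded ends clamp to the query range.
///
/// ```rust,ignore
/// use std::ops::Bound;
///
/// let tuple = interval_to_tuple((Bound::Included(&5), Bound::Excluded(&10)), 0, 100);
/// assert_eq!(tuple, Some((5, 9)));
/// ```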
fn interval_to_tuple(
    interval: (Bound<&u64>, Bound<&u64>),
    query_start: u64,
    query_end: u64,
) -> Option<(u64, u64)> {
    let (bound_start, bound_end) = interval;

    let start = match bound_start {
        Bound::Included(val) => *val,
        Bound::Excluded(val) => val.saturating_add(1),
        Bound::Unbounded => query_start,
    };

    let end = match bound_end {
        Bound::Included(val) => *val,
        Bound::Excluded(val) => {
            if *val == 0 {
                return None; // Empty interval
            }
            val - 1
        }
        Bound::Unbounded => query_end,
    };

    if start <= end {
        Some((start, end))
    } else {
        None
    }
}

////////////////////////////////////////////////////////////////////////////////
// Tests
////////////////////////////////////////////////////////////////////////////////

#[cfg(test)]
mod tests {
    use nautilus_model::data::HasTsInit;

    use super::*;

    #[derive(Clone)]
    struct DummyData(u64);

    impl HasTsInit for DummyData {
        fn ts_init(&self) -> UnixNanos {
            UnixNanos::from(self.0)
        }
    }

    #[test]
    fn test_check_ascending_timestamps_error() {
        let data = vec![DummyData(2), DummyData(1)];
        let result = super::ParquetDataCatalog::check_ascending_timestamps(&data, "dummy");
        assert!(result.is_err());
    }
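
    // A sketch added for illustration: it exercises the private filename helpers
    // defined above and assumes the documented round-trip behaviour
    // (timestamps_to_filename -> parse_filename_timestamps) holds.
    #[test]
    fn test_filename_timestamp_round_trip() {
        let start: u64 = 1609459200000000000; // 2021-01-01T00:00:00Z
        let end: u64 = 1609545600000000000; // 2021-01-02T00:00:00Z

        let filename = timestamps_to_filename(UnixNanos::from(start), UnixNanos::from(end));
        assert!(filename.ends_with(".parquet"));

        // Parsing the generated filename should recover the original timestamps
        assert_eq!(parse_filename_timestamps(&filename), Some((start, end)));
    }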

    #[test]
    fn test_to_object_path_trailing_slash() {
        // Create catalog with base path that contains a trailing slash
        let tmp = tempfile::tempdir().unwrap();
        let base_dir = tmp.path().join("catalog");
        std::fs::create_dir_all(&base_dir).unwrap();

        let catalog = ParquetDataCatalog::new(base_dir.clone(), None, None, None, None);

        // Build a sample path under the catalog base
        let sample_path = format!(
            "{}/data/quotes/XYZ/2021-01-01T00-00-00-000000000Z_2021-01-01T00-00-01-000000000Z.parquet",
            base_dir.to_string_lossy()
        );

        let object_path = catalog.to_object_path(&sample_path);

        assert!(
            !object_path
                .as_ref()
                .starts_with(base_dir.to_string_lossy().as_ref())
        );
    }
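
    // A sketch added for illustration: it checks the interval helpers above
    // directly against the behaviour described in their doc comments.
    #[test]
    fn test_interval_helpers() {
        // Disjoint vs. overlapping closed intervals
        assert!(are_intervals_disjoint(&[(1, 5), (10, 15), (20, 25)]));
        assert!(!are_intervals_disjoint(&[(1, 10), (5, 15)]));

        // Contiguous intervals must be adjacent with no gap
        assert!(are_intervals_contiguous(&[(1, 5), (6, 10), (11, 15)]));
        assert!(!are_intervals_contiguous(&[(1, 5), (8, 10)]));
    }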

    #[test]
    fn test_is_remote_uri() {
        // Test S3 URIs
        let s3_catalog =
            ParquetDataCatalog::from_uri("s3://bucket/path", None, None, None, None).unwrap();
        assert!(s3_catalog.is_remote_uri());

        // Test GCS URIs
        let gcs_catalog =
            ParquetDataCatalog::from_uri("gs://bucket/path", None, None, None, None).unwrap();
        assert!(gcs_catalog.is_remote_uri());

        let gcs2_catalog =
            ParquetDataCatalog::from_uri("gcs://bucket/path", None, None, None, None).unwrap();
        assert!(gcs2_catalog.is_remote_uri());

        // Test Azure URIs
        let azure_catalog =
            ParquetDataCatalog::from_uri("azure://account/container/path", None, None, None, None)
                .unwrap();
        assert!(azure_catalog.is_remote_uri());

        let abfs_catalog = ParquetDataCatalog::from_uri(
            "abfs://container@account.dfs.core.windows.net/path",
            None,
            None,
            None,
            None,
        )
        .unwrap();
        assert!(abfs_catalog.is_remote_uri());

        // Test HTTP URIs
        let http_catalog =
            ParquetDataCatalog::from_uri("http://example.com/path", None, None, None, None)
                .unwrap();
        assert!(http_catalog.is_remote_uri());

        let https_catalog =
            ParquetDataCatalog::from_uri("https://example.com/path", None, None, None, None)
                .unwrap();
        assert!(https_catalog.is_remote_uri());

        // Test local paths (should not be remote)
        let tmp = tempfile::tempdir().unwrap();
        let local_catalog =
            ParquetDataCatalog::new(tmp.path().to_path_buf(), None, None, None, None);
        assert!(!local_catalog.is_remote_uri());

        let tmp_file = tempfile::tempdir().unwrap();
        let file_uri = format!("file://{}", tmp_file.path().display());
        let file_catalog = ParquetDataCatalog::from_uri(&file_uri, None, None, None, None).unwrap();
        assert!(!file_catalog.is_remote_uri());
    }

    #[test]
    fn test_reconstruct_full_uri() {
        // Test S3 URI reconstruction
        let s3_catalog =
            ParquetDataCatalog::from_uri("s3://bucket/base/path", None, None, None, None).unwrap();
        let reconstructed = s3_catalog.reconstruct_full_uri("data/quotes/file.parquet");
        assert_eq!(reconstructed, "s3://bucket/data/quotes/file.parquet");

        // Test GCS URI reconstruction
        let gcs_catalog =
            ParquetDataCatalog::from_uri("gs://bucket/base/path", None, None, None, None).unwrap();
        let reconstructed = gcs_catalog.reconstruct_full_uri("data/trades/file.parquet");
        assert_eq!(reconstructed, "gs://bucket/data/trades/file.parquet");

        // Test Azure URI reconstruction
        let azure_catalog =
            ParquetDataCatalog::from_uri("azure://account/container/path", None, None, None, None)
                .unwrap();
        let reconstructed = azure_catalog.reconstruct_full_uri("data/bars/file.parquet");
        assert_eq!(reconstructed, "azure://account/data/bars/file.parquet");

        // Test HTTP URI reconstruction
        let http_catalog =
            ParquetDataCatalog::from_uri("https://example.com/base/path", None, None, None, None)
                .unwrap();
        let reconstructed = http_catalog.reconstruct_full_uri("data/quotes/file.parquet");
        assert_eq!(
            reconstructed,
            "https://example.com/data/quotes/file.parquet"
        );

        // Test local path (should return path as-is)
        let tmp = tempfile::tempdir().unwrap();
        let local_catalog =
            ParquetDataCatalog::new(tmp.path().to_path_buf(), None, None, None, None);
        let reconstructed = local_catalog.reconstruct_full_uri("data/quotes/file.parquet");
        assert_eq!(reconstructed, "data/quotes/file.parquet");
    }
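
    // A sketch added for illustration: it exercises the gap-detection helpers
    // (query_interval_diff and, indirectly, get_interval_set / interval_to_tuple)
    // against the example given in the query_interval_diff doc comment.
    #[test]
    fn test_query_interval_diff_gaps() {
        // Query 1-100 with data covering 10-30 and 60-80 leaves three gaps
        let gaps = query_interval_diff(1, 100, &[(10, 30), (60, 80)]);
        assert_eq!(gaps, vec![(1, 9), (31, 59), (81, 100)]);

        // An invalid query range yields no gaps
        assert!(query_interval_diff(100, 1, &[(10, 30)]).is_empty());
    }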
}