nautilus_persistence/backend/catalog.rs

// -------------------------------------------------------------------------------------------------
//  Copyright (C) 2015-2025 Posei Systems Pty Ltd. All rights reserved.
//  https://poseitrader.io
//
//  Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
//  You may not use this file except in compliance with the License.
//  You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
//
//  Unless required by applicable law or agreed to in writing, software
//  distributed under the License is distributed on an "AS IS" BASIS,
//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
//  See the License for the specific language governing permissions and
//  limitations under the License.
// -------------------------------------------------------------------------------------------------

//! Parquet data catalog for efficient storage and retrieval of financial market data.
//!
//! This module provides a comprehensive data catalog implementation that uses Apache Parquet
//! format for storing financial market data with object store backends. The catalog supports
//! various data types including quotes, trades, bars, order book data, and other market events.
//!
//! # Key Features
//!
//! - **Object Store Integration**: Works with local filesystems, S3, and other object stores
//! - **Data Type Support**: Handles all major financial data types (quotes, trades, bars, etc.)
//! - **Time-based Organization**: Organizes data by timestamp ranges for efficient querying
//! - **Consolidation**: Merges multiple files to optimize storage and query performance
//! - **Validation**: Ensures data integrity with timestamp ordering and interval validation
//!
//! # Architecture
//!
//! The catalog organizes data in a hierarchical structure:
//! ```text
//! data/
//! ├── quotes/
//! │   └── INSTRUMENT_ID/
//! │       └── start_ts-end_ts.parquet
//! ├── trades/
//! │   └── INSTRUMENT_ID/
//! │       └── start_ts-end_ts.parquet
//! └── bars/
//!     └── INSTRUMENT_ID/
//!         └── start_ts-end_ts.parquet
//! ```
//!
//! # Usage
//!
//! ```rust,no_run
//! use std::path::PathBuf;
//! use nautilus_persistence::backend::catalog::ParquetDataCatalog;
//!
//! // Create a new catalog
//! let catalog = ParquetDataCatalog::new(
//!     PathBuf::from("/path/to/data"),
//!     None,        // storage_options
//!     Some(5000),  // batch_size
//!     None,        // compression (defaults to SNAPPY)
//!     None,        // max_row_group_size (defaults to 5000)
//! );
//!
//! // Write data to the catalog
//! // catalog.write_to_parquet(data, None, None)?;
//! ```
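//!
//! Existing coverage can then be inspected with `get_intervals`; the snippet below is a minimal
//! sketch (the instrument ID is illustrative):
//!
//! ```rust,no_run
//! # use std::path::PathBuf;
//! # use nautilus_persistence::backend::catalog::ParquetDataCatalog;
//! # let catalog = ParquetDataCatalog::new(PathBuf::from("/path/to/data"), None, None, None, None);
//! // List the time intervals already covered by quote files for one instrument
//! let intervals = catalog.get_intervals("quotes", Some("BTCUSD".to_string()))?;
//! for (start, end) in intervals {
//!     println!("Data available from {start} to {end}");
//! }
//! # Ok::<(), anyhow::Error>(())
//! ```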

use std::{
    fmt::Debug,
    ops::Bound,
    path::{Path, PathBuf},
    sync::Arc,
};

use datafusion::arrow::record_batch::RecordBatch;
use futures::StreamExt;
use heck::ToSnakeCase;
use itertools::Itertools;
use log::info;
use nautilus_core::{
    UnixNanos,
    datetime::{iso8601_to_unix_nanos, unix_nanos_to_iso8601},
};
use nautilus_model::data::{
    Bar, Data, HasTsInit, IndexPriceUpdate, MarkPriceUpdate, OrderBookDelta, OrderBookDepth10,
    QuoteTick, TradeTick, close::InstrumentClose,
};
use nautilus_serialization::arrow::{DecodeDataFromRecordBatch, EncodeToRecordBatch};
use object_store::{ObjectStore, path::Path as ObjectPath};
use serde::Serialize;
use unbounded_interval_tree::interval_tree::IntervalTree;

use super::session::{self, DataBackendSession, QueryResult, build_query};
use crate::parquet::{
    combine_parquet_files_from_object_store, min_max_from_parquet_metadata_object_store,
    write_batches_to_object_store,
};

/// A high-performance data catalog for storing and retrieving financial market data using Apache Parquet format.
///
/// The `ParquetDataCatalog` provides a comprehensive solution for managing large volumes of financial
/// market data with efficient storage, querying, and consolidation capabilities. It supports various
/// object store backends including local filesystems, AWS S3, and other cloud storage providers.
///
/// # Features
///
/// - **Efficient Storage**: Uses Apache Parquet format with configurable compression
/// - **Object Store Backend**: Supports multiple storage backends through the `object_store` crate
/// - **Time-based Organization**: Organizes data by timestamp ranges for optimal query performance
/// - **Data Validation**: Ensures timestamp ordering and interval consistency
/// - **Consolidation**: Merges multiple files to reduce storage overhead and improve query speed
/// - **Type Safety**: Strongly typed data handling with compile-time guarantees
///
/// # Data Organization
///
/// Data is organized hierarchically by data type and instrument:
/// - `data/{data_type}/{instrument_id}/{start_ts}-{end_ts}.parquet`
/// - Files are named with their timestamp ranges for efficient range queries
/// - Intervals are validated to be disjoint to prevent data overlap
///
/// # Performance Considerations
///
/// - **Batch Size**: Controls memory usage during data processing
/// - **Compression**: SNAPPY compression provides good balance of speed and size
/// - **Row Group Size**: Affects query performance and memory usage
/// - **File Consolidation**: Reduces the number of files for better query performance
pub struct ParquetDataCatalog {
    /// The base path for data storage within the object store.
    base_path: String,
    /// The original URI provided when creating the catalog.
    original_uri: String,
    /// The object store backend for data persistence.
    object_store: Arc<dyn ObjectStore>,
    /// The DataFusion session for query execution.
    session: DataBackendSession,
    /// The number of records to process in each batch.
    batch_size: usize,
    /// The compression algorithm used for Parquet files.
    compression: parquet::basic::Compression,
    /// The maximum number of rows in each Parquet row group.
    max_row_group_size: usize,
}

impl Debug for ParquetDataCatalog {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.debug_struct(stringify!(ParquetDataCatalog))
            .field("base_path", &self.base_path)
            .finish()
    }
}

impl ParquetDataCatalog {
    /// Creates a new [`ParquetDataCatalog`] instance from a local file path.
    ///
    /// This is a convenience constructor that converts a local path to a URI format
    /// and delegates to [`Self::from_uri`].
    ///
    /// # Parameters
    ///
    /// - `base_path`: The base directory path for data storage.
    /// - `storage_options`: Optional `HashMap` containing storage-specific configuration options.
    /// - `batch_size`: Number of records to process in each batch (default: 5000).
    /// - `compression`: Parquet compression algorithm (default: SNAPPY).
    /// - `max_row_group_size`: Maximum rows per Parquet row group (default: 5000).
    ///
    /// # Panics
    ///
    /// Panics if the path cannot be converted to a valid URI or if the object store
    /// cannot be created from the path.
    ///
    /// # Examples
    ///
    /// ```rust,no_run
    /// use std::path::PathBuf;
    /// use nautilus_persistence::backend::catalog::ParquetDataCatalog;
    ///
    /// let catalog = ParquetDataCatalog::new(
    ///     PathBuf::from("/tmp/posei_trader"),
    ///     None,        // no storage options
    ///     Some(1000),  // smaller batch size
    ///     None,        // default compression
    ///     None,        // default row group size
    /// );
    /// ```
    #[must_use]
    pub fn new(
        base_path: PathBuf,
        storage_options: Option<std::collections::HashMap<String, String>>,
        batch_size: Option<usize>,
        compression: Option<parquet::basic::Compression>,
        max_row_group_size: Option<usize>,
    ) -> Self {
        let path_str = base_path.to_string_lossy().to_string();
        Self::from_uri(
            &path_str,
            storage_options,
            batch_size,
            compression,
            max_row_group_size,
        )
        .expect("Failed to create catalog from path")
    }

    /// Creates a new [`ParquetDataCatalog`] instance from a URI with optional storage options.
    ///
    /// Supports various URI schemes including local file paths and multiple cloud storage backends
    /// supported by the `object_store` crate.
    ///
    /// # Supported URI Schemes
    ///
    /// - **AWS S3**: `s3://bucket/path`
    /// - **Google Cloud Storage**: `gs://bucket/path` or `gcs://bucket/path`
    /// - **Azure Blob Storage**: `azure://account/container/path` or `abfs://container@account.dfs.core.windows.net/path`
    /// - **HTTP/WebDAV**: `http://` or `https://`
    /// - **Local files**: `file://path` or plain paths
    ///
    /// # Parameters
    ///
    /// - `uri`: The URI for the data storage location.
    /// - `storage_options`: Optional `HashMap` containing storage-specific configuration options:
    ///   - For S3: `endpoint_url`, `region`, `access_key_id`, `secret_access_key`, `session_token`, etc.
    ///   - For GCS: `service_account_path`, `service_account_key`, `project_id`, etc.
    ///   - For Azure: `account_name`, `account_key`, `sas_token`, etc.
    /// - `batch_size`: Number of records to process in each batch (default: 5000).
    /// - `compression`: Parquet compression algorithm (default: SNAPPY).
    /// - `max_row_group_size`: Maximum rows per Parquet row group (default: 5000).
    ///
    /// # Errors
    ///
    /// Returns an error if:
    /// - The URI format is invalid or unsupported.
    /// - The object store cannot be created or accessed.
    /// - Authentication fails for cloud storage backends.
    ///
    /// # Examples
    ///
    /// ```rust,no_run
    /// use std::collections::HashMap;
    /// use nautilus_persistence::backend::catalog::ParquetDataCatalog;
    ///
    /// // Local filesystem
    /// let local_catalog = ParquetDataCatalog::from_uri(
    ///     "/tmp/posei_trader",
    ///     None, None, None, None
    /// )?;
    ///
    /// // S3 bucket
    /// let s3_catalog = ParquetDataCatalog::from_uri(
    ///     "s3://my-bucket/nautilus-data",
    ///     None, None, None, None
    /// )?;
    ///
    /// // Google Cloud Storage
    /// let gcs_catalog = ParquetDataCatalog::from_uri(
    ///     "gs://my-bucket/nautilus-data",
    ///     None, None, None, None
    /// )?;
    ///
    /// // Azure Blob Storage
    /// let azure_catalog = ParquetDataCatalog::from_uri(
    ///     "azure://account/container/nautilus-data",
    ///     None, None, None, None
    /// )?;
    ///
    /// // S3 with custom endpoint and credentials
    /// let mut storage_options = HashMap::new();
    /// storage_options.insert("endpoint_url".to_string(), "https://my-s3-endpoint.com".to_string());
    /// storage_options.insert("access_key_id".to_string(), "my-key".to_string());
    /// storage_options.insert("secret_access_key".to_string(), "my-secret".to_string());
    ///
    /// let s3_catalog = ParquetDataCatalog::from_uri(
    ///     "s3://my-bucket/nautilus-data",
    ///     Some(storage_options),
    ///     None, None, None,
    /// )?;
    /// # Ok::<(), anyhow::Error>(())
    /// ```
    pub fn from_uri(
        uri: &str,
        storage_options: Option<std::collections::HashMap<String, String>>,
        batch_size: Option<usize>,
        compression: Option<parquet::basic::Compression>,
        max_row_group_size: Option<usize>,
    ) -> anyhow::Result<Self> {
        let batch_size = batch_size.unwrap_or(5000);
        let compression = compression.unwrap_or(parquet::basic::Compression::SNAPPY);
        let max_row_group_size = max_row_group_size.unwrap_or(5000);

        let (object_store, base_path, original_uri) =
            crate::parquet::create_object_store_from_path(uri, storage_options)?;

        Ok(Self {
            base_path,
            original_uri,
            object_store,
            session: session::DataBackendSession::new(batch_size),
            batch_size,
            compression,
            max_row_group_size,
        })
    }

    /// Writes mixed data types to the catalog by separating them into type-specific collections.
    ///
    /// This method takes a heterogeneous collection of market data and separates it by type,
    /// then writes each type to its appropriate location in the catalog. This is useful when
    /// processing mixed data streams or bulk data imports.
    ///
    /// # Parameters
    ///
    /// - `data`: A vector of mixed [`Data`] enum variants.
    /// - `start`: Optional start timestamp to override the data's natural range.
    /// - `end`: Optional end timestamp to override the data's natural range.
    ///
    /// # Notes
    ///
    /// - Data is automatically separated by type before writing.
    /// - Each data type is written to its own directory structure.
    /// - Instrument data handling is not yet implemented (TODO).
    ///
    /// # Examples
    ///
    /// ```rust,no_run
    /// use nautilus_model::data::Data;
    /// use nautilus_persistence::backend::catalog::ParquetDataCatalog;
    ///
    /// let catalog = ParquetDataCatalog::new(/* ... */);
    /// let mixed_data: Vec<Data> = vec![/* mixed data types */];
    ///
    /// catalog.write_data_enum(mixed_data, None, None)?;
    /// ```
    pub fn write_data_enum(
        &self,
        data: Vec<Data>,
        start: Option<UnixNanos>,
        end: Option<UnixNanos>,
    ) -> anyhow::Result<()> {
        let mut deltas: Vec<OrderBookDelta> = Vec::new();
        let mut depth10s: Vec<OrderBookDepth10> = Vec::new();
        let mut quotes: Vec<QuoteTick> = Vec::new();
        let mut trades: Vec<TradeTick> = Vec::new();
        let mut bars: Vec<Bar> = Vec::new();
        let mut mark_prices: Vec<MarkPriceUpdate> = Vec::new();
        let mut index_prices: Vec<IndexPriceUpdate> = Vec::new();
        let mut closes: Vec<InstrumentClose> = Vec::new();

        for d in data.iter().cloned() {
            match d {
                Data::Deltas(_) => continue,
                Data::Delta(d) => {
                    deltas.push(d);
                }
                Data::Depth10(d) => {
                    depth10s.push(*d);
                }
                Data::Quote(d) => {
                    quotes.push(d);
                }
                Data::Trade(d) => {
                    trades.push(d);
                }
                Data::Bar(d) => {
                    bars.push(d);
                }
                Data::MarkPriceUpdate(p) => {
                    mark_prices.push(p);
                }
                Data::IndexPriceUpdate(p) => {
                    index_prices.push(p);
                }
                Data::InstrumentClose(c) => {
                    closes.push(c);
                }
            }
        }

        // TODO: need to handle instruments here

        self.write_to_parquet(deltas, start, end)?;
        self.write_to_parquet(depth10s, start, end)?;
        self.write_to_parquet(quotes, start, end)?;
        self.write_to_parquet(trades, start, end)?;
        self.write_to_parquet(bars, start, end)?;
        self.write_to_parquet(mark_prices, start, end)?;
        self.write_to_parquet(index_prices, start, end)?;
        self.write_to_parquet(closes, start, end)?;

        Ok(())
    }

    /// Writes typed data to a Parquet file in the catalog.
    ///
    /// This is the core method for persisting market data to the catalog. It handles data
    /// validation, batching, compression, and ensures proper file organization with
    /// timestamp-based naming.
    ///
    /// # Type Parameters
    ///
    /// - `T`: The data type to write, must implement required traits for serialization and cataloging.
    ///
    /// # Parameters
    ///
    /// - `data`: Vector of data records to write (must be in ascending timestamp order).
    /// - `start`: Optional start timestamp to override the natural data range.
    /// - `end`: Optional end timestamp to override the natural data range.
    ///
    /// # Returns
    ///
    /// Returns the [`PathBuf`] of the created file, or an empty path if no data was provided.
    ///
    /// # Errors
    ///
    /// This function will return an error if:
    /// - Data timestamps are not in ascending order
    /// - Data serialization to Arrow record batches fails
    /// - Object store write operations fail
    /// - File path construction fails
    /// - Timestamp interval validation fails after writing
    ///
    /// # Panics
    ///
    /// Panics if:
    /// - Record batches are empty after conversion
    /// - Required metadata is missing from the schema
    ///
    /// # Examples
    ///
    /// ```rust,no_run
    /// use nautilus_model::data::QuoteTick;
    /// use nautilus_persistence::backend::catalog::ParquetDataCatalog;
    ///
    /// let catalog = ParquetDataCatalog::new(/* ... */);
    /// let quotes: Vec<QuoteTick> = vec![/* quote data */];
    ///
    /// let path = catalog.write_to_parquet(quotes, None, None)?;
    /// println!("Data written to: {:?}", path);
    /// # Ok::<(), anyhow::Error>(())
    /// ```
    pub fn write_to_parquet<T>(
        &self,
        data: Vec<T>,
        start: Option<UnixNanos>,
        end: Option<UnixNanos>,
    ) -> anyhow::Result<PathBuf>
    where
        T: HasTsInit + EncodeToRecordBatch + CatalogPathPrefix,
    {
        if data.is_empty() {
            return Ok(PathBuf::new());
        }

        let type_name = std::any::type_name::<T>().to_snake_case();
        Self::check_ascending_timestamps(&data, &type_name)?;

        let start_ts = start.unwrap_or(data.first().unwrap().ts_init());
        let end_ts = end.unwrap_or(data.last().unwrap().ts_init());

        let batches = self.data_to_record_batches(data)?;
        let schema = batches.first().expect("Batches are empty.").schema();
        let instrument_id = schema.metadata.get("instrument_id").cloned();

        let directory = self.make_path(T::path_prefix(), instrument_id)?;
        let filename = timestamps_to_filename(start_ts, end_ts);
        let path = PathBuf::from(format!("{directory}/{filename}"));

        // Write all batches to parquet file
        info!(
            "Writing {} batches of {type_name} data to {path:?}",
            batches.len()
        );

        // Convert path to object store path
        let object_path = self.to_object_path(&path.to_string_lossy());

        self.execute_async(async {
            write_batches_to_object_store(
                &batches,
                self.object_store.clone(),
                &object_path,
                Some(self.compression),
                Some(self.max_row_group_size),
            )
            .await
        })?;
        let intervals = self.get_directory_intervals(&directory)?;

        if !are_intervals_disjoint(&intervals) {
            anyhow::bail!("Intervals are not disjoint after writing a new file");
        }

        Ok(path)
    }

    /// Writes typed data to a JSON file in the catalog.
    ///
    /// This method provides an alternative to Parquet format for data export and debugging.
    /// JSON files are human-readable but less efficient for large datasets.
    ///
    /// # Type Parameters
    ///
    /// - `T`: The data type to write, must implement serialization and cataloging traits.
    ///
    /// # Parameters
    ///
    /// - `data`: Vector of data records to write (must be in ascending timestamp order).
    /// - `path`: Optional custom directory path (defaults to catalog's standard structure).
    /// - `write_metadata`: Whether to write a separate metadata file alongside the data.
    ///
    /// # Returns
    ///
    /// Returns the [`PathBuf`] of the created JSON file.
    ///
    /// # Errors
    ///
    /// This function will return an error if:
    /// - Data timestamps are not in ascending order
    /// - JSON serialization fails
    /// - Object store write operations fail
    /// - File path construction fails
    ///
    /// # Panics
    ///
    /// Panics if `path` is `None` and the default catalog directory path cannot be constructed.
    ///
    /// # Examples
    ///
    /// ```rust,no_run
    /// use std::path::PathBuf;
    /// use nautilus_model::data::TradeTick;
    /// use nautilus_persistence::backend::catalog::ParquetDataCatalog;
    ///
    /// let catalog = ParquetDataCatalog::new(/* ... */);
    /// let trades: Vec<TradeTick> = vec![/* trade data */];
    ///
    /// let path = catalog.write_to_json(
    ///     trades,
    ///     Some(PathBuf::from("/custom/path")),
    ///     true  // write metadata
    /// )?;
    /// # Ok::<(), anyhow::Error>(())
    /// ```
    pub fn write_to_json<T>(
        &self,
        data: Vec<T>,
        path: Option<PathBuf>,
        write_metadata: bool,
    ) -> anyhow::Result<PathBuf>
    where
        T: HasTsInit + Serialize + CatalogPathPrefix + EncodeToRecordBatch,
    {
        if data.is_empty() {
            return Ok(PathBuf::new());
        }

        let type_name = std::any::type_name::<T>().to_snake_case();
        Self::check_ascending_timestamps(&data, &type_name)?;

        let start_ts = data.first().unwrap().ts_init();
        let end_ts = data.last().unwrap().ts_init();

        let directory =
            path.unwrap_or_else(|| PathBuf::from(self.make_path(T::path_prefix(), None).unwrap()));
        let filename = timestamps_to_filename(start_ts, end_ts).replace(".parquet", ".json");
        let json_path = directory.join(&filename);

        info!(
            "Writing {} records of {type_name} data to {json_path:?}",
            data.len()
        );

        if write_metadata {
            let metadata = T::chunk_metadata(&data);
            let metadata_path = json_path.with_extension("metadata.json");
            info!("Writing metadata to {metadata_path:?}");

            // Use object store for metadata file
            let metadata_object_path = ObjectPath::from(metadata_path.to_string_lossy().as_ref());
            let metadata_json = serde_json::to_vec_pretty(&metadata)?;
            self.execute_async(async {
                self.object_store
                    .put(&metadata_object_path, metadata_json.into())
                    .await
                    .map_err(anyhow::Error::from)
            })?;
        }

        // Use object store for main JSON file
        let json_object_path = ObjectPath::from(json_path.to_string_lossy().as_ref());
        let json_data = serde_json::to_vec_pretty(&serde_json::to_value(data)?)?;
        self.execute_async(async {
            self.object_store
                .put(&json_object_path, json_data.into())
                .await
                .map_err(anyhow::Error::from)
        })?;

        Ok(json_path)
    }

    /// Validates that data timestamps are in ascending order.
    ///
    /// # Parameters
    ///
    /// - `data`: Slice of data records to validate.
    /// - `type_name`: Name of the data type for error messages.
    ///
    /// # Errors
    ///
    /// Returns an error if any timestamp is less than the previous timestamp.
    fn check_ascending_timestamps<T: HasTsInit>(data: &[T], type_name: &str) -> anyhow::Result<()> {
        if !data.windows(2).all(|w| w[0].ts_init() <= w[1].ts_init()) {
            anyhow::bail!("{type_name} timestamps must be in ascending order");
        }

        Ok(())
    }

    /// Converts data into Arrow record batches for Parquet serialization.
    ///
    /// This method chunks the data according to the configured batch size and converts
    /// each chunk into an Arrow record batch with appropriate metadata.
    ///
    /// # Type Parameters
    ///
    /// - `T`: The data type to convert, must implement required encoding traits.
    ///
    /// # Parameters
    ///
    /// - `data`: Vector of data records to convert
    ///
    /// # Returns
    ///
    /// Returns a vector of Arrow [`RecordBatch`] instances ready for Parquet serialization.
    ///
    /// # Errors
    ///
    /// Returns an error if record batch encoding fails for any chunk.
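    ///
    /// # Examples
    ///
    /// A minimal sketch (illustrative only; assumes an already constructed catalog and quote data):
    ///
    /// ```rust,no_run
    /// use nautilus_model::data::QuoteTick;
    /// use nautilus_persistence::backend::catalog::ParquetDataCatalog;
    ///
    /// let catalog = ParquetDataCatalog::from_uri("/tmp/posei_trader", None, None, None, None)?;
    /// let quotes: Vec<QuoteTick> = vec![/* quote data */];
    ///
    /// // Each chunk of `batch_size` records becomes one Arrow record batch
    /// let batches = catalog.data_to_record_batches(quotes)?;
    /// println!("Encoded {} record batches", batches.len());
    /// # Ok::<(), anyhow::Error>(())
    /// ```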
    pub fn data_to_record_batches<T>(&self, data: Vec<T>) -> anyhow::Result<Vec<RecordBatch>>
    where
        T: HasTsInit + EncodeToRecordBatch,
    {
        let mut batches = Vec::new();

        for chunk in &data.into_iter().chunks(self.batch_size) {
            let data = chunk.collect_vec();
            let metadata = EncodeToRecordBatch::chunk_metadata(&data);
            let record_batch = T::encode_batch(&metadata, &data)?;
            batches.push(record_batch);
        }

        Ok(batches)
    }

    /// Extends the timestamp range of an existing Parquet file by renaming it.
    ///
    /// This method finds an existing file that is adjacent to the specified time range
    /// and renames it to include the new range. This is useful when appending data
    /// that extends the time coverage of existing files.
    ///
    /// # Parameters
    ///
    /// - `data_cls`: The data type directory name (e.g., "quotes", "trades").
    /// - `instrument_id`: Optional instrument ID to target a specific instrument's data.
    /// - `start`: Start timestamp of the new range to extend to.
    /// - `end`: End timestamp of the new range to extend to.
    ///
    /// # Returns
    ///
    /// Returns `Ok(())` on success, or an error if the operation fails.
    ///
    /// # Errors
    ///
    /// This function will return an error if:
    /// - The directory path cannot be constructed.
    /// - No adjacent file is found to extend.
    /// - File rename operations fail.
    /// - Interval validation fails after extension.
    ///
    /// # Examples
    ///
    /// ```rust,no_run
    /// use nautilus_persistence::backend::catalog::ParquetDataCatalog;
    /// use nautilus_core::UnixNanos;
    ///
    /// let catalog = ParquetDataCatalog::new(/* ... */);
    ///
    /// // Extend a file's range backwards or forwards
    /// catalog.extend_file_name(
    ///     "quotes",
    ///     Some("BTCUSD".to_string()),
    ///     UnixNanos::from(1609459200000000000),
    ///     UnixNanos::from(1609545600000000000)
    /// )?;
    /// # Ok::<(), anyhow::Error>(())
    /// ```
    pub fn extend_file_name(
        &self,
        data_cls: &str,
        instrument_id: Option<String>,
        start: UnixNanos,
        end: UnixNanos,
    ) -> anyhow::Result<()> {
        let directory = self.make_path(data_cls, instrument_id)?;
        let intervals = self.get_directory_intervals(&directory)?;

        let start = start.as_u64();
        let end = end.as_u64();

        for interval in intervals {
            if interval.0 == end + 1 {
                // Extend backwards: new file covers [start, interval.1]
                self.rename_parquet_file(&directory, interval.0, interval.1, start, interval.1)?;
                break;
            } else if interval.1 == start - 1 {
                // Extend forwards: new file covers [interval.0, end]
                self.rename_parquet_file(&directory, interval.0, interval.1, interval.0, end)?;
                break;
            }
        }

        let intervals = self.get_directory_intervals(&directory)?;

        if !are_intervals_disjoint(&intervals) {
            anyhow::bail!("Intervals are not disjoint after extending a file");
        }

        Ok(())
    }

    /// Helper method to list parquet files in a directory
    fn list_parquet_files(&self, directory: &str) -> anyhow::Result<Vec<String>> {
        self.execute_async(async {
            let prefix = ObjectPath::from(format!("{directory}/"));
            let mut stream = self.object_store.list(Some(&prefix));
            let mut files = Vec::new();

            while let Some(object) = stream.next().await {
                let object = object?;
                if object.location.as_ref().ends_with(".parquet") {
                    files.push(object.location.to_string());
                }
            }
            Ok::<Vec<String>, anyhow::Error>(files)
        })
    }

    /// Helper method to reconstruct full URI for remote object store paths
    fn reconstruct_full_uri(&self, path_str: &str) -> String {
        // Check if this is a remote URI scheme that needs reconstruction
        if self.is_remote_uri() {
            // Extract the base URL (scheme + host) from the original URI
            if let Ok(url) = url::Url::parse(&self.original_uri) {
                if let Some(host) = url.host_str() {
                    return format!("{}://{}/{}", url.scheme(), host, path_str);
                }
            }
        }

        // For local paths or if URL parsing fails, return the path as-is
        path_str.to_string()
    }

    /// Helper method to check if the original URI uses a remote object store scheme
    fn is_remote_uri(&self) -> bool {
        self.original_uri.starts_with("s3://")
            || self.original_uri.starts_with("gs://")
            || self.original_uri.starts_with("gcs://")
            || self.original_uri.starts_with("azure://")
            || self.original_uri.starts_with("abfs://")
            || self.original_uri.starts_with("http://")
            || self.original_uri.starts_with("https://")
    }

    /// Consolidates all data files in the catalog by merging multiple files into single files per directory.
    ///
    /// This method finds all leaf data directories in the catalog and consolidates the Parquet files
    /// within each directory. Consolidation improves query performance by reducing the number of files
    /// that need to be read and can also reduce storage overhead.
    ///
    /// # Parameters
    ///
    /// - `start`: Optional start timestamp to limit consolidation to files within this range.
    /// - `end`: Optional end timestamp to limit consolidation to files within this range.
    /// - `ensure_contiguous_files`: Whether to validate that consolidated intervals are contiguous (default: true).
    ///
    /// # Returns
    ///
    /// Returns `Ok(())` on success, or an error if consolidation fails for any directory.
    ///
    /// # Errors
    ///
    /// This function will return an error if:
    /// - Directory listing fails.
    /// - File consolidation operations fail.
    /// - Interval validation fails (when `ensure_contiguous_files` is true).
    ///
    /// # Examples
    ///
    /// ```rust,no_run
    /// use nautilus_persistence::backend::catalog::ParquetDataCatalog;
    /// use nautilus_core::UnixNanos;
    ///
    /// let catalog = ParquetDataCatalog::new(/* ... */);
    ///
    /// // Consolidate all files in the catalog
    /// catalog.consolidate_catalog(None, None, None)?;
    ///
    /// // Consolidate only files within a specific time range
    /// catalog.consolidate_catalog(
    ///     Some(UnixNanos::from(1609459200000000000)),
    ///     Some(UnixNanos::from(1609545600000000000)),
    ///     Some(true)
    /// )?;
    /// # Ok::<(), anyhow::Error>(())
    /// ```
    pub fn consolidate_catalog(
        &self,
        start: Option<UnixNanos>,
        end: Option<UnixNanos>,
        ensure_contiguous_files: Option<bool>,
    ) -> anyhow::Result<()> {
        let leaf_directories = self.find_leaf_data_directories()?;

        for directory in leaf_directories {
            self.consolidate_directory(&directory, start, end, ensure_contiguous_files)?;
        }

        Ok(())
    }

    /// Consolidates data files for a specific data type and instrument.
    ///
    /// This method consolidates Parquet files within a specific directory (defined by data type
    /// and optional instrument ID) by merging multiple files into a single file. This improves
    /// query performance and can reduce storage overhead.
    ///
    /// # Parameters
    ///
    /// - `type_name`: The data type directory name (e.g., "quotes", "trades", "bars").
    /// - `instrument_id`: Optional instrument ID to target a specific instrument's data.
    /// - `start`: Optional start timestamp to limit consolidation to files within this range.
    /// - `end`: Optional end timestamp to limit consolidation to files within this range.
    /// - `ensure_contiguous_files`: Whether to validate that consolidated intervals are contiguous (default: true).
    ///
    /// # Returns
    ///
    /// Returns `Ok(())` on success, or an error if consolidation fails.
    ///
    /// # Errors
    ///
    /// This function will return an error if:
    /// - The directory path cannot be constructed
    /// - File consolidation operations fail
    /// - Interval validation fails (when `ensure_contiguous_files` is true)
    ///
    /// # Examples
    ///
    /// ```rust,no_run
    /// use nautilus_persistence::backend::catalog::ParquetDataCatalog;
    /// use nautilus_core::UnixNanos;
    ///
    /// let catalog = ParquetDataCatalog::new(/* ... */);
    ///
    /// // Consolidate all quote files for a specific instrument
    /// catalog.consolidate_data(
    ///     "quotes",
    ///     Some("BTCUSD".to_string()),
    ///     None,
    ///     None,
    ///     None
    /// )?;
    ///
    /// // Consolidate trade files within a time range
    /// catalog.consolidate_data(
    ///     "trades",
    ///     None,
    ///     Some(UnixNanos::from(1609459200000000000)),
    ///     Some(UnixNanos::from(1609545600000000000)),
    ///     Some(true)
    /// )?;
    /// # Ok::<(), anyhow::Error>(())
    /// ```
    pub fn consolidate_data(
        &self,
        type_name: &str,
        instrument_id: Option<String>,
        start: Option<UnixNanos>,
        end: Option<UnixNanos>,
        ensure_contiguous_files: Option<bool>,
    ) -> anyhow::Result<()> {
        let directory = self.make_path(type_name, instrument_id)?;
        self.consolidate_directory(&directory, start, end, ensure_contiguous_files)
    }

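    /// Consolidates the Parquet files within a single directory into one file.
    ///
    /// Files whose intervals fall outside the optional `start`/`end` bounds are left untouched.
    /// When `ensure_contiguous_files` is true (the default), the covered intervals are validated
    /// after consolidation.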
    fn consolidate_directory(
        &self,
        directory: &str,
        start: Option<UnixNanos>,
        end: Option<UnixNanos>,
        ensure_contiguous_files: Option<bool>,
    ) -> anyhow::Result<()> {
        let parquet_files = self.list_parquet_files(directory)?;

        if parquet_files.len() <= 1 {
            return Ok(());
        }

        let mut files_to_consolidate = Vec::new();
        let mut intervals = Vec::new();
        let start = start.map(|t| t.as_u64());
        let end = end.map(|t| t.as_u64());

        for file in parquet_files {
            if let Some(interval) = parse_filename_timestamps(&file) {
                let (interval_start, interval_end) = interval;
                let include_file = match (start, end) {
                    (Some(s), Some(e)) => interval_start >= s && interval_end <= e,
                    (Some(s), None) => interval_start >= s,
                    (None, Some(e)) => interval_end <= e,
                    (None, None) => true,
                };

                if include_file {
                    files_to_consolidate.push(file);
                    intervals.push(interval);
                }
            }
        }

        intervals.sort_by_key(|&(start, _)| start);

        if !intervals.is_empty() {
            let file_name = timestamps_to_filename(
                UnixNanos::from(intervals[0].0),
                UnixNanos::from(intervals.last().unwrap().1),
            );
            let path = format!("{directory}/{file_name}");

            // Convert string paths to ObjectPath for the function call
            let object_paths: Vec<ObjectPath> = files_to_consolidate
                .iter()
                .map(|path| ObjectPath::from(path.as_str()))
                .collect();

            self.execute_async(async {
                combine_parquet_files_from_object_store(
                    self.object_store.clone(),
                    object_paths,
                    &ObjectPath::from(path),
                    Some(self.compression),
                    Some(self.max_row_group_size),
                )
                .await
            })?;
        }

        if ensure_contiguous_files.unwrap_or(true) && !are_intervals_contiguous(&intervals) {
            anyhow::bail!("Intervals are not contiguous after consolidating a directory");
        }

        Ok(())
    }

    /// Resets the filenames of all Parquet files in the catalog to match their actual content timestamps.
    ///
    /// This method scans all leaf data directories in the catalog and renames files based on
    /// the actual timestamp range of their content. This is useful when files have been
    /// modified or when filename conventions have changed.
    ///
    /// # Returns
    ///
    /// Returns `Ok(())` on success, or an error if the operation fails.
    ///
    /// # Errors
    ///
    /// This function will return an error if:
    /// - Directory listing fails
    /// - File metadata reading fails
    /// - File rename operations fail
    /// - Interval validation fails after renaming
    ///
    /// # Examples
    ///
    /// ```rust,no_run
    /// use nautilus_persistence::backend::catalog::ParquetDataCatalog;
    ///
    /// let catalog = ParquetDataCatalog::new(/* ... */);
    ///
    /// // Reset all filenames in the catalog
    /// catalog.reset_catalog_file_names()?;
    /// # Ok::<(), anyhow::Error>(())
    /// ```
    pub fn reset_catalog_file_names(&self) -> anyhow::Result<()> {
        let leaf_directories = self.find_leaf_data_directories()?;

        for directory in leaf_directories {
            self.reset_file_names(&directory)?;
        }

        Ok(())
    }

    /// Resets the filenames of Parquet files for a specific data type and instrument ID.
    ///
    /// This method renames files in a specific directory based on the actual timestamp
    /// range of their content. This is useful for correcting filenames after data
    /// modifications or when filename conventions have changed.
    ///
    /// # Parameters
    ///
    /// - `data_cls`: The data type directory name (e.g., "quotes", "trades").
    /// - `instrument_id`: Optional instrument ID to target a specific instrument's data.
    ///
    /// # Returns
    ///
    /// Returns `Ok(())` on success, or an error if the operation fails.
    ///
    /// # Errors
    ///
    /// This function will return an error if:
    /// - The directory path cannot be constructed
    /// - File metadata reading fails
    /// - File rename operations fail
    /// - Interval validation fails after renaming
    ///
    /// # Examples
    ///
    /// ```rust,no_run
    /// use nautilus_persistence::backend::catalog::ParquetDataCatalog;
    ///
    /// let catalog = ParquetDataCatalog::new(/* ... */);
    ///
    /// // Reset filenames for all quote files
    /// catalog.reset_data_file_names("quotes", None)?;
    ///
    /// // Reset filenames for a specific instrument's trade files
    /// catalog.reset_data_file_names("trades", Some("BTCUSD".to_string()))?;
    /// # Ok::<(), anyhow::Error>(())
    /// ```
    pub fn reset_data_file_names(
        &self,
        data_cls: &str,
        instrument_id: Option<String>,
    ) -> anyhow::Result<()> {
        let directory = self.make_path(data_cls, instrument_id)?;
        self.reset_file_names(&directory)
    }

    /// Reset the filenames of parquet files in a directory
    fn reset_file_names(&self, directory: &str) -> anyhow::Result<()> {
        let parquet_files = self.list_parquet_files(directory)?;

        for file in parquet_files {
            let object_path = ObjectPath::from(file.as_str());
            let (first_ts, last_ts) = self.execute_async(async {
                min_max_from_parquet_metadata_object_store(
                    self.object_store.clone(),
                    &object_path,
                    "ts_init",
                )
                .await
            })?;

            let new_filename =
                timestamps_to_filename(UnixNanos::from(first_ts), UnixNanos::from(last_ts));
            let new_file_path = format!("{directory}/{new_filename}");
            let new_object_path = ObjectPath::from(new_file_path);

            self.move_file(&object_path, &new_object_path)?;
        }

        let intervals = self.get_directory_intervals(directory)?;

        if !are_intervals_disjoint(&intervals) {
            anyhow::bail!("Intervals are not disjoint after resetting file names");
        }

        Ok(())
    }

    /// Finds all leaf data directories in the catalog.
    ///
    /// A leaf directory is one that contains data files but no subdirectories.
    /// This method is used to identify directories that can be processed for
    /// consolidation or other operations.
    ///
    /// # Returns
    ///
    /// Returns a vector of directory path strings representing leaf directories,
    /// or an error if directory traversal fails.
    ///
    /// # Errors
    ///
    /// This function will return an error if:
    /// - Object store listing operations fail
    /// - Directory structure cannot be analyzed
    ///
    /// # Examples
    ///
    /// ```rust,no_run
    /// use nautilus_persistence::backend::catalog::ParquetDataCatalog;
    ///
    /// let catalog = ParquetDataCatalog::new(/* ... */);
    ///
    /// let leaf_dirs = catalog.find_leaf_data_directories()?;
    /// for dir in leaf_dirs {
    ///     println!("Found leaf directory: {}", dir);
    /// }
    /// # Ok::<(), anyhow::Error>(())
    /// ```
    pub fn find_leaf_data_directories(&self) -> anyhow::Result<Vec<String>> {
        let data_dir = if self.base_path.is_empty() {
            "data".to_string()
        } else {
            format!("{}/data", self.base_path)
        };

        let leaf_dirs = self.execute_async(async {
            let mut all_paths = std::collections::HashSet::new();
            let mut directories = std::collections::HashSet::new();
            let mut files_in_dirs = std::collections::HashMap::new();

            // List all objects under the data directory
            let prefix = ObjectPath::from(format!("{data_dir}/"));
            let mut stream = self.object_store.list(Some(&prefix));

            while let Some(object) = stream.next().await {
                let object = object?;
                let path_str = object.location.to_string();
                all_paths.insert(path_str.clone());

                // Extract directory path
                if let Some(parent) = std::path::Path::new(&path_str).parent() {
                    let parent_str = parent.to_string_lossy().to_string();
                    directories.insert(parent_str.clone());

                    // Track files in each directory
                    files_in_dirs
                        .entry(parent_str)
                        .or_insert_with(Vec::new)
                        .push(path_str);
                }
            }

            // Find leaf directories (directories with files but no subdirectories)
            let mut leaf_dirs = Vec::new();
            for dir in &directories {
                let has_files = files_in_dirs
                    .get(dir)
                    .is_some_and(|files| !files.is_empty());
                let has_subdirs = directories
                    .iter()
                    .any(|d| d.starts_with(&format!("{dir}/")) && d != dir);

                if has_files && !has_subdirs {
                    leaf_dirs.push(dir.clone());
                }
            }

            Ok::<Vec<String>, anyhow::Error>(leaf_dirs)
        })?;

        Ok(leaf_dirs)
    }

    /// Queries data of type `T` from the catalog, returning a [`QueryResult`].
    ///
    /// For remote URIs, the catalog's object store is registered with the DataFusion session
    /// before the matching Parquet files are located and added to the query.
    ///
    /// # Errors
    ///
    /// This function will return an error if:
    /// - The remote URI cannot be parsed or is missing a host/bucket name.
    /// - File discovery or registration with the session fails.
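    ///
    /// # Examples
    ///
    /// A minimal sketch (the instrument ID is illustrative):
    ///
    /// ```rust,no_run
    /// use nautilus_model::data::QuoteTick;
    /// use nautilus_persistence::backend::catalog::ParquetDataCatalog;
    ///
    /// let mut catalog = ParquetDataCatalog::from_uri("/tmp/posei_trader", None, None, None, None)?;
    /// let result = catalog.query::<QuoteTick>(
    ///     Some(vec!["BTCUSD".to_string()]),
    ///     None,  // start
    ///     None,  // end
    ///     None,  // where_clause
    /// )?;
    /// # Ok::<(), anyhow::Error>(())
    /// ```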
1163    pub fn query<T>(
1164        &mut self,
1165        instrument_ids: Option<Vec<String>>,
1166        start: Option<UnixNanos>,
1167        end: Option<UnixNanos>,
1168        where_clause: Option<&str>,
1169    ) -> anyhow::Result<QueryResult>
1170    where
1171        T: DecodeDataFromRecordBatch + CatalogPathPrefix,
1172    {
1173        // Register the object store with the session for remote URIs
1174        if self.is_remote_uri() {
1175            let url = url::Url::parse(&self.original_uri)?;
1176            let host = url
1177                .host_str()
1178                .ok_or_else(|| anyhow::anyhow!("Remote URI missing host/bucket name"))?;
1179            let base_url = url::Url::parse(&format!("{}://{}", url.scheme(), host))?;
1180            self.session
1181                .register_object_store(&base_url, self.object_store.clone());
1182        }
1183
1184        let files_list = self.query_files(T::path_prefix(), instrument_ids, start, end)?;
1185
1186        for (idx, file_uri) in files_list.iter().enumerate() {
1187            let table_name = format!("{}_{}", T::path_prefix(), idx);
1188            let query = build_query(&table_name, start, end, where_clause);
1189
1190            self.session
1191                .add_file::<T>(&table_name, file_uri, Some(&query))?;
1192        }
1193
1194        Ok(self.session.get_query_result())
1195    }
1196
1197    /// Queries all Parquet files for a specific data type and optional instrument IDs.
1198    ///
1199    /// This method finds all Parquet files that match the specified criteria and returns
1200    /// their full URIs. The files are filtered by data type, instrument IDs (if provided),
1201    /// and timestamp range (if provided).
1202    ///
1203    /// # Parameters
1204    ///
1205    /// - `data_cls`: The data type directory name (e.g., "quotes", "trades").
1206    /// - `instrument_ids`: Optional list of instrument IDs to filter by.
1207    /// - `start`: Optional start timestamp to filter files by their time range.
1208    /// - `end`: Optional end timestamp to filter files by their time range.
1209    ///
1210    /// # Returns
1211    ///
1212    /// Returns a vector of file URI strings that match the query criteria,
1213    /// or an error if the query fails.
1214    ///
1215    /// # Errors
1216    ///
1217    /// This function will return an error if:
1218    /// - The directory path cannot be constructed.
1219    /// - Object store listing operations fail.
1220    /// - URI reconstruction fails.
1221    ///
1222    /// # Examples
1223    ///
1224    /// ```rust,no_run
1225    /// use nautilus_persistence::backend::catalog::ParquetDataCatalog;
1226    /// use nautilus_core::UnixNanos;
1227    ///
1228    /// let catalog = ParquetDataCatalog::new(/* ... */);
1229    ///
1230    /// // Query all quote files
1231    /// let files = catalog.query_files("quotes", None, None, None)?;
1232    ///
1233    /// // Query trade files for specific instruments within a time range
1234    /// let files = catalog.query_files(
1235    ///     "trades",
1236    ///     Some(vec!["BTCUSD".to_string(), "ETHUSD".to_string()]),
1237    ///     Some(UnixNanos::from(1609459200000000000)),
1238    ///     Some(UnixNanos::from(1609545600000000000))
1239    /// )?;
1240    /// # Ok::<(), anyhow::Error>(())
1241    /// ```
1242    pub fn query_files(
1243        &self,
1244        data_cls: &str,
1245        instrument_ids: Option<Vec<String>>,
1246        start: Option<UnixNanos>,
1247        end: Option<UnixNanos>,
1248    ) -> anyhow::Result<Vec<String>> {
1249        let mut files = Vec::new();
1250
1251        let start_u64 = start.map(|s| s.as_u64());
1252        let end_u64 = end.map(|e| e.as_u64());
1253
1254        let base_dir = self.make_path(data_cls, None)?;
1255
1256        // Use recursive listing to match Python's glob behavior
1257        let list_result = self.execute_async(async {
1258            let prefix = ObjectPath::from(format!("{base_dir}/"));
1259            let mut stream = self.object_store.list(Some(&prefix));
1260            let mut objects = Vec::new();
1261            while let Some(object) = stream.next().await {
1262                objects.push(object?);
1263            }
1264            Ok::<Vec<_>, anyhow::Error>(objects)
1265        })?;
1266
1267        let mut file_paths: Vec<String> = list_result
1268            .into_iter()
1269            .filter_map(|object| {
1270                let path_str = object.location.to_string();
1271                if path_str.ends_with(".parquet") {
1272                    Some(path_str)
1273                } else {
1274                    None
1275                }
1276            })
1277            .collect();
1278
1279        // Apply identifier filtering if provided
1280        if let Some(identifiers) = instrument_ids {
1281            let safe_identifiers: Vec<String> = identifiers
1282                .iter()
1283                .map(|id| urisafe_instrument_id(id))
1284                .collect();
1285
1286            // Exact match by default for instrument_ids or bar_types
1287            let exact_match_file_paths: Vec<String> = file_paths
1288                .iter()
1289                .filter(|file_path| {
1290                    // Extract the directory name (second to last path component)
1291                    let path_parts: Vec<&str> = file_path.split('/').collect();
1292                    if path_parts.len() >= 2 {
1293                        let dir_name = path_parts[path_parts.len() - 2];
1294                        safe_identifiers.iter().any(|safe_id| safe_id == dir_name)
1295                    } else {
1296                        false
1297                    }
1298                })
1299                .cloned()
1300                .collect();
1301
1302            if exact_match_file_paths.is_empty() && data_cls == "bars" {
1303                // Partial match of instrument_ids in bar_types for bars
1304                file_paths.retain(|file_path| {
1305                    let path_parts: Vec<&str> = file_path.split('/').collect();
1306                    if path_parts.len() >= 2 {
1307                        let dir_name = path_parts[path_parts.len() - 2];
1308                        safe_identifiers
1309                            .iter()
1310                            .any(|safe_id| dir_name.starts_with(&format!("{safe_id}-")))
1311                    } else {
1312                        false
1313                    }
1314                });
1315            } else {
1316                file_paths = exact_match_file_paths;
1317            }
1318        }
1319
1320        // Apply timestamp filtering
1321        file_paths.retain(|file_path| query_intersects_filename(file_path, start_u64, end_u64));
1322
1323        // Convert to full URIs
1324        for file_path in file_paths {
1325            let full_uri = self.reconstruct_full_uri(&file_path);
1326            files.push(full_uri);
1327        }
1328
1329        Ok(files)
1330    }
1331
1332    /// Finds the missing time intervals for a specific data type and instrument ID.
1333    ///
1334    /// This method compares a requested time range against the existing data coverage
1335    /// and returns the gaps that need to be filled. This is useful for determining
1336    /// what data needs to be fetched or backfilled.
1337    ///
1338    /// # Parameters
1339    ///
1340    /// - `start`: Start timestamp of the requested range (Unix nanoseconds).
1341    /// - `end`: End timestamp of the requested range (Unix nanoseconds).
1342    /// - `data_cls`: The data type directory name (e.g., "quotes", "trades").
1343    /// - `instrument_id`: Optional instrument ID to target a specific instrument's data.
1344    ///
1345    /// # Returns
1346    ///
1347    /// Returns a vector of (start, end) tuples representing the missing intervals,
1348    /// or an error if the operation fails.
1349    ///
1350    /// # Errors
1351    ///
1352    /// This function will return an error if:
1353    /// - The directory path cannot be constructed
1354    /// - Interval retrieval fails
1355    /// - Gap calculation fails
1356    ///
1357    /// # Examples
1358    ///
1359    /// ```rust,no_run
1360    /// use nautilus_persistence::backend::catalog::ParquetDataCatalog;
1361    ///
1362    /// let catalog = ParquetDataCatalog::new(/* ... */);
1363    ///
1364    /// // Find missing intervals for quote data
1365    /// let missing = catalog.get_missing_intervals_for_request(
1366    ///     1609459200000000000,  // start
1367    ///     1609545600000000000,  // end
1368    ///     "quotes",
1369    ///     Some("BTCUSD".to_string())
1370    /// )?;
1371    ///
1372    /// for (start, end) in missing {
1373    ///     println!("Missing data from {} to {}", start, end);
1374    /// }
1375    /// # Ok::<(), anyhow::Error>(())
1376    /// ```
1377    pub fn get_missing_intervals_for_request(
1378        &self,
1379        start: u64,
1380        end: u64,
1381        data_cls: &str,
1382        instrument_id: Option<String>,
1383    ) -> anyhow::Result<Vec<(u64, u64)>> {
1384        let intervals = self.get_intervals(data_cls, instrument_id)?;
1385
1386        Ok(query_interval_diff(start, end, &intervals))
1387    }
1388
1389    /// Gets the last (most recent) timestamp for a specific data type and instrument ID.
1390    ///
1391    /// This method finds the latest timestamp covered by existing data files for
1392    /// the specified data type and instrument. This is useful for determining
1393    /// the most recent data available or for incremental data updates.
1394    ///
1395    /// # Parameters
1396    ///
1397    /// - `data_cls`: The data type directory name (e.g., "quotes", "trades").
1398    /// - `instrument_id`: Optional instrument ID to target a specific instrument's data.
1399    ///
1400    /// # Returns
1401    ///
1402    /// Returns `Some(timestamp)` if data exists, `None` if no data is found,
1403    /// or an error if the operation fails.
1404    ///
1405    /// # Errors
1406    ///
1407    /// This function will return an error if:
1408    /// - The directory path cannot be constructed.
1409    /// - Interval retrieval fails.
1410    ///
1411    /// # Examples
1412    ///
1413    /// ```rust,no_run
1414    /// use nautilus_persistence::backend::catalog::ParquetDataCatalog;
1415    ///
1416    /// let catalog = ParquetDataCatalog::new(std::path::PathBuf::from("/path/to/catalog"), None, None, None, None);
1417    ///
1418    /// // Get the last timestamp for quote data
1419    /// if let Some(last_ts) = catalog.query_last_timestamp("quotes", Some("BTCUSD".to_string()))? {
1420    ///     println!("Last quote timestamp: {}", last_ts);
1421    /// } else {
1422    ///     println!("No quote data found");
1423    /// }
1424    /// # Ok::<(), anyhow::Error>(())
1425    /// ```
1426    pub fn query_last_timestamp(
1427        &self,
1428        data_cls: &str,
1429        instrument_id: Option<String>,
1430    ) -> anyhow::Result<Option<u64>> {
1431        let intervals = self.get_intervals(data_cls, instrument_id)?;
1432
1433        if intervals.is_empty() {
1434            return Ok(None);
1435        }
1436
1437        Ok(Some(intervals.last().unwrap().1))
1438    }
1439
1440    /// Gets the time intervals covered by Parquet files for a specific data type and instrument ID.
1441    ///
1442    /// This method returns all time intervals covered by existing data files for the
1443    /// specified data type and instrument. The intervals are sorted by start time and
1444    /// represent the complete data coverage available.
1445    ///
1446    /// # Parameters
1447    ///
1448    /// - `data_cls`: The data type directory name (e.g., "quotes", "trades").
1449    /// - `instrument_id`: Optional instrument ID to target a specific instrument's data.
1450    ///
1451    /// # Returns
1452    ///
1453    /// Returns a vector of (start, end) tuples representing the covered intervals,
1454    /// sorted by start time, or an error if the operation fails.
1455    ///
1456    /// # Errors
1457    ///
1458    /// This function will return an error if:
1459    /// - The directory path cannot be constructed.
1460    /// - Directory listing fails.
1461    /// - Filename parsing fails.
1462    ///
1463    /// # Examples
1464    ///
1465    /// ```rust,no_run
1466    /// use nautilus_persistence::backend::catalog::ParquetDataCatalog;
1467    ///
1468    /// let catalog = ParquetDataCatalog::new(std::path::PathBuf::from("/path/to/catalog"), None, None, None, None);
1469    ///
1470    /// // Get all intervals for quote data
1471    /// let intervals = catalog.get_intervals("quotes", Some("BTCUSD".to_string()))?;
1472    /// for (start, end) in intervals {
1473    ///     println!("Data available from {} to {}", start, end);
1474    /// }
1475    /// # Ok::<(), anyhow::Error>(())
1476    /// ```
1477    pub fn get_intervals(
1478        &self,
1479        data_cls: &str,
1480        instrument_id: Option<String>,
1481    ) -> anyhow::Result<Vec<(u64, u64)>> {
1482        let directory = self.make_path(data_cls, instrument_id)?;
1483
1484        self.get_directory_intervals(&directory)
1485    }
1486
1487    /// Gets the time intervals covered by Parquet files in a directory.
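    ///
    /// Listing goes through the configured object store, so the same logic applies to local
    /// and remote backends. A minimal illustrative sketch (this helper is private, so the
    /// snippet is not compiled as a doctest):
    ///
    /// ```rust,ignore
    /// let directory = catalog.make_path("quotes", Some("BTCUSD".to_string()))?;
    /// let intervals = catalog.get_directory_intervals(&directory)?;
    /// assert!(intervals.windows(2).all(|w| w[0].0 <= w[1].0)); // sorted by start time
    /// ```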
1488    fn get_directory_intervals(&self, directory: &str) -> anyhow::Result<Vec<(u64, u64)>> {
1489        let mut intervals = Vec::new();
1490
1491        // Use object store for all operations
1492        let list_result = self.execute_async(async {
1493            let path = object_store::path::Path::from(directory);
1494            Ok(self
1495                .object_store
1496                .list(Some(&path))
1497                .collect::<Vec<_>>()
1498                .await)
1499        })?;
1500
1501        for result in list_result {
1502            match result {
1503                Ok(object) => {
1504                    let path_str = object.location.to_string();
1505                    if path_str.ends_with(".parquet") {
1506                        if let Some(interval) = parse_filename_timestamps(&path_str) {
1507                            intervals.push(interval);
1508                        }
1509                    }
1510                }
1511                Err(_) => {
1512                    // Directory doesn't exist or is empty, which is fine
1513                    break;
1514                }
1515            }
1516        }
1517
1518        intervals.sort_by_key(|&(start, _)| start);
1519
1520        Ok(intervals)
1521    }
1522
1523    /// Creates a directory path for a data type and optional instrument ID.
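    ///
    /// A sketch of the resulting layout, assuming an empty `base_path` (illustrative only,
    /// not compiled as a doctest):
    ///
    /// ```rust,ignore
    /// let path = catalog.make_path("quotes", Some("BTC/USD".to_string()))?;
    /// assert_eq!(path, "data/quotes/BTCUSD"); // slashes removed by `urisafe_instrument_id`
    /// ```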
1524    fn make_path(&self, type_name: &str, instrument_id: Option<String>) -> anyhow::Result<String> {
1525        let mut path = if self.base_path.is_empty() {
1526            format!("data/{type_name}")
1527        } else {
1528            // Remove trailing slash from base_path to avoid double slashes
1529            let base_path = self.base_path.trim_end_matches('/');
1530            format!("{base_path}/data/{type_name}")
1531        };
1532
1533        if let Some(id) = instrument_id {
1534            path = format!("{}/{}", path, urisafe_instrument_id(&id));
1535        }
1536
1537        Ok(path)
1538    }
1539
1540    /// Helper method to rename a Parquet file by moving it via object store operations.
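    ///
    /// A sketch of the intended usage with illustrative timestamps (not compiled):
    ///
    /// ```rust,ignore
    /// // Renames "<old_start>_<old_end>.parquet" to "<new_start>_<new_end>.parquet"
    /// // within the same directory.
    /// catalog.rename_parquet_file("data/quotes/BTCUSD", 1, 100, 1, 150)?;
    /// ```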
1541    fn rename_parquet_file(
1542        &self,
1543        directory: &str,
1544        old_start: u64,
1545        old_end: u64,
1546        new_start: u64,
1547        new_end: u64,
1548    ) -> anyhow::Result<()> {
1549        let old_filename =
1550            timestamps_to_filename(UnixNanos::from(old_start), UnixNanos::from(old_end));
1551        let old_path = format!("{directory}/{old_filename}");
1552        let old_object_path = self.to_object_path(&old_path);
1553
1554        let new_filename =
1555            timestamps_to_filename(UnixNanos::from(new_start), UnixNanos::from(new_end));
1556        let new_path = format!("{directory}/{new_filename}");
1557        let new_object_path = self.to_object_path(&new_path);
1558
1559        self.move_file(&old_object_path, &new_object_path)
1560    }
1561
1562    /// Helper method to convert a path string to `ObjectPath`, handling `base_path`.
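    ///
    /// A sketch assuming a catalog rooted at "/tmp/catalog" (illustrative only, not compiled):
    ///
    /// ```rust,ignore
    /// let object_path = catalog.to_object_path("/tmp/catalog/data/quotes/file.parquet");
    /// assert_eq!(object_path.as_ref(), "data/quotes/file.parquet"); // base prefix stripped
    /// ```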
1563    fn to_object_path(&self, path: &str) -> ObjectPath {
1564        if self.base_path.is_empty() {
1565            return ObjectPath::from(path);
1566        }
1567
1568        let base = self.base_path.trim_end_matches('/');
1569
1570        // Remove the catalog base prefix if present
1571        let without_base = path
1572            .strip_prefix(&format!("{base}/"))
1573            .or_else(|| path.strip_prefix(base))
1574            .unwrap_or(path);
1575
1576        ObjectPath::from(without_base)
1577    }
1578
1579    /// Helper method to move a file using the object store rename operation.
1580    fn move_file(&self, old_path: &ObjectPath, new_path: &ObjectPath) -> anyhow::Result<()> {
1581        self.execute_async(async {
1582            self.object_store
1583                .rename(old_path, new_path)
1584                .await
1585                .map_err(anyhow::Error::from)
1586        })
1587    }
1588
1589    /// Helper method to execute an async operation by blocking on the common Tokio runtime.
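    ///
    /// A minimal sketch of the blocking behaviour (illustrative only, not compiled):
    ///
    /// ```rust,ignore
    /// let value = self.execute_async(async { Ok(42) })?;
    /// assert_eq!(value, 42);
    /// ```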
1590    fn execute_async<F, R>(&self, future: F) -> anyhow::Result<R>
1591    where
1592        F: std::future::Future<Output = anyhow::Result<R>>,
1593    {
1594        let rt = nautilus_common::runtime::get_runtime();
1595        rt.block_on(future)
1596    }
1597}
1598
1599/// Trait for providing catalog path prefixes for different data types.
1600///
1601/// This trait enables type-safe organization of data within the catalog by providing
1602/// a standardized way to determine the directory structure for each data type.
1603/// Each data type maps to a specific subdirectory within the catalog's data folder.
1604///
1605/// # Implementation
1606///
1607/// Types implementing this trait should return a static string that represents
1608/// the directory name where data of that type should be stored.
1609///
1610/// # Examples
1611///
1612/// ```rust
1613/// use nautilus_persistence::backend::catalog::CatalogPathPrefix;
1614/// use nautilus_model::data::QuoteTick;
1615///
1616/// assert_eq!(QuoteTick::path_prefix(), "quotes");
1617/// ```
1618pub trait CatalogPathPrefix {
1619    /// Returns the path prefix (directory name) for this data type.
1620    ///
1621    /// # Returns
1622    ///
1623    /// A static string representing the directory name where this data type is stored.
1624    fn path_prefix() -> &'static str;
1625}
1626
1627/// Macro for implementing [`CatalogPathPrefix`] for data types.
1628///
1629/// This macro provides a convenient way to implement the trait for multiple types
1630/// with their corresponding path prefixes.
1631///
1632/// # Parameters
1633///
1634/// - `$type`: The data type to implement the trait for.
1635/// - `$path`: The path prefix string for that type.
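///
/// # Examples
///
/// ```rust,ignore
/// // `MyCustomData` is a hypothetical type shown purely for illustration.
/// impl_catalog_path_prefix!(MyCustomData, "my_custom_data");
/// assert_eq!(MyCustomData::path_prefix(), "my_custom_data");
/// ```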
1636macro_rules! impl_catalog_path_prefix {
1637    ($type:ty, $path:expr) => {
1638        impl CatalogPathPrefix for $type {
1639            fn path_prefix() -> &'static str {
1640                $path
1641            }
1642        }
1643    };
1644}
1645
1646// Standard implementations for financial data types
1647impl_catalog_path_prefix!(QuoteTick, "quotes");
1648impl_catalog_path_prefix!(TradeTick, "trades");
1649impl_catalog_path_prefix!(OrderBookDelta, "order_book_deltas");
1650impl_catalog_path_prefix!(OrderBookDepth10, "order_book_depths");
1651impl_catalog_path_prefix!(Bar, "bars");
1652impl_catalog_path_prefix!(IndexPriceUpdate, "index_prices");
1653impl_catalog_path_prefix!(MarkPriceUpdate, "mark_prices");
1654impl_catalog_path_prefix!(InstrumentClose, "instrument_closes");
1655
1656////////////////////////////////////////////////////////////////////////////////
1657// Helper functions for filename operations
1658////////////////////////////////////////////////////////////////////////////////
1659
1660/// Converts timestamps to a filename using ISO 8601 format.
1661///
1662/// This function converts two Unix nanosecond timestamps to a filename that uses
1663/// ISO 8601 format with filesystem-safe characters. The format matches the Python
1664/// implementation for consistency.
1665///
1666/// # Parameters
1667///
1668/// - `timestamp_1`: First timestamp in Unix nanoseconds.
1669/// - `timestamp_2`: Second timestamp in Unix nanoseconds.
1670///
1671/// # Returns
1672///
1673/// Returns a filename string in the format: "`iso_timestamp_1_iso_timestamp_2.parquet`".
1674///
1675/// # Examples
1676///
1677/// ```rust
1678/// # use nautilus_persistence::backend::catalog::timestamps_to_filename;
1679/// # use nautilus_core::UnixNanos;
1680/// let filename = timestamps_to_filename(
1681///     UnixNanos::from(1609459200000000000),
1682///     UnixNanos::from(1609545600000000000)
1683/// );
1684/// // Returns something like: "2021-01-01T00-00-00-000000000Z_2021-01-02T00-00-00-000000000Z.parquet"
1685/// ```
1686fn timestamps_to_filename(timestamp_1: UnixNanos, timestamp_2: UnixNanos) -> String {
1687    let datetime_1 = iso_timestamp_to_file_timestamp(&unix_nanos_to_iso8601(timestamp_1));
1688    let datetime_2 = iso_timestamp_to_file_timestamp(&unix_nanos_to_iso8601(timestamp_2));
1689
1690    format!("{datetime_1}_{datetime_2}.parquet")
1691}
1692
1693/// Converts an ISO 8601 timestamp to a filesystem-safe format.
1694///
1695/// This function replaces colons and dots with hyphens to make the timestamp
1696/// safe for use in filenames across different filesystems.
1697///
1698/// # Parameters
1699///
1700/// - `iso_timestamp`: ISO 8601 timestamp string (e.g., "2023-10-26T07:30:50.123456789Z").
1701///
1702/// # Returns
1703///
1704/// Returns a filesystem-safe timestamp string (e.g., "2023-10-26T07-30-50-123456789Z").
1705///
1706/// # Examples
1707///
1708/// ```rust
1709/// # use nautilus_persistence::backend::catalog::iso_timestamp_to_file_timestamp;
1710/// let safe_timestamp = iso_timestamp_to_file_timestamp("2023-10-26T07:30:50.123456789Z");
1711/// assert_eq!(safe_timestamp, "2023-10-26T07-30-50-123456789Z");
1712/// ```
1713fn iso_timestamp_to_file_timestamp(iso_timestamp: &str) -> String {
1714    iso_timestamp.replace([':', '.'], "-")
1715}
1716
1717/// Converts a filesystem-safe timestamp back to ISO 8601 format.
1718///
1719/// This function reverses the transformation done by `iso_timestamp_to_file_timestamp`,
1720/// converting filesystem-safe timestamps back to standard ISO 8601 format.
1721///
1722/// # Parameters
1723///
1724/// - `file_timestamp`: Filesystem-safe timestamp string (e.g., "2023-10-26T07-30-50-123456789Z").
1725///
1726/// # Returns
1727///
1728/// Returns an ISO 8601 timestamp string (e.g., "2023-10-26T07:30:50.123456789Z").
1729///
1730/// # Examples
1731///
1732/// ```rust
1733/// # use nautilus_persistence::backend::catalog::file_timestamp_to_iso_timestamp;
1734/// let iso_timestamp = file_timestamp_to_iso_timestamp("2023-10-26T07-30-50-123456789Z");
1735/// assert_eq!(iso_timestamp, "2023-10-26T07:30:50.123456789Z");
1736/// ```
1737fn file_timestamp_to_iso_timestamp(file_timestamp: &str) -> String {
1738    let (date_part, time_part) = file_timestamp
1739        .split_once('T')
1740        .unwrap_or((file_timestamp, ""));
1741    let time_part = time_part.strip_suffix('Z').unwrap_or(time_part);
1742
1743    // Find the last hyphen to separate nanoseconds
1744    if let Some(last_hyphen_idx) = time_part.rfind('-') {
1745        let time_with_dot_for_nanos = format!(
1746            "{}.{}",
1747            &time_part[..last_hyphen_idx],
1748            &time_part[last_hyphen_idx + 1..]
1749        );
1750        let final_time_part = time_with_dot_for_nanos.replace('-', ":");
1751        format!("{date_part}T{final_time_part}Z")
1752    } else {
1753        // Fallback if no nanoseconds part found
1754        let final_time_part = time_part.replace('-', ":");
1755        format!("{date_part}T{final_time_part}Z")
1756    }
1757}
1758
1759/// Converts an ISO 8601 timestamp string to Unix nanoseconds.
1760///
1761/// This function parses an ISO 8601 timestamp and converts it to Unix nanoseconds.
1762/// It's used to convert parsed timestamps back to the internal representation.
1763///
1764/// # Parameters
1765///
1766/// - `iso_timestamp`: ISO 8601 timestamp string (e.g., "2023-10-26T07:30:50.123456789Z").
1767///
1768/// # Returns
1769///
1770/// Returns `Ok(u64)` with the Unix nanoseconds timestamp, or an error if parsing fails.
1771///
1772/// # Examples
1773///
1774/// ```rust
1775/// # use nautilus_persistence::backend::catalog::iso_to_unix_nanos;
1776/// let nanos = iso_to_unix_nanos("2021-01-01T00:00:00.000000000Z").unwrap();
1777/// assert_eq!(nanos, 1609459200000000000);
1778/// ```
1779fn iso_to_unix_nanos(iso_timestamp: &str) -> anyhow::Result<u64> {
1780    Ok(iso8601_to_unix_nanos(iso_timestamp.to_string())?.into())
1781}
1782
1783////////////////////////////////////////////////////////////////////////////////
1784// Helper functions for interval operations
1785////////////////////////////////////////////////////////////////////////////////
1786
1787/// Converts an instrument ID to a URI-safe format by removing forward slashes.
1788///
1789/// Some instrument IDs contain forward slashes (e.g., "BTC/USD") which are not
1790/// suitable for use in file paths. This function removes these characters to
1791/// create a safe directory name.
1792///
1793/// # Parameters
1794///
1795/// - `instrument_id`: The original instrument ID string.
1796///
1797/// # Returns
1798///
1799/// A URI-safe version of the instrument ID with forward slashes removed.
1800///
1801/// # Examples
1802///
1803/// ```rust
1804/// # use nautilus_persistence::backend::catalog::urisafe_instrument_id;
1805/// assert_eq!(urisafe_instrument_id("BTC/USD"), "BTCUSD");
1806/// assert_eq!(urisafe_instrument_id("EUR-USD"), "EUR-USD");
1807/// ```
1808fn urisafe_instrument_id(instrument_id: &str) -> String {
1809    instrument_id.replace('/', "")
1810}
1811
1812/// Checks if a filename's timestamp range intersects with a query interval.
1813///
1814/// This function determines whether a Parquet file (identified by its timestamp-based
1815/// filename) contains data that falls within the specified query time range.
1816///
1817/// # Parameters
1818///
1819/// - `filename`: The filename to check (format: "`iso_timestamp_1_iso_timestamp_2.parquet`").
1820/// - `start`: Optional start timestamp for the query range.
1821/// - `end`: Optional end timestamp for the query range.
1822///
1823/// # Returns
1824///
1825/// Returns `true` if the file's time range intersects with the query range,
1826/// `false` otherwise. Returns `true` if the filename cannot be parsed.
1827///
1828/// # Examples
1829///
1830/// ```rust
1831/// # use nautilus_persistence::backend::catalog::query_intersects_filename;
1832/// // Example with ISO format filenames
1833/// assert!(query_intersects_filename(
1834///     "2021-01-01T00-00-00-000000000Z_2021-01-02T00-00-00-000000000Z.parquet",
1835///     Some(1609459200000000000),
1836///     Some(1609545600000000000)
1837/// ));
1838/// ```
1839fn query_intersects_filename(filename: &str, start: Option<u64>, end: Option<u64>) -> bool {
1840    if let Some((file_start, file_end)) = parse_filename_timestamps(filename) {
1841        (start.is_none() || start.unwrap() <= file_end)
1842            && (end.is_none() || file_start <= end.unwrap())
1843    } else {
1844        true
1845    }
1846}
1847
1848/// Parses timestamps from a Parquet filename.
1849///
1850/// Extracts the start and end timestamps from filenames that follow the ISO 8601 format:
1851/// "`iso_timestamp_1_iso_timestamp_2.parquet`" (e.g., "2021-01-01T00-00-00-000000000Z_2021-01-02T00-00-00-000000000Z.parquet")
1852///
1853/// # Parameters
1854///
1855/// - `filename`: The filename to parse (can be a full path).
1856///
1857/// # Returns
1858///
1859/// Returns `Some((start_ts, end_ts))` if the filename matches the expected format,
1860/// `None` otherwise.
1861///
1862/// # Examples
1863///
1864/// ```rust
1865/// # use nautilus_persistence::backend::catalog::parse_filename_timestamps;
1866/// assert!(parse_filename_timestamps("2021-01-01T00-00-00-000000000Z_2021-01-02T00-00-00-000000000Z.parquet").is_some());
1867/// assert_eq!(parse_filename_timestamps("invalid.parquet"), None);
1868/// ```
1869fn parse_filename_timestamps(filename: &str) -> Option<(u64, u64)> {
1870    let path = Path::new(filename);
1871    let base_name = path.file_name()?.to_str()?;
1872    let base_filename = base_name.strip_suffix(".parquet")?;
1873    let (first_part, second_part) = base_filename.split_once('_')?;
1874
1875    let first_iso = file_timestamp_to_iso_timestamp(first_part);
1876    let second_iso = file_timestamp_to_iso_timestamp(second_part);
1877
1878    let first_ts = iso_to_unix_nanos(&first_iso).ok()?;
1879    let second_ts = iso_to_unix_nanos(&second_iso).ok()?;
1880
1881    Some((first_ts, second_ts))
1882}
1883
1884/// Checks whether all closed integer intervals in a list are mutually disjoint.
1885///
1886/// Two intervals are disjoint if they do not overlap. This function validates that
1887/// all intervals in the list are non-overlapping, which is a requirement for
1888/// maintaining data integrity in the catalog.
1889///
1890/// # Parameters
1891///
1892/// - `intervals`: A slice of timestamp intervals as (start, end) tuples.
1893///
1894/// # Returns
1895///
1896/// Returns `true` if all intervals are disjoint, `false` if any overlap is found.
1897/// Returns `true` for empty lists or lists with a single interval.
1898///
1899/// # Examples
1900///
1901/// ```rust
1902/// # use nautilus_persistence::backend::catalog::are_intervals_disjoint;
1903/// // Disjoint intervals
1904/// assert!(are_intervals_disjoint(&[(1, 5), (10, 15), (20, 25)]));
1905///
1906/// // Overlapping intervals
1907/// assert!(!are_intervals_disjoint(&[(1, 10), (5, 15)]));
1908/// ```
1909fn are_intervals_disjoint(intervals: &[(u64, u64)]) -> bool {
1910    let n = intervals.len();
1911
1912    if n <= 1 {
1913        return true;
1914    }
1915
1916    let mut sorted_intervals: Vec<(u64, u64)> = intervals.to_vec();
1917    sorted_intervals.sort_by_key(|&(start, _)| start);
1918
1919    for i in 0..(n - 1) {
1920        let (_, end1) = sorted_intervals[i];
1921        let (start2, _) = sorted_intervals[i + 1];
1922
1923        if end1 >= start2 {
1924            return false;
1925        }
1926    }
1927
1928    true
1929}
1930
1931/// Checks if intervals are contiguous (adjacent with no gaps).
1932///
1933/// Intervals are contiguous if, when sorted by start time, each interval's start
1934/// timestamp is exactly one more than the previous interval's end timestamp.
1935/// This ensures complete coverage of a time range with no gaps.
1936///
1937/// # Parameters
1938///
1939/// - `intervals`: A slice of timestamp intervals as (start, end) tuples.
1940///
1941/// # Returns
1942///
1943/// Returns `true` if all intervals are contiguous, `false` if any gaps are found.
1944/// Returns `true` for empty lists or lists with a single interval.
1945///
1946/// # Examples
1947///
1948/// ```rust
1949/// # use nautilus_persistence::backend::catalog::are_intervals_contiguous;
1950/// // Contiguous intervals
1951/// assert!(are_intervals_contiguous(&[(1, 5), (6, 10), (11, 15)]));
1952///
1953/// // Non-contiguous intervals (gap between 5 and 8)
1954/// assert!(!are_intervals_contiguous(&[(1, 5), (8, 10)]));
1955/// ```
1956fn are_intervals_contiguous(intervals: &[(u64, u64)]) -> bool {
1957    let n = intervals.len();
1958    if n <= 1 {
1959        return true;
1960    }
1961
1962    let mut sorted_intervals: Vec<(u64, u64)> = intervals.to_vec();
1963    sorted_intervals.sort_by_key(|&(start, _)| start);
1964
1965    for i in 0..(n - 1) {
1966        let (_, end1) = sorted_intervals[i];
1967        let (start2, _) = sorted_intervals[i + 1];
1968
1969        if end1 + 1 != start2 {
1970            return false;
1971        }
1972    }
1973
1974    true
1975}
1976
1977/// Finds the parts of a query interval that are not covered by existing data intervals.
1978///
1979/// This function calculates the "gaps" in data coverage by comparing a requested
1980/// time range against the intervals covered by existing data files. It's used to
1981/// determine what data needs to be fetched or backfilled.
1982///
1983/// # Parameters
1984///
1985/// - `start`: Start timestamp of the query interval (inclusive).
1986/// - `end`: End timestamp of the query interval (inclusive).
1987/// - `closed_intervals`: Existing data intervals as (start, end) tuples.
1988///
1989/// # Returns
1990///
1991/// Returns a vector of (start, end) tuples representing the gaps in coverage.
1992/// Returns an empty vector if the query range is invalid or fully covered.
1993///
1994/// # Examples
1995///
1996/// ```rust
1997/// # use nautilus_persistence::backend::catalog::query_interval_diff;
1998/// // Query 1-100, have data for 10-30 and 60-80
1999/// let gaps = query_interval_diff(1, 100, &[(10, 30), (60, 80)]);
2000/// assert_eq!(gaps, vec![(1, 9), (31, 59), (81, 100)]);
2001/// ```
2002fn query_interval_diff(start: u64, end: u64, closed_intervals: &[(u64, u64)]) -> Vec<(u64, u64)> {
2003    if start > end {
2004        return Vec::new();
2005    }
2006
2007    let interval_set = get_interval_set(closed_intervals);
2008    let query_range = (Bound::Included(start), Bound::Included(end));
2009    let query_diff = interval_set.get_interval_difference(&query_range);
2010    let mut result: Vec<(u64, u64)> = Vec::new();
2011
2012    for interval in query_diff {
2013        if let Some(tuple) = interval_to_tuple(interval, start, end) {
2014            result.push(tuple);
2015        }
2016    }
2017
2018    result
2019}
2020
2021/// Creates an interval tree from closed integer intervals.
2022///
2023/// This function converts closed intervals [a, b] into half-open intervals [a, b+1)
2024/// for use with the interval tree data structure, which is used for efficient
2025/// interval operations and gap detection.
2026///
2027/// # Parameters
2028///
2029/// - `intervals`: A slice of closed intervals as (start, end) tuples.
2030///
2031/// # Returns
2032///
2033/// Returns an [`IntervalTree`] containing the converted intervals.
2034///
2035/// # Notes
2036///
2037/// - Invalid intervals (where start > end) are skipped
2038/// - Uses saturating addition to prevent overflow when converting to half-open intervals
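///
/// # Examples
///
/// ```rust,ignore
/// // Closed intervals [1, 5] and [10, 15] are stored as half-open [1, 6) and [10, 16).
/// let tree = get_interval_set(&[(1, 5), (10, 15)]);
/// ```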
2039fn get_interval_set(intervals: &[(u64, u64)]) -> IntervalTree<u64> {
2040    let mut tree = IntervalTree::default();
2041
2042    if intervals.is_empty() {
2043        return tree;
2044    }
2045
2046    for &(start, end) in intervals {
2047        if start > end {
2048            continue;
2049        }
2050
2051        tree.insert((
2052            Bound::Included(start),
2053            Bound::Excluded(end.saturating_add(1)),
2054        ));
2055    }
2056
2057    tree
2058}
2059
2060/// Converts an interval tree result back to a closed interval tuple.
2061///
2062/// This helper function converts the bounded interval representation used by
2063/// the interval tree back into the (start, end) tuple format used throughout
2064/// the catalog.
2065///
2066/// # Parameters
2067///
2068/// - `interval`: The bounded interval from the interval tree.
2069/// - `query_start`: The start of the original query range.
2070/// - `query_end`: The end of the original query range.
2071///
2072/// # Returns
2073///
2074/// Returns `Some((start, end))` for valid intervals, `None` for empty intervals.
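///
/// # Examples
///
/// ```rust,ignore
/// use std::ops::Bound;
///
/// // The half-open interval [10, 21) maps back to the closed interval (10, 20).
/// assert_eq!(interval_to_tuple((Bound::Included(&10), Bound::Excluded(&21)), 1, 100), Some((10, 20)));
/// ```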
2075fn interval_to_tuple(
2076    interval: (Bound<&u64>, Bound<&u64>),
2077    query_start: u64,
2078    query_end: u64,
2079) -> Option<(u64, u64)> {
2080    let (bound_start, bound_end) = interval;
2081
2082    let start = match bound_start {
2083        Bound::Included(val) => *val,
2084        Bound::Excluded(val) => val.saturating_add(1),
2085        Bound::Unbounded => query_start,
2086    };
2087
2088    let end = match bound_end {
2089        Bound::Included(val) => *val,
2090        Bound::Excluded(val) => {
2091            if *val == 0 {
2092                return None; // Empty interval
2093            }
2094            val - 1
2095        }
2096        Bound::Unbounded => query_end,
2097    };
2098
2099    if start <= end {
2100        Some((start, end))
2101    } else {
2102        None
2103    }
2104}
2105
2106////////////////////////////////////////////////////////////////////////////////
2107// Tests
2108////////////////////////////////////////////////////////////////////////////////
2109
2110#[cfg(test)]
2111mod tests {
2112    use nautilus_model::data::HasTsInit;
2113
2114    use super::*;
2115
2116    #[derive(Clone)]
2117    struct DummyData(u64);
2118
2119    impl HasTsInit for DummyData {
2120        fn ts_init(&self) -> UnixNanos {
2121            UnixNanos::from(self.0)
2122        }
2123    }
2124
2125    #[test]
2126    fn test_check_ascending_timestamps_error() {
2127        let data = vec![DummyData(2), DummyData(1)];
2128        let result = super::ParquetDataCatalog::check_ascending_timestamps(&data, "dummy");
2129        assert!(result.is_err());
2130    }
2131
2132    #[test]
2133    fn test_to_object_path_trailing_slash() {
2134        // Create catalog with base path that contains a trailing slash
2135        let tmp = tempfile::tempdir().unwrap();
2136        let base_dir = tmp.path().join("catalog");
2137        std::fs::create_dir_all(&base_dir).unwrap();
2138
2139        let catalog = ParquetDataCatalog::new(base_dir.clone(), None, None, None, None);
2140
2141        // Build a sample path under the catalog base
2142        let sample_path = format!(
2143            "{}/data/quotes/XYZ/2021-01-01T00-00-00-000000000Z_2021-01-01T00-00-01-000000000Z.parquet",
2144            base_dir.to_string_lossy()
2145        );
2146
2147        let object_path = catalog.to_object_path(&sample_path);
2148
2149        assert!(
2150            !object_path
2151                .as_ref()
2152                .starts_with(base_dir.to_string_lossy().as_ref())
2153        );
2154    }
2155
2156    #[test]
2157    fn test_is_remote_uri() {
2158        // Test S3 URIs
2159        let s3_catalog =
2160            ParquetDataCatalog::from_uri("s3://bucket/path", None, None, None, None).unwrap();
2161        assert!(s3_catalog.is_remote_uri());
2162
2163        // Test GCS URIs
2164        let gcs_catalog =
2165            ParquetDataCatalog::from_uri("gs://bucket/path", None, None, None, None).unwrap();
2166        assert!(gcs_catalog.is_remote_uri());
2167
2168        let gcs2_catalog =
2169            ParquetDataCatalog::from_uri("gcs://bucket/path", None, None, None, None).unwrap();
2170        assert!(gcs2_catalog.is_remote_uri());
2171
2172        // Test Azure URIs
2173        let azure_catalog =
2174            ParquetDataCatalog::from_uri("azure://account/container/path", None, None, None, None)
2175                .unwrap();
2176        assert!(azure_catalog.is_remote_uri());
2177
2178        let abfs_catalog = ParquetDataCatalog::from_uri(
2179            "abfs://container@account.dfs.core.windows.net/path",
2180            None,
2181            None,
2182            None,
2183            None,
2184        )
2185        .unwrap();
2186        assert!(abfs_catalog.is_remote_uri());
2187
2188        // Test HTTP URIs
2189        let http_catalog =
2190            ParquetDataCatalog::from_uri("http://example.com/path", None, None, None, None)
2191                .unwrap();
2192        assert!(http_catalog.is_remote_uri());
2193
2194        let https_catalog =
2195            ParquetDataCatalog::from_uri("https://example.com/path", None, None, None, None)
2196                .unwrap();
2197        assert!(https_catalog.is_remote_uri());
2198
2199        // Test local paths (should not be remote)
2200        let tmp = tempfile::tempdir().unwrap();
2201        let local_catalog =
2202            ParquetDataCatalog::new(tmp.path().to_path_buf(), None, None, None, None);
2203        assert!(!local_catalog.is_remote_uri());
2204
2205        let tmp_file = tempfile::tempdir().unwrap();
2206        let file_uri = format!("file://{}", tmp_file.path().display());
2207        let file_catalog = ParquetDataCatalog::from_uri(&file_uri, None, None, None, None).unwrap();
2208        assert!(!file_catalog.is_remote_uri());
2209    }
2210
2211    #[test]
2212    fn test_reconstruct_full_uri() {
2213        // Test S3 URI reconstruction
2214        let s3_catalog =
2215            ParquetDataCatalog::from_uri("s3://bucket/base/path", None, None, None, None).unwrap();
2216        let reconstructed = s3_catalog.reconstruct_full_uri("data/quotes/file.parquet");
2217        assert_eq!(reconstructed, "s3://bucket/data/quotes/file.parquet");
2218
2219        // Test GCS URI reconstruction
2220        let gcs_catalog =
2221            ParquetDataCatalog::from_uri("gs://bucket/base/path", None, None, None, None).unwrap();
2222        let reconstructed = gcs_catalog.reconstruct_full_uri("data/trades/file.parquet");
2223        assert_eq!(reconstructed, "gs://bucket/data/trades/file.parquet");
2224
2225        // Test Azure URI reconstruction
2226        let azure_catalog =
2227            ParquetDataCatalog::from_uri("azure://account/container/path", None, None, None, None)
2228                .unwrap();
2229        let reconstructed = azure_catalog.reconstruct_full_uri("data/bars/file.parquet");
2230        assert_eq!(reconstructed, "azure://account/data/bars/file.parquet");
2231
2232        // Test HTTP URI reconstruction
2233        let http_catalog =
2234            ParquetDataCatalog::from_uri("https://example.com/base/path", None, None, None, None)
2235                .unwrap();
2236        let reconstructed = http_catalog.reconstruct_full_uri("data/quotes/file.parquet");
2237        assert_eq!(
2238            reconstructed,
2239            "https://example.com/data/quotes/file.parquet"
2240        );
2241
2242        // Test local path (should return path as-is)
2243        let tmp = tempfile::tempdir().unwrap();
2244        let local_catalog =
2245            ParquetDataCatalog::new(tmp.path().to_path_buf(), None, None, None, None);
2246        let reconstructed = local_catalog.reconstruct_full_uri("data/quotes/file.parquet");
2247        assert_eq!(reconstructed, "data/quotes/file.parquet");
2248    }
2249}