nautilus_serialization/
parquet.rs

// -------------------------------------------------------------------------------------------------
//  Copyright (C) 2015-2025 Posei Systems Pty Ltd. All rights reserved.
//  https://poseitrader.io
//
//  Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
//  You may not use this file except in compliance with the License.
//  You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
//
//  Unless required by applicable law or agreed to in writing, software
//  distributed under the License is distributed on an "AS IS" BASIS,
//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
//  See the License for the specific language governing permissions and
//  limitations under the License.
// -------------------------------------------------------------------------------------------------

use std::{fs, fs::File, path::PathBuf};

use arrow::record_batch::RecordBatch;
use parquet::{
    arrow::{ArrowWriter, arrow_reader::ParquetRecordBatchReaderBuilder},
    file::{
        properties::WriterProperties,
        reader::{FileReader, SerializedFileReader},
        statistics::Statistics,
    },
};

use crate::enums::ParquetWriteMode;

/// Writes a `RecordBatch` to a Parquet file at the specified `filepath`, with optional compression.
///
/// # Errors
///
/// Returns an error if writing to Parquet fails or any I/O operation fails.
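///
/// # Examples
///
/// A minimal usage sketch; the schema, column name, and file path below are illustrative,
/// not part of this crate's API:
///
/// ```ignore
/// use std::{path::PathBuf, sync::Arc};
///
/// use arrow::{
///     array::Int64Array,
///     datatypes::{DataType, Field, Schema},
///     record_batch::RecordBatch,
/// };
///
/// let schema = Arc::new(Schema::new(vec![Field::new("ts_init", DataType::Int64, false)]));
/// let batch =
///     RecordBatch::try_new(schema, vec![Arc::new(Int64Array::from(vec![1_i64, 2, 3]))]).unwrap();
///
/// // `None` arguments fall back to SNAPPY compression, 5000-row row groups, and overwrite mode.
/// write_batch_to_parquet(batch, &PathBuf::from("data.parquet"), None, None, None).unwrap();
/// ```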
pub fn write_batch_to_parquet(
    batch: RecordBatch,
    filepath: &PathBuf,
    compression: Option<parquet::basic::Compression>,
    max_row_group_size: Option<usize>,
    write_mode: Option<ParquetWriteMode>,
) -> anyhow::Result<()> {
    write_batches_to_parquet(
        &[batch],
        filepath,
        compression,
        max_row_group_size,
        write_mode,
    )
}

/// Writes multiple `RecordBatch` items to a Parquet file at the specified `filepath`, with
/// optional compression and row group sizing. The `write_mode` controls whether an existing
/// file is overwritten, appended to, or prepended to (defaulting to overwrite).
///
/// # Errors
///
/// Returns an error if writing to Parquet fails or any I/O operation fails.
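///
/// # Examples
///
/// A sketch of appending to an existing file, assuming this module is reachable as
/// `nautilus_serialization` and that `batches` was built elsewhere with a matching schema:
///
/// ```ignore
/// use std::path::PathBuf;
///
/// use nautilus_serialization::enums::ParquetWriteMode;
///
/// write_batches_to_parquet(
///     &batches,
///     &PathBuf::from("data.parquet"),
///     None,                           // default SNAPPY compression
///     Some(10_000),                   // larger row groups
///     Some(ParquetWriteMode::Append), // keep existing rows, add new ones after them
/// )
/// .unwrap();
/// ```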
pub fn write_batches_to_parquet(
    batches: &[RecordBatch],
    filepath: &PathBuf,
    compression: Option<parquet::basic::Compression>,
    max_row_group_size: Option<usize>,
    write_mode: Option<ParquetWriteMode>,
) -> anyhow::Result<()> {
    let used_write_mode = write_mode.unwrap_or(ParquetWriteMode::Overwrite);

    // Ensure the parent directory exists
    if let Some(parent) = filepath.parent() {
        fs::create_dir_all(parent)?;
    }

    if (used_write_mode == ParquetWriteMode::Append || used_write_mode == ParquetWriteMode::Prepend)
        && filepath.exists()
    {
        // Read the existing parquet file
        let file = File::open(filepath)?;
        let reader = ParquetRecordBatchReaderBuilder::try_new(file)?;
        let existing_batches: Vec<RecordBatch> = reader.build()?.collect::<Result<Vec<_>, _>>()?;

        if !existing_batches.is_empty() {
            let mut combined = Vec::with_capacity(existing_batches.len() + batches.len());

            // Combine the existing and new batches in the order implied by the write mode
            if used_write_mode == ParquetWriteMode::Append {
                combined.extend(existing_batches);
                combined.extend_from_slice(batches);
            } else {
                // Prepend mode: new batches first, then the existing ones
                combined.extend_from_slice(batches);
                combined.extend(existing_batches);
            }

            return write_batches_to_file(&combined, filepath, compression, max_row_group_size);
        }
    }

    // Default case: create a new file or overwrite the existing one
    write_batches_to_file(batches, filepath, compression, max_row_group_size)
}

/// Combines multiple Parquet files into a single file, after verifying from each file's
/// metadata that the min/max ranges of `column_name` do not overlap between files.
///
/// Files are merged in ascending order of their minimum `column_name` value; the merged
/// result is written to the first file in that order and the remaining files are removed.
///
/// # Errors
///
/// Returns an error if file reading fails, metadata extraction fails, or the value ranges
/// of `column_name` intersect across files (merging would not be safe).
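///
/// # Examples
///
/// A sketch of merging daily files on their `ts_init` column (the paths and column name
/// are illustrative):
///
/// ```ignore
/// use std::path::PathBuf;
///
/// let files = vec![
///     PathBuf::from("trades-2024-01-02.parquet"),
///     PathBuf::from("trades-2024-01-01.parquet"),
/// ];
///
/// // Files are reordered by their minimum `ts_init`; the merged output replaces the
/// // earliest file and the others are deleted.
/// combine_data_files(files, "ts_init", None, None).unwrap();
/// ```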
pub fn combine_data_files(
    parquet_files: Vec<PathBuf>,
    column_name: &str,
    compression: Option<parquet::basic::Compression>,
    max_row_group_size: Option<usize>,
) -> anyhow::Result<()> {
    let n_files = parquet_files.len();

    if n_files <= 1 {
        return Ok(());
    }

    // Get min/max of `column_name` for each file
    let min_max_per_file = parquet_files
        .iter()
        .map(|file| min_max_from_parquet_metadata(file, column_name))
        .collect::<Result<Vec<_>, _>>()?;

    // Order the files by their minimum value
    let mut ordering: Vec<usize> = (0..n_files).collect();
    ordering.sort_by_key(|&i| min_max_per_file[i].0);

    // Abort if any consecutive value ranges intersect
    for i in 1..n_files {
        if min_max_per_file[ordering[i - 1]].1 >= min_max_per_file[ordering[i]].0 {
            anyhow::bail!(
                "Merging not safe: value ranges of column '{column_name}' intersect between files. Aborting."
            );
        }
    }

    // Create the sorted list of files
    let sorted_parquet_files = ordering
        .into_iter()
        .map(|i| parquet_files[i].clone())
        .collect();

    // Combine the files
    combine_parquet_files(sorted_parquet_files, compression, max_row_group_size)
}

/// Merges multiple Parquet files into a single file by concatenating their record batches.
///
/// The merged result is written to the first file in `file_list`; the remaining files are
/// deleted once the merge succeeds.
///
/// # Errors
///
/// Returns an error if file I/O or Parquet reading/writing fails.
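///
/// # Examples
///
/// A sketch (paths are illustrative); unlike [`combine_data_files`], no ordering or range
/// checks are performed here:
///
/// ```ignore
/// use std::path::PathBuf;
///
/// let files = vec![PathBuf::from("part-0.parquet"), PathBuf::from("part-1.parquet")];
///
/// // After this call, "part-0.parquet" holds all batches and "part-1.parquet" is removed.
/// combine_parquet_files(files, None, None).unwrap();
/// ```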
pub fn combine_parquet_files(
    file_list: Vec<PathBuf>,
    compression: Option<parquet::basic::Compression>,
    max_row_group_size: Option<usize>,
) -> anyhow::Result<()> {
    if file_list.len() <= 1 {
        return Ok(());
    }

    // Build a record batch reader for each file up front
    let mut readers = Vec::new();
    for file in &file_list {
        let file = File::open(file)?;
        let builder = ParquetRecordBatchReaderBuilder::try_new(file)?;
        readers.push(builder.build()?);
    }

    // Collect all batches into a single vector
    let mut all_batches: Vec<RecordBatch> = Vec::new();
    for reader in &mut readers {
        for batch in reader.by_ref() {
            all_batches.push(batch?);
        }
    }

    // Write the combined batches over the first file in the list
    write_batches_to_file(&all_batches, &file_list[0], compression, max_row_group_size)?;

    // Remove the now-merged remaining files
    for file_path in file_list.iter().skip(1) {
        fs::remove_file(file_path)?;
    }

    Ok(())
}

/// Writes `batches` to a new Parquet file at `filepath`, creating or truncating the file.
///
/// The schema is taken from the first batch; all batches are assumed to share it.
fn write_batches_to_file(
    batches: &[RecordBatch],
    filepath: &PathBuf,
    compression: Option<parquet::basic::Compression>,
    max_row_group_size: Option<usize>,
) -> anyhow::Result<()> {
    // Writing requires at least one batch to provide the schema
    let Some(first_batch) = batches.first() else {
        anyhow::bail!("Cannot write an empty set of batches to '{}'", filepath.display());
    };

    let file = File::create(filepath)?;
    let writer_props = WriterProperties::builder()
        .set_compression(compression.unwrap_or(parquet::basic::Compression::SNAPPY))
        .set_max_row_group_size(max_row_group_size.unwrap_or(5000))
        .build();

    let mut writer = ArrowWriter::try_new(file, first_batch.schema(), Some(writer_props))?;
    for batch in batches {
        writer.write(batch)?;
    }
    writer.close()?;

    Ok(())
}

/// Extracts the minimum and maximum `i64` values of `column_name` from a Parquet file's
/// row group statistics, without reading the data pages.
///
/// # Errors
///
/// Returns an error if the file cannot be read, the column is missing or is not of type
/// `Int64`, or no statistics are available for it.
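///
/// # Examples
///
/// A sketch of reading the `ts_init` range from a file (the path and column name are
/// illustrative):
///
/// ```ignore
/// use std::path::PathBuf;
///
/// let (min_ts, max_ts) =
///     min_max_from_parquet_metadata(&PathBuf::from("data.parquet"), "ts_init").unwrap();
/// assert!(min_ts <= max_ts);
/// ```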
pub fn min_max_from_parquet_metadata(
    file_path: &PathBuf,
    column_name: &str,
) -> anyhow::Result<(i64, i64)> {
    // Open the parquet file and read only its metadata
    let file = File::open(file_path)?;
    let reader = SerializedFileReader::new(file)?;

    let metadata = reader.metadata();
    let mut overall_min_value: Option<i64> = None;
    let mut overall_max_value: Option<i64> = None;

    // Iterate through all row groups
    for i in 0..metadata.num_row_groups() {
        let row_group = metadata.row_group(i);

        // Find the matching column in this row group
        for j in 0..row_group.num_columns() {
            let col_metadata = row_group.column(j);

            if col_metadata.column_path().string() == column_name {
                let Some(stats) = col_metadata.statistics() else {
                    anyhow::bail!(
                        "Statistics not available for column '{column_name}' in row group {i}"
                    );
                };

                // Only Int64 statistics are supported
                let Statistics::Int64(int64_stats) = stats else {
                    anyhow::bail!("Column '{column_name}' is not of type i64");
                };

                // Fold this row group's min into the overall minimum, if available
                if let Some(&min_value) = int64_stats.min_opt() {
                    if overall_min_value.is_none_or(|min| min_value < min) {
                        overall_min_value = Some(min_value);
                    }
                }

                // Fold this row group's max into the overall maximum, if available
                if let Some(&max_value) = int64_stats.max_opt() {
                    if overall_max_value.is_none_or(|max| max_value > max) {
                        overall_max_value = Some(max_value);
                    }
                }
            }
        }
    }

    // Return the min/max pair if both are available
    if let (Some(min), Some(max)) = (overall_min_value, overall_max_value) {
        Ok((min, max))
    } else {
        anyhow::bail!(
            "Column '{column_name}' not found or has no Int64 statistics in any row group"
        )
    }
}