// nautilus_serialization/parquet.rs

use std::{
    fs,
    fs::File,
    path::{Path, PathBuf},
};
use arrow::record_batch::RecordBatch;
use parquet::{
    arrow::{ArrowWriter, arrow_reader::ParquetRecordBatchReaderBuilder},
    file::{
        properties::WriterProperties,
        reader::{FileReader, SerializedFileReader},
        statistics::Statistics,
    },
};

use crate::enums::ParquetWriteMode;
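
/// Writes a single `RecordBatch` to a Parquet file at `filepath`.
///
/// Convenience wrapper around [`write_batches_to_parquet`]; see that function
/// for the semantics of `compression`, `max_row_group_size`, and `write_mode`.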
pub fn write_batch_to_parquet(
    batch: RecordBatch,
    filepath: &Path,
    compression: Option<parquet::basic::Compression>,
    max_row_group_size: Option<usize>,
    write_mode: Option<ParquetWriteMode>,
) -> anyhow::Result<()> {
    write_batches_to_parquet(
        &[batch],
        filepath,
        compression,
        max_row_group_size,
        write_mode,
    )
}
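
/// Writes a slice of `RecordBatch`es to a Parquet file at `filepath`,
/// creating any missing parent directories.
///
/// Defaults to `ParquetWriteMode::Overwrite`, SNAPPY compression, and a
/// maximum row-group size of 5000 rows. With `Append` or `Prepend`, the
/// batches already in the target file are read back, combined with the new
/// ones in the requested order, and the file is rewritten in full.
///
/// A minimal usage sketch; the schema, column name, and import path below are
/// illustrative assumptions, not part of this module:
///
/// ```no_run
/// use std::{path::Path, sync::Arc};
///
/// use arrow::{
///     array::{ArrayRef, Int64Array},
///     datatypes::{DataType, Field, Schema},
///     record_batch::RecordBatch,
/// };
/// // Assumed import path for this function
/// use nautilus_serialization::parquet::write_batches_to_parquet;
///
/// // Hypothetical single-column Int64 schema
/// let schema = Arc::new(Schema::new(vec![Field::new("ts_init", DataType::Int64, false)]));
/// let array: ArrayRef = Arc::new(Int64Array::from(vec![1_i64, 2, 3]));
/// let batch = RecordBatch::try_new(schema, vec![array]).unwrap();
///
/// // `None` selects the default compression, row-group size, and write mode
/// write_batches_to_parquet(&[batch], Path::new("data.parquet"), None, None, None).unwrap();
/// ```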
pub fn write_batches_to_parquet(
    batches: &[RecordBatch],
    filepath: &Path,
    compression: Option<parquet::basic::Compression>,
    max_row_group_size: Option<usize>,
    write_mode: Option<ParquetWriteMode>,
) -> anyhow::Result<()> {
    let used_write_mode = write_mode.unwrap_or(ParquetWriteMode::Overwrite);

    // Ensure the parent directory exists
    if let Some(parent) = filepath.parent() {
        fs::create_dir_all(parent)?;
    }

    // For append or prepend, read back the existing batches and combine them
    // with the new ones, then rewrite the file in full
    if (used_write_mode == ParquetWriteMode::Append || used_write_mode == ParquetWriteMode::Prepend)
        && filepath.exists()
    {
        let file = File::open(filepath)?;
        let reader = ParquetRecordBatchReaderBuilder::try_new(file)?;
        let existing_batches: Vec<RecordBatch> = reader.build()?.collect::<Result<Vec<_>, _>>()?;

        if !existing_batches.is_empty() {
            let mut combined = Vec::with_capacity(existing_batches.len() + batches.len());

            if used_write_mode == ParquetWriteMode::Append {
                combined.extend(existing_batches);
                combined.extend_from_slice(batches);
            } else {
                combined.extend_from_slice(batches);
                combined.extend(existing_batches);
            }

            return write_batches_to_file(&combined, filepath, compression, max_row_group_size);
        }
    }

    write_batches_to_file(batches, filepath, compression, max_row_group_size)
}
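
/// Merges multiple Parquet files into one, ordered by the min/max statistics
/// of `column_name` (which must be an Int64 column, e.g. a nanosecond
/// timestamp).
///
/// Fails if the value ranges of any two files overlap, since merging would
/// then break ordering. The merged output is written into the file with the
/// smallest minimum value; the remaining files are deleted.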
pub fn combine_data_files(
    parquet_files: Vec<PathBuf>,
    column_name: &str,
    compression: Option<parquet::basic::Compression>,
    max_row_group_size: Option<usize>,
) -> anyhow::Result<()> {
    let n_files = parquet_files.len();

    if n_files <= 1 {
        return Ok(());
    }

    // Read the (min, max) statistics of `column_name` from each file's metadata
    let min_max_per_file = parquet_files
        .iter()
        .map(|file| min_max_from_parquet_metadata(file, column_name))
        .collect::<Result<Vec<_>, _>>()?;

    // Order the files by their minimum value
    let mut ordering: Vec<usize> = (0..n_files).collect();
    ordering.sort_by_key(|&i| min_max_per_file[i].0);

    // Verify that the value ranges of consecutive files do not intersect
    for i in 1..n_files {
        if min_max_per_file[ordering[i - 1]].1 >= min_max_per_file[ordering[i]].0 {
            anyhow::bail!(
                "Merging not safe due to intersection of timestamps between files. Aborting."
            );
        }
    }

    let sorted_parquet_files = ordering
        .into_iter()
        .map(|i| parquet_files[i].clone())
        .collect();

    combine_parquet_files(sorted_parquet_files, compression, max_row_group_size)
}
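
/// Reads every record batch from each file in `file_list`, writes them all in
/// order into the first file, then deletes the remaining files.
///
/// All files must share the same Arrow schema, since the merged batches are
/// written with a single writer.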
pub fn combine_parquet_files(
    file_list: Vec<PathBuf>,
    compression: Option<parquet::basic::Compression>,
    max_row_group_size: Option<usize>,
) -> anyhow::Result<()> {
    if file_list.len() <= 1 {
        return Ok(());
    }

    // Open a reader for each file
    let mut readers = Vec::new();
    for file in &file_list {
        let file = File::open(file)?;
        let builder = ParquetRecordBatchReaderBuilder::try_new(file)?;
        readers.push(builder.build()?);
    }

    // Collect all batches from all readers, in file order
    let mut all_batches: Vec<RecordBatch> = Vec::new();
    for reader in &mut readers {
        for batch in reader.by_ref() {
            all_batches.push(batch?);
        }
    }

    // Write the combined batches back into the first file
    write_batches_to_file(&all_batches, &file_list[0], compression, max_row_group_size)?;

    // Remove the other files now that their contents have been merged
    for file_path in file_list.iter().skip(1) {
        fs::remove_file(file_path)?;
    }

    Ok(())
}
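
/// Writes `batches` to `filepath`, creating or truncating the file.
///
/// Applies SNAPPY compression and a maximum row-group size of 5000 rows
/// unless overridden; the Parquet schema is taken from the first batch.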
fn write_batches_to_file(
    batches: &[RecordBatch],
    filepath: &Path,
    compression: Option<parquet::basic::Compression>,
    max_row_group_size: Option<usize>,
) -> anyhow::Result<()> {
    // The writer schema comes from the first batch, so the slice must be non-empty
    anyhow::ensure!(!batches.is_empty(), "`batches` must not be empty");

    let file = File::create(filepath)?;
    let writer_props = WriterProperties::builder()
        .set_compression(compression.unwrap_or(parquet::basic::Compression::SNAPPY))
        .set_max_row_group_size(max_row_group_size.unwrap_or(5000))
        .build();

    let mut writer = ArrowWriter::try_new(file, batches[0].schema(), Some(writer_props))?;
    for batch in batches {
        writer.write(batch)?;
    }
    writer.close()?;

    Ok(())
}
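
/// Scans the Parquet metadata of `file_path` and returns the overall
/// (min, max) values of `column_name` across all row groups.
///
/// Errors if the column is missing, is not of type Int64, or lacks statistics
/// in a row group that contains it.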
pub fn min_max_from_parquet_metadata(
    file_path: &Path,
    column_name: &str,
) -> anyhow::Result<(i64, i64)> {
    let file = File::open(file_path)?;
    let reader = SerializedFileReader::new(file)?;

    let metadata = reader.metadata();
    let mut overall_min_value: Option<i64> = None;
    let mut overall_max_value: Option<i64> = None;

    // Inspect the column statistics of every row group
    for i in 0..metadata.num_row_groups() {
        let row_group = metadata.row_group(i);

        for j in 0..row_group.num_columns() {
            let col_metadata = row_group.column(j);

            if col_metadata.column_path().string() == column_name {
                if let Some(stats) = col_metadata.statistics() {
                    if let Statistics::Int64(int64_stats) = stats {
                        if let Some(&min_value) = int64_stats.min_opt() {
                            if overall_min_value.is_none()
                                || min_value < overall_min_value.unwrap()
                            {
                                overall_min_value = Some(min_value);
                            }
                        }

                        if let Some(&max_value) = int64_stats.max_opt() {
                            if overall_max_value.is_none()
                                || max_value > overall_max_value.unwrap()
                            {
                                overall_max_value = Some(max_value);
                            }
                        }
                    } else {
                        anyhow::bail!("Column '{column_name}' is not of type Int64");
                    }
                } else {
                    anyhow::bail!(
                        "Statistics not available for column '{column_name}' in row group {i}"
                    );
                }
            }
        }
    }

    if let (Some(min), Some(max)) = (overall_min_value, overall_max_value) {
        Ok((min, max))
    } else {
        anyhow::bail!(
            "Column '{column_name}' not found or has no Int64 statistics in any row group"
        )
    }
}