use std::sync::Arc;

use arrow::record_batch::RecordBatch;
use object_store::{ObjectStore, path::Path as ObjectPath};
use parquet::{
    arrow::{ArrowWriter, arrow_reader::ParquetRecordBatchReaderBuilder},
    file::{
        properties::WriterProperties,
        reader::{FileReader, SerializedFileReader},
        statistics::Statistics,
    },
};

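/// Writes a single Arrow record batch to a Parquet file at `path`.
///
/// Convenience wrapper around [`write_batches_to_parquet`] for the
/// single-batch case.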
pub async fn write_batch_to_parquet(
    batch: RecordBatch,
    path: &str,
    storage_options: Option<std::collections::HashMap<String, String>>,
    compression: Option<parquet::basic::Compression>,
    max_row_group_size: Option<usize>,
) -> anyhow::Result<()> {
    write_batches_to_parquet(
        &[batch],
        path,
        storage_options,
        compression,
        max_row_group_size,
    )
    .await
}

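/// Writes the given record batches to a single Parquet file at `path`.
///
/// The path may be a local filesystem path or a URI for any backend
/// supported by [`create_object_store_from_path`].
///
/// # Example
///
/// An illustrative sketch (the bucket name and batch construction are
/// placeholders):
///
/// ```ignore
/// let batch: RecordBatch = /* build or receive a batch */;
/// write_batches_to_parquet(&[batch], "s3://my-bucket/data.parquet", None, None, None).await?;
/// ```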
pub async fn write_batches_to_parquet(
    batches: &[RecordBatch],
    path: &str,
    storage_options: Option<std::collections::HashMap<String, String>>,
    compression: Option<parquet::basic::Compression>,
    max_row_group_size: Option<usize>,
) -> anyhow::Result<()> {
    let (object_store, base_path, _) = create_object_store_from_path(path, storage_options)?;
    let object_path = if base_path.is_empty() {
        ObjectPath::from(path)
    } else {
        ObjectPath::from(format!("{base_path}/{path}"))
    };

    write_batches_to_object_store(
        batches,
        object_store,
        &object_path,
        compression,
        max_row_group_size,
    )
    .await
}

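/// Encodes the given record batches into a single in-memory Parquet buffer
/// and uploads it to `path` in the provided object store.
///
/// Defaults to SNAPPY compression and a maximum row group size of 5000 rows.
/// The writer uses the schema of the first batch, so all batches must share
/// the same schema.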
pub async fn write_batches_to_object_store(
    batches: &[RecordBatch],
    object_store: Arc<dyn ObjectStore>,
    path: &ObjectPath,
    compression: Option<parquet::basic::Compression>,
    max_row_group_size: Option<usize>,
) -> anyhow::Result<()> {
    // Guard against indexing into an empty slice below
    anyhow::ensure!(!batches.is_empty(), "`batches` must not be empty");

    let mut buffer = Vec::new();

    let writer_props = WriterProperties::builder()
        .set_compression(compression.unwrap_or(parquet::basic::Compression::SNAPPY))
        .set_max_row_group_size(max_row_group_size.unwrap_or(5000))
        .build();

    // All batches are written with the schema of the first batch
    let mut writer = ArrowWriter::try_new(&mut buffer, batches[0].schema(), Some(writer_props))?;
    for batch in batches {
        writer.write(batch)?;
    }
    writer.close()?;

    object_store.put(path, buffer.into()).await?;

    Ok(())
}

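/// Combines multiple Parquet files into a single file at `new_file_path`,
/// deleting the source files on success.
///
/// Returns `Ok(())` without doing any work when fewer than two paths are
/// provided.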
pub async fn combine_parquet_files(
    file_paths: Vec<&str>,
    new_file_path: &str,
    storage_options: Option<std::collections::HashMap<String, String>>,
    compression: Option<parquet::basic::Compression>,
    max_row_group_size: Option<usize>,
) -> anyhow::Result<()> {
    if file_paths.len() <= 1 {
        return Ok(());
    }

    let (object_store, base_path, _) =
        create_object_store_from_path(file_paths[0], storage_options)?;

    let object_paths: Vec<ObjectPath> = file_paths
        .iter()
        .map(|path| {
            if base_path.is_empty() {
                ObjectPath::from(*path)
            } else {
                ObjectPath::from(format!("{base_path}/{path}"))
            }
        })
        .collect();

    let new_object_path = if base_path.is_empty() {
        ObjectPath::from(new_file_path)
    } else {
        ObjectPath::from(format!("{base_path}/{new_file_path}"))
    };

    combine_parquet_files_from_object_store(
        object_store,
        object_paths,
        &new_object_path,
        compression,
        max_row_group_size,
    )
    .await
}

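/// Reads every record batch from `file_paths` in the given object store,
/// writes them to a single Parquet file at `new_file_path`, then deletes the
/// source files.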
pub async fn combine_parquet_files_from_object_store(
    object_store: Arc<dyn ObjectStore>,
    file_paths: Vec<ObjectPath>,
    new_file_path: &ObjectPath,
    compression: Option<parquet::basic::Compression>,
    max_row_group_size: Option<usize>,
) -> anyhow::Result<()> {
    if file_paths.len() <= 1 {
        return Ok(());
    }

    // All batches are buffered in memory before the combined file is written
    let mut all_batches: Vec<RecordBatch> = Vec::new();

    for path in &file_paths {
        let data = object_store.get(path).await?.bytes().await?;
        let builder = ParquetRecordBatchReaderBuilder::try_new(data)?;
        let reader = builder.build()?;

        for batch in reader {
            all_batches.push(batch?);
        }
    }

    write_batches_to_object_store(
        &all_batches,
        object_store.clone(),
        new_file_path,
        compression,
        max_row_group_size,
    )
    .await?;

    // Only delete the source files once the combined file has been written
    for path in &file_paths {
        object_store.delete(path).await?;
    }

    Ok(())
}

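/// Returns the overall minimum and maximum values of an `Int64` column
/// across all row groups of the Parquet file at `file_path`, read from the
/// file's metadata statistics.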
pub async fn min_max_from_parquet_metadata(
    file_path: &str,
    storage_options: Option<std::collections::HashMap<String, String>>,
    column_name: &str,
) -> anyhow::Result<(u64, u64)> {
    let (object_store, base_path, _) = create_object_store_from_path(file_path, storage_options)?;
    let object_path = if base_path.is_empty() {
        ObjectPath::from(file_path)
    } else {
        ObjectPath::from(format!("{base_path}/{file_path}"))
    };

    min_max_from_parquet_metadata_object_store(object_store, &object_path, column_name).await
}

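/// Returns the overall minimum and maximum values of an `Int64` column
/// across all row groups of a Parquet file in an object store, read from
/// the file's metadata statistics.
///
/// # Errors
///
/// Fails if the column is not of type `Int64`, if a matching row group lacks
/// statistics, or if the column is not found in any row group.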
pub async fn min_max_from_parquet_metadata_object_store(
    object_store: Arc<dyn ObjectStore>,
    file_path: &ObjectPath,
    column_name: &str,
) -> anyhow::Result<(u64, u64)> {
    let data = object_store.get(file_path).await?.bytes().await?;
    let reader = SerializedFileReader::new(data)?;

    let metadata = reader.metadata();
    let mut overall_min_value: Option<i64> = None;
    let mut overall_max_value: Option<i64> = None;

    for i in 0..metadata.num_row_groups() {
        let row_group = metadata.row_group(i);

        for j in 0..row_group.num_columns() {
            let col_metadata = row_group.column(j);

            if col_metadata.column_path().string() == column_name {
                let Some(stats) = col_metadata.statistics() else {
                    anyhow::bail!(
                        "Statistics not available for column '{column_name}' in row group {i}"
                    );
                };

                let Statistics::Int64(int64_stats) = stats else {
                    anyhow::bail!("Column '{column_name}' is not of type i64");
                };

                if let Some(&min_value) = int64_stats.min_opt() {
                    overall_min_value =
                        Some(overall_min_value.map_or(min_value, |min| min.min(min_value)));
                }

                if let Some(&max_value) = int64_stats.max_opt() {
                    overall_max_value =
                        Some(overall_max_value.map_or(max_value, |max| max.max(max_value)));
                }
            }
        }
    }

    if let (Some(min), Some(max)) = (overall_min_value, overall_max_value) {
        // The cast assumes the column holds non-negative values (e.g., nanosecond timestamps)
        Ok((min as u64, max as u64))
    } else {
        anyhow::bail!(
            "Column '{column_name}' not found or has no Int64 statistics in any row group"
        )
    }
}

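/// Creates an object store from a path or URI, returning the store, the base
/// path within it, and the normalized URI.
///
/// Supported schemes: `s3://`, `gs://`/`gcs://`, `azure://`, `abfs://`,
/// `http://`/`https://`, and `file://`; anything else is treated as a local
/// filesystem path. `storage_options` carries backend-specific settings such
/// as credentials, endpoints, and regions.
///
/// # Example
///
/// An illustrative sketch (bucket name and region are placeholders):
///
/// ```ignore
/// let mut options = HashMap::new();
/// options.insert("region".to_string(), "us-east-1".to_string());
/// let (store, base_path, uri) =
///     create_object_store_from_path("s3://my-bucket/data", Some(options))?;
/// ```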
pub fn create_object_store_from_path(
    path: &str,
    storage_options: Option<std::collections::HashMap<String, String>>,
) -> anyhow::Result<(Arc<dyn ObjectStore>, String, String)> {
    let uri = normalize_path_to_uri(path);

    match uri.as_str() {
        s if s.starts_with("s3://") => create_s3_store(&uri, storage_options),
        s if s.starts_with("gs://") || s.starts_with("gcs://") => {
            create_gcs_store(&uri, storage_options)
        }
        s if s.starts_with("azure://") => create_azure_store(&uri, storage_options),
        s if s.starts_with("abfs://") => create_abfs_store(&uri, storage_options),
        s if s.starts_with("http://") || s.starts_with("https://") => {
            create_http_store(&uri, storage_options)
        }
        s if s.starts_with("file://") => create_local_store(&uri, true),
        _ => create_local_store(&uri, false),
    }
}

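/// Normalizes a path to URI form, prefixing bare local paths with `file://`.
///
/// Strings that already contain a scheme (`://`) are returned unchanged;
/// relative paths are resolved against the current working directory.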
#[must_use]
pub fn normalize_path_to_uri(path: &str) -> String {
    if path.contains("://") {
        path.to_string()
    } else if path.starts_with('/') {
        format!("file://{path}")
    } else {
        // Resolve relative paths against the current working directory
        format!(
            "file://{}",
            std::env::current_dir()
                .expect("failed to read current working directory")
                .join(path)
                .display()
        )
    }
}

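/// Creates a local filesystem store rooted at the given path or `file://`
/// URI.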
fn create_local_store(
    uri: &str,
    is_file_uri: bool,
) -> anyhow::Result<(Arc<dyn ObjectStore>, String, String)> {
    let path = if is_file_uri {
        uri.strip_prefix("file://").unwrap_or(uri)
    } else {
        uri
    };

    let local_store = object_store::local::LocalFileSystem::new_with_prefix(path)?;
    Ok((Arc::new(local_store), String::new(), uri.to_string()))
}

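/// Creates an Amazon S3 store from an `s3://bucket/path` URI.
///
/// Recognized `storage_options` keys: `endpoint_url`, `region`,
/// `access_key_id`/`key`, `secret_access_key`/`secret`,
/// `session_token`/`token`, and `allow_http`.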
fn create_s3_store(
    uri: &str,
    storage_options: Option<std::collections::HashMap<String, String>>,
) -> anyhow::Result<(Arc<dyn ObjectStore>, String, String)> {
    let (url, path) = parse_url_and_path(uri)?;
    let bucket = extract_host(&url, "Invalid S3 URI: missing bucket")?;

    let mut builder = object_store::aws::AmazonS3Builder::new().with_bucket_name(&bucket);

    if let Some(options) = storage_options {
        for (key, value) in options {
            match key.as_str() {
                "endpoint_url" => {
                    builder = builder.with_endpoint(&value);
                }
                "region" => {
                    builder = builder.with_region(&value);
                }
                "access_key_id" | "key" => {
                    builder = builder.with_access_key_id(&value);
                }
                "secret_access_key" | "secret" => {
                    builder = builder.with_secret_access_key(&value);
                }
                "session_token" | "token" => {
                    builder = builder.with_token(&value);
                }
                "allow_http" => {
                    let allow_http = value.eq_ignore_ascii_case("true");
                    builder = builder.with_allow_http(allow_http);
                }
                _ => {
                    log::warn!("Unknown S3 storage option: {key}");
                }
            }
        }
    }

    let s3_store = builder.build()?;
    Ok((Arc::new(s3_store), path, uri.to_string()))
}

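/// Creates a Google Cloud Storage store from a `gs://` or `gcs://` URI.
///
/// Recognized `storage_options` keys: `service_account_path`,
/// `service_account_key`, and `application_credentials`.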
fn create_gcs_store(
    uri: &str,
    storage_options: Option<std::collections::HashMap<String, String>>,
) -> anyhow::Result<(Arc<dyn ObjectStore>, String, String)> {
    let (url, path) = parse_url_and_path(uri)?;
    let bucket = extract_host(&url, "Invalid GCS URI: missing bucket")?;

    let mut builder = object_store::gcp::GoogleCloudStorageBuilder::new().with_bucket_name(&bucket);

    if let Some(options) = storage_options {
        for (key, value) in options {
            match key.as_str() {
                "service_account_path" => {
                    builder = builder.with_service_account_path(&value);
                }
                "service_account_key" => {
                    builder = builder.with_service_account_key(&value);
                }
                "project_id" => {
                    log::warn!(
                        "project_id should be set via service account or environment variables"
                    );
                }
                "application_credentials" => {
                    // SAFETY: mutating the process environment is not thread-safe;
                    // this assumes no other thread reads or writes env vars concurrently
                    unsafe {
                        std::env::set_var("GOOGLE_APPLICATION_CREDENTIALS", &value);
                    }
                }
                _ => {
                    log::warn!("Unknown GCS storage option: {key}");
                }
            }
        }
    }

    let gcs_store = builder.build()?;
    Ok((Arc::new(gcs_store), path, uri.to_string()))
}

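/// Creates an Azure Blob Storage store from an
/// `azure://account/container/path` URI.
///
/// Recognized `storage_options` keys: `account_name`, `account_key`,
/// `sas_token`, `client_id`, `client_secret`, and `tenant_id`.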
fn create_azure_store(
    uri: &str,
    storage_options: Option<std::collections::HashMap<String, String>>,
) -> anyhow::Result<(Arc<dyn ObjectStore>, String, String)> {
    let (url, _) = parse_url_and_path(uri)?;
    let account = extract_host(&url, "Invalid Azure URI: missing account")?;

    let path_segments: Vec<&str> = url.path().trim_start_matches('/').split('/').collect();
    if path_segments.is_empty() || path_segments[0].is_empty() {
        anyhow::bail!("Invalid Azure URI: missing container");
    }

    let container = path_segments[0];
    let path = if path_segments.len() > 1 {
        path_segments[1..].join("/")
    } else {
        String::new()
    };

    let mut builder = object_store::azure::MicrosoftAzureBuilder::new()
        .with_account(&account)
        .with_container_name(container);

    if let Some(options) = storage_options {
        for (key, value) in options {
            match key.as_str() {
                "account_name" => {
                    builder = builder.with_account(&value);
                }
                "account_key" => {
                    builder = builder.with_access_key(&value);
                }
                "sas_token" => {
                    // Split the SAS token query string into key/value pairs
                    let query_pairs: Vec<(String, String)> = value
                        .split('&')
                        .filter_map(|pair| {
                            let mut parts = pair.split('=');
                            match (parts.next(), parts.next()) {
                                (Some(key), Some(val)) => Some((key.to_string(), val.to_string())),
                                _ => None,
                            }
                        })
                        .collect();
                    builder = builder.with_sas_authorization(query_pairs);
                }
                "client_id" => {
                    builder = builder.with_client_id(&value);
                }
                "client_secret" => {
                    builder = builder.with_client_secret(&value);
                }
                "tenant_id" => {
                    builder = builder.with_tenant_id(&value);
                }
                _ => {
                    log::warn!("Unknown Azure storage option: {key}");
                }
            }
        }
    }

    let azure_store = builder.build()?;
    Ok((Arc::new(azure_store), path, uri.to_string()))
}

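/// Creates an Azure Blob Storage store from an
/// `abfs://container@account.dfs.core.windows.net/path` URI.
///
/// Accepts the same `storage_options` keys as [`create_azure_store`].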
fn create_abfs_store(
    uri: &str,
    storage_options: Option<std::collections::HashMap<String, String>>,
) -> anyhow::Result<(Arc<dyn ObjectStore>, String, String)> {
    let (url, path) = parse_url_and_path(uri)?;
    let host = extract_host(&url, "Invalid ABFS URI: missing host")?;

    // The account is the first label of the host, e.g. `account.dfs.core.windows.net`
    let account = host
        .split('.')
        .next()
        .ok_or_else(|| anyhow::anyhow!("Invalid ABFS URI: cannot extract account from host"))?;

    // The container appears as the userinfo component, e.g. `abfs://container@account...`
    let container = url.username();
    if container.is_empty() {
        anyhow::bail!("Invalid ABFS URI: missing container");
    }

    let mut builder = object_store::azure::MicrosoftAzureBuilder::new()
        .with_account(account)
        .with_container_name(container);

    if let Some(options) = storage_options {
        for (key, value) in options {
            match key.as_str() {
                "account_name" => {
                    builder = builder.with_account(&value);
                }
                "account_key" => {
                    builder = builder.with_access_key(&value);
                }
                "sas_token" => {
                    // Split the SAS token query string into key/value pairs
                    let query_pairs: Vec<(String, String)> = value
                        .split('&')
                        .filter_map(|pair| {
                            let mut parts = pair.split('=');
                            match (parts.next(), parts.next()) {
                                (Some(key), Some(val)) => Some((key.to_string(), val.to_string())),
                                _ => None,
                            }
                        })
                        .collect();
                    builder = builder.with_sas_authorization(query_pairs);
                }
                "client_id" => {
                    builder = builder.with_client_id(&value);
                }
                "client_secret" => {
                    builder = builder.with_client_secret(&value);
                }
                "tenant_id" => {
                    builder = builder.with_tenant_id(&value);
                }
                _ => {
                    log::warn!("Unknown ABFS storage option: {key}");
                }
            }
        }
    }

    let azure_store = builder.build()?;
    Ok((Arc::new(azure_store), path, uri.to_string()))
}

fn create_http_store(
    uri: &str,
    storage_options: Option<std::collections::HashMap<String, String>>,
) -> anyhow::Result<(Arc<dyn ObjectStore>, String, String)> {
    let (url, path) = parse_url_and_path(uri)?;

    // Preserve an explicit port (e.g. `http://localhost:8080`) in the base URL
    let mut base_url = format!("{}://{}", url.scheme(), url.host_str().unwrap_or(""));
    if let Some(port) = url.port() {
        base_url.push_str(&format!(":{port}"));
    }

    let builder = object_store::http::HttpBuilder::new().with_url(base_url);

    if let Some(options) = storage_options {
        for (key, _value) in options {
            log::warn!("Unknown HTTP storage option: {key}");
        }
    }

    let http_store = builder.build()?;
    Ok((Arc::new(http_store), path, uri.to_string()))
}

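/// Parses a URI and returns it together with its path component, stripped of
/// the leading slash.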
fn parse_url_and_path(uri: &str) -> anyhow::Result<(url::Url, String)> {
    let url = url::Url::parse(uri)?;
    let path = url.path().trim_start_matches('/').to_string();
    Ok((url, path))
}

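/// Extracts the host component of a URL, failing with `error_msg` if the URL
/// has no host.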
fn extract_host(url: &url::Url, error_msg: &str) -> anyhow::Result<String> {
    url.host_str()
        .map(std::string::ToString::to_string)
        .ok_or_else(|| anyhow::anyhow!("{}", error_msg))
}

#[cfg(test)]
mod tests {
    use std::collections::HashMap;

    use super::*;

    #[test]
    fn test_create_object_store_from_path_local() {
        let temp_dir = std::env::temp_dir().join("nautilus_test");
        std::fs::create_dir_all(&temp_dir).unwrap();

        let result = create_object_store_from_path(temp_dir.to_str().unwrap(), None);
        if let Err(e) = &result {
            println!("Error: {e:?}");
        }
        assert!(result.is_ok());
        let (_, base_path, uri) = result.unwrap();
        assert_eq!(base_path, "");
        assert_eq!(uri, format!("file://{}", temp_dir.to_str().unwrap()));

        std::fs::remove_dir_all(&temp_dir).ok();
    }

    #[test]
    fn test_create_object_store_from_path_s3() {
        let mut options = HashMap::new();
        options.insert(
            "endpoint_url".to_string(),
            "https://test.endpoint.com".to_string(),
        );
        options.insert("region".to_string(), "us-west-2".to_string());
        options.insert("access_key_id".to_string(), "test_key".to_string());
        options.insert("secret_access_key".to_string(), "test_secret".to_string());

        let result = create_object_store_from_path("s3://test-bucket/path", Some(options));
        assert!(result.is_ok());
        let (_, base_path, uri) = result.unwrap();
        assert_eq!(base_path, "path");
        assert_eq!(uri, "s3://test-bucket/path");
    }

    #[test]
    fn test_create_object_store_from_path_azure() {
        let mut options = HashMap::new();
        options.insert("account_name".to_string(), "testaccount".to_string());
        options.insert("account_key".to_string(), "dGVzdGtleQ==".to_string());

        let result =
            create_object_store_from_path("azure://testaccount/container/path", Some(options));
        if let Err(e) = &result {
            println!("Azure Error: {e:?}");
        }
        assert!(result.is_ok());
        let (_, base_path, uri) = result.unwrap();
        assert_eq!(base_path, "path");
        assert_eq!(uri, "azure://testaccount/container/path");
    }

    #[test]
    fn test_create_object_store_from_path_gcs() {
        let mut options = HashMap::new();
        options.insert("project_id".to_string(), "test-project".to_string());

        let result = create_object_store_from_path("gs://test-bucket/path", Some(options));
        // GCS store creation may fail without valid credentials
        match result {
            Ok((_, base_path, uri)) => {
                assert_eq!(base_path, "path");
                assert_eq!(uri, "gs://test-bucket/path");
            }
            Err(e) => {
                let error_msg = format!("{e:?}");
                // Expected failure without credentials; check the URI was at least parsed
                assert!(error_msg.contains("test-bucket") || error_msg.contains("credential"));
            }
        }
    }

    #[test]
    fn test_create_object_store_from_path_empty_options() {
        let result = create_object_store_from_path("s3://test-bucket/path", None);
        assert!(result.is_ok());
        let (_, base_path, uri) = result.unwrap();
        assert_eq!(base_path, "path");
        assert_eq!(uri, "s3://test-bucket/path");
    }

    #[test]
    fn test_parse_url_and_path() {
        let result = parse_url_and_path("s3://bucket/path/to/file");
        assert!(result.is_ok());
        let (url, path) = result.unwrap();
        assert_eq!(url.scheme(), "s3");
        assert_eq!(url.host_str().unwrap(), "bucket");
        assert_eq!(path, "path/to/file");
    }

    #[test]
    fn test_extract_host() {
        let url = url::Url::parse("s3://test-bucket/path").unwrap();
        let result = extract_host(&url, "Test error");
        assert!(result.is_ok());
        assert_eq!(result.unwrap(), "test-bucket");
    }

    #[test]
    fn test_normalize_path_to_uri() {
        assert_eq!(normalize_path_to_uri("/tmp/test"), "file:///tmp/test");
        assert_eq!(
            normalize_path_to_uri("s3://bucket/path"),
            "s3://bucket/path"
        );
        assert_eq!(
            normalize_path_to_uri("file:///tmp/test"),
            "file:///tmp/test"
        );
    }
}