nautilus_persistence/parquet.rs

// -------------------------------------------------------------------------------------------------
//  Copyright (C) 2015-2025 Posei Systems Pty Ltd. All rights reserved.
//  https://poseitrader.io
//
//  Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
//  You may not use this file except in compliance with the License.
//  You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
//
//  Unless required by applicable law or agreed to in writing, software
//  distributed under the License is distributed on an "AS IS" BASIS,
//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
//  See the License for the specific language governing permissions and
//  limitations under the License.
// -------------------------------------------------------------------------------------------------

use std::sync::Arc;

use arrow::record_batch::RecordBatch;
use object_store::{ObjectStore, path::Path as ObjectPath};
use parquet::{
    arrow::{ArrowWriter, arrow_reader::ParquetRecordBatchReaderBuilder},
    file::{
        properties::WriterProperties,
        reader::{FileReader, SerializedFileReader},
        statistics::Statistics,
    },
};
/// Writes a `RecordBatch` to a Parquet file using object store, with optional compression.
///
/// # Errors
///
/// Returns an error if writing to Parquet fails or any I/O operation fails.
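/// # Example
///
/// A minimal usage sketch (the `ts_init` schema and the output path are
/// illustrative, and an async runtime is assumed):
///
/// ```ignore
/// use std::sync::Arc;
///
/// use arrow::array::Int64Array;
/// use arrow::datatypes::{DataType, Field, Schema};
/// use arrow::record_batch::RecordBatch;
///
/// let schema = Arc::new(Schema::new(vec![Field::new("ts_init", DataType::Int64, false)]));
/// let batch = RecordBatch::try_new(schema, vec![Arc::new(Int64Array::from(vec![1, 2, 3]))])?;
///
/// // Defaults apply when the options are `None`: Snappy compression, 5000-row groups.
/// write_batch_to_parquet(batch, "/tmp/quotes.parquet", None, None, None).await?;
/// ```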
pub async fn write_batch_to_parquet(
    batch: RecordBatch,
    path: &str,
    storage_options: Option<std::collections::HashMap<String, String>>,
    compression: Option<parquet::basic::Compression>,
    max_row_group_size: Option<usize>,
) -> anyhow::Result<()> {
    write_batches_to_parquet(
        &[batch],
        path,
        storage_options,
        compression,
        max_row_group_size,
    )
    .await
}

/// Writes multiple `RecordBatch` items to a Parquet file using object store, with optional compression, row group sizing, and storage options.
///
/// # Errors
///
/// Returns an error if writing to Parquet fails or any I/O operation fails.
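/// # Example
///
/// A sketch writing several batches with explicit ZSTD compression and row
/// group sizing (`batches` is assumed to hold `RecordBatch` values sharing one
/// schema, and the bucket is a placeholder):
///
/// ```ignore
/// use parquet::basic::{Compression, ZstdLevel};
///
/// write_batches_to_parquet(
///     &batches,
///     "s3://my-bucket/data/quotes.parquet",
///     None, // or Some(storage_options) with credentials
///     Some(Compression::ZSTD(ZstdLevel::default())),
///     Some(10_000),
/// )
/// .await?;
/// ```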
pub async fn write_batches_to_parquet(
    batches: &[RecordBatch],
    path: &str,
    storage_options: Option<std::collections::HashMap<String, String>>,
    compression: Option<parquet::basic::Compression>,
    max_row_group_size: Option<usize>,
) -> anyhow::Result<()> {
    let (object_store, base_path, _) = create_object_store_from_path(path, storage_options)?;
    let object_path = if base_path.is_empty() {
        ObjectPath::from(path)
    } else {
        ObjectPath::from(format!("{base_path}/{path}"))
    };

    write_batches_to_object_store(
        batches,
        object_store,
        &object_path,
        compression,
        max_row_group_size,
    )
    .await
}

/// Writes multiple `RecordBatch` items to a path within the given object store, with optional compression and row group sizing.
///
/// # Errors
///
/// Returns an error if `batches` is empty, writing to Parquet fails, or any I/O operation fails.
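/// # Example
///
/// A sketch using the in-memory backend (useful for tests; `batch` is an
/// arbitrary `RecordBatch`, and an async runtime is assumed):
///
/// ```ignore
/// use std::sync::Arc;
///
/// use object_store::{ObjectStore, memory::InMemory, path::Path as ObjectPath};
///
/// let store: Arc<dyn ObjectStore> = Arc::new(InMemory::new());
/// let path = ObjectPath::from("data/quotes.parquet");
/// write_batches_to_object_store(&[batch], store, &path, None, None).await?;
/// ```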
pub async fn write_batches_to_object_store(
    batches: &[RecordBatch],
    object_store: Arc<dyn ObjectStore>,
    path: &ObjectPath,
    compression: Option<parquet::basic::Compression>,
    max_row_group_size: Option<usize>,
) -> anyhow::Result<()> {
    // Guard against indexing into an empty slice below
    anyhow::ensure!(!batches.is_empty(), "Cannot write an empty slice of batches");

    // Write the Parquet data to an in-memory buffer
    let mut buffer = Vec::new();

    let writer_props = WriterProperties::builder()
        .set_compression(compression.unwrap_or(parquet::basic::Compression::SNAPPY))
        .set_max_row_group_size(max_row_group_size.unwrap_or(5000))
        .build();

    let mut writer = ArrowWriter::try_new(&mut buffer, batches[0].schema(), Some(writer_props))?;
    for batch in batches {
        writer.write(batch)?;
    }
    writer.close()?;

    // Upload the buffer to the object store
    object_store.put(path, buffer.into()).await?;

    Ok(())
}

/// Combines multiple Parquet files into a single file using object store, with optional storage options.
///
/// If fewer than two paths are given, this is a no-op. On success, the source files are deleted.
///
/// # Errors
///
/// Returns an error if file reading or writing fails.
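/// # Example
///
/// A sketch merging two daily files into one (paths are illustrative, and an
/// async runtime is assumed):
///
/// ```ignore
/// combine_parquet_files(
///     vec!["/data/2024-01-01.parquet", "/data/2024-01-02.parquet"],
///     "/data/2024-01.parquet",
///     None, // storage options
///     None, // default compression (Snappy)
///     None, // default row group size (5000)
/// )
/// .await?;
/// ```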
pub async fn combine_parquet_files(
    file_paths: Vec<&str>,
    new_file_path: &str,
    storage_options: Option<std::collections::HashMap<String, String>>,
    compression: Option<parquet::basic::Compression>,
    max_row_group_size: Option<usize>,
) -> anyhow::Result<()> {
    if file_paths.len() <= 1 {
        return Ok(());
    }

    // Create object store from the first file path (assuming all files are in the same store)
    let (object_store, base_path, _) =
        create_object_store_from_path(file_paths[0], storage_options)?;

    // Convert string paths to ObjectPath
    let object_paths: Vec<ObjectPath> = file_paths
        .iter()
        .map(|path| {
            if base_path.is_empty() {
                ObjectPath::from(*path)
            } else {
                ObjectPath::from(format!("{base_path}/{path}"))
            }
        })
        .collect();

    let new_object_path = if base_path.is_empty() {
        ObjectPath::from(new_file_path)
    } else {
        ObjectPath::from(format!("{base_path}/{new_file_path}"))
    };

    combine_parquet_files_from_object_store(
        object_store,
        object_paths,
        &new_object_path,
        compression,
        max_row_group_size,
    )
    .await
}

/// Combines multiple Parquet files from an object store into a single file.
///
/// If fewer than two paths are given, this is a no-op. On success, the source files are deleted.
///
/// # Errors
///
/// Returns an error if file reading or writing fails.
pub async fn combine_parquet_files_from_object_store(
    object_store: Arc<dyn ObjectStore>,
    file_paths: Vec<ObjectPath>,
    new_file_path: &ObjectPath,
    compression: Option<parquet::basic::Compression>,
    max_row_group_size: Option<usize>,
) -> anyhow::Result<()> {
    if file_paths.len() <= 1 {
        return Ok(());
    }

    let mut all_batches: Vec<RecordBatch> = Vec::new();

    // Read all files from the object store
    for path in &file_paths {
        let data = object_store.get(path).await?.bytes().await?;
        let builder = ParquetRecordBatchReaderBuilder::try_new(data)?;
        let reader = builder.build()?;

        for batch in reader {
            all_batches.push(batch?);
        }
    }

    // Write the combined batches to the new location
    write_batches_to_object_store(
        &all_batches,
        object_store.clone(),
        new_file_path,
        compression,
        max_row_group_size,
    )
    .await?;

    // Remove the merged source files
    for path in &file_paths {
        object_store.delete(path).await?;
    }

    Ok(())
}

/// Extracts the minimum and maximum values of the Int64 column `column_name` from a Parquet file's metadata, using object store with optional storage options.
///
/// The statistics are read from the row group metadata, so no row data is scanned. The i64 values are returned cast to `u64`.
///
/// # Errors
///
/// Returns an error if the file cannot be read, metadata parsing fails, or the column is missing, is not of type Int64, or has no statistics.
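/// # Example
///
/// A sketch reading the timestamp range of a file (assumes an Int64 column
/// named `ts_init` with statistics present; the path is illustrative):
///
/// ```ignore
/// let (min_ts, max_ts) =
///     min_max_from_parquet_metadata("/data/quotes.parquet", None, "ts_init").await?;
/// assert!(min_ts <= max_ts);
/// ```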
pub async fn min_max_from_parquet_metadata(
    file_path: &str,
    storage_options: Option<std::collections::HashMap<String, String>>,
    column_name: &str,
) -> anyhow::Result<(u64, u64)> {
    let (object_store, base_path, _) = create_object_store_from_path(file_path, storage_options)?;
    let object_path = if base_path.is_empty() {
        ObjectPath::from(file_path)
    } else {
        ObjectPath::from(format!("{base_path}/{file_path}"))
    };

    min_max_from_parquet_metadata_object_store(object_store, &object_path, column_name).await
}

/// Extracts the minimum and maximum values of the Int64 column `column_name` from a Parquet file's metadata in an object store.
///
/// The i64 values are returned cast to `u64`.
///
/// # Errors
///
/// Returns an error if the file cannot be read, metadata parsing fails, or the column is missing, is not of type Int64, or has no statistics.
pub async fn min_max_from_parquet_metadata_object_store(
    object_store: Arc<dyn ObjectStore>,
    file_path: &ObjectPath,
    column_name: &str,
) -> anyhow::Result<(u64, u64)> {
    // Download the parquet file from the object store
    let data = object_store.get(file_path).await?.bytes().await?;
    let reader = SerializedFileReader::new(data)?;

    let metadata = reader.metadata();
    let mut overall_min_value: Option<i64> = None;
    let mut overall_max_value: Option<i64> = None;

    // Iterate through all row groups
    for i in 0..metadata.num_row_groups() {
        let row_group = metadata.row_group(i);

        // Iterate through all columns in this row group
        for j in 0..row_group.num_columns() {
            let col_metadata = row_group.column(j);

            if col_metadata.column_path().string() == column_name {
                if let Some(stats) = col_metadata.statistics() {
                    // Only Int64 statistics are supported
                    if let Statistics::Int64(int64_stats) = stats {
                        // Fold this row group's min/max into the overall min/max
                        if let Some(&min_value) = int64_stats.min_opt() {
                            overall_min_value =
                                Some(overall_min_value.map_or(min_value, |v| v.min(min_value)));
                        }

                        if let Some(&max_value) = int64_stats.max_opt() {
                            overall_max_value =
                                Some(overall_max_value.map_or(max_value, |v| v.max(max_value)));
                        }
                    } else {
                        anyhow::bail!("Column '{column_name}' is not of type Int64");
                    }
                } else {
                    anyhow::bail!(
                        "Statistics not available for column '{column_name}' in row group {i}"
                    );
                }
            }
        }
    }

    // Return the min/max pair if both are available
    if let (Some(min), Some(max)) = (overall_min_value, overall_max_value) {
        Ok((min as u64, max as u64))
    } else {
        anyhow::bail!(
            "Column '{column_name}' not found or has no Int64 statistics in any row group"
        )
    }
}

/// Creates an object store from a URI string with optional storage options.
///
/// Supports multiple cloud storage providers:
/// - AWS S3: `s3://bucket/path`
/// - Google Cloud Storage: `gs://bucket/path` or `gcs://bucket/path`
/// - Azure Blob Storage: `azure://account/container/path` or `abfs://container@account.dfs.core.windows.net/path`
/// - HTTP/WebDAV: `http://` or `https://`
/// - Local files: `file://path` or plain paths
///
/// # Parameters
///
/// - `path`: The URI string for the storage location.
/// - `storage_options`: Optional `HashMap` containing storage-specific configuration options:
///   - For S3: `endpoint_url`, `region`, `access_key_id`, `secret_access_key`, `session_token`, etc.
///   - For GCS: `service_account_path`, `service_account_key`, `project_id`, etc.
///   - For Azure: `account_name`, `account_key`, `sas_token`, etc.
///
/// Returns a tuple of (`ObjectStore`, `base_path`, `normalized_uri`).
///
/// # Errors
///
/// Returns an error if the URI cannot be parsed or the underlying store cannot be constructed.
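/// # Example
///
/// A sketch building an S3-backed store with explicit credentials (all option
/// values are placeholders):
///
/// ```ignore
/// use std::collections::HashMap;
///
/// let mut options = HashMap::new();
/// options.insert("region".to_string(), "us-east-1".to_string());
/// options.insert("access_key_id".to_string(), "AKIA...".to_string());
/// options.insert("secret_access_key".to_string(), "...".to_string());
///
/// let (store, base_path, uri) =
///     create_object_store_from_path("s3://my-bucket/data", Some(options))?;
/// assert_eq!(base_path, "data");
/// ```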
pub fn create_object_store_from_path(
    path: &str,
    storage_options: Option<std::collections::HashMap<String, String>>,
) -> anyhow::Result<(Arc<dyn ObjectStore>, String, String)> {
    let uri = normalize_path_to_uri(path);

    match uri.as_str() {
        s if s.starts_with("s3://") => create_s3_store(&uri, storage_options),
        s if s.starts_with("gs://") || s.starts_with("gcs://") => {
            create_gcs_store(&uri, storage_options)
        }
        s if s.starts_with("azure://") => create_azure_store(&uri, storage_options),
        s if s.starts_with("abfs://") => create_abfs_store(&uri, storage_options),
        s if s.starts_with("http://") || s.starts_with("https://") => {
            create_http_store(&uri, storage_options)
        }
        s if s.starts_with("file://") => create_local_store(&uri, true),
        _ => create_local_store(&uri, false), // Fallback: assume local path
    }
}

/// Normalizes a path to URI format for consistent object store usage.
///
/// If the path is already a URI (contains "://"), it is returned as-is.
/// Otherwise, local paths are converted to `file://` URIs.
///
/// Supported URI schemes:
/// - `s3://` for AWS S3
/// - `gs://` or `gcs://` for Google Cloud Storage
/// - `azure://` or `abfs://` for Azure Blob Storage
/// - `http://` or `https://` for HTTP/WebDAV
/// - `file://` for local files
///
/// # Panics
///
/// Panics if the current working directory cannot be determined while converting a relative path.
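/// # Example
///
/// A quick sketch of both branches (absolute paths gain a `file://` scheme,
/// existing URIs pass through unchanged):
///
/// ```ignore
/// assert_eq!(normalize_path_to_uri("/tmp/data"), "file:///tmp/data");
/// assert_eq!(normalize_path_to_uri("s3://bucket/key"), "s3://bucket/key");
/// ```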
#[must_use]
pub fn normalize_path_to_uri(path: &str) -> String {
    if path.contains("://") {
        // Already a URI - return as-is
        path.to_string()
    } else if path.starts_with('/') {
        // Absolute local path
        format!("file://{path}")
    } else {
        // Relative local path - resolve against the current working directory
        format!(
            "file://{}",
            std::env::current_dir().unwrap().join(path).display()
        )
    }
}

/// Helper function to create a local file system object store.
fn create_local_store(
    uri: &str,
    is_file_uri: bool,
) -> anyhow::Result<(Arc<dyn ObjectStore>, String, String)> {
    let path = if is_file_uri {
        uri.strip_prefix("file://").unwrap_or(uri)
    } else {
        uri
    };

    let local_store = object_store::local::LocalFileSystem::new_with_prefix(path)?;
    Ok((Arc::new(local_store), String::new(), uri.to_string()))
}

/// Helper function to create an S3 object store with options.
fn create_s3_store(
    uri: &str,
    storage_options: Option<std::collections::HashMap<String, String>>,
) -> anyhow::Result<(Arc<dyn ObjectStore>, String, String)> {
    let (url, path) = parse_url_and_path(uri)?;
    let bucket = extract_host(&url, "Invalid S3 URI: missing bucket")?;

    let mut builder = object_store::aws::AmazonS3Builder::new().with_bucket_name(&bucket);

    // Apply storage options if provided
    if let Some(options) = storage_options {
        for (key, value) in options {
            match key.as_str() {
                "endpoint_url" => {
                    builder = builder.with_endpoint(&value);
                }
                "region" => {
                    builder = builder.with_region(&value);
                }
                "access_key_id" | "key" => {
                    builder = builder.with_access_key_id(&value);
                }
                "secret_access_key" | "secret" => {
                    builder = builder.with_secret_access_key(&value);
                }
                "session_token" | "token" => {
                    builder = builder.with_token(&value);
                }
                "allow_http" => {
                    let allow_http = value.eq_ignore_ascii_case("true");
                    builder = builder.with_allow_http(allow_http);
                }
                _ => {
                    // Ignore unknown options for forward compatibility
                    log::warn!("Unknown S3 storage option: {key}");
                }
            }
        }
    }

    let s3_store = builder.build()?;
    Ok((Arc::new(s3_store), path, uri.to_string()))
}

/// Helper function to create a GCS object store with options.
fn create_gcs_store(
    uri: &str,
    storage_options: Option<std::collections::HashMap<String, String>>,
) -> anyhow::Result<(Arc<dyn ObjectStore>, String, String)> {
    let (url, path) = parse_url_and_path(uri)?;
    let bucket = extract_host(&url, "Invalid GCS URI: missing bucket")?;

    let mut builder = object_store::gcp::GoogleCloudStorageBuilder::new().with_bucket_name(&bucket);

    // Apply storage options if provided
    if let Some(options) = storage_options {
        for (key, value) in options {
            match key.as_str() {
                "service_account_path" => {
                    builder = builder.with_service_account_path(&value);
                }
                "service_account_key" => {
                    builder = builder.with_service_account_key(&value);
                }
                "project_id" => {
                    // Note: `GoogleCloudStorageBuilder` has no `with_project_id` method;
                    // the project must be supplied via the service account or environment variables
                    log::warn!(
                        "project_id should be set via service account or environment variables"
                    );
                }
                "application_credentials" => {
                    // Set the GOOGLE_APPLICATION_CREDENTIALS env var required by Google auth libraries.
                    // SAFETY: `std::env::set_var` is marked unsafe because it mutates global state and
                    // can break signal-safe code. We only call it during configuration before any
                    // multi-threaded work starts, so it is considered safe in this context.
                    unsafe {
                        std::env::set_var("GOOGLE_APPLICATION_CREDENTIALS", &value);
                    }
                }
                _ => {
                    // Ignore unknown options for forward compatibility
                    log::warn!("Unknown GCS storage option: {key}");
                }
            }
        }
    }

    let gcs_store = builder.build()?;
    Ok((Arc::new(gcs_store), path, uri.to_string()))
}

/// Helper function to create an Azure object store with options.
fn create_azure_store(
    uri: &str,
    storage_options: Option<std::collections::HashMap<String, String>>,
) -> anyhow::Result<(Arc<dyn ObjectStore>, String, String)> {
    let (url, _) = parse_url_and_path(uri)?;
    let account = extract_host(&url, "Invalid Azure URI: missing account")?;

    let path_segments: Vec<&str> = url.path().trim_start_matches('/').split('/').collect();
    if path_segments.is_empty() || path_segments[0].is_empty() {
        anyhow::bail!("Invalid Azure URI: missing container");
    }

    let container = path_segments[0];
    let path = if path_segments.len() > 1 {
        path_segments[1..].join("/")
    } else {
        String::new()
    };

    let mut builder = object_store::azure::MicrosoftAzureBuilder::new()
        .with_account(&account)
        .with_container_name(container);

    // Apply storage options if provided
    if let Some(options) = storage_options {
        for (key, value) in options {
            match key.as_str() {
                "account_name" => {
                    builder = builder.with_account(&value);
                }
                "account_key" => {
                    builder = builder.with_access_key(&value);
                }
                "sas_token" => {
                    // Parse the SAS token as query string parameters
                    let query_pairs: Vec<(String, String)> = value
                        .split('&')
                        .filter_map(|pair| {
                            let mut parts = pair.split('=');
                            match (parts.next(), parts.next()) {
                                (Some(key), Some(val)) => Some((key.to_string(), val.to_string())),
                                _ => None,
                            }
                        })
                        .collect();
                    builder = builder.with_sas_authorization(query_pairs);
                }
                "client_id" => {
                    builder = builder.with_client_id(&value);
                }
                "client_secret" => {
                    builder = builder.with_client_secret(&value);
                }
                "tenant_id" => {
                    builder = builder.with_tenant_id(&value);
                }
                _ => {
                    // Ignore unknown options for forward compatibility
                    log::warn!("Unknown Azure storage option: {key}");
                }
            }
        }
    }

    let azure_store = builder.build()?;
    Ok((Arc::new(azure_store), path, uri.to_string()))
}

/// Helper function to create an Azure object store from an abfs:// URI with options.
fn create_abfs_store(
    uri: &str,
    storage_options: Option<std::collections::HashMap<String, String>>,
) -> anyhow::Result<(Arc<dyn ObjectStore>, String, String)> {
    let (url, path) = parse_url_and_path(uri)?;
    let host = extract_host(&url, "Invalid ABFS URI: missing host")?;

    // Extract the account from the host (account.dfs.core.windows.net)
    let account = host
        .split('.')
        .next()
        .ok_or_else(|| anyhow::anyhow!("Invalid ABFS URI: cannot extract account from host"))?;

    // The container is the userinfo portion of the URI (container@account...)
    let container = url.username();
    if container.is_empty() {
        anyhow::bail!("Invalid ABFS URI: missing container");
    }

    let mut builder = object_store::azure::MicrosoftAzureBuilder::new()
        .with_account(account)
        .with_container_name(container);

    // Apply storage options if provided (same handling as the azure:// store)
    if let Some(options) = storage_options {
        for (key, value) in options {
            match key.as_str() {
                "account_name" => {
                    builder = builder.with_account(&value);
                }
                "account_key" => {
                    builder = builder.with_access_key(&value);
                }
                "sas_token" => {
                    // Parse the SAS token as query string parameters
                    let query_pairs: Vec<(String, String)> = value
                        .split('&')
                        .filter_map(|pair| {
                            let mut parts = pair.split('=');
                            match (parts.next(), parts.next()) {
                                (Some(key), Some(val)) => Some((key.to_string(), val.to_string())),
                                _ => None,
                            }
                        })
                        .collect();
                    builder = builder.with_sas_authorization(query_pairs);
                }
                "client_id" => {
                    builder = builder.with_client_id(&value);
                }
                "client_secret" => {
                    builder = builder.with_client_secret(&value);
                }
                "tenant_id" => {
                    builder = builder.with_tenant_id(&value);
                }
                _ => {
                    // Ignore unknown options for forward compatibility
                    log::warn!("Unknown ABFS storage option: {key}");
                }
            }
        }
    }

    let azure_store = builder.build()?;
    Ok((Arc::new(azure_store), path, uri.to_string()))
}

/// Helper function to create an HTTP object store with options.
fn create_http_store(
    uri: &str,
    storage_options: Option<std::collections::HashMap<String, String>>,
) -> anyhow::Result<(Arc<dyn ObjectStore>, String, String)> {
    let (url, path) = parse_url_and_path(uri)?;
    let base_url = format!("{}://{}", url.scheme(), url.host_str().unwrap_or(""));

    let builder = object_store::http::HttpBuilder::new().with_url(base_url);

    // The HTTP builder has limited configuration; most HTTP-specific options
    // would be handled via client options, so unknown options are logged and
    // ignored for forward compatibility
    if let Some(options) = storage_options {
        for (key, _value) in options {
            log::warn!("Unknown HTTP storage option: {key}");
        }
    }

    let http_store = builder.build()?;
    Ok((Arc::new(http_store), path, uri.to_string()))
}

/// Helper function to parse a URL and extract its path component.
fn parse_url_and_path(uri: &str) -> anyhow::Result<(url::Url, String)> {
    let url = url::Url::parse(uri)?;
    let path = url.path().trim_start_matches('/').to_string();
    Ok((url, path))
}

/// Helper function to extract the host from a URL with error handling.
fn extract_host(url: &url::Url, error_msg: &str) -> anyhow::Result<String> {
    url.host_str()
        .map(std::string::ToString::to_string)
        .ok_or_else(|| anyhow::anyhow!("{}", error_msg))
}

////////////////////////////////////////////////////////////////////////////////
// Tests
////////////////////////////////////////////////////////////////////////////////

#[cfg(test)]
mod tests {
    use std::collections::HashMap;

    use super::*;

    #[test]
    fn test_create_object_store_from_path_local() {
        // Create a temporary directory for testing
        let temp_dir = std::env::temp_dir().join("nautilus_test");
        std::fs::create_dir_all(&temp_dir).unwrap();

        let result = create_object_store_from_path(temp_dir.to_str().unwrap(), None);
        if let Err(e) = &result {
            println!("Error: {e:?}");
        }
        assert!(result.is_ok());
        let (_, base_path, uri) = result.unwrap();
        assert_eq!(base_path, "");
        // The URI should be normalized to file:// format
        assert_eq!(uri, format!("file://{}", temp_dir.to_str().unwrap()));

        // Clean up
        std::fs::remove_dir_all(&temp_dir).ok();
    }

    #[test]
    fn test_create_object_store_from_path_s3() {
        let mut options = HashMap::new();
        options.insert(
            "endpoint_url".to_string(),
            "https://test.endpoint.com".to_string(),
        );
        options.insert("region".to_string(), "us-west-2".to_string());
        options.insert("access_key_id".to_string(), "test_key".to_string());
        options.insert("secret_access_key".to_string(), "test_secret".to_string());

        let result = create_object_store_from_path("s3://test-bucket/path", Some(options));
        assert!(result.is_ok());
        let (_, base_path, uri) = result.unwrap();
        assert_eq!(base_path, "path");
        assert_eq!(uri, "s3://test-bucket/path");
    }

    #[test]
    fn test_create_object_store_from_path_azure() {
        let mut options = HashMap::new();
        options.insert("account_name".to_string(), "testaccount".to_string());
        // Use a valid base64-encoded key for testing
        options.insert("account_key".to_string(), "dGVzdGtleQ==".to_string()); // "testkey" in base64

        let result =
            create_object_store_from_path("azure://testaccount/container/path", Some(options));
        if let Err(e) = &result {
            println!("Azure Error: {e:?}");
        }
        assert!(result.is_ok());
        let (_, base_path, uri) = result.unwrap();
        assert_eq!(base_path, "path");
        assert_eq!(uri, "azure://testaccount/container/path");
    }

    #[test]
    fn test_create_object_store_from_path_gcs() {
        // Test GCS without a service account (uses default credentials or fails gracefully)
        let mut options = HashMap::new();
        options.insert("project_id".to_string(), "test-project".to_string());

        let result = create_object_store_from_path("gs://test-bucket/path", Some(options));
        // GCS might fail due to missing credentials, but we're testing the path parsing;
        // the function should at least parse the URI correctly before failing on auth
        match result {
            Ok((_, base_path, uri)) => {
                assert_eq!(base_path, "path");
                assert_eq!(uri, "gs://test-bucket/path");
            }
            Err(e) => {
                // Expected to fail due to missing credentials, but should contain bucket info
                let error_msg = format!("{e:?}");
                assert!(error_msg.contains("test-bucket") || error_msg.contains("credential"));
            }
        }
    }

    #[test]
    fn test_create_object_store_from_path_empty_options() {
        let result = create_object_store_from_path("s3://test-bucket/path", None);
        assert!(result.is_ok());
        let (_, base_path, uri) = result.unwrap();
        assert_eq!(base_path, "path");
        assert_eq!(uri, "s3://test-bucket/path");
    }

    #[test]
    fn test_parse_url_and_path() {
        let result = parse_url_and_path("s3://bucket/path/to/file");
        assert!(result.is_ok());
        let (url, path) = result.unwrap();
        assert_eq!(url.scheme(), "s3");
        assert_eq!(url.host_str().unwrap(), "bucket");
        assert_eq!(path, "path/to/file");
    }

    #[test]
    fn test_extract_host() {
        let url = url::Url::parse("s3://test-bucket/path").unwrap();
        let result = extract_host(&url, "Test error");
        assert!(result.is_ok());
        assert_eq!(result.unwrap(), "test-bucket");
    }

    #[test]
    fn test_normalize_path_to_uri() {
        assert_eq!(normalize_path_to_uri("/tmp/test"), "file:///tmp/test");
        assert_eq!(
            normalize_path_to_uri("s3://bucket/path"),
            "s3://bucket/path"
        );
        assert_eq!(
            normalize_path_to_uri("file:///tmp/test"),
            "file:///tmp/test"
        );
    }
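
    // A minimal roundtrip sketch for `write_batches_to_object_store` using the
    // in-memory backend. Assumes a Tokio runtime is available via `#[tokio::test]`
    // (a dev-dependency not shown here); the `ts_init` schema and values are
    // illustrative, not part of the API.
    #[tokio::test]
    async fn test_write_batches_to_object_store_roundtrip() {
        use arrow::{
            array::Int64Array,
            datatypes::{DataType, Field, Schema},
        };

        let schema = Arc::new(Schema::new(vec![Field::new(
            "ts_init",
            DataType::Int64,
            false,
        )]));
        let batch =
            RecordBatch::try_new(schema, vec![Arc::new(Int64Array::from(vec![1, 2, 3]))]).unwrap();

        let store: Arc<dyn ObjectStore> = Arc::new(object_store::memory::InMemory::new());
        let path = ObjectPath::from("test.parquet");

        write_batches_to_object_store(&[batch.clone()], store.clone(), &path, None, None)
            .await
            .unwrap();

        // Read the file back and verify the row count survives the roundtrip
        let data = store.get(&path).await.unwrap().bytes().await.unwrap();
        let reader = ParquetRecordBatchReaderBuilder::try_new(data)
            .unwrap()
            .build()
            .unwrap();
        let rows: usize = reader.map(|b| b.unwrap().num_rows()).sum();
        assert_eq!(rows, batch.num_rows());
    }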
}