// nautilus_serialization/arrow/mod.rs

1// -------------------------------------------------------------------------------------------------
2//  Copyright (C) 2015-2025 Posei Systems Pty Ltd. All rights reserved.
3//  https://poseitrader.io
4//
5//  Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
6//  You may not use this file except in compliance with the License.
7//  You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
8//
9//  Unless required by applicable law or agreed to in writing, software
10//  distributed under the License is distributed on an "AS IS" BASIS,
11//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12//  See the License for the specific language governing permissions and
13//  limitations under the License.
14// -------------------------------------------------------------------------------------------------
15
16//! Defines the Apache Arrow schema for Posei types.
17
18pub mod bar;
19pub mod close;
20pub mod delta;
21pub mod depth;
22pub mod index_price;
23pub mod mark_price;
24pub mod quote;
25pub mod trade;
26
27use std::{
28    collections::HashMap,
29    io::{self, Write},
30};
31
32use arrow::{
33    array::{Array, ArrayRef},
34    datatypes::{DataType, Schema},
35    error::ArrowError,
36    ipc::writer::StreamWriter,
37    record_batch::RecordBatch,
38};
39use nautilus_model::{
40    data::{
41        Data, IndexPriceUpdate, MarkPriceUpdate, bar::Bar, close::InstrumentClose,
42        delta::OrderBookDelta, depth::OrderBookDepth10, quote::QuoteTick, trade::TradeTick,
43    },
44    types::{price::PriceRaw, quantity::QuantityRaw},
45};
46#[cfg(feature = "python")]
47use pyo3::prelude::*;
48
// Define metadata key constants
/// Metadata key for the bar type string.
const KEY_BAR_TYPE: &str = "bar_type";
/// Metadata key for the instrument ID string.
pub const KEY_INSTRUMENT_ID: &str = "instrument_id";
/// Metadata key for the price decimal precision.
const KEY_PRICE_PRECISION: &str = "price_precision";
/// Metadata key for the size decimal precision.
const KEY_SIZE_PRECISION: &str = "size_precision";
54
/// Errors that can occur while streaming record batch data to an output.
#[derive(thiserror::Error, Debug)]
pub enum DataStreamingError {
    /// An underlying I/O operation failed.
    #[error("I/O error: {0}")]
    IoError(#[from] io::Error),
    /// An Arrow encoding or stream-writing operation failed.
    #[error("Arrow error: {0}")]
    ArrowError(#[from] arrow::error::ArrowError),
    /// A Python interop call failed (available only with the `python` feature).
    #[cfg(feature = "python")]
    #[error("Python error: {0}")]
    PythonError(#[from] PyErr),
}
65
/// Errors that can occur while encoding or decoding Arrow record batches.
#[derive(thiserror::Error, Debug)]
pub enum EncodingError {
    /// The input data slice was empty, so no batch can be produced.
    #[error("Empty data")]
    EmptyData,
    /// A required metadata key was absent from the batch metadata.
    #[error("Missing metadata key: `{0}`")]
    MissingMetadata(&'static str),
    /// A required column (name, index) was absent from the record batch.
    #[error("Missing data column: `{0}` at index {1}")]
    MissingColumn(&'static str, usize),
    /// A value failed to parse; carries the field name and the parse error text.
    #[error("Error parsing `{0}`: {1}")]
    ParseError(&'static str, String),
    /// A column existed but had the wrong Arrow data type (expected vs found).
    #[error("Invalid column type `{0}` at index {1}: expected {2}, found {3}")]
    InvalidColumnType(&'static str, usize, DataType, DataType),
    /// An underlying Arrow operation failed.
    #[error("Arrow error: {0}")]
    ArrowError(#[from] arrow::error::ArrowError),
}
81
82#[inline]
83fn get_raw_price(bytes: &[u8]) -> PriceRaw {
84    PriceRaw::from_le_bytes(bytes.try_into().unwrap())
85}
86
87#[inline]
88fn get_raw_quantity(bytes: &[u8]) -> QuantityRaw {
89    QuantityRaw::from_le_bytes(bytes.try_into().unwrap())
90}
91
92/// Provides Apache Arrow schema definitions for data types.
93pub trait ArrowSchemaProvider {
94    /// Returns the Arrow schema for this type with optional metadata.
95    fn get_schema(metadata: Option<HashMap<String, String>>) -> Schema;
96
97    /// Returns a map of field names to their Arrow data types.
98    #[must_use]
99    fn get_schema_map() -> HashMap<String, String> {
100        let schema = Self::get_schema(None);
101        let mut map = HashMap::new();
102        for field in schema.fields() {
103            let name = field.name().to_string();
104            let data_type = format!("{:?}", field.data_type());
105            map.insert(name, data_type);
106        }
107        map
108    }
109}
110
111/// Encodes data types to Apache Arrow RecordBatch format.
112pub trait EncodeToRecordBatch
113where
114    Self: Sized + ArrowSchemaProvider,
115{
116    /// Encodes a batch of values into an Arrow `RecordBatch` using the provided metadata.
117    ///
118    /// # Errors
119    ///
120    /// Returns an `ArrowError` if the encoding fails.
121    fn encode_batch(
122        metadata: &HashMap<String, String>,
123        data: &[Self],
124    ) -> Result<RecordBatch, ArrowError>;
125
126    /// Returns the metadata for this data element.
127    fn metadata(&self) -> HashMap<String, String>;
128
129    /// Returns the metadata for the first element in a chunk.
130    ///
131    /// # Panics
132    ///
133    /// Panics if `chunk` is empty.
134    fn chunk_metadata(chunk: &[Self]) -> HashMap<String, String> {
135        chunk
136            .first()
137            .map(|elem| elem.metadata())
138            .expect("Chunk must have atleast one element to encode")
139    }
140}
141
/// Decodes data types from Apache Arrow RecordBatch format.
pub trait DecodeFromRecordBatch
where
    Self: Sized + Into<Data> + ArrowSchemaProvider,
{
    /// Decodes a `RecordBatch` into a vector of values of the implementing type, using the provided metadata.
    ///
    /// # Errors
    ///
    /// Returns an `EncodingError` if the decoding fails (the shared error type
    /// covers both encode and decode paths).
    fn decode_batch(
        metadata: &HashMap<String, String>,
        record_batch: RecordBatch,
    ) -> Result<Vec<Self>, EncodingError>;
}
157
/// Decodes raw Data objects from Apache Arrow RecordBatch format.
pub trait DecodeDataFromRecordBatch
where
    Self: Sized + Into<Data> + ArrowSchemaProvider,
{
    /// Decodes a `RecordBatch` into raw `Data` values, using the provided metadata.
    ///
    /// Unlike [`DecodeFromRecordBatch::decode_batch`], this erases the concrete
    /// type and yields the `Data` enum directly.
    ///
    /// # Errors
    ///
    /// Returns an `EncodingError` if the decoding fails.
    fn decode_data_batch(
        metadata: &HashMap<String, String>,
        record_batch: RecordBatch,
    ) -> Result<Vec<Data>, EncodingError>;
}
173
/// Writes RecordBatch data to output streams.
pub trait WriteStream {
    /// Writes a `RecordBatch` to the implementing output stream.
    ///
    /// # Errors
    ///
    /// Returns a `DataStreamingError` if writing or finishing the stream fails.
    fn write(&mut self, record_batch: &RecordBatch) -> Result<(), DataStreamingError>;
}
183
// Blanket implementation: any writable sink whose element type can be encoded
// gains `WriteStream` for free.
impl<T: EncodeToRecordBatch + Write> WriteStream for T {
    fn write(&mut self, record_batch: &RecordBatch) -> Result<(), DataStreamingError> {
        // NOTE(review): a fresh `StreamWriter` is constructed per call, so each call
        // emits a self-contained IPC stream (schema preamble plus end marker) —
        // confirm this framing is what downstream readers expect when multiple
        // batches are written to the same sink.
        let mut writer = StreamWriter::try_new(self, &record_batch.schema())?;
        writer.write(record_batch)?;
        writer.finish()?;
        Ok(())
    }
}
192
193/// Extracts and downcasts the specified `column_key` column from an Arrow array slice.
194///
195/// # Errors
196///
197/// Returns an error if:
198/// - `column_index` is out of range: `EncodingError::MissingColumn`.
199/// - The column type does not match `expected_type`: `EncodingError::InvalidColumnType`.
200pub fn extract_column<'a, T: Array + 'static>(
201    cols: &'a [ArrayRef],
202    column_key: &'static str,
203    column_index: usize,
204    expected_type: DataType,
205) -> Result<&'a T, EncodingError> {
206    let column_values = cols
207        .get(column_index)
208        .ok_or(EncodingError::MissingColumn(column_key, column_index))?;
209    let downcasted_values =
210        column_values
211            .as_any()
212            .downcast_ref::<T>()
213            .ok_or(EncodingError::InvalidColumnType(
214                column_key,
215                column_index,
216                expected_type,
217                column_values.data_type().clone(),
218            ))?;
219    Ok(downcasted_values)
220}
221
222/// Converts a vector of `OrderBookDelta` into an Arrow `RecordBatch`.
223///
224/// # Errors
225///
226/// Returns an error if:
227/// - `data` is empty: `EncodingError::EmptyData`.
228/// - Encoding fails: `EncodingError::ArrowError`.
229pub fn book_deltas_to_arrow_record_batch_bytes(
230    data: Vec<OrderBookDelta>,
231) -> Result<RecordBatch, EncodingError> {
232    if data.is_empty() {
233        return Err(EncodingError::EmptyData);
234    }
235
236    // Extract metadata from chunk
237    let metadata = OrderBookDelta::chunk_metadata(&data);
238    OrderBookDelta::encode_batch(&metadata, &data).map_err(EncodingError::ArrowError)
239}
240
241/// Converts a vector of `OrderBookDepth10` into an Arrow `RecordBatch`.
242///
243/// # Errors
244///
245/// Returns an error if:
246/// - `data` is empty: `EncodingError::EmptyData`.
247/// - Encoding fails: `EncodingError::ArrowError`.
248///
249/// # Panics
250///
251/// Panics if `data` is empty (after the explicit empty check, unwrap is safe).
252pub fn book_depth10_to_arrow_record_batch_bytes(
253    data: Vec<OrderBookDepth10>,
254) -> Result<RecordBatch, EncodingError> {
255    if data.is_empty() {
256        return Err(EncodingError::EmptyData);
257    }
258
259    // Take first element and extract metadata
260    // SAFETY: Unwrap safe as already checked that `data` not empty
261    let first = data.first().unwrap();
262    let metadata = first.metadata();
263    OrderBookDepth10::encode_batch(&metadata, &data).map_err(EncodingError::ArrowError)
264}
265
266/// Converts a vector of `QuoteTick` into an Arrow `RecordBatch`.
267///
268/// # Errors
269///
270/// Returns an error if:
271/// - `data` is empty: `EncodingError::EmptyData`.
272/// - Encoding fails: `EncodingError::ArrowError`.
273///
274/// # Panics
275///
276/// Panics if `data` is empty (after the explicit empty check, unwrap is safe).
277pub fn quotes_to_arrow_record_batch_bytes(
278    data: Vec<QuoteTick>,
279) -> Result<RecordBatch, EncodingError> {
280    if data.is_empty() {
281        return Err(EncodingError::EmptyData);
282    }
283
284    // Take first element and extract metadata
285    // SAFETY: Unwrap safe as already checked that `data` not empty
286    let first = data.first().unwrap();
287    let metadata = first.metadata();
288    QuoteTick::encode_batch(&metadata, &data).map_err(EncodingError::ArrowError)
289}
290
291/// Converts a vector of `TradeTick` into an Arrow `RecordBatch`.
292///
293/// # Errors
294///
295/// Returns an error if:
296/// - `data` is empty: `EncodingError::EmptyData`.
297/// - Encoding fails: `EncodingError::ArrowError`.
298///
299/// # Panics
300///
301/// Panics if `data` is empty (after the explicit empty check, unwrap is safe).
302pub fn trades_to_arrow_record_batch_bytes(
303    data: Vec<TradeTick>,
304) -> Result<RecordBatch, EncodingError> {
305    if data.is_empty() {
306        return Err(EncodingError::EmptyData);
307    }
308
309    // Take first element and extract metadata
310    // SAFETY: Unwrap safe as already checked that `data` not empty
311    let first = data.first().unwrap();
312    let metadata = first.metadata();
313    TradeTick::encode_batch(&metadata, &data).map_err(EncodingError::ArrowError)
314}
315
316/// Converts a vector of `Bar` into an Arrow `RecordBatch`.
317///
318/// # Errors
319///
320/// Returns an error if:
321/// - `data` is empty: `EncodingError::EmptyData`.
322/// - Encoding fails: `EncodingError::ArrowError`.
323///
324/// # Panics
325///
326/// Panics if `data` is empty (after the explicit empty check, unwrap is safe).
327pub fn bars_to_arrow_record_batch_bytes(data: Vec<Bar>) -> Result<RecordBatch, EncodingError> {
328    if data.is_empty() {
329        return Err(EncodingError::EmptyData);
330    }
331
332    // Take first element and extract metadata
333    // SAFETY: Unwrap safe as already checked that `data` not empty
334    let first = data.first().unwrap();
335    let metadata = first.metadata();
336    Bar::encode_batch(&metadata, &data).map_err(EncodingError::ArrowError)
337}
338
339/// Converts a vector of `MarkPriceUpdate` into an Arrow `RecordBatch`.
340///
341/// # Errors
342///
343/// Returns an error if:
344/// - `data` is empty: `EncodingError::EmptyData`.
345/// - Encoding fails: `EncodingError::ArrowError`.
346///
347/// # Panics
348///
349/// Panics if `data` is empty (after the explicit empty check, unwrap is safe).
350pub fn mark_prices_to_arrow_record_batch_bytes(
351    data: Vec<MarkPriceUpdate>,
352) -> Result<RecordBatch, EncodingError> {
353    if data.is_empty() {
354        return Err(EncodingError::EmptyData);
355    }
356
357    // Take first element and extract metadata
358    // SAFETY: Unwrap safe as already checked that `data` not empty
359    let first = data.first().unwrap();
360    let metadata = first.metadata();
361    MarkPriceUpdate::encode_batch(&metadata, &data).map_err(EncodingError::ArrowError)
362}
363
364/// Converts a vector of `IndexPriceUpdate` into an Arrow `RecordBatch`.
365///
366/// # Errors
367///
368/// Returns an error if:
369/// - `data` is empty: `EncodingError::EmptyData`.
370/// - Encoding fails: `EncodingError::ArrowError`.
371///
372/// # Panics
373///
374/// Panics if `data` is empty (after the explicit empty check, unwrap is safe).
375pub fn index_prices_to_arrow_record_batch_bytes(
376    data: Vec<IndexPriceUpdate>,
377) -> Result<RecordBatch, EncodingError> {
378    if data.is_empty() {
379        return Err(EncodingError::EmptyData);
380    }
381
382    // Take first element and extract metadata
383    // SAFETY: Unwrap safe as already checked that `data` not empty
384    let first = data.first().unwrap();
385    let metadata = first.metadata();
386    IndexPriceUpdate::encode_batch(&metadata, &data).map_err(EncodingError::ArrowError)
387}
388
389/// Converts a vector of `InstrumentClose` into an Arrow `RecordBatch`.
390///
391/// # Errors
392///
393/// Returns an error if:
394/// - `data` is empty: `EncodingError::EmptyData`.
395/// - Encoding fails: `EncodingError::ArrowError`.
396///
397/// # Panics
398///
399/// Panics if `data` is empty (after the explicit empty check, unwrap is safe).
400pub fn instrument_closes_to_arrow_record_batch_bytes(
401    data: Vec<InstrumentClose>,
402) -> Result<RecordBatch, EncodingError> {
403    if data.is_empty() {
404        return Err(EncodingError::EmptyData);
405    }
406
407    // Take first element and extract metadata
408    // SAFETY: Unwrap safe as already checked that `data` not empty
409    let first = data.first().unwrap();
410    let metadata = first.metadata();
411    InstrumentClose::encode_batch(&metadata, &data).map_err(EncodingError::ArrowError)
412}