nautilus_serialization/arrow/
mod.rs

1// -------------------------------------------------------------------------------------------------
2//  Copyright (C) 2015-2025 Posei Systems Pty Ltd. All rights reserved.
3//  https://poseitrader.io
4//
5//  Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
6//  You may not use this file except in compliance with the License.
7//  You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
8//
9//  Unless required by applicable law or agreed to in writing, software
10//  distributed under the License is distributed on an "AS IS" BASIS,
11//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12//  See the License for the specific language governing permissions and
13//  limitations under the License.
14// -------------------------------------------------------------------------------------------------
15
16//! Defines the Apache Arrow schema for Posei types.
17
18pub mod bar;
19pub mod close;
20pub mod delta;
21pub mod depth;
22pub mod index_price;
23pub mod mark_price;
24pub mod quote;
25pub mod trade;
26
27use std::{
28    collections::HashMap,
29    io::{self, Write},
30};
31
32use arrow::{
33    array::{Array, ArrayRef},
34    datatypes::{DataType, Schema},
35    error::ArrowError,
36    ipc::writer::StreamWriter,
37    record_batch::RecordBatch,
38};
39use nautilus_model::{
40    data::{
41        Data, IndexPriceUpdate, MarkPriceUpdate, bar::Bar, close::InstrumentClose,
42        delta::OrderBookDelta, depth::OrderBookDepth10, quote::QuoteTick, trade::TradeTick,
43    },
44    types::{price::PriceRaw, quantity::QuantityRaw},
45};
46#[cfg(feature = "python")]
47use pyo3::prelude::*;
48
// Define metadata key constants
/// Metadata key carrying the bar type for encoded `Bar` batches.
const KEY_BAR_TYPE: &str = "bar_type";
/// Metadata key carrying the instrument ID for encoded batches.
pub const KEY_INSTRUMENT_ID: &str = "instrument_id";
/// Metadata key carrying the price decimal precision.
const KEY_PRICE_PRECISION: &str = "price_precision";
/// Metadata key carrying the size (quantity) decimal precision.
const KEY_SIZE_PRECISION: &str = "size_precision";
54
/// Errors that can occur while streaming Arrow record batches to an output.
#[derive(thiserror::Error, Debug)]
pub enum DataStreamingError {
    /// An underlying I/O operation failed.
    #[error("I/O error: {0}")]
    IoError(#[from] io::Error),
    /// Arrow-level encoding or stream writing failed.
    #[error("Arrow error: {0}")]
    ArrowError(#[from] arrow::error::ArrowError),
    /// An error raised from the Python interpreter (only with the `python` feature).
    #[cfg(feature = "python")]
    #[error("Python error: {0}")]
    PythonError(#[from] PyErr),
}
65
/// Errors that can occur while encoding or decoding Arrow record batches.
#[derive(thiserror::Error, Debug)]
pub enum EncodingError {
    /// The input slice of data to encode was empty.
    #[error("Empty data")]
    EmptyData,
    /// A required key was absent from the schema metadata map.
    #[error("Missing metadata key: `{0}`")]
    MissingMetadata(&'static str),
    /// The named column was not present at the expected index.
    #[error("Missing data column: `{0}` at index {1}")]
    MissingColumn(&'static str, usize),
    /// A value could not be parsed from its serialized representation.
    #[error("Error parsing `{0}`: {1}")]
    ParseError(&'static str, String),
    /// A column existed but had a different Arrow data type than expected
    /// (key, index, expected type, actual type).
    #[error("Invalid column type `{0}` at index {1}: expected {2}, found {3}")]
    InvalidColumnType(&'static str, usize, DataType, DataType),
    /// An error propagated from the Arrow library.
    #[error("Arrow error: {0}")]
    ArrowError(#[from] arrow::error::ArrowError),
}
81
82#[inline]
83fn get_raw_price(bytes: &[u8]) -> PriceRaw {
84    PriceRaw::from_le_bytes(bytes.try_into().unwrap())
85}
86
87#[inline]
88fn get_raw_quantity(bytes: &[u8]) -> QuantityRaw {
89    QuantityRaw::from_le_bytes(bytes.try_into().unwrap())
90}
91
92pub trait ArrowSchemaProvider {
93    fn get_schema(metadata: Option<HashMap<String, String>>) -> Schema;
94
95    #[must_use]
96    fn get_schema_map() -> HashMap<String, String> {
97        let schema = Self::get_schema(None);
98        let mut map = HashMap::new();
99        for field in schema.fields() {
100            let name = field.name().to_string();
101            let data_type = format!("{:?}", field.data_type());
102            map.insert(name, data_type);
103        }
104        map
105    }
106}
107
108pub trait EncodeToRecordBatch
109where
110    Self: Sized + ArrowSchemaProvider,
111{
112    /// Encodes a batch of values into an Arrow `RecordBatch` using the provided metadata.
113    ///
114    /// # Errors
115    ///
116    /// Returns an `ArrowError` if the encoding fails.
117    fn encode_batch(
118        metadata: &HashMap<String, String>,
119        data: &[Self],
120    ) -> Result<RecordBatch, ArrowError>;
121
122    fn metadata(&self) -> HashMap<String, String>;
123
124    /// Returns the metadata for the first element in a chunk.
125    ///
126    /// # Panics
127    ///
128    /// Panics if `chunk` is empty.
129    fn chunk_metadata(chunk: &[Self]) -> HashMap<String, String> {
130        chunk
131            .first()
132            .map(|elem| elem.metadata())
133            .expect("Chunk must have atleast one element to encode")
134    }
135}
136
/// Decodes Arrow record batches into typed data values.
pub trait DecodeFromRecordBatch
where
    Self: Sized + Into<Data> + ArrowSchemaProvider,
{
    /// Decodes a `RecordBatch` into a vector of values of the implementing type, using the provided metadata.
    ///
    /// # Errors
    ///
    /// Returns an `EncodingError` if the decoding fails.
    fn decode_batch(
        metadata: &HashMap<String, String>,
        record_batch: RecordBatch,
    ) -> Result<Vec<Self>, EncodingError>;
}
151
/// Decodes Arrow record batches into type-erased `Data` values.
pub trait DecodeDataFromRecordBatch
where
    Self: Sized + Into<Data> + ArrowSchemaProvider,
{
    /// Decodes a `RecordBatch` into raw `Data` values, using the provided metadata.
    ///
    /// # Errors
    ///
    /// Returns an `EncodingError` if the decoding fails.
    fn decode_data_batch(
        metadata: &HashMap<String, String>,
        record_batch: RecordBatch,
    ) -> Result<Vec<Data>, EncodingError>;
}
166
/// A sink to which Arrow record batches can be written.
pub trait WriteStream {
    /// Writes a `RecordBatch` to the implementing output stream.
    ///
    /// # Errors
    ///
    /// Returns a `DataStreamingError` if writing or finishing the stream fails.
    fn write(&mut self, record_batch: &RecordBatch) -> Result<(), DataStreamingError>;
}
175
// Blanket implementation: any type that can both encode record batches and act as a
// byte sink (`Write`) can serve as a `WriteStream`.
impl<T: EncodeToRecordBatch + Write> WriteStream for T {
    fn write(&mut self, record_batch: &RecordBatch) -> Result<(), DataStreamingError> {
        // NOTE(review): a fresh `StreamWriter` is created per call, so every call emits a
        // complete Arrow IPC stream (schema header, batch, end-of-stream marker). Confirm
        // this is intended when writing multiple batches to the same sink.
        let mut writer = StreamWriter::try_new(self, &record_batch.schema())?;
        writer.write(record_batch)?;
        writer.finish()?;
        Ok(())
    }
}
184
185/// Extracts and downcasts the specified `column_key` column from an Arrow array slice.
186///
187/// # Errors
188///
189/// Returns an error if:
190/// - `column_index` is out of range: `EncodingError::MissingColumn`.
191/// - The column type does not match `expected_type`: `EncodingError::InvalidColumnType`.
192pub fn extract_column<'a, T: Array + 'static>(
193    cols: &'a [ArrayRef],
194    column_key: &'static str,
195    column_index: usize,
196    expected_type: DataType,
197) -> Result<&'a T, EncodingError> {
198    let column_values = cols
199        .get(column_index)
200        .ok_or(EncodingError::MissingColumn(column_key, column_index))?;
201    let downcasted_values =
202        column_values
203            .as_any()
204            .downcast_ref::<T>()
205            .ok_or(EncodingError::InvalidColumnType(
206                column_key,
207                column_index,
208                expected_type,
209                column_values.data_type().clone(),
210            ))?;
211    Ok(downcasted_values)
212}
213
214/// Converts a vector of `OrderBookDelta` into an Arrow `RecordBatch`.
215///
216/// # Errors
217///
218/// Returns an error if:
219/// - `data` is empty: `EncodingError::EmptyData`.
220/// - Encoding fails: `EncodingError::ArrowError`.
221pub fn book_deltas_to_arrow_record_batch_bytes(
222    data: Vec<OrderBookDelta>,
223) -> Result<RecordBatch, EncodingError> {
224    if data.is_empty() {
225        return Err(EncodingError::EmptyData);
226    }
227
228    // Extract metadata from chunk
229    let metadata = OrderBookDelta::chunk_metadata(&data);
230    OrderBookDelta::encode_batch(&metadata, &data).map_err(EncodingError::ArrowError)
231}
232
233/// Converts a vector of `OrderBookDepth10` into an Arrow `RecordBatch`.
234///
235/// # Errors
236///
237/// Returns an error if:
238/// - `data` is empty: `EncodingError::EmptyData`.
239/// - Encoding fails: `EncodingError::ArrowError`.
240///
241/// # Panics
242///
243/// Panics if `data` is empty (after the explicit empty check, unwrap is safe).
244pub fn book_depth10_to_arrow_record_batch_bytes(
245    data: Vec<OrderBookDepth10>,
246) -> Result<RecordBatch, EncodingError> {
247    if data.is_empty() {
248        return Err(EncodingError::EmptyData);
249    }
250
251    // Take first element and extract metadata
252    // SAFETY: Unwrap safe as already checked that `data` not empty
253    let first = data.first().unwrap();
254    let metadata = first.metadata();
255    OrderBookDepth10::encode_batch(&metadata, &data).map_err(EncodingError::ArrowError)
256}
257
258/// Converts a vector of `QuoteTick` into an Arrow `RecordBatch`.
259///
260/// # Errors
261///
262/// Returns an error if:
263/// - `data` is empty: `EncodingError::EmptyData`.
264/// - Encoding fails: `EncodingError::ArrowError`.
265///
266/// # Panics
267///
268/// Panics if `data` is empty (after the explicit empty check, unwrap is safe).
269pub fn quotes_to_arrow_record_batch_bytes(
270    data: Vec<QuoteTick>,
271) -> Result<RecordBatch, EncodingError> {
272    if data.is_empty() {
273        return Err(EncodingError::EmptyData);
274    }
275
276    // Take first element and extract metadata
277    // SAFETY: Unwrap safe as already checked that `data` not empty
278    let first = data.first().unwrap();
279    let metadata = first.metadata();
280    QuoteTick::encode_batch(&metadata, &data).map_err(EncodingError::ArrowError)
281}
282
283/// Converts a vector of `TradeTick` into an Arrow `RecordBatch`.
284///
285/// # Errors
286///
287/// Returns an error if:
288/// - `data` is empty: `EncodingError::EmptyData`.
289/// - Encoding fails: `EncodingError::ArrowError`.
290///
291/// # Panics
292///
293/// Panics if `data` is empty (after the explicit empty check, unwrap is safe).
294pub fn trades_to_arrow_record_batch_bytes(
295    data: Vec<TradeTick>,
296) -> Result<RecordBatch, EncodingError> {
297    if data.is_empty() {
298        return Err(EncodingError::EmptyData);
299    }
300
301    // Take first element and extract metadata
302    // SAFETY: Unwrap safe as already checked that `data` not empty
303    let first = data.first().unwrap();
304    let metadata = first.metadata();
305    TradeTick::encode_batch(&metadata, &data).map_err(EncodingError::ArrowError)
306}
307
308/// Converts a vector of `Bar` into an Arrow `RecordBatch`.
309///
310/// # Errors
311///
312/// Returns an error if:
313/// - `data` is empty: `EncodingError::EmptyData`.
314/// - Encoding fails: `EncodingError::ArrowError`.
315///
316/// # Panics
317///
318/// Panics if `data` is empty (after the explicit empty check, unwrap is safe).
319pub fn bars_to_arrow_record_batch_bytes(data: Vec<Bar>) -> Result<RecordBatch, EncodingError> {
320    if data.is_empty() {
321        return Err(EncodingError::EmptyData);
322    }
323
324    // Take first element and extract metadata
325    // SAFETY: Unwrap safe as already checked that `data` not empty
326    let first = data.first().unwrap();
327    let metadata = first.metadata();
328    Bar::encode_batch(&metadata, &data).map_err(EncodingError::ArrowError)
329}
330
331/// Converts a vector of `MarkPriceUpdate` into an Arrow `RecordBatch`.
332///
333/// # Errors
334///
335/// Returns an error if:
336/// - `data` is empty: `EncodingError::EmptyData`.
337/// - Encoding fails: `EncodingError::ArrowError`.
338///
339/// # Panics
340///
341/// Panics if `data` is empty (after the explicit empty check, unwrap is safe).
342pub fn mark_prices_to_arrow_record_batch_bytes(
343    data: Vec<MarkPriceUpdate>,
344) -> Result<RecordBatch, EncodingError> {
345    if data.is_empty() {
346        return Err(EncodingError::EmptyData);
347    }
348
349    // Take first element and extract metadata
350    // SAFETY: Unwrap safe as already checked that `data` not empty
351    let first = data.first().unwrap();
352    let metadata = first.metadata();
353    MarkPriceUpdate::encode_batch(&metadata, &data).map_err(EncodingError::ArrowError)
354}
355
356/// Converts a vector of `IndexPriceUpdate` into an Arrow `RecordBatch`.
357///
358/// # Errors
359///
360/// Returns an error if:
361/// - `data` is empty: `EncodingError::EmptyData`.
362/// - Encoding fails: `EncodingError::ArrowError`.
363///
364/// # Panics
365///
366/// Panics if `data` is empty (after the explicit empty check, unwrap is safe).
367pub fn index_prices_to_arrow_record_batch_bytes(
368    data: Vec<IndexPriceUpdate>,
369) -> Result<RecordBatch, EncodingError> {
370    if data.is_empty() {
371        return Err(EncodingError::EmptyData);
372    }
373
374    // Take first element and extract metadata
375    // SAFETY: Unwrap safe as already checked that `data` not empty
376    let first = data.first().unwrap();
377    let metadata = first.metadata();
378    IndexPriceUpdate::encode_batch(&metadata, &data).map_err(EncodingError::ArrowError)
379}
380
381/// Converts a vector of `InstrumentClose` into an Arrow `RecordBatch`.
382///
383/// # Errors
384///
385/// Returns an error if:
386/// - `data` is empty: `EncodingError::EmptyData`.
387/// - Encoding fails: `EncodingError::ArrowError`.
388///
389/// # Panics
390///
391/// Panics if `data` is empty (after the explicit empty check, unwrap is safe).
392pub fn instrument_closes_to_arrow_record_batch_bytes(
393    data: Vec<InstrumentClose>,
394) -> Result<RecordBatch, EncodingError> {
395    if data.is_empty() {
396        return Err(EncodingError::EmptyData);
397    }
398
399    // Take first element and extract metadata
400    // SAFETY: Unwrap safe as already checked that `data` not empty
401    let first = data.first().unwrap();
402    let metadata = first.metadata();
403    InstrumentClose::encode_batch(&metadata, &data).map_err(EncodingError::ArrowError)
404}