nautilus_serialization/arrow/
mod.rs1pub mod bar;
19pub mod close;
20pub mod delta;
21pub mod depth;
22pub mod index_price;
23pub mod mark_price;
24pub mod quote;
25pub mod trade;
26
27use std::{
28 collections::HashMap,
29 io::{self, Write},
30};
31
32use arrow::{
33 array::{Array, ArrayRef},
34 datatypes::{DataType, Schema},
35 error::ArrowError,
36 ipc::writer::StreamWriter,
37 record_batch::RecordBatch,
38};
39use nautilus_model::{
40 data::{
41 Data, IndexPriceUpdate, MarkPriceUpdate, bar::Bar, close::InstrumentClose,
42 delta::OrderBookDelta, depth::OrderBookDepth10, quote::QuoteTick, trade::TradeTick,
43 },
44 types::{price::PriceRaw, quantity::QuantityRaw},
45};
46#[cfg(feature = "python")]
47use pyo3::prelude::*;
48
/// Metadata key under which the bar type is stored.
const KEY_BAR_TYPE: &str = "bar_type";
/// Metadata key under which the instrument ID is stored.
pub const KEY_INSTRUMENT_ID: &str = "instrument_id";
/// Metadata key under which the price precision is stored.
const KEY_PRICE_PRECISION: &str = "price_precision";
/// Metadata key under which the size precision is stored.
const KEY_SIZE_PRECISION: &str = "size_precision";
54
/// Errors that can occur while streaming record batches to a writer.
#[derive(thiserror::Error, Debug)]
pub enum DataStreamingError {
    /// An underlying I/O operation failed.
    #[error("I/O error: {0}")]
    IoError(#[from] io::Error),
    /// An Arrow encoding or writing operation failed.
    #[error("Arrow error: {0}")]
    ArrowError(#[from] arrow::error::ArrowError),
    /// A Python interop call failed (only built with the `python` feature).
    #[cfg(feature = "python")]
    #[error("Python error: {0}")]
    PythonError(#[from] PyErr),
}
65
/// Errors that can occur when encoding or decoding Arrow record batches.
#[derive(thiserror::Error, Debug)]
pub enum EncodingError {
    /// The input contained no elements to encode.
    #[error("Empty data")]
    EmptyData,
    /// A required key was absent from the batch metadata.
    #[error("Missing metadata key: `{0}`")]
    MissingMetadata(&'static str),
    /// The expected column (name, index) was not present in the batch.
    #[error("Missing data column: `{0}` at index {1}")]
    MissingColumn(&'static str, usize),
    /// A value failed to parse (field name, parse error text).
    #[error("Error parsing `{0}`: {1}")]
    ParseError(&'static str, String),
    /// A column had the wrong Arrow type (name, index, expected, actual).
    #[error("Invalid column type `{0}` at index {1}: expected {2}, found {3}")]
    InvalidColumnType(&'static str, usize, DataType, DataType),
    /// An underlying Arrow operation failed.
    #[error("Arrow error: {0}")]
    ArrowError(#[from] arrow::error::ArrowError),
}
81
82#[inline]
83fn get_raw_price(bytes: &[u8]) -> PriceRaw {
84 PriceRaw::from_le_bytes(bytes.try_into().unwrap())
85}
86
87#[inline]
88fn get_raw_quantity(bytes: &[u8]) -> QuantityRaw {
89 QuantityRaw::from_le_bytes(bytes.try_into().unwrap())
90}
91
92pub trait ArrowSchemaProvider {
94 fn get_schema(metadata: Option<HashMap<String, String>>) -> Schema;
96
97 #[must_use]
99 fn get_schema_map() -> HashMap<String, String> {
100 let schema = Self::get_schema(None);
101 let mut map = HashMap::new();
102 for field in schema.fields() {
103 let name = field.name().to_string();
104 let data_type = format!("{:?}", field.data_type());
105 map.insert(name, data_type);
106 }
107 map
108 }
109}
110
111pub trait EncodeToRecordBatch
113where
114 Self: Sized + ArrowSchemaProvider,
115{
116 fn encode_batch(
122 metadata: &HashMap<String, String>,
123 data: &[Self],
124 ) -> Result<RecordBatch, ArrowError>;
125
126 fn metadata(&self) -> HashMap<String, String>;
128
129 fn chunk_metadata(chunk: &[Self]) -> HashMap<String, String> {
135 chunk
136 .first()
137 .map(|elem| elem.metadata())
138 .expect("Chunk must have atleast one element to encode")
139 }
140}
141
/// Decodes Arrow record batches into vectors of `Self`.
pub trait DecodeFromRecordBatch
where
    Self: Sized + Into<Data> + ArrowSchemaProvider,
{
    /// Decodes `record_batch` into a vector of `Self` using `metadata`.
    ///
    /// # Errors
    ///
    /// Returns an `EncodingError` if decoding fails.
    fn decode_batch(
        metadata: &HashMap<String, String>,
        record_batch: RecordBatch,
    ) -> Result<Vec<Self>, EncodingError>;
}
157
/// Decodes Arrow record batches directly into [`Data`] values.
pub trait DecodeDataFromRecordBatch
where
    Self: Sized + Into<Data> + ArrowSchemaProvider,
{
    /// Decodes `record_batch` into a vector of [`Data`] using `metadata`.
    ///
    /// # Errors
    ///
    /// Returns an `EncodingError` if decoding fails.
    fn decode_data_batch(
        metadata: &HashMap<String, String>,
        record_batch: RecordBatch,
    ) -> Result<Vec<Data>, EncodingError>;
}
173
/// A sink to which Arrow record batches can be streamed.
pub trait WriteStream {
    /// Writes `record_batch` to the underlying stream.
    ///
    /// # Errors
    ///
    /// Returns a `DataStreamingError` if writing fails.
    fn write(&mut self, record_batch: &RecordBatch) -> Result<(), DataStreamingError>;
}
183
// Blanket implementation: any `Write` sink that also implements
// `EncodeToRecordBatch` can stream record batches in the Arrow IPC format.
// NOTE(review): the `EncodeToRecordBatch` bound is not used by the body —
// confirm whether plain `Write` would suffice before relaxing it.
impl<T: EncodeToRecordBatch + Write> WriteStream for T {
    fn write(&mut self, record_batch: &RecordBatch) -> Result<(), DataStreamingError> {
        // A fresh `StreamWriter` per call means each batch is emitted as a
        // self-contained IPC stream (schema header + batch + end-of-stream).
        let mut writer = StreamWriter::try_new(self, &record_batch.schema())?;
        writer.write(record_batch)?;
        writer.finish()?;
        Ok(())
    }
}
192
193pub fn extract_column<'a, T: Array + 'static>(
201 cols: &'a [ArrayRef],
202 column_key: &'static str,
203 column_index: usize,
204 expected_type: DataType,
205) -> Result<&'a T, EncodingError> {
206 let column_values = cols
207 .get(column_index)
208 .ok_or(EncodingError::MissingColumn(column_key, column_index))?;
209 let downcasted_values =
210 column_values
211 .as_any()
212 .downcast_ref::<T>()
213 .ok_or(EncodingError::InvalidColumnType(
214 column_key,
215 column_index,
216 expected_type,
217 column_values.data_type().clone(),
218 ))?;
219 Ok(downcasted_values)
220}
221
222pub fn book_deltas_to_arrow_record_batch_bytes(
230 data: Vec<OrderBookDelta>,
231) -> Result<RecordBatch, EncodingError> {
232 if data.is_empty() {
233 return Err(EncodingError::EmptyData);
234 }
235
236 let metadata = OrderBookDelta::chunk_metadata(&data);
238 OrderBookDelta::encode_batch(&metadata, &data).map_err(EncodingError::ArrowError)
239}
240
241pub fn book_depth10_to_arrow_record_batch_bytes(
253 data: Vec<OrderBookDepth10>,
254) -> Result<RecordBatch, EncodingError> {
255 if data.is_empty() {
256 return Err(EncodingError::EmptyData);
257 }
258
259 let first = data.first().unwrap();
262 let metadata = first.metadata();
263 OrderBookDepth10::encode_batch(&metadata, &data).map_err(EncodingError::ArrowError)
264}
265
266pub fn quotes_to_arrow_record_batch_bytes(
278 data: Vec<QuoteTick>,
279) -> Result<RecordBatch, EncodingError> {
280 if data.is_empty() {
281 return Err(EncodingError::EmptyData);
282 }
283
284 let first = data.first().unwrap();
287 let metadata = first.metadata();
288 QuoteTick::encode_batch(&metadata, &data).map_err(EncodingError::ArrowError)
289}
290
291pub fn trades_to_arrow_record_batch_bytes(
303 data: Vec<TradeTick>,
304) -> Result<RecordBatch, EncodingError> {
305 if data.is_empty() {
306 return Err(EncodingError::EmptyData);
307 }
308
309 let first = data.first().unwrap();
312 let metadata = first.metadata();
313 TradeTick::encode_batch(&metadata, &data).map_err(EncodingError::ArrowError)
314}
315
316pub fn bars_to_arrow_record_batch_bytes(data: Vec<Bar>) -> Result<RecordBatch, EncodingError> {
328 if data.is_empty() {
329 return Err(EncodingError::EmptyData);
330 }
331
332 let first = data.first().unwrap();
335 let metadata = first.metadata();
336 Bar::encode_batch(&metadata, &data).map_err(EncodingError::ArrowError)
337}
338
339pub fn mark_prices_to_arrow_record_batch_bytes(
351 data: Vec<MarkPriceUpdate>,
352) -> Result<RecordBatch, EncodingError> {
353 if data.is_empty() {
354 return Err(EncodingError::EmptyData);
355 }
356
357 let first = data.first().unwrap();
360 let metadata = first.metadata();
361 MarkPriceUpdate::encode_batch(&metadata, &data).map_err(EncodingError::ArrowError)
362}
363
364pub fn index_prices_to_arrow_record_batch_bytes(
376 data: Vec<IndexPriceUpdate>,
377) -> Result<RecordBatch, EncodingError> {
378 if data.is_empty() {
379 return Err(EncodingError::EmptyData);
380 }
381
382 let first = data.first().unwrap();
385 let metadata = first.metadata();
386 IndexPriceUpdate::encode_batch(&metadata, &data).map_err(EncodingError::ArrowError)
387}
388
389pub fn instrument_closes_to_arrow_record_batch_bytes(
401 data: Vec<InstrumentClose>,
402) -> Result<RecordBatch, EncodingError> {
403 if data.is_empty() {
404 return Err(EncodingError::EmptyData);
405 }
406
407 let first = data.first().unwrap();
410 let metadata = first.metadata();
411 InstrumentClose::encode_batch(&metadata, &data).map_err(EncodingError::ArrowError)
412}