// nautilus_serialization/arrow/mod.rs
pub mod bar;
pub mod close;
pub mod delta;
pub mod depth;
pub mod index_price;
pub mod mark_price;
pub mod quote;
pub mod trade;

27use std::{
28 collections::HashMap,
29 io::{self, Write},
30};
31
32use arrow::{
33 array::{Array, ArrayRef},
34 datatypes::{DataType, Schema},
35 error::ArrowError,
36 ipc::writer::StreamWriter,
37 record_batch::RecordBatch,
38};
39use nautilus_model::{
40 data::{
41 Data, IndexPriceUpdate, MarkPriceUpdate, bar::Bar, close::InstrumentClose,
42 delta::OrderBookDelta, depth::OrderBookDepth10, quote::QuoteTick, trade::TradeTick,
43 },
44 types::{price::PriceRaw, quantity::QuantityRaw},
45};
46#[cfg(feature = "python")]
47use pyo3::prelude::*;
48
49const KEY_BAR_TYPE: &str = "bar_type";
51pub const KEY_INSTRUMENT_ID: &str = "instrument_id";
52const KEY_PRICE_PRECISION: &str = "price_precision";
53const KEY_SIZE_PRECISION: &str = "size_precision";
54
55#[derive(thiserror::Error, Debug)]
56pub enum DataStreamingError {
57 #[error("I/O error: {0}")]
58 IoError(#[from] io::Error),
59 #[error("Arrow error: {0}")]
60 ArrowError(#[from] arrow::error::ArrowError),
61 #[cfg(feature = "python")]
62 #[error("Python error: {0}")]
63 PythonError(#[from] PyErr),
64}
65
66#[derive(thiserror::Error, Debug)]
67pub enum EncodingError {
68 #[error("Empty data")]
69 EmptyData,
70 #[error("Missing metadata key: `{0}`")]
71 MissingMetadata(&'static str),
72 #[error("Missing data column: `{0}` at index {1}")]
73 MissingColumn(&'static str, usize),
74 #[error("Error parsing `{0}`: {1}")]
75 ParseError(&'static str, String),
76 #[error("Invalid column type `{0}` at index {1}: expected {2}, found {3}")]
77 InvalidColumnType(&'static str, usize, DataType, DataType),
78 #[error("Arrow error: {0}")]
79 ArrowError(#[from] arrow::error::ArrowError),
80}
81
82#[inline]
83fn get_raw_price(bytes: &[u8]) -> PriceRaw {
84 PriceRaw::from_le_bytes(bytes.try_into().unwrap())
85}
86
87#[inline]
88fn get_raw_quantity(bytes: &[u8]) -> QuantityRaw {
89 QuantityRaw::from_le_bytes(bytes.try_into().unwrap())
90}
91
92pub trait ArrowSchemaProvider {
93 fn get_schema(metadata: Option<HashMap<String, String>>) -> Schema;
94
95 #[must_use]
96 fn get_schema_map() -> HashMap<String, String> {
97 let schema = Self::get_schema(None);
98 let mut map = HashMap::new();
99 for field in schema.fields() {
100 let name = field.name().to_string();
101 let data_type = format!("{:?}", field.data_type());
102 map.insert(name, data_type);
103 }
104 map
105 }
106}
107
108pub trait EncodeToRecordBatch
109where
110 Self: Sized + ArrowSchemaProvider,
111{
112 fn encode_batch(
118 metadata: &HashMap<String, String>,
119 data: &[Self],
120 ) -> Result<RecordBatch, ArrowError>;
121
122 fn metadata(&self) -> HashMap<String, String>;
123
124 fn chunk_metadata(chunk: &[Self]) -> HashMap<String, String> {
130 chunk
131 .first()
132 .map(|elem| elem.metadata())
133 .expect("Chunk must have atleast one element to encode")
134 }
135}
136
/// Decodes a typed vector of elements from an Arrow record batch.
pub trait DecodeFromRecordBatch
where
    Self: Sized + Into<Data> + ArrowSchemaProvider,
{
    /// Decodes all rows of `record_batch` into `Self` values.
    ///
    /// `metadata` presumably carries the keys declared above (instrument ID,
    /// price/size precision) needed to interpret raw columns — confirm against
    /// the per-type implementations.
    fn decode_batch(
        metadata: &HashMap<String, String>,
        record_batch: RecordBatch,
    ) -> Result<Vec<Self>, EncodingError>;
}
151
/// Decodes a record batch directly into the type-erased [`Data`] enum,
/// for callers that handle heterogeneous market data uniformly.
pub trait DecodeDataFromRecordBatch
where
    Self: Sized + Into<Data> + ArrowSchemaProvider,
{
    /// Decodes all rows of `record_batch` into [`Data`] values.
    ///
    /// `metadata` presumably carries the keys declared above (instrument ID,
    /// price/size precision) needed to interpret raw columns — confirm against
    /// the per-type implementations.
    fn decode_data_batch(
        metadata: &HashMap<String, String>,
        record_batch: RecordBatch,
    ) -> Result<Vec<Data>, EncodingError>;
}
166
/// A sink that can persist Arrow record batches.
pub trait WriteStream {
    /// Writes `record_batch` to the underlying stream.
    ///
    /// # Errors
    ///
    /// Returns a [`DataStreamingError`] if writing fails.
    fn write(&mut self, record_batch: &RecordBatch) -> Result<(), DataStreamingError>;
}
175
176impl<T: EncodeToRecordBatch + Write> WriteStream for T {
177 fn write(&mut self, record_batch: &RecordBatch) -> Result<(), DataStreamingError> {
178 let mut writer = StreamWriter::try_new(self, &record_batch.schema())?;
179 writer.write(record_batch)?;
180 writer.finish()?;
181 Ok(())
182 }
183}
184
185pub fn extract_column<'a, T: Array + 'static>(
193 cols: &'a [ArrayRef],
194 column_key: &'static str,
195 column_index: usize,
196 expected_type: DataType,
197) -> Result<&'a T, EncodingError> {
198 let column_values = cols
199 .get(column_index)
200 .ok_or(EncodingError::MissingColumn(column_key, column_index))?;
201 let downcasted_values =
202 column_values
203 .as_any()
204 .downcast_ref::<T>()
205 .ok_or(EncodingError::InvalidColumnType(
206 column_key,
207 column_index,
208 expected_type,
209 column_values.data_type().clone(),
210 ))?;
211 Ok(downcasted_values)
212}
213
214pub fn book_deltas_to_arrow_record_batch_bytes(
222 data: Vec<OrderBookDelta>,
223) -> Result<RecordBatch, EncodingError> {
224 if data.is_empty() {
225 return Err(EncodingError::EmptyData);
226 }
227
228 let metadata = OrderBookDelta::chunk_metadata(&data);
230 OrderBookDelta::encode_batch(&metadata, &data).map_err(EncodingError::ArrowError)
231}
232
233pub fn book_depth10_to_arrow_record_batch_bytes(
245 data: Vec<OrderBookDepth10>,
246) -> Result<RecordBatch, EncodingError> {
247 if data.is_empty() {
248 return Err(EncodingError::EmptyData);
249 }
250
251 let first = data.first().unwrap();
254 let metadata = first.metadata();
255 OrderBookDepth10::encode_batch(&metadata, &data).map_err(EncodingError::ArrowError)
256}
257
258pub fn quotes_to_arrow_record_batch_bytes(
270 data: Vec<QuoteTick>,
271) -> Result<RecordBatch, EncodingError> {
272 if data.is_empty() {
273 return Err(EncodingError::EmptyData);
274 }
275
276 let first = data.first().unwrap();
279 let metadata = first.metadata();
280 QuoteTick::encode_batch(&metadata, &data).map_err(EncodingError::ArrowError)
281}
282
283pub fn trades_to_arrow_record_batch_bytes(
295 data: Vec<TradeTick>,
296) -> Result<RecordBatch, EncodingError> {
297 if data.is_empty() {
298 return Err(EncodingError::EmptyData);
299 }
300
301 let first = data.first().unwrap();
304 let metadata = first.metadata();
305 TradeTick::encode_batch(&metadata, &data).map_err(EncodingError::ArrowError)
306}
307
308pub fn bars_to_arrow_record_batch_bytes(data: Vec<Bar>) -> Result<RecordBatch, EncodingError> {
320 if data.is_empty() {
321 return Err(EncodingError::EmptyData);
322 }
323
324 let first = data.first().unwrap();
327 let metadata = first.metadata();
328 Bar::encode_batch(&metadata, &data).map_err(EncodingError::ArrowError)
329}
330
331pub fn mark_prices_to_arrow_record_batch_bytes(
343 data: Vec<MarkPriceUpdate>,
344) -> Result<RecordBatch, EncodingError> {
345 if data.is_empty() {
346 return Err(EncodingError::EmptyData);
347 }
348
349 let first = data.first().unwrap();
352 let metadata = first.metadata();
353 MarkPriceUpdate::encode_batch(&metadata, &data).map_err(EncodingError::ArrowError)
354}
355
356pub fn index_prices_to_arrow_record_batch_bytes(
368 data: Vec<IndexPriceUpdate>,
369) -> Result<RecordBatch, EncodingError> {
370 if data.is_empty() {
371 return Err(EncodingError::EmptyData);
372 }
373
374 let first = data.first().unwrap();
377 let metadata = first.metadata();
378 IndexPriceUpdate::encode_batch(&metadata, &data).map_err(EncodingError::ArrowError)
379}
380
381pub fn instrument_closes_to_arrow_record_batch_bytes(
393 data: Vec<InstrumentClose>,
394) -> Result<RecordBatch, EncodingError> {
395 if data.is_empty() {
396 return Err(EncodingError::EmptyData);
397 }
398
399 let first = data.first().unwrap();
402 let metadata = first.metadata();
403 InstrumentClose::encode_batch(&metadata, &data).map_err(EncodingError::ArrowError)
404}