nautilus_serialization/python/
arrow.rs1use std::io::Cursor;
17
18use arrow::{ipc::writer::StreamWriter, record_batch::RecordBatch};
19use nautilus_core::python::to_pyvalue_err;
20use nautilus_model::{
21 data::{
22 Bar, IndexPriceUpdate, MarkPriceUpdate, OrderBookDelta, OrderBookDepth10, QuoteTick,
23 TradeTick, close::InstrumentClose,
24 },
25 python::data::{
26 pyobjects_to_bars, pyobjects_to_book_deltas, pyobjects_to_index_prices,
27 pyobjects_to_mark_prices, pyobjects_to_quotes, pyobjects_to_trades,
28 },
29};
30use pyo3::{
31 conversion::IntoPyObjectExt,
32 exceptions::{PyRuntimeError, PyTypeError, PyValueError},
33 prelude::*,
34 types::{PyBytes, PyType},
35};
36
37use crate::arrow::{
38 ArrowSchemaProvider, bars_to_arrow_record_batch_bytes, book_deltas_to_arrow_record_batch_bytes,
39 book_depth10_to_arrow_record_batch_bytes, index_prices_to_arrow_record_batch_bytes,
40 instrument_closes_to_arrow_record_batch_bytes, mark_prices_to_arrow_record_batch_bytes,
41 quotes_to_arrow_record_batch_bytes, trades_to_arrow_record_batch_bytes,
42};
43
44fn arrow_record_batch_to_pybytes(py: Python, batch: RecordBatch) -> PyResult<Py<PyBytes>> {
46 let mut cursor = Cursor::new(Vec::new());
48 {
49 let mut writer = StreamWriter::try_new(&mut cursor, &batch.schema())
50 .map_err(|e| PyRuntimeError::new_err(format!("{e}")))?;
51
52 writer
53 .write(&batch)
54 .map_err(|e| PyRuntimeError::new_err(format!("{e}")))?;
55
56 writer
57 .finish()
58 .map_err(|e| PyRuntimeError::new_err(format!("{e}")))?;
59 }
60
61 let buffer = cursor.into_inner();
62 let pybytes = PyBytes::new(py, &buffer);
63
64 Ok(pybytes.into())
65}
66
67#[pyfunction]
73pub fn get_arrow_schema_map(py: Python<'_>, cls: &Bound<'_, PyType>) -> PyResult<Py<PyAny>> {
74 let cls_str: String = cls.getattr("__name__")?.extract()?;
75 let result_map = match cls_str.as_str() {
76 stringify!(OrderBookDelta) => OrderBookDelta::get_schema_map(),
77 stringify!(OrderBookDepth10) => OrderBookDepth10::get_schema_map(),
78 stringify!(QuoteTick) => QuoteTick::get_schema_map(),
79 stringify!(TradeTick) => TradeTick::get_schema_map(),
80 stringify!(Bar) => Bar::get_schema_map(),
81 stringify!(MarkPriceUpdate) => MarkPriceUpdate::get_schema_map(),
82 stringify!(IndexPriceUpdate) => IndexPriceUpdate::get_schema_map(),
83 _ => {
84 return Err(PyTypeError::new_err(format!(
85 "Arrow schema for `{cls_str}` is not currently implemented in Rust."
86 )));
87 }
88 };
89
90 result_map.into_py_any(py)
91}
92
93#[pyfunction]
106pub fn pyobjects_to_arrow_record_batch_bytes(
107 py: Python,
108 data: Vec<Bound<'_, PyAny>>,
109) -> PyResult<Py<PyBytes>> {
110 if data.is_empty() {
111 return Err(to_pyvalue_err("Empty data"));
112 }
113
114 let data_type: String = data
115 .first()
116 .unwrap() .getattr("__class__")?
118 .getattr("__name__")?
119 .extract()?;
120
121 match data_type.as_str() {
122 stringify!(OrderBookDelta) => {
123 let deltas = pyobjects_to_book_deltas(data)?;
124 py_book_deltas_to_arrow_record_batch_bytes(py, deltas)
125 }
126 stringify!(QuoteTick) => {
127 let quotes = pyobjects_to_quotes(data)?;
128 py_quotes_to_arrow_record_batch_bytes(py, quotes)
129 }
130 stringify!(TradeTick) => {
131 let trades = pyobjects_to_trades(data)?;
132 py_trades_to_arrow_record_batch_bytes(py, trades)
133 }
134 stringify!(Bar) => {
135 let bars = pyobjects_to_bars(data)?;
136 py_bars_to_arrow_record_batch_bytes(py, bars)
137 }
138 stringify!(MarkPriceUpdate) => {
139 let updates = pyobjects_to_mark_prices(data)?;
140 py_mark_prices_to_arrow_record_batch_bytes(py, updates)
141 }
142 stringify!(IndexPriceUpdate) => {
143 let index_prices = pyobjects_to_index_prices(data)?;
144 py_index_prices_to_arrow_record_batch_bytes(py, index_prices)
145 }
146 stringify!(InstrumentClose) => {
147 let closes = pyobjects_to_index_prices(data)?;
148 py_index_prices_to_arrow_record_batch_bytes(py, closes)
149 }
150 _ => Err(PyValueError::new_err(format!(
151 "unsupported data type: {data_type}"
152 ))),
153 }
154}
155
156#[pyfunction(name = "book_deltas_to_arrow_record_batch_bytes")]
162pub fn py_book_deltas_to_arrow_record_batch_bytes(
163 py: Python,
164 data: Vec<OrderBookDelta>,
165) -> PyResult<Py<PyBytes>> {
166 match book_deltas_to_arrow_record_batch_bytes(data) {
167 Ok(batch) => arrow_record_batch_to_pybytes(py, batch),
168 Err(e) => Err(to_pyvalue_err(e)),
169 }
170}
171
172#[pyfunction(name = "book_depth10_to_arrow_record_batch_bytes")]
178pub fn py_book_depth10_to_arrow_record_batch_bytes(
179 py: Python,
180 data: Vec<OrderBookDepth10>,
181) -> PyResult<Py<PyBytes>> {
182 match book_depth10_to_arrow_record_batch_bytes(data) {
183 Ok(batch) => arrow_record_batch_to_pybytes(py, batch),
184 Err(e) => Err(to_pyvalue_err(e)),
185 }
186}
187
188#[pyfunction(name = "quotes_to_arrow_record_batch_bytes")]
194pub fn py_quotes_to_arrow_record_batch_bytes(
195 py: Python,
196 data: Vec<QuoteTick>,
197) -> PyResult<Py<PyBytes>> {
198 match quotes_to_arrow_record_batch_bytes(data) {
199 Ok(batch) => arrow_record_batch_to_pybytes(py, batch),
200 Err(e) => Err(to_pyvalue_err(e)),
201 }
202}
203
204#[pyfunction(name = "trades_to_arrow_record_batch_bytes")]
210pub fn py_trades_to_arrow_record_batch_bytes(
211 py: Python,
212 data: Vec<TradeTick>,
213) -> PyResult<Py<PyBytes>> {
214 match trades_to_arrow_record_batch_bytes(data) {
215 Ok(batch) => arrow_record_batch_to_pybytes(py, batch),
216 Err(e) => Err(to_pyvalue_err(e)),
217 }
218}
219
220#[pyfunction(name = "bars_to_arrow_record_batch_bytes")]
226pub fn py_bars_to_arrow_record_batch_bytes(py: Python, data: Vec<Bar>) -> PyResult<Py<PyBytes>> {
227 match bars_to_arrow_record_batch_bytes(data) {
228 Ok(batch) => arrow_record_batch_to_pybytes(py, batch),
229 Err(e) => Err(to_pyvalue_err(e)),
230 }
231}
232
233#[pyfunction(name = "mark_prices_to_arrow_record_batch_bytes")]
239pub fn py_mark_prices_to_arrow_record_batch_bytes(
240 py: Python,
241 data: Vec<MarkPriceUpdate>,
242) -> PyResult<Py<PyBytes>> {
243 match mark_prices_to_arrow_record_batch_bytes(data) {
244 Ok(batch) => arrow_record_batch_to_pybytes(py, batch),
245 Err(e) => Err(to_pyvalue_err(e)),
246 }
247}
248
249#[pyfunction(name = "index_prices_to_arrow_record_batch_bytes")]
255pub fn py_index_prices_to_arrow_record_batch_bytes(
256 py: Python,
257 data: Vec<IndexPriceUpdate>,
258) -> PyResult<Py<PyBytes>> {
259 match index_prices_to_arrow_record_batch_bytes(data) {
260 Ok(batch) => arrow_record_batch_to_pybytes(py, batch),
261 Err(e) => Err(to_pyvalue_err(e)),
262 }
263}
264
265#[pyfunction(name = "instrument_closes_to_arrow_record_batch_bytes")]
271pub fn py_instrument_closes_to_arrow_record_batch_bytes(
272 py: Python,
273 data: Vec<InstrumentClose>,
274) -> PyResult<Py<PyBytes>> {
275 match instrument_closes_to_arrow_record_batch_bytes(data) {
276 Ok(batch) => arrow_record_batch_to_pybytes(py, batch),
277 Err(e) => Err(to_pyvalue_err(e)),
278 }
279}