nautilus_serialization/python/
arrow.rs

1// -------------------------------------------------------------------------------------------------
2//  Copyright (C) 2015-2025 Posei Systems Pty Ltd. All rights reserved.
3//  https://poseitrader.io
4//
5//  Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
6//  You may not use this file except in compliance with the License.
7//  You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
8//
9//  Unless required by applicable law or agreed to in writing, software
10//  distributed under the License is distributed on an "AS IS" BASIS,
11//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12//  See the License for the specific language governing permissions and
13//  limitations under the License.
14// -------------------------------------------------------------------------------------------------
15
16use std::io::Cursor;
17
18use arrow::{ipc::writer::StreamWriter, record_batch::RecordBatch};
19use nautilus_core::python::to_pyvalue_err;
20use nautilus_model::{
21    data::{
22        Bar, IndexPriceUpdate, MarkPriceUpdate, OrderBookDelta, OrderBookDepth10, QuoteTick,
23        TradeTick, close::InstrumentClose,
24    },
25    python::data::{
26        pyobjects_to_bars, pyobjects_to_book_deltas, pyobjects_to_index_prices,
27        pyobjects_to_mark_prices, pyobjects_to_quotes, pyobjects_to_trades,
28    },
29};
30use pyo3::{
31    conversion::IntoPyObjectExt,
32    exceptions::{PyRuntimeError, PyTypeError, PyValueError},
33    prelude::*,
34    types::{PyBytes, PyType},
35};
36
37use crate::arrow::{
38    ArrowSchemaProvider, bars_to_arrow_record_batch_bytes, book_deltas_to_arrow_record_batch_bytes,
39    book_depth10_to_arrow_record_batch_bytes, index_prices_to_arrow_record_batch_bytes,
40    instrument_closes_to_arrow_record_batch_bytes, mark_prices_to_arrow_record_batch_bytes,
41    quotes_to_arrow_record_batch_bytes, trades_to_arrow_record_batch_bytes,
42};
43
44/// Transforms the given record `batches` into Python `bytes`.
45fn arrow_record_batch_to_pybytes(py: Python, batch: RecordBatch) -> PyResult<Py<PyBytes>> {
46    // Create a cursor to write to a byte array in memory
47    let mut cursor = Cursor::new(Vec::new());
48    {
49        let mut writer = StreamWriter::try_new(&mut cursor, &batch.schema())
50            .map_err(|e| PyRuntimeError::new_err(format!("{e}")))?;
51
52        writer
53            .write(&batch)
54            .map_err(|e| PyRuntimeError::new_err(format!("{e}")))?;
55
56        writer
57            .finish()
58            .map_err(|e| PyRuntimeError::new_err(format!("{e}")))?;
59    }
60
61    let buffer = cursor.into_inner();
62    let pybytes = PyBytes::new(py, &buffer);
63
64    Ok(pybytes.into())
65}
66
67/// Returns a mapping from field names to Arrow data types for the given Rust data class.
68///
69/// # Errors
70///
71/// Returns a `PyErr` if the class name is not recognized or schema extraction fails.
72#[pyfunction]
73pub fn get_arrow_schema_map(py: Python<'_>, cls: &Bound<'_, PyType>) -> PyResult<Py<PyAny>> {
74    let cls_str: String = cls.getattr("__name__")?.extract()?;
75    let result_map = match cls_str.as_str() {
76        stringify!(OrderBookDelta) => OrderBookDelta::get_schema_map(),
77        stringify!(OrderBookDepth10) => OrderBookDepth10::get_schema_map(),
78        stringify!(QuoteTick) => QuoteTick::get_schema_map(),
79        stringify!(TradeTick) => TradeTick::get_schema_map(),
80        stringify!(Bar) => Bar::get_schema_map(),
81        stringify!(MarkPriceUpdate) => MarkPriceUpdate::get_schema_map(),
82        stringify!(IndexPriceUpdate) => IndexPriceUpdate::get_schema_map(),
83        _ => {
84            return Err(PyTypeError::new_err(format!(
85                "Arrow schema for `{cls_str}` is not currently implemented in Rust."
86            )));
87        }
88    };
89
90    result_map.into_py_any(py)
91}
92
93/// Returns Python `bytes` from the given list of legacy data objects, which can be passed
94/// to `pa.ipc.open_stream` to create a `RecordBatchReader`.
95///
96/// # Errors
97///
98/// Returns an error if:
99/// - The input list is empty: `PyErr`.
100/// - An unsupported data type is encountered or conversion fails: `PyErr`.
101///
102/// # Panics
103///
104/// Panics if `data.first()` returns `None` (should not occur due to emptiness check).
105#[pyfunction]
106pub fn pyobjects_to_arrow_record_batch_bytes(
107    py: Python,
108    data: Vec<Bound<'_, PyAny>>,
109) -> PyResult<Py<PyBytes>> {
110    if data.is_empty() {
111        return Err(to_pyvalue_err("Empty data"));
112    }
113
114    let data_type: String = data
115        .first()
116        .unwrap() // SAFETY: Unwrap safe as already checked that `data` not empty
117        .getattr("__class__")?
118        .getattr("__name__")?
119        .extract()?;
120
121    match data_type.as_str() {
122        stringify!(OrderBookDelta) => {
123            let deltas = pyobjects_to_book_deltas(data)?;
124            py_book_deltas_to_arrow_record_batch_bytes(py, deltas)
125        }
126        stringify!(QuoteTick) => {
127            let quotes = pyobjects_to_quotes(data)?;
128            py_quotes_to_arrow_record_batch_bytes(py, quotes)
129        }
130        stringify!(TradeTick) => {
131            let trades = pyobjects_to_trades(data)?;
132            py_trades_to_arrow_record_batch_bytes(py, trades)
133        }
134        stringify!(Bar) => {
135            let bars = pyobjects_to_bars(data)?;
136            py_bars_to_arrow_record_batch_bytes(py, bars)
137        }
138        stringify!(MarkPriceUpdate) => {
139            let updates = pyobjects_to_mark_prices(data)?;
140            py_mark_prices_to_arrow_record_batch_bytes(py, updates)
141        }
142        stringify!(IndexPriceUpdate) => {
143            let index_prices = pyobjects_to_index_prices(data)?;
144            py_index_prices_to_arrow_record_batch_bytes(py, index_prices)
145        }
146        stringify!(InstrumentClose) => {
147            let closes = pyobjects_to_index_prices(data)?;
148            py_index_prices_to_arrow_record_batch_bytes(py, closes)
149        }
150        _ => Err(PyValueError::new_err(format!(
151            "unsupported data type: {data_type}"
152        ))),
153    }
154}
155
156/// Converts a list of `OrderBookDelta` into Arrow IPC bytes for Python.
157///
158/// # Errors
159///
160/// Returns a `PyErr` if encoding fails.
161#[pyfunction(name = "book_deltas_to_arrow_record_batch_bytes")]
162pub fn py_book_deltas_to_arrow_record_batch_bytes(
163    py: Python,
164    data: Vec<OrderBookDelta>,
165) -> PyResult<Py<PyBytes>> {
166    match book_deltas_to_arrow_record_batch_bytes(data) {
167        Ok(batch) => arrow_record_batch_to_pybytes(py, batch),
168        Err(e) => Err(to_pyvalue_err(e)),
169    }
170}
171
172/// Converts a list of `OrderBookDepth10` into Arrow IPC bytes for Python.
173///
174/// # Errors
175///
176/// Returns a `PyErr` if encoding fails.
177#[pyfunction(name = "book_depth10_to_arrow_record_batch_bytes")]
178pub fn py_book_depth10_to_arrow_record_batch_bytes(
179    py: Python,
180    data: Vec<OrderBookDepth10>,
181) -> PyResult<Py<PyBytes>> {
182    match book_depth10_to_arrow_record_batch_bytes(data) {
183        Ok(batch) => arrow_record_batch_to_pybytes(py, batch),
184        Err(e) => Err(to_pyvalue_err(e)),
185    }
186}
187
188/// Converts a list of `QuoteTick` into Arrow IPC bytes for Python.
189///
190/// # Errors
191///
192/// Returns a `PyErr` if encoding fails.
193#[pyfunction(name = "quotes_to_arrow_record_batch_bytes")]
194pub fn py_quotes_to_arrow_record_batch_bytes(
195    py: Python,
196    data: Vec<QuoteTick>,
197) -> PyResult<Py<PyBytes>> {
198    match quotes_to_arrow_record_batch_bytes(data) {
199        Ok(batch) => arrow_record_batch_to_pybytes(py, batch),
200        Err(e) => Err(to_pyvalue_err(e)),
201    }
202}
203
204/// Converts a list of `TradeTick` into Arrow IPC bytes for Python.
205///
206/// # Errors
207///
208/// Returns a `PyErr` if encoding fails.
209#[pyfunction(name = "trades_to_arrow_record_batch_bytes")]
210pub fn py_trades_to_arrow_record_batch_bytes(
211    py: Python,
212    data: Vec<TradeTick>,
213) -> PyResult<Py<PyBytes>> {
214    match trades_to_arrow_record_batch_bytes(data) {
215        Ok(batch) => arrow_record_batch_to_pybytes(py, batch),
216        Err(e) => Err(to_pyvalue_err(e)),
217    }
218}
219
220/// Converts a list of `Bar` into Arrow IPC bytes for Python.
221///
222/// # Errors
223///
224/// Returns a `PyErr` if encoding fails.
225#[pyfunction(name = "bars_to_arrow_record_batch_bytes")]
226pub fn py_bars_to_arrow_record_batch_bytes(py: Python, data: Vec<Bar>) -> PyResult<Py<PyBytes>> {
227    match bars_to_arrow_record_batch_bytes(data) {
228        Ok(batch) => arrow_record_batch_to_pybytes(py, batch),
229        Err(e) => Err(to_pyvalue_err(e)),
230    }
231}
232
233/// Converts a list of `MarkPriceUpdate` into Arrow IPC bytes for Python.
234///
235/// # Errors
236///
237/// Returns a `PyErr` if encoding fails.
238#[pyfunction(name = "mark_prices_to_arrow_record_batch_bytes")]
239pub fn py_mark_prices_to_arrow_record_batch_bytes(
240    py: Python,
241    data: Vec<MarkPriceUpdate>,
242) -> PyResult<Py<PyBytes>> {
243    match mark_prices_to_arrow_record_batch_bytes(data) {
244        Ok(batch) => arrow_record_batch_to_pybytes(py, batch),
245        Err(e) => Err(to_pyvalue_err(e)),
246    }
247}
248
249/// Converts a list of `IndexPriceUpdate` into Arrow IPC bytes for Python.
250///
251/// # Errors
252///
253/// Returns a `PyErr` if encoding fails.
254#[pyfunction(name = "index_prices_to_arrow_record_batch_bytes")]
255pub fn py_index_prices_to_arrow_record_batch_bytes(
256    py: Python,
257    data: Vec<IndexPriceUpdate>,
258) -> PyResult<Py<PyBytes>> {
259    match index_prices_to_arrow_record_batch_bytes(data) {
260        Ok(batch) => arrow_record_batch_to_pybytes(py, batch),
261        Err(e) => Err(to_pyvalue_err(e)),
262    }
263}
264
265/// Converts a list of `InstrumentClose` into Arrow IPC bytes for Python.
266///
267/// # Errors
268///
269/// Returns a `PyErr` if encoding fails.
270#[pyfunction(name = "instrument_closes_to_arrow_record_batch_bytes")]
271pub fn py_instrument_closes_to_arrow_record_batch_bytes(
272    py: Python,
273    data: Vec<InstrumentClose>,
274) -> PyResult<Py<PyBytes>> {
275    match instrument_closes_to_arrow_record_batch_bytes(data) {
276        Ok(batch) => arrow_record_batch_to_pybytes(py, batch),
277        Err(e) => Err(to_pyvalue_err(e)),
278    }
279}