nautilus_persistence/backend/session.rs

use std::{collections::HashMap, sync::Arc, vec::IntoIter};

use compare::Compare;
use datafusion::{
    error::Result, logical_expr::expr::Sort, physical_plan::SendableRecordBatchStream, prelude::*,
};
use futures::StreamExt;
use nautilus_core::{UnixNanos, ffi::cvec::CVec};
use nautilus_model::data::{Data, HasTsInit};
use nautilus_serialization::arrow::{
    DataStreamingError, DecodeDataFromRecordBatch, EncodeToRecordBatch, WriteStream,
};
use object_store::ObjectStore;
use url::Url;

use super::kmerge_batch::{EagerStream, ElementBatchIter, KMerge};

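/// Compares two batch-iterator heads by the `ts_init` of their current items.
///
/// The comparison is reversed so that `KMerge`'s max-heap pops the element
/// with the *smallest* `ts_init` first, producing an ascending merged stream.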
#[derive(Debug, Default)]
pub struct TsInitComparator;

impl<I> Compare<ElementBatchIter<I, Data>> for TsInitComparator
where
    I: Iterator<Item = IntoIter<Data>>,
{
    fn compare(
        &self,
        l: &ElementBatchIter<I, Data>,
        r: &ElementBatchIter<I, Data>,
    ) -> std::cmp::Ordering {
        // Max-heap ordering must be reversed for ascending output
        l.item.ts_init().cmp(&r.item.ts_init()).reverse()
    }
}

/// The merged output of a session query: a k-way merge over per-file batch
/// streams, yielding `Data` in ascending `ts_init` order.
pub type QueryResult = KMerge<EagerStream<IntoIter<Data>>, Data, TsInitComparator>;

/// A DataFusion-backed session for registering Parquet data sources and
/// querying them as a single stream.
///
/// Each registered file is queried for record batches, decoded into `Data`,
/// and k-merged with the other streams so results come out in ascending
/// `ts_init` order.
#[cfg_attr(
    feature = "python",
    pyo3::pyclass(module = "posei_trader.core.nautilus_pyo3.persistence")
)]
pub struct DataBackendSession {
    pub chunk_size: usize,
    pub runtime: Arc<tokio::runtime::Runtime>,
    session_ctx: SessionContext,
    batch_streams: Vec<EagerStream<IntoIter<Data>>>,
}

impl DataBackendSession {
    /// Creates a new [`DataBackendSession`] with the given `chunk_size`.
    #[must_use]
    pub fn new(chunk_size: usize) -> Self {
        let runtime = tokio::runtime::Builder::new_multi_thread()
            .enable_all()
            .build()
            .unwrap();
        // Disable file-scan repartitioning and prefer existing sort order so
        // the per-file `ts_init` ordering survives the query plan
        let session_cfg = SessionConfig::new()
            .set_str("datafusion.optimizer.repartition_file_scans", "false")
            .set_str("datafusion.optimizer.prefer_existing_sort", "true");
        let session_ctx = SessionContext::new_with_config(session_cfg);
        Self {
            session_ctx,
            batch_streams: Vec::default(),
            chunk_size,
            runtime: Arc::new(runtime),
        }
    }

    /// Registers an object store with the session's DataFusion context.
    pub fn register_object_store(&mut self, url: &Url, object_store: Arc<dyn ObjectStore>) {
        self.session_ctx.register_object_store(url, object_store);
    }

    /// Creates an object store from `uri` (with optional `storage_options`)
    /// and registers it for remote schemes; local paths need no registration.
    pub fn register_object_store_from_uri(
        &mut self,
        uri: &str,
        storage_options: Option<HashMap<String, String>>,
    ) -> anyhow::Result<()> {
        let (object_store, _, _) =
            crate::parquet::create_object_store_from_path(uri, storage_options)?;

        let parsed_uri = Url::parse(uri)?;

        // Register under the scheme and host so DataFusion can resolve any
        // path beneath that base URL
        if matches!(
            parsed_uri.scheme(),
            "s3" | "gs" | "gcs" | "azure" | "abfs" | "http" | "https"
        ) {
            let base_url = format!(
                "{}://{}",
                parsed_uri.scheme(),
                parsed_uri.host_str().unwrap_or("")
            );
            let base_parsed_url = Url::parse(&base_url)?;
            self.register_object_store(&base_parsed_url, object_store);
        }

        Ok(())
    }

    /// Encodes `data` into a single Arrow record batch tagged with `metadata`
    /// and writes it to `stream`.
    pub fn write_data<T: EncodeToRecordBatch>(
        data: &[T],
        metadata: &HashMap<String, String>,
        stream: &mut dyn WriteStream,
    ) -> Result<(), DataStreamingError> {
        let record_batch = T::encode_batch(metadata, data)?;
        stream.write(&record_batch)?;
        Ok(())
    }
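
    // A usage sketch for `write_data` (the variable names and the concrete
    // `WriteStream` implementation are assumptions for illustration):
    //
    //     DataBackendSession::write_data(&quote_ticks, &metadata, &mut writer)?;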

    /// Registers the Parquet file at `file_path` as `table_name` and queries
    /// it for records of type `T`, pushing the decoded stream onto the
    /// session.
    ///
    /// If `sql_query` is `None`, a default `SELECT * FROM <table_name> ORDER
    /// BY ts_init` is used. Custom queries should preserve `ts_init` ordering,
    /// since the k-merge assumes each input stream is sorted.
    pub fn add_file<T>(
        &mut self,
        table_name: &str,
        file_path: &str,
        sql_query: Option<&str>,
    ) -> Result<()>
    where
        T: DecodeDataFromRecordBatch + Into<Data>,
    {
        // Declare the file's existing `ts_init` sort order so DataFusion can
        // skip re-sorting
        let parquet_options = ParquetReadOptions::<'_> {
            skip_metadata: Some(false),
            file_sort_order: vec![vec![Sort {
                expr: col("ts_init"),
                asc: true,
                nulls_first: false,
            }]],
            ..Default::default()
        };
        self.runtime.block_on(self.session_ctx.register_parquet(
            table_name,
            file_path,
            parquet_options,
        ))?;

        let default_query = format!("SELECT * FROM {table_name} ORDER BY ts_init");
        let sql_query = sql_query.unwrap_or(&default_query);
        let query = self.runtime.block_on(self.session_ctx.sql(sql_query))?;

        let batch_stream = self.runtime.block_on(query.execute_stream())?;

        self.add_batch_stream::<T>(batch_stream);
        Ok(())
    }
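
    // For example (hypothetical table and file names), a registration with a
    // custom filtered query:
    //
    //     session.add_file::<TradeTick>(
    //         "trades",
    //         "/path/to/trades.parquet",
    //         Some("SELECT * FROM trades WHERE instrument_id = 'AAPL.XNAS' ORDER BY ts_init"),
    //     )?;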

    /// Decodes each record batch from `stream` into `Data` iterators and
    /// pushes the resulting eager stream onto the session.
    ///
    /// Panics if a batch cannot be fetched or decoded.
    fn add_batch_stream<T>(&mut self, stream: SendableRecordBatchStream)
    where
        T: DecodeDataFromRecordBatch + Into<Data>,
    {
        let transform = stream.map(|result| match result {
            Ok(batch) => T::decode_data_batch(batch.schema().metadata(), batch)
                .unwrap()
                .into_iter(),
            Err(e) => panic!("Error getting next batch from RecordBatchStream: {e}"),
        });

        self.batch_streams
            .push(EagerStream::from_stream_with_runtime(
                transform,
                self.runtime.clone(),
            ));
    }

    /// Consumes the accumulated batch streams and returns a [`QueryResult`]
    /// that k-merges them into one stream ascending by `ts_init`.
    pub fn get_query_result(&mut self) -> QueryResult {
        let mut kmerge: KMerge<_, _, _> = KMerge::new(TsInitComparator);

        self.batch_streams
            .drain(..)
            .for_each(|eager_stream| kmerge.push_iter(eager_stream));

        kmerge
    }
}
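
// An end-to-end sketch (illustrative; the element type and file path are
// assumptions): register a Parquet file of quote ticks, then drain the merged
// stream, which yields `Data` in ascending `ts_init` order across all files.
//
//     use nautilus_model::data::QuoteTick;
//
//     let mut session = DataBackendSession::new(5_000);
//     session.add_file::<QuoteTick>("quotes", "/path/to/quotes.parquet", None)?;
//     for data in session.get_query_result() {
//         // process `data`
//     }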

// SAFETY: A `DataBackendSession` is intended to be driven from one thread at
// a time (as when used from Python); this impl relies on that usage pattern
// rather than internal synchronization.
unsafe impl Send for DataBackendSession {}

/// Builds a SQL query against `table`, combining an optional `where_clause`
/// with optional `ts_init` bounds, always ordered by `ts_init`.
#[must_use]
pub fn build_query(
    table: &str,
    start: Option<UnixNanos>,
    end: Option<UnixNanos>,
    where_clause: Option<&str>,
) -> String {
    let mut conditions = Vec::new();

    if let Some(clause) = where_clause {
        conditions.push(clause.to_string());
    }

    if let Some(start_ts) = start {
        conditions.push(format!("ts_init >= {start_ts}"));
    }

    if let Some(end_ts) = end {
        conditions.push(format!("ts_init <= {end_ts}"));
    }

    let mut query = format!("SELECT * FROM {table}");

    if !conditions.is_empty() {
        query.push_str(" WHERE ");
        query.push_str(&conditions.join(" AND "));
    }

    query.push_str(" ORDER BY ts_init");

    query
}
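
// For example, with `start = UnixNanos::from(1_650_000_000_000_000_000)`:
//
//     build_query("quotes", Some(start), None, Some("instrument_id = 'AAPL.XNAS'"))
//
// produces:
//
//     SELECT * FROM quotes WHERE instrument_id = 'AAPL.XNAS' AND ts_init >= 1650000000000000000 ORDER BY ts_init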

/// Wraps a [`QueryResult`] for Python/FFI consumption, accumulating `Data`
/// into fixed-size chunks exposed as [`CVec`]s.
#[cfg_attr(
    feature = "python",
    pyo3::pyclass(module = "posei_trader.core.nautilus_pyo3.persistence", unsendable)
)]
pub struct DataQueryResult {
    pub chunk: Option<CVec>,
    pub result: QueryResult,
    pub acc: Vec<Data>,
    pub size: usize,
}

impl DataQueryResult {
    /// Creates a new [`DataQueryResult`] yielding chunks of at most `size`
    /// elements.
    #[must_use]
    pub const fn new(result: QueryResult, size: usize) -> Self {
        Self {
            chunk: None,
            result,
            acc: Vec::new(),
            size,
        }
    }

    /// Moves `data` into a [`CVec`] chunk, dropping any previously held chunk
    /// first, and returns the new chunk's raw parts.
    pub fn set_chunk(&mut self, data: Vec<Data>) -> CVec {
        self.drop_chunk();

        let chunk: CVec = data.into();
        self.chunk = Some(chunk);
        chunk
    }

    /// Reclaims the current chunk, if any, and drops it.
    ///
    /// The raw parts always originate from `set_chunk`, so rebuilding the
    /// `Vec<Data>` from them here is sound.
    pub fn drop_chunk(&mut self) {
        if let Some(CVec { ptr, len, cap }) = self.chunk.take() {
            let data: Vec<Data> =
                unsafe { Vec::from_raw_parts(ptr.cast::<Data>(), len, cap) };
            drop(data);
        }
    }
}

impl Iterator for DataQueryResult {
    type Item = Vec<Data>;

    fn next(&mut self) -> Option<Self::Item> {
        for _ in 0..self.size {
            match self.result.next() {
                Some(item) => self.acc.push(item),
                None => break,
            }
        }

        // Always returns `Some`: when the merged stream is exhausted the
        // chunk comes back empty, and callers treat an empty chunk as the end
        let mut acc: Vec<Data> = Vec::new();
        std::mem::swap(&mut acc, &mut self.acc);
        Some(acc)
    }
}
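
// Sketch of the chunked consumption pattern (illustrative): an empty chunk
// signals exhaustion, and `set_chunk` hands the chunk to a `CVec` for a
// foreign caller.
//
//     let mut result = DataQueryResult::new(session.get_query_result(), 1_000);
//     while let Some(chunk) = result.next() {
//         if chunk.is_empty() {
//             break;
//         }
//         let cvec = result.set_chunk(chunk);
//         // hand `cvec` across the FFI boundary...
//     }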

impl Drop for DataQueryResult {
    fn drop(&mut self) {
        self.drop_chunk();
        self.result.clear();
    }
}

// SAFETY: The pyclass is declared `unsendable`, so instances are expected to
// stay on a single thread; this impl relies on that usage pattern.
unsafe impl Send for DataQueryResult {}