1use std::collections::HashMap;
17
18use nautilus_common::{custom::CustomData, signal::Signal};
19use nautilus_model::{
20 accounts::{Account, AccountAny},
21 data::{Bar, DataType, QuoteTick, TradeTick},
22 events::{
23 AccountState, OrderEvent, OrderEventAny, OrderSnapshot,
24 position::snapshot::PositionSnapshot,
25 },
26 identifiers::{AccountId, ClientId, ClientOrderId, InstrumentId, PositionId},
27 instruments::{Instrument, InstrumentAny},
28 orders::{Order, OrderAny},
29 types::{AccountBalance, Currency, MarginBalance},
30};
31use sqlx::{PgPool, Row};
32
33use super::models::{
34 orders::OrderSnapshotModel,
35 positions::PositionSnapshotModel,
36 types::{CustomDataModel, SignalModel},
37};
38use crate::sql::models::{
39 accounts::AccountEventModel,
40 data::{BarModel, QuoteTickModel, TradeTickModel},
41 enums::{
42 AggregationSourceModel, AggressorSideModel, AssetClassModel, BarAggregationModel,
43 CurrencyTypeModel, PriceTypeModel, TrailingOffsetTypeModel,
44 },
45 general::{GeneralRow, OrderEventOrderClientIdCombination},
46 instruments::InstrumentAnyModel,
47 orders::OrderEventAnyModel,
48 types::CurrencyModel,
49};
50
51#[derive(Debug)]
52pub struct DatabaseQueries;
53
54impl DatabaseQueries {
55 pub async fn truncate(pool: &PgPool) -> anyhow::Result<()> {
61 sqlx::query("SELECT truncate_all_tables()")
62 .execute(pool)
63 .await
64 .map(|_| ())
65 .map_err(|e| anyhow::anyhow!("Failed to truncate tables: {e}"))
66 }
67
68 pub async fn add(pool: &PgPool, key: String, value: Vec<u8>) -> anyhow::Result<()> {
74 sqlx::query("INSERT INTO general (id, value) VALUES ($1, $2)")
75 .bind(key)
76 .bind(value)
77 .execute(pool)
78 .await
79 .map(|_| ())
80 .map_err(|e| anyhow::anyhow!("Failed to insert into general table: {e}"))
81 }
82
83 pub async fn load(pool: &PgPool) -> anyhow::Result<HashMap<String, Vec<u8>>> {
89 sqlx::query_as::<_, GeneralRow>("SELECT * FROM general")
90 .fetch_all(pool)
91 .await
92 .map(|rows| {
93 let mut cache: HashMap<String, Vec<u8>> = HashMap::new();
94 for row in rows {
95 cache.insert(row.id, row.value);
96 }
97 cache
98 })
99 .map_err(|e| anyhow::anyhow!("Failed to load general table: {e}"))
100 }
101
102 pub async fn add_currency(pool: &PgPool, currency: Currency) -> anyhow::Result<()> {
108 sqlx::query(
109 "INSERT INTO currency (id, precision, iso4217, name, currency_type) VALUES ($1, $2, $3, $4, $5::currency_type) ON CONFLICT (id) DO NOTHING"
110 )
111 .bind(currency.code.as_str())
112 .bind(i32::from(currency.precision))
113 .bind(i32::from(currency.iso4217))
114 .bind(currency.name.as_str())
115 .bind(CurrencyTypeModel(currency.currency_type))
116 .execute(pool)
117 .await
118 .map(|_| ())
119 .map_err(|e| anyhow::anyhow!("Failed to insert into currency table: {e}"))
120 }
121
122 pub async fn load_currencies(pool: &PgPool) -> anyhow::Result<Vec<Currency>> {
128 sqlx::query_as::<_, CurrencyModel>("SELECT * FROM currency ORDER BY id ASC")
129 .fetch_all(pool)
130 .await
131 .map(|rows| rows.into_iter().map(|row| row.0).collect())
132 .map_err(|e| anyhow::anyhow!("Failed to load currencies: {e}"))
133 }
134
135 pub async fn load_currency(pool: &PgPool, code: &str) -> anyhow::Result<Option<Currency>> {
141 sqlx::query_as::<_, CurrencyModel>("SELECT * FROM currency WHERE id = $1")
142 .bind(code)
143 .fetch_optional(pool)
144 .await
145 .map(|currency| currency.map(|row| row.0))
146 .map_err(|e| anyhow::anyhow!("Failed to load currency: {e}"))
147 }
148
149 pub async fn add_instrument(
155 pool: &PgPool,
156 kind: &str,
157 instrument: Box<dyn Instrument>,
158 ) -> anyhow::Result<()> {
159 sqlx::query(r#"
160 INSERT INTO "instrument" (
161 id, kind, raw_symbol, base_currency, underlying, quote_currency, settlement_currency, isin, asset_class, exchange,
162 multiplier, option_kind, is_inverse, strike_price, activation_ns, expiration_ns, price_precision, size_precision,
163 price_increment, size_increment, maker_fee, taker_fee, margin_init, margin_maint, lot_size, max_quantity, min_quantity, max_notional,
164 min_notional, max_price, min_price, ts_init, ts_event, created_at, updated_at
165 ) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9::asset_class, $10, $11, $12, $13, $14, $15, $16, $17, $18, $19, $20, $21, $22, $23, $24, $25, $26, $27, $28, $29, $30, $31, $32, $33, CURRENT_TIMESTAMP, CURRENT_TIMESTAMP)
166 ON CONFLICT (id)
167 DO UPDATE
168 SET
169 kind = $2, raw_symbol = $3, base_currency= $4, underlying = $5, quote_currency = $6, settlement_currency = $7, isin = $8, asset_class = $9, exchange = $10,
170 multiplier = $11, option_kind = $12, is_inverse = $13, strike_price = $14, activation_ns = $15, expiration_ns = $16 , price_precision = $17, size_precision = $18,
171 price_increment = $19, size_increment = $20, maker_fee = $21, taker_fee = $22, margin_init = $23, margin_maint = $24, lot_size = $25, max_quantity = $26,
172 min_quantity = $27, max_notional = $28, min_notional = $29, max_price = $30, min_price = $31, ts_init = $32, ts_event = $33, updated_at = CURRENT_TIMESTAMP
173 "#)
174 .bind(instrument.id().to_string())
175 .bind(kind)
176 .bind(instrument.raw_symbol().to_string())
177 .bind(instrument.base_currency().map(|x| x.code.as_str()))
178 .bind(instrument.underlying().map(|x| x.to_string()))
179 .bind(instrument.quote_currency().code.as_str())
180 .bind(instrument.settlement_currency().code.as_str())
181 .bind(instrument.isin().map(|x| x.to_string()))
182 .bind(AssetClassModel(instrument.asset_class()))
183 .bind(instrument.exchange().map(|x| x.to_string()))
184 .bind(instrument.multiplier().to_string())
185 .bind(instrument.option_kind().map(|x| x.to_string()))
186 .bind(instrument.is_inverse())
187 .bind(instrument.strike_price().map(|x| x.to_string()))
188 .bind(instrument.activation_ns().map(|x| x.to_string()))
189 .bind(instrument.expiration_ns().map(|x| x.to_string()))
190 .bind(i32::from(instrument.price_precision()))
191 .bind(i32::from(instrument.size_precision()))
192 .bind(instrument.price_increment().to_string())
193 .bind(instrument.size_increment().to_string())
194 .bind(instrument.maker_fee().to_string())
195 .bind(instrument.taker_fee().to_string())
196 .bind(instrument.margin_init().to_string())
197 .bind(instrument.margin_maint().to_string())
198 .bind(instrument.lot_size().map(|x| x.to_string()))
199 .bind(instrument.max_quantity().map(|x| x.to_string()))
200 .bind(instrument.min_quantity().map(|x| x.to_string()))
201 .bind(instrument.max_notional().map(|x| x.to_string()))
202 .bind(instrument.min_notional().map(|x| x.to_string()))
203 .bind(instrument.max_price().map(|x| x.to_string()))
204 .bind(instrument.min_price().map(|x| x.to_string()))
205 .bind(instrument.ts_init().to_string())
206 .bind(instrument.ts_event().to_string())
207 .execute(pool)
208 .await
209 .map(|_| ())
210 .map_err(|e| anyhow::anyhow!(format!("Failed to insert item {} into instrument table: {:?}", instrument.id().to_string(), e)))
211 }
212
213 pub async fn load_instrument(
219 pool: &PgPool,
220 instrument_id: &InstrumentId,
221 ) -> anyhow::Result<Option<InstrumentAny>> {
222 sqlx::query_as::<_, InstrumentAnyModel>("SELECT * FROM instrument WHERE id = $1")
223 .bind(instrument_id.to_string())
224 .fetch_optional(pool)
225 .await
226 .map(|instrument| instrument.map(|row| row.0))
227 .map_err(|e| {
228 anyhow::anyhow!("Failed to load instrument with id {instrument_id},error is: {e}")
229 })
230 }
231
232 pub async fn load_instruments(pool: &PgPool) -> anyhow::Result<Vec<InstrumentAny>> {
238 sqlx::query_as::<_, InstrumentAnyModel>("SELECT * FROM instrument")
239 .fetch_all(pool)
240 .await
241 .map(|rows| rows.into_iter().map(|row| row.0).collect())
242 .map_err(|e| anyhow::anyhow!("Failed to load instruments: {e}"))
243 }
244
245 pub async fn add_order(
255 pool: &PgPool,
256 _kind: &str,
257 updated: bool,
258 order: Box<dyn Order>,
259 client_id: Option<ClientId>,
260 ) -> anyhow::Result<()> {
261 if updated {
262 let exists = Self::check_if_order_initialized_exists(pool, order.client_order_id())
263 .await
264 .unwrap();
265 assert!(
266 exists,
267 "OrderInitialized event does not exist for order: {}",
268 order.client_order_id()
269 );
270 }
271 match order.last_event().clone() {
272 OrderEventAny::Accepted(event) => {
273 Self::add_order_event(pool, Box::new(event), client_id).await
274 }
275 OrderEventAny::CancelRejected(event) => {
276 Self::add_order_event(pool, Box::new(event), client_id).await
277 }
278 OrderEventAny::Canceled(event) => {
279 Self::add_order_event(pool, Box::new(event), client_id).await
280 }
281 OrderEventAny::Denied(event) => {
282 Self::add_order_event(pool, Box::new(event), client_id).await
283 }
284 OrderEventAny::Emulated(event) => {
285 Self::add_order_event(pool, Box::new(event), client_id).await
286 }
287 OrderEventAny::Expired(event) => {
288 Self::add_order_event(pool, Box::new(event), client_id).await
289 }
290 OrderEventAny::Filled(event) => {
291 Self::add_order_event(pool, Box::new(event), client_id).await
292 }
293 OrderEventAny::Initialized(event) => {
294 Self::add_order_event(pool, Box::new(event), client_id).await
295 }
296 OrderEventAny::ModifyRejected(event) => {
297 Self::add_order_event(pool, Box::new(event), client_id).await
298 }
299 OrderEventAny::PendingCancel(event) => {
300 Self::add_order_event(pool, Box::new(event), client_id).await
301 }
302 OrderEventAny::PendingUpdate(event) => {
303 Self::add_order_event(pool, Box::new(event), client_id).await
304 }
305 OrderEventAny::Rejected(event) => {
306 Self::add_order_event(pool, Box::new(event), client_id).await
307 }
308 OrderEventAny::Released(event) => {
309 Self::add_order_event(pool, Box::new(event), client_id).await
310 }
311 OrderEventAny::Submitted(event) => {
312 Self::add_order_event(pool, Box::new(event), client_id).await
313 }
314 OrderEventAny::Updated(event) => {
315 Self::add_order_event(pool, Box::new(event), client_id).await
316 }
317 OrderEventAny::Triggered(event) => {
318 Self::add_order_event(pool, Box::new(event), client_id).await
319 }
320 }
321 }
322
323 pub async fn add_order_snapshot(pool: &PgPool, snapshot: OrderSnapshot) -> anyhow::Result<()> {
333 let mut transaction = pool.begin().await?;
334
335 sqlx::query(
338 r#"
339 INSERT INTO "trader" (id) VALUES ($1) ON CONFLICT (id) DO NOTHING
340 "#,
341 )
342 .bind(snapshot.trader_id.to_string())
343 .execute(&mut *transaction)
344 .await
345 .map(|_| ())
346 .map_err(|e| anyhow::anyhow!("Failed to insert into trader table: {e}"))?;
347
348 sqlx::query(
349 r#"
350 INSERT INTO "order" (
351 id, trader_id, strategy_id, instrument_id, client_order_id, venue_order_id, position_id,
352 account_id, last_trade_id, order_type, order_side, quantity, price, trigger_price,
353 trigger_type, limit_offset, trailing_offset, trailing_offset_type, time_in_force,
354 expire_time, filled_qty, liquidity_side, avg_px, slippage, commissions, status,
355 is_post_only, is_reduce_only, is_quote_quantity, display_qty, emulation_trigger,
356 trigger_instrument_id, contingency_type, order_list_id, linked_order_ids,
357 parent_order_id, exec_algorithm_id, exec_algorithm_params, exec_spawn_id, tags, init_id, ts_init, ts_last,
358 created_at, updated_at
359 ) VALUES (
360 $1, $2, $3, $4, $1, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14, $15, $16,
361 $17::TRAILING_OFFSET_TYPE, $18, $19, $20, $21, $22, $23, $24, $25, $26, $27, $28,
362 $29, $30, $31, $32, $33, $34, $35, $36, $37, $38, $39, $40, $41, $42,
363 CURRENT_TIMESTAMP, CURRENT_TIMESTAMP
364 )
365 ON CONFLICT (id)
366 DO UPDATE SET
367 trader_id = $2,
368 strategy_id = $3,
369 instrument_id = $4,
370 venue_order_id = $5,
371 position_id = $6,
372 account_id = $7,
373 last_trade_id = $8,
374 order_type = $9,
375 order_side = $10,
376 quantity = $11,
377 price = $12,
378 trigger_price = $13,
379 trigger_type = $14,
380 limit_offset = $15,
381 trailing_offset = $16,
382 trailing_offset_type = $17::TRAILING_OFFSET_TYPE,
383 time_in_force = $18,
384 expire_time = $19,
385 filled_qty = $20,
386 liquidity_side = $21,
387 avg_px = $22,
388 slippage = $23,
389 commissions = $24,
390 status = $25,
391 is_post_only = $26,
392 is_reduce_only = $27,
393 is_quote_quantity = $28,
394 display_qty = $29,
395 emulation_trigger = $30,
396 trigger_instrument_id = $31,
397 contingency_type = $32,
398 order_list_id = $33,
399 linked_order_ids = $34,
400 parent_order_id = $35,
401 exec_algorithm_id = $36,
402 exec_algorithm_params = $37,
403 exec_spawn_id = $38,
404 tags = $39,
405 init_id = $40,
406 ts_init = $41,
407 ts_last = $42,
408 updated_at = CURRENT_TIMESTAMP
409 "#)
410 .bind(snapshot.client_order_id.to_string()) .bind(snapshot.trader_id.to_string())
412 .bind(snapshot.strategy_id.to_string())
413 .bind(snapshot.instrument_id.to_string())
414 .bind(snapshot.venue_order_id.map(|x| x.to_string()))
415 .bind(snapshot.position_id.map(|x| x.to_string()))
416 .bind(snapshot.account_id.map(|x| x.to_string()))
417 .bind(snapshot.last_trade_id.map(|x| x.to_string()))
418 .bind(snapshot.order_type.to_string())
419 .bind(snapshot.order_side.to_string())
420 .bind(snapshot.quantity.to_string())
421 .bind(snapshot.price.map(|x| x.to_string()))
422 .bind(snapshot.trigger_price.map(|x| x.to_string()))
423 .bind(snapshot.trigger_type.map(|x| x.to_string()))
424 .bind(snapshot.limit_offset.map(|x| x.to_string()))
425 .bind(snapshot.trailing_offset.map(|x| x.to_string()))
426 .bind(snapshot.trailing_offset_type.map(|x| x.to_string()))
427 .bind(snapshot.time_in_force.to_string())
428 .bind(snapshot.expire_time.map(|x| x.to_string()))
429 .bind(snapshot.filled_qty.to_string())
430 .bind(snapshot.liquidity_side.map(|x| x.to_string()))
431 .bind(snapshot.avg_px)
432 .bind(snapshot.slippage)
433 .bind(snapshot.commissions.iter().map(ToString::to_string).collect::<Vec<String>>())
434 .bind(snapshot.status.to_string())
435 .bind(snapshot.is_post_only)
436 .bind(snapshot.is_reduce_only)
437 .bind(snapshot.is_quote_quantity)
438 .bind(snapshot.display_qty.map(|x| x.to_string()))
439 .bind(snapshot.emulation_trigger.map(|x| x.to_string()))
440 .bind(snapshot.trigger_instrument_id.map(|x| x.to_string()))
441 .bind(snapshot.contingency_type.map(|x| x.to_string()))
442 .bind(snapshot.order_list_id.map(|x| x.to_string()))
443 .bind(snapshot.linked_order_ids.map(|x| x.iter().map(ToString::to_string).collect::<Vec<String>>()))
444 .bind(snapshot.parent_order_id.map(|x| x.to_string()))
445 .bind(snapshot.exec_algorithm_id.map(|x| x.to_string()))
446 .bind(snapshot.exec_algorithm_params.map(|x| serde_json::to_value(x).unwrap()))
447 .bind(snapshot.exec_spawn_id.map(|x| x.to_string()))
448 .bind(snapshot.tags.map(|x| x.iter().map(ToString::to_string).collect::<Vec<String>>()))
449 .bind(snapshot.init_id.to_string())
450 .bind(snapshot.ts_init.to_string())
451 .bind(snapshot.ts_last.to_string())
452 .execute(&mut *transaction)
453 .await
454 .map(|_| ())
455 .map_err(|e| anyhow::anyhow!("Failed to insert into order table: {e}"))?;
456
457 transaction
458 .commit()
459 .await
460 .map_err(|e| anyhow::anyhow!("Failed to commit transaction: {e}"))
461 }
462
463 pub async fn load_order_snapshot(
469 pool: &PgPool,
470 client_order_id: &ClientOrderId,
471 ) -> anyhow::Result<Option<OrderSnapshot>> {
472 sqlx::query_as::<_, OrderSnapshotModel>(
473 r#"SELECT * FROM "order" WHERE client_order_id = $1"#,
474 )
475 .bind(client_order_id.to_string())
476 .fetch_optional(pool)
477 .await
478 .map(|model| model.map(|m| m.0))
479 .map_err(|e| anyhow::anyhow!("Failed to load order snapshot: {e}"))
480 }
481
482 pub async fn add_position_snapshot(
488 pool: &PgPool,
489 snapshot: PositionSnapshot,
490 ) -> anyhow::Result<()> {
491 let mut transaction = pool.begin().await?;
492
493 sqlx::query(
496 r#"
497 INSERT INTO "trader" (id) VALUES ($1) ON CONFLICT (id) DO NOTHING
498 "#,
499 )
500 .bind(snapshot.trader_id.to_string())
501 .execute(&mut *transaction)
502 .await
503 .map(|_| ())
504 .map_err(|e| anyhow::anyhow!("Failed to insert into trader table: {e}"))?;
505
506 sqlx::query(r#"
507 INSERT INTO "position" (
508 id, trader_id, strategy_id, instrument_id, account_id, opening_order_id, closing_order_id, entry, side, signed_qty, quantity, peak_qty,
509 quote_currency, base_currency, settlement_currency, avg_px_open, avg_px_close, realized_return, realized_pnl, unrealized_pnl, commissions,
510 duration_ns, ts_opened, ts_closed, ts_init, ts_last, created_at, updated_at
511 ) VALUES (
512 $1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14, $15, $16, $17, $18, $19, $20,
513 $21, $22, $23, $24, $25, $26, CURRENT_TIMESTAMP, CURRENT_TIMESTAMP
514 )
515 ON CONFLICT (id)
516 DO UPDATE
517 SET
518 trader_id = $2, strategy_id = $3, instrument_id = $4, account_id = $5, opening_order_id = $6, closing_order_id = $7, entry = $8, side = $9, signed_qty = $10, quantity = $11,
519 peak_qty = $12, quote_currency = $13, base_currency = $14, settlement_currency = $15, avg_px_open = $16, avg_px_close = $17, realized_return = $18, realized_pnl = $19, unrealized_pnl = $20,
520 commissions = $21, duration_ns = $22, ts_opened = $23, ts_closed = $24, ts_init = $25, ts_last = $26, updated_at = CURRENT_TIMESTAMP
521 "#)
522 .bind(snapshot.position_id.to_string())
523 .bind(snapshot.trader_id.to_string())
524 .bind(snapshot.strategy_id.to_string())
525 .bind(snapshot.instrument_id.to_string())
526 .bind(snapshot.account_id.to_string())
527 .bind(snapshot.opening_order_id.to_string())
528 .bind(snapshot.closing_order_id.map(|x| x.to_string()))
529 .bind(snapshot.entry.to_string())
530 .bind(snapshot.side.to_string())
531 .bind(snapshot.signed_qty)
532 .bind(snapshot.quantity.to_string())
533 .bind(snapshot.peak_qty.to_string())
534 .bind(snapshot.quote_currency.to_string())
535 .bind(snapshot.base_currency.map(|x| x.to_string()))
536 .bind(snapshot.settlement_currency.to_string())
537 .bind(snapshot.avg_px_open)
538 .bind(snapshot.avg_px_close)
539 .bind(snapshot.realized_return)
540 .bind(snapshot.realized_pnl.map(|x| x.to_string()))
541 .bind(snapshot.unrealized_pnl.map(|x| x.to_string()))
542 .bind(snapshot.commissions.iter().map(ToString::to_string).collect::<Vec<String>>())
543 .bind(snapshot.duration_ns.map(|x| x.to_string()))
544 .bind(snapshot.ts_opened.to_string())
545 .bind(snapshot.ts_closed.map(|x| x.to_string()))
546 .bind(snapshot.ts_init.to_string())
547 .bind(snapshot.ts_last.to_string())
548 .execute(&mut *transaction)
549 .await
550 .map(|_| ())
551 .map_err(|e| anyhow::anyhow!("Failed to insert into position table: {e}"))?;
552 transaction
553 .commit()
554 .await
555 .map_err(|e| anyhow::anyhow!("Failed to commit transaction: {e}"))
556 }
557
558 pub async fn load_position_snapshot(
564 pool: &PgPool,
565 position_id: &PositionId,
566 ) -> anyhow::Result<Option<PositionSnapshot>> {
567 sqlx::query_as::<_, PositionSnapshotModel>(r#"SELECT * FROM "position" WHERE id = $1"#)
568 .bind(position_id.to_string())
569 .fetch_optional(pool)
570 .await
571 .map(|model| model.map(|m| m.0))
572 .map_err(|e| anyhow::anyhow!("Failed to load position snapshot: {e}"))
573 }
574
575 pub async fn check_if_order_initialized_exists(
581 pool: &PgPool,
582 client_order_id: ClientOrderId,
583 ) -> anyhow::Result<bool> {
584 sqlx::query(r#"
585 SELECT EXISTS(SELECT 1 FROM "order_event" WHERE client_order_id = $1 AND kind = 'OrderInitialized')
586 "#)
587 .bind(client_order_id.to_string())
588 .fetch_one(pool)
589 .await
590 .map(|row| row.get(0))
591 .map_err(|e| anyhow::anyhow!("Failed to check if order initialized exists: {e}"))
592 }
593
594 pub async fn check_if_account_event_exists(
600 pool: &PgPool,
601 account_id: AccountId,
602 ) -> anyhow::Result<bool> {
603 sqlx::query(
604 r#"
605 SELECT EXISTS(SELECT 1 FROM "account_event" WHERE account_id = $1)
606 "#,
607 )
608 .bind(account_id.to_string())
609 .fetch_one(pool)
610 .await
611 .map(|row| row.get(0))
612 .map_err(|e| anyhow::anyhow!("Failed to check if account event exists: {e}"))
613 }
614
615 pub async fn add_order_event(
621 pool: &PgPool,
622 order_event: Box<dyn OrderEvent>,
623 client_id: Option<ClientId>,
624 ) -> anyhow::Result<()> {
625 let mut transaction = pool.begin().await?;
626
627 sqlx::query(
630 r#"
631 INSERT INTO "trader" (id) VALUES ($1) ON CONFLICT (id) DO NOTHING
632 "#,
633 )
634 .bind(order_event.trader_id().to_string())
635 .execute(&mut *transaction)
636 .await
637 .map(|_| ())
638 .map_err(|e| anyhow::anyhow!("Failed to insert into trader table: {e}"))?;
639
640 if let Some(client_id) = client_id {
643 sqlx::query(
644 r#"
645 INSERT INTO "client" (id) VALUES ($1) ON CONFLICT (id) DO NOTHING
646 "#,
647 )
648 .bind(client_id.to_string())
649 .execute(&mut *transaction)
650 .await
651 .map(|_| ())
652 .map_err(|e| anyhow::anyhow!("Failed to insert into client table: {e}"))?;
653 }
654
655 sqlx::query(r#"
656 INSERT INTO "order_event" (
657 id, kind, client_order_id, order_type, order_side, trader_id, client_id, reason, strategy_id, instrument_id, trade_id, currency, quantity, time_in_force, liquidity_side,
658 post_only, reduce_only, quote_quantity, reconciliation, price, last_px, last_qty, trigger_price, trigger_type, limit_offset, trailing_offset,
659 trailing_offset_type, expire_time, display_qty, emulation_trigger, trigger_instrument_id, contingency_type,
660 order_list_id, linked_order_ids, parent_order_id,
661 exec_algorithm_id, exec_spawn_id, venue_order_id, account_id, position_id, commission, ts_event, ts_init, created_at, updated_at
662 ) VALUES (
663 $1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14, $15, $16, $17, $18, $19, $20,
664 $21, $22, $23, $24, $25, $26::trailing_offset_type, $27, $28, $29, $30, $31, $32, $33, $34,
665 $35, $36, $37, $38, $39, $40, $41, $42, $43, CURRENT_TIMESTAMP, CURRENT_TIMESTAMP
666 )
667 ON CONFLICT (id)
668 DO UPDATE
669 SET
670 kind = $2, client_order_id = $3, order_type = $4, order_side=$5, trader_id = $6, client_id = $7, reason = $8, strategy_id = $9, instrument_id = $10, trade_id = $11, currency = $12,
671 quantity = $13, time_in_force = $14, liquidity_side = $15, post_only = $16, reduce_only = $17, quote_quantity = $18, reconciliation = $19, price = $20, last_px = $21,
672 last_qty = $22, trigger_price = $23, trigger_type = $24, limit_offset = $25, trailing_offset = $26, trailing_offset_type = $27, expire_time = $28, display_qty = $29,
673 emulation_trigger = $30, trigger_instrument_id = $31, contingency_type = $32, order_list_id = $33, linked_order_ids = $34, parent_order_id = $35, exec_algorithm_id = $36,
674 exec_spawn_id = $37, venue_order_id = $38, account_id = $39, position_id = $40, commission = $41, ts_event = $42, ts_init = $43, updated_at = CURRENT_TIMESTAMP
675
676 "#)
677 .bind(order_event.id().to_string())
678 .bind(order_event.kind())
679 .bind(order_event.client_order_id().to_string())
680 .bind(order_event.order_type().map(|x| x.to_string()))
681 .bind(order_event.order_side().map(|x| x.to_string()))
682 .bind(order_event.trader_id().to_string())
683 .bind(client_id.map(|x| x.to_string()))
684 .bind(order_event.reason().map(|x| x.to_string()))
685 .bind(order_event.strategy_id().to_string())
686 .bind(order_event.instrument_id().to_string())
687 .bind(order_event.trade_id().map(|x| x.to_string()))
688 .bind(order_event.currency().map(|x| x.code.as_str()))
689 .bind(order_event.quantity().map(|x| x.to_string()))
690 .bind(order_event.time_in_force().map(|x| x.to_string()))
691 .bind(order_event.liquidity_side().map(|x| x.to_string()))
692 .bind(order_event.post_only())
693 .bind(order_event.reduce_only())
694 .bind(order_event.quote_quantity())
695 .bind(order_event.reconciliation())
696 .bind(order_event.price().map(|x| x.to_string()))
697 .bind(order_event.last_px().map(|x| x.to_string()))
698 .bind(order_event.last_qty().map(|x| x.to_string()))
699 .bind(order_event.trigger_price().map(|x| x.to_string()))
700 .bind(order_event.trigger_type().map(|x| x.to_string()))
701 .bind(order_event.limit_offset().map(|x| x.to_string()))
702 .bind(order_event.trailing_offset().map(|x| x.to_string()))
703 .bind(order_event.trailing_offset_type().map(TrailingOffsetTypeModel))
704 .bind(order_event.expire_time().map(|x| x.to_string()))
705 .bind(order_event.display_qty().map(|x| x.to_string()))
706 .bind(order_event.emulation_trigger().map(|x| x.to_string()))
707 .bind(order_event.trigger_instrument_id().map(|x| x.to_string()))
708 .bind(order_event.contingency_type().map(|x| x.to_string()))
709 .bind(order_event.order_list_id().map(|x| x.to_string()))
710 .bind(order_event.linked_order_ids().map(|x| x.iter().map(ToString::to_string).collect::<Vec<String>>()))
711 .bind(order_event.parent_order_id().map(|x| x.to_string()))
712 .bind(order_event.exec_algorithm_id().map(|x| x.to_string()))
713 .bind(order_event.exec_spawn_id().map(|x| x.to_string()))
714 .bind(order_event.venue_order_id().map(|x| x.to_string()))
715 .bind(order_event.account_id().map(|x| x.to_string()))
716 .bind(order_event.position_id().map(|x| x.to_string()))
717 .bind(order_event.commission().map(|x| x.to_string()))
718 .bind(order_event.ts_event().to_string())
719 .bind(order_event.ts_init().to_string())
720 .execute(&mut *transaction)
721 .await
722 .map(|_| ())
723 .map_err(|e| anyhow::anyhow!("Failed to insert into order_event table: {e}"))?;
724 transaction
725 .commit()
726 .await
727 .map_err(|e| anyhow::anyhow!("Failed to commit transaction: {e}"))
728 }
729
730 pub async fn load_order_events(
736 pool: &PgPool,
737 client_order_id: &ClientOrderId,
738 ) -> anyhow::Result<Vec<OrderEventAny>> {
739 sqlx::query_as::<_, OrderEventAnyModel>(r#"SELECT * FROM "order_event" event WHERE event.client_order_id = $1 ORDER BY created_at ASC"#)
740 .bind(client_order_id.to_string())
741 .fetch_all(pool)
742 .await
743 .map(|rows| rows.into_iter().map(|row| row.0).collect())
744 .map_err(|e| anyhow::anyhow!("Failed to load order events: {e}"))
745 }
746
747 pub async fn load_order(
757 pool: &PgPool,
758 client_order_id: &ClientOrderId,
759 ) -> anyhow::Result<Option<OrderAny>> {
760 let order_events = Self::load_order_events(pool, client_order_id).await;
761
762 match order_events {
763 Ok(order_events) => {
764 if order_events.is_empty() {
765 return Ok(None);
766 }
767 let order = OrderAny::from_events(order_events).unwrap();
768 Ok(Some(order))
769 }
770 Err(e) => anyhow::bail!("Failed to load order events: {e}"),
771 }
772 }
773
774 pub async fn load_orders(pool: &PgPool) -> anyhow::Result<Vec<OrderAny>> {
784 let mut orders: Vec<OrderAny> = Vec::new();
785 let client_order_ids: Vec<ClientOrderId> = sqlx::query(
786 r#"
787 SELECT DISTINCT client_order_id FROM "order_event"
788 "#,
789 )
790 .fetch_all(pool)
791 .await
792 .map(|rows| {
793 rows.into_iter()
794 .map(|row| ClientOrderId::from(row.get::<&str, _>(0)))
795 .collect()
796 })
797 .map_err(|e| anyhow::anyhow!("Failed to load order ids: {e}"))?;
798 for id in client_order_ids {
799 let order = Self::load_order(pool, &id).await.unwrap();
800 match order {
801 Some(order) => {
802 orders.push(order);
803 }
804 None => {
805 continue;
806 }
807 }
808 }
809 Ok(orders)
810 }
811
812 pub async fn add_account(
822 pool: &PgPool,
823 kind: &str,
824 updated: bool,
825 account: Box<dyn Account>,
826 ) -> anyhow::Result<()> {
827 if updated {
828 let exists = Self::check_if_account_event_exists(pool, account.id())
829 .await
830 .unwrap();
831 assert!(
832 exists,
833 "Account event does not exist for account: {}",
834 account.id()
835 );
836 }
837
838 let mut transaction = pool.begin().await?;
839
840 sqlx::query(
841 r#"
842 INSERT INTO "account" (id) VALUES ($1) ON CONFLICT (id) DO NOTHING
843 "#,
844 )
845 .bind(account.id().to_string())
846 .execute(&mut *transaction)
847 .await
848 .map(|_| ())
849 .map_err(|e| anyhow::anyhow!("Failed to insert into account table: {e}"))?;
850
851 let account_event = account.last_event().unwrap();
852 sqlx::query(r#"
853 INSERT INTO "account_event" (
854 id, kind, account_id, base_currency, balances, margins, is_reported, ts_event, ts_init, created_at, updated_at
855 ) VALUES (
856 $1, $2, $3, $4, $5, $6, $7, $8, $9, CURRENT_TIMESTAMP, CURRENT_TIMESTAMP
857 )
858 ON CONFLICT (id)
859 DO UPDATE
860 SET
861 kind = $2, account_id = $3, base_currency = $4, balances = $5, margins = $6, is_reported = $7,
862 ts_event = $8, ts_init = $9, updated_at = CURRENT_TIMESTAMP
863 "#)
864 .bind(account_event.event_id.to_string())
865 .bind(kind.to_string())
866 .bind(account_event.account_id.to_string())
867 .bind(account_event.base_currency.map(|x| x.code.as_str()))
868 .bind(serde_json::to_value::<Vec<AccountBalance>>(account_event.balances).unwrap())
869 .bind(serde_json::to_value::<Vec<MarginBalance>>(account_event.margins).unwrap())
870 .bind(account_event.is_reported)
871 .bind(account_event.ts_event.to_string())
872 .bind(account_event.ts_init.to_string())
873 .execute(&mut *transaction)
874 .await
875 .map(|_| ())
876 .map_err(|e| anyhow::anyhow!("Failed to insert into account_event table: {e}"))?;
877 transaction
878 .commit()
879 .await
880 .map_err(|e| anyhow::anyhow!("Failed to commit add_account transaction: {e}"))
881 }
882
883 pub async fn load_account_events(
889 pool: &PgPool,
890 account_id: &AccountId,
891 ) -> anyhow::Result<Vec<AccountState>> {
892 sqlx::query_as::<_, AccountEventModel>(
893 r#"SELECT * FROM "account_event" WHERE account_id = $1 ORDER BY created_at ASC"#,
894 )
895 .bind(account_id.to_string())
896 .fetch_all(pool)
897 .await
898 .map(|rows| rows.into_iter().map(|row| row.0).collect())
899 .map_err(|e| anyhow::anyhow!("Failed to load account events: {e}"))
900 }
901
902 pub async fn load_account(
912 pool: &PgPool,
913 account_id: &AccountId,
914 ) -> anyhow::Result<Option<AccountAny>> {
915 let account_events = Self::load_account_events(pool, account_id).await;
916 match account_events {
917 Ok(account_events) => {
918 if account_events.is_empty() {
919 return Ok(None);
920 }
921 let account = AccountAny::from_events(account_events).unwrap();
922 Ok(Some(account))
923 }
924 Err(e) => anyhow::bail!("Failed to load account events: {e}"),
925 }
926 }
927
928 pub async fn load_accounts(pool: &PgPool) -> anyhow::Result<Vec<AccountAny>> {
938 let mut accounts: Vec<AccountAny> = Vec::new();
939 let account_ids: Vec<AccountId> = sqlx::query(
940 r#"
941 SELECT DISTINCT account_id FROM "account_event"
942 "#,
943 )
944 .fetch_all(pool)
945 .await
946 .map(|rows| {
947 rows.into_iter()
948 .map(|row| AccountId::from(row.get::<&str, _>(0)))
949 .collect()
950 })
951 .map_err(|e| anyhow::anyhow!("Failed to load account ids: {e}"))?;
952 for id in account_ids {
953 let account = Self::load_account(pool, &id).await.unwrap();
954 match account {
955 Some(account) => {
956 accounts.push(account);
957 }
958 None => {
959 continue;
960 }
961 }
962 }
963 Ok(accounts)
964 }
965
966 pub async fn add_trade(pool: &PgPool, trade: &TradeTick) -> anyhow::Result<()> {
972 sqlx::query(r#"
973 INSERT INTO "trade" (
974 instrument_id, price, quantity, aggressor_side, venue_trade_id,
975 ts_event, ts_init, created_at, updated_at
976 ) VALUES (
977 $1, $2, $3, $4::aggressor_side, $5, $6, $7, CURRENT_TIMESTAMP, CURRENT_TIMESTAMP
978 )
979 ON CONFLICT (id)
980 DO UPDATE
981 SET
982 instrument_id = $1, price = $2, quantity = $3, aggressor_side = $4, venue_trade_id = $5,
983 ts_event = $6, ts_init = $7, updated_at = CURRENT_TIMESTAMP
984 "#)
985 .bind(trade.instrument_id.to_string())
986 .bind(trade.price.to_string())
987 .bind(trade.size.to_string())
988 .bind(AggressorSideModel(trade.aggressor_side))
989 .bind(trade.trade_id.to_string())
990 .bind(trade.ts_event.to_string())
991 .bind(trade.ts_init.to_string())
992 .execute(pool)
993 .await
994 .map(|_| ())
995 .map_err(|e| anyhow::anyhow!("Failed to insert into trade table: {e}"))
996 }
997
998 pub async fn load_trades(
1004 pool: &PgPool,
1005 instrument_id: &InstrumentId,
1006 ) -> anyhow::Result<Vec<TradeTick>> {
1007 sqlx::query_as::<_, TradeTickModel>(
1008 r#"SELECT * FROM "trade" WHERE instrument_id = $1 ORDER BY ts_event ASC"#,
1009 )
1010 .bind(instrument_id.to_string())
1011 .fetch_all(pool)
1012 .await
1013 .map(|rows| rows.into_iter().map(|row| row.0).collect())
1014 .map_err(|e| anyhow::anyhow!("Failed to load trades: {e}"))
1015 }
1016
1017 pub async fn add_quote(pool: &PgPool, quote: &QuoteTick) -> anyhow::Result<()> {
1023 sqlx::query(r#"
1024 INSERT INTO "quote" (
1025 instrument_id, bid_price, ask_price, bid_size, ask_size, ts_event, ts_init, created_at, updated_at
1026 ) VALUES (
1027 $1, $2, $3, $4, $5, $6, $7, CURRENT_TIMESTAMP, CURRENT_TIMESTAMP
1028 )
1029 ON CONFLICT (id)
1030 DO UPDATE
1031 SET
1032 instrument_id = $1, bid_price = $2, ask_price = $3, bid_size = $4, ask_size = $5,
1033 ts_event = $6, ts_init = $7, updated_at = CURRENT_TIMESTAMP
1034 "#)
1035 .bind(quote.instrument_id.to_string())
1036 .bind(quote.bid_price.to_string())
1037 .bind(quote.ask_price.to_string())
1038 .bind(quote.bid_size.to_string())
1039 .bind(quote.ask_size.to_string())
1040 .bind(quote.ts_event.to_string())
1041 .bind(quote.ts_init.to_string())
1042 .execute(pool)
1043 .await
1044 .map(|_| ())
1045 .map_err(|e| anyhow::anyhow!("Failed to insert into quote table: {e}"))
1046 }
1047
1048 pub async fn load_quotes(
1054 pool: &PgPool,
1055 instrument_id: &InstrumentId,
1056 ) -> anyhow::Result<Vec<QuoteTick>> {
1057 sqlx::query_as::<_, QuoteTickModel>(
1058 r#"SELECT * FROM "quote" WHERE instrument_id = $1 ORDER BY ts_event ASC"#,
1059 )
1060 .bind(instrument_id.to_string())
1061 .fetch_all(pool)
1062 .await
1063 .map(|rows| rows.into_iter().map(|row| row.0).collect())
1064 .map_err(|e| anyhow::anyhow!("Failed to load quotes: {e}"))
1065 }
1066
1067 pub async fn add_bar(pool: &PgPool, bar: &Bar) -> anyhow::Result<()> {
1073 println!("Adding bar: {bar:?}");
1074 sqlx::query(r#"
1075 INSERT INTO "bar" (
1076 instrument_id, step, bar_aggregation, price_type, aggregation_source, open, high, low, close, volume, ts_event, ts_init, created_at, updated_at
1077 ) VALUES (
1078 $1, $2, $3::bar_aggregation, $4::price_type, $5::aggregation_source, $6, $7, $8, $9, $10, $11, $12, CURRENT_TIMESTAMP, CURRENT_TIMESTAMP
1079 )
1080 ON CONFLICT (id)
1081 DO UPDATE
1082 SET
1083 instrument_id = $1, step = $2, bar_aggregation = $3::bar_aggregation, price_type = $4::price_type, aggregation_source = $5::aggregation_source,
1084 open = $6, high = $7, low = $8, close = $9, volume = $10, ts_event = $11, ts_init = $12, updated_at = CURRENT_TIMESTAMP
1085 "#)
1086 .bind(bar.bar_type.instrument_id().to_string())
1087 .bind(bar.bar_type.spec().step.get() as i32)
1088 .bind(BarAggregationModel(bar.bar_type.spec().aggregation))
1089 .bind(PriceTypeModel(bar.bar_type.spec().price_type))
1090 .bind(AggregationSourceModel(bar.bar_type.aggregation_source()))
1091 .bind(bar.open.to_string())
1092 .bind(bar.high.to_string())
1093 .bind(bar.low.to_string())
1094 .bind(bar.close.to_string())
1095 .bind(bar.volume.to_string())
1096 .bind(bar.ts_event.to_string())
1097 .bind(bar.ts_init.to_string())
1098 .execute(pool)
1099 .await
1100 .map(|_| ())
1101 .map_err(|e| anyhow::anyhow!("Failed to insert into bar table: {e}"))
1102 }
1103
1104 pub async fn load_bars(
1110 pool: &PgPool,
1111 instrument_id: &InstrumentId,
1112 ) -> anyhow::Result<Vec<Bar>> {
1113 sqlx::query_as::<_, BarModel>(
1114 r#"SELECT * FROM "bar" WHERE instrument_id = $1 ORDER BY ts_event ASC"#,
1115 )
1116 .bind(instrument_id.to_string())
1117 .fetch_all(pool)
1118 .await
1119 .map(|rows| rows.into_iter().map(|row| row.0).collect())
1120 .map_err(|e| anyhow::anyhow!("Failed to load bars: {e}"))
1121 }
1122
1123 pub async fn load_distinct_order_event_client_ids(
1129 pool: &PgPool,
1130 ) -> anyhow::Result<HashMap<ClientOrderId, ClientId>> {
1131 let mut map: HashMap<ClientOrderId, ClientId> = HashMap::new();
1132 let result = sqlx::query_as::<_, OrderEventOrderClientIdCombination>(
1133 r#"
1134 SELECT DISTINCT
1135 client_order_id AS "client_order_id",
1136 client_id AS "client_id"
1137 FROM "order_event"
1138 "#,
1139 )
1140 .fetch_all(pool)
1141 .await
1142 .map_err(|e| anyhow::anyhow!("Failed to load account ids: {e}"))?;
1143 for id in result {
1144 map.insert(id.client_order_id, id.client_id);
1145 }
1146 Ok(map)
1147 }
1148
1149 pub async fn add_signal(pool: &PgPool, signal: &Signal) -> anyhow::Result<()> {
1155 sqlx::query(
1156 r#"
1157 INSERT INTO "signal" (
1158 name, value, ts_event, ts_init, created_at, updated_at
1159 ) VALUES (
1160 $1, $2, $3, $4, CURRENT_TIMESTAMP, CURRENT_TIMESTAMP
1161 )
1162 ON CONFLICT (id)
1163 DO UPDATE
1164 SET
1165 name = $1, value = $2, ts_event = $3, ts_init = $4,
1166 updated_at = CURRENT_TIMESTAMP
1167 "#,
1168 )
1169 .bind(signal.name.to_string())
1170 .bind(signal.value.to_string())
1171 .bind(signal.ts_event.to_string())
1172 .bind(signal.ts_init.to_string())
1173 .execute(pool)
1174 .await
1175 .map(|_| ())
1176 .map_err(|e| anyhow::anyhow!("Failed to insert into signal table: {e}"))
1177 }
1178
1179 pub async fn load_signals(pool: &PgPool, name: &str) -> anyhow::Result<Vec<Signal>> {
1185 sqlx::query_as::<_, SignalModel>(
1186 r#"SELECT * FROM "signal" WHERE name = $1 ORDER BY ts_init ASC"#,
1187 )
1188 .bind(name)
1189 .fetch_all(pool)
1190 .await
1191 .map(|rows| rows.into_iter().map(|row| row.0).collect())
1192 .map_err(|e| anyhow::anyhow!("Failed to load signals: {e}"))
1193 }
1194
1195 pub async fn add_custom_data(pool: &PgPool, data: &CustomData) -> anyhow::Result<()> {
1201 sqlx::query(
1202 r#"
1203 INSERT INTO "custom" (
1204 data_type, metadata, value, ts_event, ts_init, created_at, updated_at
1205 ) VALUES (
1206 $1, $2, $3, $4, $5, CURRENT_TIMESTAMP, CURRENT_TIMESTAMP
1207 )
1208 ON CONFLICT (id)
1209 DO UPDATE
1210 SET
1211 data_type = $1, metadata = $2, value = $3, ts_event = $4, ts_init = $5,
1212 updated_at = CURRENT_TIMESTAMP
1213 "#,
1214 )
1215 .bind(data.data_type.type_name().to_string())
1216 .bind(
1217 data.data_type
1218 .metadata()
1219 .as_ref()
1220 .map_or_else(|| Ok(serde_json::Value::Null), serde_json::to_value)?,
1221 )
1222 .bind(data.value.to_vec())
1223 .bind(data.ts_event.to_string())
1224 .bind(data.ts_init.to_string())
1225 .execute(pool)
1226 .await
1227 .map(|_| ())
1228 .map_err(|e| anyhow::anyhow!("Failed to insert into custom table: {e}"))
1229 }
1230
1231 pub async fn load_custom_data(
1237 pool: &PgPool,
1238 data_type: &DataType,
1239 ) -> anyhow::Result<Vec<CustomData>> {
1240 let metadata_json = data_type
1242 .metadata()
1243 .as_ref()
1244 .map_or(Ok(serde_json::Value::Null), |metadata| {
1245 serde_json::to_value(metadata)
1246 })?;
1247
1248 sqlx::query_as::<_, CustomDataModel>(
1249 r#"SELECT * FROM "custom" WHERE data_type = $1 AND metadata = $2 ORDER BY ts_init ASC"#,
1250 )
1251 .bind(data_type.type_name())
1252 .bind(metadata_json)
1253 .fetch_all(pool)
1254 .await
1255 .map(|rows| rows.into_iter().map(|row| row.0).collect())
1256 .map_err(|e| anyhow::anyhow!("Failed to load custom data: {e}"))
1257 }
1258}