nautilus_infrastructure/sql/
queries.rs

1// -------------------------------------------------------------------------------------------------
2//  Copyright (C) 2015-2025 Posei Systems Pty Ltd. All rights reserved.
3//  https://poseitrader.io
4//
5//  Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
6//  You may not use this file except in compliance with the License.
7//  You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
8//
9//  Unless required by applicable law or agreed to in writing, software
10//  distributed under the License is distributed on an "AS IS" BASIS,
11//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12//  See the License for the specific language governing permissions and
13//  limitations under the License.
14// -------------------------------------------------------------------------------------------------
15
16use std::collections::HashMap;
17
18use nautilus_common::{custom::CustomData, signal::Signal};
19use nautilus_model::{
20    accounts::{Account, AccountAny},
21    data::{Bar, DataType, QuoteTick, TradeTick},
22    events::{
23        AccountState, OrderEvent, OrderEventAny, OrderSnapshot,
24        position::snapshot::PositionSnapshot,
25    },
26    identifiers::{AccountId, ClientId, ClientOrderId, InstrumentId, PositionId},
27    instruments::{Instrument, InstrumentAny},
28    orders::{Order, OrderAny},
29    types::{AccountBalance, Currency, MarginBalance},
30};
31use sqlx::{PgPool, Row};
32
33use super::models::{
34    orders::OrderSnapshotModel,
35    positions::PositionSnapshotModel,
36    types::{CustomDataModel, SignalModel},
37};
38use crate::sql::models::{
39    accounts::AccountEventModel,
40    data::{BarModel, QuoteTickModel, TradeTickModel},
41    enums::{
42        AggregationSourceModel, AggressorSideModel, AssetClassModel, BarAggregationModel,
43        CurrencyTypeModel, PriceTypeModel, TrailingOffsetTypeModel,
44    },
45    general::{GeneralRow, OrderEventOrderClientIdCombination},
46    instruments::InstrumentAnyModel,
47    orders::OrderEventAnyModel,
48    types::CurrencyModel,
49};
50
/// Stateless namespace type for the SQL queries issued against the Postgres database.
///
/// The struct has no fields; the queries are exposed as associated functions in its
/// `impl` block, each taking the `PgPool` to run against as its first argument.
#[derive(Debug)]
pub struct DatabaseQueries;
53
54impl DatabaseQueries {
55    /// Truncates all tables in the cache database via the provided Postgres `pool`.
56    ///
57    /// # Errors
58    ///
59    /// Returns an error if the TRUNCATE operation fails.
60    pub async fn truncate(pool: &PgPool) -> anyhow::Result<()> {
61        sqlx::query("SELECT truncate_all_tables()")
62            .execute(pool)
63            .await
64            .map(|_| ())
65            .map_err(|e| anyhow::anyhow!("Failed to truncate tables: {e}"))
66    }
67
68    /// Inserts a raw key-value entry into the `general` table via the provided `pool`.
69    ///
70    /// # Errors
71    ///
72    /// Returns an error if the INSERT operation fails.
73    pub async fn add(pool: &PgPool, key: String, value: Vec<u8>) -> anyhow::Result<()> {
74        sqlx::query("INSERT INTO general (id, value) VALUES ($1, $2)")
75            .bind(key)
76            .bind(value)
77            .execute(pool)
78            .await
79            .map(|_| ())
80            .map_err(|e| anyhow::anyhow!("Failed to insert into general table: {e}"))
81    }
82
83    /// Loads all entries from the `general` table via the provided `pool`.
84    ///
85    /// # Errors
86    ///
87    /// Returns an error if the SELECT operation fails.
88    pub async fn load(pool: &PgPool) -> anyhow::Result<HashMap<String, Vec<u8>>> {
89        sqlx::query_as::<_, GeneralRow>("SELECT * FROM general")
90            .fetch_all(pool)
91            .await
92            .map(|rows| {
93                let mut cache: HashMap<String, Vec<u8>> = HashMap::new();
94                for row in rows {
95                    cache.insert(row.id, row.value);
96                }
97                cache
98            })
99            .map_err(|e| anyhow::anyhow!("Failed to load general table: {e}"))
100    }
101
102    /// Inserts or ignores a `Currency` row via the provided `pool`.
103    ///
104    /// # Errors
105    ///
106    /// Returns an error if the INSERT operation fails.
107    pub async fn add_currency(pool: &PgPool, currency: Currency) -> anyhow::Result<()> {
108        sqlx::query(
109            "INSERT INTO currency (id, precision, iso4217, name, currency_type) VALUES ($1, $2, $3, $4, $5::currency_type) ON CONFLICT (id) DO NOTHING"
110        )
111            .bind(currency.code.as_str())
112            .bind(i32::from(currency.precision))
113            .bind(i32::from(currency.iso4217))
114            .bind(currency.name.as_str())
115            .bind(CurrencyTypeModel(currency.currency_type))
116            .execute(pool)
117            .await
118            .map(|_| ())
119            .map_err(|e| anyhow::anyhow!("Failed to insert into currency table: {e}"))
120    }
121
122    /// Loads all `Currency` entries via the provided `pool`.
123    ///
124    /// # Errors
125    ///
126    /// Returns an error if the SELECT operation fails.
127    pub async fn load_currencies(pool: &PgPool) -> anyhow::Result<Vec<Currency>> {
128        sqlx::query_as::<_, CurrencyModel>("SELECT * FROM currency ORDER BY id ASC")
129            .fetch_all(pool)
130            .await
131            .map(|rows| rows.into_iter().map(|row| row.0).collect())
132            .map_err(|e| anyhow::anyhow!("Failed to load currencies: {e}"))
133    }
134
135    /// Loads a single `Currency` entry by `code` via the provided `pool`.
136    ///
137    /// # Errors
138    ///
139    /// Returns an error if the SELECT operation fails.
140    pub async fn load_currency(pool: &PgPool, code: &str) -> anyhow::Result<Option<Currency>> {
141        sqlx::query_as::<_, CurrencyModel>("SELECT * FROM currency WHERE id = $1")
142            .bind(code)
143            .fetch_optional(pool)
144            .await
145            .map(|currency| currency.map(|row| row.0))
146            .map_err(|e| anyhow::anyhow!("Failed to load currency: {e}"))
147    }
148
149    /// Inserts or updates an `InstrumentAny` entry via the provided `pool`.
150    ///
151    /// # Errors
152    ///
153    /// Returns an error if the INSERT or UPDATE operation fails.
154    pub async fn add_instrument(
155        pool: &PgPool,
156        kind: &str,
157        instrument: Box<dyn Instrument>,
158    ) -> anyhow::Result<()> {
159        sqlx::query(r#"
160            INSERT INTO "instrument" (
161                id, kind, raw_symbol, base_currency, underlying, quote_currency, settlement_currency, isin, asset_class, exchange,
162                multiplier, option_kind, is_inverse, strike_price, activation_ns, expiration_ns, price_precision, size_precision,
163                price_increment, size_increment, maker_fee, taker_fee, margin_init, margin_maint, lot_size, max_quantity, min_quantity, max_notional,
164                min_notional, max_price, min_price, ts_init, ts_event, created_at, updated_at
165            ) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9::asset_class, $10, $11, $12, $13, $14, $15, $16, $17, $18, $19, $20, $21, $22, $23, $24, $25, $26, $27, $28, $29, $30, $31, $32, $33, CURRENT_TIMESTAMP, CURRENT_TIMESTAMP)
166            ON CONFLICT (id)
167            DO UPDATE
168            SET
169                kind = $2, raw_symbol = $3, base_currency= $4, underlying = $5, quote_currency = $6, settlement_currency = $7, isin = $8, asset_class = $9, exchange = $10,
170                 multiplier = $11, option_kind = $12, is_inverse = $13, strike_price = $14, activation_ns = $15, expiration_ns = $16 , price_precision = $17, size_precision = $18,
171                 price_increment = $19, size_increment = $20, maker_fee = $21, taker_fee = $22, margin_init = $23, margin_maint = $24, lot_size = $25, max_quantity = $26,
172                 min_quantity = $27, max_notional = $28, min_notional = $29, max_price = $30, min_price = $31, ts_init = $32,  ts_event = $33, updated_at = CURRENT_TIMESTAMP
173            "#)
174            .bind(instrument.id().to_string())
175            .bind(kind)
176            .bind(instrument.raw_symbol().to_string())
177            .bind(instrument.base_currency().map(|x| x.code.as_str()))
178            .bind(instrument.underlying().map(|x| x.to_string()))
179            .bind(instrument.quote_currency().code.as_str())
180            .bind(instrument.settlement_currency().code.as_str())
181            .bind(instrument.isin().map(|x| x.to_string()))
182            .bind(AssetClassModel(instrument.asset_class()))
183            .bind(instrument.exchange().map(|x| x.to_string()))
184            .bind(instrument.multiplier().to_string())
185            .bind(instrument.option_kind().map(|x| x.to_string()))
186            .bind(instrument.is_inverse())
187            .bind(instrument.strike_price().map(|x| x.to_string()))
188            .bind(instrument.activation_ns().map(|x| x.to_string()))
189            .bind(instrument.expiration_ns().map(|x| x.to_string()))
190            .bind(i32::from(instrument.price_precision()))
191            .bind(i32::from(instrument.size_precision()))
192            .bind(instrument.price_increment().to_string())
193            .bind(instrument.size_increment().to_string())
194            .bind(instrument.maker_fee().to_string())
195            .bind(instrument.taker_fee().to_string())
196            .bind(instrument.margin_init().to_string())
197            .bind(instrument.margin_maint().to_string())
198            .bind(instrument.lot_size().map(|x| x.to_string()))
199            .bind(instrument.max_quantity().map(|x| x.to_string()))
200            .bind(instrument.min_quantity().map(|x| x.to_string()))
201            .bind(instrument.max_notional().map(|x| x.to_string()))
202            .bind(instrument.min_notional().map(|x| x.to_string()))
203            .bind(instrument.max_price().map(|x| x.to_string()))
204            .bind(instrument.min_price().map(|x| x.to_string()))
205            .bind(instrument.ts_init().to_string())
206            .bind(instrument.ts_event().to_string())
207            .execute(pool)
208            .await
209            .map(|_| ())
210            .map_err(|e| anyhow::anyhow!(format!("Failed to insert item {} into instrument table: {:?}", instrument.id().to_string(), e)))
211    }
212
213    /// Loads a single `InstrumentAny` entry by `instrument_id` via the provided `pool`.
214    ///
215    /// # Errors
216    ///
217    /// Returns an error if the SELECT operation fails.
218    pub async fn load_instrument(
219        pool: &PgPool,
220        instrument_id: &InstrumentId,
221    ) -> anyhow::Result<Option<InstrumentAny>> {
222        sqlx::query_as::<_, InstrumentAnyModel>("SELECT * FROM instrument WHERE id = $1")
223            .bind(instrument_id.to_string())
224            .fetch_optional(pool)
225            .await
226            .map(|instrument| instrument.map(|row| row.0))
227            .map_err(|e| {
228                anyhow::anyhow!("Failed to load instrument with id {instrument_id},error is: {e}")
229            })
230    }
231
232    /// Loads all `InstrumentAny` entries via the provided `pool`.
233    ///
234    /// # Errors
235    ///
236    /// Returns an error if the SELECT operation fails.
237    pub async fn load_instruments(pool: &PgPool) -> anyhow::Result<Vec<InstrumentAny>> {
238        sqlx::query_as::<_, InstrumentAnyModel>("SELECT * FROM instrument")
239            .fetch_all(pool)
240            .await
241            .map(|rows| rows.into_iter().map(|row| row.0).collect())
242            .map_err(|e| anyhow::anyhow!("Failed to load instruments: {e}"))
243    }
244
245    /// Inserts or updates an `OrderAny` entry via the provided `pool`.
246    ///
247    /// # Errors
248    ///
249    /// Returns an error if the SQL INSERT or UPDATE operation fails.
250    ///
251    /// # Panics
252    ///
253    /// Panics if the order initialization existence check unwraps `None` after awaiting.
254    pub async fn add_order(
255        pool: &PgPool,
256        _kind: &str,
257        updated: bool,
258        order: Box<dyn Order>,
259        client_id: Option<ClientId>,
260    ) -> anyhow::Result<()> {
261        if updated {
262            let exists = Self::check_if_order_initialized_exists(pool, order.client_order_id())
263                .await
264                .unwrap();
265            assert!(
266                exists,
267                "OrderInitialized event does not exist for order: {}",
268                order.client_order_id()
269            );
270        }
271        match order.last_event().clone() {
272            OrderEventAny::Accepted(event) => {
273                Self::add_order_event(pool, Box::new(event), client_id).await
274            }
275            OrderEventAny::CancelRejected(event) => {
276                Self::add_order_event(pool, Box::new(event), client_id).await
277            }
278            OrderEventAny::Canceled(event) => {
279                Self::add_order_event(pool, Box::new(event), client_id).await
280            }
281            OrderEventAny::Denied(event) => {
282                Self::add_order_event(pool, Box::new(event), client_id).await
283            }
284            OrderEventAny::Emulated(event) => {
285                Self::add_order_event(pool, Box::new(event), client_id).await
286            }
287            OrderEventAny::Expired(event) => {
288                Self::add_order_event(pool, Box::new(event), client_id).await
289            }
290            OrderEventAny::Filled(event) => {
291                Self::add_order_event(pool, Box::new(event), client_id).await
292            }
293            OrderEventAny::Initialized(event) => {
294                Self::add_order_event(pool, Box::new(event), client_id).await
295            }
296            OrderEventAny::ModifyRejected(event) => {
297                Self::add_order_event(pool, Box::new(event), client_id).await
298            }
299            OrderEventAny::PendingCancel(event) => {
300                Self::add_order_event(pool, Box::new(event), client_id).await
301            }
302            OrderEventAny::PendingUpdate(event) => {
303                Self::add_order_event(pool, Box::new(event), client_id).await
304            }
305            OrderEventAny::Rejected(event) => {
306                Self::add_order_event(pool, Box::new(event), client_id).await
307            }
308            OrderEventAny::Released(event) => {
309                Self::add_order_event(pool, Box::new(event), client_id).await
310            }
311            OrderEventAny::Submitted(event) => {
312                Self::add_order_event(pool, Box::new(event), client_id).await
313            }
314            OrderEventAny::Updated(event) => {
315                Self::add_order_event(pool, Box::new(event), client_id).await
316            }
317            OrderEventAny::Triggered(event) => {
318                Self::add_order_event(pool, Box::new(event), client_id).await
319            }
320        }
321    }
322
323    /// Inserts an `OrderSnapshot` entry via the provided `pool`.
324    ///
325    /// # Errors
326    ///
327    /// Returns an error if the SQL INSERT operation fails.
328    ///
329    /// # Panics
330    ///
331    /// Panics if serialization of `snapshot.exec_algorithm_params` fails.
332    pub async fn add_order_snapshot(pool: &PgPool, snapshot: OrderSnapshot) -> anyhow::Result<()> {
333        let mut transaction = pool.begin().await?;
334
335        // Insert trader if it does not exist
336        // TODO remove this when node and trader initialization is implemented
337        sqlx::query(
338            r#"
339            INSERT INTO "trader" (id) VALUES ($1) ON CONFLICT (id) DO NOTHING
340            "#,
341        )
342        .bind(snapshot.trader_id.to_string())
343        .execute(&mut *transaction)
344        .await
345        .map(|_| ())
346        .map_err(|e| anyhow::anyhow!("Failed to insert into trader table: {e}"))?;
347
348        sqlx::query(
349            r#"
350            INSERT INTO "order" (
351                id, trader_id, strategy_id, instrument_id, client_order_id, venue_order_id, position_id,
352                account_id, last_trade_id, order_type, order_side, quantity, price, trigger_price,
353                trigger_type, limit_offset, trailing_offset, trailing_offset_type, time_in_force,
354                expire_time, filled_qty, liquidity_side, avg_px, slippage, commissions, status,
355                is_post_only, is_reduce_only, is_quote_quantity, display_qty, emulation_trigger,
356                trigger_instrument_id, contingency_type, order_list_id, linked_order_ids,
357                parent_order_id, exec_algorithm_id, exec_algorithm_params, exec_spawn_id, tags, init_id, ts_init, ts_last,
358                created_at, updated_at
359            ) VALUES (
360                $1, $2, $3, $4, $1, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14, $15, $16,
361                $17::TRAILING_OFFSET_TYPE, $18, $19, $20, $21, $22, $23, $24, $25, $26, $27, $28,
362                $29, $30, $31, $32, $33, $34, $35, $36, $37, $38, $39, $40, $41, $42,
363                CURRENT_TIMESTAMP, CURRENT_TIMESTAMP
364            )
365            ON CONFLICT (id)
366            DO UPDATE SET
367                trader_id = $2,
368                strategy_id = $3,
369                instrument_id = $4,
370                venue_order_id = $5,
371                position_id = $6,
372                account_id = $7,
373                last_trade_id = $8,
374                order_type = $9,
375                order_side = $10,
376                quantity = $11,
377                price = $12,
378                trigger_price = $13,
379                trigger_type = $14,
380                limit_offset = $15,
381                trailing_offset = $16,
382                trailing_offset_type = $17::TRAILING_OFFSET_TYPE,
383                time_in_force = $18,
384                expire_time = $19,
385                filled_qty = $20,
386                liquidity_side = $21,
387                avg_px = $22,
388                slippage = $23,
389                commissions = $24,
390                status = $25,
391                is_post_only = $26,
392                is_reduce_only = $27,
393                is_quote_quantity = $28,
394                display_qty = $29,
395                emulation_trigger = $30,
396                trigger_instrument_id = $31,
397                contingency_type = $32,
398                order_list_id = $33,
399                linked_order_ids = $34,
400                parent_order_id = $35,
401                exec_algorithm_id = $36,
402                exec_algorithm_params = $37,
403                exec_spawn_id = $38,
404                tags = $39,
405                init_id = $40,
406                ts_init = $41,
407                ts_last = $42,
408                updated_at = CURRENT_TIMESTAMP
409        "#)
410            .bind(snapshot.client_order_id.to_string())  // Used for both id and client_order_id
411            .bind(snapshot.trader_id.to_string())
412            .bind(snapshot.strategy_id.to_string())
413            .bind(snapshot.instrument_id.to_string())
414            .bind(snapshot.venue_order_id.map(|x| x.to_string()))
415            .bind(snapshot.position_id.map(|x| x.to_string()))
416            .bind(snapshot.account_id.map(|x| x.to_string()))
417            .bind(snapshot.last_trade_id.map(|x| x.to_string()))
418            .bind(snapshot.order_type.to_string())
419            .bind(snapshot.order_side.to_string())
420            .bind(snapshot.quantity.to_string())
421            .bind(snapshot.price.map(|x| x.to_string()))
422            .bind(snapshot.trigger_price.map(|x| x.to_string()))
423            .bind(snapshot.trigger_type.map(|x| x.to_string()))
424            .bind(snapshot.limit_offset.map(|x| x.to_string()))
425            .bind(snapshot.trailing_offset.map(|x| x.to_string()))
426            .bind(snapshot.trailing_offset_type.map(|x| x.to_string()))
427            .bind(snapshot.time_in_force.to_string())
428            .bind(snapshot.expire_time.map(|x| x.to_string()))
429            .bind(snapshot.filled_qty.to_string())
430            .bind(snapshot.liquidity_side.map(|x| x.to_string()))
431            .bind(snapshot.avg_px)
432            .bind(snapshot.slippage)
433            .bind(snapshot.commissions.iter().map(ToString::to_string).collect::<Vec<String>>())
434            .bind(snapshot.status.to_string())
435            .bind(snapshot.is_post_only)
436            .bind(snapshot.is_reduce_only)
437            .bind(snapshot.is_quote_quantity)
438            .bind(snapshot.display_qty.map(|x| x.to_string()))
439            .bind(snapshot.emulation_trigger.map(|x| x.to_string()))
440            .bind(snapshot.trigger_instrument_id.map(|x| x.to_string()))
441            .bind(snapshot.contingency_type.map(|x| x.to_string()))
442            .bind(snapshot.order_list_id.map(|x| x.to_string()))
443            .bind(snapshot.linked_order_ids.map(|x| x.iter().map(ToString::to_string).collect::<Vec<String>>()))
444            .bind(snapshot.parent_order_id.map(|x| x.to_string()))
445            .bind(snapshot.exec_algorithm_id.map(|x| x.to_string()))
446            .bind(snapshot.exec_algorithm_params.map(|x| serde_json::to_value(x).unwrap()))
447            .bind(snapshot.exec_spawn_id.map(|x| x.to_string()))
448            .bind(snapshot.tags.map(|x| x.iter().map(ToString::to_string).collect::<Vec<String>>()))
449            .bind(snapshot.init_id.to_string())
450            .bind(snapshot.ts_init.to_string())
451            .bind(snapshot.ts_last.to_string())
452            .execute(&mut *transaction)
453            .await
454            .map(|_| ())
455            .map_err(|e| anyhow::anyhow!("Failed to insert into order table: {e}"))?;
456
457        transaction
458            .commit()
459            .await
460            .map_err(|e| anyhow::anyhow!("Failed to commit transaction: {e}"))
461    }
462
463    /// Loads an `OrderSnapshot` entry by client order ID via the provided `pool`.
464    ///
465    /// # Errors
466    ///
467    /// Returns an error if the SQL SELECT or deserialization fails.
468    pub async fn load_order_snapshot(
469        pool: &PgPool,
470        client_order_id: &ClientOrderId,
471    ) -> anyhow::Result<Option<OrderSnapshot>> {
472        sqlx::query_as::<_, OrderSnapshotModel>(
473            r#"SELECT * FROM "order" WHERE client_order_id = $1"#,
474        )
475        .bind(client_order_id.to_string())
476        .fetch_optional(pool)
477        .await
478        .map(|model| model.map(|m| m.0))
479        .map_err(|e| anyhow::anyhow!("Failed to load order snapshot: {e}"))
480    }
481
482    /// Inserts or updates a `PositionSnapshot` entry via the provided `pool`.
483    ///
484    /// # Errors
485    ///
486    /// Returns an error if the SQL INSERT or UPDATE operation fails, or if beginning the transaction fails.
487    pub async fn add_position_snapshot(
488        pool: &PgPool,
489        snapshot: PositionSnapshot,
490    ) -> anyhow::Result<()> {
491        let mut transaction = pool.begin().await?;
492
493        // Insert trader if it does not exist
494        // TODO remove this when node and trader initialization is implemented
495        sqlx::query(
496            r#"
497            INSERT INTO "trader" (id) VALUES ($1) ON CONFLICT (id) DO NOTHING
498        "#,
499        )
500        .bind(snapshot.trader_id.to_string())
501        .execute(&mut *transaction)
502        .await
503        .map(|_| ())
504        .map_err(|e| anyhow::anyhow!("Failed to insert into trader table: {e}"))?;
505
506        sqlx::query(r#"
507            INSERT INTO "position" (
508                id, trader_id, strategy_id, instrument_id, account_id, opening_order_id, closing_order_id, entry, side, signed_qty, quantity, peak_qty,
509                quote_currency, base_currency, settlement_currency, avg_px_open, avg_px_close, realized_return, realized_pnl, unrealized_pnl, commissions,
510                duration_ns, ts_opened, ts_closed, ts_init, ts_last, created_at, updated_at
511            ) VALUES (
512                $1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14, $15, $16, $17, $18, $19, $20,
513                $21, $22, $23, $24, $25, $26, CURRENT_TIMESTAMP, CURRENT_TIMESTAMP
514            )
515            ON CONFLICT (id)
516            DO UPDATE
517            SET
518                trader_id = $2, strategy_id = $3, instrument_id = $4, account_id = $5, opening_order_id = $6, closing_order_id = $7, entry = $8, side = $9, signed_qty = $10, quantity = $11,
519                peak_qty = $12, quote_currency = $13, base_currency = $14, settlement_currency = $15, avg_px_open = $16, avg_px_close = $17, realized_return = $18, realized_pnl = $19, unrealized_pnl = $20,
520                commissions = $21, duration_ns = $22, ts_opened = $23, ts_closed = $24, ts_init = $25, ts_last = $26, updated_at = CURRENT_TIMESTAMP
521        "#)
522            .bind(snapshot.position_id.to_string())
523            .bind(snapshot.trader_id.to_string())
524            .bind(snapshot.strategy_id.to_string())
525            .bind(snapshot.instrument_id.to_string())
526            .bind(snapshot.account_id.to_string())
527            .bind(snapshot.opening_order_id.to_string())
528            .bind(snapshot.closing_order_id.map(|x| x.to_string()))
529            .bind(snapshot.entry.to_string())
530            .bind(snapshot.side.to_string())
531            .bind(snapshot.signed_qty)
532            .bind(snapshot.quantity.to_string())
533            .bind(snapshot.peak_qty.to_string())
534            .bind(snapshot.quote_currency.to_string())
535            .bind(snapshot.base_currency.map(|x| x.to_string()))
536            .bind(snapshot.settlement_currency.to_string())
537            .bind(snapshot.avg_px_open)
538            .bind(snapshot.avg_px_close)
539            .bind(snapshot.realized_return)
540            .bind(snapshot.realized_pnl.map(|x| x.to_string()))
541            .bind(snapshot.unrealized_pnl.map(|x| x.to_string()))
542            .bind(snapshot.commissions.iter().map(ToString::to_string).collect::<Vec<String>>())
543            .bind(snapshot.duration_ns.map(|x| x.to_string()))
544            .bind(snapshot.ts_opened.to_string())
545            .bind(snapshot.ts_closed.map(|x| x.to_string()))
546            .bind(snapshot.ts_init.to_string())
547            .bind(snapshot.ts_last.to_string())
548            .execute(&mut *transaction)
549            .await
550            .map(|_| ())
551            .map_err(|e| anyhow::anyhow!("Failed to insert into position table: {e}"))?;
552        transaction
553            .commit()
554            .await
555            .map_err(|e| anyhow::anyhow!("Failed to commit transaction: {e}"))
556    }
557
558    /// Loads a `PositionSnapshot` entry by `position_id` via the provided `pool`.
559    ///
560    /// # Errors
561    ///
562    /// Returns an error if the SQL SELECT or deserialization fails.
563    pub async fn load_position_snapshot(
564        pool: &PgPool,
565        position_id: &PositionId,
566    ) -> anyhow::Result<Option<PositionSnapshot>> {
567        sqlx::query_as::<_, PositionSnapshotModel>(r#"SELECT * FROM "position" WHERE id = $1"#)
568            .bind(position_id.to_string())
569            .fetch_optional(pool)
570            .await
571            .map(|model| model.map(|m| m.0))
572            .map_err(|e| anyhow::anyhow!("Failed to load position snapshot: {e}"))
573    }
574
575    /// Checks if an `OrderInitialized` event exists for the given `client_order_id` via the provided `pool`.
576    ///
577    /// # Errors
578    ///
579    /// Returns an error if the SQL SELECT operation fails.
580    pub async fn check_if_order_initialized_exists(
581        pool: &PgPool,
582        client_order_id: ClientOrderId,
583    ) -> anyhow::Result<bool> {
584        sqlx::query(r#"
585            SELECT EXISTS(SELECT 1 FROM "order_event" WHERE client_order_id = $1 AND kind = 'OrderInitialized')
586        "#)
587            .bind(client_order_id.to_string())
588            .fetch_one(pool)
589            .await
590            .map(|row| row.get(0))
591            .map_err(|e| anyhow::anyhow!("Failed to check if order initialized exists: {e}"))
592    }
593
594    /// Checks if any account event exists for the given `account_id` via the provided `pool`.
595    ///
596    /// # Errors
597    ///
598    /// Returns an error if the SQL SELECT operation fails.
599    pub async fn check_if_account_event_exists(
600        pool: &PgPool,
601        account_id: AccountId,
602    ) -> anyhow::Result<bool> {
603        sqlx::query(
604            r#"
605            SELECT EXISTS(SELECT 1 FROM "account_event" WHERE account_id = $1)
606        "#,
607        )
608        .bind(account_id.to_string())
609        .fetch_one(pool)
610        .await
611        .map(|row| row.get(0))
612        .map_err(|e| anyhow::anyhow!("Failed to check if account event exists: {e}"))
613    }
614
615    /// Inserts or updates an order event entry via the provided `pool`.
616    ///
617    /// # Errors
618    ///
619    /// Returns an error if the SQL INSERT or UPDATE operation fails.
620    pub async fn add_order_event(
621        pool: &PgPool,
622        order_event: Box<dyn OrderEvent>,
623        client_id: Option<ClientId>,
624    ) -> anyhow::Result<()> {
625        let mut transaction = pool.begin().await?;
626
627        // Insert trader if it does not exist
628        // TODO remove this when node and trader initialization is implemented
629        sqlx::query(
630            r#"
631            INSERT INTO "trader" (id) VALUES ($1) ON CONFLICT (id) DO NOTHING
632        "#,
633        )
634        .bind(order_event.trader_id().to_string())
635        .execute(&mut *transaction)
636        .await
637        .map(|_| ())
638        .map_err(|e| anyhow::anyhow!("Failed to insert into trader table: {e}"))?;
639
640        // Insert client if it does not exist
641        // TODO remove this when client initialization is implemented
642        if let Some(client_id) = client_id {
643            sqlx::query(
644                r#"
645                INSERT INTO "client" (id) VALUES ($1) ON CONFLICT (id) DO NOTHING
646            "#,
647            )
648            .bind(client_id.to_string())
649            .execute(&mut *transaction)
650            .await
651            .map(|_| ())
652            .map_err(|e| anyhow::anyhow!("Failed to insert into client table: {e}"))?;
653        }
654
655        sqlx::query(r#"
656            INSERT INTO "order_event" (
657                id, kind, client_order_id, order_type, order_side, trader_id, client_id, reason, strategy_id, instrument_id, trade_id, currency, quantity, time_in_force, liquidity_side,
658                post_only, reduce_only, quote_quantity, reconciliation, price, last_px, last_qty, trigger_price, trigger_type, limit_offset, trailing_offset,
659                trailing_offset_type, expire_time, display_qty, emulation_trigger, trigger_instrument_id, contingency_type,
660                order_list_id, linked_order_ids, parent_order_id,
661                exec_algorithm_id, exec_spawn_id, venue_order_id, account_id, position_id, commission, ts_event, ts_init, created_at, updated_at
662            ) VALUES (
663                $1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14, $15, $16, $17, $18, $19, $20,
664                $21, $22, $23, $24, $25, $26::trailing_offset_type, $27, $28, $29, $30, $31, $32, $33, $34,
665                $35, $36, $37, $38, $39, $40, $41, $42, $43, CURRENT_TIMESTAMP, CURRENT_TIMESTAMP
666            )
667            ON CONFLICT (id)
668            DO UPDATE
669            SET
670                kind = $2, client_order_id = $3, order_type = $4, order_side=$5, trader_id = $6, client_id = $7, reason = $8, strategy_id = $9, instrument_id = $10, trade_id = $11, currency = $12,
671                quantity = $13, time_in_force = $14, liquidity_side = $15, post_only = $16, reduce_only = $17, quote_quantity = $18, reconciliation = $19, price = $20, last_px = $21,
672                last_qty = $22, trigger_price = $23, trigger_type = $24, limit_offset = $25, trailing_offset = $26, trailing_offset_type = $27, expire_time = $28, display_qty = $29,
673                emulation_trigger = $30, trigger_instrument_id = $31, contingency_type = $32, order_list_id = $33, linked_order_ids = $34, parent_order_id = $35, exec_algorithm_id = $36,
674                exec_spawn_id = $37, venue_order_id = $38, account_id = $39, position_id = $40, commission = $41, ts_event = $42, ts_init = $43, updated_at = CURRENT_TIMESTAMP
675
676        "#)
677            .bind(order_event.id().to_string())
678            .bind(order_event.kind())
679            .bind(order_event.client_order_id().to_string())
680            .bind(order_event.order_type().map(|x| x.to_string()))
681            .bind(order_event.order_side().map(|x| x.to_string()))
682            .bind(order_event.trader_id().to_string())
683            .bind(client_id.map(|x| x.to_string()))
684            .bind(order_event.reason().map(|x| x.to_string()))
685            .bind(order_event.strategy_id().to_string())
686            .bind(order_event.instrument_id().to_string())
687            .bind(order_event.trade_id().map(|x| x.to_string()))
688            .bind(order_event.currency().map(|x| x.code.as_str()))
689            .bind(order_event.quantity().map(|x| x.to_string()))
690            .bind(order_event.time_in_force().map(|x| x.to_string()))
691            .bind(order_event.liquidity_side().map(|x| x.to_string()))
692            .bind(order_event.post_only())
693            .bind(order_event.reduce_only())
694            .bind(order_event.quote_quantity())
695            .bind(order_event.reconciliation())
696            .bind(order_event.price().map(|x| x.to_string()))
697            .bind(order_event.last_px().map(|x| x.to_string()))
698            .bind(order_event.last_qty().map(|x| x.to_string()))
699            .bind(order_event.trigger_price().map(|x| x.to_string()))
700            .bind(order_event.trigger_type().map(|x| x.to_string()))
701            .bind(order_event.limit_offset().map(|x| x.to_string()))
702            .bind(order_event.trailing_offset().map(|x| x.to_string()))
703            .bind(order_event.trailing_offset_type().map(TrailingOffsetTypeModel))
704            .bind(order_event.expire_time().map(|x| x.to_string()))
705            .bind(order_event.display_qty().map(|x| x.to_string()))
706            .bind(order_event.emulation_trigger().map(|x| x.to_string()))
707            .bind(order_event.trigger_instrument_id().map(|x| x.to_string()))
708            .bind(order_event.contingency_type().map(|x| x.to_string()))
709            .bind(order_event.order_list_id().map(|x| x.to_string()))
710            .bind(order_event.linked_order_ids().map(|x| x.iter().map(ToString::to_string).collect::<Vec<String>>()))
711            .bind(order_event.parent_order_id().map(|x| x.to_string()))
712            .bind(order_event.exec_algorithm_id().map(|x| x.to_string()))
713            .bind(order_event.exec_spawn_id().map(|x| x.to_string()))
714            .bind(order_event.venue_order_id().map(|x| x.to_string()))
715            .bind(order_event.account_id().map(|x| x.to_string()))
716            .bind(order_event.position_id().map(|x| x.to_string()))
717            .bind(order_event.commission().map(|x| x.to_string()))
718            .bind(order_event.ts_event().to_string())
719            .bind(order_event.ts_init().to_string())
720            .execute(&mut *transaction)
721            .await
722            .map(|_| ())
723            .map_err(|e| anyhow::anyhow!("Failed to insert into order_event table: {e}"))?;
724        transaction
725            .commit()
726            .await
727            .map_err(|e| anyhow::anyhow!("Failed to commit transaction: {e}"))
728    }
729
730    /// Loads all order events for a `client_order_id` via the provided `pool`.
731    ///
732    /// # Errors
733    ///
734    /// Returns an error if the SQL SELECT or deserialization fails.
735    pub async fn load_order_events(
736        pool: &PgPool,
737        client_order_id: &ClientOrderId,
738    ) -> anyhow::Result<Vec<OrderEventAny>> {
739        sqlx::query_as::<_, OrderEventAnyModel>(r#"SELECT * FROM "order_event" event WHERE event.client_order_id = $1 ORDER BY created_at ASC"#)
740        .bind(client_order_id.to_string())
741        .fetch_all(pool)
742        .await
743        .map(|rows| rows.into_iter().map(|row| row.0).collect())
744        .map_err(|e| anyhow::anyhow!("Failed to load order events: {e}"))
745    }
746
747    /// Loads and assembles a complete `OrderAny` for a `client_order_id` via the provided `pool`.
748    ///
749    /// # Errors
750    ///
751    /// Returns an error if assembling events or SQL operations fail.
752    ///
753    /// # Panics
754    ///
755    /// Panics if assembling the order from events fails.
756    pub async fn load_order(
757        pool: &PgPool,
758        client_order_id: &ClientOrderId,
759    ) -> anyhow::Result<Option<OrderAny>> {
760        let order_events = Self::load_order_events(pool, client_order_id).await;
761
762        match order_events {
763            Ok(order_events) => {
764                if order_events.is_empty() {
765                    return Ok(None);
766                }
767                let order = OrderAny::from_events(order_events).unwrap();
768                Ok(Some(order))
769            }
770            Err(e) => anyhow::bail!("Failed to load order events: {e}"),
771        }
772    }
773
774    /// Loads and assembles all `OrderAny` entries via the provided `pool`.
775    ///
776    /// # Errors
777    ///
778    /// Returns an error if loading events or SQL operations fail.
779    ///
780    /// # Panics
781    ///
782    /// Panics if loading or assembling any individual order fails.
783    pub async fn load_orders(pool: &PgPool) -> anyhow::Result<Vec<OrderAny>> {
784        let mut orders: Vec<OrderAny> = Vec::new();
785        let client_order_ids: Vec<ClientOrderId> = sqlx::query(
786            r#"
787            SELECT DISTINCT client_order_id FROM "order_event"
788        "#,
789        )
790        .fetch_all(pool)
791        .await
792        .map(|rows| {
793            rows.into_iter()
794                .map(|row| ClientOrderId::from(row.get::<&str, _>(0)))
795                .collect()
796        })
797        .map_err(|e| anyhow::anyhow!("Failed to load order ids: {e}"))?;
798        for id in client_order_ids {
799            let order = Self::load_order(pool, &id).await.unwrap();
800            match order {
801                Some(order) => {
802                    orders.push(order);
803                }
804                None => {
805                    continue;
806                }
807            }
808        }
809        Ok(orders)
810    }
811
812    /// Inserts or updates an `AccountAny` entry via the provided `pool`.
813    ///
814    /// # Errors
815    ///
816    /// Returns an error if the SQL INSERT or UPDATE operation fails.
817    ///
818    /// # Panics
819    ///
820    /// Panics if checking for existing account event unwrap fails.
821    pub async fn add_account(
822        pool: &PgPool,
823        kind: &str,
824        updated: bool,
825        account: Box<dyn Account>,
826    ) -> anyhow::Result<()> {
827        if updated {
828            let exists = Self::check_if_account_event_exists(pool, account.id())
829                .await
830                .unwrap();
831            assert!(
832                exists,
833                "Account event does not exist for account: {}",
834                account.id()
835            );
836        }
837
838        let mut transaction = pool.begin().await?;
839
840        sqlx::query(
841            r#"
842            INSERT INTO "account" (id) VALUES ($1) ON CONFLICT (id) DO NOTHING
843        "#,
844        )
845        .bind(account.id().to_string())
846        .execute(&mut *transaction)
847        .await
848        .map(|_| ())
849        .map_err(|e| anyhow::anyhow!("Failed to insert into account table: {e}"))?;
850
851        let account_event = account.last_event().unwrap();
852        sqlx::query(r#"
853            INSERT INTO "account_event" (
854                id, kind, account_id, base_currency, balances, margins, is_reported, ts_event, ts_init, created_at, updated_at
855            ) VALUES (
856                $1, $2, $3, $4, $5, $6, $7, $8, $9, CURRENT_TIMESTAMP, CURRENT_TIMESTAMP
857            )
858            ON CONFLICT (id)
859            DO UPDATE
860            SET
861                kind = $2, account_id = $3, base_currency = $4, balances = $5, margins = $6, is_reported = $7,
862                ts_event = $8, ts_init = $9, updated_at = CURRENT_TIMESTAMP
863        "#)
864            .bind(account_event.event_id.to_string())
865            .bind(kind.to_string())
866            .bind(account_event.account_id.to_string())
867            .bind(account_event.base_currency.map(|x| x.code.as_str()))
868            .bind(serde_json::to_value::<Vec<AccountBalance>>(account_event.balances).unwrap())
869            .bind(serde_json::to_value::<Vec<MarginBalance>>(account_event.margins).unwrap())
870            .bind(account_event.is_reported)
871            .bind(account_event.ts_event.to_string())
872            .bind(account_event.ts_init.to_string())
873            .execute(&mut *transaction)
874            .await
875            .map(|_| ())
876            .map_err(|e| anyhow::anyhow!("Failed to insert into account_event table: {e}"))?;
877        transaction
878            .commit()
879            .await
880            .map_err(|e| anyhow::anyhow!("Failed to commit add_account transaction: {e}"))
881    }
882
883    /// Loads all account events for `account_id` via the provided `pool`.
884    ///
885    /// # Errors
886    ///
887    /// Returns an error if the SQL SELECT or deserialization fails.
888    pub async fn load_account_events(
889        pool: &PgPool,
890        account_id: &AccountId,
891    ) -> anyhow::Result<Vec<AccountState>> {
892        sqlx::query_as::<_, AccountEventModel>(
893            r#"SELECT * FROM "account_event" WHERE account_id = $1 ORDER BY created_at ASC"#,
894        )
895        .bind(account_id.to_string())
896        .fetch_all(pool)
897        .await
898        .map(|rows| rows.into_iter().map(|row| row.0).collect())
899        .map_err(|e| anyhow::anyhow!("Failed to load account events: {e}"))
900    }
901
902    /// Loads and assembles a complete `AccountAny` for `account_id` via the provided `pool`.
903    ///
904    /// # Errors
905    ///
906    /// Returns an error if assembling events or SQL operations fail.
907    ///
908    /// # Panics
909    ///
910    /// Panics if assembling the account from events fails.
911    pub async fn load_account(
912        pool: &PgPool,
913        account_id: &AccountId,
914    ) -> anyhow::Result<Option<AccountAny>> {
915        let account_events = Self::load_account_events(pool, account_id).await;
916        match account_events {
917            Ok(account_events) => {
918                if account_events.is_empty() {
919                    return Ok(None);
920                }
921                let account = AccountAny::from_events(account_events).unwrap();
922                Ok(Some(account))
923            }
924            Err(e) => anyhow::bail!("Failed to load account events: {e}"),
925        }
926    }
927
928    /// Loads and assembles all `AccountAny` entries via the provided `pool`.
929    ///
930    /// # Errors
931    ///
932    /// Returns an error if loading events or SQL operations fail.
933    ///
934    /// # Panics
935    ///
936    /// Panics if loading or assembling any individual account fails.
937    pub async fn load_accounts(pool: &PgPool) -> anyhow::Result<Vec<AccountAny>> {
938        let mut accounts: Vec<AccountAny> = Vec::new();
939        let account_ids: Vec<AccountId> = sqlx::query(
940            r#"
941            SELECT DISTINCT account_id FROM "account_event"
942        "#,
943        )
944        .fetch_all(pool)
945        .await
946        .map(|rows| {
947            rows.into_iter()
948                .map(|row| AccountId::from(row.get::<&str, _>(0)))
949                .collect()
950        })
951        .map_err(|e| anyhow::anyhow!("Failed to load account ids: {e}"))?;
952        for id in account_ids {
953            let account = Self::load_account(pool, &id).await.unwrap();
954            match account {
955                Some(account) => {
956                    accounts.push(account);
957                }
958                None => {
959                    continue;
960                }
961            }
962        }
963        Ok(accounts)
964    }
965
966    /// Inserts a `TradeTick` entry via the provided `pool`.
967    ///
968    /// # Errors
969    ///
970    /// Returns an error if the SQL INSERT operation fails.
971    pub async fn add_trade(pool: &PgPool, trade: &TradeTick) -> anyhow::Result<()> {
972        sqlx::query(r#"
973            INSERT INTO "trade" (
974                instrument_id, price, quantity, aggressor_side, venue_trade_id,
975                ts_event, ts_init, created_at, updated_at
976            ) VALUES (
977                $1, $2, $3, $4::aggressor_side, $5, $6, $7, CURRENT_TIMESTAMP, CURRENT_TIMESTAMP
978            )
979            ON CONFLICT (id)
980            DO UPDATE
981            SET
982                instrument_id = $1, price = $2, quantity = $3, aggressor_side = $4, venue_trade_id = $5,
983                ts_event = $6, ts_init = $7, updated_at = CURRENT_TIMESTAMP
984        "#)
985            .bind(trade.instrument_id.to_string())
986            .bind(trade.price.to_string())
987            .bind(trade.size.to_string())
988            .bind(AggressorSideModel(trade.aggressor_side))
989            .bind(trade.trade_id.to_string())
990            .bind(trade.ts_event.to_string())
991            .bind(trade.ts_init.to_string())
992            .execute(pool)
993            .await
994            .map(|_| ())
995            .map_err(|e| anyhow::anyhow!("Failed to insert into trade table: {e}"))
996    }
997
998    /// Loads all `TradeTick` entries for `instrument_id` via the provided `pool`.
999    ///
1000    /// # Errors
1001    ///
1002    /// Returns an error if the SQL SELECT or deserialization fails.
1003    pub async fn load_trades(
1004        pool: &PgPool,
1005        instrument_id: &InstrumentId,
1006    ) -> anyhow::Result<Vec<TradeTick>> {
1007        sqlx::query_as::<_, TradeTickModel>(
1008            r#"SELECT * FROM "trade" WHERE instrument_id = $1 ORDER BY ts_event ASC"#,
1009        )
1010        .bind(instrument_id.to_string())
1011        .fetch_all(pool)
1012        .await
1013        .map(|rows| rows.into_iter().map(|row| row.0).collect())
1014        .map_err(|e| anyhow::anyhow!("Failed to load trades: {e}"))
1015    }
1016
1017    /// Inserts a `QuoteTick` entry via the provided `pool`.
1018    ///
1019    /// # Errors
1020    ///
1021    /// Returns an error if the SQL INSERT operation fails.
1022    pub async fn add_quote(pool: &PgPool, quote: &QuoteTick) -> anyhow::Result<()> {
1023        sqlx::query(r#"
1024            INSERT INTO "quote" (
1025                instrument_id, bid_price, ask_price, bid_size, ask_size, ts_event, ts_init, created_at, updated_at
1026            ) VALUES (
1027                $1, $2, $3, $4, $5, $6, $7, CURRENT_TIMESTAMP, CURRENT_TIMESTAMP
1028            )
1029            ON CONFLICT (id)
1030            DO UPDATE
1031            SET
1032                instrument_id = $1, bid_price = $2, ask_price = $3, bid_size = $4, ask_size = $5,
1033                ts_event = $6, ts_init = $7, updated_at = CURRENT_TIMESTAMP
1034        "#)
1035            .bind(quote.instrument_id.to_string())
1036            .bind(quote.bid_price.to_string())
1037            .bind(quote.ask_price.to_string())
1038            .bind(quote.bid_size.to_string())
1039            .bind(quote.ask_size.to_string())
1040            .bind(quote.ts_event.to_string())
1041            .bind(quote.ts_init.to_string())
1042            .execute(pool)
1043            .await
1044            .map(|_| ())
1045            .map_err(|e| anyhow::anyhow!("Failed to insert into quote table: {e}"))
1046    }
1047
1048    /// Loads all `QuoteTick` entries for `instrument_id` via the provided `pool`.
1049    ///
1050    /// # Errors
1051    ///
1052    /// Returns an error if the SQL SELECT or deserialization fails.
1053    pub async fn load_quotes(
1054        pool: &PgPool,
1055        instrument_id: &InstrumentId,
1056    ) -> anyhow::Result<Vec<QuoteTick>> {
1057        sqlx::query_as::<_, QuoteTickModel>(
1058            r#"SELECT * FROM "quote" WHERE instrument_id = $1 ORDER BY ts_event ASC"#,
1059        )
1060        .bind(instrument_id.to_string())
1061        .fetch_all(pool)
1062        .await
1063        .map(|rows| rows.into_iter().map(|row| row.0).collect())
1064        .map_err(|e| anyhow::anyhow!("Failed to load quotes: {e}"))
1065    }
1066
1067    /// Inserts a `Bar` entry via the provided `pool`.
1068    ///
1069    /// # Errors
1070    ///
1071    /// Returns an error if the SQL INSERT operation fails.
1072    pub async fn add_bar(pool: &PgPool, bar: &Bar) -> anyhow::Result<()> {
1073        println!("Adding bar: {bar:?}");
1074        sqlx::query(r#"
1075            INSERT INTO "bar" (
1076                instrument_id, step, bar_aggregation, price_type, aggregation_source, open, high, low, close, volume, ts_event, ts_init, created_at, updated_at
1077            ) VALUES (
1078                $1, $2, $3::bar_aggregation, $4::price_type, $5::aggregation_source, $6, $7, $8, $9, $10, $11, $12, CURRENT_TIMESTAMP, CURRENT_TIMESTAMP
1079            )
1080            ON CONFLICT (id)
1081            DO UPDATE
1082            SET
1083                instrument_id = $1, step = $2, bar_aggregation = $3::bar_aggregation, price_type = $4::price_type, aggregation_source = $5::aggregation_source,
1084                open = $6, high = $7, low = $8, close = $9, volume = $10, ts_event = $11, ts_init = $12, updated_at = CURRENT_TIMESTAMP
1085        "#)
1086            .bind(bar.bar_type.instrument_id().to_string())
1087            .bind(bar.bar_type.spec().step.get() as i32)
1088            .bind(BarAggregationModel(bar.bar_type.spec().aggregation))
1089            .bind(PriceTypeModel(bar.bar_type.spec().price_type))
1090            .bind(AggregationSourceModel(bar.bar_type.aggregation_source()))
1091            .bind(bar.open.to_string())
1092            .bind(bar.high.to_string())
1093            .bind(bar.low.to_string())
1094            .bind(bar.close.to_string())
1095            .bind(bar.volume.to_string())
1096            .bind(bar.ts_event.to_string())
1097            .bind(bar.ts_init.to_string())
1098            .execute(pool)
1099            .await
1100            .map(|_| ())
1101            .map_err(|e| anyhow::anyhow!("Failed to insert into bar table: {e}"))
1102    }
1103
1104    /// Loads all `Bar` entries for `instrument_id` via the provided `pool`.
1105    ///
1106    /// # Errors
1107    ///
1108    /// Returns an error if the SQL SELECT or deserialization fails.
1109    pub async fn load_bars(
1110        pool: &PgPool,
1111        instrument_id: &InstrumentId,
1112    ) -> anyhow::Result<Vec<Bar>> {
1113        sqlx::query_as::<_, BarModel>(
1114            r#"SELECT * FROM "bar" WHERE instrument_id = $1 ORDER BY ts_event ASC"#,
1115        )
1116        .bind(instrument_id.to_string())
1117        .fetch_all(pool)
1118        .await
1119        .map(|rows| rows.into_iter().map(|row| row.0).collect())
1120        .map_err(|e| anyhow::anyhow!("Failed to load bars: {e}"))
1121    }
1122
1123    /// Loads all distinct client order IDs from order events via the provided `pool`.
1124    ///
1125    /// # Errors
1126    ///
1127    /// Returns an error if the SQL SELECT or iteration fails.
1128    pub async fn load_distinct_order_event_client_ids(
1129        pool: &PgPool,
1130    ) -> anyhow::Result<HashMap<ClientOrderId, ClientId>> {
1131        let mut map: HashMap<ClientOrderId, ClientId> = HashMap::new();
1132        let result = sqlx::query_as::<_, OrderEventOrderClientIdCombination>(
1133            r#"
1134            SELECT DISTINCT
1135                client_order_id AS "client_order_id",
1136                client_id AS "client_id"
1137            FROM "order_event"
1138        "#,
1139        )
1140        .fetch_all(pool)
1141        .await
1142        .map_err(|e| anyhow::anyhow!("Failed to load account ids: {e}"))?;
1143        for id in result {
1144            map.insert(id.client_order_id, id.client_id);
1145        }
1146        Ok(map)
1147    }
1148
1149    /// Inserts a `Signal` entry via the provided `pool`.
1150    ///
1151    /// # Errors
1152    ///
1153    /// Returns an error if the SQL INSERT operation fails.
1154    pub async fn add_signal(pool: &PgPool, signal: &Signal) -> anyhow::Result<()> {
1155        sqlx::query(
1156            r#"
1157            INSERT INTO "signal" (
1158                name, value, ts_event, ts_init, created_at, updated_at
1159            ) VALUES (
1160                $1, $2, $3, $4, CURRENT_TIMESTAMP, CURRENT_TIMESTAMP
1161            )
1162            ON CONFLICT (id)
1163            DO UPDATE
1164            SET
1165                name = $1, value = $2, ts_event = $3, ts_init = $4,
1166                updated_at = CURRENT_TIMESTAMP
1167        "#,
1168        )
1169        .bind(signal.name.to_string())
1170        .bind(signal.value.to_string())
1171        .bind(signal.ts_event.to_string())
1172        .bind(signal.ts_init.to_string())
1173        .execute(pool)
1174        .await
1175        .map(|_| ())
1176        .map_err(|e| anyhow::anyhow!("Failed to insert into signal table: {e}"))
1177    }
1178
1179    /// Loads all `Signal` entries by `name` via the provided `pool`.
1180    ///
1181    /// # Errors
1182    ///
1183    /// Returns an error if the SQL SELECT or deserialization fails.
1184    pub async fn load_signals(pool: &PgPool, name: &str) -> anyhow::Result<Vec<Signal>> {
1185        sqlx::query_as::<_, SignalModel>(
1186            r#"SELECT * FROM "signal" WHERE name = $1 ORDER BY ts_init ASC"#,
1187        )
1188        .bind(name)
1189        .fetch_all(pool)
1190        .await
1191        .map(|rows| rows.into_iter().map(|row| row.0).collect())
1192        .map_err(|e| anyhow::anyhow!("Failed to load signals: {e}"))
1193    }
1194
1195    /// Inserts a `CustomData` entry via the provided `pool`.
1196    ///
1197    /// # Errors
1198    ///
1199    /// Returns an error if the SQL INSERT operation fails.
1200    pub async fn add_custom_data(pool: &PgPool, data: &CustomData) -> anyhow::Result<()> {
1201        sqlx::query(
1202            r#"
1203            INSERT INTO "custom" (
1204                data_type, metadata, value, ts_event, ts_init, created_at, updated_at
1205            ) VALUES (
1206                $1, $2, $3, $4, $5, CURRENT_TIMESTAMP, CURRENT_TIMESTAMP
1207            )
1208            ON CONFLICT (id)
1209            DO UPDATE
1210            SET
1211                data_type = $1, metadata = $2, value = $3, ts_event = $4, ts_init = $5,
1212                updated_at = CURRENT_TIMESTAMP
1213        "#,
1214        )
1215        .bind(data.data_type.type_name().to_string())
1216        .bind(
1217            data.data_type
1218                .metadata()
1219                .as_ref()
1220                .map_or_else(|| Ok(serde_json::Value::Null), serde_json::to_value)?,
1221        )
1222        .bind(data.value.to_vec())
1223        .bind(data.ts_event.to_string())
1224        .bind(data.ts_init.to_string())
1225        .execute(pool)
1226        .await
1227        .map(|_| ())
1228        .map_err(|e| anyhow::anyhow!("Failed to insert into custom table: {e}"))
1229    }
1230
1231    /// Loads all `CustomData` entries of `data_type` via the provided `pool`.
1232    ///
1233    /// # Errors
1234    ///
1235    /// Returns an error if the SQL SELECT or deserialization fails.
1236    pub async fn load_custom_data(
1237        pool: &PgPool,
1238        data_type: &DataType,
1239    ) -> anyhow::Result<Vec<CustomData>> {
1240        // TODO: This metadata JSON could be more efficient at some point
1241        let metadata_json = data_type
1242            .metadata()
1243            .as_ref()
1244            .map_or(Ok(serde_json::Value::Null), |metadata| {
1245                serde_json::to_value(metadata)
1246            })?;
1247
1248        sqlx::query_as::<_, CustomDataModel>(
1249            r#"SELECT * FROM "custom" WHERE data_type = $1 AND metadata = $2 ORDER BY ts_init ASC"#,
1250        )
1251        .bind(data_type.type_name())
1252        .bind(metadata_json)
1253        .fetch_all(pool)
1254        .await
1255        .map(|rows| rows.into_iter().map(|row| row.0).collect())
1256        .map_err(|e| anyhow::anyhow!("Failed to load custom data: {e}"))
1257    }
1258}