1use std::{
17 collections::{HashMap, VecDeque},
18 fmt::Debug,
19 time::{Duration, Instant},
20};
21
22use bytes::Bytes;
23use nautilus_common::{
24 cache::{
25 CacheConfig,
26 database::{CacheDatabaseAdapter, CacheMap},
27 },
28 custom::CustomData,
29 enums::SerializationEncoding,
30 logging::{log_task_awaiting, log_task_started, log_task_stopped},
31 runtime::get_runtime,
32 signal::Signal,
33};
34use nautilus_core::{UUID4, UnixNanos, correctness::check_slice_not_empty};
35use nautilus_cryptography::providers::install_cryptographic_provider;
36use nautilus_model::{
37 accounts::AccountAny,
38 data::{Bar, DataType, QuoteTick, TradeTick},
39 events::{OrderEventAny, OrderSnapshot, position::snapshot::PositionSnapshot},
40 identifiers::{
41 AccountId, ClientId, ClientOrderId, ComponentId, InstrumentId, PositionId, StrategyId,
42 TraderId, VenueOrderId,
43 },
44 instruments::{InstrumentAny, SyntheticInstrument},
45 orderbook::OrderBook,
46 orders::OrderAny,
47 position::Position,
48 types::Currency,
49};
50use redis::{Pipeline, aio::ConnectionManager};
51use tokio::try_join;
52use ustr::Ustr;
53
54use super::{REDIS_DELIMITER, REDIS_FLUSHDB};
55use crate::redis::{create_redis_connection, queries::DatabaseQueries};
56
// Labels passed to `create_redis_connection` for the read and write connections
const CACHE_READ: &str = "cache-read";
const CACHE_WRITE: &str = "cache-write";
// Task name used when logging the lifecycle of the background command processor
const CACHE_PROCESS: &str = "cache-process";

// Error message prefix used when a command cannot be sent to the processing task
const FAILED_TX_CHANNEL: &str = "Failed to send to channel";

// Collection keys (the segment of a command key before the first delimiter)
const INDEX: &str = "index";
const GENERAL: &str = "general";
const CURRENCIES: &str = "currencies";
const INSTRUMENTS: &str = "instruments";
const SYNTHETICS: &str = "synthetics";
const ACCOUNTS: &str = "accounts";
const ORDERS: &str = "orders";
const POSITIONS: &str = "positions";
const ACTORS: &str = "actors";
const STRATEGIES: &str = "strategies";
const SNAPSHOTS: &str = "snapshots";
const HEALTH: &str = "health";

// Index keys (compared against the value returned by `get_index_key`)
const INDEX_ORDER_IDS: &str = "index:order_ids";
const INDEX_ORDER_POSITION: &str = "index:order_position";
const INDEX_ORDER_CLIENT: &str = "index:order_client";
const INDEX_ORDERS: &str = "index:orders";
const INDEX_ORDERS_OPEN: &str = "index:orders_open";
const INDEX_ORDERS_CLOSED: &str = "index:orders_closed";
const INDEX_ORDERS_EMULATED: &str = "index:orders_emulated";
const INDEX_ORDERS_INFLIGHT: &str = "index:orders_inflight";
const INDEX_POSITIONS: &str = "index:positions";
const INDEX_POSITIONS_OPEN: &str = "index:positions_open";
const INDEX_POSITIONS_CLOSED: &str = "index:positions_closed";
91
/// The kind of operation carried by a [`DatabaseCommand`].
#[derive(Clone, Debug)]
pub enum DatabaseOperation {
    /// Dispatched to `insert` when the buffer is drained.
    Insert,
    /// Dispatched to `update` when the buffer is drained.
    Update,
    /// Dispatched to `delete` when the buffer is drained.
    Delete,
    /// Tells the command processing loop to stop receiving; never buffered.
    Close,
}
100
/// A command sent over the channel to the background processing task.
#[derive(Clone, Debug)]
pub struct DatabaseCommand {
    /// The operation to perform.
    pub op_type: DatabaseOperation,
    /// The target key (without the trader key prefix); `None` for a close command.
    pub key: Option<String>,
    /// The data for the operation, if any.
    pub payload: Option<Vec<Bytes>>,
}
111
112impl DatabaseCommand {
113 #[must_use]
115 pub const fn new(op_type: DatabaseOperation, key: String, payload: Option<Vec<Bytes>>) -> Self {
116 Self {
117 op_type,
118 key: Some(key),
119 payload,
120 }
121 }
122
123 #[must_use]
125 pub const fn close() -> Self {
126 Self {
127 op_type: DatabaseOperation::Close,
128 key: None,
129 payload: None,
130 }
131 }
132}
133
/// A Redis-backed cache database.
///
/// Reads go through `con`; writes are sent as [`DatabaseCommand`]s over `tx` to a
/// background task (see `process_commands`) which opens its own connection.
#[cfg_attr(
    feature = "python",
    pyo3::pyclass(module = "posei_trader.core.nautilus_pyo3.infrastructure")
)]
pub struct RedisCacheDatabase {
    /// Connection used for direct queries (created with the `CACHE_READ` label).
    pub con: ConnectionManager,
    /// The trader ID this database instance was created for.
    pub trader_id: TraderId,
    /// Serialization encoding copied from the cache config.
    encoding: SerializationEncoding,
    /// Join handle of the spawned command processing task.
    handle: tokio::task::JoinHandle<()>,
    /// Key prefix produced by `get_trader_key`.
    trader_key: String,
    /// Sending half of the command channel consumed by the processing task.
    tx: tokio::sync::mpsc::UnboundedSender<DatabaseCommand>,
}
146
147impl Debug for RedisCacheDatabase {
148 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
149 f.debug_struct(stringify!(RedisCacheDatabase))
150 .field("trader_id", &self.trader_id)
151 .field("encoding", &self.encoding)
152 .finish()
153 }
154}
155
156impl RedisCacheDatabase {
157 pub async fn new(
166 trader_id: TraderId,
167 instance_id: UUID4,
168 config: CacheConfig,
169 ) -> anyhow::Result<Self> {
170 install_cryptographic_provider();
171
172 let db_config = config
173 .database
174 .as_ref()
175 .ok_or_else(|| anyhow::anyhow!("No database config"))?;
176 let con = create_redis_connection(CACHE_READ, db_config.clone()).await?;
177
178 let (tx, rx) = tokio::sync::mpsc::unbounded_channel::<DatabaseCommand>();
179 let trader_key = get_trader_key(trader_id, instance_id, &config);
180 let trader_key_clone = trader_key.clone();
181 let encoding = config.encoding;
182 let handle = get_runtime().spawn(async move {
183 if let Err(e) = process_commands(rx, trader_key_clone, config.clone()).await {
184 log::error!("Error in task '{CACHE_PROCESS}': {e}");
185 }
186 });
187
188 Ok(Self {
189 con,
190 trader_id,
191 encoding,
192 handle,
193 trader_key,
194 tx,
195 })
196 }
197
198 #[must_use]
199 pub const fn get_encoding(&self) -> SerializationEncoding {
200 self.encoding
201 }
202
203 #[must_use]
204 pub fn get_trader_key(&self) -> &str {
205 &self.trader_key
206 }
207
208 pub fn close(&mut self) {
209 log::debug!("Closing");
210
211 if let Err(e) = self.tx.send(DatabaseCommand::close()) {
212 log::debug!("Error sending close command: {e:?}");
213 }
214
215 log_task_awaiting(CACHE_PROCESS);
216
217 tokio::task::block_in_place(|| {
218 if let Err(e) = get_runtime().block_on(&mut self.handle) {
219 log::error!("Error awaiting task '{CACHE_PROCESS}': {e:?}");
220 }
221 });
222
223 log::debug!("Closed");
224 }
225
226 pub async fn flushdb(&mut self) {
227 if let Err(e) = redis::cmd(REDIS_FLUSHDB)
228 .query_async::<()>(&mut self.con)
229 .await
230 {
231 log::error!("Failed to flush database: {e:?}");
232 }
233 }
234
235 pub async fn keys(&mut self, pattern: &str) -> anyhow::Result<Vec<String>> {
241 let pattern = format!("{}{REDIS_DELIMITER}{pattern}", self.trader_key);
242 log::debug!("Querying keys: {pattern}");
243 DatabaseQueries::scan_keys(&mut self.con, pattern).await
244 }
245
246 pub async fn read(&mut self, key: &str) -> anyhow::Result<Vec<Bytes>> {
252 DatabaseQueries::read(&self.con, &self.trader_key, key).await
253 }
254
255 pub fn insert(&mut self, key: String, payload: Option<Vec<Bytes>>) -> anyhow::Result<()> {
261 let op = DatabaseCommand::new(DatabaseOperation::Insert, key, payload);
262 match self.tx.send(op) {
263 Ok(()) => Ok(()),
264 Err(e) => anyhow::bail!("{FAILED_TX_CHANNEL}: {e}"),
265 }
266 }
267
268 pub fn update(&mut self, key: String, payload: Option<Vec<Bytes>>) -> anyhow::Result<()> {
274 let op = DatabaseCommand::new(DatabaseOperation::Update, key, payload);
275 match self.tx.send(op) {
276 Ok(()) => Ok(()),
277 Err(e) => anyhow::bail!("{FAILED_TX_CHANNEL}: {e}"),
278 }
279 }
280
281 pub fn delete(&mut self, key: String, payload: Option<Vec<Bytes>>) -> anyhow::Result<()> {
287 let op = DatabaseCommand::new(DatabaseOperation::Delete, key, payload);
288 match self.tx.send(op) {
289 Ok(()) => Ok(()),
290 Err(e) => anyhow::bail!("{FAILED_TX_CHANNEL}: {e}"),
291 }
292 }
293}
294
295async fn process_commands(
296 mut rx: tokio::sync::mpsc::UnboundedReceiver<DatabaseCommand>,
297 trader_key: String,
298 config: CacheConfig,
299) -> anyhow::Result<()> {
300 log_task_started(CACHE_PROCESS);
301
302 let db_config = config
303 .database
304 .as_ref()
305 .ok_or_else(|| anyhow::anyhow!("No database config"))?;
306 let mut con = create_redis_connection(CACHE_WRITE, db_config.clone()).await?;
307
308 let mut buffer: VecDeque<DatabaseCommand> = VecDeque::new();
310 let mut last_drain = Instant::now();
311 let buffer_interval = Duration::from_millis(config.buffer_interval_ms.unwrap_or(0) as u64);
312
313 loop {
315 if last_drain.elapsed() >= buffer_interval && !buffer.is_empty() {
316 drain_buffer(&mut con, &trader_key, &mut buffer).await;
317 last_drain = Instant::now();
318 } else if let Some(cmd) = rx.recv().await {
319 tracing::trace!("Received {cmd:?}");
320
321 if matches!(cmd.op_type, DatabaseOperation::Close) {
322 break;
323 }
324 buffer.push_back(cmd);
325 } else {
326 tracing::debug!("Command channel closed");
327 break;
328 }
329 }
330
331 if !buffer.is_empty() {
333 drain_buffer(&mut con, &trader_key, &mut buffer).await;
334 }
335
336 log_task_stopped(CACHE_PROCESS);
337 Ok(())
338}
339
340async fn drain_buffer(
341 conn: &mut ConnectionManager,
342 trader_key: &str,
343 buffer: &mut VecDeque<DatabaseCommand>,
344) {
345 let mut pipe = redis::pipe();
346 pipe.atomic();
347
348 for msg in buffer.drain(..) {
349 let key = if let Some(key) = msg.key {
350 key
351 } else {
352 log::error!("Null key found for message: {msg:?}");
353 continue;
354 };
355 let collection = match get_collection_key(&key) {
356 Ok(collection) => collection,
357 Err(e) => {
358 tracing::error!("{e}");
359 continue; }
361 };
362
363 let key = format!("{trader_key}{REDIS_DELIMITER}{}", &key);
364
365 match msg.op_type {
366 DatabaseOperation::Insert => {
367 if let Some(payload) = msg.payload {
368 if let Err(e) = insert(&mut pipe, collection, &key, payload) {
369 tracing::error!("{e}");
370 }
371 } else {
372 tracing::error!("Null `payload` for `insert`");
373 }
374 }
375 DatabaseOperation::Update => {
376 if let Some(payload) = msg.payload {
377 if let Err(e) = update(&mut pipe, collection, &key, payload) {
378 tracing::error!("{e}");
379 }
380 } else {
381 tracing::error!("Null `payload` for `update`");
382 }
383 }
384 DatabaseOperation::Delete => {
385 if let Err(e) = delete(&mut pipe, collection, &key, msg.payload) {
387 tracing::error!("{e}");
388 }
389 }
390 DatabaseOperation::Close => panic!("Close command should not be drained"),
391 }
392 }
393
394 if let Err(e) = pipe.query_async::<()>(conn).await {
395 tracing::error!("{e}");
396 }
397}
398
399fn insert(
400 pipe: &mut Pipeline,
401 collection: &str,
402 key: &str,
403 value: Vec<Bytes>,
404) -> anyhow::Result<()> {
405 check_slice_not_empty(value.as_slice(), stringify!(value))?;
406
407 match collection {
408 INDEX => insert_index(pipe, key, &value),
409 GENERAL => {
410 insert_string(pipe, key, value[0].as_ref());
411 Ok(())
412 }
413 CURRENCIES => {
414 insert_string(pipe, key, value[0].as_ref());
415 Ok(())
416 }
417 INSTRUMENTS => {
418 insert_string(pipe, key, value[0].as_ref());
419 Ok(())
420 }
421 SYNTHETICS => {
422 insert_string(pipe, key, value[0].as_ref());
423 Ok(())
424 }
425 ACCOUNTS => {
426 insert_list(pipe, key, value[0].as_ref());
427 Ok(())
428 }
429 ORDERS => {
430 insert_list(pipe, key, value[0].as_ref());
431 Ok(())
432 }
433 POSITIONS => {
434 insert_list(pipe, key, value[0].as_ref());
435 Ok(())
436 }
437 ACTORS => {
438 insert_string(pipe, key, value[0].as_ref());
439 Ok(())
440 }
441 STRATEGIES => {
442 insert_string(pipe, key, value[0].as_ref());
443 Ok(())
444 }
445 SNAPSHOTS => {
446 insert_list(pipe, key, value[0].as_ref());
447 Ok(())
448 }
449 HEALTH => {
450 insert_string(pipe, key, value[0].as_ref());
451 Ok(())
452 }
453 _ => anyhow::bail!("Unsupported operation: `insert` for collection '{collection}'"),
454 }
455}
456
457fn insert_index(pipe: &mut Pipeline, key: &str, value: &[Bytes]) -> anyhow::Result<()> {
458 let index_key = get_index_key(key)?;
459 match index_key {
460 INDEX_ORDER_IDS => {
461 insert_set(pipe, key, value[0].as_ref());
462 Ok(())
463 }
464 INDEX_ORDER_POSITION => {
465 insert_hset(pipe, key, value[0].as_ref(), value[1].as_ref());
466 Ok(())
467 }
468 INDEX_ORDER_CLIENT => {
469 insert_hset(pipe, key, value[0].as_ref(), value[1].as_ref());
470 Ok(())
471 }
472 INDEX_ORDERS => {
473 insert_set(pipe, key, value[0].as_ref());
474 Ok(())
475 }
476 INDEX_ORDERS_OPEN => {
477 insert_set(pipe, key, value[0].as_ref());
478 Ok(())
479 }
480 INDEX_ORDERS_CLOSED => {
481 insert_set(pipe, key, value[0].as_ref());
482 Ok(())
483 }
484 INDEX_ORDERS_EMULATED => {
485 insert_set(pipe, key, value[0].as_ref());
486 Ok(())
487 }
488 INDEX_ORDERS_INFLIGHT => {
489 insert_set(pipe, key, value[0].as_ref());
490 Ok(())
491 }
492 INDEX_POSITIONS => {
493 insert_set(pipe, key, value[0].as_ref());
494 Ok(())
495 }
496 INDEX_POSITIONS_OPEN => {
497 insert_set(pipe, key, value[0].as_ref());
498 Ok(())
499 }
500 INDEX_POSITIONS_CLOSED => {
501 insert_set(pipe, key, value[0].as_ref());
502 Ok(())
503 }
504 _ => anyhow::bail!("Index unknown '{index_key}' on insert"),
505 }
506}
507
/// Queues a `SET` of `value` at `key` on the pipeline.
fn insert_string(pipe: &mut Pipeline, key: &str, value: &[u8]) {
    pipe.set(key, value);
}
511
/// Queues an `SADD` of `value` to the set at `key` on the pipeline.
fn insert_set(pipe: &mut Pipeline, key: &str, value: &[u8]) {
    pipe.sadd(key, value);
}
515
/// Queues an `HSET` of field `name` to `value` in the hash at `key` on the pipeline.
fn insert_hset(pipe: &mut Pipeline, key: &str, name: &[u8], value: &[u8]) {
    pipe.hset(key, name, value);
}
519
/// Queues an `RPUSH` of `value` onto the list at `key` on the pipeline.
fn insert_list(pipe: &mut Pipeline, key: &str, value: &[u8]) {
    pipe.rpush(key, value);
}
523
524fn update(
525 pipe: &mut Pipeline,
526 collection: &str,
527 key: &str,
528 value: Vec<Bytes>,
529) -> anyhow::Result<()> {
530 check_slice_not_empty(value.as_slice(), stringify!(value))?;
531
532 match collection {
533 ACCOUNTS => {
534 update_list(pipe, key, value[0].as_ref());
535 Ok(())
536 }
537 ORDERS => {
538 update_list(pipe, key, value[0].as_ref());
539 Ok(())
540 }
541 POSITIONS => {
542 update_list(pipe, key, value[0].as_ref());
543 Ok(())
544 }
545 _ => anyhow::bail!("Unsupported operation: `update` for collection '{collection}'"),
546 }
547}
548
/// Queues an `RPUSHX` of `value` on the pipeline, which appends only if the list at
/// `key` already exists.
fn update_list(pipe: &mut Pipeline, key: &str, value: &[u8]) {
    pipe.rpush_exists(key, value);
}
552
553fn delete(
554 pipe: &mut Pipeline,
555 collection: &str,
556 key: &str,
557 value: Option<Vec<Bytes>>,
558) -> anyhow::Result<()> {
559 match collection {
560 INDEX => remove_index(pipe, key, value),
561 ACTORS => {
562 delete_string(pipe, key);
563 Ok(())
564 }
565 STRATEGIES => {
566 delete_string(pipe, key);
567 Ok(())
568 }
569 _ => anyhow::bail!("Unsupported operation: `delete` for collection '{collection}'"),
570 }
571}
572
573fn remove_index(pipe: &mut Pipeline, key: &str, value: Option<Vec<Bytes>>) -> anyhow::Result<()> {
574 let value = value.ok_or_else(|| anyhow::anyhow!("Empty `payload` for `delete` '{key}'"))?;
575 let index_key = get_index_key(key)?;
576
577 match index_key {
578 INDEX_ORDERS_OPEN => {
579 remove_from_set(pipe, key, value[0].as_ref());
580 Ok(())
581 }
582 INDEX_ORDERS_CLOSED => {
583 remove_from_set(pipe, key, value[0].as_ref());
584 Ok(())
585 }
586 INDEX_ORDERS_EMULATED => {
587 remove_from_set(pipe, key, value[0].as_ref());
588 Ok(())
589 }
590 INDEX_ORDERS_INFLIGHT => {
591 remove_from_set(pipe, key, value[0].as_ref());
592 Ok(())
593 }
594 INDEX_POSITIONS_OPEN => {
595 remove_from_set(pipe, key, value[0].as_ref());
596 Ok(())
597 }
598 INDEX_POSITIONS_CLOSED => {
599 remove_from_set(pipe, key, value[0].as_ref());
600 Ok(())
601 }
602 _ => anyhow::bail!("Unsupported index operation: remove from '{index_key}'"),
603 }
604}
605
/// Queues an `SREM` of `member` from the set at `key` on the pipeline.
fn remove_from_set(pipe: &mut Pipeline, key: &str, member: &[u8]) {
    pipe.srem(key, member);
}
609
/// Queues a `DEL` of `key` on the pipeline.
fn delete_string(pipe: &mut Pipeline, key: &str) {
    pipe.del(key);
}
613
614fn get_trader_key(trader_id: TraderId, instance_id: UUID4, config: &CacheConfig) -> String {
615 let mut key = String::new();
616
617 if config.use_trader_prefix {
618 key.push_str("trader-");
619 }
620
621 key.push_str(trader_id.as_str());
622
623 if config.use_instance_id {
624 key.push(REDIS_DELIMITER);
625 key.push_str(&format!("{instance_id}"));
626 }
627
628 key
629}
630
631fn get_collection_key(key: &str) -> anyhow::Result<&str> {
632 key.split_once(REDIS_DELIMITER)
633 .map(|(collection, _)| collection)
634 .ok_or_else(|| {
635 anyhow::anyhow!("Invalid `key`, missing a '{REDIS_DELIMITER}' delimiter, was {key}")
636 })
637}
638
639fn get_index_key(key: &str) -> anyhow::Result<&str> {
640 key.split_once(REDIS_DELIMITER)
641 .map(|(_, index_key)| index_key)
642 .ok_or_else(|| {
643 anyhow::anyhow!("Invalid `key`, missing a '{REDIS_DELIMITER}' delimiter, was {key}")
644 })
645}
646
/// Adapter exposing a [`RedisCacheDatabase`] through the `CacheDatabaseAdapter` trait.
#[allow(dead_code)]
#[derive(Debug)]
pub struct RedisCacheDatabaseAdapter {
    /// Serialization encoding passed to the `DatabaseQueries` load functions.
    pub encoding: SerializationEncoding,
    /// The wrapped Redis cache database.
    database: RedisCacheDatabase,
}
653
654#[allow(dead_code)] #[allow(unused)] #[async_trait::async_trait]
657impl CacheDatabaseAdapter for RedisCacheDatabaseAdapter {
658 fn close(&mut self) -> anyhow::Result<()> {
659 self.database.close();
660 Ok(())
661 }
662
663 fn flush(&mut self) -> anyhow::Result<()> {
664 self.database.flushdb();
665 Ok(())
666 }
667
668 async fn load_all(&self) -> anyhow::Result<CacheMap> {
669 tracing::debug!("Loading all data");
670
671 let (
672 currencies,
673 instruments,
674 synthetics,
675 accounts,
676 orders,
677 positions,
678 greeks,
679 yield_curves,
680 ) = try_join!(
681 self.load_currencies(),
682 self.load_instruments(),
683 self.load_synthetics(),
684 self.load_accounts(),
685 self.load_orders(),
686 self.load_positions(),
687 self.load_greeks(),
688 self.load_yield_curves()
689 )
690 .map_err(|e| anyhow::anyhow!("Error loading cache data: {e}"))?;
691
692 Ok(CacheMap {
693 currencies,
694 instruments,
695 synthetics,
696 accounts,
697 orders,
698 positions,
699 greeks,
700 yield_curves,
701 })
702 }
703
704 fn load(&self) -> anyhow::Result<HashMap<String, Bytes>> {
705 Ok(HashMap::new()) }
708
709 async fn load_currencies(&self) -> anyhow::Result<HashMap<Ustr, Currency>> {
710 DatabaseQueries::load_currencies(
711 &self.database.con,
712 &self.database.trader_key,
713 self.encoding,
714 )
715 .await
716 }
717
718 async fn load_instruments(&self) -> anyhow::Result<HashMap<InstrumentId, InstrumentAny>> {
719 DatabaseQueries::load_instruments(
720 &self.database.con,
721 &self.database.trader_key,
722 self.encoding,
723 )
724 .await
725 }
726
727 async fn load_synthetics(&self) -> anyhow::Result<HashMap<InstrumentId, SyntheticInstrument>> {
728 DatabaseQueries::load_synthetics(
729 &self.database.con,
730 &self.database.trader_key,
731 self.encoding,
732 )
733 .await
734 }
735
736 async fn load_accounts(&self) -> anyhow::Result<HashMap<AccountId, AccountAny>> {
737 DatabaseQueries::load_accounts(&self.database.con, &self.database.trader_key, self.encoding)
738 .await
739 }
740
741 async fn load_orders(&self) -> anyhow::Result<HashMap<ClientOrderId, OrderAny>> {
742 DatabaseQueries::load_orders(&self.database.con, &self.database.trader_key, self.encoding)
743 .await
744 }
745
746 async fn load_positions(&self) -> anyhow::Result<HashMap<PositionId, Position>> {
747 DatabaseQueries::load_positions(
748 &self.database.con,
749 &self.database.trader_key,
750 self.encoding,
751 )
752 .await
753 }
754
755 fn load_index_order_position(&self) -> anyhow::Result<HashMap<ClientOrderId, Position>> {
756 todo!()
757 }
758
759 fn load_index_order_client(&self) -> anyhow::Result<HashMap<ClientOrderId, ClientId>> {
760 todo!()
761 }
762
763 async fn load_currency(&self, code: &Ustr) -> anyhow::Result<Option<Currency>> {
764 DatabaseQueries::load_currency(
765 &self.database.con,
766 &self.database.trader_key,
767 code,
768 self.encoding,
769 )
770 .await
771 }
772
773 async fn load_instrument(
774 &self,
775 instrument_id: &InstrumentId,
776 ) -> anyhow::Result<Option<InstrumentAny>> {
777 DatabaseQueries::load_instrument(
778 &self.database.con,
779 &self.database.trader_key,
780 instrument_id,
781 self.encoding,
782 )
783 .await
784 }
785
786 async fn load_synthetic(
787 &self,
788 instrument_id: &InstrumentId,
789 ) -> anyhow::Result<Option<SyntheticInstrument>> {
790 DatabaseQueries::load_synthetic(
791 &self.database.con,
792 &self.database.trader_key,
793 instrument_id,
794 self.encoding,
795 )
796 .await
797 }
798
799 async fn load_account(&self, account_id: &AccountId) -> anyhow::Result<Option<AccountAny>> {
800 DatabaseQueries::load_account(
801 &self.database.con,
802 &self.database.trader_key,
803 account_id,
804 self.encoding,
805 )
806 .await
807 }
808
809 async fn load_order(
810 &self,
811 client_order_id: &ClientOrderId,
812 ) -> anyhow::Result<Option<OrderAny>> {
813 DatabaseQueries::load_order(
814 &self.database.con,
815 &self.database.trader_key,
816 client_order_id,
817 self.encoding,
818 )
819 .await
820 }
821
822 async fn load_position(&self, position_id: &PositionId) -> anyhow::Result<Option<Position>> {
823 DatabaseQueries::load_position(
824 &self.database.con,
825 &self.database.trader_key,
826 position_id,
827 self.encoding,
828 )
829 .await
830 }
831
832 fn load_actor(&self, component_id: &ComponentId) -> anyhow::Result<HashMap<String, Bytes>> {
833 todo!()
834 }
835
836 fn delete_actor(&self, component_id: &ComponentId) -> anyhow::Result<()> {
837 todo!()
838 }
839
840 fn load_strategy(&self, strategy_id: &StrategyId) -> anyhow::Result<HashMap<String, Bytes>> {
841 todo!()
842 }
843
844 fn delete_strategy(&self, component_id: &StrategyId) -> anyhow::Result<()> {
845 todo!()
846 }
847
848 fn add(&self, key: String, value: Bytes) -> anyhow::Result<()> {
849 todo!()
850 }
851
852 fn add_currency(&self, currency: &Currency) -> anyhow::Result<()> {
853 todo!()
854 }
855
856 fn add_instrument(&self, instrument: &InstrumentAny) -> anyhow::Result<()> {
857 todo!()
858 }
859
860 fn add_synthetic(&self, synthetic: &SyntheticInstrument) -> anyhow::Result<()> {
861 todo!()
862 }
863
864 fn add_account(&self, account: &AccountAny) -> anyhow::Result<()> {
865 todo!()
866 }
867
868 fn add_order(&self, order: &OrderAny, client_id: Option<ClientId>) -> anyhow::Result<()> {
869 todo!()
870 }
871
872 fn add_order_snapshot(&self, snapshot: &OrderSnapshot) -> anyhow::Result<()> {
873 todo!()
874 }
875
876 fn add_position(&self, position: &Position) -> anyhow::Result<()> {
877 todo!()
878 }
879
880 fn add_position_snapshot(&self, snapshot: &PositionSnapshot) -> anyhow::Result<()> {
881 todo!()
882 }
883
884 fn add_order_book(&self, order_book: &OrderBook) -> anyhow::Result<()> {
885 anyhow::bail!("Saving market data for Redis cache adapter not supported")
886 }
887
888 fn add_quote(&self, quote: &QuoteTick) -> anyhow::Result<()> {
889 anyhow::bail!("Saving market data for Redis cache adapter not supported")
890 }
891
892 fn load_quotes(&self, instrument_id: &InstrumentId) -> anyhow::Result<Vec<QuoteTick>> {
893 anyhow::bail!("Loading quote data for Redis cache adapter not supported")
894 }
895
896 fn add_trade(&self, trade: &TradeTick) -> anyhow::Result<()> {
897 anyhow::bail!("Saving market data for Redis cache adapter not supported")
898 }
899
900 fn load_trades(&self, instrument_id: &InstrumentId) -> anyhow::Result<Vec<TradeTick>> {
901 anyhow::bail!("Loading market data for Redis cache adapter not supported")
902 }
903
904 fn add_bar(&self, bar: &Bar) -> anyhow::Result<()> {
905 anyhow::bail!("Saving market data for Redis cache adapter not supported")
906 }
907
908 fn load_bars(&self, instrument_id: &InstrumentId) -> anyhow::Result<Vec<Bar>> {
909 anyhow::bail!("Loading market data for Redis cache adapter not supported")
910 }
911
912 fn add_signal(&self, signal: &Signal) -> anyhow::Result<()> {
913 anyhow::bail!("Saving signals for Redis cache adapter not supported")
914 }
915
916 fn load_signals(&self, name: &str) -> anyhow::Result<Vec<Signal>> {
917 anyhow::bail!("Loading signals from Redis cache adapter not supported")
918 }
919
920 fn add_custom_data(&self, data: &CustomData) -> anyhow::Result<()> {
921 anyhow::bail!("Saving custom data for Redis cache adapter not supported")
922 }
923
924 fn load_custom_data(&self, data_type: &DataType) -> anyhow::Result<Vec<CustomData>> {
925 anyhow::bail!("Loading custom data from Redis cache adapter not supported")
926 }
927
928 fn load_order_snapshot(
929 &self,
930 client_order_id: &ClientOrderId,
931 ) -> anyhow::Result<Option<OrderSnapshot>> {
932 anyhow::bail!("Loading order snapshots from Redis cache adapter not supported")
933 }
934
935 fn load_position_snapshot(
936 &self,
937 position_id: &PositionId,
938 ) -> anyhow::Result<Option<PositionSnapshot>> {
939 anyhow::bail!("Loading position snapshots from Redis cache adapter not supported")
940 }
941
942 fn index_venue_order_id(
943 &self,
944 client_order_id: ClientOrderId,
945 venue_order_id: VenueOrderId,
946 ) -> anyhow::Result<()> {
947 todo!()
948 }
949
950 fn index_order_position(
951 &self,
952 client_order_id: ClientOrderId,
953 position_id: PositionId,
954 ) -> anyhow::Result<()> {
955 todo!()
956 }
957
958 fn update_actor(&self) -> anyhow::Result<()> {
959 todo!()
960 }
961
962 fn update_strategy(&self) -> anyhow::Result<()> {
963 todo!()
964 }
965
966 fn update_account(&self, account: &AccountAny) -> anyhow::Result<()> {
967 todo!()
968 }
969
970 fn update_order(&self, order_event: &OrderEventAny) -> anyhow::Result<()> {
971 todo!()
972 }
973
974 fn update_position(&self, position: &Position) -> anyhow::Result<()> {
975 todo!()
976 }
977
978 fn snapshot_order_state(&self, order: &OrderAny) -> anyhow::Result<()> {
979 todo!()
980 }
981
982 fn snapshot_position_state(&self, position: &Position) -> anyhow::Result<()> {
983 todo!()
984 }
985
986 fn heartbeat(&self, timestamp: UnixNanos) -> anyhow::Result<()> {
987 todo!()
988 }
989}
990
// Unit tests for the pure key helpers; none of these need a Redis connection.
#[cfg(test)]
mod tests {
    use rstest::rstest;

    use super::*;

    // Relies on the default config enabling the `trader-` prefix; only the
    // instance ID suffix is switched on explicitly.
    #[rstest]
    fn test_get_trader_key_with_prefix_and_instance_id() {
        let trader_id = TraderId::from("tester-123");
        let instance_id = UUID4::new();
        let mut config = CacheConfig::default();
        config.use_instance_id = true;

        let key = get_trader_key(trader_id, instance_id, &config);
        assert!(key.starts_with("trader-tester-123:"));
        assert!(key.ends_with(&instance_id.to_string()));
    }

    // The collection is the segment before the first delimiter.
    #[rstest]
    fn test_get_collection_key_valid() {
        let key = "collection:123";
        assert_eq!(get_collection_key(key).unwrap(), "collection");
    }

    // A key without a delimiter has no collection segment.
    #[rstest]
    fn test_get_collection_key_invalid() {
        let key = "no_delimiter";
        assert!(get_collection_key(key).is_err());
    }

    // The index key is everything after the first delimiter.
    #[rstest]
    fn test_get_index_key_valid() {
        let key = "index:123";
        assert_eq!(get_index_key(key).unwrap(), "123");
    }

    // A key without a delimiter has no index segment.
    #[rstest]
    fn test_get_index_key_invalid() {
        let key = "no_delimiter";
        assert!(get_index_key(key).is_err());
    }
}