nautilus_infrastructure/redis/
cache.rs

1// -------------------------------------------------------------------------------------------------
2//  Copyright (C) 2015-2025 Posei Systems Pty Ltd. All rights reserved.
3//  https://poseitrader.io
4//
5//  Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
6//  You may not use this file except in compliance with the License.
7//  You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
8//
9//  Unless required by applicable law or agreed to in writing, software
10//  distributed under the License is distributed on an "AS IS" BASIS,
11//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12//  See the License for the specific language governing permissions and
13//  limitations under the License.
14// -------------------------------------------------------------------------------------------------
15
16use std::{
17    collections::{HashMap, VecDeque},
18    fmt::Debug,
19    time::{Duration, Instant},
20};
21
22use bytes::Bytes;
23use nautilus_common::{
24    cache::{
25        CacheConfig,
26        database::{CacheDatabaseAdapter, CacheMap},
27    },
28    custom::CustomData,
29    enums::SerializationEncoding,
30    logging::{log_task_awaiting, log_task_started, log_task_stopped},
31    runtime::get_runtime,
32    signal::Signal,
33};
34use nautilus_core::{UUID4, UnixNanos, correctness::check_slice_not_empty};
35use nautilus_cryptography::providers::install_cryptographic_provider;
36use nautilus_model::{
37    accounts::AccountAny,
38    data::{Bar, DataType, QuoteTick, TradeTick},
39    events::{OrderEventAny, OrderSnapshot, position::snapshot::PositionSnapshot},
40    identifiers::{
41        AccountId, ClientId, ClientOrderId, ComponentId, InstrumentId, PositionId, StrategyId,
42        TraderId, VenueOrderId,
43    },
44    instruments::{InstrumentAny, SyntheticInstrument},
45    orderbook::OrderBook,
46    orders::OrderAny,
47    position::Position,
48    types::Currency,
49};
50use redis::{Pipeline, aio::ConnectionManager};
51use tokio::try_join;
52use ustr::Ustr;
53
54use super::{REDIS_DELIMITER, REDIS_FLUSHDB};
55use crate::redis::{create_redis_connection, queries::DatabaseQueries};
56
// Task and connection names.
// `CACHE_READ` and `CACHE_WRITE` are the names passed to `create_redis_connection` for the
// two connections opened in this file; `CACHE_PROCESS` labels the background task in logs.
const CACHE_READ: &str = "cache-read";
const CACHE_WRITE: &str = "cache-write";
const CACHE_PROCESS: &str = "cache-process";

// Error constants
const FAILED_TX_CHANNEL: &str = "Failed to send to channel";

// Collection keys: the part of a command key before the first delimiter, used to decide
// how an insert/update/delete is applied.
const INDEX: &str = "index";
const GENERAL: &str = "general";
const CURRENCIES: &str = "currencies";
const INSTRUMENTS: &str = "instruments";
const SYNTHETICS: &str = "synthetics";
const ACCOUNTS: &str = "accounts";
const ORDERS: &str = "orders";
const POSITIONS: &str = "positions";
const ACTORS: &str = "actors";
const STRATEGIES: &str = "strategies";
const SNAPSHOTS: &str = "snapshots";
const HEALTH: &str = "health";

// Index keys: compared against the remainder of a full Redis key after its first
// delimiter (see `get_index_key`).
const INDEX_ORDER_IDS: &str = "index:order_ids";
const INDEX_ORDER_POSITION: &str = "index:order_position";
const INDEX_ORDER_CLIENT: &str = "index:order_client";
const INDEX_ORDERS: &str = "index:orders";
const INDEX_ORDERS_OPEN: &str = "index:orders_open";
const INDEX_ORDERS_CLOSED: &str = "index:orders_closed";
const INDEX_ORDERS_EMULATED: &str = "index:orders_emulated";
const INDEX_ORDERS_INFLIGHT: &str = "index:orders_inflight";
const INDEX_POSITIONS: &str = "index:positions";
const INDEX_POSITIONS_OPEN: &str = "index:positions_open";
const INDEX_POSITIONS_CLOSED: &str = "index:positions_closed";
91
/// A type of database operation.
///
/// Carried by a [`DatabaseCommand`]; the background task uses it to decide how the command
/// is applied.
#[derive(Clone, Debug)]
pub enum DatabaseOperation {
    /// Write the payload under the command key (handled by `insert`).
    Insert,
    /// Append the payload to an existing entry (handled by `update`).
    Update,
    /// Remove the key, or remove the payload member from an index (handled by `delete`).
    Delete,
    /// Stop the command processing loop; never written to Redis.
    Close,
}
100
/// Represents a database command to be performed which may be executed in a task.
///
/// Commands are sent over an unbounded channel to the background processing task, which
/// buffers them and applies them to Redis in pipelines.
#[derive(Clone, Debug)]
pub struct DatabaseCommand {
    /// The database operation type.
    pub op_type: DatabaseOperation,
    /// The primary key for the operation (`None` for commands built with [`DatabaseCommand::close`]).
    pub key: Option<String>,
    /// The data payload for the operation, if any.
    pub payload: Option<Vec<Bytes>>,
}
111
112impl DatabaseCommand {
113    /// Creates a new [`DatabaseCommand`] instance.
114    #[must_use]
115    pub const fn new(op_type: DatabaseOperation, key: String, payload: Option<Vec<Bytes>>) -> Self {
116        Self {
117            op_type,
118            key: Some(key),
119            payload,
120        }
121    }
122
123    /// Initialize a `Close` database command, this is meant to close the database cache channel.
124    #[must_use]
125    pub const fn close() -> Self {
126        Self {
127            op_type: DatabaseOperation::Close,
128            key: None,
129            payload: None,
130        }
131    }
132}
133
/// A Redis-backed cache database.
///
/// Reads go through the connection held in `con`; writes are queued as
/// [`DatabaseCommand`]s on `tx` and applied by a background task.
#[cfg_attr(
    feature = "python",
    pyo3::pyclass(module = "posei_trader.core.nautilus_pyo3.infrastructure")
)]
pub struct RedisCacheDatabase {
    /// The Redis connection used for direct queries (flush, key scan, read).
    pub con: ConnectionManager,
    /// The trader ID this database instance was created for.
    pub trader_id: TraderId,
    /// The serialization encoding taken from the cache config.
    encoding: SerializationEncoding,
    /// Join handle of the background command processing task.
    handle: tokio::task::JoinHandle<()>,
    /// The key prefix under which all entries for this trader are stored.
    trader_key: String,
    /// Sending half of the command channel to the background task.
    tx: tokio::sync::mpsc::UnboundedSender<DatabaseCommand>,
}
146
147impl Debug for RedisCacheDatabase {
148    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
149        f.debug_struct(stringify!(RedisCacheDatabase))
150            .field("trader_id", &self.trader_id)
151            .field("encoding", &self.encoding)
152            .finish()
153    }
154}
155
156impl RedisCacheDatabase {
157    /// Creates a new [`RedisCacheDatabase`] instance for the given `trader_id`, `instance_id`, and `config`.
158    ///
159    /// # Errors
160    ///
161    /// Returns an error if:
162    /// - The database configuration is missing in `config`.
163    /// - Establishing the Redis connection fails.
164    /// - The command processing task cannot be spawned.
165    pub async fn new(
166        trader_id: TraderId,
167        instance_id: UUID4,
168        config: CacheConfig,
169    ) -> anyhow::Result<Self> {
170        install_cryptographic_provider();
171
172        let db_config = config
173            .database
174            .as_ref()
175            .ok_or_else(|| anyhow::anyhow!("No database config"))?;
176        let con = create_redis_connection(CACHE_READ, db_config.clone()).await?;
177
178        let (tx, rx) = tokio::sync::mpsc::unbounded_channel::<DatabaseCommand>();
179        let trader_key = get_trader_key(trader_id, instance_id, &config);
180        let trader_key_clone = trader_key.clone();
181        let encoding = config.encoding;
182        let handle = get_runtime().spawn(async move {
183            if let Err(e) = process_commands(rx, trader_key_clone, config.clone()).await {
184                log::error!("Error in task '{CACHE_PROCESS}': {e}");
185            }
186        });
187
188        Ok(Self {
189            con,
190            trader_id,
191            encoding,
192            handle,
193            trader_key,
194            tx,
195        })
196    }
197
198    #[must_use]
199    pub const fn get_encoding(&self) -> SerializationEncoding {
200        self.encoding
201    }
202
203    #[must_use]
204    pub fn get_trader_key(&self) -> &str {
205        &self.trader_key
206    }
207
208    pub fn close(&mut self) {
209        log::debug!("Closing");
210
211        if let Err(e) = self.tx.send(DatabaseCommand::close()) {
212            log::debug!("Error sending close command: {e:?}");
213        }
214
215        log_task_awaiting(CACHE_PROCESS);
216
217        tokio::task::block_in_place(|| {
218            if let Err(e) = get_runtime().block_on(&mut self.handle) {
219                log::error!("Error awaiting task '{CACHE_PROCESS}': {e:?}");
220            }
221        });
222
223        log::debug!("Closed");
224    }
225
226    pub async fn flushdb(&mut self) {
227        if let Err(e) = redis::cmd(REDIS_FLUSHDB)
228            .query_async::<()>(&mut self.con)
229            .await
230        {
231            log::error!("Failed to flush database: {e:?}");
232        }
233    }
234
235    /// Retrieves all keys matching the given `pattern` from Redis for this trader.
236    ///
237    /// # Errors
238    ///
239    /// Returns an error if the underlying Redis scan operation fails.
240    pub async fn keys(&mut self, pattern: &str) -> anyhow::Result<Vec<String>> {
241        let pattern = format!("{}{REDIS_DELIMITER}{pattern}", self.trader_key);
242        log::debug!("Querying keys: {pattern}");
243        DatabaseQueries::scan_keys(&mut self.con, pattern).await
244    }
245
246    /// Reads the value(s) associated with `key` for this trader from Redis.
247    ///
248    /// # Errors
249    ///
250    /// Returns an error if the underlying Redis read operation fails.
251    pub async fn read(&mut self, key: &str) -> anyhow::Result<Vec<Bytes>> {
252        DatabaseQueries::read(&self.con, &self.trader_key, key).await
253    }
254
255    /// Sends an insert command for `key` with optional `payload` to Redis via the background task.
256    ///
257    /// # Errors
258    ///
259    /// Returns an error if the command cannot be sent to the background task channel.
260    pub fn insert(&mut self, key: String, payload: Option<Vec<Bytes>>) -> anyhow::Result<()> {
261        let op = DatabaseCommand::new(DatabaseOperation::Insert, key, payload);
262        match self.tx.send(op) {
263            Ok(()) => Ok(()),
264            Err(e) => anyhow::bail!("{FAILED_TX_CHANNEL}: {e}"),
265        }
266    }
267
268    /// Sends an update command for `key` with optional `payload` to Redis via the background task.
269    ///
270    /// # Errors
271    ///
272    /// Returns an error if the command cannot be sent to the background task channel.
273    pub fn update(&mut self, key: String, payload: Option<Vec<Bytes>>) -> anyhow::Result<()> {
274        let op = DatabaseCommand::new(DatabaseOperation::Update, key, payload);
275        match self.tx.send(op) {
276            Ok(()) => Ok(()),
277            Err(e) => anyhow::bail!("{FAILED_TX_CHANNEL}: {e}"),
278        }
279    }
280
281    /// Sends a delete command for `key` with optional `payload` to Redis via the background task.
282    ///
283    /// # Errors
284    ///
285    /// Returns an error if the command cannot be sent to the background task channel.
286    pub fn delete(&mut self, key: String, payload: Option<Vec<Bytes>>) -> anyhow::Result<()> {
287        let op = DatabaseCommand::new(DatabaseOperation::Delete, key, payload);
288        match self.tx.send(op) {
289            Ok(()) => Ok(()),
290            Err(e) => anyhow::bail!("{FAILED_TX_CHANNEL}: {e}"),
291        }
292    }
293}
294
295async fn process_commands(
296    mut rx: tokio::sync::mpsc::UnboundedReceiver<DatabaseCommand>,
297    trader_key: String,
298    config: CacheConfig,
299) -> anyhow::Result<()> {
300    log_task_started(CACHE_PROCESS);
301
302    let db_config = config
303        .database
304        .as_ref()
305        .ok_or_else(|| anyhow::anyhow!("No database config"))?;
306    let mut con = create_redis_connection(CACHE_WRITE, db_config.clone()).await?;
307
308    // Buffering
309    let mut buffer: VecDeque<DatabaseCommand> = VecDeque::new();
310    let mut last_drain = Instant::now();
311    let buffer_interval = Duration::from_millis(config.buffer_interval_ms.unwrap_or(0) as u64);
312
313    // Continue to receive and handle messages until channel is hung up
314    loop {
315        if last_drain.elapsed() >= buffer_interval && !buffer.is_empty() {
316            drain_buffer(&mut con, &trader_key, &mut buffer).await;
317            last_drain = Instant::now();
318        } else if let Some(cmd) = rx.recv().await {
319            tracing::trace!("Received {cmd:?}");
320
321            if matches!(cmd.op_type, DatabaseOperation::Close) {
322                break;
323            }
324            buffer.push_back(cmd);
325        } else {
326            tracing::debug!("Command channel closed");
327            break;
328        }
329    }
330
331    // Drain any remaining messages
332    if !buffer.is_empty() {
333        drain_buffer(&mut con, &trader_key, &mut buffer).await;
334    }
335
336    log_task_stopped(CACHE_PROCESS);
337    Ok(())
338}
339
340async fn drain_buffer(
341    conn: &mut ConnectionManager,
342    trader_key: &str,
343    buffer: &mut VecDeque<DatabaseCommand>,
344) {
345    let mut pipe = redis::pipe();
346    pipe.atomic();
347
348    for msg in buffer.drain(..) {
349        let key = if let Some(key) = msg.key {
350            key
351        } else {
352            log::error!("Null key found for message: {msg:?}");
353            continue;
354        };
355        let collection = match get_collection_key(&key) {
356            Ok(collection) => collection,
357            Err(e) => {
358                tracing::error!("{e}");
359                continue; // Continue to next message
360            }
361        };
362
363        let key = format!("{trader_key}{REDIS_DELIMITER}{}", &key);
364
365        match msg.op_type {
366            DatabaseOperation::Insert => {
367                if let Some(payload) = msg.payload {
368                    if let Err(e) = insert(&mut pipe, collection, &key, payload) {
369                        tracing::error!("{e}");
370                    }
371                } else {
372                    tracing::error!("Null `payload` for `insert`");
373                }
374            }
375            DatabaseOperation::Update => {
376                if let Some(payload) = msg.payload {
377                    if let Err(e) = update(&mut pipe, collection, &key, payload) {
378                        tracing::error!("{e}");
379                    }
380                } else {
381                    tracing::error!("Null `payload` for `update`");
382                }
383            }
384            DatabaseOperation::Delete => {
385                // `payload` can be `None` for a delete operation
386                if let Err(e) = delete(&mut pipe, collection, &key, msg.payload) {
387                    tracing::error!("{e}");
388                }
389            }
390            DatabaseOperation::Close => panic!("Close command should not be drained"),
391        }
392    }
393
394    if let Err(e) = pipe.query_async::<()>(conn).await {
395        tracing::error!("{e}");
396    }
397}
398
399fn insert(
400    pipe: &mut Pipeline,
401    collection: &str,
402    key: &str,
403    value: Vec<Bytes>,
404) -> anyhow::Result<()> {
405    check_slice_not_empty(value.as_slice(), stringify!(value))?;
406
407    match collection {
408        INDEX => insert_index(pipe, key, &value),
409        GENERAL => {
410            insert_string(pipe, key, value[0].as_ref());
411            Ok(())
412        }
413        CURRENCIES => {
414            insert_string(pipe, key, value[0].as_ref());
415            Ok(())
416        }
417        INSTRUMENTS => {
418            insert_string(pipe, key, value[0].as_ref());
419            Ok(())
420        }
421        SYNTHETICS => {
422            insert_string(pipe, key, value[0].as_ref());
423            Ok(())
424        }
425        ACCOUNTS => {
426            insert_list(pipe, key, value[0].as_ref());
427            Ok(())
428        }
429        ORDERS => {
430            insert_list(pipe, key, value[0].as_ref());
431            Ok(())
432        }
433        POSITIONS => {
434            insert_list(pipe, key, value[0].as_ref());
435            Ok(())
436        }
437        ACTORS => {
438            insert_string(pipe, key, value[0].as_ref());
439            Ok(())
440        }
441        STRATEGIES => {
442            insert_string(pipe, key, value[0].as_ref());
443            Ok(())
444        }
445        SNAPSHOTS => {
446            insert_list(pipe, key, value[0].as_ref());
447            Ok(())
448        }
449        HEALTH => {
450            insert_string(pipe, key, value[0].as_ref());
451            Ok(())
452        }
453        _ => anyhow::bail!("Unsupported operation: `insert` for collection '{collection}'"),
454    }
455}
456
457fn insert_index(pipe: &mut Pipeline, key: &str, value: &[Bytes]) -> anyhow::Result<()> {
458    let index_key = get_index_key(key)?;
459    match index_key {
460        INDEX_ORDER_IDS => {
461            insert_set(pipe, key, value[0].as_ref());
462            Ok(())
463        }
464        INDEX_ORDER_POSITION => {
465            insert_hset(pipe, key, value[0].as_ref(), value[1].as_ref());
466            Ok(())
467        }
468        INDEX_ORDER_CLIENT => {
469            insert_hset(pipe, key, value[0].as_ref(), value[1].as_ref());
470            Ok(())
471        }
472        INDEX_ORDERS => {
473            insert_set(pipe, key, value[0].as_ref());
474            Ok(())
475        }
476        INDEX_ORDERS_OPEN => {
477            insert_set(pipe, key, value[0].as_ref());
478            Ok(())
479        }
480        INDEX_ORDERS_CLOSED => {
481            insert_set(pipe, key, value[0].as_ref());
482            Ok(())
483        }
484        INDEX_ORDERS_EMULATED => {
485            insert_set(pipe, key, value[0].as_ref());
486            Ok(())
487        }
488        INDEX_ORDERS_INFLIGHT => {
489            insert_set(pipe, key, value[0].as_ref());
490            Ok(())
491        }
492        INDEX_POSITIONS => {
493            insert_set(pipe, key, value[0].as_ref());
494            Ok(())
495        }
496        INDEX_POSITIONS_OPEN => {
497            insert_set(pipe, key, value[0].as_ref());
498            Ok(())
499        }
500        INDEX_POSITIONS_CLOSED => {
501            insert_set(pipe, key, value[0].as_ref());
502            Ok(())
503        }
504        _ => anyhow::bail!("Index unknown '{index_key}' on insert"),
505    }
506}
507
/// Queues setting the string value at `key` to `value` on the pipeline.
fn insert_string(pipe: &mut Pipeline, key: &str, value: &[u8]) {
    pipe.set(key, value);
}
511
/// Queues adding `value` as a member of the set at `key` on the pipeline.
fn insert_set(pipe: &mut Pipeline, key: &str, value: &[u8]) {
    pipe.sadd(key, value);
}
515
/// Queues setting field `name` to `value` in the hash at `key` on the pipeline.
fn insert_hset(pipe: &mut Pipeline, key: &str, name: &[u8], value: &[u8]) {
    pipe.hset(key, name, value);
}
519
/// Queues appending `value` to the tail of the list at `key` on the pipeline.
fn insert_list(pipe: &mut Pipeline, key: &str, value: &[u8]) {
    pipe.rpush(key, value);
}
523
524fn update(
525    pipe: &mut Pipeline,
526    collection: &str,
527    key: &str,
528    value: Vec<Bytes>,
529) -> anyhow::Result<()> {
530    check_slice_not_empty(value.as_slice(), stringify!(value))?;
531
532    match collection {
533        ACCOUNTS => {
534            update_list(pipe, key, value[0].as_ref());
535            Ok(())
536        }
537        ORDERS => {
538            update_list(pipe, key, value[0].as_ref());
539            Ok(())
540        }
541        POSITIONS => {
542            update_list(pipe, key, value[0].as_ref());
543            Ok(())
544        }
545        _ => anyhow::bail!("Unsupported operation: `update` for collection '{collection}'"),
546    }
547}
548
/// Queues `Pipeline::rpush_exists` for `key` on the pipeline.
// NOTE(review): `rpush_exists` presumably maps to Redis `RPUSHX` (append only when the list
// already exists) — confirm against the redis crate documentation.
fn update_list(pipe: &mut Pipeline, key: &str, value: &[u8]) {
    pipe.rpush_exists(key, value);
}
552
553fn delete(
554    pipe: &mut Pipeline,
555    collection: &str,
556    key: &str,
557    value: Option<Vec<Bytes>>,
558) -> anyhow::Result<()> {
559    match collection {
560        INDEX => remove_index(pipe, key, value),
561        ACTORS => {
562            delete_string(pipe, key);
563            Ok(())
564        }
565        STRATEGIES => {
566            delete_string(pipe, key);
567            Ok(())
568        }
569        _ => anyhow::bail!("Unsupported operation: `delete` for collection '{collection}'"),
570    }
571}
572
573fn remove_index(pipe: &mut Pipeline, key: &str, value: Option<Vec<Bytes>>) -> anyhow::Result<()> {
574    let value = value.ok_or_else(|| anyhow::anyhow!("Empty `payload` for `delete` '{key}'"))?;
575    let index_key = get_index_key(key)?;
576
577    match index_key {
578        INDEX_ORDERS_OPEN => {
579            remove_from_set(pipe, key, value[0].as_ref());
580            Ok(())
581        }
582        INDEX_ORDERS_CLOSED => {
583            remove_from_set(pipe, key, value[0].as_ref());
584            Ok(())
585        }
586        INDEX_ORDERS_EMULATED => {
587            remove_from_set(pipe, key, value[0].as_ref());
588            Ok(())
589        }
590        INDEX_ORDERS_INFLIGHT => {
591            remove_from_set(pipe, key, value[0].as_ref());
592            Ok(())
593        }
594        INDEX_POSITIONS_OPEN => {
595            remove_from_set(pipe, key, value[0].as_ref());
596            Ok(())
597        }
598        INDEX_POSITIONS_CLOSED => {
599            remove_from_set(pipe, key, value[0].as_ref());
600            Ok(())
601        }
602        _ => anyhow::bail!("Unsupported index operation: remove from '{index_key}'"),
603    }
604}
605
/// Queues removal of `member` from the set at `key` on the pipeline.
fn remove_from_set(pipe: &mut Pipeline, key: &str, member: &[u8]) {
    pipe.srem(key, member);
}
609
/// Queues deletion of `key` on the pipeline.
fn delete_string(pipe: &mut Pipeline, key: &str) {
    pipe.del(key);
}
613
614fn get_trader_key(trader_id: TraderId, instance_id: UUID4, config: &CacheConfig) -> String {
615    let mut key = String::new();
616
617    if config.use_trader_prefix {
618        key.push_str("trader-");
619    }
620
621    key.push_str(trader_id.as_str());
622
623    if config.use_instance_id {
624        key.push(REDIS_DELIMITER);
625        key.push_str(&format!("{instance_id}"));
626    }
627
628    key
629}
630
631fn get_collection_key(key: &str) -> anyhow::Result<&str> {
632    key.split_once(REDIS_DELIMITER)
633        .map(|(collection, _)| collection)
634        .ok_or_else(|| {
635            anyhow::anyhow!("Invalid `key`, missing a '{REDIS_DELIMITER}' delimiter, was {key}")
636        })
637}
638
639fn get_index_key(key: &str) -> anyhow::Result<&str> {
640    key.split_once(REDIS_DELIMITER)
641        .map(|(_, index_key)| index_key)
642        .ok_or_else(|| {
643            anyhow::anyhow!("Invalid `key`, missing a '{REDIS_DELIMITER}' delimiter, was {key}")
644        })
645}
646
/// Adapter exposing a [`RedisCacheDatabase`] through the [`CacheDatabaseAdapter`] trait.
#[allow(dead_code)] // Under development
#[derive(Debug)]
pub struct RedisCacheDatabaseAdapter {
    /// The serialization encoding passed to the load queries.
    pub encoding: SerializationEncoding,
    /// The underlying Redis cache database.
    database: RedisCacheDatabase,
}
653
654#[allow(dead_code)] // Under development
655#[allow(unused)] // Under development
656#[async_trait::async_trait]
657impl CacheDatabaseAdapter for RedisCacheDatabaseAdapter {
658    fn close(&mut self) -> anyhow::Result<()> {
659        self.database.close();
660        Ok(())
661    }
662
663    fn flush(&mut self) -> anyhow::Result<()> {
664        self.database.flushdb();
665        Ok(())
666    }
667
668    async fn load_all(&self) -> anyhow::Result<CacheMap> {
669        tracing::debug!("Loading all data");
670
671        let (
672            currencies,
673            instruments,
674            synthetics,
675            accounts,
676            orders,
677            positions,
678            greeks,
679            yield_curves,
680        ) = try_join!(
681            self.load_currencies(),
682            self.load_instruments(),
683            self.load_synthetics(),
684            self.load_accounts(),
685            self.load_orders(),
686            self.load_positions(),
687            self.load_greeks(),
688            self.load_yield_curves()
689        )
690        .map_err(|e| anyhow::anyhow!("Error loading cache data: {e}"))?;
691
692        Ok(CacheMap {
693            currencies,
694            instruments,
695            synthetics,
696            accounts,
697            orders,
698            positions,
699            greeks,
700            yield_curves,
701        })
702    }
703
704    fn load(&self) -> anyhow::Result<HashMap<String, Bytes>> {
705        // self.database.load()
706        Ok(HashMap::new()) // TODO
707    }
708
709    async fn load_currencies(&self) -> anyhow::Result<HashMap<Ustr, Currency>> {
710        DatabaseQueries::load_currencies(
711            &self.database.con,
712            &self.database.trader_key,
713            self.encoding,
714        )
715        .await
716    }
717
718    async fn load_instruments(&self) -> anyhow::Result<HashMap<InstrumentId, InstrumentAny>> {
719        DatabaseQueries::load_instruments(
720            &self.database.con,
721            &self.database.trader_key,
722            self.encoding,
723        )
724        .await
725    }
726
727    async fn load_synthetics(&self) -> anyhow::Result<HashMap<InstrumentId, SyntheticInstrument>> {
728        DatabaseQueries::load_synthetics(
729            &self.database.con,
730            &self.database.trader_key,
731            self.encoding,
732        )
733        .await
734    }
735
736    async fn load_accounts(&self) -> anyhow::Result<HashMap<AccountId, AccountAny>> {
737        DatabaseQueries::load_accounts(&self.database.con, &self.database.trader_key, self.encoding)
738            .await
739    }
740
741    async fn load_orders(&self) -> anyhow::Result<HashMap<ClientOrderId, OrderAny>> {
742        DatabaseQueries::load_orders(&self.database.con, &self.database.trader_key, self.encoding)
743            .await
744    }
745
746    async fn load_positions(&self) -> anyhow::Result<HashMap<PositionId, Position>> {
747        DatabaseQueries::load_positions(
748            &self.database.con,
749            &self.database.trader_key,
750            self.encoding,
751        )
752        .await
753    }
754
755    fn load_index_order_position(&self) -> anyhow::Result<HashMap<ClientOrderId, Position>> {
756        todo!()
757    }
758
759    fn load_index_order_client(&self) -> anyhow::Result<HashMap<ClientOrderId, ClientId>> {
760        todo!()
761    }
762
763    async fn load_currency(&self, code: &Ustr) -> anyhow::Result<Option<Currency>> {
764        DatabaseQueries::load_currency(
765            &self.database.con,
766            &self.database.trader_key,
767            code,
768            self.encoding,
769        )
770        .await
771    }
772
773    async fn load_instrument(
774        &self,
775        instrument_id: &InstrumentId,
776    ) -> anyhow::Result<Option<InstrumentAny>> {
777        DatabaseQueries::load_instrument(
778            &self.database.con,
779            &self.database.trader_key,
780            instrument_id,
781            self.encoding,
782        )
783        .await
784    }
785
786    async fn load_synthetic(
787        &self,
788        instrument_id: &InstrumentId,
789    ) -> anyhow::Result<Option<SyntheticInstrument>> {
790        DatabaseQueries::load_synthetic(
791            &self.database.con,
792            &self.database.trader_key,
793            instrument_id,
794            self.encoding,
795        )
796        .await
797    }
798
799    async fn load_account(&self, account_id: &AccountId) -> anyhow::Result<Option<AccountAny>> {
800        DatabaseQueries::load_account(
801            &self.database.con,
802            &self.database.trader_key,
803            account_id,
804            self.encoding,
805        )
806        .await
807    }
808
809    async fn load_order(
810        &self,
811        client_order_id: &ClientOrderId,
812    ) -> anyhow::Result<Option<OrderAny>> {
813        DatabaseQueries::load_order(
814            &self.database.con,
815            &self.database.trader_key,
816            client_order_id,
817            self.encoding,
818        )
819        .await
820    }
821
822    async fn load_position(&self, position_id: &PositionId) -> anyhow::Result<Option<Position>> {
823        DatabaseQueries::load_position(
824            &self.database.con,
825            &self.database.trader_key,
826            position_id,
827            self.encoding,
828        )
829        .await
830    }
831
832    fn load_actor(&self, component_id: &ComponentId) -> anyhow::Result<HashMap<String, Bytes>> {
833        todo!()
834    }
835
836    fn delete_actor(&self, component_id: &ComponentId) -> anyhow::Result<()> {
837        todo!()
838    }
839
840    fn load_strategy(&self, strategy_id: &StrategyId) -> anyhow::Result<HashMap<String, Bytes>> {
841        todo!()
842    }
843
844    fn delete_strategy(&self, component_id: &StrategyId) -> anyhow::Result<()> {
845        todo!()
846    }
847
848    fn add(&self, key: String, value: Bytes) -> anyhow::Result<()> {
849        todo!()
850    }
851
852    fn add_currency(&self, currency: &Currency) -> anyhow::Result<()> {
853        todo!()
854    }
855
856    fn add_instrument(&self, instrument: &InstrumentAny) -> anyhow::Result<()> {
857        todo!()
858    }
859
860    fn add_synthetic(&self, synthetic: &SyntheticInstrument) -> anyhow::Result<()> {
861        todo!()
862    }
863
864    fn add_account(&self, account: &AccountAny) -> anyhow::Result<()> {
865        todo!()
866    }
867
868    fn add_order(&self, order: &OrderAny, client_id: Option<ClientId>) -> anyhow::Result<()> {
869        todo!()
870    }
871
872    fn add_order_snapshot(&self, snapshot: &OrderSnapshot) -> anyhow::Result<()> {
873        todo!()
874    }
875
876    fn add_position(&self, position: &Position) -> anyhow::Result<()> {
877        todo!()
878    }
879
880    fn add_position_snapshot(&self, snapshot: &PositionSnapshot) -> anyhow::Result<()> {
881        todo!()
882    }
883
884    fn add_order_book(&self, order_book: &OrderBook) -> anyhow::Result<()> {
885        anyhow::bail!("Saving market data for Redis cache adapter not supported")
886    }
887
888    fn add_quote(&self, quote: &QuoteTick) -> anyhow::Result<()> {
889        anyhow::bail!("Saving market data for Redis cache adapter not supported")
890    }
891
892    fn load_quotes(&self, instrument_id: &InstrumentId) -> anyhow::Result<Vec<QuoteTick>> {
893        anyhow::bail!("Loading quote data for Redis cache adapter not supported")
894    }
895
896    fn add_trade(&self, trade: &TradeTick) -> anyhow::Result<()> {
897        anyhow::bail!("Saving market data for Redis cache adapter not supported")
898    }
899
900    fn load_trades(&self, instrument_id: &InstrumentId) -> anyhow::Result<Vec<TradeTick>> {
901        anyhow::bail!("Loading market data for Redis cache adapter not supported")
902    }
903
904    fn add_bar(&self, bar: &Bar) -> anyhow::Result<()> {
905        anyhow::bail!("Saving market data for Redis cache adapter not supported")
906    }
907
908    fn load_bars(&self, instrument_id: &InstrumentId) -> anyhow::Result<Vec<Bar>> {
909        anyhow::bail!("Loading market data for Redis cache adapter not supported")
910    }
911
912    fn add_signal(&self, signal: &Signal) -> anyhow::Result<()> {
913        anyhow::bail!("Saving signals for Redis cache adapter not supported")
914    }
915
916    fn load_signals(&self, name: &str) -> anyhow::Result<Vec<Signal>> {
917        anyhow::bail!("Loading signals from Redis cache adapter not supported")
918    }
919
920    fn add_custom_data(&self, data: &CustomData) -> anyhow::Result<()> {
921        anyhow::bail!("Saving custom data for Redis cache adapter not supported")
922    }
923
924    fn load_custom_data(&self, data_type: &DataType) -> anyhow::Result<Vec<CustomData>> {
925        anyhow::bail!("Loading custom data from Redis cache adapter not supported")
926    }
927
928    fn load_order_snapshot(
929        &self,
930        client_order_id: &ClientOrderId,
931    ) -> anyhow::Result<Option<OrderSnapshot>> {
932        anyhow::bail!("Loading order snapshots from Redis cache adapter not supported")
933    }
934
935    fn load_position_snapshot(
936        &self,
937        position_id: &PositionId,
938    ) -> anyhow::Result<Option<PositionSnapshot>> {
939        anyhow::bail!("Loading position snapshots from Redis cache adapter not supported")
940    }
941
942    fn index_venue_order_id(
943        &self,
944        client_order_id: ClientOrderId,
945        venue_order_id: VenueOrderId,
946    ) -> anyhow::Result<()> {
947        todo!()
948    }
949
950    fn index_order_position(
951        &self,
952        client_order_id: ClientOrderId,
953        position_id: PositionId,
954    ) -> anyhow::Result<()> {
955        todo!()
956    }
957
958    fn update_actor(&self) -> anyhow::Result<()> {
959        todo!()
960    }
961
962    fn update_strategy(&self) -> anyhow::Result<()> {
963        todo!()
964    }
965
966    fn update_account(&self, account: &AccountAny) -> anyhow::Result<()> {
967        todo!()
968    }
969
970    fn update_order(&self, order_event: &OrderEventAny) -> anyhow::Result<()> {
971        todo!()
972    }
973
974    fn update_position(&self, position: &Position) -> anyhow::Result<()> {
975        todo!()
976    }
977
978    fn snapshot_order_state(&self, order: &OrderAny) -> anyhow::Result<()> {
979        todo!()
980    }
981
982    fn snapshot_position_state(&self, position: &Position) -> anyhow::Result<()> {
983        todo!()
984    }
985
986    fn heartbeat(&self, timestamp: UnixNanos) -> anyhow::Result<()> {
987        todo!()
988    }
989}
990
991////////////////////////////////////////////////////////////////////////////////
992// Tests
993////////////////////////////////////////////////////////////////////////////////
#[cfg(test)]
mod tests {
    use rstest::rstest;

    use super::*;

    // With the default config plus `use_instance_id`, the key is expected to be
    // `trader-<trader_id>:<instance_id>`.
    #[rstest]
    fn test_get_trader_key_with_prefix_and_instance_id() {
        let instance_id = UUID4::new();
        let mut config = CacheConfig::default();
        config.use_instance_id = true;

        let key = get_trader_key(TraderId::from("tester-123"), instance_id, &config);
        let expected_suffix = instance_id.to_string();

        assert!(key.starts_with("trader-tester-123:"));
        assert!(key.ends_with(&expected_suffix));
    }

    // The collection is the segment before the first delimiter.
    #[rstest]
    fn test_get_collection_key_valid() {
        let result = get_collection_key("collection:123");
        assert_eq!(result.unwrap(), "collection");
    }

    // A key without a delimiter is rejected.
    #[rstest]
    fn test_get_collection_key_invalid() {
        let result = get_collection_key("no_delimiter");
        assert!(result.is_err());
    }

    // The index key is everything after the first delimiter.
    #[rstest]
    fn test_get_index_key_valid() {
        let result = get_index_key("index:123");
        assert_eq!(result.unwrap(), "123");
    }

    // A key without a delimiter is rejected.
    #[rstest]
    fn test_get_index_key_invalid() {
        let result = get_index_key("no_delimiter");
        assert!(result.is_err());
    }
}