nautilus_blockchain/cache/
mod.rs

1// -------------------------------------------------------------------------------------------------
2//  Copyright (C) 2015-2025 Posei Systems Pty Ltd. All rights reserved.
3//  https://poseitrader.io
4//
5//  Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
6//  You may not use this file except in compliance with the License.
7//  You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
8//
9//  Unless required by applicable law or agreed to in writing, software
10//  distributed under the License is distributed on an "AS IS" BASIS,
11//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12//  See the License for the specific language governing permissions and
13//  limitations under the License.
14// -------------------------------------------------------------------------------------------------
15
16use std::collections::HashMap;
17
18use nautilus_model::defi::{amm::Pool, chain::SharedChain, token::Token};
19use sqlx::postgres::PgConnectOptions;
20
21use crate::{cache::database::BlockchainCacheDatabase, exchanges::extended::DexExtended};
22
23pub mod database;
24pub mod rows;
25
26/// Provides caching functionality for various blockchain domain objects.
27#[derive(Debug)]
28pub struct BlockchainCache {
29    /// The blockchain chain this cache is associated with.
30    chain: SharedChain,
31    /// Map of DEX identifiers to their corresponding extended DEX objects.
32    dexes: HashMap<String, DexExtended>,
33    /// Map of token addresses to their corresponding `Token` objects.
34    tokens: HashMap<String, Token>,
35    /// Map of pool addresses to their corresponding `Pool` objects.
36    pools: HashMap<String, Pool>,
37    /// Optional database connection for persistent storage.
38    database: Option<BlockchainCacheDatabase>,
39}
40
41impl BlockchainCache {
42    /// Creates a new in-memory blockchain cache for the specified chain.
43    #[must_use]
44    pub fn new(chain: SharedChain) -> Self {
45        Self {
46            chain,
47            dexes: HashMap::new(),
48            tokens: HashMap::new(),
49            pools: HashMap::new(),
50            database: None,
51        }
52    }
53
54    /// Initializes the database connection for persistent storage.
55    pub async fn initialize_database(&mut self, pg_connect_options: PgConnectOptions) {
56        let database = BlockchainCacheDatabase::init(pg_connect_options).await;
57        self.database = Some(database);
58    }
59
60    /// Connects to the database and loads initial data.
61    pub async fn connect(&mut self) -> anyhow::Result<()> {
62        // Seed target adapter chain in database
63        if let Some(database) = &self.database {
64            database.seed_chain(&self.chain).await?;
65        }
66        self.load_tokens().await?;
67        Ok(())
68    }
69
70    /// Adds a DEX to the cache with the specified identifier.
71    pub async fn add_dex(&mut self, dex_id: String, dex: DexExtended) -> anyhow::Result<()> {
72        log::info!("Adding dex {dex_id} to the cache");
73        if let Some(database) = &self.database {
74            database.add_dex(&dex).await?;
75        }
76        self.dexes.insert(dex_id, dex);
77        Ok(())
78    }
79
80    /// Adds a liquidity pool/pair to the cache.
81    pub async fn add_pool(&mut self, pool: Pool) -> anyhow::Result<()> {
82        let pool_address = pool.address.clone();
83        log::info!("Adding dex pool {} to the cache", pool_address.as_str());
84        if let Some(database) = &self.database {
85            database.add_pool(&pool).await?;
86        }
87        self.pools.insert(pool_address, pool);
88        Ok(())
89    }
90
91    /// Adds a token to the cache.
92    pub async fn add_token(&mut self, token: Token) -> anyhow::Result<()> {
93        let token_address = token.address.clone();
94        if let Some(database) = &self.database {
95            database.add_token(&token).await?;
96        }
97        self.tokens.insert(token_address, token);
98        Ok(())
99    }
100
101    /// Loads tokens from the database into the in-memory cache.
102    async fn load_tokens(&mut self) -> anyhow::Result<()> {
103        if let Some(database) = &self.database {
104            let tokens = database.load_tokens(self.chain.clone()).await?;
105            log::info!("Loading {} tokens from cache database", tokens.len());
106            for token in tokens {
107                self.tokens.insert(token.address.clone(), token);
108            }
109        }
110        Ok(())
111    }
112
113    /// Returns a reference to the `DexExtended` associated with the given name.
114    #[must_use]
115    pub fn get_dex(&self, name: &str) -> Option<&DexExtended> {
116        self.dexes.get(name)
117    }
118
119    /// Returns a reference to the `Token` associated with the given address.
120    #[must_use]
121    pub fn get_token(&self, address: &str) -> Option<&Token> {
122        self.tokens.get(address)
123    }
124}