nautilus_blockchain/cache/
mod.rs

1// -------------------------------------------------------------------------------------------------
2//  Copyright (C) 2015-2025 Posei Systems Pty Ltd. All rights reserved.
3//  https://poseitrader.io
4//
5//  Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
6//  You may not use this file except in compliance with the License.
7//  You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
8//
9//  Unless required by applicable law or agreed to in writing, software
10//  distributed under the License is distributed on an "AS IS" BASIS,
11//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12//  See the License for the specific language governing permissions and
13//  limitations under the License.
14// -------------------------------------------------------------------------------------------------
15
16//! Caching layer for blockchain entities and domain objects.
17//!
18//! This module provides an in-memory cache with optional PostgreSQL persistence for storing
19//! and retrieving blockchain-related data such as blocks, tokens, pools, swaps, and other
20//! DeFi protocol events.
21
22use std::{
23    collections::{BTreeMap, HashMap},
24    sync::Arc,
25};
26
27use alloy::primitives::Address;
28use nautilus_core::UnixNanos;
29use nautilus_model::defi::{
30    Block, Pool, PoolLiquidityUpdate, PoolSwap, SharedChain, SharedPool, Token,
31};
32use sqlx::postgres::PgConnectOptions;
33
34use crate::{cache::database::BlockchainCacheDatabase, exchanges::extended::DexExtended};
35
36pub mod database;
37pub mod rows;
38
39/// Provides caching functionality for various blockchain domain objects.
40#[derive(Debug)]
41pub struct BlockchainCache {
42    /// The blockchain chain this cache is associated with.
43    chain: SharedChain,
44    /// Map of block numbers to their corresponding timestamp
45    block_timestamps: BTreeMap<u64, UnixNanos>,
46    /// Map of DEX identifiers to their corresponding extended DEX objects.
47    dexes: HashMap<String, DexExtended>,
48    /// Map of token addresses to their corresponding `Token` objects.
49    tokens: HashMap<Address, Token>,
50    /// Map of pool addresses to their corresponding `Pool` objects.
51    pools: HashMap<Address, SharedPool>,
52    /// Optional database connection for persistent storage.
53    database: Option<BlockchainCacheDatabase>,
54}
55
56impl BlockchainCache {
57    /// Creates a new in-memory blockchain cache for the specified chain.
58    #[must_use]
59    pub fn new(chain: SharedChain) -> Self {
60        Self {
61            chain,
62            dexes: HashMap::new(),
63            tokens: HashMap::new(),
64            pools: HashMap::new(),
65            block_timestamps: BTreeMap::new(),
66            database: None,
67        }
68    }
69
70    /// Returns the highest block number currently cached, if any.
71    #[must_use]
72    pub fn last_cached_block_number(&self) -> Option<u64> {
73        self.block_timestamps.last_key_value().map(|(k, _)| *k)
74    }
75
76    /// Returns the timestamp for the specified block number if it exists in the cache.
77    #[must_use]
78    pub fn get_block_timestamp(&self, block_number: u64) -> Option<&UnixNanos> {
79        self.block_timestamps.get(&block_number)
80    }
81
82    /// Initializes the database connection for persistent storage.
83    pub async fn initialize_database(&mut self, pg_connect_options: PgConnectOptions) {
84        let database = BlockchainCacheDatabase::init(pg_connect_options).await;
85        self.database = Some(database);
86    }
87
88    /// Connects to the database and loads initial data.
89    pub async fn connect(&mut self, from_block: u64) -> anyhow::Result<()> {
90        // Seed target adapter chain in database
91        if let Some(database) = &self.database {
92            if let Err(e) = database.seed_chain(&self.chain).await {
93                log::error!("Error seeding chain in database: {e}");
94                log::warn!("Continuing without database cache functionality");
95            }
96        }
97
98        if let Err(e) = self.load_tokens().await {
99            log::error!("Error loading tokens from database: {e}");
100        }
101
102        if let Err(e) = self.load_blocks(from_block).await {
103            log::error!("Error loading blocks from database: {e}");
104        }
105
106        Ok(())
107    }
108
109    /// Loads tokens from the database into the in-memory cache.
110    async fn load_tokens(&mut self) -> anyhow::Result<()> {
111        if let Some(database) = &self.database {
112            let tokens = database.load_tokens(self.chain.clone()).await?;
113            log::info!("Loading {} tokens from cache database", tokens.len());
114
115            for token in tokens {
116                self.tokens.insert(token.address, token);
117            }
118        }
119        Ok(())
120    }
121
122    /// Loads block timestamps from the database starting `from_block` number
123    /// into the in-memory cache.
124    async fn load_blocks(&mut self, from_block: u64) -> anyhow::Result<()> {
125        if let Some(database) = &self.database {
126            let block_timestamps = database
127                .load_block_timestamps(self.chain.clone(), from_block)
128                .await?;
129
130            // Verify block number sequence consistency
131            if !block_timestamps.is_empty() {
132                let first = block_timestamps.first().unwrap().number;
133                let last = block_timestamps.last().unwrap().number;
134                let expected_len = (last - first + 1) as usize;
135                if block_timestamps.len() != expected_len {
136                    anyhow::bail!(
137                        "Block timestamps are not consistent and sequential. Expected {expected_len} blocks but got {}",
138                        block_timestamps.len()
139                    );
140                }
141            }
142
143            log::info!(
144                "Loading {} blocks timestamps from the cache database",
145                block_timestamps.len()
146            );
147            for block in block_timestamps {
148                self.block_timestamps.insert(block.number, block.timestamp);
149            }
150        }
151        Ok(())
152    }
153
154    /// Adds a block to the cache and persists it to the database if available.
155    pub async fn add_block(&mut self, block: Block) -> anyhow::Result<()> {
156        if let Some(database) = &self.database {
157            database.add_block(self.chain.chain_id, &block).await?;
158        }
159        self.block_timestamps.insert(block.number, block.timestamp);
160        Ok(())
161    }
162
163    /// Adds a DEX to the cache with the specified identifier.
164    pub async fn add_dex(&mut self, dex_id: String, dex: DexExtended) -> anyhow::Result<()> {
165        log::info!("Adding dex {dex_id} to the cache");
166
167        if let Some(database) = &self.database {
168            database.add_dex(&dex).await?;
169        }
170
171        self.dexes.insert(dex_id, dex);
172        Ok(())
173    }
174
175    /// Adds a liquidity pool/pair to the cache.
176    pub async fn add_pool(&mut self, pool: Pool) -> anyhow::Result<()> {
177        let pool_address = pool.address;
178        log::info!("Adding dex pool {pool_address} to the cache");
179
180        if let Some(database) = &self.database {
181            database.add_pool(&pool).await?;
182        }
183
184        self.pools.insert(pool_address, Arc::new(pool));
185        Ok(())
186    }
187
188    /// Adds a [`Token`] to the cache.
189    pub async fn add_token(&mut self, token: Token) -> anyhow::Result<()> {
190        if let Some(database) = &self.database {
191            database.add_token(&token).await?;
192        }
193        self.tokens.insert(token.address, token);
194        Ok(())
195    }
196
197    /// Adds a [`PoolSwap`] to the cache database if available.
198    pub async fn add_pool_swap(&self, swap: &PoolSwap) -> anyhow::Result<()> {
199        if let Some(database) = &self.database {
200            database.add_swap(self.chain.chain_id, swap).await?;
201        }
202
203        Ok(())
204    }
205
206    /// Adds a [`PoolLiquidityUpdate`] to the cache database if available.
207    pub async fn add_liquidity_update(
208        &self,
209        liquidity_update: &PoolLiquidityUpdate,
210    ) -> anyhow::Result<()> {
211        if let Some(database) = &self.database {
212            database
213                .add_pool_liquidity_update(self.chain.chain_id, liquidity_update)
214                .await?;
215        }
216
217        Ok(())
218    }
219
220    /// Returns a reference to the `DexExtended` associated with the given name.
221    #[must_use]
222    pub fn get_dex(&self, name: &str) -> Option<&DexExtended> {
223        self.dexes.get(name)
224    }
225
226    /// Returns a reference to the pool associated with the given address.
227    #[must_use]
228    pub fn get_pool(&self, address: &Address) -> Option<&SharedPool> {
229        self.pools.get(address)
230    }
231
232    /// Returns a reference to the `Token` associated with the given address.
233    #[must_use]
234    pub fn get_token(&self, address: &Address) -> Option<&Token> {
235        self.tokens.get(address)
236    }
237}