nautilus_blockchain/cache/
mod.rs1use std::{
23 collections::{BTreeMap, HashMap},
24 sync::Arc,
25};
26
27use alloy::primitives::Address;
28use nautilus_core::UnixNanos;
29use nautilus_model::defi::{
30 Block, Pool, PoolLiquidityUpdate, PoolSwap, SharedChain, SharedPool, Token,
31};
32use sqlx::postgres::PgConnectOptions;
33
34use crate::{cache::database::BlockchainCacheDatabase, exchanges::extended::DexExtended};
35
36pub mod database;
37pub mod rows;
38
39#[derive(Debug)]
41pub struct BlockchainCache {
42 chain: SharedChain,
44 block_timestamps: BTreeMap<u64, UnixNanos>,
46 dexes: HashMap<String, DexExtended>,
48 tokens: HashMap<Address, Token>,
50 pools: HashMap<Address, SharedPool>,
52 database: Option<BlockchainCacheDatabase>,
54}
55
56impl BlockchainCache {
57 #[must_use]
59 pub fn new(chain: SharedChain) -> Self {
60 Self {
61 chain,
62 dexes: HashMap::new(),
63 tokens: HashMap::new(),
64 pools: HashMap::new(),
65 block_timestamps: BTreeMap::new(),
66 database: None,
67 }
68 }
69
70 #[must_use]
72 pub fn last_cached_block_number(&self) -> Option<u64> {
73 self.block_timestamps.last_key_value().map(|(k, _)| *k)
74 }
75
76 #[must_use]
78 pub fn get_block_timestamp(&self, block_number: u64) -> Option<&UnixNanos> {
79 self.block_timestamps.get(&block_number)
80 }
81
82 pub async fn initialize_database(&mut self, pg_connect_options: PgConnectOptions) {
84 let database = BlockchainCacheDatabase::init(pg_connect_options).await;
85 self.database = Some(database);
86 }
87
88 pub async fn connect(&mut self, from_block: u64) -> anyhow::Result<()> {
90 if let Some(database) = &self.database {
92 if let Err(e) = database.seed_chain(&self.chain).await {
93 log::error!("Error seeding chain in database: {e}");
94 log::warn!("Continuing without database cache functionality");
95 }
96 }
97
98 if let Err(e) = self.load_tokens().await {
99 log::error!("Error loading tokens from database: {e}");
100 }
101
102 if let Err(e) = self.load_blocks(from_block).await {
103 log::error!("Error loading blocks from database: {e}");
104 }
105
106 Ok(())
107 }
108
109 async fn load_tokens(&mut self) -> anyhow::Result<()> {
111 if let Some(database) = &self.database {
112 let tokens = database.load_tokens(self.chain.clone()).await?;
113 log::info!("Loading {} tokens from cache database", tokens.len());
114
115 for token in tokens {
116 self.tokens.insert(token.address, token);
117 }
118 }
119 Ok(())
120 }
121
122 async fn load_blocks(&mut self, from_block: u64) -> anyhow::Result<()> {
125 if let Some(database) = &self.database {
126 let block_timestamps = database
127 .load_block_timestamps(self.chain.clone(), from_block)
128 .await?;
129
130 if !block_timestamps.is_empty() {
132 let first = block_timestamps.first().unwrap().number;
133 let last = block_timestamps.last().unwrap().number;
134 let expected_len = (last - first + 1) as usize;
135 if block_timestamps.len() != expected_len {
136 anyhow::bail!(
137 "Block timestamps are not consistent and sequential. Expected {expected_len} blocks but got {}",
138 block_timestamps.len()
139 );
140 }
141 }
142
143 log::info!(
144 "Loading {} blocks timestamps from the cache database",
145 block_timestamps.len()
146 );
147 for block in block_timestamps {
148 self.block_timestamps.insert(block.number, block.timestamp);
149 }
150 }
151 Ok(())
152 }
153
154 pub async fn add_block(&mut self, block: Block) -> anyhow::Result<()> {
156 if let Some(database) = &self.database {
157 database.add_block(self.chain.chain_id, &block).await?;
158 }
159 self.block_timestamps.insert(block.number, block.timestamp);
160 Ok(())
161 }
162
163 pub async fn add_dex(&mut self, dex_id: String, dex: DexExtended) -> anyhow::Result<()> {
165 log::info!("Adding dex {dex_id} to the cache");
166
167 if let Some(database) = &self.database {
168 database.add_dex(&dex).await?;
169 }
170
171 self.dexes.insert(dex_id, dex);
172 Ok(())
173 }
174
175 pub async fn add_pool(&mut self, pool: Pool) -> anyhow::Result<()> {
177 let pool_address = pool.address;
178 log::info!("Adding dex pool {pool_address} to the cache");
179
180 if let Some(database) = &self.database {
181 database.add_pool(&pool).await?;
182 }
183
184 self.pools.insert(pool_address, Arc::new(pool));
185 Ok(())
186 }
187
188 pub async fn add_token(&mut self, token: Token) -> anyhow::Result<()> {
190 if let Some(database) = &self.database {
191 database.add_token(&token).await?;
192 }
193 self.tokens.insert(token.address, token);
194 Ok(())
195 }
196
197 pub async fn add_pool_swap(&self, swap: &PoolSwap) -> anyhow::Result<()> {
199 if let Some(database) = &self.database {
200 database.add_swap(self.chain.chain_id, swap).await?;
201 }
202
203 Ok(())
204 }
205
206 pub async fn add_liquidity_update(
208 &self,
209 liquidity_update: &PoolLiquidityUpdate,
210 ) -> anyhow::Result<()> {
211 if let Some(database) = &self.database {
212 database
213 .add_pool_liquidity_update(self.chain.chain_id, liquidity_update)
214 .await?;
215 }
216
217 Ok(())
218 }
219
220 #[must_use]
222 pub fn get_dex(&self, name: &str) -> Option<&DexExtended> {
223 self.dexes.get(name)
224 }
225
226 #[must_use]
228 pub fn get_pool(&self, address: &Address) -> Option<&SharedPool> {
229 self.pools.get(address)
230 }
231
232 #[must_use]
234 pub fn get_token(&self, address: &Address) -> Option<&Token> {
235 self.tokens.get(address)
236 }
237}