nautilus_blockchain/cache/
mod.rs1use std::collections::HashMap;
17
18use nautilus_model::defi::{amm::Pool, chain::SharedChain, token::Token};
19use sqlx::postgres::PgConnectOptions;
20
21use crate::{cache::database::BlockchainCacheDatabase, exchanges::extended::DexExtended};
22
23pub mod database;
24pub mod rows;
25
26#[derive(Debug)]
28pub struct BlockchainCache {
29 chain: SharedChain,
31 dexes: HashMap<String, DexExtended>,
33 tokens: HashMap<String, Token>,
35 pools: HashMap<String, Pool>,
37 database: Option<BlockchainCacheDatabase>,
39}
40
41impl BlockchainCache {
42 #[must_use]
44 pub fn new(chain: SharedChain) -> Self {
45 Self {
46 chain,
47 dexes: HashMap::new(),
48 tokens: HashMap::new(),
49 pools: HashMap::new(),
50 database: None,
51 }
52 }
53
54 pub async fn initialize_database(&mut self, pg_connect_options: PgConnectOptions) {
56 let database = BlockchainCacheDatabase::init(pg_connect_options).await;
57 self.database = Some(database);
58 }
59
60 pub async fn connect(&mut self) -> anyhow::Result<()> {
62 if let Some(database) = &self.database {
64 database.seed_chain(&self.chain).await?;
65 }
66 self.load_tokens().await?;
67 Ok(())
68 }
69
70 pub async fn add_dex(&mut self, dex_id: String, dex: DexExtended) -> anyhow::Result<()> {
72 log::info!("Adding dex {dex_id} to the cache");
73 if let Some(database) = &self.database {
74 database.add_dex(&dex).await?;
75 }
76 self.dexes.insert(dex_id, dex);
77 Ok(())
78 }
79
80 pub async fn add_pool(&mut self, pool: Pool) -> anyhow::Result<()> {
82 let pool_address = pool.address.clone();
83 log::info!("Adding dex pool {} to the cache", pool_address.as_str());
84 if let Some(database) = &self.database {
85 database.add_pool(&pool).await?;
86 }
87 self.pools.insert(pool_address, pool);
88 Ok(())
89 }
90
91 pub async fn add_token(&mut self, token: Token) -> anyhow::Result<()> {
93 let token_address = token.address.clone();
94 if let Some(database) = &self.database {
95 database.add_token(&token).await?;
96 }
97 self.tokens.insert(token_address, token);
98 Ok(())
99 }
100
101 async fn load_tokens(&mut self) -> anyhow::Result<()> {
103 if let Some(database) = &self.database {
104 let tokens = database.load_tokens(self.chain.clone()).await?;
105 log::info!("Loading {} tokens from cache database", tokens.len());
106 for token in tokens {
107 self.tokens.insert(token.address.clone(), token);
108 }
109 }
110 Ok(())
111 }
112
113 #[must_use]
115 pub fn get_dex(&self, name: &str) -> Option<&DexExtended> {
116 self.dexes.get(name)
117 }
118
119 #[must_use]
121 pub fn get_token(&self, address: &str) -> Option<&Token> {
122 self.tokens.get(address)
123 }
124}