nautilus_blockchain/cache/
database.rs1use nautilus_model::defi::{
17 amm::Pool,
18 chain::{Chain, SharedChain},
19 dex::Dex,
20 token::Token,
21};
22use sqlx::{PgPool, postgres::PgConnectOptions};
23
24use crate::cache::rows::TokenRow;
25
26#[derive(Debug)]
28pub struct BlockchainCacheDatabase {
29 pool: PgPool,
31}
32
33impl BlockchainCacheDatabase {
34 pub async fn init(pg_options: PgConnectOptions) -> Self {
36 let pool = PgPool::connect_with(pg_options)
37 .await
38 .expect("Error connecting to Postgres");
39 Self { pool }
40 }
41
42 pub async fn seed_chain(&self, chain: &Chain) -> anyhow::Result<()> {
44 sqlx::query(
45 r"
46 INSERT INTO chain (
47 chain_id, name
48 ) VALUES ($1,$2)
49 ON CONFLICT (chain_id)
50 DO NOTHING
51 ",
52 )
53 .bind(chain.chain_id as i32)
54 .bind(chain.name.to_string())
55 .execute(&self.pool)
56 .await
57 .map(|_| ())
58 .map_err(|e| anyhow::anyhow!("Failed to seed chain table: {e}"))
59 }
60
61 pub async fn add_dex(&self, dex: &Dex) -> anyhow::Result<()> {
63 sqlx::query(
64 r"
65 INSERT INTO dex (
66 chain_id, name, factory_address
67 ) VALUES ($1, $2, $3)
68 ON CONFLICT (chain_id, name)
69 DO UPDATE
70 SET
71 factory_address = $3
72 ",
73 )
74 .bind(dex.chain.chain_id as i32)
75 .bind(dex.name.as_ref())
76 .bind(dex.factory.as_ref())
77 .execute(&self.pool)
78 .await
79 .map(|_| ())
80 .map_err(|e| anyhow::anyhow!("Failed to insert into dex table: {e}"))
81 }
82
83 pub async fn add_pool(&self, pool: &Pool) -> anyhow::Result<()> {
85 sqlx::query(
86 r"
87 INSERT INTO pool (
88 chain_id, address, dex_name, creation_block,
89 token0_chain, token0_address,
90 token1_chain, token1_address,
91 fee, tick_spacing
92 ) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10)
93 ON CONFLICT (chain_id, address)
94 DO UPDATE
95 SET
96 dex_name = $3,
97 creation_block = $4,
98 token0_chain = $5,
99 token0_address = $6,
100 token1_chain = $7,
101 token1_address = $8,
102 fee = $9,
103 tick_spacing = $10
104 ",
105 )
106 .bind(pool.chain.chain_id as i32)
107 .bind(pool.address.as_str())
108 .bind(pool.dex.name.as_ref())
109 .bind(pool.creation_block as i64)
110 .bind(pool.token0.chain.chain_id as i32)
111 .bind(pool.token0.address.as_str())
112 .bind(pool.token1.chain.chain_id as i32)
113 .bind(pool.token1.address.as_str())
114 .bind(pool.fee as i32)
115 .bind(pool.tick_spacing as i32)
116 .execute(&self.pool)
117 .await
118 .map(|_| ())
119 .map_err(|e| anyhow::anyhow!("Failed to insert into pool table: {e}"))
120 }
121
122 pub async fn add_token(&self, token: &Token) -> anyhow::Result<()> {
124 sqlx::query(
125 r"
126 INSERT INTO token (
127 chain_id, address, name, symbol, decimals
128 ) VALUES ($1, $2, $3, $4, $5)
129 ON CONFLICT (chain_id, address)
130 DO UPDATE
131 SET
132 name = $3,
133 symbol = $4,
134 decimals = $5
135 ",
136 )
137 .bind(token.chain.chain_id as i32)
138 .bind(token.address.as_str())
139 .bind(token.name.as_str())
140 .bind(token.symbol.as_str())
141 .bind(i32::from(token.decimals))
142 .execute(&self.pool)
143 .await
144 .map(|_| ())
145 .map_err(|e| anyhow::anyhow!("Failed to insert into token table: {e}"))
146 }
147
148 pub async fn load_tokens(&self, chain: SharedChain) -> anyhow::Result<Vec<Token>> {
150 sqlx::query_as::<_, TokenRow>("SELECT * FROM token WHERE chain_id = $1")
151 .bind(chain.chain_id as i32)
152 .fetch_all(&self.pool)
153 .await
154 .map(|rows| {
155 rows.into_iter()
156 .map(|token_row| {
157 Token::new(
158 chain.clone(),
159 token_row.address,
160 token_row.name,
161 token_row.symbol,
162 token_row.decimals as u8,
163 )
164 })
165 .collect::<Vec<_>>()
166 })
167 .map_err(|e| anyhow::anyhow!("Failed to load tokens: {e}"))
168 }
169}