nautilus_blockchain/cache/
database.rs

1// -------------------------------------------------------------------------------------------------
2//  Copyright (C) 2015-2025 Posei Systems Pty Ltd. All rights reserved.
3//  https://poseitrader.io
4//
5//  Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
6//  You may not use this file except in compliance with the License.
7//  You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
8//
9//  Unless required by applicable law or agreed to in writing, software
10//  distributed under the License is distributed on an "AS IS" BASIS,
11//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12//  See the License for the specific language governing permissions and
13//  limitations under the License.
14// -------------------------------------------------------------------------------------------------
15
16use nautilus_model::defi::{
17    amm::Pool,
18    chain::{Chain, SharedChain},
19    dex::Dex,
20    token::Token,
21};
22use sqlx::{PgPool, postgres::PgConnectOptions};
23
24use crate::cache::rows::TokenRow;
25
26/// Database interface for persisting and retrieving blockchain entities and domain objects.
27#[derive(Debug)]
28pub struct BlockchainCacheDatabase {
29    /// PostgreSQL connection pool used for database operations.
30    pool: PgPool,
31}
32
33impl BlockchainCacheDatabase {
34    /// Initializes a new database instance by establishing a connection to PostgreSQL.
35    pub async fn init(pg_options: PgConnectOptions) -> Self {
36        let pool = PgPool::connect_with(pg_options)
37            .await
38            .expect("Error connecting to Postgres");
39        Self { pool }
40    }
41
42    /// Seeds the database with a blockchain chain record.
43    pub async fn seed_chain(&self, chain: &Chain) -> anyhow::Result<()> {
44        sqlx::query(
45            r"
46            INSERT INTO chain (
47                chain_id, name
48            ) VALUES ($1,$2)
49            ON CONFLICT (chain_id)
50            DO NOTHING
51        ",
52        )
53        .bind(chain.chain_id as i32)
54        .bind(chain.name.to_string())
55        .execute(&self.pool)
56        .await
57        .map(|_| ())
58        .map_err(|e| anyhow::anyhow!("Failed to seed chain table: {e}"))
59    }
60
61    /// Adds or updates a DEX (Decentralized Exchange) record in the database.
62    pub async fn add_dex(&self, dex: &Dex) -> anyhow::Result<()> {
63        sqlx::query(
64            r"
65            INSERT INTO dex (
66                chain_id, name, factory_address
67            ) VALUES ($1, $2, $3)
68            ON CONFLICT (chain_id, name)
69            DO UPDATE
70            SET
71                factory_address = $3
72        ",
73        )
74        .bind(dex.chain.chain_id as i32)
75        .bind(dex.name.as_ref())
76        .bind(dex.factory.as_ref())
77        .execute(&self.pool)
78        .await
79        .map(|_| ())
80        .map_err(|e| anyhow::anyhow!("Failed to insert into dex table: {e}"))
81    }
82
83    /// Adds or updates a liquidity pool/pair record in the database.
84    pub async fn add_pool(&self, pool: &Pool) -> anyhow::Result<()> {
85        sqlx::query(
86            r"
87            INSERT INTO pool (
88                chain_id, address, dex_name, creation_block,
89                token0_chain, token0_address,
90                token1_chain, token1_address,
91                fee, tick_spacing
92            ) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10)
93            ON CONFLICT (chain_id, address)
94            DO UPDATE
95            SET
96                dex_name = $3,
97                creation_block = $4,
98                token0_chain = $5,
99                token0_address = $6,
100                token1_chain = $7,
101                token1_address = $8,
102                fee = $9,
103                tick_spacing = $10
104        ",
105        )
106        .bind(pool.chain.chain_id as i32)
107        .bind(pool.address.as_str())
108        .bind(pool.dex.name.as_ref())
109        .bind(pool.creation_block as i64)
110        .bind(pool.token0.chain.chain_id as i32)
111        .bind(pool.token0.address.as_str())
112        .bind(pool.token1.chain.chain_id as i32)
113        .bind(pool.token1.address.as_str())
114        .bind(pool.fee as i32)
115        .bind(pool.tick_spacing as i32)
116        .execute(&self.pool)
117        .await
118        .map(|_| ())
119        .map_err(|e| anyhow::anyhow!("Failed to insert into pool table: {e}"))
120    }
121
122    /// Adds or updates a token record in the database.
123    pub async fn add_token(&self, token: &Token) -> anyhow::Result<()> {
124        sqlx::query(
125            r"
126            INSERT INTO token (
127                chain_id, address, name, symbol, decimals
128            ) VALUES ($1, $2, $3, $4, $5)
129            ON CONFLICT (chain_id, address)
130            DO UPDATE
131            SET
132                name = $3,
133                symbol = $4,
134                decimals = $5
135        ",
136        )
137        .bind(token.chain.chain_id as i32)
138        .bind(token.address.as_str())
139        .bind(token.name.as_str())
140        .bind(token.symbol.as_str())
141        .bind(i32::from(token.decimals))
142        .execute(&self.pool)
143        .await
144        .map(|_| ())
145        .map_err(|e| anyhow::anyhow!("Failed to insert into token table: {e}"))
146    }
147
148    /// Retrieves all token records for the given chain and converts them into `Token` domain objects.
149    pub async fn load_tokens(&self, chain: SharedChain) -> anyhow::Result<Vec<Token>> {
150        sqlx::query_as::<_, TokenRow>("SELECT * FROM token WHERE chain_id = $1")
151            .bind(chain.chain_id as i32)
152            .fetch_all(&self.pool)
153            .await
154            .map(|rows| {
155                rows.into_iter()
156                    .map(|token_row| {
157                        Token::new(
158                            chain.clone(),
159                            token_row.address,
160                            token_row.name,
161                            token_row.symbol,
162                            token_row.decimals as u8,
163                        )
164                    })
165                    .collect::<Vec<_>>()
166            })
167            .map_err(|e| anyhow::anyhow!("Failed to load tokens: {e}"))
168    }
169}