nautilus_blockchain/cache/
database.rs

1// -------------------------------------------------------------------------------------------------
2//  Copyright (C) 2015-2025 Posei Systems Pty Ltd. All rights reserved.
3//  https://poseitrader.io
4//
5//  Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
6//  You may not use this file except in compliance with the License.
7//  You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
8//
9//  Unless required by applicable law or agreed to in writing, software
10//  distributed under the License is distributed on an "AS IS" BASIS,
11//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12//  See the License for the specific language governing permissions and
13//  limitations under the License.
14// -------------------------------------------------------------------------------------------------
15
16use alloy::primitives::U256;
17use nautilus_model::defi::{
18    Block, Chain, Dex, Pool, PoolLiquidityUpdate, PoolSwap, SharedChain, Token,
19};
20use sqlx::{PgPool, postgres::PgConnectOptions};
21
22use crate::cache::rows::{BlockTimestampRow, TokenRow};
23
/// PostgreSQL-backed store for blockchain entities and domain objects.
///
/// All reads and writes issued by this type's methods run against the wrapped
/// connection pool.
#[derive(Debug)]
pub struct BlockchainCacheDatabase {
    /// PostgreSQL connection pool that every query of this type executes against.
    pool: PgPool,
}
30
31impl BlockchainCacheDatabase {
32    /// Initializes a new database instance by establishing a connection to PostgreSQL.
33    pub async fn init(pg_options: PgConnectOptions) -> Self {
34        let pool = PgPool::connect_with(pg_options)
35            .await
36            .expect("Error connecting to Postgres");
37        Self { pool }
38    }
39
40    /// Seeds the database with a blockchain chain record.
41    pub async fn seed_chain(&self, chain: &Chain) -> anyhow::Result<()> {
42        sqlx::query(
43            r"
44            INSERT INTO chain (
45                chain_id, name
46            ) VALUES ($1,$2)
47            ON CONFLICT (chain_id)
48            DO NOTHING
49        ",
50        )
51        .bind(chain.chain_id as i32)
52        .bind(chain.name.to_string())
53        .execute(&self.pool)
54        .await
55        .map(|_| ())
56        .map_err(|e| anyhow::anyhow!("Failed to seed chain table: {e}"))
57    }
58
59    /// Inserts or updates a block record in the database.
60    pub async fn add_block(&self, chain_id: u32, block: &Block) -> anyhow::Result<()> {
61        sqlx::query(
62            r"
63            INSERT INTO block (
64                chain_id, number, hash, parent_hash, miner, gas_limit, gas_used, timestamp,
65                base_fee_per_gas, blob_gas_used, excess_blob_gas,
66                l1_gas_price, l1_gas_used, l1_fee_scalar
67            ) VALUES (
68                $1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14
69            )
70            ON CONFLICT (chain_id, number)
71            DO UPDATE
72            SET
73                hash = $3,
74                parent_hash = $4,
75                miner = $5,
76                gas_limit = $6,
77                gas_used = $7,
78                timestamp = $8,
79                base_fee_per_gas = $9,
80                blob_gas_used = $10,
81                excess_blob_gas = $11,
82                l1_gas_price = $12,
83                l1_gas_used = $13,
84                l1_fee_scalar = $14
85        ",
86        )
87        .bind(chain_id as i32)
88        .bind(block.number as i64)
89        .bind(block.hash.as_str())
90        .bind(block.parent_hash.as_str())
91        .bind(block.miner.as_str())
92        .bind(block.gas_limit as i64)
93        .bind(block.gas_used as i64)
94        .bind(block.timestamp.to_string())
95        .bind(block.base_fee_per_gas.as_ref().map(U256::to_string))
96        .bind(block.blob_gas_used.as_ref().map(U256::to_string))
97        .bind(block.excess_blob_gas.as_ref().map(U256::to_string))
98        .bind(block.l1_gas_price.as_ref().map(U256::to_string))
99        .bind(block.l1_gas_used.map(|v| v as i64))
100        .bind(block.l1_fee_scalar.map(|v| v as i64))
101        .execute(&self.pool)
102        .await
103        .map(|_| ())
104        .map_err(|e| anyhow::anyhow!("Failed to insert into block table: {e}"))
105    }
106
107    /// Retrieves block timestamps for a given chain starting from a specific block number.
108    pub async fn load_block_timestamps(
109        &self,
110        chain: SharedChain,
111        from_block: u64,
112    ) -> anyhow::Result<Vec<BlockTimestampRow>> {
113        sqlx::query_as::<_, BlockTimestampRow>(
114            r"
115            SELECT DISTINCT ON (block.chain_id, number)
116                number,
117                timestamp
118            FROM block
119            WHERE chain_id = $1 AND number >= $2
120            ORDER BY number ASC
121            ",
122        )
123        .bind(chain.chain_id as i32)
124        .bind(from_block as i64)
125        .fetch_all(&self.pool)
126        .await
127        .map_err(|e| anyhow::anyhow!("Failed to load block timestamps: {e}"))
128    }
129
130    /// Adds or updates a DEX (Decentralized Exchange) record in the database.
131    pub async fn add_dex(&self, dex: &Dex) -> anyhow::Result<()> {
132        sqlx::query(
133            r"
134            INSERT INTO dex (
135                chain_id, name, factory_address
136            ) VALUES ($1, $2, $3)
137            ON CONFLICT (chain_id, name)
138            DO UPDATE
139            SET
140                factory_address = $3
141        ",
142        )
143        .bind(dex.chain.chain_id as i32)
144        .bind(dex.name.as_ref())
145        .bind(dex.factory.as_ref())
146        .execute(&self.pool)
147        .await
148        .map(|_| ())
149        .map_err(|e| anyhow::anyhow!("Failed to insert into dex table: {e}"))
150    }
151
152    /// Adds or updates a liquidity pool/pair record in the database.
153    pub async fn add_pool(&self, pool: &Pool) -> anyhow::Result<()> {
154        sqlx::query(
155            r"
156            INSERT INTO pool (
157                chain_id, address, dex_name, creation_block,
158                token0_chain, token0_address,
159                token1_chain, token1_address,
160                fee, tick_spacing
161            ) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10)
162            ON CONFLICT (chain_id, address)
163            DO UPDATE
164            SET
165                dex_name = $3,
166                creation_block = $4,
167                token0_chain = $5,
168                token0_address = $6,
169                token1_chain = $7,
170                token1_address = $8,
171                fee = $9,
172                tick_spacing = $10
173        ",
174        )
175        .bind(pool.chain.chain_id as i32)
176        .bind(pool.address.to_string())
177        .bind(pool.dex.name.as_ref())
178        .bind(pool.creation_block as i64)
179        .bind(pool.token0.chain.chain_id as i32)
180        .bind(pool.token0.address.to_string())
181        .bind(pool.token1.chain.chain_id as i32)
182        .bind(pool.token1.address.to_string())
183        .bind(pool.fee as i32)
184        .bind(pool.tick_spacing as i32)
185        .execute(&self.pool)
186        .await
187        .map(|_| ())
188        .map_err(|e| anyhow::anyhow!("Failed to insert into pool table: {e}"))
189    }
190
191    /// Adds or updates a token record in the database.
192    pub async fn add_token(&self, token: &Token) -> anyhow::Result<()> {
193        sqlx::query(
194            r"
195            INSERT INTO token (
196                chain_id, address, name, symbol, decimals
197            ) VALUES ($1, $2, $3, $4, $5)
198            ON CONFLICT (chain_id, address)
199            DO UPDATE
200            SET
201                name = $3,
202                symbol = $4,
203                decimals = $5
204        ",
205        )
206        .bind(token.chain.chain_id as i32)
207        .bind(token.address.to_string())
208        .bind(token.name.as_str())
209        .bind(token.symbol.as_str())
210        .bind(i32::from(token.decimals))
211        .execute(&self.pool)
212        .await
213        .map(|_| ())
214        .map_err(|e| anyhow::anyhow!("Failed to insert into token table: {e}"))
215    }
216
217    /// Persists a token swap transaction event to the `pool_swap` table.
218    pub async fn add_swap(&self, chain_id: u32, swap: &PoolSwap) -> anyhow::Result<()> {
219        sqlx::query(
220            r"
221            INSERT INTO pool_swap (
222                chain_id, pool_address, block, transaction_hash, transaction_index,
223                log_index, sender, side, size, price
224            ) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10)
225            ON CONFLICT (chain_id, transaction_hash, log_index)
226            DO NOTHING
227        ",
228        )
229        .bind(chain_id as i32)
230        .bind(swap.pool.address.to_string())
231        .bind(swap.block as i64)
232        .bind(swap.transaction_hash.as_str())
233        .bind(swap.transaction_index as i32)
234        .bind(swap.log_index as i32)
235        .bind(swap.sender.to_string())
236        .bind(swap.side.to_string())
237        .bind(swap.size.to_string())
238        .bind(swap.price.to_string())
239        .execute(&self.pool)
240        .await
241        .map(|_| ())
242        .map_err(|e| anyhow::anyhow!("Failed to insert into pool_swap table: {e}"))
243    }
244
245    /// Persists a liquidity position change (mint/burn) event to the `pool_liquidity` table.
246    pub async fn add_pool_liquidity_update(
247        &self,
248        chain_id: u32,
249        liquidity_update: &PoolLiquidityUpdate,
250    ) -> anyhow::Result<()> {
251        sqlx::query(
252            r"
253            INSERT INTO pool_liquidity (
254                chain_id, pool_address, block, transaction_hash, transaction_index, log_index,
255                event_type, sender, owner, position_liquidity, amount0, amount1, tick_lower, tick_upper
256            ) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14)
257            ON CONFLICT (chain_id, transaction_hash, log_index)
258            DO NOTHING
259        ",
260        )
261        .bind(chain_id as i32)
262        .bind(liquidity_update.pool.address.to_string())
263        .bind(liquidity_update.block as i64)
264        .bind(liquidity_update.transaction_hash.as_str())
265        .bind(liquidity_update.transaction_index as i32)
266        .bind(liquidity_update.log_index as i32)
267        .bind(liquidity_update.kind.to_string())
268        .bind(liquidity_update.sender.map(|sender| sender.to_string()))
269        .bind(liquidity_update.owner.to_string())
270        .bind(liquidity_update.position_liquidity.to_string())
271        .bind(liquidity_update.amount0.to_string())
272        .bind(liquidity_update.amount1.to_string())
273        .bind(liquidity_update.tick_lower)
274        .bind(liquidity_update.tick_upper)
275        .execute(&self.pool)
276        .await
277        .map(|_| ())
278        .map_err(|e| anyhow::anyhow!("Failed to insert into pool_liquidity table: {e}"))
279    }
280
281    /// Retrieves all token records for the given chain and converts them into `Token` domain objects.
282    pub async fn load_tokens(&self, chain: SharedChain) -> anyhow::Result<Vec<Token>> {
283        sqlx::query_as::<_, TokenRow>("SELECT * FROM token WHERE chain_id = $1")
284            .bind(chain.chain_id as i32)
285            .fetch_all(&self.pool)
286            .await
287            .map(|rows| {
288                rows.into_iter()
289                    .map(|token_row| {
290                        Token::new(
291                            chain.clone(),
292                            token_row.address,
293                            token_row.name,
294                            token_row.symbol,
295                            token_row.decimals as u8,
296                        )
297                    })
298                    .collect::<Vec<_>>()
299            })
300            .map_err(|e| anyhow::anyhow!("Failed to load tokens: {e}"))
301    }
302}