1use alloy::primitives::U256;
17use nautilus_model::defi::{
18 Block, Chain, Dex, Pool, PoolLiquidityUpdate, PoolSwap, SharedChain, Token,
19};
20use sqlx::{PgPool, postgres::PgConnectOptions};
21
22use crate::cache::rows::{BlockTimestampRow, TokenRow};
23
/// Database handle for the blockchain cache, backed by a Postgres connection pool.
///
/// All persistence methods on this type run their queries through the wrapped
/// [`PgPool`].
#[derive(Debug)]
pub struct BlockchainCacheDatabase {
    /// Postgres connection pool that every query is executed against.
    pool: PgPool,
}
30
31impl BlockchainCacheDatabase {
32 pub async fn init(pg_options: PgConnectOptions) -> Self {
34 let pool = PgPool::connect_with(pg_options)
35 .await
36 .expect("Error connecting to Postgres");
37 Self { pool }
38 }
39
40 pub async fn seed_chain(&self, chain: &Chain) -> anyhow::Result<()> {
42 sqlx::query(
43 r"
44 INSERT INTO chain (
45 chain_id, name
46 ) VALUES ($1,$2)
47 ON CONFLICT (chain_id)
48 DO NOTHING
49 ",
50 )
51 .bind(chain.chain_id as i32)
52 .bind(chain.name.to_string())
53 .execute(&self.pool)
54 .await
55 .map(|_| ())
56 .map_err(|e| anyhow::anyhow!("Failed to seed chain table: {e}"))
57 }
58
59 pub async fn add_block(&self, chain_id: u32, block: &Block) -> anyhow::Result<()> {
61 sqlx::query(
62 r"
63 INSERT INTO block (
64 chain_id, number, hash, parent_hash, miner, gas_limit, gas_used, timestamp,
65 base_fee_per_gas, blob_gas_used, excess_blob_gas,
66 l1_gas_price, l1_gas_used, l1_fee_scalar
67 ) VALUES (
68 $1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14
69 )
70 ON CONFLICT (chain_id, number)
71 DO UPDATE
72 SET
73 hash = $3,
74 parent_hash = $4,
75 miner = $5,
76 gas_limit = $6,
77 gas_used = $7,
78 timestamp = $8,
79 base_fee_per_gas = $9,
80 blob_gas_used = $10,
81 excess_blob_gas = $11,
82 l1_gas_price = $12,
83 l1_gas_used = $13,
84 l1_fee_scalar = $14
85 ",
86 )
87 .bind(chain_id as i32)
88 .bind(block.number as i64)
89 .bind(block.hash.as_str())
90 .bind(block.parent_hash.as_str())
91 .bind(block.miner.as_str())
92 .bind(block.gas_limit as i64)
93 .bind(block.gas_used as i64)
94 .bind(block.timestamp.to_string())
95 .bind(block.base_fee_per_gas.as_ref().map(U256::to_string))
96 .bind(block.blob_gas_used.as_ref().map(U256::to_string))
97 .bind(block.excess_blob_gas.as_ref().map(U256::to_string))
98 .bind(block.l1_gas_price.as_ref().map(U256::to_string))
99 .bind(block.l1_gas_used.map(|v| v as i64))
100 .bind(block.l1_fee_scalar.map(|v| v as i64))
101 .execute(&self.pool)
102 .await
103 .map(|_| ())
104 .map_err(|e| anyhow::anyhow!("Failed to insert into block table: {e}"))
105 }
106
107 pub async fn load_block_timestamps(
109 &self,
110 chain: SharedChain,
111 from_block: u64,
112 ) -> anyhow::Result<Vec<BlockTimestampRow>> {
113 sqlx::query_as::<_, BlockTimestampRow>(
114 r"
115 SELECT DISTINCT ON (block.chain_id, number)
116 number,
117 timestamp
118 FROM block
119 WHERE chain_id = $1 AND number >= $2
120 ORDER BY number ASC
121 ",
122 )
123 .bind(chain.chain_id as i32)
124 .bind(from_block as i64)
125 .fetch_all(&self.pool)
126 .await
127 .map_err(|e| anyhow::anyhow!("Failed to load block timestamps: {e}"))
128 }
129
130 pub async fn add_dex(&self, dex: &Dex) -> anyhow::Result<()> {
132 sqlx::query(
133 r"
134 INSERT INTO dex (
135 chain_id, name, factory_address
136 ) VALUES ($1, $2, $3)
137 ON CONFLICT (chain_id, name)
138 DO UPDATE
139 SET
140 factory_address = $3
141 ",
142 )
143 .bind(dex.chain.chain_id as i32)
144 .bind(dex.name.as_ref())
145 .bind(dex.factory.as_ref())
146 .execute(&self.pool)
147 .await
148 .map(|_| ())
149 .map_err(|e| anyhow::anyhow!("Failed to insert into dex table: {e}"))
150 }
151
152 pub async fn add_pool(&self, pool: &Pool) -> anyhow::Result<()> {
154 sqlx::query(
155 r"
156 INSERT INTO pool (
157 chain_id, address, dex_name, creation_block,
158 token0_chain, token0_address,
159 token1_chain, token1_address,
160 fee, tick_spacing
161 ) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10)
162 ON CONFLICT (chain_id, address)
163 DO UPDATE
164 SET
165 dex_name = $3,
166 creation_block = $4,
167 token0_chain = $5,
168 token0_address = $6,
169 token1_chain = $7,
170 token1_address = $8,
171 fee = $9,
172 tick_spacing = $10
173 ",
174 )
175 .bind(pool.chain.chain_id as i32)
176 .bind(pool.address.to_string())
177 .bind(pool.dex.name.as_ref())
178 .bind(pool.creation_block as i64)
179 .bind(pool.token0.chain.chain_id as i32)
180 .bind(pool.token0.address.to_string())
181 .bind(pool.token1.chain.chain_id as i32)
182 .bind(pool.token1.address.to_string())
183 .bind(pool.fee as i32)
184 .bind(pool.tick_spacing as i32)
185 .execute(&self.pool)
186 .await
187 .map(|_| ())
188 .map_err(|e| anyhow::anyhow!("Failed to insert into pool table: {e}"))
189 }
190
191 pub async fn add_token(&self, token: &Token) -> anyhow::Result<()> {
193 sqlx::query(
194 r"
195 INSERT INTO token (
196 chain_id, address, name, symbol, decimals
197 ) VALUES ($1, $2, $3, $4, $5)
198 ON CONFLICT (chain_id, address)
199 DO UPDATE
200 SET
201 name = $3,
202 symbol = $4,
203 decimals = $5
204 ",
205 )
206 .bind(token.chain.chain_id as i32)
207 .bind(token.address.to_string())
208 .bind(token.name.as_str())
209 .bind(token.symbol.as_str())
210 .bind(i32::from(token.decimals))
211 .execute(&self.pool)
212 .await
213 .map(|_| ())
214 .map_err(|e| anyhow::anyhow!("Failed to insert into token table: {e}"))
215 }
216
217 pub async fn add_swap(&self, chain_id: u32, swap: &PoolSwap) -> anyhow::Result<()> {
219 sqlx::query(
220 r"
221 INSERT INTO pool_swap (
222 chain_id, pool_address, block, transaction_hash, transaction_index,
223 log_index, sender, side, size, price
224 ) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10)
225 ON CONFLICT (chain_id, transaction_hash, log_index)
226 DO NOTHING
227 ",
228 )
229 .bind(chain_id as i32)
230 .bind(swap.pool.address.to_string())
231 .bind(swap.block as i64)
232 .bind(swap.transaction_hash.as_str())
233 .bind(swap.transaction_index as i32)
234 .bind(swap.log_index as i32)
235 .bind(swap.sender.to_string())
236 .bind(swap.side.to_string())
237 .bind(swap.size.to_string())
238 .bind(swap.price.to_string())
239 .execute(&self.pool)
240 .await
241 .map(|_| ())
242 .map_err(|e| anyhow::anyhow!("Failed to insert into pool_swap table: {e}"))
243 }
244
245 pub async fn add_pool_liquidity_update(
247 &self,
248 chain_id: u32,
249 liquidity_update: &PoolLiquidityUpdate,
250 ) -> anyhow::Result<()> {
251 sqlx::query(
252 r"
253 INSERT INTO pool_liquidity (
254 chain_id, pool_address, block, transaction_hash, transaction_index, log_index,
255 event_type, sender, owner, position_liquidity, amount0, amount1, tick_lower, tick_upper
256 ) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14)
257 ON CONFLICT (chain_id, transaction_hash, log_index)
258 DO NOTHING
259 ",
260 )
261 .bind(chain_id as i32)
262 .bind(liquidity_update.pool.address.to_string())
263 .bind(liquidity_update.block as i64)
264 .bind(liquidity_update.transaction_hash.as_str())
265 .bind(liquidity_update.transaction_index as i32)
266 .bind(liquidity_update.log_index as i32)
267 .bind(liquidity_update.kind.to_string())
268 .bind(liquidity_update.sender.map(|sender| sender.to_string()))
269 .bind(liquidity_update.owner.to_string())
270 .bind(liquidity_update.position_liquidity.to_string())
271 .bind(liquidity_update.amount0.to_string())
272 .bind(liquidity_update.amount1.to_string())
273 .bind(liquidity_update.tick_lower)
274 .bind(liquidity_update.tick_upper)
275 .execute(&self.pool)
276 .await
277 .map(|_| ())
278 .map_err(|e| anyhow::anyhow!("Failed to insert into pool_liquidity table: {e}"))
279 }
280
281 pub async fn load_tokens(&self, chain: SharedChain) -> anyhow::Result<Vec<Token>> {
283 sqlx::query_as::<_, TokenRow>("SELECT * FROM token WHERE chain_id = $1")
284 .bind(chain.chain_id as i32)
285 .fetch_all(&self.pool)
286 .await
287 .map(|rows| {
288 rows.into_iter()
289 .map(|token_row| {
290 Token::new(
291 chain.clone(),
292 token_row.address,
293 token_row.name,
294 token_row.symbol,
295 token_row.decimals as u8,
296 )
297 })
298 .collect::<Vec<_>>()
299 })
300 .map_err(|e| anyhow::anyhow!("Failed to load tokens: {e}"))
301 }
302}