1use std::sync::Arc;
17
18use alloy::primitives::{Address, I256, U256};
19use hypersync_client::format::Hex;
20use nautilus_core::{UnixNanos, datetime::NANOSECONDS_IN_SECOND};
21use nautilus_model::{
22 defi::{Block, Blockchain, Chain, Dex, Pool, PoolSwap, hex::from_str_hex_to_u64},
23 enums::OrderSide,
24 types::{Price, Quantity},
25};
26use ustr::Ustr;
27
28use crate::{
29 decode::{u256_to_price, u256_to_quantity},
30 hypersync::helpers::{
31 extract_block_number, extract_log_index, extract_transaction_hash,
32 extract_transaction_index,
33 },
34};
35
36pub fn transform_hypersync_block(
38 chain: Blockchain,
39 received_block: hypersync_client::simple_types::Block,
40) -> Result<Block, anyhow::Error> {
41 let number = received_block
42 .number
43 .ok_or_else(|| anyhow::anyhow!("Missing block number"))?;
44 let gas_limit = from_str_hex_to_u64(
45 received_block
46 .gas_limit
47 .ok_or_else(|| anyhow::anyhow!("Missing gas limit"))?
48 .encode_hex()
49 .as_str(),
50 )?;
51 let gas_used = from_str_hex_to_u64(
52 received_block
53 .gas_used
54 .ok_or_else(|| anyhow::anyhow!("Missing gas used"))?
55 .encode_hex()
56 .as_str(),
57 )?;
58 let timestamp = from_str_hex_to_u64(
59 received_block
60 .timestamp
61 .ok_or_else(|| anyhow::anyhow!("Missing timestamp"))?
62 .encode_hex()
63 .as_str(),
64 )?;
65
66 let mut block = Block::new(
67 received_block
68 .hash
69 .ok_or_else(|| anyhow::anyhow!("Missing hash"))?
70 .to_string(),
71 received_block
72 .parent_hash
73 .ok_or_else(|| anyhow::anyhow!("Missing parent hash"))?
74 .to_string(),
75 number,
76 Ustr::from(
77 received_block
78 .miner
79 .ok_or_else(|| anyhow::anyhow!("Missing miner"))?
80 .to_string()
81 .as_str(),
82 ),
83 gas_limit,
84 gas_used,
85 UnixNanos::new(timestamp * NANOSECONDS_IN_SECOND),
86 Some(chain),
87 );
88
89 if let Some(base_fee_hex) = received_block.base_fee_per_gas {
90 let s = base_fee_hex.encode_hex();
91 let val = U256::from_str_radix(s.trim_start_matches("0x"), 16)?;
92 block = block.with_base_fee(val);
93 }
94
95 if let (Some(used_hex), Some(excess_hex)) =
96 (received_block.blob_gas_used, received_block.excess_blob_gas)
97 {
98 let used = U256::from_str_radix(used_hex.encode_hex().trim_start_matches("0x"), 16)?;
99 let excess = U256::from_str_radix(excess_hex.encode_hex().trim_start_matches("0x"), 16)?;
100 block = block.with_blob_gas(used, excess);
101 }
102
103 Ok(block)
116}
117
118pub fn transform_hypersync_swap_log(
120 chain_ref: Arc<Chain>,
121 dex: Arc<Dex>,
122 pool: Arc<Pool>,
123 block_timestamp: UnixNanos,
124 log: &hypersync_client::simple_types::Log,
125) -> Result<PoolSwap, anyhow::Error> {
126 let block_number = extract_block_number(log)?;
127 let transaction_hash = extract_transaction_hash(log)?;
128 let transaction_index = extract_transaction_index(log)?;
129 let log_index = extract_log_index(log)?;
130
131 let sender = log
132 .topics
133 .get(1)
134 .and_then(|t| t.as_ref())
135 .map(|t| Address::from_slice(&t[12..32]))
136 .ok_or_else(|| anyhow::anyhow!("Missing sender address in swap log"))?;
137
138 let data = log
139 .data
140 .as_ref()
141 .ok_or_else(|| anyhow::anyhow!("Missing data field in swap log"))?;
142
143 if data.len() < 160 {
144 anyhow::bail!("Insufficient data length for Uniswap V3 swap event");
146 }
147
148 let amount0_bytes = &data[0..32];
149 let amount1_bytes = &data[32..64];
150
151 let amount0_signed = I256::from_be_bytes::<32>(amount0_bytes.try_into().unwrap());
153 let amount1_signed = I256::from_be_bytes::<32>(amount1_bytes.try_into().unwrap());
154
155 let amount0 = if amount0_signed.is_negative() {
157 U256::from(-amount0_signed)
158 } else {
159 U256::from(amount0_signed)
160 };
161 let amount1 = if amount1_signed.is_negative() {
162 U256::from(-amount1_signed)
163 } else {
164 U256::from(amount1_signed)
165 };
166
167 tracing::debug!(
168 "Raw amounts: amount0_signed={}, amount1_signed={}, amount0={}, amount1={}",
169 amount0_signed,
170 amount1_signed,
171 amount0,
172 amount1
173 );
174
175 let side = if amount0_signed.is_positive() {
176 OrderSide::Sell } else {
178 OrderSide::Buy };
180
181 let quantity = if pool.token0.decimals == 18 {
182 Quantity::from_wei(amount0)
183 } else {
184 u256_to_quantity(amount0, pool.token0.decimals)?
185 };
186
187 let amount1_quantity = if pool.token1.decimals == 18 {
188 Quantity::from_wei(amount1)
189 } else {
190 u256_to_quantity(amount1, pool.token1.decimals)?
191 };
192
193 tracing::debug!(
194 "Converted amounts: amount0={} -> {} {}, amount1={} -> {} {}",
195 amount0,
196 quantity,
197 pool.token0.symbol,
198 amount1,
199 amount1_quantity,
200 pool.token1.symbol
201 );
202
203 let price = if !amount0.is_zero() && !amount1.is_zero() {
204 let price_precision = pool.token0.decimals.max(pool.token1.decimals);
205 let scaled_amount1 = amount1 * U256::from(10_u128.pow(u32::from(price_precision)));
206 let price_raw = scaled_amount1 / amount0;
207
208 if price_precision == 18 {
209 Price::from_wei(price_raw)
210 } else {
211 u256_to_price(price_raw, price_precision)?
212 }
213 } else {
214 anyhow::bail!("Invalid swap: amount0 or amount1 is zero, cannot calculate price");
215 };
216
217 let swap = PoolSwap::new(
218 chain_ref,
219 dex,
220 pool,
221 block_number,
222 transaction_hash,
223 transaction_index,
224 log_index,
225 block_timestamp,
226 sender,
227 side,
228 quantity,
229 price,
230 );
231
232 Ok(swap)
233}
234
#[cfg(test)]
mod tests {
    use std::sync::Arc;

    use nautilus_model::defi::{AmmType, Chain, Dex, Token};
    use rstest::rstest;
    use serde_json::json;

    use super::*;

    /// Decodes a swap log for an 18-decimal / 6-decimal token pair and checks every
    /// field of the resulting swap.
    #[rstest]
    fn test_transform_hypersync_swap_log() {
        let chain = Arc::new(Chain::new(Blockchain::Ethereum, 1));

        let dex = Arc::new(Dex::new(
            (*chain).clone(),
            "Uniswap V3",
            "0x1F98431c8aD98523631AE4a59f267346ea31F984",
            AmmType::CLAMM,
            "PoolCreated(address,address,uint24,int24,address)",
            "Swap(address,address,int256,int256,uint160,uint128,int24)",
            "Mint(address,address,int24,int24,uint128,uint256,uint256)",
            "Burn(address,int24,int24,uint128,uint256,uint256)",
        ));

        let token0 = Token::new(
            chain.clone(),
            "0xA0b86a33E6441b936662bb6B5d1F8Fb0E2b57A5D"
                .parse()
                .unwrap(),
            "Wrapped Ether".to_string(),
            "WETH".to_string(),
            18,
        );

        let token1 = Token::new(
            chain.clone(),
            "0xdAC17F958D2ee523a2206206994597C13D831ec7"
                .parse()
                .unwrap(),
            "Tether USD".to_string(),
            "USDT".to_string(),
            6,
        );

        let pool = Arc::new(Pool::new(
            chain.clone(),
            (*dex).clone(),
            "0x11b815efB8f581194ae79006d24E0d814B7697F6"
                .parse()
                .unwrap(),
            12345678,
            token0,
            token1,
            3000,
            60,
            UnixNanos::default(),
        ));

        // Data words: amount0 = 0x0de0b6b3a7640000 (1e18 raw units),
        // amount1 = 0x1dcd6500 (500_000_000 raw units); the remaining three words are zero.
        let log_json = json!({
            "block_number": "0x1581b7e",
            "transaction_hash": "0x1234567890abcdef1234567890abcdef1234567890abcdef1234567890abcdef",
            "transaction_index": "0x5",
            "log_index": "0xa",
            "data": "0x0000000000000000000000000000000000000000000000000de0b6b3a7640000000000000000000000000000000000000000000000000000000000001dcd6500000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000",
            "topics": [
                "0xc42079f94a6350d7e6235f29174924f928cc2ac818eb64fed8004e115fbcca67",
                "0x0000000000000000000000003fc91a3afd70395cd496c647d5a6cc9d4b2b7fad",
                "0x0000000000000000000000003fc91a3afd70395cd496c647d5a6cc9d4b2b7fad"
            ]
        });

        let log: hypersync_client::simple_types::Log =
            serde_json::from_value(log_json).expect("Failed to deserialize log");

        let result = transform_hypersync_swap_log(
            chain.clone(),
            dex.clone(),
            pool.clone(),
            UnixNanos::default(),
            &log,
        );

        assert!(
            result.is_ok(),
            "Transform should succeed with valid log data"
        );
        let swap = result.unwrap();

        assert_eq!(swap.block, 0x1581b7e);
        assert_eq!(
            swap.transaction_hash,
            "0x1234567890abcdef1234567890abcdef1234567890abcdef1234567890abcdef"
        );
        assert_eq!(swap.transaction_index, 5);
        assert_eq!(swap.log_index, 10);
        assert_eq!(swap.timestamp, UnixNanos::default());
        assert_eq!(
            swap.sender,
            "0x3fc91a3afd70395cd496c647d5a6cc9d4b2b7fad"
                .parse()
                .unwrap()
        );

        // amount0 is positive in the fixture, which maps to the sell side.
        assert_eq!(swap.side, OrderSide::Sell);

        // 1e18 raw units of an 18-decimal token.
        assert_eq!(swap.quantity.as_f64(), 1.0);
        assert_eq!(swap.quantity.precision, 18);

        // 500_000_000 raw units of a 6-decimal token per one unit of token0.
        assert_eq!(swap.price.as_f64(), 500.0);
        assert_eq!(swap.price.precision, 18);
    }

    /// Transforms a block without base fee or blob gas fields and checks each field.
    #[rstest]
    fn test_transform_hypersync_block() {
        let block_json = json!({
            "number": 0x1581b7e_u64,
            "hash": "0x1234567890abcdef1234567890abcdef1234567890abcdef1234567890abcdef",
            "parent_hash": "0xabcdef1234567890abcdef1234567890abcdef1234567890abcdef1234567890",
            "miner": "0x0000000000000000000000000000000000000000",
            "gas_limit": "0x1c9c380",
            "gas_used": "0x5208",
            "timestamp": "0x61bc3f2d"
        });

        let block: hypersync_client::simple_types::Block =
            serde_json::from_value(block_json).expect("Failed to deserialize block");

        let result = transform_hypersync_block(Blockchain::Ethereum, block);

        assert!(
            result.is_ok(),
            "Transform should succeed with valid block data"
        );
        let transformed_block = result.unwrap();

        assert_eq!(transformed_block.number, 0x1581b7e);
        assert_eq!(
            transformed_block.hash,
            "0x1234567890abcdef1234567890abcdef1234567890abcdef1234567890abcdef"
        );
        assert_eq!(
            transformed_block.parent_hash,
            "0xabcdef1234567890abcdef1234567890abcdef1234567890abcdef1234567890"
        );
        assert_eq!(
            transformed_block.miner.as_str(),
            "0x0000000000000000000000000000000000000000"
        );
        assert_eq!(transformed_block.gas_limit, 0x1c9c380);
        assert_eq!(transformed_block.gas_used, 0x5208);

        // The fixture timestamp 0x61bc3f2d is 1_639_726_893 seconds.
        let expected_timestamp = UnixNanos::new(1_639_726_893 * NANOSECONDS_IN_SECOND);
        assert_eq!(transformed_block.timestamp, expected_timestamp);

        assert_eq!(transformed_block.chain, Some(Blockchain::Ethereum));

        // None of the optional fee fields are present in the fixture.
        assert!(transformed_block.base_fee.is_none());
        assert!(transformed_block.blob_gas_used.is_none());
        assert!(transformed_block.excess_blob_gas.is_none());
    }
}