nautilus_blockchain/hypersync/
transform.rs

1// -------------------------------------------------------------------------------------------------
2//  Copyright (C) 2015-2025 Posei Systems Pty Ltd. All rights reserved.
3//  https://poseitrader.io
4//
5//  Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
6//  You may not use this file except in compliance with the License.
7//  You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
8//
9//  Unless required by applicable law or agreed to in writing, software
10//  distributed under the License is distributed on an "AS IS" BASIS,
11//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12//  See the License for the specific language governing permissions and
13//  limitations under the License.
14// -------------------------------------------------------------------------------------------------
15
16use std::sync::Arc;
17
18use alloy::primitives::{Address, I256, U256};
19use hypersync_client::format::Hex;
20use nautilus_core::{UnixNanos, datetime::NANOSECONDS_IN_SECOND};
21use nautilus_model::{
22    defi::{Block, Blockchain, Chain, Dex, Pool, PoolSwap, hex::from_str_hex_to_u64},
23    enums::OrderSide,
24    types::{Price, Quantity},
25};
26use ustr::Ustr;
27
28use crate::{
29    decode::{u256_to_price, u256_to_quantity},
30    hypersync::helpers::{
31        extract_block_number, extract_log_index, extract_transaction_hash,
32        extract_transaction_index,
33    },
34};
35
36/// Converts a HyperSync block format to our internal [`Block`] type.
37pub fn transform_hypersync_block(
38    chain: Blockchain,
39    received_block: hypersync_client::simple_types::Block,
40) -> Result<Block, anyhow::Error> {
41    let number = received_block
42        .number
43        .ok_or_else(|| anyhow::anyhow!("Missing block number"))?;
44    let gas_limit = from_str_hex_to_u64(
45        received_block
46            .gas_limit
47            .ok_or_else(|| anyhow::anyhow!("Missing gas limit"))?
48            .encode_hex()
49            .as_str(),
50    )?;
51    let gas_used = from_str_hex_to_u64(
52        received_block
53            .gas_used
54            .ok_or_else(|| anyhow::anyhow!("Missing gas used"))?
55            .encode_hex()
56            .as_str(),
57    )?;
58    let timestamp = from_str_hex_to_u64(
59        received_block
60            .timestamp
61            .ok_or_else(|| anyhow::anyhow!("Missing timestamp"))?
62            .encode_hex()
63            .as_str(),
64    )?;
65
66    let mut block = Block::new(
67        received_block
68            .hash
69            .ok_or_else(|| anyhow::anyhow!("Missing hash"))?
70            .to_string(),
71        received_block
72            .parent_hash
73            .ok_or_else(|| anyhow::anyhow!("Missing parent hash"))?
74            .to_string(),
75        number,
76        Ustr::from(
77            received_block
78                .miner
79                .ok_or_else(|| anyhow::anyhow!("Missing miner"))?
80                .to_string()
81                .as_str(),
82        ),
83        gas_limit,
84        gas_used,
85        UnixNanos::new(timestamp * NANOSECONDS_IN_SECOND),
86        Some(chain),
87    );
88
89    if let Some(base_fee_hex) = received_block.base_fee_per_gas {
90        let s = base_fee_hex.encode_hex();
91        let val = U256::from_str_radix(s.trim_start_matches("0x"), 16)?;
92        block = block.with_base_fee(val);
93    }
94
95    if let (Some(used_hex), Some(excess_hex)) =
96        (received_block.blob_gas_used, received_block.excess_blob_gas)
97    {
98        let used = U256::from_str_radix(used_hex.encode_hex().trim_start_matches("0x"), 16)?;
99        let excess = U256::from_str_radix(excess_hex.encode_hex().trim_start_matches("0x"), 16)?;
100        block = block.with_blob_gas(used, excess);
101    }
102
103    // TODO: HyperSync does not yet publush L1 gas metadata fields
104    // if let (Some(price_hex), Some(l1_used_hex), Some(scalar_hex)) = (
105    //     received_block.l1_gas_price,
106    //     received_block.l1_gas_used,
107    //     received_block.l1_fee_scalar,
108    // ) {
109    //     let price = U256::from_str_radix(price_hex.encode_hex().trim_start_matches("0x"), 16)?;
110    //     let used = from_str_hex_to_u64(l1_used_hex.encode_hex().as_str())?;
111    //     let scalar = from_str_hex_to_u64(scalar_hex.encode_hex().as_str())?;
112    //     block = block.with_l1_fee_components(price, used, scalar);
113    // }
114
115    Ok(block)
116}
117
118/// Converts a HyperSync log entry to a [`PoolSwap`] using provided context.
119pub fn transform_hypersync_swap_log(
120    chain_ref: Arc<Chain>,
121    dex: Arc<Dex>,
122    pool: Arc<Pool>,
123    block_timestamp: UnixNanos,
124    log: &hypersync_client::simple_types::Log,
125) -> Result<PoolSwap, anyhow::Error> {
126    let block_number = extract_block_number(log)?;
127    let transaction_hash = extract_transaction_hash(log)?;
128    let transaction_index = extract_transaction_index(log)?;
129    let log_index = extract_log_index(log)?;
130
131    let sender = log
132        .topics
133        .get(1)
134        .and_then(|t| t.as_ref())
135        .map(|t| Address::from_slice(&t[12..32]))
136        .ok_or_else(|| anyhow::anyhow!("Missing sender address in swap log"))?;
137
138    let data = log
139        .data
140        .as_ref()
141        .ok_or_else(|| anyhow::anyhow!("Missing data field in swap log"))?;
142
143    if data.len() < 160 {
144        // 5 * 32 bytes = 160 bytes minimum
145        anyhow::bail!("Insufficient data length for Uniswap V3 swap event");
146    }
147
148    let amount0_bytes = &data[0..32];
149    let amount1_bytes = &data[32..64];
150
151    // Convert signed integers (int256) - handle negative amounts
152    let amount0_signed = I256::from_be_bytes::<32>(amount0_bytes.try_into().unwrap());
153    let amount1_signed = I256::from_be_bytes::<32>(amount1_bytes.try_into().unwrap());
154
155    // Get absolute values for quantity calculations
156    let amount0 = if amount0_signed.is_negative() {
157        U256::from(-amount0_signed)
158    } else {
159        U256::from(amount0_signed)
160    };
161    let amount1 = if amount1_signed.is_negative() {
162        U256::from(-amount1_signed)
163    } else {
164        U256::from(amount1_signed)
165    };
166
167    tracing::debug!(
168        "Raw amounts: amount0_signed={}, amount1_signed={}, amount0={}, amount1={}",
169        amount0_signed,
170        amount1_signed,
171        amount0,
172        amount1
173    );
174
175    let side = if amount0_signed.is_positive() {
176        OrderSide::Sell // Selling token0 (pool received token0)
177    } else {
178        OrderSide::Buy // Buying token0 (pool gave token0)
179    };
180
181    let quantity = if pool.token0.decimals == 18 {
182        Quantity::from_wei(amount0)
183    } else {
184        u256_to_quantity(amount0, pool.token0.decimals)?
185    };
186
187    let amount1_quantity = if pool.token1.decimals == 18 {
188        Quantity::from_wei(amount1)
189    } else {
190        u256_to_quantity(amount1, pool.token1.decimals)?
191    };
192
193    tracing::debug!(
194        "Converted amounts: amount0={} -> {} {}, amount1={} -> {} {}",
195        amount0,
196        quantity,
197        pool.token0.symbol,
198        amount1,
199        amount1_quantity,
200        pool.token1.symbol
201    );
202
203    let price = if !amount0.is_zero() && !amount1.is_zero() {
204        let price_precision = pool.token0.decimals.max(pool.token1.decimals);
205        let scaled_amount1 = amount1 * U256::from(10_u128.pow(u32::from(price_precision)));
206        let price_raw = scaled_amount1 / amount0;
207
208        if price_precision == 18 {
209            Price::from_wei(price_raw)
210        } else {
211            u256_to_price(price_raw, price_precision)?
212        }
213    } else {
214        anyhow::bail!("Invalid swap: amount0 or amount1 is zero, cannot calculate price");
215    };
216
217    let swap = PoolSwap::new(
218        chain_ref,
219        dex,
220        pool,
221        block_number,
222        transaction_hash,
223        transaction_index,
224        log_index,
225        block_timestamp,
226        sender,
227        side,
228        quantity,
229        price,
230    );
231
232    Ok(swap)
233}
234
235////////////////////////////////////////////////////////////////////////////////
236// Tests
237////////////////////////////////////////////////////////////////////////////////
238
#[cfg(test)]
mod tests {
    use std::sync::Arc;

    use nautilus_model::defi::{AmmType, Chain, Dex, Token};
    use rstest::rstest;
    use serde_json::json;

    use super::*;

    /// A well-formed swap log is decoded into a `PoolSwap` with the expected fields.
    #[rstest]
    fn test_transform_hypersync_swap_log() {
        // Context fixtures: chain, DEX and a WETH/USDT pool.
        let ethereum = Arc::new(Chain::new(Blockchain::Ethereum, 1));

        let uniswap_v3 = Arc::new(Dex::new(
            (*ethereum).clone(),
            "Uniswap V3",
            "0x1F98431c8aD98523631AE4a59f267346ea31F984",
            AmmType::CLAMM,
            "PoolCreated(address,address,uint24,int24,address)",
            "Swap(address,address,int256,int256,uint160,uint128,int24)",
            "Mint(address,address,int24,int24,uint128,uint256,uint256)",
            "Burn(address,int24,int24,uint128,uint256,uint256)",
        ));

        let weth = Token::new(
            ethereum.clone(),
            "0xA0b86a33E6441b936662bb6B5d1F8Fb0E2b57A5D"
                .parse()
                .unwrap(),
            "Wrapped Ether".to_string(),
            "WETH".to_string(),
            18,
        );

        let usdt = Token::new(
            ethereum.clone(),
            "0xdAC17F958D2ee523a2206206994597C13D831ec7"
                .parse()
                .unwrap(),
            "Tether USD".to_string(),
            "USDT".to_string(),
            6,
        );

        let weth_usdt_pool = Arc::new(Pool::new(
            ethereum.clone(),
            (*uniswap_v3).clone(),
            "0x11b815efB8f581194ae79006d24E0d814B7697F6"
                .parse()
                .unwrap(),
            12345678,
            weth,
            usdt,
            3000,
            60,
            UnixNanos::default(),
        ));

        // Raw log as delivered by HyperSync.
        let raw_log = json!({
            "block_number": "0x1581b7e",
            "transaction_hash": "0x1234567890abcdef1234567890abcdef1234567890abcdef1234567890abcdef",
            "transaction_index": "0x5",
            "log_index": "0xa",
            "data": "0x0000000000000000000000000000000000000000000000000de0b6b3a7640000000000000000000000000000000000000000000000000000000000001dcd6500000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000",
            "topics": [
                "0xc42079f94a6350d7e6235f29174924f928cc2ac818eb64fed8004e115fbcca67",
                "0x0000000000000000000000003fc91a3afd70395cd496c647d5a6cc9d4b2b7fad",
                "0x0000000000000000000000003fc91a3afd70395cd496c647d5a6cc9d4b2b7fad"
            ]
        });

        let swap_log: hypersync_client::simple_types::Log =
            serde_json::from_value(raw_log).expect("Failed to deserialize log");

        let outcome = transform_hypersync_swap_log(
            ethereum.clone(),
            uniswap_v3.clone(),
            weth_usdt_pool.clone(),
            UnixNanos::default(),
            &swap_log,
        );

        assert!(
            outcome.is_ok(),
            "Transform should succeed with valid log data"
        );
        let pool_swap = outcome.unwrap();

        // Identification and positional fields
        assert_eq!(pool_swap.block, 0x1581b7e);
        assert_eq!(
            pool_swap.transaction_hash,
            "0x1234567890abcdef1234567890abcdef1234567890abcdef1234567890abcdef"
        );
        assert_eq!(pool_swap.transaction_index, 5);
        assert_eq!(pool_swap.log_index, 10);
        assert_eq!(pool_swap.timestamp, UnixNanos::default());
        assert_eq!(
            pool_swap.sender,
            "0x3fc91a3afd70395cd496c647d5a6cc9d4b2b7fad"
                .parse()
                .unwrap()
        );

        // amount0 is positive (1 ETH), so token0 is being sold
        assert_eq!(pool_swap.side, OrderSide::Sell);

        // The fixture encodes amount0 = 0x0de0b6b3a7640000 (1 ETH in wei) and
        // amount1 = 0x1dcd6500 (500 USDT)
        assert_eq!(pool_swap.quantity.as_f64(), 1.0);
        assert_eq!(pool_swap.quantity.precision, 18);

        // Expected price: amount1 / amount0 = 500 USDT / 1 ETH = 500.0
        assert_eq!(pool_swap.price.as_f64(), 500.0);
        assert_eq!(pool_swap.price.precision, 18);
    }

    /// A block carrying only the mandatory fields is converted, with optional fields unset.
    #[rstest]
    fn test_transform_hypersync_block() {
        let raw_block = json!({
            "number": 0x1581b7e_u64,
            "hash": "0x1234567890abcdef1234567890abcdef1234567890abcdef1234567890abcdef",
            "parent_hash": "0xabcdef1234567890abcdef1234567890abcdef1234567890abcdef1234567890",
            "miner": "0x0000000000000000000000000000000000000000",
            "gas_limit": "0x1c9c380",
            "gas_used": "0x5208",
            "timestamp": "0x61bc3f2d"
        });

        let hypersync_block: hypersync_client::simple_types::Block =
            serde_json::from_value(raw_block).expect("Failed to deserialize block");

        let outcome = transform_hypersync_block(Blockchain::Ethereum, hypersync_block);

        assert!(
            outcome.is_ok(),
            "Transform should succeed with valid block data"
        );
        let converted = outcome.unwrap();

        assert_eq!(converted.number, 0x1581b7e);
        assert_eq!(
            converted.hash,
            "0x1234567890abcdef1234567890abcdef1234567890abcdef1234567890abcdef"
        );
        assert_eq!(
            converted.parent_hash,
            "0xabcdef1234567890abcdef1234567890abcdef1234567890abcdef1234567890"
        );
        assert_eq!(
            converted.miner.as_str(),
            "0x0000000000000000000000000000000000000000"
        );
        assert_eq!(converted.gas_limit, 0x1c9c380);
        assert_eq!(converted.gas_used, 0x5208);

        // 0x61bc3f2d seconds = 1639659309 s, i.e. 1639659309000000000 ns
        let expected_ts = UnixNanos::new(1639659309 * NANOSECONDS_IN_SECOND);
        assert_eq!(converted.timestamp, expected_ts);

        assert_eq!(converted.chain, Some(Blockchain::Ethereum));

        // Optional fields were absent from the fixture and must stay unset
        assert!(converted.base_fee.is_none());
        assert!(converted.blob_gas_used.is_none());
        assert!(converted.excess_blob_gas.is_none());
    }
}