nautilus_blockchain/hypersync/
client.rs

1// -------------------------------------------------------------------------------------------------
2//  Copyright (C) 2015-2025 Posei Systems Pty Ltd. All rights reserved.
3//  https://poseitrader.io
4//
5//  Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
6//  You may not use this file except in compliance with the License.
7//  You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
8//
9//  Unless required by applicable law or agreed to in writing, software
10//  distributed under the License is distributed on an "AS IS" BASIS,
11//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12//  See the License for the specific language governing permissions and
13//  limitations under the License.
14// -------------------------------------------------------------------------------------------------
15
16use std::{collections::BTreeSet, sync::Arc};
17
18use ahash::AHashMap;
19use alloy::primitives::{Address, keccak256};
20use futures_util::Stream;
21use hypersync_client::{
22    net_types::{BlockSelection, FieldSelection, Query},
23    simple_types::Log,
24};
25use nautilus_core::UnixNanos;
26use nautilus_model::defi::{AmmType, Block, Dex, Pool, SharedChain, Token};
27use reqwest::Url;
28
29use crate::{
30    hypersync::transform::{transform_hypersync_block, transform_hypersync_swap_log},
31    rpc::types::BlockchainMessage,
32};
33
/// Delay, in milliseconds, between successive height checks while waiting
/// for HyperSync to index the next block.
const BLOCK_POLLING_INTERVAL_MS: u64 = 50;
37
38/// A client for interacting with a HyperSync API to retrieve blockchain data.
39#[derive(Debug)]
40pub struct HyperSyncClient {
41    /// The target blockchain identifier (e.g. Ethereum, Arbitrum).
42    chain: SharedChain,
43    /// The underlying HyperSync Rust client for making API requests.
44    client: Arc<hypersync_client::Client>,
45    /// Background task handle for the block subscription task.
46    blocks_task: Option<tokio::task::JoinHandle<()>>,
47    /// Background task handles for swap subscription tasks (keyed by pool address).
48    swaps_tasks: AHashMap<Address, tokio::task::JoinHandle<()>>,
49    /// Channel for sending blockchain messages to the adapter data client.
50    tx: tokio::sync::mpsc::UnboundedSender<BlockchainMessage>,
51}
52
53impl HyperSyncClient {
54    /// Creates a new [`HyperSyncClient`] instance for the given chain and message sender.
55    ///
56    /// # Panics
57    ///
58    /// Panics if the chain's `hypersync_url` is invalid or if the underlying client cannot be initialized.
59    #[must_use]
60    pub fn new(
61        chain: SharedChain,
62        tx: tokio::sync::mpsc::UnboundedSender<BlockchainMessage>,
63    ) -> Self {
64        let mut config = hypersync_client::ClientConfig::default();
65        let hypersync_url =
66            Url::parse(chain.hypersync_url.as_str()).expect("Invalid HyperSync URL");
67        config.url = Some(hypersync_url);
68        let client = hypersync_client::Client::new(config).unwrap();
69
70        Self {
71            chain,
72            client: Arc::new(client),
73            blocks_task: None,
74            swaps_tasks: AHashMap::new(),
75            tx,
76        }
77    }
78
79    /// Creates a stream of contract event logs matching the specified criteria.
80    pub async fn request_contract_events_stream(
81        &self,
82        from_block: u64,
83        to_block: Option<u64>,
84        contract_address: &str,
85        event_signature: &str,
86        additional_topics: Vec<String>,
87    ) -> impl Stream<Item = Log> + use<> {
88        let event_hash = keccak256(event_signature.as_bytes());
89        let topic0 = format!("0x{}", hex::encode(event_hash));
90
91        let mut topics_array = Vec::new();
92        topics_array.push(vec![topic0]);
93        for additional_topic in additional_topics {
94            topics_array.push(vec![additional_topic]);
95        }
96
97        let mut query_value = serde_json::json!({
98            "from_block": from_block,
99            "logs": [{
100                "topics": topics_array,
101                "address": [
102                    contract_address,
103                ]
104            }],
105            "field_selection": {
106                "log": [
107                    "block_number",
108                    "transaction_hash",
109                    "transaction_index",
110                    "log_index",
111                    "data",
112                    "topic0",
113                    "topic1",
114                    "topic2",
115                    "topic3",
116                ]
117            }
118        });
119
120        if let Some(to_block) = to_block {
121            if let Some(obj) = query_value.as_object_mut() {
122                obj.insert("to_block".to_string(), serde_json::json!(to_block));
123            }
124        }
125
126        let query = serde_json::from_value(query_value).unwrap();
127
128        let mut rx = self
129            .client
130            .clone()
131            .stream(query, Default::default())
132            .await
133            .expect("Failed to create stream");
134
135        async_stream::stream! {
136              while let Some(response) = rx.recv().await {
137                let response = response.unwrap();
138
139                for batch in response.data.logs {
140                    for log in batch {
141                        yield log
142                    }
143                }
144            }
145        }
146    }
147
148    /// Disconnects from the HyperSync service and stops all background tasks.
149    pub fn disconnect(&mut self) {
150        self.unsubscribe_blocks();
151        self.unsubscribe_all_swaps();
152    }
153
154    /// Returns the current block
155    pub async fn current_block(&self) -> u64 {
156        self.client.get_height().await.unwrap()
157    }
158
159    /// Creates a stream that yields blockchain blocks within the specified range.
160    pub async fn request_blocks_stream(
161        &self,
162        from_block: u64,
163        to_block: Option<u64>,
164    ) -> impl Stream<Item = Block> {
165        let query = Self::construct_block_query(from_block, to_block);
166        let mut rx = self
167            .client
168            .clone()
169            .stream(query, Default::default())
170            .await
171            .unwrap();
172
173        let chain = self.chain.name;
174
175        async_stream::stream! {
176            while let Some(response) = rx.recv().await {
177                let response = response.unwrap();
178                for batch in response.data.blocks {
179                        for received_block in batch {
180                            let block = transform_hypersync_block(chain, received_block).unwrap();
181                            yield block
182                        }
183                    }
184            }
185        }
186    }
187
188    /// Starts a background task that continuously polls for new blockchain blocks.
189    pub fn subscribe_blocks(&mut self) {
190        let chain = self.chain.name;
191        let client = self.client.clone();
192        let tx = self.tx.clone();
193
194        let task = tokio::spawn(async move {
195            tracing::debug!("Starting task 'blocks_feed");
196
197            let current_block_height = client.get_height().await.unwrap();
198            let mut query = Self::construct_block_query(current_block_height, None);
199
200            loop {
201                let response = client.get(&query).await.unwrap();
202                for batch in response.data.blocks {
203                    for received_block in batch {
204                        let block = transform_hypersync_block(chain, received_block).unwrap();
205                        let msg = BlockchainMessage::Block(block);
206                        if let Err(e) = tx.send(msg) {
207                            log::error!("Error sending message: {e}");
208                        }
209                    }
210                }
211
212                if let Some(archive_block_height) = response.archive_height {
213                    if archive_block_height < response.next_block {
214                        while client.get_height().await.unwrap() < response.next_block {
215                            tokio::time::sleep(std::time::Duration::from_millis(
216                                BLOCK_POLLING_INTERVAL_MS,
217                            ))
218                            .await;
219                        }
220                    }
221                }
222
223                query.from_block = response.next_block;
224            }
225        });
226
227        self.blocks_task = Some(task);
228    }
229
230    /// Constructs a HyperSync query for fetching blocks with all available fields within the specified range.
231    fn construct_block_query(from_block: u64, to_block: Option<u64>) -> Query {
232        let all_block_fields: BTreeSet<String> = hypersync_schema::block_header()
233            .fields
234            .iter()
235            .map(|x| x.name.clone())
236            .collect();
237
238        Query {
239            from_block,
240            to_block,
241            blocks: vec![BlockSelection::default()],
242            field_selection: FieldSelection {
243                block: all_block_fields,
244                ..Default::default()
245            },
246            ..Default::default()
247        }
248    }
249
250    /// Subscribes to swap events for a specific pool address.
251    pub fn subscribe_pool_swaps(&mut self, pool_address: Address) {
252        let chain_ref = self.chain.clone(); // Use existing SharedChain
253        let client = self.client.clone();
254        let tx = self.tx.clone();
255
256        let task = tokio::spawn(async move {
257            tracing::debug!("Starting task 'swaps_feed' for pool: {pool_address}");
258
259            // TODO: These objects should be fetched from cache or RPC calls
260            // For now, create minimal objects just to get compilation working
261            let dex = std::sync::Arc::new(Dex::new(
262                (*chain_ref).clone(),
263                "Uniswap V3",
264                "0x1F98431c8aD98523631AE4a59f267346ea31F984", // Uniswap V3 factory
265                AmmType::CLAMM,
266                "PoolCreated(address,address,uint24,int24,address)",
267                "Swap(address,address,int256,int256,uint160,uint128,int24)",
268                "Mint(address,address,int24,int24,uint128,uint256,uint256)",
269                "Burn(address,int24,int24,uint128,uint256,uint256)",
270            ));
271
272            let token0 = Token::new(
273                chain_ref.clone(),
274                "0xA0b86a33E6441b936662bb6B5d1F8Fb0E2b57A5D"
275                    .parse()
276                    .unwrap(), // WETH
277                "Wrapped Ether".to_string(),
278                "WETH".to_string(),
279                18,
280            );
281
282            let token1 = Token::new(
283                chain_ref.clone(),
284                "0xdAC17F958D2ee523a2206206994597C13D831ec7"
285                    .parse()
286                    .unwrap(), // USDT
287                "Tether USD".to_string(),
288                "USDT".to_string(),
289                6, // USDT has 6 decimals
290            );
291
292            let pool = std::sync::Arc::new(Pool::new(
293                chain_ref.clone(),
294                (*dex).clone(),
295                pool_address,
296                0, // creation block - TODO: fetch from cache
297                token0,
298                token1,
299                3000, // 0.3% fee tier
300                60,   // tick spacing
301                UnixNanos::default(),
302            ));
303
304            let current_block_height = client.get_height().await.unwrap();
305            let mut query =
306                Self::construct_pool_swaps_query(pool_address, current_block_height, None);
307
308            loop {
309                let response = client.get(&query).await.unwrap();
310
311                // Process logs for swap events
312                for batch in response.data.logs {
313                    for log in batch {
314                        tracing::debug!(
315                            "Received swap log from pool {pool_address}: topics={:?}, data={:?}, block={:?}, tx_hash={:?}",
316                            log.topics,
317                            log.data,
318                            log.block_number,
319                            log.transaction_hash
320                        );
321                        match transform_hypersync_swap_log(
322                            chain_ref.clone(),
323                            dex.clone(),
324                            pool.clone(),
325                            UnixNanos::default(), // TODO: block timestamp placeholder
326                            &log,
327                        ) {
328                            Ok(swap) => {
329                                let msg = crate::rpc::types::BlockchainMessage::Swap(swap);
330                                if let Err(e) = tx.send(msg) {
331                                    tracing::error!("Error sending swap message: {e}");
332                                }
333                            }
334                            Err(e) => {
335                                tracing::warn!(
336                                    "Failed to transform swap log from pool {pool_address}: {e}"
337                                );
338                            }
339                        }
340                    }
341                }
342
343                if let Some(archive_block_height) = response.archive_height {
344                    if archive_block_height < response.next_block {
345                        while client.get_height().await.unwrap() < response.next_block {
346                            tokio::time::sleep(std::time::Duration::from_millis(
347                                BLOCK_POLLING_INTERVAL_MS,
348                            ))
349                            .await;
350                        }
351                    }
352                }
353
354                query.from_block = response.next_block;
355            }
356        });
357
358        self.swaps_tasks.insert(pool_address, task);
359    }
360
361    /// Constructs a HyperSync query for fetching swap events from a specific pool.
362    fn construct_pool_swaps_query(
363        pool_address: alloy::primitives::Address,
364        from_block: u64,
365        to_block: Option<u64>,
366    ) -> Query {
367        // Uniswap V3 Swap event signature:
368        // Swap(address indexed sender, address indexed recipient, int256 amount0, int256 amount1, uint160 sqrtPriceX96, uint128 liquidity, int24 tick)
369        let swap_topic = "0xc42079f94a6350d7e6235f29174924f928cc2ac818eb64fed8004e115fbcca67";
370
371        let mut query_value = serde_json::json!({
372            "from_block": from_block,
373            "logs": [{
374                "topics": [
375                    [swap_topic]
376                ],
377                "address": [
378                    pool_address.to_string(),
379                ]
380            }],
381            "field_selection": {
382                "log": [
383                    "block_number",
384                    "transaction_hash",
385                    "transaction_index",
386                    "log_index",
387                    "address",
388                    "data",
389                    "topic0",
390                    "topic1",
391                    "topic2",
392                    "topic3",
393                ]
394            }
395        });
396
397        if let Some(to_block) = to_block {
398            if let Some(obj) = query_value.as_object_mut() {
399                obj.insert("to_block".to_string(), serde_json::json!(to_block));
400            }
401        }
402
403        serde_json::from_value(query_value).unwrap()
404    }
405
406    /// Unsubscribes from swap events for a specific pool address.
407    pub fn unsubscribe_pool_swaps(&mut self, pool_address: Address) {
408        if let Some(task) = self.swaps_tasks.remove(&pool_address) {
409            task.abort();
410            tracing::debug!("Unsubscribed from swaps for pool: {}", pool_address);
411        }
412    }
413
414    /// Unsubscribes from all swap events by stopping all swap background tasks.
415    pub fn unsubscribe_all_swaps(&mut self) {
416        for (pool_address, task) in self.swaps_tasks.drain() {
417            task.abort();
418            tracing::debug!("Unsubscribed from swaps for pool: {}", pool_address);
419        }
420    }
421
422    /// Unsubscribes from new blocks by stopping the background watch task.
423    pub fn unsubscribe_blocks(&mut self) {
424        if let Some(task) = self.blocks_task.take() {
425            task.abort();
426            tracing::debug!("Unsubscribed from blocks");
427        }
428    }
429}