nautilus_blockchain/hypersync/
client.rs1use std::{collections::BTreeSet, sync::Arc};
17
18use ahash::AHashMap;
19use alloy::primitives::{Address, keccak256};
20use futures_util::Stream;
21use hypersync_client::{
22 net_types::{BlockSelection, FieldSelection, Query},
23 simple_types::Log,
24};
25use nautilus_core::UnixNanos;
26use nautilus_model::defi::{AmmType, Block, Dex, Pool, SharedChain, Token};
27use reqwest::Url;
28
29use crate::{
30 hypersync::transform::{transform_hypersync_block, transform_hypersync_swap_log},
31 rpc::types::BlockchainMessage,
32};
33
34const BLOCK_POLLING_INTERVAL_MS: u64 = 50;
37
38#[derive(Debug)]
40pub struct HyperSyncClient {
41 chain: SharedChain,
43 client: Arc<hypersync_client::Client>,
45 blocks_task: Option<tokio::task::JoinHandle<()>>,
47 swaps_tasks: AHashMap<Address, tokio::task::JoinHandle<()>>,
49 tx: tokio::sync::mpsc::UnboundedSender<BlockchainMessage>,
51}
52
53impl HyperSyncClient {
54 #[must_use]
60 pub fn new(
61 chain: SharedChain,
62 tx: tokio::sync::mpsc::UnboundedSender<BlockchainMessage>,
63 ) -> Self {
64 let mut config = hypersync_client::ClientConfig::default();
65 let hypersync_url =
66 Url::parse(chain.hypersync_url.as_str()).expect("Invalid HyperSync URL");
67 config.url = Some(hypersync_url);
68 let client = hypersync_client::Client::new(config).unwrap();
69
70 Self {
71 chain,
72 client: Arc::new(client),
73 blocks_task: None,
74 swaps_tasks: AHashMap::new(),
75 tx,
76 }
77 }
78
79 pub async fn request_contract_events_stream(
81 &self,
82 from_block: u64,
83 to_block: Option<u64>,
84 contract_address: &str,
85 event_signature: &str,
86 additional_topics: Vec<String>,
87 ) -> impl Stream<Item = Log> + use<> {
88 let event_hash = keccak256(event_signature.as_bytes());
89 let topic0 = format!("0x{}", hex::encode(event_hash));
90
91 let mut topics_array = Vec::new();
92 topics_array.push(vec![topic0]);
93 for additional_topic in additional_topics {
94 topics_array.push(vec![additional_topic]);
95 }
96
97 let mut query_value = serde_json::json!({
98 "from_block": from_block,
99 "logs": [{
100 "topics": topics_array,
101 "address": [
102 contract_address,
103 ]
104 }],
105 "field_selection": {
106 "log": [
107 "block_number",
108 "transaction_hash",
109 "transaction_index",
110 "log_index",
111 "data",
112 "topic0",
113 "topic1",
114 "topic2",
115 "topic3",
116 ]
117 }
118 });
119
120 if let Some(to_block) = to_block {
121 if let Some(obj) = query_value.as_object_mut() {
122 obj.insert("to_block".to_string(), serde_json::json!(to_block));
123 }
124 }
125
126 let query = serde_json::from_value(query_value).unwrap();
127
128 let mut rx = self
129 .client
130 .clone()
131 .stream(query, Default::default())
132 .await
133 .expect("Failed to create stream");
134
135 async_stream::stream! {
136 while let Some(response) = rx.recv().await {
137 let response = response.unwrap();
138
139 for batch in response.data.logs {
140 for log in batch {
141 yield log
142 }
143 }
144 }
145 }
146 }
147
148 pub fn disconnect(&mut self) {
150 self.unsubscribe_blocks();
151 self.unsubscribe_all_swaps();
152 }
153
154 pub async fn current_block(&self) -> u64 {
156 self.client.get_height().await.unwrap()
157 }
158
159 pub async fn request_blocks_stream(
161 &self,
162 from_block: u64,
163 to_block: Option<u64>,
164 ) -> impl Stream<Item = Block> {
165 let query = Self::construct_block_query(from_block, to_block);
166 let mut rx = self
167 .client
168 .clone()
169 .stream(query, Default::default())
170 .await
171 .unwrap();
172
173 let chain = self.chain.name;
174
175 async_stream::stream! {
176 while let Some(response) = rx.recv().await {
177 let response = response.unwrap();
178 for batch in response.data.blocks {
179 for received_block in batch {
180 let block = transform_hypersync_block(chain, received_block).unwrap();
181 yield block
182 }
183 }
184 }
185 }
186 }
187
188 pub fn subscribe_blocks(&mut self) {
190 let chain = self.chain.name;
191 let client = self.client.clone();
192 let tx = self.tx.clone();
193
194 let task = tokio::spawn(async move {
195 tracing::debug!("Starting task 'blocks_feed");
196
197 let current_block_height = client.get_height().await.unwrap();
198 let mut query = Self::construct_block_query(current_block_height, None);
199
200 loop {
201 let response = client.get(&query).await.unwrap();
202 for batch in response.data.blocks {
203 for received_block in batch {
204 let block = transform_hypersync_block(chain, received_block).unwrap();
205 let msg = BlockchainMessage::Block(block);
206 if let Err(e) = tx.send(msg) {
207 log::error!("Error sending message: {e}");
208 }
209 }
210 }
211
212 if let Some(archive_block_height) = response.archive_height {
213 if archive_block_height < response.next_block {
214 while client.get_height().await.unwrap() < response.next_block {
215 tokio::time::sleep(std::time::Duration::from_millis(
216 BLOCK_POLLING_INTERVAL_MS,
217 ))
218 .await;
219 }
220 }
221 }
222
223 query.from_block = response.next_block;
224 }
225 });
226
227 self.blocks_task = Some(task);
228 }
229
230 fn construct_block_query(from_block: u64, to_block: Option<u64>) -> Query {
232 let all_block_fields: BTreeSet<String> = hypersync_schema::block_header()
233 .fields
234 .iter()
235 .map(|x| x.name.clone())
236 .collect();
237
238 Query {
239 from_block,
240 to_block,
241 blocks: vec![BlockSelection::default()],
242 field_selection: FieldSelection {
243 block: all_block_fields,
244 ..Default::default()
245 },
246 ..Default::default()
247 }
248 }
249
250 pub fn subscribe_pool_swaps(&mut self, pool_address: Address) {
252 let chain_ref = self.chain.clone(); let client = self.client.clone();
254 let tx = self.tx.clone();
255
256 let task = tokio::spawn(async move {
257 tracing::debug!("Starting task 'swaps_feed' for pool: {pool_address}");
258
259 let dex = std::sync::Arc::new(Dex::new(
262 (*chain_ref).clone(),
263 "Uniswap V3",
264 "0x1F98431c8aD98523631AE4a59f267346ea31F984", AmmType::CLAMM,
266 "PoolCreated(address,address,uint24,int24,address)",
267 "Swap(address,address,int256,int256,uint160,uint128,int24)",
268 "Mint(address,address,int24,int24,uint128,uint256,uint256)",
269 "Burn(address,int24,int24,uint128,uint256,uint256)",
270 ));
271
272 let token0 = Token::new(
273 chain_ref.clone(),
274 "0xA0b86a33E6441b936662bb6B5d1F8Fb0E2b57A5D"
275 .parse()
276 .unwrap(), "Wrapped Ether".to_string(),
278 "WETH".to_string(),
279 18,
280 );
281
282 let token1 = Token::new(
283 chain_ref.clone(),
284 "0xdAC17F958D2ee523a2206206994597C13D831ec7"
285 .parse()
286 .unwrap(), "Tether USD".to_string(),
288 "USDT".to_string(),
289 6, );
291
292 let pool = std::sync::Arc::new(Pool::new(
293 chain_ref.clone(),
294 (*dex).clone(),
295 pool_address,
296 0, token0,
298 token1,
299 3000, 60, UnixNanos::default(),
302 ));
303
304 let current_block_height = client.get_height().await.unwrap();
305 let mut query =
306 Self::construct_pool_swaps_query(pool_address, current_block_height, None);
307
308 loop {
309 let response = client.get(&query).await.unwrap();
310
311 for batch in response.data.logs {
313 for log in batch {
314 tracing::debug!(
315 "Received swap log from pool {pool_address}: topics={:?}, data={:?}, block={:?}, tx_hash={:?}",
316 log.topics,
317 log.data,
318 log.block_number,
319 log.transaction_hash
320 );
321 match transform_hypersync_swap_log(
322 chain_ref.clone(),
323 dex.clone(),
324 pool.clone(),
325 UnixNanos::default(), &log,
327 ) {
328 Ok(swap) => {
329 let msg = crate::rpc::types::BlockchainMessage::Swap(swap);
330 if let Err(e) = tx.send(msg) {
331 tracing::error!("Error sending swap message: {e}");
332 }
333 }
334 Err(e) => {
335 tracing::warn!(
336 "Failed to transform swap log from pool {pool_address}: {e}"
337 );
338 }
339 }
340 }
341 }
342
343 if let Some(archive_block_height) = response.archive_height {
344 if archive_block_height < response.next_block {
345 while client.get_height().await.unwrap() < response.next_block {
346 tokio::time::sleep(std::time::Duration::from_millis(
347 BLOCK_POLLING_INTERVAL_MS,
348 ))
349 .await;
350 }
351 }
352 }
353
354 query.from_block = response.next_block;
355 }
356 });
357
358 self.swaps_tasks.insert(pool_address, task);
359 }
360
361 fn construct_pool_swaps_query(
363 pool_address: alloy::primitives::Address,
364 from_block: u64,
365 to_block: Option<u64>,
366 ) -> Query {
367 let swap_topic = "0xc42079f94a6350d7e6235f29174924f928cc2ac818eb64fed8004e115fbcca67";
370
371 let mut query_value = serde_json::json!({
372 "from_block": from_block,
373 "logs": [{
374 "topics": [
375 [swap_topic]
376 ],
377 "address": [
378 pool_address.to_string(),
379 ]
380 }],
381 "field_selection": {
382 "log": [
383 "block_number",
384 "transaction_hash",
385 "transaction_index",
386 "log_index",
387 "address",
388 "data",
389 "topic0",
390 "topic1",
391 "topic2",
392 "topic3",
393 ]
394 }
395 });
396
397 if let Some(to_block) = to_block {
398 if let Some(obj) = query_value.as_object_mut() {
399 obj.insert("to_block".to_string(), serde_json::json!(to_block));
400 }
401 }
402
403 serde_json::from_value(query_value).unwrap()
404 }
405
406 pub fn unsubscribe_pool_swaps(&mut self, pool_address: Address) {
408 if let Some(task) = self.swaps_tasks.remove(&pool_address) {
409 task.abort();
410 tracing::debug!("Unsubscribed from swaps for pool: {}", pool_address);
411 }
412 }
413
414 pub fn unsubscribe_all_swaps(&mut self) {
416 for (pool_address, task) in self.swaps_tasks.drain() {
417 task.abort();
418 tracing::debug!("Unsubscribed from swaps for pool: {}", pool_address);
419 }
420 }
421
422 pub fn unsubscribe_blocks(&mut self) {
424 if let Some(task) = self.blocks_task.take() {
425 task.abort();
426 tracing::debug!("Unsubscribed from blocks");
427 }
428 }
429}