live_blocks_hypersync/
watch_hypersync_live_blocks.rs

1// -------------------------------------------------------------------------------------------------
2//  Copyright (C) 2015-2025 Posei Systems Pty Ltd. All rights reserved.
3//  https://poseitrader.io
4//
5//  Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
6//  You may not use this file except in compliance with the License.
7//  You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
8//
9//  Unless required by applicable law or agreed to in writing, software
10//  distributed under the License is distributed on an "AS IS" BASIS,
11//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12//  See the License for the specific language governing permissions and
13//  limitations under the License.
14// -------------------------------------------------------------------------------------------------
15
16use std::sync::Arc;
17
18use nautilus_blockchain::{config::BlockchainDataClientConfig, data::BlockchainDataClient};
19use nautilus_common::logging::{
20    logger::{Logger, LoggerConfig},
21    writer::FileWriterConfig,
22};
23use nautilus_core::{UUID4, env::get_env_var};
24use posei_trader::DataClient;
25use nautilus_live::runner::AsyncRunner;
26use nautilus_model::{defi::chain::chains, identifiers::TraderId};
27use tokio::sync::Notify;
28
29// Run with `cargo run -p nautilus-blockchain --bin live_blocks_hypersync --features hypersync`
30
31#[tokio::main]
32async fn main() -> Result<(), Box<dyn std::error::Error>> {
33    dotenvy::dotenv().ok();
34
35    let _logger_guard = Logger::init_with_config(
36        TraderId::default(),
37        UUID4::new(),
38        LoggerConfig::default(),
39        FileWriterConfig::new(None, None, None, None),
40    )?;
41
42    let _ = AsyncRunner::default(); // Needed for live channels
43
44    // Setup graceful shutdown with signal handling in a different task
45    let notify = Arc::new(Notify::new());
46    let notifier = notify.clone();
47    tokio::spawn(async move {
48        let mut sigterm = tokio::signal::unix::signal(tokio::signal::unix::SignalKind::terminate())
49            .expect("Failed to create SIGTERM listener");
50        let mut sigint = tokio::signal::unix::signal(tokio::signal::unix::SignalKind::interrupt())
51            .expect("Failed to create SIGINT listener");
52        tokio::select! {
53            _ = sigterm.recv() => {}
54            _ = sigint.recv() => {}
55        }
56        log::info!("Shutdown signal received, shutting down...");
57        notifier.notify_one();
58    });
59
60    // Initialize the blockchain data client, connect and subscribe to live blocks with HyperSync watch
61    let chain = Arc::new(chains::ETHEREUM.clone());
62    let http_rpc_url = get_env_var("RPC_HTTP_URL")?;
63    let blockchain_config = BlockchainDataClientConfig::new(
64        chain.clone(),
65        http_rpc_url,
66        None, // RPC requests per second
67        None, // WSS RPC URL
68        true, // Use hypersync for live data
69        None, // from_block
70    );
71
72    let mut data_client = BlockchainDataClient::new(blockchain_config);
73
74    data_client.connect().await?;
75    data_client.subscribe_blocks_async().await?;
76
77    loop {
78        tokio::select! {
79            () = notify.notified() => break,
80            () = data_client.process_hypersync_messages() => {}
81        }
82    }
83
84    data_client.disconnect().await?;
85    Ok(())
86}