live_blocks_hypersync/
watch_hypersync_live_blocks.rs1use std::sync::Arc;
17
18use nautilus_blockchain::{config::BlockchainDataClientConfig, data::BlockchainDataClient};
19use nautilus_common::logging::{
20 logger::{Logger, LoggerConfig},
21 writer::FileWriterConfig,
22};
23use nautilus_core::{UUID4, env::get_env_var};
24use posei_trader::DataClient;
25use nautilus_live::runner::AsyncRunner;
26use nautilus_model::{defi::chain::chains, identifiers::TraderId};
27use tokio::sync::Notify;
28
29#[tokio::main]
32async fn main() -> Result<(), Box<dyn std::error::Error>> {
33 dotenvy::dotenv().ok();
34
35 let _logger_guard = Logger::init_with_config(
36 TraderId::default(),
37 UUID4::new(),
38 LoggerConfig::default(),
39 FileWriterConfig::new(None, None, None, None),
40 )?;
41
42 let _ = AsyncRunner::default(); let notify = Arc::new(Notify::new());
46 let notifier = notify.clone();
47 tokio::spawn(async move {
48 let mut sigterm = tokio::signal::unix::signal(tokio::signal::unix::SignalKind::terminate())
49 .expect("Failed to create SIGTERM listener");
50 let mut sigint = tokio::signal::unix::signal(tokio::signal::unix::SignalKind::interrupt())
51 .expect("Failed to create SIGINT listener");
52 tokio::select! {
53 _ = sigterm.recv() => {}
54 _ = sigint.recv() => {}
55 }
56 log::info!("Shutdown signal received, shutting down...");
57 notifier.notify_one();
58 });
59
60 let chain = Arc::new(chains::ETHEREUM.clone());
62 let http_rpc_url = get_env_var("RPC_HTTP_URL")?;
63 let blockchain_config = BlockchainDataClientConfig::new(
64 chain.clone(),
65 http_rpc_url,
66 None, None, true, None, );
71
72 let mut data_client = BlockchainDataClient::new(blockchain_config);
73
74 data_client.connect().await?;
75 data_client.subscribe_blocks_async().await?;
76
77 loop {
78 tokio::select! {
79 () = notify.notified() => break,
80 () = data_client.process_hypersync_messages() => {}
81 }
82 }
83
84 data_client.disconnect().await?;
85 Ok(())
86}