live_blocks_rpc/
watch_rpc_live_blocks.rs1use std::sync::Arc;
17
18use nautilus_blockchain::{config::BlockchainDataClientConfig, data::BlockchainDataClient};
19use nautilus_common::logging::{
20 logger::{Logger, LoggerConfig},
21 writer::FileWriterConfig,
22};
23use nautilus_core::{UUID4, env::get_env_var};
24use posei_trader::DataClient;
25use nautilus_live::runner::AsyncRunner;
26use nautilus_model::{
27 defi::chain::{Blockchain, Chain, chains},
28 identifiers::TraderId,
29};
30use tokio::sync::Notify;
31
32#[tokio::main]
35async fn main() -> Result<(), Box<dyn std::error::Error>> {
36 dotenvy::dotenv().ok();
37
38 let _logger_guard = Logger::init_with_config(
39 TraderId::default(),
40 UUID4::new(),
41 LoggerConfig::default(),
42 FileWriterConfig::new(None, None, None, None),
43 )?;
44
45 let _ = AsyncRunner::default(); let notify = Arc::new(Notify::new());
49 let notifier = notify.clone();
50 tokio::spawn(async move {
51 let mut sigterm = tokio::signal::unix::signal(tokio::signal::unix::SignalKind::terminate())
52 .expect("Failed to create SIGTERM listener");
53 let mut sigint = tokio::signal::unix::signal(tokio::signal::unix::SignalKind::interrupt())
54 .expect("Failed to create SIGINT listener");
55 tokio::select! {
56 _ = sigterm.recv() => {}
57 _ = sigint.recv() => {}
58 }
59 log::info!("Shutdown signal received, shutting down...");
60 notifier.notify_one();
61 });
62
63 let chain: Chain = match std::env::var("CHAIN") {
65 Ok(chain_str) => {
66 if let Ok(blockchain) = chain_str.parse::<Blockchain>() {
67 match blockchain {
68 Blockchain::Ethereum => chains::ETHEREUM.clone(),
69 Blockchain::Base => chains::BASE.clone(),
70 Blockchain::Arbitrum => chains::ARBITRUM.clone(),
71 Blockchain::Polygon => chains::POLYGON.clone(),
72 _ => panic!("Invalid chain {chain_str}"),
73 }
74 } else {
75 panic!("Invalid chain {chain_str}");
76 }
77 }
78 Err(_) => chains::ETHEREUM.clone(), };
80 let chain = Arc::new(chain);
81 let wss_rpc_url = get_env_var("RPC_WSS_URL")?;
82 let http_rpc_url = get_env_var("RPC_HTTP_URL")?;
83 let blockchain_config = BlockchainDataClientConfig::new(
84 chain.clone(),
85 http_rpc_url,
86 None, Some(wss_rpc_url),
88 false, None, );
91
92 let mut data_client = BlockchainDataClient::new(blockchain_config);
93
94 data_client.connect().await?;
95 data_client.subscribe_blocks_async().await?;
96
97 loop {
98 tokio::select! {
99 () = notify.notified() => break,
100 () = data_client.process_rpc_messages() => {}
101 }
102 }
103
104 data_client.disconnect().await?;
105 Ok(())
106}