nautilus_blockchain/rpc/
core.rs1use std::{collections::HashMap, sync::Arc};
17
18use nautilus_core::consts::NAUTILUS_USER_AGENT;
19use nautilus_model::defi::{Block, Chain, rpc::RpcNodeWssResponse};
20use nautilus_network::websocket::{Consumer, WebSocketClient, WebSocketConfig};
21use reqwest::header::USER_AGENT;
22use tokio_tungstenite::tungstenite::Message;
23
24use crate::rpc::{
25 error::BlockchainRpcClientError,
26 types::{BlockchainMessage, RpcEventType},
27 utils::{
28 extract_rpc_subscription_id, is_subscription_confirmation_response, is_subscription_event,
29 },
30};
31
32#[derive(Debug)]
40pub struct CoreBlockchainRpcClient {
41 chain: Chain,
43 wss_rpc_url: String,
45 request_id: u64,
47 pending_subscription_request: HashMap<u64, RpcEventType>,
49 subscription_event_types: HashMap<String, RpcEventType>,
52 wss_client: Option<Arc<WebSocketClient>>,
54 wss_consumer_rx: Option<tokio::sync::mpsc::Receiver<Message>>,
56}
57
58impl CoreBlockchainRpcClient {
59 #[must_use]
60 pub fn new(chain: Chain, wss_rpc_url: String) -> Self {
61 Self {
62 chain,
63 wss_rpc_url,
64 request_id: 1,
65 wss_client: None,
66 pending_subscription_request: HashMap::new(),
67 subscription_event_types: HashMap::new(),
68 wss_consumer_rx: None,
69 }
70 }
71
72 pub async fn connect(&mut self) -> anyhow::Result<()> {
78 let (tx, rx) = tokio::sync::mpsc::channel(100);
79 let user_agent = (USER_AGENT.to_string(), NAUTILUS_USER_AGENT.to_string());
80 let heartbeat_interval = 30;
82 let config = WebSocketConfig {
83 url: self.wss_rpc_url.clone(),
84 headers: vec![user_agent],
85 handler: Consumer::Rust(tx),
86 heartbeat: Some(heartbeat_interval),
87 heartbeat_msg: None,
88 #[cfg(feature = "python")]
89 ping_handler: None,
90 reconnect_timeout_ms: Some(5_000),
91 reconnect_delay_initial_ms: None,
92 reconnect_delay_max_ms: None,
93 reconnect_backoff_factor: None,
94 reconnect_jitter_ms: None,
95 };
96 let client = WebSocketClient::connect(
97 config,
98 #[cfg(feature = "python")]
99 None,
100 #[cfg(feature = "python")]
101 None,
102 #[cfg(feature = "python")]
103 None,
104 vec![],
105 None,
106 )
107 .await?;
108
109 self.wss_client = Some(Arc::new(client));
110 self.wss_consumer_rx = Some(rx);
111
112 Ok(())
113 }
114
115 async fn subscribe_events(
117 &mut self,
118 event_type: RpcEventType,
119 subscription_id: String,
120 ) -> Result<(), BlockchainRpcClientError> {
121 if let Some(client) = &self.wss_client {
122 log::info!("Subscribing to new blocks on chain '{}'", self.chain.name);
123 let msg = serde_json::json!({
124 "method": "eth_subscribe",
125 "id": self.request_id,
126 "jsonrpc": "2.0",
127 "params": [subscription_id]
128 });
129 self.pending_subscription_request
130 .insert(self.request_id, event_type);
131 self.request_id += 1;
132 if let Err(err) = client.send_text(msg.to_string(), None).await {
133 log::error!("Error sending subscribe message: {err:?}");
134 }
135 Ok(())
136 } else {
137 Err(BlockchainRpcClientError::ClientError(String::from(
138 "Client not connected",
139 )))
140 }
141 }
142
143 async fn unsubscribe_events(
145 &self,
146 subscription_id: String,
147 ) -> Result<(), BlockchainRpcClientError> {
148 if let Some(client) = &self.wss_client {
149 log::info!("Unsubscribing to new blocks on chain {}", self.chain.name);
150 let msg = serde_json::json!({
151 "method": "eth_unsubscribe",
152 "id": 1,
153 "jsonrpc": "2.0",
154 "params": [subscription_id]
155 });
156 if let Err(err) = client.send_text(msg.to_string(), None).await {
157 log::error!("Error sending unsubscribe message: {err:?}");
158 }
159 Ok(())
160 } else {
161 Err(BlockchainRpcClientError::ClientError(String::from(
162 "Client not connected",
163 )))
164 }
165 }
166
167 pub async fn wait_on_rpc_channel(&mut self) -> Option<Message> {
169 match &mut self.wss_consumer_rx {
170 Some(rx) => rx.recv().await,
171 None => None,
172 }
173 }
174
175 pub async fn next_rpc_message(
185 &mut self,
186 ) -> Result<BlockchainMessage, BlockchainRpcClientError> {
187 while let Some(msg) = self.wait_on_rpc_channel().await {
188 match msg {
189 Message::Text(text) => match serde_json::from_str::<serde_json::Value>(&text) {
190 Ok(json) => {
191 if is_subscription_confirmation_response(&json) {
192 let subscription_request_id = json.get("id").unwrap().as_u64().unwrap();
193 let result = json.get("result").unwrap().as_str().unwrap();
194 let event_type = self
195 .pending_subscription_request
196 .get(&subscription_request_id)
197 .unwrap();
198 self.subscription_event_types
199 .insert(result.to_string(), event_type.clone());
200 self.pending_subscription_request
201 .remove(&subscription_request_id);
202 continue;
203 } else if is_subscription_event(&json) {
204 let subscription_id = match extract_rpc_subscription_id(&json) {
205 Some(id) => id,
206 None => {
207 return Err(BlockchainRpcClientError::InternalRpcClientError(
208 "Error parsing subscription id from valid rpc response"
209 .to_string(),
210 ));
211 }
212 };
213 if let Some(event_type) =
214 self.subscription_event_types.get(subscription_id)
215 {
216 match event_type {
217 RpcEventType::NewBlock => {
218 return match serde_json::from_value::<
219 RpcNodeWssResponse<Block>,
220 >(json)
221 {
222 Ok(block_response) => {
223 let block = block_response.params.result;
224 Ok(BlockchainMessage::Block(block))
225 }
226 Err(e) => {
227 Err(BlockchainRpcClientError::MessageParsingError(
228 format!(
229 "Error parsing rpc response to block with error {e}"
230 ),
231 ))
232 }
233 };
234 }
235 }
236 }
237 return Err(BlockchainRpcClientError::InternalRpcClientError(format!(
238 "Event type not found for defined subscription id {subscription_id}"
239 )));
240 }
241 return Err(BlockchainRpcClientError::UnsupportedRpcResponseType(
242 json.to_string(),
243 ));
244 }
245 Err(e) => {
246 return Err(BlockchainRpcClientError::MessageParsingError(e.to_string()));
247 }
248 },
249 Message::Pong(_) => {
250 continue;
251 }
252 _ => {
253 return Err(BlockchainRpcClientError::UnsupportedRpcResponseType(
254 msg.to_string(),
255 ));
256 }
257 }
258 }
259
260 Err(BlockchainRpcClientError::NoMessageReceived)
261 }
262
263 pub async fn subscribe_blocks(&mut self) -> Result<(), BlockchainRpcClientError> {
269 self.subscribe_events(RpcEventType::NewBlock, String::from("newHeads"))
270 .await
271 }
272
273 pub async fn unsubscribe_blocks(&mut self) -> Result<(), BlockchainRpcClientError> {
279 self.unsubscribe_events(String::from("newHeads")).await?;
280
281 let subscription_ids_to_remove: Vec<String> = self
283 .subscription_event_types
284 .iter()
285 .filter(|(_, event_type)| **event_type == RpcEventType::NewBlock)
286 .map(|(id, _)| id.clone())
287 .collect();
288
289 for id in subscription_ids_to_remove {
290 self.subscription_event_types.remove(&id);
291 }
292 Ok(())
293 }
294}