nautilus_blockchain/rpc/
core.rs

1// -------------------------------------------------------------------------------------------------
2//  Copyright (C) 2015-2025 Posei Systems Pty Ltd. All rights reserved.
3//  https://poseitrader.io
4//
5//  Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
6//  You may not use this file except in compliance with the License.
7//  You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
8//
9//  Unless required by applicable law or agreed to in writing, software
10//  distributed under the License is distributed on an "AS IS" BASIS,
11//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12//  See the License for the specific language governing permissions and
13//  limitations under the License.
14// -------------------------------------------------------------------------------------------------
15
16use std::{collections::HashMap, sync::Arc};
17
18use nautilus_core::consts::NAUTILUS_USER_AGENT;
19use nautilus_model::defi::{Block, Chain, rpc::RpcNodeWssResponse};
20use nautilus_network::websocket::{Consumer, WebSocketClient, WebSocketConfig};
21use reqwest::header::USER_AGENT;
22use tokio_tungstenite::tungstenite::Message;
23
24use crate::rpc::{
25    error::BlockchainRpcClientError,
26    types::{BlockchainMessage, RpcEventType},
27    utils::{
28        extract_rpc_subscription_id, is_subscription_confirmation_response, is_subscription_event,
29    },
30};
31
32/// Core implementation of a blockchain RPC client that serves as the base for all chain-specific clients.
33///
34/// It provides a shared implementation of common blockchain RPC functionality, handling:
35/// - WebSocket connection management with blockchain RPC node.
36/// - Subscription lifecycle (creation, tracking, and termination).
37/// - Message serialization and deserialization of RPC messages.
38/// - Event type mapping and dispatching.
39#[derive(Debug)]
40pub struct CoreBlockchainRpcClient {
41    /// The blockchain network type this client connects to.
42    chain: Chain,
43    /// WebSocket secure URL for the blockchain node's RPC endpoint.
44    wss_rpc_url: String,
45    /// Auto-incrementing counter for generating unique RPC request IDs.
46    request_id: u64,
47    /// Tracks in-flight subscription requests by mapping request IDs to their event types.
48    pending_subscription_request: HashMap<u64, RpcEventType>,
49    /// Maps active subscription IDs to their corresponding event types for message
50    /// deserialization.
51    subscription_event_types: HashMap<String, RpcEventType>,
52    /// The active WebSocket client connection.
53    wss_client: Option<Arc<WebSocketClient>>,
54    /// Channel receiver for consuming WebSocket messages.
55    wss_consumer_rx: Option<tokio::sync::mpsc::Receiver<Message>>,
56}
57
58impl CoreBlockchainRpcClient {
59    #[must_use]
60    pub fn new(chain: Chain, wss_rpc_url: String) -> Self {
61        Self {
62            chain,
63            wss_rpc_url,
64            request_id: 1,
65            wss_client: None,
66            pending_subscription_request: HashMap::new(),
67            subscription_event_types: HashMap::new(),
68            wss_consumer_rx: None,
69        }
70    }
71
72    /// Establishes a WebSocket connection to the blockchain node and sets up the message channel.
73    ///
74    /// # Errors
75    ///
76    /// Returns an error if the WebSocket connection fails.
77    pub async fn connect(&mut self) -> anyhow::Result<()> {
78        let (tx, rx) = tokio::sync::mpsc::channel(100);
79        let user_agent = (USER_AGENT.to_string(), NAUTILUS_USER_AGENT.to_string());
80        // Most of the blockchain rpc nodes require a heartbeat to keep the connection alive
81        let heartbeat_interval = 30;
82        let config = WebSocketConfig {
83            url: self.wss_rpc_url.clone(),
84            headers: vec![user_agent],
85            handler: Consumer::Rust(tx),
86            heartbeat: Some(heartbeat_interval),
87            heartbeat_msg: None,
88            #[cfg(feature = "python")]
89            ping_handler: None,
90            reconnect_timeout_ms: Some(5_000),
91            reconnect_delay_initial_ms: None,
92            reconnect_delay_max_ms: None,
93            reconnect_backoff_factor: None,
94            reconnect_jitter_ms: None,
95        };
96        let client = WebSocketClient::connect(
97            config,
98            #[cfg(feature = "python")]
99            None,
100            #[cfg(feature = "python")]
101            None,
102            #[cfg(feature = "python")]
103            None,
104            vec![],
105            None,
106        )
107        .await?;
108
109        self.wss_client = Some(Arc::new(client));
110        self.wss_consumer_rx = Some(rx);
111
112        Ok(())
113    }
114
115    /// Registers a subscription for the specified event type and records it internally with the given ID.
116    async fn subscribe_events(
117        &mut self,
118        event_type: RpcEventType,
119        subscription_id: String,
120    ) -> Result<(), BlockchainRpcClientError> {
121        if let Some(client) = &self.wss_client {
122            log::info!("Subscribing to new blocks on chain '{}'", self.chain.name);
123            let msg = serde_json::json!({
124                "method": "eth_subscribe",
125                "id": self.request_id,
126                "jsonrpc": "2.0",
127                "params": [subscription_id]
128            });
129            self.pending_subscription_request
130                .insert(self.request_id, event_type);
131            self.request_id += 1;
132            if let Err(err) = client.send_text(msg.to_string(), None).await {
133                log::error!("Error sending subscribe message: {err:?}");
134            }
135            Ok(())
136        } else {
137            Err(BlockchainRpcClientError::ClientError(String::from(
138                "Client not connected",
139            )))
140        }
141    }
142
143    /// Terminates a subscription with the blockchain node using the provided subscription ID.
144    async fn unsubscribe_events(
145        &self,
146        subscription_id: String,
147    ) -> Result<(), BlockchainRpcClientError> {
148        if let Some(client) = &self.wss_client {
149            log::info!("Unsubscribing to new blocks on chain {}", self.chain.name);
150            let msg = serde_json::json!({
151                "method": "eth_unsubscribe",
152                "id": 1,
153                "jsonrpc": "2.0",
154                "params": [subscription_id]
155            });
156            if let Err(err) = client.send_text(msg.to_string(), None).await {
157                log::error!("Error sending unsubscribe message: {err:?}");
158            }
159            Ok(())
160        } else {
161            Err(BlockchainRpcClientError::ClientError(String::from(
162                "Client not connected",
163            )))
164        }
165    }
166
167    /// Waits for and returns the next available message from the WebSocket channel.
168    pub async fn wait_on_rpc_channel(&mut self) -> Option<Message> {
169        match &mut self.wss_consumer_rx {
170            Some(rx) => rx.recv().await,
171            None => None,
172        }
173    }
174
175    /// Retrieves, parses, and returns the next blockchain RPC message as a structured `BlockchainRpcMessage` type.
176    ///
177    /// # Panics
178    ///
179    /// Panics if expected fields (`id`, `result`) are missing or cannot be converted when handling subscription confirmations or events.
180    ///
181    /// # Errors
182    ///
183    /// Returns an error if the RPC channel encounters an error or if deserialization of the message fails.
184    pub async fn next_rpc_message(
185        &mut self,
186    ) -> Result<BlockchainMessage, BlockchainRpcClientError> {
187        while let Some(msg) = self.wait_on_rpc_channel().await {
188            match msg {
189                Message::Text(text) => match serde_json::from_str::<serde_json::Value>(&text) {
190                    Ok(json) => {
191                        if is_subscription_confirmation_response(&json) {
192                            let subscription_request_id = json.get("id").unwrap().as_u64().unwrap();
193                            let result = json.get("result").unwrap().as_str().unwrap();
194                            let event_type = self
195                                .pending_subscription_request
196                                .get(&subscription_request_id)
197                                .unwrap();
198                            self.subscription_event_types
199                                .insert(result.to_string(), event_type.clone());
200                            self.pending_subscription_request
201                                .remove(&subscription_request_id);
202                            continue;
203                        } else if is_subscription_event(&json) {
204                            let subscription_id = match extract_rpc_subscription_id(&json) {
205                                Some(id) => id,
206                                None => {
207                                    return Err(BlockchainRpcClientError::InternalRpcClientError(
208                                        "Error parsing subscription id from valid rpc response"
209                                            .to_string(),
210                                    ));
211                                }
212                            };
213                            if let Some(event_type) =
214                                self.subscription_event_types.get(subscription_id)
215                            {
216                                match event_type {
217                                    RpcEventType::NewBlock => {
218                                        return match serde_json::from_value::<
219                                            RpcNodeWssResponse<Block>,
220                                        >(json)
221                                        {
222                                            Ok(block_response) => {
223                                                let block = block_response.params.result;
224                                                Ok(BlockchainMessage::Block(block))
225                                            }
226                                            Err(e) => {
227                                                Err(BlockchainRpcClientError::MessageParsingError(
228                                                    format!(
229                                                        "Error parsing rpc response to block with error {e}"
230                                                    ),
231                                                ))
232                                            }
233                                        };
234                                    }
235                                }
236                            }
237                            return Err(BlockchainRpcClientError::InternalRpcClientError(format!(
238                                "Event type not found for defined subscription id {subscription_id}"
239                            )));
240                        }
241                        return Err(BlockchainRpcClientError::UnsupportedRpcResponseType(
242                            json.to_string(),
243                        ));
244                    }
245                    Err(e) => {
246                        return Err(BlockchainRpcClientError::MessageParsingError(e.to_string()));
247                    }
248                },
249                Message::Pong(_) => {
250                    continue;
251                }
252                _ => {
253                    return Err(BlockchainRpcClientError::UnsupportedRpcResponseType(
254                        msg.to_string(),
255                    ));
256                }
257            }
258        }
259
260        Err(BlockchainRpcClientError::NoMessageReceived)
261    }
262
263    /// Subscribes to real-time block updates from the blockchain node.
264    ///
265    /// # Errors
266    ///
267    /// Returns an error if the subscription request fails or if the client is not connected.
268    pub async fn subscribe_blocks(&mut self) -> Result<(), BlockchainRpcClientError> {
269        self.subscribe_events(RpcEventType::NewBlock, String::from("newHeads"))
270            .await
271    }
272
273    /// Cancels the subscription to real-time block updates.
274    ///
275    /// # Errors
276    ///
277    /// Returns an error if the unsubscription request fails or if the client is not connected.
278    pub async fn unsubscribe_blocks(&mut self) -> Result<(), BlockchainRpcClientError> {
279        self.unsubscribe_events(String::from("newHeads")).await?;
280
281        // Find and remove the subscription ID associated with the newBlock event type
282        let subscription_ids_to_remove: Vec<String> = self
283            .subscription_event_types
284            .iter()
285            .filter(|(_, event_type)| **event_type == RpcEventType::NewBlock)
286            .map(|(id, _)| id.clone())
287            .collect();
288
289        for id in subscription_ids_to_remove {
290            self.subscription_event_types.remove(&id);
291        }
292        Ok(())
293    }
294}