nautilus_blockchain/
data.rs

1// -------------------------------------------------------------------------------------------------
2//  Copyright (C) 2015-2025 Posei Systems Pty Ltd. All rights reserved.
3//  https://poseitrader.io
4//
5//  Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
6//  You may not use this file except in compliance with the License.
7//  You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
8//
9//  Unless required by applicable law or agreed to in writing, software
10//  distributed under the License is distributed on an "AS IS" BASIS,
11//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12//  See the License for the specific language governing permissions and
13//  limitations under the License.
14// -------------------------------------------------------------------------------------------------
15
16use std::sync::Arc;
17
18use nautilus_common::messages::data::{
19    RequestBars, RequestBookSnapshot, RequestInstrument, RequestInstruments, RequestQuotes,
20    RequestTrades, SubscribeBars, SubscribeBookDeltas, SubscribeBookDepth10,
21    SubscribeBookSnapshots, SubscribeCustomData, SubscribeIndexPrices, SubscribeInstrument,
22    SubscribeInstrumentClose, SubscribeInstrumentStatus, SubscribeInstruments, SubscribeMarkPrices,
23    SubscribeQuotes, SubscribeTrades, UnsubscribeBars, UnsubscribeBookDeltas,
24    UnsubscribeBookDepth10, UnsubscribeBookSnapshots, UnsubscribeCustomData,
25    UnsubscribeIndexPrices, UnsubscribeInstrument, UnsubscribeInstrumentClose,
26    UnsubscribeInstrumentStatus, UnsubscribeInstruments, UnsubscribeMarkPrices, UnsubscribeQuotes,
27    UnsubscribeTrades,
28};
29use posei_trader::client::DataClient;
30use nautilus_infrastructure::sql::pg::PostgresConnectOptions;
31use nautilus_model::{
32    defi::{
33        amm::Pool,
34        chain::{Blockchain, SharedChain},
35        dex::Dex,
36        token::Token,
37    },
38    identifiers::{ClientId, Venue},
39};
40
41use crate::{
42    cache::BlockchainCache,
43    config::BlockchainAdapterConfig,
44    contracts::erc20::Erc20Contract,
45    events::pool_created::PoolCreated,
46    exchanges::extended::DexExtended,
47    hypersync::client::HyperSyncClient,
48    rpc::{
49        BlockchainRpcClient, BlockchainRpcClientAny,
50        chains::{
51            arbitrum::ArbitrumRpcClient, base::BaseRpcClient, ethereum::EthereumRpcClient,
52            polygon::PolygonRpcClient,
53        },
54        http::BlockchainHttpRpcClient,
55        types::BlockchainMessage,
56    },
57};
58
59/// A comprehensive client for interacting with blockchain data from multiple sources.
60///
61/// The `BlockchainDataClient` serves as a facade that coordinates between different blockchain
62/// data providers, caching mechanisms, and contract interactions. It provides a unified interface
63/// for retrieving and processing blockchain data, particularly focused on DeFi protocols.
64///
65/// This client supports two primary data sources:
66/// 1. Direct RPC connections to blockchain nodes (via WebSocket).
67/// 2. HyperSync API for efficient historical data queries.
68#[derive(Debug)]
69pub struct BlockchainDataClient {
70    /// The blockchain being targeted by this client instance.
71    pub chain: SharedChain,
72    /// Local cache for blockchain entities.
73    cache: BlockchainCache,
74    /// Optional WebSocket RPC client for direct blockchain node communication.
75    rpc_client: Option<BlockchainRpcClientAny>,
76    /// Interface for interacting with ERC20 token contracts.
77    tokens: Erc20Contract,
78    /// Client for the HyperSync data indexing service.
79    hypersync_client: HyperSyncClient,
80    /// Channel receiver for messages from the HyperSync client.
81    hypersync_rx: tokio::sync::mpsc::UnboundedReceiver<BlockchainMessage>,
82}
83
84impl BlockchainDataClient {
85    /// Creates a new [`BlockchainDataClient`] instance for the specified chain and configuration.
86    ///
87    /// # Panics
88    ///
89    /// Panics if `use_hypersync_for_live_data` is false and `wss_rpc_url` is `None` in the provided config.
90    #[must_use]
91    pub fn new(chain: SharedChain, config: BlockchainAdapterConfig) -> Self {
92        let rpc_client = if !config.use_hypersync_for_live_data && config.wss_rpc_url.is_some() {
93            let wss_rpc_url = config.wss_rpc_url.clone().expect("wss_rpc_url is required");
94            Some(Self::initialize_rpc_client(chain.name, wss_rpc_url))
95        } else {
96            None
97        };
98        let (hypersync_tx, hypersync_rx) = tokio::sync::mpsc::unbounded_channel();
99        let hypersync_client = HyperSyncClient::new(chain.clone(), hypersync_tx);
100        let http_rpc_client = Arc::new(BlockchainHttpRpcClient::new(
101            config.http_rpc_url,
102            config.rpc_requests_per_second,
103        ));
104        let erc20_contract = Erc20Contract::new(http_rpc_client);
105        let cache = BlockchainCache::new(chain.clone());
106
107        Self {
108            chain,
109            cache,
110            rpc_client,
111            tokens: erc20_contract,
112            hypersync_client,
113            hypersync_rx,
114        }
115    }
116
117    /// Creates an appropriate blockchain RPC client for the specified blockchain.
118    fn initialize_rpc_client(
119        blockchain: Blockchain,
120        wss_rpc_url: String,
121    ) -> BlockchainRpcClientAny {
122        match blockchain {
123            Blockchain::Ethereum => {
124                BlockchainRpcClientAny::Ethereum(EthereumRpcClient::new(wss_rpc_url))
125            }
126            Blockchain::Polygon => {
127                BlockchainRpcClientAny::Polygon(PolygonRpcClient::new(wss_rpc_url))
128            }
129            Blockchain::Base => BlockchainRpcClientAny::Base(BaseRpcClient::new(wss_rpc_url)),
130            Blockchain::Arbitrum => {
131                BlockchainRpcClientAny::Arbitrum(ArbitrumRpcClient::new(wss_rpc_url))
132            }
133            _ => panic!("Unsupported blockchain {blockchain} for RPC connection"),
134        }
135    }
136
137    /// Initializes the database connection for the blockchain cache.
138    pub async fn initialize_cache_database(
139        &mut self,
140        pg_connect_options: Option<PostgresConnectOptions>,
141    ) {
142        let pg_connect_options = pg_connect_options.unwrap_or_default();
143        log::info!(
144            "Initializing blockchain cache on database '{}'",
145            pg_connect_options.database
146        );
147        self.cache
148            .initialize_database(pg_connect_options.into())
149            .await;
150    }
151
152    /// Establishes connections to the data providers and cache.
153    pub async fn connect(&mut self) -> anyhow::Result<()> {
154        if let Some(ref mut rpc_client) = self.rpc_client {
155            rpc_client.connect().await?;
156        }
157        self.cache.connect().await?;
158        Ok(())
159    }
160
161    /// Gracefully disconnects from all data providers.
162    pub fn disconnect(&mut self) -> anyhow::Result<()> {
163        self.hypersync_client.disconnect();
164        Ok(())
165    }
166
167    /// Synchronizes token and pool data for a specific DEX from the specified block.
168    pub async fn sync_exchange_pools(
169        &mut self,
170        dex_id: &str,
171        from_block: Option<u32>,
172    ) -> anyhow::Result<()> {
173        let from_block = from_block.unwrap_or(0);
174        log::info!("Syncing dex exchange pools for {dex_id} from block {from_block}");
175
176        let dex = if let Some(dex) = self.cache.get_dex(dex_id) {
177            dex.clone()
178        } else {
179            anyhow::bail!("Dex {dex_id} is not registered");
180        };
181
182        let pools = self
183            .hypersync_client
184            .request_pool_created_events(from_block, &dex)
185            .await;
186        for pool in pools {
187            self.process_token(pool.token0.to_string()).await?;
188            self.process_token(pool.token1.to_string()).await?;
189            self.process_pool(&dex, pool).await?;
190        }
191        Ok(())
192    }
193
194    /// Processes a token by address, fetching and caching its metadata if not already cached.
195    ///
196    /// # Errors
197    ///
198    /// Returns an error if fetching token info or adding to cache fails.
199    async fn process_token(&mut self, token_address: String) -> anyhow::Result<()> {
200        if self.cache.get_token(&token_address).is_none() {
201            let token_info = self.tokens.fetch_token_info(&token_address).await?;
202            let token = Token::new(
203                self.chain.clone(),
204                token_address,
205                token_info.name,
206                token_info.symbol,
207                token_info.decimals,
208            );
209            log::info!("Saving fetched token {token} in the cache.");
210            self.cache.add_token(token).await?;
211        }
212        Ok(())
213    }
214
215    /// Processes a pool creation event by creating and caching a `Pool` entity.
216    async fn process_pool(&mut self, dex: &Dex, event: PoolCreated) -> anyhow::Result<()> {
217        let pool = Pool::new(
218            self.chain.clone(),
219            dex.clone(),
220            event.pool_address.to_string(),
221            event.block_number,
222            self.cache
223                .get_token(&event.token0.to_string())
224                .cloned()
225                .unwrap(),
226            self.cache
227                .get_token(&event.token1.to_string())
228                .cloned()
229                .unwrap(),
230            event.fee,
231            event.tick_spacing,
232        );
233        self.cache.add_pool(pool).await?;
234        Ok(())
235    }
236
237    /// Registers a decentralized exchange with the client.
238    pub async fn register_exchange(&mut self, dex: DexExtended) -> anyhow::Result<()> {
239        let dex_id = dex.id();
240        log::info!("Registering blockchain exchange {}", &dex_id);
241        self.cache.add_dex(dex_id, dex).await?;
242        Ok(())
243    }
244
245    /// Processes incoming messages from the HyperSync client.
246    pub async fn process_hypersync_message(&mut self) {
247        while let Some(msg) = self.hypersync_rx.recv().await {
248            match msg {
249                BlockchainMessage::Block(block) => {
250                    log::info!("{block}");
251                }
252            }
253        }
254    }
255
256    /// Processes incoming messages from the RPC client.
257    pub async fn process_rpc_message(&mut self) {
258        if let Some(rpc_client) = self.rpc_client.as_mut() {
259            loop {
260                match rpc_client.next_rpc_message().await {
261                    Ok(msg) => match msg {
262                        BlockchainMessage::Block(block) => {
263                            log::info!("{block}");
264                        }
265                    },
266                    Err(e) => {
267                        log::error!("Error processing rpc message: {e}");
268                    }
269                }
270            }
271        }
272    }
273
274    /// Subscribes to new blockchain blocks from the available data source.
275    ///
276    /// # Panics
277    ///
278    /// Panics if using the RPC client and the block subscription request fails.
279    pub async fn subscribe_blocks(&mut self) {
280        if let Some(rpc_client) = self.rpc_client.as_mut() {
281            rpc_client.subscribe_blocks().await.unwrap();
282        } else {
283            self.hypersync_client.subscribe_blocks();
284        }
285    }
286
287    /// Unsubscribes from block events.
288    ///
289    /// # Panics
290    ///
291    /// Panics if using the RPC client and the block unsubscription request fails.
292    pub async fn unsubscribe_blocks(&mut self) {
293        if let Some(rpc_client) = self.rpc_client.as_mut() {
294            rpc_client.unsubscribe_blocks().await.unwrap();
295        } else {
296            self.hypersync_client.unsubscribe_blocks();
297        }
298    }
299}
300
301#[async_trait::async_trait]
302impl DataClient for BlockchainDataClient {
303    fn client_id(&self) -> ClientId {
304        ClientId::from(format!("BLOCKCHAIN-{}", self.chain.name).as_str())
305    }
306
307    fn venue(&self) -> Option<Venue> {
308        // Blockchain data clients don't map to a single venue since they can provide
309        // data for multiple DEXs across the blockchain
310        None
311    }
312
313    fn start(&self) -> anyhow::Result<()> {
314        log::info!("Starting blockchain data client for {}", self.chain.name);
315        Ok(())
316    }
317
318    fn stop(&self) -> anyhow::Result<()> {
319        log::info!("Stopping blockchain data client for {}", self.chain.name);
320        Ok(())
321    }
322
323    fn reset(&self) -> anyhow::Result<()> {
324        log::info!("Resetting blockchain data client for {}", self.chain.name);
325        Ok(())
326    }
327
328    fn dispose(&self) -> anyhow::Result<()> {
329        log::info!("Disposing blockchain data client for {}", self.chain.name);
330        Ok(())
331    }
332
333    async fn connect(&self) -> anyhow::Result<()> {
334        // Note: The current implementation has connect() taking &mut self,
335        // but the trait requires &self. For now, we'll log the intent.
336        log::info!("Connecting blockchain data client for {}", self.chain.name);
337        // TODO: This should call self.connect() but requires refactoring the mutable reference
338        Ok(())
339    }
340
341    async fn disconnect(&self) -> anyhow::Result<()> {
342        // Note: Same issue as connect() - the implementation needs &mut self
343        log::info!(
344            "Disconnecting blockchain data client for {}",
345            self.chain.name
346        );
347        // TODO: This should call self.disconnect() but requires refactoring the mutable reference
348        Ok(())
349    }
350
351    fn is_connected(&self) -> bool {
352        // For now, we'll assume connected if we have either RPC or HyperSync configured
353        self.rpc_client.is_some() || true // HyperSync is always available
354    }
355
356    fn is_disconnected(&self) -> bool {
357        !self.is_connected()
358    }
359
360    // Subscription methods - blockchain clients don't support traditional market data subscriptions
361    // but we implement them as no-ops for trait compliance
362
363    fn subscribe(&mut self, _cmd: &SubscribeCustomData) -> anyhow::Result<()> {
364        log::debug!("Blockchain client doesn't support custom data subscriptions");
365        Ok(())
366    }
367
368    fn subscribe_instruments(&mut self, _cmd: &SubscribeInstruments) -> anyhow::Result<()> {
369        log::debug!("Blockchain client doesn't support instrument subscriptions");
370        Ok(())
371    }
372
373    fn subscribe_instrument(&mut self, _cmd: &SubscribeInstrument) -> anyhow::Result<()> {
374        log::debug!("Blockchain client doesn't support instrument subscriptions");
375        Ok(())
376    }
377
378    fn subscribe_instrument_status(
379        &mut self,
380        _cmd: &SubscribeInstrumentStatus,
381    ) -> anyhow::Result<()> {
382        log::debug!("Blockchain client doesn't support instrument status subscriptions");
383        Ok(())
384    }
385
386    fn subscribe_instrument_close(
387        &mut self,
388        _cmd: &SubscribeInstrumentClose,
389    ) -> anyhow::Result<()> {
390        log::debug!("Blockchain client doesn't support instrument close subscriptions");
391        Ok(())
392    }
393
394    fn subscribe_quotes(&mut self, _cmd: &SubscribeQuotes) -> anyhow::Result<()> {
395        log::debug!("Blockchain client doesn't support quote subscriptions");
396        Ok(())
397    }
398
399    fn subscribe_trades(&mut self, _cmd: &SubscribeTrades) -> anyhow::Result<()> {
400        log::debug!("Blockchain client doesn't support trade subscriptions");
401        Ok(())
402    }
403
404    fn subscribe_bars(&mut self, _cmd: &SubscribeBars) -> anyhow::Result<()> {
405        log::debug!("Blockchain client doesn't support bar subscriptions");
406        Ok(())
407    }
408
409    fn subscribe_book_snapshots(&mut self, _cmd: &SubscribeBookSnapshots) -> anyhow::Result<()> {
410        log::debug!("Blockchain client doesn't support book snapshot subscriptions");
411        Ok(())
412    }
413
414    fn subscribe_book_deltas(&mut self, _cmd: &SubscribeBookDeltas) -> anyhow::Result<()> {
415        log::debug!("Blockchain client doesn't support book delta subscriptions");
416        Ok(())
417    }
418
419    fn subscribe_book_depth10(&mut self, _cmd: &SubscribeBookDepth10) -> anyhow::Result<()> {
420        log::debug!("Blockchain client doesn't support book depth subscriptions");
421        Ok(())
422    }
423
424    fn subscribe_index_prices(&mut self, _cmd: &SubscribeIndexPrices) -> anyhow::Result<()> {
425        log::debug!("Blockchain client doesn't support index price subscriptions");
426        Ok(())
427    }
428
429    fn subscribe_mark_prices(&mut self, _cmd: &SubscribeMarkPrices) -> anyhow::Result<()> {
430        log::debug!("Blockchain client doesn't support mark price subscriptions");
431        Ok(())
432    }
433
434    // Unsubscription methods - all no-ops for blockchain client
435
436    fn unsubscribe(&mut self, _cmd: &UnsubscribeCustomData) -> anyhow::Result<()> {
437        Ok(())
438    }
439
440    fn unsubscribe_instruments(&mut self, _cmd: &UnsubscribeInstruments) -> anyhow::Result<()> {
441        Ok(())
442    }
443
444    fn unsubscribe_instrument(&mut self, _cmd: &UnsubscribeInstrument) -> anyhow::Result<()> {
445        Ok(())
446    }
447
448    fn unsubscribe_instrument_status(
449        &mut self,
450        _cmd: &UnsubscribeInstrumentStatus,
451    ) -> anyhow::Result<()> {
452        Ok(())
453    }
454
455    fn unsubscribe_instrument_close(
456        &mut self,
457        _cmd: &UnsubscribeInstrumentClose,
458    ) -> anyhow::Result<()> {
459        Ok(())
460    }
461
462    fn unsubscribe_quotes(&mut self, _cmd: &UnsubscribeQuotes) -> anyhow::Result<()> {
463        Ok(())
464    }
465
466    fn unsubscribe_trades(&mut self, _cmd: &UnsubscribeTrades) -> anyhow::Result<()> {
467        Ok(())
468    }
469
470    fn unsubscribe_bars(&mut self, _cmd: &UnsubscribeBars) -> anyhow::Result<()> {
471        Ok(())
472    }
473
474    fn unsubscribe_book_snapshots(
475        &mut self,
476        _cmd: &UnsubscribeBookSnapshots,
477    ) -> anyhow::Result<()> {
478        Ok(())
479    }
480
481    fn unsubscribe_book_deltas(&mut self, _cmd: &UnsubscribeBookDeltas) -> anyhow::Result<()> {
482        Ok(())
483    }
484
485    fn unsubscribe_book_depth10(&mut self, _cmd: &UnsubscribeBookDepth10) -> anyhow::Result<()> {
486        Ok(())
487    }
488
489    fn unsubscribe_index_prices(&mut self, _cmd: &UnsubscribeIndexPrices) -> anyhow::Result<()> {
490        Ok(())
491    }
492
493    fn unsubscribe_mark_prices(&mut self, _cmd: &UnsubscribeMarkPrices) -> anyhow::Result<()> {
494        Ok(())
495    }
496
497    // Request methods - also no-ops for blockchain client since it doesn't provide traditional market data
498
499    fn request_instruments(&self, _request: &RequestInstruments) -> anyhow::Result<()> {
500        log::debug!("Blockchain client doesn't support instrument requests");
501        Ok(())
502    }
503
504    fn request_instrument(&self, _request: &RequestInstrument) -> anyhow::Result<()> {
505        log::debug!("Blockchain client doesn't support instrument requests");
506        Ok(())
507    }
508
509    fn request_quotes(&self, _request: &RequestQuotes) -> anyhow::Result<()> {
510        log::debug!("Blockchain client doesn't support quote requests");
511        Ok(())
512    }
513
514    fn request_trades(&self, _request: &RequestTrades) -> anyhow::Result<()> {
515        log::debug!("Blockchain client doesn't support trade requests");
516        Ok(())
517    }
518
519    fn request_bars(&self, _request: &RequestBars) -> anyhow::Result<()> {
520        log::debug!("Blockchain client doesn't support bar requests");
521        Ok(())
522    }
523
524    fn request_book_snapshot(&self, _request: &RequestBookSnapshot) -> anyhow::Result<()> {
525        log::debug!("Blockchain client doesn't support book snapshot requests");
526        Ok(())
527    }
528}