1use std::sync::Arc;
17
18use nautilus_common::messages::data::{
19 RequestBars, RequestBookSnapshot, RequestInstrument, RequestInstruments, RequestQuotes,
20 RequestTrades, SubscribeBars, SubscribeBookDeltas, SubscribeBookDepth10,
21 SubscribeBookSnapshots, SubscribeCustomData, SubscribeIndexPrices, SubscribeInstrument,
22 SubscribeInstrumentClose, SubscribeInstrumentStatus, SubscribeInstruments, SubscribeMarkPrices,
23 SubscribeQuotes, SubscribeTrades, UnsubscribeBars, UnsubscribeBookDeltas,
24 UnsubscribeBookDepth10, UnsubscribeBookSnapshots, UnsubscribeCustomData,
25 UnsubscribeIndexPrices, UnsubscribeInstrument, UnsubscribeInstrumentClose,
26 UnsubscribeInstrumentStatus, UnsubscribeInstruments, UnsubscribeMarkPrices, UnsubscribeQuotes,
27 UnsubscribeTrades,
28};
29use posei_trader::client::DataClient;
30use nautilus_infrastructure::sql::pg::PostgresConnectOptions;
31use nautilus_model::{
32 defi::{
33 amm::Pool,
34 chain::{Blockchain, SharedChain},
35 dex::Dex,
36 token::Token,
37 },
38 identifiers::{ClientId, Venue},
39};
40
41use crate::{
42 cache::BlockchainCache,
43 config::BlockchainAdapterConfig,
44 contracts::erc20::Erc20Contract,
45 events::pool_created::PoolCreated,
46 exchanges::extended::DexExtended,
47 hypersync::client::HyperSyncClient,
48 rpc::{
49 BlockchainRpcClient, BlockchainRpcClientAny,
50 chains::{
51 arbitrum::ArbitrumRpcClient, base::BaseRpcClient, ethereum::EthereumRpcClient,
52 polygon::PolygonRpcClient,
53 },
54 http::BlockchainHttpRpcClient,
55 types::BlockchainMessage,
56 },
57};
58
59#[derive(Debug)]
69pub struct BlockchainDataClient {
70 pub chain: SharedChain,
72 cache: BlockchainCache,
74 rpc_client: Option<BlockchainRpcClientAny>,
76 tokens: Erc20Contract,
78 hypersync_client: HyperSyncClient,
80 hypersync_rx: tokio::sync::mpsc::UnboundedReceiver<BlockchainMessage>,
82}
83
84impl BlockchainDataClient {
85 #[must_use]
91 pub fn new(chain: SharedChain, config: BlockchainAdapterConfig) -> Self {
92 let rpc_client = if !config.use_hypersync_for_live_data && config.wss_rpc_url.is_some() {
93 let wss_rpc_url = config.wss_rpc_url.clone().expect("wss_rpc_url is required");
94 Some(Self::initialize_rpc_client(chain.name, wss_rpc_url))
95 } else {
96 None
97 };
98 let (hypersync_tx, hypersync_rx) = tokio::sync::mpsc::unbounded_channel();
99 let hypersync_client = HyperSyncClient::new(chain.clone(), hypersync_tx);
100 let http_rpc_client = Arc::new(BlockchainHttpRpcClient::new(
101 config.http_rpc_url,
102 config.rpc_requests_per_second,
103 ));
104 let erc20_contract = Erc20Contract::new(http_rpc_client);
105 let cache = BlockchainCache::new(chain.clone());
106
107 Self {
108 chain,
109 cache,
110 rpc_client,
111 tokens: erc20_contract,
112 hypersync_client,
113 hypersync_rx,
114 }
115 }
116
117 fn initialize_rpc_client(
119 blockchain: Blockchain,
120 wss_rpc_url: String,
121 ) -> BlockchainRpcClientAny {
122 match blockchain {
123 Blockchain::Ethereum => {
124 BlockchainRpcClientAny::Ethereum(EthereumRpcClient::new(wss_rpc_url))
125 }
126 Blockchain::Polygon => {
127 BlockchainRpcClientAny::Polygon(PolygonRpcClient::new(wss_rpc_url))
128 }
129 Blockchain::Base => BlockchainRpcClientAny::Base(BaseRpcClient::new(wss_rpc_url)),
130 Blockchain::Arbitrum => {
131 BlockchainRpcClientAny::Arbitrum(ArbitrumRpcClient::new(wss_rpc_url))
132 }
133 _ => panic!("Unsupported blockchain {blockchain} for RPC connection"),
134 }
135 }
136
137 pub async fn initialize_cache_database(
139 &mut self,
140 pg_connect_options: Option<PostgresConnectOptions>,
141 ) {
142 let pg_connect_options = pg_connect_options.unwrap_or_default();
143 log::info!(
144 "Initializing blockchain cache on database '{}'",
145 pg_connect_options.database
146 );
147 self.cache
148 .initialize_database(pg_connect_options.into())
149 .await;
150 }
151
152 pub async fn connect(&mut self) -> anyhow::Result<()> {
154 if let Some(ref mut rpc_client) = self.rpc_client {
155 rpc_client.connect().await?;
156 }
157 self.cache.connect().await?;
158 Ok(())
159 }
160
161 pub fn disconnect(&mut self) -> anyhow::Result<()> {
163 self.hypersync_client.disconnect();
164 Ok(())
165 }
166
167 pub async fn sync_exchange_pools(
169 &mut self,
170 dex_id: &str,
171 from_block: Option<u32>,
172 ) -> anyhow::Result<()> {
173 let from_block = from_block.unwrap_or(0);
174 log::info!("Syncing dex exchange pools for {dex_id} from block {from_block}");
175
176 let dex = if let Some(dex) = self.cache.get_dex(dex_id) {
177 dex.clone()
178 } else {
179 anyhow::bail!("Dex {dex_id} is not registered");
180 };
181
182 let pools = self
183 .hypersync_client
184 .request_pool_created_events(from_block, &dex)
185 .await;
186 for pool in pools {
187 self.process_token(pool.token0.to_string()).await?;
188 self.process_token(pool.token1.to_string()).await?;
189 self.process_pool(&dex, pool).await?;
190 }
191 Ok(())
192 }
193
194 async fn process_token(&mut self, token_address: String) -> anyhow::Result<()> {
200 if self.cache.get_token(&token_address).is_none() {
201 let token_info = self.tokens.fetch_token_info(&token_address).await?;
202 let token = Token::new(
203 self.chain.clone(),
204 token_address,
205 token_info.name,
206 token_info.symbol,
207 token_info.decimals,
208 );
209 log::info!("Saving fetched token {token} in the cache.");
210 self.cache.add_token(token).await?;
211 }
212 Ok(())
213 }
214
215 async fn process_pool(&mut self, dex: &Dex, event: PoolCreated) -> anyhow::Result<()> {
217 let pool = Pool::new(
218 self.chain.clone(),
219 dex.clone(),
220 event.pool_address.to_string(),
221 event.block_number,
222 self.cache
223 .get_token(&event.token0.to_string())
224 .cloned()
225 .unwrap(),
226 self.cache
227 .get_token(&event.token1.to_string())
228 .cloned()
229 .unwrap(),
230 event.fee,
231 event.tick_spacing,
232 );
233 self.cache.add_pool(pool).await?;
234 Ok(())
235 }
236
237 pub async fn register_exchange(&mut self, dex: DexExtended) -> anyhow::Result<()> {
239 let dex_id = dex.id();
240 log::info!("Registering blockchain exchange {}", &dex_id);
241 self.cache.add_dex(dex_id, dex).await?;
242 Ok(())
243 }
244
245 pub async fn process_hypersync_message(&mut self) {
247 while let Some(msg) = self.hypersync_rx.recv().await {
248 match msg {
249 BlockchainMessage::Block(block) => {
250 log::info!("{block}");
251 }
252 }
253 }
254 }
255
256 pub async fn process_rpc_message(&mut self) {
258 if let Some(rpc_client) = self.rpc_client.as_mut() {
259 loop {
260 match rpc_client.next_rpc_message().await {
261 Ok(msg) => match msg {
262 BlockchainMessage::Block(block) => {
263 log::info!("{block}");
264 }
265 },
266 Err(e) => {
267 log::error!("Error processing rpc message: {e}");
268 }
269 }
270 }
271 }
272 }
273
274 pub async fn subscribe_blocks(&mut self) {
280 if let Some(rpc_client) = self.rpc_client.as_mut() {
281 rpc_client.subscribe_blocks().await.unwrap();
282 } else {
283 self.hypersync_client.subscribe_blocks();
284 }
285 }
286
287 pub async fn unsubscribe_blocks(&mut self) {
293 if let Some(rpc_client) = self.rpc_client.as_mut() {
294 rpc_client.unsubscribe_blocks().await.unwrap();
295 } else {
296 self.hypersync_client.unsubscribe_blocks();
297 }
298 }
299}
300
301#[async_trait::async_trait]
302impl DataClient for BlockchainDataClient {
303 fn client_id(&self) -> ClientId {
304 ClientId::from(format!("BLOCKCHAIN-{}", self.chain.name).as_str())
305 }
306
307 fn venue(&self) -> Option<Venue> {
308 None
311 }
312
313 fn start(&self) -> anyhow::Result<()> {
314 log::info!("Starting blockchain data client for {}", self.chain.name);
315 Ok(())
316 }
317
318 fn stop(&self) -> anyhow::Result<()> {
319 log::info!("Stopping blockchain data client for {}", self.chain.name);
320 Ok(())
321 }
322
323 fn reset(&self) -> anyhow::Result<()> {
324 log::info!("Resetting blockchain data client for {}", self.chain.name);
325 Ok(())
326 }
327
328 fn dispose(&self) -> anyhow::Result<()> {
329 log::info!("Disposing blockchain data client for {}", self.chain.name);
330 Ok(())
331 }
332
333 async fn connect(&self) -> anyhow::Result<()> {
334 log::info!("Connecting blockchain data client for {}", self.chain.name);
337 Ok(())
339 }
340
341 async fn disconnect(&self) -> anyhow::Result<()> {
342 log::info!(
344 "Disconnecting blockchain data client for {}",
345 self.chain.name
346 );
347 Ok(())
349 }
350
351 fn is_connected(&self) -> bool {
352 self.rpc_client.is_some() || true }
355
356 fn is_disconnected(&self) -> bool {
357 !self.is_connected()
358 }
359
360 fn subscribe(&mut self, _cmd: &SubscribeCustomData) -> anyhow::Result<()> {
364 log::debug!("Blockchain client doesn't support custom data subscriptions");
365 Ok(())
366 }
367
368 fn subscribe_instruments(&mut self, _cmd: &SubscribeInstruments) -> anyhow::Result<()> {
369 log::debug!("Blockchain client doesn't support instrument subscriptions");
370 Ok(())
371 }
372
373 fn subscribe_instrument(&mut self, _cmd: &SubscribeInstrument) -> anyhow::Result<()> {
374 log::debug!("Blockchain client doesn't support instrument subscriptions");
375 Ok(())
376 }
377
378 fn subscribe_instrument_status(
379 &mut self,
380 _cmd: &SubscribeInstrumentStatus,
381 ) -> anyhow::Result<()> {
382 log::debug!("Blockchain client doesn't support instrument status subscriptions");
383 Ok(())
384 }
385
386 fn subscribe_instrument_close(
387 &mut self,
388 _cmd: &SubscribeInstrumentClose,
389 ) -> anyhow::Result<()> {
390 log::debug!("Blockchain client doesn't support instrument close subscriptions");
391 Ok(())
392 }
393
394 fn subscribe_quotes(&mut self, _cmd: &SubscribeQuotes) -> anyhow::Result<()> {
395 log::debug!("Blockchain client doesn't support quote subscriptions");
396 Ok(())
397 }
398
399 fn subscribe_trades(&mut self, _cmd: &SubscribeTrades) -> anyhow::Result<()> {
400 log::debug!("Blockchain client doesn't support trade subscriptions");
401 Ok(())
402 }
403
404 fn subscribe_bars(&mut self, _cmd: &SubscribeBars) -> anyhow::Result<()> {
405 log::debug!("Blockchain client doesn't support bar subscriptions");
406 Ok(())
407 }
408
409 fn subscribe_book_snapshots(&mut self, _cmd: &SubscribeBookSnapshots) -> anyhow::Result<()> {
410 log::debug!("Blockchain client doesn't support book snapshot subscriptions");
411 Ok(())
412 }
413
414 fn subscribe_book_deltas(&mut self, _cmd: &SubscribeBookDeltas) -> anyhow::Result<()> {
415 log::debug!("Blockchain client doesn't support book delta subscriptions");
416 Ok(())
417 }
418
419 fn subscribe_book_depth10(&mut self, _cmd: &SubscribeBookDepth10) -> anyhow::Result<()> {
420 log::debug!("Blockchain client doesn't support book depth subscriptions");
421 Ok(())
422 }
423
424 fn subscribe_index_prices(&mut self, _cmd: &SubscribeIndexPrices) -> anyhow::Result<()> {
425 log::debug!("Blockchain client doesn't support index price subscriptions");
426 Ok(())
427 }
428
429 fn subscribe_mark_prices(&mut self, _cmd: &SubscribeMarkPrices) -> anyhow::Result<()> {
430 log::debug!("Blockchain client doesn't support mark price subscriptions");
431 Ok(())
432 }
433
434 fn unsubscribe(&mut self, _cmd: &UnsubscribeCustomData) -> anyhow::Result<()> {
437 Ok(())
438 }
439
440 fn unsubscribe_instruments(&mut self, _cmd: &UnsubscribeInstruments) -> anyhow::Result<()> {
441 Ok(())
442 }
443
444 fn unsubscribe_instrument(&mut self, _cmd: &UnsubscribeInstrument) -> anyhow::Result<()> {
445 Ok(())
446 }
447
448 fn unsubscribe_instrument_status(
449 &mut self,
450 _cmd: &UnsubscribeInstrumentStatus,
451 ) -> anyhow::Result<()> {
452 Ok(())
453 }
454
455 fn unsubscribe_instrument_close(
456 &mut self,
457 _cmd: &UnsubscribeInstrumentClose,
458 ) -> anyhow::Result<()> {
459 Ok(())
460 }
461
462 fn unsubscribe_quotes(&mut self, _cmd: &UnsubscribeQuotes) -> anyhow::Result<()> {
463 Ok(())
464 }
465
466 fn unsubscribe_trades(&mut self, _cmd: &UnsubscribeTrades) -> anyhow::Result<()> {
467 Ok(())
468 }
469
470 fn unsubscribe_bars(&mut self, _cmd: &UnsubscribeBars) -> anyhow::Result<()> {
471 Ok(())
472 }
473
474 fn unsubscribe_book_snapshots(
475 &mut self,
476 _cmd: &UnsubscribeBookSnapshots,
477 ) -> anyhow::Result<()> {
478 Ok(())
479 }
480
481 fn unsubscribe_book_deltas(&mut self, _cmd: &UnsubscribeBookDeltas) -> anyhow::Result<()> {
482 Ok(())
483 }
484
485 fn unsubscribe_book_depth10(&mut self, _cmd: &UnsubscribeBookDepth10) -> anyhow::Result<()> {
486 Ok(())
487 }
488
489 fn unsubscribe_index_prices(&mut self, _cmd: &UnsubscribeIndexPrices) -> anyhow::Result<()> {
490 Ok(())
491 }
492
493 fn unsubscribe_mark_prices(&mut self, _cmd: &UnsubscribeMarkPrices) -> anyhow::Result<()> {
494 Ok(())
495 }
496
497 fn request_instruments(&self, _request: &RequestInstruments) -> anyhow::Result<()> {
500 log::debug!("Blockchain client doesn't support instrument requests");
501 Ok(())
502 }
503
504 fn request_instrument(&self, _request: &RequestInstrument) -> anyhow::Result<()> {
505 log::debug!("Blockchain client doesn't support instrument requests");
506 Ok(())
507 }
508
509 fn request_quotes(&self, _request: &RequestQuotes) -> anyhow::Result<()> {
510 log::debug!("Blockchain client doesn't support quote requests");
511 Ok(())
512 }
513
514 fn request_trades(&self, _request: &RequestTrades) -> anyhow::Result<()> {
515 log::debug!("Blockchain client doesn't support trade requests");
516 Ok(())
517 }
518
519 fn request_bars(&self, _request: &RequestBars) -> anyhow::Result<()> {
520 log::debug!("Blockchain client doesn't support bar requests");
521 Ok(())
522 }
523
524 fn request_book_snapshot(&self, _request: &RequestBookSnapshot) -> anyhow::Result<()> {
525 log::debug!("Blockchain client doesn't support book snapshot requests");
526 Ok(())
527 }
528}