1use std::{
17 collections::HashMap,
18 sync::{
19 Arc,
20 atomic::{AtomicBool, Ordering},
21 },
22 time::{Duration, SystemTime},
23};
24
25use chrono::Utc;
26use futures_util::{Stream, StreamExt};
27use nautilus_common::{logging::log_task_stopped, runtime::get_runtime};
28use nautilus_core::{
29 consts::NAUTILUS_USER_AGENT, env::get_env_var, time::get_atomic_clock_realtime,
30};
31use nautilus_model::{
32 data::{BarType, Data, OrderBookDeltas_API},
33 identifiers::InstrumentId,
34 instruments::{Instrument, InstrumentAny},
35};
36use nautilus_network::websocket::{Consumer, MessageReader, WebSocketClient, WebSocketConfig};
37use reqwest::header::USER_AGENT;
38use tokio::sync::Mutex;
39use tokio_tungstenite::tungstenite::{Error, Message};
40use ustr::Ustr;
41
42use super::{
43 enums::{CoinbaseIntxWsChannel, WsOperation},
44 error::CoinbaseIntxWsError,
45 messages::{CoinbaseIntxSubscription, CoinbaseIntxWsMessage, PoseiWsMessage},
46 parse::{
47 parse_candle_msg, parse_index_price_msg, parse_mark_price_msg,
48 parse_orderbook_snapshot_msg, parse_orderbook_update_msg, parse_quote_msg,
49 },
50};
51use crate::{
52 common::{
53 consts::COINBASE_INTX_WS_URL, credential::Credential, parse::bar_spec_as_coinbase_channel,
54 },
55 websocket::parse::{parse_instrument_any, parse_trade_msg},
56};
57
58#[derive(Debug, Clone)]
60#[cfg_attr(
61 feature = "python",
62 pyo3::pyclass(module = "posei_trader.core.nautilus_pyo3.adapters")
63)]
64pub struct CoinbaseIntxWebSocketClient {
65 url: String,
66 credential: Credential,
67 heartbeat: Option<u64>,
68 inner: Option<Arc<WebSocketClient>>,
69 rx: Option<Arc<tokio::sync::mpsc::UnboundedReceiver<PoseiWsMessage>>>,
70 signal: Arc<AtomicBool>,
71 task_handle: Option<Arc<tokio::task::JoinHandle<()>>>,
72 subscriptions: Arc<Mutex<HashMap<CoinbaseIntxWsChannel, Vec<Ustr>>>>,
73}
74
75impl Default for CoinbaseIntxWebSocketClient {
76 fn default() -> Self {
77 Self::new(None, None, None, None, Some(10)).expect("Failed to create client")
78 }
79}
80
81impl CoinbaseIntxWebSocketClient {
82 pub fn new(
88 url: Option<String>,
89 api_key: Option<String>,
90 api_secret: Option<String>,
91 api_passphrase: Option<String>,
92 heartbeat: Option<u64>,
93 ) -> anyhow::Result<Self> {
94 let url = url.unwrap_or(COINBASE_INTX_WS_URL.to_string());
95 let api_key = api_key.unwrap_or(get_env_var("COINBASE_INTX_API_KEY")?);
96 let api_secret = api_secret.unwrap_or(get_env_var("COINBASE_INTX_API_SECRET")?);
97 let api_passphrase = api_passphrase.unwrap_or(get_env_var("COINBASE_INTX_API_PASSPHRASE")?);
98
99 let credential = Credential::new(api_key, api_secret, api_passphrase);
100 let signal = Arc::new(AtomicBool::new(false));
101 let subscriptions = Arc::new(Mutex::new(HashMap::new()));
102
103 Ok(Self {
104 url,
105 credential,
106 heartbeat,
107 inner: None,
108 rx: None,
109 signal,
110 task_handle: None,
111 subscriptions,
112 })
113 }
114
115 pub fn from_env() -> anyhow::Result<Self> {
122 Self::new(None, None, None, None, None)
123 }
124
125 #[must_use]
127 pub const fn url(&self) -> &str {
128 self.url.as_str()
129 }
130
131 #[must_use]
133 pub fn api_key(&self) -> &str {
134 self.credential.api_key.as_str()
135 }
136
137 #[must_use]
139 pub fn is_active(&self) -> bool {
140 match &self.inner {
141 Some(inner) => inner.is_active(),
142 None => false,
143 }
144 }
145
146 #[must_use]
148 pub fn is_closed(&self) -> bool {
149 match &self.inner {
150 Some(inner) => inner.is_closed(),
151 None => true,
152 }
153 }
154
155 pub async fn connect(&mut self, instruments: Vec<InstrumentAny>) -> anyhow::Result<()> {
161 let client = self.clone();
162 let post_reconnect = Arc::new(move || {
163 let client = client.clone();
164 tokio::spawn(async move { client.resubscribe_all().await });
165 });
166
167 let config = WebSocketConfig {
168 url: self.url.clone(),
169 headers: vec![(USER_AGENT.to_string(), NAUTILUS_USER_AGENT.to_string())],
170 #[cfg(feature = "python")]
171 handler: Consumer::Python(None),
172 #[cfg(not(feature = "python"))]
173 handler: {
174 let (consumer, _rx) = Consumer::rust_consumer();
175 consumer
176 },
177 heartbeat: self.heartbeat,
178 heartbeat_msg: None,
179 #[cfg(feature = "python")]
180 ping_handler: None,
181 reconnect_timeout_ms: Some(5_000),
182 reconnect_delay_initial_ms: None, reconnect_delay_max_ms: None, reconnect_backoff_factor: None, reconnect_jitter_ms: None, };
187 let (reader, client) =
188 WebSocketClient::connect_stream(config, vec![], None, Some(post_reconnect)).await?;
189
190 self.inner = Some(Arc::new(client));
191
192 let mut instruments_map: HashMap<Ustr, InstrumentAny> = HashMap::new();
193 for inst in instruments {
194 instruments_map.insert(inst.raw_symbol().inner(), inst);
195 }
196
197 let (tx, rx) = tokio::sync::mpsc::unbounded_channel::<PoseiWsMessage>();
198 self.rx = Some(Arc::new(rx));
199 let signal = self.signal.clone();
200
201 let stream_handle = get_runtime().spawn(async move {
202 CoinbaseIntxWsMessageHandler::new(instruments_map, reader, signal, tx)
203 .run()
204 .await;
205 });
206
207 self.task_handle = Some(Arc::new(stream_handle));
208
209 Ok(())
210 }
211
212 pub fn stream(&mut self) -> impl Stream<Item = PoseiWsMessage> + 'static {
220 let rx = self
221 .rx
222 .take()
223 .expect("Data stream receiver already taken or not connected"); let mut rx = Arc::try_unwrap(rx).expect("Cannot take ownership - other references exist");
225 async_stream::stream! {
226 while let Some(data) = rx.recv().await {
227 yield data;
228 }
229 }
230 }
231
232 pub async fn close(&mut self) -> Result<(), Error> {
238 tracing::debug!("Closing");
239 self.signal.store(true, Ordering::Relaxed);
240
241 match tokio::time::timeout(Duration::from_secs(5), async {
242 if let Some(inner) = &self.inner {
243 inner.disconnect().await;
244 } else {
245 log::error!("Error on close: not connected");
246 }
247 })
248 .await
249 {
250 Ok(()) => {
251 tracing::debug!("Inner disconnected");
252 }
253 Err(_) => {
254 tracing::error!("Timeout waiting for inner client to disconnect");
255 }
256 }
257
258 log::debug!("Closed");
259
260 Ok(())
261 }
262
263 async fn subscribe(
269 &self,
270 channels: Vec<CoinbaseIntxWsChannel>,
271 product_ids: Vec<Ustr>,
272 ) -> Result<(), CoinbaseIntxWsError> {
273 let mut active_subs = self.subscriptions.lock().await;
275 for channel in &channels {
276 active_subs
277 .entry(*channel)
278 .or_insert_with(Vec::new)
279 .extend(product_ids.clone());
280 }
281 tracing::debug!(
282 "Added active subscription(s): channels={channels:?}, product_ids={product_ids:?}"
283 );
284
285 let time = chrono::DateTime::<Utc>::from(SystemTime::now())
286 .timestamp()
287 .to_string();
288 let signature = self.credential.sign_ws(&time);
289 let message = CoinbaseIntxSubscription {
290 op: WsOperation::Subscribe,
291 product_ids: Some(product_ids),
292 channels,
293 time,
294 key: self.credential.api_key,
295 passphrase: self.credential.api_passphrase,
296 signature,
297 };
298
299 let json_txt = serde_json::to_string(&message)
300 .map_err(|e| CoinbaseIntxWsError::JsonError(e.to_string()))?;
301
302 if let Some(inner) = &self.inner {
303 if let Err(err) = inner.send_text(json_txt, None).await {
304 tracing::error!("Error sending message: {err:?}");
305 }
306 } else {
307 return Err(CoinbaseIntxWsError::ClientError(
308 "Cannot send message: not connected".to_string(),
309 ));
310 }
311
312 Ok(())
313 }
314
315 async fn unsubscribe(
317 &self,
318 channels: Vec<CoinbaseIntxWsChannel>,
319 product_ids: Vec<Ustr>,
320 ) -> Result<(), CoinbaseIntxWsError> {
321 let mut active_subs = self.subscriptions.lock().await;
323 for channel in &channels {
324 if let Some(subs) = active_subs.get_mut(channel) {
325 for product_id in &product_ids {
326 subs.retain(|pid| pid != product_id);
327 }
328 if subs.is_empty() {
329 active_subs.remove(channel);
330 }
331 }
332 }
333 tracing::debug!(
334 "Removed active subscription(s): channels={channels:?}, product_ids={product_ids:?}"
335 );
336
337 let time = chrono::DateTime::<Utc>::from(SystemTime::now())
338 .timestamp()
339 .to_string();
340 let signature = self.credential.sign_ws(&time);
341 let message = CoinbaseIntxSubscription {
342 op: WsOperation::Unsubscribe,
343 product_ids: Some(product_ids),
344 channels,
345 time,
346 key: self.credential.api_key,
347 passphrase: self.credential.api_passphrase,
348 signature,
349 };
350
351 let json_txt = serde_json::to_string(&message)
352 .map_err(|e| CoinbaseIntxWsError::JsonError(e.to_string()))?;
353
354 if let Some(inner) = &self.inner {
355 if let Err(err) = inner.send_text(json_txt, None).await {
356 tracing::error!("Error sending message: {err:?}");
357 }
358 } else {
359 return Err(CoinbaseIntxWsError::ClientError(
360 "Cannot send message: not connected".to_string(),
361 ));
362 }
363
364 Ok(())
365 }
366
367 async fn resubscribe_all(&self) {
369 let subs = self.subscriptions.lock().await.clone();
370
371 for (channel, product_ids) in subs {
372 if product_ids.is_empty() {
373 continue;
374 }
375
376 tracing::debug!("Resubscribing: channel={channel}, product_ids={product_ids:?}");
377
378 if let Err(e) = self.subscribe(vec![channel], product_ids).await {
379 tracing::error!("Failed to resubscribe to channel {channel}: {e}");
380 }
381 }
382 }
383
384 pub async fn subscribe_instruments(
391 &self,
392 instrument_ids: Vec<InstrumentId>,
393 ) -> Result<(), CoinbaseIntxWsError> {
394 let product_ids = instrument_ids_to_product_ids(&instrument_ids);
395 self.subscribe(vec![CoinbaseIntxWsChannel::Instruments], product_ids)
396 .await
397 }
398
399 pub async fn subscribe_funding(
406 &self,
407 instrument_ids: Vec<InstrumentId>,
408 ) -> Result<(), CoinbaseIntxWsError> {
409 let product_ids = instrument_ids_to_product_ids(&instrument_ids);
410 self.subscribe(vec![CoinbaseIntxWsChannel::Funding], product_ids)
411 .await
412 }
413
414 pub async fn subscribe_risk(
421 &self,
422 instrument_ids: Vec<InstrumentId>,
423 ) -> Result<(), CoinbaseIntxWsError> {
424 let product_ids = instrument_ids_to_product_ids(&instrument_ids);
425 self.subscribe(vec![CoinbaseIntxWsChannel::Risk], product_ids)
426 .await
427 }
428
429 pub async fn subscribe_order_book(
436 &self,
437 instrument_ids: Vec<InstrumentId>,
438 ) -> Result<(), CoinbaseIntxWsError> {
439 let product_ids = instrument_ids_to_product_ids(&instrument_ids);
440 self.subscribe(vec![CoinbaseIntxWsChannel::Level2], product_ids)
441 .await
442 }
443
444 pub async fn subscribe_quotes(
451 &self,
452 instrument_ids: Vec<InstrumentId>,
453 ) -> Result<(), CoinbaseIntxWsError> {
454 let product_ids = instrument_ids_to_product_ids(&instrument_ids);
455 self.subscribe(vec![CoinbaseIntxWsChannel::Level1], product_ids)
456 .await
457 }
458
459 pub async fn subscribe_trades(
466 &self,
467 instrument_ids: Vec<InstrumentId>,
468 ) -> Result<(), CoinbaseIntxWsError> {
469 let product_ids = instrument_ids_to_product_ids(&instrument_ids);
470 self.subscribe(vec![CoinbaseIntxWsChannel::Match], product_ids)
471 .await
472 }
473
474 pub async fn subscribe_mark_prices(
481 &self,
482 instrument_ids: Vec<InstrumentId>,
483 ) -> Result<(), CoinbaseIntxWsError> {
484 let product_ids = instrument_ids_to_product_ids(&instrument_ids);
485 self.subscribe(vec![CoinbaseIntxWsChannel::Risk], product_ids)
486 .await
487 }
488
489 pub async fn subscribe_index_prices(
496 &self,
497 instrument_ids: Vec<InstrumentId>,
498 ) -> Result<(), CoinbaseIntxWsError> {
499 let product_ids = instrument_ids_to_product_ids(&instrument_ids);
500 self.subscribe(vec![CoinbaseIntxWsChannel::Risk], product_ids)
501 .await
502 }
503
504 pub async fn subscribe_bars(&self, bar_type: BarType) -> Result<(), CoinbaseIntxWsError> {
511 let channel = bar_spec_as_coinbase_channel(bar_type.spec())
512 .map_err(|e| CoinbaseIntxWsError::ClientError(e.to_string()))?;
513 let product_ids = vec![bar_type.standard().instrument_id().symbol.inner()];
514 self.subscribe(vec![channel], product_ids).await
515 }
516
517 pub async fn unsubscribe_instruments(
524 &self,
525 instrument_ids: Vec<InstrumentId>,
526 ) -> Result<(), CoinbaseIntxWsError> {
527 let product_ids = instrument_ids_to_product_ids(&instrument_ids);
528 self.unsubscribe(vec![CoinbaseIntxWsChannel::Instruments], product_ids)
529 .await
530 }
531
532 pub async fn unsubscribe_risk(
539 &self,
540 instrument_ids: Vec<InstrumentId>,
541 ) -> Result<(), CoinbaseIntxWsError> {
542 let product_ids = instrument_ids_to_product_ids(&instrument_ids);
543 self.unsubscribe(vec![CoinbaseIntxWsChannel::Risk], product_ids)
544 .await
545 }
546
547 pub async fn unsubscribe_funding(
554 &self,
555 instrument_ids: Vec<InstrumentId>,
556 ) -> Result<(), CoinbaseIntxWsError> {
557 let product_ids = instrument_ids_to_product_ids(&instrument_ids);
558 self.unsubscribe(vec![CoinbaseIntxWsChannel::Funding], product_ids)
559 .await
560 }
561
562 pub async fn unsubscribe_order_book(
569 &self,
570 instrument_ids: Vec<InstrumentId>,
571 ) -> Result<(), CoinbaseIntxWsError> {
572 let product_ids = instrument_ids_to_product_ids(&instrument_ids);
573 self.unsubscribe(vec![CoinbaseIntxWsChannel::Level2], product_ids)
574 .await
575 }
576
577 pub async fn unsubscribe_quotes(
584 &self,
585 instrument_ids: Vec<InstrumentId>,
586 ) -> Result<(), CoinbaseIntxWsError> {
587 let product_ids = instrument_ids_to_product_ids(&instrument_ids);
588 self.unsubscribe(vec![CoinbaseIntxWsChannel::Level1], product_ids)
589 .await
590 }
591
592 pub async fn unsubscribe_trades(
599 &self,
600 instrument_ids: Vec<InstrumentId>,
601 ) -> Result<(), CoinbaseIntxWsError> {
602 let product_ids = instrument_ids_to_product_ids(&instrument_ids);
603 self.unsubscribe(vec![CoinbaseIntxWsChannel::Match], product_ids)
604 .await
605 }
606
607 pub async fn unsubscribe_mark_prices(
614 &self,
615 instrument_ids: Vec<InstrumentId>,
616 ) -> Result<(), CoinbaseIntxWsError> {
617 let product_ids = instrument_ids_to_product_ids(&instrument_ids);
618 self.unsubscribe(vec![CoinbaseIntxWsChannel::Risk], product_ids)
619 .await
620 }
621
622 pub async fn unsubscribe_index_prices(
629 &self,
630 instrument_ids: Vec<InstrumentId>,
631 ) -> Result<(), CoinbaseIntxWsError> {
632 let product_ids = instrument_ids_to_product_ids(&instrument_ids);
633 self.unsubscribe(vec![CoinbaseIntxWsChannel::Risk], product_ids)
634 .await
635 }
636
637 pub async fn unsubscribe_bars(&self, bar_type: BarType) -> Result<(), CoinbaseIntxWsError> {
644 let channel = bar_spec_as_coinbase_channel(bar_type.spec())
645 .map_err(|e| CoinbaseIntxWsError::ClientError(e.to_string()))?;
646 let product_id = bar_type.standard().instrument_id().symbol.inner();
647 self.unsubscribe(vec![channel], vec![product_id]).await
648 }
649}
650
651fn instrument_ids_to_product_ids(instrument_ids: &[InstrumentId]) -> Vec<Ustr> {
652 instrument_ids.iter().map(|x| x.symbol.inner()).collect()
653}
654
655struct CoinbaseIntxFeedHandler {
657 reader: MessageReader,
658 signal: Arc<AtomicBool>,
659}
660
661impl CoinbaseIntxFeedHandler {
662 pub const fn new(reader: MessageReader, signal: Arc<AtomicBool>) -> Self {
664 Self { reader, signal }
665 }
666
667 async fn next(&mut self) -> Option<CoinbaseIntxWsMessage> {
669 let timeout = Duration::from_millis(10);
671
672 loop {
673 if self.signal.load(Ordering::Relaxed) {
674 tracing::debug!("Stop signal received");
675 break;
676 }
677
678 match tokio::time::timeout(timeout, self.reader.next()).await {
679 Ok(Some(msg)) => match msg {
680 Ok(Message::Pong(_)) => {
681 tracing::trace!("Received pong");
682 }
683 Ok(Message::Ping(_)) => {
684 tracing::trace!("Received pong"); }
686 Ok(Message::Text(text)) => {
687 match serde_json::from_str(&text) {
688 Ok(event) => match &event {
689 CoinbaseIntxWsMessage::Reject(msg) => {
690 tracing::error!("{msg:?}");
691 }
692 CoinbaseIntxWsMessage::Confirmation(msg) => {
693 tracing::debug!("{msg:?}");
694 continue;
695 }
696 CoinbaseIntxWsMessage::Instrument(_) => return Some(event),
697 CoinbaseIntxWsMessage::Funding(_) => return Some(event),
698 CoinbaseIntxWsMessage::Risk(_) => return Some(event),
699 CoinbaseIntxWsMessage::BookSnapshot(_) => return Some(event),
700 CoinbaseIntxWsMessage::BookUpdate(_) => return Some(event),
701 CoinbaseIntxWsMessage::Quote(_) => return Some(event),
702 CoinbaseIntxWsMessage::Trade(_) => return Some(event),
703 CoinbaseIntxWsMessage::CandleSnapshot(_) => return Some(event),
704 CoinbaseIntxWsMessage::CandleUpdate(_) => continue, },
706 Err(e) => {
707 tracing::error!("Failed to parse message: {e}: {text}");
708 break;
709 }
710 }
711 }
712 Ok(Message::Binary(msg)) => {
713 tracing::debug!("Raw binary: {msg:?}");
714 }
715 Ok(Message::Close(_)) => {
716 tracing::debug!("Received close message");
717 return None;
718 }
719 Ok(msg) => {
720 tracing::warn!("Unexpected message: {msg:?}");
721 }
722 Err(e) => {
723 tracing::error!("{e}, stopping client");
724 break; }
726 },
727 Ok(None) => {
728 tracing::info!("WebSocket stream closed");
729 break;
730 }
731 Err(_) => {} }
733 }
734
735 log_task_stopped("message-streaming");
736 None
737 }
738}
739
740struct CoinbaseIntxWsMessageHandler {
742 instruments: HashMap<Ustr, InstrumentAny>,
743 handler: CoinbaseIntxFeedHandler,
744 tx: tokio::sync::mpsc::UnboundedSender<PoseiWsMessage>,
745}
746
747impl CoinbaseIntxWsMessageHandler {
748 pub const fn new(
750 instruments: HashMap<Ustr, InstrumentAny>,
751 reader: MessageReader,
752 signal: Arc<AtomicBool>,
753 tx: tokio::sync::mpsc::UnboundedSender<PoseiWsMessage>,
754 ) -> Self {
755 let handler = CoinbaseIntxFeedHandler::new(reader, signal);
756 Self {
757 instruments,
758 handler,
759 tx,
760 }
761 }
762
763 async fn run(&mut self) {
765 while let Some(data) = self.next().await {
766 if let Err(e) = self.tx.send(data) {
767 tracing::error!("Error sending data: {e}");
768 break; }
770 }
771 }
772
773 async fn next(&mut self) -> Option<PoseiWsMessage> {
775 let clock = get_atomic_clock_realtime();
776
777 while let Some(event) = self.handler.next().await {
778 match event {
779 CoinbaseIntxWsMessage::Instrument(msg) => {
780 if let Some(inst) = parse_instrument_any(&msg, clock.get_time_ns()) {
781 self.instruments
783 .insert(inst.raw_symbol().inner(), inst.clone());
784 return Some(PoseiWsMessage::Instrument(inst));
785 }
786 }
787 CoinbaseIntxWsMessage::Funding(msg) => {
788 tracing::warn!("Received {msg:?}"); }
790 CoinbaseIntxWsMessage::BookSnapshot(msg) => {
791 if let Some(inst) = self.instruments.get(&msg.product_id) {
792 match parse_orderbook_snapshot_msg(
793 &msg,
794 inst.id(),
795 inst.price_precision(),
796 inst.size_precision(),
797 clock.get_time_ns(),
798 ) {
799 Ok(deltas) => {
800 let deltas = OrderBookDeltas_API::new(deltas);
801 let data = Data::Deltas(deltas);
802 return Some(PoseiWsMessage::Data(data));
803 }
804 Err(e) => {
805 tracing::error!("Failed to parse orderbook snapshot: {e}");
806 return None;
807 }
808 }
809 }
810 tracing::error!("No instrument found for {}", msg.product_id);
811 return None;
812 }
813 CoinbaseIntxWsMessage::BookUpdate(msg) => {
814 if let Some(inst) = self.instruments.get(&msg.product_id) {
815 match parse_orderbook_update_msg(
816 &msg,
817 inst.id(),
818 inst.price_precision(),
819 inst.size_precision(),
820 clock.get_time_ns(),
821 ) {
822 Ok(deltas) => {
823 let deltas = OrderBookDeltas_API::new(deltas);
824 let data = Data::Deltas(deltas);
825 return Some(PoseiWsMessage::Data(data));
826 }
827 Err(e) => {
828 tracing::error!("Failed to parse orderbook update: {e}");
829 }
830 }
831 } else {
832 tracing::error!("No instrument found for {}", msg.product_id);
833 }
834 }
835 CoinbaseIntxWsMessage::Quote(msg) => {
836 if let Some(inst) = self.instruments.get(&msg.product_id) {
837 match parse_quote_msg(
838 &msg,
839 inst.id(),
840 inst.price_precision(),
841 inst.size_precision(),
842 clock.get_time_ns(),
843 ) {
844 Ok(quote) => return Some(PoseiWsMessage::Data(Data::Quote(quote))),
845 Err(e) => {
846 tracing::error!("Failed to parse quote: {e}");
847 }
848 }
849 } else {
850 tracing::error!("No instrument found for {}", msg.product_id);
851 }
852 }
853 CoinbaseIntxWsMessage::Trade(msg) => {
854 if let Some(inst) = self.instruments.get(&msg.product_id) {
855 match parse_trade_msg(
856 &msg,
857 inst.id(),
858 inst.price_precision(),
859 inst.size_precision(),
860 clock.get_time_ns(),
861 ) {
862 Ok(trade) => return Some(PoseiWsMessage::Data(Data::Trade(trade))),
863 Err(e) => {
864 tracing::error!("Failed to parse trade: {e}");
865 }
866 }
867 } else {
868 tracing::error!("No instrument found for {}", msg.product_id);
869 }
870 }
871 CoinbaseIntxWsMessage::Risk(msg) => {
872 if let Some(inst) = self.instruments.get(&msg.product_id) {
873 let mark_price = match parse_mark_price_msg(
874 &msg,
875 inst.id(),
876 inst.price_precision(),
877 clock.get_time_ns(),
878 ) {
879 Ok(mark_price) => Some(mark_price),
880 Err(e) => {
881 tracing::error!("Failed to parse mark price: {e}");
882 None
883 }
884 };
885
886 let index_price = match parse_index_price_msg(
887 &msg,
888 inst.id(),
889 inst.price_precision(),
890 clock.get_time_ns(),
891 ) {
892 Ok(index_price) => Some(index_price),
893 Err(e) => {
894 tracing::error!("Failed to parse index price: {e}");
895 None
896 }
897 };
898
899 match (mark_price, index_price) {
900 (Some(mark), Some(index)) => {
901 return Some(PoseiWsMessage::MarkAndIndex((mark, index)));
902 }
903 (Some(mark), None) => return Some(PoseiWsMessage::MarkPrice(mark)),
904 (None, Some(index)) => {
905 return Some(PoseiWsMessage::IndexPrice(index));
906 }
907 (None, None) => continue,
908 };
909 }
910 tracing::error!("No instrument found for {}", msg.product_id);
911 }
912 CoinbaseIntxWsMessage::CandleSnapshot(msg) => {
913 if let Some(inst) = self.instruments.get(&msg.product_id) {
914 match parse_candle_msg(
915 &msg,
916 inst.id(),
917 inst.price_precision(),
918 inst.size_precision(),
919 clock.get_time_ns(),
920 ) {
921 Ok(bar) => return Some(PoseiWsMessage::Data(Data::Bar(bar))),
922 Err(e) => {
923 tracing::error!("Failed to parse candle: {e}");
924 }
925 }
926 } else {
927 tracing::error!("No instrument found for {}", msg.product_id);
928 }
929 }
930 _ => {
931 tracing::warn!("Not implemented: {event:?}");
932 }
933 }
934 }
935 None }
937}