nautilus_coinbase_intx/websocket/
client.rs

1// -------------------------------------------------------------------------------------------------
2//  Copyright (C) 2015-2025 Posei Systems Pty Ltd. All rights reserved.
3//  https://poseitrader.io
4//
5//  Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
6//  You may not use this file except in compliance with the License.
7//  You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
8//
9//  Unless required by applicable law or agreed to in writing, software
10//  distributed under the License is distributed on an "AS IS" BASIS,
11//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12//  See the License for the specific language governing permissions and
13//  limitations under the License.
14// -------------------------------------------------------------------------------------------------
15
16use std::{
17    collections::HashMap,
18    sync::{
19        Arc,
20        atomic::{AtomicBool, Ordering},
21    },
22    time::{Duration, SystemTime},
23};
24
25use chrono::Utc;
26use futures_util::{Stream, StreamExt};
27use nautilus_common::{logging::log_task_stopped, runtime::get_runtime};
28use nautilus_core::{
29    consts::NAUTILUS_USER_AGENT, env::get_env_var, time::get_atomic_clock_realtime,
30};
31use nautilus_model::{
32    data::{BarType, Data, OrderBookDeltas_API},
33    identifiers::InstrumentId,
34    instruments::{Instrument, InstrumentAny},
35};
36use nautilus_network::websocket::{Consumer, MessageReader, WebSocketClient, WebSocketConfig};
37use reqwest::header::USER_AGENT;
38use tokio::sync::Mutex;
39use tokio_tungstenite::tungstenite::{Error, Message};
40use ustr::Ustr;
41
42use super::{
43    enums::{CoinbaseIntxWsChannel, WsOperation},
44    error::CoinbaseIntxWsError,
45    messages::{CoinbaseIntxSubscription, CoinbaseIntxWsMessage, PoseiWsMessage},
46    parse::{
47        parse_candle_msg, parse_index_price_msg, parse_mark_price_msg,
48        parse_orderbook_snapshot_msg, parse_orderbook_update_msg, parse_quote_msg,
49    },
50};
51use crate::{
52    common::{
53        consts::COINBASE_INTX_WS_URL, credential::Credential, parse::bar_spec_as_coinbase_channel,
54    },
55    websocket::parse::{parse_instrument_any, parse_trade_msg},
56};
57
58/// Provides a WebSocket client for connecting to [Coinbase International](https://www.coinbase.com/en/international-exchange).
59#[derive(Debug, Clone)]
60#[cfg_attr(
61    feature = "python",
62    pyo3::pyclass(module = "posei_trader.core.nautilus_pyo3.adapters")
63)]
64pub struct CoinbaseIntxWebSocketClient {
65    url: String,
66    credential: Credential,
67    heartbeat: Option<u64>,
68    inner: Option<Arc<WebSocketClient>>,
69    rx: Option<Arc<tokio::sync::mpsc::UnboundedReceiver<PoseiWsMessage>>>,
70    signal: Arc<AtomicBool>,
71    task_handle: Option<Arc<tokio::task::JoinHandle<()>>>,
72    subscriptions: Arc<Mutex<HashMap<CoinbaseIntxWsChannel, Vec<Ustr>>>>,
73}
74
75impl Default for CoinbaseIntxWebSocketClient {
76    fn default() -> Self {
77        Self::new(None, None, None, None, Some(10)).expect("Failed to create client")
78    }
79}
80
81impl CoinbaseIntxWebSocketClient {
82    /// Creates a new [`CoinbaseIntxWebSocketClient`] instance.
83    ///
84    /// # Errors
85    ///
86    /// Returns an error if required environment variables are missing or invalid.
87    pub fn new(
88        url: Option<String>,
89        api_key: Option<String>,
90        api_secret: Option<String>,
91        api_passphrase: Option<String>,
92        heartbeat: Option<u64>,
93    ) -> anyhow::Result<Self> {
94        let url = url.unwrap_or(COINBASE_INTX_WS_URL.to_string());
95        let api_key = api_key.unwrap_or(get_env_var("COINBASE_INTX_API_KEY")?);
96        let api_secret = api_secret.unwrap_or(get_env_var("COINBASE_INTX_API_SECRET")?);
97        let api_passphrase = api_passphrase.unwrap_or(get_env_var("COINBASE_INTX_API_PASSPHRASE")?);
98
99        let credential = Credential::new(api_key, api_secret, api_passphrase);
100        let signal = Arc::new(AtomicBool::new(false));
101        let subscriptions = Arc::new(Mutex::new(HashMap::new()));
102
103        Ok(Self {
104            url,
105            credential,
106            heartbeat,
107            inner: None,
108            rx: None,
109            signal,
110            task_handle: None,
111            subscriptions,
112        })
113    }
114
115    /// Creates a new authenticated [`CoinbaseIntxWebSocketClient`] using environment variables and
116    /// the default Coinbase International production websocket url.
117    ///
118    /// # Errors
119    ///
120    /// Returns an error if required environment variables are missing or invalid.
121    pub fn from_env() -> anyhow::Result<Self> {
122        Self::new(None, None, None, None, None)
123    }
124
125    /// Returns the websocket url being used by the client.
126    #[must_use]
127    pub const fn url(&self) -> &str {
128        self.url.as_str()
129    }
130
131    /// Returns the public API key being used by the client.
132    #[must_use]
133    pub fn api_key(&self) -> &str {
134        self.credential.api_key.as_str()
135    }
136
137    /// Returns a value indicating whether the client is active.
138    #[must_use]
139    pub fn is_active(&self) -> bool {
140        match &self.inner {
141            Some(inner) => inner.is_active(),
142            None => false,
143        }
144    }
145
146    /// Returns a value indicating whether the client is closed.
147    #[must_use]
148    pub fn is_closed(&self) -> bool {
149        match &self.inner {
150            Some(inner) => inner.is_closed(),
151            None => true,
152        }
153    }
154
155    /// Connects the client to the server and caches the given instruments.
156    ///
157    /// # Errors
158    ///
159    /// Returns an error if the WebSocket connection or initial subscription fails.
160    pub async fn connect(&mut self, instruments: Vec<InstrumentAny>) -> anyhow::Result<()> {
161        let client = self.clone();
162        let post_reconnect = Arc::new(move || {
163            let client = client.clone();
164            tokio::spawn(async move { client.resubscribe_all().await });
165        });
166
167        let config = WebSocketConfig {
168            url: self.url.clone(),
169            headers: vec![(USER_AGENT.to_string(), NAUTILUS_USER_AGENT.to_string())],
170            #[cfg(feature = "python")]
171            handler: Consumer::Python(None),
172            #[cfg(not(feature = "python"))]
173            handler: {
174                let (consumer, _rx) = Consumer::rust_consumer();
175                consumer
176            },
177            heartbeat: self.heartbeat,
178            heartbeat_msg: None,
179            #[cfg(feature = "python")]
180            ping_handler: None,
181            reconnect_timeout_ms: Some(5_000),
182            reconnect_delay_initial_ms: None, // Use default
183            reconnect_delay_max_ms: None,     // Use default
184            reconnect_backoff_factor: None,   // Use default
185            reconnect_jitter_ms: None,        // Use default
186        };
187        let (reader, client) =
188            WebSocketClient::connect_stream(config, vec![], None, Some(post_reconnect)).await?;
189
190        self.inner = Some(Arc::new(client));
191
192        let mut instruments_map: HashMap<Ustr, InstrumentAny> = HashMap::new();
193        for inst in instruments {
194            instruments_map.insert(inst.raw_symbol().inner(), inst);
195        }
196
197        let (tx, rx) = tokio::sync::mpsc::unbounded_channel::<PoseiWsMessage>();
198        self.rx = Some(Arc::new(rx));
199        let signal = self.signal.clone();
200
201        let stream_handle = get_runtime().spawn(async move {
202            CoinbaseIntxWsMessageHandler::new(instruments_map, reader, signal, tx)
203                .run()
204                .await;
205        });
206
207        self.task_handle = Some(Arc::new(stream_handle));
208
209        Ok(())
210    }
211
212    /// Provides the internal data stream as a channel-based stream.
213    ///
214    /// # Panics
215    ///
216    /// This function panics if:
217    /// - The websocket is not connected.
218    /// - If `stream_data` has already been called somewhere else (stream receiver is then taken).
219    pub fn stream(&mut self) -> impl Stream<Item = PoseiWsMessage> + 'static {
220        let rx = self
221            .rx
222            .take()
223            .expect("Data stream receiver already taken or not connected"); // Design-time error
224        let mut rx = Arc::try_unwrap(rx).expect("Cannot take ownership - other references exist");
225        async_stream::stream! {
226            while let Some(data) = rx.recv().await {
227                yield data;
228            }
229        }
230    }
231
232    /// Closes the client.
233    ///
234    /// # Errors
235    ///
236    /// Returns an error if the WebSocket fails to close properly.
237    pub async fn close(&mut self) -> Result<(), Error> {
238        tracing::debug!("Closing");
239        self.signal.store(true, Ordering::Relaxed);
240
241        match tokio::time::timeout(Duration::from_secs(5), async {
242            if let Some(inner) = &self.inner {
243                inner.disconnect().await;
244            } else {
245                log::error!("Error on close: not connected");
246            }
247        })
248        .await
249        {
250            Ok(()) => {
251                tracing::debug!("Inner disconnected");
252            }
253            Err(_) => {
254                tracing::error!("Timeout waiting for inner client to disconnect");
255            }
256        }
257
258        log::debug!("Closed");
259
260        Ok(())
261    }
262
263    /// Subscribes to the given channels and product IDs.
264    ///
265    /// # Errors
266    ///
267    /// Returns an error if the subscription message cannot be sent.
268    async fn subscribe(
269        &self,
270        channels: Vec<CoinbaseIntxWsChannel>,
271        product_ids: Vec<Ustr>,
272    ) -> Result<(), CoinbaseIntxWsError> {
273        // Update active subscriptions
274        let mut active_subs = self.subscriptions.lock().await;
275        for channel in &channels {
276            active_subs
277                .entry(*channel)
278                .or_insert_with(Vec::new)
279                .extend(product_ids.clone());
280        }
281        tracing::debug!(
282            "Added active subscription(s): channels={channels:?}, product_ids={product_ids:?}"
283        );
284
285        let time = chrono::DateTime::<Utc>::from(SystemTime::now())
286            .timestamp()
287            .to_string();
288        let signature = self.credential.sign_ws(&time);
289        let message = CoinbaseIntxSubscription {
290            op: WsOperation::Subscribe,
291            product_ids: Some(product_ids),
292            channels,
293            time,
294            key: self.credential.api_key,
295            passphrase: self.credential.api_passphrase,
296            signature,
297        };
298
299        let json_txt = serde_json::to_string(&message)
300            .map_err(|e| CoinbaseIntxWsError::JsonError(e.to_string()))?;
301
302        if let Some(inner) = &self.inner {
303            if let Err(err) = inner.send_text(json_txt, None).await {
304                tracing::error!("Error sending message: {err:?}");
305            }
306        } else {
307            return Err(CoinbaseIntxWsError::ClientError(
308                "Cannot send message: not connected".to_string(),
309            ));
310        }
311
312        Ok(())
313    }
314
315    /// Unsubscribes from the given channels and product IDs.
316    async fn unsubscribe(
317        &self,
318        channels: Vec<CoinbaseIntxWsChannel>,
319        product_ids: Vec<Ustr>,
320    ) -> Result<(), CoinbaseIntxWsError> {
321        // Update active subscriptions
322        let mut active_subs = self.subscriptions.lock().await;
323        for channel in &channels {
324            if let Some(subs) = active_subs.get_mut(channel) {
325                for product_id in &product_ids {
326                    subs.retain(|pid| pid != product_id);
327                }
328                if subs.is_empty() {
329                    active_subs.remove(channel);
330                }
331            }
332        }
333        tracing::debug!(
334            "Removed active subscription(s): channels={channels:?}, product_ids={product_ids:?}"
335        );
336
337        let time = chrono::DateTime::<Utc>::from(SystemTime::now())
338            .timestamp()
339            .to_string();
340        let signature = self.credential.sign_ws(&time);
341        let message = CoinbaseIntxSubscription {
342            op: WsOperation::Unsubscribe,
343            product_ids: Some(product_ids),
344            channels,
345            time,
346            key: self.credential.api_key,
347            passphrase: self.credential.api_passphrase,
348            signature,
349        };
350
351        let json_txt = serde_json::to_string(&message)
352            .map_err(|e| CoinbaseIntxWsError::JsonError(e.to_string()))?;
353
354        if let Some(inner) = &self.inner {
355            if let Err(err) = inner.send_text(json_txt, None).await {
356                tracing::error!("Error sending message: {err:?}");
357            }
358        } else {
359            return Err(CoinbaseIntxWsError::ClientError(
360                "Cannot send message: not connected".to_string(),
361            ));
362        }
363
364        Ok(())
365    }
366
367    /// Resubscribes for all active subscriptions.
368    async fn resubscribe_all(&self) {
369        let subs = self.subscriptions.lock().await.clone();
370
371        for (channel, product_ids) in subs {
372            if product_ids.is_empty() {
373                continue;
374            }
375
376            tracing::debug!("Resubscribing: channel={channel}, product_ids={product_ids:?}");
377
378            if let Err(e) = self.subscribe(vec![channel], product_ids).await {
379                tracing::error!("Failed to resubscribe to channel {channel}: {e}");
380            }
381        }
382    }
383
384    /// Subscribes to instrument definition updates for the given instrument IDs.
385    /// Subscribes to instrument updates for the specified instruments.
386    ///
387    /// # Errors
388    ///
389    /// Returns an error if the subscription fails.
390    pub async fn subscribe_instruments(
391        &self,
392        instrument_ids: Vec<InstrumentId>,
393    ) -> Result<(), CoinbaseIntxWsError> {
394        let product_ids = instrument_ids_to_product_ids(&instrument_ids);
395        self.subscribe(vec![CoinbaseIntxWsChannel::Instruments], product_ids)
396            .await
397    }
398
399    /// Subscribes to funding message streams for the given instrument IDs.
400    /// Subscribes to funding rate updates for the specified instruments.
401    ///
402    /// # Errors
403    ///
404    /// Returns an error if the subscription fails.
405    pub async fn subscribe_funding(
406        &self,
407        instrument_ids: Vec<InstrumentId>,
408    ) -> Result<(), CoinbaseIntxWsError> {
409        let product_ids = instrument_ids_to_product_ids(&instrument_ids);
410        self.subscribe(vec![CoinbaseIntxWsChannel::Funding], product_ids)
411            .await
412    }
413
414    /// Subscribes to risk message streams for the given instrument IDs.
415    /// Subscribes to risk updates for the specified instruments.
416    ///
417    /// # Errors
418    ///
419    /// Returns an error if the subscription fails.
420    pub async fn subscribe_risk(
421        &self,
422        instrument_ids: Vec<InstrumentId>,
423    ) -> Result<(), CoinbaseIntxWsError> {
424        let product_ids = instrument_ids_to_product_ids(&instrument_ids);
425        self.subscribe(vec![CoinbaseIntxWsChannel::Risk], product_ids)
426            .await
427    }
428
429    /// Subscribes to order book (level 2) streams for the given instrument IDs.
430    /// Subscribes to order book snapshots and updates for the specified instruments.
431    ///
432    /// # Errors
433    ///
434    /// Returns an error if the subscription fails.
435    pub async fn subscribe_order_book(
436        &self,
437        instrument_ids: Vec<InstrumentId>,
438    ) -> Result<(), CoinbaseIntxWsError> {
439        let product_ids = instrument_ids_to_product_ids(&instrument_ids);
440        self.subscribe(vec![CoinbaseIntxWsChannel::Level2], product_ids)
441            .await
442    }
443
444    /// Subscribes to quote (level 1) streams for the given instrument IDs.
445    /// Subscribes to top-of-book quote updates for the specified instruments.
446    ///
447    /// # Errors
448    ///
449    /// Returns an error if the subscription fails.
450    pub async fn subscribe_quotes(
451        &self,
452        instrument_ids: Vec<InstrumentId>,
453    ) -> Result<(), CoinbaseIntxWsError> {
454        let product_ids = instrument_ids_to_product_ids(&instrument_ids);
455        self.subscribe(vec![CoinbaseIntxWsChannel::Level1], product_ids)
456            .await
457    }
458
459    /// Subscribes to trade (match) streams for the given instrument IDs.
460    /// Subscribes to trade updates for the specified instruments.
461    ///
462    /// # Errors
463    ///
464    /// Returns an error if the subscription fails.
465    pub async fn subscribe_trades(
466        &self,
467        instrument_ids: Vec<InstrumentId>,
468    ) -> Result<(), CoinbaseIntxWsError> {
469        let product_ids = instrument_ids_to_product_ids(&instrument_ids);
470        self.subscribe(vec![CoinbaseIntxWsChannel::Match], product_ids)
471            .await
472    }
473
474    /// Subscribes to risk streams (for mark prices) for the given instrument IDs.
475    /// Subscribes to mark price updates for the specified instruments.
476    ///
477    /// # Errors
478    ///
479    /// Returns an error if the subscription fails.
480    pub async fn subscribe_mark_prices(
481        &self,
482        instrument_ids: Vec<InstrumentId>,
483    ) -> Result<(), CoinbaseIntxWsError> {
484        let product_ids = instrument_ids_to_product_ids(&instrument_ids);
485        self.subscribe(vec![CoinbaseIntxWsChannel::Risk], product_ids)
486            .await
487    }
488
489    /// Subscribes to risk streams (for index prices) for the given instrument IDs.
490    /// Subscribes to index price updates for the specified instruments.
491    ///
492    /// # Errors
493    ///
494    /// Returns an error if the subscription fails.
495    pub async fn subscribe_index_prices(
496        &self,
497        instrument_ids: Vec<InstrumentId>,
498    ) -> Result<(), CoinbaseIntxWsError> {
499        let product_ids = instrument_ids_to_product_ids(&instrument_ids);
500        self.subscribe(vec![CoinbaseIntxWsChannel::Risk], product_ids)
501            .await
502    }
503
504    /// Subscribes to bar (candle) streams for the given instrument IDs.
505    /// Subscribes to candlestick bar updates for the specified bar type.
506    ///
507    /// # Errors
508    ///
509    /// Returns an error if the subscription fails.
510    pub async fn subscribe_bars(&self, bar_type: BarType) -> Result<(), CoinbaseIntxWsError> {
511        let channel = bar_spec_as_coinbase_channel(bar_type.spec())
512            .map_err(|e| CoinbaseIntxWsError::ClientError(e.to_string()))?;
513        let product_ids = vec![bar_type.standard().instrument_id().symbol.inner()];
514        self.subscribe(vec![channel], product_ids).await
515    }
516
517    /// Unsubscribes from instrument definition streams for the given instrument IDs.
518    /// Unsubscribes from instrument updates for the specified instruments.
519    ///
520    /// # Errors
521    ///
522    /// Returns an error if the unsubscription fails.
523    pub async fn unsubscribe_instruments(
524        &self,
525        instrument_ids: Vec<InstrumentId>,
526    ) -> Result<(), CoinbaseIntxWsError> {
527        let product_ids = instrument_ids_to_product_ids(&instrument_ids);
528        self.unsubscribe(vec![CoinbaseIntxWsChannel::Instruments], product_ids)
529            .await
530    }
531
532    /// Unsubscribes from risk message streams for the given instrument IDs.
533    /// Unsubscribes from risk updates for the specified instruments.
534    ///
535    /// # Errors
536    ///
537    /// Returns an error if the unsubscription fails.
538    pub async fn unsubscribe_risk(
539        &self,
540        instrument_ids: Vec<InstrumentId>,
541    ) -> Result<(), CoinbaseIntxWsError> {
542        let product_ids = instrument_ids_to_product_ids(&instrument_ids);
543        self.unsubscribe(vec![CoinbaseIntxWsChannel::Risk], product_ids)
544            .await
545    }
546
547    /// Unsubscribes from funding message streams for the given instrument IDs.
548    /// Unsubscribes from funding updates for the specified instruments.
549    ///
550    /// # Errors
551    ///
552    /// Returns an error if the unsubscription fails.
553    pub async fn unsubscribe_funding(
554        &self,
555        instrument_ids: Vec<InstrumentId>,
556    ) -> Result<(), CoinbaseIntxWsError> {
557        let product_ids = instrument_ids_to_product_ids(&instrument_ids);
558        self.unsubscribe(vec![CoinbaseIntxWsChannel::Funding], product_ids)
559            .await
560    }
561
562    /// Unsubscribes from order book (level 2) streams for the given instrument IDs.
563    /// Unsubscribes from order book updates for the specified instruments.
564    ///
565    /// # Errors
566    ///
567    /// Returns an error if the unsubscription fails.
568    pub async fn unsubscribe_order_book(
569        &self,
570        instrument_ids: Vec<InstrumentId>,
571    ) -> Result<(), CoinbaseIntxWsError> {
572        let product_ids = instrument_ids_to_product_ids(&instrument_ids);
573        self.unsubscribe(vec![CoinbaseIntxWsChannel::Level2], product_ids)
574            .await
575    }
576
577    /// Unsubscribes from quote (level 1) streams for the given instrument IDs.
578    /// Unsubscribes from quote updates for the specified instruments.
579    ///
580    /// # Errors
581    ///
582    /// Returns an error if the unsubscription fails.
583    pub async fn unsubscribe_quotes(
584        &self,
585        instrument_ids: Vec<InstrumentId>,
586    ) -> Result<(), CoinbaseIntxWsError> {
587        let product_ids = instrument_ids_to_product_ids(&instrument_ids);
588        self.unsubscribe(vec![CoinbaseIntxWsChannel::Level1], product_ids)
589            .await
590    }
591
592    /// Unsubscribes from trade (match) streams for the given instrument IDs.
593    /// Unsubscribes from trade updates for the specified instruments.
594    ///
595    /// # Errors
596    ///
597    /// Returns an error if the unsubscription fails.
598    pub async fn unsubscribe_trades(
599        &self,
600        instrument_ids: Vec<InstrumentId>,
601    ) -> Result<(), CoinbaseIntxWsError> {
602        let product_ids = instrument_ids_to_product_ids(&instrument_ids);
603        self.unsubscribe(vec![CoinbaseIntxWsChannel::Match], product_ids)
604            .await
605    }
606
607    /// Unsubscribes from risk streams (for mark prices) for the given instrument IDs.
608    /// Unsubscribes from mark price updates for the specified instruments.
609    ///
610    /// # Errors
611    ///
612    /// Returns an error if the unsubscription fails.
613    pub async fn unsubscribe_mark_prices(
614        &self,
615        instrument_ids: Vec<InstrumentId>,
616    ) -> Result<(), CoinbaseIntxWsError> {
617        let product_ids = instrument_ids_to_product_ids(&instrument_ids);
618        self.unsubscribe(vec![CoinbaseIntxWsChannel::Risk], product_ids)
619            .await
620    }
621
622    /// Unsubscribes from risk streams (for index prices) for the given instrument IDs.
623    /// Unsubscribes from index price updates for the specified instruments.
624    ///
625    /// # Errors
626    ///
627    /// Returns an error if the unsubscription fails.
628    pub async fn unsubscribe_index_prices(
629        &self,
630        instrument_ids: Vec<InstrumentId>,
631    ) -> Result<(), CoinbaseIntxWsError> {
632        let product_ids = instrument_ids_to_product_ids(&instrument_ids);
633        self.unsubscribe(vec![CoinbaseIntxWsChannel::Risk], product_ids)
634            .await
635    }
636
637    /// Unsubscribes from bar (candle) streams for the given instrument IDs.
638    /// Unsubscribes from bar updates for the specified bar type.
639    ///
640    /// # Errors
641    ///
642    /// Returns an error if the unsubscription fails.
643    pub async fn unsubscribe_bars(&self, bar_type: BarType) -> Result<(), CoinbaseIntxWsError> {
644        let channel = bar_spec_as_coinbase_channel(bar_type.spec())
645            .map_err(|e| CoinbaseIntxWsError::ClientError(e.to_string()))?;
646        let product_id = bar_type.standard().instrument_id().symbol.inner();
647        self.unsubscribe(vec![channel], vec![product_id]).await
648    }
649}
650
651fn instrument_ids_to_product_ids(instrument_ids: &[InstrumentId]) -> Vec<Ustr> {
652    instrument_ids.iter().map(|x| x.symbol.inner()).collect()
653}
654
655/// Provides a raw message handler for Coinbase International WebSocket feed.
656struct CoinbaseIntxFeedHandler {
657    reader: MessageReader,
658    signal: Arc<AtomicBool>,
659}
660
661impl CoinbaseIntxFeedHandler {
662    /// Creates a new [`CoinbaseIntxFeedHandler`] instance.
663    pub const fn new(reader: MessageReader, signal: Arc<AtomicBool>) -> Self {
664        Self { reader, signal }
665    }
666
667    /// Gets the next message from the WebSocket message stream.
668    async fn next(&mut self) -> Option<CoinbaseIntxWsMessage> {
669        // Timeout awaiting the next message before checking signal
670        let timeout = Duration::from_millis(10);
671
672        loop {
673            if self.signal.load(Ordering::Relaxed) {
674                tracing::debug!("Stop signal received");
675                break;
676            }
677
678            match tokio::time::timeout(timeout, self.reader.next()).await {
679                Ok(Some(msg)) => match msg {
680                    Ok(Message::Pong(_)) => {
681                        tracing::trace!("Received pong");
682                    }
683                    Ok(Message::Ping(_)) => {
684                        tracing::trace!("Received pong"); // Coinbase send ping frames as pongs
685                    }
686                    Ok(Message::Text(text)) => {
687                        match serde_json::from_str(&text) {
688                            Ok(event) => match &event {
689                                CoinbaseIntxWsMessage::Reject(msg) => {
690                                    tracing::error!("{msg:?}");
691                                }
692                                CoinbaseIntxWsMessage::Confirmation(msg) => {
693                                    tracing::debug!("{msg:?}");
694                                    continue;
695                                }
696                                CoinbaseIntxWsMessage::Instrument(_) => return Some(event),
697                                CoinbaseIntxWsMessage::Funding(_) => return Some(event),
698                                CoinbaseIntxWsMessage::Risk(_) => return Some(event),
699                                CoinbaseIntxWsMessage::BookSnapshot(_) => return Some(event),
700                                CoinbaseIntxWsMessage::BookUpdate(_) => return Some(event),
701                                CoinbaseIntxWsMessage::Quote(_) => return Some(event),
702                                CoinbaseIntxWsMessage::Trade(_) => return Some(event),
703                                CoinbaseIntxWsMessage::CandleSnapshot(_) => return Some(event),
704                                CoinbaseIntxWsMessage::CandleUpdate(_) => continue, // Ignore
705                            },
706                            Err(e) => {
707                                tracing::error!("Failed to parse message: {e}: {text}");
708                                break;
709                            }
710                        }
711                    }
712                    Ok(Message::Binary(msg)) => {
713                        tracing::debug!("Raw binary: {msg:?}");
714                    }
715                    Ok(Message::Close(_)) => {
716                        tracing::debug!("Received close message");
717                        return None;
718                    }
719                    Ok(msg) => {
720                        tracing::warn!("Unexpected message: {msg:?}");
721                    }
722                    Err(e) => {
723                        tracing::error!("{e}, stopping client");
724                        break; // Break as indicates a bug in the code
725                    }
726                },
727                Ok(None) => {
728                    tracing::info!("WebSocket stream closed");
729                    break;
730                }
731                Err(_) => {} // Timeout occurred awaiting a message, continue loop to check signal
732            }
733        }
734
735        log_task_stopped("message-streaming");
736        None
737    }
738}
739
740/// Provides a Posei parser for the Coinbase International WebSocket feed.
741struct CoinbaseIntxWsMessageHandler {
742    instruments: HashMap<Ustr, InstrumentAny>,
743    handler: CoinbaseIntxFeedHandler,
744    tx: tokio::sync::mpsc::UnboundedSender<PoseiWsMessage>,
745}
746
747impl CoinbaseIntxWsMessageHandler {
748    /// Creates a new [`CoinbaseIntxWsMessageHandler`] instance.
749    pub const fn new(
750        instruments: HashMap<Ustr, InstrumentAny>,
751        reader: MessageReader,
752        signal: Arc<AtomicBool>,
753        tx: tokio::sync::mpsc::UnboundedSender<PoseiWsMessage>,
754    ) -> Self {
755        let handler = CoinbaseIntxFeedHandler::new(reader, signal);
756        Self {
757            instruments,
758            handler,
759            tx,
760        }
761    }
762
763    /// Runs the WebSocket message feed.
764    async fn run(&mut self) {
765        while let Some(data) = self.next().await {
766            if let Err(e) = self.tx.send(data) {
767                tracing::error!("Error sending data: {e}");
768                break; // Stop processing on channel error
769            }
770        }
771    }
772
773    /// Gets the next message from the WebSocket message handler.
774    async fn next(&mut self) -> Option<PoseiWsMessage> {
775        let clock = get_atomic_clock_realtime();
776
777        while let Some(event) = self.handler.next().await {
778            match event {
779                CoinbaseIntxWsMessage::Instrument(msg) => {
780                    if let Some(inst) = parse_instrument_any(&msg, clock.get_time_ns()) {
781                        // Update instruments map
782                        self.instruments
783                            .insert(inst.raw_symbol().inner(), inst.clone());
784                        return Some(PoseiWsMessage::Instrument(inst));
785                    }
786                }
787                CoinbaseIntxWsMessage::Funding(msg) => {
788                    tracing::warn!("Received {msg:?}"); // TODO: Implement
789                }
790                CoinbaseIntxWsMessage::BookSnapshot(msg) => {
791                    if let Some(inst) = self.instruments.get(&msg.product_id) {
792                        match parse_orderbook_snapshot_msg(
793                            &msg,
794                            inst.id(),
795                            inst.price_precision(),
796                            inst.size_precision(),
797                            clock.get_time_ns(),
798                        ) {
799                            Ok(deltas) => {
800                                let deltas = OrderBookDeltas_API::new(deltas);
801                                let data = Data::Deltas(deltas);
802                                return Some(PoseiWsMessage::Data(data));
803                            }
804                            Err(e) => {
805                                tracing::error!("Failed to parse orderbook snapshot: {e}");
806                                return None;
807                            }
808                        }
809                    }
810                    tracing::error!("No instrument found for {}", msg.product_id);
811                    return None;
812                }
813                CoinbaseIntxWsMessage::BookUpdate(msg) => {
814                    if let Some(inst) = self.instruments.get(&msg.product_id) {
815                        match parse_orderbook_update_msg(
816                            &msg,
817                            inst.id(),
818                            inst.price_precision(),
819                            inst.size_precision(),
820                            clock.get_time_ns(),
821                        ) {
822                            Ok(deltas) => {
823                                let deltas = OrderBookDeltas_API::new(deltas);
824                                let data = Data::Deltas(deltas);
825                                return Some(PoseiWsMessage::Data(data));
826                            }
827                            Err(e) => {
828                                tracing::error!("Failed to parse orderbook update: {e}");
829                            }
830                        }
831                    } else {
832                        tracing::error!("No instrument found for {}", msg.product_id);
833                    }
834                }
835                CoinbaseIntxWsMessage::Quote(msg) => {
836                    if let Some(inst) = self.instruments.get(&msg.product_id) {
837                        match parse_quote_msg(
838                            &msg,
839                            inst.id(),
840                            inst.price_precision(),
841                            inst.size_precision(),
842                            clock.get_time_ns(),
843                        ) {
844                            Ok(quote) => return Some(PoseiWsMessage::Data(Data::Quote(quote))),
845                            Err(e) => {
846                                tracing::error!("Failed to parse quote: {e}");
847                            }
848                        }
849                    } else {
850                        tracing::error!("No instrument found for {}", msg.product_id);
851                    }
852                }
853                CoinbaseIntxWsMessage::Trade(msg) => {
854                    if let Some(inst) = self.instruments.get(&msg.product_id) {
855                        match parse_trade_msg(
856                            &msg,
857                            inst.id(),
858                            inst.price_precision(),
859                            inst.size_precision(),
860                            clock.get_time_ns(),
861                        ) {
862                            Ok(trade) => return Some(PoseiWsMessage::Data(Data::Trade(trade))),
863                            Err(e) => {
864                                tracing::error!("Failed to parse trade: {e}");
865                            }
866                        }
867                    } else {
868                        tracing::error!("No instrument found for {}", msg.product_id);
869                    }
870                }
871                CoinbaseIntxWsMessage::Risk(msg) => {
872                    if let Some(inst) = self.instruments.get(&msg.product_id) {
873                        let mark_price = match parse_mark_price_msg(
874                            &msg,
875                            inst.id(),
876                            inst.price_precision(),
877                            clock.get_time_ns(),
878                        ) {
879                            Ok(mark_price) => Some(mark_price),
880                            Err(e) => {
881                                tracing::error!("Failed to parse mark price: {e}");
882                                None
883                            }
884                        };
885
886                        let index_price = match parse_index_price_msg(
887                            &msg,
888                            inst.id(),
889                            inst.price_precision(),
890                            clock.get_time_ns(),
891                        ) {
892                            Ok(index_price) => Some(index_price),
893                            Err(e) => {
894                                tracing::error!("Failed to parse index price: {e}");
895                                None
896                            }
897                        };
898
899                        match (mark_price, index_price) {
900                            (Some(mark), Some(index)) => {
901                                return Some(PoseiWsMessage::MarkAndIndex((mark, index)));
902                            }
903                            (Some(mark), None) => return Some(PoseiWsMessage::MarkPrice(mark)),
904                            (None, Some(index)) => {
905                                return Some(PoseiWsMessage::IndexPrice(index));
906                            }
907                            (None, None) => continue,
908                        };
909                    }
910                    tracing::error!("No instrument found for {}", msg.product_id);
911                }
912                CoinbaseIntxWsMessage::CandleSnapshot(msg) => {
913                    if let Some(inst) = self.instruments.get(&msg.product_id) {
914                        match parse_candle_msg(
915                            &msg,
916                            inst.id(),
917                            inst.price_precision(),
918                            inst.size_precision(),
919                            clock.get_time_ns(),
920                        ) {
921                            Ok(bar) => return Some(PoseiWsMessage::Data(Data::Bar(bar))),
922                            Err(e) => {
923                                tracing::error!("Failed to parse candle: {e}");
924                            }
925                        }
926                    } else {
927                        tracing::error!("No instrument found for {}", msg.product_id);
928                    }
929                }
930                _ => {
931                    tracing::warn!("Not implemented: {event:?}");
932                }
933            }
934        }
935        None // Connection closed
936    }
937}