nautilus_coinbase_intx/python/
websocket.rs

1// -------------------------------------------------------------------------------------------------
2//  Copyright (C) 2015-2025 Posei Systems Pty Ltd. All rights reserved.
3//  https://poseitrader.io
4//
5//  Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
6//  You may not use this file except in compliance with the License.
7//  You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
8//
9//  Unless required by applicable law or agreed to in writing, software
10//  distributed under the License is distributed on an "AS IS" BASIS,
11//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12//  See the License for the specific language governing permissions and
13//  limitations under the License.
14// -------------------------------------------------------------------------------------------------
15
16use futures_util::StreamExt;
17use nautilus_core::python::{IntoPyObjectPoseiExt, to_pyvalue_err};
18use nautilus_model::{
19    data::BarType,
20    identifiers::InstrumentId,
21    python::{
22        data::data_to_pycapsule,
23        events::order::order_event_to_pyobject,
24        instruments::{instrument_any_to_pyobject, pyobject_to_instrument_any},
25    },
26};
27use pyo3::{exceptions::PyRuntimeError, prelude::*};
28use pyo3_async_runtimes::tokio::get_runtime;
29
30use crate::websocket::{CoinbaseIntxWebSocketClient, messages::PoseiWsMessage};
31
32#[pymethods]
33impl CoinbaseIntxWebSocketClient {
34    #[new]
35    #[pyo3(signature = (url=None, api_key=None, api_secret=None, api_passphrase=None, heartbeat=None))]
36    fn py_new(
37        url: Option<String>,
38        api_key: Option<String>,
39        api_secret: Option<String>,
40        api_passphrase: Option<String>,
41        heartbeat: Option<u64>,
42    ) -> PyResult<Self> {
43        Self::new(url, api_key, api_secret, api_passphrase, heartbeat).map_err(to_pyvalue_err)
44    }
45
46    #[getter]
47    #[pyo3(name = "url")]
48    #[must_use]
49    pub const fn py_url(&self) -> &str {
50        self.url()
51    }
52
53    #[getter]
54    #[pyo3(name = "api_key")]
55    #[must_use]
56    pub fn py_api_key(&self) -> &str {
57        self.api_key()
58    }
59
60    #[pyo3(name = "is_active")]
61    fn py_is_active(&mut self) -> bool {
62        self.is_active()
63    }
64
65    #[pyo3(name = "is_closed")]
66    fn py_is_closed(&mut self) -> bool {
67        self.is_closed()
68    }
69
70    #[pyo3(name = "connect")]
71    fn py_connect<'py>(
72        &mut self,
73        py: Python<'py>,
74        instruments: Vec<PyObject>,
75        callback: PyObject,
76    ) -> PyResult<Bound<'py, PyAny>> {
77        let mut instruments_any = Vec::new();
78        for inst in instruments {
79            let inst_any = pyobject_to_instrument_any(py, inst)?;
80            instruments_any.push(inst_any);
81        }
82
83        get_runtime().block_on(async {
84            self.connect(instruments_any)
85                .await
86                .map_err(|e| PyRuntimeError::new_err(e.to_string()))
87        })?;
88
89        let stream = self.stream();
90
91        pyo3_async_runtimes::tokio::future_into_py(py, async move {
92            tokio::pin!(stream);
93
94            while let Some(msg) = stream.next().await {
95                match msg {
96                    PoseiWsMessage::Instrument(inst) => Python::with_gil(|py| {
97                        let py_obj = instrument_any_to_pyobject(py, inst)
98                            .expect("Failed to create instrument");
99                        call_python(py, &callback, py_obj);
100                    }),
101                    PoseiWsMessage::Data(data) => Python::with_gil(|py| {
102                        let py_obj = data_to_pycapsule(py, data);
103                        call_python(py, &callback, py_obj);
104                    }),
105                    PoseiWsMessage::DataVec(data_vec) => Python::with_gil(|py| {
106                        for data in data_vec {
107                            let py_obj = data_to_pycapsule(py, data);
108                            call_python(py, &callback, py_obj);
109                        }
110                    }),
111                    PoseiWsMessage::Deltas(deltas) => Python::with_gil(|py| {
112                        call_python(py, &callback, deltas.into_py_any_unwrap(py));
113                    }),
114                    PoseiWsMessage::MarkPrice(mark_price) => Python::with_gil(|py| {
115                        call_python(py, &callback, mark_price.into_py_any_unwrap(py));
116                    }),
117                    PoseiWsMessage::IndexPrice(index_price) => Python::with_gil(|py| {
118                        call_python(py, &callback, index_price.into_py_any_unwrap(py));
119                    }),
120                    PoseiWsMessage::MarkAndIndex((mark_price, index_price)) => {
121                        Python::with_gil(|py| {
122                            call_python(py, &callback, mark_price.into_py_any_unwrap(py));
123                            call_python(py, &callback, index_price.into_py_any_unwrap(py));
124                        });
125                    }
126                    PoseiWsMessage::OrderEvent(msg) => Python::with_gil(|py| {
127                        let py_obj =
128                            order_event_to_pyobject(py, msg).expect("Failed to create event");
129                        call_python(py, &callback, py_obj);
130                    }),
131                }
132            }
133
134            Ok(())
135        })
136    }
137
138    #[pyo3(name = "close")]
139    fn py_close<'py>(&mut self, py: Python<'py>) -> PyResult<Bound<'py, PyAny>> {
140        let mut client = self.clone();
141
142        pyo3_async_runtimes::tokio::future_into_py(py, async move {
143            if let Err(e) = client.close().await {
144                log::error!("Error on close: {e}");
145            }
146            Ok(())
147        })
148    }
149
150    #[pyo3(name = "subscribe_instruments")]
151    #[pyo3(signature = (instrument_ids=None))]
152    fn py_subscribe_instruments<'py>(
153        &self,
154        py: Python<'py>,
155        instrument_ids: Option<Vec<InstrumentId>>,
156    ) -> PyResult<Bound<'py, PyAny>> {
157        let client = self.clone();
158        let instrument_ids = instrument_ids.unwrap_or_default();
159
160        pyo3_async_runtimes::tokio::future_into_py(py, async move {
161            if let Err(e) = client.subscribe_instruments(instrument_ids).await {
162                log::error!("Failed to subscribe to instruments: {e}");
163            }
164            Ok(())
165        })
166    }
167
168    #[pyo3(name = "subscribe_order_book")]
169    fn py_subscribe_order_book<'py>(
170        &self,
171        py: Python<'py>,
172        instrument_ids: Vec<InstrumentId>,
173    ) -> PyResult<Bound<'py, PyAny>> {
174        let client = self.clone();
175
176        pyo3_async_runtimes::tokio::future_into_py(py, async move {
177            if let Err(e) = client.subscribe_order_book(instrument_ids).await {
178                log::error!("Failed to subscribe to order book: {e}");
179            }
180            Ok(())
181        })
182    }
183
184    #[pyo3(name = "subscribe_quotes")]
185    fn py_subscribe_quotes<'py>(
186        &self,
187        py: Python<'py>,
188        instrument_ids: Vec<InstrumentId>,
189    ) -> PyResult<Bound<'py, PyAny>> {
190        let client = self.clone();
191
192        pyo3_async_runtimes::tokio::future_into_py(py, async move {
193            if let Err(e) = client.subscribe_quotes(instrument_ids).await {
194                log::error!("Failed to subscribe to quotes: {e}");
195            }
196            Ok(())
197        })
198    }
199
200    #[pyo3(name = "subscribe_trades")]
201    fn py_subscribe_trades<'py>(
202        &self,
203        py: Python<'py>,
204        instrument_ids: Vec<InstrumentId>,
205    ) -> PyResult<Bound<'py, PyAny>> {
206        let client = self.clone();
207
208        pyo3_async_runtimes::tokio::future_into_py(py, async move {
209            if let Err(e) = client.subscribe_trades(instrument_ids).await {
210                log::error!("Failed to subscribe to trades: {e}");
211            }
212            Ok(())
213        })
214    }
215
216    #[pyo3(name = "subscribe_mark_prices")]
217    fn py_subscribe_mark_prices<'py>(
218        &self,
219        py: Python<'py>,
220        instrument_ids: Vec<InstrumentId>,
221    ) -> PyResult<Bound<'py, PyAny>> {
222        let client = self.clone();
223
224        pyo3_async_runtimes::tokio::future_into_py(py, async move {
225            if let Err(e) = client.subscribe_mark_prices(instrument_ids).await {
226                log::error!("Failed to subscribe to mark prices: {e}");
227            }
228            Ok(())
229        })
230    }
231
232    #[pyo3(name = "subscribe_index_prices")]
233    fn py_subscribe_index_prices<'py>(
234        &self,
235        py: Python<'py>,
236        instrument_ids: Vec<InstrumentId>,
237    ) -> PyResult<Bound<'py, PyAny>> {
238        let client = self.clone();
239
240        pyo3_async_runtimes::tokio::future_into_py(py, async move {
241            if let Err(e) = client.subscribe_index_prices(instrument_ids).await {
242                log::error!("Failed to subscribe to index prices: {e}");
243            }
244            Ok(())
245        })
246    }
247
248    #[pyo3(name = "subscribe_bars")]
249    fn py_subscribe_bars<'py>(
250        &self,
251        py: Python<'py>,
252        bar_type: BarType,
253    ) -> PyResult<Bound<'py, PyAny>> {
254        let client = self.clone();
255
256        pyo3_async_runtimes::tokio::future_into_py(py, async move {
257            if let Err(e) = client.subscribe_bars(bar_type).await {
258                log::error!("Failed to subscribe to bars: {e}");
259            }
260            Ok(())
261        })
262    }
263
264    #[pyo3(name = "unsubscribe_instruments")]
265    fn py_unsubscribe_instruments<'py>(
266        &self,
267        py: Python<'py>,
268        instrument_ids: Vec<InstrumentId>,
269    ) -> PyResult<Bound<'py, PyAny>> {
270        let client = self.clone();
271
272        pyo3_async_runtimes::tokio::future_into_py(py, async move {
273            if let Err(e) = client.unsubscribe_instruments(instrument_ids).await {
274                log::error!("Failed to unsubscribe from order book: {e}");
275            }
276            Ok(())
277        })
278    }
279
280    #[pyo3(name = "unsubscribe_order_book")]
281    fn py_unsubscribe_order_book<'py>(
282        &self,
283        py: Python<'py>,
284        instrument_ids: Vec<InstrumentId>,
285    ) -> PyResult<Bound<'py, PyAny>> {
286        let client = self.clone();
287
288        pyo3_async_runtimes::tokio::future_into_py(py, async move {
289            if let Err(e) = client.unsubscribe_order_book(instrument_ids).await {
290                log::error!("Failed to unsubscribe from order book: {e}");
291            }
292            Ok(())
293        })
294    }
295
296    #[pyo3(name = "unsubscribe_quotes")]
297    fn py_unsubscribe_quotes<'py>(
298        &self,
299        py: Python<'py>,
300        instrument_ids: Vec<InstrumentId>,
301    ) -> PyResult<Bound<'py, PyAny>> {
302        let client = self.clone();
303
304        pyo3_async_runtimes::tokio::future_into_py(py, async move {
305            if let Err(e) = client.unsubscribe_quotes(instrument_ids).await {
306                log::error!("Failed to unsubscribe from quotes: {e}");
307            }
308            Ok(())
309        })
310    }
311
312    #[pyo3(name = "unsubscribe_trades")]
313    fn py_unsubscribe_trades<'py>(
314        &self,
315        py: Python<'py>,
316        instrument_ids: Vec<InstrumentId>,
317    ) -> PyResult<Bound<'py, PyAny>> {
318        let client = self.clone();
319
320        pyo3_async_runtimes::tokio::future_into_py(py, async move {
321            if let Err(e) = client.unsubscribe_trades(instrument_ids).await {
322                log::error!("Failed to unsubscribe from trades: {e}");
323            }
324            Ok(())
325        })
326    }
327
328    #[pyo3(name = "unsubscribe_mark_prices")]
329    fn py_unsubscribe_mark_prices<'py>(
330        &self,
331        py: Python<'py>,
332        instrument_ids: Vec<InstrumentId>,
333    ) -> PyResult<Bound<'py, PyAny>> {
334        let client = self.clone();
335
336        pyo3_async_runtimes::tokio::future_into_py(py, async move {
337            if let Err(e) = client.unsubscribe_mark_prices(instrument_ids).await {
338                log::error!("Failed to unsubscribe from mark prices: {e}");
339            }
340            Ok(())
341        })
342    }
343
344    #[pyo3(name = "unsubscribe_index_prices")]
345    fn py_unsubscribe_index_prices<'py>(
346        &self,
347        py: Python<'py>,
348        instrument_ids: Vec<InstrumentId>,
349    ) -> PyResult<Bound<'py, PyAny>> {
350        let client = self.clone();
351
352        pyo3_async_runtimes::tokio::future_into_py(py, async move {
353            if let Err(e) = client.unsubscribe_index_prices(instrument_ids).await {
354                log::error!("Failed to unsubscribe from index prices: {e}");
355            }
356            Ok(())
357        })
358    }
359
360    #[pyo3(name = "unsubscribe_bars")]
361    fn py_unsubscribe_bars<'py>(
362        &self,
363        py: Python<'py>,
364        bar_type: BarType,
365    ) -> PyResult<Bound<'py, PyAny>> {
366        let client = self.clone();
367
368        pyo3_async_runtimes::tokio::future_into_py(py, async move {
369            if let Err(e) = client.unsubscribe_bars(bar_type).await {
370                log::error!("Failed to unsubscribe from bars: {e}");
371            }
372            Ok(())
373        })
374    }
375}
376
377pub fn call_python(py: Python, callback: &PyObject, py_obj: PyObject) {
378    if let Err(e) = callback.call1(py, (py_obj,)) {
379        tracing::error!("Error calling Python: {e}");
380    }
381}