nautilus_coinbase_intx/python/
websocket.rs1use futures_util::StreamExt;
17use nautilus_core::python::{IntoPyObjectPoseiExt, to_pyvalue_err};
18use nautilus_model::{
19 data::BarType,
20 identifiers::InstrumentId,
21 python::{
22 data::data_to_pycapsule,
23 events::order::order_event_to_pyobject,
24 instruments::{instrument_any_to_pyobject, pyobject_to_instrument_any},
25 },
26};
27use pyo3::{exceptions::PyRuntimeError, prelude::*};
28use pyo3_async_runtimes::tokio::get_runtime;
29
30use crate::websocket::{CoinbaseIntxWebSocketClient, messages::PoseiWsMessage};
31
32#[pymethods]
33impl CoinbaseIntxWebSocketClient {
34 #[new]
35 #[pyo3(signature = (url=None, api_key=None, api_secret=None, api_passphrase=None, heartbeat=None))]
36 fn py_new(
37 url: Option<String>,
38 api_key: Option<String>,
39 api_secret: Option<String>,
40 api_passphrase: Option<String>,
41 heartbeat: Option<u64>,
42 ) -> PyResult<Self> {
43 Self::new(url, api_key, api_secret, api_passphrase, heartbeat).map_err(to_pyvalue_err)
44 }
45
46 #[getter]
47 #[pyo3(name = "url")]
48 #[must_use]
49 pub const fn py_url(&self) -> &str {
50 self.url()
51 }
52
53 #[getter]
54 #[pyo3(name = "api_key")]
55 #[must_use]
56 pub fn py_api_key(&self) -> &str {
57 self.api_key()
58 }
59
60 #[pyo3(name = "is_active")]
61 fn py_is_active(&mut self) -> bool {
62 self.is_active()
63 }
64
65 #[pyo3(name = "is_closed")]
66 fn py_is_closed(&mut self) -> bool {
67 self.is_closed()
68 }
69
70 #[pyo3(name = "connect")]
71 fn py_connect<'py>(
72 &mut self,
73 py: Python<'py>,
74 instruments: Vec<PyObject>,
75 callback: PyObject,
76 ) -> PyResult<Bound<'py, PyAny>> {
77 let mut instruments_any = Vec::new();
78 for inst in instruments {
79 let inst_any = pyobject_to_instrument_any(py, inst)?;
80 instruments_any.push(inst_any);
81 }
82
83 get_runtime().block_on(async {
84 self.connect(instruments_any)
85 .await
86 .map_err(|e| PyRuntimeError::new_err(e.to_string()))
87 })?;
88
89 let stream = self.stream();
90
91 pyo3_async_runtimes::tokio::future_into_py(py, async move {
92 tokio::pin!(stream);
93
94 while let Some(msg) = stream.next().await {
95 match msg {
96 PoseiWsMessage::Instrument(inst) => Python::with_gil(|py| {
97 let py_obj = instrument_any_to_pyobject(py, inst)
98 .expect("Failed to create instrument");
99 call_python(py, &callback, py_obj);
100 }),
101 PoseiWsMessage::Data(data) => Python::with_gil(|py| {
102 let py_obj = data_to_pycapsule(py, data);
103 call_python(py, &callback, py_obj);
104 }),
105 PoseiWsMessage::DataVec(data_vec) => Python::with_gil(|py| {
106 for data in data_vec {
107 let py_obj = data_to_pycapsule(py, data);
108 call_python(py, &callback, py_obj);
109 }
110 }),
111 PoseiWsMessage::Deltas(deltas) => Python::with_gil(|py| {
112 call_python(py, &callback, deltas.into_py_any_unwrap(py));
113 }),
114 PoseiWsMessage::MarkPrice(mark_price) => Python::with_gil(|py| {
115 call_python(py, &callback, mark_price.into_py_any_unwrap(py));
116 }),
117 PoseiWsMessage::IndexPrice(index_price) => Python::with_gil(|py| {
118 call_python(py, &callback, index_price.into_py_any_unwrap(py));
119 }),
120 PoseiWsMessage::MarkAndIndex((mark_price, index_price)) => {
121 Python::with_gil(|py| {
122 call_python(py, &callback, mark_price.into_py_any_unwrap(py));
123 call_python(py, &callback, index_price.into_py_any_unwrap(py));
124 });
125 }
126 PoseiWsMessage::OrderEvent(msg) => Python::with_gil(|py| {
127 let py_obj =
128 order_event_to_pyobject(py, msg).expect("Failed to create event");
129 call_python(py, &callback, py_obj);
130 }),
131 }
132 }
133
134 Ok(())
135 })
136 }
137
138 #[pyo3(name = "close")]
139 fn py_close<'py>(&mut self, py: Python<'py>) -> PyResult<Bound<'py, PyAny>> {
140 let mut client = self.clone();
141
142 pyo3_async_runtimes::tokio::future_into_py(py, async move {
143 if let Err(e) = client.close().await {
144 log::error!("Error on close: {e}");
145 }
146 Ok(())
147 })
148 }
149
150 #[pyo3(name = "subscribe_instruments")]
151 #[pyo3(signature = (instrument_ids=None))]
152 fn py_subscribe_instruments<'py>(
153 &self,
154 py: Python<'py>,
155 instrument_ids: Option<Vec<InstrumentId>>,
156 ) -> PyResult<Bound<'py, PyAny>> {
157 let client = self.clone();
158 let instrument_ids = instrument_ids.unwrap_or_default();
159
160 pyo3_async_runtimes::tokio::future_into_py(py, async move {
161 if let Err(e) = client.subscribe_instruments(instrument_ids).await {
162 log::error!("Failed to subscribe to instruments: {e}");
163 }
164 Ok(())
165 })
166 }
167
168 #[pyo3(name = "subscribe_order_book")]
169 fn py_subscribe_order_book<'py>(
170 &self,
171 py: Python<'py>,
172 instrument_ids: Vec<InstrumentId>,
173 ) -> PyResult<Bound<'py, PyAny>> {
174 let client = self.clone();
175
176 pyo3_async_runtimes::tokio::future_into_py(py, async move {
177 if let Err(e) = client.subscribe_order_book(instrument_ids).await {
178 log::error!("Failed to subscribe to order book: {e}");
179 }
180 Ok(())
181 })
182 }
183
184 #[pyo3(name = "subscribe_quotes")]
185 fn py_subscribe_quotes<'py>(
186 &self,
187 py: Python<'py>,
188 instrument_ids: Vec<InstrumentId>,
189 ) -> PyResult<Bound<'py, PyAny>> {
190 let client = self.clone();
191
192 pyo3_async_runtimes::tokio::future_into_py(py, async move {
193 if let Err(e) = client.subscribe_quotes(instrument_ids).await {
194 log::error!("Failed to subscribe to quotes: {e}");
195 }
196 Ok(())
197 })
198 }
199
200 #[pyo3(name = "subscribe_trades")]
201 fn py_subscribe_trades<'py>(
202 &self,
203 py: Python<'py>,
204 instrument_ids: Vec<InstrumentId>,
205 ) -> PyResult<Bound<'py, PyAny>> {
206 let client = self.clone();
207
208 pyo3_async_runtimes::tokio::future_into_py(py, async move {
209 if let Err(e) = client.subscribe_trades(instrument_ids).await {
210 log::error!("Failed to subscribe to trades: {e}");
211 }
212 Ok(())
213 })
214 }
215
216 #[pyo3(name = "subscribe_mark_prices")]
217 fn py_subscribe_mark_prices<'py>(
218 &self,
219 py: Python<'py>,
220 instrument_ids: Vec<InstrumentId>,
221 ) -> PyResult<Bound<'py, PyAny>> {
222 let client = self.clone();
223
224 pyo3_async_runtimes::tokio::future_into_py(py, async move {
225 if let Err(e) = client.subscribe_mark_prices(instrument_ids).await {
226 log::error!("Failed to subscribe to mark prices: {e}");
227 }
228 Ok(())
229 })
230 }
231
232 #[pyo3(name = "subscribe_index_prices")]
233 fn py_subscribe_index_prices<'py>(
234 &self,
235 py: Python<'py>,
236 instrument_ids: Vec<InstrumentId>,
237 ) -> PyResult<Bound<'py, PyAny>> {
238 let client = self.clone();
239
240 pyo3_async_runtimes::tokio::future_into_py(py, async move {
241 if let Err(e) = client.subscribe_index_prices(instrument_ids).await {
242 log::error!("Failed to subscribe to index prices: {e}");
243 }
244 Ok(())
245 })
246 }
247
248 #[pyo3(name = "subscribe_bars")]
249 fn py_subscribe_bars<'py>(
250 &self,
251 py: Python<'py>,
252 bar_type: BarType,
253 ) -> PyResult<Bound<'py, PyAny>> {
254 let client = self.clone();
255
256 pyo3_async_runtimes::tokio::future_into_py(py, async move {
257 if let Err(e) = client.subscribe_bars(bar_type).await {
258 log::error!("Failed to subscribe to bars: {e}");
259 }
260 Ok(())
261 })
262 }
263
264 #[pyo3(name = "unsubscribe_instruments")]
265 fn py_unsubscribe_instruments<'py>(
266 &self,
267 py: Python<'py>,
268 instrument_ids: Vec<InstrumentId>,
269 ) -> PyResult<Bound<'py, PyAny>> {
270 let client = self.clone();
271
272 pyo3_async_runtimes::tokio::future_into_py(py, async move {
273 if let Err(e) = client.unsubscribe_instruments(instrument_ids).await {
274 log::error!("Failed to unsubscribe from order book: {e}");
275 }
276 Ok(())
277 })
278 }
279
280 #[pyo3(name = "unsubscribe_order_book")]
281 fn py_unsubscribe_order_book<'py>(
282 &self,
283 py: Python<'py>,
284 instrument_ids: Vec<InstrumentId>,
285 ) -> PyResult<Bound<'py, PyAny>> {
286 let client = self.clone();
287
288 pyo3_async_runtimes::tokio::future_into_py(py, async move {
289 if let Err(e) = client.unsubscribe_order_book(instrument_ids).await {
290 log::error!("Failed to unsubscribe from order book: {e}");
291 }
292 Ok(())
293 })
294 }
295
296 #[pyo3(name = "unsubscribe_quotes")]
297 fn py_unsubscribe_quotes<'py>(
298 &self,
299 py: Python<'py>,
300 instrument_ids: Vec<InstrumentId>,
301 ) -> PyResult<Bound<'py, PyAny>> {
302 let client = self.clone();
303
304 pyo3_async_runtimes::tokio::future_into_py(py, async move {
305 if let Err(e) = client.unsubscribe_quotes(instrument_ids).await {
306 log::error!("Failed to unsubscribe from quotes: {e}");
307 }
308 Ok(())
309 })
310 }
311
312 #[pyo3(name = "unsubscribe_trades")]
313 fn py_unsubscribe_trades<'py>(
314 &self,
315 py: Python<'py>,
316 instrument_ids: Vec<InstrumentId>,
317 ) -> PyResult<Bound<'py, PyAny>> {
318 let client = self.clone();
319
320 pyo3_async_runtimes::tokio::future_into_py(py, async move {
321 if let Err(e) = client.unsubscribe_trades(instrument_ids).await {
322 log::error!("Failed to unsubscribe from trades: {e}");
323 }
324 Ok(())
325 })
326 }
327
328 #[pyo3(name = "unsubscribe_mark_prices")]
329 fn py_unsubscribe_mark_prices<'py>(
330 &self,
331 py: Python<'py>,
332 instrument_ids: Vec<InstrumentId>,
333 ) -> PyResult<Bound<'py, PyAny>> {
334 let client = self.clone();
335
336 pyo3_async_runtimes::tokio::future_into_py(py, async move {
337 if let Err(e) = client.unsubscribe_mark_prices(instrument_ids).await {
338 log::error!("Failed to unsubscribe from mark prices: {e}");
339 }
340 Ok(())
341 })
342 }
343
344 #[pyo3(name = "unsubscribe_index_prices")]
345 fn py_unsubscribe_index_prices<'py>(
346 &self,
347 py: Python<'py>,
348 instrument_ids: Vec<InstrumentId>,
349 ) -> PyResult<Bound<'py, PyAny>> {
350 let client = self.clone();
351
352 pyo3_async_runtimes::tokio::future_into_py(py, async move {
353 if let Err(e) = client.unsubscribe_index_prices(instrument_ids).await {
354 log::error!("Failed to unsubscribe from index prices: {e}");
355 }
356 Ok(())
357 })
358 }
359
360 #[pyo3(name = "unsubscribe_bars")]
361 fn py_unsubscribe_bars<'py>(
362 &self,
363 py: Python<'py>,
364 bar_type: BarType,
365 ) -> PyResult<Bound<'py, PyAny>> {
366 let client = self.clone();
367
368 pyo3_async_runtimes::tokio::future_into_py(py, async move {
369 if let Err(e) = client.unsubscribe_bars(bar_type).await {
370 log::error!("Failed to unsubscribe from bars: {e}");
371 }
372 Ok(())
373 })
374 }
375}
376
377pub fn call_python(py: Python, callback: &PyObject, py_obj: PyObject) {
378 if let Err(e) = callback.call1(py, (py_obj,)) {
379 tracing::error!("Error calling Python: {e}");
380 }
381}