1use nautilus_core::nanos::UnixNanos;
17use nautilus_model::{
18 data::{
19 Bar, BarType, BookOrder, IndexPriceUpdate, MarkPriceUpdate, OrderBookDelta,
20 OrderBookDeltas, QuoteTick, TradeTick,
21 },
22 enums::{AggregationSource, AggressorSide, BookAction, OrderSide, RecordFlag},
23 identifiers::{InstrumentId, Symbol, TradeId},
24 instruments::{CryptoPerpetual, CurrencyPair, any::InstrumentAny},
25 types::{Price, Quantity},
26};
27use rust_decimal::Decimal;
28
29use super::messages::{
30 CoinbaseIntxWsCandleSnapshotMsg, CoinbaseIntxWsInstrumentMsg,
31 CoinbaseIntxWsOrderBookSnapshotMsg, CoinbaseIntxWsOrderBookUpdateMsg, CoinbaseIntxWsQuoteMsg,
32 CoinbaseIntxWsRiskMsg, CoinbaseIntxWsTradeMsg,
33};
34use crate::common::{
35 enums::CoinbaseIntxInstrumentType,
36 parse::{coinbase_channel_as_bar_spec, get_currency, parse_instrument_id},
37};
38
39pub fn parse_spot_instrument(
46 definition: &CoinbaseIntxWsInstrumentMsg,
47 margin_init: Option<Decimal>,
48 margin_maint: Option<Decimal>,
49 maker_fee: Option<Decimal>,
50 taker_fee: Option<Decimal>,
51 ts_init: UnixNanos,
52) -> anyhow::Result<InstrumentAny> {
53 let instrument_id = parse_instrument_id(definition.product_id);
54 let raw_symbol = Symbol::from_ustr_unchecked(definition.product_id);
55
56 let base_currency = get_currency(&definition.base_asset_name);
57 let quote_currency = get_currency(&definition.quote_asset_name);
58
59 let price_increment = Price::from(&definition.quote_increment);
60 let size_increment = Quantity::from(&definition.base_increment);
61
62 let lot_size = None;
63 let max_quantity = None;
64 let min_quantity = None;
65 let max_notional = None;
66 let min_notional = None;
67 let max_price = None;
68 let min_price = None;
69
70 let instrument = CurrencyPair::new(
71 instrument_id,
72 raw_symbol,
73 base_currency,
74 quote_currency,
75 price_increment.precision,
76 size_increment.precision,
77 price_increment,
78 size_increment,
79 lot_size,
80 max_quantity,
81 min_quantity,
82 max_notional,
83 min_notional,
84 max_price,
85 min_price,
86 margin_init,
87 margin_maint,
88 maker_fee,
89 taker_fee,
90 definition.time.into(),
91 ts_init,
92 );
93
94 Ok(InstrumentAny::CurrencyPair(instrument))
95}
96
97pub fn parse_perp_instrument(
104 definition: &CoinbaseIntxWsInstrumentMsg,
105 margin_init: Option<Decimal>,
106 margin_maint: Option<Decimal>,
107 maker_fee: Option<Decimal>,
108 taker_fee: Option<Decimal>,
109 ts_init: UnixNanos,
110) -> anyhow::Result<InstrumentAny> {
111 let instrument_id = parse_instrument_id(definition.product_id);
112 let raw_symbol = Symbol::from_ustr_unchecked(definition.product_id);
113
114 let base_currency = get_currency(&definition.base_asset_name);
115 let quote_currency = get_currency(&definition.quote_asset_name);
116 let settlement_currency = quote_currency;
117
118 let price_increment = Price::from(&definition.quote_increment);
119 let size_increment = Quantity::from(&definition.base_increment);
120
121 let multiplier = Some(Quantity::from(&definition.base_asset_multiplier));
122
123 let lot_size = None;
124 let max_quantity = None;
125 let min_quantity = None;
126 let max_notional = None;
127 let min_notional = None;
128 let max_price = None;
129 let min_price = None;
130
131 let is_inverse = false;
132
133 let instrument = CryptoPerpetual::new(
134 instrument_id,
135 raw_symbol,
136 base_currency,
137 quote_currency,
138 settlement_currency,
139 is_inverse,
140 price_increment.precision,
141 size_increment.precision,
142 price_increment,
143 size_increment,
144 multiplier,
145 lot_size,
146 max_quantity,
147 min_quantity,
148 max_notional,
149 min_notional,
150 max_price,
151 min_price,
152 margin_init,
153 margin_maint,
154 maker_fee,
155 taker_fee,
156 definition.time.into(),
157 ts_init,
158 );
159
160 Ok(InstrumentAny::CryptoPerpetual(instrument))
161}
162
163#[must_use]
164pub fn parse_instrument_any(
165 instrument: &CoinbaseIntxWsInstrumentMsg,
166 ts_init: UnixNanos,
167) -> Option<InstrumentAny> {
168 let result = match instrument.instrument_type {
169 CoinbaseIntxInstrumentType::Spot => {
170 parse_spot_instrument(instrument, None, None, None, None, ts_init).map(Some)
171 }
172 CoinbaseIntxInstrumentType::Perp => {
173 parse_perp_instrument(instrument, None, None, None, None, ts_init).map(Some)
174 }
175 CoinbaseIntxInstrumentType::Index => {
176 tracing::warn!(
177 "Index instrument parsing not implemented {}",
178 instrument.product_id,
179 );
180 Ok(None)
181 }
182 };
183
184 match result {
185 Ok(instrument) => instrument,
186 Err(e) => {
187 tracing::warn!("Failed to parse instrument {}: {e}", instrument.product_id,);
188 None
189 }
190 }
191}
192
193pub fn parse_orderbook_snapshot_msg(
199 msg: &CoinbaseIntxWsOrderBookSnapshotMsg,
200 instrument_id: InstrumentId,
201 price_precision: u8,
202 size_precision: u8,
203 ts_init: UnixNanos,
204) -> anyhow::Result<OrderBookDeltas> {
205 let ts_event = UnixNanos::from(msg.time);
206
207 let flags = RecordFlag::F_SNAPSHOT as u8;
209
210 let mut deltas = Vec::with_capacity(msg.bids.len() + msg.asks.len());
212
213 for bid in &msg.bids {
215 let price_str = &bid[0];
216 let size_str = &bid[1];
217
218 let price = Price::new(
219 price_str
220 .parse::<f64>()
221 .map_err(|e| anyhow::anyhow!("Failed to parse bid price: {e}"))?,
222 price_precision,
223 );
224
225 let size = Quantity::new(
226 size_str
227 .parse::<f64>()
228 .map_err(|e| anyhow::anyhow!("Failed to parse bid size: {e}"))?,
229 size_precision,
230 );
231
232 let order_id = 0; let order = BookOrder::new(OrderSide::Buy, price, size, order_id);
235
236 let delta = OrderBookDelta::new(
237 instrument_id,
238 BookAction::Add, order,
240 flags,
241 msg.sequence,
242 ts_event,
243 ts_init,
244 );
245
246 deltas.push(delta);
247 }
248
249 for ask in &msg.asks {
251 let price_str = &ask[0];
252 let size_str = &ask[1];
253
254 let price = Price::new(
255 price_str
256 .parse::<f64>()
257 .map_err(|e| anyhow::anyhow!("Failed to parse ask price: {e}"))?,
258 price_precision,
259 );
260
261 let size = Quantity::new(
262 size_str
263 .parse::<f64>()
264 .map_err(|e| anyhow::anyhow!("Failed to parse ask size: {e}"))?,
265 size_precision,
266 );
267
268 let order_id = 0; let order = BookOrder::new(OrderSide::Sell, price, size, order_id);
271
272 let delta = OrderBookDelta::new(
273 instrument_id,
274 BookAction::Add, order,
276 flags,
277 msg.sequence,
278 ts_event,
279 ts_init,
280 );
281
282 deltas.push(delta);
283 }
284
285 Ok(OrderBookDeltas::new(instrument_id, deltas))
286}
287
288pub fn parse_orderbook_update_msg(
294 msg: &CoinbaseIntxWsOrderBookUpdateMsg,
295 instrument_id: InstrumentId,
296 price_precision: u8,
297 size_precision: u8,
298 ts_init: UnixNanos,
299) -> anyhow::Result<OrderBookDeltas> {
300 let ts_event = UnixNanos::from(msg.time);
301
302 let flags = 0;
304
305 let mut deltas = Vec::with_capacity(msg.changes.len());
307
308 for change in &msg.changes {
310 let side_str = &change[0];
311 let price_str = &change[1];
312 let size_str = &change[2];
313
314 let price = Price::new(
315 price_str
316 .parse::<f64>()
317 .map_err(|e| anyhow::anyhow!("Failed to parse price: {e}"))?,
318 price_precision,
319 );
320
321 let size = Quantity::new(
322 size_str
323 .parse::<f64>()
324 .map_err(|e| anyhow::anyhow!("Failed to parse size: {e}"))?,
325 size_precision,
326 );
327
328 let side = match side_str.as_str() {
330 "BUY" => OrderSide::Buy,
331 "SELL" => OrderSide::Sell,
332 _ => anyhow::bail!("Unknown order side: {side_str}"),
333 };
334
335 let book_action = if size.is_zero() {
337 BookAction::Delete
338 } else {
339 BookAction::Update
340 };
341
342 let order_id = 0; let order = BookOrder::new(side, price, size, order_id);
344
345 let delta = OrderBookDelta::new(
346 instrument_id,
347 book_action,
348 order,
349 flags,
350 msg.sequence,
351 ts_event,
352 ts_init,
353 );
354
355 deltas.push(delta);
356 }
357
358 Ok(OrderBookDeltas::new(instrument_id, deltas))
359}
360
361pub fn parse_quote_msg(
367 msg: &CoinbaseIntxWsQuoteMsg,
368 instrument_id: InstrumentId,
369 price_precision: u8,
370 size_precision: u8,
371 ts_init: UnixNanos,
372) -> anyhow::Result<QuoteTick> {
373 let bid_price = Price::new(msg.bid_price.parse::<f64>()?, price_precision);
374 let ask_price = Price::new(msg.ask_price.parse::<f64>()?, price_precision);
375 let bid_size = Quantity::new(msg.bid_qty.parse::<f64>()?, size_precision);
376 let ask_size = Quantity::new(msg.ask_qty.parse::<f64>()?, size_precision);
377 let ts_event = UnixNanos::from(msg.time);
378
379 Ok(QuoteTick::new(
380 instrument_id,
381 bid_price,
382 ask_price,
383 bid_size,
384 ask_size,
385 ts_event,
386 ts_init,
387 ))
388}
389
390pub fn parse_trade_msg(
396 msg: &CoinbaseIntxWsTradeMsg,
397 instrument_id: InstrumentId,
398 price_precision: u8,
399 size_precision: u8,
400 ts_init: UnixNanos,
401) -> anyhow::Result<TradeTick> {
402 let price = Price::new(msg.trade_price.parse::<f64>()?, price_precision);
403 let size = Quantity::new(msg.trade_qty.parse::<f64>()?, size_precision);
404 let aggressor_side: AggressorSide = msg.aggressor_side.clone().into();
405 let trade_id = TradeId::new(&msg.match_id);
406 let ts_event = UnixNanos::from(msg.time);
407
408 Ok(TradeTick::new(
409 instrument_id,
410 price,
411 size,
412 aggressor_side,
413 trade_id,
414 ts_event,
415 ts_init,
416 ))
417}
418
419pub fn parse_mark_price_msg(
425 msg: &CoinbaseIntxWsRiskMsg,
426 instrument_id: InstrumentId,
427 price_precision: u8,
428 ts_init: UnixNanos,
429) -> anyhow::Result<MarkPriceUpdate> {
430 let value = Price::new(msg.mark_price.parse::<f64>()?, price_precision);
431 let ts_event = UnixNanos::from(msg.time);
432
433 Ok(MarkPriceUpdate::new(
434 instrument_id,
435 value,
436 ts_event,
437 ts_init,
438 ))
439}
440
441pub fn parse_index_price_msg(
447 msg: &CoinbaseIntxWsRiskMsg,
448 instrument_id: InstrumentId,
449 price_precision: u8,
450 ts_init: UnixNanos,
451) -> anyhow::Result<IndexPriceUpdate> {
452 let value = Price::new(msg.index_price.parse::<f64>()?, price_precision);
453 let ts_event = UnixNanos::from(msg.time);
454
455 Ok(IndexPriceUpdate::new(
456 instrument_id,
457 value,
458 ts_event,
459 ts_init,
460 ))
461}
462
463pub fn parse_candle_msg(
469 msg: &CoinbaseIntxWsCandleSnapshotMsg,
470 instrument_id: InstrumentId,
471 price_precision: u8,
472 size_precision: u8,
473 ts_init: UnixNanos,
474) -> anyhow::Result<Bar> {
475 let bar_spec = coinbase_channel_as_bar_spec(&msg.channel)?;
476 let bar_type = BarType::new(instrument_id, bar_spec, AggregationSource::External);
477 let candle = msg
478 .candles
479 .last()
480 .ok_or_else(|| anyhow::anyhow!("Empty candles in snapshot for channel {}", msg.channel))?;
481 let ts_event = UnixNanos::from(candle.start); let open_price = Price::new(candle.open.parse::<f64>()?, price_precision);
484 let high_price = Price::new(candle.high.parse::<f64>()?, price_precision);
485 let low_price = Price::new(candle.low.parse::<f64>()?, price_precision);
486 let close_price = Price::new(candle.close.parse::<f64>()?, price_precision);
487 let volume = Quantity::new(candle.volume.parse::<f64>()?, size_precision);
488
489 Ok(Bar::new(
491 bar_type,
492 open_price,
493 high_price,
494 low_price,
495 close_price,
496 volume,
497 ts_event,
498 ts_init,
499 ))
500}