1use std::sync::Arc;
17
18use anyhow::Context;
19use chrono::{DateTime, Utc};
20use nautilus_core::UnixNanos;
21use nautilus_model::{
22 data::{
23 Bar, BarType, BookOrder, Data, OrderBookDelta, OrderBookDeltas, OrderBookDeltas_API,
24 QuoteTick, TradeTick,
25 },
26 enums::{AggregationSource, BookAction, OrderSide, RecordFlag},
27 identifiers::{InstrumentId, TradeId},
28 types::{Price, Quantity},
29};
30use uuid::Uuid;
31
32use super::{
33 message::{BarMsg, BookChangeMsg, BookLevel, BookSnapshotMsg, TradeMsg, WsMessage},
34 types::InstrumentMiniInfo,
35};
36use crate::parse::{normalize_amount, parse_aggressor_side, parse_bar_spec, parse_book_action};
37
38#[must_use]
39pub fn parse_tardis_ws_message(msg: WsMessage, info: Arc<InstrumentMiniInfo>) -> Option<Data> {
40 match msg {
41 WsMessage::BookChange(msg) => {
42 if msg.bids.is_empty() && msg.asks.is_empty() {
43 tracing::error!(
44 "Invalid book change for {} {} (empty bids and asks)",
45 msg.exchange,
46 msg.symbol
47 );
48 return None;
49 }
50
51 match parse_book_change_msg_as_deltas(
52 msg,
53 info.price_precision,
54 info.size_precision,
55 info.instrument_id,
56 ) {
57 Ok(deltas) => Some(Data::Deltas(deltas)),
58 Err(e) => {
59 tracing::error!("Failed to parse book change message: {e}");
60 None
61 }
62 }
63 }
64 WsMessage::BookSnapshot(msg) => match msg.bids.len() {
65 1 => {
66 match parse_book_snapshot_msg_as_quote(
67 msg,
68 info.price_precision,
69 info.size_precision,
70 info.instrument_id,
71 ) {
72 Ok(quote) => Some(Data::Quote(quote)),
73 Err(e) => {
74 tracing::error!("Failed to parse book snapshot quote message: {e}");
75 None
76 }
77 }
78 }
79 _ => {
80 match parse_book_snapshot_msg_as_deltas(
81 msg,
82 info.price_precision,
83 info.size_precision,
84 info.instrument_id,
85 ) {
86 Ok(deltas) => Some(Data::Deltas(deltas)),
87 Err(e) => {
88 tracing::error!("Failed to parse book snapshot message: {e}");
89 None
90 }
91 }
92 }
93 },
94 WsMessage::Trade(msg) => {
95 match parse_trade_msg(
96 msg,
97 info.price_precision,
98 info.size_precision,
99 info.instrument_id,
100 ) {
101 Ok(trade) => Some(Data::Trade(trade)),
102 Err(e) => {
103 tracing::error!("Failed to parse trade message: {e}");
104 None
105 }
106 }
107 }
108 WsMessage::TradeBar(msg) => Some(Data::Bar(parse_bar_msg(
109 msg,
110 info.price_precision,
111 info.size_precision,
112 info.instrument_id,
113 ))),
114 WsMessage::DerivativeTicker(_) => None,
115 WsMessage::Disconnect(_) => None,
116 }
117}
118
119pub fn parse_book_change_msg_as_deltas(
126 msg: BookChangeMsg,
127 price_precision: u8,
128 size_precision: u8,
129 instrument_id: InstrumentId,
130) -> anyhow::Result<OrderBookDeltas_API> {
131 parse_book_msg_as_deltas(
132 msg.bids,
133 msg.asks,
134 msg.is_snapshot,
135 price_precision,
136 size_precision,
137 instrument_id,
138 msg.timestamp,
139 msg.local_timestamp,
140 )
141}
142
143pub fn parse_book_snapshot_msg_as_deltas(
150 msg: BookSnapshotMsg,
151 price_precision: u8,
152 size_precision: u8,
153 instrument_id: InstrumentId,
154) -> anyhow::Result<OrderBookDeltas_API> {
155 parse_book_msg_as_deltas(
156 msg.bids,
157 msg.asks,
158 true,
159 price_precision,
160 size_precision,
161 instrument_id,
162 msg.timestamp,
163 msg.local_timestamp,
164 )
165}
166
167#[allow(clippy::too_many_arguments)]
169pub fn parse_book_msg_as_deltas(
175 bids: Vec<BookLevel>,
176 asks: Vec<BookLevel>,
177 is_snapshot: bool,
178 price_precision: u8,
179 size_precision: u8,
180 instrument_id: InstrumentId,
181 timestamp: DateTime<Utc>,
182 local_timestamp: DateTime<Utc>,
183) -> anyhow::Result<OrderBookDeltas_API> {
184 let event_nanos = timestamp
185 .timestamp_nanos_opt()
186 .context("invalid timestamp: cannot extract event nanoseconds")?;
187 let ts_event = UnixNanos::from(event_nanos as u64);
188 let init_nanos = local_timestamp
189 .timestamp_nanos_opt()
190 .context("invalid timestamp: cannot extract init nanoseconds")?;
191 let ts_init = UnixNanos::from(init_nanos as u64);
192
193 let mut deltas: Vec<OrderBookDelta> = Vec::with_capacity(bids.len() + asks.len());
194
195 for level in bids {
196 deltas.push(parse_book_level(
197 instrument_id,
198 price_precision,
199 size_precision,
200 OrderSide::Buy,
201 level,
202 is_snapshot,
203 ts_event,
204 ts_init,
205 ));
206 }
207
208 for level in asks {
209 deltas.push(parse_book_level(
210 instrument_id,
211 price_precision,
212 size_precision,
213 OrderSide::Sell,
214 level,
215 is_snapshot,
216 ts_event,
217 ts_init,
218 ));
219 }
220
221 if let Some(last_delta) = deltas.last_mut() {
222 last_delta.flags += RecordFlag::F_LAST.value();
223 }
224
225 Ok(OrderBookDeltas_API::new(OrderBookDeltas::new(
227 instrument_id,
228 deltas,
229 )))
230}
231
232#[must_use]
233#[allow(clippy::too_many_arguments)]
239pub fn parse_book_level(
240 instrument_id: InstrumentId,
241 price_precision: u8,
242 size_precision: u8,
243 side: OrderSide,
244 level: BookLevel,
245 is_snapshot: bool,
246 ts_event: UnixNanos,
247 ts_init: UnixNanos,
248) -> OrderBookDelta {
249 let amount = normalize_amount(level.amount, size_precision);
250 let action = parse_book_action(is_snapshot, amount);
251 let price = Price::new(level.price, price_precision);
252 let size = Quantity::new(amount, size_precision);
253 let order_id = 0; let order = BookOrder::new(side, price, size, order_id);
255 let flags = if is_snapshot {
256 RecordFlag::F_SNAPSHOT.value()
257 } else {
258 0
259 };
260 let sequence = 0; assert!(
263 !(action != BookAction::Delete && size.is_zero()),
264 "Invalid zero size for {action}"
265 );
266
267 OrderBookDelta::new(
268 instrument_id,
269 action,
270 order,
271 flags,
272 sequence,
273 ts_event,
274 ts_init,
275 )
276}
277
278pub fn parse_book_snapshot_msg_as_quote(
285 msg: BookSnapshotMsg,
286 price_precision: u8,
287 size_precision: u8,
288 instrument_id: InstrumentId,
289) -> anyhow::Result<QuoteTick> {
290 let ts_event = UnixNanos::from(msg.timestamp);
291 let ts_init = UnixNanos::from(msg.local_timestamp);
292
293 let best_bid = msg
294 .bids
295 .first()
296 .context("missing best bid level for quote message")?;
297 let bid_price = Price::new(best_bid.price, price_precision);
298 let bid_size = Quantity::non_zero_checked(best_bid.amount, size_precision)
299 .with_context(|| format!("Invalid bid size for message: {msg:?}"))?;
300
301 let best_ask = msg
302 .asks
303 .first()
304 .context("missing best ask level for quote message")?;
305 let ask_price = Price::new(best_ask.price, price_precision);
306 let ask_size = Quantity::non_zero_checked(best_ask.amount, size_precision)
307 .with_context(|| format!("Invalid ask size for message: {msg:?}"))?;
308
309 Ok(QuoteTick::new(
310 instrument_id,
311 bid_price,
312 ask_price,
313 bid_size,
314 ask_size,
315 ts_event,
316 ts_init,
317 ))
318}
319
320pub fn parse_trade_msg(
327 msg: TradeMsg,
328 price_precision: u8,
329 size_precision: u8,
330 instrument_id: InstrumentId,
331) -> anyhow::Result<TradeTick> {
332 let price = Price::new(msg.price, price_precision);
333 let size = Quantity::non_zero_checked(msg.amount, size_precision)
334 .with_context(|| format!("Invalid trade size in message: {msg:?}"))?;
335 let aggressor_side = parse_aggressor_side(&msg.side);
336 let trade_id = TradeId::new(msg.id.unwrap_or_else(|| Uuid::new_v4().to_string()));
337 let ts_event = UnixNanos::from(msg.timestamp);
338 let ts_init = UnixNanos::from(msg.local_timestamp);
339
340 Ok(TradeTick::new(
341 instrument_id,
342 price,
343 size,
344 aggressor_side,
345 trade_id,
346 ts_event,
347 ts_init,
348 ))
349}
350
351#[must_use]
352pub fn parse_bar_msg(
353 msg: BarMsg,
354 price_precision: u8,
355 size_precision: u8,
356 instrument_id: InstrumentId,
357) -> Bar {
358 let spec = parse_bar_spec(&msg.name);
359 let bar_type = BarType::new(instrument_id, spec, AggregationSource::External);
360
361 let open = Price::new(msg.open, price_precision);
362 let high = Price::new(msg.high, price_precision);
363 let low = Price::new(msg.low, price_precision);
364 let close = Price::new(msg.close, price_precision);
365 let volume = Quantity::non_zero(msg.volume, size_precision);
366 let ts_event = UnixNanos::from(msg.timestamp);
367 let ts_init = UnixNanos::from(msg.local_timestamp);
368
369 Bar::new(bar_type, open, high, low, close, volume, ts_event, ts_init)
370}
371
#[cfg(test)]
mod tests {
    use nautilus_model::enums::{AggressorSide, BookAction};
    use rstest::rstest;

    use super::*;
    use crate::tests::load_test_json;

    /// Instrument shared by every test in this module.
    fn xbtusd_bitmex() -> InstrumentId {
        InstrumentId::from("XBTUSD.BITMEX")
    }

    #[rstest]
    fn test_parse_book_change_message() {
        let msg: BookChangeMsg =
            serde_json::from_str(&load_test_json("book_change.json")).unwrap();
        let instrument_id = xbtusd_bitmex();
        let (price_precision, size_precision) = (0, 0);

        let deltas =
            parse_book_change_msg_as_deltas(msg, price_precision, size_precision, instrument_id)
                .unwrap();

        // Event and init timestamps are expected to be the same for this fixture
        let expected_ts = UnixNanos::from(1571830193469000000);

        // Batch-level expectations
        assert_eq!(deltas.deltas.len(), 1);
        assert_eq!(deltas.instrument_id, instrument_id);
        assert_eq!(deltas.flags, RecordFlag::F_LAST.value());
        assert_eq!(deltas.sequence, 0);
        assert_eq!(deltas.ts_event, expected_ts);
        assert_eq!(deltas.ts_init, expected_ts);

        // Expectations for the single delta
        assert_eq!(
            deltas.deltas[0].instrument_id,
            InstrumentId::from("XBTUSD.BITMEX")
        );
        assert_eq!(deltas.deltas[0].action, BookAction::Update);
        assert_eq!(deltas.deltas[0].order.price, Price::from("7985"));
        assert_eq!(deltas.deltas[0].order.size, Quantity::from(283318));
        assert_eq!(deltas.deltas[0].order.order_id, 0);
        assert_eq!(deltas.deltas[0].flags, RecordFlag::F_LAST.value());
        assert_eq!(deltas.deltas[0].sequence, 0);
        assert_eq!(deltas.deltas[0].ts_event, expected_ts);
        assert_eq!(deltas.deltas[0].ts_init, expected_ts);
    }

    #[rstest]
    fn test_parse_book_snapshot_message_as_deltas() {
        let msg: BookSnapshotMsg =
            serde_json::from_str(&load_test_json("book_snapshot.json")).unwrap();
        let instrument_id = xbtusd_bitmex();
        let (price_precision, size_precision) = (1, 0);

        let deltas =
            parse_book_snapshot_msg_as_deltas(msg, price_precision, size_precision, instrument_id)
                .unwrap();
        let first_bid = deltas.deltas[0];
        let first_ask = deltas.deltas[2];

        let expected_ts_event = UnixNanos::from(1572010786950000000);
        let expected_ts_init = UnixNanos::from(1572010786961000000);

        // Batch-level expectations
        assert_eq!(deltas.deltas.len(), 4);
        assert_eq!(deltas.instrument_id, instrument_id);
        assert_eq!(
            deltas.flags,
            RecordFlag::F_LAST.value() + RecordFlag::F_SNAPSHOT.value()
        );
        assert_eq!(deltas.sequence, 0);
        assert_eq!(deltas.ts_event, expected_ts_event);
        assert_eq!(deltas.ts_init, expected_ts_init);

        // Delta at index 0 is on the buy side
        assert_eq!(first_bid.instrument_id, instrument_id);
        assert_eq!(first_bid.action, BookAction::Add);
        assert_eq!(first_bid.order.side, OrderSide::Buy);
        assert_eq!(first_bid.order.price, Price::from("7633.5"));
        assert_eq!(first_bid.order.size, Quantity::from(1906067));
        assert_eq!(first_bid.order.order_id, 0);
        assert_eq!(first_bid.flags, RecordFlag::F_SNAPSHOT.value());
        assert_eq!(first_bid.sequence, 0);
        assert_eq!(first_bid.ts_event, expected_ts_event);
        assert_eq!(first_bid.ts_init, expected_ts_init);

        // Delta at index 2 is on the sell side
        assert_eq!(first_ask.instrument_id, instrument_id);
        assert_eq!(first_ask.action, BookAction::Add);
        assert_eq!(first_ask.order.side, OrderSide::Sell);
        assert_eq!(first_ask.order.price, Price::from("7634.0"));
        assert_eq!(first_ask.order.size, Quantity::from(1467849));
        assert_eq!(first_ask.order.order_id, 0);
        assert_eq!(first_ask.flags, RecordFlag::F_SNAPSHOT.value());
        assert_eq!(first_ask.sequence, 0);
        assert_eq!(first_ask.ts_event, expected_ts_event);
        assert_eq!(first_ask.ts_init, expected_ts_init);
    }

    #[rstest]
    fn test_parse_book_snapshot_message_as_quote() {
        let msg: BookSnapshotMsg =
            serde_json::from_str(&load_test_json("book_snapshot.json")).unwrap();
        let instrument_id = xbtusd_bitmex();
        let (price_precision, size_precision) = (1, 0);

        let quote =
            parse_book_snapshot_msg_as_quote(msg, price_precision, size_precision, instrument_id)
                .expect("Failed to parse book snapshot quote message");

        assert_eq!(quote.instrument_id, instrument_id);
        assert_eq!(quote.bid_price, Price::from("7633.5"));
        assert_eq!(quote.bid_size, Quantity::from(1906067));
        assert_eq!(quote.ask_price, Price::from("7634.0"));
        assert_eq!(quote.ask_size, Quantity::from(1467849));
        assert_eq!(quote.ts_event, UnixNanos::from(1572010786950000000));
        assert_eq!(quote.ts_init, UnixNanos::from(1572010786961000000));
    }

    #[rstest]
    fn test_parse_trade_message() {
        let msg: TradeMsg = serde_json::from_str(&load_test_json("trade.json")).unwrap();
        let instrument_id = xbtusd_bitmex();
        let (price_precision, size_precision) = (0, 0);

        let trade = parse_trade_msg(msg, price_precision, size_precision, instrument_id)
            .expect("Failed to parse trade message");

        assert_eq!(trade.instrument_id, instrument_id);
        assert_eq!(trade.price, Price::from("7996"));
        assert_eq!(trade.size, Quantity::from(50));
        assert_eq!(trade.aggressor_side, AggressorSide::Seller);
        assert_eq!(trade.ts_event, UnixNanos::from(1571826769669000000));
        assert_eq!(trade.ts_init, UnixNanos::from(1571826769740000000));
    }

    #[rstest]
    fn test_parse_bar_message() {
        let msg: BarMsg = serde_json::from_str(&load_test_json("bar.json")).unwrap();
        let instrument_id = xbtusd_bitmex();
        let (price_precision, size_precision) = (1, 0);

        let bar = parse_bar_msg(msg, price_precision, size_precision, instrument_id);

        assert_eq!(
            bar.bar_type,
            BarType::from("XBTUSD.BITMEX-10000-MILLISECOND-LAST-EXTERNAL")
        );
        assert_eq!(bar.open, Price::from("7623.5"));
        assert_eq!(bar.high, Price::from("7623.5"));
        assert_eq!(bar.low, Price::from("7623"));
        assert_eq!(bar.close, Price::from("7623.5"));
        assert_eq!(bar.volume, Quantity::from(37034));
        assert_eq!(bar.ts_event, UnixNanos::from(1572009100000000000));
        assert_eq!(bar.ts_init, UnixNanos::from(1572009100369000000));
    }
}