nautilus_tardis/machine/
parse.rs

1// -------------------------------------------------------------------------------------------------
2//  Copyright (C) 2015-2025 Posei Systems Pty Ltd. All rights reserved.
3//  https://poseitrader.io
4//
5//  Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
6//  You may not use this file except in compliance with the License.
7//  You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
8//
9//  Unless required by applicable law or agreed to in writing, software
10//  distributed under the License is distributed on an "AS IS" BASIS,
11//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12//  See the License for the specific language governing permissions and
13//  limitations under the License.
14// -------------------------------------------------------------------------------------------------
15
16use std::sync::Arc;
17
18use anyhow::Context;
19use chrono::{DateTime, Utc};
20use nautilus_core::UnixNanos;
21use nautilus_model::{
22    data::{
23        Bar, BarType, BookOrder, Data, OrderBookDelta, OrderBookDeltas, OrderBookDeltas_API,
24        QuoteTick, TradeTick,
25    },
26    enums::{AggregationSource, BookAction, OrderSide, RecordFlag},
27    identifiers::{InstrumentId, TradeId},
28    types::{Price, Quantity},
29};
30use uuid::Uuid;
31
32use super::{
33    message::{BarMsg, BookChangeMsg, BookLevel, BookSnapshotMsg, TradeMsg, WsMessage},
34    types::InstrumentMiniInfo,
35};
36use crate::parse::{normalize_amount, parse_aggressor_side, parse_bar_spec, parse_book_action};
37
38#[must_use]
39pub fn parse_tardis_ws_message(msg: WsMessage, info: Arc<InstrumentMiniInfo>) -> Option<Data> {
40    match msg {
41        WsMessage::BookChange(msg) => {
42            if msg.bids.is_empty() && msg.asks.is_empty() {
43                tracing::error!(
44                    "Invalid book change for {} {} (empty bids and asks)",
45                    msg.exchange,
46                    msg.symbol
47                );
48                return None;
49            }
50
51            match parse_book_change_msg_as_deltas(
52                msg,
53                info.price_precision,
54                info.size_precision,
55                info.instrument_id,
56            ) {
57                Ok(deltas) => Some(Data::Deltas(deltas)),
58                Err(e) => {
59                    tracing::error!("Failed to parse book change message: {e}");
60                    None
61                }
62            }
63        }
64        WsMessage::BookSnapshot(msg) => match msg.bids.len() {
65            1 => {
66                match parse_book_snapshot_msg_as_quote(
67                    msg,
68                    info.price_precision,
69                    info.size_precision,
70                    info.instrument_id,
71                ) {
72                    Ok(quote) => Some(Data::Quote(quote)),
73                    Err(e) => {
74                        tracing::error!("Failed to parse book snapshot quote message: {e}");
75                        None
76                    }
77                }
78            }
79            _ => {
80                match parse_book_snapshot_msg_as_deltas(
81                    msg,
82                    info.price_precision,
83                    info.size_precision,
84                    info.instrument_id,
85                ) {
86                    Ok(deltas) => Some(Data::Deltas(deltas)),
87                    Err(e) => {
88                        tracing::error!("Failed to parse book snapshot message: {e}");
89                        None
90                    }
91                }
92            }
93        },
94        WsMessage::Trade(msg) => {
95            match parse_trade_msg(
96                msg,
97                info.price_precision,
98                info.size_precision,
99                info.instrument_id,
100            ) {
101                Ok(trade) => Some(Data::Trade(trade)),
102                Err(e) => {
103                    tracing::error!("Failed to parse trade message: {e}");
104                    None
105                }
106            }
107        }
108        WsMessage::TradeBar(msg) => Some(Data::Bar(parse_bar_msg(
109            msg,
110            info.price_precision,
111            info.size_precision,
112            info.instrument_id,
113        ))),
114        WsMessage::DerivativeTicker(_) => None,
115        WsMessage::Disconnect(_) => None,
116    }
117}
118
119/// Parse a book change message into order book deltas, returning an error if timestamps invalid.
120/// Parse a book change message into order book deltas.
121///
122/// # Errors
123///
124/// Returns an error if timestamp fields cannot be converted to nanoseconds.
125pub fn parse_book_change_msg_as_deltas(
126    msg: BookChangeMsg,
127    price_precision: u8,
128    size_precision: u8,
129    instrument_id: InstrumentId,
130) -> anyhow::Result<OrderBookDeltas_API> {
131    parse_book_msg_as_deltas(
132        msg.bids,
133        msg.asks,
134        msg.is_snapshot,
135        price_precision,
136        size_precision,
137        instrument_id,
138        msg.timestamp,
139        msg.local_timestamp,
140    )
141}
142
143/// Parse a book snapshot message into order book deltas, returning an error if timestamps invalid.
144/// Parse a book snapshot message into order book deltas.
145///
146/// # Errors
147///
148/// Returns an error if timestamp fields cannot be converted to nanoseconds.
149pub fn parse_book_snapshot_msg_as_deltas(
150    msg: BookSnapshotMsg,
151    price_precision: u8,
152    size_precision: u8,
153    instrument_id: InstrumentId,
154) -> anyhow::Result<OrderBookDeltas_API> {
155    parse_book_msg_as_deltas(
156        msg.bids,
157        msg.asks,
158        true,
159        price_precision,
160        size_precision,
161        instrument_id,
162        msg.timestamp,
163        msg.local_timestamp,
164    )
165}
166
167/// Parse raw book levels into order book deltas, returning error for invalid timestamps.
168#[allow(clippy::too_many_arguments)]
169/// Parse raw book levels into order book deltas.
170///
171/// # Errors
172///
173/// Returns an error if timestamp fields cannot be converted to nanoseconds.
174pub fn parse_book_msg_as_deltas(
175    bids: Vec<BookLevel>,
176    asks: Vec<BookLevel>,
177    is_snapshot: bool,
178    price_precision: u8,
179    size_precision: u8,
180    instrument_id: InstrumentId,
181    timestamp: DateTime<Utc>,
182    local_timestamp: DateTime<Utc>,
183) -> anyhow::Result<OrderBookDeltas_API> {
184    let event_nanos = timestamp
185        .timestamp_nanos_opt()
186        .context("invalid timestamp: cannot extract event nanoseconds")?;
187    let ts_event = UnixNanos::from(event_nanos as u64);
188    let init_nanos = local_timestamp
189        .timestamp_nanos_opt()
190        .context("invalid timestamp: cannot extract init nanoseconds")?;
191    let ts_init = UnixNanos::from(init_nanos as u64);
192
193    let mut deltas: Vec<OrderBookDelta> = Vec::with_capacity(bids.len() + asks.len());
194
195    for level in bids {
196        deltas.push(parse_book_level(
197            instrument_id,
198            price_precision,
199            size_precision,
200            OrderSide::Buy,
201            level,
202            is_snapshot,
203            ts_event,
204            ts_init,
205        ));
206    }
207
208    for level in asks {
209        deltas.push(parse_book_level(
210            instrument_id,
211            price_precision,
212            size_precision,
213            OrderSide::Sell,
214            level,
215            is_snapshot,
216            ts_event,
217            ts_init,
218        ));
219    }
220
221    if let Some(last_delta) = deltas.last_mut() {
222        last_delta.flags += RecordFlag::F_LAST.value();
223    }
224
225    // TODO: Opaque pointer wrapper necessary for Cython (remove once Cython gone)
226    Ok(OrderBookDeltas_API::new(OrderBookDeltas::new(
227        instrument_id,
228        deltas,
229    )))
230}
231
232#[must_use]
233/// Parse a single book level into an order book delta.
234///
235/// # Panics
236///
237/// Panics if a non-delete action has a zero size after normalization.
238#[allow(clippy::too_many_arguments)]
239pub fn parse_book_level(
240    instrument_id: InstrumentId,
241    price_precision: u8,
242    size_precision: u8,
243    side: OrderSide,
244    level: BookLevel,
245    is_snapshot: bool,
246    ts_event: UnixNanos,
247    ts_init: UnixNanos,
248) -> OrderBookDelta {
249    let amount = normalize_amount(level.amount, size_precision);
250    let action = parse_book_action(is_snapshot, amount);
251    let price = Price::new(level.price, price_precision);
252    let size = Quantity::new(amount, size_precision);
253    let order_id = 0; // Not applicable for L2 data
254    let order = BookOrder::new(side, price, size, order_id);
255    let flags = if is_snapshot {
256        RecordFlag::F_SNAPSHOT.value()
257    } else {
258        0
259    };
260    let sequence = 0; // Not available
261
262    assert!(
263        !(action != BookAction::Delete && size.is_zero()),
264        "Invalid zero size for {action}"
265    );
266
267    OrderBookDelta::new(
268        instrument_id,
269        action,
270        order,
271        flags,
272        sequence,
273        ts_event,
274        ts_init,
275    )
276}
277
278/// Parse a book snapshot message into a quote tick, returning an error on invalid data.
279/// Parse a book snapshot message into a quote tick.
280///
281/// # Errors
282///
283/// Returns an error if missing bid/ask levels or invalid sizes.
284pub fn parse_book_snapshot_msg_as_quote(
285    msg: BookSnapshotMsg,
286    price_precision: u8,
287    size_precision: u8,
288    instrument_id: InstrumentId,
289) -> anyhow::Result<QuoteTick> {
290    let ts_event = UnixNanos::from(msg.timestamp);
291    let ts_init = UnixNanos::from(msg.local_timestamp);
292
293    let best_bid = msg
294        .bids
295        .first()
296        .context("missing best bid level for quote message")?;
297    let bid_price = Price::new(best_bid.price, price_precision);
298    let bid_size = Quantity::non_zero_checked(best_bid.amount, size_precision)
299        .with_context(|| format!("Invalid bid size for message: {msg:?}"))?;
300
301    let best_ask = msg
302        .asks
303        .first()
304        .context("missing best ask level for quote message")?;
305    let ask_price = Price::new(best_ask.price, price_precision);
306    let ask_size = Quantity::non_zero_checked(best_ask.amount, size_precision)
307        .with_context(|| format!("Invalid ask size for message: {msg:?}"))?;
308
309    Ok(QuoteTick::new(
310        instrument_id,
311        bid_price,
312        ask_price,
313        bid_size,
314        ask_size,
315        ts_event,
316        ts_init,
317    ))
318}
319
320/// Parse a trade message into a trade tick, returning an error on invalid data.
321/// Parse a trade message into a trade tick.
322///
323/// # Errors
324///
325/// Returns an error if invalid trade size is encountered.
326pub fn parse_trade_msg(
327    msg: TradeMsg,
328    price_precision: u8,
329    size_precision: u8,
330    instrument_id: InstrumentId,
331) -> anyhow::Result<TradeTick> {
332    let price = Price::new(msg.price, price_precision);
333    let size = Quantity::non_zero_checked(msg.amount, size_precision)
334        .with_context(|| format!("Invalid trade size in message: {msg:?}"))?;
335    let aggressor_side = parse_aggressor_side(&msg.side);
336    let trade_id = TradeId::new(msg.id.unwrap_or_else(|| Uuid::new_v4().to_string()));
337    let ts_event = UnixNanos::from(msg.timestamp);
338    let ts_init = UnixNanos::from(msg.local_timestamp);
339
340    Ok(TradeTick::new(
341        instrument_id,
342        price,
343        size,
344        aggressor_side,
345        trade_id,
346        ts_event,
347        ts_init,
348    ))
349}
350
351#[must_use]
352pub fn parse_bar_msg(
353    msg: BarMsg,
354    price_precision: u8,
355    size_precision: u8,
356    instrument_id: InstrumentId,
357) -> Bar {
358    let spec = parse_bar_spec(&msg.name);
359    let bar_type = BarType::new(instrument_id, spec, AggregationSource::External);
360
361    let open = Price::new(msg.open, price_precision);
362    let high = Price::new(msg.high, price_precision);
363    let low = Price::new(msg.low, price_precision);
364    let close = Price::new(msg.close, price_precision);
365    let volume = Quantity::non_zero(msg.volume, size_precision);
366    let ts_event = UnixNanos::from(msg.timestamp);
367    let ts_init = UnixNanos::from(msg.local_timestamp);
368
369    Bar::new(bar_type, open, high, low, close, volume, ts_event, ts_init)
370}
371
372////////////////////////////////////////////////////////////////////////////////
373// Tests
374////////////////////////////////////////////////////////////////////////////////
#[cfg(test)]
mod tests {
    use nautilus_model::enums::{AggressorSide, BookAction};
    use rstest::rstest;

    use super::*;
    use crate::tests::load_test_json;

    /// Instrument ID shared by every fixture-based test in this module.
    fn xbtusd_bitmex() -> InstrumentId {
        InstrumentId::from("XBTUSD.BITMEX")
    }

    #[rstest]
    fn test_parse_book_change_message() {
        let raw = load_test_json("book_change.json");
        let msg: BookChangeMsg = serde_json::from_str(&raw).unwrap();
        let instrument_id = xbtusd_bitmex();

        let deltas = parse_book_change_msg_as_deltas(msg, 0, 0, instrument_id).unwrap();

        // Event and init timestamps are identical in this fixture
        let expected_ts = UnixNanos::from(1571830193469000000);

        // Aggregate view
        assert_eq!(deltas.deltas.len(), 1);
        assert_eq!(deltas.instrument_id, instrument_id);
        assert_eq!(deltas.flags, RecordFlag::F_LAST.value());
        assert_eq!(deltas.sequence, 0);
        assert_eq!(deltas.ts_event, expected_ts);
        assert_eq!(deltas.ts_init, expected_ts);

        // The single delta
        let delta = &deltas.deltas[0];
        assert_eq!(delta.instrument_id, xbtusd_bitmex());
        assert_eq!(delta.action, BookAction::Update);
        assert_eq!(delta.order.price, Price::from("7985"));
        assert_eq!(delta.order.size, Quantity::from(283318));
        assert_eq!(delta.order.order_id, 0);
        assert_eq!(delta.flags, RecordFlag::F_LAST.value());
        assert_eq!(delta.sequence, 0);
        assert_eq!(delta.ts_event, expected_ts);
        assert_eq!(delta.ts_init, expected_ts);
    }

    #[rstest]
    fn test_parse_book_snapshot_message_as_deltas() {
        let raw = load_test_json("book_snapshot.json");
        let msg: BookSnapshotMsg = serde_json::from_str(&raw).unwrap();
        let instrument_id = xbtusd_bitmex();

        let deltas = parse_book_snapshot_msg_as_deltas(msg, 1, 0, instrument_id).unwrap();
        let first_bid = deltas.deltas[0];
        let first_ask = deltas.deltas[2];

        let expected_ts_event = UnixNanos::from(1572010786950000000);
        let expected_ts_init = UnixNanos::from(1572010786961000000);

        // Aggregate view
        assert_eq!(deltas.deltas.len(), 4);
        assert_eq!(deltas.instrument_id, instrument_id);
        assert_eq!(
            deltas.flags,
            RecordFlag::F_LAST.value() + RecordFlag::F_SNAPSHOT.value()
        );
        assert_eq!(deltas.sequence, 0);
        assert_eq!(deltas.ts_event, expected_ts_event);
        assert_eq!(deltas.ts_init, expected_ts_init);

        // First bid level, then first ask level
        let expectations = [
            (
                first_bid,
                OrderSide::Buy,
                Price::from("7633.5"),
                Quantity::from(1906067),
            ),
            (
                first_ask,
                OrderSide::Sell,
                Price::from("7634.0"),
                Quantity::from(1467849),
            ),
        ];
        for (delta, side, price, size) in expectations {
            assert_eq!(delta.instrument_id, instrument_id);
            assert_eq!(delta.action, BookAction::Add);
            assert_eq!(delta.order.side, side);
            assert_eq!(delta.order.price, price);
            assert_eq!(delta.order.size, size);
            assert_eq!(delta.order.order_id, 0);
            assert_eq!(delta.flags, RecordFlag::F_SNAPSHOT.value());
            assert_eq!(delta.sequence, 0);
            assert_eq!(delta.ts_event, expected_ts_event);
            assert_eq!(delta.ts_init, expected_ts_init);
        }
    }

    #[rstest]
    fn test_parse_book_snapshot_message_as_quote() {
        let raw = load_test_json("book_snapshot.json");
        let msg: BookSnapshotMsg = serde_json::from_str(&raw).unwrap();
        let instrument_id = xbtusd_bitmex();

        let quote = parse_book_snapshot_msg_as_quote(msg, 1, 0, instrument_id)
            .expect("Failed to parse book snapshot quote message");

        assert_eq!(quote.instrument_id, instrument_id);
        assert_eq!(quote.bid_price, Price::from("7633.5"));
        assert_eq!(quote.bid_size, Quantity::from(1906067));
        assert_eq!(quote.ask_price, Price::from("7634.0"));
        assert_eq!(quote.ask_size, Quantity::from(1467849));
        assert_eq!(quote.ts_event, UnixNanos::from(1572010786950000000));
        assert_eq!(quote.ts_init, UnixNanos::from(1572010786961000000));
    }

    #[rstest]
    fn test_parse_trade_message() {
        let raw = load_test_json("trade.json");
        let msg: TradeMsg = serde_json::from_str(&raw).unwrap();
        let instrument_id = xbtusd_bitmex();

        let trade =
            parse_trade_msg(msg, 0, 0, instrument_id).expect("Failed to parse trade message");

        assert_eq!(trade.instrument_id, instrument_id);
        assert_eq!(trade.price, Price::from("7996"));
        assert_eq!(trade.size, Quantity::from(50));
        assert_eq!(trade.aggressor_side, AggressorSide::Seller);
        assert_eq!(trade.ts_event, UnixNanos::from(1571826769669000000));
        assert_eq!(trade.ts_init, UnixNanos::from(1571826769740000000));
    }

    #[rstest]
    fn test_parse_bar_message() {
        let raw = load_test_json("bar.json");
        let msg: BarMsg = serde_json::from_str(&raw).unwrap();
        let instrument_id = xbtusd_bitmex();

        let bar = parse_bar_msg(msg, 1, 0, instrument_id);

        assert_eq!(
            bar.bar_type,
            BarType::from("XBTUSD.BITMEX-10000-MILLISECOND-LAST-EXTERNAL")
        );
        assert_eq!(bar.open, Price::from("7623.5"));
        assert_eq!(bar.high, Price::from("7623.5"));
        assert_eq!(bar.low, Price::from("7623"));
        assert_eq!(bar.close, Price::from("7623.5"));
        assert_eq!(bar.volume, Quantity::from(37034));
        assert_eq!(bar.ts_event, UnixNanos::from(1572009100000000000));
        assert_eq!(bar.ts_init, UnixNanos::from(1572009100369000000));
    }
}
528}