nautilus_tardis/csv/mod.rs

// -------------------------------------------------------------------------------------------------
//  Copyright (C) 2015-2025 Posei Systems Pty Ltd. All rights reserved.
//  https://poseitrader.io
//
//  Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
//  You may not use this file except in compliance with the License.
//  You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
//
//  Unless required by applicable law or agreed to in writing, software
//  distributed under the License is distributed on an "AS IS" BASIS,
//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
//  See the License for the specific language governing permissions and
//  limitations under the License.
// -------------------------------------------------------------------------------------------------

mod record;

use std::{
    error::Error,
    ffi::OsStr,
    fs::File,
    io::{BufReader, Read, Seek, SeekFrom},
    path::Path,
    time::Duration,
};

use csv::{Reader, ReaderBuilder, StringRecord};
use flate2::read::GzDecoder;
use nautilus_core::UnixNanos;
use nautilus_model::{
    data::{
        BookOrder, DEPTH10_LEN, NULL_ORDER, OrderBookDelta, OrderBookDepth10, QuoteTick, TradeTick,
    },
    enums::{BookAction, OrderSide, RecordFlag},
    identifiers::{InstrumentId, TradeId},
    types::{Quantity, fixed::FIXED_PRECISION},
};

use super::{
    csv::record::{
        TardisBookUpdateRecord, TardisOrderBookSnapshot5Record, TardisOrderBookSnapshot25Record,
        TardisQuoteRecord, TardisTradeRecord,
    },
    parse::{
        parse_aggressor_side, parse_book_action, parse_instrument_id, parse_order_side,
        parse_timestamp,
    },
};
use crate::parse::parse_price;

/// Infers the number of decimal places in `value` from its shortest round-trip
/// string representation (e.g. 6421.5 -> 1, 1000.0 -> 0).
fn infer_precision(value: f64) -> u8 {
    let str_value = value.to_string(); // Single allocation
    match str_value.find('.') {
        Some(decimal_idx) => (str_value.len() - decimal_idx - 1) as u8,
        None => 0,
    }
}

fn create_csv_reader<P: AsRef<Path>>(
    filepath: P,
) -> anyhow::Result<Reader<Box<dyn std::io::Read>>> {
    let filepath_ref = filepath.as_ref();
    const MAX_RETRIES: u8 = 3;
    const DELAY_MS: u64 = 100;

    fn open_file_with_retry<P: AsRef<Path>>(
        path: P,
        max_retries: u8,
        delay_ms: u64,
    ) -> anyhow::Result<File> {
        let path_ref = path.as_ref();
        for attempt in 1..=max_retries {
            match File::open(path_ref) {
                Ok(file) => return Ok(file),
                Err(e) => {
                    if attempt == max_retries {
                        anyhow::bail!(
                            "Failed to open file '{path_ref:?}' after {max_retries} attempts: {e}"
                        );
                    }
                    eprintln!(
                        "Attempt {attempt}/{max_retries} failed to open file '{path_ref:?}': {e}. Retrying after {delay_ms}ms..."
                    );
                    std::thread::sleep(Duration::from_millis(delay_ms));
                }
            }
        }
        unreachable!("Loop should return either Ok or Err");
    }

    let mut file = open_file_with_retry(filepath_ref, MAX_RETRIES, DELAY_MS)?;

    let is_gzipped = filepath_ref
        .extension()
        .and_then(OsStr::to_str)
        .is_some_and(|ext| ext.eq_ignore_ascii_case("gz"));

    if !is_gzipped {
        let buf_reader = BufReader::new(file);
        return Ok(ReaderBuilder::new()
            .has_headers(true)
            .from_reader(Box::new(buf_reader)));
    }

    let file_size = file.metadata()?.len();
    if file_size < 2 {
        anyhow::bail!("File too small to be a valid gzip file");
    }

    let mut header_buf = [0u8; 2];
    for attempt in 1..=MAX_RETRIES {
        match file.read_exact(&mut header_buf) {
            Ok(()) => break,
            Err(e) => {
                if attempt == MAX_RETRIES {
                    anyhow::bail!(
                        "Failed to read gzip header from '{filepath_ref:?}' after {MAX_RETRIES} attempts: {e}"
                    );
                }
                eprintln!(
                    "Attempt {attempt}/{MAX_RETRIES} failed to read header from '{filepath_ref:?}': {e}. Retrying after {DELAY_MS}ms..."
                );
                std::thread::sleep(Duration::from_millis(DELAY_MS));
            }
        }
    }

    if header_buf[0] != 0x1f || header_buf[1] != 0x8b {
        anyhow::bail!("File '{filepath_ref:?}' has .gz extension but invalid gzip header");
    }

    for attempt in 1..=MAX_RETRIES {
        match file.seek(SeekFrom::Start(0)) {
            Ok(_) => break,
            Err(e) => {
                if attempt == MAX_RETRIES {
                    anyhow::bail!(
                        "Failed to reset file position for '{filepath_ref:?}' after {MAX_RETRIES} attempts: {e}"
                    );
                }
                eprintln!(
                    "Attempt {attempt}/{MAX_RETRIES} failed to seek in '{filepath_ref:?}': {e}. Retrying after {DELAY_MS}ms..."
                );
                std::thread::sleep(Duration::from_millis(DELAY_MS));
            }
        }
    }

    let buf_reader = BufReader::new(file);
    let decoder = GzDecoder::new(buf_reader);

    Ok(ReaderBuilder::new()
        .has_headers(true)
        .from_reader(Box::new(decoder)))
}

/// Loads [`OrderBookDelta`]s from a Tardis format CSV at the given `filepath`,
/// automatically applying `GZip` decompression for files ending in ".gz".
///
/// # Errors
///
/// Returns an error if the file cannot be opened, read, or parsed as CSV.
///
/// # Panics
///
/// Panics if a CSV record has a zero size for a non-delete action or if data conversion fails.
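///
/// # Examples
///
/// A minimal usage sketch with inferred precisions (the CSV path here is hypothetical):
///
/// ```ignore
/// let deltas = load_deltas("deribit_book_l2.csv.gz", None, None, None, Some(1_000)).unwrap();
/// assert!(deltas.len() <= 1_000);
/// ```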
pub fn load_deltas<P: AsRef<Path>>(
    filepath: P,
    price_precision: Option<u8>,
    size_precision: Option<u8>,
    instrument_id: Option<InstrumentId>,
    limit: Option<usize>,
) -> Result<Vec<OrderBookDelta>, Box<dyn Error>> {
    // Infer precisions if not provided
    let (price_precision, size_precision) = match (price_precision, size_precision) {
        (Some(p), Some(s)) => (p, s),
        (price_precision, size_precision) => {
            let mut reader = create_csv_reader(&filepath)?;
            let mut record = StringRecord::new();

            let mut max_price_precision = 0u8;
            let mut max_size_precision = 0u8;
            let mut count = 0;

            while reader.read_record(&mut record)? {
                let parsed: TardisBookUpdateRecord = record.deserialize(None)?;

                if price_precision.is_none() {
                    max_price_precision = infer_precision(parsed.price).max(max_price_precision);
                }

                if size_precision.is_none() {
                    max_size_precision = infer_precision(parsed.amount).max(max_size_precision);
                }

                if let Some(limit) = limit {
                    if count >= limit {
                        break;
                    }
                    count += 1;
                }
            }

            drop(reader);

            max_price_precision = max_price_precision.min(FIXED_PRECISION);
            max_size_precision = max_size_precision.min(FIXED_PRECISION);

            (
                price_precision.unwrap_or(max_price_precision),
                size_precision.unwrap_or(max_size_precision),
            )
        }
    };

    let mut deltas: Vec<OrderBookDelta> = Vec::new();
    let mut last_ts_event = UnixNanos::default();

    let mut reader = create_csv_reader(filepath)?;
    let mut record = StringRecord::new();

    while reader.read_record(&mut record)? {
        let record: TardisBookUpdateRecord = record.deserialize(None)?;

        let instrument_id = match &instrument_id {
            Some(id) => *id,
            None => parse_instrument_id(&record.exchange, record.symbol),
        };
        let side = parse_order_side(&record.side);
        let price = parse_price(record.price, price_precision);
        let size = Quantity::new(record.amount, size_precision);
        let order_id = 0; // Not applicable for L2 data
        let order = BookOrder::new(side, price, size, order_id);

        let action = parse_book_action(record.is_snapshot, size.as_f64());
        let flags = 0; // Flags always zero until timestamp changes
        let sequence = 0; // Sequence not available
        let ts_event = parse_timestamp(record.timestamp);
        let ts_init = parse_timestamp(record.local_timestamp);

        // Check if timestamp is different from last timestamp
        if last_ts_event != ts_event {
            if let Some(last_delta) = deltas.last_mut() {
                // Set previous delta flags as F_LAST
                last_delta.flags = RecordFlag::F_LAST.value();
            }
        }

        assert!(
            !(action != BookAction::Delete && size.is_zero()),
            "Invalid delta: action {action} when size zero, check size_precision ({size_precision}) vs data; {record:?}"
        );

        last_ts_event = ts_event;

        let delta = OrderBookDelta::new(
            instrument_id,
            action,
            order,
            flags,
            sequence,
            ts_event,
            ts_init,
        );

        deltas.push(delta);

        if let Some(limit) = limit {
            if deltas.len() >= limit {
                break;
            }
        }
    }

    // Set F_LAST flag for final delta
    if let Some(last_delta) = deltas.last_mut() {
        last_delta.flags = RecordFlag::F_LAST.value();
    }

    Ok(deltas)
}

/// Builds a [`BookOrder`] and its level count from optional price/amount values,
/// returning `NULL_ORDER` with a count of zero when no price is present.
fn create_book_order(
    side: OrderSide,
    price: Option<f64>,
    amount: Option<f64>,
    price_precision: u8,
    size_precision: u8,
) -> (BookOrder, u32) {
    match price {
        Some(price) => (
            BookOrder::new(
                side,
                parse_price(price, price_precision),
                Quantity::new(amount.unwrap_or(0.0), size_precision),
                0,
            ),
            1, // Count set to 1 if order exists
        ),
        None => (NULL_ORDER, 0), // NULL_ORDER if price is None
    }
}

/// Loads [`OrderBookDepth10`]s from a Tardis format order book snapshot-5 CSV at the
/// given `filepath`, automatically applying `GZip` decompression for files ending in ".gz".
///
/// # Errors
///
/// Returns an error if the file cannot be opened, read, or parsed as CSV.
///
/// # Panics
///
/// Panics if a record level cannot be parsed to depth-10.
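///
/// # Examples
///
/// A minimal usage sketch with explicit precisions (the CSV path here is hypothetical):
///
/// ```ignore
/// let depths =
///     load_depth10_from_snapshot5("binance_snapshot5.csv.gz", Some(2), Some(3), None, None).unwrap();
/// ```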
pub fn load_depth10_from_snapshot5<P: AsRef<Path>>(
    filepath: P,
    price_precision: Option<u8>,
    size_precision: Option<u8>,
    instrument_id: Option<InstrumentId>,
    limit: Option<usize>,
) -> Result<Vec<OrderBookDepth10>, Box<dyn Error>> {
    // Infer precisions if not provided
    let (price_precision, size_precision) = match (price_precision, size_precision) {
        (Some(p), Some(s)) => (p, s),
        (price_precision, size_precision) => {
            let mut reader = create_csv_reader(&filepath)?;
            let mut record = StringRecord::new();

            let mut max_price_precision = 0u8;
            let mut max_size_precision = 0u8;
            let mut count = 0;

            while reader.read_record(&mut record)? {
                let parsed: TardisOrderBookSnapshot5Record = record.deserialize(None)?;

                if price_precision.is_none() {
                    if let Some(bid_price) = parsed.bids_0_price {
                        max_price_precision = infer_precision(bid_price).max(max_price_precision);
                    }
                }

                if size_precision.is_none() {
                    if let Some(bid_amount) = parsed.bids_0_amount {
                        max_size_precision = infer_precision(bid_amount).max(max_size_precision);
                    }
                }

                if let Some(limit) = limit {
                    if count >= limit {
                        break;
                    }
                    count += 1;
                }
            }

            drop(reader);

            max_price_precision = max_price_precision.min(FIXED_PRECISION);
            max_size_precision = max_size_precision.min(FIXED_PRECISION);

            (
                price_precision.unwrap_or(max_price_precision),
                size_precision.unwrap_or(max_size_precision),
            )
        }
    };

    let mut depths: Vec<OrderBookDepth10> = Vec::new();

    let mut reader = create_csv_reader(filepath)?;
    let mut record = StringRecord::new();
    while reader.read_record(&mut record)? {
        let record: TardisOrderBookSnapshot5Record = record.deserialize(None)?;
        let instrument_id = match &instrument_id {
            Some(id) => *id,
            None => parse_instrument_id(&record.exchange, record.symbol),
        };
        let flags = RecordFlag::F_LAST.value();
        let sequence = 0; // Sequence not available
        let ts_event = parse_timestamp(record.timestamp);
        let ts_init = parse_timestamp(record.local_timestamp);

        // Initialize empty arrays
        let mut bids = [NULL_ORDER; DEPTH10_LEN];
        let mut asks = [NULL_ORDER; DEPTH10_LEN];
        let mut bid_counts = [0u32; DEPTH10_LEN];
        let mut ask_counts = [0u32; DEPTH10_LEN];

        for i in 0..=4 {
            // Create bids
            let (bid_order, bid_count) = create_book_order(
                OrderSide::Buy,
                match i {
                    0 => record.bids_0_price,
                    1 => record.bids_1_price,
                    2 => record.bids_2_price,
                    3 => record.bids_3_price,
                    4 => record.bids_4_price,
                    _ => panic!("Invalid level for snapshot5 -> depth10 parsing"),
                },
                match i {
                    0 => record.bids_0_amount,
                    1 => record.bids_1_amount,
                    2 => record.bids_2_amount,
                    3 => record.bids_3_amount,
                    4 => record.bids_4_amount,
                    _ => panic!("Invalid level for snapshot5 -> depth10 parsing"),
                },
                price_precision,
                size_precision,
            );
            bids[i] = bid_order;
            bid_counts[i] = bid_count;

            // Create asks
            let (ask_order, ask_count) = create_book_order(
                OrderSide::Sell,
                match i {
                    0 => record.asks_0_price,
                    1 => record.asks_1_price,
                    2 => record.asks_2_price,
                    3 => record.asks_3_price,
                    4 => record.asks_4_price,
                    _ => None, // Unreachable, but for safety
                },
                match i {
                    0 => record.asks_0_amount,
                    1 => record.asks_1_amount,
                    2 => record.asks_2_amount,
                    3 => record.asks_3_amount,
                    4 => record.asks_4_amount,
                    _ => None, // Unreachable, but for safety
                },
                price_precision,
                size_precision,
            );
            asks[i] = ask_order;
            ask_counts[i] = ask_count;
        }

        let depth = OrderBookDepth10::new(
            instrument_id,
            bids,
            asks,
            bid_counts,
            ask_counts,
            flags,
            sequence,
            ts_event,
            ts_init,
        );

        depths.push(depth);

        if let Some(limit) = limit {
            if depths.len() >= limit {
                break;
            }
        }
    }

    Ok(depths)
}

/// Loads [`OrderBookDepth10`]s from a Tardis format order book snapshot-25 CSV at the
/// given `filepath`, automatically applying `GZip` decompression for files ending in ".gz".
/// Only the first ten levels of each snapshot are retained.
///
/// # Errors
///
/// Returns an error if the file cannot be opened, read, or parsed as CSV.
///
/// # Panics
///
/// Panics if a record level cannot be parsed to depth-10.
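///
/// # Examples
///
/// A minimal usage sketch with inferred precisions (the CSV path here is hypothetical):
///
/// ```ignore
/// let depths =
///     load_depth10_from_snapshot25("binance_snapshot25.csv.gz", None, None, None, Some(100)).unwrap();
/// ```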
pub fn load_depth10_from_snapshot25<P: AsRef<Path>>(
    filepath: P,
    price_precision: Option<u8>,
    size_precision: Option<u8>,
    instrument_id: Option<InstrumentId>,
    limit: Option<usize>,
) -> Result<Vec<OrderBookDepth10>, Box<dyn Error>> {
    // Infer precisions if not provided
    let (price_precision, size_precision) = match (price_precision, size_precision) {
        (Some(p), Some(s)) => (p, s),
        (price_precision, size_precision) => {
            let mut reader = create_csv_reader(&filepath)?;
            let mut record = StringRecord::new();

            let mut max_price_precision = 0u8;
            let mut max_size_precision = 0u8;
            let mut count = 0;

            while reader.read_record(&mut record)? {
                let parsed: TardisOrderBookSnapshot25Record = record.deserialize(None)?;

                if price_precision.is_none() {
                    if let Some(bid_price) = parsed.bids_0_price {
                        max_price_precision = infer_precision(bid_price).max(max_price_precision);
                    }
                }

                if size_precision.is_none() {
                    if let Some(bid_amount) = parsed.bids_0_amount {
                        max_size_precision = infer_precision(bid_amount).max(max_size_precision);
                    }
                }

                if let Some(limit) = limit {
                    if count >= limit {
                        break;
                    }
                    count += 1;
                }
            }

            drop(reader);

            max_price_precision = max_price_precision.min(FIXED_PRECISION);
            max_size_precision = max_size_precision.min(FIXED_PRECISION);

            (
                price_precision.unwrap_or(max_price_precision),
                size_precision.unwrap_or(max_size_precision),
            )
        }
    };

    let mut depths: Vec<OrderBookDepth10> = Vec::new();
    let mut reader = create_csv_reader(filepath)?;
    let mut record = StringRecord::new();

    while reader.read_record(&mut record)? {
        let record: TardisOrderBookSnapshot25Record = record.deserialize(None)?;

        let instrument_id = match &instrument_id {
            Some(id) => *id,
            None => parse_instrument_id(&record.exchange, record.symbol),
        };
        let flags = RecordFlag::F_LAST.value();
        let sequence = 0; // Sequence not available
        let ts_event = parse_timestamp(record.timestamp);
        let ts_init = parse_timestamp(record.local_timestamp);

        // Initialize empty arrays for the first 10 levels only
        let mut bids = [NULL_ORDER; DEPTH10_LEN];
        let mut asks = [NULL_ORDER; DEPTH10_LEN];
        let mut bid_counts = [0u32; DEPTH10_LEN];
        let mut ask_counts = [0u32; DEPTH10_LEN];

        // Fill only the first 10 levels from the 25-level record
        for i in 0..DEPTH10_LEN {
            // Create bids
            let (bid_order, bid_count) = create_book_order(
                OrderSide::Buy,
                match i {
                    0 => record.bids_0_price,
                    1 => record.bids_1_price,
                    2 => record.bids_2_price,
                    3 => record.bids_3_price,
                    4 => record.bids_4_price,
                    5 => record.bids_5_price,
                    6 => record.bids_6_price,
                    7 => record.bids_7_price,
                    8 => record.bids_8_price,
                    9 => record.bids_9_price,
                    _ => panic!("Invalid level for snapshot25 -> depth10 parsing"),
                },
                match i {
                    0 => record.bids_0_amount,
                    1 => record.bids_1_amount,
                    2 => record.bids_2_amount,
                    3 => record.bids_3_amount,
                    4 => record.bids_4_amount,
                    5 => record.bids_5_amount,
                    6 => record.bids_6_amount,
                    7 => record.bids_7_amount,
                    8 => record.bids_8_amount,
                    9 => record.bids_9_amount,
                    _ => panic!("Invalid level for snapshot25 -> depth10 parsing"),
                },
                price_precision,
                size_precision,
            );
            bids[i] = bid_order;
            bid_counts[i] = bid_count;

            // Create asks
            let (ask_order, ask_count) = create_book_order(
                OrderSide::Sell,
                match i {
                    0 => record.asks_0_price,
                    1 => record.asks_1_price,
                    2 => record.asks_2_price,
                    3 => record.asks_3_price,
                    4 => record.asks_4_price,
                    5 => record.asks_5_price,
                    6 => record.asks_6_price,
                    7 => record.asks_7_price,
                    8 => record.asks_8_price,
                    9 => record.asks_9_price,
                    _ => panic!("Invalid level for snapshot25 -> depth10 parsing"),
                },
                match i {
                    0 => record.asks_0_amount,
                    1 => record.asks_1_amount,
                    2 => record.asks_2_amount,
                    3 => record.asks_3_amount,
                    4 => record.asks_4_amount,
                    5 => record.asks_5_amount,
                    6 => record.asks_6_amount,
                    7 => record.asks_7_amount,
                    8 => record.asks_8_amount,
                    9 => record.asks_9_amount,
                    _ => panic!("Invalid level for snapshot25 -> depth10 parsing"),
                },
                price_precision,
                size_precision,
            );
            asks[i] = ask_order;
            ask_counts[i] = ask_count;
        }

        let depth = OrderBookDepth10::new(
            instrument_id,
            bids,
            asks,
            bid_counts,
            ask_counts,
            flags,
            sequence,
            ts_event,
            ts_init,
        );

        depths.push(depth);

        if let Some(limit) = limit {
            if depths.len() >= limit {
                break;
            }
        }
    }

    Ok(depths)
}

/// Loads [`QuoteTick`]s from a Tardis format quotes CSV at the given `filepath`,
/// automatically applying `GZip` decompression for files ending in ".gz".
///
/// # Errors
///
/// Returns an error if the file cannot be opened, read, or parsed as CSV.
///
/// # Panics
///
/// Panics if a record contains values that fail conversion into the target types.
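///
/// # Examples
///
/// A minimal usage sketch overriding the instrument ID (path and ID here are hypothetical):
///
/// ```ignore
/// let instrument_id = InstrumentId::from("BTC-USD.HUOBI_DELIVERY");
/// let quotes = load_quote_ticks("huobi_quotes.csv.gz", None, None, Some(instrument_id), None).unwrap();
/// ```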
pub fn load_quote_ticks<P: AsRef<Path>>(
    filepath: P,
    price_precision: Option<u8>,
    size_precision: Option<u8>,
    instrument_id: Option<InstrumentId>,
    limit: Option<usize>,
) -> Result<Vec<QuoteTick>, Box<dyn Error>> {
    // Infer precisions if not provided
    let (price_precision, size_precision) = match (price_precision, size_precision) {
        (Some(p), Some(s)) => (p, s),
        (price_precision, size_precision) => {
            let mut reader = create_csv_reader(&filepath)?;
            let mut record = StringRecord::new();

            let mut max_price_precision = 0u8;
            let mut max_size_precision = 0u8;
            let mut count = 0;

            while reader.read_record(&mut record)? {
                let parsed: TardisQuoteRecord = record.deserialize(None)?;

                if price_precision.is_none() {
                    if let Some(bid_price) = parsed.bid_price {
                        max_price_precision = infer_precision(bid_price).max(max_price_precision);
                    }
                }

                if size_precision.is_none() {
                    if let Some(bid_amount) = parsed.bid_amount {
                        max_size_precision = infer_precision(bid_amount).max(max_size_precision);
                    }
                }

                if let Some(limit) = limit {
                    if count >= limit {
                        break;
                    }
                    count += 1;
                }
            }

            drop(reader);

            max_price_precision = max_price_precision.min(FIXED_PRECISION);
            max_size_precision = max_size_precision.min(FIXED_PRECISION);

            (
                price_precision.unwrap_or(max_price_precision),
                size_precision.unwrap_or(max_size_precision),
            )
        }
    };

    let mut quotes = Vec::new();
    let mut reader = create_csv_reader(filepath)?;
    let mut record = StringRecord::new();

    while reader.read_record(&mut record)? {
        let record: TardisQuoteRecord = record.deserialize(None)?;

        let instrument_id = match &instrument_id {
            Some(id) => *id,
            None => parse_instrument_id(&record.exchange, record.symbol),
        };
        let bid_price = parse_price(record.bid_price.unwrap_or(0.0), price_precision);
        let bid_size = Quantity::new(record.bid_amount.unwrap_or(0.0), size_precision);
        let ask_price = parse_price(record.ask_price.unwrap_or(0.0), price_precision);
        let ask_size = Quantity::new(record.ask_amount.unwrap_or(0.0), size_precision);
        let ts_event = parse_timestamp(record.timestamp);
        let ts_init = parse_timestamp(record.local_timestamp);

        let quote = QuoteTick::new(
            instrument_id,
            bid_price,
            ask_price,
            bid_size,
            ask_size,
            ts_event,
            ts_init,
        );

        quotes.push(quote);

        if let Some(limit) = limit {
            if quotes.len() >= limit {
                break;
            }
        }
    }

    Ok(quotes)
}

/// Loads [`TradeTick`]s from a Tardis format trades CSV at the given `filepath`,
/// automatically applying `GZip` decompression for files ending in ".gz".
///
/// # Errors
///
/// Returns an error if the file cannot be opened, read, or parsed as CSV.
///
/// # Panics
///
/// Panics if a record has a zero or otherwise invalid trade size.
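///
/// # Examples
///
/// A minimal usage sketch with explicit precisions (the CSV path here is hypothetical):
///
/// ```ignore
/// let trades = load_trade_ticks("bitmex_trades.csv.gz", Some(1), Some(0), None, Some(10_000)).unwrap();
/// ```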
pub fn load_trade_ticks<P: AsRef<Path>>(
    filepath: P,
    price_precision: Option<u8>,
    size_precision: Option<u8>,
    instrument_id: Option<InstrumentId>,
    limit: Option<usize>,
) -> Result<Vec<TradeTick>, Box<dyn Error>> {
    // Infer precisions if not provided
    let (price_precision, size_precision) = match (price_precision, size_precision) {
        (Some(p), Some(s)) => (p, s),
        (price_precision, size_precision) => {
            let mut reader = create_csv_reader(&filepath)?;
            let mut record = StringRecord::new();

            let mut max_price_precision = 0u8;
            let mut max_size_precision = 0u8;
            let mut count = 0;

            while reader.read_record(&mut record)? {
                let parsed: TardisTradeRecord = record.deserialize(None)?;

                if price_precision.is_none() {
                    max_price_precision = infer_precision(parsed.price).max(max_price_precision);
                }

                if size_precision.is_none() {
                    max_size_precision = infer_precision(parsed.amount).max(max_size_precision);
                }

                if let Some(limit) = limit {
                    if count >= limit {
                        break;
                    }
                    count += 1;
                }
            }

            drop(reader);

            max_price_precision = max_price_precision.min(FIXED_PRECISION);
            max_size_precision = max_size_precision.min(FIXED_PRECISION);

            (
                price_precision.unwrap_or(max_price_precision),
                size_precision.unwrap_or(max_size_precision),
            )
        }
    };

    let mut trades = Vec::new();
    let mut reader = create_csv_reader(filepath)?;
    let mut record = StringRecord::new();

    while reader.read_record(&mut record)? {
        let record: TardisTradeRecord = record.deserialize(None)?;

        let instrument_id = match &instrument_id {
            Some(id) => *id,
            None => parse_instrument_id(&record.exchange, record.symbol),
        };
        let price = parse_price(record.price, price_precision);
        let size = Quantity::non_zero_checked(record.amount, size_precision)
            .unwrap_or_else(|e| panic!("Invalid {record:?}: size {e}"));
        let aggressor_side = parse_aggressor_side(&record.side);
        let trade_id = TradeId::new(&record.id);
        let ts_event = parse_timestamp(record.timestamp);
        let ts_init = parse_timestamp(record.local_timestamp);

        let trade = TradeTick::new(
            instrument_id,
            price,
            size,
            aggressor_side,
            trade_id,
            ts_event,
            ts_init,
        );

        trades.push(trade);

        if let Some(limit) = limit {
            if trades.len() >= limit {
                break;
            }
        }
    }

    Ok(trades)
}

////////////////////////////////////////////////////////////////////////////////
// Tests
////////////////////////////////////////////////////////////////////////////////
#[cfg(test)]
mod tests {
    use nautilus_model::{
        enums::{AggressorSide, BookAction},
        identifiers::InstrumentId,
        types::Price,
    };
    use nautilus_testkit::common::{
        ensure_data_exists_tardis_binance_snapshot5, ensure_data_exists_tardis_binance_snapshot25,
        ensure_data_exists_tardis_bitmex_trades, ensure_data_exists_tardis_deribit_book_l2,
        ensure_data_exists_tardis_huobi_quotes,
    };
    use rstest::*;

    use super::*;

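    // Added sanity check for precision inference (not part of the original suite);
    // relies on Rust's shortest round-trip float formatting.
    #[rstest]
    fn test_infer_precision_examples() {
        assert_eq!(infer_precision(6421.5), 1);
        assert_eq!(infer_precision(0.001), 3);
        assert_eq!(infer_precision(1000.0), 0);
    }
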
    // TODO: Flakey in CI, potentially to do with syncing large test data files from cache
    #[ignore = "Flakey test: called `Result::unwrap()` on an `Err` value: Error(Io(Kind(UnexpectedEof)))"]
    #[rstest]
    #[case(Some(1), Some(0))] // Explicit precisions
    #[case(None, None)] // Inferred precisions
    pub fn test_read_deltas(
        #[case] price_precision: Option<u8>,
        #[case] size_precision: Option<u8>,
    ) {
        let filepath = ensure_data_exists_tardis_deribit_book_l2();
        let deltas = load_deltas(
            filepath,
            price_precision,
            size_precision,
            None,
            Some(10_000),
        )
        .unwrap();

        assert_eq!(deltas.len(), 10_000);
        assert_eq!(
            deltas[0].instrument_id,
            InstrumentId::from("BTC-PERPETUAL.DERIBIT")
        );
        assert_eq!(deltas[0].action, BookAction::Add);
        assert_eq!(deltas[0].order.side, OrderSide::Sell);
        assert_eq!(deltas[0].order.price, Price::from("6421.5"));
        assert_eq!(deltas[0].order.size, Quantity::from("18640"));
        assert_eq!(deltas[0].flags, 0);
        assert_eq!(deltas[0].sequence, 0);
        assert_eq!(deltas[0].ts_event, 1585699200245000000);
        assert_eq!(deltas[0].ts_init, 1585699200355684000);
    }

    // TODO: Flakey in CI, potentially to do with syncing large test data files from cache
    #[ignore = "Flakey test: called `Result::unwrap()` on an `Err` value: Error(Io(Kind(UnexpectedEof)))"]
    #[rstest]
    #[case(Some(2), Some(3))] // Explicit precisions
    #[case(None, None)] // Inferred precisions
    pub fn test_read_depth10s_from_snapshot5(
        #[case] price_precision: Option<u8>,
        #[case] size_precision: Option<u8>,
    ) {
        let filepath = ensure_data_exists_tardis_binance_snapshot5();
        let depths = load_depth10_from_snapshot5(
            filepath,
            price_precision,
            size_precision,
            None,
            Some(10_000),
        )
        .unwrap();

        assert_eq!(depths.len(), 10_000);
        assert_eq!(
            depths[0].instrument_id,
            InstrumentId::from("BTCUSDT.BINANCE")
        );
        assert_eq!(depths[0].bids.len(), 10);
        assert_eq!(depths[0].bids[0].price, Price::from("11657.07"));
        assert_eq!(depths[0].bids[0].size, Quantity::from("10.896"));
        assert_eq!(depths[0].bids[0].side, OrderSide::Buy);
        assert_eq!(depths[0].bids[0].order_id, 0);
        assert_eq!(depths[0].asks.len(), 10);
        assert_eq!(depths[0].asks[0].price, Price::from("11657.08"));
        assert_eq!(depths[0].asks[0].size, Quantity::from("1.714"));
        assert_eq!(depths[0].asks[0].side, OrderSide::Sell);
        assert_eq!(depths[0].asks[0].order_id, 0);
        assert_eq!(depths[0].bid_counts[0], 1);
        assert_eq!(depths[0].ask_counts[0], 1);
        assert_eq!(depths[0].flags, 128);
        assert_eq!(depths[0].ts_event, 1598918403696000000);
        assert_eq!(depths[0].ts_init, 1598918403810979000);
        assert_eq!(depths[0].sequence, 0);
    }

    // TODO: Flakey in CI, potentially to do with syncing large test data files from cache
    #[ignore = "Flakey test: called `Result::unwrap()` on an `Err` value: Error(Io(Kind(UnexpectedEof)))"]
    #[rstest]
    #[case(Some(2), Some(3))] // Explicit precisions
    #[case(None, None)] // Inferred precisions
    pub fn test_read_depth10s_from_snapshot25(
        #[case] price_precision: Option<u8>,
        #[case] size_precision: Option<u8>,
    ) {
        let filepath = ensure_data_exists_tardis_binance_snapshot25();
        let depths = load_depth10_from_snapshot25(
            filepath,
            price_precision,
            size_precision,
            None,
            Some(10_000),
        )
        .unwrap();

        assert_eq!(depths.len(), 10_000);
        assert_eq!(
            depths[0].instrument_id,
            InstrumentId::from("BTCUSDT.BINANCE")
        );
        assert_eq!(depths[0].bids.len(), 10);
        assert_eq!(depths[0].bids[0].price, Price::from("11657.07"));
        assert_eq!(depths[0].bids[0].size, Quantity::from("10.896"));
        assert_eq!(depths[0].bids[0].side, OrderSide::Buy);
        assert_eq!(depths[0].bids[0].order_id, 0);
        assert_eq!(depths[0].asks.len(), 10);
        assert_eq!(depths[0].asks[0].price, Price::from("11657.08"));
        assert_eq!(depths[0].asks[0].size, Quantity::from("1.714"));
        assert_eq!(depths[0].asks[0].side, OrderSide::Sell);
        assert_eq!(depths[0].asks[0].order_id, 0);
        assert_eq!(depths[0].bid_counts[0], 1);
        assert_eq!(depths[0].ask_counts[0], 1);
        assert_eq!(depths[0].flags, 128);
        assert_eq!(depths[0].ts_event, 1598918403696000000);
        assert_eq!(depths[0].ts_init, 1598918403810979000);
        assert_eq!(depths[0].sequence, 0);
    }

    // TODO: Flakey in CI, potentially to do with syncing large test data files from cache
    #[ignore = "Flakey test: called `Result::unwrap()` on an `Err` value: Error(Io(Kind(UnexpectedEof)))"]
    #[rstest]
    #[case(Some(1), Some(0))] // Explicit precisions
    #[case(None, None)] // Inferred precisions
    pub fn test_read_quotes(
        #[case] price_precision: Option<u8>,
        #[case] size_precision: Option<u8>,
    ) {
        let filepath = ensure_data_exists_tardis_huobi_quotes();
        let quotes = load_quote_ticks(
            filepath,
            price_precision,
            size_precision,
            None,
            Some(10_000),
        )
        .unwrap();

        assert_eq!(quotes.len(), 10_000);
        assert_eq!(
            quotes[0].instrument_id,
            InstrumentId::from("BTC-USD.HUOBI_DELIVERY")
        );
        assert_eq!(quotes[0].bid_price, Price::from("8629.2"));
        assert_eq!(quotes[0].bid_size, Quantity::from("806"));
        assert_eq!(quotes[0].ask_price, Price::from("8629.3"));
        assert_eq!(quotes[0].ask_size, Quantity::from("5494"));
        assert_eq!(quotes[0].ts_event, 1588291201099000000);
        assert_eq!(quotes[0].ts_init, 1588291201234268000);
    }

    // TODO: Flakey in CI, potentially to do with syncing large test data files from cache
    #[ignore = "Flakey test: called `Result::unwrap()` on an `Err` value: Error(Io(Kind(UnexpectedEof)))"]
    #[rstest]
    #[case(Some(1), Some(0))] // Explicit precisions
    #[case(None, None)] // Inferred precisions
    pub fn test_read_trades(
        #[case] price_precision: Option<u8>,
        #[case] size_precision: Option<u8>,
    ) {
        let filepath = ensure_data_exists_tardis_bitmex_trades();
        let trades = load_trade_ticks(
            filepath,
            price_precision,
            size_precision,
            None,
            Some(10_000),
        )
        .unwrap();

        assert_eq!(trades.len(), 10_000);
        assert_eq!(trades[0].instrument_id, InstrumentId::from("XBTUSD.BITMEX"));
        assert_eq!(trades[0].price, Price::from("8531.5"));
        assert_eq!(trades[0].size, Quantity::from("2152"));
        assert_eq!(trades[0].aggressor_side, AggressorSide::Seller);
        assert_eq!(
            trades[0].trade_id,
            TradeId::new("ccc3c1fa-212c-e8b0-1706-9b9c4f3d5ecf")
        );
        assert_eq!(trades[0].ts_event, 1583020803145000000);
        assert_eq!(trades[0].ts_init, 1583020803307160000);
    }
}