nautilus_tardis/
replay.rs

1// -------------------------------------------------------------------------------------------------
2//  Copyright (C) 2015-2025 Posei Systems Pty Ltd. All rights reserved.
3//  https://poseitrader.io
4//
5//  Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
6//  You may not use this file except in compliance with the License.
7//  You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
8//
9//  Unless required by applicable law or agreed to in writing, software
10//  distributed under the License is distributed on an "AS IS" BASIS,
11//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12//  See the License for the specific language governing permissions and
13//  limitations under the License.
14// -------------------------------------------------------------------------------------------------
15
16use std::{
17    collections::HashMap,
18    fs,
19    path::{Path, PathBuf},
20    sync::OnceLock,
21};
22
23use anyhow::Context;
24use arrow::array::RecordBatch;
25use chrono::{DateTime, Duration, NaiveDate};
26use futures_util::{StreamExt, future::join_all, pin_mut};
27use heck::ToSnakeCase;
28use nautilus_core::{UnixNanos, parsing::precision_from_str};
29use nautilus_model::{
30    data::{
31        Bar, BarType, Data, OrderBookDelta, OrderBookDeltas_API, OrderBookDepth10, QuoteTick,
32        TradeTick,
33    },
34    identifiers::InstrumentId,
35};
36use nautilus_persistence::parquet::write_batch_to_parquet;
37use nautilus_serialization::arrow::{
38    bars_to_arrow_record_batch_bytes, book_deltas_to_arrow_record_batch_bytes,
39    book_depth10_to_arrow_record_batch_bytes, quotes_to_arrow_record_batch_bytes,
40    trades_to_arrow_record_batch_bytes,
41};
42use thousands::Separable;
43use ustr::Ustr;
44
45use super::{enums::Exchange, http::models::InstrumentInfo};
46use crate::{
47    config::TardisReplayConfig,
48    http::TardisHttpClient,
49    machine::{TardisMachineClient, types::InstrumentMiniInfo},
50    parse::{normalize_instrument_id, parse_instrument_id},
51};
52
53static RUNTIME: OnceLock<tokio::runtime::Runtime> = OnceLock::new();
54
55/// Retrieves a reference to a globally shared Tokio runtime.
56/// The runtime is lazily initialized on the first call and reused thereafter.
57///
58/// # Panics
59///
60/// Panics if the runtime could not be created, which typically indicates
61/// an inability to spawn threads or allocate necessary resources.
62pub fn get_runtime() -> &'static tokio::runtime::Runtime {
63    RUNTIME
64        .get_or_init(|| tokio::runtime::Runtime::new().expect("Failed to initialize tokio runtime"))
65}
66
67struct DateCursor {
68    /// Cursor date UTC.
69    date_utc: NaiveDate,
70    /// Cursor end timestamp UNIX nanoseconds.
71    end_ns: UnixNanos,
72}
73
74impl DateCursor {
75    /// Creates a new [`DateCursor`] instance.
76    fn new(current_ns: UnixNanos) -> Self {
77        let current_utc = DateTime::from_timestamp_nanos(current_ns.as_i64());
78        let date_utc = current_utc.date_naive();
79
80        // Calculate end of the current UTC day
81        // SAFETY: Known safe input values
82        let end_utc =
83            date_utc.and_hms_opt(23, 59, 59).unwrap() + Duration::nanoseconds(999_999_999);
84        let end_ns = UnixNanos::from(end_utc.and_utc().timestamp_nanos_opt().unwrap() as u64);
85
86        Self { date_utc, end_ns }
87    }
88}
89
90async fn gather_instruments_info(
91    config: &TardisReplayConfig,
92    http_client: &TardisHttpClient,
93) -> HashMap<Exchange, Vec<InstrumentInfo>> {
94    let futures = config.options.iter().map(|options| {
95        let exchange = options.exchange.clone();
96        let client = &http_client;
97
98        tracing::info!("Requesting instruments for {exchange}");
99
100        async move {
101            match client.instruments_info(exchange.clone(), None, None).await {
102                Ok(instruments) => Some((exchange, instruments)),
103                Err(e) => {
104                    tracing::error!("Error fetching instruments for {exchange}: {e}");
105                    None
106                }
107            }
108        }
109    });
110
111    let results: Vec<(Exchange, Vec<InstrumentInfo>)> =
112        join_all(futures).await.into_iter().flatten().collect();
113
114    tracing::info!("Received all instruments");
115
116    results.into_iter().collect()
117}
118
119/// Run the Tardis Machine replay from a JSON configuration file.
120///
121/// # Errors
122///
123/// Returns an error if reading or parsing the config file fails,
124/// or if any downstream replay operation fails.
125/// Run the Tardis Machine replay from a JSON configuration file.
126///
127/// # Panics
128///
129/// Panics if unable to determine the output path (current directory fallback fails).
130pub async fn run_tardis_machine_replay_from_config(config_filepath: &Path) -> anyhow::Result<()> {
131    tracing::info!("Starting replay");
132    tracing::info!("Config filepath: {config_filepath:?}");
133
134    // Load and parse the replay configuration
135    let config_data = fs::read_to_string(config_filepath)
136        .with_context(|| format!("Failed to read config file: {config_filepath:?}"))?;
137    let config: TardisReplayConfig = serde_json::from_str(&config_data)
138        .context("Failed to parse config JSON into TardisReplayConfig")?;
139
140    let path = config
141        .output_path
142        .as_deref()
143        .map(Path::new)
144        .map(Path::to_path_buf)
145        .or_else(|| {
146            std::env::var("NAUTILUS_CATALOG_PATH")
147                .ok()
148                .map(|env_path| PathBuf::from(env_path).join("data"))
149        })
150        .unwrap_or_else(|| std::env::current_dir().expect("Failed to get current directory"));
151
152    tracing::info!("Output path: {path:?}");
153
154    let normalize_symbols = config.normalize_symbols.unwrap_or(true);
155    tracing::info!("normalize_symbols={normalize_symbols}");
156
157    let http_client = TardisHttpClient::new(None, None, None, normalize_symbols)?;
158    let mut machine_client =
159        TardisMachineClient::new(config.tardis_ws_url.as_deref(), normalize_symbols)?;
160
161    let info_map = gather_instruments_info(&config, &http_client).await;
162
163    for (exchange, instruments) in &info_map {
164        for inst in instruments {
165            let instrument_type = inst.instrument_type.clone();
166            let price_precision = precision_from_str(&inst.price_increment.to_string());
167            let size_precision = precision_from_str(&inst.amount_increment.to_string());
168
169            let instrument_id = if normalize_symbols {
170                normalize_instrument_id(exchange, inst.id, &instrument_type, inst.inverse)
171            } else {
172                parse_instrument_id(exchange, inst.id)
173            };
174
175            let info = InstrumentMiniInfo::new(
176                instrument_id,
177                Some(Ustr::from(&inst.id)),
178                exchange.clone(),
179                price_precision,
180                size_precision,
181            );
182            machine_client.add_instrument_info(info);
183        }
184    }
185
186    tracing::info!("Starting tardis-machine stream");
187    let stream = machine_client.replay(config.options).await;
188    pin_mut!(stream);
189
190    // Initialize date cursors
191    let mut deltas_cursors: HashMap<InstrumentId, DateCursor> = HashMap::new();
192    let mut depths_cursors: HashMap<InstrumentId, DateCursor> = HashMap::new();
193    let mut quotes_cursors: HashMap<InstrumentId, DateCursor> = HashMap::new();
194    let mut trades_cursors: HashMap<InstrumentId, DateCursor> = HashMap::new();
195    let mut bars_cursors: HashMap<BarType, DateCursor> = HashMap::new();
196
197    // Initialize date collection maps
198    let mut deltas_map: HashMap<InstrumentId, Vec<OrderBookDelta>> = HashMap::new();
199    let mut depths_map: HashMap<InstrumentId, Vec<OrderBookDepth10>> = HashMap::new();
200    let mut quotes_map: HashMap<InstrumentId, Vec<QuoteTick>> = HashMap::new();
201    let mut trades_map: HashMap<InstrumentId, Vec<TradeTick>> = HashMap::new();
202    let mut bars_map: HashMap<BarType, Vec<Bar>> = HashMap::new();
203
204    let mut msg_count = 0;
205
206    while let Some(msg) = stream.next().await {
207        match msg {
208            Data::Deltas(msg) => {
209                handle_deltas_msg(msg, &mut deltas_map, &mut deltas_cursors, &path);
210            }
211            Data::Depth10(msg) => {
212                handle_depth10_msg(*msg, &mut depths_map, &mut depths_cursors, &path);
213            }
214            Data::Quote(msg) => handle_quote_msg(msg, &mut quotes_map, &mut quotes_cursors, &path),
215            Data::Trade(msg) => handle_trade_msg(msg, &mut trades_map, &mut trades_cursors, &path),
216            Data::Bar(msg) => handle_bar_msg(msg, &mut bars_map, &mut bars_cursors, &path),
217            Data::Delta(_) => panic!("Individual delta message not implemented (or required)"),
218            _ => panic!("Not implemented"),
219        }
220
221        msg_count += 1;
222        if msg_count % 100_000 == 0 {
223            tracing::debug!("Processed {} messages", msg_count.separate_with_commas());
224        }
225    }
226
227    // Iterate through every remaining type and instrument sequentially
228
229    for (instrument_id, deltas) in deltas_map {
230        let cursor = deltas_cursors.get(&instrument_id).expect("Expected cursor");
231        batch_and_write_deltas(deltas, &instrument_id, cursor.date_utc, &path);
232    }
233
234    for (instrument_id, depths) in depths_map {
235        let cursor = depths_cursors.get(&instrument_id).expect("Expected cursor");
236        batch_and_write_depths(depths, &instrument_id, cursor.date_utc, &path);
237    }
238
239    for (instrument_id, quotes) in quotes_map {
240        let cursor = quotes_cursors.get(&instrument_id).expect("Expected cursor");
241        batch_and_write_quotes(quotes, &instrument_id, cursor.date_utc, &path);
242    }
243
244    for (instrument_id, trades) in trades_map {
245        let cursor = trades_cursors.get(&instrument_id).expect("Expected cursor");
246        batch_and_write_trades(trades, &instrument_id, cursor.date_utc, &path);
247    }
248
249    for (bar_type, bars) in bars_map {
250        let cursor = bars_cursors.get(&bar_type).expect("Expected cursor");
251        batch_and_write_bars(bars, &bar_type, cursor.date_utc, &path);
252    }
253
254    tracing::info!(
255        "Replay completed after {} messages",
256        msg_count.separate_with_commas()
257    );
258    Ok(())
259}
260
261fn handle_deltas_msg(
262    deltas: OrderBookDeltas_API,
263    map: &mut HashMap<InstrumentId, Vec<OrderBookDelta>>,
264    cursors: &mut HashMap<InstrumentId, DateCursor>,
265    path: &Path,
266) {
267    let cursor = cursors
268        .entry(deltas.instrument_id)
269        .or_insert_with(|| DateCursor::new(deltas.ts_init));
270
271    if deltas.ts_init > cursor.end_ns {
272        if let Some(deltas_vec) = map.remove(&deltas.instrument_id) {
273            batch_and_write_deltas(deltas_vec, &deltas.instrument_id, cursor.date_utc, path);
274        }
275        // Update cursor
276        *cursor = DateCursor::new(deltas.ts_init);
277    }
278
279    map.entry(deltas.instrument_id)
280        .or_insert_with(|| Vec::with_capacity(1_000_000))
281        .extend(&*deltas.deltas);
282}
283
284fn handle_depth10_msg(
285    depth10: OrderBookDepth10,
286    map: &mut HashMap<InstrumentId, Vec<OrderBookDepth10>>,
287    cursors: &mut HashMap<InstrumentId, DateCursor>,
288    path: &Path,
289) {
290    let cursor = cursors
291        .entry(depth10.instrument_id)
292        .or_insert_with(|| DateCursor::new(depth10.ts_init));
293
294    if depth10.ts_init > cursor.end_ns {
295        if let Some(depths_vec) = map.remove(&depth10.instrument_id) {
296            batch_and_write_depths(depths_vec, &depth10.instrument_id, cursor.date_utc, path);
297        }
298        // Update cursor
299        *cursor = DateCursor::new(depth10.ts_init);
300    }
301
302    map.entry(depth10.instrument_id)
303        .or_insert_with(|| Vec::with_capacity(1_000_000))
304        .push(depth10);
305}
306
307fn handle_quote_msg(
308    quote: QuoteTick,
309    map: &mut HashMap<InstrumentId, Vec<QuoteTick>>,
310    cursors: &mut HashMap<InstrumentId, DateCursor>,
311    path: &Path,
312) {
313    let cursor = cursors
314        .entry(quote.instrument_id)
315        .or_insert_with(|| DateCursor::new(quote.ts_init));
316
317    if quote.ts_init > cursor.end_ns {
318        if let Some(quotes_vec) = map.remove(&quote.instrument_id) {
319            batch_and_write_quotes(quotes_vec, &quote.instrument_id, cursor.date_utc, path);
320        }
321        // Update cursor
322        *cursor = DateCursor::new(quote.ts_init);
323    }
324
325    map.entry(quote.instrument_id)
326        .or_insert_with(|| Vec::with_capacity(1_000_000))
327        .push(quote);
328}
329
330fn handle_trade_msg(
331    trade: TradeTick,
332    map: &mut HashMap<InstrumentId, Vec<TradeTick>>,
333    cursors: &mut HashMap<InstrumentId, DateCursor>,
334    path: &Path,
335) {
336    let cursor = cursors
337        .entry(trade.instrument_id)
338        .or_insert_with(|| DateCursor::new(trade.ts_init));
339
340    if trade.ts_init > cursor.end_ns {
341        if let Some(trades_vec) = map.remove(&trade.instrument_id) {
342            batch_and_write_trades(trades_vec, &trade.instrument_id, cursor.date_utc, path);
343        }
344        // Update cursor
345        *cursor = DateCursor::new(trade.ts_init);
346    }
347
348    map.entry(trade.instrument_id)
349        .or_insert_with(|| Vec::with_capacity(1_000_000))
350        .push(trade);
351}
352
353fn handle_bar_msg(
354    bar: Bar,
355    map: &mut HashMap<BarType, Vec<Bar>>,
356    cursors: &mut HashMap<BarType, DateCursor>,
357    path: &Path,
358) {
359    let cursor = cursors
360        .entry(bar.bar_type)
361        .or_insert_with(|| DateCursor::new(bar.ts_init));
362
363    if bar.ts_init > cursor.end_ns {
364        if let Some(bars_vec) = map.remove(&bar.bar_type) {
365            batch_and_write_bars(bars_vec, &bar.bar_type, cursor.date_utc, path);
366        }
367        // Update cursor
368        *cursor = DateCursor::new(bar.ts_init);
369    }
370
371    map.entry(bar.bar_type)
372        .or_insert_with(|| Vec::with_capacity(1_000_000))
373        .push(bar);
374}
375
376fn batch_and_write_deltas(
377    deltas: Vec<OrderBookDelta>,
378    instrument_id: &InstrumentId,
379    date: NaiveDate,
380    path: &Path,
381) {
382    let typename = stringify!(OrderBookDeltas);
383    match book_deltas_to_arrow_record_batch_bytes(deltas) {
384        Ok(batch) => write_batch(batch, typename, instrument_id, date, path),
385        Err(e) => {
386            tracing::error!("Error converting `{typename}` to Arrow: {e:?}");
387        }
388    }
389}
390
391fn batch_and_write_depths(
392    depths: Vec<OrderBookDepth10>,
393    instrument_id: &InstrumentId,
394    date: NaiveDate,
395    path: &Path,
396) {
397    let typename = stringify!(OrderBookDepth10);
398    match book_depth10_to_arrow_record_batch_bytes(depths) {
399        Ok(batch) => write_batch(batch, typename, instrument_id, date, path),
400        Err(e) => {
401            tracing::error!("Error converting `{typename}` to Arrow: {e:?}");
402        }
403    }
404}
405
406fn batch_and_write_quotes(
407    quotes: Vec<QuoteTick>,
408    instrument_id: &InstrumentId,
409    date: NaiveDate,
410    path: &Path,
411) {
412    let typename = stringify!(QuoteTick);
413    match quotes_to_arrow_record_batch_bytes(quotes) {
414        Ok(batch) => write_batch(batch, typename, instrument_id, date, path),
415        Err(e) => {
416            tracing::error!("Error converting `{typename}` to Arrow: {e:?}");
417        }
418    }
419}
420
421fn batch_and_write_trades(
422    trades: Vec<TradeTick>,
423    instrument_id: &InstrumentId,
424    date: NaiveDate,
425    path: &Path,
426) {
427    let typename = stringify!(TradeTick);
428    match trades_to_arrow_record_batch_bytes(trades) {
429        Ok(batch) => write_batch(batch, typename, instrument_id, date, path),
430        Err(e) => {
431            tracing::error!("Error converting `{typename}` to Arrow: {e:?}");
432        }
433    }
434}
435
436fn batch_and_write_bars(bars: Vec<Bar>, bar_type: &BarType, date: NaiveDate, path: &Path) {
437    let typename = stringify!(Bar);
438    let batch = match bars_to_arrow_record_batch_bytes(bars) {
439        Ok(batch) => batch,
440        Err(e) => {
441            tracing::error!("Error converting `{typename}` to Arrow: {e:?}");
442            return;
443        }
444    };
445
446    let filepath = path.join(parquet_filepath_bars(bar_type, date));
447    let filepath_str = filepath.to_string_lossy();
448
449    let rt = tokio::runtime::Runtime::new().unwrap();
450    match rt.block_on(write_batch_to_parquet(
451        batch,
452        &filepath_str,
453        None,
454        None,
455        None,
456    )) {
457        Ok(()) => tracing::info!("File written: {filepath:?}"),
458        Err(e) => tracing::error!("Error writing {filepath:?}: {e:?}"),
459    }
460}
461
462fn parquet_filepath(typename: &str, instrument_id: &InstrumentId, date: NaiveDate) -> PathBuf {
463    let typename = typename.to_snake_case();
464    let instrument_id_str = instrument_id.to_string().replace('/', "");
465    let date_str = date.to_string().replace('-', "");
466    PathBuf::new()
467        .join(typename)
468        .join(instrument_id_str)
469        .join(format!("{date_str}.parquet"))
470}
471
472fn parquet_filepath_bars(bar_type: &BarType, date: NaiveDate) -> PathBuf {
473    let bar_type_str = bar_type.to_string().replace('/', "");
474    let date_str = date.to_string().replace('-', "");
475    PathBuf::new()
476        .join("bar")
477        .join(bar_type_str)
478        .join(format!("{date_str}.parquet"))
479}
480
481fn write_batch(
482    batch: RecordBatch,
483    typename: &str,
484    instrument_id: &InstrumentId,
485    date: NaiveDate,
486    path: &Path,
487) {
488    let filepath = path.join(parquet_filepath(typename, instrument_id, date));
489    let filepath_str = filepath.to_string_lossy();
490
491    let rt = get_runtime();
492    match rt.block_on(write_batch_to_parquet(
493        batch,
494        &filepath_str,
495        None,
496        None,
497        None,
498    )) {
499        Ok(()) => tracing::info!("File written: {filepath:?}"),
500        Err(e) => tracing::error!("Error writing {filepath:?}: {e:?}"),
501    }
502}
503
504///////////////////////////////////////////////////////////////////////////////////////////////////
505// Tests
506///////////////////////////////////////////////////////////////////////////////////////////////////
#[cfg(test)]
mod tests {
    use chrono::{TimeZone, Utc};
    use rstest::rstest;

    use super::*;

    /// UNIX nanoseconds for a whole-second UTC wall-clock instant.
    fn utc_ns(year: i32, month: u32, day: u32, hour: u32, minute: u32, second: u32) -> u64 {
        Utc.with_ymd_and_hms(year, month, day, hour, minute, second)
            .unwrap()
            .timestamp_nanos_opt()
            .unwrap() as u64
    }

    /// UNIX nanoseconds for the final nanosecond (23:59:59.999999999 UTC) of a day.
    fn end_of_day_ns(year: i32, month: u32, day: u32) -> u64 {
        utc_ns(year, month, day, 23, 59, 59) + 999_999_999
    }

    /// Calendar date shorthand.
    fn ymd(year: i32, month: u32, day: u32) -> NaiveDate {
        NaiveDate::from_ymd_opt(year, month, day).unwrap()
    }

    #[rstest]
    #[case::start_of_day(
        utc_ns(2024, 1, 1, 0, 0, 0),
        ymd(2024, 1, 1),
        end_of_day_ns(2024, 1, 1)
    )]
    #[case::midday(
        utc_ns(2024, 1, 1, 12, 0, 0),
        ymd(2024, 1, 1),
        end_of_day_ns(2024, 1, 1)
    )]
    #[case::end_of_day(
        end_of_day_ns(2024, 1, 1),
        ymd(2024, 1, 1),
        end_of_day_ns(2024, 1, 1)
    )]
    #[case::start_of_next_day(
        utc_ns(2024, 1, 2, 0, 0, 0),
        ymd(2024, 1, 2),
        end_of_day_ns(2024, 1, 2)
    )]
    fn test_date_cursor(
        #[case] timestamp: u64,
        #[case] expected_date: NaiveDate,
        #[case] expected_end_ns: u64,
    ) {
        let cursor = DateCursor::new(UnixNanos::from(timestamp));

        assert_eq!(cursor.date_utc, expected_date);
        assert_eq!(cursor.end_ns, UnixNanos::from(expected_end_ns));
    }
}