1use std::{
17 collections::HashMap,
18 fs,
19 path::{Path, PathBuf},
20 sync::OnceLock,
21};
22
23use anyhow::Context;
24use arrow::array::RecordBatch;
25use chrono::{DateTime, Duration, NaiveDate};
26use futures_util::{StreamExt, future::join_all, pin_mut};
27use heck::ToSnakeCase;
28use nautilus_core::{UnixNanos, parsing::precision_from_str};
29use nautilus_model::{
30 data::{
31 Bar, BarType, Data, OrderBookDelta, OrderBookDeltas_API, OrderBookDepth10, QuoteTick,
32 TradeTick,
33 },
34 identifiers::InstrumentId,
35};
36use nautilus_persistence::parquet::write_batch_to_parquet;
37use nautilus_serialization::arrow::{
38 bars_to_arrow_record_batch_bytes, book_deltas_to_arrow_record_batch_bytes,
39 book_depth10_to_arrow_record_batch_bytes, quotes_to_arrow_record_batch_bytes,
40 trades_to_arrow_record_batch_bytes,
41};
42use thousands::Separable;
43use ustr::Ustr;
44
45use super::{enums::Exchange, http::models::InstrumentInfo};
46use crate::{
47 config::TardisReplayConfig,
48 http::TardisHttpClient,
49 machine::{TardisMachineClient, types::InstrumentMiniInfo},
50 parse::{normalize_instrument_id, parse_instrument_id},
51};
52
53static RUNTIME: OnceLock<tokio::runtime::Runtime> = OnceLock::new();
54
55pub fn get_runtime() -> &'static tokio::runtime::Runtime {
63 RUNTIME
64 .get_or_init(|| tokio::runtime::Runtime::new().expect("Failed to initialize tokio runtime"))
65}
66
67struct DateCursor {
68 date_utc: NaiveDate,
70 end_ns: UnixNanos,
72}
73
74impl DateCursor {
75 fn new(current_ns: UnixNanos) -> Self {
77 let current_utc = DateTime::from_timestamp_nanos(current_ns.as_i64());
78 let date_utc = current_utc.date_naive();
79
80 let end_utc =
83 date_utc.and_hms_opt(23, 59, 59).unwrap() + Duration::nanoseconds(999_999_999);
84 let end_ns = UnixNanos::from(end_utc.and_utc().timestamp_nanos_opt().unwrap() as u64);
85
86 Self { date_utc, end_ns }
87 }
88}
89
90async fn gather_instruments_info(
91 config: &TardisReplayConfig,
92 http_client: &TardisHttpClient,
93) -> HashMap<Exchange, Vec<InstrumentInfo>> {
94 let futures = config.options.iter().map(|options| {
95 let exchange = options.exchange.clone();
96 let client = &http_client;
97
98 tracing::info!("Requesting instruments for {exchange}");
99
100 async move {
101 match client.instruments_info(exchange.clone(), None, None).await {
102 Ok(instruments) => Some((exchange, instruments)),
103 Err(e) => {
104 tracing::error!("Error fetching instruments for {exchange}: {e}");
105 None
106 }
107 }
108 }
109 });
110
111 let results: Vec<(Exchange, Vec<InstrumentInfo>)> =
112 join_all(futures).await.into_iter().flatten().collect();
113
114 tracing::info!("Received all instruments");
115
116 results.into_iter().collect()
117}
118
119pub async fn run_tardis_machine_replay_from_config(config_filepath: &Path) -> anyhow::Result<()> {
131 tracing::info!("Starting replay");
132 tracing::info!("Config filepath: {config_filepath:?}");
133
134 let config_data = fs::read_to_string(config_filepath)
136 .with_context(|| format!("Failed to read config file: {config_filepath:?}"))?;
137 let config: TardisReplayConfig = serde_json::from_str(&config_data)
138 .context("Failed to parse config JSON into TardisReplayConfig")?;
139
140 let path = config
141 .output_path
142 .as_deref()
143 .map(Path::new)
144 .map(Path::to_path_buf)
145 .or_else(|| {
146 std::env::var("NAUTILUS_CATALOG_PATH")
147 .ok()
148 .map(|env_path| PathBuf::from(env_path).join("data"))
149 })
150 .unwrap_or_else(|| std::env::current_dir().expect("Failed to get current directory"));
151
152 tracing::info!("Output path: {path:?}");
153
154 let normalize_symbols = config.normalize_symbols.unwrap_or(true);
155 tracing::info!("normalize_symbols={normalize_symbols}");
156
157 let http_client = TardisHttpClient::new(None, None, None, normalize_symbols)?;
158 let mut machine_client =
159 TardisMachineClient::new(config.tardis_ws_url.as_deref(), normalize_symbols)?;
160
161 let info_map = gather_instruments_info(&config, &http_client).await;
162
163 for (exchange, instruments) in &info_map {
164 for inst in instruments {
165 let instrument_type = inst.instrument_type.clone();
166 let price_precision = precision_from_str(&inst.price_increment.to_string());
167 let size_precision = precision_from_str(&inst.amount_increment.to_string());
168
169 let instrument_id = if normalize_symbols {
170 normalize_instrument_id(exchange, inst.id, &instrument_type, inst.inverse)
171 } else {
172 parse_instrument_id(exchange, inst.id)
173 };
174
175 let info = InstrumentMiniInfo::new(
176 instrument_id,
177 Some(Ustr::from(&inst.id)),
178 exchange.clone(),
179 price_precision,
180 size_precision,
181 );
182 machine_client.add_instrument_info(info);
183 }
184 }
185
186 tracing::info!("Starting tardis-machine stream");
187 let stream = machine_client.replay(config.options).await;
188 pin_mut!(stream);
189
190 let mut deltas_cursors: HashMap<InstrumentId, DateCursor> = HashMap::new();
192 let mut depths_cursors: HashMap<InstrumentId, DateCursor> = HashMap::new();
193 let mut quotes_cursors: HashMap<InstrumentId, DateCursor> = HashMap::new();
194 let mut trades_cursors: HashMap<InstrumentId, DateCursor> = HashMap::new();
195 let mut bars_cursors: HashMap<BarType, DateCursor> = HashMap::new();
196
197 let mut deltas_map: HashMap<InstrumentId, Vec<OrderBookDelta>> = HashMap::new();
199 let mut depths_map: HashMap<InstrumentId, Vec<OrderBookDepth10>> = HashMap::new();
200 let mut quotes_map: HashMap<InstrumentId, Vec<QuoteTick>> = HashMap::new();
201 let mut trades_map: HashMap<InstrumentId, Vec<TradeTick>> = HashMap::new();
202 let mut bars_map: HashMap<BarType, Vec<Bar>> = HashMap::new();
203
204 let mut msg_count = 0;
205
206 while let Some(msg) = stream.next().await {
207 match msg {
208 Data::Deltas(msg) => {
209 handle_deltas_msg(msg, &mut deltas_map, &mut deltas_cursors, &path);
210 }
211 Data::Depth10(msg) => {
212 handle_depth10_msg(*msg, &mut depths_map, &mut depths_cursors, &path);
213 }
214 Data::Quote(msg) => handle_quote_msg(msg, &mut quotes_map, &mut quotes_cursors, &path),
215 Data::Trade(msg) => handle_trade_msg(msg, &mut trades_map, &mut trades_cursors, &path),
216 Data::Bar(msg) => handle_bar_msg(msg, &mut bars_map, &mut bars_cursors, &path),
217 Data::Delta(_) => panic!("Individual delta message not implemented (or required)"),
218 _ => panic!("Not implemented"),
219 }
220
221 msg_count += 1;
222 if msg_count % 100_000 == 0 {
223 tracing::debug!("Processed {} messages", msg_count.separate_with_commas());
224 }
225 }
226
227 for (instrument_id, deltas) in deltas_map {
230 let cursor = deltas_cursors.get(&instrument_id).expect("Expected cursor");
231 batch_and_write_deltas(deltas, &instrument_id, cursor.date_utc, &path);
232 }
233
234 for (instrument_id, depths) in depths_map {
235 let cursor = depths_cursors.get(&instrument_id).expect("Expected cursor");
236 batch_and_write_depths(depths, &instrument_id, cursor.date_utc, &path);
237 }
238
239 for (instrument_id, quotes) in quotes_map {
240 let cursor = quotes_cursors.get(&instrument_id).expect("Expected cursor");
241 batch_and_write_quotes(quotes, &instrument_id, cursor.date_utc, &path);
242 }
243
244 for (instrument_id, trades) in trades_map {
245 let cursor = trades_cursors.get(&instrument_id).expect("Expected cursor");
246 batch_and_write_trades(trades, &instrument_id, cursor.date_utc, &path);
247 }
248
249 for (bar_type, bars) in bars_map {
250 let cursor = bars_cursors.get(&bar_type).expect("Expected cursor");
251 batch_and_write_bars(bars, &bar_type, cursor.date_utc, &path);
252 }
253
254 tracing::info!(
255 "Replay completed after {} messages",
256 msg_count.separate_with_commas()
257 );
258 Ok(())
259}
260
261fn handle_deltas_msg(
262 deltas: OrderBookDeltas_API,
263 map: &mut HashMap<InstrumentId, Vec<OrderBookDelta>>,
264 cursors: &mut HashMap<InstrumentId, DateCursor>,
265 path: &Path,
266) {
267 let cursor = cursors
268 .entry(deltas.instrument_id)
269 .or_insert_with(|| DateCursor::new(deltas.ts_init));
270
271 if deltas.ts_init > cursor.end_ns {
272 if let Some(deltas_vec) = map.remove(&deltas.instrument_id) {
273 batch_and_write_deltas(deltas_vec, &deltas.instrument_id, cursor.date_utc, path);
274 }
275 *cursor = DateCursor::new(deltas.ts_init);
277 }
278
279 map.entry(deltas.instrument_id)
280 .or_insert_with(|| Vec::with_capacity(1_000_000))
281 .extend(&*deltas.deltas);
282}
283
284fn handle_depth10_msg(
285 depth10: OrderBookDepth10,
286 map: &mut HashMap<InstrumentId, Vec<OrderBookDepth10>>,
287 cursors: &mut HashMap<InstrumentId, DateCursor>,
288 path: &Path,
289) {
290 let cursor = cursors
291 .entry(depth10.instrument_id)
292 .or_insert_with(|| DateCursor::new(depth10.ts_init));
293
294 if depth10.ts_init > cursor.end_ns {
295 if let Some(depths_vec) = map.remove(&depth10.instrument_id) {
296 batch_and_write_depths(depths_vec, &depth10.instrument_id, cursor.date_utc, path);
297 }
298 *cursor = DateCursor::new(depth10.ts_init);
300 }
301
302 map.entry(depth10.instrument_id)
303 .or_insert_with(|| Vec::with_capacity(1_000_000))
304 .push(depth10);
305}
306
307fn handle_quote_msg(
308 quote: QuoteTick,
309 map: &mut HashMap<InstrumentId, Vec<QuoteTick>>,
310 cursors: &mut HashMap<InstrumentId, DateCursor>,
311 path: &Path,
312) {
313 let cursor = cursors
314 .entry(quote.instrument_id)
315 .or_insert_with(|| DateCursor::new(quote.ts_init));
316
317 if quote.ts_init > cursor.end_ns {
318 if let Some(quotes_vec) = map.remove("e.instrument_id) {
319 batch_and_write_quotes(quotes_vec, "e.instrument_id, cursor.date_utc, path);
320 }
321 *cursor = DateCursor::new(quote.ts_init);
323 }
324
325 map.entry(quote.instrument_id)
326 .or_insert_with(|| Vec::with_capacity(1_000_000))
327 .push(quote);
328}
329
330fn handle_trade_msg(
331 trade: TradeTick,
332 map: &mut HashMap<InstrumentId, Vec<TradeTick>>,
333 cursors: &mut HashMap<InstrumentId, DateCursor>,
334 path: &Path,
335) {
336 let cursor = cursors
337 .entry(trade.instrument_id)
338 .or_insert_with(|| DateCursor::new(trade.ts_init));
339
340 if trade.ts_init > cursor.end_ns {
341 if let Some(trades_vec) = map.remove(&trade.instrument_id) {
342 batch_and_write_trades(trades_vec, &trade.instrument_id, cursor.date_utc, path);
343 }
344 *cursor = DateCursor::new(trade.ts_init);
346 }
347
348 map.entry(trade.instrument_id)
349 .or_insert_with(|| Vec::with_capacity(1_000_000))
350 .push(trade);
351}
352
353fn handle_bar_msg(
354 bar: Bar,
355 map: &mut HashMap<BarType, Vec<Bar>>,
356 cursors: &mut HashMap<BarType, DateCursor>,
357 path: &Path,
358) {
359 let cursor = cursors
360 .entry(bar.bar_type)
361 .or_insert_with(|| DateCursor::new(bar.ts_init));
362
363 if bar.ts_init > cursor.end_ns {
364 if let Some(bars_vec) = map.remove(&bar.bar_type) {
365 batch_and_write_bars(bars_vec, &bar.bar_type, cursor.date_utc, path);
366 }
367 *cursor = DateCursor::new(bar.ts_init);
369 }
370
371 map.entry(bar.bar_type)
372 .or_insert_with(|| Vec::with_capacity(1_000_000))
373 .push(bar);
374}
375
376fn batch_and_write_deltas(
377 deltas: Vec<OrderBookDelta>,
378 instrument_id: &InstrumentId,
379 date: NaiveDate,
380 path: &Path,
381) {
382 let typename = stringify!(OrderBookDeltas);
383 match book_deltas_to_arrow_record_batch_bytes(deltas) {
384 Ok(batch) => write_batch(batch, typename, instrument_id, date, path),
385 Err(e) => {
386 tracing::error!("Error converting `{typename}` to Arrow: {e:?}");
387 }
388 }
389}
390
391fn batch_and_write_depths(
392 depths: Vec<OrderBookDepth10>,
393 instrument_id: &InstrumentId,
394 date: NaiveDate,
395 path: &Path,
396) {
397 let typename = stringify!(OrderBookDepth10);
398 match book_depth10_to_arrow_record_batch_bytes(depths) {
399 Ok(batch) => write_batch(batch, typename, instrument_id, date, path),
400 Err(e) => {
401 tracing::error!("Error converting `{typename}` to Arrow: {e:?}");
402 }
403 }
404}
405
406fn batch_and_write_quotes(
407 quotes: Vec<QuoteTick>,
408 instrument_id: &InstrumentId,
409 date: NaiveDate,
410 path: &Path,
411) {
412 let typename = stringify!(QuoteTick);
413 match quotes_to_arrow_record_batch_bytes(quotes) {
414 Ok(batch) => write_batch(batch, typename, instrument_id, date, path),
415 Err(e) => {
416 tracing::error!("Error converting `{typename}` to Arrow: {e:?}");
417 }
418 }
419}
420
421fn batch_and_write_trades(
422 trades: Vec<TradeTick>,
423 instrument_id: &InstrumentId,
424 date: NaiveDate,
425 path: &Path,
426) {
427 let typename = stringify!(TradeTick);
428 match trades_to_arrow_record_batch_bytes(trades) {
429 Ok(batch) => write_batch(batch, typename, instrument_id, date, path),
430 Err(e) => {
431 tracing::error!("Error converting `{typename}` to Arrow: {e:?}");
432 }
433 }
434}
435
436fn batch_and_write_bars(bars: Vec<Bar>, bar_type: &BarType, date: NaiveDate, path: &Path) {
437 let typename = stringify!(Bar);
438 let batch = match bars_to_arrow_record_batch_bytes(bars) {
439 Ok(batch) => batch,
440 Err(e) => {
441 tracing::error!("Error converting `{typename}` to Arrow: {e:?}");
442 return;
443 }
444 };
445
446 let filepath = path.join(parquet_filepath_bars(bar_type, date));
447 let filepath_str = filepath.to_string_lossy();
448
449 let rt = tokio::runtime::Runtime::new().unwrap();
450 match rt.block_on(write_batch_to_parquet(
451 batch,
452 &filepath_str,
453 None,
454 None,
455 None,
456 )) {
457 Ok(()) => tracing::info!("File written: {filepath:?}"),
458 Err(e) => tracing::error!("Error writing {filepath:?}: {e:?}"),
459 }
460}
461
462fn parquet_filepath(typename: &str, instrument_id: &InstrumentId, date: NaiveDate) -> PathBuf {
463 let typename = typename.to_snake_case();
464 let instrument_id_str = instrument_id.to_string().replace('/', "");
465 let date_str = date.to_string().replace('-', "");
466 PathBuf::new()
467 .join(typename)
468 .join(instrument_id_str)
469 .join(format!("{date_str}.parquet"))
470}
471
472fn parquet_filepath_bars(bar_type: &BarType, date: NaiveDate) -> PathBuf {
473 let bar_type_str = bar_type.to_string().replace('/', "");
474 let date_str = date.to_string().replace('-', "");
475 PathBuf::new()
476 .join("bar")
477 .join(bar_type_str)
478 .join(format!("{date_str}.parquet"))
479}
480
481fn write_batch(
482 batch: RecordBatch,
483 typename: &str,
484 instrument_id: &InstrumentId,
485 date: NaiveDate,
486 path: &Path,
487) {
488 let filepath = path.join(parquet_filepath(typename, instrument_id, date));
489 let filepath_str = filepath.to_string_lossy();
490
491 let rt = get_runtime();
492 match rt.block_on(write_batch_to_parquet(
493 batch,
494 &filepath_str,
495 None,
496 None,
497 None,
498 )) {
499 Ok(()) => tracing::info!("File written: {filepath:?}"),
500 Err(e) => tracing::error!("Error writing {filepath:?}: {e:?}"),
501 }
502}
503
#[cfg(test)]
mod tests {
    use chrono::{TimeZone, Utc};
    use rstest::rstest;

    use super::*;

    /// UNIX-epoch nanoseconds of the given UTC wall-clock time (whole seconds).
    fn utc_nanos(year: i32, month: u32, day: u32, hour: u32, min: u32, sec: u32) -> u64 {
        Utc.with_ymd_and_hms(year, month, day, hour, min, sec)
            .unwrap()
            .timestamp_nanos_opt()
            .unwrap() as u64
    }

    // Cases, in order: start of day, midday, final nanosecond of the day, start of next day.
    #[rstest]
    #[case(
        utc_nanos(2024, 1, 1, 0, 0, 0),
        NaiveDate::from_ymd_opt(2024, 1, 1).unwrap(),
        utc_nanos(2024, 1, 1, 23, 59, 59) + 999_999_999
    )]
    #[case(
        utc_nanos(2024, 1, 1, 12, 0, 0),
        NaiveDate::from_ymd_opt(2024, 1, 1).unwrap(),
        utc_nanos(2024, 1, 1, 23, 59, 59) + 999_999_999
    )]
    #[case(
        utc_nanos(2024, 1, 1, 23, 59, 59) + 999_999_999,
        NaiveDate::from_ymd_opt(2024, 1, 1).unwrap(),
        utc_nanos(2024, 1, 1, 23, 59, 59) + 999_999_999
    )]
    #[case(
        utc_nanos(2024, 1, 2, 0, 0, 0),
        NaiveDate::from_ymd_opt(2024, 1, 2).unwrap(),
        utc_nanos(2024, 1, 2, 23, 59, 59) + 999_999_999
    )]
    fn test_date_cursor(
        #[case] timestamp: u64,
        #[case] expected_date: NaiveDate,
        #[case] expected_end_ns: u64,
    ) {
        let cursor = DateCursor::new(UnixNanos::from(timestamp));

        assert_eq!(cursor.date_utc, expected_date);
        assert_eq!(cursor.end_ns, UnixNanos::from(expected_end_ns));
    }
}