// nautilus_persistence/backend/catalog.rs

use std::{fmt::Debug, path::PathBuf};

use datafusion::arrow::record_batch::RecordBatch;
use heck::ToSnakeCase;
use itertools::Itertools;
use log::info;
use nautilus_core::UnixNanos;
use nautilus_model::data::{
    Bar, Data, GetTsInit, IndexPriceUpdate, MarkPriceUpdate, OrderBookDelta, OrderBookDepth10,
    QuoteTick, TradeTick, close::InstrumentClose,
};
use nautilus_serialization::{
    arrow::{DecodeDataFromRecordBatch, EncodeToRecordBatch},
    enums::ParquetWriteMode,
    parquet::{combine_data_files, min_max_from_parquet_metadata, write_batches_to_parquet},
};
use serde::Serialize;

use super::session::{self, DataBackendSession, QueryResult, build_query};

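/// A Parquet-backed catalog for writing and querying Nautilus market data under a
/// common `base_path`, delegating queries to a [`DataBackendSession`].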
pub struct ParquetDataCatalog {
    base_path: PathBuf,
    batch_size: usize,
    session: DataBackendSession,
}

impl Debug for ParquetDataCatalog {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.debug_struct(stringify!(ParquetDataCatalog))
            .field("base_path", &self.base_path)
            .finish()
    }
}

impl ParquetDataCatalog {
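    /// Creates a new catalog rooted at `base_path`.
    ///
    /// `batch_size` bounds how many records are encoded per record batch and defaults
    /// to 5000 when `None`.
    ///
    /// A minimal usage sketch (the path is illustrative):
    ///
    /// ```ignore
    /// let catalog = ParquetDataCatalog::new(PathBuf::from("/tmp/catalog"), None);
    /// ```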
    #[must_use]
    pub fn new(base_path: PathBuf, batch_size: Option<usize>) -> Self {
        let batch_size = batch_size.unwrap_or(5000);
        Self {
            base_path,
            batch_size,
            session: session::DataBackendSession::new(batch_size),
        }
    }

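    /// Splits a heterogeneous `Vec<Data>` by variant and writes each data type to its
    /// own Parquet location. `Data::Deltas` batches are skipped, and per-type write
    /// errors are deliberately discarded.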
    pub fn write_data_enum(&self, data: Vec<Data>, write_mode: Option<ParquetWriteMode>) {
        let mut deltas: Vec<OrderBookDelta> = Vec::new();
        let mut depth10s: Vec<OrderBookDepth10> = Vec::new();
        let mut quotes: Vec<QuoteTick> = Vec::new();
        let mut trades: Vec<TradeTick> = Vec::new();
        let mut bars: Vec<Bar> = Vec::new();
        let mut mark_prices: Vec<MarkPriceUpdate> = Vec::new();
        let mut index_prices: Vec<IndexPriceUpdate> = Vec::new();
        let mut closes: Vec<InstrumentClose> = Vec::new();

        for d in data {
            match d {
                Data::Deltas(_) => continue,
                Data::Delta(d) => deltas.push(d),
                Data::Depth10(d) => depth10s.push(*d),
                Data::Quote(d) => quotes.push(d),
                Data::Trade(d) => trades.push(d),
                Data::Bar(d) => bars.push(d),
                Data::MarkPriceUpdate(p) => mark_prices.push(p),
                Data::IndexPriceUpdate(p) => index_prices.push(p),
                Data::InstrumentClose(c) => closes.push(c),
            }
        }

        let _ = self.write_to_parquet(deltas, None, None, None, write_mode);
        let _ = self.write_to_parquet(depth10s, None, None, None, write_mode);
        let _ = self.write_to_parquet(quotes, None, None, None, write_mode);
        let _ = self.write_to_parquet(trades, None, None, None, write_mode);
        let _ = self.write_to_parquet(bars, None, None, None, write_mode);
        let _ = self.write_to_parquet(mark_prices, None, None, None, write_mode);
        let _ = self.write_to_parquet(index_prices, None, None, None, write_mode);
        let _ = self.write_to_parquet(closes, None, None, None, write_mode);
    }

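    /// Encodes `data` into record batches and writes them to a Parquet file, returning
    /// the path written. Panics if timestamps are not ascending by `ts_init`. When
    /// `path` is `None`, the target is derived from the type's catalog prefix and the
    /// `instrument_id` in the batch schema metadata.
    ///
    /// A hedged usage sketch (`quotes` construction elided; defaults used throughout):
    ///
    /// ```ignore
    /// let path = catalog.write_to_parquet(quotes, None, None, None, None)?;
    /// ```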
    pub fn write_to_parquet<T>(
        &self,
        data: Vec<T>,
        path: Option<PathBuf>,
        compression: Option<parquet::basic::Compression>,
        max_row_group_size: Option<usize>,
        write_mode: Option<ParquetWriteMode>,
    ) -> anyhow::Result<PathBuf>
    where
        T: GetTsInit + EncodeToRecordBatch + CatalogPathPrefix,
    {
        let type_name = std::any::type_name::<T>().to_snake_case();
        Self::check_ascending_timestamps(&data, &type_name);
        let batches = self.data_to_record_batches(data)?;
        let schema = batches.first().expect("Batches are empty.").schema();
        let instrument_id = schema.metadata.get("instrument_id").cloned();
        let new_path = self.make_path(T::path_prefix(), instrument_id, write_mode)?;
        let path = path.unwrap_or(new_path);

        info!(
            "Writing {} batches of {type_name} data to {path:?}",
            batches.len()
        );

        write_batches_to_parquet(&batches, &path, compression, max_row_group_size, write_mode)?;

        Ok(path)
    }

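    /// Panics if `data` is not sorted in ascending order by `ts_init`.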
    fn check_ascending_timestamps<T: GetTsInit>(data: &[T], type_name: &str) {
        assert!(
            data.windows(2).all(|w| w[0].ts_init() <= w[1].ts_init()),
            "{type_name} timestamps must be in ascending order"
        );
    }

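    /// Encodes `data` into record batches of at most `self.batch_size` rows, deriving
    /// the Arrow metadata from each chunk.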
    pub fn data_to_record_batches<T>(&self, data: Vec<T>) -> anyhow::Result<Vec<RecordBatch>>
    where
        T: GetTsInit + EncodeToRecordBatch,
    {
        let mut batches = Vec::new();

        for chunk in &data.into_iter().chunks(self.batch_size) {
            let data = chunk.collect_vec();
            let metadata = EncodeToRecordBatch::chunk_metadata(&data);
            let record_batch = T::encode_batch(&metadata, &data)?;
            batches.push(record_batch);
        }

        Ok(batches)
    }

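    /// Resolves the target file path for a write, creating the data directory if
    /// needed. Files are named `part-{i}.parquet`; a directory that already contains
    /// several part files can only be written with `ParquetWriteMode::NewFile`.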
    fn make_path(
        &self,
        type_name: &str,
        instrument_id: Option<String>,
        write_mode: Option<ParquetWriteMode>,
    ) -> anyhow::Result<PathBuf> {
        let path = self.make_directory_path(type_name, instrument_id);
        std::fs::create_dir_all(&path)?;
        let used_write_mode = write_mode.unwrap_or(ParquetWriteMode::Overwrite);
        let mut file_path = path.join("part-0.parquet");
        let mut empty_path = file_path.clone();
        let mut i = 0;

        // Find the first `part-{i}.parquet` name that does not exist yet
        while empty_path.exists() {
            i += 1;
            let name = format!("part-{i}.parquet");
            empty_path = path.join(name);
        }

        if i > 1 && used_write_mode != ParquetWriteMode::NewFile {
            anyhow::bail!(
                "Only ParquetWriteMode::NewFile is allowed for a directory containing several parquet files."
            );
        } else if used_write_mode == ParquetWriteMode::NewFile {
            file_path = empty_path;
        }

        info!("Created file path: {file_path:?}");

        Ok(file_path)
    }

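    /// Builds the directory path `{base_path}/data/{type_name}[/{instrument_id}]`,
    /// stripping any `/` characters from the instrument ID.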
    fn make_directory_path(&self, type_name: &str, instrument_id: Option<String>) -> PathBuf {
        let mut path = self.base_path.join("data").join(type_name);

        if let Some(id) = instrument_id {
            path = path.join(id.replace('/', ""));
        }

        path
    }

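    /// Serializes `data` to pretty-printed JSON at the derived catalog path (or
    /// `path` when given), optionally writing chunk metadata to a sibling
    /// `.metadata.json` file, and returns the JSON path written.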
    pub fn write_to_json<T>(
        &self,
        data: Vec<T>,
        path: Option<PathBuf>,
        write_metadata: bool,
    ) -> anyhow::Result<PathBuf>
    where
        T: GetTsInit + Serialize + CatalogPathPrefix + EncodeToRecordBatch,
    {
        let type_name = std::any::type_name::<T>().to_snake_case();
        Self::check_ascending_timestamps(&data, &type_name);
        let new_path = self.make_path(T::path_prefix(), None, None)?;
        let json_path = path.unwrap_or(new_path.with_extension("json"));

        info!(
            "Writing {} records of {type_name} data to {json_path:?}",
            data.len()
        );

        if write_metadata {
            let metadata = T::chunk_metadata(&data);
            let metadata_path = json_path.with_extension("metadata.json");
            info!("Writing metadata to {metadata_path:?}");
            let metadata_file = std::fs::File::create(&metadata_path)?;
            serde_json::to_writer_pretty(metadata_file, &metadata)?;
        }

        let file = std::fs::File::create(&json_path)?;
        serde_json::to_writer_pretty(file, &serde_json::to_value(data)?)?;

        Ok(json_path)
    }

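    /// Combines all Parquet files for one data type (and optional instrument) into a
    /// single file, merged on the `ts_init` column.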
    pub fn consolidate_data(
        &self,
        type_name: &str,
        instrument_id: Option<String>,
    ) -> anyhow::Result<()> {
        let parquet_files = self.query_parquet_files(type_name, instrument_id)?;

        if !parquet_files.is_empty() {
            combine_data_files(parquet_files, "ts_init", None, None)?;
        }

        Ok(())
    }

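    /// Walks every leaf directory under `data/` and combines the Parquet files found
    /// in each, merging on `ts_init`.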
    pub fn consolidate_catalog(&self) -> anyhow::Result<()> {
        let leaf_directories = self.find_leaf_data_directories()?;

        for directory in leaf_directories {
            let parquet_files: Vec<PathBuf> = std::fs::read_dir(directory)?
                .filter_map(|entry| {
                    let path = entry.ok()?.path();

                    if path.extension().and_then(|s| s.to_str()) == Some("parquet") {
                        Some(path)
                    } else {
                        None
                    }
                })
                .collect();

            if !parquet_files.is_empty() {
                combine_data_files(parquet_files, "ts_init", None, None)?;
            }
        }

        Ok(())
    }

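    /// Returns every directory under `{base_path}/data` that contains files but no
    /// subdirectories, i.e. the directories that actually hold Parquet data.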
    pub fn find_leaf_data_directories(&self) -> anyhow::Result<Vec<PathBuf>> {
        let mut all_paths: Vec<PathBuf> = Vec::new();
        let data_dir = self.base_path.join("data");

        for entry in walkdir::WalkDir::new(data_dir) {
            all_paths.push(entry?.path().to_path_buf());
        }

        let all_dirs = all_paths
            .iter()
            .filter(|p| p.is_dir())
            .cloned()
            .collect::<Vec<PathBuf>>();
        let mut leaf_dirs = Vec::new();

        for directory in all_dirs {
            // Avoid panicking on unreadable entries: treat them as neither file nor dir
            let has_subdirs = std::fs::read_dir(&directory)?
                .any(|entry| entry.is_ok_and(|e| e.path().is_dir()));
            let has_files = std::fs::read_dir(&directory)?
                .any(|entry| entry.is_ok_and(|e| e.path().is_file()));

            if has_files && !has_subdirs {
                leaf_dirs.push(directory);
            }
        }

        Ok(leaf_dirs)
    }

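    /// Registers a single Parquet file with the backend session and returns the query
    /// result, optionally bounded by `start`/`end` timestamps and an extra SQL
    /// `WHERE` clause. The table name is taken from the file stem.
    ///
    /// A hedged usage sketch (the file path is illustrative):
    ///
    /// ```ignore
    /// let result = catalog.query_file::<QuoteTick>(
    ///     PathBuf::from("/tmp/catalog/data/quotes/part-0.parquet"),
    ///     None,
    ///     None,
    ///     None,
    /// )?;
    /// ```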
    pub fn query_file<T>(
        &mut self,
        path: PathBuf,
        start: Option<UnixNanos>,
        end: Option<UnixNanos>,
        where_clause: Option<&str>,
    ) -> anyhow::Result<QueryResult>
    where
        T: DecodeDataFromRecordBatch + CatalogPathPrefix,
    {
        let path_str = path.to_str().expect("Failed to convert path to string");
        let table_name = path
            .file_stem()
            .expect("Path has no file stem")
            .to_str()
            .expect("Failed to convert file stem to string");
        let query = build_query(table_name, start, end, where_clause);
        self.session
            .add_file::<T>(table_name, path_str, Some(&query))?;

        Ok(self.session.get_query_result())
    }

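    /// Registers all Parquet files for the given instrument IDs with the backend
    /// session and returns the combined query result, falling back to the type's
    /// default catalog path when no files are found.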
    pub fn query_directory<T>(
        &mut self,
        instrument_ids: Vec<String>,
        start: Option<UnixNanos>,
        end: Option<UnixNanos>,
        where_clause: Option<&str>,
    ) -> anyhow::Result<QueryResult>
    where
        T: DecodeDataFromRecordBatch + CatalogPathPrefix,
    {
        let mut paths = Vec::new();

        for instrument_id in instrument_ids {
            paths.extend(self.query_parquet_files(T::path_prefix(), Some(instrument_id))?);
        }

        // Fall back to the type's default path when no instrument files were found
        if paths.is_empty() {
            paths.push(self.make_path(T::path_prefix(), None, None)?);
        }

        for path in &paths {
            let path = path.to_str().expect("Failed to convert path to string");
            let query = build_query(path, start, end, where_clause);
            self.session.add_file::<T>(path, path, Some(&query))?;
        }

        Ok(self.session.get_query_result())
    }

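    /// Returns the maximum (default) or minimum `ts_init` across all Parquet files
    /// for `data_cls` and the optional instrument, read from Parquet metadata, or
    /// `None` when no files exist.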
    #[allow(dead_code)]
    pub fn query_timestamp_bound(
        &self,
        data_cls: &str,
        instrument_id: Option<String>,
        is_last: Option<bool>,
    ) -> anyhow::Result<Option<i64>> {
        let is_last = is_last.unwrap_or(true);
        let parquet_files = self.query_parquet_files(data_cls, instrument_id)?;

        if parquet_files.is_empty() {
            return Ok(None);
        }

        let min_max_per_file: Vec<(i64, i64)> = parquet_files
            .iter()
            .map(|file| min_max_from_parquet_metadata(file, "ts_init"))
            .collect::<Result<Vec<_>, _>>()?;

        let timestamps: Vec<i64> = min_max_per_file
            .into_iter()
            .map(|(min, max)| if is_last { max } else { min })
            .collect();

        if is_last {
            Ok(timestamps.iter().max().copied())
        } else {
            Ok(timestamps.iter().min().copied())
        }
    }

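    /// Lists the Parquet files in the directory for `type_name` and the optional
    /// instrument, returning an empty `Vec` when the directory does not exist.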
    pub fn query_parquet_files(
        &self,
        type_name: &str,
        instrument_id: Option<String>,
    ) -> anyhow::Result<Vec<PathBuf>> {
        let path = self.make_directory_path(type_name, instrument_id);
        let mut files = Vec::new();

        if path.exists() {
            for entry in std::fs::read_dir(path)? {
                let path = entry?.path();
                // Avoid panicking on files without an extension
                if path.is_file() && path.extension().and_then(|s| s.to_str()) == Some("parquet") {
                    files.push(path);
                }
            }
        }

        Ok(files)
    }
}

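/// Maps a data type to its subdirectory name under the catalog's `data/` root.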
pub trait CatalogPathPrefix {
    fn path_prefix() -> &'static str;
}

macro_rules! impl_catalog_path_prefix {
    ($type:ty, $path:expr) => {
        impl CatalogPathPrefix for $type {
            fn path_prefix() -> &'static str {
                $path
            }
        }
    };
}

impl_catalog_path_prefix!(QuoteTick, "quotes");
impl_catalog_path_prefix!(TradeTick, "trades");
impl_catalog_path_prefix!(OrderBookDelta, "order_book_deltas");
impl_catalog_path_prefix!(OrderBookDepth10, "order_book_depths");
impl_catalog_path_prefix!(Bar, "bars");
impl_catalog_path_prefix!(IndexPriceUpdate, "index_prices");
impl_catalog_path_prefix!(MarkPriceUpdate, "mark_prices");
impl_catalog_path_prefix!(InstrumentClose, "instrument_closes");