nautilus_persistence/python/backend/
session.rs1use nautilus_core::{
17 ffi::cvec::CVec,
18 python::{IntoPyObjectPoseiExt, to_pyruntime_err},
19};
20use nautilus_model::data::{
21 Bar, MarkPriceUpdate, OrderBookDelta, OrderBookDepth10, QuoteTick, TradeTick,
22};
23use pyo3::{prelude::*, types::PyCapsule};
24
25use crate::backend::session::{DataBackendSession, DataQueryResult};
26
27#[repr(C)]
28#[pyclass(eq, eq_int)]
29#[derive(Clone, Copy, Debug, PartialEq, Eq)]
30pub enum PoseiDataType {
31 OrderBookDelta = 1,
33 OrderBookDepth10 = 2,
34 QuoteTick = 3,
35 TradeTick = 4,
36 Bar = 5,
37 MarkPriceUpdate = 6,
38}
39
40#[pymethods]
41impl DataBackendSession {
42 #[new]
43 #[pyo3(signature=(chunk_size=10_000))]
44 fn new_session(chunk_size: usize) -> Self {
45 Self::new(chunk_size)
46 }
47
48 #[pyo3(name = "add_file")]
62 #[pyo3(signature = (data_type, table_name, file_path, sql_query=None))]
63 fn add_file_py(
64 mut slf: PyRefMut<'_, Self>,
65 data_type: PoseiDataType,
66 table_name: &str,
67 file_path: &str,
68 sql_query: Option<&str>,
69 ) -> PyResult<()> {
70 let _guard = slf.runtime.enter();
71
72 match data_type {
73 PoseiDataType::OrderBookDelta => slf
74 .add_file::<OrderBookDelta>(table_name, file_path, sql_query)
75 .map_err(to_pyruntime_err),
76 PoseiDataType::OrderBookDepth10 => slf
77 .add_file::<OrderBookDepth10>(table_name, file_path, sql_query)
78 .map_err(to_pyruntime_err),
79 PoseiDataType::QuoteTick => slf
80 .add_file::<QuoteTick>(table_name, file_path, sql_query)
81 .map_err(to_pyruntime_err),
82 PoseiDataType::TradeTick => slf
83 .add_file::<TradeTick>(table_name, file_path, sql_query)
84 .map_err(to_pyruntime_err),
85 PoseiDataType::Bar => slf
86 .add_file::<Bar>(table_name, file_path, sql_query)
87 .map_err(to_pyruntime_err),
88 PoseiDataType::MarkPriceUpdate => slf
89 .add_file::<MarkPriceUpdate>(table_name, file_path, sql_query)
90 .map_err(to_pyruntime_err),
91 }
92 }
93
94 fn to_query_result(mut slf: PyRefMut<'_, Self>) -> DataQueryResult {
95 let query_result = slf.get_query_result();
96 DataQueryResult::new(query_result, slf.chunk_size)
97 }
98}
99
100#[pymethods]
101impl DataQueryResult {
102 const fn __iter__(slf: PyRef<'_, Self>) -> PyRef<'_, Self> {
104 slf
105 }
106
107 fn __next__(mut slf: PyRefMut<'_, Self>) -> PyResult<Option<PyObject>> {
109 match slf.next() {
110 Some(acc) if !acc.is_empty() => {
111 let cvec = slf.set_chunk(acc);
112 Python::with_gil(|py| match PyCapsule::new::<CVec>(py, cvec, None) {
113 Ok(capsule) => Ok(Some(capsule.into_py_any_unwrap(py))),
114 Err(e) => Err(to_pyruntime_err(e)),
115 })
116 }
117 _ => Ok(None),
118 }
119 }
120}