nautilus_tardis/machine/
client.rs1use std::{
17 collections::HashMap,
18 env,
19 sync::{
20 Arc,
21 atomic::{AtomicBool, Ordering},
22 },
23};
24
25use futures_util::{Stream, StreamExt, pin_mut};
26use nautilus_model::data::Data;
27use ustr::Ustr;
28
29use super::{
30 Error,
31 message::WsMessage,
32 replay_normalized, stream_normalized,
33 types::{
34 InstrumentMiniInfo, ReplayNormalizedRequestOptions, StreamNormalizedRequestOptions,
35 TardisInstrumentKey,
36 },
37};
38use crate::machine::parse::parse_tardis_ws_message;
39
40#[cfg_attr(
42 feature = "python",
43 pyo3::pyclass(module = "posei_trader.core.nautilus_pyo3.adapters")
44)]
45#[derive(Debug, Clone)]
46pub struct TardisMachineClient {
47 pub base_url: String,
48 pub replay_signal: Arc<AtomicBool>,
49 pub stream_signal: Arc<AtomicBool>,
50 pub instruments: HashMap<TardisInstrumentKey, Arc<InstrumentMiniInfo>>,
51 pub normalize_symbols: bool,
52}
53
54impl TardisMachineClient {
55 pub fn new(base_url: Option<&str>, normalize_symbols: bool) -> anyhow::Result<Self> {
61 let base_url = base_url
62 .map(ToString::to_string)
63 .or_else(|| env::var("TARDIS_MACHINE_WS_URL").ok())
64 .ok_or_else(|| {
65 anyhow::anyhow!(
66 "Tardis Machine `base_url` must be provided or set in the 'TARDIS_MACHINE_WS_URL' environment variable"
67 )
68 })?;
69
70 Ok(Self {
71 base_url,
72 replay_signal: Arc::new(AtomicBool::new(false)),
73 stream_signal: Arc::new(AtomicBool::new(false)),
74 instruments: HashMap::new(),
75 normalize_symbols,
76 })
77 }
78
79 pub fn add_instrument_info(&mut self, info: InstrumentMiniInfo) {
80 let key = info.as_tardis_instrument_key();
81 self.instruments.insert(key, Arc::new(info));
82 }
83
84 #[must_use]
85 pub fn is_closed(&self) -> bool {
86 self.replay_signal.load(Ordering::Relaxed) && self.stream_signal.load(Ordering::Relaxed)
87 }
88
89 pub fn close(&mut self) {
90 tracing::debug!("Closing");
91
92 self.replay_signal.store(true, Ordering::Relaxed);
93 self.stream_signal.store(true, Ordering::Relaxed);
94
95 tracing::debug!("Closed");
96 }
97
98 pub async fn replay(
104 &self,
105 options: Vec<ReplayNormalizedRequestOptions>,
106 ) -> impl Stream<Item = Data> {
107 let stream = replay_normalized(&self.base_url, options, self.replay_signal.clone())
108 .await
109 .expect("Failed to connect to WebSocket");
110
111 handle_ws_stream(Box::pin(stream), None, Some(self.instruments.clone()))
114 }
115
116 pub async fn stream(
122 &self,
123 instrument: InstrumentMiniInfo,
124 options: Vec<StreamNormalizedRequestOptions>,
125 ) -> impl Stream<Item = Data> {
126 let stream = stream_normalized(&self.base_url, options, self.stream_signal.clone())
127 .await
128 .expect("Failed to connect to WebSocket");
129
130 handle_ws_stream(Box::pin(stream), Some(Arc::new(instrument)), None)
133 }
134}
135
136fn handle_ws_stream<S>(
137 stream: S,
138 instrument: Option<Arc<InstrumentMiniInfo>>,
139 instrument_map: Option<HashMap<TardisInstrumentKey, Arc<InstrumentMiniInfo>>>,
140) -> impl Stream<Item = Data>
141where
142 S: Stream<Item = Result<WsMessage, Error>> + Unpin,
143{
144 assert!(
145 instrument.is_some() || instrument_map.is_some(),
146 "Either `instrument` or `instrument_map` must be provided"
147 );
148
149 async_stream::stream! {
150 pin_mut!(stream);
151 while let Some(result) = stream.next().await {
152 match result {
153 Ok(msg) => {
154 let info = instrument.clone().or_else(|| {
155 instrument_map
156 .as_ref()
157 .and_then(|map| determine_instrument_info(&msg, map))
158 });
159
160 if let Some(info) = info {
161 if let Some(data) = parse_tardis_ws_message(msg, info) {
162 yield data;
163 }
164 }
165 }
166 Err(e) => {
167 tracing::error!("Error in WebSocket stream: {e:?}");
168 break;
169 }
170 }
171 }
172 }
173}
174
175pub fn determine_instrument_info(
176 msg: &WsMessage,
177 instrument_map: &HashMap<TardisInstrumentKey, Arc<InstrumentMiniInfo>>,
178) -> Option<Arc<InstrumentMiniInfo>> {
179 let key = match msg {
180 WsMessage::BookChange(msg) => {
181 TardisInstrumentKey::new(Ustr::from(&msg.symbol), msg.exchange.clone())
182 }
183 WsMessage::BookSnapshot(msg) => {
184 TardisInstrumentKey::new(Ustr::from(&msg.symbol), msg.exchange.clone())
185 }
186 WsMessage::Trade(msg) => {
187 TardisInstrumentKey::new(Ustr::from(&msg.symbol), msg.exchange.clone())
188 }
189 WsMessage::TradeBar(msg) => {
190 TardisInstrumentKey::new(Ustr::from(&msg.symbol), msg.exchange.clone())
191 }
192 WsMessage::DerivativeTicker(_) => return None,
193 WsMessage::Disconnect(_) => return None,
194 };
195 if let Some(inst) = instrument_map.get(&key) {
196 Some(inst.clone())
197 } else {
198 tracing::error!("Instrument definition info not available for {key:?}");
199 None
200 }
201}