nautilus_tardis/machine/
client.rs

1// -------------------------------------------------------------------------------------------------
2//  Copyright (C) 2015-2025 Posei Systems Pty Ltd. All rights reserved.
3//  https://poseitrader.io
4//
5//  Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
6//  You may not use this file except in compliance with the License.
7//  You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
8//
9//  Unless required by applicable law or agreed to in writing, software
10//  distributed under the License is distributed on an "AS IS" BASIS,
11//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12//  See the License for the specific language governing permissions and
13//  limitations under the License.
14// -------------------------------------------------------------------------------------------------
15
16use std::{
17    collections::HashMap,
18    env,
19    sync::{
20        Arc,
21        atomic::{AtomicBool, Ordering},
22    },
23};
24
25use futures_util::{Stream, StreamExt, pin_mut};
26use nautilus_model::data::Data;
27use ustr::Ustr;
28
29use super::{
30    Error,
31    message::WsMessage,
32    replay_normalized, stream_normalized,
33    types::{
34        InstrumentMiniInfo, ReplayNormalizedRequestOptions, StreamNormalizedRequestOptions,
35        TardisInstrumentKey,
36    },
37};
38use crate::machine::parse::parse_tardis_ws_message;
39
40/// Provides a client for connecting to a [Tardis Machine Server](https://docs.tardis.dev/api/tardis-machine).
41#[cfg_attr(
42    feature = "python",
43    pyo3::pyclass(module = "posei_trader.core.nautilus_pyo3.adapters")
44)]
45#[derive(Debug, Clone)]
46pub struct TardisMachineClient {
47    pub base_url: String,
48    pub replay_signal: Arc<AtomicBool>,
49    pub stream_signal: Arc<AtomicBool>,
50    pub instruments: HashMap<TardisInstrumentKey, Arc<InstrumentMiniInfo>>,
51    pub normalize_symbols: bool,
52}
53
54impl TardisMachineClient {
55    /// Creates a new [`TardisMachineClient`] instance.
56    ///
57    /// # Errors
58    ///
59    /// Returns an error if `base_url` is not provided and `TARDIS_MACHINE_WS_URL` env var is missing.
60    pub fn new(base_url: Option<&str>, normalize_symbols: bool) -> anyhow::Result<Self> {
61        let base_url = base_url
62            .map(ToString::to_string)
63            .or_else(|| env::var("TARDIS_MACHINE_WS_URL").ok())
64            .ok_or_else(|| {
65                anyhow::anyhow!(
66                    "Tardis Machine `base_url` must be provided or set in the 'TARDIS_MACHINE_WS_URL' environment variable"
67                )
68            })?;
69
70        Ok(Self {
71            base_url,
72            replay_signal: Arc::new(AtomicBool::new(false)),
73            stream_signal: Arc::new(AtomicBool::new(false)),
74            instruments: HashMap::new(),
75            normalize_symbols,
76        })
77    }
78
79    pub fn add_instrument_info(&mut self, info: InstrumentMiniInfo) {
80        let key = info.as_tardis_instrument_key();
81        self.instruments.insert(key, Arc::new(info));
82    }
83
84    #[must_use]
85    pub fn is_closed(&self) -> bool {
86        self.replay_signal.load(Ordering::Relaxed) && self.stream_signal.load(Ordering::Relaxed)
87    }
88
89    pub fn close(&mut self) {
90        tracing::debug!("Closing");
91
92        self.replay_signal.store(true, Ordering::Relaxed);
93        self.stream_signal.store(true, Ordering::Relaxed);
94
95        tracing::debug!("Closed");
96    }
97
98    /// Connects to the Tardis Machine replay WebSocket and yields parsed `Data` items.
99    ///
100    /// # Panics
101    ///
102    /// Panics if the WebSocket connection cannot be established.
103    pub async fn replay(
104        &self,
105        options: Vec<ReplayNormalizedRequestOptions>,
106    ) -> impl Stream<Item = Data> {
107        let stream = replay_normalized(&self.base_url, options, self.replay_signal.clone())
108            .await
109            .expect("Failed to connect to WebSocket");
110
111        // We use Box::pin to heap-allocate the stream and ensure it implements
112        // Unpin for safe async handling across lifetimes.
113        handle_ws_stream(Box::pin(stream), None, Some(self.instruments.clone()))
114    }
115
116    /// Connects to the Tardis Machine stream WebSocket for a single instrument and yields parsed `Data` items.
117    ///
118    /// # Panics
119    ///
120    /// Panics if the WebSocket connection cannot be established.
121    pub async fn stream(
122        &self,
123        instrument: InstrumentMiniInfo,
124        options: Vec<StreamNormalizedRequestOptions>,
125    ) -> impl Stream<Item = Data> {
126        let stream = stream_normalized(&self.base_url, options, self.stream_signal.clone())
127            .await
128            .expect("Failed to connect to WebSocket");
129
130        // We use Box::pin to heap-allocate the stream and ensure it implements
131        // Unpin for safe async handling across lifetimes.
132        handle_ws_stream(Box::pin(stream), Some(Arc::new(instrument)), None)
133    }
134}
135
136fn handle_ws_stream<S>(
137    stream: S,
138    instrument: Option<Arc<InstrumentMiniInfo>>,
139    instrument_map: Option<HashMap<TardisInstrumentKey, Arc<InstrumentMiniInfo>>>,
140) -> impl Stream<Item = Data>
141where
142    S: Stream<Item = Result<WsMessage, Error>> + Unpin,
143{
144    assert!(
145        instrument.is_some() || instrument_map.is_some(),
146        "Either `instrument` or `instrument_map` must be provided"
147    );
148
149    async_stream::stream! {
150        pin_mut!(stream);
151        while let Some(result) = stream.next().await {
152            match result {
153                Ok(msg) => {
154                    let info = instrument.clone().or_else(|| {
155                        instrument_map
156                            .as_ref()
157                            .and_then(|map| determine_instrument_info(&msg, map))
158                    });
159
160                    if let Some(info) = info {
161                        if let Some(data) = parse_tardis_ws_message(msg, info) {
162                            yield data;
163                        }
164                    }
165                }
166                Err(e) => {
167                    tracing::error!("Error in WebSocket stream: {e:?}");
168                    break;
169                }
170            }
171        }
172    }
173}
174
175pub fn determine_instrument_info(
176    msg: &WsMessage,
177    instrument_map: &HashMap<TardisInstrumentKey, Arc<InstrumentMiniInfo>>,
178) -> Option<Arc<InstrumentMiniInfo>> {
179    let key = match msg {
180        WsMessage::BookChange(msg) => {
181            TardisInstrumentKey::new(Ustr::from(&msg.symbol), msg.exchange.clone())
182        }
183        WsMessage::BookSnapshot(msg) => {
184            TardisInstrumentKey::new(Ustr::from(&msg.symbol), msg.exchange.clone())
185        }
186        WsMessage::Trade(msg) => {
187            TardisInstrumentKey::new(Ustr::from(&msg.symbol), msg.exchange.clone())
188        }
189        WsMessage::TradeBar(msg) => {
190            TardisInstrumentKey::new(Ustr::from(&msg.symbol), msg.exchange.clone())
191        }
192        WsMessage::DerivativeTicker(_) => return None,
193        WsMessage::Disconnect(_) => return None,
194    };
195    if let Some(inst) = instrument_map.get(&key) {
196        Some(inst.clone())
197    } else {
198        tracing::error!("Instrument definition info not available for {key:?}");
199        None
200    }
201}