nautilus_tardis/machine/
mod.rs

1// -------------------------------------------------------------------------------------------------
2//  Copyright (C) 2015-2025 Posei Systems Pty Ltd. All rights reserved.
3//  https://poseitrader.io
4//
5//  Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
6//  You may not use this file except in compliance with the License.
7//  You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
8//
9//  Unless required by applicable law or agreed to in writing, software
10//  distributed under the License is distributed on an "AS IS" BASIS,
11//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12//  See the License for the specific language governing permissions and
13//  limitations under the License.
14// -------------------------------------------------------------------------------------------------
15
16pub mod client;
17pub mod message;
18pub mod parse;
19pub mod types;
20
21use std::{
22    sync::{
23        Arc,
24        atomic::{AtomicBool, Ordering},
25    },
26    time::Duration,
27};
28
29use async_stream::stream;
30use futures_util::{SinkExt, Stream, StreamExt, stream::SplitSink};
31use message::WsMessage;
32use tokio::net::TcpStream;
33use tokio_tungstenite::{
34    MaybeTlsStream, WebSocketStream, connect_async,
35    tungstenite::{self, protocol::frame::coding::CloseCode},
36};
37use types::{ReplayNormalizedRequestOptions, StreamNormalizedRequestOptions};
38
39pub use crate::machine::client::TardisMachineClient;
40
41pub type Result<T> = std::result::Result<T, Error>;
42
43/// The error that could happen while interacting with Tardis Machine Server.
44#[derive(Debug, thiserror::Error)]
45pub enum Error {
46    /// An error that could happen when an empty options array was given.
47    #[error("Options cannot be empty")]
48    EmptyOptions,
49    /// An error when failed to connect to Tardis' websocket connection.
50    #[error("Failed to connect: {0}")]
51    ConnectFailed(#[from] tungstenite::Error),
52    /// An error when WS connection to the machine server was rejected.
53    #[error("Connection rejected: {reason}")]
54    ConnectRejected {
55        /// The status code for the initial WS connection.
56        status: tungstenite::http::StatusCode,
57        /// The reason why the connection was rejected.
58        reason: String,
59    },
60    /// An error where the websocket connection was closed unexpectedly by Tardis.
61    #[error("Connection closed: {reason}")]
62    ConnectionClosed {
63        /// The reason why the connection was closed.
64        reason: String,
65    },
66    /// An error when deserializing the response from Tardis.
67    #[error("Failed to deserialize message: {0}")]
68    Deserialization(#[from] serde_json::Error),
69}
70
71/// Connects to the Tardis Machine WS replay endpoint and returns a stream of WebSocket messages.
72///
73/// # Errors
74///
75/// Returns `Error::EmptyOptions` if no options provided,
76/// or `Error::ConnectFailed`/`Error::ConnectRejected` if connection fails.
77pub async fn replay_normalized(
78    base_url: &str,
79    options: Vec<ReplayNormalizedRequestOptions>,
80    signal: Arc<AtomicBool>,
81) -> Result<impl Stream<Item = Result<WsMessage>>> {
82    if options.is_empty() {
83        return Err(Error::EmptyOptions);
84    }
85
86    let path = format!("{base_url}/ws-replay-normalized?options=");
87    let options = serde_json::to_string(&options)?;
88
89    let plain_url = format!("{path}{options}");
90    tracing::debug!("Connecting to {plain_url}");
91
92    let url = format!("{path}{}", urlencoding::encode(&options));
93    stream_from_websocket(base_url, url, signal).await
94}
95
96/// Connects to the Tardis Machine WS streaming endpoint and returns a stream of WebSocket messages.
97///
98/// # Errors
99///
100/// Returns `Error::EmptyOptions` if no options provided,
101/// or `Error::ConnectFailed`/`Error::ConnectRejected` if connection fails.
102pub async fn stream_normalized(
103    base_url: &str,
104    options: Vec<StreamNormalizedRequestOptions>,
105    signal: Arc<AtomicBool>,
106) -> Result<impl Stream<Item = Result<WsMessage>>> {
107    if options.is_empty() {
108        return Err(Error::EmptyOptions);
109    }
110
111    let path = format!("{base_url}/ws-stream-normalized?options=");
112    let options = serde_json::to_string(&options)?;
113
114    let plain_url = format!("{path}{options}");
115    tracing::debug!("Connecting to {plain_url}");
116
117    let url = format!("{path}{}", urlencoding::encode(&options));
118    stream_from_websocket(base_url, url, signal).await
119}
120
121async fn stream_from_websocket(
122    base_url: &str,
123    url: String,
124    signal: Arc<AtomicBool>,
125) -> Result<impl Stream<Item = Result<WsMessage>>> {
126    let (ws_stream, ws_resp) = connect_async(url).await?;
127
128    handle_connection_response(ws_resp)?;
129    tracing::info!("Connected to {base_url}");
130
131    Ok(stream! {
132        let (writer, mut reader) = ws_stream.split();
133        tokio::spawn(heartbeat(writer));
134
135        // Timeout awaiting the next record before checking signal
136        let timeout = Duration::from_millis(10);
137
138        tracing::info!("Streaming from websocket...");
139
140        loop {
141            if signal.load(Ordering::Relaxed) {
142                tracing::debug!("Shutdown signal received");
143                break;
144            }
145
146            let result = tokio::time::timeout(timeout, reader.next()).await;
147            let msg = match result {
148                Ok(msg) => msg,
149                Err(_) => continue, // Timeout
150            };
151
152            match msg {
153                Some(Ok(msg)) => match msg {
154                    tungstenite::Message::Frame(_)
155                    | tungstenite::Message::Binary(_)
156                    | tungstenite::Message::Pong(_)
157                    | tungstenite::Message::Ping(_) => {
158                        tracing::trace!("Received {msg:?}");
159                        continue; // Skip and continue to the next message
160                    }
161                    tungstenite::Message::Close(Some(frame)) => {
162                        let reason = frame.reason.to_string();
163                        if frame.code == CloseCode::Normal {
164                            tracing::debug!("Connection closed normally: {reason}");
165                        } else {
166                            tracing::error!(
167                                "Connection closed abnormally with code: {:?}, reason: {reason}", frame.code
168                            );
169                            yield Err(Error::ConnectionClosed { reason });
170                        }
171                        break;
172                    }
173                    tungstenite::Message::Close(None) => {
174                        tracing::error!("Connection closed without a frame");
175                        yield Err(Error::ConnectionClosed {
176                            reason: "No close frame provided".to_string()
177                        });
178                        break;
179                    }
180                    tungstenite::Message::Text(msg) => {
181                        match serde_json::from_str::<WsMessage>(&msg) {
182                            Ok(parsed_msg) => yield Ok(parsed_msg),
183                            Err(e) => {
184                                tracing::error!("Failed to deserialize message: {msg}. Error: {e}");
185                                yield Err(Error::Deserialization(e));
186                            }
187                        }
188                    }
189                },
190                Some(Err(e)) => {
191                    tracing::error!("WebSocket error: {e}");
192                    yield Err(Error::ConnectFailed(e));
193                    break;
194                }
195                None => {
196                    tracing::error!("Connection closed unexpectedly");
197                    yield Err(Error::ConnectionClosed {
198                        reason: "Unexpected connection close".to_string(),
199                    });
200                    break;
201                }
202            }
203        }
204
205        tracing::info!("Shutdown stream");
206    })
207}
208
209#[allow(clippy::result_large_err)]
210fn handle_connection_response(ws_resp: tungstenite::http::Response<Option<Vec<u8>>>) -> Result<()> {
211    if ws_resp.status() != tungstenite::http::StatusCode::SWITCHING_PROTOCOLS {
212        return match ws_resp.body() {
213            Some(resp) => Err(Error::ConnectRejected {
214                status: ws_resp.status(),
215                reason: String::from_utf8_lossy(resp).to_string(),
216            }),
217            None => Err(Error::ConnectRejected {
218                status: ws_resp.status(),
219                reason: "Unknown reason".to_string(),
220            }),
221        };
222    }
223    Ok(())
224}
225
226async fn heartbeat(
227    mut sender: SplitSink<WebSocketStream<MaybeTlsStream<TcpStream>>, tungstenite::Message>,
228) {
229    let mut heartbeat_interval = tokio::time::interval(Duration::from_secs(10));
230    let retry_interval = Duration::from_secs(1);
231
232    loop {
233        heartbeat_interval.tick().await;
234        tracing::trace!("Sending PING");
235
236        let mut count = 3;
237        let mut retry_interval = tokio::time::interval(retry_interval);
238
239        while count > 0 {
240            retry_interval.tick().await;
241            let _ = sender.send(tungstenite::Message::Ping(vec![].into())).await;
242            count -= 1;
243        }
244    }
245}