nautilus_tardis/http/
client.rs1use std::{env, time::Duration};
17
18use nautilus_core::{UnixNanos, consts::NAUTILUS_USER_AGENT};
19use nautilus_model::instruments::InstrumentAny;
20use reqwest::Response;
21
22use super::{
23 TARDIS_BASE_URL,
24 error::{Error, TardisErrorResponse},
25 instruments::is_available,
26 models::InstrumentInfo,
27 parse::parse_instrument_any,
28 query::InstrumentFilter,
29};
30use crate::enums::Exchange;
31
32pub type Result<T> = std::result::Result<T, Error>;
33
34#[cfg_attr(
37 feature = "python",
38 pyo3::pyclass(module = "posei_trader.core.nautilus_pyo3.adapters")
39)]
40#[derive(Debug, Clone)]
41pub struct TardisHttpClient {
42 base_url: String,
43 api_key: String,
44 client: reqwest::Client,
45 normalize_symbols: bool,
46}
47
48impl TardisHttpClient {
49 pub fn new(
56 api_key: Option<&str>,
57 base_url: Option<&str>,
58 timeout_secs: Option<u64>,
59 normalize_symbols: bool,
60 ) -> anyhow::Result<Self> {
61 let api_key = match api_key {
62 Some(key) => key.to_string(),
63 None => env::var("TARDIS_API_KEY").map_err(|_| {
64 anyhow::anyhow!(
65 "API key must be provided or set in the 'TARDIS_API_KEY' environment variable"
66 )
67 })?,
68 };
69
70 let base_url = base_url.map_or_else(|| TARDIS_BASE_URL.to_string(), ToString::to_string);
71 let timeout = timeout_secs.map_or_else(|| Duration::from_secs(60), Duration::from_secs);
72
73 let client = reqwest::Client::builder()
74 .user_agent(NAUTILUS_USER_AGENT)
75 .timeout(timeout)
76 .build()?;
77
78 Ok(Self {
79 base_url,
80 api_key,
81 client,
82 normalize_symbols,
83 })
84 }
85
86 async fn handle_error_response<T>(resp: Response) -> Result<T> {
87 let status = resp.status().as_u16();
88 let error_text = resp.text().await.unwrap_or_default();
89
90 if let Ok(error) = serde_json::from_str::<TardisErrorResponse>(&error_text) {
91 Err(Error::ApiError {
92 status,
93 code: error.code,
94 message: error.message,
95 })
96 } else {
97 Err(Error::ApiError {
98 status,
99 code: 0,
100 message: error_text,
101 })
102 }
103 }
104
105 pub async fn instruments_info(
113 &self,
114 exchange: Exchange,
115 symbol: Option<&str>,
116 filter: Option<&InstrumentFilter>,
117 ) -> Result<Vec<InstrumentInfo>> {
118 let mut url = format!("{}/instruments/{exchange}", &self.base_url);
119 if let Some(symbol) = symbol {
120 url.push_str(&format!("/{symbol}"));
121 }
122 if let Some(filter) = filter {
123 if let Ok(filter_json) = serde_json::to_string(filter) {
124 url.push_str(&format!("?filter={}", urlencoding::encode(&filter_json)));
125 }
126 }
127 tracing::debug!("Requesting: {url}");
128
129 let resp = self
130 .client
131 .get(url)
132 .bearer_auth(&self.api_key)
133 .send()
134 .await?;
135 tracing::debug!("Response status: {}", resp.status());
136
137 if !resp.status().is_success() {
138 return Self::handle_error_response(resp).await;
139 }
140
141 let body = resp.text().await?;
142 tracing::trace!("{body}");
143
144 if let Ok(instrument) = serde_json::from_str::<InstrumentInfo>(&body) {
145 return Ok(vec![instrument]);
146 }
147
148 match serde_json::from_str(&body) {
149 Ok(parsed) => Ok(parsed),
150 Err(e) => {
151 tracing::error!("Failed to parse response: {e}");
152 tracing::debug!("Response body was: {body}");
153 Err(Error::ResponseParse(e.to_string()))
154 }
155 }
156 }
157
158 #[allow(clippy::too_many_arguments)]
166 pub async fn instruments(
167 &self,
168 exchange: Exchange,
169 symbol: Option<&str>,
170 filter: Option<&InstrumentFilter>,
171 start: Option<UnixNanos>,
172 end: Option<UnixNanos>,
173 available_offset: Option<UnixNanos>,
174 effective: Option<UnixNanos>,
175 ts_init: Option<UnixNanos>,
176 ) -> Result<Vec<InstrumentAny>> {
177 let response = self.instruments_info(exchange, symbol, filter).await?;
178
179 Ok(response
180 .into_iter()
181 .filter(|info| is_available(info, start, end, available_offset, effective))
182 .flat_map(|info| parse_instrument_any(info, effective, ts_init, self.normalize_symbols))
183 .collect())
184 }
185}