1use std::{cell::Ref, fmt::Display};
17
18use nautilus_common::{
19 clock::Clock,
20 messages::{
21 DataEvent,
22 data::{
23 BarsResponse, BookResponse, DataResponse, InstrumentResponse, InstrumentsResponse,
24 QuotesResponse, TradesResponse,
25 },
26 },
27};
28use nautilus_core::UUID4;
29use posei_trader::client::DataClient;
30use nautilus_model::{
31 data::{
32 Bar, BarType, Data, IndexPriceUpdate, MarkPriceUpdate, OrderBookDelta, OrderBookDeltas_API,
33 OrderBookDepth10, QuoteTick, TradeTick, close::InstrumentClose,
34 },
35 identifiers::{ClientId, InstrumentId, Venue},
36 instruments::{Instrument, InstrumentAny},
37 orderbook::OrderBook,
38};
39
40#[async_trait::async_trait]
41pub trait LiveDataClient: DataClient {
42 fn get_message_channel(&self) -> tokio::sync::mpsc::UnboundedSender<DataEvent>;
43
44 fn get_clock(&self) -> Ref<'_, dyn Clock>;
45
46 fn send_delta(&self, delta: OrderBookDelta) {
47 self.send_data(Data::Delta(delta));
48 }
49
50 fn send_deltas(&self, deltas: OrderBookDeltas_API) {
51 self.send_data(Data::Deltas(deltas));
52 }
53
54 fn send_depth10(&self, depth: OrderBookDepth10) {
55 self.send_data(Data::Depth10(Box::new(depth)));
56 }
57
58 fn send_quote(&self, quote: QuoteTick) {
59 self.send_data(Data::Quote(quote));
60 }
61
62 fn send_trade(&self, trade: TradeTick) {
63 self.send_data(Data::Trade(trade));
64 }
65
66 fn send_bar(&self, bar: Bar) {
67 self.send_data(Data::Bar(bar));
68 }
69
70 fn send_mark_price(&self, mark_price: MarkPriceUpdate) {
71 self.send_data(Data::MarkPriceUpdate(mark_price));
72 }
73
74 fn send_index_price(&self, index_price: IndexPriceUpdate) {
75 self.send_data(Data::IndexPriceUpdate(index_price));
76 }
77
78 fn send_instrument_close(&self, close: InstrumentClose) {
79 self.send_data(Data::InstrumentClose(close));
80 }
81
82 fn send_data(&self, data: Data) {
83 if let Err(e) = self.get_message_channel().send(DataEvent::Data(data)) {
84 log_send_error(&self.client_id(), &e);
85 }
86 }
87
88 fn send_instrument_response(&self, instrument: InstrumentAny, correlation_id: UUID4) {
89 let response = DataResponse::Instrument(Box::new(InstrumentResponse::new(
90 correlation_id,
91 self.client_id(),
92 instrument.id(),
93 instrument,
94 self.get_clock().timestamp_ns(),
95 None,
96 )));
97
98 self.send_response(response);
99 }
100
101 fn send_instruments_response(
102 &self,
103 venue: Venue,
104 instruments: Vec<InstrumentAny>,
105 correlation_id: UUID4,
106 ) {
107 let response = DataResponse::Instruments(InstrumentsResponse::new(
108 correlation_id,
109 self.client_id(),
110 venue,
111 instruments,
112 self.get_clock().timestamp_ns(),
113 None,
114 ));
115
116 self.send_response(response);
117 }
118
119 fn send_book_response(&self, book: OrderBook, correlation_id: UUID4) {
120 let response = DataResponse::Book(BookResponse::new(
121 correlation_id,
122 self.client_id(),
123 book.instrument_id,
124 book,
125 self.get_clock().timestamp_ns(),
126 None,
127 ));
128
129 self.send_response(response);
130 }
131
132 fn send_quotes_response(
133 &self,
134 instrument_id: InstrumentId,
135 quotes: Vec<QuoteTick>,
136 correlation_id: UUID4,
137 ) {
138 let response = DataResponse::Quotes(QuotesResponse::new(
139 correlation_id,
140 self.client_id(),
141 instrument_id,
142 quotes,
143 self.get_clock().timestamp_ns(),
144 None,
145 ));
146
147 self.send_response(response);
148 }
149
150 fn send_trades_response(
151 &self,
152 instrument_id: InstrumentId,
153 trades: Vec<TradeTick>,
154 correlation_id: UUID4,
155 ) {
156 let response = DataResponse::Trades(TradesResponse::new(
157 correlation_id,
158 self.client_id(),
159 instrument_id,
160 trades,
161 self.get_clock().timestamp_ns(),
162 None,
163 ));
164
165 self.send_response(response);
166 }
167
168 fn send_bars(&self, bar_type: BarType, bars: Vec<Bar>, correlation_id: UUID4) {
169 let response = DataResponse::Bars(BarsResponse::new(
170 correlation_id,
171 self.client_id(),
172 bar_type,
173 bars,
174 self.get_clock().timestamp_ns(),
175 None,
176 ));
177
178 self.send_response(response);
179 }
180
181 fn send_response(&self, response: DataResponse) {
182 if let Err(e) = self
183 .get_message_channel()
184 .send(DataEvent::Response(response))
185 {
186 log_send_error(&self.client_id(), &e);
187 }
188 }
189}
190
191#[inline(always)]
192fn log_send_error<E: Display>(client_id: &ClientId, e: &E) {
193 log::error!("DataClient-{client_id} failed to send message: {e}");
194}