nautilus_live/
data.rs

1// -------------------------------------------------------------------------------------------------
2//  Copyright (C) 2015-2025 Posei Systems Pty Ltd. All rights reserved.
3//  https://poseitrader.io
4//
5//  Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
6//  You may not use this file except in compliance with the License.
7//  You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
8//
9//  Unless required by applicable law or agreed to in writing, software
10//  distributed under the License is distributed on an "AS IS" BASIS,
11//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12//  See the License for the specific language governing permissions and
13//  limitations under the License.
14// -------------------------------------------------------------------------------------------------
15
16use std::{cell::Ref, fmt::Display};
17
18use nautilus_common::{
19    clock::Clock,
20    messages::{
21        DataEvent,
22        data::{
23            BarsResponse, BookResponse, DataResponse, InstrumentResponse, InstrumentsResponse,
24            QuotesResponse, TradesResponse,
25        },
26    },
27};
28use nautilus_core::UUID4;
29use posei_trader::client::DataClient;
30use nautilus_model::{
31    data::{
32        Bar, BarType, Data, IndexPriceUpdate, MarkPriceUpdate, OrderBookDelta, OrderBookDeltas_API,
33        OrderBookDepth10, QuoteTick, TradeTick, close::InstrumentClose,
34    },
35    identifiers::{ClientId, InstrumentId, Venue},
36    instruments::{Instrument, InstrumentAny},
37    orderbook::OrderBook,
38};
39
40#[async_trait::async_trait]
41pub trait LiveDataClient: DataClient {
42    fn get_message_channel(&self) -> tokio::sync::mpsc::UnboundedSender<DataEvent>;
43
44    fn get_clock(&self) -> Ref<'_, dyn Clock>;
45
46    fn send_delta(&self, delta: OrderBookDelta) {
47        self.send_data(Data::Delta(delta));
48    }
49
50    fn send_deltas(&self, deltas: OrderBookDeltas_API) {
51        self.send_data(Data::Deltas(deltas));
52    }
53
54    fn send_depth10(&self, depth: OrderBookDepth10) {
55        self.send_data(Data::Depth10(Box::new(depth)));
56    }
57
58    fn send_quote(&self, quote: QuoteTick) {
59        self.send_data(Data::Quote(quote));
60    }
61
62    fn send_trade(&self, trade: TradeTick) {
63        self.send_data(Data::Trade(trade));
64    }
65
66    fn send_bar(&self, bar: Bar) {
67        self.send_data(Data::Bar(bar));
68    }
69
70    fn send_mark_price(&self, mark_price: MarkPriceUpdate) {
71        self.send_data(Data::MarkPriceUpdate(mark_price));
72    }
73
74    fn send_index_price(&self, index_price: IndexPriceUpdate) {
75        self.send_data(Data::IndexPriceUpdate(index_price));
76    }
77
78    fn send_instrument_close(&self, close: InstrumentClose) {
79        self.send_data(Data::InstrumentClose(close));
80    }
81
82    fn send_data(&self, data: Data) {
83        if let Err(e) = self.get_message_channel().send(DataEvent::Data(data)) {
84            log_send_error(&self.client_id(), &e);
85        }
86    }
87
88    fn send_instrument_response(&self, instrument: InstrumentAny, correlation_id: UUID4) {
89        let response = DataResponse::Instrument(Box::new(InstrumentResponse::new(
90            correlation_id,
91            self.client_id(),
92            instrument.id(),
93            instrument,
94            self.get_clock().timestamp_ns(),
95            None,
96        )));
97
98        self.send_response(response);
99    }
100
101    fn send_instruments_response(
102        &self,
103        venue: Venue,
104        instruments: Vec<InstrumentAny>,
105        correlation_id: UUID4,
106    ) {
107        let response = DataResponse::Instruments(InstrumentsResponse::new(
108            correlation_id,
109            self.client_id(),
110            venue,
111            instruments,
112            self.get_clock().timestamp_ns(),
113            None,
114        ));
115
116        self.send_response(response);
117    }
118
119    fn send_book_response(&self, book: OrderBook, correlation_id: UUID4) {
120        let response = DataResponse::Book(BookResponse::new(
121            correlation_id,
122            self.client_id(),
123            book.instrument_id,
124            book,
125            self.get_clock().timestamp_ns(),
126            None,
127        ));
128
129        self.send_response(response);
130    }
131
132    fn send_quotes_response(
133        &self,
134        instrument_id: InstrumentId,
135        quotes: Vec<QuoteTick>,
136        correlation_id: UUID4,
137    ) {
138        let response = DataResponse::Quotes(QuotesResponse::new(
139            correlation_id,
140            self.client_id(),
141            instrument_id,
142            quotes,
143            self.get_clock().timestamp_ns(),
144            None,
145        ));
146
147        self.send_response(response);
148    }
149
150    fn send_trades_response(
151        &self,
152        instrument_id: InstrumentId,
153        trades: Vec<TradeTick>,
154        correlation_id: UUID4,
155    ) {
156        let response = DataResponse::Trades(TradesResponse::new(
157            correlation_id,
158            self.client_id(),
159            instrument_id,
160            trades,
161            self.get_clock().timestamp_ns(),
162            None,
163        ));
164
165        self.send_response(response);
166    }
167
168    fn send_bars(&self, bar_type: BarType, bars: Vec<Bar>, correlation_id: UUID4) {
169        let response = DataResponse::Bars(BarsResponse::new(
170            correlation_id,
171            self.client_id(),
172            bar_type,
173            bars,
174            self.get_clock().timestamp_ns(),
175            None,
176        ));
177
178        self.send_response(response);
179    }
180
181    fn send_response(&self, response: DataResponse) {
182        if let Err(e) = self
183            .get_message_channel()
184            .send(DataEvent::Response(response))
185        {
186            log_send_error(&self.client_id(), &e);
187        }
188    }
189}
190
191#[inline(always)]
192fn log_send_error<E: Display>(client_id: &ClientId, e: &E) {
193    log::error!("DataClient-{client_id} failed to send message: {e}");
194}