nautilus_live/
runner.rs

1// -------------------------------------------------------------------------------------------------
2//  Copyright (C) 2015-2025 Posei Systems Pty Ltd. All rights reserved.
3//  https://poseitrader.io
4//
5//  Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
6//  You may not use this file except in compliance with the License.
7//  You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
8//
9//  Unless required by applicable law or agreed to in writing, software
10//  distributed under the License is distributed on an "AS IS" BASIS,
11//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12//  See the License for the specific language governing permissions and
13//  limitations under the License.
14// -------------------------------------------------------------------------------------------------
15
16use std::{cell::RefCell, fmt::Debug, rc::Rc};
17
18use futures::StreamExt;
19use nautilus_common::{
20    clock::{Clock, LiveClock},
21    messages::DataEvent,
22    runner::{DataQueue, RunnerEvent, get_data_cmd_queue, set_data_evt_queue},
23    runtime::get_runtime,
24};
25use posei_trader::engine::DataEngine;
26use tokio::sync::mpsc::{UnboundedReceiver, UnboundedSender};
27
28pub struct AsyncDataQueue(UnboundedSender<DataEvent>);
29
30impl Debug for AsyncDataQueue {
31    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
32        f.debug_tuple(stringify!(AsyncDataQueue)).finish()
33    }
34}
35
36impl DataQueue for AsyncDataQueue {
37    fn push(&mut self, event: DataEvent) {
38        if let Err(e) = self.0.send(event) {
39            log::error!("Unable to send data event to async data channel: {e}");
40        }
41    }
42}
43
44// TODO: Use message bus instead of direct reference to DataEngine
45pub trait Runner {
46    fn run(&mut self, data_engine: &mut DataEngine);
47}
48
49pub struct AsyncRunner {
50    pub clock: Rc<RefCell<LiveClock>>,
51    data_rx: UnboundedReceiver<DataEvent>,
52}
53
54impl Debug for AsyncRunner {
55    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
56        f.debug_struct(stringify!(AsyncRunner))
57            .field("clock_set", &true)
58            .finish()
59    }
60}
61
62impl AsyncRunner {
63    pub fn new(clock: Rc<RefCell<LiveClock>>) -> Self {
64        let (data_tx, data_rx) = tokio::sync::mpsc::unbounded_channel::<DataEvent>();
65        set_data_evt_queue(Rc::new(RefCell::new(AsyncDataQueue(data_tx))));
66
67        Self { clock, data_rx }
68    }
69}
70
71impl Runner for AsyncRunner {
72    fn run(&mut self, data_engine: &mut DataEngine) {
73        let mut time_event_stream = self.clock.borrow().get_event_stream();
74        let data_cmd_queue = get_data_cmd_queue();
75
76        loop {
77            while let Some(cmd) = data_cmd_queue.borrow_mut().pop_front() {
78                // TODO: Send to data engine execute endpoint address
79                data_engine.execute(&cmd);
80            }
81
82            // Collect the next event to process
83            let next_event = get_runtime().block_on(async {
84                tokio::select! {
85                    Some(resp) = self.data_rx.recv() => Some(RunnerEvent::Data(resp)),
86                    Some(event) = time_event_stream.next() => Some(RunnerEvent::Timer(event)),
87                    else => None,
88                }
89            });
90
91            // Process the event outside of the async context
92            match next_event {
93                Some(RunnerEvent::Data(event)) => match event {
94                    DataEvent::Response(resp) => data_engine.response(resp),
95                    DataEvent::Data(data) => data_engine.process_data(data),
96                },
97                Some(RunnerEvent::Timer(event)) => self.clock.borrow().get_handler(event).run(),
98                None => break, // Sentinel event ends runner
99            }
100        }
101    }
102}
103
#[cfg(all(test, feature = "clock_v2"))]
mod tests {
    use std::{cell::RefCell, rc::Rc};

    use futures::StreamExt;
    use nautilus_common::{
        clock::LiveClock,
        runner::{get_global_clock, set_global_clock},
        timer::{TimeEvent, TimeEventCallback},
    };

    /// An alert registered through the global clock must surface on the live
    /// clock's event stream.
    #[tokio::test]
    async fn test_global_live_clock() {
        let clock = Rc::new(RefCell::new(LiveClock::new()));
        set_global_clock(clock.clone());

        // Alert time is the current `get_time_ns()` reading plus 100.
        let fire_at = clock.borrow().get_time_ns() + 100;
        let noop_callback = TimeEventCallback::Rust(Rc::new(|_event: TimeEvent| {}));

        // component/actor adding an alert
        let _ = get_global_clock().borrow_mut().set_time_alert_ns(
            "hola",
            fire_at,
            Some(noop_callback),
            None,
        );

        // runner pulling from event
        let mut events = clock.borrow().get_event_stream();
        let first_event = events.next().await;
        assert!(first_event.is_some());
    }
}