1use std::{cell::RefCell, fmt::Debug, rc::Rc};
17
18use futures::StreamExt;
19use nautilus_common::{
20 clock::{Clock, LiveClock},
21 messages::DataEvent,
22 runner::{DataQueue, RunnerEvent, get_data_cmd_queue, set_data_evt_queue},
23 runtime::get_runtime,
24};
25use posei_trader::engine::DataEngine;
26use tokio::sync::mpsc::{UnboundedReceiver, UnboundedSender};
27
28pub struct AsyncDataQueue(UnboundedSender<DataEvent>);
29
30impl Debug for AsyncDataQueue {
31 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
32 f.debug_tuple(stringify!(AsyncDataQueue)).finish()
33 }
34}
35
36impl DataQueue for AsyncDataQueue {
37 fn push(&mut self, event: DataEvent) {
38 if let Err(e) = self.0.send(event) {
39 log::error!("Unable to send data event to async data channel: {e}");
40 }
41 }
42}
43
44pub trait Runner {
46 fn run(&mut self, data_engine: &mut DataEngine);
47}
48
49pub struct AsyncRunner {
50 pub clock: Rc<RefCell<LiveClock>>,
51 data_rx: UnboundedReceiver<DataEvent>,
52}
53
54impl Debug for AsyncRunner {
55 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
56 f.debug_struct(stringify!(AsyncRunner))
57 .field("clock_set", &true)
58 .finish()
59 }
60}
61
62impl AsyncRunner {
63 pub fn new(clock: Rc<RefCell<LiveClock>>) -> Self {
64 let (data_tx, data_rx) = tokio::sync::mpsc::unbounded_channel::<DataEvent>();
65 set_data_evt_queue(Rc::new(RefCell::new(AsyncDataQueue(data_tx))));
66
67 Self { clock, data_rx }
68 }
69}
70
71impl Runner for AsyncRunner {
72 fn run(&mut self, data_engine: &mut DataEngine) {
73 let mut time_event_stream = self.clock.borrow().get_event_stream();
74 let data_cmd_queue = get_data_cmd_queue();
75
76 loop {
77 while let Some(cmd) = data_cmd_queue.borrow_mut().pop_front() {
78 data_engine.execute(&cmd);
80 }
81
82 let next_event = get_runtime().block_on(async {
84 tokio::select! {
85 Some(resp) = self.data_rx.recv() => Some(RunnerEvent::Data(resp)),
86 Some(event) = time_event_stream.next() => Some(RunnerEvent::Timer(event)),
87 else => None,
88 }
89 });
90
91 match next_event {
93 Some(RunnerEvent::Data(event)) => match event {
94 DataEvent::Response(resp) => data_engine.response(resp),
95 DataEvent::Data(data) => data_engine.process_data(data),
96 },
97 Some(RunnerEvent::Timer(event)) => self.clock.borrow().get_handler(event).run(),
98 None => break, }
100 }
101 }
102}
103
104#[cfg(test)]
105#[cfg(feature = "clock_v2")]
106mod tests {
107 use std::{cell::RefCell, rc::Rc};
108
109 use futures::StreamExt;
110 use nautilus_common::{
111 clock::LiveClock,
112 runner::{get_global_clock, set_global_clock},
113 timer::{TimeEvent, TimeEventCallback},
114 };
115
116 #[tokio::test]
117 async fn test_global_live_clock() {
118 let live_clock = Rc::new(RefCell::new(LiveClock::new()));
119 set_global_clock(live_clock.clone());
120 let alert_time = live_clock.borrow().get_time_ns() + 100;
121
122 let _ = get_global_clock().borrow_mut().set_time_alert_ns(
124 "hola",
125 alert_time,
126 Some(TimeEventCallback::Rust(Rc::new(|_event: TimeEvent| {}))),
127 None,
128 );
129
130 assert!(
132 live_clock
133 .borrow()
134 .get_event_stream()
135 .next()
136 .await
137 .is_some()
138 );
139 }
140}