1use std::{
17 cell::RefCell,
18 collections::{HashMap, HashSet},
19 fmt::Debug,
20 rc::Rc,
21};
22
23use nautilus_common::{
24 cache::Cache,
25 clock::Clock,
26 logging::{CMD, EVT, RECV},
27 messages::execution::{
28 CancelAllOrders, CancelOrder, ModifyOrder, SubmitOrder, SubmitOrderList, TradingCommand,
29 },
30 msgbus::{self, handler::ShareableMessageHandler},
31};
32use nautilus_core::uuid::UUID4;
33use nautilus_model::{
34 data::{OrderBookDeltas, QuoteTick, TradeTick},
35 enums::{ContingencyType, OrderSide, OrderStatus, OrderType, TriggerType},
36 events::{OrderCanceled, OrderEmulated, OrderEventAny, OrderReleased, OrderUpdated},
37 identifiers::{ClientOrderId, InstrumentId, PositionId, StrategyId},
38 instruments::Instrument,
39 orders::{LimitOrder, MarketOrder, Order, OrderAny, PassiveOrderAny},
40 types::{Price, Quantity},
41};
42
43use crate::{
44 matching_core::OrderMatchingCore, order_manager::manager::OrderManager,
45 trailing::trailing_stop_calculate,
46};
47
48pub struct OrderEmulator {
49 clock: Rc<RefCell<dyn Clock>>,
50 cache: Rc<RefCell<Cache>>,
51 manager: OrderManager,
52 matching_cores: HashMap<InstrumentId, OrderMatchingCore>,
53 subscribed_quotes: HashSet<InstrumentId>,
54 subscribed_trades: HashSet<InstrumentId>,
55 subscribed_strategies: HashSet<StrategyId>,
56 monitored_positions: HashSet<PositionId>,
57 on_event_handler: Option<ShareableMessageHandler>,
58}
59
60impl Debug for OrderEmulator {
61 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
62 f.debug_struct(stringify!(OrderEmulator))
63 .field("cores", &self.matching_cores.len())
64 .field("subscribed_quotes", &self.subscribed_quotes.len())
65 .finish()
66 }
67}
68
69impl OrderEmulator {
70 pub fn new(clock: Rc<RefCell<dyn Clock>>, cache: Rc<RefCell<Cache>>) -> Self {
71 let active_local = true;
75 let manager = OrderManager::new(clock.clone(), cache.clone(), active_local);
76
77 Self {
78 clock,
79 cache,
80 manager,
81 matching_cores: HashMap::new(),
82 subscribed_quotes: HashSet::new(),
83 subscribed_trades: HashSet::new(),
84 subscribed_strategies: HashSet::new(),
85 monitored_positions: HashSet::new(),
86 on_event_handler: None,
87 }
88 }
89
90 pub fn set_on_event_handler(&mut self, handler: ShareableMessageHandler) {
91 self.on_event_handler = Some(handler);
92 }
93
94 #[must_use]
108 pub fn subscribed_quotes(&self) -> Vec<InstrumentId> {
109 let mut quotes: Vec<InstrumentId> = self.subscribed_quotes.iter().copied().collect();
110 quotes.sort();
111 quotes
112 }
113
114 #[must_use]
115 pub fn subscribed_trades(&self) -> Vec<InstrumentId> {
116 let mut trades: Vec<_> = self.subscribed_trades.iter().copied().collect();
117 trades.sort();
118 trades
119 }
120
121 #[must_use]
122 pub fn get_submit_order_commands(&self) -> HashMap<ClientOrderId, SubmitOrder> {
123 self.manager.get_submit_order_commands()
124 }
125
126 #[must_use]
127 pub fn get_matching_core(&self, instrument_id: &InstrumentId) -> Option<OrderMatchingCore> {
128 self.matching_cores.get(instrument_id).cloned()
129 }
130
131 pub fn on_start(&mut self) -> anyhow::Result<()> {
141 let emulated_orders: Vec<OrderAny> = self
142 .cache
143 .borrow()
144 .orders_emulated(None, None, None, None)
145 .into_iter()
146 .cloned()
147 .collect();
148
149 if emulated_orders.is_empty() {
150 log::error!("No emulated orders to reactivate");
151 return Ok(());
152 }
153
154 for order in emulated_orders {
155 if !matches!(
156 order.status(),
157 OrderStatus::Initialized | OrderStatus::Emulated
158 ) {
159 continue; }
161
162 if let Some(parent_order_id) = &order.parent_order_id() {
163 let parent_order = if let Some(order) = self.cache.borrow().order(parent_order_id) {
164 order.clone()
165 } else {
166 log::error!("Cannot handle order: parent {parent_order_id} not found");
167 continue;
168 };
169
170 let is_position_closed = parent_order
171 .position_id()
172 .is_some_and(|id| self.cache.borrow().is_position_closed(&id));
173 if parent_order.is_closed() && is_position_closed {
174 self.manager.cancel_order(&order);
175 continue; }
177
178 if parent_order.contingency_type() == Some(ContingencyType::Oto)
179 && (parent_order.is_active_local()
180 || parent_order.filled_qty() == Quantity::zero(0))
181 {
182 continue; }
184 }
185
186 let position_id = self
187 .cache
188 .borrow()
189 .position_id(&order.client_order_id())
190 .copied();
191 let client_id = self
192 .cache
193 .borrow()
194 .client_id(&order.client_order_id())
195 .copied();
196
197 let command = SubmitOrder::new(
198 order.trader_id(),
199 client_id.unwrap(),
200 order.strategy_id(),
201 order.instrument_id(),
202 order.client_order_id(),
203 order.venue_order_id().unwrap(),
204 order.clone(),
205 order.exec_algorithm_id(),
206 position_id,
207 UUID4::new(),
208 self.clock.borrow().timestamp_ns(),
209 )?;
210
211 self.handle_submit_order(command);
212 }
213
214 Ok(())
215 }
216
217 pub fn on_event(&mut self, event: OrderEventAny) {
218 log::info!("{RECV}{EVT} {event}");
219
220 self.manager.handle_event(event.clone());
221
222 if let Some(order) = self.cache.borrow().order(&event.client_order_id()) {
223 if order.is_closed() {
224 if let Some(matching_core) = self.matching_cores.get_mut(&order.instrument_id()) {
225 if let Err(e) =
226 matching_core.delete_order(&PassiveOrderAny::from(order.clone()))
227 {
228 log::error!("Error deleting order: {e}");
229 }
230 }
231 }
232 }
233 }
235
236 pub const fn on_stop(&self) {}
237
238 pub fn on_reset(&mut self) {
239 self.manager.reset();
240 self.matching_cores.clear();
241 }
242
243 pub const fn on_dispose(&self) {}
244
245 pub fn execute(&mut self, command: TradingCommand) {
246 log::info!("{RECV}{CMD} {command}");
247
248 match command {
249 TradingCommand::SubmitOrder(command) => self.handle_submit_order(command),
250 TradingCommand::SubmitOrderList(command) => self.handle_submit_order_list(command),
251 TradingCommand::ModifyOrder(command) => self.handle_modify_order(command),
252 TradingCommand::CancelOrder(command) => self.handle_cancel_order(command),
253 TradingCommand::CancelAllOrders(command) => self.handle_cancel_all_orders(command),
254 _ => log::error!("Cannot handle command: unrecognized {command:?}"),
255 }
256 }
257
258 fn create_matching_core(
259 &mut self,
260 instrument_id: InstrumentId,
261 price_increment: Price,
262 ) -> OrderMatchingCore {
263 let matching_core =
264 OrderMatchingCore::new(instrument_id, price_increment, None, None, None);
265 self.matching_cores
266 .insert(instrument_id, matching_core.clone());
267 log::info!("Creating matching core for {instrument_id:?}");
268 matching_core
269 }
270
271 pub fn handle_submit_order(&mut self, command: SubmitOrder) {
275 let mut order = command.order.clone();
276 let emulation_trigger = order.emulation_trigger();
277
278 assert_ne!(
279 emulation_trigger,
280 Some(TriggerType::NoTrigger),
281 "command.order.emulation_trigger must not be TriggerType::NoTrigger"
282 );
283 assert!(
284 self.manager
285 .get_submit_order_commands()
286 .contains_key(&order.client_order_id()),
287 "command.order.client_order_id must be in submit_order_commands"
288 );
289
290 if !matches!(
291 emulation_trigger,
292 Some(TriggerType::Default | TriggerType::BidAsk | TriggerType::LastPrice)
293 ) {
294 log::error!("Cannot emulate order: `TriggerType` {emulation_trigger:?} not supported");
295 self.manager.cancel_order(&order);
296 return;
297 }
298
299 self.check_monitoring(command.strategy_id, command.position_id);
300
301 let trigger_instrument_id = order
303 .trigger_instrument_id()
304 .unwrap_or_else(|| order.instrument_id());
305
306 let matching_core = self.matching_cores.get(&trigger_instrument_id).cloned();
307
308 let mut matching_core = if let Some(core) = matching_core {
309 core
310 } else {
311 let (instrument_id, price_increment) = if trigger_instrument_id.is_synthetic() {
313 let synthetic = self
314 .cache
315 .borrow()
316 .synthetic(&trigger_instrument_id)
317 .cloned();
318 if let Some(synthetic) = synthetic {
319 (synthetic.id, synthetic.price_increment)
320 } else {
321 log::error!(
322 "Cannot emulate order: no synthetic instrument {trigger_instrument_id} for trigger"
323 );
324 self.manager.cancel_order(&order);
325 return;
326 }
327 } else {
328 let instrument = self
329 .cache
330 .borrow()
331 .instrument(&trigger_instrument_id)
332 .cloned();
333 if let Some(instrument) = instrument {
334 (instrument.id(), instrument.price_increment())
335 } else {
336 log::error!(
337 "Cannot emulate order: no instrument {trigger_instrument_id} for trigger"
338 );
339 self.manager.cancel_order(&order);
340 return;
341 }
342 };
343
344 self.create_matching_core(instrument_id, price_increment)
345 };
346
347 if matches!(
349 order.order_type(),
350 OrderType::TrailingStopMarket | OrderType::TrailingStopLimit
351 ) {
352 self.update_trailing_stop_order(&mut order);
353 if order.trigger_price().is_none() {
354 log::error!(
355 "Cannot handle trailing stop order with no trigger_price and no market updates"
356 );
357
358 self.manager.cancel_order(&order);
359 return;
360 }
361 }
362
363 self.manager.cache_submit_order_command(command);
365
366 matching_core.match_order(&PassiveOrderAny::from(order.clone()), true);
368
369 match emulation_trigger.unwrap() {
371 TriggerType::Default | TriggerType::BidAsk => {
372 if !self.subscribed_quotes.contains(&trigger_instrument_id) {
373 if !trigger_instrument_id.is_synthetic() {
374 }
377 self.subscribed_quotes.insert(trigger_instrument_id);
380 }
381 }
382 TriggerType::LastPrice => {
383 if !self.subscribed_trades.contains(&trigger_instrument_id) {
384 self.subscribed_trades.insert(trigger_instrument_id);
387 }
388 }
389 _ => {
390 log::error!("Invalid TriggerType: {emulation_trigger:?}");
391 return;
392 }
393 }
394
395 if !self
397 .manager
398 .get_submit_order_commands()
399 .contains_key(&order.client_order_id())
400 {
401 return; }
403
404 if let Err(e) = matching_core.add_order(PassiveOrderAny::from(order.clone())) {
406 log::error!("Cannot add order: {e:?}");
407 return;
408 }
409
410 if order.status() == OrderStatus::Initialized {
412 let event = OrderEmulated::new(
413 order.trader_id(),
414 order.strategy_id(),
415 order.instrument_id(),
416 order.client_order_id(),
417 UUID4::new(),
418 self.clock.borrow().timestamp_ns(),
419 self.clock.borrow().timestamp_ns(),
420 );
421
422 if let Err(e) = order.apply(OrderEventAny::Emulated(event)) {
423 log::error!("Cannot apply order event: {e:?}");
424 return;
425 }
426
427 if let Err(e) = self.cache.borrow_mut().update_order(&order) {
428 log::error!("Cannot update order: {e:?}");
429 return;
430 }
431
432 self.manager.send_risk_event(OrderEventAny::Emulated(event));
433
434 msgbus::publish(
435 format!("events.order.{}", order.strategy_id()).into(),
436 &OrderEventAny::Emulated(event),
437 );
438 }
439
440 self.matching_cores
442 .insert(trigger_instrument_id, matching_core);
443
444 log::info!("Emulating {order}");
445 }
446
447 fn handle_submit_order_list(&mut self, command: SubmitOrderList) {
448 self.check_monitoring(command.strategy_id, command.position_id);
449
450 for order in &command.order_list.orders {
451 if let Some(parent_order_id) = order.parent_order_id() {
452 let cache = self.cache.borrow();
453 let parent_order = if let Some(parent_order) = cache.order(&parent_order_id) {
454 parent_order
455 } else {
456 log::error!("Parent order for {} not found", order.client_order_id());
457 continue;
458 };
459
460 if parent_order.contingency_type() == Some(ContingencyType::Oto) {
461 continue; }
463 }
464
465 if let Err(e) = self.manager.create_new_submit_order(
466 order,
467 command.position_id,
468 Some(command.client_id),
469 ) {
470 log::error!("Error creating new submit order: {e}");
471 }
472 }
473 }
474
475 fn handle_modify_order(&mut self, command: ModifyOrder) {
476 if let Some(order) = self.cache.borrow().order(&command.client_order_id) {
477 let price = match command.price {
478 Some(price) => Some(price),
479 None => order.price(),
480 };
481
482 let trigger_price = match command.trigger_price {
483 Some(trigger_price) => Some(trigger_price),
484 None => order.trigger_price(),
485 };
486
487 let ts_now = self.clock.borrow().timestamp_ns();
489 let event = OrderUpdated::new(
490 order.trader_id(),
491 order.strategy_id(),
492 order.instrument_id(),
493 order.client_order_id(),
494 command.quantity.unwrap_or(order.quantity()),
495 UUID4::new(),
496 ts_now,
497 ts_now,
498 false,
499 order.venue_order_id(),
500 order.account_id(),
501 price,
502 trigger_price,
503 );
504
505 self.manager.send_exec_event(OrderEventAny::Updated(event));
506
507 let trigger_instrument_id = order
508 .trigger_instrument_id()
509 .unwrap_or_else(|| order.instrument_id());
510
511 if let Some(matching_core) = self.matching_cores.get_mut(&trigger_instrument_id) {
512 matching_core.match_order(&PassiveOrderAny::from(order.clone()), false);
513 } else {
514 log::error!(
515 "Cannot handle `ModifyOrder`: no matching core for trigger instrument {trigger_instrument_id}"
516 );
517 }
518 } else {
519 log::error!("Cannot modify order: {} not found", command.client_order_id);
520 }
521 }
522
523 pub fn handle_cancel_order(&mut self, command: CancelOrder) {
524 let order = if let Some(order) = self.cache.borrow().order(&command.client_order_id) {
525 order.clone()
526 } else {
527 log::error!("Cannot cancel order: {} not found", command.client_order_id);
528 return;
529 };
530
531 let trigger_instrument_id = order
532 .trigger_instrument_id()
533 .unwrap_or_else(|| order.instrument_id());
534
535 let matching_core = if let Some(core) = self.matching_cores.get(&trigger_instrument_id) {
536 core
537 } else {
538 self.manager.cancel_order(&order);
539 return;
540 };
541
542 if !matching_core.order_exists(order.client_order_id())
543 && order.is_open()
544 && !order.is_pending_cancel()
545 {
546 self.manager
548 .send_exec_command(TradingCommand::CancelOrder(command));
549 } else {
550 self.manager.cancel_order(&order);
551 }
552 }
553
554 fn handle_cancel_all_orders(&mut self, command: CancelAllOrders) {
555 let matching_core = match self.matching_cores.get(&command.instrument_id) {
556 Some(core) => core,
557 None => return, };
559
560 let orders_to_cancel = match command.order_side {
561 OrderSide::NoOrderSide => {
562 let mut all_orders = Vec::new();
564 all_orders.extend(matching_core.get_orders_bid().iter().cloned());
565 all_orders.extend(matching_core.get_orders_ask().iter().cloned());
566 all_orders
567 }
568 OrderSide::Buy => matching_core.get_orders_bid().to_vec(),
569 OrderSide::Sell => matching_core.get_orders_ask().to_vec(),
570 };
571
572 for order in orders_to_cancel {
574 self.manager.cancel_order(&OrderAny::from(order));
575 }
576 }
577
578 pub fn update_order(&mut self, order: &mut OrderAny, new_quantity: Quantity) {
581 log::info!(
582 "Updating order {} quantity to {new_quantity}",
583 order.client_order_id(),
584 );
585
586 let ts_now = self.clock.borrow().timestamp_ns();
588 let event = OrderUpdated::new(
589 order.trader_id(),
590 order.strategy_id(),
591 order.instrument_id(),
592 order.client_order_id(),
593 new_quantity,
594 UUID4::new(),
595 ts_now,
596 ts_now,
597 false,
598 None,
599 order.account_id(),
600 None,
601 None,
602 );
603
604 if let Err(e) = order.apply(OrderEventAny::Updated(event)) {
605 log::error!("Cannot apply order event: {e:?}");
606 return;
607 }
608 if let Err(e) = self.cache.borrow_mut().update_order(order) {
609 log::error!("Cannot update order: {e:?}");
610 return;
611 }
612
613 self.manager.send_risk_event(OrderEventAny::Updated(event));
614 }
615
616 pub fn on_order_book_deltas(&mut self, deltas: OrderBookDeltas) {
619 log::debug!("Processing {deltas:?}");
620
621 let instrument_id = &deltas.instrument_id;
622 if let Some(matching_core) = self.matching_cores.get_mut(instrument_id) {
623 if let Some(book) = self.cache.borrow().order_book(instrument_id) {
624 let best_bid = book.best_bid_price();
625 let best_ask = book.best_ask_price();
626
627 if let Some(best_bid) = best_bid {
628 matching_core.set_bid_raw(best_bid);
629 }
630
631 if let Some(best_ask) = best_ask {
632 matching_core.set_ask_raw(best_ask);
633 }
634 } else {
635 log::error!(
636 "Cannot handle `OrderBookDeltas`: no book being maintained for {}",
637 deltas.instrument_id
638 );
639 }
640
641 self.iterate_orders(instrument_id);
642 } else {
643 log::error!(
644 "Cannot handle `OrderBookDeltas`: no matching core for instrument {}",
645 deltas.instrument_id
646 );
647 }
648 }
649
650 pub fn on_quote_tick(&mut self, quote: QuoteTick) {
651 log::debug!("Processing {quote}:?");
652
653 let instrument_id = "e.instrument_id;
654 if let Some(matching_core) = self.matching_cores.get_mut(instrument_id) {
655 matching_core.set_bid_raw(quote.bid_price);
656 matching_core.set_ask_raw(quote.ask_price);
657
658 self.iterate_orders(instrument_id);
659 } else {
660 log::error!(
661 "Cannot handle `QuoteTick`: no matching core for instrument {}",
662 quote.instrument_id
663 );
664 }
665 }
666
667 pub fn on_trade_tick(&mut self, trade: TradeTick) {
668 log::debug!("Processing {trade:?}");
669
670 let instrument_id = &trade.instrument_id;
671 if let Some(matching_core) = self.matching_cores.get_mut(instrument_id) {
672 matching_core.set_last_raw(trade.price);
673 if !self.subscribed_quotes.contains(instrument_id) {
674 matching_core.set_bid_raw(trade.price);
675 matching_core.set_ask_raw(trade.price);
676 }
677
678 self.iterate_orders(instrument_id);
679 } else {
680 log::error!(
681 "Cannot handle `TradeTick`: no matching core for instrument {}",
682 trade.instrument_id
683 );
684 }
685 }
686
687 fn iterate_orders(&mut self, instrument_id: &InstrumentId) {
688 let orders = if let Some(matching_core) = self.matching_cores.get_mut(instrument_id) {
689 matching_core.iterate();
690
691 matching_core.get_orders()
692 } else {
693 log::error!("Cannot iterate orders: no matching core for instrument {instrument_id}");
694 return;
695 };
696
697 for order in orders {
698 if order.is_closed() {
699 continue;
700 }
701
702 let mut order: OrderAny = order.clone().into();
703 if matches!(
704 order.order_type(),
705 OrderType::TrailingStopMarket | OrderType::TrailingStopLimit
706 ) {
707 self.update_trailing_stop_order(&mut order);
708 }
709 }
710 }
711
712 pub fn cancel_order(&mut self, order: &OrderAny) {
713 log::info!("Canceling order {}", order.client_order_id());
714
715 let mut order = order.clone();
716 order.set_emulation_trigger(Some(TriggerType::NoTrigger));
717
718 let trigger_instrument_id = order
719 .trigger_instrument_id()
720 .unwrap_or(order.instrument_id());
721
722 if let Some(matching_core) = self.matching_cores.get_mut(&trigger_instrument_id) {
723 if let Err(e) = matching_core.delete_order(&PassiveOrderAny::from(order.clone())) {
724 log::error!("Cannot delete order: {e:?}");
725 }
726 }
727
728 self.cache
729 .borrow_mut()
730 .update_order_pending_cancel_local(&order);
731
732 let ts_now = self.clock.borrow().timestamp_ns();
734 let event = OrderCanceled::new(
735 order.trader_id(),
736 order.strategy_id(),
737 order.instrument_id(),
738 order.client_order_id(),
739 UUID4::new(),
740 ts_now,
741 ts_now,
742 false,
743 order.venue_order_id(),
744 order.account_id(),
745 );
746
747 self.manager.send_exec_event(OrderEventAny::Canceled(event));
748 }
749
750 fn check_monitoring(&mut self, strategy_id: StrategyId, position_id: Option<PositionId>) {
751 if !self.subscribed_strategies.contains(&strategy_id) {
752 if let Some(handler) = &self.on_event_handler {
754 msgbus::subscribe_str(format!("events.order.{strategy_id}"), handler.clone(), None);
755 msgbus::subscribe_str(
756 format!("events.position.{strategy_id}"),
757 handler.clone(),
758 None,
759 );
760 self.subscribed_strategies.insert(strategy_id);
761 log::info!("Subscribed to strategy {strategy_id} order and position events");
762 }
763 }
764
765 if let Some(position_id) = position_id {
766 if !self.monitored_positions.contains(&position_id) {
767 self.monitored_positions.insert(position_id);
768 }
769 }
770 }
771
772 pub fn trigger_stop_order(&mut self, order: &mut OrderAny) {
776 match order.order_type() {
777 OrderType::StopLimit | OrderType::LimitIfTouched | OrderType::TrailingStopLimit => {
778 self.fill_limit_order(order);
779 }
780 OrderType::Market | OrderType::MarketIfTouched | OrderType::TrailingStopMarket => {
781 self.fill_market_order(order);
782 }
783 _ => panic!("invalid `OrderType`, was {}", order.order_type()),
784 }
785 }
786
787 pub fn fill_limit_order(&mut self, order: &mut OrderAny) {
791 if matches!(order.order_type(), OrderType::Limit) {
792 self.fill_market_order(order);
793 return;
794 }
795
796 let mut command = match self
798 .manager
799 .pop_submit_order_command(order.client_order_id())
800 {
801 Some(command) => command,
802 None => return, };
804
805 let trigger_instrument_id = order
806 .trigger_instrument_id()
807 .unwrap_or(order.instrument_id());
808
809 if let Some(matching_core) = self.matching_cores.get_mut(&trigger_instrument_id) {
810 if let Err(e) = matching_core.delete_order(&PassiveOrderAny::from(order.clone())) {
811 log::error!("Error deleting order: {e:?}");
812 }
813
814 let emulation_trigger = TriggerType::NoTrigger;
815
816 let mut transformed = if let Ok(transformed) = LimitOrder::new_checked(
818 order.trader_id(),
819 order.strategy_id(),
820 order.instrument_id(),
821 order.client_order_id(),
822 order.order_side(),
823 order.quantity(),
824 order.price().unwrap(),
825 order.time_in_force(),
826 order.expire_time(),
827 order.is_post_only(),
828 order.is_reduce_only(),
829 order.is_quote_quantity(),
830 order.display_qty(),
831 Some(emulation_trigger),
832 Some(trigger_instrument_id),
833 order.contingency_type(),
834 order.order_list_id(),
835 order.linked_order_ids().map(Vec::from),
836 order.parent_order_id(),
837 order.exec_algorithm_id(),
838 order.exec_algorithm_params().cloned(),
839 order.exec_spawn_id(),
840 order.tags().map(Vec::from),
841 UUID4::new(),
842 self.clock.borrow().timestamp_ns(),
843 ) {
844 transformed
845 } else {
846 log::error!("Cannot create limit order");
847 return;
848 };
849 transformed.liquidity_side = order.liquidity_side();
850
851 let original_events = order.events();
858
859 for event in original_events {
860 transformed.events.insert(0, event.clone());
861 }
862
863 if let Err(e) = self.cache.borrow_mut().add_order(
864 OrderAny::Limit(transformed.clone()),
865 command.position_id,
866 Some(command.client_id),
867 true,
868 ) {
869 log::error!("Failed to add order: {e}");
870 }
871
872 command.order = OrderAny::Limit(transformed.clone());
874
875 msgbus::publish(
876 format!("events.order.{}", order.strategy_id()).into(),
877 transformed.last_event(),
878 );
879
880 let released_price = match order.order_side() {
882 OrderSide::Buy => matching_core.ask,
883 OrderSide::Sell => matching_core.bid,
884 _ => panic!("invalid `OrderSide`"),
885 };
886
887 let event = OrderReleased::new(
889 order.trader_id(),
890 order.strategy_id(),
891 order.instrument_id(),
892 order.client_order_id(),
893 released_price.unwrap(),
894 UUID4::new(),
895 self.clock.borrow().timestamp_ns(),
896 self.clock.borrow().timestamp_ns(),
897 );
898
899 if let Err(e) = transformed.apply(OrderEventAny::Released(event)) {
900 log::error!("Failed to apply order event: {e}");
901 }
902
903 if let Err(e) = self
904 .cache
905 .borrow_mut()
906 .update_order(&OrderAny::Limit(transformed.clone()))
907 {
908 log::error!("Failed to update order: {e}");
909 }
910
911 self.manager.send_risk_event(OrderEventAny::Released(event));
912
913 log::info!("Releasing order {}", order.client_order_id());
914
915 msgbus::publish(
917 format!("events.order.{}", transformed.strategy_id()).into(),
918 &OrderEventAny::Released(event),
919 );
920
921 if let Some(exec_algorithm_id) = order.exec_algorithm_id() {
922 self.manager.send_algo_command(command, exec_algorithm_id);
923 } else {
924 self.manager
925 .send_exec_command(TradingCommand::SubmitOrder(command));
926 }
927 } else {
928 log::error!(
929 "Cannot fill limit order: no matching core for instrument {trigger_instrument_id}"
930 );
931 }
932 }
933
934 pub fn fill_market_order(&mut self, order: &mut OrderAny) {
938 let mut command = match self
940 .manager
941 .pop_submit_order_command(order.client_order_id())
942 {
943 Some(command) => command,
944 None => panic!("invalid operation `_fill_market_order` with no command"),
945 };
946
947 let trigger_instrument_id = order
948 .trigger_instrument_id()
949 .unwrap_or(order.instrument_id());
950
951 if let Some(matching_core) = self.matching_cores.get_mut(&trigger_instrument_id) {
952 if let Err(e) = matching_core.delete_order(&PassiveOrderAny::from(order.clone())) {
953 log::error!("Cannot delete order: {e:?}");
954 }
955
956 order.set_emulation_trigger(Some(TriggerType::NoTrigger));
957
958 let mut transformed = MarketOrder::new(
960 order.trader_id(),
961 order.strategy_id(),
962 order.instrument_id(),
963 order.client_order_id(),
964 order.order_side(),
965 order.quantity(),
966 order.time_in_force(),
967 UUID4::new(),
968 self.clock.borrow().timestamp_ns(),
969 order.is_reduce_only(),
970 order.is_quote_quantity(),
971 order.contingency_type(),
972 order.order_list_id(),
973 order.linked_order_ids().map(Vec::from),
974 order.parent_order_id(),
975 order.exec_algorithm_id(),
976 order.exec_algorithm_params().cloned(),
977 order.exec_spawn_id(),
978 order.tags().map(Vec::from),
979 );
980
981 let original_events = order.events();
982
983 for event in original_events {
984 transformed.events.insert(0, event.clone());
985 }
986
987 if let Err(e) = self.cache.borrow_mut().add_order(
988 OrderAny::Market(transformed.clone()),
989 command.position_id,
990 Some(command.client_id),
991 true,
992 ) {
993 log::error!("Failed to add order: {e}");
994 }
995
996 command.order = OrderAny::Market(transformed.clone());
998
999 msgbus::publish(
1000 format!("events.order.{}", order.strategy_id()).into(),
1001 transformed.last_event(),
1002 );
1003
1004 let released_price = match order.order_side() {
1007 OrderSide::Buy => matching_core.ask,
1008 OrderSide::Sell => matching_core.bid,
1009 _ => panic!("invalid `OrderSide`"),
1010 };
1011
1012 let ts_now = self.clock.borrow().timestamp_ns();
1014 let event = OrderReleased::new(
1015 order.trader_id(),
1016 order.strategy_id(),
1017 order.instrument_id(),
1018 order.client_order_id(),
1019 released_price.unwrap(),
1020 UUID4::new(),
1021 ts_now,
1022 ts_now,
1023 );
1024
1025 if let Err(e) = transformed.apply(OrderEventAny::Released(event)) {
1026 log::error!("Failed to apply order event: {e}");
1027 }
1028
1029 if let Err(e) = self
1030 .cache
1031 .borrow_mut()
1032 .update_order(&OrderAny::Market(transformed))
1033 {
1034 log::error!("Failed to update order: {e}");
1035 }
1036 self.manager.send_risk_event(OrderEventAny::Released(event));
1037
1038 log::info!("Releasing order {}", order.client_order_id());
1039
1040 msgbus::publish(
1042 format!("events.order.{}", order.strategy_id()).into(),
1043 &OrderEventAny::Released(event),
1044 );
1045
1046 if let Some(exec_algorithm_id) = order.exec_algorithm_id() {
1047 self.manager.send_algo_command(command, exec_algorithm_id);
1048 } else {
1049 self.manager
1050 .send_exec_command(TradingCommand::SubmitOrder(command));
1051 }
1052 } else {
1053 log::error!(
1054 "Cannot fill limit order: no matching core for instrument {trigger_instrument_id}"
1055 );
1056 }
1057 }
1058
1059 fn update_trailing_stop_order(&mut self, order: &mut OrderAny) {
1060 if let Some(matching_core) = self.matching_cores.get(&order.instrument_id()) {
1061 let mut bid = None;
1062 let mut ask = None;
1063 let mut last = None;
1064
1065 if matching_core.is_bid_initialized {
1066 bid = matching_core.bid;
1067 }
1068 if matching_core.is_ask_initialized {
1069 ask = matching_core.ask;
1070 }
1071 if matching_core.is_last_initialized {
1072 last = matching_core.last;
1073 }
1074
1075 let quote_tick = self
1076 .cache
1077 .borrow()
1078 .quote(&matching_core.instrument_id)
1079 .copied();
1080 let trade_tick = self
1081 .cache
1082 .borrow()
1083 .trade(&matching_core.instrument_id)
1084 .copied();
1085
1086 if bid.is_none() && quote_tick.is_some() {
1087 bid = Some(quote_tick.unwrap().bid_price);
1088 }
1089 if ask.is_none() && quote_tick.is_some() {
1090 ask = Some(quote_tick.unwrap().ask_price);
1091 }
1092 if last.is_none() && trade_tick.is_some() {
1093 last = Some(trade_tick.unwrap().price);
1094 }
1095
1096 let (new_trigger_price, new_price) = if let Ok((new_trigger_price, new_price)) =
1097 trailing_stop_calculate(matching_core.price_increment, order, bid, ask, last)
1098 {
1099 (new_trigger_price, new_price)
1100 } else {
1101 log::warn!("Cannot calculate trailing stop order");
1102 return;
1103 };
1104
1105 let (new_trigger_price, new_price) = match (new_trigger_price, new_price) {
1106 (None, None) => return, _ => (new_trigger_price, new_price),
1108 };
1109
1110 let ts_now = self.clock.borrow().timestamp_ns();
1112 let event = OrderUpdated::new(
1113 order.trader_id(),
1114 order.strategy_id(),
1115 order.instrument_id(),
1116 order.client_order_id(),
1117 order.quantity(),
1118 UUID4::new(),
1119 ts_now,
1120 ts_now,
1121 false,
1122 order.venue_order_id(),
1123 order.account_id(),
1124 new_price,
1125 new_trigger_price,
1126 );
1127
1128 if let Err(e) = order.apply(OrderEventAny::Updated(event)) {
1129 log::error!("Failed to apply order event: {e}");
1130 }
1131 if let Err(e) = self.cache.borrow_mut().update_order(order) {
1132 log::error!("Failed to update order: {e}");
1133 }
1134
1135 self.manager.send_risk_event(OrderEventAny::Updated(event));
1136 } else {
1137 log::error!(
1138 "Cannot update trailing stop order: no matching core for instrument {}",
1139 order.instrument_id()
1140 );
1141 }
1142 }
1143}