nautilus_execution/order_emulator/
emulator.rs

1// -------------------------------------------------------------------------------------------------
2//  Copyright (C) 2015-2025 Posei Systems Pty Ltd. All rights reserved.
3//  https://poseitrader.io
4//
5//  Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
6//  You may not use this file except in compliance with the License.
7//  You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
8//
9//  Unless required by applicable law or agreed to in writing, software
10//  distributed under the License is distributed on an "AS IS" BASIS,
11//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12//  See the License for the specific language governing permissions and
13//  limitations under the License.
14// -------------------------------------------------------------------------------------------------
15
16use std::{
17    cell::RefCell,
18    collections::{HashMap, HashSet},
19    fmt::Debug,
20    rc::Rc,
21};
22
23use nautilus_common::{
24    cache::Cache,
25    clock::Clock,
26    logging::{CMD, EVT, RECV},
27    messages::execution::{
28        CancelAllOrders, CancelOrder, ModifyOrder, SubmitOrder, SubmitOrderList, TradingCommand,
29    },
30    msgbus::{self, handler::ShareableMessageHandler},
31};
32use nautilus_core::uuid::UUID4;
33use nautilus_model::{
34    data::{OrderBookDeltas, QuoteTick, TradeTick},
35    enums::{ContingencyType, OrderSide, OrderStatus, OrderType, TriggerType},
36    events::{OrderCanceled, OrderEmulated, OrderEventAny, OrderReleased, OrderUpdated},
37    identifiers::{ClientOrderId, InstrumentId, PositionId, StrategyId},
38    instruments::Instrument,
39    orders::{LimitOrder, MarketOrder, Order, OrderAny, PassiveOrderAny},
40    types::{Price, Quantity},
41};
42
43use crate::{
44    matching_core::OrderMatchingCore, order_manager::manager::OrderManager,
45    trailing::trailing_stop_calculate,
46};
47
48pub struct OrderEmulator {
49    clock: Rc<RefCell<dyn Clock>>,
50    cache: Rc<RefCell<Cache>>,
51    manager: OrderManager,
52    matching_cores: HashMap<InstrumentId, OrderMatchingCore>,
53    subscribed_quotes: HashSet<InstrumentId>,
54    subscribed_trades: HashSet<InstrumentId>,
55    subscribed_strategies: HashSet<StrategyId>,
56    monitored_positions: HashSet<PositionId>,
57    on_event_handler: Option<ShareableMessageHandler>,
58}
59
60impl Debug for OrderEmulator {
61    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
62        f.debug_struct(stringify!(OrderEmulator))
63            .field("cores", &self.matching_cores.len())
64            .field("subscribed_quotes", &self.subscribed_quotes.len())
65            .finish()
66    }
67}
68
69impl OrderEmulator {
70    pub fn new(clock: Rc<RefCell<dyn Clock>>, cache: Rc<RefCell<Cache>>) -> Self {
71        // TODO: Impl Actor Trait
72        // self.register_base(portfolio, msgbus, cache, clock);
73
74        let active_local = true;
75        let manager = OrderManager::new(clock.clone(), cache.clone(), active_local);
76
77        Self {
78            clock,
79            cache,
80            manager,
81            matching_cores: HashMap::new(),
82            subscribed_quotes: HashSet::new(),
83            subscribed_trades: HashSet::new(),
84            subscribed_strategies: HashSet::new(),
85            monitored_positions: HashSet::new(),
86            on_event_handler: None,
87        }
88    }
89
90    pub fn set_on_event_handler(&mut self, handler: ShareableMessageHandler) {
91        self.on_event_handler = Some(handler);
92    }
93
94    // TODO: WIP
95    // pub fn set_submit_order_handler(&mut self, handler: SubmitOrderHandlerAny) {
96    //     self.manager.set_submit_order_handler(handler);
97    // }
98    //
99    // pub fn set_cancel_order_handler(&mut self, handler: CancelOrderHandlerAny) {
100    //     self.manager.set_cancel_order_handler(handler);
101    // }
102    //
103    // pub fn set_modify_order_handler(&mut self, handler: ModifyOrderHandlerAny) {
104    //     self.manager.set_modify_order_handler(handler);
105    // }
106
107    #[must_use]
108    pub fn subscribed_quotes(&self) -> Vec<InstrumentId> {
109        let mut quotes: Vec<InstrumentId> = self.subscribed_quotes.iter().copied().collect();
110        quotes.sort();
111        quotes
112    }
113
114    #[must_use]
115    pub fn subscribed_trades(&self) -> Vec<InstrumentId> {
116        let mut trades: Vec<_> = self.subscribed_trades.iter().copied().collect();
117        trades.sort();
118        trades
119    }
120
121    #[must_use]
122    pub fn get_submit_order_commands(&self) -> HashMap<ClientOrderId, SubmitOrder> {
123        self.manager.get_submit_order_commands()
124    }
125
126    #[must_use]
127    pub fn get_matching_core(&self, instrument_id: &InstrumentId) -> Option<OrderMatchingCore> {
128        self.matching_cores.get(instrument_id).cloned()
129    }
130
131    /// Reactivates emulated orders from cache on start.
132    ///
133    /// # Errors
134    ///
135    /// Returns an error if no emulated orders are found or processing fails.
136    ///
137    /// # Panics
138    ///
139    /// Panics if a cached client ID cannot be unwrapped.
140    pub fn on_start(&mut self) -> anyhow::Result<()> {
141        let emulated_orders: Vec<OrderAny> = self
142            .cache
143            .borrow()
144            .orders_emulated(None, None, None, None)
145            .into_iter()
146            .cloned()
147            .collect();
148
149        if emulated_orders.is_empty() {
150            log::error!("No emulated orders to reactivate");
151            return Ok(());
152        }
153
154        for order in emulated_orders {
155            if !matches!(
156                order.status(),
157                OrderStatus::Initialized | OrderStatus::Emulated
158            ) {
159                continue; // No longer emulated
160            }
161
162            if let Some(parent_order_id) = &order.parent_order_id() {
163                let parent_order = if let Some(order) = self.cache.borrow().order(parent_order_id) {
164                    order.clone()
165                } else {
166                    log::error!("Cannot handle order: parent {parent_order_id} not found");
167                    continue;
168                };
169
170                let is_position_closed = parent_order
171                    .position_id()
172                    .is_some_and(|id| self.cache.borrow().is_position_closed(&id));
173                if parent_order.is_closed() && is_position_closed {
174                    self.manager.cancel_order(&order);
175                    continue; // Parent already closed
176                }
177
178                if parent_order.contingency_type() == Some(ContingencyType::Oto)
179                    && (parent_order.is_active_local()
180                        || parent_order.filled_qty() == Quantity::zero(0))
181                {
182                    continue; // Process contingency order later once parent triggered
183                }
184            }
185
186            let position_id = self
187                .cache
188                .borrow()
189                .position_id(&order.client_order_id())
190                .copied();
191            let client_id = self
192                .cache
193                .borrow()
194                .client_id(&order.client_order_id())
195                .copied();
196
197            let command = SubmitOrder::new(
198                order.trader_id(),
199                client_id.unwrap(),
200                order.strategy_id(),
201                order.instrument_id(),
202                order.client_order_id(),
203                order.venue_order_id().unwrap(),
204                order.clone(),
205                order.exec_algorithm_id(),
206                position_id,
207                UUID4::new(),
208                self.clock.borrow().timestamp_ns(),
209            )?;
210
211            self.handle_submit_order(command);
212        }
213
214        Ok(())
215    }
216
217    pub fn on_event(&mut self, event: OrderEventAny) {
218        log::info!("{RECV}{EVT} {event}");
219
220        self.manager.handle_event(event.clone());
221
222        if let Some(order) = self.cache.borrow().order(&event.client_order_id()) {
223            if order.is_closed() {
224                if let Some(matching_core) = self.matching_cores.get_mut(&order.instrument_id()) {
225                    if let Err(e) =
226                        matching_core.delete_order(&PassiveOrderAny::from(order.clone()))
227                    {
228                        log::error!("Error deleting order: {e}");
229                    }
230                }
231            }
232        }
233        // else: Order not in cache yet
234    }
235
236    pub const fn on_stop(&self) {}
237
238    pub fn on_reset(&mut self) {
239        self.manager.reset();
240        self.matching_cores.clear();
241    }
242
243    pub const fn on_dispose(&self) {}
244
245    pub fn execute(&mut self, command: TradingCommand) {
246        log::info!("{RECV}{CMD} {command}");
247
248        match command {
249            TradingCommand::SubmitOrder(command) => self.handle_submit_order(command),
250            TradingCommand::SubmitOrderList(command) => self.handle_submit_order_list(command),
251            TradingCommand::ModifyOrder(command) => self.handle_modify_order(command),
252            TradingCommand::CancelOrder(command) => self.handle_cancel_order(command),
253            TradingCommand::CancelAllOrders(command) => self.handle_cancel_all_orders(command),
254            _ => log::error!("Cannot handle command: unrecognized {command:?}"),
255        }
256    }
257
258    fn create_matching_core(
259        &mut self,
260        instrument_id: InstrumentId,
261        price_increment: Price,
262    ) -> OrderMatchingCore {
263        let matching_core =
264            OrderMatchingCore::new(instrument_id, price_increment, None, None, None);
265        self.matching_cores
266            .insert(instrument_id, matching_core.clone());
267        log::info!("Creating matching core for {instrument_id:?}");
268        matching_core
269    }
270
271    /// # Panics
272    ///
273    /// Panics if the emulation trigger type is `NoTrigger`.
274    pub fn handle_submit_order(&mut self, command: SubmitOrder) {
275        let mut order = command.order.clone();
276        let emulation_trigger = order.emulation_trigger();
277
278        assert_ne!(
279            emulation_trigger,
280            Some(TriggerType::NoTrigger),
281            "command.order.emulation_trigger must not be TriggerType::NoTrigger"
282        );
283        assert!(
284            self.manager
285                .get_submit_order_commands()
286                .contains_key(&order.client_order_id()),
287            "command.order.client_order_id must be in submit_order_commands"
288        );
289
290        if !matches!(
291            emulation_trigger,
292            Some(TriggerType::Default | TriggerType::BidAsk | TriggerType::LastPrice)
293        ) {
294            log::error!("Cannot emulate order: `TriggerType` {emulation_trigger:?} not supported");
295            self.manager.cancel_order(&order);
296            return;
297        }
298
299        self.check_monitoring(command.strategy_id, command.position_id);
300
301        // Get or create matching core
302        let trigger_instrument_id = order
303            .trigger_instrument_id()
304            .unwrap_or_else(|| order.instrument_id());
305
306        let matching_core = self.matching_cores.get(&trigger_instrument_id).cloned();
307
308        let mut matching_core = if let Some(core) = matching_core {
309            core
310        } else {
311            // Handle synthetic instruments
312            let (instrument_id, price_increment) = if trigger_instrument_id.is_synthetic() {
313                let synthetic = self
314                    .cache
315                    .borrow()
316                    .synthetic(&trigger_instrument_id)
317                    .cloned();
318                if let Some(synthetic) = synthetic {
319                    (synthetic.id, synthetic.price_increment)
320                } else {
321                    log::error!(
322                        "Cannot emulate order: no synthetic instrument {trigger_instrument_id} for trigger"
323                    );
324                    self.manager.cancel_order(&order);
325                    return;
326                }
327            } else {
328                let instrument = self
329                    .cache
330                    .borrow()
331                    .instrument(&trigger_instrument_id)
332                    .cloned();
333                if let Some(instrument) = instrument {
334                    (instrument.id(), instrument.price_increment())
335                } else {
336                    log::error!(
337                        "Cannot emulate order: no instrument {trigger_instrument_id} for trigger"
338                    );
339                    self.manager.cancel_order(&order);
340                    return;
341                }
342            };
343
344            self.create_matching_core(instrument_id, price_increment)
345        };
346
347        // Update trailing stop
348        if matches!(
349            order.order_type(),
350            OrderType::TrailingStopMarket | OrderType::TrailingStopLimit
351        ) {
352            self.update_trailing_stop_order(&mut order);
353            if order.trigger_price().is_none() {
354                log::error!(
355                    "Cannot handle trailing stop order with no trigger_price and no market updates"
356                );
357
358                self.manager.cancel_order(&order);
359                return;
360            }
361        }
362
363        // Cache command
364        self.manager.cache_submit_order_command(command);
365
366        // Check if immediately marketable
367        matching_core.match_order(&PassiveOrderAny::from(order.clone()), true);
368
369        // Handle data subscriptions
370        match emulation_trigger.unwrap() {
371            TriggerType::Default | TriggerType::BidAsk => {
372                if !self.subscribed_quotes.contains(&trigger_instrument_id) {
373                    if !trigger_instrument_id.is_synthetic() {
374                        // TODO: Impl Actor Trait
375                        // self.subscribe_order_book_deltas(&trigger_instrument_id);
376                    }
377                    // TODO: Impl Actor Trait
378                    // self.subscribe_quote_ticks(&trigger_instrument_id)?;
379                    self.subscribed_quotes.insert(trigger_instrument_id);
380                }
381            }
382            TriggerType::LastPrice => {
383                if !self.subscribed_trades.contains(&trigger_instrument_id) {
384                    // TODO: Impl Actor Trait
385                    // self.subscribe_trade_ticks(&trigger_instrument_id)?;
386                    self.subscribed_trades.insert(trigger_instrument_id);
387                }
388            }
389            _ => {
390                log::error!("Invalid TriggerType: {emulation_trigger:?}");
391                return;
392            }
393        }
394
395        // Check if order was already released
396        if !self
397            .manager
398            .get_submit_order_commands()
399            .contains_key(&order.client_order_id())
400        {
401            return; // Already released
402        }
403
404        // Hold in matching core
405        if let Err(e) = matching_core.add_order(PassiveOrderAny::from(order.clone())) {
406            log::error!("Cannot add order: {e:?}");
407            return;
408        }
409
410        // Generate emulated event if needed
411        if order.status() == OrderStatus::Initialized {
412            let event = OrderEmulated::new(
413                order.trader_id(),
414                order.strategy_id(),
415                order.instrument_id(),
416                order.client_order_id(),
417                UUID4::new(),
418                self.clock.borrow().timestamp_ns(),
419                self.clock.borrow().timestamp_ns(),
420            );
421
422            if let Err(e) = order.apply(OrderEventAny::Emulated(event)) {
423                log::error!("Cannot apply order event: {e:?}");
424                return;
425            }
426
427            if let Err(e) = self.cache.borrow_mut().update_order(&order) {
428                log::error!("Cannot update order: {e:?}");
429                return;
430            }
431
432            self.manager.send_risk_event(OrderEventAny::Emulated(event));
433
434            msgbus::publish(
435                format!("events.order.{}", order.strategy_id()).into(),
436                &OrderEventAny::Emulated(event),
437            );
438        }
439
440        // Since we are cloning the matching core, we need to insert it back into the original hashmap
441        self.matching_cores
442            .insert(trigger_instrument_id, matching_core);
443
444        log::info!("Emulating {order}");
445    }
446
447    fn handle_submit_order_list(&mut self, command: SubmitOrderList) {
448        self.check_monitoring(command.strategy_id, command.position_id);
449
450        for order in &command.order_list.orders {
451            if let Some(parent_order_id) = order.parent_order_id() {
452                let cache = self.cache.borrow();
453                let parent_order = if let Some(parent_order) = cache.order(&parent_order_id) {
454                    parent_order
455                } else {
456                    log::error!("Parent order for {} not found", order.client_order_id());
457                    continue;
458                };
459
460                if parent_order.contingency_type() == Some(ContingencyType::Oto) {
461                    continue; // Process contingency order later once parent triggered
462                }
463            }
464
465            if let Err(e) = self.manager.create_new_submit_order(
466                order,
467                command.position_id,
468                Some(command.client_id),
469            ) {
470                log::error!("Error creating new submit order: {e}");
471            }
472        }
473    }
474
475    fn handle_modify_order(&mut self, command: ModifyOrder) {
476        if let Some(order) = self.cache.borrow().order(&command.client_order_id) {
477            let price = match command.price {
478                Some(price) => Some(price),
479                None => order.price(),
480            };
481
482            let trigger_price = match command.trigger_price {
483                Some(trigger_price) => Some(trigger_price),
484                None => order.trigger_price(),
485            };
486
487            // Generate event
488            let ts_now = self.clock.borrow().timestamp_ns();
489            let event = OrderUpdated::new(
490                order.trader_id(),
491                order.strategy_id(),
492                order.instrument_id(),
493                order.client_order_id(),
494                command.quantity.unwrap_or(order.quantity()),
495                UUID4::new(),
496                ts_now,
497                ts_now,
498                false,
499                order.venue_order_id(),
500                order.account_id(),
501                price,
502                trigger_price,
503            );
504
505            self.manager.send_exec_event(OrderEventAny::Updated(event));
506
507            let trigger_instrument_id = order
508                .trigger_instrument_id()
509                .unwrap_or_else(|| order.instrument_id());
510
511            if let Some(matching_core) = self.matching_cores.get_mut(&trigger_instrument_id) {
512                matching_core.match_order(&PassiveOrderAny::from(order.clone()), false);
513            } else {
514                log::error!(
515                    "Cannot handle `ModifyOrder`: no matching core for trigger instrument {trigger_instrument_id}"
516                );
517            }
518        } else {
519            log::error!("Cannot modify order: {} not found", command.client_order_id);
520        }
521    }
522
523    pub fn handle_cancel_order(&mut self, command: CancelOrder) {
524        let order = if let Some(order) = self.cache.borrow().order(&command.client_order_id) {
525            order.clone()
526        } else {
527            log::error!("Cannot cancel order: {} not found", command.client_order_id);
528            return;
529        };
530
531        let trigger_instrument_id = order
532            .trigger_instrument_id()
533            .unwrap_or_else(|| order.instrument_id());
534
535        let matching_core = if let Some(core) = self.matching_cores.get(&trigger_instrument_id) {
536            core
537        } else {
538            self.manager.cancel_order(&order);
539            return;
540        };
541
542        if !matching_core.order_exists(order.client_order_id())
543            && order.is_open()
544            && !order.is_pending_cancel()
545        {
546            // Order not held in the emulator
547            self.manager
548                .send_exec_command(TradingCommand::CancelOrder(command));
549        } else {
550            self.manager.cancel_order(&order);
551        }
552    }
553
554    fn handle_cancel_all_orders(&mut self, command: CancelAllOrders) {
555        let matching_core = match self.matching_cores.get(&command.instrument_id) {
556            Some(core) => core,
557            None => return, // No orders to cancel
558        };
559
560        let orders_to_cancel = match command.order_side {
561            OrderSide::NoOrderSide => {
562                // Get both bid and ask orders
563                let mut all_orders = Vec::new();
564                all_orders.extend(matching_core.get_orders_bid().iter().cloned());
565                all_orders.extend(matching_core.get_orders_ask().iter().cloned());
566                all_orders
567            }
568            OrderSide::Buy => matching_core.get_orders_bid().to_vec(),
569            OrderSide::Sell => matching_core.get_orders_ask().to_vec(),
570        };
571
572        // Process all orders in a single iteration
573        for order in orders_to_cancel {
574            self.manager.cancel_order(&OrderAny::from(order));
575        }
576    }
577
578    // --------------------------------------------------------------------------------------------
579
580    pub fn update_order(&mut self, order: &mut OrderAny, new_quantity: Quantity) {
581        log::info!(
582            "Updating order {} quantity to {new_quantity}",
583            order.client_order_id(),
584        );
585
586        // Generate event
587        let ts_now = self.clock.borrow().timestamp_ns();
588        let event = OrderUpdated::new(
589            order.trader_id(),
590            order.strategy_id(),
591            order.instrument_id(),
592            order.client_order_id(),
593            new_quantity,
594            UUID4::new(),
595            ts_now,
596            ts_now,
597            false,
598            None,
599            order.account_id(),
600            None,
601            None,
602        );
603
604        if let Err(e) = order.apply(OrderEventAny::Updated(event)) {
605            log::error!("Cannot apply order event: {e:?}");
606            return;
607        }
608        if let Err(e) = self.cache.borrow_mut().update_order(order) {
609            log::error!("Cannot update order: {e:?}");
610            return;
611        }
612
613        self.manager.send_risk_event(OrderEventAny::Updated(event));
614    }
615
616    // -----------------------------------------------------------------------------------------------
617
618    pub fn on_order_book_deltas(&mut self, deltas: OrderBookDeltas) {
619        log::debug!("Processing {deltas:?}");
620
621        let instrument_id = &deltas.instrument_id;
622        if let Some(matching_core) = self.matching_cores.get_mut(instrument_id) {
623            if let Some(book) = self.cache.borrow().order_book(instrument_id) {
624                let best_bid = book.best_bid_price();
625                let best_ask = book.best_ask_price();
626
627                if let Some(best_bid) = best_bid {
628                    matching_core.set_bid_raw(best_bid);
629                }
630
631                if let Some(best_ask) = best_ask {
632                    matching_core.set_ask_raw(best_ask);
633                }
634            } else {
635                log::error!(
636                    "Cannot handle `OrderBookDeltas`: no book being maintained for {}",
637                    deltas.instrument_id
638                );
639            }
640
641            self.iterate_orders(instrument_id);
642        } else {
643            log::error!(
644                "Cannot handle `OrderBookDeltas`: no matching core for instrument {}",
645                deltas.instrument_id
646            );
647        }
648    }
649
650    pub fn on_quote_tick(&mut self, quote: QuoteTick) {
651        log::debug!("Processing {quote}:?");
652
653        let instrument_id = &quote.instrument_id;
654        if let Some(matching_core) = self.matching_cores.get_mut(instrument_id) {
655            matching_core.set_bid_raw(quote.bid_price);
656            matching_core.set_ask_raw(quote.ask_price);
657
658            self.iterate_orders(instrument_id);
659        } else {
660            log::error!(
661                "Cannot handle `QuoteTick`: no matching core for instrument {}",
662                quote.instrument_id
663            );
664        }
665    }
666
667    pub fn on_trade_tick(&mut self, trade: TradeTick) {
668        log::debug!("Processing {trade:?}");
669
670        let instrument_id = &trade.instrument_id;
671        if let Some(matching_core) = self.matching_cores.get_mut(instrument_id) {
672            matching_core.set_last_raw(trade.price);
673            if !self.subscribed_quotes.contains(instrument_id) {
674                matching_core.set_bid_raw(trade.price);
675                matching_core.set_ask_raw(trade.price);
676            }
677
678            self.iterate_orders(instrument_id);
679        } else {
680            log::error!(
681                "Cannot handle `TradeTick`: no matching core for instrument {}",
682                trade.instrument_id
683            );
684        }
685    }
686
687    fn iterate_orders(&mut self, instrument_id: &InstrumentId) {
688        let orders = if let Some(matching_core) = self.matching_cores.get_mut(instrument_id) {
689            matching_core.iterate();
690
691            matching_core.get_orders()
692        } else {
693            log::error!("Cannot iterate orders: no matching core for instrument {instrument_id}");
694            return;
695        };
696
697        for order in orders {
698            if order.is_closed() {
699                continue;
700            }
701
702            let mut order: OrderAny = order.clone().into();
703            if matches!(
704                order.order_type(),
705                OrderType::TrailingStopMarket | OrderType::TrailingStopLimit
706            ) {
707                self.update_trailing_stop_order(&mut order);
708            }
709        }
710    }
711
712    pub fn cancel_order(&mut self, order: &OrderAny) {
713        log::info!("Canceling order {}", order.client_order_id());
714
715        let mut order = order.clone();
716        order.set_emulation_trigger(Some(TriggerType::NoTrigger));
717
718        let trigger_instrument_id = order
719            .trigger_instrument_id()
720            .unwrap_or(order.instrument_id());
721
722        if let Some(matching_core) = self.matching_cores.get_mut(&trigger_instrument_id) {
723            if let Err(e) = matching_core.delete_order(&PassiveOrderAny::from(order.clone())) {
724                log::error!("Cannot delete order: {e:?}");
725            }
726        }
727
728        self.cache
729            .borrow_mut()
730            .update_order_pending_cancel_local(&order);
731
732        // Generate event
733        let ts_now = self.clock.borrow().timestamp_ns();
734        let event = OrderCanceled::new(
735            order.trader_id(),
736            order.strategy_id(),
737            order.instrument_id(),
738            order.client_order_id(),
739            UUID4::new(),
740            ts_now,
741            ts_now,
742            false,
743            order.venue_order_id(),
744            order.account_id(),
745        );
746
747        self.manager.send_exec_event(OrderEventAny::Canceled(event));
748    }
749
750    fn check_monitoring(&mut self, strategy_id: StrategyId, position_id: Option<PositionId>) {
751        if !self.subscribed_strategies.contains(&strategy_id) {
752            // Subscribe to all strategy events
753            if let Some(handler) = &self.on_event_handler {
754                msgbus::subscribe_str(format!("events.order.{strategy_id}"), handler.clone(), None);
755                msgbus::subscribe_str(
756                    format!("events.position.{strategy_id}"),
757                    handler.clone(),
758                    None,
759                );
760                self.subscribed_strategies.insert(strategy_id);
761                log::info!("Subscribed to strategy {strategy_id} order and position events");
762            }
763        }
764
765        if let Some(position_id) = position_id {
766            if !self.monitored_positions.contains(&position_id) {
767                self.monitored_positions.insert(position_id);
768            }
769        }
770    }
771
772    /// # Panics
773    ///
774    /// Panics if the order type is invalid for a stop order.
775    pub fn trigger_stop_order(&mut self, order: &mut OrderAny) {
776        match order.order_type() {
777            OrderType::StopLimit | OrderType::LimitIfTouched | OrderType::TrailingStopLimit => {
778                self.fill_limit_order(order);
779            }
780            OrderType::Market | OrderType::MarketIfTouched | OrderType::TrailingStopMarket => {
781                self.fill_market_order(order);
782            }
783            _ => panic!("invalid `OrderType`, was {}", order.order_type()),
784        }
785    }
786
787    /// # Panics
788    ///
789    /// Panics if a limit order has no price.
790    pub fn fill_limit_order(&mut self, order: &mut OrderAny) {
791        if matches!(order.order_type(), OrderType::Limit) {
792            self.fill_market_order(order);
793            return;
794        }
795
796        // Fetch command
797        let mut command = match self
798            .manager
799            .pop_submit_order_command(order.client_order_id())
800        {
801            Some(command) => command,
802            None => return, // Order already released
803        };
804
805        let trigger_instrument_id = order
806            .trigger_instrument_id()
807            .unwrap_or(order.instrument_id());
808
809        if let Some(matching_core) = self.matching_cores.get_mut(&trigger_instrument_id) {
810            if let Err(e) = matching_core.delete_order(&PassiveOrderAny::from(order.clone())) {
811                log::error!("Error deleting order: {e:?}");
812            }
813
814            let emulation_trigger = TriggerType::NoTrigger;
815
816            // Transform order
817            let mut transformed = if let Ok(transformed) = LimitOrder::new_checked(
818                order.trader_id(),
819                order.strategy_id(),
820                order.instrument_id(),
821                order.client_order_id(),
822                order.order_side(),
823                order.quantity(),
824                order.price().unwrap(),
825                order.time_in_force(),
826                order.expire_time(),
827                order.is_post_only(),
828                order.is_reduce_only(),
829                order.is_quote_quantity(),
830                order.display_qty(),
831                Some(emulation_trigger),
832                Some(trigger_instrument_id),
833                order.contingency_type(),
834                order.order_list_id(),
835                order.linked_order_ids().map(Vec::from),
836                order.parent_order_id(),
837                order.exec_algorithm_id(),
838                order.exec_algorithm_params().cloned(),
839                order.exec_spawn_id(),
840                order.tags().map(Vec::from),
841                UUID4::new(),
842                self.clock.borrow().timestamp_ns(),
843            ) {
844                transformed
845            } else {
846                log::error!("Cannot create limit order");
847                return;
848            };
849            transformed.liquidity_side = order.liquidity_side();
850
851            // TODO: fix
852            // let triggered_price = order.trigger_price();
853            // if triggered_price.is_some() {
854            //     transformed.trigger_price() = (triggered_price.unwrap());
855            // }
856
857            let original_events = order.events();
858
859            for event in original_events {
860                transformed.events.insert(0, event.clone());
861            }
862
863            if let Err(e) = self.cache.borrow_mut().add_order(
864                OrderAny::Limit(transformed.clone()),
865                command.position_id,
866                Some(command.client_id),
867                true,
868            ) {
869                log::error!("Failed to add order: {e}");
870            }
871
872            // Replace commands order with transformed order
873            command.order = OrderAny::Limit(transformed.clone());
874
875            msgbus::publish(
876                format!("events.order.{}", order.strategy_id()).into(),
877                transformed.last_event(),
878            );
879
880            // Determine triggered price
881            let released_price = match order.order_side() {
882                OrderSide::Buy => matching_core.ask,
883                OrderSide::Sell => matching_core.bid,
884                _ => panic!("invalid `OrderSide`"),
885            };
886
887            // Generate event
888            let event = OrderReleased::new(
889                order.trader_id(),
890                order.strategy_id(),
891                order.instrument_id(),
892                order.client_order_id(),
893                released_price.unwrap(),
894                UUID4::new(),
895                self.clock.borrow().timestamp_ns(),
896                self.clock.borrow().timestamp_ns(),
897            );
898
899            if let Err(e) = transformed.apply(OrderEventAny::Released(event)) {
900                log::error!("Failed to apply order event: {e}");
901            }
902
903            if let Err(e) = self
904                .cache
905                .borrow_mut()
906                .update_order(&OrderAny::Limit(transformed.clone()))
907            {
908                log::error!("Failed to update order: {e}");
909            }
910
911            self.manager.send_risk_event(OrderEventAny::Released(event));
912
913            log::info!("Releasing order {}", order.client_order_id());
914
915            // Publish event
916            msgbus::publish(
917                format!("events.order.{}", transformed.strategy_id()).into(),
918                &OrderEventAny::Released(event),
919            );
920
921            if let Some(exec_algorithm_id) = order.exec_algorithm_id() {
922                self.manager.send_algo_command(command, exec_algorithm_id);
923            } else {
924                self.manager
925                    .send_exec_command(TradingCommand::SubmitOrder(command));
926            }
927        } else {
928            log::error!(
929                "Cannot fill limit order: no matching core for instrument {trigger_instrument_id}"
930            );
931        }
932    }
933
934    /// # Panics
935    ///
936    /// Panics if a market order command is missing.
937    pub fn fill_market_order(&mut self, order: &mut OrderAny) {
938        // Fetch command
939        let mut command = match self
940            .manager
941            .pop_submit_order_command(order.client_order_id())
942        {
943            Some(command) => command,
944            None => panic!("invalid operation `_fill_market_order` with no command"),
945        };
946
947        let trigger_instrument_id = order
948            .trigger_instrument_id()
949            .unwrap_or(order.instrument_id());
950
951        if let Some(matching_core) = self.matching_cores.get_mut(&trigger_instrument_id) {
952            if let Err(e) = matching_core.delete_order(&PassiveOrderAny::from(order.clone())) {
953                log::error!("Cannot delete order: {e:?}");
954            }
955
956            order.set_emulation_trigger(Some(TriggerType::NoTrigger));
957
958            // Transform order
959            let mut transformed = MarketOrder::new(
960                order.trader_id(),
961                order.strategy_id(),
962                order.instrument_id(),
963                order.client_order_id(),
964                order.order_side(),
965                order.quantity(),
966                order.time_in_force(),
967                UUID4::new(),
968                self.clock.borrow().timestamp_ns(),
969                order.is_reduce_only(),
970                order.is_quote_quantity(),
971                order.contingency_type(),
972                order.order_list_id(),
973                order.linked_order_ids().map(Vec::from),
974                order.parent_order_id(),
975                order.exec_algorithm_id(),
976                order.exec_algorithm_params().cloned(),
977                order.exec_spawn_id(),
978                order.tags().map(Vec::from),
979            );
980
981            let original_events = order.events();
982
983            for event in original_events {
984                transformed.events.insert(0, event.clone());
985            }
986
987            if let Err(e) = self.cache.borrow_mut().add_order(
988                OrderAny::Market(transformed.clone()),
989                command.position_id,
990                Some(command.client_id),
991                true,
992            ) {
993                log::error!("Failed to add order: {e}");
994            }
995
996            // Replace commands order with transformed order
997            command.order = OrderAny::Market(transformed.clone());
998
999            msgbus::publish(
1000                format!("events.order.{}", order.strategy_id()).into(),
1001                transformed.last_event(),
1002            );
1003
1004            // Determine triggered price
1005            // TODO: fix unwraps
1006            let released_price = match order.order_side() {
1007                OrderSide::Buy => matching_core.ask,
1008                OrderSide::Sell => matching_core.bid,
1009                _ => panic!("invalid `OrderSide`"),
1010            };
1011
1012            // Generate event
1013            let ts_now = self.clock.borrow().timestamp_ns();
1014            let event = OrderReleased::new(
1015                order.trader_id(),
1016                order.strategy_id(),
1017                order.instrument_id(),
1018                order.client_order_id(),
1019                released_price.unwrap(),
1020                UUID4::new(),
1021                ts_now,
1022                ts_now,
1023            );
1024
1025            if let Err(e) = transformed.apply(OrderEventAny::Released(event)) {
1026                log::error!("Failed to apply order event: {e}");
1027            }
1028
1029            if let Err(e) = self
1030                .cache
1031                .borrow_mut()
1032                .update_order(&OrderAny::Market(transformed))
1033            {
1034                log::error!("Failed to update order: {e}");
1035            }
1036            self.manager.send_risk_event(OrderEventAny::Released(event));
1037
1038            log::info!("Releasing order {}", order.client_order_id());
1039
1040            // Publish event
1041            msgbus::publish(
1042                format!("events.order.{}", order.strategy_id()).into(),
1043                &OrderEventAny::Released(event),
1044            );
1045
1046            if let Some(exec_algorithm_id) = order.exec_algorithm_id() {
1047                self.manager.send_algo_command(command, exec_algorithm_id);
1048            } else {
1049                self.manager
1050                    .send_exec_command(TradingCommand::SubmitOrder(command));
1051            }
1052        } else {
1053            log::error!(
1054                "Cannot fill limit order: no matching core for instrument {trigger_instrument_id}"
1055            );
1056        }
1057    }
1058
1059    fn update_trailing_stop_order(&mut self, order: &mut OrderAny) {
1060        if let Some(matching_core) = self.matching_cores.get(&order.instrument_id()) {
1061            let mut bid = None;
1062            let mut ask = None;
1063            let mut last = None;
1064
1065            if matching_core.is_bid_initialized {
1066                bid = matching_core.bid;
1067            }
1068            if matching_core.is_ask_initialized {
1069                ask = matching_core.ask;
1070            }
1071            if matching_core.is_last_initialized {
1072                last = matching_core.last;
1073            }
1074
1075            let quote_tick = self
1076                .cache
1077                .borrow()
1078                .quote(&matching_core.instrument_id)
1079                .copied();
1080            let trade_tick = self
1081                .cache
1082                .borrow()
1083                .trade(&matching_core.instrument_id)
1084                .copied();
1085
1086            if bid.is_none() && quote_tick.is_some() {
1087                bid = Some(quote_tick.unwrap().bid_price);
1088            }
1089            if ask.is_none() && quote_tick.is_some() {
1090                ask = Some(quote_tick.unwrap().ask_price);
1091            }
1092            if last.is_none() && trade_tick.is_some() {
1093                last = Some(trade_tick.unwrap().price);
1094            }
1095
1096            let (new_trigger_price, new_price) = if let Ok((new_trigger_price, new_price)) =
1097                trailing_stop_calculate(matching_core.price_increment, order, bid, ask, last)
1098            {
1099                (new_trigger_price, new_price)
1100            } else {
1101                log::warn!("Cannot calculate trailing stop order");
1102                return;
1103            };
1104
1105            let (new_trigger_price, new_price) = match (new_trigger_price, new_price) {
1106                (None, None) => return, // No updates
1107                _ => (new_trigger_price, new_price),
1108            };
1109
1110            // Generate event
1111            let ts_now = self.clock.borrow().timestamp_ns();
1112            let event = OrderUpdated::new(
1113                order.trader_id(),
1114                order.strategy_id(),
1115                order.instrument_id(),
1116                order.client_order_id(),
1117                order.quantity(),
1118                UUID4::new(),
1119                ts_now,
1120                ts_now,
1121                false,
1122                order.venue_order_id(),
1123                order.account_id(),
1124                new_price,
1125                new_trigger_price,
1126            );
1127
1128            if let Err(e) = order.apply(OrderEventAny::Updated(event)) {
1129                log::error!("Failed to apply order event: {e}");
1130            }
1131            if let Err(e) = self.cache.borrow_mut().update_order(order) {
1132                log::error!("Failed to update order: {e}");
1133            }
1134
1135            self.manager.send_risk_event(OrderEventAny::Updated(event));
1136        } else {
1137            log::error!(
1138                "Cannot update trailing stop order: no matching core for instrument {}",
1139                order.instrument_id()
1140            );
1141        }
1142    }
1143}