nautilus_execution/matching_engine/
engine.rs

1// -------------------------------------------------------------------------------------------------
2//  Copyright (C) 2015-2025 Posei Systems Pty Ltd. All rights reserved.
3//  https://poseitrader.io
4//
5//  Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
6//  You may not use this file except in compliance with the License.
7//  You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
8//
9//  Unless required by applicable law or agreed to in writing, software
10//  distributed under the License is distributed on an "AS IS" BASIS,
11//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12//  See the License for the specific language governing permissions and
13//  limitations under the License.
14// -------------------------------------------------------------------------------------------------
15
16// Under development
17#![allow(dead_code)]
18#![allow(unused_variables)]
19
20use std::{
21    any::Any,
22    cell::RefCell,
23    cmp::min,
24    collections::HashMap,
25    fmt::Debug,
26    ops::{Add, Sub},
27    rc::Rc,
28};
29
30use chrono::TimeDelta;
31use nautilus_common::{
32    cache::Cache,
33    clock::Clock,
34    messages::execution::{BatchCancelOrders, CancelAllOrders, CancelOrder, ModifyOrder},
35    msgbus,
36};
37use nautilus_core::{UUID4, UnixNanos};
38use nautilus_model::{
39    data::{Bar, BarType, OrderBookDelta, OrderBookDeltas, QuoteTick, TradeTick, order::BookOrder},
40    enums::{
41        AccountType, AggregationSource, AggressorSide, BarAggregation, BookType, ContingencyType,
42        LiquiditySide, MarketStatus, MarketStatusAction, OmsType, OrderSide, OrderSideSpecified,
43        OrderStatus, OrderType, PriceType, TimeInForce,
44    },
45    events::{
46        OrderAccepted, OrderCancelRejected, OrderCanceled, OrderEventAny, OrderExpired,
47        OrderFilled, OrderModifyRejected, OrderRejected, OrderTriggered, OrderUpdated,
48    },
49    identifiers::{
50        AccountId, ClientOrderId, InstrumentId, PositionId, StrategyId, TraderId, Venue,
51        VenueOrderId,
52    },
53    instruments::{EXPIRING_INSTRUMENT_TYPES, Instrument, InstrumentAny},
54    orderbook::OrderBook,
55    orders::{Order, OrderAny, PassiveOrderAny, StopOrderAny},
56    position::Position,
57    types::{Currency, Money, Price, Quantity, fixed::FIXED_PRECISION},
58};
59use ustr::Ustr;
60
61use crate::{
62    matching_core::OrderMatchingCore,
63    matching_engine::{config::OrderMatchingEngineConfig, ids_generator::IdsGenerator},
64    models::{
65        fee::{FeeModel, FeeModelAny},
66        fill::FillModel,
67    },
68    trailing::trailing_stop_calculate,
69};
70
/// An order matching engine for a single market.
///
/// Bundles the instrument's order book, the order matching core and the supporting
/// state (shared clock and cache handles, fill and fee models, bar execution
/// bookkeeping) used by the data processing and trading command methods.
pub struct OrderMatchingEngine {
    /// The venue for the matching engine.
    pub venue: Venue,
    /// The instrument for the matching engine.
    pub instrument: InstrumentAny,
    /// The instrument's raw integer ID for the venue.
    pub raw_id: u32,
    /// The order book type for the matching engine.
    pub book_type: BookType,
    /// The order management system (OMS) type for the matching engine.
    pub oms_type: OmsType,
    /// The account type for the matching engine.
    pub account_type: AccountType,
    /// The market status for the matching engine.
    pub market_status: MarketStatus,
    /// The config for the matching engine.
    pub config: OrderMatchingEngineConfig,
    /// Shared clock handle, read for the current timestamp when validating orders.
    clock: Rc<RefCell<dyn Clock>>,
    /// Shared cache handle, used to look up orders and positions.
    cache: Rc<RefCell<Cache>>,
    /// The order book maintained from the processed market data.
    book: OrderBook,
    /// The order matching core holding the open bid and ask orders.
    pub core: OrderMatchingCore,
    /// The fill model supplied at construction (replaceable via `set_fill_model`).
    fill_model: FillModel,
    /// The fee model supplied at construction.
    fee_model: FeeModelAny,
    // NOTE(review): the target prices are only initialized and cleared in the visible
    // part of this file; presumably used while filling orders - confirm
    target_bid: Option<Price>,
    target_ask: Option<Price>,
    target_last: Option<Price>,
    /// The most recent BID bar still waiting to be paired with an ASK bar.
    last_bar_bid: Option<Bar>,
    /// The most recent ASK bar still waiting to be paired with a BID bar.
    last_bar_ask: Option<Bar>,
    /// The bar type currently selected for bar execution, per instrument.
    execution_bar_types: HashMap<InstrumentId, BarType>,
    /// Cached time delta for each bar type seen during bar execution.
    execution_bar_deltas: HashMap<BarType, TimeDelta>,
    /// The account ID recorded for each trader that submitted an order.
    account_ids: HashMap<TraderId, AccountId>,
    /// Quantity cached per client order ID.
    // NOTE(review): presumably the cumulative filled quantity - confirm against the fill logic
    cached_filled_qty: HashMap<ClientOrderId, Quantity>,
    /// Generator for venue-side identifiers (e.g. trade IDs).
    ids_generator: IdsGenerator,
}
106
107impl Debug for OrderMatchingEngine {
108    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
109        f.debug_struct(stringify!(OrderMatchingEngine))
110            .field("venue", &self.venue)
111            .field("instrument", &self.instrument.id())
112            .finish()
113    }
114}
115
116impl OrderMatchingEngine {
117    /// Creates a new [`OrderMatchingEngine`] instance.
118    #[allow(clippy::too_many_arguments)]
119    pub fn new(
120        instrument: InstrumentAny,
121        raw_id: u32,
122        fill_model: FillModel,
123        fee_model: FeeModelAny,
124        book_type: BookType,
125        oms_type: OmsType,
126        account_type: AccountType,
127        clock: Rc<RefCell<dyn Clock>>,
128        cache: Rc<RefCell<Cache>>,
129        config: OrderMatchingEngineConfig,
130    ) -> Self {
131        let book = OrderBook::new(instrument.id(), book_type);
132        let core = OrderMatchingCore::new(
133            instrument.id(),
134            instrument.price_increment(),
135            None, // TBD (will be a function on the engine)
136            None, // TBD (will be a function on the engine)
137            None, // TBD (will be a function on the engine)
138        );
139        let ids_generator = IdsGenerator::new(
140            instrument.id().venue,
141            oms_type,
142            raw_id,
143            config.use_random_ids,
144            config.use_position_ids,
145            cache.clone(),
146        );
147
148        Self {
149            venue: instrument.id().venue,
150            instrument,
151            raw_id,
152            fill_model,
153            fee_model,
154            book_type,
155            oms_type,
156            account_type,
157            clock,
158            cache,
159            book,
160            core,
161            market_status: MarketStatus::Open,
162            config,
163            target_bid: None,
164            target_ask: None,
165            target_last: None,
166            last_bar_bid: None,
167            last_bar_ask: None,
168            execution_bar_types: HashMap::new(),
169            execution_bar_deltas: HashMap::new(),
170            account_ids: HashMap::new(),
171            cached_filled_qty: HashMap::new(),
172            ids_generator,
173        }
174    }
175
176    /// Resets the matching engine to its initial state.
177    ///
178    /// Clears the order book, execution state, cached data, and resets all
179    /// internal components. This is typically used for backtesting scenarios
180    /// where the engine needs to be reset between test runs.
181    pub fn reset(&mut self) {
182        self.book.clear(0, UnixNanos::default());
183        self.execution_bar_types.clear();
184        self.execution_bar_deltas.clear();
185        self.account_ids.clear();
186        self.cached_filled_qty.clear();
187        self.core.reset();
188        self.target_bid = None;
189        self.target_ask = None;
190        self.target_last = None;
191        self.ids_generator.reset();
192
193        log::info!("Reset {}", self.instrument.id());
194    }
195
    /// Sets the fill model for the matching engine.
    ///
    /// The previously configured fill model is replaced by `fill_model`.
    pub const fn set_fill_model(&mut self, fill_model: FillModel) {
        self.fill_model = fill_model;
    }
200
    /// Returns the best bid price from the order book.
    ///
    /// Delegates to the internal order book and returns whatever it reports.
    #[must_use]
    pub fn best_bid_price(&self) -> Option<Price> {
        self.book.best_bid_price()
    }
206
    /// Returns the best ask price from the order book.
    ///
    /// Delegates to the internal order book and returns whatever it reports.
    #[must_use]
    pub fn best_ask_price(&self) -> Option<Price> {
        self.book.best_ask_price()
    }
212
    /// Returns a reference to the internal order book.
    #[must_use]
    pub const fn get_book(&self) -> &OrderBook {
        &self.book
    }
218
    /// Returns all open bid orders managed by the matching core.
    ///
    /// The returned slice borrows directly from the matching core.
    #[must_use]
    pub const fn get_open_bid_orders(&self) -> &[PassiveOrderAny] {
        self.core.get_orders_bid()
    }
224
    /// Returns all open ask orders managed by the matching core.
    ///
    /// The returned slice borrows directly from the matching core.
    #[must_use]
    pub const fn get_open_ask_orders(&self) -> &[PassiveOrderAny] {
        self.core.get_orders_ask()
    }
230
231    #[must_use]
232    /// Returns all open orders from both bid and ask sides.
233    pub fn get_open_orders(&self) -> Vec<PassiveOrderAny> {
234        // Get orders from both open bid orders and open ask orders
235        let mut orders = Vec::new();
236        orders.extend_from_slice(self.core.get_orders_bid());
237        orders.extend_from_slice(self.core.get_orders_ask());
238        orders
239    }
240
    /// Returns true if an order with the given client order ID exists in the matching engine.
    ///
    /// The lookup is delegated to the matching core.
    #[must_use]
    pub fn order_exists(&self, client_order_id: ClientOrderId) -> bool {
        self.core.order_exists(client_order_id)
    }
246
247    // -- DATA PROCESSING -------------------------------------------------------------------------
248
249    /// Process the venues market for the given order book delta.
250    pub fn process_order_book_delta(&mut self, delta: &OrderBookDelta) {
251        log::debug!("Processing {delta}");
252
253        if self.book_type == BookType::L2_MBP || self.book_type == BookType::L3_MBO {
254            self.book.apply_delta(delta);
255        }
256
257        self.iterate(delta.ts_event);
258    }
259
260    pub fn process_order_book_deltas(&mut self, deltas: &OrderBookDeltas) {
261        log::debug!("Processing {deltas}");
262
263        if self.book_type == BookType::L2_MBP || self.book_type == BookType::L3_MBO {
264            self.book.apply_deltas(deltas);
265        }
266
267        self.iterate(deltas.ts_event);
268    }
269
270    /// # Panics
271    ///
272    /// Panics if updating the order book with the quote tick fails.
273    pub fn process_quote_tick(&mut self, quote: &QuoteTick) {
274        log::debug!("Processing {quote}");
275
276        if self.book_type == BookType::L1_MBP {
277            self.book.update_quote_tick(quote).unwrap();
278        }
279
280        self.iterate(quote.ts_event);
281    }
282
283    /// # Panics
284    ///
285    /// Panics if the bar type configuration is missing a time delta.
286    pub fn process_bar(&mut self, bar: &Bar) {
287        log::debug!("Processing {bar}");
288
289        // Check if configured for bar execution can only process an L1 book with bars
290        if !self.config.bar_execution || self.book_type != BookType::L1_MBP {
291            return;
292        }
293
294        let bar_type = bar.bar_type;
295        // Do not process internally aggregated bars
296        if bar_type.aggregation_source() == AggregationSource::Internal {
297            return;
298        }
299
300        // Do not process monthly bars (no `timedelta` available)
301        if bar_type.spec().aggregation == BarAggregation::Month {
302            return;
303        }
304
305        let execution_bar_type =
306            if let Some(execution_bar_type) = self.execution_bar_types.get(&bar.instrument_id()) {
307                execution_bar_type.to_owned()
308            } else {
309                self.execution_bar_types
310                    .insert(bar.instrument_id(), bar_type);
311                self.execution_bar_deltas
312                    .insert(bar_type, bar_type.spec().timedelta());
313                bar_type
314            };
315
316        if execution_bar_type != bar_type {
317            let mut bar_type_timedelta = self.execution_bar_deltas.get(&bar_type).copied();
318            if bar_type_timedelta.is_none() {
319                bar_type_timedelta = Some(bar_type.spec().timedelta());
320                self.execution_bar_deltas
321                    .insert(bar_type, bar_type_timedelta.unwrap());
322            }
323            if self.execution_bar_deltas.get(&execution_bar_type).unwrap()
324                >= &bar_type_timedelta.unwrap()
325            {
326                self.execution_bar_types
327                    .insert(bar_type.instrument_id(), bar_type);
328            } else {
329                return;
330            }
331        }
332
333        match bar_type.spec().price_type {
334            PriceType::Last | PriceType::Mid => self.process_trade_ticks_from_bar(bar),
335            PriceType::Bid => {
336                self.last_bar_bid = Some(bar.to_owned());
337                self.process_quote_ticks_from_bar(bar);
338            }
339            PriceType::Ask => {
340                self.last_bar_ask = Some(bar.to_owned());
341                self.process_quote_ticks_from_bar(bar);
342            }
343            PriceType::Mark => panic!("Not implemented"),
344        }
345    }
346
347    fn process_trade_ticks_from_bar(&mut self, bar: &Bar) {
348        // Split the bar into 4 trades with quarter volume
349        let size = Quantity::new(bar.volume.as_f64() / 4.0, bar.volume.precision);
350        let aggressor_side = if !self.core.is_last_initialized || bar.open > self.core.last.unwrap()
351        {
352            AggressorSide::Buyer
353        } else {
354            AggressorSide::Seller
355        };
356
357        // Create reusable trade tick
358        let mut trade_tick = TradeTick::new(
359            bar.instrument_id(),
360            bar.open,
361            size,
362            aggressor_side,
363            self.ids_generator.generate_trade_id(),
364            bar.ts_event,
365            bar.ts_event,
366        );
367
368        // Open
369        // Check if not initialized, if it is, it will be updated by the close or last
370        if !self.core.is_last_initialized {
371            self.book.update_trade_tick(&trade_tick).unwrap();
372            self.iterate(trade_tick.ts_init);
373            self.core.set_last_raw(trade_tick.price);
374        }
375
376        // High
377        // Check if higher than last
378        if self.core.last.is_some_and(|last| bar.high > last) {
379            trade_tick.price = bar.high;
380            trade_tick.aggressor_side = AggressorSide::Buyer;
381            trade_tick.trade_id = self.ids_generator.generate_trade_id();
382
383            self.book.update_trade_tick(&trade_tick).unwrap();
384            self.iterate(trade_tick.ts_init);
385
386            self.core.set_last_raw(trade_tick.price);
387        }
388
389        // Low
390        // Check if lower than last
391        // Assumption: market traded down, aggressor hitting the bid(setting aggressor to seller)
392        if self.core.last.is_some_and(|last| bar.low < last) {
393            trade_tick.price = bar.low;
394            trade_tick.aggressor_side = AggressorSide::Seller;
395            trade_tick.trade_id = self.ids_generator.generate_trade_id();
396
397            self.book.update_trade_tick(&trade_tick).unwrap();
398            self.iterate(trade_tick.ts_init);
399
400            self.core.set_last_raw(trade_tick.price);
401        }
402
403        // Close
404        // Check if not the same as last
405        // Assumption: if close price is higher then last, aggressor is buyer
406        // Assumption: if close price is lower then last, aggressor is seller
407        if self.core.last.is_some_and(|last| bar.close != last) {
408            trade_tick.price = bar.close;
409            trade_tick.aggressor_side = if bar.close > self.core.last.unwrap() {
410                AggressorSide::Buyer
411            } else {
412                AggressorSide::Seller
413            };
414            trade_tick.trade_id = self.ids_generator.generate_trade_id();
415
416            self.book.update_trade_tick(&trade_tick).unwrap();
417            self.iterate(trade_tick.ts_init);
418
419            self.core.set_last_raw(trade_tick.price);
420        }
421    }
422
423    fn process_quote_ticks_from_bar(&mut self, bar: &Bar) {
424        // Wait for next bar
425        if self.last_bar_bid.is_none()
426            || self.last_bar_ask.is_none()
427            || self.last_bar_bid.unwrap().ts_event != self.last_bar_ask.unwrap().ts_event
428        {
429            return;
430        }
431        let bid_bar = self.last_bar_bid.unwrap();
432        let ask_bar = self.last_bar_ask.unwrap();
433        let bid_size = Quantity::new(bid_bar.volume.as_f64() / 4.0, bar.volume.precision);
434        let ask_size = Quantity::new(ask_bar.volume.as_f64() / 4.0, bar.volume.precision);
435
436        // Create reusable quote tick
437        let mut quote_tick = QuoteTick::new(
438            self.book.instrument_id,
439            bid_bar.open,
440            ask_bar.open,
441            bid_size,
442            ask_size,
443            bid_bar.ts_init,
444            bid_bar.ts_init,
445        );
446
447        // Open
448        self.book.update_quote_tick(&quote_tick).unwrap();
449        self.iterate(quote_tick.ts_init);
450
451        // High
452        quote_tick.bid_price = bid_bar.high;
453        quote_tick.ask_price = ask_bar.high;
454        self.book.update_quote_tick(&quote_tick).unwrap();
455        self.iterate(quote_tick.ts_init);
456
457        // Low
458        quote_tick.bid_price = bid_bar.low;
459        quote_tick.ask_price = ask_bar.low;
460        self.book.update_quote_tick(&quote_tick).unwrap();
461        self.iterate(quote_tick.ts_init);
462
463        // Close
464        quote_tick.bid_price = bid_bar.close;
465        quote_tick.ask_price = ask_bar.close;
466        self.book.update_quote_tick(&quote_tick).unwrap();
467        self.iterate(quote_tick.ts_init);
468
469        // Reset last bars
470        self.last_bar_bid = None;
471        self.last_bar_ask = None;
472    }
473
474    /// # Panics
475    ///
476    /// Panics if updating the order book with the trade tick fails.
477    pub fn process_trade_tick(&mut self, trade: &TradeTick) {
478        log::debug!("Processing {trade}");
479
480        if self.book_type == BookType::L1_MBP {
481            self.book.update_trade_tick(trade).unwrap();
482        }
483        self.core.set_last_raw(trade.price);
484
485        self.iterate(trade.ts_event);
486    }
487
488    pub fn process_status(&mut self, action: MarketStatusAction) {
489        log::debug!("Processing {action}");
490
491        // Check if market is closed and market opens with trading or pre-open status
492        if self.market_status == MarketStatus::Closed
493            && (action == MarketStatusAction::Trading || action == MarketStatusAction::PreOpen)
494        {
495            self.market_status = MarketStatus::Open;
496        }
497        // Check if market is open and market pauses
498        if self.market_status == MarketStatus::Open && action == MarketStatusAction::Pause {
499            self.market_status = MarketStatus::Paused;
500        }
501        // Check if market is open and market suspends
502        if self.market_status == MarketStatus::Open && action == MarketStatusAction::Suspend {
503            self.market_status = MarketStatus::Suspended;
504        }
505        // Check if market is open and we halt or close
506        if self.market_status == MarketStatus::Open
507            && (action == MarketStatusAction::Halt || action == MarketStatusAction::Close)
508        {
509            self.market_status = MarketStatus::Closed;
510        }
511    }
512
513    // -- TRADING COMMANDS ------------------------------------------------------------------------
514
515    /// # Panics
516    ///
517    /// Panics if the instrument activation timestamp is missing.
518    #[allow(clippy::needless_return)]
519    pub fn process_order(&mut self, order: &mut OrderAny, account_id: AccountId) {
520        // Enter the scope where you will borrow a cache
521        {
522            let cache_borrow = self.cache.as_ref().borrow();
523
524            if self.core.order_exists(order.client_order_id()) {
525                self.generate_order_rejected(order, "Order already exists".into());
526                return;
527            }
528
529            // Index identifiers
530            self.account_ids.insert(order.trader_id(), account_id);
531
532            // Check for instrument expiration or activation
533            if EXPIRING_INSTRUMENT_TYPES.contains(&self.instrument.instrument_class()) {
534                if let Some(activation_ns) = self.instrument.activation_ns() {
535                    if self.clock.borrow().timestamp_ns() < activation_ns {
536                        self.generate_order_rejected(
537                            order,
538                            format!(
539                                "Contract {} is not yet active, activation {}",
540                                self.instrument.id(),
541                                self.instrument.activation_ns().unwrap()
542                            )
543                            .into(),
544                        );
545                        return;
546                    }
547                }
548                if let Some(expiration_ns) = self.instrument.expiration_ns() {
549                    if self.clock.borrow().timestamp_ns() >= expiration_ns {
550                        self.generate_order_rejected(
551                            order,
552                            format!(
553                                "Contract {} has expired, expiration {}",
554                                self.instrument.id(),
555                                self.instrument.expiration_ns().unwrap()
556                            )
557                            .into(),
558                        );
559                        return;
560                    }
561                }
562            }
563
564            // Contingent orders checks
565            if self.config.support_contingent_orders {
566                if let Some(parent_order_id) = order.parent_order_id() {
567                    let parent_order = cache_borrow.order(&parent_order_id);
568                    if parent_order.is_none()
569                        || parent_order.unwrap().contingency_type().unwrap() != ContingencyType::Oto
570                    {
571                        panic!("OTO parent not found");
572                    }
573                    if let Some(parent_order) = parent_order {
574                        let parent_order_status = parent_order.status();
575                        let order_is_open = order.is_open();
576                        if parent_order.status() == OrderStatus::Rejected && order.is_open() {
577                            self.generate_order_rejected(
578                                order,
579                                format!("Rejected OTO order from {parent_order_id}").into(),
580                            );
581                            return;
582                        } else if parent_order.status() == OrderStatus::Accepted
583                            && parent_order.status() == OrderStatus::Triggered
584                        {
585                            log::info!(
586                                "Pending OTO order {} triggers from {parent_order_id}",
587                                order.client_order_id(),
588                            );
589                            return;
590                        }
591                    }
592                }
593
594                if let Some(linked_order_ids) = order.linked_order_ids() {
595                    for client_order_id in linked_order_ids {
596                        match cache_borrow.order(client_order_id) {
597                            Some(contingent_order)
598                                if (order.contingency_type().unwrap() == ContingencyType::Oco
599                                    || order.contingency_type().unwrap()
600                                        == ContingencyType::Ouo)
601                                    && !order.is_closed()
602                                    && contingent_order.is_closed() =>
603                            {
604                                self.generate_order_rejected(
605                                    order,
606                                    format!("Contingent order {client_order_id} already closed")
607                                        .into(),
608                                );
609                                return;
610                            }
611                            None => panic!("Cannot find contingent order for {client_order_id}"),
612                            _ => {}
613                        }
614                    }
615                }
616            }
617
618            // Check fo valid order quantity precision
619            if order.quantity().precision != self.instrument.size_precision() {
620                self.generate_order_rejected(
621                    order,
622                    format!(
623                        "Invalid order quantity precision for order {}, was {} when {} size precision is {}",
624                        order.client_order_id(),
625                        order.quantity().precision,
626                        self.instrument.id(),
627                        self.instrument.size_precision()
628                    )
629                        .into(),
630                );
631                return;
632            }
633
634            // Check for valid order price precision
635            if let Some(price) = order.price() {
636                if price.precision != self.instrument.price_precision() {
637                    self.generate_order_rejected(
638                        order,
639                        format!(
640                            "Invalid order price precision for order {}, was {} when {} price precision is {}",
641                            order.client_order_id(),
642                            price.precision,
643                            self.instrument.id(),
644                            self.instrument.price_precision()
645                        )
646                            .into(),
647                    );
648                    return;
649                }
650            }
651
652            // Check for valid order trigger price precision
653            if let Some(trigger_price) = order.trigger_price() {
654                if trigger_price.precision != self.instrument.price_precision() {
655                    self.generate_order_rejected(
656                        order,
657                        format!(
658                            "Invalid order trigger price precision for order {}, was {} when {} price precision is {}",
659                            order.client_order_id(),
660                            trigger_price.precision,
661                            self.instrument.id(),
662                            self.instrument.price_precision()
663                        )
664                            .into(),
665                    );
666                    return;
667                }
668            }
669
670            // Get position if exists
671            let position: Option<&Position> = cache_borrow
672                .position_for_order(&order.client_order_id())
673                .or_else(|| {
674                    if self.oms_type == OmsType::Netting {
675                        let position_id = PositionId::new(
676                            format!("{}-{}", order.instrument_id(), order.strategy_id()).as_str(),
677                        );
678                        cache_borrow.position(&position_id)
679                    } else {
680                        None
681                    }
682                });
683
684            // Check not shorting an equity without a MARGIN account
685            if order.order_side() == OrderSide::Sell
686                && self.account_type != AccountType::Margin
687                && matches!(self.instrument, InstrumentAny::Equity(_))
688                && (position.is_none()
689                    || !order.would_reduce_only(position.unwrap().side, position.unwrap().quantity))
690            {
691                let position_string = position.map_or("None".to_string(), |pos| pos.id.to_string());
692                self.generate_order_rejected(
693                    order,
694                    format!(
695                        "Short selling not permitted on a CASH account with position {position_string} and order {order}",
696                    )
697                        .into(),
698                );
699                return;
700            }
701
702            // Check reduce-only instruction
703            if self.config.use_reduce_only
704                && order.is_reduce_only()
705                && !order.is_closed()
706                && position.is_none_or(|pos| {
707                    pos.is_closed()
708                        || (order.is_buy() && pos.is_long())
709                        || (order.is_sell() && pos.is_short())
710                })
711            {
712                self.generate_order_rejected(
713                    order,
714                    format!(
715                        "Reduce-only order {} ({}-{}) would have increased position",
716                        order.client_order_id(),
717                        order.order_type().to_string().to_uppercase(),
718                        order.order_side().to_string().to_uppercase()
719                    )
720                    .into(),
721                );
722                return;
723            }
724        }
725
726        match order.order_type() {
727            OrderType::Market => self.process_market_order(order),
728            OrderType::Limit => self.process_limit_order(order),
729            OrderType::MarketToLimit => self.process_market_to_limit_order(order),
730            OrderType::StopMarket => self.process_stop_market_order(order),
731            OrderType::StopLimit => self.process_stop_limit_order(order),
732            OrderType::MarketIfTouched => self.process_market_if_touched_order(order),
733            OrderType::LimitIfTouched => self.process_limit_if_touched_order(order),
734            OrderType::TrailingStopMarket => self.process_trailing_stop_order(order),
735            OrderType::TrailingStopLimit => self.process_trailing_stop_order(order),
736        }
737    }
738
739    pub fn process_modify(&mut self, command: &ModifyOrder, account_id: AccountId) {
740        if let Some(order) = self.core.get_order(command.client_order_id) {
741            self.update_order(
742                &mut order.to_any(),
743                command.quantity,
744                command.price,
745                command.trigger_price,
746                None,
747            );
748        } else {
749            self.generate_order_modify_rejected(
750                command.trader_id,
751                command.strategy_id,
752                command.instrument_id,
753                command.client_order_id,
754                Ustr::from(format!("Order {} not found", command.client_order_id).as_str()),
755                Some(command.venue_order_id),
756                Some(account_id),
757            );
758        }
759    }
760
761    pub fn process_cancel(&mut self, command: &CancelOrder, account_id: AccountId) {
762        match self.core.get_order(command.client_order_id) {
763            Some(passive_order) => {
764                if passive_order.is_inflight() || passive_order.is_open() {
765                    self.cancel_order(&OrderAny::from(passive_order.to_owned()), None);
766                }
767            }
768            None => self.generate_order_cancel_rejected(
769                command.trader_id,
770                command.strategy_id,
771                account_id,
772                command.instrument_id,
773                command.client_order_id,
774                command.venue_order_id,
775                Ustr::from(format!("Order {} not found", command.client_order_id).as_str()),
776            ),
777        }
778    }
779
780    pub fn process_cancel_all(&mut self, command: &CancelAllOrders, account_id: AccountId) {
781        let open_orders = self
782            .cache
783            .borrow()
784            .orders_open(None, Some(&command.instrument_id), None, None)
785            .into_iter()
786            .cloned()
787            .collect::<Vec<OrderAny>>();
788        for order in open_orders {
789            if command.order_side != OrderSide::NoOrderSide
790                && command.order_side != order.order_side()
791            {
792                continue;
793            }
794            if order.is_inflight() || order.is_open() {
795                self.cancel_order(&order, None);
796            }
797        }
798    }
799
800    pub fn process_batch_cancel(&mut self, command: &BatchCancelOrders, account_id: AccountId) {
801        for order in &command.cancels {
802            self.process_cancel(order, account_id);
803        }
804    }
805
806    fn process_market_order(&mut self, order: &mut OrderAny) {
807        if order.time_in_force() == TimeInForce::AtTheOpen
808            || order.time_in_force() == TimeInForce::AtTheClose
809        {
810            log::error!(
811                "Market auction for the time in force {} is currently not supported",
812                order.time_in_force()
813            );
814            return;
815        }
816
817        // Check if market exists
818        let order_side = order.order_side();
819        let is_ask_initialized = self.core.is_ask_initialized;
820        let is_bid_initialized = self.core.is_bid_initialized;
821        if (order.order_side() == OrderSide::Buy && !self.core.is_ask_initialized)
822            || (order.order_side() == OrderSide::Sell && !self.core.is_bid_initialized)
823        {
824            self.generate_order_rejected(
825                order,
826                format!("No market for {}", order.instrument_id()).into(),
827            );
828            return;
829        }
830
831        self.fill_market_order(order);
832    }
833
834    fn process_limit_order(&mut self, order: &mut OrderAny) {
835        let limit_px = order.price().expect("Limit order must have a price");
836        if order.is_post_only()
837            && self
838                .core
839                .is_limit_matched(order.order_side_specified(), limit_px)
840        {
841            self.generate_order_rejected(
842                order,
843                format!(
844                    "POST_ONLY {} {} order limit px of {} would have been a TAKER: bid={}, ask={}",
845                    order.order_type(),
846                    order.order_side(),
847                    order.price().unwrap(),
848                    self.core
849                        .bid
850                        .map_or_else(|| "None".to_string(), |p| p.to_string()),
851                    self.core
852                        .ask
853                        .map_or_else(|| "None".to_string(), |p| p.to_string())
854                )
855                .into(),
856            );
857            return;
858        }
859
860        // Order is valid and accepted
861        self.accept_order(order);
862
863        // Check for immediate fill
864        if self
865            .core
866            .is_limit_matched(order.order_side_specified(), limit_px)
867        {
868            // Filling as liquidity taker
869            if order.liquidity_side().is_some()
870                && order.liquidity_side().unwrap() == LiquiditySide::NoLiquiditySide
871            {
872                order.set_liquidity_side(LiquiditySide::Taker);
873            }
874            self.fill_limit_order(order);
875        } else if matches!(order.time_in_force(), TimeInForce::Fok | TimeInForce::Ioc) {
876            self.cancel_order(order, None);
877        }
878    }
879
880    fn process_market_to_limit_order(&mut self, order: &mut OrderAny) {
881        // Check that market exists
882        if (order.order_side() == OrderSide::Buy && !self.core.is_ask_initialized)
883            || (order.order_side() == OrderSide::Sell && !self.core.is_bid_initialized)
884        {
885            self.generate_order_rejected(
886                order,
887                format!("No market for {}", order.instrument_id()).into(),
888            );
889            return;
890        }
891
892        // Immediately fill marketable order
893        self.fill_market_order(order);
894
895        if order.is_open() {
896            self.accept_order(order);
897        }
898    }
899
900    fn process_stop_market_order(&mut self, order: &mut OrderAny) {
901        let stop_px = order
902            .trigger_price()
903            .expect("Stop order must have a trigger price");
904        if self
905            .core
906            .is_stop_matched(order.order_side_specified(), stop_px)
907        {
908            if self.config.reject_stop_orders {
909                self.generate_order_rejected(
910                    order,
911                    format!(
912                        "{} {} order stop px of {} was in the market: bid={}, ask={}, but rejected because of configuration",
913                        order.order_type(),
914                        order.order_side(),
915                        order.trigger_price().unwrap(),
916                        self.core
917                            .bid
918                            .map_or_else(|| "None".to_string(), |p| p.to_string()),
919                        self.core
920                            .ask
921                            .map_or_else(|| "None".to_string(), |p| p.to_string())
922                    ).into(),
923                );
924                return;
925            }
926            self.fill_market_order(order);
927            return;
928        }
929
930        // order is not matched but is valid and we accept it
931        self.accept_order(order);
932    }
933
934    fn process_stop_limit_order(&mut self, order: &mut OrderAny) {
935        let stop_px = order
936            .trigger_price()
937            .expect("Stop order must have a trigger price");
938        if self
939            .core
940            .is_stop_matched(order.order_side_specified(), stop_px)
941        {
942            if self.config.reject_stop_orders {
943                self.generate_order_rejected(
944                    order,
945                    format!(
946                        "{} {} order stop px of {} was in the market: bid={}, ask={}, but rejected because of configuration",
947                        order.order_type(),
948                        order.order_side(),
949                        order.trigger_price().unwrap(),
950                        self.core
951                            .bid
952                            .map_or_else(|| "None".to_string(), |p| p.to_string()),
953                        self.core
954                            .ask
955                            .map_or_else(|| "None".to_string(), |p| p.to_string())
956                    ).into(),
957                );
958                return;
959            }
960
961            self.accept_order(order);
962            self.generate_order_triggered(order);
963
964            // Check for immediate fill
965            let limit_px = order.price().expect("Stop limit order must have a price");
966            if self
967                .core
968                .is_limit_matched(order.order_side_specified(), limit_px)
969            {
970                order.set_liquidity_side(LiquiditySide::Taker);
971                self.fill_limit_order(order);
972            }
973        }
974
975        // order is not matched but is valid and we accept it
976        self.accept_order(order);
977    }
978
979    fn process_market_if_touched_order(&mut self, order: &mut OrderAny) {
980        if self
981            .core
982            .is_touch_triggered(order.order_side_specified(), order.trigger_price().unwrap())
983        {
984            if self.config.reject_stop_orders {
985                self.generate_order_rejected(
986                    order,
987                    format!(
988                        "{} {} order trigger px of {} was in the market: bid={}, ask={}, but rejected because of configuration",
989                        order.order_type(),
990                        order.order_side(),
991                        order.trigger_price().unwrap(),
992                        self.core
993                            .bid
994                            .map_or_else(|| "None".to_string(), |p| p.to_string()),
995                        self.core
996                            .ask
997                            .map_or_else(|| "None".to_string(), |p| p.to_string())
998                    ).into(),
999                );
1000                return;
1001            }
1002            self.fill_market_order(order);
1003            return;
1004        }
1005
1006        // Order is valid and accepted
1007        self.accept_order(order);
1008    }
1009
1010    fn process_limit_if_touched_order(&mut self, order: &mut OrderAny) {
1011        if self
1012            .core
1013            .is_touch_triggered(order.order_side_specified(), order.trigger_price().unwrap())
1014        {
1015            if self.config.reject_stop_orders {
1016                self.generate_order_rejected(
1017                    order,
1018                    format!(
1019                        "{} {} order trigger px of {} was in the market: bid={}, ask={}, but rejected because of configuration",
1020                        order.order_type(),
1021                        order.order_side(),
1022                        order.trigger_price().unwrap(),
1023                        self.core
1024                            .bid
1025                            .map_or_else(|| "None".to_string(), |p| p.to_string()),
1026                        self.core
1027                            .ask
1028                            .map_or_else(|| "None".to_string(), |p| p.to_string())
1029                    ).into(),
1030                );
1031                return;
1032            }
1033            self.accept_order(order);
1034            self.generate_order_triggered(order);
1035
1036            // Check if immediate marketable
1037            if self
1038                .core
1039                .is_limit_matched(order.order_side_specified(), order.price().unwrap())
1040            {
1041                order.set_liquidity_side(LiquiditySide::Taker);
1042                self.fill_limit_order(order);
1043            }
1044            return;
1045        }
1046
1047        // Order is valid and accepted
1048        self.accept_order(order);
1049    }
1050
1051    fn process_trailing_stop_order(&mut self, order: &mut OrderAny) {
1052        if let Some(trigger_price) = order.trigger_price() {
1053            if self
1054                .core
1055                .is_stop_matched(order.order_side_specified(), trigger_price)
1056            {
1057                self.generate_order_rejected(
1058                    order,
1059                    format!(
1060                        "{} {} order trigger px of {} was in the market: bid={}, ask={}, but rejected because of configuration",
1061                        order.order_type(),
1062                        order.order_side(),
1063                        trigger_price,
1064                        self.core
1065                            .bid
1066                            .map_or_else(|| "None".to_string(), |p| p.to_string()),
1067                        self.core
1068                            .ask
1069                            .map_or_else(|| "None".to_string(), |p| p.to_string())
1070                    ).into(),
1071                );
1072                return;
1073            }
1074        }
1075
1076        // Order is valid and accepted
1077        self.accept_order(order);
1078    }
1079
1080    // -- ORDER PROCESSING ----------------------------------------------------
1081
1082    /// Iterate the matching engine by processing the bid and ask order sides
1083    /// and advancing time up to the given UNIX `timestamp_ns`.
1084    ///
1085    /// # Panics
1086    ///
1087    /// Panics if the best bid or ask price is unavailable when iterating.
1088    pub fn iterate(&mut self, timestamp_ns: UnixNanos) {
1089        // TODO implement correct clock fixed time setting self.clock.set_time(ts_now);
1090
1091        // Check for updates in orderbook and set bid and ask in order matching core and iterate
1092        if self.book.has_bid() {
1093            self.core.set_bid_raw(self.book.best_bid_price().unwrap());
1094        }
1095        if self.book.has_ask() {
1096            self.core.set_ask_raw(self.book.best_ask_price().unwrap());
1097        }
1098        self.core.iterate();
1099
1100        self.core.bid = self.book.best_bid_price();
1101        self.core.ask = self.book.best_ask_price();
1102
1103        let orders_bid = self.core.get_orders_bid().to_vec();
1104        let orders_ask = self.core.get_orders_ask().to_vec();
1105
1106        self.iterate_orders(timestamp_ns, &orders_bid);
1107        self.iterate_orders(timestamp_ns, &orders_ask);
1108    }
1109
1110    fn iterate_orders(&mut self, timestamp_ns: UnixNanos, orders: &[PassiveOrderAny]) {
1111        for order in orders {
1112            if order.is_closed() {
1113                continue;
1114            }
1115
1116            // Check expiration
1117            if self.config.support_gtd_orders {
1118                if let Some(expire_time) = order.expire_time() {
1119                    if timestamp_ns >= expire_time {
1120                        // SAFTEY: We know this order is in the core
1121                        self.core.delete_order(order).unwrap();
1122                        self.cached_filled_qty.remove(&order.client_order_id());
1123                        self.expire_order(order);
1124                    }
1125                }
1126            }
1127
1128            // Manage trailing stop
1129            if let PassiveOrderAny::Stop(o) = order {
1130                if let PassiveOrderAny::Stop(
1131                    StopOrderAny::TrailingStopMarket(_) | StopOrderAny::TrailingStopLimit(_),
1132                ) = order
1133                {
1134                    let mut order = OrderAny::from(o.to_owned());
1135                    self.update_trailing_stop_order(&mut order);
1136                }
1137            }
1138
1139            // Move market back to targets
1140            if let Some(target_bid) = self.target_bid {
1141                self.core.bid = Some(target_bid);
1142                self.target_bid = None;
1143            }
1144            if let Some(target_ask) = self.target_ask {
1145                self.core.ask = Some(target_ask);
1146                self.target_ask = None;
1147            }
1148            if let Some(target_last) = self.target_last {
1149                self.core.last = Some(target_last);
1150                self.target_last = None;
1151            }
1152        }
1153
1154        // Reset any targets after iteration
1155        self.target_bid = None;
1156        self.target_ask = None;
1157        self.target_last = None;
1158    }
1159
    /// Simulates the fills a limit-priced order would receive from the book.
    ///
    /// A `BookOrder` is built from the order's side, price and quantity and run through
    /// `self.book.simulate_fills`. The resulting `(price, quantity)` pairs are returned,
    /// possibly with the first fill re-priced to the order's trigger price (see below).
    ///
    /// Side effects: on the TAKER-from-trigger and MAKER paths this method stores the
    /// core's current bid/ask/last in `target_bid` / `target_ask` / `target_last` and
    /// then overwrites the core's bid or ask and last price. Nothing in this method
    /// restores them; the stored targets are left for other code to consume.
    ///
    /// # Panics
    ///
    /// Panics if the order has no price.
    fn determine_limit_price_and_volume(&mut self, order: &OrderAny) -> Vec<(Price, Quantity)> {
        match order.price() {
            Some(order_price) => {
                // construct book order with price as passive with limit order price
                let book_order =
                    BookOrder::new(order.order_side(), order_price, order.quantity(), 1);

                let mut fills = self.book.simulate_fills(&book_order);

                // return immediately if no fills
                if fills.is_empty() {
                    return fills;
                }

                // check if trigger price exists
                if let Some(triggered_price) = order.trigger_price() {
                    // Filling as TAKER from trigger
                    if order
                        .liquidity_side()
                        .is_some_and(|liquidity_side| liquidity_side == LiquiditySide::Taker)
                    {
                        if order.order_side() == OrderSide::Sell && order_price > triggered_price {
                            // manually change the fills index 0
                            // (first fill is re-priced to the trigger price, quantity kept)
                            let first_fill = fills.first().unwrap();
                            let triggered_qty = first_fill.1;
                            fills[0] = (triggered_price, triggered_qty);
                            // Remember the current market before moving ask/last to the limit price
                            self.target_bid = self.core.bid;
                            self.target_ask = self.core.ask;
                            self.target_last = self.core.last;
                            self.core.set_ask_raw(order_price);
                            self.core.set_last_raw(order_price);
                        } else if order.order_side() == OrderSide::Buy
                            && order_price < triggered_price
                        {
                            // manually change the fills index 0
                            // (first fill is re-priced to the trigger price, quantity kept)
                            let first_fill = fills.first().unwrap();
                            let triggered_qty = first_fill.1;
                            fills[0] = (triggered_price, triggered_qty);
                            // Remember the current market before moving bid/last to the limit price
                            self.target_bid = self.core.bid;
                            self.target_ask = self.core.ask;
                            self.target_last = self.core.last;
                            self.core.set_bid_raw(order_price);
                            self.core.set_last_raw(order_price);
                        }
                    }
                }

                // Filling as MAKER from trigger
                if order
                    .liquidity_side()
                    .is_some_and(|liquidity_side| liquidity_side == LiquiditySide::Maker)
                {
                    match order.order_side().as_specified() {
                        OrderSideSpecified::Buy => {
                            // Use the trigger price when it is below the limit price,
                            // otherwise the limit price itself
                            let target_price = if order
                                .trigger_price()
                                .is_some_and(|trigger_price| order_price > trigger_price)
                            {
                                order.trigger_price().unwrap()
                            } else {
                                order_price
                            };
                            for fill in &fills {
                                let last_px = fill.0;
                                if last_px < order_price {
                                    // Marketable SELL would have filled at limit
                                    self.target_bid = self.core.bid;
                                    self.target_ask = self.core.ask;
                                    self.target_last = self.core.last;
                                    self.core.set_ask_raw(target_price);
                                    self.core.set_last_raw(target_price);
                                }
                            }
                        }
                        OrderSideSpecified::Sell => {
                            // Use the trigger price when it is above the limit price,
                            // otherwise the limit price itself
                            let target_price = if order
                                .trigger_price()
                                .is_some_and(|trigger_price| order_price < trigger_price)
                            {
                                order.trigger_price().unwrap()
                            } else {
                                order_price
                            };
                            for fill in &fills {
                                let last_px = fill.0;
                                if last_px > order_price {
                                    // Marketable BUY would have filled at limit
                                    self.target_bid = self.core.bid;
                                    self.target_ask = self.core.ask;
                                    self.target_last = self.core.last;
                                    self.core.set_bid_raw(target_price);
                                    self.core.set_last_raw(target_price);
                                }
                            }
                        }
                    }
                }

                fills
            }
            None => panic!("Limit order must have a price"),
        }
    }
1263
1264    fn determine_market_price_and_volume(&self, order: &OrderAny) -> Vec<(Price, Quantity)> {
1265        // construct price
1266        let price = match order.order_side().as_specified() {
1267            OrderSideSpecified::Buy => Price::max(FIXED_PRECISION),
1268            OrderSideSpecified::Sell => Price::min(FIXED_PRECISION),
1269        };
1270
1271        // Construct BookOrder from order
1272        let book_order = BookOrder::new(order.order_side(), price, order.quantity(), 0);
1273        self.book.simulate_fills(&book_order)
1274    }
1275
1276    pub fn fill_market_order(&mut self, order: &mut OrderAny) {
1277        if let Some(filled_qty) = self.cached_filled_qty.get(&order.client_order_id()) {
1278            if filled_qty >= &order.quantity() {
1279                log::info!(
1280                    "Ignoring fill as already filled pending application of events: {:?}, {:?}, {:?}, {:?}",
1281                    filled_qty,
1282                    order.quantity(),
1283                    order.filled_qty(),
1284                    order.quantity()
1285                );
1286                return;
1287            }
1288        }
1289
1290        let venue_position_id = self.ids_generator.get_position_id(order, Some(true));
1291        let position: Option<Position> = if let Some(venue_position_id) = venue_position_id {
1292            let cache = self.cache.as_ref().borrow();
1293            cache.position(&venue_position_id).cloned()
1294        } else {
1295            None
1296        };
1297
1298        if self.config.use_reduce_only && order.is_reduce_only() && position.is_none() {
1299            log::warn!(
1300                "Canceling REDUCE_ONLY {} as would increase position",
1301                order.order_type()
1302            );
1303            self.cancel_order(order, None);
1304            return;
1305        }
1306        // set order side as taker
1307        order.set_liquidity_side(LiquiditySide::Taker);
1308        let fills = self.determine_market_price_and_volume(order);
1309        self.apply_fills(order, fills, LiquiditySide::Taker, None, position);
1310    }
1311
1312    /// # Panics
1313    ///
1314    /// Panics if the order has no price, or if fill price or quantity precision mismatches occur.
1315    pub fn fill_limit_order(&mut self, order: &mut OrderAny) {
1316        match order.price() {
1317            Some(order_price) => {
1318                let cached_filled_qty = self.cached_filled_qty.get(&order.client_order_id());
1319                if cached_filled_qty.is_some() && *cached_filled_qty.unwrap() >= order.quantity() {
1320                    log::debug!(
1321                        "Ignoring fill as already filled pending pending application of events: {}, {}, {}, {}",
1322                        cached_filled_qty.unwrap(),
1323                        order.quantity(),
1324                        order.filled_qty(),
1325                        order.leaves_qty(),
1326                    );
1327                    return;
1328                }
1329
1330                if order
1331                    .liquidity_side()
1332                    .is_some_and(|liquidity_side| liquidity_side == LiquiditySide::Maker)
1333                {
1334                    if order.order_side() == OrderSide::Buy
1335                        && self.core.bid.is_some_and(|bid| bid == order_price)
1336                        && !self.fill_model.is_limit_filled()
1337                    {
1338                        // no filled
1339                        return;
1340                    }
1341                    if order.order_side() == OrderSide::Sell
1342                        && self.core.ask.is_some_and(|ask| ask == order_price)
1343                        && !self.fill_model.is_limit_filled()
1344                    {
1345                        // no filled
1346                        return;
1347                    }
1348                }
1349
1350                let venue_position_id = self.ids_generator.get_position_id(order, None);
1351                let position = if let Some(venue_position_id) = venue_position_id {
1352                    let cache = self.cache.as_ref().borrow();
1353                    cache.position(&venue_position_id).cloned()
1354                } else {
1355                    None
1356                };
1357
1358                if self.config.use_reduce_only && order.is_reduce_only() && position.is_none() {
1359                    log::warn!(
1360                        "Canceling REDUCE_ONLY {} as would increase position",
1361                        order.order_type()
1362                    );
1363                    self.cancel_order(order, None);
1364                    return;
1365                }
1366
1367                let fills = self.determine_limit_price_and_volume(order);
1368
1369                self.apply_fills(
1370                    order,
1371                    fills,
1372                    order.liquidity_side().unwrap(),
1373                    venue_position_id,
1374                    position,
1375                );
1376            }
1377            None => panic!("Limit order must have a price"),
1378        }
1379    }
1380
1381    fn apply_fills(
1382        &mut self,
1383        order: &mut OrderAny,
1384        fills: Vec<(Price, Quantity)>,
1385        liquidity_side: LiquiditySide,
1386        venue_position_id: Option<PositionId>,
1387        position: Option<Position>,
1388    ) {
1389        if order.time_in_force() == TimeInForce::Fok {
1390            let mut total_size = Quantity::zero(order.quantity().precision);
1391            for (fill_px, fill_qty) in &fills {
1392                total_size = total_size.add(*fill_qty);
1393            }
1394
1395            if order.leaves_qty() > total_size {
1396                self.cancel_order(order, None);
1397                return;
1398            }
1399        }
1400
1401        if fills.is_empty() {
1402            if order.status() == OrderStatus::Submitted {
1403                self.generate_order_rejected(
1404                    order,
1405                    format!("No market for {}", order.instrument_id()).into(),
1406                );
1407            } else {
1408                log::error!(
1409                    "Cannot fill order: no fills from book when fills were expected (check size in data)"
1410                );
1411                return;
1412            }
1413        }
1414
1415        if self.oms_type == OmsType::Netting {
1416            let venue_position_id: Option<PositionId> = None;
1417        }
1418
1419        let mut initial_market_to_limit_fill = false;
1420        for &(mut fill_px, ref fill_qty) in &fills {
1421            // Validate price precision
1422            assert!(
1423                (fill_px.precision == self.instrument.price_precision()),
1424                "Invalid price precision for fill price {} when instrument price precision is {}.\
1425                     Check that the data price precision matches the {} instrument",
1426                fill_px.precision,
1427                self.instrument.price_precision(),
1428                self.instrument.id()
1429            );
1430
1431            // Validate quantity precision
1432            assert!(
1433                (fill_qty.precision == self.instrument.size_precision()),
1434                "Invalid quantity precision for fill quantity {} when instrument size precision is {}.\
1435                     Check that the data quantity precision matches the {} instrument",
1436                fill_qty.precision,
1437                self.instrument.size_precision(),
1438                self.instrument.id()
1439            );
1440
1441            if order.filled_qty() == Quantity::zero(order.filled_qty().precision)
1442                && order.order_type() == OrderType::MarketToLimit
1443            {
1444                self.generate_order_updated(order, order.quantity(), Some(fill_px), None);
1445                initial_market_to_limit_fill = true;
1446            }
1447
1448            if self.book_type == BookType::L1_MBP && self.fill_model.is_slipped() {
1449                fill_px = match order.order_side().as_specified() {
1450                    OrderSideSpecified::Buy => fill_px.add(self.instrument.price_increment()),
1451                    OrderSideSpecified::Sell => fill_px.sub(self.instrument.price_increment()),
1452                }
1453            }
1454
1455            // Check reduce only order
1456            if self.config.use_reduce_only && order.is_reduce_only() {
1457                if let Some(position) = &position {
1458                    if *fill_qty > position.quantity {
1459                        if position.quantity == Quantity::zero(position.quantity.precision) {
1460                            // Done
1461                            return;
1462                        }
1463
1464                        // Adjust fill to honor reduce only execution (fill remaining position size only)
1465                        let adjusted_fill_qty =
1466                            Quantity::from_raw(position.quantity.raw, fill_qty.precision);
1467
1468                        self.generate_order_updated(order, adjusted_fill_qty, None, None);
1469                    }
1470                }
1471            }
1472
1473            if fill_qty.is_zero() {
1474                if fills.len() == 1 && order.status() == OrderStatus::Submitted {
1475                    self.generate_order_rejected(
1476                        order,
1477                        format!("No market for {}", order.instrument_id()).into(),
1478                    );
1479                }
1480                return;
1481            }
1482
1483            self.fill_order(
1484                order,
1485                fill_px,
1486                *fill_qty,
1487                liquidity_side,
1488                venue_position_id,
1489                position.clone(),
1490            );
1491
1492            if order.order_type() == OrderType::MarketToLimit && initial_market_to_limit_fill {
1493                // filled initial level
1494                return;
1495            }
1496        }
1497
1498        if order.time_in_force() == TimeInForce::Ioc && order.is_open() {
1499            // IOC order has filled all available size
1500            self.cancel_order(order, None);
1501            return;
1502        }
1503
1504        if order.is_open()
1505            && self.book_type == BookType::L1_MBP
1506            && matches!(
1507                order.order_type(),
1508                OrderType::Market
1509                    | OrderType::MarketIfTouched
1510                    | OrderType::StopMarket
1511                    | OrderType::TrailingStopMarket
1512            )
1513        {
1514            // Exhausted simulated book volume (continue aggressive filling into next level)
1515            // This is a very basic implementation of slipping by a single tick, in the future
1516            // we will implement more detailed fill modeling.
1517            todo!("Exhausted simulated book volume")
1518        }
1519    }
1520
1521    fn fill_order(
1522        &mut self,
1523        order: &mut OrderAny,
1524        last_px: Price,
1525        last_qty: Quantity,
1526        liquidity_side: LiquiditySide,
1527        venue_position_id: Option<PositionId>,
1528        position: Option<Position>,
1529    ) {
1530        match self.cached_filled_qty.get(&order.client_order_id()) {
1531            Some(filled_qty) => {
1532                let leaves_qty = order.quantity() - *filled_qty;
1533                let last_qty = min(last_qty, leaves_qty);
1534                let new_filled_qty = *filled_qty + last_qty;
1535                // update cached filled qty
1536                self.cached_filled_qty
1537                    .insert(order.client_order_id(), new_filled_qty);
1538            }
1539            None => {
1540                self.cached_filled_qty
1541                    .insert(order.client_order_id(), last_qty);
1542            }
1543        }
1544
1545        // calculate commission
1546        let commission = self
1547            .fee_model
1548            .get_commission(order, last_qty, last_px, &self.instrument)
1549            .unwrap();
1550
1551        let venue_order_id = self.ids_generator.get_venue_order_id(order).unwrap();
1552        self.generate_order_filled(
1553            order,
1554            venue_order_id,
1555            venue_position_id,
1556            last_qty,
1557            last_px,
1558            self.instrument.quote_currency(),
1559            commission,
1560            liquidity_side,
1561        );
1562
1563        if order.is_passive() && order.is_closed() {
1564            // Check if order exists in OrderMatching core, and delete it if it does
1565            if self.core.order_exists(order.client_order_id()) {
1566                let _ = self
1567                    .core
1568                    .delete_order(&PassiveOrderAny::from(order.clone()));
1569            }
1570            self.cached_filled_qty.remove(&order.client_order_id());
1571        }
1572
1573        if !self.config.support_contingent_orders {
1574            return;
1575        }
1576
1577        if let Some(contingency_type) = order.contingency_type() {
1578            match contingency_type {
1579                ContingencyType::Oto => {
1580                    if let Some(linked_orders_ids) = order.linked_order_ids() {
1581                        for client_order_id in linked_orders_ids {
1582                            let mut child_order = match self.cache.borrow().order(client_order_id) {
1583                                Some(child_order) => child_order.clone(),
1584                                None => panic!("Order {client_order_id} not found in cache"),
1585                            };
1586
1587                            if child_order.is_closed() || child_order.is_active_local() {
1588                                continue;
1589                            }
1590
1591                            // Check if we need to index position id
1592                            if let (None, Some(position_id)) =
1593                                (child_order.position_id(), order.position_id())
1594                            {
1595                                self.cache
1596                                    .borrow_mut()
1597                                    .add_position_id(
1598                                        &position_id,
1599                                        &self.venue,
1600                                        client_order_id,
1601                                        &child_order.strategy_id(),
1602                                    )
1603                                    .unwrap();
1604                                log::debug!(
1605                                    "Added position id {position_id} to cache for order {client_order_id}"
1606                                );
1607                            }
1608
1609                            if (!child_order.is_open())
1610                                || (matches!(child_order.status(), OrderStatus::PendingUpdate)
1611                                    && child_order
1612                                        .previous_status()
1613                                        .is_some_and(|s| matches!(s, OrderStatus::Submitted)))
1614                            {
1615                                let account_id = order.account_id().unwrap_or_else(|| {
1616                                    *self.account_ids.get(&order.trader_id()).unwrap_or_else(|| {
1617                                        panic!(
1618                                            "Account ID not found for trader {}",
1619                                            order.trader_id()
1620                                        )
1621                                    })
1622                                });
1623                                self.process_order(&mut child_order, account_id);
1624                            }
1625                        }
1626                    } else {
1627                        log::error!(
1628                            "OTO order {} does not have linked orders",
1629                            order.client_order_id()
1630                        );
1631                    }
1632                }
1633                ContingencyType::Oco => {
1634                    if let Some(linked_orders_ids) = order.linked_order_ids() {
1635                        for client_order_id in linked_orders_ids {
1636                            let child_order = match self.cache.borrow().order(client_order_id) {
1637                                Some(child_order) => child_order.clone(),
1638                                None => panic!("Order {client_order_id} not found in cache"),
1639                            };
1640
1641                            if child_order.is_closed() || child_order.is_active_local() {
1642                                continue;
1643                            }
1644
1645                            self.cancel_order(&child_order, None);
1646                        }
1647                    } else {
1648                        log::error!(
1649                            "OCO order {} does not have linked orders",
1650                            order.client_order_id()
1651                        );
1652                    }
1653                }
1654                ContingencyType::Ouo => {
1655                    if let Some(linked_orders_ids) = order.linked_order_ids() {
1656                        for client_order_id in linked_orders_ids {
1657                            let mut child_order = match self.cache.borrow().order(client_order_id) {
1658                                Some(child_order) => child_order.clone(),
1659                                None => panic!("Order {client_order_id} not found in cache"),
1660                            };
1661
1662                            if child_order.is_active_local() {
1663                                continue;
1664                            }
1665
1666                            if order.is_closed() && child_order.is_open() {
1667                                self.cancel_order(&child_order, None);
1668                            } else if !order.leaves_qty().is_zero()
1669                                && order.leaves_qty() != child_order.leaves_qty()
1670                            {
1671                                let price = child_order.price();
1672                                let trigger_price = child_order.trigger_price();
1673                                self.update_order(
1674                                    &mut child_order,
1675                                    Some(order.leaves_qty()),
1676                                    price,
1677                                    trigger_price,
1678                                    Some(false),
1679                                );
1680                            }
1681                        }
1682                    } else {
1683                        log::error!(
1684                            "OUO order {} does not have linked orders",
1685                            order.client_order_id()
1686                        );
1687                    }
1688                }
1689                _ => {}
1690            }
1691        }
1692    }
1693
1694    fn update_limit_order(&mut self, order: &mut OrderAny, quantity: Quantity, price: Price) {
1695        if self
1696            .core
1697            .is_limit_matched(order.order_side_specified(), price)
1698        {
1699            if order.is_post_only() {
1700                self.generate_order_modify_rejected(
1701                    order.trader_id(),
1702                    order.strategy_id(),
1703                    order.instrument_id(),
1704                    order.client_order_id(),
1705                    Ustr::from(format!(
1706                        "POST_ONLY {} {} order with new limit px of {} would have been a TAKER: bid={}, ask={}",
1707                        order.order_type(),
1708                        order.order_side(),
1709                        price,
1710                        self.core.bid.map_or_else(|| "None".to_string(), |p| p.to_string()),
1711                        self.core.ask.map_or_else(|| "None".to_string(), |p| p.to_string())
1712                    ).as_str()),
1713                    order.venue_order_id(),
1714                    order.account_id(),
1715                );
1716                return;
1717            }
1718
1719            self.generate_order_updated(order, quantity, Some(price), None);
1720            order.set_liquidity_side(LiquiditySide::Taker);
1721            self.fill_limit_order(order);
1722            return;
1723        }
1724        self.generate_order_updated(order, quantity, Some(price), None);
1725    }
1726
1727    fn update_stop_market_order(
1728        &mut self,
1729        order: &mut OrderAny,
1730        quantity: Quantity,
1731        trigger_price: Price,
1732    ) {
1733        if self
1734            .core
1735            .is_stop_matched(order.order_side_specified(), trigger_price)
1736        {
1737            self.generate_order_modify_rejected(
1738                order.trader_id(),
1739                order.strategy_id(),
1740                order.instrument_id(),
1741                order.client_order_id(),
1742                Ustr::from(
1743                    format!(
1744                        "{} {} order new stop px of {} was in the market: bid={}, ask={}",
1745                        order.order_type(),
1746                        order.order_side(),
1747                        trigger_price,
1748                        self.core
1749                            .bid
1750                            .map_or_else(|| "None".to_string(), |p| p.to_string()),
1751                        self.core
1752                            .ask
1753                            .map_or_else(|| "None".to_string(), |p| p.to_string())
1754                    )
1755                    .as_str(),
1756                ),
1757                order.venue_order_id(),
1758                order.account_id(),
1759            );
1760            return;
1761        }
1762
1763        self.generate_order_updated(order, quantity, None, Some(trigger_price));
1764    }
1765
1766    fn update_stop_limit_order(
1767        &mut self,
1768        order: &mut OrderAny,
1769        quantity: Quantity,
1770        price: Price,
1771        trigger_price: Price,
1772    ) {
1773        if order.is_triggered().is_some_and(|t| t) {
1774            // Update limit price
1775            if self
1776                .core
1777                .is_limit_matched(order.order_side_specified(), price)
1778            {
1779                if order.is_post_only() {
1780                    self.generate_order_modify_rejected(
1781                        order.trader_id(),
1782                        order.strategy_id(),
1783                        order.instrument_id(),
1784                        order.client_order_id(),
1785                        Ustr::from(format!(
1786                            "POST_ONLY {} {} order with new limit px of {} would have been a TAKER: bid={}, ask={}",
1787                            order.order_type(),
1788                            order.order_side(),
1789                            price,
1790                            self.core.bid.map_or_else(|| "None".to_string(), |p| p.to_string()),
1791                            self.core.ask.map_or_else(|| "None".to_string(), |p| p.to_string())
1792                        ).as_str()),
1793                        order.venue_order_id(),
1794                        order.account_id(),
1795                    );
1796                    return;
1797                }
1798                self.generate_order_updated(order, quantity, Some(price), None);
1799                order.set_liquidity_side(LiquiditySide::Taker);
1800                self.fill_limit_order(order);
1801                return; // Filled
1802            }
1803        } else {
1804            // Update stop price
1805            if self
1806                .core
1807                .is_stop_matched(order.order_side_specified(), trigger_price)
1808            {
1809                self.generate_order_modify_rejected(
1810                    order.trader_id(),
1811                    order.strategy_id(),
1812                    order.instrument_id(),
1813                    order.client_order_id(),
1814                    Ustr::from(
1815                        format!(
1816                            "{} {} order new stop px of {} was in the market: bid={}, ask={}",
1817                            order.order_type(),
1818                            order.order_side(),
1819                            trigger_price,
1820                            self.core
1821                                .bid
1822                                .map_or_else(|| "None".to_string(), |p| p.to_string()),
1823                            self.core
1824                                .ask
1825                                .map_or_else(|| "None".to_string(), |p| p.to_string())
1826                        )
1827                        .as_str(),
1828                    ),
1829                    order.venue_order_id(),
1830                    order.account_id(),
1831                );
1832                return;
1833            }
1834        }
1835
1836        self.generate_order_updated(order, quantity, Some(price), Some(trigger_price));
1837    }
1838
1839    fn update_market_if_touched_order(
1840        &mut self,
1841        order: &mut OrderAny,
1842        quantity: Quantity,
1843        trigger_price: Price,
1844    ) {
1845        if self
1846            .core
1847            .is_touch_triggered(order.order_side_specified(), trigger_price)
1848        {
1849            self.generate_order_modify_rejected(
1850                order.trader_id(),
1851                order.strategy_id(),
1852                order.instrument_id(),
1853                order.client_order_id(),
1854                Ustr::from(
1855                    format!(
1856                        "{} {} order new trigger px of {} was in the market: bid={}, ask={}",
1857                        order.order_type(),
1858                        order.order_side(),
1859                        trigger_price,
1860                        self.core
1861                            .bid
1862                            .map_or_else(|| "None".to_string(), |p| p.to_string()),
1863                        self.core
1864                            .ask
1865                            .map_or_else(|| "None".to_string(), |p| p.to_string())
1866                    )
1867                    .as_str(),
1868                ),
1869                order.venue_order_id(),
1870                order.account_id(),
1871            );
1872            // Cannot update order
1873            return;
1874        }
1875
1876        self.generate_order_updated(order, quantity, None, Some(trigger_price));
1877    }
1878
1879    fn update_limit_if_touched_order(
1880        &mut self,
1881        order: &mut OrderAny,
1882        quantity: Quantity,
1883        price: Price,
1884        trigger_price: Price,
1885    ) {
1886        if order.is_triggered().is_some_and(|t| t) {
1887            // Update limit price
1888            if self
1889                .core
1890                .is_limit_matched(order.order_side_specified(), price)
1891            {
1892                if order.is_post_only() {
1893                    self.generate_order_modify_rejected(
1894                        order.trader_id(),
1895                        order.strategy_id(),
1896                        order.instrument_id(),
1897                        order.client_order_id(),
1898                        Ustr::from(format!(
1899                            "POST_ONLY {} {} order with new limit px of {} would have been a TAKER: bid={}, ask={}",
1900                            order.order_type(),
1901                            order.order_side(),
1902                            price,
1903                            self.core.bid.map_or_else(|| "None".to_string(), |p| p.to_string()),
1904                            self.core.ask.map_or_else(|| "None".to_string(), |p| p.to_string())
1905                        ).as_str()),
1906                        order.venue_order_id(),
1907                        order.account_id(),
1908                    );
1909                    // Cannot update order
1910                    return;
1911                }
1912                self.generate_order_updated(order, quantity, Some(price), None);
1913                order.set_liquidity_side(LiquiditySide::Taker);
1914                self.fill_limit_order(order);
1915                return;
1916            }
1917        } else {
1918            // Update trigger price
1919            if self
1920                .core
1921                .is_touch_triggered(order.order_side_specified(), trigger_price)
1922            {
1923                self.generate_order_modify_rejected(
1924                    order.trader_id(),
1925                    order.strategy_id(),
1926                    order.instrument_id(),
1927                    order.client_order_id(),
1928                    Ustr::from(
1929                        format!(
1930                            "{} {} order new trigger px of {} was in the market: bid={}, ask={}",
1931                            order.order_type(),
1932                            order.order_side(),
1933                            trigger_price,
1934                            self.core
1935                                .bid
1936                                .map_or_else(|| "None".to_string(), |p| p.to_string()),
1937                            self.core
1938                                .ask
1939                                .map_or_else(|| "None".to_string(), |p| p.to_string())
1940                        )
1941                        .as_str(),
1942                    ),
1943                    order.venue_order_id(),
1944                    order.account_id(),
1945                );
1946                return;
1947            }
1948        }
1949
1950        self.generate_order_updated(order, quantity, Some(price), Some(trigger_price));
1951    }
1952
1953    fn update_trailing_stop_order(&mut self, order: &mut OrderAny) {
1954        let (new_trigger_price, new_price) = trailing_stop_calculate(
1955            self.instrument.price_increment(),
1956            order,
1957            self.core.bid,
1958            self.core.ask,
1959            self.core.last,
1960        )
1961        .unwrap();
1962
1963        if new_trigger_price.is_none() && new_price.is_none() {
1964            return;
1965        }
1966
1967        self.generate_order_updated(order, order.quantity(), new_price, new_trigger_price);
1968    }
1969
1970    // -- EVENT HANDLING -----------------------------------------------------
1971
1972    fn accept_order(&mut self, order: &mut OrderAny) {
1973        if order.is_closed() {
1974            // Temporary guard to prevent invalid processing
1975            return;
1976        }
1977        if order.status() != OrderStatus::Accepted {
1978            let venue_order_id = self.ids_generator.get_venue_order_id(order).unwrap();
1979            self.generate_order_accepted(order, venue_order_id);
1980
1981            if matches!(
1982                order.order_type(),
1983                OrderType::TrailingStopLimit | OrderType::TrailingStopMarket
1984            ) && order.trigger_price().is_none()
1985            {
1986                self.update_trailing_stop_order(order);
1987            }
1988        }
1989
1990        let _ = self.core.add_order(order.to_owned().into());
1991    }
1992
1993    fn expire_order(&mut self, order: &PassiveOrderAny) {
1994        if self.config.support_contingent_orders
1995            && order
1996                .contingency_type()
1997                .is_some_and(|c| c != ContingencyType::NoContingency)
1998        {
1999            self.cancel_contingent_orders(&OrderAny::from(order.clone()));
2000        }
2001
2002        self.generate_order_expired(&order.to_any());
2003    }
2004
2005    fn cancel_order(&mut self, order: &OrderAny, cancel_contingencies: Option<bool>) {
2006        let cancel_contingencies = cancel_contingencies.unwrap_or(true);
2007        if order.is_active_local() {
2008            log::error!(
2009                "Cannot cancel an order with {} from the matching engine",
2010                order.status()
2011            );
2012            return;
2013        }
2014
2015        // Check if order exists in OrderMatching core, and delete it if it does
2016        if self.core.order_exists(order.client_order_id()) {
2017            let _ = self
2018                .core
2019                .delete_order(&PassiveOrderAny::from(order.clone()));
2020        }
2021        self.cached_filled_qty.remove(&order.client_order_id());
2022
2023        let venue_order_id = self.ids_generator.get_venue_order_id(order).unwrap();
2024        self.generate_order_canceled(order, venue_order_id);
2025
2026        if self.config.support_contingent_orders
2027            && order.contingency_type().is_some()
2028            && order.contingency_type().unwrap() != ContingencyType::NoContingency
2029            && cancel_contingencies
2030        {
2031            self.cancel_contingent_orders(order);
2032        }
2033    }
2034
2035    fn update_order(
2036        &mut self,
2037        order: &mut OrderAny,
2038        quantity: Option<Quantity>,
2039        price: Option<Price>,
2040        trigger_price: Option<Price>,
2041        update_contingencies: Option<bool>,
2042    ) {
2043        let update_contingencies = update_contingencies.unwrap_or(true);
2044        let quantity = quantity.unwrap_or(order.quantity());
2045
2046        match order {
2047            OrderAny::Limit(_) | OrderAny::MarketToLimit(_) => {
2048                let price = price.unwrap_or(order.price().unwrap());
2049                self.update_limit_order(order, quantity, price);
2050            }
2051            OrderAny::StopMarket(_) => {
2052                let trigger_price = trigger_price.unwrap_or(order.trigger_price().unwrap());
2053                self.update_stop_market_order(order, quantity, trigger_price);
2054            }
2055            OrderAny::StopLimit(_) => {
2056                let price = price.unwrap_or(order.price().unwrap());
2057                let trigger_price = trigger_price.unwrap_or(order.trigger_price().unwrap());
2058                self.update_stop_limit_order(order, quantity, price, trigger_price);
2059            }
2060            OrderAny::MarketIfTouched(_) => {
2061                let trigger_price = trigger_price.unwrap_or(order.trigger_price().unwrap());
2062                self.update_market_if_touched_order(order, quantity, trigger_price);
2063            }
2064            OrderAny::LimitIfTouched(_) => {
2065                let price = price.unwrap_or(order.price().unwrap());
2066                let trigger_price = trigger_price.unwrap_or(order.trigger_price().unwrap());
2067                self.update_limit_if_touched_order(order, quantity, price, trigger_price);
2068            }
2069            OrderAny::TrailingStopMarket(_) => {
2070                let trigger_price = trigger_price.unwrap_or(order.trigger_price().unwrap());
2071                self.update_market_if_touched_order(order, quantity, trigger_price);
2072            }
2073            OrderAny::TrailingStopLimit(trailing_stop_limit_order) => {
2074                let price = price.unwrap_or(trailing_stop_limit_order.price().unwrap());
2075                let trigger_price =
2076                    trigger_price.unwrap_or(trailing_stop_limit_order.trigger_price().unwrap());
2077                self.update_limit_if_touched_order(order, quantity, price, trigger_price);
2078            }
2079            _ => {
2080                panic!(
2081                    "Unsupported order type {} for update_order",
2082                    order.order_type()
2083                );
2084            }
2085        }
2086
2087        if self.config.support_contingent_orders
2088            && order
2089                .contingency_type()
2090                .is_some_and(|c| c != ContingencyType::NoContingency)
2091            && update_contingencies
2092        {
2093            self.update_contingent_order(order);
2094        }
2095    }
2096
    /// Placeholder for triggering a stop order (per its name); not implemented yet.
    ///
    /// # Panics
    ///
    /// Always panics via `todo!`.
    pub fn trigger_stop_order(&mut self, order: &mut OrderAny) {
        todo!("trigger_stop_order")
    }
2100
2101    fn update_contingent_order(&mut self, order: &OrderAny) {
2102        log::debug!("Updating OUO orders from {}", order.client_order_id());
2103        if let Some(linked_order_ids) = order.linked_order_ids() {
2104            for client_order_id in linked_order_ids {
2105                let mut child_order = match self.cache.borrow().order(client_order_id) {
2106                    Some(order) => order.clone(),
2107                    None => panic!("Order {client_order_id} not found in cache."),
2108                };
2109
2110                if child_order.is_active_local() {
2111                    continue;
2112                }
2113
2114                if order.leaves_qty().is_zero() {
2115                    self.cancel_order(&child_order, None);
2116                } else if child_order.leaves_qty() != order.leaves_qty() {
2117                    let price = child_order.price();
2118                    let trigger_price = child_order.trigger_price();
2119                    self.update_order(
2120                        &mut child_order,
2121                        Some(order.leaves_qty()),
2122                        price,
2123                        trigger_price,
2124                        Some(false),
2125                    );
2126                }
2127            }
2128        }
2129    }
2130
2131    fn cancel_contingent_orders(&mut self, order: &OrderAny) {
2132        if let Some(linked_order_ids) = order.linked_order_ids() {
2133            for client_order_id in linked_order_ids {
2134                let contingent_order = match self.cache.borrow().order(client_order_id) {
2135                    Some(order) => order.clone(),
2136                    None => panic!("Cannot find contingent order for {client_order_id}"),
2137                };
2138                if contingent_order.is_active_local() {
2139                    // order is not on the exchange yet
2140                    continue;
2141                }
2142                if !contingent_order.is_closed() {
2143                    self.cancel_order(&contingent_order, Some(false));
2144                }
2145            }
2146        }
2147    }
2148
2149    // -- EVENT GENERATORS -----------------------------------------------------
2150
2151    fn generate_order_rejected(&self, order: &OrderAny, reason: Ustr) {
2152        let ts_now = self.clock.borrow().timestamp_ns();
2153        let account_id = order
2154            .account_id()
2155            .unwrap_or(self.account_ids.get(&order.trader_id()).unwrap().to_owned());
2156
2157        let event = OrderEventAny::Rejected(OrderRejected::new(
2158            order.trader_id(),
2159            order.strategy_id(),
2160            order.instrument_id(),
2161            order.client_order_id(),
2162            account_id,
2163            reason,
2164            UUID4::new(),
2165            ts_now,
2166            ts_now,
2167            false,
2168        ));
2169        msgbus::send_any("ExecEngine.process".into(), &event as &dyn Any);
2170    }
2171
2172    fn generate_order_accepted(&self, order: &mut OrderAny, venue_order_id: VenueOrderId) {
2173        let ts_now = self.clock.borrow().timestamp_ns();
2174        let account_id = order
2175            .account_id()
2176            .unwrap_or(self.account_ids.get(&order.trader_id()).unwrap().to_owned());
2177        let event = OrderEventAny::Accepted(OrderAccepted::new(
2178            order.trader_id(),
2179            order.strategy_id(),
2180            order.instrument_id(),
2181            order.client_order_id(),
2182            venue_order_id,
2183            account_id,
2184            UUID4::new(),
2185            ts_now,
2186            ts_now,
2187            false,
2188        ));
2189        msgbus::send_any("ExecEngine.process".into(), &event as &dyn Any);
2190
2191        // TODO remove this when execution engine msgbus handlers are correctly set
2192        order.apply(event).expect("Failed to apply order event");
2193    }
2194
2195    #[allow(clippy::too_many_arguments)]
2196    fn generate_order_modify_rejected(
2197        &self,
2198        trader_id: TraderId,
2199        strategy_id: StrategyId,
2200        instrument_id: InstrumentId,
2201        client_order_id: ClientOrderId,
2202        reason: Ustr,
2203        venue_order_id: Option<VenueOrderId>,
2204        account_id: Option<AccountId>,
2205    ) {
2206        let ts_now = self.clock.borrow().timestamp_ns();
2207        let event = OrderEventAny::ModifyRejected(OrderModifyRejected::new(
2208            trader_id,
2209            strategy_id,
2210            instrument_id,
2211            client_order_id,
2212            reason,
2213            UUID4::new(),
2214            ts_now,
2215            ts_now,
2216            false,
2217            venue_order_id,
2218            account_id,
2219        ));
2220        msgbus::send_any("ExecEngine.process".into(), &event as &dyn Any);
2221    }
2222
2223    #[allow(clippy::too_many_arguments)]
2224    fn generate_order_cancel_rejected(
2225        &self,
2226        trader_id: TraderId,
2227        strategy_id: StrategyId,
2228        account_id: AccountId,
2229        instrument_id: InstrumentId,
2230        client_order_id: ClientOrderId,
2231        venue_order_id: VenueOrderId,
2232        reason: Ustr,
2233    ) {
2234        let ts_now = self.clock.borrow().timestamp_ns();
2235        let event = OrderEventAny::CancelRejected(OrderCancelRejected::new(
2236            trader_id,
2237            strategy_id,
2238            instrument_id,
2239            client_order_id,
2240            reason,
2241            UUID4::new(),
2242            ts_now,
2243            ts_now,
2244            false,
2245            Some(venue_order_id),
2246            Some(account_id),
2247        ));
2248        msgbus::send_any("ExecEngine.process".into(), &event as &dyn Any);
2249    }
2250
2251    fn generate_order_updated(
2252        &self,
2253        order: &mut OrderAny,
2254        quantity: Quantity,
2255        price: Option<Price>,
2256        trigger_price: Option<Price>,
2257    ) {
2258        let ts_now = self.clock.borrow().timestamp_ns();
2259        let event = OrderEventAny::Updated(OrderUpdated::new(
2260            order.trader_id(),
2261            order.strategy_id(),
2262            order.instrument_id(),
2263            order.client_order_id(),
2264            quantity,
2265            UUID4::new(),
2266            ts_now,
2267            ts_now,
2268            false,
2269            order.venue_order_id(),
2270            order.account_id(),
2271            price,
2272            trigger_price,
2273        ));
2274        msgbus::send_any("ExecEngine.process".into(), &event as &dyn Any);
2275
2276        // TODO remove this when execution engine msgbus handlers are correctly set
2277        order.apply(event).expect("Failed to apply order event");
2278    }
2279
2280    fn generate_order_canceled(&self, order: &OrderAny, venue_order_id: VenueOrderId) {
2281        let ts_now = self.clock.borrow().timestamp_ns();
2282        let event = OrderEventAny::Canceled(OrderCanceled::new(
2283            order.trader_id(),
2284            order.strategy_id(),
2285            order.instrument_id(),
2286            order.client_order_id(),
2287            UUID4::new(),
2288            ts_now,
2289            ts_now,
2290            false,
2291            Some(venue_order_id),
2292            order.account_id(),
2293        ));
2294        msgbus::send_any("ExecEngine.process".into(), &event as &dyn Any);
2295    }
2296
2297    fn generate_order_triggered(&self, order: &OrderAny) {
2298        let ts_now = self.clock.borrow().timestamp_ns();
2299        let event = OrderEventAny::Triggered(OrderTriggered::new(
2300            order.trader_id(),
2301            order.strategy_id(),
2302            order.instrument_id(),
2303            order.client_order_id(),
2304            UUID4::new(),
2305            ts_now,
2306            ts_now,
2307            false,
2308            order.venue_order_id(),
2309            order.account_id(),
2310        ));
2311        msgbus::send_any("ExecEngine.process".into(), &event as &dyn Any);
2312    }
2313
2314    fn generate_order_expired(&self, order: &OrderAny) {
2315        let ts_now = self.clock.borrow().timestamp_ns();
2316        let event = OrderEventAny::Expired(OrderExpired::new(
2317            order.trader_id(),
2318            order.strategy_id(),
2319            order.instrument_id(),
2320            order.client_order_id(),
2321            UUID4::new(),
2322            ts_now,
2323            ts_now,
2324            false,
2325            order.venue_order_id(),
2326            order.account_id(),
2327        ));
2328        msgbus::send_any("ExecEngine.process".into(), &event as &dyn Any);
2329    }
2330
2331    #[allow(clippy::too_many_arguments)]
2332    fn generate_order_filled(
2333        &mut self,
2334        order: &mut OrderAny,
2335        venue_order_id: VenueOrderId,
2336        venue_position_id: Option<PositionId>,
2337        last_qty: Quantity,
2338        last_px: Price,
2339        quote_currency: Currency,
2340        commission: Money,
2341        liquidity_side: LiquiditySide,
2342    ) {
2343        let ts_now = self.clock.borrow().timestamp_ns();
2344        let account_id = order
2345            .account_id()
2346            .unwrap_or(self.account_ids.get(&order.trader_id()).unwrap().to_owned());
2347        let event = OrderEventAny::Filled(OrderFilled::new(
2348            order.trader_id(),
2349            order.strategy_id(),
2350            order.instrument_id(),
2351            order.client_order_id(),
2352            venue_order_id,
2353            account_id,
2354            self.ids_generator.generate_trade_id(),
2355            order.order_side(),
2356            order.order_type(),
2357            last_qty,
2358            last_px,
2359            quote_currency,
2360            liquidity_side,
2361            UUID4::new(),
2362            ts_now,
2363            ts_now,
2364            false,
2365            venue_position_id,
2366            Some(commission),
2367        ));
2368        msgbus::send_any("ExecEngine.process".into(), &event as &dyn Any);
2369
2370        // TODO remove this when execution engine msgbus handlers are correctly set
2371        order.apply(event).expect("Failed to apply order event");
2372    }
2373}