1pub mod config;
19
20#[cfg(test)]
21mod tests;
22
23use std::{cell::RefCell, collections::HashMap, fmt::Debug, rc::Rc};
24
25use config::RiskEngineConfig;
26use nautilus_common::{
27 cache::Cache,
28 clock::Clock,
29 logging::{CMD, EVT, RECV},
30 messages::execution::{ModifyOrder, SubmitOrder, SubmitOrderList, TradingCommand},
31 msgbus,
32 throttler::Throttler,
33};
34use nautilus_core::UUID4;
35use nautilus_model::{
36 accounts::{Account, AccountAny},
37 enums::{InstrumentClass, OrderSide, OrderStatus, TimeInForce, TradingState},
38 events::{OrderDenied, OrderEventAny, OrderModifyRejected},
39 identifiers::InstrumentId,
40 instruments::{Instrument, InstrumentAny},
41 orders::{Order, OrderAny, OrderList},
42 types::{Currency, Money, Price, Quantity},
43};
44use nautilus_portfolio::Portfolio;
45use rust_decimal::{Decimal, prelude::ToPrimitive};
46use ustr::Ustr;
47
48type SubmitOrderFn = Box<dyn Fn(SubmitOrder)>;
49type ModifyOrderFn = Box<dyn Fn(ModifyOrder)>;
50
51#[allow(dead_code)]
52pub struct RiskEngine {
53 clock: Rc<RefCell<dyn Clock>>,
54 cache: Rc<RefCell<Cache>>,
55 portfolio: Portfolio,
56 pub throttled_submit_order: Throttler<SubmitOrder, SubmitOrderFn>,
57 pub throttled_modify_order: Throttler<ModifyOrder, ModifyOrderFn>,
58 max_notional_per_order: HashMap<InstrumentId, Decimal>,
59 trading_state: TradingState,
60 config: RiskEngineConfig,
61}
62
63impl Debug for RiskEngine {
64 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
65 f.debug_struct(stringify!(RiskEngine)).finish()
66 }
67}
68
69impl RiskEngine {
70 pub fn new(
71 config: RiskEngineConfig,
72 portfolio: Portfolio,
73 clock: Rc<RefCell<dyn Clock>>,
74 cache: Rc<RefCell<Cache>>,
75 ) -> Self {
76 let throttled_submit_order =
77 Self::create_submit_order_throttler(&config, clock.clone(), cache.clone());
78
79 let throttled_modify_order =
80 Self::create_modify_order_throttler(&config, clock.clone(), cache.clone());
81
82 Self {
83 clock,
84 cache,
85 portfolio,
86 throttled_submit_order,
87 throttled_modify_order,
88 max_notional_per_order: HashMap::new(),
89 trading_state: TradingState::Active,
90 config,
91 }
92 }
93
94 fn create_submit_order_throttler(
95 config: &RiskEngineConfig,
96 clock: Rc<RefCell<dyn Clock>>,
97 cache: Rc<RefCell<Cache>>,
98 ) -> Throttler<SubmitOrder, SubmitOrderFn> {
99 let success_handler = {
100 Box::new(move |submit_order: SubmitOrder| {
101 msgbus::send(
102 "ExecEngine.execute".into(),
103 &TradingCommand::SubmitOrder(submit_order),
104 );
105 }) as Box<dyn Fn(SubmitOrder)>
106 };
107
108 let failure_handler = {
109 let cache = cache;
110 let clock = clock.clone();
111 Box::new(move |submit_order: SubmitOrder| {
112 let reason = "REJECTED BY THROTTLER";
113 log::warn!(
114 "SubmitOrder for {} DENIED: {}",
115 submit_order.client_order_id,
116 reason
117 );
118
119 Self::handle_submit_order_cache(&cache, &submit_order);
120
121 let denied = Self::create_order_denied(&submit_order, reason, &clock);
122
123 msgbus::send("ExecEngine.process".into(), &denied);
124 }) as Box<dyn Fn(SubmitOrder)>
125 };
126
127 Throttler::new(
128 config.max_order_submit.limit,
129 config.max_order_submit.interval_ns,
130 clock,
131 "ORDER_SUBMIT_THROTTLER".to_string(),
132 success_handler,
133 Some(failure_handler),
134 Ustr::from(&UUID4::new().to_string()),
135 )
136 }
137
138 fn create_modify_order_throttler(
139 config: &RiskEngineConfig,
140 clock: Rc<RefCell<dyn Clock>>,
141 cache: Rc<RefCell<Cache>>,
142 ) -> Throttler<ModifyOrder, ModifyOrderFn> {
143 let success_handler = {
144 Box::new(move |order: ModifyOrder| {
145 msgbus::send(
146 "ExecEngine.execute".into(),
147 &TradingCommand::ModifyOrder(order),
148 );
149 }) as Box<dyn Fn(ModifyOrder)>
150 };
151
152 let failure_handler = {
153 let cache = cache;
154 let clock = clock.clone();
155 Box::new(move |order: ModifyOrder| {
156 let reason = "Exceeded MAX_ORDER_MODIFY_RATE";
157 log::warn!(
158 "SubmitOrder for {} DENIED: {}",
159 order.client_order_id,
160 reason
161 );
162
163 let order = match Self::get_existing_order(&cache, &order) {
164 Some(order) => order,
165 None => return,
166 };
167
168 let rejected = Self::create_modify_rejected(&order, reason, &clock);
169
170 msgbus::send("ExecEngine.process".into(), &rejected);
171 }) as Box<dyn Fn(ModifyOrder)>
172 };
173
174 Throttler::new(
175 config.max_order_modify.limit,
176 config.max_order_modify.interval_ns,
177 clock,
178 "ORDER_MODIFY_THROTTLER".to_string(),
179 success_handler,
180 Some(failure_handler),
181 Ustr::from(&UUID4::new().to_string()),
182 )
183 }
184
185 fn handle_submit_order_cache(cache: &Rc<RefCell<Cache>>, submit_order: &SubmitOrder) {
186 let mut cache = cache.borrow_mut();
187 if !cache.order_exists(&submit_order.client_order_id) {
188 cache
189 .add_order(submit_order.order.clone(), None, None, false)
190 .map_err(|e| {
191 log::error!("Cannot add order to cache: {e}");
192 })
193 .unwrap();
194 }
195 }
196
197 fn get_existing_order(cache: &Rc<RefCell<Cache>>, order: &ModifyOrder) -> Option<OrderAny> {
198 let cache = cache.borrow();
199 if let Some(order) = cache.order(&order.client_order_id) {
200 Some(order.clone())
201 } else {
202 log::error!(
203 "Order with command.client_order_id: {} not found",
204 order.client_order_id
205 );
206 None
207 }
208 }
209
210 fn create_order_denied(
211 submit_order: &SubmitOrder,
212 reason: &str,
213 clock: &Rc<RefCell<dyn Clock>>,
214 ) -> OrderEventAny {
215 let timestamp = clock.borrow().timestamp_ns();
216 OrderEventAny::Denied(OrderDenied::new(
217 submit_order.trader_id,
218 submit_order.strategy_id,
219 submit_order.instrument_id,
220 submit_order.client_order_id,
221 reason.into(),
222 UUID4::new(),
223 timestamp,
224 timestamp,
225 ))
226 }
227
228 fn create_modify_rejected(
229 order: &OrderAny,
230 reason: &str,
231 clock: &Rc<RefCell<dyn Clock>>,
232 ) -> OrderEventAny {
233 let timestamp = clock.borrow().timestamp_ns();
234 OrderEventAny::ModifyRejected(OrderModifyRejected::new(
235 order.trader_id(),
236 order.strategy_id(),
237 order.instrument_id(),
238 order.client_order_id(),
239 reason.into(),
240 UUID4::new(),
241 timestamp,
242 timestamp,
243 false,
244 order.venue_order_id(),
245 None,
246 ))
247 }
248
249 pub fn execute(&mut self, command: TradingCommand) {
252 self.handle_command(command);
254 }
255
256 pub fn process(&mut self, event: OrderEventAny) {
257 self.handle_event(event);
259 }
260
261 pub fn set_trading_state(&mut self, state: TradingState) {
262 if state == self.trading_state {
263 log::warn!("No change to trading state: already set to {state:?}");
264 return;
265 }
266
267 self.trading_state = state;
268
269 let _ts_now = self.clock.borrow().timestamp_ns();
270
271 msgbus::publish("events.risk".into(), &"message"); log::info!("Trading state set to {state:?}");
277 }
278
279 pub fn set_max_notional_per_order(&mut self, instrument_id: InstrumentId, new_value: Decimal) {
280 self.max_notional_per_order.insert(instrument_id, new_value);
281
282 let new_value_str = new_value.to_string();
283 log::info!("Set MAX_NOTIONAL_PER_ORDER: {instrument_id} {new_value_str}");
284 }
285
286 fn handle_command(&mut self, command: TradingCommand) {
290 if self.config.debug {
291 log::debug!("{CMD}{RECV} {command:?}");
292 }
293
294 match command {
295 TradingCommand::SubmitOrder(submit_order) => self.handle_submit_order(submit_order),
296 TradingCommand::SubmitOrderList(submit_order_list) => {
297 self.handle_submit_order_list(submit_order_list);
298 }
299 TradingCommand::ModifyOrder(modify_order) => self.handle_modify_order(modify_order),
300 _ => {
301 log::error!("Cannot handle command: {command}");
302 }
303 }
304 }
305
306 fn handle_submit_order(&self, command: SubmitOrder) {
307 if self.config.bypass {
308 self.send_to_execution(TradingCommand::SubmitOrder(command));
309 return;
310 }
311
312 let order = &command.order;
313 if let Some(position_id) = command.position_id {
314 if order.is_reduce_only() {
315 let position_exists = {
316 let cache = self.cache.borrow();
317 cache
318 .position(&position_id)
319 .map(|pos| (pos.side, pos.quantity))
320 };
321
322 if let Some((pos_side, pos_quantity)) = position_exists {
323 if !order.would_reduce_only(pos_side, pos_quantity) {
324 self.deny_command(
325 TradingCommand::SubmitOrder(command),
326 &format!("Reduce only order would increase position {position_id}"),
327 );
328 return; }
330 } else {
331 self.deny_command(
332 TradingCommand::SubmitOrder(command),
333 &format!("Position {position_id} not found for reduce-only order"),
334 );
335 return;
336 }
337 }
338 }
339
340 let instrument_exists = {
341 let cache = self.cache.borrow();
342 cache.instrument(&order.instrument_id()).cloned()
343 };
344
345 let instrument = if let Some(instrument) = instrument_exists {
346 instrument
347 } else {
348 self.deny_command(
349 TradingCommand::SubmitOrder(command.clone()),
350 &format!("Instrument for {} not found", command.instrument_id),
351 );
352 return; };
354
355 if !self.check_order(instrument.clone(), order.clone()) {
359 return; }
361
362 if !self.check_orders_risk(instrument.clone(), Vec::from([order.clone()])) {
363 return; }
365
366 self.execution_gateway(instrument, TradingCommand::SubmitOrder(command.clone()));
367 }
368
369 fn handle_submit_order_list(&self, command: SubmitOrderList) {
370 if self.config.bypass {
371 self.send_to_execution(TradingCommand::SubmitOrderList(command));
372 return;
373 }
374
375 let instrument_exists = {
376 let cache = self.cache.borrow();
377 cache.instrument(&command.instrument_id).cloned()
378 };
379
380 let instrument = if let Some(instrument) = instrument_exists {
381 instrument
382 } else {
383 self.deny_command(
384 TradingCommand::SubmitOrderList(command.clone()),
385 &format!("no instrument found for {}", command.instrument_id),
386 );
387 return; };
389
390 for order in command.order_list.orders.clone() {
394 if !self.check_order(instrument.clone(), order) {
395 return; }
397 }
398
399 if !self.check_orders_risk(instrument.clone(), command.order_list.clone().orders) {
400 self.deny_order_list(
401 command.order_list.clone(),
402 &format!("OrderList {} DENIED", command.order_list.id),
403 );
404 return; }
406
407 self.execution_gateway(instrument, TradingCommand::SubmitOrderList(command));
408 }
409
410 fn handle_modify_order(&self, command: ModifyOrder) {
411 let order_exists = {
415 let cache = self.cache.borrow();
416 cache.order(&command.client_order_id).cloned()
417 };
418
419 let order = if let Some(order) = order_exists {
420 order
421 } else {
422 log::error!(
423 "ModifyOrder DENIED: Order with command.client_order_id: {} not found",
424 command.client_order_id
425 );
426 return;
427 };
428
429 if order.is_closed() {
430 self.reject_modify_order(
431 order,
432 &format!(
433 "Order with command.client_order_id: {} already closed",
434 command.client_order_id
435 ),
436 );
437 return;
438 } else if order.status() == OrderStatus::PendingCancel {
439 self.reject_modify_order(
440 order,
441 &format!(
442 "Order with command.client_order_id: {} is already pending cancel",
443 command.client_order_id
444 ),
445 );
446 return;
447 }
448
449 let maybe_instrument = {
451 let cache = self.cache.borrow();
452 cache.instrument(&command.instrument_id).cloned()
453 };
454
455 let instrument = if let Some(instrument) = maybe_instrument {
456 instrument
457 } else {
458 self.reject_modify_order(
459 order,
460 &format!("no instrument found for {}", command.instrument_id),
461 );
462 return; };
464
465 let mut risk_msg = self.check_price(&instrument, command.price);
467 if let Some(risk_msg) = risk_msg {
468 self.reject_modify_order(order, &risk_msg);
469 return; }
471
472 risk_msg = self.check_price(&instrument, command.trigger_price);
474 if let Some(risk_msg) = risk_msg {
475 self.reject_modify_order(order, &risk_msg);
476 return; }
478
479 risk_msg = self.check_quantity(&instrument, command.quantity);
481 if let Some(risk_msg) = risk_msg {
482 self.reject_modify_order(order, &risk_msg);
483 return; }
485
486 match self.trading_state {
488 TradingState::Halted => {
489 self.reject_modify_order(order, "TradingState is HALTED: Cannot modify order");
490 }
491 TradingState::Reducing => {
492 if let Some(quantity) = command.quantity {
493 if quantity > order.quantity()
494 && ((order.is_buy() && self.portfolio.is_net_long(&instrument.id()))
495 || (order.is_sell() && self.portfolio.is_net_short(&instrument.id())))
496 {
497 self.reject_modify_order(
498 order,
499 &format!(
500 "TradingState is REDUCING and update will increase exposure {}",
501 instrument.id()
502 ),
503 );
504 }
505 }
506 }
507 _ => {}
508 }
509
510 }
513
514 fn check_order(&self, instrument: InstrumentAny, order: OrderAny) -> bool {
517 if order.time_in_force() == TimeInForce::Gtd {
521 let expire_time = order.expire_time().unwrap();
523 if expire_time <= self.clock.borrow().timestamp_ns() {
524 self.deny_order(
525 order,
526 &format!("GTD {} already past", expire_time.to_rfc3339()),
527 );
528 return false; }
530 }
531
532 if !self.check_order_price(instrument.clone(), order.clone())
533 || !self.check_order_quantity(instrument, order)
534 {
535 return false; }
537
538 true
539 }
540
541 fn check_order_price(&self, instrument: InstrumentAny, order: OrderAny) -> bool {
542 if order.price().is_some() {
546 let risk_msg = self.check_price(&instrument, order.price());
547 if let Some(risk_msg) = risk_msg {
548 self.deny_order(order, &risk_msg);
549 return false; }
551 }
552
553 if order.trigger_price().is_some() {
557 let risk_msg = self.check_price(&instrument, order.trigger_price());
558 if let Some(risk_msg) = risk_msg {
559 self.deny_order(order, &risk_msg);
560 return false; }
562 }
563
564 true
565 }
566
567 fn check_order_quantity(&self, instrument: InstrumentAny, order: OrderAny) -> bool {
568 let risk_msg = self.check_quantity(&instrument, Some(order.quantity()));
569 if let Some(risk_msg) = risk_msg {
570 self.deny_order(order, &risk_msg);
571 return false; }
573
574 true
575 }
576
577 fn check_orders_risk(&self, instrument: InstrumentAny, orders: Vec<OrderAny>) -> bool {
578 let mut last_px: Option<Price> = None;
582 let mut max_notional: Option<Money> = None;
583
584 let max_notional_setting = self.max_notional_per_order.get(&instrument.id());
586 if let Some(max_notional_setting_val) = max_notional_setting.copied() {
587 max_notional = Some(Money::new(
588 max_notional_setting_val
589 .to_f64()
590 .expect("Invalid decimal conversion"),
591 instrument.quote_currency(),
592 ));
593 }
594
595 let account_exists = {
597 let cache = self.cache.borrow();
598 cache.account_for_venue(&instrument.id().venue).cloned()
599 };
600
601 let account = if let Some(account) = account_exists {
602 account
603 } else {
604 log::debug!("Cannot find account for venue {}", instrument.id().venue);
605 return true; };
607 let cash_account = match account {
608 AccountAny::Cash(cash_account) => cash_account,
609 AccountAny::Margin(_) => return true, };
611 let free = cash_account.balance_free(Some(instrument.quote_currency()));
612 if self.config.debug {
613 log::debug!("Free cash: {free:?}");
614 }
615
616 let mut cum_notional_buy: Option<Money> = None;
617 let mut cum_notional_sell: Option<Money> = None;
618 let mut base_currency: Option<Currency> = None;
619 for order in &orders {
620 last_px = match order {
622 OrderAny::Market(_) | OrderAny::MarketToLimit(_) => {
623 if last_px.is_none() {
624 let cache = self.cache.borrow();
625 if let Some(last_quote) = cache.quote(&instrument.id()) {
626 match order.order_side() {
627 OrderSide::Buy => Some(last_quote.ask_price),
628 OrderSide::Sell => Some(last_quote.bid_price),
629 _ => panic!("Invalid order side"),
630 }
631 } else {
632 let cache = self.cache.borrow();
633 let last_trade = cache.trade(&instrument.id());
634
635 if let Some(last_trade) = last_trade {
636 Some(last_trade.price)
637 } else {
638 log::warn!(
639 "Cannot check MARKET order risk: no prices for {}",
640 instrument.id()
641 );
642 continue;
643 }
644 }
645 } else {
646 last_px
647 }
648 }
649 OrderAny::StopMarket(_) | OrderAny::MarketIfTouched(_) => order.trigger_price(),
650 OrderAny::TrailingStopMarket(_) | OrderAny::TrailingStopLimit(_) => {
651 if let Some(trigger_price) = order.trigger_price() {
652 Some(trigger_price)
653 } else {
654 log::warn!(
655 "Cannot check {} order risk: no trigger price was set", order.order_type()
657 );
658 continue;
659 }
660 }
661 _ => order.price(),
662 };
663
664 let last_px = if let Some(px) = last_px {
665 px
666 } else {
667 log::error!("Cannot check order risk: no price available");
668 continue;
669 };
670
671 let notional =
672 instrument.calculate_notional_value(order.quantity(), last_px, Some(true));
673
674 if self.config.debug {
675 log::debug!("Notional: {notional:?}");
676 }
677
678 if let Some(max_notional_value) = max_notional {
680 if notional > max_notional_value {
681 self.deny_order(
682 order.clone(),
683 &format!(
684 "NOTIONAL_EXCEEDS_MAX_PER_ORDER: max_notional={max_notional_value:?}, notional={notional:?}"
685 ),
686 );
687 return false; }
689 }
690
691 if let Some(min_notional) = instrument.min_notional() {
693 if notional.currency == min_notional.currency && notional < min_notional {
694 self.deny_order(
695 order.clone(),
696 &format!(
697 "NOTIONAL_LESS_THAN_MIN_FOR_INSTRUMENT: min_notional={min_notional:?}, notional={notional:?}"
698 ),
699 );
700 return false; }
702 }
703
704 if let Some(max_notional) = instrument.max_notional() {
706 if notional.currency == max_notional.currency && notional > max_notional {
707 self.deny_order(
708 order.clone(),
709 &format!(
710 "NOTIONAL_GREATER_THAN_MAX_FOR_INSTRUMENT: max_notional={max_notional:?}, notional={notional:?}"
711 ),
712 );
713 return false; }
715 }
716
717 let notional = instrument.calculate_notional_value(order.quantity(), last_px, None);
719 let order_balance_impact = match order.order_side() {
720 OrderSide::Buy => Money::from_raw(-notional.raw, notional.currency),
721 OrderSide::Sell => Money::from_raw(notional.raw, notional.currency),
722 OrderSide::NoOrderSide => {
723 panic!("invalid `OrderSide`, was {}", order.order_side());
724 }
725 };
726
727 if self.config.debug {
728 log::debug!("Balance impact: {order_balance_impact}");
729 }
730
731 if let Some(free_val) = free {
732 if (free_val.as_decimal() + order_balance_impact.as_decimal()) < Decimal::ZERO {
733 self.deny_order(
734 order.clone(),
735 &format!(
736 "NOTIONAL_EXCEEDS_FREE_BALANCE: free={free_val:?}, notional={notional:?}"
737 ),
738 );
739 return false;
740 }
741 }
742
743 if base_currency.is_none() {
744 base_currency = instrument.base_currency();
745 }
746 if order.is_buy() {
747 match cum_notional_buy.as_mut() {
748 Some(cum_notional_buy_val) => {
749 cum_notional_buy_val.raw += -order_balance_impact.raw;
750 }
751 None => {
752 cum_notional_buy = Some(Money::from_raw(
753 -order_balance_impact.raw,
754 order_balance_impact.currency,
755 ));
756 }
757 }
758
759 if self.config.debug {
760 log::debug!("Cumulative notional BUY: {cum_notional_buy:?}");
761 }
762
763 if let (Some(free), Some(cum_notional_buy)) = (free, cum_notional_buy) {
764 if cum_notional_buy > free {
765 self.deny_order(order.clone(), &format!("CUM_NOTIONAL_EXCEEDS_FREE_BALANCE: free={free}, cum_notional={cum_notional_buy}"));
766 return false; }
768 }
769 } else if order.is_sell() {
770 if cash_account.base_currency.is_some() {
771 match cum_notional_sell.as_mut() {
772 Some(cum_notional_buy_val) => {
773 cum_notional_buy_val.raw += order_balance_impact.raw;
774 }
775 None => {
776 cum_notional_sell = Some(Money::from_raw(
777 order_balance_impact.raw,
778 order_balance_impact.currency,
779 ));
780 }
781 }
782 if self.config.debug {
783 log::debug!("Cumulative notional SELL: {cum_notional_sell:?}");
784 }
785
786 if let (Some(free), Some(cum_notional_sell)) = (free, cum_notional_sell) {
787 if cum_notional_sell > free {
788 self.deny_order(order.clone(), &format!("CUM_NOTIONAL_EXCEEDS_FREE_BALANCE: free={free}, cum_notional={cum_notional_sell}"));
789 return false; }
791 }
792 }
793 else if let Some(base_currency) = base_currency {
795 let cash_value = Money::from_raw(
796 order
797 .quantity()
798 .raw
799 .try_into()
800 .map_err(|e| log::error!("Unable to convert Quantity to f64: {e}"))
801 .unwrap(),
802 base_currency,
803 );
804
805 if self.config.debug {
806 log::debug!("Cash value: {cash_value:?}");
807 log::debug!(
808 "Total: {:?}",
809 cash_account.balance_total(Some(base_currency))
810 );
811 log::debug!(
812 "Locked: {:?}",
813 cash_account.balance_locked(Some(base_currency))
814 );
815 log::debug!("Free: {:?}", cash_account.balance_free(Some(base_currency)));
816 }
817
818 match cum_notional_sell {
819 Some(mut cum_notional_sell) => {
820 cum_notional_sell.raw += cash_value.raw;
821 }
822 None => cum_notional_sell = Some(cash_value),
823 }
824
825 if self.config.debug {
826 log::debug!("Cumulative notional SELL: {cum_notional_sell:?}");
827 }
828 if let (Some(free), Some(cum_notional_sell)) = (free, cum_notional_sell) {
829 if cum_notional_sell.raw > free.raw {
830 self.deny_order(order.clone(), &format!("CUM_NOTIONAL_EXCEEDS_FREE_BALANCE: free={free}, cum_notional={cum_notional_sell}"));
831 return false; }
833 }
834 }
835 }
836 }
837
838 true }
841
842 fn check_price(&self, instrument: &InstrumentAny, price: Option<Price>) -> Option<String> {
843 let price_val = price?;
844
845 if price_val.precision > instrument.price_precision() {
846 return Some(format!(
847 "price {} invalid (precision {} > {})",
848 price_val,
849 price_val.precision,
850 instrument.price_precision()
851 ));
852 }
853
854 if instrument.instrument_class() != InstrumentClass::Option && price_val.raw <= 0 {
855 return Some(format!("price {price_val} invalid (<= 0)"));
856 }
857
858 None
859 }
860
861 fn check_quantity(
862 &self,
863 instrument: &InstrumentAny,
864 quantity: Option<Quantity>,
865 ) -> Option<String> {
866 let quantity_val = quantity?;
867
868 if quantity_val.precision > instrument.size_precision() {
870 return Some(format!(
871 "quantity {} invalid (precision {} > {})",
872 quantity_val,
873 quantity_val.precision,
874 instrument.size_precision()
875 ));
876 }
877
878 if let Some(max_quantity) = instrument.max_quantity() {
880 if quantity_val > max_quantity {
881 return Some(format!(
882 "quantity {quantity_val} invalid (> maximum trade size of {max_quantity})"
883 ));
884 }
885 }
886
887 if let Some(min_quantity) = instrument.min_quantity() {
889 if quantity_val < min_quantity {
890 return Some(format!(
891 "quantity {quantity_val} invalid (< minimum trade size of {min_quantity})"
892 ));
893 }
894 }
895
896 None
897 }
898
899 fn deny_command(&self, command: TradingCommand, reason: &str) {
902 match command {
903 TradingCommand::SubmitOrder(submit_order) => {
904 self.deny_order(submit_order.order, reason);
905 }
906 TradingCommand::SubmitOrderList(submit_order_list) => {
907 self.deny_order_list(submit_order_list.order_list, reason);
908 }
909 _ => {
910 panic!("Cannot deny command {command}");
911 }
912 }
913 }
914
915 fn deny_order(&self, order: OrderAny, reason: &str) {
916 log::warn!(
917 "SubmitOrder for {} DENIED: {}",
918 order.client_order_id(),
919 reason
920 );
921
922 if order.status() != OrderStatus::Initialized {
923 return;
924 }
925
926 let mut cache = self.cache.borrow_mut();
927 if !cache.order_exists(&order.client_order_id()) {
928 cache
929 .add_order(order.clone(), None, None, false)
930 .map_err(|e| {
931 log::error!("Cannot add order to cache: {e}");
932 })
933 .unwrap();
934 }
935
936 let denied = OrderEventAny::Denied(OrderDenied::new(
937 order.trader_id(),
938 order.strategy_id(),
939 order.instrument_id(),
940 order.client_order_id(),
941 reason.into(),
942 UUID4::new(),
943 self.clock.borrow().timestamp_ns(),
944 self.clock.borrow().timestamp_ns(),
945 ));
946
947 msgbus::send("ExecEngine.process".into(), &denied);
948 }
949
950 fn deny_order_list(&self, order_list: OrderList, reason: &str) {
951 for order in order_list.orders {
952 if !order.is_closed() {
953 self.deny_order(order, reason);
954 }
955 }
956 }
957
958 fn reject_modify_order(&self, order: OrderAny, reason: &str) {
959 let ts_event = self.clock.borrow().timestamp_ns();
960 let denied = OrderEventAny::ModifyRejected(OrderModifyRejected::new(
961 order.trader_id(),
962 order.strategy_id(),
963 order.instrument_id(),
964 order.client_order_id(),
965 reason.into(),
966 UUID4::new(),
967 ts_event,
968 ts_event,
969 false,
970 order.venue_order_id(),
971 order.account_id(),
972 ));
973
974 msgbus::send("ExecEngine.process".into(), &denied);
975 }
976
977 fn execution_gateway(&self, instrument: InstrumentAny, command: TradingCommand) {
980 match self.trading_state {
981 TradingState::Halted => match command {
982 TradingCommand::SubmitOrder(submit_order) => {
983 self.deny_order(submit_order.order, "TradingState::HALTED");
984 }
985 TradingCommand::SubmitOrderList(submit_order_list) => {
986 self.deny_order_list(submit_order_list.order_list, "TradingState::HALTED");
987 }
988 _ => {}
989 },
990 TradingState::Reducing => match command {
991 TradingCommand::SubmitOrder(submit_order) => {
992 let order = submit_order.order;
993 if order.is_buy() && self.portfolio.is_net_long(&instrument.id()) {
994 self.deny_order(
995 order,
996 &format!(
997 "BUY when TradingState::REDUCING and LONG {}",
998 instrument.id()
999 ),
1000 );
1001 } else if order.is_sell() && self.portfolio.is_net_short(&instrument.id()) {
1002 self.deny_order(
1003 order,
1004 &format!(
1005 "SELL when TradingState::REDUCING and SHORT {}",
1006 instrument.id()
1007 ),
1008 );
1009 return;
1010 }
1011 }
1012 TradingCommand::SubmitOrderList(submit_order_list) => {
1013 let order_list = submit_order_list.order_list;
1014 for order in &order_list.orders {
1015 if order.is_buy() && self.portfolio.is_net_long(&instrument.id()) {
1016 self.deny_order_list(
1017 order_list,
1018 &format!(
1019 "BUY when TradingState::REDUCING and LONG {}",
1020 instrument.id()
1021 ),
1022 );
1023 return;
1024 } else if order.is_sell() && self.portfolio.is_net_short(&instrument.id()) {
1025 self.deny_order_list(
1026 order_list,
1027 &format!(
1028 "SELL when TradingState::REDUCING and SHORT {}",
1029 instrument.id()
1030 ),
1031 );
1032 return;
1033 }
1034 }
1035 }
1036 _ => {}
1037 },
1038 TradingState::Active => match command {
1039 TradingCommand::SubmitOrder(_submit_order) => {
1040 }
1043 TradingCommand::SubmitOrderList(_submit_order_list) => {
1044 todo!("NOT IMPLEMENTED");
1045 }
1046 _ => {}
1047 },
1048 }
1049 }
1050
1051 fn send_to_execution(&self, command: TradingCommand) {
1052 msgbus::send("ExecEngine.execute".into(), &command);
1053 }
1054
1055 fn handle_event(&mut self, event: OrderEventAny) {
1056 if self.config.debug {
1059 log::debug!("{RECV}{EVT} {event:?}");
1060 }
1061 }
1062}