nautilus_execution/order_manager/
manager.rs1#![allow(dead_code)]
18#![allow(unused_variables)]
19
20use std::{cell::RefCell, collections::HashMap, fmt::Debug, rc::Rc};
21
22use nautilus_common::{
23 cache::Cache,
24 clock::Clock,
25 logging::{CMD, EVT, SEND},
26 messages::execution::{SubmitOrder, TradingCommand},
27 msgbus,
28};
29use nautilus_core::UUID4;
30use nautilus_model::{
31 enums::{ContingencyType, TriggerType},
32 events::{
33 OrderCanceled, OrderEventAny, OrderExpired, OrderFilled, OrderRejected, OrderUpdated,
34 },
35 identifiers::{ClientId, ClientOrderId, ExecAlgorithmId, PositionId},
36 orders::{Order, OrderAny},
37 types::Quantity,
38};
39
40pub struct OrderManager {
47 clock: Rc<RefCell<dyn Clock>>,
48 cache: Rc<RefCell<Cache>>,
49 active_local: bool,
50 submit_order_commands: HashMap<ClientOrderId, SubmitOrder>,
54}
55
56impl Debug for OrderManager {
57 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
58 f.debug_struct(stringify!(OrderManager))
59 .field("pending_commands", &self.submit_order_commands.len())
60 .finish()
61 }
62}
63
64impl OrderManager {
65 pub fn new(
67 clock: Rc<RefCell<dyn Clock>>,
68 cache: Rc<RefCell<Cache>>,
69 active_local: bool,
70 ) -> Self {
74 Self {
75 clock,
76 cache,
77 active_local,
78 submit_order_commands: HashMap::new(),
82 }
83 }
84
85 #[must_use]
98 pub fn get_submit_order_commands(&self) -> HashMap<ClientOrderId, SubmitOrder> {
100 self.submit_order_commands.clone()
101 }
102
103 pub fn cache_submit_order_command(&mut self, command: SubmitOrder) {
105 self.submit_order_commands
106 .insert(command.order.client_order_id(), command);
107 }
108
109 pub fn pop_submit_order_command(
111 &mut self,
112 client_order_id: ClientOrderId,
113 ) -> Option<SubmitOrder> {
114 self.submit_order_commands.remove(&client_order_id)
115 }
116
117 pub fn reset(&mut self) {
119 self.submit_order_commands.clear();
120 }
121
122 pub fn cancel_order(&mut self, order: &OrderAny) {
124 if self
125 .cache
126 .borrow()
127 .is_order_pending_cancel_local(&order.client_order_id())
128 {
129 return;
130 }
131
132 if order.is_closed() {
133 log::warn!("Cannot cancel order: already closed");
134 return;
135 }
136
137 self.submit_order_commands.remove(&order.client_order_id());
138
139 }
143
144 pub const fn modify_order_quantity(&mut self, order: &mut OrderAny, new_quantity: Quantity) {
146 }
150
151 pub fn create_new_submit_order(
155 &mut self,
156 order: &OrderAny,
157 position_id: Option<PositionId>,
158 client_id: Option<ClientId>,
159 ) -> anyhow::Result<()> {
160 let client_id = client_id.ok_or_else(|| anyhow::anyhow!("Client ID is required"))?;
161 let venue_order_id = order
162 .venue_order_id()
163 .ok_or_else(|| anyhow::anyhow!("Venue order ID is required"))?;
164
165 let submit = SubmitOrder::new(
166 order.trader_id(),
167 client_id,
168 order.strategy_id(),
169 order.instrument_id(),
170 order.client_order_id(),
171 venue_order_id,
172 order.clone(),
173 order.exec_algorithm_id(),
174 position_id,
175 UUID4::new(),
176 self.clock.borrow().timestamp_ns(),
177 )?;
178
179 if order.emulation_trigger() == Some(TriggerType::NoTrigger) {
180 self.cache_submit_order_command(submit.clone());
181
182 match order.exec_algorithm_id() {
183 Some(exec_algorithm_id) => {
184 self.send_algo_command(submit, exec_algorithm_id);
185 }
186 None => self.send_risk_command(TradingCommand::SubmitOrder(submit)),
187 }
188 } Ok(())
193 }
194
195 #[must_use]
196 pub fn should_manage_order(&self, order: &OrderAny) -> bool {
198 self.active_local && order.is_active_local()
199 }
200
201 pub fn handle_event(&mut self, event: OrderEventAny) {
204 match event {
205 OrderEventAny::Rejected(event) => self.handle_order_rejected(event),
206 OrderEventAny::Canceled(event) => self.handle_order_canceled(event),
207 OrderEventAny::Expired(event) => self.handle_order_expired(event),
208 OrderEventAny::Updated(event) => self.handle_order_updated(event),
209 OrderEventAny::Filled(event) => self.handle_order_filled(event),
210 _ => self.handle_position_event(event),
211 }
212 }
213
214 pub fn handle_order_rejected(&mut self, rejected: OrderRejected) {
216 let cloned_order = self
217 .cache
218 .borrow()
219 .order(&rejected.client_order_id)
220 .cloned();
221 if let Some(order) = cloned_order {
222 if order.contingency_type() != Some(ContingencyType::NoContingency) {
223 self.handle_contingencies(order);
224 }
225 } else {
226 log::error!(
227 "Cannot handle `OrderRejected`: order for client_order_id: {} not found, {}",
228 rejected.client_order_id,
229 rejected
230 );
231 }
232 }
233
234 pub fn handle_order_canceled(&mut self, canceled: OrderCanceled) {
235 let cloned_order = self
236 .cache
237 .borrow()
238 .order(&canceled.client_order_id)
239 .cloned();
240 if let Some(order) = cloned_order {
241 if order.contingency_type() != Some(ContingencyType::NoContingency) {
242 self.handle_contingencies(order);
243 }
244 } else {
245 log::error!(
246 "Cannot handle `OrderCanceled`: order for client_order_id: {} not found, {}",
247 canceled.client_order_id,
248 canceled
249 );
250 }
251 }
252
253 pub fn handle_order_expired(&mut self, expired: OrderExpired) {
254 let cloned_order = self.cache.borrow().order(&expired.client_order_id).cloned();
255 if let Some(order) = cloned_order {
256 if order.contingency_type() != Some(ContingencyType::NoContingency) {
257 self.handle_contingencies(order);
258 }
259 } else {
260 log::error!(
261 "Cannot handle `OrderExpired`: order for client_order_id: {} not found, {}",
262 expired.client_order_id,
263 expired
264 );
265 }
266 }
267
268 pub fn handle_order_updated(&mut self, updated: OrderUpdated) {
269 let cloned_order = self.cache.borrow().order(&updated.client_order_id).cloned();
270 if let Some(order) = cloned_order {
271 if order.contingency_type() != Some(ContingencyType::NoContingency) {
272 self.handle_contingencies_update(order);
273 }
274 } else {
275 log::error!(
276 "Cannot handle `OrderUpdated`: order for client_order_id: {} not found, {}",
277 updated.client_order_id,
278 updated
279 );
280 }
281 }
282
283 pub fn handle_order_filled(&mut self, filled: OrderFilled) {
287 let order = if let Some(order) = self.cache.borrow().order(&filled.client_order_id).cloned()
288 {
289 order
290 } else {
291 log::error!(
292 "Cannot handle `OrderFilled`: order for client_order_id: {} not found, {}",
293 filled.client_order_id,
294 filled
295 );
296 return;
297 };
298
299 match order.contingency_type() {
300 Some(ContingencyType::Oto) => {
301 let position_id = self
302 .cache
303 .borrow()
304 .position_id(&order.client_order_id())
305 .copied();
306 let client_id = self
307 .cache
308 .borrow()
309 .client_id(&order.client_order_id())
310 .copied();
311
312 let parent_filled_qty = match order.exec_spawn_id() {
313 Some(spawn_id) => {
314 if let Some(qty) = self
315 .cache
316 .borrow()
317 .exec_spawn_total_filled_qty(&spawn_id, true)
318 {
319 qty
320 } else {
321 log::error!("Failed to get spawn filled quantity for {spawn_id}");
322 return;
323 }
324 }
325 None => order.filled_qty(),
326 };
327
328 let linked_orders = if let Some(orders) = order.linked_order_ids() {
329 orders
330 } else {
331 log::error!("No linked orders found for OTO order");
332 return;
333 };
334
335 for client_order_id in linked_orders {
336 let mut child_order =
337 if let Some(order) = self.cache.borrow().order(client_order_id).cloned() {
338 order
339 } else {
340 panic!(
341 "Cannot find OTO child order for client_order_id: {client_order_id}"
342 );
343 };
344
345 if !self.should_manage_order(&child_order) {
346 continue;
347 }
348
349 if child_order.position_id().is_none() {
350 child_order.set_position_id(position_id);
351 }
352
353 if parent_filled_qty != child_order.leaves_qty() {
354 self.modify_order_quantity(&mut child_order, parent_filled_qty);
355 }
356
357 if !self
362 .submit_order_commands
363 .contains_key(&child_order.client_order_id())
364 {
365 if let Err(e) =
366 self.create_new_submit_order(&child_order, position_id, client_id)
367 {
368 log::error!("Failed to create new submit order: {e}");
369 }
370 }
371 }
372 }
373 Some(ContingencyType::Oco) => {
374 let linked_orders = if let Some(orders) = order.linked_order_ids() {
375 orders
376 } else {
377 log::error!("No linked orders found for OCO order");
378 return;
379 };
380
381 for client_order_id in linked_orders {
382 let contingent_order = match self.cache.borrow().order(client_order_id).cloned()
383 {
384 Some(contingent_order) => contingent_order,
385 None => {
386 panic!(
387 "Cannot find OCO contingent order for client_order_id: {client_order_id}"
388 );
389 }
390 };
391
392 if !self.should_manage_order(&contingent_order) || contingent_order.is_closed()
394 {
395 continue;
396 }
397 if contingent_order.client_order_id() != order.client_order_id() {
398 self.cancel_order(&contingent_order);
399 }
400 }
401 }
402 Some(ContingencyType::Ouo) => self.handle_contingencies(order),
403 _ => {}
404 }
405 }
406
407 pub fn handle_contingencies(&mut self, order: OrderAny) {
411 let (filled_qty, leaves_qty, is_spawn_active) =
412 if let Some(exec_spawn_id) = order.exec_spawn_id() {
413 if let (Some(filled), Some(leaves)) = (
414 self.cache
415 .borrow()
416 .exec_spawn_total_filled_qty(&exec_spawn_id, true),
417 self.cache
418 .borrow()
419 .exec_spawn_total_leaves_qty(&exec_spawn_id, true),
420 ) {
421 (filled, leaves, leaves.raw > 0)
422 } else {
423 log::error!("Failed to get spawn quantities for {exec_spawn_id}");
424 return;
425 }
426 } else {
427 (order.filled_qty(), order.leaves_qty(), false)
428 };
429
430 let linked_orders = if let Some(orders) = order.linked_order_ids() {
431 orders
432 } else {
433 log::error!("No linked orders found");
434 return;
435 };
436
437 for client_order_id in linked_orders {
438 let mut contingent_order =
439 if let Some(order) = self.cache.borrow().order(client_order_id).cloned() {
440 order
441 } else {
442 panic!("Cannot find contingent order for client_order_id: {client_order_id}");
443 };
444
445 if !self.should_manage_order(&contingent_order)
446 || client_order_id == &order.client_order_id()
447 {
448 continue;
449 }
450
451 if contingent_order.is_closed() {
452 self.submit_order_commands.remove(&order.client_order_id());
453 continue;
454 }
455
456 match order.contingency_type() {
457 Some(ContingencyType::Oto) => {
458 if order.is_closed()
459 && filled_qty.raw == 0
460 && (order.exec_spawn_id().is_none() || !is_spawn_active)
461 {
462 self.cancel_order(&contingent_order);
463 } else if filled_qty.raw > 0 && filled_qty != contingent_order.quantity() {
464 self.modify_order_quantity(&mut contingent_order, filled_qty);
465 }
466 }
467 Some(ContingencyType::Oco) => {
468 if order.is_closed() && (order.exec_spawn_id().is_none() || !is_spawn_active) {
469 self.cancel_order(&contingent_order);
470 }
471 }
472 Some(ContingencyType::Ouo) => {
473 if (leaves_qty.raw == 0 && order.exec_spawn_id().is_some())
474 || (order.is_closed()
475 && (order.exec_spawn_id().is_none() || !is_spawn_active))
476 {
477 self.cancel_order(&contingent_order);
478 } else if leaves_qty != contingent_order.leaves_qty() {
479 self.modify_order_quantity(&mut contingent_order, leaves_qty);
480 }
481 }
482 _ => {}
483 }
484 }
485 }
486
487 pub fn handle_contingencies_update(&mut self, order: OrderAny) {
491 let quantity = match order.exec_spawn_id() {
492 Some(exec_spawn_id) => {
493 if let Some(qty) = self
494 .cache
495 .borrow()
496 .exec_spawn_total_quantity(&exec_spawn_id, true)
497 {
498 qty
499 } else {
500 log::error!("Failed to get spawn total quantity for {exec_spawn_id}");
501 return;
502 }
503 }
504 None => order.quantity(),
505 };
506
507 if quantity.raw == 0 {
508 return;
509 }
510
511 let linked_orders = if let Some(orders) = order.linked_order_ids() {
512 orders
513 } else {
514 log::error!("No linked orders found for contingent order");
515 return;
516 };
517
518 for client_order_id in linked_orders {
519 let mut contingent_order = match self.cache.borrow().order(client_order_id).cloned() {
520 Some(contingent_order) => contingent_order,
521 None => panic!(
522 "Cannot find OCO contingent order for client_order_id: {client_order_id}"
523 ),
524 };
525
526 if !self.should_manage_order(&contingent_order)
527 || client_order_id == &order.client_order_id()
528 || contingent_order.is_closed()
529 {
530 continue;
531 }
532
533 if let Some(contingency_type) = order.contingency_type() {
534 if matches!(
535 contingency_type,
536 ContingencyType::Oto | ContingencyType::Oco
537 ) && quantity != contingent_order.quantity()
538 {
539 self.modify_order_quantity(&mut contingent_order, quantity);
540 }
541 }
542 }
543 }
544
545 pub fn handle_position_event(&mut self, _event: OrderEventAny) {
546 todo!()
547 }
548
549 pub fn send_emulator_command(&self, command: TradingCommand) {
551 log::info!("{CMD}{SEND} {command}");
552
553 msgbus::send_any("OrderEmulator.execute".into(), &command);
554 }
555
556 pub fn send_algo_command(&self, command: SubmitOrder, exec_algorithm_id: ExecAlgorithmId) {
557 log::info!("{CMD}{SEND} {command}");
558
559 let endpoint = format!("{exec_algorithm_id}.execute");
560 msgbus::send_any(endpoint.into(), &TradingCommand::SubmitOrder(command));
561 }
562
563 pub fn send_risk_command(&self, command: TradingCommand) {
564 log::info!("{CMD}{SEND} {command}");
565 msgbus::send_any("RiskEngine.execute".into(), &command);
566 }
567
568 pub fn send_exec_command(&self, command: TradingCommand) {
569 log::info!("{CMD}{SEND} {command}");
570 msgbus::send_any("ExecEngine.execute".into(), &command);
571 }
572
573 pub fn send_risk_event(&self, event: OrderEventAny) {
574 log::info!("{EVT}{SEND} {event}");
575 msgbus::send_any("RiskEngine.process".into(), &event);
576 }
577
578 pub fn send_exec_event(&self, event: OrderEventAny) {
579 log::info!("{EVT}{SEND} {event}");
580 msgbus::send_any("ExecEngine.process".into(), &event);
581 }
582}