nautilus_execution/order_manager/
manager.rs

1// -------------------------------------------------------------------------------------------------
2//  Copyright (C) 2015-2025 Posei Systems Pty Ltd. All rights reserved.
3//  https://poseitrader.io
4//
5//  Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
6//  You may not use this file except in compliance with the License.
7//  You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
8//
9//  Unless required by applicable law or agreed to in writing, software
10//  distributed under the License is distributed on an "AS IS" BASIS,
11//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12//  See the License for the specific language governing permissions and
13//  limitations under the License.
14// -------------------------------------------------------------------------------------------------
15
16// Under development
17#![allow(dead_code)]
18#![allow(unused_variables)]
19
20use std::{cell::RefCell, collections::HashMap, fmt::Debug, rc::Rc};
21
22use nautilus_common::{
23    cache::Cache,
24    clock::Clock,
25    logging::{CMD, EVT, SEND},
26    messages::execution::{SubmitOrder, TradingCommand},
27    msgbus,
28};
29use nautilus_core::UUID4;
30use nautilus_model::{
31    enums::{ContingencyType, TriggerType},
32    events::{
33        OrderCanceled, OrderEventAny, OrderExpired, OrderFilled, OrderRejected, OrderUpdated,
34    },
35    identifiers::{ClientId, ClientOrderId, ExecAlgorithmId, PositionId},
36    orders::{Order, OrderAny},
37    types::Quantity,
38};
39
40/// Manages the lifecycle and state of orders with contingency handling.
41///
42/// The order manager is responsible for managing local order state, handling
43/// contingent orders (OTO, OCO, OUO), and coordinating with emulation and
44/// execution systems. It tracks order commands and manages complex order
45/// relationships for advanced order types.
46pub struct OrderManager {
47    clock: Rc<RefCell<dyn Clock>>,
48    cache: Rc<RefCell<Cache>>,
49    active_local: bool,
50    // submit_order_handler: Option<SubmitOrderHandlerAny>,
51    // cancel_order_handler: Option<CancelOrderHandlerAny>,
52    // modify_order_handler: Option<ModifyOrderHandlerAny>,
53    submit_order_commands: HashMap<ClientOrderId, SubmitOrder>,
54}
55
56impl Debug for OrderManager {
57    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
58        f.debug_struct(stringify!(OrderManager))
59            .field("pending_commands", &self.submit_order_commands.len())
60            .finish()
61    }
62}
63
64impl OrderManager {
65    /// Creates a new [`OrderManager`] instance.
66    pub fn new(
67        clock: Rc<RefCell<dyn Clock>>,
68        cache: Rc<RefCell<Cache>>,
69        active_local: bool,
70        // submit_order_handler: Option<SubmitOrderHandlerAny>,
71        // cancel_order_handler: Option<CancelOrderHandlerAny>,
72        // modify_order_handler: Option<ModifyOrderHandlerAny>,
73    ) -> Self {
74        Self {
75            clock,
76            cache,
77            active_local,
78            // submit_order_handler,
79            // cancel_order_handler,
80            // modify_order_handler,
81            submit_order_commands: HashMap::new(),
82        }
83    }
84
85    // pub fn set_submit_order_handler(&mut self, handler: SubmitOrderHandlerAny) {
86    //     self.submit_order_handler = Some(handler);
87    // }
88    //
89    // pub fn set_cancel_order_handler(&mut self, handler: CancelOrderHandlerAny) {
90    //     self.cancel_order_handler = Some(handler);
91    // }
92    //
93    // pub fn set_modify_order_handler(&mut self, handler: ModifyOrderHandlerAny) {
94    //     self.modify_order_handler = Some(handler);
95    // }
96
97    #[must_use]
98    /// Returns a copy of all cached submit order commands.
99    pub fn get_submit_order_commands(&self) -> HashMap<ClientOrderId, SubmitOrder> {
100        self.submit_order_commands.clone()
101    }
102
103    /// Caches a submit order command for later processing.
104    pub fn cache_submit_order_command(&mut self, command: SubmitOrder) {
105        self.submit_order_commands
106            .insert(command.order.client_order_id(), command);
107    }
108
109    /// Removes and returns a cached submit order command.
110    pub fn pop_submit_order_command(
111        &mut self,
112        client_order_id: ClientOrderId,
113    ) -> Option<SubmitOrder> {
114        self.submit_order_commands.remove(&client_order_id)
115    }
116
117    /// Resets the order manager by clearing all cached commands.
118    pub fn reset(&mut self) {
119        self.submit_order_commands.clear();
120    }
121
122    /// Cancels an order if it's not already pending cancellation or closed.
123    pub fn cancel_order(&mut self, order: &OrderAny) {
124        if self
125            .cache
126            .borrow()
127            .is_order_pending_cancel_local(&order.client_order_id())
128        {
129            return;
130        }
131
132        if order.is_closed() {
133            log::warn!("Cannot cancel order: already closed");
134            return;
135        }
136
137        self.submit_order_commands.remove(&order.client_order_id());
138
139        // if let Some(handler) = &self.cancel_order_handler {
140        //     handler.handle_cancel_order(order);
141        // }
142    }
143
144    /// Modifies the quantity of an existing order.
145    pub const fn modify_order_quantity(&mut self, order: &mut OrderAny, new_quantity: Quantity) {
146        // if let Some(handler) = &self.modify_order_handler {
147        //     handler.handle_modify_order(order, new_quantity);
148        // }
149    }
150
151    /// # Errors
152    ///
153    /// Returns an error if creating a new submit order fails.
154    pub fn create_new_submit_order(
155        &mut self,
156        order: &OrderAny,
157        position_id: Option<PositionId>,
158        client_id: Option<ClientId>,
159    ) -> anyhow::Result<()> {
160        let client_id = client_id.ok_or_else(|| anyhow::anyhow!("Client ID is required"))?;
161        let venue_order_id = order
162            .venue_order_id()
163            .ok_or_else(|| anyhow::anyhow!("Venue order ID is required"))?;
164
165        let submit = SubmitOrder::new(
166            order.trader_id(),
167            client_id,
168            order.strategy_id(),
169            order.instrument_id(),
170            order.client_order_id(),
171            venue_order_id,
172            order.clone(),
173            order.exec_algorithm_id(),
174            position_id,
175            UUID4::new(),
176            self.clock.borrow().timestamp_ns(),
177        )?;
178
179        if order.emulation_trigger() == Some(TriggerType::NoTrigger) {
180            self.cache_submit_order_command(submit.clone());
181
182            match order.exec_algorithm_id() {
183                Some(exec_algorithm_id) => {
184                    self.send_algo_command(submit, exec_algorithm_id);
185                }
186                None => self.send_risk_command(TradingCommand::SubmitOrder(submit)),
187            }
188        } // else if let Some(handler) = &self.submit_order_handler {
189        //     handler.handle_submit_order(submit);
190        // }
191
192        Ok(())
193    }
194
195    #[must_use]
196    /// Returns true if the order manager should manage the given order.
197    pub fn should_manage_order(&self, order: &OrderAny) -> bool {
198        self.active_local && order.is_active_local()
199    }
200
201    // Event Handlers
202    /// Handles an order event by routing it to the appropriate handler method.
203    pub fn handle_event(&mut self, event: OrderEventAny) {
204        match event {
205            OrderEventAny::Rejected(event) => self.handle_order_rejected(event),
206            OrderEventAny::Canceled(event) => self.handle_order_canceled(event),
207            OrderEventAny::Expired(event) => self.handle_order_expired(event),
208            OrderEventAny::Updated(event) => self.handle_order_updated(event),
209            OrderEventAny::Filled(event) => self.handle_order_filled(event),
210            _ => self.handle_position_event(event),
211        }
212    }
213
214    /// Handles an order rejected event and manages any contingent orders.
215    pub fn handle_order_rejected(&mut self, rejected: OrderRejected) {
216        let cloned_order = self
217            .cache
218            .borrow()
219            .order(&rejected.client_order_id)
220            .cloned();
221        if let Some(order) = cloned_order {
222            if order.contingency_type() != Some(ContingencyType::NoContingency) {
223                self.handle_contingencies(order);
224            }
225        } else {
226            log::error!(
227                "Cannot handle `OrderRejected`: order for client_order_id: {} not found, {}",
228                rejected.client_order_id,
229                rejected
230            );
231        }
232    }
233
234    pub fn handle_order_canceled(&mut self, canceled: OrderCanceled) {
235        let cloned_order = self
236            .cache
237            .borrow()
238            .order(&canceled.client_order_id)
239            .cloned();
240        if let Some(order) = cloned_order {
241            if order.contingency_type() != Some(ContingencyType::NoContingency) {
242                self.handle_contingencies(order);
243            }
244        } else {
245            log::error!(
246                "Cannot handle `OrderCanceled`: order for client_order_id: {} not found, {}",
247                canceled.client_order_id,
248                canceled
249            );
250        }
251    }
252
253    pub fn handle_order_expired(&mut self, expired: OrderExpired) {
254        let cloned_order = self.cache.borrow().order(&expired.client_order_id).cloned();
255        if let Some(order) = cloned_order {
256            if order.contingency_type() != Some(ContingencyType::NoContingency) {
257                self.handle_contingencies(order);
258            }
259        } else {
260            log::error!(
261                "Cannot handle `OrderExpired`: order for client_order_id: {} not found, {}",
262                expired.client_order_id,
263                expired
264            );
265        }
266    }
267
268    pub fn handle_order_updated(&mut self, updated: OrderUpdated) {
269        let cloned_order = self.cache.borrow().order(&updated.client_order_id).cloned();
270        if let Some(order) = cloned_order {
271            if order.contingency_type() != Some(ContingencyType::NoContingency) {
272                self.handle_contingencies_update(order);
273            }
274        } else {
275            log::error!(
276                "Cannot handle `OrderUpdated`: order for client_order_id: {} not found, {}",
277                updated.client_order_id,
278                updated
279            );
280        }
281    }
282
283    /// # Panics
284    ///
285    /// Panics if the OTO child order cannot be found for the given client order ID.
286    pub fn handle_order_filled(&mut self, filled: OrderFilled) {
287        let order = if let Some(order) = self.cache.borrow().order(&filled.client_order_id).cloned()
288        {
289            order
290        } else {
291            log::error!(
292                "Cannot handle `OrderFilled`: order for client_order_id: {} not found, {}",
293                filled.client_order_id,
294                filled
295            );
296            return;
297        };
298
299        match order.contingency_type() {
300            Some(ContingencyType::Oto) => {
301                let position_id = self
302                    .cache
303                    .borrow()
304                    .position_id(&order.client_order_id())
305                    .copied();
306                let client_id = self
307                    .cache
308                    .borrow()
309                    .client_id(&order.client_order_id())
310                    .copied();
311
312                let parent_filled_qty = match order.exec_spawn_id() {
313                    Some(spawn_id) => {
314                        if let Some(qty) = self
315                            .cache
316                            .borrow()
317                            .exec_spawn_total_filled_qty(&spawn_id, true)
318                        {
319                            qty
320                        } else {
321                            log::error!("Failed to get spawn filled quantity for {spawn_id}");
322                            return;
323                        }
324                    }
325                    None => order.filled_qty(),
326                };
327
328                let linked_orders = if let Some(orders) = order.linked_order_ids() {
329                    orders
330                } else {
331                    log::error!("No linked orders found for OTO order");
332                    return;
333                };
334
335                for client_order_id in linked_orders {
336                    let mut child_order =
337                        if let Some(order) = self.cache.borrow().order(client_order_id).cloned() {
338                            order
339                        } else {
340                            panic!(
341                                "Cannot find OTO child order for client_order_id: {client_order_id}"
342                            );
343                        };
344
345                    if !self.should_manage_order(&child_order) {
346                        continue;
347                    }
348
349                    if child_order.position_id().is_none() {
350                        child_order.set_position_id(position_id);
351                    }
352
353                    if parent_filled_qty != child_order.leaves_qty() {
354                        self.modify_order_quantity(&mut child_order, parent_filled_qty);
355                    }
356
357                    // if self.submit_order_handler.is_none() {
358                    //     return;
359                    // }
360
361                    if !self
362                        .submit_order_commands
363                        .contains_key(&child_order.client_order_id())
364                    {
365                        if let Err(e) =
366                            self.create_new_submit_order(&child_order, position_id, client_id)
367                        {
368                            log::error!("Failed to create new submit order: {e}");
369                        }
370                    }
371                }
372            }
373            Some(ContingencyType::Oco) => {
374                let linked_orders = if let Some(orders) = order.linked_order_ids() {
375                    orders
376                } else {
377                    log::error!("No linked orders found for OCO order");
378                    return;
379                };
380
381                for client_order_id in linked_orders {
382                    let contingent_order = match self.cache.borrow().order(client_order_id).cloned()
383                    {
384                        Some(contingent_order) => contingent_order,
385                        None => {
386                            panic!(
387                                "Cannot find OCO contingent order for client_order_id: {client_order_id}"
388                            );
389                        }
390                    };
391
392                    // Not being managed || Already completed
393                    if !self.should_manage_order(&contingent_order) || contingent_order.is_closed()
394                    {
395                        continue;
396                    }
397                    if contingent_order.client_order_id() != order.client_order_id() {
398                        self.cancel_order(&contingent_order);
399                    }
400                }
401            }
402            Some(ContingencyType::Ouo) => self.handle_contingencies(order),
403            _ => {}
404        }
405    }
406
407    /// # Panics
408    ///
409    /// Panics if a contingent order cannot be found for the given client order ID.
410    pub fn handle_contingencies(&mut self, order: OrderAny) {
411        let (filled_qty, leaves_qty, is_spawn_active) =
412            if let Some(exec_spawn_id) = order.exec_spawn_id() {
413                if let (Some(filled), Some(leaves)) = (
414                    self.cache
415                        .borrow()
416                        .exec_spawn_total_filled_qty(&exec_spawn_id, true),
417                    self.cache
418                        .borrow()
419                        .exec_spawn_total_leaves_qty(&exec_spawn_id, true),
420                ) {
421                    (filled, leaves, leaves.raw > 0)
422                } else {
423                    log::error!("Failed to get spawn quantities for {exec_spawn_id}");
424                    return;
425                }
426            } else {
427                (order.filled_qty(), order.leaves_qty(), false)
428            };
429
430        let linked_orders = if let Some(orders) = order.linked_order_ids() {
431            orders
432        } else {
433            log::error!("No linked orders found");
434            return;
435        };
436
437        for client_order_id in linked_orders {
438            let mut contingent_order =
439                if let Some(order) = self.cache.borrow().order(client_order_id).cloned() {
440                    order
441                } else {
442                    panic!("Cannot find contingent order for client_order_id: {client_order_id}");
443                };
444
445            if !self.should_manage_order(&contingent_order)
446                || client_order_id == &order.client_order_id()
447            {
448                continue;
449            }
450
451            if contingent_order.is_closed() {
452                self.submit_order_commands.remove(&order.client_order_id());
453                continue;
454            }
455
456            match order.contingency_type() {
457                Some(ContingencyType::Oto) => {
458                    if order.is_closed()
459                        && filled_qty.raw == 0
460                        && (order.exec_spawn_id().is_none() || !is_spawn_active)
461                    {
462                        self.cancel_order(&contingent_order);
463                    } else if filled_qty.raw > 0 && filled_qty != contingent_order.quantity() {
464                        self.modify_order_quantity(&mut contingent_order, filled_qty);
465                    }
466                }
467                Some(ContingencyType::Oco) => {
468                    if order.is_closed() && (order.exec_spawn_id().is_none() || !is_spawn_active) {
469                        self.cancel_order(&contingent_order);
470                    }
471                }
472                Some(ContingencyType::Ouo) => {
473                    if (leaves_qty.raw == 0 && order.exec_spawn_id().is_some())
474                        || (order.is_closed()
475                            && (order.exec_spawn_id().is_none() || !is_spawn_active))
476                    {
477                        self.cancel_order(&contingent_order);
478                    } else if leaves_qty != contingent_order.leaves_qty() {
479                        self.modify_order_quantity(&mut contingent_order, leaves_qty);
480                    }
481                }
482                _ => {}
483            }
484        }
485    }
486
487    /// # Panics
488    ///
489    /// Panics if an OCO contingent order cannot be found for the given client order ID.
490    pub fn handle_contingencies_update(&mut self, order: OrderAny) {
491        let quantity = match order.exec_spawn_id() {
492            Some(exec_spawn_id) => {
493                if let Some(qty) = self
494                    .cache
495                    .borrow()
496                    .exec_spawn_total_quantity(&exec_spawn_id, true)
497                {
498                    qty
499                } else {
500                    log::error!("Failed to get spawn total quantity for {exec_spawn_id}");
501                    return;
502                }
503            }
504            None => order.quantity(),
505        };
506
507        if quantity.raw == 0 {
508            return;
509        }
510
511        let linked_orders = if let Some(orders) = order.linked_order_ids() {
512            orders
513        } else {
514            log::error!("No linked orders found for contingent order");
515            return;
516        };
517
518        for client_order_id in linked_orders {
519            let mut contingent_order = match self.cache.borrow().order(client_order_id).cloned() {
520                Some(contingent_order) => contingent_order,
521                None => panic!(
522                    "Cannot find OCO contingent order for client_order_id: {client_order_id}"
523                ),
524            };
525
526            if !self.should_manage_order(&contingent_order)
527                || client_order_id == &order.client_order_id()
528                || contingent_order.is_closed()
529            {
530                continue;
531            }
532
533            if let Some(contingency_type) = order.contingency_type() {
534                if matches!(
535                    contingency_type,
536                    ContingencyType::Oto | ContingencyType::Oco
537                ) && quantity != contingent_order.quantity()
538                {
539                    self.modify_order_quantity(&mut contingent_order, quantity);
540                }
541            }
542        }
543    }
544
545    pub fn handle_position_event(&mut self, _event: OrderEventAny) {
546        todo!()
547    }
548
549    // Message sending methods
550    pub fn send_emulator_command(&self, command: TradingCommand) {
551        log::info!("{CMD}{SEND} {command}");
552
553        msgbus::send_any("OrderEmulator.execute".into(), &command);
554    }
555
556    pub fn send_algo_command(&self, command: SubmitOrder, exec_algorithm_id: ExecAlgorithmId) {
557        log::info!("{CMD}{SEND} {command}");
558
559        let endpoint = format!("{exec_algorithm_id}.execute");
560        msgbus::send_any(endpoint.into(), &TradingCommand::SubmitOrder(command));
561    }
562
563    pub fn send_risk_command(&self, command: TradingCommand) {
564        log::info!("{CMD}{SEND} {command}");
565        msgbus::send_any("RiskEngine.execute".into(), &command);
566    }
567
568    pub fn send_exec_command(&self, command: TradingCommand) {
569        log::info!("{CMD}{SEND} {command}");
570        msgbus::send_any("ExecEngine.execute".into(), &command);
571    }
572
573    pub fn send_risk_event(&self, event: OrderEventAny) {
574        log::info!("{EVT}{SEND} {event}");
575        msgbus::send_any("RiskEngine.process".into(), &event);
576    }
577
578    pub fn send_exec_event(&self, event: OrderEventAny) {
579        log::info!("{EVT}{SEND} {event}");
580        msgbus::send_any("ExecEngine.process".into(), &event);
581    }
582}