nautilus_infrastructure/redis/
queries.rs

1// -------------------------------------------------------------------------------------------------
2//  Copyright (C) 2015-2025 Posei Systems Pty Ltd. All rights reserved.
3//  https://poseitrader.io
4//
5//  Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
6//  You may not use this file except in compliance with the License.
7//  You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
8//
9//  Unless required by applicable law or agreed to in writing, software
10//  distributed under the License is distributed on an "AS IS" BASIS,
11//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12//  See the License for the specific language governing permissions and
13//  limitations under the License.
14// -------------------------------------------------------------------------------------------------
15
16use std::{collections::HashMap, str::FromStr};
17
18use bytes::Bytes;
19use chrono::{DateTime, Utc};
20use futures::{StreamExt, future::join_all};
21use nautilus_common::{cache::database::CacheMap, enums::SerializationEncoding};
22use nautilus_model::{
23    accounts::AccountAny,
24    identifiers::{AccountId, ClientOrderId, InstrumentId, PositionId},
25    instruments::{InstrumentAny, SyntheticInstrument},
26    orders::OrderAny,
27    position::Position,
28    types::Currency,
29};
30use redis::{AsyncCommands, aio::ConnectionManager};
31use serde::{Serialize, de::DeserializeOwned};
32use serde_json::Value;
33use tokio::try_join;
34use ustr::Ustr;
35
// Collection keys: the leading segment of a cache key (the text before the first
// `REDIS_DELIMITER`). `DatabaseQueries::read` matches on these to choose how the value is read.
const INDEX: &str = "index";
const GENERAL: &str = "general";
const CURRENCIES: &str = "currencies";
const INSTRUMENTS: &str = "instruments";
const SYNTHETICS: &str = "synthetics";
const ACCOUNTS: &str = "accounts";
const ORDERS: &str = "orders";
const POSITIONS: &str = "positions";
const ACTORS: &str = "actors";
const STRATEGIES: &str = "strategies";
// Separator placed between the segments of a Redis key (trader key, collection, identifier).
const REDIS_DELIMITER: char = ':';
48
// Index keys: full `index:<name>` keys that `DatabaseQueries::read_index` matches against to
// decide whether the index is read as a set or as a hash.
const INDEX_ORDER_IDS: &str = "index:order_ids";
const INDEX_ORDER_POSITION: &str = "index:order_position";
const INDEX_ORDER_CLIENT: &str = "index:order_client";
const INDEX_ORDERS: &str = "index:orders";
const INDEX_ORDERS_OPEN: &str = "index:orders_open";
const INDEX_ORDERS_CLOSED: &str = "index:orders_closed";
const INDEX_ORDERS_EMULATED: &str = "index:orders_emulated";
const INDEX_ORDERS_INFLIGHT: &str = "index:orders_inflight";
const INDEX_POSITIONS: &str = "index:positions";
const INDEX_POSITIONS_OPEN: &str = "index:positions_open";
const INDEX_POSITIONS_CLOSED: &str = "index:positions_closed";
61
/// Stateless namespace for the Redis cache query helpers in this module.
///
/// The type carries no data; all functionality is exposed as associated functions that take
/// the Redis connection explicitly.
#[derive(Debug)]
pub struct DatabaseQueries;
64
65impl DatabaseQueries {
66    /// Serializes the given `payload` using the specified `encoding` to a byte vector.
67    ///
68    /// # Errors
69    ///
70    /// Returns an error if serialization to the chosen encoding fails.
71    pub fn serialize_payload<T: Serialize>(
72        encoding: SerializationEncoding,
73        payload: &T,
74    ) -> anyhow::Result<Vec<u8>> {
75        let mut value = serde_json::to_value(payload)?;
76        convert_timestamps(&mut value);
77        match encoding {
78            SerializationEncoding::MsgPack => rmp_serde::to_vec(&value)
79                .map_err(|e| anyhow::anyhow!("Failed to serialize msgpack `payload`: {e}")),
80            SerializationEncoding::Json => serde_json::to_vec(&value)
81                .map_err(|e| anyhow::anyhow!("Failed to serialize json `payload`: {e}")),
82        }
83    }
84
85    /// Deserializes the given byte slice `payload` into type `T` using the specified `encoding`.
86    ///
87    /// # Errors
88    ///
89    /// Returns an error if deserialization from the chosen encoding fails or converting to the target type fails.
90    pub fn deserialize_payload<T: DeserializeOwned>(
91        encoding: SerializationEncoding,
92        payload: &[u8],
93    ) -> anyhow::Result<T> {
94        let mut value = match encoding {
95            SerializationEncoding::MsgPack => rmp_serde::from_slice(payload)
96                .map_err(|e| anyhow::anyhow!("Failed to deserialize msgpack `payload`: {e}"))?,
97            SerializationEncoding::Json => serde_json::from_slice(payload)
98                .map_err(|e| anyhow::anyhow!("Failed to deserialize json `payload`: {e}"))?,
99        };
100
101        convert_timestamp_strings(&mut value);
102
103        serde_json::from_value(value)
104            .map_err(|e| anyhow::anyhow!("Failed to convert value to target type: {e}"))
105    }
106
107    /// Scans Redis for keys matching the given `pattern`.
108    ///
109    /// # Errors
110    ///
111    /// Returns an error if the Redis scan operation fails.
112    pub async fn scan_keys(
113        con: &mut ConnectionManager,
114        pattern: String,
115    ) -> anyhow::Result<Vec<String>> {
116        Ok(con
117            .scan_match::<String, String>(pattern)
118            .await?
119            .collect()
120            .await)
121    }
122
123    /// Reads raw byte payloads for `key` under `trader_key` from Redis.
124    ///
125    /// # Errors
126    ///
127    /// Returns an error if the underlying Redis read operation fails or if the collection is unsupported.
128    pub async fn read(
129        con: &ConnectionManager,
130        trader_key: &str,
131        key: &str,
132    ) -> anyhow::Result<Vec<Bytes>> {
133        let collection = Self::get_collection_key(key)?;
134        let key = format!("{trader_key}{REDIS_DELIMITER}{key}");
135        let mut con = con.clone();
136
137        match collection {
138            INDEX => Self::read_index(&mut con, &key).await,
139            GENERAL => Self::read_string(&mut con, &key).await,
140            CURRENCIES => Self::read_string(&mut con, &key).await,
141            INSTRUMENTS => Self::read_string(&mut con, &key).await,
142            SYNTHETICS => Self::read_string(&mut con, &key).await,
143            ACCOUNTS => Self::read_list(&mut con, &key).await,
144            ORDERS => Self::read_list(&mut con, &key).await,
145            POSITIONS => Self::read_list(&mut con, &key).await,
146            ACTORS => Self::read_string(&mut con, &key).await,
147            STRATEGIES => Self::read_string(&mut con, &key).await,
148            _ => anyhow::bail!("Unsupported operation: `read` for collection '{collection}'"),
149        }
150    }
151
152    /// Loads all cache data (currencies, instruments, synthetics, accounts, orders, positions) for `trader_key`.
153    ///
154    /// # Errors
155    ///
156    /// Returns an error if loading any of the individual caches fails or combining data fails.
157    pub async fn load_all(
158        con: &ConnectionManager,
159        encoding: SerializationEncoding,
160        trader_key: &str,
161    ) -> anyhow::Result<CacheMap> {
162        let (currencies, instruments, synthetics, accounts, orders, positions) = try_join!(
163            Self::load_currencies(con, trader_key, encoding),
164            Self::load_instruments(con, trader_key, encoding),
165            Self::load_synthetics(con, trader_key, encoding),
166            Self::load_accounts(con, trader_key, encoding),
167            Self::load_orders(con, trader_key, encoding),
168            Self::load_positions(con, trader_key, encoding)
169        )
170        .map_err(|e| anyhow::anyhow!("Error loading cache data: {e}"))?;
171
172        // For now, we don't load greeks and yield curves from the database
173        // This will be implemented in the future
174        let greeks = HashMap::new();
175        let yield_curves = HashMap::new();
176
177        Ok(CacheMap {
178            currencies,
179            instruments,
180            synthetics,
181            accounts,
182            orders,
183            positions,
184            greeks,
185            yield_curves,
186        })
187    }
188
189    /// Loads all currencies for `trader_key` using the specified `encoding`.
190    ///
191    /// # Errors
192    ///
193    /// Returns an error if scanning keys or reading currency data fails.
194    pub async fn load_currencies(
195        con: &ConnectionManager,
196        trader_key: &str,
197        encoding: SerializationEncoding,
198    ) -> anyhow::Result<HashMap<Ustr, Currency>> {
199        let mut currencies = HashMap::new();
200        let pattern = format!("{trader_key}{REDIS_DELIMITER}{CURRENCIES}*");
201        tracing::debug!("Loading {pattern}");
202
203        let mut con = con.clone();
204        let keys = Self::scan_keys(&mut con, pattern).await?;
205
206        let futures: Vec<_> = keys
207            .iter()
208            .map(|key| {
209                let con = con.clone();
210                async move {
211                    let currency_code = if let Some(code) = key.as_str().rsplit(':').next() {
212                        Ustr::from(code)
213                    } else {
214                        log::error!("Invalid key format: {key}");
215                        return None;
216                    };
217
218                    match Self::load_currency(&con, trader_key, &currency_code, encoding).await {
219                        Ok(Some(currency)) => Some((currency_code, currency)),
220                        Ok(None) => {
221                            log::error!("Currency not found: {currency_code}");
222                            None
223                        }
224                        Err(e) => {
225                            log::error!("Failed to load currency {currency_code}: {e}");
226                            None
227                        }
228                    }
229                }
230            })
231            .collect();
232
233        // Insert all Currency_code (key) and Currency (value) into the HashMap, filtering out None values.
234        currencies.extend(join_all(futures).await.into_iter().flatten());
235        tracing::debug!("Loaded {} currencies(s)", currencies.len());
236
237        Ok(currencies)
238    }
239
240    /// Loads all instruments for `trader_key` using the specified `encoding`.
241    ///
242    /// # Errors
243    ///
244    /// Returns an error if scanning keys or reading instrument data fails.
245    /// Loads all instruments for `trader_key` using the specified `encoding`.
246    ///
247    /// # Errors
248    ///
249    /// Returns an error if scanning keys or reading instrument data fails.
250    pub async fn load_instruments(
251        con: &ConnectionManager,
252        trader_key: &str,
253        encoding: SerializationEncoding,
254    ) -> anyhow::Result<HashMap<InstrumentId, InstrumentAny>> {
255        let mut instruments = HashMap::new();
256        let pattern = format!("{trader_key}{REDIS_DELIMITER}{INSTRUMENTS}*");
257        tracing::debug!("Loading {pattern}");
258
259        let mut con = con.clone();
260        let keys = Self::scan_keys(&mut con, pattern).await?;
261
262        let futures: Vec<_> = keys
263            .iter()
264            .map(|key| {
265                let con = con.clone();
266                async move {
267                    let instrument_id = key
268                        .as_str()
269                        .rsplit(':')
270                        .next()
271                        .ok_or_else(|| {
272                            log::error!("Invalid key format: {key}");
273                            "Invalid key format"
274                        })
275                        .and_then(|code| {
276                            InstrumentId::from_str(code).map_err(|e| {
277                                log::error!("Failed to convert to InstrumentId for {key}: {e}");
278                                "Invalid instrument ID"
279                            })
280                        });
281
282                    let instrument_id = match instrument_id {
283                        Ok(id) => id,
284                        Err(_) => return None,
285                    };
286
287                    match Self::load_instrument(&con, trader_key, &instrument_id, encoding).await {
288                        Ok(Some(instrument)) => Some((instrument_id, instrument)),
289                        Ok(None) => {
290                            log::error!("Instrument not found: {instrument_id}");
291                            None
292                        }
293                        Err(e) => {
294                            log::error!("Failed to load instrument {instrument_id}: {e}");
295                            None
296                        }
297                    }
298                }
299            })
300            .collect();
301
302        // Insert all Instrument_id (key) and Instrument (value) into the HashMap, filtering out None values.
303        instruments.extend(join_all(futures).await.into_iter().flatten());
304        tracing::debug!("Loaded {} instruments(s)", instruments.len());
305
306        Ok(instruments)
307    }
308
309    /// Loads all synthetic instruments for `trader_key` using the specified `encoding`.
310    ///
311    /// # Errors
312    ///
313    /// Returns an error if scanning keys or reading synthetic instrument data fails.
314    /// Loads all synthetic instruments for `trader_key` using the specified `encoding`.
315    ///
316    /// # Errors
317    ///
318    /// Returns an error if scanning keys or reading synthetic instrument data fails.
319    pub async fn load_synthetics(
320        con: &ConnectionManager,
321        trader_key: &str,
322        encoding: SerializationEncoding,
323    ) -> anyhow::Result<HashMap<InstrumentId, SyntheticInstrument>> {
324        let mut synthetics = HashMap::new();
325        let pattern = format!("{trader_key}{REDIS_DELIMITER}{SYNTHETICS}*");
326        tracing::debug!("Loading {pattern}");
327
328        let mut con = con.clone();
329        let keys = Self::scan_keys(&mut con, pattern).await?;
330
331        let futures: Vec<_> = keys
332            .iter()
333            .map(|key| {
334                let con = con.clone();
335                async move {
336                    let instrument_id = key
337                        .as_str()
338                        .rsplit(':')
339                        .next()
340                        .ok_or_else(|| {
341                            log::error!("Invalid key format: {key}");
342                            "Invalid key format"
343                        })
344                        .and_then(|code| {
345                            InstrumentId::from_str(code).map_err(|e| {
346                                log::error!("Failed to parse InstrumentId for {key}: {e}");
347                                "Invalid instrument ID"
348                            })
349                        });
350
351                    let instrument_id = match instrument_id {
352                        Ok(id) => id,
353                        Err(_) => return None,
354                    };
355
356                    match Self::load_synthetic(&con, trader_key, &instrument_id, encoding).await {
357                        Ok(Some(synthetic)) => Some((instrument_id, synthetic)),
358                        Ok(None) => {
359                            log::error!("Synthetic not found: {instrument_id}");
360                            None
361                        }
362                        Err(e) => {
363                            log::error!("Failed to load synthetic {instrument_id}: {e}");
364                            None
365                        }
366                    }
367                }
368            })
369            .collect();
370
371        // Insert all Instrument_id (key) and Synthetic (value) into the HashMap, filtering out None values.
372        synthetics.extend(join_all(futures).await.into_iter().flatten());
373        tracing::debug!("Loaded {} synthetics(s)", synthetics.len());
374
375        Ok(synthetics)
376    }
377
378    /// Loads all accounts for `trader_key` using the specified `encoding`.
379    ///
380    /// # Errors
381    ///
382    /// Returns an error if scanning keys or reading account data fails.
383    /// Loads all accounts for `trader_key` using the specified `encoding`.
384    ///
385    /// # Errors
386    ///
387    /// Returns an error if scanning keys or reading account data fails.
388    pub async fn load_accounts(
389        con: &ConnectionManager,
390        trader_key: &str,
391        encoding: SerializationEncoding,
392    ) -> anyhow::Result<HashMap<AccountId, AccountAny>> {
393        let mut accounts = HashMap::new();
394        let pattern = format!("{trader_key}{REDIS_DELIMITER}{ACCOUNTS}*");
395        tracing::debug!("Loading {pattern}");
396
397        let mut con = con.clone();
398        let keys = Self::scan_keys(&mut con, pattern).await?;
399
400        let futures: Vec<_> = keys
401            .iter()
402            .map(|key| {
403                let con = con.clone();
404                async move {
405                    let account_id = if let Some(code) = key.as_str().rsplit(':').next() {
406                        AccountId::from(code)
407                    } else {
408                        log::error!("Invalid key format: {key}");
409                        return None;
410                    };
411
412                    match Self::load_account(&con, trader_key, &account_id, encoding).await {
413                        Ok(Some(account)) => Some((account_id, account)),
414                        Ok(None) => {
415                            log::error!("Account not found: {account_id}");
416                            None
417                        }
418                        Err(e) => {
419                            log::error!("Failed to load account {account_id}: {e}");
420                            None
421                        }
422                    }
423                }
424            })
425            .collect();
426
427        // Insert all Account_id (key) and Account (value) into the HashMap, filtering out None values.
428        accounts.extend(join_all(futures).await.into_iter().flatten());
429        tracing::debug!("Loaded {} accounts(s)", accounts.len());
430
431        Ok(accounts)
432    }
433
434    /// Loads all orders for `trader_key` using the specified `encoding`.
435    ///
436    /// # Errors
437    ///
438    /// Returns an error if scanning keys or reading order data fails.
439    /// Loads all orders for `trader_key` using the specified `encoding`.
440    ///
441    /// # Errors
442    ///
443    /// Returns an error if scanning keys or reading order data fails.
444    pub async fn load_orders(
445        con: &ConnectionManager,
446        trader_key: &str,
447        encoding: SerializationEncoding,
448    ) -> anyhow::Result<HashMap<ClientOrderId, OrderAny>> {
449        let mut orders = HashMap::new();
450        let pattern = format!("{trader_key}{REDIS_DELIMITER}{ORDERS}*");
451        tracing::debug!("Loading {pattern}");
452
453        let mut con = con.clone();
454        let keys = Self::scan_keys(&mut con, pattern).await?;
455
456        let futures: Vec<_> = keys
457            .iter()
458            .map(|key| {
459                let con = con.clone();
460                async move {
461                    let client_order_id = if let Some(code) = key.as_str().rsplit(':').next() {
462                        ClientOrderId::from(code)
463                    } else {
464                        log::error!("Invalid key format: {key}");
465                        return None;
466                    };
467
468                    match Self::load_order(&con, trader_key, &client_order_id, encoding).await {
469                        Ok(Some(order)) => Some((client_order_id, order)),
470                        Ok(None) => {
471                            log::error!("Order not found: {client_order_id}");
472                            None
473                        }
474                        Err(e) => {
475                            log::error!("Failed to load order {client_order_id}: {e}");
476                            None
477                        }
478                    }
479                }
480            })
481            .collect();
482
483        // Insert all Client-Order-Id (key) and Order (value) into the HashMap, filtering out None values.
484        orders.extend(join_all(futures).await.into_iter().flatten());
485        tracing::debug!("Loaded {} order(s)", orders.len());
486
487        Ok(orders)
488    }
489
490    /// Loads all positions for `trader_key` using the specified `encoding`.
491    ///
492    /// # Errors
493    ///
494    /// Returns an error if scanning keys or reading position data fails.
495    /// Loads all positions for `trader_key` using the specified `encoding`.
496    ///
497    /// # Errors
498    ///
499    /// Returns an error if scanning keys or reading position data fails.
500    pub async fn load_positions(
501        con: &ConnectionManager,
502        trader_key: &str,
503        encoding: SerializationEncoding,
504    ) -> anyhow::Result<HashMap<PositionId, Position>> {
505        let mut positions = HashMap::new();
506        let pattern = format!("{trader_key}{REDIS_DELIMITER}{POSITIONS}*");
507        tracing::debug!("Loading {pattern}");
508
509        let mut con = con.clone();
510        let keys = Self::scan_keys(&mut con, pattern).await?;
511
512        let futures: Vec<_> = keys
513            .iter()
514            .map(|key| {
515                let con = con.clone();
516                async move {
517                    let position_id = if let Some(code) = key.as_str().rsplit(':').next() {
518                        PositionId::from(code)
519                    } else {
520                        log::error!("Invalid key format: {key}");
521                        return None;
522                    };
523
524                    match Self::load_position(&con, trader_key, &position_id, encoding).await {
525                        Ok(Some(position)) => Some((position_id, position)),
526                        Ok(None) => {
527                            log::error!("Position not found: {position_id}");
528                            None
529                        }
530                        Err(e) => {
531                            log::error!("Failed to load position {position_id}: {e}");
532                            None
533                        }
534                    }
535                }
536            })
537            .collect();
538
539        // Insert all Position_id (key) and Position (value) into the HashMap, filtering out None values.
540        positions.extend(join_all(futures).await.into_iter().flatten());
541        tracing::debug!("Loaded {} position(s)", positions.len());
542
543        Ok(positions)
544    }
545
546    /// Loads a single currency for `trader_key` and `code` using the specified `encoding`.
547    ///
548    /// # Errors
549    ///
550    /// Returns an error if the underlying read or deserialization fails.
551    pub async fn load_currency(
552        con: &ConnectionManager,
553        trader_key: &str,
554        code: &Ustr,
555        encoding: SerializationEncoding,
556    ) -> anyhow::Result<Option<Currency>> {
557        let key = format!("{CURRENCIES}{REDIS_DELIMITER}{code}");
558        let result = Self::read(con, trader_key, &key).await?;
559
560        if result.is_empty() {
561            return Ok(None);
562        }
563
564        let currency = Self::deserialize_payload(encoding, &result[0])?;
565        Ok(currency)
566    }
567
568    /// Loads a single instrument for `trader_key` and `instrument_id` using the specified `encoding`.
569    ///
570    /// # Errors
571    ///
572    /// Returns an error if the underlying read or deserialization fails.
573    pub async fn load_instrument(
574        con: &ConnectionManager,
575        trader_key: &str,
576        instrument_id: &InstrumentId,
577        encoding: SerializationEncoding,
578    ) -> anyhow::Result<Option<InstrumentAny>> {
579        let key = format!("{INSTRUMENTS}{REDIS_DELIMITER}{instrument_id}");
580        let result = Self::read(con, trader_key, &key).await?;
581        if result.is_empty() {
582            return Ok(None);
583        }
584
585        let instrument: InstrumentAny = Self::deserialize_payload(encoding, &result[0])?;
586        Ok(Some(instrument))
587    }
588
589    /// Loads a single synthetic instrument for `trader_key` and `instrument_id` using the specified `encoding`.
590    ///
591    /// # Errors
592    ///
593    /// Returns an error if the underlying read or deserialization fails.
594    pub async fn load_synthetic(
595        con: &ConnectionManager,
596        trader_key: &str,
597        instrument_id: &InstrumentId,
598        encoding: SerializationEncoding,
599    ) -> anyhow::Result<Option<SyntheticInstrument>> {
600        let key = format!("{SYNTHETICS}{REDIS_DELIMITER}{instrument_id}");
601        let result = Self::read(con, trader_key, &key).await?;
602        if result.is_empty() {
603            return Ok(None);
604        }
605
606        let synthetic: SyntheticInstrument = Self::deserialize_payload(encoding, &result[0])?;
607        Ok(Some(synthetic))
608    }
609
610    /// Loads a single account for `trader_key` and `account_id` using the specified `encoding`.
611    ///
612    /// # Errors
613    ///
614    /// Returns an error if the underlying read or deserialization fails.
615    pub async fn load_account(
616        con: &ConnectionManager,
617        trader_key: &str,
618        account_id: &AccountId,
619        encoding: SerializationEncoding,
620    ) -> anyhow::Result<Option<AccountAny>> {
621        let key = format!("{ACCOUNTS}{REDIS_DELIMITER}{account_id}");
622        let result = Self::read(con, trader_key, &key).await?;
623        if result.is_empty() {
624            return Ok(None);
625        }
626
627        let account: AccountAny = Self::deserialize_payload(encoding, &result[0])?;
628        Ok(Some(account))
629    }
630
631    /// Loads a single order for `trader_key` and `client_order_id` using the specified `encoding`.
632    ///
633    /// # Errors
634    ///
635    /// Returns an error if the underlying read or deserialization fails.
636    pub async fn load_order(
637        con: &ConnectionManager,
638        trader_key: &str,
639        client_order_id: &ClientOrderId,
640        encoding: SerializationEncoding,
641    ) -> anyhow::Result<Option<OrderAny>> {
642        let key = format!("{ORDERS}{REDIS_DELIMITER}{client_order_id}");
643        let result = Self::read(con, trader_key, &key).await?;
644        if result.is_empty() {
645            return Ok(None);
646        }
647
648        let order: OrderAny = Self::deserialize_payload(encoding, &result[0])?;
649        Ok(Some(order))
650    }
651
652    /// Loads a single position for `trader_key` and `position_id` using the specified `encoding`.
653    ///
654    /// # Errors
655    ///
656    /// Returns an error if the underlying read or deserialization fails.
657    pub async fn load_position(
658        con: &ConnectionManager,
659        trader_key: &str,
660        position_id: &PositionId,
661        encoding: SerializationEncoding,
662    ) -> anyhow::Result<Option<Position>> {
663        let key = format!("{POSITIONS}{REDIS_DELIMITER}{position_id}");
664        let result = Self::read(con, trader_key, &key).await?;
665        if result.is_empty() {
666            return Ok(None);
667        }
668
669        let position: Position = Self::deserialize_payload(encoding, &result[0])?;
670        Ok(Some(position))
671    }
672
673    fn get_collection_key(key: &str) -> anyhow::Result<&str> {
674        key.split_once(REDIS_DELIMITER)
675            .map(|(collection, _)| collection)
676            .ok_or_else(|| {
677                anyhow::anyhow!("Invalid `key`, missing a '{REDIS_DELIMITER}' delimiter, was {key}")
678            })
679    }
680
681    async fn read_index(conn: &mut ConnectionManager, key: &str) -> anyhow::Result<Vec<Bytes>> {
682        let index_key = Self::get_index_key(key)?;
683        match index_key {
684            INDEX_ORDER_IDS => Self::read_set(conn, key).await,
685            INDEX_ORDER_POSITION => Self::read_hset(conn, key).await,
686            INDEX_ORDER_CLIENT => Self::read_hset(conn, key).await,
687            INDEX_ORDERS => Self::read_set(conn, key).await,
688            INDEX_ORDERS_OPEN => Self::read_set(conn, key).await,
689            INDEX_ORDERS_CLOSED => Self::read_set(conn, key).await,
690            INDEX_ORDERS_EMULATED => Self::read_set(conn, key).await,
691            INDEX_ORDERS_INFLIGHT => Self::read_set(conn, key).await,
692            INDEX_POSITIONS => Self::read_set(conn, key).await,
693            INDEX_POSITIONS_OPEN => Self::read_set(conn, key).await,
694            INDEX_POSITIONS_CLOSED => Self::read_set(conn, key).await,
695            _ => anyhow::bail!("Index unknown '{index_key}' on read"),
696        }
697    }
698
699    async fn read_string(conn: &mut ConnectionManager, key: &str) -> anyhow::Result<Vec<Bytes>> {
700        let result: Vec<u8> = conn.get(key).await?;
701
702        if result.is_empty() {
703            Ok(vec![])
704        } else {
705            Ok(vec![Bytes::from(result)])
706        }
707    }
708
709    async fn read_set(conn: &mut ConnectionManager, key: &str) -> anyhow::Result<Vec<Bytes>> {
710        let result: Vec<Bytes> = conn.smembers(key).await?;
711        Ok(result)
712    }
713
714    async fn read_hset(conn: &mut ConnectionManager, key: &str) -> anyhow::Result<Vec<Bytes>> {
715        let result: HashMap<String, String> = conn.hgetall(key).await?;
716        let json = serde_json::to_string(&result)?;
717        Ok(vec![Bytes::from(json.into_bytes())])
718    }
719
720    async fn read_list(conn: &mut ConnectionManager, key: &str) -> anyhow::Result<Vec<Bytes>> {
721        let result: Vec<Bytes> = conn.lrange(key, 0, -1).await?;
722        Ok(result)
723    }
724
725    fn get_index_key(key: &str) -> anyhow::Result<&str> {
726        key.split_once(REDIS_DELIMITER)
727            .map(|(_, index_key)| index_key)
728            .ok_or_else(|| {
729                anyhow::anyhow!("Invalid `key`, missing a '{REDIS_DELIMITER}' delimiter, was {key}")
730            })
731    }
732}
733
/// Returns `true` when `key` names a timestamp field: either exactly `expire_time_ns` or any
/// key starting with `ts_`.
fn is_timestamp_field(key: &str) -> bool {
    key.starts_with("ts_") || key == "expire_time_ns"
}
739
740fn convert_timestamps(value: &mut Value) {
741    match value {
742        Value::Object(map) => {
743            for (key, v) in map {
744                if is_timestamp_field(key) {
745                    if let Value::Number(n) = v {
746                        if let Some(n) = n.as_u64() {
747                            let dt = DateTime::<Utc>::from_timestamp_nanos(n as i64);
748                            *v = Value::String(
749                                dt.to_rfc3339_opts(chrono::SecondsFormat::Nanos, true),
750                            );
751                        }
752                    }
753                }
754                convert_timestamps(v);
755            }
756        }
757        Value::Array(arr) => {
758            for item in arr {
759                convert_timestamps(item);
760            }
761        }
762        _ => {}
763    }
764}
765
766fn convert_timestamp_strings(value: &mut Value) {
767    match value {
768        Value::Object(map) => {
769            for (key, v) in map {
770                if is_timestamp_field(key) {
771                    if let Value::String(s) = v {
772                        if let Ok(dt) = DateTime::parse_from_rfc3339(s) {
773                            *v = Value::Number(
774                                (dt.with_timezone(&Utc)
775                                    .timestamp_nanos_opt()
776                                    .expect("Invalid DateTime")
777                                    as u64)
778                                    .into(),
779                            );
780                        }
781                    }
782                }
783                convert_timestamp_strings(v);
784            }
785        }
786        Value::Array(arr) => {
787            for item in arr {
788                convert_timestamp_strings(item);
789            }
790        }
791        _ => {}
792    }
793}