1use std::{collections::HashMap, str::FromStr};
17
18use bytes::Bytes;
19use chrono::{DateTime, Utc};
20use futures::{StreamExt, future::join_all};
21use nautilus_common::{cache::database::CacheMap, enums::SerializationEncoding};
22use nautilus_model::{
23 accounts::AccountAny,
24 identifiers::{AccountId, ClientOrderId, InstrumentId, PositionId},
25 instruments::{InstrumentAny, SyntheticInstrument},
26 orders::OrderAny,
27 position::Position,
28 types::Currency,
29};
30use redis::{AsyncCommands, aio::ConnectionManager};
31use serde::{Serialize, de::DeserializeOwned};
32use serde_json::Value;
33use tokio::try_join;
34use ustr::Ustr;
35
// Collection names. `DatabaseQueries::read` takes the text before the first
// delimiter of the key it is given and matches it against these to choose the
// Redis read operation (string, list, or index lookup).
const INDEX: &str = "index";
const GENERAL: &str = "general";
const CURRENCIES: &str = "currencies";
const INSTRUMENTS: &str = "instruments";
const SYNTHETICS: &str = "synthetics";
const ACCOUNTS: &str = "accounts";
const ORDERS: &str = "orders";
const POSITIONS: &str = "positions";
const ACTORS: &str = "actors";
const STRATEGIES: &str = "strategies";
// Separator placed between key segments (trader key, collection, identifier).
const REDIS_DELIMITER: char = ':';

// Index keys. `DatabaseQueries::read_index` matches against these to decide
// whether an index is read as a set or as a hash.
const INDEX_ORDER_IDS: &str = "index:order_ids";
const INDEX_ORDER_POSITION: &str = "index:order_position";
const INDEX_ORDER_CLIENT: &str = "index:order_client";
const INDEX_ORDERS: &str = "index:orders";
const INDEX_ORDERS_OPEN: &str = "index:orders_open";
const INDEX_ORDERS_CLOSED: &str = "index:orders_closed";
const INDEX_ORDERS_EMULATED: &str = "index:orders_emulated";
const INDEX_ORDERS_INFLIGHT: &str = "index:orders_inflight";
const INDEX_POSITIONS: &str = "index:positions";
const INDEX_POSITIONS_OPEN: &str = "index:positions_open";
const INDEX_POSITIONS_CLOSED: &str = "index:positions_closed";
61
/// Stateless namespace for the Redis cache query helpers.
///
/// The type has no fields; everything it offers is an associated function
/// that takes the connection and key material as arguments.
#[derive(Debug)]
pub struct DatabaseQueries;
64
65impl DatabaseQueries {
66 pub fn serialize_payload<T: Serialize>(
72 encoding: SerializationEncoding,
73 payload: &T,
74 ) -> anyhow::Result<Vec<u8>> {
75 let mut value = serde_json::to_value(payload)?;
76 convert_timestamps(&mut value);
77 match encoding {
78 SerializationEncoding::MsgPack => rmp_serde::to_vec(&value)
79 .map_err(|e| anyhow::anyhow!("Failed to serialize msgpack `payload`: {e}")),
80 SerializationEncoding::Json => serde_json::to_vec(&value)
81 .map_err(|e| anyhow::anyhow!("Failed to serialize json `payload`: {e}")),
82 }
83 }
84
85 pub fn deserialize_payload<T: DeserializeOwned>(
91 encoding: SerializationEncoding,
92 payload: &[u8],
93 ) -> anyhow::Result<T> {
94 let mut value = match encoding {
95 SerializationEncoding::MsgPack => rmp_serde::from_slice(payload)
96 .map_err(|e| anyhow::anyhow!("Failed to deserialize msgpack `payload`: {e}"))?,
97 SerializationEncoding::Json => serde_json::from_slice(payload)
98 .map_err(|e| anyhow::anyhow!("Failed to deserialize json `payload`: {e}"))?,
99 };
100
101 convert_timestamp_strings(&mut value);
102
103 serde_json::from_value(value)
104 .map_err(|e| anyhow::anyhow!("Failed to convert value to target type: {e}"))
105 }
106
107 pub async fn scan_keys(
113 con: &mut ConnectionManager,
114 pattern: String,
115 ) -> anyhow::Result<Vec<String>> {
116 Ok(con
117 .scan_match::<String, String>(pattern)
118 .await?
119 .collect()
120 .await)
121 }
122
123 pub async fn read(
129 con: &ConnectionManager,
130 trader_key: &str,
131 key: &str,
132 ) -> anyhow::Result<Vec<Bytes>> {
133 let collection = Self::get_collection_key(key)?;
134 let key = format!("{trader_key}{REDIS_DELIMITER}{key}");
135 let mut con = con.clone();
136
137 match collection {
138 INDEX => Self::read_index(&mut con, &key).await,
139 GENERAL => Self::read_string(&mut con, &key).await,
140 CURRENCIES => Self::read_string(&mut con, &key).await,
141 INSTRUMENTS => Self::read_string(&mut con, &key).await,
142 SYNTHETICS => Self::read_string(&mut con, &key).await,
143 ACCOUNTS => Self::read_list(&mut con, &key).await,
144 ORDERS => Self::read_list(&mut con, &key).await,
145 POSITIONS => Self::read_list(&mut con, &key).await,
146 ACTORS => Self::read_string(&mut con, &key).await,
147 STRATEGIES => Self::read_string(&mut con, &key).await,
148 _ => anyhow::bail!("Unsupported operation: `read` for collection '{collection}'"),
149 }
150 }
151
152 pub async fn load_all(
158 con: &ConnectionManager,
159 encoding: SerializationEncoding,
160 trader_key: &str,
161 ) -> anyhow::Result<CacheMap> {
162 let (currencies, instruments, synthetics, accounts, orders, positions) = try_join!(
163 Self::load_currencies(con, trader_key, encoding),
164 Self::load_instruments(con, trader_key, encoding),
165 Self::load_synthetics(con, trader_key, encoding),
166 Self::load_accounts(con, trader_key, encoding),
167 Self::load_orders(con, trader_key, encoding),
168 Self::load_positions(con, trader_key, encoding)
169 )
170 .map_err(|e| anyhow::anyhow!("Error loading cache data: {e}"))?;
171
172 let greeks = HashMap::new();
175 let yield_curves = HashMap::new();
176
177 Ok(CacheMap {
178 currencies,
179 instruments,
180 synthetics,
181 accounts,
182 orders,
183 positions,
184 greeks,
185 yield_curves,
186 })
187 }
188
189 pub async fn load_currencies(
195 con: &ConnectionManager,
196 trader_key: &str,
197 encoding: SerializationEncoding,
198 ) -> anyhow::Result<HashMap<Ustr, Currency>> {
199 let mut currencies = HashMap::new();
200 let pattern = format!("{trader_key}{REDIS_DELIMITER}{CURRENCIES}*");
201 tracing::debug!("Loading {pattern}");
202
203 let mut con = con.clone();
204 let keys = Self::scan_keys(&mut con, pattern).await?;
205
206 let futures: Vec<_> = keys
207 .iter()
208 .map(|key| {
209 let con = con.clone();
210 async move {
211 let currency_code = if let Some(code) = key.as_str().rsplit(':').next() {
212 Ustr::from(code)
213 } else {
214 log::error!("Invalid key format: {key}");
215 return None;
216 };
217
218 match Self::load_currency(&con, trader_key, ¤cy_code, encoding).await {
219 Ok(Some(currency)) => Some((currency_code, currency)),
220 Ok(None) => {
221 log::error!("Currency not found: {currency_code}");
222 None
223 }
224 Err(e) => {
225 log::error!("Failed to load currency {currency_code}: {e}");
226 None
227 }
228 }
229 }
230 })
231 .collect();
232
233 currencies.extend(join_all(futures).await.into_iter().flatten());
235 tracing::debug!("Loaded {} currencies(s)", currencies.len());
236
237 Ok(currencies)
238 }
239
240 pub async fn load_instruments(
251 con: &ConnectionManager,
252 trader_key: &str,
253 encoding: SerializationEncoding,
254 ) -> anyhow::Result<HashMap<InstrumentId, InstrumentAny>> {
255 let mut instruments = HashMap::new();
256 let pattern = format!("{trader_key}{REDIS_DELIMITER}{INSTRUMENTS}*");
257 tracing::debug!("Loading {pattern}");
258
259 let mut con = con.clone();
260 let keys = Self::scan_keys(&mut con, pattern).await?;
261
262 let futures: Vec<_> = keys
263 .iter()
264 .map(|key| {
265 let con = con.clone();
266 async move {
267 let instrument_id = key
268 .as_str()
269 .rsplit(':')
270 .next()
271 .ok_or_else(|| {
272 log::error!("Invalid key format: {key}");
273 "Invalid key format"
274 })
275 .and_then(|code| {
276 InstrumentId::from_str(code).map_err(|e| {
277 log::error!("Failed to convert to InstrumentId for {key}: {e}");
278 "Invalid instrument ID"
279 })
280 });
281
282 let instrument_id = match instrument_id {
283 Ok(id) => id,
284 Err(_) => return None,
285 };
286
287 match Self::load_instrument(&con, trader_key, &instrument_id, encoding).await {
288 Ok(Some(instrument)) => Some((instrument_id, instrument)),
289 Ok(None) => {
290 log::error!("Instrument not found: {instrument_id}");
291 None
292 }
293 Err(e) => {
294 log::error!("Failed to load instrument {instrument_id}: {e}");
295 None
296 }
297 }
298 }
299 })
300 .collect();
301
302 instruments.extend(join_all(futures).await.into_iter().flatten());
304 tracing::debug!("Loaded {} instruments(s)", instruments.len());
305
306 Ok(instruments)
307 }
308
309 pub async fn load_synthetics(
320 con: &ConnectionManager,
321 trader_key: &str,
322 encoding: SerializationEncoding,
323 ) -> anyhow::Result<HashMap<InstrumentId, SyntheticInstrument>> {
324 let mut synthetics = HashMap::new();
325 let pattern = format!("{trader_key}{REDIS_DELIMITER}{SYNTHETICS}*");
326 tracing::debug!("Loading {pattern}");
327
328 let mut con = con.clone();
329 let keys = Self::scan_keys(&mut con, pattern).await?;
330
331 let futures: Vec<_> = keys
332 .iter()
333 .map(|key| {
334 let con = con.clone();
335 async move {
336 let instrument_id = key
337 .as_str()
338 .rsplit(':')
339 .next()
340 .ok_or_else(|| {
341 log::error!("Invalid key format: {key}");
342 "Invalid key format"
343 })
344 .and_then(|code| {
345 InstrumentId::from_str(code).map_err(|e| {
346 log::error!("Failed to parse InstrumentId for {key}: {e}");
347 "Invalid instrument ID"
348 })
349 });
350
351 let instrument_id = match instrument_id {
352 Ok(id) => id,
353 Err(_) => return None,
354 };
355
356 match Self::load_synthetic(&con, trader_key, &instrument_id, encoding).await {
357 Ok(Some(synthetic)) => Some((instrument_id, synthetic)),
358 Ok(None) => {
359 log::error!("Synthetic not found: {instrument_id}");
360 None
361 }
362 Err(e) => {
363 log::error!("Failed to load synthetic {instrument_id}: {e}");
364 None
365 }
366 }
367 }
368 })
369 .collect();
370
371 synthetics.extend(join_all(futures).await.into_iter().flatten());
373 tracing::debug!("Loaded {} synthetics(s)", synthetics.len());
374
375 Ok(synthetics)
376 }
377
378 pub async fn load_accounts(
389 con: &ConnectionManager,
390 trader_key: &str,
391 encoding: SerializationEncoding,
392 ) -> anyhow::Result<HashMap<AccountId, AccountAny>> {
393 let mut accounts = HashMap::new();
394 let pattern = format!("{trader_key}{REDIS_DELIMITER}{ACCOUNTS}*");
395 tracing::debug!("Loading {pattern}");
396
397 let mut con = con.clone();
398 let keys = Self::scan_keys(&mut con, pattern).await?;
399
400 let futures: Vec<_> = keys
401 .iter()
402 .map(|key| {
403 let con = con.clone();
404 async move {
405 let account_id = if let Some(code) = key.as_str().rsplit(':').next() {
406 AccountId::from(code)
407 } else {
408 log::error!("Invalid key format: {key}");
409 return None;
410 };
411
412 match Self::load_account(&con, trader_key, &account_id, encoding).await {
413 Ok(Some(account)) => Some((account_id, account)),
414 Ok(None) => {
415 log::error!("Account not found: {account_id}");
416 None
417 }
418 Err(e) => {
419 log::error!("Failed to load account {account_id}: {e}");
420 None
421 }
422 }
423 }
424 })
425 .collect();
426
427 accounts.extend(join_all(futures).await.into_iter().flatten());
429 tracing::debug!("Loaded {} accounts(s)", accounts.len());
430
431 Ok(accounts)
432 }
433
434 pub async fn load_orders(
445 con: &ConnectionManager,
446 trader_key: &str,
447 encoding: SerializationEncoding,
448 ) -> anyhow::Result<HashMap<ClientOrderId, OrderAny>> {
449 let mut orders = HashMap::new();
450 let pattern = format!("{trader_key}{REDIS_DELIMITER}{ORDERS}*");
451 tracing::debug!("Loading {pattern}");
452
453 let mut con = con.clone();
454 let keys = Self::scan_keys(&mut con, pattern).await?;
455
456 let futures: Vec<_> = keys
457 .iter()
458 .map(|key| {
459 let con = con.clone();
460 async move {
461 let client_order_id = if let Some(code) = key.as_str().rsplit(':').next() {
462 ClientOrderId::from(code)
463 } else {
464 log::error!("Invalid key format: {key}");
465 return None;
466 };
467
468 match Self::load_order(&con, trader_key, &client_order_id, encoding).await {
469 Ok(Some(order)) => Some((client_order_id, order)),
470 Ok(None) => {
471 log::error!("Order not found: {client_order_id}");
472 None
473 }
474 Err(e) => {
475 log::error!("Failed to load order {client_order_id}: {e}");
476 None
477 }
478 }
479 }
480 })
481 .collect();
482
483 orders.extend(join_all(futures).await.into_iter().flatten());
485 tracing::debug!("Loaded {} order(s)", orders.len());
486
487 Ok(orders)
488 }
489
490 pub async fn load_positions(
501 con: &ConnectionManager,
502 trader_key: &str,
503 encoding: SerializationEncoding,
504 ) -> anyhow::Result<HashMap<PositionId, Position>> {
505 let mut positions = HashMap::new();
506 let pattern = format!("{trader_key}{REDIS_DELIMITER}{POSITIONS}*");
507 tracing::debug!("Loading {pattern}");
508
509 let mut con = con.clone();
510 let keys = Self::scan_keys(&mut con, pattern).await?;
511
512 let futures: Vec<_> = keys
513 .iter()
514 .map(|key| {
515 let con = con.clone();
516 async move {
517 let position_id = if let Some(code) = key.as_str().rsplit(':').next() {
518 PositionId::from(code)
519 } else {
520 log::error!("Invalid key format: {key}");
521 return None;
522 };
523
524 match Self::load_position(&con, trader_key, &position_id, encoding).await {
525 Ok(Some(position)) => Some((position_id, position)),
526 Ok(None) => {
527 log::error!("Position not found: {position_id}");
528 None
529 }
530 Err(e) => {
531 log::error!("Failed to load position {position_id}: {e}");
532 None
533 }
534 }
535 }
536 })
537 .collect();
538
539 positions.extend(join_all(futures).await.into_iter().flatten());
541 tracing::debug!("Loaded {} position(s)", positions.len());
542
543 Ok(positions)
544 }
545
546 pub async fn load_currency(
552 con: &ConnectionManager,
553 trader_key: &str,
554 code: &Ustr,
555 encoding: SerializationEncoding,
556 ) -> anyhow::Result<Option<Currency>> {
557 let key = format!("{CURRENCIES}{REDIS_DELIMITER}{code}");
558 let result = Self::read(con, trader_key, &key).await?;
559
560 if result.is_empty() {
561 return Ok(None);
562 }
563
564 let currency = Self::deserialize_payload(encoding, &result[0])?;
565 Ok(currency)
566 }
567
568 pub async fn load_instrument(
574 con: &ConnectionManager,
575 trader_key: &str,
576 instrument_id: &InstrumentId,
577 encoding: SerializationEncoding,
578 ) -> anyhow::Result<Option<InstrumentAny>> {
579 let key = format!("{INSTRUMENTS}{REDIS_DELIMITER}{instrument_id}");
580 let result = Self::read(con, trader_key, &key).await?;
581 if result.is_empty() {
582 return Ok(None);
583 }
584
585 let instrument: InstrumentAny = Self::deserialize_payload(encoding, &result[0])?;
586 Ok(Some(instrument))
587 }
588
589 pub async fn load_synthetic(
595 con: &ConnectionManager,
596 trader_key: &str,
597 instrument_id: &InstrumentId,
598 encoding: SerializationEncoding,
599 ) -> anyhow::Result<Option<SyntheticInstrument>> {
600 let key = format!("{SYNTHETICS}{REDIS_DELIMITER}{instrument_id}");
601 let result = Self::read(con, trader_key, &key).await?;
602 if result.is_empty() {
603 return Ok(None);
604 }
605
606 let synthetic: SyntheticInstrument = Self::deserialize_payload(encoding, &result[0])?;
607 Ok(Some(synthetic))
608 }
609
610 pub async fn load_account(
616 con: &ConnectionManager,
617 trader_key: &str,
618 account_id: &AccountId,
619 encoding: SerializationEncoding,
620 ) -> anyhow::Result<Option<AccountAny>> {
621 let key = format!("{ACCOUNTS}{REDIS_DELIMITER}{account_id}");
622 let result = Self::read(con, trader_key, &key).await?;
623 if result.is_empty() {
624 return Ok(None);
625 }
626
627 let account: AccountAny = Self::deserialize_payload(encoding, &result[0])?;
628 Ok(Some(account))
629 }
630
631 pub async fn load_order(
637 con: &ConnectionManager,
638 trader_key: &str,
639 client_order_id: &ClientOrderId,
640 encoding: SerializationEncoding,
641 ) -> anyhow::Result<Option<OrderAny>> {
642 let key = format!("{ORDERS}{REDIS_DELIMITER}{client_order_id}");
643 let result = Self::read(con, trader_key, &key).await?;
644 if result.is_empty() {
645 return Ok(None);
646 }
647
648 let order: OrderAny = Self::deserialize_payload(encoding, &result[0])?;
649 Ok(Some(order))
650 }
651
652 pub async fn load_position(
658 con: &ConnectionManager,
659 trader_key: &str,
660 position_id: &PositionId,
661 encoding: SerializationEncoding,
662 ) -> anyhow::Result<Option<Position>> {
663 let key = format!("{POSITIONS}{REDIS_DELIMITER}{position_id}");
664 let result = Self::read(con, trader_key, &key).await?;
665 if result.is_empty() {
666 return Ok(None);
667 }
668
669 let position: Position = Self::deserialize_payload(encoding, &result[0])?;
670 Ok(Some(position))
671 }
672
673 fn get_collection_key(key: &str) -> anyhow::Result<&str> {
674 key.split_once(REDIS_DELIMITER)
675 .map(|(collection, _)| collection)
676 .ok_or_else(|| {
677 anyhow::anyhow!("Invalid `key`, missing a '{REDIS_DELIMITER}' delimiter, was {key}")
678 })
679 }
680
681 async fn read_index(conn: &mut ConnectionManager, key: &str) -> anyhow::Result<Vec<Bytes>> {
682 let index_key = Self::get_index_key(key)?;
683 match index_key {
684 INDEX_ORDER_IDS => Self::read_set(conn, key).await,
685 INDEX_ORDER_POSITION => Self::read_hset(conn, key).await,
686 INDEX_ORDER_CLIENT => Self::read_hset(conn, key).await,
687 INDEX_ORDERS => Self::read_set(conn, key).await,
688 INDEX_ORDERS_OPEN => Self::read_set(conn, key).await,
689 INDEX_ORDERS_CLOSED => Self::read_set(conn, key).await,
690 INDEX_ORDERS_EMULATED => Self::read_set(conn, key).await,
691 INDEX_ORDERS_INFLIGHT => Self::read_set(conn, key).await,
692 INDEX_POSITIONS => Self::read_set(conn, key).await,
693 INDEX_POSITIONS_OPEN => Self::read_set(conn, key).await,
694 INDEX_POSITIONS_CLOSED => Self::read_set(conn, key).await,
695 _ => anyhow::bail!("Index unknown '{index_key}' on read"),
696 }
697 }
698
699 async fn read_string(conn: &mut ConnectionManager, key: &str) -> anyhow::Result<Vec<Bytes>> {
700 let result: Vec<u8> = conn.get(key).await?;
701
702 if result.is_empty() {
703 Ok(vec![])
704 } else {
705 Ok(vec![Bytes::from(result)])
706 }
707 }
708
709 async fn read_set(conn: &mut ConnectionManager, key: &str) -> anyhow::Result<Vec<Bytes>> {
710 let result: Vec<Bytes> = conn.smembers(key).await?;
711 Ok(result)
712 }
713
714 async fn read_hset(conn: &mut ConnectionManager, key: &str) -> anyhow::Result<Vec<Bytes>> {
715 let result: HashMap<String, String> = conn.hgetall(key).await?;
716 let json = serde_json::to_string(&result)?;
717 Ok(vec![Bytes::from(json.into_bytes())])
718 }
719
720 async fn read_list(conn: &mut ConnectionManager, key: &str) -> anyhow::Result<Vec<Bytes>> {
721 let result: Vec<Bytes> = conn.lrange(key, 0, -1).await?;
722 Ok(result)
723 }
724
725 fn get_index_key(key: &str) -> anyhow::Result<&str> {
726 key.split_once(REDIS_DELIMITER)
727 .map(|(_, index_key)| index_key)
728 .ok_or_else(|| {
729 anyhow::anyhow!("Invalid `key`, missing a '{REDIS_DELIMITER}' delimiter, was {key}")
730 })
731 }
732}
733
/// Reports whether a field name denotes a timestamp: either the exact name
/// `expire_time_ns` or any name starting with `ts_`.
fn is_timestamp_field(key: &str) -> bool {
    key == "expire_time_ns" || key.starts_with("ts_")
}
739
740fn convert_timestamps(value: &mut Value) {
741 match value {
742 Value::Object(map) => {
743 for (key, v) in map {
744 if is_timestamp_field(key) {
745 if let Value::Number(n) = v {
746 if let Some(n) = n.as_u64() {
747 let dt = DateTime::<Utc>::from_timestamp_nanos(n as i64);
748 *v = Value::String(
749 dt.to_rfc3339_opts(chrono::SecondsFormat::Nanos, true),
750 );
751 }
752 }
753 }
754 convert_timestamps(v);
755 }
756 }
757 Value::Array(arr) => {
758 for item in arr {
759 convert_timestamps(item);
760 }
761 }
762 _ => {}
763 }
764}
765
766fn convert_timestamp_strings(value: &mut Value) {
767 match value {
768 Value::Object(map) => {
769 for (key, v) in map {
770 if is_timestamp_field(key) {
771 if let Value::String(s) = v {
772 if let Ok(dt) = DateTime::parse_from_rfc3339(s) {
773 *v = Value::Number(
774 (dt.with_timezone(&Utc)
775 .timestamp_nanos_opt()
776 .expect("Invalid DateTime")
777 as u64)
778 .into(),
779 );
780 }
781 }
782 }
783 convert_timestamp_strings(v);
784 }
785 }
786 Value::Array(arr) => {
787 for item in arr {
788 convert_timestamp_strings(item);
789 }
790 }
791 _ => {}
792 }
793}