1use std::{
17 collections::HashMap,
18 env,
19 fmt::Display,
20 str::FromStr,
21 sync::{atomic::Ordering, mpsc::SendError},
22};
23
24use indexmap::IndexMap;
25use log::{
26 Level, LevelFilter, Log, STATIC_MAX_LEVEL,
27 kv::{ToValue, Value},
28 set_boxed_logger, set_max_level,
29};
30use nautilus_core::{
31 UUID4, UnixNanos,
32 datetime::unix_nanos_to_iso8601,
33 time::{get_atomic_clock_realtime, get_atomic_clock_static},
34};
35use nautilus_model::identifiers::TraderId;
36use serde::{Deserialize, Serialize, Serializer};
37use ustr::Ustr;
38
39use super::{LOGGING_BYPASSED, LOGGING_REALTIME};
40use crate::{
41 enums::{LogColor, LogLevel},
42 logging::writer::{FileWriter, FileWriterConfig, LogWriter, StderrWriter, StdoutWriter},
43};
44
45const LOGGING: &str = "logging";
46
/// Configuration for the logging subsystem.
#[cfg_attr(
    feature = "python",
    pyo3::pyclass(module = "posei_trader.core.nautilus_pyo3.common")
)]
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct LoggerConfig {
    /// Maximum level written to stdout.
    pub stdout_level: LevelFilter,
    /// Maximum level written to the log file (`Off` disables file output).
    pub fileout_level: LevelFilter,
    // Per-component level overrides, keyed by component name
    component_level: HashMap<Ustr, LevelFilter>,
    /// Whether ANSI color codes are emitted.
    pub is_colored: bool,
    /// Whether the resolved configuration is printed at initialization.
    pub print_config: bool,
}
64
65impl Default for LoggerConfig {
66 fn default() -> Self {
68 Self {
69 stdout_level: LevelFilter::Info,
70 fileout_level: LevelFilter::Off,
71 component_level: HashMap::new(),
72 is_colored: false,
73 print_config: false,
74 }
75 }
76}
77
78impl LoggerConfig {
79 #[must_use]
81 pub const fn new(
82 stdout_level: LevelFilter,
83 fileout_level: LevelFilter,
84 component_level: HashMap<Ustr, LevelFilter>,
85 is_colored: bool,
86 print_config: bool,
87 ) -> Self {
88 Self {
89 stdout_level,
90 fileout_level,
91 component_level,
92 is_colored,
93 print_config,
94 }
95 }
96
97 pub fn from_spec(spec: &str) -> anyhow::Result<Self> {
101 let mut config = Self::default();
102 for kv in spec.split(';') {
103 let kv = kv.trim();
104 if kv.is_empty() {
105 continue;
106 }
107 let kv_lower = kv.to_lowercase(); if kv_lower == "is_colored" {
109 config.is_colored = true;
110 } else if kv_lower == "print_config" {
111 config.print_config = true;
112 } else {
113 let parts: Vec<&str> = kv.split('=').collect();
114 if parts.len() != 2 {
115 anyhow::bail!("Invalid spec pair: {}", kv);
116 }
117 let k = parts[0].trim(); let v = parts[1].trim(); let lvl = LevelFilter::from_str(v)
120 .map_err(|_| anyhow::anyhow!("Invalid log level: {}", v))?;
121 let k_lower = k.to_lowercase(); match k_lower.as_str() {
123 "stdout" => config.stdout_level = lvl,
124 "fileout" => config.fileout_level = lvl,
125 _ => {
126 config.component_level.insert(Ustr::from(k), lvl);
127 }
128 }
129 }
130 }
131 Ok(config)
132 }
133
134 pub fn from_env() -> anyhow::Result<Self> {
140 let spec = env::var("NAUTILUS_LOG")?;
141 Self::from_spec(&spec)
142 }
143}
144
/// The `log` facade implementation: forwards enabled records over an mpsc
/// channel to a dedicated writer thread.
#[derive(Debug)]
pub struct Logger {
    /// The logger configuration in effect.
    pub config: LoggerConfig,
    // Sender side of the channel consumed by the writer thread
    tx: std::sync::mpsc::Sender<LogEvent>,
}
157
/// Events sent from the `Logger` facade to the writer thread.
#[derive(Clone, Debug)]
pub enum LogEvent {
    /// A log line to write to the enabled sinks.
    Log(LogLine),
    /// Flush all writers.
    Flush,
    /// Flush all writers and terminate the writer thread.
    Close,
}
168
/// A single log record produced by the `Logger` facade.
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct LogLine {
    /// Time of the record in UNIX nanoseconds.
    pub timestamp: UnixNanos,
    /// The `log` crate severity level.
    pub level: Level,
    /// Color applied when writing to a colored sink.
    pub color: LogColor,
    /// Component (target) name that emitted the record.
    pub component: Ustr,
    /// The formatted log message.
    pub message: String,
}
183
184impl Display for LogLine {
185 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
186 write!(f, "[{}] {}: {}", self.level, self.component, self.message)
187 }
188}
189
/// Wraps a `LogLine` together with the trader ID, lazily building and
/// caching the plain-text and ANSI-colored string representations.
#[derive(Clone, Debug)]
pub struct LogLineWrapper {
    // The underlying log line
    line: LogLine,
    // Lazily built plain-text representation
    cache: Option<String>,
    // Lazily built ANSI-colored representation
    colored: Option<String>,
    // Trader ID included in the formatted output
    trader_id: Ustr,
}
207
208impl LogLineWrapper {
209 #[must_use]
211 pub const fn new(line: LogLine, trader_id: Ustr) -> Self {
212 Self {
213 line,
214 cache: None,
215 colored: None,
216 trader_id,
217 }
218 }
219
220 pub fn get_string(&mut self) -> &str {
225 self.cache.get_or_insert_with(|| {
226 format!(
227 "{} [{}] {}.{}: {}\n",
228 unix_nanos_to_iso8601(self.line.timestamp),
229 self.line.level,
230 self.trader_id,
231 &self.line.component,
232 &self.line.message,
233 )
234 })
235 }
236
237 pub fn get_colored(&mut self) -> &str {
243 self.colored.get_or_insert_with(|| {
244 format!(
245 "\x1b[1m{}\x1b[0m {}[{}] {}.{}: {}\x1b[0m\n",
246 unix_nanos_to_iso8601(self.line.timestamp),
247 &self.line.color.as_ansi(),
248 self.line.level,
249 self.trader_id,
250 &self.line.component,
251 &self.line.message,
252 )
253 })
254 }
255
256 #[must_use]
265 pub fn get_json(&self) -> String {
266 let json_string =
267 serde_json::to_string(&self).expect("Error serializing log event to string");
268 format!("{json_string}\n")
269 }
270}
271
272impl Serialize for LogLineWrapper {
273 fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
274 where
275 S: Serializer,
276 {
277 let mut json_obj = IndexMap::new();
278 let timestamp = unix_nanos_to_iso8601(self.line.timestamp);
279 json_obj.insert("timestamp".to_string(), timestamp);
280 json_obj.insert("trader_id".to_string(), self.trader_id.to_string());
281 json_obj.insert("level".to_string(), self.line.level.to_string());
282 json_obj.insert("color".to_string(), self.line.color.to_string());
283 json_obj.insert("component".to_string(), self.line.component.to_string());
284 json_obj.insert("message".to_string(), self.line.message.to_string());
285
286 json_obj.serialize(serializer)
287 }
288}
289
impl Log for Logger {
    /// A record is enabled unless logging is globally bypassed; `Error`
    /// records are always enabled, others must pass either the stdout or
    /// the fileout level filter.
    fn enabled(&self, metadata: &log::Metadata) -> bool {
        !LOGGING_BYPASSED.load(Ordering::Relaxed)
            && (metadata.level() == Level::Error
                || metadata.level() <= self.config.stdout_level
                || metadata.level() <= self.config.fileout_level)
    }

    /// Converts an enabled record into a `LogLine` and sends it to the
    /// writer thread.
    fn log(&self, record: &log::Record) {
        if self.enabled(record.metadata()) {
            // Timestamp from the realtime or static clock depending on the
            // global mode flag
            let timestamp = if LOGGING_REALTIME.load(Ordering::Relaxed) {
                get_atomic_clock_realtime().get_time_ns()
            } else {
                get_atomic_clock_static().get_time_ns()
            };
            let level = record.level();
            let key_values = record.key_values();
            // Optional `color` key-value overrides the level-derived color
            let color: LogColor = key_values
                .get("color".into())
                .and_then(|v| v.to_u64().map(|v| (v as u8).into()))
                .unwrap_or(level.into());
            // Optional `component` key-value; falls back to the record target
            let component = key_values.get("component".into()).map_or_else(
                || Ustr::from(record.metadata().target()),
                |v| Ustr::from(&v.to_string()),
            );

            let line = LogLine {
                timestamp,
                level,
                color,
                component,
                message: format!("{}", record.args()),
            };
            // A send error means the writer thread has exited; report the
            // dropped line to stderr instead
            if let Err(SendError(LogEvent::Log(line))) = self.tx.send(LogEvent::Log(line)) {
                eprintln!("Error sending log event (receiver closed): {line}");
            }
        }
    }

    /// Requests that all writers flush their buffers.
    fn flush(&self) {
        if let Err(e) = self.tx.send(LogEvent::Flush) {
            eprintln!("Error sending flush log event (receiver closed): {e}");
        }
    }
}
335
#[allow(clippy::too_many_arguments)]
impl Logger {
    /// Initializes logging with a config read from the `NAUTILUS_LOG`
    /// environment variable.
    ///
    /// # Errors
    ///
    /// Returns an error if the environment variable is unset, the spec is
    /// invalid, or the logger cannot be installed.
    pub fn init_with_env(
        trader_id: TraderId,
        instance_id: UUID4,
        file_config: FileWriterConfig,
    ) -> anyhow::Result<LogGuard> {
        let config = LoggerConfig::from_env()?;
        Self::init_with_config(trader_id, instance_id, config, file_config)
    }

    /// Installs the logger as the global `log` implementation and spawns the
    /// background writer thread. The returned [`LogGuard`] must be kept alive
    /// for the duration of logging; dropping it shuts the writer down.
    ///
    /// # Errors
    ///
    /// Returns an error if the writer thread cannot be spawned or a global
    /// logger is already set.
    pub fn init_with_config(
        trader_id: TraderId,
        instance_id: UUID4,
        config: LoggerConfig,
        file_config: FileWriterConfig,
    ) -> anyhow::Result<LogGuard> {
        let (tx, rx) = std::sync::mpsc::channel::<LogEvent>();

        let logger_tx = tx.clone();
        let logger = Self {
            tx: logger_tx,
            config: config.clone(),
        };

        let print_config = config.print_config;
        if print_config {
            println!("STATIC_MAX_LEVEL={STATIC_MAX_LEVEL}");
            println!("Logger initialized with {config:?} {file_config:?}");
        }

        let handle: Option<std::thread::JoinHandle<()>>;
        match set_boxed_logger(Box::new(logger)) {
            Ok(()) => {
                // Spawn the writer thread that drains the channel
                handle = Some(
                    std::thread::Builder::new()
                        .name(LOGGING.to_string())
                        .spawn(move || {
                            Self::handle_messages(
                                trader_id.to_string(),
                                instance_id.to_string(),
                                config,
                                file_config,
                                rx,
                            );
                        })?,
                );

                // Accept everything at the facade; fine-grained filtering
                // happens per-sink in the writer thread
                let max_level = log::LevelFilter::Trace;
                set_max_level(max_level);
                if print_config {
                    println!("Logger set as `log` implementation with max level {max_level}");
                }
            }
            Err(e) => {
                anyhow::bail!("Cannot initialize logger because of error: {e}");
            }
        }

        Ok(LogGuard::new(handle, Some(tx)))
    }

    /// Writer-thread loop: receives events and dispatches them to the
    /// stdout/stderr/file writers until the channel closes or a `Close`
    /// event arrives.
    fn handle_messages(
        trader_id: String,
        instance_id: String,
        config: LoggerConfig,
        file_config: FileWriterConfig,
        rx: std::sync::mpsc::Receiver<LogEvent>,
    ) {
        let LoggerConfig {
            stdout_level,
            fileout_level,
            ref component_level,
            is_colored,
            print_config: _,
        } = config;

        let trader_id_cache = Ustr::from(&trader_id);

        let mut stdout_writer = StdoutWriter::new(stdout_level, is_colored);
        let mut stderr_writer = StderrWriter::new(is_colored);

        // File output is disabled entirely when the level is Off
        let mut file_writer_opt = if fileout_level == LevelFilter::Off {
            None
        } else {
            FileWriter::new(trader_id, instance_id, file_config, fileout_level)
        };

        // Loop ends when all senders are dropped (recv errors) or on Close
        while let Ok(event) = rx.recv() {
            match event {
                LogEvent::Log(line) => {
                    let component_level = component_level.get(&line.component);

                    // Per-component override filters the line for ALL sinks
                    if let Some(&filter_level) = component_level {
                        if line.level > filter_level {
                            continue;
                        }
                    }

                    let mut wrapper = LogLineWrapper::new(line, trader_id_cache);

                    if stderr_writer.enabled(&wrapper.line) {
                        if is_colored {
                            stderr_writer.write(wrapper.get_colored());
                        } else {
                            stderr_writer.write(wrapper.get_string());
                        }
                    }

                    if stdout_writer.enabled(&wrapper.line) {
                        if is_colored {
                            stdout_writer.write(wrapper.get_colored());
                        } else {
                            stdout_writer.write(wrapper.get_string());
                        }
                    }

                    if let Some(ref mut file_writer) = file_writer_opt {
                        if file_writer.enabled(&wrapper.line) {
                            if file_writer.json_format {
                                file_writer.write(&wrapper.get_json());
                            } else {
                                file_writer.write(wrapper.get_string());
                            }
                        }
                    }
                }
                LogEvent::Flush => {
                    stdout_writer.flush();
                    stderr_writer.flush();

                    if let Some(ref mut file_writer) = file_writer_opt {
                        file_writer.flush();
                    }
                }
                LogEvent::Close => {
                    // Final flush of all sinks before the thread exits
                    stdout_writer.flush();
                    stderr_writer.flush();

                    if let Some(ref mut file_writer) = file_writer_opt {
                        file_writer.flush();
                    }
                    break;
                }
            }
        }
    }
}
509
/// Logs `message` at `level` with the given `color` and `component` through
/// the global `log` facade; `LogLevel::Off` is a no-op.
pub fn log<T: AsRef<str>>(level: LogLevel, color: LogColor, component: Ustr, message: T) {
    // Pass the color as a structured key-value so the Log impl can recover it
    let color = Value::from(color as u8);

    match level {
        LogLevel::Off => {}
        LogLevel::Trace => {
            log::trace!(component = component.to_value(), color = color; "{}", message.as_ref());
        }
        LogLevel::Debug => {
            log::debug!(component = component.to_value(), color = color; "{}", message.as_ref());
        }
        LogLevel::Info => {
            log::info!(component = component.to_value(), color = color; "{}", message.as_ref());
        }
        LogLevel::Warning => {
            log::warn!(component = component.to_value(), color = color; "{}", message.as_ref());
        }
        LogLevel::Error => {
            log::error!(component = component.to_value(), color = color; "{}", message.as_ref());
        }
    }
}
532
/// Guard that keeps the logging writer thread alive; dropping it sends a
/// `Close` event and joins the thread, flushing any buffered output.
#[cfg_attr(
    feature = "python",
    pyo3::pyclass(module = "posei_trader.core.nautilus_pyo3.common")
)]
#[derive(Debug)]
pub struct LogGuard {
    // Join handle of the writer thread (None for an inert guard)
    handle: Option<std::thread::JoinHandle<()>>,
    // Sender used to deliver the final Close event on drop
    tx: Option<std::sync::mpsc::Sender<LogEvent>>,
}
542
543impl LogGuard {
544 #[must_use]
546 pub const fn new(
547 handle: Option<std::thread::JoinHandle<()>>,
548 tx: Option<std::sync::mpsc::Sender<LogEvent>>,
549 ) -> Self {
550 Self { handle, tx }
551 }
552}
553
554impl Default for LogGuard {
555 fn default() -> Self {
557 Self::new(None, None)
558 }
559}
560
impl Drop for LogGuard {
    fn drop(&mut self) {
        // Ask the writer thread to flush and exit; ignore send errors since
        // the receiver may already be gone
        if let Some(tx) = self.tx.take() {
            let _ = tx.send(LogEvent::Close);
        }

        // Join so buffered output is written before the process continues
        if let Some(handle) = self.handle.take() {
            handle.join().expect("Error joining logging handle");
        }
    }
}
572
#[cfg(test)]
mod tests {
    //! Tests for logger configuration parsing and end-to-end file logging.

    use std::{collections::HashMap, thread::sleep, time::Duration};

    use log::LevelFilter;
    use nautilus_core::UUID4;
    use nautilus_model::identifiers::TraderId;
    use rstest::*;
    use serde_json::Value;
    use tempfile::tempdir;
    use ustr::Ustr;

    use super::*;
    use crate::{
        enums::LogColor,
        logging::{logging_clock_set_static_mode, logging_clock_set_static_time},
        testing::wait_until,
    };

    // LogLine round-trips through serde_json with the expected field values
    #[rstest]
    fn log_message_serialization() {
        let log_message = LogLine {
            timestamp: UnixNanos::default(),
            level: log::Level::Info,
            color: LogColor::Normal,
            component: Ustr::from("Portfolio"),
            message: "This is a log message".to_string(),
        };

        let serialized_json = serde_json::to_string(&log_message).unwrap();
        let deserialized_value: Value = serde_json::from_str(&serialized_json).unwrap();

        assert_eq!(deserialized_value["level"], "INFO");
        assert_eq!(deserialized_value["component"], "Portfolio");
        assert_eq!(deserialized_value["message"], "This is a log message");
    }

    // Spec string with flag, sink levels, and a component override
    #[rstest]
    fn log_config_parsing() {
        let config =
            LoggerConfig::from_spec("stdout=Info;is_colored;fileout=Debug;RiskEngine=Error")
                .unwrap();
        assert_eq!(
            config,
            LoggerConfig {
                stdout_level: LevelFilter::Info,
                fileout_level: LevelFilter::Debug,
                component_level: HashMap::from_iter(vec![(
                    Ustr::from("RiskEngine"),
                    LevelFilter::Error
                )]),
                is_colored: true,
                print_config: false,
            }
        );
    }

    // Spec string with print_config flag and a trailing semicolon
    #[rstest]
    fn log_config_parsing2() {
        let config = LoggerConfig::from_spec("stdout=Warn;print_config;fileout=Error;").unwrap();
        assert_eq!(
            config,
            LoggerConfig {
                stdout_level: LevelFilter::Warn,
                fileout_level: LevelFilter::Error,
                component_level: HashMap::new(),
                is_colored: false,
                print_config: true,
            }
        );
    }

    // End-to-end: a line logged at Info lands in the file with the expected
    // plain-text format (static clock pins the timestamp)
    #[rstest]
    fn test_logging_to_file() {
        let config = LoggerConfig {
            fileout_level: LevelFilter::Debug,
            ..Default::default()
        };

        let temp_dir = tempdir().expect("Failed to create temporary directory");
        let file_config = FileWriterConfig {
            directory: Some(temp_dir.path().to_str().unwrap().to_string()),
            ..Default::default()
        };

        let log_guard = Logger::init_with_config(
            TraderId::from("TRADER-001"),
            UUID4::new(),
            config,
            file_config,
        );

        logging_clock_set_static_mode();
        logging_clock_set_static_time(1_650_000_000_000_000);

        log::info!(
            component = "RiskEngine";
            "This is a test."
        );

        let mut log_contents = String::new();

        // Wait for the writer thread to create the log file
        wait_until(
            || {
                std::fs::read_dir(&temp_dir)
                    .expect("Failed to read directory")
                    .filter_map(Result::ok)
                    .any(|entry| entry.path().is_file())
            },
            Duration::from_secs(3),
        );

        // Dropping the guard flushes and joins the writer thread
        drop(log_guard);

        wait_until(
            || {
                let log_file_path = std::fs::read_dir(&temp_dir)
                    .expect("Failed to read directory")
                    .filter_map(Result::ok)
                    .find(|entry| entry.path().is_file())
                    .expect("No files found in directory")
                    .path();
                dbg!(&log_file_path);
                log_contents =
                    std::fs::read_to_string(log_file_path).expect("Error while reading log file");
                !log_contents.is_empty()
            },
            Duration::from_secs(3),
        );

        assert_eq!(
            log_contents,
            "1970-01-20T02:20:00.000000000Z [INFO] TRADER-001.RiskEngine: This is a test.\n"
        );
    }

    // A component-level override (RiskEngine=Error) suppresses Info lines
    // from that component even though fileout allows Debug
    #[rstest]
    fn test_log_component_level_filtering() {
        let config = LoggerConfig::from_spec("stdout=Info;fileout=Debug;RiskEngine=Error").unwrap();

        let temp_dir = tempdir().expect("Failed to create temporary directory");
        let file_config = FileWriterConfig {
            directory: Some(temp_dir.path().to_str().unwrap().to_string()),
            ..Default::default()
        };

        let log_guard = Logger::init_with_config(
            TraderId::from("TRADER-001"),
            UUID4::new(),
            config,
            file_config,
        );

        logging_clock_set_static_mode();
        logging_clock_set_static_time(1_650_000_000_000_000);

        log::info!(
            component = "RiskEngine";
            "This is a test."
        );

        // Dropping the guard flushes and joins the writer thread
        drop(log_guard);

        wait_until(
            || {
                if let Some(log_file) = std::fs::read_dir(&temp_dir)
                    .expect("Failed to read directory")
                    .filter_map(Result::ok)
                    .find(|entry| entry.path().is_file())
                {
                    let log_file_path = log_file.path();
                    let log_contents = std::fs::read_to_string(log_file_path)
                        .expect("Error while reading log file");
                    !log_contents.contains("RiskEngine")
                } else {
                    false
                }
            },
            Duration::from_secs(3),
        );

        assert!(
            std::fs::read_dir(&temp_dir)
                .expect("Failed to read directory")
                .filter_map(Result::ok)
                .any(|entry| entry.path().is_file()),
            "Log file exists"
        );
    }

    // End-to-end: JSON file format emits one JSON object per line with the
    // expected key order and values
    #[rstest]
    fn test_logging_to_file_in_json_format() {
        let config =
            LoggerConfig::from_spec("stdout=Info;is_colored;fileout=Debug;RiskEngine=Info")
                .unwrap();

        let temp_dir = tempdir().expect("Failed to create temporary directory");
        let file_config = FileWriterConfig {
            directory: Some(temp_dir.path().to_str().unwrap().to_string()),
            file_format: Some("json".to_string()),
            ..Default::default()
        };

        let log_guard = Logger::init_with_config(
            TraderId::from("TRADER-001"),
            UUID4::new(),
            config,
            file_config,
        );

        logging_clock_set_static_mode();
        logging_clock_set_static_time(1_650_000_000_000_000);

        log::info!(
            component = "RiskEngine";
            "This is a test."
        );

        let mut log_contents = String::new();

        // Dropping the guard flushes and joins the writer thread
        drop(log_guard);

        wait_until(
            || {
                if let Some(log_file) = std::fs::read_dir(&temp_dir)
                    .expect("Failed to read directory")
                    .filter_map(Result::ok)
                    .find(|entry| entry.path().is_file())
                {
                    let log_file_path = log_file.path();
                    log_contents = std::fs::read_to_string(log_file_path)
                        .expect("Error while reading log file");
                    !log_contents.is_empty()
                } else {
                    false
                }
            },
            Duration::from_secs(3),
        );

        assert_eq!(
            log_contents,
            "{\"timestamp\":\"1970-01-20T02:20:00.000000000Z\",\"trader_id\":\"TRADER-001\",\"level\":\"INFO\",\"color\":\"NORMAL\",\"component\":\"RiskEngine\",\"message\":\"This is a test.\"}\n"
        );
    }

    // File rotation caps the number of log files at max_backups + 1
    #[ignore = "Flaky test: Passing locally on some systems, failing in CI"]
    #[rstest]
    fn test_file_rotation_and_backup_limits() {
        let temp_dir = tempdir().expect("Failed to create temporary directory");
        let dir_path = temp_dir.path().to_str().unwrap().to_string();

        // Tiny max file size so every message triggers a rotation
        let max_backups = 3;
        let max_file_size = 100;
        let file_config = FileWriterConfig {
            directory: Some(dir_path.clone()),
            file_name: None,
            file_format: Some("log".to_string()),
            file_rotate: Some((max_file_size, max_backups).into()),
        };

        let config = LoggerConfig::from_spec("fileout=Info;Test=Info").unwrap();
        let log_guard = Logger::init_with_config(
            TraderId::from("TRADER-001"),
            UUID4::new(),
            config,
            file_config,
        );

        log::info!(
            component = "Test";
            "Test log message with enough content to exceed our small max file size limit"
        );

        sleep(Duration::from_millis(100));

        let files: Vec<_> = std::fs::read_dir(&dir_path)
            .expect("Failed to read directory")
            .filter_map(Result::ok)
            .filter(|entry| entry.path().extension().is_some_and(|ext| ext == "log"))
            .collect();

        assert_eq!(files.len(), 1);

        log::info!(
            component = "Test";
            "Test log message with enough content to exceed our small max file size limit"
        );

        sleep(Duration::from_millis(100));

        let files: Vec<_> = std::fs::read_dir(&dir_path)
            .expect("Failed to read directory")
            .filter_map(Result::ok)
            .filter(|entry| entry.path().extension().is_some_and(|ext| ext == "log"))
            .collect();

        assert_eq!(files.len(), 2);

        // Keep rotating; the backup count must stay bounded
        for _ in 0..5 {
            log::info!(
                component = "Test";
                "Test log message with enough content to exceed our small max file size limit"
            );

            sleep(Duration::from_millis(100));
        }

        let files: Vec<_> = std::fs::read_dir(&dir_path)
            .expect("Failed to read directory")
            .filter_map(Result::ok)
            .filter(|entry| entry.path().extension().is_some_and(|ext| ext == "log"))
            .collect();

        assert!(
            files.len() == max_backups as usize + 1,
            "Expected at most {} log files, found {}",
            max_backups,
            files.len()
        );

        drop(log_guard);
        drop(temp_dir);
    }
}