use std::{
    collections::HashMap,
    env,
    fmt::Display,
    str::FromStr,
    sync::{OnceLock, atomic::Ordering, mpsc::SendError},
};

use indexmap::IndexMap;
use log::{
    Level, LevelFilter, Log, STATIC_MAX_LEVEL,
    kv::{ToValue, Value},
    set_boxed_logger, set_max_level,
};
use nautilus_core::{
    UUID4, UnixNanos,
    datetime::unix_nanos_to_iso8601,
    time::{get_atomic_clock_realtime, get_atomic_clock_static},
};
use nautilus_model::identifiers::TraderId;
use serde::{Deserialize, Serialize, Serializer};
use ustr::Ustr;

use super::{LOGGING_BYPASSED, LOGGING_REALTIME};
use crate::{
    enums::{LogColor, LogLevel},
    logging::writer::{FileWriter, FileWriterConfig, LogWriter, StderrWriter, StdoutWriter},
};

const LOGGING: &str = "logging";

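/// Sender side of the global logging channel, set once when the logger is initialized.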
static LOGGER_TX: OnceLock<std::sync::mpsc::Sender<LogEvent>> = OnceLock::new();

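/// Configuration for the logger, controlling the stdout and file output levels,
/// per-component level filters, colored output, and whether the configuration is
/// printed to stdout at initialization.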
#[cfg_attr(
    feature = "python",
    pyo3::pyclass(module = "posei_trader.core.nautilus_pyo3.common")
)]
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct LoggerConfig {
    /// The log level filter applied to stdout output.
    pub stdout_level: LevelFilter,
    /// The log level filter applied to file output (`Off` disables file output).
    pub fileout_level: LevelFilter,
    /// Per-component log level filters, keyed by component name.
    component_level: HashMap<Ustr, LevelFilter>,
    /// If ANSI color codes are applied to console output.
    pub is_colored: bool,
    /// If the logging configuration is printed to stdout at initialization.
    pub print_config: bool,
}

impl Default for LoggerConfig {
    fn default() -> Self {
        Self {
            stdout_level: LevelFilter::Info,
            fileout_level: LevelFilter::Off,
            component_level: HashMap::new(),
            is_colored: true,
            print_config: false,
        }
    }
}

impl LoggerConfig {
    #[must_use]
    pub const fn new(
        stdout_level: LevelFilter,
        fileout_level: LevelFilter,
        component_level: HashMap<Ustr, LevelFilter>,
        is_colored: bool,
        print_config: bool,
    ) -> Self {
        Self {
            stdout_level,
            fileout_level,
            component_level,
            is_colored,
            print_config,
        }
    }

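    /// Creates a [`LoggerConfig`] from a `;`-separated specification string.
    ///
    /// Each entry is either a flag (`is_colored`, `print_config`) or a `key=Level` pair,
    /// where `stdout` and `fileout` set the sink levels and any other key is treated as a
    /// per-component level filter.
    ///
    /// # Errors
    ///
    /// Returns an error if an entry is not a valid `key=value` pair or the level cannot be parsed.
    ///
    /// A minimal usage sketch (import path omitted, adjust to this crate's layout):
    ///
    /// ```ignore
    /// let config = LoggerConfig::from_spec("stdout=Info;is_colored;fileout=Debug;RiskEngine=Error")?;
    /// assert_eq!(config.fileout_level, log::LevelFilter::Debug);
    /// ```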
    pub fn from_spec(spec: &str) -> anyhow::Result<Self> {
        let mut config = Self::default();
        for kv in spec.split(';') {
            let kv = kv.trim();
            if kv.is_empty() {
                continue;
            }
            let kv_lower = kv.to_lowercase();
            if kv_lower == "is_colored" {
                config.is_colored = true;
            } else if kv_lower == "print_config" {
                config.print_config = true;
            } else {
                let parts: Vec<&str> = kv.split('=').collect();
                if parts.len() != 2 {
                    anyhow::bail!("Invalid spec pair: {}", kv);
                }
                let k = parts[0].trim();
                let v = parts[1].trim();
                let lvl = LevelFilter::from_str(v)
                    .map_err(|_| anyhow::anyhow!("Invalid log level: {}", v))?;
                let k_lower = k.to_lowercase();
                match k_lower.as_str() {
                    "stdout" => config.stdout_level = lvl,
                    "fileout" => config.fileout_level = lvl,
                    _ => {
                        config.component_level.insert(Ustr::from(k), lvl);
                    }
                }
            }
        }
        Ok(config)
    }

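    /// Creates a [`LoggerConfig`] by parsing the `NAUTILUS_LOG` environment variable as a spec string.
    ///
    /// # Errors
    ///
    /// Returns an error if the variable is unset or its value is not a valid spec.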
    pub fn from_env() -> anyhow::Result<Self> {
        let spec = env::var("NAUTILUS_LOG")?;
        Self::from_spec(&spec)
    }
}

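/// A [`Log`] implementation that forwards log records over an MPSC channel to a dedicated
/// logging thread.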
#[derive(Debug)]
pub struct Logger {
    /// The configuration controlling output levels and formatting.
    pub config: LoggerConfig,
    /// Sender used to forward log events to the logging thread.
    tx: std::sync::mpsc::Sender<LogEvent>,
}

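/// Represents an event sent to the logging thread.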
#[derive(Debug)]
pub enum LogEvent {
    /// A log line to write to the configured writers.
    Log(LogLine),
    /// Flush all buffered writers.
    Flush,
    /// Flush all writers and terminate the logging thread.
    Close,
}

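/// Represents a single log entry.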
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct LogLine {
    /// The timestamp for the event (UNIX nanoseconds).
    pub timestamp: UnixNanos,
    /// The verbosity level of the log message.
    pub level: Level,
    /// The color applied when the message is written to a colored sink.
    pub color: LogColor,
    /// The component that emitted the log message.
    pub component: Ustr,
    /// The log message content.
    pub message: String,
}

impl Display for LogLine {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        write!(f, "[{}] {}: {}", self.level, self.component, self.message)
    }
}

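/// Wraps a [`LogLine`] together with the trader ID and lazily-built, cached formatted
/// strings (plain, colored, and JSON) so each representation is rendered at most once.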
#[derive(Clone, Debug)]
pub struct LogLineWrapper {
    /// The underlying log line.
    line: LogLine,
    /// Cached plain-text representation, built on first use.
    cache: Option<String>,
    /// Cached ANSI-colored representation, built on first use.
    colored: Option<String>,
    /// The trader ID included in the formatted output.
    trader_id: Ustr,
}

impl LogLineWrapper {
    #[must_use]
    pub const fn new(line: LogLine, trader_id: Ustr) -> Self {
        Self {
            line,
            cache: None,
            colored: None,
            trader_id,
        }
    }

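    /// Returns the plain-text representation of the log line, formatting and caching it on
    /// first call.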
    pub fn get_string(&mut self) -> &str {
        self.cache.get_or_insert_with(|| {
            format!(
                "{} [{}] {}.{}: {}\n",
                unix_nanos_to_iso8601(self.line.timestamp),
                self.line.level,
                self.trader_id,
                &self.line.component,
                &self.line.message,
            )
        })
    }

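    /// Returns the ANSI-colored representation of the log line, formatting and caching it on
    /// first call.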
    pub fn get_colored(&mut self) -> &str {
        self.colored.get_or_insert_with(|| {
            format!(
                "\x1b[1m{}\x1b[0m {}[{}] {}.{}: {}\x1b[0m\n",
                unix_nanos_to_iso8601(self.line.timestamp),
                &self.line.color.as_ansi(),
                self.line.level,
                self.trader_id,
                &self.line.component,
                &self.line.message,
            )
        })
    }

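    /// Returns the log line serialized as a single JSON object followed by a newline.
    ///
    /// # Panics
    ///
    /// Panics if the line cannot be serialized to JSON.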
    #[must_use]
    pub fn get_json(&self) -> String {
        let json_string =
            serde_json::to_string(&self).expect("Error serializing log event to string");
        format!("{json_string}\n")
    }
}

impl Serialize for LogLineWrapper {
    fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
    where
        S: Serializer,
    {
        let mut json_obj = IndexMap::new();
        let timestamp = unix_nanos_to_iso8601(self.line.timestamp);
        json_obj.insert("timestamp".to_string(), timestamp);
        json_obj.insert("trader_id".to_string(), self.trader_id.to_string());
        json_obj.insert("level".to_string(), self.line.level.to_string());
        json_obj.insert("color".to_string(), self.line.color.to_string());
        json_obj.insert("component".to_string(), self.line.component.to_string());
        json_obj.insert("message".to_string(), self.line.message.to_string());

        json_obj.serialize(serializer)
    }
}

impl Log for Logger {
    fn enabled(&self, metadata: &log::Metadata) -> bool {
        !LOGGING_BYPASSED.load(Ordering::Relaxed)
            && (metadata.level() == Level::Error
                || metadata.level() <= self.config.stdout_level
                || metadata.level() <= self.config.fileout_level)
    }

    fn log(&self, record: &log::Record) {
        if self.enabled(record.metadata()) {
            let timestamp = if LOGGING_REALTIME.load(Ordering::Relaxed) {
                get_atomic_clock_realtime().get_time_ns()
            } else {
                get_atomic_clock_static().get_time_ns()
            };
            let level = record.level();
            let key_values = record.key_values();
            let color: LogColor = key_values
                .get("color".into())
                .and_then(|v| v.to_u64().map(|v| (v as u8).into()))
                .unwrap_or(level.into());
            let component = key_values.get("component".into()).map_or_else(
                || Ustr::from(record.metadata().target()),
                |v| Ustr::from(&v.to_string()),
            );

            let line = LogLine {
                timestamp,
                level,
                color,
                component,
                message: format!("{}", record.args()),
            };
            if let Err(SendError(LogEvent::Log(line))) = self.tx.send(LogEvent::Log(line)) {
                eprintln!("Error sending log event (receiver closed): {line}");
            }
        }
    }

    fn flush(&self) {
        if let Err(e) = self.tx.send(LogEvent::Flush) {
            eprintln!("Error sending flush log event (receiver closed): {e}");
        }
    }
}

#[allow(clippy::too_many_arguments)]
impl Logger {
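    /// Initializes the global logger using a [`LoggerConfig`] parsed from the `NAUTILUS_LOG`
    /// environment variable.
    ///
    /// # Errors
    ///
    /// Returns an error if the environment variable is missing, the spec is invalid, or the
    /// logger cannot be installed.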
    pub fn init_with_env(
        trader_id: TraderId,
        instance_id: UUID4,
        file_config: FileWriterConfig,
    ) -> anyhow::Result<LogGuard> {
        let config = LoggerConfig::from_env()?;
        Self::init_with_config(trader_id, instance_id, config, file_config)
    }

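    /// Initializes the global logger with the given configuration, installing it as the `log`
    /// crate implementation and spawning the dedicated logging thread.
    ///
    /// Returns a [`LogGuard`] which flushes and shuts down the logging thread when dropped.
    ///
    /// # Errors
    ///
    /// Returns an error if a global logger has already been set or the logging thread cannot
    /// be spawned.
    ///
    /// A minimal usage sketch (mirrors the tests below):
    ///
    /// ```ignore
    /// let guard = Logger::init_with_config(
    ///     TraderId::from("TRADER-001"),
    ///     UUID4::new(),
    ///     LoggerConfig::default(),
    ///     FileWriterConfig::default(),
    /// )?;
    /// log::info!(component = "RiskEngine"; "This is a test.");
    /// drop(guard); // flushes and stops the logging thread
    /// ```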
    pub fn init_with_config(
        trader_id: TraderId,
        instance_id: UUID4,
        config: LoggerConfig,
        file_config: FileWriterConfig,
    ) -> anyhow::Result<LogGuard> {
        let (tx, rx) = std::sync::mpsc::channel::<LogEvent>();

        let logger_tx = tx.clone();
        let logger = Self {
            tx: logger_tx,
            config: config.clone(),
        };

        set_boxed_logger(Box::new(logger))?;

        LOGGER_TX.set(tx.clone()).ok();

        let print_config = config.print_config;
        if print_config {
            println!("STATIC_MAX_LEVEL={STATIC_MAX_LEVEL}");
            println!("Logger initialized with {config:?} {file_config:?}");
        }

        let handle = std::thread::Builder::new()
            .name(LOGGING.to_string())
            .spawn(move || {
                Self::handle_messages(
                    trader_id.to_string(),
                    instance_id.to_string(),
                    config,
                    file_config,
                    rx,
                );
            })?;

        let max_level = log::LevelFilter::Trace;
        set_max_level(max_level);

        if print_config {
            println!("Logger set as `log` implementation with max level {max_level}");
        }

        Ok(LogGuard::new(Some(handle), Some(tx)))
    }

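    /// Runs on the logging thread: drains `rx`, applies per-component level filters, and
    /// dispatches each line to the stdout, stderr, and (optional) file writers until a
    /// [`LogEvent::Close`] is received or the channel is disconnected.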
    fn handle_messages(
        trader_id: String,
        instance_id: String,
        config: LoggerConfig,
        file_config: FileWriterConfig,
        rx: std::sync::mpsc::Receiver<LogEvent>,
    ) {
        let LoggerConfig {
            stdout_level,
            fileout_level,
            component_level,
            is_colored,
            print_config: _,
        } = config;

        let trader_id_cache = Ustr::from(&trader_id);

        let mut stdout_writer = StdoutWriter::new(stdout_level, is_colored);
        let mut stderr_writer = StderrWriter::new(is_colored);

        let mut file_writer_opt = if fileout_level == LevelFilter::Off {
            None
        } else {
            FileWriter::new(
                trader_id.clone(),
                instance_id.clone(),
                file_config.clone(),
                fileout_level,
            )
        };

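        // Continue to receive and handle log events until the channel is hung up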
        while let Ok(event) = rx.recv() {
            match event {
                LogEvent::Log(line) => {
                    if let Some(&filter_level) = component_level.get(&line.component) {
                        if line.level > filter_level {
                            continue;
                        }
                    }

                    let mut wrapper = LogLineWrapper::new(line, trader_id_cache);

                    if stderr_writer.enabled(&wrapper.line) {
                        if is_colored {
                            stderr_writer.write(wrapper.get_colored());
                        } else {
                            stderr_writer.write(wrapper.get_string());
                        }
                    }

                    if stdout_writer.enabled(&wrapper.line) {
                        if is_colored {
                            stdout_writer.write(wrapper.get_colored());
                        } else {
                            stdout_writer.write(wrapper.get_string());
                        }
                    }

                    if let Some(ref mut file_writer) = file_writer_opt {
                        if file_writer.enabled(&wrapper.line) {
                            if file_writer.json_format {
                                file_writer.write(&wrapper.get_json());
                            } else {
                                file_writer.write(wrapper.get_string());
                            }
                        }
                    }
                }
                LogEvent::Flush => {
                    stdout_writer.flush();
                    stderr_writer.flush();

                    if let Some(ref mut file_writer) = file_writer_opt {
                        file_writer.flush();
                    }
                }
                LogEvent::Close => {
                    stdout_writer.flush();
                    stderr_writer.flush();

                    if let Some(ref mut file_writer) = file_writer_opt {
                        file_writer.flush();
                    }

                    break;
                }
            }
        }
    }
}

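/// Logs a message at the given level and color for the given component, routing it through the
/// `log` crate macros with `component` and `color` attached as structured key-values.
///
/// A minimal usage sketch (values are illustrative):
///
/// ```ignore
/// log(LogLevel::Info, LogColor::Normal, Ustr::from("Portfolio"), "Initialized");
/// ```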
pub fn log<T: AsRef<str>>(level: LogLevel, color: LogColor, component: Ustr, message: T) {
    let color = Value::from(color as u8);

    match level {
        LogLevel::Off => {}
        LogLevel::Trace => {
            log::trace!(component = component.to_value(), color = color; "{}", message.as_ref());
        }
        LogLevel::Debug => {
            log::debug!(component = component.to_value(), color = color; "{}", message.as_ref());
        }
        LogLevel::Info => {
            log::info!(component = component.to_value(), color = color; "{}", message.as_ref());
        }
        LogLevel::Warning => {
            log::warn!(component = component.to_value(), color = color; "{}", message.as_ref());
        }
        LogLevel::Error => {
            log::error!(component = component.to_value(), color = color; "{}", message.as_ref());
        }
    }
}

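/// Guard that keeps the logging channel sender and thread handle alive for the lifetime of the
/// logger.
///
/// When dropped, it signals the logging thread to flush and close, then joins the thread so
/// buffered log lines are written before shutdown continues.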
#[cfg_attr(
    feature = "python",
    pyo3::pyclass(module = "posei_trader.core.nautilus_pyo3.common")
)]
#[derive(Debug)]
pub struct LogGuard {
    handle: Option<std::thread::JoinHandle<()>>,
    tx: Option<std::sync::mpsc::Sender<LogEvent>>,
}

impl LogGuard {
    #[must_use]
    pub const fn new(
        handle: Option<std::thread::JoinHandle<()>>,
        tx: Option<std::sync::mpsc::Sender<LogEvent>>,
    ) -> Self {
        Self { handle, tx }
    }
}

impl Default for LogGuard {
    fn default() -> Self {
        Self::new(None, None)
    }
}

impl Drop for LogGuard {
    fn drop(&mut self) {
        // Flush any buffered output and signal the logging thread to shut down
        if let Some(tx) = self.tx.take() {
            let _ = tx.send(LogEvent::Flush);
            let _ = tx.send(LogEvent::Close);
        }

        // Wait for the logging thread to finish so no log lines are lost on exit
        if let Some(handle) = self.handle.take() {
            if let Err(e) = handle.join() {
                eprintln!("Error joining logging thread: {e:?}");
            }
        }
    }
}

#[cfg(test)]
mod tests {
    use std::{collections::HashMap, thread::sleep, time::Duration};

    use log::LevelFilter;
    use nautilus_core::UUID4;
    use nautilus_model::identifiers::TraderId;
    use rstest::*;
    use serde_json::Value;
    use tempfile::tempdir;
    use ustr::Ustr;

    use super::*;
    use crate::{
        enums::LogColor,
        logging::{logging_clock_set_static_mode, logging_clock_set_static_time},
        testing::wait_until,
    };

    #[rstest]
    fn log_message_serialization() {
        let log_message = LogLine {
            timestamp: UnixNanos::default(),
            level: log::Level::Info,
            color: LogColor::Normal,
            component: Ustr::from("Portfolio"),
            message: "This is a log message".to_string(),
        };

        let serialized_json = serde_json::to_string(&log_message).unwrap();
        let deserialized_value: Value = serde_json::from_str(&serialized_json).unwrap();

        assert_eq!(deserialized_value["level"], "INFO");
        assert_eq!(deserialized_value["component"], "Portfolio");
        assert_eq!(deserialized_value["message"], "This is a log message");
    }

    #[rstest]
    fn log_config_parsing() {
        let config =
            LoggerConfig::from_spec("stdout=Info;is_colored;fileout=Debug;RiskEngine=Error")
                .unwrap();
        assert_eq!(
            config,
            LoggerConfig {
                stdout_level: LevelFilter::Info,
                fileout_level: LevelFilter::Debug,
                component_level: HashMap::from_iter(vec![(
                    Ustr::from("RiskEngine"),
                    LevelFilter::Error
                )]),
                is_colored: true,
                print_config: false,
            }
        );
    }

    #[rstest]
    fn log_config_parsing2() {
        let config = LoggerConfig::from_spec("stdout=Warn;print_config;fileout=Error;").unwrap();
        assert_eq!(
            config,
            LoggerConfig {
                stdout_level: LevelFilter::Warn,
                fileout_level: LevelFilter::Error,
                component_level: HashMap::new(),
                is_colored: true,
                print_config: true,
            }
        );
    }

    #[rstest]
    fn test_logging_to_file() {
        let config = LoggerConfig {
            fileout_level: LevelFilter::Debug,
            ..Default::default()
        };

        let temp_dir = tempdir().expect("Failed to create temporary directory");
        let file_config = FileWriterConfig {
            directory: Some(temp_dir.path().to_str().unwrap().to_string()),
            ..Default::default()
        };

        let log_guard = Logger::init_with_config(
            TraderId::from("TRADER-001"),
            UUID4::new(),
            config,
            file_config,
        );

        logging_clock_set_static_mode();
        logging_clock_set_static_time(1_650_000_000_000_000);

        log::info!(
            component = "RiskEngine";
            "This is a test."
        );

        let mut log_contents = String::new();

        wait_until(
            || {
                std::fs::read_dir(&temp_dir)
                    .expect("Failed to read directory")
                    .filter_map(Result::ok)
                    .any(|entry| entry.path().is_file())
            },
            Duration::from_secs(3),
        );

        drop(log_guard);

        wait_until(
            || {
                let log_file_path = std::fs::read_dir(&temp_dir)
                    .expect("Failed to read directory")
                    .filter_map(Result::ok)
                    .find(|entry| entry.path().is_file())
                    .expect("No files found in directory")
                    .path();
                dbg!(&log_file_path);
                log_contents =
                    std::fs::read_to_string(log_file_path).expect("Error while reading log file");
                !log_contents.is_empty()
            },
            Duration::from_secs(3),
        );

        assert_eq!(
            log_contents,
            "1970-01-20T02:20:00.000000000Z [INFO] TRADER-001.RiskEngine: This is a test.\n"
        );
    }

    #[rstest]
    fn test_log_component_level_filtering() {
        let config = LoggerConfig::from_spec("stdout=Info;fileout=Debug;RiskEngine=Error").unwrap();

        let temp_dir = tempdir().expect("Failed to create temporary directory");
        let file_config = FileWriterConfig {
            directory: Some(temp_dir.path().to_str().unwrap().to_string()),
            ..Default::default()
        };

        let log_guard = Logger::init_with_config(
            TraderId::from("TRADER-001"),
            UUID4::new(),
            config,
            file_config,
        );

        logging_clock_set_static_mode();
        logging_clock_set_static_time(1_650_000_000_000_000);

        log::info!(
            component = "RiskEngine";
            "This is a test."
        );

        drop(log_guard);

        wait_until(
            || {
                if let Some(log_file) = std::fs::read_dir(&temp_dir)
                    .expect("Failed to read directory")
                    .filter_map(Result::ok)
                    .find(|entry| entry.path().is_file())
                {
                    let log_file_path = log_file.path();
                    let log_contents = std::fs::read_to_string(log_file_path)
                        .expect("Error while reading log file");
                    !log_contents.contains("RiskEngine")
                } else {
                    false
                }
            },
            Duration::from_secs(3),
        );

        assert!(
            std::fs::read_dir(&temp_dir)
                .expect("Failed to read directory")
                .filter_map(Result::ok)
                .any(|entry| entry.path().is_file()),
            "Log file exists"
        );
    }

    #[rstest]
    fn test_logging_to_file_in_json_format() {
        let config =
            LoggerConfig::from_spec("stdout=Info;is_colored;fileout=Debug;RiskEngine=Info")
                .unwrap();

        let temp_dir = tempdir().expect("Failed to create temporary directory");
        let file_config = FileWriterConfig {
            directory: Some(temp_dir.path().to_str().unwrap().to_string()),
            file_format: Some("json".to_string()),
            ..Default::default()
        };

        let log_guard = Logger::init_with_config(
            TraderId::from("TRADER-001"),
            UUID4::new(),
            config,
            file_config,
        );

        logging_clock_set_static_mode();
        logging_clock_set_static_time(1_650_000_000_000_000);

        log::info!(
            component = "RiskEngine";
            "This is a test."
        );

        let mut log_contents = String::new();

        drop(log_guard);

        wait_until(
            || {
                if let Some(log_file) = std::fs::read_dir(&temp_dir)
                    .expect("Failed to read directory")
                    .filter_map(Result::ok)
                    .find(|entry| entry.path().is_file())
                {
                    let log_file_path = log_file.path();
                    log_contents = std::fs::read_to_string(log_file_path)
                        .expect("Error while reading log file");
                    !log_contents.is_empty()
                } else {
                    false
                }
            },
            Duration::from_secs(3),
        );

        assert_eq!(
            log_contents,
            "{\"timestamp\":\"1970-01-20T02:20:00.000000000Z\",\"trader_id\":\"TRADER-001\",\"level\":\"INFO\",\"color\":\"NORMAL\",\"component\":\"RiskEngine\",\"message\":\"This is a test.\"}\n"
        );
    }

    #[ignore = "Flaky test: Passing locally on some systems, failing in CI"]
    #[rstest]
    fn test_file_rotation_and_backup_limits() {
        let temp_dir = tempdir().expect("Failed to create temporary directory");
        let dir_path = temp_dir.path().to_str().unwrap().to_string();

        let max_backups = 3;
        let max_file_size = 100;
        let file_config = FileWriterConfig {
            directory: Some(dir_path.clone()),
            file_name: None,
            file_format: Some("log".to_string()),
            file_rotate: Some((max_file_size, max_backups).into()),
        };

        let config = LoggerConfig::from_spec("fileout=Info;Test=Info").unwrap();
        let log_guard = Logger::init_with_config(
            TraderId::from("TRADER-001"),
            UUID4::new(),
            config,
            file_config,
        );

        log::info!(
            component = "Test";
            "Test log message with enough content to exceed our small max file size limit"
        );

        sleep(Duration::from_millis(100));

        let files: Vec<_> = std::fs::read_dir(&dir_path)
            .expect("Failed to read directory")
            .filter_map(Result::ok)
            .filter(|entry| entry.path().extension().is_some_and(|ext| ext == "log"))
            .collect();

        assert_eq!(files.len(), 1);

        log::info!(
            component = "Test";
            "Test log message with enough content to exceed our small max file size limit"
        );

        sleep(Duration::from_millis(100));

        let files: Vec<_> = std::fs::read_dir(&dir_path)
            .expect("Failed to read directory")
            .filter_map(Result::ok)
            .filter(|entry| entry.path().extension().is_some_and(|ext| ext == "log"))
            .collect();

        assert_eq!(files.len(), 2);

        for _ in 0..5 {
            log::info!(
                component = "Test";
                "Test log message with enough content to exceed our small max file size limit"
            );

            sleep(Duration::from_millis(100));
        }

        let files: Vec<_> = std::fs::read_dir(&dir_path)
            .expect("Failed to read directory")
            .filter_map(Result::ok)
            .filter(|entry| entry.path().extension().is_some_and(|ext| ext == "log"))
            .collect();

        assert!(
            files.len() == max_backups as usize + 1,
            "Expected {} log files (current file plus max backups), found {}",
            max_backups as usize + 1,
            files.len()
        );

        drop(log_guard);
        drop(temp_dir);
    }
}