1use std::{
17 cell::RefCell,
18 collections::HashMap,
19 fmt::{self, Display},
20 hash::{Hash, Hasher},
21 ops::Deref,
22 rc::Rc,
23};
24
25use ahash::{AHashMap, AHashSet};
26use handler::ShareableMessageHandler;
27use indexmap::IndexMap;
28use matching::is_matching_backtracking;
29use nautilus_core::{
30 UUID4,
31 correctness::{FAILED, check_predicate_true, check_valid_string},
32};
33use nautilus_model::identifiers::TraderId;
34use switchboard::MessagingSwitchboard;
35use ustr::Ustr;
36
37use super::{handler, matching, set_message_bus, switchboard};
38
39#[inline(always)]
40fn check_fully_qualified_string(value: &Ustr, key: &str) -> anyhow::Result<()> {
41 check_predicate_true(
42 !value.chars().any(|c| c == '*' || c == '?'),
43 &format!("{key} `value` contained invalid characters, was {value}"),
44 )
45}
46
47#[derive(Copy, Clone, Debug, PartialEq, Eq, Hash)]
49pub struct Pattern;
50
51#[derive(Copy, Clone, Debug, PartialEq, Eq, Hash)]
53pub struct Topic;
54
55#[derive(Copy, Clone, Debug, PartialEq, Eq, Hash)]
57pub struct Endpoint;
58
59#[derive(Copy, Clone, Debug, PartialEq, Eq, Hash)]
61pub struct MStr<T> {
62 value: Ustr,
63 _marker: std::marker::PhantomData<T>,
64}
65
66impl<T> Display for MStr<T> {
67 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
68 write!(f, "{}", self.value)
69 }
70}
71
72impl<T> Deref for MStr<T> {
73 type Target = Ustr;
74
75 fn deref(&self) -> &Self::Target {
76 &self.value
77 }
78}
79
80impl MStr<Pattern> {
81 pub fn pattern<T: AsRef<str>>(value: T) -> Self {
83 let value = Ustr::from(value.as_ref());
84
85 Self {
86 value,
87 _marker: std::marker::PhantomData,
88 }
89 }
90}
91
92impl<T: AsRef<str>> From<T> for MStr<Pattern> {
93 fn from(value: T) -> Self {
94 Self::pattern(value)
95 }
96}
97
98impl From<MStr<Topic>> for MStr<Pattern> {
99 fn from(value: MStr<Topic>) -> Self {
100 Self {
101 value: value.value,
102 _marker: std::marker::PhantomData,
103 }
104 }
105}
106
107impl MStr<Topic> {
108 pub fn topic<T: AsRef<str>>(value: T) -> anyhow::Result<Self> {
114 let topic = Ustr::from(value.as_ref());
115 check_valid_string(value, stringify!(value))?;
116 check_fully_qualified_string(&topic, stringify!(Topic))?;
117
118 Ok(Self {
119 value: topic,
120 _marker: std::marker::PhantomData,
121 })
122 }
123}
124
125impl<T: AsRef<str>> From<T> for MStr<Topic> {
126 fn from(value: T) -> Self {
127 Self::topic(value).expect(FAILED)
128 }
129}
130
131impl MStr<Endpoint> {
132 pub fn endpoint<T: AsRef<str>>(value: T) -> anyhow::Result<Self> {
138 let endpoint = Ustr::from(value.as_ref());
139 check_valid_string(value, stringify!(value))?;
140 check_fully_qualified_string(&endpoint, stringify!(Endpoint))?;
141
142 Ok(Self {
143 value: endpoint,
144 _marker: std::marker::PhantomData,
145 })
146 }
147}
148
149impl<T: AsRef<str>> From<T> for MStr<Endpoint> {
150 fn from(value: T) -> Self {
151 Self::endpoint(value).expect(FAILED)
152 }
153}
154
155#[derive(Clone, Debug)]
161pub struct Subscription {
162 pub handler: ShareableMessageHandler,
164 pub handler_id: Ustr,
166 pub pattern: MStr<Pattern>,
168 pub priority: u8,
172}
173
174impl Subscription {
175 #[must_use]
177 pub fn new(
178 pattern: MStr<Pattern>,
179 handler: ShareableMessageHandler,
180 priority: Option<u8>,
181 ) -> Self {
182 Self {
183 handler_id: handler.0.id(),
184 pattern,
185 handler,
186 priority: priority.unwrap_or(0),
187 }
188 }
189}
190
191impl PartialEq<Self> for Subscription {
192 fn eq(&self, other: &Self) -> bool {
193 self.pattern == other.pattern && self.handler_id == other.handler_id
194 }
195}
196
197impl Eq for Subscription {}
198
199impl PartialOrd for Subscription {
200 fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
201 Some(self.cmp(other))
202 }
203}
204
205impl Ord for Subscription {
206 fn cmp(&self, other: &Self) -> std::cmp::Ordering {
207 other
208 .priority
209 .cmp(&self.priority)
210 .then_with(|| self.pattern.cmp(&other.pattern))
211 .then_with(|| self.handler_id.cmp(&other.handler_id))
212 }
213}
214
215impl Hash for Subscription {
216 fn hash<H: Hasher>(&self, state: &mut H) {
217 self.pattern.hash(state);
218 self.handler_id.hash(state);
219 }
220}
221
222#[derive(Debug)]
243pub struct MessageBus {
244 pub trader_id: TraderId,
246 pub instance_id: UUID4,
248 pub name: String,
250 pub has_backing: bool,
252 pub switchboard: MessagingSwitchboard,
254 pub subscriptions: AHashSet<Subscription>,
256 pub topics: IndexMap<MStr<Topic>, Vec<Subscription>>,
259 pub endpoints: IndexMap<MStr<Endpoint>, ShareableMessageHandler>,
261 pub correlation_index: AHashMap<UUID4, ShareableMessageHandler>,
263}
264
265impl MessageBus {
271 #[must_use]
273 pub fn new(
274 trader_id: TraderId,
275 instance_id: UUID4,
276 name: Option<String>,
277 _config: Option<HashMap<String, serde_json::Value>>,
278 ) -> Self {
279 Self {
280 trader_id,
281 instance_id,
282 name: name.unwrap_or(stringify!(MessageBus).to_owned()),
283 switchboard: MessagingSwitchboard::default(),
284 subscriptions: AHashSet::new(),
285 topics: IndexMap::new(),
286 endpoints: IndexMap::new(),
287 correlation_index: AHashMap::new(),
288 has_backing: false,
289 }
290 }
291
292 #[must_use]
294 pub fn memory_address(&self) -> String {
295 format!("{:?}", std::ptr::from_ref(self))
296 }
297
298 #[must_use]
300 pub fn endpoints(&self) -> Vec<&str> {
301 self.endpoints.iter().map(|e| e.0.as_str()).collect()
302 }
303
304 #[must_use]
306 pub fn patterns(&self) -> Vec<&str> {
307 self.subscriptions
308 .iter()
309 .map(|s| s.pattern.as_str())
310 .collect()
311 }
312
313 pub fn has_subscribers<T: AsRef<str>>(&self, topic: T) -> bool {
315 self.subscriptions_count(topic) > 0
316 }
317
318 #[must_use]
324 pub fn subscriptions_count<T: AsRef<str>>(&self, topic: T) -> usize {
325 let topic = MStr::<Topic>::topic(topic).expect(FAILED);
326 self.topics
327 .get(&topic)
328 .map(|subs| subs.len())
329 .unwrap_or_else(|| self.find_topic_matches(topic).len())
330 }
331
332 #[must_use]
334 pub fn subscriptions(&self) -> Vec<&Subscription> {
335 self.subscriptions.iter().collect()
336 }
337
338 #[must_use]
340 pub fn subscription_handler_ids(&self) -> Vec<&str> {
341 self.subscriptions
342 .iter()
343 .map(|s| s.handler_id.as_str())
344 .collect()
345 }
346
347 #[must_use]
353 pub fn is_registered<T: AsRef<str>>(&self, endpoint: T) -> bool {
354 let endpoint: MStr<Endpoint> = endpoint.into();
355 self.endpoints.contains_key(&endpoint)
356 }
357
358 #[must_use]
360 pub fn is_subscribed<T: AsRef<str>>(
361 &self,
362 pattern: T,
363 handler: ShareableMessageHandler,
364 ) -> bool {
365 let pattern = MStr::<Pattern>::pattern(pattern);
366 let sub = Subscription::new(pattern, handler, None);
367 self.subscriptions.contains(&sub)
368 }
369
370 pub const fn close(&self) -> anyhow::Result<()> {
376 Ok(())
378 }
379
380 #[must_use]
382 pub fn get_endpoint(&self, endpoint: MStr<Endpoint>) -> Option<&ShareableMessageHandler> {
383 self.endpoints.get(&endpoint)
384 }
385
386 #[must_use]
388 pub fn get_response_handler(&self, correlation_id: &UUID4) -> Option<&ShareableMessageHandler> {
389 self.correlation_index.get(correlation_id)
390 }
391
392 pub(crate) fn find_topic_matches(&self, topic: MStr<Topic>) -> Vec<Subscription> {
394 self.subscriptions
395 .iter()
396 .filter_map(|sub| {
397 if is_matching_backtracking(topic, sub.pattern) {
398 Some(sub.clone())
399 } else {
400 None
401 }
402 })
403 .collect()
404 }
405
406 #[must_use]
409 pub fn matching_subscriptions<T: AsRef<str>>(&mut self, topic: T) -> Vec<Subscription> {
410 let topic = MStr::<Topic>::from(topic);
411 self.inner_matching_subscriptions(topic)
412 }
413
414 pub(crate) fn inner_matching_subscriptions(&mut self, topic: MStr<Topic>) -> Vec<Subscription> {
415 self.topics.get(&topic).cloned().unwrap_or_else(|| {
416 let mut matches = self.find_topic_matches(topic);
417 matches.sort();
418 self.topics.insert(topic, matches.clone());
419 matches
420 })
421 }
422
423 pub fn register_response_handler(
429 &mut self,
430 correlation_id: &UUID4,
431 handler: ShareableMessageHandler,
432 ) -> anyhow::Result<()> {
433 if self.correlation_index.contains_key(correlation_id) {
434 anyhow::bail!("Correlation ID <{correlation_id}> already has a registered handler");
435 }
436
437 self.correlation_index.insert(*correlation_id, handler);
438
439 Ok(())
440 }
441}
442
443impl MessageBus {
445 pub fn register_message_bus(self) -> Rc<RefCell<MessageBus>> {
462 let msgbus = Rc::new(RefCell::new(self));
463 set_message_bus(msgbus.clone());
464 msgbus
465 }
466}
467
468impl Default for MessageBus {
469 fn default() -> Self {
471 Self::new(TraderId::from("TRADER-001"), UUID4::new(), None, None)
472 }
473}