nautilus_common/msgbus/
core.rs

1// -------------------------------------------------------------------------------------------------
2//  Copyright (C) 2015-2025 2Posei Systems Pty Ltd. All rights reserved.
3//  https://poseitrader.io
4//
5//  Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
6//  You may not use this file except in compliance with the License.
7//  You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
8//
9//  Unless required by applicable law or agreed to in writing, software
10//  distributed under the License is distributed on an "AS IS" BASIS,
11//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12//  See the License for the specific language governing permissions and
13//  limitations under the License.
14// -------------------------------------------------------------------------------------------------
15
16use std::{
17    cell::RefCell,
18    collections::HashMap,
19    fmt::{self, Display},
20    hash::{Hash, Hasher},
21    ops::Deref,
22    rc::Rc,
23};
24
25use ahash::{AHashMap, AHashSet};
26use handler::ShareableMessageHandler;
27use indexmap::IndexMap;
28use matching::is_matching_backtracking;
29use nautilus_core::{
30    UUID4,
31    correctness::{FAILED, check_predicate_true, check_valid_string},
32};
33use nautilus_model::identifiers::TraderId;
34use switchboard::MessagingSwitchboard;
35use ustr::Ustr;
36
37use super::{handler, matching, set_message_bus, switchboard};
38
39#[inline(always)]
40fn check_fully_qualified_string(value: &Ustr, key: &str) -> anyhow::Result<()> {
41    check_predicate_true(
42        !value.chars().any(|c| c == '*' || c == '?'),
43        &format!("{key} `value` contained invalid characters, was {value}"),
44    )
45}
46
47/// Pattern is a string pattern for a subscription with special characters for pattern matching.
48#[derive(Copy, Clone, Debug, PartialEq, Eq, Hash)]
49pub struct Pattern;
50
51/// Topic is a fully qualified string for publishing data.
52#[derive(Copy, Clone, Debug, PartialEq, Eq, Hash)]
53pub struct Topic;
54
55/// Endpoint is a fully qualified string for sending data.
56#[derive(Copy, Clone, Debug, PartialEq, Eq, Hash)]
57pub struct Endpoint;
58
59/// A message bus string type. It can be a pattern or a topic.
60#[derive(Copy, Clone, Debug, PartialEq, Eq, Hash)]
61pub struct MStr<T> {
62    value: Ustr,
63    _marker: std::marker::PhantomData<T>,
64}
65
66impl<T> Display for MStr<T> {
67    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
68        write!(f, "{}", self.value)
69    }
70}
71
72impl<T> Deref for MStr<T> {
73    type Target = Ustr;
74
75    fn deref(&self) -> &Self::Target {
76        &self.value
77    }
78}
79
80impl MStr<Pattern> {
81    /// Create a new pattern from a string.
82    pub fn pattern<T: AsRef<str>>(value: T) -> Self {
83        let value = Ustr::from(value.as_ref());
84
85        Self {
86            value,
87            _marker: std::marker::PhantomData,
88        }
89    }
90}
91
92impl<T: AsRef<str>> From<T> for MStr<Pattern> {
93    fn from(value: T) -> Self {
94        Self::pattern(value)
95    }
96}
97
98impl From<MStr<Topic>> for MStr<Pattern> {
99    fn from(value: MStr<Topic>) -> Self {
100        Self {
101            value: value.value,
102            _marker: std::marker::PhantomData,
103        }
104    }
105}
106
107impl MStr<Topic> {
108    /// Create a new topic from a fully qualified string.
109    ///
110    /// # Errors
111    ///
112    /// Returns an error if the topic has white space or invalid characters.
113    pub fn topic<T: AsRef<str>>(value: T) -> anyhow::Result<Self> {
114        let topic = Ustr::from(value.as_ref());
115        check_valid_string(value, stringify!(value))?;
116        check_fully_qualified_string(&topic, stringify!(Topic))?;
117
118        Ok(Self {
119            value: topic,
120            _marker: std::marker::PhantomData,
121        })
122    }
123}
124
125impl<T: AsRef<str>> From<T> for MStr<Topic> {
126    fn from(value: T) -> Self {
127        Self::topic(value).expect(FAILED)
128    }
129}
130
131impl MStr<Endpoint> {
132    /// Create a new endpoint from a fully qualified string.
133    ///
134    /// # Errors
135    ///
136    /// Returns an error if the endpoint has white space or invalid characters.
137    pub fn endpoint<T: AsRef<str>>(value: T) -> anyhow::Result<Self> {
138        let endpoint = Ustr::from(value.as_ref());
139        check_valid_string(value, stringify!(value))?;
140        check_fully_qualified_string(&endpoint, stringify!(Endpoint))?;
141
142        Ok(Self {
143            value: endpoint,
144            _marker: std::marker::PhantomData,
145        })
146    }
147}
148
149impl<T: AsRef<str>> From<T> for MStr<Endpoint> {
150    fn from(value: T) -> Self {
151        Self::endpoint(value).expect(FAILED)
152    }
153}
154
155/// Represents a subscription to a particular topic.
156///
157/// This is an internal class intended to be used by the message bus to organize
158/// topics and their subscribers.
159///
160#[derive(Clone, Debug)]
161pub struct Subscription {
162    /// The shareable message handler for the subscription.
163    pub handler: ShareableMessageHandler,
164    /// Store a copy of the handler ID for faster equality checks.
165    pub handler_id: Ustr,
166    /// The pattern for the subscription.
167    pub pattern: MStr<Pattern>,
168    /// The priority for the subscription determines the ordering of handlers receiving
169    /// messages being processed, higher priority handlers will receive messages before
170    /// lower priority handlers.
171    pub priority: u8,
172}
173
174impl Subscription {
175    /// Creates a new [`Subscription`] instance.
176    #[must_use]
177    pub fn new(
178        pattern: MStr<Pattern>,
179        handler: ShareableMessageHandler,
180        priority: Option<u8>,
181    ) -> Self {
182        Self {
183            handler_id: handler.0.id(),
184            pattern,
185            handler,
186            priority: priority.unwrap_or(0),
187        }
188    }
189}
190
191impl PartialEq<Self> for Subscription {
192    fn eq(&self, other: &Self) -> bool {
193        self.pattern == other.pattern && self.handler_id == other.handler_id
194    }
195}
196
197impl Eq for Subscription {}
198
199impl PartialOrd for Subscription {
200    fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
201        Some(self.cmp(other))
202    }
203}
204
205impl Ord for Subscription {
206    fn cmp(&self, other: &Self) -> std::cmp::Ordering {
207        other
208            .priority
209            .cmp(&self.priority)
210            .then_with(|| self.pattern.cmp(&other.pattern))
211            .then_with(|| self.handler_id.cmp(&other.handler_id))
212    }
213}
214
215impl Hash for Subscription {
216    fn hash<H: Hasher>(&self, state: &mut H) {
217        self.pattern.hash(state);
218        self.handler_id.hash(state);
219    }
220}
221
222/// A generic message bus to facilitate various messaging patterns.
223///
224/// The bus provides both a producer and consumer API for Pub/Sub, Req/Rep, as
225/// well as direct point-to-point messaging to registered endpoints.
226///
227/// Pub/Sub wildcard patterns for hierarchical topics are possible:
228///  - `*` asterisk represents one or more characters in a pattern.
229///  - `?` question mark represents a single character in a pattern.
230///
231/// Given a topic and pattern potentially containing wildcard characters, i.e.
232/// `*` and `?`, where `?` can match any single character in the topic, and `*`
233/// can match any number of characters including zero characters.
234///
235/// The asterisk in a wildcard matches any character zero or more times. For
236/// example, `comp*` matches anything beginning with `comp` which means `comp`,
237/// `complete`, and `computer` are all matched.
238///
239/// A question mark matches a single character once. For example, `c?mp` matches
240/// `camp` and `comp`. The question mark can also be used more than once.
241/// For example, `c??p` would match both of the above examples and `coop`.
242#[derive(Debug)]
243pub struct MessageBus {
244    /// The trader ID associated with the message bus.
245    pub trader_id: TraderId,
246    /// The instance ID associated with the message bus.
247    pub instance_id: UUID4,
248    /// The name for the message bus.
249    pub name: String,
250    /// If the message bus is backed by a database.
251    pub has_backing: bool,
252    /// The switchboard for built-in endpoints.
253    pub switchboard: MessagingSwitchboard,
254    /// Active subscriptions.
255    pub subscriptions: AHashSet<Subscription>,
256    /// Maps a topic to all the handlers registered for it
257    /// this is updated whenever a new subscription is created.
258    pub topics: IndexMap<MStr<Topic>, Vec<Subscription>>,
259    /// Index of endpoint addresses and their handlers.
260    pub endpoints: IndexMap<MStr<Endpoint>, ShareableMessageHandler>,
261    /// Index of request correlation IDs and their response handlers.
262    pub correlation_index: AHashMap<UUID4, ShareableMessageHandler>,
263}
264
265// MessageBus is designed for single-threaded use within each async runtime.
266// Thread-local storage ensures each thread gets its own instance, eliminating
267// the need for unsafe Send/Sync implementations that were previously required
268// for global static storage.
269
270impl MessageBus {
271    /// Creates a new [`MessageBus`] instance.
272    #[must_use]
273    pub fn new(
274        trader_id: TraderId,
275        instance_id: UUID4,
276        name: Option<String>,
277        _config: Option<HashMap<String, serde_json::Value>>,
278    ) -> Self {
279        Self {
280            trader_id,
281            instance_id,
282            name: name.unwrap_or(stringify!(MessageBus).to_owned()),
283            switchboard: MessagingSwitchboard::default(),
284            subscriptions: AHashSet::new(),
285            topics: IndexMap::new(),
286            endpoints: IndexMap::new(),
287            correlation_index: AHashMap::new(),
288            has_backing: false,
289        }
290    }
291
292    /// Returns the message bus instances memory address.
293    #[must_use]
294    pub fn memory_address(&self) -> String {
295        format!("{:?}", std::ptr::from_ref(self))
296    }
297
298    /// Returns the registered endpoint addresses.
299    #[must_use]
300    pub fn endpoints(&self) -> Vec<&str> {
301        self.endpoints.iter().map(|e| e.0.as_str()).collect()
302    }
303
304    /// Returns actively subscribed patterns.
305    #[must_use]
306    pub fn patterns(&self) -> Vec<&str> {
307        self.subscriptions
308            .iter()
309            .map(|s| s.pattern.as_str())
310            .collect()
311    }
312
313    /// Returns whether there are subscribers for the `topic`.
314    pub fn has_subscribers<T: AsRef<str>>(&self, topic: T) -> bool {
315        self.subscriptions_count(topic) > 0
316    }
317
318    /// Returns the count of subscribers for the `topic`.
319    ///
320    /// # Panics
321    ///
322    /// Returns an error if the topic is not valid.
323    #[must_use]
324    pub fn subscriptions_count<T: AsRef<str>>(&self, topic: T) -> usize {
325        let topic = MStr::<Topic>::topic(topic).expect(FAILED);
326        self.topics
327            .get(&topic)
328            .map(|subs| subs.len())
329            .unwrap_or_else(|| self.find_topic_matches(topic).len())
330    }
331
332    /// Returns active subscriptions.
333    #[must_use]
334    pub fn subscriptions(&self) -> Vec<&Subscription> {
335        self.subscriptions.iter().collect()
336    }
337
338    /// Returns the handler IDs for actively subscribed patterns.
339    #[must_use]
340    pub fn subscription_handler_ids(&self) -> Vec<&str> {
341        self.subscriptions
342            .iter()
343            .map(|s| s.handler_id.as_str())
344            .collect()
345    }
346
347    /// Returns whether the endpoint is registered.
348    ///
349    /// # Panics
350    ///
351    /// Returns an error if the endpoint is not valid topic string.
352    #[must_use]
353    pub fn is_registered<T: AsRef<str>>(&self, endpoint: T) -> bool {
354        let endpoint: MStr<Endpoint> = endpoint.into();
355        self.endpoints.contains_key(&endpoint)
356    }
357
358    /// Returns whether the `handler` is subscribed to the `pattern`.
359    #[must_use]
360    pub fn is_subscribed<T: AsRef<str>>(
361        &self,
362        pattern: T,
363        handler: ShareableMessageHandler,
364    ) -> bool {
365        let pattern = MStr::<Pattern>::pattern(pattern);
366        let sub = Subscription::new(pattern, handler, None);
367        self.subscriptions.contains(&sub)
368    }
369
370    /// Close the message bus which will close the sender channel and join the thread.
371    ///
372    /// # Errors
373    ///
374    /// This function never returns an error (TBD once backing database added).
375    pub const fn close(&self) -> anyhow::Result<()> {
376        // TODO: Integrate the backing database
377        Ok(())
378    }
379
380    /// Returns the handler for the `endpoint`.
381    #[must_use]
382    pub fn get_endpoint(&self, endpoint: MStr<Endpoint>) -> Option<&ShareableMessageHandler> {
383        self.endpoints.get(&endpoint)
384    }
385
386    /// Returns the handler for the `correlation_id`.
387    #[must_use]
388    pub fn get_response_handler(&self, correlation_id: &UUID4) -> Option<&ShareableMessageHandler> {
389        self.correlation_index.get(correlation_id)
390    }
391
392    /// Finds the subscriptions with pattern matching the `topic`.
393    pub(crate) fn find_topic_matches(&self, topic: MStr<Topic>) -> Vec<Subscription> {
394        self.subscriptions
395            .iter()
396            .filter_map(|sub| {
397                if is_matching_backtracking(topic, sub.pattern) {
398                    Some(sub.clone())
399                } else {
400                    None
401                }
402            })
403            .collect()
404    }
405
406    /// Finds the subscriptions which match the `topic` and caches the
407    /// results in the `patterns` map.
408    #[must_use]
409    pub fn matching_subscriptions<T: AsRef<str>>(&mut self, topic: T) -> Vec<Subscription> {
410        let topic = MStr::<Topic>::from(topic);
411        self.inner_matching_subscriptions(topic)
412    }
413
414    pub(crate) fn inner_matching_subscriptions(&mut self, topic: MStr<Topic>) -> Vec<Subscription> {
415        self.topics.get(&topic).cloned().unwrap_or_else(|| {
416            let mut matches = self.find_topic_matches(topic);
417            matches.sort();
418            self.topics.insert(topic, matches.clone());
419            matches
420        })
421    }
422
423    /// Registers a response handler for a specific correlation ID.
424    ///
425    /// # Errors
426    ///
427    /// Returns an error if `handler` is already registered for the `correlation_id`.
428    pub fn register_response_handler(
429        &mut self,
430        correlation_id: &UUID4,
431        handler: ShareableMessageHandler,
432    ) -> anyhow::Result<()> {
433        if self.correlation_index.contains_key(correlation_id) {
434            anyhow::bail!("Correlation ID <{correlation_id}> already has a registered handler");
435        }
436
437        self.correlation_index.insert(*correlation_id, handler);
438
439        Ok(())
440    }
441}
442
443/// Data specific functions.
444impl MessageBus {
445    // /// Send a [`DataRequest`] to an endpoint that must be a data client implementation.
446    // pub fn send_data_request(&self, message: DataRequest) {
447    //     // TODO: log error
448    //     if let Some(client) = self.get_client(&message.client_id, message.venue) {
449    //         let _ = client.request(message);
450    //     }
451    // }
452    //
453    // /// Send a [`SubscriptionCommand`] to an endpoint that must be a data client implementation.
454    // pub fn send_subscription_command(&self, message: SubscriptionCommand) {
455    //     if let Some(client) = self.get_client(&message.client_id, message.venue) {
456    //         client.through_execute(message);
457    //     }
458    // }
459
460    /// Registers message bus for the current thread.
461    pub fn register_message_bus(self) -> Rc<RefCell<MessageBus>> {
462        let msgbus = Rc::new(RefCell::new(self));
463        set_message_bus(msgbus.clone());
464        msgbus
465    }
466}
467
468impl Default for MessageBus {
469    /// Creates a new default [`MessageBus`] instance.
470    fn default() -> Self {
471        Self::new(TraderId::from("TRADER-001"), UUID4::new(), None, None)
472    }
473}