nautilus_coinbase_intx/fix/
client.rs

1// -------------------------------------------------------------------------------------------------
2//  Copyright (C) 2015-2025 Posei Systems Pty Ltd. All rights reserved.
3//  https://poseitrader.io
4//
5//  Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
6//  You may not use this file except in compliance with the License.
7//  You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
8//
9//  Unless required by applicable law or agreed to in writing, software
10//  distributed under the License is distributed on an "AS IS" BASIS,
11//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12//  See the License for the specific language governing permissions and
13//  limitations under the License.
14// -------------------------------------------------------------------------------------------------
15
16//! FIX Client for the Coinbase International Drop Copy Endpoint.
17//!
18//! This implementation focuses specifically on processing execution reports
19//! via the FIX protocol, leveraging the existing `SocketClient` for TCP/TLS connectivity.
20//!
21//! # Warning
22//!
23//! **Not a full FIX engine**: This client supports only the Coinbase International Drop Copy
24//! endpoint and lacks general-purpose FIX functionality.
25use std::{
26    sync::{
27        Arc,
28        atomic::{AtomicBool, AtomicUsize, Ordering},
29    },
30    time::Duration,
31};
32
33use aws_lc_rs::hmac;
34use base64::prelude::*;
35use nautilus_common::logging::{log_task_started, log_task_stopped};
36#[cfg(feature = "python")]
37use nautilus_core::python::IntoPyObjectPoseiExt;
38use nautilus_core::{env::get_env_var, time::get_atomic_clock_realtime};
39use nautilus_model::identifiers::AccountId;
40use nautilus_network::socket::{SocketClient, SocketConfig, WriterCommand};
41#[cfg(feature = "python")]
42use pyo3::prelude::*;
43use tokio::task::JoinHandle;
44use tokio_tungstenite::tungstenite::stream::Mode;
45
46use super::{
47    messages::{FIX_DELIMITER, FixMessage},
48    parse::convert_to_order_status_report,
49};
50use crate::{
51    common::consts::COINBASE_INTX,
52    fix::{
53        messages::{fix_exec_type, fix_message_type, fix_tag},
54        parse::convert_to_fill_report,
55    },
56};
57
58#[cfg_attr(
59    feature = "python",
60    pyo3::pyclass(module = "posei_trader.core.nautilus_pyo3.adapters")
61)]
62#[derive(Debug, Clone)]
63pub struct CoinbaseIntxFixClient {
64    endpoint: String,
65    api_key: String,
66    api_secret: String,
67    api_passphrase: String,
68    portfolio_id: String,
69    sender_comp_id: String,
70    target_comp_id: String,
71    socket: Option<Arc<SocketClient>>,
72    connected: Arc<AtomicBool>,
73    logged_on: Arc<AtomicBool>,
74    seq_num: Arc<AtomicUsize>,
75    received_seq_num: Arc<AtomicUsize>,
76    heartbeat_secs: u64,
77    processing_task: Option<Arc<JoinHandle<()>>>,
78    heartbeat_task: Option<Arc<JoinHandle<()>>>,
79}
80
81impl CoinbaseIntxFixClient {
82    /// Creates a new [`CoinbaseIntxFixClient`] instance.
83    ///
84    /// # Errors
85    ///
86    /// Returns an error if required environment variables or parameters are missing.
87    pub fn new(
88        endpoint: Option<String>,
89        api_key: Option<String>,
90        api_secret: Option<String>,
91        api_passphrase: Option<String>,
92        portfolio_id: Option<String>,
93    ) -> anyhow::Result<Self> {
94        let endpoint = endpoint.unwrap_or("fix.international.coinbase.com:6130".to_string());
95        let api_key = api_key.unwrap_or(get_env_var("COINBASE_INTX_API_KEY")?);
96        let api_secret = api_secret.unwrap_or(get_env_var("COINBASE_INTX_API_SECRET")?);
97        let api_passphrase = api_passphrase.unwrap_or(get_env_var("COINBASE_INTX_API_PASSPHRASE")?);
98        let portfolio_id = portfolio_id.unwrap_or(get_env_var("COINBASE_INTX_PORTFOLIO_ID")?);
99        let sender_comp_id = api_key.to_string();
100        let target_comp_id = "CBINTLDC".to_string(); // Drop Copy endpoint
101
102        Ok(Self {
103            endpoint,
104            api_key,
105            api_secret,
106            api_passphrase,
107            portfolio_id,
108            sender_comp_id,
109            target_comp_id,
110            socket: None,
111            connected: Arc::new(AtomicBool::new(false)),
112            logged_on: Arc::new(AtomicBool::new(false)),
113            seq_num: Arc::new(AtomicUsize::new(1)),
114            received_seq_num: Arc::new(AtomicUsize::new(0)),
115            heartbeat_secs: 10, // Default (probably no need to change)
116            processing_task: None,
117            heartbeat_task: None,
118        })
119    }
120
121    /// Creates a new authenticated [`CoinbaseIntxFixClient`] instance using
122    /// environment variables and the default Coinbase International FIX drop copy endpoint.
123    ///
124    /// # Errors
125    ///
126    /// Returns an error if required environment variables are not set.
127    pub fn from_env() -> anyhow::Result<Self> {
128        Self::new(None, None, None, None, None)
129    }
130
    /// Returns the FIX endpoint address the client is configured with.
    ///
    /// This is the value used as the socket URL when connecting.
    #[must_use]
    pub const fn endpoint(&self) -> &str {
        self.endpoint.as_str()
    }
136
    /// Returns the public API key being used by the client.
    ///
    /// The same value is used as the FIX sender company ID and is part of the signed
    /// logon payload.
    #[must_use]
    pub const fn api_key(&self) -> &str {
        self.api_key.as_str()
    }
142
    /// Returns the Coinbase International portfolio ID being used by the client.
    ///
    /// The portfolio ID is combined with the venue name to form the account ID attached
    /// to reports produced while connected.
    #[must_use]
    pub const fn portfolio_id(&self) -> &str {
        self.portfolio_id.as_str()
    }
148
    /// Returns the sender company ID being used by the client.
    ///
    /// The constructor initializes this to a copy of the API key.
    #[must_use]
    pub const fn sender_comp_id(&self) -> &str {
        self.sender_comp_id.as_str()
    }
154
    /// Returns the target company ID being used by the client.
    ///
    /// The constructor initializes this to `CBINTLDC`, the Drop Copy endpoint.
    #[must_use]
    pub const fn target_comp_id(&self) -> &str {
        self.target_comp_id.as_str()
    }
160
161    /// Checks if the client is connected.
162    #[must_use]
163    pub fn is_connected(&self) -> bool {
164        self.connected.load(Ordering::SeqCst)
165    }
166
167    /// Checks if the client is logged on.
168    #[must_use]
169    pub fn is_logged_on(&self) -> bool {
170        self.logged_on.load(Ordering::SeqCst)
171    }
172
173    /// Connects to the Coinbase International FIX Drop Copy endpoint.
174    ///
175    /// # Panics
176    ///
177    /// Panics if time calculation or unwrap logic inside fails during logon retry setup.
178    ///
179    /// # Errors
180    ///
181    /// Returns an error if network connection or FIX logon fails.
182    pub async fn connect(
183        &mut self,
184        #[cfg(feature = "python")] handler: PyObject,
185        #[cfg(not(feature = "python"))] _handler: (),
186    ) -> anyhow::Result<()> {
187        let config = SocketConfig {
188            url: self.endpoint.clone(),
189            mode: Mode::Tls,
190            suffix: vec![FIX_DELIMITER],
191            #[cfg(feature = "python")]
192            py_handler: None, // Using handler from arg (TODO: refactor this config pattern)
193            heartbeat: None, // Using FIX heartbeats
194            reconnect_timeout_ms: Some(10000),
195            reconnect_delay_initial_ms: Some(5000),
196            reconnect_delay_max_ms: Some(30000),
197            reconnect_backoff_factor: Some(1.5),
198            reconnect_jitter_ms: Some(500),
199            certs_dir: None,
200        };
201
202        let logged_on = self.logged_on.clone();
203        let seq_num = self.seq_num.clone();
204        let received_seq_num = self.received_seq_num.clone();
205        let account_id = AccountId::new(format!("{COINBASE_INTX}-{}", self.portfolio_id));
206
207        let handle_message = Arc::new(move |data: &[u8]| {
208            if let Ok(message) = FixMessage::parse(data) {
209                // Update received sequence number
210                if let Some(msg_seq) = message.msg_seq_num() {
211                    received_seq_num.store(msg_seq, Ordering::SeqCst);
212                }
213
214                // Process message based on type
215                if let Some(msg_type) = message.msg_type() {
216                    match msg_type {
217                        fix_message_type::LOGON => {
218                            tracing::info!("Logon successful");
219                            logged_on.store(true, Ordering::SeqCst);
220                        }
221                        fix_message_type::LOGOUT => {
222                            tracing::info!("Received logout");
223                            logged_on.store(false, Ordering::SeqCst);
224                        }
225                        fix_message_type::EXECUTION_REPORT => {
226                            if let Some(exec_type) = message.get_field(fix_tag::EXEC_TYPE) {
227                                if matches!(
228                                    exec_type,
229                                    fix_exec_type::REJECTED
230                                        | fix_exec_type::NEW
231                                        | fix_exec_type::PENDING_NEW
232                                ) {
233                                    // These order events are already handled by the client
234                                    tracing::debug!(
235                                        "Received execution report for EXEC_TYPE {exec_type} (not handling here)"
236                                    );
237                                } else if matches!(
238                                    exec_type,
239                                    fix_exec_type::CANCELED
240                                        | fix_exec_type::EXPIRED
241                                        | fix_exec_type::REPLACED
242                                ) {
243                                    let clock = get_atomic_clock_realtime(); // TODO: Optimize
244                                    let ts_init = clock.get_time_ns();
245                                    match convert_to_order_status_report(
246                                        &message, account_id, ts_init,
247                                    ) {
248                                        #[cfg(feature = "python")]
249                                        Ok(report) => Python::with_gil(|py| {
250                                            call_python(
251                                                py,
252                                                &handler,
253                                                report.into_py_any_unwrap(py),
254                                            );
255                                        }),
256                                        #[cfg(not(feature = "python"))]
257                                        Ok(_report) => {
258                                            tracing::debug!(
259                                                "Order status report handled (Python disabled)"
260                                            );
261                                        }
262                                        Err(e) => {
263                                            tracing::error!(
264                                                "Failed to parse FIX execution report: {e}"
265                                            );
266                                        }
267                                    }
268                                } else if exec_type == fix_exec_type::PARTIAL_FILL
269                                    || exec_type == fix_exec_type::FILL
270                                {
271                                    let clock = get_atomic_clock_realtime(); // TODO: Optimize
272                                    let ts_init = clock.get_time_ns();
273                                    match convert_to_fill_report(&message, account_id, ts_init) {
274                                        #[cfg(feature = "python")]
275                                        Ok(report) => Python::with_gil(|py| {
276                                            call_python(
277                                                py,
278                                                &handler,
279                                                report.into_py_any_unwrap(py),
280                                            );
281                                        }),
282                                        #[cfg(not(feature = "python"))]
283                                        Ok(_report) => {
284                                            tracing::debug!(
285                                                "Fill report handled (Python disabled)"
286                                            );
287                                        }
288                                        Err(e) => {
289                                            tracing::error!(
290                                                "Failed to parse FIX execution report: {e}"
291                                            );
292                                        }
293                                    }
294                                } else {
295                                    tracing::warn!("Unhandled EXEC_TYPE {exec_type}: {message:?}");
296                                }
297                            }
298                        }
299                        // These can be HEARTBEAT or TEST_REQUEST messages,
300                        // ideally we'd respond to these with a heartbeat
301                        // including tag 112 TestReqID.
302                        _ => tracing::trace!("Recieved unexpected {message:?}"),
303                    }
304                }
305            } else {
306                tracing::error!("Failed to parse FIX message");
307            }
308        });
309
310        let socket = match SocketClient::connect(
311            config,
312            Some(handle_message),
313            #[cfg(feature = "python")]
314            None,
315            #[cfg(feature = "python")]
316            None,
317            #[cfg(feature = "python")]
318            None,
319        )
320        .await
321        {
322            Ok(socket) => socket,
323            Err(e) => anyhow::bail!("Failed to connect to FIX endpoint: {e:?}"),
324        };
325
326        let writer_tx = socket.writer_tx.clone();
327
328        self.socket = Some(Arc::new(socket));
329
330        self.send_logon().await?;
331
332        // Create task to monitor connection and send logon after reconnect
333        let connected_clone = self.connected.clone();
334        let logged_on_clone = self.logged_on.clone();
335        let heartbeat_secs = self.heartbeat_secs;
336        let client_clone = self.clone();
337
338        self.processing_task = Some(Arc::new(tokio::spawn(async move {
339            log_task_started("maintain-fix-connection");
340
341            let mut last_logon_attempt = std::time::Instant::now()
342                .checked_sub(Duration::from_secs(10))
343                .unwrap();
344
345            loop {
346                tokio::time::sleep(Duration::from_millis(100)).await;
347
348                // Check if connected but not logged on
349                if connected_clone.load(Ordering::SeqCst) && !logged_on_clone.load(Ordering::SeqCst)
350                {
351                    // Rate limit logon attempts
352                    if last_logon_attempt.elapsed() > Duration::from_secs(10) {
353                        tracing::info!("Connected without logon");
354                        last_logon_attempt = std::time::Instant::now();
355
356                        if let Err(e) = client_clone.send_logon().await {
357                            tracing::error!("Failed to send logon: {e}");
358                        }
359                    }
360                }
361            }
362        })));
363
364        let logged_on_clone = self.logged_on.clone();
365        let sender_comp_id = self.sender_comp_id.clone();
366        let target_comp_id = self.target_comp_id.clone();
367
368        self.heartbeat_task = Some(Arc::new(tokio::spawn(async move {
369            log_task_started("heartbeat");
370            tracing::debug!("Heartbeat at {heartbeat_secs}s intervals");
371
372            let interval = Duration::from_secs(heartbeat_secs);
373
374            loop {
375                if logged_on_clone.load(Ordering::SeqCst) {
376                    // Create new heartbeat message
377                    let seq = seq_num.fetch_add(1, Ordering::SeqCst) + 1;
378                    let now = chrono::Utc::now();
379                    let msg =
380                        FixMessage::create_heartbeat(seq, &sender_comp_id, &target_comp_id, &now);
381
382                    if let Err(e) = writer_tx.send(WriterCommand::Send(msg.to_bytes().into())) {
383                        tracing::error!("Failed to send heartbeat: {e}");
384                        break;
385                    }
386
387                    tracing::trace!("Sent heartbeat");
388                } else {
389                    // No longer logged on
390                    tracing::debug!("No longer logged on, stopping heartbeat task");
391                    break;
392                }
393
394                tokio::time::sleep(interval).await;
395            }
396
397            log_task_stopped("heartbeat");
398        })));
399
400        Ok(())
401    }
402
403    /// Closes the connection.
404    ///
405    /// # Errors
406    ///
407    /// Returns an error if logout or socket closure fails.
408    pub async fn close(&mut self) -> anyhow::Result<()> {
409        // Send logout message if connected
410        if self.is_logged_on() {
411            if let Err(e) = self.send_logout("Normal logout").await {
412                tracing::warn!("Failed to send logout message: {e}");
413            }
414        }
415
416        // Close socket
417        if let Some(socket) = &self.socket {
418            socket.close().await;
419        }
420
421        // Cancel processing task
422        if let Some(task) = self.processing_task.take() {
423            task.abort();
424        }
425
426        // Cancel heartbeat task
427        if let Some(task) = self.heartbeat_task.take() {
428            task.abort();
429        }
430
431        self.connected.store(false, Ordering::SeqCst);
432        self.logged_on.store(false, Ordering::SeqCst);
433
434        Ok(())
435    }
436
437    /// Send a logon message
438    async fn send_logon(&self) -> anyhow::Result<()> {
439        if self.socket.is_none() {
440            anyhow::bail!("Socket not connected".to_string());
441        }
442
443        // Reset sequence number
444        self.seq_num.store(1, Ordering::SeqCst);
445
446        let now = chrono::Utc::now();
447        let timestamp = now.format("%Y%m%d-%H:%M:%S.%3f").to_string();
448        let passphrase = self.api_passphrase.clone();
449
450        let message = format!(
451            "{}{}{}{}",
452            timestamp, self.api_key, self.target_comp_id, passphrase
453        );
454
455        // Create signature
456        let decoded_secret = BASE64_STANDARD
457            .decode(&self.api_secret)
458            .map_err(|e| anyhow::anyhow!("Invalid base64 secret key: {e}"))?;
459
460        let key = hmac::Key::new(hmac::HMAC_SHA256, &decoded_secret);
461        let tag = hmac::sign(&key, message.as_bytes());
462        let encoded_signature = BASE64_STANDARD.encode(tag.as_ref());
463
464        let logon_msg = FixMessage::create_logon(
465            1, // Always use 1 for new logon with reset
466            &self.sender_comp_id,
467            &self.target_comp_id,
468            self.heartbeat_secs,
469            &self.api_key,
470            &passphrase,
471            &encoded_signature,
472            &now,
473        );
474
475        if let Some(socket) = &self.socket {
476            tracing::info!("Logging on...");
477
478            match socket.send_bytes(logon_msg.to_bytes()).await {
479                Ok(()) => tracing::debug!("Sent logon message"),
480                Err(e) => tracing::error!("Error on logon: {e}"),
481            }
482        } else {
483            anyhow::bail!("Socket not connected".to_string());
484        }
485
486        let start = std::time::Instant::now();
487        while !self.is_logged_on() {
488            tokio::time::sleep(Duration::from_millis(100)).await;
489
490            if start.elapsed() > Duration::from_secs(10) {
491                anyhow::bail!("Logon timeout".to_string());
492            }
493        }
494
495        self.logged_on.store(true, Ordering::SeqCst);
496
497        Ok(())
498    }
499
500    /// Sends a logout message.
501    async fn send_logout(&self, text: &str) -> anyhow::Result<()> {
502        if self.socket.is_none() {
503            anyhow::bail!("Socket not connected".to_string());
504        }
505
506        let seq_num = self.seq_num.fetch_add(1, Ordering::SeqCst);
507        let now = chrono::Utc::now();
508
509        let logout_msg = FixMessage::create_logout(
510            seq_num,
511            &self.sender_comp_id,
512            &self.target_comp_id,
513            Some(text),
514            &now,
515        );
516
517        if let Some(socket) = &self.socket {
518            match socket.send_bytes(logout_msg.to_bytes()).await {
519                Ok(()) => tracing::debug!("Sent logout message"),
520                Err(e) => tracing::error!("Error on logout: {e}"),
521            }
522        } else {
523            anyhow::bail!("Socket not connected".to_string());
524        }
525
526        Ok(())
527    }
528}
529
// Lives in this crate rather than in core, because core must not depend on tracing
/// Invokes the Python `callback` with `py_obj` as its single positional argument.
///
/// The return value of the callback is discarded; an exception raised by it is logged
/// as an error instead of being propagated.
#[cfg(feature = "python")]
pub fn call_python(py: Python, callback: &PyObject, py_obj: PyObject) {
    match callback.call1(py, (py_obj,)) {
        Ok(_) => {}
        Err(e) => tracing::error!("Error calling Python: {e}"),
    }
}
536}