nautilus_coinbase_intx/fix/
client.rs1use std::{
26 sync::{
27 Arc,
28 atomic::{AtomicBool, AtomicUsize, Ordering},
29 },
30 time::Duration,
31};
32
33use aws_lc_rs::hmac;
34use base64::prelude::*;
35use nautilus_common::logging::{log_task_started, log_task_stopped};
36#[cfg(feature = "python")]
37use nautilus_core::python::IntoPyObjectPoseiExt;
38use nautilus_core::{env::get_env_var, time::get_atomic_clock_realtime};
39use nautilus_model::identifiers::AccountId;
40use nautilus_network::socket::{SocketClient, SocketConfig, WriterCommand};
41#[cfg(feature = "python")]
42use pyo3::prelude::*;
43use tokio::task::JoinHandle;
44use tokio_tungstenite::tungstenite::stream::Mode;
45
46use super::{
47 messages::{FIX_DELIMITER, FixMessage},
48 parse::convert_to_order_status_report,
49};
50use crate::{
51 common::consts::COINBASE_INTX,
52 fix::{
53 messages::{fix_exec_type, fix_message_type, fix_tag},
54 parse::convert_to_fill_report,
55 },
56};
57
58#[cfg_attr(
59 feature = "python",
60 pyo3::pyclass(module = "posei_trader.core.nautilus_pyo3.adapters")
61)]
62#[derive(Debug, Clone)]
63pub struct CoinbaseIntxFixClient {
64 endpoint: String,
65 api_key: String,
66 api_secret: String,
67 api_passphrase: String,
68 portfolio_id: String,
69 sender_comp_id: String,
70 target_comp_id: String,
71 socket: Option<Arc<SocketClient>>,
72 connected: Arc<AtomicBool>,
73 logged_on: Arc<AtomicBool>,
74 seq_num: Arc<AtomicUsize>,
75 received_seq_num: Arc<AtomicUsize>,
76 heartbeat_secs: u64,
77 processing_task: Option<Arc<JoinHandle<()>>>,
78 heartbeat_task: Option<Arc<JoinHandle<()>>>,
79}
80
81impl CoinbaseIntxFixClient {
82 pub fn new(
88 endpoint: Option<String>,
89 api_key: Option<String>,
90 api_secret: Option<String>,
91 api_passphrase: Option<String>,
92 portfolio_id: Option<String>,
93 ) -> anyhow::Result<Self> {
94 let endpoint = endpoint.unwrap_or("fix.international.coinbase.com:6130".to_string());
95 let api_key = api_key.unwrap_or(get_env_var("COINBASE_INTX_API_KEY")?);
96 let api_secret = api_secret.unwrap_or(get_env_var("COINBASE_INTX_API_SECRET")?);
97 let api_passphrase = api_passphrase.unwrap_or(get_env_var("COINBASE_INTX_API_PASSPHRASE")?);
98 let portfolio_id = portfolio_id.unwrap_or(get_env_var("COINBASE_INTX_PORTFOLIO_ID")?);
99 let sender_comp_id = api_key.to_string();
100 let target_comp_id = "CBINTLDC".to_string(); Ok(Self {
103 endpoint,
104 api_key,
105 api_secret,
106 api_passphrase,
107 portfolio_id,
108 sender_comp_id,
109 target_comp_id,
110 socket: None,
111 connected: Arc::new(AtomicBool::new(false)),
112 logged_on: Arc::new(AtomicBool::new(false)),
113 seq_num: Arc::new(AtomicUsize::new(1)),
114 received_seq_num: Arc::new(AtomicUsize::new(0)),
115 heartbeat_secs: 10, processing_task: None,
117 heartbeat_task: None,
118 })
119 }
120
121 pub fn from_env() -> anyhow::Result<Self> {
128 Self::new(None, None, None, None, None)
129 }
130
131 #[must_use]
133 pub const fn endpoint(&self) -> &str {
134 self.endpoint.as_str()
135 }
136
137 #[must_use]
139 pub const fn api_key(&self) -> &str {
140 self.api_key.as_str()
141 }
142
143 #[must_use]
145 pub const fn portfolio_id(&self) -> &str {
146 self.portfolio_id.as_str()
147 }
148
149 #[must_use]
151 pub const fn sender_comp_id(&self) -> &str {
152 self.sender_comp_id.as_str()
153 }
154
155 #[must_use]
157 pub const fn target_comp_id(&self) -> &str {
158 self.target_comp_id.as_str()
159 }
160
161 #[must_use]
163 pub fn is_connected(&self) -> bool {
164 self.connected.load(Ordering::SeqCst)
165 }
166
167 #[must_use]
169 pub fn is_logged_on(&self) -> bool {
170 self.logged_on.load(Ordering::SeqCst)
171 }
172
173 pub async fn connect(
183 &mut self,
184 #[cfg(feature = "python")] handler: PyObject,
185 #[cfg(not(feature = "python"))] _handler: (),
186 ) -> anyhow::Result<()> {
187 let config = SocketConfig {
188 url: self.endpoint.clone(),
189 mode: Mode::Tls,
190 suffix: vec![FIX_DELIMITER],
191 #[cfg(feature = "python")]
192 py_handler: None, heartbeat: None, reconnect_timeout_ms: Some(10000),
195 reconnect_delay_initial_ms: Some(5000),
196 reconnect_delay_max_ms: Some(30000),
197 reconnect_backoff_factor: Some(1.5),
198 reconnect_jitter_ms: Some(500),
199 certs_dir: None,
200 };
201
202 let logged_on = self.logged_on.clone();
203 let seq_num = self.seq_num.clone();
204 let received_seq_num = self.received_seq_num.clone();
205 let account_id = AccountId::new(format!("{COINBASE_INTX}-{}", self.portfolio_id));
206
207 let handle_message = Arc::new(move |data: &[u8]| {
208 if let Ok(message) = FixMessage::parse(data) {
209 if let Some(msg_seq) = message.msg_seq_num() {
211 received_seq_num.store(msg_seq, Ordering::SeqCst);
212 }
213
214 if let Some(msg_type) = message.msg_type() {
216 match msg_type {
217 fix_message_type::LOGON => {
218 tracing::info!("Logon successful");
219 logged_on.store(true, Ordering::SeqCst);
220 }
221 fix_message_type::LOGOUT => {
222 tracing::info!("Received logout");
223 logged_on.store(false, Ordering::SeqCst);
224 }
225 fix_message_type::EXECUTION_REPORT => {
226 if let Some(exec_type) = message.get_field(fix_tag::EXEC_TYPE) {
227 if matches!(
228 exec_type,
229 fix_exec_type::REJECTED
230 | fix_exec_type::NEW
231 | fix_exec_type::PENDING_NEW
232 ) {
233 tracing::debug!(
235 "Received execution report for EXEC_TYPE {exec_type} (not handling here)"
236 );
237 } else if matches!(
238 exec_type,
239 fix_exec_type::CANCELED
240 | fix_exec_type::EXPIRED
241 | fix_exec_type::REPLACED
242 ) {
243 let clock = get_atomic_clock_realtime(); let ts_init = clock.get_time_ns();
245 match convert_to_order_status_report(
246 &message, account_id, ts_init,
247 ) {
248 #[cfg(feature = "python")]
249 Ok(report) => Python::with_gil(|py| {
250 call_python(
251 py,
252 &handler,
253 report.into_py_any_unwrap(py),
254 );
255 }),
256 #[cfg(not(feature = "python"))]
257 Ok(_report) => {
258 tracing::debug!(
259 "Order status report handled (Python disabled)"
260 );
261 }
262 Err(e) => {
263 tracing::error!(
264 "Failed to parse FIX execution report: {e}"
265 );
266 }
267 }
268 } else if exec_type == fix_exec_type::PARTIAL_FILL
269 || exec_type == fix_exec_type::FILL
270 {
271 let clock = get_atomic_clock_realtime(); let ts_init = clock.get_time_ns();
273 match convert_to_fill_report(&message, account_id, ts_init) {
274 #[cfg(feature = "python")]
275 Ok(report) => Python::with_gil(|py| {
276 call_python(
277 py,
278 &handler,
279 report.into_py_any_unwrap(py),
280 );
281 }),
282 #[cfg(not(feature = "python"))]
283 Ok(_report) => {
284 tracing::debug!(
285 "Fill report handled (Python disabled)"
286 );
287 }
288 Err(e) => {
289 tracing::error!(
290 "Failed to parse FIX execution report: {e}"
291 );
292 }
293 }
294 } else {
295 tracing::warn!("Unhandled EXEC_TYPE {exec_type}: {message:?}");
296 }
297 }
298 }
299 _ => tracing::trace!("Recieved unexpected {message:?}"),
303 }
304 }
305 } else {
306 tracing::error!("Failed to parse FIX message");
307 }
308 });
309
310 let socket = match SocketClient::connect(
311 config,
312 Some(handle_message),
313 #[cfg(feature = "python")]
314 None,
315 #[cfg(feature = "python")]
316 None,
317 #[cfg(feature = "python")]
318 None,
319 )
320 .await
321 {
322 Ok(socket) => socket,
323 Err(e) => anyhow::bail!("Failed to connect to FIX endpoint: {e:?}"),
324 };
325
326 let writer_tx = socket.writer_tx.clone();
327
328 self.socket = Some(Arc::new(socket));
329
330 self.send_logon().await?;
331
332 let connected_clone = self.connected.clone();
334 let logged_on_clone = self.logged_on.clone();
335 let heartbeat_secs = self.heartbeat_secs;
336 let client_clone = self.clone();
337
338 self.processing_task = Some(Arc::new(tokio::spawn(async move {
339 log_task_started("maintain-fix-connection");
340
341 let mut last_logon_attempt = std::time::Instant::now()
342 .checked_sub(Duration::from_secs(10))
343 .unwrap();
344
345 loop {
346 tokio::time::sleep(Duration::from_millis(100)).await;
347
348 if connected_clone.load(Ordering::SeqCst) && !logged_on_clone.load(Ordering::SeqCst)
350 {
351 if last_logon_attempt.elapsed() > Duration::from_secs(10) {
353 tracing::info!("Connected without logon");
354 last_logon_attempt = std::time::Instant::now();
355
356 if let Err(e) = client_clone.send_logon().await {
357 tracing::error!("Failed to send logon: {e}");
358 }
359 }
360 }
361 }
362 })));
363
364 let logged_on_clone = self.logged_on.clone();
365 let sender_comp_id = self.sender_comp_id.clone();
366 let target_comp_id = self.target_comp_id.clone();
367
368 self.heartbeat_task = Some(Arc::new(tokio::spawn(async move {
369 log_task_started("heartbeat");
370 tracing::debug!("Heartbeat at {heartbeat_secs}s intervals");
371
372 let interval = Duration::from_secs(heartbeat_secs);
373
374 loop {
375 if logged_on_clone.load(Ordering::SeqCst) {
376 let seq = seq_num.fetch_add(1, Ordering::SeqCst) + 1;
378 let now = chrono::Utc::now();
379 let msg =
380 FixMessage::create_heartbeat(seq, &sender_comp_id, &target_comp_id, &now);
381
382 if let Err(e) = writer_tx.send(WriterCommand::Send(msg.to_bytes().into())) {
383 tracing::error!("Failed to send heartbeat: {e}");
384 break;
385 }
386
387 tracing::trace!("Sent heartbeat");
388 } else {
389 tracing::debug!("No longer logged on, stopping heartbeat task");
391 break;
392 }
393
394 tokio::time::sleep(interval).await;
395 }
396
397 log_task_stopped("heartbeat");
398 })));
399
400 Ok(())
401 }
402
403 pub async fn close(&mut self) -> anyhow::Result<()> {
409 if self.is_logged_on() {
411 if let Err(e) = self.send_logout("Normal logout").await {
412 tracing::warn!("Failed to send logout message: {e}");
413 }
414 }
415
416 if let Some(socket) = &self.socket {
418 socket.close().await;
419 }
420
421 if let Some(task) = self.processing_task.take() {
423 task.abort();
424 }
425
426 if let Some(task) = self.heartbeat_task.take() {
428 task.abort();
429 }
430
431 self.connected.store(false, Ordering::SeqCst);
432 self.logged_on.store(false, Ordering::SeqCst);
433
434 Ok(())
435 }
436
437 async fn send_logon(&self) -> anyhow::Result<()> {
439 if self.socket.is_none() {
440 anyhow::bail!("Socket not connected".to_string());
441 }
442
443 self.seq_num.store(1, Ordering::SeqCst);
445
446 let now = chrono::Utc::now();
447 let timestamp = now.format("%Y%m%d-%H:%M:%S.%3f").to_string();
448 let passphrase = self.api_passphrase.clone();
449
450 let message = format!(
451 "{}{}{}{}",
452 timestamp, self.api_key, self.target_comp_id, passphrase
453 );
454
455 let decoded_secret = BASE64_STANDARD
457 .decode(&self.api_secret)
458 .map_err(|e| anyhow::anyhow!("Invalid base64 secret key: {e}"))?;
459
460 let key = hmac::Key::new(hmac::HMAC_SHA256, &decoded_secret);
461 let tag = hmac::sign(&key, message.as_bytes());
462 let encoded_signature = BASE64_STANDARD.encode(tag.as_ref());
463
464 let logon_msg = FixMessage::create_logon(
465 1, &self.sender_comp_id,
467 &self.target_comp_id,
468 self.heartbeat_secs,
469 &self.api_key,
470 &passphrase,
471 &encoded_signature,
472 &now,
473 );
474
475 if let Some(socket) = &self.socket {
476 tracing::info!("Logging on...");
477
478 match socket.send_bytes(logon_msg.to_bytes()).await {
479 Ok(()) => tracing::debug!("Sent logon message"),
480 Err(e) => tracing::error!("Error on logon: {e}"),
481 }
482 } else {
483 anyhow::bail!("Socket not connected".to_string());
484 }
485
486 let start = std::time::Instant::now();
487 while !self.is_logged_on() {
488 tokio::time::sleep(Duration::from_millis(100)).await;
489
490 if start.elapsed() > Duration::from_secs(10) {
491 anyhow::bail!("Logon timeout".to_string());
492 }
493 }
494
495 self.logged_on.store(true, Ordering::SeqCst);
496
497 Ok(())
498 }
499
500 async fn send_logout(&self, text: &str) -> anyhow::Result<()> {
502 if self.socket.is_none() {
503 anyhow::bail!("Socket not connected".to_string());
504 }
505
506 let seq_num = self.seq_num.fetch_add(1, Ordering::SeqCst);
507 let now = chrono::Utc::now();
508
509 let logout_msg = FixMessage::create_logout(
510 seq_num,
511 &self.sender_comp_id,
512 &self.target_comp_id,
513 Some(text),
514 &now,
515 );
516
517 if let Some(socket) = &self.socket {
518 match socket.send_bytes(logout_msg.to_bytes()).await {
519 Ok(()) => tracing::debug!("Sent logout message"),
520 Err(e) => tracing::error!("Error on logout: {e}"),
521 }
522 } else {
523 anyhow::bail!("Socket not connected".to_string());
524 }
525
526 Ok(())
527 }
528}
529
530#[cfg(feature = "python")]
532pub fn call_python(py: Python, callback: &PyObject, py_obj: PyObject) {
533 if let Err(e) = callback.call1(py, (py_obj,)) {
534 tracing::error!("Error calling Python: {e}");
535 }
536}