nautilus_coinbase_intx/fix/
client.rs

1// -------------------------------------------------------------------------------------------------
2//  Copyright (C) 2015-2025 Posei Systems Pty Ltd. All rights reserved.
3//  https://poseitrader.io
4//
5//  Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
6//  You may not use this file except in compliance with the License.
7//  You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
8//
9//  Unless required by applicable law or agreed to in writing, software
10//  distributed under the License is distributed on an "AS IS" BASIS,
11//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12//  See the License for the specific language governing permissions and
13//  limitations under the License.
14// -------------------------------------------------------------------------------------------------
15
16//! FIX Client for the Coinbase International Drop Copy Endpoint.
17//!
18//! This implementation focuses specifically on processing execution reports
19//! via the FIX protocol, leveraging the existing `SocketClient` for TCP/TLS connectivity.
20//!
21//! # Warning
22//!
23//! **Not a full FIX engine**: This client supports only the Coinbase International Drop Copy
24//! endpoint and lacks general-purpose FIX functionality.
25use std::{
26    sync::{
27        Arc,
28        atomic::{AtomicBool, AtomicUsize, Ordering},
29    },
30    time::Duration,
31};
32
33use base64::prelude::*;
34use nautilus_common::logging::{log_task_started, log_task_stopped};
35use nautilus_core::{
36    env::get_env_var, python::IntoPyObjectPoseiExt, time::get_atomic_clock_realtime,
37};
38use nautilus_model::identifiers::AccountId;
39use nautilus_network::socket::{SocketClient, SocketConfig, WriterCommand};
40use pyo3::prelude::*;
41use ring::hmac;
42use tokio::task::JoinHandle;
43use tokio_tungstenite::tungstenite::stream::Mode;
44
45use super::{
46    messages::{FIX_DELIMITER, FixMessage},
47    parse::convert_to_order_status_report,
48};
49use crate::{
50    common::consts::COINBASE_INTX,
51    fix::{
52        messages::{fix_exec_type, fix_message_type, fix_tag},
53        parse::convert_to_fill_report,
54    },
55};
56
57#[pyclass(module = "posei_trader.core.nautilus_pyo3.adapters")]
58#[derive(Debug, Clone)]
59pub struct CoinbaseIntxFixClient {
60    endpoint: String,
61    api_key: String,
62    api_secret: String,
63    api_passphrase: String,
64    portfolio_id: String,
65    sender_comp_id: String,
66    target_comp_id: String,
67    socket: Option<Arc<SocketClient>>,
68    connected: Arc<AtomicBool>,
69    logged_on: Arc<AtomicBool>,
70    seq_num: Arc<AtomicUsize>,
71    received_seq_num: Arc<AtomicUsize>,
72    heartbeat_secs: u64,
73    processing_task: Option<Arc<JoinHandle<()>>>,
74    heartbeat_task: Option<Arc<JoinHandle<()>>>,
75}
76
77impl CoinbaseIntxFixClient {
78    /// Creates a new [`CoinbaseIntxFixClient`] instance.
79    ///
80    /// # Errors
81    ///
82    /// Returns an error if required environment variables or parameters are missing.
83    pub fn new(
84        endpoint: Option<String>,
85        api_key: Option<String>,
86        api_secret: Option<String>,
87        api_passphrase: Option<String>,
88        portfolio_id: Option<String>,
89    ) -> anyhow::Result<Self> {
90        let endpoint = endpoint.unwrap_or("fix.international.coinbase.com:6130".to_string());
91        let api_key = api_key.unwrap_or(get_env_var("COINBASE_INTX_API_KEY")?);
92        let api_secret = api_secret.unwrap_or(get_env_var("COINBASE_INTX_API_SECRET")?);
93        let api_passphrase = api_passphrase.unwrap_or(get_env_var("COINBASE_INTX_API_PASSPHRASE")?);
94        let portfolio_id = portfolio_id.unwrap_or(get_env_var("COINBASE_INTX_PORTFOLIO_ID")?);
95        let sender_comp_id = api_key.to_string();
96        let target_comp_id = "CBINTLDC".to_string(); // Drop Copy endpoint
97
98        Ok(Self {
99            endpoint,
100            api_key,
101            api_secret,
102            api_passphrase,
103            portfolio_id,
104            sender_comp_id,
105            target_comp_id,
106            socket: None,
107            connected: Arc::new(AtomicBool::new(false)),
108            logged_on: Arc::new(AtomicBool::new(false)),
109            seq_num: Arc::new(AtomicUsize::new(1)),
110            received_seq_num: Arc::new(AtomicUsize::new(0)),
111            heartbeat_secs: 10, // Default (probably no need to change)
112            processing_task: None,
113            heartbeat_task: None,
114        })
115    }
116
117    /// Creates a new authenticated [`CoinbaseIntxFixClient`] instance using
118    /// environment variables and the default Coinbase International FIX drop copy endpoint.
119    ///
120    /// # Errors
121    ///
122    /// Returns an error if required environment variables are not set.
123    pub fn from_env() -> anyhow::Result<Self> {
124        Self::new(None, None, None, None, None)
125    }
126
127    /// Returns the FIX endpoint being used by the client.
128    #[must_use]
129    pub const fn endpoint(&self) -> &str {
130        self.endpoint.as_str()
131    }
132
133    /// Returns the public API key being used by the client.
134    #[must_use]
135    pub const fn api_key(&self) -> &str {
136        self.api_key.as_str()
137    }
138
139    /// Returns the Coinbase International portfolio ID being used by the client.
140    #[must_use]
141    pub const fn portfolio_id(&self) -> &str {
142        self.portfolio_id.as_str()
143    }
144
145    /// Returns the sender company ID being used by the client.
146    #[must_use]
147    pub const fn sender_comp_id(&self) -> &str {
148        self.sender_comp_id.as_str()
149    }
150
151    /// Returns the target company ID being used by the client.
152    #[must_use]
153    pub const fn target_comp_id(&self) -> &str {
154        self.target_comp_id.as_str()
155    }
156
157    /// Checks if the client is connected.
158    #[must_use]
159    pub fn is_connected(&self) -> bool {
160        self.connected.load(Ordering::SeqCst)
161    }
162
163    /// Checks if the client is logged on.
164    #[must_use]
165    pub fn is_logged_on(&self) -> bool {
166        self.logged_on.load(Ordering::SeqCst)
167    }
168
169    /// Connects to the Coinbase International FIX Drop Copy endpoint.
170    ///
171    /// # Panics
172    ///
173    /// Panics if time calculation or unwrap logic inside fails during logon retry setup.
174    ///
175    /// # Errors
176    ///
177    /// Returns an error if network connection or FIX logon fails.
178    pub async fn connect(&mut self, handler: PyObject) -> anyhow::Result<()> {
179        let config = SocketConfig {
180            url: self.endpoint.clone(),
181            mode: Mode::Tls,
182            suffix: vec![FIX_DELIMITER],
183            #[cfg(feature = "python")]
184            py_handler: None, // Using handler from arg (TODO: refactor this config pattern)
185            heartbeat: None, // Using FIX heartbeats
186            reconnect_timeout_ms: Some(10000),
187            reconnect_delay_initial_ms: Some(5000),
188            reconnect_delay_max_ms: Some(30000),
189            reconnect_backoff_factor: Some(1.5),
190            reconnect_jitter_ms: Some(500),
191            certs_dir: None,
192        };
193
194        let logged_on = self.logged_on.clone();
195        let seq_num = self.seq_num.clone();
196        let received_seq_num = self.received_seq_num.clone();
197        let account_id = AccountId::new(format!("{COINBASE_INTX}-{}", self.portfolio_id));
198
199        let handle_message = Arc::new(move |data: &[u8]| {
200            if let Ok(message) = FixMessage::parse(data) {
201                // Update received sequence number
202                if let Some(msg_seq) = message.msg_seq_num() {
203                    received_seq_num.store(msg_seq, Ordering::SeqCst);
204                }
205
206                // Process message based on type
207                if let Some(msg_type) = message.msg_type() {
208                    match msg_type {
209                        fix_message_type::LOGON => {
210                            tracing::info!("Logon successful");
211                            logged_on.store(true, Ordering::SeqCst);
212                        }
213                        fix_message_type::LOGOUT => {
214                            tracing::info!("Received logout");
215                            logged_on.store(false, Ordering::SeqCst);
216                        }
217                        fix_message_type::EXECUTION_REPORT => {
218                            if let Some(exec_type) = message.get_field(fix_tag::EXEC_TYPE) {
219                                if matches!(
220                                    exec_type,
221                                    fix_exec_type::REJECTED
222                                        | fix_exec_type::NEW
223                                        | fix_exec_type::PENDING_NEW
224                                ) {
225                                    // These order events are already handled by the client
226                                    tracing::debug!(
227                                        "Received execution report for EXEC_TYPE {exec_type} (not handling here)"
228                                    );
229                                } else if matches!(
230                                    exec_type,
231                                    fix_exec_type::CANCELED
232                                        | fix_exec_type::EXPIRED
233                                        | fix_exec_type::REPLACED
234                                ) {
235                                    let clock = get_atomic_clock_realtime(); // TODO: Optimize
236                                    let ts_init = clock.get_time_ns();
237                                    match convert_to_order_status_report(
238                                        &message, account_id, ts_init,
239                                    ) {
240                                        Ok(report) => Python::with_gil(|py| {
241                                            call_python(
242                                                py,
243                                                &handler,
244                                                report.into_py_any_unwrap(py),
245                                            );
246                                        }),
247                                        Err(e) => {
248                                            tracing::error!(
249                                                "Failed to parse FIX execution report: {e}"
250                                            );
251                                        }
252                                    }
253                                } else if exec_type == fix_exec_type::PARTIAL_FILL
254                                    || exec_type == fix_exec_type::FILL
255                                {
256                                    let clock = get_atomic_clock_realtime(); // TODO: Optimize
257                                    let ts_init = clock.get_time_ns();
258                                    match convert_to_fill_report(&message, account_id, ts_init) {
259                                        Ok(report) => Python::with_gil(|py| {
260                                            call_python(
261                                                py,
262                                                &handler,
263                                                report.into_py_any_unwrap(py),
264                                            );
265                                        }),
266                                        Err(e) => {
267                                            tracing::error!(
268                                                "Failed to parse FIX execution report: {e}"
269                                            );
270                                        }
271                                    }
272                                } else {
273                                    tracing::warn!("Unhandled EXEC_TYPE {exec_type}: {message:?}");
274                                }
275                            }
276                        }
277                        // These can be HEARTBEAT or TEST_REQUEST messages,
278                        // ideally we'd respond to these with a heartbeat
279                        // including tag 112 TestReqID.
280                        _ => tracing::trace!("Recieved unexpected {message:?}"),
281                    }
282                }
283            } else {
284                tracing::error!("Failed to parse FIX message");
285            }
286        });
287
288        let socket =
289            match SocketClient::connect(config, Some(handle_message), None, None, None).await {
290                Ok(socket) => socket,
291                Err(e) => anyhow::bail!("Failed to connect to FIX endpoint: {e:?}"),
292            };
293
294        let writer_tx = socket.writer_tx.clone();
295
296        self.socket = Some(Arc::new(socket));
297
298        self.send_logon().await?;
299
300        // Create task to monitor connection and send logon after reconnect
301        let connected_clone = self.connected.clone();
302        let logged_on_clone = self.logged_on.clone();
303        let heartbeat_secs = self.heartbeat_secs;
304        let client_clone = self.clone();
305
306        self.processing_task = Some(Arc::new(tokio::spawn(async move {
307            log_task_started("maintain-fix-connection");
308
309            let mut last_logon_attempt = std::time::Instant::now()
310                .checked_sub(Duration::from_secs(10))
311                .unwrap();
312
313            loop {
314                tokio::time::sleep(Duration::from_millis(100)).await;
315
316                // Check if connected but not logged on
317                if connected_clone.load(Ordering::SeqCst) && !logged_on_clone.load(Ordering::SeqCst)
318                {
319                    // Rate limit logon attempts
320                    if last_logon_attempt.elapsed() > Duration::from_secs(10) {
321                        tracing::info!("Connected without logon");
322                        last_logon_attempt = std::time::Instant::now();
323
324                        if let Err(e) = client_clone.send_logon().await {
325                            tracing::error!("Failed to send logon: {e}");
326                        }
327                    }
328                }
329            }
330        })));
331
332        let logged_on_clone = self.logged_on.clone();
333        let sender_comp_id = self.sender_comp_id.clone();
334        let target_comp_id = self.target_comp_id.clone();
335
336        self.heartbeat_task = Some(Arc::new(tokio::spawn(async move {
337            log_task_started("heartbeat");
338            tracing::debug!("Heartbeat at {heartbeat_secs}s intervals");
339
340            let interval = Duration::from_secs(heartbeat_secs);
341
342            loop {
343                if logged_on_clone.load(Ordering::SeqCst) {
344                    // Create new heartbeat message
345                    let seq = seq_num.fetch_add(1, Ordering::SeqCst) + 1;
346                    let now = chrono::Utc::now();
347                    let msg =
348                        FixMessage::create_heartbeat(seq, &sender_comp_id, &target_comp_id, &now);
349
350                    if let Err(e) = writer_tx.send(WriterCommand::Send(msg.to_bytes().into())) {
351                        tracing::error!("Failed to send heartbeat: {e}");
352                        break;
353                    }
354
355                    tracing::trace!("Sent heartbeat");
356                } else {
357                    // No longer logged on
358                    tracing::debug!("No longer logged on, stopping heartbeat task");
359                    break;
360                }
361
362                tokio::time::sleep(interval).await;
363            }
364
365            log_task_stopped("heartbeat");
366        })));
367
368        Ok(())
369    }
370
371    /// Closes the connection.
372    ///
373    /// # Errors
374    ///
375    /// Returns an error if logout or socket closure fails.
376    pub async fn close(&mut self) -> anyhow::Result<()> {
377        // Send logout message if connected
378        if self.is_logged_on() {
379            if let Err(e) = self.send_logout("Normal logout").await {
380                tracing::warn!("Failed to send logout message: {e}");
381            }
382        }
383
384        // Close socket
385        if let Some(socket) = &self.socket {
386            socket.close().await;
387        }
388
389        // Cancel processing task
390        if let Some(task) = self.processing_task.take() {
391            task.abort();
392        }
393
394        // Cancel heartbeat task
395        if let Some(task) = self.heartbeat_task.take() {
396            task.abort();
397        }
398
399        self.connected.store(false, Ordering::SeqCst);
400        self.logged_on.store(false, Ordering::SeqCst);
401
402        Ok(())
403    }
404
405    /// Send a logon message
406    async fn send_logon(&self) -> anyhow::Result<()> {
407        if self.socket.is_none() {
408            anyhow::bail!("Socket not connected".to_string());
409        }
410
411        // Reset sequence number
412        self.seq_num.store(1, Ordering::SeqCst);
413
414        let now = chrono::Utc::now();
415        let timestamp = now.format("%Y%m%d-%H:%M:%S.%3f").to_string();
416        let passphrase = self.api_passphrase.clone();
417
418        let message = format!(
419            "{}{}{}{}",
420            timestamp, self.api_key, self.target_comp_id, passphrase
421        );
422
423        // Create signature
424        let decoded_secret = BASE64_STANDARD
425            .decode(&self.api_secret)
426            .map_err(|e| anyhow::anyhow!("Invalid base64 secret key: {e}"))?;
427
428        let hmac_key = hmac::Key::new(hmac::HMAC_SHA256, &decoded_secret);
429        let signature = hmac::sign(&hmac_key, message.as_bytes());
430        let encoded_signature = BASE64_STANDARD.encode(signature);
431
432        let logon_msg = FixMessage::create_logon(
433            1, // Always use 1 for new logon with reset
434            &self.sender_comp_id,
435            &self.target_comp_id,
436            self.heartbeat_secs,
437            &self.api_key,
438            &passphrase,
439            &encoded_signature,
440            &now,
441        );
442
443        if let Some(socket) = &self.socket {
444            tracing::info!("Logging on...");
445
446            match socket.send_bytes(logon_msg.to_bytes()).await {
447                Ok(()) => tracing::debug!("Sent logon message"),
448                Err(e) => tracing::error!("Error on logon: {e}"),
449            }
450        } else {
451            anyhow::bail!("Socket not connected".to_string());
452        }
453
454        let start = std::time::Instant::now();
455        while !self.is_logged_on() {
456            tokio::time::sleep(Duration::from_millis(100)).await;
457
458            if start.elapsed() > Duration::from_secs(10) {
459                anyhow::bail!("Logon timeout".to_string());
460            }
461        }
462
463        self.logged_on.store(true, Ordering::SeqCst);
464
465        Ok(())
466    }
467
468    /// Sends a logout message.
469    async fn send_logout(&self, text: &str) -> anyhow::Result<()> {
470        if self.socket.is_none() {
471            anyhow::bail!("Socket not connected".to_string());
472        }
473
474        let seq_num = self.seq_num.fetch_add(1, Ordering::SeqCst);
475        let now = chrono::Utc::now();
476
477        let logout_msg = FixMessage::create_logout(
478            seq_num,
479            &self.sender_comp_id,
480            &self.target_comp_id,
481            Some(text),
482            &now,
483        );
484
485        if let Some(socket) = &self.socket {
486            match socket.send_bytes(logout_msg.to_bytes()).await {
487                Ok(()) => tracing::debug!("Sent logout message"),
488                Err(e) => tracing::error!("Error on logout: {e}"),
489            }
490        } else {
491            anyhow::bail!("Socket not connected".to_string());
492        }
493
494        Ok(())
495    }
496}
497
498// Can't be moved to core because we don't want to depend on tracing there
499pub fn call_python(py: Python, callback: &PyObject, py_obj: PyObject) {
500    if let Err(e) = callback.call1(py, (py_obj,)) {
501        tracing::error!("Error calling Python: {e}");
502    }
503}