nautilus_coinbase_intx/fix/
client.rs1use std::{
26 sync::{
27 Arc,
28 atomic::{AtomicBool, AtomicUsize, Ordering},
29 },
30 time::Duration,
31};
32
33use base64::prelude::*;
34use nautilus_common::logging::{log_task_started, log_task_stopped};
35use nautilus_core::{
36 env::get_env_var, python::IntoPyObjectPoseiExt, time::get_atomic_clock_realtime,
37};
38use nautilus_model::identifiers::AccountId;
39use nautilus_network::socket::{SocketClient, SocketConfig, WriterCommand};
40use pyo3::prelude::*;
41use ring::hmac;
42use tokio::task::JoinHandle;
43use tokio_tungstenite::tungstenite::stream::Mode;
44
45use super::{
46 messages::{FIX_DELIMITER, FixMessage},
47 parse::convert_to_order_status_report,
48};
49use crate::{
50 common::consts::COINBASE_INTX,
51 fix::{
52 messages::{fix_exec_type, fix_message_type, fix_tag},
53 parse::convert_to_fill_report,
54 },
55};
56
57#[pyclass(module = "posei_trader.core.nautilus_pyo3.adapters")]
58#[derive(Debug, Clone)]
59pub struct CoinbaseIntxFixClient {
60 endpoint: String,
61 api_key: String,
62 api_secret: String,
63 api_passphrase: String,
64 portfolio_id: String,
65 sender_comp_id: String,
66 target_comp_id: String,
67 socket: Option<Arc<SocketClient>>,
68 connected: Arc<AtomicBool>,
69 logged_on: Arc<AtomicBool>,
70 seq_num: Arc<AtomicUsize>,
71 received_seq_num: Arc<AtomicUsize>,
72 heartbeat_secs: u64,
73 processing_task: Option<Arc<JoinHandle<()>>>,
74 heartbeat_task: Option<Arc<JoinHandle<()>>>,
75}
76
77impl CoinbaseIntxFixClient {
78 pub fn new(
84 endpoint: Option<String>,
85 api_key: Option<String>,
86 api_secret: Option<String>,
87 api_passphrase: Option<String>,
88 portfolio_id: Option<String>,
89 ) -> anyhow::Result<Self> {
90 let endpoint = endpoint.unwrap_or("fix.international.coinbase.com:6130".to_string());
91 let api_key = api_key.unwrap_or(get_env_var("COINBASE_INTX_API_KEY")?);
92 let api_secret = api_secret.unwrap_or(get_env_var("COINBASE_INTX_API_SECRET")?);
93 let api_passphrase = api_passphrase.unwrap_or(get_env_var("COINBASE_INTX_API_PASSPHRASE")?);
94 let portfolio_id = portfolio_id.unwrap_or(get_env_var("COINBASE_INTX_PORTFOLIO_ID")?);
95 let sender_comp_id = api_key.to_string();
96 let target_comp_id = "CBINTLDC".to_string(); Ok(Self {
99 endpoint,
100 api_key,
101 api_secret,
102 api_passphrase,
103 portfolio_id,
104 sender_comp_id,
105 target_comp_id,
106 socket: None,
107 connected: Arc::new(AtomicBool::new(false)),
108 logged_on: Arc::new(AtomicBool::new(false)),
109 seq_num: Arc::new(AtomicUsize::new(1)),
110 received_seq_num: Arc::new(AtomicUsize::new(0)),
111 heartbeat_secs: 10, processing_task: None,
113 heartbeat_task: None,
114 })
115 }
116
117 pub fn from_env() -> anyhow::Result<Self> {
124 Self::new(None, None, None, None, None)
125 }
126
127 #[must_use]
129 pub const fn endpoint(&self) -> &str {
130 self.endpoint.as_str()
131 }
132
133 #[must_use]
135 pub const fn api_key(&self) -> &str {
136 self.api_key.as_str()
137 }
138
139 #[must_use]
141 pub const fn portfolio_id(&self) -> &str {
142 self.portfolio_id.as_str()
143 }
144
145 #[must_use]
147 pub const fn sender_comp_id(&self) -> &str {
148 self.sender_comp_id.as_str()
149 }
150
151 #[must_use]
153 pub const fn target_comp_id(&self) -> &str {
154 self.target_comp_id.as_str()
155 }
156
157 #[must_use]
159 pub fn is_connected(&self) -> bool {
160 self.connected.load(Ordering::SeqCst)
161 }
162
163 #[must_use]
165 pub fn is_logged_on(&self) -> bool {
166 self.logged_on.load(Ordering::SeqCst)
167 }
168
169 pub async fn connect(&mut self, handler: PyObject) -> anyhow::Result<()> {
179 let config = SocketConfig {
180 url: self.endpoint.clone(),
181 mode: Mode::Tls,
182 suffix: vec![FIX_DELIMITER],
183 #[cfg(feature = "python")]
184 py_handler: None, heartbeat: None, reconnect_timeout_ms: Some(10000),
187 reconnect_delay_initial_ms: Some(5000),
188 reconnect_delay_max_ms: Some(30000),
189 reconnect_backoff_factor: Some(1.5),
190 reconnect_jitter_ms: Some(500),
191 certs_dir: None,
192 };
193
194 let logged_on = self.logged_on.clone();
195 let seq_num = self.seq_num.clone();
196 let received_seq_num = self.received_seq_num.clone();
197 let account_id = AccountId::new(format!("{COINBASE_INTX}-{}", self.portfolio_id));
198
199 let handle_message = Arc::new(move |data: &[u8]| {
200 if let Ok(message) = FixMessage::parse(data) {
201 if let Some(msg_seq) = message.msg_seq_num() {
203 received_seq_num.store(msg_seq, Ordering::SeqCst);
204 }
205
206 if let Some(msg_type) = message.msg_type() {
208 match msg_type {
209 fix_message_type::LOGON => {
210 tracing::info!("Logon successful");
211 logged_on.store(true, Ordering::SeqCst);
212 }
213 fix_message_type::LOGOUT => {
214 tracing::info!("Received logout");
215 logged_on.store(false, Ordering::SeqCst);
216 }
217 fix_message_type::EXECUTION_REPORT => {
218 if let Some(exec_type) = message.get_field(fix_tag::EXEC_TYPE) {
219 if matches!(
220 exec_type,
221 fix_exec_type::REJECTED
222 | fix_exec_type::NEW
223 | fix_exec_type::PENDING_NEW
224 ) {
225 tracing::debug!(
227 "Received execution report for EXEC_TYPE {exec_type} (not handling here)"
228 );
229 } else if matches!(
230 exec_type,
231 fix_exec_type::CANCELED
232 | fix_exec_type::EXPIRED
233 | fix_exec_type::REPLACED
234 ) {
235 let clock = get_atomic_clock_realtime(); let ts_init = clock.get_time_ns();
237 match convert_to_order_status_report(
238 &message, account_id, ts_init,
239 ) {
240 Ok(report) => Python::with_gil(|py| {
241 call_python(
242 py,
243 &handler,
244 report.into_py_any_unwrap(py),
245 );
246 }),
247 Err(e) => {
248 tracing::error!(
249 "Failed to parse FIX execution report: {e}"
250 );
251 }
252 }
253 } else if exec_type == fix_exec_type::PARTIAL_FILL
254 || exec_type == fix_exec_type::FILL
255 {
256 let clock = get_atomic_clock_realtime(); let ts_init = clock.get_time_ns();
258 match convert_to_fill_report(&message, account_id, ts_init) {
259 Ok(report) => Python::with_gil(|py| {
260 call_python(
261 py,
262 &handler,
263 report.into_py_any_unwrap(py),
264 );
265 }),
266 Err(e) => {
267 tracing::error!(
268 "Failed to parse FIX execution report: {e}"
269 );
270 }
271 }
272 } else {
273 tracing::warn!("Unhandled EXEC_TYPE {exec_type}: {message:?}");
274 }
275 }
276 }
277 _ => tracing::trace!("Recieved unexpected {message:?}"),
281 }
282 }
283 } else {
284 tracing::error!("Failed to parse FIX message");
285 }
286 });
287
288 let socket =
289 match SocketClient::connect(config, Some(handle_message), None, None, None).await {
290 Ok(socket) => socket,
291 Err(e) => anyhow::bail!("Failed to connect to FIX endpoint: {e:?}"),
292 };
293
294 let writer_tx = socket.writer_tx.clone();
295
296 self.socket = Some(Arc::new(socket));
297
298 self.send_logon().await?;
299
300 let connected_clone = self.connected.clone();
302 let logged_on_clone = self.logged_on.clone();
303 let heartbeat_secs = self.heartbeat_secs;
304 let client_clone = self.clone();
305
306 self.processing_task = Some(Arc::new(tokio::spawn(async move {
307 log_task_started("maintain-fix-connection");
308
309 let mut last_logon_attempt = std::time::Instant::now()
310 .checked_sub(Duration::from_secs(10))
311 .unwrap();
312
313 loop {
314 tokio::time::sleep(Duration::from_millis(100)).await;
315
316 if connected_clone.load(Ordering::SeqCst) && !logged_on_clone.load(Ordering::SeqCst)
318 {
319 if last_logon_attempt.elapsed() > Duration::from_secs(10) {
321 tracing::info!("Connected without logon");
322 last_logon_attempt = std::time::Instant::now();
323
324 if let Err(e) = client_clone.send_logon().await {
325 tracing::error!("Failed to send logon: {e}");
326 }
327 }
328 }
329 }
330 })));
331
332 let logged_on_clone = self.logged_on.clone();
333 let sender_comp_id = self.sender_comp_id.clone();
334 let target_comp_id = self.target_comp_id.clone();
335
336 self.heartbeat_task = Some(Arc::new(tokio::spawn(async move {
337 log_task_started("heartbeat");
338 tracing::debug!("Heartbeat at {heartbeat_secs}s intervals");
339
340 let interval = Duration::from_secs(heartbeat_secs);
341
342 loop {
343 if logged_on_clone.load(Ordering::SeqCst) {
344 let seq = seq_num.fetch_add(1, Ordering::SeqCst) + 1;
346 let now = chrono::Utc::now();
347 let msg =
348 FixMessage::create_heartbeat(seq, &sender_comp_id, &target_comp_id, &now);
349
350 if let Err(e) = writer_tx.send(WriterCommand::Send(msg.to_bytes().into())) {
351 tracing::error!("Failed to send heartbeat: {e}");
352 break;
353 }
354
355 tracing::trace!("Sent heartbeat");
356 } else {
357 tracing::debug!("No longer logged on, stopping heartbeat task");
359 break;
360 }
361
362 tokio::time::sleep(interval).await;
363 }
364
365 log_task_stopped("heartbeat");
366 })));
367
368 Ok(())
369 }
370
371 pub async fn close(&mut self) -> anyhow::Result<()> {
377 if self.is_logged_on() {
379 if let Err(e) = self.send_logout("Normal logout").await {
380 tracing::warn!("Failed to send logout message: {e}");
381 }
382 }
383
384 if let Some(socket) = &self.socket {
386 socket.close().await;
387 }
388
389 if let Some(task) = self.processing_task.take() {
391 task.abort();
392 }
393
394 if let Some(task) = self.heartbeat_task.take() {
396 task.abort();
397 }
398
399 self.connected.store(false, Ordering::SeqCst);
400 self.logged_on.store(false, Ordering::SeqCst);
401
402 Ok(())
403 }
404
405 async fn send_logon(&self) -> anyhow::Result<()> {
407 if self.socket.is_none() {
408 anyhow::bail!("Socket not connected".to_string());
409 }
410
411 self.seq_num.store(1, Ordering::SeqCst);
413
414 let now = chrono::Utc::now();
415 let timestamp = now.format("%Y%m%d-%H:%M:%S.%3f").to_string();
416 let passphrase = self.api_passphrase.clone();
417
418 let message = format!(
419 "{}{}{}{}",
420 timestamp, self.api_key, self.target_comp_id, passphrase
421 );
422
423 let decoded_secret = BASE64_STANDARD
425 .decode(&self.api_secret)
426 .map_err(|e| anyhow::anyhow!("Invalid base64 secret key: {e}"))?;
427
428 let hmac_key = hmac::Key::new(hmac::HMAC_SHA256, &decoded_secret);
429 let signature = hmac::sign(&hmac_key, message.as_bytes());
430 let encoded_signature = BASE64_STANDARD.encode(signature);
431
432 let logon_msg = FixMessage::create_logon(
433 1, &self.sender_comp_id,
435 &self.target_comp_id,
436 self.heartbeat_secs,
437 &self.api_key,
438 &passphrase,
439 &encoded_signature,
440 &now,
441 );
442
443 if let Some(socket) = &self.socket {
444 tracing::info!("Logging on...");
445
446 match socket.send_bytes(logon_msg.to_bytes()).await {
447 Ok(()) => tracing::debug!("Sent logon message"),
448 Err(e) => tracing::error!("Error on logon: {e}"),
449 }
450 } else {
451 anyhow::bail!("Socket not connected".to_string());
452 }
453
454 let start = std::time::Instant::now();
455 while !self.is_logged_on() {
456 tokio::time::sleep(Duration::from_millis(100)).await;
457
458 if start.elapsed() > Duration::from_secs(10) {
459 anyhow::bail!("Logon timeout".to_string());
460 }
461 }
462
463 self.logged_on.store(true, Ordering::SeqCst);
464
465 Ok(())
466 }
467
468 async fn send_logout(&self, text: &str) -> anyhow::Result<()> {
470 if self.socket.is_none() {
471 anyhow::bail!("Socket not connected".to_string());
472 }
473
474 let seq_num = self.seq_num.fetch_add(1, Ordering::SeqCst);
475 let now = chrono::Utc::now();
476
477 let logout_msg = FixMessage::create_logout(
478 seq_num,
479 &self.sender_comp_id,
480 &self.target_comp_id,
481 Some(text),
482 &now,
483 );
484
485 if let Some(socket) = &self.socket {
486 match socket.send_bytes(logout_msg.to_bytes()).await {
487 Ok(()) => tracing::debug!("Sent logout message"),
488 Err(e) => tracing::error!("Error on logout: {e}"),
489 }
490 } else {
491 anyhow::bail!("Socket not connected".to_string());
492 }
493
494 Ok(())
495 }
496}
497
498pub fn call_python(py: Python, callback: &PyObject, py_obj: PyObject) {
500 if let Err(e) = callback.call1(py, (py_obj,)) {
501 tracing::error!("Error calling Python: {e}");
502 }
503}