nautilus_coinbase_intx/http/
client.rs

1// -------------------------------------------------------------------------------------------------
2//  Copyright (C) 2015-2025 Posei Systems Pty Ltd. All rights reserved.
3//  https://poseitrader.io
4//
5//  Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
6//  You may not use this file except in compliance with the License.
7//  You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
8//
9//  Unless required by applicable law or agreed to in writing, software
10//  distributed under the License is distributed on an "AS IS" BASIS,
11//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12//  See the License for the specific language governing permissions and
13//  limitations under the License.
14// -------------------------------------------------------------------------------------------------
15
16//! Provides the HTTP client integration for the [Coinbase International](https://www.coinbase.com/en/international-exchange) REST API.
17//!
18//! This module defines and implements a [`CoinbaseIntxHttpClient`] for
19//! sending requests to various Coinbase endpoints. It handles request signing
20//! (when credentials are provided), constructs valid HTTP requests
21//! using the [`HttpClient`], and parses the responses back into structured data or a [`CoinbaseIntxHttpError`].
22
23use std::{
24    collections::HashMap,
25    num::NonZeroU32,
26    sync::{Arc, LazyLock, Mutex},
27};
28
29use chrono::{DateTime, Utc};
30use nautilus_core::{
31    UnixNanos, consts::NAUTILUS_USER_AGENT, env::get_env_var, time::get_atomic_clock_realtime,
32};
33use nautilus_model::{
34    enums::{OrderSide, OrderType, TimeInForce},
35    events::AccountState,
36    identifiers::{AccountId, ClientOrderId, Symbol, VenueOrderId},
37    instruments::{Instrument, InstrumentAny},
38    reports::{FillReport, OrderStatusReport, PositionStatusReport},
39    types::{Price, Quantity},
40};
41use nautilus_network::{http::HttpClient, ratelimiter::quota::Quota};
42use reqwest::{Method, StatusCode, header::USER_AGENT};
43use serde::{Deserialize, Serialize, de::DeserializeOwned};
44use ustr::Ustr;
45
46use super::{
47    error::CoinbaseIntxHttpError,
48    models::{
49        CoinbaseIntxAsset, CoinbaseIntxBalance, CoinbaseIntxFeeTier, CoinbaseIntxFillList,
50        CoinbaseIntxInstrument, CoinbaseIntxOrder, CoinbaseIntxOrderList, CoinbaseIntxPortfolio,
51        CoinbaseIntxPortfolioDetails, CoinbaseIntxPortfolioFeeRates, CoinbaseIntxPortfolioSummary,
52        CoinbaseIntxPosition,
53    },
54    parse::{
55        parse_account_state, parse_fill_report, parse_instrument_any, parse_order_status_report,
56        parse_position_status_report,
57    },
58    query::{
59        CancelOrderParams, CancelOrdersParams, CreateOrderParams, CreateOrderParamsBuilder,
60        GetOrderParams, GetOrdersParams, GetOrdersParamsBuilder, GetPortfolioFillsParams,
61        GetPortfolioFillsParamsBuilder, ModifyOrderParams,
62    },
63};
64use crate::{
65    common::{
66        consts::COINBASE_INTX_REST_URL,
67        credential::Credential,
68        enums::{CoinbaseIntxOrderType, CoinbaseIntxSide, CoinbaseIntxTimeInForce},
69    },
70    http::{
71        error::ErrorBody,
72        query::{CancelOrdersParamsBuilder, ModifyOrderParamsBuilder},
73    },
74};
75
76/// Represents an Coinbase HTTP response.
77#[derive(Debug, Serialize, Deserialize)]
78pub struct CoinbaseIntxResponse<T> {
79    /// The Coinbase response code, which is `"0"` for success.
80    pub code: String,
81    /// A message string which can be informational or describe an error cause.
82    pub msg: String,
83    /// The typed data returned by the Coinbase endpoint.
84    pub data: Vec<T>,
85}
86
87// https://docs.cdp.coinbase.com/intx/docs/rate-limits#rest-api-rate-limits
88pub static COINBASE_INTX_REST_QUOTA: LazyLock<Quota> =
89    LazyLock::new(|| Quota::per_second(NonZeroU32::new(40).unwrap()));
90
91/// Provides a lower-level HTTP client for connecting to the [Coinbase International](https://coinbase.com) REST API.
92///
93/// This client wraps the underlying `HttpClient` to handle functionality
94/// specific to Coinbase, such as request signing (for authenticated endpoints),
95/// forming request URLs, and deserializing responses into specific data models.
96#[derive(Debug, Clone)]
97pub struct CoinbaseIntxHttpInnerClient {
98    base_url: String,
99    client: HttpClient,
100    credential: Option<Credential>,
101}
102
103impl Default for CoinbaseIntxHttpInnerClient {
104    fn default() -> Self {
105        Self::new(None, Some(60))
106    }
107}
108
109impl CoinbaseIntxHttpInnerClient {
110    /// Creates a new [`CoinbaseIntxHttpClient`] using the default Coinbase HTTP URL,
111    /// optionally overridden with a custom base url.
112    ///
113    /// This version of the client has **no credentials**, so it can only
114    /// call publicly accessible endpoints.
115    #[must_use]
116    pub fn new(base_url: Option<String>, timeout_secs: Option<u64>) -> Self {
117        Self {
118            base_url: base_url.unwrap_or(COINBASE_INTX_REST_URL.to_string()),
119            client: HttpClient::new(
120                Self::default_headers(),
121                vec![],
122                vec![],
123                Some(*COINBASE_INTX_REST_QUOTA),
124                timeout_secs,
125            ),
126            credential: None,
127        }
128    }
129
130    /// Creates a new [`CoinbaseIntxHttpClient`] configured with credentials
131    /// for authenticated requests, optionally using a custom base url.
132    #[must_use]
133    pub fn with_credentials(
134        api_key: String,
135        api_secret: String,
136        api_passphrase: String,
137        base_url: String,
138        timeout_secs: Option<u64>,
139    ) -> Self {
140        Self {
141            base_url,
142            client: HttpClient::new(
143                Self::default_headers(),
144                vec![],
145                vec![],
146                Some(*COINBASE_INTX_REST_QUOTA),
147                timeout_secs,
148            ),
149            credential: Some(Credential::new(api_key, api_secret, api_passphrase)),
150        }
151    }
152
153    /// Builds the default headers to include with each request (e.g., `User-Agent`).
154    fn default_headers() -> HashMap<String, String> {
155        HashMap::from([(USER_AGENT.to_string(), NAUTILUS_USER_AGENT.to_string())])
156    }
157
158    /// Signs an Coinbase request with timestamp, API key, passphrase, and signature.
159    ///
160    /// # Errors
161    ///
162    /// Returns [`CoinbaseHttpError::MissingCredentials`] if no credentials are set
163    /// but the request requires authentication.
164    fn sign_request(
165        &self,
166        method: &Method,
167        path: &str,
168        body: Option<&[u8]>,
169    ) -> Result<HashMap<String, String>, CoinbaseIntxHttpError> {
170        let credential = match self.credential.as_ref() {
171            Some(c) => c,
172            None => return Err(CoinbaseIntxHttpError::MissingCredentials),
173        };
174
175        let api_key = credential.api_key.clone().to_string();
176        let api_passphrase = credential.api_passphrase.clone().to_string();
177        let timestamp = Utc::now().timestamp().to_string();
178        let body_str = body
179            .and_then(|b| String::from_utf8(b.to_vec()).ok())
180            .unwrap_or_default();
181
182        let signature = credential.sign(&timestamp, method.as_str(), path, &body_str);
183
184        let mut headers = HashMap::new();
185        headers.insert("Accept".to_string(), "application/json".to_string());
186        headers.insert("CB-ACCESS-KEY".to_string(), api_key);
187        headers.insert("CB-ACCESS-PASSPHRASE".to_string(), api_passphrase);
188        headers.insert("CB-ACCESS-SIGN".to_string(), signature);
189        headers.insert("CB-ACCESS-TIMESTAMP".to_string(), timestamp);
190        headers.insert("Content-Type".to_string(), "application/json".to_string());
191
192        Ok(headers)
193    }
194
195    /// Sends an HTTP request to Coinbase International and parses the response into type `T`.
196    ///
197    /// Internally, this method handles:
198    /// - Building the URL from `base_url` + `path`.
199    /// - Optionally signing the request.
200    /// - Deserializing JSON responses into typed models, or returning a [`CoinbaseIntxHttpError`].
201    async fn send_request<T: DeserializeOwned>(
202        &self,
203        method: Method,
204        path: &str,
205        body: Option<Vec<u8>>,
206        authenticate: bool,
207    ) -> Result<T, CoinbaseIntxHttpError> {
208        let url = format!("{}{}", self.base_url, path);
209
210        let headers = if authenticate {
211            Some(self.sign_request(&method, path, body.as_deref())?)
212        } else {
213            None
214        };
215
216        tracing::trace!("Request: {url:?} {body:?}");
217
218        let resp = self
219            .client
220            .request(method.clone(), url, headers, body, None, None)
221            .await?;
222
223        tracing::trace!("Response: {resp:?}");
224
225        if resp.status.is_success() {
226            let coinbase_response: T = serde_json::from_slice(&resp.body).map_err(|e| {
227                tracing::error!("Failed to deserialize CoinbaseResponse: {e}");
228                CoinbaseIntxHttpError::JsonError(e.to_string())
229            })?;
230
231            Ok(coinbase_response)
232        } else {
233            let error_body = String::from_utf8_lossy(&resp.body);
234            tracing::error!(
235                "HTTP error {} with body: {error_body}",
236                resp.status.as_str()
237            );
238
239            if let Ok(parsed_error) = serde_json::from_slice::<CoinbaseIntxResponse<T>>(&resp.body)
240            {
241                return Err(CoinbaseIntxHttpError::CoinbaseError {
242                    error_code: parsed_error.code,
243                    message: parsed_error.msg,
244                });
245            }
246
247            if let Ok(parsed_error) = serde_json::from_slice::<ErrorBody>(&resp.body) {
248                if let (Some(title), Some(error)) = (parsed_error.title, parsed_error.error) {
249                    return Err(CoinbaseIntxHttpError::CoinbaseError {
250                        error_code: error,
251                        message: title,
252                    });
253                }
254            }
255
256            Err(CoinbaseIntxHttpError::UnexpectedStatus {
257                status: StatusCode::from_u16(resp.status.as_u16()).unwrap(),
258                body: error_body.to_string(),
259            })
260        }
261    }
262
263    /// Requests a list of all supported assets.
264    ///
265    /// See <https://docs.cdp.coinbase.com/intx/reference/getassets>.
266    /// # Errors
267    ///
268    /// Returns an error if the HTTP request fails or the response cannot be parsed.
269    pub async fn http_list_assets(&self) -> Result<Vec<CoinbaseIntxAsset>, CoinbaseIntxHttpError> {
270        let path = "/api/v1/assets";
271        self.send_request(Method::GET, path, None, false).await
272    }
273
274    /// Requests information for a specific asset.
275    ///
276    /// See <https://docs.cdp.coinbase.com/intx/reference/getasset>.
277    /// # Errors
278    ///
279    /// Returns an error if the HTTP request fails or the response cannot be parsed.
280    pub async fn http_get_asset_details(
281        &self,
282        asset: &str,
283    ) -> Result<CoinbaseIntxAsset, CoinbaseIntxHttpError> {
284        let path = format!("/api/v1/assets/{asset}");
285        self.send_request(Method::GET, &path, None, false).await
286    }
287
288    /// Requests all instruments available for trading.
289    ///
290    /// See <https://docs.cdp.coinbase.com/intx/reference/getinstruments>.
291    /// # Errors
292    ///
293    /// Returns an error if the HTTP request fails or the response cannot be parsed.
294    pub async fn http_list_instruments(
295        &self,
296    ) -> Result<Vec<CoinbaseIntxInstrument>, CoinbaseIntxHttpError> {
297        let path = "/api/v1/instruments";
298        self.send_request(Method::GET, path, None, false).await
299    }
300
301    /// Retrieve a list of instruments with open contracts.
302    ///
303    /// See <https://docs.cdp.coinbase.com/intx/reference/getinstrument>.
304    /// # Errors
305    ///
306    /// Returns an error if the HTTP request fails or the response cannot be parsed.
307    pub async fn http_get_instrument_details(
308        &self,
309        symbol: &str,
310    ) -> Result<CoinbaseIntxInstrument, CoinbaseIntxHttpError> {
311        let path = format!("/api/v1/instruments/{symbol}");
312        self.send_request(Method::GET, &path, None, false).await
313    }
314
315    /// Return all the fee rate tiers.
316    ///
317    /// See <https://docs.cdp.coinbase.com/intx/reference/getassets>.
318    /// # Errors
319    ///
320    /// Returns an error if the HTTP request fails or the response cannot be parsed.
321    pub async fn http_list_fee_rate_tiers(
322        &self,
323    ) -> Result<Vec<CoinbaseIntxFeeTier>, CoinbaseIntxHttpError> {
324        let path = "/api/v1/fee-rate-tiers";
325        self.send_request(Method::GET, path, None, true).await
326    }
327
328    /// List all user portfolios.
329    ///
330    /// See <https://docs.cdp.coinbase.com/intx/reference/getportfolios>.
331    /// # Errors
332    ///
333    /// Returns an error if the HTTP request fails or the response cannot be parsed.
334    pub async fn http_list_portfolios(
335        &self,
336    ) -> Result<Vec<CoinbaseIntxPortfolio>, CoinbaseIntxHttpError> {
337        let path = "/api/v1/portfolios";
338        self.send_request(Method::GET, path, None, true).await
339    }
340
341    /// Returns the user's specified portfolio.
342    ///
343    /// See <https://docs.cdp.coinbase.com/intx/reference/getportfolio>.
344    /// # Errors
345    ///
346    /// Returns an error if the HTTP request fails or the response cannot be parsed.
347    pub async fn http_get_portfolio(
348        &self,
349        portfolio_id: &str,
350    ) -> Result<CoinbaseIntxPortfolio, CoinbaseIntxHttpError> {
351        let path = format!("/api/v1/portfolios/{portfolio_id}");
352        self.send_request(Method::GET, &path, None, true).await
353    }
354
355    /// Retrieves the summary, positions, and balances of a portfolio.
356    ///
357    /// See <https://docs.cdp.coinbase.com/intx/reference/getportfoliodetail>.
358    /// # Errors
359    ///
360    /// Returns an error if the HTTP request fails or the response cannot be parsed.
361    pub async fn http_get_portfolio_details(
362        &self,
363        portfolio_id: &str,
364    ) -> Result<CoinbaseIntxPortfolioDetails, CoinbaseIntxHttpError> {
365        let path = format!("/api/v1/portfolios/{portfolio_id}/detail");
366        self.send_request(Method::GET, &path, None, true).await
367    }
368
369    /// Retrieves the high level overview of a portfolio.
370    ///
371    /// See <https://docs.cdp.coinbase.com/intx/reference/getportfoliosummary>.
372    /// # Errors
373    ///
374    /// Returns an error if the HTTP request fails or the response cannot be parsed.
375    pub async fn http_get_portfolio_summary(
376        &self,
377        portfolio_id: &str,
378    ) -> Result<CoinbaseIntxPortfolioSummary, CoinbaseIntxHttpError> {
379        let path = format!("/api/v1/portfolios/{portfolio_id}/summary");
380        self.send_request(Method::GET, &path, None, true).await
381    }
382
383    /// Returns all balances for a given portfolio.
384    ///
385    /// See <https://docs.cdp.coinbase.com/intx/reference/getportfoliobalances>.
386    /// # Errors
387    ///
388    /// Returns an error if the HTTP request fails or the response cannot be parsed.
389    pub async fn http_list_portfolio_balances(
390        &self,
391        portfolio_id: &str,
392    ) -> Result<Vec<CoinbaseIntxBalance>, CoinbaseIntxHttpError> {
393        let path = format!("/api/v1/portfolios/{portfolio_id}/balances");
394        self.send_request(Method::GET, &path, None, true).await
395    }
396
397    /// Retrieves the balance for a given portfolio and asset.
398    ///
399    /// See <https://docs.cdp.coinbase.com/intx/reference/getportfoliobalance>.
400    /// # Errors
401    ///
402    /// Returns an error if the HTTP request fails or the response cannot be parsed.
403    pub async fn http_get_portfolio_balance(
404        &self,
405        portfolio_id: &str,
406        asset: &str,
407    ) -> Result<CoinbaseIntxBalance, CoinbaseIntxHttpError> {
408        let path = format!("/api/v1/portfolios/{portfolio_id}/balances/{asset}");
409        self.send_request(Method::GET, &path, None, true).await
410    }
411
412    /// Returns all fills for a given portfolio.
413    ///
414    /// See <https://docs.cdp.coinbase.com/intx/reference/getportfoliofills>.
415    /// # Errors
416    ///
417    /// Returns an error if the HTTP request fails or the response cannot be parsed.
418    pub async fn http_list_portfolio_fills(
419        &self,
420        portfolio_id: &str,
421        params: GetPortfolioFillsParams,
422    ) -> Result<CoinbaseIntxFillList, CoinbaseIntxHttpError> {
423        let query = serde_urlencoded::to_string(&params)
424            .map_err(|e| CoinbaseIntxHttpError::JsonError(e.to_string()))?;
425        let path = format!("/api/v1/portfolios/{portfolio_id}/fills?{query}");
426        self.send_request(Method::GET, &path, None, true).await
427    }
428
429    /// Returns all positions for a given portfolio.
430    ///
431    /// See <https://docs.cdp.coinbase.com/intx/reference/getportfoliopositions>.
432    /// # Errors
433    ///
434    /// Returns an error if the HTTP request fails or the response cannot be parsed.
435    pub async fn http_list_portfolio_positions(
436        &self,
437        portfolio_id: &str,
438    ) -> Result<Vec<CoinbaseIntxPosition>, CoinbaseIntxHttpError> {
439        let path = format!("/api/v1/portfolios/{portfolio_id}/positions");
440        self.send_request(Method::GET, &path, None, true).await
441    }
442
443    /// Retrieves the position for a given portfolio and symbol.
444    ///
445    /// See <https://docs.cdp.coinbase.com/intx/reference/getportfolioposition>.
446    /// # Errors
447    ///
448    /// Returns an error if the HTTP request fails or the response cannot be parsed.
449    pub async fn http_get_portfolio_position(
450        &self,
451        portfolio_id: &str,
452        symbol: &str,
453    ) -> Result<CoinbaseIntxPosition, CoinbaseIntxHttpError> {
454        let path = format!("/api/v1/portfolios/{portfolio_id}/positions/{symbol}");
455        self.send_request(Method::GET, &path, None, true).await
456    }
457
458    /// Retrieves the Perpetual Future and Spot fee rate tiers for the user.
459    ///
460    /// See <https://docs.cdp.coinbase.com/intx/reference/getportfoliosfeerates>.
461    /// # Errors
462    ///
463    /// Returns an error if the HTTP request fails or the response cannot be parsed.
464    pub async fn http_list_portfolio_fee_rates(
465        &self,
466    ) -> Result<Vec<CoinbaseIntxPortfolioFeeRates>, CoinbaseIntxHttpError> {
467        let path = "/api/v1/portfolios/fee-rates";
468        self.send_request(Method::GET, path, None, true).await
469    }
470
471    /// Create a new order.
472    /// # Errors
473    ///
474    /// Returns an error if the HTTP request fails or the response cannot be parsed.
475    pub async fn http_create_order(
476        &self,
477        params: CreateOrderParams,
478    ) -> Result<CoinbaseIntxOrder, CoinbaseIntxHttpError> {
479        let path = "/api/v1/orders";
480        let body = serde_json::to_vec(&params)
481            .map_err(|e| CoinbaseIntxHttpError::JsonError(e.to_string()))?;
482        self.send_request(Method::POST, path, Some(body), true)
483            .await
484    }
485
486    /// Retrieves a single order. The order retrieved can be either active or inactive.
487    ///
488    /// See <https://docs.cdp.coinbase.com/intx/reference/getorder>.
489    /// # Errors
490    ///
491    /// Returns an error if the HTTP request fails or the response cannot be parsed.
492    pub async fn http_get_order(
493        &self,
494        venue_order_id: &str,
495        portfolio_id: &str,
496    ) -> Result<CoinbaseIntxOrder, CoinbaseIntxHttpError> {
497        let params = GetOrderParams {
498            portfolio: portfolio_id.to_string(),
499        };
500        let query = serde_urlencoded::to_string(&params)
501            .map_err(|e| CoinbaseIntxHttpError::JsonError(e.to_string()))?;
502        let path = format!("/api/v1/orders/{venue_order_id}?{query}");
503        self.send_request(Method::GET, &path, None, true).await
504    }
505
506    /// Returns a list of active orders resting on the order book matching the requested criteria.
507    /// Does not return any rejected, cancelled, or fully filled orders as they are not active.
508    ///
509    /// See <https://docs.cdp.coinbase.com/intx/reference/getorders>.
510    /// # Errors
511    ///
512    /// Returns an error if the HTTP request fails or the response cannot be parsed.
513    pub async fn http_list_open_orders(
514        &self,
515        params: GetOrdersParams,
516    ) -> Result<CoinbaseIntxOrderList, CoinbaseIntxHttpError> {
517        let query = serde_urlencoded::to_string(&params)
518            .map_err(|e| CoinbaseIntxHttpError::JsonError(e.to_string()))?;
519        let path = format!("/api/v1/orders?{query}");
520        self.send_request(Method::GET, &path, None, true).await
521    }
522
523    /// Cancels a single open order.
524    /// # Errors
525    ///
526    /// Returns an error if the HTTP request fails or the response cannot be parsed.
527    pub async fn http_cancel_order(
528        &self,
529        client_order_id: &str,
530        portfolio_id: &str,
531    ) -> Result<CoinbaseIntxOrder, CoinbaseIntxHttpError> {
532        let params = CancelOrderParams {
533            portfolio: portfolio_id.to_string(),
534        };
535        let query = serde_urlencoded::to_string(&params)
536            .map_err(|e| CoinbaseIntxHttpError::JsonError(e.to_string()))?;
537        let path = format!("/api/v1/orders/{client_order_id}?{query}");
538        self.send_request(Method::DELETE, &path, None, true).await
539    }
540
541    /// Cancel user orders.
542    /// # Errors
543    ///
544    /// Returns an error if the HTTP request fails or the response cannot be parsed.
545    pub async fn http_cancel_orders(
546        &self,
547        params: CancelOrdersParams,
548    ) -> Result<Vec<CoinbaseIntxOrder>, CoinbaseIntxHttpError> {
549        let query = serde_urlencoded::to_string(&params)
550            .map_err(|e| CoinbaseIntxHttpError::JsonError(e.to_string()))?;
551        let path = format!("/api/v1/orders?{query}");
552        self.send_request(Method::DELETE, &path, None, true).await
553    }
554
555    /// Modify an open order.
556    ///
557    /// See <https://docs.cdp.coinbase.com/intx/reference/modifyorder>.
558    /// # Errors
559    ///
560    /// Returns an error if the HTTP request fails or the response cannot be parsed.
561    pub async fn http_modify_order(
562        &self,
563        order_id: &str,
564        params: ModifyOrderParams,
565    ) -> Result<CoinbaseIntxOrder, CoinbaseIntxHttpError> {
566        let path = format!("/api/v1/orders/{order_id}");
567        let body = serde_json::to_vec(&params)
568            .map_err(|e| CoinbaseIntxHttpError::JsonError(e.to_string()))?;
569        self.send_request(Method::PUT, &path, Some(body), true)
570            .await
571    }
572}
573
574/// Provides a higher-level HTTP client for the [Coinbase International](https://coinbase.com) REST API.
575///
576/// This client wraps the underlying `CoinbaseIntxHttpInnerClient` to handle conversions
577/// into the Posei domain model.
578#[derive(Debug, Clone)]
579#[cfg_attr(
580    feature = "python",
581    pyo3::pyclass(module = "posei_trader.core.nautilus_pyo3.adapters")
582)]
583pub struct CoinbaseIntxHttpClient {
584    pub(crate) inner: Arc<CoinbaseIntxHttpInnerClient>,
585    pub(crate) instruments_cache: Arc<Mutex<HashMap<Ustr, InstrumentAny>>>,
586    cache_initialized: bool,
587}
588
589impl Default for CoinbaseIntxHttpClient {
590    fn default() -> Self {
591        Self::new(None, Some(60))
592    }
593}
594
595impl CoinbaseIntxHttpClient {
596    /// Creates a new [`CoinbaseIntxHttpClient`] using the default Coinbase HTTP URL,
597    /// optionally overridden with a custom base url.
598    ///
599    /// This version of the client has **no credentials**, so it can only
600    /// call publicly accessible endpoints.
601    #[must_use]
602    pub fn new(base_url: Option<String>, timeout_secs: Option<u64>) -> Self {
603        Self {
604            inner: Arc::new(CoinbaseIntxHttpInnerClient::new(base_url, timeout_secs)),
605            instruments_cache: Arc::new(Mutex::new(HashMap::new())),
606            cache_initialized: false,
607        }
608    }
609
610    /// Creates a new authenticated [`CoinbaseIntxHttpClient`] using environment variables and
611    /// the default Coinbase International HTTP base url.
612    ///
613    /// # Errors
614    ///
615    /// Returns an error if required environment variables are missing or invalid.
616    pub fn from_env() -> anyhow::Result<Self> {
617        Self::with_credentials(None, None, None, None, None)
618    }
619
620    /// Creates a new [`CoinbaseIntxHttpClient`] configured with credentials
621    /// for authenticated requests, optionally using a custom base url.
622    ///
623    /// # Errors
624    ///
625    /// Returns an error if required environment variables are missing or invalid.
626    pub fn with_credentials(
627        api_key: Option<String>,
628        api_secret: Option<String>,
629        api_passphrase: Option<String>,
630        base_url: Option<String>,
631        timeout_secs: Option<u64>,
632    ) -> anyhow::Result<Self> {
633        let api_key = api_key.unwrap_or(get_env_var("COINBASE_INTX_API_KEY")?);
634        let api_secret = api_secret.unwrap_or(get_env_var("COINBASE_INTX_API_SECRET")?);
635        let api_passphrase = api_passphrase.unwrap_or(get_env_var("COINBASE_INTX_API_PASSPHRASE")?);
636        let base_url = base_url.unwrap_or(COINBASE_INTX_REST_URL.to_string());
637        Ok(Self {
638            inner: Arc::new(CoinbaseIntxHttpInnerClient::with_credentials(
639                api_key,
640                api_secret,
641                api_passphrase,
642                base_url,
643                timeout_secs,
644            )),
645            instruments_cache: Arc::new(Mutex::new(HashMap::new())),
646            cache_initialized: false,
647        })
648    }
649
650    fn get_instrument_from_cache(&self, symbol: Ustr) -> anyhow::Result<InstrumentAny> {
651        match self.instruments_cache.lock().unwrap().get(&symbol) {
652            Some(inst) => Ok(inst.clone()), // TODO: Remove this clone
653            None => anyhow::bail!("Unable to process request, instrument {symbol} not in cache"),
654        }
655    }
656
657    fn generate_ts_init(&self) -> UnixNanos {
658        get_atomic_clock_realtime().get_time_ns()
659    }
660
661    /// Returns the base url being used by the client.
662    #[must_use]
663    pub fn base_url(&self) -> &str {
664        self.inner.base_url.as_str()
665    }
666
667    /// Returns the public API key being used by the client.
668    #[must_use]
669    pub fn api_key(&self) -> Option<&str> {
670        self.inner.credential.clone().map(|c| c.api_key.as_str())
671    }
672
673    /// Checks if the client is initialized.
674    ///
675    /// The client is considered initialized if any instruments have been cached from the venue.
676    #[must_use]
677    pub const fn is_initialized(&self) -> bool {
678        self.cache_initialized
679    }
680
681    /// Returns the cached instrument symbols.
682    ///
683    /// # Panics
684    ///
685    /// Panics if the instrument cache mutex is poisoned.
686    #[must_use]
687    pub fn get_cached_symbols(&self) -> Vec<String> {
688        self.instruments_cache
689            .lock()
690            .unwrap()
691            .keys()
692            .map(std::string::ToString::to_string)
693            .collect()
694    }
695
696    /// Adds the given instruments into the clients instrument cache.
697    ///
698    /// # Panics
699    ///
700    /// Panics if the instrument cache mutex is poisoned.
701    ///
702    /// Any existing instruments will be replaced.
703    pub fn add_instruments(&mut self, instruments: Vec<InstrumentAny>) {
704        for inst in instruments {
705            self.instruments_cache
706                .lock()
707                .unwrap()
708                .insert(inst.raw_symbol().inner(), inst);
709        }
710        self.cache_initialized = true;
711    }
712
713    /// Adds the given instrument into the clients instrument cache.
714    ///
715    /// # Panics
716    ///
717    /// Panics if the instrument cache mutex is poisoned.
718    ///
719    /// Any existing instrument will be replaced.
720    pub fn add_instrument(&mut self, instrument: InstrumentAny) {
721        self.instruments_cache
722            .lock()
723            .unwrap()
724            .insert(instrument.raw_symbol().inner(), instrument);
725        self.cache_initialized = true;
726    }
727
728    /// Requests a list of portfolio details from Coinbase International.
729    ///
730    /// # Errors
731    ///
732    /// Returns an error if the HTTP request fails or the response cannot be parsed.
733    pub async fn list_portfolios(&self) -> anyhow::Result<Vec<CoinbaseIntxPortfolio>> {
734        let resp = self
735            .inner
736            .http_list_portfolios()
737            .await
738            .map_err(|e| anyhow::anyhow!(e))?;
739
740        Ok(resp)
741    }
742
743    /// Requests the account state for the given account ID from Coinbase International.
744    ///
745    /// # Errors
746    ///
747    /// Returns an error if the HTTP request fails or the response cannot be parsed.
748    pub async fn request_account_state(
749        &self,
750        account_id: AccountId,
751    ) -> anyhow::Result<AccountState> {
752        let resp = self
753            .inner
754            .http_list_portfolio_balances(account_id.get_issuers_id())
755            .await
756            .map_err(|e| anyhow::anyhow!(e))?;
757
758        let ts_init = self.generate_ts_init();
759        let account_state = parse_account_state(resp, account_id, ts_init)?;
760
761        Ok(account_state)
762    }
763
764    /// Requests all instruments from Coinbase International.
765    ///
766    /// # Errors
767    ///
768    /// Returns an error if the HTTP request fails or the response cannot be parsed.
769    pub async fn request_instruments(&self) -> anyhow::Result<Vec<InstrumentAny>> {
770        let resp = self
771            .inner
772            .http_list_instruments()
773            .await
774            .map_err(|e| anyhow::anyhow!(e))?;
775
776        let ts_init = self.generate_ts_init();
777
778        let mut instruments: Vec<InstrumentAny> = Vec::new();
779        for inst in &resp {
780            let instrument_any = parse_instrument_any(inst, ts_init);
781            if let Some(instrument_any) = instrument_any {
782                instruments.push(instrument_any);
783            }
784        }
785
786        Ok(instruments)
787    }
788
789    /// Requests the instrument for the given symbol from Coinbase International.
790    ///
791    /// # Errors
792    ///
793    /// Returns an error if the HTTP request fails or the instrument cannot be parsed.
794    pub async fn request_instrument(&self, symbol: &Symbol) -> anyhow::Result<InstrumentAny> {
795        let resp = self
796            .inner
797            .http_get_instrument_details(symbol.as_str())
798            .await
799            .map_err(|e| anyhow::anyhow!(e))?;
800
801        let ts_init = self.generate_ts_init();
802
803        match parse_instrument_any(&resp, ts_init) {
804            Some(inst) => Ok(inst),
805            None => anyhow::bail!("Unable to parse instrument"),
806        }
807    }
808
809    /// Requests an order status report for the given venue order ID from Coinbase International.
810    ///
811    /// # Errors
812    ///
813    /// Returns an error if the HTTP request fails or the response cannot be parsed.
814    pub async fn request_order_status_report(
815        &self,
816        account_id: AccountId,
817        venue_order_id: VenueOrderId,
818    ) -> anyhow::Result<OrderStatusReport> {
819        let portfolio_id = account_id.get_issuers_id();
820
821        let resp = self
822            .inner
823            .http_get_order(venue_order_id.as_str(), portfolio_id)
824            .await
825            .map_err(|e| anyhow::anyhow!(e))?;
826
827        let instrument = self.get_instrument_from_cache(resp.symbol)?;
828        let ts_init = self.generate_ts_init();
829
830        let report = parse_order_status_report(
831            resp,
832            account_id,
833            instrument.price_precision(),
834            instrument.size_precision(),
835            ts_init,
836        )?;
837        Ok(report)
838    }
839
840    /// Requests order status reports for all **open** orders from Coinbase International.
841    ///
842    /// # Errors
843    ///
844    /// Returns an error if the HTTP request fails or the response cannot be parsed.
845    pub async fn request_order_status_reports(
846        &self,
847        account_id: AccountId,
848        symbol: Symbol,
849    ) -> anyhow::Result<Vec<OrderStatusReport>> {
850        let portfolio_id = account_id.get_issuers_id();
851
852        let mut params = GetOrdersParamsBuilder::default();
853        params.portfolio(portfolio_id);
854        params.instrument(symbol.as_str());
855        let params = params.build().map_err(|e| anyhow::anyhow!(e))?;
856
857        let resp = self
858            .inner
859            .http_list_open_orders(params)
860            .await
861            .map_err(|e| anyhow::anyhow!(e))?;
862
863        let ts_init = get_atomic_clock_realtime().get_time_ns();
864
865        let mut reports: Vec<OrderStatusReport> = Vec::new();
866        for order in resp.results {
867            let instrument = self.get_instrument_from_cache(order.symbol)?;
868            let report = parse_order_status_report(
869                order,
870                account_id,
871                instrument.price_precision(),
872                instrument.size_precision(),
873                ts_init,
874            )?;
875            reports.push(report);
876        }
877
878        Ok(reports)
879    }
880
881    /// Requests all fill reports from Coinbase International.
882    ///
883    /// # Errors
884    ///
885    /// Returns an error if the HTTP request fails or the response cannot be parsed.
886    pub async fn request_fill_reports(
887        &self,
888        account_id: AccountId,
889        client_order_id: Option<ClientOrderId>,
890        start: Option<DateTime<Utc>>,
891    ) -> anyhow::Result<Vec<FillReport>> {
892        let portfolio_id = account_id.get_issuers_id();
893
894        let mut params = GetPortfolioFillsParamsBuilder::default();
895        if let Some(start) = start {
896            params.time_from(start);
897        }
898        if let Some(client_order_id) = client_order_id {
899            params.client_order_id(client_order_id.to_string());
900        }
901        let params = params.build().map_err(|e| anyhow::anyhow!(e))?;
902
903        let resp = self
904            .inner
905            .http_list_portfolio_fills(portfolio_id, params)
906            .await
907            .map_err(|e| anyhow::anyhow!(e))?;
908
909        let ts_init = get_atomic_clock_realtime().get_time_ns();
910
911        let mut reports: Vec<FillReport> = Vec::new();
912        for fill in resp.results {
913            let instrument = self.get_instrument_from_cache(fill.symbol)?;
914            let report = parse_fill_report(
915                fill,
916                account_id,
917                instrument.price_precision(),
918                instrument.size_precision(),
919                ts_init,
920            )?;
921            reports.push(report);
922        }
923
924        Ok(reports)
925    }
926
927    /// Requests a position status report from Coinbase International.
928    ///
929    /// # Errors
930    ///
931    /// Returns an error if the HTTP request fails or the response cannot be parsed.
932    pub async fn request_position_status_report(
933        &self,
934        account_id: AccountId,
935        symbol: Symbol,
936    ) -> anyhow::Result<PositionStatusReport> {
937        let portfolio_id = account_id.get_issuers_id();
938
939        let resp = self
940            .inner
941            .http_get_portfolio_position(portfolio_id, symbol.as_str())
942            .await
943            .map_err(|e| anyhow::anyhow!(e))?;
944
945        let instrument = self.get_instrument_from_cache(resp.symbol)?;
946        let ts_init = get_atomic_clock_realtime().get_time_ns();
947
948        let report =
949            parse_position_status_report(resp, account_id, instrument.size_precision(), ts_init)?;
950        Ok(report)
951    }
952
953    /// Requests all position status reports from Coinbase International.
954    ///
955    /// # Errors
956    ///
957    /// Returns an error if the HTTP request fails or the response cannot be parsed.
958    pub async fn request_position_status_reports(
959        &self,
960        account_id: AccountId,
961    ) -> anyhow::Result<Vec<PositionStatusReport>> {
962        let portfolio_id = account_id.get_issuers_id();
963
964        let resp = self
965            .inner
966            .http_list_portfolio_positions(portfolio_id)
967            .await
968            .map_err(|e| anyhow::anyhow!(e))?;
969
970        let ts_init = get_atomic_clock_realtime().get_time_ns();
971
972        let mut reports: Vec<PositionStatusReport> = Vec::new();
973        for position in resp {
974            let instrument = self.get_instrument_from_cache(position.symbol)?;
975            let report = parse_position_status_report(
976                position,
977                account_id,
978                instrument.size_precision(),
979                ts_init,
980            )?;
981            reports.push(report);
982        }
983
984        Ok(reports)
985    }
986
987    /// Submits a new order to Coinbase International.
988    ///
989    /// # Errors
990    ///
991    /// Returns an error if the HTTP request fails or the response cannot be parsed.
992    #[allow(clippy::too_many_arguments)]
993    pub async fn submit_order(
994        &self,
995        account_id: AccountId,
996        client_order_id: ClientOrderId,
997        symbol: Symbol,
998        order_side: OrderSide,
999        order_type: OrderType,
1000        quantity: Quantity,
1001        time_in_force: TimeInForce,
1002        expire_time: Option<DateTime<Utc>>,
1003        price: Option<Price>,
1004        trigger_price: Option<Price>,
1005        post_only: Option<bool>,
1006        reduce_only: Option<bool>,
1007    ) -> anyhow::Result<OrderStatusReport> {
1008        let coinbase_side: CoinbaseIntxSide = order_side.into();
1009        let coinbase_order_type: CoinbaseIntxOrderType = order_type.into();
1010        let coinbase_tif: CoinbaseIntxTimeInForce = time_in_force.into();
1011
1012        let mut params = CreateOrderParamsBuilder::default();
1013        params.portfolio(account_id.get_issuers_id());
1014        params.client_order_id(client_order_id.as_str());
1015        params.instrument(symbol.as_str());
1016        params.side(coinbase_side);
1017        params.size(quantity.to_string());
1018        params.order_type(coinbase_order_type);
1019        params.tif(coinbase_tif);
1020        if let Some(expire_time) = expire_time {
1021            params.expire_time(expire_time);
1022        }
1023        if let Some(price) = price {
1024            params.price(price.to_string());
1025        }
1026        if let Some(trigger_price) = trigger_price {
1027            params.stop_price(trigger_price.to_string());
1028        }
1029        if let Some(post_only) = post_only {
1030            params.post_only(post_only);
1031        }
1032        if let Some(reduce_only) = reduce_only {
1033            params.close_only(reduce_only);
1034        }
1035        let params = params.build().map_err(|e| anyhow::anyhow!(e))?;
1036
1037        let resp = self.inner.http_create_order(params).await?;
1038        tracing::debug!("Submitted order: {resp:?}");
1039
1040        let instrument = self.get_instrument_from_cache(resp.symbol)?;
1041        let ts_init = get_atomic_clock_realtime().get_time_ns();
1042        let report = parse_order_status_report(
1043            resp,
1044            account_id,
1045            instrument.price_precision(),
1046            instrument.size_precision(),
1047            ts_init,
1048        )?;
1049        Ok(report)
1050    }
1051
1052    /// Cancels a currently open order on Coinbase International.
1053    ///
1054    /// # Errors
1055    ///
1056    /// Returns an error if the HTTP request fails or the response cannot be parsed.
1057    pub async fn cancel_order(
1058        &self,
1059        account_id: AccountId,
1060        client_order_id: ClientOrderId,
1061    ) -> anyhow::Result<OrderStatusReport> {
1062        let portfolio_id = account_id.get_issuers_id();
1063
1064        let resp = self
1065            .inner
1066            .http_cancel_order(client_order_id.as_str(), portfolio_id)
1067            .await?;
1068        tracing::debug!("Canceled order: {resp:?}");
1069
1070        let instrument = self.get_instrument_from_cache(resp.symbol)?;
1071        let ts_init = get_atomic_clock_realtime().get_time_ns();
1072
1073        let report = parse_order_status_report(
1074            resp,
1075            account_id,
1076            instrument.price_precision(),
1077            instrument.size_precision(),
1078            ts_init,
1079        )?;
1080        Ok(report)
1081    }
1082
1083    /// Cancels all orders for the given account ID and filter params on Coinbase International.
1084    ///
1085    /// # Errors
1086    ///
1087    /// Returns an error if the HTTP request fails or the response cannot be parsed.
1088    pub async fn cancel_orders(
1089        &self,
1090        account_id: AccountId,
1091        symbol: Symbol,
1092        order_side: Option<OrderSide>,
1093    ) -> anyhow::Result<Vec<OrderStatusReport>> {
1094        let mut params = CancelOrdersParamsBuilder::default();
1095        params.portfolio(account_id.get_issuers_id());
1096        params.instrument(symbol.as_str());
1097        if let Some(side) = order_side {
1098            let side: CoinbaseIntxSide = side.into();
1099            params.side(side);
1100        }
1101        let params = params.build().map_err(|e| anyhow::anyhow!(e))?;
1102
1103        let resp = self.inner.http_cancel_orders(params).await?;
1104
1105        let instrument = self.get_instrument_from_cache(symbol.inner())?;
1106        let ts_init = get_atomic_clock_realtime().get_time_ns();
1107
1108        let mut reports: Vec<OrderStatusReport> = Vec::with_capacity(resp.len());
1109        for order in resp {
1110            tracing::debug!("Canceled order: {order:?}");
1111            let report = parse_order_status_report(
1112                order,
1113                account_id,
1114                instrument.price_precision(),
1115                instrument.size_precision(),
1116                ts_init,
1117            )?;
1118            reports.push(report);
1119        }
1120
1121        Ok(reports)
1122    }
1123
1124    /// Modifies a currently open order on Coinbase International.
1125    ///
1126    /// # Errors
1127    ///
1128    /// Returns an error if the HTTP request fails or the response cannot be parsed.
1129    #[allow(clippy::too_many_arguments)]
1130    pub async fn modify_order(
1131        &self,
1132        account_id: AccountId,
1133        client_order_id: ClientOrderId,
1134        new_client_order_id: ClientOrderId,
1135        price: Option<Price>,
1136        trigger_price: Option<Price>,
1137        quantity: Option<Quantity>,
1138    ) -> anyhow::Result<OrderStatusReport> {
1139        let mut params = ModifyOrderParamsBuilder::default();
1140        params.portfolio(account_id.get_issuers_id());
1141        params.client_order_id(new_client_order_id.as_str());
1142        if let Some(price) = price {
1143            params.price(price.to_string());
1144        }
1145        if let Some(trigger_price) = trigger_price {
1146            params.price(trigger_price.to_string());
1147        }
1148        if let Some(quantity) = quantity {
1149            params.size(quantity.to_string());
1150        }
1151        let params = params.build().map_err(|e| anyhow::anyhow!(e))?;
1152
1153        let resp = self
1154            .inner
1155            .http_modify_order(client_order_id.as_str(), params)
1156            .await?;
1157        tracing::debug!("Modified order {}", resp.client_order_id);
1158
1159        let instrument = self.get_instrument_from_cache(resp.symbol)?;
1160        let ts_init = get_atomic_clock_realtime().get_time_ns();
1161        let report = parse_order_status_report(
1162            resp,
1163            account_id,
1164            instrument.price_precision(),
1165            instrument.size_precision(),
1166            ts_init,
1167        )?;
1168        Ok(report)
1169    }
1170}