1use std::{
24 collections::HashMap,
25 num::NonZeroU32,
26 sync::{Arc, LazyLock, Mutex},
27};
28
29use chrono::{DateTime, Utc};
30use nautilus_core::{
31 UnixNanos, consts::NAUTILUS_USER_AGENT, env::get_env_var, time::get_atomic_clock_realtime,
32};
33use nautilus_model::{
34 enums::{OrderSide, OrderType, TimeInForce},
35 events::AccountState,
36 identifiers::{AccountId, ClientOrderId, Symbol, VenueOrderId},
37 instruments::{Instrument, InstrumentAny},
38 reports::{FillReport, OrderStatusReport, PositionStatusReport},
39 types::{Price, Quantity},
40};
41use nautilus_network::{http::HttpClient, ratelimiter::quota::Quota};
42use reqwest::{Method, StatusCode, header::USER_AGENT};
43use serde::{Deserialize, Serialize, de::DeserializeOwned};
44use ustr::Ustr;
45
46use super::{
47 error::CoinbaseIntxHttpError,
48 models::{
49 CoinbaseIntxAsset, CoinbaseIntxBalance, CoinbaseIntxFeeTier, CoinbaseIntxFillList,
50 CoinbaseIntxInstrument, CoinbaseIntxOrder, CoinbaseIntxOrderList, CoinbaseIntxPortfolio,
51 CoinbaseIntxPortfolioDetails, CoinbaseIntxPortfolioFeeRates, CoinbaseIntxPortfolioSummary,
52 CoinbaseIntxPosition,
53 },
54 parse::{
55 parse_account_state, parse_fill_report, parse_instrument_any, parse_order_status_report,
56 parse_position_status_report,
57 },
58 query::{
59 CancelOrderParams, CancelOrdersParams, CreateOrderParams, CreateOrderParamsBuilder,
60 GetOrderParams, GetOrdersParams, GetOrdersParamsBuilder, GetPortfolioFillsParams,
61 GetPortfolioFillsParamsBuilder, ModifyOrderParams,
62 },
63};
64use crate::{
65 common::{
66 consts::COINBASE_INTX_REST_URL,
67 credential::Credential,
68 enums::{CoinbaseIntxOrderType, CoinbaseIntxSide, CoinbaseIntxTimeInForce},
69 },
70 http::{
71 error::ErrorBody,
72 query::{CancelOrdersParamsBuilder, ModifyOrderParamsBuilder},
73 },
74};
75
76#[derive(Debug, Serialize, Deserialize)]
78pub struct CoinbaseIntxResponse<T> {
79 pub code: String,
81 pub msg: String,
83 pub data: Vec<T>,
85}
86
87pub static COINBASE_INTX_REST_QUOTA: LazyLock<Quota> =
89 LazyLock::new(|| Quota::per_second(NonZeroU32::new(40).unwrap()));
90
91#[derive(Debug, Clone)]
97pub struct CoinbaseIntxHttpInnerClient {
98 base_url: String,
99 client: HttpClient,
100 credential: Option<Credential>,
101}
102
103impl Default for CoinbaseIntxHttpInnerClient {
104 fn default() -> Self {
105 Self::new(None, Some(60))
106 }
107}
108
109impl CoinbaseIntxHttpInnerClient {
110 #[must_use]
116 pub fn new(base_url: Option<String>, timeout_secs: Option<u64>) -> Self {
117 Self {
118 base_url: base_url.unwrap_or(COINBASE_INTX_REST_URL.to_string()),
119 client: HttpClient::new(
120 Self::default_headers(),
121 vec![],
122 vec![],
123 Some(*COINBASE_INTX_REST_QUOTA),
124 timeout_secs,
125 ),
126 credential: None,
127 }
128 }
129
130 #[must_use]
133 pub fn with_credentials(
134 api_key: String,
135 api_secret: String,
136 api_passphrase: String,
137 base_url: String,
138 timeout_secs: Option<u64>,
139 ) -> Self {
140 Self {
141 base_url,
142 client: HttpClient::new(
143 Self::default_headers(),
144 vec![],
145 vec![],
146 Some(*COINBASE_INTX_REST_QUOTA),
147 timeout_secs,
148 ),
149 credential: Some(Credential::new(api_key, api_secret, api_passphrase)),
150 }
151 }
152
153 fn default_headers() -> HashMap<String, String> {
155 HashMap::from([(USER_AGENT.to_string(), NAUTILUS_USER_AGENT.to_string())])
156 }
157
158 fn sign_request(
165 &self,
166 method: &Method,
167 path: &str,
168 body: Option<&[u8]>,
169 ) -> Result<HashMap<String, String>, CoinbaseIntxHttpError> {
170 let credential = match self.credential.as_ref() {
171 Some(c) => c,
172 None => return Err(CoinbaseIntxHttpError::MissingCredentials),
173 };
174
175 let api_key = credential.api_key.clone().to_string();
176 let api_passphrase = credential.api_passphrase.clone().to_string();
177 let timestamp = Utc::now().timestamp().to_string();
178 let body_str = body
179 .and_then(|b| String::from_utf8(b.to_vec()).ok())
180 .unwrap_or_default();
181
182 let signature = credential.sign(×tamp, method.as_str(), path, &body_str);
183
184 let mut headers = HashMap::new();
185 headers.insert("Accept".to_string(), "application/json".to_string());
186 headers.insert("CB-ACCESS-KEY".to_string(), api_key);
187 headers.insert("CB-ACCESS-PASSPHRASE".to_string(), api_passphrase);
188 headers.insert("CB-ACCESS-SIGN".to_string(), signature);
189 headers.insert("CB-ACCESS-TIMESTAMP".to_string(), timestamp);
190 headers.insert("Content-Type".to_string(), "application/json".to_string());
191
192 Ok(headers)
193 }
194
195 async fn send_request<T: DeserializeOwned>(
202 &self,
203 method: Method,
204 path: &str,
205 body: Option<Vec<u8>>,
206 authenticate: bool,
207 ) -> Result<T, CoinbaseIntxHttpError> {
208 let url = format!("{}{}", self.base_url, path);
209
210 let headers = if authenticate {
211 Some(self.sign_request(&method, path, body.as_deref())?)
212 } else {
213 None
214 };
215
216 tracing::trace!("Request: {url:?} {body:?}");
217
218 let resp = self
219 .client
220 .request(method.clone(), url, headers, body, None, None)
221 .await?;
222
223 tracing::trace!("Response: {resp:?}");
224
225 if resp.status.is_success() {
226 let coinbase_response: T = serde_json::from_slice(&resp.body).map_err(|e| {
227 tracing::error!("Failed to deserialize CoinbaseResponse: {e}");
228 CoinbaseIntxHttpError::JsonError(e.to_string())
229 })?;
230
231 Ok(coinbase_response)
232 } else {
233 let error_body = String::from_utf8_lossy(&resp.body);
234 tracing::error!(
235 "HTTP error {} with body: {error_body}",
236 resp.status.as_str()
237 );
238
239 if let Ok(parsed_error) = serde_json::from_slice::<CoinbaseIntxResponse<T>>(&resp.body)
240 {
241 return Err(CoinbaseIntxHttpError::CoinbaseError {
242 error_code: parsed_error.code,
243 message: parsed_error.msg,
244 });
245 }
246
247 if let Ok(parsed_error) = serde_json::from_slice::<ErrorBody>(&resp.body) {
248 if let (Some(title), Some(error)) = (parsed_error.title, parsed_error.error) {
249 return Err(CoinbaseIntxHttpError::CoinbaseError {
250 error_code: error,
251 message: title,
252 });
253 }
254 }
255
256 Err(CoinbaseIntxHttpError::UnexpectedStatus {
257 status: StatusCode::from_u16(resp.status.as_u16()).unwrap(),
258 body: error_body.to_string(),
259 })
260 }
261 }
262
263 pub async fn http_list_assets(&self) -> Result<Vec<CoinbaseIntxAsset>, CoinbaseIntxHttpError> {
270 let path = "/api/v1/assets";
271 self.send_request(Method::GET, path, None, false).await
272 }
273
274 pub async fn http_get_asset_details(
281 &self,
282 asset: &str,
283 ) -> Result<CoinbaseIntxAsset, CoinbaseIntxHttpError> {
284 let path = format!("/api/v1/assets/{asset}");
285 self.send_request(Method::GET, &path, None, false).await
286 }
287
288 pub async fn http_list_instruments(
295 &self,
296 ) -> Result<Vec<CoinbaseIntxInstrument>, CoinbaseIntxHttpError> {
297 let path = "/api/v1/instruments";
298 self.send_request(Method::GET, path, None, false).await
299 }
300
301 pub async fn http_get_instrument_details(
308 &self,
309 symbol: &str,
310 ) -> Result<CoinbaseIntxInstrument, CoinbaseIntxHttpError> {
311 let path = format!("/api/v1/instruments/{symbol}");
312 self.send_request(Method::GET, &path, None, false).await
313 }
314
315 pub async fn http_list_fee_rate_tiers(
322 &self,
323 ) -> Result<Vec<CoinbaseIntxFeeTier>, CoinbaseIntxHttpError> {
324 let path = "/api/v1/fee-rate-tiers";
325 self.send_request(Method::GET, path, None, true).await
326 }
327
328 pub async fn http_list_portfolios(
335 &self,
336 ) -> Result<Vec<CoinbaseIntxPortfolio>, CoinbaseIntxHttpError> {
337 let path = "/api/v1/portfolios";
338 self.send_request(Method::GET, path, None, true).await
339 }
340
341 pub async fn http_get_portfolio(
348 &self,
349 portfolio_id: &str,
350 ) -> Result<CoinbaseIntxPortfolio, CoinbaseIntxHttpError> {
351 let path = format!("/api/v1/portfolios/{portfolio_id}");
352 self.send_request(Method::GET, &path, None, true).await
353 }
354
355 pub async fn http_get_portfolio_details(
362 &self,
363 portfolio_id: &str,
364 ) -> Result<CoinbaseIntxPortfolioDetails, CoinbaseIntxHttpError> {
365 let path = format!("/api/v1/portfolios/{portfolio_id}/detail");
366 self.send_request(Method::GET, &path, None, true).await
367 }
368
369 pub async fn http_get_portfolio_summary(
376 &self,
377 portfolio_id: &str,
378 ) -> Result<CoinbaseIntxPortfolioSummary, CoinbaseIntxHttpError> {
379 let path = format!("/api/v1/portfolios/{portfolio_id}/summary");
380 self.send_request(Method::GET, &path, None, true).await
381 }
382
383 pub async fn http_list_portfolio_balances(
390 &self,
391 portfolio_id: &str,
392 ) -> Result<Vec<CoinbaseIntxBalance>, CoinbaseIntxHttpError> {
393 let path = format!("/api/v1/portfolios/{portfolio_id}/balances");
394 self.send_request(Method::GET, &path, None, true).await
395 }
396
397 pub async fn http_get_portfolio_balance(
404 &self,
405 portfolio_id: &str,
406 asset: &str,
407 ) -> Result<CoinbaseIntxBalance, CoinbaseIntxHttpError> {
408 let path = format!("/api/v1/portfolios/{portfolio_id}/balances/{asset}");
409 self.send_request(Method::GET, &path, None, true).await
410 }
411
412 pub async fn http_list_portfolio_fills(
419 &self,
420 portfolio_id: &str,
421 params: GetPortfolioFillsParams,
422 ) -> Result<CoinbaseIntxFillList, CoinbaseIntxHttpError> {
423 let query = serde_urlencoded::to_string(¶ms)
424 .map_err(|e| CoinbaseIntxHttpError::JsonError(e.to_string()))?;
425 let path = format!("/api/v1/portfolios/{portfolio_id}/fills?{query}");
426 self.send_request(Method::GET, &path, None, true).await
427 }
428
429 pub async fn http_list_portfolio_positions(
436 &self,
437 portfolio_id: &str,
438 ) -> Result<Vec<CoinbaseIntxPosition>, CoinbaseIntxHttpError> {
439 let path = format!("/api/v1/portfolios/{portfolio_id}/positions");
440 self.send_request(Method::GET, &path, None, true).await
441 }
442
443 pub async fn http_get_portfolio_position(
450 &self,
451 portfolio_id: &str,
452 symbol: &str,
453 ) -> Result<CoinbaseIntxPosition, CoinbaseIntxHttpError> {
454 let path = format!("/api/v1/portfolios/{portfolio_id}/positions/{symbol}");
455 self.send_request(Method::GET, &path, None, true).await
456 }
457
458 pub async fn http_list_portfolio_fee_rates(
465 &self,
466 ) -> Result<Vec<CoinbaseIntxPortfolioFeeRates>, CoinbaseIntxHttpError> {
467 let path = "/api/v1/portfolios/fee-rates";
468 self.send_request(Method::GET, path, None, true).await
469 }
470
471 pub async fn http_create_order(
476 &self,
477 params: CreateOrderParams,
478 ) -> Result<CoinbaseIntxOrder, CoinbaseIntxHttpError> {
479 let path = "/api/v1/orders";
480 let body = serde_json::to_vec(¶ms)
481 .map_err(|e| CoinbaseIntxHttpError::JsonError(e.to_string()))?;
482 self.send_request(Method::POST, path, Some(body), true)
483 .await
484 }
485
486 pub async fn http_get_order(
493 &self,
494 venue_order_id: &str,
495 portfolio_id: &str,
496 ) -> Result<CoinbaseIntxOrder, CoinbaseIntxHttpError> {
497 let params = GetOrderParams {
498 portfolio: portfolio_id.to_string(),
499 };
500 let query = serde_urlencoded::to_string(¶ms)
501 .map_err(|e| CoinbaseIntxHttpError::JsonError(e.to_string()))?;
502 let path = format!("/api/v1/orders/{venue_order_id}?{query}");
503 self.send_request(Method::GET, &path, None, true).await
504 }
505
506 pub async fn http_list_open_orders(
514 &self,
515 params: GetOrdersParams,
516 ) -> Result<CoinbaseIntxOrderList, CoinbaseIntxHttpError> {
517 let query = serde_urlencoded::to_string(¶ms)
518 .map_err(|e| CoinbaseIntxHttpError::JsonError(e.to_string()))?;
519 let path = format!("/api/v1/orders?{query}");
520 self.send_request(Method::GET, &path, None, true).await
521 }
522
523 pub async fn http_cancel_order(
528 &self,
529 client_order_id: &str,
530 portfolio_id: &str,
531 ) -> Result<CoinbaseIntxOrder, CoinbaseIntxHttpError> {
532 let params = CancelOrderParams {
533 portfolio: portfolio_id.to_string(),
534 };
535 let query = serde_urlencoded::to_string(¶ms)
536 .map_err(|e| CoinbaseIntxHttpError::JsonError(e.to_string()))?;
537 let path = format!("/api/v1/orders/{client_order_id}?{query}");
538 self.send_request(Method::DELETE, &path, None, true).await
539 }
540
541 pub async fn http_cancel_orders(
546 &self,
547 params: CancelOrdersParams,
548 ) -> Result<Vec<CoinbaseIntxOrder>, CoinbaseIntxHttpError> {
549 let query = serde_urlencoded::to_string(¶ms)
550 .map_err(|e| CoinbaseIntxHttpError::JsonError(e.to_string()))?;
551 let path = format!("/api/v1/orders?{query}");
552 self.send_request(Method::DELETE, &path, None, true).await
553 }
554
555 pub async fn http_modify_order(
562 &self,
563 order_id: &str,
564 params: ModifyOrderParams,
565 ) -> Result<CoinbaseIntxOrder, CoinbaseIntxHttpError> {
566 let path = format!("/api/v1/orders/{order_id}");
567 let body = serde_json::to_vec(¶ms)
568 .map_err(|e| CoinbaseIntxHttpError::JsonError(e.to_string()))?;
569 self.send_request(Method::PUT, &path, Some(body), true)
570 .await
571 }
572}
573
574#[derive(Debug, Clone)]
579#[cfg_attr(
580 feature = "python",
581 pyo3::pyclass(module = "posei_trader.core.nautilus_pyo3.adapters")
582)]
583pub struct CoinbaseIntxHttpClient {
584 pub(crate) inner: Arc<CoinbaseIntxHttpInnerClient>,
585 pub(crate) instruments_cache: Arc<Mutex<HashMap<Ustr, InstrumentAny>>>,
586 cache_initialized: bool,
587}
588
589impl Default for CoinbaseIntxHttpClient {
590 fn default() -> Self {
591 Self::new(None, Some(60))
592 }
593}
594
595impl CoinbaseIntxHttpClient {
596 #[must_use]
602 pub fn new(base_url: Option<String>, timeout_secs: Option<u64>) -> Self {
603 Self {
604 inner: Arc::new(CoinbaseIntxHttpInnerClient::new(base_url, timeout_secs)),
605 instruments_cache: Arc::new(Mutex::new(HashMap::new())),
606 cache_initialized: false,
607 }
608 }
609
610 pub fn from_env() -> anyhow::Result<Self> {
617 Self::with_credentials(None, None, None, None, None)
618 }
619
620 pub fn with_credentials(
627 api_key: Option<String>,
628 api_secret: Option<String>,
629 api_passphrase: Option<String>,
630 base_url: Option<String>,
631 timeout_secs: Option<u64>,
632 ) -> anyhow::Result<Self> {
633 let api_key = api_key.unwrap_or(get_env_var("COINBASE_INTX_API_KEY")?);
634 let api_secret = api_secret.unwrap_or(get_env_var("COINBASE_INTX_API_SECRET")?);
635 let api_passphrase = api_passphrase.unwrap_or(get_env_var("COINBASE_INTX_API_PASSPHRASE")?);
636 let base_url = base_url.unwrap_or(COINBASE_INTX_REST_URL.to_string());
637 Ok(Self {
638 inner: Arc::new(CoinbaseIntxHttpInnerClient::with_credentials(
639 api_key,
640 api_secret,
641 api_passphrase,
642 base_url,
643 timeout_secs,
644 )),
645 instruments_cache: Arc::new(Mutex::new(HashMap::new())),
646 cache_initialized: false,
647 })
648 }
649
650 fn get_instrument_from_cache(&self, symbol: Ustr) -> anyhow::Result<InstrumentAny> {
651 match self.instruments_cache.lock().unwrap().get(&symbol) {
652 Some(inst) => Ok(inst.clone()), None => anyhow::bail!("Unable to process request, instrument {symbol} not in cache"),
654 }
655 }
656
657 fn generate_ts_init(&self) -> UnixNanos {
658 get_atomic_clock_realtime().get_time_ns()
659 }
660
661 #[must_use]
663 pub fn base_url(&self) -> &str {
664 self.inner.base_url.as_str()
665 }
666
667 #[must_use]
669 pub fn api_key(&self) -> Option<&str> {
670 self.inner.credential.clone().map(|c| c.api_key.as_str())
671 }
672
673 #[must_use]
677 pub const fn is_initialized(&self) -> bool {
678 self.cache_initialized
679 }
680
681 #[must_use]
687 pub fn get_cached_symbols(&self) -> Vec<String> {
688 self.instruments_cache
689 .lock()
690 .unwrap()
691 .keys()
692 .map(std::string::ToString::to_string)
693 .collect()
694 }
695
696 pub fn add_instruments(&mut self, instruments: Vec<InstrumentAny>) {
704 for inst in instruments {
705 self.instruments_cache
706 .lock()
707 .unwrap()
708 .insert(inst.raw_symbol().inner(), inst);
709 }
710 self.cache_initialized = true;
711 }
712
713 pub fn add_instrument(&mut self, instrument: InstrumentAny) {
721 self.instruments_cache
722 .lock()
723 .unwrap()
724 .insert(instrument.raw_symbol().inner(), instrument);
725 self.cache_initialized = true;
726 }
727
728 pub async fn list_portfolios(&self) -> anyhow::Result<Vec<CoinbaseIntxPortfolio>> {
734 let resp = self
735 .inner
736 .http_list_portfolios()
737 .await
738 .map_err(|e| anyhow::anyhow!(e))?;
739
740 Ok(resp)
741 }
742
743 pub async fn request_account_state(
749 &self,
750 account_id: AccountId,
751 ) -> anyhow::Result<AccountState> {
752 let resp = self
753 .inner
754 .http_list_portfolio_balances(account_id.get_issuers_id())
755 .await
756 .map_err(|e| anyhow::anyhow!(e))?;
757
758 let ts_init = self.generate_ts_init();
759 let account_state = parse_account_state(resp, account_id, ts_init)?;
760
761 Ok(account_state)
762 }
763
764 pub async fn request_instruments(&self) -> anyhow::Result<Vec<InstrumentAny>> {
770 let resp = self
771 .inner
772 .http_list_instruments()
773 .await
774 .map_err(|e| anyhow::anyhow!(e))?;
775
776 let ts_init = self.generate_ts_init();
777
778 let mut instruments: Vec<InstrumentAny> = Vec::new();
779 for inst in &resp {
780 let instrument_any = parse_instrument_any(inst, ts_init);
781 if let Some(instrument_any) = instrument_any {
782 instruments.push(instrument_any);
783 }
784 }
785
786 Ok(instruments)
787 }
788
789 pub async fn request_instrument(&self, symbol: &Symbol) -> anyhow::Result<InstrumentAny> {
795 let resp = self
796 .inner
797 .http_get_instrument_details(symbol.as_str())
798 .await
799 .map_err(|e| anyhow::anyhow!(e))?;
800
801 let ts_init = self.generate_ts_init();
802
803 match parse_instrument_any(&resp, ts_init) {
804 Some(inst) => Ok(inst),
805 None => anyhow::bail!("Unable to parse instrument"),
806 }
807 }
808
809 pub async fn request_order_status_report(
815 &self,
816 account_id: AccountId,
817 venue_order_id: VenueOrderId,
818 ) -> anyhow::Result<OrderStatusReport> {
819 let portfolio_id = account_id.get_issuers_id();
820
821 let resp = self
822 .inner
823 .http_get_order(venue_order_id.as_str(), portfolio_id)
824 .await
825 .map_err(|e| anyhow::anyhow!(e))?;
826
827 let instrument = self.get_instrument_from_cache(resp.symbol)?;
828 let ts_init = self.generate_ts_init();
829
830 let report = parse_order_status_report(
831 resp,
832 account_id,
833 instrument.price_precision(),
834 instrument.size_precision(),
835 ts_init,
836 )?;
837 Ok(report)
838 }
839
840 pub async fn request_order_status_reports(
846 &self,
847 account_id: AccountId,
848 symbol: Symbol,
849 ) -> anyhow::Result<Vec<OrderStatusReport>> {
850 let portfolio_id = account_id.get_issuers_id();
851
852 let mut params = GetOrdersParamsBuilder::default();
853 params.portfolio(portfolio_id);
854 params.instrument(symbol.as_str());
855 let params = params.build().map_err(|e| anyhow::anyhow!(e))?;
856
857 let resp = self
858 .inner
859 .http_list_open_orders(params)
860 .await
861 .map_err(|e| anyhow::anyhow!(e))?;
862
863 let ts_init = get_atomic_clock_realtime().get_time_ns();
864
865 let mut reports: Vec<OrderStatusReport> = Vec::new();
866 for order in resp.results {
867 let instrument = self.get_instrument_from_cache(order.symbol)?;
868 let report = parse_order_status_report(
869 order,
870 account_id,
871 instrument.price_precision(),
872 instrument.size_precision(),
873 ts_init,
874 )?;
875 reports.push(report);
876 }
877
878 Ok(reports)
879 }
880
881 pub async fn request_fill_reports(
887 &self,
888 account_id: AccountId,
889 client_order_id: Option<ClientOrderId>,
890 start: Option<DateTime<Utc>>,
891 ) -> anyhow::Result<Vec<FillReport>> {
892 let portfolio_id = account_id.get_issuers_id();
893
894 let mut params = GetPortfolioFillsParamsBuilder::default();
895 if let Some(start) = start {
896 params.time_from(start);
897 }
898 if let Some(client_order_id) = client_order_id {
899 params.client_order_id(client_order_id.to_string());
900 }
901 let params = params.build().map_err(|e| anyhow::anyhow!(e))?;
902
903 let resp = self
904 .inner
905 .http_list_portfolio_fills(portfolio_id, params)
906 .await
907 .map_err(|e| anyhow::anyhow!(e))?;
908
909 let ts_init = get_atomic_clock_realtime().get_time_ns();
910
911 let mut reports: Vec<FillReport> = Vec::new();
912 for fill in resp.results {
913 let instrument = self.get_instrument_from_cache(fill.symbol)?;
914 let report = parse_fill_report(
915 fill,
916 account_id,
917 instrument.price_precision(),
918 instrument.size_precision(),
919 ts_init,
920 )?;
921 reports.push(report);
922 }
923
924 Ok(reports)
925 }
926
927 pub async fn request_position_status_report(
933 &self,
934 account_id: AccountId,
935 symbol: Symbol,
936 ) -> anyhow::Result<PositionStatusReport> {
937 let portfolio_id = account_id.get_issuers_id();
938
939 let resp = self
940 .inner
941 .http_get_portfolio_position(portfolio_id, symbol.as_str())
942 .await
943 .map_err(|e| anyhow::anyhow!(e))?;
944
945 let instrument = self.get_instrument_from_cache(resp.symbol)?;
946 let ts_init = get_atomic_clock_realtime().get_time_ns();
947
948 let report =
949 parse_position_status_report(resp, account_id, instrument.size_precision(), ts_init)?;
950 Ok(report)
951 }
952
953 pub async fn request_position_status_reports(
959 &self,
960 account_id: AccountId,
961 ) -> anyhow::Result<Vec<PositionStatusReport>> {
962 let portfolio_id = account_id.get_issuers_id();
963
964 let resp = self
965 .inner
966 .http_list_portfolio_positions(portfolio_id)
967 .await
968 .map_err(|e| anyhow::anyhow!(e))?;
969
970 let ts_init = get_atomic_clock_realtime().get_time_ns();
971
972 let mut reports: Vec<PositionStatusReport> = Vec::new();
973 for position in resp {
974 let instrument = self.get_instrument_from_cache(position.symbol)?;
975 let report = parse_position_status_report(
976 position,
977 account_id,
978 instrument.size_precision(),
979 ts_init,
980 )?;
981 reports.push(report);
982 }
983
984 Ok(reports)
985 }
986
987 #[allow(clippy::too_many_arguments)]
993 pub async fn submit_order(
994 &self,
995 account_id: AccountId,
996 client_order_id: ClientOrderId,
997 symbol: Symbol,
998 order_side: OrderSide,
999 order_type: OrderType,
1000 quantity: Quantity,
1001 time_in_force: TimeInForce,
1002 expire_time: Option<DateTime<Utc>>,
1003 price: Option<Price>,
1004 trigger_price: Option<Price>,
1005 post_only: Option<bool>,
1006 reduce_only: Option<bool>,
1007 ) -> anyhow::Result<OrderStatusReport> {
1008 let coinbase_side: CoinbaseIntxSide = order_side.into();
1009 let coinbase_order_type: CoinbaseIntxOrderType = order_type.into();
1010 let coinbase_tif: CoinbaseIntxTimeInForce = time_in_force.into();
1011
1012 let mut params = CreateOrderParamsBuilder::default();
1013 params.portfolio(account_id.get_issuers_id());
1014 params.client_order_id(client_order_id.as_str());
1015 params.instrument(symbol.as_str());
1016 params.side(coinbase_side);
1017 params.size(quantity.to_string());
1018 params.order_type(coinbase_order_type);
1019 params.tif(coinbase_tif);
1020 if let Some(expire_time) = expire_time {
1021 params.expire_time(expire_time);
1022 }
1023 if let Some(price) = price {
1024 params.price(price.to_string());
1025 }
1026 if let Some(trigger_price) = trigger_price {
1027 params.stop_price(trigger_price.to_string());
1028 }
1029 if let Some(post_only) = post_only {
1030 params.post_only(post_only);
1031 }
1032 if let Some(reduce_only) = reduce_only {
1033 params.close_only(reduce_only);
1034 }
1035 let params = params.build().map_err(|e| anyhow::anyhow!(e))?;
1036
1037 let resp = self.inner.http_create_order(params).await?;
1038 tracing::debug!("Submitted order: {resp:?}");
1039
1040 let instrument = self.get_instrument_from_cache(resp.symbol)?;
1041 let ts_init = get_atomic_clock_realtime().get_time_ns();
1042 let report = parse_order_status_report(
1043 resp,
1044 account_id,
1045 instrument.price_precision(),
1046 instrument.size_precision(),
1047 ts_init,
1048 )?;
1049 Ok(report)
1050 }
1051
1052 pub async fn cancel_order(
1058 &self,
1059 account_id: AccountId,
1060 client_order_id: ClientOrderId,
1061 ) -> anyhow::Result<OrderStatusReport> {
1062 let portfolio_id = account_id.get_issuers_id();
1063
1064 let resp = self
1065 .inner
1066 .http_cancel_order(client_order_id.as_str(), portfolio_id)
1067 .await?;
1068 tracing::debug!("Canceled order: {resp:?}");
1069
1070 let instrument = self.get_instrument_from_cache(resp.symbol)?;
1071 let ts_init = get_atomic_clock_realtime().get_time_ns();
1072
1073 let report = parse_order_status_report(
1074 resp,
1075 account_id,
1076 instrument.price_precision(),
1077 instrument.size_precision(),
1078 ts_init,
1079 )?;
1080 Ok(report)
1081 }
1082
1083 pub async fn cancel_orders(
1089 &self,
1090 account_id: AccountId,
1091 symbol: Symbol,
1092 order_side: Option<OrderSide>,
1093 ) -> anyhow::Result<Vec<OrderStatusReport>> {
1094 let mut params = CancelOrdersParamsBuilder::default();
1095 params.portfolio(account_id.get_issuers_id());
1096 params.instrument(symbol.as_str());
1097 if let Some(side) = order_side {
1098 let side: CoinbaseIntxSide = side.into();
1099 params.side(side);
1100 }
1101 let params = params.build().map_err(|e| anyhow::anyhow!(e))?;
1102
1103 let resp = self.inner.http_cancel_orders(params).await?;
1104
1105 let instrument = self.get_instrument_from_cache(symbol.inner())?;
1106 let ts_init = get_atomic_clock_realtime().get_time_ns();
1107
1108 let mut reports: Vec<OrderStatusReport> = Vec::with_capacity(resp.len());
1109 for order in resp {
1110 tracing::debug!("Canceled order: {order:?}");
1111 let report = parse_order_status_report(
1112 order,
1113 account_id,
1114 instrument.price_precision(),
1115 instrument.size_precision(),
1116 ts_init,
1117 )?;
1118 reports.push(report);
1119 }
1120
1121 Ok(reports)
1122 }
1123
1124 #[allow(clippy::too_many_arguments)]
1130 pub async fn modify_order(
1131 &self,
1132 account_id: AccountId,
1133 client_order_id: ClientOrderId,
1134 new_client_order_id: ClientOrderId,
1135 price: Option<Price>,
1136 trigger_price: Option<Price>,
1137 quantity: Option<Quantity>,
1138 ) -> anyhow::Result<OrderStatusReport> {
1139 let mut params = ModifyOrderParamsBuilder::default();
1140 params.portfolio(account_id.get_issuers_id());
1141 params.client_order_id(new_client_order_id.as_str());
1142 if let Some(price) = price {
1143 params.price(price.to_string());
1144 }
1145 if let Some(trigger_price) = trigger_price {
1146 params.price(trigger_price.to_string());
1147 }
1148 if let Some(quantity) = quantity {
1149 params.size(quantity.to_string());
1150 }
1151 let params = params.build().map_err(|e| anyhow::anyhow!(e))?;
1152
1153 let resp = self
1154 .inner
1155 .http_modify_order(client_order_id.as_str(), params)
1156 .await?;
1157 tracing::debug!("Modified order {}", resp.client_order_id);
1158
1159 let instrument = self.get_instrument_from_cache(resp.symbol)?;
1160 let ts_init = get_atomic_clock_realtime().get_time_ns();
1161 let report = parse_order_status_report(
1162 resp,
1163 account_id,
1164 instrument.price_precision(),
1165 instrument.size_precision(),
1166 ts_init,
1167 )?;
1168 Ok(report)
1169 }
1170}