nautilus_coinbase_intx/websocket/
parse.rs

1// -------------------------------------------------------------------------------------------------
2//  Copyright (C) 2015-2025 Posei Systems Pty Ltd. All rights reserved.
3//  https://poseitrader.io
4//
5//  Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
6//  You may not use this file except in compliance with the License.
7//  You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
8//
9//  Unless required by applicable law or agreed to in writing, software
10//  distributed under the License is distributed on an "AS IS" BASIS,
11//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12//  See the License for the specific language governing permissions and
13//  limitations under the License.
14// -------------------------------------------------------------------------------------------------
15
16use nautilus_core::nanos::UnixNanos;
17use nautilus_model::{
18    data::{
19        Bar, BarType, BookOrder, IndexPriceUpdate, MarkPriceUpdate, OrderBookDelta,
20        OrderBookDeltas, QuoteTick, TradeTick,
21    },
22    enums::{AggregationSource, AggressorSide, BookAction, OrderSide, RecordFlag},
23    identifiers::{InstrumentId, Symbol, TradeId},
24    instruments::{CryptoPerpetual, CurrencyPair, any::InstrumentAny},
25    types::{Price, Quantity},
26};
27use rust_decimal::Decimal;
28
29use super::messages::{
30    CoinbaseIntxWsCandleSnapshotMsg, CoinbaseIntxWsInstrumentMsg,
31    CoinbaseIntxWsOrderBookSnapshotMsg, CoinbaseIntxWsOrderBookUpdateMsg, CoinbaseIntxWsQuoteMsg,
32    CoinbaseIntxWsRiskMsg, CoinbaseIntxWsTradeMsg,
33};
34use crate::common::{
35    enums::CoinbaseIntxInstrumentType,
36    parse::{coinbase_channel_as_bar_spec, get_currency, parse_instrument_id},
37};
38
39/// Parses a Coinbase spot instrument into an `InstrumentAny::CurrencyPair`.
40/// Parses a spot instrument message into an `InstrumentAny::CurrencyPair`.
41///
42/// # Errors
43///
44/// Returns an error if any numeric field cannot be parsed or required data is missing.
45pub fn parse_spot_instrument(
46    definition: &CoinbaseIntxWsInstrumentMsg,
47    margin_init: Option<Decimal>,
48    margin_maint: Option<Decimal>,
49    maker_fee: Option<Decimal>,
50    taker_fee: Option<Decimal>,
51    ts_init: UnixNanos,
52) -> anyhow::Result<InstrumentAny> {
53    let instrument_id = parse_instrument_id(definition.product_id);
54    let raw_symbol = Symbol::from_ustr_unchecked(definition.product_id);
55
56    let base_currency = get_currency(&definition.base_asset_name);
57    let quote_currency = get_currency(&definition.quote_asset_name);
58
59    let price_increment = Price::from(&definition.quote_increment);
60    let size_increment = Quantity::from(&definition.base_increment);
61
62    let lot_size = None;
63    let max_quantity = None;
64    let min_quantity = None;
65    let max_notional = None;
66    let min_notional = None;
67    let max_price = None;
68    let min_price = None;
69
70    let instrument = CurrencyPair::new(
71        instrument_id,
72        raw_symbol,
73        base_currency,
74        quote_currency,
75        price_increment.precision,
76        size_increment.precision,
77        price_increment,
78        size_increment,
79        lot_size,
80        max_quantity,
81        min_quantity,
82        max_notional,
83        min_notional,
84        max_price,
85        min_price,
86        margin_init,
87        margin_maint,
88        maker_fee,
89        taker_fee,
90        definition.time.into(),
91        ts_init,
92    );
93
94    Ok(InstrumentAny::CurrencyPair(instrument))
95}
96
97/// Parses a Coinbase perpetual instrument into an `InstrumentAny::CryptoPerpetual`.
98/// Parses a perpetual instrument message into an `InstrumentAny::CryptoPerpetual`.
99///
100/// # Errors
101///
102/// Returns an error if any numeric field cannot be parsed or required data is missing.
103pub fn parse_perp_instrument(
104    definition: &CoinbaseIntxWsInstrumentMsg,
105    margin_init: Option<Decimal>,
106    margin_maint: Option<Decimal>,
107    maker_fee: Option<Decimal>,
108    taker_fee: Option<Decimal>,
109    ts_init: UnixNanos,
110) -> anyhow::Result<InstrumentAny> {
111    let instrument_id = parse_instrument_id(definition.product_id);
112    let raw_symbol = Symbol::from_ustr_unchecked(definition.product_id);
113
114    let base_currency = get_currency(&definition.base_asset_name);
115    let quote_currency = get_currency(&definition.quote_asset_name);
116    let settlement_currency = quote_currency;
117
118    let price_increment = Price::from(&definition.quote_increment);
119    let size_increment = Quantity::from(&definition.base_increment);
120
121    let multiplier = Some(Quantity::from(&definition.base_asset_multiplier));
122
123    let lot_size = None;
124    let max_quantity = None;
125    let min_quantity = None;
126    let max_notional = None;
127    let min_notional = None;
128    let max_price = None;
129    let min_price = None;
130
131    let is_inverse = false;
132
133    let instrument = CryptoPerpetual::new(
134        instrument_id,
135        raw_symbol,
136        base_currency,
137        quote_currency,
138        settlement_currency,
139        is_inverse,
140        price_increment.precision,
141        size_increment.precision,
142        price_increment,
143        size_increment,
144        multiplier,
145        lot_size,
146        max_quantity,
147        min_quantity,
148        max_notional,
149        min_notional,
150        max_price,
151        min_price,
152        margin_init,
153        margin_maint,
154        maker_fee,
155        taker_fee,
156        definition.time.into(),
157        ts_init,
158    );
159
160    Ok(InstrumentAny::CryptoPerpetual(instrument))
161}
162
163#[must_use]
164pub fn parse_instrument_any(
165    instrument: &CoinbaseIntxWsInstrumentMsg,
166    ts_init: UnixNanos,
167) -> Option<InstrumentAny> {
168    let result = match instrument.instrument_type {
169        CoinbaseIntxInstrumentType::Spot => {
170            parse_spot_instrument(instrument, None, None, None, None, ts_init).map(Some)
171        }
172        CoinbaseIntxInstrumentType::Perp => {
173            parse_perp_instrument(instrument, None, None, None, None, ts_init).map(Some)
174        }
175        CoinbaseIntxInstrumentType::Index => {
176            tracing::warn!(
177                "Index instrument parsing not implemented {}",
178                instrument.product_id,
179            );
180            Ok(None)
181        }
182    };
183
184    match result {
185        Ok(instrument) => instrument,
186        Err(e) => {
187            tracing::warn!("Failed to parse instrument {}: {e}", instrument.product_id,);
188            None
189        }
190    }
191}
192
193/// Parses a snapshot order book message into `OrderBookDeltas`.
194///
195/// # Errors
196///
197/// Returns an error if any price or size value cannot be parsed.
198pub fn parse_orderbook_snapshot_msg(
199    msg: &CoinbaseIntxWsOrderBookSnapshotMsg,
200    instrument_id: InstrumentId,
201    price_precision: u8,
202    size_precision: u8,
203    ts_init: UnixNanos,
204) -> anyhow::Result<OrderBookDeltas> {
205    let ts_event = UnixNanos::from(msg.time);
206
207    // Set the snapshot flag
208    let flags = RecordFlag::F_SNAPSHOT.value();
209
210    // Allocate capacity for all bids and asks
211    let mut deltas = Vec::with_capacity(msg.bids.len() + msg.asks.len());
212
213    // Process bids - in Coinbase, bids are buy orders
214    for bid in &msg.bids {
215        let price_str = &bid[0];
216        let size_str = &bid[1];
217
218        let price = Price::new(
219            price_str
220                .parse::<f64>()
221                .map_err(|e| anyhow::anyhow!("Failed to parse bid price: {e}"))?,
222            price_precision,
223        );
224
225        let size = Quantity::new(
226            size_str
227                .parse::<f64>()
228                .map_err(|e| anyhow::anyhow!("Failed to parse bid size: {e}"))?,
229            size_precision,
230        );
231
232        // For bids (buy orders), we use OrderSide::Buy
233        let order_id = 0; // Not provided by Coinbase
234        let order = BookOrder::new(OrderSide::Buy, price, size, order_id);
235
236        let delta = OrderBookDelta::new(
237            instrument_id,
238            BookAction::Add, // For snapshots, always use Add
239            order,
240            flags,
241            msg.sequence,
242            ts_event,
243            ts_init,
244        );
245
246        deltas.push(delta);
247    }
248
249    // Process asks - in Coinbase, asks are sell orders
250    for ask in &msg.asks {
251        let price_str = &ask[0];
252        let size_str = &ask[1];
253
254        let price = Price::new(
255            price_str
256                .parse::<f64>()
257                .map_err(|e| anyhow::anyhow!("Failed to parse ask price: {e}"))?,
258            price_precision,
259        );
260
261        let size = Quantity::new(
262            size_str
263                .parse::<f64>()
264                .map_err(|e| anyhow::anyhow!("Failed to parse ask size: {e}"))?,
265            size_precision,
266        );
267
268        // For asks (sell orders), we use OrderSide::Sell
269        let order_id = 0; // Not provided by Coinbase
270        let order = BookOrder::new(OrderSide::Sell, price, size, order_id);
271
272        let delta = OrderBookDelta::new(
273            instrument_id,
274            BookAction::Add, // For snapshots, always use Add
275            order,
276            flags,
277            msg.sequence,
278            ts_event,
279            ts_init,
280        );
281
282        deltas.push(delta);
283    }
284
285    Ok(OrderBookDeltas::new(instrument_id, deltas))
286}
287
288/// Parses an order book update message into `OrderBookDeltas`.
289///
290/// # Errors
291///
292/// Returns an error if any price, size, or side cannot be parsed.
293pub fn parse_orderbook_update_msg(
294    msg: &CoinbaseIntxWsOrderBookUpdateMsg,
295    instrument_id: InstrumentId,
296    price_precision: u8,
297    size_precision: u8,
298    ts_init: UnixNanos,
299) -> anyhow::Result<OrderBookDeltas> {
300    let ts_event = UnixNanos::from(msg.time);
301
302    // No snapshot flag for updates
303    let flags = 0;
304
305    // Allocate capacity for all changes
306    let mut deltas = Vec::with_capacity(msg.changes.len());
307
308    // Process changes
309    for change in &msg.changes {
310        let side_str = &change[0];
311        let price_str = &change[1];
312        let size_str = &change[2];
313
314        let price = Price::new(
315            price_str
316                .parse::<f64>()
317                .map_err(|e| anyhow::anyhow!("Failed to parse price: {e}"))?,
318            price_precision,
319        );
320
321        let size = Quantity::new(
322            size_str
323                .parse::<f64>()
324                .map_err(|e| anyhow::anyhow!("Failed to parse size: {e}"))?,
325            size_precision,
326        );
327
328        // Determine order side
329        let side = match side_str.as_str() {
330            "BUY" => OrderSide::Buy,
331            "SELL" => OrderSide::Sell,
332            _ => anyhow::bail!("Unknown order side: {side_str}"),
333        };
334
335        // Determine book action based on size
336        let book_action = if size.is_zero() {
337            BookAction::Delete
338        } else {
339            BookAction::Update
340        };
341
342        let order_id = 0; // Not provided by Coinbase
343        let order = BookOrder::new(side, price, size, order_id);
344
345        let delta = OrderBookDelta::new(
346            instrument_id,
347            book_action,
348            order,
349            flags,
350            msg.sequence,
351            ts_event,
352            ts_init,
353        );
354
355        deltas.push(delta);
356    }
357
358    Ok(OrderBookDeltas::new(instrument_id, deltas))
359}
360
361/// Parses a level 1 quote message into `QuoteTick`.
362///
363/// # Errors
364///
365/// Returns an error if any price or size value cannot be parsed.
366pub fn parse_quote_msg(
367    msg: &CoinbaseIntxWsQuoteMsg,
368    instrument_id: InstrumentId,
369    price_precision: u8,
370    size_precision: u8,
371    ts_init: UnixNanos,
372) -> anyhow::Result<QuoteTick> {
373    let bid_price = Price::new(msg.bid_price.parse::<f64>()?, price_precision);
374    let ask_price = Price::new(msg.ask_price.parse::<f64>()?, price_precision);
375    let bid_size = Quantity::new(msg.bid_qty.parse::<f64>()?, size_precision);
376    let ask_size = Quantity::new(msg.ask_qty.parse::<f64>()?, size_precision);
377    let ts_event = UnixNanos::from(msg.time);
378
379    Ok(QuoteTick::new(
380        instrument_id,
381        bid_price,
382        ask_price,
383        bid_size,
384        ask_size,
385        ts_event,
386        ts_init,
387    ))
388}
389
390/// Parses a trade message into `TradeTick`.
391///
392/// # Errors
393///
394/// Returns an error if any price, size, or aggressor side cannot be parsed.
395pub fn parse_trade_msg(
396    msg: &CoinbaseIntxWsTradeMsg,
397    instrument_id: InstrumentId,
398    price_precision: u8,
399    size_precision: u8,
400    ts_init: UnixNanos,
401) -> anyhow::Result<TradeTick> {
402    let price = Price::new(msg.trade_price.parse::<f64>()?, price_precision);
403    let size = Quantity::new(msg.trade_qty.parse::<f64>()?, size_precision);
404    let aggressor_side: AggressorSide = msg.aggressor_side.clone().into();
405    let trade_id = TradeId::new(&msg.match_id);
406    let ts_event = UnixNanos::from(msg.time);
407
408    Ok(TradeTick::new(
409        instrument_id,
410        price,
411        size,
412        aggressor_side,
413        trade_id,
414        ts_event,
415        ts_init,
416    ))
417}
418
419/// Parses a mark price message into `MarkPriceUpdate`.
420///
421/// # Errors
422///
423/// Returns an error if the price value cannot be parsed.
424pub fn parse_mark_price_msg(
425    msg: &CoinbaseIntxWsRiskMsg,
426    instrument_id: InstrumentId,
427    price_precision: u8,
428    ts_init: UnixNanos,
429) -> anyhow::Result<MarkPriceUpdate> {
430    let value = Price::new(msg.mark_price.parse::<f64>()?, price_precision);
431    let ts_event = UnixNanos::from(msg.time);
432
433    Ok(MarkPriceUpdate::new(
434        instrument_id,
435        value,
436        ts_event,
437        ts_init,
438    ))
439}
440
441/// Parses an index price message into `IndexPriceUpdate`.
442///
443/// # Errors
444///
445/// Returns an error if the price value cannot be parsed.
446pub fn parse_index_price_msg(
447    msg: &CoinbaseIntxWsRiskMsg,
448    instrument_id: InstrumentId,
449    price_precision: u8,
450    ts_init: UnixNanos,
451) -> anyhow::Result<IndexPriceUpdate> {
452    let value = Price::new(msg.index_price.parse::<f64>()?, price_precision);
453    let ts_event = UnixNanos::from(msg.time);
454
455    Ok(IndexPriceUpdate::new(
456        instrument_id,
457        value,
458        ts_event,
459        ts_init,
460    ))
461}
462
463/// Parses a candlestick snapshot message into `Bar`.
464///
465/// # Errors
466///
467/// Returns an error if the candle data is missing or any price/quantity cannot be parsed.
468pub fn parse_candle_msg(
469    msg: &CoinbaseIntxWsCandleSnapshotMsg,
470    instrument_id: InstrumentId,
471    price_precision: u8,
472    size_precision: u8,
473    ts_init: UnixNanos,
474) -> anyhow::Result<Bar> {
475    let bar_spec = coinbase_channel_as_bar_spec(&msg.channel)?;
476    let bar_type = BarType::new(instrument_id, bar_spec, AggregationSource::External);
477    let candle = msg
478        .candles
479        .last()
480        .ok_or_else(|| anyhow::anyhow!("Empty candles in snapshot for channel {}", msg.channel))?;
481    let ts_event = UnixNanos::from(candle.start); // TODO: Convert to close
482
483    let open_price = Price::new(candle.open.parse::<f64>()?, price_precision);
484    let high_price = Price::new(candle.high.parse::<f64>()?, price_precision);
485    let low_price = Price::new(candle.low.parse::<f64>()?, price_precision);
486    let close_price = Price::new(candle.close.parse::<f64>()?, price_precision);
487    let volume = Quantity::new(candle.volume.parse::<f64>()?, size_precision);
488
489    // Create a new bar
490    Ok(Bar::new(
491        bar_type,
492        open_price,
493        high_price,
494        low_price,
495        close_price,
496        volume,
497        ts_event,
498        ts_init,
499    ))
500}