nautilus_portfolio/
portfolio.rs

1// -------------------------------------------------------------------------------------------------
2//  Copyright (C) 2015-2025 Posei Systems Pty Ltd. All rights reserved.
3//  https://poseitrader.io
4//
5//  Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
6//  You may not use this file except in compliance with the License.
7//  You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
8//
9//  Unless required by applicable law or agreed to in writing, software
10//  distributed under the License is distributed on an "AS IS" BASIS,
11//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12//  See the License for the specific language governing permissions and
13//  limitations under the License.
14// -------------------------------------------------------------------------------------------------
15
16// TODO: Under development
17#![allow(dead_code)] // For PortfolioConfig
18
19//! Provides a generic `Portfolio` for all environments.
20use std::{
21    cell::RefCell,
22    collections::{HashMap, HashSet},
23    fmt::Debug,
24    rc::Rc,
25};
26
27use nautilus_analysis::analyzer::PortfolioAnalyzer;
28use nautilus_common::{
29    cache::Cache,
30    clock::Clock,
31    msgbus::{
32        self,
33        handler::{ShareableMessageHandler, TypedMessageHandler},
34    },
35};
36use nautilus_model::{
37    accounts::AccountAny,
38    data::{Bar, QuoteTick},
39    enums::{OrderSide, OrderType, PositionSide, PriceType},
40    events::{AccountState, OrderEventAny, position::PositionEvent},
41    identifiers::{InstrumentId, Venue},
42    instruments::{Instrument, InstrumentAny},
43    orders::{Order, OrderAny},
44    position::Position,
45    types::{Currency, Money, Price},
46};
47use rust_decimal::{Decimal, prelude::FromPrimitive};
48
49use crate::{config::PortfolioConfig, manager::AccountsManager};
50
51struct PortfolioState {
52    accounts: AccountsManager,
53    analyzer: PortfolioAnalyzer,
54    unrealized_pnls: HashMap<InstrumentId, Money>,
55    realized_pnls: HashMap<InstrumentId, Money>,
56    net_positions: HashMap<InstrumentId, Decimal>,
57    pending_calcs: HashSet<InstrumentId>,
58    bar_close_prices: HashMap<InstrumentId, Price>,
59    initialized: bool,
60}
61
62impl PortfolioState {
63    fn new(clock: Rc<RefCell<dyn Clock>>, cache: Rc<RefCell<Cache>>) -> Self {
64        Self {
65            accounts: AccountsManager::new(clock, cache),
66            analyzer: PortfolioAnalyzer::default(),
67            unrealized_pnls: HashMap::new(),
68            realized_pnls: HashMap::new(),
69            net_positions: HashMap::new(),
70            pending_calcs: HashSet::new(),
71            bar_close_prices: HashMap::new(),
72            initialized: false,
73        }
74    }
75
76    fn reset(&mut self) {
77        log::debug!("RESETTING");
78        self.net_positions.clear();
79        self.unrealized_pnls.clear();
80        self.realized_pnls.clear();
81        self.pending_calcs.clear();
82        self.analyzer.reset();
83        log::debug!("READY");
84    }
85}
86
87pub struct Portfolio {
88    pub(crate) clock: Rc<RefCell<dyn Clock>>,
89    pub(crate) cache: Rc<RefCell<Cache>>,
90    inner: Rc<RefCell<PortfolioState>>,
91    config: PortfolioConfig,
92}
93
94impl Debug for Portfolio {
95    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
96        f.debug_struct(stringify!(Portfolio)).finish()
97    }
98}
99
100impl Portfolio {
101    pub fn new(
102        cache: Rc<RefCell<Cache>>,
103        clock: Rc<RefCell<dyn Clock>>,
104        config: Option<PortfolioConfig>,
105    ) -> Self {
106        let inner = Rc::new(RefCell::new(PortfolioState::new(
107            clock.clone(),
108            cache.clone(),
109        )));
110        let config = config.unwrap_or_default();
111
112        Self::register_message_handlers(
113            cache.clone(),
114            clock.clone(),
115            inner.clone(),
116            config.bar_updates,
117        );
118
119        Self {
120            clock,
121            cache,
122            inner,
123            config,
124        }
125    }
126
127    fn register_message_handlers(
128        cache: Rc<RefCell<Cache>>,
129        clock: Rc<RefCell<dyn Clock>>,
130        inner: Rc<RefCell<PortfolioState>>,
131        bar_updates: bool,
132    ) {
133        let update_account_handler = {
134            let cache = cache.clone();
135            ShareableMessageHandler(Rc::new(TypedMessageHandler::from(
136                move |event: &AccountState| {
137                    update_account(cache.clone(), event);
138                },
139            )))
140        };
141
142        let update_position_handler = {
143            let cache = cache.clone();
144            let clock = clock.clone();
145            let inner = inner.clone();
146            ShareableMessageHandler(Rc::new(TypedMessageHandler::from(
147                move |event: &PositionEvent| {
148                    update_position(cache.clone(), clock.clone(), inner.clone(), event);
149                },
150            )))
151        };
152
153        let update_quote_handler = {
154            let cache = cache.clone();
155            let clock = clock.clone();
156            let inner = inner.clone();
157            ShareableMessageHandler(Rc::new(TypedMessageHandler::from(
158                move |quote: &QuoteTick| {
159                    update_quote_tick(cache.clone(), clock.clone(), inner.clone(), quote);
160                },
161            )))
162        };
163
164        let update_bar_handler = {
165            let cache = cache.clone();
166            let clock = clock.clone();
167            let inner = inner.clone();
168            ShareableMessageHandler(Rc::new(TypedMessageHandler::from(move |bar: &Bar| {
169                update_bar(cache.clone(), clock.clone(), inner.clone(), bar);
170            })))
171        };
172
173        let update_order_handler = {
174            let cache = cache;
175            let clock = clock.clone();
176            let inner = inner;
177            ShareableMessageHandler(Rc::new(TypedMessageHandler::from(
178                move |event: &OrderEventAny| {
179                    update_order(cache.clone(), clock.clone(), inner.clone(), event);
180                },
181            )))
182        };
183
184        msgbus::register(
185            "Portfolio.update_account".into(),
186            update_account_handler.clone(),
187        );
188
189        msgbus::subscribe("data.quotes.*".into(), update_quote_handler, Some(10));
190        if bar_updates {
191            msgbus::subscribe("data.quotes.*EXTERNAL".into(), update_bar_handler, Some(10));
192        }
193        msgbus::subscribe("events.order.*".into(), update_order_handler, Some(10));
194        msgbus::subscribe(
195            "events.position.*".into(),
196            update_position_handler,
197            Some(10),
198        );
199        msgbus::subscribe("events.account.*".into(), update_account_handler, Some(10));
200    }
201
202    pub fn reset(&mut self) {
203        log::debug!("RESETTING");
204        self.inner.borrow_mut().reset();
205        log::debug!("READY");
206    }
207
208    // -- QUERIES ---------------------------------------------------------------------------------
209
210    #[must_use]
211    pub fn is_initialized(&self) -> bool {
212        self.inner.borrow().initialized
213    }
214
215    #[must_use]
216    pub fn balances_locked(&self, venue: &Venue) -> HashMap<Currency, Money> {
217        self.cache.borrow().account_for_venue(venue).map_or_else(
218            || {
219                log::error!("Cannot get balances locked: no account generated for {venue}");
220                HashMap::new()
221            },
222            AccountAny::balances_locked,
223        )
224    }
225
226    #[must_use]
227    pub fn margins_init(&self, venue: &Venue) -> HashMap<InstrumentId, Money> {
228        self.cache.borrow().account_for_venue(venue).map_or_else(
229            || {
230                log::error!(
231                    "Cannot get initial (order) margins: no account registered for {venue}"
232                );
233                HashMap::new()
234            },
235            |account| match account {
236                AccountAny::Margin(margin_account) => margin_account.initial_margins(),
237                AccountAny::Cash(_) => {
238                    log::warn!("Initial margins not applicable for cash account");
239                    HashMap::new()
240                }
241            },
242        )
243    }
244
245    #[must_use]
246    pub fn margins_maint(&self, venue: &Venue) -> HashMap<InstrumentId, Money> {
247        self.cache.borrow().account_for_venue(venue).map_or_else(
248            || {
249                log::error!(
250                    "Cannot get maintenance (position) margins: no account registered for {venue}"
251                );
252                HashMap::new()
253            },
254            |account| match account {
255                AccountAny::Margin(margin_account) => margin_account.maintenance_margins(),
256                AccountAny::Cash(_) => {
257                    log::warn!("Maintenance margins not applicable for cash account");
258                    HashMap::new()
259                }
260            },
261        )
262    }
263
264    #[must_use]
265    pub fn unrealized_pnls(&mut self, venue: &Venue) -> HashMap<Currency, Money> {
266        let instrument_ids = {
267            let cache = self.cache.borrow();
268            let positions = cache.positions(Some(venue), None, None, None);
269
270            if positions.is_empty() {
271                return HashMap::new(); // Nothing to calculate
272            }
273
274            let instrument_ids: HashSet<InstrumentId> =
275                positions.iter().map(|p| p.instrument_id).collect();
276
277            instrument_ids
278        };
279
280        let mut unrealized_pnls: HashMap<Currency, f64> = HashMap::new();
281
282        for instrument_id in instrument_ids {
283            if let Some(&pnl) = self.inner.borrow_mut().unrealized_pnls.get(&instrument_id) {
284                // PnL already calculated
285                *unrealized_pnls.entry(pnl.currency).or_insert(0.0) += pnl.as_f64();
286                continue;
287            }
288
289            // Calculate PnL
290            match self.calculate_unrealized_pnl(&instrument_id) {
291                Some(pnl) => *unrealized_pnls.entry(pnl.currency).or_insert(0.0) += pnl.as_f64(),
292                None => continue,
293            }
294        }
295
296        unrealized_pnls
297            .into_iter()
298            .map(|(currency, amount)| (currency, Money::new(amount, currency)))
299            .collect()
300    }
301
302    #[must_use]
303    pub fn realized_pnls(&mut self, venue: &Venue) -> HashMap<Currency, Money> {
304        let instrument_ids = {
305            let cache = self.cache.borrow();
306            let positions = cache.positions(Some(venue), None, None, None);
307
308            if positions.is_empty() {
309                return HashMap::new(); // Nothing to calculate
310            }
311
312            let instrument_ids: HashSet<InstrumentId> =
313                positions.iter().map(|p| p.instrument_id).collect();
314
315            instrument_ids
316        };
317
318        let mut realized_pnls: HashMap<Currency, f64> = HashMap::new();
319
320        for instrument_id in instrument_ids {
321            if let Some(&pnl) = self.inner.borrow_mut().realized_pnls.get(&instrument_id) {
322                // PnL already calculated
323                *realized_pnls.entry(pnl.currency).or_insert(0.0) += pnl.as_f64();
324                continue;
325            }
326
327            // Calculate PnL
328            match self.calculate_realized_pnl(&instrument_id) {
329                Some(pnl) => *realized_pnls.entry(pnl.currency).or_insert(0.0) += pnl.as_f64(),
330                None => continue,
331            }
332        }
333
334        realized_pnls
335            .into_iter()
336            .map(|(currency, amount)| (currency, Money::new(amount, currency)))
337            .collect()
338    }
339
340    #[must_use]
341    pub fn net_exposures(&self, venue: &Venue) -> Option<HashMap<Currency, Money>> {
342        let cache = self.cache.borrow();
343        let account = if let Some(account) = cache.account_for_venue(venue) {
344            account
345        } else {
346            log::error!("Cannot calculate net exposures: no account registered for {venue}");
347            return None; // Cannot calculate
348        };
349
350        let positions_open = cache.positions_open(Some(venue), None, None, None);
351        if positions_open.is_empty() {
352            return Some(HashMap::new()); // Nothing to calculate
353        }
354
355        let mut net_exposures: HashMap<Currency, f64> = HashMap::new();
356
357        for position in positions_open {
358            let instrument = if let Some(instrument) = cache.instrument(&position.instrument_id) {
359                instrument
360            } else {
361                log::error!(
362                    "Cannot calculate net exposures: no instrument for {}",
363                    position.instrument_id
364                );
365                return None; // Cannot calculate
366            };
367
368            if position.side == PositionSide::Flat {
369                log::error!(
370                    "Cannot calculate net exposures: position is flat for {}",
371                    position.instrument_id
372                );
373                continue; // Nothing to calculate
374            }
375
376            let price = self.get_price(position)?;
377            let xrate = if let Some(xrate) =
378                self.calculate_xrate_to_base(instrument, account, position.entry)
379            {
380                xrate
381            } else {
382                log::error!(
383                    // TODO: Improve logging
384                    "Cannot calculate net exposures: insufficient data for {}/{:?}",
385                    instrument.settlement_currency(),
386                    account.base_currency()
387                );
388                return None; // Cannot calculate
389            };
390
391            let settlement_currency = account
392                .base_currency()
393                .unwrap_or_else(|| instrument.settlement_currency());
394
395            let net_exposure = instrument
396                .calculate_notional_value(position.quantity, price, None)
397                .as_f64()
398                * xrate;
399
400            let net_exposure = (net_exposure * 10f64.powi(settlement_currency.precision.into()))
401                .round()
402                / 10f64.powi(settlement_currency.precision.into());
403
404            *net_exposures.entry(settlement_currency).or_insert(0.0) += net_exposure;
405        }
406
407        Some(
408            net_exposures
409                .into_iter()
410                .map(|(currency, amount)| (currency, Money::new(amount, currency)))
411                .collect(),
412        )
413    }
414
415    #[must_use]
416    pub fn unrealized_pnl(&mut self, instrument_id: &InstrumentId) -> Option<Money> {
417        if let Some(pnl) = self
418            .inner
419            .borrow()
420            .unrealized_pnls
421            .get(instrument_id)
422            .copied()
423        {
424            return Some(pnl);
425        }
426
427        let pnl = self.calculate_unrealized_pnl(instrument_id)?;
428        self.inner
429            .borrow_mut()
430            .unrealized_pnls
431            .insert(*instrument_id, pnl);
432        Some(pnl)
433    }
434
435    #[must_use]
436    pub fn realized_pnl(&mut self, instrument_id: &InstrumentId) -> Option<Money> {
437        if let Some(pnl) = self
438            .inner
439            .borrow()
440            .realized_pnls
441            .get(instrument_id)
442            .copied()
443        {
444            return Some(pnl);
445        }
446
447        let pnl = self.calculate_realized_pnl(instrument_id)?;
448        self.inner
449            .borrow_mut()
450            .realized_pnls
451            .insert(*instrument_id, pnl);
452        Some(pnl)
453    }
454
455    #[must_use]
456    pub fn net_exposure(&self, instrument_id: &InstrumentId) -> Option<Money> {
457        let cache = self.cache.borrow();
458        let account = if let Some(account) = cache.account_for_venue(&instrument_id.venue) {
459            account
460        } else {
461            log::error!(
462                "Cannot calculate net exposure: no account registered for {}",
463                instrument_id.venue
464            );
465            return None;
466        };
467
468        let instrument = if let Some(instrument) = cache.instrument(instrument_id) {
469            instrument
470        } else {
471            log::error!("Cannot calculate net exposure: no instrument for {instrument_id}");
472            return None;
473        };
474
475        let positions_open = cache.positions_open(
476            None, // Faster query filtering
477            Some(instrument_id),
478            None,
479            None,
480        );
481
482        if positions_open.is_empty() {
483            return Some(Money::new(0.0, instrument.settlement_currency()));
484        }
485
486        let mut net_exposure = 0.0;
487
488        for position in positions_open {
489            let price = self.get_price(position)?;
490            let xrate = if let Some(xrate) =
491                self.calculate_xrate_to_base(instrument, account, position.entry)
492            {
493                xrate
494            } else {
495                log::error!(
496                    // TODO: Improve logging
497                    "Cannot calculate net exposures: insufficient data for {}/{:?}",
498                    instrument.settlement_currency(),
499                    account.base_currency()
500                );
501                return None; // Cannot calculate
502            };
503
504            let notional_value =
505                instrument.calculate_notional_value(position.quantity, price, None);
506            net_exposure += notional_value.as_f64() * xrate;
507        }
508
509        let settlement_currency = account
510            .base_currency()
511            .unwrap_or_else(|| instrument.settlement_currency());
512
513        Some(Money::new(net_exposure, settlement_currency))
514    }
515
516    #[must_use]
517    pub fn net_position(&self, instrument_id: &InstrumentId) -> Decimal {
518        self.inner
519            .borrow()
520            .net_positions
521            .get(instrument_id)
522            .copied()
523            .unwrap_or(Decimal::ZERO)
524    }
525
526    #[must_use]
527    pub fn is_net_long(&self, instrument_id: &InstrumentId) -> bool {
528        self.inner
529            .borrow()
530            .net_positions
531            .get(instrument_id)
532            .copied()
533            .map_or_else(|| false, |net_position| net_position > Decimal::ZERO)
534    }
535
536    #[must_use]
537    pub fn is_net_short(&self, instrument_id: &InstrumentId) -> bool {
538        self.inner
539            .borrow()
540            .net_positions
541            .get(instrument_id)
542            .copied()
543            .map_or_else(|| false, |net_position| net_position < Decimal::ZERO)
544    }
545
546    #[must_use]
547    pub fn is_flat(&self, instrument_id: &InstrumentId) -> bool {
548        self.inner
549            .borrow()
550            .net_positions
551            .get(instrument_id)
552            .copied()
553            .map_or_else(|| true, |net_position| net_position == Decimal::ZERO)
554    }
555
556    #[must_use]
557    pub fn is_completely_flat(&self) -> bool {
558        for net_position in self.inner.borrow().net_positions.values() {
559            if *net_position != Decimal::ZERO {
560                return false;
561            }
562        }
563        true
564    }
565
566    // -- COMMANDS --------------------------------------------------------------------------------
567
568    /// Initializes account margin based on existing open orders.
569    ///
570    /// # Panics
571    ///
572    /// Panics if updating the cache with a mutated account fails.
573    pub fn initialize_orders(&mut self) {
574        let mut initialized = true;
575        let orders_and_instruments = {
576            let cache = self.cache.borrow();
577            let all_orders_open = cache.orders_open(None, None, None, None);
578
579            let mut instruments_with_orders = Vec::new();
580            let mut instruments = HashSet::new();
581
582            for order in &all_orders_open {
583                instruments.insert(order.instrument_id());
584            }
585
586            for instrument_id in instruments {
587                if let Some(instrument) = cache.instrument(&instrument_id) {
588                    let orders = cache
589                        .orders_open(None, Some(&instrument_id), None, None)
590                        .into_iter()
591                        .cloned()
592                        .collect::<Vec<OrderAny>>();
593                    instruments_with_orders.push((instrument.clone(), orders));
594                } else {
595                    log::error!(
596                        "Cannot update initial (order) margin: no instrument found for {instrument_id}"
597                    );
598                    initialized = false;
599                    break;
600                }
601            }
602            instruments_with_orders
603        };
604
605        for (instrument, orders_open) in &orders_and_instruments {
606            let mut cache = self.cache.borrow_mut();
607            let account = if let Some(account) = cache.account_for_venue(&instrument.id().venue) {
608                account
609            } else {
610                log::error!(
611                    "Cannot update initial (order) margin: no account registered for {}",
612                    instrument.id().venue
613                );
614                initialized = false;
615                break;
616            };
617
618            let result = self.inner.borrow_mut().accounts.update_orders(
619                account,
620                instrument.clone(),
621                orders_open.iter().collect(),
622                self.clock.borrow().timestamp_ns(),
623            );
624
625            match result {
626                Some((updated_account, _)) => {
627                    cache.add_account(updated_account).unwrap(); // Temp Fix to update the mutated account
628                }
629                None => {
630                    initialized = false;
631                }
632            }
633        }
634
635        let total_orders = orders_and_instruments
636            .into_iter()
637            .map(|(_, orders)| orders.len())
638            .sum::<usize>();
639
640        log::info!(
641            "Initialized {} open order{}",
642            total_orders,
643            if total_orders == 1 { "" } else { "s" }
644        );
645
646        self.inner.borrow_mut().initialized = initialized;
647    }
648
649    /// Initializes account margin based on existing open positions.
650    ///
651    /// # Panics
652    ///
653    /// Panics if calculation of PnL or updating the cache with a mutated account fails.
654    pub fn initialize_positions(&mut self) {
655        self.inner.borrow_mut().unrealized_pnls.clear();
656        self.inner.borrow_mut().realized_pnls.clear();
657        let all_positions_open: Vec<Position>;
658        let mut instruments = HashSet::new();
659        {
660            let cache = self.cache.borrow();
661            all_positions_open = cache
662                .positions_open(None, None, None, None)
663                .into_iter()
664                .cloned()
665                .collect();
666            for position in &all_positions_open {
667                instruments.insert(position.instrument_id);
668            }
669        }
670
671        let mut initialized = true;
672
673        for instrument_id in instruments {
674            let positions_open: Vec<Position> = {
675                let cache = self.cache.borrow();
676                cache
677                    .positions_open(None, Some(&instrument_id), None, None)
678                    .into_iter()
679                    .cloned()
680                    .collect()
681            };
682
683            self.update_net_position(&instrument_id, positions_open);
684
685            let calculated_unrealized_pnl = self
686                .calculate_unrealized_pnl(&instrument_id)
687                .expect("Failed to calculate unrealized PnL");
688            let calculated_realized_pnl = self
689                .calculate_realized_pnl(&instrument_id)
690                .expect("Failed to calculate realized PnL");
691
692            self.inner
693                .borrow_mut()
694                .unrealized_pnls
695                .insert(instrument_id, calculated_unrealized_pnl);
696            self.inner
697                .borrow_mut()
698                .realized_pnls
699                .insert(instrument_id, calculated_realized_pnl);
700
701            let cache = self.cache.borrow();
702            let account = if let Some(account) = cache.account_for_venue(&instrument_id.venue) {
703                account
704            } else {
705                log::error!(
706                    "Cannot update maintenance (position) margin: no account registered for {}",
707                    instrument_id.venue
708                );
709                initialized = false;
710                break;
711            };
712
713            let account = match account {
714                AccountAny::Cash(_) => continue,
715                AccountAny::Margin(margin_account) => margin_account,
716            };
717
718            let mut cache = self.cache.borrow_mut();
719            let instrument = if let Some(instrument) = cache.instrument(&instrument_id) {
720                instrument
721            } else {
722                log::error!(
723                    "Cannot update maintenance (position) margin: no instrument found for {instrument_id}"
724                );
725                initialized = false;
726                break;
727            };
728
729            let result = self.inner.borrow_mut().accounts.update_positions(
730                account,
731                instrument.clone(),
732                self.cache
733                    .borrow()
734                    .positions_open(None, Some(&instrument_id), None, None),
735                self.clock.borrow().timestamp_ns(),
736            );
737
738            match result {
739                Some((updated_account, _)) => {
740                    cache
741                        .add_account(AccountAny::Margin(updated_account)) // Temp Fix to update the mutated account
742                        .unwrap();
743                }
744                None => {
745                    initialized = false;
746                }
747            }
748        }
749
750        let open_count = all_positions_open.len();
751        self.inner.borrow_mut().initialized = initialized;
752        log::info!(
753            "Initialized {} open position{}",
754            open_count,
755            if open_count == 1 { "" } else { "s" }
756        );
757    }
758
759    pub fn update_quote_tick(&mut self, quote: &QuoteTick) {
760        update_quote_tick(
761            self.cache.clone(),
762            self.clock.clone(),
763            self.inner.clone(),
764            quote,
765        );
766    }
767
768    pub fn update_bar(&mut self, bar: &Bar) {
769        update_bar(
770            self.cache.clone(),
771            self.clock.clone(),
772            self.inner.clone(),
773            bar,
774        );
775    }
776
777    pub fn update_account(&mut self, event: &AccountState) {
778        update_account(self.cache.clone(), event);
779    }
780
781    pub fn update_order(&mut self, event: &OrderEventAny) {
782        update_order(
783            self.cache.clone(),
784            self.clock.clone(),
785            self.inner.clone(),
786            event,
787        );
788    }
789
790    pub fn update_position(&mut self, event: &PositionEvent) {
791        update_position(
792            self.cache.clone(),
793            self.clock.clone(),
794            self.inner.clone(),
795            event,
796        );
797    }
798
799    // -- INTERNAL --------------------------------------------------------------------------------
800
801    fn update_net_position(&mut self, instrument_id: &InstrumentId, positions_open: Vec<Position>) {
802        let mut net_position = Decimal::ZERO;
803
804        for open_position in positions_open {
805            log::debug!("open_position: {open_position}");
806            net_position += Decimal::from_f64(open_position.signed_qty).unwrap_or(Decimal::ZERO);
807        }
808
809        let existing_position = self.net_position(instrument_id);
810        if existing_position != net_position {
811            self.inner
812                .borrow_mut()
813                .net_positions
814                .insert(*instrument_id, net_position);
815            log::info!("{instrument_id} net_position={net_position}");
816        }
817    }
818
819    fn calculate_unrealized_pnl(&mut self, instrument_id: &InstrumentId) -> Option<Money> {
820        let cache = self.cache.borrow();
821        let account = if let Some(account) = cache.account_for_venue(&instrument_id.venue) {
822            account
823        } else {
824            log::error!(
825                "Cannot calculate unrealized PnL: no account registered for {}",
826                instrument_id.venue
827            );
828            return None;
829        };
830
831        let instrument = if let Some(instrument) = cache.instrument(instrument_id) {
832            instrument
833        } else {
834            log::error!("Cannot calculate unrealized PnL: no instrument for {instrument_id}");
835            return None;
836        };
837
838        let currency = account
839            .base_currency()
840            .unwrap_or_else(|| instrument.settlement_currency());
841
842        let positions_open = cache.positions_open(
843            None, // Faster query filtering
844            Some(instrument_id),
845            None,
846            None,
847        );
848
849        if positions_open.is_empty() {
850            return Some(Money::new(0.0, currency));
851        }
852
853        let mut total_pnl = 0.0;
854
855        for position in positions_open {
856            if position.instrument_id != *instrument_id {
857                continue; // Nothing to calculate
858            }
859
860            if position.side == PositionSide::Flat {
861                continue; // Nothing to calculate
862            }
863
864            let price = if let Some(price) = self.get_price(position) {
865                price
866            } else {
867                log::debug!("Cannot calculate unrealized PnL: no prices for {instrument_id}");
868                self.inner.borrow_mut().pending_calcs.insert(*instrument_id);
869                return None; // Cannot calculate
870            };
871
872            let mut pnl = position.unrealized_pnl(price).as_f64();
873
874            if let Some(base_currency) = account.base_currency() {
875                let xrate = if let Some(xrate) =
876                    self.calculate_xrate_to_base(instrument, account, position.entry)
877                {
878                    xrate
879                } else {
880                    log::error!(
881                        // TODO: Improve logging
882                        "Cannot calculate unrealized PnL: insufficient data for {}/{}",
883                        instrument.settlement_currency(),
884                        base_currency
885                    );
886                    self.inner.borrow_mut().pending_calcs.insert(*instrument_id);
887                    return None; // Cannot calculate
888                };
889
890                let scale = 10f64.powi(currency.precision.into());
891                pnl = ((pnl * xrate) * scale).round() / scale;
892            }
893
894            total_pnl += pnl;
895        }
896
897        Some(Money::new(total_pnl, currency))
898    }
899
900    fn calculate_realized_pnl(&mut self, instrument_id: &InstrumentId) -> Option<Money> {
901        let cache = self.cache.borrow();
902        let account = if let Some(account) = cache.account_for_venue(&instrument_id.venue) {
903            account
904        } else {
905            log::error!(
906                "Cannot calculate realized PnL: no account registered for {}",
907                instrument_id.venue
908            );
909            return None;
910        };
911
912        let instrument = if let Some(instrument) = cache.instrument(instrument_id) {
913            instrument
914        } else {
915            log::error!("Cannot calculate realized PnL: no instrument for {instrument_id}");
916            return None;
917        };
918
919        let currency = account
920            .base_currency()
921            .unwrap_or_else(|| instrument.settlement_currency());
922
923        let positions = cache.positions(
924            None, // Faster query filtering
925            Some(instrument_id),
926            None,
927            None,
928        );
929
930        if positions.is_empty() {
931            return Some(Money::new(0.0, currency));
932        }
933
934        let mut total_pnl = 0.0;
935
936        for position in positions {
937            if position.instrument_id != *instrument_id {
938                continue; // Nothing to calculate
939            }
940
941            if position.realized_pnl.is_none() {
942                continue; // Nothing to calculate
943            }
944
945            let mut pnl = position.realized_pnl?.as_f64();
946
947            if let Some(base_currency) = account.base_currency() {
948                let xrate = if let Some(xrate) =
949                    self.calculate_xrate_to_base(instrument, account, position.entry)
950                {
951                    xrate
952                } else {
953                    log::error!(
954                        // TODO: Improve logging
955                        "Cannot calculate realized PnL: insufficient data for {}/{}",
956                        instrument.settlement_currency(),
957                        base_currency
958                    );
959                    self.inner.borrow_mut().pending_calcs.insert(*instrument_id);
960                    return None; // Cannot calculate
961                };
962
963                let scale = 10f64.powi(currency.precision.into());
964                pnl = ((pnl * xrate) * scale).round() / scale;
965            }
966
967            total_pnl += pnl;
968        }
969
970        Some(Money::new(total_pnl, currency))
971    }
972
973    fn get_price(&self, position: &Position) -> Option<Price> {
974        let price_type = match position.side {
975            PositionSide::Long => PriceType::Bid,
976            PositionSide::Short => PriceType::Ask,
977            _ => panic!("invalid `PositionSide`, was {}", position.side),
978        };
979
980        let cache = self.cache.borrow();
981
982        let instrument_id = &position.instrument_id;
983        cache
984            .price(instrument_id, price_type)
985            .or_else(|| cache.price(instrument_id, PriceType::Last))
986            .or_else(|| {
987                self.inner
988                    .borrow()
989                    .bar_close_prices
990                    .get(instrument_id)
991                    .copied()
992            })
993    }
994
995    fn calculate_xrate_to_base(
996        &self,
997        instrument: &InstrumentAny,
998        account: &AccountAny,
999        side: OrderSide,
1000    ) -> Option<f64> {
1001        if !self.config.convert_to_account_base_currency {
1002            return Some(1.0); // No conversion needed
1003        }
1004
1005        match account.base_currency() {
1006            None => Some(1.0), // No conversion needed
1007            Some(base_currency) => {
1008                let cache = self.cache.borrow();
1009
1010                if self.config.use_mark_xrates {
1011                    return cache.get_mark_xrate(instrument.settlement_currency(), base_currency);
1012                }
1013
1014                let price_type = if side == OrderSide::Buy {
1015                    PriceType::Bid
1016                } else {
1017                    PriceType::Ask
1018                };
1019
1020                cache.get_xrate(
1021                    instrument.id().venue,
1022                    instrument.settlement_currency(),
1023                    base_currency,
1024                    price_type,
1025                )
1026            }
1027        }
1028    }
1029}
1030
1031// Helper functions
1032fn update_quote_tick(
1033    cache: Rc<RefCell<Cache>>,
1034    clock: Rc<RefCell<dyn Clock>>,
1035    inner: Rc<RefCell<PortfolioState>>,
1036    quote: &QuoteTick,
1037) {
1038    update_instrument_id(cache, clock.clone(), inner, &quote.instrument_id);
1039}
1040
1041fn update_bar(
1042    cache: Rc<RefCell<Cache>>,
1043    clock: Rc<RefCell<dyn Clock>>,
1044    inner: Rc<RefCell<PortfolioState>>,
1045    bar: &Bar,
1046) {
1047    let instrument_id = bar.bar_type.instrument_id();
1048    inner
1049        .borrow_mut()
1050        .bar_close_prices
1051        .insert(instrument_id, bar.close);
1052    update_instrument_id(cache, clock.clone(), inner, &instrument_id);
1053}
1054
1055fn update_instrument_id(
1056    cache: Rc<RefCell<Cache>>,
1057    clock: Rc<RefCell<dyn Clock>>,
1058    inner: Rc<RefCell<PortfolioState>>,
1059    instrument_id: &InstrumentId,
1060) {
1061    inner.borrow_mut().unrealized_pnls.remove(instrument_id);
1062
1063    if inner.borrow().initialized || !inner.borrow().pending_calcs.contains(instrument_id) {
1064        return;
1065    }
1066
1067    let result_init;
1068    let mut result_maint = None;
1069
1070    let account = {
1071        let cache_ref = cache.borrow();
1072        let account = if let Some(account) = cache_ref.account_for_venue(&instrument_id.venue) {
1073            account
1074        } else {
1075            log::error!(
1076                "Cannot update tick: no account registered for {}",
1077                instrument_id.venue
1078            );
1079            return;
1080        };
1081
1082        let mut cache_ref = cache.borrow_mut();
1083        let instrument = if let Some(instrument) = cache_ref.instrument(instrument_id) {
1084            instrument.clone()
1085        } else {
1086            log::error!("Cannot update tick: no instrument found for {instrument_id}");
1087            return;
1088        };
1089
1090        // Clone the orders and positions to own the data
1091        let orders_open: Vec<OrderAny> = cache_ref
1092            .orders_open(None, Some(instrument_id), None, None)
1093            .iter()
1094            .map(|o| (*o).clone())
1095            .collect();
1096
1097        let positions_open: Vec<Position> = cache_ref
1098            .positions_open(None, Some(instrument_id), None, None)
1099            .iter()
1100            .map(|p| (*p).clone())
1101            .collect();
1102
1103        result_init = inner.borrow().accounts.update_orders(
1104            account,
1105            instrument.clone(),
1106            orders_open.iter().collect(),
1107            clock.borrow().timestamp_ns(),
1108        );
1109
1110        if let AccountAny::Margin(margin_account) = account {
1111            result_maint = inner.borrow().accounts.update_positions(
1112                margin_account,
1113                instrument,
1114                positions_open.iter().collect(),
1115                clock.borrow().timestamp_ns(),
1116            );
1117        }
1118
1119        if let Some((ref updated_account, _)) = result_init {
1120            cache_ref.add_account(updated_account.clone()).unwrap(); // Temp Fix to update the mutated account
1121        }
1122        account.clone()
1123    };
1124
1125    let mut portfolio_clone = Portfolio {
1126        clock: clock.clone(),
1127        cache,
1128        inner: inner.clone(),
1129        config: PortfolioConfig::default(), // TODO: TBD
1130    };
1131
1132    let result_unrealized_pnl: Option<Money> =
1133        portfolio_clone.calculate_unrealized_pnl(instrument_id);
1134
1135    if result_init.is_some()
1136        && (matches!(account, AccountAny::Cash(_))
1137            || (result_maint.is_some() && result_unrealized_pnl.is_some()))
1138    {
1139        inner.borrow_mut().pending_calcs.remove(instrument_id);
1140        if inner.borrow().pending_calcs.is_empty() {
1141            inner.borrow_mut().initialized = true;
1142        }
1143    }
1144}
1145
1146fn update_order(
1147    cache: Rc<RefCell<Cache>>,
1148    clock: Rc<RefCell<dyn Clock>>,
1149    inner: Rc<RefCell<PortfolioState>>,
1150    event: &OrderEventAny,
1151) {
1152    let cache_ref = cache.borrow();
1153    let account_id = match event.account_id() {
1154        Some(account_id) => account_id,
1155        None => {
1156            return; // No Account Assigned
1157        }
1158    };
1159
1160    let account = if let Some(account) = cache_ref.account(&account_id) {
1161        account
1162    } else {
1163        log::error!("Cannot update order: no account registered for {account_id}");
1164        return;
1165    };
1166
1167    match account {
1168        AccountAny::Cash(cash_account) => {
1169            if !cash_account.base.calculate_account_state {
1170                return;
1171            }
1172        }
1173        AccountAny::Margin(margin_account) => {
1174            if !margin_account.base.calculate_account_state {
1175                return;
1176            }
1177        }
1178    }
1179
1180    match event {
1181        OrderEventAny::Accepted(_)
1182        | OrderEventAny::Canceled(_)
1183        | OrderEventAny::Rejected(_)
1184        | OrderEventAny::Updated(_)
1185        | OrderEventAny::Filled(_) => {}
1186        _ => {
1187            return;
1188        }
1189    }
1190
1191    let cache_ref = cache.borrow();
1192    let order = if let Some(order) = cache_ref.order(&event.client_order_id()) {
1193        order
1194    } else {
1195        log::error!(
1196            "Cannot update order: {} not found in the cache",
1197            event.client_order_id()
1198        );
1199        return; // No Order Found
1200    };
1201
1202    if matches!(event, OrderEventAny::Rejected(_)) && order.order_type() != OrderType::StopLimit {
1203        return; // No change to account state
1204    }
1205
1206    let instrument = if let Some(instrument_id) = cache_ref.instrument(&event.instrument_id()) {
1207        instrument_id
1208    } else {
1209        log::error!(
1210            "Cannot update order: no instrument found for {}",
1211            event.instrument_id()
1212        );
1213        return;
1214    };
1215
1216    if let OrderEventAny::Filled(order_filled) = event {
1217        let _ = inner.borrow().accounts.update_balances(
1218            account.clone(),
1219            instrument.clone(),
1220            *order_filled,
1221        );
1222
1223        let mut portfolio_clone = Portfolio {
1224            clock: clock.clone(),
1225            cache: cache.clone(),
1226            inner: inner.clone(),
1227            config: PortfolioConfig::default(), // TODO: TBD
1228        };
1229
1230        match portfolio_clone.calculate_unrealized_pnl(&order_filled.instrument_id) {
1231            Some(unrealized_pnl) => {
1232                inner
1233                    .borrow_mut()
1234                    .unrealized_pnls
1235                    .insert(event.instrument_id(), unrealized_pnl);
1236            }
1237            None => {
1238                log::error!(
1239                    "Failed to calculate unrealized PnL for instrument {}",
1240                    event.instrument_id()
1241                );
1242            }
1243        }
1244    }
1245
1246    let orders_open = cache_ref.orders_open(None, Some(&event.instrument_id()), None, None);
1247
1248    let account_state = inner.borrow_mut().accounts.update_orders(
1249        account,
1250        instrument.clone(),
1251        orders_open,
1252        clock.borrow().timestamp_ns(),
1253    );
1254
1255    let mut cache_ref = cache.borrow_mut();
1256    cache_ref.update_account(account.clone()).unwrap();
1257
1258    if let Some(account_state) = account_state {
1259        msgbus::publish(
1260            format!("events.account.{}", account.id()).into(),
1261            &account_state,
1262        );
1263    } else {
1264        log::debug!("Added pending calculation for {}", instrument.id());
1265        inner.borrow_mut().pending_calcs.insert(instrument.id());
1266    }
1267
1268    log::debug!("Updated {event}");
1269}
1270
1271fn update_position(
1272    cache: Rc<RefCell<Cache>>,
1273    clock: Rc<RefCell<dyn Clock>>,
1274    inner: Rc<RefCell<PortfolioState>>,
1275    event: &PositionEvent,
1276) {
1277    let instrument_id = event.instrument_id();
1278
1279    let positions_open: Vec<Position> = {
1280        let cache_ref = cache.borrow();
1281
1282        cache_ref
1283            .positions_open(None, Some(&instrument_id), None, None)
1284            .iter()
1285            .map(|o| (*o).clone())
1286            .collect()
1287    };
1288
1289    log::debug!("postion fresh from cache -> {positions_open:?}");
1290
1291    let mut portfolio_clone = Portfolio {
1292        clock: clock.clone(),
1293        cache: cache.clone(),
1294        inner: inner.clone(),
1295        config: PortfolioConfig::default(), // TODO: TBD
1296    };
1297
1298    portfolio_clone.update_net_position(&instrument_id, positions_open.clone());
1299
1300    let calculated_unrealized_pnl = portfolio_clone
1301        .calculate_unrealized_pnl(&instrument_id)
1302        .expect("Failed to calculate unrealized PnL");
1303    let calculated_realized_pnl = portfolio_clone
1304        .calculate_realized_pnl(&instrument_id)
1305        .expect("Failed to calculate realized PnL");
1306
1307    inner
1308        .borrow_mut()
1309        .unrealized_pnls
1310        .insert(event.instrument_id(), calculated_unrealized_pnl);
1311    inner
1312        .borrow_mut()
1313        .realized_pnls
1314        .insert(event.instrument_id(), calculated_realized_pnl);
1315
1316    let cache_ref = cache.borrow();
1317    let account = cache_ref.account(&event.account_id());
1318
1319    if let Some(AccountAny::Margin(margin_account)) = account {
1320        if !margin_account.calculate_account_state {
1321            return; // Nothing to calculate
1322        }
1323
1324        let cache_ref = cache.borrow();
1325        let instrument = if let Some(instrument) = cache_ref.instrument(&instrument_id) {
1326            instrument
1327        } else {
1328            log::error!("Cannot update position: no instrument found for {instrument_id}");
1329            return;
1330        };
1331
1332        let result = inner.borrow_mut().accounts.update_positions(
1333            margin_account,
1334            instrument.clone(),
1335            positions_open.iter().collect(),
1336            clock.borrow().timestamp_ns(),
1337        );
1338        let mut cache_ref = cache.borrow_mut();
1339        if let Some((margin_account, _)) = result {
1340            cache_ref
1341                .add_account(AccountAny::Margin(margin_account)) // Temp Fix to update the mutated account
1342                .unwrap();
1343        }
1344    } else if account.is_none() {
1345        log::error!(
1346            "Cannot update position: no account registered for {}",
1347            event.account_id()
1348        );
1349    }
1350}
1351
1352pub fn update_account(cache: Rc<RefCell<Cache>>, event: &AccountState) {
1353    let mut cache_ref = cache.borrow_mut();
1354
1355    if let Some(existing) = cache_ref.account(&event.account_id) {
1356        let mut account = existing.clone();
1357        account.apply(event.clone());
1358
1359        if let Err(e) = cache_ref.update_account(account.clone()) {
1360            log::error!("Failed to update account: {e}");
1361            return;
1362        }
1363    } else {
1364        let account = match AccountAny::from_events(vec![event.clone()]) {
1365            Ok(account) => account,
1366            Err(e) => {
1367                log::error!("Failed to create account: {e}");
1368                return;
1369            }
1370        };
1371
1372        if let Err(e) = cache_ref.add_account(account) {
1373            log::error!("Failed to add account: {e}");
1374            return;
1375        }
1376    }
1377
1378    log::info!("Updated {event}");
1379}