nautilus_portfolio/
portfolio.rs

1// -------------------------------------------------------------------------------------------------
2//  Copyright (C) 2015-2025 Posei Systems Pty Ltd. All rights reserved.
3//  https://poseitrader.io
4//
5//  Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
6//  You may not use this file except in compliance with the License.
7//  You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
8//
9//  Unless required by applicable law or agreed to in writing, software
10//  distributed under the License is distributed on an "AS IS" BASIS,
11//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12//  See the License for the specific language governing permissions and
13//  limitations under the License.
14// -------------------------------------------------------------------------------------------------
15
16// TODO: Under development
17#![allow(dead_code)] // For PortfolioConfig
18
19//! Provides a generic `Portfolio` for all environments.
20use std::{
21    cell::RefCell,
22    collections::{HashMap, HashSet},
23    fmt::Debug,
24    rc::Rc,
25};
26
27use nautilus_analysis::analyzer::PortfolioAnalyzer;
28use nautilus_common::{
29    cache::Cache,
30    clock::Clock,
31    msgbus::{
32        self,
33        handler::{ShareableMessageHandler, TypedMessageHandler},
34    },
35};
36use nautilus_model::{
37    accounts::AccountAny,
38    data::{Bar, QuoteTick},
39    enums::{OrderSide, OrderType, PositionSide, PriceType},
40    events::{AccountState, OrderEventAny, position::PositionEvent},
41    identifiers::{InstrumentId, Venue},
42    instruments::{Instrument, InstrumentAny},
43    orders::{Order, OrderAny},
44    position::Position,
45    types::{Currency, Money, Price},
46};
47use rust_decimal::{Decimal, prelude::FromPrimitive};
48
49use crate::{config::PortfolioConfig, manager::AccountsManager};
50
51struct PortfolioState {
52    accounts: AccountsManager,
53    analyzer: PortfolioAnalyzer,
54    unrealized_pnls: HashMap<InstrumentId, Money>,
55    realized_pnls: HashMap<InstrumentId, Money>,
56    net_positions: HashMap<InstrumentId, Decimal>,
57    pending_calcs: HashSet<InstrumentId>,
58    bar_close_prices: HashMap<InstrumentId, Price>,
59    initialized: bool,
60}
61
62impl PortfolioState {
63    fn new(clock: Rc<RefCell<dyn Clock>>, cache: Rc<RefCell<Cache>>) -> Self {
64        Self {
65            accounts: AccountsManager::new(clock, cache),
66            analyzer: PortfolioAnalyzer::default(),
67            unrealized_pnls: HashMap::new(),
68            realized_pnls: HashMap::new(),
69            net_positions: HashMap::new(),
70            pending_calcs: HashSet::new(),
71            bar_close_prices: HashMap::new(),
72            initialized: false,
73        }
74    }
75
76    fn reset(&mut self) {
77        log::debug!("RESETTING");
78        self.net_positions.clear();
79        self.unrealized_pnls.clear();
80        self.realized_pnls.clear();
81        self.pending_calcs.clear();
82        self.analyzer.reset();
83        log::debug!("READY");
84    }
85}
86
87pub struct Portfolio {
88    pub(crate) clock: Rc<RefCell<dyn Clock>>,
89    pub(crate) cache: Rc<RefCell<Cache>>,
90    inner: Rc<RefCell<PortfolioState>>,
91    config: PortfolioConfig,
92}
93
94impl Debug for Portfolio {
95    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
96        f.debug_struct(stringify!(Portfolio)).finish()
97    }
98}
99
100impl Portfolio {
101    pub fn new(
102        cache: Rc<RefCell<Cache>>,
103        clock: Rc<RefCell<dyn Clock>>,
104        config: Option<PortfolioConfig>,
105    ) -> Self {
106        let inner = Rc::new(RefCell::new(PortfolioState::new(
107            clock.clone(),
108            cache.clone(),
109        )));
110        let config = config.unwrap_or_default();
111
112        Self::register_message_handlers(
113            cache.clone(),
114            clock.clone(),
115            inner.clone(),
116            config.bar_updates,
117        );
118
119        Self {
120            clock,
121            cache,
122            inner,
123            config,
124        }
125    }
126
127    fn register_message_handlers(
128        cache: Rc<RefCell<Cache>>,
129        clock: Rc<RefCell<dyn Clock>>,
130        inner: Rc<RefCell<PortfolioState>>,
131        bar_updates: bool,
132    ) {
133        let update_account_handler = {
134            let cache = cache.clone();
135            ShareableMessageHandler(Rc::new(TypedMessageHandler::from(
136                move |event: &AccountState| {
137                    update_account(cache.clone(), event);
138                },
139            )))
140        };
141
142        let update_position_handler = {
143            let cache = cache.clone();
144            let clock = clock.clone();
145            let inner = inner.clone();
146            ShareableMessageHandler(Rc::new(TypedMessageHandler::from(
147                move |event: &PositionEvent| {
148                    update_position(cache.clone(), clock.clone(), inner.clone(), event);
149                },
150            )))
151        };
152
153        let update_quote_handler = {
154            let cache = cache.clone();
155            let clock = clock.clone();
156            let inner = inner.clone();
157            ShareableMessageHandler(Rc::new(TypedMessageHandler::from(
158                move |quote: &QuoteTick| {
159                    update_quote_tick(cache.clone(), clock.clone(), inner.clone(), quote);
160                },
161            )))
162        };
163
164        let update_bar_handler = {
165            let cache = cache.clone();
166            let clock = clock.clone();
167            let inner = inner.clone();
168            ShareableMessageHandler(Rc::new(TypedMessageHandler::from(move |bar: &Bar| {
169                update_bar(cache.clone(), clock.clone(), inner.clone(), bar);
170            })))
171        };
172
173        let update_order_handler = {
174            let cache = cache;
175            let clock = clock.clone();
176            let inner = inner;
177            ShareableMessageHandler(Rc::new(TypedMessageHandler::from(
178                move |event: &OrderEventAny| {
179                    update_order(cache.clone(), clock.clone(), inner.clone(), event);
180                },
181            )))
182        };
183
184        msgbus::register(
185            "Portfolio.update_account".into(),
186            update_account_handler.clone(),
187        );
188
189        msgbus::subscribe("data.quotes.*".into(), update_quote_handler, Some(10));
190        if bar_updates {
191            msgbus::subscribe("data.quotes.*EXTERNAL".into(), update_bar_handler, Some(10));
192        }
193        msgbus::subscribe("events.order.*".into(), update_order_handler, Some(10));
194        msgbus::subscribe(
195            "events.position.*".into(),
196            update_position_handler,
197            Some(10),
198        );
199        msgbus::subscribe("events.account.*".into(), update_account_handler, Some(10));
200    }
201
202    pub fn reset(&mut self) {
203        log::debug!("RESETTING");
204        self.inner.borrow_mut().reset();
205        log::debug!("READY");
206    }
207
208    // -- QUERIES ---------------------------------------------------------------------------------
209
210    /// Returns `true` if the portfolio has been initialized.
211    #[must_use]
212    pub fn is_initialized(&self) -> bool {
213        self.inner.borrow().initialized
214    }
215
216    /// Returns the locked balances for the given venue.
217    ///
218    /// Locked balances represent funds reserved for open orders.
219    #[must_use]
220    pub fn balances_locked(&self, venue: &Venue) -> HashMap<Currency, Money> {
221        self.cache.borrow().account_for_venue(venue).map_or_else(
222            || {
223                log::error!("Cannot get balances locked: no account generated for {venue}");
224                HashMap::new()
225            },
226            AccountAny::balances_locked,
227        )
228    }
229
230    /// Returns the initial margin requirements for the given venue.
231    ///
232    /// Only applicable for margin accounts. Returns empty map for cash accounts.
233    #[must_use]
234    pub fn margins_init(&self, venue: &Venue) -> HashMap<InstrumentId, Money> {
235        self.cache.borrow().account_for_venue(venue).map_or_else(
236            || {
237                log::error!(
238                    "Cannot get initial (order) margins: no account registered for {venue}"
239                );
240                HashMap::new()
241            },
242            |account| match account {
243                AccountAny::Margin(margin_account) => margin_account.initial_margins(),
244                AccountAny::Cash(_) => {
245                    log::warn!("Initial margins not applicable for cash account");
246                    HashMap::new()
247                }
248            },
249        )
250    }
251
252    /// Returns the maintenance margin requirements for the given venue.
253    ///
254    /// Only applicable for margin accounts. Returns empty map for cash accounts.
255    #[must_use]
256    pub fn margins_maint(&self, venue: &Venue) -> HashMap<InstrumentId, Money> {
257        self.cache.borrow().account_for_venue(venue).map_or_else(
258            || {
259                log::error!(
260                    "Cannot get maintenance (position) margins: no account registered for {venue}"
261                );
262                HashMap::new()
263            },
264            |account| match account {
265                AccountAny::Margin(margin_account) => margin_account.maintenance_margins(),
266                AccountAny::Cash(_) => {
267                    log::warn!("Maintenance margins not applicable for cash account");
268                    HashMap::new()
269                }
270            },
271        )
272    }
273
274    /// Returns the unrealized PnLs for all positions at the given venue.
275    ///
276    /// Calculates mark-to-market PnL based on current market prices.
277    #[must_use]
278    pub fn unrealized_pnls(&mut self, venue: &Venue) -> HashMap<Currency, Money> {
279        let instrument_ids = {
280            let cache = self.cache.borrow();
281            let positions = cache.positions(Some(venue), None, None, None);
282
283            if positions.is_empty() {
284                return HashMap::new(); // Nothing to calculate
285            }
286
287            let instrument_ids: HashSet<InstrumentId> =
288                positions.iter().map(|p| p.instrument_id).collect();
289
290            instrument_ids
291        };
292
293        let mut unrealized_pnls: HashMap<Currency, f64> = HashMap::new();
294
295        for instrument_id in instrument_ids {
296            if let Some(&pnl) = self.inner.borrow_mut().unrealized_pnls.get(&instrument_id) {
297                // PnL already calculated
298                *unrealized_pnls.entry(pnl.currency).or_insert(0.0) += pnl.as_f64();
299                continue;
300            }
301
302            // Calculate PnL
303            match self.calculate_unrealized_pnl(&instrument_id) {
304                Some(pnl) => *unrealized_pnls.entry(pnl.currency).or_insert(0.0) += pnl.as_f64(),
305                None => continue,
306            }
307        }
308
309        unrealized_pnls
310            .into_iter()
311            .map(|(currency, amount)| (currency, Money::new(amount, currency)))
312            .collect()
313    }
314
315    /// Returns the realized PnLs for all positions at the given venue.
316    ///
317    /// Calculates total realized profit and loss from closed positions.
318    #[must_use]
319    pub fn realized_pnls(&mut self, venue: &Venue) -> HashMap<Currency, Money> {
320        let instrument_ids = {
321            let cache = self.cache.borrow();
322            let positions = cache.positions(Some(venue), None, None, None);
323
324            if positions.is_empty() {
325                return HashMap::new(); // Nothing to calculate
326            }
327
328            let instrument_ids: HashSet<InstrumentId> =
329                positions.iter().map(|p| p.instrument_id).collect();
330
331            instrument_ids
332        };
333
334        let mut realized_pnls: HashMap<Currency, f64> = HashMap::new();
335
336        for instrument_id in instrument_ids {
337            if let Some(&pnl) = self.inner.borrow_mut().realized_pnls.get(&instrument_id) {
338                // PnL already calculated
339                *realized_pnls.entry(pnl.currency).or_insert(0.0) += pnl.as_f64();
340                continue;
341            }
342
343            // Calculate PnL
344            match self.calculate_realized_pnl(&instrument_id) {
345                Some(pnl) => *realized_pnls.entry(pnl.currency).or_insert(0.0) += pnl.as_f64(),
346                None => continue,
347            }
348        }
349
350        realized_pnls
351            .into_iter()
352            .map(|(currency, amount)| (currency, Money::new(amount, currency)))
353            .collect()
354    }
355
356    #[must_use]
357    pub fn net_exposures(&self, venue: &Venue) -> Option<HashMap<Currency, Money>> {
358        let cache = self.cache.borrow();
359        let account = if let Some(account) = cache.account_for_venue(venue) {
360            account
361        } else {
362            log::error!("Cannot calculate net exposures: no account registered for {venue}");
363            return None; // Cannot calculate
364        };
365
366        let positions_open = cache.positions_open(Some(venue), None, None, None);
367        if positions_open.is_empty() {
368            return Some(HashMap::new()); // Nothing to calculate
369        }
370
371        let mut net_exposures: HashMap<Currency, f64> = HashMap::new();
372
373        for position in positions_open {
374            let instrument = if let Some(instrument) = cache.instrument(&position.instrument_id) {
375                instrument
376            } else {
377                log::error!(
378                    "Cannot calculate net exposures: no instrument for {}",
379                    position.instrument_id
380                );
381                return None; // Cannot calculate
382            };
383
384            if position.side == PositionSide::Flat {
385                log::error!(
386                    "Cannot calculate net exposures: position is flat for {}",
387                    position.instrument_id
388                );
389                continue; // Nothing to calculate
390            }
391
392            let price = self.get_price(position)?;
393            let xrate = if let Some(xrate) =
394                self.calculate_xrate_to_base(instrument, account, position.entry)
395            {
396                xrate
397            } else {
398                log::error!(
399                    // TODO: Improve logging
400                    "Cannot calculate net exposures: insufficient data for {}/{:?}",
401                    instrument.settlement_currency(),
402                    account.base_currency()
403                );
404                return None; // Cannot calculate
405            };
406
407            let settlement_currency = account
408                .base_currency()
409                .unwrap_or_else(|| instrument.settlement_currency());
410
411            let net_exposure = instrument
412                .calculate_notional_value(position.quantity, price, None)
413                .as_f64()
414                * xrate;
415
416            let net_exposure = (net_exposure * 10f64.powi(settlement_currency.precision.into()))
417                .round()
418                / 10f64.powi(settlement_currency.precision.into());
419
420            *net_exposures.entry(settlement_currency).or_insert(0.0) += net_exposure;
421        }
422
423        Some(
424            net_exposures
425                .into_iter()
426                .map(|(currency, amount)| (currency, Money::new(amount, currency)))
427                .collect(),
428        )
429    }
430
431    #[must_use]
432    pub fn unrealized_pnl(&mut self, instrument_id: &InstrumentId) -> Option<Money> {
433        if let Some(pnl) = self
434            .inner
435            .borrow()
436            .unrealized_pnls
437            .get(instrument_id)
438            .copied()
439        {
440            return Some(pnl);
441        }
442
443        let pnl = self.calculate_unrealized_pnl(instrument_id)?;
444        self.inner
445            .borrow_mut()
446            .unrealized_pnls
447            .insert(*instrument_id, pnl);
448        Some(pnl)
449    }
450
451    #[must_use]
452    pub fn realized_pnl(&mut self, instrument_id: &InstrumentId) -> Option<Money> {
453        if let Some(pnl) = self
454            .inner
455            .borrow()
456            .realized_pnls
457            .get(instrument_id)
458            .copied()
459        {
460            return Some(pnl);
461        }
462
463        let pnl = self.calculate_realized_pnl(instrument_id)?;
464        self.inner
465            .borrow_mut()
466            .realized_pnls
467            .insert(*instrument_id, pnl);
468        Some(pnl)
469    }
470
471    #[must_use]
472    pub fn net_exposure(&self, instrument_id: &InstrumentId) -> Option<Money> {
473        let cache = self.cache.borrow();
474        let account = if let Some(account) = cache.account_for_venue(&instrument_id.venue) {
475            account
476        } else {
477            log::error!(
478                "Cannot calculate net exposure: no account registered for {}",
479                instrument_id.venue
480            );
481            return None;
482        };
483
484        let instrument = if let Some(instrument) = cache.instrument(instrument_id) {
485            instrument
486        } else {
487            log::error!("Cannot calculate net exposure: no instrument for {instrument_id}");
488            return None;
489        };
490
491        let positions_open = cache.positions_open(
492            None, // Faster query filtering
493            Some(instrument_id),
494            None,
495            None,
496        );
497
498        if positions_open.is_empty() {
499            return Some(Money::new(0.0, instrument.settlement_currency()));
500        }
501
502        let mut net_exposure = 0.0;
503
504        for position in positions_open {
505            let price = self.get_price(position)?;
506            let xrate = if let Some(xrate) =
507                self.calculate_xrate_to_base(instrument, account, position.entry)
508            {
509                xrate
510            } else {
511                log::error!(
512                    // TODO: Improve logging
513                    "Cannot calculate net exposures: insufficient data for {}/{:?}",
514                    instrument.settlement_currency(),
515                    account.base_currency()
516                );
517                return None; // Cannot calculate
518            };
519
520            let notional_value =
521                instrument.calculate_notional_value(position.quantity, price, None);
522            net_exposure += notional_value.as_f64() * xrate;
523        }
524
525        let settlement_currency = account
526            .base_currency()
527            .unwrap_or_else(|| instrument.settlement_currency());
528
529        Some(Money::new(net_exposure, settlement_currency))
530    }
531
532    #[must_use]
533    pub fn net_position(&self, instrument_id: &InstrumentId) -> Decimal {
534        self.inner
535            .borrow()
536            .net_positions
537            .get(instrument_id)
538            .copied()
539            .unwrap_or(Decimal::ZERO)
540    }
541
542    #[must_use]
543    pub fn is_net_long(&self, instrument_id: &InstrumentId) -> bool {
544        self.inner
545            .borrow()
546            .net_positions
547            .get(instrument_id)
548            .copied()
549            .map_or_else(|| false, |net_position| net_position > Decimal::ZERO)
550    }
551
552    #[must_use]
553    pub fn is_net_short(&self, instrument_id: &InstrumentId) -> bool {
554        self.inner
555            .borrow()
556            .net_positions
557            .get(instrument_id)
558            .copied()
559            .map_or_else(|| false, |net_position| net_position < Decimal::ZERO)
560    }
561
562    #[must_use]
563    pub fn is_flat(&self, instrument_id: &InstrumentId) -> bool {
564        self.inner
565            .borrow()
566            .net_positions
567            .get(instrument_id)
568            .copied()
569            .map_or_else(|| true, |net_position| net_position == Decimal::ZERO)
570    }
571
572    #[must_use]
573    pub fn is_completely_flat(&self) -> bool {
574        for net_position in self.inner.borrow().net_positions.values() {
575            if *net_position != Decimal::ZERO {
576                return false;
577            }
578        }
579        true
580    }
581
582    // -- COMMANDS --------------------------------------------------------------------------------
583
584    /// Initializes account margin based on existing open orders.
585    ///
586    /// # Panics
587    ///
588    /// Panics if updating the cache with a mutated account fails.
589    pub fn initialize_orders(&mut self) {
590        let mut initialized = true;
591        let orders_and_instruments = {
592            let cache = self.cache.borrow();
593            let all_orders_open = cache.orders_open(None, None, None, None);
594
595            let mut instruments_with_orders = Vec::new();
596            let mut instruments = HashSet::new();
597
598            for order in &all_orders_open {
599                instruments.insert(order.instrument_id());
600            }
601
602            for instrument_id in instruments {
603                if let Some(instrument) = cache.instrument(&instrument_id) {
604                    let orders = cache
605                        .orders_open(None, Some(&instrument_id), None, None)
606                        .into_iter()
607                        .cloned()
608                        .collect::<Vec<OrderAny>>();
609                    instruments_with_orders.push((instrument.clone(), orders));
610                } else {
611                    log::error!(
612                        "Cannot update initial (order) margin: no instrument found for {instrument_id}"
613                    );
614                    initialized = false;
615                    break;
616                }
617            }
618            instruments_with_orders
619        };
620
621        for (instrument, orders_open) in &orders_and_instruments {
622            let mut cache = self.cache.borrow_mut();
623            let account = if let Some(account) = cache.account_for_venue(&instrument.id().venue) {
624                account
625            } else {
626                log::error!(
627                    "Cannot update initial (order) margin: no account registered for {}",
628                    instrument.id().venue
629                );
630                initialized = false;
631                break;
632            };
633
634            let result = self.inner.borrow_mut().accounts.update_orders(
635                account,
636                instrument.clone(),
637                orders_open.iter().collect(),
638                self.clock.borrow().timestamp_ns(),
639            );
640
641            match result {
642                Some((updated_account, _)) => {
643                    cache.add_account(updated_account).unwrap(); // Temp Fix to update the mutated account
644                }
645                None => {
646                    initialized = false;
647                }
648            }
649        }
650
651        let total_orders = orders_and_instruments
652            .into_iter()
653            .map(|(_, orders)| orders.len())
654            .sum::<usize>();
655
656        log::info!(
657            "Initialized {} open order{}",
658            total_orders,
659            if total_orders == 1 { "" } else { "s" }
660        );
661
662        self.inner.borrow_mut().initialized = initialized;
663    }
664
665    /// Initializes account margin based on existing open positions.
666    ///
667    /// # Panics
668    ///
669    /// Panics if calculation of PnL or updating the cache with a mutated account fails.
670    pub fn initialize_positions(&mut self) {
671        self.inner.borrow_mut().unrealized_pnls.clear();
672        self.inner.borrow_mut().realized_pnls.clear();
673        let all_positions_open: Vec<Position>;
674        let mut instruments = HashSet::new();
675        {
676            let cache = self.cache.borrow();
677            all_positions_open = cache
678                .positions_open(None, None, None, None)
679                .into_iter()
680                .cloned()
681                .collect();
682            for position in &all_positions_open {
683                instruments.insert(position.instrument_id);
684            }
685        }
686
687        let mut initialized = true;
688
689        for instrument_id in instruments {
690            let positions_open: Vec<Position> = {
691                let cache = self.cache.borrow();
692                cache
693                    .positions_open(None, Some(&instrument_id), None, None)
694                    .into_iter()
695                    .cloned()
696                    .collect()
697            };
698
699            self.update_net_position(&instrument_id, positions_open);
700
701            let calculated_unrealized_pnl = self
702                .calculate_unrealized_pnl(&instrument_id)
703                .expect("Failed to calculate unrealized PnL");
704            let calculated_realized_pnl = self
705                .calculate_realized_pnl(&instrument_id)
706                .expect("Failed to calculate realized PnL");
707
708            self.inner
709                .borrow_mut()
710                .unrealized_pnls
711                .insert(instrument_id, calculated_unrealized_pnl);
712            self.inner
713                .borrow_mut()
714                .realized_pnls
715                .insert(instrument_id, calculated_realized_pnl);
716
717            let cache = self.cache.borrow();
718            let account = if let Some(account) = cache.account_for_venue(&instrument_id.venue) {
719                account
720            } else {
721                log::error!(
722                    "Cannot update maintenance (position) margin: no account registered for {}",
723                    instrument_id.venue
724                );
725                initialized = false;
726                break;
727            };
728
729            let account = match account {
730                AccountAny::Cash(_) => continue,
731                AccountAny::Margin(margin_account) => margin_account,
732            };
733
734            let mut cache = self.cache.borrow_mut();
735            let instrument = if let Some(instrument) = cache.instrument(&instrument_id) {
736                instrument
737            } else {
738                log::error!(
739                    "Cannot update maintenance (position) margin: no instrument found for {instrument_id}"
740                );
741                initialized = false;
742                break;
743            };
744
745            let result = self.inner.borrow_mut().accounts.update_positions(
746                account,
747                instrument.clone(),
748                self.cache
749                    .borrow()
750                    .positions_open(None, Some(&instrument_id), None, None),
751                self.clock.borrow().timestamp_ns(),
752            );
753
754            match result {
755                Some((updated_account, _)) => {
756                    cache
757                        .add_account(AccountAny::Margin(updated_account)) // Temp Fix to update the mutated account
758                        .unwrap();
759                }
760                None => {
761                    initialized = false;
762                }
763            }
764        }
765
766        let open_count = all_positions_open.len();
767        self.inner.borrow_mut().initialized = initialized;
768        log::info!(
769            "Initialized {} open position{}",
770            open_count,
771            if open_count == 1 { "" } else { "s" }
772        );
773    }
774
775    /// Updates portfolio calculations based on a new quote tick.
776    ///
777    /// Recalculates unrealized PnL for positions affected by the quote update.
778    pub fn update_quote_tick(&mut self, quote: &QuoteTick) {
779        update_quote_tick(
780            self.cache.clone(),
781            self.clock.clone(),
782            self.inner.clone(),
783            quote,
784        );
785    }
786
787    /// Updates portfolio calculations based on a new bar.
788    ///
789    /// Updates cached bar close prices and recalculates unrealized PnL.
790    pub fn update_bar(&mut self, bar: &Bar) {
791        update_bar(
792            self.cache.clone(),
793            self.clock.clone(),
794            self.inner.clone(),
795            bar,
796        );
797    }
798
799    /// Updates portfolio with a new account state event.
800    pub fn update_account(&mut self, event: &AccountState) {
801        update_account(self.cache.clone(), event);
802    }
803
804    /// Updates portfolio calculations based on an order event.
805    ///
806    /// Handles balance updates for order fills and margin calculations for order changes.
807    pub fn update_order(&mut self, event: &OrderEventAny) {
808        update_order(
809            self.cache.clone(),
810            self.clock.clone(),
811            self.inner.clone(),
812            event,
813        );
814    }
815
816    /// Updates portfolio calculations based on a position event.
817    ///
818    /// Recalculates net positions, unrealized PnL, and margin requirements.
819    pub fn update_position(&mut self, event: &PositionEvent) {
820        update_position(
821            self.cache.clone(),
822            self.clock.clone(),
823            self.inner.clone(),
824            event,
825        );
826    }
827
828    // -- INTERNAL --------------------------------------------------------------------------------
829
830    fn update_net_position(&mut self, instrument_id: &InstrumentId, positions_open: Vec<Position>) {
831        let mut net_position = Decimal::ZERO;
832
833        for open_position in positions_open {
834            log::debug!("open_position: {open_position}");
835            net_position += Decimal::from_f64(open_position.signed_qty).unwrap_or(Decimal::ZERO);
836        }
837
838        let existing_position = self.net_position(instrument_id);
839        if existing_position != net_position {
840            self.inner
841                .borrow_mut()
842                .net_positions
843                .insert(*instrument_id, net_position);
844            log::info!("{instrument_id} net_position={net_position}");
845        }
846    }
847
848    fn calculate_unrealized_pnl(&mut self, instrument_id: &InstrumentId) -> Option<Money> {
849        let cache = self.cache.borrow();
850        let account = if let Some(account) = cache.account_for_venue(&instrument_id.venue) {
851            account
852        } else {
853            log::error!(
854                "Cannot calculate unrealized PnL: no account registered for {}",
855                instrument_id.venue
856            );
857            return None;
858        };
859
860        let instrument = if let Some(instrument) = cache.instrument(instrument_id) {
861            instrument
862        } else {
863            log::error!("Cannot calculate unrealized PnL: no instrument for {instrument_id}");
864            return None;
865        };
866
867        let currency = account
868            .base_currency()
869            .unwrap_or_else(|| instrument.settlement_currency());
870
871        let positions_open = cache.positions_open(
872            None, // Faster query filtering
873            Some(instrument_id),
874            None,
875            None,
876        );
877
878        if positions_open.is_empty() {
879            return Some(Money::new(0.0, currency));
880        }
881
882        let mut total_pnl = 0.0;
883
884        for position in positions_open {
885            if position.instrument_id != *instrument_id {
886                continue; // Nothing to calculate
887            }
888
889            if position.side == PositionSide::Flat {
890                continue; // Nothing to calculate
891            }
892
893            let price = if let Some(price) = self.get_price(position) {
894                price
895            } else {
896                log::debug!("Cannot calculate unrealized PnL: no prices for {instrument_id}");
897                self.inner.borrow_mut().pending_calcs.insert(*instrument_id);
898                return None; // Cannot calculate
899            };
900
901            let mut pnl = position.unrealized_pnl(price).as_f64();
902
903            if let Some(base_currency) = account.base_currency() {
904                let xrate = if let Some(xrate) =
905                    self.calculate_xrate_to_base(instrument, account, position.entry)
906                {
907                    xrate
908                } else {
909                    log::error!(
910                        // TODO: Improve logging
911                        "Cannot calculate unrealized PnL: insufficient data for {}/{}",
912                        instrument.settlement_currency(),
913                        base_currency
914                    );
915                    self.inner.borrow_mut().pending_calcs.insert(*instrument_id);
916                    return None; // Cannot calculate
917                };
918
919                let scale = 10f64.powi(currency.precision.into());
920                pnl = ((pnl * xrate) * scale).round() / scale;
921            }
922
923            total_pnl += pnl;
924        }
925
926        Some(Money::new(total_pnl, currency))
927    }
928
929    fn calculate_realized_pnl(&mut self, instrument_id: &InstrumentId) -> Option<Money> {
930        let cache = self.cache.borrow();
931        let account = if let Some(account) = cache.account_for_venue(&instrument_id.venue) {
932            account
933        } else {
934            log::error!(
935                "Cannot calculate realized PnL: no account registered for {}",
936                instrument_id.venue
937            );
938            return None;
939        };
940
941        let instrument = if let Some(instrument) = cache.instrument(instrument_id) {
942            instrument
943        } else {
944            log::error!("Cannot calculate realized PnL: no instrument for {instrument_id}");
945            return None;
946        };
947
948        let currency = account
949            .base_currency()
950            .unwrap_or_else(|| instrument.settlement_currency());
951
952        let positions = cache.positions(
953            None, // Faster query filtering
954            Some(instrument_id),
955            None,
956            None,
957        );
958
959        if positions.is_empty() {
960            return Some(Money::new(0.0, currency));
961        }
962
963        let mut total_pnl = 0.0;
964
965        for position in positions {
966            if position.instrument_id != *instrument_id {
967                continue; // Nothing to calculate
968            }
969
970            if position.realized_pnl.is_none() {
971                continue; // Nothing to calculate
972            }
973
974            let mut pnl = position.realized_pnl?.as_f64();
975
976            if let Some(base_currency) = account.base_currency() {
977                let xrate = if let Some(xrate) =
978                    self.calculate_xrate_to_base(instrument, account, position.entry)
979                {
980                    xrate
981                } else {
982                    log::error!(
983                        // TODO: Improve logging
984                        "Cannot calculate realized PnL: insufficient data for {}/{}",
985                        instrument.settlement_currency(),
986                        base_currency
987                    );
988                    self.inner.borrow_mut().pending_calcs.insert(*instrument_id);
989                    return None; // Cannot calculate
990                };
991
992                let scale = 10f64.powi(currency.precision.into());
993                pnl = ((pnl * xrate) * scale).round() / scale;
994            }
995
996            total_pnl += pnl;
997        }
998
999        Some(Money::new(total_pnl, currency))
1000    }
1001
1002    fn get_price(&self, position: &Position) -> Option<Price> {
1003        let price_type = match position.side {
1004            PositionSide::Long => PriceType::Bid,
1005            PositionSide::Short => PriceType::Ask,
1006            _ => panic!("invalid `PositionSide`, was {}", position.side),
1007        };
1008
1009        let cache = self.cache.borrow();
1010
1011        let instrument_id = &position.instrument_id;
1012        cache
1013            .price(instrument_id, price_type)
1014            .or_else(|| cache.price(instrument_id, PriceType::Last))
1015            .or_else(|| {
1016                self.inner
1017                    .borrow()
1018                    .bar_close_prices
1019                    .get(instrument_id)
1020                    .copied()
1021            })
1022    }
1023
1024    fn calculate_xrate_to_base(
1025        &self,
1026        instrument: &InstrumentAny,
1027        account: &AccountAny,
1028        side: OrderSide,
1029    ) -> Option<f64> {
1030        if !self.config.convert_to_account_base_currency {
1031            return Some(1.0); // No conversion needed
1032        }
1033
1034        match account.base_currency() {
1035            None => Some(1.0), // No conversion needed
1036            Some(base_currency) => {
1037                let cache = self.cache.borrow();
1038
1039                if self.config.use_mark_xrates {
1040                    return cache.get_mark_xrate(instrument.settlement_currency(), base_currency);
1041                }
1042
1043                let price_type = if side == OrderSide::Buy {
1044                    PriceType::Bid
1045                } else {
1046                    PriceType::Ask
1047                };
1048
1049                cache.get_xrate(
1050                    instrument.id().venue,
1051                    instrument.settlement_currency(),
1052                    base_currency,
1053                    price_type,
1054                )
1055            }
1056        }
1057    }
1058}
1059
1060// Helper functions
1061fn update_quote_tick(
1062    cache: Rc<RefCell<Cache>>,
1063    clock: Rc<RefCell<dyn Clock>>,
1064    inner: Rc<RefCell<PortfolioState>>,
1065    quote: &QuoteTick,
1066) {
1067    update_instrument_id(cache, clock.clone(), inner, &quote.instrument_id);
1068}
1069
1070fn update_bar(
1071    cache: Rc<RefCell<Cache>>,
1072    clock: Rc<RefCell<dyn Clock>>,
1073    inner: Rc<RefCell<PortfolioState>>,
1074    bar: &Bar,
1075) {
1076    let instrument_id = bar.bar_type.instrument_id();
1077    inner
1078        .borrow_mut()
1079        .bar_close_prices
1080        .insert(instrument_id, bar.close);
1081    update_instrument_id(cache, clock.clone(), inner, &instrument_id);
1082}
1083
1084fn update_instrument_id(
1085    cache: Rc<RefCell<Cache>>,
1086    clock: Rc<RefCell<dyn Clock>>,
1087    inner: Rc<RefCell<PortfolioState>>,
1088    instrument_id: &InstrumentId,
1089) {
1090    inner.borrow_mut().unrealized_pnls.remove(instrument_id);
1091
1092    if inner.borrow().initialized || !inner.borrow().pending_calcs.contains(instrument_id) {
1093        return;
1094    }
1095
1096    let result_init;
1097    let mut result_maint = None;
1098
1099    let account = {
1100        let cache_ref = cache.borrow();
1101        let account = if let Some(account) = cache_ref.account_for_venue(&instrument_id.venue) {
1102            account
1103        } else {
1104            log::error!(
1105                "Cannot update tick: no account registered for {}",
1106                instrument_id.venue
1107            );
1108            return;
1109        };
1110
1111        let mut cache_ref = cache.borrow_mut();
1112        let instrument = if let Some(instrument) = cache_ref.instrument(instrument_id) {
1113            instrument.clone()
1114        } else {
1115            log::error!("Cannot update tick: no instrument found for {instrument_id}");
1116            return;
1117        };
1118
1119        // Clone the orders and positions to own the data
1120        let orders_open: Vec<OrderAny> = cache_ref
1121            .orders_open(None, Some(instrument_id), None, None)
1122            .iter()
1123            .map(|o| (*o).clone())
1124            .collect();
1125
1126        let positions_open: Vec<Position> = cache_ref
1127            .positions_open(None, Some(instrument_id), None, None)
1128            .iter()
1129            .map(|p| (*p).clone())
1130            .collect();
1131
1132        result_init = inner.borrow().accounts.update_orders(
1133            account,
1134            instrument.clone(),
1135            orders_open.iter().collect(),
1136            clock.borrow().timestamp_ns(),
1137        );
1138
1139        if let AccountAny::Margin(margin_account) = account {
1140            result_maint = inner.borrow().accounts.update_positions(
1141                margin_account,
1142                instrument,
1143                positions_open.iter().collect(),
1144                clock.borrow().timestamp_ns(),
1145            );
1146        }
1147
1148        if let Some((ref updated_account, _)) = result_init {
1149            cache_ref.add_account(updated_account.clone()).unwrap(); // Temp Fix to update the mutated account
1150        }
1151        account.clone()
1152    };
1153
1154    let mut portfolio_clone = Portfolio {
1155        clock: clock.clone(),
1156        cache,
1157        inner: inner.clone(),
1158        config: PortfolioConfig::default(), // TODO: TBD
1159    };
1160
1161    let result_unrealized_pnl: Option<Money> =
1162        portfolio_clone.calculate_unrealized_pnl(instrument_id);
1163
1164    if result_init.is_some()
1165        && (matches!(account, AccountAny::Cash(_))
1166            || (result_maint.is_some() && result_unrealized_pnl.is_some()))
1167    {
1168        inner.borrow_mut().pending_calcs.remove(instrument_id);
1169        if inner.borrow().pending_calcs.is_empty() {
1170            inner.borrow_mut().initialized = true;
1171        }
1172    }
1173}
1174
1175fn update_order(
1176    cache: Rc<RefCell<Cache>>,
1177    clock: Rc<RefCell<dyn Clock>>,
1178    inner: Rc<RefCell<PortfolioState>>,
1179    event: &OrderEventAny,
1180) {
1181    let cache_ref = cache.borrow();
1182    let account_id = match event.account_id() {
1183        Some(account_id) => account_id,
1184        None => {
1185            return; // No Account Assigned
1186        }
1187    };
1188
1189    let account = if let Some(account) = cache_ref.account(&account_id) {
1190        account
1191    } else {
1192        log::error!("Cannot update order: no account registered for {account_id}");
1193        return;
1194    };
1195
1196    match account {
1197        AccountAny::Cash(cash_account) => {
1198            if !cash_account.base.calculate_account_state {
1199                return;
1200            }
1201        }
1202        AccountAny::Margin(margin_account) => {
1203            if !margin_account.base.calculate_account_state {
1204                return;
1205            }
1206        }
1207    }
1208
1209    match event {
1210        OrderEventAny::Accepted(_)
1211        | OrderEventAny::Canceled(_)
1212        | OrderEventAny::Rejected(_)
1213        | OrderEventAny::Updated(_)
1214        | OrderEventAny::Filled(_) => {}
1215        _ => {
1216            return;
1217        }
1218    }
1219
1220    let cache_ref = cache.borrow();
1221    let order = if let Some(order) = cache_ref.order(&event.client_order_id()) {
1222        order
1223    } else {
1224        log::error!(
1225            "Cannot update order: {} not found in the cache",
1226            event.client_order_id()
1227        );
1228        return; // No Order Found
1229    };
1230
1231    if matches!(event, OrderEventAny::Rejected(_)) && order.order_type() != OrderType::StopLimit {
1232        return; // No change to account state
1233    }
1234
1235    let instrument = if let Some(instrument_id) = cache_ref.instrument(&event.instrument_id()) {
1236        instrument_id
1237    } else {
1238        log::error!(
1239            "Cannot update order: no instrument found for {}",
1240            event.instrument_id()
1241        );
1242        return;
1243    };
1244
1245    if let OrderEventAny::Filled(order_filled) = event {
1246        let _ = inner.borrow().accounts.update_balances(
1247            account.clone(),
1248            instrument.clone(),
1249            *order_filled,
1250        );
1251
1252        let mut portfolio_clone = Portfolio {
1253            clock: clock.clone(),
1254            cache: cache.clone(),
1255            inner: inner.clone(),
1256            config: PortfolioConfig::default(), // TODO: TBD
1257        };
1258
1259        match portfolio_clone.calculate_unrealized_pnl(&order_filled.instrument_id) {
1260            Some(unrealized_pnl) => {
1261                inner
1262                    .borrow_mut()
1263                    .unrealized_pnls
1264                    .insert(event.instrument_id(), unrealized_pnl);
1265            }
1266            None => {
1267                log::error!(
1268                    "Failed to calculate unrealized PnL for instrument {}",
1269                    event.instrument_id()
1270                );
1271            }
1272        }
1273    }
1274
1275    let orders_open = cache_ref.orders_open(None, Some(&event.instrument_id()), None, None);
1276
1277    let account_state = inner.borrow_mut().accounts.update_orders(
1278        account,
1279        instrument.clone(),
1280        orders_open,
1281        clock.borrow().timestamp_ns(),
1282    );
1283
1284    let mut cache_ref = cache.borrow_mut();
1285    cache_ref.update_account(account.clone()).unwrap();
1286
1287    if let Some(account_state) = account_state {
1288        msgbus::publish(
1289            format!("events.account.{}", account.id()).into(),
1290            &account_state,
1291        );
1292    } else {
1293        log::debug!("Added pending calculation for {}", instrument.id());
1294        inner.borrow_mut().pending_calcs.insert(instrument.id());
1295    }
1296
1297    log::debug!("Updated {event}");
1298}
1299
1300fn update_position(
1301    cache: Rc<RefCell<Cache>>,
1302    clock: Rc<RefCell<dyn Clock>>,
1303    inner: Rc<RefCell<PortfolioState>>,
1304    event: &PositionEvent,
1305) {
1306    let instrument_id = event.instrument_id();
1307
1308    let positions_open: Vec<Position> = {
1309        let cache_ref = cache.borrow();
1310
1311        cache_ref
1312            .positions_open(None, Some(&instrument_id), None, None)
1313            .iter()
1314            .map(|o| (*o).clone())
1315            .collect()
1316    };
1317
1318    log::debug!("postion fresh from cache -> {positions_open:?}");
1319
1320    let mut portfolio_clone = Portfolio {
1321        clock: clock.clone(),
1322        cache: cache.clone(),
1323        inner: inner.clone(),
1324        config: PortfolioConfig::default(), // TODO: TBD
1325    };
1326
1327    portfolio_clone.update_net_position(&instrument_id, positions_open.clone());
1328
1329    let calculated_unrealized_pnl = portfolio_clone
1330        .calculate_unrealized_pnl(&instrument_id)
1331        .expect("Failed to calculate unrealized PnL");
1332    let calculated_realized_pnl = portfolio_clone
1333        .calculate_realized_pnl(&instrument_id)
1334        .expect("Failed to calculate realized PnL");
1335
1336    inner
1337        .borrow_mut()
1338        .unrealized_pnls
1339        .insert(event.instrument_id(), calculated_unrealized_pnl);
1340    inner
1341        .borrow_mut()
1342        .realized_pnls
1343        .insert(event.instrument_id(), calculated_realized_pnl);
1344
1345    let cache_ref = cache.borrow();
1346    let account = cache_ref.account(&event.account_id());
1347
1348    if let Some(AccountAny::Margin(margin_account)) = account {
1349        if !margin_account.calculate_account_state {
1350            return; // Nothing to calculate
1351        }
1352
1353        let cache_ref = cache.borrow();
1354        let instrument = if let Some(instrument) = cache_ref.instrument(&instrument_id) {
1355            instrument
1356        } else {
1357            log::error!("Cannot update position: no instrument found for {instrument_id}");
1358            return;
1359        };
1360
1361        let result = inner.borrow_mut().accounts.update_positions(
1362            margin_account,
1363            instrument.clone(),
1364            positions_open.iter().collect(),
1365            clock.borrow().timestamp_ns(),
1366        );
1367        let mut cache_ref = cache.borrow_mut();
1368        if let Some((margin_account, _)) = result {
1369            cache_ref
1370                .add_account(AccountAny::Margin(margin_account)) // Temp Fix to update the mutated account
1371                .unwrap();
1372        }
1373    } else if account.is_none() {
1374        log::error!(
1375            "Cannot update position: no account registered for {}",
1376            event.account_id()
1377        );
1378    }
1379}
1380
1381pub fn update_account(cache: Rc<RefCell<Cache>>, event: &AccountState) {
1382    let mut cache_ref = cache.borrow_mut();
1383
1384    if let Some(existing) = cache_ref.account(&event.account_id) {
1385        let mut account = existing.clone();
1386        account.apply(event.clone());
1387
1388        if let Err(e) = cache_ref.update_account(account.clone()) {
1389            log::error!("Failed to update account: {e}");
1390            return;
1391        }
1392    } else {
1393        let account = match AccountAny::from_events(vec![event.clone()]) {
1394            Ok(account) => account,
1395            Err(e) => {
1396                log::error!("Failed to create account: {e}");
1397                return;
1398            }
1399        };
1400
1401        if let Err(e) = cache_ref.add_account(account) {
1402            log::error!("Failed to add account: {e}");
1403            return;
1404        }
1405    }
1406
1407    log::info!("Updated {event}");
1408}