1#![allow(dead_code)] use std::{
21 cell::RefCell,
22 collections::{HashMap, HashSet},
23 fmt::Debug,
24 rc::Rc,
25};
26
27use nautilus_analysis::analyzer::PortfolioAnalyzer;
28use nautilus_common::{
29 cache::Cache,
30 clock::Clock,
31 msgbus::{
32 self,
33 handler::{ShareableMessageHandler, TypedMessageHandler},
34 },
35};
36use nautilus_model::{
37 accounts::AccountAny,
38 data::{Bar, QuoteTick},
39 enums::{OrderSide, OrderType, PositionSide, PriceType},
40 events::{AccountState, OrderEventAny, position::PositionEvent},
41 identifiers::{InstrumentId, Venue},
42 instruments::{Instrument, InstrumentAny},
43 orders::{Order, OrderAny},
44 position::Position,
45 types::{Currency, Money, Price},
46};
47use rust_decimal::{Decimal, prelude::FromPrimitive};
48
49use crate::{config::PortfolioConfig, manager::AccountsManager};
50
51struct PortfolioState {
52 accounts: AccountsManager,
53 analyzer: PortfolioAnalyzer,
54 unrealized_pnls: HashMap<InstrumentId, Money>,
55 realized_pnls: HashMap<InstrumentId, Money>,
56 net_positions: HashMap<InstrumentId, Decimal>,
57 pending_calcs: HashSet<InstrumentId>,
58 bar_close_prices: HashMap<InstrumentId, Price>,
59 initialized: bool,
60}
61
62impl PortfolioState {
63 fn new(clock: Rc<RefCell<dyn Clock>>, cache: Rc<RefCell<Cache>>) -> Self {
64 Self {
65 accounts: AccountsManager::new(clock, cache),
66 analyzer: PortfolioAnalyzer::default(),
67 unrealized_pnls: HashMap::new(),
68 realized_pnls: HashMap::new(),
69 net_positions: HashMap::new(),
70 pending_calcs: HashSet::new(),
71 bar_close_prices: HashMap::new(),
72 initialized: false,
73 }
74 }
75
76 fn reset(&mut self) {
77 log::debug!("RESETTING");
78 self.net_positions.clear();
79 self.unrealized_pnls.clear();
80 self.realized_pnls.clear();
81 self.pending_calcs.clear();
82 self.analyzer.reset();
83 log::debug!("READY");
84 }
85}
86
87pub struct Portfolio {
88 pub(crate) clock: Rc<RefCell<dyn Clock>>,
89 pub(crate) cache: Rc<RefCell<Cache>>,
90 inner: Rc<RefCell<PortfolioState>>,
91 config: PortfolioConfig,
92}
93
94impl Debug for Portfolio {
95 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
96 f.debug_struct(stringify!(Portfolio)).finish()
97 }
98}
99
100impl Portfolio {
101 pub fn new(
102 cache: Rc<RefCell<Cache>>,
103 clock: Rc<RefCell<dyn Clock>>,
104 config: Option<PortfolioConfig>,
105 ) -> Self {
106 let inner = Rc::new(RefCell::new(PortfolioState::new(
107 clock.clone(),
108 cache.clone(),
109 )));
110 let config = config.unwrap_or_default();
111
112 Self::register_message_handlers(
113 cache.clone(),
114 clock.clone(),
115 inner.clone(),
116 config.bar_updates,
117 );
118
119 Self {
120 clock,
121 cache,
122 inner,
123 config,
124 }
125 }
126
127 fn register_message_handlers(
128 cache: Rc<RefCell<Cache>>,
129 clock: Rc<RefCell<dyn Clock>>,
130 inner: Rc<RefCell<PortfolioState>>,
131 bar_updates: bool,
132 ) {
133 let update_account_handler = {
134 let cache = cache.clone();
135 ShareableMessageHandler(Rc::new(TypedMessageHandler::from(
136 move |event: &AccountState| {
137 update_account(cache.clone(), event);
138 },
139 )))
140 };
141
142 let update_position_handler = {
143 let cache = cache.clone();
144 let clock = clock.clone();
145 let inner = inner.clone();
146 ShareableMessageHandler(Rc::new(TypedMessageHandler::from(
147 move |event: &PositionEvent| {
148 update_position(cache.clone(), clock.clone(), inner.clone(), event);
149 },
150 )))
151 };
152
153 let update_quote_handler = {
154 let cache = cache.clone();
155 let clock = clock.clone();
156 let inner = inner.clone();
157 ShareableMessageHandler(Rc::new(TypedMessageHandler::from(
158 move |quote: &QuoteTick| {
159 update_quote_tick(cache.clone(), clock.clone(), inner.clone(), quote);
160 },
161 )))
162 };
163
164 let update_bar_handler = {
165 let cache = cache.clone();
166 let clock = clock.clone();
167 let inner = inner.clone();
168 ShareableMessageHandler(Rc::new(TypedMessageHandler::from(move |bar: &Bar| {
169 update_bar(cache.clone(), clock.clone(), inner.clone(), bar);
170 })))
171 };
172
173 let update_order_handler = {
174 let cache = cache;
175 let clock = clock.clone();
176 let inner = inner;
177 ShareableMessageHandler(Rc::new(TypedMessageHandler::from(
178 move |event: &OrderEventAny| {
179 update_order(cache.clone(), clock.clone(), inner.clone(), event);
180 },
181 )))
182 };
183
184 msgbus::register(
185 "Portfolio.update_account".into(),
186 update_account_handler.clone(),
187 );
188
189 msgbus::subscribe("data.quotes.*".into(), update_quote_handler, Some(10));
190 if bar_updates {
191 msgbus::subscribe("data.quotes.*EXTERNAL".into(), update_bar_handler, Some(10));
192 }
193 msgbus::subscribe("events.order.*".into(), update_order_handler, Some(10));
194 msgbus::subscribe(
195 "events.position.*".into(),
196 update_position_handler,
197 Some(10),
198 );
199 msgbus::subscribe("events.account.*".into(), update_account_handler, Some(10));
200 }
201
202 pub fn reset(&mut self) {
203 log::debug!("RESETTING");
204 self.inner.borrow_mut().reset();
205 log::debug!("READY");
206 }
207
208 #[must_use]
211 pub fn is_initialized(&self) -> bool {
212 self.inner.borrow().initialized
213 }
214
215 #[must_use]
216 pub fn balances_locked(&self, venue: &Venue) -> HashMap<Currency, Money> {
217 self.cache.borrow().account_for_venue(venue).map_or_else(
218 || {
219 log::error!("Cannot get balances locked: no account generated for {venue}");
220 HashMap::new()
221 },
222 AccountAny::balances_locked,
223 )
224 }
225
226 #[must_use]
227 pub fn margins_init(&self, venue: &Venue) -> HashMap<InstrumentId, Money> {
228 self.cache.borrow().account_for_venue(venue).map_or_else(
229 || {
230 log::error!(
231 "Cannot get initial (order) margins: no account registered for {venue}"
232 );
233 HashMap::new()
234 },
235 |account| match account {
236 AccountAny::Margin(margin_account) => margin_account.initial_margins(),
237 AccountAny::Cash(_) => {
238 log::warn!("Initial margins not applicable for cash account");
239 HashMap::new()
240 }
241 },
242 )
243 }
244
245 #[must_use]
246 pub fn margins_maint(&self, venue: &Venue) -> HashMap<InstrumentId, Money> {
247 self.cache.borrow().account_for_venue(venue).map_or_else(
248 || {
249 log::error!(
250 "Cannot get maintenance (position) margins: no account registered for {venue}"
251 );
252 HashMap::new()
253 },
254 |account| match account {
255 AccountAny::Margin(margin_account) => margin_account.maintenance_margins(),
256 AccountAny::Cash(_) => {
257 log::warn!("Maintenance margins not applicable for cash account");
258 HashMap::new()
259 }
260 },
261 )
262 }
263
264 #[must_use]
265 pub fn unrealized_pnls(&mut self, venue: &Venue) -> HashMap<Currency, Money> {
266 let instrument_ids = {
267 let cache = self.cache.borrow();
268 let positions = cache.positions(Some(venue), None, None, None);
269
270 if positions.is_empty() {
271 return HashMap::new(); }
273
274 let instrument_ids: HashSet<InstrumentId> =
275 positions.iter().map(|p| p.instrument_id).collect();
276
277 instrument_ids
278 };
279
280 let mut unrealized_pnls: HashMap<Currency, f64> = HashMap::new();
281
282 for instrument_id in instrument_ids {
283 if let Some(&pnl) = self.inner.borrow_mut().unrealized_pnls.get(&instrument_id) {
284 *unrealized_pnls.entry(pnl.currency).or_insert(0.0) += pnl.as_f64();
286 continue;
287 }
288
289 match self.calculate_unrealized_pnl(&instrument_id) {
291 Some(pnl) => *unrealized_pnls.entry(pnl.currency).or_insert(0.0) += pnl.as_f64(),
292 None => continue,
293 }
294 }
295
296 unrealized_pnls
297 .into_iter()
298 .map(|(currency, amount)| (currency, Money::new(amount, currency)))
299 .collect()
300 }
301
302 #[must_use]
303 pub fn realized_pnls(&mut self, venue: &Venue) -> HashMap<Currency, Money> {
304 let instrument_ids = {
305 let cache = self.cache.borrow();
306 let positions = cache.positions(Some(venue), None, None, None);
307
308 if positions.is_empty() {
309 return HashMap::new(); }
311
312 let instrument_ids: HashSet<InstrumentId> =
313 positions.iter().map(|p| p.instrument_id).collect();
314
315 instrument_ids
316 };
317
318 let mut realized_pnls: HashMap<Currency, f64> = HashMap::new();
319
320 for instrument_id in instrument_ids {
321 if let Some(&pnl) = self.inner.borrow_mut().realized_pnls.get(&instrument_id) {
322 *realized_pnls.entry(pnl.currency).or_insert(0.0) += pnl.as_f64();
324 continue;
325 }
326
327 match self.calculate_realized_pnl(&instrument_id) {
329 Some(pnl) => *realized_pnls.entry(pnl.currency).or_insert(0.0) += pnl.as_f64(),
330 None => continue,
331 }
332 }
333
334 realized_pnls
335 .into_iter()
336 .map(|(currency, amount)| (currency, Money::new(amount, currency)))
337 .collect()
338 }
339
340 #[must_use]
341 pub fn net_exposures(&self, venue: &Venue) -> Option<HashMap<Currency, Money>> {
342 let cache = self.cache.borrow();
343 let account = if let Some(account) = cache.account_for_venue(venue) {
344 account
345 } else {
346 log::error!("Cannot calculate net exposures: no account registered for {venue}");
347 return None; };
349
350 let positions_open = cache.positions_open(Some(venue), None, None, None);
351 if positions_open.is_empty() {
352 return Some(HashMap::new()); }
354
355 let mut net_exposures: HashMap<Currency, f64> = HashMap::new();
356
357 for position in positions_open {
358 let instrument = if let Some(instrument) = cache.instrument(&position.instrument_id) {
359 instrument
360 } else {
361 log::error!(
362 "Cannot calculate net exposures: no instrument for {}",
363 position.instrument_id
364 );
365 return None; };
367
368 if position.side == PositionSide::Flat {
369 log::error!(
370 "Cannot calculate net exposures: position is flat for {}",
371 position.instrument_id
372 );
373 continue; }
375
376 let price = self.get_price(position)?;
377 let xrate = if let Some(xrate) =
378 self.calculate_xrate_to_base(instrument, account, position.entry)
379 {
380 xrate
381 } else {
382 log::error!(
383 "Cannot calculate net exposures: insufficient data for {}/{:?}",
385 instrument.settlement_currency(),
386 account.base_currency()
387 );
388 return None; };
390
391 let settlement_currency = account
392 .base_currency()
393 .unwrap_or_else(|| instrument.settlement_currency());
394
395 let net_exposure = instrument
396 .calculate_notional_value(position.quantity, price, None)
397 .as_f64()
398 * xrate;
399
400 let net_exposure = (net_exposure * 10f64.powi(settlement_currency.precision.into()))
401 .round()
402 / 10f64.powi(settlement_currency.precision.into());
403
404 *net_exposures.entry(settlement_currency).or_insert(0.0) += net_exposure;
405 }
406
407 Some(
408 net_exposures
409 .into_iter()
410 .map(|(currency, amount)| (currency, Money::new(amount, currency)))
411 .collect(),
412 )
413 }
414
415 #[must_use]
416 pub fn unrealized_pnl(&mut self, instrument_id: &InstrumentId) -> Option<Money> {
417 if let Some(pnl) = self
418 .inner
419 .borrow()
420 .unrealized_pnls
421 .get(instrument_id)
422 .copied()
423 {
424 return Some(pnl);
425 }
426
427 let pnl = self.calculate_unrealized_pnl(instrument_id)?;
428 self.inner
429 .borrow_mut()
430 .unrealized_pnls
431 .insert(*instrument_id, pnl);
432 Some(pnl)
433 }
434
435 #[must_use]
436 pub fn realized_pnl(&mut self, instrument_id: &InstrumentId) -> Option<Money> {
437 if let Some(pnl) = self
438 .inner
439 .borrow()
440 .realized_pnls
441 .get(instrument_id)
442 .copied()
443 {
444 return Some(pnl);
445 }
446
447 let pnl = self.calculate_realized_pnl(instrument_id)?;
448 self.inner
449 .borrow_mut()
450 .realized_pnls
451 .insert(*instrument_id, pnl);
452 Some(pnl)
453 }
454
455 #[must_use]
456 pub fn net_exposure(&self, instrument_id: &InstrumentId) -> Option<Money> {
457 let cache = self.cache.borrow();
458 let account = if let Some(account) = cache.account_for_venue(&instrument_id.venue) {
459 account
460 } else {
461 log::error!(
462 "Cannot calculate net exposure: no account registered for {}",
463 instrument_id.venue
464 );
465 return None;
466 };
467
468 let instrument = if let Some(instrument) = cache.instrument(instrument_id) {
469 instrument
470 } else {
471 log::error!("Cannot calculate net exposure: no instrument for {instrument_id}");
472 return None;
473 };
474
475 let positions_open = cache.positions_open(
476 None, Some(instrument_id),
478 None,
479 None,
480 );
481
482 if positions_open.is_empty() {
483 return Some(Money::new(0.0, instrument.settlement_currency()));
484 }
485
486 let mut net_exposure = 0.0;
487
488 for position in positions_open {
489 let price = self.get_price(position)?;
490 let xrate = if let Some(xrate) =
491 self.calculate_xrate_to_base(instrument, account, position.entry)
492 {
493 xrate
494 } else {
495 log::error!(
496 "Cannot calculate net exposures: insufficient data for {}/{:?}",
498 instrument.settlement_currency(),
499 account.base_currency()
500 );
501 return None; };
503
504 let notional_value =
505 instrument.calculate_notional_value(position.quantity, price, None);
506 net_exposure += notional_value.as_f64() * xrate;
507 }
508
509 let settlement_currency = account
510 .base_currency()
511 .unwrap_or_else(|| instrument.settlement_currency());
512
513 Some(Money::new(net_exposure, settlement_currency))
514 }
515
516 #[must_use]
517 pub fn net_position(&self, instrument_id: &InstrumentId) -> Decimal {
518 self.inner
519 .borrow()
520 .net_positions
521 .get(instrument_id)
522 .copied()
523 .unwrap_or(Decimal::ZERO)
524 }
525
526 #[must_use]
527 pub fn is_net_long(&self, instrument_id: &InstrumentId) -> bool {
528 self.inner
529 .borrow()
530 .net_positions
531 .get(instrument_id)
532 .copied()
533 .map_or_else(|| false, |net_position| net_position > Decimal::ZERO)
534 }
535
536 #[must_use]
537 pub fn is_net_short(&self, instrument_id: &InstrumentId) -> bool {
538 self.inner
539 .borrow()
540 .net_positions
541 .get(instrument_id)
542 .copied()
543 .map_or_else(|| false, |net_position| net_position < Decimal::ZERO)
544 }
545
546 #[must_use]
547 pub fn is_flat(&self, instrument_id: &InstrumentId) -> bool {
548 self.inner
549 .borrow()
550 .net_positions
551 .get(instrument_id)
552 .copied()
553 .map_or_else(|| true, |net_position| net_position == Decimal::ZERO)
554 }
555
556 #[must_use]
557 pub fn is_completely_flat(&self) -> bool {
558 for net_position in self.inner.borrow().net_positions.values() {
559 if *net_position != Decimal::ZERO {
560 return false;
561 }
562 }
563 true
564 }
565
566 pub fn initialize_orders(&mut self) {
574 let mut initialized = true;
575 let orders_and_instruments = {
576 let cache = self.cache.borrow();
577 let all_orders_open = cache.orders_open(None, None, None, None);
578
579 let mut instruments_with_orders = Vec::new();
580 let mut instruments = HashSet::new();
581
582 for order in &all_orders_open {
583 instruments.insert(order.instrument_id());
584 }
585
586 for instrument_id in instruments {
587 if let Some(instrument) = cache.instrument(&instrument_id) {
588 let orders = cache
589 .orders_open(None, Some(&instrument_id), None, None)
590 .into_iter()
591 .cloned()
592 .collect::<Vec<OrderAny>>();
593 instruments_with_orders.push((instrument.clone(), orders));
594 } else {
595 log::error!(
596 "Cannot update initial (order) margin: no instrument found for {instrument_id}"
597 );
598 initialized = false;
599 break;
600 }
601 }
602 instruments_with_orders
603 };
604
605 for (instrument, orders_open) in &orders_and_instruments {
606 let mut cache = self.cache.borrow_mut();
607 let account = if let Some(account) = cache.account_for_venue(&instrument.id().venue) {
608 account
609 } else {
610 log::error!(
611 "Cannot update initial (order) margin: no account registered for {}",
612 instrument.id().venue
613 );
614 initialized = false;
615 break;
616 };
617
618 let result = self.inner.borrow_mut().accounts.update_orders(
619 account,
620 instrument.clone(),
621 orders_open.iter().collect(),
622 self.clock.borrow().timestamp_ns(),
623 );
624
625 match result {
626 Some((updated_account, _)) => {
627 cache.add_account(updated_account).unwrap(); }
629 None => {
630 initialized = false;
631 }
632 }
633 }
634
635 let total_orders = orders_and_instruments
636 .into_iter()
637 .map(|(_, orders)| orders.len())
638 .sum::<usize>();
639
640 log::info!(
641 "Initialized {} open order{}",
642 total_orders,
643 if total_orders == 1 { "" } else { "s" }
644 );
645
646 self.inner.borrow_mut().initialized = initialized;
647 }
648
649 pub fn initialize_positions(&mut self) {
655 self.inner.borrow_mut().unrealized_pnls.clear();
656 self.inner.borrow_mut().realized_pnls.clear();
657 let all_positions_open: Vec<Position>;
658 let mut instruments = HashSet::new();
659 {
660 let cache = self.cache.borrow();
661 all_positions_open = cache
662 .positions_open(None, None, None, None)
663 .into_iter()
664 .cloned()
665 .collect();
666 for position in &all_positions_open {
667 instruments.insert(position.instrument_id);
668 }
669 }
670
671 let mut initialized = true;
672
673 for instrument_id in instruments {
674 let positions_open: Vec<Position> = {
675 let cache = self.cache.borrow();
676 cache
677 .positions_open(None, Some(&instrument_id), None, None)
678 .into_iter()
679 .cloned()
680 .collect()
681 };
682
683 self.update_net_position(&instrument_id, positions_open);
684
685 let calculated_unrealized_pnl = self
686 .calculate_unrealized_pnl(&instrument_id)
687 .expect("Failed to calculate unrealized PnL");
688 let calculated_realized_pnl = self
689 .calculate_realized_pnl(&instrument_id)
690 .expect("Failed to calculate realized PnL");
691
692 self.inner
693 .borrow_mut()
694 .unrealized_pnls
695 .insert(instrument_id, calculated_unrealized_pnl);
696 self.inner
697 .borrow_mut()
698 .realized_pnls
699 .insert(instrument_id, calculated_realized_pnl);
700
701 let cache = self.cache.borrow();
702 let account = if let Some(account) = cache.account_for_venue(&instrument_id.venue) {
703 account
704 } else {
705 log::error!(
706 "Cannot update maintenance (position) margin: no account registered for {}",
707 instrument_id.venue
708 );
709 initialized = false;
710 break;
711 };
712
713 let account = match account {
714 AccountAny::Cash(_) => continue,
715 AccountAny::Margin(margin_account) => margin_account,
716 };
717
718 let mut cache = self.cache.borrow_mut();
719 let instrument = if let Some(instrument) = cache.instrument(&instrument_id) {
720 instrument
721 } else {
722 log::error!(
723 "Cannot update maintenance (position) margin: no instrument found for {instrument_id}"
724 );
725 initialized = false;
726 break;
727 };
728
729 let result = self.inner.borrow_mut().accounts.update_positions(
730 account,
731 instrument.clone(),
732 self.cache
733 .borrow()
734 .positions_open(None, Some(&instrument_id), None, None),
735 self.clock.borrow().timestamp_ns(),
736 );
737
738 match result {
739 Some((updated_account, _)) => {
740 cache
741 .add_account(AccountAny::Margin(updated_account)) .unwrap();
743 }
744 None => {
745 initialized = false;
746 }
747 }
748 }
749
750 let open_count = all_positions_open.len();
751 self.inner.borrow_mut().initialized = initialized;
752 log::info!(
753 "Initialized {} open position{}",
754 open_count,
755 if open_count == 1 { "" } else { "s" }
756 );
757 }
758
759 pub fn update_quote_tick(&mut self, quote: &QuoteTick) {
760 update_quote_tick(
761 self.cache.clone(),
762 self.clock.clone(),
763 self.inner.clone(),
764 quote,
765 );
766 }
767
768 pub fn update_bar(&mut self, bar: &Bar) {
769 update_bar(
770 self.cache.clone(),
771 self.clock.clone(),
772 self.inner.clone(),
773 bar,
774 );
775 }
776
777 pub fn update_account(&mut self, event: &AccountState) {
778 update_account(self.cache.clone(), event);
779 }
780
781 pub fn update_order(&mut self, event: &OrderEventAny) {
782 update_order(
783 self.cache.clone(),
784 self.clock.clone(),
785 self.inner.clone(),
786 event,
787 );
788 }
789
790 pub fn update_position(&mut self, event: &PositionEvent) {
791 update_position(
792 self.cache.clone(),
793 self.clock.clone(),
794 self.inner.clone(),
795 event,
796 );
797 }
798
799 fn update_net_position(&mut self, instrument_id: &InstrumentId, positions_open: Vec<Position>) {
802 let mut net_position = Decimal::ZERO;
803
804 for open_position in positions_open {
805 log::debug!("open_position: {open_position}");
806 net_position += Decimal::from_f64(open_position.signed_qty).unwrap_or(Decimal::ZERO);
807 }
808
809 let existing_position = self.net_position(instrument_id);
810 if existing_position != net_position {
811 self.inner
812 .borrow_mut()
813 .net_positions
814 .insert(*instrument_id, net_position);
815 log::info!("{instrument_id} net_position={net_position}");
816 }
817 }
818
819 fn calculate_unrealized_pnl(&mut self, instrument_id: &InstrumentId) -> Option<Money> {
820 let cache = self.cache.borrow();
821 let account = if let Some(account) = cache.account_for_venue(&instrument_id.venue) {
822 account
823 } else {
824 log::error!(
825 "Cannot calculate unrealized PnL: no account registered for {}",
826 instrument_id.venue
827 );
828 return None;
829 };
830
831 let instrument = if let Some(instrument) = cache.instrument(instrument_id) {
832 instrument
833 } else {
834 log::error!("Cannot calculate unrealized PnL: no instrument for {instrument_id}");
835 return None;
836 };
837
838 let currency = account
839 .base_currency()
840 .unwrap_or_else(|| instrument.settlement_currency());
841
842 let positions_open = cache.positions_open(
843 None, Some(instrument_id),
845 None,
846 None,
847 );
848
849 if positions_open.is_empty() {
850 return Some(Money::new(0.0, currency));
851 }
852
853 let mut total_pnl = 0.0;
854
855 for position in positions_open {
856 if position.instrument_id != *instrument_id {
857 continue; }
859
860 if position.side == PositionSide::Flat {
861 continue; }
863
864 let price = if let Some(price) = self.get_price(position) {
865 price
866 } else {
867 log::debug!("Cannot calculate unrealized PnL: no prices for {instrument_id}");
868 self.inner.borrow_mut().pending_calcs.insert(*instrument_id);
869 return None; };
871
872 let mut pnl = position.unrealized_pnl(price).as_f64();
873
874 if let Some(base_currency) = account.base_currency() {
875 let xrate = if let Some(xrate) =
876 self.calculate_xrate_to_base(instrument, account, position.entry)
877 {
878 xrate
879 } else {
880 log::error!(
881 "Cannot calculate unrealized PnL: insufficient data for {}/{}",
883 instrument.settlement_currency(),
884 base_currency
885 );
886 self.inner.borrow_mut().pending_calcs.insert(*instrument_id);
887 return None; };
889
890 let scale = 10f64.powi(currency.precision.into());
891 pnl = ((pnl * xrate) * scale).round() / scale;
892 }
893
894 total_pnl += pnl;
895 }
896
897 Some(Money::new(total_pnl, currency))
898 }
899
900 fn calculate_realized_pnl(&mut self, instrument_id: &InstrumentId) -> Option<Money> {
901 let cache = self.cache.borrow();
902 let account = if let Some(account) = cache.account_for_venue(&instrument_id.venue) {
903 account
904 } else {
905 log::error!(
906 "Cannot calculate realized PnL: no account registered for {}",
907 instrument_id.venue
908 );
909 return None;
910 };
911
912 let instrument = if let Some(instrument) = cache.instrument(instrument_id) {
913 instrument
914 } else {
915 log::error!("Cannot calculate realized PnL: no instrument for {instrument_id}");
916 return None;
917 };
918
919 let currency = account
920 .base_currency()
921 .unwrap_or_else(|| instrument.settlement_currency());
922
923 let positions = cache.positions(
924 None, Some(instrument_id),
926 None,
927 None,
928 );
929
930 if positions.is_empty() {
931 return Some(Money::new(0.0, currency));
932 }
933
934 let mut total_pnl = 0.0;
935
936 for position in positions {
937 if position.instrument_id != *instrument_id {
938 continue; }
940
941 if position.realized_pnl.is_none() {
942 continue; }
944
945 let mut pnl = position.realized_pnl?.as_f64();
946
947 if let Some(base_currency) = account.base_currency() {
948 let xrate = if let Some(xrate) =
949 self.calculate_xrate_to_base(instrument, account, position.entry)
950 {
951 xrate
952 } else {
953 log::error!(
954 "Cannot calculate realized PnL: insufficient data for {}/{}",
956 instrument.settlement_currency(),
957 base_currency
958 );
959 self.inner.borrow_mut().pending_calcs.insert(*instrument_id);
960 return None; };
962
963 let scale = 10f64.powi(currency.precision.into());
964 pnl = ((pnl * xrate) * scale).round() / scale;
965 }
966
967 total_pnl += pnl;
968 }
969
970 Some(Money::new(total_pnl, currency))
971 }
972
973 fn get_price(&self, position: &Position) -> Option<Price> {
974 let price_type = match position.side {
975 PositionSide::Long => PriceType::Bid,
976 PositionSide::Short => PriceType::Ask,
977 _ => panic!("invalid `PositionSide`, was {}", position.side),
978 };
979
980 let cache = self.cache.borrow();
981
982 let instrument_id = &position.instrument_id;
983 cache
984 .price(instrument_id, price_type)
985 .or_else(|| cache.price(instrument_id, PriceType::Last))
986 .or_else(|| {
987 self.inner
988 .borrow()
989 .bar_close_prices
990 .get(instrument_id)
991 .copied()
992 })
993 }
994
995 fn calculate_xrate_to_base(
996 &self,
997 instrument: &InstrumentAny,
998 account: &AccountAny,
999 side: OrderSide,
1000 ) -> Option<f64> {
1001 if !self.config.convert_to_account_base_currency {
1002 return Some(1.0); }
1004
1005 match account.base_currency() {
1006 None => Some(1.0), Some(base_currency) => {
1008 let cache = self.cache.borrow();
1009
1010 if self.config.use_mark_xrates {
1011 return cache.get_mark_xrate(instrument.settlement_currency(), base_currency);
1012 }
1013
1014 let price_type = if side == OrderSide::Buy {
1015 PriceType::Bid
1016 } else {
1017 PriceType::Ask
1018 };
1019
1020 cache.get_xrate(
1021 instrument.id().venue,
1022 instrument.settlement_currency(),
1023 base_currency,
1024 price_type,
1025 )
1026 }
1027 }
1028 }
1029}
1030
1031fn update_quote_tick(
1033 cache: Rc<RefCell<Cache>>,
1034 clock: Rc<RefCell<dyn Clock>>,
1035 inner: Rc<RefCell<PortfolioState>>,
1036 quote: &QuoteTick,
1037) {
1038 update_instrument_id(cache, clock.clone(), inner, "e.instrument_id);
1039}
1040
1041fn update_bar(
1042 cache: Rc<RefCell<Cache>>,
1043 clock: Rc<RefCell<dyn Clock>>,
1044 inner: Rc<RefCell<PortfolioState>>,
1045 bar: &Bar,
1046) {
1047 let instrument_id = bar.bar_type.instrument_id();
1048 inner
1049 .borrow_mut()
1050 .bar_close_prices
1051 .insert(instrument_id, bar.close);
1052 update_instrument_id(cache, clock.clone(), inner, &instrument_id);
1053}
1054
1055fn update_instrument_id(
1056 cache: Rc<RefCell<Cache>>,
1057 clock: Rc<RefCell<dyn Clock>>,
1058 inner: Rc<RefCell<PortfolioState>>,
1059 instrument_id: &InstrumentId,
1060) {
1061 inner.borrow_mut().unrealized_pnls.remove(instrument_id);
1062
1063 if inner.borrow().initialized || !inner.borrow().pending_calcs.contains(instrument_id) {
1064 return;
1065 }
1066
1067 let result_init;
1068 let mut result_maint = None;
1069
1070 let account = {
1071 let cache_ref = cache.borrow();
1072 let account = if let Some(account) = cache_ref.account_for_venue(&instrument_id.venue) {
1073 account
1074 } else {
1075 log::error!(
1076 "Cannot update tick: no account registered for {}",
1077 instrument_id.venue
1078 );
1079 return;
1080 };
1081
1082 let mut cache_ref = cache.borrow_mut();
1083 let instrument = if let Some(instrument) = cache_ref.instrument(instrument_id) {
1084 instrument.clone()
1085 } else {
1086 log::error!("Cannot update tick: no instrument found for {instrument_id}");
1087 return;
1088 };
1089
1090 let orders_open: Vec<OrderAny> = cache_ref
1092 .orders_open(None, Some(instrument_id), None, None)
1093 .iter()
1094 .map(|o| (*o).clone())
1095 .collect();
1096
1097 let positions_open: Vec<Position> = cache_ref
1098 .positions_open(None, Some(instrument_id), None, None)
1099 .iter()
1100 .map(|p| (*p).clone())
1101 .collect();
1102
1103 result_init = inner.borrow().accounts.update_orders(
1104 account,
1105 instrument.clone(),
1106 orders_open.iter().collect(),
1107 clock.borrow().timestamp_ns(),
1108 );
1109
1110 if let AccountAny::Margin(margin_account) = account {
1111 result_maint = inner.borrow().accounts.update_positions(
1112 margin_account,
1113 instrument,
1114 positions_open.iter().collect(),
1115 clock.borrow().timestamp_ns(),
1116 );
1117 }
1118
1119 if let Some((ref updated_account, _)) = result_init {
1120 cache_ref.add_account(updated_account.clone()).unwrap(); }
1122 account.clone()
1123 };
1124
1125 let mut portfolio_clone = Portfolio {
1126 clock: clock.clone(),
1127 cache,
1128 inner: inner.clone(),
1129 config: PortfolioConfig::default(), };
1131
1132 let result_unrealized_pnl: Option<Money> =
1133 portfolio_clone.calculate_unrealized_pnl(instrument_id);
1134
1135 if result_init.is_some()
1136 && (matches!(account, AccountAny::Cash(_))
1137 || (result_maint.is_some() && result_unrealized_pnl.is_some()))
1138 {
1139 inner.borrow_mut().pending_calcs.remove(instrument_id);
1140 if inner.borrow().pending_calcs.is_empty() {
1141 inner.borrow_mut().initialized = true;
1142 }
1143 }
1144}
1145
1146fn update_order(
1147 cache: Rc<RefCell<Cache>>,
1148 clock: Rc<RefCell<dyn Clock>>,
1149 inner: Rc<RefCell<PortfolioState>>,
1150 event: &OrderEventAny,
1151) {
1152 let cache_ref = cache.borrow();
1153 let account_id = match event.account_id() {
1154 Some(account_id) => account_id,
1155 None => {
1156 return; }
1158 };
1159
1160 let account = if let Some(account) = cache_ref.account(&account_id) {
1161 account
1162 } else {
1163 log::error!("Cannot update order: no account registered for {account_id}");
1164 return;
1165 };
1166
1167 match account {
1168 AccountAny::Cash(cash_account) => {
1169 if !cash_account.base.calculate_account_state {
1170 return;
1171 }
1172 }
1173 AccountAny::Margin(margin_account) => {
1174 if !margin_account.base.calculate_account_state {
1175 return;
1176 }
1177 }
1178 }
1179
1180 match event {
1181 OrderEventAny::Accepted(_)
1182 | OrderEventAny::Canceled(_)
1183 | OrderEventAny::Rejected(_)
1184 | OrderEventAny::Updated(_)
1185 | OrderEventAny::Filled(_) => {}
1186 _ => {
1187 return;
1188 }
1189 }
1190
1191 let cache_ref = cache.borrow();
1192 let order = if let Some(order) = cache_ref.order(&event.client_order_id()) {
1193 order
1194 } else {
1195 log::error!(
1196 "Cannot update order: {} not found in the cache",
1197 event.client_order_id()
1198 );
1199 return; };
1201
1202 if matches!(event, OrderEventAny::Rejected(_)) && order.order_type() != OrderType::StopLimit {
1203 return; }
1205
1206 let instrument = if let Some(instrument_id) = cache_ref.instrument(&event.instrument_id()) {
1207 instrument_id
1208 } else {
1209 log::error!(
1210 "Cannot update order: no instrument found for {}",
1211 event.instrument_id()
1212 );
1213 return;
1214 };
1215
1216 if let OrderEventAny::Filled(order_filled) = event {
1217 let _ = inner.borrow().accounts.update_balances(
1218 account.clone(),
1219 instrument.clone(),
1220 *order_filled,
1221 );
1222
1223 let mut portfolio_clone = Portfolio {
1224 clock: clock.clone(),
1225 cache: cache.clone(),
1226 inner: inner.clone(),
1227 config: PortfolioConfig::default(), };
1229
1230 match portfolio_clone.calculate_unrealized_pnl(&order_filled.instrument_id) {
1231 Some(unrealized_pnl) => {
1232 inner
1233 .borrow_mut()
1234 .unrealized_pnls
1235 .insert(event.instrument_id(), unrealized_pnl);
1236 }
1237 None => {
1238 log::error!(
1239 "Failed to calculate unrealized PnL for instrument {}",
1240 event.instrument_id()
1241 );
1242 }
1243 }
1244 }
1245
1246 let orders_open = cache_ref.orders_open(None, Some(&event.instrument_id()), None, None);
1247
1248 let account_state = inner.borrow_mut().accounts.update_orders(
1249 account,
1250 instrument.clone(),
1251 orders_open,
1252 clock.borrow().timestamp_ns(),
1253 );
1254
1255 let mut cache_ref = cache.borrow_mut();
1256 cache_ref.update_account(account.clone()).unwrap();
1257
1258 if let Some(account_state) = account_state {
1259 msgbus::publish(
1260 format!("events.account.{}", account.id()).into(),
1261 &account_state,
1262 );
1263 } else {
1264 log::debug!("Added pending calculation for {}", instrument.id());
1265 inner.borrow_mut().pending_calcs.insert(instrument.id());
1266 }
1267
1268 log::debug!("Updated {event}");
1269}
1270
1271fn update_position(
1272 cache: Rc<RefCell<Cache>>,
1273 clock: Rc<RefCell<dyn Clock>>,
1274 inner: Rc<RefCell<PortfolioState>>,
1275 event: &PositionEvent,
1276) {
1277 let instrument_id = event.instrument_id();
1278
1279 let positions_open: Vec<Position> = {
1280 let cache_ref = cache.borrow();
1281
1282 cache_ref
1283 .positions_open(None, Some(&instrument_id), None, None)
1284 .iter()
1285 .map(|o| (*o).clone())
1286 .collect()
1287 };
1288
1289 log::debug!("postion fresh from cache -> {positions_open:?}");
1290
1291 let mut portfolio_clone = Portfolio {
1292 clock: clock.clone(),
1293 cache: cache.clone(),
1294 inner: inner.clone(),
1295 config: PortfolioConfig::default(), };
1297
1298 portfolio_clone.update_net_position(&instrument_id, positions_open.clone());
1299
1300 let calculated_unrealized_pnl = portfolio_clone
1301 .calculate_unrealized_pnl(&instrument_id)
1302 .expect("Failed to calculate unrealized PnL");
1303 let calculated_realized_pnl = portfolio_clone
1304 .calculate_realized_pnl(&instrument_id)
1305 .expect("Failed to calculate realized PnL");
1306
1307 inner
1308 .borrow_mut()
1309 .unrealized_pnls
1310 .insert(event.instrument_id(), calculated_unrealized_pnl);
1311 inner
1312 .borrow_mut()
1313 .realized_pnls
1314 .insert(event.instrument_id(), calculated_realized_pnl);
1315
1316 let cache_ref = cache.borrow();
1317 let account = cache_ref.account(&event.account_id());
1318
1319 if let Some(AccountAny::Margin(margin_account)) = account {
1320 if !margin_account.calculate_account_state {
1321 return; }
1323
1324 let cache_ref = cache.borrow();
1325 let instrument = if let Some(instrument) = cache_ref.instrument(&instrument_id) {
1326 instrument
1327 } else {
1328 log::error!("Cannot update position: no instrument found for {instrument_id}");
1329 return;
1330 };
1331
1332 let result = inner.borrow_mut().accounts.update_positions(
1333 margin_account,
1334 instrument.clone(),
1335 positions_open.iter().collect(),
1336 clock.borrow().timestamp_ns(),
1337 );
1338 let mut cache_ref = cache.borrow_mut();
1339 if let Some((margin_account, _)) = result {
1340 cache_ref
1341 .add_account(AccountAny::Margin(margin_account)) .unwrap();
1343 }
1344 } else if account.is_none() {
1345 log::error!(
1346 "Cannot update position: no account registered for {}",
1347 event.account_id()
1348 );
1349 }
1350}
1351
1352pub fn update_account(cache: Rc<RefCell<Cache>>, event: &AccountState) {
1353 let mut cache_ref = cache.borrow_mut();
1354
1355 if let Some(existing) = cache_ref.account(&event.account_id) {
1356 let mut account = existing.clone();
1357 account.apply(event.clone());
1358
1359 if let Err(e) = cache_ref.update_account(account.clone()) {
1360 log::error!("Failed to update account: {e}");
1361 return;
1362 }
1363 } else {
1364 let account = match AccountAny::from_events(vec![event.clone()]) {
1365 Ok(account) => account,
1366 Err(e) => {
1367 log::error!("Failed to create account: {e}");
1368 return;
1369 }
1370 };
1371
1372 if let Err(e) = cache_ref.add_account(account) {
1373 log::error!("Failed to add account: {e}");
1374 return;
1375 }
1376 }
1377
1378 log::info!("Updated {event}");
1379}