1#![allow(dead_code)] use std::{
21 cell::RefCell,
22 collections::{HashMap, HashSet},
23 fmt::Debug,
24 rc::Rc,
25};
26
27use nautilus_analysis::analyzer::PortfolioAnalyzer;
28use nautilus_common::{
29 cache::Cache,
30 clock::Clock,
31 msgbus::{
32 self,
33 handler::{ShareableMessageHandler, TypedMessageHandler},
34 },
35};
36use nautilus_model::{
37 accounts::AccountAny,
38 data::{Bar, QuoteTick},
39 enums::{OrderSide, OrderType, PositionSide, PriceType},
40 events::{AccountState, OrderEventAny, position::PositionEvent},
41 identifiers::{InstrumentId, Venue},
42 instruments::{Instrument, InstrumentAny},
43 orders::{Order, OrderAny},
44 position::Position,
45 types::{Currency, Money, Price},
46};
47use rust_decimal::{Decimal, prelude::FromPrimitive};
48
49use crate::{config::PortfolioConfig, manager::AccountsManager};
50
/// Mutable bookkeeping shared between the `Portfolio` facade and the message
/// bus handlers registered by it.
struct PortfolioState {
    /// Manager used to recompute account state for orders, positions and fills.
    accounts: AccountsManager,
    /// Portfolio analyzer; reset together with the rest of the state.
    analyzer: PortfolioAnalyzer,
    /// Cached unrealized PnL per instrument (dropped when new market data arrives).
    unrealized_pnls: HashMap<InstrumentId, Money>,
    /// Cached realized PnL per instrument.
    realized_pnls: HashMap<InstrumentId, Money>,
    /// Net signed quantity per instrument, summed over open positions.
    net_positions: HashMap<InstrumentId, Decimal>,
    /// Instruments whose calculations could not be completed yet.
    pending_calcs: HashSet<InstrumentId>,
    /// Close price of the most recent bar per instrument, used as a price fallback.
    bar_close_prices: HashMap<InstrumentId, Price>,
    /// Whether the portfolio currently considers itself initialized.
    initialized: bool,
}
61
62impl PortfolioState {
63 fn new(clock: Rc<RefCell<dyn Clock>>, cache: Rc<RefCell<Cache>>) -> Self {
64 Self {
65 accounts: AccountsManager::new(clock, cache),
66 analyzer: PortfolioAnalyzer::default(),
67 unrealized_pnls: HashMap::new(),
68 realized_pnls: HashMap::new(),
69 net_positions: HashMap::new(),
70 pending_calcs: HashSet::new(),
71 bar_close_prices: HashMap::new(),
72 initialized: false,
73 }
74 }
75
76 fn reset(&mut self) {
77 log::debug!("RESETTING");
78 self.net_positions.clear();
79 self.unrealized_pnls.clear();
80 self.realized_pnls.clear();
81 self.pending_calcs.clear();
82 self.analyzer.reset();
83 log::debug!("READY");
84 }
85}
86
/// Portfolio facade over the shared cache, the clock and the internal
/// bookkeeping state.
pub struct Portfolio {
    /// Clock used to timestamp account updates.
    pub(crate) clock: Rc<RefCell<dyn Clock>>,
    /// Shared cache holding accounts, instruments, orders, positions and prices.
    pub(crate) cache: Rc<RefCell<Cache>>,
    /// Internal state, shared with the message bus handlers.
    inner: Rc<RefCell<PortfolioState>>,
    /// Portfolio configuration (bar updates, base currency conversion, mark xrates).
    config: PortfolioConfig,
}
93
94impl Debug for Portfolio {
95 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
96 f.debug_struct(stringify!(Portfolio)).finish()
97 }
98}
99
100impl Portfolio {
101 pub fn new(
102 cache: Rc<RefCell<Cache>>,
103 clock: Rc<RefCell<dyn Clock>>,
104 config: Option<PortfolioConfig>,
105 ) -> Self {
106 let inner = Rc::new(RefCell::new(PortfolioState::new(
107 clock.clone(),
108 cache.clone(),
109 )));
110 let config = config.unwrap_or_default();
111
112 Self::register_message_handlers(
113 cache.clone(),
114 clock.clone(),
115 inner.clone(),
116 config.bar_updates,
117 );
118
119 Self {
120 clock,
121 cache,
122 inner,
123 config,
124 }
125 }
126
127 fn register_message_handlers(
128 cache: Rc<RefCell<Cache>>,
129 clock: Rc<RefCell<dyn Clock>>,
130 inner: Rc<RefCell<PortfolioState>>,
131 bar_updates: bool,
132 ) {
133 let update_account_handler = {
134 let cache = cache.clone();
135 ShareableMessageHandler(Rc::new(TypedMessageHandler::from(
136 move |event: &AccountState| {
137 update_account(cache.clone(), event);
138 },
139 )))
140 };
141
142 let update_position_handler = {
143 let cache = cache.clone();
144 let clock = clock.clone();
145 let inner = inner.clone();
146 ShareableMessageHandler(Rc::new(TypedMessageHandler::from(
147 move |event: &PositionEvent| {
148 update_position(cache.clone(), clock.clone(), inner.clone(), event);
149 },
150 )))
151 };
152
153 let update_quote_handler = {
154 let cache = cache.clone();
155 let clock = clock.clone();
156 let inner = inner.clone();
157 ShareableMessageHandler(Rc::new(TypedMessageHandler::from(
158 move |quote: &QuoteTick| {
159 update_quote_tick(cache.clone(), clock.clone(), inner.clone(), quote);
160 },
161 )))
162 };
163
164 let update_bar_handler = {
165 let cache = cache.clone();
166 let clock = clock.clone();
167 let inner = inner.clone();
168 ShareableMessageHandler(Rc::new(TypedMessageHandler::from(move |bar: &Bar| {
169 update_bar(cache.clone(), clock.clone(), inner.clone(), bar);
170 })))
171 };
172
173 let update_order_handler = {
174 let cache = cache;
175 let clock = clock.clone();
176 let inner = inner;
177 ShareableMessageHandler(Rc::new(TypedMessageHandler::from(
178 move |event: &OrderEventAny| {
179 update_order(cache.clone(), clock.clone(), inner.clone(), event);
180 },
181 )))
182 };
183
184 msgbus::register(
185 "Portfolio.update_account".into(),
186 update_account_handler.clone(),
187 );
188
189 msgbus::subscribe("data.quotes.*".into(), update_quote_handler, Some(10));
190 if bar_updates {
191 msgbus::subscribe("data.quotes.*EXTERNAL".into(), update_bar_handler, Some(10));
192 }
193 msgbus::subscribe("events.order.*".into(), update_order_handler, Some(10));
194 msgbus::subscribe(
195 "events.position.*".into(),
196 update_position_handler,
197 Some(10),
198 );
199 msgbus::subscribe("events.account.*".into(), update_account_handler, Some(10));
200 }
201
202 pub fn reset(&mut self) {
203 log::debug!("RESETTING");
204 self.inner.borrow_mut().reset();
205 log::debug!("READY");
206 }
207
208 #[must_use]
212 pub fn is_initialized(&self) -> bool {
213 self.inner.borrow().initialized
214 }
215
216 #[must_use]
220 pub fn balances_locked(&self, venue: &Venue) -> HashMap<Currency, Money> {
221 self.cache.borrow().account_for_venue(venue).map_or_else(
222 || {
223 log::error!("Cannot get balances locked: no account generated for {venue}");
224 HashMap::new()
225 },
226 AccountAny::balances_locked,
227 )
228 }
229
230 #[must_use]
234 pub fn margins_init(&self, venue: &Venue) -> HashMap<InstrumentId, Money> {
235 self.cache.borrow().account_for_venue(venue).map_or_else(
236 || {
237 log::error!(
238 "Cannot get initial (order) margins: no account registered for {venue}"
239 );
240 HashMap::new()
241 },
242 |account| match account {
243 AccountAny::Margin(margin_account) => margin_account.initial_margins(),
244 AccountAny::Cash(_) => {
245 log::warn!("Initial margins not applicable for cash account");
246 HashMap::new()
247 }
248 },
249 )
250 }
251
252 #[must_use]
256 pub fn margins_maint(&self, venue: &Venue) -> HashMap<InstrumentId, Money> {
257 self.cache.borrow().account_for_venue(venue).map_or_else(
258 || {
259 log::error!(
260 "Cannot get maintenance (position) margins: no account registered for {venue}"
261 );
262 HashMap::new()
263 },
264 |account| match account {
265 AccountAny::Margin(margin_account) => margin_account.maintenance_margins(),
266 AccountAny::Cash(_) => {
267 log::warn!("Maintenance margins not applicable for cash account");
268 HashMap::new()
269 }
270 },
271 )
272 }
273
274 #[must_use]
278 pub fn unrealized_pnls(&mut self, venue: &Venue) -> HashMap<Currency, Money> {
279 let instrument_ids = {
280 let cache = self.cache.borrow();
281 let positions = cache.positions(Some(venue), None, None, None);
282
283 if positions.is_empty() {
284 return HashMap::new(); }
286
287 let instrument_ids: HashSet<InstrumentId> =
288 positions.iter().map(|p| p.instrument_id).collect();
289
290 instrument_ids
291 };
292
293 let mut unrealized_pnls: HashMap<Currency, f64> = HashMap::new();
294
295 for instrument_id in instrument_ids {
296 if let Some(&pnl) = self.inner.borrow_mut().unrealized_pnls.get(&instrument_id) {
297 *unrealized_pnls.entry(pnl.currency).or_insert(0.0) += pnl.as_f64();
299 continue;
300 }
301
302 match self.calculate_unrealized_pnl(&instrument_id) {
304 Some(pnl) => *unrealized_pnls.entry(pnl.currency).or_insert(0.0) += pnl.as_f64(),
305 None => continue,
306 }
307 }
308
309 unrealized_pnls
310 .into_iter()
311 .map(|(currency, amount)| (currency, Money::new(amount, currency)))
312 .collect()
313 }
314
315 #[must_use]
319 pub fn realized_pnls(&mut self, venue: &Venue) -> HashMap<Currency, Money> {
320 let instrument_ids = {
321 let cache = self.cache.borrow();
322 let positions = cache.positions(Some(venue), None, None, None);
323
324 if positions.is_empty() {
325 return HashMap::new(); }
327
328 let instrument_ids: HashSet<InstrumentId> =
329 positions.iter().map(|p| p.instrument_id).collect();
330
331 instrument_ids
332 };
333
334 let mut realized_pnls: HashMap<Currency, f64> = HashMap::new();
335
336 for instrument_id in instrument_ids {
337 if let Some(&pnl) = self.inner.borrow_mut().realized_pnls.get(&instrument_id) {
338 *realized_pnls.entry(pnl.currency).or_insert(0.0) += pnl.as_f64();
340 continue;
341 }
342
343 match self.calculate_realized_pnl(&instrument_id) {
345 Some(pnl) => *realized_pnls.entry(pnl.currency).or_insert(0.0) += pnl.as_f64(),
346 None => continue,
347 }
348 }
349
350 realized_pnls
351 .into_iter()
352 .map(|(currency, amount)| (currency, Money::new(amount, currency)))
353 .collect()
354 }
355
356 #[must_use]
357 pub fn net_exposures(&self, venue: &Venue) -> Option<HashMap<Currency, Money>> {
358 let cache = self.cache.borrow();
359 let account = if let Some(account) = cache.account_for_venue(venue) {
360 account
361 } else {
362 log::error!("Cannot calculate net exposures: no account registered for {venue}");
363 return None; };
365
366 let positions_open = cache.positions_open(Some(venue), None, None, None);
367 if positions_open.is_empty() {
368 return Some(HashMap::new()); }
370
371 let mut net_exposures: HashMap<Currency, f64> = HashMap::new();
372
373 for position in positions_open {
374 let instrument = if let Some(instrument) = cache.instrument(&position.instrument_id) {
375 instrument
376 } else {
377 log::error!(
378 "Cannot calculate net exposures: no instrument for {}",
379 position.instrument_id
380 );
381 return None; };
383
384 if position.side == PositionSide::Flat {
385 log::error!(
386 "Cannot calculate net exposures: position is flat for {}",
387 position.instrument_id
388 );
389 continue; }
391
392 let price = self.get_price(position)?;
393 let xrate = if let Some(xrate) =
394 self.calculate_xrate_to_base(instrument, account, position.entry)
395 {
396 xrate
397 } else {
398 log::error!(
399 "Cannot calculate net exposures: insufficient data for {}/{:?}",
401 instrument.settlement_currency(),
402 account.base_currency()
403 );
404 return None; };
406
407 let settlement_currency = account
408 .base_currency()
409 .unwrap_or_else(|| instrument.settlement_currency());
410
411 let net_exposure = instrument
412 .calculate_notional_value(position.quantity, price, None)
413 .as_f64()
414 * xrate;
415
416 let net_exposure = (net_exposure * 10f64.powi(settlement_currency.precision.into()))
417 .round()
418 / 10f64.powi(settlement_currency.precision.into());
419
420 *net_exposures.entry(settlement_currency).or_insert(0.0) += net_exposure;
421 }
422
423 Some(
424 net_exposures
425 .into_iter()
426 .map(|(currency, amount)| (currency, Money::new(amount, currency)))
427 .collect(),
428 )
429 }
430
431 #[must_use]
432 pub fn unrealized_pnl(&mut self, instrument_id: &InstrumentId) -> Option<Money> {
433 if let Some(pnl) = self
434 .inner
435 .borrow()
436 .unrealized_pnls
437 .get(instrument_id)
438 .copied()
439 {
440 return Some(pnl);
441 }
442
443 let pnl = self.calculate_unrealized_pnl(instrument_id)?;
444 self.inner
445 .borrow_mut()
446 .unrealized_pnls
447 .insert(*instrument_id, pnl);
448 Some(pnl)
449 }
450
451 #[must_use]
452 pub fn realized_pnl(&mut self, instrument_id: &InstrumentId) -> Option<Money> {
453 if let Some(pnl) = self
454 .inner
455 .borrow()
456 .realized_pnls
457 .get(instrument_id)
458 .copied()
459 {
460 return Some(pnl);
461 }
462
463 let pnl = self.calculate_realized_pnl(instrument_id)?;
464 self.inner
465 .borrow_mut()
466 .realized_pnls
467 .insert(*instrument_id, pnl);
468 Some(pnl)
469 }
470
471 #[must_use]
472 pub fn net_exposure(&self, instrument_id: &InstrumentId) -> Option<Money> {
473 let cache = self.cache.borrow();
474 let account = if let Some(account) = cache.account_for_venue(&instrument_id.venue) {
475 account
476 } else {
477 log::error!(
478 "Cannot calculate net exposure: no account registered for {}",
479 instrument_id.venue
480 );
481 return None;
482 };
483
484 let instrument = if let Some(instrument) = cache.instrument(instrument_id) {
485 instrument
486 } else {
487 log::error!("Cannot calculate net exposure: no instrument for {instrument_id}");
488 return None;
489 };
490
491 let positions_open = cache.positions_open(
492 None, Some(instrument_id),
494 None,
495 None,
496 );
497
498 if positions_open.is_empty() {
499 return Some(Money::new(0.0, instrument.settlement_currency()));
500 }
501
502 let mut net_exposure = 0.0;
503
504 for position in positions_open {
505 let price = self.get_price(position)?;
506 let xrate = if let Some(xrate) =
507 self.calculate_xrate_to_base(instrument, account, position.entry)
508 {
509 xrate
510 } else {
511 log::error!(
512 "Cannot calculate net exposures: insufficient data for {}/{:?}",
514 instrument.settlement_currency(),
515 account.base_currency()
516 );
517 return None; };
519
520 let notional_value =
521 instrument.calculate_notional_value(position.quantity, price, None);
522 net_exposure += notional_value.as_f64() * xrate;
523 }
524
525 let settlement_currency = account
526 .base_currency()
527 .unwrap_or_else(|| instrument.settlement_currency());
528
529 Some(Money::new(net_exposure, settlement_currency))
530 }
531
532 #[must_use]
533 pub fn net_position(&self, instrument_id: &InstrumentId) -> Decimal {
534 self.inner
535 .borrow()
536 .net_positions
537 .get(instrument_id)
538 .copied()
539 .unwrap_or(Decimal::ZERO)
540 }
541
542 #[must_use]
543 pub fn is_net_long(&self, instrument_id: &InstrumentId) -> bool {
544 self.inner
545 .borrow()
546 .net_positions
547 .get(instrument_id)
548 .copied()
549 .map_or_else(|| false, |net_position| net_position > Decimal::ZERO)
550 }
551
552 #[must_use]
553 pub fn is_net_short(&self, instrument_id: &InstrumentId) -> bool {
554 self.inner
555 .borrow()
556 .net_positions
557 .get(instrument_id)
558 .copied()
559 .map_or_else(|| false, |net_position| net_position < Decimal::ZERO)
560 }
561
562 #[must_use]
563 pub fn is_flat(&self, instrument_id: &InstrumentId) -> bool {
564 self.inner
565 .borrow()
566 .net_positions
567 .get(instrument_id)
568 .copied()
569 .map_or_else(|| true, |net_position| net_position == Decimal::ZERO)
570 }
571
572 #[must_use]
573 pub fn is_completely_flat(&self) -> bool {
574 for net_position in self.inner.borrow().net_positions.values() {
575 if *net_position != Decimal::ZERO {
576 return false;
577 }
578 }
579 true
580 }
581
582 pub fn initialize_orders(&mut self) {
590 let mut initialized = true;
591 let orders_and_instruments = {
592 let cache = self.cache.borrow();
593 let all_orders_open = cache.orders_open(None, None, None, None);
594
595 let mut instruments_with_orders = Vec::new();
596 let mut instruments = HashSet::new();
597
598 for order in &all_orders_open {
599 instruments.insert(order.instrument_id());
600 }
601
602 for instrument_id in instruments {
603 if let Some(instrument) = cache.instrument(&instrument_id) {
604 let orders = cache
605 .orders_open(None, Some(&instrument_id), None, None)
606 .into_iter()
607 .cloned()
608 .collect::<Vec<OrderAny>>();
609 instruments_with_orders.push((instrument.clone(), orders));
610 } else {
611 log::error!(
612 "Cannot update initial (order) margin: no instrument found for {instrument_id}"
613 );
614 initialized = false;
615 break;
616 }
617 }
618 instruments_with_orders
619 };
620
621 for (instrument, orders_open) in &orders_and_instruments {
622 let mut cache = self.cache.borrow_mut();
623 let account = if let Some(account) = cache.account_for_venue(&instrument.id().venue) {
624 account
625 } else {
626 log::error!(
627 "Cannot update initial (order) margin: no account registered for {}",
628 instrument.id().venue
629 );
630 initialized = false;
631 break;
632 };
633
634 let result = self.inner.borrow_mut().accounts.update_orders(
635 account,
636 instrument.clone(),
637 orders_open.iter().collect(),
638 self.clock.borrow().timestamp_ns(),
639 );
640
641 match result {
642 Some((updated_account, _)) => {
643 cache.add_account(updated_account).unwrap(); }
645 None => {
646 initialized = false;
647 }
648 }
649 }
650
651 let total_orders = orders_and_instruments
652 .into_iter()
653 .map(|(_, orders)| orders.len())
654 .sum::<usize>();
655
656 log::info!(
657 "Initialized {} open order{}",
658 total_orders,
659 if total_orders == 1 { "" } else { "s" }
660 );
661
662 self.inner.borrow_mut().initialized = initialized;
663 }
664
665 pub fn initialize_positions(&mut self) {
671 self.inner.borrow_mut().unrealized_pnls.clear();
672 self.inner.borrow_mut().realized_pnls.clear();
673 let all_positions_open: Vec<Position>;
674 let mut instruments = HashSet::new();
675 {
676 let cache = self.cache.borrow();
677 all_positions_open = cache
678 .positions_open(None, None, None, None)
679 .into_iter()
680 .cloned()
681 .collect();
682 for position in &all_positions_open {
683 instruments.insert(position.instrument_id);
684 }
685 }
686
687 let mut initialized = true;
688
689 for instrument_id in instruments {
690 let positions_open: Vec<Position> = {
691 let cache = self.cache.borrow();
692 cache
693 .positions_open(None, Some(&instrument_id), None, None)
694 .into_iter()
695 .cloned()
696 .collect()
697 };
698
699 self.update_net_position(&instrument_id, positions_open);
700
701 let calculated_unrealized_pnl = self
702 .calculate_unrealized_pnl(&instrument_id)
703 .expect("Failed to calculate unrealized PnL");
704 let calculated_realized_pnl = self
705 .calculate_realized_pnl(&instrument_id)
706 .expect("Failed to calculate realized PnL");
707
708 self.inner
709 .borrow_mut()
710 .unrealized_pnls
711 .insert(instrument_id, calculated_unrealized_pnl);
712 self.inner
713 .borrow_mut()
714 .realized_pnls
715 .insert(instrument_id, calculated_realized_pnl);
716
717 let cache = self.cache.borrow();
718 let account = if let Some(account) = cache.account_for_venue(&instrument_id.venue) {
719 account
720 } else {
721 log::error!(
722 "Cannot update maintenance (position) margin: no account registered for {}",
723 instrument_id.venue
724 );
725 initialized = false;
726 break;
727 };
728
729 let account = match account {
730 AccountAny::Cash(_) => continue,
731 AccountAny::Margin(margin_account) => margin_account,
732 };
733
734 let mut cache = self.cache.borrow_mut();
735 let instrument = if let Some(instrument) = cache.instrument(&instrument_id) {
736 instrument
737 } else {
738 log::error!(
739 "Cannot update maintenance (position) margin: no instrument found for {instrument_id}"
740 );
741 initialized = false;
742 break;
743 };
744
745 let result = self.inner.borrow_mut().accounts.update_positions(
746 account,
747 instrument.clone(),
748 self.cache
749 .borrow()
750 .positions_open(None, Some(&instrument_id), None, None),
751 self.clock.borrow().timestamp_ns(),
752 );
753
754 match result {
755 Some((updated_account, _)) => {
756 cache
757 .add_account(AccountAny::Margin(updated_account)) .unwrap();
759 }
760 None => {
761 initialized = false;
762 }
763 }
764 }
765
766 let open_count = all_positions_open.len();
767 self.inner.borrow_mut().initialized = initialized;
768 log::info!(
769 "Initialized {} open position{}",
770 open_count,
771 if open_count == 1 { "" } else { "s" }
772 );
773 }
774
775 pub fn update_quote_tick(&mut self, quote: &QuoteTick) {
779 update_quote_tick(
780 self.cache.clone(),
781 self.clock.clone(),
782 self.inner.clone(),
783 quote,
784 );
785 }
786
787 pub fn update_bar(&mut self, bar: &Bar) {
791 update_bar(
792 self.cache.clone(),
793 self.clock.clone(),
794 self.inner.clone(),
795 bar,
796 );
797 }
798
799 pub fn update_account(&mut self, event: &AccountState) {
801 update_account(self.cache.clone(), event);
802 }
803
804 pub fn update_order(&mut self, event: &OrderEventAny) {
808 update_order(
809 self.cache.clone(),
810 self.clock.clone(),
811 self.inner.clone(),
812 event,
813 );
814 }
815
816 pub fn update_position(&mut self, event: &PositionEvent) {
820 update_position(
821 self.cache.clone(),
822 self.clock.clone(),
823 self.inner.clone(),
824 event,
825 );
826 }
827
828 fn update_net_position(&mut self, instrument_id: &InstrumentId, positions_open: Vec<Position>) {
831 let mut net_position = Decimal::ZERO;
832
833 for open_position in positions_open {
834 log::debug!("open_position: {open_position}");
835 net_position += Decimal::from_f64(open_position.signed_qty).unwrap_or(Decimal::ZERO);
836 }
837
838 let existing_position = self.net_position(instrument_id);
839 if existing_position != net_position {
840 self.inner
841 .borrow_mut()
842 .net_positions
843 .insert(*instrument_id, net_position);
844 log::info!("{instrument_id} net_position={net_position}");
845 }
846 }
847
848 fn calculate_unrealized_pnl(&mut self, instrument_id: &InstrumentId) -> Option<Money> {
849 let cache = self.cache.borrow();
850 let account = if let Some(account) = cache.account_for_venue(&instrument_id.venue) {
851 account
852 } else {
853 log::error!(
854 "Cannot calculate unrealized PnL: no account registered for {}",
855 instrument_id.venue
856 );
857 return None;
858 };
859
860 let instrument = if let Some(instrument) = cache.instrument(instrument_id) {
861 instrument
862 } else {
863 log::error!("Cannot calculate unrealized PnL: no instrument for {instrument_id}");
864 return None;
865 };
866
867 let currency = account
868 .base_currency()
869 .unwrap_or_else(|| instrument.settlement_currency());
870
871 let positions_open = cache.positions_open(
872 None, Some(instrument_id),
874 None,
875 None,
876 );
877
878 if positions_open.is_empty() {
879 return Some(Money::new(0.0, currency));
880 }
881
882 let mut total_pnl = 0.0;
883
884 for position in positions_open {
885 if position.instrument_id != *instrument_id {
886 continue; }
888
889 if position.side == PositionSide::Flat {
890 continue; }
892
893 let price = if let Some(price) = self.get_price(position) {
894 price
895 } else {
896 log::debug!("Cannot calculate unrealized PnL: no prices for {instrument_id}");
897 self.inner.borrow_mut().pending_calcs.insert(*instrument_id);
898 return None; };
900
901 let mut pnl = position.unrealized_pnl(price).as_f64();
902
903 if let Some(base_currency) = account.base_currency() {
904 let xrate = if let Some(xrate) =
905 self.calculate_xrate_to_base(instrument, account, position.entry)
906 {
907 xrate
908 } else {
909 log::error!(
910 "Cannot calculate unrealized PnL: insufficient data for {}/{}",
912 instrument.settlement_currency(),
913 base_currency
914 );
915 self.inner.borrow_mut().pending_calcs.insert(*instrument_id);
916 return None; };
918
919 let scale = 10f64.powi(currency.precision.into());
920 pnl = ((pnl * xrate) * scale).round() / scale;
921 }
922
923 total_pnl += pnl;
924 }
925
926 Some(Money::new(total_pnl, currency))
927 }
928
929 fn calculate_realized_pnl(&mut self, instrument_id: &InstrumentId) -> Option<Money> {
930 let cache = self.cache.borrow();
931 let account = if let Some(account) = cache.account_for_venue(&instrument_id.venue) {
932 account
933 } else {
934 log::error!(
935 "Cannot calculate realized PnL: no account registered for {}",
936 instrument_id.venue
937 );
938 return None;
939 };
940
941 let instrument = if let Some(instrument) = cache.instrument(instrument_id) {
942 instrument
943 } else {
944 log::error!("Cannot calculate realized PnL: no instrument for {instrument_id}");
945 return None;
946 };
947
948 let currency = account
949 .base_currency()
950 .unwrap_or_else(|| instrument.settlement_currency());
951
952 let positions = cache.positions(
953 None, Some(instrument_id),
955 None,
956 None,
957 );
958
959 if positions.is_empty() {
960 return Some(Money::new(0.0, currency));
961 }
962
963 let mut total_pnl = 0.0;
964
965 for position in positions {
966 if position.instrument_id != *instrument_id {
967 continue; }
969
970 if position.realized_pnl.is_none() {
971 continue; }
973
974 let mut pnl = position.realized_pnl?.as_f64();
975
976 if let Some(base_currency) = account.base_currency() {
977 let xrate = if let Some(xrate) =
978 self.calculate_xrate_to_base(instrument, account, position.entry)
979 {
980 xrate
981 } else {
982 log::error!(
983 "Cannot calculate realized PnL: insufficient data for {}/{}",
985 instrument.settlement_currency(),
986 base_currency
987 );
988 self.inner.borrow_mut().pending_calcs.insert(*instrument_id);
989 return None; };
991
992 let scale = 10f64.powi(currency.precision.into());
993 pnl = ((pnl * xrate) * scale).round() / scale;
994 }
995
996 total_pnl += pnl;
997 }
998
999 Some(Money::new(total_pnl, currency))
1000 }
1001
1002 fn get_price(&self, position: &Position) -> Option<Price> {
1003 let price_type = match position.side {
1004 PositionSide::Long => PriceType::Bid,
1005 PositionSide::Short => PriceType::Ask,
1006 _ => panic!("invalid `PositionSide`, was {}", position.side),
1007 };
1008
1009 let cache = self.cache.borrow();
1010
1011 let instrument_id = &position.instrument_id;
1012 cache
1013 .price(instrument_id, price_type)
1014 .or_else(|| cache.price(instrument_id, PriceType::Last))
1015 .or_else(|| {
1016 self.inner
1017 .borrow()
1018 .bar_close_prices
1019 .get(instrument_id)
1020 .copied()
1021 })
1022 }
1023
1024 fn calculate_xrate_to_base(
1025 &self,
1026 instrument: &InstrumentAny,
1027 account: &AccountAny,
1028 side: OrderSide,
1029 ) -> Option<f64> {
1030 if !self.config.convert_to_account_base_currency {
1031 return Some(1.0); }
1033
1034 match account.base_currency() {
1035 None => Some(1.0), Some(base_currency) => {
1037 let cache = self.cache.borrow();
1038
1039 if self.config.use_mark_xrates {
1040 return cache.get_mark_xrate(instrument.settlement_currency(), base_currency);
1041 }
1042
1043 let price_type = if side == OrderSide::Buy {
1044 PriceType::Bid
1045 } else {
1046 PriceType::Ask
1047 };
1048
1049 cache.get_xrate(
1050 instrument.id().venue,
1051 instrument.settlement_currency(),
1052 base_currency,
1053 price_type,
1054 )
1055 }
1056 }
1057 }
1058}
1059
1060fn update_quote_tick(
1062 cache: Rc<RefCell<Cache>>,
1063 clock: Rc<RefCell<dyn Clock>>,
1064 inner: Rc<RefCell<PortfolioState>>,
1065 quote: &QuoteTick,
1066) {
1067 update_instrument_id(cache, clock.clone(), inner, "e.instrument_id);
1068}
1069
1070fn update_bar(
1071 cache: Rc<RefCell<Cache>>,
1072 clock: Rc<RefCell<dyn Clock>>,
1073 inner: Rc<RefCell<PortfolioState>>,
1074 bar: &Bar,
1075) {
1076 let instrument_id = bar.bar_type.instrument_id();
1077 inner
1078 .borrow_mut()
1079 .bar_close_prices
1080 .insert(instrument_id, bar.close);
1081 update_instrument_id(cache, clock.clone(), inner, &instrument_id);
1082}
1083
1084fn update_instrument_id(
1085 cache: Rc<RefCell<Cache>>,
1086 clock: Rc<RefCell<dyn Clock>>,
1087 inner: Rc<RefCell<PortfolioState>>,
1088 instrument_id: &InstrumentId,
1089) {
1090 inner.borrow_mut().unrealized_pnls.remove(instrument_id);
1091
1092 if inner.borrow().initialized || !inner.borrow().pending_calcs.contains(instrument_id) {
1093 return;
1094 }
1095
1096 let result_init;
1097 let mut result_maint = None;
1098
1099 let account = {
1100 let cache_ref = cache.borrow();
1101 let account = if let Some(account) = cache_ref.account_for_venue(&instrument_id.venue) {
1102 account
1103 } else {
1104 log::error!(
1105 "Cannot update tick: no account registered for {}",
1106 instrument_id.venue
1107 );
1108 return;
1109 };
1110
1111 let mut cache_ref = cache.borrow_mut();
1112 let instrument = if let Some(instrument) = cache_ref.instrument(instrument_id) {
1113 instrument.clone()
1114 } else {
1115 log::error!("Cannot update tick: no instrument found for {instrument_id}");
1116 return;
1117 };
1118
1119 let orders_open: Vec<OrderAny> = cache_ref
1121 .orders_open(None, Some(instrument_id), None, None)
1122 .iter()
1123 .map(|o| (*o).clone())
1124 .collect();
1125
1126 let positions_open: Vec<Position> = cache_ref
1127 .positions_open(None, Some(instrument_id), None, None)
1128 .iter()
1129 .map(|p| (*p).clone())
1130 .collect();
1131
1132 result_init = inner.borrow().accounts.update_orders(
1133 account,
1134 instrument.clone(),
1135 orders_open.iter().collect(),
1136 clock.borrow().timestamp_ns(),
1137 );
1138
1139 if let AccountAny::Margin(margin_account) = account {
1140 result_maint = inner.borrow().accounts.update_positions(
1141 margin_account,
1142 instrument,
1143 positions_open.iter().collect(),
1144 clock.borrow().timestamp_ns(),
1145 );
1146 }
1147
1148 if let Some((ref updated_account, _)) = result_init {
1149 cache_ref.add_account(updated_account.clone()).unwrap(); }
1151 account.clone()
1152 };
1153
1154 let mut portfolio_clone = Portfolio {
1155 clock: clock.clone(),
1156 cache,
1157 inner: inner.clone(),
1158 config: PortfolioConfig::default(), };
1160
1161 let result_unrealized_pnl: Option<Money> =
1162 portfolio_clone.calculate_unrealized_pnl(instrument_id);
1163
1164 if result_init.is_some()
1165 && (matches!(account, AccountAny::Cash(_))
1166 || (result_maint.is_some() && result_unrealized_pnl.is_some()))
1167 {
1168 inner.borrow_mut().pending_calcs.remove(instrument_id);
1169 if inner.borrow().pending_calcs.is_empty() {
1170 inner.borrow_mut().initialized = true;
1171 }
1172 }
1173}
1174
1175fn update_order(
1176 cache: Rc<RefCell<Cache>>,
1177 clock: Rc<RefCell<dyn Clock>>,
1178 inner: Rc<RefCell<PortfolioState>>,
1179 event: &OrderEventAny,
1180) {
1181 let cache_ref = cache.borrow();
1182 let account_id = match event.account_id() {
1183 Some(account_id) => account_id,
1184 None => {
1185 return; }
1187 };
1188
1189 let account = if let Some(account) = cache_ref.account(&account_id) {
1190 account
1191 } else {
1192 log::error!("Cannot update order: no account registered for {account_id}");
1193 return;
1194 };
1195
1196 match account {
1197 AccountAny::Cash(cash_account) => {
1198 if !cash_account.base.calculate_account_state {
1199 return;
1200 }
1201 }
1202 AccountAny::Margin(margin_account) => {
1203 if !margin_account.base.calculate_account_state {
1204 return;
1205 }
1206 }
1207 }
1208
1209 match event {
1210 OrderEventAny::Accepted(_)
1211 | OrderEventAny::Canceled(_)
1212 | OrderEventAny::Rejected(_)
1213 | OrderEventAny::Updated(_)
1214 | OrderEventAny::Filled(_) => {}
1215 _ => {
1216 return;
1217 }
1218 }
1219
1220 let cache_ref = cache.borrow();
1221 let order = if let Some(order) = cache_ref.order(&event.client_order_id()) {
1222 order
1223 } else {
1224 log::error!(
1225 "Cannot update order: {} not found in the cache",
1226 event.client_order_id()
1227 );
1228 return; };
1230
1231 if matches!(event, OrderEventAny::Rejected(_)) && order.order_type() != OrderType::StopLimit {
1232 return; }
1234
1235 let instrument = if let Some(instrument_id) = cache_ref.instrument(&event.instrument_id()) {
1236 instrument_id
1237 } else {
1238 log::error!(
1239 "Cannot update order: no instrument found for {}",
1240 event.instrument_id()
1241 );
1242 return;
1243 };
1244
1245 if let OrderEventAny::Filled(order_filled) = event {
1246 let _ = inner.borrow().accounts.update_balances(
1247 account.clone(),
1248 instrument.clone(),
1249 *order_filled,
1250 );
1251
1252 let mut portfolio_clone = Portfolio {
1253 clock: clock.clone(),
1254 cache: cache.clone(),
1255 inner: inner.clone(),
1256 config: PortfolioConfig::default(), };
1258
1259 match portfolio_clone.calculate_unrealized_pnl(&order_filled.instrument_id) {
1260 Some(unrealized_pnl) => {
1261 inner
1262 .borrow_mut()
1263 .unrealized_pnls
1264 .insert(event.instrument_id(), unrealized_pnl);
1265 }
1266 None => {
1267 log::error!(
1268 "Failed to calculate unrealized PnL for instrument {}",
1269 event.instrument_id()
1270 );
1271 }
1272 }
1273 }
1274
1275 let orders_open = cache_ref.orders_open(None, Some(&event.instrument_id()), None, None);
1276
1277 let account_state = inner.borrow_mut().accounts.update_orders(
1278 account,
1279 instrument.clone(),
1280 orders_open,
1281 clock.borrow().timestamp_ns(),
1282 );
1283
1284 let mut cache_ref = cache.borrow_mut();
1285 cache_ref.update_account(account.clone()).unwrap();
1286
1287 if let Some(account_state) = account_state {
1288 msgbus::publish(
1289 format!("events.account.{}", account.id()).into(),
1290 &account_state,
1291 );
1292 } else {
1293 log::debug!("Added pending calculation for {}", instrument.id());
1294 inner.borrow_mut().pending_calcs.insert(instrument.id());
1295 }
1296
1297 log::debug!("Updated {event}");
1298}
1299
1300fn update_position(
1301 cache: Rc<RefCell<Cache>>,
1302 clock: Rc<RefCell<dyn Clock>>,
1303 inner: Rc<RefCell<PortfolioState>>,
1304 event: &PositionEvent,
1305) {
1306 let instrument_id = event.instrument_id();
1307
1308 let positions_open: Vec<Position> = {
1309 let cache_ref = cache.borrow();
1310
1311 cache_ref
1312 .positions_open(None, Some(&instrument_id), None, None)
1313 .iter()
1314 .map(|o| (*o).clone())
1315 .collect()
1316 };
1317
1318 log::debug!("postion fresh from cache -> {positions_open:?}");
1319
1320 let mut portfolio_clone = Portfolio {
1321 clock: clock.clone(),
1322 cache: cache.clone(),
1323 inner: inner.clone(),
1324 config: PortfolioConfig::default(), };
1326
1327 portfolio_clone.update_net_position(&instrument_id, positions_open.clone());
1328
1329 let calculated_unrealized_pnl = portfolio_clone
1330 .calculate_unrealized_pnl(&instrument_id)
1331 .expect("Failed to calculate unrealized PnL");
1332 let calculated_realized_pnl = portfolio_clone
1333 .calculate_realized_pnl(&instrument_id)
1334 .expect("Failed to calculate realized PnL");
1335
1336 inner
1337 .borrow_mut()
1338 .unrealized_pnls
1339 .insert(event.instrument_id(), calculated_unrealized_pnl);
1340 inner
1341 .borrow_mut()
1342 .realized_pnls
1343 .insert(event.instrument_id(), calculated_realized_pnl);
1344
1345 let cache_ref = cache.borrow();
1346 let account = cache_ref.account(&event.account_id());
1347
1348 if let Some(AccountAny::Margin(margin_account)) = account {
1349 if !margin_account.calculate_account_state {
1350 return; }
1352
1353 let cache_ref = cache.borrow();
1354 let instrument = if let Some(instrument) = cache_ref.instrument(&instrument_id) {
1355 instrument
1356 } else {
1357 log::error!("Cannot update position: no instrument found for {instrument_id}");
1358 return;
1359 };
1360
1361 let result = inner.borrow_mut().accounts.update_positions(
1362 margin_account,
1363 instrument.clone(),
1364 positions_open.iter().collect(),
1365 clock.borrow().timestamp_ns(),
1366 );
1367 let mut cache_ref = cache.borrow_mut();
1368 if let Some((margin_account, _)) = result {
1369 cache_ref
1370 .add_account(AccountAny::Margin(margin_account)) .unwrap();
1372 }
1373 } else if account.is_none() {
1374 log::error!(
1375 "Cannot update position: no account registered for {}",
1376 event.account_id()
1377 );
1378 }
1379}
1380
1381pub fn update_account(cache: Rc<RefCell<Cache>>, event: &AccountState) {
1382 let mut cache_ref = cache.borrow_mut();
1383
1384 if let Some(existing) = cache_ref.account(&event.account_id) {
1385 let mut account = existing.clone();
1386 account.apply(event.clone());
1387
1388 if let Err(e) = cache_ref.update_account(account.clone()) {
1389 log::error!("Failed to update account: {e}");
1390 return;
1391 }
1392 } else {
1393 let account = match AccountAny::from_events(vec![event.clone()]) {
1394 Ok(account) => account,
1395 Err(e) => {
1396 log::error!("Failed to create account: {e}");
1397 return;
1398 }
1399 };
1400
1401 if let Err(e) = cache_ref.add_account(account) {
1402 log::error!("Failed to add account: {e}");
1403 return;
1404 }
1405 }
1406
1407 log::info!("Updated {event}");
1408}