The operations integration module provides:
- Order Management System (OMS)
- Order submission and tracking
- Execution management
- Order status updates
- Integration with trading execution module
- Portfolio Management System (PMS)
- Position tracking
- Cash management
- P&L calculation
- Risk analytics integration
- Custodian Reconciliation
- Position comparison
- Break identification
- Automated reconciliation
- Break resolution tracking
- Settlement Processing
- Trade settlement tracking
- Cash movement processing
- Failed settlement handling
- Settlement status updates
- Corporate Actions Processing
- Corporate action tracking
- Position adjustment
- Cash entitlement processing
- Ex-date based processing
"""
Operations Integration Module for Index Solutions
This module provides comprehensive operations capabilities including:
- Order Management System (OMS) integration
- Portfolio Management System (PMS) integration
- Custodian data reconciliation
- Settlement processing
- Corporate actions processing
"""
import pandas as pd
import numpy as np
from typing import Dict, List, Optional, Union, Tuple
from dataclasses import dataclass
from datetime import datetime, timedelta
from enum import Enum
from risk_analytics import RiskAnalytics
from trading_execution import TradeInstruction, TradingExecution
class OrderStatus(Enum):
NEW = "new"
SENT = "sent"
PARTIALLY_FILLED = "partially_filled"
FILLED = "filled"
CANCELLED = "cancelled"
REJECTED = "rejected"
class SettlementStatus(Enum):
PENDING = "pending"
SETTLED = "settled"
FAILED = "failed"
@dataclass
class Order:
order_id: str
symbol: str
side: str
quantity: float
order_type: str
price: Optional[float]
status: OrderStatus
creation_time: datetime
last_update_time: datetime
execution_details: Dict
@dataclass
class Position:
symbol: str
quantity: float
cost_basis: float
market_value: float
unrealized_pnl: float
last_update_time: datetime
@dataclass
class CorporateAction:
action_type: str
symbol: str
ex_date: datetime
payment_date: datetime
rate: float
status: str
processed: bool
class OrderManagementSystem:
def __init__(self, trading_execution: TradingExecution):
self.orders: Dict[str, Order] = {}
self.trading_execution = trading_execution
self.order_callbacks = []
def submit_order(self, trade_instruction: TradeInstruction) -> str:
"""Submit new order to OMS"""
order_id = self._generate_order_id()
order = Order(
order_id=order_id,
symbol=trade_instruction.symbol,
side=trade_instruction.side,
quantity=trade_instruction.quantity,
order_type="MARKET" if trade_instruction.price_limit is None else "LIMIT",
price=trade_instruction.price_limit,
status=OrderStatus.NEW,
creation_time=datetime.now(),
last_update_time=datetime.now(),
execution_details={}
)
self.orders[order_id] = order
self._execute_order(order)
return order_id
def cancel_order(self, order_id: str) -> bool:
"""Cancel existing order"""
if order_id not in self.orders:
return False
order = self.orders[order_id]
if order.status in [OrderStatus.FILLED, OrderStatus.CANCELLED]:
return False
order.status = OrderStatus.CANCELLED
order.last_update_time = datetime.now()
self._notify_order_update(order)
return True
def _execute_order(self, order: Order):
"""Execute order through trading execution module"""
trade_instruction = TradeInstruction(
symbol=order.symbol,
side=order.side,
quantity=order.quantity,
price_limit=order.price,
urgency=0.5, # Default urgency
valid_venues=[], # Use all available venues
start_time=datetime.now(),
end_time=datetime.now() + timedelta(days=1)
)
execution_result = self.trading_execution.execute_trade(trade_instruction)
self._update_order_status(order, execution_result)
class PortfolioManagementSystem:
def __init__(self, risk_analytics: RiskAnalytics):
self.positions: Dict[str, Position] = {}
self.risk_analytics = risk_analytics
self.cash_balance: float = 0.0
self.pending_settlements: Dict[str, Dict] = {}
def update_position(self, symbol: str, quantity: float, price: float):
"""Update position after trade execution"""
if symbol not in self.positions:
self.positions[symbol] = Position(
symbol=symbol,
quantity=quantity,
cost_basis=price,
market_value=quantity * price,
unrealized_pnl=0.0,
last_update_time=datetime.now()
)
else:
position = self.positions[symbol]
new_quantity = position.quantity + quantity
new_cost_basis = ((position.cost_basis * position.quantity) +
(price * quantity)) / new_quantity
self.positions[symbol] = Position(
symbol=symbol,
quantity=new_quantity,
cost_basis=new_cost_basis,
market_value=new_quantity * price,
unrealized_pnl=(price - new_cost_basis) * new_quantity,
last_update_time=datetime.now()
)
def process_corporate_action(self, action: CorporateAction):
"""Process corporate action on positions"""
if action.symbol not in self.positions:
return
position = self.positions[action.symbol]
if action.action_type == "SPLIT":
position.quantity *= action.rate
position.cost_basis /= action.rate
elif action.action_type == "DIVIDEND":
self.cash_balance += position.quantity * action.rate
position.last_update_time = datetime.now()
action.processed = True
class CustodianReconciliation:
def __init__(self, pms: PortfolioManagementSystem):
self.pms = pms
self.custodian_positions: Dict[str, Position] = {}
self.reconciliation_breaks: List[Dict] = []
def reconcile_positions(self, custodian_data: pd.DataFrame) -> List[Dict]:
"""Reconcile positions with custodian data"""
self.reconciliation_breaks = []
# Update custodian positions
for _, row in custodian_data.iterrows():
self.custodian_positions[row['symbol']] = Position(
symbol=row['symbol'],
quantity=row['quantity'],
cost_basis=row['cost_basis'],
market_value=row['market_value'],
unrealized_pnl=row['unrealized_pnl'],
last_update_time=datetime.now()
)
# Compare positions
all_symbols = set(list(self.pms.positions.keys()) +
list(self.custodian_positions.keys()))
for symbol in all_symbols:
pms_position = self.pms.positions.get(symbol)
custodian_position = self.custodian_positions.get(symbol)
if self._check_position_break(pms_position, custodian_position):
self.reconciliation_breaks.append({
'symbol': symbol,
'pms_position': pms_position,
'custodian_position': custodian_position,
'break_time': datetime.now()
})
return self.reconciliation_breaks
def _check_position_break(self, pms_position: Optional[Position],
custodian_position: Optional[Position]) -> bool:
"""Check if there's a break between PMS and custodian positions"""
if pms_position is None or custodian_position is None:
return True
quantity_threshold = 0.0001 # For floating point comparison
return (abs(pms_position.quantity - custodian_position.quantity) > quantity_threshold or
abs(pms_position.cost_basis - custodian_position.cost_basis) > quantity_threshold)
class SettlementProcessor:
def __init__(self, pms: PortfolioManagementSystem):
self.pms = pms
self.settlements: Dict[str, Dict] = {}
def process_settlement(self, trade_id: str, settlement_details: Dict):
"""Process settlement for a trade"""
if trade_id not in self.pms.pending_settlements:
return False
settlement = settlement_details.copy()
settlement['status'] = SettlementStatus.SETTLED
settlement['settlement_time'] = datetime.now()
# Update cash balance
trade_amount = settlement['quantity'] * settlement['price']
if settlement['side'] == 'BUY':
self.pms.cash_balance -= trade_amount
else:
self.pms.cash_balance += trade_amount
self.settlements[trade_id] = settlement
del self.pms.pending_settlements[trade_id]
return True
class CorporateActionProcessor:
def __init__(self, pms: PortfolioManagementSystem):
self.pms = pms
self.corporate_actions: List[CorporateAction] = []
def add_corporate_action(self, action: CorporateAction):
"""Add new corporate action to be processed"""
self.corporate_actions.append(action)
self.corporate_actions.sort(key=lambda x: x.ex_date)
def process_pending_actions(self, current_date: datetime):
"""Process all pending corporate actions"""
for action in self.corporate_actions:
if not action.processed and action.ex_date <= current_date:
self.pms.process_corporate_action(action)
class OperationsIntegration:
def __init__(self, trading_execution: TradingExecution, risk_analytics: RiskAnalytics):
self.pms = PortfolioManagementSystem(risk_analytics)
self.oms = OrderManagementSystem(trading_execution)
self.custodian_reconciliation = CustodianReconciliation(self.pms)
self.settlement_processor = SettlementProcessor(self.pms)
self.corporate_action_processor = CorporateActionProcessor(self.pms)
def process_trade(self, trade_instruction: TradeInstruction) -> Dict:
"""Process trade from instruction to settlement"""
# Submit order
order_id = self.oms.submit_order(trade_instruction)
# Track settlement
self.pms.pending_settlements[order_id] = {
'trade_instruction': trade_instruction,
'status': SettlementStatus.PENDING
}
return {
'order_id': order_id,
'status': self.oms.orders[order_id].status
}
def end_of_day_processing(self, business_date: datetime):
"""Run end of day processing"""
# Process corporate actions
self.corporate_action_processor.process_pending_actions(business_date)
# Reconcile positions
custodian_data = self._fetch_custodian_data(business_date)
reconciliation_breaks = self.custodian_reconciliation.reconcile_positions(
custodian_data
)
# Process settlements
for trade_id, settlement in self._fetch_settlement_updates().items():
self.settlement_processor.process_settlement(trade_id, settlement)
return {
'reconciliation_breaks': reconciliation_breaks,
'pending_settlements': len(self.pms.pending_settlements),
'processed_corporate_actions': len([
a for a in self.corporate_action_processor.corporate_actions
if a.processed
])
}
def _fetch_custodian_data(self, business_date: datetime) -> pd.DataFrame:
"""Fetch position data from custodian"""
# Implement custodian API integration
pass
def _fetch_settlement_updates(self) -> Dict[str, Dict]:
"""Fetch settlement updates from settlement system"""
# Implement settlement system integration
pass