The compliance and regulatory module provides:
- Pre-trade Compliance
- Rule-based compliance checks
- Position limits monitoring
- Restricted securities handling
- Real-time validation
- Regulatory Reporting
- SEC filing generation
- Multi-regime support
- Filing calendar management
- Submission tracking
- ESG Compliance
- ESG constraint definition
- Portfolio-level monitoring
- Metric calculation
- Violation tracking
- Client Mandate Monitoring
- Mandate definition and tracking
- Constraint checking.
- Multi-client support
- Real-time monitoring
- Position Limits
- Security-level limits
- Portfolio-level limits
- Sector/industry limits
- Dynamic limit adjustment
"""
Compliance and Regulatory Module for Index Solutions
This module provides comprehensive compliance and regulatory capabilities including:
- Pre-trade compliance checks
- Regulatory reporting
- Position limits monitoring
- Restricted securities handling
- ESG constraints
- Client mandate monitoring
"""
import pandas as pd
import numpy as np
from typing import Dict, List, Optional, Union, Tuple, Set
from dataclasses import dataclass
from datetime import datetime, timedelta
from enum import Enum
from risk_analytics import RiskAnalytics
from trading_execution import TradeInstruction
from operation_integration import Position, PortfolioManagementSystem
class ComplianceStatus(Enum):
COMPLIANT = "compliant"
WARNING = "warning"
VIOLATION = "violation"
PENDING_REVIEW = "pending_review"
class RegulatoryRegime(Enum):
SEC = "sec"
FINRA = "finra"
ESMA = "esma"
FSA = "fsa"
@dataclass
class ComplianceRule:
rule_id: str
description: str
rule_type: str
parameters: Dict
priority: int
active: bool
@dataclass
class ESGConstraint:
constraint_type: str # 'exclusion', 'threshold', 'target'
metric: str
threshold: float
direction: str # 'above', 'below'
scope: str # 'portfolio', 'sector', 'security'
@dataclass
class ClientMandate:
mandate_id: str
client_id: str
constraints: List[Dict]
start_date: datetime
end_date: Optional[datetime]
active: bool
class PreTradeCompliance:
def __init__(self, risk_analytics: RiskAnalytics, pms: PortfolioManagementSystem):
self.risk_analytics = risk_analytics
self.pms = pms
self.rules: Dict[str, ComplianceRule] = {}
self.restricted_securities: Set[str] = set()
self.position_limits: Dict[str, float] = {}
def check_trade(self, trade: TradeInstruction) -> Dict:
"""Run pre-trade compliance checks"""
results = {
'overall_status': ComplianceStatus.COMPLIANT,
'checks': [],
'timestamp': datetime.now()
}
# Check restricted securities
if trade.symbol in self.restricted_securities:
results['checks'].append({
'rule_id': 'RESTRICTED_SECURITY',
'status': ComplianceStatus.VIOLATION,
'message': f"Security {trade.symbol} is restricted"
})
results['overall_status'] = ComplianceStatus.VIOLATION
# Check position limits
if trade.symbol in self.position_limits:
new_position = (self.pms.positions.get(trade.symbol, Position(
symbol=trade.symbol, quantity=0, cost_basis=0,
market_value=0, unrealized_pnl=0, last_update_time=datetime.now()
)).quantity + trade.quantity)
if abs(new_position) > self.position_limits[trade.symbol]:
results['checks'].append({
'rule_id': 'POSITION_LIMIT',
'status': ComplianceStatus.VIOLATION,
'message': f"Position limit exceeded for {trade.symbol}"
})
results['overall_status'] = ComplianceStatus.VIOLATION
# Run all active compliance rules
for rule in self.rules.values():
if rule.active:
check_result = self._evaluate_rule(rule, trade)
results['checks'].append(check_result)
if check_result['status'] == ComplianceStatus.VIOLATION:
results['overall_status'] = ComplianceStatus.VIOLATION
return results
def _evaluate_rule(self, rule: ComplianceRule, trade: TradeInstruction) -> Dict:
"""Evaluate a single compliance rule"""
if rule.rule_type == "concentration":
return self._check_concentration(rule, trade)
elif rule.rule_type == "risk_limit":
return self._check_risk_limit(rule, trade)
# Add more rule types as needed
return {
'rule_id': rule.rule_id,
'status': ComplianceStatus.PENDING_REVIEW,
'message': f"Rule type {rule.rule_type} not implemented"
}
class RegulatoryReporting:
def __init__(self):
self.reports: Dict[str, Dict] = {}
self.filing_calendar: Dict[str, datetime] = {}
def generate_sec_report(self, report_date: datetime) -> Dict:
"""Generate SEC regulatory report"""
report = {
'filing_date': report_date,
'report_type': 'SEC_13F',
'positions': [],
'status': 'draft'
}
# Implement SEC reporting logic
return report
def submit_regulatory_filing(self, report_id: str, regime: RegulatoryRegime) -> bool:
"""Submit regulatory filing"""
if report_id not in self.reports:
return False
report = self.reports[report_id]
# Implement submission logic for different regimes
return True
class ESGCompliance:
def __init__(self, risk_analytics: RiskAnalytics):
self.risk_analytics = risk_analytics
self.constraints: List[ESGConstraint] = []
self.esg_data: Dict[str, Dict] = {}
def add_constraint(self, constraint: ESGConstraint):
"""Add new ESG constraint"""
self.constraints.append(constraint)
def check_compliance(self, portfolio: Dict[str, Position]) -> Dict:
"""Check portfolio compliance with ESG constraints"""
results = {
'overall_status': ComplianceStatus.COMPLIANT,
'violations': [],
'metrics': {}
}
for constraint in self.constraints:
metric_value = self._calculate_esg_metric(constraint.metric, portfolio)
results['metrics'][constraint.metric] = metric_value
if constraint.direction == 'above' and metric_value < constraint.threshold:
results['violations'].append({
'constraint': constraint,
'actual_value': metric_value,
'threshold': constraint.threshold
})
results['overall_status'] = ComplianceStatus.VIOLATION
elif constraint.direction == 'below' and metric_value > constraint.threshold:
results['violations'].append({
'constraint': constraint,
'actual_value': metric_value,
'threshold': constraint.threshold
})
results['overall_status'] = ComplianceStatus.VIOLATION
return results
def _calculate_esg_metric(self, metric: str, portfolio: Dict[str, Position]) -> float:
"""Calculate ESG metric for portfolio"""
if metric not in self.esg_data:
return 0.0
weighted_sum = 0.0
total_weight = 0.0
for symbol, position in portfolio.items():
if symbol in self.esg_data:
weight = abs(position.market_value)
weighted_sum += weight * self.esg_data[symbol].get(metric, 0)
total_weight += weight
return weighted_sum / total_weight if total_weight > 0 else 0.0
class ClientMandateMonitoring:
def __init__(self, risk_analytics: RiskAnalytics, pms: PortfolioManagementSystem):
self.risk_analytics = risk_analytics
self.pms = pms
self.mandates: Dict[str, ClientMandate] = {}
def check_mandate_compliance(self, mandate_id: str) -> Dict:
"""Check compliance with client mandate"""
if mandate_id not in self.mandates:
return {'status': ComplianceStatus.VIOLATION, 'message': 'Mandate not found'}
mandate = self.mandates[mandate_id]
if not mandate.active:
return {'status': ComplianceStatus.VIOLATION, 'message': 'Mandate inactive'}
results = {
'mandate_id': mandate_id,
'status': ComplianceStatus.COMPLIANT,
'violations': [],
'timestamp': datetime.now()
}
for constraint in mandate.constraints:
check_result = self._check_constraint(constraint)
if check_result['status'] != ComplianceStatus.COMPLIANT:
results['violations'].append(check_result)
results['status'] = ComplianceStatus.VIOLATION
return results
def _check_constraint(self, constraint: Dict) -> Dict:
"""Check individual mandate constraint"""
constraint_type = constraint.get('type')
if constraint_type == 'allocation':
return self._check_allocation_constraint(constraint)
elif constraint_type == 'risk':
return self._check_risk_constraint(constraint)
elif constraint_type == 'esg':
return self._check_esg_constraint(constraint)
return {
'status': ComplianceStatus.PENDING_REVIEW,
'message': f"Unknown constraint type: {constraint_type}"
}
class ComplianceRegulatory:
def __init__(self, risk_analytics: RiskAnalytics, pms: PortfolioManagementSystem):
self.risk_analytics = risk_analytics
self.pms = pms
# Initialize components
self.pre_trade_compliance = PreTradeCompliance(risk_analytics, pms)
self.regulatory_reporting = RegulatoryReporting()
self.esg_compliance = ESGCompliance(risk_analytics)
self.mandate_monitoring = ClientMandateMonitoring(risk_analytics, pms)
def check_trade_compliance(self, trade: TradeInstruction) -> Dict:
"""Check trade compliance across all dimensions"""
results = {
'timestamp': datetime.now(),
'trade_id': id(trade),
'checks': {}
}
# Pre-trade compliance
results['checks']['pre_trade'] = self.pre_trade_compliance.check_trade(trade)
# ESG compliance
projected_portfolio = self._project_portfolio_with_trade(trade)
results['checks']['esg'] = self.esg_compliance.check_compliance(projected_portfolio)
# Client mandate compliance
for mandate_id in self.mandate_monitoring.mandates:
results['checks'][f'mandate_{mandate_id}'] = (
self.mandate_monitoring.check_mandate_compliance(mandate_id)
)
# Determine overall compliance status
results['overall_status'] = self._determine_overall_status(results['checks'])
return results
def _project_portfolio_with_trade(self, trade: TradeInstruction) -> Dict[str, Position]:
"""Project portfolio positions after trade execution"""
projected_portfolio = self.pms.positions.copy()
if trade.symbol in projected_portfolio:
position = projected_portfolio[trade.symbol]
projected_portfolio[trade.symbol] = Position(
symbol=position.symbol,
quantity=position.quantity + trade.quantity,
cost_basis=position.cost_basis,
market_value=position.market_value,
unrealized_pnl=position.unrealized_pnl,
last_update_time=datetime.now()
)
else:
projected_portfolio[trade.symbol] = Position(
symbol=trade.symbol,
quantity=trade.quantity,
cost_basis=0, # Will be updated after execution
market_value=0,
unrealized_pnl=0,
last_update_time=datetime.now()
)
return projected_portfolio
def _determine_overall_status(self, checks: Dict) -> ComplianceStatus:
"""Determine overall compliance status from all checks"""
if any(check.get('status') == ComplianceStatus.VIOLATION
for check in checks.values()):
return ComplianceStatus.VIOLATION
elif any(check.get('status') == ComplianceStatus.WARNING
for check in checks.values()):
return ComplianceStatus.WARNING
elif any(check.get('status') == ComplianceStatus.PENDING_REVIEW
for check in checks.values()):
return ComplianceStatus.PENDING_REVIEW
return ComplianceStatus.COMPLIANT