This framework applies dynamic portfolio rebalancing and can include:
1. Dynamic Rebalancing Triggers: Drift threshold monitoring; Scheduled rebalancing; Cash buffer breaches; Corporate action triggers;
2. Tax-Aware Rebalancing: Tax lot tracking; Tax loss harvesting opportunities; Holding period consideration;
3. Cash Management: Minimum cash buffer maintenance; Cash flow handling from corporate actions; Trade adjustment to maintain cash positions;
4. Corporate Action Driven Rebalancing: Dividend processing; Stock splits; Mergers and spinoffs;
5. Drift Monitoring and Threshold-Based Rebalancing: Continuous monitoring of position drift; Customizable drift thresholds; Trade size filters; Turnover limits;
Note: the PortfolioRebalancer class provides a clear interface for rebalancing operations. Improved configuration: using dataclasses (RebalancingParams and PortfolioConstraints) makes it easier to manage and validate configuration parameters. Easier testing: the separated code is easier to test in isolation. The first version, portfolio_rebalancing.py:
import numpy as np
import pandas as pd
from typing import Dict, Optional, List
from dataclasses import dataclass
from scipy.optimize import minimize
@dataclass
class RebalancingParams:
    """Parameters for portfolio rebalancing.

    Attributes:
        weight_threshold: Absolute per-asset weight deviation above which a
            rebalance is triggered.
        risk_threshold: Relative deviation between current and target
            portfolio risk above which a rebalance is triggered.
        min_trade_size: Trades whose absolute weight change does not exceed
            this value are dropped.
        risk_tolerance: Upper bound on tracking error when optimizing the
            rebalance.
        target_risk: Optional risk level the optimizer should aim for; when
            ``None`` the selected risk measure is minimized instead.
        risk_measure: Which risk measure the optimizer uses: ``'volatility'``,
            ``'var'`` or ``'cvar'``.

    Raises:
        ValueError: If ``risk_measure`` is not one of the supported names.
    """

    weight_threshold: float = 0.05  # 5% weight deviation threshold
    risk_threshold: float = 0.10  # 10% risk deviation threshold
    min_trade_size: float = 0.001  # 10bps minimum trade size
    risk_tolerance: float = 0.02  # 2% tracking error tolerance
    target_risk: Optional[float] = None
    risk_measure: str = 'volatility'  # 'volatility', 'var', or 'cvar'

    def __post_init__(self):
        # Fail at construction time: an unknown measure would otherwise only
        # surface much later as an obscure failure inside the optimizer.
        supported = ('volatility', 'var', 'cvar')
        if self.risk_measure not in supported:
            raise ValueError(
                f"risk_measure must be one of {supported}, "
                f"got {self.risk_measure!r}"
            )
@dataclass
class PortfolioConstraints:
    """Per-asset lower and upper weight bounds handed to the optimizer."""

    # Smallest weight allowed for each asset, keyed by asset name.
    min_weights: Dict[str, float]
    # Largest weight allowed for each asset, keyed by asset name.
    max_weights: Dict[str, float]

    @classmethod
    def default(cls, assets: List[str]):
        """Return constraints that let every asset range from 0.0 to 1.0.

        Args:
            assets: Names of the assets to create bounds for.

        Returns:
            A new instance with independent ``min_weights`` and
            ``max_weights`` dictionaries.
        """
        lower_bounds = dict.fromkeys(assets, 0.0)
        upper_bounds = dict.fromkeys(assets, 1.0)
        return cls(min_weights=lower_bounds, max_weights=upper_bounds)
class PortfolioRebalancer:
    """Risk-based portfolio rebalancer driven by a risk analytics object.

    The risk analytics instance is expected to expose ``returns`` (with a
    ``columns`` attribute naming the assets), ``cov_matrix``,
    ``calculate_var``, ``calculate_cvar`` and ``calculate_tracking_error``;
    those are the only members this class touches.
    """

    def __init__(self, risk_analytics):
        """Initialize PortfolioRebalancer with a risk analytics instance.

        Args:
            risk_analytics: Instance of RiskAnalytics class
        """
        self.risk_analytics = risk_analytics

    @staticmethod
    def _ordered_union(first, second) -> List[str]:
        """Return the keys of both mappings, de-duplicated, in first-seen order."""
        return list(dict.fromkeys([*first, *second]))

    def _build_risk_objective(self, risk_measure: str, assets: List[str]):
        """Return a callable mapping a weight vector to the chosen risk measure.

        Raises:
            ValueError: If ``risk_measure`` is not 'volatility', 'var' or 'cvar'.
        """
        if risk_measure == 'volatility':
            return lambda weights: np.sqrt(
                weights @ self.risk_analytics.cov_matrix @ weights)
        if risk_measure == 'var':
            return lambda weights: self.risk_analytics.calculate_var(
                dict(zip(assets, weights)))
        if risk_measure == 'cvar':
            return lambda weights: self.risk_analytics.calculate_cvar(
                dict(zip(assets, weights)))
        # Previously an unknown measure made the objective return None, which
        # only failed later inside the optimizer.
        raise ValueError(f"Unsupported risk_measure: {risk_measure!r}")

    def calculate_optimal_weights(self,
                                  constraints: Optional["PortfolioConstraints"] = None,
                                  params: Optional["RebalancingParams"] = None) -> Dict[str, float]:
        """Calculate optimal portfolio weights based on risk targets.

        Minimizes the selected risk measure, or the squared distance to
        ``params.target_risk`` when one is given, subject to the weights
        summing to one and the per-asset bounds.

        Args:
            constraints: Portfolio constraints; defaults to 0-1 bounds for
                every asset. Assets missing from the bounds dictionaries fall
                back to 0.0 (minimum) and 1.0 (maximum).
            params: Rebalancing parameters; defaults to ``RebalancingParams()``.

        Returns:
            Dictionary of optimal weights keyed by asset.

        Raises:
            ValueError: If there are no assets, the risk measure is unknown,
                or the optimizer does not converge.
        """
        if params is None:
            params = RebalancingParams()
        assets = list(self.risk_analytics.returns.columns)
        n_assets = len(assets)
        if n_assets == 0:
            raise ValueError("Cannot optimize a portfolio with no assets")
        if constraints is None:
            constraints = PortfolioConstraints.default(assets)

        risk_of = self._build_risk_objective(params.risk_measure, assets)

        # Start the search from an equally weighted portfolio.
        initial_weights = np.full(n_assets, 1.0 / n_assets)
        opt_constraints = [
            {'type': 'eq', 'fun': lambda x: np.sum(x) - 1}  # weights sum to 1
        ]
        bounds = [(constraints.min_weights.get(asset, 0.0),
                   constraints.max_weights.get(asset, 1.0))
                  for asset in assets]

        if params.target_risk is not None:
            # Aim for the requested risk level instead of the minimum.
            def objective(weights):
                return (risk_of(weights) - params.target_risk) ** 2
        else:
            objective = risk_of

        opt_result = minimize(objective, initial_weights,
                              constraints=opt_constraints, bounds=bounds,
                              method='SLSQP')
        if not opt_result.success:
            raise ValueError("Optimization failed to converge")
        return dict(zip(assets, opt_result.x))

    def generate_rebalancing_trades(self,
                                    current_weights: Dict[str, float],
                                    target_weights: Dict[str, float],
                                    params: Optional["RebalancingParams"] = None) -> Dict[str, float]:
        """Generate trades needed to rebalance the portfolio.

        Args:
            current_weights: Current portfolio weights
            target_weights: Target portfolio weights
            params: Rebalancing parameters

        Returns:
            Dictionary of trades in weight terms (positive for buy, negative
            for sell). Trades not larger than ``params.min_trade_size`` are
            omitted.
        """
        if params is None:
            params = RebalancingParams()
        trades = {}
        # Walk the union of both books: an asset that only appears in the
        # target still has to be bought, one only in the current book sold.
        for asset in self._ordered_union(current_weights, target_weights):
            trade = target_weights.get(asset, 0) - current_weights.get(asset, 0)
            if abs(trade) > params.min_trade_size:
                trades[asset] = trade
        return trades

    def check_rebalancing_triggers(self,
                                   current_weights: Dict[str, float],
                                   target_weights: Dict[str, float],
                                   params: Optional["RebalancingParams"] = None) -> bool:
        """Check if portfolio rebalancing is needed.

        Args:
            current_weights: Current portfolio weights
            target_weights: Target portfolio weights
            params: Rebalancing parameters

        Returns:
            True if any asset deviates from its target by more than
            ``params.weight_threshold``, or if the VaR of the current
            portfolio deviates from the VaR of the target portfolio by more
            than ``params.risk_threshold`` in relative terms.
        """
        if params is None:
            params = RebalancingParams()

        # Weight deviations; assets held without a target count as target 0.
        for asset in self._ordered_union(target_weights, current_weights):
            deviation = current_weights.get(asset, 0) - target_weights.get(asset, 0)
            if abs(deviation) > params.weight_threshold:
                return True

        # Risk deviation, relative to the magnitude of the target risk.
        current_risk = self.risk_analytics.calculate_var(current_weights)
        target_risk = self.risk_analytics.calculate_var(target_weights)
        risk_gap = abs(current_risk - target_risk)
        if target_risk == 0:
            # A relative deviation is undefined; any gap at all is unbounded.
            return bool(risk_gap > 0)
        return bool(risk_gap / abs(target_risk) > params.risk_threshold)

    def optimize_rebalancing(self,
                             current_weights: Dict[str, float],
                             target_weights: Dict[str, float],
                             transaction_costs: Dict[str, float],
                             params: Optional["RebalancingParams"] = None) -> Dict[str, float]:
        """Optimize rebalancing trades considering transaction costs.

        Minimizes the sum of absolute distances to the target weights plus
        the cost-weighted absolute trades, subject to full investment and a
        tracking error no larger than ``params.risk_tolerance``.

        Args:
            current_weights: Current portfolio weights
            target_weights: Target portfolio weights
            transaction_costs: Dictionary of transaction costs by asset;
                missing assets are treated as cost-free.
            params: Rebalancing parameters

        Returns:
            Dictionary of optimal weights after rebalancing.

        Raises:
            ValueError: If no assets are supplied or the optimizer does not
                converge.
        """
        if params is None:
            params = RebalancingParams()

        # Deterministic asset order (a set would vary between runs).
        assets = self._ordered_union(current_weights, target_weights)
        n_assets = len(assets)
        if n_assets == 0:
            raise ValueError("Cannot optimize a rebalance with no assets")

        current = np.array([current_weights.get(asset, 0) for asset in assets], dtype=float)
        target = np.array([target_weights.get(asset, 0) for asset in assets], dtype=float)
        costs = np.array([transaction_costs.get(asset, 0) for asset in assets], dtype=float)

        def objective(weights):
            tracking_diff = np.sum(np.abs(weights - target))
            trade_costs = np.sum(np.abs(weights - current) * costs)
            return tracking_diff + trade_costs

        constraints = [
            {'type': 'eq', 'fun': lambda x: np.sum(x) - 1},  # weights sum to 1
            {'type': 'ineq', 'fun': lambda x: params.risk_tolerance -
             self.risk_analytics.calculate_tracking_error(dict(zip(assets, x)))}  # risk constraint
        ]
        bounds = [(0, 1)] * n_assets

        # The search starts from the current portfolio.
        opt_result = minimize(objective, current,
                              constraints=constraints, bounds=bounds,
                              method='SLSQP')
        if not opt_result.success:
            raise ValueError("Rebalancing optimization failed to converge")
        return dict(zip(assets, opt_result.x))
def demo_portfolio_rebalancing():
    """Run an end-to-end example of PortfolioRebalancer on simulated returns.

    Simulates daily returns for five tickers, derives target weights for a
    volatility target, and prints the rebalancing trades if the current
    weights trigger a rebalance.
    """
    from risk_analytics import RiskAnalytics

    tickers = ['AAPL', 'MSFT', 'JPM', 'JNJ', 'PG']

    # Simulated return history (seeded so the demo is repeatable).
    np.random.seed(42)
    calendar = pd.date_range(start='2020-01-01', end='2023-12-31', freq='D')
    simulated = np.random.normal(0.0005, 0.02, (len(calendar), len(tickers)))
    returns = pd.DataFrame(simulated, index=calendar, columns=tickers)

    rebalancer = PortfolioRebalancer(RiskAnalytics(returns=returns))

    # Where the portfolio stands today.
    current_weights = dict(zip(tickers, (0.35, 0.25, 0.15, 0.15, 0.10)))

    # Per-asset bounds and rebalancing settings.
    constraints = PortfolioConstraints(
        min_weights=dict(zip(tickers, (0.1, 0.1, 0.05, 0.05, 0.05))),
        max_weights=dict(zip(tickers, (0.4, 0.4, 0.3, 0.3, 0.3))),
    )
    params = RebalancingParams(
        weight_threshold=0.05,
        risk_threshold=0.1,
        min_trade_size=0.005,
        target_risk=0.02,
        risk_measure='volatility',
    )

    target_weights = rebalancer.calculate_optimal_weights(
        constraints=constraints, params=params)
    print("\nTarget Portfolio Weights:")
    for name, weight in target_weights.items():
        print(f"{name}: {weight:.2%}")

    triggered = rebalancer.check_rebalancing_triggers(
        current_weights, target_weights, params=params)
    if not triggered:
        print("\nNo rebalancing needed")
        return

    print("\nRebalancing needed!")
    transaction_costs = dict.fromkeys(tickers, 0.001)  # 10bps for all assets
    optimal_weights = rebalancer.optimize_rebalancing(
        current_weights, target_weights, transaction_costs, params=params)
    trades = rebalancer.generate_rebalancing_trades(
        current_weights, optimal_weights, params=params)
    print("\nRebalancing Trades Required:")
    for name, delta in trades.items():
        print(f"{name}: {delta:+.2%}")


if __name__ == "__main__":
    demo_portfolio_rebalancing()
Here is another, more elaborate version:
import numpy as np
import pandas as pd
from dataclasses import dataclass
from typing import Dict, List, Optional, Tuple
from datetime import datetime, timedelta
import logging
from risk_analytics import RiskAnalytics
from risk_monitor import RiskMonitor
from portfolio_optimization import PortfolioOptimizer
@dataclass
class Position:
    """A single holding together with the tax lots that make it up."""

    # Amount currently held.
    quantity: float
    # Latest price; multiplied by quantity to value the position.
    current_price: float
    # One (purchase_date, quantity, cost_basis) tuple per tax lot.
    tax_lots: List[Tuple[datetime, float, float]]
    # Currency code of the position; defaults to US dollars.
    currency: str = 'USD'
@dataclass
class CorporateAction:
    """A corporate event scheduled for a ticker, with its processing state."""

    # Kind of event: 'split', 'dividend', 'merger', 'spinoff'.
    action_type: str
    # Date from which the event may be applied.
    effective_date: datetime
    # Event-specific values such as a ratio or an amount.
    details: Dict
    # Becomes True once the event has been handled.
    processed: bool = False
class PortfolioRebalancer:
    """Advanced portfolio rebalancing with tax awareness and corporate actions.

    Trades produced by this class are share quantities keyed by ticker
    (positive to buy, negative to sell). Only tickers that are currently
    held can be traded, because the price is read from the held position.
    """

    def __init__(self, positions: Dict[str, "Position"], target_weights: Dict[str, float],
                 cash: float, risk_analytics: "RiskAnalytics"):
        """Initialize portfolio rebalancer.

        Args:
            positions: Dictionary of current positions
            target_weights: Target portfolio weights
            cash: Current cash balance
            risk_analytics: RiskAnalytics instance
        """
        self.positions = positions
        self.target_weights = target_weights
        self.cash = cash
        self.risk_analytics = risk_analytics
        self.corporate_actions = {}  # ticker -> list of pending corporate actions

        # Initialize monitoring
        self.risk_monitor = RiskMonitor(update_interval=60)  # 1-minute updates

        # Rebalancing thresholds
        self.drift_threshold = 0.05  # 5% drift threshold
        self.tax_loss_threshold = -0.10  # 10% loss threshold for tax harvesting
        self.min_trade_size = 1000  # Minimum trade value to avoid small trades

        # Cash management, as fractions of total portfolio value
        self.min_cash_buffer = 0.02  # 2% minimum cash buffer
        self.max_cash_buffer = 0.05  # 5% maximum cash buffer

        # Setup logging
        logging.basicConfig(
            level=logging.INFO,
            format='%(asctime)s - %(levelname)s - %(message)s'
        )
        self.logger = logging.getLogger('PortfolioRebalancer')

    def _total_value(self) -> float:
        """Return the market value of all positions plus cash."""
        invested = sum(pos.quantity * pos.current_price
                       for pos in self.positions.values())
        return invested + self.cash

    def calculate_current_weights(self) -> Dict[str, float]:
        """Calculate current portfolio weights.

        Returns:
            Weight of every held ticker plus a ``'CASH'`` entry. All weights
            are 0.0 when the portfolio has no value.
        """
        total_value = self._total_value()
        if total_value == 0:
            # Nothing to weight against; avoid dividing by zero.
            weights = {ticker: 0.0 for ticker in self.positions}
            weights['CASH'] = 0.0
            return weights
        weights = {
            ticker: (pos.quantity * pos.current_price) / total_value
            for ticker, pos in self.positions.items()
        }
        weights['CASH'] = self.cash / total_value
        return weights

    def check_drift_triggers(self) -> List[str]:
        """Return the targeted tickers whose weight drifted beyond the threshold."""
        current_weights = self.calculate_current_weights()
        drift_triggers = []
        for ticker, target in self.target_weights.items():
            current = current_weights.get(ticker, 0)
            if abs(current - target) > self.drift_threshold:
                drift_triggers.append(ticker)
                self.logger.info(f"Drift trigger: {ticker} (Current: {current:.2%}, Target: {target:.2%})")
        return drift_triggers

    def identify_tax_loss_opportunities(self) -> List[Tuple[str, float]]:
        """Identify tax loss harvesting opportunities.

        Returns:
            One ``(ticker, lot_quantity)`` tuple for every tax lot whose
            unrealized return is below ``tax_loss_threshold``.
        """
        opportunities = []
        for ticker, position in self.positions.items():
            for _purchase_date, quantity, cost_basis in position.tax_lots:
                if cost_basis <= 0:
                    # A relative return is undefined without a positive basis.
                    continue
                unrealized_gain = (position.current_price - cost_basis) / cost_basis
                if unrealized_gain < self.tax_loss_threshold:
                    opportunities.append((ticker, quantity))
                    self.logger.info(f"Tax loss opportunity: {ticker} "
                                     f"(Loss: {unrealized_gain:.2%})")
        return opportunities

    def process_corporate_actions(self) -> List[str]:
        """Apply every pending corporate action that is already effective.

        Actions for tickers that are not held are marked as processed
        without being applied, so they are not applied later to a position
        opened after the effective date.

        Returns:
            Tickers for which an action was applied (one entry per action).
        """
        processed = []
        current_date = datetime.now()
        for ticker, actions in list(self.corporate_actions.items()):
            for action in actions:
                if action.processed or action.effective_date > current_date:
                    continue
                if ticker not in self.positions:
                    action.processed = True
                    self.logger.info(f"Skipped corporate action: {ticker} "
                                     f"({action.action_type}) - no position held")
                    continue
                self._apply_corporate_action(ticker, action)
                action.processed = True
                processed.append(ticker)
                self.logger.info(f"Processed corporate action: {ticker} "
                                 f"({action.action_type})")
        return processed

    @staticmethod
    def _require_positive_ratio(ratio):
        """Return ``ratio`` unchanged, raising ValueError unless it is positive."""
        if ratio <= 0:
            raise ValueError(f"Corporate action ratio must be positive, got {ratio!r}")
        return ratio

    def _credit_position(self, ticker: str, quantity: float, price: float,
                         tax_lots: List[Tuple[datetime, float, float]], currency: str):
        """Add shares to ``ticker``, creating the position if it is not held."""
        existing = self.positions.get(ticker)
        if existing is None:
            self.positions[ticker] = Position(
                quantity=quantity,
                current_price=price,
                tax_lots=list(tax_lots),
                currency=currency
            )
            return
        # Merge into the existing holding instead of overwriting it.
        existing.quantity += quantity
        existing.current_price = price
        existing.tax_lots = list(existing.tax_lots) + list(tax_lots)

    def _apply_corporate_action(self, ticker: str, action: "CorporateAction"):
        """Apply a corporate action to the position held in ``ticker``.

        Args:
            ticker: Ticker of the affected position; must be held.
            action: Action to apply. ``details`` must contain ``ratio`` for a
                split; ``amount`` for a dividend; ``new_ticker``, ``ratio``
                and ``new_price`` for a merger or spinoff. An optional
                ``currency`` entry sets the currency of a newly created
                position (defaults to the source position's currency).

        Raises:
            KeyError: If the ticker is not held or a required detail is missing.
            ValueError: If a split or merger ratio is not positive.
        """
        position = self.positions[ticker]
        details = action.details
        kind = action.action_type

        if kind == 'split':
            split_ratio = self._require_positive_ratio(details['ratio'])
            position.quantity *= split_ratio
            position.current_price /= split_ratio
            # Keep each lot's total cost unchanged.
            position.tax_lots = [
                (date, qty * split_ratio, basis / split_ratio)
                for date, qty, basis in position.tax_lots
            ]
        elif kind == 'dividend':
            self.cash += details['amount'] * position.quantity
        elif kind == 'merger':
            # Read every detail before mutating anything.
            new_ticker = details['new_ticker']
            conversion_ratio = self._require_positive_ratio(details['ratio'])
            new_price = details['new_price']
            currency = details.get('currency', position.currency)
            # Rescale the per-share basis so each lot's total cost carries over,
            # matching the split handling.
            converted_lots = [
                (date, qty * conversion_ratio, basis / conversion_ratio)
                for date, qty, basis in position.tax_lots
            ]
            del self.positions[ticker]
            self._credit_position(new_ticker, position.quantity * conversion_ratio,
                                  new_price, converted_lots, currency)
        elif kind == 'spinoff':
            new_ticker = details['new_ticker']
            new_price = details['new_price']
            new_quantity = position.quantity * details['ratio']
            currency = details.get('currency', position.currency)
            # NOTE(review): the new lot is dated now and carried at the new
            # price; the parent's basis is left untouched - confirm this is
            # the intended tax treatment.
            self._credit_position(new_ticker, new_quantity, new_price,
                                  [(datetime.now(), new_quantity, new_price)], currency)
        else:
            self.logger.warning(f"Unknown corporate action type for {ticker}: {kind}")

    def generate_rebalancing_trades(self) -> Dict[str, float]:
        """Generate trades to rebalance the portfolio.

        Steps: apply pending corporate actions, sell tax lots with harvestable
        losses, move held and targeted tickers to their target value when
        the gap exceeds ``min_trade_size``, then adjust for the cash buffer.

        Returns:
            Share quantities to trade keyed by ticker.
        """
        # Corporate actions change positions and cash, so they must be
        # applied before the portfolio is valued.
        self.process_corporate_actions()
        total_value = self._total_value()
        trades = {}
        if total_value <= 0:
            return trades

        # Sell tax lots with losses; several lots of one ticker add up.
        for ticker, quantity in self.identify_tax_loss_opportunities():
            trades[ticker] = trades.get(ticker, 0) - quantity

        # Calculate required trades for rebalancing
        for ticker, target in self.target_weights.items():
            position = self.positions.get(ticker)
            if position is None or position.current_price <= 0:
                # No usable price, so no share quantity can be derived.
                continue
            current_value = position.quantity * position.current_price
            value_difference = total_value * target - current_value
            # Only trade if difference is significant
            if abs(value_difference) > self.min_trade_size:
                # The move to target replaces any harvest sale for this ticker.
                trades[ticker] = value_difference / position.current_price

        # Adjust cash position
        # NOTE(review): the buffer is checked against cash before the trades
        # above settle - confirm whether projected cash should be used.
        current_cash_ratio = self.cash / total_value
        if current_cash_ratio < self.min_cash_buffer:
            # Raise cash by trimming largest positions
            cash_needed = (self.min_cash_buffer - current_cash_ratio) * total_value
            self._raise_cash(cash_needed, trades)
        elif current_cash_ratio > self.max_cash_buffer:
            # Deploy excess cash proportionally
            self._deploy_cash(trades)
        return trades

    def _raise_cash(self, cash_needed: float, trades: Dict[str, float]):
        """Add sells to ``trades`` (in place) to raise ``cash_needed``.

        Positions are trimmed from largest to smallest, selling at most 20%
        of each position's value. A warning is logged if that is not enough.
        """
        position_values = {
            ticker: pos.quantity * pos.current_price
            for ticker, pos in self.positions.items()
            if pos.current_price > 0
        }
        for ticker in sorted(position_values, key=position_values.get, reverse=True):
            if cash_needed <= 0:
                break
            sell_value = min(
                position_values[ticker] * 0.2,  # Sell up to 20% of position
                cash_needed
            )
            sell_quantity = sell_value / self.positions[ticker].current_price
            trades[ticker] = trades.get(ticker, 0) - sell_quantity
            cash_needed -= sell_value
        if cash_needed > 1e-9:
            self.logger.warning(f"Unable to raise full cash buffer; shortfall: {cash_needed:.2f}")

    def _deploy_cash(self, trades: Dict[str, float]):
        """Add buys to ``trades`` (in place) that bring cash down to the minimum buffer.

        Excess cash is split across held, targeted tickers in proportion to
        their target weights.
        """
        # Measure the buffer against total portfolio value, the same base the
        # cash ratio uses; buying shares does not change that total.
        excess_cash = self.cash - self.min_cash_buffer * self._total_value()
        if excess_cash <= 0:
            return
        deployable = {
            ticker: target
            for ticker, target in self.target_weights.items()
            if ticker in self.positions and self.positions[ticker].current_price > 0
        }
        total_target = sum(deployable.values())
        if total_target <= 0:
            return
        for ticker, target in deployable.items():
            deploy_value = excess_cash * (target / total_target)
            deploy_quantity = deploy_value / self.positions[ticker].current_price
            trades[ticker] = trades.get(ticker, 0) + deploy_quantity

    def execute_rebalance(self) -> Tuple[Dict[str, float], float]:
        """Execute full rebalancing process.

        Returns:
            ``(trades, turnover)`` where turnover is the traded value divided
            by total portfolio value, or ``({}, 0.0)`` when nothing triggered
            a rebalance.
        """
        self.logger.info("Starting portfolio rebalance")
        # Apply corporate actions first so the triggers see current holdings.
        corporate_actions = self.process_corporate_actions()
        drift_triggers = self.check_drift_triggers()
        tax_opportunities = self.identify_tax_loss_opportunities()

        if not (drift_triggers or tax_opportunities or corporate_actions):
            self.logger.info("No rebalancing required")
            return {}, 0.0

        trades = self.generate_rebalancing_trades()
        total_value = self._total_value()
        traded_value = sum(abs(trade * self.positions[ticker].current_price)
                           for ticker, trade in trades.items())
        turnover = traded_value / total_value if total_value else 0.0
        self.logger.info(f"Rebalance complete. Turnover: {turnover:.2%}")
        return trades, turnover
def demo_portfolio_rebalancing():
    """Run a sample rebalance on a three-stock portfolio and print the outcome."""

    def two_lot_position(quantity, price, first_basis, second_basis):
        # Each sample holding was bought in two equal lots.
        half = quantity // 2
        return Position(
            quantity=quantity,
            current_price=price,
            tax_lots=[
                (datetime(2022, 1, 1), half, first_basis),
                (datetime(2022, 6, 1), half, second_basis),
            ],
        )

    # Sample holdings and the weights they should converge to.
    positions = {
        'AAPL': two_lot_position(100, 150.0, 120.0, 140.0),
        'MSFT': two_lot_position(200, 280.0, 250.0, 270.0),
        'JPM': two_lot_position(150, 140.0, 160.0, 150.0),
    }
    target_weights = {'AAPL': 0.3, 'MSFT': 0.4, 'JPM': 0.3}

    # Random return history for the risk analytics component.
    simulated = np.random.normal(0.001, 0.02, (252, 3))
    returns = pd.DataFrame(simulated, columns=['AAPL', 'MSFT', 'JPM'])
    rebalancer = PortfolioRebalancer(
        positions, target_weights, 10000, RiskAnalytics(returns))

    # Register a split that becomes effective one day from now.
    upcoming_split = CorporateAction(
        action_type='split',
        effective_date=datetime.now() + timedelta(days=1),
        details={'ratio': 4.0},
    )
    rebalancer.corporate_actions['AAPL'] = [upcoming_split]

    print("\nPortfolio Rebalancing Demo:")
    print("-" * 50)

    print("\nInitial Portfolio State:")
    for ticker, weight in rebalancer.calculate_current_weights().items():
        print(f"{ticker}: {weight:.2%}")

    print("\nChecking Rebalancing Triggers...")
    drift_triggers = rebalancer.check_drift_triggers()
    tax_opportunities = rebalancer.identify_tax_loss_opportunities()
    if drift_triggers:
        print("\nDrift Triggers:", drift_triggers)
    if tax_opportunities:
        print("\nTax Loss Opportunities:", tax_opportunities)

    trades, turnover = rebalancer.execute_rebalance()
    if not trades:
        print("\nNo rebalancing required")
        return

    print("\nRebalancing Trades:")
    for ticker, quantity in trades.items():
        print(f"{ticker}: {quantity:+.2f} shares")
    print(f"\nPortfolio Turnover: {turnover:.2%}")


if __name__ == "__main__":
    demo_portfolio_rebalancing()