Comprehensive Quant Workflow 3: Portfolio Rebalancing

This framework applies dynamic portfolio rebalancing and can include:

1. Dynamic Rebalancing Triggers: Drift threshold monitoring; Scheduled rebalancing; Cash buffer breaches; Corporate action triggers;

2. Tax-Aware Rebalancing: Tax lot tracking; Tax-loss harvesting opportunities; Holding period consideration;

3. Cash Management: Minimum cash buffer maintenance; Cash flow handling from corporate actions; Trade adjustment to maintain cash positions;

4. Corporate Action Driven Rebalancing: Dividend processing; Stock splits; Mergers/Spinoffs;

5. Drift Monitoring and Threshold-Based Rebalancing: Continuous monitoring of position drift; Customizable drift thresholds; Trade size filters; Turnover limits;

Note: the PortfolioRebalancer class provides a clear interface for rebalancing operations. Improved configuration: using dataclasses (RebalancingParams and PortfolioConstraints) makes it easier to manage and validate configuration parameters. Easier testing: the separated code is easier to test in isolation. The first version, portfolio_rebalancing.py:

import numpy as np
import pandas as pd
from typing import Dict, Optional, List
from dataclasses import dataclass
from scipy.optimize import minimize

@dataclass
class RebalancingParams:
    """Tunable settings for portfolio rebalancing.

    Attributes:
        weight_threshold: Weight deviation threshold (0.05 = 5%).
        risk_threshold: Risk deviation threshold (0.10 = 10%).
        min_trade_size: Minimum trade size (0.001 = 10bps).
        risk_tolerance: Tracking error tolerance (0.02 = 2%).
        target_risk: Optional risk level to aim for; None when unset.
        risk_measure: One of 'volatility', 'var' or 'cvar'.
    """

    weight_threshold: float = 0.05
    risk_threshold: float = 0.10
    min_trade_size: float = 0.001
    risk_tolerance: float = 0.02
    target_risk: Optional[float] = None
    risk_measure: str = 'volatility'

@dataclass
class PortfolioConstraints:
    """Per-asset lower and upper weight limits used during optimization."""
    min_weights: Dict[str, float]
    max_weights: Dict[str, float]

    @classmethod
    def default(cls, assets: List[str]):
        """Return constraints that let every asset range over [0, 1]."""
        lower_limits = dict.fromkeys(assets, 0.0)
        upper_limits = dict.fromkeys(assets, 1.0)
        return cls(min_weights=lower_limits, max_weights=upper_limits)

class PortfolioRebalancer:
    """Derive target weights, detect drift and size rebalancing trades.

    Risk figures are delegated to the ``risk_analytics`` object supplied at
    construction. This class reads its ``returns`` and ``cov_matrix``
    attributes and calls its ``calculate_var``, ``calculate_cvar`` and
    ``calculate_tracking_error`` methods.
    """

    # Risk measures that calculate_optimal_weights knows how to evaluate.
    _SUPPORTED_RISK_MEASURES = ('volatility', 'var', 'cvar')

    def __init__(self, risk_analytics):
        """
        Initialize PortfolioRebalancer with risk analytics instance

        Args:
            risk_analytics: Instance of RiskAnalytics class
        """
        self.risk_analytics = risk_analytics

    @staticmethod
    def _ordered_union(first, second) -> List[str]:
        """Return the keys of both mappings, de-duplicated, in first-seen order."""
        # dict.fromkeys keeps insertion order, unlike set(), so results are
        # reproducible from run to run.
        return list(dict.fromkeys([*first, *second]))

    def calculate_optimal_weights(self,
                                constraints: Optional["PortfolioConstraints"] = None,
                                params: Optional["RebalancingParams"] = None) -> Dict[str, float]:
        """
        Calculate optimal portfolio weights based on risk targets

        Minimises the configured risk measure, or the squared distance of
        that measure from ``params.target_risk`` when a target is set,
        subject to the weights summing to 1 and the per-asset bounds.

        Args:
            constraints: Portfolio constraints (defaults to 0-1 per asset)
            params: Rebalancing parameters (defaults to RebalancingParams())

        Returns:
            Dictionary of optimal weights

        Raises:
            ValueError: If the risk measure is not supported, there are no
                assets, or the optimizer does not report success.
        """
        if params is None:
            params = RebalancingParams()

        # Fail fast: an unknown measure would otherwise make the objective
        # return None and surface as an obscure error inside the optimizer.
        if params.risk_measure not in self._SUPPORTED_RISK_MEASURES:
            raise ValueError(f"Unsupported risk measure: {params.risk_measure!r}")

        assets = list(self.risk_analytics.returns.columns)
        n_assets = len(assets)
        if n_assets == 0:
            raise ValueError("Cannot optimize a portfolio with no assets")

        if constraints is None:
            constraints = PortfolioConstraints.default(assets)

        # Start the search from an equal-weight portfolio
        initial_weights = np.full(n_assets, 1.0 / n_assets)

        opt_constraints = [
            {'type': 'eq', 'fun': lambda x: np.sum(x) - 1}  # weights sum to 1
        ]

        bounds = [(constraints.min_weights[asset], constraints.max_weights[asset])
                 for asset in assets]

        def portfolio_risk(weights):
            if params.risk_measure == 'volatility':
                return np.sqrt(weights @ self.risk_analytics.cov_matrix @ weights)
            weights_dict = dict(zip(assets, weights))
            if params.risk_measure == 'var':
                return self.risk_analytics.calculate_var(weights_dict)
            return self.risk_analytics.calculate_cvar(weights_dict)

        if params.target_risk is not None:
            # Aim for the requested risk level rather than the minimum
            def objective(weights):
                return (portfolio_risk(weights) - params.target_risk) ** 2
        else:
            objective = portfolio_risk

        opt_result = minimize(objective, initial_weights,
                            constraints=opt_constraints, bounds=bounds,
                            method='SLSQP')

        if not opt_result.success:
            raise ValueError("Optimization failed to converge")

        return dict(zip(assets, opt_result.x))

    def generate_rebalancing_trades(self,
                                  current_weights: Dict[str, float],
                                  target_weights: Dict[str, float],
                                  params: Optional["RebalancingParams"] = None) -> Dict[str, float]:
        """
        Generate trades needed to rebalance portfolio

        Every asset present in either mapping is considered, so positions
        that exist only in the target are bought and positions that exist
        only in the current portfolio are sold.

        Args:
            current_weights: Current portfolio weights
            target_weights: Target portfolio weights
            params: Rebalancing parameters

        Returns:
            Dictionary of trades (positive for buy, negative for sell);
            trades no larger than ``params.min_trade_size`` are omitted
        """
        if params is None:
            params = RebalancingParams()

        trades = {}
        for asset in self._ordered_union(current_weights, target_weights):
            trade = target_weights.get(asset, 0) - current_weights.get(asset, 0)
            if abs(trade) > params.min_trade_size:
                trades[asset] = trade
        return trades

    def check_rebalancing_triggers(self,
                                 current_weights: Dict[str, float],
                                 target_weights: Dict[str, float],
                                 params: Optional["RebalancingParams"] = None) -> bool:
        """
        Check if portfolio rebalancing is needed

        Args:
            current_weights: Current portfolio weights
            target_weights: Target portfolio weights
            params: Rebalancing parameters

        Returns:
            True if any asset's weight deviates from target by more than
            ``params.weight_threshold``, or if the VaR of the current
            portfolio deviates from the VaR of the target portfolio by more
            than ``params.risk_threshold`` in relative terms
        """
        if params is None:
            params = RebalancingParams()

        # Check weight deviations, including holdings absent from the target
        for asset in self._ordered_union(current_weights, target_weights):
            current = current_weights.get(asset, 0)
            target = target_weights.get(asset, 0)
            if abs(current - target) > params.weight_threshold:
                return True

        # Check risk deviation. The relative test is written as a product so
        # that a zero target risk cannot divide by zero and a negative risk
        # figure cannot flip the comparison.
        current_risk = self.risk_analytics.calculate_var(current_weights)
        target_risk = self.risk_analytics.calculate_var(target_weights)
        return bool(abs(current_risk - target_risk)
                    > params.risk_threshold * abs(target_risk))

    def optimize_rebalancing(self,
                           current_weights: Dict[str, float],
                           target_weights: Dict[str, float],
                           transaction_costs: Dict[str, float],
                           params: Optional["RebalancingParams"] = None) -> Dict[str, float]:
        """
        Optimize rebalancing trades considering transaction costs

        Minimises the sum of absolute weight differences to the target plus
        cost-weighted absolute trades, subject to weights summing to 1,
        each weight lying in [0, 1], and tracking error not exceeding
        ``params.risk_tolerance``.

        Args:
            current_weights: Current portfolio weights
            target_weights: Target portfolio weights
            transaction_costs: Dictionary of transaction costs by asset
                (assets without an entry are treated as costless)
            params: Rebalancing parameters

        Returns:
            Dictionary of optimal weights after rebalancing

        Raises:
            ValueError: If the optimizer does not report success.
        """
        if params is None:
            params = RebalancingParams()

        assets = self._ordered_union(current_weights, target_weights)
        n_assets = len(assets)

        # Look the per-asset inputs up once; the objective runs many times.
        current_vec = np.array([current_weights.get(asset, 0) for asset in assets], dtype=float)
        target_vec = np.array([target_weights.get(asset, 0) for asset in assets], dtype=float)
        cost_vec = np.array([transaction_costs.get(asset, 0) for asset in assets], dtype=float)

        def objective(weights):
            # Distance to the target portfolio
            tracking_diff = np.abs(weights - target_vec).sum()
            # Cost of moving away from the current portfolio
            trade_costs = (np.abs(weights - current_vec) * cost_vec).sum()
            return tracking_diff + trade_costs

        constraints = [
            {'type': 'eq', 'fun': lambda x: np.sum(x) - 1},  # weights sum to 1
            {'type': 'ineq', 'fun': lambda x: params.risk_tolerance -
             self.risk_analytics.calculate_tracking_error(dict(zip(assets, x)))}  # risk constraint
        ]

        bounds = [(0, 1) for _ in range(n_assets)]

        # Start the search from the current portfolio
        opt_result = minimize(objective, current_vec.copy(),
                            constraints=constraints, bounds=bounds,
                            method='SLSQP')

        if not opt_result.success:
            raise ValueError("Rebalancing optimization failed to converge")

        return dict(zip(assets, opt_result.x))

def demo_portfolio_rebalancing():
    """Walk through a complete rebalancing run on simulated returns."""
    from risk_analytics import RiskAnalytics
    import numpy as np
    import pandas as pd

    # Simulated daily returns for five tickers, seeded for repeatability
    np.random.seed(42)
    dates = pd.date_range(start='2020-01-01', end='2023-12-31', freq='D')
    assets = ['AAPL', 'MSFT', 'JPM', 'JNJ', 'PG']
    simulated = np.random.normal(0.0005, 0.02, (len(dates), len(assets)))
    returns = pd.DataFrame(simulated, index=dates, columns=assets)

    # Wire the rebalancer to a risk engine built on those returns
    rebalancer = PortfolioRebalancer(RiskAnalytics(returns=returns))

    # What the portfolio holds today
    current_weights = dict(zip(assets, (0.35, 0.25, 0.15, 0.15, 0.10)))

    # Per-asset weight limits
    lower_limits = dict(zip(assets, (0.1, 0.1, 0.05, 0.05, 0.05)))
    upper_limits = dict(zip(assets, (0.4, 0.4, 0.3, 0.3, 0.3)))
    constraints = PortfolioConstraints(min_weights=lower_limits,
                                       max_weights=upper_limits)

    # Rebalancing settings
    params = RebalancingParams(
        target_risk=0.02,
        risk_measure='volatility',
        weight_threshold=0.05,
        risk_threshold=0.1,
        min_trade_size=0.005
    )

    # Step 1: derive the target allocation
    target_weights = rebalancer.calculate_optimal_weights(
        constraints=constraints,
        params=params
    )

    print("\nTarget Portfolio Weights:")
    for asset in target_weights:
        print(f"{asset}: {target_weights[asset]:.2%}")

    # Step 2: stop early when the portfolio is still close enough to target
    if not rebalancer.check_rebalancing_triggers(current_weights,
                                                 target_weights,
                                                 params=params):
        print("\nNo rebalancing needed")
        return

    print("\nRebalancing needed!")

    # Step 3: cost-aware optimisation, 10bps charged on every asset
    transaction_costs = dict.fromkeys(assets, 0.001)
    optimal_weights = rebalancer.optimize_rebalancing(
        current_weights,
        target_weights,
        transaction_costs,
        params=params
    )

    # Step 4: turn the optimised weights into trades
    trades = rebalancer.generate_rebalancing_trades(
        current_weights,
        optimal_weights,
        params=params
    )

    print("\nRebalancing Trades Required:")
    for asset in trades:
        print(f"{asset}: {trades[asset]:+.2%}")

# Run the demo only when this file is executed as a script, not when imported.
if __name__ == "__main__":
    demo_portfolio_rebalancing() 

Here is a second, more elaborate version:

import numpy as np
import pandas as pd
from dataclasses import dataclass
from typing import Dict, List, Optional, Tuple
from datetime import datetime, timedelta
import logging
from risk_analytics import RiskAnalytics
from risk_monitor import RiskMonitor
from portfolio_optimization import PortfolioOptimizer

@dataclass
class Position:
    """A holding together with the tax lots that make it up.

    Attributes:
        quantity: Quantity held.
        current_price: Current price of the holding.
        tax_lots: One (purchase_date, quantity, cost_basis) tuple per lot.
        currency: Currency code of the position; 'USD' unless specified.
    """

    quantity: float
    current_price: float
    tax_lots: List[Tuple[datetime, float, float]]
    currency: str = 'USD'

@dataclass
class CorporateAction:
    """A corporate action awaiting (or already given) processing.

    Attributes:
        action_type: One of 'split', 'dividend', 'merger' or 'spinoff'.
        effective_date: Date from which the action applies.
        details: Action-specific data.
        processed: Whether the action has been applied; starts as False.
    """

    action_type: str
    effective_date: datetime
    details: Dict
    processed: bool = False

class PortfolioRebalancer:
    """Advanced portfolio rebalancing with tax awareness and corporate actions.

    Weights and cash buffers are fractions of total portfolio value
    (positions plus cash). Trades are expressed in shares: positive to buy,
    negative to sell.
    """

    def __init__(self, positions: Dict[str, "Position"], target_weights: Dict[str, float],
                 cash: float, risk_analytics: "RiskAnalytics"):
        """
        Initialize portfolio rebalancer

        Args:
            positions: Dictionary of current positions
            target_weights: Target portfolio weights
            cash: Current cash balance
            risk_analytics: RiskAnalytics instance
        """
        self.positions = positions
        self.target_weights = target_weights
        self.cash = cash
        self.risk_analytics = risk_analytics
        # ticker -> list of CorporateAction objects still to be applied
        self.corporate_actions = {}

        # Initialize monitoring
        self.risk_monitor = RiskMonitor(update_interval=60)  # 1-minute updates

        # Rebalancing thresholds
        self.drift_threshold = 0.05  # 5% drift threshold
        self.tax_loss_threshold = -0.10  # 10% loss threshold for tax harvesting
        self.min_trade_size = 1000  # Minimum trade value to avoid small trades

        # Cash management
        self.min_cash_buffer = 0.02  # 2% minimum cash buffer
        self.max_cash_buffer = 0.05  # 5% maximum cash buffer

        # Setup logging
        # NOTE(review): basicConfig configures the root logger for the whole
        # process; kept for backward compatibility with the demo output.
        logging.basicConfig(
            level=logging.INFO,
            format='%(asctime)s - %(levelname)s - %(message)s'
        )
        self.logger = logging.getLogger('PortfolioRebalancer')

    def _positions_value(self) -> float:
        """Return the market value of all positions, excluding cash."""
        return sum(pos.quantity * pos.current_price
                   for pos in self.positions.values())

    def _total_value(self) -> float:
        """Return the market value of all positions plus cash."""
        return self._positions_value() + self.cash

    def calculate_current_weights(self) -> Dict[str, float]:
        """Calculate current portfolio weights.

        Returns:
            Mapping of ticker to weight, plus a 'CASH' entry. All weights
            are zero when the portfolio has no value.
        """
        total_value = self._total_value()

        if total_value == 0:
            # Nothing to weigh against; avoid dividing by zero.
            weights = {ticker: 0.0 for ticker in self.positions}
            weights['CASH'] = 0.0
            return weights

        weights = {
            ticker: (pos.quantity * pos.current_price) / total_value
            for ticker, pos in self.positions.items()
        }
        weights['CASH'] = self.cash / total_value
        return weights

    def check_drift_triggers(self) -> List[str]:
        """Check for positions that have drifted beyond threshold.

        Only tickers listed in ``target_weights`` are examined.

        Returns:
            Tickers whose weight differs from target by more than
            ``drift_threshold``.
        """
        current_weights = self.calculate_current_weights()
        drift_triggers = []

        for ticker, target in self.target_weights.items():
            current = current_weights.get(ticker, 0)
            if abs(current - target) > self.drift_threshold:
                drift_triggers.append(ticker)
                self.logger.info(f"Drift trigger: {ticker} (Current: {current:.2%}, Target: {target:.2%})")

        return drift_triggers

    def identify_tax_loss_opportunities(self) -> List[Tuple[str, float]]:
        """Identify tax loss harvesting opportunities.

        Returns:
            One (ticker, lot quantity) pair for each tax lot whose
            unrealized return is below ``tax_loss_threshold``.
        """
        opportunities = []

        for ticker, position in self.positions.items():
            for purchase_date, quantity, cost_basis in position.tax_lots:
                if cost_basis <= 0:
                    # A relative loss is undefined without a positive basis.
                    continue

                unrealized_gain = (position.current_price - cost_basis) / cost_basis

                if unrealized_gain < self.tax_loss_threshold:
                    opportunities.append((ticker, quantity))
                    self.logger.info(f"Tax loss opportunity: {ticker} "
                                   f"(Loss: {unrealized_gain:.2%})")

        return opportunities

    def process_corporate_actions(self) -> List[str]:
        """Process pending corporate actions.

        Applies every unprocessed action whose effective date has passed.
        Actions for tickers that are not held are marked processed without
        effect, since there is no position to adjust.

        Returns:
            Tickers for which an action was applied (one entry per action).
        """
        processed = []
        current_date = datetime.now()

        for ticker, actions in self.corporate_actions.items():
            for action in actions:
                if action.processed or action.effective_date > current_date:
                    continue

                if ticker not in self.positions:
                    self.logger.warning(f"Skipping corporate action for {ticker} "
                                      f"({action.action_type}): no position held")
                    action.processed = True
                    continue

                self._apply_corporate_action(ticker, action)
                action.processed = True
                processed.append(ticker)
                self.logger.info(f"Processed corporate action: {ticker} "
                               f"({action.action_type})")

        return processed

    def _add_position(self, ticker: str, quantity: float, price: float,
                      tax_lots: List[Tuple[datetime, float, float]], currency: str):
        """Create a position, or fold the shares and lots into an existing one."""
        existing = self.positions.get(ticker)
        if existing is None:
            self.positions[ticker] = Position(
                quantity=quantity,
                current_price=price,
                tax_lots=list(tax_lots),
                currency=currency
            )
            return

        # Already held: add to it instead of overwriting the holding.
        existing.quantity += quantity
        existing.current_price = price
        existing.tax_lots = list(existing.tax_lots) + list(tax_lots)

    def _apply_corporate_action(self, ticker: str, action: "CorporateAction"):
        """Apply corporate action to position.

        Args:
            ticker: Ticker of the held position the action applies to
            action: The action; ``details`` must carry 'ratio' for splits,
                'amount' for dividends, and 'new_ticker', 'ratio' and
                'new_price' for mergers and spinoffs

        Raises:
            KeyError: If ``ticker`` is not held or a required detail is missing.
            ValueError: If a split or merger ratio is not positive.
        """
        position = self.positions[ticker]

        if action.action_type == 'split':
            split_ratio = action.details['ratio']
            if split_ratio <= 0:
                raise ValueError(f"Split ratio must be positive, got {split_ratio}")
            position.quantity *= split_ratio
            position.current_price /= split_ratio
            # Adjust tax lots: more shares at a proportionally lower basis
            position.tax_lots = [
                (date, qty * split_ratio, basis / split_ratio)
                for date, qty, basis in position.tax_lots
            ]

        elif action.action_type == 'dividend':
            dividend_amount = action.details['amount'] * position.quantity
            self.cash += dividend_amount

        elif action.action_type == 'merger':
            new_ticker = action.details['new_ticker']
            conversion_ratio = action.details['ratio']
            if conversion_ratio <= 0:
                raise ValueError(f"Merger ratio must be positive, got {conversion_ratio}")

            # Scale the per-share basis inversely to the share count so each
            # lot's total cost is unchanged, as in the split branch.
            converted_lots = [
                (date, qty * conversion_ratio, basis / conversion_ratio)
                for date, qty, basis in position.tax_lots
            ]

            # Remove the old position first so a same-ticker conversion
            # still leaves the converted holding in place.
            del self.positions[ticker]
            self._add_position(new_ticker,
                               position.quantity * conversion_ratio,
                               action.details['new_price'],
                               converted_lots,
                               position.currency)

        elif action.action_type == 'spinoff':
            new_ticker = action.details['new_ticker']
            spinoff_ratio = action.details['ratio']
            new_quantity = position.quantity * spinoff_ratio
            new_price = action.details['new_price']

            # NOTE(review): the spun-off shares get a fresh lot priced at
            # new_price and the parent's basis is left untouched - confirm
            # this matches the intended tax treatment.
            self._add_position(new_ticker,
                               new_quantity,
                               new_price,
                               [(datetime.now(), new_quantity, new_price)],
                               position.currency)

    def generate_rebalancing_trades(self) -> Dict[str, float]:
        """Generate trades to rebalance portfolio.

        Steps: apply due corporate actions, sell tax lots showing losses
        beyond the threshold, trade held target tickers towards their target
        value, then adjust for the cash buffer. A rebalancing trade replaces
        any tax-loss sale recorded for the same ticker.

        Returns:
            Mapping of ticker to trade size in shares.
        """
        # Process corporate actions first so that valuations below reflect
        # their effect on cash and positions
        self.process_corporate_actions()

        total_value = self._total_value()
        trades = {}

        # Sell tax lots with losses; several lots of one ticker accumulate
        for ticker, quantity in self.identify_tax_loss_opportunities():
            trades[ticker] = trades.get(ticker, 0) - quantity

        # Calculate required trades for rebalancing
        for ticker, target in self.target_weights.items():
            position = self.positions.get(ticker)
            if position is None or position.current_price <= 0:
                # Not held, or no usable price to size a trade with
                continue

            current_value = position.quantity * position.current_price
            value_difference = total_value * target - current_value

            # Only trade if difference is significant
            if abs(value_difference) > self.min_trade_size:
                trades[ticker] = value_difference / position.current_price

        if total_value <= 0:
            # No portfolio value to measure a cash ratio against
            return trades

        # Adjust cash position
        current_cash_ratio = self.cash / total_value
        if current_cash_ratio < self.min_cash_buffer:
            # Raise cash by trimming largest positions
            cash_needed = (self.min_cash_buffer - current_cash_ratio) * total_value
            self._raise_cash(cash_needed, trades)
        elif current_cash_ratio > self.max_cash_buffer:
            # Deploy excess cash proportionally
            self._deploy_cash(trades)

        return trades

    def _raise_cash(self, cash_needed: float, trades: Dict[str, float]):
        """Modify trades in place to raise required cash.

        Positions are trimmed from largest to smallest, selling at most 20%
        of each, until ``cash_needed`` is covered or positions run out.
        """
        position_values = {
            ticker: pos.quantity * pos.current_price
            for ticker, pos in self.positions.items()
        }

        for ticker in sorted(position_values, key=position_values.get, reverse=True):
            if cash_needed <= 0:
                break

            position = self.positions[ticker]
            if position.current_price <= 0:
                # Cannot convert a cash amount into shares without a price
                continue

            sell_value = min(
                position_values[ticker] * 0.2,  # Sell up to 20% of position
                cash_needed
            )

            sell_quantity = sell_value / position.current_price
            trades[ticker] = trades.get(ticker, 0) - sell_quantity
            cash_needed -= sell_value

    def _deploy_cash(self, trades: Dict[str, float]):
        """Modify trades in place to deploy excess cash.

        Cash above the minimum buffer (measured against total portfolio
        value, as elsewhere in this class) is spread over held target
        tickers in proportion to their target weights.
        """
        excess_cash = self.cash - self.min_cash_buffer * self._total_value()
        total_target = sum(self.target_weights.values())
        if excess_cash <= 0 or total_target <= 0:
            return

        # Deploy proportionally to target weights
        for ticker, target in self.target_weights.items():
            position = self.positions.get(ticker)
            if position is None or position.current_price <= 0:
                continue

            deploy_value = excess_cash * (target / total_target)
            deploy_quantity = deploy_value / position.current_price
            trades[ticker] = trades.get(ticker, 0) + deploy_quantity

    def execute_rebalance(self) -> Tuple[Dict[str, float], float]:
        """Execute full rebalancing process.

        Returns:
            (trades, turnover). Trades are in shares; turnover is the traded
            value divided by total portfolio value. Returns ({}, 0.0) when
            no trigger fires.
        """
        self.logger.info("Starting portfolio rebalance")

        # Check triggers
        drift_triggers = self.check_drift_triggers()
        tax_opportunities = self.identify_tax_loss_opportunities()
        corporate_actions = self.process_corporate_actions()

        # Generate and execute trades if needed
        if drift_triggers or tax_opportunities or corporate_actions:
            trades = self.generate_rebalancing_trades()

            # Calculate turnover
            total_value = self._total_value()
            traded_value = sum(abs(trade * self.positions[ticker].current_price)
                               for ticker, trade in trades.items())
            turnover = traded_value / total_value if total_value > 0 else 0.0

            self.logger.info(f"Rebalance complete. Turnover: {turnover:.2%}")
            return trades, turnover

        self.logger.info("No rebalancing required")
        return {}, 0.0

def demo_portfolio_rebalancing():
    """Run the rebalancer on a three-stock sample portfolio and print the outcome."""

    def two_lots(lot_quantity, first_basis, second_basis):
        # Each sample holding consists of two equally sized lots
        return [
            (datetime(2022, 1, 1), lot_quantity, first_basis),
            (datetime(2022, 6, 1), lot_quantity, second_basis),
        ]

    # Sample holdings
    positions = {
        'AAPL': Position(quantity=100, current_price=150.0,
                         tax_lots=two_lots(50, 120.0, 140.0)),
        'MSFT': Position(quantity=200, current_price=280.0,
                         tax_lots=two_lots(100, 250.0, 270.0)),
        'JPM': Position(quantity=150, current_price=140.0,
                        tax_lots=two_lots(75, 160.0, 150.0)),
    }

    target_weights = {'AAPL': 0.3, 'MSFT': 0.4, 'JPM': 0.3}

    # One trading year of simulated returns for the risk engine
    simulated = np.random.normal(0.001, 0.02, (252, 3))
    returns = pd.DataFrame(simulated, columns=['AAPL', 'MSFT', 'JPM'])

    risk_analytics = RiskAnalytics(returns)
    rebalancer = PortfolioRebalancer(positions, target_weights, 10000, risk_analytics)

    # Queue a 4-for-1 split that takes effect tomorrow
    pending_split = CorporateAction(
        action_type='split',
        effective_date=datetime.now() + timedelta(days=1),
        details={'ratio': 4.0}
    )
    rebalancer.corporate_actions['AAPL'] = [pending_split]

    print("\nPortfolio Rebalancing Demo:")
    print("-" * 50)

    print("\nInitial Portfolio State:")
    current_weights = rebalancer.calculate_current_weights()
    for ticker in current_weights:
        print(f"{ticker}: {current_weights[ticker]:.2%}")

    print("\nChecking Rebalancing Triggers...")
    drift_triggers = rebalancer.check_drift_triggers()
    tax_opportunities = rebalancer.identify_tax_loss_opportunities()

    if drift_triggers:
        print("\nDrift Triggers:", drift_triggers)
    if tax_opportunities:
        print("\nTax Loss Opportunities:", tax_opportunities)

    trades, turnover = rebalancer.execute_rebalance()

    # Nothing to report when no trades were produced
    if not trades:
        print("\nNo rebalancing required")
        return

    print("\nRebalancing Trades:")
    for ticker in trades:
        print(f"{ticker}: {trades[ticker]:+.2f} shares")
    print(f"\nPortfolio Turnover: {turnover:.2%}")

# Run the demo only when this file is executed as a script, not when imported.
if __name__ == "__main__":
    demo_portfolio_rebalancing()

Leave a comment

This site uses Akismet to reduce spam. Learn how your comment data is processed.