Breakdown 3: DAG Airflow Concrete

airflow_project/
├── dags/
│ └── index_rebalancing_dag.py # Our main DAG file
├── include/
│ └── calculations/
│ │ └── engine.py # Python code for the heavy lifting
│ └── utils/
│ └── reporting.py # Code for generating reports
└── config/
└── indices/
└── sp500_rules.yaml # Our version-controlled index definition

The workflow would look like

graph TD
A[get_index_config] –> B[collect_and_validate_market_data];
B –> C[calculate_preliminary_rebalance];
C –> D[generate_analyst_report];
D –> E[notify_committee_for_review];
E –> F[wait_for_committee_approval];
F –> G{check_approval_status};
G — Approved –> H[publish_changes_to_clients];
G — Rejected –> I[handle_rejection];
I –> J[notify_manager_of_rejection];
H –> K[final_reconciliation];
J –> K;

the python DAG file is

# In dags/index_rebalancing_dag.py

import yaml
from pendulum import datetime
from airflow.models.dag import DAG
from airflow.decorators import task
from airflow.operators.email import EmailOperator
from airflow.sensors.external_task import ExternalTaskSensor
from airflow.models.baseoperator import chain

# --- DAG Definition ---
with DAG(
    dag_id="sp500_quarterly_rebalance",
    start_date=datetime(2023, 1, 1),
    # In reality, this would be a custom Timetable for the "3rd Friday of the quarter"
    # For simplicity, we use a cron string that runs weekly.
    schedule_interval="0 0 * * 5", 
    catchup=False,
    tags=["index", "finance"],
    doc_md="""
    ### S&P 500 Quarterly Rebalance
    This DAG performs the quarterly rebalancing for the S&P 500 index.
    1. Fetches the index rules from the config file.
    2. Collects and validates market data.
    3. Calculates preliminary changes (adds, drops, weights).
    4. **Pauses** and waits for the Index Committee to approve the changes via an external signal.
    5. Publishes the final changes upon approval.
    """,
) as dag:

    # --- Task Definitions ---

    @task
    def get_index_config(index_id: str):
        """
        Loads the index methodology from its version-controlled YAML file.
        This task is idempotent.
        """
        import yaml
        # In a real setup, this path would be part of the Airflow deployment.
        config_path = f"/opt/airflow/config/indices/{index_id}_rules.yaml"
        with open(config_path, 'r') as f:
            config = yaml.safe_load(f)
        
        print(f"Loaded configuration for {config['name']}")
        return config

    @task
    def collect_and_validate_market_data(config: dict, run_id: str):
        """
        Fetches market data (pricing, fundamentals) for the universe.
        Validates data quality (e.g., no missing prices, no extreme moves).
        Saves data to a persistent location (like S3) for this specific run.
        """
        print(f"Fetching data for universe: {config['universe']}")
        
        # --- In a real system ---
        # from data_vendors import Bloomberg, Refinitiv
        # universe_securities = Bloomberg.get_exchange_members(config['universe']['exchange_filter'])
        # market_data = Refinitiv.get_bulk_data(universe_securities, as_of_date=...)
        # DataQuality.run_checks(market_data)
        
        # --- For this example ---
        print("Pretending to fetch and validate terabytes of data...")
        # Data is saved to a unique path for this run to ensure idempotency.
        output_path = f"/data/raw/{run_id}/market_data.parquet"
        print(f"Data saved to {output_path}")
        return output_path

    @task
    def calculate_preliminary_rebalance(config: dict, market_data_path: str, run_id: str):
        """
        The core calculation engine.
        Applies all eligibility rules from the config to the market data.
        Calculates new weights based on the weighting scheme.
        """
        # Here we would import our more complex, testable calculation code
        # from include.calculations.engine import run_rebalance_engine
        
        print("Applying eligibility criteria from config file:")
        # --- Illustrative Logic ---
        # import pandas as pd
        # df = pd.read_parquet(market_data_path)
        # for criterion in config['eligibility_criteria']:
        #     print(f"  - Applying rule: {criterion['rule']} with min value {criterion.get('min')}")
        #     # df = apply_filter(df, criterion) -> Real logic here
        #
        # new_weights = calculate_weights(df, config['weighting'])
        # -------------------------

        print("Calculation complete. Generating preliminary Adds & Drops list.")
        preliminary_results_path = f"/data/preliminary/{run_id}/rebalance_results.json"
        print(f"Results saved to {preliminary_results_path}")
        return preliminary_results_path

    @task
    def generate_analyst_report(preliminary_results_path: str, run_id: str):
        """
        Creates a human-readable PDF/HTML report for the Index Committee.
        """
        # from include.utils.reporting import create_pdf_summary
        
        print("Generating summary report for analyst review...")
        # report = create_pdf_summary(preliminary_results_path)
        # report.save(f"/reports/{run_id}_committee_review.pdf")
        
        report_url = f"http://internal-dashboard.example.com/reports/{run_id}"
        print(f"Report available at: {report_url}")
        return report_url

    notify_committee_for_review = EmailOperator(
        task_id="notify_committee_for_review",
        to="index-committee@example.com",
        subject="Action Required: S&P 500 Preliminary Rebalance for {{ ds }}",
        html_content="""
        <h3>Review Required</h3>
        The preliminary rebalance calculation for the S&P 500 is complete.
        <p>Please review the report and provide your approval or rejection in the Index Management Dashboard.</p>
        <p>Report Link: {{ task_instance.xcom_pull(task_ids='generate_analyst_report') }}</p>
        <p>DAG Run ID: {{ run_id }}</p>
        """,
    )
    
    # This is the "Human-in-the-Loop" step. The DAG will pause here.
    # It waits for an external DAG named 'committee_approval_dag' to run for the same
    # logical date and have a task named 'signal_approval' succeed.
    # The 'committee_approval_dag' would be triggered by a button in a UI.
    wait_for_approval = ExternalTaskSensor(
        task_id='wait_for_committee_approval',
        external_dag_id='committee_approval_dag',
        external_task_id='signal_approval',
        allowed_states=['success'],
        failed_states=['failed', 'skipped'],
        mode='poke',
        poke_interval=600, # Check every 10 minutes
        timeout=86400 * 2 # Timeout after 2 days
    )

    @task.branch
    def check_approval_status(run_id: str):
        """
        After the sensor succeeds, this task checks the final outcome.
        (e.g., by calling an API on the approval dashboard).
        It then tells the DAG which path to take.
        """
        # from include.utils.api import get_approval_status
        # status = get_approval_status(run_id) -> returns "APPROVED" or "REJECTED"
        status = "APPROVED" # Hardcoded for example
        
        if status == "APPROVED":
            return "publish_changes_to_clients"
        else:
            return "handle_rejection"

    @task
    def publish_changes_to_clients(preliminary_results_path: str):
        """
        Takes the final, approved changes and publishes them to all subscribers
        (e.g., via FTP, API, flat files).
        """
        print("PUBLISHING APPROVED CHANGES TO CLIENTS...")
        # publish_to_ftp('ftp.client1.com', preliminary_results_path)
        # publish_to_api('api.client2.com', preliminary_results_path)
        print("Publication complete.")

    @task
    def handle_rejection():
        """
        The 'rejection' branch. Triggers alerts and ends the workflow.
        """
        print("REBALANCE REJECTED BY COMMITTEE. Notifying index manager.")

    notify_manager_of_rejection = EmailOperator(
        task_id="notify_manager_of_rejection",
        to="index-manager@example.com",
        subject="URGENT: S&P 500 Rebalance REJECTED for {{ ds }}",
        html_content="""
        The quarterly rebalance for run_id {{ run_id }} was rejected by the committee.
        Please investigate immediately.
        """,
    )
    
    @task(trigger_rule="one_success")
    def final_reconciliation():
        """This task runs after either a successful publication or a handled rejection."""
        print("Workflow finished. Running final checks.")


    # --- Define DAG Dependencies ---
    
    # 1. Get Config
    config_data = get_index_config(index_id="sp500")

    # 2. Run Main Calculation Pipeline
    market_data = collect_and_validate_market_data(config_data, run_id="{{ run_id }}")
    prelim_results = calculate_preliminary_rebalance(config_data, market_data, run_id="{{ run_id }}")
    report = generate_analyst_report(prelim_results, run_id="{{ run_id }}")

    # 3. Notify and Wait for Human
    chain(report, notify_committee_for_review, wait_for_approval)

    # 4. Branch based on Approval
    approval_branch = check_approval_status(run_id="{{ run_id }}")
    
    # 5. Define Post-Approval Paths
    rejection_path = handle_rejection()
    chain(rejection_path, notify_manager_of_rejection, final_reconciliation())

    publication_path = publish_changes_to_clients(prelim_results)
    chain(publication_path, final_reconciliation())
    
    # Connect the branch task to its possible outcomes
    approval_branch >> [publication_path, rejection_path]

Now the details of calculation engine python file

# in include/calculations/engine.py

import pandas as pd
from typing import Dict, Any, List

# --- Helper Functions for Specific Rules ---
# In a real system, these would be far more complex.

def _apply_market_cap_filter(df: pd.DataFrame, rule: Dict[str, Any]) -> pd.DataFrame:
    """Filters DataFrame based on minimum market capitalization."""
    min_market_cap = rule.get("min", 0)
    print(f"  -> Applying market cap filter: > ${min_market_cap / 1_000_000_000:.2f}B")
    return df[df["market_cap_usd"] >= min_market_cap].copy()

def _apply_public_float_filter(df: pd.DataFrame, rule: Dict[str, Any]) -> pd.DataFrame:
    """Filters DataFrame based on minimum public float percentage."""
    min_float = rule.get("min", 0)
    print(f"  -> Applying public float filter: > {min_float:.0%}")
    return df[df["public_float_percent"] >= min_float].copy()
    
def _apply_liquidity_filter(df: pd.DataFrame, rule: Dict[str, Any]) -> pd.DataFrame:
    """Filters DataFrame based on liquidity rules."""
    min_ratio = rule.get("min_ratio_adv_to_fmc", 0)
    print(f"  -> Applying liquidity filter (ADV/FMC ratio): > {min_ratio}")
    df["adv_to_fmc_ratio"] = df["annual_dollar_volume"] / (df["market_cap_usd"] * df["public_float_percent"])
    return df[df["adv_to_fmc_ratio"] >= min_ratio].copy()

# A dispatcher to map rule names from YAML to functions
RULE_DISPATCHER = {
    "market_cap_usd": _apply_market_cap_filter,
    "public_float_percent": _apply_public_float_filter,
    "liquidity": _apply_liquidity_filter,
    # "profitability" would require fetching fundamental data, omitted for brevity
}

def _get_current_constituents() -> set:
    """
    Placeholder function. In reality, this would fetch the current list of 
    constituents from a database or a flat file source to compare against.
    """
    # For this example, we'll hardcode a small, simplified "current" list
    return {"AAPL", "MSFT", "GOOG", "AMZN", "META"} 

def _calculate_weights(df: pd.DataFrame, weighting_config: Dict[str, Any]) -> pd.Series:
    """Calculates weights for the final constituents based on the config."""
    if weighting_config["scheme"] != "float_adjusted_market_cap":
        raise NotImplementedError("Only float-adjusted market cap weighting is supported.")

    df["float_adjusted_market_cap"] = df["market_cap_usd"] * df["public_float_percent"]
    total_fmc = df["float_adjusted_market_cap"].sum()
    weights = df["float_adjusted_market_cap"] / total_fmc
    
    # Apply capping if specified
    if "capping" in weighting_config:
        max_weight = weighting_config["capping"].get("max_weight_percent", 100) / 100.0
        weights[weights > max_weight] = max_weight
        # Note: A real capping mechanism requires iterative redistribution of excess weight.
        # This is a simplified version.
        
    return weights

# --- Main Engine Entry Point ---

def run_rebalance_engine(config: Dict[str, Any], market_data_path: str) -> Dict[str, Any]:
    """
    Main function to run the index rebalance calculation.
    
    Args:
        config: The parsed YAML configuration dictionary for the index.
        market_data_path: Path to the Parquet file containing market data.

    Returns:
        A dictionary containing the full rebalance results.
    """
    print("--- Starting Rebalance Calculation Engine ---")
    
    # 1. Load Data
    # In a real system, this would be a large dataset. We'll create a sample.
    # df = pd.read_parquet(market_data_path)
    mock_data = {
        'ticker': ['AAPL', 'MSFT', 'GOOG', 'NVDA', 'TSLA', 'META', 'BRK.A', 'V', 'JNJ', 'WMT'],
        'market_cap_usd': [2.8e12, 2.5e12, 1.7e12, 1.2e12, 800e9, 700e9, 750e9, 500e9, 450e9, 430e9],
        'public_float_percent': [0.85, 0.88, 0.80, 0.75, 0.60, 0.90, 0.45, 0.95, 0.98, 0.55],
        'annual_dollar_volume': [20e12, 18e12, 10e12, 30e12, 40e12, 15e12, 0.5e12, 5e12, 2e12, 3e12],
        'sector': ['Tech', 'Tech', 'Tech', 'Tech', 'Consumer', 'Tech', 'Finance', 'Finance', 'Health', 'Retail']
    }
    df = pd.DataFrame(mock_data)
    df_filtered = df.copy()
    print(f"Loaded {len(df_filtered)} securities for initial screening.")
    
    # 2. Apply Eligibility Criteria
    print("\nApplying eligibility criteria from config:")
    for rule in config["eligibility_criteria"]:
        rule_name = rule["rule"]
        if rule_name in RULE_DISPATCHER:
            df_filtered = RULE_DISPATCHER[rule_name](df_filtered, rule)
        else:
            print(f"  -> WARNING: No function found for rule '{rule_name}'. Skipping.")
    print(f"\n{len(df_filtered)} securities passed all eligibility filters.")

    # 3. Apply Selection Logic
    target_count = config["selection"]["target_constituent_count"]
    # For S&P 500, selection is by market cap, then committee review. 
    # The engine selects the top N candidates by market cap.
    df_eligible = df_filtered.sort_values(by="market_cap_usd", ascending=False)
    df_final_constituents = df_eligible.head(target_count)
    print(f"Selected top {len(df_final_constituents)} candidates for the index.")

    # 4. Determine Adds and Drops
    current_constituents = _get_current_constituents()
    new_constituents = set(df_final_constituents['ticker'])
    
    adds = sorted(list(new_constituents - current_constituents))
    drops = sorted(list(current_constituents - new_constituents))
    
    turnover_count = len(adds) + len(drops)
    turnover_percent = (turnover_count / target_count) * 100

    # 5. Calculate Final Weights
    final_weights = _calculate_weights(df_final_constituents, config["weighting"])
    
    # 6. Assemble Final Results
    results = {
        "metadata": {
            "index_id": config["index_id"],
            "rebalance_date": pd.Timestamp.now().isoformat(),
        },
        "summary": {
            "constituent_count": len(df_final_constituents),
            "adds_count": len(adds),
            "drops_count": len(drops),
            "turnover_percent": round(turnover_percent, 2),
        },
        "adds": adds,
        "drops": drops,
        "final_constituents": df_final_constituents[['ticker', 'market_cap_usd', 'sector']].to_dict(orient='records'),
        "weights": final_weights.to_dict(),
    }
    
    print("--- Rebalance Calculation Engine Finished ---")
    return results

And the report python file

# in include/utils/reporting.py

from typing import Dict, Any
import pandas as pd
import json

def create_committee_report(rebalance_results: Dict[str, Any], config: Dict[str, Any], output_path: str):
    """
    Generates a Markdown report summarizing the rebalance for committee review.

    Args:
        rebalance_results: The dictionary output from the calculation engine.
        config: The index configuration dictionary.
        output_path: The path to save the final report.
    """
    print("--- Generating Committee Review Report ---")
    
    # Extract key data for easier access
    meta = rebalance_results['metadata']
    summary = rebalance_results['summary']
    
    # Start building the Markdown string
    report_md = f"""
# Index Rebalance Review: {config['name']} ({config['index_id']})

**Report Generated:** {meta['rebalance_date']}
**Rebalance Effective:** As per schedule: {config['schedule']['effective_on']} of months {config['schedule']['effective_months']}

---

## 1. Executive Summary

| Metric | Value |
| :--- | :--- |
| **Target Constituent Count** | {config['selection']['target_constituent_count']} |
| **Final Constituent Count** | {summary['constituent_count']} |
| **Constituents Added** | {summary['adds_count']} |
| **Constituents Dropped** | {summary['drops_count']} |
| **Turnover** | {summary['turnover_percent']}% |

---

## 2. Adds & Drops

### Added to Index ({summary['adds_count']})
"""
    if summary['adds_count'] > 0:
        adds_df = pd.DataFrame([
            c for c in rebalance_results['final_constituents'] 
            if c['ticker'] in rebalance_results['adds']
        ])
        report_md += adds_df.to_markdown(index=False)
    else:
        report_md += "_No securities added._"

    report_md += "\n\n### Dropped from Index ({summary['drops_count']})\n"
    if summary['drops_count'] > 0:
        # Note: To show stats for dropped stocks, the engine would need to keep them.
        # For this example, we just list the tickers.
        for ticker in rebalance_results['drops']:
            report_md += f"- {ticker}\n"
    else:
        report_md += "_No securities dropped._"

    report_md += "\n\n---\n\n## 3. Post-Rebalance Sector Weights\n\n"
    
    constituents_df = pd.DataFrame(rebalance_results['final_constituents'])
    weights_s = pd.Series(rebalance_results['weights'], name='weight').reset_index().rename(columns={'index': 'ticker'})
    
    # Merge weights back with sector info
    df_with_weights = pd.merge(constituents_df, weights_s, on='ticker')
    
    sector_weights = df_with_weights.groupby('sector')['weight'].sum().sort_values(ascending=False)
    
    sector_weights_df = (sector_weights * 100).round(2).reset_index()
    sector_weights_df.columns = ['Sector', 'Weight (%)']
    
    report_md += sector_weights_df.to_markdown(index=False)
    
    report_md += "\n\n--- END OF REPORT ---"
    
    # Save the report to a file
    with open(output_path, 'w') as f:
        f.write(report_md)
        
    print(f"Report successfully saved to {output_path}")

# Example of how this might be called and tested locally
if __name__ == '__main__':
    # This block allows you to test the reporting function directly
    mock_results_file = 'mock_rebalance_results.json'
    mock_config_file = '../../config/indices/sp500_rules.yaml' # Adjust path for local run
    
    # 1. Run the engine to get mock results
    with open(mock_config_file, 'r') as f:
        test_config = yaml.safe_load(f)

    # In a real test, you'd have a static mock data file
    test_results = run_rebalance_engine(test_config, market_data_path="/path/to/mock_data.parquet")
    
    with open(mock_results_file, 'w') as f:
        json.dump(test_results, f, indent=2)
    
    # 2. Generate the report from those results
    create_committee_report(test_results, test_config, "committee_report_preview.md")
    print("\nGenerated 'committee_report_preview.md' for local review.")

Leave a comment

This site uses Akismet to reduce spam. Learn how your comment data is processed.