airflow_project/
├── dags/
│ └── index_rebalancing_dag.py # Our main DAG file
├── include/
│   ├── calculations/
│   │   └── engine.py # Python code for the heavy lifting
│   └── utils/
│       └── reporting.py # Code for generating reports
└── config/
└── indices/
└── sp500_rules.yaml # Our version-controlled index definition
The workflow would look like this:
graph TD
    A[get_index_config] --> B[collect_and_validate_market_data];
    B --> C[calculate_preliminary_rebalance];
    C --> D[generate_analyst_report];
    D --> E[notify_committee_for_review];
    E --> F[wait_for_committee_approval];
    F --> G{check_approval_status};
    G -- Approved --> H[publish_changes_to_clients];
    G -- Rejected --> I[handle_rejection];
    I --> J[notify_manager_of_rejection];
    H --> K[final_reconciliation];
    J --> K;
The Python DAG file is:
# In dags/index_rebalancing_dag.py
import yaml
from pendulum import datetime
from airflow.models.dag import DAG
from airflow.decorators import task
from airflow.operators.email import EmailOperator
from airflow.sensors.external_task import ExternalTaskSensor
from airflow.models.baseoperator import chain
# --- DAG Definition ---
with DAG(
# Unique DAG identifier; the external approval DAG must pair with this run.
dag_id="sp500_quarterly_rebalance",
start_date=datetime(2023, 1, 1),
# In reality, this would be a custom Timetable for the "3rd Friday of the quarter"
# For simplicity, we use a cron string that runs weekly.
# NOTE(review): 'schedule_interval' was renamed 'schedule' in Airflow 2.4+ --
# confirm the targeted Airflow version.
schedule_interval="0 0 * * 5",
# Do not back-fill missed schedule slots; each rebalance run stands alone.
catchup=False,
tags=["index", "finance"],
doc_md="""
### S&P 500 Quarterly Rebalance
This DAG performs the quarterly rebalancing for the S&P 500 index.
1. Fetches the index rules from the config file.
2. Collects and validates market data.
3. Calculates preliminary changes (adds, drops, weights).
4. **Pauses** and waits for the Index Committee to approve the changes via an external signal.
5. Publishes the final changes upon approval.
""",
) as dag:
# --- Task Definitions ---
@task
def get_index_config(index_id: str):
    """
    Load the version-controlled YAML methodology for one index.

    The same file contents always produce the same result, so the task
    is safe to retry (idempotent).
    """
    import yaml
    # Deployment convention: rule files live under /opt/airflow/config/indices.
    config_path = f"/opt/airflow/config/indices/{index_id}_rules.yaml"
    with open(config_path, 'r') as fh:
        rules = yaml.safe_load(fh)
    print(f"Loaded configuration for {rules['name']}")
    return rules
@task
def collect_and_validate_market_data(config: dict, run_id: str):
    """
    Pull market data (pricing, fundamentals) for the index universe, run
    quality checks, and persist the result for this specific run.

    Returns the path of the per-run data file.
    """
    print(f"Fetching data for universe: {config['universe']}")
    # --- In a real system ---
    # from data_vendors import Bloomberg, Refinitiv
    # universe_securities = Bloomberg.get_exchange_members(config['universe']['exchange_filter'])
    # market_data = Refinitiv.get_bulk_data(universe_securities, as_of_date=...)
    # DataQuality.run_checks(market_data)
    # --- For this example ---
    print("Pretending to fetch and validate terabytes of data...")
    # A run-scoped path keeps retries from clobbering other runs' data (idempotency).
    output_path = f"/data/raw/{run_id}/market_data.parquet"
    print(f"Data saved to {output_path}")
    return output_path
@task
def calculate_preliminary_rebalance(config: dict, market_data_path: str, run_id: str):
    """
    Core calculation step: apply the configured eligibility rules to the
    collected market data and compute the new index weights.

    Returns the path of the preliminary results file for this run.
    """
    # Here we would import our more complex, testable calculation code
    # from include.calculations.engine import run_rebalance_engine
    print("Applying eligibility criteria from config file:")
    # --- Illustrative Logic ---
    # import pandas as pd
    # df = pd.read_parquet(market_data_path)
    # for criterion in config['eligibility_criteria']:
    #     print(f" - Applying rule: {criterion['rule']} with min value {criterion.get('min')}")
    #     # df = apply_filter(df, criterion) -> Real logic here
    #
    # new_weights = calculate_weights(df, config['weighting'])
    # -------------------------
    print("Calculation complete. Generating preliminary Adds & Drops list.")
    # Run-scoped output path, same idempotency convention as the data task.
    results_path = f"/data/preliminary/{run_id}/rebalance_results.json"
    print(f"Results saved to {results_path}")
    return results_path
@task
def generate_analyst_report(preliminary_results_path: str, run_id: str):
    """
    Render the preliminary rebalance into a human-readable report and
    return the URL where the Index Committee can view it.
    """
    # from include.utils.reporting import create_pdf_summary
    print("Generating summary report for analyst review...")
    # report = create_pdf_summary(preliminary_results_path)
    # report.save(f"/reports/{run_id}_committee_review.pdf")
    url = f"http://internal-dashboard.example.com/reports/{run_id}"
    print(f"Report available at: {url}")
    return url
# Ask the Index Committee to review the preliminary results. The body pulls
# the report URL from the generate_analyst_report task's XCom and is templated
# with the logical date ({{ ds }}) and the run id.
notify_committee_for_review = EmailOperator(
task_id="notify_committee_for_review",
to="index-committee@example.com",
subject="Action Required: S&P 500 Preliminary Rebalance for {{ ds }}",
html_content="""
<h3>Review Required</h3>
The preliminary rebalance calculation for the S&P 500 is complete.
<p>Please review the report and provide your approval or rejection in the Index Management Dashboard.</p>
<p>Report Link: {{ task_instance.xcom_pull(task_ids='generate_analyst_report') }}</p>
<p>DAG Run ID: {{ run_id }}</p>
""",
)
# This is the "Human-in-the-Loop" step. The DAG will pause here.
# It waits for an external DAG named 'committee_approval_dag' to run for the same
# logical date and have a task named 'signal_approval' succeed.
# The 'committee_approval_dag' would be triggered by a button in a UI.
# NOTE(review): ExternalTaskSensor matches on identical logical dates by default;
# a manually triggered approval DAG will normally have a different logical date,
# so execution_delta / execution_date_fn may be required -- confirm the wiring.
wait_for_approval = ExternalTaskSensor(
task_id='wait_for_committee_approval',
external_dag_id='committee_approval_dag',
external_task_id='signal_approval',
# Only a successful approval signal releases this DAG.
allowed_states=['success'],
# A failed or skipped signal task fails the sensor (and so the run).
failed_states=['failed', 'skipped'],
# NOTE(review): 'poke' holds a worker slot for the whole wait; with a 2-day
# timeout, mode='reschedule' would free the slot between checks.
mode='poke',
poke_interval=600, # Check every 10 minutes
timeout=86400 * 2 # Timeout after 2 days
)
@task.branch
def check_approval_status(run_id: str):
    """
    Decide which downstream path to follow once the approval sensor clears.

    In production this would query the approval dashboard's API; here the
    outcome is hardcoded. Returns the task_id of the branch to execute.
    """
    # from include.utils.api import get_approval_status
    # status = get_approval_status(run_id) -> returns "APPROVED" or "REJECTED"
    status = "APPROVED" # Hardcoded for example
    return "publish_changes_to_clients" if status == "APPROVED" else "handle_rejection"
@task
def publish_changes_to_clients(preliminary_results_path: str):
    """
    Distribute the approved rebalance results to every subscriber channel
    (FTP drops, APIs, flat files).
    """
    print("PUBLISHING APPROVED CHANGES TO CLIENTS...")
    # publish_to_ftp('ftp.client1.com', preliminary_results_path)
    # publish_to_api('api.client2.com', preliminary_results_path)
    print("Publication complete.")
@task
def handle_rejection():
    """
    Rejection branch: record the committee's decision; the follow-up email
    task alerts the index manager.
    """
    print("REBALANCE REJECTED BY COMMITTEE. Notifying index manager.")
# Escalation email sent on the rejection path; templated with the logical
# date ({{ ds }}) and run id for traceability.
notify_manager_of_rejection = EmailOperator(
task_id="notify_manager_of_rejection",
to="index-manager@example.com",
subject="URGENT: S&P 500 Rebalance REJECTED for {{ ds }}",
html_content="""
The quarterly rebalance for run_id {{ run_id }} was rejected by the committee.
Please investigate immediately.
""",
)
# 'one_success' lets this join task run as soon as any upstream succeeds,
# so it still fires when the branch has skipped the other path's tasks.
@task(trigger_rule="one_success")
def final_reconciliation():
"""This task runs after either a successful publication or a handled rejection."""
print("Workflow finished. Running final checks.")
# --- Define DAG Dependencies ---
# 1. Get Config
config_data = get_index_config(index_id="sp500")
# 2. Run Main Calculation Pipeline (TaskFlow infers these edges from data flow)
market_data = collect_and_validate_market_data(config_data, run_id="{{ run_id }}")
prelim_results = calculate_preliminary_rebalance(config_data, market_data, run_id="{{ run_id }}")
report = generate_analyst_report(prelim_results, run_id="{{ run_id }}")
# 3. Notify and Wait for Human
chain(report, notify_committee_for_review, wait_for_approval)
# 4. Branch based on Approval
approval_branch = check_approval_status(run_id="{{ run_id }}")
# BUG FIX: the branch must only run after the approval sensor clears; the
# original never linked the sensor to the branch task, so the status check
# could execute before the committee had decided.
chain(wait_for_approval, approval_branch)
# 5. Define Post-Approval Paths
# BUG FIX: instantiate the join task exactly once. The original called
# final_reconciliation() twice, registering two separate task instances
# instead of a single join point shared by both branches.
final_recon = final_reconciliation()
rejection_path = handle_rejection()
chain(rejection_path, notify_manager_of_rejection, final_recon)
publication_path = publish_changes_to_clients(prelim_results)
chain(publication_path, final_recon)
# Connect the branch task to its possible outcomes
approval_branch >> [publication_path, rejection_path]
Now for the details of the calculation-engine Python file:
# in include/calculations/engine.py
import pandas as pd
from typing import Dict, Any, List
# --- Helper Functions for Specific Rules ---
# In a real system, these would be far more complex.
def _apply_market_cap_filter(df: pd.DataFrame, rule: Dict[str, Any]) -> pd.DataFrame:
"""Filters DataFrame based on minimum market capitalization."""
min_market_cap = rule.get("min", 0)
print(f" -> Applying market cap filter: > ${min_market_cap / 1_000_000_000:.2f}B")
return df[df["market_cap_usd"] >= min_market_cap].copy()
def _apply_public_float_filter(df: pd.DataFrame, rule: Dict[str, Any]) -> pd.DataFrame:
"""Filters DataFrame based on minimum public float percentage."""
min_float = rule.get("min", 0)
print(f" -> Applying public float filter: > {min_float:.0%}")
return df[df["public_float_percent"] >= min_float].copy()
def _apply_liquidity_filter(df: pd.DataFrame, rule: Dict[str, Any]) -> pd.DataFrame:
"""Filters DataFrame based on liquidity rules."""
min_ratio = rule.get("min_ratio_adv_to_fmc", 0)
print(f" -> Applying liquidity filter (ADV/FMC ratio): > {min_ratio}")
df["adv_to_fmc_ratio"] = df["annual_dollar_volume"] / (df["market_cap_usd"] * df["public_float_percent"])
return df[df["adv_to_fmc_ratio"] >= min_ratio].copy()
# A dispatcher to map rule names from YAML to functions.
# Keys must match the 'rule' field of entries in the config's
# eligibility_criteria list; the engine skips unknown rule names
# with a warning rather than failing.
RULE_DISPATCHER = {
"market_cap_usd": _apply_market_cap_filter,
"public_float_percent": _apply_public_float_filter,
"liquidity": _apply_liquidity_filter,
# "profitability" would require fetching fundamental data, omitted for brevity
}
def _get_current_constituents() -> set:
"""
Placeholder function. In reality, this would fetch the current list of
constituents from a database or a flat file source to compare against.
"""
# For this example, we'll hardcode a small, simplified "current" list
return {"AAPL", "MSFT", "GOOG", "AMZN", "META"}
def _calculate_weights(df: pd.DataFrame, weighting_config: Dict[str, Any]) -> pd.Series:
"""Calculates weights for the final constituents based on the config."""
if weighting_config["scheme"] != "float_adjusted_market_cap":
raise NotImplementedError("Only float-adjusted market cap weighting is supported.")
df["float_adjusted_market_cap"] = df["market_cap_usd"] * df["public_float_percent"]
total_fmc = df["float_adjusted_market_cap"].sum()
weights = df["float_adjusted_market_cap"] / total_fmc
# Apply capping if specified
if "capping" in weighting_config:
max_weight = weighting_config["capping"].get("max_weight_percent", 100) / 100.0
weights[weights > max_weight] = max_weight
# Note: A real capping mechanism requires iterative redistribution of excess weight.
# This is a simplified version.
return weights
# --- Main Engine Entry Point ---
def run_rebalance_engine(config: Dict[str, Any], market_data_path: str) -> Dict[str, Any]:
    """
    Main function to run the index rebalance calculation.

    Args:
        config: The parsed YAML configuration dictionary for the index. Must
            contain 'eligibility_criteria', 'selection', 'weighting' and
            'index_id' keys.
        market_data_path: Path to the Parquet file containing market data.
            NOTE(review): currently unused -- mock data is generated below.

    Returns:
        A dictionary containing the full rebalance results. 'weights' maps
        ticker -> weight as plain floats (JSON-serializable).
    """
    print("--- Starting Rebalance Calculation Engine ---")
    # 1. Load Data
    # In a real system, this would be a large dataset. We'll create a sample.
    # df = pd.read_parquet(market_data_path)
    mock_data = {
        'ticker': ['AAPL', 'MSFT', 'GOOG', 'NVDA', 'TSLA', 'META', 'BRK.A', 'V', 'JNJ', 'WMT'],
        'market_cap_usd': [2.8e12, 2.5e12, 1.7e12, 1.2e12, 800e9, 700e9, 750e9, 500e9, 450e9, 430e9],
        'public_float_percent': [0.85, 0.88, 0.80, 0.75, 0.60, 0.90, 0.45, 0.95, 0.98, 0.55],
        'annual_dollar_volume': [20e12, 18e12, 10e12, 30e12, 40e12, 15e12, 0.5e12, 5e12, 2e12, 3e12],
        'sector': ['Tech', 'Tech', 'Tech', 'Tech', 'Consumer', 'Tech', 'Finance', 'Finance', 'Health', 'Retail']
    }
    df = pd.DataFrame(mock_data)
    df_filtered = df.copy()
    print(f"Loaded {len(df_filtered)} securities for initial screening.")

    # 2. Apply Eligibility Criteria
    print("\nApplying eligibility criteria from config:")
    for rule in config["eligibility_criteria"]:
        rule_name = rule["rule"]
        if rule_name in RULE_DISPATCHER:
            df_filtered = RULE_DISPATCHER[rule_name](df_filtered, rule)
        else:
            print(f" -> WARNING: No function found for rule '{rule_name}'. Skipping.")
    print(f"\n{len(df_filtered)} securities passed all eligibility filters.")

    # 3. Apply Selection Logic
    target_count = config["selection"]["target_constituent_count"]
    # For S&P 500, selection is by market cap, then committee review.
    # The engine selects the top N candidates by market cap.
    df_eligible = df_filtered.sort_values(by="market_cap_usd", ascending=False)
    # BUG FIX: .copy() so downstream weight calculation never operates on a
    # view of df_eligible (avoids pandas SettingWithCopy issues).
    df_final_constituents = df_eligible.head(target_count).copy()
    print(f"Selected top {len(df_final_constituents)} candidates for the index.")

    # 4. Determine Adds and Drops
    current_constituents = _get_current_constituents()
    new_constituents = set(df_final_constituents['ticker'])
    adds = sorted(new_constituents - current_constituents)
    drops = sorted(current_constituents - new_constituents)
    turnover_count = len(adds) + len(drops)
    turnover_percent = (turnover_count / target_count) * 100

    # 5. Calculate Final Weights
    final_weights = _calculate_weights(df_final_constituents, config["weighting"])
    # BUG FIX: key weights by ticker rather than by positional row index --
    # the reporting module merges this mapping on its 'ticker' column, which
    # was empty with integer keys. float() keeps values JSON-serializable.
    weights_by_ticker = {
        ticker: float(weight)
        for ticker, weight in zip(df_final_constituents["ticker"], final_weights)
    }

    # 6. Assemble Final Results
    results = {
        "metadata": {
            "index_id": config["index_id"],
            "rebalance_date": pd.Timestamp.now().isoformat(),
        },
        "summary": {
            "constituent_count": len(df_final_constituents),
            "adds_count": len(adds),
            "drops_count": len(drops),
            "turnover_percent": round(turnover_percent, 2),
        },
        "adds": adds,
        "drops": drops,
        "final_constituents": df_final_constituents[['ticker', 'market_cap_usd', 'sector']].to_dict(orient='records'),
        "weights": weights_by_ticker,
    }
    print("--- Rebalance Calculation Engine Finished ---")
    return results
And the reporting Python file:
# in include/utils/reporting.py
from typing import Dict, Any
import pandas as pd
import json
def create_committee_report(rebalance_results: Dict[str, Any], config: Dict[str, Any], output_path: str):
    """
    Generates a Markdown report summarizing the rebalance for committee review.

    Args:
        rebalance_results: The dictionary output from the calculation engine.
            The 'weights' entry is assumed to map ticker -> weight -- TODO
            confirm against the engine's output format.
        config: The index configuration dictionary.
        output_path: The path to save the final report.
    """
    print("--- Generating Committee Review Report ---")
    # Extract key data for easier access
    meta = rebalance_results['metadata']
    summary = rebalance_results['summary']
    # Start building the Markdown string
    report_md = f"""
# Index Rebalance Review: {config['name']} ({config['index_id']})
**Report Generated:** {meta['rebalance_date']}
**Rebalance Effective:** As per schedule: {config['schedule']['effective_on']} of months {config['schedule']['effective_months']}
---
## 1. Executive Summary
| Metric | Value |
| :--- | :--- |
| **Target Constituent Count** | {config['selection']['target_constituent_count']} |
| **Final Constituent Count** | {summary['constituent_count']} |
| **Constituents Added** | {summary['adds_count']} |
| **Constituents Dropped** | {summary['drops_count']} |
| **Turnover** | {summary['turnover_percent']}% |
---
## 2. Adds & Drops
### Added to Index ({summary['adds_count']})
"""
    if summary['adds_count'] > 0:
        adds_df = pd.DataFrame([
            c for c in rebalance_results['final_constituents']
            if c['ticker'] in rebalance_results['adds']
        ])
        report_md += adds_df.to_markdown(index=False)
    else:
        report_md += "_No securities added._"
    # BUG FIX: this literal was missing its f-prefix, so the rendered report
    # showed the raw text "{summary['drops_count']}" instead of the count.
    report_md += f"\n\n### Dropped from Index ({summary['drops_count']})\n"
    if summary['drops_count'] > 0:
        # Note: To show stats for dropped stocks, the engine would need to keep them.
        # For this example, we just list the tickers.
        for ticker in rebalance_results['drops']:
            report_md += f"- {ticker}\n"
    else:
        report_md += "_No securities dropped._"
    report_md += "\n\n---\n\n## 3. Post-Rebalance Sector Weights\n\n"
    constituents_df = pd.DataFrame(rebalance_results['final_constituents'])
    # Turn the ticker->weight mapping into a two-column frame for merging.
    weights_s = pd.Series(rebalance_results['weights'], name='weight').reset_index().rename(columns={'index': 'ticker'})
    # Merge weights back with sector info
    df_with_weights = pd.merge(constituents_df, weights_s, on='ticker')
    sector_weights = df_with_weights.groupby('sector')['weight'].sum().sort_values(ascending=False)
    sector_weights_df = (sector_weights * 100).round(2).reset_index()
    sector_weights_df.columns = ['Sector', 'Weight (%)']
    report_md += sector_weights_df.to_markdown(index=False)
    report_md += "\n\n--- END OF REPORT ---"
    # Save the report to a file
    with open(output_path, 'w') as f:
        f.write(report_md)
    print(f"Report successfully saved to {output_path}")
# Example of how this might be called and tested locally
if __name__ == '__main__':
    # This block allows you to test the reporting function directly.
    # BUG FIX: the original used yaml and run_rebalance_engine without
    # importing them, so the script raised NameError immediately.
    import yaml
    # Assumes the repo root is on sys.path when running locally -- adjust if needed.
    from include.calculations.engine import run_rebalance_engine

    mock_results_file = 'mock_rebalance_results.json'
    mock_config_file = '../../config/indices/sp500_rules.yaml' # Adjust path for local run
    # 1. Run the engine to get mock results
    with open(mock_config_file, 'r') as f:
        test_config = yaml.safe_load(f)
    # In a real test, you'd have a static mock data file
    test_results = run_rebalance_engine(test_config, market_data_path="/path/to/mock_data.parquet")
    with open(mock_results_file, 'w') as f:
        json.dump(test_results, f, indent=2)
    # 2. Generate the report from those results
    create_committee_report(test_results, test_config, "committee_report_preview.md")
    print("\nGenerated 'committee_report_preview.md' for local review.")