Creating a UI is a necessary element in the DAG workflow. Here is the additional structure:
index_management_ui/
├── app.py # The main Flask application
├── ui_database.json # A simple file to act as our database
└── templates/
├── layout.html # Base HTML template with Bootstrap CSS
├── dashboard.html # The main monitoring dashboard
└── details.html # The approval/rejection page for a single run
First, the dag.py file needs to be slightly edited so that it notifies the UI when a step is up for approval. Then the core is the app.py file, as follows:
# in index_management_ui/app.py
import json
import os
from datetime import datetime

import requests
from flask import Flask, render_template, request, redirect, url_for, jsonify, abort
app = Flask(__name__)
# --- Configuration ---
# In a real app, use environment variables!
DB_FILE = "ui_database.json"
AIRFLOW_URL = "http://localhost:8080/api/v1"
# IMPORTANT: Create a user in Airflow with API access.
AIRFLOW_USER = "admin" # Replace with your Airflow user
AIRFLOW_PASSWORD = "admin" # Replace with your Airflow password
# The DAG that the ExternalTaskSensor is waiting for.
APPROVAL_DAG_ID = "committee_approval_dag"
# --- Database Helper Functions (using a simple JSON file) ---
def get_db():
if not os.path.exists(DB_FILE):
return {}
with open(DB_FILE, 'r') as f:
return json.load(f)
def save_db(db):
with open(DB_FILE, 'w') as f:
json.dump(db, f, indent=2)
# --- UI Routes ---
@app.route('/')
def dashboard():
"""Main dashboard showing the status of all rebalance runs."""
db = get_db()
# Sort runs by timestamp, newest first
sorted_runs = sorted(db.items(), key=lambda item: item[1]['timestamp'], reverse=True)
return render_template('dashboard.html', runs=dict(sorted_runs))
@app.route('/run/<run_id>')
def run_details(run_id):
"""Shows the details for a single run, with Approve/Reject buttons."""
db = get_db()
run_data = db.get(run_id)
if not run_data:
abort(404, "Run ID not found.")
return render_template('details.html', run_id=run_id, run=run_data)
@app.route('/action/approve/<run_id>', methods=['POST'])
def approve_run(run_id):
"""Handles the 'Approve' action. Triggers the Airflow signal DAG."""
db = get_db()
if run_id not in db or db[run_id]['status'] != 'AWAITING_APPROVAL':
return "This run cannot be approved.", 400
# This is the core interaction with Airflow
# We trigger the "signal" DAG, which the main DAG is waiting for.
# The dag_run_id must be unique and predictable.
signal_run_id = f"signal_approve_{run_id}"
endpoint = f"{AIRFLOW_URL}/dags/{APPROVAL_DAG_ID}/dagRuns"
try:
response = requests.post(
endpoint,
auth=(AIRFLOW_USER, AIRFLOW_PASSWORD),
json={"dag_run_id": signal_run_id},
timeout=15
)
response.raise_for_status()
print(f"Successfully triggered approval signal DAG for {run_id}. Airflow response: {response.json()}")
db[run_id]['status'] = 'APPROVED'
save_db(db)
except requests.exceptions.RequestException as e:
print(f"ERROR: Could not trigger Airflow approval DAG: {e}")
return f"Error communicating with Airflow: {e}", 500
return redirect(url_for('dashboard'))
@app.route('/action/reject/<run_id>', methods=['POST'])
def reject_run(run_id):
"""Handles the 'Reject' action. Marks the run as rejected."""
db = get_db()
if run_id not in db or db[run_id]['status'] != 'AWAITING_APPROVAL':
return "This run cannot be rejected.", 400
# In our simplified model, we will fail the ExternalTaskSensor by triggering
# the signal DAG and then immediately setting its state to 'failed'.
signal_run_id = f"signal_reject_{run_id}"
dag_run_endpoint = f"{AIRFLOW_URL}/dags/{APPROVAL_DAG_ID}/dagRuns"
try:
# 1. Trigger the DAG
trigger_resp = requests.post(
dag_run_endpoint, auth=(AIRFLOW_USER, AIRFLOW_PASSWORD),
json={"dag_run_id": signal_run_id}, timeout=15
)
trigger_resp.raise_for_status()
# 2. Immediately mark it as failed
update_endpoint = f"{dag_run_endpoint}/{signal_run_id}"
update_resp = requests.patch(
update_endpoint, auth=(AIRFLOW_USER, AIRFLOW_PASSWORD),
json={"state": "failed"}, timeout=15
)
update_resp.raise_for_status()
db[run_id]['status'] = 'REJECTED'
save_db(db)
except requests.exceptions.RequestException as e:
print(f"ERROR: Could not trigger Airflow rejection flow: {e}")
return f"Error communicating with Airflow: {e}", 500
return redirect(url_for('dashboard'))
# --- API for Airflow to Call ---
@app.route('/api/new_run', methods=['POST'])
def new_run_from_airflow():
"""API endpoint for the Airflow DAG to report a new run for approval."""
data = request.json
run_id = data.get('run_id')
if not run_id:
return jsonify({"error": "run_id is required"}), 400
db = get_db()
db[run_id] = {
"index_id": data.get('index_id'),
"report_url": data.get('report_url'),
"status": "AWAITING_APPROVAL",
"timestamp": datetime.utcnow().isoformat() + "Z"
}
save_db(db)
return jsonify({"status": "success", "message": f"Run {run_id} is awaiting approval."})
if __name__ == '__main__':
from datetime import datetime
# Ensure the database file exists on startup
if not os.path.exists(DB_FILE):
with open(DB_FILE, 'w') as f:
json.dump({}, f)
app.run(host='0.0.0.0', port=5001, debug=True)
Then come the HTML templates with their JavaScript functionality. To see it all working:
- Start Airflow: Make sure your Airflow instance (using Docker Compose) is running.
- Create the Signal DAG: Create a simple DAG in Airflow named committee_approval_dag.py. It can be empty, just a placeholder for the sensor.
- Run the Flask UI:
- Open a terminal in the index_management_ui directory.
- Install dependencies: pip install Flask requests
- Run the app: python app.py
- You will see output saying the server is running on http://127.0.0.1:5001.
- Trigger the Main DAG: Go to your Airflow UI (http://localhost:8080), find sp500_quarterly_rebalance, and trigger a manual run.
- Watch the Magic:
- The DAG will run until the generate_analyst_report task.
- This task will call your Flask API. Check the Flask terminal for a POST /api/new_run message.
- The DAG will then pause at the wait_for_committee_approval sensor.
- Go to your UI at http://localhost:5001. You will see the new run with status “Awaiting Approval”.
- Click “Review”, then click the “Approve” button.
- The UI will call the Airflow API to trigger the signal DAG.
- The ExternalTaskSensor in the main DAG will detect this and succeed, allowing the workflow to continue to the publish_changes_to_clients step.