For complex scheduling tasks, use Apache Airflow. Its key features are:
- Scheduling: cron-style schedule expressions
- Error handling: built-in retries and failure callbacks
- Dependencies: explicit task dependencies declared in code (e.g. with `>>`)
- Modularity: each concern (fetching, processing, validation, notification) is a separate task
- Configuration: Airflow Variables and Connections keep settings and credentials out of the code
- Monitoring: a built-in web UI for monitoring runs and viewing task logs
from airflow import DAG
from airflow.operators.dummy_operator import DummyOperator
from airflow.operators.email_operator import EmailOperator
from airflow.operators.python_operator import PythonOperator
from airflow.utils.dates import days_ago
# Monthly index-rebalancing pipeline:
#   start -> fetch_data -> sector processing -> validate_results -> send_notification
#
# NOTE(review): `default_args` is not defined in this snippet. It is presumably
# defined earlier in the module, and it is where retries, failure callbacks and
# a `start_date` (e.g. via the imported `days_ago`) would be set. Confirm this:
# the DAG needs a start_date to be scheduled.
# `fetch_market_data`, `validate_weights` and `create_sector_tasks` are likewise
# expected to be defined elsewhere in this module.
with DAG(
    'monthly_rebalancing',
    default_args=default_args,
    description='Monthly index rebalancing with notifications',
    # Cron fields are: minute hour day-of-month month day-of-week.
    # This schedule fires at 18:00 on the 15th of every month.
    schedule_interval='0 18 15 * *',
    # Do not back-fill runs for intervals that elapsed before the DAG was enabled.
    catchup=False,
) as dag:
    # No-op entry task that anchors the dependency chain.
    start = DummyOperator(task_id='start')

    # Data preparation
    fetch_data = PythonOperator(
        task_id='fetch_data',
        python_callable=fetch_market_data,
    )

    # Processing. create_sector_tasks() is defined elsewhere. It presumably
    # returns a single operator, a task group or a list of operators. Any of
    # these can sit between `>>` operators in the chain below; verify against
    # its definition.
    process_sectors = create_sector_tasks()

    # Validation
    validate = PythonOperator(
        task_id='validate_results',
        python_callable=validate_weights,
    )

    # Notification. Unless default_args overrides `trigger_rule`, Airflow's
    # default rule (all_success) runs this task only after every upstream task
    # succeeded. The "completed successfully" text relies on that.
    notify = EmailOperator(
        task_id='send_notification',
        to='team@quant.com',
        subject='Index Rebalancing Complete',
        html_content='Monthly index rebalancing has completed successfully.',
    )

    # Define workflow: a strictly linear chain.
    start >> fetch_data >> process_sectors >> validate >> notify