Apache Airflow — стандарт оркестрации пайплайнов в 2026. Если ты dbt-моделишь данные, Airflow запускает dbt по расписанию + ходит за свежими файлами + рассылает алерты в Slack + сохраняет результаты тестов. Этот гайд — практические DAG-паттерны без воды.
Покажу как писать идемпотентные DAG, использовать TaskFlow API (2.0+), грамотно работать с sensors и динамическими DAG-ами. Всё с примерами из аналитики маркетплейса.
Что узнаешь
- TaskFlow API vs PythonOperator — как писать DAG в 2026
- Idempotency: главный паттерн для retry-safe пайплайнов
- Sensors: poke vs reschedule (когда что использовать)
- Dynamic DAGs: один template, N tasks
- Мониторинг: alerts, SLA misses, on_failure_callback
Шаг 1: Airflow за 5 минут — концепции
DAG (Directed Acyclic Graph) — расписание задач + их зависимости.
Task — единица работы (запустить SQL, скопировать файл).
Scheduler — следит за расписаниями DAGов и кладёт их в очередь.
Worker — выполняет tasks из очереди.
Executor — стратегия выполнения (Local, Celery, Kubernetes).
Source DBs → Airflow Scheduler (cron) → Workers
↓
DAGs запускают:
dbt run + tests
S3 → Snowflake load
Notebook → отчёт в Slack
Подборка 10 Airflow-вопросов с собесов — паттерны idempotency, scheduling и sensors.
Шаг 2: TaskFlow API (Airflow 2.0+) — современный синтаксис
Старый PythonOperator:
def extract(ds, **kwargs):
return {'rows': 1000}
def transform(ti, **kwargs):
data = ti.xcom_pull(task_ids='extract')
return {'processed': data['rows'] * 2}
with DAG('old_style', schedule='@daily') as dag:
e = PythonOperator(task_id='extract', python_callable=extract)
t = PythonOperator(task_id='transform', python_callable=transform)
e >> t
TaskFlow API — короче в 3 раза:
from airflow.decorators import dag, task
from datetime import datetime
@dag(schedule='@daily', start_date=datetime(2024, 1, 1), catchup=False, tags=['etl'])
def etl_pipeline():
@task
def extract():
return {'rows': 1000}
@task
def transform(data: dict):
return {'processed': data['rows'] * 2}
@task
def load(data: dict):
print(f"Loaded {data['processed']} rows")
load(transform(extract()))
etl_pipeline()
XCom передаётся автоматически через return value → assignment.
Шаг 3: Idempotency — главный паттерн
Idempotent task: запустить N раз = тот же результат. Безопасный retry без дублирования.
Bad (не идемпотентный):
@task
def load_revenue(ds):
df = pd.read_csv(f's3://raw/events_{ds}.csv')
df.to_sql('daily_revenue', engine, if_exists='append') # ❌ retry дублирует
Good (идемпотентный):
@task
def load_revenue(ds):
df = pd.read_csv(f's3://raw/events_{ds}.csv')
with engine.begin() as conn:
conn.execute(f"DELETE FROM daily_revenue WHERE day = '{ds}'")
df.to_sql('daily_revenue', conn, if_exists='append', index=False)
Лучший вариант — UPSERT/MERGE:
@task
def load_revenue(ds):
sql = f"""
INSERT INTO daily_revenue (day, revenue)
SELECT '{ds}', SUM(amount)
FROM events
WHERE day = '{ds}'
GROUP BY day
ON CONFLICT (day) DO UPDATE SET revenue = EXCLUDED.revenue;
"""
engine.execute(sql)
Использование {{ ds }} (execution_date logical date в YYYY-MM-DD) — Airflow подставит правильную дату при backfill и retry.
Шаг 4: Sensors — ожидание условий
Sensor ждёт пока условие станет True (файл появился, partition готов, API доступен).
Modes:
- poke (default) — занимает worker-слот всё время ожидания
- reschedule — освобождает worker между проверками (для долгих ожиданий)
from airflow.sensors.filesystem import FileSensor
wait_for_file = FileSensor(
task_id='wait_for_csv',
filepath='/data/incoming/sales_{{ ds }}.csv',
poke_interval=60, # проверять каждую минуту
timeout=3600, # ждать максимум 1 час
mode='reschedule', # освобождать worker между проверками
)
ExternalTaskSensor — ждать task из другого DAG:
from airflow.sensors.external_task import ExternalTaskSensor
wait_upstream = ExternalTaskSensor(
task_id='wait_upstream_dag',
external_dag_id='upstream_etl',
external_task_id='load_final',
timeout=7200,
mode='reschedule',
)
Smart Sensors deprecated в 2.4+ → используй deferrable operators (новая модель async).
Шаг 5: Dynamic Task Mapping (Airflow 2.3+)
Один task description → N исполнений на runtime данных:
@dag(schedule='@daily', start_date=datetime(2024, 1, 1))
def per_country_etl():
@task
def list_countries():
return ['ru', 'us', 'br', 'de', 'in'] # из DB или config
@task
def process_country(country: str):
# Обработка для конкретной страны
print(f"Processing {country}")
return f"done_{country}"
@task
def summarize(results: list):
print(f"Total countries: {len(results)}")
countries = list_countries()
results = process_country.expand(country=countries)
summarize(results)
При runtime Airflow создаст 5 task-инстансов process_country (по одному на страну), параллельно или последовательно в зависимости от max_active_tis_per_dag.
Шаг 6: Error handling + alerts
on_failure_callback — слать в Slack/PagerDuty при падении:
def slack_alert(context):
task = context['task'].task_id
dag = context['dag'].dag_id
msg = f"⚠️ {dag}.{task} FAILED. Log: {context['task_instance'].log_url}"
send_to_slack(msg)
@dag(
schedule='@daily',
on_failure_callback=slack_alert, # на падение всего DAG
default_args={
'on_failure_callback': slack_alert, # на падение task
'retries': 3,
'retry_delay': timedelta(minutes=5),
'sla': timedelta(hours=2),
'email_on_failure': False,
}
)
def critical_etl():
...
SLA misses автоматом регистрируются если task не завершился в указанный timeframe — sla_miss_callback шлёт уведомление.
Шаг 7: Best practices в проде
- catchup=False для большинства DAG — иначе при изменении start_date Airflow забэкфилит сотни runs (мест убийца).
- Только одна active DAG run для упорядоченных пайплайнов —
max_active_runs=1.
- TaskGroups вместо SubDAG (SubDAGs deprecated с 2.6):
from airflow.utils.task_group import TaskGroup
with TaskGroup('etl_block') as etl_group:
extract() >> transform() >> load()
- Не делай тяжёлый код в DAG-файле — он парсится каждые
min_file_process_intervalсекунд. Импорты дорогих библиотек → SourceCode growing → длинные scheduler heartbeats.
- Connection и Variable через Secrets Backend (AWS Secrets Manager, HashiCorp Vault). Не хардкодь.
- Pools для rate-limiting — ограничить одновременные подключения к dwarehouse:
heavy_task = PythonOperator(
task_id='heavy',
python_callable=my_func,
pool='snowflake_pool',
pool_slots=2,
)
FAQ
Airflow vs Prefect vs Dagster?
Airflow — мейн-стрим (большая комьюнити, dbt integration, K8s executor). Prefect — современный API, лучше для ML pipelines. Dagster — software-defined assets, асset-centric model. В РФ в 99% случаев Airflow.
Сколько DAG норм для одного instance?
До 500-1000 DAG на standalone Scheduler без проблем. После — нужны несколько Scheduler-инстансов или HA setup. На очень больших instance (3000+) лучше разделить на logical clusters.
Schedule '@daily' vs '0 0 * * *'?
Эквивалентны: оба = midnight UTC каждый день. Cron-выражения дают точный контроль. @daily короче и понятнее.
catchup=True — кому нужен?
Для bulk-backfill при первом запуске нового DAG. После — переключай на False, иначе любое изменение start_date запустит сотни runs.
Dependence на dbt?
Используй BashOperator (dbt run --select model_name) или Astronomer Cosmos (https://github.com/astronomer/astronomer-cosmos) — генерит Airflow tasks из dbt manifest автоматически.
Что дальше
- 🎯 SQL-тренажёр — 480+ задач для SQL внутри dbt-моделей
- 🧠 3000+ вопросов с собесов — 10+ про Airflow паттерны
- 📚 dbt практический гайд — что Airflow оркестрирует
- ⚡ ClickHouse гайд — destination для Airflow ETL
- 💼 Roadmap аналитика — где Airflow помещается в стэке
Источники
- airflow.apache.org/docs — официальные доки
- Astronomer Academy — free курсы от Astronomer (создатели managed Airflow)
- Airflow Slack — 30k+ community
Airflow — это контроль над тысячами пайплайнов. Открой SQL-тренажёр и качай SQL под dbt-модели которые Airflow будет крутить.