Introduction
Apache Airflow est devenu le standard pour l'orchestration de pipelines de données complexes. En 2026, les équipes data exigent des workflows résilients, observables et scalables. Ce tutoriel avancé vous guide pas à pas dans la conception de DAGs sophistiqués incluant des opérateurs sur mesure, des capteurs dynamiques et un déploiement Kubernetes optimisé. Vous apprendrez à éviter les pièges classiques tout en adoptant les meilleures pratiques de production.
Prérequis
- Apache Airflow 2.10+ avec Python 3.11
- Docker et Kubernetes (minikube ou cluster)
- Connaissances solides en Python et en ETL
- Helm 3 installé
Configuration Docker avancée
version: '3.8'
services:
airflow-webserver:
image: apache/airflow:2.10.3
environment:
- AIRFLOW__CORE__EXECUTOR=CeleryExecutor
- AIRFLOW__CORE__LOAD_EXAMPLES=False
volumes:
- ./dags:/opt/airflow/dags
- ./plugins:/opt/airflow/plugins
ports:
- "8080:8080"Ce fichier configure un environnement Airflow prêt pour la production avec CeleryExecutor. Il monte les dossiers DAGs et plugins pour un développement local itératif.
Opérateur personnalisé avancé
from airflow.models.baseoperator import BaseOperator
from airflow.utils.decorators import apply_defaults
class DataQualityOperator(BaseOperator):
@apply_defaults
def __init__(self, sql_check, conn_id, *args, **kwargs):
super().__init__(*args, **kwargs)
self.sql_check = sql_check
self.conn_id = conn_id
def execute(self, context):
hook = PostgresHook(postgres_conn_id=self.conn_id)
result = hook.get_first(self.sql_check)
if result[0] == 0:
raise ValueError("Qualité des données échouée")Cet opérateur vérifie la qualité des données après chaque transformation. Il lève une exception en cas d'échec, permettant un retry automatique et une alerte Slack.
DAG avec capteurs et branchement
from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.sensors.filesystem import FileSensor
from datetime import datetime
with DAG('advanced_etl', start_date=datetime(2026,1,1), schedule='@daily') as dag:
wait_for_file = FileSensor(task_id='wait_file', filepath='/data/input.csv')
quality_check = DataQualityOperator(task_id='quality', sql_check='SELECT COUNT(*) FROM staging')
wait_for_file >> quality_checkCe DAG illustre l'utilisation combinée d'un capteur de fichier et d'un opérateur personnalisé. Le branchement implicite garantit que la validation qualité ne s'exécute qu'après arrivée du fichier.
Configuration Kubernetes
executor: KubernetesExecutor
webserver:
resources:
limits:
memory: "2Gi"
cpu: "1000m"
workers:
replicas: 5
resources:
requests:
memory: "1Gi"Le fichier Helm values.yaml configure l'exécuteur Kubernetes avec auto-scaling et limites de ressources. Cela permet de gérer des centaines de tâches simultanées sans saturation.
Script de déploiement
#!/bin/bash
helm upgrade --install airflow apache-airflow/airflow \
--namespace airflow --create-namespace \
-f values.yaml --waitCe script Helm déploie Airflow sur Kubernetes avec rollback automatique en cas d'échec. Il utilise --wait pour garantir que tous les pods sont prêts avant de terminer.
Bonnes pratiques
- Toujours versionner les DAGs et plugins dans Git
- Utiliser des pools pour limiter la concurrence sur les ressources critiques
- Implémenter des tests unitaires sur les opérateurs personnalisés
- Configurer des SLA et des alertes PagerDuty
- Isoler les connexions sensibles avec le backend Vault
Erreurs courantes à éviter
- Oublier de définir des retries sur les tâches longues
- Utiliser des variables globales au lieu de XComs chiffrés
- Négliger le nettoyage des métadonnées (airflow db clean)
- Déployer sans healthchecks Kubernetes configurés
Pour aller plus loin
Approfondissez ces concepts avec nos formations dédiées à l'orchestration de données : https://learni-group.com/formations.