How to Create Complex DAGs with Apache Airflow in 2026

Introduction

Apache Airflow has become the standard for orchestrating complex data pipelines. In 2026, data teams demand resilient, observable, and scalable workflows. This advanced tutorial guides you step by step through designing sophisticated DAGs that include custom operators, dynamic sensors, and optimized Kubernetes deployment. You will learn to avoid common pitfalls while adopting production best practices.

Prerequisites

  • Apache Airflow 2.10+ with Python 3.11
  • Docker and Kubernetes (minikube or cluster)
  • Strong knowledge of Python and ETL
  • Helm 3 installed

Advanced Docker Configuration

docker-compose.yaml
version: '3.8'
services:
  airflow-webserver:
    image: apache/airflow:2.10.3
    command: webserver
    environment:
      - AIRFLOW__CORE__EXECUTOR=CeleryExecutor
      - AIRFLOW__CORE__LOAD_EXAMPLES=False
    volumes:
      - ./dags:/opt/airflow/dags
      - ./plugins:/opt/airflow/plugins
    ports:
      - "8080:8080"

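This file starts an Airflow webserver configured for CeleryExecutor and mounts the DAGs and plugins directories for iterative local development. On its own it is not a complete deployment: CeleryExecutor also needs a scheduler, at least one worker, a message broker such as Redis, and a metadata database, all of which are omitted here.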

Advanced Custom Operator

plugins/custom_operators.py
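from airflow.models.baseoperator import BaseOperator
from airflow.providers.postgres.hooks.postgres import PostgresHook


class DataQualityOperator(BaseOperator):
    """Fail the task when a SQL check returns no row or a zero count."""

    # The apply_defaults decorator is no longer needed in Airflow 2 and is omitted.
    def __init__(self, sql_check, conn_id, **kwargs):
        super().__init__(**kwargs)
        self.sql_check = sql_check
        self.conn_id = conn_id

    def execute(self, context):
        # Requires the apache-airflow-providers-postgres package.
        hook = PostgresHook(postgres_conn_id=self.conn_id)
        result = hook.get_first(self.sql_check)
        # get_first returns None when the query yields no rows.
        if result is None or result[0] == 0:
            raise ValueError("Data quality check failed")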

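This operator runs a SQL check after a transformation and raises an exception when the result is zero, which fails the task. Airflow retries the task only if retries is set on it, and an alert (for example to Slack) has to be wired in through a failure callback.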
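The alerting side can be sketched as a failure callback. This is a minimal sketch, not a definitive implementation: build_failure_message and notify_failure are hypothetical names, and the code only builds the alert text. How the message actually reaches Slack (webhook, provider package) depends on your setup and is left out.

```python
from types import SimpleNamespace


def build_failure_message(context):
    """Build a short alert text from an Airflow task context dictionary."""
    ti = context["task_instance"]
    error = context.get("exception")
    return f"Task {ti.dag_id}.{ti.task_id} failed: {error}"


def notify_failure(context):
    """Hypothetical callback; pass it as on_failure_callback=notify_failure."""
    message = build_failure_message(context)
    # Assumption: replace print with your Slack client or webhook call.
    print(message)
    return message


# Local check with a stand-in context, so no Airflow installation is needed.
fake_context = {
    "task_instance": SimpleNamespace(dag_id="advanced_etl", task_id="quality"),
    "exception": ValueError("Data quality check failed"),
}
sample = notify_failure(fake_context)
```

Keeping the message builder separate from the sending step makes the callback easy to test without a Slack workspace.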

DAG with Sensors and Branching

dags/advanced_pipeline.py
from datetime import datetime

from airflow import DAG
from airflow.sensors.filesystem import FileSensor

# Modules in the plugins folder are importable because Airflow adds it to sys.path.
from custom_operators import DataQualityOperator

with DAG('advanced_etl', start_date=datetime(2026, 1, 1), schedule='@daily') as dag:
    wait_for_file = FileSensor(task_id='wait_file', filepath='/data/input.csv')
    quality_check = DataQualityOperator(
        task_id='quality',
        sql_check='SELECT COUNT(*) FROM staging',
        conn_id='postgres_default',  # assumed connection ID; the operator requires one
    )
    wait_for_file >> quality_check

This DAG combines a file sensor with a custom operator. The >> dependency ensures the quality validation runs only after the file arrives; no branching operator is involved.
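The DAG above only chains two tasks. If real branching is needed, Airflow's BranchPythonOperator (from airflow.operators.python) runs a callable that returns the task_id to follow. Below is a sketch of such a callable; the task IDs load_warehouse and skip_load and the min_rows threshold are hypothetical and not part of the original pipeline.

```python
def choose_branch(row_count, min_rows=1):
    """Return the task_id of the branch to follow, based on the staged row count."""
    if row_count >= min_rows:
        return "load_warehouse"  # hypothetical downstream task
    return "skip_load"  # hypothetical no-op task


# In a DAG, a small wrapper would obtain row_count (for example from XCom),
# call choose_branch, and be passed as python_callable to BranchPythonOperator.
decision_with_data = choose_branch(120)
decision_without_data = choose_branch(0)
```

Because the decision is a plain function, it can be tested without starting a scheduler.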

Kubernetes Configuration

values.yaml
executor: KubernetesExecutor
webserver:
  resources:
    limits:
      memory: "2Gi"
      cpu: "1000m"
workers:
  replicas: 5
  resources:
    requests:
      memory: "1Gi"

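The Helm values.yaml file selects the Kubernetes executor and sets resource limits and requests. With KubernetesExecutor each task runs in its own pod, so concurrency is bounded by cluster capacity; the workers.replicas value only applies to Celery-based executors. No autoscaling is configured in this snippet.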

Deployment Script

deploy.sh
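#!/bin/bash
set -euo pipefail
# Assumes the chart repo was added: helm repo add apache-airflow https://airflow.apache.org
# --atomic rolls the release back if the upgrade fails.
helm upgrade --install airflow apache-airflow/airflow \
  --namespace airflow --create-namespace \
  -f values.yaml --wait --atomic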

This script deploys Airflow on Kubernetes with Helm. The --wait flag blocks until the release's pods are ready; automatic rollback on failure additionally requires the --atomic flag.

Best Practices

  • Always version DAGs and plugins in Git
  • Use pools to limit concurrency on critical resources
  • Implement unit tests for custom operators
  • Configure SLAs and PagerDuty alerts
  • Isolate sensitive connections with the Vault backend
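As an illustration of the unit-testing advice above, the check logic can be exercised against a fake hook so that no database is needed. This is a sketch under assumptions: FakeHook and run_quality_check are hypothetical names, and run_quality_check performs the same kind of check as the operator's execute method rather than calling the operator itself.

```python
class FakeHook:
    """Stand-in for a database hook; returns a canned first row."""

    def __init__(self, row):
        self.row = row

    def get_first(self, sql):
        return self.row


def run_quality_check(hook, sql_check):
    """Fail when the query returns no row or a zero count."""
    result = hook.get_first(sql_check)
    if result is None or result[0] == 0:
        raise ValueError("Data quality check failed")
    return result[0]


def test_passes_on_nonzero_count():
    assert run_quality_check(FakeHook((42,)), "SELECT COUNT(*) FROM staging") == 42


def test_fails_on_zero_count():
    try:
        run_quality_check(FakeHook((0,)), "SELECT COUNT(*) FROM staging")
    except ValueError:
        return
    raise AssertionError("expected ValueError")
```

The test functions follow pytest naming, so a runner such as pytest would collect them as written.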

Common Errors to Avoid

  • Forgetting to set retries on long-running tasks
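  • Sharing state through global variables instead of XComs (XComs are stored in the metadata database and are not encrypted by default, so keep secrets out of them)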
  • Neglecting metadata cleanup (airflow db clean)
  • Deploying without configured Kubernetes healthchecks
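For the first item in the list above, retries can be set once for a whole DAG through default_args. The keys below are standard BaseOperator arguments; the values are assumptions chosen for illustration and should be tuned to your tasks.

```python
from datetime import timedelta

# Assumed values; tune them to how long and how flaky your tasks are.
default_args = {
    "retries": 3,
    "retry_delay": timedelta(minutes=5),
    "retry_exponential_backoff": True,
    "max_retry_delay": timedelta(hours=1),
}

# Passed as DAG(..., default_args=default_args), these apply to every task
# in the DAG unless a task overrides them.
```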

Further Reading

Deepen these concepts with our dedicated data orchestration courses: https://learni-group.com/formations.