Introduction
Un Data Lake est un repository centralisé pour stocker des données brutes à grande échelle, sans schéma imposé upfront, contrairement aux data warehouses traditionnels. En 2026, avec l'explosion des données IoT, ML et logs, les Data Lakes évoluent vers des Lakehouses grâce à des formats comme Delta Lake, qui ajoutent des transactions ACID, schema enforcement et time travel sur S3 ou stockage objet.
Ce tutoriel avancé vous guide pour implémenter un Data Lake local scalable avec MinIO (S3-compatible), PySpark et Delta Lake. Vous apprendrez à ingérer des données hétérogènes, gérer l'évolution de schémas, effectuer des merges upsert, et optimiser les queries. Idéal pour les data engineers seniors gérant des pipelines petabyte-scale. À la fin, vous aurez un setup fonctionnel, extensible à AWS EMR ou Databricks. (128 mots)
Prérequis
- Docker et Docker Compose installés (version 24+)
- Python 3.10+ avec pip
- 8 Go RAM minimum (Spark en local)
- Connaissances avancées en Spark, SQL et data modeling
- AWS CLI optionnel pour migration S3
Lancer MinIO comme stockage objet
version: '3.8'
services:
minio:
image: minio/minio:latest
ports:
- '9000:9000'
- '9001:9001'
environment:
MINIO_ROOT_USER: minioadmin
MINIO_ROOT_PASSWORD: minioadmin
command: server /data --console-address ':9001'
volumes:
- minio_data:/data
createbuckets:
image: minio/mc
depends_on:
- minio
entrypoint: >
/bin/sh -c "
/usr/bin/mc alias set myminio http://minio:9000 minioadmin minioadmin;
/usr/bin/mc mb myminio/datalake;
/usr/bin/mc policy set public myminio/datalake;
"
volumes:
minio_data:Ce docker-compose lance MinIO sur les ports 9000 (API) et 9001 (console). Un service init crée le bucket 'datalake' public. Accédez à http://localhost:9001 avec minioadmin/minioadmin. Cela simule parfaitement S3 pour tests locaux, évitant les coûts AWS.
Vérifier MinIO et créer un client S3
Démarrez avec docker compose up -d. Vérifiez le bucket via la console. Configurez mc (MinIO Client) localement : mc alias set myminio http://localhost:9000 minioadmin minioadmin. Cela prépare l'environnement pour Spark, qui traitera MinIO comme S3.
Installer PySpark et Delta Lake
#!/bin/bash
pip install pyspark==3.5.1 delta-spark==3.2.0 pandas numpy
# Télécharger données sample (CSV IoT-like)
curl -o sensor_data.csv https://raw.githubusercontent.com/databricks-tech-hub/spark-delta-lake-workshop/main/data/sensor_data.csv
# Config Spark pour MinIO (export vars)
export AWS_ACCESS_KEY_ID=minioadmin
export AWS_SECRET_ACCESS_KEY=minioadmin
export SPARK_CONF='spark.hadoop.fs.s3a.endpoint http://localhost:9000'
export SPARK_CONF+=' spark.hadoop.fs.s3a.access.key minioadmin'
export SPARK_CONF+=' spark.hadoop.fs.s3a.secret.key minioadmin'
export SPARK_CONF+=' spark.hadoop.fs.s3a.path.style.access true'
export SPARK_CONF+=' spark.hadoop.fs.s3a.impl com.systems.integration.hive.HiveShimsS3A'
export SPARK_CONF+=' spark.sql.extensions io.delta.sql.DeltaSparkSessionExtension'
export SPARK_CONF+=' spark.sql.catalog.spark_catalog=org.apache.spark.sql.delta.catalog.DeltaCatalog'Installe PySpark 3.5+ et Delta 3.2 pour compatibilité 2026. Télécharge un CSV sample (températures capteurs). Exporte les vars d'env pour que Spark voie MinIO comme S3. Ajoute les extensions Delta pour Lakehouse features.
Ingérer les premières données en Delta
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from delta.tables import *
spark = SparkSession.builder \
.appName("DataLakeInit") \
.config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension") \
.config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog") \
.config("spark.jars.packages", "io.delta:delta-spark_2.12:3.2.0") \
.getOrCreate()
# Lire CSV local
raw_df = spark.read.option("header", "true").csv("sensor_data.csv")
# Ajouter métadonnées (ingestion time)
enriched_df = raw_df.withColumn("ingestion_time", current_timestamp())
# Écrire en Delta sur MinIO
path = "s3a://datalake/sensors/"
enriched_df.write.format("delta").mode("overwrite").save(path)
spark.sql(f"CREATE TABLE IF NOT EXISTS sensors USING DELTA LOCATION '{path}'")
spark.stop()
print("Table Delta créée avec succès.")Lit le CSV, enrichit avec timestamp d'ingestion (best practice pour audit). Écrit en format Delta sur s3a:// (MinIO). Crée une table managée pour queries SQL. Utilisez overwrite pour init ; en prod, préférez append. Évite les pièges de schéma implicite.
Lire et valider la table Delta
Exécutez spark-submit --packages io.delta:delta-spark_2.12:3.2.0 ingest_initial.py ou python ingest_initial.py. Query via Spark SQL : spark.sql('SELECT * FROM sensors LIMIT 10').show(). Vérifiez l'historique Delta dans MinIO : _delta_log/ contient les JSON transactions.
Time Travel : Accéder à des versions passées
from pyspark.sql import SparkSession
from delta.tables import *
spark = SparkSession.builder \
.appName("TimeTravel") \
.config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension") \
.config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog") \
.config("spark.jars.packages", "io.delta:delta-spark_2.12:3.2.0") \
.getOrCreate()
path = "s3a://datalake/sensors/"
# Lire version précédente (ex: 1h ago ou version 0)
old_df = spark.read.format("delta").option("versionAsOf", 0).load(path)
print("Version 0:")
old_df.show(5)
# Time travel par timestamp
spark.sql(f"""
SELECT * FROM sensors
VERSION AS OF TIMESTAMP '2026-01-01 10:00:00'
LIMIT 5
""").show()
spark.stop()Delta Lake permet le time travel natif via versionAsOf ou timestamp, idéal pour audits et rollbacks. Ici, on lit la version 0 (avant updates). En prod, intégrez à des pipelines CI/CD pour versioning automatique. Piège : versions supprimées si vacuum activé.
Schema Evolution et Enforcement
from pyspark.sql import SparkSession
from pyspark.sql.types import *
from delta.tables import DeltaTable
spark = SparkSession.builder \
.appName("SchemaEvolution") \
.config("spark.databricks.delta.schema.autoMerge.enabled", "true") \
.config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension") \
.config("spark.jars.packages", "io.delta:delta-spark_2.12:3.2.0") \
.getOrCreate()
path = "s3a://datalake/sensors/"
dt = DeltaTable.forPath(spark, path)
# Data avec nouveau champ (évolution)
new_data = spark.createDataFrame([
(1, 25.5, "2026-01-01 11:00:00", "new_sensor")
], ["id", "temperature", "ingestion_time", "device_type"])
# Merge avec autoMerge (évolue schéma)
new_data.write.format("delta").mode("append").option("mergeSchema", "true").save(path)
print("Schéma évolué :")
spark.sql(f"DESCRIBE sensors").show(truncate=False)
spark.stop()Active mergeSchema pour évolution safe du schéma lors d'append/merge. Delta ajoute device_type sans casser existant. Utilisez autoMerge.enabled pour prod. Piège : conflits types (string vs int) nécessitent resolve explicite.
Effectuer des Upserts avec MERGE
MERGE est clé pour CDC (Change Data Capture) dans les Data Lakes. Il upsert/déduplique sur clé primaire, avec conditions complexes. Partitionnez par date pour scalabilité.
Upsert via MERGE avancé
from pyspark.sql import SparkSession
from delta.tables import *
spark = SparkSession.builder \
.appName("MergeUpsert") \
.config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension") \
.config("spark.jars.packages", "io.delta:delta-spark_2.12:3.2.0") \
.getOrCreate()
path = "s3a://datalake/sensors/"
# Source updates (avec duplicata)
updates_df = spark.createDataFrame([
(1, 26.0, "2026-01-01 12:00:00", "new_sensor"),
(2, 22.1, "2026-01-01 12:00:00", "old_sensor")
], ["id", "temperature", "ingestion_time", "device_type"])
# MERGE upsert
spark.sql(f"""
MERGE INTO sensors t
USING (SELECT * FROM {updates_df.createOrReplaceTempView('updates')})
ON t.id = updates.id
WHEN MATCHED THEN
UPDATE SET *
WHEN NOT MATCHED THEN
INSERT *
""")
spark.sql("SELECT * FROM sensors WHERE id=1").show()
spark.stop()MERGE matche sur id, update si existant, insert sinon. Supporte DELETE/WHEN NOT MATCHED BY SOURCE. Idéal pour Kafka streams. Optimisez avec ZORDER BY sur clauses fréquentes. Piège : sans partition, scans full table.
Optimisation : Partition et ZORDER
from pyspark.sql import SparkSession
from pyspark.sql.functions import to_date
from delta.tables import *
spark = SparkSession.builder \
.appName("Optimize") \
.config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension") \
.config("spark.jars.packages", "io.delta:delta-spark_2.12:3.2.0") \
.getOrCreate()
path = "s3a://datalake/sensors/"
dt = DeltaTable.forPath(spark, path)
# Repartitionner par date (évolutif)
spark.sql(f"""
ALTER TABLE sensors
SET TBLPROPERTIES (delta.autoOptimize.optimizeWrite = true, delta.autoOptimize.autoCompact = true)
""")
# ZORDER sur colonnes queryées
spark.sql(f"OPTIMIZE sensors ZORDER BY (id, device_type)")
# ANALYZE pour stats
spark.sql("ANALYZE TABLE sensors COMPUTE STATISTICS")
print("Table optimisée.")
spark.stop()Active auto-optimize pour compacts auto. OPTIMIZE ZORDER clusterise data pour predicates pushdown (10-100x faster). ANALYZE met à jour stats pour planificateur. Exécutez périodiquement via cron/Airflow. Piège : OPTIMIZE coûteux sur tables live.
Bonnes pratiques
- Toujours partitionner par high-cardinality comme date/region pour prune scans.
- Activez Liquid Clustering (Delta 3+) pour auto-ZORDER dynamique.
- Utilisez Unity Catalog (Databricks) pour gouvernance multi-tenant.
- Intégrez Great Expectations pour data quality checks post-ingest.
- Monitorez avec Delta Lake metrics (files, churn) via Prometheus.
Erreurs courantes à éviter
- Écrire sans schema enforcement : Activez
mergeSchema='false'en prod pour fail fast sur drifts. - Oublier VACUUM : Exécutez
VACUUM RETAIN 168 HOURSpour cleaner old files sans perdre time travel. - MERGE sans WHEN NOT MATCHED BY SOURCE : Risque data loss sur deletes.
- Scaler sans compaction : Petits files dégradent perf ; forcez
OPTIMIZEnightly.
Pour aller plus loin
Migrez vers AWS S3 + EMR en remplaçant l'endpoint MinIO par S3 réel. Explorez Apache Iceberg pour multi-engine support. Lisez la doc Delta Lake.
Découvrez nos formations Data Engineering Learni pour master Spark, Lakehouse et GenAI pipelines.