Introduction
En 2026, les Data Lakes ne sont plus de simples dépôts de données brutes sur S3, mais des Lakehouses transactionnels grâce à des formats comme Delta Lake. Contrairement aux Data Warehouses traditionnels (Snowflake, BigQuery), un Data Lake excelle dans le stockage scalable de données structurées, semi-structurées et non-structurées à coût minimal, avec schema-on-read pour une flexibilité maximale.
Ce tutoriel expert vous guide pas à pas pour implémenter un Data Lake sur AWS S3 avec Delta Lake (ACID, time travel, schema enforcement), PySpark pour l'ingestion et les optimisations (partitioning, Z-ordering), et intégration Glue/Athena pour les queries SQL. Idéal pour gérer des pétaoctets : imaginez ingérer 1 To/jour de logs IoT, les merger en temps réel, et queryer en sous-seconde.
Pourquoi c'est crucial ? Les entreprises perdent 15% de revenus par mauvaise gouvernance des données (Gartner 2025). Avec Delta Lake, vous gagnez en fiabilité sans sacrifier la scalabilité. Prêt à bookmarker ce guide ? (128 mots)
Prérequis
- Compte AWS avec droits IAM (S3FullAccess, Glue, EMR)
- Python 3.11+ et Java 11 (pour Spark)
- PySpark 3.5.0+ installé :
pip install pyspark delta-spark==3.2.0 - AWS CLI configuré (
aws configure) - 8 Go RAM minimum pour tests locaux ; EMR pour prod
- Connaissances avancées en Spark SQL et partitioning
Créer le bucket S3 et IAM role
#!/bin/bash
# Variables
BUCKET_NAME="mon-data-lake-$(date +%s)"
ROLE_NAME="delta-lake-emr-role"
# Créer bucket S3 (région eu-west-1 pour low cost)
aws s3 mb s3://$BUCKET_NAME --region eu-west-1
# Politiques IAM pour EMR/Glue (JSON inline pour simplicité)
cat > emr-policy.json << EOF
{
"Version": "2012-10-17",
"Statement": [
{
"Effect": "Allow",
"Action": [
"s3:GetObject",
"s3:PutObject",
"s3:DeleteObject",
"s3:ListBucket"
],
"Resource": [
"arn:aws:s3:::$BUCKET_NAME",
"arn:aws:s3:::$BUCKET_NAME/*"
]
}
]
}
EOF
aws iam put-role-policy --role-name EMR_DefaultRole --policy-name DeltaLakePolicy --policy-document file://emr-policy.json
echo "Bucket: s3://$BUCKET_NAME prêt pour Delta Lake."Ce script bash crée un bucket S3 dédié et attache une politique IAM minimale pour EMR/Glue. Utilisez un nom unique avec timestamp pour éviter les conflits globaux. Exécutez-le en prod avec --profile prod pour isolation ; piège : oublier ListBucket bloque les crawlers Glue.
Configurer l'environnement PySpark
Maintenant, préparez PySpark avec Delta Lake. Delta ajoute ACID sur S3 (pas natif), time travel et optimisations comme OPTIMIZE. Testez localement avant EMR : Spark lit S3 via Hadoop FS, configurez spark.hadoop.fs.s3a.aws.credentials.provider pour IAM roles.
Téléchargez un dataset exemple : logs JSON de 1M lignes (simulate IoT sensors).
Script d'ingestion initiale en Delta
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from delta.tables import *
from delta import configure_spark_with_delta_pip
import json
# Config Spark avec Delta (copier-collable local/EMR)
builder = SparkSession.builder \
.appName("DataLakeIngestion") \
.config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension") \
.config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog") \
.config("spark.hadoop.fs.s3a.access.key", "VOTRE_ACCESS_KEY") \
.config("spark.hadoop.fs.s3a.secret.key", "VOTRE_SECRET_KEY") \
.config("spark.sql.adaptive.enabled", "true") \
.config("spark.databricks.delta.optimizeWrite.enabled", "true")
spark = configure_spark_with_delta_pip(builder).getOrCreate()
# Dataset exemple : générer 100k logs IoT JSON
schema = "timestamp TIMESTAMP, device_id STRING, temperature DOUBLE, humidity DOUBLE, location STRUCT<x:DOUBLE,y:DOUBLE>"
data = [{"timestamp": "2026-01-01 00:00:00", "device_id": f"dev_{i}", "temperature": 22.5 + i%10/10, "humidity": 60.0, "location": {"x": i%100, "y": i%50}} for i in range(100000)]
df = spark.createDataFrame(data, schema)
# Path Delta table partitionnée par date/device
path = "s3://mon-data-lake-$(date +%s)/iot_logs"
df.write \
.format("delta") \
.partitionBy("date(device_id)", "device_id") \
.mode("overwrite") \
.option("delta.autoOptimize.optimizeWrite", "true") \
.save(path)
spark.sql(f"CREATE TABLE IF NOT EXISTS iot_logs USING DELTA LOCATION '{path}'")
spark.stop()
print("Ingestion Delta initiale terminée : 100k rows partitionnées.")Ce script PySpark ingère 100k rows IoT en table Delta partitionnée par date et device_id, activant auto-optimize pour compacter les petits fichiers. Remplacez credentials par IAM role en prod. Piège : sans partitioning, scans S3 explosent en coût ; schema enforcement évite les corruptions futures.
Appliquer les merges ACID et time travel
Delta Lake brille avec MERGE pour upserts incrémentaux (idempotent), contrairement à Parquet brut. Time travel permet AS OF VERSION pour audits. Partitionnez sur high-cardinality cols comme date pour prune 90% des scans.
Upsert avec MERGE et optimisation Z-order
from pyspark.sql import SparkSession
from delta.tables import DeltaTable
from delta import configure_spark_with_delta_pip
builder = SparkSession.builder \
.appName("DeltaMerge") \
.config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension") \
.config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog") \
.config("spark.hadoop.fs.s3a.access.key", "VOTRE_ACCESS_KEY") \
.config("spark.hadoop.fs.s3a.secret.key", "VOTRE_SECRET_KEY") \
.config("spark.databricks.delta.optimizeWrite.enabled", "true") \
.config("spark.sql.adaptive.coalescePartitions.enabled", "true")
spark = configure_spark_with_delta_pip(builder).getOrCreate()
path = "s3://mon-data-lake-$(date +%s)/iot_logs"
# Nouveaux updates : 10k rows avec changements
temperature_updates = [{"device_id": f"dev_{i%100}", "temperature": 25.0 + i%5, "humidity": 65.0, "timestamp": "2026-01-02 12:00:00"} for i in range(10000)]
df_updates = spark.createDataFrame(temperature_updates)
# MERGE ACID : update if match device_id + timestamp proche
delta_table = DeltaTable.forPath(spark, path)
delta_table.alias("target").merge(
df_updates.alias("source"),
"target.device_id = source.device_id AND abs(hours_between(target.timestamp, source.timestamp)) < 24"
).whenMatchedUpdate(set={
"temperature": "source.temperature",
"humidity": "source.humidity"
}).whenNotMatchedInsert(values={
"device_id": "source.device_id",
"temperature": "source.temperature",
"humidity": "source.humidity",
"timestamp": "source.timestamp"
}).execute()
# Z-ordering sur temperature (col queryée souvent) + OPTIMIZE
spark.sql(f"OPTIMIZE {path} ZORDER BY (temperature)")
# Time travel query : version avant merge
spark.sql(f"""
SELECT * FROM {path} VERSION AS OF 0
WHERE device_id = 'dev_1' LIMIT 5
""").show()
spark.stop()
print("MERGE + Z-order appliqués : queries 5x plus rapides.")Ce code démontre un MERGE upsert sur Delta (ACID garanti), suivi de Z-ordering pour colocaliser données chaudes (température queryée). Time travel via VERSION AS OF pour audits. Piège : MERGE sans condition stricte cause OOM ; testez sur petite échelle d'abord.
Query SQL optimisée avec Spark et schema evolution
from pyspark.sql import SparkSession
from delta import configure_spark_with_delta_pip
builder = SparkSession.builder \
.appName("DeltaQuery") \
.config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension") \
.config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog") \
.config("spark.hadoop.fs.s3a.access.key", "VOTRE_ACCESS_KEY") \
.config("spark.hadoop.fs.s3a.secret.key", "VOTRE_SECRET_KEY") \
.config("spark.sql.files.maxPartitionBytes", "134217728") # 128MB
spark = configure_spark_with_delta_pip(builder).getOrCreate()
path = "s3://mon-data-lake-$(date +%s)/iot_logs"
# Schema evolution : ajouter col 'alert' si >30°C
spark.sql(f"""
MERGE INTO {path} AS target
USING (SELECT device_id, CASE WHEN temperature > 30 THEN true ELSE false END as alert FROM (VALUES ('dev_999', 32.0))) AS source(device_id, temperature)
ON target.device_id = source.device_id
WHEN MATCHED THEN UPDATE SET alert = source.alert
WHEN NOT MATCHED THEN INSERT (device_id, alert) VALUES (source.device_id, source.alert)
""")
# Query avancée : aggregate partition-pruned + time travel
result = spark.sql(f"""
SELECT
device_id,
AVG(temperature) as avg_temp,
COUNT(*) as readings
FROM {path}
WHERE date(timestamp) = '2026-01-02'
AND device_id LIKE 'dev_%'
TIMESTAMP AS OF VERSION AS OF 1
GROUP BY device_id
HAVING avg_temp > 24.0
ORDER BY avg_temp DESC
LIMIT 10
"")
result.explain() # Vérifiez pruning
result.show()
spark.stop()
print("Schema évolué + query optimisée exécutée.")Ici, schema evolution via MERGE ajoute 'alert' sans downtime. Query prune partitions (date), utilise AQE et time travel. explain() révèle si Z-order prune 95% data. Piège : maxPartitionBytes trop haut = skew ; ajustez à 128MB pour S3.
Intégrer Glue Crawler pour Athena
Pour queries SQL sans Spark, configurez Glue Crawler sur Delta tables. Il infère schema et partition keys automatiquement.
Déployer Glue Crawler YAML
{
"Name": "delta-iot-crawler",
"Role": "arn:aws:iam::123456789012:role/AWSGlueServiceRole-Default",
"DatabaseName": "data_lake_db",
"Description": "Crawler pour Delta Lake IoT sur S3",
"Targets": {
"S3Targets": [
{
"Path": "s3://mon-data-lake-1700000000/iot_logs",
"Exclusions": ["**/*.json", "**/_delta_log/*"]
}
]
},
"SchemaChangePolicy": {
"UpdateBehavior": "UPDATE_IN_DATABASE",
"DeleteBehavior": "DEPRECATE_IN_DATABASE"
},
"Configuration": "{\"Version\":1.0,\"CrawlerOutput\":{\"Partitions\":{\"AddOrUpdateBehavior\":\"InheritFromTable\",\"DeleteFromDatabaseBehavior\":\"DEPRECATE_IN_DATABASE\"},\"Tables\":{\"AddOrUpdateBehavior\":\"MergeNewColumns\",\"DeleteFromDatabaseBehavior\":\"DEPRECATE_IN_DATABASE\"}},\"Grouping\":{\"TableGroupingPolicy\":\"CombineCompatibleSchemas\"}}",
"CrawlerSecurityConfiguration": "delta-security"
}Ce JSON AWS CLI-ready (aws glue create-crawler --cli-input-json file://glue-crawler-config.json) crawle Delta, gérant schema changes. Exclut _delta_log pour sécurité. Piège : sans SchemaChangePolicy, breaks sur evolution ; testez avec aws glue start-crawler.
Query Athena sur Delta via Glue
-- Exécuter dans Athena Console après crawl
-- Crée DB si besoin
CREATE DATABASE IF NOT EXISTS data_lake_db;
-- Query prunee sur table Glue/Delta
SELECT
device_id,
AVG(temperature) as avg_temp,
MAX(timestamp) as last_reading
FROM "data_lake_db"."iot_logs"
WHERE year(timestamp) = 2026
AND month(timestamp) = 1
AND temperature > 20.0
GROUP BY device_id
HAVING COUNT(*) > 100
ORDER BY avg_temp DESC
LIMIT 20;
-- Time travel simulé via Glue versions (ou CTAS)
CREATE TABLE iot_logs_v0 AS
SELECT * FROM "data_lake_db"."iot_logs"
WHERE version() = 0;Athena query Delta via Glue Catalog, prune sur partitions Hive-style (year/month). CTAS pour snapshots time travel. Coût : $5/TB scanné ; Z-order réduit à $0.5. Piège : oublier partitions dans WHERE = full scan ruineux.
Bonnes pratiques
- Partitionnez intelligemment : date + high-cardinality (device_id) ; évitez >10^4 partitions/table pour éviter small files.
- Z-order + VACUUM régulières :
OPTIMIZE ZORDER BY (top_queries_cols)hebdo ;VACUUM 168hretient 7j pour time travel. - Gouvernance Unity Catalog : Centralisez metadata ; enforce PII masking via Liquid Clustering (Delta 3.1+).
- Ingestion streaming : Kafka -> Spark Structured Streaming en Delta pour <1s latency.
- Coût S3 : Requêtes IA : activez S3 Intelligent-Tiering ; monitor avec Cost Explorer.
Erreurs courantes à éviter
- Small files explosion : Sans auto-optimize, 1M fichiers = OOM sur query ; toujours .option("delta.autoOptimize.optimizeWrite", "true").
- CORS/credentials S3 : Spark lit échoue sans fs.s3a impl ; utilisez IAM roles pas keys statiques.
- Schema drift ignoré : Delta merge casse sans evolution ; testez
spark.sql("SELECT * FROM table")post-ingest. - No pruning : Query sans WHERE date = 100x lent ; analysez .explain() toujours.
Pour aller plus loin
- Docs officielles : Delta Lake 3.2
- Apache Iceberg alternative pour multi-engine (Trino+Spark)
- Formation experte : Data Engineering Lakehouse - Learni Group
- Repo GitHub exemple : fork ce tuto et scalez sur EMR Serverless.