Introduction
A Data Lake is a centralized repository for storing raw data at scale without upfront schemas, unlike traditional data warehouses. In 2026, with the explosion of IoT, ML, and log data, Data Lakes are evolving into Lakehouses thanks to formats like Delta Lake, which add ACID transactions, schema enforcement, and time travel on S3 or object storage.
This advanced tutorial guides you through implementing a scalable local Data Lake with MinIO (S3-compatible), PySpark, and Delta Lake. You'll learn to ingest heterogeneous data, manage schema evolution, perform upsert merges, and optimize queries. It is aimed at senior data engineers who build large-scale pipelines. By the end, you'll have a working setup whose patterns carry over directly to AWS EMR or Databricks.
Prerequisites
- Docker and Docker Compose installed (version 24+)
- Python 3.10+ with pip
- Minimum 8 GB RAM (for local Spark)
- Advanced knowledge of Spark, SQL, and data modeling
- AWS CLI optional for S3 migration
Launch MinIO as Object Storage
version: '3.8'
services:
  minio:
    image: minio/minio:latest
    ports:
      - '9000:9000'
      - '9001:9001'
    environment:
      MINIO_ROOT_USER: minioadmin
      MINIO_ROOT_PASSWORD: minioadmin
    command: server /data --console-address ':9001'
    volumes:
      - minio_data:/data
  createbuckets:
    image: minio/mc
    depends_on:
      - minio
    entrypoint: >
      /bin/sh -c "
      /usr/bin/mc alias set myminio http://minio:9000 minioadmin minioadmin;
      /usr/bin/mc mb myminio/datalake;
      /usr/bin/mc anonymous set public myminio/datalake;
      "
volumes:
  minio_data:
This docker-compose file launches MinIO on ports 9000 (API) and 9001 (console). An init service creates the public 'datalake' bucket (recent mc releases use anonymous set rather than the deprecated policy set). Access http://localhost:9001 with minioadmin/minioadmin. It faithfully simulates S3 for local testing, avoiding AWS costs.
Verify MinIO and Create an S3 Client
Start with docker compose up -d. Check the bucket via the console. Configure mc (MinIO Client) locally: mc alias set myminio http://localhost:9000 minioadmin minioadmin. This sets up the environment for Spark, which will treat MinIO like S3.
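To go beyond the console, a quick programmatic check is useful. Here is a minimal sketch assuming boto3 is installed (pip install boto3); it confirms the same endpoint and credentials that Spark's s3a connector will use later:

# Optional sanity check: list buckets through the S3 API served by MinIO
import boto3

s3 = boto3.client(
    "s3",
    endpoint_url="http://localhost:9000",
    aws_access_key_id="minioadmin",
    aws_secret_access_key="minioadmin",
)
print([b["Name"] for b in s3.list_buckets()["Buckets"]])  # expect ['datalake']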
Install PySpark and Delta Lake
#!/bin/bash
pip install pyspark==3.5.1 delta-spark==3.2.0 pandas numpy
# Download sample data (IoT-like CSV)
curl -o sensor_data.csv https://raw.githubusercontent.com/databricks-tech-hub/spark-delta-lake-workshop/main/data/sensor_data.csv
# Credentials picked up by the s3a connector
export AWS_ACCESS_KEY_ID=minioadmin
export AWS_SECRET_ACCESS_KEY=minioadmin
# Spark settings for MinIO and Delta, written to a properties file;
# pass it to spark-submit with --properties-file spark-minio.conf
cat > spark-minio.conf <<'EOF'
spark.hadoop.fs.s3a.endpoint           http://localhost:9000
spark.hadoop.fs.s3a.access.key         minioadmin
spark.hadoop.fs.s3a.secret.key         minioadmin
spark.hadoop.fs.s3a.path.style.access  true
spark.hadoop.fs.s3a.impl               org.apache.hadoop.fs.s3a.S3AFileSystem
spark.sql.extensions                   io.delta.sql.DeltaSparkSessionExtension
spark.sql.catalog.spark_catalog        org.apache.spark.sql.delta.catalog.DeltaCatalog
EOF
Installs PySpark 3.5.1 and the matching delta-spark 3.2.0, then downloads a sample CSV (sensor temperatures). The properties file makes every Spark session see MinIO as S3 and loads the Delta Lakehouse extensions. Note that the s3a connector also needs the hadoop-aws jar on the classpath; it is added via --packages in the run commands below.
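As an alternative to listing --packages by hand, the delta-spark pip package ships a helper that wires the matching Delta jars into the session builder. A sketch, assuming extra_packages is used to pull in the s3a connector matching PySpark 3.5's Hadoop 3.3.4:

# Build a Delta-enabled session from the pip-installed delta-spark
from delta import configure_spark_with_delta_pip
from pyspark.sql import SparkSession

builder = (
    SparkSession.builder.appName("delta-check")
    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
    .config("spark.sql.catalog.spark_catalog",
            "org.apache.spark.sql.delta.catalog.DeltaCatalog")
)
spark = configure_spark_with_delta_pip(
    builder, extra_packages=["org.apache.hadoop:hadoop-aws:3.3.4"]
).getOrCreate()
print(spark.version)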
Ingest Initial Data into Delta
from pyspark.sql import SparkSession
from pyspark.sql.functions import current_timestamp

spark = SparkSession.builder \
    .appName("DataLakeInit") \
    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension") \
    .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog") \
    .config("spark.jars.packages", "io.delta:delta-spark_2.12:3.2.0,org.apache.hadoop:hadoop-aws:3.3.4") \
    .getOrCreate()
# Read the local CSV, inferring column types
raw_df = spark.read.option("header", "true").option("inferSchema", "true").csv("sensor_data.csv")
# Add metadata (ingestion time)
enriched_df = raw_df.withColumn("ingestion_time", current_timestamp())
# Write as Delta to MinIO
path = "s3a://datalake/sensors/"
enriched_df.write.format("delta").mode("overwrite").save(path)
spark.sql(f"CREATE TABLE IF NOT EXISTS sensors USING DELTA LOCATION '{path}'")
spark.stop()
print("Delta table created successfully.")
Reads the CSV with schema inference, enriches it with an ingestion timestamp (a best practice for auditing), and writes it in Delta format to s3a:// (MinIO). The CREATE TABLE registers an external table over that location for SQL queries. Use overwrite for initialization; prefer append in production. The s3a endpoint and credentials come from the properties file configured above.
Read and Validate the Delta Table
Run spark-submit --packages io.delta:delta-spark_2.12:3.2.0,org.apache.hadoop:hadoop-aws:3.3.4 --properties-file spark-minio.conf ingest_initial.py. Query with Spark SQL: spark.sql('SELECT * FROM sensors LIMIT 10').show(). Check the Delta history in MinIO: the _delta_log/ prefix contains one JSON commit file per transaction.
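Delta also exposes the transaction log through SQL, so you can validate without browsing MinIO. A small sketch to run inside the same session:

# Inspect the table and its commit history
spark.sql("SELECT * FROM sensors LIMIT 10").show()
spark.sql("DESCRIBE HISTORY sensors") \
    .select("version", "timestamp", "operation") \
    .show(truncate=False)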
Time Travel: Access Past Versions
from pyspark.sql import SparkSession

spark = SparkSession.builder \
    .appName("TimeTravel") \
    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension") \
    .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog") \
    .config("spark.jars.packages", "io.delta:delta-spark_2.12:3.2.0,org.apache.hadoop:hadoop-aws:3.3.4") \
    .getOrCreate()
path = "s3a://datalake/sensors/"
# Read an earlier version (e.g. version 0, the initial load)
old_df = spark.read.format("delta").option("versionAsOf", 0).load(path)
print("Version 0:")
old_df.show(5)
# Time travel by timestamp (must fall within the table's retained history)
spark.sql("""
SELECT * FROM sensors
TIMESTAMP AS OF '2026-01-01 10:00:00'
LIMIT 5
""").show()
spark.stop()
Delta Lake enables native time travel via versionAsOf or a timestamp, which is ideal for audits and rollbacks. Here it reads version 0 (before any updates). In production, integrate this with CI/CD pipelines for automatic versioning. Pitfall: old versions become unreadable once VACUUM has removed their underlying data files.
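Beyond reading old versions, Delta can roll the live table back in place. A minimal sketch using the RESTORE command (available since Delta Lake 1.2):

# Roll the table back to its initial state; the restore itself is
# recorded as a new commit, so it stays auditable and reversible
spark.sql("RESTORE TABLE sensors TO VERSION AS OF 0")
spark.sql("DESCRIBE HISTORY sensors").select("version", "operation").show()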
Schema Evolution and Enforcement
from pyspark.sql import SparkSession
from pyspark.sql.functions import to_timestamp

spark = SparkSession.builder \
    .appName("SchemaEvolution") \
    .config("spark.databricks.delta.schema.autoMerge.enabled", "true") \
    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension") \
    .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog") \
    .config("spark.jars.packages", "io.delta:delta-spark_2.12:3.2.0,org.apache.hadoop:hadoop-aws:3.3.4") \
    .getOrCreate()
path = "s3a://datalake/sensors/"
# New data with an extra field (schema evolution)
new_data = spark.createDataFrame([
    (1, 25.5, "2026-01-01 11:00:00", "new_sensor")
], ["id", "temperature", "ingestion_time", "device_type"]) \
    .withColumn("ingestion_time", to_timestamp("ingestion_time"))
# Append with mergeSchema (evolves the table schema)
new_data.write.format("delta").mode("append").option("mergeSchema", "true").save(path)
print("Evolved schema:")
spark.sql("DESCRIBE sensors").show(truncate=False)
spark.stop()
Enables mergeSchema so the schema evolves safely during append: Delta adds device_type without breaking existing data. Use autoMerge.enabled when MERGE statements must evolve schemas too. Pitfall: type conflicts (string vs int) are not merged automatically and require explicit resolution.
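When a type conflict does occur, one hedged approach is to align the incoming batch to the table's current schema explicitly before appending, instead of relying on autoMerge:

# Cast the shared columns of the incoming batch to the types the table expects
from pyspark.sql.functions import col

target_schema = spark.read.format("delta").load(path).schema
aligned = new_data.select([
    col(f.name).cast(f.dataType)
    for f in target_schema.fields if f.name in new_data.columns
])
aligned.write.format("delta").mode("append").save(path)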
Perform Upserts with MERGE
MERGE is key for CDC (Change Data Capture) in Data Lakes: it handles upserts and deduplication on primary keys with arbitrary match conditions. Partition by date for scalability. A pre-merge deduplication sketch follows below.
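A CDC batch often carries several changes for the same key, and merging them all makes the update nondeterministic. A minimal dedup sketch, assuming an id key and an ingestion_time ordering column, reusing the session from the previous steps:

# Keep only the most recent change per key before merging
from pyspark.sql import Window
from pyspark.sql.functions import col, row_number

cdc_df = spark.createDataFrame([
    (1, 25.9, "2026-01-01 11:59:00"),
    (1, 26.0, "2026-01-01 12:00:00"),
], ["id", "temperature", "ingestion_time"])
w = Window.partitionBy("id").orderBy(col("ingestion_time").desc())
latest = cdc_df.withColumn("rn", row_number().over(w)).filter("rn = 1").drop("rn")
latest.show()  # one row per id: the 12:00:00 change wins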
Advanced Upsert via MERGE
from pyspark.sql import SparkSession
from pyspark.sql.functions import to_timestamp

spark = SparkSession.builder \
    .appName("MergeUpsert") \
    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension") \
    .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog") \
    .config("spark.jars.packages", "io.delta:delta-spark_2.12:3.2.0,org.apache.hadoop:hadoop-aws:3.3.4") \
    .getOrCreate()
# Source updates (one existing id, one new id)
updates_df = spark.createDataFrame([
    (1, 26.0, "2026-01-01 12:00:00", "new_sensor"),
    (2, 22.1, "2026-01-01 12:00:00", "old_sensor")
], ["id", "temperature", "ingestion_time", "device_type"]) \
    .withColumn("ingestion_time", to_timestamp("ingestion_time"))
updates_df.createOrReplaceTempView("updates")
# MERGE upsert: update matched rows, insert the rest
spark.sql("""
MERGE INTO sensors t
USING updates u
ON t.id = u.id
WHEN MATCHED THEN
  UPDATE SET *
WHEN NOT MATCHED THEN
  INSERT *
""")
spark.sql("SELECT * FROM sensors WHERE id = 1").show()
spark.stop()
MERGE matches on id, updates existing rows, and inserts the others. It also supports DELETE and WHEN NOT MATCHED BY SOURCE clauses (see below). Ideal for applying Kafka CDC streams. Optimize with ZORDER BY on frequently filtered columns. Pitfall: without partitioning or file skipping, every MERGE scans the full table.
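When the source is a full snapshot rather than a change feed, rows missing from the source usually mean deletes. A hedged sketch; WHEN NOT MATCHED BY SOURCE requires Delta Lake 2.3+:

# Full-snapshot merge: rows absent from `updates` are deleted from the target
spark.sql("""
MERGE INTO sensors t
USING updates u
ON t.id = u.id
WHEN MATCHED THEN UPDATE SET *
WHEN NOT MATCHED THEN INSERT *
WHEN NOT MATCHED BY SOURCE THEN DELETE
""")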
Optimization: Partitioning and ZORDER
from pyspark.sql import SparkSession

spark = SparkSession.builder \
    .appName("Optimize") \
    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension") \
    .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog") \
    .config("spark.jars.packages", "io.delta:delta-spark_2.12:3.2.0,org.apache.hadoop:hadoop-aws:3.3.4") \
    .getOrCreate()
path = "s3a://datalake/sensors/"
# Enable optimized writes and automatic compaction
spark.sql("""
ALTER TABLE sensors
SET TBLPROPERTIES (delta.autoOptimize.optimizeWrite = true, delta.autoOptimize.autoCompact = true)
""")
# ZORDER on frequently queried columns
spark.sql("OPTIMIZE sensors ZORDER BY (id, device_type)")
# Refresh statistics for the optimizer
spark.sql("ANALYZE TABLE sensors COMPUTE STATISTICS")
print("Table optimized.")
spark.stop()
Enables auto-optimize for automatic compaction on write. OPTIMIZE ZORDER co-locates related rows so data skipping can prune files, often an order of magnitude faster on selective predicates. ANALYZE refreshes statistics for the cost-based optimizer. Run these periodically via cron or Airflow. Pitfall: OPTIMIZE rewrites files and is costly on hot tables, so schedule it off-peak.
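The section title also mentions partitioning, which the script above does not change. A hedged sketch of a one-off rewrite partitioned by ingestion date; overwriteSchema is needed because the layout changes, and event_date is a derived column introduced here:

# Repartition the existing table by date so scans can prune whole folders
from pyspark.sql.functions import to_date

df = spark.read.format("delta").load(path)
(df.withColumn("event_date", to_date("ingestion_time"))
   .write.format("delta")
   .mode("overwrite")
   .option("overwriteSchema", "true")
   .partitionBy("event_date")
   .save(path))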
Best Practices
- Always partition by low-cardinality fields like date or region to prune scans; high-cardinality keys explode the file count.
- Enable Liquid Clustering (Delta 3.1+) for dynamic clustering without static ZORDER runs (see the sketch after this list).
- Use Unity Catalog (Databricks) for multi-tenant governance.
- Integrate Great Expectations for post-ingest data quality checks.
- Monitor with Delta Lake metrics (files, churn) via Prometheus.
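A minimal Liquid Clustering sketch, assuming Delta Lake 3.1+ where CLUSTER BY is available at table creation; the sensors_lc table name and location are hypothetical:

# Create a clustered Delta table; clustering keys replace static
# partitioning/ZORDER and can be changed later without path rewrites
spark.sql("""
CREATE TABLE sensors_lc (
  id BIGINT,
  temperature DOUBLE,
  ingestion_time TIMESTAMP,
  device_type STRING
) USING DELTA
CLUSTER BY (id, device_type)
LOCATION 's3a://datalake/sensors_lc/'
""")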
Common Errors to Avoid
- Writing without schema enforcement: leave mergeSchema disabled in production so schema drift fails fast instead of silently widening the table.
- Forgetting VACUUM: run VACUUM sensors RETAIN 168 HOURS to clean up old files while keeping seven days of time travel (see the sketch after this list).
- MERGE without WHEN NOT MATCHED BY SOURCE: risks missing deletes when the source is a full snapshot.
- Scaling without compaction: small files degrade performance; schedule OPTIMIZE nightly.
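The same retention policy through the Python API, as a small sketch (168 hours = 7 days, on the sensors table registered earlier):

# Remove data files no longer referenced by versions newer than 7 days
from delta.tables import DeltaTable

DeltaTable.forName(spark, "sensors").vacuum(168)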
Next Steps
Migrate to AWS S3 + EMR by replacing the MinIO endpoint with real S3. Explore Apache Iceberg for multi-engine support. Read the Delta Lake docs.
Check out our Data Engineering trainings to master Spark, Lakehouse, and GenAI pipelines.