
How to Implement a Data Lake with Delta Lake in 2026


Introduction

A Data Lake is a centralized repository for storing raw data at scale without upfront schemas, unlike traditional data warehouses. In 2026, with the explosion of IoT, ML, and log data, Data Lakes are evolving into Lakehouses thanks to formats like Delta Lake, which add ACID transactions, schema enforcement, and time travel on S3 and other object storage.

This advanced tutorial guides you through implementing a scalable local Data Lake with MinIO (S3-compatible), PySpark, and Delta Lake. You'll learn to ingest heterogeneous data, manage schema evolution, perform upsert merges, and optimize queries. It is aimed at senior data engineers building large-scale pipelines. By the end, you'll have a working setup ready to scale out to AWS EMR or Databricks.

Prerequisites

  • Docker and Docker Compose installed (version 24+)
  • Python 3.10+ with pip
  • Minimum 8 GB RAM (for local Spark)
  • Advanced knowledge of Spark, SQL, and data modeling
  • AWS CLI optional for S3 migration

Launch MinIO as Object Storage

docker-compose.yml
version: '3.8'
services:
  minio:
    image: minio/minio:latest
    ports:
      - '9000:9000'
      - '9001:9001'
    environment:
      MINIO_ROOT_USER: minioadmin
      MINIO_ROOT_PASSWORD: minioadmin
    command: server /data --console-address ':9001'
    volumes:
      - minio_data:/data
  createbuckets:
    image: minio/mc
    depends_on:
      - minio
    entrypoint: >
      /bin/sh -c "
      /usr/bin/mc alias set myminio http://minio:9000 minioadmin minioadmin;
      /usr/bin/mc mb myminio/datalake;
      /usr/bin/mc policy set public myminio/datalake;
      "
volumes:
  minio_data:

This docker-compose file launches MinIO on ports 9000 (API) and 9001 (console). An init service creates the public 'datalake' bucket. Access http://localhost:9001 with minioadmin/minioadmin. MinIO simulates S3 for local testing, avoiding AWS costs.

Verify MinIO and Create an S3 Client

Start with docker compose up -d. Check the bucket via the console. Configure mc (MinIO Client) locally: mc alias set myminio http://localhost:9000 minioadmin minioadmin. This sets up the environment for Spark, which will treat MinIO like S3.

Install PySpark and Delta Lake

requirements.sh
#!/bin/bash
pip install pyspark==3.5.1 delta-spark==3.2.0 pandas numpy

# Download sample data (IoT-like CSV)
curl -o sensor_data.csv https://raw.githubusercontent.com/databricks-tech-hub/spark-delta-lake-workshop/main/data/sensor_data.csv

# Credentials for MinIO (picked up by the S3A connector)
export AWS_ACCESS_KEY_ID=minioadmin
export AWS_SECRET_ACCESS_KEY=minioadmin

# Spark settings for MinIO; pass each key=value to spark-submit via --conf
export SPARK_CONF='spark.hadoop.fs.s3a.endpoint=http://localhost:9000'
export SPARK_CONF+=' spark.hadoop.fs.s3a.access.key=minioadmin'
export SPARK_CONF+=' spark.hadoop.fs.s3a.secret.key=minioadmin'
export SPARK_CONF+=' spark.hadoop.fs.s3a.path.style.access=true'
export SPARK_CONF+=' spark.hadoop.fs.s3a.impl=org.apache.hadoop.fs.s3a.S3AFileSystem'
export SPARK_CONF+=' spark.sql.extensions=io.delta.sql.DeltaSparkSessionExtension'
export SPARK_CONF+=' spark.sql.catalog.spark_catalog=org.apache.spark.sql.delta.catalog.DeltaCatalog'

Installs PySpark 3.5 and delta-spark 3.2 (matching versions). Downloads a sample CSV (sensor temperatures). Exports credentials and collects the Spark settings in SPARK_CONF; Spark does not read this variable by itself, so pass each entry to spark-submit with --conf. The Delta extension and catalog settings enable the Lakehouse features.
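As an alternative to environment variables, the same settings can be built programmatically and fed to a SparkSession builder. A minimal sketch (the key names are the standard Hadoop S3A and Delta options; the values assume this tutorial's local MinIO setup):

```python
def s3a_conf(endpoint, access_key, secret_key):
    """Build the Spark config mapping for an S3-compatible endpoint."""
    return {
        "spark.hadoop.fs.s3a.endpoint": endpoint,
        "spark.hadoop.fs.s3a.access.key": access_key,
        "spark.hadoop.fs.s3a.secret.key": secret_key,
        # MinIO serves buckets path-style, not as virtual hosts
        "spark.hadoop.fs.s3a.path.style.access": "true",
        "spark.hadoop.fs.s3a.impl": "org.apache.hadoop.fs.s3a.S3AFileSystem",
        "spark.sql.extensions": "io.delta.sql.DeltaSparkSessionExtension",
        "spark.sql.catalog.spark_catalog": "org.apache.spark.sql.delta.catalog.DeltaCatalog",
    }

conf = s3a_conf("http://localhost:9000", "minioadmin", "minioadmin")
```

Apply each pair with `SparkSession.builder.config(key, value)` before `getOrCreate()`.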

Ingest Initial Data into Delta

ingest_initial.py
from pyspark.sql import SparkSession
from pyspark.sql.functions import current_timestamp

spark = SparkSession.builder \
    .appName("DataLakeInit") \
    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension") \
    .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog") \
    .config("spark.jars.packages", "io.delta:delta-spark_2.12:3.2.0,org.apache.hadoop:hadoop-aws:3.3.4") \
    .config("spark.hadoop.fs.s3a.endpoint", "http://localhost:9000") \
    .config("spark.hadoop.fs.s3a.access.key", "minioadmin") \
    .config("spark.hadoop.fs.s3a.secret.key", "minioadmin") \
    .config("spark.hadoop.fs.s3a.path.style.access", "true") \
    .getOrCreate()

# Read the local CSV, inferring column types
raw_df = spark.read.option("header", "true").option("inferSchema", "true").csv("sensor_data.csv")

# Add ingestion metadata (timestamp)
enriched_df = raw_df.withColumn("ingestion_time", current_timestamp())

# Write as Delta to MinIO
path = "s3a://datalake/sensors/"
enriched_df.write.format("delta").mode("overwrite").save(path)

# Register an external table pointing at the Delta location
spark.sql(f"CREATE TABLE IF NOT EXISTS sensors USING DELTA LOCATION '{path}'")

spark.stop()
print("Delta table created successfully.")

Reads the CSV and enriches it with an ingestion timestamp (a best practice for auditing). Writes in Delta format to s3a:// (MinIO). Registers an external table for SQL queries. Use overwrite for initialization; prefer append in production. For production pipelines, define an explicit schema rather than relying on CSV inference.

Read and Validate the Delta Table

Run spark-submit --packages io.delta:delta-spark_2.12:3.2.0 ingest_initial.py or python ingest_initial.py. Query with Spark SQL: spark.sql('SELECT * FROM sensors LIMIT 10').show(). Check the Delta history in MinIO: _delta_log/ contains transaction JSONs.
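To see what those transaction JSONs contain, here is a minimal sketch parsing a simplified, illustrative commit file (not a real log dump; the `commitInfo` and `add` action types and the `path`/`size`/`dataChange` fields are part of the Delta transaction protocol):

```python
import json

# A Delta commit file is newline-delimited JSON, one action per line.
sample_commit = "\n".join([
    json.dumps({"commitInfo": {"operation": "WRITE"}}),
    json.dumps({"add": {"path": "part-00000-abc.snappy.parquet",
                        "size": 1024, "dataChange": True}}),
    json.dumps({"add": {"path": "part-00001-def.snappy.parquet",
                        "size": 2048, "dataChange": True}}),
])

# Collect the files this commit added and their total size
actions = [json.loads(line) for line in sample_commit.splitlines()]
added_files = [a["add"] for a in actions if "add" in a]
total_bytes = sum(f["size"] for f in added_files)
print(f"{len(added_files)} files added, {total_bytes} bytes")
```

Replaying these actions in order is exactly how Delta reconstructs any version of the table.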

Time Travel: Access Past Versions

time_travel.py
from pyspark.sql import SparkSession
from delta.tables import *

spark = SparkSession.builder \
    .appName("TimeTravel") \
    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension") \
    .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog") \
    .config("spark.jars.packages", "io.delta:delta-spark_2.12:3.2.0") \
    .getOrCreate()

path = "s3a://datalake/sensors/"

# Read an earlier version (e.g. version 0, before later updates)
old_df = spark.read.format("delta").option("versionAsOf", 0).load(path)
print("Version 0:")
old_df.show(5)

# Time travel by timestamp (note the syntax: TIMESTAMP AS OF)
spark.sql("""
  SELECT * FROM sensors
  TIMESTAMP AS OF '2026-01-01 10:00:00'
  LIMIT 5
""").show()

spark.stop()

Delta Lake enables native time travel via versionAsOf or timestampAsOf, perfect for audits and rollbacks. Here, it reads version 0 (before later updates). In production, record table versions in your CI/CD pipelines so runs are reproducible. Pitfall: versions older than the VACUUM retention window become unreadable once their data files are deleted.
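The VACUUM pitfall can be sketched as a simplified model (hypothetical helper; real behavior depends on which data files each version still references):

```python
# Which versions stay readable given a VACUUM retention window? Delta deletes
# unreferenced files older than the retention period, which silently
# truncates how far back time travel can go.
def readable_versions(commit_ts_s, retention_hours, now_s):
    cutoff = now_s - retention_hours * 3600
    latest = max(commit_ts_s)  # the current version is always readable
    return sorted(v for v, ts in commit_ts_s.items() if ts >= cutoff or v == latest)

# Three commits: 10 days, 2 days, and 1 hour ago; 168 h (7-day) retention.
now = 1_767_225_600
commits = {0: now - 10 * 86400, 1: now - 2 * 86400, 2: now - 3600}
print(readable_versions(commits, 168, now))  # version 0 is gone
```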

Schema Evolution and Enforcement

schema_evolution.py
from pyspark.sql import SparkSession

spark = SparkSession.builder \
    .appName("SchemaEvolution") \
    .config("spark.databricks.delta.schema.autoMerge.enabled", "true") \
    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension") \
    .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog") \
    .config("spark.jars.packages", "io.delta:delta-spark_2.12:3.2.0") \
    .getOrCreate()

path = "s3a://datalake/sensors/"

# New batch with an extra column (device_type) to demonstrate evolution
new_data = spark.createDataFrame([
    (1, 25.5, "2026-01-01 11:00:00", "new_sensor")
], ["id", "temperature", "ingestion_time", "device_type"])

# Append with mergeSchema: Delta adds the new column instead of rejecting the write
new_data.write.format("delta").mode("append").option("mergeSchema", "true").save(path)

print("Evolved schema:")
spark.sql("DESCRIBE sensors").show(truncate=False)

spark.stop()

Enables mergeSchema for schema evolution during append. Delta adds device_type without breaking existing rows. Use autoMerge.enabled sparingly; strict enforcement that fails fast is usually safer in production. Pitfall: type conflicts (string vs int) are not auto-resolved and require explicit casts.
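The merge rule above can be sketched as a simplified model (illustrative only; Delta operates on Spark StructTypes, not plain dicts): new columns are added, but an existing column whose type differs is rejected rather than coerced.

```python
def merge_schema(current, incoming):
    """Merge an incoming column->type mapping into the current schema."""
    merged = dict(current)
    for col, dtype in incoming.items():
        if col in merged and merged[col] != dtype:
            # mimics Delta's behavior: no silent type coercion
            raise TypeError(f"type conflict on '{col}': {merged[col]} vs {dtype}")
        merged[col] = dtype
    return merged

base = {"id": "int", "temperature": "double", "ingestion_time": "string"}
evolved = merge_schema(base, {"device_type": "string"})  # new column: accepted
```

Trying `merge_schema(base, {"temperature": "string"})` raises, which is the fail-fast behavior you want on drift.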

Perform Upserts with MERGE

MERGE is key for CDC (Change Data Capture) in Data Lakes. It handles upserts/deduplication on primary keys with complex conditions. Partition by date for scalability.
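The upsert rule MERGE implements can be sketched in plain Python (illustrative only; the real MERGE runs distributed inside Spark):

```python
# Matched rows are updated, unmatched rows are inserted, keyed on a primary key.
def merge_upsert(target_rows, update_rows, key="id"):
    by_key = {row[key]: dict(row) for row in target_rows}
    for row in update_rows:
        if row[key] in by_key:
            by_key[row[key]].update(row)   # WHEN MATCHED THEN UPDATE
        else:
            by_key[row[key]] = dict(row)   # WHEN NOT MATCHED THEN INSERT
    return list(by_key.values())

target = [{"id": 1, "temperature": 25.5}]
updates = [{"id": 1, "temperature": 26.0}, {"id": 2, "temperature": 22.1}]
result = merge_upsert(target, updates)  # id 1 updated, id 2 inserted
```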

Advanced Upsert via MERGE

merge_upsert.py
from pyspark.sql import SparkSession

spark = SparkSession.builder \
    .appName("MergeUpsert") \
    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension") \
    .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog") \
    .config("spark.jars.packages", "io.delta:delta-spark_2.12:3.2.0") \
    .getOrCreate()

# Source updates (may contain rows that already exist in the target)
updates_df = spark.createDataFrame([
    (1, 26.0, "2026-01-01 12:00:00", "new_sensor"),
    (2, 22.1, "2026-01-01 12:00:00", "old_sensor")
], ["id", "temperature", "ingestion_time", "device_type"])

# Register the source as a temp view, then MERGE (upsert)
updates_df.createOrReplaceTempView("updates")

spark.sql("""
  MERGE INTO sensors t
  USING updates u
  ON t.id = u.id
  WHEN MATCHED THEN
    UPDATE SET *
  WHEN NOT MATCHED THEN
    INSERT *
""")

spark.sql("SELECT * FROM sensors WHERE id = 1").show()
spark.stop()

MERGE matches on id: it updates existing rows and inserts new ones. Delta also supports WHEN MATCHED ... DELETE and WHEN NOT MATCHED BY SOURCE clauses, making it ideal for applying CDC streams from Kafka. Optimize with ZORDER BY on frequently filtered columns. Pitfall: without partition pruning, every MERGE scans the full table.

Optimization: Partitioning and ZORDER

optimize.py
from pyspark.sql import SparkSession

spark = SparkSession.builder \
    .appName("Optimize") \
    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension") \
    .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog") \
    .config("spark.jars.packages", "io.delta:delta-spark_2.12:3.2.0") \
    .getOrCreate()

# Enable automatic file optimization on write
spark.sql("""
  ALTER TABLE sensors
  SET TBLPROPERTIES (delta.autoOptimize.optimizeWrite = true, delta.autoOptimize.autoCompact = true)
""")

# ZORDER on frequently queried columns to co-locate related rows
spark.sql("OPTIMIZE sensors ZORDER BY (id, device_type)")

# Refresh statistics for the query optimizer
spark.sql("ANALYZE TABLE sensors COMPUTE STATISTICS")

print("Table optimized.")
spark.stop()

Enables auto-optimize for automatic compaction. OPTIMIZE ZORDER clusters data so file-level min/max statistics can skip irrelevant files, often speeding up selective queries by an order of magnitude or more. ANALYZE updates statistics for the optimizer. Run periodically via cron/Airflow. Pitfall: OPTIMIZE rewrites files and is costly on live tables.
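The idea behind ZORDER can be sketched with bit interleaving (a toy Z-value over two small integer columns; Delta's implementation also handles range mapping and other types):

```python
# Interleaving the bits of several columns yields a 1-D sort key (a Z-value)
# that keeps multi-dimensional neighbors close together, so per-file
# min/max statistics prune more files.
def z_value(x, y, bits=8):
    z = 0
    for i in range(bits):
        z |= ((x >> i) & 1) << (2 * i)       # x occupies even bit positions
        z |= ((y >> i) & 1) << (2 * i + 1)   # y occupies odd bit positions
    return z

points = [(0, 0), (1, 1), (2, 2), (3, 0)]
ordered = sorted(points, key=lambda p: z_value(*p))
```

Sorting rows by this key before splitting them into files is what makes queries filtering on either column skip most files.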

Best Practices

  • Partition by low-cardinality fields like date or region to prune scans; high-cardinality partition keys create a flood of small files.
  • Enable Liquid Clustering (Delta 3+) for dynamic auto-ZORDER.
  • Use Unity Catalog (Databricks) for multi-tenant governance.
  • Integrate Great Expectations for post-ingest data quality checks.
  • Monitor with Delta Lake metrics (files, churn) via Prometheus.

Common Errors to Avoid

  • Writing without schema enforcement: leave mergeSchema disabled in production so writes fail fast on schema drift.
  • Forgetting VACUUM: run VACUUM sensors RETAIN 168 HOURS to clean up stale files while keeping a 7-day time-travel window.
  • MERGE without WHEN NOT MATCHED BY SOURCE: Risks data loss on deletes.
  • Scaling without compaction: Small files degrade performance; force OPTIMIZE nightly.
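The small-files problem is essentially a bin-packing exercise. A greedy sketch of a compaction plan (illustrative; OPTIMIZE defaults to output files of about 1 GB):

```python
# Group many small files into rewrite batches near a target output size.
def plan_compaction(file_sizes_mb, target_mb=1024):
    batches, current, current_size = [], [], 0
    for size in sorted(file_sizes_mb):
        if current and current_size + size > target_mb:
            batches.append(current)            # close the full batch
            current, current_size = [], 0
        current.append(size)
        current_size += size
    if current:
        batches.append(current)
    return batches

# 2000 x 1 MB files collapse into two ~1 GB rewrite batches
print(len(plan_compaction([1] * 2000)))
```

Each batch becomes one rewritten file, which is why nightly OPTIMIZE keeps file counts, and query planning time, under control.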

Next Steps

Migrate to AWS S3 + EMR by replacing the MinIO endpoint with real S3. Explore Apache Iceberg for multi-engine support. Read the Delta Lake docs.

Check out our Data Engineering trainings to master Spark, Lakehouse, and GenAI pipelines.