How to Use Apache Iceberg with PySpark in 2026

Introduction

Apache Iceberg is an open table format designed for huge volumes of analytical data in data lakes. Unlike plain Parquet files, Iceberg provides ACID transactions, schema evolution without rewriting data, time travel for querying historical versions, and hidden, fine-grained partitioning. In 2026, with open table formats (Delta Lake, Hudi, Iceberg) now standard in the data lake, it has become an essential skill for data engineers who want to avoid vendor lock-in.

Why adopt it? Think of a data lake as a massive ledger: Iceberg adds indexed pages, temporal annotations, and automatic validations, making operations reliable even under massive loads (petabytes). This beginner tutorial takes you from installation to advanced cases like snapshots, with 100% functional code on your local machine. By the end, you'll master Iceberg for Spark, ready for production on S3 or ADLS.

Prerequisites

  • Python 3.10 or higher installed.
  • pip and ~2 GB of free disk space.
  • Basic knowledge of Python and SQL (no prior Spark experience required).
  • Internet access to download the Iceberg JARs (just once).

Install Dependencies

setup.sh
#!/bin/bash
mkdir iceberg-tutorial && cd iceberg-tutorial
pip install pyspark==3.5.1 pandas==2.2.2
wget https://repo1.maven.org/maven2/org/apache/iceberg/iceberg-spark-runtime-3.5_2.12/1.6.1/iceberg-spark-runtime-3.5_2.12-1.6.1.jar
mkdir warehouse
mkdir metastore
echo "Installation terminée. Exécutez les scripts Python suivants dans ce dossier."

This script sets up the project, installs PySpark 3.5.1 (compatible with Iceberg 1.6.1) and Pandas for data generation. It downloads the essential Iceberg runtime JAR—without it, Spark ignores the extensions. It creates the warehouse (table storage) and metastore (local metadata) directories. Run it with bash setup.sh; avoid proxies that block wget.

Verify the Installation

Run python -c "import pyspark; print(pyspark.__version__)" to confirm PySpark. Check for the JAR with ls *.jar. These steps ensure a local environment without Hadoop or a cluster—perfect for getting started. Next: configure Spark for Iceberg.

Initialize the Iceberg SparkSession

01_init_spark.py
from pyspark.sql import SparkSession

spark = SparkSession.builder \
    .appName("IcebergInit") \
    .master("local[*]") \
    .config("spark.sql.extensions", "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions") \
    .config("spark.sql.catalog.iceberg", "org.apache.iceberg.spark.SparkCatalog") \
    .config("spark.sql.catalog.iceberg.type", "hadoop") \
    .config("spark.sql.catalog.iceberg.warehouse", "warehouse") \
    .config("spark.jars", "./iceberg-spark-runtime-3.5_2.12-1.6.1.jar") \
    .getOrCreate()

spark.sql("SHOW NAMESPACES IN iceberg").show()
spark.sql("CREATE NAMESPACE IF NOT EXISTS iceberg.db").show()
spark.stop()

This script creates a SparkSession configured for Iceberg using a local Hadoop catalog (no Hive metastore or S3 needed). The configs enable the Iceberg SQL extensions and point the catalog at the warehouse directory. Run python 01_init_spark.py: you'll see the 'db' namespace created. Pitfall: without spark.jars, Spark cannot load the Iceberg classes and the session fails with a ClassNotFoundException.

Create Your First Iceberg Table

With the session ready, let's define a sample table with ID, data, and timestamp. Iceberg automatically manages metadata (snapshots, manifests). This table already supports schema evolution: add a column later without downtime.

Create and Describe the Table

02_create_table.py
from pyspark.sql import SparkSession

spark = SparkSession.builder \
    .appName("IcebergCreate") \
    .master("local[*]") \
    .config("spark.sql.extensions", "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions") \
    .config("spark.sql.catalog.iceberg", "org.apache.iceberg.spark.SparkCatalog") \
    .config("spark.sql.catalog.iceberg.type", "hadoop") \
    .config("spark.sql.catalog.iceberg.warehouse", "warehouse") \
    .config("spark.jars", "./iceberg-spark-runtime-3.5_2.12-1.6.1.jar") \
    .getOrCreate()

spark.sql("""
CREATE OR REPLACE TABLE iceberg.db.sample (
    id bigint,
    data string,
    event_time timestamp
) USING iceberg
PARTITIONED BY (days(event_time))
""")

spark.sql("DESCRIBE iceberg.db.sample").show(truncate=False)
spark.sql("SELECT * FROM iceberg.db.sample LIMIT 5").show()  # Vide pour l'instant
spark.stop()

This code creates a table partitioned by day on event_time for optimized scans. USING iceberg selects the format; CREATE OR REPLACE avoids duplicate-table errors on reruns. Run it: DESCRIBE shows the schema and the partition spec. Benefit: partitioning is hidden, so queries filter on event_time directly and Iceberg prunes the matching day partitions, with no manual partition columns or rewrites.
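
Once data is in the table (next section), you can see that pruning at work. A minimal sketch, reusing any of the SparkSessions above:

# The filter is written on event_time itself; Iceberg maps it to days(event_time)
# and only scans the matching day partitions.
spark.sql("""
    SELECT COUNT(*)
    FROM iceberg.db.sample
    WHERE event_time >= current_date() - INTERVAL 7 DAYS
""").show()

# The partitions metadata table lists the day partitions that actually exist.
spark.sql("SELECT partition, record_count FROM iceberg.db.sample.partitions").show(truncate=False)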

Insert and Query Data

We'll generate realistic data with Pandas, convert it to a Spark DataFrame, and append it; every write commits a new snapshot atomically (ACID). For idempotent upserts, Iceberg also supports MERGE INTO (a sketch follows the script). Iceberg shines here: no global locks like in Hive.

Insert Data

03_insert_data.py
from pyspark.sql import SparkSession
import pandas as pd
from datetime import datetime, timedelta
import random

spark = SparkSession.builder \
    .appName("IcebergInsert") \
    .master("local[*]") \
    .config("spark.sql.extensions", "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions") \
    .config("spark.sql.catalog.iceberg", "org.apache.iceberg.spark.SparkCatalog") \
    .config("spark.sql.catalog.iceberg.type", "hadoop") \
    .config("spark.sql.catalog.iceberg.warehouse", "warehouse") \
    .config("spark.jars", "./iceberg-spark-runtime-3.5_2.12-1.6.1.jar") \
    .getOrCreate()

# Generate data
now = datetime.now()
data = [(i, f"data-{i}", now - timedelta(days=random.randint(0,30))) for i in range(100)]
pdf = pd.DataFrame(data, columns=["id", "data", "event_time"])
df = spark.createDataFrame(pdf)
df.write.format("iceberg").mode("append").save("iceberg.db.sample")

spark.sql("SELECT COUNT(*) FROM iceberg.db.sample").show()
spark.sql("SELECT * FROM iceberg.db.sample LIMIT 10").show()
spark.stop()

We generate 100 rows with Pandas for simplicity, then append them with the DataFrameWriterV2 API (writeTo(...).append()), which commits a new snapshot. Run it: 100 rows inserted. Pitfall: writing plain Parquet into the warehouse path with df.write.parquet(...) would bypass the table entirely, leaving no snapshots and no ACID guarantees.
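
The MERGE INTO upsert mentioned above could look like the following sketch, added to 03_insert_data.py before spark.stop() (the updates view and the corrected row are illustrative):

# Sketch of an idempotent upsert: rows matching on id are updated, new ids are inserted.
updates = spark.createDataFrame(
    [(1, "data-1-corrected", datetime.now())], ["id", "data", "event_time"]
)
updates.createOrReplaceTempView("updates")

spark.sql("""
    MERGE INTO iceberg.db.sample t
    USING updates u
    ON t.id = u.id
    WHEN MATCHED THEN UPDATE SET *
    WHEN NOT MATCHED THEN INSERT *
""")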

Time Travel and Maintenance

04_time_travel.py
from pyspark.sql import SparkSession

spark = SparkSession.builder \
    .appName("IcebergTimeTravel") \
    .master("local[*]") \
    .config("spark.sql.extensions", "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions") \
    .config("spark.sql.catalog.iceberg", "org.apache.iceberg.spark.SparkCatalog") \
    .config("spark.sql.catalog.iceberg.type", "hadoop") \
    .config("spark.sql.catalog.iceberg.warehouse", "warehouse") \
    .config("spark.jars", "./iceberg-spark-runtime-3.5_2.12-1.6.1.jar") \
    .getOrCreate()

# Current snapshot
spark.sql("SELECT * FROM iceberg.db.sample LIMIT 5").show()

# Time travel: get the most recent snapshot id from the snapshots metadata table
latest_snapshot = spark.sql(
    "SELECT snapshot_id FROM iceberg.db.sample.snapshots ORDER BY committed_at DESC LIMIT 1"
).collect()[0][0]
print(f"Current snapshot: {latest_snapshot}")

# Read the table as of that snapshot id (any older snapshot id works the same way)
spark.sql(f"SELECT COUNT(*) FROM iceberg.db.sample VERSION AS OF {latest_snapshot}").show()

# Maintenance: expire snapshots older than the given timestamp
# (at least the most recent snapshot is always retained, so current data stays queryable)
spark.sql("CALL iceberg.system.expire_snapshots('db.sample', TIMESTAMP '2026-01-01 00:00:00')").show()
spark.stop()

This script demonstrates time travel with VERSION AS OF for historical queries; snapshot ids come from the snapshots metadata table. CALL expire_snapshots cleans up old snapshots and their data files (saves storage). Run it after the inserts and compare counts. Essential in production to prevent metadata and manifest explosion.

Best Practices

  • Always partition: use days(event_time) or hours(event_time), matching how queries filter on time.
  • Separate catalogs: a dedicated 'iceberg' catalog alongside 'spark_catalog' avoids Hive conflicts.
  • Use MERGE INTO for upserts: MERGE INTO ... ON id WHEN MATCHED THEN UPDATE for CDC pipelines.
  • Monitor snapshots: query the iceberg.db.sample.history metadata table regularly (see the sketch after this list).
  • Evolve schema: ALTER TABLE ... ADD COLUMNS adds a column without rewriting data (also shown below).
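
A minimal sketch of the last two practices, reusing the same SparkSession configuration as the scripts above:

# Schema evolution is a metadata-only change: no data files are rewritten.
spark.sql("ALTER TABLE iceberg.db.sample ADD COLUMNS (category string)")
spark.sql("DESCRIBE iceberg.db.sample").show(truncate=False)

# Every commit (append, merge, expire) shows up in the history metadata table.
spark.sql("""
    SELECT made_current_at, snapshot_id, is_current_ancestor
    FROM iceberg.db.sample.history
""").show(truncate=False)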

Common Errors to Avoid

  • Missing JAR: Spark cannot load the Iceberg classes; look for a ClassNotFoundException mentioning IcebergSparkSessionExtensions.
  • Warehouse not created: Run mkdir warehouse; otherwise 'path does not exist'.
  • Wrong catalog name: Stick to 'iceberg' in this tutorial; overriding 'spark_catalog' replaces Spark's default catalog and requires the SparkSessionCatalog setup.
  • No local master: Add .master('local[*]') for single-node execution.

Next Steps

Go to production: point an Iceberg catalog at object storage, for example a REST or AWS Glue catalog with an s3:// warehouse and AWS credentials (a configuration sketch follows). Read the official Iceberg docs. Explore Trino or Flink for federated queries. Check out our Learni Data Engineering courses to master Iceberg on clusters.
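
A hedged sketch of what such a session could look like; the catalog name, REST endpoint, and bucket are placeholders, and the AWS-specific Iceberg artifacts (for example the iceberg-aws-bundle JAR) plus valid credentials must be on the classpath:

from pyspark.sql import SparkSession

# Sketch only: every value below is a placeholder to adapt to your environment.
spark = SparkSession.builder \
    .appName("IcebergProd") \
    .config("spark.sql.extensions", "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions") \
    .config("spark.sql.catalog.prod", "org.apache.iceberg.spark.SparkCatalog") \
    .config("spark.sql.catalog.prod.type", "rest") \
    .config("spark.sql.catalog.prod.uri", "https://my-rest-catalog.example.com") \
    .config("spark.sql.catalog.prod.warehouse", "s3://my-bucket/warehouse") \
    .config("spark.sql.catalog.prod.io-impl", "org.apache.iceberg.aws.s3.S3FileIO") \
    .getOrCreate()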