Introduction
Apache Iceberg est un format de table ouvert conçu pour les grands volumes de données analytiques dans les data lakes. Contrairement aux formats traditionnels comme Parquet seul, Iceberg apporte des transactions ACID, une évolution de schéma sans réécriture, le time travel pour interroger des versions historiques, et une gestion fine des partitions. En 2026, avec l'essor des data lakes open formats (Delta Lake, Hudi, Iceberg), il est essentiel pour les data engineers évitant les verrouillages vendor.
Pourquoi l'adopter ? Imaginez un data lake comme un grand livre de comptes : Iceberg ajoute des pages indexées, des annotations temporelles et des validations automatiques, rendant les opérations fiables même sous charges massives (Pétaoctets). Ce tutoriel beginner vous guide de l'installation à des cas avancés comme les snapshots, avec codes 100% fonctionnels sur machine locale. À la fin, vous maîtriserez Iceberg pour Spark, prêt pour production sur S3 ou ADLS.
Prérequis
- Python 3.10 ou supérieur installé.
- pip et ~2 Go d'espace disque libre.
- Connaissances de base en Python et SQL (pas de Spark préalable requis).
- Accès internet pour télécharger les JARs Iceberg (une seule fois).
Installer les dépendances
#!/bin/bash
mkdir iceberg-tutorial && cd iceberg-tutorial
pip install pyspark==3.5.1 pandas==2.2.2
wget https://repo1.maven.org/maven2/org/apache/iceberg/iceberg-spark-runtime-3.5_2.12/1.6.1/iceberg-spark-runtime-3.5_2.12-1.6.1.jar
mkdir warehouse
mkdir metastore
echo "Installation terminée. Exécutez les scripts Python suivants dans ce dossier."Ce script initialise le projet, installe PySpark 3.5.1 (compatible Iceberg 1.6.1) et Pandas pour la génération de données. Il télécharge le JAR runtime essentiel d'Iceberg, sans lequel Spark ignore les extensions. Créez les dossiers warehouse (stockage tables) et metastore (métadonnées locales). Lancez-le avec bash setup.sh ; évitez les proxies bloquant wget.
Vérifier l'installation
Exécutez python -c "import pyspark; print(pyspark.__version__)" pour confirmer PySpark. Le JAR doit être présent : ls *.jar. Ces étapes assurent un environnement local sans Hadoop ou cluster, idéal pour débuter. Prochaine étape : configurer Spark pour Iceberg.
Initialiser la SparkSession Iceberg
from pyspark.sql import SparkSession
spark = SparkSession.builder \
.appName("IcebergInit") \
.master("local[*]") \
.config("spark.sql.extensions", "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions") \
.config("spark.sql.catalog.iceberg", "org.apache.iceberg.spark.SparkCatalog") \
.config("spark.sql.catalog.iceberg.type", "hadoop") \
.config("spark.sql.catalog.iceberg.warehouse", "warehouse") \
.config("spark.jars", "./iceberg-spark-runtime-3.5_2.12-1.6.1.jar") \
.getOrCreate()
spark.sql("SHOW NAMESPACES IN iceberg").show()
spark.sql("CREATE NAMESPACE IF NOT EXISTS iceberg.db").show()
spark.stop()Ce script crée une SparkSession configurée pour Iceberg avec un catalogue Hadoop local (pas de Hive/S3). Les configs activent les extensions Iceberg et pointent vers le warehouse. Exécutez python 01_init_spark.py : vous verrez le namespace 'db' créé. Piège : sans spark.jars, Iceberg échoue silencieusement.
Créer votre première table Iceberg
Avec la session prête, définissons une table sample avec ID, données et timestamp. Iceberg gère automatiquement les métadonnées (snapshots, manifests). Cette table supporte déjà l'évolution de schéma : ajoutez une colonne plus tard sans downtime.
Créer et décrire la table
from pyspark.sql import SparkSession
spark = SparkSession.builder \
.appName("IcebergCreate") \
.master("local[*]") \
.config("spark.sql.extensions", "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions") \
.config("spark.sql.catalog.iceberg", "org.apache.iceberg.spark.SparkCatalog") \
.config("spark.sql.catalog.iceberg.type", "hadoop") \
.config("spark.sql.catalog.iceberg.warehouse", "warehouse") \
.config("spark.jars", "./iceberg-spark-runtime-3.5_2.12-1.6.1.jar") \
.getOrCreate()
spark.sql("""
CREATE OR REPLACE TABLE iceberg.db.sample (
id bigint,
data string,
event_time timestamp
) USING iceberg
PARTITIONED BY (days(event_time))
""")
spark.sql("DESCRIBE iceberg.db.sample").show(truncate=False)
spark.sql("SELECT * FROM iceberg.db.sample LIMIT 5").show() # Vide pour l'instant
spark.stop()Ce code crée une table partitionnée par jour sur event_time, optimisant les scans. 'USING iceberg' active le format ; 'CREATE OR REPLACE' évite les doublons. Exécutez-le : DESCRIBE montre le schéma et les partitions. Avantage : les partitions sont dynamiques, sans rewrite manuel.
Insérer et interroger des données
Générons des données réalistes avec Pandas, convertissons en DataFrame Spark et utilisons MERGE INTO pour des inserts idempotents (ACID). Iceberg excelle ici : pas de verrou global comme Hive.
Insérer des données
from pyspark.sql import SparkSession
import pandas as pd
from datetime import datetime, timedelta
import random
spark = SparkSession.builder \
.appName("IcebergInsert") \
.master("local[*]") \
.config("spark.sql.extensions", "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions") \
.config("spark.sql.catalog.iceberg", "org.apache.iceberg.spark.SparkCatalog") \
.config("spark.sql.catalog.iceberg.type", "hadoop") \
.config("spark.sql.catalog.iceberg.warehouse", "warehouse") \
.config("spark.jars", "./iceberg-spark-runtime-3.5_2.12-1.6.1.jar") \
.getOrCreate()
# Générer données
now = datetime.now()
data = [(i, f"data-{i}", now - timedelta(days=random.randint(0,30))) for i in range(100)]
pdf = pd.DataFrame(data, columns=["id", "data", "event_time"])
df = spark.createDataFrame(pdf)
df.write.format("iceberg").mode("append").save("iceberg.db.sample")
spark.sql("SELECT COUNT(*) FROM iceberg.db.sample").show()
spark.sql("SELECT * FROM iceberg.db.sample LIMIT 10").show()
spark.stop()On génère 100 lignes avec Pandas pour simplicité, puis append via DataFrame (équivalent INSERT). Mode 'append' crée un nouveau snapshot. Exécutez : 100 rows insérées. Piège : sans format('iceberg'), c'est du Parquet standard sans ACID.
Time travel et maintenance
from pyspark.sql import SparkSession
spark = SparkSession.builder \
.appName("IcebergTimeTravel") \
.master("local[*]") \
.config("spark.sql.extensions", "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions") \
.config("spark.sql.catalog.iceberg", "org.apache.iceberg.spark.SparkCatalog") \
.config("spark.sql.catalog.iceberg.type", "hadoop") \
.config("spark.sql.catalog.iceberg.warehouse", "warehouse") \
.config("spark.jars", "./iceberg-spark-runtime-3.5_2.12-1.6.1.jar") \
.getOrCreate()
# Snapshot actuel
spark.sql("SELECT * FROM iceberg.db.sample LIMIT 5").show()
# Time travel : dernière snapshot
latest_snapshot = spark.sql("SELECT current_snapshot(iceberg.db.sample) as snap").collect()[0][0]
print(f"Snapshot actuel: {latest_snapshot}")
# Lire version précédente (simuler en supprimant puis rollbacks, mais ici query as of)
spark.sql(f"SELECT COUNT(*) FROM iceberg.db.sample VERSION AS OF {latest_snapshot}").show()
# Maintenance : expire vieux snapshots
spark.sql("CALL iceberg.system.expire_snapshots('db.sample', TIMESTAMP '2026-01-01 00:00:00')").show()
spark.stop()Ce script démontre le time travel avec 'VERSION AS OF' pour queries historiques. Obtenez snapshots via fonctions système. CALL expire nettoie les vieux fichiers (économie stockage). Lancez après inserts : comparez counts. Essentiel en prod pour éviter l'explosion de manifests.
Bonnes pratiques
- Toujours partitionner : Utilisez days(hours(event_time)) pour queries temporelles efficaces.
- Séparez les catalogues : 'iceberg' vs 'spark_catalog' évite les conflits Hive.
- Utilisez MERGE INTO pour upserts :
MERGE INTO ... ON id WHEN MATCHED THEN UPDATEpour CDC. - Monitorez snapshots :
SELECT * FROM iceberg.db.sample.historyrégulièrement. - Evoluez le schéma :
ALTER TABLE ADD COLUMN new_col stringsans rewrite.
Erreurs courantes à éviter
- JAR manquant : Spark ignore Iceberg ; vérifiez logs 'extensions not found'.
- Warehouse non créé : mkdir warehouse ; sinon 'path does not exist'.
- Catalog mal nommé : Utilisez 'iceberg' fixe, pas 'spark_catalog' qui overwrite defaults.
- Pas de master local : Ajoutez .master('local[*]') pour exécution single-node.
Pour aller plus loin
Passez à la production : intégrez S3 avec spark.sql.catalog.iceberg_s3.type=rest et AWS creds. Lisez la doc officielle Iceberg. Explorez Trino/Flink pour queries fédérées. Découvrez nos formations Learni sur Data Engineering pour maîtriser Iceberg en cluster.