Introduction
Apache Flink est un framework distribué open-source leader pour le traitement unifié de données en streaming et batch à faible latence. En 2026, avec la montée des données en temps réel (IoT, finance, logs), Flink excelle par sa gestion d'état exactly-once, ses checkpoints incrémentaux et son support SQL natif. Ce tutoriel avancé vous guide pas à pas pour implémenter des pipelines complexes : du WordCount batch/streaming au processing stateful avec rocksDB backend. Pourquoi Flink ? Contrairement à Spark Streaming (micro-batch), Flink traite nativement les streams illimités avec backpressure automatique et scalabilité horizontale. Idéal pour des cas comme la détection de fraude ou l'analyse e-commerce live. À la fin, vous déployez un job fault-tolerant prêt pour production Kubernetes. (128 mots)
Prérequis
- Java 17+ (ou Scala 2.12/3.0)
- Maven 3.9+ ou Gradle
- Cluster Flink (local via
flink runou Kubernetes/Standalone) - IDE comme IntelliJ avec plugin Flink
- Connaissances avancées en Java, streams et distributed systems
- Docker pour tests conteneurisés
Configuration du projet Maven
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>com.example</groupId>
<artifactId>flink-tutorial</artifactId>
<version>1.0</version>
<properties>
<maven.compiler.source>17</maven.compiler.source>
<maven.compiler.target>17</maven.compiler.target>
<flink.version>1.19.1</flink.version>
<scala.binary.version>2.12</scala.binary.version>
</properties>
<dependencies>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-java</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-clients</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-statebackend-rocksdb</artifactId>
<version>${flink.version}</version>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-shade-plugin</artifactId>
<version>3.5.1</version>
<executions>
<execution>
<phase>package</phase>
<goals><goal>shade</goal></goals>
<configuration>
<createDependencyReducedPom>false</createDependencyReducedPom>
<filters>
<filter>
<artifact>*:*</artifact>
<excludes>
<exclude>META-INF/*.SF</exclude>
<exclude>META-INF/*.DSA</exclude>
<exclude>META-INF/*.RSA</exclude>
</excludes>
</filter>
</filters>
<transformers>
<transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
<mainClass>com.example.WordCount</mainClass>
</transformer>
<transformer implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer"/>
</transformers>
</configuration>
</execution>
</executions>
</plugin>
</plugins>
</build>
</project>Ce pom.xml configure un projet Flink avec les dépendances streaming Java, clients pour soumission de jobs et RocksDB pour state backend. Le plugin Shade crée un JAR fat exécutable. Compilez avec mvn clean package pour générer flink-tutorial-1.0.jar. Évitez les conflits de versions en pinnant Flink à 1.19.1.
Premier job batch : WordCount
Commençons par un job batch classique pour valider le setup. Flink unifie batch/streaming via DataStream API. Imaginez traiter un fichier texte : split, map, keyBy, sum. Ce pattern scale à des pétaoctets.
Implémentation WordCount batch
package com.example;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.util.Collector;
public class BatchWordCount {
public static void main(String[] args) throws Exception {
final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
DataSet<String> text = env.readTextFile("input.txt"); // Remplacez par votre fichier
DataSet<Tuple2<String, Integer>> wordCounts = text
.flatMap(new Tokenizer())
.groupBy(0)
.sum(1);
wordCounts.print();
env.execute("Batch WordCount");
}
public static final class Tokenizer implements FlatMapFunction<String, Tuple2<String, Integer>> {
@Override
public void flatMap(String value, Collector<Tuple2<String, Integer>> out) {
String[] tokens = value.toLowerCase().split("\\W+");
for (String token : tokens) {
if (token.length() > 0) {
out.collect(new Tuple2<>(token, 1));
}
}
}
}
}Ce job lit input.txt, tokenise en mots minuscules, compte par clé avec groupBy(0).sum(1). Exécutez localement ou soumettez via flink run -c com.example.BatchWordCount target/flink-tutorial-1.0.jar. Piège : assurez-vous que input.txt existe ; pour prod, utilisez fromElements ou sources externes.
Transition vers le streaming
Passez au streaming pour données illimitées. Flink gère backpressure et watermarks pour late data. Utilisez StreamExecutionEnvironment au lieu de batch.
WordCount streaming avec socket source
package com.example;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.util.Collector;
public class StreamingWordCount {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
DataStream<String> text = env.socketTextStream("localhost", 9999);
DataStream<Tuple2<String, Integer>> windowCounts = text
.flatMap(new Tokenizer())
.keyBy(value -> value.f0)
.timeWindow(Time.seconds(5))
.sum(1);
windowCounts.print();
env.execute("Streaming WordCount");
}
public static final class Tokenizer implements FlatMapFunction<String, Tuple2<String, Integer>> {
@Override
public void flatMap(String value, Collector<Tuple2<String, Integer>> out) {
String[] tokens = value.toLowerCase().split("\\W+");
for (String token : tokens) {
if (token.length() > 0) {
out.collect(new Tuple2<>(token, 1));
}
}
}
}
}Ce job lit d'un socket (lancez nc -lk 9999 pour tester), agrège sur fenêtres de 5s via timeWindow. keyBy partitionne par mot. Soumettez avec flink run. Analogie : comme un convoyeur roulant avec compteurs par bac. Évitez parallelism >1 sans Kafka pour sources partagées.
Gestion d'état avancée
Pour apps stateful (sessions, top-N), utilisez KeyedState. ValueState pour simples, MapState pour complexes. Backend RocksDB pour persistence disque.
Job stateful : Compteur par clé
package com.example;
import org.apache.flink.api.common RestartStrategy;
import org.apache.flink.api.common.functions.RichFlatMapFunction;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.state.FunctionInitializationContext;
import org.apache.flink.runtime.state.FunctionSnapshotContext;
import org.apache.flink.shaded.guava18.com.google.common.cache.Cache;
import org.apache.flink.shaded.guava18.com.google.common.cache.CacheBuilder;
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
public class StatefulCounter {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
env.enableCheckpointing(10000);
DataStream<Tuple2<String, Integer>> input = env.socketTextStream("localhost", 9999)
.flatMap(new Tokenizer())
.keyBy(value -> value.f0);
input.process(new CountFunction())
.print();
env.execute("Stateful Counter");
}
public static class CountFunction extends KeyedProcessFunction<String, Tuple2<String, Integer>, Tuple2<String, Long>> {
private transient ValueState<Long> countState;
@Override
public void open(Configuration parameters) {
ValueStateDescriptor<Long> descriptor = new ValueStateDescriptor<>("count", Long.class);
countState = getRuntimeContext().getState(descriptor);
}
@Override
public void processElement(Tuple2<String, Integer> value, Context ctx, Collector<Tuple2<String, Long>> out) throws Exception {
Long currentCount = countState.value();
if (currentCount == null) {
currentCount = 0L;
}
currentCount += value.f1;
countState.update(currentCount);
out.collect(new Tuple2<>(value.f0, currentCount));
}
}
// Tokenizer comme avant
public static final class Tokenizer implements org.apache.flink.api.common.functions.FlatMapFunction<String, Tuple2<String, Integer>> {
@Override
public void flatMap(String value, org.apache.flink.util.Collector<Tuple2<String, Integer>> out) {
String[] tokens = value.toLowerCase().split("\\W+");
for (String token : tokens) {
if (token.length() > 0) {
out.collect(new Tuple2<>(token, 1));
}
}
}
}
}Ce KeyedProcessFunction maintient un ValueState par clé (mot), incrémenté à chaque événement. Checkpoints activés toutes 10s pour exactly-once. Open() initialise state. Piège : oubliez transient pour states, sinon sérialisation échoue.
Configuration checkpoints et state backend
package com.example;
import org.apache.flink.contrib.streaming.state.EmbeddedRocksDBStateBackend;
import org.apache.flink.runtime.state.storage.FileSystemCheckpointStorage;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
public class CheckpointConfig {
public static void configure(StreamExecutionEnvironment env) {
// Backend RocksDB pour state local/disque
EmbeddedRocksDBStateBackend backend = new EmbeddedRocksDBStateBackend();
backend.setPredefinedOptions(EmbeddedRocksDBStateBackend.PREDEFINED_OPTIONS.ROCKSDB_DEFAULT);
env.setStateBackend(backend);
// Checkpoints asynchrones, exactly-once
env.enableCheckpointing(60000L);
env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
env.getCheckpointConfig().setMinPauseBetweenCheckpoints(5000L);
env.getCheckpointConfig().setCheckpointTimeout(60000L);
env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);
env.getCheckpointConfig().enableExternalizedCheckpoints(
CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
// Storage FS pour HA
env.getCheckpointConfig().setCheckpointStorage(
new FileSystemCheckpointStorage("file:///checkpoints"));
// Restart strategy
env.setRestartStrategy(org.apache.flink.api.common.restartstrategy.RestartStrategies.fixedDelayRestart(3, 10000L));
}
// Appel dans main: CheckpointConfig.configure(env);
}Configure RocksDB pour state persistant, checkpoints exactly-once toutes 60s avec storage FS. ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION garde checkpoints pour reprise manuelle. Intégrez dans main() avant job. Évitez FS non-HA en prod (utilisez S3/HDFS).
Exemple SQL : Table API
Flink SQL simplifie pour analystes. Définissez tables sur Kafka/files, queryz avec fenêtres.
Pipeline SQL avec Kafka source
-- Soumettez via Flink SQL Client ou TableEnvironment
CREATE TABLE clicks (
user_id STRING,
url STRING,
ts TIMESTAMP(3),
WATERMARK FOR ts AS ts - INTERVAL '5' SECOND
) WITH (
'connector' = 'kafka',
'topic' = 'clicks',
'properties.bootstrap.servers' = 'localhost:9092',
'format' = 'json',
'scan.startup.mode' = 'earliest-offset'
);
CREATE TABLE url_counts (
url STRING,
window_start TIMESTAMP(3),
window_end TIMESTAMP(3),
cnt BIGINT
) WITH (
'connector' = 'print'
);
INSERT INTO url_counts
SELECT
url,
window_start,
window_end,
COUNT(*) as cnt
FROM TABLE(
TUMBLE(TABLE clicks, DESCRIPTOR(ts), INTERVAL '10' SECONDS)
)
GROUP BY url, window_start, window_end;Ce SQL lit Kafka clicks, tumble sur 10s par url, sink vers print (remplacez par Kafka/JDBC). Watermark gère late data. Exécutez dans Flink SQL Client. Piège : oubliez watermark, joins bloquent sur late events.
Déploiement Kubernetes YAML
apiVersion: flink.apache.org/v1beta1
kind: FlinkDeployment
metadata:
name: flink-tutorial
spec:
image: flink:1.19
flinkVersion: v1_19
flinkConfiguration:
taskmanager.numberOfTaskSlots: "2"
state.backend: rocksdb
state.checkpoints.dir: s3://my-bucket/checkpoints
execution.checkpointing.mode: EXACTLY_ONCE
execution.checkpointing.interval: 1min
serviceAccount: flink
jobManager: {}
taskManager: {}
job:
jarURI: local:///opt/flink/usrlib/flink-tutorial-1.0.jar
entryClass: com.example.StreamingWordCount
parallelism: 4
upgradeMode: statelessDéployez sur K8s avec Flink Operator. JAR en usrlib, config checkpoints S3. Appliquez kubectl apply -f flink-k8s.yaml. Scale via parallelism. Évitez stateful upgrade sans savepoints.
Bonnes pratiques
- Backpressure tuning : Monitorez via Flink UI, ajustez
taskmanager.memory.network.fractionà 0.2. - State scaling : Utilisez RocksDB compaction filters pour >1TB state ; préférez Incremental checkpoints.
- Sources/sinks : Kafka avec exactly-once (transactional.id) ; testez avec Chaos Engineering.
- Monitoring : Intégrez Prometheus + Grafana ; alertez sur checkpoint age >2x interval.
- Testing : Flink Test Harness pour unit tests stateful.
Erreurs courantes à éviter
- Oubli watermarks : Cause accumulation mémoire sur late data ; toujours définir
- INTERVAL. - State TTL non-set : Mémoire explose sur clés éphémères ; utilisez
state.ttl. - Checkpoint timeout : Augmentez pour gros states ; testez sous load.
- KeyBy sur non-serializable : Utilisez
keySelectorlambda avec Kryo.
Pour aller plus loin
Plongez dans Flink Forward conf, docs officielles Apache Flink. Maîtrisez PyFlink/CephRocksDB. Découvrez nos formations Learni sur Big Data & Streaming pour certification Flink.