Skip to content
Learni
Voir tous les tutoriels
Big Data

Comment implémenter Apache Flink pour le streaming en 2026

Read in English

Introduction

Apache Flink est un framework distribué open-source leader pour le traitement unifié de données en streaming et batch à faible latence. En 2026, avec la montée des données en temps réel (IoT, finance, logs), Flink excelle par sa gestion d'état exactly-once, ses checkpoints incrémentaux et son support SQL natif. Ce tutoriel avancé vous guide pas à pas pour implémenter des pipelines complexes : du WordCount batch/streaming au processing stateful avec rocksDB backend. Pourquoi Flink ? Contrairement à Spark Streaming (micro-batch), Flink traite nativement les streams illimités avec backpressure automatique et scalabilité horizontale. Idéal pour des cas comme la détection de fraude ou l'analyse e-commerce live. À la fin, vous déployez un job fault-tolerant prêt pour production Kubernetes. (128 mots)

Prérequis

  • Java 17+ (ou Scala 2.12/3.0)
  • Maven 3.9+ ou Gradle
  • Cluster Flink (local via flink run ou Kubernetes/Standalone)
  • IDE comme IntelliJ avec plugin Flink
  • Connaissances avancées en Java, streams et distributed systems
  • Docker pour tests conteneurisés

Configuration du projet Maven

pom.xml
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
  <modelVersion>4.0.0</modelVersion>
  <groupId>com.example</groupId>
  <artifactId>flink-tutorial</artifactId>
  <version>1.0</version>
  <properties>
    <maven.compiler.source>17</maven.compiler.source>
    <maven.compiler.target>17</maven.compiler.target>
    <flink.version>1.19.1</flink.version>
    <scala.binary.version>2.12</scala.binary.version>
  </properties>
  <dependencies>
    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-streaming-java</artifactId>
      <version>${flink.version}</version>
    </dependency>
    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-clients</artifactId>
      <version>${flink.version}</version>
    </dependency>
    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-statebackend-rocksdb</artifactId>
      <version>${flink.version}</version>
    </dependency>
  </dependencies>
  <build>
    <plugins>
      <plugin>
        <groupId>org.apache.maven.plugins</groupId>
        <artifactId>maven-shade-plugin</artifactId>
        <version>3.5.1</version>
        <executions>
          <execution>
            <phase>package</phase>
            <goals><goal>shade</goal></goals>
            <configuration>
              <createDependencyReducedPom>false</createDependencyReducedPom>
              <filters>
                <filter>
                  <artifact>*:*</artifact>
                  <excludes>
                    <exclude>META-INF/*.SF</exclude>
                    <exclude>META-INF/*.DSA</exclude>
                    <exclude>META-INF/*.RSA</exclude>
                  </excludes>
                </filter>
              </filters>
              <transformers>
                <transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
                  <mainClass>com.example.WordCount</mainClass>
                </transformer>
                <transformer implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer"/>
              </transformers>
            </configuration>
          </execution>
        </executions>
      </plugin>
    </plugins>
  </build>
</project>

Ce pom.xml configure un projet Flink avec les dépendances streaming Java, clients pour soumission de jobs et RocksDB pour state backend. Le plugin Shade crée un JAR fat exécutable. Compilez avec mvn clean package pour générer flink-tutorial-1.0.jar. Évitez les conflits de versions en pinnant Flink à 1.19.1.

Premier job batch : WordCount

Commençons par un job batch classique pour valider le setup. Flink unifie batch/streaming via DataStream API. Imaginez traiter un fichier texte : split, map, keyBy, sum. Ce pattern scale à des pétaoctets.

Implémentation WordCount batch

BatchWordCount.java
package com.example;

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.util.Collector;

public class BatchWordCount {

    public static void main(String[] args) throws Exception {
        final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        DataSet<String> text = env.readTextFile("input.txt"); // Remplacez par votre fichier

        DataSet<Tuple2<String, Integer>> wordCounts = text
            .flatMap(new Tokenizer())
            .groupBy(0)
            .sum(1);

        wordCounts.print();

        env.execute("Batch WordCount");
    }

    public static final class Tokenizer implements FlatMapFunction<String, Tuple2<String, Integer>> {
        @Override
        public void flatMap(String value, Collector<Tuple2<String, Integer>> out) {
            String[] tokens = value.toLowerCase().split("\\W+");
            for (String token : tokens) {
                if (token.length() > 0) {
                    out.collect(new Tuple2<>(token, 1));
                }
            }
        }
    }
}

Ce job lit input.txt, tokenise en mots minuscules, compte par clé avec groupBy(0).sum(1). Exécutez localement ou soumettez via flink run -c com.example.BatchWordCount target/flink-tutorial-1.0.jar. Piège : assurez-vous que input.txt existe ; pour prod, utilisez fromElements ou sources externes.

Transition vers le streaming

Passez au streaming pour données illimitées. Flink gère backpressure et watermarks pour late data. Utilisez StreamExecutionEnvironment au lieu de batch.

WordCount streaming avec socket source

StreamingWordCount.java
package com.example;

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.util.Collector;

public class StreamingWordCount {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        DataStream<String> text = env.socketTextStream("localhost", 9999);

        DataStream<Tuple2<String, Integer>> windowCounts = text
            .flatMap(new Tokenizer())
            .keyBy(value -> value.f0)
            .timeWindow(Time.seconds(5))
            .sum(1);

        windowCounts.print();

        env.execute("Streaming WordCount");
    }

    public static final class Tokenizer implements FlatMapFunction<String, Tuple2<String, Integer>> {
        @Override
        public void flatMap(String value, Collector<Tuple2<String, Integer>> out) {
            String[] tokens = value.toLowerCase().split("\\W+");
            for (String token : tokens) {
                if (token.length() > 0) {
                    out.collect(new Tuple2<>(token, 1));
                }
            }
        }
    }
}

Ce job lit d'un socket (lancez nc -lk 9999 pour tester), agrège sur fenêtres de 5s via timeWindow. keyBy partitionne par mot. Soumettez avec flink run. Analogie : comme un convoyeur roulant avec compteurs par bac. Évitez parallelism >1 sans Kafka pour sources partagées.

Gestion d'état avancée

Pour apps stateful (sessions, top-N), utilisez KeyedState. ValueState pour simples, MapState pour complexes. Backend RocksDB pour persistence disque.

Job stateful : Compteur par clé

StatefulCounter.java
package com.example;

import org.apache.flink.api.common RestartStrategy;
import org.apache.flink.api.common.functions.RichFlatMapFunction;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.state.FunctionInitializationContext;
import org.apache.flink.runtime.state.FunctionSnapshotContext;
import org.apache.flink.shaded.guava18.com.google.common.cache.Cache;
import org.apache.flink.shaded.guava18.com.google.common.cache.CacheBuilder;
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;

public class StatefulCounter {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        env.enableCheckpointing(10000);

        DataStream<Tuple2<String, Integer>> input = env.socketTextStream("localhost", 9999)
            .flatMap(new Tokenizer())
            .keyBy(value -> value.f0);

        input.process(new CountFunction())
            .print();

        env.execute("Stateful Counter");
    }

    public static class CountFunction extends KeyedProcessFunction<String, Tuple2<String, Integer>, Tuple2<String, Long>> {
        private transient ValueState<Long> countState;

        @Override
        public void open(Configuration parameters) {
            ValueStateDescriptor<Long> descriptor = new ValueStateDescriptor<>("count", Long.class);
            countState = getRuntimeContext().getState(descriptor);
        }

        @Override
        public void processElement(Tuple2<String, Integer> value, Context ctx, Collector<Tuple2<String, Long>> out) throws Exception {
            Long currentCount = countState.value();
            if (currentCount == null) {
                currentCount = 0L;
            }
            currentCount += value.f1;
            countState.update(currentCount);
            out.collect(new Tuple2<>(value.f0, currentCount));
        }
    }

    // Tokenizer comme avant
    public static final class Tokenizer implements org.apache.flink.api.common.functions.FlatMapFunction<String, Tuple2<String, Integer>> {
        @Override
        public void flatMap(String value, org.apache.flink.util.Collector<Tuple2<String, Integer>> out) {
            String[] tokens = value.toLowerCase().split("\\W+");
            for (String token : tokens) {
                if (token.length() > 0) {
                    out.collect(new Tuple2<>(token, 1));
                }
            }
        }
    }
}

Ce KeyedProcessFunction maintient un ValueState par clé (mot), incrémenté à chaque événement. Checkpoints activés toutes 10s pour exactly-once. Open() initialise state. Piège : oubliez transient pour states, sinon sérialisation échoue.

Configuration checkpoints et state backend

CheckpointConfig.java
package com.example;

import org.apache.flink.contrib.streaming.state.EmbeddedRocksDBStateBackend;
import org.apache.flink.runtime.state.storage.FileSystemCheckpointStorage;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class CheckpointConfig {

    public static void configure(StreamExecutionEnvironment env) {
        // Backend RocksDB pour state local/disque
        EmbeddedRocksDBStateBackend backend = new EmbeddedRocksDBStateBackend();
        backend.setPredefinedOptions(EmbeddedRocksDBStateBackend.PREDEFINED_OPTIONS.ROCKSDB_DEFAULT);
        env.setStateBackend(backend);

        // Checkpoints asynchrones, exactly-once
        env.enableCheckpointing(60000L);
        env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
        env.getCheckpointConfig().setMinPauseBetweenCheckpoints(5000L);
        env.getCheckpointConfig().setCheckpointTimeout(60000L);
        env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);
        env.getCheckpointConfig().enableExternalizedCheckpoints(
            CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);

        // Storage FS pour HA
        env.getCheckpointConfig().setCheckpointStorage(
            new FileSystemCheckpointStorage("file:///checkpoints"));

        // Restart strategy
        env.setRestartStrategy(org.apache.flink.api.common.restartstrategy.RestartStrategies.fixedDelayRestart(3, 10000L));
    }

    // Appel dans main: CheckpointConfig.configure(env);
}

Configure RocksDB pour state persistant, checkpoints exactly-once toutes 60s avec storage FS. ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION garde checkpoints pour reprise manuelle. Intégrez dans main() avant job. Évitez FS non-HA en prod (utilisez S3/HDFS).

Exemple SQL : Table API

Flink SQL simplifie pour analystes. Définissez tables sur Kafka/files, queryz avec fenêtres.

Pipeline SQL avec Kafka source

flink-sql-job.sql
-- Soumettez via Flink SQL Client ou TableEnvironment
CREATE TABLE clicks (
  user_id STRING,
  url STRING,
  ts TIMESTAMP(3),
  WATERMARK FOR ts AS ts - INTERVAL '5' SECOND
) WITH (
  'connector' = 'kafka',
  'topic' = 'clicks',
  'properties.bootstrap.servers' = 'localhost:9092',
  'format' = 'json',
  'scan.startup.mode' = 'earliest-offset'
);

CREATE TABLE url_counts (
  url STRING,
  window_start TIMESTAMP(3),
  window_end TIMESTAMP(3),
  cnt BIGINT
) WITH (
  'connector' = 'print'
);

INSERT INTO url_counts
SELECT
  url,
  window_start,
  window_end,
  COUNT(*) as cnt
FROM TABLE(
  TUMBLE(TABLE clicks, DESCRIPTOR(ts), INTERVAL '10' SECONDS)
)
GROUP BY url, window_start, window_end;

Ce SQL lit Kafka clicks, tumble sur 10s par url, sink vers print (remplacez par Kafka/JDBC). Watermark gère late data. Exécutez dans Flink SQL Client. Piège : oubliez watermark, joins bloquent sur late events.

Déploiement Kubernetes YAML

flink-k8s.yaml
apiVersion: flink.apache.org/v1beta1
kind: FlinkDeployment
metadata:
  name: flink-tutorial
spec:
  image: flink:1.19
  flinkVersion: v1_19
  flinkConfiguration:
    taskmanager.numberOfTaskSlots: "2"
    state.backend: rocksdb
    state.checkpoints.dir: s3://my-bucket/checkpoints
    execution.checkpointing.mode: EXACTLY_ONCE
    execution.checkpointing.interval: 1min
  serviceAccount: flink
  jobManager: {}
  taskManager: {}
  job:
    jarURI: local:///opt/flink/usrlib/flink-tutorial-1.0.jar
    entryClass: com.example.StreamingWordCount
    parallelism: 4
    upgradeMode: stateless

Déployez sur K8s avec Flink Operator. JAR en usrlib, config checkpoints S3. Appliquez kubectl apply -f flink-k8s.yaml. Scale via parallelism. Évitez stateful upgrade sans savepoints.

Bonnes pratiques

  • Backpressure tuning : Monitorez via Flink UI, ajustez taskmanager.memory.network.fraction à 0.2.
  • State scaling : Utilisez RocksDB compaction filters pour >1TB state ; préférez Incremental checkpoints.
  • Sources/sinks : Kafka avec exactly-once (transactional.id) ; testez avec Chaos Engineering.
  • Monitoring : Intégrez Prometheus + Grafana ; alertez sur checkpoint age >2x interval.
  • Testing : Flink Test Harness pour unit tests stateful.

Erreurs courantes à éviter

  • Oubli watermarks : Cause accumulation mémoire sur late data ; toujours définir - INTERVAL.
  • State TTL non-set : Mémoire explose sur clés éphémères ; utilisez state.ttl.
  • Checkpoint timeout : Augmentez pour gros states ; testez sous load.
  • KeyBy sur non-serializable : Utilisez keySelector lambda avec Kryo.

Pour aller plus loin

Plongez dans Flink Forward conf, docs officielles Apache Flink. Maîtrisez PyFlink/CephRocksDB. Découvrez nos formations Learni sur Big Data & Streaming pour certification Flink.