
How to Implement Apache Flink for Streaming in 2026

Introduction

Apache Flink is a leading open-source distributed framework for unified low-latency stream and batch processing. In 2026, with real-time data pouring in from IoT, finance, and application logs, Flink stands out for its exactly-once state management, incremental checkpoints, and native SQL support. This advanced tutorial walks you step by step through building complex pipelines: from a batch/streaming WordCount to stateful processing on the RocksDB backend.

Why Flink? Unlike Spark Streaming's micro-batches, Flink processes unbounded streams natively, with automatic backpressure handling and horizontal scaling. That makes it a good fit for fraud detection or live e-commerce analytics. By the end, you'll deploy a production-ready, fault-tolerant job on Kubernetes.

Prerequisites

  • Java 17+ (or Scala 2.12/3.0)
  • Maven 3.9+ or Gradle
  • Flink cluster (local standalone via start-cluster.sh, or Kubernetes); jobs are submitted with flink run
  • IDE like IntelliJ with Flink plugin
  • Advanced knowledge of Java, streams, and distributed systems
  • Docker for containerized tests

Maven Project Configuration

pom.xml
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
  <modelVersion>4.0.0</modelVersion>
  <groupId>com.example</groupId>
  <artifactId>flink-tutorial</artifactId>
  <version>1.0</version>
  <properties>
    <maven.compiler.source>17</maven.compiler.source>
    <maven.compiler.target>17</maven.compiler.target>
    <flink.version>1.19.1</flink.version>
    <scala.binary.version>2.12</scala.binary.version>
  </properties>
  <dependencies>
    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-streaming-java</artifactId>
      <version>${flink.version}</version>
    </dependency>
    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-clients</artifactId>
      <version>${flink.version}</version>
    </dependency>
    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-statebackend-rocksdb</artifactId>
      <version>${flink.version}</version>
    </dependency>
  </dependencies>
  <build>
    <plugins>
      <plugin>
        <groupId>org.apache.maven.plugins</groupId>
        <artifactId>maven-shade-plugin</artifactId>
        <version>3.5.1</version>
        <executions>
          <execution>
            <phase>package</phase>
            <goals><goal>shade</goal></goals>
            <configuration>
              <createDependencyReducedPom>false</createDependencyReducedPom>
              <filters>
                <filter>
                  <artifact>*:*</artifact>
                  <excludes>
                    <exclude>META-INF/*.SF</exclude>
                    <exclude>META-INF/*.DSA</exclude>
                    <exclude>META-INF/*.RSA</exclude>
                  </excludes>
                </filter>
              </filters>
              <transformers>
                <transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
                  <mainClass>com.example.WordCount</mainClass>
                </transformer>
                <transformer implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer"/>
              </transformers>
            </configuration>
          </execution>
        </executions>
      </plugin>
    </plugins>
  </build>
</project>

This pom.xml sets up a Flink project with the Java streaming dependency, the client for job submission, and RocksDB as the state backend. The Shade plugin builds an executable fat JAR; run mvn clean package to produce flink-tutorial-1.0.jar. Pin Flink to 1.19.1 to avoid version conflicts, and consider marking the core Flink dependencies as provided when submitting to a cluster that already ships them.

First Batch Job: WordCount

Start with a classic batch job to validate your setup. This first example uses the legacy DataSet API for simplicity; recent Flink versions unify batch and streaming under the DataStream API (with RuntimeExecutionMode.BATCH). Picture processing a text file: split, map, group by key, sum. The same pattern scales to petabytes.

Batch WordCount Implementation

BatchWordCount.java
package com.example;

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.util.Collector;

public class BatchWordCount {

    public static void main(String[] args) throws Exception {
        final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        DataSet<String> text = env.readTextFile("input.txt"); // Replace with your file

        DataSet<Tuple2<String, Integer>> wordCounts = text
            .flatMap(new Tokenizer())
            .groupBy(0)
            .sum(1);

        // In the DataSet API, print() triggers execution eagerly;
        // calling env.execute() afterwards would fail with
        // "No new data sinks have been defined".
        wordCounts.print();
    }

    public static final class Tokenizer implements FlatMapFunction<String, Tuple2<String, Integer>> {
        @Override
        public void flatMap(String value, Collector<Tuple2<String, Integer>> out) {
            String[] tokens = value.toLowerCase().split("\\W+");
            for (String token : tokens) {
                if (token.length() > 0) {
                    out.collect(new Tuple2<>(token, 1));
                }
            }
        }
    }
}

This job reads input.txt, tokenizes into lowercase words, and counts per key with groupBy(0).sum(1). Run locally or submit via flink run -c com.example.BatchWordCount target/flink-tutorial-1.0.jar. Pitfall: ensure input.txt exists; for production, use fromElements or external sources.
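
The tokenization step can be sanity-checked without a Flink runtime. This minimal sketch (class name TokenizerCheck is ours, not part of the tutorial project) reproduces the Tokenizer's split logic in plain Java:

```java
import java.util.ArrayList;
import java.util.List;

// Flink-free reproduction of the Tokenizer logic: lowercase the line,
// split on runs of non-word characters, drop empty tokens.
public class TokenizerCheck {
    static List<String> tokenize(String line) {
        List<String> tokens = new ArrayList<>();
        for (String t : line.toLowerCase().split("\\W+")) {
            if (!t.isEmpty()) tokens.add(t);
        }
        return tokens;
    }

    public static void main(String[] args) {
        // "Hello, Flink! Hello again." -> [hello, flink, hello, again]
        System.out.println(tokenize("Hello, Flink! Hello again."));
    }
}
```

Running this confirms punctuation is stripped and words are counted case-insensitively, exactly what groupBy(0).sum(1) then aggregates per key.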

Transition to Streaming

Move to streaming for unbounded data. Flink handles backpressure and watermarks for late data. Use StreamExecutionEnvironment instead of batch.

Streaming WordCount with Socket Source

StreamingWordCount.java
package com.example;

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.util.Collector;

public class StreamingWordCount {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        DataStream<String> text = env.socketTextStream("localhost", 9999);

        DataStream<Tuple2<String, Integer>> windowCounts = text
            .flatMap(new Tokenizer())
            .keyBy(value -> value.f0)
            // timeWindow() was removed in Flink 1.13+; use an explicit window assigner
            .window(TumblingProcessingTimeWindows.of(Time.seconds(5)))
            .sum(1);

        windowCounts.print();

        env.execute("Streaming WordCount");
    }

    public static final class Tokenizer implements FlatMapFunction<String, Tuple2<String, Integer>> {
        @Override
        public void flatMap(String value, Collector<Tuple2<String, Integer>> out) {
            String[] tokens = value.toLowerCase().split("\\W+");
            for (String token : tokens) {
                if (token.length() > 0) {
                    out.collect(new Tuple2<>(token, 1));
                }
            }
        }
    }
}

This job reads from a socket (start nc -lk 9999 to test) and aggregates over tumbling 5-second processing-time windows; keyBy partitions the stream by word. Submit with flink run. Analogy: a conveyor belt with a counter per bin. Avoid parallelism >1 with a socket source; use Kafka when you need parallel ingestion.
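
Tumbling windows assign each event by truncating its timestamp to the window size. This quick Flink-free sketch mirrors, under our assumptions, what TumblingProcessingTimeWindows does with a zero offset and non-negative timestamps:

```java
public class WindowAssignment {
    // Start of the tumbling window containing a timestamp, zero offset:
    // windowStart = ts - (ts mod size). Mirrors Flink's TimeWindow math
    // for non-negative timestamps.
    static long windowStart(long timestampMillis, long windowSizeMillis) {
        return timestampMillis - (timestampMillis % windowSizeMillis);
    }

    public static void main(String[] args) {
        long size = 5_000L; // 5-second windows, as in the job above
        System.out.println(windowStart(12_345L, size)); // 10000
        System.out.println(windowStart(15_000L, size)); // 15000: starts are inclusive
    }
}
```

An event at t=12.345s therefore lands in the [10s, 15s) window, and its count is emitted when that window closes.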

Advanced State Management

For stateful applications (sessions, top-N), use keyed state: ValueState for a single value per key, MapState for richer per-key structures. The RocksDB backend persists state to disk.

Stateful Job: Per-Key Counter

StatefulCounter.java
package com.example;

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;

public class StatefulCounter {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        env.enableCheckpointing(10000);

        // keyBy returns a KeyedStream, which is required by KeyedProcessFunction
        KeyedStream<Tuple2<String, Integer>, String> input = env.socketTextStream("localhost", 9999)
            .flatMap(new Tokenizer())
            .keyBy(value -> value.f0);

        input.process(new CountFunction())
            .print();

        env.execute("Stateful Counter");
    }

    public static class CountFunction extends KeyedProcessFunction<String, Tuple2<String, Integer>, Tuple2<String, Long>> {
        private transient ValueState<Long> countState;

        @Override
        public void open(Configuration parameters) {
            ValueStateDescriptor<Long> descriptor = new ValueStateDescriptor<>("count", Long.class);
            countState = getRuntimeContext().getState(descriptor);
        }

        @Override
        public void processElement(Tuple2<String, Integer> value, Context ctx, Collector<Tuple2<String, Long>> out) throws Exception {
            Long currentCount = countState.value(); // null on first access for this key
            if (currentCount == null) {
                currentCount = 0L;
            }
            currentCount += value.f1;
            countState.update(currentCount);
            out.collect(new Tuple2<>(value.f0, currentCount));
        }
    }

    // Tokenizer as before
    public static final class Tokenizer implements FlatMapFunction<String, Tuple2<String, Integer>> {
        @Override
        public void flatMap(String value, Collector<Tuple2<String, Integer>> out) {
            String[] tokens = value.toLowerCase().split("\\W+");
            for (String token : tokens) {
                if (token.length() > 0) {
                    out.collect(new Tuple2<>(token, 1));
                }
            }
        }
    }
}

This KeyedProcessFunction maintains one ValueState per key (word), incremented on each event. Checkpoints every 10s enable exactly-once recovery. open() initializes the state handle. Pitfall: declare state fields transient; they are created in open() at runtime, not shipped with the serialized function.
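
The per-key state semantics are easy to model outside Flink: each key carries an independent counter, exactly like one ValueState<Long> per key. A minimal sketch (the class name and the HashMap standing in for the keyed state backend are our assumptions):

```java
import java.util.HashMap;
import java.util.Map;

// Models KeyedProcessFunction state semantics: one independent counter
// per key, initialized lazily on first access (state.value() returns
// null before the first update, hence getOrDefault).
public class KeyedCounterModel {
    private final Map<String, Long> statePerKey = new HashMap<>();

    long processElement(String key, int increment) {
        long updated = statePerKey.getOrDefault(key, 0L) + increment;
        statePerKey.put(key, updated); // stands in for countState.update(updated)
        return updated;
    }

    public static void main(String[] args) {
        KeyedCounterModel model = new KeyedCounterModel();
        System.out.println(model.processElement("flink", 1)); // 1
        System.out.println(model.processElement("flink", 1)); // 2
        System.out.println(model.processElement("spark", 1)); // 1: keys are isolated
    }
}
```

In the real job, Flink shards this map across parallel instances by key hash and snapshots it in checkpoints; the per-key isolation shown here is what makes that sharding safe.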

Checkpoint and State Backend Configuration

CheckpointSetup.java
package com.example;

import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.contrib.streaming.state.EmbeddedRocksDBStateBackend;
import org.apache.flink.contrib.streaming.state.PredefinedOptions;
import org.apache.flink.runtime.state.storage.FileSystemCheckpointStorage;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.environment.CheckpointConfig.ExternalizedCheckpointCleanup;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

// Named CheckpointSetup to avoid clashing with Flink's own CheckpointConfig class
public class CheckpointSetup {

    public static void configure(StreamExecutionEnvironment env) {
        // RocksDB backend for local/disk state, with incremental checkpoints
        EmbeddedRocksDBStateBackend backend = new EmbeddedRocksDBStateBackend(true);
        backend.setPredefinedOptions(PredefinedOptions.DEFAULT);
        env.setStateBackend(backend);

        // Exactly-once checkpoints every 60s
        env.enableCheckpointing(60000L, CheckpointingMode.EXACTLY_ONCE);
        env.getCheckpointConfig().setMinPauseBetweenCheckpoints(5000L);
        env.getCheckpointConfig().setCheckpointTimeout(60000L);
        env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);
        env.getCheckpointConfig().setExternalizedCheckpointCleanup(
            ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);

        // Filesystem storage for checkpoints (use S3/HDFS in production)
        env.getCheckpointConfig().setCheckpointStorage(
            new FileSystemCheckpointStorage("file:///checkpoints"));

        // Restart strategy: 3 attempts, 10s apart
        env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, 10000L));
    }

    // Call in main: CheckpointSetup.configure(env);
}

This configures RocksDB for persistent state and exactly-once checkpoints every 60s with filesystem storage. RETAIN_ON_CANCELLATION keeps checkpoints after job cancellation for manual recovery. Call configure(env) in main() before building the job. Avoid a local filesystem path in production; use S3 or HDFS.

SQL Example: Table API

Flink SQL simplifies things for analysts. Define tables over Kafka/files, query with windows.

SQL Pipeline with Kafka Source

flink-sql-job.sql
-- Submit via the Flink SQL Client or a TableEnvironment
CREATE TABLE clicks (
  user_id STRING,
  url STRING,
  ts TIMESTAMP(3),
  WATERMARK FOR ts AS ts - INTERVAL '5' SECOND
) WITH (
  'connector' = 'kafka',
  'topic' = 'clicks',
  'properties.bootstrap.servers' = 'localhost:9092',
  'format' = 'json',
  'scan.startup.mode' = 'earliest-offset'
);

CREATE TABLE url_counts (
  url STRING,
  window_start TIMESTAMP(3),
  window_end TIMESTAMP(3),
  cnt BIGINT
) WITH (
  'connector' = 'print'
);

INSERT INTO url_counts
SELECT
  url,
  window_start,
  window_end,
  COUNT(*) as cnt
FROM TABLE(
  TUMBLE(TABLE clicks, DESCRIPTOR(ts), INTERVAL '10' SECONDS)
)
GROUP BY url, window_start, window_end;

This SQL reads the Kafka clicks topic, counts clicks per url over 10-second tumbling windows, and sinks to print (replace with Kafka/JDBC in production). The watermark bounds how long Flink waits for late data. Run it in the Flink SQL Client. Pitfall: without a watermark, event-time windows never fire and state grows unbounded.
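
The watermark rule in the DDL above (ts - INTERVAL '5' SECOND) is bounded out-of-orderness: the watermark trails the maximum observed event time by 5 seconds, and a window fires once the watermark passes its end. A Flink-free sketch of that bookkeeping (class name is ours):

```java
public class BoundedOutOfOrderness {
    private long maxTimestamp = Long.MIN_VALUE;
    private final long maxOutOfOrdernessMillis;

    BoundedOutOfOrderness(long maxOutOfOrdernessMillis) {
        this.maxOutOfOrdernessMillis = maxOutOfOrdernessMillis;
    }

    // Advance on each event; watermark = max event time seen - bound.
    long onEvent(long eventTimestampMillis) {
        maxTimestamp = Math.max(maxTimestamp, eventTimestampMillis);
        return maxTimestamp - maxOutOfOrdernessMillis;
    }

    public static void main(String[] args) {
        BoundedOutOfOrderness wm = new BoundedOutOfOrderness(5_000L);
        System.out.println(wm.onEvent(20_000L)); // 15000
        System.out.println(wm.onEvent(12_000L)); // still 15000: a late event never moves the watermark back
        // A [0s, 10s) window fires here, since the watermark (15000) has passed its end.
    }
}
```

This is why the out-of-orderness bound is a latency/completeness trade-off: a larger interval tolerates later events but delays every window result by that much.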

Kubernetes YAML Deployment

flink-k8s.yaml
apiVersion: flink.apache.org/v1beta1
kind: FlinkDeployment
metadata:
  name: flink-tutorial
spec:
  image: flink:1.19
  flinkVersion: v1_19
  flinkConfiguration:
    taskmanager.numberOfTaskSlots: "2"
    state.backend: rocksdb
    state.checkpoints.dir: s3://my-bucket/checkpoints
    execution.checkpointing.mode: EXACTLY_ONCE
    execution.checkpointing.interval: 1min
  serviceAccount: flink
  jobManager: {}
  taskManager: {}
  job:
    jarURI: local:///opt/flink/usrlib/flink-tutorial-1.0.jar
    entryClass: com.example.StreamingWordCount
    parallelism: 4
    upgradeMode: stateless

Deploy on Kubernetes with the Flink Kubernetes Operator. The JAR ships in usrlib, and checkpoints go to S3. Apply with kubectl apply -f flink-k8s.yaml and scale via parallelism. For stateful jobs, prefer an upgradeMode that takes savepoints; stateless upgrades discard state.

Best Practices

  • Backpressure tuning: Monitor via Flink UI, set taskmanager.memory.network.fraction to 0.2.
  • State scaling: Use RocksDB compaction filters for >1TB state; prefer incremental checkpoints.
  • Sources/sinks: Kafka with exactly-once (transactional.id); test with Chaos Engineering.
  • Monitoring: Integrate Prometheus + Grafana; alert on checkpoint age >2x interval.
  • Testing: Flink Test Harness for stateful unit tests.
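
The checkpoint-age alert in the monitoring bullet is just a threshold rule on two metrics. A tiny sketch of the predicate (the 2x factor follows the rule of thumb above; tune it per job):

```java
public class CheckpointAlert {
    // Alert when the last successful checkpoint is older than
    // twice the configured checkpoint interval.
    static boolean shouldAlert(long ageMillis, long intervalMillis) {
        return ageMillis > 2 * intervalMillis;
    }

    public static void main(String[] args) {
        long interval = 60_000L; // 60s, as configured earlier
        System.out.println(shouldAlert(90_000L, interval));  // false: one missed cycle is tolerated
        System.out.println(shouldAlert(130_000L, interval)); // true: two intervals elapsed without a checkpoint
    }
}
```

In practice you would evaluate this rule in Prometheus over Flink's checkpoint metrics rather than in Java; the sketch only pins down the threshold logic.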

Common Errors to Avoid

  • Missing watermarks: Event-time windows never fire and state builds up; always declare WATERMARK FOR ... AS ... - INTERVAL with a realistic bound.
  • No state TTL: State grows without limit on ephemeral keys; configure StateTtlConfig on your state descriptors.
  • Checkpoint timeout: Increase setCheckpointTimeout for large states; test under load.
  • Bad key types: keyBy requires keys with stable hashCode/equals and a supported serializer; avoid keying on mutable or non-deterministic objects.

Next Steps

Dive into Flink Forward conference talks and the official Apache Flink docs. Explore PyFlink and advanced RocksDB tuning. Check out our Learni Big Data & Streaming training for Flink certification.