Introduction
Apache Flink is a leading open-source distributed framework for unified low-latency streaming and batch data processing. In 2026, with real-time data pouring in from IoT, finance, and logs, Flink stands out for its exactly-once state management, incremental checkpoints, and native SQL support. This advanced tutorial guides you step-by-step through implementing complex pipelines, from batch and streaming WordCount to stateful processing on the RocksDB backend. Why Flink? Unlike Spark Streaming's micro-batches, Flink natively handles unbounded streams with automatic backpressure and horizontal scaling, which makes it a strong fit for fraud detection and live e-commerce analytics. By the end, you'll deploy a production-ready, fault-tolerant job on Kubernetes.
Prerequisites
- Java 17+ (or Scala 2.12/3.0)
- Maven 3.9+ or Gradle
- Flink cluster (local, standalone, or Kubernetes; jobs are submitted with flink run)
- IDE such as IntelliJ with the Flink plugin
- Advanced knowledge of Java, streams, and distributed systems
- Docker for containerized tests
Maven Project Configuration
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <groupId>com.example</groupId>
    <artifactId>flink-tutorial</artifactId>
    <version>1.0</version>
    <properties>
        <maven.compiler.source>17</maven.compiler.source>
        <maven.compiler.target>17</maven.compiler.target>
        <flink.version>1.19.1</flink.version>
        <scala.binary.version>2.12</scala.binary.version>
    </properties>
    <dependencies>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-java</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-clients</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-statebackend-rocksdb</artifactId>
            <version>${flink.version}</version>
        </dependency>
    </dependencies>
    <build>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-shade-plugin</artifactId>
                <version>3.5.1</version>
                <executions>
                    <execution>
                        <phase>package</phase>
                        <goals><goal>shade</goal></goals>
                        <configuration>
                            <createDependencyReducedPom>false</createDependencyReducedPom>
                            <filters>
                                <filter>
                                    <artifact>*:*</artifact>
                                    <excludes>
                                        <exclude>META-INF/*.SF</exclude>
                                        <exclude>META-INF/*.DSA</exclude>
                                        <exclude>META-INF/*.RSA</exclude>
                                    </excludes>
                                </filter>
                            </filters>
                            <transformers>
                                <transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
                                    <!-- Default entry point; override per job with flink run -c -->
                                    <mainClass>com.example.StreamingWordCount</mainClass>
                                </transformer>
                                <transformer implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer"/>
                            </transformers>
                        </configuration>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>
</project>

This pom.xml sets up a Flink project with the Java streaming dependencies, the client libraries for job submission, and RocksDB as a state backend. The Shade plugin builds an executable fat JAR. Run mvn clean package to generate flink-tutorial-1.0.jar. Pin Flink to 1.19.1 everywhere to avoid version conflicts.
First Batch Job: WordCount
Start with a classic batch job to validate your setup. Flink unifies batch and streaming via the DataStream API. Imagine processing a text file: split, map, keyBy, sum. This pattern scales to petabytes.
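Before wiring anything into Flink, the split → map → group-by-key → sum shape can be tried with plain Java streams. This is a standalone sketch (the class name and sample text are illustrative, and it uses java.util.stream, not the Flink API):

```java
import java.util.Arrays;
import java.util.Map;
import java.util.stream.Collectors;

public class LocalWordCount {
    // Same shape as the Flink job: tokenize, lowercase, group by word, count.
    public static Map<String, Long> count(String text) {
        return Arrays.stream(text.toLowerCase().split("\\W+"))
                .filter(token -> !token.isEmpty())
                .collect(Collectors.groupingBy(w -> w, Collectors.counting()));
    }

    public static void main(String[] args) {
        // Expect {not=1, be=2, or=1, to=2} (map order may vary)
        System.out.println(count("To be, or not to be"));
    }
}
```

The Flink version distributes exactly this computation: flatMap plays the role of split/filter, keyBy the role of groupingBy, and sum the role of counting.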
Batch WordCount Implementation
package com.example;
import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;

public class BatchWordCount {
    public static void main(String[] args) throws Exception {
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // The legacy DataSet API is deprecated; run the DataStream API in BATCH mode instead.
        env.setRuntimeMode(RuntimeExecutionMode.BATCH);
        DataStream<String> text = env.readTextFile("input.txt"); // Replace with your file
        DataStream<Tuple2<String, Integer>> wordCounts = text
                .flatMap(new Tokenizer())
                .keyBy(value -> value.f0)
                .sum(1);
        wordCounts.print();
        env.execute("Batch WordCount");
    }

    public static final class Tokenizer implements FlatMapFunction<String, Tuple2<String, Integer>> {
        @Override
        public void flatMap(String value, Collector<Tuple2<String, Integer>> out) {
            String[] tokens = value.toLowerCase().split("\\W+");
            for (String token : tokens) {
                if (!token.isEmpty()) {
                    out.collect(new Tuple2<>(token, 1));
                }
            }
        }
    }
}

This job reads input.txt, tokenizes it into lowercase words, and counts occurrences per word. Run it locally or submit via flink run -c com.example.BatchWordCount target/flink-tutorial-1.0.jar. Pitfall: make sure input.txt exists; for quick tests use env.fromElements, and for production read from a proper file source or connector.
Transition to Streaming
Now move to streaming for unbounded data. Flink handles backpressure automatically and uses watermarks to deal with late data. The entry point is StreamExecutionEnvironment, this time running in its default streaming mode.
Streaming WordCount with Socket Source
package com.example;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.util.Collector;

public class StreamingWordCount {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        DataStream<String> text = env.socketTextStream("localhost", 9999);
        DataStream<Tuple2<String, Integer>> windowCounts = text
                .flatMap(new Tokenizer())
                .keyBy(value -> value.f0)
                // timeWindow() was removed in Flink 1.13; use an explicit window assigner.
                .window(TumblingProcessingTimeWindows.of(Time.seconds(5)))
                .sum(1);
        windowCounts.print();
        env.execute("Streaming WordCount");
    }

    public static final class Tokenizer implements FlatMapFunction<String, Tuple2<String, Integer>> {
        @Override
        public void flatMap(String value, Collector<Tuple2<String, Integer>> out) {
            String[] tokens = value.toLowerCase().split("\\W+");
            for (String token : tokens) {
                if (!token.isEmpty()) {
                    out.collect(new Tuple2<>(token, 1));
                }
            }
        }
    }
}

This job reads from a socket (start nc -lk 9999 to test) and aggregates counts over 5-second tumbling processing-time windows; keyBy partitions the stream by word. Submit with flink run. Analogy: a conveyor belt with per-bin counters. Note that socketTextStream is a non-parallel source; for parallel ingestion, use a connector such as Kafka.
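Tumbling windows simply bucket each event by its timestamp: the window containing a timestamp starts at the timestamp minus its remainder modulo the window size. A standalone sketch of that assignment (plain Java, simplified; Flink's real assigner also supports offsets):

```java
public class WindowMath {
    // Start of the tumbling window that contains a timestamp (all values in ms).
    public static long windowStart(long timestampMs, long sizeMs) {
        return timestampMs - (timestampMs % sizeMs);
    }

    public static void main(String[] args) {
        long size = 5_000L; // 5-second tumbling windows, as in the job above
        for (long ts : new long[]{1_200L, 4_999L, 5_000L, 12_300L}) {
            long start = windowStart(ts, size);
            System.out.println(ts + " ms -> window [" + start + ", " + (start + size) + ")");
        }
    }
}
```

So events at 1 200 ms and 4 999 ms land in window [0, 5000), while 5 000 ms opens the next one; all counts within a bucket are summed and emitted when the window closes.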
Advanced State Management
For stateful applications (sessions, top-N), use keyed state: ValueState for single values per key, MapState for richer per-key structures. Use the RocksDB backend when state must persist to disk.
Stateful Job: Per-Key Counter
package com.example;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;

public class StatefulCounter {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        env.enableCheckpointing(10000);
        env.socketTextStream("localhost", 9999)
                .flatMap(new Tokenizer())
                .keyBy(value -> value.f0) // keyBy returns a KeyedStream, required for keyed state
                .process(new CountFunction())
                .print();
        env.execute("Stateful Counter");
    }

    public static class CountFunction extends KeyedProcessFunction<String, Tuple2<String, Integer>, Tuple2<String, Long>> {
        private transient ValueState<Long> countState;

        @Override
        public void open(Configuration parameters) {
            ValueStateDescriptor<Long> descriptor = new ValueStateDescriptor<>("count", Long.class);
            countState = getRuntimeContext().getState(descriptor);
        }

        @Override
        public void processElement(Tuple2<String, Integer> value, Context ctx, Collector<Tuple2<String, Long>> out) throws Exception {
            Long currentCount = countState.value();
            if (currentCount == null) {
                currentCount = 0L;
            }
            currentCount += value.f1;
            countState.update(currentCount);
            out.collect(new Tuple2<>(value.f0, currentCount));
        }
    }

    // Tokenizer as before
    public static final class Tokenizer implements FlatMapFunction<String, Tuple2<String, Integer>> {
        @Override
        public void flatMap(String value, Collector<Tuple2<String, Integer>> out) {
            String[] tokens = value.toLowerCase().split("\\W+");
            for (String token : tokens) {
                if (!token.isEmpty()) {
                    out.collect(new Tuple2<>(token, 1));
                }
            }
        }
    }
}

This KeyedProcessFunction maintains a ValueState per key (word), incremented on each event. Checkpointing every 10s enables exactly-once state recovery. open() initializes the state handle. Pitfall: mark state handles transient, or the function fails to serialize when shipped to the cluster.
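Conceptually, keyed ValueState behaves as if the runtime kept one map per state descriptor, indexed by the current key. A plain-Java model of the counter above (illustration only; Flink additionally handles partitioning, checkpointing, and recovery, and the class name is ours):

```java
import java.util.HashMap;
import java.util.Map;

public class KeyedCounterModel {
    // One slot per key, like ValueState<Long> scoped to the active key.
    private final Map<String, Long> state = new HashMap<>();

    // Mirrors processElement: read, default to 0, add, update, emit.
    public long processElement(String key, int increment) {
        long current = state.getOrDefault(key, 0L) + increment;
        state.put(key, current);
        return current;
    }

    public static void main(String[] args) {
        KeyedCounterModel counter = new KeyedCounterModel();
        System.out.println(counter.processElement("flink", 1)); // 1
        System.out.println(counter.processElement("flink", 1)); // 2
        System.out.println(counter.processElement("kafka", 1)); // 1
    }
}
```

The crucial difference in Flink is that countState.value() only ever sees the entry for the key of the element being processed, which is why keyBy must precede any access to keyed state.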
Checkpoint and State Backend Configuration
package com.example;
import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.contrib.streaming.state.EmbeddedRocksDBStateBackend;
import org.apache.flink.contrib.streaming.state.PredefinedOptions;
import org.apache.flink.runtime.state.storage.FileSystemCheckpointStorage;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.environment.CheckpointConfig.ExternalizedCheckpointCleanup;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

// Named CheckpointSetup to avoid clashing with Flink's own CheckpointConfig class.
public class CheckpointSetup {
    public static void configure(StreamExecutionEnvironment env) {
        // RocksDB backend for local/disk state
        EmbeddedRocksDBStateBackend backend = new EmbeddedRocksDBStateBackend();
        backend.setPredefinedOptions(PredefinedOptions.DEFAULT);
        env.setStateBackend(backend);
        // Asynchronous checkpoints, exactly-once
        env.enableCheckpointing(60000L);
        env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
        env.getCheckpointConfig().setMinPauseBetweenCheckpoints(5000L);
        env.getCheckpointConfig().setCheckpointTimeout(60000L);
        env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);
        env.getCheckpointConfig().setExternalizedCheckpointCleanup(
                ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
        // FS storage for HA
        env.getCheckpointConfig().setCheckpointStorage(
                new FileSystemCheckpointStorage("file:///checkpoints"));
        // Restart strategy
        env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, 10000L));
    }
    // Call in main: CheckpointSetup.configure(env);
}

This configures RocksDB for persistent state and exactly-once checkpoints every 60s with filesystem storage. ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION retains checkpoints for manual recovery. Call configure(env) in main() before building the job. Avoid a non-HA local filesystem in production; use S3 or HDFS.
SQL Example: Table API
Flink SQL simplifies things for analysts. Define tables over Kafka/files, query with windows.
SQL Pipeline with Kafka Source
-- Submit via the Flink SQL Client or a TableEnvironment
CREATE TABLE clicks (
user_id STRING,
url STRING,
ts TIMESTAMP(3),
WATERMARK FOR ts AS ts - INTERVAL '5' SECOND
) WITH (
'connector' = 'kafka',
'topic' = 'clicks',
'properties.bootstrap.servers' = 'localhost:9092',
'format' = 'json',
'scan.startup.mode' = 'earliest-offset'
);
CREATE TABLE url_counts (
url STRING,
window_start TIMESTAMP(3),
window_end TIMESTAMP(3),
cnt BIGINT
) WITH (
'connector' = 'print'
);
INSERT INTO url_counts
SELECT
url,
window_start,
window_end,
COUNT(*) as cnt
FROM TABLE(
TUMBLE(TABLE clicks, DESCRIPTOR(ts), INTERVAL '10' SECONDS)
)
GROUP BY url, window_start, window_end;

This SQL reads the Kafka clicks topic, aggregates over 10s tumbling windows per url, and sinks to print (replace with Kafka/JDBC in production). The watermark bounds how late data may arrive. Run it in the Flink SQL Client. Pitfall: without a watermark, event-time windows never fire and state grows unboundedly.
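The WATERMARK clause above means the watermark trails the highest observed timestamp by 5 seconds, and a window [start, end) fires once the watermark reaches its end. The arithmetic in plain Java (a simplified sketch with made-up timestamps, not the Flink watermark generator):

```java
public class WatermarkMath {
    // Bounded-out-of-orderness watermark: max timestamp seen minus the delay.
    public static long watermark(long maxTimestampMs, long delayMs) {
        return maxTimestampMs - delayMs;
    }

    // An event-time window [start, end) fires once the watermark reaches its end.
    public static boolean windowFires(long watermarkMs, long windowEndMs) {
        return watermarkMs >= windowEndMs;
    }

    public static void main(String[] args) {
        long delay = 5_000L;                 // INTERVAL '5' SECOND
        long wm = watermark(12_000L, delay); // highest ts seen so far: 12s
        System.out.println("watermark = " + wm + " ms");                   // 7000
        System.out.println("[0,10s) fires: " + windowFires(wm, 10_000L));  // false
        System.out.println("[0,10s) fires after ts 16s: "
                + windowFires(watermark(16_000L, delay), 10_000L));        // true
    }
}
```

This is why the pipeline tolerates events arriving up to 5 seconds out of order: the [0, 10s) window waits until an event with timestamp >= 15s pushes the watermark past 10s before emitting its count.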
Kubernetes YAML Deployment
apiVersion: flink.apache.org/v1beta1
kind: FlinkDeployment
metadata:
  name: flink-tutorial
spec:
  image: flink:1.19
  flinkVersion: v1_19
  flinkConfiguration:
    taskmanager.numberOfTaskSlots: "2"
    state.backend: rocksdb
    state.checkpoints.dir: s3://my-bucket/checkpoints
    execution.checkpointing.mode: EXACTLY_ONCE
    execution.checkpointing.interval: 1min
  serviceAccount: flink
  jobManager: {}
  taskManager: {}
  job:
    jarURI: local:///opt/flink/usrlib/flink-tutorial-1.0.jar
    entryClass: com.example.StreamingWordCount
    parallelism: 4
    upgradeMode: stateless

Deploy on Kubernetes with the Flink Operator: the JAR sits in usrlib and checkpoints go to S3. Apply with kubectl apply -f flink-k8s.yaml. Scale via parallelism. For stateful jobs, prefer upgradeMode: savepoint so upgrades restore from a savepoint instead of discarding state.
Best Practices
- Backpressure tuning: Monitor via the Flink UI; set taskmanager.memory.network.fraction to 0.2.
- State scaling: Use RocksDB compaction filters for >1TB state; prefer incremental checkpoints.
- Sources/sinks: Kafka with exactly-once (transactional.id); test with Chaos Engineering.
- Monitoring: Integrate Prometheus + Grafana; alert on checkpoint age >2x interval.
- Testing: Flink Test Harness for stateful unit tests.
Common Errors to Avoid
- Missing watermarks: Causes state buildup and windows that never fire; always define WATERMARK FOR ... AS ... - INTERVAL.
- No state TTL: State explodes on ephemeral keys; configure StateTtlConfig on your state descriptors.
- Checkpoint timeout: Increase it for large states; test under load.
- Keying on non-serializable types: Extract a serializable key with a KeySelector.
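State TTL from the list above can be pictured as a last-update timestamp stored next to each value, with stale entries dropped lazily on access. A plain-Java sketch of the idea (illustrative class of ours; in Flink, StateTtlConfig implements this inside the state backend):

```java
import java.util.HashMap;
import java.util.Map;

public class TtlStateModel {
    private record Entry(long value, long lastUpdateMs) {}

    private final Map<String, Entry> state = new HashMap<>();
    private final long ttlMs;

    public TtlStateModel(long ttlMs) { this.ttlMs = ttlMs; }

    // Returns null if the entry expired, mimicking TTL-on-read semantics.
    public Long get(String key, long nowMs) {
        Entry e = state.get(key);
        if (e == null || nowMs - e.lastUpdateMs() > ttlMs) {
            state.remove(key); // expired entries are cleaned up lazily
            return null;
        }
        return e.value();
    }

    public void put(String key, long value, long nowMs) {
        state.put(key, new Entry(value, nowMs));
    }

    public static void main(String[] args) {
        TtlStateModel ttl = new TtlStateModel(1_000L); // 1s TTL
        ttl.put("k", 42L, 0L);
        System.out.println(ttl.get("k", 500L));   // 42
        System.out.println(ttl.get("k", 2_000L)); // null (expired)
    }
}
```

Without such an expiry policy, every key ever seen (think one state entry per session or user ID) stays in the backend forever, which is exactly the memory blow-up the bullet warns about.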
Next Steps
Dive into Flink Forward conference talks and the official Apache Flink docs. Explore PyFlink and advanced RocksDB tuning. Check out our Learni Big Data & Streaming training for Flink certification.