It provides access to and queries of state;
if checkpointing is enabled, it periodically uploads state data to remote durable storage and returns metadata (meta) to the JobManager (JM below).
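The checkpoint upload described above is driven by a few settings: how often a checkpoint is triggered, the delivery guarantee, and which durable storage the data is uploaded to. A minimal sketch of the equivalent flink-conf.yaml entries (keys as of Flink 1.13+; the HDFS path is a placeholder):

```yaml
# trigger a checkpoint every second with exactly-once semantics
execution.checkpointing.interval: 1s
execution.checkpointing.mode: EXACTLY_ONCE
# durable storage that checkpoint data is uploaded to
state.checkpoints.dir: hdfs://namenode:8020/flink/checkpoints
```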
01
Flink's Three State Backends
Flink provides three State Backends out of the box:
| State Backend | Data Storage | Capacity Limits | Use Cases |
| --- | --- | --- | --- |
| MemoryStateBackend | State: TaskManager heap memory. Checkpoint: JobManager heap memory | A single state is capped by maxStateSize (default 5 MB); maxStateSize <= akka.framesize (default 10 MB); total checkpoint size must fit in JobManager memory | Local testing; jobs with little state; not recommended for production |
| FsStateBackend | State: TaskManager heap memory. Checkpoint: external file system (local or HDFS) | Total state on a single TaskManager must fit in TM memory; total checkpoint size must not exceed file-system capacity | Long windows, e.g. minute-level window aggregations or joins; jobs that need HA; usable in production |
| RocksDBStateBackend | All state is serialized into a local RocksDB instance (an embedded KV store). State: the KV database on the TaskManager (memory + disk). Checkpoint: external file system (local or HDFS) | Total state on a single TaskManager is limited by its memory + disk; a single key is capped at 2 GB; total size must not exceed the configured file-system capacity | Jobs with very large state; jobs that need HA; usable in production |
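Rather than hard-coding the backend in each job, the choice in the table above can also be made cluster-wide. A sketch of the relevant flink-conf.yaml entries (value names as of Flink 1.13+, where `hashmap` covers the heap-based backends and `rocksdb` the RocksDB one; the directory is a placeholder):

```yaml
# hashmap: state lives on the TaskManager heap (FsStateBackend-style)
# rocksdb: state is serialized into an embedded RocksDB instance
state.backend: hashmap
state.checkpoints.dir: hdfs://namenode:8020/flink/checkpoints
# rocksdb only: upload just the diff since the last completed checkpoint
# state.backend.incremental: true
```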
Code example:
package org.bigdatatechcir.learn_flink.part7_flink_statebackend;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.RestOptions;
import org.apache.flink.runtime.state.hashmap.HashMapStateBackend;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;
import java.time.Duration;
import java.time.Instant;
import java.time.ZoneId;
import java.time.ZonedDateTime;
import java.time.format.DateTimeFormatter;
import java.util.Random;
public class FsStateBackendDemo {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        conf.setString(RestOptions.BIND_PORT, "8081");
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(conf);
        // HashMapStateBackend plus a file-based checkpoint storage is the modern
        // equivalent of the legacy FsStateBackend
        env.setStateBackend(new HashMapStateBackend());
        env.getCheckpointConfig().setCheckpointStorage("file:///D:/flink-state");
        // enable checkpointing with the given interval (ms)
        env.enableCheckpointing(1000);
        // checkpointing mode: EXACTLY_ONCE or AT_LEAST_ONCE
        env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
        // minimum pause between two checkpoints
        env.getCheckpointConfig().setMinPauseBetweenCheckpoints(500);
        // checkpoint timeout
        env.getCheckpointConfig().setCheckpointTimeout(60000);
        // max number of concurrent checkpoints (e.g. the next one is triggered
        // before the previous one has finished)
        env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);
        // whether to retain checkpoint data on the remote storage after the job is cancelled
        env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
        // interval at which watermarks are generated automatically
        // env.getConfig().setAutoWatermarkInterval(100000);
        env.setParallelism(1);
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
        DataStream<String> text = env.addSource(new RichParallelSourceFunction<String>() {
            private volatile boolean running = true;
            private volatile long count = 0; // counter tracking how many records have been emitted
            private final Random random = new Random();

            @Override
            public void run(SourceContext<String> ctx) throws Exception {
                while (running) {
                    int randomNum = random.nextInt(5) + 1;
                    long timestamp = System.currentTimeMillis();
                    // if key2 was generated, emit it with a delay from a separate thread
                    if (randomNum == 2) {
                        new Thread(() -> {
                            try {
                                int delay = random.nextInt(10) + 1; // random delay of 1 to 10 seconds
                                Thread.sleep(delay * 1000L);
                                // emissions from another thread must hold the checkpoint lock
                                synchronized (ctx.getCheckpointLock()) {
                                    ctx.collectWithTimestamp("key" + randomNum + "," + 1 + "," + timestamp, timestamp);
                                }
                            } catch (InterruptedException e) {
                                Thread.currentThread().interrupt();
                            }
                        }).start();
                    } else {
                        synchronized (ctx.getCheckpointLock()) {
                            ctx.collectWithTimestamp("key" + randomNum + "," + 1 + "," + timestamp, timestamp);
                        }
                    }
                    if (++count % 200 == 0) {
                        ctx.emitWatermark(new Watermark(timestamp));
                        System.out.println("Manual Watermark emitted: " + timestamp);
                    }
                    ZonedDateTime generateDataDateTime = Instant.ofEpochMilli(timestamp).atZone(ZoneId.systemDefault());
                    DateTimeFormatter formatter = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss.SSS");
                    String formattedGenerateDataDateTime = generateDataDateTime.format(formatter);
                    System.out.println("Generated data: " + "key" + randomNum + "," + 1 + "," + timestamp + " at " + formattedGenerateDataDateTime);
                    Thread.sleep(1000); // wait one second per loop iteration
                }
            }

            @Override
            public void cancel() {
                running = false;
            }
        });
        DataStream<Tuple3<String, Integer, Long>> tuplesWithTimestamp = text.map(new MapFunction<String, Tuple3<String, Integer, Long>>() {
            @Override
            public Tuple3<String, Integer, Long> map(String value) {
                String[] words = value.split(",");
                return new Tuple3<>(words[0], Integer.parseInt(words[1]), Long.parseLong(words[2]));
            }
        }).returns(Types.TUPLE(Types.STRING, Types.INT, Types.LONG));
        // configure the watermark strategy
        DataStream<Tuple3<String, Integer, Long>> withWatermarks = tuplesWithTimestamp.assignTimestampsAndWatermarks(
                WatermarkStrategy.<Tuple3<String, Integer, Long>>forBoundedOutOfOrderness(Duration.ofSeconds(0))
                        .withTimestampAssigner((element, recordTimestamp) -> element.f2)
        );
        // window logic
        DataStream<Tuple2<String, Integer>> windowedStream = withWatermarks
                .keyBy(value -> value.f0)
                .window(TumblingEventTimeWindows.of(Time.seconds(5)))
                .process(new ProcessWindowFunction<Tuple3<String, Integer, Long>, Tuple2<String, Integer>, String, TimeWindow>() {
                    private ValueState<Integer> countState;

                    @Override
                    public void open(Configuration parameters) {
                        countState = getRuntimeContext().getState(
                                new ValueStateDescriptor<>("count", Types.INT)
                        );
                    }

                    @Override
                    public void process(String s, Context context, Iterable<Tuple3<String, Integer, Long>> elements, Collector<Tuple2<String, Integer>> out) throws Exception {
                        int count = 0;
                        for (Tuple3<String, Integer, Long> element : elements) {
                            count++;
                        }
                        // read the current state value before updating it
                        Integer previousCount = countState.value();
                        // update the state
                        countState.update(count);
                        // read the state value after the update
                        Integer updatedCount = countState.value();
                        // print the state value before and after the update
                        if (previousCount != null) {
                            System.out.println("Key: " + s + ", Previous Count: " + previousCount + ", Updated Count: " + updatedCount);
                        } else {
                            System.out.println("Key: " + s + ", Initial Count: " + updatedCount);
                        }
                        long start = context.window().getStart();
                        long end = context.window().getEnd();
                        ZonedDateTime startDateTime = Instant.ofEpochMilli(start).atZone(ZoneId.systemDefault());
                        ZonedDateTime endDateTime = Instant.ofEpochMilli(end).atZone(ZoneId.systemDefault());
                        DateTimeFormatter formatter = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss.SSS");
                        String formattedStart = startDateTime.format(formatter);
                        String formattedEnd = endDateTime.format(formatter);
                        System.out.println("Tumbling Window [start " + formattedStart + ", end " + formattedEnd + ") for key " + s);
                        // print the watermark observed at the end of the window
                        long windowEndWatermark = context.currentWatermark();
                        ZonedDateTime windowEndDateTime = Instant.ofEpochMilli(windowEndWatermark).atZone(ZoneId.systemDefault());
                        String formattedWindowEndWatermark = windowEndDateTime.format(formatter);
                        System.out.println("Watermark at the end of window: " + formattedWindowEndWatermark);
                        // read the state and emit the result
                        Integer currentCount = countState.value();
                        if (currentCount != null) {
                            out.collect(new Tuple2<>(s, currentCount));
                        }
                    }
                });
        // print the results
        windowedStream.print();
        // run the job
        env.execute("Periodic Watermark Demo");
    }
}
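To point the same job at the RocksDB backend from the table, only the backend setup changes. A sketch, not part of the original listing, assuming the `flink-statebackend-rocksdb` dependency is on the classpath:

```java
// true enables incremental checkpoints: only the diff since the last
// completed checkpoint is uploaded to durable storage
env.setStateBackend(new EmbeddedRocksDBStateBackend(true));
env.getCheckpointConfig().setCheckpointStorage("file:///D:/flink-state");
```

The rest of the pipeline is unchanged; state access through `ValueState` works the same way regardless of the backend.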
Here is an open-source project that may be useful to you: data-warehouse-learning is a real-time and offline data warehouse (data lake) system built on MySQL + Kafka + Hadoop + Hive + Dolphinscheduler + Doris + Seatunnel + Paimon + Hudi + Iceberg + Flink + Dinky + DataRT + SuperSet. Using the familiar e-commerce domain as its starting point, it explains and implements the full data pipeline: data generation, synchronization, data modeling, warehouse (lake) construction, data services, and BI reporting.
https://gitee.com/wzylzjtn/data-warehouse-learning
https://github.com/Mrkuhuo/data-warehouse-learning
https://bigdatacircle.top/