No more mixing up state and checkpoint: starting with Flink 1.13, the State Backend configuration is much clearer

Digest   2024-09-05 00:00   Chongqing
In Flink, the State Backend serves two purposes:
  • providing access to and queries over state;

  • if checkpointing is enabled, periodically uploading snapshot data to remote durable storage and returning metadata (meta) to the JobManager (JM below).
Before Flink 1.13, these two functions were bundled together: state storage and checkpoint creation were lumped into a single concept, which made this area confusing and hard for beginners to understand.
Flink 1.13 splits them into two concepts:
State Backend, whose scope is narrowed to describe only state access and storage;
Checkpoint Storage, which describes checkpointing behavior, e.g. whether checkpoint data is sent back to the JM's memory or uploaded to remote storage. The corresponding configuration options were split accordingly.
You now specify not only a State Backend but also a Checkpoint Storage; each legacy backend maps onto a combination of the two.
The old interfaces are still kept for now, but the new ones are recommended: they match the clearer conceptual split, so migrating is worthwhile.
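The legacy-to-new mapping can be sketched in code as follows. This is a minimal sketch against the Flink 1.13+ API (it needs the Flink streaming dependencies on the classpath; the `hdfs:///flink/checkpoints` path is a placeholder):

```java
import org.apache.flink.contrib.streaming.state.EmbeddedRocksDBStateBackend;
import org.apache.flink.runtime.state.hashmap.HashMapStateBackend;
import org.apache.flink.runtime.state.storage.JobManagerCheckpointStorage;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class StateBackendMigration {
    public static void main(String[] args) {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Legacy MemoryStateBackend  ->  HashMapStateBackend + JobManagerCheckpointStorage
        env.setStateBackend(new HashMapStateBackend());
        env.getCheckpointConfig().setCheckpointStorage(new JobManagerCheckpointStorage());

        // Legacy FsStateBackend      ->  HashMapStateBackend + a file-system checkpoint directory
        env.setStateBackend(new HashMapStateBackend());
        env.getCheckpointConfig().setCheckpointStorage("hdfs:///flink/checkpoints");

        // Legacy RocksDBStateBackend ->  EmbeddedRocksDBStateBackend + a file-system checkpoint directory
        env.setStateBackend(new EmbeddedRocksDBStateBackend());
        env.getCheckpointConfig().setCheckpointStorage("hdfs:///flink/checkpoints");
    }
}
```

In each pair, the first call answers "where does live state sit" and the second answers "where do checkpoints go", which is exactly the conceptual split described above.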

Note that currently only RocksDB supports incremental checkpoints.
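Because incremental checkpoints are a RocksDB-only feature, they are enabled on the backend itself. A minimal sketch, assuming Flink 1.13+ with the RocksDB state backend dependency available (the checkpoint path is a placeholder):

```java
import org.apache.flink.contrib.streaming.state.EmbeddedRocksDBStateBackend;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class IncrementalCheckpointDemo {
    public static void main(String[] args) {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // true = incremental checkpoints: only the RocksDB files created since the
        // previous checkpoint are uploaded, instead of a full snapshot each time
        env.setStateBackend(new EmbeddedRocksDBStateBackend(true));
        env.getCheckpointConfig().setCheckpointStorage("hdfs:///flink/checkpoints");
        env.enableCheckpointing(10_000); // checkpoint every 10 s
    }
}
```

For jobs with large state, this keeps the per-checkpoint upload proportional to the changed data rather than to the total state size.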

01

Flink's three state backends

Flink provides three State Backends out of the box:

MemoryStateBackend

  • State: TaskManager memory

  • Checkpoint: stored in JobManager memory

  • Capacity limits: a single state defaults to maxStateSize = 5 MB, and maxStateSize must not exceed akka.framesize (default 10 MB); the total checkpoint size cannot exceed the JobManager's memory

  • Scenario: local testing of jobs with little state; not recommended for production

FsStateBackend

  • State: TaskManager memory

  • Checkpoint: external file system (local or HDFS)

  • Capacity limits: the total state on a single TaskManager cannot exceed the TM's memory; the total data size cannot exceed the file system's capacity

  • Scenario: long windows, e.g. minute-level window aggregations and joins; jobs that need HA; usable in production

RocksDBStateBackend

  • Serializes all state and stores it in a local RocksDB database (an embedded NoSQL database with key-value storage)

  • State: a KV database on the TaskManager (in practice memory + disk)

  • Checkpoint: external file system (local or HDFS)

  • Capacity limits: a TaskManager's total state cannot exceed its memory + disk; a single key is limited to 2 GB; the total size cannot exceed the configured file system's capacity

  • Scenario: jobs with very large state; jobs that need HA; usable in production

Code example:

package org.bigdatatechcir.learn_flink.part7_flink_statebackend;

import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.RestOptions;
import org.apache.flink.runtime.state.hashmap.HashMapStateBackend;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;

import java.time.Duration;
import java.time.Instant;
import java.time.ZoneId;
import java.time.ZonedDateTime;
import java.time.format.DateTimeFormatter;
import java.util.Random;

public class FsStateBackendDemo {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        conf.setString(RestOptions.BIND_PORT, "8081");
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(conf);

        env.setStateBackend(new HashMapStateBackend());
        env.getCheckpointConfig().setCheckpointStorage("file:///D:/flink-state");
        // Enable checkpointing; interval in ms
        env.enableCheckpointing(1000);
        // Mode: Exactly-Once or At-Least-Once
        env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
        // Minimum pause between two checkpoints
        env.getCheckpointConfig().setMinPauseBetweenCheckpoints(500);
        // Checkpoint timeout
        env.getCheckpointConfig().setCheckpointTimeout(60000);
        // Number of checkpoints allowed to run concurrently
        // (e.g. the next one is triggered before the previous one has finished)
        env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);
        // Whether to keep the checkpoint data on remote storage after the user cancels the job
        env.getCheckpointConfig().enableExternalizedCheckpoints(
                CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
        // Interval for automatic watermark generation
        // env.getConfig().setAutoWatermarkInterval(100000);
        env.setParallelism(1);
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

        DataStream<String> text = env.addSource(new RichParallelSourceFunction<String>() {
            private volatile boolean running = true;
            private volatile long count = 0; // counter tracking how many records have been generated
            private final Random random = new Random();

            @Override
            public void run(SourceContext<String> ctx) throws Exception {
                while (running) {
                    int randomNum = random.nextInt(5) + 1;
                    long timestamp = System.currentTimeMillis();

                    // If key2 was generated, emit it from a new thread after a delay
                    if (randomNum == 2) {
                        new Thread(() -> {
                            try {
                                int delay = random.nextInt(10) + 1; // random 1 to 10
                                Thread.sleep(delay * 1000); // add a delay of 1 to 10 seconds
                                ctx.collectWithTimestamp("key" + randomNum + "," + 1 + "," + timestamp, timestamp);
                            } catch (InterruptedException e) {
                                Thread.currentThread().interrupt();
                            }
                        }).start();
                    } else {
                        ctx.collectWithTimestamp("key" + randomNum + "," + 1 + "," + timestamp, timestamp);
                    }

                    if (++count % 200 == 0) {
                        ctx.emitWatermark(new Watermark(timestamp));
                        System.out.println("Manual Watermark emitted: " + timestamp);
                    }

                    ZonedDateTime generateDataDateTime = Instant.ofEpochMilli(timestamp).atZone(ZoneId.systemDefault());
                    DateTimeFormatter formatter = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss.SSS");
                    String formattedGenerateDataDateTime = generateDataDateTime.format(formatter);
                    System.out.println("Generated data: " + "key" + randomNum + "," + 1 + "," + timestamp
                            + " at " + formattedGenerateDataDateTime);

                    Thread.sleep(1000); // wait 1 second per loop iteration
                }
            }

            @Override
            public void cancel() {
                running = false;
            }
        });

        DataStream<Tuple3<String, Integer, Long>> tuplesWithTimestamp = text
                .map(new MapFunction<String, Tuple3<String, Integer, Long>>() {
                    @Override
                    public Tuple3<String, Integer, Long> map(String value) {
                        String[] words = value.split(",");
                        return new Tuple3<>(words[0], Integer.parseInt(words[1]), Long.parseLong(words[2]));
                    }
                })
                .returns(Types.TUPLE(Types.STRING, Types.INT, Types.LONG));

        // Watermark strategy
        DataStream<Tuple3<String, Integer, Long>> withWatermarks = tuplesWithTimestamp.assignTimestampsAndWatermarks(
                WatermarkStrategy.<Tuple3<String, Integer, Long>>forBoundedOutOfOrderness(Duration.ofSeconds(0))
                        .withTimestampAssigner((element, recordTimestamp) -> element.f2)
        );

        // Window logic
        DataStream<Tuple2<String, Integer>> keyedStream = withWatermarks
                .keyBy(value -> value.f0)
                .window(TumblingEventTimeWindows.of(Time.seconds(5)))
                .process(new ProcessWindowFunction<Tuple3<String, Integer, Long>, Tuple2<String, Integer>, String, TimeWindow>() {
                    private ValueState<Integer> countState;

                    @Override
                    public void open(Configuration parameters) {
                        countState = getRuntimeContext().getState(
                                new ValueStateDescriptor<>("count", Types.INT)
                        );
                    }

                    @Override
                    public void process(String s, Context context,
                                        Iterable<Tuple3<String, Integer, Long>> elements,
                                        Collector<Tuple2<String, Integer>> out) throws Exception {
                        int count = 0;
                        for (Tuple3<String, Integer, Long> element : elements) {
                            count++;
                        }

                        // Read the current state value before updating it
                        Integer previousCount = countState.value();

                        // Update the state
                        countState.update(count);

                        // Read the state value after the update
                        Integer updatedCount = countState.value();

                        // Print the state values before and after the update
                        if (previousCount != null) {
                            System.out.println("Key: " + s + ", Previous Count: " + previousCount
                                    + ", Updated Count: " + updatedCount);
                        } else {
                            System.out.println("Key: " + s + ", Initial Count: " + updatedCount);
                        }

                        long start = context.window().getStart();
                        long end = context.window().getEnd();

                        ZonedDateTime startDateTime = Instant.ofEpochMilli(start).atZone(ZoneId.systemDefault());
                        ZonedDateTime endDateTime = Instant.ofEpochMilli(end).atZone(ZoneId.systemDefault());

                        DateTimeFormatter formatter = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss.SSS");
                        String formattedStart = startDateTime.format(formatter);
                        String formattedEnd = endDateTime.format(formatter);

                        System.out.println("Tumbling Window [start " + formattedStart
                                + ", end " + formattedEnd + ") for key " + s);

                        // Print the watermark as of the end of the window
                        long windowEndWatermark = context.currentWatermark();
                        ZonedDateTime windowEndDateTime = Instant.ofEpochMilli(windowEndWatermark).atZone(ZoneId.systemDefault());
                        String formattedWindowEndWatermark = windowEndDateTime.format(formatter);
                        System.out.println("Watermark at the end of window: " + formattedWindowEndWatermark);

                        // Read the state and emit it
                        Integer currentCount = countState.value();
                        if (currentCount != null) {
                            out.collect(new Tuple2<>(s, currentCount));
                        }
                    }
                });

        // Print the results
        keyedStream.print();

        // Run the job
        env.execute("Periodic Watermark Demo");
    }
}
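Besides setting the backend in code as above, the same split can be configured declaratively for a whole cluster. A hedged sketch of the relevant `flink-conf.yaml` keys (the checkpoint directory is a placeholder):

```yaml
# State Backend: where live state is held ('hashmap' on the heap, or 'rocksdb')
state.backend: hashmap

# Checkpoint Storage: where checkpoint data is written
state.checkpoints.dir: hdfs:///flink/checkpoints

# Incremental checkpoints; only meaningful with the rocksdb backend
# state.backend.incremental: true
```

Values set in code via `env.setStateBackend(...)` and `setCheckpointStorage(...)` take precedence over the cluster-wide configuration, so the file serves as a default.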

Here is an open-source project that may be useful to you: the data-warehouse-learning project is a real-time and offline data warehouse (data lake) system built on MySQL + Kafka + Hadoop + Hive + Dolphinscheduler + Doris + Seatunnel + Paimon + Hudi + Iceberg + Flink + Dinky + DataRT + SuperSet. Taking the familiar e-commerce business as its entry point, it explains and implements the full data pipeline: data generation, synchronization, data modeling, warehouse (lake) construction, data services, and BI reporting.

https://gitee.com/wzylzjtn/data-warehouse-learning

https://github.com/Mrkuhuo/data-warehouse-learning

https://bigdatacircle.top/
