How Does Flink Propagate Watermarks Downstream When Running in Parallel?

How are watermarks propagated downstream when Flink operators run in parallel?
When a task receives a watermark, it performs the following actions (a minimal sketch of these steps follows the list):
  • The task updates its internal event-time clock based on the watermark's timestamp.

  • The task's timer service identifies all expired timers, i.e. those whose timestamp is smaller than the current event time. For each expired timer, the task invokes a callback function that can perform a computation and emit its results.

  • The task emits a watermark carrying the updated event time.
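
The sketch below models these three steps in plain Java. It is not Flink's actual runtime code; the class and member names (SimplifiedTask, eventTimeClock, timerQueue) are made up for illustration.

import java.util.PriorityQueue;

// Simplified model of how a task reacts to an incoming watermark.
class SimplifiedTask {

    private long eventTimeClock = Long.MIN_VALUE;                         // internal event-time clock
    private final PriorityQueue<Long> timerQueue = new PriorityQueue<>(); // registered timer timestamps

    void onWatermark(long watermarkTimestamp) {
        // 1. advance the internal event-time clock
        eventTimeClock = Math.max(eventTimeClock, watermarkTimestamp);

        // 2. fire every timer whose timestamp is not later than the new event time
        while (!timerQueue.isEmpty() && timerQueue.peek() <= eventTimeClock) {
            onTimer(timerQueue.poll());                                   // callback may compute and emit results
        }

        // 3. forward a watermark with the updated event time
        emitWatermark(eventTimeClock);
    }

    void onTimer(long timestamp) { /* user-defined callback */ }

    void emitWatermark(long timestamp) { /* send to all output partitions */ }
}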

Let us now look in more detail at how a task emits watermarks and advances its event-time clock when it receives a new watermark. As discussed in the section on data parallelism and task parallelism, Flink splits a data stream into several partitions and processes each partition in parallel with a separate operator task. Each partition is a stream that carries timestamped records and watermarks. Depending on how an operator is connected to its predecessor and successor operators, its tasks may receive records and watermarks from one or more input partitions and emit records and watermarks to one or more output partitions. In the following, we describe how a task emits watermarks to multiple output tasks and how it advances its event-time clock based on the watermarks it receives.
A task maintains a partition watermark for each of its input partitions. When it receives a watermark from a partition, it compares the newly received value with the current partition watermark and updates that partition watermark to the larger of the two. The task then updates its event-time clock to the minimum of all partition watermarks. If the event-time clock advances, the task processes all timers that have been triggered and finally broadcasts the new event time to all downstream tasks by emitting a corresponding watermark to every connected output partition. A simplified sketch of this bookkeeping follows.
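
Again, this is a simplified model rather than Flink's internal implementation; MultiInputTask, partitionWatermarks, and advanceClockTo are invented names.

import java.util.Arrays;

// Simplified model of a task's per-partition watermark bookkeeping.
class MultiInputTask {

    private final long[] partitionWatermarks;      // one watermark per input partition
    private long eventTimeClock = Long.MIN_VALUE;  // the task's event-time clock

    MultiInputTask(int numInputPartitions) {
        partitionWatermarks = new long[numInputPartitions];
        Arrays.fill(partitionWatermarks, Long.MIN_VALUE);
    }

    void onWatermark(int inputPartition, long watermarkTimestamp) {
        // update the partition watermark to the maximum of the old and the new value
        partitionWatermarks[inputPartition] =
                Math.max(partitionWatermarks[inputPartition], watermarkTimestamp);

        // the event-time clock is the minimum over all partition watermarks
        long newClock = Arrays.stream(partitionWatermarks).min().getAsLong();

        // only if the clock actually advances: fire timers and broadcast a new watermark
        if (newClock > eventTimeClock) {
            eventTimeClock = newClock;
            advanceClockTo(eventTimeClock);        // fire timers, emit the watermark to all output partitions
        }
    }

    void advanceClockTo(long timestamp) { /* see the previous sketch */ }
}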
The figure below shows how a task with four input partitions and three output partitions receives watermarks, updates its partition watermarks and event-time clock, and emits watermarks downstream.
Operator tasks with two or more input streams, such as Union or CoFlatMap (see the section on multi-stream transformations), also use the minimum of all partition watermarks as their event-time clock. They do not distinguish between the partition watermarks of different input streams, so the records of both inputs are processed with respect to the same event-time clock. As one might expect, this can cause problems if the event times of the application's input streams are not aligned. The small example below illustrates the behavior.
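
The following self-contained sketch unions two bounded streams whose event times lie far apart; the windows on the unioned stream only fire once the watermark of the "slower" input has also passed the window end. The class name UnionWatermarkDemo and the example timestamps are assumptions made for this illustration, not code from the original article.

import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;

public class UnionWatermarkDemo {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        WatermarkStrategy<Tuple2<String, Long>> strategy = WatermarkStrategy
                .<Tuple2<String, Long>>forMonotonousTimestamps()
                .withTimestampAssigner((element, recordTimestamp) -> element.f1);

        // "fast" input: event times around 10-11 s
        DataStream<Tuple2<String, Long>> fast = env
                .fromElements(Tuple2.of("a", 10_000L), Tuple2.of("a", 11_000L))
                .assignTimestampsAndWatermarks(strategy);

        // "slow" input: event times around 2-3 s
        DataStream<Tuple2<String, Long>> slow = env
                .fromElements(Tuple2.of("a", 2_000L), Tuple2.of("a", 3_000L))
                .assignTimestampsAndWatermarks(strategy);

        // The unioned operator's event-time clock is the minimum of both inputs'
        // watermarks, so its windows close only when the slow input has caught up
        // (here: when the bounded sources finish and emit a final watermark).
        fast.union(slow)
                .keyBy(value -> value.f0)
                .window(TumblingEventTimeWindows.of(Time.seconds(5)))
                .sum(1)
                .print();

        env.execute("Union Watermark Demo");
    }
}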
Flink's watermark handling and propagation algorithm ensures that the timestamps and watermarks emitted by operator tasks are aligned. However, it relies on the condition that all partitions continuously provide increasing watermarks. As soon as one partition stops advancing its watermark, or becomes completely idle and no longer ships any records or watermarks, the task's event-time clock stalls and its timers can no longer fire. This is clearly a problem for time-based operators, which rely on an advancing clock to perform computations and to clean up state. If a task does not regularly receive new watermarks from all of its input tasks, both the processing latency and the state size of time-based operators can increase significantly.
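
A common mitigation is to exclude partitions that stop delivering data from the watermark calculation; Flink's built-in WatermarkStrategy supports this via withIdleness. A minimal sketch is shown below; the class name IdleAwareWatermarkStrategy and the 5-second and 30-second bounds are example values, not part of the original article.

import java.time.Duration;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.java.tuple.Tuple3;

public class IdleAwareWatermarkStrategy {
    // Builds a strategy that tolerates 5 s of out-of-orderness and marks input
    // partitions idle after 30 s without data, so they no longer hold back the
    // downstream event-time clock.
    public static WatermarkStrategy<Tuple3<String, Integer, Long>> build() {
        return WatermarkStrategy
                .<Tuple3<String, Integer, Long>>forBoundedOutOfOrderness(Duration.ofSeconds(5))
                .withTimestampAssigner((element, recordTimestamp) -> element.f2)
                .withIdleness(Duration.ofSeconds(30));
    }
}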
A similar situation arises for operators with two input streams whose watermarks diverge significantly. The event-time clock of a task with two input streams follows the watermark of the slower stream, while the records or intermediate results of the faster stream are typically buffered in state until the event-time clock reaches that stream's watermark and allows them to be processed.
Code example:
package org.bigdatatechcir.learn_flink.part5_flink_watermark;

import org.apache.flink.api.common.eventtime.TimestampAssigner;
import org.apache.flink.api.common.eventtime.TimestampAssignerSupplier;
import org.apache.flink.api.common.eventtime.Watermark;
import org.apache.flink.api.common.eventtime.WatermarkGenerator;
import org.apache.flink.api.common.eventtime.WatermarkGeneratorSupplier;
import org.apache.flink.api.common.eventtime.WatermarkOutput;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.RestOptions;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;

import java.time.Instant;
import java.time.ZoneId;
import java.time.ZonedDateTime;
import java.time.format.DateTimeFormatter;
import java.util.Random;

public class PunctuatedWatermarkDemo {

    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        conf.setString(RestOptions.BIND_PORT, "8081");
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(conf);
        env.setParallelism(4);
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

        DataStream<String> text = env.addSource(new RichParallelSourceFunction<String>() {
            private volatile boolean running = true;
            private volatile long count = 0; // counter for the number of generated records (not used further in this demo)
            private final Random random = new Random();

            @Override
            public void run(SourceContext<String> ctx) throws Exception {
                while (running) {
                    int randomNum = random.nextInt(5) + 1;
                    long timestamp = System.currentTimeMillis();
                    ctx.collectWithTimestamp("key" + randomNum + "," + 1 + "," + timestamp, timestamp);

                    ZonedDateTime generateDataDateTime = Instant.ofEpochMilli(timestamp).atZone(ZoneId.systemDefault());
                    DateTimeFormatter formatter = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss.SSS");
                    String formattedGenerateDataDateTime = generateDataDateTime.format(formatter);
                    System.out.println("Generated data: " + "key" + randomNum + "," + 1 + "," + timestamp + " at " + formattedGenerateDataDateTime);
                    Thread.sleep(1000);
                }
            }

            @Override
            public void cancel() {
                running = false;
            }
        });

        DataStream<Tuple3<String, Integer, Long>> tuplesWithTimestamp = text.map(new MapFunction<String, Tuple3<String, Integer, Long>>() {
            @Override
            public Tuple3<String, Integer, Long> map(String value) {
                String[] words = value.split(",");
                return new Tuple3<>(words[0], Integer.parseInt(words[1]), Long.parseLong(words[2]));
            }
        }).returns(Types.TUPLE(Types.STRING, Types.INT, Types.LONG));

        // Configure the punctuated watermark strategy
        DataStream<Tuple3<String, Integer, Long>> withWatermarks = tuplesWithTimestamp.assignTimestampsAndWatermarks(new WatermarkStrategy<Tuple3<String, Integer, Long>>() {
            @Override
            public WatermarkGenerator<Tuple3<String, Integer, Long>> createWatermarkGenerator(WatermarkGeneratorSupplier.Context context) {
                return new PunctuatedWatermarkGenerator();
            }

            @Override
            public TimestampAssigner<Tuple3<String, Integer, Long>> createTimestampAssigner(TimestampAssignerSupplier.Context context) {
                return new PunctuatedWatermarkGenerator();
            }
        });

        // Window logic
        DataStream<Tuple2<String, Integer>> keyedStream = withWatermarks
                .keyBy(value -> value.f0)
                .window(TumblingEventTimeWindows.of(Time.seconds(5)))
                .process(new ProcessWindowFunction<Tuple3<String, Integer, Long>, Tuple2<String, Integer>, String, TimeWindow>() {
                    @Override
                    public void process(String key, Context context, Iterable<Tuple3<String, Integer, Long>> elements, Collector<Tuple2<String, Integer>> out) throws Exception {
                        int count = 0;
                        for (Tuple3<String, Integer, Long> element : elements) {
                            count++;
                        }

                        long start = context.window().getStart();
                        long end = context.window().getEnd();

                        ZonedDateTime startDateTime = Instant.ofEpochMilli(start).atZone(ZoneId.systemDefault());
                        ZonedDateTime endDateTime = Instant.ofEpochMilli(end).atZone(ZoneId.systemDefault());

                        DateTimeFormatter formatter = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss.SSS");
                        String formattedStart = startDateTime.format(formatter);
                        String formattedEnd = endDateTime.format(formatter);

                        System.out.println("Tumbling Window [start " + formattedStart + ", end " + formattedEnd + ") for key " + key);

                        // Print the watermark at the moment the window fires
                        long windowEndWatermark = context.currentWatermark();
                        ZonedDateTime windowEndDateTime = Instant.ofEpochMilli(windowEndWatermark).atZone(ZoneId.systemDefault());
                        String formattedWindowEndWatermark = windowEndDateTime.format(formatter);
                        System.out.println("Watermark at the end of window: " + formattedWindowEndWatermark);

                        out.collect(new Tuple2<>(key, count));
                    }
                });

        // Print the results
        keyedStream.print();

        // Execute the job
        env.execute("Punctuated Watermark Demo");
    }

    private static class PunctuatedWatermarkGenerator implements WatermarkGenerator<Tuple3<String, Integer, Long>>, TimestampAssigner<Tuple3<String, Integer, Long>> {

        private long maxTimestamp = Long.MIN_VALUE;

        @Override
        public long extractTimestamp(Tuple3<String, Integer, Long> element, long recordTimestamp) {
            // When extracting the event time, first check whether the timestamp field is -1
            if (element.f2 != -1) {
                return element.f2;
            } else {
                // If it is missing, fall back to the previously assigned record timestamp
                return recordTimestamp > 0 ? recordTimestamp : 0;
            }
        }

        @Override
        public void onEvent(Tuple3<String, Integer, Long> event, long eventTimestamp, WatermarkOutput output) {
            maxTimestamp = Math.max(maxTimestamp, eventTimestamp);
            if (event.f0.equals("key2")) {
                System.out.println("Event: " + event.f0 + "," + event.f1 + "," + event.f2);
                ZonedDateTime watermarkDateTime = Instant.ofEpochMilli(maxTimestamp).atZone(ZoneId.systemDefault());
                DateTimeFormatter formatter = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss.SSS");
                String formattedWatermark = watermarkDateTime.format(formatter);
                System.out.println("Emitting Watermark: " + formattedWatermark);
                output.emitWatermark(new Watermark(event.f2));
            }
        }

        @Override
        public void onPeriodicEmit(WatermarkOutput output) {
            // nothing: watermarks are emitted only in onEvent (punctuated)
        }
    }
}

An open-source project that might be useful to you: the data-warehouse-learning project is a real-time and offline data warehouse (data lake) system built on MySQL + Kafka + Hadoop + Hive + Dolphinscheduler + Doris + Seatunnel + Paimon + Hudi + Iceberg + Flink + Dinky + DataRT + SuperSet. Using the familiar e-commerce domain as its example, it walks through and implements the full data pipeline: data generation, synchronization, data modeling, warehouse (lake) construction, data services, and BI reporting.

https://gitee.com/wzylzjtn/data-warehouse-learning

https://github.com/Mrkuhuo/data-warehouse-learning

https://bigdatacircle.top/
