How Does Flink Guarantee No Data Loss? The Ultimate Move: Side Output (SideOutput) Explained

Side output is called SideOutput in Flink, and its purpose is similar to the legacy DataStream#split: it is essentially a stream-splitting operation that divides a DataStream into several sub-streams according to conditions. These sub-streams are called side output streams, and each side output stream can have its own downstream processing logic.
Side output:

[read data] ----> [processing logic 1] ----> [sink] ----> [HDFS]
                                      \----> [sink] ----> [MySQL]
                                       \---> [sink] ----> [ClickHouse]
What side output is used for
  • 1) It splits a data stream without copying the data: a partitioning mechanism, not a broadcast.
  • 2) It gives late data somewhere to go, so late records do not have to be dropped.
  • 3) It solves the problem that the legacy Split operator could not split a stream consecutively.
Note that once an OutputTag is defined, side output can only be used inside specific functions, namely:
  • 1) ProcessFunction;
  • 2) KeyedProcessFunction;
  • 3) CoProcessFunction;
  • 4) ProcessWindowFunction;
  • 5) ProcessAllWindowFunction;
  • 6) ProcessJoinFunction;
  • 7) KeyedCoProcessFunction.
Only inside the functions above can you emit data to the side output defined by an OutputTag, through the Context object.
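For instance, given a DataStream<String> named input and the standard org.apache.flink imports, a minimal sketch looks like this (the tag name "rejected" and the length-based condition are made up for illustration):

OutputTag<String> rejectedTag = new OutputTag<String>("rejected"){};

SingleOutputStreamOperator<String> mainStream = input.process(new ProcessFunction<String, String>() {
    @Override
    public void processElement(String value, Context ctx, Collector<String> out) {
        if (value.length() > 5) {
            out.collect(value);             // main output
        } else {
            ctx.output(rejectedTag, value); // side output, emitted via the Context
        }
    }
});

DataStream<String> rejected = mainStream.getSideOutput(rejectedTag);

Since rejected is an ordinary DataStream, it can be mapped, sunk, or split again with another OutputTag downstream, which is exactly the consecutive splitting that the legacy Split operator could not do.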
The overall mechanism can be summarized as:
  • A window exists to collect data periodically, grouping the unbounded stream into finite buckets.
  • A watermark is insurance against out-of-order data (which is common): at a given event time not all of the expected data may have arrived yet, so the watermark delays window evaluation.
  • allowedLateness keeps a window's state around for an extra period after the window would otherwise close.
  • sideOutput is the last line of defense: once a window has been completely closed, any record still arriving for it is routed to the side output stream (see the sketch below).
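Chained together, the four layers look like this. This is only a sketch: Event is a hypothetical POJO with fields key and ts, and the 5-second window, 5-second out-of-orderness bound, and 2-second lateness are illustrative values, not the settings of the demo below.

OutputTag<Event> lateTag = new OutputTag<Event>("too-late"){};

SingleOutputStreamOperator<Tuple2<String, Long>> counted = events
        // 2) watermark: tolerate up to 5 seconds of out-of-order events
        .assignTimestampsAndWatermarks(
                WatermarkStrategy.<Event>forBoundedOutOfOrderness(Duration.ofSeconds(5))
                        .withTimestampAssigner((e, ts) -> e.ts))
        .keyBy(e -> e.key)
        // 1) window: group the stream into 5-second event-time buckets
        .window(TumblingEventTimeWindows.of(Time.seconds(5)))
        // 3) allowedLateness: keep window state alive for 2 extra seconds
        .allowedLateness(Time.seconds(2))
        // 4) side output: the final safety net for anything later than that
        .sideOutputLateData(lateTag)
        .process(new ProcessWindowFunction<Event, Tuple2<String, Long>, String, TimeWindow>() {
            @Override
            public void process(String key, Context ctx, Iterable<Event> in, Collector<Tuple2<String, Long>> out) {
                long n = 0;
                for (Event e : in) { n++; }
                out.collect(Tuple2.of(key, n));
            }
        });

// records that missed even the allowedLateness grace period land here
DataStream<Event> tooLate = counted.getSideOutput(lateTag);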
Code example:
package org.bigdatatechcir.learn_flink.part5_flink_watermark;

import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.RestOptions;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;
import org.apache.flink.util.OutputTag;

import java.time.Duration;
import java.time.Instant;
import java.time.ZoneId;
import java.time.ZonedDateTime;
import java.time.format.DateTimeFormatter;
import java.util.Random;

public class SideOutputDemo {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        conf.setString(RestOptions.BIND_PORT, "8081");
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(conf);
        env.setParallelism(1);
        // Event time is the default since Flink 1.12, so the deprecated
        // env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime) call is no longer needed.

        DataStream<String> text = env.addSource(new RichParallelSourceFunction<String>() {
            private volatile boolean running = true;
            private volatile long count = 0; // counts how many records have been generated
            private final Random random = new Random();

            @Override
            public void run(SourceContext<String> ctx) throws Exception {
                while (running) {
                    int randomNum = random.nextInt(5) + 1;
                    long timestamp = System.currentTimeMillis();

                    // Records for key2 are emitted from a separate thread after a
                    // random 1-10 second delay, so they deliberately arrive late.
                    if (randomNum == 2) {
                        new Thread(() -> {
                            try {
                                int delay = random.nextInt(10) + 1; // random delay of 1 to 10 seconds
                                Thread.sleep(delay * 1000L);
                                // SourceContext is not thread-safe: guard emission from
                                // this extra thread with the checkpoint lock.
                                synchronized (ctx.getCheckpointLock()) {
                                    ctx.collectWithTimestamp("key" + randomNum + "," + 1 + "," + timestamp, timestamp);
                                }
                            } catch (InterruptedException e) {
                                Thread.currentThread().interrupt();
                            }
                        }).start();
                    } else {
                        ctx.collectWithTimestamp("key" + randomNum + "," + 1 + "," + timestamp, timestamp);
                    }

                    if (++count % 200 == 0) {
                        ctx.emitWatermark(new Watermark(timestamp));
                        //System.out.println("Manual Watermark emitted: " + timestamp);
                    }

                    ZonedDateTime generateDataDateTime = Instant.ofEpochMilli(timestamp).atZone(ZoneId.systemDefault());
                    DateTimeFormatter formatter = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss.SSS");
                    String formattedGenerateDataDateTime = generateDataDateTime.format(formatter);
                    //System.out.println("Generated data: " + "key" + randomNum + "," + 1 + "," + timestamp + " at " + formattedGenerateDataDateTime);

                    Thread.sleep(1000); // wait one second between records
                }
            }

            @Override
            public void cancel() {
                running = false;
            }
        });

        DataStream<Tuple3<String, Integer, Long>> tuplesWithTimestamp = text.map(new MapFunction<String, Tuple3<String, Integer, Long>>() {
            @Override
            public Tuple3<String, Integer, Long> map(String value) {
                String[] words = value.split(",");
                return new Tuple3<>(words[0], Integer.parseInt(words[1]), Long.parseLong(words[2]));
            }
        }).returns(Types.TUPLE(Types.STRING, Types.INT, Types.LONG));

        // Watermark strategy: tolerate up to 5 seconds of out-of-order events.
        // Note that this overrides the timestamps and watermarks emitted by the source.
        DataStream<Tuple3<String, Integer, Long>> withWatermarks = tuplesWithTimestamp.assignTimestampsAndWatermarks(
                WatermarkStrategy.<Tuple3<String, Integer, Long>>forBoundedOutOfOrderness(Duration.ofSeconds(5))
                        .withTimestampAssigner((element, recordTimestamp) -> element.f2)
        );

        final OutputTag<Tuple3<String, Integer, Long>> outputTag =
                new OutputTag<Tuple3<String, Integer, Long>>("side-output"){};

        // Window logic: 5-second event-time tumbling windows per key; records that
        // arrive after a window has completely closed go to the side output.
        SingleOutputStreamOperator<Tuple2<String, Integer>> keyedStream = withWatermarks
                .keyBy(value -> value.f0)
                .window(TumblingEventTimeWindows.of(Time.seconds(5)))
                .sideOutputLateData(outputTag)
                .process(new ProcessWindowFunction<Tuple3<String, Integer, Long>, Tuple2<String, Integer>, String, TimeWindow>() {
                    @Override
                    public void process(String s, Context context, Iterable<Tuple3<String, Integer, Long>> elements, Collector<Tuple2<String, Integer>> out) throws Exception {
                        int count = 0;
                        for (Tuple3<String, Integer, Long> element : elements) {
                            count++;
                        }

                        long start = context.window().getStart();
                        long end = context.window().getEnd();

                        ZonedDateTime startDateTime = Instant.ofEpochMilli(start).atZone(ZoneId.systemDefault());
                        ZonedDateTime endDateTime = Instant.ofEpochMilli(end).atZone(ZoneId.systemDefault());

                        DateTimeFormatter formatter = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss.SSS");
                        String formattedStart = startDateTime.format(formatter);
                        String formattedEnd = endDateTime.format(formatter);
                        //System.out.println("Tumbling Window [start " + formattedStart + ", end " + formattedEnd + ") for key " + s);

                        // Watermark as of the moment the window fires
                        long windowEndWatermark = context.currentWatermark();
                        ZonedDateTime windowEndDateTime = Instant.ofEpochMilli(windowEndWatermark).atZone(ZoneId.systemDefault());
                        String formattedWindowEndWatermark = windowEndDateTime.format(formatter);
                        //System.out.println("Watermark at the end of window: " + formattedWindowEndWatermark);

                        out.collect(new Tuple2<>(s, count));
                    }
                });

        // Print the per-window counts from the main output
        keyedStream.print();

        // Retrieve and print the late records from the side output
        DataStream<Tuple3<String, Integer, Long>> lateStream = keyedStream.getSideOutput(outputTag);
        lateStream.print();

        env.execute("Side Output Demo");
    }
}
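When this runs, the main stream prints a (key, count) pair each time a 5-second window fires, while lateStream prints, roughly speaking, the key2 records whose artificial 1 to 10 second delay pushed them past the 5-second out-of-orderness bound after their window had already closed. Without sideOutputLateData those records would simply be dropped, which is exactly the data loss the side output guards against.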

Here is an open-source project that might be useful to you: data-warehouse-learning is a real-time and offline data warehouse (data lake) system built on MySQL + Kafka + Hadoop + Hive + Dolphinscheduler + Doris + Seatunnel + Paimon + Hudi + Iceberg + Flink + Dinky + DataRT + SuperSet. Taking the familiar e-commerce domain as its entry point, it explains and implements the full data pipeline: data generation, synchronization, data modeling, warehouse (lake) construction, data services, and BI reporting.

https://gitee.com/wzylzjtn/data-warehouse-learning

https://github.com/Mrkuhuo/data-warehouse-learning

https://bigdatacircle.top/
