1时间语义
flink种设计时间的不同概念:
- 1 Event Time:事件时间,指代事件创建的时间,指代数据中的时间错带指代事件时间,Flink通过时间戳分配器访问事件时间
- 2 Ingestion Time: 摄入时间:指代数据进入Flink的时间
- 3 Processing Time:进程时间:数据执行算子的处理时间
1 EventTime 的引入:
在Flink的流式处理中,绝大部分的业务都会使用eventTime,一般只在eventTime无法使用时,才会被迫使用ProcessingTime或者IngestionTime。
如果要使用EventTime,那么需要引入EventTime的时间属性
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment // 从调用时刻开始给env创建的每一个stream追加时间特征 env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
2 Watermark
1 watermark概述:
1 在eventTime事件时间中,Flink接收事件的数据不是严格按照事件时间进行排序,会出现乱序,需要watermark进行处理乱序的一种机制
2 一旦出现乱序,如果只根据eventTime决定window的运行,我们不能明确数据是否全部到位,但又不能无限期的等下去,此时必须要有个机制来保证一个特定的时间后,必须触发window去进行计算了,这个特别的机制,就是Watermark。
2 watermark的理论知识
Watermark,这条Watermark就等于当前所有到达数据中的maxEventTime- 延迟时长,
也就是说,Watermark是基于数据携带的时间戳生成的,
一旦Watermark比当前未触发的窗口的停止时间要晚,那么就会触发相应窗口的执行。
由于event time是由数据携带的,因此,如果运行过程中无法获取新的数据,那么没有被触发的窗口将永远都不被触发。
图解案例:
乱序流的Watermarker如下图所示:(Watermark设置为2)
上图中,我们设置的允许最大延迟到达时间为2s,所以时间戳为7s的事件对应的Watermark是5s,
时间戳为12s的事件的Watermark是10s,如果我们的窗口1是1s~5s,
窗口2是6s~10s,那么时间戳为7s的事件到达时的Watermarker恰好触发窗口1,时间戳为12s的事件到达时的Watermark恰好触发窗口2
Watermark就是触发前一窗口的“关窗时间”,一旦触发关门那么以当前时刻为准在窗口范围内的所有数据都会收入窗中。
只要没有达到水位那么不管现实中的时间推进了多久都不会触发关窗。
3 理论小结
watermark是用来处理按照事件时间出现乱序的一种机制
4 Watermark引入
dataStream.assignTimestampsAndWatermarks( new BoundedOutOfOrdernessTimestampExtractor<SensorReading>(Time.milliseconds(1000)) { @Override public long extractTimestamp(element: SensorReading): Long = { return element.getTimestamp() * 1000L;//获取事件数据的时间戳作为事件时间 } } );
我们看到上面的例子中创建了一个看起来有点复杂的类,这个类实现的其实就是分配时间戳的接口。
Flink暴露了TimestampAssigner接口供我们实现,使我们可以自定义如何从事件数据中抽取时间戳。
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); // 设置事件时间语义 env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime); // 在添加数据源时候设置时间戳以及watermark DataStream<SensorReading> dataStream = env.addSource(new SensorSource()) .assignTimestampsAndWatermarks(new MyAssigner());// 这里就可以自定义事件语义的时间戳
MyAssigner有两种类型(分配时间戳的接口)
- AssignerWithPeriodicWatermarks
- AssignerWithPunctuatedWatermarks
以上两个接口都继承自TimestampAssigner。
watermark生产的时间间隔,怎么周期性生成watermark设置
可以使用ExecutionConfig.setAutoWatermarkInterval()方法进行设置。
// 每隔5秒产生一个watermark env.getConfig.setAutoWatermarkInterval(5000);
以上代码解析:
产生watermark的逻辑:每隔5秒钟,
Flink会调用AssignerWithPeriodicWatermarks的getCurrentWatermark()方法。
如果方法返回一个时间戳大于之前水位的时间戳,新的watermark会被插入到流中。
这个检查保证了水位线是单调递增的。如果方法返回的时间戳小于等于之前水位的时间戳,则不会产生新的watermark。
周期性获取时间戳的例子:自定义周期性时间戳分配器
// 自定义周期性时间戳分配器 public static class MyPeriodicAssigner implements AssignerWithPeriodicWatermarks<SensorReading>{ private Long bound = 60 * 1000L; // 延迟一分钟 private Long maxTs = Long.MIN_VALUE; // 当前最大时间戳 @Nullable @Override public Watermark getCurrentWatermark() { return new Watermark(maxTs - bound); } @Override public long extractTimestamp(SensorReading element, long previousElementTimestamp) { maxTs = Math.max(maxTs, element.getTimestamp()); return element.getTime
不周期性:自定义时间戳:
如果数据是单调递增:AscendingTimestampExtractor,这个类会直接使用数据的时间戳生成watermark。
代码如下:
DataStream<SensorReading> dataStream = … dataStream.assignTimestampsAndWatermarks( new AscendingTimestampExtractor<SensorReading>() { @Override public long extractAscendingTimestamp(SensorReading element) { return element.getTimestamp() * 1000; } });
乱序数据流,如果能大致估算出最大延迟时间,则使用 BoundedOutOfOrdernessTimestampExtractor(Time.seconds(1)),
这个可以根据事件的时间戳减去1S,作为时间戳
DataStream<SensorReading> dataStream = … dataStream.assignTimestampsAndWatermarks( new BoundedOutOfOrdernessTimestampExtractor<SensorReading>(Time.seconds(1)) { @Override public long extractTimestamp(SensorReading element) { return element.getTimestamp() * 1000L; } });