1、广播状态 (Broadcast State)
广播状态是一种特殊的算子状态,支持将一个流中的元素需要广播到所有下游任务的使用情形,广播状态用于保持所有子任务状态相同。
2、广播状态和其他算子状态的区别:
- 它具有 map 格式,
- 它仅在一些特殊的算子中可用,这些算子的输入为一个广播数据流和非广播数据流,
- 这类算子可以拥有不同命名的多个广播状态 。
3、广播状态 API
案例:存在一个序列,序列中的元素是具有不同颜色与形状的图形,希望在序列里相同颜色的图形中寻找满足一定顺序模式的图形对(比如在红色的图形里,有一个长方形跟着一个三角形)同时希望寻找的模式也会随着时间而改变。
定义两个流,一个流包含图形(Item)
,具有颜色
和形状
两个属性;另一个流包含特定的规则(Rule)
,代表希望寻找的模式,在图形
流中,首先使用颜色
将流进行进行分区(keyBy),确保相同颜色的图形会流转到相同的物理机上。
// 将图形使用颜色进行划分 KeyedStream<Item, Color> colorPartitionedStream = itemStream .keyBy(new KeySelector<Item, Color>(){...});
规则
流应该被广播到所有的下游 task 中,下游 task 应当存储这些规则并根据它寻找满足规则的图形对。
- 将
规则
广播给所有下游 task; - 使用
MapStateDescriptor
来描述并创建 broadcast state 在下游的存储结构。
// 一个 map descriptor,它描述了用于存储规则名称与规则本身的 map 存储结构 MapStateDescriptor<String, Rule> ruleStateDescriptor = new MapStateDescriptor<>( "RulesBroadcastState", BasicTypeInfo.STRING_TYPE_INFO, TypeInformation.of(new TypeHint<Rule>() {})); // 广播流,广播规则并且创建 broadcast state BroadcastStream<Rule> ruleBroadcastStream = ruleStream .broadcast(ruleStateDescriptor);
使用规则
来筛选图形
序列:
- 将两个流关联起来
- 完成模式识别逻辑
为关联一个非广播流(keyed 或者 non-keyed)与一个广播流(BroadcastStream
),可以调用非广播流的方法 connect()
,并将 BroadcastStream
当做参数传入;这个方法的返回参数是 BroadcastConnectedStream
,具有 process()
方法,可以传入一个特殊的 CoProcessFunction
来编辑模式识别逻辑,具体传入 process()
的是哪个类型取决于非广播流的类型:
- 如果流是一个 keyed 流,那就是
KeyedBroadcastProcessFunction
类型; - 如果流是一个 non-keyed 流,那就是
BroadcastProcessFunction
类型。
DataStream<String> output = colorPartitionedStream .connect(ruleBroadcastStream) .process( // KeyedBroadcastProcessFunction 中的类型参数表示: // 1. key stream 中的 key 类型 // 2. 非广播流中的元素类型 // 3. 广播流中的元素类型 // 4. 结果的类型,在这里是 string new KeyedBroadcastProcessFunction<Color, Item, Rule, String>() { // 模式匹配逻辑 } );
4、BroadcastProcessFunction 和 KeyedBroadcastProcessFunction
在传入的 BroadcastProcessFunction
或 KeyedBroadcastProcessFunction
中,需要实现两个方法;processBroadcastElement()
负责处理广播流中的元素,processElement()
负责处理非广播流中的元素。
public abstract class BroadcastProcessFunction<IN1, IN2, OUT> extends BaseBroadcastProcessFunction { public abstract void processElement(IN1 value, ReadOnlyContext ctx, Collector<OUT> out) throws Exception; public abstract void processBroadcastElement(IN2 value, Context ctx, Collector<OUT> out) throws Exception; } public abstract class KeyedBroadcastProcessFunction<KS, IN1, IN2, OUT> { public abstract void processElement(IN1 value, ReadOnlyContext ctx, Collector<OUT> out) throws Exception; public abstract void processBroadcastElement(IN2 value, Context ctx, Collector<OUT> out) throws Exception; public void onTimer(long timestamp, OnTimerContext ctx, Collector<OUT> out) throws Exception; }
注意: processBroadcastElement()
处理广播流的元素, processElement()
处理另一个流的元素,两个方法的第二个参数(Context)不同。
得到广播流的存储状态:ctx.getBroadcastState(MapStateDescriptor<K, V> stateDescriptor) 查询元素的时间戳:ctx.timestamp() 查询目前的Watermark:ctx.currentWatermark() 目前的处理时间(processing time):ctx.currentProcessingTime() 产生旁路输出:ctx.output(OutputTag<X> outputTag, X value)
在 getBroadcastState()
方法中传入的 stateDescriptor
应该与调用 .broadcast(ruleStateDescriptor)
的参数相同。
注意:对于 broadcast state 的访问权限,在处理广播流元素这端,是具有读写权限的,而对于处理非广播流元素这端是只读的;因为 Flink 中是不存在跨 task 通讯的,为了保证 broadcast state 在所有的并发实例中是一致的,在处理广播流元素的时候给予写权限,在所有的 task 中均可以看到这些元素,并且要求对这些元素处理是一致的, 那么最终所有 task 得到的 broadcast state 是一致的。
processBroadcastElement()
的实现必须在所有的并发实例中具有确定性的结果。
KeyedBroadcastProcessFunction
在 Keyed Stream 上工作,提供了一些 BroadcastProcessFunction
没有的功能:
1.processElement()
的参数 ReadOnlyContext
提供了方法访问 Flink 的定时器服务,可以注册事件时间定时器(event-time timer)或处理时间定时器(processing-time timer);当定时器触发时,会调用 onTimer()
方法, 提供了 OnTimerContext
,它具有 ReadOnlyContext
的全部功能,并且提供:
- 查询当前触发的是一个事件时间还是处理时间的定时器
- 查询定时器关联的key
2.processBroadcastElement()
方法中的参数 Context
会提供方法 applyToKeyedState(StateDescriptor stateDescriptor, KeyedStateFunction function)
;这个方法使用一个 KeyedStateFunction
能够对 stateDescriptor
对应的 state 中所有 key 的存储状态进行某些操作。
注册定时器只能在
KeyedBroadcastProcessFunction
的processElement()
方法中进行,在processBroadcastElement()
方法中不能注册定时器,因为广播的元素中并没有关联的 key。
new KeyedBroadcastProcessFunction<Color, Item, Rule, String>() { // 存储部分匹配的结果,即匹配了一个元素,正在等待第二个元素 // 用一个数组来存储,因为同时可能有很多第一个元素正在等待 private final MapStateDescriptor<String, List<Item>> mapStateDesc = new MapStateDescriptor<>( "items", BasicTypeInfo.STRING_TYPE_INFO, new ListTypeInfo<>(Item.class)); // 与之前的 ruleStateDescriptor 相同 private final MapStateDescriptor<String, Rule> ruleStateDescriptor = new MapStateDescriptor<>( "RulesBroadcastState", BasicTypeInfo.STRING_TYPE_INFO, TypeInformation.of(new TypeHint<Rule>() {})); @Override public void processBroadcastElement(Rule value, Context ctx, Collector<String> out) throws Exception { ctx.getBroadcastState(ruleStateDescriptor).put(value.name, value); } @Override public void processElement(Item value, ReadOnlyContext ctx, Collector<String> out) throws Exception { final MapState<String, List<Item>> state = getRuntimeContext().getMapState(mapStateDesc); final Shape shape = value.getShape(); for (Map.Entry<String, Rule> entry : ctx.getBroadcastState(ruleStateDescriptor).immutableEntries()) { final String ruleName = entry.getKey(); final Rule rule = entry.getValue(); List<Item> stored = state.get(ruleName); if (stored == null) { stored = new ArrayList<>(); } if (shape == rule.second && !stored.isEmpty()) { for (Item i : stored) { out.collect("MATCH: " + i + " - " + value); } stored.clear(); } // 不需要额外的 else{} 段来考虑 rule.first == rule.second 的情况 if (shape.equals(rule.first)) { stored.add(value); } if (stored.isEmpty()) { state.remove(ruleName); } else { state.put(ruleName, stored); } } } }
3.注意事项
- 没有跨 task 通讯:只有在
(Keyed)-BroadcastProcessFunction
中处理广播流元素的方法里可以更改 broadcast state 的内容;同时,需要保证所有 task 对于 broadcast state 的处理方式是一致的,否则会造成不同 task 读取 broadcast state 时内容不一致的情况,最终导致结果不一致。 - broadcast state 在不同的 task 的事件顺序可能是不同的:虽然广播流中元素的过程能够保证所有的下游 task 全部能够收到,但在不同 task 中元素的到达顺序可能不同,所以 broadcast state 的更新不能依赖于流中元素到达的顺序。
- 所有的 task 均会对 broadcast state 进行 checkpoint:虽然所有 task 中的 broadcast state 是一致的,但当 checkpoint 来临时所有 task 均会对 broadcast state 做 checkpoint;防止在作业恢复后读文件造成的文件热点;Flink 会保证在恢复状态/改变并发的时候数据没有重复且没有缺失,在作业恢复时,如果与之前具有相同或更小的并发度,所有的 task 读取之前已经 checkpoint 过的 state,在增大并发的情况下,task 会读取本身的 state,多出来的并发(
p_new
-p_old
)会使用轮询调度算法读取之前 task 的 state。 - 不使用 RocksDB state backend: broadcast state 在运行时保存在内存中,需保证内存充足,同样适用于其它 Operator State。