0、批模式和流模式对比表
类别 | 流模式 | 批模式 |
---|---|---|
任务调度 | 所有任务需要持续运行,消耗资源大 | 任务可以按Shuffle分阶段执行,消耗资源小 |
Shuffle | 记录会被流水线式的持续发送到下游任务,在网络上进行缓冲 | 可以保存Shuffle分阶段执行的中间结果 |
State Backends & State | 使用StateBackend控制状态的存储方式和检查点的工作方式 | 对输入按key分组排序,依次处理一个key的所有记录,在同一时间只保留一个key的状态,当处理到下一个key时,上一个key的状态将被丢弃 |
处理顺序 | 对于用户自定义函数,数据一到达就被处理 | 广播输入》常规输入》keyed 输入 |
事件时间 | 存在数据乱序的可能 | 不存在数据乱序的可能,可以按时间排序 |
水位线 | 使用Watermark(一个带有时间戳 T 的水印)标志再没有时间戳 t < T 的元素出现 | 在当前key的末尾有一个 MAX_WATERMARK ,仅在每个key末尾触发定时器,在下一个key的开始有一个MIN_WATERMARK |
处理时间 | 事件时间和处理时间存在相关性 | 事件时间和处理时间相关性不存在,允许用户请求当前的处理时间,并注册处理时间计时器,仅在每个key末尾触发计时器 |
故障恢复 | 使用 checkpoint 进行故障恢复,发生故障时,从 checkpoint 重新启动所有正在运行的任务 | 尝试回溯到之前的中间结果仍可获取的处理阶段,只有失败的任务才需要重新启动 |
1)概述
1.流处理和批处理概述
DataStream API 的流(STREAMING)
执行模式,适用于需要连续增量处理,而且常驻线上的无边界作业。
DataStream API 的批(BATCH)
执行模式,类似于 MapReduce 等批处理框架,适用于已知输入、不会连续运行的的有边界作业。
Flink 对流处理和批处理采取统一的处理方式,无论配置何种执行模式,在有界输入上执行的 DataStream 应用都会产生相同的最终 结果;在流
模式执行的作业可能会产生增量更新(类似于数据库中的插入(upsert)操作),而批
作业只在最后产生一个最终结果。
通过启用批
执行模式,Flink 可以对有边界作业进行额外的优化:可以使用不同的关联(join)/ 聚合(aggregation)策略、不同 shuffle 实现来提高任务调度和故障恢复的效率等。
2.批执行模式选择时机
批
执行模式只能用于 有边界 的作业/Flink 程序。
边界:是数据源的一个属性,表明是否在执行之前已经知道来自该数据源的所有输入,或者新数据是否会无限期地出现;如果作业的所有源都是有边界的,则它就是有边界的,否则就是无边界的。
流
执行模式,既可用于有边界任务,也可用于无边界任务。
批
执行模式,只用于有边界任务。
使用流
模式运行有边界作业:
- 使用有边界作业的运行结果去初始化作业状态,并将该状态在之后的无边界作业中使用;例如,通过
流
模式运行一个有边界作业,获取一个 savepoint,然后在一个无边界作业上恢复这个 savepoint【将 savepoint 作为批
执行作业的附加输出】; - 为无边界数据源写测试代码时,使用有边界数据源更自然。
3.配置批执行模式
执行模式可以通过 execution.runtime-mode
配置,可选值如下:
STREAMING
: 经典 DataStream 执行模式(默认)BATCH
: 在 DataStream API 上进行批量式执行AUTOMATIC
: 让系统根据数据源的边界性来决定
可以通过 bin/flink run ...
的命令行参数配置,或者在创建/配置 StreamExecutionEnvironment
时写进程序。
案例:通过命令行配置执行模式
bin/flink run -Dexecution.runtime-mode=BATCH <jarFile>
案例:在代码中配置执行模式
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.setRuntimeMode(RuntimeExecutionMode.BATCH);
注意:不建议在程序中设置运行模式,而是在提交应用程序时使用命令行设置,保持应用程序代码的免配置可以让程序更加灵活,因为同一个应用程序可能在任何执行模式下执行。
4.执行行为
a)任务调度与网络 Shuffle
在批
执行模式中,因为输入数据是有边界的,Flink可以使用更高效的数据结构和算法来进行任务调度和网络 shuffle。
案例:任务调度和网络传输的差异
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); DataStreamSource<String> source = env.fromElements(...); source.name("source") .map(...).name("map1") .map(...).name("map2") .rebalance() .map(...).name("map3") .map(...).name("map4") .keyBy((value) -> value) .map(...).name("map5") .map(...).name("map6") .sinkTo(...).name("sink");
map()
、 flatMap()
或 filter()
,可以依靠算子链链在一起,直接将数据转发到下一个操作,不涉及网络 shuffle。
keyBy()
或 rebalance()
,需要在不同的任务并行实例之间传输数据,涉及网络 shuffle。
对于上面的例子,Flink 会将操作分组为以下任务
- 任务1:
source
、map1
和map2
- 任务2:
map3
和map4
- 任务3:
map5
、map6
和sink
在任务1到任务2、任务2到任务3之间各有一次网络 shuffle,作业的可视化表示如下
i)流执行模式
在**流
执行模式下**,所有任务需要持续运行;Flink可以通过整个管道立即处理新的记录,以达到需要的连续和低延迟的流处理;同时分配给某个作业的 TaskManagers 需要有足够的资源来同时运行所有的任务。
网络 shuffle 是 流水线 式的,记录会立即发送给下游任务,在网络层上进行一些缓冲;当处理连续的数据流时,在任务(或任务管道)之间没有可以实体化的自然数据点(时间点),而在批
执行模式下,中间的结果可以被实体化。
ii)批执行模式
在**批
执行模式下**,作业的任务可以分阶段执行;因为输入是有边界的,因此 Flink 可以在进入下一个阶段之前完全处理管道中的一个阶段;在上面的例子中,工作会有三个阶段,对应着被 shuffle 界线分开的三个任务。
分阶段处理要求 Flink 将任务的中间结果实体化到非永久存储中,让下游任务在上游任务下线后再读取,将增加处理的延迟,但这允许 Flink 在故障发生时回溯到最新的可用结果,而不是重新启动整个任务;同时批
作业可以在更少的资源上执行。
TaskManagers 将至少在下游任务开始消费前保留中间结果,在这之后,只要空间允许,中间结果就会被保留,以便任务失败回滚。
b)State Backends / State
在流
模式下,Flink 使用 StateBackend 控制状态的存储方式和检查点的工作方式。
在批
模式下,配置的 state backend 被忽略;对输入按 key 分组(使用排序),依次处理一个 key 的所有记录,以便在同一时间只保留一个 key 的状态,当进行到下一个 key 时,上一个 key 的状态将被丢弃。
c)处理顺序
在批
执行和流
执行中,算子或用户自定义函数(UDF)处理记录的顺序可能不同。
在流
模式下,对于用户自定义函数,数据一到达就被处理。
在批
模式下,Flink 确保数据有序,排序可以是特定调度任务、网络 shuffle、上文提到的 state backend 等。
将常见输入类型分为三类
- 广播输入(broadcast input): 从广播流输入【广播状态】;
- 常规输入(regular input): 从广播或 keyed 输入;
- keyed 输入(keyed input): 从
KeyedStream
输入;
消费多种类型输入的函数或算子,处理顺序如下
- 广播输入第一个处理
- 常规输入第二个处理
- keyed 输入最后处理
对于从多个常规或广播输入进行消费的函数 — 比如 CoProcessFunction
— Flink 有权从任一输入以任意顺序处理数据。
对于从多个keyed输入进行消费的函数 — 比如 KeyedCoProcessFunction
— Flink 先处理单一键中的所有记录再处理下一个。
d)事件时间/水印
在流
模式下,存在数据乱序的可能,使用 Watermark(一个带有时间戳 T
的水印)标志再没有时间戳 t < T
的元素跟进。
在批
模式下,输入的数据集是事先已知的,至少可以按照时间戳对元素进行排序,从而按照时间顺序进行处理,不存在数据乱序的可能。
综上:在批
模式下,只需要在输入的末尾有一个与每个键相关的 MAX_WATERMARK
,如果输入流没有键,则在输入的末尾需要一个 MAX_WATERMARK
;所有注册的定时器都会在 时间结束 时触发,用户定义的 WatermarkAssigners
或 WatermarkStrategies
会被忽略;但配置 WatermarkStrategy
是有用的,因为它的 TimestampAssigner
仍然会被用来给记录分配时间戳。
e)处理时间
处理时间是指在处理记录的具体实例上,处理记录的机器上的时钟时间,基于处理时间的计算结果是不可重复的,因为同一条记录被处理两次,会有两个不同的时间戳。
在流
模式下,事件时间和处理时间存在相关性;在流
模式下事件时间的1小时
也几乎是处理时间的1小时
,使用处理时间可以用于早期(不完全)触发,给出预期结果的提示。
在批
模式下,事件时间和处理时间相关性不存在,因为输入的数据集是静态的;允许用户请求当前的处理时间,并注册处理时间计时器,但与事件时间的情况一样,所有的计时器都要在输入结束时触发。
在作业执行过程中,处理时间不会提前,当整个输入处理完毕后,会快进到时间结束。
f)故障恢复
在流
模式下,Flink 使用 checkpoint 进行故障恢复,在发生故障时,Flink 会从 checkpoint 重新启动所有正在运行的任务。
在批
模式下,Flink 会尝试回溯到之前的中间结果仍可获取的处理阶段,只有失败的任务才需要重新启动,相比从 checkpoint 重新启动所有任务,可以提高作业的处理效率和整体处理时间。
5.重要的考虑因素
a)概述
批
模式下的行为变化
- “滚动"操作,如 reduce() 或 sum(),会对
流
模式下每一条新记录发出增量更新,在批
模式下,只发出最终结果。
批
模式下不支持
- Checkpointing 和任何依赖于 checkpointing 的操作都不支持。
b)Checkpointing
批处理程序的故障恢复不使用检查点,因为没有 checkpoints,某些功能如 CheckpointListener ,以及因此,Kafka 的精确一次(EXACTLY_ONCE) 模式或 File Sink
的 OnCheckpointRollingPolicy 将无法工作。
仍然可以使用所有的状态原语(state primitives),只是用于故障恢复的机制会有所不同。
c)编写自定义算子
注意:在流
模式下运行良好的 Operator 可能会在批
模式下产生错误的结果。
- 在
批
模式下,会逐个 key 处理记录;因此,Watermark 会在每个 key 之间从MAX_VALUE
切换到MIN_VALUE
; - Watermark 在一个算子中不总是递增的。
- 定时器将首先按 key 的顺序触发,然后按每个 key 内的时间戳顺序触发。
- 不支持手动更改 key 的操作。
6.总结
1、即使不配置执行模式,在有界输入上执行的DataStream应用会产生相同的最终结果,但是配置批处理模式执行有界作业后,可以执行额外的优化。 2、可以通过代码【不建议】和命令行【建议】配置运行模式。 3、相同的算子在批模式下和流模式下可能会产生不同的结果(水位线的触发=>定时器的触发、窗口的触发)。 4、批执行模式和流执行模式均可以配置重试策略(RestartStrategies)来重启任务,但只有流执行模式支持检查点(Checkpoint)。