Flink-Checkpoint机制详解:(第41天)

avatar
作者
猴君
阅读量:0

系列文章目录

1、为什么要学Checkpoint机制
2、Flink怎么实现容错
3、Checkpoint机制的执行流程
4、重启策略Restart Strategy
5、状态后端State Backend
6、开源Flink案例

文章目录


前言

本文通过案例方式详解-Flink-Checkpoint机制

1、为什么要学Checkpoint机制

因为Flink是流式(实时)计算程序,我们工作中希望Flink程序能够7x24小时运行,同时遇到一些问题/bug以后,能够自动恢复程序的运行。

2、Flink怎么实现容错

Flink由于是实时运行的程序,因此不仅要对中间计算的数据进行容错,还需要对程序进行容错。也就是Flink中的容错分为如下两类:

  • 状态后端:对中间计算的数据进行容错
  • 重启策略:对程序进行容错,让程序能够自动恢复

3、Checkpoint机制的执行流程

在这里插入图片描述

步骤如下:

Flink中Checkpoint执行流程: 1- JobManager中的检查点协调器会将barrier栅栏发送给到source算子 2- source算子接收到栅栏以后,先暂停对数据的处理工作,将算子运行的状态数据先保存到TaskManager上形成State状态数据;同时会向检查点协调器上报数据,在检查点协调器中获得到的数据称之为Checkpoint数据。数据上报完以后,才会恢复对数据的处理。 3- 栅栏会随着数据从source算子一直流动到最后的sink算子 4- 每个算子拿到栅栏以后的处理过程与source算子一样。也就是先暂停对数据的处理,在TaskManager上保存State状态数据,以及向检查点协调器汇报Checkpoint数据。然后才会继续处理数据 5- 直到所有的算子将数据汇报完成,那么这个过程才算结束。 

4、重启策略Restart Strategy

重启策略,能够让Flink程序在挂了之后进行自动重启。保证任务容错。既可以在代码中设置,也能够在配置文件中设置,一般推荐使用代码进行设置。

官网链接如下:

https://nightlies.apache.org/flink/flink-docs-release-1.15/docs/ops/state/task_failure_recovery/

Flink有如下的几种重启策略。

  • 不重启策略:一般不用
  • 固定延迟重启策略:频繁使用
  • 失败率重启策略:频繁使用
  • 指数延迟重启策略:较少使用

官网文档如下:

在这里插入图片描述

4.1 不重启策略

Flink程序不重启,如果遇到异常就挂了。

代码中配置:

env.set_restart_strategy(RestartStrategies.no_restart()) 

配置文件flink-conf.yaml中的配置:

restart-strategy: none 
4.2 固定延迟重启策略

允许Flink程序固定可以重启几次。每次重启的时间间隔是多少。这些参数是自己指定的。

代码中配置:

env = StreamExecutionEnvironment.get_execution_environment() env.set_restart_strategy(RestartStrategies.fixed_delay_restart(     3,  #重启的次数     10000  #延迟时间,这里配置的是10000毫秒 ))  如果重启的次数超过了3次,那么不会再给你重启 

配置文件flink-conf.yaml中的配置:

restart-strategy: fixed-delay   #配置固定延迟重启 restart-strategy.fixed-delay.attempts: 3    #重启的次数 restart-strategy.fixed-delay.delay: 10 s    #重启的间隔时间 
4.3 失败率重启策略

在一定的时间范围内,重启的次数在允许范围内,那么会一直给你重启。

代码中配置:

env.set_restart_strategy(RestartStrategies.failure_rate_restart(     3,  #间隔时间内重启的次数     300000,  #时间间隔     10000  #延迟时间,这里配置的是10000毫秒 )) 如果在300000毫秒统计时间以内,重启次数小于等于3次,那么会持续的给你进行重启;如果超过,不会再重启。 

配置文件flink-conf.yaml中配置:

restart-strategy: failure-rate  #配置失败率重启 restart-strategy.failure-rate.max-failures-per-interval: 3   #最大重启的次数 restart-strategy.failure-rate.failure-rate-interval: 5 min   #失败率的时间间隔 restart-strategy.failure-rate.delay: 10 s					 #每次重启的时间间隔 
4.4 指数延迟重启策略

Flink程序的重启时间随着指数的增加而呈指数级别递增。注意:阿里云Flink中没有这种重启策略

在这里插入图片描述

代码中配置:

Python暂不支持。 

配置文件flink-conf.yaml中配置:

restart-strategy: exponential-delay			#配置指数延迟重启 restart-strategy.exponential-delay.initial-backoff: 10 s    #重启的初始值 restart-strategy.exponential-delay.max-backoff: 2 min		#最大从重启时间间隔 restart-strategy.exponential-delay.backoff-multiplier: 2.0  #指数 restart-strategy.exponential-delay.reset-backoff-threshold: 10 min    #重置重启时间 restart-strategy.exponential-delay.jitter-factor: 0.1		#重启因子,抖动因子 
4.5 小结

工作中常用的重启策略:固定延迟重启策略,推荐使用失败率重启策略

重启策略中重启次数并不是设置的越多越好,一般推荐3-5次。

5、状态后端State Backend

用来保存Flink中State和Checkpoint的数据。

分类:

  • 内存状态后端:默认,一般在开发或者测试中使用
  • 文件系统状态后端:经常在生产环境使用,是用来存储Checkpoint数据
  • RocksDB状态后端:经常在生产环境使用,是用来存储State数据

同时,生产中一般都是文件系统状态后端和RocksDB状态后端一起配合使用。

5.1 内存状态后端MemoryStateBackend

这是Flink中默认的状态后端。

在这里插入图片描述

代码中配置:

env.set_state_backend(HashMapStateBackend()) env.get_checkpoint_config().set_checkpoint_storage(JobManagerCheckpointStorage()) 

配置文件flink-conf.yaml中配置:

state.backend: hashmap state.checkpoint-storage: jobmanager 

内存状态后端,由于数据不安全,一般不用。

5.2 文件系统状态后端FsStateBackend

在这里插入图片描述

代码中配置:

env.set_state_backend(HashMapStateBackend()) env.get_checkpoint_config().set_checkpoint_storage_dir("file:///checkpoint-dir") 

配置文件flink-conf.yaml中的配置:

state.backend: hashmap state.checkpoints.dir: file:///checkpoint-dir/ state.checkpoint-storage: filesystem 

文件系统状态后端一般工作中常用。

5.3 RocksDB数据库状态后端RocksDBStateBackend

在这里插入图片描述

代码中配置:

env.set_state_backend(EmbeddedRocksDBStateBackend()) env.get_checkpoint_config().set_checkpoint_storage_dir("file:///checkpoint-dir") 

配置文件flink-conf.yaml中的配置:

state.backend: rocksdb state.checkpoints.dir: file:///checkpoint-dir/  state.checkpoint-storage: filesystem 

6、开源Flink案例

Checkpoint的配置一般都是固定不变的,可以配置在flink-conf.yaml文件中,这样配置完后,对所有任务都生效,如下:

建议操作前先保存一个node1的虚拟机快照,下面的操作,全部都在node1执行:

# 1.创建HDFS路径 hdfs dfs -mkdir /checkpoints   # 2.修改Flink配置文件 cd /export/server/flink/conf vim flink-conf.yaml  # 3.要添加的内容如下 注意: 配置文件中的配置项前面不要有# 
execution.checkpointing.interval: 5000 #设置有且仅有一次模式 目前支持 EXACTLY_ONCE、AT_LEAST_ONCE         execution.checkpointing.mode: EXACTLY_ONCE state.backend: hashmap #设置checkpoint的存储方式 state.checkpoint-storage: filesystem #设置checkpoint的存储位置 state.checkpoints.dir: hdfs://node1:8020/checkpoints #设置savepoint的存储位置 state.savepoints.dir: hdfs://node1:8020/checkpoints #设置checkpoint的超时时间 即一次checkpoint必须在该时间内完成 不然就丢弃 execution.checkpointing.timeout: 600000 #设置两次checkpoint之间的最小时间间隔 execution.checkpointing.min-pause: 500 #设置并发checkpoint的数目 execution.checkpointing.max-concurrent-checkpoints: 1 #开启checkpoints的外部持久化这里设置了清除job时保留checkpoint,默认值时保留一个 假如要保留3个 state.checkpoints.num-retained: 3 #默认情况下,checkpoint不是持久化的,只用于从故障中恢复作业。当程序被取消时,它们会被删除。但是你可以配置checkpoint被周期性持久化到外部,类似于savepoints。这些外部的checkpoints将它们的元数据输出到外#部持久化存储并且当作业失败时不会自动 #清除。这样,如果你的工作失败了,你就会有一个checkpoint来恢复。 #ExternalizedCheckpointCleanup模式配置当你取消作业时外部checkpoint会产生什么行为: #RETAIN_ON_CANCELLATION: 当作业被取消时,保留外部的checkpoint。注意,在此情况下,您必须手动清理checkpoint状态。 #DELETE_ON_CANCELLATION: 当作业被取消时,删除外部化的checkpoint。只有当作业失败时,检查点状态才可用。 execution.checkpointing.externalized-checkpoint-retention: RETAIN_ON_CANCELLATION  #第二种:固定延迟重启策略 #设置重启策略 restart-strategy: fixed-delay #尝试重启次数 restart-strategy.fixed-delay.attempts: 3 #两次连续重启的间隔时间 restart-strategy.fixed-delay.delay: 10 s 
# 4.改完配置以后,需要重启Flink集群。另外需要注意我们使用了HDFS。 cd /export/server/flink/bin ./stop-cluster.sh ./start-cluster.sh 

Python代码:

#1.构建流式执行环境 from pyflink.common import Types from pyflink.datastream import StreamExecutionEnvironment, RuntimeExecutionMode, DataStream from pyflink.table import DataTypes  env = StreamExecutionEnvironment.get_execution_environment() env.set_parallelism(1) # env.set_runtime_mode(RuntimeExecutionMode.STREAMING) #2.数据source input_ds = DataStream(env._j_stream_execution_environment.socketTextStream("node1",9999)) #3.数据处理 def map_word(word):     if word == "error":         raise ValueError("出异常了,程序挂了...")     else:         return (word,1)  result_ds = input_ds.flat_map(lambda x:x.split(" "))\     .map(lambda word:map_word(word),output_type=Types.TUPLE([Types.STRING(),Types.INT()])).\     key_by(lambda x:x[0])\     .reduce(lambda x,y:(x[0],x[1] + y[1])) #4.数据Sink result_ds.print() #5.启动流式任务 env.execute() 

在这里插入图片描述

部署python代码的命令: /export/server/flink/bin/flink run -py /export/software/checkpoint_demo.py 
需要在node1上启动nc nc -lk 9999 

运行结果截图:

在这里插入图片描述

广告一刻

为您即时展示最新活动产品广告消息,让您随时掌握产品活动新动态!