系列文章目录
一、Flink架构(掌握)
二、Flink代码案例(掌握)
三、UDF(熟悉)
四、Flink常见面试题整理
文章目录
前言
本文主要详解了Flink架构,通过案例详解Flink流式开发。
提示:以下是本篇文章正文内容,下面案例可供参考
一、Flink架构(掌握)
1、系统架构
官网的架构图如下:
讲义的架构如下:
1.1 通信(了解)
Spark的通信:在1.6版本及之前,用的是akka通信框架,在1.6之后,用的是netty。
Flink的通信:akka通信框架。
1.2 JobManager
作用:管理众多的TaskManager从节点。负责任务分配和资源管理
JobManager中包括如下3个组件:
ResourceManager:这是Flink自己的资源管理器。要和Yarn的ResourceManager区分开来。
JobMaster:**作业调度器。**负责向资源管理器申请资源;分配任务给到TaskManager进行执行
Dispatcher:分发器。用来接收Client进程提交的Flink任务,然后去启动JobMaster,将Flink任务转发给JobMaster
1.3 TaskManager
作用:接收JobManager分配过来的任务;同时向JobManager汇报Task执行状态、心跳等信息
1.4 Scheduler
Spark中的调度器:DAGScheduler和TaskScheduler
- DAGScheduler:将Job任务形成DAG有向无环图和划分Stage阶段,确定每个Stage阶段有多少个Task线程
- TaskScheduler:将DAGScheduler发送过来的TaskSet中的Task线程任务分配给到Executor进程进行执行
Flink:JobMaster作业调度器。负责向资源管理器申请资源;分配任务给到TaskManager进行执行
1.5 Checkpoint Coordinator
检查点协调器。主要负责Checkpoint的操作,对Flink程序进行容错。
1.6 Memory & IO Manager
内存和IO管理器。负责TaskManager的内存和IO管理
1.7 Network Manager
网络管理器,负责不同节点间的Slot进行数据的交换。分为如下3种场景:
# 场景1: 同一个节点,同一个TaskManager的不同Slot间 举例: 你和你的同学都在广州黑马的218教室学习 数据交换效率最高,而且不需要经过网络管理器 # 场景2: 同一个节点,不同的TaskManager的Slot间 举例: 你和你的同学都在广州黑马,但是在不同教室 数据交换效率中等,而且不需要经过网络管理器 # 场景3: 不同节点Slot间 举例: 你在广州黑马,你的同学在深圳黑马 数据交换效率最低,而且需要经过网络管理器
1.8 Client
只是负责任务的提交。提交成功后,其实可以断开了。在命令提交任务时,可以指定-d
参数来配置。
如果配置了-d
,则说明客户端和集群断开了。
2、任务提交流程
2.1 抽象提交流程
1- Flink任务(App)通过Client客户端提交给到分发器 2- 分发器接收到Flink任务以后,接着去启动JobManager中的JobMaster,并且将Flink任务提交给到JobMaster 3- JobMaster接收到Flink任务以后,向ResourceManager资源管理器申请Slot资源 4- 资源管理器接收到资源申请之后,首先启动新的TaskManager 5- 新的TaskManager启动以后,会反向注册回资源管理器,并且告诉它我目前有多少Slot的资源 6- 资源管理器命令TaskManager将空闲的Slot资源提供出来 7- TaskManager接收到资源提供的命令以后,将资源给到JobMaster 8- JobMaster申请到资源以后,将任务分配给到具体的TaskManager进行执行
2.2 Standalone模式提交流程
(1)客户端提交任务到Dispacher(分发器)
(2)Dispacher分发器启动JobMaster
(3)JobMaster启动后,它会向JobManager的ResourceManager(资源管理器)请求资源(slot)
(4)JobManager的ResourceManager(资源管理器)向TaskManager请求资源(slot)
(5)TaskManager会向JobMaster提供资源(slot)
(6)JobMaster收到资源后,会向TaskManager提交(分发)任务
(7)TaskManager收到任务后,就在Slot上执行
(8)任务执行完后,释放资源
注意:Standalone模式下,Slot资源使用完了以后,那么无法继续提交Flink程序,会报错。
/export/server/flink/bin/flink run -py /export/software/checkpoint_demo.py
2.3 Yarn-session模式提交流程
如果需要把任务提交在Yarn-Session下运行,则分为2步:
- 初始化Yarn-session集群
- 提交任务
首先看第一步。
2.3.1 初始化Session集群
(1)请求Yarn的ResourceManager(资源管理器)
(2)Yarn的ResourceManager收到请求后,会启动一个Container(容器),当然这个容器就是ApplicationMaster(AppMaster)
(3)这个AppMaster就是Flink的JobManager,这个JobManager会初始化Dispacher和ResourceManager(资源管理器)
这里还没有初始化TaskManager,因此集群没有slot资源
2.3.2 提交任务
(1)客户端提交任务给JobManager(AppMaster)的分发器(Dispacher)
(2)分发器收到任务后,会启动JobMaster
(3)JobMaster启动后,会向JobManager(AppMaster)请求资源(slot)
(4)JobManager会向Yarn的ResourceManager请求资源
(5)Yarn的ResourceManager收到请求后,会在闲置的节点动态启动Container(TaskManager)
(6)Container启动成功后,会注册给AppMaster(JobManager)的ResourceManager
(7)Container会向AppMaster(JobManager)的JobMaster提供资源(slot)
(8)JobMaster会把任务分发给Container(TaskManager)去执行
(9)待任务执行完后,Container(TaskManager)会被AppMaster(JobManager),最终留下JobManager,这个不会被销毁
2.4 Yarn-per-job模式提交流程
(1)客户端提交任务给Yarn的ResourceManager
(2)Yarn的ResourceManager收到请求后,会启动一个Container(AppMaster),这个AppMaster就是Flink的JobManager
(3)JobManager里有任务调度器和资源管理器,任务调度器就会开始调度任务,向JobManager的资源管理器申请资源
(4)JobManager的资源管理器它会向Yarn的ResourceManager申请资源
(5)Yarn的ResourceManager会动态启动Container(TaskManager),这些Container就是资源
(6)这些Container启动后,会反向注册给AppMaster(JobManager)
(7)这些Container向JobMaster提供资源
(8)JobMaster收到资源后,把任务分发给Container(TaskManager)去执行
(9)任务执行完后,AppMaster(JobManager)会把Container(TaskManager)注销
(10)AppMaster(JobManager)会向Yarn的ResourceManager注销自己
2.5 Yarn-application模式提交流程
与Yarn-per-job的区别是Client进程运行的地方不一样。application模式是在集群中随机找一个从节点启动和运行Client进程。Flink程序的提交流程与Yarn-per-job完全一样。
3、一些重要的概念
3.1 程序流程图
3.2 一些概念
- 层级关系
Spark层级关系:Spark的应用 > Job任务 > DAG有向无环图 > Stage阶段 > Task线程任务
Flink层级关系:Flink的应用 > Job任务 > DAG有向无环图 > 算子链 > Task线程任务 > SubTasks子任务
- 并行度
运行同时运行的任务数。Flink的并行度的设置如下:
#1.默认,在配置文件中,优先级最低。不推荐使用 在flink-conf.yaml中可配置 #2.任务提交时指定(推荐) bin/flink run -p 3 xxxx.jar #3.在全局代码中配置 env.setParallelism(1) #4.在算子中,优先级最高 ...reduce().setParllelism(1)
- 算子&算子链
算子:每一个对数据处理的方法/API都称之为算子。
算子链:把窄依赖的算子合并在一起。算子链能够提升数据处理效率
- 宽依赖&窄依赖
Spark
宽依赖:Shuffle Dependency
窄依赖:Narrow Dependency
Flink
宽依赖(重分区):redistributing dependency
窄依赖(一对一):one-to-one dependency
- 概念
Job:Flink的程序
Task:Flink的并行度
SubTask:每个任务中的子任务数
- Slot槽&槽共享
槽:slot,是集群的静态资源,在Standalone模式下,槽是预先配置的,不能更改。如果要改,改完后需要重启集群。
Yarn模式,可以通过启动多个TaskManager来动态初始化多个slot槽。
slot是运行Flink的单位。Flink任务必须运行在slot里。
slot和并行度是有关联的。并行度的数量不能超过可用slot的数量。
槽共享:一个槽可以运行不同Task下的多个SubTask。
不同的Task下的相同SubTask,尽量在同一个slot上执行,这是为了提升程序的执行效率。这就是槽共享
相同的Task下的SubTask,一定不会在同一个slot上执行,这是为了充分利用集群资源,达到并行效果。
二、Flink代码案例(掌握)
1、需求
使用代码来实现Flink的wordcount案例。
SparkCore版的WordCount实现过程: 1- 创建顶级对象SparkContext 2- 数据输入 3- 数据处理 3.1- 文本内容切分: flatMap 3.2- 数据格式转换: map 3.3- 分组聚合: reduceByKey 4- 数据输出 5- 释放资源
2、Flink流式程序开发流程
1- 创建流式执行环境 2- 数据输入 3- 数据处理 4- 数据输出 5- 启动流式任务
Flink中算子的分类: 1- source算子: 数据读取 2- transformation算子: 数据处理 3- sink算子: 数据输出
3、创建项目
前提条件:无论是在远程Linux环境还是本地Windows环境。要想成功开发Python版Flink,都需要有Python环境。
推荐如下的操作,在虚拟机集群的所有节点上都执行一次:
#1.保证有Python3.6、3.7或者3.8 python -V #2.安装flink依赖 python -m pip install apache-flink==1.15.4 -i https://pypi.tuna.tsinghua.edu.cn/simple
4、实现
https://nightlies.apache.org/flink/flink-docs-release-1.15/docs/dev/python/datastream_tutorial/
https://nightlies.apache.org/flink/flink-docs-release-1.15/docs/dev/python/table_api_tutorial/
4.1 批案例
from pyflink.datastream import StreamExecutionEnvironment, RuntimeExecutionMode import os os.environ['JAVA_HOME'] = '/export/server/jdk1.8.0_241' os.environ['PYSPARK_PYTHON'] = '/root/anaconda3/bin/python3' os.environ['PYSPARK_DRIVER_PYTHON'] = '/root/anaconda3/bin/python3' if __name__ == '__main__': # 1 - 创建流式执行环境 # 1.1- 得到顶级对象 env = StreamExecutionEnvironment.get_execution_environment() # 1.2- 设置运行模式:批处理 env.set_runtime_mode(RuntimeExecutionMode.BATCH) # 1.3- 设置并行度:全局并行度 env.set_parallelism(1) # 2 - 数据输入 init_ds = env.read_text_file(file_path="file:///export/data/flink_base/content.txt",charset_name="UTF-8") # 3 - 数据处理 # 3.1- 将文本内容切分得到一个个单词 """ lambda 形参1,形参2... : 单行代码 """ flatmap_ds = init_ds.flat_map(lambda line: line.split(" ")) # 3.2- 将单词转成元组 map_ds = flatmap_ds.map(lambda word: (word,1)) # 3.3- 按照单词分组 keyby_ds = map_ds.key_by(lambda tup: tup[0]) # 3.4- 对单词的次数进行聚合 """ rdd.reduceByKey(lambda agg,curr: agg+curr) """ # 错误代码 # result = keyby_ds.reduce(lambda agg,curr: agg+curr) result = keyby_ds.reduce(lambda tup1,tup2: (tup1[0],tup1[1]+tup2[1])) # 4 - 数据输出 result.print() # 5 - 启动流式任务 env.execute()
运行结果截图:
可能遇到的错误:
原因: 1- 服务器上没有安装JDK;2- 安装了JDK,但是在代码中没有明确告诉程序JDK在什么地方 解决办法: 在flink代码文件上面添加如下内容 import os os.environ['JAVA_HOME'] = '/export/server/jdk1.8.0_241' os.environ['PYSPARK_PYTHON'] = '/root/anaconda3/bin/python3' os.environ['PYSPARK_DRIVER_PYTHON'] = '/root/anaconda3/bin/python3'
4.2 流案例
from pyflink.datastream import StreamExecutionEnvironment, RuntimeExecutionMode, DataStream import os os.environ['JAVA_HOME'] = '/export/server/jdk1.8.0_241' os.environ['PYSPARK_PYTHON'] = '/root/anaconda3/bin/python3' os.environ['PYSPARK_DRIVER_PYTHON'] = '/root/anaconda3/bin/python3' if __name__ == '__main__': # 1 - 创建流式执行环境 # 1.1- 得到顶级对象 env = StreamExecutionEnvironment.get_execution_environment() # 1.2- 设置运行模式:流处理 env.set_runtime_mode(RuntimeExecutionMode.STREAMING) # 1.3- 设置并行度:全局并行度 env.set_parallelism(1) # 2 - 数据输入 init_ds = DataStream(env._j_stream_execution_environment.socketTextStream("192.168.88.161",9999)) # 3 - 数据处理 # 3.1- 将文本内容切分得到一个个单词 """ lambda 形参1,形参2... : 单行代码 """ flatmap_ds = init_ds.flat_map(lambda line: line.split(" ")) # 3.2- 将单词转成元组 map_ds = flatmap_ds.map(lambda word: (word,1)) # 3.3- 按照单词分组 keyby_ds = map_ds.key_by(lambda tup: tup[0]) # 3.4- 对单词的次数进行聚合 """ rdd.reduceByKey(lambda agg,curr: agg+curr) """ # 错误代码 # result = keyby_ds.reduce(lambda agg,curr: agg+curr) result = keyby_ds.reduce(lambda tup1,tup2: (tup1[0],tup1[1]+tup2[1])) # 4 - 数据输出 result.print() # 5 - 启动流式任务 env.execute()
运行结果截图:
可能遇到的错误:
原因: 9999端口号没有启动 解决办法: 提前在程序运行前,在node1上执行nc -lk 9999
4.3 SQL案例
import os from pyflink.datastream import StreamExecutionEnvironment from pyflink.table import StreamTableEnvironment os.environ['FLINK_HOME'] = '/export/server/flink' os.environ['JAVA_HOME'] = '/export/server/jdk1.8.0_241' os.environ['PYSPARK_PYTHON'] = '/root/anaconda3/bin/python3' os.environ['PYSPARK_DRIVER_PYTHON'] = '/root/anaconda3/bin/python3' if __name__ == '__main__': # 1- 创建流式执行任务 # 1.1- 创建顶级对象 env = StreamExecutionEnvironment.get_execution_environment() # 1.2- 得到FlinkSQL层级的顶级对象 table = StreamTableEnvironment.create(stream_execution_environment=env) # 1.3- 设置并行度:全局并行度 # env.set_parallelism(1) # 2- 数据输入 table.execute_sql(""" create table source( word varchar ) with ( 'connector'='socket', 'hostname'='192.168.88.161', 'port'='9999', 'format'='csv' ) """) # 3- 数据输出 table.execute_sql(""" create table sink( word varchar, cnt bigint ) with ( 'connector'='print' ) """) # 4- 数据处理 table.execute_sql(""" insert into sink select word, count(1) as cnt from source group by word """).wait() # 5- 启动流式任务 env.execute()
运行结果截图:
可能遇到的错误一:
原因: 代码没有找到Flink的安装目录在什么地方 解决办法: 在代码的上面添加如下内容 os.environ['FLINK_HOME'] = '/export/server/flink'
可能遇到的错误二:
原因: 需要在增删改查的语句代码后面增加wait()的方法调用
5、提交运行
5.1 开源提交
环境准备:在node1上执行即可
需要确保Flink的Standalone集群是启动的状态,如果没有启动,需要执行如下命令: cd /export/server/flink/bin ./start-cluster.sh 启动nc nc -lk 9999
提交命令:在node1上执行即可
/export/server/flink/bin/flink run -py /export/data/flink_base/flink_sql_wordcount.py 注意: 代码所在的路径要改成你自己的
运行成功截图如下:
5.2 阿里云提交
)
运行结果截图:
三、UDF(熟悉)
1、概述
UDF,user defined function,用户自定义函数。
官网如下:
https://nightlies.apache.org/flink/flink-docs-release-1.15/docs/dev/table/functions/udfs/
https://nightlies.apache.org/flink/flink-docs-release-1.15/docs/dev/python/table/udfs/python_udfs/
Flink的UDF函数可以分为如下几种类型:
- Scalar Function:标量函数,UDF,一对一。举例:split、substr、concat
- Table Function:表数据生成函数,UDTF,一对多。举例:explode、json_tuple
- Aggregate Function:聚合函数,UDAF,多对一。举例:sum、avg、max、min、count等
- Table Aggregate Function:表数据生成聚合函数,UDTAF,多对多。
2、Scalar Function
Scalar Function,UDF。就是一进一出的函数。比如map方法。
2.1 需求
实现一个类似于两数之和的sum函数,函数名:mySum 优先采用SQL来实现。 输入数据: | num1 | num2 | | 1 | 2 | | 3 | 4 | 输出结果: | num1 | num2 | result | | 1 | 2 | 3 | | 3 | 4 | 7 |
2.2 实现
import os from pyflink.datastream import StreamExecutionEnvironment from pyflink.table import StreamTableEnvironment, DataTypes from pyflink.table.udf import udf os.environ['FLINK_HOME'] = '/export/server/flink' os.environ['JAVA_HOME'] = '/export/server/jdk1.8.0_241' os.environ['PYSPARK_PYTHON'] = '/root/anaconda3/bin/python3' os.environ['PYSPARK_DRIVER_PYTHON'] = '/root/anaconda3/bin/python3' """ 实现一个类似于两数之和的sum函数,函数名:mySum """ if __name__ == '__main__': # 1- 创建流式执行环境 # 1.1- 创建顶级对象 env = StreamExecutionEnvironment.get_execution_environment() # 1.2- 得到SQL API层的顶级对象 table = StreamTableEnvironment.create(stream_execution_environment=env) # 1.3- 设置全局并行度 env.set_parallelism(1) # 2- 数据输入 table.execute_sql(""" create table source( num1 bigint, num2 bigint ) with ( 'connector'='socket', 'hostname'='192.168.88.161', 'port'='9999', 'format'='csv' ) """) # 3- 数据输出 table.execute_sql(""" create table sink( num1 bigint, num2 bigint, `result` bigint ) with ( 'connector'='print' ) """) # 4- 数据处理 # 4.1- 创建自定义Python函数 @udf(result_type=DataTypes.BIGINT()) def mySum_func(num_arg1, num_arg2): return num_arg1 + num_arg2 # 4.2- 注册 table.create_temporary_function('mySum',mySum_func) # 4.3- 调用 table.execute_sql(""" insert into sink select num1,num2,mySum(num1,num2) as `result` from source """).wait() # 5- 启动流式任务 env.execute()
运行结果截图:
3、Table Function
Table Function,表值函数,一进多出的函数。类似于Hive中的UDTF。
3.1 需求
实现一个类似于flatMap的功能(explode)的功能。数据源来自于socket。函数名:myExplode。 输入数据: | mynum | | 3 | | 4 | 输出结果: 返回<mynum,并且大于等于0的数字 | result | | 0 | | 1 | | 2 | | result | | 0 | | 1 | | 2 | | 3 |
3.2 实现
udtf在SQL语句中进行调用,语法比较特殊 格式: lateral table(UDTF函数名称(字段名称)) as 视图名称(视图中新的字段名称1,视图中新的字段名称2..视图中新的字段名称n) on true
import os from pyflink.datastream import StreamExecutionEnvironment from pyflink.table import StreamTableEnvironment, DataTypes from pyflink.table.udf import udf,udtf os.environ['FLINK_HOME'] = '/export/server/flink' os.environ['JAVA_HOME'] = '/export/server/jdk1.8.0_241' os.environ['PYSPARK_PYTHON'] = '/root/anaconda3/bin/python3' os.environ['PYSPARK_DRIVER_PYTHON'] = '/root/anaconda3/bin/python3' """ 实现一个类似于flatMap的功能(explode)的功能。数据源来自于socket。函数名:myExplode UDTF """ if __name__ == '__main__': # 1- 创建流式执行环境 # 1.1- 创建顶级对象 env = StreamExecutionEnvironment.get_execution_environment() # 1.2- 得到SQL API层的顶级对象 table = StreamTableEnvironment.create(stream_execution_environment=env) # 1.3- 设置全局并行度 env.set_parallelism(1) # 2- 数据输入 table.execute_sql(""" create table source( mynum bigint ) with ( 'connector'='socket', 'hostname'='192.168.88.161', 'port'='9999', 'format'='csv' ) """) # 3- 数据输出 table.execute_sql(""" create table sink( `result` bigint ) with ( 'connector'='print' ) """) # 4- 数据处理 # 4.1- 创建自定义的Python函数 @udtf(result_types=DataTypes.BIGINT()) def myExplode_func(num_arg): return range(num_arg) # 4.2- 注册进Flink中 # 下面2种任意使用其中一个都行 table.create_temporary_system_function('myExplode', myExplode_func) # table.create_temporary_function('myExplode', myExplode_func) # 4.3- 调用 # 错误调用 # table.execute_sql(""" # insert into sink # select # myExplode(mynum) as `result` # from source # """).wait() # 正确调用 """ udtf在SQL语句中进行调用,语法比较特殊 格式:lateral table(UDTF函数名称(字段名称)) as 视图名称(视图中新的字段名称1,视图中新的字段名称2..视图中新的字段名称n) on true """ table.execute_sql(""" insert into sink select new_field from source left join lateral table(myExplode(mynum)) as tt(new_field) on true -- Hive中UDTF函数调用:lateral view explode(split(line,' ')) t as new_field """).wait() # 5- 启动流式任务 env.execute()
运行结果截图:
可能遇到的错误一:
原因: result单词在Flink的SQL中是一个关键字 解决办法: 1- 修改result字段的名称,变成不是关键字的 2- 在result上面加上``
可能遇到的错误二:
原因: UDTF的调用在FlinkSQL中有特殊的语法要求 解决办法: 改成如下的SQL语句 insert into sink select new_field from source left join lateral table(myExplode(mynum)) as tt(new_field) on true
可能遇到的错误三:
原因: 对UDTF进行注册需要使用@udtf装饰器
可能遇到的错误四:
原因: @udtf装饰器中的参数名叫做 result_types
4、Aggregate Function
Aggregate Function,聚合函数,是多进一出的函数,类似于Hive的UDAF函数。
4.1 需求
实现一个类似于count的函数,统计词频。数据源为socket,函数名:myCount 输入数据: | word | | hello | | spark | | hello | 输出数据: | word | cnt | | hello | 2 | | spark | 1 |
4.2 实现
import os from pyflink.datastream import StreamExecutionEnvironment from pyflink.table import StreamTableEnvironment, DataTypes from pyflink.table.types import DataType from pyflink.table.udf import udf, AggregateFunction, ACC, T os.environ['FLINK_HOME'] = '/export/server/flink' os.environ['JAVA_HOME'] = '/export/server/jdk1.8.0_241' os.environ['PYSPARK_PYTHON'] = '/root/anaconda3/bin/python3' os.environ['PYSPARK_DRIVER_PYTHON'] = '/root/anaconda3/bin/python3' """ 实现一个类似于count的函数,统计词频。数据源为socket,函数名:myCount """ # 4.1- 创建udaf函数的相关代码 class MyUDAF(AggregateFunction): """ 创建累加器,也就是用来对数据进行初始化 累加器的作用:用来临时保存中间的聚合结果 """ def create_accumulator(self) -> ACC: # 返回列表,里面的0表示的是单词次数的初始值 return [0] # 从累加器中获取数据 def get_value(self, accumulator: ACC) -> T: # 这里的0是索引/下标 return accumulator[0] # 对数据进行累加。+U def accumulate(self, accumulator: ACC, *args): # 相同的单词,每来一个就对单词次数+1 # accumulator[0] = accumulator[0] + 1 accumulator[0] += 1 # 撤回累加器数据的变化。-U def retract(self, accumulator: ACC, *args): accumulator[0] = accumulator[0] - 1 # 合并多个累加器中的值,因为Flink是多线程分布式运行 def merge(self, accumulator: ACC, accumulators): for i in accumulators: accumulator[0] = accumulator[0] + i[0] # 获得最终结果的数据类型 def get_result_type(self) -> DataType: return DataTypes.BIGINT() # 获得累加器中存放的元素的数据类型 def get_accumulator_type(self) -> DataType: return DataTypes.BIGINT() if __name__ == '__main__': # 1- 创建流式执行环境 # 1.1- 创建顶级对象 env = StreamExecutionEnvironment.get_execution_environment() # 1.2- 得到SQL API层的顶级对象 table = StreamTableEnvironment.create(stream_execution_environment=env) # 1.3- 设置全局并行度 env.set_parallelism(1) # 2- 数据输入 table.execute_sql(""" create table source( word varchar ) with ( 'connector'='socket', 'hostname'='192.168.88.161', 'port'='9999', 'format'='csv' ) """) # 3- 数据输出 table.execute_sql(""" create table sink( word varchar, cnt bigint ) with ( 'connector'='print' ) """) # 4- 数据处理 # 4.2- 注册进Flink """ UDAF注册的时候,传递的是类的实例对象,也就是类名(参数) """ table.create_temporary_function('myCount',MyUDAF()) # 4.3- 调用 table.execute_sql(""" insert into sink select word, myCount(1) as cnt from source group by word """).wait() # 5- 启动流式任务 env.execute()
运行结果截图:
5、阿里云UDF
在进行函数注册时,先把函数开发好。
开发阿里云的UDF参考手册:https://help.aliyun.com/zh/flink/developer-reference/python/?spm=a2c4g.11186623.0.0.2ac522158vB92w
5.1 注册UDF函数
选择SQL开发 -> 函数选项,上传压缩包,如下图:
点击确定,如下图:
点击创建函数,提示创建成功,如下图:
到此,则函数创建成功。
5.2 使用UDF函数
- sub_string函数
#1.创建表 CREATE TEMPORARY TABLE function_udf( a VARCHAR, b INT, c INT ) WITH ( 'connector' = 'socket', 'hostname' = '172.24.24.49', 'port' = '9999', 'format' = 'csv' ); #2.查询SQL SELECT sub_string(a,2,5) FROM function_udf; 注意: hostname一定要改成自己的ECS服务器内网IP
- split函数
#1.创建表 同上,略 #2.查询SQL SELECT a,b,c,d,e FROM function_udf,lateral table(split(a)) as T(d,e);
- weight_avg函数
#1.创建表 同上,略。 #2.查询SQL SELECT weighted_avg(b,c) FROM function_udf;
四、Flink常见面试题整理
1、Flink中的部署模式?你是如何部署Flink?你的Flink的项目是用什么方式部署?
- 开源Flink:我们使用的是开源版的Flink,部署项目的时候使用的Application应用模式,给你具体说下为什么我们使用application模式进行部署,xxxx。另外我给您介绍下其他一些部署模式。Session、per-job
- 阿里云Flink:我们使用的是阿里云Flink,部署项目的时候使用的per-job,也就是job分离模式,给你具体说下为什么我们使用job分离模式进行部署,xxxx。另外我给您介绍下其他一些部署模式。Session、application
2、说一下对Flink中时间的理解?你在你的项目中是如何使用Flink进行数据统计的?
步骤一:介绍Flink中的时间分类,并且要说出每个时间代表的含义。
步骤二:结合业务举例说明如何使用3类时间。
举例:在项目中,我们使用Flink进行数据的实时ETL。但是在做数据检查和核对的时候,发现有些数据出现事件时间的缺失,排错问题发现是业务方上报数据,导致了部分事件时间的空缺。然后与业务方进行异常数据处理方式的沟通,最终确定是使用处理时间来填补事件时间空缺的情况。
3、你在项目中如何解决延迟到来的数据?如何彻底解决数据延迟到的情况?
在Flink的SQL开发中,无法解决数据超过watermark允许延迟时间后到来的数据被丢失的情况。在DataStream的开发中可以通过侧输出流解决延迟来的数据,也就是在DataStream的编程中,可以做到彻底的解决数据延迟到来的情况。
如何彻底解决数据延迟到的情况?watermark水印+侧输出流
4、介绍一下Flink的底层原理?介绍一下Flink的架构?
步骤一:从Flink的作业提交运行时的架构说起:我们开发完Flink程序以后,通过命令行或者界面提交Flink程序到集群中运行,首先第一步会启动Client客户端进程。接着Client进程将我们的Flink job通过Actor通信系统提交给JobManager,JobManager拿到任务后会分配给到具体的TaskManager来执行,并且任务运行的具体场所是TaskManager中的Slot
步骤二:我们的项目中,使用的是阿里云Flink。因此我们通过per-job模式部署Flink程序。接下来给您具体介绍下该模式下任务的底层提交流式是什么样的?
步骤三:回答Per-job作业的提交流程
5、用户自定义函数的分类
UDF:用户自定义函数,输入一行数据,得到一行数据。一对一
UDAF:用户自定义聚合函数,输入多行数据,得到一行数据。多对一
UDTF:用户自定义表数据生成函数,输入一行数据,得到多行数据。一对多