Spark SQL
一、Spark SQL架构
能够直接访问现存的Hive数据
提供JDBC/ODBC接口供第三方工具借助Spark进行数据处理
提供更高层级的接口方便处理数据
支持多种操作方式:SQL、API编程
- API编程:Spark SQL基于SQL开发了一套SQL语句的算子,名称和标准的SQL语句相似
支持Parquet、CSV、JSON、RDBMS、Hive、HBase等多种外部数据源。(掌握多种数据读取方式)
Spark SQL核心:是RDD+Schema(算子+表结构),为了更方便我们操作,会将RDD+Schema发给DataFrame
数据回灌:用于将处理和清洗后的数据回写到Hive中,以供后续分析和使用。
BI Tools:主要用于数据呈现。
Spark Application:开发人员使用Spark Application编写数据处理和分析逻辑,这些应用可以用不同的编程语言编写,比如Python、Scala、Java等。
二、Spark SQL运行原理
- Catalyst优化器的运行流程:
- Frontend(前端)
- 输入:用户可以通过SQL查询或DataFrame API来输入数据处理逻辑。
- Unresolved Logical Plan(未解析的逻辑计划):输入的SQL查询或DataFrame转换操作会首先被转换为一个未解析的逻辑计划,这个计划包含了用户请求的所有操作,但其中的表名和列名等可能尚未解析。
- Catalyst Optimizer(Catalyst优化器) Catalyst优化器是Spark SQL的核心组件,它负责将逻辑计划转换为物理执行计划,并进行优化。Catalyst优化器包括以下几个阶段:
- Analysis(分析):将未解析的逻辑计划中的表名和列名解析为具体的元数据,这一步依赖于Catalog(元数据存储)。输出是一个解析后的逻辑计划。
- Logical Optimization(逻辑优化):对解析后的逻辑计划进行各种优化,如投影剪切、过滤下推等。优化后的逻辑计划更加高效。
- Physical Planning(物理计划):将优化后的逻辑计划转换为一个或多个物理执行计划。每个物理计划都代表了一种可能的执行方式。
- Cost Model(成本模型):评估不同物理计划的执行成本,选择代价最低的物理计划作为最终的物理计划。
- Backend(后端)
- Code Generation(代码生成):将选择的物理计划转换为可以在Spark上执行的RDD操作。这一步会生成实际的执行代码。
- RDDs:最终生成的RDD操作被执行,以完成用户请求的数据处理任务。
- 一个SQL查询在Spark SQL中的优化流程
SELECT name FROM( SELECT id, name FROM people ) p WHERE p.id = 1
- Filter下压:将Filter操作推到更靠近数据源的位置,以减少不必要的数据处理。
- 合并Projection:减少不必要的列选择
- IndexLookup return:name:如果存在索引,可以直接通过索引查找并返回
name
列
三、Spark SQL API
SparkContext:Spark应用的主入口,代表了与Spark集群的连接。
SQLContext:Spark SQL的编程入口,使用SQLContext可以运行SQL查询、加载数据源和创建DataFrame。
HiveContext:SQLContext的一个子集,可以执行HiveQL查询,并且可以访问Hive元数据和UDF。
SparkSession:Spark2.0后推荐使用,合并了SQLContext和HiveContext,提供了与Spark所有功能交互的单一入口点。
创建一个SparkSession就包含了一个SparkContext。
若同时需要创建SparkContext和SparkSession,必须先创建SparkContext再创建SparkSession。否则,会抛出如下异常,提示重复创建SparkContext:
详细解释
创建SparkSession的代码
val conf: SparkConf = new SparkConf() .setMaster("local[4]") .setAppName("SparkSql") def main(args: Array[String]): Unit = { SparkSession.builder() .config(conf) .getOrCreate() }
优化:减少创建代码,SparkSessionBuilder工具类
package com.ybg import org.apache.spark.{SparkConf, SparkContext} import org.apache.spark.sql.SparkSession // 封装SparkSession的创建方法 class SparkSessionBuilder(master:String,appName:String){ lazy val config:SparkConf = { new SparkConf() .setMaster(master) .setAppName(appName) } lazy val spark:SparkSession = { SparkSession.builder() .config(config) .getOrCreate() } lazy val sc:SparkContext = { spark.sparkContext } def stop(): Unit = { if (null != spark) { spark.stop() } } } object SparkSessionBuilder { def apply(master: String, appName: String): SparkSessionBuilder = new SparkSessionBuilder(master, appName) }
四、Spark SQL依赖
pom.xml
<properties> <maven.compiler.source>8</maven.compiler.source> <maven.compiler.target>8</maven.compiler.target> <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding> <spark.version>3.1.2</spark.version> <spark.scala.version>2.12</spark.scala.version> <hadoop.version>3.1.3</hadoop.version> <mysql.version>8.0.33</mysql.version> <hive.version>3.1.2</hive.version> <hbase.version>2.3.5</hbase.version> <jackson.version>2.10.0</jackson.version> </properties> <dependencies> <!-- spark-core --> <dependency> <groupId>org.apache.spark</groupId> <artifactId>spark-core_${spark.scala.version}</artifactId> <version>${spark.version}</version> </dependency> <!-- spark-sql --> <dependency> <groupId>org.apache.spark</groupId> <artifactId>spark-sql_${spark.scala.version}</artifactId> <version>${spark.version}</version> </dependency> 若出现如下异常: Caused by: com.fasterxml.jackson.databind.JsonMappingException: Scala module 2.10.0 requires Jackson Databind version >= 2.10.0 and < 2.11.0 追加如下依赖: --> <!-- jackson-databind --> <dependency> <groupId>com.fasterxml.jackson.core</groupId> <artifactId>jackson-databind</artifactId> <version>2.10.0</version> </dependency> <!-- mysql --> <dependency> <groupId>com.mysql</groupId> <artifactId>mysql-connector-j</artifactId> <version>${mysql.version}</version> </dependency> </dependencies>
log4j.properties
log4j.properties应该放在资源包下。
log4j.rootLogger=ERROR, stdout, logfile # 设置可显示的信息等级 log4j.appender.stdout=org.apache.log4j.ConsoleAppender log4j.appender.stdout.layout=org.apache.log4j.PatternLayout log4j.appender.stdout.layout.ConversionPattern=%d %p [%c] - %m%n log4j.appender.logfile=org.apache.log4j.FileAppender log4j.appender.logfile.File=log/spark_first.log log4j.appender.logfile.layout=org.apache.log4j.PatternLayout log4j.appender.logfile.layout.ConversionPattern=%d %p [%c] - %m%n
五、Spark SQL数据集
1、DataSet
- 简介:
- 从Spark 1.6开始引入的新的抽象。
- 是特定领域对象中的强类型集合。
- 可以使用函数式编程或SQL查询进行操作。
- 等于RDD + Schema。
2、DataFrame
- 简介:
- DataFrame是特殊的DataSet:
DataFrame=DataSet[Row]
,行对象的集合,每一行就是一个行对象。 - 类似于传统数据的二维表格。
- DataFrame是特殊的DataSet:
- 特性:
- Schema:在RDD基础上增加了Schema,描述数据结构信息
- 嵌套数据类型:支持
struct
,map
,array
等嵌套数据类型。 - API:提供类似SQL的操作接口。
详细解释
创建DataSet的代码
val spark: SparkSession = SparkSession.builder().config(conf).getOrCreate() // 提供了一组隐式转换,这些转换允许将Scala的本地集合类型(如Seq、Array、List等)转换为Spark的DataSet。 import spark.implicits._ val dsPhone: Dataset[Product] = spark.createDataset(Seq( Product(1, "Huawei Mate60", 5888.0f), Product(2, "IPhone", 5666.0f), Product(3, "OPPO", 1888.0f) )) dsPhone.printSchema() /** * root * |-- id: integer (nullable = false) * |-- name: string (nullable = true) * |-- price: float (nullable = false) */
创建DataFrame的代码
读取CSV文件
对于CSV文件,在构建DataFrame之前,必须要先创建一个Schema,再根据文件类型分不同情况进行导入。(读取JSON文件或者数据库表都并不需要)
注意:必须要
import spark.implicits._
,导入隐式类,才能够识别一些隐式转换,否则会报错。CSV文件在创建DataFrame时,可以选择尽量模仿Hive中的OpenCSVSerDe的
val spark: SparkSession = SparkSession.builder() .config(conf) .getOrCreate() import spark.implicits._ val schema: StructType = StructType( Seq( StructField("user_id", LongType), StructField("locale", StringType), StructField("birthYear", IntegerType), StructField("gender", StringType), StructField("joinedAt", StringType), StructField("location", StringType), StructField("timezone", StringType) ) ) val frmUsers: DataFrame = spark.read .schema(schema) .option("separator", ",") // 指定文件分割符 .option("header", "true") // 指定CSV文件包含表头 .option("quoteChar", "\"") .option("escapeChar", "\\") .csv("C:\\Users\\lenovo\\Desktop\\users.csv") .repartition(4) .cache()
- 读取JSON文件
val frmUsers2: DataFrame = spark.read.json("hdfs://single01:9000/spark/cha02/users.json") frmUsers2.show()
- 读取数据库表
val url = "jdbc:mysql://single01:3306/test_db_for_bigdata" // 数据库连接地址 val mysql = new Properties() mysql.setProperty("driver", "com.mysql.cj.jdbc.Driver") mysql.setProperty("user", "root") mysql.setProperty("password", "123456") spark .read .jdbc(url,"test_table1_for_hbase_import",mysql) // (url,TableName,连接属性) .show(100)
六、Spark_SQL的两种编码方式
val spark: SparkSession = SparkSession.builder() .config(conf) .getOrCreate() import spark.implicits._ val schema: StructType = StructType( Seq( StructField("user_id", LongType), StructField("locale", StringType), StructField("birthYear", IntegerType), StructField("gender", StringType), StructField("joinedAt", StringType), StructField("location", StringType), StructField("timezone", StringType) ) ) val frmUsers: DataFrame = spark.read .schema(schema) .option("separator", ",") // 指定文件分割符 .option("header", "true") // 指定CSV文件包含表头 .option("quoteChar", "\"") .option("escapeChar", "\\") .csv("C:\\Users\\lenovo\\Desktop\\users.csv") .repartition(4) .cache()
此处已经创建好了DataFrame
1. 面向标准SQL语句(偷懒用)
frmUsers.registerTempTable("user_info") // 此方法已过期 spark.sql( """ |select * from user_info |where gender='female' |""".stripMargin) .show(10)
2. 使用Spark中的SQL算子(更规范)
frmUsers .where($"birthYear">1990) .groupBy($"locale") .agg( count($"locale").as("locale_count"), round(avg($"birthYear"),2).as("avg_birth_year") ) .where($"locale_count">=10 and $"avg_birth_year">=1993) .orderBy($"locale_count".desc) .select( $"locale", $"locale_count", $"avg_birth_year", dense_rank() .over(win) .as("rnk_by_locale_count"), lag($"locale_count",1) .over(win) .as("last_locale_count") ) .show(10)
七、常用算子
1.基本SQL模板
select col,cols*,agg* where conditionCols group by col,cols* having condition order by col asc|desc limit n
2.select
select
语句在代码的开头可以不写,因为有后续的类似where
和group by
语句已经对列进行了操作,指明了列名。如果后续有select
语句,则优先按照后面的select
语句进行。
frmUsers.select( $"locale",$"locale_count" )
3.agg
.agg( count($"locale").as("locale_count"), round(avg($"birthYear"),2).as("avg_birth_year") )
4.窗口函数
- over子句
注意:over子句中的分区信息是可以被重用的
val win: WindowSpec = Window.partitionBy($"gender").orderBy($"locale_count".desc) frmUsers ... .select( dense_rank() .over(win) .as("rnk_by_locale_count") )
5.show
show(N)表示显示符合条件的至多N条数据。(不是取前N条再提取出其中符合条件的数据)
frmUsers ... .show(10)
6.条件筛选 where
newCol:Column = $"cus_state".isNull newCol:Column = $"cus_state".isNaN newCol:Column = $"cus_state".isNotNull newCol:Column = $"cus_state".gt(10) <=> $"cus_state">10 newCol:Column = $"cus_state".geq(10) <=> $"cus_state">=10 newCol:Column = $"cus_state".lt(10) <=> $"cus_state"<10 newCol:Column = $"cus_state".leq(10) <=> $"cus_state"<=10 newCol:Column = $"cus_state".eq(10) <=> $"cus_state"===10 newCol:Column = $"cus_state".ne(10) <=> $"cus_state"=!=10 newCol:Column = $"cus_state".between(10,20) newCol:Column = $"cus_state".like("张%") newCol:Column = $"cus_state".rlike("\\d+") newCol:Column = $"cus_state".isin(list:Any*) newCol:Column = $"cus_state".isInCollection(values:Itrable[_]) 多条件: newCol:Column = ColOne and ColTwo newCol:Column = ColOne or ColTwo
在Spark SQL中,不存在Having子句,Where子句的实际作用根据相对于分组语句的前后决定。
7.分组
// 多重分组 /** rollup的效果: select birthYear,count(*) from user group by birthYear union all select gender,birthYear,count(*) from user group by gender,birthYear 存在"字段不对应"的情况: 空缺的字段会自动补全为null */ frmUsers .rollup("gender", "birthYear") .count() .show(100)
// 为了方便查找到每个数据行所对应的分组方式 spark.sql( """ |select grouping__id,gender,birthYear,count(8) as cnt from user_info |group by gender,birthday, |grouping sets(gender,birthday,(gender,birthYear)) |""".stripMargin) .show(100) // 这里的group by子句定义了分组的列,到grouping sets明确指定了分组的组合 // 因而,在数仓设计的过程中,我们能够对不同分组依据下的不同数据依据grouping__id做分区。
RollUp
和Cube
的区别假设有三列:
1
,2
,3
,使用CUBE(1, 2, 3)
,会生成以下组合:GROUP BY ()
(不分组,整体聚合)GROUP BY (1)
GROUP BY (2)
GROUP BY (3)
GROUP BY (1, 2)
GROUP BY (1, 3)
GROUP BY (2, 3)
GROUP BY (1, 2, 3)
ROLLUP
生成的分组组合是层级的,它从最详细的分组开始,一步步减少分组的列,直到整体聚合。假设有三列:
1
,2
,3
,使用ROLLUP(1, 2, 3)
,会生成以下组合:GROUP BY (1, 2, 3)
(最详细的分组)GROUP BY (1, 2)
GROUP BY (1)
GROUP BY ()
(不分组,整体聚合)
8.关联查询
val frmClass: DataFrame = spark.createDataFrame( Seq( Class(1, "yb12211"), Class(2, "yb12309"), Class(3, "yb12401") ) ) val frmStu: DataFrame = spark.createDataFrame( Seq( Student("henry", 1), Student("ariel", 2), Student("jack", 1), Student("rose", 4), Student("jerry", 2), Student("mary", 1) ) ) // 1.笛卡尔积(默认情况下) frmStu.as("S") .join(frmClass.as("C")) .show(100) /** +-----+-------+-------+---------+ | name|classId|classId|className| +-----+-------+-------+---------+ |henry| 1 | 1 | yb12211| |henry| 1 | 2 | yb12309| |henry| 1 | 3 | yb12401| |ariel| 2 | 1 | yb12211| |ariel| 2 | 2 | yb12309| |ariel| 2 | 3 | yb12401| | jack| 1 | 1 | yb12211| | jack| 1 | 2 | yb12309| | jack| 1 | 3 | yb12401| | rose| 4 | 1 | yb12211| | rose| 4 | 2 | yb12309| | rose| 4 | 3 | yb12401| |jerry| 2 | 1 | yb12211| |jerry| 2 | 2 | yb12309| |jerry| 2 | 3 | yb12401| | mary| 1 | 1 | yb12211| | mary| 1 | 2 | yb12309| | mary| 1 | 3 | yb12401| +-----+-------+-------+---------+ */ // 2.内连接 frmStu.as("S") .join(frmClass.as("C"), $"S.classId" === $"C.classId","inner") .show(100) /** +-----+-------+-------+---------+ | name|classId|classId|className| +-----+-------+-------+---------+ |henry| 1 | 1 | yb12211| |ariel| 2 | 2 | yb12309| | jack| 1 | 1 | yb12211| |jerry| 2 | 2 | yb12309| | mary| 1 | 1 | yb12211| +-----+-------+-------+---------+ */ // 启用using:使用Seq("Column")代表关联字段 frmStu.as("S") .join(frmClass.as("C"), Seq("classId"),"right") .show(100) // 3.外连接 frmStu.as("S") .join(frmClass.as("C"), $"S.classId" === $"C.classId","outer") // left | right | outer .show(100) /** +-----+-------+-------+---------+ | name|classId|classId|className| +-----+-------+-------+---------+ |henry| 1 | 1 | yb12211| | jack| 1 | 1 | yb12211| | mary| 1 | 1 | yb12211| | null| null | 3 | yb12401| | rose| 4 | null | null| |ariel| 2 | 2 | yb12309| |jerry| 2 | 2 | yb12309| +-----+-------+-------+---------+ */ // 4.反连接:返回左数据集中所有没有关联字段匹配记录的左数据集的行 frmStu.as("S") .join(frmClass.as("C"), $"S.classId" === $"C.classId","anti") .show(100) /** +----+-------+ |name|classId| +----+-------+ |rose| 4 | +----+-------+ */ // 5.半连接:返回左数据集中所有有关联字段匹配记录的左数据集的行 frmStu.as("S") .join(frmClass.as("C"), $"S.classId" === $"C.classId","semi") .show(100) /** +-----+-------+ | name|classId| +-----+-------+ |henry| 1 | |ariel| 2 | | jack| 1 | |jerry| 2 | | mary| 1 | +-----+-------+ */
9.排序
frmStu.orderBy(cols:Column*)
10.数据截取
frmStu.tail(n:Int) frmStu.take(n:Int)
八.SQL函数
常用函数
$"NAME" = col("NAME") // 取列值 as("ALIAS_NAME") // 别名 as(alias:Seq[String]) // 多个别名 when(CONDITION,V1) // 条件 .when(...) .otherwise(VN) lit(V) // 常量列 withColumn(colName:String,col:Column) // 扩展列(通常用于使用窗口函数做扩展列) cast(DataType) // 类型转换
常用函数案例
spark.createDataFrame(Seq( Test(1,Array("money","freedom"),Map("java"->85,"c++"->92)), Test(2,Array("beauty","writing"),Map("math"->91,"English"->88)), Test(3,Array("movie","draw"),Map("Sql"->100,"LLM"->77)) )) // 多个explode不能写在一个select中 .select($"id",explode($"hobbies").as("hobby"),$"scores") .select($"id",$"hobby",explode($"scores").as(Seq("course","score"))) .select($"id",$"hobby",$"course",$"score".cast("Integer")) .withColumn("score_rank", when($"score">=90,lit("A"))) when($"score">=80,lit("B")) when($"score">=70,lit("C")) when($"score">=60,lit("D")) .otherwise(lit("E"))
集合函数
array size(collectCol:Column) // 计算数组大小 array(cols:Column*) // 一行中的多列转为单列数组类型 array_sort(arrayCol:Column) // 对数组列中的元素进行排序 array_contains(arrayCol:Column,value:Any) // 依次判断数组列的各个元素是否含有特定值 array_distinct(arrayCol:Column) // 对数组列的各个元素进行去重并返回去重后的结果 array_join(arrayCol:Column,sep:String,nullReplacement:String) // 对数组列的各个元素进行拼接 array_except(arrayCol:Column) array_intersect(arrayCol:Column) array_union(arrayCol:Column) map map_keys(mapCol:Column) map_values(mapCol:Column) map_entries(mapCol:Column)
集合函数案例
data.select($"id",size($"hobbies").as("hobbies_cnt")).show() data.select($"id",array_sort($"hobbies").as("hobbies_sort")).show() data.select($"id",array_contains($"hobbies","money")).show() data.select($"id",array_distinct($"hobbies").as("unique_hobbies")).show() data.select($"id",array_join($"hobbies",",","Unknown Value").as("union_hobby")).show() data.withColumn("next_hobbies",lead($"hobbies",1) over(Window.orderBy("id"))) .where($"next_hobbies".isNotNull) // 提前做条件筛选 .select( array_intersect($"hobbies",$"next_hobbies").as("intersect_hobbies") ) .show(10) data.select($"id", map_keys($"scores").as("course_list"), map_values($"scores").as("scores"), map_entries($"scores").as("course_score_list") ).show() // 考java的学生人数有多少 val num: Long = data.select( array_contains(map_keys($"scores"), "java").as("isJava") ).filter($"isJava").count()
字符串函数
// 提取 // 1、提取 json json_tuple(jsonCol:Column, fields:String*) // $"jsonString" => field1,field2 获取单层Json字段 get_json_object(jsonCol:Column, path:String) // $"jsonString" => $.field1[.field2] 获取多层嵌套Json字段 // 2、正则分组 regexp_extract(col:Column, pattern:String, groupId:Int) // 3、分裂与截取 split(col:Column,pattern:String) substring(col:Column,pos:Int,len:Int) substring_index(col:Column,sep:String,groupId:Int) // groupId +N 从左向右前N个 // groupId -N 从右向左前N个 // 第N个 substring_index(substring_index(COL,SEP,+N),SEP,-1) // 4、子字符串在字段中的位置(表示子字符串的第一个字符在字符串中的索引位置) locate(subStr:String,col:Column) // 有则>0,否则=0, instr(col:Column,subStr:String) // 5、字符串拼接 concat(cols:Column*) concat_ws(sep:String,cols:Column*) // 6、内容长度 length(col:Column) // 字符长度 // 字节长度,未提供算子,需要通过 spark.sql(""" select octet_length(...)""") 实现 // 7、定长填充 lpad(col:Column,len:Int,pad:String) rpad(col:Column,len:Int,pad:String) // 8、清除两端空格 ltrim(col:Column) rtrim(col:Column) trim(col:Column) // 9、大小写转换 initcap(col:Column) // 每个单词首字母大写 upper(col:Column) // 全大写 lower(col:Column) // 全小写 hash(col:Column) // 去哈希值 regexp_replace(col:Column,pattern:String,replace:String) // 正则替换 translate(col:Column,from:String,to:String) // 按字母转换 reverse(col:Column) // 翻转 // 10、转码 encode(col:Column, charSet:String) decode(col:Column, charSet:String) // 11、非对称加密 sha1(col:Column) md5(col:Column)
字符串函数案例
val frm: DataFrame = spark .createDataFrame(Seq( Json(1, """{"name":"henry","age":22,"hobbies":["beauty","money","power"],"address":{"province":"jiangsu","city":"nanjing"}}"""), Json(2, """{"name":"jack","age":23,"hobbies":["beauty","power"],"address":{"province":"jiangsu","city":"wuxi"}}"""), Json(3, """{"name":"tom","age":24,"hobbies":["beauty","money"],"address":{"province":"jiangsu","city":"yancheng"}}""") )) frm.select($"id", json_tuple($"json","name","age","hobbies").as(Seq("name","age","hobbies")), get_json_object($"json","$.address.province").as("province"), get_json_object($"json","$.address.city").as("city") ).show(10) // 通过正则提取获取特定的日志信息 val regex_line = "(.*?) (INFO|WARN|ERROR) (.*?):(.*)" val regex_log = "^\\d{4}-\\d{2}-\\d{2} \\d{2}:\\d{2}:\\d{2},\\d{3} (INFO|WARN|ERROR) .*" val frm: DataFrame = spark .read .text("spark-warehouse/datanode.log") .toDF("line") frm .where($"line".rlike(regex_log)) .select( regexp_extract($"line",regex_line,1).as("log_in_time"), regexp_extract($"line",regex_line,2).as("log_type"), regexp_extract($"line",regex_line,3).as("log_full_pack"), regexp_extract($"line",regex_line,4).as("log_detail") ) // 获取错误日志信息中错误类别及其所占数量 .where($"log_type".equalTo("ERROR")) .groupBy($"log_detail") .count() .show(100)
Spark自定义函数流程
自定义函数流程:定义-注册-调用
建议将自定义函数实现,单独建对象保存
import java.nio.charset.{StandardCharsets} import java.util.Base64 import javax.crypto.Cipher import javax.crypto.spec.SecretKeySpec object SparkUtil { /** * 处理密钥 * @param secret 密钥 */ private def secretInit(secret:String)={ // 对密钥长度进行约束 val allowNumBits: Array[Int] = Array(16, 24, 32) // 如果密钥长度符合,将密钥转换为AES密钥对象 if (allowNumBits.contains(secret.size)) { new SecretKeySpec( secret.getBytes(StandardCharsets.UTF_8),"AES") }else{ throw new RuntimeException( s"AES secret size of numBits ${secret.size} not in permitted values (${allowNumBits.mkString(",")})") } } /** * 加密函数 * @param src 源数据 * @param secret 密钥 */ def encrypt(src:String,secret:String)={ // 获取加密算法实例 val cipher: Cipher = Cipher.getInstance("AES") // 初始化加密模式,使用给定的密钥(需要先用key()对密钥进行处理) cipher.init(Cipher.ENCRYPT_MODE,secretInit(secret)) // 执行加密操作 val bytes: Array[Byte] = cipher.doFinal(src.getBytes(StandardCharset.UTF_8)) // 返回加密后的数据 Base64.getEncoder().encodeToString(bytes) } /** * 解密函数 * @param dest 待解密数据 * @param secret 密钥 */ def decrypt(dest:String,secret:String)={ val cipher: Cipher = Cipher.getInstance("AES") cipher.init(Cipher.DECRYPT_MODE,secretInit(secret)) val bytes: Array[Byte] = cipher.doFinal( Base64.getDecoder.decode(dest)) new String(bytes, StandardCharsets.UTF_8) } }
在 Spark 环境下导入对象实现的方法,并在 SparkSession 中注册 UDF 函数
import core.SparkUtil.{encrypt,decrypt} spark.udf.register( "aes_encrypt", (src:String,secret:String) =>encrypt(src, secret),StringType) spark.udf.register( "aes_decrypt", (src:String,secret:String) =>decrypt(src, secret),StringType)
在 SparkSql 中调用注册函数
val frm: DataFrame = spark.createDataFrame(Seq( Test(1,Array("money","freedom"),Map("java"->85,"mysql"->67)), Test(2,Array("beauty","beauty"),Map("java"->72,"mysql"->90)), Test(3,Array("sports","beauty"),Map("java"->76,"html"->52)) )) val secret = "henryyb2211ariel" val frmEncrypt: DataFrame = frm .select($"id", callUDF( "aes_encrypt", array_join($"hobbies", ","), lit(secret) ).as("encrypted_hobbies") ) val frmDecrypt: DataFrame = frmEncrypt .select($"id", split( callUDF( "aes_decrypt", $"encrypted_hobbies", lit(secret) ), "," ).as("hobbies") ).show()