文章目录
模块 1: PySpark 简介
1.1 什么是 PySpark?
PySpark 的定义和特点
PySpark
是 Apache Spark 提供的 Python API,允许使用 Python 编程语言访问 Spark 的分布式计算引擎。通过 PySpark,开发者可以使用 Python 进行大规模数据处理、分析、机器学习和图计算等任务,充分发挥 Spark 引擎的性能和分布式计算能力。PySpark 与 Spark 的关系
PySpark
是Spark
的一个特定版本,是 Spark 为 Python 提供的 API。它允许开发者使用 Python 编程语言访问 Spark 引擎,从而能够利用 Spark 引擎的分布式计算能力进行大规模数据处理、机器学习、流式处理等任务。RDD
1.2 PySpark 生态系统
PySpark 的组件和模块
Spark Core:
描述: Spark 的核心组件,提供了分布式任务调度、内存管理和错误恢复等基本功能。
重要概念: 弹性分布式数据集(RDD)、SparkContext。
Spark SQL:
- 描述: 提供了使用 SQL 查询语言进行结构化数据处理的功能。
Spark Streaming:
- 描述: 用于实时数据流处理的模块,支持从多个数据源接收实时数据流。
模块 2: PySpark 基础
2.1 PySpark 安装和配置
安装 PySpark
1、安装java
2、安装 Apache Spark
3、设置环境变量【export SPARK_HOME=/path/to/spark export PATH= P A T H : PATH: PATH:SPARK_HOME/bin】
4、安装 Python
5、安装 PySpark【pip install pyspark】
6、pyspark进入互动行【from pyspark.sql import SparkSession spark = SparkSession.builder.appName(“example”).getOrCreate()】
配置 PySpark 环境
export SPARK_HOME=/path/to/spark
export PATH= P A T H : PATH: PATH:SPARK_HOME/bin
export PYSPARK_PYTHON=/path/to/python
export PYSPARK_DRIVER_PYTHON=/path/to/python
2.2 PySpark 编程基础
RDD (弹性分布式数据集) 简介
RDD
(Resilient Distributed Dataset,弹性分布式数据集)是 Spark 中的基本数据抽象,用于在分布式计算环境中表示和处理数据。RDD 具有弹性(Resilient)和分布式(Distributed)的特性,使其能够有效地处理大规模数据集。PySpark 操作(Transformation 和 Action)
转换算子:
Map【rdd = sc.parallelize([1, 2, 3, 4, 5]) squared_rdd = rdd.map(lambda x: x*x)】
filter【even_rdd = rdd.filter(lambda x: x % 2 == 0)】
flatmap【words_rdd = rdd.flatMap(lambda x: str(x).split(“,”))】
行动算子:
collect: 将 RDD 中的所有元素收集到一个列表中。【collected_data = squared_rdd.collect()】
count: 返回 RDD 中的元素数量。【count = rdd.count()】
reduce: 使用给定的函数逐个减少 RDD 中的元素。【sum_result = rdd.reduce(lambda x, y: x + y)】
foreach: 对 RDD 中的每个元素应用一个函数,通常用于在集群中执行分布式操作。
2.3 PySpark SQL 基础
DataFrame 和 Dataset 简介
语言支持: DataFrame: DataFrame 可以在多种编程语言中使用, 是一个带有结构信息的分布式数据集,包括 Python、Scala、Java 和 R。 Dataset: Dataset 主要是 Scala 和 Java 中的概念,在 Python 中的支持相对较少。 编码区别: # DataFrame API 示例 df.select("column1", "column2").filter(df["column3"] > 10).show() # Dataset API 示例 ds.select("column1", "column2").filter("column3 > 10").show()
Spark SQL 查询
- PySpark 提供了类似 SQL 的查询语言,可以使用 DataFrame API 或者直接运行 SQL 查询。
#dataframe过滤操作 df.select("column1", "column2").filter(df["column3"] > 10) #创建临时表 df.createOrReplaceTempView("my_table") 1、spark.sql("业务Sql").show 2、spark.sql("select * from table [join other_table]").createOrReplaceTempView("tmp1") 3、jgDF = spark.sql("select nsrsbh from table ").toDF("nsrsbh") 4、jgDF.write.mode(SaveMode.Overwrite[Append]).partitionBy("nd[分区字段]").format("orc").saveAsTable("sjjsq_db.tablename")
内置函数和UDF (用户定义函数)
【PySpark:】
def check_hj(hj): for sublist in result: if hj in sublist: return "NIL" return "N" check_hj_udf = udf(check_hj, StringType())
【Scala:】
object getCwsjJeFj { //自定义UDF函数 val getCwsjjeFj = (jyje: Double) => { if (jyje > 0 && jyje < 500000) { "A" } else if (0 == jyje) { "Y" } else { "Z" } } } spark.sqlContext.udf.register("getJyjeFj", getCwsjjeFj)
模块 3: 数据处理和分析
3.1 数据导入和导出
读取和写入各种数据格式
- 读取数据
- 读取text【text_data = spark.read.text(“path/to/textfile”)】
- 读取csv 【csv_data = spark.read.csv(“path/to/csvfile”, header=True, inferSchema=True)】
- 读取json【csv_data = spark.read.csv(“path/to/csvfile”, header=True, inferSchema=True)】
- 读取orc【orc_data = spark.read.orc(“path/to/orcfile”)】
- 读取delta lake【delta_data = spark.read.format(“delta”).table(“your_table_name”)】
- 写入数据
- 写入text【text_data.write.text(“path/to/output”)】
- 写入csv【csv_data.write.csv(“path/to/output”, header=True)】
- 写入json【json_data.write.json(“path/to/output”)】
- 写入orc【orc_data.write.orc(“path/to/output”)】
- 写入delta lake【delta_data.write.format(“delta”).save(“path/to/output”)】
- 读取数据
pandas读取一样可行
-
import pandas as pd # 读取 CSV 文件 df = pd.read_csv("path/to/your/file.csv") # 显示 DataFrame 的前几行 print(df.head())
-
外部数据源的连接
from pyspark.sql import SparkSession # 创建 SparkSession spark = SparkSession.builder.appName("JDBCExample").getOrCreate() # 定义连接属性 url = "jdbc:mysql://hostname:port/database_name" properties = {"user": "username", "password": "password", "driver": "com.mysql.jdbc.Driver"} # 从 JDBC 数据源读取数据 df = spark.read.jdbc(url=url, table="your_table_name", properties=properties) # 将数据写入 JDBC 数据源 df.write.jdbc(url=url, table="your_table_name", mode="overwrite", properties=properties) # 停止 SparkSession spark.stop()
3.2 数据清理和转换
数据类型转换
- 使用代码
from pyspark.sql import SparkSession from pyspark.sql.types import IntegerType # 创建 SparkSession spark = SparkSession.builder.appName("DataTypeConversionExample").getOrCreate() # 示例数据框 data = [("Alice", "25"), ("Bob", "30"), ("Charlie", "35")] columns = ["Name", "Age"] df = spark.createDataFrame(data, columns) # 显示原始数据框 print("Original DataFrame:") df.show() # 将 "Age" 列的数据类型转换为整数 df_converted = df.withColumn("Age", df["Age"].cast(IntegerType())) # 显示转换后的数据框 print("\nDataFrame after data type conversion:") df_converted.show() # 停止 SparkSession spark.stop()
- 使用SQL表达式进行转换
from pyspark.sql import SparkSession # 创建 SparkSession spark = SparkSession.builder.appName("SQLDataTypeConversionExample").getOrCreate() # 示例数据框 data = [("Alice", "25"), ("Bob", "30"), ("Charlie", "35")] columns = ["Name", "Age"] df = spark.createDataFrame(data, columns) # 使用 SQL 表达式将 "Age" 列的数据类型转换为整数 df.createOrReplaceTempView("my_table") df_converted_sql = spark.sql("SELECT Name, CAST(Age AS INT) AS Age FROM my_table") # 显示转换后的数据框 print("DataFrame after data type conversion using SQL expression:") df_converted_sql.show() # 停止 SparkSession spark.stop()
数据筛选
- 筛选
from pyspark.sql import SparkSession # 创建 SparkSession spark = SparkSession.builder.appName("FilterExample").getOrCreate() # 示例数据框 data = [("Alice", 25, "F"), ("Bob", 30, "M"), ("Charlie", 35, "M")] columns = ["Name", "Age", "Gender"] df = spark.createDataFrame(data, columns) # 显示原始数据框 print("Original DataFrame:") df.show() # 筛选年龄大于等于 30 的行 filtered_df = df.filter(df["Age"] >= 30) # 显示筛选后的数据框 print("\nDataFrame after filtering:") filtered_df.show() # 停止 SparkSession spark.stop()
3.3 数据聚合和分组
聚合函数的使用
from pyspark.sql import SparkSession from pyspark.sql.functions import count, sum, avg, min, max # 创建 SparkSession spark = SparkSession.builder.appName("AggregationExample").getOrCreate() # 示例数据框 data = [("Alice", 25), ("Bob", 30), ("Charlie", 35)] columns = ["Name", "Age"] df = spark.createDataFrame(data, columns) # 显示原始数据框 print("Original DataFrame:") df.show() # 计算总行数 row_count = df.count() print(f"\nRow count: {row_count}") # 计算年龄列的总和、平均值、最小值和最大值 result = df.agg(sum("Age").alias("TotalAge"), avg("Age").alias("AverageAge"), min("Age").alias("MinAge"), max("Age").alias("MaxAge")) # 显示聚合结果 result.show() # 停止 SparkSession spark.stop()
分组和汇总数据
from pyspark.sql import SparkSession from pyspark.sql.functions import sum # 创建 SparkSession spark = SparkSession.builder.appName("GroupByAggregationExample").getOrCreate() # 示例数据框 data = [("Alice", "Math", 90), ("Alice", "English", 95), ("Bob", "Math", 88), ("Bob", "English", 92), ("Charlie", "Math", 75), ("Charlie", "English", 80)] columns = ["Name", "Subject", "Score"] df = spark.createDataFrame(data, columns) # 显示原始数据框 print("Original DataFrame:") df.show() # 按姓名分组,计算每个姓名的总分 grouped_df = df.groupBy("Name").agg(sum("Score").alias("TotalScore")) # 显示分组聚合结果 grouped_df.show() # 停止 SparkSession spark.stop()
模块 4: PySpark Streaming
5.1 实时数据处理
流式处理简介
实时处理: 流式处理系统通常以近实时的速度处理数据,允许在数据产生后迅速作出反应,而不需要等待整个批次数据的到达。
窗口处理: 为了更好地控制和分析数据流,流处理系统通常支持窗口处理,允许将数据流划分为固定大小的时间窗口或滑动窗口,并在每个窗口上执行操作。
事件驱动: 流式处理通常是事件驱动的,系统能够在接收到新事件时触发相应的处理逻辑。
PySpark Streaming 架构
- 数据通过 Receivers 从输入源接收并划分成微批。
- Job Scheduler 将微批中的转换操作组织成 Spark 作业。
- 作业由 Worker Node 执行,计算结果存储在 DStream 中。
- Output Operations 将最终结果输出到外部系统。
模块 5: 性能优化和调试
6.1 PySpark 作业调优
合理设置资源配置:
**Executor Memory 和 Executor Cores:** 根据集群的硬件配置和作业的需求,合理设置每个 Executor 的内存和核心数。 **Driver Memory:** 确保为 Driver Program 分配足够的内存。 【sudo -u root /opt/spark/bin/spark-submit --master spark://centos1:7077 --driver-memory=4G --executor-memory 8g --conf spark.cores.max=28 test.py】
持久化中间结果:
- 对于频繁被重复使用的数据,考虑使用
persist
或cache
方法将数据持久化到内存中,以避免重复计算。
# 例子:持久化一个RDD到内存中 rdd.persist()
- 对于频繁被重复使用的数据,考虑使用
Broadcast 变量:
- 对于小数据集,可以使用 Broadcast 变量将数据广播到每个节点,减少数据传输开销。
# 例子:使用 Broadcast 变量 broadcast_var = sc.broadcast(small_data) result = large_data.map(lambda x: x + broadcast_var.value)
调整分区数:
- 根据数据的大小和集群的配置,调整 RDD 的分区数。合理的分区数可以提高并行性。
# 例子:调整分区数 rdd = rdd.repartition(100)
使用本地化数据:
- 在处理数据时,尽量将计算推送到数据而不是将数据拉取到计算节点,以减少数据传输开销。
避免 Shuffle 操作:
- Shuffle 操作是代价昂贵的,尽量减少 Shuffle 操作的次数。例如,在使用
groupByKey
时,考虑使用reduceByKey
或aggregateByKey
。
# 例子:使用 reduceByKey 替代 groupByKey result = rdd.groupByKey().mapValues(lambda values: sum(values)) # 替换为 result = rdd.reduceByKey(lambda x, y: x + y)
- Shuffle 操作是代价昂贵的,尽量减少 Shuffle 操作的次数。例如,在使用
使用 DataFrame 和 Catalyst 优化器:
- DataFrame API 和 Catalyst 优化器可以在执行计划中进行优化,提高查询性能。
# 例子:使用 DataFrame 进行查询 df.select("column1").filter(df["column2"] > 100).groupBy("column3").agg({"column4": "avg"})
模块 6: 实际项目应用
7.1 实际场景案例分析
经验教训
import os os.environ["PYSPARK_PYTHON"]="/var/lib/hive/anaconda3/bin/python3" os.environ["PYSPARK_DRIVER_PYTHON"]="/var/lib/hive/anaconda3/bin/python3"
7.2 实战项目
设计和实施一个简单的大数据项目
# coding:utf-8 import os os.environ["PYSPARK_PYTHON"]="/var/lib/hive/anaconda3/bin/python3" os.environ["PYSPARK_DRIVER_PYTHON"]="/var/lib/hive/anaconda3/bin/python3" # 配置日志 logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s') # 创建一个日志对象 logger = logging.getLogger(__name__) """ DESC:结算模块开发任务-票据业务 NAME:xt DATE: 2023-09-13 """ spark = create_spark_session("pj_js") spark.sparkContext.setLogLevel("ERROR") def pysql(): # 设置日志级别 # spark.sparkContext.setLogLevel("INFO") logger.info('创建临时表开始>>>>>>>>') spark.stop() logging.shutdown() # 关闭日志记录 if __name__ == '__main__': t = time.time() pysql() print(f'结算总耗时>>>>>>>>:{time.time() - t:.4f}s')
部署
1、DS设置定时
2、脚本内容【sudo -u root /opt/spark/bin/spark-submit --master spark://centos1:7077 --driver-memory=4G --executor-memory 8g --conf spark.cores.max=28 nn.py】
模块七:对比scala
1、提交任务方式不同
*提交jar包方式:sudo -u hive /opt/spark/bin/spark-submit --master spark://centos1:7077 --driver-memory=4G --executor-memory 4g --conf spark.cores.max=8 --class com_v2 /home/app/dwcwsw.jar 202309
**提交Py文件:**sudo -u root /opt/spark/bin/spark-submit --master spark://centos1:7077 --driver-memory=4G --executor-memory 8g --conf spark.cores.max=28 jnn.py
2、打包方式不同
- scala基于maven需要打包,需要相关依赖,封装比较麻烦。
- python无需这些步骤,可以直接更改业务代码