PySpark实战

avatar
作者
筋斗云
阅读量:0

文章目录

模块 1: PySpark 简介

1.1 什么是 PySpark?
  • PySpark 的定义和特点

    PySpark 是 Apache Spark 提供的 Python API,允许使用 Python 编程语言访问 Spark 的分布式计算引擎。通过 PySpark,开发者可以使用 Python 进行大规模数据处理、分析、机器学习和图计算等任务,充分发挥 Spark 引擎的性能和分布式计算能力。

  • PySpark 与 Spark 的关系

    PySparkSpark 的一个特定版本,是 Spark 为 Python 提供的 API。它允许开发者使用 Python 编程语言访问 Spark 引擎,从而能够利用 Spark 引擎的分布式计算能力进行大规模数据处理、机器学习、流式处理等任务。

  • RDD

1.2 PySpark 生态系统
  • PySpark 的组件和模块

    Spark Core:

    • 描述: Spark 的核心组件,提供了分布式任务调度、内存管理和错误恢复等基本功能。

    • 重要概念: 弹性分布式数据集(RDD)、SparkContext。

    Spark SQL:

    • 描述: 提供了使用 SQL 查询语言进行结构化数据处理的功能。

    Spark Streaming:

    • 描述: 用于实时数据流处理的模块,支持从多个数据源接收实时数据流。

模块 2: PySpark 基础

2.1 PySpark 安装和配置
  • 安装 PySpark

    1、安装java

    2、安装 Apache Spark

    3、设置环境变量【export SPARK_HOME=/path/to/spark export PATH= P A T H : PATH: PATH:SPARK_HOME/bin】

    4、安装 Python

    5、安装 PySpark【pip install pyspark】

    6、pyspark进入互动行【from pyspark.sql import SparkSession spark = SparkSession.builder.appName(“example”).getOrCreate()】

  • 配置 PySpark 环境

    export SPARK_HOME=/path/to/spark
    export PATH= P A T H : PATH: PATH:SPARK_HOME/bin
    export PYSPARK_PYTHON=/path/to/python
    export PYSPARK_DRIVER_PYTHON=/path/to/python

2.2 PySpark 编程基础
  • RDD (弹性分布式数据集) 简介

    RDD(Resilient Distributed Dataset,弹性分布式数据集)是 Spark 中的基本数据抽象,用于在分布式计算环境中表示和处理数据。RDD 具有弹性(Resilient)和分布式(Distributed)的特性,使其能够有效地处理大规模数据集。

  • PySpark 操作(Transformation 和 Action)

    转换算子:

    Map【rdd = sc.parallelize([1, 2, 3, 4, 5]) squared_rdd = rdd.map(lambda x: x*x)】

    filter【even_rdd = rdd.filter(lambda x: x % 2 == 0)】

    flatmap【words_rdd = rdd.flatMap(lambda x: str(x).split(“,”))】

    行动算子:

    collect: 将 RDD 中的所有元素收集到一个列表中。【collected_data = squared_rdd.collect()】

    count: 返回 RDD 中的元素数量。【count = rdd.count()】

    reduce: 使用给定的函数逐个减少 RDD 中的元素。【sum_result = rdd.reduce(lambda x, y: x + y)】

    foreach: 对 RDD 中的每个元素应用一个函数,通常用于在集群中执行分布式操作。

2.3 PySpark SQL 基础
  • DataFrame 和 Dataset 简介

    语言支持: DataFrame: DataFrame 可以在多种编程语言中使用, 是一个带有结构信息的分布式数据集,包括 Python、Scala、Java 和 R。 Dataset: Dataset 主要是 Scala 和 Java 中的概念,在 Python 中的支持相对较少。 编码区别: # DataFrame API 示例 df.select("column1", "column2").filter(df["column3"] > 10).show() # Dataset API 示例 ds.select("column1", "column2").filter("column3 > 10").show() 
  • Spark SQL 查询

    • PySpark 提供了类似 SQL 的查询语言,可以使用 DataFrame API 或者直接运行 SQL 查询。
    #dataframe过滤操作 df.select("column1", "column2").filter(df["column3"] > 10) #创建临时表 df.createOrReplaceTempView("my_table")   1、spark.sql("业务Sql").show  2、spark.sql("select * from table [join other_table]").createOrReplaceTempView("tmp1")  3、jgDF = spark.sql("select nsrsbh from table ").toDF("nsrsbh")   4、jgDF.write.mode(SaveMode.Overwrite[Append]).partitionBy("nd[分区字段]").format("orc").saveAsTable("sjjsq_db.tablename") 
  • 内置函数和UDF (用户定义函数)

    【PySpark:】

    def check_hj(hj):    for sublist in result:        if hj in sublist:            return "NIL"    return "N" check_hj_udf = udf(check_hj, StringType()) 

    【Scala:】

    object getCwsjJeFj {  //自定义UDF函数  val getCwsjjeFj = (jyje: Double) => {    if (jyje > 0 && jyje < 500000) {      "A"    } else if (0 == jyje) {      "Y"    } else {      "Z"    }  } } spark.sqlContext.udf.register("getJyjeFj", getCwsjjeFj) 

模块 3: 数据处理和分析

3.1 数据导入和导出
  • 读取和写入各种数据格式

    • 读取数据
      • 读取text【text_data = spark.read.text(“path/to/textfile”)】
      • 读取csv 【csv_data = spark.read.csv(“path/to/csvfile”, header=True, inferSchema=True)】
      • 读取json【csv_data = spark.read.csv(“path/to/csvfile”, header=True, inferSchema=True)】
      • 读取orc【orc_data = spark.read.orc(“path/to/orcfile”)】
      • 读取delta lake【delta_data = spark.read.format(“delta”).table(“your_table_name”)】
    • 写入数据
      • 写入text【text_data.write.text(“path/to/output”)】
      • 写入csv【csv_data.write.csv(“path/to/output”, header=True)】
      • 写入json【json_data.write.json(“path/to/output”)】
      • 写入orc【orc_data.write.orc(“path/to/output”)】
      • 写入delta lake【delta_data.write.format(“delta”).save(“path/to/output”)】
  • pandas读取一样可行

    • import pandas as pd  # 读取 CSV 文件 df = pd.read_csv("path/to/your/file.csv")  # 显示 DataFrame 的前几行 print(df.head()) 
  • 外部数据源的连接

    from pyspark.sql import SparkSession  # 创建 SparkSession spark = SparkSession.builder.appName("JDBCExample").getOrCreate()  # 定义连接属性 url = "jdbc:mysql://hostname:port/database_name" properties = {"user": "username", "password": "password", "driver": "com.mysql.jdbc.Driver"}  # 从 JDBC 数据源读取数据 df = spark.read.jdbc(url=url, table="your_table_name", properties=properties)  # 将数据写入 JDBC 数据源 df.write.jdbc(url=url, table="your_table_name", mode="overwrite", properties=properties)  # 停止 SparkSession spark.stop()  
3.2 数据清理和转换
  • 数据类型转换

    • 使用代码
    from pyspark.sql import SparkSession from pyspark.sql.types import IntegerType  # 创建 SparkSession spark = SparkSession.builder.appName("DataTypeConversionExample").getOrCreate()  # 示例数据框 data = [("Alice", "25"), ("Bob", "30"), ("Charlie", "35")] columns = ["Name", "Age"]  df = spark.createDataFrame(data, columns)  # 显示原始数据框 print("Original DataFrame:") df.show()  # 将 "Age" 列的数据类型转换为整数 df_converted = df.withColumn("Age", df["Age"].cast(IntegerType()))  # 显示转换后的数据框 print("\nDataFrame after data type conversion:") df_converted.show()  # 停止 SparkSession spark.stop()  
    • 使用SQL表达式进行转换
    from pyspark.sql import SparkSession  # 创建 SparkSession spark = SparkSession.builder.appName("SQLDataTypeConversionExample").getOrCreate()  # 示例数据框 data = [("Alice", "25"), ("Bob", "30"), ("Charlie", "35")] columns = ["Name", "Age"]  df = spark.createDataFrame(data, columns)  # 使用 SQL 表达式将 "Age" 列的数据类型转换为整数 df.createOrReplaceTempView("my_table") df_converted_sql = spark.sql("SELECT Name, CAST(Age AS INT) AS Age FROM my_table")  # 显示转换后的数据框 print("DataFrame after data type conversion using SQL expression:") df_converted_sql.show()  # 停止 SparkSession spark.stop()  
  • 数据筛选

    • 筛选
    from pyspark.sql import SparkSession  # 创建 SparkSession spark = SparkSession.builder.appName("FilterExample").getOrCreate()  # 示例数据框 data = [("Alice", 25, "F"), ("Bob", 30, "M"), ("Charlie", 35, "M")] columns = ["Name", "Age", "Gender"]  df = spark.createDataFrame(data, columns)  # 显示原始数据框 print("Original DataFrame:") df.show()  # 筛选年龄大于等于 30 的行 filtered_df = df.filter(df["Age"] >= 30)  # 显示筛选后的数据框 print("\nDataFrame after filtering:") filtered_df.show()  # 停止 SparkSession spark.stop()  
3.3 数据聚合和分组
  • 聚合函数的使用

    from pyspark.sql import SparkSession from pyspark.sql.functions import count, sum, avg, min, max  # 创建 SparkSession spark = SparkSession.builder.appName("AggregationExample").getOrCreate()  # 示例数据框 data = [("Alice", 25), ("Bob", 30), ("Charlie", 35)] columns = ["Name", "Age"] df = spark.createDataFrame(data, columns)  # 显示原始数据框 print("Original DataFrame:") df.show()  # 计算总行数 row_count = df.count() print(f"\nRow count: {row_count}")  # 计算年龄列的总和、平均值、最小值和最大值 result = df.agg(sum("Age").alias("TotalAge"),                 avg("Age").alias("AverageAge"),                 min("Age").alias("MinAge"),                 max("Age").alias("MaxAge"))  # 显示聚合结果 result.show()  # 停止 SparkSession spark.stop()  
  • 分组和汇总数据

    from pyspark.sql import SparkSession from pyspark.sql.functions import sum  # 创建 SparkSession spark = SparkSession.builder.appName("GroupByAggregationExample").getOrCreate()  # 示例数据框 data = [("Alice", "Math", 90),         ("Alice", "English", 95),         ("Bob", "Math", 88),         ("Bob", "English", 92),         ("Charlie", "Math", 75),         ("Charlie", "English", 80)]  columns = ["Name", "Subject", "Score"] df = spark.createDataFrame(data, columns)  # 显示原始数据框 print("Original DataFrame:") df.show()  # 按姓名分组,计算每个姓名的总分 grouped_df = df.groupBy("Name").agg(sum("Score").alias("TotalScore"))  # 显示分组聚合结果 grouped_df.show()  # 停止 SparkSession spark.stop()  

模块 4: PySpark Streaming

5.1 实时数据处理
  • 流式处理简介

    实时处理: 流式处理系统通常以近实时的速度处理数据,允许在数据产生后迅速作出反应,而不需要等待整个批次数据的到达。

    窗口处理: 为了更好地控制和分析数据流,流处理系统通常支持窗口处理,允许将数据流划分为固定大小的时间窗口或滑动窗口,并在每个窗口上执行操作。

    事件驱动: 流式处理通常是事件驱动的,系统能够在接收到新事件时触发相应的处理逻辑。

  • PySpark Streaming 架构

    1. 数据通过 Receivers 从输入源接收并划分成微批。
    2. Job Scheduler 将微批中的转换操作组织成 Spark 作业。
    3. 作业由 Worker Node 执行,计算结果存储在 DStream 中。
    4. Output Operations 将最终结果输出到外部系统。

模块 5: 性能优化和调试

6.1 PySpark 作业调优
  • 合理设置资源配置:

    **Executor Memory 和 Executor Cores:** 根据集群的硬件配置和作业的需求,合理设置每个 Executor 的内存和核心数。  **Driver Memory:** 确保为 Driver Program 分配足够的内存。  【sudo -u root /opt/spark/bin/spark-submit --master spark://centos1:7077 --driver-memory=4G  --executor-memory 8g  --conf spark.cores.max=28  test.py】 
  • 持久化中间结果:

    • 对于频繁被重复使用的数据,考虑使用 persistcache 方法将数据持久化到内存中,以避免重复计算。
    # 例子:持久化一个RDD到内存中 rdd.persist() 
  • Broadcast 变量:

    • 对于小数据集,可以使用 Broadcast 变量将数据广播到每个节点,减少数据传输开销。
    # 例子:使用 Broadcast 变量 broadcast_var = sc.broadcast(small_data) result = large_data.map(lambda x: x + broadcast_var.value) 
  • 调整分区数:

    • 根据数据的大小和集群的配置,调整 RDD 的分区数。合理的分区数可以提高并行性。
    # 例子:调整分区数 rdd = rdd.repartition(100) 
  • 使用本地化数据:

    • 在处理数据时,尽量将计算推送到数据而不是将数据拉取到计算节点,以减少数据传输开销。
  • 避免 Shuffle 操作:

    • Shuffle 操作是代价昂贵的,尽量减少 Shuffle 操作的次数。例如,在使用 groupByKey 时,考虑使用 reduceByKeyaggregateByKey
    # 例子:使用 reduceByKey 替代 groupByKey result = rdd.groupByKey().mapValues(lambda values: sum(values)) # 替换为 result = rdd.reduceByKey(lambda x, y: x + y) 
  • 使用 DataFrame 和 Catalyst 优化器:

    • DataFrame API 和 Catalyst 优化器可以在执行计划中进行优化,提高查询性能。
    # 例子:使用 DataFrame 进行查询 df.select("column1").filter(df["column2"] > 100).groupBy("column3").agg({"column4": "avg"}) 

模块 6: 实际项目应用

7.1 实际场景案例分析
  • 经验教训

    import os os.environ["PYSPARK_PYTHON"]="/var/lib/hive/anaconda3/bin/python3" os.environ["PYSPARK_DRIVER_PYTHON"]="/var/lib/hive/anaconda3/bin/python3" 
7.2 实战项目
  • 设计和实施一个简单的大数据项目

    # coding:utf-8 import os os.environ["PYSPARK_PYTHON"]="/var/lib/hive/anaconda3/bin/python3" os.environ["PYSPARK_DRIVER_PYTHON"]="/var/lib/hive/anaconda3/bin/python3" # 配置日志 logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s') # 创建一个日志对象 logger = logging.getLogger(__name__)  """     DESC:结算模块开发任务-票据业务     NAME:xt     DATE: 2023-09-13 """ spark = create_spark_session("pj_js") spark.sparkContext.setLogLevel("ERROR") def pysql():     # 设置日志级别     # spark.sparkContext.setLogLevel("INFO")     logger.info('创建临时表开始>>>>>>>>')          spark.stop()     logging.shutdown()   # 关闭日志记录  if __name__ == '__main__':     t = time.time()     pysql()     print(f'结算总耗时>>>>>>>>:{time.time() - t:.4f}s') 
  • 部署

    1、DS设置定时

    2、脚本内容【sudo -u root /opt/spark/bin/spark-submit --master spark://centos1:7077 --driver-memory=4G --executor-memory 8g --conf spark.cores.max=28 nn.py】

模块七:对比scala

1、提交任务方式不同
  • *提交jar包方式:sudo -u hive /opt/spark/bin/spark-submit --master spark://centos1:7077 --driver-memory=4G --executor-memory 4g --conf spark.cores.max=8 --class com_v2 /home/app/dwcwsw.jar 202309

  • **提交Py文件:**sudo -u root /opt/spark/bin/spark-submit --master spark://centos1:7077 --driver-memory=4G --executor-memory 8g --conf spark.cores.max=28 jnn.py

2、打包方式不同
  • scala基于maven需要打包,需要相关依赖,封装比较麻烦。
  • python无需这些步骤,可以直接更改业务代码

广告一刻

为您即时展示最新活动产品广告消息,让您随时掌握产品活动新动态!