从零开始学习大数据之Spark(二)-RDD的创建和Spark SQL中的DataFrame的创建与查看(巨详细新手必备)

avatar
作者
筋斗云
阅读量:3

RDD的创建

在Spark中,有几种常见方式可以创建RDD:

1.从内存中的集合创建:使用parallelize()方法可以将一个集合转换为RDD。例如:

data = [1, 2, 3, 4, 5] rdd = spark.sparkContext.parallelize(data) 

2.从外部存储系统读取创建:使用Spark提供的API可以从外部存储系统(如HDFS等)读取数据并创建RDD。例如:

rdd = spark.sparkContext.textFile("hdfs://path/to/your/file.txt") 
  1. 通过转换操作创建:可以通过对已存在的RDD执行转换操作来创建新的RDD。例如:
old_rdd = spark.sparkContext.parallelize([1, 2, 3, 4, 5]) new_rdd = old_rdd.map(lambda x: x * 2) 

1.通过函数的结果创建:可以通过调用函数来创建RDD,其中函数的返回值会成为RDD的元素。例如:

rdd = spark.sparkContext.range(1, 100, 2) 

RDD的方法

使用map()方法转换数据

map()方法是一种基础的RDD转换操作,可以对RDD中的每一个数据元素通过某种函数进行转换并返回新的RDD

map()方法是(懒操作)转换操作,不会立即进行计算。

转换操作是创建RDD的第二种方法,通过转换已有RDD生成新的RDD。因为RDD是一个不可变的集合,所以如果对RDD数据进行了某种转换,那么会生成一个新的RDD

使用sortBy()方法排序

第1个参数是一个函数f:(T) => K,左边是要被排序对象中的每一个元素,右边返回的值是元素中要进行排序的值。

第2个参数是ascending,决定排序后RDD中的元素是升序的还是降序的,默认是true,即升序排序,如果需要降序排序那么需要将参数的值设置为false

第3个参数是numPartitions,决定排序后的RDD的分区个数,默认排序后的分区个数和排序之前的分区个数相等

除了第一个参数是必须输入的,而后面的两个参数可以不输入。

使用collect()方法查询数据

collect()方法是一种行动操作,可以将RDD中所有元素转换成数组并返回到Driver端,适用于返回处理后的少量数据。因为需要从集群各个节点收集数据到本地,经过网络传输,并且加载到Driver内存中,所以如果数据量比较大,会给网络传输造成很大的压力。因此,数据量较大时,尽量不使用collect()方法,否则可能导致Driver端出现内存溢出问题。

collect()方法有两种操作方式:

collect:直接调用collect返回该RDD中的所有元素,返回类型是一个Array[T]数组.

collect[U: ClassTag](f: PartialFunction[T, U])RDD[U]。这种方式需要提供一个标准的偏函数,将元素保存至一个RDD中。首先定义一个函数one,用于将collect方法得到的数组中数值为1的值替换为“one”,将其他值替换为“other”

使用flatMap()方法转换数据

flatMap()方法将函数参数应用于RDD之中的每一个元素,将返回的迭代器(如数组、列表等)中的所有元素构成新的RDD。使用flatMap()方法时先进行map(映射)再进行flat(扁平化)操作,数据会先经过跟map一样的操作,为每一条输入返回一个迭代器(可迭代的数据类型),然后将所得到的不同级别的迭代器中的元素全部当成同级别的元素,返回一个元素级别全部相同的RDD

这个转换操作通常用来切分单词。

创建RDD

先查看RDD

使用map()方法分割字符串后查看RDD

使用flatMap分割字符串后查看RDD

使用take()方法查询某几个值

take(N)方法用于获取RDD的前N个元素,返回数据为数组。take()与collect()方法的原理相似,collect()方法用于获取全部数据,take()方法获取指定个数的数据。获取RDD的前5个元素

查询分割后的前5个单词

使用union()方法合并多个RDD

union()方法是一种转换操作,用于将两个RDD合并成一个,不进行去重操作,而且两个RDD中每个元素中的值的个数、数据类型需要保持一致。使用union()方法合并两个RDD

使用filter()方法进行过滤

filter()方法是一种转换操作,用于过滤RDD中的元素。filter()方法需要一个参数,这个参数是一个用于过滤的函数,该函数的返回值为Boolean类型。

filter()方法将返回值为true的元素保留,将返回值为false的元素过滤掉,最后返回一个存储符合过滤条件的所有元素的新RDD

创建一个RDD,并且过滤掉每个元组第二个值小于等于1的元素。

使用distinct()方法进行去重

distinct()方法是一种转换操作,用于RDD的数据去重,去除两个完全相同的元素,没有参数。

创建一个带有重复数据的RDD,并使用distinct()方法去重。

简单的集合操作方法

intersection()方法

intersection()方法用于求出两个RDD的共同元素,即找出两个RDD的交集,参数是另一个RDD,先后顺序与结果无关。

创建两个RDD,其中有相同的元素,通过intersection()方法求出两个RDD的交集。

subtract()方法

subtract()方法用于将前一个RDD中在后一个RDD出现的元素删除,可以认为是求补集的操作,返回值为前一个RDD去除与后一个RDD相同元素后的剩余值所组成的新的RDD。两个RDD的顺序会影响结果。

创建两个RDD,分别为rdd1rdd2,包含相同元素和不同元素,通过subtract()方法求rdd1rdd2彼此的补集。

cartesian()方法

cartesian()方法可将两个集合的元素两两组合成一组,即求笛卡儿积。

创建两个RDD,分别有4个元素,通过cartesian()方法求两个RDD的笛卡儿积。

使用键值对RDDreduceByKey()方法

spark提供了两种方法分别获取键值对RDD的键和值通过keys和values方法分别查看。

key.collect和value.collect。

当数据集以键值对形式展现时,合并统计键相同的值是很常用的操作。

reduceByKey()方法用于合并具有相同键的值,作用对象是键值对,并且只对每个键的值进行处理,当RDD中有多个键相同的键值对时,则会对每个键对应的值进行处理。reduceByKey()方法需要接收一个输入函数,键值对RDD相同键的值会根据函数进行合并并且创建一个新的RDD作为返回结果。在进行处理时,reduceByKey()方法将相同键的前两个值传给输入函数,产生一个新的返回值,新产生的返回值与RDD中相同键的下一个值组成两个元素,再传给输入函数,直到最后每个键只有一个对应的值为止。reduceByKey()方法不是一种行动操作,而是一种转换操作。

RDD的连接

join()方法

join()方法用于根据键对两个RDD进行内连接,将两个RDD中键相同的数据的值存放在一个元组中,最后只返回两个RDD中都存在的键的连接结果。例如,在两个RDD中分别有键值对(K,V)(K,W),通过join()方法连接会返回(K,(V,W))。创建两个RDD,含有相同键和不同的键,通过join()方法进行内连接。

rightOuterJoin()方法

rightOuterJoin()方法用于根据键对两个RDD进行右外连接,连接结果是右边RDD的所有键的连接结果,不管这些键在左边RDD中是否存在。在rightOuterJoin()方法中,如果在左边RDD中有对应的键,那么连接结果中值显示为Some类型值;如果没有,那么显示为None值。

leftOuterJoin()方法

leftOuterJoin()方法用于根据键对两个RDD进行左外连接,与rightOuterJoin()方法相反,返回结果保留左边RDD的所有键。

fullOuterJoin()方法

fullOuterJoin()方法用于对两个RDD进行全外连接,保留两个RDD中所有键的连接结果。

使用zip()方法组合两个RDD

zip()方法用于将两个RDD组合成键值对RDD,要求两个RDD的分区数量以及元素数量相同,否则会抛出异常。

将两个RDD组合成Key/Value形式的RDD,这里要求两个RDDpartition数量以及元素数量都相同,否则会抛出异常

使用lookup()方法查找指定键的值

lookup(key:K)方法作用于键值对RDD,返回指定键的所有值。

Spark SQL

Spark SQL是Spark用来处理结构化数据的一个模块,它提供了一个编程抽象结构叫做DataFrame的数据模型(即带有Schema信息的RDD),Spark SQL作为分布式SQL查询引擎,让用户可以通过SQL、DataFrames API和Datasets API三种方式实现对结构化数据的处理

•Spark SQL使用的数据抽象并非是RDD,而是DataFrame。•在Spark 1.3.0版本之前,DataFrame被称为SchemaRDD。•DataFrame使Spark具备处理大规模结构化数据的能力。•在Spark中,DataFrame是一种以RDD为基础的分布式数据集。•DataFrame的结构类似传统数据库的二维表格,可以从很多数据源中创建,如结构化文件、外部数据库、Hive表等数据源。

创建DataFrame的两种基本方式

已存在的RDD调用toDF()方法转换得到DataFrame。

通过Spark读取数据源直接创建DataFrame。

若使用SparkSession方式创建DataFrame,可以使用spark.read从不同类型的文件中加载数据创建DataFrame。spark.read的具体操作,在创建Dataframe之前,为了支持RDD转换成Dataframe及后续的SQL操作,需要导入import.spark.implicits._包启用隐式转换。若使用SparkSession方式创建Dataframe,可以使用spark.read操作,从不同类型的文件中加载数据创建DataFrame

方法名称

相关说明

spark.read.text("people.txt")

读取txt格式文件,创建DataFrame

spark.read.csv ("people.csv")

读取csv格式文件,创建DataFrame

spark.read.json("people.json")

读取json格式文件,创建DataFrame

spark.read.parquet("people.parquet")

读取parquet格式文件,创建DataFrame

使用spark-shell进入交互界面

通过文件直接创建DataFrame

#导入 Spark 的隐式转换方法和函数的包

#通过Spark读取数据源的方式进行创建DataFrame

#使用printSchema函数查看DataFrame的数据模式输出列的名称和类型

#使用show()方法查看数据

RDD直接转换为DataFrame

创建一个命名为wsy.txt文档内容如下

连接xftp上传创建的txt文档.

新开一个窗口输入以下命令

打开Hadoop浏览目录,发现已经创建了一个spark文件夹且上传了文档

#定义一个名叫Wsy的样例类

#读取wsy.txt数据创建RDD wsyData 以空格分割

#将wsyData转换为DataFrame

#使用where()方法查询wsy中age为44的用户

DataFrame的常见操作方法

show():查看数据

方法说明
show()显示前20条记录
show(numRows:Int)显示numRows条记录
show(truncate:Boolean)是否最多只显示20个字符,默认为true
show(numRows:Int,truncate:Boolean)显示numRows条记录并设置过长字符串的显示格式

show()方法与show(true)方法查询结果一样,如需显示全部字符需使用show(false)方法

查看前4条数据

查看前4条数据并显示所有字符

获取第一条数据

head()方法获取前三条数据

take()方法获取前三条数据

takeAsList()方法获取前三条数据以列表形式展现

printSchema():查看DataFrame的Schema信息

select():查看DataFrame中选取部分列的数据及进行重命名

selectExpr():方法定义一个函数replace对wsy对象中的id字段进行转换,将字段值替换

使用方法查询对id字段使用replace函数并取别名sex,查询第一条数据

filter():实现条件查询,过滤出想要的结果查询age为44的数据groupBy():对记录进行分组

# 导入必要的模块 from pyspark.sql import SparkSession  # 创建SparkSession对象 spark = SparkSession.builder.appName("GroupByExample").getOrCreate()  # 创建一个DataFrame对象 data = [("Alice", 25, "Sydney"),         ("Bob", 30, "Melbourne"),         ("Alice", 35, "Sydney"),         ("Bob", 20, "Melbourne"),         ("Alice", 40, "Sydney"),         ("Charlie", 45, "Perth")]  df = spark.createDataFrame(data, ["Name", "Age", "City"])  # 使用groupBy()对记录按照城市进行分组 grouped_df = df.groupBy("City")  # 统计每个城市的记录数 count_df = grouped_df.count()  # 显示结果 count_df.show() 

这个例子中,创建了一个DataFrame对象,并使用groupBy()函数将记录按照城市进行分组。使用count()函数统计每个组的记录数。最后使用show()函数显示结果。运行这个代码片段:

+---------+-----+ |     City|count| +---------+-----+ |   Sydney|    3| |Melbourne|    2| |    Perth|    1| +---------+-----+ 

这个结果显示了每个城市的记录数。

 sort()/orderBy()方法对特定字段进行排序操作(默认升序)

降序排序

更多精彩持续更新中,关注我看我如何夺回属于我的一切>>>OvO<<< 

广告一刻

为您即时展示最新活动产品广告消息,让您随时掌握产品活动新动态!