RDD的创建
在Spark中,有几种常见方式可以创建RDD:
1.从内存中的集合创建:使用parallelize()
方法可以将一个集合转换为RDD。例如:
data = [1, 2, 3, 4, 5] rdd = spark.sparkContext.parallelize(data)
2.从外部存储系统读取创建:使用Spark提供的API可以从外部存储系统(如HDFS等)读取数据并创建RDD。例如:
rdd = spark.sparkContext.textFile("hdfs://path/to/your/file.txt")
- 通过转换操作创建:可以通过对已存在的RDD执行转换操作来创建新的RDD。例如:
old_rdd = spark.sparkContext.parallelize([1, 2, 3, 4, 5]) new_rdd = old_rdd.map(lambda x: x * 2)
1.通过函数的结果创建:可以通过调用函数来创建RDD,其中函数的返回值会成为RDD的元素。例如:
rdd = spark.sparkContext.range(1, 100, 2)
RDD的方法
使用map()方法转换数据
map()方法是一种基础的RDD转换操作,可以对RDD中的每一个数据元素通过某种函数进行转换并返回新的RDD。
map()方法是(懒操作)转换操作,不会立即进行计算。
转换操作是创建RDD的第二种方法,通过转换已有RDD生成新的RDD。因为RDD是一个不可变的集合,所以如果对RDD数据进行了某种转换,那么会生成一个新的RDD。
使用sortBy()方法排序
第1个参数是一个函数f:(T) => K,左边是要被排序对象中的每一个元素,右边返回的值是元素中要进行排序的值。
第2个参数是ascending,决定排序后RDD中的元素是升序的还是降序的,默认是true,即升序排序,如果需要降序排序那么需要将参数的值设置为false。
第3个参数是numPartitions,决定排序后的RDD的分区个数,默认排序后的分区个数和排序之前的分区个数相等。
除了第一个参数是必须输入的,而后面的两个参数可以不输入。
使用collect()方法查询数据
collect()方法是一种行动操作,可以将RDD中所有元素转换成数组并返回到Driver端,适用于返回处理后的少量数据。因为需要从集群各个节点收集数据到本地,经过网络传输,并且加载到Driver内存中,所以如果数据量比较大,会给网络传输造成很大的压力。因此,数据量较大时,尽量不使用collect()方法,否则可能导致Driver端出现内存溢出问题。
collect()方法有两种操作方式:
collect:直接调用collect返回该RDD中的所有元素,返回类型是一个Array[T]数组.
collect[U: ClassTag](f: PartialFunction[T, U]):RDD[U]。这种方式需要提供一个标准的偏函数,将元素保存至一个RDD中。首先定义一个函数one,用于将collect方法得到的数组中数值为1的值替换为“one”,将其他值替换为“other”。
使用flatMap()方法转换数据
flatMap()方法将函数参数应用于RDD之中的每一个元素,将返回的迭代器(如数组、列表等)中的所有元素构成新的RDD。使用flatMap()方法时先进行map(映射)再进行flat(扁平化)操作,数据会先经过跟map一样的操作,为每一条输入返回一个迭代器(可迭代的数据类型),然后将所得到的不同级别的迭代器中的元素全部当成同级别的元素,返回一个元素级别全部相同的RDD。
这个转换操作通常用来切分单词。
创建RDD
先查看RDD
使用map()方法分割字符串后查看RDD
使用flatMap分割字符串后查看RDD
使用take()方法查询某几个值
take(N)方法用于获取RDD的前N个元素,返回数据为数组。take()与collect()方法的原理相似,collect()方法用于获取全部数据,take()方法获取指定个数的数据。获取RDD的前5个元素
查询分割后的前5个单词
使用union()方法合并多个RDD
union()方法是一种转换操作,用于将两个RDD合并成一个,不进行去重操作,而且两个RDD中每个元素中的值的个数、数据类型需要保持一致。使用union()方法合并两个RDD
使用filter()方法进行过滤
filter()方法是一种转换操作,用于过滤RDD中的元素。filter()方法需要一个参数,这个参数是一个用于过滤的函数,该函数的返回值为Boolean类型。
filter()方法将返回值为true的元素保留,将返回值为false的元素过滤掉,最后返回一个存储符合过滤条件的所有元素的新RDD。
创建一个RDD,并且过滤掉每个元组第二个值小于等于1的元素。
使用distinct()方法进行去重
distinct()方法是一种转换操作,用于RDD的数据去重,去除两个完全相同的元素,没有参数。
创建一个带有重复数据的RDD,并使用distinct()方法去重。
简单的集合操作方法
intersection()方法
intersection()方法用于求出两个RDD的共同元素,即找出两个RDD的交集,参数是另一个RDD,先后顺序与结果无关。
创建两个RDD,其中有相同的元素,通过intersection()方法求出两个RDD的交集。
subtract()方法
subtract()方法用于将前一个RDD中在后一个RDD出现的元素删除,可以认为是求补集的操作,返回值为前一个RDD去除与后一个RDD相同元素后的剩余值所组成的新的RDD。两个RDD的顺序会影响结果。
创建两个RDD,分别为rdd1和rdd2,包含相同元素和不同元素,通过subtract()方法求rdd1和rdd2彼此的补集。
cartesian()方法
cartesian()方法可将两个集合的元素两两组合成一组,即求笛卡儿积。
创建两个RDD,分别有4个元素,通过cartesian()方法求两个RDD的笛卡儿积。
使用键值对RDD的reduceByKey()方法
spark提供了两种方法分别获取键值对RDD的键和值通过keys和values方法分别查看。
key.collect和value.collect。
当数据集以键值对形式展现时,合并统计键相同的值是很常用的操作。
reduceByKey()方法用于合并具有相同键的值,作用对象是键值对,并且只对每个键的值进行处理,当RDD中有多个键相同的键值对时,则会对每个键对应的值进行处理。reduceByKey()方法需要接收一个输入函数,键值对RDD相同键的值会根据函数进行合并并且创建一个新的RDD作为返回结果。在进行处理时,reduceByKey()方法将相同键的前两个值传给输入函数,产生一个新的返回值,新产生的返回值与RDD中相同键的下一个值组成两个元素,再传给输入函数,直到最后每个键只有一个对应的值为止。reduceByKey()方法不是一种行动操作,而是一种转换操作。
RDD的连接
join()方法
join()方法用于根据键对两个RDD进行内连接,将两个RDD中键相同的数据的值存放在一个元组中,最后只返回两个RDD中都存在的键的连接结果。例如,在两个RDD中分别有键值对(K,V)和(K,W),通过join()方法连接会返回(K,(V,W))。创建两个RDD,含有相同键和不同的键,通过join()方法进行内连接。
rightOuterJoin()方法
rightOuterJoin()方法用于根据键对两个RDD进行右外连接,连接结果是右边RDD的所有键的连接结果,不管这些键在左边RDD中是否存在。在rightOuterJoin()方法中,如果在左边RDD中有对应的键,那么连接结果中值显示为Some类型值;如果没有,那么显示为None值。
leftOuterJoin()方法
leftOuterJoin()方法用于根据键对两个RDD进行左外连接,与rightOuterJoin()方法相反,返回结果保留左边RDD的所有键。
fullOuterJoin()方法
fullOuterJoin()方法用于对两个RDD进行全外连接,保留两个RDD中所有键的连接结果。
使用zip()方法组合两个RDD
zip()方法用于将两个RDD组合成键值对RDD,要求两个RDD的分区数量以及元素数量相同,否则会抛出异常。
将两个RDD组合成Key/Value形式的RDD,这里要求两个RDD的partition数量以及元素数量都相同,否则会抛出异常。
使用lookup()方法查找指定键的值
lookup(key:K)方法作用于键值对RDD,返回指定键的所有值。
Spark SQL
Spark SQL是Spark用来处理结构化数据的一个模块,它提供了一个编程抽象结构叫做DataFrame的数据模型(即带有Schema信息的RDD),Spark SQL作为分布式SQL查询引擎,让用户可以通过SQL、DataFrames API和Datasets API三种方式实现对结构化数据的处理。
•Spark SQL使用的数据抽象并非是RDD,而是DataFrame。•在Spark 1.3.0版本之前,DataFrame被称为SchemaRDD。•DataFrame使Spark具备处理大规模结构化数据的能力。•在Spark中,DataFrame是一种以RDD为基础的分布式数据集。•DataFrame的结构类似传统数据库的二维表格,可以从很多数据源中创建,如结构化文件、外部数据库、Hive表等数据源。创建DataFrame的两种基本方式
已存在的RDD调用toDF()方法转换得到DataFrame。
通过Spark读取数据源直接创建DataFrame。
若使用SparkSession方式创建DataFrame,可以使用spark.read从不同类型的文件中加载数据创建DataFrame。spark.read的具体操作,在创建Dataframe之前,为了支持RDD转换成Dataframe及后续的SQL操作,需要导入import.spark.implicits._包启用隐式转换。若使用SparkSession方式创建Dataframe,可以使用spark.read操作,从不同类型的文件中加载数据创建DataFrame
方法名称 | 相关说明 |
spark.read.text("people.txt") | 读取txt格式文件,创建DataFrame |
spark.read.csv ("people.csv") | 读取csv格式文件,创建DataFrame |
spark.read.json("people.json") | 读取json格式文件,创建DataFrame |
spark.read.parquet("people.parquet") | 读取parquet格式文件,创建DataFrame |
使用spark-shell进入交互界面
通过文件直接创建DataFrame
#导入 Spark 的隐式转换方法和函数的包
#通过Spark读取数据源的方式进行创建DataFrame
#使用printSchema函数查看DataFrame的数据模式输出列的名称和类型
#使用show()方法查看数据
RDD直接转换为DataFrame
创建一个命名为wsy.txt文档内容如下
连接xftp上传创建的txt文档.
新开一个窗口输入以下命令
打开Hadoop浏览目录,发现已经创建了一个spark文件夹且上传了文档
#定义一个名叫Wsy的样例类
#读取wsy.txt数据创建RDD wsyData 以空格分割
#将wsyData转换为DataFrame
#使用where()方法查询wsy中age为44的用户
DataFrame的常见操作方法
show():查看数据
方法 说明 show() 显示前20条记录 show(numRows:Int) 显示numRows条记录 show(truncate:Boolean) 是否最多只显示20个字符,默认为true show(numRows:Int,truncate:Boolean) 显示numRows条记录并设置过长字符串的显示格式
show()方法与show(true)方法查询结果一样,如需显示全部字符需使用show(false)方法
查看前4条数据
查看前4条数据并显示所有字符
获取第一条数据
head()方法获取前三条数据
take()方法获取前三条数据
takeAsList()方法获取前三条数据以列表形式展现
printSchema():查看DataFrame的Schema信息
select():查看DataFrame中选取部分列的数据及进行重命名
selectExpr():方法定义一个函数replace对wsy对象中的id字段进行转换,将字段值替换
使用方法查询对id字段使用replace函数并取别名sex,查询第一条数据
filter():实现条件查询,过滤出想要的结果查询age为44的数据groupBy():对记录进行分组
# 导入必要的模块 from pyspark.sql import SparkSession # 创建SparkSession对象 spark = SparkSession.builder.appName("GroupByExample").getOrCreate() # 创建一个DataFrame对象 data = [("Alice", 25, "Sydney"), ("Bob", 30, "Melbourne"), ("Alice", 35, "Sydney"), ("Bob", 20, "Melbourne"), ("Alice", 40, "Sydney"), ("Charlie", 45, "Perth")] df = spark.createDataFrame(data, ["Name", "Age", "City"]) # 使用groupBy()对记录按照城市进行分组 grouped_df = df.groupBy("City") # 统计每个城市的记录数 count_df = grouped_df.count() # 显示结果 count_df.show()
这个例子中,创建了一个DataFrame对象,并使用
groupBy()
函数将记录按照城市进行分组。使用count()
函数统计每个组的记录数。最后使用show()
函数显示结果。运行这个代码片段:+---------+-----+ | City|count| +---------+-----+ | Sydney| 3| |Melbourne| 2| | Perth| 1| +---------+-----+
这个结果显示了每个城市的记录数。
sort()/orderBy()方法对特定字段进行排序操作(默认升序)
降序排序
更多精彩持续更新中,关注我看我如何夺回属于我的一切>>>OvO<<<