聚类(Clustering) 是机器学习中一类重要的方法。其主要思想使用样本的不同特征属性,根据某一给定的相似度度量方式(如欧式距离)找到相似的样本,并根据距离将样本划分成不同的组。聚类属于典型的无监督学习(Unsupervised Learning) 方法。与监督学习(如分类器)相比1,无监督学习的训练集没有人为标注的结果。在非监督式学习中,数据并不被特别标识,学习模型是为了推断出数据的一些内在结构。
较权威的聚类问题的定义是:
所谓聚类问题,就是给定一个元素集合D,其中每个元素具有n个可观察属性,使用某种算法将D划分成k个子集,要求每个子集内部的元素之间相异度尽可能低,而不同子集的元素相异度尽可能高。其中每个子集叫做一个簇。
聚类分析以相似性为基础,其目标是使同一类对象的相似度尽可能地大,不同类对象之间的相似度尽可能地小。换句话说,在一个聚类中的模式之间比不在同一聚类中的模式之间具有更多的相似性。
目前聚类的方法很多,很难对聚类方法提出一个简洁的分类,因为这些类别可能重叠,从而使得一种方法具有几类的特征。但根据基本思想的不同,聚类分析计算方法主要有如下几种:划分算法、层次算法、密度算法、图论聚类法、网格算法和模型算法等。聚类方法已经被广泛运用在如图像处理、客户精准营销、生物信息学等多个领域,也可以作为使用分类方法前对数据进行预先探索的一种手段来使用。
Spark的MLlib库提供了许多可用的聚类方法的实现,如 KMeans、高斯混合模型、Power Iteration Clustering(PIC)、隐狄利克雷分布(LDA) 以及 KMeans 方法的变种 二分KMeans(Bisecting KMeans) 和 流式KMeans(Streaming KMeans)等。
一、KMeans原理
KMeans 是一个迭代求解的聚类算法,其属于 划分(Partitioning) 型的聚类方法,即首先创建K个划分,然后迭代地将样本从一个划分转移到另一个划分来改善最终聚类的质量,KMeans 的过程大致如下:
1.根据给定的k值,选取k个样本点作为初始划分中心; 2.计算所有样本点到每一个划分中心的距离,并将所有样本点划分到距离最近的划分中心; 3.计算每个划分中样本点的平均值,将其作为新的中心; 循环进行2~3步直至达到最大迭代次数,或划分中心的变化小于某一预定义阈值
显然,初始划分中心的选取在很大程度上决定了最终聚类的质量,MLlib内置的KMeans
类提供了名为 KMeans|| 的初始划分中心选择方法,它是著名的 KMeans++ 方法的并行化版本,其思想是令初始聚类中心尽可能的互相远离,具体实现细节可以参见斯坦福大学的B Bahmani在PVLDB上的论文Scalable K-Means++,这里不再赘述。
二、数据集的读取
本文使用模式识别领域广泛使用的UCI数据集中的鸢尾花数据Iris进行实验,它可以在这里获取,Iris数据的样本容量为150,有四个实数值的特征,分别代表花朵四个部位的尺寸,以及该样本对应鸢尾花的亚种类型(共有3种亚种类型)
,如下所示:
5.1,3.5,1.4,0.2,setosa ... 5.4,3.0,4.5,1.5,versicolor ... 7.1,3.0,5.9,2.1,virginica ...
可以通过SparkContext
自带的textFile(..)
方法将文件读入,并进行转换,形成一个RDD<Vector>
,如下所示(假设代码在Spark-shell中运行,Spark-shell已自动创建了名为sc
的SparkContext
变量,下同):
- //引入相应的包,下文不再引入
- import org.apache.spark.mllib.linalg.Vectors
- import org.apache.spark.mllib.clustering.{KMeans, KMeansModel}
- //读入文件
- scala> val rawData = sc.textFile("iris.csv")
- rawData: org.apache.spark.rdd.RDD[String] = iris.csv MapPartitionsRDD[48] at textFile at <console>:33
- //将RDD[String]转换为RDD[Vector]
- scala> val trainingData = rawData.map(line => {Vectors.dense(line.split(",").filter(p => p.matches("\\d*(\\.?)\\d*")).map(_.toDouble))}).cache()
- trainingData: org.apache.spark.rdd.RDD[org.apache.spark.mllib.linalg.Vector] = MapPartitionsRDD[49] at map at <console>:35
scala
这里我们对RDD使用了filter算子,并通过正则表达式将鸢尾花的类标签过滤掉,Scala语言的正则表达式语法与Java语言类似,正则表达式\\d*(\\.?)\\d*
可以用于匹配实数类型的数字,\\d*
使用了*
限定符,表示匹配0次或多次的数字字符,\\.?
使用了?
限定符,表示匹配0次或1次的小数点,关于正则表达式的更多知识,可以自行查阅相关资料,这里不再叙述。
三、模型训练与分析
可以通过创建一个KMeans
类并调用其run(RDD[Vector])
方法来训练一个KMeans模型KMeansModel
,在该方法调用前需要设置一系列参数,如下表所示:
| 参数 | 含义 |
| ------------------- | :---------------------: |
| K | 聚类数目,默认为2 |
| maxIterations | 最大迭代次数,默认为20 |
| initializationMode | 初始化模式,默认为"k-means||" |
| runs | 运行次数,默认为:1 |
| initializationSteps | 初始化步数,用于KMeans||,默认为5 |
| epsilon | 迭代停止的阈值,默认为1e-4 |
其中,每一个参数均可通过名为setXXX(...)
(如maxIterations即为setMaxIterations()
)的方法进行设置。
由于KMeans
类只有无参的构造函数,其对象创建、参数设置需要分别进行,且往往使用的只有存放模型的KMeansModel
类对象,花功夫创建出的KMeans
类自象本身却并未使用。故MLlib也提供了包装好的高层次方法KMeans.train(...)
,传入训练样本和相应的参数,即返回一个训练好的KMeansModel
对象,十分方便。
该方法有4个重载形式,分别可以指定不同的输入参数,具体可以查阅MLlib的API文档,这里我们使用KMeans.train(data, k, maxIterations, runs)
形式,只需要输入k值、最大迭代次数和运行次数,其他参数使用默认值,如下所示:
- //调用KMeans.train训练模型,指定k值为3,最大迭代100次,运行5次
- scala> val model : KMeansModel = KMeans.train(trainingData, 3, 100, 5)
- model: org.apache.spark.mllib.clustering.KMeansModel = org.apache.spark.mllib.clustering.KMeansModel@4e4dcf7c
scala
(细节:若iris.csv没有位于当前用户目录下,前面的rawData和trainingData对象都可以成功创建,执行到这一步时才会出现错误,这是因为RDD的transformation操作采用了懒加载策略,只有当action型的操作发生时,才会有实际的计算)
这样,模型即创建成功了。可以通过KMeansModel
类自带的clusterCenters
属性获取到模型的所有聚类中心情况:
- scala> model.clusterCenters.foreach(
- | center => {
- | println("Clustering Center:"+center)
- | })
- Clustering Center:[6.314583333333331,2.895833333333334,4.973958333333334,1.7031249999999996]
- Clustering Center:[5.19375,3.6312499999999996,1.4749999999999999,0.2718749999999999]
- Clustering Center:[4.7318181818181815,2.9272727272727277,1.7727272727272727,0.35000000000000003]
scala
也可以通过predict()
方法来确定每个样本所属的聚类:
- scala> trainingData.collect().foreach(
- | sample => {
- | val predictedCluster = model.predict(sample)
- | println(sample.toString + " belongs to cluster " + predictedCluster)
- | })
- [5.1,3.5,1.4,0.2] belongs to cluster 1
- [4.9,3.0,1.4,0.2] belongs to cluster 1
- [4.7,3.2,1.3,0.2] belongs to cluster 1
- [4.6,3.1,1.5,0.2] belongs to cluster 1
- [5.0,3.6,1.4,0.2] belongs to cluster 1
- [5.4,3.9,1.7,0.4] belongs to cluster 1
- [4.6,3.4,1.4,0.3] belongs to cluster 1
- .....
- scala> val wssse = model.computeCost(trainingData)
- wssse: Double = 78.85144142614642
scala
对于那些无法预先知道K值的情况(本文中使用的Iris数据集很明确其K值为3),可以通过 WSSSE 的计算构建出 K-WSSSE 间的相关关系,从而确定K的值,一般来说,最优的K值即是 K-WSSSE 曲线的 拐点(Elbow) 位置(当然,对于某些情况来说,我们还需要考虑K值的语义可解释性,而不仅仅是教条地参考WSSSE曲线)。