机器学习任务主要分为两种:supervised machine learning 和 unsupervised machine learning。其中supervised machine learning主要包括classification和regression,unsupervised machine learning主要包括clustering。除了这些核心的算法以外,还有一些辅助处理的模块,例如preprocessing, dimensionality reduction, model selection等。
目前最新的Spark 1.1.0版本中MLlib主要还是对核心算法的支持,辅助处理模块还很不完善。源代码包和其功能的对应关系如下:
classification/clustering/regression/tree | 分类算法、回归算法、决策树、聚类算法 |
optimization | 核心算法的优化方法实现 |
stat | 基础统计 |
feature | 预处理 |
evaluation | 算法效果衡量 |
linalg | 基础线性代数运算支持 |
recommendation | 推荐算法 |
本文主要讨论是用MLlib进行classification工作。分类是机器学习最基础的工作,典型的应用场景就是AD CTR prediction,也就是大部分互联网公司的利润来源。据业余了解,广告CTR预估使用最多的基础算法还是L1正则化的Logistic Regression。
下面一步一步来看看使用MLlib进行classification机器学习。
首先需要了解机器学习和MLlib的基础知识和原理,大家可以参考 http://spark.apache.org/docs/latest/mllib-linear-methods.html 。本文主要从工程实践的角度讨论如何使用和调优。
分类问题主要包括binary classification 和 multiclass classification。目前的MLlib只支持linear classification问题,这里讨论的也都是线性分类问题,不涉及到kernel method等。 目前MLlib里面的classification算法最常用的就是LR,SVM和Tree/RF。其中LR和SVM目前只支持binary classification,Tree/RF支持multiclass classification。 本文主要讨论使用LR和SVM进行线性binary classification问题的实践中遇到的一些问题。
抽象来看LR和SVM算法都是通过指定loss function和gradient/sub-gradient,然后通过optimization算法(SGD或LBFGS)求使得loss function最小的凸优化问题,最后得出的解是一个weights向量。 从代码中也可以看出,LR和SVM算法仅仅是指定的loss function和gradient是不同的,其求解最小值的过程是通用的,所以求解最小值的过程抽象出了optimization模块,目前主要有SGD和LBFGS两种实现。
为了防止过拟合,需要在loss function后面加入一个正则化项一起求最小值。正则化项相当于对weights向量的惩罚,期望求出一个更简单的模型。 MLlib目前支持两种正则化方法L1和L2。 L2正则化假设模型参数服从高斯分布,L2正则化函数比L1更光滑,所以更容易计算;L1假设模型参数服从拉普拉斯分布,L1正则化具备产生稀疏解的功能,从而具备feature selection的能力。
有了上面的数学基础,现在就是求取一个函数的最小值问题了。MLlib里面目前提供两种方法SGD和LBFGS。关于这两种算法的原理,大家可以参考 http://spark.apache.org/docs/latest/mllib-optimization.html 。这两种优化方法的核心都是RDD的aggregate操作,这个从Spark Job运行时的UI中可以看出,SGD/LBFGS每迭代一次,aggregate执行一次,Spark UI中出现一个stage。下面分别看看两种优化算法具体怎么实现的。
SGD:
核心实现在GradientDescent.runMiniBatchSGD函数中
for (i <- 1 to numIterations) { val bcWeights = data.context.broadcast(weights) // Sample a subset (fraction miniBatchFraction) of the total data // compute and sum up the subgradients on this subset (this is one map-reduce) val (gradientSum, lossSum) = data.sample(false, miniBatchFraction, 42 + i) .treeAggregate((BDV.zeros[Double](n), 0.0))( seqOp = (c, v) => (c, v) match { case ((grad, loss), (label, features)) => val l = gradient.compute(features, label, bcWeights.value, Vectors.fromBreeze(grad)) (grad, loss + l) }, combOp = (c1, c2) => (c1, c2) match { case ((grad1, loss1), (grad2, loss2)) => (grad1 += grad2, loss1 + loss2) }) /** * NOTE(Xinghao): lossSum is computed using the weights from the previous iteration * and regVal is the regularization value computed in the previous iteration as well. */ stochasticLossHistory.append(lossSum / miniBatchSize + regVal) val update = updater.compute( weights, Vectors.fromBreeze(gradientSum / miniBatchSize), stepSize, i, regParam) weights = update._1 regVal = update._2 }
每次根据miniBatchFraction指定的比例随机采样相应数量的样本,然后调用gradient.compute逐个计算gradient和loss并累加在一起,得到这一轮迭代总的gradient和loss的变化。然后调用updater.compute更新weights矩阵和regVal(正则化项)。也就是说gradient.compute只是利用当前weights向量计算gradient和loss的变化,而updater.compute则根据这个变化以及正则化项计算更新之后的weights和regVal。
SGD算法支持使用L1和L2正则化方法。
注意到这里weights向量在下一轮计算的过程中每个参与计算的Executor都需要,所以使用了broadcast变量把它分发到每个节点,提高了计算效率。
LBFGS:
LBFGS优化方法的核心实现在LBFGS.runLBFGS函数里面。LBFGS的实现比SGD更加依赖breeze库,它的迭代框架都是使用的breeze的LBFGS的实现,只是实现了自己的名为CostFun的DiffFunction。大家可以去LBGFS.CostFun函数中看看loss function和gradient的计算方法与SGD算法如出一辙,也是利用了RDD的aggregate操作。
和SGD不同,目前LBFGS只支持L2正则化,不支持L1正则化。 其实在breeze库里面有LBFGS + L1正则化的实现owlqn (owlqn算法默认自带L1正则化,所以在传入的参数DiffFunction中不需要显示定义正则化项,只需要定义loss function即可)是可以把它引入MLlib里面完成LBFGS+L1的功能。 这个在社区也有讨论,DB Tsai等人正在做这方面的工作。等不及的同学也可以尝试下我们自己修改的版本,引入了LBFGS+L1的功能的代码。
SGD和LBFGS两种算法的比较:
网上的资料都告诉我们说LBFGS比SGD更容易收敛,效果更好,大家可以亲自尝试下。例如选择Logistic Regression算法,选取同一个数据集,在做training和test集合的分配的时候也要一致。然后把生成的training和test的RDD分别丢到LogisticRegressionWithSGD和LogisticRegressionWithLBFGS两种具体实现算法里。其他参数要一致(例如都选择L2正则化,regParam=1.0)然后比较效果。
怎么比较效果的好坏呢?分类问题就是那些指标precision/recall/f1-score/area under ROC等。这里面有个需要注意的问题,MLlib的实现里面,SGD优化的终止条件是通过指定numIterations也就是迭代次数终止的;而LBFGS优化的终止条件是通过指定convergenceTol(两次迭代loss function变化的容忍度)和maxNumIterations(最大迭代次数)来终止的。
为了达到比较这两种优化方法的目的,需要定义一个统一的指标。由于我们优化的目标是让loss function+正则化项 最小,所以这个就是统一的指标。MLlib的日志里面会打印出迭代的最后10次的Loss大小:
//GradientDescent
logInfo("GradientDescent.runMiniBatchSGD finished. Last 10 stochastic losses %s".format( stochasticLossHistory.takeRight(10).mkString(", ")))
//LBFGS
logInfo("LBFGS.runLBFGS finished. Last 10 losses %s".format( lossHistory.takeRight(10).mkString(", ")))
我在一个数据集上测试,其他参数都一致(L2正则化,regParam=1.0,miniBatchFraction=1.0)的情况下,LBFGS需要14次就可以达到收敛;而对于SGD(实际上miniBatchFraction=1.0的已经不是纯种SGD了)算法大约需要10000次循环才能达到相似的loss大小。而且最后两种算法得到的weights(权重矩阵)是一样的。
训练出来的模型实际上就是一个weights向量,现在MLlib的GeneralizedLinearModel的成员变量weights和intercept都是public的了。 在训练好GeneralizedLinearModel之后,可以直接把weights和intercept打印出来,或者进行一些计算找出权重最大的几个维度。
这里有个问题,如果我的loss function和gradient是确定的,那么使用不同的优化方法求出的weights是不是应该是一样的?例如对同一个数据集,使用LogisticRegressionWithSGD和LogisticRegressionWithLBFGS分别训练出的模型是否应该是一致的?怎么衡量效果好坏?
对于这个问题,我想首先要看几点:
1) 确认两种方法使用的正则化和其他参数是一致的。
2) 通过日志查看两种优化方法的loss function+正则化项最后是否收敛?是否收敛到接近的值?
3) 如果有某种优化算法的Loss不收敛,说明那种方法的迭代次数不够(SGD)或者convergenceTol设置的太大(LBFGS),调整参数重试。如果收敛到相差较大的值,十有八九你的算法有问题。
4) 如果两种优化方法的Loss function+正则化项收敛且到接近的值,那么就要看看传统指标precision/recall/area under ROC。如果area under ROC接近1.0,说明你的数据完全线性可分或者过拟合了,这个时候打印出两种方法得到的weights应该是类似倍数关系的;如果area under ROC不是接近1.0,那么说明你的数据是真实的数据,这个时候两种方法得到的weights应该是一致的。
5) 接下来就可以使用上述的思路去调节参数优化precision/recall等指标了。
其实上面的工作还只是停留在学习算法的阶段,拿过来一些公开的dataset或者benchmark来跑跑,看看效果。这些数据集往往都是经过了别人的一些加工和处理,在实际工作中这一部分也是需要我们来做的,这就是数据预处理。数据预处理特别繁琐,但是对机器学习模型效果的好坏却非常重要。
预处理主要包括normalization,scale,outlier-detection,正负样本均衡等。例如遇到一个数据正负样本比例9:1,这样的数据直接丢到模型里面显然会让模型更偏向正例做预测。解决这个问题的方法挺多的,最简单的例子就是正例采样、负例冗余,使两者达到接近平衡。
MLlib现阶段主要精力还是在核心算法上,对预处理这部分做的不是很好,这也提高了使用门槛。在1.1.0版本开始增加了一个新的package叫feature,里面大多是preprocssing函数,包括Normalizer和StandardScaler等。 例如StandardScaler能够把feature按照列转换成mean=0,standard deviation=1的正态分布。
数据输入支持普通文本文件和libsvm格式。在1.1.0版本之前,输入的label可以是+1/-1会被自动映射成1.0/0.0,但是从1.1.0开始貌似只支持输入1.0/0.0的label了,这个和我们以前常见的libsvm格式的数据集不一样,需要注意。这个改动应该主要考虑的是对以后多分类的支持吧。
不知道大家有没有注意到一个问题,就是MLlib底层的矩阵运算使用了breeze库,breeze库提供了vector/matrix的实现以及相应计算的接口(linalg)。但是在MLlib里面同时也提供了vector和linalg等的实现(目前只是对breeze做了一层包装)。在所有的MLlib的函数里面的参数传递都是使用Mllib自己的vector,而且在函数内的矩阵计算又通过toBreeze.toDenseVector变成breeze的形式进行运算。这样做的目的一是保持自己函数接口的稳定性,不会因为breeze的变化而变化;另外一个就是可以把distributed matrix作为一种matrix的实现而被使用。
Spark集群(standalone、yarn-client、yarn-cluster、单机调试环境)。
我主要使用Scala开发,IDE为Intellij IDEA,安装Scala插件。
开发一个project可以使用maven或者sbt编译,都可以通过IDEA创建相应的工程。 Maven编译的话和Java的maven工程没啥区别,主要是修改pom.xml文件;使用sbt编译的话,主要是修改build.sbt文件。
build.sbt的格式网上有很多资料了,简单说下需要注意的问题:
1) 必须每隔一行写新的内容;
2) libraryDependencies 后面%%和%的区别:artifactId后面带/不带版本号;
3) libraryDependencies 后面可以使用”provided”使其在assembly打包的时候不被打入包中。
开发一个基于Spark和MLlib的机器学习Job,主要依赖的两个libraryDependencies就是spark-core和spark-mllib。
其实使用Scala开发Spark程序最重要的一点就是要知道你写的代码中哪些是RDD的操作,哪些是在RDD内部的操作,哪些是transform,哪些是actions,哪个地方会形成一个stage。这些搞清楚之后就明白了哪些code是在Driver上执行的,哪些是在executor上并行执行的。另外就是哪些资源相关的参数,像executor-memory和num-executors等。关于这方面的内容在后面的介绍Tree和Random forest的博客中讨论。