IT博客汇
  • 首页
  • 精华
  • 技术
  • 设计
  • 资讯
  • 扯淡
  • 权利声明
  • 登录 注册

    mllib实践经验(1)

    ybliang8@gmail.com发表于 2014-10-01 11:23:33
    love 0

    机器学习任务主要分为两种:supervised machine learning 和 unsupervised machine learning。其中supervised machine learning主要包括classification和regression,unsupervised machine learning主要包括clustering。除了这些核心的算法以外,还有一些辅助处理的模块,例如preprocessing, dimensionality reduction, model selection等。

    目前最新的Spark 1.1.0版本中MLlib主要还是对核心算法的支持,辅助处理模块还很不完善。源代码包和其功能的对应关系如下:

    classification/clustering/regression/tree 分类算法、回归算法、决策树、聚类算法
    optimization 核心算法的优化方法实现
    stat 基础统计
    feature 预处理
    evaluation 算法效果衡量
    linalg 基础线性代数运算支持
    recommendation 推荐算法 

    本文主要讨论是用MLlib进行classification工作。分类是机器学习最基础的工作,典型的应用场景就是AD CTR prediction,也就是大部分互联网公司的利润来源。据业余了解,广告CTR预估使用最多的基础算法还是L1正则化的Logistic Regression。

    下面一步一步来看看使用MLlib进行classification机器学习。

    1,分类算法原理与MLlib的实现

    首先需要了解机器学习和MLlib的基础知识和原理,大家可以参考 http://spark.apache.org/docs/latest/mllib-linear-methods.html 。本文主要从工程实践的角度讨论如何使用和调优。

    分类问题主要包括binary classification 和 multiclass classification。目前的MLlib只支持linear classification问题,这里讨论的也都是线性分类问题,不涉及到kernel method等。 目前MLlib里面的classification算法最常用的就是LR,SVM和Tree/RF。其中LR和SVM目前只支持binary classification,Tree/RF支持multiclass classification。 本文主要讨论使用LR和SVM进行线性binary classification问题的实践中遇到的一些问题。

    抽象来看LR和SVM算法都是通过指定loss function和gradient/sub-gradient,然后通过optimization算法(SGD或LBFGS)求使得loss function最小的凸优化问题,最后得出的解是一个weights向量。 从代码中也可以看出,LR和SVM算法仅仅是指定的loss function和gradient是不同的,其求解最小值的过程是通用的,所以求解最小值的过程抽象出了optimization模块,目前主要有SGD和LBFGS两种实现。

    为了防止过拟合,需要在loss function后面加入一个正则化项一起求最小值。正则化项相当于对weights向量的惩罚,期望求出一个更简单的模型。 MLlib目前支持两种正则化方法L1和L2。 L2正则化假设模型参数服从高斯分布,L2正则化函数比L1更光滑,所以更容易计算;L1假设模型参数服从拉普拉斯分布,L1正则化具备产生稀疏解的功能,从而具备feature selection的能力。 

    2,两种optimization方法SGD和LBFGS

    有了上面的数学基础,现在就是求取一个函数的最小值问题了。MLlib里面目前提供两种方法SGD和LBFGS。关于这两种算法的原理,大家可以参考 http://spark.apache.org/docs/latest/mllib-optimization.html 。这两种优化方法的核心都是RDD的aggregate操作,这个从Spark Job运行时的UI中可以看出,SGD/LBFGS每迭代一次,aggregate执行一次,Spark UI中出现一个stage。下面分别看看两种优化算法具体怎么实现的。

    SGD:

    核心实现在GradientDescent.runMiniBatchSGD函数中

    for (i <- 1 to numIterations) {
          val bcWeights = data.context.broadcast(weights)
          // Sample a subset (fraction miniBatchFraction) of the total data
          // compute and sum up the subgradients on this subset (this is one map-reduce)
          val (gradientSum, lossSum) = data.sample(false, miniBatchFraction, 42 + i)
            .treeAggregate((BDV.zeros[Double](n), 0.0))(
              seqOp = (c, v) => (c, v) match { case ((grad, loss), (label, features)) =>
                val l = gradient.compute(features, label, bcWeights.value, Vectors.fromBreeze(grad))
                (grad, loss + l)
              },
              combOp = (c1, c2) => (c1, c2) match { case ((grad1, loss1), (grad2, loss2)) =>
                (grad1 += grad2, loss1 + loss2)
              })
    
          /**
           * NOTE(Xinghao): lossSum is computed using the weights from the previous iteration
           * and regVal is the regularization value computed in the previous iteration as well.
           */
          stochasticLossHistory.append(lossSum / miniBatchSize + regVal)
          val update = updater.compute(
            weights, Vectors.fromBreeze(gradientSum / miniBatchSize), stepSize, i, regParam)
          weights = update._1
          regVal = update._2
        }
    

    每次根据miniBatchFraction指定的比例随机采样相应数量的样本,然后调用gradient.compute逐个计算gradient和loss并累加在一起,得到这一轮迭代总的gradient和loss的变化。然后调用updater.compute更新weights矩阵和regVal(正则化项)。也就是说gradient.compute只是利用当前weights向量计算gradient和loss的变化,而updater.compute则根据这个变化以及正则化项计算更新之后的weights和regVal。

    SGD算法支持使用L1和L2正则化方法。

    注意到这里weights向量在下一轮计算的过程中每个参与计算的Executor都需要,所以使用了broadcast变量把它分发到每个节点,提高了计算效率。

    LBFGS:

    LBFGS优化方法的核心实现在LBFGS.runLBFGS函数里面。LBFGS的实现比SGD更加依赖breeze库,它的迭代框架都是使用的breeze的LBFGS的实现,只是实现了自己的名为CostFun的DiffFunction。大家可以去LBGFS.CostFun函数中看看loss function和gradient的计算方法与SGD算法如出一辙,也是利用了RDD的aggregate操作。

    和SGD不同,目前LBFGS只支持L2正则化,不支持L1正则化。 其实在breeze库里面有LBFGS + L1正则化的实现owlqn (owlqn算法默认自带L1正则化,所以在传入的参数DiffFunction中不需要显示定义正则化项,只需要定义loss function即可)是可以把它引入MLlib里面完成LBFGS+L1的功能。 这个在社区也有讨论,DB Tsai等人正在做这方面的工作。等不及的同学也可以尝试下我们自己修改的版本,引入了LBFGS+L1的功能的代码。

    SGD和LBFGS两种算法的比较:

    网上的资料都告诉我们说LBFGS比SGD更容易收敛,效果更好,大家可以亲自尝试下。例如选择Logistic Regression算法,选取同一个数据集,在做training和test集合的分配的时候也要一致。然后把生成的training和test的RDD分别丢到LogisticRegressionWithSGD和LogisticRegressionWithLBFGS两种具体实现算法里。其他参数要一致(例如都选择L2正则化,regParam=1.0)然后比较效果。

    怎么比较效果的好坏呢?分类问题就是那些指标precision/recall/f1-score/area under ROC等。这里面有个需要注意的问题,MLlib的实现里面,SGD优化的终止条件是通过指定numIterations也就是迭代次数终止的;而LBFGS优化的终止条件是通过指定convergenceTol(两次迭代loss function变化的容忍度)和maxNumIterations(最大迭代次数)来终止的。

    为了达到比较这两种优化方法的目的,需要定义一个统一的指标。由于我们优化的目标是让loss function+正则化项 最小,所以这个就是统一的指标。MLlib的日志里面会打印出迭代的最后10次的Loss大小:

    //GradientDescent

    
    logInfo("GradientDescent.runMiniBatchSGD finished. Last 10 stochastic losses %s".format(
    
    stochasticLossHistory.takeRight(10).mkString(", ")))
    

    //LBFGS

    
    logInfo("LBFGS.runLBFGS finished. Last 10 losses %s".format(
    
    lossHistory.takeRight(10).mkString(", ")))
    

    我在一个数据集上测试,其他参数都一致(L2正则化,regParam=1.0,miniBatchFraction=1.0)的情况下,LBFGS需要14次就可以达到收敛;而对于SGD(实际上miniBatchFraction=1.0的已经不是纯种SGD了)算法大约需要10000次循环才能达到相似的loss大小。而且最后两种算法得到的weights(权重矩阵)是一样的。 

    3,如何解读和分析训练出来的模型。

    训练出来的模型实际上就是一个weights向量,现在MLlib的GeneralizedLinearModel的成员变量weights和intercept都是public的了。 在训练好GeneralizedLinearModel之后,可以直接把weights和intercept打印出来,或者进行一些计算找出权重最大的几个维度。

    这里有个问题,如果我的loss function和gradient是确定的,那么使用不同的优化方法求出的weights是不是应该是一样的?例如对同一个数据集,使用LogisticRegressionWithSGD和LogisticRegressionWithLBFGS分别训练出的模型是否应该是一致的?怎么衡量效果好坏?

    对于这个问题,我想首先要看几点:

    1)  确认两种方法使用的正则化和其他参数是一致的。

    2)  通过日志查看两种优化方法的loss function+正则化项最后是否收敛?是否收敛到接近的值?

    3)  如果有某种优化算法的Loss不收敛,说明那种方法的迭代次数不够(SGD)或者convergenceTol设置的太大(LBFGS),调整参数重试。如果收敛到相差较大的值,十有八九你的算法有问题。

    4)  如果两种优化方法的Loss function+正则化项收敛且到接近的值,那么就要看看传统指标precision/recall/area under ROC。如果area under ROC接近1.0,说明你的数据完全线性可分或者过拟合了,这个时候打印出两种方法得到的weights应该是类似倍数关系的;如果area under ROC不是接近1.0,那么说明你的数据是真实的数据,这个时候两种方法得到的weights应该是一致的。

    5)  接下来就可以使用上述的思路去调节参数优化precision/recall等指标了。

    4,预处理

    其实上面的工作还只是停留在学习算法的阶段,拿过来一些公开的dataset或者benchmark来跑跑,看看效果。这些数据集往往都是经过了别人的一些加工和处理,在实际工作中这一部分也是需要我们来做的,这就是数据预处理。数据预处理特别繁琐,但是对机器学习模型效果的好坏却非常重要。

    预处理主要包括normalization,scale,outlier-detection,正负样本均衡等。例如遇到一个数据正负样本比例9:1,这样的数据直接丢到模型里面显然会让模型更偏向正例做预测。解决这个问题的方法挺多的,最简单的例子就是正例采样、负例冗余,使两者达到接近平衡。

    MLlib现阶段主要精力还是在核心算法上,对预处理这部分做的不是很好,这也提高了使用门槛。在1.1.0版本开始增加了一个新的package叫feature,里面大多是preprocssing函数,包括Normalizer和StandardScaler等。 例如StandardScaler能够把feature按照列转换成mean=0,standard deviation=1的正态分布。

    数据输入支持普通文本文件和libsvm格式。在1.1.0版本之前,输入的label可以是+1/-1会被自动映射成1.0/0.0,但是从1.1.0开始貌似只支持输入1.0/0.0的label了,这个和我们以前常见的libsvm格式的数据集不一样,需要注意。这个改动应该主要考虑的是对以后多分类的支持吧。

    5,MLlib中的vector和线性代数运算

    不知道大家有没有注意到一个问题,就是MLlib底层的矩阵运算使用了breeze库,breeze库提供了vector/matrix的实现以及相应计算的接口(linalg)。但是在MLlib里面同时也提供了vector和linalg等的实现(目前只是对breeze做了一层包装)。在所有的MLlib的函数里面的参数传递都是使用Mllib自己的vector,而且在函数内的矩阵计算又通过toBreeze.toDenseVector变成breeze的形式进行运算。这样做的目的一是保持自己函数接口的稳定性,不会因为breeze的变化而变化;另外一个就是可以把distributed matrix作为一种matrix的实现而被使用。

    6,开发环境

    Spark集群(standalone、yarn-client、yarn-cluster、单机调试环境)。

    我主要使用Scala开发,IDE为Intellij IDEA,安装Scala插件。

    开发一个project可以使用maven或者sbt编译,都可以通过IDEA创建相应的工程。 Maven编译的话和Java的maven工程没啥区别,主要是修改pom.xml文件;使用sbt编译的话,主要是修改build.sbt文件。

    build.sbt的格式网上有很多资料了,简单说下需要注意的问题:

    1) 必须每隔一行写新的内容;

    2) libraryDependencies 后面%%和%的区别:artifactId后面带/不带版本号;

    3) libraryDependencies 后面可以使用”provided”使其在assembly打包的时候不被打入包中。

    开发一个基于Spark和MLlib的机器学习Job,主要依赖的两个libraryDependencies就是spark-core和spark-mllib。

    其实使用Scala开发Spark程序最重要的一点就是要知道你写的代码中哪些是RDD的操作,哪些是在RDD内部的操作,哪些是transform,哪些是actions,哪个地方会形成一个stage。这些搞清楚之后就明白了哪些code是在Driver上执行的,哪些是在executor上并行执行的。另外就是哪些资源相关的参数,像executor-memory和num-executors等。关于这方面的内容在后面的介绍Tree和Random forest的博客中讨论。



沪ICP备19023445号-2号
友情链接