
    [Original] Linear Regression with Spark MLlib

    Posted by liuzhoulong on 2017-03-22 14:12:46

    A Spark MLlib linear regression example:

    import org.apache.spark.SparkConf;
    import org.apache.spark.api.java.JavaDoubleRDD;
    import org.apache.spark.api.java.JavaRDD;
    import org.apache.spark.api.java.JavaSparkContext;
    import org.apache.spark.api.java.function.Function;
    import org.apache.spark.mllib.linalg.Vectors;
    import org.apache.spark.mllib.regression.LabeledPoint;
    import org.apache.spark.mllib.regression.LinearRegressionModel;
    import org.apache.spark.mllib.regression.LinearRegressionWithSGD;

    import scala.Tuple2;

    public class SparkMLlibLinearRegression {

        public static void main(String[] args) {

            String path = "file:///data/hadoop/spark-2.0.0-bin-hadoop2.7/data/mllib/ridge-data/lpsa.data";
            SparkConf conf = new SparkConf();
            JavaSparkContext sc = new JavaSparkContext(args[0], "Spark", conf);

            // Parse each line of the form "label,feature1 feature2 ..." into a LabeledPoint
            JavaRDD<String> data = sc.textFile(path);
            JavaRDD<LabeledPoint> parsedData = data.map(new Function<String, LabeledPoint>() {
                @Override
                public LabeledPoint call(String line) throws Exception {
                    String[] parts = line.split(",");
                    String[] features = parts[1].split(" ");
                    double[] v = new double[features.length];
                    for (int i = 0; i < v.length; i++) {
                        v[i] = Double.parseDouble(features[i]);
                    }
                    return new LabeledPoint(Double.parseDouble(parts[0]), Vectors.dense(v));
                }
            });
            parsedData.cache();

            // Building the model
            int numIterations = 100;
            double stepSize = 0.00000001;
            final LinearRegressionModel model =
                LinearRegressionWithSGD.train(JavaRDD.toRDD(parsedData), numIterations, stepSize);

            // Evaluate model on training examples and compute training error
            JavaRDD<Tuple2<Double, Double>> valuesAndPreds = parsedData.map(
                new Function<LabeledPoint, Tuple2<Double, Double>>() {
                    @Override
                    public Tuple2<Double, Double> call(LabeledPoint point) throws Exception {
                        double prediction = model.predict(point.features());
                        return new Tuple2<Double, Double>(prediction, point.label());
                    }
                });

            double MSE = new JavaDoubleRDD(valuesAndPreds.map(
                new Function<Tuple2<Double, Double>, Object>() {
                    public Object call(Tuple2<Double, Double> pair) {
                        return Math.pow(pair._1() - pair._2(), 2.0);
                    }
                }
            ).rdd()).mean();

            System.out.println("training Mean Squared Error = " + MSE);

            // Save and load model
            model.save(sc.sc(), "target/tmp/javaLinearRegressionWithSGDModel");
            LinearRegressionModel sameModel = LinearRegressionModel.load(sc.sc(),
                "target/tmp/javaLinearRegressionWithSGDModel");

            double result = sameModel.predict(Vectors.dense(-0.132431544081234, 2.68769877553723, 1.09253092365124,
                1.53428167116843, -0.522940888712441, -0.442797990776478, 0.342627053981254, -0.687186906466865));
            System.out.println(sameModel.weights());
            System.out.println("save predict result=" + result);

            result = model.predict(Vectors.dense(-0.132431544081234, 2.68769877553723, 1.09253092365124,
                1.53428167116843, -0.522940888712441, -0.442797990776478, 0.342627053981254, -0.687186906466865));
            System.out.println(model.weights());
            System.out.println("predict result=" + result);

        }

    }

    Run it with (the trailing yarn argument is passed to the program as args[0], the master URL): spark-submit --class com.test.hadoop.SparkMLlibLinearRegression --master yarn --executor-memory 1024M --total-executor-cores 1 ./MRTest-1.0-jar-with-dependencies.jar yarn


    The trained model is saved on HDFS under target/tmp/javaLinearRegressionWithSGDModel; the next time it is needed, it can simply be loaded and used directly for prediction. As shown above, sameModel and model produce the same final result.
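
    To make that reuse concrete, a separate driver can load the persisted model and predict without retraining. Below is a minimal sketch of that idea; the class name LoadAndPredict and the sample feature values are hypothetical, while the model path and the load/predict calls are the same ones used above.

    import org.apache.spark.SparkConf;
    import org.apache.spark.api.java.JavaSparkContext;
    import org.apache.spark.mllib.linalg.Vectors;
    import org.apache.spark.mllib.regression.LinearRegressionModel;

    public class LoadAndPredict {

        public static void main(String[] args) {
            SparkConf conf = new SparkConf().setAppName("LoadAndPredict");
            JavaSparkContext sc = new JavaSparkContext(conf);

            // Load the model persisted by SparkMLlibLinearRegression
            LinearRegressionModel model = LinearRegressionModel.load(sc.sc(),
                "target/tmp/javaLinearRegressionWithSGDModel");

            // Feature values here are placeholders; the vector must have the same
            // dimension as the training data (8 features for lpsa.data)
            double prediction = model.predict(
                Vectors.dense(0.1, 0.2, 0.3, 0.4, 0.5, 0.6, 0.7, 0.8));
            System.out.println("loaded model prediction = " + prediction);

            sc.stop();
        }
    }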


