spark mllib 线性回归实例:
import org.apache.spark.SparkConf; import org.apache.spark.api.java.JavaDoubleRDD; import org.apache.spark.api.java.JavaRDD; import org.apache.spark.api.java.JavaSparkContext; import org.apache.spark.api.java.function.Function; import org.apache.spark.mllib.linalg.Vectors; import org.apache.spark.mllib.regression.LabeledPoint; import org.apache.spark.mllib.regression.LinearRegressionModel; import org.apache.spark.mllib.regression.LinearRegressionWithSGD; import scala.Tuple2; public class SparkMLlibLinearRegression { public static void main(String[] args) { String path = "file:///data/hadoop/spark-2.0.0-bin-hadoop2.7/data/mllib/ridge-data/lpsa.data"; SparkConf conf = new SparkConf(); JavaSparkContext sc = new JavaSparkContext(args[0], "Spark", conf); JavaRDD<String> data = sc.textFile(path); JavaRDD<LabeledPoint> parsedData = data.map(new Function<String, LabeledPoint>() { @Override public LabeledPoint call(String line) throws Exception { String[] parts = line.split(","); String[] features = parts[1].split(" "); double[] v = new double[features.length]; for (int i = 0; i < v.length; i++) { v[i] = Double.parseDouble(features[i]); } return new LabeledPoint(Double.parseDouble(parts[0]), Vectors.dense(v)); } }); parsedData.cache(); // Building the model int numIterations = 100; double stepSize = 0.00000001; final LinearRegressionModel model = LinearRegressionWithSGD.train(JavaRDD.toRDD(parsedData), numIterations, stepSize); // Evaluate model on training examples and compute training error JavaRDD<Tuple2<Double, Double>> valuesAndPreds = parsedData.map(new Function<LabeledPoint, Tuple2<Double, Double>>(){ @Override public Tuple2<Double, Double> call(LabeledPoint point) throws Exception { double prediction = model.predict(point.features()); return new Tuple2<Double, Double>(prediction, point.label()); } }); double MSE = new JavaDoubleRDD(valuesAndPreds.map( new Function<Tuple2<Double, Double>, Object>() { public Object call(Tuple2<Double, Double> pair) { return Math.pow(pair._1() - pair._2(), 2.0); } } ).rdd()).mean(); System.out.println("training Mean Squared Error = " + MSE); // Save and load model model.save(sc.sc(), "target/tmp/javaLinearRegressionWithSGDModel"); LinearRegressionModel sameModel = LinearRegressionModel.load(sc.sc(), "target/tmp/javaLinearRegressionWithSGDModel"); double result = sameModel.predict(Vectors.dense(-0.132431544081234,2.68769877553723,1.09253092365124,1.53428167116843,-0.522940888712441,-0.442797990776478,0.342627053981254,-0.687186906466865)); System.out.println(sameModel.weights()); System.out.println("save predict result="+ result); result = model.predict(Vectors.dense(-0.132431544081234,2.68769877553723,1.09253092365124,1.53428167116843,-0.522940888712441,-0.442797990776478,0.342627053981254,-0.687186906466865)); System.out.println(model.weights()); System.out.println("predict result="+ result); } }
运行:spark-submit --class com.test.hadoop.SparkMLlibLinearRegression --master yarn --executor-memory 1024M --total-executor-cores 1 ./MRTest-1.0-jar-with-dependencies.jar yarn