A KMeans example with Spark MLlib:
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.mllib.clustering.KMeans;
import org.apache.spark.mllib.clustering.KMeansModel;
import org.apache.spark.mllib.linalg.Vector;
import org.apache.spark.mllib.linalg.Vectors;

public class SparkMLlibKMeans {
    public static void main(String[] args) {
        SparkConf conf = new SparkConf().setAppName("K-means Example");
        JavaSparkContext sc = new JavaSparkContext(conf);

        // Load and parse the data: one space-separated point per line
        String path = "file:///data/hadoop/spark-2.0.0-bin-hadoop2.7/data/mllib/kmeans_data.txt";
        JavaRDD<String> data = sc.textFile(path);
        JavaRDD<Vector> parsedData = data.map(new Function<String, Vector>() {
            public Vector call(String s) {
                String[] sarray = s.split(" ");
                double[] values = new double[sarray.length];
                for (int i = 0; i < sarray.length; i++)
                    values[i] = Double.parseDouble(sarray[i]);
                return Vectors.dense(values);
            }
        });
        parsedData.cache();

        // Cluster the data with KMeans for k = 1..6
        int numIterations = 20;
        for (int numClusters = 1; numClusters <= 6; numClusters++) { // k value
            KMeansModel clusters = KMeans.train(parsedData.rdd(), numClusters, numIterations);

            // Evaluate clustering by computing the Within Set Sum of Squared Errors (WSSSE)
            double WSSSE = clusters.computeCost(parsedData.rdd());
            System.out.println("k=" + numClusters + " Within Set Sum of Squared Errors = " + WSSSE);

            // Predict which cluster the point (0.2, 0.2, 0.2) belongs to
            Vector[] vs = clusters.clusterCenters();
            int clusteridx = clusters.predict(Vectors.dense(0.2, 0.2, 0.2));
            System.out.println("(0.2,0.2,0.2) is cluster " + clusteridx);

            // Print the cluster centers for this k
            for (Vector v : vs) {
                System.out.println("cluster center=" + v);
            }
        }
        sc.close();
    }
}
The example above loops over k from 1 to 6 cluster centers; for each k it trains a model, prints the computeCost value and the resulting cluster centers, and predicts which cluster the point (0.2, 0.2, 0.2) belongs to after clustering.
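For reference, computeCost returns the Within Set Sum of Squared Errors (WSSSE), i.e. the sum, over all points, of the squared Euclidean distance to the nearest cluster center. Below is a minimal sketch of how that value could be recomputed by hand; the class and method names (WssseUtil, wssse) are hypothetical helpers, not part of MLlib:

import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.mllib.clustering.KMeansModel;
import org.apache.spark.mllib.linalg.Vector;
import org.apache.spark.mllib.linalg.Vectors;

// Hypothetical helper: recomputes what KMeansModel.computeCost returns,
// to make the WSSSE definition explicit.
public class WssseUtil {
    public static double wssse(JavaRDD<Vector> points, KMeansModel model) {
        final Vector[] centers = model.clusterCenters();
        return points.mapToDouble(p -> {
            // squared Euclidean distance from point p to its closest center
            double best = Double.MAX_VALUE;
            for (Vector c : centers) {
                best = Math.min(best, Vectors.sqdist(p, c));
            }
            return best;
        }).sum();
    }
}

A smaller WSSSE means the points sit closer to their assigned centers; plotting WSSSE against k (as the loop above does) is the usual way to pick k by looking for an "elbow" in the curve.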
Original data:
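(If the standard sample file from the Spark distribution is used, data/mllib/kmeans_data.txt contains the following six three-dimensional points; listed here for reference, verify against your own copy:)

0.0 0.0 0.0
0.1 0.1 0.1
0.2 0.2 0.2
9.0 9.0 9.0
9.1 9.1 9.1
9.2 9.2 9.2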
Run results:
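As a follow-up, once a suitable k has been chosen, the trained model from the example above can be persisted and reloaded with MLlib's save/load API. A minimal sketch, reusing the clusters and sc variables from the example; the output path is a placeholder:

// Persist the trained model and load it back later (the path is a placeholder)
clusters.save(sc.sc(), "file:///tmp/KMeansModel");
KMeansModel sameModel = KMeansModel.load(sc.sc(), "file:///tmp/KMeansModel");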