Software versions: Spark 1.4.1, Hadoop 2.6, Scala 2.10.5, Eclipse 4.5.1, IntelliJ IDEA 14, JDK 1.7
Cluster environment:
Windows 7 (with JDK 1.7, Eclipse 4.5.1, IntelliJ IDEA 14);
CentOS 6.6 virtual machines (Hadoop distributed cluster, JDK 1.7): node1: NameNode/ResourceManager; node2: SecondaryNameNode; node3/node4: DataNode/NodeManager
node1 -> 192.168.0.31 (the other nodes' IPs follow in sequence)
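The Java client below addresses the cluster by hostname (node1, node2), so those names must resolve on the Windows development machine as well as inside the cluster. A minimal hosts mapping, assuming node2 through node4 continue the numbering implied above (only node1's address is stated in the original):

192.168.0.31  node1
192.168.0.32  node2
192.168.0.33  node3
192.168.0.34  node4

On Windows the entries go into C:\Windows\System32\drivers\etc\hosts; on the CentOS machines into /etc/hosts.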
The Scala side is the ALS trainer (als.ALSModelTrainer, the class referenced by the --class argument below), whose driver code begins roughly like this:

val conf = new SparkConf().setAppName("train ALS Model")
val sc = new SparkContext(conf)
val ratings = sc.textFile(input).map { line =>
  // ratings.dat is assumed to follow the MovieLens "user::movie::rating::timestamp" format
  val fields = line.split("::")
  Rating(fields(0).toInt, fields(1).toInt, fields(2).toDouble)
}
package demo;

import org.apache.hadoop.conf.Configuration;
import org.apache.spark.SparkConf;
import org.apache.spark.deploy.yarn.Client;
import org.apache.spark.deploy.yarn.ClientArguments;

public class Utils {
    private static Configuration configuration = null;

    public static Configuration getConf() {
        if (configuration == null) {
            configuration = new Configuration();
            // enable cross-platform submission (from Windows to the Linux cluster)
            configuration.setBoolean("mapreduce.app-submission.cross-platform", true);
            // NameNode
            configuration.set("fs.defaultFS", "hdfs://node1:8020");
            // use the YARN framework
            configuration.set("mapreduce.framework.name", "yarn");
            // ResourceManager
            configuration.set("yarn.resourcemanager.address", "node1:8032");
            // ResourceManager scheduler
            configuration.set("yarn.resourcemanager.scheduler.address", "node1:8030");
            // JobHistory server
            configuration.set("mapreduce.jobhistory.address", "node2:10020");
        }
        return configuration;
    }

    /**
     * Submit a Spark application to YARN via Spark's yarn Client.
     * @param args arguments passed to ClientArguments
     * @return true if the submission ran without an exception
     */
    public static boolean runSpark(String[] args) {
        try {
            System.setProperty("SPARK_YARN_MODE", "true");
            SparkConf sparkConf = new SparkConf();
            sparkConf.set("spark.yarn.jar",
                    "hdfs://node1:8020/user/root/spark-assembly-1.4.1-hadoop2.6.0.jar");
            sparkConf.set("spark.yarn.scheduler.heartbeat.interval-ms", "1000");

            ClientArguments cArgs = new ClientArguments(args, sparkConf);
            new Client(cArgs, getConf(), sparkConf).run();
        } catch (Exception e) {
            e.printStackTrace();
            return false;
        }
        return true;
    }
}
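Client(...).run() does not return until the YARN application reaches a final state, which is fine for a command-line tool but inconvenient if the caller wants to report progress itself. One option is to submit with submitApplication() (which the Spark 1.4.1 yarn Client also exposes) and then poll the ResourceManager through Hadoop's YarnClient API. A rough sketch of that pattern; the MonitoredSubmit class name is made up for illustration and reuses Utils.getConf() from above:

package demo;

import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ApplicationReport;
import org.apache.hadoop.yarn.api.records.YarnApplicationState;
import org.apache.hadoop.yarn.client.api.YarnClient;
import org.apache.spark.SparkConf;
import org.apache.spark.deploy.yarn.Client;
import org.apache.spark.deploy.yarn.ClientArguments;

public class MonitoredSubmit {

    public static void submitAndWait(String[] args) throws Exception {
        System.setProperty("SPARK_YARN_MODE", "true");
        SparkConf sparkConf = new SparkConf();
        sparkConf.set("spark.yarn.jar",
                "hdfs://node1:8020/user/root/spark-assembly-1.4.1-hadoop2.6.0.jar");

        // submitApplication() returns as soon as YARN accepts the application,
        // unlike run(), which blocks until the application finishes
        Client client = new Client(new ClientArguments(args, sparkConf), Utils.getConf(), sparkConf);
        ApplicationId appId = client.submitApplication();

        // poll the ResourceManager for the application's state
        YarnClient yarnClient = YarnClient.createYarnClient();
        yarnClient.init(Utils.getConf());
        yarnClient.start();
        try {
            while (true) {
                ApplicationReport report = yarnClient.getApplicationReport(appId);
                YarnApplicationState state = report.getYarnApplicationState();
                System.out.println(appId + ": " + state
                        + " (" + (int) (report.getProgress() * 100) + "%)");
                if (state == YarnApplicationState.FINISHED
                        || state == YarnApplicationState.FAILED
                        || state == YarnApplicationState.KILLED) {
                    break;
                }
                Thread.sleep(5000);
            }
        } finally {
            yarnClient.stop();
        }
    }
}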
package demo;

import java.io.IOException;

import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class RunSpark {
    public static void main(String[] args) throws IllegalArgumentException, IOException {
        // <input> <output> <train_percent> <ranks> <lambda> <iteration>
        String[] inputArgs = new String[]{
                "hdfs://node1:8020/user/root/ratings.dat",
                "hdfs://node1:8020/user/fansy/als_output",
                "0.8",
                "10",
                "10.0",
                "20"
        };
        String[] runArgs = new String[]{
                "--name", "ALS Model Train",
                "--class", "als.ALSModelTrainer",
                // TODO: the following resource settings are for testing; comment them out otherwise
                "--driver-memory", "512m",
                "--num-executors", "2",
                "--executor-memory", "512m",
                "--jar", "hdfs://node1:8020/user/root/Spark141-als.jar",
                // when the driver runs on a cluster node, Spark only reads the configuration files
                // bundled in spark-assembly-1.4.1-hadoop2.6.0.jar, so ship yarn-site.xml explicitly
                "--files", "hdfs://node1:8020/user/root/yarn-site.xml",
                "--arg", inputArgs[0],
                "--arg", inputArgs[1],
                "--arg", inputArgs[2],
                "--arg", inputArgs[3],
                "--arg", inputArgs[4],
                "--arg", inputArgs[5]
        };
        // delete any previous output so the job does not fail on an existing directory
        FileSystem.get(Utils.getConf()).delete(new Path(inputArgs[1]), true);
        Utils.runSpark(runArgs);
    }
}
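RunSpark assumes that the Spark assembly jar, the application jar (Spark141-als.jar), yarn-site.xml, and ratings.dat already exist at the HDFS paths used above. They can be pushed to HDFS once from the development machine with the FileSystem API; the local source paths below are placeholders and must be adjusted:

package demo;

import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class UploadDependencies {
    public static void main(String[] args) throws Exception {
        FileSystem fs = FileSystem.get(Utils.getConf());
        // local paths are hypothetical; point them at the actual files on the dev machine
        fs.copyFromLocalFile(new Path("D:/spark/spark-assembly-1.4.1-hadoop2.6.0.jar"),
                new Path("/user/root/spark-assembly-1.4.1-hadoop2.6.0.jar"));
        fs.copyFromLocalFile(new Path("D:/spark/Spark141-als.jar"),
                new Path("/user/root/Spark141-als.jar"));
        fs.copyFromLocalFile(new Path("D:/spark/yarn-site.xml"),
                new Path("/user/root/yarn-site.xml"));
        fs.copyFromLocalFile(new Path("D:/data/ratings.dat"),
                new Path("/user/root/ratings.dat"));
    }
}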