IT博客汇
  • 首页
  • 精华
  • 技术
  • 设计
  • 资讯
  • 扯淡
  • 权利声明
  • 登录 注册

    [原]Eclipse调用Spark on yarn问题及解决

    fansy1990发表于 2016-08-04 15:24:13
    love 0

    软件版本:

    Spark1.4.1 ,Hadoop2.6,Scala2.10.5 , Eclipse4.5.1,intelliJ IDEA14,JDK1.7

    集群环境:

    windows7 (包含JDK1.7,Eclipse4.5.1,IntelliJ IDEA14);

    centos6.6虚拟机(Hadoop分布式集群,JDK1.7)node1:NameNode/ResourceManager ; node2: SecondaryNameNode ; node3/node4: DataNode/NodeMananger

    node1->192.168.0.31 ;(其他节点ip以此类推)

    任务描述:

    实现Eclipse直接调用使用IDEA打包好的jar包,调用Spark相关算法;

    步骤及代码描述:

    1.  使用IDEA打包相关算法jar包,如Spark-als.jar ,在算法里面不需要设置spark master,如下代码:
    val conf = new SparkConf().setAppName("train ALS Model ")
        val sc = new SparkContext(conf)
        val ratings = sc.textFile(input).map{

    2. 在Eclipse中新建Java Project,并把Spark-als.jar 以及 spark-assembly-1.4.1-hadoop2.6.0.jar加入classpath;(其中,assembly的jar包直接解压后的lib包中获取,不需要做任何修改)

    3. 新建demo.Utils 类,如下:
    package demo;
    
    import org.apache.hadoop.conf.Configuration;
    import org.apache.spark.SparkConf;
    import org.apache.spark.deploy.yarn.Client;
    import org.apache.spark.deploy.yarn.ClientArguments;
    
    public class Utils {
    	private static  Configuration configuration = null;
    
    	public static Configuration getConf(){
    		if(configuration==null){
    			
    			configuration = new Configuration();
    			configuration.setBoolean("mapreduce.app-submission.cross-platform", true);// 配置使用跨平台提交任务
    			configuration.set("fs.defaultFS", "hdfs://node1:8020");// 指定namenode
    			configuration.set("mapreduce.framework.name", "yarn"); // 指定使用yarn框架
    			configuration.set("yarn.resourcemanager.address", "node1:8032"); // 指定resourcemanager
    			configuration.set("yarn.resourcemanager.scheduler.address", "node1:8030");// 指定资源分配器
    			configuration.set("mapreduce.jobhistory.address", "node2:10020");// 指定historyserver
    		}
    		
    		return configuration;
    	}
    	
    	/**
    	 * 调用Spark
    	 * @param args
    	 * @return
    	 */
    	public static boolean runSpark(String[] args){
            try {
                System.setProperty("SPARK_YARN_MODE", "true");
                SparkConf sparkConf = new SparkConf();
                sparkConf.set("spark.yarn.jar",
                		"hdfs://node1:8020/user/root/spark-assembly-1.4.1-hadoop2.6.0.jar");
                sparkConf.set("spark.yarn.scheduler.heartbeat.interval-ms",
                		"1000");
                
                ClientArguments cArgs = new ClientArguments(args, sparkConf);
    
                new Client(cArgs, getConf(), sparkConf).run();
            }catch(Exception e){
                e.printStackTrace();
                return false;
            }
            return true ;
        }
    }
    

     3. 建立主类测试:
    package demo;
    
    import java.io.IOException;
    
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    
    public class RunSpark {
    	
    	public static void main(String[] args) throws IllegalArgumentException, IOException {
    		//<input> <output> <train_percent> <ranks> <lambda> <iteration>
    		String[] inputArgs= new String[]{
    				"hdfs://node1:8020/user/root/ratings.dat",
    				"hdfs://node1:8020/user/fansy/als_output",
    				"0.8",
    				"10",
    				"10.0",
    				"20"
    		};
    		String[] runArgs=new String[]{
                    "--name","ALS Model Train ",
                    "--class","als.ALSModelTrainer",
                    //@TODO 此参数在测试时使用,否则应注释
                    "--driver-memory","512m",
                    "--num-executors", "2",
                    "--executor-memory", "512m",
                    "--jar","hdfs://node1:8020/user/root/Spark141-als.jar",//
                    //// Spark 在子节点运行driver时,只读取spark-assembly-1.4.1-hadoop2.6.0.jar中的配置文件;
                    "--files","hdfs://node1:8020/user/root/yarn-site.xml",
                    "--arg",inputArgs[0],
                    "--arg",inputArgs[1],
                    "--arg",inputArgs[2],
                    "--arg",inputArgs[3],
                    "--arg",inputArgs[4],
                    "--arg",inputArgs[5]
            };
    		FileSystem.get(Utils.getConf()).delete(new Path(inputArgs[1]), true);
    		Utils.runSpark(runArgs);
    	}
    	
    	
    }
    

    4. 上传相关文件:
    1)上传Spark-als.jar 和 spark-assembly-1.4.1-hadoop2.6.0.jar到HDFS中,上传的目录是/user/root中;
    2)上传$HADOOP_HOME/etc/hadoop/yarn-site.xml 文件到HDFS,目录是/user/root中;

    5. 直接运行,查看输出即可;





    问题描述:

    1. Spark on yarn的模式时,如果driver启动在子节点,那么会有问题(Spark1.4.1,其他版本没有测试过),具体表现为:

    也就是连接不到ResourceManager或者说读取不到配置,因为在Configuration的获取时,已经做了如下的配置:

    但是还是获取不到;
    解决方法:
    1) 修改spark-assembly-1.4.1-hadoop2.6.0.jar中的yarn-default.xml中的对应yarn.resourcemanager.host为实际的机器名(默认是0.0.0.0),然后再上传到HDFS目录中;
           但是每次换集群都需要这样做还是有点麻烦;
    2) 直接上传集群中的yarn-site.xml到HDFS,同时在代码中设置:


    2. 直接调用时,会上传用户的jar包(Spark-als.jar)以及spark-assembly-1.4.1-hadoop2.6.0.jar到HDFS上的临时文件,每次上传,效率不高,可以设置路径为HDFS路径即可,如上代码所示,

    这样就不会上传了,如下:


    3. Yarn中accept状态持续时间太长,可以设置spark.yarn.scheduler.heartbeat.interval-ms来减小时间,如下:
    未设置:

    设置后:



    分享,成长,快乐

    脚踏实地,专注

    转载请注明blog地址:http://blog.csdn.net/fansy1990





沪ICP备19023445号-2号
友情链接