
    [Original] Submitting Jobs to Spark from a Java Web Application

    Posted by fansy1990 on 2015-08-26 12:51:17

    Software versions:

    Spark 1.4.1, Hadoop 2.6, Scala 2.10.5, MyEclipse 2014, IntelliJ IDEA 14, JDK 1.8, Tomcat 7

    Machines:

    Windows 7 (with JDK 1.8, MyEclipse 2014, IntelliJ IDEA 14, Tomcat 7);

    CentOS 6.6 VM (pseudo-distributed Hadoop cluster, Spark standalone cluster, JDK 1.8);

    CentOS 7 VM (Tomcat, JDK 1.8);

    1. Scenarios:

    1. A simple Java program on Windows calls Spark and runs the Spark job written in Scala, in two modes:

        1> submit the job to the Spark cluster and run it in standalone mode;

        2> submit the job to the YARN cluster in yarn-client mode;

    2. A Java web application developed on Windows calls Spark and runs the Scala job, in the same two modes as 1.

    3. The Java web application runs on Linux, calls Spark, and runs the Scala job, in the same two modes as 1.


    2. Implementation:

    1. A simple Scala program that reads a log file from HDFS, keeps only the WARN and ERROR records, and writes the filtered records back to HDFS; the code is as follows:

    import org.apache.spark.{SparkConf, SparkContext}
    
    
    /**
     * Created by Administrator on 2015/8/23.
     */
    object Scala_Test {
      def main(args: Array[String]): Unit = {
        if (args.length != 2) {
          System.err.println("Usage: Scala_Test <input> <output>")
          System.exit(1) // exit early instead of continuing with bad arguments
        }
        // initialize the SparkContext
        val conf = new SparkConf().setAppName("Scala filter")
        val sc = new SparkContext(conf)
    
        // read the input
        val lines = sc.textFile(args(0))
    
        // transform: keep only the ERROR and WARN lines
        val errorsRDD = lines.filter(line => line.contains("ERROR"))
        val warningsRDD = lines.filter(line => line.contains("WARN"))
        val badLinesRDD = errorsRDD.union(warningsRDD)
    
        // write the result back to HDFS
        badLinesRDD.saveAsTextFile(args(1))
    
        // stop the SparkContext
        sc.stop()
      }
    }
    
    Build it in IntelliJ IDEA and package it as a jar for later use (named spark_filter.jar here);
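As a sanity check of the filtering logic, the predicate can be mirrored in plain Java (the class name `BadLineFilter` and the sample lines are illustrative only, not part of the project). Note that, exactly like the Scala `union`, a line containing both ERROR and WARN ends up in the result twice:

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class BadLineFilter {
    // Mirrors Scala_Test: collect the ERROR lines, then the WARN lines, then concatenate.
    // Like RDD.union, this keeps duplicates: a line with both markers appears twice.
    static List<String> badLines(List<String> lines) {
        List<String> out = new ArrayList<>();
        for (String l : lines) if (l.contains("ERROR")) out.add(l);
        for (String l : lines) if (l.contains("WARN")) out.add(l);
        return out;
    }

    public static void main(String[] args) {
        List<String> log = Arrays.asList("INFO starting up", "WARN low disk", "ERROR out of memory");
        System.out.println(badLines(log)); // ERROR lines come before WARN lines
    }
}
```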

    2.  Java calls Scala_Test in spark_filter.jar, using Spark standalone mode; the Java code is as follows:

    package test;
    
    import java.text.SimpleDateFormat;
    import java.util.Date;
    
    import org.apache.spark.deploy.SparkSubmit;
    /**
     * @author fansy
     *
     */
    public class SubmitScalaJobToSpark {
    
    	public static void main(String[] args) {
    		SimpleDateFormat dateFormat = new SimpleDateFormat("yyyy-MM-dd-hh-mm-ss"); 
    		String filename = dateFormat.format(new Date());
    		String tmp = Thread.currentThread().getContextClassLoader().getResource("").getPath();
    		// strip the trailing "classes/" (8 characters) to get the WEB-INF/ directory
    		tmp = tmp.substring(0, tmp.length() - 8);
    		String[] arg0 = new String[]{
    				"--master", "spark://node101:7077",
    				"--deploy-mode", "client",
    				"--name", "test java submit job to spark",
    				"--class", "Scala_Test",
    				"--executor-memory", "1G",
    //				"spark_filter.jar",
    				tmp + "lib/spark_filter.jar",
    				"hdfs://node101:8020/user/root/log.txt",
    				"hdfs://node101:8020/user/root/badLines_spark_" + filename
    		};
    		
    		SparkSubmit.main(arg0);
    	}
    }
    

    To set this up, create a Java web project in MyEclipse and copy spark_filter.jar and spark-assembly-1.4.1-hadoop2.6.0.jar (found in the lib directory of the Spark distribution; the file is large, so copying takes a while) into WebRoot/WEB-INF/lib. (Note: a Java web project works for both tests: run the Java code directly to test the plain Java call, and start Tomcat to test the web application.)
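The `tmp.substring(0, tmp.length()-8)` call in the submission code relies on the classloader root ending in `classes/` (8 characters), so trimming it yields the `WEB-INF/` directory that contains `lib/`. A small sketch of that derivation (the sample path is illustrative):

```java
public class ClasspathTrim {
    // Turn the classloader root (ending in WEB-INF/classes/) into the path of the bundled jar.
    // Assumes the root really ends with "classes/", which is 8 characters long.
    static String jarPath(String classesRoot) {
        String webInf = classesRoot.substring(0, classesRoot.length() - "classes/".length());
        return webInf + "lib/spark_filter.jar";
    }

    public static void main(String[] args) {
        System.out.println(jarPath("/D:/SparkWebTest/WebRoot/WEB-INF/classes/"));
        // -> /D:/SparkWebTest/WebRoot/WEB-INF/lib/spark_filter.jar
    }
}
```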

    Java calls Scala_Test in spark_filter.jar, using YARN mode. YARN mode cannot be enabled by simply changing the master to "yarn-client" or "yarn-cluster": that works with spark-shell or spark-submit when HADOOP_CONF_DIR is configured, but here the Hadoop configuration cannot be picked up that way, so a different approach is needed: submitting through yarn.Client. The Java code is as follows:

    package test;
    
    import java.text.SimpleDateFormat;
    import java.util.Date;
    
    import org.apache.hadoop.conf.Configuration;
    import org.apache.spark.SparkConf;
    import org.apache.spark.deploy.yarn.Client;
    import org.apache.spark.deploy.yarn.ClientArguments;
    
    public class SubmitScalaJobToYarn {
    
    	public static void main(String[] args) {
    		SimpleDateFormat dateFormat = new SimpleDateFormat("yyyy-MM-dd-hh-mm-ss"); 
    		String filename = dateFormat.format(new Date());
    		String tmp = Thread.currentThread().getContextClassLoader().getResource("").getPath();
    		// strip the trailing "classes/" (8 characters) to get the WEB-INF/ directory
    		tmp = tmp.substring(0, tmp.length() - 8);
    		String[] arg0 = new String[]{
    				"--name", "test java submit job to yarn",
    				"--class", "Scala_Test",
    				"--executor-memory", "1G",
    //				"WebRoot/WEB-INF/lib/spark_filter.jar",
    				"--jar", tmp + "lib/spark_filter.jar",
    				
    				"--arg", "hdfs://node101:8020/user/root/log.txt",
    				"--arg", "hdfs://node101:8020/user/root/badLines_yarn_" + filename,
    				"--addJars", "hdfs://node101:8020/user/root/servlet-api.jar",
    				"--archives", "hdfs://node101:8020/user/root/servlet-api.jar"
    		};
    		
    //		SparkSubmit.main(arg0);
    		Configuration conf = new Configuration();
    		String os = System.getProperty("os.name");
    		boolean cross_platform = false;
    		if (os.contains("Windows")) {
    			cross_platform = true;
    		}
    		conf.setBoolean("mapreduce.app-submission.cross-platform", cross_platform); // cross-platform submission
    		conf.set("fs.defaultFS", "hdfs://node101:8020");                    // namenode address
    		conf.set("mapreduce.framework.name", "yarn");                       // use the yarn framework
    		conf.set("yarn.resourcemanager.address", "node101:8032");           // resourcemanager address
    		conf.set("yarn.resourcemanager.scheduler.address", "node101:8030"); // scheduler address
    		conf.set("mapreduce.jobhistory.address", "node101:10020");
    		
    		System.setProperty("SPARK_YARN_MODE", "true");
    		
    		SparkConf sparkConf = new SparkConf();
    		ClientArguments cArgs = new ClientArguments(arg0, sparkConf);
    		
    		new Client(cArgs, conf, sparkConf).run();
    	}
    }
    
    3. Java web test of submitting jobs to Spark in both modes. The simplest approach is used here: servlets configured directly. The web.xml file is as follows:

    <?xml version="1.0" encoding="UTF-8"?>
    <web-app version="3.0"
        xmlns="http://java.sun.com/xml/ns/javaee"
        xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
        xsi:schemaLocation="http://java.sun.com/xml/ns/javaee http://java.sun.com/xml/ns/javaee/web-app_3_0.xsd">
      <servlet>
        <description>This is the description of my J2EE component</description>
        <display-name>This is the display name of my J2EE component</display-name>
        <servlet-name>SparkServlet</servlet-name>
        <servlet-class>servlet.SparkServlet</servlet-class>
      </servlet>
      <servlet>
        <description>This is the description of my J2EE component</description>
        <display-name>This is the display name of my J2EE component</display-name>
        <servlet-name>YarnServlet</servlet-name>
        <servlet-class>servlet.YarnServlet</servlet-class>
      </servlet>
    
    
      <servlet-mapping>
        <servlet-name>SparkServlet</servlet-name>
        <url-pattern>/servlet/SparkServlet</url-pattern>
      </servlet-mapping>
      <servlet-mapping>
        <servlet-name>YarnServlet</servlet-name>
        <url-pattern>/servlet/YarnServlet</url-pattern>
      </servlet-mapping>
    
    </web-app>

    SparkServlet is as follows:

    package servlet;
    
    import java.io.IOException;
    import java.io.PrintWriter;
    
    import javax.servlet.ServletException;
    import javax.servlet.http.HttpServlet;
    import javax.servlet.http.HttpServletRequest;
    import javax.servlet.http.HttpServletResponse;
    
    import test.SubmitScalaJobToSpark;
    
    public class SparkServlet extends HttpServlet {
    
    	/**
    	 * Constructor of the object.
    	 */
    	public SparkServlet() {
    		super();
    	}
    
    	/**
    	 * Destruction of the servlet. <br>
    	 */
    	public void destroy() {
    		super.destroy(); // Just puts "destroy" string in log
    		// Put your code here
    	}
    
    	/**
    	 * The doGet method of the servlet. <br>
    	 *
    	 * This method is called when a form has its tag value method equals to get.
    	 * 
    	 * @param request the request send by the client to the server
    	 * @param response the response send by the server to the client
    	 * @throws ServletException if an error occurred
    	 * @throws IOException if an error occurred
    	 */
    	public void doGet(HttpServletRequest request, HttpServletResponse response)
    			throws ServletException, IOException {
    
    		this.doPost(request, response);
    	}
    
    	/**
    	 * The doPost method of the servlet. <br>
    	 *
    	 * This method is called when a form has its tag value method equals to post.
    	 * 
    	 * @param request the request send by the client to the server
    	 * @param response the response send by the server to the client
    	 * @throws ServletException if an error occurred
    	 * @throws IOException if an error occurred
    	 */
    	public void doPost(HttpServletRequest request, HttpServletResponse response)
    			throws ServletException, IOException {
    		System.out.println("Starting the SubmitScalaJobToSpark call......");
    		SubmitScalaJobToSpark.main(null);
    		// YarnServlet differs only here
    		System.out.println("Finished the SubmitScalaJobToSpark call!");
    		response.setContentType("text/html");
    		PrintWriter out = response.getWriter();
    		out.println("<!DOCTYPE HTML PUBLIC \"-//W3C//DTD HTML 4.01 Transitional//EN\">");
    		out.println("<HTML>");
    		out.println("  <HEAD><TITLE>A Servlet</TITLE></HEAD>");
    		out.println("  <BODY>");
    		out.print("    This is ");
    		out.print(this.getClass());
    		out.println(", using the POST method");
    		out.println("  </BODY>");
    		out.println("</HTML>");
    		out.flush();
    		out.close();
    	}
    
    	/**
    	 * Initialization of the servlet. <br>
    	 *
    	 * @throws ServletException if an error occurs
    	 */
    	public void init() throws ServletException {
    		// Put your code here
    	}
    
    }
    

    The servlet merely calls the job-submission class written in Java. SparkServlet and YarnServlet differ only in which submission class they call.

    For the web test, first test directly in MyEclipse, then copy the project's WebRoot directory to CentOS 7 and run Tomcat there to test again.

    3. Summary and problems

    1. Test results:

       1> Submitting jobs from plain Java code to Spark and YARN to filter the log file runs successfully. The related information can be seen in the YARN and Spark monitoring UIs:



    The output files can also be seen in HDFS:


    2> For Java web submission to Spark and YARN, the javax.servlet directory must first be deleted from spark-assembly-1.4.1-hadoop2.6.0.jar, because it conflicts with Tomcat's servlet-api.jar.

        a. Start Tomcat on Windows and Linux and submit the job to Spark standalone: the test runs successfully;

        b. Start Tomcat on Windows and Linux and submit the job to YARN: the test fails;
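The javax.servlet deletion described above can be scripted with Java's zip filesystem provider instead of editing the jar by hand. This is a sketch only, demonstrated on a throwaway jar rather than the real spark-assembly jar:

```java
import java.io.IOException;
import java.net.URI;
import java.nio.file.*;
import java.util.Comparator;
import java.util.HashMap;
import java.util.Map;

public class StripServletFromJar {
    // Delete the javax/servlet tree from a jar in place via the zipfs provider.
    static void stripServlet(Path jar) throws IOException {
        try (FileSystem zip = FileSystems.newFileSystem(
                URI.create("jar:" + jar.toUri()), new HashMap<String, String>())) {
            Path dir = zip.getPath("javax", "servlet");
            if (Files.exists(dir)) {
                // delete children before their parent directories
                Files.walk(dir).sorted(Comparator.reverseOrder()).forEach(p -> {
                    try { Files.delete(p); } catch (IOException e) { throw new RuntimeException(e); }
                });
            }
        }
    }

    public static void main(String[] args) throws IOException {
        // demo: create a tiny jar containing a javax/servlet entry, then strip it
        Path jar = Files.createTempFile("demo", ".jar");
        Files.delete(jar);
        Map<String, String> env = new HashMap<>();
        env.put("create", "true");
        try (FileSystem zip = FileSystems.newFileSystem(URI.create("jar:" + jar.toUri()), env)) {
            Path entry = zip.getPath("javax", "servlet", "Servlet.class");
            Files.createDirectories(entry.getParent());
            Files.write(entry, new byte[]{0});
        }
        stripServlet(jar);
        try (FileSystem zip = FileSystems.newFileSystem(
                URI.create("jar:" + jar.toUri()), new HashMap<String, String>())) {
            System.out.println(Files.exists(zip.getPath("javax", "servlet"))); // false after stripping
        }
        Files.delete(jar);
    }
}
```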

    2. Problems encountered:

        1> Java web submission to YARN fails; the key lines from the log are:

    15/08/25 11:35:48 ERROR yarn.ApplicationMaster: User class threw exception: java.lang.NoClassDefFoundError: javax/servlet/http/HttpServletResponse
    java.lang.NoClassDefFoundError: javax/servlet/http/HttpServletResponse
    This happens because the javax.servlet package was deleted from the assembly jar to resolve the conflict with Tomcat.

    The log also shows:

    15/08/26 12:39:27 INFO Client: Setting up container launch context for our AM
    15/08/26 12:39:27 INFO Client: Preparing resources for our AM container
    15/08/26 12:39:27 INFO Client: Uploading resource file:/D:/workspase_scala/SparkWebTest/WebRoot/WEB-INF/lib/spark-assembly-1.4.1-hadoop2.6.0.jar -> hdfs://node101:8020/user/Administrator/.sparkStaging/application_1440464833795_0012/spark-assembly-1.4.1-hadoop2.6.0.jar
    15/08/26 12:39:32 INFO Client: Uploading resource file:/D:/workspase_scala/SparkWebTest/WebRoot/WEB-INF/lib/spark_filter.jar -> hdfs://node101:8020/user/Administrator/.sparkStaging/application_1440464833795_0012/spark_filter.jar
    15/08/26 12:39:33 INFO Client: Uploading resource file:/C:/Users/Administrator/AppData/Local/Temp/spark-46820caf-06e0-4c51-a479-3bb35666573f/__hadoop_conf__5465819424276830228.zip -> hdfs://node101:8020/user/Administrator/.sparkStaging/application_1440464833795_0012/__hadoop_conf__5465819424276830228.zip
    15/08/26 12:39:33 INFO Client: Source and destination file systems are the same. Not copying hdfs://node101:8020/user/root/servlet-api.jar
    15/08/26 12:39:33 WARN Client: Resource hdfs://node101:8020/user/root/servlet-api.jar added multiple times to distributed cache.
    

    During environment initialization, two jars are uploaded: spark-assembly-1.4.1-hadoop2.6.0.jar and our custom jar. The uploaded spark-assembly-1.4.1-hadoop2.6.0.jar no longer contains the javax.servlet directory, hence the error. When submitting directly from Java (before javax.servlet was deleted), the same upload appears in the log and the job succeeds, which confirms that deleting the package directory is indeed the cause. So how can this be fixed?

    One attempt: upload servlet-api.jar to HDFS and pass extra parameters when submitting through yarn.Client. Inspecting the available parameters, two look relevant: --addJars and --archives. After adding both, the log shows that the jar is indeed distributed as a shared file for the job, but the Java web submission to YARN still fails with the same NoClassDefFoundError. So this approach does not work either!

    Submitting jobs to YARN through yarn.Client follows http://blog.sequenceiq.com/blog/2014/08/22/spark-submit-in-java/ .



    Share, grow, enjoy

    Stay grounded, stay focused

    Please credit the blog when reposting: http://blog.csdn.net/fansy1990



