利用Spark进行数据的处理之后,需要将结果返回给客户端,一种办法就是将处理的结果保存起来,客户端去访问这个结果。这种模式比较适合计算量比较大、结果相对比较稳定、实时性要求不高的情况。另外一种办法就是直接基于SparkContext对外提供服务,用JavaWeb实现是一种选项;这里采用Scala编程,对外提供http服务,返回的结果是Json格式,下面是代码:
import javax.servlet.http.{HttpServletRequest, HttpServletResponse}
import org.mortbay.jetty.{HttpStatus, Request, Server}
import org.mortbay.jetty.handler._
import org.apache.spark.sql.hive._
import org.apache.spark.{SparkConf, SparkContext}

/**
 * Minimal embedded Jetty (6.x, org.mortbay) HTTP endpoint that serves results
 * directly from the running JVM. Routes are selected by the last segment of
 * the request URI:
 *   - /getData : echoes the request parameters back as text
 *   - /stop    : acknowledges, then shuts the server down asynchronously
 *   - anything else : 404
 *
 * Created by tsq on 2016-10-04.
 */
object MainThread extends AbstractHandler {

  // The embedded Jetty server. Assigned exactly once in main(); remains null
  // until the process is started via main().
  var _server: Server = null

  /**
   * Jetty 6 entry point for every incoming request.
   *
   * @param target   the request target as supplied by Jetty (unused; routing
   *                 is done on the URI's last path segment instead)
   * @param request  the servlet request
   * @param response the servlet response
   * @param dispatch Jetty dispatch mode flag
   */
  override def handle(target: String,
                      request: HttpServletRequest,
                      response: HttpServletResponse,
                      dispatch: Int): Unit = {
    val url = request.getRequestURI
    // Route on everything after the final '/'.
    url.substring(url.lastIndexOf("/") + 1) match {
      case "getData" =>
        response.setContentType("text/html;charset=utf-8")
        response.setStatus(HttpStatus.ORDINAL_200_OK)
        response.getWriter().println(getData(request))
        request.asInstanceOf[Request].setHandled(true)
        response.getWriter.close()
      case "stop" =>
        response.setContentType("text/html;charset=utf-8")
        response.setStatus(HttpStatus.ORDINAL_200_OK)
        response.getWriter().println("已经关闭")
        request.asInstanceOf[Request].setHandled(true)
        // Close the writer before stopping so the client receives the reply.
        response.getWriter.close()
        StopThreadEx()
      case _ =>
        response.setStatus(HttpStatus.ORDINAL_404_Not_Found)
        request.asInstanceOf[Request].setHandled(true)
    }
  }

  /**
   * Stops the server from a separate thread (StopThread is defined elsewhere
   * in the project) so the in-flight "/stop" response can complete before the
   * connector is torn down.
   */
  def StopThreadEx(): Unit = {
    val stopper = new StopThread(_server)
    stopper.start()
  }

  /**
   * Renders all request parameters as "k=v" pairs joined by commas,
   * prefixed with a fixed label.
   *
   * @param request the servlet request whose parameter map is echoed
   * @return the label followed by "k1=v1,k2=v2,..." (empty pair list when
   *         the request carries no parameters)
   */
  def getData(request: HttpServletRequest): String = {
    val keys = request.getParameterMap.keySet().toArray
    // mkString replaces the original O(n^2) `+=` loop and drops its
    // spurious leading comma before the first pair.
    val pairs = keys.map(k => s"$k=${request.getParameter(k.toString)}").mkString(",")
    "这是您要的数据:" + pairs
  }

  /** Starts the HTTP endpoint on port 9998 with this object as the handler. */
  def main(args: Array[String]): Unit = {
    _server = new Server(9998)
    _server.setHandler(this)
    println("监控启动")
    _server.start()
  }
}
本质上还是利用Java在提供http服务,但这种方式与Scala结合会比较方便,适合实时性要求比较高的场景。
虽然有很多开源方案用起来更为简单,但对于小规模应用,这种直接的方式有时也是不错的选择。