IT博客汇
  • 首页
  • 精华
  • 技术
  • 设计
  • 资讯
  • 扯淡
  • 权利声明
  • 登录 注册

    [原]sparkSQL1.1入门之六:sparkSQL之基础应用

    book_mmicky发表于 2014-09-10 09:16:16
    love 0
    SparkSQL引入了一种新的RDD——SchemaRDD,SchemaRDD由行对象(row)以及描述行对象中每列数据类型的schema组成;SchemaRDD很象传统数据库中的表。SchemaRDD可以通过RDD、Parquet文件、JSON文件、或者通过使用hiveql查询hive数据来建立。SchemaRDD除了可以和RDD一样操作外,还可以通过registerTempTable注册成临时表,然后通过SQL语句进行操作。
    值得注意的是:
    • Spark1.1使用registerTempTable代替1.0版本的registerAsTable
    • Spark1.1在hiveContext中,hql()将被弃用,sql()将代替hql()来提交查询语句,统一了接口。
    • 使用registerTempTable注册表是一个临时表,生命周期只在所定义的sqlContext或hiveContext实例之中。换而言之,在一个sqlontext(或hiveContext)中registerTempTable的表不能在另一个sqlContext(或hiveContext)中使用。
    另外,spark1.1提供了语法解析器选项spark.sql.dialect,就目前而言,spark1.1提供了两种语法解析器:sql语法解析器和hiveql语法解析器。
    • sqlContext现在只支持sql语法解析器(SQL-92语法)
    • hiveContext现在支持sql语法解析器和hivesql语法解析器,默认为hivesql语法解析器,用户可以通过配置切换成sql语法解析器,来运行hiveql不支持的语法,如select 1。
    切换可以通过下列方式完成:
    • 在sqlContexet中使用setconf配置spark.sql.dialect
    • 在hiveContexet中使用setconf配置spark.sql.dialect
    • 在sql命令中使用 set spark.sql.dialect=value

    sparkSQL1.1对数据的查询分成了2个分支:sqlContext 和 hiveContext。至于两者之间的关系,hiveSQL继承了sqlContext,所以拥有sqlontext的特性之外,还拥有自身的特性(最大的特性就是支持hive,sparkSQL1.1入门之六:sparkSQL之基础应用 - mmicky - mmicky 的博客)。
    下面就sparkSQL的一些基本操作做一演示:
    • sqlContext基础应用
      • RDD
      • parquet文件
      • json文件
    • hiveContext基础应用
    • 混合使用
    • 缓存之使用
    • DSL之使用
    为了方便演示,我们在spark-shell里面进行下列演示,并加以说明。首先,启动spark集群,然后在客户端wyy上启动spark-shell:
    bin/spark-shell --master spark://hadoop1:7077 --executor-memory 3g

    1:sqlContext基础应用
    首先创建sqlContext,并引入sqlContext.createSchemaRDD以完成RDD隐式转换成SchemaRDD:
    val sqlContext=	new org.apache.spark.sql.SQLContext(sc)
    import sqlContext.createSchemaRDD

    1.1:RDD
    Spark1.1.0开始提供了两种方式将RDD转换成SchemaRDD:
    • 通过定义case class,使用反射推断Schema(case class方式)
    • 通过可编程接口,定义Schema,并应用到RDD上(applySchema 方式)
    前者使用简单、代码简洁,适用于已知Schema的源数据上;后者使用较为复杂,但可以在程序运行过程中实行,适用于未知Schema的RDD上。

    1.1.1 case class方式
    对于case class方式,首先要定义case class,在RDD的transform过程中使用case class可以隐式转化成SchemaRDD,然后再使用registerTempTable注册成表。注册成表后就可以在sqlContext对表进行操作,如select 、insert、join等。注意,case class可以是嵌套的,也可以使用类似Sequences 或 Arrays之类复杂的数据类型。
    下面的例子是定义一个符合数据文件/sparksql/people.txt类型的case clase(Person),然后将数据文件读入后隐式转换成SchemaRDD:people,并将people在sqlContext中注册成表rddTable,最后对表进行查询,找出年纪在13-19岁之间的人名。

    /sparksql/people.txt的内容有3行:

    运行下列代码:
    //RDD1演示
    case class Person(name:String,age:Int)
    val rddpeople=sc.textFile("/sparksql/people.txt").map(_.split(",")).map(p=>Person(p(0),p(1).trim.toInt))
    rddpeople.registerTempTable("rddTable")
    
    sqlContext.sql("SELECT name FROM rddTable WHERE age >= 13 AND age <= 19").map(t => "Name: " + t(0)).collect().foreach(println)
    运行结果:

    1.1.2 applySchema 方式
    applySchema 方式比较复杂,通常有3步过程:
    • 从源RDD创建rowRDD
    • 创建与rowRDD匹配的Schema
    • 将Schema通过applySchema应用到rowRDD
    上面的例子通过applySchema 方式实现的代码如下:
    //RDD2演示
    //导入SparkSQL的数据类型和Row
    import org.apache.spark.sql._
    
    //创建于数据结构匹配的schema
    val schemaString = "name age"
    val schema =
      StructType(
        schemaString.split(" ").map(fieldName => StructField(fieldName, StringType, true)))
    
    //创建rowRDD
    val rowRDD = sc.textFile("/sparksql/people.txt").map(_.split(",")).map(p => Row(p(0), p(1).trim))
    //用applySchema将schema应用到rowRDD
    val rddpeople2 = sqlContext.applySchema(rowRDD, schema)
    
    rddpeople2.registerTempTable("rddTable2")
    sqlContext.sql("SELECT name FROM rddTable2 WHERE age >= 13 AND age <= 19").map(t => "Name: " + t(0)).collect().foreach(println)
    运行结果:

    1.2:parquet文件
    同样得,sqlContext可以读取parquet文件,由于parquet文件中保留了schema的信息,所以不需要使用case class来隐式转换。sqlContext读入parquet文件后直接转换成SchemaRDD,也可以将SchemaRDD保存成parquet文件格式。

    我们先将上面建立的SchemaRDD:people保存成parquet文件:
    rddpeople.saveAsParquetFile("/sparksql/people.parquet")
    运行后/sparksql/目录下就多出了一个名称为people.parquet的目录:

    然后,将people.parquet读入,注册成表parquetTable,查询年纪大于25岁的人名:
    //parquet演示
    val parquetpeople = sqlContext.parquetFile("/sparksql/people.parquet")
    parquetpeople.registerTempTable("parquetTable")
    
    sqlContext.sql("SELECT name FROM parquetTable WHERE age >= 25").map(t => "Name: " + t(0)).collect().foreach(println)
    运行结果:

    1.3:json文件
    sparkSQL1.1.0开始提供对json文件格式的支持,这意味着开发者可以使用更多的数据源,如鼎鼎大名的NOSQL数据库MongDB等。sqlContext可以从jsonFile或jsonRDD获取schema信息,来构建SchemaRDD,注册成表后就可以使用。
    • jsonFile - 加载JSON文件目录中的数据,文件的每一行是一个JSON对象。
    • jsonRdd - 从现有的RDD加载数据,其中RDD的每个元素包含一个JSON对象的字符串。
    下面的例子读入一个json文件/sparksql/people.json,注册成jsonTable,并查询年纪大于25岁的人名。

    /sparksql/people.json的内容:
    运行下面代码:
    //json演示
    val jsonpeople = sqlContext.jsonFile("/sparksql/people.json")
    jsonpeople.registerTempTable("jsonTable")
    
    sqlContext.sql("SELECT name FROM jsonTable WHERE age >= 25").map(t => "Name: " + t(0)).collect().foreach(println)
    运行结果:


    2:hiveContext基础应用
    使用hiveContext之前首先要确认以下两点:
    • 使用的Spark是支持hive
    • hive的配置文件hive-site.xml已经存在conf目录中
    前者可以查看lib目录下是否存在以datanucleus开头的3个JAR来确定,后者注意是否在hive-site.xml里配置了uris来访问hive metastore。

    要使用hiveContext,需要先构建hiveContext:
    val hiveContext = new org.apache.spark.sql.hive.HiveContext(sc)
    然后就可以对hive数据进行操作了,下面我们将使用hive中的销售数据(第五小结中的hive数据),首先切换数据库到saledata并查看有几个表:
    hiveContext.sql("use saledata")
    hiveContext.sql("show tables").collect().foreach(println)
    可以看到有在第五小节定义的3个表:

    现在查询一下所有订单中每年的销售单数、销售总额:
    //所有订单中每年的销售单数、销售总额
    //三个表连接后以count(distinct a.ordernumber)计销售单数,sum(b.amount)计销售总额
    hiveContext.sql("select c.theyear,count(distinct a.ordernumber),sum(b.amount) from tblStock a join tblStockDetail b on a.ordernumber=b.ordernumber join tbldate c on a.dateid=c.dateid group by c.theyear order by c.theyear").collect().foreach(println)
    运行结果:

    再做一个稍微复杂点的查询,求出所有订单每年最大金额订单的销售额:
    /************************
    所有订单每年最大金额订单的销售额:
    第一步,先求出每份订单的销售额以其发生时间
    select a.dateid,a.ordernumber,sum(b.amount) as sumofamount from tblStock a join tblStockDetail b on a.ordernumber=b.ordernumber group by a.dateid,a.ordernumber 
    第二步,以第一步的查询作为子表,和表tblDate连接,求出每年最大金额订单的销售额
    select c.theyear,max(d.sumofamount) from tbldate c join (select a.dateid,a.ordernumber,sum(b.amount) as sumofamount from tblStock a join tblStockDetail b on a.ordernumber=b.ordernumber group by a.dateid,a.ordernumber ) d  on c.dateid=d.dateid group by c.theyear sort by c.theyear
    *************************/
    
    hiveContext.sql("select c.theyear,max(d.sumofamount) from tbldate c join (select a.dateid,a.ordernumber,sum(b.amount) as sumofamount from tblStock a join tblStockDetail b on a.ordernumber=b.ordernumber group by a.dateid,a.ordernumber ) d  on c.dateid=d.dateid group by c.theyear sort by c.theyear").collect().foreach(println)
    运行结果:

    最后做一个更复杂的查询,求出所有订单中每年最畅销货品:
    /************************
    所有订单中每年最畅销货品:
    第一步:求出每年每个货品的销售金额
    select c.theyear,b.itemid,sum(b.amount) as sumofamount from tblStock a join tblStockDetail b on a.ordernumber=b.ordernumber join tbldate c on a.dateid=c.dateid group by c.theyear,b.itemid
    
    第二步:求出每年单品销售的最大金额
    select d.theyear,max(d.sumofamount) as maxofamount from (select c.theyear,b.itemid,sum(b.amount) as sumofamount from tblStock a join tblStockDetail b on a.ordernumber=b.ordernumber join tbldate c on a.dateid=c.dateid group by c.theyear,b.itemid) d group by d.theyear
    
    第三步:求出每年与销售额最大相符的货品就是最畅销货品
    select distinct  e.theyear,e.itemid,f.maxofamount from (select c.theyear,b.itemid,sum(b.amount) as sumofamount from tblStock a join tblStockDetail b on a.ordernumber=b.ordernumber join tbldate c on a.dateid=c.dateid group by c.theyear,b.itemid) e join (select d.theyear,max(d.sumofamount) as maxofamount from (select c.theyear,b.itemid,sum(b.amount) as sumofamount from tblStock a join tblStockDetail b on a.ordernumber=b.ordernumber join tbldate c on a.dateid=c.dateid group by c.theyear,b.itemid) d group by d.theyear) f on (e.theyear=f.theyear and e.sumofamount=f.maxofamount) order by e.theyear
    *************************/
    
    hiveContext.sql("select distinct  e.theyear,e.itemid,f.maxofamount from (select c.theyear,b.itemid,sum(b.amount) as sumofamount from tblStock a join tblStockDetail b on a.ordernumber=b.ordernumber join tbldate c on a.dateid=c.dateid group by c.theyear,b.itemid) e join (select d.theyear,max(d.sumofamount) as maxofamount from (select c.theyear,b.itemid,sum(b.amount) as sumofamount from tblStock a join tblStockDetail b on a.ordernumber=b.ordernumber join tbldate c on a.dateid=c.dateid group by c.theyear,b.itemid) d group by d.theyear) f on (e.theyear=f.theyear and e.sumofamount=f.maxofamount) order by e.theyear").collect().foreach(println)
    运行结果:

    3:混合使用
    在sqlContext或hiveContext中,来源于不同数据源的表在各自生命周期中可以混用,但是不同实例之间的表不能混合使用。

    3.1 sqlContext中混合使用:
    //sqlContext中混合使用
    //sqlContext中来自rdd的表rddTable和来自parquet文件的表parquetTable混合使用
    sqlContext.sql("select a.name,a.age,b.age from rddTable a join parquetTable b on a.name=b.name").collect().foreach(println)
    运行结果:

    3.2 hiveContext中混合使用:
    //hiveContext中混合使用
    //创建一个hiveTable,并将数据加载,注意people.txt第二列有空格,所以age取string类型
    hiveContext.sql("CREATE TABLE hiveTable(name string,age string) ROW FORMAT DELIMITED FIELDS TERMINATED BY ',' LINES TERMINATED BY '\n' ")
    hiveContext.sql("LOAD DATA LOCAL INPATH '/home/mmicky/mboo/MyClass/doc/sparkSQL/data/people.txt' INTO TABLE hiveTable")
    
    //创建一个源自parquet文件的表parquetTable2,然后和hiveTable混合使用
    hiveContext.parquetFile("/sparksql/people.parquet").registerAsTable("parquetTable2")
    hiveContext.sql("select a.name,a.age,b.age from hiveTable a join parquetTable2 b on a.name=b.name").collect().foreach(println)
    运行结果:

    4:缓存之使用
    sparkSQL的cache可以使用两种方法来实现:
    • cacheTable()方法
    • CACHE TABLE命令
    千万不要先使用cache SchemaRDD,然后registerAsTable ;使用RDD的cache()将使用原生态的cache,而不是针对SQL优化后的内存列存储。看看cacheTable的源代码:

    在默认的情况下,内存列存储的压缩功能是关闭的,要使用压缩功能需要配置变量COMPRESS_CACHED。

    在sqlContext里可以如下使用cache:
    //sqlContext的cache使用
    sqlContext.cacheTable("rddTable")
    sqlContext.sql("SELECT name FROM rddTable WHERE age >= 13 AND age <= 19").map(t => "Name: " + t(0)).collect().foreach(println)
    
    sqlContext.sql("CACHE TABLE parquetTable")
    sqlContext.sql("SELECT name FROM parquetTable WHERE age >= 13 AND age <= 19").map(t => "Name: " + t(0)).collect().foreach(println)
    观察webUI,可以看到cache的信息。(注意cache是lazy的,要有action才会实现;uncache是eager的,可以立即实现)

    使用如下命令可以取消cache:
    sqlContext.uncacheTable("rddTable")
    sqlContext.sql("UNCACHE TABLE parquetTable")
    同样的,在hiveContext也可以使用上面的方法cache或uncache(hiveContext继承于sqlContext)。

    5:DSL之使用
    sparkSQL除了支持HiveQL和SQL-92语法外,还支持DSL(Domain Specific Language)。在DSL中,使用scala符号'+标示符表示基础表中的列,spark的execution engine会将这些标示符隐式转换成表达式。另外可以在API中找到很多DSL相关的方法,如where()、select()、limit()等等,详细资料可以查看catalyst模块中的dsl子模块,下面为其中定义几种常用方法:

    关于DSL的使用,随便举个例子,结合DSL方法,很容易上手:
    //DSL演示
    val teenagers_dsl = rddpeople.where('age >= 10).where('age <= 19).select('name)
    teenagers_dsl.map(t => "Name: " + t(0)).collect().foreach(println)

    6:Tips
    上面介绍了sparkSQL的基础应用,sparkSQL还在高速发展中,存在者不少缺陷,如:
    • scala2.10.4本身对case class有22列的限制,在使用RDD数据源的时候就会造成不方便;
    • sqlContext中3个表不能同时join,需要两两join后再join一次;
    • sqlContext中不能直接使用values插入数据;
    • 。。。
    总的来说,hiveContext还是令人满意,sqlContext就有些差强人意了。另外,顺便提一句,在编写sqlContext应用程序的时候,case class要定义在object之外。




沪ICP备19023445号-2号
友情链接