本文分享两种解决object not serializable (class: org.apache.hadoop.hbase.io.ImmutableBytesWritable)的方法。现在整理处理,分享给需要的兄弟们。
在使用Spark操作Hbase的时候,其返回的数据类型是RDD[ImmutableBytesWritable,Result],我们可能会对这个结果进行其他的操作,比如join等,但是因为org.apache.hadoop.hbase.io.ImmutableBytesWritable 和 org.apache.hadoop.hbase.client.Result 并没有实现 java.io.Serializable 接口,程序在运行的过程中可能发生以下的异常:
Serialization stack:
- object not serializable (class: org.apache.hadoop.hbase.io.ImmutableBytesWritable, value: 30 30 30 30 30 30 32 34 32 30 32 37 37 32 31)
- field (class: scala.Tuple2, name: _1, type: class java.lang.Object)
- object (class scala.Tuple2, (30 30 30 30 30 30 32 34 32 30 32 37 37 32 31,keyvalues={00000011020Winz59XojM111/f:iteblog/1470844800000/Put/vlen=2/mvcc=0}))
- element of array (index: 0)
- array (class [Lscala.Tuple2;, size 10); not retrying
17/03/16 16:07:48 ERROR ApplicationMaster: User class threw exception: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0.0 in stage 0.0 (TID 0) had a not serializable result: org.apache.hadoop.hbase.io.ImmutableBytesWritable
Serialization stack:
- object not serializable (class: org.apache.hadoop.hbase.io.ImmutableBytesWritable, value: 30 30 30 30 30 30 32 34 32 30 32 37 37 32 31)
- field (class: scala.Tuple2, name: _1, type: class java.lang.Object)
- object (class scala.Tuple2, (30 30 30 30 30 30 32 34 32 30 32 37 37 32 31,keyvalues={00000011020Winz59XojM111/f:iteblog/1470844800000/Put/vlen=2/mvcc=0}))
- element of array (index: 0)
- array (class [Lscala.Tuple2;, size 10)
org.apache.spark.SparkException: Job aborted due to stage failure: Task 0.0 in stage 0.0 (TID 0) had a not serializable result: org.apache.hadoop.hbase.io.ImmutableBytesWritable
Serialization stack:
- object not serializable (class: org.apache.hadoop.hbase.io.ImmutableBytesWritable, value: 30 30 30 30 30 30 32 34 32 30 32 37 37 32 31)
- field (class: scala.Tuple2, name: _1, type: class java.lang.Object)
- object (class scala.Tuple2, (30 30 30 30 30 30 32 34 32 30 32 37 37 32 31,keyvalues={00000011020Winz59XojM111/f:iteblog/1470844800000/Put/vlen=2/mvcc=0}))
- element of array (index: 0)
- array (class [Lscala.Tuple2;, size 10)
at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1431)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1419)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1418)
at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1418)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:799)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:799)
at scala.Option.foreach(Option.scala:236)
at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:799)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1640)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1599)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1588)
at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:620)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:1832)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:1845)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:1858)
at org.apache.spark.rdd.RDD$$anonfun$take$1.apply(RDD.scala:1328)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:150)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:111)
at org.apache.spark.rdd.RDD.withScope(RDD.scala:316)
at org.apache.spark.rdd.RDD.take(RDD.scala:1302)
at com.iteblog.HBase2Hive$.main(HBase2Hive.scala:41)
at com.iteblog.HBase2Hive.main(HBase2Hive.scala)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:606)
at org.apache.spark.deploy.yarn.ApplicationMaster$$anon$2.run(ApplicationMaster.scala:542)
解决这个问题主要有两种方法。
指定如何序列化ImmutableBytesWritable类
我们可以手动设置如何序列化ImmutableBytesWritable类,实现如下:
sparkConf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
sparkConf.registerKryoClasses(Array(classOf[org.apache.hadoop.hbase.io.ImmutableBytesWritable]))
这样就会使用 org.apache.spark.serializer.KryoSerializer 来序列化 ImmutableBytesWritable,是的 ImmutableBytesWritable 类可以在网络上传送。
将ImmutableBytesWritable转换成其他可序列化的对象
这种方法就是从ImmutableBytesWritable 对象中抽取我们需要的数据,然后将它存储在其他可序列化的对象中,如下:
val hBaseRDD = sc.newAPIHadoopRDD(conf, classOf[TableInputFormat], classOf[ImmutableBytesWritable], classOf[Result])
hBaseRDD.map{item =>
val immutableBytesWritable = item._1
Bytes.toString(immutableBytesWritable.get())
}
上面实例从 ImmutableBytesWritable 对象中抽取我们要的数据并转换成String,而 String 是实现 java.io.Serializable 接口的类,所以也可以解决上面的问题。