Spark Streaming 作业在运行过程中,上游 topic 增加 partition 数目从 A 增加到 B,会造成作业丢失数据,因为该作业只从 topic 中读取了原来的 A 个 partition 的数据,新增的 B-A 个 partition 的数据会被忽略掉。
为了作业能够长时间的运行,一开始遇到这种情况的时候,想到两种方案:
方案 1 是简单直接,第一反应的结果,但是效果不好,需要用户人工介入,而且可能需要删除 checkpoint 文件
方案 2 从根本上解决问题,用户不需要关心上游 partition 数目的变化,但是第一眼会觉得较难实现。
方案 1 很快被 pass 掉,因为人工介入的成本太高,而且实现起来很别扭。接下来考虑方案 2.
Spark Streaming 程序中使用 Kafka 的最原始方式为 KafkaUtils.createDirectStream
通过源码,我们找到调用链条大致是这样的
KafkaUtils.createDirectStream
-> new DirectKafkaInputDStream
-> 最终由 DirectKafkaInputDStream#compute(validTime : Time)
函数来生成 KafkaRDD。
而 KafkaRDD 的 partition 数和 作业开始运行时 topic 的 partition 数一致,topic 的 partition 数保存在 currentOffsets 变量中,currentOffsets 是一个 Map[TopicAndPartition, Long]类型的变量,保存每个 partition 当前消费的 offset 值,但是作业运行过程中 currentOffsets 不会增加 key,就是是不会增加 partition,这样导致每次生成 KafkaRDD 的时候都使用 开始运行作业时 topic 的 partition 数作为 KafkaRDD 的 partition 数,从而会造成数据的丢失。
我们只需要在每次生成 KafkaRDD 的时候,将 currentOffsets 修正为正常的值(往里面增加对应的 partition 数,总共 B-A 个,以及每个增加的 partition 的当前 offset 从零开始)。
private[streaming]
修饰的。package org.apache.spark.streaming
进行修饰即可,这样可以绕过不能继承 DirectKafkaInputDStream 的问题。这个问题解决后,我们还需要修改 Object KafkaUtils
,让该 Object 内部调用我们修改后的 DirectKafkaInputDStream(我命名为 MTDirectKafkaInputDStream)package org.apache.spark.streaming
进行修饰总结下,我们需要做两件事
package org.apache.spark.streaming.kafka.mt import com.meituan.data.util.Constants import com.meituan.service.inf.kms.client.Kms import kafka.common.{ErrorMapping, TopicAndPartition} import kafka.javaapi.{TopicMetadata, TopicMetadataRequest} import kafka.javaapi.consumer.SimpleConsumer import kafka.message.MessageAndMetadata import kafka.serializer.Decoder import org.apache.spark.streaming.{StreamingContext, Time} import org.apache.spark.streaming.kafka.{DirectKafkaInputDStream, KafkaRDD} import scala.collection.JavaConverters._ import scala.util.control.Breaks._ import scala.reflect.ClassTag /** * Created by qiucongxian on 10/27/16. */ class MTDirectKafkaInputDStream[ K: ClassTag, V: ClassTag, U <: Decoder[K]: ClassTag, T <: Decoder[V]: ClassTag, R: ClassTag]( @transient ssc_ : StreamingContext, val MTkafkaParams: Map[String, String], val MTfromOffsets: Map[TopicAndPartition, Long], messageHandler: MessageAndMetadata[K, V] => R ) extends DirectKafkaInputDStream[K, V, U, T, R](ssc_, MTkafkaParams , MTfromOffsets, messageHandler) { private val kafkaBrokerList : String = "host1:port1,host2:port2,host3:port3" //根据自己的情况自行修改 override def compute(validTime: Time) : Option[KafkaRDD[K, V, U, T, R]] = { /** * 在这更新 currentOffsets 从而做到自适应上游 partition 数目变化 */ updateCurrentOffsetForKafkaPartitionChange() super.compute(validTime) } private def updateCurrentOffsetForKafkaPartitionChange() : Unit = { val topic = currentOffsets.head._1.topic val nextPartitions : Int = getTopicMeta(topic) match { case Some(x) => x.partitionsMetadata.size() case _ => 0 } val currPartitions = currentOffsets.keySet.size if (nextPartitions > currPartitions) { var i = currPartitions while (i < nextPartitions) { currentOffsets = currentOffsets + (TopicAndPartition(topic, i) -> 0) i = i + 1 } } logInfo(s"######### ${nextPartitions} currentParttions ${currentOffsets.keySet.size} ########") } private def getTopicMeta(topic: String) : Option[TopicMetadata] = { var metaData : Option[TopicMetadata] = None var consumer : Option[SimpleConsumer] = None val topics = List[String](topic) val brokerList = kafkaBrokerList.split(",") brokerList.foreach( item => { val hostPort = item.split(":") try { breakable { for (i <- 0 to 3) { consumer = Some(new SimpleConsumer(host = hostPort(0), port = hostPort(1).toInt, soTimeout = 10000, bufferSize = 64 * 1024, clientId = "leaderLookup")) val req : TopicMetadataRequest = new TopicMetadataRequest(topics.asJava) val resp = consumer.get.send(req) metaData = Some(resp.topicsMetadata.get(0)) if (metaData.get.errorCode == ErrorMapping.NoError) break() } } } catch { case e => logInfo(s" ###### Error in MTDirectKafkaInputDStream ${e} ######") } } ) metaData } }
在修改过后的 KafkaUtils 文件中,将所有的 DirectKafkaInputDStream
都替换为 MTDirectKafkaInputDStream
即可