上一篇文章中,我们讲了如何在将 offset 保存在 zk 中,以及进行重用,但是程序中有个小问题“如果程序停了很长很长一段后再启动,zk 中保存的 offset 已经过期了,那会怎样呢?”本文将解决这个问题
如果 kafka 上的 offset 已经过期,那么就会报 OffsetOutOfRange 的异常,因为之前保存在 zk 的 offset 已经 topic 中找不到了。所以我们需要在 从 zk 找到 offset 的这种情况下增加一个判断条件,如果 zk 中保存的 offset 小于当前 kafka topic 中最小的 offset,则设置为 kafka topic 中最小的 offset。假设我们上次保存在 zk 中的 offset 值为 123(某一个 partition),然后程序停了一周,现在 kafka topic 的最小 offset 变成了 200,那么用前文的代码,就会得到 OffsetOutOfRange 的异常,因为 123 对应的数据已经找不到了。下面我们给出,如何获取 <topic, parition> 的最小 offset,这样我们就可以进行对比了
val partitionOffset = zkClient.readData[String](s"${topicDirs.consumerOffsetDir}/${i}")
val tp = TopicAndPartition(topic, i)
val requestMin = OffsetRequest(Map(tp -> PartitionOffsetRequestInfo(OffsetRequest.EarliestTime, 1)))
val consumerMin = new SimpleConsumer("broker_host", 9092, 10000, 10000, "getMinOffset") //注意这里的 broker_host,因为这里会导致查询不到,解决方法在下面
val curOffsets = consumerMin.getOffsetsBefore(requestMin).partitionErrorAndOffsets(tp).offsets
var nextOffset = partitionOffset.toLong
if (curOffsets.length > 0 && nextOffset < curOffsets.head) { // 通过比较从 kafka 上该 partition 的最小 offset 和 zk 上保存的 offset,进行选择
nextOffset = curOffsets.head
}
fromOffsets += (tp -> nextOffset) //设置正确的 offset,这里将 nextOffset 设置为 0(0 只是一个特殊值),可以观察到 offset 过期的现象
val topic_name = "topic_name" //topic_name 表示我们希望获取的 topic 名字
val topic2 = List(topic_name)
val req = new TopicMetadataRequest(topic2, 0)
val getLeaderConsumer = new SimpleConsumer("broker_host", 9092, 10000, 10000, "OffsetLookup") // 第一个参数是 kafka broker 的host,第二个是 port
val res = getLeaderConsumer.send(req)
val topicMetaOption = res.topicsMetadata.headOption
val partitions = topicMetaOption match {
case Some(tm) =>
tm.partitionsMetadata.map(pm => (pm.partitionId, pm.leader.get.host)).toMap[Int, String] // 将结果转化为 partition -> leader 的映射关系
case None =>
Map[Int, String]()
}
到此,在 spark streaming 中将 kafka 的 offset 保存到 zk,并重用的大部分情况都覆盖到了
您可能也喜欢: | ||||
![]() Spark Streaming 中使用 zookeeper 保存 offset 并重用 |
![]() 我的2011和2012 |
![]() 本站文章链接改变 |
![]() gcj_2011前三题 |
![]() select && poll 函数 |
无觅 |