IT博客汇
  • 首页
  • 精华
  • 技术
  • 设计
  • 资讯
  • 扯淡
  • 权利声明
  • 登录 注册

    Spark Streaming 中使用 zookeeper 保存 offset 并重用(二)

    klion26发表于 2016-07-15 12:58:04
    love 0

    上一篇文章中,我们讲了如何在将 offset 保存在 zk 中,以及进行重用,但是程序中有个小问题“如果程序停了很长很长一段后再启动,zk 中保存的 offset 已经过期了,那会怎样呢?”本文将解决这个问题

    如果 kafka 上的 offset 已经过期,那么就会报 OffsetOutOfRange 的异常,因为之前保存在 zk 的 offset 已经 topic 中找不到了。所以我们需要在 从 zk 找到 offset 的这种情况下增加一个判断条件,如果 zk 中保存的 offset 小于当前 kafka topic 中最小的 offset,则设置为 kafka topic 中最小的 offset。假设我们上次保存在 zk 中的 offset 值为 123(某一个 partition),然后程序停了一周,现在 kafka topic 的最小 offset 变成了 200,那么用前文的代码,就会得到 OffsetOutOfRange 的异常,因为 123 对应的数据已经找不到了。下面我们给出,如何获取 <topic, parition> 的最小 offset,这样我们就可以进行对比了

    val partitionOffset = zkClient.readData[String](s"${topicDirs.consumerOffsetDir}/${i}")
    val tp = TopicAndPartition(topic, i)
     
    val requestMin = OffsetRequest(Map(tp -> PartitionOffsetRequestInfo(OffsetRequest.EarliestTime, 1)))
    val consumerMin = new SimpleConsumer("broker_host", 9092, 10000, 10000, "getMinOffset")  //注意这里的 broker_host,因为这里会导致查询不到,解决方法在下面
    val curOffsets = consumerMin.getOffsetsBefore(requestMin).partitionErrorAndOffsets(tp).offsets
    var nextOffset = partitionOffset.toLong
    if (curOffsets.length > 0 && nextOffset < curOffsets.head) {  // 通过比较从 kafka 上该 partition 的最小 offset 和 zk 上保存的 offset,进行选择
      nextOffset = curOffsets.head
    }
    fromOffsets += (tp -> nextOffset) //设置正确的 offset,这里将 nextOffset 设置为 0(0 只是一个特殊值),可以观察到 offset 过期的现象

    但是上面的代码有一定的问题,因为我们从 kafka 上获取 offset 的时候,需要寻找对应的 leader,从 leader 来获取 offset,而不是 broker,不然可能得到的 curOffsets 会是空的(表示获取不到)。下面的代码就是获取不同 partition 的 leader 相关代码
    val topic_name = "topic_name"     //topic_name 表示我们希望获取的 topic 名字
    val topic2 = List(topic_name)       
    val req = new TopicMetadataRequest(topic2, 0)
    val getLeaderConsumer = new SimpleConsumer("broker_host", 9092, 10000, 10000, "OffsetLookup")  // 第一个参数是 kafka broker 的host,第二个是 port
    val res = getLeaderConsumer.send(req)
    val topicMetaOption = res.topicsMetadata.headOption
    val partitions = topicMetaOption match {
      case Some(tm) =>
        tm.partitionsMetadata.map(pm => (pm.partitionId, pm.leader.get.host)).toMap[Int, String]  // 将结果转化为 partition -> leader 的映射关系
      case None =>
        Map[Int, String]()
    }

    上面的代码能够得到所有 partition 的 leader 地址,然后将 leader 地址替换掉上面第一份代码中的 broker_list 即可。

    到此,在 spark streaming 中将 kafka 的 offset 保存到 zk,并重用的大部分情况都覆盖到了

     

    您可能也喜欢:

    Spark Streaming 中使用 zookeeper 保存 offset 并重用

    我的2011和2012

    本站文章链接改变

    gcj_2011前三题

    select && poll 函数
    无觅


沪ICP备19023445号-2号
友情链接