IT博客汇
  • 首页
  • 精华
  • 技术
  • 设计
  • 资讯
  • 扯淡
  • 权利声明
  • 登录 注册

    Spark Streaming 从指定时间戳开始消费 kafka 数据

    klion26发表于 2016-12-02 11:27:49
    love 0

    需求

    从指定时间戳(比如 2 小时)开始消费 Kafka 数据

    思路

    我们知道通过 Kafka 的 API 可以得到指定时间戳对应数据所在的 segment 的起始 offset。那么就可以通过这个功能来粗略的实现需求。

    实现

    我们知道 KafkaUitls.createDirectStream 这个接口可以指定起始点的 offset,那么我们需要做的就变成如下三步:

    1. 获取 topic 对应的 TopicAndPartitions,得到当前 topic 有多少 partition
    2. 从 Kafka 获取每个 partition 指定时间戳所在 segment 的起始 offset
    3. 将步骤 2 中的 offset 作为参数传入 createDirectStream 即可

    通过查看源码,我们知道步骤 1 和步骤 2 中的功能在 org.apache.spark.streaming.kafka.KafkaCluster 中都已经有现成的函数了:getPartitions 和 getLeaderOffsets,分别表示获取指定 topic 的 partition 以及获取 partition 指定时间戳所在的 segment 的起始 offset,那么我们需要做的就是如何调用这两个函数实现我们的功能了。

    我们知道 KafkaCluster 的作用域是 private[spark] 所以我们需要在自己的代码中使用 package org.apache.spark(.xxx ... .yyy)(小括号中表示可以省略)来限定自己的代码,因此我们可以将步骤 1 和步骤 2 中的功能实现如下:

    package org.apache.spark.streaming.kafka
    ......      //省略其他不相关的代码
    
    def getPartitions(kafkaParams: Map[String, String], topics: Set[String]): Either[Err, Set[TopicAndPartition]] = {
            val kc = new KafkaCluster(kafkaParams)
            kc.getPartitions(topics)    //我们可以在这里处理错误,也可以将错误继续往上传递
        }
    
        def getLeaderOffsets(kafkaParams: Map[String, String], topicAndPartitions: Set[TopicAndPartition], before: Long) : Map[TopicAndPartition, Long]  = {
            val kc = new KafkaCluster(kafkaParams)
            val leaderOffsets = kc.getLeaderOffsets(topicAndPartitions, before)
            if (leaderOffsets.isLeft) {  //在本函数内部处理错误,如果有错误抛出异常
                throw new RuntimeException(s"### Exception when MTKafkaUtils#getLeaderOffsets ${leaderOffsets.left.get} ###")
            }
    
            leaderOffsets.right.get.map { case (k, v) => (k, v.offset)}  //将 Map[TopicAndPartition, LeaderOffset] 转变为 Map[TopicAndPartition, Long](Long 为对应 partition 的 offset,从 LeaderOffset 中获取)
        }

    步骤 3 直接传入参数即可,就可以从指定时间戳开始消费 Kafka 数据了

    您可能也喜欢:

    Spark Streaming 往 HDFS 写文件,自定义文件名

    Spark Streaming 自适应上游 kafka topic partition 数目变化

    Spark Streaming 从 Kafka 读取 binlog 转换成 Json

    Spark Streaming 中使用 zookeeper 保存 offset 并重用(二)

    Spark Streaming 中使用 zookeeper 保存 offset 并重用
    无觅


沪ICP备19023445号-2号
友情链接