IT博客汇
  • 首页
  • 精华
  • 技术
  • 设计
  • 资讯
  • 扯淡
  • 权利声明
  • 登录 注册

    Spark Streaming Ran out of messages before reaching ending offset 异常

    klion26发表于 2016-12-16 06:23:13
    love 0

    现象

    Spark Streaming 处理数据过程中遇到 Ran out of messages before reaching ending offset 异常,导致程序一直 hang 住(因为我们希望接上次消费从而不丢失数据)

    分析

    通过异常栈信息,我们知道异常从 KafkaRDD.scala#211 行抛出,下面是相应代码

    206 override def getNext(): R = {
          if (iter == null || !iter.hasNext) {
    208        iter = fetchBatch
          }
    210      if (!iter.hasNext) {
    211        assert(requestOffset == part.untilOffset, errRanOutBeforeEnd(part))
    212        finished = true
            null.asInstanceOf[R]
          } else {
            val item = iter.next()
            if (item.offset >= part.untilOffset) {
    217          assert(item.offset == part.untilOffset, errOvershotEnd(item.offset, part))
              finished = true
              null.asInstanceOf[R]
            } else {
              requestOffset = item.nextOffset
              messageHandler(new MessageAndMetadata(
                part.topic, part.partition, item.message, item.offset, keyDecoder, valueDecoder))
            }
          }
    226    }

    通过分析,我们知道这个地方是实际从 Kafka 读取数据的逻辑,首先会调用 fetchBatch 函数(208 行),然后进行逻辑判断,数据是否读取完毕,是否发生异常

    其中 211 行的异常表示还未读取到 part.untilOffset 但是当前迭代器中没有数据了;217 行表示当前读取的数据如果超过了 part.untilOffset ,那么在这个时候退出当前 batch(offset 从 fromOffset 逐次加一增加的,正常的逻辑肯定会和 part.untilOffset 相等)

    我们知道异常从 211 行抛出来的,也知道了异常的最直接原因,那么这个原因是什么造成的呢?

    211 行的代码执行了,也就是 210 行的 if 语句未 true,这样的话,207 行的逻辑也应该为 true。这样的话 iter 就是 fetchBatch 返回的迭代器了。接下来我们看看 fetchBatch 的代码

    188 private def fetchBatch: Iterator[MessageAndOffset] = {
    189      val req = new FetchRequestBuilder()
    190        .addFetch(part.topic, part.partition, requestOffset, kc.config.fetchMessageMaxBytes)
            .build()
    192      val resp = consumer.fetch(req)
          handleFetchErr(resp)
          // kafka may return a batch that starts before the requested offset
          resp.messageSet(part.topic, part.partition)
    196       .iterator
            .dropWhile(_.offset < requestOffset)
        }

    我们发现 192 行会通过 consumer 从 kafka 获取数据,本次从哪获取数据,以及获取多少分别由 190 行的 topic, partition 和 kc.config.fetchMessageMaxBytes 指定。我们查看 kc.config.fetchMessageMaxBytes,发现默认使用的是 1M

    ConsumerConfig.scala
    29 val FetchSize = 1024 * 1024
    
    114 val fetchMessageMaxBytes = props.getInt("fetch.message.max.bytes", FetchSize)

    从这里我们知道每次从 kafka 上最多获取 1M 的数据(这也是为什么需要在 KafkaRDD.getNext 函数的开头通过 iter.hasNext() 来判断是否需要调用 fetchBatch

    然后看到 fetchBatch 函数对应的 196 行,获取迭代器作为返回值,查看相应代码,跳转到 ByteBufferMessageSet.scala

    139 override def iterator: Iterator[MessageAndOffset] = internalIterator()
    145 private def internalIterator(isShallow: Boolean = false): Iterator[MessageAndOffset] = {
        new IteratorTemplate[MessageAndOffset] {
            ......
    152      def makeNextOuter: MessageAndOffset = {
            // if there isn't at least an offset and size, we are done
            if (topIter.remaining < 12)
              return allDone()
            val offset = topIter.getLong()
            val size = topIter.getInt()
            if(size < Message.MinHeaderSize)
              throw new InvalidMessageException("Message found with corrupt size (" + size + ")")
            
     160       // we have an incomplete message
     161       if(topIter.remaining < size)
     162         return allDone()
           ....
     185     }

    从 161 行我们可以看出,如果读取的消息是一条不完整的,那么本次不处理,默认本次消息读取完成。

    上面所有的链条穿起来就抛出了我们文章开始的异常。

    1. 从 kafka 读取 1M 的数据(默认大小)
    2. 发现读取的数据不完整(这个消息的大小大于 1M),所以本次读取的 迭代器 为空
    3. 发现迭代器为空,但是当前的 offset 和 part.untilOffset 不想等,抛出异常

    解决方案

    通过设置 kafkaParam 的参数 fetch.message.max.bytes 就行了,我们设置成 2M(大于一条数据的最大值即可),就能够运行成功了

    您可能也喜欢:

    Spark Streaming 从指定时间戳开始消费 kafka 数据

    Spark Streaming 中使用 zookeeper 保存 offset 并重用

    Spark Streaming 中使用 zookeeper 保存 offset 并重用(二)

    Spark Streaming 往 HDFS 写文件,自定义文件名

    Spark Streaming 自适应上游 kafka topic partition 数目变化
    无觅


沪ICP备19023445号-2号
友情链接