消费组应该算是kafka
中一个比较有特色的设计模式了,而他的重平衡机制也是我们在实际生产使用中,无法避免的一个问题。
Consumer Group
为kafka
提供了可扩展、高容错特性的消费者机制。简单介绍下,大致有以下特点:
Consumer Group
内可以有多个Consumer
实例,该实例可以是一个进程,也可以是进程下的多线程Consumer Group
有一个唯一标识的Group ID
Consumer Group
之间相互独立,互不影响Consumer Group
内实例,与订阅的topic
分区关系是一对一,或一对多的关系,Consumer Group
会通过Coordinator
尽量保持公平分配理想情况下,我们应该设置Consumer
实例的数量等于该Group
订阅topic
的分区总数,可以最大发挥消费性能。若设置的Consumer
实例数少于订阅的分区数,则会为每个Consumer
实例分配多个分区,消费性能会有所下降。若设置的Consumer
实例数大于订阅的分区数,则会为每个Consumer
实例分配 1 个分区进行消费,多余的Consumer
实例则会闲置,只会浪费资源。
重平衡(Rebalance
)就是让一个Consumer Group
下所有的Consumer
实例,合理分配消费订阅topic
的所有分区的过程。有 3 种情况会触发Consumer Group
的Rebalance
:
Group
下实例数发生变化。有新的Consumer
实例加入或者离开组。topic
数发生变化。Consumer Group
可以使用正则的方式订阅topic
,比如 consumer.subscribe(Pattern.compile(“public.*log”))
,该Group
订阅所有以 public 开头,log 结尾的topic
。这期间,新建了一个满足这样条件的topic
,那么该Group
也会发生Rebalance
。topic
分区数发生变化。比如topic
扩分区的时候,也会触发Rebalance
。单看上面任一触发条件,都没啥毛病。问题在于Rebalance
过程中会出现以下问题:
Rebalance
过程的表现有些类似JVM FGC
的情况,期间整个应用都会夯住,所有Consumer
实例都会停止消费,等待Rebalance
完成。Rebalance
过程中,所有Consumer
实例都会参与重新分配。即便Consumer Group
中部分Consumer
实例分配合理,也需要打散重新分配,会导致TCP
重新建立连接,是一个比较重的操作,较为浪费资源。Rebalance
的耗时取决于Consumer Group
下的实例数量,一旦实例数过多,耗时极长,会造成大量消费延迟。对于上述Rebalance
带来的一些弊端,从目前的社区版来看,暂时还没有很好的解决办法,我们只能尽量避免Rebalance
的发生。
在生产业务场景中,很多Rebalance
都是预期外或者不必要的。我们应用的TPS
大多是被这类Rebalance
拖慢的。
从上述的 3 个Rebalance
触发条件抓手,后两条topic
数量及分区数变化,一般都是主动运维的相关操作,这种操作带来的Rebalance
一般是必然发生,难以避免的,我们组要来讨论下Consumer Group
组成员变化引发的Rebalance
。
Consumer Group
实例增加的情况比较单一,当新启动一个Consumer
的group.id
已经存在,Coordinator
会接管这个新实例,将其加入group.id
相同的组,并重分配分区。这种操作场景,一般都还是预期内的,可能是通过扩容来提高TPS
的操作。Consumer Group
实例数减少的情况就比较复杂了。除了正常停止下线某些Consumer
实例,还会出现Coordinator
误判实例为已停止状态,从而主动踢出Group
。导致Rebalance
发生。每个Consumer
会定期向Coordinator
发心跳包,保持keepalive
。如果因为某些特殊原因,如网络抖动时,某个Consumer
实例没有及时发送心跳请求,Coordinator
会将其判定为离线,并从Group
中移除,并开启新一轮Rebalance
。针对这个问题,可以通过设置Consumer
端一下几个参数来进行优化调整:
Consumer Group
内实例的心跳超时时间,默认值是 10sRebalance
,默认值为 3spoll
方法的时间间隔,默认值为 5min。期间没消费完poll
回的消息,Coordinator
会开启新一轮Rebalance
根据平时的实践经验,建议:session.timeout.ms=6s
heartbeat.interval.ms=2s
原则上最好是满足session.timeout.ms >= 3 * heartbeat.interval.ms
公式。
max.poll.interval.ms
则需要根据下游实际消费能力进行调整,尽量设置的大一点,需要大于下游的最大消息处理时间。
如果进行完上述的各种调整后,还是频发触发Rebalance
,最好再去排查下Consumer
端的 GC 情况,实际生产环境中我经常碰到因为 GC 设置问题导致的Consumer
程序频发 FGC 的问题,从而导致非预期内的Rebalance
。