kafka rebalance系列:static membership优化

kafka 2.3增加group.instance.id配置,支持对client静态化配置id,减少client重启后加入group导致rebalance。

rebalance 流程

在了解static membership优化之前,先简单学习rebalance流程(kafka 2.3以前)。 这篇文章对理解rebalance流程很有帮助,推荐阅读:Apache Kafka Rebalance Protocol, or the magic behind your streams applications

下面图片都是来自这篇文章。

JoinGroup

当一个consumer启动,通过向kafka broker coordinator发送FindCoordinator请求找到group coordinator,然后发送JoinGroup

join-group.png

注意请求带上了session.timeout.msmax.poll.interval.ms,协助coordinator踢出超时的client。

JoinGroup请求,使得coordinator进入屏障,在min(rebalance timeout, group.initial.rebalance.delay.ms)时间内,等待收集其他consumer发送的JoinGroup请求。

group内第一个consumer被选作group leader。coordinator向leader发送JoinGroup响应,包含当前活跃成员列表。其他成员收到空响应。 group leader负责partition分配。

join-group-2.png

SyncGroup

所有成员向coordinator发送SyncGroup请求。 其中,group leader的SyncGroup请求包含了partition分配结果。 其他成员的SyncGroup请求是空请求。

sync-group.png

coordinator响应所有的SyncGroup请求。每个consumer收到响应后,知道自己分配到的partition,监听分区并且拉取消息。

sync-group-2.png

Heartbeat

consumer后台线程发送心跳:heartbeat.interval.ms

在rebalance阶段,coordinator收到心跳信息,则认为这个consumer需要rejoin。

heartbeat.png

coordinator通知其他consumer,在下一个heartbeat周期,进行JoinGroup、SyncGroup操作。

rejoin.png

在整个rebalance阶段,在重新分区之前,consumer都不会再处理任何消息。 默认的rebalance timeout (max.poll.interval.ms)为5min,是非常长的,会导致产生很大的consumer-lag。

consumer滚动更新的问题

Transient failures are those that occur temporarily and for a short period of time.

并非所有的consumer异常都需要触发rebalance。它们可能稍后就会重新加入group,比如滚动升级。

而当一个新的成员加入group,请求里面不包含任何membership信息。Coordinator分配一个UUID作为此成员id,并且缓存起来。这个consumer后续的生命周期内,都会使用这个member id。那么这个consumer rejoin的时候不会触发rebalance。

但是,一个刚刚重启的consumer,本地内存里面没有member id或者generation id。因此它重新加入group,会触发rebalance。GroupCoordinator也不保证原来这个member处理的分区会被重新分配回去。

于是问题变为:如何在重启之后,还能正常识别consumer。

restart.png

KIP-345 static membership

为了解决这个问题,KIP-345增加static membership特性:增加group.instance.id选项(client端)。Group instance id是用户指定的、区分不同client的标识。 现在GroupCoordinator识别一个consumer,可以通过

  • coordinator-assigned member ID (client重启后丢失)
  • group.instance.id (client重启后不丢失)

有了static membership之后,触发consumer group rebalace的条件:

  • A new member joins
  • A leader rejoins (possibly due to topic assignment change)
  • An existing member offline time is over session timeout
  • Broker receives a leave group request containing a list of group.instance.ids

为了使用static membership配置,需要server和client都升级到kafka 2.3版本。

开启static membership之后,还要考虑session.timeout.ms是否足够大。

When using static membership, it’s recommended to increase the consumer property session.timeout.ms large enough so that the broker coordinator will not trigger rebalance too frequently.

参考

Built with Hugo
Theme Stack designed by Jimmy