rebalance 流程
在了解static membership优化之前,先简单学习rebalance流程(kafka 2.3以前)。 这篇文章对理解rebalance流程很有帮助,推荐阅读:Apache Kafka Rebalance Protocol, or the magic behind your streams applications。
下面图片都是来自这篇文章。
JoinGroup
当一个consumer启动,通过向kafka broker coordinator发送FindCoordinator请求找到group coordinator,然后发送JoinGroup。

注意请求带上了session.timeout.ms和max.poll.interval.ms,协助coordinator踢出超时的client。
JoinGroup请求,使得coordinator进入屏障,在min(rebalance timeout, group.initial.rebalance.delay.ms)时间内,等待收集其他consumer发送的JoinGroup请求。
group内第一个consumer被选作group leader。coordinator向leader发送JoinGroup响应,包含当前活跃成员列表。其他成员收到空响应。
group leader负责partition分配。

SyncGroup
所有成员向coordinator发送SyncGroup请求。
其中,group leader的SyncGroup请求包含了partition分配结果。
其他成员的SyncGroup请求是空请求。

coordinator响应所有的SyncGroup请求。每个consumer收到响应后,知道自己分配到的partition,监听分区并且拉取消息。

Heartbeat
consumer后台线程发送心跳:heartbeat.interval.ms。
在rebalance阶段,coordinator收到心跳信息,则认为这个consumer需要rejoin。

coordinator通知其他consumer,在下一个heartbeat周期,进行JoinGroup、SyncGroup操作。

在整个rebalance阶段,在重新分区之前,consumer都不会再处理任何消息。
默认的rebalance timeout (max.poll.interval.ms)为5min,是非常长的,会导致产生很大的consumer-lag。
consumer滚动更新的问题
Transient failures are those that occur temporarily and for a short period of time.
并非所有的consumer异常都需要触发rebalance。它们可能稍后就会重新加入group,比如滚动升级。
而当一个新的成员加入group,请求里面不包含任何membership信息。Coordinator分配一个UUID作为此成员id,并且缓存起来。这个consumer后续的生命周期内,都会使用这个member id。那么这个consumer rejoin的时候不会触发rebalance。
但是,一个刚刚重启的consumer,本地内存里面没有member id或者generation id。因此它重新加入group,会触发rebalance。GroupCoordinator也不保证原来这个member处理的分区会被重新分配回去。
于是问题变为:如何在重启之后,还能正常识别consumer。

KIP-345 static membership
为了解决这个问题,KIP-345增加static membership特性:增加group.instance.id选项(client端)。Group instance id是用户指定的、区分不同client的标识。
现在GroupCoordinator识别一个consumer,可以通过
- coordinator-assigned member ID (client重启后丢失)
group.instance.id(client重启后不丢失)
有了static membership之后,触发consumer group rebalace的条件:
- A new member joins
- A leader rejoins (possibly due to topic assignment change)
- An existing member offline time is over session timeout
- Broker receives a leave group request containing a list of
group.instance.ids
为了使用static membership配置,需要server和client都升级到kafka 2.3版本。
开启static membership之后,还要考虑session.timeout.ms是否足够大。
When using static membership, it’s recommended to increase the consumer property
session.timeout.mslarge enough so that the broker coordinator will not trigger rebalance too frequently.