企业🤖AI智能体构建引擎,智能编排和调试,一键部署,支持私有化部署方案 广告
# 消费者组重平衡全流程解析 * 重平衡流程:让组内的所有消费者实例就消费哪些 topic partition 达成一致 * 重平衡需要 Broker 端的 Coordinator 组件 * 以下基于 Kafka V2.3 版本 ## 触发与通知 Rebalance 3 个触发条件 * 组成员数量变化 * 订阅 Topic 数量变化 * 订阅 Topic 的 partition 数量变化 生产环境中常因第一个而 Rebalance。 每次消费者组重启时,必然会 Rebalance。 Q:Rebalance 是如何通知到其他 Consumer Instance? A:依靠 Consumer 端的心跳线程(Heartbeat Thread) * Consumer 需要定期发送心跳到 Broker 的协调者以表明其存活 * Kafka V0.10.1.0 版本之前 * 心跳是在消费者主线程完成的,即调用的 KafkaConsumer.poll 方法的线程 * 问题 * 消息处理逻辑也在这个线程 * 如果消息处理耗时较长,心跳则无法及时发送到协调者 * 0.10.1.0 开始引入单独的心跳线程 * Rebalance 的通知机制依靠心跳线程完成 * 当协调者决定 Rebalance 后,会将 `REBALANCE_IN_PROGRESS` 封装进心跳请求响应中,返回给 Consumer * Consumer 端参数 `heartbeat.interval.ms`:心跳间隔,i.e. 控制重平衡通知频率 ## 消费者组状态机 * Rebalance 开始,协调者组件开始控制 Consumer Group 的状态流转 * Kafka 设计了`消费者组状态机`,帮助协调者完成 Rebalance * Kafka 为消费者组定义了 5 个状态 ![](https://img.kancloud.cn/3c/28/3c281189cfb1d87173bc2d4b8149f38b_529x414.jpeg) ![](https://img.kancloud.cn/f1/6f/f16fbcb798a53c21c3bf1bcd5b72b006_892x343.png) * 一个消费者组最开始是 Empty * 重平衡开启后,会置于 PreparingRebalance 等待成员加入 * 之后变更到 CompletingRebalance 等待分配方案 * 最后流转到 Stable 完成 Rebalance * 当有成员变动时,消费者组状态从 Stable 变为 PreparingRebalance * 此时所有现存成员需要重新申请加入组 * 当所有组成员都退出组后,消费者组状态为 Empty * 消费者组处于 Empty 状态,Kafka 会定期自动删除过期 offset ## 消费者端重平衡流程 * Rebalance 完整流程需要 Consumer & Coordinator 共同完成 * Consumer 端 Rebalance 步骤 * 加入组 * 对应 JoinGroup 请求 * 等待 Leader Consumer 分配方案 * 对应 SyncGroup 请求 * 当组内成员加入组时,Consumer 向协调者发送 JoinGroup 请求 * 每个 Consumer 会上报自己订阅的 topic * Coordinator 收集到所有 JoinGroup 请求后,从这些成员中选择一个担任消费者组的 Leader * 通常第一个发送 JoinGroup 请求的自动成为 Leader * Leader Consumer 的任务是收集所有成员的 topic,根据信息制定具体的 partition consumer 分配方案 * 选出 Leader 后,协调者把所有 topic 信息封装到 JoinGroup Response 中,发送给 Leader * Leader Consumer 做出统一分配方案,进入到 SyncGroup 请求 * Leader Consumer 向协调者发送 SyncGroup,将分配方案发给协调者 * 其他成员也会发出 SyncGroup 请求 * 协调者以 SyncGroup Response 的方式将方案下发给所有成员 ![](https://img.kancloud.cn/e7/d4/e7d40ce1c34d66ec36bfdaaa3ec9611f_1950x780.png) ![](https://img.kancloud.cn/62/52/6252b051450c32c143f03410f6c2b75d_1950x696.png) * 所有成员成功接收到分配方案,消费者组进入 Stable 状态,开始正常消费 ## Broker 端重平衡场景 ## 新成员加入 * 消费者组处于 Stable 之后有新成员加入 ![](https://img.kancloud.cn/62/f8/62f85fb0b0f06989dd5a6f133599ca33_1950x1066.png) ### 组成员主动离开 * 主动离开:Consumer Instance 通过调用 close() 方法通知协调者退出 * 该场景涉及第三个请求:LeaveGroup 请求 ![](https://img.kancloud.cn/86/72/867245cbf6cfd26573aba1816516b26b_1950x1118.png) ### 组成员崩溃离开 * 协调者需要等待一段时间才能感知 * 这个时间段由 Consumer 端参数 `sessionn.timeout.ms` 控制 * Kafka 不会超过上述参数时间感知崩溃 * 处理流程相同 ![](https://img.kancloud.cn/bc/00/bc00d35060e1a4216e177e5b361ad40c_1950x1147.png) ### Rebalance 时组成员提交 offset * Rebalance 开启时,协调者会给予成员一段缓冲时间,要求每个成员在这段时间内快速上报自己的 offset * 再开启正常的 JoinGroup/SyncGroup 请求 ![](https://img.kancloud.cn/83/b7/83b77094d4170b9057cedfed9cdb33be_1950x1144.png)