企业🤖AI智能体构建引擎,智能编排和调试,一键部署,支持私有化部署方案 广告
# CommitFailedExceptionn * i.e. Consumer 客户端在提交 offset 时出现了错误或异常,而且是不可恢复的严重异常。 注释 > Commit cannot be completed since the group has already rebalanced and assigned the partitions to another member. This means that the time between subsequent calls to poll() was longer than the configured max.poll.interval.ms, which typically implies that the poll loop is spending too much time message processing. You can address this either by increasing max.poll.interval.ms or by reducing the maximum size of batches returned in poll() with max.poll.records. 问题 * 由于开启 Rebalance,本实例将要提交 offset 的分区分配给了其他 Instance * 出现问题的原因:消费者实例连续两次调用 poll 时间间隔过长 * 通常表明,Consumer Instance 花费太长时间进行消息处理 解决办法 * 增加期望的时间间隔 `max.poll.interval.ms` * 减少 poll 方法一次性返回的消息数量,i.e. max.poll.records ## 异常抛出场景 * 手动提交 offset 时,i.e. 显式调用 KafkaConsumer.commitSync() ### 场景一 * 消息处理的总时间超过预设的 `max.poll.interval.ms` ``` … Properties props = new Properties(); … props.put("max.poll.interval.ms", 5000); consumer.subscribe(Arrays.asList("test-topic")); while (true) { ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1)); // 使用Thread.sleep模拟真实的消息处理逻辑 Thread.sleep(6000L); consumer.commitSync(); } ``` 解决方法 * 简化消息处理逻辑 * 缩短单条消息处理时间 * 增加 Consumer 端允许下游系统消费一批消息的最大时长 * i.e. max.poll.interval.ms * 减少下游系统一次性消费的消息总数 * i.e. max.poll.records * 下游系统使用多线程加速消费 * 多线程间如何处理 offset 提交有难度 ### 场景二 * 如果应用中同时出现了相同 GroupID 的消费者组和 Standalone Consumer * 当独立消费者手动提交 offset 时,会抛出此异常