# 消费者组消费进度监控
aka. 消费者 Lag(Consumer Lag)
* 滞后成都:消费者当前落后于生产者的程度
* Lag 的单位:消息数
* Kafka 监控 Lag 的层级是 Partition
* 计算 Topic 级别的 Lag:需要自己汇总
* 如果 Consumer 速度无法匹及 Producer,会导致消费数据不在 OS 的 Page Cache,导致失去 Zero-copy 特性
* 最好的 Lag 应趋近于 0
* 因此,需要时刻关注消费进度
* 监控 Lag 的方法
* Kafka 自带的命令行工具 `kafka-consumer-groups` 脚本
* Kafka Java Consumer API
* Kafka 自带的 JMX 监控指标
## Kafka shell cmd
* 能够监控独立消费者(Standalone Consumer) Lag
* Standalone Consumer 调用 KafkaConsumer.assign() 直接消费指定 Partition
* 查看 Lag
```
$ bin/kafka-consumer-groups.sh --bootstrap-server <Kafka broker连接信息> --describe --group <group名称>
```
## Kafka Java Consumer API
```
public static Map<TopicPartition, Long> lagOf(String groupID, String bootstrapServers) throws TimeoutException {
Properties props = new Properties();
props.put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
try (AdminClient client = AdminClient.create(props)) {
ListConsumerGroupOffsetsResult result = client.listConsumerGroupOffsets(groupID);
try {
Map<TopicPartition, OffsetAndMetadata> consumedOffsets = result.partitionsToOffsetAndMetadata().get(10, TimeUnit.SECONDS);
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false); // 禁止自动提交位移
props.put(ConsumerConfig.GROUP_ID_CONFIG, groupID);
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
try (final KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
Map<TopicPartition, Long> endOffsets = consumer.endOffsets(consumedOffsets.keySet());
return endOffsets.entrySet().stream().collect(Collectors.toMap(entry -> entry.getKey(),
entry -> entry.getValue() - consumedOffsets.get(entry.getKey()).offset()));
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
// 处理中断异常
// ...
return Collections.emptyMap();
} catch (ExecutionException e) {
// 处理ExecutionException
// ...
return Collections.emptyMap();
} catch (TimeoutException e) {
throw new TimeoutException("Timed out when getting lag for consumer group " + groupID);
}
}
}
```
* 调用 AdminClient.listConsumerGroupOffsets 方法获取给定消费者组的最新消费消息的位移
* 获取订阅分区的最新消息位移
* 执行相应的减法操作,获取 Lag 值并封装进一个 Map 对象
## Kafka JMX
* Kafka 提供了 kafka.consumer:type=consumer-fetch-manager-metrics,client-id=“{client-id}”的 JMX 指标
* records-lag-max
* records-lead-min
* Lead 值:消费者最新消费消息的 offset 与 Partition 当前第一条消息 offset 的差值
* Lag 越大的话,Lead 就越小,反之同理
* 当 Lead 越来越小,快接近于 0,有可能 Consumer 要丢消息
* 因为 Kafka 的消息有留存时间,默认 1 周
* 如果 Consumer 足够慢到要消费的数据会被 Kafka 删除
* 此时会造成丢消息假象
* Kafka 消费者还在分区级别提供了额外的 JMX 指标,用于单独监控分区级别的 Lag 和 Lead 值。JMX 名称为:kafka.consumer:type=consumer-fetch-manager-metrics,partition=“{partition}”,topic=“{topic}”,client-id=“{client-id}”。
## 总结
* 生产环境推荐使用 Kafka JMX
- 概览
- 入门
- 1. 消息引擎系统
- 2. Kafka 术语
- 3. 分布式流处理平台
- 4. Kafka “发行版”
- 5. Kafka 版本号
- 基本使用
- 6. 生产集群部署
- 7. 集群参数配置
- 客户端实践与原理
- 9. Consumer 分区机制
- 10. Consumer 压缩算法
- 11. 无消息丢失配置
- 12. 客户端高级功能
- 13. Producer 管理 TCP
- 14. 幂等生产者和事务生产者
- 15. 消费者组
- 16. 位移主题
- 17. 消费者组重平衡(TODO)
- 18. 位移提交
- 19. CommitFailedException
- 20. 多线程开发者实例
- 21. Consumer 管理 TCP
- 22. 消费者组消费进度监控
- Kafka 内核
- 23. 副本机制
- 24. 请求处理
- 25. Rebalance 全流程
- 26. Kafka 控制器
- 27. 高水位和 Leader Epoch
- 管理与监控
- 28. Topic 管理
- 29. Kafka 动态配置
- 30. 重设消费者组位移
- 31. 工具脚本
- 32. KafkaAdminClient
- 33. 认证机制
- 34. 云下授权
- 35. 跨集群备份 MirrorMaker
- 36. 监控 Kafka
- 37. Kafka 监控框架
- 38. 调优 Kafka
- 39. 实时日志流处理平台
- 流处理
- 40. Kafka Streams
- 41. Kafka Streams DSL
- 42. Kafka Streams 金融
- Q&A