企业🤖AI Agent构建引擎,智能编排和调试,一键部署,支持私有化部署方案 广告
# 消费者组消费进度监控 aka. 消费者 Lag(Consumer Lag) * 滞后成都:消费者当前落后于生产者的程度 * Lag 的单位:消息数 * Kafka 监控 Lag 的层级是 Partition * 计算 Topic 级别的 Lag:需要自己汇总 * 如果 Consumer 速度无法匹及 Producer,会导致消费数据不在 OS 的 Page Cache,导致失去 Zero-copy 特性 * 最好的 Lag 应趋近于 0 * 因此,需要时刻关注消费进度 * 监控 Lag 的方法 * Kafka 自带的命令行工具 `kafka-consumer-groups` 脚本 * Kafka Java Consumer API * Kafka 自带的 JMX 监控指标 ## Kafka shell cmd * 能够监控独立消费者(Standalone Consumer) Lag * Standalone Consumer 调用 KafkaConsumer.assign() 直接消费指定 Partition * 查看 Lag ``` $ bin/kafka-consumer-groups.sh --bootstrap-server <Kafka broker连接信息> --describe --group <group名称> ``` ## Kafka Java Consumer API ``` public static Map<TopicPartition, Long> lagOf(String groupID, String bootstrapServers) throws TimeoutException { Properties props = new Properties(); props.put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers); try (AdminClient client = AdminClient.create(props)) { ListConsumerGroupOffsetsResult result = client.listConsumerGroupOffsets(groupID); try { Map<TopicPartition, OffsetAndMetadata> consumedOffsets = result.partitionsToOffsetAndMetadata().get(10, TimeUnit.SECONDS); props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false); // 禁止自动提交位移 props.put(ConsumerConfig.GROUP_ID_CONFIG, groupID); props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName()); props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName()); try (final KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) { Map<TopicPartition, Long> endOffsets = consumer.endOffsets(consumedOffsets.keySet()); return endOffsets.entrySet().stream().collect(Collectors.toMap(entry -> entry.getKey(), entry -> entry.getValue() - consumedOffsets.get(entry.getKey()).offset())); } } catch (InterruptedException e) { Thread.currentThread().interrupt(); // 处理中断异常 // ... return Collections.emptyMap(); } catch (ExecutionException e) { // 处理ExecutionException // ... return Collections.emptyMap(); } catch (TimeoutException e) { throw new TimeoutException("Timed out when getting lag for consumer group " + groupID); } } } ``` * 调用 AdminClient.listConsumerGroupOffsets 方法获取给定消费者组的最新消费消息的位移 * 获取订阅分区的最新消息位移 * 执行相应的减法操作,获取 Lag 值并封装进一个 Map 对象 ## Kafka JMX * Kafka 提供了 kafka.consumer:type=consumer-fetch-manager-metrics,client-id=“{client-id}”的 JMX 指标 * records-lag-max * records-lead-min * Lead 值:消费者最新消费消息的 offset 与 Partition 当前第一条消息 offset 的差值 * Lag 越大的话,Lead 就越小,反之同理 * 当 Lead 越来越小,快接近于 0,有可能 Consumer 要丢消息 * 因为 Kafka 的消息有留存时间,默认 1 周 * 如果 Consumer 足够慢到要消费的数据会被 Kafka 删除 * 此时会造成丢消息假象 * Kafka 消费者还在分区级别提供了额外的 JMX 指标,用于单独监控分区级别的 Lag 和 Lead 值。JMX 名称为:kafka.consumer:type=consumer-fetch-manager-metrics,partition=“{partition}”,topic=“{topic}”,client-id=“{client-id}”。 ## 总结 * 生产环境推荐使用 Kafka JMX