企业🤖AI智能体构建引擎,智能编排和调试,一键部署,支持私有化部署方案 广告
# 位移提交 aka. Consumer offset * Consumer 需要向 Kafka 汇报自己的位移数据,这个汇报过程称为`提交位移(Committing Offsets)` * Consumer 需要为分配给它的每个分区提交各自的位移数据 * 位移提交的语义保障是 Consumer 端负责的,Kafka 只会无脑接受 * 从开发者角度,位移提交分为自动提交 & 手动提交 * 从 Consumer 端的角度,位移提交分为同步提交 & 异步提交 ## 自动提交 * Kafka Consumer 后台提交 * 开启自动提交:`enable.auto.commit=true` * 配置自动提交间隔:Consumer 端:`auto.commit.interval.ms`,默认 5s ``` Java Properties props = new Properties(); props.put("bootstrap.servers", "localhost:9092"); props.put("group.id", "test"); props.put("enable.auto.commit", "true"); props.put("auto.commit.interval.ms", "2000"); props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props); consumer.subscribe(Arrays.asList("foo", "bar")); while (true) { ConsumerRecords<String, String> records = consumer.poll(100); for (ConsumerRecord<String, String> record : records) System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value()); } ``` * 自动提交位移的顺序 * 配置 enable.auto.commit = true * Kafka 会保证在开始调用 poll 方法时,提交上次 poll 返回的所有消息 * 因此 poll 方法是先提交上一批消息的 offset,再处理下一批消息 * 因此自动提交不会出现消费丢失,但会`重复消费` * 重复消费举例 * Consumer 每 5s 提交 offset * 假设提交 offset 后的 3s 发生了 Rebalance * Rebalance 之后的所有 Consumer 从上一次提交的 offset 处继续消费 * 因此 Rebalance 发生前 3s 的消息会被重复消费 * 这是机制缺陷 ## 手动提交 * 使用 KafkaConsumer#commitSync():会提交 KafkaConsumer#poll() 返回的最新 offset * 该方法为同步操作,等待直到 offset 被成功提交才返回 * 示例如下 ``` Java while (true) { ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1)); process(records); // 处理消息 try { consumer.commitSync(); } catch (CommitFailedException e) { handle(e); // 处理提交失败异常 } } ``` * 因此 commitSync 在处理完所有消息之后 * 手动提交优势 * 能够把控 offset 的时机和频率 * 手动提交缺陷 * 调用 commitSync 时,Consumer 处于阻塞状态,直到 Broker 返回结果 * 会影响 TPS * 可以选择拉长提交间隔,但有以下问题 * 会导致 Consumer 的提交频率下降 * Consumer 重启后,会有`更多`的消息被消费 ## 同步提交 * commitSync ## 异步提交 * 鉴于手动同步提交的问题,Kafka 提供另一个 API * KafkaConsumer#commitAsync() * 该 API 是异步的,提供 callback ``` Java while (true) { ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1)); process(records); // 处理消息 consumer.commitAsync((offsets, exception) -> { if (exception != null) handle(exception); }); } ``` * commitAsync 的问题 * 出现问题不会自动重试 * 因为异步重试导致 offset 会过期,重试是没有意义的 ## 最佳实践 * 手动提交组合 commitSync & commitAsync * 原因 * 利用 commitSync 的自动重试规避瞬时错误,e.g. 网络抖动、Broker GC * 不想总处于阻塞状态影响 TPS * 最佳实践 Code ``` Java try { while(true) { ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1)); process(records); // 处理消息 commitAysnc(); // 使用异步提交规避阻塞 } } catch(Exception e) { handle(e); // 处理异常 } finally { try { consumer.commitSync(); // 最后一次提交使用同步阻塞式提交 } finally { consumer.close(); } } ``` ## 更精细管理 offset * KafkaConsumerAPI 有一组更方便的 API * i.e. 直接提交最新一条消息的 offset 如何更加细粒度化地提交 offset? * Background * poll 返回的是 5k 条消息 * 期望一个大事务分成若干小事务提交 * KafkaConsumerAPI 新方法 * commitSync(Map<TopicPartition, OffsetAndMetadata>) * commitAsync(Map<TopicPartition, OffsetAndMetadata>) * TopicPartition i.e. 消费分区 * OffsetAndMetadata i.e. 位移数据 * 该方法能够实现更细粒度更新 offset,而不受限于 poll 的消息数量 ``` private Map<TopicPartition, OffsetAndMetadata> offsets = new HashMap<>(); int count = 0; …… while (true) { ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1)); for (ConsumerRecord<String, String> record: records) { process(record); // 处理消息 offsets.put(new TopicPartition(record.topic(), record.partition()), new OffsetAndMetadata(record.offset() + 1); if(count % 100 == 0) consumer.commitAsync(offsets, null); // 回调处理逻辑是null count++; } } ```