# 位移提交
aka. Consumer offset
* Consumer 需要向 Kafka 汇报自己的位移数据,这个汇报过程称为`提交位移(Committing Offsets)`
* Consumer 需要为分配给它的每个分区提交各自的位移数据
* 位移提交的语义保障是 Consumer 端负责的,Kafka 只会无脑接受
* 从开发者角度,位移提交分为自动提交 & 手动提交
* 从 Consumer 端的角度,位移提交分为同步提交 & 异步提交
## 自动提交
* Kafka Consumer 后台提交
* 开启自动提交:`enable.auto.commit=true`
* 配置自动提交间隔:Consumer 端:`auto.commit.interval.ms`,默认 5s
``` Java
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("group.id", "test");
props.put("enable.auto.commit", "true");
props.put("auto.commit.interval.ms", "2000");
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Arrays.asList("foo", "bar"));
while (true) {
ConsumerRecords<String, String> records = consumer.poll(100);
for (ConsumerRecord<String, String> record : records)
System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
}
```
* 自动提交位移的顺序
* 配置 enable.auto.commit = true
* Kafka 会保证在开始调用 poll 方法时,提交上次 poll 返回的所有消息
* 因此 poll 方法是先提交上一批消息的 offset,再处理下一批消息
* 因此自动提交不会出现消费丢失,但会`重复消费`
* 重复消费举例
* Consumer 每 5s 提交 offset
* 假设提交 offset 后的 3s 发生了 Rebalance
* Rebalance 之后的所有 Consumer 从上一次提交的 offset 处继续消费
* 因此 Rebalance 发生前 3s 的消息会被重复消费
* 这是机制缺陷
## 手动提交
* 使用 KafkaConsumer#commitSync():会提交 KafkaConsumer#poll() 返回的最新 offset
* 该方法为同步操作,等待直到 offset 被成功提交才返回
* 示例如下
``` Java
while (true) {
ConsumerRecords<String, String> records =
consumer.poll(Duration.ofSeconds(1));
process(records); // 处理消息
try {
consumer.commitSync();
} catch (CommitFailedException e) {
handle(e); // 处理提交失败异常
}
}
```
* 因此 commitSync 在处理完所有消息之后
* 手动提交优势
* 能够把控 offset 的时机和频率
* 手动提交缺陷
* 调用 commitSync 时,Consumer 处于阻塞状态,直到 Broker 返回结果
* 会影响 TPS
* 可以选择拉长提交间隔,但有以下问题
* 会导致 Consumer 的提交频率下降
* Consumer 重启后,会有`更多`的消息被消费
## 同步提交
* commitSync
## 异步提交
* 鉴于手动同步提交的问题,Kafka 提供另一个 API
* KafkaConsumer#commitAsync()
* 该 API 是异步的,提供 callback
``` Java
while (true) {
ConsumerRecords<String, String> records =
consumer.poll(Duration.ofSeconds(1));
process(records); // 处理消息
consumer.commitAsync((offsets, exception) -> {
if (exception != null)
handle(exception);
});
}
```
* commitAsync 的问题
* 出现问题不会自动重试
* 因为异步重试导致 offset 会过期,重试是没有意义的
## 最佳实践
* 手动提交组合 commitSync & commitAsync
* 原因
* 利用 commitSync 的自动重试规避瞬时错误,e.g. 网络抖动、Broker GC
* 不想总处于阻塞状态影响 TPS
* 最佳实践 Code
``` Java
try {
while(true) {
ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));
process(records); // 处理消息
commitAysnc(); // 使用异步提交规避阻塞
}
} catch(Exception e) {
handle(e); // 处理异常
} finally {
try {
consumer.commitSync(); // 最后一次提交使用同步阻塞式提交
} finally {
consumer.close();
}
}
```
## 更精细管理 offset
* KafkaConsumerAPI 有一组更方便的 API
* i.e. 直接提交最新一条消息的 offset
如何更加细粒度化地提交 offset?
* Background
* poll 返回的是 5k 条消息
* 期望一个大事务分成若干小事务提交
* KafkaConsumerAPI 新方法
* commitSync(Map<TopicPartition, OffsetAndMetadata>)
* commitAsync(Map<TopicPartition, OffsetAndMetadata>)
* TopicPartition i.e. 消费分区
* OffsetAndMetadata i.e. 位移数据
* 该方法能够实现更细粒度更新 offset,而不受限于 poll 的消息数量
```
private Map<TopicPartition, OffsetAndMetadata> offsets = new HashMap<>();
int count = 0;
……
while (true) {
ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));
for (ConsumerRecord<String, String> record: records) {
process(record); // 处理消息
offsets.put(new TopicPartition(record.topic(), record.partition()), new OffsetAndMetadata(record.offset() + 1);
if(count % 100 == 0)
consumer.commitAsync(offsets, null); // 回调处理逻辑是null
count++;
}
}
```
- 概览
- 入门
- 1. 消息引擎系统
- 2. Kafka 术语
- 3. 分布式流处理平台
- 4. Kafka “发行版”
- 5. Kafka 版本号
- 基本使用
- 6. 生产集群部署
- 7. 集群参数配置
- 客户端实践与原理
- 9. Consumer 分区机制
- 10. Consumer 压缩算法
- 11. 无消息丢失配置
- 12. 客户端高级功能
- 13. Producer 管理 TCP
- 14. 幂等生产者和事务生产者
- 15. 消费者组
- 16. 位移主题
- 17. 消费者组重平衡(TODO)
- 18. 位移提交
- 19. CommitFailedException
- 20. 多线程开发者实例
- 21. Consumer 管理 TCP
- 22. 消费者组消费进度监控
- Kafka 内核
- 23. 副本机制
- 24. 请求处理
- 25. Rebalance 全流程
- 26. Kafka 控制器
- 27. 高水位和 Leader Epoch
- 管理与监控
- 28. Topic 管理
- 29. Kafka 动态配置
- 30. 重设消费者组位移
- 31. 工具脚本
- 32. KafkaAdminClient
- 33. 认证机制
- 34. 云下授权
- 35. 跨集群备份 MirrorMaker
- 36. 监控 Kafka
- 37. Kafka 监控框架
- 38. 调优 Kafka
- 39. 实时日志流处理平台
- 流处理
- 40. Kafka Streams
- 41. Kafka Streams DSL
- 42. Kafka Streams 金融
- Q&A