# 位移主题
* Kafka 中的内部主题(Internal Topic) aka. __consumer_offsets
* __consumer_offsets 在 Kafka 源码中正式名字:位移主题。i.e. Offsets Topic
## 背景
* 老版本
* Consumer 重启后根据 ZK 中 offset 从上次位置继续消费
* 但是 ZK 不适合高频写操作
* 新版本
* 将 Consumer 的位移数据作为一条条普通的 Kafka 消息,提交到 __consumer_offsets 中
* 因此该 Topic 的主要作用:保存 Kafka Consumer offset
## 消息格式
* 其消息格式是 Kafka 自定义的,即用户不可修改格式,不可随意写
* 有 Consumer API 可以去写,自定义的 Producer 不能往这个 Topic 去写
* 消息格式就是 KV pair
* 消息格式 Key:<GroupID, Topic, Partition>
* 标识这个 offset 数据是哪个 Consumer 的
* 除了 Consumer Group,Kafka 还支持 Standalone Consumer。运行机制不同,但位移管理相同。
* Consumer offset 在 Partition 层面,因此还需要有 Topic、Partition
* 消息题 Value:保存 offset(简单来看)
* 另外两种消息格式
* 用于保存 Consumer Group 信息的消息,i.e. 目的是注册 Consumer Group 的。
* 用于删除 Group 过期位移甚至是删除 Group 的消息
* i.e. tombstone 消息,墓碑消息,aka. delete mark。
* 消息体是 null
* 何时
* 一旦某个 Consumer Group 下所有的 Consumer Instance 都停止了
* 它们的位移数据已被删除
* Kafka 会向位移主题的对应分区写入 tombstone 消息,表示彻底删除这个 Group 的信息
## 位移主题创建
* 当 Kafka 集群中的第一个 Consumer 程序启动时,Kafka 会自动创建位移主题
* Kafka 自动创建,分区数配置根据 Broker 参数 `offsets.topic.num.partitions`,默认是 50,i.e. Kafka 会创建一个 50 分区的位移主题。
* 位移主题的副本数取决于 Broker 参数 `offsets.topic.replication.factor`,默认是 3。
* 可以选择手动创建位移主题
* e.g. 使用 Kafka API 创建
Kafka Consumer 提交位移会写到位移主题,提交方式
* 自动提交位移
* Consumer 端参数 `enable.auto.commit=true`
* Consumer 在后台默默定期提交位移
* 提交间隔控制参数:`auto.commit.innterval.ms`
* Spark、Flink 等与 Kafka 集成的大数据框架是禁用自动提交位移的
* 手动提交位移
* Consumer 端参数:`enable.auto.commit=false`
* Kafka Consumer API 提供了位移提交的方法,e.g. consumer.commitSync
* 自动提交位移的一个问题
* 只要 Consumer 一直启动,就会无限期地向位移主题写入消息
* 极端例子:
* 假设 Consumer 消费到某个 Topic的最新一条消息,位移是 100
* 之后该 Topic 没有产生新消息
* Consumer offset 永远保持在 100
* 由于自动提交 offset,offset topic 会不停写入 offset=100 的消息
* 这要求 Kafka 必须要有这对 offset topic 的消息删除策略,否则会占满存储
* Kafka 删除 offset topic 中过期消息的方法
* 使用 Compaction,aka. 整理(参考 JVM GC)
* Compact 策略中的过期定义
* 同一个 Key 的两条消息 M1、M2,如果 M1 的发送时间早于 M2,则 M1 是过期消息
* Compact:扫描日志的所有消息,删除过期消息,剩下的消息整理。
![](https://img.kancloud.cn/86/a4/86a44073aa60ac33e0833e6a9bfd9ae7_681x397.jpeg)
* Log Cleaner
* Kafka 后台定期巡检待 Compact 的 Topic 的线程
- 概览
- 入门
- 1. 消息引擎系统
- 2. Kafka 术语
- 3. 分布式流处理平台
- 4. Kafka “发行版”
- 5. Kafka 版本号
- 基本使用
- 6. 生产集群部署
- 7. 集群参数配置
- 客户端实践与原理
- 9. Consumer 分区机制
- 10. Consumer 压缩算法
- 11. 无消息丢失配置
- 12. 客户端高级功能
- 13. Producer 管理 TCP
- 14. 幂等生产者和事务生产者
- 15. 消费者组
- 16. 位移主题
- 17. 消费者组重平衡(TODO)
- 18. 位移提交
- 19. CommitFailedException
- 20. 多线程开发者实例
- 21. Consumer 管理 TCP
- 22. 消费者组消费进度监控
- Kafka 内核
- 23. 副本机制
- 24. 请求处理
- 25. Rebalance 全流程
- 26. Kafka 控制器
- 27. 高水位和 Leader Epoch
- 管理与监控
- 28. Topic 管理
- 29. Kafka 动态配置
- 30. 重设消费者组位移
- 31. 工具脚本
- 32. KafkaAdminClient
- 33. 认证机制
- 34. 云下授权
- 35. 跨集群备份 MirrorMaker
- 36. 监控 Kafka
- 37. Kafka 监控框架
- 38. 调优 Kafka
- 39. 实时日志流处理平台
- 流处理
- 40. Kafka Streams
- 41. Kafka Streams DSL
- 42. Kafka Streams 金融
- Q&A