# Topic 管理
## Topic 日常管理
* Kafka 自带 kafka-topics 脚本创建 Topic
``` bash
bin/kafka-topics.sh --bootstrap-server broker_host:port --create --topic my_topic_name --partitions 1 --replication-factor 1
```
* 从 V2.2 开始推荐使用 --bootstrap-server 替代 --zookeeper
* --bootstrap-server 会使用 Kafka 安全体系
* --bootstrap-server 不必直接与 ZK 交互
* 查询 topics
```
bin/kafka-topics.sh --bootstrap-server broker_host:port --list
```
* 查询单个 topic 下的详细数据
```
bin/kafka-topics.sh --bootstrap-server broker_host:port --describe --topic <topic_name>
```
* 会返回`可见`的 topic 详细数据
* 修改 topic partitions
```
bin/kafka-topics.sh --bootstrap-server broker_host:port --alter --topic <topic_name> --partitions <新分区数>
```
* 修改 topic 级别参数
```
bin/kafka-configs.sh --zookeeper zookeeper_host:port --entity-type topics --entity-name <topic_name> --alter --add-config max.message.bytes=10485760
```
* 此处 --zookeeper 而非 --bootstrap-server 是因为后者是设置动态参数
* 变更 replicas 数
* 修改 topic 限速
```
bin/kafka-configs.sh --zookeeper zookeeper_host:port --alter --add-config 'leader.replication.throttled.rate=104857600,follower.replication.throttled.rate=104857600' --entity-type brokers --entity-name 0
bin/kafka-configs.sh --zookeeper zookeeper_host:port --alter --add-config 'leader.replication.throttled.replicas=*,follower.replication.throttled.replicas=*' --entity-type topics --entity-name test
```
* topic partition 迁移
```
bin/kafka-topics.sh --bootstrap-server broker_host:port --delete --topic <topic_name>
```
## 特殊 topic
* 两类特殊 topic
* __consumer_offsets
* __transaction_state
* 查看 Consumer Group 提交的 offsets
```
bin/kafka-console-consumer.sh --bootstrap-server kafka_host:port --topic __consumer_offsets --formatter "kafka.coordinator.group.GroupMetadataManager\$OffsetsMessageFormatter" --from-beginning
```
* 读取 topic,查看 Consumer Group 状态
```
bin/kafka-console-consumer.sh --bootstrap-server kafka_host:port --topic __consumer_offsets --formatter "kafka.coordinator.group.GroupMetadataManager\$GroupMetadataMessageFormatter" --from-beginning
```
## 常见 topic 错误处理
* topic 删除失效
* 常见原因
* Replica 所在的 Broker 宕机
* 待删除的 topic 的部分 partition 依然在执行迁移过程
* 对于第一点原因:自行恢复
* 对于第二点原因:可以采用如下操作
* 手动删除 ZooKeeper 节点 /admin/delete_topics 下以待删除主题为名的 znode。
* 手动删除该主题在磁盘上的分区目录。
* 在 ZooKeeper 中执行 rmr /controller,触发 Controller 重选举,刷新 Controller 缓存。
* 第三部会造成大面积的 Partition Leader 重选举
* __consumer_offsets 占用太多磁盘
* 使用 jstack 查看 kafka-log-cleaner-thread 前缀的线程状态
* 一般情况是该线程挂掉,无法及时清理内部 topic
* 解决办法:重启 Broker
- 概览
- 入门
- 1. 消息引擎系统
- 2. Kafka 术语
- 3. 分布式流处理平台
- 4. Kafka “发行版”
- 5. Kafka 版本号
- 基本使用
- 6. 生产集群部署
- 7. 集群参数配置
- 客户端实践与原理
- 9. Consumer 分区机制
- 10. Consumer 压缩算法
- 11. 无消息丢失配置
- 12. 客户端高级功能
- 13. Producer 管理 TCP
- 14. 幂等生产者和事务生产者
- 15. 消费者组
- 16. 位移主题
- 17. 消费者组重平衡(TODO)
- 18. 位移提交
- 19. CommitFailedException
- 20. 多线程开发者实例
- 21. Consumer 管理 TCP
- 22. 消费者组消费进度监控
- Kafka 内核
- 23. 副本机制
- 24. 请求处理
- 25. Rebalance 全流程
- 26. Kafka 控制器
- 27. 高水位和 Leader Epoch
- 管理与监控
- 28. Topic 管理
- 29. Kafka 动态配置
- 30. 重设消费者组位移
- 31. 工具脚本
- 32. KafkaAdminClient
- 33. 认证机制
- 34. 云下授权
- 35. 跨集群备份 MirrorMaker
- 36. 监控 Kafka
- 37. Kafka 监控框架
- 38. 调优 Kafka
- 39. 实时日志流处理平台
- 流处理
- 40. Kafka Streams
- 41. Kafka Streams DSL
- 42. Kafka Streams 金融
- Q&A