企业🤖AI智能体构建引擎,智能编排和调试,一键部署,支持私有化部署方案 广告
# Topic 管理 ## Topic 日常管理 * Kafka 自带 kafka-topics 脚本创建 Topic ``` bash bin/kafka-topics.sh --bootstrap-server broker_host:port --create --topic my_topic_name --partitions 1 --replication-factor 1 ``` * 从 V2.2 开始推荐使用 --bootstrap-server 替代 --zookeeper * --bootstrap-server 会使用 Kafka 安全体系 * --bootstrap-server 不必直接与 ZK 交互 * 查询 topics ``` bin/kafka-topics.sh --bootstrap-server broker_host:port --list ``` * 查询单个 topic 下的详细数据 ``` bin/kafka-topics.sh --bootstrap-server broker_host:port --describe --topic <topic_name> ``` * 会返回`可见`的 topic 详细数据 * 修改 topic partitions ``` bin/kafka-topics.sh --bootstrap-server broker_host:port --alter --topic <topic_name> --partitions <新分区数> ``` * 修改 topic 级别参数 ``` bin/kafka-configs.sh --zookeeper zookeeper_host:port --entity-type topics --entity-name <topic_name> --alter --add-config max.message.bytes=10485760 ``` * 此处 --zookeeper 而非 --bootstrap-server 是因为后者是设置动态参数 * 变更 replicas 数 * 修改 topic 限速 ``` bin/kafka-configs.sh --zookeeper zookeeper_host:port --alter --add-config 'leader.replication.throttled.rate=104857600,follower.replication.throttled.rate=104857600' --entity-type brokers --entity-name 0 bin/kafka-configs.sh --zookeeper zookeeper_host:port --alter --add-config 'leader.replication.throttled.replicas=*,follower.replication.throttled.replicas=*' --entity-type topics --entity-name test ``` * topic partition 迁移 ``` bin/kafka-topics.sh --bootstrap-server broker_host:port --delete --topic <topic_name> ``` ## 特殊 topic * 两类特殊 topic * __consumer_offsets * __transaction_state * 查看 Consumer Group 提交的 offsets ``` bin/kafka-console-consumer.sh --bootstrap-server kafka_host:port --topic __consumer_offsets --formatter "kafka.coordinator.group.GroupMetadataManager\$OffsetsMessageFormatter" --from-beginning ``` * 读取 topic,查看 Consumer Group 状态 ``` bin/kafka-console-consumer.sh --bootstrap-server kafka_host:port --topic __consumer_offsets --formatter "kafka.coordinator.group.GroupMetadataManager\$GroupMetadataMessageFormatter" --from-beginning ``` ## 常见 topic 错误处理 * topic 删除失效 * 常见原因 * Replica 所在的 Broker 宕机 * 待删除的 topic 的部分 partition 依然在执行迁移过程 * 对于第一点原因:自行恢复 * 对于第二点原因:可以采用如下操作 * 手动删除 ZooKeeper 节点 /admin/delete_topics 下以待删除主题为名的 znode。 * 手动删除该主题在磁盘上的分区目录。 * 在 ZooKeeper 中执行 rmr /controller,触发 Controller 重选举,刷新 Controller 缓存。 * 第三部会造成大面积的 Partition Leader 重选举 * __consumer_offsets 占用太多磁盘 * 使用 jstack 查看 kafka-log-cleaner-thread 前缀的线程状态 * 一般情况是该线程挂掉,无法及时清理内部 topic * 解决办法:重启 Broker