企业🤖AI智能体构建引擎,智能编排和调试,一键部署,支持私有化部署方案 广告
# Kafka系统工具 ### 前言 Kafka为我们提供了很多有用的系统工具,这些工具都放在kafka.tools包中,具体的类如下图: ![kafka-tools](https://box.kancloud.cn/2016-03-21_56efa3c6987e7.jpg "") ### 工具介绍 下面简要的介绍一下一些比较常用的工具: **1. Consumer Offset Checker** 用来展示消费者组、话题、分区、指针偏移量(offset)等值。 可选描述: ~~~ --broker-info 打印broker的信息 --group 消费者组 --help 打印帮助信息 --retry.backoff.ms <Integer> 错误查询重新尝试间隔(默认为3000) --socket.timeout.ms <Integer> 连接超时时间 (默认为6000) --topic 消费者的某一个话题,缺失时默认包含所有的话题 --zookeeper zookeeper的地址(默认为localhost:2181) ~~~ **2. Dump Log Segments** 从日志文件中打印消息或者验证日志下标是否正确。 可选描述: ~~~ --deep-iteration 使用深迭代而不是浅迭代 --files <file1, file2, ...> 输入的文件 --key-decoder-class 用自己定义的反序列化方式反序列化关键字 --max-message-size <Integer: size> 消息最大的字节数(默认为5242880) --print-data-log 同时打印出日志消息 --value-decoder-class 用自己定义的序列化方式来序列化关键字 --verify-index-only 只是认证下标 ~~~ **3. Export Zookeeper Offsets** 把不同Kafka节点分区中的指针偏移量输出到如下格式的文件中: > /consumers/group1/offsets/topic1/1-0:286894308 /consumers/group1/offsets/topic1/2-0:284803985 可选描述: ~~~ --group 消费者组 --help 打印帮助信息 --output-file 导出的文件名 --zkconnect zookeeper的地址(默认为localhost:2181) ~~~ **4. Get Offset Shell** 获得某一个消息的指针偏移量。 可选描述: ~~~ --broker-list <hostname:port,..., 每个broker的主机名和端口号 hostname:port> --max-wait-ms <Integer: ms> 每次抓取最长的等待时间 (默认为1000) --offsets <Integer: count> 返回的偏移量的数目(默认为1) --partitions <partition ids> 需要在哪些分区中查询,默认为所有的分区 --time <Long: timestamp/-1(latest)/-2 设定时间区间 (earliest)> --topic <topic> 设定特定的话题 ~~~ **5. Import Zookeeper Offsets** 把导出的指针偏移量文件再倒入并放到对应的分区中 必须的参数: group 可选描述: ~~~ --help 打印帮助信息 --input-file 需要导入的文件 --zkconnect zookeeper的地址(默认为localhost:2181) ~~~ **6. JMX Tool** 通过JMX管理来打印metrics 可选描述: ~~~ --attributes <name> 需要查询的属性的白名单 --date-format <format> 用来格式化时间信息 --help 打印帮助信息 --jmx-url <service-url> 获取JMX信息的URL --object-name <name> 查询特定的JMX对象信息,可以设置多个值, 如果不设置会查询所有的JMX对象 --reporting-interval <Integer: ms> 多久获取一次JMX信息 (默认为2000) ~~~ **7. Kafka Migration** 可以将Kafka从0.7版本迁移到0.8版本 可选描述: ~~~ --blacklist <Java regex (String)> 不需要复制的话题的黑名单 --consumer.config <config file> 消费者配置文件 --help 打印帮助信息 --kafka.07.jar <kafka 0.7 jar> kafka 0.7版本的压缩包 --num.producers <Integer: Number of 生产者实例数目(默认为1) producers> --num.streams <Integer: Number of 消费者实例数目(默认为1) consumer threads> --producer.config <config file> 生产者配置文件 --queue.size <Integer: Queue size in 在版本间迁移时消息的缓冲数目 terms of number of messages> --whitelist <Java regex (String)> 需要从旧集群复制过去的话题的白名单 --zkclient.01.jar <zkClient 0.1 jar zookeeper 0.1版本压缩包 file required by Kafka 0.7> ~~~ **8. Mirror Maker** 提供Kafka集群之间的映射关系,实现跨集群的同步。 可选描述: ~~~ --abort.on.send.failure <Stop the 出错时是否终止操作(默认为true) entire mirror maker when a send failure occurs> --blacklist <Java regex (String)> 不需要同步的话题的黑名单 --consumer.config <config file> 消费者配置文件 --consumer.rebalance.listener <A 消费者复杂均衡监听器 custom rebalance listener of type ConsumerRebalanceListener> --help 打印帮助信息 --message.handler <A custom message 在生产者和消费者之间来处理消息的处理器 handler of type MirrorMakerMessageHandler> --message.handler.args <Arguments 消费者负载均衡参数 passed to message handler constructor.> --new.consumer 在mirror maker时使用新的消费者 --num.streams <Integer: Number of 指定消费者的线程数(默认为1) threads> --offset.commit.interval.ms <Integer: 偏移量提交间隔(默认为60000) offset commit interval in millisecond> --producer.config <config file> 生产者配置文件 --rebalance.listener.args <Arguments 消费者负载均衡参数 passed to custom rebalance listener constructor as a string.> --whitelist <Java regex (String)> 需要同步的话题的白名单 ~~~ **9. State Change Log Merger** 状态转变日志整合工具,可以把来自不同节点不同时间的日志进行整合。 可选描述: ~~~ --end-time <end timestamp in the 需要整合的日志的截止时间,在这之前的 format java.text. 日志都要整合(默认为 9999-12-31 23:59:59,999) SimpleDateFormat@f17a63e7> --logs <file1,file2,...> 需要整合的日志 --logs-regex <for example: /tmp/state- 日志名的正则表达式 change.log*> --partitions <0,1,2,...> 哪些分区的日志需要整合 --start-time <start timestamp in the 需要整合的日志的开始时间,在这之后的 format java.text. 日志都要整合(默认为 0000-00-00 00:00:00,000) SimpleDateFormat@f17a63e7> --topic <topic> 哪些话题的日志需要整合 ~~~ **10. Verify Consumer Rebalance** 确认消费者是否平衡,确保每个分区只有一个消费者,因为Kafka不支持多个消费者同时对一个分区进行读写。 可选描述: ~~~ --group 消费者组 --help 打印帮助信息 --zookeeper.connect zookeeper的地址(默认为localhost:2181) ~~~ ### 使用方式 ### 脚本命令 在${KAFKA_HOME}/bin目录下(windows用户可以在${KAFKA_HOME}/bin/windows中找到对应的bat脚本)为我们封装好了很多基本的命令脚本,可以直接调用,如: ~~~ drfish@kafka:~/kafka_2.11-0.9.0.0$ bin/kafka-consumer-offset-checker.sh --zookeeper localhost:2181 --group group-1 ~~~ ### 类调用 在${KAFKA_HOME}/bin目录下还有一个特殊的脚本kafka-run-class.sh,它可以调用所需的类来运行类中的方法,具体方法如下: ~~~ drfish@kafka:~/kafka_2.11-0.9.0.0$ bin/kfka-run-class.sh kafka.tools.ConsumerOffsetChecker --zookeeper localhost:2181 --group=group-1 ~~~ 需要注意的是,当通过调用类来使用工具时,需要使用完整的类名,而且有些类名使用了缩写,如kafka.tools.ImportZkOffsets。 ### Java代码 同样我们可以用Java代码直接通过类来调用相应的工具: ~~~ String[] arg = new String[] { "--zookeeper=10.64.253.238:2181", "--group=group-1" }; ConsumerOffsetChecker.main(arg); ~~~ ### 运行结果 上面三种不同的调用方式,最后都会返回类似如下的结果: ~~~ Group Topic Pid Offset LogSize Lag Owner group-1 test 0 255229 255229 0 none ~~~ ### 总结 本文介绍了一下Kafka提供的系统工具的常用工具,并给出了通过不同方式来运用工具的方法,能够帮助大家更好地来管理Kafka集群。