# kafka消息队列详解与实战 #### 1.kafka简介 > **Kafka**是由Apache软件基金会开发的一个开源流处理平台,由Scala和Java编写。Kafka是一种高吞吐量的分布式发布订阅消息系统,它可以处理消费者在网站中的所有动作流数据。 这种动作(网页浏览,搜索和其他用户的行动)是在现代网络上的许多社会功能的一个关键因素。 这些数据通常是由于吞吐量的要求而通过处理日志和日志聚合来解决。 对于像**Hadoop**一样的日志数据和离线分析系统,但又要求实时处理的限制,这是一个可行的解决方案。Kafka的目的是通过Hadoop的并行加载机制来统一线上和离线的消息处理,也是为了通过集群来提供实时的消息。 ![ZChvHx.png](https://s2.ax1x.com/2019/06/23/ZChvHx.png) #### 2.kafaka的安装 官网下载地址: ```java http://kafka.apache.org/downloads ``` 注意前面是Scala的版本,后面才是kafka的版本,我们下载红框中的tgz文件 ![ZPA1SS.png](https://s2.ax1x.com/2019/06/23/ZPA1SS.png) 这里我的下载版本为: kafka_2.11-2.2.1,解压后我们在kafka的主目录下面启动kafka: ```java /opt/kafka/kafka_2.11-2.2.1 ``` ##### A: 使用kafka自带的zookeeper来启动kafka: ```java 直接启动命令: //1.启动zookeeper bin/zookeeper-server-start.sh config/zookeeper.properties //2.查看端口是否被监听 lsof -i:2181 //3.启动kafka bin/kafka-server-start.sh config/server.properties //4.建一个topic 名称test1 bin/kafka-topics.sh --zookeeper localhost:2181 --create --topic test1 --partitions 3 --replication-factor 1 //5.查看topic 名称test1 bin/kafka-topics.sh --zookeeper localhost:2181 --describe --topic test1 //6.用一个consumer订阅test1 bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test1 //6.用一个producer发布消息到test1 bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test1 ``` ##### B: 使用单独安装的zookeeper来启动kafka: 下载地址,这里选用的是3.4.14的版本zookeeper: ```java http://mirror.bit.edu.cn/apache/zookeeper/zookeeper-3.4.14/ ``` 使用zookeeper_3.4.14: ```java //1.清除临时数据 rm -rf /tmp/ //2.解压到/opt/zookeeper-3.4.14 tar -xvf zookeeper-3.4.14.tar.gz //3.它所在主目录下使用zookeeper bin/zkServer.sh start conf/zoo_sample.cfg //4.检查启动状态 bin/zkServer.sh status conf/zoo_sample.cfg lsof -i:2181 //5.启动kafka bin/kafka-server-start.sh config/server.properties //6.建一个topic 名称test1 bin/kafka-topics.sh --zookeeper localhost:2181 --create --topic test1 --partitions 3 --replication-factor 1 //7.查看topic 名称test1 bin/kafka-topics.sh --zookeeper localhost:2181 --describe --topic test1 //8.用一个consumer订阅test1 bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test1 //9.用一个producer发布消息到test1 bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test1 ```