## 1.安装zookeeper
* 解压zookeeper
```
tar -zxvf zookeeper-3.4.5-cdh5.7.0.tar.gz -C ~/app/
```
* 配置进环境变量
```
export ZK_HOME=/home/bizzbee/app/zookeeper-3.4.5-cdh5.7.0
export PATH=$ZK_HOME/bin:$PATH
#source 一下
```
* 修改配置:复制conf目录下的zoo_sample.cfg
* 里面的dataDir位置要修改掉。因为默认存储在/tmp下面,重启会清空。
* 所以修改到自己新建的目录。`/home/bizzbee/app/zk_tmp`
```
dataDir=/home/bizzbee/app/zk_tmp
```
* 启动zk
`./zkServer.sh start`
## 2.单节点单broker
* 解压kafka
```
tar -zxvf kafka_2.11-2.1.1.tgz -C ~/app/
```
* 修改配置`config/server.properties`
```
listeners = PLAINTEXT://spark:9092
log.dirs=/home/bizzbee/app/kafka-logs
#log地址同样是自己创建目录,不能在/tmp下面
zookeeper.connect=spark:2181
```
* 启动kafka,启动之前配置kafka环境到配置文件,并source生效。
```
kafka-server-start.sh $KAFKA_HOME/config/server.properties
```
* 创建一个topic(之前启动的zookeeper不要关掉)
```
kafka-topics.sh --create --zookeeper spark:2181 --replication-factor 1 --partitions 1 --topic bizzbee-topic
```
* 查看所有topic
```
[bizzbee@spark config]$ kafka-topics.sh --list --zookeeper spark:2181
bizzbee-topic
```
* 发送消息(生产消息)
```
[bizzbee@spark config]$ kafka-console-producer.sh --broker-list spark:9092 --topic bizzbee-topic
>
```
* 接受消息(消费)
```
kafka-console-consumer.sh --bootstrap-server spark:9092 --topic bizzbee-topic --from-beginning
```
![](https://img.kancloud.cn/a9/49/a9490009cc6f3199fdd181aef0824983_607x760.png)
对于**消费者**,kafka中有两个设置的地方:对于老的消费者,由**\--zookeeper参数**设置;对于新的消费者,由**\--bootstrap-server参数**设置
如果使用了--zookeeper参数,那么consumer的信息将会存放在zk之中
查看的方法是使用./zookeeper-client,然后 ls /consumers/\[group\_id\]/offsets/\[topic\]/\[broker\_id-part\_id\],这个是查看某个group\_id的某个topic的offset
如果使用了--bootstrap-server参数,那么consumer的信息将会存放在kafka之中。
> 所以注意,新的消费者应该是kafka所在的9092端口。老的消费者应该从zk的2181端口拿消息。
* 在生产端输入消息
![](https://img.kancloud.cn/81/55/8155e445870d235c922d1d06fdb9c511_967x394.png)
* from-beginning的意思是,消费者启动时之前已经消费的消息也会接收到。就是从最开始接收。
* 查看所有topic详细信息。
```
kafka-topics.sh --describe --zookeer spark:2181
```
## 3.单节点多broker部署
* 复制三个之前的配置文件。
```
[bizzbee@spark config]$ cp server.properties server1.properties
[bizzbee@spark config]$ cp server.properties server2.properties
[bizzbee@spark config]$ cp server.properties server3.properties
```
* 三个都进行修改(一下是需要修改的)
```
# id:1,2,3
broker.id=1
#端口9093,94,95
listeners = PLAINTEXT://spark:9093
# 日志也新建三个文件夹
log.dirs=/home/bizzbee/app/tmp/kafka-logs1
```
* 这次以后台daemon的方式启动kafka(不占用终端)
```
kafka-server-start.sh -daemon $KAFKA_HOME/config/server1.properties &
kafka-server-start.sh -daemon $KAFKA_HOME/config/server2.properties &
kafka-server-start.sh -daemon $KAFKA_HOME/config/server3.properties &
```
* 创建三个副本的topic
```
kafka-topics.sh --create --zookeeper spark:2181 --replication-factor 3 --partitions 1 --topic bizzbee-replicated-topic
```
* 查看刚创建的topic
```
[bizzbee@spark config]$ kafka-topics.sh --describe --zookeeper spark:2181 --topic bizzbee-replicated-topic
Topic:bizzbee-replicated-topic PartitionCount:1 ReplicationFactor:3 Configs:
Topic: bizzbee-replicated-topic Partition: 0 Leader: 1 Replicas: 1,3,2 Isr: 1,3,2
```
* 创建生产者:
```
kafka-console-producer.sh --broker-list spark:9093,spark:9094,spark:9095 --topic bizzbee-replicated-topic
```
* 创建消费者:
```
kafka-console-consumer.sh --bootstrap-server spark:9093 --topic bizzbee-replicated-topic
```
> 这里从9093,94,95 都可以取到消息。