🔥码云GVP开源项目 12k star Uniapp+ElementUI 功能强大 支持多语言、二开方便! 广告
## 1.安装zookeeper * 解压zookeeper ``` tar -zxvf zookeeper-3.4.5-cdh5.7.0.tar.gz -C ~/app/ ``` * 配置进环境变量 ``` export ZK_HOME=/home/bizzbee/app/zookeeper-3.4.5-cdh5.7.0 export PATH=$ZK_HOME/bin:$PATH #source 一下 ``` * 修改配置:复制conf目录下的zoo_sample.cfg * 里面的dataDir位置要修改掉。因为默认存储在/tmp下面,重启会清空。 * 所以修改到自己新建的目录。`/home/bizzbee/app/zk_tmp` ``` dataDir=/home/bizzbee/app/zk_tmp ``` * 启动zk `./zkServer.sh start` ## 2.单节点单broker * 解压kafka ``` tar -zxvf kafka_2.11-2.1.1.tgz -C ~/app/ ``` * 修改配置`config/server.properties` ``` listeners = PLAINTEXT://spark:9092 log.dirs=/home/bizzbee/app/kafka-logs #log地址同样是自己创建目录,不能在/tmp下面 zookeeper.connect=spark:2181 ``` * 启动kafka,启动之前配置kafka环境到配置文件,并source生效。 ``` kafka-server-start.sh $KAFKA_HOME/config/server.properties ``` * 创建一个topic(之前启动的zookeeper不要关掉) ``` kafka-topics.sh --create --zookeeper spark:2181 --replication-factor 1 --partitions 1 --topic bizzbee-topic ``` * 查看所有topic ``` [bizzbee@spark config]$ kafka-topics.sh --list --zookeeper spark:2181 bizzbee-topic ``` * 发送消息(生产消息) ``` [bizzbee@spark config]$ kafka-console-producer.sh --broker-list spark:9092 --topic bizzbee-topic > ``` * 接受消息(消费) ``` kafka-console-consumer.sh --bootstrap-server spark:9092 --topic bizzbee-topic --from-beginning ``` ![](https://img.kancloud.cn/a9/49/a9490009cc6f3199fdd181aef0824983_607x760.png) 对于**消费者**,kafka中有两个设置的地方:对于老的消费者,由**\--zookeeper参数**设置;对于新的消费者,由**\--bootstrap-server参数**设置 如果使用了--zookeeper参数,那么consumer的信息将会存放在zk之中 查看的方法是使用./zookeeper-client,然后 ls /consumers/\[group\_id\]/offsets/\[topic\]/\[broker\_id-part\_id\],这个是查看某个group\_id的某个topic的offset 如果使用了--bootstrap-server参数,那么consumer的信息将会存放在kafka之中。 > 所以注意,新的消费者应该是kafka所在的9092端口。老的消费者应该从zk的2181端口拿消息。 * 在生产端输入消息 ![](https://img.kancloud.cn/81/55/8155e445870d235c922d1d06fdb9c511_967x394.png) * from-beginning的意思是,消费者启动时之前已经消费的消息也会接收到。就是从最开始接收。 * 查看所有topic详细信息。 ``` kafka-topics.sh --describe --zookeer spark:2181 ``` ## 3.单节点多broker部署 * 复制三个之前的配置文件。 ``` [bizzbee@spark config]$ cp server.properties server1.properties [bizzbee@spark config]$ cp server.properties server2.properties [bizzbee@spark config]$ cp server.properties server3.properties ``` * 三个都进行修改(一下是需要修改的) ``` # id:1,2,3 broker.id=1 #端口9093,94,95 listeners = PLAINTEXT://spark:9093 # 日志也新建三个文件夹 log.dirs=/home/bizzbee/app/tmp/kafka-logs1 ``` * 这次以后台daemon的方式启动kafka(不占用终端) ``` kafka-server-start.sh -daemon $KAFKA_HOME/config/server1.properties & kafka-server-start.sh -daemon $KAFKA_HOME/config/server2.properties & kafka-server-start.sh -daemon $KAFKA_HOME/config/server3.properties & ``` * 创建三个副本的topic ``` kafka-topics.sh --create --zookeeper spark:2181 --replication-factor 3 --partitions 1 --topic bizzbee-replicated-topic ``` * 查看刚创建的topic ``` [bizzbee@spark config]$ kafka-topics.sh --describe --zookeeper spark:2181 --topic bizzbee-replicated-topic Topic:bizzbee-replicated-topic PartitionCount:1 ReplicationFactor:3 Configs: Topic: bizzbee-replicated-topic Partition: 0 Leader: 1 Replicas: 1,3,2 Isr: 1,3,2 ``` * 创建生产者: ``` kafka-console-producer.sh --broker-list spark:9093,spark:9094,spark:9095 --topic bizzbee-replicated-topic ``` * 创建消费者: ``` kafka-console-consumer.sh --bootstrap-server spark:9093 --topic bizzbee-replicated-topic ``` > 这里从9093,94,95 都可以取到消息。