* Use IDEA to create a new Maven project with the scala-simple archetype.
* Then create a simple Kafka producer class.
package com.bizzbee.spark.kafka;

// Properties holds the producer configuration
import java.util.Properties;
// Producer interface
import org.apache.kafka.clients.producer.Producer;
// KafkaProducer implementation
import org.apache.kafka.clients.producer.KafkaProducer;
// ProducerRecord wraps the topic, key and value of a message
import org.apache.kafka.clients.producer.ProducerRecord;

/**
 * A simple producer example.
 */
public class SimpleKafkaProducer {
    public static void main(String[] args) throws InterruptedException {
        // Optionally read the topic name from the command line:
        // if (args.length == 0) {
        //     System.out.println("Enter topic name");
        //     return;
        // }
        // Topic to produce to
        String topicName = "bizzbee-replicated-topic";
        // Properties object that holds the producer configuration
        Properties props = new Properties();
        // Broker addresses used for the initial connection
        props.put("bootstrap.servers", "spark:9095,spark:9094,spark:9093");
        // Wait for all in-sync replicas to acknowledge each request
        props.put("acks", "all");
        // If a request fails, the producer retries it automatically, up to 3 times
        props.put("retries", 3);
        // Batch size in bytes (left at the default here)
        //props.put("batch.size", 16384);
        // Wait up to 1 ms so records can be batched, which reduces the number of requests
        props.put("linger.ms", 1);
        // buffer.memory controls the total amount of memory available to the producer for buffering
        //props.put("buffer.memory", 33554432);
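        // Serializers for record keys and values; KafkaProducer requires both settings
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        Producer<String, String> producer = new KafkaProducer<String, String>(props);
        for (int i = 0; i < 100; i++) {
            // send() is asynchronous: the record is queued and sent in the background
            producer.send(new ProducerRecord<String, String>(topicName,
                    Integer.toString(i), Integer.toString(i)));
            System.out.println("Message sent successfully");
        }
        // Flush any buffered records and release resources
        producer.close();
    }
}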
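
The producer settings above are hard-coded. As a rough sketch only, the same keys could be kept in a `.properties` file and loaded with `java.util.Properties`. The class name `ProducerConfigLoader` and the inline `SAMPLE` text are hypothetical stand-ins for a real file, and the serializer entries assume String keys and values as in the example.

```java
import java.io.IOException;
import java.io.StringReader;
import java.io.UncheckedIOException;
import java.util.Properties;

public class ProducerConfigLoader {
    // Hypothetical file content; the values mirror the producer example.
    static final String SAMPLE = String.join("\n",
            "bootstrap.servers=spark:9095,spark:9094,spark:9093",
            "acks=all",
            "retries=3",
            "linger.ms=1",
            "key.serializer=org.apache.kafka.common.serialization.StringSerializer",
            "value.serializer=org.apache.kafka.common.serialization.StringSerializer");

    /** Parses text in .properties format into a Properties object. */
    static Properties load(String text) {
        Properties props = new Properties();
        try {
            props.load(new StringReader(text));
        } catch (IOException e) {
            // Reading from an in-memory string should not fail; rethrow unchecked.
            throw new UncheckedIOException(e);
        }
        return props;
    }

    public static void main(String[] args) {
        Properties props = load(SAMPLE);
        System.out.println(props.getProperty("bootstrap.servers"));
        System.out.println(props.getProperty("acks"));
    }
}
```

All values loaded this way are strings. The resulting `Properties` object would be passed to `new KafkaProducer<String, String>(props)` in place of the hand-built one.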
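* Make sure to add the server's IP address to the hosts file on the machine running the producer, so that the hostname `spark` resolves!
* Check which brokers host the topic.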
* `kafka-topics.sh --describe --zookeeper spark:2181`
Topic:bizzbee PartitionCount:1 ReplicationFactor:1 Configs:
Topic: bizzbee Partition: 0 Leader: 3 Replicas: 3 Isr: 3
Topic:bizzbee-replicated-topic PartitionCount:1 ReplicationFactor:3 Configs:
Topic: bizzbee-replicated-topic Partition: 0 Leader: 3 Replicas: 1,3,2 Isr: 3,2,1
Topic:bizzbee-topic PartitionCount:1 ReplicationFactor:1 Configs:
Topic: bizzbee-topic Partition: 0 Leader: 3 Replicas: 3 Isr: 3
Topic:bizzbee_topic PartitionCount:1 ReplicationFactor:1 Configs:
Topic: bizzbee_topic Partition: 0 Leader: 2 Replicas: 2 Isr: 2
Topic:jjj PartitionCount:1 ReplicationFactor:1 Configs:
Topic: jjj Partition: 0 Leader: 2 Replicas: 2 Isr: 2
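
The `--describe` output above can also be checked in code: a partition is fully in sync when every broker id under `Replicas` also appears under `Isr`. The sketch below only parses one partition line of that text output. The class name `IsrCheck` and its methods are hypothetical, and it assumes the line layout shown above.

```java
import java.util.Arrays;
import java.util.Set;
import java.util.TreeSet;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

public class IsrCheck {
    // Matches the "Replicas: 1,3,2 Isr: 3,2,1" part of a partition line.
    private static final Pattern LINE =
            Pattern.compile("Replicas:\\s*([\\d,]+)\\s+Isr:\\s*([\\d,]+)");

    /** Splits a comma-separated list of broker ids into a set. */
    static Set<String> ids(String csv) {
        return new TreeSet<>(Arrays.asList(csv.split(",")));
    }

    /** Returns true when every replica listed on the line is also in the ISR. */
    static boolean fullyInSync(String partitionLine) {
        Matcher m = LINE.matcher(partitionLine);
        if (!m.find()) {
            throw new IllegalArgumentException("Not a partition line: " + partitionLine);
        }
        return ids(m.group(2)).containsAll(ids(m.group(1)));
    }

    public static void main(String[] args) {
        String line = "Topic: bizzbee-replicated-topic Partition: 0 Leader: 3 Replicas: 1,3,2 Isr: 3,2,1";
        System.out.println(fullyInSync(line)); // prints "true"
    }
}
```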
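* The output shows that the topic `bizzbee-replicated-topic` has three replicas, on brokers 1, 2, and 3, and all three appear in the ISR, so all of them are up and in sync. The producer therefore produces to these three brokers: writes go to the partition leader (broker 3) and are replicated to the other two.
* Then start a consumer in a terminal on the server. Specifying any one of the three brokers is enough.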
`kafka-console-consumer.sh --bootstrap-server spark:9093 --topic bizzbee-replicated-topic`
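
If the producer or the consumer cannot connect, the hosts-file note above is the first thing to check. A minimal sketch, using only the JDK, that tests whether the hostname resolves and whether the broker ports accept TCP connections; the class name `BrokerReachability` and the 2-second timeout are assumptions made for illustration.

```java
import java.io.IOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.Socket;
import java.net.UnknownHostException;

public class BrokerReachability {
    /** Returns true if the hostname resolves to an IP address (for example via the hosts file). */
    static boolean resolves(String host) {
        try {
            InetAddress.getByName(host);
            return true;
        } catch (UnknownHostException e) {
            return false;
        }
    }

    /** Returns true if a TCP connection to host:port succeeds within timeoutMs. */
    static boolean canConnect(String host, int port, int timeoutMs) {
        try (Socket socket = new Socket()) {
            socket.connect(new InetSocketAddress(host, port), timeoutMs);
            return true;
        } catch (IOException e) {
            // Covers unresolved hosts, refused connections and timeouts.
            return false;
        }
    }

    public static void main(String[] args) {
        String host = "spark";
        for (int port : new int[] {9093, 9094, 9095}) {
            System.out.println(host + ":" + port
                    + " resolves=" + resolves(host)
                    + " connects=" + canConnect(host, port, 2000));
        }
    }
}
```

A successful TCP connection only shows that the port is open; it does not prove that the broker is healthy.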