* Use IntelliJ IDEA to create a new Maven project from the scala-simple archetype.
![](https://img.kancloud.cn/9c/bf/9cbf1e0d7b60ba2ba2307b0655afc073_844x470.png)
~~~xml
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
  <modelVersion>4.0.0</modelVersion>
  <groupId>com.bizzbee.spark</groupId>
  <artifactId>spark-train</artifactId>
  <version>1.0</version>
  <inceptionYear>2008</inceptionYear>
  <properties>
    <scala.version>2.11.8</scala.version>
  </properties>
  <dependencies>
    <dependency>
      <groupId>org.scala-lang</groupId>
      <artifactId>scala-library</artifactId>
      <version>${scala.version}</version>
    </dependency>
    <dependency>
      <groupId>org.apache.kafka</groupId>
      <artifactId>kafka_2.11</artifactId>
      <version>2.1.1</version>
    </dependency>
  </dependencies>
  <build>
    <sourceDirectory>src/main/scala</sourceDirectory>
    <testSourceDirectory>src/test/scala</testSourceDirectory>
    <plugins>
      <plugin>
        <groupId>org.scala-tools</groupId>
        <artifactId>maven-scala-plugin</artifactId>
        <version>2.15.2</version>
        <executions>
          <execution>
            <goals>
              <goal>compile</goal>
              <goal>testCompile</goal>
            </goals>
          </execution>
        </executions>
        <configuration>
          <scalaVersion>${scala.version}</scalaVersion>
          <args>
            <arg>-target:jvm-1.5</arg>
          </args>
        </configuration>
      </plugin>
      <plugin>
        <groupId>org.apache.maven.plugins</groupId>
        <artifactId>maven-eclipse-plugin</artifactId>
        <version>2.9</version>
        <configuration>
          <downloadSources>true</downloadSources>
          <buildcommands>
            <buildcommand>ch.epfl.lamp.sdt.core.scalabuilder</buildcommand>
          </buildcommands>
          <additionalProjectnatures>
            <projectnature>ch.epfl.lamp.sdt.core.scalanature</projectnature>
          </additionalProjectnatures>
          <classpathContainers>
            <classpathContainer>org.eclipse.jdt.launching.JRE_CONTAINER</classpathContainer>
            <classpathContainer>ch.epfl.lamp.sdt.launching.SCALA_CONTAINER</classpathContainer>
          </classpathContainers>
        </configuration>
      </plugin>
    </plugins>
  </build>
  <reporting>
    <plugins>
      <plugin>
        <groupId>org.scala-tools</groupId>
        <artifactId>maven-scala-plugin</artifactId>
        <configuration>
          <scalaVersion>${scala.version}</scalaVersion>
        </configuration>
      </plugin>
    </plugins>
  </reporting>
</project>
~~~
* Next, create a simple Kafka producer class.
~~~java
package com.bizzbee.spark.kafka;

import java.util.Properties;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;

/**
 * A simple producer example.
 */
public class SimpleKafkaProducer {

    public static void main(String[] args) throws InterruptedException {
        // Optional: require the topic name as a command-line argument.
        // if (args.length == 0) {
        //     System.out.println("Enter topic name");
        //     return;
        // }

        // Topic to send messages to.
        String topicName = "bizzbee-replicated-topic";

        // Properties object that holds the producer configuration.
        Properties props = new Properties();

        // Broker addresses.
        // Be sure to map the server's IP to the hostname "spark" in your local hosts file!
        props.put("bootstrap.servers", "spark:9095,spark:9094,spark:9093");

        // Wait for all in-sync replicas to acknowledge each request.
        props.put("acks", "all");

        // If a request fails, the producer retries automatically (up to 3 times).
        props.put("retries", 3);

        // Batch size in bytes.
        // props.put("batch.size", 16384);

        // Wait up to 1 ms so that records can be batched, reducing the number of requests.
        props.put("linger.ms", 1);

        // buffer.memory controls the total amount of memory available to the producer for buffering.
        // props.put("buffer.memory", 33554432);

        props.put("key.serializer",
                "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer",
                "org.apache.kafka.common.serialization.StringSerializer");

        Producer<String, String> producer = new KafkaProducer<String, String>(props);
        try {
            // Send 100 records, one per second, using the counter as both key and value.
            for (int i = 0; i < 100; i++) {
                producer.send(new ProducerRecord<String, String>(topicName,
                        Integer.toString(i), Integer.toString(i)));
                Thread.sleep(1000);
                System.out.println("Message sent successfully");
            }
            // Push out any records that are still buffered.
            producer.flush();
        } finally {
            // Release the producer's network and memory resources.
            producer.close();
        }
    }
}
~~~
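The producer above connects to the brokers through the hostname `spark`, so that name has to resolve on the machine running the producer. As a quick sanity check before debugging Kafka itself, the lookup can be sketched with the JDK alone. `HostCheck` and its `resolve` method are hypothetical helpers written for this note, not part of the original project.

```java
import java.net.InetAddress;
import java.net.UnknownHostException;

// Hypothetical helper: checks whether a hostname resolves on this machine.
final class HostCheck {

    /** Returns the resolved IP address of the host, or null if it cannot be resolved. */
    static String resolve(String host) {
        try {
            return InetAddress.getByName(host).getHostAddress();
        } catch (UnknownHostException e) {
            return null;
        }
    }

    public static void main(String[] args) {
        // "spark" is the hostname used in bootstrap.servers above.
        String host = args.length > 0 ? args[0] : "spark";
        String ip = resolve(host);
        if (ip == null) {
            System.out.println(host + " cannot be resolved; add it to your hosts file");
        } else {
            System.out.println(host + " resolves to " + ip);
        }
    }
}
```

If the name does not resolve, adding a line with the server's IP followed by `spark` to the local hosts file should fix the lookup.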
* Again, be sure to add the server's IP to your hosts file!
* Then check that ZooKeeper and Kafka are both running.
* Check which brokers host each topic.
* `kafka-topics.sh --describe --zookeeper spark:2181`
```
Topic:bizzbee PartitionCount:1 ReplicationFactor:1 Configs:
Topic: bizzbee Partition: 0 Leader: 3 Replicas: 3 Isr: 3
Topic:bizzbee-replicated-topic PartitionCount:1 ReplicationFactor:3 Configs:
Topic: bizzbee-replicated-topic Partition: 0 Leader: 3 Replicas: 1,3,2 Isr: 3,2,1
Topic:bizzbee-topic PartitionCount:1 ReplicationFactor:1 Configs:
Topic: bizzbee-topic Partition: 0 Leader: 3 Replicas: 3 Isr: 3
Topic:bizzbee_topic PartitionCount:1 ReplicationFactor:1 Configs:
Topic: bizzbee_topic Partition: 0 Leader: 2 Replicas: 2 Isr: 2
Topic:jjj PartitionCount:1 ReplicationFactor:1 Configs:
Topic: jjj Partition: 0 Leader: 2 Replicas: 2 Isr: 2
```
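In the output above, a partition is fully replicated when every broker ID listed under `Replicas` also appears under `Isr` (the in-sync replicas). As a rough illustration of how to read a partition line, the check can be sketched in plain Java. `IsrCheck` is a hypothetical helper written for this note, and it assumes the line format shown in the output above.

```java
import java.util.Arrays;
import java.util.Set;
import java.util.TreeSet;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical helper: reads one partition line of `kafka-topics.sh --describe` output.
final class IsrCheck {

    // Captures the comma-separated broker IDs after "Replicas:" and "Isr:".
    private static final Pattern PARTITION_LINE =
            Pattern.compile("Replicas:\\s*([\\d,]+)\\s+Isr:\\s*([\\d,]+)");

    /** Splits a comma-separated list of broker IDs into a set. */
    static Set<String> ids(String csv) {
        return new TreeSet<>(Arrays.asList(csv.split(",")));
    }

    /** True if the line is a partition line and every replica is also in the ISR. */
    static boolean fullyInSync(String line) {
        Matcher m = PARTITION_LINE.matcher(line);
        return m.find() && ids(m.group(2)).containsAll(ids(m.group(1)));
    }

    public static void main(String[] args) {
        String line = "Topic: bizzbee-replicated-topic Partition: 0 Leader: 3 Replicas: 1,3,2 Isr: 3,2,1";
        System.out.println(fullyInSync(line)); // prints "true"
    }
}
```

The order of the IDs does not matter here, which is why both lists are compared as sets.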
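* The output shows that the topic `bizzbee-replicated-topic` is replicated on three brokers (1, 3, and 2), and all three appear in the in-sync replica list (`Isr`), so they are all up. Messages from the producer therefore end up on all three brokers: writes go to the leader (broker 3) and are replicated to the other two.
* Then start a consumer in a terminal on the server. Specifying any one of the three brokers is enough.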
`kafka-console-consumer.sh --bootstrap-server spark:9093 --topic bizzbee-replicated-topic`
![](https://img.kancloud.cn/53/48/5348190aea2350323272afedcddd6c2c_844x470.png)