* Create a new Maven project in IntelliJ IDEA using the scala-simple archetype.

![](https://img.kancloud.cn/9c/bf/9cbf1e0d7b60ba2ba2307b0655afc073_844x470.png)

~~~xml
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
  <modelVersion>4.0.0</modelVersion>
  <groupId>com.bizzbee.spark</groupId>
  <artifactId>spark-train</artifactId>
  <version>1.0</version>
  <inceptionYear>2008</inceptionYear>
  <properties>
    <scala.version>2.11.8</scala.version>
  </properties>

  <dependencies>
    <dependency>
      <groupId>org.scala-lang</groupId>
      <artifactId>scala-library</artifactId>
      <version>${scala.version}</version>
    </dependency>
    <dependency>
      <groupId>org.apache.kafka</groupId>
      <artifactId>kafka_2.11</artifactId>
      <version>2.1.1</version>
    </dependency>
  </dependencies>

  <build>
    <sourceDirectory>src/main/scala</sourceDirectory>
    <testSourceDirectory>src/test/scala</testSourceDirectory>
    <plugins>
      <plugin>
        <groupId>org.scala-tools</groupId>
        <artifactId>maven-scala-plugin</artifactId>
        <version>2.15.2</version>
        <executions>
          <execution>
            <goals>
              <goal>compile</goal>
              <goal>testCompile</goal>
            </goals>
          </execution>
        </executions>
        <configuration>
          <scalaVersion>${scala.version}</scalaVersion>
          <args>
            <arg>-target:jvm-1.5</arg>
          </args>
        </configuration>
      </plugin>
      <plugin>
        <groupId>org.apache.maven.plugins</groupId>
        <artifactId>maven-eclipse-plugin</artifactId>
        <version>2.9</version>
        <configuration>
          <downloadSources>true</downloadSources>
          <buildcommands>
            <buildcommand>ch.epfl.lamp.sdt.core.scalabuilder</buildcommand>
          </buildcommands>
          <additionalProjectnatures>
            <projectnature>ch.epfl.lamp.sdt.core.scalanature</projectnature>
          </additionalProjectnatures>
          <classpathContainers>
            <classpathContainer>org.eclipse.jdt.launching.JRE_CONTAINER</classpathContainer>
            <classpathContainer>ch.epfl.lamp.sdt.launching.SCALA_CONTAINER</classpathContainer>
          </classpathContainers>
        </configuration>
      </plugin>
    </plugins>
  </build>

  <reporting>
    <plugins>
      <plugin>
        <groupId>org.scala-tools</groupId>
        <artifactId>maven-scala-plugin</artifactId>
        <configuration>
          <scalaVersion>${scala.version}</scalaVersion>
        </configuration>
      </plugin>
    </plugins>
  </reporting>
</project>
~~~

* Next, create a simple Kafka producer class.

~~~java
package com.bizzbee.spark.kafka;

import java.util.Properties;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;

/*
 * A simple producer example.
 */
public class SimpleKafkaProducer {

    public static void main(String[] args) throws InterruptedException {

        // Optionally require the topic name as a command-line argument:
        // if (args.length == 0) {
        //     System.out.println("Enter topic name");
        //     return;
        // }

        // Topic to publish to.
        String topicName = "bizzbee-replicated-topic";

        // Producer configuration.
        Properties props = new Properties();

        // Broker addresses. Make sure the server's IP is mapped to the
        // hostname "spark" in your local hosts file!
        props.put("bootstrap.servers", "spark:9095,spark:9094,spark:9093");

        // Wait for all in-sync replicas to acknowledge each request.
        props.put("acks", "all");

        // If a request fails, the producer retries it automatically.
        props.put("retries", 3);

        // Batch size in bytes.
        //props.put("batch.size", 16384);

        // Wait up to 1 ms so records can be batched into fewer requests.
        props.put("linger.ms", 1);

        // buffer.memory controls the total amount of memory available to the producer for buffering.
        //props.put("buffer.memory", 33554432);

        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        Producer<String, String> producer = new KafkaProducer<String, String>(props);
        try {
            for (int i = 0; i < 100; i++) {
                // send() is asynchronous: it queues the record and returns immediately.
                producer.send(new ProducerRecord<String, String>(topicName,
                        Integer.toString(i), Integer.toString(i)));
                Thread.sleep(1000);
                System.out.println("Message sent successfully");
            }
            producer.flush();
        } finally {
            // Release the producer's network connections and buffers.
            producer.close();
        }
    }
}
~~~

* Be sure to map the server's IP address to its hostname (`spark` in this example) in your local hosts file!
* Then check that both ZooKeeper and Kafka are running.
* Check which brokers host each topic.
* `kafka-topics.sh --describe --zookeeper spark:2181`

```
Topic:bizzbee   PartitionCount:1        ReplicationFactor:1     Configs:
        Topic: bizzbee  Partition: 0    Leader: 3       Replicas: 3     Isr: 3
Topic:bizzbee-replicated-topic  PartitionCount:1        ReplicationFactor:3     Configs:
        Topic: bizzbee-replicated-topic Partition: 0    Leader: 3       Replicas: 1,3,2 Isr: 3,2,1
Topic:bizzbee-topic     PartitionCount:1        ReplicationFactor:1     Configs:
        Topic: bizzbee-topic    Partition: 0    Leader: 3       Replicas: 3     Isr: 3
Topic:bizzbee_topic     PartitionCount:1        ReplicationFactor:1     Configs:
        Topic: bizzbee_topic    Partition: 0    Leader: 2       Replicas: 2     Isr: 2
Topic:jjj       PartitionCount:1        ReplicationFactor:1     Configs:
        Topic: jjj      Partition: 0    Leader: 2       Replicas: 2     Isr: 2
```

* The output shows that the topic bizzbee-replicated-topic is replicated on three brokers (1, 3, 2) and that all three are in the in-sync replica (Isr) list, so all of them are up. The producer writes to the partition leader (broker 3), and the other two brokers replicate the data.
* Then start a consumer in a terminal on the server. Passing any one of the three brokers to `--bootstrap-server` is enough.

`kafka-console-consumer.sh --bootstrap-server spark:9093 --topic bizzbee-replicated-topic`

![](https://img.kancloud.cn/53/48/5348190aea2350323272afedcddd6c2c_844x470.png)
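Since the hosts-file mapping is the step most easily missed, it can help to check hostname resolution before starting the producer. The following is a minimal sketch using only the JDK; `BootstrapCheck`, `Endpoint`, `parse`, and `resolves` are hypothetical names chosen for illustration and are not part of the Kafka client API. It assumes plain `host:port` entries (no IPv6 literals) and only tests name resolution, not whether a broker is actually listening on the port.

```java
import java.net.InetAddress;
import java.net.UnknownHostException;
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper for illustration; not part of the Kafka client library.
final class BootstrapCheck {

    /** One host:port entry taken from a bootstrap.servers string. */
    static final class Endpoint {
        final String host;
        final int port;

        Endpoint(String host, int port) {
            this.host = host;
            this.port = port;
        }
    }

    /** Splits a string such as "host1:9093,host2:9094" into endpoints. */
    static List<Endpoint> parse(String bootstrapServers) {
        List<Endpoint> endpoints = new ArrayList<Endpoint>();
        for (String entry : bootstrapServers.split(",")) {
            String trimmed = entry.trim();
            if (trimmed.isEmpty()) {
                continue; // tolerate stray commas
            }
            int colon = trimmed.lastIndexOf(':');
            if (colon <= 0 || colon == trimmed.length() - 1) {
                throw new IllegalArgumentException("Expected host:port but got: " + trimmed);
            }
            String host = trimmed.substring(0, colon);
            int port = Integer.parseInt(trimmed.substring(colon + 1));
            endpoints.add(new Endpoint(host, port));
        }
        return endpoints;
    }

    /** Returns true if the local machine can resolve the host name to an address. */
    static boolean resolves(String host) {
        try {
            InetAddress.getByName(host);
            return true;
        } catch (UnknownHostException e) {
            return false;
        }
    }

    public static void main(String[] args) {
        // Same value as the producer's bootstrap.servers setting.
        String servers = "spark:9095,spark:9094,spark:9093";
        for (Endpoint e : parse(servers)) {
            String status = resolves(e.host)
                    ? "resolves"
                    : "does NOT resolve; add it to the hosts file";
            System.out.println(e.host + ":" + e.port + " " + status);
        }
    }
}
```

If an entry is reported as not resolving, add a line mapping the server's IP address to `spark` in the hosts file and run the check again before starting the producer.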