# Kafka in Practice, Part 2

### Preface

In the previous chapter, [Kafka in Practice](http://blog.csdn.net/u013291394/article/details/50231681), we set up a Kafka node on a local network, published and received messages locally with the command-line scripts, and looked at the relationship between leader and follower nodes. In this chapter we publish messages to, and receive messages from, that Kafka node using Java code running on a local machine.

### Preparation

Kafka programming in Java depends on two packages, kafka and kafka-clients. The Maven configuration file pom.xml is given below; remember to change the project name:

~~~
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
  <modelVersion>4.0.0</modelVersion>
  <groupId>Test</groupId>
  <artifactId>Test</artifactId>
  <version>0.0.1-SNAPSHOT</version>
  <build>
    <sourceDirectory>src</sourceDirectory>
    <plugins>
      <plugin>
        <artifactId>maven-compiler-plugin</artifactId>
        <version>3.3</version>
        <configuration>
          <source>1.7</source>
          <target>1.7</target>
        </configuration>
      </plugin>
    </plugins>
  </build>
  <dependencies>
    <dependency>
      <groupId>org.apache.kafka</groupId>
      <artifactId>kafka-clients</artifactId>
      <version>0.9.0.0</version>
    </dependency>
    <dependency>
      <groupId>org.apache.kafka</groupId>
      <artifactId>kafka_2.10</artifactId>
      <version>0.9.0.0</version>
    </dependency>
  </dependencies>
</project>
~~~

### Producer

A producer can publish in two ways: synchronously or asynchronously. The asynchronous form takes an extra Callback argument, which lets you run follow-up work once the send has completed, whether it succeeded or failed.

~~~
import java.util.Properties;
import java.util.concurrent.ExecutionException;

import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;

public class Producer extends Thread {
    private final KafkaProducer<Integer, String> producer;
    private final String topic;
    // Whether to send asynchronously
    private final Boolean isAsync;
    // IP address of the machine running Kafka
    private final String serverIp = "10.64.***.***";

    public Producer(String topic, Boolean isAsync) {
        Properties props = new Properties();
        props.put("bootstrap.servers", serverIp + ":9092");
        props.put("client.id", "DemoProducer");
        props.put("key.serializer", "org.apache.kafka.common.serialization.IntegerSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        producer = new KafkaProducer<Integer, String>(props);
        this.topic = topic;
        this.isAsync = isAsync;
    }

    @Override
    public void run() {
        int messageNo = 1;
        // Sends messages in an endless loop until the process is stopped
        while (true) {
            String messageStr = "Message_" + messageNo;
            long startTime = System.currentTimeMillis();
            if (isAsync) {
                // Asynchronous send: returns immediately, the callback runs on completion
                producer.send(new ProducerRecord<Integer, String>(topic, messageNo, messageStr),
                        new DemoCallBack(startTime, messageNo, messageStr));
            } else {
                // Synchronous send: get() blocks until the send has completed
                try {
                    producer.send(new ProducerRecord<Integer, String>(topic, messageNo, messageStr)).get();
                    System.out.println("Sent message: (" + messageNo + ", " + messageStr + ")");
                } catch (InterruptedException e) {
                    e.printStackTrace();
                } catch (ExecutionException e) {
                    e.printStackTrace();
                }
            }
            ++messageNo;
        }
    }
}

class DemoCallBack implements Callback {
    private long startTime;
    private int key;
    private String message;

    public DemoCallBack(long startTime, int key, String message) {
        this.startTime = startTime;
        this.key = key;
        this.message = message;
    }

    /**
     * Handling to perform once the asynchronous send has completed.
     **/
    @Override
    public void onCompletion(RecordMetadata metadata, Exception exception) {
        long elapsedTime = System.currentTimeMillis() - startTime;
        if (metadata != null) {
            System.out.println(
                    "message(" + key + ", " + message + ") sent to partition(" + metadata.partition() + "), "
                    + "offset(" + metadata.offset() + ") in " + elapsedTime + " ms");
        } else {
            exception.printStackTrace();
        }
    }
}
~~~

Usage:

~~~
// Once the producer thread starts, it sends messages of the form Message_** to the
// corresponding topic on the Kafka node.
// KafkaProperties is not shown in this article; its topic field is a String holding the topic name.
boolean isAsync = true;
Producer producerThread = new Producer(KafkaProperties.topic, isAsync);
producerThread.start();
~~~

### Consumer

A consumer receives the messages published to a specific topic.

~~~
import java.util.Collections;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

import kafka.utils.ShutdownableThread;

public class Consumer extends ShutdownableThread {
    private final KafkaConsumer<Integer, String> consumer;
    private final String topic;
    // IP address of the machine running Kafka
    private final String serverIp = "10.64.***.***";

    public Consumer(String topic) {
        super("KafkaConsumerExample", false);
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, serverIp + ":9092");
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "DemoConsumer");
        // Commit offsets automatically, once per second
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true");
        props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "1000");
        props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "30000");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.IntegerDeserializer");
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
        consumer = new KafkaConsumer<>(props);
        this.topic = topic;
    }

    // Called repeatedly by ShutdownableThread until the thread is shut down
    @Override
    public void doWork() {
        consumer.subscribe(Collections.singletonList(this.topic));
        // Wait up to 1000 ms for new records
        ConsumerRecords<Integer, String> records = consumer.poll(1000);
        for (ConsumerRecord<Integer, String> record : records) {
            System.out.println("Received message: (" + record.key() + ", " + record.value() + ") at offset " + record.offset());
        }
    }

    @Override
    public String name() {
        return null;
    }

    @Override
    public boolean isInterruptible() {
        return false;
    }
}
~~~

Usage:

~~~
// Once the consumer thread starts, it receives the messages sent earlier by the producer.
Consumer consumerThread = new Consumer(KafkaProperties.topic);
consumerThread.start();
~~~

### Summary

With the simple examples above, we can send messages to Kafka from our own project and receive the messages for the topics we subscribe to.
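
The calling snippets reference `KafkaProperties.topic`, but the article never defines that class. Below is a minimal sketch of what such a holder could look like, assuming it simply centralizes the topic name and the connection settings that the producer and consumer currently hard-code. Everything except the `topic` field name is hypothetical: the values `"topic1"` and `"127.0.0.1"` are placeholders, and the helper methods `bootstrapServers()`, `producerProps()` and `consumerProps()` are illustrative names, not part of Kafka. The configuration keys and serializer class names are the ones used in the code above.

~~~java
import java.util.Properties;

// Hypothetical holder class: the article only references KafkaProperties.topic.
// Every value below is a placeholder to adapt to your own environment.
final class KafkaProperties {
    public static final String topic = "topic1";        // placeholder topic name
    public static final String serverIp = "127.0.0.1";  // placeholder broker IP
    public static final int port = 9092;                // port used in the article

    private KafkaProperties() { }

    // Builds the host:port string expected by bootstrap.servers
    public static String bootstrapServers() {
        return serverIp + ":" + port;
    }

    // Same producer settings as in the Producer class above
    public static Properties producerProps() {
        Properties props = new Properties();
        props.put("bootstrap.servers", bootstrapServers());
        props.put("client.id", "DemoProducer");
        props.put("key.serializer", "org.apache.kafka.common.serialization.IntegerSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        return props;
    }

    // Same consumer settings as in the Consumer class above, written as plain keys
    public static Properties consumerProps() {
        Properties props = new Properties();
        props.put("bootstrap.servers", bootstrapServers());
        props.put("group.id", "DemoConsumer");
        props.put("enable.auto.commit", "true");
        props.put("auto.commit.interval.ms", "1000");
        props.put("session.timeout.ms", "30000");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.IntegerDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        return props;
    }

    public static void main(String[] args) {
        System.out.println("bootstrap.servers = " + bootstrapServers());
        System.out.println("topic = " + topic);
    }
}
~~~

With a class like this, the two constructors could pass `KafkaProperties.producerProps()` and `KafkaProperties.consumerProps()` to `KafkaProducer` and `KafkaConsumer` instead of repeating the IP address in each class; whether that fits depends on how your project manages configuration.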