在 Spring Boot 中连接 Kafka 的示例程序
1. 加入依赖包
```xml
<!-- Spring for Apache Kafka. No <version> is declared here, so it has to be
     supplied by the build's dependency management (presumably the Spring Boot
     parent POM / BOM; confirm against the project's pom.xml). -->
<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
</dependency>
```
2. 配置*application.yml*
```yaml
# Kafka connection settings. Nesting is significant in YAML: these keys must
# resolve to spring.kafka.producer.bootstrap-servers and
# spring.kafka.consumer.bootstrap-servers for Spring Boot to pick them up.
spring:
  kafka:
    producer:
      # Broker address used when sending messages
      bootstrap-servers: 192.168.8.222:9092
    consumer:
      # Broker address used by the message listeners
      bootstrap-servers: 192.168.8.222:9092
```
3. 创建一个测试API
```java
/**
 * Demo REST controller that publishes the lines of a local text file to Kafka,
 * one message per line.
 */
@RestController
public class TestController {

    /** Topic every line of the input file is sent to. */
    private static final String TOPIC = "gosuncn";

    /** Absolute path of the text file whose lines are published. */
    private static final String SOURCE_FILE = "E:\\Data\\MakePart\\rfid0901\\part4rfid0901.txt";

    @Resource
    KafkaTemplate<String, String> kafkaTemplate;

    /**
     * Starts a background thread that publishes the file to Kafka and returns
     * immediately, without waiting for the publishing to finish.
     *
     * @return the literal {@code "OK"}; it only signals that the thread was
     *         started, not that any message was sent successfully
     */
    @GetMapping("/start")
    public String test() {
        new Thread(this::publishFileLines).start();
        return "OK";
    }

    /**
     * Reads {@link #SOURCE_FILE} line by line and sends each line to {@link #TOPIC}.
     * Failures are printed to stderr and end the run; they are not propagated
     * because this method runs on its own thread.
     */
    private void publishFileLines() {
        // try-with-resources closes both readers even when readLine() or send()
        // throws; the previous explicit close() calls were skipped on failure.
        // Note: FileReader decodes the file with the JVM's default charset.
        try (FileReader reader = new FileReader(SOURCE_FILE);
             BufferedReader bufferedReader = new BufferedReader(reader)) {
            String msg;
            while ((msg = bufferedReader.readLine()) != null) {
                kafkaTemplate.send(TOPIC, msg);
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}
```
4. 创建消费者
```java
/**
 * Demo consumer with two listeners on topic {@code gosuncn}. Both listeners
 * use the same consumer group ({@code cigc}) and differ only in their
 * listener id and in the label they print.
 */
@Component
public class KafkaConsumer {

    /**
     * Listener with id {@code 1}; prints every record it receives.
     *
     * @param record the record delivered by the listener container
     */
    @KafkaListener(groupId = "cigc", id = "1", topics = {"gosuncn"})
    public void cigc1(ConsumerRecord<String, String> record) {
        printRecord("cigc1", record);
    }

    /**
     * Listener with id {@code 2}; prints every record it receives.
     *
     * @param record the record delivered by the listener container
     */
    @KafkaListener(groupId = "cigc", id = "2", topics = {"gosuncn"})
    public void cigc2(ConsumerRecord<String, String> record) {
        printRecord("cigc2", record);
    }

    /**
     * Writes one line describing {@code record} to standard output.
     *
     * <p>The line is built first and emitted with a single {@code println}
     * call: the two listeners may be invoked from different threads, and
     * several separate {@code print} calls per record could interleave with
     * the other listener's output in the middle of a line.
     *
     * @param listenerName label identifying which listener received the record
     * @param record       the record to describe
     */
    private static void printRecord(String listenerName, ConsumerRecord<String, String> record) {
        System.out.println(" " + listenerName + " topic --> " + record.topic()
                + " partition --> " + record.partition()
                + " offset --> " + record.offset()
                + " key --> " + record.key()
                + " value --> " + record.value());
    }
}
```