企业🤖AI智能体构建引擎,智能编排和调试,一键部署,支持私有化部署方案 广告
# Spring Boot Kafka `JsonSerializer`示例 > 原文: [https://howtodoinjava.com/kafka/spring-boot-jsonserializer-example/](https://howtodoinjava.com/kafka/spring-boot-jsonserializer-example/) 学习使用[**`JsonSerializer`**](https://docs.spring.io/spring-kafka/api/org/springframework/kafka/support/serializer/JsonSerializer.html)和[`JsonDeserializer`](https://docs.spring.io/spring-kafka/api/org/springframework/kafka/support/serializer/JsonDeserializer.html)类从 [Apache Kafka](https://howtodoinjava.com/kafka/tutorial-introduction/) 主题存储和检索 JSON,并返回 Java 模型对象。 ## 1\. 先决条件 * 请按照本指南在您的机器上设置 [Kafka](https://howtodoinjava.com/kafka/getting-started-windows-10/) 。 * 我们正在修改 [Spring boot 和 Kafka HelloWorld 应用](https://howtodoinjava.com/kafka/spring-boot-with-kafka/)。 * 还要确保您的计算机上至少安装了 [Java8](https://howtodoinjava.com/java/basics/jdk-jre-jvm/#downloads) 和 [Maven](https://howtodoinjava.com/maven/how-to-install-maven-on-windows/) 。 ## 2\. 应用配置 在`application.properties`文件中,我们添加了以下配置。 ```java server.port=9000 spring.kafka.consumer.bootstrap-servers: localhost:9092 spring.kafka.consumer.group-id: group-id spring.kafka.consumer.auto-offset-reset: earliest spring.kafka.consumer.key-deserializer: org.apache.kafka.common.serialization.StringDeserializer spring.kafka.consumer.value-deserializer: org.springframework.kafka.support.serializer.JsonDeserializer spring.kafka.consumer.properties.spring.json.trusted.packages=* spring.kafka.producer.bootstrap-servers: localhost:9092 spring.kafka.producer.key-serializer: org.apache.kafka.common.serialization.StringSerializer spring.kafka.producer.value-serializer: org.springframework.kafka.support.serializer.JsonSerializer ``` * `spring.kafka.consumer.key-deserializer`指定键的反序列化器类。 * `spring.kafka.consumer.value-deserializer`指定值的反序列化器类。 * `spring.kafka.consumer.properties.spring.json.trusted.packages`指定允许反序列化的程序包模式的逗号分隔列表。 `'*'`表示反序列化所有程序包。 * `spring.kafka.producer.key-deserializer`指定键的序列化器类。 * `spring.kafka.producer.value-deserializer`指定值的序列化器类。 ## 3\. 模型类 我们创建了`User`类,该类将发送给 Kafka。 其实例将由`JsonSerializer`序列化为字节数组。 这个字节数组最终由 Kafka 存储到给定的分区中。 反序列化期间,`JsonDeserializer`用于以字节数组的形式从 Kafka 接收 JSON,并将`User`对象返回给应用。 ```java public class User { private long userId; private String firstName; private String lastName; public long getUserId() { return userId; } public void setUserId(long userId) { this.userId = userId; } public String getFirstName() { return firstName; } public void setFirstName(String firstName) { this.firstName = firstName; } public String getLastName() { return lastName; } public void setLastName(String lastName) { this.lastName = lastName; } @Override public String toString() { return "User [userId=" + userId + ", firstName=" + firstName + ", lastName=" + lastName + "]"; } } ``` ## 4\. Kafka 生产者 生产者 API 只是在 HTTP POST API 中使用用户信息。 然后,它创建一个新的`User`对象,并使用[`KafkaTemplate`](https://docs.spring.io/spring-kafka/api/org/springframework/kafka/core/KafkaTemplate.html)发送给 Kafka。 ```java @PostMapping(value = "/createUser") public void sendMessageToKafkaTopic( @RequestParam("userId") long userId, @RequestParam("firstName") String firstName, @RequestParam("lastName") String lastName) { User user = new User(); user.setUserId(userId); user.setFirstName(firstName); user.setLastName(lastName); this.producerService.saveCreateUserLog(user); } ``` ```java @Autowired private KafkaTemplate<String, Object> kafkaTemplate; public void saveCreateUserLog(User user) { logger.info(String.format("User created -> %s", user)); this.kafkaTemplate.send(AppConstants.TOPIC_NAME_USER_LOG, user); } ``` ## 5\. Kafka 消费者 用户被实现为`@KafkaListener`,每次在主题中添加新条目时都会得到通知。 ```java @KafkaListener(topics = AppConstants.TOPIC_NAME_USER_LOG, groupId = AppConstants.GROUP_ID) public void consume(User user) { logger.info(String.format("User created -> %s", user)); } ``` ## 6\. 测试 使用任何 REST API 测试器,并向 API `http://localhost:9000/kafka/createUser`发送一些消息,如下所示。 留言栏:`http://localhost:9000/kafka/createUser?userId=1&firstName=Lokesh&lastName=Gupta` 观察控制台日志: ```java 2020-05-24 23:36:47.132 INFO 2092 --- [nio-9000-exec-4] 2020-05-26 01:03:52.722 INFO 11924 --- [nio-9000-exec-6] c.h.k.demo.service.KafKaProducerService : User created -> User [userId=1, firstName=Lokesh, lastName=Gupta] 2020-05-26 01:03:52.729 INFO 11924 --- [ntainer#1-0-C-1] c.h.k.demo.service.KafKaConsumerService : User created -> User [userId=1, firstName=Lokesh, lastName=Gupta] ``` ## 7\. 总结 在此 SpringBoot kafka `JsonSerializer`示例中,我们学习了使用`JsonSerializer`对 Java 对象进行序列化和反序列化并存储在 Kafka 中。 学习愉快! [源码下载](https://github.com/lokeshgupta1981/Kafka-Tutorials)