企业🤖AI智能体构建引擎,智能编排和调试,一键部署,支持私有化部署方案 广告
### **3.1.2 一个非常简单的示例** ***** 如下所示,您可以使用纯 Java 来发送和接收消息: ~~~ @Test public void testAutoCommit() throws Exception { logger.info("Start auto"); ContainerProperties containerProps = new ContainerProperties("topic1", "topic2"); final CountDownLatch latch = new CountDownLatch(4); containerProps.setMessageListener(new MessageListener<Integer, String>() { @Override public void onMessage(ConsumerRecord<Integer, String> message) { logger.info("received: " + message); latch.countDown(); } }); KafkaMessageListenerContainer<Integer, String> container = createContainer(containerProps); container.setBeanName("testAuto"); container.start(); Thread.sleep(1000); // wait a bit for the container to start KafkaTemplate<Integer, String> template = createTemplate(); template.setDefaultTopic(topic1); template.sendDefault(0, "foo"); template.sendDefault(2, "bar"); template.sendDefault(0, "baz"); template.sendDefault(2, "qux"); template.flush(); assertTrue(latch.await(60, TimeUnit.SECONDS)); container.stop(); logger.info("Stop auto"); } ~~~ ~~~ private KafkaMessageListenerContainer<Integer, String> createContainer( ContainerProperties containerProps) { Map<String, Object> props = consumerProps(); DefaultKafkaConsumerFactory<Integer, String> cf = new DefaultKafkaConsumerFactory<Integer, String>(props); KafkaMessageListenerContainer<Integer, String> container = new KafkaMessageListenerContainer<>(cf, containerProps); return container; } private KafkaTemplate<Integer, String> createTemplate() { Map<String, Object> senderProps = senderProps(); ProducerFactory<Integer, String> pf = new DefaultKafkaProducerFactory<Integer, String>(senderProps); KafkaTemplate<Integer, String> template = new KafkaTemplate<>(pf); return template; } ~~~ ~~~ private Map<String, Object> consumerProps() { Map<String, Object> props = new HashMap<>(); props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); props.put(ConsumerConfig.GROUP_ID_CONFIG, group); props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true); props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "100"); props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "15000"); props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, IntegerDeserializer.class); props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); return props; } ~~~ 以上代码用于配置消费者客户端参数。 * ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG:指定连接 Kafka 集群所需的 broker 地址清单。 * GROUP_ID_CONFIG:此消费者所隶属的消费组的唯一标识,即消费组的名称。 * ENABLE_AUTO_COMMIT_CONFIG:配置是否开启自动提交消费位移的功能,默认开启。 * AUTO_COMMIT_INTERVAL_MS_CONFIG:当 enable.auto.commit 参数设置为 true 时才生效,表示开启自动提交消费位移功能时自动提交消费位移的时间间隔 * SESSION_TIMEOUT_MS_CONFIG:消费组管理协议中用来检测消费者是否失效的超时时间。 * KEY_DESERIALIZER_CLASS_CONFIG:消息中 key 所对应的反序列化类,需要实现 org.apache.kafka.common.serialization.Deserializer 接口。 * VALUE_DESERIALIZER_CLASS_CONFIG:消息中 value 所对应的反序列化类,需要实现 org.apache.kafka.common.serialization.Deserializer 接口。 ~~~ private Map<String, Object> senderProps() { Map<String, Object> props = new HashMap<>(); props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); props.put(ProducerConfig.RETRIES_CONFIG, 0); props.put(ProducerConfig.BATCH_SIZE_CONFIG, 16384); props.put(ProducerConfig.LINGER_MS_CONFIG, 1); props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 33554432); props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class); props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class); return props; } ~~~ <br >