🔥码云GVP开源项目 12k star Uniapp+ElementUI 功能强大 支持多语言、二开方便! 广告
### **4.1.3 接收消息** ***** 可以通过配置 MessageListenerContainer 并提供消息侦听器,或使用 @KafkaListener 注解来接收消息。 <br > #### **消息监听器(Message Listeners)** ***** 使用消息侦听器容器时,必须提供一个侦听器以接收数据。 当前有八种支持的消息侦听器接口: ~~~ public interface MessageListener<K, V> { void onMessage(ConsumerRecord<K, V> data); } public interface AcknowledgingMessageListener<K, V> { void onMessage(ConsumerRecord<K, V> data, Acknowledgment acknowledgment); } public interface ConsumerAwareMessageListener<K, V> extends MessageListener<K, V> { void onMessage(ConsumerRecord<K, V> data, Consumer<?, ?> consumer); } public interface AcknowledgingConsumerAwareMessageListener<K, V> extends MessageListener<K, V> { void onMessage(ConsumerRecord<K, V> data, Acknowledgment acknowledgment, Consumer<?, ?> consumer); } public interface BatchMessageListener<K, V> { void onMessage(List<ConsumerRecord<K, V>> data); } public interface BatchAcknowledgingMessageListener<K, V> { void onMessage(List<ConsumerRecord<K, V>> data, Acknowledgment acknowledgment); } public interface BatchConsumerAwareMessageListener<K, V> extends BatchMessageListener<K, V> { void onMessage(List<ConsumerRecord<K, V>> data, Consumer<?, ?> consumer); } public interface BatchAcknowledgingConsumerAwareMessageListener<K, V> extends BatchMessageListener<K, V> { void onMessage(List<ConsumerRecord<K, V>> data, Acknowledgment acknowledgment, Consumer<?, ?> consumer); } ~~~ 1. 使用自动提交或容器管理的提交方式时,可使用 MessageListener 来处理 poll() 操作接收的单个 ConsumerRecord。 2. 使用手动提交方式时,可使用 AcknowledgingMessageListener 来处理 poll() 操作接收到的各个 ConsumerRecord。 3. 使用自动提交或容器管理的提交方式时,可使用 ConsumerAwareMessageListener 来处理 poll() 操作接收的单个 ConsumerRecord。提供了对 Consumer 对象的访问。 4. 使用手动提交方式时,可使用 AcknowledgingConsumerAwareMessageListener 来处理 poll() 操作接收到的各个 ConsumerRecord。提供了对Consumer对象的访问。 5. 使用自动提交或容器管理的提交方式时,可使用 BatchMessageListener 来处理 poll() 操作接收的所有 ConsumerRecord。使用此接口时,不支持 AckMode.RECORD,因为已为侦听器提供了完整的批处理。 6. 使用手动提交方式时,可使用 BatchAcknowledgingMessageListener 来处理 poll() 操作接收到的所有ConsumerRecord。 7. 使用自动提交或容器管理的提交方式时,可使用 BatchConsumerAwareMessageListener 来处理 poll() 操作接收的所有 ConsumerRecord。使用此接口时,不支持AckMode.RECORD,因为已为侦听器提供了完整的批处理。提供了对Consumer对象的访问。 8. 使用手动提交方式时,可使用 BatchAcknowledgingConsumerAwareMessageListener 来处理 poll() 操作接收到的所有 ConsumerRecord。提供了对 Consumer 对象的访问。 > 注意:Consumer 对象是非线程安全的,你只能在调用监听器的线程上使用它。 <br > #### **消息侦听器容器(Message Listener Containers)** ***** 提供了两个 MessageListenerContainer 实现: * KafkaMessageListenerContainer * ConcurrentMessageListenerContainer KafkaMessageListenerContainer 在单个线程上接收来自所有主题/分区的所有消息。 ConcurrentMessageListenerContainer 委托给1个或多个 KafkaMessageListenerContainer 来提供多线程使用。 从版本 2.2.7 开始,您可以将 RecordInterceptor 添加到侦听器容器中。 它将在调用侦听器之前被调用,以允许检查或修改记录。 如果拦截器返回 null,则不调用侦听器。 当侦听器为批处理侦听器时,拦截器不会被调用。 从 2.3 版开始,CompositeRecordInterceptor 可用于调用多个拦截器。 默认情况下,使用事务时,在事务启动后将调用拦截器。 从版本 2.3.4 开始,您可以将侦听器容器的 interceptBeforeTx 属性设置为在事务开始之前调用拦截器。 没有为批处理侦听器提供拦截器,因为 Kafka 已经提供了 ConsumerInterceptor。 <br > ##### **使用 KafkaMessageListenerContainer** 可用构造函数如下: ~~~ public KafkaMessageListenerContainer(ConsumerFactory<K, V> consumerFactory, ContainerProperties containerProperties) public KafkaMessageListenerContainer(ConsumerFactory<K, V> consumerFactory, ContainerProperties containerProperties, TopicPartitionInitialOffset... topicPartitions) ~~~ ConcurrentMessageListenerContainer(稍后介绍)使用第二个构造函数在消费者实例之间分配 TopicPartitionOffset。ContainerProperties 包含主题和分区相关信息,构造函数如下: ~~~ public ContainerProperties(TopicPartitionInitialOffset... topicPartitions) public ContainerProperties(String... topics) public ContainerProperties(Pattern topicPattern) ~~~ * 第一个构造函数包含一个 TopicPartitionInitialOffset 数组参数,以明确指示容器使用哪个分区(使用 consumer 的 assign() 方法),并带有可选的初始偏移量:默认为正值; 默认情况下,负值相对于分区中的当前最后一个偏移量。 * 第二个构造函数包含一个字符串数据参数,Kafka 根据 group.id 属性分配分区(在整个组中分配分区)。 * 第三个构造函数使用正则表达式模式选择主题。 要将 MessageListener 分配给容器,请在创建 Container 时使用 ContainerProps.setMessageListener 方法: ~~~ ContainerProperties containerProps = new ContainerProperties("topic1", "topic2"); containerProps.setMessageListener(new MessageListener<Integer, String>() { ... }); DefaultKafkaConsumerFactory<Integer, String> cf = new DefaultKafkaConsumerFactory<>(consumerProps()); KafkaMessageListenerContainer<Integer, String> container = new KafkaMessageListenerContainer<>(cf, containerProps); return container; ~~~ 注意,在创建 DefaultKafkaConsumerFactory 时,使用仅接受上述属性的构造函数意味着从配置中获取键和值的 Deserializer 类。 或者,可以将 Deserializer 实例传递给 DefaultKafkaConsumerFactory 构造函数以获取键或值,在这种情况下,所有消费者均共享相同的实例。 另一个选择是提供 Supplier (从版本2.3开始),该类将用于为每个消费者获取单独的 Deserializer 实例: ~~~ DefaultKafkaConsumerFactory<Integer, CustomValue> cf = new DefaultKafkaConsumerFactory<>(consumerProps(), null, () -> new CustomValueDeserializer()); KafkaMessageListenerContainer<Integer, String> container = new KafkaMessageListenerContainer<>(cf, containerProps); return container; ~~~ 有关可设置的各种属性的更多信息,请参考 [ContainerProperties 的 Javadoc](https://docs.spring.io/spring-kafka/api/org/springframework/kafka/listener/ContainerProperties.html) 。 从 2.1.1 版开始,提供了一个名为 logContainerConfig 的新属性。 设为 true 并启用 INFO 日志记录后,每个侦听器容器(Listener Container)都会写入一条日志消息,以概述其配置属性。 默认情况下,主题偏移量提交的日志记录是使用 DEBUG 日志记录级别进行的。 从版本 2.1.2 开始,ContainerProperties 中有一个名为 commitLogLevel 的新属性,该属性可让您指定这些消息的日志级别。 例如,要将日志级别更改为 INFO,请使用 containerProperties.setCommitLogLevel(LogIfLevelEnabled.Level.INFO)。 从 2.2 版开始,添加了一个名为 missingTopicsFatal 的新容器属性(默认值:true)。 如果代理中没有任何已配置的主题,这将阻止容器启动。 如果侦听器配置为通过正则匹配,则该方法不适用。 以前,容器线程在 consumer.poll() 方法内循环,等待主题出现,同时记录许多消息。 除了日志,没有迹象表明存在问题。 若要还原以前的行为,可以将属性设置为false。 <br > ##### **使用ConcurrentMessageListenerContainer** 构造函数如下: ~~~ public ConcurrentMessageListenerContainer(ConsumerFactory<K, V> consumerFactory, ContainerProperties containerProperties) ~~~ 它具有 concurrency 属性,例如 container.setConcurrency(3) 将创建3个 KafkaMessageListenerContainer。 Kafka 将使用其消费组管理功能在消费者之间分配分区。 > 注意:在监听多个主题时,默认的分区分布可能不是您期望的。 例如,如果您有3个主题,每个主题有5个分区,而您想使用 concurrency = 15,则只会看到5个活动使用者,每个消费者都为每个主题分配了一个分区,而其他10个消费者处于空闲状态。 这是因为默认的 Kafka PartitionAssignor 是 RangeAssignor(请参阅其 javadocs)。 对于这种情况,您可能需要考虑使用 RoundRobinAssignor,它将在所有使用者之间分配分区。 然后,将为每个消费者分配一个主题/分区。 要更改 PartitionAssignor,请在提供给 DefaultKafkaConsumerFactory 的属性中设置partition.assignment.strategy 使用者属性(ConsumerConfigs.PARTITION\_ASSIGNMENT\_STRATEGY\_CONFIG)。 使用 Spring Boot 时: ``` spring.kafka.consumer.properties.partition.assignment.strategy = org.apache.kafka.clients.consumer.RoundRobinAssignor ``` 对于第二个构造函数,ConcurrentMessageListenerContainer 在委托 KafkaMessageListenerContainer 中分配TopicPartition。 例如,如果提供了6个 TopicPartition,并且并发为3,则为0。 每个容器将获得2个分区。 对于5个 TopicPartition,两个容器将获得2个分区,第三个容器将得到1。如果并发大于 TopicPartitions 的数量,则并发性将向下调整,以便每个容器将获得一个分区。 注意:client.id属性(如果已设置)将附加-n,其中n是根据并发性使用的消费者实例。 启用JMX时,必须为MBean提供唯一的名称。 从1.3版开始,MessageListenerContainer提供了对基础KafkaConsumer指标的访问。 对于ConcurrentMessageListenerContainer而言,metrics()方法将返回所有目标KafkaMessageListenerContainer实例的度量。 度量标准分为Map 。 <br > #### **@KafkaListener 注解** ***** @KafkaListener 注解为简单的 POJO 侦听器提供了一种机制: ~~~ public class Listener { @KafkaListener(id = "foo", topics = "myTopic", clientIdPrefix = "myClientId") public void listen(String data) { ... } } ~~~ 此机制需要在 @Configuration 类之一上使用 @EnableKafka 批注,以及用于配置基础 ConcurrentMessageListenerContainer 的侦听器容器工厂:默认情况下,应使用名称为 kafkaListenerContainerFactory 的 bean。 ~~~ @Configuration @EnableKafka public class KafkaConfig { @Bean KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<Integer, String>> kafkaListenerContainerFactory() { ConcurrentKafkaListenerContainerFactory<Integer, String> factory = new ConcurrentKafkaListenerContainerFactory<>(); factory.setConsumerFactory(consumerFactory()); factory.setConcurrency(3); factory.getContainerProperties().setPollTimeout(3000); return factory; } @Bean public ConsumerFactory<Integer, String> consumerFactory() { return new DefaultKafkaConsumerFactory<>(consumerConfigs()); } @Bean public Map<String, Object> consumerConfigs() { Map<String, Object> props = new HashMap<>(); props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, embeddedKafka.getBrokersAsString()); ... return props; } } ~~~ 请注意,要设置 container 属性,必须在工厂上使用 getContainerProperties() 方法。 从 2.1.1 版本开始,现在可以为创建注解的消费者设置 client.id 属性。 clientIdPrefix 带有 -n 后缀,其中 n 是表示使用并发时的容器号的整数。 您还可以为 POJO 侦听器配置明确的主题和分区(以及可选的初始偏移量): ~~~ @KafkaListener(id = "bar", topicPartitions = { @TopicPartition(topic = "topic1", partitions = { "0", "1" }), @TopicPartition(topic = "topic2", partitions = "0", partitionOffsets = @PartitionOffset(partition = "1", initialOffset = "100")) }) public void listen(ConsumerRecord<?, ?> record) { ... } ~~~ 可以在 partitions 或 partitionOffsets 属性中指定分区,但不能在两个属性中同时指定分区。 当使用手动 AckMode 时,还可以向侦听器提供该 Acknowledgment。 此示例还显示了如何使用其他容器工厂。 ~~~ @KafkaListener(id = "baz", topics = "myTopic", containerFactory = "kafkaManualAckListenerContainerFactory") public void listen(String data, Acknowledgment ack) { ... ack.acknowledge(); } ~~~ 最后,有关消息的元数据可从消息头获得,以下头名称可用于检索消息的头: * KafkaHeaders.RECEIVED\_MESSAGE\_KEY * KafkaHeaders.RECEIVED\_TOPIC * KafkaHeaders.RECEIVED\_PARTITION\_ID * KafkaHeaders.RECEIVED\_TIMESTAMP * KafkaHeaders.TIMESTAMP\_TYPE ~~~ @KafkaListener(id = "qux", topicPattern = "myTopic1") public void listen(@Payload String foo, @Header(KafkaHeaders.RECEIVED_MESSAGE_KEY) Integer key, @Header(KafkaHeaders.RECEIVED_PARTITION_ID) int partition, @Header(KafkaHeaders.RECEIVED_TOPIC) String topic, @Header(KafkaHeaders.RECEIVED_TIMESTAMP) long ts ) { ... } ~~~ 从版本1.1开始,可以将 @KafkaListener 方法配置为处理整批消费者记录。 要将侦听器容器工厂配置为创建批处理侦听器,请设置 batchListener 属性: ~~~ @Bean public KafkaListenerContainerFactory<?> batchFactory() { ConcurrentKafkaListenerContainerFactory<Integer, String> factory = new ConcurrentKafkaListenerContainerFactory<>(); factory.setConsumerFactory(consumerFactory()); factory.setBatchListener(true); return factory; } ~~~ 简单的批处理方式: ~~~ @KafkaListener(id = "list", topics = "myTopic", containerFactory = "batchFactory") public void listen(List<String> list) { ... } ~~~ 通过配置主题,分区,偏移量等控制批处理的方式: ~~~ @KafkaListener(id = "list", topics = "myTopic", containerFactory = "batchFactory") public void listen(List<String> list, @Header(KafkaHeaders.RECEIVED_MESSAGE_KEY) List<Integer> keys, @Header(KafkaHeaders.RECEIVED_PARTITION_ID) List<Integer> partitions, @Header(KafkaHeaders.RECEIVED_TOPIC) List<String> topics, @Header(KafkaHeaders.OFFSET) List<Long> offsets) { ... } ~~~ 通过 Message 对象控制批处理的方式: ~~~ @KafkaListener(id = "listMsg", topics = "myTopic", containerFactory = "batchFactory") public void listen14(List<Message<?>> list) { ... } @KafkaListener(id = "listMsgAck", topics = "myTopic", containerFactory = "batchFactory") public void listen15(List<Message<?>> list, Acknowledgment ack) { ... } ~~~ 在这种情况下,不会对有效负载执行任何转换。 如果为 BatchMessagingMessageConverter 配置了 RecordMessageConverter,则还可以将通用类型添加到 Message 参数中,然后将转换有效负载。 您还可以收到 ConsumerRecord 对象的列表,但是它必须是在方法上定义的唯一参数(使用手动提交时,除了可选的Acknowledgment)。 ~~~ @KafkaListener(id = "listCRs", topics = "myTopic", containerFactory = "batchFactory") public void listen(List<ConsumerRecord<Integer, String>> list) { ... } @KafkaListener(id = "listCRsAck", topics = "myTopic", containerFactory = "batchFactory") public void listen(List<ConsumerRecord<Integer, String>> list, Acknowledgment ack) { ... } ~~~ 从 2.0 版开始,id 属性(如果存在)将用作 Kafka group.id 属性,并覆盖 Consumer Factory 中的已配置属性(如果存在)。 您还可以显式设置 groupId 或将 idIsGroup 设置为 false,以恢复使用使用者工厂 group.id 的先前行为。 您可以在注解属性中使用属性占位符或 SpEL 表达式,例如... ~~~ @KafkaListener(topics = "${some.property}") @KafkaListener(topics = "#{someBean.someProperty}", groupId = "#{someBean.someProperty}.group") ~~~ 从版本2.1.2开始,SpEL 表达式支持特殊的令牌 \_\_listener,这是一个伪 bean 名称,表示此注解所在的当前 bean 实例。 例如,给定: ~~~ @Bean public Listener listener1() { return new Listener("topic1"); } @Bean public Listener listener2() { return new Listener("topic2"); } ~~~ 我们就可以使用: ~~~ public class Listener { private final String topic; public Listener(String topic) { this.topic = topic; } @KafkaListener(topics = "#{__listener.topic}", groupId = "#{__listener.topic}.group") public void listen(...) { ... } public String getTopic() { return this.topic; } } ~~~ 如果在不太可能的情况下有一个名为 \_\_listener 的实际 bean,则可以使用 beanRef 属性更改表达式令牌... ~~~ @KafkaListener(beanRef = "__x", topics = "#{__x.topic}", groupId = "#{__x.topic}.group") ~~~ <br > #### **Rebalance Listeners** ***** ContainerProperty 具有一个 ConsumerRebalanceListener 属性,该属性采用 Kafka 客户端的ConsumerRebalanceListener 接口的实现。 如果未提供此属性,则容器将配置一个简单的日志侦听器,该日志侦听器在 INFO 级别下记录重新平衡事件。 该框架还添加了一个子接口 ConsumerAwareRebalanceListener: ~~~ public interface ConsumerAwareRebalanceListener extends ConsumerRebalanceListener { void onPartitionsRevokedBeforeCommit(Consumer<?, ?> consumer, Collection<TopicPartition> partitions); void onPartitionsRevokedAfterCommit(Consumer<?, ?> consumer, Collection<TopicPartition> partitions); void onPartitionsAssigned(Consumer<?, ?> consumer, Collection<TopicPartition> partitions); } ~~~ 请注意,撤销分区时有两个回调:第一个立即调用;第二个在提交任何未决的偏移量后调用。 如果您希望在某些外部存储库中保持偏移量,则这很有用。 例如: ~~~ containerProperties.setConsumerRebalanceListener(new ConsumerAwareRebalanceListener() { @Override public void onPartitionsRevokedBeforeCommit(Consumer<?, ?> consumer, Collection<TopicPartition> partitions) { // acknowledge any pending Acknowledgments (if using manual acks) } @Override public void onPartitionsRevokedAfterCommit(Consumer<?, ?> consumer, Collection<TopicPartition> partitions) { // ... store(consumer.position(partition)); // ... } @Override public void onPartitionsAssigned(Collection<TopicPartition> partitions) { // ... consumer.seek(partition, offsetTracker.getOffset() + 1); // ... } }); ~~~ <br > #### **过滤消息** ***** 在某些情况下,例如重平衡,可能会重新传递已处理的消息。框架无法知道是否已处理此类消息,即应用程序级功能。这被称为幂等接收器模式,Spring Integration 提供了其实现。 Spring Kafka 还通过 FilteringMessageListenerAdapter 类提供了一些帮助,该类可以包装您的 MessageListener。此类采用 RecordFilterStrategy 的实现,在该实现中,您将实现 filter 方法以发出消息重复消息并应将其丢弃的信号。它具有一个附加属性 ackDiscarded,该属性指示适配器是否应确认丢弃的记录;否则,它不可用。默认情况下为 false。 使用 @KafkaListener 时,请在容器工厂上设置 RecordFilterStrategy(以及可选的 ackDiscarded),并且侦听器将包装在适当的过滤适配器中。 此外,为使用批处理消息侦听器提供了 FilteringBatchMessageListenerAdapter。 <br > #### **有状态重试** ***** 重要的是要了解,上面讨论的重试会挂起使用者线程(如果使用 BackOffPolicy);重试期间没有调用Consumer.poll()。Kafka 有两个属性来判断消费者是否存活。 session.timeout.ms 用于确定使用者是否处于活动状态。由于 0.10.1.0 版本后的心跳测试是在后台线程上发送的,因此缓慢的消费者不再会对此产生影响。 max.poll.interval.ms(默认为5分钟)用于确定使用者是否似乎被挂起(花费太长时间来处理上次轮询中的记录)。如果 poll() 之间的时间超过此配置,则代理将撤销分配的分区并执行重平衡。对于冗长的重试序列,这很容易发生。 从版本 2.1.3 开始,可以通过将有状态重试与 SeekToCurrentErrorHandler 结合使用来避免此问题。在这种情况下,每次传递尝试都将异常抛出回容器,并且错误处理程序将重新寻找未处理的偏移量,并且下一个poll() 将传递相同的消息。这样可以避免超出 max.poll.interval.ms 属性的问题(只要两次尝试之间的单个延迟不超过该时间)。因此,在使用 ExponentialBackOffPolicy 时,请务必确保 maxInterval 小于 max.poll.interval.ms 属性。要启用有状态重试,请使用 RetryingMessageListenerAdapter 构造函数,该构造函数接受有状态布尔参数(将其设置为true)。使用侦听器容器工厂进行配置(对于@KafkaListener)时,请将工厂的 statefulRetry 属性设置为 true。 <br >