ThinkChat🤖让你学习和工作更高效,注册即送10W Token,即刻开启你的AI之旅 广告
[TOC] # 拦截器原理 producer拦截器(interceptor)是在kafka 0.10版本被引入的,主要用于实现clients端的定制化控制逻辑. 对于producer而言,interceptor使得用户在消息发送前以及producer回调逻辑前有机会对消息做一些定制化需求,比如**修改消息**等. 同时,producer允许用户指定多个interceptor按序作用于同一条消息从而形成一个拦截链(interceptor chain). Interceptor的实现接口是`org.apache.kafka.clients.producerInterceptor`,其定义的方法包括: 1. configure(configs) 获取配置信息和初始化数据时调用 2. onSend(ProducerRecord) 该方法封装进`KafkaProducer.send`方法中,即它运行在用户主线程中。Producer确保在消息被序列化以及计算分区前调用该方法。**用户可以在该方法中对消息做任何操作,但最好保证不要修改消息所属的topic和分区**,否则会影响目标分区的计算 3. onAcknowledgement(RecordMetadata, Exception) **该方法会在消息被应答或消息发送失败时调用**,并且通常都是在producer回调逻辑触发之前。onAcknowledgement运行在producer的IO线程中,因此不要在该方法中放入很重的逻辑,否则会拖慢producer的消息发送效率 4. close **关闭interceptor,主要用于执行一些资源清理工作** 如前所述,interceptor可能被运行在多个线程中,因此在具体实现时用户需要自行确保线程安全。另外**倘若指定了多个interceptor,则producer将按照指定顺序调用它们**,并仅仅是捕获每个interceptor可能抛出的异常记录到错误日志中而非在向上传递。这在使用过程中要特别留意。 # 例子 需求: 实现一个简单的双interceptor组成的拦截链.第一个interceptor会在消息发送前将时间戳信息加到value的最前部位.第二个interceptor会在消息发送后更新成功发送消息数或失败发送消息数. ![](https://box.kancloud.cn/e9eebc0cb26c1a680e953d65cf416820_1193x562.png) **TimeInterceptor** ~~~ import org.apache.kafka.clients.producer.ProducerInterceptor; import org.apache.kafka.clients.producer.ProducerRecord; import org.apache.kafka.clients.producer.RecordMetadata; import java.util.Map; public class TimeInterceptor implements ProducerInterceptor<String, String> { //消息会传到这,还没有进入kafka集群 @Override public ProducerRecord<String, String> onSend(ProducerRecord<String, String> producerRecord) { //一般不改变原来的主题和分区,我们给他的value加上时间 return new ProducerRecord<String, String>(producerRecord.topic(), producerRecord.partition(), producerRecord.key(), System.currentTimeMillis() + producerRecord.value()); } @Override public void onAcknowledgement(RecordMetadata recordMetadata, Exception e) { } @Override public void close() { } @Override public void configure(Map<String, ?> map) { } } ~~~ **CounterInterceptor** ~~~ import org.apache.kafka.clients.producer.ProducerInterceptor; import org.apache.kafka.clients.producer.ProducerRecord; import org.apache.kafka.clients.producer.RecordMetadata; import java.util.Map; public class CounterInterceptor implements ProducerInterceptor<String, String> { //成功个数统计 private long successCount = 0; //失败个数统计 private long errorCount = 0; //消息不做改变 @Override public ProducerRecord<String, String> onSend(ProducerRecord<String, String> producerRecord) { return producerRecord; } @Override public void onAcknowledgement(RecordMetadata recordMetadata, Exception e) { if (e == null) { successCount++; } else { errorCount++; } } @Override public void close() { System.out.println("成功的个数" + successCount); System.out.println("失败的个数" + errorCount); } @Override public void configure(Map<String, ?> map) { } } ~~~ 添加到kafka的生产者中 ![](https://box.kancloud.cn/5477050eaae124b0643e66b8d164235d_1618x764.png)