企业🤖AI智能体构建引擎,智能编排和调试,一键部署,支持私有化部署方案 广告
# 多线程 Consumer Instance ## Kafka Java Consumer 设计原理 * Kafka Java Consumer 是单线程设计 * 从 Kafka V0.10.1.0,KafkaConsumer 是双线程:用户主线程 & 心跳线程 * 用户主线程 * 启动 Consumer 应用 main 方法的线程 * 心跳线程 * 只负责定期给对应的 Broker 发送心跳,标示 Consumer 的存活性(liveness) * 新版本设计:单线程 + 轮询机制: * 实现非阻塞式的消息获取 ## 多线程方案 * KafkaConsumer 类不是 thread-safe * 所有的网络 IO 处理都是发生在用户主线程中 * 不能在多个线程中共享同一个 KafkaConsumer 实例 * 可以使用 `KafkaConsumer.wakeup()` 在其他线程中唤醒 Consumer 基于非 thread-safe,两套多线程方案 * 消费者程序启动多个线程,每个线程维护专属的 KafkaConsumer Instance,负责完整的消息获取、消息处理流程 * 消费者程序使用单或多线程获取消息,同时创建多个消费线程执行消息处理逻辑 * 处理消息交由特定的线程池来做 * 将消息获取与处理解耦 ![](https://img.kancloud.cn/40/70/4070c15055bf275c44cb7b470fb1f850_696x326.jpeg) ## Code ### 方案 1 ``` public class KafkaConsumerRunner implements Runnable { private final AtomicBoolean closed = new AtomicBoolean(false); private final KafkaConsumer consumer; public void run() { try { consumer.subscribe(Arrays.asList("topic")); while (!closed.get()) { ConsumerRecords records = consumer.poll(Duration.ofMillis(10000)); // 执行消息处理逻辑 } } catch (WakeupException e) { // Ignore exception if closing if (!closed.get()) throw e; } finally { consumer.close(); } } // Shutdown hook which can be called from a separate thread public void shutdown() { closed.set(true); consumer.wakeup(); } ``` ### 方案 2 ``` private final KafkaConsumer<String, String> consumer; private ExecutorService executors; ... private int workerNum = ...; executors = new ThreadPoolExecutor( workerNum, workerNum, 0L, TimeUnit.MILLISECONDS, new ArrayBlockingQueue<>(1000), new ThreadPoolExecutor.CallerRunsPolicy()); ... while (true) { ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1)); for (final ConsumerRecord record : records) { executors.submit(new Worker(record)); } } .. ```