企业🤖AI智能体构建引擎,智能编排和调试,一键部署,支持私有化部署方案 广告
# Producer 管理 TCP ## 为何采用 TCP? * 在开发客户端时,能够利用 TCP 本身的高级功能,e.g. 多路复用请求、同时轮询多个连接 * 多路复用请求(multiplexing request) * 指将两个或多个数据流合并到底层单一物理连接中的过程 * TCP 多路复用会在一条物理连接上创建若干个虚拟连接,每个虚拟连接负责自己的数据流 * 严谨说,TCP 并不能多路复用,它只是提供可靠的消息交付语义保证,e.g. 自动重传丢失报文 ## Kafka Producer 概览 * 开发一个 Producer 有四步 * 构造 Producer 对象所需的参数对象 * 创建 Kafka 对象实例 * 使用 KafkaProducer 的 send 方法发送消息 * 调用 KafkaProducer 的 close 方法关闭 Producer 并释放资源 ``` Java Properties props = new Properties (); props.put(“参数1”, “参数1的值”); props.put(“参数2”, “参数2的值”); …… try (Producer<String, String> producer = new KafkaProducer<>(props)) { producer.send(new ProducerRecord<String, String>(……), callback); …… } ``` ## Kafka Producer 何时创建 TCP? * 当 Producer 创建 KafkaProducer 实例时,会建立与 Broker 的 TCP 连接 * 更准确的:在创建 KafkaProducer 实例时,生产者应用会在后台创建并启动一个名为 Sender 的线程,该 Sender 线程开始运行时首先会创建与 Broker 的连接。 * Producer 会连接 bootstrap.servers 参数指定的所有 Broker * bootstrap.servers:指定了这个 Producer 启动时要连接的 Broker 地址 * 生产中通常指定 3-4 台即可 * 因为 Producer 连接到一台 Broker 就能拿到集群的 Broker 信息,不需要指定所有 Broker * TCP 连接还可能在两个地方创建: * 更新元数据后 * 当集群有新的 Broker 时,需要额外新创建 * 消息发送时 ## 何时关闭 TCP? * Producer 端关闭 TCP 的两种方式 * 用户主动关闭 * Kafka 自动关闭 * 参考 Producer 端参数:connections.max.idle.ms * 该参数默认值为 9 min。如果 9 分钟内没有任何请求流过某个 TCP 连接,则会被关闭 * Kafka 自动关闭实在 Broker 端,但发起方是客户端 * TCP 看来这种关闭属于被动关闭,即 passive close * 被动关闭的后果是会产生大量的 CLOSE_WAIT 连接 * 如果 Producer 端 connections.max.idle.ms = -1,则 KafkaProducer 建立的 TCP 连接会成为僵尸连接