多应用+插件架构,代码干净,二开方便,首家独创一键云编译技术,文档视频完善,免费商用码云13.8K 广告
# Consumer 管理 TCP ## 何时创建 TCP? * Consumer 的主要程序入口:`KafkaConsumer` 类 * 和 Producer 不同,构建 KafkaConsumer 实例时不会创建 TCP 连接 * TCP 连接是在调用 KafkaConsumer.poll 方法时创建的 * 更细的,poll 方法内部有 3 个时机创建 TCP * 发起 FindCoordinator 请求时 * Consumer 启动调用 poll 时,向 Kafka 集群发送 FindCoordinator 请求,希望获得哪个 Broker 是它的协调者 * Consumer 可以发送 FindCoordinator 到集群中任意服务器 * 实际请求会向负载最小的那台 Broker 发送请求 * 负载评估:看消费者连接的所有 Broker 中谁的待发送请求最少。(i.e. 站在 Cosumer 端而非全局) * 连接协调者时 * Broker 处理完 FindCoordinator 后,返回 Response,告诉哪个 Broker 是协调者 * Consumer 因此创建到该 Broker 的 Socket 连接 * 消费数据时 * Consumer 会为每个要消费的分区创建与该分区 Leader Replica 所在的 Broker 连接的 TCP ## 创建多少个 TCP? * 很大的一个 ID 由来:由 Integer.MAX_VALUE 减去协调者所在 Broker 的真实 ID 计算得来。 * ID = -1:Consumer 首次启动对 Kafka 集群一无所知时的 ID * Consumer 创建 3 类 TCP 连接 * 确定协调者和获取集群元数据 * 连接协调者,令其执行组成员管理操作 * 执行实际的消息获取 * 注:第三类 TCP 创建后,会弃用第一类连接 # 何时关闭 TCP? * 用户主动关闭 * 调用 KafkaConsumer.close() * 执行 Kill -2 / Kill -9 * Kafka 自动关闭 * 由 Consumer 端参数 connection.max.idle.ms 控制 * 默认 9分钟 ## 可能的问题 * 对于使用假的 -1 ID,无法重用这个连接