企业🤖AI智能体构建引擎,智能编排和调试,一键部署,支持私有化部署方案 广告
* Partition * 一个Topic包含多个分区,默认按Key Hash分区 * 每个Partition对应一个文件夹`<topic_name>-<partition_id>` * 每个Partition被视为一个有序的日志文件(LogSegment) * <mark>Replication策略是基于Partition,而不是Topic</mark> * <mark>每个Partition都有一个Leader,0或多个Followers</mark> 一个 Topic 包含多个 Partition,Topic 是逻辑概念,而 Partition 则是物理上的概念。Topic 与 Partition 的关系如下图所示。 :-: ![](https://img.kancloud.cn/b0/84/b084ddded4d319c3fbdf144168798017_1043x375.png) Topic 与 Partition 的关系 <br/> Partition 将 Topic 进行分割,从而更好地将数据均匀分布在 Kafka 集群的每一个 Broker 中。在 server.properties 配置文件中可以指定一个全局的分区数设置,这是对每个主题下的分区数的默认设置,默认为 1 (num.partitions=1)。当然每个主题也可以自己设置分区数量,如果创建主题的时候没有指定分区数量,则会使用 server.properties 中的设置。例如: ```shell [root@hadoop101 /]# bin/kafka-topics.sh --zookeeper localhost:2181 --create --topic my-topic --partitions 2 --replication-factor 1 ``` <br/> Kafka 通过一些算法尽可能将 Partition 分配到不同的 Broker 上,这样就带来两个问题: **1.生产数据时,生产者数据发往哪个分区?** (1)如果给定了分区号,直接将数据发往指定的分区。 (2)如果没有给定分区号,视消息的 key 值,通过 key 值取 hashcode 进行分区。 (3)如果即没有给定分区号,也没有 key 值,则直接轮询分区。 <br/> 最后,还可以指定自定义分区类。 (1)当分区按每条消息来到顺序轮询依次写入各分区时:kafka 会把收到的message 进行 load balance,均匀的分布在这个 topic 下的不同的 partition 上 ( `hash(message) % [broker 数量]` )。 <br/> (2)每个 partition 在内存中对应一个 index,记录每个 segment 中的第一条消息偏移。 segment file 组成:由 2 大部分组成,分别为 index file 和 data file,此 2 个文件一一对应,成对出现,后缀`.index`和`.log`分别表示为 segment 索引文件、数据文件。segment 文件命名规则:partition 全局的第一个 segment 从 0 开始,后续每个segment 文件名为上一个全局 partition 的最大 offset(偏移 message 数)。数值最大为 64 位 long 大小,19 位数字字符长度,没有数字用 0 填充。 <br/> (3)一个 partition 只能被一个消费者消费(但一个消费者可以同时消费多partition),因此,如果设置的 partition 的数量小于 consumer 的数量,就会有消费者消费不到数据。所以,推荐 partition 的数量一定要大于同时运行的consumer 的数量。 <br/> **2.消费数据时,消费者消费哪个分区的数据?如下图所示。** :-: ![](https://img.kancloud.cn/c9/84/c98497082a6d76bd8ed0dd8716d57d88_1033x319.png) 消费者与分区 记住,同一时刻,<mark>一个分区只能被消费组中的一个消费者消费</mark>。如果消费者组中的消费者大于等于分区数量,则有一些消费者是多余的。但如果消费组中的消费者小于分区数量,则一个消费者将负责多个分区的消费,此时由 Consumer的`partition.assignment.strategy` 配置参数决定每个消费者可以消费哪些分区,可选策略包括:range(默认)、roundrobin。 <br/> <mark>(1)range 策略</mark> range 策略对应的实现类是 org.apache.kafka.clients.consumer.RangeAssignor。 具体规则是分区顺序排序,消费者按照字母排序。partition 的个数除以消费者线程的总数来决定每个消费者线程消费几个分区。如果除不尽,那么前面几个消费者线程将会多消费一个分区。 ``` 例如:假设有 3 个消费者 11 个分区。 C1-0 将消费 0, 1, 2, 3 分区。 C1-2 将消费 4, 5, 6, 7 分区 。 C1-3 将消费 8, 9, 10 分区。 ``` <br/> <mark>(2)roundrobin 策略</mark> roundronbin 分配策略的具体实现是org.apache.kafka.clients.consumer.RoundRobinAssignor。 ```java props.put("partition.assignment.strategy", "org.apache.kafka.clients.consumer.RoundRobinAssignor"); ``` 分区按照 hashcode 排序,消费者按照字母排序。 ``` 例如:假设有 3 个消费者 11 个分区。 C1-0 将消费 0, 3, 6, 9 分区 。 C1-2 将消费 1, 4, 7, 10 分区 。 C1-3 将消费 2, 5, 8 分区。 ```