4 主题和分区
4 主题和分区
概述
主题和分区是 Kafka 的两个核心概念。主题作为消息的归类,可以再细分为一个或多个分区,分区也可以看作对消息的二次归类。分区的划分不仅为 Kafka 提高了可伸缩性、水平扩展的功能,还通过多副本机制来为 Kafka 提高数据冗余来提高数据可靠性。
从 Kafka 的底层实现来说,主题和分区都是逻辑上的概念,分区可以有一至多个副本,每个副本对应一个日志文件,每个日志文件对应一至多个日志分段(LogSegment),每个日志分段还可以细分为索引文件、日志存储文件和快照文件等。不过对于使用 Kafka 进行消息收发的普通用户而言,了解到分区这一层面足以应对大部分的使用场景。
主题的管理
主题的管理包括创建主题、查看主题信息、修改主题和删除主题等操作。可以通过 Kafka 提供的 kafka-topics.sh
脚本来执行这些操作,这个脚本位于 $KAFKA_HOME/bin/
目录下,其核心代码仅有一行,具体如下:
$ exec $(dirname $0)/kafka-run-class.sh kafka.admin.TopicCommand "$@"
可以看到其实质上是调用了 kafka.admin.TopicCommand
类来执行主题管理的操作。
主题的管理并非只有使用 kafka-topics.sh
脚本这一种方式,我们还可以通过 KafkaAdminClient 的方式实现(这种方式实质上是通过发送 CreateTopicsRequest、DeleteTopicsRequest)等请求来实现的,对于 XXXRequest 系列的细节会在后续小节后详细介绍。甚至我们还可以通过直接操纵日志文件和 Zookeeper 节点来实现。下面安装创建主题、查看主题信息、修改主题、删除主题的顺序来介绍其中的细节。
创建主题
如果 broker 端配置参数 auto.create.topics.enable
设置为 true(默认值就是 true),那么当生产者向一个尚未创建的主题发送消息时,会自动创建一个分区数为 num.partitions
(默认值为 1)、副本银子为 default.replication.factor
(默认值为 1)的主题。除此之外,当一个消费者开始从未知主题中读取消息时,或者当任意一个客户端向未知主题发送元数据请求时,都会和上面一样创建一个相应的主题。很多时候,这种自动创建主题的行为都是非预期的。除非有特殊应用需求,否则不建议将 auto.create.topics.enable
参数设置为 true,这个参数会增加主题的管理与维护的难度。
更加推荐也更加通用的方式是通过 kafka-topics.sh
脚本来创建主题。示例如下:
$ kafka-topics.sh --zookeeper localhost:2181 --create --topic topic-create --partitions 1 --replication-factor 1
$ /opt/kafka/kafkalogs
$ ls
... topic-create-0 ...
上面的实例中创建了一个分区数为 1、副本银子为 1 的主题。在执行完脚本之后,Kafka 会在 log.dir 或 log.dirs 参数所配置的目录下创建相应的主题分区,默认情况下这个目录为 /tmp/kafka-logs
。如上,可以看到 节点创建了 1 个文件夹 topic-create-0
。命名方式可以概括为 <topic>-<partition>
。严谨地说,其实 <topic>-<partition>
这类文件夹对应的不是分区,分区同主题一样是一个逻辑的概念而没有物理上的存在。我们这里只看到了一个分区,我们可以创建多个分区,分区会按策略分配到各个节点中。
主题、分区、副本和 Log 的关系如下图所示。主题和分区都是提供给上层用户的抽象,而在副本层面或更加确切地说是 Log 层面才有实际物理上的存在。同一个分区中的多个副本必须分布在不同的 broker 中,这样才能提供有效的数据冗余。 当创建一个主题时会在 Zookeeper 的 /brokers/topics/ 目录下创建一个同名的实节点。该节点中记录了该主题的分区副本分配方案。示例如下:
$ sh zkCli.sh
$ ls /brokers/topics
$ get /brokers/topics/topic-create
..."partitions":{"0":[1]...
其中 "partitions":{"0":[1]
表示分区 0 分配了一个副本,在 brokerId 为 1 的 broker 节点中。
kafka-topics.sh
脚本中的 zookeeper、partitions、replication-factor 和 topic 这 4 个参数分别代表 Zookeeper 连接地址、分区数、副本因子和主题名称。另一个 create 参数表示的是创建主题的指令类型,在 kafka-topics.sh 脚本中对应的还有 list、describe、alter 和 delete 这 4 个同级别的指令类型,每个类型所需要的也不尽相同。
$ kafka-topics.sh --zookeeper localhost:2181 --describe --topic topic-create
Topic:topic-create PartitionCount:1 ReplicationFactor:1 Configs:
Topic: topic-create Partition: 0 Leader: 1 Replicas: 1 Isr: 1
Topic
和 Partition
分别表示主题名称和分区号。PartitionCount
表示主题中分区的个数,ReplicationFactor
表示副本因子,而 configs
表示创建或修改主题时指定的参数配置。Leader
表示分区的 leader
副本所对应的 brokerId
,Isr
表示分区的 ISR
集合,Replicas
表示分区的所有的副本分配情况,即 AR
集合,其中的数字都表示等待是 brokerId
。 kafka-topics.sh
脚本中还提供了一个 replica-assignment
参数来手动指定分区副本的分配方案。replica-assignment
参数的用法归纳如下:-- replica-assignment <String: broker_id_for_part1_replica1: broker_id_for_part1_replica2, broker_id_for_part2_replica1:broker_id_for_part2_replica2, ...>
。 这种方式根据分区号的数值大小按照从小到大的顺序进行排列,分区和分区之间用逗号“,”隔开,分区内多个副本用冒号“:”隔开。并且在使用 replica-assignment
参数创建主题时不需要原本必备的 partitions
和 replication-factor
这两个参数。
示例如下:
$ kafka-topics.sh --zookeeper localhost:2181 --create --topic topic-create-same --replica-assignment 2:0,0:1,1:2,2:1
注意同一个分区内的副本不能有重复,比如指定了 0:0,1:1 这种,就会报出 AdminCommand-FailedException 异常。配置不合理也会有其他异常,就不一一举例了。
在创建主题时我们还可以通过 config 参数来设置所要创建主题的相关参数,通过这个参数可以覆盖原本的默认配置。在创建主题时可以同时设置多个参数,具体的用法归纳如下:--config <String:name1=value1> --config <String:name2=value2>
,创建好之后,我们还可以通过 Zookeeper 客户端查看所设置的参数。对应的 Zookeeper 节点为 /config/topics/[topic],示例如下:get /config/topics/topic-config
。
创建主题时对于主题名称的命名方式也很有讲究。首先是不能与已经存在的主题同名,否则会报错。kafka-topics.sh
脚本中还提供了一个 if-not-exists 参数,如果在创建主题时带上了这个参数,那么在发生命名冲突时将不做任何处理(既不创建主题,也不报错)。 Kafka-topics.sh 脚本在创建主题时还会检测是否包含“.”或“”字符。因为在 Kafka 的内部做埋点时会根据主题的名称来命名 metrics 的名称,并且会将“.”改为下划线“”。假设遇到一个名称为“topic.1_2”的主题,还有一个名称为“topic_1_2”的主题,那么最后的 metrics 的名称都会为“topic_1_2”,这样就发生了名称冲突。举个例子如下:
$ kafka-topics.sh --zookeeper localhost:2181 --create --topic topic.1_2 --partitions 1 --replication-factor 1
$ kafka-topics.sh --zookeeper localhost:2181 --create --topic topic_1_2 --partitions 1 --replication-factor 1
TIP
主题的命名同样不推荐使用双下划线“__”开头,因为以双下划线开头的主题一般都看作 Kafka 的内部主题,比如 __consumer_offsets 和 __transaction_state。主题的名称必须由大小写字母、数字、点号“.”、连接线“-”、下划线“_”组成,不能为空,不能只有点号“.”,也不能只有双点号“..”,且长度不能超过 249。
Kafka 从 0.10.x 版本开始支持指定 broker 的机架信息(机架的名称)。如果指定了机架信息,则在分区副本分配时会尽可能地让分区副本分配到不同的机架上。指定机架信息是通过 broker 端参数 broker.rack 来配置的,比如配置当前 broker 所在的机架为 “RACK1”:broker.rack=RACK1
。 如果一个集群中有部分 broker 指定了机架信息,并且其余的 broker 没有指定机架信息,那么在执行 kafka-topics.sh
脚本创建主题时会报错。此时若要成功创建主题,要么将集群中的所有 broker 都加上机架信息或都去掉机架信息,要么使用 disable-rack-aware
参数来忽略机架信息。如果集群中的所有 broker 都有机架信息,那么也可以使用 disable-rack-aware 参数来忽略机架信息对分区副本的分配影响。
kafka-topics 脚本实质上是调用了 kafka.admin.TopicCommand
类,通过向 TopicCommand 类中传入一些关键参数来实现主题的管理。我们也可以直接调用 TopicCommand 类中的 main() 函数来直接管理主题,比如这里创建一个分区数为 1、副本因子为 1 的主题 topic-create-api
,示例代码见 com.kafka.stu.chapter04.topicmng.TopicCommandCreate
。这种方式也可以适用于对主题的删、改、查等操作的实现,只需修改对应的参数即可。不过更推荐 KafkaAdminClient
来代替这种方式。
分区副本的分配
在生产者和消费者中都有分区分配的概念。生产者的分区分配是指为每条消息指定其所要发往的分区,消费者中的分区分配是指为消费者指定其可以消费消息的分区。
本节要介绍的分区分配是指为集群制定创建主题时的分区副本分配方案,即在哪个 broker 中创建哪些分区的副本。
在创建主题时,如果使用了 replica-assignment
参数,那么就按照指定的方案来进行分区副本的创建;如果没有使用 replica-assignment
参数,那么就需要按照内部的逻辑来计算分配方案了。使用 kafka-topics.sh
脚本创建主题时的内部分配逻辑按照机架信息划分两种策略:未指定机架信息和指定机架信息。如果集群中所有的 broker
节点都没有配置 broker.rack
参数,或者使用 disable-rack-aware
参数来创建主题,那么采用的就是未指定机架信息的分配策略,否则采用的就是指定机架信息的分配策略。
具体分析有兴趣的可以查看源码 Kafka分区副本分配解析。
查看主题
使用 list
指令可以查看当前所有可用的主题,示例如下:$ kafka-topics.sh --zookeeper localhost:2181 -list
。另外 describe 指令如果不适用 --topic
指定主题,则会展示出所有主题的详细信息,$ kafka-topics.sh --zookeeper localhost:2181 -describe
。 在使用 describe 指定查看主题时还可以额外指定 topics-with-overrides、under-replicated-partitions 和 unavailable-partitions
这三个参数来增加一些附加功能。
在使用 topics-with-overrides 参数可以找出所有包含覆盖配置的主题,它只会列出包含了与集群不一样配置的主题。注意使用 topics-with-overrides
参数时只显示原本只是用 describe 指令的第一行信息,参考示例如下: $ kafka-topics.sh --zookeeper localhost:2181 -describe --topics-with-overrides --topic helloworld
。 under-replicated-partitions 和 unavailable-partitions 参数都可以找出有问题的分区。通过 under-replicated-partitions 参数可以找出所有包含失效副本的分区。包含失效副本的分区可能正在进行同步操作,也有可能同步发生异常,此时分区的 ISR 集合小于 AR 集合。对于通过该参数查询到的分区要重点监控,因为这很可能意味着集群中的某个 broker 已经失效或同步效率降低等。
修改主题
当一个主题被创建之后,依然允许我们对其做一定的修改,比如修改分区个数、修改配置等。这个修改的功能就是由 kafka-topics.sh
脚本中的 alter
指令提供的。
增加主题分区数命令如:kafka-topics.sh --zookeeper localhost:2181 --alter --topic topic-config --partitions 3
。当主题中的消息包含 key 时(即 key 不为 null),根据 key 计算分区的行为就会受到影响。当 topic-config
的分区数为 1 时,不管消息的 key 为何值,消息都会发往这一个分区;当分区数 增加到 3 时,就会根据消息的 key 来计算分区号,原本发往分区 0 的消息现在有可能会发往分区 1 或分区 2。如此还会影响既定消息的顺序,所以在增加分区数时一定要三思而后行。对于基于 key 计算的主题而言,建议在一开始就设置号分区数量,避免以后对其进行调整。
目前 Kafka 只支持增加分区数而不减少分区数。减少分区数时会报出 InvalidPartitionException 的异常。
TIP
为什么不支持减少分区?
在创建主题时有一个 if-not-exists 参数来忽略一些异常,在这里也有对应的参数,如果所要修改的主题不存在,可以通过 if-exists
参数来忽略异常。除了修改分区数,我们还可以使用 kafka-topics.sh
脚本的 alter
指令来变更主题的配置。在创建主题的时候我们可以通过 config 参数来设置所要创建主题的相关参数,通过这个参数可以覆盖原本的默认配置。在创建完主题之后,我们还可以通过 alter 指令配合 config 参数增加或修改一些配置以覆盖它们配置原有的值。示例如下:$ kafka-topics.sh --zookeeper localhost:2181 --alter --topic topic-config --config segment.bytes=1048577
。
我们可以通过 delete-config
参数来删除之前覆盖的配置,使其恢复原有的默认值,下面的示例将主题 topic-config
中所有修改过的 3 个配置都删除:$ kafka-topics.sh --zookeeper localhost:2181 --alter --topic topic-confg --delete-config segment.bytes
。
注意到在变更(增、删、改)配置的操作执行之后都会提示一段告警信息,指明了使用 kafka-topics.sh
脚本的 alter 指令来变更主题配置的功能已经过时,将在未来版本中删除,并且推荐使用 kafka-configs.sh
脚本来实现相关功能。
配置管理
kafka-configs.sh
脚本是专门用来对配置进行操作的,这里的操作是指在运行状态下修改原有的配置,如此可以达到动态变更的目的。kafka-configs.sh
脚本包含变更配置 alter
和查看配置 describe
这两种指令类型。同使用 kafka-topics.sh
脚本变更配置的原则一样,增、删、改的行为都可以看作变更操作,不过 kafka-configs.sh
脚本不仅可以支持操作主题相关的配置,还可以支持操作 broker
、用户和客户端这 3 个类型的配置。
kafka-configs.sh
脚本使用 entity-type
参数来指定操作配置的类型,并且使用 entity-name
参数来指定操作配置的名称。比如查看主题 topic-config
的配置可以按如下方式执行:$ kafka-configs.sh --zookeeper localhost:2181 --describe --entity-type topics --entity-name topic-config
。
--describe 指定了查看配置的指令动作,--entity-type 指定了查看配置的实体类型,--entity-name 指定了查看配置的实体名称。entity-type 只可以配置 4 个值:topics、brokers、clients、和 users,entity-type 与 entity-name 的对应关系如下表所示:
entity-type 的释义 | entity-name 的释义 |
---|---|
主题类型的配置,取值为 topics | 指定主题的名称 |
broker 类型的配置,取值为 brokers | 指定 brokerId 值,即 broker 中 broker.id 参数配置的值 |
客户端类型的配置,取值为 clients | 指定 clientId 值,即 KafkaProducer 或 KafkaConsumer 的 client.id 参数配置的值 |
用户类型的配置,取值为 users | 指定用户名 |
使用 alter
指令变更配置时,需要配合 add-config
和 delete-config
这两个参数一起使用。add-config
参数用来实现配置的增、改、即覆盖原有的配置;delete-config
参数用来实现配置的删,即删除被覆盖的配置以恢复默认值。
下面的示例演示了 add-config
参数的用法,覆盖了主题 topic-config
的两个配置 clean.policy
和 max.message.bytes
(示例执行之前主题 topic-config 无任何被覆盖的配置):kafka-configs.sh --zookeeper localhost:2181 --alter --entity-type topics --entity-name topic-config --add-config cleanup.policy=compact,max.message.bytes=10000
。
使用 delete-config
参数配置时,同 add-config
参数一样支持多个配置的操作,多个配置之间用逗号“,”分隔,如下的配置是删除刚刚添加的配置:kafka-configs.sh --zookeeper localhost:2181 --alter --entity-type topics --entity-name topic-config --delete-config cleanup.policy,max.message.bytes
。
使用 kafka-configs.sh
脚本来变更配置时,会在 Zookeeper 中创建一个命名形式为 /config/<entity-type>/<entity-name>
的节点,并将变更的配置写入这个节点,比如对于主题 topic-config
而言,对应的节点名称为 /config/topics/topic-config
。可以使用 Zookeeper 命令 get /config/topics/topic-config
。
变更配置时还会在 Zookeeper 中的 /config/changes/
节点下创建一个以“config_change_”为前缀的持久顺序节点(PERSISTENT_SEQUENTIAL),节点命名形式可以归纳为 /config/changes/config_change_<seqNo>
。比如示例中的主题 topic-config
与此对应的节点名称和节点内容查看命令如下:get /config/changes/config_change_0000000010
,seqNo 是一个单调递增的 10 位数字的字符串,不足位则用 0 补齐。
查看(describe)配置时,就是从 /config/<entity-type>/<entity-name>
节点中获取相应的数据内容。如果使用 kafka-configs.sh
脚本查看信息时没有指定 entity-name
参数的值,则会查看 entity-type
所对应的所有配置信息。示例如下:kafka-configs.sh --zookeeper localhost:2181 --describe --entity-type topics
。
主题端参数
与主题相关的所有配置参数在 broker
层面都有对应参数,比如主题端参数 cleanup.policy
对应的 broker
层面的 log.cleanup.policy
。如果没有修改过主题的任何配置参数,那么就会使用 broker
端的对应参数作为其默认值。可以在创建主题时覆盖相应参数的默认值,也可以在创建完主题之后变更相应参数的默认值。比如在创建主题的时候没有指定 cleanup.policy
参数的值,那么就使用 log.cleanup.policy
参数所配置的值作为 cleanup.policy
的值。
下图展示了主题端参数与 broker
端参数的对照关系。
删除主题
如果确定不再使用一个主题,那么最好的方式是将其删除,这样可以释放一些资源,比如磁盘、文件句柄等。kafka-topics.sh
脚本中的 delete 指令就可以用来删除主题,比如删除一个主题 topic-delete:
$ kafka-topic.sh --zookeeper localhost:2181 --delete --topic topic-delete
执行完删除命令之后会有相关的提示信息,这个提示信息和 broker 端配置参数 delete.topic.delete
有关。必须配置为 true 才能够删除主题,默认为 true,如果配置为 false,那么会忽略删除主题的操作。
如果要删除的主题是 Kafka 的内部主题,那么删除时就会报错。尝试删除一个不存在的主题也会报错,不过可以使用 if-exists
参数来忽略异常。
删除主题的行为本质上只是在 Zookeeper 中的 /admin/delete_topics
路径下创建一个待删除主题同名的节点,以此标记该主题为待删除的状态。与创建主题相同的是,真正删除主题的动作也是由 Kafka 的控制器完成的。因此可以使用 zkCli.sh 来删除主题:$ create /admin/delete_topics/topic-delete
。
我们还可以通过手动的方式来删除主题。主题中的元数据存储在 Zookeeper 中的 /brokers/topics
和 /config/topics
路径下,主题中的消息数据存储在 log.dir
或 log.dirs
配置的路径下,我们只需要手动删除这些地方的内容即可,具体步骤略。
TIP
注意,删除主题是一个不可逆的操作。一旦删除之后,与其相关的所有消息数据会被全部删除。
kafka-topics.sh
脚本的使用在这里介绍完了,脚本的各个参数略。
初识 KafkaAdminClient
区别于 kafka-topics.sh
脚本,以 API 方式供程序调用,方便打造集管理、监控、运维、告警为一体的生态平台。
基本使用
略,其实和 kafka-topics.sh
类似,只是换了层皮肤。
主题合法性验证
一般情况下,Kafka 生产环境中的 auto.create.topics.enable
参数会被设置为 false,即自动创建主题这条路会被堵住。但是可以通过 KafkaAdminClient 创建主题,此时主题可能不符合运维规范,比如命名不规范,副本因子数太低等,这都会影响后期的系统运维。
我们可以做相应的规范处理。Kafka broker 端有一个这样的参数:create.topic.policy.class.name
,默认值为 null。它提供了一个入口用来验证主题创建的合法性。使用方式以及代码略。
分区的管理
主要介绍于分区相关的只是和操作,包括优先副本的选举、分区重分配、复制限流、修改副本因子等内容。
优先副本的选举
分区使用多副本机制来提升可靠性,但只有 leader 副本对外提供读写服务,而 follower 副本只负责在内部进行消息的同步。当一个分区的 leader 副本不可用,那么就意味着整个分区变得不可用,此时就需要重新选举一个新的 leader 副本来继续对外提供服务。
在创建主题的时候,该主题的分区及副本会尽可能均匀地分布到 Kafka 集群的各个 broker 节点上,对应的 leader 副本的分配也比较均匀。
随着时间的更替,Kafka 集群的 broker 节点不可避免地会遇到沓机或崩溃的问题,当分区的 leader 节点发生故障时,其中一个 follower 节点就会成为新的 leader 节点,这样就会导致集群的负载不均衡,从而影响整体的健壮性和稳定性。当原来的 leader 节点恢复之后重新加入集群时,它只能成为一个新的 follower 节点而不再对外提供服务。
为了有效地治理负载失衡的情况,Kafka 引入了优先副本(preferred replica)的概念。所谓的有限副本是指在 AR 集合列表中的第一个副本。Kafka 要确保所有主题的优先副本在 Kafka 集群中均匀分布,这样就保证了所有分区的 leader 均衡分布。如果 leader 分布过于几种,就会造成集群负载不均衡。
所谓的优先副本的选举是指通过一定的方式促使优先副本选举为 leader 副本,以此来促进集群的负载均衡,这一行为也可以称为“分区平衡”。需要注意的是,“分区平衡”并不意味着 Kafka 集群的负载均衡,因为还需要考虑集群中的分区分配是否均衡。更进一步,每个分区的 leader 副本的负载也不是相同的(如硬件原因等)。
在 Kafka 中可以提供分区自动平衡的功能,与此对应的 broker 端参数是 auto.leader.rebalance.enable
,此参数的默认值为 true,即默认开启。不过在生产环境中不建议将其设置为 true,因为这可能引起负面的性能问题,也有可能引起客户端一定时间的阻塞。因为其是定时执行的,执行时间无法掌控。
Kakfa 中 kafka-perferred-replica-election.sh
脚本提供了对分区 leader 副本进行重新平衡的功能。优先副本的选举过程是一个安全的过程,Kafka 客户端可以自动感知分区 leader 副本的变更。脚本执行后,主题的所有 leader 副本的分布会和刚创建的一样,所有的优先副本都成为 leader 副本。
kafka-perferred-replica-election.sh
脚本中还提供了 path-to-json-file
参数来小批量地对部分分区执行优先副本的选举操作。
在实际生产环境中,一般使用 path-to-json-file
参数来分批、手动地执行优先副本的选举操作。尤其是在应对大规模的 Kafka 集群时,理应杜绝采用非 path-to-json-file
参数的选举操作方式。同事,优先副本的选举操作也要注意避开业务高峰期,一面带来性能方面的负面影响。
分区重分配
当集群中的一个节点下线(沓机或有计划的下线),Kafka 并不会将这些失效的分区副本自动地迁移到集群中剩余的可用 broker 节点上;当集群中新增 broker 节点时,只有新创建的主题分区才有可能被分配到这个节点上,而之前的主题分区并不会自动分配到新加入的节点中。
为了解决上述问题,需要让分区副本再次进行合理的分配,也就是所谓的分区重分配。Kafka 提给了 kafka-reassign-partitions.sh
脚本来执行分区重分配的工作,它可以再集群扩容、broker 节点失效的场景下对分区进行迁移。脚本使用步骤参考如下,具体书中示例步骤略:
- 创建主题和一个 JSON 文件,文件内容为要进行分区重分配的主题清单。
- 根据 JSON 文件和指定所要分配的 broker 节点列表来生成一份候选的重分配方案。
- 执行具体的重分配动作。
- 验证查看分区中分配的进度(可选)。
除了让脚本自动生成候选方案,用户还可以自定义重分配方案,这样也就不需要执行第一步和第二步的操作了。
分区重分配的基本原理是先通过控制器为每个分区添加新副本(增加副本因子),新的副本将从分区的 leader 副本那里复制所有的数据。根据分区的大小不同,复制过程可能需要花一些时间,因为数据是通过网络复制到新副本上的。在复制完成之后,控制器将旧副本从副本清单里溢出(恢复为原先的副本因子数)。注意在重分配的过程中要确保有足够的空间。
分区重分配对集群的性能有很大的影响,需要占用额外的资源,比如网络和磁盘。在实际操作中,我们将降低重分配的粒度,分成多个小批次来执行,以此来将负面的影响降到最低。
TIP
如果要将某个 broker 下线,那么在执行分区重分配动作之前最好先关闭或重启 broker。这样这个 broker 就不再是任何分区的 leader 节点了,它的分区就可以被分配给集群中的其他 broker。这样可以减少 broker 间的流量复制,以此提升重分配的性能,以及减少对集群的影响。
复制限流
如果集群中某个主题或某个分区的流量在某段时间内特别大,那么只靠减小粒度是不足以应对的,这时就需要有一个限流的机制,可以对副本间的复制流量加以限制来保证重分配期间整体服务不会受太大的影响。
副本间的复制限流有两种实现方式:kafka-config.sh
脚本和 kafka-reassign-partitions.sh
脚本。
- kafka-config.sh。主要以动态配置的方式来达到限流的目的,在
broker
级别有两个与复制限流相关的配置参数:follower.replication.throttled.rate
和leader.replication.throttled.rate
,前者用于设置follower
副本复制的速度,后者用于设置leader
副本传输的速度,他们的单位都是B/s
,通常情况下,两者的速度是相同的。
在主题级别也有两个相关的参数来限制复制的速度:leader.replication.throttled.replicas
和follower.replication.throttled.replicas
,它们分别用来配置被限制速度的主题所对应的leader
副本列表和follower
副本列表。
复制重分配限流示例略。 - kafka-reassign-partitions.sh。提供了限流的功能,只需一个 throttle 参数即可。
限流示例略。
修改副本因子
创建主题之后我们还可以修改分区的个数,同样可以修改副本因子(副本数)。
修改副本因子示例略。
如何选择合适的分区数
如何选择合适的分区数?这是很多 Kafka 的使用者经常面临的问题,不过对这个问题而言,似乎并没有非常权威的答案。而且这个问题显然也没有固定的答案,只能从某些角度来做具体的分析,最终还是要根据实际的业务场景、软件条件、硬件条件、负载情况等来做具体的考量。
性能测试工具
Kafka 本身提供的用于生产者性能测试的 kafka-producer-perf-test.sh
和用于消费者性能测试的 kafka-consumer-perf-test.sh
。
具体示例略。
分区数越多吞吐量就越高吗
分区是 Kafka 中最小的并行操作单元,对生产者而言,每一个分区的数据写入是完全可以并行化的;对消费者而言,Kafka 只允许单个分区中的消息被一个消费者线程消费,一个消息组的消费并行度完全依赖于所消费的分区数。如此看来,如果一个主题中的分区数越多,理论上所能达到的吞吐量就越大,那么真的是这样吗?
消息中间件的性能一般是指吞吐量(广义来说还包括延迟)。抛开硬件资源的影响,消息写入的吞吐量还会受到消息大小、消息压缩方式、消息发送方式(同步/异步)、消息确认类型(acks)、副本因子等参数的影响,消息消费的吞吐量还会受到应用逻辑处理速度的影响。
测试示例略,消息生产者的测试的结果是:随着分区数的增长,相应的吞吐量也跟着上涨。一旦分区数超过了某个阈值之后,整体的吞吐量是不升反降的。也就是说,并不是分区数越多吞吐量也越大。消费者性能测试总体趋势和消息生产者的类似。
TIP
为什么会有不升反降的临界值?
分区数的上限
一味地增加分区数并不能使吞吐量一直得到提升,并且分区数也并不能一直增加,如果超过默认的配置之值,还会引起 Kafka 进程的崩溃。崩溃的原因是文件描述符不足,可以使用命令 ulimit -Sn
命令查看。
如何避免这种异常情况?对于一个高并发、高性能的应用来说,1024 或 4096 的文件描述符未免太少,可以适当调大这个参数。比如使用 ulimit -n 65535
命令将上限提高到 65535,这样足以应对大多数的应用情况,再高也完全没有必要了。
考量因素
如何选择合适的分区数?一个“恰如七分”的答案就是视具体情况而定。
- 从吞吐量考虑合适的分区数。
- 创建主题之后,虽然我们还能够增加分区的个数,但基于 key 计算的主题需要严谨对待。对于与 key 高关联的应用,在创建主题时可以适当地多创建一些分区。,以满足为了的需求。
- 有些应用场景会要求主题中的消息都能保证顺序性,这种可以设定分区数为 1,通过分区有序性的这一特性来达到主题有序性的目的。
- 分区数的多少还会影响系统的可用性。比如当 broker 发生故障后,选举新的 leader 时,大量的分区需要同时进行 leader 角色切换。
- 分区数越多也会让 Kafka 的正常启动和关闭的耗时变得越长。同时,主题的分区数越多不仅会增加日志清理的耗时,而且在被删除时也会耗费更多的时间。对旧版的生产者和消费者的客户端而言,分区数越多,也会增加它们的开销,不过这一点在新版的生产者和消费者客户端中有效地得到了抑制。
参考文献
- [深入理解 Kafka 核心设计与实践原理]