Kafka分区管理大师指南:扩容、均衡、迁移与限流全解析

发布于:2025-02-15 ⋅ 阅读:(14) ⋅ 点赞:(0)

#作者:孙德新

分区分配操作(kafka-reassign-partitions.sh)

1.1 分区扩容、数据均衡、迁移(kafka-reassign-partitions.sh)

Kafka系统提供了一个分区重新分配工具(kafka-reassign-partitions.sh),该工具可用于在Broker之间迁移分区。理想情况下,将确保所有Broker的数据和分区均匀分配。分区重新分配工具无法自动分析Kafka群集中的数据分布并迁移分区以实现均匀的负载均衡。因此,管理员在操作的时候,必须弄清楚应该迁移哪些Topic或分区。
Kafka物理节点扩容方法:只要每个kafka配置文件的brokerid 需要全局唯一,不冲突,启动新kafka节点,即可自动加入原有kafka集群。但是原集群topic数据不会自动移动到新kafka节点上,需要手动迁移。新topic则可以自动分配到新节点上。

–topics-to-move-json-file 指定json文件,文件内容为topic配置 Json文件格式如下:

{
  "topics": [
    {"topic": "test_create_topic1"}
  ],
  "version": 1
}

–generate 尝试给出副本重分配的策略,该命令并不实际执行
–broker-list 指定想要分配的Broker节点,用于尝试给出分配策略,与–generate搭配使用 。–broker-list “0,1,2,3”
–broker-list举例:
./kafka-reassign-partitions.sh --zookeeper xxx:2181 --topics-to-move-json-file config/move-json-file.json --broker-list “0,1,2,3” --generate

执行完毕之后会打印如下,需求注意的是,此时分区移动尚未开始,它只是告诉你当前的分配和建议。保存当前分配,以防你想要回滚它
Current partition replica assignment//当前副本分配方式

Current partition replica assignment//当前副本分配方式
{"version":1,"partitions":[{"topic":"test_create_topic1","partition":2,"replicas":[1],"log_dirs":["any"]},{"topic":"test_create_topic1","partition":1,"replicas":[3],"log_dirs":["any"]},{"topic":"test_create_topic1","partition":0,"replicas":[2],"log_dirs":["any"]}]}

Proposed partition reassignment configuration//期望的重新分配方式
{"version":1,"partitions":[{"topic":"test_create_topic1","partition":2,"replicas":[2],"log_dirs":["any"]},{"topic":"test_create_topic1","partition":1,"replicas":[1],"log_dirs":["any"]},{"topic":"test_create_topic1","partition":0,"replicas":[0],"log_dirs":["any"]}]}

–reassignment-json-file 指定要重分配的json文件,与–execute搭配使用

–execute 执行。开始执行重分配任务,与–reassignment-json-file搭配使用

–verify 验证任务是否执行成功,检查分区重新分配的状态。当有使用–throttle限流的话,该命令还会移除限流;该命令很重要,不移除限流对正常的副本之间同步会有影响。 该选项用于检查分区重新分配的状态,同时–throttle流量限制也会被移除掉; 否则可能会导致定期复制操作的流量也受到限制。

–throttle 迁移过程Broker之间现在流程传输的速率,单位 bytes/sec – throttle 500000 。迁移过程注意流量陡增对集群的影响。Kafka提供一个broker之间复制传输的流量限制,限制了副本从机器到另一台机器的带宽上限,当重新平衡集群,引导新broker,添加或移除broker时候,这是很有用的。因为它限制了这些密集型的数据操作从而保障了对用户的影响

> sh bin/kafka-reassign-partitions.sh --zookeeper xxxxx:2181 --reassignment-json-file config/reassignment-json-file.json --execute -- throttle 50000000

加上一个–throttle 50000000 参数, 那么执行移动分区的时候,会被限制流量在50000000 B/s,加上参数后可以看到如下

The throttle limit was set to 50000000 B/s
Successfully started reassignment of partitions.

需要注意的是,如果你迁移的时候包含 副本跨路径迁移(同一个Broker多个路径)那么这个限流措施不会生效,你需要再加上–replica-alter-log-dirs-throttle 这个限流参数,它限制的是同一个Broker不同路径直接迁移的限流;

如果你想在重新平衡期间修改限制,增加吞吐量,以便完成的更快。你可以重新运行execute命令,用相同的reassignment-json-file

–replica-alter-log-dirs-throttle broker内部副本跨路径迁移数据流量限制功能,限制数据拷贝从一个目录到另外一个目录带宽上。单位bytes/sec --replica-alter-log-dirs-throttle 100000

–disable-rack-aware 关闭机架感知能力,在分配的时候就不参考机架的信息

–bootstrap-server 如果是副本跨路径迁移必须有此参数

下面是topic数据均衡操作(迁移操作也是这个思路,只是最终kafka节点目标少):
背景:原有3台kafka集群:192.168.40.11–13,扩容一台kafka 192.168.40.14,把原有3台数据均衡到4台kafka上
首先在现有集群的基础上再添加⼀个Kafka节点,进行物理组件节点扩容,然后使⽤Kafka⾃带的kafka-reassign-partitions.sh ⼯具来重新分布分区,进行数据均衡。该⼯具有三种使⽤模式:
generate模式,给定需要重新分配的Topic,⾃动⽣成reassign plan(并不执⾏)。在此模式下,给定Topic列表和Broker列表,该工具会生成候选重新分配,以将指定Topic的所有分区迁移到新Broker中。此选项仅提供了一种方便的方法,可在给定Topic和目标Broker列表的情况下生成分区重新分配计划。
execute模式,根据指定的reassign plan重新分配Partition。在此模式下,该工具将根据用户提供的重新分配计划启动分区的重新分配。 (使用–reassignment-json-file选项)。由管理员手动制定自定义重新分配计划,也可以使用–generate选项提供。
verify模式,验证重新分配Partition是否成功。在此模式下,该工具将验证最后一次–execute期间列出的所有分区的重新分配状态。状态可以有成功、失败或正在进行等状态。

基本步骤如下:

  1. 查看集群中当前所有可用的topic,目标是名为three的topic进行数据均衡到1234,4个kafka节点上
[root@localhost bin]# ./kafka-topics.sh --list --bootstrap-server 192.168.40.11:9092
__consumer_offsets
first
second
three
topic_first
  1. 查看计划数据均衡的topic的详细信息
    列出 对应的topic名称的详细信息,包括topic的分区数量,副本及leader等。
./kafka-topics.sh --describe --zookeeper  zk-ip:port  --topic  topic名称  (命令模板)
[root@localhost bin]# ./kafka-topics.sh --describe --bootstrap-server 192.168.40.11:9092  --topic three
[2024-03-06 01:25:53,446] WARN [AdminClient clientId=adminclient-1] Connection to node 4 (/192.168.40.14:9092) could not be established. Broker may not be available. (org.apache.kafka.clients.NetworkClient)
Topic: three	PartitionCount: 5	ReplicationFactor: 3	Configs: segment.bytes=1073741824
	Topic: three	Partition: 0	Leader: 1	Replicas: 1,3,2	Isr: 2,3,1
	Topic: three	Partition: 1	Leader: 2	Replicas: 2,1,3	Isr: 2,3,1
	Topic: three	Partition: 2	Leader: 3	Replicas: 3,2,1	Isr: 2,3,1
	Topic: three	Partition: 3	Leader: 1	Replicas: 1,2,3	Isr: 2,3,1
	Topic: three	Partition: 4	Leader: 2	Replicas: 2,3,1	Isr: 2,3,1

  1. 编制负载均衡topic主题清单的json文件:
    借助kafka-reassign-partitions.sh⼯具⽣成reassign 计划,不过我们先得按照要求定义⼀个json⽂件,文件名自定义。⾥⾯说明哪些topic需要重新分区,版本号固定为1,⽂件内容如下:
[root@node1 ~]# cat topics-to-move.json 
{ 	"topics": [ 		{ "topic":"three" } 	], 	"version":1}

或如下

[root@localhost ~]# cat topic-to-move.json 
{"topics": [{"topic": "three"}],
"version":1
}
  1. 生成副本均衡迁移计划:
    然后使⽤ kafka-reassign-partitions.sh⼯具利用–generate⽣成reassign计划
./kafka-reassign-partitions.sh --zookeeper 192.168.12.100:2181 --topics-to-move-json-file  ./plans/topic-to-move.json  --broker-list "1,2,3,4,5,6" --generate  (命令模板)

参数–broker-list “1,2,3,4” 表示期望均衡、迁移的本kafka集群全部节点(也可是部分节点,但节点个数不能小于副本数量)。执行后输出两段内容,其中黄色标识部分就是副本存储计划文件内容,⽣成的就是将分区重新分布到broker 上的结果

[root@localhost bin]# ./kafka-reassign-partitions.sh --bootstrap-server 192.168.40.11:9092 --topics-to-move-json-file  /root/topic-to-move.json  --broker-list  "1,2,3,4"  --generate
Current partition replica assignment  //目前的副本分配状态,这个需要保留,可以用它回滚!
{"version":1,"partitions":[{"topic":"three","partition":0,"replicas":[1,3,2],"log_dirs":["any","any","any"]},{"topic":"three","partition":1,"replicas":[2,1,3],"log_dirs":["any","any","any"]},{"topic":"three","partition":2,"replicas":[3,2,1],"log_dirs":["any","any","any"]},{"topic":"three","partition":3,"replicas":[1,2,3],"log_dirs":["any","any","any"]},{"topic":"three","partition":4,"replicas":[2,3,1],"log_dirs":["any","any","any"]}]}

Proposed partition reassignment configuration    //期望计划的副本分配状态
{"version":1,"partitions":[{"topic":"three","partition":0,"replicas":[2,4,1],"log_dirs":["any","any","any"]},{"topic":"three","partition":1,"replicas":[3,1,2],"log_dirs":["any","any","any"]},{"topic":"three","partition":2,"replicas":[4,2,3],"log_dirs":["any","any","any"]},{"topic":"three","partition":3,"replicas":[1,3,4],"log_dirs":["any","any","any"]},{"topic":"three","partition":4,"replicas":[2,1,3],"log_dirs":["any","any","any"]}]}

注:
上述黄色输出内容说明解释,"partitions":[{"topic":"three","partition":0,"replicas":[2,4,1],是计划将"topic":"three","partition":0分区分配到broker2、4、1上,后面内容也同样
  1. 编制存储计划json文件:
    上面最后一段(黄色标识)就是副本存储计划文件内容,⽣成的就是将分区重新分布到broker 上的结果,再创建一个存储计划json文件,文件名自定义
[root@localhost bin]#  vim  increase-replication-factor.json
{"version":1,"partitions":[{"topic":"three","partition":0,"replicas":[2,4,1],"log_dirs":["any","any","any"]},{"topic":"three","partition":1,"replicas":[3,1,2],"log_dirs":["any","any","any"]},{"topic":"three","partition":2,"replicas":[4,2,3],"log_dirs":["any","any","any"]},{"topic":"three","partition":3,"replicas":[1,3,4],"log_dirs":["any","any","any"]},{"topic":"three","partition":4,"replicas":[2,1,3],"log_dirs":["any","any","any"]}]}
  1. 执行存储计划:
    该命令会将指定分区的副本重新分配到新的 broker上。集群控制器通过为每个分区添加新副本实现重新分配(增加复制系数)。新的副本将从分区的首领那里复制所有数据。根据分区大小的不同,复制过程可能需要花一些时间,因为数据是通过网络复制到新副本上的。在复制完成之后,控制器将旧副本从副本清单里移除(恢复到原先的复制系数)。
[root@localhost bin]# ./kafka-reassign-partitions.sh --bootstrap-server 192.168.40.11:9092 --reassignment-json-file    /root/increase-replication-factor.json  --execute
Current partition replica assignment

{"version":1,"partitions":[{"topic":"three","partition":0,"replicas":[1,3,2],"log_dirs":["any","any","any"]},{"topic":"three","partition":1,"replicas":[2,1,3],"log_dirs":["any","any","any"]},{"topic":"three","partition":2,"replicas":[3,2,1],"log_dirs":["any","any","any"]},{"topic":"three","partition":3,"replicas":[1,2,3],"log_dirs":["any","any","any"]},{"topic":"three","partition":4,"replicas":[2,3,1],"log_dirs":["any","any","any"]}]}

Save this to use as the --reassignment-json-file option during rollback
Successfully started partition reassignments for three-0,three-1,three-2,three-3,three-4
  1. 验证副本存储计划:
    在重分配进行过程中或者完成之后,可以使用 kafka-reassign-partitions.sh 工具验证重分配 的状态。它可以显示重分配的进度、已经完成重分配的分区以及错误信息。
[root@localhost bin]# ./kafka-reassign-partitions.sh --bootstrap-server 192.168.40.11:9092 --reassignment-json-file    /root/increase-replication-factor.json  --verify
Status of partition reassignment:
Reassignment of partition three-0 is complete.
Reassignment of partition three-1 is complete.
Reassignment of partition three-2 is complete.
Reassignment of partition three-3 is complete.
Reassignment of partition three-4 is complete.

Clearing broker-level throttles on brokers 1,2,3,4    //说明已经分配在kafka集群的全部4个节点上了
Clearing topic-level throttles on topic three
  1. 再次验证副本存储计划:
    说明没问题,成功。14主机上分配了分区0、2、3,共3个分区
[root@localhost bin]#  ./kafka-topics.sh --describe --bootstrap-server 192.168.40.11:9092  --topic three
Topic: three	PartitionCount: 5	ReplicationFactor: 3	Configs: segment.bytes=1073741824
	Topic: three	Partition: 0	Leader: 1	Replicas: 2,4,1	Isr: 2,1,4
	Topic: three	Partition: 1	Leader: 2	Replicas: 3,1,2	Isr: 2,3,1
	Topic: three	Partition: 2	Leader: 4	Replicas: 4,2,3	Isr: 2,3,4
	Topic: three	Partition: 3	Leader: 1	Replicas: 1,3,4	Isr: 3,1,4
	Topic: three	Partition: 4	Leader: 2	Replicas: 2,1,3	Isr: 2,3,1
到kafka-14主机查看数据,已经有数据了,分区0、2、3在14主机上
[root@localhost log]# ll
total 16
-rw-r--r--. 1 root root   0 Mar  4 21:48 cleaner-offset-checkpoint
-rw-r--r--  1 root root   4 Mar  6 06:55 log-start-offset-checkpoint
-rw-r--r--  1 root root  88 Mar  6 01:47 meta.properties
-rw-r--r--  1 root root  34 Mar  6 06:55 recovery-point-offset-checkpoint
-rw-r--r--  1 root root  34 Mar  6 06:55 replication-offset-checkpoint
drwxr-xr-x  2 root root 141 Mar  6 03:01 three-0
drwxr-xr-x  2 root root 141 Mar  6 03:06 three-2
drwxr-xr-x  2 root root 141 Mar  6 03:01 three-3
[root@localhost log]# cd three-0
[root@localhost three-0]# ll
total 0
-rw-r--r-- 1 root root 10485760 Mar  6 03:01 00000000000000000000.index
-rw-r--r-- 1 root root        0 Mar  6 03:01 00000000000000000000.log
-rw-r--r-- 1 root root 10485756 Mar  6 03:01 00000000000000000000.timeindex
-rw-r--r-- 1 root root        0 Mar  6 03:01 leader-epoch-checkpoint

1.2、修改topic分区partition的副本数(扩缩容副本)

分区重分配工具提供了一些特性,用于改变分区的复制系数(副本数)。如果在创建分区时指定了错误的复制系数、副本数(比如在创建主题时没有足够多可用的 broker),那么就有必要修改它们。这可以通过创建一个 JSON 对象来完成,该对象使用分区重新分配的执行步骤中使用的格式,显式指定分区所需的副本数量。集群将完成重分配过程,并使用新的复制系数、副本数。
Kafka不能通过命令行方式修改副本,需要通过json等其他方式修改。无论是增加或减少副本,都可以针对每个分区进行个性增加或减少,不是必须所有分区都增加或同时减少。

增加副本操作:

创建json文件:
1)方法一:使用命令自动创建(有待进一步验证)
指定要修改副本数的Topic和分区,以及新的副本分配。这个JSON文件可以通过kafka-reassign-partitions.sh脚本的–generate选项自动生成。命令模板如下,11 14 18为kafka的broker id。
首先创建一个文件addReplicas.json,现有业务topic名称为aaa

[root@kafka18 ~]# cat addReplicas.json 
{
  "topics": [
    {"topic": "aaa"}
  ],
  "version": 1
}


[root@kafka18 ~]# /usr/local/kafka_2.13-2.7.1/bin/kafka-reassign-partitions.sh --bootstrap-server  192.168.40.18:9092 --topics-to-move-json-file addReplicas.json --broker-list "11,14,18" --generate
Current partition replica assignment
{"version":1,"partitions":[{"topic":"aaa","partition":0,"replicas":[14,18,11],"log_dirs":["any","any","any"]},{"topic":"aaa","partition":1,"replicas":[11,14,18],"log_dirs":["any","any","any"]},{"topic":"aaa","partition":2,"replicas":[18,11,14],"log_dirs":["any","any","any"]}]}

Proposed partition reassignment configuration
{"version":1,"partitions":[{"topic":"aaa","partition":0,"replicas":[11,14,18],"log_dirs":["any","any","any"]},{"topic":"aaa","partition":1,"replicas":[14,18,11],"log_dirs":["any","any","any"]},{"topic":"aaa","partition":2,"replicas":[18,11,14],"log_dirs":["any","any","any"]}]}

2)方法二:手动编辑创建
假设topic已有3个分区2个副本,需要增加到3个副本。创建一个预定义的topic分区副本json文件,即副本存储计划:

]# cat addReplicas.json   //该名称自定义,该文件与下面reassign.json文件内容一致,仅格式不同,使用reassign.json文件
{
    "version": 1,         //固定写法
    "partitions": [
        {
            "topic": "test",    //业务的topic名称
            "partition": 0,    //partition的序号,必须从0开始,是第一个分区
            "replicas": [  //期望增加到的副本数,数字为broker的id,说明期望增加到012,3个broker上,为3个副本
                0,
                1,
		        2
            ]
        },
        {
            "topic": "test",
            "partition": 1,
            "replicas": [
                1,
                0,
                2
            ]
        },
        {
            "topic": "test",
            "partition": 2,
            "replicas": [
                2,
                0,
                1
            ]
        }
    ]
}

或如下格式编写
在这里插入图片描述
整体操作步骤如下:

首先查看topic的副本情况,发现有5个分区,每个分区有2个副本
bin]# ./kafka-topics.sh --bootstrap-server 192.168.40.11:9092 --describe
Topic: three PartitionCount: 5 ReplicationFactor: 2 Configs: segment.bytes=1073741824
Topic: three Partition: 0 Leader: 1 Replicas: 1,3 Isr: 1,3
Topic: three Partition: 1 Leader: 2 Replicas: 2,1 Isr: 2,1
Topic: three Partition: 2 Leader: 3 Replicas: 3,2 Isr: 3,2
Topic: three Partition: 3 Leader: 1 Replicas: 1,2 Isr: 1,2
Topic: three Partition: 4 Leader: 2 Replicas: 2,3 Isr: 2,3

首先编写配置文件,把上面查询输出复制过来即可:

vim  reassign.json
{"version":1,
  "partitions":[
     {"topic":"three","partition":0,"replicas":[1,3,2]},
     {"topic":"three","partition":1,"replicas":[2,1,3]},
     {"topic":"three","partition":2,"replicas":[3,2,1]},
     {"topic":"three","partition":3,"replicas":[1,2,3]},
     {"topic":"three","partition":4,"replicas":[2,3,1]}
]}

执行副本存储计划

]# ./kafka-reassign-partitions.sh --bootstrap-server 192.168.40.11:9092 --reassignment-json-file /root/reassign.json --execute
Current partition replica assignment

{"version":1,"partitions":[{"topic":"three","partition":0,"replicas":[1,3],"log_dirs":["any","any"]},{"topic":"three","partition":1,"replicas":[2,1],"log_dirs":["any","any"]},{"topic":"three","partition":2,"replicas":[3,2],"log_dirs":["any","any"]},{"topic":"three","partition":3,"replicas":[1,2],"log_dirs":["any","any"]},{"topic":"three","partition":4,"replicas":[2,3],"log_dirs":["any","any"]}]}

Save this to use as the --reassignment-json-file option during rollback
Successfully started partition reassignments for three-0,three-1,three-2,three-3,three-4

查看执行的状态:

[root@localhost bin]# ./kafka-reassign-partitions.sh --bootstrap-server 192.168.40.11:9092 --reassignment-json-file /root/reassign.json --verify
Status of partition reassignment:
Reassignment of partition three-0 is complete.
Reassignment of partition three-1 is complete.
Reassignment of partition three-2 is complete.
Reassignment of partition three-3 is complete.
Reassignment of partition three-4 is complete.

Clearing broker-level throttles on brokers 1,2,3
Clearing topic-level throttles on topic three

再次查看验证结果,黄色部分broker的id,说明副本都增加成功了,分布在原有kafka集群了

]# ./kafka-topics.sh --bootstrap-server 192.168.40.11:9092,192.168.40.12:9092  --describe
Topic: three	PartitionCount: 5	ReplicationFactor: 3	Configs: segment.bytes=1073741824
	Topic: three	Partition: 0	Leader: 1	Replicas: 1,3,2	Isr: 1,3,2
	Topic: three	Partition: 1	Leader: 2	Replicas: 2,1,3	Isr: 2,1,3
	Topic: three	Partition: 2	Leader: 3	Replicas: 3,2,1	Isr: 3,2,1
	Topic: three	Partition: 3	Leader: 1	Replicas: 1,2,3	Isr: 1,2,3
	Topic: three	Partition: 4	Leader: 2	Replicas: 2,3,1	Isr: 2,3,1

减少topic副本操作:(即从某个kafka节点下线副本)

把下面4分区3副本,名为topic_first的topic,减少计划如下,黄色为计划减掉的副本

[root@localhost bin]# ./kafka-topics.sh --bootstrap-server 192.168.40.11:9092,192.168.40.12:9092  --describe
Topic: topic_first	PartitionCount: 4	ReplicationFactor: 3	Configs: segment.bytes=1073741824
	Topic: topic_first	Partition: 0	Leader: 3	Replicas: 3,1,2	Isr: 3,1,2
	Topic: topic_first	Partition: 1	Leader: 1	Replicas: 1,2,3	Isr: 1,2,3
	Topic: topic_first	Partition: 2	Leader: 2	Replicas: 2,3,1	Isr: 2,3,1
	Topic: topic_first	Partition: 3	Leader: 3	Replicas: 3,2,1	Isr: 3,2,1

我们必须保留Leader副本,再把计划保留的其他Follower副本编辑上,首先编写配置文件,把上面查询输出复制过来即可:

vim  reassign.json
{"version":1,
  "partitions":[
     {"topic":"topic_first","partition":0,"replicas":[3,1]},
     {"topic":"topic_first","partition":1,"replicas":[1]},
     {"topic":"topic_first","partition":2,"replicas":[2,1]},
     {"topic":"topic_first","partition":3,"replicas":[3,2]}
]}

执行计划

]# ./kafka-reassign-partitions.sh --zookeeper localhost:2181/kafka --reassignment-json-file reassign.json --execute  //有zk写法
]# ./kafka-reassign-partitions.sh --bootstrap-server 192.168.40.11:9092 --reassignment-json-file reassign.json --execute  //无zk

Json文件要用绝对路径,或者确保可以找到json文件

]# ./kafka-reassign-partitions.sh --bootstrap-server 192.168.40.11:9092 --reassignment-json-file /root/reassign.json --execute
Current partition replica assignment

{"version":1,"partitions":[{"topic":"topic_first","partition":0,"replicas":[3,1,2],"log_dirs":["any","any","any"]},{"topic":"topic_first","partition":1,"replicas":[1,2,3],"log_dirs":["any","any","any"]},{"topic":"topic_first","partition":2,"replicas":[2,3,1],"log_dirs":["any","any","any"]},{"topic":"topic_first","partition":3,"replicas":[3,2,1],"log_dirs":["any","any","any"]}]}

Save this to use as the --reassignment-json-file option during rollback
Successfully started partition reassignments for topic_first-0,topic_first-1,topic_first-2,topic_first-3

查询执行结果

]# ./kafka-reassign-partitions.sh --bootstrap-server 192.168.40.11:9092 --reassignment-json-file /root/reassign.json --verify
Status of partition reassignment:
Reassignment of partition topic_first-0 is complete.
Reassignment of partition topic_first-1 is complete.
Reassignment of partition topic_first-2 is complete.
Reassignment of partition topic_first-3 is complete.

Clearing broker-level throttles on brokers 1,2,3
Clearing topic-level throttles on topic topic_first

再次查看 Topic 的详情,否按照预想减少副本,结果成功!

[root@localhost bin]#  ./kafka-topics.sh --bootstrap-server 192.168.40.11:9092,192.168.40.12:9092  --describe
Topic: topic_first	PartitionCount: 4	ReplicationFactor: 2	Configs: segment.bytes=1073741824
	Topic: topic_first	Partition: 0	Leader: 3	Replicas: 3,1	Isr: 3,1
	Topic: topic_first	Partition: 1	Leader: 1	Replicas: 1	Isr: 1
	Topic: topic_first	Partition: 2	Leader: 2	Replicas: 2,1	Isr: 2,1
	Topic: topic_first	Partition: 3	Leader: 3	Replicas: 3,2	Isr: 3,2

故障恢复方法:
宕机如何恢复:
少部分副本宕机
当leader宕机了,会从follower选择⼀个作为leader。当宕机的重新恢复时,会把之前commit的数据清空,重新从leader⾥pull数据。
全部副本宕机
当全部副本宕机了有两种恢复⽅式:
等待ISR中的⼀个恢复后,并选它作为leader。(等待时间较⻓,降低可⽤性)
选择第⼀个恢复的副本作为新的leader,⽆论是否在ISR中。(并未包含之前leader commit的数据,因此造成数据丢失)

只有那些跟Leader保持同步的Follower才应该被选作新的Leader。
Kafka会在Zookeeper上针对每个Topic维护⼀个称为ISR(in-sync replica,已同步的副本)的集合,该集合中是⼀些分区的副本。
只有当这些副本都跟Leader中的副本同步了之后,kafka才会认为消息已提交,并反馈给消息的⽣产者。
如果这个集合有增减,kafka会更新zookeeper上的记录。

如果所有的ISR副本都失败了怎么办?此时有两种⽅法可选:
等待ISR集合中的副本复活,
选择任何⼀个⽴即可⽤的副本,⽽这个副本不⼀定是在ISR集合中。需要设置 unclean.leader.election.enable=true,如果设置为true就意味着当leader下线时候可以从非ISR集合中选举出新的leader,这样有可能造成数据的丢失
这两种⽅法各有利弊,实际⽣产中按需选择。
如果要等待ISR副本复活,虽然可以保证⼀致性,但可能需要很⻓时间。⽽如果选择⽴即可⽤的副本,则很可能该副本并不⼀致。

总结:
Kafka中Leader分区选举,通过维护⼀个动态变化的ISR集合来实现,⼀旦Leader分区丢掉,则从ISR中随机挑选⼀个副本做新的Leader分区。
如果ISR中的副本都丢失了,则:
可以等待ISR中的副本任何⼀个恢复,接着对外提供服务,需要时间等待。
从OSR中选出⼀个副本做Leader副本,此时会造成数据丢失

还有一个ISR,该参数全称,in-sync replica,它维护了一个集合,例如截图里的2,0,1,代表2,0,1副本保存的消息日志与leader 副本是保持一致的,只有保持一致的副本(包括所有副本),才会被维护在ISR集合里,当出现一定程度的不同步时,就会将该对应已经不一致的副本移出ISR集合,但是,这种移出并非永久的,一旦被移出的副本慢慢又恢复与leader一样时,那么,又会被加回isr集合当中。注意一点,只有在这个ISR里的副本服务器,才能在leader出现问题时有机会被选举为新的leader。

补充
用步骤1.1的 --generate 获取一下当前的分配情况,得到如下json

{
    "version": 1,
    "partitions": [{
        "topic": "test_create_topic1",
        "partition": 2,
        "replicas": [2],
        "log_dirs": ["any"]
    }, {
        "topic": "test_create_topic1",
        "partition": 1,
        "replicas": [1],
        "log_dirs": ["any"]
    }, {
        "topic": "test_create_topic1",
        "partition": 0,
        "replicas": [0],
        "log_dirs": ["any"]
}]}

假如想把所有分区的副本都变成2, 那只需修改"replicas": []里面的值了,这里面是Broker列表,排在第一个的是Leader; 所以根据自己想要的分配规则修改一下json文件就变成如下
{
    "version": 1,
    "partitions": [{
        "topic": "test_create_topic1",
        "partition": 2,
        "replicas": [2,0],
        "log_dirs": ["any","any"]
    }, {
        "topic": "test_create_topic1",
        "partition": 1,
        "replicas": [1,2],
        "log_dirs": ["any","any"]
    }, {
        "topic": "test_create_topic1",
        "partition": 0,
        "replicas": [0,1],
        "log_dirs": ["any","any"]
    }]}

注意log_dirs里面的数量要和replicas数量匹配;或者直接把log_dirs选项删除掉; 这个log_dirs是副本跨路径迁移时候的绝对路径
执行–execute

如果想在重新平衡期间修改限制,增加吞吐量,以便完成的更快。可以重新运行execute命令,用相同的reassignment-json-file:
验证–verify,完事之后,副本数量就增加了

副本缩容
副本缩容跟扩容是一个意思; 当副本分配少于之前的数量时候,多出来的副本会被删除;
比如刚新增了一个副本,想重新恢复到一个副本
执行下面的json文件

{
  "version": 1,
  "partitions": [{
    "topic": "test_create_topic1",
    "partition": 2,
    "replicas": [2],
    "log_dirs": ["any"]
  }, {
    "topic": "test_create_topic1",
    "partition": 1,
    "replicas": [1],
    "log_dirs": ["any"]
  }, {
    "topic": "test_create_topic1",
    "partition": 0,
    "replicas": [0],
    "log_dirs": ["any"]
  }]}

1.3、Partition Reassign场景限流

Reassign中文重新分配
Partition Reassign限流注意事项:

限流速度不能过小,如果限流速度过小,将不能触发实际的reassign复制过程。
限流参数不会对正常的副本fetch流量进行限速。
任务完成后,您需要通过verify参数移除Topic和Broker上的限速参数配置。
如果刚开始已经设置了throttle参数,则可以通过execute命令再次修改throttle参数。
如果刚开始没有设置throttle参数,则需要使用kafka-configs.sh命令修改Topic上的leader.replication.throttled.replicas和follower.replication.throttled.replicas参数、修改Broker上的leader.replication.throttled.rate和follower.replication.throttled.rate参数。

使用kafka-reassign-partitions.sh来进行Partition Reassign操作,通过使用throttle参数来设置限流的大小。示例如下所示:

  1. 创建测试Topic。
./kafka-topics.sh --bootstrap-server core-1-1:9092 --topic test-throttled --partitions 1 --replication-factor 3 --create

通过以下命令查看Topic详情。

./kafka-topics.sh --bootstrap-server core-1-1:9092 --topic test-throttled --describe
  1. 执行以下命令,模拟数据写入。
./kafka-producer-perf-test.sh --topic test-throttled --record-size 1000 --num-records 600000000 --print-metrics --throughput 10240 --producer-props acks=-1  linger.ms=0 bootstrap.servers=core-1-1:9092
  1. 设置throttle参数并执行reassign操作。
    创建reassignment-json-file文件reassign.json,写入如下内容。
{"version":1,"partitions":[{"topic":"test-throttled","partition":0,"replicas":[2,0,3],"log_dirs":["any","any","any"]}]}

执行reassign操作。
由于模拟的写入速度为10 Mbit/s,所以将reassign限流速度设置为30 Mbit/s。

./kafka-reassign-partitions.sh --bootstrap-server core-1-1:9092 --reassignment-json-file reassign.json --throttle 30000000 --execute
  1. 查看限流参数。
    查看指定节点的Broker参数。2是broker的id
    ./kafka-configs.sh --bootstrap-server core-1-1:9092 -entity-type brokers --entity-name 2 --describe

查看指定Topic的参数。
./kafka-configs.sh --bootstrap-server core-1-1:9092 -entity-type topics --entity-name test-throttled --describe

  1. 查看reassign任务执行情况。
    ./kafka-reassign-partitions.sh --bootstrap-server core-1-1:9092 --reassignment-json-file reassign.json --verify

说明:任务完成后,您需要重复执行上述命令以移除限流参数。

1.4、节点内副本移动到不同目录的场景限流

通过kafka-reassign-partitions.sh可以进行Broker节点内的副本迁移,参数replica-alter-log-dirs-throttle可以对节点内的迁移IO进行限制。示例如下所示:

  1. 创建测试Topic。
    执行以下命令,创建测试Topic。
./kafka-topics.sh --bootstrap-server core-1-1:9092 --topic test-throttled --partitions 1 --replication-factor 3 --create

可以通过以下命令查看Topic详情。

./kafka-topics.sh --bootstrap-server core-1-1:9092 --topic test-throttled --describe
  1. 执行以下命令,模拟数据写入。
./kafka-producer-perf-test.sh --topic test-throttled --record-size 1000 --num-records 600000000 --print-metrics --throughput 10240 --producer-props acks=-1  linger.ms=0 bootstrap.servers=core-1-1:9092
  1. 设置参数replica-alter-log-dirs-throttle并执行reassign操作。
创建文件reassign.json,将目标目录写入reassignment文件中,内容如下。
{"version":1,"partitions":[{"topic":"test-throttled","partition":0,"replicas":[2,0,3],"log_dirs":["any","/mnt/disk1/kafka/log","any"]}]}

执行replicas movement操作。
./kafka-reassign-partitions.sh --bootstrap-server core-1-1:9092 --reassignment-json-file reassign.json --replica-alter-log-dirs-throttle 30000000 --execute
  1. 查看限流参数。
    Broker节点内目录间移动副本会在Broker上配置限流参数,参数名为Brokerreplica.alter.log.dirs.io.max.bytes.per.second。

执行以下命令,查看指定节点的Broker参数。

./kafka-configs.sh --bootstrap-server core-1-1:9092 -entity-type brokers --describe --entity-name 0
  1. 查看reassign任务执行情况。
    ./kafka-reassign-partitions.sh --bootstrap-server core-1-1:9092 --reassignment-json-file reassign.json --verify

说明:任务完成后,您需要重复执行上述命令以移除限流参数。

1.5、集群Broker恢复时,副本数据同步场景限流

重要提示:
限流速度不能过小,如果限流速度过小,将不能触发实际的reassign复制过程。
限流参数不会对正常的副本fetch流量进行限速。
数据恢复完成后,需要使用kafka-configs.sh命令删除相应的参数。

步骤:
当Broker重启时,需要从leader副本进行副本数据的同步。在Broker节点迁移、坏盘修复重新上线等场景时,由于之前的副本数据完全丢失、副本数据恢复会产生大量的同步流量,有必要对恢复过程进行限流避免恢复流量过大影响正常流量。

  1. 创建测试Topic。
    执行以下命令,创建测试Topic。
./kafka-topics.sh --bootstrap-server core-1-1:9092 --topic test-throttled --partitions 1 --replication-factor 3 --create

可以通过以下命令查看Topic详情。

./kafka-topics.sh --bootstrap-server core-1-1:9092 --topic test-throttled --describe
  1. 执行以下命令,写入测试数据。
./kafka-producer-perf-test.sh --topic test-throttled --record-size 1000 --num-records 600000000 --print-metrics --throughput 10240 --producer-props acks=-1  linger.ms=0 bootstrap.servers=core-1-1:9092
  1. 通过kafka-configs.sh命令设置限流参数。
//设置Topic上的限流参数。
./kafka-configs.sh --bootstrap-server core-1-1:9092 --entity-type topics --entity-name test-throttled --alter --add-config "leader.replication.throttled.replicas=*,follower.replication.throttled.replicas=*"

//设置Broker上的限流参数。
./kafka-configs.sh --bootstrap-server core-1-1:9092 --entity-type brokers --alter --add-config "leader.replication.throttled.rate=1024,follower.replication.throttled.rate=1024" --entity-name 0
./kafka-configs.sh --bootstrap-server core-1-1:9092 --entity-type brokers --alter --add-config "leader.replication.throttled.rate=1024,follower.replication.throttled.rate=1024" --entity-name 1
./kafka-configs.sh --bootstrap-server core-1-1:9092 --entity-type brokers --alter --add-config "leader.replication.throttled.rate=1024,follower.replication.throttled.rate=1024" --entity-name 2
  1. 停止Broker 1节点进程。

  2. 删除Broker 1上的副本数据,模拟数据丢失的场景。
    rm -rf /mnt/disk2/kafka/log/test-throttled-0/

  3. 启动Broker 1节点,观察限流参数是否起作用。

  4. 待Broker 1相应的副本恢复到ISR列表后,使用kafka-configs.sh命令删除限流参数的配置。

//删除Topic上的限流参数。
./kafka-configs.sh --bootstrap-server core-1-1:9092 -entity-type topics --alter --delete-config 'leader.replication.throttled.replicas,follower.replication.throttled.replicas' --entity-name test-throttled

//删除Broker上的限流参数
./kafka-configs.sh --bootstrap-server core-1-1:9092 -entity-type brokers --alter --delete-config 'leader.replication.throttled.replicas,follower.replication.throttled.replicas,leader.replication.throttled.rate,follower.replication.throttled.rate' --entity-name 0