1. 消息分区
1.1 指定分区
Kafka 中 Topic 是对数据逻辑上的分类,而 Partition 才是数据真正存储的物理位置。所以在生产数据时,如果只是指定 Topic 的名称,其实 Kafka 时不知道将数据发送到哪一个Broker 节点的我们可以在构建数据传递 Topic 参数的同时,也可以指定数据存储的分区编号。
for ( int i = 0; i < 1; i++ ) {
ProducerRecord<String, String> record = new ProducerRecord<String, String>("test", 0, "key" + i, "value" + i);
final Future<RecordMetadata> send = producer.send(record, new Callback() {
public void onCompletion(RecordMetadata recordMetadata, Exception e) {
if ( e != null ) {
e.printStackTrace();
} else {
System.out.println("数据发送成功:" + record.key() + "," + record.value());
}
}
});
}
1.2 未指定分区
指定分区传递数据是没有任何问题的。Kafka 会进行基本简单的校验,比如是否为空,是否小于0之类的,但是你的分区是否存在就无法判断了,所以需要从 Kafka 中获取集群元数据信息,此时会因为长时间获取不到元数据信息而出现超时异常。所以如果不能确定分区编号范围的情况下,不指定分区还是一个不错的选择。
如果不指定分区,Kafka 会根据集群元数据中的主题分区来通过算法来计算分区编号并设定:
(1)如果指定了分区,直接使用
(2)如果指定了自己的分区器,通过分区器计算分区编号,如果有效,直接使用
(3)如果指定了数据Key,且使用 Key 选择分区的场合,采用 murmur2 非加密散列算法(类似于 hash)计算数据 Key 序列化后的值的散列值,然后对主题分区数量模运算取余,最后的结果就是分区编号
(4)如果未指定数据 Key,或不使用 Key 选择分区,那么 Kafka 会采用优化后的粘性分区策略进行分区选择:
没有分区数据加载状态信息时,会从分区列表中随机选择一个分区。
如果存在分区数据加载状态信息时,根据分区数据队列加载状态,通过随机数获取一个权重值
根据这个权重值在队列加载状态中进行二分查找法,查找权重值的索引值
将这个索引值加1就是当前设定的分区
增加数据后,会根据当前粘性分区中生产的数据量进行判断,是不是需要切换其他的分区。判断地标准就是大于等于批次大小(16K)的2倍,或大于一个批次大小(16K)且需要切换。如果满足条件,下一条数据就会放置到其他分区。
1.3 分区器
1.3.1 增加分区器类
首先我们需要创建一个类,然后实现 Kafka 提供的分区类接口 Partitioner,接下来重写方法。这里我们只关注 partition 方法即可,因为此方法的返回结果就是需要分区编号。
/**
* TODO 自定义分区器实现步骤:
* 1. 实现Partitioner接口
* 2. 重写方法
* partition : 返回分区编号,从0开始
* close
* configure
*/
public class KafkaPartitionerMock implements Partitioner {
/**
* 分区算法 - 根据业务自行定义即可
* @param topic The topic name
* @param key The key to partition on (or null if no key)
* @param keyBytes The serialized key to partition on( or null if no key)
* @param value The value to partition on or null
* @param valueBytes The serialized value to partition on or null
* @param cluster The current cluster metadata
* @return 分区编号,从0开始
*/
@Override
public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
return 0;
}
@Override
public void close() {
}
@Override
public void configure(Map<String, ?> configs) {
}
}
1.3.2 配置分区器
public class ProducerPartitionTest {
public static void main(String[] args) {
Map<String, Object> configMap = new HashMap<>();
configMap.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
configMap.put( ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
configMap.put( ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
configMap.put( ProducerConfig.PARTITIONER_CLASS_CONFIG, KafkaPartitionerMock.class.getName());
KafkaProducer<String, String> producer = null;
try {
producer = new KafkaProducer<>(configMap);
for ( int i = 0; i < 1; i++ ) {
ProducerRecord<String, String> record = new ProducerRecord<String, String>("test", "key" + i, "value" + i);
final Future<RecordMetadata> send = producer.send(record, new Callback() {
public void onCompletion(RecordMetadata recordMetadata, Exception e) {
if ( e != null ) {
e.printStackTrace();
} else {
System.out.println("数据发送成功:" + record.key() + "," + record.value());
}
}
});
}
} catch ( Exception e ) {
e.printStackTrace();
} finally {
if ( producer != null ) {
producer.close();
}
}
}
}
1.2 消息可靠性
对于生产者发送的数据,我们有的时候是不关心数据是否已经发送成功的,我们只要发送就可以了。在这种场景中,消息可能会因为某些故障或问题导致丢失,我们将这种情况称之为消息不可靠。虽然消息数据可能会丢失,但是在某些需要高吞吐,低可靠的系统场景中,这种方式也是可以接受的,甚至是必须的。
但是在更多的场景中,我们是需要确定数据是否已经发送成功了且 Kafka 正确接收到数据的,也就是要保证数据不丢失,这就是所谓的消息可靠性保证。
而这个确定的过程一般是通过 Kafka 给我们返回的响应确认结果我们也可以简称为 ACK 应答。根据场景,Kafka 提供了3种应答处理,可以通过配置对象进行配置。
configMap.put(ProducerConfig.ACKS_CONFIG,"-1")
1.2.1 ACK=0
当生产数据时,生产者对象将数据通过网络客户端将数据发送到网络数据流中的时候,Kafka 就对当前的数据请求进行了响应(确认应答),如果时同步发送数据,此时就可以发送下一条数据了。如果是异步发送数据,回调方法就会被触发。
通过图形,明显可以看出,这种应答方式,数据已经走网络给 Kafka 发送了,但这其实并不能保证 Kafka 能正确地接收到数据,在传输过程中如果网络出现了问题,那么数据就丢失了。也就是说这种应答确认的方式,数据的可靠性是无法保证的。不过相反,因为无需等待 Kafka 服务节点的确认,通信效率倒是比较高的,也就是系统吞吐量会非常高。
1.2.2 ACK=1
当生产数据时,Kafka Leader 副本将数据接收到并写入到了日志文件后,就会对当前的数据请求进行应答(确认应答),如果是同步发送数据,此时就可以发送下一条数据了。如果是异步发送数据,回调方法就会被触发。
通过图形,可以看出,这种应答方式,数据已经存储到分区Leader 副本中,那么数据相对来讲就比较安全了,也就是可靠性比较高。之所以说相对来讲比较安全,就是因为现在只有一个节点存储了数据,而数据并没有来得及备份到 follower 副本,那么一旦当前存储数据的 broker 节点出现了故障,数据也依然会丢失。
1.2.3 ACK=-1(默认)
当生产数据时,Kafka Leader 副本和Follower 副本都已经将数据接收到并写入到日志文件后,再对当前的数据请求进行响应(确认应答),如果时同步发送数据,此时就可以发送下一条数据了。如果是异步发送数据,回调方法就会被触发。
通过图形,可以看出,这种应答方式,数据已经存储到了分区 Leader 副本和 follow 副本中,那么数据已经非常安全了,可靠性也是最高的。此时,如果 Leader副本出现了故障,那么 follower 副本能够开始起作用,因为数据已经存储了,所以数据不会丢失。
不过这里需要注意,如果假设我们的分区有 5 个follower 副本,编号为 1,2,3,4,5
但是此时只有3个副本处于和 Leader 副本之间处于数据同步状态,那么此时分区就存在一个同步副本列表,我们称之为 In Syn Replica,简称 ISR。此时,Kafka 只要保证 ISR 中所有的4个副本接收到了数据,就可以对数据请求进行响应了。无需5个副本全部接收到数据。
1.3 消息去重 & 有序
1.3.1 数据重试
由于网络或服务节点的故障,Kafka 在传输数据时,可能会导致数据丢失,所以我们才会设置 ACK 应答机制,尽可能提高数据的可靠性。但其实在某些场景中,数据的丢失并不是真正地丢失,而是"虚假丢失",比如将ACK应答设置为1,也就是说一旦 Leader 副本将数据写入文件后,Kafka就可以对请求进行响应了。
此时,如果假设由于网络故障的原因,Kafka 并没有成功将 ACK 应答信息发送给 Producer,那么此时对于 Producer 来讲,以为 Kafka 没有收到数据,所以就会一直等待响应,一旦超过某个时间阈值,就会发生超时错误,也就是说在Kafka Producer 眼里,数据已经丢了。
所以在这种情况下,kafka Producer 会尝试对超时的请求数据进行重试(retry)操作。通过重试操作将数据再次发送给kafka。
如果此时发送成功,那么 Kafka 就又收到了数据,而这两条数据是一样的,也就是说,导致了数据的重复。
1.3.2 数据乱序
数据重试(retry)功能除了可能会导致数据重复以外,还可能会导致数据乱序。假设外面需要将编号为1,2,3的三条连续数据发送给 Kafka。每条数据会对应一个连接请求。
此时,如果第一个数据的请求出现了故障,而第二个数据和第三个数据的请求正常,那么Broker 就收到了第二个数据和第三个数据,并进行了应答。
为了保证数据的可靠性,此时,Kafka Producer 会将第一条数据重新放回到缓冲区的第一个。进行重试操作
如果重试成功,Broker 收到第一条数据,你会发现。数据的顺序已经被打乱了。
1.3.3 数据幂等性
为了解决 Kafka 传输数据时,所产生的数据重复就和乱序问题,Kafka 引入了幂等性操作,所谓的幂等性,就是 Producer 同样的一条数据,无论向 Kafka 发送多少次,Kafka 都只会存储一条。注意,这里的同样的一条数据,指的不是内容一致的数据,而是指的不断重试的数据。
默认幂等性是不起作用的,所以如果想要使用幂等性操作,只需要在生产者对象的配置中开启幂等性配置即可。
配置项 | 配置值 | 说明 |
---|---|---|
enable.idempotence | true | 开启幂等性 |
max.in.flight.requests.per.connection | 小于等于5 | 每个连接的在途请求数,不能大于5,取值范围为[1,5] |
acks | all(-1) | 确认应答,固定值,不能修改 |
retries | >0 | 重试次数,推荐使用Int最大值 |
kafka 是如何实现数据的幂等性操作呢,这里说一下流程:
(1)开启幂等性后,为了保证数据不会重复,那么就需要给每一个请求批次的数据增加唯一性标识,kafka 中,这个标识采用的是连续的序列号数字 sequencenum,但是不同的生产者 Producer 可能序列号是一样的,所以仅仅靠 seqnum 还无法唯一标记数据,所以还需要同时对生产者进行区分,所以 Kafka 采用申请生产者 ID (producerid)的方式对生产者进行区分。这样,在发送数据前,我们就需要提前申请 producerid 以及序列号 sequencenum。
(2)Broker 中会给每一个分区记录生产者的生产状态:采用队列的方式缓存最近的5个批次数据。队列中的数据按照 seqnum 进行升序排列。这里的数字5是经过压力测试,均衡空间效率和时间效率所得到的值,所以为固定值,无法配置且不能修改。
(3)如果 Broker 当前新的请求批次数据在缓存的 5个旧的批次中存在相同的,如果有相同的,那么说明有重复,当前批次数据不做任何处理。
(4)如果 Broker 当前的请求批次数据在缓存中没有相同的,那么判断当前新的请求批次的序列号是否为缓存的最后一个批次的序列号+1,如果是,说明是连续的,顺序没乱。如果不是,那么说明数据已经乱了,发生异常。
(5)Broker 根据异常返回响应,通知 Producer 进行重试。Producer 重试前,需要在缓冲区中将数据重新排序,保证正确的顺序后。再进行重试即可。
(6)如果请求批次不重复,且有序,那么更新缓冲区中的批次数据。将当前批次放置在队列的结尾,将队列的第一个移除,保证队列中缓冲的数据最多5个。
从上面的流程可以看出,Kafka 的幂等性是通过消耗时间和性能的方式提升了数据传输的有序和去重,在一些对数据敏感的业务中是十分重要的。但是通过原理,我们也能明白,这种幂等性还是有缺陷的:
- 幂等性的 producer 仅做到单分区上的幂等性,即单分区消息有序不重复,多分区无法保证幂等性。
- 只能保持生产者单个会话的幂等性,无法实现跨会话的幂等性,也就是说如果一个producer 挂掉再重启,那么重启前和重启后的 producer 对象会被当成两个独立的生产者,从而获取两个不同的生产者 ID,导致 broker 端无法获取之前的状态信息,所以无法实现跨会话的幂等。要想解决这个问题,可以采用后续的事务功能。