kafka-docker版

发布于:2025-03-10 ⋅ 阅读:(16) ⋅ 点赞:(0)

Kafka-docker版

1 概述

1.1 定义

Kafka传统定义: Kafka是一个分布式的基于发布/订阅模式的消息队列(MessageQucue),主要应用于大数据实时处理领域。它是一个开源的分布式事件流平台( Event Streaming Platform),被数千家公司用于高性能数据管道、流分析、数据集成和关键任务应用。

发布/订阅:消息的发布者不会将消息直接发送给特定的订阅者,而是将发布的消息分为不同的类别,订阅者只接收感兴趣的消息。

1.2 消息队列

目前企业中比较常见的消息队列产品主要有 Kafka、ActiveMQ 、RabbitMQ、RocketMQ 等。 在大数据场景主要采用 Kafka 作为消息队列。在 JavaEE 开发中主要采用 ActiveMQ、RabbitMQ、RocketMQ。

1、传统消息队列的应用场景

传统的消息队列的主要应用场景包括:缓存/消峰、解耦和异步通信。

(1)缓冲/消峰

有助于控制和优化数据流经过系统的速度,解决生产消息和消费消息的处理速度不一致的情况。

从上图可以看出:消息发送端的流量10亿QPS,而服务器的处理能力只有1000万QPS,这样就造成服务器不堪重负‌,从而压垮服务器。如果采用消息队列(下图),消息的发布者不会将消息直接发送给特定的订阅者(服务器),而是将消息发布到消息队列,订阅者根据自身的处理情况,按照自己节奏来接收信息,避免了压垮消费者的情况。

(2)解耦

允许你独立的扩展或修改两边的处理过程,只要确保它们遵守同样的接口约束。

当服务A调用服务B时,服务A只需要将消息发送到消息队列,从消息队列中返回结果,不需要直接调用服务B,由消息队列去和服务B进行通信。这样如果服务B发生了故障,不影响服务A的正常运行。

3、异步通信

允许用户把一个消息放入队列,但并不立即处理它,然后在需要的时候再去处理它们。

2、消息队列的两种模式

(1)点对点模式

消费者主动拉取数据,消息收到后清除消息。

(2)发布/订阅模式

可以有多个topic主题(浏览、点赞、收藏、评论等)

  • 消费者消费数据之后,不删除数据。
  • 每个消费者相互独立,都可以消费到数据。

1.3 Kafka 基础架构

(1)Producer:消息生产者,就是向 Kafka broker 发消息的客户端。

(2)Consumer:消息消费者,向 Kafka broker 获取消息的客户端。

(3)Consumer Group(CG):消费者组,由多个 consumer 组成。消费者组内每个消费者负责消费不同分区的数据,一个分区只能由一个组内消费者消费;消费者组之间互不影响。所有的消费者都属于某个消费者组,即消费者组是逻辑上的一个订阅者。

(4)Broker:一台 Kafka 服务器就是一个 broker。一个集群由多个 broker 组成。一个 broker 可以容纳多个 topic。

(5)Topic:主题。即消息的分类,生产者和消费者面向的都是一个 topic。

(6)Partition:为了实现扩展性,一个非常大的 topic 可以分布到多个 broker(即服务器)上,一个 topic 可以分为多个 partition,每个 partition 是一个有序的队列。

(7)Replica:副本。一个 topic 的每个分区都有若干个副本,一个 Leader 和若干个Follower。

(8)Leader:每个分区多个副本的“主”,生产者发送数据的对象,以及消费者消费数据的对象都是 Leader。

(9)Follower:每个分区多个副本中的“从”,实时从 Leader 中同步数据,保持和Leader 数据的同步。Leader 发生故障时,某个 Follower 会成为新的 Leader。

2 docker部署Kafka

2.1 安装准备

1、集群规划

部署3台broker:

zookeeper集群
(已部署)
zk1宿主机ip:端口 zk2宿主机ip:端口 zk3宿主机ip:端口
192.168.10.101:2181 192.168.10.102:2181 192.168.10.103:2183
192.168.100.104:2181 192.168.100.104:2182 192.168.100.104:2183
kafka集群 kf1宿主机ip:端口 kf2宿主机ip:端口 宿主机ip:端口
192.168.10.201:9092 192.168.10.202:9092 192.168.10.203:9092
192.168.100.104:9022 192.168.100.104:9093 192.168.100.104:9094

2、下载镜像

[root@hadoop104 ~]# docker pull wurstmeister/kafka

2.2 安装

1、安装第1个broker

[root@hadoop104 local]# docker run -it -d --restart always --name kf1 –network zk_net --ip 192.168.10.201 -p 9092:9092 -p 19999:9999 -e KAFKA_BROKER_ID=1 --env KAFKA_ZOOKEEPER_CONNECT=192.168.100.104:2181,192.168.100.104:2182,192.168.100.104:2183 --env KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://192.168.100.104:9092 --env KAFKA_LISTENERS=PLAINTEXT://0.0.0.0:9092 wurstmeister/kafka

参数说明:

–env KAFKA_ZOOKEEPER_CONNECT=192.168.100.104:2181,192.168.100.104:2182,192.168.100.104:2183: 设置环境变量,指定ZooKeeper的连接字符串。使用宿主机ip:端口,提供给外网使用。

–env KAFKA_ADVERTISED_LISTENERS=PLAINTEXT:// 192.168.100.104:9092: 设置环境变量,指定Kafka的advertised listeners(消息监听)。使用宿主机ip:端口,提供给外网使用。

–env KAFKA_LISTENERS=PLAINTEXT://0.0.0.0:9092: 设置环境变量,指定Kafka的listeners。

2、安装第2个broker

[root@hadoop104 local]# docker run -it -d --restart always --name kf2 --network zk_net --ip 192.168.10.202 -p 9093:9092 -p 29999:9999 -e KAFKA_BROKER_ID=2 --env KAFKA_ZOOKEEPER_CONNECT=192.168.100.104:2181,192.168.100.104:2182,192.168.100.104:2183 --env KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://192.168.100.104:9093 --env KAFKA_LISTENERS=PLAINTEXT://0.0.0.0:9092 wurstmeister/kafka

3、安装第3个broker

[root@hadoop104 local]# docker run -it -d --restart always --name kf3 --network zk_net --ip 192.168.10.203 -p 9094:9092 -p 39999:9999 -e KAFKA_BROKER_ID=3 --env KAFKA_ZOOKEEPER_CONNECT=192.168.100.104:2181,192.168.100.104:2182,192.168.100.104:2183 --env KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://192.168.100.104:9094 --env KAFKA_LISTENERS=PLAINTEXT://0.0.0.0:9092 wurstmeister/kafka

4、测试

(1)进入第1个broker

[root@hadoop104 local]# docker exec -it kf1 /bin/bash

创建主题first

root@14b37abf58ad:/# kafka-topics.sh --bootstrap-server 192.168.100.104:9092 --create --partitions 1 --replication-factor 3 -topic first

Created topic first.

(2)进入第2个broker

[root@hadoop104 local]# docker exec -it kf2 /bin/bash

创建主题second

root@3adde69a4342:/# kafka-topics.sh --bootstrap-server 192.168.100.104:9092 --create --partitions 1 --replication-factor 3 -topic second

Created topic second.

(3)进入第3个broker

[root@hadoop104 local]# docker exec -it kf3 /bin/bash

创建主题three

root@0c2c89bbfdd0:/# kafka-topics.sh --bootstrap-server 192.168.100.104:9092 --create --partitions 1 --replication-factor 3 -topic three

Created topic three.

查看所有主题

root@0c2c89bbfdd0:/# kafka-topics.sh --bootstrap-server 192.168.100.104:9092 --list

first

second

three

kafka集群部署成功。

3 Kafka操作

3.1 主题命令行操作

1、查看操作主题命令参数

kafka-topics.sh --help

参数详解:

参数 描述
–bootstrap-server <String: server toconnect to> 连接的 Kafka Broker 主机名称和端口号。
–topic <String: topic> 操作的 topic 名称。
–create 创建主题。
–delete 删除主题。
–alter 修改主题。
–list 查看所有主题。
–describe 查看主题详细描述。
–partitions <Integer: # of partitions> 设置分区数。
–replication-factor<Integer: replication factor> 设置分区副本。
–config <String: name=value> 更新系统默认的配置。

2、操作实例

(1)查看当前服务器中的所有 topic

root@14b37abf58ad:/# kafka-topics.sh --bootstrap-server 192.168.100.104:9092 --list

(2)创建 topic test01

root@14b37abf58ad:/# kafka-topics.sh --bootstrap-server 192.168.100.104:9092 root@14b37abf58ad:/# kafka-topics.sh --bootstrap-server 192.168.100.104:9092 --create --partitions 1 --replication-factor 3 -topic test01

选项说明:

–topic 定义 topic 名

–replication-factor 定义副本数

–partitions 定义分区数

(3)查看 test01主题的详情

root@14b37abf58ad:/# kafka-topics.sh --bootstrap-server 192.168.100.104:9092 --describe --topic test01

(4)修改test01主题分区数(注意:分区数只能增加,不能减少)

root@14b37abf58ad:/# kafka-topics.sh --bootstrap-server 192.168.100.104:9092 --alter --topic test01 --partitions 3

再次查看 test01 主题的详情

(5)删除 topic test01

root@14b37abf58ad:/# kafka-topics.sh --bootstrap-server 192.168.100.104:9092 --delete --topic test01

3.2 生产者命令行操作

1、查看操作生产者命令参数

kafka-console-producer.sh --help

参数详解:

参数 描述
–bootstrap-server <String: server toconnect to> 连接的 Kafka Broker 主机名称和端口号。
–topic <String: topic> 操作的 topic 名称。

2、发送消息

root@14b37abf58ad:/# kafka-console-producer.sh --bootstrap-server 192.168.100.104:9092 --topic first

hello world

hello kafka

3.3 消费者命令行操作

1、查看操作消费者命令参数

kafka-console-consumer.sh --help

参数详解:

参数 描述
–bootstrap-server <String: server toconnect to> 连接的 Kafka Broker 主机名称和端口号。
–topic <String: topic> 操作的 topic 名称。
–from-beginning 从头开始消费。
–group <String: consumer group id> 指定消费者组名称。

2、消费消息

重开1个或2个Linux终端,分别启动kf1,kf2容器终端,进行消费,kf1容器不断发送消息,查看消费者的接收情况。

[root@hadoop104 ~]# docker exec -it kf2 /bin/bash

root@3adde69a4342:/#

(1)消费 first 主题中的数据。

root@3adde69a4342:/# kafka-console-consumer.sh --bootstrap-server 192.168.100.104:9092 --topic first

Hello a

Hello b

(2)把主题中所有的数据都读取出来(包括历史数据)。

root@3adde69a4342:/# kafka-console-consumer.sh --bootstrap-server 192.168.100.104:9092 --topic first --from-beginning

Hello World

Hello Kafka

Hello a

Hello b

4 Kafka 生产者

4.1 生产者消息发送流程

1、发送原理

在消息发送的过程中,涉及到了两个线程——main 线程和 Sender 线程。在 main 线程中创建了一个双端队列 RecordAccumulator。main 线程将消息发送给 RecordAccumulator, Sender 线程不断从 RecordAccumulator 中拉取消息发送到 Kafka Broker。

batch.size: 只有数据积累到batch.size之后,sender才会发送数据。默认16k。

linger.ms:如果数据迟迟未达到batch.size, sender等待linger.ms设置的时间到了之后就会发送数据。单位ms,默认值是0ms,表示没有延迟。

2、应答acks原理:

0: 生产者发送过来的数据,不需要等数据落盘应答。生产者发送了信息之后,不要等待kafka回复,如果kafka上的节点挂了后,这个消息就丢失了。

1: 生产者发送过来的数据,Leader收到数据后应答。可能leader收到消息后,还没同步leader挂了,消息丢失了。因为新的Leader不会收到消息,生产者已经认为发送成功了。

-1 (all):生产者发送过来的数据,Leader和ISR队列里面的所有节点收齐数据后应答。-1和all等价。ISR队列是指有Leader保持同步的Follower+Leader集合。

3、生产者重要参数列表

参数名称 描述
bootstrap.servers 生产者连接集群所需的 broker 地址清单。例如 192.168.100.104:9092。可以设置 1 个或者多个,中间用逗号隔开。注意这里并非需要所有的 broker 地址,因为生产者从给定的 broker 里查找到其他 broker 信息。
key.serializer 和 value.serializer 指定发送消息的 key 和 value 的序列化类型。一定要写全类名。
buffer.memory RecordAccumulator 缓冲区总大小,默认 32m。
batch.size 缓冲区一批数据最大值,默认 16k。适当增加该值,可以提高吞吐量,但是如果该值设置太大,会导致数据传输延迟增加。
linger.ms 如果数据迟迟未达到 batch.size,sender 等待 linger.time 之后就会发送数据。单位 ms,默认值是 0ms,表示没有延迟。生产环境建议该值大小为 5-100ms 之间。
acks 0:生产者发送过来的数据,不需要等数据落盘应答。
1:生产者发送过来的数据,Leader 收到数据后应答。
-1(all):生产者发送过来的数据,Leader+和 isr 队列里面的所有节点收齐数据后应答。默认值是-1,-1 和 all 是等价的。
max.in.flight.requests.per.connection 允许最多没有返回 ack 的次数,默认为 5,开启幂等性要保证该值是 1-5 的数字。 幂等性即1n=1。
retries 当消息发送出现错误的时候,系统会重发消息。retries表示重试次数。默认是 int 最大值,2147483647。 如果设置了重试,还想保证消息的有序性,需要设置 MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION=1 否则在重试此失败消息的时候,其他的消息可能发送成功了。
retry.backoff.ms 两次重试之间的时间间隔,默认是 100ms。
enable.idempotence 是否开启幂等性,默认 true,开启幂等性。
compression.type 生产者发送的所有数据的压缩方式。默认是 none,也就是不压缩。
支持压缩类型:none、gzip、snappy、lz4 和 zstd。

4.2 异步发送 API

异步发送意味着消息将被发送到Kafka,而不需要等待Kafka的确认。这通常提供了最佳的性能,但也意味着如果发送过程中发生错误,你可能无法立即知道。在实际应用中,异步发送适合你不需要立即知道消息是否成功发送的情况。

1、不带回调函数的异步发送

不带回调的异步发送,意味着如果发送过程中发生错误,你可能无法立即知道。

【例4.2-01】编写不带回调函数的 API 代码。

需求:创建 Kafka 生产者,采用异步的方式发送到 Kafka Broker。

(1)创建工程,导入依赖

 <dependency> 

      <groupId>org.apache.kafka</groupId> 

      <artifactId>kafka-clients</artifactId> 

      <version>3.0.0</version> 

 </dependency>

<dependency>

  <groupId>junit</groupId>

  <artifactId>junit</artifactId>

  <version>4.12</version>

  <scope>test</scope>

</dependency> 

(2)编写kafka配置类KafkaConfig

import org.apache.kafka.clients.producer.ProducerConfig;

import org.apache.kafka.common.serialization.StringSerializer;

import java.util.Properties;

public class KafkaConfig {

// 创建kafka配置对象

public  static Properties properties = new Properties();

static {

    // 给kafka配置对象添加配置信息:bootstrap.servers

    properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,

            "192.168.100.104:9092");

}

/**

 * key,value序列化

 */

public static void  serializer(){

    // key,value序列化(必须):key.serializer,value.serializer

    properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,

            StringSerializer.class.getName());

    properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,

            StringSerializer.class.getName());

}

}

(3)编写生产者类CustomProducer

import org.apache.kafka.clients.producer.KafkaProducer;

import org.apache.kafka.clients.producer.ProducerRecord;

public class CustomProducer {

/**

 * 不带回调的异步发送

 * @param topic

 * @param msg

 */

public void AsyncSendMsg(String topic,String msg) {

    KafkaProducer<String,String> kafkaProducer = null;

    try {

        // 1. key-value序列化

        KafkaConfig.serializer();

        // 2. 创建kafka生产者对象

        kafkaProducer = new KafkaProducer<String, String>(KafkaConfig.properties);

        // 3. 调用send方法,发送消息

        ProducerRecord<String, String> message = new ProducerRecord<>(topic, msg);

        kafkaProducer.send(message);

    } catch (Exception e) {

        throw new RuntimeException(e);

    } finally {

        // 4. 关闭资源

        kafkaProducer.close();

    }

}

}

(4)测试

①开启 Kafka 消费者。

root@3adde69a4342:/# kafka-console-consumer.sh --bootstrap-server 192.168.100.104:9092 --topic first

②编写测试类CustomProducerTest

public class CustomProducerTest {

CustomProducer customProducer = new CustomProducer();

@Test

public void test1(){

    customProducer. AsyncSendMsg("first","Hello IDE");

}

}

③执行程序,观察消费者终端控制台中是否接收到消息:

Hello IDE

2、带回调函数的异步发送

回调函数会在 producer 收到 ack 时调用,为异步调用,该方法有两个参数,分别是元数据信息(RecordMetadata)和异常信息(Exception),如果 Exception 为 null,说明消息发送成功,如果 Exception 不为 null,说明消息发送失败。

注意:消息发送失败会自动重试,不需要我们在回调函数中手动重试。

【例4.2-02】编写带回调函数的异步发送 API代码

需求:创建 Kafka 生产者,采用异步的方式发送到 Kafka Broker,发送成功时返回topic的分区信息,否则抛出异常。

(1)在CustomProducer类添加异常发送方法

/**

 * 带回调的异步发送

 * @param topic

 * @param msg

 */

public void AsyncSendMsgCallback(String topic,String msg) {

    KafkaProducer<String, String> kafkaProducer = null;

    try {

        // 1. key-value序列化

        KafkaConfig.serializer();

        // 2. 创建kafka生产者对象

        kafkaProducer = new KafkaProducer<String,String>(KafkaConfig.properties);

        // 3. 调用send方法,发送消息

        ProducerRecord<String, String> message = new ProducerRecord<>(topic, msg);

        // 添加回调 new Callback(),重写onCompletion方法,该方法在Producer收到ack时调用,为异步调用

        kafkaProducer.send(message, new Callback() {

            @Override

            public void onCompletion(RecordMetadata recordMetadata,Exception e) {

                if (e == null) {

                    // 没有异常,输出信息到控制台

                    System.out.println(" 主 题 : " +

                            recordMetadata.topic() + "->"  + "分区:" + recordMetadata.partition());

                } else {

                    // 出现异常打印

                    e.printStackTrace();

                }

            }

        });

    } catch (Exception e) {

        throw new RuntimeException(e);

    } finally {

        // 4. 关闭资源

        kafkaProducer.close();

    }

}

/**

 * 带回调的异步发送

 * @param topic

 * @param msg

 * @param callback 回调函数对象

 */

public void AsyncSendMsgCallback(String topic,String msg,Callback callback) {

    KafkaProducer<String, String> kafkaProducer = null;

    try {

        // 1. key-value序列化

        KafkaConfig.serializer();

        // 2. 创建kafka生产者对象

        kafkaProducer = new KafkaProducer<String,String>(KafkaConfig.properties);

        // 3. 调用send方法,发送消息

        ProducerRecord<String, String> message = new ProducerRecord<>(topic, msg);

        // 添加回调 new Callback(),重写onCompletion方法,该方法在Producer收到ack时调用,为异步调用

        kafkaProducer.send(message,callback);

    } catch (Exception e) {

        throw new RuntimeException(e);

    } finally {

        // 4. 关闭资源

        kafkaProducer.close();

    }

}

(2)测试

①修改主题first为3个分区

root@3adde69a4342:/# kafka-topics.sh --bootstrap-server 192.168.100.104:9092 --alter --topic first --partitions 3

②在测试类CustomProducerTest添加测试方法

@Test

public void test2() throws InterruptedException {

    String[] msgs = {"Hello World","Hello Hadoop","Hello Hive","Hello Flume","Hello Kafka"};

    for (String msg : msgs){

        customProducer.AsyncSendMsgCallback("first",msg);

        // 延迟一会会看到数据发往不同分区

        Thread.sleep(500);

    }

}

程序执行的结果:

主 题 : first->分区:2

主 题 : first->分区:1

主 题 : first->分区:1

主 题 : first->分区:0

主 题 : first->分区:0

③观察消费者终端控制台中是否接收到消息。

④测试单独处理回调,执行下面的测试方法,执行结果与②相同

@Test

public void test3() throws InterruptedException {

    String[] msgs = {"Hello World","Hello Hadoop","Hello Hive","Hello Flume","Hello Kafka"};

    for (String msg : msgs){

        customProducer.AsyncSendMsgCallback("first",msg,new MyCallback());

        // 延迟一会会看到数据发往不同分区

        Thread.sleep(500);

    }

}

/**

 * 回调处理类

 */

class MyCallback implements Callback{

    @Override

    public void onCompletion(RecordMetadata recordMetadata, Exception e) {

        if (e == null){

            System.out.println(" 主 题 : " +

                    recordMetadata.topic() + "->"  + "分区:" + recordMetadata.partition());

        }else{

            e.printStackTrace();

        }

    }

}

4.3 同步发送 API

同步发送意味着消息将被发送到Kafka,并且需要等待来自Kafka的确认。如果发生错误,它将抛出异常。在实际应用中,同步发送适合你需要确保消息被成功处理的情况。

在编码时,只需在异步发送的基础上,再调用一下 get()方法即可。

4.4 生产者分区

1、分区好处

(1)便于合理使用存储资源,每个Partition在一个Broker上存储,可以把海量的数据按照分区切割成一块一块数据存储在多台Broker上。合理控制分区的任务,可以实现负载均衡的效果。

(2)提高并行度,生产者可以以分区为单位发送数据;消费者可以以分区为单位进行消费数据。

2、生产者发送消息的分区策略

(1)默认的分区器 DefaultPartitioner

默认的分区器提供了生成发送消息的原始方法:

①指明partition的情况下,直接将指明的值作为partition值。

例如,partition=0,所有数据写入分区0。

public ProducerRecord(String topic, Integer partition, Long timestamp, K key, V value, Iterable

headers)

public ProducerRecord(String topic, Integer partition, Long timestamp, K key, V value)

public ProducerRecord(String topic, Integer partition, K key, V value, Iterable

headers)

public ProducerRecord(String topic, Integer partition, K key, V value)

②没有指明partition值但有key的情况下,将key的hash值与topic的 partition数进行取余得到partition值。

例如,key1的hash值=5, key2的hash值=6 ,topic的partition数=2,那 么key1 对应的value1写入1号分区,key2对应的value2写入0号分区。

public ProducerRecord(String topic, K key, V value)

③既没有partition值又没有key值的情况下,Kafka采用Sticky Partition(黏性分区器),会随机选择一个分区,并尽可能一直使用该分区,待该分区的batch已满或者已完成,Kafka再随机一个分区进行使用(和上一次的分区不同)。

例如,第一次随机选择0号分区,等0号分区当前批次满了(默认16k)或者linger.ms设置的时间到, Kafka再随机一个分区进行使用(如果还是0会继续随机)。

public ProducerRecord(String topic, V value)

(2)自定义分区器

研发人员可以根据企业需求,自己重新实现分区器。需要自定义分区类,实现 Partitioner 接口,并重写 partition()方法。见【例4.4-03】

【例4.4-01】将数据发往指定 partition 的情况下,例如,将所有数据发往分区 1 中。

(1)在CustomProducer类中添加重载方法

/**

 * 发送消息到指定分区

 * @param topic

 * @param key

 * @param partition

 * @param msg

 */

public void AsyncSendMsgCallback(String topic,String key,Integer partition,String msg) {

    KafkaProducer<String,String> kafkaProducer = null;

    try {

        // 1. key-value序列化

        KafkaConfig.serializer();

        // 2. 创建kafka生产者对象

        kafkaProducer = new KafkaProducer<String,String>(KafkaConfig.properties);

        // 3. 调用send方法,发送消息

        ProducerRecord<String, String> message = new ProducerRecord<>(topic, partition, key, msg);

        // 3.1  添加回调 new Callback()

        kafkaProducer.send(message,new Callback() {

            // 3.2 重写onCompletion方法,该方法在Producer收到ack时调用,为异步调用

            @Override

            public void onCompletion(RecordMetadata recordMetadata, Exception e) {

                if (e == null) {

                    // 没有异常,输出信息到控制台

                    System.out.println(" 主 题 : " +

                            recordMetadata.topic() + "->"  + "分区:" + recordMetadata.partition());

                } else {

                    // 出现异常打印

                    e.printStackTrace();

                }

            }

        });

    } catch (Exception e) {

        throw new RuntimeException(e);

    } finally {

        // 4. 关闭资源

        kafkaProducer.close();

    }

}

(2)测试,查看回调信息

@Test

public void test4() throws InterruptedException {

    String[] msgs =  {"Hello World","Hello Hadoop","Hello Hive","Hello Flume","Hello Kafka"};

    for (String msg : msgs){

        customProducer.AsyncSendMsgCallback("first","test",1,msg);

        // 延迟一会会看到数据发往不同分区

        Thread.sleep(500);

    }

}

程序执行结果:

主 题 : first->分区:1

主 题 : first->分区:1

主 题 : first->分区:1

主 题 : first->分区:1

主 题 : first->分区:1

【例4.4-02】没有指明 partition 值但有 key 的情况下,将 key 的 hash 值与 topic 的 partition 数进行取余得到 partition 值。

(1)在CustomProducer类中添加重载方法

/**

 * 发送数据到key分区

 * @param topic

 * @param key

 * @param msg

 */

public void AsyncSendMsgCallback(String topic,String key,String msg) {

    KafkaProducer<String,String> kafkaProducer = null;

    try {

        // 1. key-value序列化

        KafkaConfig.serializer();

        // 2. 创建kafka生产者对象

        kafkaProducer = new KafkaProducer<String,String>(KafkaConfig.properties);

        // 3. 调用send方法,发送消息

        ProducerRecord<String, String> message = new ProducerRecord<>(topic, key, msg);

        // 3.1  添加回调 new Callback()

        kafkaProducer.send(message, new Callback() {

            // 3.2 重写onCompletion方法,该方法在Producer收到ack时调用,为异步调用

            @Override

            public void onCompletion(RecordMetadata recordMetadata,Exception e) {

                if (e == null) {

                    // 没有异常,输出信息到控制台

                    System.out.println(" 主 题 : " +

                            recordMetadata.topic() + "->"  + "分区:" + recordMetadata.partition());

                } else {

                    // 出现异常打印

                    e.printStackTrace();

                }

            }

        });

    } catch (Exception e) {

        throw new RuntimeException(e);

    } finally {

        // 4. 关闭资源

        kafkaProducer.close();

    }

}

(2)测试,查看回调信息

@Test

public void test5() throws InterruptedException {

    Map <String,String> msgs = new HashMap<>();

    msgs.put("w","Hello Hadoop");

    msgs.put("v","Hello Hive");

    msgs.put("f","Hello Flume");

    msgs.put("k","Hello Kafka");

    for (String key : msgs.keySet()){

        customProducer.AsyncSendMsgCallback("first",key,msgs.get(key));

        // 延迟一会会看到数据发往不同分区

        Thread.sleep(500);

    }

}

程序执行结果:

主 题 : first->分区:2

主 题 : first->分区:0

主 题 : first->分区:0

主 题 : first->分区:2

【例4.4-03】自定义一个分区器,假如接收的是手机号码,将138和139开头的手机号分别存放到主题tel的0号和1号分区,其它的存放到2号分区。

(1)创建有3个分区的主题tel

root@14b37abf58ad:/# kafka-topics.sh --bootstrap-server 192.168.100.104:9092 --create --partitions 3 --replication-factor 3 -topic tel

(2)定义类TelPartitioner实现 Partitioner 接口。 需要重写 partition()方法

/**

    1. 实现接口Partitioner
    1. 实现3个方法:partition,close,configure
    1. 编写partition方法,返回分区号

*/

public class TelPartitioner implements Partitioner {

/**

 返回信息对应的分区

 @param topic         主题

 @param key           消息的key

 @param keyBytes      消息的key序列化后的字节数组

 @param value         消息的value

 @param valueBytes    消息的value序列化后的字节数组

 @param cluster       集群元数据可以查看分区信息

 @return

 */

@Override

public int partition(String topic,Object key,byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {

    // 获取消息

    String msgValue = value.toString();

    // 创建partition

    int partition;

    // 判断消息是的前3位

    if ("138".equals(msgValue.substring(0,3))){

        partition = 0;

    }else if("139".equals(msgValue.substring(0,3))){

        partition = 1;

    }else {

        partition = 2;

    }

    // 返回分区号

    return partition;

}

// 关闭资源

@Override

public void close() {

}

// 配置方法

@Override

public void configure(Map<String, ?> configs) {

}

}

(3)在CustomProducer类中添加重载方法,在生产者的配置中添加分区器参数

/**

 * 发送消息到自定义分区

 * @param topic

 * @param partitioner

 * @param msg

 */

public void AsyncSendMsgAndCallback(String topic, Partitioner partitioner,String msg) {

    KafkaProducer<String, String> kafkaProducer = null;

    try {

// 0. 添加自定义分区器

        KafkaConfig.properties.put(ProducerConfig.PARTITIONER_CLASS_CONFIG,partitioner.getClass().getName());

        // 1. key-value序列化

        KafkaConfig.serializer();

        // 2. 创建kafka生产者对象

        kafkaProducer = new KafkaProducer<String,String>(KafkaConfig.properties);

        // 3. 调用send方法,发送消息

        ProducerRecord<String, String> message = new ProducerRecord<>(topic, msg);

        // 3.1  添加回调 new Callback()

        kafkaProducer.send(message,new Callback() {

            // 3.2 重写onCompletion方法,该方法在Producer收到ack时调用,为异步调用

            @Override

            public void onCompletion(RecordMetadata recordMetadata, Exception e) {

                if (e == null) {

                    // 没有异常,输出信息到控制台

                    System.out.println(" 主 题 : " +

                            recordMetadata.topic() + "->"  + "分区:" + recordMetadata.partition());

                } else {

                    // 出现异常打印

                    e.printStackTrace();

                }

            }

        });

    } catch (Exception e) {

        throw new RuntimeException(e);

    } finally {

        // 4. 关闭资源

        kafkaProducer.close();

    }

}

(4)编写并执行测试类,在控制台观察回调信息

@Test

public void test6() throws InterruptedException {

    String[] msgs =  {"13857849003","13978493009","15089233993","17179102889","13879102889"};

    // 自定义分区器

    Partitioner partitioner = new TelPartitioner();

    for (String msg : msgs){

        customProducer.AsyncSendMsgAndCallback("tel",partitioner,msg);

        // 延迟一会会看到数据发往不同分区

        Thread.sleep(500);

    }

}

程序的执行结果:

主 题 : tel->分区:0

主 题 : tel->分区:1

主 题 : tel->分区:2

主 题 : tel->分区:2

主 题 : tel->分区:0

4.5 生产经验

1、如何提高吞吐量

Kafka的吞吐量可以通过多种参数进行调整以提高。以下是一些关键参数及其说明:

(1)num.partitions: 增加分区数可以并行化处理,从而提高吞吐量。

(2)replication.factor: 增加副本数可以提供高可用性,但也会增加吞吐量,因为Kafka可以在副本之间负载均衡。

(3)message.max.bytes: 增加最大消息大小可以处理大型消息。

(4)batch.size: 增加批处理大小可以减少网络请求的次数,从而提高吞吐量。

(5)linger.ms: 延迟发送以填满更多消息,从而提高批处理效率。

(6)max.request.size: 增加最大请求大小可以处理大型批次。

(7)receive.buffer.bytes 和 send.buffer.bytes: 增加网络接收和发送缓冲区的大小可以提高吞吐量。

调整Kafka配置参数通常在server.properties文件中进行。你可以根据你的应用需求进行调整。

在KafkaConfig类中添加配置属性方法:

/**

 *  设置批次大小

 * @param batchSize 批次大小,默认16K

 */

public static void setBatchSize(Long batchSize) {

    properties.put(ProducerConfig.BATCH_SIZE_CONFIG,batchSize);

}

/**

 * 设置等待时间

 * @param lingerMs 等待时间,默认 0

 */

public static void setLingerMs(Long lingerMs) {

    properties.put(ProducerConfig.LINGER_MS_CONFIG,lingerMs);

}

/**

 * 设置RecordAccumulator缓冲区大小

 * @param bufferMemory 缓冲区大小,默认 32M

 */

public static void setBufferMemory(Long bufferMemory) {

    properties.put(ProducerConfig.BUFFER_MEMORY_CONFIG,bufferMemory);

}

/**

 * 设置compression.type:压缩

 * @param compressionType 压缩类型,默认 none,可配置值 gzip、snappy、lz4 和 zstd

 */

public static void setCompressionType(String compressionType) {

    properties.put(ProducerConfig.COMPRESSION_TYPE_CONFIG,compressionType);

}

2、 如何提高数据可靠性

回顾生产者消息发布流程,其应答级别分为0,1,-1,如下图所示:

(1)acks=0,生产者发送过来数据就不管了,可靠性差,效率高。

(2)acks=1,生产者发送过来数据Leader应答,可靠性中等,效率中等。

(3)acks=-1,生产者发送过来数据Leader和ISR队列里面所有Follwer应答,可靠性高,效率低。

在生产环境中,acks= 0很少使用,acks=1一般用于传输普通日志,允许丢个别数据, acks=-1一般用于传输高可靠性(例如钱)相关的数据,对可靠性要求比较高的场景。

根据生产经验,数据数据完全可靠条件如下:

数据完全可靠条件 = ACK级别=-1 + 分区副本>=2 + ISR里应答的最小副本数量>=2

在KafkaConfig类中添加配置属性方法:

/**

 * 设置 acks

 * @param level ack级别:0,1,-1

 */

public static void setAcks(String level) {

    properties.put(ProducerConfig.ACKS_CONFIG, level);

}

/**

 * 设置重试次数 retries

 * @param times 重试次数,默认是 int 最大值,2147483647

 */

public static void setRetries(int times) {

    properties.put(ProducerConfig.RETRIES_CONFIG, times);

}

【思考一】Leader收到数据,所有Follower都开始同步数据,但有一个Follower因为某种故障,迟迟不能与Leader进行同步,那这个问题怎么解决呢?

Leader维护了一个动态的in-sync replica set(ISR),意为和Leader保持同步的Follower+Leader集合(leader:0,isr:0,1,2)。 如果Follower长时间未向Leader发送通信请求或同步数据,则该Follower将被踢出ISR。该时间阈值由replica.lag.time.max.ms参数设定,默认30s。例如2超时,(leader:0, isr:0,1)。 这样就不用等长期联系不上或者已经故障的节点。

【思考二】如果分区副本设置为1个,或 者ISR里应答的最小副本数量 ( min.insync.replicas 默认为1)设置为1,和ack=1的效果是一样的,那么数据是不是就完全可靠了呢?

仍然有丢数的风险(leader:0,isr:0)。

3、数据去重

(1)数据传递语义

①至少一次(At Least Once)= ACK级别设置为-1 + 分区副本大于等于2 + ISR里应答的最小副本数量大于等于2 。可以保证数据不丢失,但是不能保证数据不重复。

②最多一次(At Most Once)= ACK级别设置为0。可以保证数据不重复,但是不能保证数据不丢失。

③精确一次(Exactly Once) = 幂等性 + 至少一次。对于一些非常重要的信息,比如和钱相关的数据,要求数据既不能重复也不丢失。

(2)幂等性

幂等性就是指Producer不论向Broker发送多少次重复数据,Broker端都只会持久化一条,保证了不重复。 即1n=1(n为次数)

重复数据的判断标准:具有相同主键(key)的消息提交时,Broker只会持久化一条。数据封装成 (pid,partition,seqnumber)。

其中:

PID是Kafka每次重启都会分配一个新的序号。

Partition 表示分区号。

Sequence Number是单调自增的。

所以幂等性只能保证的是在单分区单会话内不重复。 因为生产者producer写到kafka可能会出现消息重复,比如设置ack=all,写入到kafka的leader时,leader挂掉了,没有及时反馈ack,导致生产者再次发送消息就会出现重复消息落盘。这种情况可以设置kafka的属性用来开启幂等。但是这种幂等只能保证 producer没有挂掉的情况下,因为幂等的原理是 kafka缓存了一份 pid,partition,seqnumber 的数据,如果命中则说明之前缓存了,但是如果producer挂掉了重启后,它的pid就会变化,partition也有可能变化,就会导致消息会出现重复状况。所以kafka 0.11版本加入了事务机制(见6.2)。

如何使用幂等性

开启参数 enable.idempotence 默认为 true,false 关闭。

在KafkaConfig类中添加配置属性方法:

/**

 * 开关幂等性

 * @param opened

 */

public static void setIdempotence(Boolean opened){

    properties.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG,opened);

}

【例4.5-01】实现发送消息精确一次和去重。

(1)在CustomProducer类中添加重载方法

/**

 * 发送精确一次+去重消息

 * @param topic

 * @param key

 * @param msg

 */

public void ExactlyOnceSend (String topic,String key,String msg){

    KafkaProducer<String, String> kafkaProducer = null;

    try {

        /*

         0.精确一次 = 幂等性 + 至少一次

         至少一次 = ACK级别设置为-1 + 分区副本大于等于2 + ISR里应答的最小副本数量大于等于2

         */

        // 0.1 设置幂等性,默认为true

        KafkaConfig.setIdempotence(true);

        // 0.2 设置ACK级别

        KafkaConfig.setAcks("-1");

        // 1 key-value序列化

        KafkaConfig.serializer();

        // 2. 创建kafka生产者对象

        kafkaProducer = new KafkaProducer<String, String>(KafkaConfig.properties);

        // 3. 调用send方法,发送消息

        ProducerRecord<String, String> message = new ProducerRecord<>(topic,key,msg);

        // 3.1  添加回调 new Callback()

        kafkaProducer.send(message, new Callback() {

            // 3.2 重写onCompletion方法,该方法在Producer收到ack时调用,为异步调用

            @Override

            public void onCompletion(RecordMetadata recordMetadata, Exception e) {

                if (e == null) {

                    // 没有异常,输出信息到控制台

                    System.out.println(" 主 题 : " +

                            recordMetadata.topic() + "->" + "分区:" + recordMetadata.partition());

                } else {

                    // 出现异常打印

                    e.printStackTrace();

                }

            }

        });

    } catch (Exception e) {

        //关闭资源

        kafkaProducer.close();

        throw new RuntimeException(e);

    }

}

(2)测试,查看运行结果:

@Test

public void test7() throws InterruptedException {

    Map <String,String> msgs = new HashMap<>();

    msgs.put("w","Hello Hadoop");

    msgs.put("v","Hello Hive");

    msgs.put("f","Hello Flume");

    msgs.put("k","Hello Kafka");

    msgs.put("f","Hello Flume");

    msgs.put("k","Hello Kafka");

    for (String key : msgs.keySet()){

        customProducer. ExactlyOnceSend ("first",key,msgs.get(key));

        // 延迟一会会看到数据发往不同分区

        Thread.sleep(500);

    }

}

程序执行结果:

主 题 : first->分区:2

主 题 : first->分区:0

主 题 : first->分区:0 第3条已去重

主 题 : first->分区:2 第4知已去重

5 Kafka 消费者

5.1 Kafka 消费方式

1、pull (拉)模式:

客户端主动从服务端拉取数据。

优点:客户端可以根据自己的消费能力来消费数据,不存在消息堆积的情况。

缺点:消息处理可能不及时,可能存在大量无效请求,客户端需要考虑拉取频率逻辑。

在Kafka中,使用该模式时,消费者可以自主选择从哪个分区开始拉取消息,并可以自主控制拉取消息的速度。Kafka作为消费者维护着一个offset,表示消费者已经消费的消息偏移量;当消费者拉取消息时,Kafka会返回该消费者还没有消费的消息。

Pull模式允许客户端根据自己的需求和能力来获取数据。但是,这也意味着客户端需要更多的逻辑来控制数据的拉取和处理。

2、push (推)模式:

Push模式是一种消息传递模式,其中服务端主动将消息推送给客户端。

优点:消息处理的及时性很高,一旦服务端收到消息后,就立刻将消息推送给消费者,消费者能立刻对收到的消息进行消费。

缺点:当消息量比较大时,对消费者性能要求较高,由于消费者无法控制服务端消息的推送速度,因此一旦消息量大,那么消费者消费的压力就比较大。

Kafka没有采用这种方式,因为由broker决定消息发送速率,很难适应所有消费者的消费速率。例如推送的速度是50m/s,Consumer1、Consumer2就来不及处理消息。

Push模式提供了消息处理的及时性,但是在处理大量消息时可能会对消费者造成压力。

5.2 Kafka 消费者工作流程

1、消费者总体工作流程

(1)生产者向分区中的每个Leader发送一批批的数据。

(2)Follower主动与Leader同步数据,保证数据的可靠性。

(3)一个分区的数据只能被一个消费者消费。消费者可以消费某一个分区的数据,一个消费者也可以消费多个分区的数据,消费者与消费者之间是完全独立的。

(4)每一个分区的数据只能由消费者组中的一个消费者进行消费。(把消费者组当成一个独立的消费者,同一个分区不能由同一个消费者组里面两个及以上的消费者消费)。

(5)消费到哪里的具体位置为offset,offset保存在系统主题_consumer_offsets中。

2、消费者组原理

(1)消费组Consumer Group(CG)

由多个消费者的groupid相同consumer组成。

①消费者组内每个消费者负责消费不同分区的数据,一个分区只能由一个组内消费者消费。

②消费者组之间互不影响。所有的消费者都属于某个消费者组,消费者组是逻辑上的一个订阅者。

③如果消费组中的消费者数超过主题分区数量,则有一部分消费者就会闲置,不会接收任何消息。

(2)协调者coordinator

辅助实现消费者组的初始化和分区的分配。

coordinator节点选择 = groupid的hashcode值 % 50(consumer_offsets的分区数量)

例如: groupid的hashcode值 = 1,1 % 50 = 1,那么 consumer_offsets 主题的1号分区在哪个broker上,就选择这个节点的coordinator作为这个消费者组的老大。消费者组下的所有的消费者提交offset的时候就往这个分区去提交offset。

(3)消费者组初始化流程

生产者把数据发送到Kafka集群,选择节点的coordinator。

①每个消费者都往选出的coordinator发送请求,表示要加入到组当中。

②coordinator会从消费者中选出一个消费者作为Leader。

③coordinator会把收集到的所有topic信息都发送给消费者的Leader。

④Leader制定消费方案。

⑤制定计划后,Leader将消费方案发给coordinator。

⑥coordinator把消费方案下发给各个消费者。

⑦每个消费者会定期给coordinator发送心跳反应(默认3s),一旦超时(session.timeout.ms=45s)则该消费者会被移除并触发再平衡,别的消费者继续完成接下来的任务;或消费者处理消息的时间过长(max.poil.interval.ms=5分钟),也会触发再平衡。

(4)消费者组详细消费流程

①消费者组创建消费者网络连接客户端,主要用于与Kafka集群进行交互。

②消费者调用sendFetches方法用于抓取数据的初始化。

③消费者网络连接客户端调用send方法发送请求。

④Leader通过回调方法onSuccess把数据拉取到消息队列里。

⑤消费者一次拉取一批次数据,经过反序列化、拦截器再进行数据处理。

(3)消费者重要参数

参数名称 描述
bootstrap.servers 向 Kafka 集群建立初始连接用到的 host/port 列表。
key.deserializer 和
value.deserializer
指定接收消息的 key 和 value 的反序列化类型。一定要写全类名。
group.id 标记消费者所属的消费者组。
enable.auto.commit 默认值为 true,消费者会自动周期性地向服务器提交偏移量。
auto.commit.interval.ms 如果设置了 enable.auto.commit 的值为 true, 则该值定义了消费者偏移量向 Kafka 提交的频率,默认 5s。
auto.offset.reset 当 Kafka 中没有初始偏移量或当前偏移量在服务器中不存在(如,数据被删除了),该如何处理? earliest:自动重置偏移量到最早的偏移量。 latest:默认,自动重置偏移量为最新的偏移量。 none:如果消费组原来的(previous)偏移量不存在,则向消费者抛异常。 anything:向消费者抛异常。
offsets.topic.num.partitions __consumer_offsets 的分区数,默认是 50 个分区。
heartbeat.interval.ms Kafka 消费者和 coordinator 之间的心跳时间,默认 3s。
该条目的值必须小于 session.timeout.ms ,也不应该高于 session.timeout.ms 的 1/3。
session.timeout.ms Kafka 消费者和 coordinator 之间连接超时时间,默认 45s。超过该值,该消费者被移除,消费者组执行再平衡。
max.poll.interval.ms 消费者处理消息的最大时长,默认是 5 分钟。超过该值,该消费者被移除,消费者组执行再平衡。
fetch.min.bytes 默认 1 个字节。消费者获取服务器端一批消息最小的字节数。
fetch.max.wait.ms 默认 500ms。如果没有从服务器端获取到一批数据的最小字节数。该时间到,仍然会返回数据。
fetch.max.bytes 默认 Default: 52428800(50 m)。消费者获取服务器端一批
消息最大的字节数。如果服务器端一批次的数据大于该值(50m)仍然可以拉取回来这批数据,因此,这不是一个绝
对最大值。一批次的大小受 message.max.bytes (broker config)or max.message.bytes (topic config)影响。
max.poll.records 一次 poll 拉取数据返回消息的最大条数,默认是 500 条。

5.3 消费者 API

消费者消费消息时,必须配置消费组。消费类型分为:

(1)订阅主题

(2)订阅分区

【例5.3-01】创建一个消费者,消费 first 主题中数据。

(1)在KafkaConfig类添加反序列化方法

/**

 * key,value反序列化

 */

public static void  deserializer(){

    properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,

            StringDeserializer.class.getName());

    properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,

            StringDeserializer.class.getName());

}

(2)创建CustomConsumer消费者类

import org.apache.kafka.clients.consumer.ConsumerConfig;

import org.apache.kafka.clients.consumer.ConsumerRecord;

import org.apache.kafka.clients.consumer.ConsumerRecords;

import org.apache.kafka.clients.consumer.KafkaConsumer;

import org.example.config.KafkaConfig;

import java.time.Duration;

import java.util.List;

public class CustomConsumer {

/**

 * 消费主题数据

 * @param topic

 * @param groupId

 */

public void subscribe(String topic, String groupId){

    // 1. 反序列化k,v

    KafkaConfig.deserializer();

    // 2. 配置消费组id

    KafkaConfig.properties.put(ConsumerConfig.GROUP_ID_CONFIG,groupId);

    // 3. 创建消费者对象

    KafkaConsumer<String,String> kafkaConsumer = new KafkaConsumer<>(KafkaConfig.properties);

    // 4. 注册要消费的主题,可以同时消费多个主题

    List<String>  topics = new ArrayList<>();

    topics.add(topic);

    kafkaConsumer.subscribe(topics);

    // 5. 拉取数据并打印

    while (true) {

        // 5.1 设置1s中消费一批数据

        ConsumerRecords<String,String> consumerRecords = kafkaConsumer.poll(Duration.ofSeconds(1));

        // 5.2 打印消费到的数据

        for (ConsumerRecord<String,String> consumerRecord : consumerRecords) {

            System.out.println(consumerRecord.topic()+ "->" + consumerRecord.partition() 

                    + ":" + consumerRecord.value());

        }

    }

}

/**

 * 消费主题数据

 * @param topics  可以同时消费多个主题

 * @param groupId

 */

public void subscribe(List<String> topics, String groupId){

    // 1. 反序列化k,v

    KafkaConfig.deserializer();

    // 2. 配置消费组id

    KafkaConfig.properties.put(ConsumerConfig.GROUP_ID_CONFIG,groupId);

    // 3. 创建消费者对象

    KafkaConsumer<String,String> kafkaConsumer = new KafkaConsumer<>(KafkaConfig.properties);

    // 4. 注册要消费的主题

    kafkaConsumer.subscribe(topics);

    // 5. 拉取数据并打印

    while (true) {

        // 5.1 设置1s中消费一批数据

        ConsumerRecords<String,String> consumerRecords = kafkaConsumer.poll(Duration.ofSeconds(1));

        // 5.2 打印消费到的数据

        for (ConsumerRecord<String,String> consumerRecord : consumerRecords) {

           System.out.println(consumerRecord.topic()+ "->" + consumerRecord.partition() 

                    + ":" + consumerRecord.value());

        }

    }

}

}

(3)创建测试类CustomConsumerTest,并执行测试方法

public class CustomConsumerTest {

CustomConsumer customConsumer = new CustomConsumer();

@Test

public void test1(){

    customConsumer.subscribe("first","g_first");

}

}

消费者程序执行后,一直处于监听状态,等待发送者发送消息。

(4)启动生产者,并发送数据

运行CustomProducerTest的test5()方法。

(5)在 IDEA 控制台观察接收到的数据。

first->2:Hello Hive

first->0:Hello Flume

first->0:Hello Hadoop

first->2:Hello Kafka

(6)用两个消费者消费主题first消息

①在测试类添加测试方法,消费的主题和消费组名相同,执行程序

@Test

public void test2(){

    customConsumer.subscribe("first","g_first");

}

②重复第(4)步

③分别查看test1,test2的控制台:

test1的输出:

first->2:Hello Hive

first->2:Hello Kafka

test2的输出:

first->0:Hello Flume

first->0:Hello Hadoop

从上例可以看出:

  • 一个主题(或多个主题)被一个消费组的一个或多个消费者消费。
  • 一个分消费者可以消费多个分区。
  • 一个分区只能被一个消费者消费。
  • 一个消费组可以组织多多消费者对主题和不同分区进行消费,增加并行度。

【例5.3-02】创建消费者,消费指定分区的数据。

(1)在CustomConsumer类添加方法

/**

 * 消费指定分区数据

 * @param topic

 * @param groupId

 * @param partitions 可以消费多个分区

 */

public void assign(String topic,String groupId,int... partitions){

    // 1. 反序列化k,v

    KafkaConfig.deserializer();

    // 2. 配置消费组id

    KafkaConfig.properties.put(ConsumerConfig.GROUP_ID_CONFIG,groupId);

    // 3. 创建消费者对象

    KafkaConsumer<String,String> kafkaConsumer = new KafkaConsumer<>(KafkaConfig.properties);

    // 4. 消费某个主题的某个分区数据

    ArrayList<TopicPartition> topicPartitions = new ArrayList<>();

    // 4.1 添加多个分区

    for(int partition : partitions){

        topicPartitions.add(new TopicPartition(topic, partition));

    }

    // 4.2 消费数据

    kafkaConsumer.assign(topicPartitions);

    // 5. 拉取数据并打印

    while (true) {

        // 5.1 设置1s中消费一批数据

        ConsumerRecords<String,String> consumerRecords = kafkaConsumer.poll(Duration.ofSeconds(1));

        // 5.2 打印消费到的数据

        for (ConsumerRecord<String,String> consumerRecord : consumerRecords) {

            System.out.println(consumerRecord.topic()+ "->" + consumerRecord.partition() 

                    + ":" + consumerRecord.value());

        }

    }

}

(3)在测试类添加测试方法,并执行测试方法

@Test

public void test3(){

    customConsumer.assign("first","g_first",1,2);

}

(4)运行生产者测试类CustomProducerTest的test5()方法,向多分区添加数据

(5)在 IDEA 控制台,观察接收到的数据,只能消费到 1,2 号分区数据。

first->2:Hello Hive

first->2:Hello Kafka

5.4 生产经验

1、解决问题:

(1)一个consumer group中有多个consumer组成,一个 topic有多个partition组成,到底由哪个consumer来消费哪个 partition的数据?

(2)Kafka有四种主流的分区分配策略: Range、RoundRobin、Sticky、CooperativeSticky。 可以通过配置参数partition.assignment.strategy,修改分区的分配策略。默认策略是Range + CooperativeSticky。Kafka可以同时使用 多个分区分配策略。

2、分区策略及参数配置

参数名称 描述
heartbeat.interval.ms Kafka 消费者和 coordinator 之间的心跳时间,默认 3s。 该条目的值必须小于 session.timeout.ms,也不应该高于 session.timeout.ms 的 1/3。
session.timeout.ms Kafka 消费者和 coordinator 之间连接超时时间,默认 45s。超 过该值,该消费者被移除,消费者组执行再平衡。
max.poll.interval.ms 消费者处理消息的最大时长,默认是 5 分钟。超过该值,该 消费者被移除,消费者组执行再平衡。 partition.assignment.strategy 消 费 者 分 区 分 配 策 略 , 默 认 策 略 是 Range + CooperativeSticky。Kafka 可以同时使用多个分区分配策略。 可 以 选 择 的 策 略 包 括 : Range 、 RoundRobin 、 Sticky 、 CooperativeSticky

3、Range 以及再平衡

Range 是对每个 topic 而言的。 首先对同一个 topic 里面的分区按照序号进行排序,并对消费者按照字母顺序进行排序。 假如现在有 7 个分区,3 个消费者,排序后的分区将会 是0,1,2,3,4,5,6;消费者排序完之后将会是C0,C1,C2。 例如,7/3 = 2 余 1 ,除不尽,那么 消费者 C0 便会多 消费 1 个分区。 8/3=2余2,除不尽,那么C0和C1分别多消费一个。 通过 partitions数/consumer数来决定每个消费者应该消费几个分区。如果除不尽,那么前面几个消费者将会多消费 1 个分区。

注意:如果只是针对 1 个 topic 而言,C0消费者多消费1 个分区影响不是很大。但是如果有 N个 topic,那么针对每 个 topic,消费者 C0都将多消费 1 个分区,topic越多,C0消费的分区会比其他消费者明显多消费 N 个分区。 容易产生数据倾斜!

【例5.4-01】Range 以及再平衡策略

(1)创建2个主题test1,test2,分别为3个副本,5个分区

root@14b37abf58ad:/# kafka-topics.sh --bootstrap-server 192.168.100.104:9092 --create --partitions 5 --replication-factor 3 -topic test1

root@14b37abf58ad:/# kafka-topics.sh --bootstrap-server 192.168.100.104:9092 --create --partitions 5 --replication-factor 3 -topic test2

(2)在customComsumeTest类添加三个测试方法,并执行程序

@Test

public void test4(){

    List<String> topics = new ArrayList<>();

    topics.add("test1");

    topics.add("test2");

    customConsumer.subscribe(topics,"g_test");

}

@Test

public void test5(){

    List<String> topics = new ArrayList<>();

    topics.add("test1");

    topics.add("test2");

    customConsumer.subscribe(topics,"g_test");

}

@Test

public void test6(){

    List<String> topics = new ArrayList<>();

    topics.add("test1");

    topics.add("test2");

    customConsumer.subscribe(topics,"g_test");

}

(3)在customProducerTest添加测试方法,并执行代码

@Test

public void test8() throws InterruptedException {

    String[] msgs = {"Hello World","Hello Hadoop","Hello Hive","Hello Flume","Hello Kafka"};

    String[] topics = {"test1","test2"};

    for(String topic : topics){

        for (int i = 0; i < msgs.length;i++){

            customProducer.AsyncSendMsgCallback(topic,String.valueOf(i),i,msgs[i]);

            // 延迟一会会看到数据发往不同分区

            Thread.sleep(500);

        }

    }

}

(4)观看 3 个消费者分别消费哪些分区的数据。

发现,10个分区按4,4,2消费的。

(5)测试Range 分区分配再平衡

①停止掉 test4 号消费者,快速重新发送消息观看结果(45s 以内,越快越好)。

发现0,1号分区数据被test5号消费者读取。

说明:test4 号消费者挂掉后,消费者组需要按照超时时间 45s 来判断它是否退出,所以需要等待,时间到了 45s 后,判断它真的退出就会把任务分配给其他 broker 执行。

②再次重新发送消息观看结果(45s 以后)。

发现:

test5号消费者:消费到 0、1、2号分区数据。

test6号消费者:消费到 3、4号分区数据。

说明:消费者 test4 已经被踢出消费者组,所以重新按照 range 方式分配。

4、RoundRobin 以及再平衡

RoundRobin 针对集群中所有Topic而言。 RoundRobin 轮询分区策略,是把所有的 partition 和所有的 consumer 都列出来,然后按照 hashcode 进行排序,最后通过轮询算法来分配 partition 给到各个消费者。

【例5.4-02】RoundRobin 分区分配策略

(1)在KafkaConfig类添加方法

/**

 * 设置消费者消费策略

 * @param strategy 策略(roundRobin,sticky )

 */

public static void setStrategy(String strategy){

    if(strategy.toLowerCase().equals("rountrobin")){

        properties.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG,

                "org.apache.kafka.clients.consumer.RoundRobinAssignor");

    }else{

        properties.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG,

                "org.apache.kafka.clients.consumer.StickyAssignor");

    }

}

(2)在CustomConsumer类重载方法

/**

 * 指定消费策略

 * @param topics

 * @param groupId

 * @param strategy

 */

public void subscribe(List<String> topics,String groupId,String strategy ){

    // 1. 反序列化k,v

    KafkaConfig.deserializer();

    // 配置消费策略

    KafkaConfig.setStrategy(strategy);

    // 2. 配置消费组id

    KafkaConfig.properties.put(ConsumerConfig.GROUP_ID_CONFIG,groupId);

    // 3. 创建消费者对象

    KafkaConsumer<String,String> kafkaConsumer = new KafkaConsumer<>(KafkaConfig.properties);

    // 4. 注册要消费的主题

    kafkaConsumer.subscribe(topics);

    // 5. 拉取数据并打印

    while (true) {

        // 5.1 设置1s中消费一批数据

        ConsumerRecords<String,String> consumerRecords = kafkaConsumer.poll(Duration.ofSeconds(1));

        // 5.2 打印消费到的数据

        for (ConsumerRecord<String,String> consumerRecord : consumerRecords) {

            System.out.println(consumerRecord.topic()+ "->" + consumerRecord.partition() 

                    + ":" + consumerRecord.value());

        }

    }

}

(3)依次在 CustomConsumerTest类的test4、test5、test6 三个消费者测试类中修改代

码 ,并执行代码

// 修改分区分配策略消费

//customConsumer.subscribe(topics,“g_test”);

customConsumer.subscribe(topics,“g_test”,“roundrobin”);

(4)重启 3 个消费者,重复发送消息的步骤,观看分区结果。

发现,10个分区按4,3,3消费的。

(5)停止掉test4号消费者(45s以后,再发送消息)

发现,10个分区按5,5消费的。

说明:消费者 test4已经被踢出消费者组,所以重新按照 RoundRobin 方式分配。

4、Sticky 以及再平衡

Sticky粘性分区定义:可以理解为分配的结果带有“粘性的”。即在执行一次新的分配之前,考虑上一次分配的结果,尽量少的调整分配的变动,可以节省大量的开销。

粘性分区是 Kafka 从 0.11.x 版本开始引入这种分配策略,首先会尽量均衡的放置分区到消费者上面,在出现同一消费者组内消费者出现问题的时候,会尽量保持原有分配的分区不变化。

可以看到会尽量保持分区的个数近似划分分区。

【例5.4-03】Sticky分区策略

(1)依次在 CustomConsumerTest类的test4、test5、test6 三个消费者测试类中修改代

码 ,并执行代码

// 修改分区分配策略消费

//customConsumer.subscribe(topics,“g_test”);

//customConsumer.subscribe(topics,“g_test”,“roundrobin”);

customConsumer.subscribe(topics,“g_test”,“sticky”);

(4)重启 3 个消费者,重复发送消息的步骤,观看分区结果。

test4结果:

test1->1:Hello Hadoop

test2->0:Hello World

test2->2:Hello Hive

test2->4:Hello Kafka

test5结果:

test1->0:Hello World

test2->1:Hello Hadoop

test2->3:Hello Flume

test6结果:

test1->2:Hello Hive

test1->3:Hello Flume

test1->4:Hello Kafka

(5)停止掉test4号消费者(消费4个分区的消费者)(45s以后,再发送消息)

发现,test5号和test6号消费者原有消费分区并没有改变,只是新增了2个分区,test4 号消费者的任务会按照粘性规则,尽可能均衡的随机分成2个分区数据,分别由 test5 号消费者或者 test6 号消费者消费。

test5结果:

test1->0:Hello World

test2->1:Hello Hadoop

test2->3:Hello Flume

test1->1:Hello Hadoop

test2->2:Hello Hive

test6结果:

test1->2:Hello Hive

test1->3:Hello Flume

test1->4:Hello Kafka

test2->0:Hello World

test2->4:Hello Kafka

5.5 offset 位移

Kafka 的 offset 位移是指分区中的特定消息的位置指示器,表示从哪个位置开始读取或写入数据。在 Kafka 中,每个 consumer 都维护自己的 offset 位移,用于记录 consumer 消费消息的进度。

要管理 Kafka 的 offset 位移,通常有两种方式:自动提交和手动提交。在自动提交模式下,Kafka 会定期(或在特定条件下)自动提交消费者的 offset。在手动提交模式下,你需要在消费消息之后手动调用 API 来提交 offset。

1、offset 的默认维护位置

消费者位移offset存储在哪呢?

kafka0.9版本之前,consumer默认将offset保存在Zookeeper中。从0.9版本开始,consumer默认将offset保存在Kafka一个内置的topic中,该topic为__consumer_offsets,这样可以大量减少和zookeeper的交互。

__consumer_offsets 主题里面采用 key 和 value 的方式存储数据。key 是 group.id+topic+

分区号,value 就是当前 offset 的值。每隔一段时间,kafka 内部会对这个 topic 进行compact,也就是每个 group.id+topic+分区号就保留最新数据。

2、提交 offset

(1)自动提交

为了使我们能够专注于自己的业务逻辑,Kafka提供了自动提交offset的功能。

参数名称 描述
enable.auto.commit 默认值为 true,消费者会自动周期性地向服务器提交偏移量。
auto.commit.interval.ms 如果设置了 enable.auto.commit 的值为 true, 则该值定义了消费者偏移量向 Kafka 提交的频率,默认 5s。

(2)在消费者API中添加以下语句,自动提交offset

// 是否自动提交offset

properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true);

// 提交offset的时间周期1000ms,默认5s

properties.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, 1000);

(2)手动提交

虽然自动提交offset十分简单便利,但由于其是基于时间提交的,开发人员难以把握offset提交的时机。因此Kafka还提供了手动提交offset的API。 手动提交offset的方法有两种:分别是commitSync(同步提交)和commitAsync(异步提交)。两者的相同点是,都会将本次提交的一批数据最高的偏移量提交;不同点是,同步提交阻塞当前线程,一直到提交成功,并且会自动失败重试;而异步提交则没有失败重试机制,故有可能提交失败。

commitSync(同步提交):必须等待offset提交完毕,再去消费下一批数据。

commitAsync(异步提交):发送完提交offset请求后,就开始消费下一批数据了。

①同步提交 offset

由于同步提交 offset 有失败重试机制,故更加可靠,但是由于一直等待提交结果,提交的效率比较低。

在消费者API中添加以下语句,同步提交offset

// 是否自动提交offset

properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);

// 在消费完成一批数据后,同步提交

consumer.commitSync();

②异步提交 offset

虽然同步提交 offset 更可靠一些,但是由于其会阻塞当前线程,直到提交成功。因此吞吐量会受到很大的影响。因此更多的情况下,会选用异步提交 offset 的方式。

在消费者API中添加以下语句,同步提交offset

// 是否自动提交offset

properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);

// 在消费完成一批数据后,异步提交

consumer.commitAsync();

(3)指定 Offset 消费

当消费者组第一次消费或者服务器上不再存在当前偏移量(例如该数据已被删除)时,可以通过设置 auto.offset.reset 参数来指定偏移量的重置策略。

auto.offset.reset = earliest | latest | none

  • earliest:自动将偏移量重置为最早的偏移量,–from-beginning。
  • latest(默认值):自动将偏移量重置为最新偏移量。
  • none:如果未找到消费者组的先前偏移量,则向消费者抛出异常。
  • 任意指定 offset 位移开始消费

【例5.5-01】任意指定 offset 位移开始消费

(1)在CustomConsume类中重载方法

/**

 * 指定消费的偏移量

 * @param topics

 * @param groupId

 * @param offset

 */

public void subscribe(List<String> topics,String groupId,Long offset ){

    // 1. 反序列化k,v

    KafkaConfig.deserializer();

    // 2. 配置消费组id

    KafkaConfig.properties.put(ConsumerConfig.GROUP_ID_CONFIG,groupId);

    // 3. 创建消费者对象

    KafkaConsumer<String,String> kafkaConsumer = new KafkaConsumer<>(KafkaConfig.properties);

    // 4. 注册要消费的主题

    kafkaConsumer.subscribe(topics);

    // 获取消费者分区分配信息(有了分区分配信息才能开始消费)

    Set<TopicPartition> assignment= new HashSet<>();

    while (assignment.size() == 0){

        kafkaConsumer.poll(Duration.ofSeconds(1));

        assignment = kafkaConsumer.assignment();

    }

    // 遍历所有分区,并指定offset的位置开始消费

    for (TopicPartition tp: assignment) {

        kafkaConsumer.seek(tp,offset);

    }

    // 5. 拉取数据并打印

    while (true) {

        // 5.1 设置1s中消费一批数据

        ConsumerRecords<String,String> consumerRecords = kafkaConsumer.poll(Duration.ofSeconds(1));

        // 5.2 打印消费到的数据

        for (ConsumerRecord<String,String> consumerRecord : consumerRecords) {

            System.out.println(consumerRecord.topic()+ "->" + consumerRecord.partition()

                    + ":" + consumerRecord.value());

        }

    }

}

(2)添加测试方法,进行测试

@Test

public void test7(){

    List<String> topics = new ArrayList<>();

    topics.add("first");

    // 注意:每次执行完,需要修改消费者组名

    customConsumer.subscribe(topics,"g_frist_10L",10L);

}

【例5.5-01】指定时间消费。在生产环境中,会遇到最近消费的几个小时数据异常,想重新按照时间消费。

(1)在CustomConsume类中重载方法

/**

 * 指定时间消费

 * @param topics

 * @param groupId

 * @param date

 */

public void subscribe(List<String> topics, String groupId, Date date ){

    // 1. 反序列化k,v

    KafkaConfig.deserializer();

    // 2. 配置消费组id

    KafkaConfig.properties.put(ConsumerConfig.GROUP_ID_CONFIG,groupId);

    // 3. 创建消费者对象

    KafkaConsumer<String,String> kafkaConsumer = new KafkaConsumer<>(KafkaConfig.properties);

    // 4. 注册要消费的主题

    kafkaConsumer.subscribe(topics);

    // 获取消费者分区分配信息(有了分区分配信息才能开始消费)

    Set<TopicPartition> assignment= new HashSet<>();

    while (assignment.size() == 0){

        kafkaConsumer.poll(Duration.ofSeconds(1));

        assignment = kafkaConsumer.assignment();

    }

    // 封装集合存储,每个分区对应的指定时间数据

    Map<TopicPartition,Long> timestampToSearch = new HashMap<>();

    for (TopicPartition topicPartition : assignment) {

        timestampToSearch.put(topicPartition,date.getTime());

    }

    // 获取指定时间开始消费的每个分区的offset

    Map<TopicPartition, OffsetAndTimestamp> offsets = kafkaConsumer.offsetsForTimes(timestampToSearch);

    // 遍历每个分区,对每个分区设置消费时间。

    for (TopicPartition topicPartition : assignment) {

        OffsetAndTimestamp 	offsetAndTimestamp 	= offsets.get(topicPartition);

        // 根据时间指定开始消费的位置

        if (offsetAndTimestamp != null){

            kafkaConsumer.seek(topicPartition,offsetAndTimestamp.offset());

        }

    }

    // 5. 拉取数据并打印

    while (true) {

        // 5.1 设置1s中消费一批数据

        ConsumerRecords<String,String> consumerRecords = kafkaConsumer.poll(Duration.ofSeconds(1));

        // 5.2 打印消费到的数据

        for (ConsumerRecord<String,String> consumerRecord : consumerRecords) {

             System.out.println(consumerRecord.timestamp()+ ":" +

                    consumerRecord.topic()+ "->" + consumerRecord.partition()

                    + ":" + consumerRecord.value());

        }

    }

}

(2)添加测试方法,进行测试

@Test

public void test8() throws ParseException {

    List<String> topics = new ArrayList<>();

    topics.add("first");

    SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");

    Date startTime = sdf.parse("2024-09-14 09:50:00");

    System.out.println(startTime.getTime()); //打印时间与结果进行对比

    customConsumer.subscribe(topics,"g_frist_09_50",startTime);

}

6 Kafka生产经验

6.1 延时队列和死信队列

Kafka的延时队列和死信队列通常是指消息的延迟处理和处理失败的消息的存储机制。在实际应用中,通常会结合这两种队列来实现延时处理和错误处理的机制。

1、延时队列

Kafka可以通过设置消息的时间戳(timestamp)和消息的生存时间(timeToLive,简称TTL)来实现延时队列。消息在达到一定年龄后才能被消费者消费。

2、死信队列

当Kafka中的消息消费失败,且达到预设的重试次数后,这些消息可以被重新发送到一个特定的死信队列(Dead-Letter-Queue,简称DLQ)。

在Kafka中没有提供延时功能,我们可以结合重试发送到原队列,实现延时消费。

【例6.1-01】实现延时消费。在实际生产中,一个未支付的订单在30分钟后过期,即在订单生成时由生产者发送订单编号,30分钟后消费者消费订单编号,查询该订单的状态,如果订单还是未支付状态,将此订单删除。

(1)在CustomProducer类添加延时发送方法

/**

 * 延时发送

 * @param topic

 * @param key

 * @param msg

 * @param delay 延时时间

 */

public void AsyncSendMsgCallback(String topic, String key,String msg,Long delay) {

    KafkaProducer<String, String> kafkaProducer = null;

    try {

        // 1. key-value序列化

        KafkaConfig.serializer();

        // 2. 创建kafka生产者对象

        kafkaProducer = new KafkaProducer<String, String>(KafkaConfig.properties);

        // 3. 调用send方法,发送消息

        ProducerRecord<String, String> message = new ProducerRecord<>(topic, key,msg);

        // 通过header传递参数

        message.headers().add(new RecordHeader("delayms",String.valueOf(delay).getBytes(StandardCharsets.UTF_8)));

        // 添加回调 new Callback(),重写onCompletion方法,该方法在Producer收到ack时调用,为异步调用

        kafkaProducer.send(message);

    } catch (Exception e) {

        throw new RuntimeException(e);

    } finally {

        // 4. 关闭资源

        kafkaProducer.close();

    }

}

(2)在CustomConsumer类添加延时消费方法

/**

 * 延时消费

 * @param topics

 * @param groupId

 * @param customProducer

 */

public void subscribe(List<String> topics, String groupId,CustomProducer customProducer){

    // 0.定义时间等级集合,单位为秒,这些等级提供了灵活的消息延迟投递选项,适用于需要定时处理的任务。

    int[] levels = {1,5,10,30,60,2*60,3*60,4*60,5*60,6*60,7*60,8*60,9*60,10*60,20*60,30*60,60*60,2*60*60};

    int level = 0;//下标

    // 1. 反序列化k,v

    KafkaConfig.deserializer();

    // 2.1 配置消费组id

    KafkaConfig.properties.put(ConsumerConfig.GROUP_ID_CONFIG,groupId);

    // 2.1 禁用自动提交offset

    KafkaConfig.properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);

    // 3. 创建消费者对象

    KafkaConsumer<String,String> kafkaConsumer = new KafkaConsumer<>(KafkaConfig.properties);

    // 4. 注册要消费的主题,可以同时消费多个主题

    kafkaConsumer.subscribe(topics);

    // 5. 拉取数据并打印

    while (true) {

        // 5.1 设置消费一批数据

        ConsumerRecords<String,String> consumerRecords = kafkaConsumer.poll(Duration.ofSeconds(levels[level++]));

        // 5.2 打印消费到的数据

        Long current = System.currentTimeMillis() / 1000;//获取当前时间

        for (ConsumerRecord<String,String> consumerRecord : consumerRecords) {

            // 获取消息的header

            Headers headers = consumerRecord.headers();

            // 遍历headers

            for (Header header : headers) {

                String headerKey = header.key();

                // 如果有延时标记,读取延时时间

                if(headerKey.equals("delayms")){

                    Long delay = Long.valueOf(new String(header.value()));

                    // 如果没有达到消费时间,重新发送到原队列,否则消费消息

                    if(current < delay / 1000){

                        String topic = consumerRecord.topic();

                        String key = consumerRecord.key();

                        String msg = consumerRecord.value();

                        customProducer.AsyncSendMsgCallback(topic,key,msg,delay);

                        // 手动提交offset

                        kafkaConsumer.commitSync();

                    }else {

                        System.out.println(consumerRecord.timestamp()+ ":" +

                                consumerRecord.topic()+ "->" + consumerRecord.partition()

                                + ":" + consumerRecord.value());

                    }

                }

            }

        }

        // 重置时间等级

        if(level > 17){

            level = 0;

        }

    }

}

(3)测试延时队列

①创建消费者,并执行程序

@Test

public void test9() throws ParseException {

    List<String> topics = new ArrayList<>();

    topics.add("second");

    customConsumer.subscribe(topics,"g_second",new CustomProducer());

}

②创建生产者,并发送消息

@Test

public void test9(){

    // 半分钟后消费

    Long timestramp = System.currentTimeMillis() + 30 * 1000;

    // 生成订单号

    String order = UUID.randomUUID().toString();

    customProducer.AsyncSendMsgCallback("second","delay",order,timestramp);

}    

在消费者端30秒钟后,消费到消息。

6.2 生产者事务

1、什么是事务

Kafka事务,是为确保在一个事务中发送的多条消息,要么都成功,要么都失败,没有事务反查机制。这里的多条消息不一定在同一个topic和partition,可以是发往多个topic和partition的消息。Kafka这种事务机制,单独使用场景不多。更多是配合Kafka幂等机制,实现Kafka的精确一次语义。

例如:将所有订单消息保存在Kafka主题Order,在Flink集群中运行一个计算任务,统计每分钟的订单收入,然后把结果保存在另一个Kafka主题Income。要保证计算结果准确,就要确保无论Kafka集群或者Flink集群中任何节点故障,每条消息都只能被计算一次,不能重复计算,否则计算结果就错。很重要的限制条件:数据须来自Kafka且计算结果都保存到Kafka,才可应用到Kafka的精确一次机制。

所以Kafka的精确一次是为解决在“读数据-计算-保存结果”的计算过程中,数据不重也不丢失。

2、事务协调者

在服务端协调整个事务。非独立进程,而是Broker进程的一部分,协调者和分区一样通过选举保证高可用。Kafka集群有一个特殊的用于记录事务日志的topic,该事务日志topic的实现和普通topic一样,里面记录数据类似“开启事务”和“提交事务”这样的事务日志。日志topic同样也包含很多分区。

Kafka集群中,可存在多个协调者,每个协调者负责管理和使用事务日志中的几个分区。就是为能并行执行多个事务,提升性能。

3、Kafka 事务原理

kafka的事务确保两点:

① 生产者到kafka服务端的事务保障

②消费者从kafka拉取数据的事务

kafka提供的事务机制是 第①点,对于第②点来说只能自己在消费端实现幂等性。

因为生产者producer写到kafka可能会出现消息重复,比如 设置ack=all,写入到kafka的leader时,leader挂掉了,没有及时反馈ack,导致生产者再次发送消息就会出现重复消息落盘。这种情况可以设置kafka的属性用来开启幂等。但是这种幂等只能保证 producer没有挂掉的情况下,因为幂等的原理是 kafka缓存了一份 pid,partition,seqnumber 的数据,如果命中则说明之前缓存了,但是如果producer挂掉了重启后,它的pid就会变化,partition也有可能变化,就会导致消息会出现重复状况。

开启时事务后,会存在 transaction_id ,封装成( transaction_id, pid,partition,seqnumber, 消费位置等等) 保存在kafka上,如果producer 挂了重新启动的时候,会自动寻找kafka中的这个 transaction_id,找到的话就会恢复到挂掉之前的状态 ,然后进行消费。kafka事务保证了要么全部成功,要么全部失败。

还有一个很重要的点是 要在consumer端 设置isolation.level为read_committed状态,它默认是read_uncommitted状态,这是什么意思呢?

也就是开启事务之后,生产者调用send发送数据时他就会直接向kafka插入数据,只不过是这个数据后面追加了一个状态,这个状态是read_uncommited代表未提交,只有producer调用了commitTransaction时候这些数据在kafka中才会都标记为read_commited。所以如果在 consumer消费方没有设置isolation.level为read_committed状态(默认是read_uncommited),那么当producer 出现异常或者宕机或者在事务提交之前发送的数据也依然能读取到。还有一个情况是,当生产者中代码捕获到了异常,并进行abortTransaction,而消费者并没有设置隔离级别为read_committed,但却读不到消息呢,那我们可以想象成当生产者调用abortTransaction时接下来的消息肯定不会发送到服务器,并且已发送到服务器上的消息还会直接删掉,这样理解就可以了。

4、Kafka 事务操作步骤

① 设置transactional.id 和消费级别

Producer 在使用事务功能前,必须先自定义一个唯一的 transactional.id。有 了 transactional.id,即使客户端挂掉了, 它重启后也能继续处理未完成的事务。

在KafkaConfig类添加方法

/**

 * 设置事务id

 * @param tid 事务id(任意起名)

 */

public static void setTransactionId(String tid) {

    properties.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG,tid);

}

/**

 * 消费已提交数据

 * @param tid

 */

public static void setIsolation(String tid) {

    KafkaConfig.properties.put("isolation.level", "read_committed");

}

② 初始化事务

void initTransactions();

③ 开启事务

void beginTransaction() throws ProducerFencedException;

④ 发送或消费数据

在事务内提交已经消费的偏移量(主要用于消费者)

void sendOffsetsToTransaction(Map<TopicPartition, OffsetAndMetadata> offsets, String consumerGroupId) throws ProducerFencedException;

⑤ 提交事务或回滚事务

void commitTransaction() throws ProducerFencedException;

void abortTransaction() throws ProducerFencedException;

(3)Kafka 事务操作实例

单个 Producer,使用事务保证消息的仅一次发送

① 在CustomProducer类重载方法

/**

 * 生产者事务

 * @param msgMap

 * @param tid

 */

public void sendTrasactionMsg(Map<String,List<String>> msgMap, String tid) {

    // 1. key-value序列化

    KafkaConfig.serializer();

    // 2 生成事务id

    KafkaConfig.setTransactionId(tid);

    // 3. 创建kafka生产者对象

    KafkaProducer<String, String> kafkaProducer = new KafkaProducer<String, String>(KafkaConfig.properties);

    // 4. 调用send方法,发送消息

    try {

        // 4.1初始化事务

        kafkaProducer.initTransactions();

        // 4.2 开启事务

        kafkaProducer.beginTransaction();

        // 4.3 发送数据

        for(String topic : msgMap.keySet()){

            /**

             * 判断一个主题是否存在

             */

            // 创建AdminClient实例

            AdminClient admin = AdminClient.create(KafkaConfig.properties);

            // 获取所有主题的列表

            ListTopicsResult topics = admin.listTopics();

            KafkaFuture<Set<String>> names = topics.names();

            if(!names.get().contains(topic)){

                throw new Exception("没有创建主题" + topic);

            }

            for(String msg : msgMap.get(topic)){

                ProducerRecord<String, String> message = new ProducerRecord<>(topic, msg);

                kafkaProducer.send(message, new Callback() {

                    @Override

                    public void onCompletion(RecordMetadata recordMetadata, Exception e) {

                        if (e == null) {

                            // 没有异常,输出信息到控制台

                            System.out.println(" 主 题 : " + recordMetadata.topic()

                                    + "->分区:" + recordMetadata.partition());

                        } else {

                            // 出现异常打印

                            e.printStackTrace();

                        }

                    }

                });

            }

        }

        // 4.4 提交事务

        kafkaProducer.commitTransaction();

    }catch (Exception e){

        // 4.4 回滚事务

        kafkaProducer.abortTransaction();

    }finally {

        // 关闭资源

        kafkaProducer.close();

    }

}

② 在CustomConsumer类重载方法

/**

 * 消费已提交事务数据

 * @param topics  可以同时消费多个主题

 * @param groupId

 */

public void subscribeTransaction(List<String> topics, String groupId){

    // 0. 读取已提交数据

    KafkaConfig.setIsolation();

    // 1. 反序列化k,v

    KafkaConfig.deserializer();

    // 2. 配置消费组id

    KafkaConfig.properties.put(ConsumerConfig.GROUP_ID_CONFIG,groupId);

    // 3. 创建消费者对象

    KafkaConsumer<String,String> kafkaConsumer = new KafkaConsumer<>(KafkaConfig.properties);

    // 4. 注册要消费的主题

    kafkaConsumer.subscribe(topics);

    // 5. 拉取数据并打印

    while (true) {

        // 5.1 设置1s中消费一批数据

        ConsumerRecords<String,String> consumerRecords = kafkaConsumer.poll(Duration.ofSeconds(1));

        // 5.2 打印消费到的数据

        for (ConsumerRecord<String,String> consumerRecord : consumerRecords) {

            System.out.println(consumerRecord.topic()+ "->" + consumerRecord.partition()

                    + ":" + consumerRecord.value());

        }

    }

}

③编写消费者测试方法,并执行程序

@Test

public void test10(){

    List<String> topics = new ArrayList<>();

    topics.add("first");

    topics.add("last");

    customConsumer.subscribeTransaction(topics,"g_tran");

}

④编写生产者测试方法,并执行程序

@Test

public void test10(){

    // 生成事务ID

    String tid = UUID.randomUUID().toString();

    // TreeMap按主题排序,先发送first成功,再发送last失败

    Map<String, List<String>> map = new TreeMap<>();

    List<String> firstMsgs = new ArrayList<>();

    List<String> lastMsgs = new ArrayList<>();

    firstMsgs.add("first消息1");

    firstMsgs.add("first消息2");

    lastMsgs.add("last消息1");

    lastMsgs.add("last消息2");

    map.put("first",firstMsgs);

    map.put("last",lastMsgs);//主题last是不存在的

    // 发送批量消息

    customProducer.sendTrasactionMsg(map,tid);

}

查看消费者测试方法的消费结果,如果消费主题是已创建的主题,可以消费数据,如果消费主题未创建,消费不以消费,生产者未提交成功。

6.3数据有序

1、数据有序的重要性

Kafka是一款流行的分布式消息队列系统,它能够快速、可靠地处理大量数据。在使用Kafka进行数据传输时,数据的有序性是一个重要的问题。如果数据按照预期顺序到达,那么应用程序可以更好地处理这些数据;但如果数据乱序到达,那么可能会导致应用程序出现错误或异常。

保证事件处理的顺序性和因果关系,避免业务逻辑错误。例如转账业务,必须确保先扣款后充值,如果顺序颠倒会造成账户余额计算错误。

维护数据库和数据存储的一致性。如果同一资源的变更事件被乱序处理,很可能会导致数据不一致的问题。计算指标和统计结果必须依赖有序数据。如果处理顺序错乱,统计页面浏览量、排名等都将错误。

2、数据有序和数据乱序

Kafka 在消息处理的顺序性方面有一些机制,但并不保证消息的严格有序性。以下是 Kafka 处理消息顺序性的一些特点:

(1)分区内有序性: 在每个分区内,消息是有序存储的。Kafka 保证对于每个分区,消息的写入和消费是按照消息的顺序进行的。这意味着对于同一个分区的消息,它们将按照发送的顺序被消费。这样保证了在单个分区内的消息顺序性。

(2)分区间无序性: 在多个分区之间,消息的顺序性不能得到保证。不同分区的消息在 Kafka 集群中是并行处理的,而且 Kafka 也不会跨分区地维护全局有序性。因此,对于多个分区的消息,它们在消费者端接收的顺序可能与发送顺序不一致。

(3)消息复制: Kafka 支持多副本复制,每个分区可以有多个副本存储在不同的 Broker 上。在进行消息复制时,Kafka 会保证消息的副本在各个 Broker 上的复制顺序与领导者(Leader)分区中的消息顺序保持一致,从而确保数据的一致性。

3、如何保证数据有序性

(1)Topic配置Partition数为1,这样全局只有一个Partition,数据默认有序。但这制约了Kafka的可扩展性。

(2)Producer端对消息打上全局唯一的序号ID,或者使用Kafka自带的分区器按序号对消息Partition。消费时可以按序号排序。

(2)Consumer端采用只订阅单个Partition的方式消费数据,而不是跨Partition订阅,这保证了一个Partition顺序。Consumer端自己维护偏移及排序,跨Partition订阅后MERGE排序。可在DB中存 last offset。

(3)将相关联的消息发送到同一个Partition,不同类型消息分配到不同Partition。减小分区数可减少排序问题。采用可以重置偏移的Kafka消费者模式,以读取历史有序数据。采用Spark Streaming等支持有序接收Kafka数据并处理的框架。

(4)开启幂等性,将 max.in.flight.requests.per.connection设置为小于等于5。启用幂等后,kafka服务端会缓存producer发来的最近5个request的元数据, 如果开启了幂等性且缓存的 请求个数小于等于5个。会在服务端重新排序,故无论如何,都可以保证最近5个request的数据都是有序的。

综上所述,Kafka 在分区内保证消息的顺序性,但在分区间不能保证消息的严格有序性。这是因为 Kafka 的设计目标是实现高吞吐量和可伸缩性,而严格保证全局有序性会带来性能和可用性方面的限制。因此,对于一些应用场景,可能需要通过其他方式来实现更严格的消息有序性,例如应用程序层面的排序或使用单个分区来确保全局有序性。

7 Kafka Broker

7.1 Broker工作流程

1、Broker

Broker是指Kafka集群中的一个节点,负责处理客户端请求,时也处理客户端发送数据的存储与复制。Kafka集群由多个Broker节点组成,并且每个Broker都有全局唯一的id标识。可以说: Broker是整个Kafka集群的基本单元,也是整个集群的核心组件。

可以通过以下命令在zookeeper中查看:

[root@hadoop104 ~]# docker exec -it zk1 /bin/bash

root@245b7b533b30:/apache-zookeeper-3.5.8-bin# zkCli.sh

[zk: localhost:2181(CONNECTED) 2] ls /brokers/ids

[1, 2, 3]

2、Controller

(1)Controller

一般集群模式分为主从模式、主备模式等。但是在kafka集群中各broker间不存在主从、主备的关系。 但是为了能够更好的管理broker上下线、topic分区等工作,broker间会选取出Controller的角色,由当前角色做管理工作。 整个集群中只会出现一个Controller角色。Controller一般是与Zookeeper协助管理整个Kafka集群。

可以通过以下命令在zookeeper中获取Controller:

[zk: localhost:2181(CONNECTED) 4] get /controller

{“version”:1,“brokerid”:1,“timestamp”:“1726186563322”}

(2)Controller职责

当某个broker成功当选Controller之后,那么他将为整个Kafka集群服务,包括但不限定于:

  • 集群内Broker节点管理: 包括新增、下线等。
  • 副本Leader选举、分区重分配。

总之,所有与Zookeeper相交互的工作,都是Controller角色来完成。

(3)选举过程

Controller是在broker间的选举产生的:

①在每个broker启动时,都会在zookeeper中注册,而第一个注册成功的broker节点即被选举为Controller。

②如果Broker0中Leader挂了,Controller监听到节点变化,获取ISR,选举新的Leader (在isr中存活为前提,按照AR中排在前面的优先)。

AR:Kafka分区中的所有副本。

ISR:表示和 Leader 保持同步的 Follower 集合。

具体流程如下图所示:

3、Broker 重要参数

参数名称 描述
replica.lag.time.max.ms ISR 中,如果 Follower 长时间未向 Leader 发送通信请求或同步数据,则该 Follower 将被踢出 ISR。该时间阈值,默认 30s。
auto.leader.rebalance.enable 默认是 true。 自动 Leader Partition 平衡。
leader.imbalance.per.broker.percentage 默认是 10%。每个 broker 允许的不平衡的 leader 的比率。如果每个 broker 超过了这个值,控制器会触发 leader 的平衡。
leader.imbalance.check.interval.seconds 默认值 300 秒。检查 leader 负载是否平衡的间隔时间。
log.segment.bytes Kafka 中 log 日志是分成一块块存储的,此配置是指 log 日志划分 成块的大小,默认值 1G。
log.index.interval.bytes 默认 4kb,kafka 里面每当写入了 4kb 大小的日志(.log),然后就往 index 文件里面记录一个索引。
log.retention.hours Kafka 中数据保存的时间,默认 7 天。
log.retention.minutes Kafka 中数据保存的时间,分钟级别,默认关闭。
log.retention.ms Kafka 中数据保存的时间,毫秒级别,默认关闭。
log.retention.check.interval.ms 检查数据是否保存超时的间隔,默认是 5 分钟。
log.retention.bytes 默认等于-1,表示无穷大。超过设置的所有日志总大小,删除最早的 segment。
log.cleanup.policy 默认是 delete,表示所有数据启用删除策略; 如果设置值为 compact,表示所有数据启用压缩策略。
num.io.threads 默认是 8。负责写磁盘的线程数。整个参数值要占总核数的 50%。
num.replica.fetchers 副本拉取线程数,这个参数占总核数的 50%的 1/3
num.network.threads 默认是 3。数据传输线程数,这个参数占总核数的50%的 2/3 。
log.flush.interval.messages 强制页缓存刷写到磁盘的条数,默认是 long 的最大值,9223372036854775807。一般不建议修改,交给系统自己管理。
log.flush.interval.ms 每隔多久,刷数据到磁盘,默认是 null。一般不建议修改,交给系统自己管理。

7.2 Kafka 副本

Kafka 副本作用:

(1)提高数据可靠性。

(2)Kafka 默认副本 1 个,生产环境一般配置为 2 个,保证数据可靠性;太多副本会增加磁盘存储空间,增加网络上数据传输,降低效率。

(3)Kafka中副本分为:Leader 和 Follower。Kafka 生产者只会把数据发往 Leader,然后 Follower 找 Leader 进行同步数据。

Kafka 分区中的所有副本统称为 AR(Assigned Repllicas)。

AR = ISR + OSR

ISR:表示和 Leader 保持同步的 Follower 集合。

OSR:表示 Follower 与 Leader 副本同步时,延迟过多的副本。

7.3 文件存储

1、文件存储机制

Topic是逻辑上的概念,而partition是物理上的概念,每个partition对应于一个log文件,该log文件中存储的就是Producer生产的数据。Producer生产的数据会被不断追加到该log文件末端,为防止log文件过大导致数据定位效率低下,Kafka采取了分片和索引机制,将每个partition分为多个segment。每个segment包括:“.index”文件、“.log”文件和.timeindex等文件。这些文件位于一个文件夹下,该文件夹的命名规则为:topic名称+分区序号,例如:first-0。

说明:日志存储参数配置

参数 描述
log.segment.bytes Kafka 中 log 日志是分成一块块存储的,此配置是指 log 日志划分成块的大小,默认值 1G。
log.index.interval.bytes 默认 4kb,kafka 里面每当写入了 4kb 大小的日志(.log),然后就往 index 文件里面记录一个索引。 稀疏索引。

2、文件清理策略

对于传统的MQ而言,一般会删除已经被消费的消息,而Kafka集群会保留所有的消息,无论其被消费与否。当然,因为磁盘限制,可能永久保留所有数据(实际地没必要),因此Kafka提供两种策略删除旧数据:

(1)基于时间,Kafka 中默认的日志保存时间为 7 天,可以通过调整如下参数修改保存时间。

  • log.retention.hours,最低优先级小时,默认 7 天。
  • log.retention.minutes,分钟。
  • log.retention.ms,最高优先级毫秒。
  • log.retention.check.interval.ms,负责设置检查周期,默认 5 分钟。

(2)基于大小:默认关闭。超过设置的所有日志总大小,删除最早的 segment。

  • log.retention.bytes,默认等于-1,表示无穷大。

那么日志一旦超过了设置的时间或大小,怎么处理呢?

Kafka 中提供的日志清理策略有 delete 和 compact 两种:

(1)delete 日志删除:将过期数据删除

  • log.cleanup.policy = delete 所有数据启用删除策略

(2)compact 日志压缩:对于相同key的不同value值,只保留最后一个版本。

  • log.cleanup.policy = compact所有数据启用压缩策略

压缩后的offset可能是不连续的,比如上图中没有6,当从这些offset消费消息时,将会拿到比这个offset大 的offset对应的消息,实际上会拿到offset为7的消息,并从这个位置开始消费。 这种策略只适合特殊场景,比如消息的key是用户ID,value是用户的资料,通过这种压缩策略,整个消息 集里就保存了所有用户最新的资料。

3、高效读写数据

(1)Kafka 本身是分布式集群,可以采用分区技术,并行度高。

(2)读数据采用稀疏索引,可以快速定位要消费的数据。

稀疏索引只为某些搜索码值建立索引记录;在搜索时,找到其最大的搜索码值小于或等于所查找记录的搜索码值的索引项,然后从该记录开始向后顺序查询直到找到为止。

(3)顺序写磁盘

Kafka 的 producer 生产数据,要写入到 log 文件中,写的过程是一直追加到文件末端,为顺序写。官网有数据表明,同样的磁盘,顺序写能到 600M/s,而随机写只有 100K/s。这与磁盘的机械机构有关,顺序写之所以快,是因为其省去了大量磁头寻址的时间。

(4)页缓存 + 零拷贝技术

零拷贝: Kafka的数据加工处理操作交由Kafka生产者和Kafka消费者处理。Kafka Broker应用层不关心存储的数据,所以就不用走应用层,传输效率高。

PageCache页缓存:Kafka重度依赖底层操作系统提供的PageCache功能。当上层有写操作时,操作系统只是将数据写入PageCache。当读操作发生时,先从PageCache中查找,如果找不到,再去磁盘中读取。