目录
在上一篇我们深度解析了Kafka的运行操作原理以及集群消息消费机制等,请点击下方链接获取
本篇我们将着重实战
使用Kafka-Client实现消息收发
引入依赖
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>3.0.0</version>
</dependency>
发送端:
import java.util.Properties;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Future;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.serialization.IntegerSerializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.junit.jupiter.api.Test;
import org.springframework.boot.test.context.SpringBootTest;
import com.google.gson.Gson;
import lombok.extern.slf4j.Slf4j;
@Slf4j
@SpringBootTest
class KafkaProducerTests {
private final static String TOPIC_NAME = "muse-rp";
private final static Integer PARTITION_ONE = 0;
private final static Integer PARTITION_TWO = 1;
private final static Gson GSON = new Gson();
/**
* 同步阻塞——消息发送
*/
@Test
void testBlockingSendMsg() {
/** 初始化生产者属性信息 */
Properties properties = initProducerProp();
/** 创建消息发送的客户端 */
Producer<Integer, String> producer = new KafkaProducer<>(properties);
Message message;
for (int i=0; i< 3; i++) {
/** 构造消息 */
message = new Message(i, "BLOCKING_MSG_"+i);
ProducerRecord<Integer, String> producerRecord;
int SEND_MSG_METHOD = 0;
switch (SEND_MSG_METHOD) {
case 0: /** 【发送方式1】未指定发送的分区 */
producerRecord = new ProducerRecord<>(TOPIC_NAME, GSON.toJson(message));
break;
case 1: /** 【发送方式2】未指定发送的分区,根据第二个参数key来判断发送到哪个分区*/
producerRecord = new ProducerRecord<>(TOPIC_NAME, message.getMegId(), GSON.toJson(message));
break;
default: /** 【发送方式3】指定发送的分区 */
producerRecord = new ProducerRecord(TOPIC_NAME, PARTITION_ONE, message.getMegId(), GSON.toJson(message));
}
/** 同步阻塞——等待消息发送成功 */
try {
Future<RecordMetadata> recordMetadataFuture = producer.send(producerRecord);
log.info("调用send方法完毕,msg={}", producerRecord.value());
RecordMetadata recordMetadata = recordMetadataFuture.get();
log.info("[topic]={}, [partition]={}, [offset]={}", recordMetadata.topic(), recordMetadata.partition(),
recordMetadata.offset());
} catch (Throwable e) {
log.error("发送消息异常!", e);
}
}
producer.close(); // close方法会阻塞等待之前所有的发送请求完成后再关闭KafkaProducer
}
/**
* 异步回调——消息发送
*/
@Test
void testNoBlockingSendMsg() {
/** 初始化生产者属性信息 */
Properties properties = initProducerProp();
/** 创建消息发送的客户端 */
Producer<Integer, String> producer = new KafkaProducer<>(properties);
CountDownLatch latch = new CountDownLatch(5);
Message message;
for (int i=0; i< 5; i++) {
message = new Message(i, "NO_BLOCKING_MSG_" + i);
/** 指定发送的分区 */
ProducerRecord<Integer, String> producerRecord = new ProducerRecord(TOPIC_NAME, PARTITION_ONE,
message.getMegId(), GSON.toJson(message));
/** 异步回调方式发送消息 */
producer.send(producerRecord, (metadata, exception) -> {
if (exception != null) {
log.error("消息发送失败!", exception);
}
if (metadata != null) {
log.info("[topic]={}, [partition]={}, [offset]={}", metadata.topic(), metadata.partition(),
metadata.offset());
}
latch.countDown();
});
log.info("调用send方法完毕,msg={}", producerRecord.value());
}
producer.close();
}
/**
* 初始化生产者属性
*/
private Properties initProducerProp() {
Properties properties = new Properties();
// properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9093,127.0.0.1:9094,127.0.0.1:9095"); // 配置kafka的Broker列表
properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9092");
/**
* 发出消息持久化机制参数
* acks=0: 表示producer不需要等待任何broker确认收到消息的ACK回复,就可以继续发送下一条消息。性能最高,但是最容易丢失消息
* acks=1: 表示至少等待leader已经成功将数据写入本地log,但是不需要等待所有follower都写入成功,就可以继续发送下一条消息。
* 这种情况下,如果follower没有成功备份数据,而此时leader又挂掉,则消息就会丢失。
* acks=-1: 表示kafka ISR列表中所有的副本同步数据成功,才返回消息给客户端,这是最强的数据保证。min.insync.replicas 这个配置是
* 用来设置同步副本个数的下限的, 并不是只有 min.insync.replicas 个副本同步成功就返回ack。而是,只要acks=all就意味着
* ISR列表里面的副本必须都要同步成功。
*/
properties.put(ProducerConfig.ACKS_CONFIG, "1");
/**
* 发送失败重试的次数,默认是间隔100ms
* 重试能保证消息发送的可靠性,但是也可能造成消息重复发送,所以需要在消费者端做好幂等性处理
*/
properties.put(ProducerConfig.RETRIES_CONFIG, 3); // 失败重试3次
properties.put(ProducerConfig.RETRY_BACKOFF_MS_CONFIG, 300); // 重试间隔300ms
/**
* 设置发送消息的本地缓冲区
* 如果设置了该缓冲区,消息会先发送到本地缓冲区,可以提高消息发送性能,默认值为32MB
*/
properties.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 32*1024*1024);
/**
* 设置批量发送消息的大小
* kafka本地线程会从缓冲区去取数据,然后批量发送到Broker,默认值16KB,即:一个批次满足16KB就会发送出去。
*/
properties.put(ProducerConfig.BATCH_SIZE_CONFIG, 16*1024);
/**
* 默认值为0,表示消息必须立即被发送,但这样会影响性能
* 一般设置10ms左右,也就是说这个消息发送完后会进入本地的一个批次中,如果10ms内,这个批次满足了16KB,那么就会随着批次一起被发送出去
* 如果10ms内,批次没满,那么也必须要把消息发送出去,不能让消息的发送延迟时间太长
*/
properties.put(ProducerConfig.LINGER_MS_CONFIG, 10);
/** 把发送的key和消息value从字符串序列化为字节数组 */
properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class.getName());
properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
return properties;
}
}
消费端:
import java.time.Duration;
import java.util.Arrays;
import java.util.Properties;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.CooperativeStickyAssignor;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.RangeAssignor;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.IntegerDeserializer;
import org.apache.kafka.common.serialization.IntegerSerializer;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.assertj.core.util.Lists;
import org.junit.jupiter.api.Test;
import org.springframework.boot.test.context.SpringBootTest;
import com.google.gson.Gson;
import lombok.extern.slf4j.Slf4j;
@Slf4j
@SpringBootTest
class KafkaConsumerTests {
private final static String TOPIC_NAME = "muse-rp";
private final static String CONSUMER_GROUP_NAME = "museGroup";
private final static Integer PARTITION_ONE = 0;
private final static Gson GSON = new Gson();
/**
* 自动提交offset
*/
@Test
void testAutoCommitOffset() throws Throwable {
Properties properties = initConsumerProp();
/** 是否自动提交offset,默认:true*/
properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true");
/** 自动提交offset的时间间隔 */
properties.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "1000");
/** 配置Rebalance策略 */
// properties.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG, Arrays
// .asList(RangeAssignor.class, CooperativeStickyAssignor.class));
/** 创建消息发送的客户端 */
Consumer<Integer, String> consumer = new KafkaConsumer<>(properties);
int RECV_MSG_METHOD = 0;
switch (RECV_MSG_METHOD) {
case 0: /** 【接收方式1】未指定接收的分区 */
consumer.subscribe(Lists.newArrayList(TOPIC_NAME));
break;
case 1: /** 【接收方式2】指定分区消费 */
consumer.assign(Lists.newArrayList(new TopicPartition(TOPIC_NAME, PARTITION_ONE)));
break;
case 2: /** 【接收方式3】指定从头开始消费 */
consumer.assign(Lists.newArrayList(new TopicPartition(TOPIC_NAME, PARTITION_ONE)));
consumer.seekToBeginning(Lists.newArrayList(new TopicPartition(TOPIC_NAME, PARTITION_ONE)));
break;
default: /**【接收方式4】指定分区和offset进行消费*/
consumer.assign(Lists.newArrayList(new TopicPartition(TOPIC_NAME, PARTITION_ONE)));
consumer.seek(new TopicPartition(TOPIC_NAME, PARTITION_ONE), 10);
}
ConsumerRecords<Integer, String> records;
while (true) {
/** 长轮询的方式拉取消息 */
records = consumer.poll(Duration.ofMillis(1000));
for (ConsumerRecord record : records) {
log.info(" [topic]={}, [partition]={}, [offset]={}, [value]={}", record.topic(), record.partition(),
record.offset(), record.value());
}
Thread.sleep(3000);
}
}
/**
* 手动提交offset
*/
@Test
void testManualCommitOffset() throws Throwable {
Properties properties = initConsumerProp();
/** 是否自动提交offset,默认:true*/
properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
/** 创建消息发送的客户端 */
Consumer<Integer, String> consumer = new KafkaConsumer<>(properties);
consumer.subscribe(Lists.newArrayList(TOPIC_NAME));
// consumer.assign(Lists.newArrayList(new TopicPartition(TOPIC_NAME, PARTITION_ONE)));
ConsumerRecords<Integer, String> records;
while (true) {
/** 长轮询的方式拉取消息 */
records = consumer.poll(Duration.ofMillis(1000));
for (ConsumerRecord record : records) {
log.info(" [topic]={}, [partition]={}, [offset]={}, [value]={}", record.topic(), record.partition(),
record.offset(), record.value());
}
boolean isSync = true;
if (records.count() > 0) {
if (isSync) {
/** 【手动同步提交offset】当前线程会阻塞直到offset提交成功;常用同步提交方式 */
consumer.commitSync();
} else {
/** 【手动异步提交offset】当前线程提交offset不会阻塞,可以继续执行后面的逻辑代码 */
consumer.commitAsync((offsets, exception) -> {
log.error("offset={}", GSON.toJson(offsets));
if (exception != null) {
log.error("提交offset发生异常!", exception);
}
});
}
}
Thread.sleep(1000);
}
}
/**
* 初始化消费者配置
*/
private Properties initConsumerProp() {
Properties properties = new Properties();
// 配置kafka的Broker列表
// properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9093,127.0.0.1:9094,127.0.0.1:9095");
properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9092");
/** 配置消费组——museGroup */
properties.put(ConsumerConfig.GROUP_ID_CONFIG, CONSUMER_GROUP_NAME);
/**
* offset的重置策略——例如创建一个新的消费组,offset是不存在的,如何对offset赋值消费
* latest(默认):只消费自己启动之后发送到主题的消息。
* earliest:第一次从头开始消费,以后按照消费offset记录继续消费。
*/
properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
/** Consumer给Broker发送心跳的时间间隔 */
properties.put(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, 1000);
/** 如果超过10秒没有接收到消费者的心跳,则会把消费者踢出消费组,然后重新进行rebalance操作,把分区分配给其他消费者 */
properties.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 10*1000);
/** 一次poll最大拉取消息的条数,可以根据消费速度的快慢来设置 */
properties.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 500);
/** 如果两次poll的时间超出了30秒的时间间隔,kafka会认为整个Consumer的消费能力太弱,会将它踢出消费组。将分区分配给其他消费者 */
properties.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, 30*1000);
/** key和value的反序列化 */
properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, IntegerDeserializer.class.getName());
properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
return properties;
}
}
SpringBoot集成
引入maven依赖
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
</dependency>
import javax.annotation.Resource;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.support.SendResult;
import org.springframework.stereotype.Service;
import org.springframework.util.concurrent.ListenableFuture;
import org.springframework.util.concurrent.ListenableFutureCallback;
import com.muse.springbootdemo.entity.Message;
import lombok.extern.slf4j.Slf4j;
@Slf4j
@Service
public class ProducerService {
private final static String TOPIC_NAME = "muse-rp";
private final static Integer PARTITION_ONE = 0;
private final static Integer PARTITION_TWO = 1;
@Resource
private KafkaTemplate<String, Message> kafkaTemplate;
/**
* 同步阻塞——消息发送
*/
public void blockingSendMsg() throws Throwable {
Message message;
for (int i=0; i< 5; i++) {
message = new Message(String.valueOf(i), "BLOCKING_MSG_SPRINGBOOT_" + i);
ListenableFuture<SendResult<String, Message>> future = kafkaTemplate.send(TOPIC_NAME, PARTITION_ONE,
"" + message.getMegId(), message);
SendResult<String, Message> sendResult = future.get();
RecordMetadata recordMetadata = sendResult.getRecordMetadata();
log.info("---BLOCKING_MSG_SPRINGBOOT--- [topic]={}, [partition]={}, [offset]={}", recordMetadata.topic(),
recordMetadata.partition(), recordMetadata.offset());
}
}
/**
* 异步回调——消息发送
*/
public void noBlockingSendMsg() {
Message message;
for (int i=0; i< 5; i++) {
message = new Message(String.valueOf(i), "NO_BLOCKING_MSG_SPRINGBOOT_" + i);
ListenableFuture<SendResult<String, Message>> future = kafkaTemplate.send(TOPIC_NAME, PARTITION_ONE,
"" + message.getMegId(), message);
future.addCallback(new ListenableFutureCallback<SendResult<String, Message>>() {
@Override
public void onFailure(Throwable ex) {
log.error("消息发送失败!", ex);
}
@Override
public void onSuccess(SendResult<String, Message> result) {
RecordMetadata recordMetadata = result.getRecordMetadata();
log.info("---NO_BLOCKING_MSG_SPRINGBOOT---[topic]={}, [partition]={}, [offset]={}",
recordMetadata.topic(), recordMetadata.partition(), recordMetadata.offset());
}
});
}
}
}
消费端
package com.muse.springbootdemo.service;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.annotation.PartitionOffset;
import org.springframework.kafka.annotation.TopicPartition;
import org.springframework.stereotype.Service;
import com.muse.springbootdemo.entity.Message;
import lombok.extern.slf4j.Slf4j;
@Slf4j
@Service
public class ConsumerService {
private final static String TOPIC_NAME = "muse-rp";
private final static String CONSUMER_GROUP_NAME = "museGroup";
/**
* 消息消费演示
*/
@KafkaListener(topics = TOPIC_NAME, groupId = CONSUMER_GROUP_NAME)
public void listenGroup(ConsumerRecord<String, Message> record) {
log.info(" [topic]={}, [partition]={}, [offset]={}, [value]={}", record.topic(), record.partition(),
record.offset(), record.value());
}
}
下一篇将解析关于Kafka应用过程中的常见问题及大厂高频面试题
- 包括 1)防止消息丢失;2)防止重复消费通过幂等处理;3)顺序消费需单分区单消费者;4)消息积压时提升消费能力;5)延迟队列通过时间判断实现;6)高吞吐依靠页面缓存+顺序写+零拷贝技术等问题 将在12h内更新
Kafka应用过程中的高频问题