Spring框架集成Kafka的方式
springboot集成kafka的方式
添加maven依赖
<dependency>
<groupId>org.mybatis.spring.boot</groupId>
<artifactId>mybatis-spring-boot-starter</artifactId>
<version>2.3.0</version>
</dependency>
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
</dependency>
配置application.yml
spring:
kafka:
producer:
bootstrap-servers: ip:port
topics: topics
retries: 0
batch-size: 16384
buffer-memory: 33554432
key-serializer: org.apache.kafka.common.serialization.StringSerializer
value-serializer: org.apache.kafka.common.serialization.StringSerializer
consumer:
topics: topics
bootstrap-servers: ip:port
group-id: group_id
auto-offset-reset: earliest
key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
properties:
security.protocol: SASL_SSL
sasl.mechanism: PLAIN
sasl.jaas.config: org.apache.kafka.common.security.plain.PlainLoginModule required username="username" password="password";
ssl.truststore.location: client.truststore.jks
ssl.truststore.password: trus_password
ssl.endpoint.identification.algorithm:
创建kafka生产者和消费者
在Spring Boot应用中,正确配置application.properties
或application.yml
后,Spring Boot的Kafka自动配置(KafkaAutoConfiguration
)会自动创建和装配KafkaTemplate
和KafkaConsumer
等相关的Bean。
KafkaTemplate:用于发送消息到Kafka。
ConsumerFactory:创建Kafka消费者的工厂。
KafkaListenerContainerFactory:为@KafkaListener方法创建消息监听容器。
import lombok.extern.slf4j.Slf4j;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Service;
@Service
@Slf4j
public class KafkaMessageService {

    /** Output topic that processed messages are forwarded to. */
    @Value("${spring.kafka.producer.topics}")
    private String outputTopic;

    private final KafkaTemplate<String, String> kafkaTemplate;

    /**
     * Constructor injection: the original code combined {@code @Autowired} field
     * injection with a {@code final} field, which does not compile because the
     * field is never initialized.
     */
    @Autowired
    public KafkaMessageService(KafkaTemplate<String, String> kafkaTemplate) {
        this.kafkaTemplate = kafkaTemplate;
    }

    /**
     * Listens on the configured input topic(s), processes each message and
     * forwards the result to the output topic.
     *
     * @param message the received message payload
     */
    @KafkaListener(topics = "${spring.kafka.consumer.topics}")
    public void listen(String message) {
        // The original log statement referenced an undefined 'topic' variable
        // and had a placeholder/argument mismatch.
        log.info("Received message: message = {}", message);
        // TODO: replace with real message processing.
        String processedMessage = message;
        // Forward the processed result to the output topic.
        kafkaTemplate.send(outputTopic, processedMessage);
        log.info("Sent Processed Message: {}", processedMessage);
    }
}
手动配置kafka生产者和消费者
如果需要更复杂的配置,也可以自定义kafka的配置类。
kafka消费者配置类:
@Configuration
@Slf4j
public class KafkaConsumerConfig {
@Bean
public ConsumerFactory<String, String> consumerFactory() {
Map<String, Object> props = new HashMap<>();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, keyDeserializer);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, valueDeserializer);
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, autoOffsetReset);
props.put("security.protocol", securityProtocol);
props.put("sasl.mechanism", saslMechanism);
props.put("sasl.jaas.config", saslJaasConfig);
props.put("ssl.truststore.location", truststoreLocation);
props.put("ssl.truststore.password", truststorePassword);
props.put("ssl.endpoint.identification.algorithm", endpointIdentificationAlgorithm);
return new DefaultKafkaConsumerFactory<>(props);
}
@Bean
public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory(ConsumerFactory<String, String> consumerFactory) {
ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory);
// 设置并发消费者数量,模拟多个独立的消费者并发处理消息
factory.setConcurrency(3);
// 设置手动提交
factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL);
return factory;
}
kafka生产者配置类:
@Configuration
public class KafkaProducerConfig {

    @Value("${spring.kafka.producer.bootstrap-servers}")
    private String bootstrapServers;

    // Renamed from keyDeserializer/valueDeserializer: these hold SERIALIZER
    // class names for the producer, not deserializers.
    @Value("${spring.kafka.producer.key-serializer}")
    private String keySerializer;

    @Value("${spring.kafka.producer.value-serializer}")
    private String valueSerializer;

    /**
     * Builds the producer factory from the bound configuration values.
     *
     * @return factory used to create the underlying KafkaProducer instances
     */
    @Bean
    public ProducerFactory<String, String> producerFactory() {
        Map<String, Object> configProps = new HashMap<>(4);
        configProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        configProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, keySerializer);
        configProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, valueSerializer);
        return new DefaultKafkaProducerFactory<>(configProps);
    }

    /**
     * Template used by application code to send messages.
     *
     * @param producerFactory the producer factory defined above
     * @return a KafkaTemplate backed by that factory
     */
    @Bean
    public KafkaTemplate<String, String> kafkaTemplate(ProducerFactory<String, String> producerFactory) {
        return new KafkaTemplate<>(producerFactory);
    }
}
监听消息并处理:
@Component
@Slf4j
public class KafkaMessageProcess {

    /** Output topic for forwarding processed messages. */
    @Value("${spring.kafka.producer.topics}")
    private String outTopic;

    @Resource
    private KafkaTemplate<String, String> kafkaTemplate;

    /**
     * Listens on the configured input topics (comma-separated list resolved via
     * SpEL), processes each message, then manually acknowledges it.
     *
     * @param message        the received payload
     * @param topic          topic the record was received from
     * @param acknowledgment manual-commit handle (AckMode.MANUAL)
     */
    @KafkaListener(topics = "#{'${spring.kafka.consumer.topics}'.split(',')}")
    public void listen(@Payload String message,
    @Header(KafkaHeaders.RECEIVED_TOPIC) String topic,
    Acknowledgment acknowledgment) {
        log.info("Received message: topic = {}, message = {}", topic, message);
        // Process the payload before acknowledging. In the original snippet
        // process() was dead code and the message was acknowledged unprocessed.
        process(message);
        // Manually commit the offset of the current message to Kafka. Kafka
        // records this offset, marking this message (and all earlier ones in
        // the partition) as successfully consumed.
        acknowledgment.acknowledge();
    }

    /** Business processing hook for a single message. */
    private void process(String message) {
        // TODO: process msg
    }
}
KafkaListener 源码分析
@KafkaListener
的注册
- 扫描注解:在bean初始化阶段,
KafkaListenerAnnotationBeanPostProcessor
由于实现了BeanPostProcessor
,会扫描所有 Bean,查找@KafkaListener
注解
KafkaListenerAnnotationBeanPostProcessor
// NOTE: parts of the original framework source are omitted here.
// Callback declared by the BeanPostProcessor interface — one of Spring's core
// extension points; it allows custom processing after a bean is initialized.
public Object postProcessAfterInitialization(Object bean, String beanName) throws BeansException {
// Scan for class-level @KafkaListener annotations.
// NOTE(review): targetClass is resolved in the omitted code above.
Collection<KafkaListener> classLevelListeners = this.findListenerAnnotations(targetClass);
// Scan for methods annotated with @KafkaListener.
Map<Method, Set<KafkaListener>> annotatedMethods = MethodIntrospector.selectMethods(targetClass, (methodx) -> {
Set<KafkaListener> listenerMethods = this.findListenerAnnotations(methodx);
return !listenerMethods.isEmpty() ? listenerMethods : null;
});
// Iterate over the annotated methods and resolve their signatures.
// (Decompiled excerpt: the enclosing loop over entries is omitted.)
Iterator var13 = annotatedMethods.entrySet().iterator();
Map.Entry<Method, Set<KafkaListener>> entry = (Map.Entry)var13.next();
Method method = (Method)entry.getKey();
Iterator var11 = ((Set)entry.getValue()).iterator();
while(var11.hasNext()) {
KafkaListener listener = (KafkaListener)var11.next();
// For each @KafkaListener found, continue with parsing and registration.
this.processKafkaListener(listener, method, bean, beanName);
}
return bean;
}
- 解析注解:提取
topics
、groupId
、containerFactory
等信息。
KafkaListenerAnnotationBeanPostProcessor
// Parses one @KafkaListener occurrence and registers it as an endpoint.
protected void processListener(MethodKafkaListenerEndpoint<?, ?> endpoint, KafkaListener kafkaListener, Object bean, String beanName, String[] topics, TopicPartitionOffset[] tps) {
// Parse the annotation: wrap the annotation metadata, the method, the bean and
// other static configuration into the endpoint.
this.processKafkaListenerAnnotation(endpoint, kafkaListener, bean, topics, tps);
String containerFactory = this.resolve(kafkaListener.containerFactory());
KafkaListenerContainerFactory<?> listenerContainerFactory = this.resolveContainerFactory(kafkaListener, containerFactory, beanName);
// Register the endpoint built from the listener/method found during scanning.
this.registrar.registerEndpoint(endpoint, listenerContainerFactory);
}
- 注册监听端点:调用
KafkaListenerEndpointRegistrar.registerEndpoint()
注册监听器。
KafkaListenerEndpointRegistrar
// Stores (or immediately starts) a listener endpoint.
public void registerEndpoint(KafkaListenerEndpoint endpoint, @Nullable KafkaListenerContainerFactory<?> factory) {
KafkaListenerEndpointDescriptor descriptor = new KafkaListenerEndpointDescriptor(endpoint, factory);
synchronized(this.endpointDescriptors) {
// Whether to start immediately:
// true:  create and start the corresponding MessageListenerContainer
//        (the Kafka consumer container) right away
// false: only store the endpoint descriptor; containers are created and
//        started together later (see registerAllEndpoints)
if (this.startImmediately) {
this.endpointRegistry.registerListenerContainer(descriptor.endpoint, this.resolveContainerFactory(descriptor), true);
} else {
this.endpointDescriptors.add(descriptor);
}
}
}
// Creates all KafkaMessageListenerContainers in one pass and starts them.
protected void registerAllEndpoints() {
synchronized (this.endpointDescriptors) {
for (KafkaListenerEndpointDescriptor descriptor : this.endpointDescriptors) {
this.endpointRegistry.registerListenerContainer(
descriptor.endpoint, resolveContainerFactory(descriptor));
}
this.startImmediately = true; // trigger immediate startup for late registrations
}
}
// Creates the container for an endpoint, caches it, and optionally starts it.
public void registerListenerContainer(KafkaListenerEndpoint endpoint, KafkaListenerContainerFactory<?> factory,
boolean startImmediately) {
synchronized (this.listenerContainers) {
// Create the MessageListenerContainer; container creation is analyzed in
// the next section.
MessageListenerContainer container = createListenerContainer(endpoint, factory);
// Store the created container in a thread-safe map.
// NOTE(review): 'id' is derived from the endpoint in the omitted code.
this.listenerContainers.put(id, container);
if (startImmediately) {
// Start the container now.
startIfNecessary(container);
}
}
}
KafkaListenerContainerFactory
创建监听容器
KafkaMessageListenerContainer
是 Spring Kafka 的核心组件之一,负责 管理和执行 Kafka 消费者的消息监听逻辑,封装了原生 KafkaConsumer
,提供了线程管理、消息拉取、监听器调用、错误处理等功能。
暂时回到我们开头的配置,这里我们配置的容器创建工厂是ConcurrentKafkaListenerContainerFactory
,concurrency=3
表示启动三个线程并发处理消息,这个时候,则会由ConcurrentKafkaListenerContainerFactory
创建ConcurrentMessageListenerContainer
。
// Example configuration (repeated from the consumer config section above):
// a ConcurrentKafkaListenerContainerFactory with three concurrent consumers
// and manual offset commits.
@Bean
public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory(ConsumerFactory<String, String> consumerFactory) {
ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory);
// Number of concurrent consumers.
factory.setConcurrency(3);
// Manual offset commit.
factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL);
return factory;
}
在ConcurrentMessageListenerContainer
中有一个集合,到时候会根据concurrency
创建对应数量的KafkaMessageListenerContainer
子容器。
private final List<KafkaMessageListenerContainer<K, V>> containers = new ArrayList<>();
容器创建代码
AbstractKafkaListenerContainerFactory
// Template method: defines the overall container-creation flow; the concrete
// container instance is supplied by the subclass via createContainerInstance.
public C createListenerContainer(KafkaListenerEndpoint endpoint) {
C instance = createContainerInstance(endpoint);
JavaUtils.INSTANCE
.acceptIfNotNull(endpoint.getId(), instance::setBeanName);
if (endpoint instanceof AbstractKafkaListenerEndpoint) {
configureEndpoint((AbstractKafkaListenerEndpoint<K, V>) endpoint);
}
endpoint.setupListenerContainer(instance, this.messageConverter);
// Initialize the container configuration: the endpoint carries static settings
// (topic information, the @KafkaListener-annotated method, the bean, etc.)
// which are copied into the container here.
initializeContainer(instance, endpoint);
customizeContainer(instance);
return instance;
}
// Hook implemented by concrete factories to build the actual container type.
protected abstract C createContainerInstance(KafkaListenerEndpoint endpoint);
ConcurrentKafkaListenerContainerFactory
// Subclass implementation invoked by the template method above: the abstract
// class defines the overall flow, the subclass supplies the concrete step.
protected ConcurrentMessageListenerContainer<K, V> createContainerInstance(KafkaListenerEndpoint endpoint) {
TopicPartitionOffset[] topicPartitions = endpoint.getTopicPartitionsToAssign();
// Preference order: explicit topic/partition assignments, then topic names,
// then a topic pattern.
if (topicPartitions != null && topicPartitions.length > 0) {
ContainerProperties properties = new ContainerProperties(topicPartitions);
return new ConcurrentMessageListenerContainer<>(getConsumerFactory(), properties);
}
else {
Collection<String> topics = endpoint.getTopics();
if (!topics.isEmpty()) { // NOSONAR
ContainerProperties properties = new ContainerProperties(topics.toArray(new String[0]));
return new ConcurrentMessageListenerContainer<>(getConsumerFactory(), properties);
}
else {
ContainerProperties properties = new ContainerProperties(endpoint.getTopicPattern()); // NOSONAR
return new ConcurrentMessageListenerContainer<>(getConsumerFactory(), properties);
}
}
}
启动容器,消费消息
前面我们提到容器创建好后有一个启动的过程,也就是这一行代码startIfNecessary(container);
,会真正启动容器,进一步触发消费者线程(ListenerConsumer
)的初始化并开始消息消费流程。
KafkaListenerEndpointRegistrar
// Starts the container when the context has been refreshed (and auto-start
// after refresh is enabled) or when the container is marked auto-startup.
private void startIfNecessary(MessageListenerContainer listenerContainer) {
if ((this.contextRefreshed && this.alwaysStartAfterRefresh) || listenerContainer.isAutoStartup()) {
listenerContainer.start();
}
}
// Delegates to AbstractMessageListenerContainer#start.
public final void start() {
checkGroupId();
// Lifecycle monitor guards against concurrent start/stop; only start once.
synchronized (this.lifecycleMonitor) {
if (!isRunning()) {
doStart();
}
}
}
// ConcurrentMessageListenerContainer#doStart: performs the real startup logic.
protected void doStart() {
if (!isRunning()) {
// Create as many child containers as configured by 'concurrency'.
for (int i = 0; i < this.concurrency; i++) {
KafkaMessageListenerContainer<K, V> container =
constructContainer(containerProperties, topicPartitions, i);
configureChildContainer(i, container);
// Newly created children inherit the parent's paused state.
if (isPaused()) {
container.pause();
}
// Start the child container.
container.start();
// Keep the child in the container list.
this.containers.add(container);
}
}
}
// KafkaMessageListenerContainer#doStart: starts a single child container.
protected void doStart() {
// Create the consumer thread (a Runnable that owns the KafkaConsumer).
this.listenerConsumer = new ListenerConsumer(listener, listenerType);
setRunning(true);
// Latch used to block until the consumer thread has actually started.
this.startLatch = new CountDownLatch(1);
// Submit to the executor: the consumer thread is started asynchronously.
this.listenerConsumerFuture = consumerExecutor
.submitListenable(this.listenerConsumer);
}
消费消息的逻辑在ListenerConsumer
中,该类实现了Runnable
接口的run()
方法,在run()
方法中实现了拉取消息,并通过反射调用我们自定义的业务方法,进行消息处理等自定义逻辑。
ListenerConsumer
// Main consumer loop (ListenerConsumer implements Runnable).
public void run() {
while (isRunning()) {
try {
// Pull messages from Kafka and invoke the business method via reflection.
pollAndInvoke();
}
catch (Exception e) {
// Errors are routed through the configured error-handling path so the
// loop keeps running.
handleConsumerException(e);
}
finally {
clearThreadState();
}
}
}
// One iteration of the consume loop: poll, then dispatch to the listener.
protected void pollAndInvoke() {
// Poll a batch of records from Kafka.
ConsumerRecords<K, V> records = doPoll();
// Invoke our custom @KafkaListener method via reflection to handle the records.
invokeIfHaveRecords(records);
}