夯实 kafka 系列|第六章:自定义注解 @EvalEventListener 开发
1.前言
在文章 《夯实 kafka 系列|第五章:基于 kafka 分布式事件框架 eval-event》 留了一个功能未实现
@EvalEventListener
监听分布式事件
本文来讨论这个自定义注解如何开发。
源码已更新到 github
2.需求
通过 @EvalEventListener
监听到分布式事件 MyEvent(自定义事件)
@EvalEventListener
public void onEvalEvent(MyEvent event){
...
}
3.思路
3.1 @kafkaListener
注解的实现
我们先看看 kafka 官方如何实现 @kafkaListener
,然后参照这个来进行实现。
3.1.1 KafkaListenerAnnotationBeanPostProcessor(重点)
- 作用:在 Spring 容器初始化时扫描所有 Bean,识别带有
@KafkaListener
注解的方法。 - 流程:
- 遍历所有 Bean 的类方法。
- 通过反射检查方法是否标注
@KafkaListener
。 - 将符合条件的监听方法封装为
MethodKafkaListenerEndpoint
。
3.1.2 监听器端点注册
- 将解析后的端点信息注册到
KafkaListenerEndpointRegistry
(管理所有监听容器的中央注册表)。
3.1.3 监听器容器创建
- 容器工厂
ConcurrentKafkaListenerContainerFactory
- 作用:根据
@KafkaListener
的配置创建ConcurrentMessageListenerContainer
。
3.1.4 动态代理生成(重点)
- 对带有
@KafkaListener
的方法生成动态代理,使其具备消息处理能力。
3.1.5 总结
一共分为三步
- 在 Bean 的初始化后生命周期阶段,找到所有标注
@KafkaListener
的方法和类 - 整理好所有需要的元数据信息(Bean Method topic 等等),用于动态代理+消费 kafka 消息
- 动态代理,执行类的方法,即标注
@KafkaListener
的方法
3.2 参考实现
我们大致实现流程如下:
EvalEventListenerAnnotationBeanPostProcessor 实现 BeanPostProcessor
遍历所有 Bean 的类方法,通过反射检查方法是否标注
@EvalEventListener
找到 topic 和 class 映射关系
启动线程,拉取 topic 消息
找到
@EvalEventListener
对应的 class,以及 method反射执行目标类的 method 方法
4.开发
3.1 @EvalEventListener 注解
package com.csdn.event.kafka.annotation;
import java.lang.annotation.*;
@Target({ElementType.METHOD})
@Retention(RetentionPolicy.RUNTIME)
@Documented
public @interface EvalEventListener {
/**
* 指定监听的事件类型(默认根据方法参数推断)
*/
Class<?>[] eventType() default {};
}
3.2 EvalEventAnnotationDefinition 注解定义
用于存储 Bean Method topic 等等信息
public class EvalEventAnnotationDefinition {
private String topic;
// 事件的目标类
private Class<?> targetClass;
// 事件的类型
private Class<?> eventClass;
private Method method;
...
}
3.3 EvalEventListenerAnnotationBeanPostProcessor 初始化后处理器
- 找到标注
@EvalEventListener
类和方法 - 循环执行 processEvalEventListener 方法
- 封装 EvalEventAnnotationDefinition 信息
- 启动 EvalEventListenerAnnotationThread 线程
/**
* This class processes beans after their initialization to find methods annotated with
* {@link EvalEventListener} and register them as event listeners.
*/
@Component
public class EvalEventListenerAnnotationBeanPostProcessor<T extends EvalEvent> implements BeanPostProcessor, ApplicationContextAware {
...
@Override
public Object postProcessAfterInitialization(final Object bean, final String beanName) throws BeansException {
Class<?> targetClass = AopProxyUtils.ultimateTargetClass(bean);
Map<Method, Set<EvalEventListener>> annotatedMethods = MethodIntrospector.selectMethods(targetClass,
(MethodIntrospector.MetadataLookup<Set<EvalEventListener>>) method -> {
Set<EvalEventListener> listenerMethods = findListenerAnnotations(method);
return (!listenerMethods.isEmpty() ? listenerMethods : null);
});
if (annotatedMethods.isEmpty()) {
// No methods found
logger.debug("No @EvalEventListener annotations found on bean '{}'", beanName);
} else {
// Non-empty set of methods
for (Map.Entry<Method, Set<EvalEventListener>> entry : annotatedMethods.entrySet()) {
Method method = entry.getKey();
for (EvalEventListener listener : entry.getValue()) {
processEvalEventListener(listener, method, bean);
}
}
logger.debug(" {} @KafkaListener methods processed on bean '{}'"
, annotatedMethods.size(), beanName + ": " + annotatedMethods);
}
return bean;
}
private void processEvalEventListener(EvalEventListener listener, Method method, Object bean) {
// topic
Class<?>[] eventTypeArr = listener.eventType();
Class<?> eventType = eventTypeArr.length > 0 ? eventTypeArr[0] : null;
if (eventType == null) {
// 从方法参数中获取
eventType = method.getParameterTypes()[0];
}
if (eventType == null) {
throw new IllegalStateException("Event type not specified for @EvalEventListener on method: " + method);
}
try {
Object o = eventType.newInstance();
if (!(o instanceof EvalEvent)) {
throw new IllegalStateException("Event type must be a subclass of EvalEvent: " + eventType);
}
EvalEvent event = (EvalEvent) o;
EvalEventAnnotationDefinition evalEventDefinition = new EvalEventAnnotationDefinition(event.getTopic(), bean.getClass(), eventType, method);
EvalEventListenerAnnotationThread<T> evalEventListenerThread =
new EvalEventListenerAnnotationThread<>(evalEventDefinition, eventKafkaConsumerFactory, applicationContext);
evalEventListenerThread.start();
} catch (Exception e) {
throw new RuntimeException(e);
}
}
...
}
3.4 EvalEventListenerAnnotationThread 消费逻辑
- 创建KafkaConsumer
- 拉取 kafka 消息
- 动态代理执行目标类的方法
- ack 提交
public class EvalEventListenerAnnotationThread<T extends EvalEvent> extends Thread {
...
@Override
public void run() {
// 1. 创建KafkaConsumer
KafkaConsumer<String, ?> consumer;
try {
consumer = eventKafkaConsumerFactory.buildKafkaConsumer(evalEventAnnotationDefinition.getTargetClass());
List<String> topicList = new ArrayList<>();
topicList.add(evalEventAnnotationDefinition.getTopic());
consumer.subscribe(topicList);
} catch (Exception e) {
log.error("KafkaConsumer构造失败", e);
e.printStackTrace();
return;
}
// 2. 消费消息
try {
while (true) {
try {
// 3. 拉取消息
ConsumerRecords<String, ?> records = consumer.poll(
Duration.ofMillis(500));
if (records.isEmpty()) {
continue;
}
// 4. 处理消息
dispatch(records);
// 5. 使用异步提交规避阻塞
consumer.commitAsync();
} catch (Exception e) {
log.error("消息处理异常", e);
}
}
} finally {
try {
// 6.最后一次提交使用同步阻塞式提交
consumer.commitSync();
} finally {
consumer.close();
}
}
}
private void dispatch(ConsumerRecords<String, ?> records) {
for (ConsumerRecord<String, ?> record : records) {
try {
Object data = record.value();
if (data == null) {
log.warn("接收到空消息记录,跳过处理");
continue;
}
// 转换事件数据为指定类型
T event = (T) ConvertUtil.convertEvent(data, evalEventAnnotationDefinition.getEventClass());
if (event == null) {
log.error("事件数据转换失败,跳过处理");
continue;
}
Class<?> targetClass = evalEventAnnotationDefinition.getTargetClass();
Method method = evalEventAnnotationDefinition.getMethod();
if (targetClass == null || method == null) {
log.error("目标类或方法为空,无法处理事件");
continue;
}
// 从 Spring 容器获取 bean 实例
Object target;
try {
// 先尝试从 Spring 容器获取
target = applicationContext.getBean(targetClass);
} catch (NoSuchBeanDefinitionException e) {
// 如果容器中没有,尝试创建新实例
log.warn("Spring 容器中未找到 {} 的实例,将创建新实例", targetClass.getSimpleName());
target = targetClass.getDeclaredConstructor().newInstance();
// 可选:如果需要依赖注入,可以在这里自动装配
autowireBean(target);
}
method.setAccessible(true);
// 调用目标方法
method.invoke(target, event);
} catch (InstantiationException | IllegalAccessException e) {
log.error("创建目标类实例失败: {}", e.getMessage(), e);
} catch (NoSuchMethodException e) {
log.error("找不到目标类的无参构造方法: {}", e.getMessage(), e);
} catch (InvocationTargetException e) {
log.error("方法调用失败: {}", e.getTargetException().getMessage(), e.getTargetException());
} catch (Exception e) {
log.error("事件处理失败: {}", e.getMessage(), e);
}
}
}
...
}
5.测试
5.1 测试代码
@Component
public class UserAnnotationEventListener {
@EvalEventListener
public void onEvent(UserCreatedEvent event) {
System.out.println("Received event: " + JSONObject.toJSONString(event));
}
}
5.2 源码调试
断点1 EvalEventListenerAnnotationBeanPostProcessor
- 扫描到 UserAnnotationEventListener 以及对应的方法
断点2
- 封装 EvalEventAnnotationDefinition
断点3 EvalEventListenerAnnotationThread
- 拉取 kafka 消息
断点4 EvalEventListenerAnnotationThread 动态代理执行方法