夯实 kafka 系列|第六章:自定义注解 @EvalEventListener 开发

发布于:2025-03-28 ⋅ 阅读:(32) ⋅ 点赞:(0)

夯实 kafka 系列|第六章:自定义注解 @EvalEventListener 开发

1.前言

在文章 《夯实 kafka 系列|第五章:基于 kafka 分布式事件框架 eval-event》 留了一个功能未实现

  • @EvalEventListener 监听分布式事件

本文来讨论这个自定义注解如何开发。

源码已更新到 github

2.需求

通过 @EvalEventListener 监听到分布式事件 MyEvent(自定义事件)

@EvalEventListener 
public void onEvalEvent(MyEvent event){
    ...
}

3.思路

3.1 @kafkaListener 注解的实现

我们先看看 kafka 官方如何实现 @kafkaListener,然后参照这个来进行实现。

3.1.1 KafkaListenerAnnotationBeanPostProcessor(重点)
  • 作用:在 Spring 容器初始化时扫描所有 Bean,识别带有 @KafkaListener 注解的方法。
  • 流程
    1. 遍历所有 Bean 的类方法。
    2. 通过反射检查方法是否标注 @KafkaListener
    3. 将符合条件的监听方法封装为 MethodKafkaListenerEndpoint
3.1.2 监听器端点注册
  • 将解析后的端点信息注册到 KafkaListenerEndpointRegistry(管理所有监听容器的中央注册表)。
3.1.3 监听器容器创建
  • 容器工厂 ConcurrentKafkaListenerContainerFactory
  • 作用:根据 @KafkaListener 的配置创建 ConcurrentMessageListenerContainer
3.1.4 动态代理生成(重点)
  • 对带有 @KafkaListener 的方法生成动态代理,使其具备消息处理能力。
3.1.5 总结

一共分为三步

  • 在 Bean 的初始化后生命周期阶段,找到所有标注 @KafkaListener 的方法和类
  • 整理好所有需要的元数据信息(Bean Method topic 等等),用于动态代理+消费 kafka 消息
  • 动态代理,执行类的方法,即标注 @KafkaListener 的方法

3.2 参考实现

我们大致实现流程如下:

  • EvalEventListenerAnnotationBeanPostProcessor 实现 BeanPostProcessor

  • 遍历所有 Bean 的类方法,通过反射检查方法是否标注 @EvalEventListener

  • 找到 topic 和 class 映射关系

  • 启动线程,拉取 topic 消息

  • 找到 @EvalEventListener 对应的 class,以及 method

  • 反射执行目标类的 method 方法

4.开发

3.1 @EvalEventListener 注解

package com.csdn.event.kafka.annotation;

import java.lang.annotation.*;

@Target({ElementType.METHOD})
@Retention(RetentionPolicy.RUNTIME)
@Documented
public @interface EvalEventListener {
    /**
     * 指定监听的事件类型(默认根据方法参数推断)
     */
    Class<?>[] eventType() default {};
}

3.2 EvalEventAnnotationDefinition 注解定义

用于存储 Bean Method topic 等等信息

public class EvalEventAnnotationDefinition {
    private String topic;

    // 事件的目标类
    private Class<?> targetClass;

    // 事件的类型
    private Class<?> eventClass;

    private Method method;
    ...
}

3.3 EvalEventListenerAnnotationBeanPostProcessor 初始化后处理器

  • 找到标注 @EvalEventListener 类和方法
  • 循环执行 processEvalEventListener 方法
    • 封装 EvalEventAnnotationDefinition 信息
    • 启动 EvalEventListenerAnnotationThread 线程
/**
 * This class processes beans after their initialization to find methods annotated with
 * {@link EvalEventListener} and register them as event listeners.
 */
@Component
public class EvalEventListenerAnnotationBeanPostProcessor<T extends EvalEvent> implements BeanPostProcessor, ApplicationContextAware {

    ...

    @Override
    public Object postProcessAfterInitialization(final Object bean, final String beanName) throws BeansException {
        Class<?> targetClass = AopProxyUtils.ultimateTargetClass(bean);
        Map<Method, Set<EvalEventListener>> annotatedMethods = MethodIntrospector.selectMethods(targetClass,
                (MethodIntrospector.MetadataLookup<Set<EvalEventListener>>) method -> {
                    Set<EvalEventListener> listenerMethods = findListenerAnnotations(method);
                    return (!listenerMethods.isEmpty() ? listenerMethods : null);
                });
        if (annotatedMethods.isEmpty()) {
            // No methods found
            logger.debug("No @EvalEventListener annotations found on bean '{}'", beanName);
        } else {
            // Non-empty set of methods
            for (Map.Entry<Method, Set<EvalEventListener>> entry : annotatedMethods.entrySet()) {
                Method method = entry.getKey();
                for (EvalEventListener listener : entry.getValue()) {
                    processEvalEventListener(listener, method, bean);
                }
            }
            logger.debug(" {} @KafkaListener methods processed on bean '{}'"
                    , annotatedMethods.size(), beanName + ": " + annotatedMethods);
        }
        return bean;
    }

    private void processEvalEventListener(EvalEventListener listener, Method method, Object bean) {
        // topic
        Class<?>[] eventTypeArr = listener.eventType();
        Class<?> eventType = eventTypeArr.length > 0 ? eventTypeArr[0] : null;
        if (eventType == null) {
            // 从方法参数中获取
            eventType = method.getParameterTypes()[0];
        }
        if (eventType == null) {
            throw new IllegalStateException("Event type not specified for @EvalEventListener on method: " + method);
        }
        try {
            Object o = eventType.newInstance();
            if (!(o instanceof EvalEvent)) {
                throw new IllegalStateException("Event type must be a subclass of EvalEvent: " + eventType);
            }
            EvalEvent event = (EvalEvent) o;
            EvalEventAnnotationDefinition evalEventDefinition = new EvalEventAnnotationDefinition(event.getTopic(), bean.getClass(), eventType, method);
            EvalEventListenerAnnotationThread<T> evalEventListenerThread =
                    new EvalEventListenerAnnotationThread<>(evalEventDefinition, eventKafkaConsumerFactory, applicationContext);
            evalEventListenerThread.start();
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }
...
}

3.4 EvalEventListenerAnnotationThread 消费逻辑

  • 创建KafkaConsumer
  • 拉取 kafka 消息
  • 动态代理执行目标类的方法
  • ack 提交
public class EvalEventListenerAnnotationThread<T extends EvalEvent> extends Thread {

 ...

    @Override
    public void run() {
        // 1. 创建KafkaConsumer
        KafkaConsumer<String, ?> consumer;
        try {
            consumer = eventKafkaConsumerFactory.buildKafkaConsumer(evalEventAnnotationDefinition.getTargetClass());
            List<String> topicList = new ArrayList<>();
            topicList.add(evalEventAnnotationDefinition.getTopic());
            consumer.subscribe(topicList);
        } catch (Exception e) {
            log.error("KafkaConsumer构造失败", e);
            e.printStackTrace();
            return;
        }
        // 2. 消费消息
        try {
            while (true) {
                try {
                    // 3. 拉取消息
                    ConsumerRecords<String, ?> records = consumer.poll(
                            Duration.ofMillis(500));
                    if (records.isEmpty()) {
                        continue;
                    }
                    // 4. 处理消息
                    dispatch(records);
                    // 5. 使用异步提交规避阻塞
                    consumer.commitAsync();
                } catch (Exception e) {
                    log.error("消息处理异常", e);
                }
            }
        } finally {
            try {
                // 6.最后一次提交使用同步阻塞式提交
                consumer.commitSync();
            } finally {
                consumer.close();
            }
        }
    }

    private void dispatch(ConsumerRecords<String, ?> records) {
        for (ConsumerRecord<String, ?> record : records) {
            try {
                Object data = record.value();
                if (data == null) {
                    log.warn("接收到空消息记录,跳过处理");
                    continue;
                }

                // 转换事件数据为指定类型
                T event = (T) ConvertUtil.convertEvent(data, evalEventAnnotationDefinition.getEventClass());
                if (event == null) {
                    log.error("事件数据转换失败,跳过处理");
                    continue;
                }

                Class<?> targetClass = evalEventAnnotationDefinition.getTargetClass();
                Method method = evalEventAnnotationDefinition.getMethod();

                if (targetClass == null || method == null) {
                    log.error("目标类或方法为空,无法处理事件");
                    continue;
                }

                // 从 Spring 容器获取 bean 实例
                Object target;
                try {
                    // 先尝试从 Spring 容器获取
                    target = applicationContext.getBean(targetClass);
                } catch (NoSuchBeanDefinitionException e) {
                    // 如果容器中没有,尝试创建新实例
                    log.warn("Spring 容器中未找到 {} 的实例,将创建新实例", targetClass.getSimpleName());
                    target = targetClass.getDeclaredConstructor().newInstance();
                    // 可选:如果需要依赖注入,可以在这里自动装配
                    autowireBean(target);
                }
                method.setAccessible(true);
                // 调用目标方法
                method.invoke(target, event);

            } catch (InstantiationException | IllegalAccessException e) {
                log.error("创建目标类实例失败: {}", e.getMessage(), e);
            } catch (NoSuchMethodException e) {
                log.error("找不到目标类的无参构造方法: {}", e.getMessage(), e);
            } catch (InvocationTargetException e) {
                log.error("方法调用失败: {}", e.getTargetException().getMessage(), e.getTargetException());
            } catch (Exception e) {
                log.error("事件处理失败: {}", e.getMessage(), e);
            }
        }
    }

 ...

}

5.测试

5.1 测试代码

@Component
public class UserAnnotationEventListener  {
    @EvalEventListener
    public void onEvent(UserCreatedEvent event) {
        System.out.println("Received event: " + JSONObject.toJSONString(event));
    }
}

5.2 源码调试

断点1 EvalEventListenerAnnotationBeanPostProcessor

  • 扫描到 UserAnnotationEventListener 以及对应的方法

请添加图片描述

断点2

  • 封装 EvalEventAnnotationDefinition

请添加图片描述

断点3 EvalEventListenerAnnotationThread

  • 拉取 kafka 消息

请添加图片描述

断点4 EvalEventListenerAnnotationThread 动态代理执行方法

请添加图片描述

5.3 测试结果

请添加图片描述