在 DDD 中优雅的发送 Kafka 消息

发布于:2024-12-19 ⋅ 阅读:(12) ⋅ 点赞:(0)

前言
1:host 映射
在这里插入图片描述
下载 SwitchHost 配置一个映射地址。点击 + 添加一个本地环境,之后配置你的 IP kafka 这样就能找这个地址了。IP 为你本地的IP,如果是云服务器就是公网IP地址
使用docker-compose.yml进行一键部署安装

version: '3.0'
# docker-compose -f docker-compose.yml up -d
services:
  zookeeper:
    image: zookeeper:3.9.0
    container_name: zookeeper
    restart: always
    ports:
      - "2181:2181"
    environment:
      ZOO_MY_ID: 1
      ZOO_SERVERS: server.1=zookeeper:2888:3888;2181
      ZOOKEEPER_CLIENT_PORT: 2181
      ALLOW_ANONYMOUS_LOGIN: yes
      TZ: Asia/Shanghai
    networks:
      - my-network

  kafka:
    image: bitnami/kafka:3.7.0
    container_name: kafka
    volumes:
      - /etc/localtime:/etc/localtime
    ports:
      - "9092:9092"
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_CFG_LISTENERS: PLAINTEXT://:9092
      KAFKA_CFG_ADVERTISED_LISTENERS: PLAINTEXT://kafka:9092
      KAFKA_CFG_ZOOKEEPER_CONNECT: zookeeper:2181
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
      ALLOW_PLAINTEXT_LISTENER: yes
      KAFKA_MESSAGE_MAX_BYTES: "2000000"
      KAFKA_ENABLE_KRAFT: no
      JMX_PORT: 9999
      TZ: Asia/Shanghai
    depends_on:
      - zookeeper
    networks:
      - my-network

  kafka-eagle:
    image: echo21bash/kafka-eagle:3.0.2
    container_name: kafka-eagle
    environment:
      KAFKA_EAGLE_ZK_LIST: zookeeper:2181
    volumes:
      - ./kafka-eagle/system-config.properties:/opt/kafka-eagle/conf/system-config.properties
    ports:
      - "8048:8048"
    depends_on:
      - kafka
    networks:
      - my-network

networks:
  my-network:
    driver: bridge

脚本在代码中提供了完整的语句
消息流程
在这里插入图片描述
代码结构
在这里插入图片描述
1:domain 是领域层,提供一个个领域服务。如果一个工程有多个领域,则有不同的 a、b、c 领域包,每个包下有一套【event、model、repository、service】。
2:在领域层定义的 event 事件,里面涵盖了事件消息。而这个事件消息可以让 UserRepository 继承实现。最终完成消息发送。
3:最后是 trigger 触发器层,所有的 http、rpc、job、mq 都是一种触发行为。通过触发器的 listener 监听,来接收 mq 消息。
环境配置

spring:
  kafka:
    bootstrap-servers: localhost:9092
    producer:
      # 发生错误后,消息重发的次数。
      retries: 1
      #当有多个消息需要被发送到同一个分区时,生产者会把它们放在同一个批次里。该参数指定了一个批次可以使用的内存大小,按照字节数计算。
      batch-size: 16384
      # 设置生产者内存缓冲区的大小。
      buffer-memory: 33554432
      # 键的序列化方式
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      # 值的序列化方式
      value-serializer: org.apache.kafka.common.serialization.StringSerializer
      # acks=0 : 生产者在成功写入消息之前不会等待任何来自服务器的响应。
      # acks=1 : 只要集群的首领节点收到消息,生产者就会收到一个来自服务器成功响应。
      # acks=all :只有当所有参与复制的节点全部收到消息时,生产者才会收到一个来自服务器的成功响应。
      acks: 1
      
...

# 配置主题
kafka:
  topic:
    group: xmg-group
    user: xmg-topic

配置发送事件

@Slf4j
@Component
public class EventPublisher {

    @Resource
    private KafkaTemplate<String, String> kafkaTemplate;

    public void publish(String topic, BaseEvent.EventMessage<?> eventMessage) {
        try {
            String messageJson = JSON.toJSONString(eventMessage);
            kafkaTemplate.send(topic, messageJson);
            log.info("发送MQ消息 topic:{} message:{}", topic, messageJson);
        } catch (Exception e) {
            log.error("发送MQ消息失败 topic:{} message:{}", topic, JSON.toJSONString(eventMessage), e);
            throw e;
        }
    }

}

事件消息定义

public class UserMessageEvent extends BaseEvent<UserMessageEvent.UserMessage> {

    @Value("${kafka.topic.user}")
    private String topic;

    @Override
    public EventMessage<UserMessage> buildEventMessage(UserMessage data) {
        return EventMessage.<UserMessage>builder()
                .id(RandomStringUtils.randomNumeric(11))
                .timestamp(new Date())
                .data(data)
                .build();
    }

    @Override
    public String topic() {
        return topic;
    }

    /**
     * 要推送的事件消息,聚合到当前类下。
     */
    @Data
    @Builder
    @AllArgsConstructor
    @NoArgsConstructor
    public static class UserMessage {
        private String userId;
        private String userName;
        private String userType;
    }

}

事件消息发送

@Service
public class UserRepository extends UserMessageEvent implements IUserRepository {

    @Resource
    private EventPublisher publisher;

    @Override
    public void doSaveUser(UserEntity userEntity) {
        // 推送消息
        publisher.publish(this.topic(), this.buildEventMessage(UserMessageEvent.UserMessage.builder()
                .userId(userEntity.getUserId())
                .userName(userEntity.getUserName())
                .userType(userEntity.getUserTypeVO().getDesc())
                .build()));
    }
    
}

事件消息监听

@Slf4j
@Component
public class KafkaMessageListener {

    @KafkaListener(topics = "${kafka.topic.user}", groupId = "${kafka.topic.group}", concurrency = "1")
    public void topic_test(ConsumerRecord<?, ?> record, Acknowledgment ack, @Header(KafkaHeaders.RECEIVED_TOPIC) String topic) {
        Optional<?> message = Optional.ofNullable(record.value());
        if (message.isPresent()) {
            Object msg = message.get();
            try {
                // 逻辑处理

                // 确认消息消费完成,如果抛异常消息会进入重试
                ack.acknowledge();
                log.info("Kafka消费成功! Topic:" + topic + ",Message:" + msg);
            } catch (Exception e) {
                e.printStackTrace();
                log.error("Kafka消费失败!Topic:" + topic + ",Message:" + msg, e);
            }
        }
    }

}

测试验证

@Slf4j
@RunWith(SpringRunner.class)
@SpringBootTest
public class UserServiceTest {

    @Resource
    private IUserService userService;

    @Test
    public void test_register() throws InterruptedException {
        while (true) {
            UserEntity userEntity = new UserEntity();
            userEntity.setUserId("10001");
            userEntity.setUserName("小明哥");
            userEntity.setUserTypeVO(UserTypeVO.T8);

            userService.register(userEntity);
            Thread.sleep(1500);
        }

    }

}

好了 至此 在 DDD 中优雅的发送 Kafka 消息 学习结束了 友友们 点点关注不迷路 老铁们!!!!!


网站公告

今日签到

点亮在社区的每一天
去签到