告别数据不一致!Spring Boot+Canal+RocketMQ实现精准的MySQL变更监听

发布于:2025-07-08 ⋅ 阅读:(11) ⋅ 点赞:(0)

下面我将提供一个完整的 Spring Boot 项目,集成 Canal 和 RocketMQ 实现 MySQL binlog 监听和异步处理。

项目结构

src/
├── main/
│   ├── java/
│   │   └── com/
│   │       └── example/
│   │           └── canalrocketmqdemo/
│   │               ├── CanalClientRunner.java        // Canal 客户端
│   │               ├── BinlogEventProducer.java      // RocketMQ 生产者
│   │               ├── BinlogEventConsumer.java      // RocketMQ 消费者
│   │               ├── BinlogEvent.java              // 事件模型
│   │               ├── config/
│   │               │   └── RocketMQConfig.java       // RocketMQ 配置
│   │               └── Application.java              // 主应用
│   └── resources/
│       ├── application.yml                           // 配置文件
│       └── rocketmq.properties                       // RocketMQ 属性文件

完整代码实现

1. 添加依赖 (pom.xml)

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>com.example</groupId>
    <artifactId>canal-rocketmq-demo</artifactId>
    <version>1.0.0</version>
    <packaging>jar</packaging>

    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>3.2.4</version>
        <relativePath/>
    </parent>

    <properties>
        <java.version>17</java.version>
        <!--
            Version of rocketmq-spring-boot-starter, which is released on its own 2.x line and is
            NOT numbered like the RocketMQ broker/client (5.x). The previous value 5.1.4 does not
            exist for this artifact. NOTE(review): 2.3.0 is expected to be the first release that
            works with Spring Boot 3 auto-configuration; confirm before upgrading or downgrading.
        -->
        <rocketmq-spring.version>2.3.0</rocketmq-spring.version>
        <canal.version>1.1.7</canal.version>
        <lombok.version>1.18.30</lombok.version>
    </properties>

    <dependencies>
        <!-- Spring Boot Starter -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
        
        <!-- RocketMQ (Spring Boot starter; pulls in the matching RocketMQ client transitively) -->
        <dependency>
            <groupId>org.apache.rocketmq</groupId>
            <artifactId>rocketmq-spring-boot-starter</artifactId>
            <version>${rocketmq-spring.version}</version>
        </dependency>
        
        <!-- Canal -->
        <dependency>
            <groupId>com.alibaba.otter</groupId>
            <artifactId>canal.client</artifactId>
            <version>${canal.version}</version>
        </dependency>
        
        <!-- Lombok -->
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <version>${lombok.version}</version>
            <scope>provided</scope>
        </dependency>
        
        <!-- JSON -->
        <dependency>
            <groupId>com.fasterxml.jackson.core</groupId>
            <artifactId>jackson-databind</artifactId>
        </dependency>
        
        <!-- Connection pool -->
        <dependency>
            <groupId>org.apache.commons</groupId>
            <artifactId>commons-pool2</artifactId>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
            </plugin>
        </plugins>
    </build>
</project>

2. 配置文件 (application.yml)

server:
  port: 8080

spring:
  application:
    name: canal-rocketmq-demo

# Canal client settings (read by CanalClientRunner)
canal:
  server: 127.0.0.1:11111
  destination: example
  batch-size: 1000
  username: ""
  password: ""
  retry:
    max-attempts: 5
    backoff: 5000

# RocketMQ settings
rocketmq:
  name-server: 127.0.0.1:9876
  # Topic used by BinlogEventProducer and BinlogEventConsumer via ${rocketmq.topic.binlog}.
  # It must match the topic created with `mqadmin updateTopic -t binlog-events`.
  topic:
    binlog: binlog-events
  producer:
    group: canal-producer-group
    send-message-timeout: 3000
    retry-times-when-send-failed: 3
  consumer:
    group: canal-consumer-group
    consume-timeout: 30000

3. RocketMQ 配置类 (RocketMQConfig.java)

package com.example.canalrocketmqdemo.config;

import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.apache.rocketmq.spring.support.RocketMQMessageConverter;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.messaging.converter.CompositeMessageConverter;
import org.springframework.messaging.converter.MappingJackson2MessageConverter;
import org.springframework.messaging.converter.MessageConverter;
import org.springframework.messaging.converter.StringMessageConverter;

import java.util.List;

/**
 * RocketMQ customisation for the demo.
 *
 * <p>Only the message converter is customised here. The {@code RocketMQTemplate} itself is left to
 * the starter's auto-configuration, which builds it from {@code rocketmq.name-server} and
 * {@code rocketmq.producer.*}. The previous hand-written template bean called a
 * {@code setNameServer} method that {@code RocketMQTemplate} does not offer and never attached a
 * producer, so it could not send anything.
 */
@Configuration
public class RocketMQConfig {

    /**
     * Builds the converter used for RocketMQ payloads and makes its Jackson delegate serialise
     * payloads as {@link String}.
     *
     * @return the customised converter
     */
    @Bean
    public RocketMQMessageConverter rocketMQMessageConverter() {
        RocketMQMessageConverter converter = new RocketMQMessageConverter();

        // Check the type instead of casting blindly, so a differently shaped converter is
        // returned untouched rather than failing context start-up with a ClassCastException.
        if (converter.getMessageConverter() instanceof CompositeMessageConverter composite) {
            List<MessageConverter> delegates = composite.getConverters();
            for (MessageConverter delegate : delegates) {
                if (delegate instanceof MappingJackson2MessageConverter jackson) {
                    jackson.setSerializedPayloadClass(String.class);
                }
            }
        }
        return converter;
    }
}

4. Binlog 事件模型 (BinlogEvent.java)

package com.example.canalrocketmqdemo;

import lombok.Data;

import java.util.List;
import java.util.Map;

/**
 * One row-level change captured from the MySQL binlog, as carried over RocketMQ.
 */
@Data
public class BinlogEvent {

    /** Kind of row change the event describes. */
    public enum EventType {
        INSERT,
        UPDATE,
        DELETE
    }

    /** Schema the change happened in. */
    private String database;

    /** Table the change happened in. */
    private String table;

    /** Kind of change. */
    private EventType eventType;

    /** Execution time taken from the binlog entry header. */
    private Long executeTime;

    /** Column data before a DELETE/UPDATE. */
    private List<Map<String, String>> before;

    /** Column data after an INSERT/UPDATE. */
    private List<Map<String, String>> after;

    /**
     * Returns an identifier intended for idempotent processing.
     *
     * @return {@code database:table:executeTime}; unset parts are rendered as {@code "null"}
     */
    public String getUniqueId() {
        return String.join(":", database, table, String.valueOf(executeTime));
    }
}

5. RocketMQ 生产者 (BinlogEventProducer.java)

package com.example.canalrocketmqdemo;

import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.stereotype.Service;

/**
 * Publishes {@link BinlogEvent}s to RocketMQ.
 */
@Service
public class BinlogEventProducer {

    private final RocketMQTemplate rocketMQTemplate;

    // Falls back to the topic created in the deployment steps, so a missing property no longer
    // aborts start-up with an unresolved-placeholder error.
    @Value("${rocketmq.topic.binlog:binlog-events}")
    private String binlogTopic;

    /**
     * @param rocketMQTemplate template used for all sends
     */
    @Autowired
    public BinlogEventProducer(RocketMQTemplate rocketMQTemplate) {
        this.rocketMQTemplate = rocketMQTemplate;
    }

    /**
     * Synchronously sends one event; its type is also exposed as the {@code EVENT_TYPE} header.
     *
     * @param event event to publish; the event and its type must be non-null
     * @throws IllegalArgumentException if {@code event} or its type is {@code null}
     * @throws RuntimeException if the send fails; the original failure is kept as the cause
     */
    public void sendEvent(BinlogEvent event) {
        // Reject bad input up front so it is not reported as a broker failure.
        if (event == null || event.getEventType() == null) {
            throw new IllegalArgumentException("binlog事件及其类型不能为空");
        }

        try {
            rocketMQTemplate.syncSend(binlogTopic,
                MessageBuilder.withPayload(event)
                    .setHeader("EVENT_TYPE", event.getEventType().name())
                    .build());
        } catch (Exception e) {
            // Propagate so the caller can decide whether to retry or give up.
            throw new RuntimeException("发送消息到RocketMQ失败", e);
        }
    }
}

6. Canal 客户端 (CanalClientRunner.java)

package com.example.canalrocketmqdemo;

import com.alibaba.otter.canal.client.CanalConnector;
import com.alibaba.otter.canal.client.CanalConnectors;
import com.alibaba.otter.canal.protocol.CanalEntry.*;
import com.alibaba.otter.canal.protocol.Message;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.CommandLineRunner;
import org.springframework.stereotype.Component;

import java.net.InetSocketAddress;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

/**
 * Polls a Canal server for binlog entries and forwards every row change to RocketMQ.
 *
 * <p>Delivery is at-least-once: a batch is acknowledged to Canal only after every row in it has
 * been handed to the producer. If a send fails, the batch is left unacknowledged, the connection
 * is re-established and rolled back, and the batch is fetched again. Rows already sent from that
 * batch are therefore sent twice, so consumers must be idempotent.
 *
 * <p>Logging uses the JDK's {@link System.Logger} so this class needs no additional imports.
 */
@Component
public class CanalClientRunner implements CommandLineRunner {

    private static final System.Logger LOG = System.getLogger(CanalClientRunner.class.getName());

    /** Pause between polls when Canal returned no data. */
    private static final long IDLE_SLEEP_MILLIS = 1000L;

    /** Longest time shutdown waits for the worker thread to finish. */
    private static final long SHUTDOWN_JOIN_MILLIS = 5000L;

    @Value("${canal.server}")
    private String canalServer;

    @Value("${canal.destination}")
    private String destination;

    @Value("${canal.batch-size}")
    private int batchSize;

    @Value("${canal.username}")
    private String username;

    @Value("${canal.password}")
    private String password;

    @Value("${canal.retry.max-attempts}")
    private int maxRetryAttempts;

    @Value("${canal.retry.backoff}")
    private long retryBackoff;

    @Autowired
    private BinlogEventProducer eventProducer;

    // Host and port parsed once from canal.server, reused whenever a connector is rebuilt.
    private String canalHost;
    private int canalPort;

    // Written by the worker on reconnect and read by the shutdown hook, hence volatile.
    private volatile CanalConnector connector;
    private volatile boolean running = true;
    private volatile Thread worker;

    /**
     * Connects to Canal, starts the polling thread and registers a JVM shutdown hook.
     *
     * @param args ignored
     * @throws RuntimeException if the initial connection cannot be established
     */
    @Override
    public void run(String... args) {
        initConnector();

        Thread pollingThread = new Thread(this::process, "canal-client-worker");
        worker = pollingThread;
        pollingThread.start();

        Runtime.getRuntime().addShutdownHook(new Thread(this::shutdown, "canal-client-shutdown"));
    }

    /** Parses {@code canal.server} ({@code host:port}), builds the connector and connects. */
    private void initConnector() {
        int separator = canalServer == null ? -1 : canalServer.lastIndexOf(':');
        if (separator <= 0 || separator == canalServer.length() - 1) {
            throw new IllegalStateException("canal.server 必须为 host:port 格式: " + canalServer);
        }
        canalHost = canalServer.substring(0, separator).trim();
        try {
            canalPort = Integer.parseInt(canalServer.substring(separator + 1).trim());
        } catch (NumberFormatException e) {
            throw new IllegalStateException("canal.server 端口无效: " + canalServer, e);
        }

        connector = newConnector();
        connectWithRetry();
    }

    /** Creates a fresh, not-yet-connected connector for the configured server. */
    private CanalConnector newConnector() {
        return CanalConnectors.newSingleConnector(
            new InetSocketAddress(canalHost, canalPort),
            destination,
            username,
            password
        );
    }

    /**
     * Connects, subscribes to all tables and rolls back to the last acknowledged position.
     * At least one attempt is always made, even if {@code canal.retry.max-attempts} is zero or
     * negative.
     *
     * @throws RuntimeException if every attempt fails
     * @throws IllegalStateException if the thread is interrupted while waiting between attempts
     */
    private void connectWithRetry() {
        int maxAttempts = Math.max(1, maxRetryAttempts);
        for (int attempt = 1; ; attempt++) {
            try {
                connector.connect();
                connector.subscribe(".*\\..*");
                connector.rollback();
                return;
            } catch (Exception e) {
                // Start the next attempt from a fresh connector so that no state from the failed
                // attempt is reused.
                disconnectQuietly();
                connector = newConnector();

                if (attempt >= maxAttempts) {
                    throw new RuntimeException("连接Canal失败,达到最大重试次数", e);
                }
                LOG.log(System.Logger.Level.WARNING,
                    "连接Canal失败,准备重试 (" + attempt + "/" + maxAttempts + "): " + e.getMessage());
                try {
                    Thread.sleep(retryBackoff);
                } catch (InterruptedException ex) {
                    // Stop retrying on interrupt instead of spinning through the remaining attempts.
                    Thread.currentThread().interrupt();
                    throw new IllegalStateException("连接Canal重试被中断", ex);
                }
            }
        }
    }

    /** Polling loop; runs until {@link #shutdown()} is called or the thread is interrupted. */
    private void process() {
        while (running && !Thread.currentThread().isInterrupted()) {
            try {
                Message message = connector.getWithoutAck(batchSize);
                long batchId = message.getId();
                List<Entry> entries = message.getEntries();

                if (batchId == -1 || entries.isEmpty()) {
                    Thread.sleep(IDLE_SLEEP_MILLIS);
                    continue;
                }

                // Acknowledge only after every row has been handed over to RocketMQ.
                processEntries(entries);
                connector.ack(batchId);
            } catch (InterruptedException e) {
                // Restore the flag; the loop condition then ends the thread.
                Thread.currentThread().interrupt();
            } catch (Exception e) {
                handleProcessingException(e);
            }
        }
    }

    /**
     * Converts each row-data entry into one {@link BinlogEvent} per changed row and sends it.
     *
     * <p>Entries that cannot be parsed are logged and skipped, since retrying cannot fix them.
     * Send failures propagate so the caller does not acknowledge the batch.
     */
    private void processEntries(List<Entry> entries) {
        for (Entry entry : entries) {
            if (entry.getEntryType() != EntryType.ROWDATA) {
                continue;
            }

            RowChange rowChange;
            try {
                rowChange = RowChange.parseFrom(entry.getStoreValue());
            } catch (Exception e) {
                LOG.log(System.Logger.Level.ERROR, "处理binlog条目失败: " + e.getMessage(), e);
                continue;
            }

            BinlogEvent.EventType type = mapEventType(rowChange.getEventType());
            if (type == null) {
                // Not an INSERT/UPDATE/DELETE (for example DDL); nothing to publish.
                continue;
            }

            Header header = entry.getHeader();
            for (RowData rowData : rowChange.getRowDatasList()) {
                // A fresh event per row, so one row's data can never leak into another's message.
                BinlogEvent event = new BinlogEvent();
                event.setDatabase(header.getSchemaName());
                event.setTable(header.getTableName());
                event.setExecuteTime(header.getExecuteTime());
                event.setEventType(type);

                if (type != BinlogEvent.EventType.INSERT) {
                    event.setBefore(convertColumns(rowData.getBeforeColumnsList()));
                }
                if (type != BinlogEvent.EventType.DELETE) {
                    event.setAfter(convertColumns(rowData.getAfterColumnsList()));
                }

                eventProducer.sendEvent(event);
            }
        }
    }

    /**
     * Flattens Canal columns into string maps with the keys {@code name}, {@code value},
     * {@code type}, {@code updated} and {@code isNull}. The {@code isNull} flag lets consumers
     * tell SQL NULL apart from an empty string.
     */
    private List<Map<String, String>> convertColumns(List<Column> columns) {
        return columns.stream()
                .map(col -> Map.of(
                    "name", col.getName(),
                    "value", col.getValue(),
                    "type", col.getMysqlType(),
                    "updated", String.valueOf(col.getUpdated()),
                    "isNull", String.valueOf(col.getIsNull())
                ))
                .collect(Collectors.toList());
    }

    /**
     * Maps a Canal event type to the model's type.
     *
     * @return the mapped type, or {@code null} for anything other than INSERT/UPDATE/DELETE
     */
    private BinlogEvent.EventType mapEventType(EventType eventType) {
        return switch (eventType) {
            case INSERT -> BinlogEvent.EventType.INSERT;
            case UPDATE -> BinlogEvent.EventType.UPDATE;
            case DELETE -> BinlogEvent.EventType.DELETE;
            default -> null;
        };
    }

    /**
     * Logs the failure, waits for the configured back-off and reconnects. Reconnecting rolls back,
     * so the unacknowledged batch is fetched again. A failed reconnect is logged rather than
     * thrown, so the worker thread stays alive and tries again on the next loop iteration.
     */
    private void handleProcessingException(Exception e) {
        LOG.log(System.Logger.Level.ERROR, "处理Canal消息异常: " + e.getMessage(), e);
        if (!running) {
            return;
        }
        try {
            Thread.sleep(retryBackoff);
            disconnectQuietly();
            connector = newConnector();
            connectWithRetry();
        } catch (InterruptedException ex) {
            Thread.currentThread().interrupt();
        } catch (RuntimeException ex) {
            LOG.log(System.Logger.Level.ERROR, "重新连接Canal失败,将在下一轮继续重试", ex);
        }
    }

    /** Disconnects the current connector, logging (not propagating) any failure. */
    private void disconnectQuietly() {
        CanalConnector current = connector;
        if (current == null) {
            return;
        }
        try {
            current.disconnect();
        } catch (Exception e) {
            LOG.log(System.Logger.Level.DEBUG, "断开Canal连接时出现异常: " + e.getMessage(), e);
        }
    }

    /**
     * Stops the polling loop, waits briefly for the worker to finish and then disconnects, so the
     * connector is not closed while the worker is still using it.
     */
    private void shutdown() {
        running = false;

        Thread pollingThread = worker;
        if (pollingThread != null && pollingThread != Thread.currentThread()) {
            // Wake the worker if it is sleeping between polls or backing off.
            pollingThread.interrupt();
            try {
                pollingThread.join(SHUTDOWN_JOIN_MILLIS);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }

        disconnectQuietly();
    }
}

7. RocketMQ 消费者 (BinlogEventConsumer.java)

package com.example.canalrocketmqdemo;

import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Service;

/**
 * Consumes {@link BinlogEvent}s from RocketMQ and dispatches them by event type.
 *
 * <p>Handler failures are rethrown after logging, so the listener container sees the failure and
 * can redeliver the message instead of treating it as consumed.
 */
@Service
@RocketMQMessageListener(
    // Same fallback topic as the producer, so a missing property does not leave the literal
    // placeholder text as the topic name.
    topic = "${rocketmq.topic.binlog:binlog-events}",
    consumerGroup = "${rocketmq.consumer.group}"
)
public class BinlogEventConsumer implements RocketMQListener<BinlogEvent> {

    private static final Logger logger = LoggerFactory.getLogger(BinlogEventConsumer.class);

    /**
     * Dispatches one event to the handler for its type.
     *
     * @param event the deserialised event; events without a type are logged and dropped
     * @throws RuntimeException whatever the handler threw, so the message can be retried
     */
    @Override
    public void onMessage(BinlogEvent event) {
        if (event == null || event.getEventType() == null) {
            // Redelivery cannot repair a malformed message, so drop it instead of retrying.
            logger.warn("忽略无法识别的binlog事件: {}", event);
            return;
        }

        try {
            switch (event.getEventType()) {
                case INSERT -> handleInsert(event);
                case UPDATE -> handleUpdate(event);
                case DELETE -> handleDelete(event);
            }
            logger.info("处理成功: {}.{} {}",
                event.getDatabase(), event.getTable(), event.getEventType());
        } catch (RuntimeException e) {
            logger.error("处理事件失败: {}", event, e);
            // Swallowing here would mark the message as consumed and lose the change.
            throw e;
        }
    }

    /** Handles an INSERT; the place for real work such as cache, search-index or warehouse sync. */
    private void handleInsert(BinlogEvent event) {
        logger.info("INSERT 事件: {}.{}", event.getDatabase(), event.getTable());
        logger.debug("新增数据: {}", event.getAfter());
    }

    /** Handles an UPDATE, logging the row before and after the change. */
    private void handleUpdate(BinlogEvent event) {
        logger.info("UPDATE 事件: {}.{}", event.getDatabase(), event.getTable());
        logger.debug("变更前: {}", event.getBefore());
        logger.debug("变更后: {}", event.getAfter());
    }

    /** Handles a DELETE, logging the removed row. */
    private void handleDelete(BinlogEvent event) {
        logger.info("DELETE 事件: {}.{}", event.getDatabase(), event.getTable());
        logger.debug("删除数据: {}", event.getBefore());
    }
}

8. 主应用类 (Application.java)

package com.example.canalrocketmqdemo;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

/**
 * Spring Boot entry point of the demo application.
 */
@SpringBootApplication
public class Application {
    /**
     * Starts the Spring application with this class as the primary source.
     *
     * @param args command-line arguments, passed through to Spring Boot
     */
    public static void main(String[] args) {
        SpringApplication.run(Application.class, args);
    }
}

环境准备与部署步骤

1. MySQL 配置

  1. 修改 MySQL 配置文件 (my.cnf 或 my.ini):

    [mysqld]
    log-bin=mysql-bin
    binlog-format=ROW
    server_id=1
    expire_logs_days=3
  2. 重启 MySQL 服务

  3. 创建 Canal 用户:

    CREATE USER 'canal'@'%' IDENTIFIED BY 'canal_password';
    GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canal'@'%';
    FLUSH PRIVILEGES;

2. Canal Server 部署

  1. 下载 Canal: https://github.com/alibaba/canal/releases

  2. 解压并修改配置:

    cd canal
    vi conf/example/instance.properties

    修改内容(properties 格式):

    canal.instance.master.address=127.0.0.1:3306
    canal.instance.dbUsername=canal
    canal.instance.dbPassword=canal_password
    canal.instance.filter.regex=.*\\..*
  3. 启动 Canal:

    sh bin/startup.sh  # Linux
    bin/startup.bat    # Windows

3. RocketMQ 部署

  1. 下载 RocketMQ: https://rocketmq.apache.org/download

  2. 启动 NameServer:

    nohup sh bin/mqnamesrv &
  3. 启动 Broker:

    nohup sh bin/mqbroker -n localhost:9876 &
  4. 创建 Topic:

    sh bin/mqadmin updateTopic -n localhost:9876 -t binlog-events -b localhost:10911

4. 启动 Spring Boot 应用

mvn spring-boot:run

5. 测试验证

  1. 在 MySQL 中执行操作(以下示例假设已创建并选中数据库 test_db,即先执行 CREATE DATABASE test_db; USE test_db;):

    -- 创建测试表
    CREATE TABLE test_table (
      id INT AUTO_INCREMENT PRIMARY KEY,
      name VARCHAR(50),
      email VARCHAR(100)
    );
    
    -- 执行DML操作
    INSERT INTO test_table (name, email) VALUES ('John', 'john@example.com');
    UPDATE test_table SET email = 'john.doe@example.com' WHERE id = 1;
    DELETE FROM test_table WHERE id = 1;
  2. 查看应用日志:

    INSERT 事件: test_db.test_table
    UPDATE 事件: test_db.test_table
    DELETE 事件: test_db.test_table
  3. 使用 RocketMQ 控制台查看消息:

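    http://localhost:8080(需单独部署 RocketMQ Dashboard,8080 为其默认端口;9876 是 NameServer 端口,不提供 Web 页面。Dashboard 默认端口与本应用的 server.port 冲突,需修改其中之一)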

生产环境优化建议

1. 性能优化

// Batch variant of processEntries for CanalClientRunner: collect the events of a whole Canal
// batch first, then hand them to the producer in a single call.
private void processEntries(List<Entry> entries) {
    List<BinlogEvent> events = new ArrayList<>();
    
    for (Entry entry : entries) {
        // ... parse the entry into `event` (elided in this snippet) ...
        events.add(event);
    }
    
    // Send the batch in one call; skipped when nothing was collected.
    if (!events.isEmpty()) {
        eventProducer.sendBatchEvents(events);
    }
}

/**
 * Batch send for BinlogEventProducer: publishes all events to the binlog topic in one call.
 *
 * <p>Each message carries the same {@code EVENT_TYPE} header that {@code sendEvent} sets, so
 * consumers see identical headers on both paths. The header is omitted for events without a type.
 *
 * @param events events to publish; {@code null} or an empty list is a no-op
 */
public void sendBatchEvents(List<BinlogEvent> events) {
    // NOTE(review): the template is expected to reject an empty collection, so return early
    // instead of relying on every caller to check — confirm against the starter in use.
    if (events == null || events.isEmpty()) {
        return;
    }

    List<Message<BinlogEvent>> messages = events.stream()
        .map(event -> MessageBuilder.withPayload(event)
            .setHeader("EVENT_TYPE",
                event.getEventType() == null ? null : event.getEventType().name())
            .build())
        .collect(Collectors.toList());

    rocketMQTemplate.syncSend(binlogTopic, messages);
}

2. 消息过滤

在 application.yml 中添加:

canal:
  filter:
    # Comma-separated regular expressions matched against "schema.table": no spaces, and the dot
    # between schema and table escaped. Single quotes keep the backslashes literal in YAML.
    # Replace test_db with the real schema name.
    tables: 'test_db\.user,test_db\.product,test_db\.order.*'

在 CanalClientRunner 中:

// Optional table filter; empty when canal.filter.tables is not configured.
@Value("${canal.filter.tables:}")
private String tableFilter;

// Subscribes with the configured filter, falling back to "all schemas, all tables".
// NOTE(review): in the full CanalClientRunner listing, connectWithRetry() also calls
// subscribe(".*\\..*"); the same filter presumably has to be applied there as well, otherwise
// that call would replace this subscription — TODO confirm.
private void initConnector() {
    // ...
    connector.subscribe(tableFilter.isEmpty() ? ".*\\..*" : tableFilter);
}

3. 消息轨迹和监控

/**
 * Ordered send with a metric: events of the same table share one hash key, so they are routed to
 * the same queue and keep their relative order.
 *
 * <p>The previous key was {@code event.getUniqueId()}, which contains the execution time. That
 * key differs between events of the same table, so it spread a table's changes across queues
 * and gave no ordering.
 * NOTE(review): end-to-end ordering presumably also needs the consumer to run in orderly mode
 * ({@code consumeMode = ConsumeMode.ORDERLY} on {@code @RocketMQMessageListener}) — confirm.
 *
 * @param event event to publish; its type must be non-null
 */
public void sendEvent(BinlogEvent event) {
    String orderingKey = event.getDatabase() + "." + event.getTable();

    rocketMQTemplate.syncSendOrderly(binlogTopic,
        MessageBuilder.withPayload(event)
            // Same header as the plain sendEvent, so consumers can rely on it on both paths.
            .setHeader("EVENT_TYPE", event.getEventType().name())
            .build(),
        orderingKey);

    // Count sent messages per table and event type.
    Metrics.counter("rocketmq.sent.messages",
        "table", event.getTable(),
        "type", event.getEventType().name())
        .increment();
}

4. 死信队列处理

// Dead-letter configuration, intended to be added to RocketMQConfig: registers a listener
// container factory with a retry limit and a dead-letter topic.
// NOTE(review): RocketMQListenerContainerFactory, DefaultRocketMQListenerContainerFactory and
// setDLQTopic(...) do not appear to be part of rocketmq-spring's public API — TODO confirm against
// the starter version in use. Presumably the supported route is `maxReconsumeTimes` on
// @RocketMQMessageListener, with the broker naming the dead-letter topic %DLQ%<consumerGroup>.
// NOTE(review): the "${...}" arguments below are plain string literals passed straight to the
// setters; nothing in this method resolves them as property placeholders.
@Bean
public RocketMQListenerContainerFactory rocketMQListenerContainerFactory() {
    DefaultRocketMQListenerContainerFactory factory = new DefaultRocketMQListenerContainerFactory();
    factory.setConsumerGroup("${rocketmq.consumer.group}");
    factory.setNameServerAddress("${rocketmq.name-server}");
    
    // Maximum number of redeliveries before a message is given up on
    factory.setMaxReconsumeTimes(3);
    
    // Topic that receives messages which exhausted their retries
    factory.setDLQTopic("DLQ_${rocketmq.topic.binlog}");
    
    return factory;
}

常见问题解决

1. Canal 连接问题

症状: 连接 Canal Server 失败
解决:

  • 检查 Canal Server 是否运行: netstat -an | grep 11111

  • 确认用户名/密码是否正确

  • 查看 Canal Server 日志: logs/canal/canal.log

2. RocketMQ 发送失败

症状: 消息发送超时或失败
解决:

  • 检查 NameServer 状态: mqadmin clusterList -n localhost:9876

  • 增加发送超时时间: rocketmq.producer.send-message-timeout=5000

  • 检查网络连接

3. 消息消费积压

症状: RocketMQ 控制台显示消息积压
解决:

  • 增加消费者实例数量

  • 调整批量消费大小

  • 优化消费者处理逻辑

4. 数据不一致问题

症状: 处理后的数据与源数据不一致
解决:

  • 添加幂等处理逻辑

  • 实现数据校对机制

  • 记录消息处理状态

总结

本教程完整实现了 Spring Boot 项目整合 Canal 和 RocketMQ 监听 MySQL binlog 的方案:

  1. Canal 客户端:负责监听 MySQL binlog 变更

  2. RocketMQ 生产者:将 binlog 事件发送到消息队列

  3. RocketMQ 消费者:异步处理变更事件

  4. 完整配置:包含 MySQL、Canal Server 和 RocketMQ 的配置

通过使用 RocketMQ 作为消息中间件,系统获得了以下优势:

  • 解耦:数据处理与数据捕获分离

  • 可靠性:消息持久化保证不丢失

  • 扩展性:消费者可水平扩展

  • 容错性:重试机制和死信队列

  • 顺序性:使用顺序发送(syncSendOrderly,以"库名.表名"作为 hashKey)并配合顺序消费时,相同表的数据变更保持顺序;默认的 syncSend 不保证顺序

生产环境部署时,请根据实际流量调整:

  • Canal 的 batch-size 参数

  • RocketMQ 的 Topic 队列数(读/写队列数,相当于其他消息系统中的分区数)

  • 消费者并发数

  • 消息存储时间

此方案适用于需要实时数据同步的场景,如缓存更新、搜索引擎同步、实时分析等。


网站公告

今日签到

点亮在社区的每一天
去签到