下面我将提供一个完整的 Spring Boot 项目,集成 Canal 和 RocketMQ 实现 MySQL binlog 监听和异步处理。
项目结构
src/ ├── main/ │ ├── java/ │ │ └── com/ │ │ └── example/ │ │ └── canalrocketmqdemo/ │ │ ├── CanalClientRunner.java // Canal 客户端 │ │ ├── BinlogEventProducer.java // RocketMQ 生产者 │ │ ├── BinlogEventConsumer.java // RocketMQ 消费者 │ │ ├── BinlogEvent.java // 事件模型 │ │ ├── config/ │ │ │ └── RocketMQConfig.java // RocketMQ 配置 │ │ └── Application.java // 主应用 │ └── resources/ │ ├── application.yml // 配置文件 │ └── rocketmq.properties // RocketMQ 属性文件
完整代码实现
1. 添加依赖 (pom.xml
)
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>com.example</groupId>
<artifactId>canal-rocketmq-demo</artifactId>
<version>1.0.0</version>
<packaging>jar</packaging>
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>3.2.4</version>
<relativePath/>
</parent>
<properties>
<java.version>17</java.version>
<rocketmq.version>5.1.4</rocketmq.version>
<canal.version>1.1.7</canal.version>
<lombok.version>1.18.30</lombok.version>
</properties>
<dependencies>
<!-- Spring Boot Starter -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<!-- RocketMQ -->
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-spring-boot-starter</artifactId>
<version>${rocketmq.version}</version>
</dependency>
<!-- Canal -->
<dependency>
<groupId>com.alibaba.otter</groupId>
<artifactId>canal.client</artifactId>
<version>${canal.version}</version>
</dependency>
<!-- Lombok -->
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<version>${lombok.version}</version>
<scope>provided</scope>
</dependency>
<!-- JSON -->
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-databind</artifactId>
</dependency>
<!-- 连接池 -->
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-pool2</artifactId>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
</plugin>
</plugins>
</build>
</project>
2. 配置文件 (application.yml
)
server:
port: 8080
spring:
application:
name: canal-rocketmq-demo
# Canal 配置
canal:
server: 127.0.0.1:11111
destination: example
batch-size: 1000
username: ""
password: ""
retry:
max-attempts: 5
backoff: 5000
# RocketMQ 配置
rocketmq:
name-server: 127.0.0.1:9876
producer:
group: canal-producer-group
send-message-timeout: 3000
retry-times-when-send-failed: 3
consumer:
group: canal-consumer-group
consume-timeout: 30000
3. RocketMQ 配置类 (RocketMQConfig.java
)
package com.example.canalrocketmqdemo.config;
import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.apache.rocketmq.spring.support.RocketMQMessageConverter;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.messaging.converter.CompositeMessageConverter;
import org.springframework.messaging.converter.MappingJackson2MessageConverter;
import org.springframework.messaging.converter.MessageConverter;
import org.springframework.messaging.converter.StringMessageConverter;
import java.util.List;
@Configuration
public class RocketMQConfig {
@Value("${rocketmq.name-server}")
private String nameServer;
@Bean
public RocketMQTemplate rocketMQTemplate() {
RocketMQTemplate template = new RocketMQTemplate();
template.setNameServer(nameServer);
return template;
}
@Bean
public RocketMQMessageConverter rocketMQMessageConverter() {
RocketMQMessageConverter converter = new RocketMQMessageConverter();
CompositeMessageConverter compositeConverter = (CompositeMessageConverter) converter.getMessageConverter();
List<MessageConverter> converterList = compositeConverter.getConverters();
for (MessageConverter messageConverter : converterList) {
if (messageConverter instanceof MappingJackson2MessageConverter) {
((MappingJackson2MessageConverter) messageConverter).setSerializedPayloadClass(String.class);
}
}
return converter;
}
}
4. Binlog 事件模型 (BinlogEvent.java
)
package com.example.canalrocketmqdemo;
import lombok.Data;
import java.util.List;
import java.util.Map;
@Data
public class BinlogEvent {
public enum EventType { INSERT, UPDATE, DELETE }
private String database;
private String table;
private EventType eventType;
private Long executeTime;
private List<Map<String, String>> before; // DELETE/UPDATE 前的数据
private List<Map<String, String>> after; // INSERT/UPDATE 后的数据
// 获取唯一标识(用于幂等处理)
public String getUniqueId() {
return database + ":" + table + ":" + executeTime;
}
}
5. RocketMQ 生产者 (BinlogEventProducer.java
)
package com.example.canalrocketmqdemo;
import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.stereotype.Service;
@Service
public class BinlogEventProducer {
private final RocketMQTemplate rocketMQTemplate;
@Value("${rocketmq.producer.group}")
private String producerGroup;
@Value("${rocketmq.topic.binlog}")
private String binlogTopic;
@Autowired
public BinlogEventProducer(RocketMQTemplate rocketMQTemplate) {
this.rocketMQTemplate = rocketMQTemplate;
}
public void sendEvent(BinlogEvent event) {
try {
rocketMQTemplate.syncSend(binlogTopic,
MessageBuilder.withPayload(event)
.setHeader("EVENT_TYPE", event.getEventType().name())
.build());
// 可添加日志记录发送情况
} catch (Exception e) {
// 添加重试或错误处理逻辑
throw new RuntimeException("发送消息到RocketMQ失败", e);
}
}
}
6. Canal 客户端 (CanalClientRunner.java
)
package com.example.canalrocketmqdemo;
import com.alibaba.otter.canal.client.CanalConnector;
import com.alibaba.otter.canal.client.CanalConnectors;
import com.alibaba.otter.canal.protocol.CanalEntry.*;
import com.alibaba.otter.canal.protocol.Message;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.CommandLineRunner;
import org.springframework.stereotype.Component;
import java.net.InetSocketAddress;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
@Component
public class CanalClientRunner implements CommandLineRunner {
@Value("${canal.server}")
private String canalServer;
@Value("${canal.destination}")
private String destination;
@Value("${canal.batch-size}")
private int batchSize;
@Value("${canal.username}")
private String username;
@Value("${canal.password}")
private String password;
@Value("${canal.retry.max-attempts}")
private int maxRetryAttempts;
@Value("${canal.retry.backoff}")
private long retryBackoff;
@Autowired
private BinlogEventProducer eventProducer;
private CanalConnector connector;
private volatile boolean running = true;
@Override
public void run(String... args) {
// 初始化连接
initConnector();
// 启动处理线程
new Thread(this::process).start();
// 添加关闭钩子
Runtime.getRuntime().addShutdownHook(new Thread(this::shutdown));
}
private void initConnector() {
String[] serverParts = canalServer.split(":");
String host = serverParts[0];
int port = Integer.parseInt(serverParts[1]);
connector = CanalConnectors.newSingleConnector(
new InetSocketAddress(host, port),
destination,
username,
password
);
connectWithRetry();
}
private void connectWithRetry() {
int attempts = 0;
while (attempts < maxRetryAttempts) {
try {
connector.connect();
connector.subscribe(".*\\..*");
connector.rollback();
return;
} catch (Exception e) {
attempts++;
if (attempts >= maxRetryAttempts) {
throw new RuntimeException("连接Canal失败,达到最大重试次数", e);
}
try {
Thread.sleep(retryBackoff);
} catch (InterruptedException ex) {
Thread.currentThread().interrupt();
}
}
}
}
private void process() {
while (running) {
try {
Message message = connector.getWithoutAck(batchSize);
long batchId = message.getId();
int size = message.getEntries().size();
if (batchId == -1 || size == 0) {
Thread.sleep(1000);
continue;
}
processEntries(message.getEntries());
connector.ack(batchId);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
} catch (Exception e) {
// 处理异常,尝试重连
handleProcessingException(e);
}
}
}
private void processEntries(List<Entry> entries) {
for (Entry entry : entries) {
if (entry.getEntryType() == EntryType.TRANSACTIONBEGIN ||
entry.getEntryType() == EntryType.TRANSACTIONEND) {
continue;
}
try {
RowChange rowChange = RowChange.parseFrom(entry.getStoreValue());
Header header = entry.getHeader();
BinlogEvent event = new BinlogEvent();
event.setDatabase(header.getSchemaName());
event.setTable(header.getTableName());
event.setExecuteTime(header.getExecuteTime());
event.setEventType(mapEventType(rowChange.getEventType()));
for (RowData rowData : rowChange.getRowDatasList()) {
if (event.getEventType() == BinlogEvent.EventType.UPDATE ||
event.getEventType() == BinlogEvent.EventType.DELETE) {
event.setBefore(convertColumns(rowData.getBeforeColumnsList()));
}
if (event.getEventType() == BinlogEvent.EventType.INSERT ||
event.getEventType() == BinlogEvent.EventType.UPDATE) {
event.setAfter(convertColumns(rowData.getAfterColumnsList()));
}
// 发送事件到RocketMQ
eventProducer.sendEvent(event);
}
} catch (Exception e) {
// 记录错误日志,不要阻断主流程
System.err.println("处理binlog条目失败: " + e.getMessage());
}
}
}
private List<Map<String, String>> convertColumns(List<Column> columns) {
return columns.stream()
.map(col -> Map.of(
"name", col.getName(),
"value", col.getValue(),
"type", col.getMysqlType(),
"updated", String.valueOf(col.getUpdated())
))
.collect(Collectors.toList());
}
private BinlogEvent.EventType mapEventType(EventType eventType) {
switch (eventType) {
case INSERT: return BinlogEvent.EventType.INSERT;
case UPDATE: return BinlogEvent.EventType.UPDATE;
case DELETE: return BinlogEvent.EventType.DELETE;
default: return null;
}
}
private void handleProcessingException(Exception e) {
System.err.println("处理Canal消息异常: " + e.getMessage());
try {
Thread.sleep(retryBackoff);
connectWithRetry(); // 尝试重新连接
} catch (InterruptedException ex) {
Thread.currentThread().interrupt();
}
}
private void shutdown() {
running = false;
if (connector != null) {
connector.disconnect();
}
}
}
7. RocketMQ 消费者 (BinlogEventConsumer.java
)
package com.example.canalrocketmqdemo;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Service;
@Service
@RocketMQMessageListener(
topic = "${rocketmq.topic.binlog}",
consumerGroup = "${rocketmq.consumer.group}"
)
public class BinlogEventConsumer implements RocketMQListener<BinlogEvent> {
private static final Logger logger = LoggerFactory.getLogger(BinlogEventConsumer.class);
@Override
public void onMessage(BinlogEvent event) {
try {
switch (event.getEventType()) {
case INSERT:
handleInsert(event);
break;
case UPDATE:
handleUpdate(event);
break;
case DELETE:
handleDelete(event);
break;
}
logger.info("处理成功: {}.{} {}",
event.getDatabase(), event.getTable(), event.getEventType());
} catch (Exception e) {
logger.error("处理事件失败: {}", event, e);
// 这里可以添加重试逻辑或发送到死信队列
}
}
private void handleInsert(BinlogEvent event) {
// 实际业务处理:更新缓存、同步到ES、数据仓库等
logger.info("INSERT 事件: {}.{}", event.getDatabase(), event.getTable());
logger.debug("新增数据: {}", event.getAfter());
}
private void handleUpdate(BinlogEvent event) {
logger.info("UPDATE 事件: {}.{}", event.getDatabase(), event.getTable());
logger.debug("变更前: {}", event.getBefore());
logger.debug("变更后: {}", event.getAfter());
}
private void handleDelete(BinlogEvent event) {
logger.info("DELETE 事件: {}.{}", event.getDatabase(), event.getTable());
logger.debug("删除数据: {}", event.getBefore());
}
}
8. 主应用类 (Application.java
)
package com.example.canalrocketmqdemo;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
@SpringBootApplication
public class Application {
public static void main(String[] args) {
SpringApplication.run(Application.class, args);
}
}
环境准备与部署步骤
1. MySQL 配置
修改 MySQL 配置文件 (
my.cnf
或my.ini
):[mysqld] log-bin=mysql-bin binlog-format=ROW server_id=1 expire_logs_days=3
重启 MySQL 服务
创建 Canal 用户:
CREATE USER 'canal'@'%' IDENTIFIED BY 'canal_password'; GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canal'@'%'; FLUSH PRIVILEGES;
2. Canal Server 部署
解压并修改配置:
cd canal vi conf/example/instance.properties
修改内容:
properties
canal.instance.master.address=127.0.0.1:3306 canal.instance.dbUsername=canal canal.instance.dbPassword=canal_password canal.instance.filter.regex=.*\\..*
启动 Canal:
sh bin/startup.sh # Linux bin/startup.bat # Windows
3. RocketMQ 部署
下载 RocketMQ: https://rocketmq.apache.org/download
启动 NameServer:
nohup sh bin/mqnamesrv &
启动 Broker:
nohup sh bin/mqbroker -n localhost:9876 &
创建 Topic:
sh bin/mqadmin updateTopic -n localhost:9876 -t binlog-events -b localhost:10911
4. 启动 Spring Boot 应用
mvn spring-boot:run
5. 测试验证
在 MySQL 中执行操作:
-- 创建测试表 CREATE TABLE test_table ( id INT AUTO_INCREMENT PRIMARY KEY, name VARCHAR(50), email VARCHAR(100) ); -- 执行DML操作 INSERT INTO test_table (name, email) VALUES ('John', 'john@example.com'); UPDATE test_table SET email = 'john.doe@example.com' WHERE id = 1; DELETE FROM test_table WHERE id = 1;
查看应用日志:
INSERT 事件: test_db.test_table UPDATE 事件: test_db.test_table DELETE 事件: test_db.test_table
使用 RocketMQ 控制台查看消息:
http://localhost:9876
生产环境优化建议
1. 性能优化
// 在 CanalClientRunner 中添加批量发送
private void processEntries(List<Entry> entries) {
List<BinlogEvent> events = new ArrayList<>();
for (Entry entry : entries) {
// ... 解析事件 ...
events.add(event);
}
// 批量发送
if (!events.isEmpty()) {
eventProducer.sendBatchEvents(events);
}
}
// 在 BinlogEventProducer 中添加批量发送方法
public void sendBatchEvents(List<BinlogEvent> events) {
List<Message<BinlogEvent>> messages = events.stream()
.map(event -> MessageBuilder.withPayload(event).build())
.collect(Collectors.toList());
rocketMQTemplate.syncSend(binlogTopic, messages);
}
2. 消息过滤
在 application.yml
中添加:
canal:
filter:
tables: user, product, order.* # 只监听指定表
在 CanalClientRunner
中:
@Value("${canal.filter.tables:}")
private String tableFilter;
private void initConnector() {
// ...
connector.subscribe(tableFilter.isEmpty() ? ".*\\..*" : tableFilter);
}
3. 消息轨迹和监控
// 在发送消息时添加消息轨迹
public void sendEvent(BinlogEvent event) {
rocketMQTemplate.syncSendOrderly(binlogTopic,
MessageBuilder.withPayload(event).build(),
event.getUniqueId());
// 添加监控指标
Metrics.counter("rocketmq.sent.messages",
"table", event.getTable(),
"type", event.getEventType().name())
.increment();
}
4. 死信队列处理
// 在 RocketMQConfig 中添加死信队列配置
@Bean
public RocketMQListenerContainerFactory rocketMQListenerContainerFactory() {
DefaultRocketMQListenerContainerFactory factory = new DefaultRocketMQListenerContainerFactory();
factory.setConsumerGroup("${rocketmq.consumer.group}");
factory.setNameServerAddress("${rocketmq.name-server}");
// 设置重试次数
factory.setMaxReconsumeTimes(3);
// 设置死信队列主题
factory.setDLQTopic("DLQ_${rocketmq.topic.binlog}");
return factory;
}
常见问题解决
1. Canal 连接问题
症状: 连接 Canal Server 失败
解决:
检查 Canal Server 是否运行:
netstat -an | grep 11111
确认用户名/密码是否正确
查看 Canal Server 日志:
logs/canal/canal.log
2. RocketMQ 发送失败
症状: 消息发送超时或失败
解决:
检查 NameServer 状态:
mqadmin clusterList -n localhost:9876
增加发送超时时间:
rocketmq.producer.send-message-timeout=5000
检查网络连接
3. 消息消费积压
症状: RocketMQ 控制台显示消息积压
解决:
增加消费者实例数量
调整批量消费大小
优化消费者处理逻辑
4. 数据不一致问题
症状: 处理后的数据与源数据不一致
解决:
添加幂等处理逻辑
实现数据校对机制
记录消息处理状态
总结
本教程完整实现了 Spring Boot 项目整合 Canal 和 RocketMQ 监听 MySQL binlog 的方案:
Canal 客户端:负责监听 MySQL binlog 变更
RocketMQ 生产者:将 binlog 事件发送到消息队列
RocketMQ 消费者:异步处理变更事件
完整配置:包含 MySQL、Canal Server 和 RocketMQ 的配置
通过使用 RocketMQ 作为消息中间件,系统获得了以下优势:
解耦:数据处理与数据捕获分离
可靠性:消息持久化保证不丢失
扩展性:消费者可水平扩展
容错性:重试机制和死信队列
顺序性:相同表的数据变更保持顺序
生产环境部署时,请根据实际流量调整:
Canal 的
batch-size
参数RocketMQ 的 Topic 分区数
消费者并发数
消息存储时间
此方案适用于需要实时数据同步的场景,如缓存更新、搜索引擎同步、实时分析等。