Canal+RabbitMQ数据同步环境配置

发布于:2024-09-17 ⋅ 阅读:(48) ⋅ 点赞:(0)

Canal 是阿里巴巴开发的开源工具,主要用于解析 MySQL 的 binlog 日志,从而实现数据同步。Canal 会模拟 MySQL 从库的协议,订阅主库的 binlog,从而获取数据库的变更信息。

将 Canal 解析到的 MySQL 数据库变更消息通过 RabbitMQ 分发给下游的消费服务。RabbitMQ 在这个场景中充当消息中间件,保证消息的可靠传递和队列管理。

大致流程:

  1. MySQL 发生数据变更(插入、更新、删除等)。
  2. Canal 订阅 MySQL 的 binlog,解析出变更的具体数据。
  3. Canal 通过 RabbitMQ 将变更数据发送到指定队列。
  4. RabbitMQ 分发消息,下游消费者监听并处理消息,完成数据同步。

1. 配置Mysql主从同步

在 MySQL 中开启 binlog,并配置一个从库账号,供 Canal 使用。

1.1 修改 MySQL 配置文件 my.cnf ,确保开启 binlog 和 ROW 格式。

在MySQL配置文件my.cnf设置如下信息,开启 Binlog 写入功能,配置 binlog-format 为 ROW 模式,ROW 模式表示以行为单位记录每个被修改的行的变更。

修改如下:

vi /usr/mysql/conf/my.cnf

[mysqld]
#打开binlog
log-bin=mysql-bin
#选择ROW(行)模式
binlog-format=ROW
#配置MySQL replaction需要定义,不要和canal的slaveId重复
server_id=1

1.2 创建 Canal 用户,并赋予足够的权限来读取 binlog

进入mysql容器:docker exec -it mysql /bin/bash

-- 使用命令登录:mysql -u root -p

-- 创建用户 用户名:canal 密码:canal

create user 'canal'@'%' identified WITH mysql_native_password by 'canal';

-- 授权 *.*表示所有库

GRANT SELECT,REPLICATION SLAVE,REPLICATION CLIENT ON *.* TO 'canal'@'%';
FLUSH PRIVILEGES;
  • SELECT: 允许用户查询(读取)数据库中的数据。

  • REPLICATION SLAVE 允许用户作为 MySQL 复制从库,用于同步主库的数据。

  • REPLICATION CLIENT: 允许用户连接到主库并获取关于主库状态的信息。

1.3 重启MySQL,查看配置信息

(1)使用命令查看是否打开binlog模式:SHOW VARIABLES LIKE 'log_bin';

ON表示开启binlog模式。

(2)show variables like 'binlog_format';

binlog_format 的值为 row 时,表示 MySQL 服务器当前配置为使用行级别的二进制日志记录,这对于数据库复制和数据同步来说更为安全,因为它记录了对数据行的确切更改。

(3)查看binlog日志文件列表:SHOW BINARY LOGS;

(4)查看当前正在写入的binlog文件:SHOW MASTER STATUS;

2. 安装Canal

2.1 获取canal镜像

docker pull canal/canal-server:latest

2.2 配置 Canal 连接到 MySQL

修改 Canal 配置文件 conf/example/instance.properties,配置 Canal 连接到 MySQL。

# MySQL 连接信息
canal.instance.master.address = 127.0.0.1:3306
canal.instance.dbUsername = canal
canal.instance.dbPassword = canal_password
canal.instance.defaultDatabaseName = your_db_name

2.3 配置 RabbitMQ 作为消息推送的目标

配置 Canal 的 conf/canal.properties 文件,集成 RabbitMQ 作为消息推送的目标。

# 配置 Canal 的 MQ 发送
canal.mq.servers = 127.0.0.1:5672
canal.mq.queue = example_queue   # 指定RabbitMQ的队列
canal.mq.exchange = canal-exchange
canal.mq.username = guest
canal.mq.password = guest

2.4 启动 Canal Server

sh bin/startup.sh

3. 安装RabbitMQ

3.1 拉取镜像

docker pull registry.cn-hangzhou.aliyuncs.com/itheima/rabbitmq:3.9.17-management-delayed

3.2 启动rabbitmq管理端

使用 RabbitMQ 管理界面(通常位于 http://localhost:15672),创建一个 exchangequeue,并配置消息路由。

# 启动rabbitmq管理端
rabbitmq-plugins enable rabbitmq_management
# 启动延迟队列插件
rabbitmq-plugins enable rabbitmq_delayed_message_exchange