在 Canal 中配置 Kafka 是一个非常常见的应用场景,通常用于将 MySQL 的数据变更实时同步到 Kafka 消息队列中,供下游系统进行消费。
这个过程主要有两种方式:
推荐方式:使用 Canal Adapter
- 优点:这是目前官方推荐的主流方式。Adapter 作为一个独立的组件,解耦了 Canal Server 和目标存储(如 Kafka、Elasticsearch、HBase 等)。配置更灵活,支持动态路由、数据ETL、多种数据源和目标源的组合。
- 架构:
MySQL -> Canal Server -> Canal Adapter -> Kafka
旧版方式:Canal Server 直连 Kafka
- 优点:架构简单,少一个组件。
- 缺点:功能相对单一,耦合度高,不方便进行数据转换和动态路由,已不被官方主推。
- 架构:
MySQL -> Canal Server -> Kafka
下面我将重点介绍 推荐方式(使用 Canal Adapter) 的配置步骤,因为它更强大和灵活。
准备工作
在开始配置前,请确保你已经准备好以下环境:
- MySQL 数据库:并已开启 Binlog(格式为
ROW
)。 - Zookeeper 集群:Kafka 依赖 Zookeeper。
- Kafka 集群:可以正常生产和消费消息。
- Canal Server:已下载并可以成功启动,能够连接到 MySQL。
- Canal Adapter:已下载
canal.adapter-x.x.x.tar.gz
并解压。
核心配置步骤 (使用 Canal Adapter)
第 1 步:配置 Canal Server
这一步的目的是让 Canal Server 将数据变更发送出来,供 Canal Adapter 来消费。
修改
conf/canal.properties
这个是 Canal Server 的全局配置文件。你需要确保 Canal Server 的模式是tcp
,以便 Adapter 可以连接。# canal server 的工作模式,tcp模式是默认的,提供给adapter/client消费 canal.server.mode = tcp
修改
conf/example/instance.properties
这是example
实例的配置文件。你需要确保这个实例已经正确配置并可以连接到你的 MySQL。# ... 其他MySQL连接配置 ... canal.instance.mysql.slaveId = 1 canal.instance.master.address = 127.0.0.1:3306 canal.instance.dbUsername = canal canal.instance.dbPassword = canal # ... # 这一步非常重要! # 指定数据的输出格式。Canal Adapter 消费的是 protobuf 格式。 # canal.mq.topic 这个可以不用配置,因为我们用Adapter来管理topic
注意:在 Adapter 模式下,
instance.properties
中不需要配置任何canal.mq.*
相关的 Kafka 参数。Server 的职责只是抓取 Binlog 并通过 TCP 端口暴露出去。
第 2 步:配置 Canal Adapter (核心)
这是整个配置过程的重点。所有与 Kafka 相关的配置都在 Adapter 中完成。
- 进入 Canal Adapter 的
conf
目录。 - 核心配置文件是
application.yml
。
打开 application.yml
,你会看到两个主要部分:canal.conf
和 adapter.conf
。
2.1 配置 canal.conf
这部分是配置 Adapter 如何去连接 Canal Server。
canal.conf:
# Canal Server 的连接模式,这里我们用 tcp
mode: tcp
# Canal Server 的地址和端口
flatMessage: false
zookeeperHosts: ""
servers: 127.0.0.1:11111 # 你的Canal Server的IP和端口
destinations: example # 你要订阅的Canal Server实例名,多个用逗号隔开
user: "" # Canal 1.1.5+ 支持用户名密码
passwd: ""
2.2 配置 adapter.conf
(重点中的重点)
这部分是配置 Adapter 如何将数据写入到目标端(即 Kafka)。
adapter.conf:
# 数据源分组,可以有多个group
groups:
- groupId: g1 # 组ID,可以自定义
# 外部适配器配置,即数据要写入到哪里
outerAdapters:
- name: logger # 用于调试,会在日志中打印数据变更信息,建议保留
- name: kafka # 这是我们要配置的Kafka适配器
key: my_kafka # 唯一标识,可以自定义
properties:
# Kafka brokers 地址
bootstrap.servers: 192.168.1.100:9092,192.168.1.101:9092
# 指定发送到哪个Kafka Topic。这里是固定topic
# 如果想动态生成topic,比如根据库名和表名,可以使用占位符
# topic: my_canal_data
topic: ${db}_${table} # 推荐!动态topic,例如:test_db_user_table
# Kafka生产者的一些配置 (按需配置)
acks: 1
compression.type: lz4
# ... 其他 Kafka Producer 配置
# Canal消息到Kafka消息的映射关系
# partition: 0 # 固定分区
# partitionsNum: 3 # topic总分区数,用于哈希计算
# hashColumns: id,name # 使用哪些列的值进行hash来决定分区
# partition-hash: ${db}.${table}:${id} # 更灵活的分区hash策略
关键参数解释:
outerAdapters
: 定义了多个输出目标。我们在这里定义了一个名为kafka
的输出。name: kafka
: 表明这是一个 Kafka 适配器。properties
:bootstrap.servers
: 【必填】 你的 Kafka 集群地址。topic
: 【必填】 目标 Topic 名称。- 固定 Topic:
topic: my_test_topic
,所有数据都进入这一个 Topic。 - 动态 Topic:
topic: ${db}_${table}
,这是非常强大的功能。它会根据数据变更发生的数据库名(db
)和表名(table
)自动创建或发送到对应的 Topic。例如,testdb.user
表的变更会发送到testdb_user
这个 Topic。
- 固定 Topic:
partition-hash
: 指定如何计算分区。id
是最常用的分区键,可以保证同一条记录的所有变更(INSERT, UPDATE, DELETE)都进入同一个分区,从而保证了处理的顺序性。
第 3 步:启动服务
请按照以下顺序启动所有组件:
- 启动 Zookeeper
- 启动 Kafka
- 启动 MySQL
- 启动 Canal Server
sh bin/startup.sh
- 启动 Canal Adapter
sh bin/startup.sh
第 4 步:验证
查看日志:
- 查看
logs/canal/canal.log
,确保 Canal Server 成功连接到 MySQL。 - 查看
logs/adapter/adapter.log
,确保 Adapter 成功连接到 Canal Server 和 Kafka。应该能看到类似 “start adapter for canal-client: … destination: example” 和 Kafka 连接成功的日志。
- 查看
在 MySQL 中进行操作:
在你的数据库中,对某个表进行 INSERT, UPDATE, DELETE 操作。
例如,在testdb.user
表中插入一条数据。使用 Kafka 消费者工具验证:
打开一个终端,使用 Kafka 自带的消费者脚本来监听 Topic。如果使用动态 Topic (
${db}_${table}
), 假设你的库是testdb
,表是user
:# 你的kafka安装目录 cd /path/to/kafka/ # 监听 testdb_user 这个topic bin/kafka-console-consumer.sh --bootstrap-server 192.168.1.100:9092 --topic testdb_user --from-beginning
如果使用固定 Topic (
my_canal_data
):bin/kafka-console-consumer.sh --bootstrap-server 192.168.1.100:9092 --topic my_canal_data --from-beginning
观察输出:
如果一切正常,你会在消费者终端看到一个 JSON 格式的消息,类似下面这样:{ "data": [ { "id": "1", "name": "John Doe", "email": "john.doe@example.com" } ], "database": "testdb", "es": 1668888888000, "id": 1, "isDdl": false, "mysqlType": { "id": "bigint(20)", "name": "varchar(255)", "email": "varchar(255)" }, "old": null, "pkNames": [ "id" ], "sql": "", "sqlType": { "id": -5, "name": 12, "email": 12 }, "table": "user", "ts": 1668888888123, "type": "INSERT" }
这个 JSON 结构包含了数据变更的详细信息:
data
: 变更后的数据行。old
: 变更前的数据行 (对 UPDATE 操作有值)。type
: 操作类型 (INSERT, UPDATE, DELETE)。database
: 数据库名。table
: 表名。
至此,Canal 通过 Adapter 将数据变更同步到 Kafka 的配置就完成了。这种方式扩展性好,强烈推荐使用。