Canal 中配置 Kafka详解

发布于:2025-07-10 ⋅ 阅读:(19) ⋅ 点赞:(0)

在 Canal 中配置 Kafka 是一个非常常见的应用场景,通常用于将 MySQL 的数据变更实时同步到 Kafka 消息队列中,供下游系统进行消费。

这个过程主要有两种方式:

  1. 推荐方式:使用 Canal Adapter

    • 优点:这是目前官方推荐的主流方式。Adapter 作为一个独立的组件,解耦了 Canal Server 和目标存储(如 Kafka、Elasticsearch、HBase 等)。配置更灵活,支持动态路由、数据ETL、多种数据源和目标源的组合。
    • 架构MySQL -> Canal Server -> Canal Adapter -> Kafka
  2. 旧版方式:Canal Server 直连 Kafka

    • 优点:架构简单,少一个组件。
    • 缺点:功能相对单一,耦合度高,不方便进行数据转换和动态路由,已不被官方主推。
    • 架构MySQL -> Canal Server -> Kafka

下面我将重点介绍 推荐方式(使用 Canal Adapter) 的配置步骤,因为它更强大和灵活。


准备工作

在开始配置前,请确保你已经准备好以下环境:

  1. MySQL 数据库:并已开启 Binlog(格式为 ROW)。
  2. Zookeeper 集群:Kafka 依赖 Zookeeper。
  3. Kafka 集群:可以正常生产和消费消息。
  4. Canal Server:已下载并可以成功启动,能够连接到 MySQL。
  5. Canal Adapter:已下载 canal.adapter-x.x.x.tar.gz 并解压。

核心配置步骤 (使用 Canal Adapter)

第 1 步:配置 Canal Server

这一步的目的是让 Canal Server 将数据变更发送出来,供 Canal Adapter 来消费。

  1. 修改 conf/canal.properties
    这个是 Canal Server 的全局配置文件。你需要确保 Canal Server 的模式是 tcp,以便 Adapter 可以连接。

    # canal server 的工作模式,tcp模式是默认的,提供给adapter/client消费
    canal.server.mode = tcp
    
  2. 修改 conf/example/instance.properties
    这是 example 实例的配置文件。你需要确保这个实例已经正确配置并可以连接到你的 MySQL。

    # ... 其他MySQL连接配置 ...
    canal.instance.mysql.slaveId = 1
    canal.instance.master.address = 127.0.0.1:3306
    canal.instance.dbUsername = canal
    canal.instance.dbPassword = canal
    # ...
    
    # 这一步非常重要!
    # 指定数据的输出格式。Canal Adapter 消费的是 protobuf 格式。
    # canal.mq.topic 这个可以不用配置,因为我们用Adapter来管理topic
    

    注意:在 Adapter 模式下,instance.properties不需要配置任何 canal.mq.* 相关的 Kafka 参数。Server 的职责只是抓取 Binlog 并通过 TCP 端口暴露出去。

第 2 步:配置 Canal Adapter (核心)

这是整个配置过程的重点。所有与 Kafka 相关的配置都在 Adapter 中完成。

  1. 进入 Canal Adapter 的 conf 目录
  2. 核心配置文件是 application.yml

打开 application.yml,你会看到两个主要部分:canal.confadapter.conf

2.1 配置 canal.conf

这部分是配置 Adapter 如何去连接 Canal Server。

canal.conf:
  # Canal Server 的连接模式,这里我们用 tcp
  mode: tcp
  # Canal Server 的地址和端口
  flatMessage: false
  zookeeperHosts: ""
  servers: 127.0.0.1:11111 # 你的Canal Server的IP和端口
  destinations: example # 你要订阅的Canal Server实例名,多个用逗号隔开
  user: "" # Canal 1.1.5+ 支持用户名密码
  passwd: ""
2.2 配置 adapter.conf (重点中的重点)

这部分是配置 Adapter 如何将数据写入到目标端(即 Kafka)。

adapter.conf:
  # 数据源分组,可以有多个group
  groups:
    - groupId: g1 # 组ID,可以自定义
      # 外部适配器配置,即数据要写入到哪里
      outerAdapters:
        - name: logger # 用于调试,会在日志中打印数据变更信息,建议保留
        - name: kafka # 这是我们要配置的Kafka适配器
          key: my_kafka # 唯一标识,可以自定义
          properties:
            # Kafka brokers 地址
            bootstrap.servers: 192.168.1.100:9092,192.168.1.101:9092
            # 指定发送到哪个Kafka Topic。这里是固定topic
            # 如果想动态生成topic,比如根据库名和表名,可以使用占位符
            # topic: my_canal_data
            topic: ${db}_${table}  # 推荐!动态topic,例如:test_db_user_table
            
            # Kafka生产者的一些配置 (按需配置)
            acks: 1
            compression.type: lz4
            # ... 其他 Kafka Producer 配置
            
            # Canal消息到Kafka消息的映射关系
            # partition: 0          # 固定分区
            # partitionsNum: 3      # topic总分区数,用于哈希计算
            # hashColumns: id,name  # 使用哪些列的值进行hash来决定分区
            # partition-hash: ${db}.${table}:${id} # 更灵活的分区hash策略

关键参数解释

  • outerAdapters: 定义了多个输出目标。我们在这里定义了一个名为 kafka 的输出。
  • name: kafka: 表明这是一个 Kafka 适配器。
  • properties:
    • bootstrap.servers: 【必填】 你的 Kafka 集群地址。
    • topic: 【必填】 目标 Topic 名称。
      • 固定 Topictopic: my_test_topic,所有数据都进入这一个 Topic。
      • 动态 Topictopic: ${db}_${table},这是非常强大的功能。它会根据数据变更发生的数据库名(db)和表名(table)自动创建或发送到对应的 Topic。例如,testdb.user 表的变更会发送到 testdb_user 这个 Topic。
    • partition-hash: 指定如何计算分区。id 是最常用的分区键,可以保证同一条记录的所有变更(INSERT, UPDATE, DELETE)都进入同一个分区,从而保证了处理的顺序性。
第 3 步:启动服务

请按照以下顺序启动所有组件:

  1. 启动 Zookeeper
  2. 启动 Kafka
  3. 启动 MySQL
  4. 启动 Canal Server
    sh bin/startup.sh
    
  5. 启动 Canal Adapter
    sh bin/startup.sh
    
第 4 步:验证
  1. 查看日志

    • 查看 logs/canal/canal.log,确保 Canal Server 成功连接到 MySQL。
    • 查看 logs/adapter/adapter.log,确保 Adapter 成功连接到 Canal Server 和 Kafka。应该能看到类似 “start adapter for canal-client: … destination: example” 和 Kafka 连接成功的日志。
  2. 在 MySQL 中进行操作
    在你的数据库中,对某个表进行 INSERT, UPDATE, DELETE 操作。
    例如,在 testdb.user 表中插入一条数据。

  3. 使用 Kafka 消费者工具验证
    打开一个终端,使用 Kafka 自带的消费者脚本来监听 Topic。

    • 如果使用动态 Topic (${db}_${table}), 假设你的库是 testdb,表是 user

      # 你的kafka安装目录
      cd /path/to/kafka/
      
      # 监听 testdb_user 这个topic
      bin/kafka-console-consumer.sh --bootstrap-server 192.168.1.100:9092 --topic testdb_user --from-beginning
      
    • 如果使用固定 Topic (my_canal_data):

      bin/kafka-console-consumer.sh --bootstrap-server 192.168.1.100:9092 --topic my_canal_data --from-beginning
      
  4. 观察输出
    如果一切正常,你会在消费者终端看到一个 JSON 格式的消息,类似下面这样:

    {
      "data": [
        {
          "id": "1",
          "name": "John Doe",
          "email": "john.doe@example.com"
        }
      ],
      "database": "testdb",
      "es": 1668888888000,
      "id": 1,
      "isDdl": false,
      "mysqlType": {
        "id": "bigint(20)",
        "name": "varchar(255)",
        "email": "varchar(255)"
      },
      "old": null,
      "pkNames": [
        "id"
      ],
      "sql": "",
      "sqlType": {
        "id": -5,
        "name": 12,
        "email": 12
      },
      "table": "user",
      "ts": 1668888888123,
      "type": "INSERT"
    }
    

    这个 JSON 结构包含了数据变更的详细信息:

    • data: 变更后的数据行。
    • old: 变更前的数据行 (对 UPDATE 操作有值)。
    • type: 操作类型 (INSERT, UPDATE, DELETE)。
    • database: 数据库名。
    • table: 表名。

至此,Canal 通过 Adapter 将数据变更同步到 Kafka 的配置就完成了。这种方式扩展性好,强烈推荐使用。


网站公告

今日签到

点亮在社区的每一天
去签到