Logstash数据迁移之mysql-to-kafka.conf详细配置

发布于:2025-08-29 ⋅ 阅读:(19) ⋅ 点赞:(0)

在 Logstash 中配置从 MySQL 到 Kafka 的数据传输是一个非常经典且强大的用例,常用于数据同步、CDC(变更数据捕获)和实时数据管道

下面我将详细解析配置文件的每个部分,并提供多个场景的示例。

核心架构与组件

数据流:MySQL → Logstash (jdbc inputfilterkafka output) → Kafka

为了实现高效的增量同步,其核心工作机制如下所示:

在这里插入图片描述

基础配置文件详解 (mysql-to-kafka.conf)

input {
  jdbc {
    # 【必需】JDBC 连接字符串
    jdbc_connection_string => "jdbc:mysql://localhost:3306/your_database?useUnicode=true&characterEncoding=UTF-8&useSSL=false"

    # 【必需】数据库用户名和密码
    jdbc_user => "your_username"
    jdbc_password => "your_password"

    # 【必需】MySQL JDBC 驱动路径
    # 需要手动下载 https://dev.mysql.com/downloads/connector/j/
    jdbc_driver_library => "/path/to/mysql-connector-java-8.0.x.jar"
    jdbc_driver_class => "com.mysql.cj.jdbc.Driver" # 注意类名

    # 【必需】要执行的 SQL 语句
    # 1. 使用增量字段(如update_time, id)进行增量查询
    # 2. :sql_last_value 是Logstash提供的变量,记录上一次执行的值
    statement => "SELECT * FROM your_table WHERE update_time > :sql_last_value"

    # 【强烈建议】定时执行,使用cron表达式。例如每分钟一次。
    schedule => "* * * * *"

    # 【强烈建议】记录上次查询结果的字段值(如最大的update_time或id)
    # 此文件由Logstash管理,用于下次查询的:sql_last_value
    record_last_run => true
    last_run_metadata_path => "/path/to/.logstash_jdbc_last_run" 

    # 【可选】是否强制将JDBC列的字符串转换为UTF-8
    jdbc_default_timezone => "UTC"
    jdbc_force_standard_timezone => true

    # 【可选】分页查询,用于处理大表
    jdbc_paging_enabled => true
    jdbc_page_size => 100000
  }
}

filter {
  # 此处是进行数据清洗和转换的地方,根据需求添加。
  # 例如:
  
  # 1. 删除不必要的字段
  mutate {
    remove_field => ["@version", "@timestamp"]
  }

  # 2. 如果需要,可以将记录转换为JSON字符串(如果Kafka希望接收字符串消息)
  # json {
  #   source => "message"
  #   target => "value"
  # }
}

output {
  kafka {
    # 【必需】Kafka集群的broker列表
    bootstrap_servers => "kafka-broker1:9092,kafka-broker2:9092"

    # 【必需】目标Topic的名称
    topic_id => "mysql-your_table-topic"

    # 【必需】指定消息的序列化格式。通常使用JSON。
    codec => json

    # 【可选】消息

网站公告

今日签到

点亮在社区的每一天
去签到