在 Logstash 中配置从 MySQL 到 Kafka 的数据传输是一个非常经典且强大的用例,常用于数据同步、CDC(变更数据捕获)和实时数据管道。
下面我将详细解析配置文件的每个部分,并提供多个场景的示例。
核心架构与组件
数据流:MySQL → Logstash (jdbc input
→ filter
→ kafka output
) → Kafka
为了实现高效的增量同步,其核心工作机制如下所示:
基础配置文件详解 (mysql-to-kafka.conf
)
input {
jdbc {
# 【必需】JDBC 连接字符串
jdbc_connection_string => "jdbc:mysql://localhost:3306/your_database?useUnicode=true&characterEncoding=UTF-8&useSSL=false"
# 【必需】数据库用户名和密码
jdbc_user => "your_username"
jdbc_password => "your_password"
# 【必需】MySQL JDBC 驱动路径
# 需要手动下载 https://dev.mysql.com/downloads/connector/j/
jdbc_driver_library => "/path/to/mysql-connector-java-8.0.x.jar"
jdbc_driver_class => "com.mysql.cj.jdbc.Driver" # 注意类名
# 【必需】要执行的 SQL 语句
# 1. 使用增量字段(如update_time, id)进行增量查询
# 2. :sql_last_value 是Logstash提供的变量,记录上一次执行的值
statement => "SELECT * FROM your_table WHERE update_time > :sql_last_value"
# 【强烈建议】定时执行,使用cron表达式。例如每分钟一次。
schedule => "* * * * *"
# 【强烈建议】记录上次查询结果的字段值(如最大的update_time或id)
# 此文件由Logstash管理,用于下次查询的:sql_last_value
record_last_run => true
last_run_metadata_path => "/path/to/.logstash_jdbc_last_run"
# 【可选】是否强制将JDBC列的字符串转换为UTF-8
jdbc_default_timezone => "UTC"
jdbc_force_standard_timezone => true
# 【可选】分页查询,用于处理大表
jdbc_paging_enabled => true
jdbc_page_size => 100000
}
}
filter {
# 此处是进行数据清洗和转换的地方,根据需求添加。
# 例如:
# 1. 删除不必要的字段
mutate {
remove_field => ["@version", "@timestamp"]
}
# 2. 如果需要,可以将记录转换为JSON字符串(如果Kafka希望接收字符串消息)
# json {
# source => "message"
# target => "value"
# }
}
output {
kafka {
# 【必需】Kafka集群的broker列表
bootstrap_servers => "kafka-broker1:9092,kafka-broker2:9092"
# 【必需】目标Topic的名称
topic_id => "mysql-your_table-topic"
# 【必需】指定消息的序列化格式。通常使用JSON。
codec => json
# 【可选】消息