使用Flink CDC实时监控MySQL数据库变更

发布于:2024-06-28 ⋅ 阅读:(15) ⋅ 点赞:(0)

在现代数据架构中,实时数据处理变得越来越重要。Flink CDC(Change Data Capture)是一种强大的工具,可以帮助我们实时捕获数据库的变更,并进行处理。本文将介绍如何使用Flink CDC从MySQL数据库中读取变更数据,并将其打印到控制台。

环境准备

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-java</artifactId>
    <version>1.12.0</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-streaming-java_2.12</artifactId>
    <version>1.12.0</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-clients_2.12</artifactId>
    <version>1.12.0</version>
</dependency>
<dependency>
    <groupId>org.apache.hadoop</groupId>
    <artifactId>hadoop-client</artifactId>
    <version>3.1.3</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-table-planner-blink_2.12</artifactId>
    <version>1.12.0</version>
</dependency>
<dependency>
    <groupId>com.ververica</groupId>
    <artifactId>flink-connector-mysql-cdc</artifactId>
    <version>2.0.0</version>
</dependency>
<dependency>
    <groupId>com.alibaba</groupId>
    <artifactId>fastjson</artifactId>
    <version>1.2.75</version>
</dependency>
<dependency>
    <groupId>mysql</groupId>
    <artifactId>mysql-connector-java</artifactId>
    <version>8.0.23</version>
</dependency>
  1. 获取Flink执行环境

首先,我们需要获取Flink的执行环境。这是所有Flink作业的起点。

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
  1. 启用检查点和设置并行度

为了确保作业的容错性和状态恢复,我们需要启用检查点,并设置作业的并行度。

env.enableCheckpointing(500); // 每500毫秒创建一个检查点
env.setParallelism(1); // 设置作业的并行度为1
  1. 使用Debezium Source读取MySQL的binlog

接下来,我们使用Debezium Source读取MySQL的binlog。我们需要配置MySQL的连接信息、监控的数据库和表、反序列化器以及启动选项。

DebeziumSourceFunction<String> sourceFunction = MySqlSource.<String>builder()
        .serverTimeZone("Asia/Shanghai") // 设置时区为亚洲/上海
        .hostname("localhost") // MySQL的IP地址
        .port(3306) // MySQL的端口
        .username("root") // MySQL的用户名
        .password("123456") // MySQL的密码
        .databaseList("my_db") // 监控的数据库
        .tableList("my_db.user") // 监控的数据库下的表
        .deserializer(new JsonDebeziumDeserializationSchema()) // 反序列化
        .startupOptions(StartupOptions.initial()) // 启动选项
        .build();

这里 JsonDebeziumDeserializationSchema类的代码如下:

import com.alibaba.fastjson.JSONObject;
import com.ververica.cdc.debezium.DebeziumDeserializationSchema;
import io.debezium.data.Envelope;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.util.Collector;
import org.apache.kafka.connect.data.Field;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.source.SourceRecord;

import java.util.List;

/**
*  自定义DeserializationSchema进行反序列化。
*/

public class JsonDebeziumDeserializationSchema implements DebeziumDeserializationSchema<String> {
   @Override
   public void deserialize(SourceRecord sourceRecord, Collector collector) throws Exception {
       //创建JSON对象用于存储最终数据
       JSONObject result = new JSONObject();
       String topic = sourceRecord.topic();
       String[] fields = topic.split("\\.");
       String database = fields[1];
       String tableName = fields[2];
       Struct value  = (Struct)sourceRecord.value();
       //获取before数据
       Struct before = value.getStruct("before");
       JSONObject beforeJson = getJson(before);
       //获取after数据
       Struct after = value.getStruct("after");
       JSONObject afterJson = getJson(after);
       //获取操作类型
       Envelope.Operation operation = Envelope.operationFor(sourceRecord);
       //将字段写入JSON对象
       result.put("database",database);
       result.put("tableName",tableName);
       result.put("type",operation);
       result.put("before",beforeJson);
       result.put("after",afterJson);
       //输出数据
       collector.collect(result.toJSONString());
   }
   /**
    *  获取字段值并写入result对象
    * @param before
    * @return
    */
   private JSONObject getJson(Struct before) {
       JSONObject jsonObject = new JSONObject();
       if(before != null){
           Schema beforeSchema = before.schema();
           List<Field> beforeFields = beforeSchema.fields();
           for (Field field : beforeFields) {
               Object beforeValue = before.get(field);
               jsonObject.put(field.name(), beforeValue);
           }
       }
       return jsonObject;
   }
   @Override
   public TypeInformation getProducedType() {
       return BasicTypeInfo.STRING_TYPE_INFO;

   }
}
  1. 添加数据源并打印数据

将Debezium源函数添加到Flink环境中,生成一个数据流,并将数据流中的数据打印到控制台。

DataStream<String> dataStreamSource = env.addSource(sourceFunction, TypeInformation.of(String.class));
DataStreamSink<String> print = dataStreamSource.print();
  1. 启动任务

最后,启动Flink作业,开始处理数据流。

env.execute("Flink-CDC");

6.测试

在这里插入图片描述

总结

通过上述步骤,我们可以使用Flink CDC实时监控MySQL数据库的变更,并将变更数据以JSON格式打印出来。这种方法不仅适用于数据监控,还可以用于实时数据处理和分析。