使用前提:
一、MySQL 开启了 binlog
在mysql的安装路径下的my.ini中
【mysqld】下
添加
log-bin=mysql-bin # 开启 binlog
binlog-format=ROW # 选择 ROW 模式
server_id=1 # 配置 MySQL replication 需要定义,不要和 canal 的 slaveId 重复
参考gitee的项目,即拉即用。参考地址
zhanghl/spring-boot-debezium(https://gitee.com/zhl001/spring-boot-debezium)项目中一共三个文件,pom 和两个类需要参考。
pom.xml
<!-- Parent POM: Spring Boot 2.2.x dependency management (Java 8 era). -->
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>2.2.13.RELEASE</version>
<relativePath/> <!-- lookup parent from repository -->
</parent>
<groupId>cn.felord</groupId>
<artifactId>spring-boot-debezium</artifactId>
<version>0.0.1-SNAPSHOT</version>
<name>spring-boot-debezium</name>
<description>Demo project for Spring Boot</description>
<properties>
<java.version>1.8</java.version>
<!-- Single property so the three Debezium artifacts below stay in lock-step. -->
<debezium.version>1.5.2.Final</debezium.version>
</properties>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-actuator</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<!-- Debezium change-event API (DebeziumEngine, RecordChangeEvent, ...). -->
<dependency>
<groupId>io.debezium</groupId>
<artifactId>debezium-api</artifactId>
<version>${debezium.version}</version>
</dependency>
<!-- Embedded engine: run the connector in-process, no Kafka Connect cluster needed. -->
<dependency>
<groupId>io.debezium</groupId>
<artifactId>debezium-embedded</artifactId>
<version>${debezium.version}</version>
</dependency>
<!-- MySQL binlog source connector. -->
<dependency>
<groupId>io.debezium</groupId>
<artifactId>debezium-connector-mysql</artifactId>
<version>${debezium.version}</version>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<optional>true</optional>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
<!-- Spring Boot + MyBatis integration starter. -->
<dependency>
<groupId>org.mybatis.spring.boot</groupId>
<artifactId>mybatis-spring-boot-starter</artifactId>
<version>1.3.0</version>
</dependency>
<!-- MySQL JDBC driver (version managed by the parent POM). -->
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
</dependency>
<!-- Spring Boot JDBC starter. -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-jdbc</artifactId>
</dependency>
<!-- SQL Server JDBC driver. -->
<!-- NOTE(review): com.microsoft.sqlserver:sqljdbc4:4.0 is not published to Maven
     Central; verify it resolves from your repository or use mssql-jdbc instead. -->
<dependency>
<groupId>com.microsoft.sqlserver</groupId>
<artifactId>sqljdbc4</artifactId>
<version>4.0</version>
</dependency>
</dependencies>
两个java类
DebeziumConfiguration.java
package cn.felord.debezium.debezium;
import io.debezium.connector.mysql.MySqlConnector;
import io.debezium.data.Envelope;
import io.debezium.embedded.Connect;
import io.debezium.engine.DebeziumEngine;
import io.debezium.engine.RecordChangeEvent;
import io.debezium.engine.format.ChangeEventFormat;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.kafka.connect.data.Field;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.source.SourceRecord;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import java.util.List;
import java.util.Map;
import static io.debezium.data.Envelope.FieldName.*;
import static java.util.stream.Collectors.toMap;
/**
 * Debezium embedded-engine configuration: builds the MySQL connector
 * configuration and the engine bean that streams binlog change events
 * into {@link #handlePayload(List, DebeziumEngine.RecordCommitter)}.
 *
 * @author n1
 * @since 2021 /6/1 17:01
 */
@Configuration
public class DebeziumConfiguration {

    /**
     * Debezium MySQL source-connector configuration.
     *
     * @return configuration
     */
    @Bean
    io.debezium.config.Configuration debeziumConfig() {
        return io.debezium.config.Configuration.create()
                // Java class name of the connector
                .with("connector.class", MySqlConnector.class.getName())
                // Offset persistence, used for fault tolerance (default backing store)
                .with("offset.storage", "org.apache.kafka.connect.storage.FileOffsetBackingStore")
                // Offset file path (default /tmp/offsets.dat). If misconfigured, offsets
                // cannot be stored and changes may be consumed again after a restart.
                // On restart the connector resumes reading from the last recorded offset.
                .with("offset.storage.file.filename", "/tmp/offsets.dat")
                // Offset flush interval. NOTE(review): 1 ms is extremely aggressive;
                // consider a larger value (e.g. 60000) in production.
                .with("offset.flush.interval.ms", "1")
                // Unique logical name of this connector instance
                .with("name", "mysql-connector")
                // Database hostname
                .with("database.hostname", "10.1.1.1")
                // Database port
                .with("database.port", "3306")
                // Username
                .with("database.user", "canal")
                // Password
                .with("database.password", "canal")
                // Databases to capture changes from
                .with("database.include.list", "md_test")
                // Whether to emit schema-change (DDL) events; default true is recommended
                .with("include.schema.changes", "false")
                // Must not collide with the server-id configured in my.cnf
                .with("database.server.id", 1)
                // Logical name of the MySQL server/cluster; becomes the topic prefix
                .with("database.server.name", "customer-mysql-db-server")
                // Where DDL history is recorded
                .with("database.history", "io.debezium.relational.history.FileDatabaseHistory")
                // File that stores the DDL history
                .with("database.history.file.filename", "/tmp/dbhistory.dat")
                .build();
    }

    /**
     * Builds the lifecycle wrapper around the embedded Debezium engine.
     *
     * @param configuration the connector configuration
     * @return the debezium server bootstrap
     */
    @Bean
    DebeziumServerBootstrap debeziumServerBootstrap(io.debezium.config.Configuration configuration) {
        DebeziumServerBootstrap debeziumServerBootstrap = new DebeziumServerBootstrap();
        DebeziumEngine<RecordChangeEvent<SourceRecord>> debeziumEngine = DebeziumEngine.create(ChangeEventFormat.of(Connect.class))
                .using(configuration.asProperties())
                .notifying(this::handlePayload)
                .build();
        debeziumServerBootstrap.setDebeziumEngine(debeziumEngine);
        return debeziumServerBootstrap;
    }

    /**
     * Handles one batch of change events and acknowledges them on the committer.
     *
     * @param recordChangeEvents the batch of change events
     * @param recordCommitter    committer used to acknowledge processed records
     * @throws InterruptedException if interrupted while marking records processed
     */
    private void handlePayload(List<RecordChangeEvent<SourceRecord>> recordChangeEvents,
                               DebeziumEngine.RecordCommitter<RecordChangeEvent<SourceRecord>> recordCommitter) throws InterruptedException {
        // Plain for-loop (not forEach) so the checked InterruptedException from
        // markProcessed can propagate.
        for (RecordChangeEvent<SourceRecord> r : recordChangeEvents) {
            SourceRecord sourceRecord = r.record();
            String topic = sourceRecord.topic();
            Struct sourceRecordChangeValue = (Struct) sourceRecord.value();
            if (sourceRecordChangeValue != null) {
                // Determine the operation type; skip snapshot READs and only handle
                // create/update/delete (could also be filtered via configuration).
                Envelope.Operation operation = Envelope.Operation.forCode((String) sourceRecordChangeValue.get(OPERATION));
                if (operation != Envelope.Operation.READ) {
                    // DELETE carries the row image in "before"; CREATE/UPDATE in "after"
                    String record = operation == Envelope.Operation.DELETE ? BEFORE : AFTER;
                    Struct struct = (Struct) sourceRecordChangeValue.get(record);
                    // Flatten the changed row into a field-name -> value map
                    Map<String, Object> payload = struct.schema().fields().stream()
                            .map(Field::name)
                            .filter(fieldName -> struct.get(fieldName) != null)
                            .map(fieldName -> Pair.of(fieldName, struct.get(fieldName)))
                            .collect(toMap(Pair::getKey, Pair::getValue));
                    // Demo: just print the change
                    System.out.println("operation = " + operation);
                    System.out.println("data = " + payload);
                    // BUG FIX: compare enum constants directly instead of toString().equals("CREATE")
                    if (operation == Envelope.Operation.CREATE) {
                        System.out.println("新增记录一条");
                    }
                    // Topic format: <database.server.name>.<database>.<table>
                    if (topic.split("\\.").length > 2) {
                        String tableName = topic.split("\\.")[2];
                        System.out.println("tabelName" + tableName);
                    }
                }
            }
            // BUG FIX: the original never used the committer. Each record must be
            // acknowledged so the engine can advance and persist its offsets;
            // otherwise a restart re-delivers already-processed events.
            recordCommitter.markProcessed(r);
        }
        // BUG FIX: signal end of batch so the engine may flush offsets.
        recordCommitter.markBatchFinished();
    }
}
DebeziumServerBootstrap.java
package cn.felord.debezium.debezium;
import io.debezium.engine.DebeziumEngine;
import lombok.Data;
import lombok.SneakyThrows;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.context.SmartLifecycle;
import org.springframework.util.Assert;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
/**
 * Lifecycle wrapper that runs the embedded Debezium engine on a dedicated
 * thread and closes it when the Spring context shuts down.
 *
 * @author n1
 * @since 2021/6/2 10:45
 */
@Data
public class DebeziumServerBootstrap implements InitializingBean, SmartLifecycle {
    // Single dedicated thread for the blocking engine run loop.
    private final ExecutorService executor = Executors.newSingleThreadExecutor();
    private DebeziumEngine<?> debeziumEngine;
    // BUG FIX: the original isRunning() always returned false, so Spring's
    // lifecycle processor never invoked stop() and the engine was never closed
    // on shutdown. Track the actual state instead.
    private volatile boolean running = false;

    @Override
    public void start() {
        executor.execute(debeziumEngine);
        running = true;
    }

    /**
     * Closes the engine (stops binlog streaming) and releases the worker thread.
     */
    @SneakyThrows
    @Override
    public void stop() {
        running = false;
        debeziumEngine.close();
        // BUG FIX: shut the executor down so its non-daemon thread
        // does not keep the JVM alive after the context closes.
        executor.shutdown();
    }

    @Override
    public boolean isRunning() {
        return running;
    }

    @Override
    public void afterPropertiesSet() throws Exception {
        Assert.notNull(debeziumEngine, "debeziumEngine must not be null");
    }
}
结语:
太强了,比canal强10倍,非侵入性。对比可参考:
为什么是debezium
这么多技术框架,为什么选debezium?
看起来很多。但一一排除下来就debezium和canal。
sqoop,kettle,datax之类的工具,属于前大数据时代的产物,地位类似于web领域的Struts2。而且,它们基于查询而非binlog日志,其实不属于CDC。首先排除。
flink cdc是大数据领域的框架,一般web项目的数据量属于大材小用了。
同时databus,maxwell相对比较冷门,用得比较少。
「最后不用canal的原因有以下几点:」
canal需要安装,这违背了“如非必要,勿增实体”的原则。
canal只能对MYSQL进行CDC监控。有很大的局限性。
大数据领域非常流行的flink cdc(阿里团队主导)底层使用的也是debezium,而非同是阿里出品的canal。
debezium可借助kafka组件,将变动的数据发到kafka topic,后续的读取操作只需读取kafka,可有效减少数据库的读取压力,并可提供至少一次(at-least-once)投递语义。
同时,也可基于内嵌部署模式,无需我们手动部署kafka集群,可满足“如非必要,勿增实体”的原则。
而且canal只支持源端MySQL版本包括 5.1.x , 5.5.x , 5.6.x , 5.7.x , 8.0.x。