提示:
Db2ValueConverters
类是Debezium框架的一部分,用于处理IBM DB2数据库的数据类型转换。
前言
提示:Db2ValueConverters
的主要职责是在DB2数据库特有的数据类型和Debezium或Kafka Connect所使用的数据类型之间进行转换
提示:以下是本篇文章正文内容
一、核心功能
核心功能详细说明
1. 数据类型映射与Schema构建
Db2ValueConverters
通过重写父类JdbcValueConverters
中的schemaBuilder
方法,为DB2数据库中的特定数据类型构建Kafka Connect的Schema。例如,对于DB2的TINYINT
类型,它会被映射为Kafka Connect的int16
类型,这是因为DB2的TINYINT
实际上是一个8位无符号整数,其值域为0至255,而Kafka Connect的int16
类型能更好地表示这一范围。
对于DB2特有的DECFLOAT
类型,Db2ValueConverters
提供了专门的处理逻辑。它会根据配置的DecimalMode
来决定如何构建Schema,是使用精确的VariableScaleDecimal
还是使用SpecialValueDecimal
。
2. 值转换
Db2ValueConverters
通过重写的converter
方法,为DB2的每种数据类型提供了一个值转换器。这些转换器负责将数据库中的原始值转换为Kafka Connect或Debezium可识别的格式。例如,对于DECFLOAT
类型,Db2ValueConverters
定义了convertDecfloat
方法,该方法处理SpecialValueDecimal
对象,根据配置的DecimalMode
进行必要的精度调整,最终生成符合要求的BigDecimal
对象。
3. 时间精度处理
对于时间相关的数据类型,Db2ValueConverters
通过重写getTimePrecision
方法来获取DB2列的时间精度,即列的刻度。这对于正确处理时间戳类型的精度至关重要,确保下游系统接收到的时间数据与数据库中的原始数据保持一致。
4. 特殊值处理
DB2数据库中可能存在一些特殊的数值,如无穷大或非数字值(NaN)。Db2ValueConverters
通过convertDecfloat
方法中的逻辑,确保这些特殊值能够被正确识别和转换,避免数据转换过程中的异常或不一致。
5. 配置灵活性
Db2ValueConverters
提供了两个构造函数,其中一个接受DecimalMode
和TemporalPrecisionMode
参数。这允许用户在创建Db2ValueConverters
实例时,根据具体需求定制数值和时间类型数据的处理方式,增加了框架的灵活性和适应性。
二、代码分析
package io.debezium.connector.db2;
import java.math.BigDecimal;
import java.sql.Types;
import java.time.ZoneOffset;
import org.apache.kafka.connect.data.Field;
import org.apache.kafka.connect.data.SchemaBuilder;
import io.debezium.data.SpecialValueDecimal;
import io.debezium.data.VariableScaleDecimal;
import io.debezium.jdbc.JdbcValueConverters;
import io.debezium.jdbc.JdbcValueConverters.DecimalMode;
import io.debezium.jdbc.TemporalPrecisionMode;
import io.debezium.relational.Column;
import io.debezium.relational.ValueConverter;
/**
* Conversion of DB2 specific datatypes.
*
* @author Jiri Pechanec, Peter Urbanetz
*
*/
// 转换DB2数据库特有的数据类型。
public class Db2ValueConverters extends JdbcValueConverters {
public Db2ValueConverters() {
// 默认构造函数,调用父类默认构造函数。
}
// 构造函数,允许用户在创建Db2ValueConverters实例时指定DecimalMode和TemporalPrecisionMode。
public Db2ValueConverters(DecimalMode decimalMode, TemporalPrecisionMode temporalPrecisionMode) {
super(decimalMode, temporalPrecisionMode, ZoneOffset.UTC, null, null, null);
// 调用父类构造函数,设置默认时区为UTC,以及其他默认配置。
}
@Override
public SchemaBuilder schemaBuilder(Column column) {
// 重写schemaBuilder方法,用于构建Kafka Connect的Schema。
switch (column.jdbcType()) {
// 如果是TINYINT类型,由于其值域为0-255,故使用int16类型表示。
case Types.TINYINT:
return SchemaBuilder.int16();
// 如果是OTHER类型且类型名为DECFLOAT,调用decfloatSchema方法构建Schema。
case Types.OTHER:
if (matches(column.typeName().toUpperCase(), "DECFLOAT")) {
return decfloatSchema(column);
}
// 其他类型,调用父类的schemaBuilder方法。
default:
return super.schemaBuilder(column);
}
}
@Override
public ValueConverter converter(Column column, Field fieldDefn) {
// 重写converter方法,用于创建值转换器。
switch (column.jdbcType()) {
// TINYINT类型,创建转换器,将TINYINT转换为short类型。
case Types.TINYINT:
return (data) -> convertSmallInt(column, fieldDefn, data);
// OTHER类型且类型名为DECFLOAT,创建转换器,调用convertDecfloat方法进行转换。
case Types.OTHER:
if (matches(column.typeName().toUpperCase(), "DECFLOAT")) {
return (data) -> convertDecfloat(column, fieldDefn, data, decimalMode);
}
// 其他类型,调用父类的converter方法。
default:
return super.converter(column, fieldDefn);
}
}
protected Object convertDecfloat(Column column, Field fieldDefn, Object data, DecimalMode mode) {
// 处理DECFLOAT类型数据的转换。
SpecialValueDecimal value;
BigDecimal newDecimal;
// 如果data已经是SpecialValueDecimal类型,直接使用。
if (data instanceof SpecialValueDecimal) {
value = (SpecialValueDecimal) data;
if (value.getDecimalValue().isEmpty()) {
// 如果DecimalValue为空,创建一个新的SpecialValueDecimal。
return SpecialValueDecimal.fromLogical(value, mode, column.name());
}
} else {
// 否则,尝试将data转换为BigDecimal。
final Object o = toBigDecimal(column, fieldDefn, data);
if (!(o instanceof BigDecimal)) {
// 如果转换失败,直接返回原数据。
return o;
}
value = new SpecialValueDecimal((BigDecimal) o);
}
// 调整BigDecimal的精度。
newDecimal = withScaleAdjustedIfNeeded(column, value.getDecimalValue().get());
if (mode == DecimalMode.PRECISE) {
// 如果DecimalMode为PRECISE,进一步处理BigDecimal的精度。
newDecimal = newDecimal.stripTrailingZeros();
if (newDecimal.scale() < 0) {
newDecimal = newDecimal.setScale(0);
}
// 创建VariableScaleDecimal或SpecialValueDecimal。
return VariableScaleDecimal.fromLogical(fieldDefn.schema(), new SpecialValueDecimal(newDecimal));
}
// 创建SpecialValueDecimal。
return SpecialValueDecimal.fromLogical(new SpecialValueDecimal(newDecimal), mode, column.name());
}
// 返回DB2列的时间精度,默认为7。
@Override
protected int getTimePrecision(Column column) {
return column.scale().get();
}
// 空实现,用于转换带时区的时间戳,但在这个类中没有具体实现。
protected Object convertTimestampWithZone(Column column, Field fieldDefn, Object data) {
return super.convertTimestampWithZone(column, fieldDefn, data);
}
// 检查列的类型名称是否完全匹配或以指定前缀开始。
protected static boolean matches(String upperCaseTypeName, String upperCaseMatch) {
if (upperCaseTypeName == null) {
return false;
}
return upperCaseMatch.equals(upperCaseTypeName) || upperCaseTypeName.startsWith(upperCaseMatch + "(");
}
// 为DECFLOAT类型构建Schema。
private SchemaBuilder decfloatSchema(Column column) {
if (decimalMode == DecimalMode.PRECISE) {
return VariableScaleDecimal.builder();
}
return SpecialValueDecimal.builder(decimalMode, column.length(), column.scale().orElse(0));
}
}
总结
提示:Db2ValueConverters
类在Debezium框架中承担着桥梁的角色,它不仅负责将DB2数据库中的数据类型映射到Kafka Connect和Debezium支持的数据类型,还提供了详细的转换逻辑,确保数据在转换过程中不会丢失精度或产生错误。通过精心设计的数据类型映射和转换规则,Db2ValueConverters
保证了从DB2数据库捕获的变更事件能够被下游系统准确无误地理解和消费。