DB2-Db2ValueConverters

发布于:2024-08-01 ⋅ 阅读:(166) ⋅ 点赞:(0)

提示:Db2ValueConverters 类是Debezium框架的一部分,用于处理IBM DB2数据库的数据类型转换。


前言

提示:Db2ValueConverters 的主要职责是在DB2数据库特有的数据类型和Debezium或Kafka Connect所使用的数据类型之间进行转换


提示:以下是本篇文章正文内容

一、核心功能

核心功能详细说明

1. 数据类型映射与Schema构建

Db2ValueConverters 通过重写父类JdbcValueConverters中的schemaBuilder方法,为DB2数据库中的特定数据类型构建Kafka Connect的Schema。例如,对于DB2的TINYINT类型,它会被映射为Kafka Connect的int16类型,这是因为DB2的TINYINT实际上是一个8位无符号整数,其值域为0至255,而Kafka Connect的int16类型能更好地表示这一范围。

对于DB2特有的DECFLOAT类型,Db2ValueConverters提供了专门的处理逻辑。它会根据配置的DecimalMode来决定如何构建Schema,是使用精确的VariableScaleDecimal还是使用SpecialValueDecimal

2. 值转换

Db2ValueConverters通过重写的converter方法,为DB2的每种数据类型提供了一个值转换器。这些转换器负责将数据库中的原始值转换为Kafka Connect或Debezium可识别的格式。例如,对于DECFLOAT类型,Db2ValueConverters定义了convertDecfloat方法,该方法处理SpecialValueDecimal对象,根据配置的DecimalMode进行必要的精度调整,最终生成符合要求的BigDecimal对象。

3. 时间精度处理

对于时间相关的数据类型,Db2ValueConverters通过重写getTimePrecision方法来获取DB2列的时间精度,即列的刻度。这对于正确处理时间戳类型的精度至关重要,确保下游系统接收到的时间数据与数据库中的原始数据保持一致。

4. 特殊值处理

DB2数据库中可能存在一些特殊的数值,如无穷大或非数字值(NaN)。Db2ValueConverters通过convertDecfloat方法中的逻辑,确保这些特殊值能够被正确识别和转换,避免数据转换过程中的异常或不一致。

5. 配置灵活性

Db2ValueConverters提供了两个构造函数,其中一个接受DecimalModeTemporalPrecisionMode参数。这允许用户在创建Db2ValueConverters实例时,根据具体需求定制数值和时间类型数据的处理方式,增加了框架的灵活性和适应性。

二、代码分析

package io.debezium.connector.db2;

import java.math.BigDecimal;
import java.sql.Types;
import java.time.ZoneOffset;

import org.apache.kafka.connect.data.Field;
import org.apache.kafka.connect.data.SchemaBuilder;

import io.debezium.data.SpecialValueDecimal;
import io.debezium.data.VariableScaleDecimal;
import io.debezium.jdbc.JdbcValueConverters;
import io.debezium.jdbc.JdbcValueConverters.DecimalMode;
import io.debezium.jdbc.TemporalPrecisionMode;
import io.debezium.relational.Column;
import io.debezium.relational.ValueConverter;

/**
 * Conversion of DB2 specific datatypes.
 * 
 * @author Jiri Pechanec, Peter Urbanetz
 *
 */
// 转换DB2数据库特有的数据类型。
public class Db2ValueConverters extends JdbcValueConverters {

    public Db2ValueConverters() {
        // 默认构造函数,调用父类默认构造函数。
    }

    // 构造函数,允许用户在创建Db2ValueConverters实例时指定DecimalMode和TemporalPrecisionMode。
    public Db2ValueConverters(DecimalMode decimalMode, TemporalPrecisionMode temporalPrecisionMode) {
        super(decimalMode, temporalPrecisionMode, ZoneOffset.UTC, null, null, null);
        // 调用父类构造函数,设置默认时区为UTC,以及其他默认配置。
    }

    @Override
    public SchemaBuilder schemaBuilder(Column column) {
        // 重写schemaBuilder方法,用于构建Kafka Connect的Schema。
        switch (column.jdbcType()) {
            // 如果是TINYINT类型,由于其值域为0-255,故使用int16类型表示。
            case Types.TINYINT:
                return SchemaBuilder.int16();
            // 如果是OTHER类型且类型名为DECFLOAT,调用decfloatSchema方法构建Schema。
            case Types.OTHER:
                if (matches(column.typeName().toUpperCase(), "DECFLOAT")) {
                    return decfloatSchema(column);
                }
            // 其他类型,调用父类的schemaBuilder方法。
            default:
                return super.schemaBuilder(column);
        }
    }

    @Override
    public ValueConverter converter(Column column, Field fieldDefn) {
        // 重写converter方法,用于创建值转换器。
        switch (column.jdbcType()) {
            // TINYINT类型,创建转换器,将TINYINT转换为short类型。
            case Types.TINYINT:
                return (data) -> convertSmallInt(column, fieldDefn, data);
            // OTHER类型且类型名为DECFLOAT,创建转换器,调用convertDecfloat方法进行转换。
            case Types.OTHER:
                if (matches(column.typeName().toUpperCase(), "DECFLOAT")) {
                    return (data) -> convertDecfloat(column, fieldDefn, data, decimalMode);
                }
            // 其他类型,调用父类的converter方法。
            default:
                return super.converter(column, fieldDefn);
        }
    }

    protected Object convertDecfloat(Column column, Field fieldDefn, Object data, DecimalMode mode) {
        // 处理DECFLOAT类型数据的转换。
        SpecialValueDecimal value;
        BigDecimal newDecimal;

        // 如果data已经是SpecialValueDecimal类型,直接使用。
        if (data instanceof SpecialValueDecimal) {
            value = (SpecialValueDecimal) data;

            if (value.getDecimalValue().isEmpty()) {
                // 如果DecimalValue为空,创建一个新的SpecialValueDecimal。
                return SpecialValueDecimal.fromLogical(value, mode, column.name());
            }
        } else {
            // 否则,尝试将data转换为BigDecimal。
            final Object o = toBigDecimal(column, fieldDefn, data);

            if (!(o instanceof BigDecimal)) {
                // 如果转换失败,直接返回原数据。
                return o;
            }
            value = new SpecialValueDecimal((BigDecimal) o);
        }

        // 调整BigDecimal的精度。
        newDecimal = withScaleAdjustedIfNeeded(column, value.getDecimalValue().get());

        if (mode == DecimalMode.PRECISE) {
            // 如果DecimalMode为PRECISE,进一步处理BigDecimal的精度。
            newDecimal = newDecimal.stripTrailingZeros();
            if (newDecimal.scale() < 0) {
                newDecimal = newDecimal.setScale(0);
            }

            // 创建VariableScaleDecimal或SpecialValueDecimal。
            return VariableScaleDecimal.fromLogical(fieldDefn.schema(), new SpecialValueDecimal(newDecimal));
        }

        // 创建SpecialValueDecimal。
        return SpecialValueDecimal.fromLogical(new SpecialValueDecimal(newDecimal), mode, column.name());
    }

    // 返回DB2列的时间精度,默认为7。
    @Override
    protected int getTimePrecision(Column column) {
        return column.scale().get();
    }

    // 空实现,用于转换带时区的时间戳,但在这个类中没有具体实现。
    protected Object convertTimestampWithZone(Column column, Field fieldDefn, Object data) {
        return super.convertTimestampWithZone(column, fieldDefn, data);
    }

    // 检查列的类型名称是否完全匹配或以指定前缀开始。
    protected static boolean matches(String upperCaseTypeName, String upperCaseMatch) {
        if (upperCaseTypeName == null) {
            return false;
        }
        return upperCaseMatch.equals(upperCaseTypeName) || upperCaseTypeName.startsWith(upperCaseMatch + "(");
    }

    // 为DECFLOAT类型构建Schema。
    private SchemaBuilder decfloatSchema(Column column) {
        if (decimalMode == DecimalMode.PRECISE) {
            return VariableScaleDecimal.builder();
        }
        return SpecialValueDecimal.builder(decimalMode, column.length(), column.scale().orElse(0));
    }
}


总结

提示:Db2ValueConverters 类在Debezium框架中承担着桥梁的角色,它不仅负责将DB2数据库中的数据类型映射到Kafka Connect和Debezium支持的数据类型,还提供了详细的转换逻辑,确保数据在转换过程中不会丢失精度或产生错误。通过精心设计的数据类型映射和转换规则,Db2ValueConverters保证了从DB2数据库捕获的变更事件能够被下游系统准确无误地理解和消费。


网站公告

今日签到

点亮在社区的每一天
去签到