kettle——学习

发布于:2025-09-12 ⋅ 阅读:(23) ⋅ 点赞:(0)

一、官方文档

最新的 Pentaho 数据集成(又名 Kettle)文档

国产服务器(aarch64) Kettle 修改

文档

二、安装

因为是Java 开发的,所以安装方式都是通用的,解压应用包就可以了,不区分win和linux版本。win运行.bat,Linux 运行.sh。

注意: jdk版本。

报这个错误就是jdk版本太低
在这里插入图片描述

三、快速生成迁移脚本

简单迁移,适合表结构没有变动的同构数据库迁移。
在这里插入图片描述

四、动态获取表,然后迁移数据

参考B站这个视频实现。kettle数据整库迁移方案

五、循环

这种连接头尾相连的就是循环,一般搭配一个判断使用。如果不判断,会一直循环。
在这里插入图片描述
注意: kettle 本身就是分批获取数据的,不用分页查询也是可以的,可以一次性查询全部数据,就算数据量有一亿也不会有问题,内存不会溢出

(一)整体预览

在这里插入图片描述

(二)设置初始值

在这里插入图片描述

脚本后面的:ture; 是控制流程的,如果是ture;流向成功,如下:在这里插入图片描述
如果是false; 流向失败,如下:
在这里插入图片描述

(三)循环判断

在这里插入图片描述

(四)核心业务

在这里插入图片描述

(五)日志,可选

在这里插入图片描述

(六)变量控制

里面可以用一些JS函数,应该是通用的。
在这里插入图片描述

六、转换中JS脚本的使用

在这里插入图片描述
使用非常简单,需要用什么直接双击就能用了。如果不知道方法什么意思,怎么用,鼠标右键有示例
在这里插入图片描述
在这里插入图片描述

七、转换-过滤记录

Kettle 过滤记录

正则表达式匹配的一个例子:
匹配日期
在这里插入图片描述

八、Java 脚本使用

技巧:先把kettle里面常用的jar导入开发工具里面,方便查看里面的方法。不然会有很多方法你不知道怎么调用。
在这里插入图片描述

导入idea这种开发工具,然后在里面查看方法、类

在这里插入图片描述
在这里插入图片描述

在这里插入图片描述

  1. data 等价于 RowMeta;
  2. 获取输入Java 脚本组件的表数据和表头:表数据Object[] inputRow = getRow()。表头(表字段等信息):RowMetaInterface inputRowMeta = getInputRowMeta();
  3. 更改数据表结构和数据:克隆输入元数据作为输出元数据RowMetaInterface outputRowMeta = inputRowMeta.clone();
    改变putRow(outputRowMeta, outputRow);
  1. 新增字段的简单例子
import org.pentaho.di.core.row.ValueMetaInterface;
import org.pentaho.di.core.row.value.ValueMetaString;
import org.pentaho.di.core.row.RowMeta;
import org.pentaho.di.core.logging.LogChannel;
import cn.hutool.core.util.IdUtil;


private RowMetaInterface outputRowMeta; // 输出行元数据
private boolean isFirstRow = true; // 标记是否为第一行,用于初始化元数据
private String newFieldName = "ID"; // 新字段名
private int newFieldType = ValueMetaInterface.TYPE_STRING; // 新字段类型(字符串)
public boolean processRow(StepMetaInterface smi, StepDataInterface sdi) throws KettleException {
  // 获取输入行
    Object[] inputRow = getRow();
    if (inputRow == null) {
        setOutputDone();
        return false;
    }

    // 初始化元数据(仅第一次执行)
    if (isFirstRow) {
        isFirstRow = false;
        // 获取输入元数据
        RowMetaInterface inputRowMeta = getInputRowMeta();
        if (inputRowMeta == null) {
            logError("输入行元数据为空,无法添加新字段");
            return false;
        }
        
        // 克隆输入元数据作为输出元数据
        outputRowMeta = inputRowMeta.clone();
        
        // 定义新字段元数据
        ValueMetaInterface newFieldMeta = new ValueMetaString("ID");
        newFieldMeta.setLength(36);
        
        // 添加新字段到元数据
        outputRowMeta.addValueMeta(newFieldMeta);
        logBasic("成功添加新字段: " + newFieldName + ",输出字段总数: " + outputRowMeta.size()+",原字段总数:"+inputRowMeta.size());

    }

    // 构建输出行
    int inputRowLength = inputRow.length;
    Object[] outputRow = new Object[inputRowLength + 1];

    
    // 复制原始字段
    System.arraycopy(inputRow, 0, outputRow, 0, inputRowLength);

    int originalIdIndex = outputRowMeta.indexOfValue("ID");

    String uuid = IdUtil.getSnowflake(0, 0).nextIdStr();
	outputRow[originalIdIndex] = uuid; // 注意值的类型需与 newFieldType 匹配
   
    // 输出新行outputRowMeta表头,outputRow 数据
    putRow(outputRowMeta, outputRow);

	

    return true;
}
    
  1. 如果只需要简单遍历里面的字段,然后处理一下
    双击里面就有简单的示例
    在这里插入图片描述
import org.pentaho.di.core.row.ValueMetaInterface;
import org.pentaho.di.core.row.value.ValueMetaString;
import org.pentaho.di.core.row.RowMeta;
import org.pentaho.di.core.logging.LogChannel;
import cn.hutool.core.util.IdUtil;
import java.util.Date;
import org.pentaho.di.core.row.value.ValueMetaDate;

private RowMetaInterface outputRowMeta; // 输出行元数据
private boolean isFirstRow = true; // 标记是否为第一行,用于初始化元数据

public boolean processRow(StepMetaInterface smi, StepDataInterface sdi) throws KettleException {
  // 获取输入行
    Object[] inputRow = getRow();
    if (inputRow == null) {
        setOutputDone();
        return false;
    }

    // 初始化元数据(仅第一次执行)
    if (isFirstRow) {
        isFirstRow = false;
        // 获取输入元数据
        RowMetaInterface inputRowMeta = getInputRowMeta();
        if (inputRowMeta == null) {
            logError("输入行元数据为空,无法添加新字段");
            return false;
        }
        
        // 克隆输入元数据作为输出元数据
        outputRowMeta = inputRowMeta.clone();

        int originalIdIndex = outputRowMeta.indexOfValue("ID");
        int originalUuidIndex = outputRowMeta.indexOfValue("UUID");
        int originalCreateTimeIndex = outputRowMeta.indexOfValue("CREATE_TIME");
        int originalUpdateTimeIndex = outputRowMeta.indexOfValue("UPDATE_TIME");
        
		if(originalIdIndex==-1){
			// 定义新字段元数据
        	ValueMetaInterface ID = new ValueMetaString("ID");
        	ID.setLength(20);
        
        	// 添加新字段到元数据
        	outputRowMeta.addValueMeta(ID);
		}
		if(originalUuidIndex==-1){
			ValueMetaInterface UUID = new ValueMetaString("UUID");
        	UUID.setLength(20);
        
        	// 添加新字段到元数据
        	outputRowMeta.addValueMeta(UUID);
		}        
		if(originalCreateTimeIndex==-1){
			ValueMetaInterface createTime = new ValueMetaString("CREATE_TIME");
        	
        
        	// 添加新字段到元数据
        	outputRowMeta.addValueMeta(createTime);
		}        
		if(originalUpdateTimeIndex==-1){
			ValueMetaInterface updateTime = new ValueMetaDate("UPDATE_TIME");
        	
        
        	// 添加新字段到元数据
        	outputRowMeta.addValueMeta(updateTime);
		}        


    }
   // LogChannel.GENERAL.logBasic("字段个数:"+outputRowMeta.size());
	
    // 构建输出行
    int inputRowLength = outputRowMeta.size();
    Object[] outputRow = new Object[inputRowLength];

    
    // 复制原始字段
    System.arraycopy(inputRow, 0, outputRow, 0, inputRowLength);

        int originalIdIndex = outputRowMeta.indexOfValue("ID");
        if(originalIdIndex!=-1){
            Object fieldIdValue = outputRow[originalIdIndex];
            if (fieldIdValue == null) {
                String uuid = IdUtil.getSnowflake(0, 0).nextIdStr();
                outputRow[originalIdIndex] = uuid;
            }
        }

        int originalUuidIndex = outputRowMeta.indexOfValue("UUID");
        if(originalIdIndex!=-1){
            Object fieldUuidValue = outputRow[originalUuidIndex];
            if (fieldUuidValue == null) {
                String uuid = IdUtil.getSnowflake(0, 0).nextIdStr();
                outputRow[originalUuidIndex] = uuid;
            }
        }
        
        int originalCreateTimeIndex = outputRowMeta.indexOfValue("CREATE_TIME");
        if(originalIdIndex!=-1){
            Object fieldCreateTimeValue = outputRow[originalCreateTimeIndex];
            if (fieldCreateTimeValue == null) {
                outputRow[originalCreateTimeIndex] = new Date();
            }
        }
        
        int originalUpdateTimeIndex = outputRowMeta.indexOfValue("UPDATE_TIME");
        if(originalIdIndex!=-1){
            Object fieldUpdateTimeValue = outputRow[originalUpdateTimeIndex];
            if (fieldUpdateTimeValue == null) {
                outputRow[originalUpdateTimeIndex] = new Date();
            }
        }
   
    // 输出新行
    putRow(outputRowMeta, outputRow);

	

    return true;
}
    

网站公告

今日签到

点亮在社区的每一天
去签到