DataX Learning Notes

Published: 2024-06-28

DataX 3.0 Overview

DataX is an offline synchronization tool for heterogeneous data sources. It provides stable and efficient data synchronization between a wide range of sources, including relational databases (MySQL, Oracle, etc.), HDFS, Hive, ODPS, HBase, and FTP. See the official repository for details (https://github.com/alibaba/DataX).

Design philosophy:

To solve the problem of synchronizing heterogeneous data sources, DataX turns the complex mesh of point-to-point sync links into a star topology, with DataX acting as the central transport hub that connects all data sources. When a new data source needs to be integrated, it only has to be connected to DataX once, and it can then synchronize seamlessly with every data source that is already supported.

DataX Framework Design

[Figure: DataX framework design - Reader, Framework, Writer]

  • Reader: the data collection module; it reads data from the source and hands it to the Framework.
  • Writer: the data writing module; it continuously pulls data from the Framework and writes it to the destination.
  • Framework: connects the Reader and the Writer, serves as the data transfer channel between them, and handles core concerns such as buffering, flow control, concurrency, and data type conversion.

DataX Plugin Ecosystem

[Figures: lists of supported reader and writer plugins]

DataX Core Architecture

[Figure: DataX core architecture]

Six Core Strengths of DataX

Reliable data quality monitoring

Rich data conversion features

Precise speed control

Strong synchronization performance

Robust fault tolerance

Extremely simple user experience

DataX vs. Sqoop

Feature          | DataX                                                    | Sqoop
Execution mode   | Single process, multi-threaded                           | MapReduce
Distributed      | Not supported; can be worked around with a scheduler     | Supported
Flow control     | Built in                                                 | Needs customization
Statistics       | Some built-in statistics; reporting needs customization  | None; collecting stats from a distributed job is inconvenient
Data validation  | Validation available in the core module                  | None; collecting results from a distributed job is inconvenient
Monitoring       | Needs customization                                      | Needs customization

DataX Deployment

1. Download the DataX installation package (tar.gz)

Download address: (https://github.com/alibaba/DataX?tab=readme-ov-file)

2. Upload the package to the server, extract it, and configure environment variables

tar -zxvf datax.tar.gz
# add environment variables (e.g., in /etc/profile)
DATAX_HOME=/usr/local/soft/datax
export PATH=$DATAX_HOME/bin:$PATH
# apply the changes
source /etc/profile

3. Self-check

# make datax.py in the bin directory executable
chmod +x datax.py
# verify the installation by running the bundled sample job
python ./bin/datax.py ./job/job.json
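
Because the bin directory is now on the PATH, the same self-check can also be launched from any directory. A minimal sketch, assuming DataX was extracted to /usr/local/soft/datax as above:

# datax.py is on the PATH, so the bundled sample job can be run directly
datax.py /usr/local/soft/datax/job/job.json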

Using DataX

Create a datax_jsons directory under the bigdata30 directory to hold the job scripts:

mkdir datax_jsons

A small example: create stream2stream.json

{
    "job": {
        "setting": {
            "speed": {
                "channel":1
            },
            "errorLimit": {
                "record": 0,
                "percentage": 0.02
            }
        },
        "content": [
            {
                "reader": {
                    "name": "streamreader",
                    "parameter": {
                        "column" : [
                            {
                                "value": "我是",
                                "type": "string"
                            },
                            {
                                "value": "大",
                                "type": "string"
                            },
                            {
                                "value": "帅",
                                "type": "string"
                            },
                            {
                                "value": "哥",
                                "type": "string"
                            },
                            {
                                "value": "哈哈哈",
                                "type": "string"
                            }
                        ],
                        "sliceRecordCount": 10 // number of times the record is printed
                    }
                },
                "writer": {
                    "name": "streamwriter",
                    "parameter": {
                        "print": true,   // whether to print the records to stdout
                        "encoding": "UTF-8"
                    }
                }
            }
        ]
    }
}

Run: datax.py stream2stream.json

Result:

[Screenshot: streamwriter output]

DataX Job Configuration File Format

Steps for writing the job JSON file:

1. Generate a template for the reader and writer you have chosen, using the command below.

2. Look up each parameter on GitHub to see how it is written, in particular whether it is required and what effect it has.

If you are not sure how to write the template, check the official documentation, or use the following command to print a DataX job template.

datax.py -r mysqlreader -w hdfswriter   # using MySQL --> HDFS as the example

The generated template:

How to read it: the outermost element of the JSON is a job, which contains two parts, setting and content. setting configures the job as a whole, while content configures the data source and the destination.

{
    "job": {
        "content": [
            {
                "reader": {
                    "name": "mysqlreader", 
                    "parameter": {
                        "column": [], 
                        "connection": [
                            {
                                "jdbcUrl": [], 
                                "table": []
                            }
                        ], 
                        "password": "", 
                        "username": "", 
                        "where": ""
                    }
                }, 
                "writer": {
                    "name": "hdfswriter", 
                    "parameter": {
                        "column": [], 
                        "compress": "", 
                        "defaultFS": "", 
                        "fieldDelimiter": "", 
                        "fileName": "", 
                        "fileType": "", 
                        "path": "", 
                        "writeMode": ""
                    }
                }
            }
        ], 
        "setting": {
            "speed": {
                "channel": ""
            }
        }
    }
}

You can then adapt it based on the samples given in the official documentation:

[Screenshot: sample configuration from the official documentation]

The parameter descriptions also show which parameters are required, their default values, and other details:

[Screenshot: parameter description table from the official documentation]

Type Conversion

[Figure: mysqlreader type conversion table]

mysql2hdfs

Adapt the template:

{
    "job": {
        "content": [
            {
                "reader": {
                    "name": "mysqlreader", 
                    "parameter": {
                        "column": [
                            "CId",
                            "Cname",
                            "Tid"
                        ], 
                        "connection": [
                            {
                                "jdbcUrl": ["jdbc:mysql://master:3306/Y1?useUnicode=true&characterEncoding=UTF-8&useSSL=false"], 
                                "table": ["Course"]
                            }
                        ], 
                        "password": "123456", 
                        "username": "root" 
                        // "where" is optional, so it is omitted here
                    }
                }, 
                "writer": {
                    "name": "hdfswriter", 
                    "parameter": {
                        "column": [
                            {
                                "name": "CId",
                                "type": "String"
                            },
                            {
                                "name": "Cname",
                                "type": "String"
                            },
                            {
                                "name": "Tid",
                                "type": "String"
                            }
                        ],  
                        //"compress": "NONE",      // compression; optional, so it is omitted here
                        "defaultFS": "hdfs://master:9000",
                        "fieldDelimiter": ",", // field delimiter
                        "fileName": "Course",
                        "fileType": "text",   // file type
                        "path": "/bigdata30/dataxout1",
                        "writeMode": "append"   // how hdfswriter handles existing data in the path before writing
                    }
                }
            }
        ], 
        "setting": {
            "speed": {
                "channel": "1"
            }
        }
    }
}

Running the job fails with a "path does not exist" error, which means the target directory must be created in advance:

hdfs dfs -mkdir /bigdata30/dataxout1

Run:

datax.py mysql2hdfs.json

Success; the output file now appears in HDFS:

[Screenshot: output file under /bigdata30/dataxout1]

View the file contents:

[Screenshot: contents of the output file]
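
For reference, the same check can be done from the command line. A sketch, using the path and fileName prefix from the job above (hdfswriter appends a random suffix to the file name):

# list the file DataX wrote and print its contents
hdfs dfs -ls /bigdata30/dataxout1
hdfs dfs -cat /bigdata30/dataxout1/Course*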

mysql2mysql

Scenario: synchronize the Course table from the Y1 database into the datax1 database.

Generate the MySQL-to-MySQL sync template:

datax.py -r mysqlreader -w mysqlwriter

Check the mysqlwriter documentation on the official site and write your own job file.

Create a Course table in the datax1 database:

create table if not exists datax1.Course(
    CId varchar(10),
    Cname nvarchar(10),
    TId varchar(10)
);

Then write the job file mysql2mysql.json:
{
    "job": {
        "content": [
            {
                "reader": {
                    "name": "mysqlreader", 
                    "parameter": {
                        "column": [
                            "CId",
                            "Cname",
                            "Tid"
                        ], 
                        "connection": [
                            {
                                "jdbcUrl": ["jdbc:mysql://master:3306/Y1?useUnicode=true&characterEncoding=UTF-8&useSSL=false"], 
                                "table": ["Course"]
                            }
                        ], 
                        "password": "123456", 
                        "username": "root" 
                    }
                }, 
                "writer": {
                    "name": "mysqlwriter", 
                    "parameter": {
                        "column": [
                            "CId",
                            "Cname",
                            "Tid"
                        ], 
                        "connection": [
                            {
                                "jdbcUrl": "jdbc:mysql://master:3306/datax1?useUnicode=true&characterEncoding=UTF-8&useSSL=false", 
                                "table": ["Course"]
                            }
                        ], 
                        "password": "123456", 
                        "username": "root", 
                        "writeMode": "insert"
                    }
                }
            }
        ], 
        "setting": {
            "speed": {
                "channel": "1"
            }
        }
    }
}

Run:

datax.py mysql2mysql.json 

Result: the Course table in the datax1 database now contains the data:

[Screenshot: rows in datax1.Course]
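
A quick check from the MySQL command line (a sketch, reusing the host and credentials from the job file):

mysql -h master -uroot -p123456 -e "select * from datax1.Course;"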

mysql2hive

Scenario:

Synchronize the Student table in MySQL into the Students table in Hive.

View the template:

datax.py -r mysqlreader -w hdfswriter

Create the table in Hive:

create table bigdata30.Students(
    Sid STRING,
    Sname STRING,
    Sage DATE,
    Ssex STRING
)
ROW FORMAT DELIMITED FIELDS TERMINATED BY ',';

Write the job file mysql2hive.json:

{
    "job": {
        "content": [
            {
                "reader": {
                    "name": "mysqlreader", 
                    "parameter": {
                        "column": [
                            "SId",
                            "Sname",
                            "Sage",
                            "Ssex"
                
                        ], 
                        "connection": [
                            {
                                "jdbcUrl": ["jdbc:mysql://master:3306/Y1?useUnicode=true&characterEncoding=UTF-8&useSSL=false"], 
                                "table": ["Student"]
                            }
                        ], 
                        "password": "123456", 
                        "username": "root" 
                    }
                }, 
                "writer": {
                    "name": "hdfswriter", 
                    "parameter": {
                        "column": [
                            {
                                "name": "SId",
                                "type": "string"
                            },
                            {
                                "name": "Sname",
                                "type": "string"
                            },
                            {
                                "name": "Sage",
                                "type": "Date"
                            },
                            {
                                "name": "Ssex",
                                "type": "string"
                            }
                        ], 
                        "defaultFS": "hdfs://master:9000", 
                        "fieldDelimiter": ",",
                        "fileName": "students", 
                        "fileType": "text",  
                        "path": "/user/hive/warehouse/bigdata30.db/students", 
                        "writeMode": "append"  
                    }
                }
            }
        ], 
        "setting": {
            "speed": {
                "channel": "1"
            }
        }
    }
}

Run:

datax.py mysql2hive.json

The data now appears in the students table in Hive:

[Screenshot: rows in bigdata30.students]
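
Because the files land directly in the table's warehouse directory, Hive sees them immediately. A quick check (sketch):

hive -e "select * from bigdata30.students limit 10;"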

hdfs2mysql (equivalent to hive2mysql)

Scenario:

Load the data from the t_movie1 table in Hive into MySQL.

View the template:

datax.py -r hdfsreader -w mysqlwriter

Write the job file based on the official documentation.

DataX internal type | Hive data types
Long                | TINYINT, SMALLINT, INT, BIGINT
Double              | FLOAT, DOUBLE
String              | STRING, CHAR, VARCHAR, STRUCT, MAP, ARRAY, UNION, BINARY
Boolean             | BOOLEAN
Date                | DATE, TIMESTAMP

First create an output table in MySQL:

create table if not exists datax1.datax_out1(
    id bigint,
    name varchar(100),
    types varchar(100),
    shengfen varchar(10)
)CHARSET = utf8 COLLATE utf8_general_ci;

Edit the job file:

{
    "job": {
        "content": [
            {
                "reader": {
                    "name": "hdfsreader", 
                    "parameter": {
                        "column": [
                            {
                                "index": 0,
                                "type": "long"
                               },
                               {
                                "index": 1,
                                "type": "string"
                               },                           
                               {
                                "index": 2,
                                "type": "string"
                               },                           
                               {
                                "type": "string",    // a constant value filled in for every row
                                "value":"学生"
                               }
                        ], 
                        "defaultFS": "hdfs://master:9000", 
                        "encoding": "UTF-8", 
                        "fieldDelimiter": ",", 
                        "fileType": "text", 
                        "path": "/user/hive/warehouse/bigdata30.db/t_movie1/*"
                    }
                }, 
                "writer": {
                    "name": "mysqlwriter", 
                    "parameter": {
                        "column": [
                            "id",
                            "name",
                            "types",
                            "shengfen"
                        ], 
                        "connection": [
                            {
                                "jdbcUrl": "jdbc:mysql://master:3306/datax1?useUnicode=true&characterEncoding=UTF-8&useSSL=false", 
                                "table": ["datax_out1"]
                            }
                        ], 
                        "password": "123456", 
                        "username": "root", 
                        "writeMode": "insert"
                    }
                }
            }
        ], 
        "setting": {
            "speed": {
                "channel": "1"
            }
        }
    }
}

Run the job; the datax_out1 table in MySQL now contains the data:

[Screenshot: rows in datax1.datax_out1]
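
For reference, the run and a quick check could look like this (a sketch, assuming the job file above was saved as hdfs2mysql.json):

datax.py hdfs2mysql.json
mysql -h master -uroot -p123456 -e "select * from datax1.datax_out1 limit 10;"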

mysql2hbase

Scenario:

Write the data in the datax_out1 table in MySQL into the datax_tb1 table in HBase.

View the template:

datax.py -r mysqlreader -w hbase20xsqlwriter   # the HBase version here is 2.2.7

HBase20xsqlwriter provides bulk import into HBase SQL tables (Phoenix). Because Phoenix encodes the rowkey, writing directly through the HBase API would require manual data conversion, which is tedious and error-prone, so this plugin writes data into Phoenix tables via SQL instead.

  • It only supports importing data through the Phoenix QueryServer, so the Phoenix QueryServer service must be running before this plugin can be used.

We therefore do not use that plugin here; instead we use hbase11xwriter and pick out its required parameters from the official documentation.

Official documentation: hbase11xwriter/doc/hbase11xwriter.md
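
To see which HBase writer plugins actually ship with your DataX package (a quick sketch, assuming DATAX_HOME is set as in the deployment section):

ls $DATAX_HOME/plugin/writer | grep -i hbase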

Create the table in HBase:

create 'datax_tb1','info'

Write the job file mysql2hbase.json:

{
    "job": {
        "content": [
            {
                "reader": {
                    "name": "mysqlreader", 
                    "parameter": {
                        "column": [
                            "id",
                            "name",
                            "types",
                            "shengfen"
                
                        ], 
                        "connection": [
                            {
                                "jdbcUrl": ["jdbc:mysql://master:3306/datax1?useUnicode=true&characterEncoding=UTF-8&useSSL=false"], 
                                "table": ["datax_out1"]
                            }
                        ], 
                        "password": "123456", 
                        "username": "root" 
                    }
                },
                "writer": {
                    "name": "hbase11xwriter",
                    "parameter": {
                      "hbaseConfig": {
                        "hbase.zookeeper.quorum": "master:2181,node1:2181,node2:2181"
                      },
                      "table": "datax_tb1",
                      "mode": "normal",
                      "rowkeyColumn": [
                          {
                            "index":0,
                            "type":"long"
                          }
                      ],
                      "column": [
                        {
                          "index":1,
                          "name": "info:name",
                          "type": "string"
                        },
                        {
                          "index":2,
                          "name": "info:types",
                          "type": "string"
                        },
                        {
                          "index":3,
                          "name": "info:shengfen",
                          "type": "string"
                        }
                      ],
                      "encoding": "utf-8"
                    }
                }
            }
        ], 
        "setting": {
            "speed": {
                "channel": "1"
            }
        }
    }
}

Run the job, then scan the datax_tb1 table:

[Screenshot: scan of datax_tb1 in the HBase shell]
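
The corresponding commands might look like this (a sketch; the scan is issued through the HBase shell):

datax.py mysql2hbase.json
echo "scan 'datax_tb1'" | hbase shell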

hbase2mysql

Scenario:

Write the data in the datax_tb1 table in HBase into the datax_out2 table in MySQL.

View the template:

datax.py -r hbase11xreader -w mysqlwriter

Official documentation: hbase11xreader/doc/hbase11xreader.md

First create the table in MySQL:

create table if not exists datax1.datax_out2(
    id bigint,
    name varchar(100),
    types varchar(100),
    shengfen varchar(10)
)CHARSET = utf8 COLLATE utf8_general_ci;

Write the job file hbase2mysql.json:

{
    "job": {
        "content": [
            {
                "reader": {
                    "name": "hbase11xreader", 
                    "parameter": {
                        "column": [
                            {
                                "name": "rowkey",
                                "type": "long"
                            },
                            {
                                "name": "info:name",
                                "type": "string"
                            },
                            {
                                "name": "info:types",
                                "type": "string"
                            },
                            {
                                "name": "info:shengfen",
                                "type": "string"
                            }
                        ], 
                        "encoding": "utf-8", 
                        "hbaseConfig": {"hbase.zookeeper.quorum": "master:2181,node1:2181,node2:2181"}, 
                        "mode": "normal",                       
                        "table": "datax_tb1"
                    }
                }, 
                "writer": {
                    "name": "mysqlwriter", 
                    "parameter": {
                        "column": [
                            "Id",
                            "name",
                            "types",
                            "shengfen"
                        ], 
                        "connection": [
                            {
                                "jdbcUrl": "jdbc:mysql://master:3306/datax1?useUnicode=true&characterEncoding=UTF-8&useSSL=false", 
                                "table": ["datax_out2"]
                            }
                        ], 
                        "password": "123456", 
                        "username": "root", 
                        "writeMode": "insert"
                    }
                }
            }
        ], 
        "setting": {
            "speed": {
                "channel": "1"
            }
        }
    }
}

Run datax.py hbase2mysql.json; the datax_out2 table in MySQL now contains the data:

[Screenshot: rows in datax1.datax_out2]

Incremental sync from MySQL to Hive

{
    "job": {
        "content": [
            {
                "reader": {
                    "name": "mysqlreader",
                    "parameter": {
                        "username": "root",
                        "password": "123456",
                        "column": [
                            "id",
                            "name",
                            "age",
                            "gender",
                            "clazz",
                            "last_mod"
                        ],
                        "splitPk": "age",
                        "where": "id > 9",   // filter condition for the incremental load
                        "connection": [
                            {
                                "table": [
                                    "student"
                                ],
                                "jdbcUrl": [
                                    "jdbc:mysql://master:3306/datax1"
                                ]
                            }
                        ]
                    }
                },
                "writer": {
                    "name": "hdfswriter",
                    "parameter": {
                        "defaultFS": "hdfs://master01:9000",
                        "fileType": "text",
                        "path": "/user/hive/warehouse/bigdata30.db/dataxstudent",
                        "fileName": "student",
                        "column": [
                            {
                                "name": "id",
                                "type": "bigint"
                            },
                            {
                                "name": "name",
                                "type": "string"
                            },
                            {
                                "name": "age",
                                "type": "INT"
                            },
                            {
                                "name": "gender",
                                "type": "string"
                            },
                            {
                                "name": "clazz",
                                "type": "string"
                            },
                            {
                                "name": "last_mod",
                                "type": "string"
                            }
                        ],
                        "writeMode": "append",   // append = incremental load; truncate = clear existing data first (full reload)
                        "fieldDelimiter": ","
                    }
                }
            }
        ],
        "setting": {
            "speed": {
                "channel": 6
            }
        }
    }
}
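
The where condition above is hard-coded. For a real incremental job the boundary is usually parameterized and passed in at run time. A minimal sketch, assuming datax.py's -p option for variable substitution and a ${last_id} placeholder in the JSON (the file name mysql2hive_inc.json and the -p behavior are assumptions to verify against your DataX version):

# in the job file:  "where": "id > ${last_id}"
# pass the boundary value when launching the job
datax.py mysql2hive_inc.json -p "-Dlast_id=9"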