sqoop总结

发布于:2022-12-24 ⋅ 阅读:(1020) ⋅ 点赞:(0)

一、sqoop概述和安装

sqoop简介 

1. sqoop是apache基金会的一个顶级项目(已经退役,)
2. sqoop是一款可以将数据在RDBMS和Hadoop生态中导入导出的项目
3. sqoop最后一个版本是1.4.7

apache有一个专门管理退役软件的仓库。
退役原因: 
     1.  软件开发的不够好,弃用了。
     2.  没什么提升空间了,已经趋近于完美了,不再更新了,apache对这样的项目,如果发现一两年左右都没有更新时,就会开会决定移动到退役仓库中。

sqoop应用场景 

sqoop原理

1.  sqoop采用了MapReduce的N个MapTask 来进行导入导出数据。 并行执行,以便提高效率
2.  在导入时,sqoop会逐行读取RDBMS中的数据,并落地到相应的Hadoop生态中
3.  真正导入数据前,会根据指定的切分字段,先计算数据的范围,然后使用切分器将数据的值范围按照MapTask的数量平分,由MapTask开始导入数据。(注意:有可能某个MapTask要处理的数据范围是空的) 

sqoop安装 

步骤1) 上传、解压、更名、配置环境变量、生效、验证

[root@test01 ~]# tar -zxvf sqoop-1.4.7.bin__hadoop-2.6.0.tar.gz  -C /usr/local/
[root@test01 ~]# cd /usr/local/
[root@test01 local]# mv sqoop-1.4.7.bin__hadoop-2.6.0/ sqoop
[root@test01 local]# vim /etc/profile
[root@test01 local]# source /etc/profile
[root@test01 local]# sqoop-version

步骤2)配置sqoop的环境脚本文件

[root@test01 local]# cd sqoop/conf/
[root@test01 conf]# cp sqoop-env-template.sh sqoop-env.sh
[root@test01 conf]# vim sqoop-env.sh

...........省略.............
export HADOOP_COMMON_HOME=/usr/local/hadoop

#Set path to where hadoop-*-core.jar is available
export HADOOP_MAPRED_HOME=/usr/local/hadoop

#set the path to where bin/hbase is available
export HBASE_HOME=/usr/local/hbase

#Set the path to where bin/hive is available
export HIVE_HOME=/usr/local/hive

#Set the path for where zookeper config dir is
export ZOOCFGDIR=/usr/local/zookeeper/conf

步骤3)导入mysql驱动到sqoop的lib目录

1. 如果连接的mysql是5.7版本以下, 可以导入mysql-connector-java-5.1.xxx-bin.jar
2. 如果连接的mysql是8.0版本以上, 需要导入mysql-connector-java-8.0.xxx-bin.jar

二、常用指令介绍
2.1 查看常用指令 

[root@test01 lib]# sqoop help

Available commands:
  codegen            Generate code to interact with database records
  create-hive-table  Import a table definition into Hive
  eval               Evaluate a SQL statement and display the results
  export             Export an HDFS directory to a database table
  help               List available commands
  import             Import a table from a database to HDFS
  import-all-tables  Import tables from a database to HDFS
  import-mainframe   Import datasets from a mainframe server to HDFS
  job                Work with saved jobs
  list-databases     List available databases on a server
  list-tables        List available tables in a database
  merge              Merge results of incremental imports
  metastore          Run a standalone Sqoop metastore
  version            Display version information

2.2 指令帮助信息

[root@test01 ~]# sqoop help list-databases
Common arguments:
   --connect <jdbc-uri>                    用于指定连接jdbc的url
   --username <username>                   用户名
   --password <password>                   密码
   ................省略......................

2.3去官网查询

Sqoop User Guide (v1.4.7)

三、sqoop的基本用法(重点)

3.1 查询数据库和表

连接mysql,一定要远程授权

5.7版本:
grant all privileges on *.* to root@'%' identified by '远程密码' with grant option
5.8版本:
百度 

3.1.1 查看数据库:sqoop list-databases 

1) 连接windows上的mysql8.0的写法:
sqoop list-databases \
--connect jdbc:mysql://10.20.152.47:3306?serverTimezone=UTC \
--username root \
--password mmforu
2) 连接qianfeng03上的mysql5.7的写法

sqoop list-databases \
--connect jdbc:mysql://qianfeng03:3306 \
--username root \
--password @Mmforu45

注意:查看所有库,url只需要写到3306

3.1.2 查看表

sqoop list-tables \
--connect jdbc:mysql://test03:3306/hive \
--username root \
--password @Mmforu45

注意:要指定一个具体的database进行查看

3.2 Sqoop的Import

sqoop将数据从RDBMS中传输到Hadoop生态中,叫导入,即import

3.2.1 数据准备

create database sz2103 default character set utf8;
use sz2103;
-- emp表               有主键约束
DROP TABLE IF EXISTS `emp`;
CREATE TABLE `emp` (
`EMPNO` int(4) NOT NULL,
`ENAME` varchar(10),
`JOB` varchar(9),
`MGR` int(4),
`HIREDATE` date,
`SAL` int(7),
`COMM` int(7),
`DEPTNO` int(2),
PRIMARY KEY (`EMPNO`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;

INSERT INTO `emp` VALUES ('7200', 'SMITH', 'CLERK', '7902', '1980-12-17', '800', null, '20');
INSERT INTO `emp` VALUES ('7499', 'ALLEN', 'SALESMAN', '7698', '1981-02-20', '1600', '300', '30');
INSERT INTO `emp` VALUES ('7509', 'TOM', 'SALESMAN', '7698', '1983-02-20', '1300', '400', '20');
INSERT INTO `emp` VALUES ('7521', 'WARD', 'SALESMAN', '7698', '1981-02-22', '1250', '500', '30');
INSERT INTO `emp` VALUES ('7599', 'JONES', 'MANAGER', '7800', '1981-04-02', '2975', null, '20');
INSERT INTO `emp` VALUES ('7600', 'MARTIN', 'SALESMAN', '7698', '1981-09-28', '1250', '1400', '30');
INSERT INTO `emp` VALUES ('7698', 'BLAKE', 'MANAGER', '7800', '1981-05-01', '2850', null, '30');
INSERT INTO `emp` VALUES ('7782', 'CLARK', 'MANAGER', '7800', '1981-06-09', '2450', null, '10');
INSERT INTO `emp` VALUES ('7799', 'SCOTT', 'ANALYST', '7599', '1987-04-19', '3000', null, '20');
INSERT INTO `emp` VALUES ('7800', 'KING', 'PRESIDENT', null, '1981-11-17', '5000', null, '10');
INSERT INTO `emp` VALUES ('7844', 'TURNER', 'SALESMAN','7698', '1981-09-08', '1500', '0', '30');
INSERT INTO `emp` VALUES ('7876', 'ADAMS', 'CLERK', '7799', '1987-05-23', '1100', null, '20');
INSERT INTO `emp` VALUES ('7900', 'JAMES', 'CLERK', '7698', '1981-12-03', '950', null, '30');
INSERT INTO `emp` VALUES ('7902', 'FORD', 'ANALYST', '7599', '1981-12-03', '3000', null, '20');
INSERT INTO `emp` VALUES ('7934', 'MILLER', 'CLERK', '7782', '1982-01-23', '1300', null, '10');
INSERT INTO `emp` VALUES ('8000', 'superman', null, '7782', '1982-01-23', '1300', null, '10');
select * from emp;

-- 学生表  没有主键
DROP TABLE IF EXISTS `student`;
CREATE TABLE `student` (
  `s_id` varchar(20) NOT NULL,
  `s_name` varchar(20) NOT NULL DEFAULT '',
  `s_birth` varchar(20) NOT NULL DEFAULT '',
  `s_sex` varchar(10) NOT NULL DEFAULT ''
) ENGINE=InnoDB DEFAULT CHARSET=utf8;

INSERT INTO `student` VALUES 
('01','赵雷','1990-01-01','男'),
('02','钱电','1990-12-21','男'),
('03','孙风','1990-05-20','男'),
('04','李云','1990-08-06','男'),
('05','周梅','1991-12-01','女'),
('06','吴兰','1992-03-01','女'),
('07','郑竹','1989-07-01','女'),
('08','王菊','1990-01-20','女'),
('09','张飞','1990-9-25','男'),
('10','刘备','1990-01-25','男'),
('11','关羽','1990-01-25','男');
select * from student;

3.2.2 mysql->hdfs


案例1)导入有主键的表到hdfs
 

[root@test01 ~]# sqoop import \
--connect jdbc:mysql://test03:3306/sz2103 \
--username root \
--password @Mmforu45 \
--table emp \
--target-dir /sqoop/emp \
-m 3


注意:再次导入时,会报目录已经存在,可以使用--delete-target-dir

[root@test01 ~]# sqoop import \
--connect jdbc:mysql://test03:3306/sz2103 \
--username root \
--password @Mmforu45 \
--table emp \
--target-dir /sqoop/emp \
-m 3 \
--delete-target-dir

总结

1. 默认分隔符是逗号

2. 如何切分的?

           计算主键字段的最大和最小值,然后差值/MapTask的数量,然后平均

案例2)导入无主键的表到hdfs上 

[root@test01 ~]# sqoop import \
--connect jdbc:mysql://test03:3306/sz2103 \
--username root \
--password @Mmforu45 \
--table student \
--target-dir /sqoop/student \
-m 3 \
--delete-target-dir

以上内容会报错
解决方式如下:
sqoop import \
--connect jdbc:mysql://test03:3306/sz2103 \
--username root \
--password @Mmforu45 \
--table student \
--target-dir /sqoop/student \
-m 1 \
--delete-target-dir

参数:--delete-target-dir    用于删除已经存在的目录


第二种解决方式如下:
sqoop import \
--connect jdbc:mysql://test03:3306/sz2103 \
--username root \
--password @Mmforu45 \
--table student \
--target-dir /sqoop/student \
--split-by s_id \
--delete-target-dir

注意事项:如果表没有主建约束的字段

方式1: 指定--split-by参数

方式2: MapTask的数量只能是1

注意: 如果报以下错误,需要将提示中的属性和值添加到导入语句中

ERROR tool.ImportTool: Import failed: java.io.IOException: Generating splits for a textual index column allowed only in case of "-Dorg.apache.sqoop.splitter.allow_text_splitter=true" property passed as a parameter
sqoop import \
-Dorg.apache.sqoop.splitter.allow_text_splitter=true \
--connect jdbc:mysql://test03:3306/sz2103 \
--username root \
--password @Mmforu45 \
--table student \
--target-dir /sqoop/student \
--split-by s_id \
--delete-target-dir

 案例3)指定列分隔符进行导入

参数:–fields-terminated-by 用于指定列分隔符

[root@test01 ~]# sqoop import \
--connect jdbc:mysql://test03:3306/sz2103 \
--username root \
--password @Mmforu45 \
--table emp \
--target-dir /sqoop/emp \
--fields-terminated-by '\t' \
-m 3 \
--delete-target-dir

案例4)测试columns 

[root@test01 ~]# sqoop import \
--connect jdbc:mysql://test03:3306/sz2103 \
--username root \
--password @Mmforu45 \
--table emp \
--columns "empno,ename" \
--target-dir /sqoop/emp \
--fields-terminated-by '\t' \
-m 3 \
--delete-target-dir 

案例5)测试where

[root@test01 ~]# sqoop import \
--connect jdbc:mysql://test03:3306/sz2103 \
--username root \
--password @Mmforu45 \
--table emp \
--columns "empno,ename,deptno" \
--target-dir /sqoop/emp \
--fields-terminated-by '\t' \
-m 3 \
--delete-target-dir \
--where 'deptno = 10 or deptno = 30'

注意:

参数--where后的条件建议 加上引号

案例6)测试-e|–query 

[root@test01 ~]# sqoop import \
--connect jdbc:mysql://test03:3306/sz2103 \
--username root \
--password @Mmforu45 \
--query "select empno,mgr,deptno from emp where \$CONDITIONS and deptno = 10" \
--target-dir /sqoop/emp \
--fields-terminated-by '\t' \
--split-by 'empno' \
-m 3 \
--delete-target-dir \

注意事项:

1. 当使用--query|-e 时, 一定不能用--table 
2. 当使用--query|-e 时, 一定要使用--split-by
   注意:指定的字段必须是select子句中的某一个字段
3. 当使用--query|-e 时,  在from tablename 后一定要在where后带上$CONDITIONS关键词, 
   注意:如果是双引号,$前要添加转义字符
4. 当使用--query|-e 时, 参数where失效,所以没有必要指定
5. 当使用--query|-e 时, 参数columns的作用,是对select中的字段进行筛选。  但是,用的不多,没有必要在columns里进行筛选。

案例7)测试字符串类型的null处理和非字符串类型的null处理

参数:–null-string --null-non-string 这两个参数是指将mysql里的null值 导入到hdfs上以什么的样式显示。

[root@test01 ~]# sqoop import \
--connect jdbc:mysql://test03:3306/sz2103 \
--username root \
--password @Mmforu45 \
--query "select * from emp where \$CONDITIONS" \
--target-dir /sqoop/emp \
--fields-terminated-by '\t' \
--split-by 'empno' \
-m 3 \
--delete-target-dir \
--null-string '\\N' \
--null-non-string 'null' 

注意: 在hive表中,hive的null使用\N表示

3.2.3 mysql–>hive 

案例1)导入hive中 

[root@test01 ~]# sqoop import \
--connect jdbc:mysql://test03:3306/sz2103 \
--username root \
--password @Mmforu45 \
--query "select * from emp where \$CONDITIONS" \
--split-by 'empno' \
--target-dir /sqoop/emp \
--delete-target-dir \
--hive-import \
--hive-table 'sz2103.emp_1'

不需要提前在hive中创建表。

注意的问题如下:

1. sqoop导入hive的原理是:先导入到hdfs上 ,然后再使用load data指令上传到hive的表目录下
2. sqoop指令会在所在机器上临时运行hive客户端使用load进行加载数据,所以要注意客户端的问题,是本地还是远程。
3. 如果报以下错误,说明sqoop找不到hive的common包
      java.lang.ClassNotFoundException: org.apache.hadoop.hive.conf.HiveConf
    解决办法如下:
      1. 将hive的lib目录下的hive-common-2.xxxx.jar 拷贝到sqoop的lib目录

案例2)测试–hive-overwrite 

[root@test01 ~]# sqoop import \
--connect jdbc:mysql://test03:3306/sz2103 \
--username root \
--password @Mmforu45 \
--query "select * from emp where \$CONDITIONS" \
--split-by 'empno' \
--target-dir /sqoop/emp \
--delete-target-dir \
--hive-import \
--hive-overwrite \
--hive-table 'sz2103.emp_1'

案例3)指定分隔符

[root@test01 ~]# sqoop import \
--connect jdbc:mysql://test03:3306/sz2103 \
--username root \
--password @Mmforu45 \
--query "select * from emp where \$CONDITIONS" \
--split-by 'empno' \
--target-dir /sqoop/emp \
--delete-target-dir \
--hive-import \
--hive-overwrite \
--hive-table 'sz2103.emp_1' \
--fields-terminated-by '\t'

结论:

1. 添加--hive-overwrite会覆盖原有的hive的表数据,反之没有此参数,表示追加数据。
2. 必须要有--hive-import选项。
3. 从mysql中导入到hive中时,如果不指定分隔符,默认使用的是^A,
4. 可以使用--fields-terminated-by指定分隔符,  注意,该表要么不存在,要么表的分隔符和你要指定的分隔符一致。

案例4) 测试–create-hive-table 

[root@test01 ~]# sqoop import \
--connect jdbc:mysql://test03:3306/sz2103 \
--username root \
--password @Mmforu45 \
--query "select * from emp where \$CONDITIONS" \
--split-by 'empno' \
--target-dir /sqoop/emp \
--delete-target-dir \
--hive-import \
--hive-overwrite \
--hive-table 'sz2103.emp_11' \
--fields-terminated-by '\t' \
--create-hive-table

结论:

如果表提前存在,就会报错

案例5)分区导入hive 

[root@test01 ~]# sqoop import \
--connect jdbc:mysql://test03:3306/sz2103 \
--username root \
--password @Mmforu45 \
--table emp \
--target-dir /sqoop/emp \
--delete-target-dir \
--hive-import \
--create-hive-table \
--hive-overwrite \
--hive-table 'sz2103.emp_12' \
--fields-terminated-by '\t' \
--hive-partition-key "year" \
--hive-partition-value "10"

注意事项

1. 可以提前创建分区表,方便指定分区字段

2. 如果没有提前创建分区表,导入语句会主动创建分区表。分区字段不要是mysql里查询的字段。

3.2.4 mysql->hbase

案例演示

[root@test01 ~]# sqoop import \
--connect jdbc:mysql://test03:3306/sz2103 \
--username root \
--password @Mmforu45 \
--query 'select empno,ename,deptno from emp where $CONDITIONS' \
--target-dir /sqoop/emp \
--delete-target-dir \
--hbase-table 'myns:emp1001' \
--column-family 'f1' \
--split-by 'empno' \
--hbase-row-key "empno,deptno"

结论:

导入时: 可以使用--query和--split-by的组合形式
        也可以使用--table
  
1. rowkey是由--hbase-row-key指定的,如果没有指定,则使用表的主键或者--split-by指定的切分字段充当.
2. 也可以使用--hbase-row-key指定复合主键,字段是用逗号分开。复合主键是由下划线拼接而成。
3. 大小写问题:
      如果指定的是--query|-e,
       --split-by和--hbase-row-key的字段的大小写都要select子句中保持一致。
       如果是--table 或者是select *  都应该大写。

 

3.3 Sqoop的export

Table 29. Export control arguments: 

3.3.1 hdfs->mysql

注意:导出时,mysql中的表必须提前创建。

1)创建一个带有主键约束的表emp_1,字段数与hdfs上的一致
drop table `emp_1`;
CREATE TABLE `emp_1` (
`EMPNO` int(4) NOT NULL,
`ENAME` varchar(10),
`JOB` varchar(9),
`MGR` int(4),
`HIREDATE` date,
`SAL` int(7),
`COMM` int(7),
`DEPTNO` int(2),
PRIMARY KEY (`EMPNO`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;

2)创建一个带有主键约束的表emp_2,字段数少于hdfs上的列数
CREATE TABLE `emp_2` (
`EMPNO` int(4) NOT NULL,
`ENAME` varchar(10),
`JOB` varchar(9),
`MGR` int(4),
`HIREDATE` date,
`SAL` int(7),
`COMM` int(7),
PRIMARY KEY (`EMPNO`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;

3)创建一个带有主键约束的表emp_3,字段数多于hdfs上的列数
CREATE TABLE `emp_3` (
`EMPNO` int(4) NOT NULL,
`ENAME` varchar(10),
`JOB` varchar(9),
`MGR` int(4),
`HIREDATE` date,
`SAL` int(7),
`COMM` int(7),
`DEPTNO` int(2),
`desc` varchar(100),   
PRIMARY KEY (`EMPNO`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;

4)创建一个无主键的表
CREATE TABLE `emp_4` (
`EMPNO` int(4) NOT NULL,
`ENAME` varchar(10),
`JOB` varchar(9),
`MGR` int(4),
`HIREDATE` date,
`SAL` int(7),
`COMM` int(7),
`DEPTNO` int(2),
`desc` varchar(100)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;

 案例1)有主键字段、字段数相同

注意类型是否匹配,字符串转成int,要注意是不是纯数字字符串。

[root@test01 ~]# sqoop export \
--connect jdbc:mysql://test03:3306/sz2103 \
--username root \
--password @Mmforu45 \
--table emp_1 \
--export-dir /sqoop/emp \
--fields-terminated-by '\t'

注意事项:

1. 分隔符: hdfs上的数据的分隔符,必须在导出语句时,指定出来

2. 类型能否隐式转换, 隐式转换不了就会报错 

案例2)有主键字段,字段数少于hdfs上列数

[root@test01 ~]# sqoop export \
--connect jdbc:mysql://test03:3306/sz2103 \
--username root \
--password @Mmforu45 \
--table emp_2 \
--export-dir /sqoop/emp \
--fields-terminated-by '\t'

案例3)代有主键字段的表,字段数多于hdfs上列数

[root@test01 ~]# sqoop export \
--connect jdbc:mysql://test03:3306/sz2103 \
--username root \
--password @Mmforu45 \
--table emp_3 \
--export-dir /sqoop/emp \
--fields-terminated-by '\t'


[root@test01 ~]# sqoop export \
--connect jdbc:mysql://test03:3306/sz2103 \
--username root \
--password @Mmforu45 \
--table emp_1 \
--columns "empno,job,ename,deptno" \
--export-dir /sqoop/emp \
--fields-terminated-by '\t'

总结:

通过案例2和3 总结如下:
--1. 导出时,如果不指定columns,是按照hdfs上的顺序给建表期间指定的字段顺序一一赋值。与mysql的字段的个数无关。注意mysql字段的not null约束情况。
--2. 如果指定了columns参数,也是按照顺序将hdfs上的列依次赋值给columns指定的字段。  注意数据类型的隐式转换。

案例4)导出到无主键约束的表中

[root@test01 ~]# sqoop export \
--connect jdbc:mysql://test03:3306/sz2103 \
--username root \
--password @Mmforu45 \
--table emp_4 \
--export-dir /sqoop/emp \
--fields-terminated-by '\t'

总结:

mysql的表没有主键约束,更方便导出到msyql里,如果mysql中有主键,则要注意主键字段的重复值问题。 

案例5)测试类型没有对应上

DROP TABLE IF EXISTS `emp_5`;
CREATE TABLE `emp_5` (
`EMPNO` int(4) NOT NULL,
`ENAME` varchar(10),
`JOB` varchar(9),
`MGR` int(4),
`SAL` int(7),
`COMM` int(7),
`DEPTNO` int(2)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;
说明:emp_5的第五个字段sal是int类型,但是hdfs上的第五列是日期格式“yyyy-MM-dd”。

sqoop export \
--connect jdbc:mysql://test03:3306/sz2103 \
--username root \
--password @Mmforu45 \
--table emp_5 \
--export-dir /sqoop/emp \
--fields-terminated-by '\t'

注意:一定会报错

结论:

1. 导出时,mysql中的表必须提前创建。
2. 导出时,字段之间的切分符号默认是逗号。如果hdfs上的文件不是逗号分隔符,需要使用--fields-terminated-by或--input-fields-terminated-by参数指定分隔符
3. 导出时,是按照hdfs上文件从左到右的顺序给mysql表的字段赋值
4. 导出时,mysql的表的字段数与hdfs上的列数可以不相同
5. 导出时,字段类型要一致或者可以隐式转换。
6. 带有主键约束的mysql表,要注意导出的数据的主键约束的情况,不能重复 
7. 使用--columns给mysql中的某些字段赋值时,没有包含在参数中的字段要么有默认值,要么不能设置not null
8. 在sqoop1.4.7中,hdfs上的字符串'null'是可以转成mysql中字符串类型字段的null值,
            也可以转成mysql中非字符串类型字段的null值。

 

案例6)input-null-string和input-null-non-string参数的用法

就是将hdfs上的字符串序列转成mysql中的string的null或者是非String类型的null.

说明:

1. --input-null-string
    该参数的作用:将字符串序列,转成mysql里字符串类型的null
2. --input-null-non-string
    该参数的作用:将非字符串序列,转成mysql中整型的null,但是可能会失败。
    如果是对null-non-string转到hdfs上的数字字符串,可以在导出时,再次转成null.
    如果不是参数null-non-string转成的数字字符串,在导出时,会报错。

drop table `emp_6`;
CREATE TABLE `emp_6` (
`EMPNO` int(4) NOT NULL,
`ENAME` varchar(10),
`JOB` varchar(9),
`MGR` int(4),
`HIREDATE` date,
`SAL` int(7),
`COMM` int(7),
`DEPTNO` int(2),
PRIMARY KEY (`EMPNO`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;



sqoop export \
--connect jdbc:mysql://test03:3306/sz2103 \
--username root \
--password @Mmforu45 \
--table emp_6 \
--export-dir /sqoop/emp \
--fields-terminated-by '\t' \
--input-null-string "SMITH" \
--input-null-non-string '10000' 


sqoop import \
--connect jdbc:mysql://test03:3306/sz2103 \
--username root \
--password @Mmforu45 \
--table emp \
--target-dir /sqoop/emp \
--fields-terminated-by '\t' \
--delete-target-dir \
--null-string '\\N' \
--null-non-string '10000' 

 

四、sqoop的高阶用法

4.1 增量导入

4.1.1 增量和全量的概念

在实际生产环境中,数据要源源不断的导入到数据仓库中进行分析。而增量导入是必须的,也可以避免很多重复的数据出现。
增量导入的概念:就是将数据源的新数据(旧数据已经导入过)导入到hadoop生态系统中。
         订单表  :   订单号,商品ID,用户ID,时间
全量导入: 当数据变化的量相对较少时,可以重新覆盖导入。
         用户信息表
         商品信息表
         店铺信息

4.1.2 增量导入的方式

增量导入的方式有三种

1.  使用通用参数:--append
2.  使用-e|--query,指定条件进行增量导入
3.  使用--incremental 和 --last-value  --check-column 组合进行增量导入
       --incremental  用来指定导入模式, append|lastmodified
       --last-value   记录上一次导入的最大值,表示这一次从这个值开始导入
       --check-column 用于指定要检查的字段。 

4.1.3 案例演示

1)使用–query方式增量导入到hive中

 

# 假如昨天导入的数据是10和20号部门的数据,如下:
[root@test01 ~]# sqoop import \
--connect jdbc:mysql://test03:3306/sz2103 \
--username root \
--password @Mmforu45 \
-e "select * from emp where \$CONDITIONS and deptno in(10,20)" \
--split-by "empno" \
--target-dir "/temp/emp" \
--delete-target-dir \
--hive-import \
--hive-table "sz2103.emp_inrem" \
--fields-terminated-by "\t" \
-m 2


#今天导入新增加的30号部门的数据

[root@test01 ~]# sqoop import \
--connect jdbc:mysql://test03:3306/sz2103 \
--username root \
--password @Mmforu45 \
-e "select * from emp where \$CONDITIONS and deptno in(30)" \
--split-by "empno" \
--target-dir "/temp/emp" \
--delete-target-dir \
--hive-import \
--hive-table "sz2103.emp_inrem" \
--fields-terminated-by "\t" \
-m 2

2) 测试通用参数–append

小贴士

  1. 想要增量导入,就不能使用–delete-target-dir ,需要使用–append参数向hdfs上追加数据,
  2. 如果不指定–append参数,会报目录已经存在。
# 模拟昨天的数据:
[root@test01 ~]# sqoop import \
--connect jdbc:mysql://test03:3306/sz2103 \
--username root \
--password @Mmforu45 \
--table emp \
--target-dir "/sqoop/emp_increm" \
--delete-target-dir \
--fields-terminated-by "\t" \
--where "deptno=10 or deptno = 20" \
-m 1


# 进行增量导入30号部门的数据
[root@test01 ~]# sqoop import \
--connect jdbc:mysql://test03:3306/sz2103 \
--username root \
--password @Mmforu45 \
--append \
--table emp \
--target-dir "/sqoop/emp_increm" \
--fields-terminated-by "\t" \
--where "deptno=30" \
-m 1

 3)测试incremental,向hdfs上增量导入数据

–last-value: 从指定的值开始检查,如果有大于此值的数据,就认为是新数据,才会增量导入。导入后,会记录这次导入的最大值,为下一次增量导入做提示。

–check-column: 检查指定列,是否有新值,那么从意义上来说,指定的列应该具有唯一,非空,自增的特点。

# 模拟昨天的数据:
[root@test01 ~]# sqoop import \
--connect jdbc:mysql://test03:3306/sz2103 \
--username root \
--password @Mmforu45 \
--table emp \
--target-dir "/sqoop/emp_hdfs" \
--delete-target-dir \
--fields-terminated-by "\t" \
--where "empno<7700" \
-m 1

# 增量导入
[root@test01 ~]# sqoop import \
--connect jdbc:mysql://test03:3306/sz2103 \
--username root \
--password @Mmforu45 \
--table emp \
--target-dir "/sqoop/emp_hdfs" \
--fields-terminated-by "\t" \
--incremental append \
--last-value 7700 \
--check-column "empno" \
-m 1

小贴士: 

如果last-value的值由人来维护,那么很可能出现重复数据,人指定的值可能会出错,所以应该使用一种机制来自动维护last-value的值。

4.2 job的应用

4.2.1 job的优点

–1. 可以将导入导出语句创建到job里,然后可以重复的执行job。避免重写一堆参数,也可以使用定时器来定时执行job

–2. 如果job里封装了增量导入语句,由于job里自动维护了一堆参数,包括–last-value,因此执行job后,job会记录last-value的最新的值,以便下次导入新的行数据。

–3. 基于要维护last-value这样的值,因此涉及到元数据的存储。默认是存储到$HOME/.sqoop/下,但是不安全,因此也可以维护到mysql里。

 4.2.2 job的参数

[root@test01 ~]# sqoop job --help
[root@test01 ~]# sqoop help job
usage: sqoop job [GENERIC-ARGS] [JOB-ARGS] [-- [<tool-name>] [TOOL-ARGS]]

Job management arguments:
   --create <job-id>            Create a new saved job
   --delete <job-id>            Delete a saved job
   --exec <job-id>              Run a saved job
   --help                       Print usage instructions
   --list                       List saved jobs
   --meta-connect <jdbc-uri>    Specify JDBC connect string for the
                                metastore
   --show <job-id>              Show the parameters for a saved job
   --verbose                    Print more information while working
  

4.2.3 案例演示1

1)创建一个job,相当于将导入语句封装到job里。可以通过执行job来进行导入功能

[root@test01 ~]# sqoop job --create job1 \
-- import \
--connect jdbc:mysql://test03:3306/sz2103 \
--username root \
--password @Mmforu45 \
--table emp \
--target-dir "/sqoop/emp_job" \
--delete-target-dir \
--fields-terminated-by "\t" \
--where "empno<7700" \
-m 1

注意事项:

–1. 创建job时,一定要添加import参数,而且要与前面的–之间有一个空格

–2. 如果报以下错误:

ERROR sqoop.Sqoop: Got exception running Sqoop: java.lang.NullPointerException
java.lang.NullPointerException

​ at org.json.JSONObject.(JSONObject.java:144)

需要将java-json.jar包导入到sqoop的lib目录下

–3. 如果第一次创建时,失败的话,可能会产生jobid. 需要删除后再执行

 

2)查询所有的job

[root@test01 ~]# sqoop job --list

 3)执行job

小贴士:会提示输入mysql的密码, 虽然job里有密码。
[root@test01 ~]# sqoop job --exec job1

 4)写一个定时脚本执行,规定中午12点整和半夜12整执行

[root@test01 ~]# crontab -e
0 12,0 * * * /usr/local/sqoop/bin/sqoop job --exec job1

4.2.4 案例演示2

1)创建一个带有增量导入的job

[root@test01 ~]# sqoop job \
--create job2 \
-- import \
--connect jdbc:mysql://test03:3306/sz2103 \
--username root \
--password @Mmforu45 \
--table emp \
--target-dir "/sqoop/emp_hdfs" \
--fields-terminated-by "\t" \
--incremental append \
--check-column "empno" \
-m 1

2) 查看指定job

[root@test01 ~]# sqoop job --show job2

3) 执行job

[root@test01 ~]# sqoop job --exec job2

4)在mysql的emp表中插入三条新记录

insert into emp (empno,ename,job,deptno) values (10001,'zhangsan','clerk',10);
insert into emp (empno,ename,job,deptno) values (10002,'lisi','clerk',10);
insert into emp (empno,ename,job,deptno) values (10003,'wangwu','clerk',10);

5)再次执行job2

[root@test01 ~]# sqoop job --exec job2

会看到之导入了大于10000的三条记录。last-value变成了10003

4.3 metastore的应用

4.3.1 metastore的介绍

1. metastore服务是sqoop的元数据服务,用于存储sqoop的job相关信息
2. 默认会使用内嵌的元数据库,存储位置位于~/.sqoop/下
3. 由于安全性问题,将job信息保存于关系型数据库中是最合适的,关系型数据库可以选择mysql

4.3.2 配置metastore服务项

步骤1)修改用户自定义配置文件

[root@test01 conf]# vim sqoop-site.xml
<configuration>
    <!-- 是否要启动客户端自动连接元数据库 -->
    <property>
        <name>sqoop.metastore.client.enable.autoconnect</name>
        <value>true</value>
    </property>
     <!-- 连接元数据库的url -->
    <property>
        <name>sqoop.metastore.client.autoconnect.url</name>
        <value>jdbc:mysql://test03:3306/sqoop</value>
    </property>
    <!-- 连接元数据库的username -->
    <property>
        <name>sqoop.metastore.client.autoconnect.username</name>
        <value>root</value>
    </property>
    <!-- 连接元数据库的密码 -->
    <property>
        <name>sqoop.metastore.client.autoconnect.password</name>
        <value>@Mmforu45</value>
    </property>
     <!-- 是否要记住密码 -->
    <property>
        <name>sqoop.metastore.client.record.password</name>
        <value>true</value>
    </property>
    <property>
        <name>sqoop.metastore.server.location</name>
        <value>/usr/local/sqoop/sqoop-metastore/shared.db</value>
    </property>
    <property>
        <name>sqoop.metastore.server.port</name>
        <value>16000</value>
    </property>
</configuration>  

步骤2)在mysql中创建sqoop数据库

mysql> create database sqoop;

步骤3)启动sqoop的服务项

[root@test01 ~]# sqoop metastore
或者后台启动
[root@test01 ~]# nohup sqoop metastore &

步骤4)在sqoop库中要维护一张表SQOOP_ROOT

#方式1:可以使用sqoop的某一个语句进行连接,这样就可以创建此表

执行如下语句即可
sqoop job --list --meta-connect 'jdbc:mysql://test03:3306/sqoop?user=root&password=@Mmforu45'

执行后,SQOOP_ROOT表创建成功,  有可能报错,但是表会创建成功。


#方式2)手动维护此表
CREATE TABLE `SQOOP_ROOT` (
  `version` int(11) DEFAULT NULL,
  `propname` varchar(128) NOT NULL,
  `propval` varchar(256) DEFAULT NULL,
  UNIQUE KEY `SQOOP_ROOT_unq` (`version`,`propname`)
) ENGINE=InnoDB DEFAULT CHARSET=latin1

步骤5)需要向SQOOP_ROOT中添加一条记录

insert into SQOOP_ROOT values (NULL, 'sqoop.hsqldb.job.storage.version', '0');

步骤6)修改两张表的存储引擎为myisam(innoDB引擎会引起事务锁超时情况)

alter table SQOOP_ROOT engine=myisam;

alter table SQOOP_SESSIONS engine=myisam;

如果执行上述语句时,报错,那么就再次执行一次:
sqoop job --list --meta-connect 'jdbc:mysql://test03:3306/sqoop?user=root&password=@Mmforu45'
然后再修改存储引擎

4.3.3 案例演示1

1)创建一个job

[root@test01 ~]# sqoop job --create job1 \
-- import \
--connect jdbc:mysql://test03:3306/sz2103 \
--username root \
--password @Mmforu45 \
--table emp \
--target-dir "/sqoop/emp_job" \
--delete-target-dir \
--fields-terminated-by "\t" \
--where "empno<7700" \
-m 1

2)查询metastore里保存的job

sqoop job --list \
--meta-connect 'jdbc:mysql://test03:3306/sqoop?user=root&password=@Mmforu45' 

小贴士:由于sqoop-site.xml里的自动连接属性为true,所以可以不添加–meta-connect参数

3)执行metastore里保存的job

[root@test01 ~]# sqoop job --exec job1

 4.3.4 案例演示2

1)创建增量导入的job

[root@qianfeng01 ~]# sqoop job --create job2 \
-- import \
--connect jdbc:mysql://test03:3306/sz2103 \
--username root \
--password @Mmforu45 \
--table emp \
--target-dir "/sqoop/emp_me" \
--fields-terminated-by "\t" \
--incremental append \
--check-column "empno" \
-m 1

注意:第一次创建增量job时,last-value参数可以不指定,默认导入全表的数据,然后会自动维护last-value的值

2)执行

sqoop job --exec job2

注意:想要使用metastore保存增量导入的job,那么SQOOP_SESSIONS的存储引擎必须是myisam。否则在执行job,想要保存last-value时,会失败 

3)在mysql的emp表中插入两条新数据

insert into emp (empno,ename,job,deptno) values (10004,'lisi','clerk',10);
insert into emp (empno,ename,job,deptno) values (10005,'wangwu','clerk',10);

 4)再次执行job2

sqoop job --exec job2

验证方式:

1. 可以查看hdfs上的目录的数据文件是否新增

2. 可以去mysql的sqoop库下查看SQOOP_SESSIONS里的数据

4.4 sqoop的优化 

1. mapTask的数量不能超过yarn的Vcore的数目,默认情况下yarn的VCORE的数量是8
2. 如果数据量比较小,在200M以内,建议使用一个MapTask
3. split-by指定的字段最好是均分分布的数字类型或者是时间类型。
4. --fetch-size的数目的数据总大小最好在128M以内。