一、sqoop概述和安装
sqoop简介
1. sqoop是apache基金会的一个顶级项目(已经退役,)
2. sqoop是一款可以将数据在RDBMS和Hadoop生态中导入导出的项目
3. sqoop最后一个版本是1.4.7apache有一个专门管理退役软件的仓库。
退役原因:
1. 软件开发的不够好,弃用了。
2. 没什么提升空间了,已经趋近于完美了,不再更新了,apache对这样的项目,如果发现一两年左右都没有更新时,就会开会决定移动到退役仓库中。
sqoop应用场景
sqoop原理
1. sqoop采用了MapReduce的N个MapTask 来进行导入导出数据。 并行执行,以便提高效率
2. 在导入时,sqoop会逐行读取RDBMS中的数据,并落地到相应的Hadoop生态中
3. 真正导入数据前,会根据指定的切分字段,先计算数据的范围,然后使用切分器将数据的值范围按照MapTask的数量平分,由MapTask开始导入数据。(注意:有可能某个MapTask要处理的数据范围是空的)
sqoop安装
步骤1) 上传、解压、更名、配置环境变量、生效、验证
[root@test01 ~]# tar -zxvf sqoop-1.4.7.bin__hadoop-2.6.0.tar.gz -C /usr/local/
[root@test01 ~]# cd /usr/local/
[root@test01 local]# mv sqoop-1.4.7.bin__hadoop-2.6.0/ sqoop
[root@test01 local]# vim /etc/profile
[root@test01 local]# source /etc/profile
[root@test01 local]# sqoop-version
步骤2)配置sqoop的环境脚本文件
[root@test01 local]# cd sqoop/conf/
[root@test01 conf]# cp sqoop-env-template.sh sqoop-env.sh
[root@test01 conf]# vim sqoop-env.sh
...........省略.............
export HADOOP_COMMON_HOME=/usr/local/hadoop
#Set path to where hadoop-*-core.jar is available
export HADOOP_MAPRED_HOME=/usr/local/hadoop
#set the path to where bin/hbase is available
export HBASE_HOME=/usr/local/hbase
#Set the path to where bin/hive is available
export HIVE_HOME=/usr/local/hive
#Set the path for where zookeper config dir is
export ZOOCFGDIR=/usr/local/zookeeper/conf
步骤3)导入mysql驱动到sqoop的lib目录
1. 如果连接的mysql是5.7版本以下, 可以导入mysql-connector-java-5.1.xxx-bin.jar
2. 如果连接的mysql是8.0版本以上, 需要导入mysql-connector-java-8.0.xxx-bin.jar
二、常用指令介绍
2.1 查看常用指令
[root@test01 lib]# sqoop help
Available commands:
codegen Generate code to interact with database records
create-hive-table Import a table definition into Hive
eval Evaluate a SQL statement and display the results
export Export an HDFS directory to a database table
help List available commands
import Import a table from a database to HDFS
import-all-tables Import tables from a database to HDFS
import-mainframe Import datasets from a mainframe server to HDFS
job Work with saved jobs
list-databases List available databases on a server
list-tables List available tables in a database
merge Merge results of incremental imports
metastore Run a standalone Sqoop metastore
version Display version information
2.2 指令帮助信息
[root@test01 ~]# sqoop help list-databases
Common arguments:
--connect <jdbc-uri> 用于指定连接jdbc的url
--username <username> 用户名
--password <password> 密码
................省略......................
2.3去官网查询
三、sqoop的基本用法(重点)
3.1 查询数据库和表
连接mysql,一定要远程授权
5.7版本:
grant all privileges on *.* to root@'%' identified by '远程密码' with grant option
5.8版本:
百度
3.1.1 查看数据库:sqoop list-databases
1) 连接windows上的mysql8.0的写法:
sqoop list-databases \
--connect jdbc:mysql://10.20.152.47:3306?serverTimezone=UTC \
--username root \
--password mmforu
2) 连接qianfeng03上的mysql5.7的写法
sqoop list-databases \
--connect jdbc:mysql://qianfeng03:3306 \
--username root \
--password @Mmforu45
注意:查看所有库,url只需要写到3306
3.1.2 查看表
sqoop list-tables \
--connect jdbc:mysql://test03:3306/hive \
--username root \
--password @Mmforu45
注意:要指定一个具体的database进行查看
3.2 Sqoop的Import
sqoop将数据从RDBMS中传输到Hadoop生态中,叫导入,即import
3.2.1 数据准备
create database sz2103 default character set utf8;
use sz2103;
-- emp表 有主键约束
DROP TABLE IF EXISTS `emp`;
CREATE TABLE `emp` (
`EMPNO` int(4) NOT NULL,
`ENAME` varchar(10),
`JOB` varchar(9),
`MGR` int(4),
`HIREDATE` date,
`SAL` int(7),
`COMM` int(7),
`DEPTNO` int(2),
PRIMARY KEY (`EMPNO`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;
INSERT INTO `emp` VALUES ('7200', 'SMITH', 'CLERK', '7902', '1980-12-17', '800', null, '20');
INSERT INTO `emp` VALUES ('7499', 'ALLEN', 'SALESMAN', '7698', '1981-02-20', '1600', '300', '30');
INSERT INTO `emp` VALUES ('7509', 'TOM', 'SALESMAN', '7698', '1983-02-20', '1300', '400', '20');
INSERT INTO `emp` VALUES ('7521', 'WARD', 'SALESMAN', '7698', '1981-02-22', '1250', '500', '30');
INSERT INTO `emp` VALUES ('7599', 'JONES', 'MANAGER', '7800', '1981-04-02', '2975', null, '20');
INSERT INTO `emp` VALUES ('7600', 'MARTIN', 'SALESMAN', '7698', '1981-09-28', '1250', '1400', '30');
INSERT INTO `emp` VALUES ('7698', 'BLAKE', 'MANAGER', '7800', '1981-05-01', '2850', null, '30');
INSERT INTO `emp` VALUES ('7782', 'CLARK', 'MANAGER', '7800', '1981-06-09', '2450', null, '10');
INSERT INTO `emp` VALUES ('7799', 'SCOTT', 'ANALYST', '7599', '1987-04-19', '3000', null, '20');
INSERT INTO `emp` VALUES ('7800', 'KING', 'PRESIDENT', null, '1981-11-17', '5000', null, '10');
INSERT INTO `emp` VALUES ('7844', 'TURNER', 'SALESMAN','7698', '1981-09-08', '1500', '0', '30');
INSERT INTO `emp` VALUES ('7876', 'ADAMS', 'CLERK', '7799', '1987-05-23', '1100', null, '20');
INSERT INTO `emp` VALUES ('7900', 'JAMES', 'CLERK', '7698', '1981-12-03', '950', null, '30');
INSERT INTO `emp` VALUES ('7902', 'FORD', 'ANALYST', '7599', '1981-12-03', '3000', null, '20');
INSERT INTO `emp` VALUES ('7934', 'MILLER', 'CLERK', '7782', '1982-01-23', '1300', null, '10');
INSERT INTO `emp` VALUES ('8000', 'superman', null, '7782', '1982-01-23', '1300', null, '10');
select * from emp;
-- 学生表 没有主键
DROP TABLE IF EXISTS `student`;
CREATE TABLE `student` (
`s_id` varchar(20) NOT NULL,
`s_name` varchar(20) NOT NULL DEFAULT '',
`s_birth` varchar(20) NOT NULL DEFAULT '',
`s_sex` varchar(10) NOT NULL DEFAULT ''
) ENGINE=InnoDB DEFAULT CHARSET=utf8;
INSERT INTO `student` VALUES
('01','赵雷','1990-01-01','男'),
('02','钱电','1990-12-21','男'),
('03','孙风','1990-05-20','男'),
('04','李云','1990-08-06','男'),
('05','周梅','1991-12-01','女'),
('06','吴兰','1992-03-01','女'),
('07','郑竹','1989-07-01','女'),
('08','王菊','1990-01-20','女'),
('09','张飞','1990-9-25','男'),
('10','刘备','1990-01-25','男'),
('11','关羽','1990-01-25','男');
select * from student;
3.2.2 mysql->hdfs
案例1)导入有主键的表到hdfs
[root@test01 ~]# sqoop import \
--connect jdbc:mysql://test03:3306/sz2103 \
--username root \
--password @Mmforu45 \
--table emp \
--target-dir /sqoop/emp \
-m 3
注意:再次导入时,会报目录已经存在,可以使用--delete-target-dir
[root@test01 ~]# sqoop import \
--connect jdbc:mysql://test03:3306/sz2103 \
--username root \
--password @Mmforu45 \
--table emp \
--target-dir /sqoop/emp \
-m 3 \
--delete-target-dir
总结
1. 默认分隔符是逗号
2. 如何切分的?
计算主键字段的最大和最小值,然后差值/MapTask的数量,然后平均
案例2)导入无主键的表到hdfs上
[root@test01 ~]# sqoop import \
--connect jdbc:mysql://test03:3306/sz2103 \
--username root \
--password @Mmforu45 \
--table student \
--target-dir /sqoop/student \
-m 3 \
--delete-target-dir
以上内容会报错
解决方式如下:
sqoop import \
--connect jdbc:mysql://test03:3306/sz2103 \
--username root \
--password @Mmforu45 \
--table student \
--target-dir /sqoop/student \
-m 1 \
--delete-target-dir
参数:--delete-target-dir 用于删除已经存在的目录
第二种解决方式如下:
sqoop import \
--connect jdbc:mysql://test03:3306/sz2103 \
--username root \
--password @Mmforu45 \
--table student \
--target-dir /sqoop/student \
--split-by s_id \
--delete-target-dir
注意事项:如果表没有主建约束的字段
方式1: 指定--split-by参数
方式2: MapTask的数量只能是1
注意: 如果报以下错误,需要将提示中的属性和值添加到导入语句中
ERROR tool.ImportTool: Import failed: java.io.IOException: Generating splits for a textual index column allowed only in case of "-Dorg.apache.sqoop.splitter.allow_text_splitter=true" property passed as a parameter
sqoop import \
-Dorg.apache.sqoop.splitter.allow_text_splitter=true \
--connect jdbc:mysql://test03:3306/sz2103 \
--username root \
--password @Mmforu45 \
--table student \
--target-dir /sqoop/student \
--split-by s_id \
--delete-target-dir
案例3)指定列分隔符进行导入
参数:–fields-terminated-by 用于指定列分隔符
[root@test01 ~]# sqoop import \
--connect jdbc:mysql://test03:3306/sz2103 \
--username root \
--password @Mmforu45 \
--table emp \
--target-dir /sqoop/emp \
--fields-terminated-by '\t' \
-m 3 \
--delete-target-dir
案例4)测试columns
[root@test01 ~]# sqoop import \
--connect jdbc:mysql://test03:3306/sz2103 \
--username root \
--password @Mmforu45 \
--table emp \
--columns "empno,ename" \
--target-dir /sqoop/emp \
--fields-terminated-by '\t' \
-m 3 \
--delete-target-dir
案例5)测试where
[root@test01 ~]# sqoop import \
--connect jdbc:mysql://test03:3306/sz2103 \
--username root \
--password @Mmforu45 \
--table emp \
--columns "empno,ename,deptno" \
--target-dir /sqoop/emp \
--fields-terminated-by '\t' \
-m 3 \
--delete-target-dir \
--where 'deptno = 10 or deptno = 30'
注意:
参数--where后的条件建议 加上引号
案例6)测试-e|–query
[root@test01 ~]# sqoop import \
--connect jdbc:mysql://test03:3306/sz2103 \
--username root \
--password @Mmforu45 \
--query "select empno,mgr,deptno from emp where \$CONDITIONS and deptno = 10" \
--target-dir /sqoop/emp \
--fields-terminated-by '\t' \
--split-by 'empno' \
-m 3 \
--delete-target-dir \
注意事项:
1. 当使用--query|-e 时, 一定不能用--table 2. 当使用--query|-e 时, 一定要使用--split-by 注意:指定的字段必须是select子句中的某一个字段 3. 当使用--query|-e 时, 在from tablename 后一定要在where后带上$CONDITIONS关键词, 注意:如果是双引号,$前要添加转义字符 4. 当使用--query|-e 时, 参数where失效,所以没有必要指定 5. 当使用--query|-e 时, 参数columns的作用,是对select中的字段进行筛选。 但是,用的不多,没有必要在columns里进行筛选。
案例7)测试字符串类型的null处理和非字符串类型的null处理
参数:–null-string --null-non-string 这两个参数是指将mysql里的null值 导入到hdfs上以什么的样式显示。
[root@test01 ~]# sqoop import \
--connect jdbc:mysql://test03:3306/sz2103 \
--username root \
--password @Mmforu45 \
--query "select * from emp where \$CONDITIONS" \
--target-dir /sqoop/emp \
--fields-terminated-by '\t' \
--split-by 'empno' \
-m 3 \
--delete-target-dir \
--null-string '\\N' \
--null-non-string 'null'
注意: 在hive表中,hive的null使用\N表示
3.2.3 mysql–>hive
案例1)导入hive中
[root@test01 ~]# sqoop import \
--connect jdbc:mysql://test03:3306/sz2103 \
--username root \
--password @Mmforu45 \
--query "select * from emp where \$CONDITIONS" \
--split-by 'empno' \
--target-dir /sqoop/emp \
--delete-target-dir \
--hive-import \
--hive-table 'sz2103.emp_1'
不需要提前在hive中创建表。
注意的问题如下:
1. sqoop导入hive的原理是:先导入到hdfs上 ,然后再使用load data指令上传到hive的表目录下
2. sqoop指令会在所在机器上临时运行hive客户端使用load进行加载数据,所以要注意客户端的问题,是本地还是远程。
3. 如果报以下错误,说明sqoop找不到hive的common包
java.lang.ClassNotFoundException: org.apache.hadoop.hive.conf.HiveConf
解决办法如下:
1. 将hive的lib目录下的hive-common-2.xxxx.jar 拷贝到sqoop的lib目录
案例2)测试–hive-overwrite
[root@test01 ~]# sqoop import \
--connect jdbc:mysql://test03:3306/sz2103 \
--username root \
--password @Mmforu45 \
--query "select * from emp where \$CONDITIONS" \
--split-by 'empno' \
--target-dir /sqoop/emp \
--delete-target-dir \
--hive-import \
--hive-overwrite \
--hive-table 'sz2103.emp_1'
案例3)指定分隔符
[root@test01 ~]# sqoop import \
--connect jdbc:mysql://test03:3306/sz2103 \
--username root \
--password @Mmforu45 \
--query "select * from emp where \$CONDITIONS" \
--split-by 'empno' \
--target-dir /sqoop/emp \
--delete-target-dir \
--hive-import \
--hive-overwrite \
--hive-table 'sz2103.emp_1' \
--fields-terminated-by '\t'
结论:
1. 添加--hive-overwrite会覆盖原有的hive的表数据,反之没有此参数,表示追加数据。
2. 必须要有--hive-import选项。
3. 从mysql中导入到hive中时,如果不指定分隔符,默认使用的是^A,
4. 可以使用--fields-terminated-by指定分隔符, 注意,该表要么不存在,要么表的分隔符和你要指定的分隔符一致。
案例4) 测试–create-hive-table
[root@test01 ~]# sqoop import \
--connect jdbc:mysql://test03:3306/sz2103 \
--username root \
--password @Mmforu45 \
--query "select * from emp where \$CONDITIONS" \
--split-by 'empno' \
--target-dir /sqoop/emp \
--delete-target-dir \
--hive-import \
--hive-overwrite \
--hive-table 'sz2103.emp_11' \
--fields-terminated-by '\t' \
--create-hive-table
结论:
如果表提前存在,就会报错
案例5)分区导入hive
[root@test01 ~]# sqoop import \
--connect jdbc:mysql://test03:3306/sz2103 \
--username root \
--password @Mmforu45 \
--table emp \
--target-dir /sqoop/emp \
--delete-target-dir \
--hive-import \
--create-hive-table \
--hive-overwrite \
--hive-table 'sz2103.emp_12' \
--fields-terminated-by '\t' \
--hive-partition-key "year" \
--hive-partition-value "10"
注意事项
1. 可以提前创建分区表,方便指定分区字段
2. 如果没有提前创建分区表,导入语句会主动创建分区表。分区字段不要是mysql里查询的字段。
3.2.4 mysql->hbase
案例演示
[root@test01 ~]# sqoop import \
--connect jdbc:mysql://test03:3306/sz2103 \
--username root \
--password @Mmforu45 \
--query 'select empno,ename,deptno from emp where $CONDITIONS' \
--target-dir /sqoop/emp \
--delete-target-dir \
--hbase-table 'myns:emp1001' \
--column-family 'f1' \
--split-by 'empno' \
--hbase-row-key "empno,deptno"
结论:
导入时: 可以使用--query和--split-by的组合形式
也可以使用--table
1. rowkey是由--hbase-row-key指定的,如果没有指定,则使用表的主键或者--split-by指定的切分字段充当.
2. 也可以使用--hbase-row-key指定复合主键,字段是用逗号分开。复合主键是由下划线拼接而成。
3. 大小写问题:
如果指定的是--query|-e,
--split-by和--hbase-row-key的字段的大小写都要select子句中保持一致。
如果是--table 或者是select * 都应该大写。
3.3 Sqoop的export
Table 29. Export control arguments:
3.3.1 hdfs->mysql
注意:导出时,mysql中的表必须提前创建。
1)创建一个带有主键约束的表emp_1,字段数与hdfs上的一致
drop table `emp_1`;
CREATE TABLE `emp_1` (
`EMPNO` int(4) NOT NULL,
`ENAME` varchar(10),
`JOB` varchar(9),
`MGR` int(4),
`HIREDATE` date,
`SAL` int(7),
`COMM` int(7),
`DEPTNO` int(2),
PRIMARY KEY (`EMPNO`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;
2)创建一个带有主键约束的表emp_2,字段数少于hdfs上的列数
CREATE TABLE `emp_2` (
`EMPNO` int(4) NOT NULL,
`ENAME` varchar(10),
`JOB` varchar(9),
`MGR` int(4),
`HIREDATE` date,
`SAL` int(7),
`COMM` int(7),
PRIMARY KEY (`EMPNO`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;
3)创建一个带有主键约束的表emp_3,字段数多于hdfs上的列数
CREATE TABLE `emp_3` (
`EMPNO` int(4) NOT NULL,
`ENAME` varchar(10),
`JOB` varchar(9),
`MGR` int(4),
`HIREDATE` date,
`SAL` int(7),
`COMM` int(7),
`DEPTNO` int(2),
`desc` varchar(100),
PRIMARY KEY (`EMPNO`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;
4)创建一个无主键的表
CREATE TABLE `emp_4` (
`EMPNO` int(4) NOT NULL,
`ENAME` varchar(10),
`JOB` varchar(9),
`MGR` int(4),
`HIREDATE` date,
`SAL` int(7),
`COMM` int(7),
`DEPTNO` int(2),
`desc` varchar(100)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;
案例1)有主键字段、字段数相同
注意类型是否匹配,字符串转成int,要注意是不是纯数字字符串。
[root@test01 ~]# sqoop export \
--connect jdbc:mysql://test03:3306/sz2103 \
--username root \
--password @Mmforu45 \
--table emp_1 \
--export-dir /sqoop/emp \
--fields-terminated-by '\t'
注意事项:
1. 分隔符: hdfs上的数据的分隔符,必须在导出语句时,指定出来
2. 类型能否隐式转换, 隐式转换不了就会报错
案例2)有主键字段,字段数少于hdfs上列数
[root@test01 ~]# sqoop export \
--connect jdbc:mysql://test03:3306/sz2103 \
--username root \
--password @Mmforu45 \
--table emp_2 \
--export-dir /sqoop/emp \
--fields-terminated-by '\t'
案例3)代有主键字段的表,字段数多于hdfs上列数
[root@test01 ~]# sqoop export \
--connect jdbc:mysql://test03:3306/sz2103 \
--username root \
--password @Mmforu45 \
--table emp_3 \
--export-dir /sqoop/emp \
--fields-terminated-by '\t'
[root@test01 ~]# sqoop export \
--connect jdbc:mysql://test03:3306/sz2103 \
--username root \
--password @Mmforu45 \
--table emp_1 \
--columns "empno,job,ename,deptno" \
--export-dir /sqoop/emp \
--fields-terminated-by '\t'
总结:
通过案例2和3 总结如下:
--1. 导出时,如果不指定columns,是按照hdfs上的顺序给建表期间指定的字段顺序一一赋值。与mysql的字段的个数无关。注意mysql字段的not null约束情况。
--2. 如果指定了columns参数,也是按照顺序将hdfs上的列依次赋值给columns指定的字段。 注意数据类型的隐式转换。
案例4)导出到无主键约束的表中
[root@test01 ~]# sqoop export \
--connect jdbc:mysql://test03:3306/sz2103 \
--username root \
--password @Mmforu45 \
--table emp_4 \
--export-dir /sqoop/emp \
--fields-terminated-by '\t'
总结:
mysql的表没有主键约束,更方便导出到msyql里,如果mysql中有主键,则要注意主键字段的重复值问题。
案例5)测试类型没有对应上
DROP TABLE IF EXISTS `emp_5`;
CREATE TABLE `emp_5` (
`EMPNO` int(4) NOT NULL,
`ENAME` varchar(10),
`JOB` varchar(9),
`MGR` int(4),
`SAL` int(7),
`COMM` int(7),
`DEPTNO` int(2)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;
说明:emp_5的第五个字段sal是int类型,但是hdfs上的第五列是日期格式“yyyy-MM-dd”。
sqoop export \
--connect jdbc:mysql://test03:3306/sz2103 \
--username root \
--password @Mmforu45 \
--table emp_5 \
--export-dir /sqoop/emp \
--fields-terminated-by '\t'
注意:一定会报错
结论:
1. 导出时,mysql中的表必须提前创建。
2. 导出时,字段之间的切分符号默认是逗号。如果hdfs上的文件不是逗号分隔符,需要使用--fields-terminated-by或--input-fields-terminated-by参数指定分隔符
3. 导出时,是按照hdfs上文件从左到右的顺序给mysql表的字段赋值
4. 导出时,mysql的表的字段数与hdfs上的列数可以不相同
5. 导出时,字段类型要一致或者可以隐式转换。
6. 带有主键约束的mysql表,要注意导出的数据的主键约束的情况,不能重复
7. 使用--columns给mysql中的某些字段赋值时,没有包含在参数中的字段要么有默认值,要么不能设置not null
8. 在sqoop1.4.7中,hdfs上的字符串'null'是可以转成mysql中字符串类型字段的null值,
也可以转成mysql中非字符串类型字段的null值。
案例6)input-null-string和input-null-non-string参数的用法
就是将hdfs上的字符串序列转成mysql中的string的null或者是非String类型的null.
说明:
1. --input-null-string
该参数的作用:将字符串序列,转成mysql里字符串类型的null
2. --input-null-non-string
该参数的作用:将非字符串序列,转成mysql中整型的null,但是可能会失败。
如果是对null-non-string转到hdfs上的数字字符串,可以在导出时,再次转成null.
如果不是参数null-non-string转成的数字字符串,在导出时,会报错。
drop table `emp_6`;
CREATE TABLE `emp_6` (
`EMPNO` int(4) NOT NULL,
`ENAME` varchar(10),
`JOB` varchar(9),
`MGR` int(4),
`HIREDATE` date,
`SAL` int(7),
`COMM` int(7),
`DEPTNO` int(2),
PRIMARY KEY (`EMPNO`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;
sqoop export \
--connect jdbc:mysql://test03:3306/sz2103 \
--username root \
--password @Mmforu45 \
--table emp_6 \
--export-dir /sqoop/emp \
--fields-terminated-by '\t' \
--input-null-string "SMITH" \
--input-null-non-string '10000'
sqoop import \
--connect jdbc:mysql://test03:3306/sz2103 \
--username root \
--password @Mmforu45 \
--table emp \
--target-dir /sqoop/emp \
--fields-terminated-by '\t' \
--delete-target-dir \
--null-string '\\N' \
--null-non-string '10000'
四、sqoop的高阶用法
4.1 增量导入
4.1.1 增量和全量的概念
在实际生产环境中,数据要源源不断的导入到数据仓库中进行分析。而增量导入是必须的,也可以避免很多重复的数据出现。
增量导入的概念:就是将数据源的新数据(旧数据已经导入过)导入到hadoop生态系统中。
订单表 : 订单号,商品ID,用户ID,时间
全量导入: 当数据变化的量相对较少时,可以重新覆盖导入。
用户信息表
商品信息表
店铺信息
4.1.2 增量导入的方式
增量导入的方式有三种
1. 使用通用参数:--append
2. 使用-e|--query,指定条件进行增量导入
3. 使用--incremental 和 --last-value --check-column 组合进行增量导入
--incremental 用来指定导入模式, append|lastmodified
--last-value 记录上一次导入的最大值,表示这一次从这个值开始导入
--check-column 用于指定要检查的字段。
4.1.3 案例演示
1)使用–query方式增量导入到hive中
# 假如昨天导入的数据是10和20号部门的数据,如下:
[root@test01 ~]# sqoop import \
--connect jdbc:mysql://test03:3306/sz2103 \
--username root \
--password @Mmforu45 \
-e "select * from emp where \$CONDITIONS and deptno in(10,20)" \
--split-by "empno" \
--target-dir "/temp/emp" \
--delete-target-dir \
--hive-import \
--hive-table "sz2103.emp_inrem" \
--fields-terminated-by "\t" \
-m 2
#今天导入新增加的30号部门的数据
[root@test01 ~]# sqoop import \
--connect jdbc:mysql://test03:3306/sz2103 \
--username root \
--password @Mmforu45 \
-e "select * from emp where \$CONDITIONS and deptno in(30)" \
--split-by "empno" \
--target-dir "/temp/emp" \
--delete-target-dir \
--hive-import \
--hive-table "sz2103.emp_inrem" \
--fields-terminated-by "\t" \
-m 2
2) 测试通用参数–append
小贴士:
- 想要增量导入,就不能使用–delete-target-dir ,需要使用–append参数向hdfs上追加数据,
- 如果不指定–append参数,会报目录已经存在。
# 模拟昨天的数据:
[root@test01 ~]# sqoop import \
--connect jdbc:mysql://test03:3306/sz2103 \
--username root \
--password @Mmforu45 \
--table emp \
--target-dir "/sqoop/emp_increm" \
--delete-target-dir \
--fields-terminated-by "\t" \
--where "deptno=10 or deptno = 20" \
-m 1
# 进行增量导入30号部门的数据
[root@test01 ~]# sqoop import \
--connect jdbc:mysql://test03:3306/sz2103 \
--username root \
--password @Mmforu45 \
--append \
--table emp \
--target-dir "/sqoop/emp_increm" \
--fields-terminated-by "\t" \
--where "deptno=30" \
-m 1
3)测试incremental,向hdfs上增量导入数据
–last-value: 从指定的值开始检查,如果有大于此值的数据,就认为是新数据,才会增量导入。导入后,会记录这次导入的最大值,为下一次增量导入做提示。
–check-column: 检查指定列,是否有新值,那么从意义上来说,指定的列应该具有唯一,非空,自增的特点。
# 模拟昨天的数据:
[root@test01 ~]# sqoop import \
--connect jdbc:mysql://test03:3306/sz2103 \
--username root \
--password @Mmforu45 \
--table emp \
--target-dir "/sqoop/emp_hdfs" \
--delete-target-dir \
--fields-terminated-by "\t" \
--where "empno<7700" \
-m 1
# 增量导入
[root@test01 ~]# sqoop import \
--connect jdbc:mysql://test03:3306/sz2103 \
--username root \
--password @Mmforu45 \
--table emp \
--target-dir "/sqoop/emp_hdfs" \
--fields-terminated-by "\t" \
--incremental append \
--last-value 7700 \
--check-column "empno" \
-m 1
小贴士:
如果last-value的值由人来维护,那么很可能出现重复数据,人指定的值可能会出错,所以应该使用一种机制来自动维护last-value的值。
4.2 job的应用
4.2.1 job的优点
–1. 可以将导入导出语句创建到job里,然后可以重复的执行job。避免重写一堆参数,也可以使用定时器来定时执行job
–2. 如果job里封装了增量导入语句,由于job里自动维护了一堆参数,包括–last-value,因此执行job后,job会记录last-value的最新的值,以便下次导入新的行数据。
–3. 基于要维护last-value这样的值,因此涉及到元数据的存储。默认是存储到$HOME/.sqoop/下,但是不安全,因此也可以维护到mysql里。
4.2.2 job的参数
[root@test01 ~]# sqoop job --help
[root@test01 ~]# sqoop help job
usage: sqoop job [GENERIC-ARGS] [JOB-ARGS] [-- [<tool-name>] [TOOL-ARGS]]
Job management arguments:
--create <job-id> Create a new saved job
--delete <job-id> Delete a saved job
--exec <job-id> Run a saved job
--help Print usage instructions
--list List saved jobs
--meta-connect <jdbc-uri> Specify JDBC connect string for the
metastore
--show <job-id> Show the parameters for a saved job
--verbose Print more information while working
4.2.3 案例演示1
1)创建一个job,相当于将导入语句封装到job里。可以通过执行job来进行导入功能
[root@test01 ~]# sqoop job --create job1 \
-- import \
--connect jdbc:mysql://test03:3306/sz2103 \
--username root \
--password @Mmforu45 \
--table emp \
--target-dir "/sqoop/emp_job" \
--delete-target-dir \
--fields-terminated-by "\t" \
--where "empno<7700" \
-m 1
注意事项:
–1. 创建job时,一定要添加import参数,而且要与前面的–之间有一个空格
–2. 如果报以下错误:
ERROR sqoop.Sqoop: Got exception running Sqoop: java.lang.NullPointerException
java.lang.NullPointerException at org.json.JSONObject.(JSONObject.java:144)
需要将java-json.jar包导入到sqoop的lib目录下
–3. 如果第一次创建时,失败的话,可能会产生jobid. 需要删除后再执行
2)查询所有的job
[root@test01 ~]# sqoop job --list
3)执行job
小贴士:会提示输入mysql的密码, 虽然job里有密码。
[root@test01 ~]# sqoop job --exec job1
4)写一个定时脚本执行,规定中午12点整和半夜12整执行
[root@test01 ~]# crontab -e
0 12,0 * * * /usr/local/sqoop/bin/sqoop job --exec job1
4.2.4 案例演示2
1)创建一个带有增量导入的job
[root@test01 ~]# sqoop job \
--create job2 \
-- import \
--connect jdbc:mysql://test03:3306/sz2103 \
--username root \
--password @Mmforu45 \
--table emp \
--target-dir "/sqoop/emp_hdfs" \
--fields-terminated-by "\t" \
--incremental append \
--check-column "empno" \
-m 1
2) 查看指定job
[root@test01 ~]# sqoop job --show job2
3) 执行job
[root@test01 ~]# sqoop job --exec job2
4)在mysql的emp表中插入三条新记录
insert into emp (empno,ename,job,deptno) values (10001,'zhangsan','clerk',10);
insert into emp (empno,ename,job,deptno) values (10002,'lisi','clerk',10);
insert into emp (empno,ename,job,deptno) values (10003,'wangwu','clerk',10);
5)再次执行job2
[root@test01 ~]# sqoop job --exec job2
会看到之导入了大于10000的三条记录。last-value变成了10003
4.3 metastore的应用
4.3.1 metastore的介绍
1. metastore服务是sqoop的元数据服务,用于存储sqoop的job相关信息
2. 默认会使用内嵌的元数据库,存储位置位于~/.sqoop/下
3. 由于安全性问题,将job信息保存于关系型数据库中是最合适的,关系型数据库可以选择mysql
4.3.2 配置metastore服务项
步骤1)修改用户自定义配置文件
[root@test01 conf]# vim sqoop-site.xml
<configuration>
<!-- 是否要启动客户端自动连接元数据库 -->
<property>
<name>sqoop.metastore.client.enable.autoconnect</name>
<value>true</value>
</property>
<!-- 连接元数据库的url -->
<property>
<name>sqoop.metastore.client.autoconnect.url</name>
<value>jdbc:mysql://test03:3306/sqoop</value>
</property>
<!-- 连接元数据库的username -->
<property>
<name>sqoop.metastore.client.autoconnect.username</name>
<value>root</value>
</property>
<!-- 连接元数据库的密码 -->
<property>
<name>sqoop.metastore.client.autoconnect.password</name>
<value>@Mmforu45</value>
</property>
<!-- 是否要记住密码 -->
<property>
<name>sqoop.metastore.client.record.password</name>
<value>true</value>
</property>
<property>
<name>sqoop.metastore.server.location</name>
<value>/usr/local/sqoop/sqoop-metastore/shared.db</value>
</property>
<property>
<name>sqoop.metastore.server.port</name>
<value>16000</value>
</property>
</configuration>
步骤2)在mysql中创建sqoop数据库
mysql> create database sqoop;
步骤3)启动sqoop的服务项
[root@test01 ~]# sqoop metastore
或者后台启动
[root@test01 ~]# nohup sqoop metastore &
步骤4)在sqoop库中要维护一张表SQOOP_ROOT
#方式1:可以使用sqoop的某一个语句进行连接,这样就可以创建此表
执行如下语句即可
sqoop job --list --meta-connect 'jdbc:mysql://test03:3306/sqoop?user=root&password=@Mmforu45'
执行后,SQOOP_ROOT表创建成功, 有可能报错,但是表会创建成功。
#方式2)手动维护此表
CREATE TABLE `SQOOP_ROOT` (
`version` int(11) DEFAULT NULL,
`propname` varchar(128) NOT NULL,
`propval` varchar(256) DEFAULT NULL,
UNIQUE KEY `SQOOP_ROOT_unq` (`version`,`propname`)
) ENGINE=InnoDB DEFAULT CHARSET=latin1
步骤5)需要向SQOOP_ROOT中添加一条记录
insert into SQOOP_ROOT values (NULL, 'sqoop.hsqldb.job.storage.version', '0');
步骤6)修改两张表的存储引擎为myisam(innoDB引擎会引起事务锁超时情况)
alter table SQOOP_ROOT engine=myisam;
alter table SQOOP_SESSIONS engine=myisam;
如果执行上述语句时,报错,那么就再次执行一次:
sqoop job --list --meta-connect 'jdbc:mysql://test03:3306/sqoop?user=root&password=@Mmforu45'
然后再修改存储引擎
4.3.3 案例演示1
1)创建一个job
[root@test01 ~]# sqoop job --create job1 \
-- import \
--connect jdbc:mysql://test03:3306/sz2103 \
--username root \
--password @Mmforu45 \
--table emp \
--target-dir "/sqoop/emp_job" \
--delete-target-dir \
--fields-terminated-by "\t" \
--where "empno<7700" \
-m 1
2)查询metastore里保存的job
sqoop job --list \
--meta-connect 'jdbc:mysql://test03:3306/sqoop?user=root&password=@Mmforu45'
小贴士:由于sqoop-site.xml里的自动连接属性为true,所以可以不添加–meta-connect参数
3)执行metastore里保存的job
[root@test01 ~]# sqoop job --exec job1
4.3.4 案例演示2
1)创建增量导入的job
[root@qianfeng01 ~]# sqoop job --create job2 \
-- import \
--connect jdbc:mysql://test03:3306/sz2103 \
--username root \
--password @Mmforu45 \
--table emp \
--target-dir "/sqoop/emp_me" \
--fields-terminated-by "\t" \
--incremental append \
--check-column "empno" \
-m 1
注意:第一次创建增量job时,last-value参数可以不指定,默认导入全表的数据,然后会自动维护last-value的值
2)执行
sqoop job --exec job2
注意:想要使用metastore保存增量导入的job,那么SQOOP_SESSIONS的存储引擎必须是myisam。否则在执行job,想要保存last-value时,会失败
3)在mysql的emp表中插入两条新数据
insert into emp (empno,ename,job,deptno) values (10004,'lisi','clerk',10);
insert into emp (empno,ename,job,deptno) values (10005,'wangwu','clerk',10);
4)再次执行job2
sqoop job --exec job2
验证方式:
1. 可以查看hdfs上的目录的数据文件是否新增
2. 可以去mysql的sqoop库下查看SQOOP_SESSIONS里的数据
4.4 sqoop的优化
1. mapTask的数量不能超过yarn的Vcore的数目,默认情况下yarn的VCORE的数量是8
2. 如果数据量比较小,在200M以内,建议使用一个MapTask
3. split-by指定的字段最好是均分分布的数字类型或者是时间类型。
4. --fetch-size的数目的数据总大小最好在128M以内。