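1. HDFS replication factor
dfs.replication (HDFS)
Number of replicas kept for each file. It is usually set to 3, and changing it is not recommended.
If a test environment has only two virtual machines (2 DataNodes), change this value to 2.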
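2. YARN basic configuration
2.1 NodeManager configuration
2.1.1 CPU configuration
Setting: yarn.nodemanager.resource.cpu-vcores
The number of virtual CPU cores that YARN may use on this node. The default is 8, and the recommended value is the number of physical CPU threads. If the node has fewer than 8 cores, lower this value, because by default YARN does not detect the physical core count on its own.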
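2.1.2 Memory configuration
Setting: yarn.nodemanager.resource.memory-mb
The total memory on this NodeManager node that can be allocated to containers. The default is 8 GB. If the node has less than 8 GB of memory, lower this value, because by default YARN does not detect the available memory on its own; it is normally set according to the memory left free on the server. In production, a common rule of thumb is to reserve 15-20% of memory, so the usable memory is the physical memory * 0.8. For example, with 64 GB of physical memory, 64 * 0.8 = 51.2 GB, so setting 50 GB is enough (a fixed empirical value).
Note: set yarn.scheduler.maximum-allocation-mb to the same value, and at the same time change yarn.app.mapreduce.am.command-opts (the JVM memory) to a slightly smaller value (in the form -Xmx1024m).
That is:
yarn.nodemanager.resource.memory-mb
yarn.scheduler.maximum-allocation-mb : same as the first setting
yarn.app.mapreduce.am.command-opts : slightly smaller than the first setting (about 0.9 of it)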
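The 80% rule above can be turned into a small calculation. The following is a minimal sketch, assuming the physical memory is given in whole GB; nm_memory_mb is a hypothetical helper name, not part of Hadoop. It only prints the raw 80% figure in MB, which can then be rounded down further to a convenient value as the text does (51.2 GB to 50 GB).

```shell
#!/bin/bash
# Sketch only: derive a candidate yarn.nodemanager.resource.memory-mb value
# from the physical memory of a node, keeping 20% in reserve.
# nm_memory_mb is a hypothetical helper name, not a Hadoop tool.
nm_memory_mb() {
  local total_gb=$1
  # 80% of physical memory, converted from GB to MB (integer arithmetic).
  echo $(( total_gb * 1024 * 80 / 100 ))
}

# Example with the 64 GB server used in the text.
nm_memory_mb 64
```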
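2.1.3 Local directories
yarn.nodemanager.local-dirs (YARN)
The list of directories on the local file system where the NodeManager stores intermediate data files.
If a server has several disks mounted, the value should list directories spread across those disks, so that the node's full I/O capacity is used.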
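As an illustration of spreading the directories over several disks, the sketch below builds a comma-separated value from a list of mount points. The mount points (/data1, /data2, /data3) and the yarn/nm suffix are hypothetical and only stand in for whatever layout the servers actually use.

```shell
#!/bin/bash
# Sketch only: build a comma-separated value for yarn.nodemanager.local-dirs
# with one directory per mounted disk.
# join_local_dirs, the mount points and the yarn/nm suffix are hypothetical.
join_local_dirs() {
  local result="" mount
  for mount in "$@"; do
    # Append "<mount>/yarn/nm", inserting a comma between entries.
    result="${result:+${result},}${mount}/yarn/nm"
  done
  echo "${result}"
}

join_local_dirs /data1 /data2 /data3
```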
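3. MapReduce memory configuration
When a MapReduce job runs out of memory, these settings can be adjusted to match the server configuration.
1.mapreduce.map.memory.mb: how much memory one map task occupies while a MapReduce job runs
The amount of physical memory (MiB) allocated to each Map task of a job. The default is 0, which means the size is determined automatically.
2.mapreduce.reduce.memory.mb: how much memory one reduce task occupies while a MapReduce job runs
The amount of physical memory (MiB) allocated to each Reduce task of a job. The default is 0, which means the size is determined automatically.
3.mapreduce.reduce.java.opts: how much memory the JVM of one reduce task occupies while a MapReduce job runs
JVM options for the Map and Reduce tasks; mapreduce.map.java.opts is the Map-side counterpart.
Note:
mapreduce.map.java.opts must be smaller than mapreduce.map.memory.mb;
mapreduce.reduce.java.opts must be smaller than mapreduce.reduce.memory.mb. The format is -Xmx4096m.
Note:
None of the settings in this section may exceed the YARN NodeManager memory configuration.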
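To keep the JVM heap below the container size, the -Xmx value can be derived from the container memory. This is a minimal sketch; the 80% ratio is an assumption chosen for illustration (the text only requires the heap to be smaller than the container), and container_heap_opts is a hypothetical helper name.

```shell
#!/bin/bash
# Sketch only: derive a -Xmx value that stays below the container memory.
# container_heap_opts is a hypothetical helper; the 80% ratio is an assumption.
container_heap_opts() {
  local container_mb=$1
  # Leave 20% of the container for non-heap JVM memory.
  echo "-Xmx$(( container_mb * 80 / 100 ))m"
}

# Example: a 4096 MB value for mapreduce.reduce.memory.mb.
container_heap_opts 4096
```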
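4. Hive basic configuration
4.1 HiveServer2 memory size
Setting: Java Heap Size of HiveServer2 in Bytes
Note: when HiveServer2 exits abnormally and connections start to fail, this setting is the cause (the heap is too small).
4.2 Number of threads for loading dynamic partitions
Setting: hive.load.dynamic.partitions.thread
The number of threads used to load dynamically generated partitions. Loading requires renaming each file to its final location and updating some metadata about the new partition. The default is 15.
When there are many dynamically generated partitions, increasing this value can improve performance. Adjust it to the server configuration.
4.3 Number of threads for listing input files
hive.exec.input.listing.max.threads
The maximum number of threads Hive uses to list input files. Default: 15.
When a large number of partitions has to be read, increasing this value can improve performance. Adjust it to the server configuration.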
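5. Compression configuration
5.1 Map output compression
Besides specifying compression for stored data when a table is created, the Map output can also be compressed during query processing. Because map task output has to be written to disk and sent over the network to the reducer nodes, a fast codec such as LZO, LZ4 or Snappy can improve performance, since less data has to be transferred.
MapReduce settings:
mapreduce.map.output.compress
Whether to enable map output compression. The default is false. Set it to true when network transfer needs to be reduced.
mapreduce.map.output.compress.codec
The codec for map output compression. The default is org.apache.hadoop.io.compress.DefaultCodec; SnappyCodec is recommended: org.apache.hadoop.io.compress.SnappyCodec.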
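5.2 Reduce output compression
Whether to compress the job output; the default is false. Compressing the data both reduces the storage space the files take and speeds up transfer between nodes on the network.
Settings:
mapreduce.output.fileoutputformat.compress
Whether to enable compression of MapReduce job output.
mapreduce.output.fileoutputformat.compress.codec
The compression codec to use; SnappyCodec is recommended.
mapreduce.output.fileoutputformat.compress.type
The compression type for MapReduce job output. The default is RECORD; the possible values are NONE, RECORD and BLOCK. BLOCK is recommended: it compresses a group of records in one batch, which is more efficient.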
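5.3 General compression settings for Hive execution
These mainly cover the codec and the compression type:
mapreduce.output.fileoutputformat.compress.codec (YARN)
The compression codec used for job output. The default is org.apache.hadoop.io.compress.DefaultCodec;
SnappyCodec is recommended: org.apache.hadoop.io.compress.SnappyCodec
mapreduce.output.fileoutputformat.compress.type
The compression type for the data a job writes out. The default is RECORD; the possible values are NONE, RECORD and BLOCK. BLOCK is recommended: it compresses a group of records in one batch, which is more efficient.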
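5.4 Compressing intermediate data between Hive MapReduce jobs
Controls whether the intermediate files that Hive generates between multiple map-reduce jobs are compressed. The codec and other options are taken from the general Hive compression settings above, mapreduce.output.fileoutputformat.compress.*.
set hive.exec.compress.intermediate=true;
5.5 Compressing Hive final output
Controls whether the final output of a query (to a local/HDFS file or a Hive table) is compressed. The codec and other options are taken from the general Hive compression settings above, mapreduce.output.fileoutputformat.compress.*.
set hive.exec.compress.output=true;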
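6. Full-load pipeline
OLTP source data (MySQL) -> data collection (ODS) -> cleansing and transformation (DWD) -> statistical analysis (DWS) -> export to OLAP (MySQL)
7. Operations
7.1. Business MySQL -> hive_ODS
code:
# Main visit/consult table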
sqoop import \
--connect jdbc:mysql://192.168.52.150:3306/nev \
--username root \
--password 123456 \
--query 'SELECT
id,create_date_time,session_id,sid,create_time,seo_source,seo_keywords,ip,
AREA,country,province,city,origin_channel,USER AS user_match,manual_time,begin_time,end_time,
last_customer_msg_time_stamp,last_agent_msg_time_stamp,reply_msg_count,
msg_count,browser_name,os_info, "2021-09-24" AS starts_time
FROM web_chat_ems_2019_07 where 1=1 and $CONDITIONS' \
--hcatalog-database itcast_ods \
--hcatalog-table web_chat_ems \
-m 1
# Auxiliary visit/consult table
sqoop import \
--connect jdbc:mysql://192.168.52.150:3306/nev \
--username root \
--password 123456 \
--query 'SELECT
*, "2021-09-24" AS start_time
FROM web_chat_text_ems_2019_07 where 1=1 and $CONDITIONS' \
--hcatalog-database itcast_ods \
--hcatalog-table web_chat_text_ems \
-m 1
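Verification:
Check how many rows MySQL holds in total
SELECT COUNT(1) FROM web_chat_ems_2019_07;
SELECT COUNT(1) FROM web_chat_text_ems_2019_07;
Query the tables in Hive to see how many rows they hold in total
SELECT COUNT(1) FROM itcast_ods.web_chat_ems;
SELECT COUNT(1) FROM itcast_ods.web_chat_text_ems;
Query a sample of the data and check that the columns are mapped correctly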
select * from itcast_ods.web_chat_ems limit 10;
SELECT * FROM itcast_ods.web_chat_text_ems limit 10;
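The count comparison can be scripted so that a mismatch is flagged automatically. The sketch below is an assumption about how such a check might look: check_counts is a hypothetical helper, and the row counts passed to it are made-up sample numbers, not results from the actual tables. In a real run, the two numbers would come from the COUNT(1) queries above.

```shell
#!/bin/bash
# Sketch only: compare the row count in MySQL with the row count in Hive.
# check_counts is a hypothetical helper; the numbers used below are sample values.
check_counts() {
  local table=$1 mysql_count=$2 hive_count=$3
  if [ "${mysql_count}" -eq "${hive_count}" ]; then
    echo "${table}: OK (${mysql_count} rows)"
  else
    echo "${table}: MISMATCH (mysql=${mysql_count}, hive=${hive_count})"
    return 1
  fi
}

# The counts could be captured with the clients, for example:
#   mysql_count=$(mysql -N -e "SELECT COUNT(1) FROM nev.web_chat_ems_2019_07")
#   hive_count=$(hive -S -e "SELECT COUNT(1) FROM itcast_ods.web_chat_ems")
check_counts web_chat_ems 1000 1000
```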
7.2. ODS -> DWD
Data cleansing and transformation, plus a small amount of dimension degeneration
Transformation: convert create_time into yearinfo, quarterinfo, monthinfo, dayinfo and hourinfo:
Approach: use year(), quarter(), month(), day() and hour()
select year('2019-07-01 23:45:00'); -- 2019
select month('2019-07-01 23:45:00'); -- 7
select day('2019-07-01 23:45:00'); -- 1
select hour('2019-07-01 23:45:00'); -- 23
select quarter('2019-07-01 23:45:00'); -- 3
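The quarter is simply the month mapped through (month - 1) / 3 + 1 with integer division, which is useful when the same value has to be computed outside Hive, for example in a shell script. A minimal sketch follows; quarter_of_month is a hypothetical helper name.

```shell
#!/bin/bash
# Sketch only: compute the quarter (1-4) for a month number.
# quarter_of_month is a hypothetical helper name.
quarter_of_month() {
  # 10# forces base 10 so that "08" and "09" are not read as octal.
  local month=$(( 10#$1 ))
  echo $(( (month - 1) / 3 + 1 ))
}

# July belongs to the third quarter, matching quarter('2019-07-01 23:45:00').
quarter_of_month 07
```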
code:
select
wce.session_id,
wce.sid,
unix_timestamp(wce.create_time) as create_time,
wce.seo_source,
wce.ip,
wce.area,
wce.msg_count,
wce.origin_channel,
wcte.referrer,
wcte.from_url,
wcte.landing_page_url,
wcte.url_title,
wcte.platform_description,
wcte.other_params,
wcte.history,
substr(wce.create_time,12,2) as hourinfo,
substr(wce.create_time,1,4) as yearinfo,
quarter(wce.create_time) as quarterinfo,
substr(wce.create_time,6,2) as monthinfo,
substr(wce.create_time,9,2) as dayinfo
from itcast_ods.web_chat_ems wce join itcast_ods.web_chat_text_ems wcte
on wce.id = wcte.id;
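Error: Error while processing statement: FAILED: Execution Error, return code 137 from org.apache.hadoop.hive.ql.exec.mr.MapredLocalTask
Cause: the transformation joins two tables, and one of them is fairly small, so Hive optimizes the query into a map join. A map join has to load the small table into memory, but there was not enough memory, which produced an out-of-memory error.
Fix:
Disable map join so that Hive falls back to a reduce join
Command:
set hive.auto.convert.join=false;
Loading into DWD: partition problems can occur here, so dynamic partitioning is used
The settings are as follows: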
-- Dynamic partition settings
set hive.exec.dynamic.partition=true;
set hive.exec.dynamic.partition.mode=nonstrict;
-- Hive compression
set hive.exec.compress.intermediate=true;
set hive.exec.compress.output=true;
-- Make compression take effect on write
set hive.exec.orc.compression.strategy=COMPRESSION;
insert into table itcast_dwd.visit_consult_dwd partition(yearinfo,quarterinfo,monthinfo,dayinfo)
select
wce.session_id,
wce.sid,
unix_timestamp(wce.create_time) as create_time,
wce.seo_source,
wce.ip,
wce.area,
wce.msg_count,
wce.origin_channel,
wcte.referrer,
wcte.from_url,
wcte.landing_page_url,
wcte.url_title,
wcte.platform_description,
wcte.other_params,
wcte.history,
substr(wce.create_time,12,2) as hourinfo,
substr(wce.create_time,1,4) as yearinfo,
quarter(wce.create_time) as quarterinfo,
substr(wce.create_time,6,2) as monthinfo,
substr(wce.create_time,9,2) as dayinfo
from itcast_ods.web_chat_ems wce join itcast_ods.web_chat_text_ems wcte
on wce.id = wcte.id;
7.3. DWD -> DWS
Role of the DWS layer: statistics along finer-grained dimensions
7.3.1 Total visits, by time
-- Total visits per year
insert into table itcast_dws.visit_dws partition(yearinfo,quarterinfo,monthinfo,dayinfo)
select
count(distinct sid) as sid_total,
count(distinct session_id) as sessionid_total,
count(distinct ip) as ip_total,
'-1' as area,
'-1' as seo_source,
'-1' as origin_channel,
'-1' as hourinfo,
yearinfo as time_str,
'-1' as from_url,
'5' as grouptype,
'5' as time_type,
yearinfo,
'-1' as quarterinfo,
'-1' as monthinfo,
'-1' as dayinfo
from itcast_dwd.visit_consult_dwd
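group by yearinfo;
Explanation:
- Partition: specifies the partition columns yearinfo, quarterinfo, monthinfo and dayinfo.
- Select: picks the values to compute, namely the distinct counts of user ID (sid), session ID and IP address, plus some fixed default values.
- From: the data comes from the itcast_dwd.visit_consult_dwd table.
- Group By: groups by yearinfo to get the total visits per year.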
-- Total visits per year and quarter
insert into table itcast_dws.visit_dws partition(yearinfo,quarterinfo,monthinfo,dayinfo)
select
count(distinct sid) as sid_total,
count(distinct session_id) as sessionid_total,
count(distinct ip) as ip_total,
'-1' as area,
'-1' as seo_source,
'-1' as origin_channel,
'-1' as hourinfo,
concat(yearinfo,'_',quarterinfo) as time_str,
'-1' as from_url,
'5' as grouptype,
'4' as time_type,
yearinfo,
quarterinfo,
'-1' as monthinfo,
'-1' as dayinfo
from itcast_dwd.visit_consult_dwd
group by yearinfo,quarterinfo;
-- Total visits per year, quarter and month
insert into table itcast_dws.visit_dws partition(yearinfo,quarterinfo,monthinfo,dayinfo)
select
count(distinct sid) as sid_total,
count(distinct session_id) as sessionid_total,
count(distinct ip) as ip_total,
'-1' as area,
'-1' as seo_source,
'-1' as origin_channel,
'-1' as hourinfo,
concat(yearinfo,'-',monthinfo) as time_str,
'-1' as from_url,
'5' as grouptype,
'3' as time_type,
yearinfo,
quarterinfo,
monthinfo,
'-1' as dayinfo
from itcast_dwd.visit_consult_dwd
group by yearinfo,quarterinfo,monthinfo;
-- Total visits per year, quarter, month and day
insert into table itcast_dws.visit_dws partition(yearinfo,quarterinfo,monthinfo,dayinfo)
select
count(distinct sid) as sid_total,
count(distinct session_id) as sessionid_total,
count(distinct ip) as ip_total,
'-1' as area,
'-1' as seo_source,
'-1' as origin_channel,
'-1' as hourinfo,
concat(yearinfo,'-',monthinfo,'-',dayinfo) as time_str,
'-1' as from_url,
'5' as grouptype,
'2' as time_type,
yearinfo,
quarterinfo,
monthinfo,
dayinfo
from itcast_dwd.visit_consult_dwd
group by yearinfo,quarterinfo,monthinfo,dayinfo;
-- Total visits per year, quarter, month, day and hour
insert into table itcast_dws.visit_dws partition(yearinfo,quarterinfo,monthinfo,dayinfo)
select
count(distinct sid) as sid_total,
count(distinct session_id) as sessionid_total,
count(distinct ip) as ip_total,
'-1' as area,
'-1' as seo_source,
'-1' as origin_channel,
hourinfo,
concat(yearinfo,'-',monthinfo,'-',dayinfo,' ',hourinfo) as time_str,
'-1' as from_url,
'5' as grouptype,
'1' as time_type,
yearinfo,
quarterinfo,
monthinfo,
dayinfo
from itcast_dwd.visit_consult_dwd
group by yearinfo,quarterinfo,monthinfo,dayinfo,hourinfo;
7.3.2 Visits per visited page, by time
-- Visits per visited page, per year
insert into table itcast_dws.visit_dws partition(yearinfo,quarterinfo,monthinfo,dayinfo)
select
count(distinct sid) as sid_total,
count(distinct session_id) as sessionid_total,
count(distinct ip) as ip_total,
'-1' as area,
'-1' as seo_source,
'-1' as origin_channel,
'-1' as hourinfo,
yearinfo as time_str,
from_url,
'4' as grouptype,
'5' as time_type,
yearinfo,
'-1' as quarterinfo,
'-1' as monthinfo,
'-1' as dayinfo
from itcast_dwd.visit_consult_dwd
group by yearinfo,from_url;
-- Visits per visited page, per year and quarter
insert into table itcast_dws.visit_dws partition(yearinfo,quarterinfo,monthinfo,dayinfo)
select
count(distinct sid) as sid_total,
count(distinct session_id) as sessionid_total,
count(distinct ip) as ip_total,
'-1' as area,
'-1' as seo_source,
'-1' as origin_channel,
'-1' as hourinfo,
concat(yearinfo,'_',quarterinfo) as time_str,
from_url,
'4' as grouptype,
'4' as time_type,
yearinfo,
quarterinfo,
'-1' as monthinfo,
'-1' as dayinfo
from itcast_dwd.visit_consult_dwd
group by yearinfo,quarterinfo,from_url;
-- Visits per visited page, per year, quarter and month
insert into table itcast_dws.visit_dws partition(yearinfo,quarterinfo,monthinfo,dayinfo)
select
count(distinct sid) as sid_total,
count(distinct session_id) as sessionid_total,
count(distinct ip) as ip_total,
'-1' as area,
'-1' as seo_source,
'-1' as origin_channel,
'-1' as hourinfo,
concat(yearinfo,'-',monthinfo) as time_str,
from_url,
'4' as grouptype,
'3' as time_type,
yearinfo,
quarterinfo,
monthinfo,
'-1' as dayinfo
from itcast_dwd.visit_consult_dwd
group by yearinfo,quarterinfo,monthinfo,from_url;
-- Visits per visited page, per year, quarter, month and day
insert into table itcast_dws.visit_dws partition(yearinfo,quarterinfo,monthinfo,dayinfo)
select
count(distinct sid) as sid_total,
count(distinct session_id) as sessionid_total,
count(distinct ip) as ip_total,
'-1' as area,
'-1' as seo_source,
'-1' as origin_channel,
'-1' as hourinfo,
concat(yearinfo,'-',monthinfo,'-',dayinfo) as time_str,
from_url,
'4' as grouptype,
'2' as time_type,
yearinfo,
quarterinfo,
monthinfo,
dayinfo
from itcast_dwd.visit_consult_dwd
group by yearinfo,quarterinfo,monthinfo,dayinfo,from_url;
-- Visits per visited page, per year, quarter, month, day and hour
insert into table itcast_dws.visit_dws partition(yearinfo,quarterinfo,monthinfo,dayinfo)
select
count(distinct sid) as sid_total,
count(distinct session_id) as sessionid_total,
count(distinct ip) as ip_total,
'-1' as area,
'-1' as seo_source,
'-1' as origin_channel,
hourinfo,
concat(yearinfo,'-',monthinfo,'-',dayinfo,' ',hourinfo) as time_str,
from_url,
'4' as grouptype,
'1' as time_type,
yearinfo,
quarterinfo,
monthinfo,
dayinfo
from itcast_dwd.visit_consult_dwd
group by yearinfo,quarterinfo,monthinfo,dayinfo,hourinfo,from_url;
7.3.3 Total consults, by time
-- Total consults per year
insert into table itcast_dws.consult_dws partition(yearinfo,quarterinfo,monthinfo,dayinfo)
select
count(distinct sid) as sid_total,
count(distinct session_id) as sessionid_total,
count(distinct ip) as ip_total,
'-1' as area,
'-1' as origin_channel,
'-1' as hourinfo,
yearinfo as time_str,
'3' as grouptype,
'5' as time_type,
yearinfo,
'-1' as quarterinfo,
'-1' as monthinfo,
'-1' as dayinfo
from itcast_dwd.visit_consult_dwd where msg_count >= 1
group by yearinfo;
-- Total consults per year and quarter
insert into table itcast_dws.consult_dws partition(yearinfo,quarterinfo,monthinfo,dayinfo)
select
count(distinct sid) as sid_total,
count(distinct session_id) as sessionid_total,
count(distinct ip) as ip_total,
'-1' as area,
'-1' as origin_channel,
'-1' as hourinfo,
concat(yearinfo,'_',quarterinfo) as time_str,
'3' as grouptype,
'4' as time_type,
yearinfo,
quarterinfo,
'-1' as monthinfo,
'-1' as dayinfo
from itcast_dwd.visit_consult_dwd where msg_count >= 1
group by yearinfo,quarterinfo;
-- Total consults per year, quarter and month
insert into table itcast_dws.consult_dws partition(yearinfo,quarterinfo,monthinfo,dayinfo)
select
count(distinct sid) as sid_total,
count(distinct session_id) as sessionid_total,
count(distinct ip) as ip_total,
'-1' as area,
'-1' as origin_channel,
'-1' as hourinfo,
concat(yearinfo,'-',monthinfo) as time_str,
'3' as grouptype,
'3' as time_type,
yearinfo,
quarterinfo,
monthinfo,
'-1' as dayinfo
from itcast_dwd.visit_consult_dwd where msg_count >= 1
group by yearinfo,quarterinfo,monthinfo;
-- Total consults per year, quarter, month and day
insert into table itcast_dws.consult_dws partition(yearinfo,quarterinfo,monthinfo,dayinfo)
select
count(distinct sid) as sid_total,
count(distinct session_id) as sessionid_total,
count(distinct ip) as ip_total,
'-1' as area,
'-1' as origin_channel,
'-1' as hourinfo,
concat(yearinfo,'-',monthinfo,'-',dayinfo) as time_str,
'3' as grouptype,
'2' as time_type,
yearinfo,
quarterinfo,
monthinfo,
dayinfo
from itcast_dwd.visit_consult_dwd where msg_count >= 1
group by yearinfo,quarterinfo,monthinfo,dayinfo;
-- Total consults per year, quarter, month, day and hour
insert into table itcast_dws.consult_dws partition(yearinfo,quarterinfo,monthinfo,dayinfo)
select
count(distinct sid) as sid_total,
count(distinct session_id) as sessionid_total,
count(distinct ip) as ip_total,
'-1' as area,
'-1' as origin_channel,
hourinfo,
concat(yearinfo,'-',monthinfo,'-',dayinfo,' ',hourinfo) as time_str,
'3' as grouptype,
'1' as time_type,
yearinfo,
quarterinfo,
monthinfo,
dayinfo
from itcast_dwd.visit_consult_dwd where msg_count >= 1
group by yearinfo,quarterinfo,monthinfo,dayinfo,hourinfo;
7.3.4 Consults per region, by time
-- Consults per region, per year
insert into table itcast_dws.consult_dws partition(yearinfo,quarterinfo,monthinfo,dayinfo)
select
count(distinct sid) as sid_total,
count(distinct session_id) as sessionid_total,
count(distinct ip) as ip_total,
area,
'-1' as origin_channel,
'-1' as hourinfo,
yearinfo as time_str,
'1' as grouptype,
'5' as time_type,
yearinfo,
'-1' as quarterinfo,
'-1' as monthinfo,
'-1' as dayinfo
from itcast_dwd.visit_consult_dwd where msg_count >= 1
group by yearinfo,area;
-- Consults per region, per year and quarter
insert into table itcast_dws.consult_dws partition(yearinfo,quarterinfo,monthinfo,dayinfo)
select
count(distinct sid) as sid_total,
count(distinct session_id) as sessionid_total,
count(distinct ip) as ip_total,
area,
'-1' as origin_channel,
'-1' as hourinfo,
concat(yearinfo,'_',quarterinfo) as time_str,
'1' as grouptype,
'4' as time_type,
yearinfo,
quarterinfo,
'-1' as monthinfo,
'-1' as dayinfo
from itcast_dwd.visit_consult_dwd where msg_count >= 1
group by yearinfo,quarterinfo,area;
-- Consults per region, per year, quarter and month
insert into table itcast_dws.consult_dws partition(yearinfo,quarterinfo,monthinfo,dayinfo)
select
count(distinct sid) as sid_total,
count(distinct session_id) as sessionid_total,
count(distinct ip) as ip_total,
area,
'-1' as origin_channel,
'-1' as hourinfo,
concat(yearinfo,'-',monthinfo) as time_str,
'1' as grouptype,
'3' as time_type,
yearinfo,
quarterinfo,
monthinfo,
'-1' as dayinfo
from itcast_dwd.visit_consult_dwd where msg_count >= 1
group by yearinfo,quarterinfo,monthinfo,area;
-- Consults per region, per year, quarter, month and day
insert into table itcast_dws.consult_dws partition(yearinfo,quarterinfo,monthinfo,dayinfo)
select
count(distinct sid) as sid_total,
count(distinct session_id) as sessionid_total,
count(distinct ip) as ip_total,
area,
'-1' as origin_channel,
'-1' as hourinfo,
concat(yearinfo,'-',monthinfo,'-',dayinfo) as time_str,
'1' as grouptype,
'2' as time_type,
yearinfo,
quarterinfo,
monthinfo,
dayinfo
from itcast_dwd.visit_consult_dwd where msg_count >= 1
group by yearinfo,quarterinfo,monthinfo,dayinfo,area;
-- Consults per region, per year, quarter, month, day and hour
insert into table itcast_dws.consult_dws partition(yearinfo,quarterinfo,monthinfo,dayinfo)
select
count(distinct sid) as sid_total,
count(distinct session_id) as sessionid_total,
count(distinct ip) as ip_total,
area,
'-1' as origin_channel,
hourinfo,
concat(yearinfo,'-',monthinfo,'-',dayinfo,' ',hourinfo) as time_str,
'1' as grouptype,
'1' as time_type,
yearinfo,
quarterinfo,
monthinfo,
dayinfo
from itcast_dwd.visit_consult_dwd where msg_count >= 1
group by yearinfo,quarterinfo,monthinfo,dayinfo,hourinfo,area;
7.4 DWS -> MySQL
7.4.1 Create the target tables in MySQL
create database scrm_bi default character set utf8mb4 collate utf8mb4_general_ci;
-- Result table for visits:
CREATE TABLE IF NOT EXISTS scrm_bi.visit_dws (
sid_total INT COMMENT 'distinct count by sid',
sessionid_total INT COMMENT 'distinct count by sessionid',
ip_total INT COMMENT 'distinct count by IP',
area varchar(32) COMMENT 'region',
seo_source varchar(32) COMMENT 'search source',
origin_channel varchar(32) COMMENT 'origin channel',
hourinfo varchar(32) COMMENT 'creation time, aggregated to the hour',
time_str varchar(32) COMMENT 'time detail',
from_url varchar(32) comment 'session source page',
groupType varchar(32) COMMENT 'attribute type: 1. region; 2. search source; 3. origin channel; 4. session source page; 5. total visits',
time_type varchar(32) COMMENT 'time aggregation type: 1. by hour; 2. by day; 3. by month; 4. by quarter; 5. by year',
yearinfo varchar(32) COMMENT 'year' ,
quarterinfo varchar(32) COMMENT 'quarter',
monthinfo varchar(32) COMMENT 'month',
dayinfo varchar(32) COMMENT 'day'
)comment 'EMS visitor log DWS table';
-- Result table for consults:
CREATE TABLE IF NOT EXISTS scrm_bi.consult_dws
(
sid_total INT COMMENT 'distinct count by sid',
sessionid_total INT COMMENT 'distinct count by sessionid',
ip_total INT COMMENT 'distinct count by IP',
area varchar(32) COMMENT 'region',
origin_channel varchar(32) COMMENT 'origin channel',
hourinfo varchar(32) COMMENT 'creation time, aggregated to the hour',
time_str varchar(32) COMMENT 'time detail',
groupType varchar(32) COMMENT 'attribute type: 1. region; 2. origin channel',
time_type varchar(32) COMMENT 'time aggregation type: 1. by hour; 2. by day; 3. by month; 4. by quarter; 5. by year',
yearinfo varchar(32) COMMENT 'year' ,
quarterinfo varchar(32) COMMENT 'quarter',
monthinfo varchar(32) COMMENT 'month',
dayinfo varchar(32) COMMENT 'day'
)COMMENT 'consult DWS wide table';
7.4.2 DWS -> sqoop -> MySQL
Consult data
sqoop export \
--connect jdbc:mysql://192.168.52.150:3306/scrm_bi \
--username root \
--password 123456 \
--table consult_dws \
--hcatalog-database itcast_dws \
--hcatalog-table consult_dws \
-m 1
(With the character encoding specified)
sqoop export \
--connect "jdbc:mysql://192.168.52.150:3306/scrm_bi?useUnicode=true&characterEncoding=utf-8" \
--username root \
--password 123456 \
--table consult_dws \
--hcatalog-database itcast_dws \
--hcatalog-table consult_dws \
-m 1
Export of visit data
sqoop export \
--connect "jdbc:mysql://192.168.52.150:3306/scrm_bi?useUnicode=true&characterEncoding=utf-8" \
--username root \
--password 123456 \
--table visit_dws \
--hcatalog-database itcast_dws \
--hcatalog-table visit_dws \
-m 1
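8. Incremental pipeline
8.1 Data collection
The difference between incremental and full updates is that incremental updates follow the T+1 model: each run only analyzes one day of data.
- The SQL needs an extra where condition so that it only queries yesterday's data rather than the whole table.
- Sqoop partition selection: an incremental update only loads one day of data per run, so it can also be imported with static partitioning. This is done with the two parameters hive-partition-key and hive-partition-value. hive-partition-key names the partition column and hive-partition-value gives the partition value, which is yesterday's date (data collected at 00:00 today belongs to yesterday).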
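The extra where condition for a T+1 run is a one-day window on create_time. The following minimal sketch builds that condition for a given date; day_window is a hypothetical helper name, and the date is passed in explicitly here, whereas a scheduled job would normally derive yesterday's date itself.

```shell
#!/bin/bash
# Sketch only: build the one-day where condition used by an incremental run.
# day_window is a hypothetical helper name.
day_window() {
  local day=$1
  # Covers the whole day, from the first to the last second.
  echo "create_time BETWEEN '${day} 00:00:00' AND '${day} 23:59:59'"
}

# Example with a sample date.
day_window 2021-09-25
```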
(The following simulates one day of data in the source tables)
-- Simulate data for the main visit/consult table
CREATE TABLE web_chat_ems_2021_09 AS
SELECT * FROM web_chat_ems_2019_07 WHERE create_time BETWEEN '2019-07-01 00:00:00' AND '2019-07-01 23:59:59';
-- Update the time column in the table
UPDATE web_chat_ems_2021_09 SET create_time = CONCAT('2021-09-25 ',SUBSTR(create_time,12));
-- Simulate data for the auxiliary visit/consult table
CREATE TABLE web_chat_text_ems_2021_09 AS
SELECT
temp2.*
FROM
(SELECT * FROM web_chat_ems_2019_07
WHERE create_time BETWEEN '2019-07-01 00:00:00' AND '2019-07-01 23:59:59') temp1
JOIN web_chat_text_ems_2019_07 temp2 ON temp1.id = temp2.id ;
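-- Note: in production, each table would hold many days of data rather than just one,
-- and only the newly added day needs to be collected
Fetch the data for the newly added day
-- Main visit/consult table: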
SELECT
id,create_date_time,session_id,sid,create_time,seo_source,seo_keywords,ip,
AREA,country,province,city,origin_channel,USER AS user_match,manual_time,begin_time,end_time,
last_customer_msg_time_stamp,last_agent_msg_time_stamp,reply_msg_count,
msg_count,browser_name,os_info, '2021-09-26' AS starts_time
FROM web_chat_ems_2021_09 WHERE create_time BETWEEN '2021-09-25 00:00:00' AND '2021-09-25 23:59:59'
-- Auxiliary visit/consult table:
SELECT
temp2.*, '2021-09-26' AS start_time
FROM (SELECT id FROM web_chat_ems_2021_09 WHERE create_time BETWEEN '2021-09-25 00:00:00' AND '2021-09-25 23:59:59') temp1
JOIN web_chat_text_ems_2021_09 temp2 ON temp1.id = temp2.id
Integrate the incremental SQL into the incremental sqoop scripts
# Main visit/consult table
sqoop import \
--connect jdbc:mysql://192.168.52.150:3306/nev \
--username root \
--password 123456 \
--query "SELECT
id,create_date_time,session_id,sid,create_time,seo_source,seo_keywords,ip,
AREA,country,province,city,origin_channel,USER AS user_match,manual_time,begin_time,end_time,
last_customer_msg_time_stamp,last_agent_msg_time_stamp,reply_msg_count,
msg_count,browser_name,os_info, '2021-09-26' AS starts_time
FROM web_chat_ems_2021_09 WHERE create_time BETWEEN '2021-09-25 00:00:00' AND '2021-09-25 23:59:59' and \$CONDITIONS" \
--hcatalog-database itcast_ods \
--hcatalog-table web_chat_ems \
-m 1
# Auxiliary visit/consult table
sqoop import \
--connect jdbc:mysql://192.168.52.150:3306/nev \
--username root \
--password 123456 \
--query "SELECT
temp2.*, '2021-09-26' AS start_time
FROM (SELECT id FROM web_chat_ems_2021_09 WHERE create_time BETWEEN '2021-09-25 00:00:00' AND '2021-09-25 23:59:59') temp1
JOIN web_chat_text_ems_2021_09 temp2 ON temp1.id = temp2.id where 1=1 and \$CONDITIONS" \
--hcatalog-database itcast_ods \
--hcatalog-table web_chat_text_ems \
-m 1
Run this through a shell script that derives the previous day's date automatically, then place the script in Oozie for automated scheduling
shell:
#!/bin/bash
export SQOOP_HOME=/usr/bin/sqoop
if [ $# == 1 ]
then
dateStr=$1
else
dateStr=`date -d '-1 day' +'%Y-%m-%d'`
fi
dateNowStr=`date +'%Y-%m-%d'`
yearStr=`date -d ${dateStr} +'%Y'`
monthStr=`date -d ${dateStr} +'%m'`
jdbcUrl='jdbc:mysql://192.168.52.150:3306/nev'
username='root'
password='123456'
m='1'
${SQOOP_HOME} import \
--connect ${jdbcUrl} \
--username ${username} \
--password ${password} \
--query "SELECT
id,create_date_time,session_id,sid,create_time,seo_source,seo_keywords,ip,
AREA,country,province,city,origin_channel,USER AS user_match,manual_time,begin_time,end_time,
last_customer_msg_time_stamp,last_agent_msg_time_stamp,reply_msg_count,
msg_count,browser_name,os_info, '${dateNowStr}' AS starts_time
FROM web_chat_ems_${yearStr}_${monthStr} WHERE create_time BETWEEN '${dateStr} 00:00:00' AND '${dateStr} 23:59:59' and \$CONDITIONS" \
--hcatalog-database itcast_ods \
--hcatalog-table web_chat_ems \
-m ${m}
${SQOOP_HOME} import \
--connect ${jdbcUrl} \
--username ${username} \
--password ${password} \
--query "SELECT
temp2.*, '${dateNowStr}' AS start_time
FROM (SELECT id FROM web_chat_ems_${yearStr}_${monthStr} WHERE create_time BETWEEN '${dateStr} 00:00:00' AND '${dateStr} 23:59:59') temp1
JOIN web_chat_text_ems_${yearStr}_${monthStr} temp2 ON temp1.id = temp2.id where 1=1 and \$CONDITIONS" \
--hcatalog-database itcast_ods \
--hcatalog-table web_chat_text_ems \
-m ${m}
Test the script: sh edu_mode_1_collect.sh
Check the data under the corresponding partition in Hive; if rows are present, the extraction succeeded
select * from itcast_ods.web_chat_ems where starts_time='2021-09-26' limit 10;
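Finish the automated scheduling: configure the workflow
Configure the schedule and start it
8.2 Incremental data transformation
sql:
-- Dynamic partition settings
set hive.exec.dynamic.partition=true;
set hive.exec.dynamic.partition.mode=nonstrict;
-- Hive compression
set hive.exec.compress.intermediate=true;
set hive.exec.compress.output=true;
-- Make compression take effect on write
set hive.exec.orc.compression.strategy=COMPRESSION;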
insert into table itcast_dwd.visit_consult_dwd partition(yearinfo,quarterinfo,monthinfo,dayinfo)
select
wce.session_id,
wce.sid,
unix_timestamp(wce.create_time) as create_time,
wce.seo_source,
wce.ip,
wce.area,
wce.msg_count,
wce.origin_channel,
wcte.referrer,
wcte.from_url,
wcte.landing_page_url,
wcte.url_title,
wcte.platform_description,
wcte.other_params,
wcte.history,
substr(wce.create_time,12,2) as hourinfo,
substr(wce.create_time,1,4) as yearinfo,
quarter(wce.create_time) as quarterinfo,
substr(wce.create_time,6,2) as monthinfo,
substr(wce.create_time,9,2) as dayinfo
from (select * from itcast_ods.web_chat_ems where starts_time='2021-09-26') wce join (select * from itcast_ods.web_chat_text_ems where start_time='2021-09-26') wcte
on wce.id = wcte.id;
Shell script to run the cleansing:
#!/bin/bash
export HIVE_HOME=/usr/bin/hive
if [ $# == 1 ]
then
dateStr=$1
else
dateStr=`date +'%Y-%m-%d'`
fi
sqlStr="
set hive.exec.dynamic.partition=true;
set hive.exec.dynamic.partition.mode=nonstrict;
set hive.exec.compress.intermediate=true;
set hive.exec.compress.output=true;
set hive.exec.orc.compression.strategy=COMPRESSION;
insert into table itcast_dwd.visit_consult_dwd partition(yearinfo,quarterinfo,monthinfo,dayinfo)
select
wce.session_id,
wce.sid,
unix_timestamp(wce.create_time) as create_time,
wce.seo_source,
wce.ip,
wce.area,
wce.msg_count,
wce.origin_channel,
wcte.referrer,
wcte.from_url,
wcte.landing_page_url,
wcte.url_title,
wcte.platform_description,
wcte.other_params,
wcte.history,
substr(wce.create_time,12,2) as hourinfo,
substr(wce.create_time,1,4) as yearinfo,
quarter(wce.create_time) as quarterinfo,
substr(wce.create_time,6,2) as monthinfo,
substr(wce.create_time,9,2) as dayinfo
from (select * from itcast_ods.web_chat_ems where starts_time='${dateStr}') wce join (select * from itcast_ods.web_chat_text_ems where start_time='${dateStr}') wcte
on wce.id = wcte.id;
"
${HIVE_HOME} -e "${sqlStr}" -S
Verification: select * from itcast_dwd.visit_consult_dwd where yearinfo='2021' and monthinfo='09' and dayinfo='25';
Configure the workflow
8.3 Incremental SQL statistics
-- Visits: 5 statements
-- Total visits per year
insert into table itcast_dws.visit_dws partition(yearinfo,quarterinfo,monthinfo,dayinfo)
select
count(distinct sid) as sid_total,
count(distinct session_id) as sessionid_total,
count(distinct ip) as ip_total,
'-1' as area,
'-1' as seo_source,
'-1' as origin_channel,
'-1' as hourinfo,
yearinfo as time_str,
'-1' as from_url,
'5' as grouptype,
'5' as time_type,
yearinfo,
'-1' as quarterinfo,
'-1' as monthinfo,
'-1' as dayinfo
from itcast_dwd.visit_consult_dwd where yearinfo='2021'
group by yearinfo;
-- Total visits per year and quarter
insert into table itcast_dws.visit_dws partition(yearinfo,quarterinfo,monthinfo,dayinfo)
select
count(distinct sid) as sid_total,
count(distinct session_id) as sessionid_total,
count(distinct ip) as ip_total,
'-1' as area,
'-1' as seo_source,
'-1' as origin_channel,
'-1' as hourinfo,
concat(yearinfo,'_',quarterinfo) as time_str,
'-1' as from_url,
'5' as grouptype,
'4' as time_type,
yearinfo,
quarterinfo,
'-1' as monthinfo,
'-1' as dayinfo
from itcast_dwd.visit_consult_dwd where yearinfo='2021' and quarterinfo='3'
group by yearinfo,quarterinfo;
-- Total visits per year, quarter and month
insert into table itcast_dws.visit_dws partition(yearinfo,quarterinfo,monthinfo,dayinfo)
select
count(distinct sid) as sid_total,
count(distinct session_id) as sessionid_total,
count(distinct ip) as ip_total,
'-1' as area,
'-1' as seo_source,
'-1' as origin_channel,
'-1' as hourinfo,
concat(yearinfo,'-',monthinfo) as time_str,
'-1' as from_url,
'5' as grouptype,
'3' as time_type,
yearinfo,
quarterinfo,
monthinfo,
'-1' as dayinfo
from itcast_dwd.visit_consult_dwd where yearinfo='2021' and quarterinfo='3' and monthinfo='09'
group by yearinfo,quarterinfo,monthinfo;
-- Total visits per year, quarter, month and day
insert into table itcast_dws.visit_dws partition(yearinfo,quarterinfo,monthinfo,dayinfo)
select
count(distinct sid) as sid_total,
count(distinct session_id) as sessionid_total,
count(distinct ip) as ip_total,
'-1' as area,
'-1' as seo_source,
'-1' as origin_channel,
'-1' as hourinfo,
concat(yearinfo,'-',monthinfo,'-',dayinfo) as time_str,
'-1' as from_url,
'5' as grouptype,
'2' as time_type,
yearinfo,
quarterinfo,
monthinfo,
dayinfo
from itcast_dwd.visit_consult_dwd where yearinfo='2021' and quarterinfo='3' and monthinfo='09' and dayinfo='25'
group by yearinfo,quarterinfo,monthinfo,dayinfo;
-- Total visits per year, quarter, month, day and hour
insert into table itcast_dws.visit_dws partition(yearinfo,quarterinfo,monthinfo,dayinfo)
select
count(distinct sid) as sid_total,
count(distinct session_id) as sessionid_total,
count(distinct ip) as ip_total,
'-1' as area,
'-1' as seo_source,
'-1' as origin_channel,
hourinfo,
concat(yearinfo,'-',monthinfo,'-',dayinfo,' ',hourinfo) as time_str,
'-1' as from_url,
'5' as grouptype,
'1' as time_type,
yearinfo,
quarterinfo,
monthinfo,
dayinfo
from itcast_dwd.visit_consult_dwd where yearinfo='2021' and quarterinfo='3' and monthinfo='09' and dayinfo='25'
group by yearinfo,quarterinfo,monthinfo,dayinfo,hourinfo;
-- Consults: 5 statements
-- Consults per region, per year
insert into table itcast_dws.consult_dws partition(yearinfo,quarterinfo,monthinfo,dayinfo)
select
count(distinct sid) as sid_total,
count(distinct session_id) as sessionid_total,
count(distinct ip) as ip_total,
area,
'-1' as origin_channel,
'-1' as hourinfo,
yearinfo as time_str,
'1' as grouptype,
'5' as time_type,
yearinfo,
'-1' as quarterinfo,
'-1' as monthinfo,
'-1' as dayinfo
from itcast_dwd.visit_consult_dwd where msg_count >= 1 and yearinfo='2021'
group by yearinfo,area;
-- Consults per region, per year and quarter
insert into table itcast_dws.consult_dws partition(yearinfo,quarterinfo,monthinfo,dayinfo)
select
count(distinct sid) as sid_total,
count(distinct session_id) as sessionid_total,
count(distinct ip) as ip_total,
area,
'-1' as origin_channel,
'-1' as hourinfo,
concat(yearinfo,'_',quarterinfo) as time_str,
'1' as grouptype,
'4' as time_type,
yearinfo,
quarterinfo,
'-1' as monthinfo,
'-1' as dayinfo
from itcast_dwd.visit_consult_dwd where msg_count >= 1 and yearinfo='2021' and quarterinfo='3'
group by yearinfo,quarterinfo,area;
-- Consults per region, per year, quarter and month
insert into table itcast_dws.consult_dws partition(yearinfo,quarterinfo,monthinfo,dayinfo)
select
count(distinct sid) as sid_total,
count(distinct session_id) as sessionid_total,
count(distinct ip) as ip_total,
area,
'-1' as origin_channel,
'-1' as hourinfo,
concat(yearinfo,'-',monthinfo) as time_str,
'1' as grouptype,
'3' as time_type,
yearinfo,
quarterinfo,
monthinfo,
'-1' as dayinfo
from itcast_dwd.visit_consult_dwd where msg_count >= 1 and yearinfo='2021' and quarterinfo='3' and monthinfo='09'
group by yearinfo,quarterinfo,monthinfo,area;
-- Consults per region, per year, quarter, month and day
insert into table itcast_dws.consult_dws partition(yearinfo,quarterinfo,monthinfo,dayinfo)
select
count(distinct sid) as sid_total,
count(distinct session_id) as sessionid_total,
count(distinct ip) as ip_total,
area,
'-1' as origin_channel,
'-1' as hourinfo,
concat(yearinfo,'-',monthinfo,'-',dayinfo) as time_str,
'1' as grouptype,
'2' as time_type,
yearinfo,
quarterinfo,
monthinfo,
dayinfo
from itcast_dwd.visit_consult_dwd where msg_count >= 1 and yearinfo='2021' and quarterinfo='3' and monthinfo='09' and dayinfo='25'
group by yearinfo,quarterinfo,monthinfo,dayinfo,area;
-- Consults per region, per year, quarter, month, day and hour
insert into table itcast_dws.consult_dws partition(yearinfo,quarterinfo,monthinfo,dayinfo)
select
count(distinct sid) as sid_total,
count(distinct session_id) as sessionid_total,
count(distinct ip) as ip_total,
area,
'-1' as origin_channel,
hourinfo,
concat(yearinfo,'-',monthinfo,'-',dayinfo,' ',hourinfo) as time_str,
'1' as grouptype,
'1' as time_type,
yearinfo,
quarterinfo,
monthinfo,
dayinfo
from itcast_dwd.visit_consult_dwd where msg_count >= 1 and yearinfo='2021' and quarterinfo='3' and monthinfo='09' and dayinfo='25'
group by yearinfo,quarterinfo,monthinfo,dayinfo,hourinfo,area;
Write the shell script
#!/bin/bash
export HIVE_HOME=/usr/bin/hive
if [ $# == 1 ]
then
dateStr=$1
else
dateStr=`date -d '-1 day' +'%Y-%m-%d'`
fi
yearStr=`date -d ${dateStr} +'%Y'`
monthStr=`date -d ${dateStr} +'%m'`
month_for_quarter=`date -d ${dateStr} +'%-m'`
quarterStr=$((($month_for_quarter-1)/3+1))
dayStr=`date -d ${dateStr} +'%d'`
sqlStr="
set hive.exec.dynamic.partition=true;
set hive.exec.dynamic.partition.mode=nonstrict;
set hive.exec.compress.intermediate=true;
set hive.exec.compress.output=true;
set hive.exec.orc.compression.strategy=COMPRESSION;
alter table itcast_dws.visit_dws drop partition(yearinfo='${yearStr}',quarterinfo='-1',monthinfo='-1',dayinfo='-1');
alter table itcast_dws.visit_dws drop partition(yearinfo='${yearStr}',quarterinfo='${quarterStr}',monthinfo='-1',dayinfo='-1');
alter table itcast_dws.visit_dws drop partition(yearinfo='${yearStr}',quarterinfo='${quarterStr}',monthinfo='${monthStr}',dayinfo='-1');
alter table itcast_dws.consult_dws drop partition(yearinfo='${yearStr}',quarterinfo='-1',monthinfo='-1',dayinfo='-1');
alter table itcast_dws.consult_dws drop partition(yearinfo='${yearStr}',quarterinfo='${quarterStr}',monthinfo='-1',dayinfo='-1');
alter table itcast_dws.consult_dws drop partition(yearinfo='${yearStr}',quarterinfo='${quarterStr}',monthinfo='${monthStr}',dayinfo='-1');
insert into table itcast_dws.visit_dws partition(yearinfo,quarterinfo,monthinfo,dayinfo)
select
count(distinct sid) as sid_total,
count(distinct session_id) as sessionid_total,
count(distinct ip) as ip_total,
'-1' as area,
'-1' as seo_source,
'-1' as origin_channel,
'-1' as hourinfo,
yearinfo as time_str,
'-1' as from_url,
'5' as grouptype,
'5' as time_type,
yearinfo,
'-1' as quarterinfo,
'-1' as monthinfo,
'-1' as dayinfo
from itcast_dwd.visit_consult_dwd where yearinfo='${yearStr}'
group by yearinfo;
insert into table itcast_dws.visit_dws partition(yearinfo,quarterinfo,monthinfo,dayinfo)
select
count(distinct sid) as sid_total,
count(distinct session_id) as sessionid_total,
count(distinct ip) as ip_total,
'-1' as area,
'-1' as seo_source,
'-1' as origin_channel,
'-1' as hourinfo,
concat(yearinfo,'_',quarterinfo) as time_str,
'-1' as from_url,
'5' as grouptype,
'4' as time_type,
yearinfo,
quarterinfo,
'-1' as monthinfo,
'-1' as dayinfo
from itcast_dwd.visit_consult_dwd where yearinfo='${yearStr}' and quarterinfo='${quarterStr}'
group by yearinfo,quarterinfo;
insert into table itcast_dws.visit_dws partition(yearinfo,quarterinfo,monthinfo,dayinfo)
select
count(distinct sid) as sid_total,
count(distinct session_id) as sessionid_total,
count(distinct ip) as ip_total,
'-1' as area,
'-1' as seo_source,
'-1' as origin_channel,
'-1' as hourinfo,
concat(yearinfo,'-',monthinfo) as time_str,
'-1' as from_url,
'5' as grouptype,
'3' as time_type,
yearinfo,
quarterinfo,
monthinfo,
'-1' as dayinfo
from itcast_dwd.visit_consult_dwd where yearinfo='${yearStr}' and quarterinfo='${quarterStr}' and monthinfo='${monthStr}'
group by yearinfo,quarterinfo,monthinfo;
insert into table itcast_dws.visit_dws partition(yearinfo,quarterinfo,monthinfo,dayinfo)
select
count(distinct sid) as sid_total,
count(distinct session_id) as sessionid_total,
count(distinct ip) as ip_total,
'-1' as area,
'-1' as seo_source,
'-1' as origin_channel,
'-1' as hourinfo,
concat(yearinfo,'-',monthinfo,'-',dayinfo) as time_str,
'-1' as from_url,
'5' as grouptype,
'2' as time_type,
yearinfo,
quarterinfo,
monthinfo,
dayinfo
from itcast_dwd.visit_consult_dwd where yearinfo='${yearStr}' and quarterinfo='${quarterStr}' and monthinfo='${monthStr}' and dayinfo='${dayStr}'
group by yearinfo,quarterinfo,monthinfo,dayinfo;
insert into table itcast_dws.visit_dws partition(yearinfo,quarterinfo,monthinfo,dayinfo)
select
count(distinct sid) as sid_total,
count(distinct session_id) as sessionid_total,
count(distinct ip) as ip_total,
'-1' as area,
'-1' as seo_source,
'-1' as origin_channel,
hourinfo,
concat(yearinfo,'-',monthinfo,'-',dayinfo,' ',hourinfo) as time_str,
'-1' as from_url,
'5' as grouptype,
'1' as time_type,
yearinfo,
quarterinfo,
monthinfo,
dayinfo
from itcast_dwd.visit_consult_dwd where yearinfo='${yearStr}' and quarterinfo='${quarterStr}' and monthinfo='${monthStr}' and dayinfo='${dayStr}'
group by yearinfo,quarterinfo,monthinfo,dayinfo,hourinfo;
insert into table itcast_dws.consult_dws partition(yearinfo,quarterinfo,monthinfo,dayinfo)
select
count(distinct sid) as sid_total,
count(distinct session_id) as sessionid_total,
count(distinct ip) as ip_total,
area,
'-1' as origin_channel,
'-1' as hourinfo,
yearinfo as time_str,
'1' as grouptype,
'5' as time_type,
yearinfo,
'-1' as quarterinfo,
'-1' as monthinfo,
'-1' as dayinfo
from itcast_dwd.visit_consult_dwd where msg_count >= 1 and yearinfo='${yearStr}'
group by yearinfo,area;
insert into table itcast_dws.consult_dws partition(yearinfo,quarterinfo,monthinfo,dayinfo)
select
count(distinct sid) as sid_total,
count(distinct session_id) as sessionid_total,
count(distinct ip) as ip_total,
area,
'-1' as origin_channel,
'-1' as hourinfo,
concat(yearinfo,'_',quarterinfo) as time_str,
'1' as grouptype,
'4' as time_type,
yearinfo,
quarterinfo,
'-1' as monthinfo,
'-1' as dayinfo
from itcast_dwd.visit_consult_dwd where msg_count >= 1 and yearinfo='${yearStr}' and quarterinfo='${quarterStr}'
group by yearinfo,quarterinfo,area;
insert into table itcast_dws.consult_dws partition(yearinfo,quarterinfo,monthinfo,dayinfo)
select
count(distinct sid) as sid_total,
count(distinct session_id) as sessionid_total,
count(distinct ip) as ip_total,
area,
'-1' as origin_channel,
'-1' as hourinfo,
concat(yearinfo,'-',monthinfo) as time_str,
'1' as grouptype,
'3' as time_type,
yearinfo,
quarterinfo,
monthinfo,
'-1' as dayinfo
from itcast_dwd.visit_consult_dwd where msg_count >= 1 and yearinfo='${yearStr}' and quarterinfo='${quarterStr}' and monthinfo='${monthStr}'
group by yearinfo,quarterinfo,monthinfo,area;
insert into table itcast_dws.consult_dws partition(yearinfo,quarterinfo,monthinfo,dayinfo)
select
count(distinct sid) as sid_total,
count(distinct session_id) as sessionid_total,
count(distinct ip) as ip_total,
area,
'-1' as origin_channel,
'-1' as hourinfo,
concat(yearinfo,'-',monthinfo,'-',dayinfo) as time_str,
'1' as grouptype,
'2' as time_type,
yearinfo,
quarterinfo,
monthinfo,
dayinfo
from itcast_dwd.visit_consult_dwd where msg_count >= 1 and yearinfo='${yearStr}' and quarterinfo='${quarterStr}' and monthinfo='${monthStr}' and dayinfo='${dayStr}'
group by yearinfo,quarterinfo,monthinfo,dayinfo,area;
insert into table itcast_dws.consult_dws partition(yearinfo,quarterinfo,monthinfo,dayinfo)
select
count(distinct sid) as sid_total,
count(distinct session_id) as sessionid_total,
count(distinct ip) as ip_total,
area,
'-1' as origin_channel,
hourinfo,
concat(yearinfo,'-',monthinfo,'-',dayinfo,' ',hourinfo) as time_str,
'1' as grouptype,
'1' as time_type,
yearinfo,
quarterinfo,
monthinfo,
dayinfo
from itcast_dwd.visit_consult_dwd where msg_count >= 1 and yearinfo='${yearStr}' and quarterinfo='${quarterStr}' and monthinfo='${monthStr}' and dayinfo='${dayStr}'
group by yearinfo,quarterinfo,monthinfo,dayinfo,hourinfo,area;"
${HIVE_HOME} -e "${sqlStr}" -S
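The quarter calculation at the top of the script can be checked in isolation. The sketch below is only an illustration and not part of the original job: quarter_of_month is a hypothetical helper name, and it strips the leading zero with a parameter expansion rather than with date +'%-m'. The reason is the same in both cases: shell arithmetic reads a leading 0 as an octal prefix, so the zero-padded values 08 and 09 held in monthStr cannot be used directly inside $(( )).

```shell
#!/bin/bash
# Hypothetical helper: map a month (zero-padded or not) to its quarter, 1 to 4.
quarter_of_month() {
    month=${1#0}                       # strip one leading zero: 09 -> 9, 10 stays 10
    echo $(( (month - 1) / 3 + 1 ))    # integer division groups months in threes
}

quarter_of_month 01   # prints 1
quarter_of_month 09   # prints 3
quarter_of_month 12   # prints 4
```

The formula is the one the script uses for quarterStr; only the way the leading zero is removed differs.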
Test:
select * from itcast_dws.visit_dws where yearinfo='2021';
Then configure the script in Oozie.
8.4 Incremental export
shell:
#!/bin/bash
export SQOOP_HOME=/usr/bin/sqoop
if [ $# == 1 ]
then
TD_DATE=$1
else
TD_DATE=`date -d '-1 day' +'%Y-%m-%d'`
fi
TD_YEAR=`date -d ${TD_DATE} +%Y`
mysql -uroot -p123456 -h192.168.52.150 -P3306 -e "delete from scrm_bi.visit_dws where yearinfo='$TD_YEAR'; delete from scrm_bi.consult_dws where yearinfo='$TD_YEAR';"
jdbcUrl='jdbc:mysql://192.168.52.150:3306/scrm_bi?useUnicode=true&characterEncoding=utf-8'
username='root'
password='123456'
${SQOOP_HOME} export \
--connect "${jdbcUrl}" \
--username "${username}" \
--password "${password}" \
--table visit_dws \
--hcatalog-database itcast_dws \
--hcatalog-table visit_dws \
--hcatalog-partition-keys yearinfo \
--hcatalog-partition-values $TD_YEAR \
-m 1
${SQOOP_HOME} export \
--connect "${jdbcUrl}" \
--username "${username}" \
--password "${password}" \
--table consult_dws \
--hcatalog-database itcast_dws \
--hcatalog-table consult_dws \
--hcatalog-partition-keys yearinfo \
--hcatalog-partition-values $TD_YEAR \
-m 1
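The two export commands differ only in the table name, so they could also be driven from a loop. The following is a minimal sketch under that assumption, not the original script: run_export and DRY_RUN are hypothetical names introduced here, the Sqoop options are the ones already used above, and the connection values are the same test credentials. With DRY_RUN left at its default of 1 the commands are only printed, which makes it easy to review them before touching MySQL.

```shell
#!/bin/bash
SQOOP_HOME=${SQOOP_HOME:-/usr/bin/sqoop}
DRY_RUN=${DRY_RUN:-1}    # hypothetical switch: 1 prints the command, 0 runs it
jdbcUrl='jdbc:mysql://192.168.52.150:3306/scrm_bi?useUnicode=true&characterEncoding=utf-8'
username='root'
password='123456'

# Hypothetical helper: export one year partition of one DWS table to MySQL.
run_export() {
    table=$1
    year=$2
    # Build the command as an argument list so that quoting is preserved.
    set -- "${SQOOP_HOME}" export \
        --connect "${jdbcUrl}" \
        --username "${username}" \
        --password "${password}" \
        --table "${table}" \
        --hcatalog-database itcast_dws \
        --hcatalog-table "${table}" \
        --hcatalog-partition-keys yearinfo \
        --hcatalog-partition-values "${year}" \
        -m 1
    if [ "${DRY_RUN}" = "1" ]; then
        echo "$@"    # show what would be executed
    else
        "$@"         # run the export
    fi
}

TD_YEAR=${TD_YEAR:-2021}
for t in visit_dws consult_dws; do
    run_export "$t" "$TD_YEAR"
done
```

Setting DRY_RUN=0 runs the same commands for real; the delete statements against scrm_bi still have to be issued first, as in the script above.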
Test:
select * from visit_dws where yearinfo='2021';
Then build the workflow.