sgg快餐项目-2项目前数据同步

发布于:2024-08-08 ⋅ 阅读:(134) ⋅ 点赞:(0)

一、同步策略选择

数据同步就是通过同步工具将数据拿到数仓中。

同步策略:

  • 全量同步:每次同步将表的所有数据都同步过来。

优点:同步逻辑简单,使用简单

缺点:有数据冗余。

适用于数据量小的情况。

  • 增量同步:只同步改变的数据。

增量同步通常在首日需要做全量同步。

优点:没有数据冗余

缺点:同步逻辑相对比较复杂,使用比较麻烦。

适用于数据量大,变化频率低的数据。(如果数据量大且变化频率高,建议全量同步)

针对本快餐项目,哪些表使用增量,哪些使用全量?

全量同步通常使用DataX、Sqoop等基于select的离线同步工具。而增量同步既可以使用DataX、Sqoop等工具,也可使用Maxwell、Canal等工具, 在本项目中,全量使用DataX,增量使用Maxwell。

 二、Datax安装部署

2.1 DataX简介

简介:DataX 是阿里巴巴开源的一个异构数据源离线同步工具,致力于实现包括关系型数据库(MySQL、Oracle等)、HDFS、Hive、ODPS、HBase、FTP等各种异构数据源之间稳定高效的数据同步功能。(把hdfs导入到mysql,把mysql数据导入到hdfs)

总结:DataX就是不同数据源之间的桥梁。

2.2 DataX架构

2.3 DataX运行流程

 2.4 dataX调度

2.5 DataX安装部署

        安装DataX

  • 将datax安装包上传到/opt/software
  • 解压到/opt/module并且改名字

datax安装之后不需要进行任何的配置就可以使用,那怎么用?

(根据以往的组件使用经验,一般都是在bin目录下有个脚本,然后运行这个脚本就可以启动或停止)

cd /opt/module/datax/bin
python /opt/module/datax/bin/datax.py /opt/module/datax/job/job.json

-- 注意DataX的作用就是把数据从某个数据源读出来,再写入某个数据源。

  使用/opt/module/datax/bin/datax.py 文件,该文件运行的话需要一个配置文件,该配置文件会指明一些参数:reader在哪,writer在哪等。

-- 那该json文件格式是什么?

 datax提供了生成json文件的模型,我们通过该命令生成json文件模板,然后再对其进行修改。

python bin/datax.py -r mysqlreader -w hdfswriter

2.6  同步MySQL数据到HDFS案例

案例要求:同步gmall数据库中base_province表数据到HDFS的/base_province目录

需求分析:要实现该功能,需选用MySQLReader和HDFSWriter,MySQLReader具有两种模式分别是TableModeQuerySQLMode,前者使用table,column,where等属性声明需要同步的数据;后者使用一条SQL查询语句声明需要同步的数据。(使用sql语句也会涉及到table,column,where,只不过使用sql语句。)

1.准备json文件:github.com 从该github网址下,mysql读的地方,复制json模板。(tablemodule)

{
    "job": {
        "setting": {
            "speed": {
                 "channel": 3  # 并发度
            },
            "errorLimit": {
                "record": 0,
                "percentage": 0.02
            }
        },
        "content": [
            {
                "reader": {
                    "name": "mysqlreader",
                    "parameter": {
                        "username": "root",
                        "password": "000000",  # mysql的账户和密码
                        "column": [
                            "id",
                            "create_time",
                            "phone_number"   # 读取哪些列的数据。
                        ],
                        "splitPk": "id",  # 表示使用哪个字段对表进行切片,一般推荐使用主键,不切片就不写,表示一个表一个切片,这是切片会分成多个task读取。
                        "where":"id<=100 and id>=5"  # 条件
                        "connection": [
                            {
                                "table": [
                                    "customer"  # 导哪张表的数据,如果是多张表必须保证表的字段一样。
                                ],
                                "jdbcUrl": [
     "jdbc:mysql://hadoop102:3306/fast_food"  # mysql库的地址
                                ]
                            }
                        ]
                    }
                },
               

2.把该json文件中的writer部分删除,还是在刚才的网址,复制hdfs的writer部分。

"writer": {
                    "name": "hdfswriter",
                    "parameter": {
                        "defaultFS": "hdfs://hadoop102:8020",  # hdfs地址
                        "fileType": "text",  # 保存数据的形式
                        "path": "/datas",  # 保存数据的路径
                        "fileName": "customer",  # 文件名的前缀,hdfswriter写入时的文件名,实际执行时会在文件名后添加随机的后缀作为实际文件名
                           # column:读几个字段,写几个字段,不支持部分写入
                        "column": [
                            {
                                "name": "id",
                                "type": "string"
                            },
                            {
                                "name": "create_time",
                                "type": "string"
                            },
                            {
                                "name": "phone_nuber",
                                "type": "string"
                            }
                            
                        ],
                        "writeMode": "append",  # append 有该文件时,追加 |nonconflict 有该文件时报错|truncate 有该文件时,清除再写入
                        "fieldDelimiter": "\t",
                    }
                }
            }
        ]
    }
}

3.在/opt/module/datax/job目录下,创建该文件,取名test.json

4.启动:python bin/datax.py job/test.json(这样会报错,因为hdfs写数据的哪个目录必须先存在)

5.创建datas目录: hdfs dfs -mkdir /datas,之后在执行。

注意:

  1. datax读写数据是为了将数据导入到数据仓库,比如从sql中读数据,然后写到hdfs上,后边hive就可以拿hdfs的数据去分析。
  2. hdfs写数据由两种type:text和orc
  • 使用text时,列名可以随便写,因为text的方式,列名不会被保存,但是列的类型要和后续hive建表时的类型一致。
  • 使用orc时,列名和类型必须和hive表一致.

QuerySQLMode的形式配置和上边不同的是:不需要where,colunm,table,在connection下添加一个querysql字段:

一个sql语句是一个切片。

"querySql":
{"select id ,create_time,phone_number from customer where id >=5 and id <=100"}

 2.7 同步HDFS数据到MySQL

案例要求:同步HDFS上的/base_province目录下的数据到MySQL gmall数据库下的test_province表。

需求分析:要实现该功能,需选用HDFSReaderMySQLWriter

2.8 DataX传参 

在把数据写到hdfs上时,(如2.6),需要配置一个hdfs的位置来保存这个数据,但是实际生产中,每天上传数据的目录不一样,这时候需要将目录以参数的方式传给DataX的json文件。

1.首先需要在hdfs上创建相应的目录。

hdfs dfs -mkdir /datas/2023-06-16

2.修改hdfs上边写的路径:

"path":"datas/"--->
"path":"datas/${dt} # 这里使用shell脚本的方式进行接收参数。

3.将参数传递给配置文件

python bin/datax.py -p "-Ddt=2023-06-16" job/test.json

2.9 DataX的优化 

2.9.1 速度调整

这里的限速表示多少条数据。 

channel表示并发数,同时跑几个task。

 配置示例:

{
    "core": {
        "transport": {
            "channel": {
                "speed": {
                    "byte": 1048576 //单个channel byte限速1M/s
                }
            }
        }
    },
    "job": {
        "setting": {
            "speed": {
                "byte" : 5242880 //总byte限速5M/s
            }
        },
        ...
    }
}

2.9.2 内存调整

对应job里边有对应的channel,channel会占用内存,当channel并发比较多时,占用内存就比较多,因为DataX作为数据交换通道,在内存中会缓存较多的数据。例如Channel中会有一个Buffer,作为临时的数据交换的缓冲区,而在部分Reader和Writer的中,也会存在一些Buffer,为了防止OOM等错误,需调大JVM的堆内存。

建议将内存设置为4G或者8G,这个也可以根据实际情况来调整。

怎样调整?

调整JVM xms xmx参数的两种方式:一种是直接更改datax.py脚本;另一种是在启动的时候,加上对应的参数,如下:

python datax/bin/datax.py --jvm="-Xms8G -Xmx8G" /path/to/your/job.json

2.10 datax只在Hadoop02上配置了。 

 三、全量数据同步

3.1 数据通道

全量表数据由DataX从MySQL业务数据库直接同步到HDFS,具体数据流向,如下图所示。

  •  其中,在hdfs上,/origin_data 表示目录,/fast_food/表示不同的库,/db表示业务数据,/*full表示全量同步的某个表的名字,/2023-06-14表示哪天同步的。

 3.2 编写相关json脚本

这里已经在上边示例过了,不再过多解释。

{
  "job": {
    "content": [
      {
        "reader": {
          "name": "mysqlreader",
          "parameter": {
            "column": [
              "id",
              "create_time",
              "update_time",
              "name",
              "phone_number",
              "type",
              "region_id"
            ],
            "connection": [
              {
                "jdbcUrl": [
                  "jdbc:mysql://hadoop102:3306/fast_food?useSSL=false&allowPublicKeyRetrieval=true&useUnicode=true&characterEncoding=utf-8"
                ],
                "table": [
                  "shop"
                ]
              }
            ],
            "password": "000000",
            "splitPk": "",
            "username": "root"
          }
        },
        "writer": {
          "name": "hdfswriter",
          "parameter": {
            "column": [
              {
                "name": "id",
                "type": "bigint"
              },
              {
                "name": "create_time",
                "type": "string"
              },
              {
                "name": "update_time",
                "type": "string"
              },
              {
                "name": "name",
                "type": "string"
              },
              {
                "name": "phone_number",
                "type": "string"
              },
              {
                "name": "type",
                "type": "bigint"
              },
              {
                "name": "region_id",
                "type": "bigint"
              }
            ],
            "compress": "gzip",
            "defaultFS": "hdfs://hadoop102:8020",
            "fieldDelimiter": "\t",
            "fileName": "shop",
            "fileType": "text",
            "path": "${targetdir}",
            "writeMode": "truncate",
            "nullFormat": ""
          }
        }
      }
    ],
    "setting": {
      "speed": {
        "channel": 1
      }
    }
  }
}
  • vim  /job/test4.json 首先把上边内容创建为脚本。
  • hdfs dfs -mkdir  -p /datas/shop/2023-06-17  在hdfs上边创建目录,因为下一步要把mysql的数据导入到hdfs上的该目录下。
  • 查看hadoop102:9870发现确实有创建的目录。
  • 接下来开始导入数据:python bin/datax.py -p "-Dtargetdir=/datas/shop/2023-06-17"  job/test4.json

我们可以发现,在导不同表中的数据的时候,字段,条件、表名等都要对应相应的表进行更改,所以不同的表json文件不同,所以我们要为每张表编写一个DataX的json配置文件。 

 3.3 全量表json文件生成。

针对不同的表生成不同的json文件用于导入数据。(拿来就用)

  1. 创建/opt/module/gen_datax_config。
  2. 将生成器上传到服务器的/opt/module/gen_datax_config目录。
  3. 上传这两个包到该目录下。
  4. jar包是用来生成json文件的,那生成哪些json文件?在configuration.properties中配置。
  5. 配置好jar包需要的配置之后,使用java -jar 运行该jar包。
  6. 在/opt/module/datax/job/import/下会发现生成好了这些表的json文件。
  7. 随便运行一个可以成功。
  8. 感觉该jar包是用来生成json文件的,用java写的,该包用python写也可以。

3.4 全量表数据同步脚本

每个表每天都要全量同步,所以为了避免每天都得执行同步命令,所以封装一个全量同步脚本。(拿来就用)

#!bin/bash
#mysql_to_hdfs_full.sh/表名|日期
#1.判断参数是否传入。
if [ $# -lt 1]
then 
echo "必须传入参数"
fi

# 2、判断日期是否传入,如果日期传入,那么全量导该日期的数据,否则导前一天的数据。
["$2"] && datastr=$2 || datastr=$(date -d "-1 day" +%F)


# 这里写一个函数,供3使用
function import_data(){
       tableNames=$*
       for tablename in ${tableNames}
       do 
            #判断hdfs上边对应的目录是否存在,不存在则创建,“hdfs dfs -test -e 目录” 如果存在返回0,不存在返回1
            hdfs dfs -test -e /origin_data/fast_food/db/${tablename}_full/${datastr}
            if [ $? -eq 1 ]
            then  
                   hdfs dfs -mkdir -p  /origin_data/fast_food/db/${tablename}_full/${datastr}
            fi
            python /opt/module/datax/bin/datax.py -p"Dtargetdir=/origin_data/fast_food/db/${tablename}_full/${datastr}" /opt/module/datax/job/import/fast_food.${tablename}.json
        

       done


}

# 3 根据传进来的表名匹配数据
case $1 in 
"all")
import data "shop" "region" "promotion" "product_spu_attr" "product_spu_attr_value" "product_spu" "product_sku" "product_group" "product_category" "product_group_sku"
;;

"shop")
import_data $1
;;

"region")
import_data $1
;;

"promotion")
import_data $1
;;

"product_spu_attr")
import_data $1
;;

"product_spu_attr_value")
import_data $1
;;

"product_spu")
import_data $1
;;

"product_sku")
import_data $1
;;

"product_group")
import_data $1
;;

"product_group_sku")
import_data $1
;;

"product_category")
import_data $1
;;

*)
echo "表名输入错误"
  • 然后: vim /home/bigdata/bin/mysql_to_hdfs_full.sh 创建该bash文件。
  • chmod +x  /home/bigdata/bin/mysql_to_hdfs_full.sh
  • mysql_to_hdfs_full.sh all 2023-06-05

之后 全量数据 同步只需要每天执行该脚本即可。

四、增量数据导入


网站公告

今日签到

点亮在社区的每一天
去签到