大数据 spark hive 总结

发布于:2025-03-11 ⋅ 阅读:(20) ⋅ 点赞:(0)

Apache Spark

简介

是一个开源的统一分析引擎,专为大规模数据处理而设计。它提供了高级API,支持Java、Scala、Python和R语言,并且包含了一个优化过的执行引擎,该引擎支持循环计算(如机器学习算法)和交互式查询。以下是Spark的一些关键特性和概念

核心特性 

  1. 速度:Spark通过内存计算提高了数据处理的速度,比Hadoop MapReduce快达10到100倍。

  2. 易用性:提供丰富的高层次API,包括DataFrame和Dataset API,简化了数据操作。

  3. 通用性:除了Map和Reduce操作之外,还支持SQL查询、流处理、机器学习和图计算等多种工作负载。

  4. 可扩展性:能够有效地在数千个节点上并行运行。

  5. 容错性:使用RDD(Resilient Distributed Dataset)抽象层,自动处理节点故障恢复。

基本概念 


1. RDD (Resilient Distributed Dataset)

  • 定义
    RDD 是 Spark 的最底层抽象,表示分布在集群节点上的不可变、可分区的数据集合。它提供低级别的 API,强调对数据的细粒度控制

  • 特点

    • 强类型:存储任意类型的对象(如 RDD[User])。

    • 手动优化:需要开发者自行处理序列化、分区、缓存等优化。

    • 函数式操作:通过 mapfilterreduce 等函数式算子处理数据。

    • 容错性:通过血统(Lineage)机制重建丢失的分区。

  • 适用场景
    非结构化数据、需要精细控制的分布式计算(如自定义分区策略)。

2. DataFrame

  • 定义
    DataFrame 是基于 RDD 构建的结构化数据抽象,类似于关系型数据库的表或 Pandas 的 DataFrame。在 Spark 1.3 中引入。

  • 特点

    • Schema 约束:数据具有明确的结构(列名、数据类型),通过 Row 对象表示行。

    • 优化执行:利用 Catalyst 优化器和 Tungsten 执行引擎,自动优化查询计划。

    • API 类型:弱类型 API(列名在运行时检查),支持 SQL 语法。

    • 跨语言支持:在 Java、Scala、Python、R 中接口一致。

  • 适用场景
    结构化/半结构化数据(如 JSON、CSV)、SQL 式查询、需要自动优化的批处理。

3. Dataset

  • 定义
    Dataset 是 Spark 1.6 引入的 API,结合了 RDD 的强类型特性和 DataFrame 的优化引擎。仅在 Scala 和 Java 中可用。

  • 特点

    • 强类型 + 结构化:兼具 RDD 的类型安全(如 Dataset[User])和 DataFrame 的优化能力。

    • 统一 API:与 DataFrame API 兼容(DataFrame = Dataset[Row])。

    • 编码器(Encoder):使用高效的二进制序列化(优于 Java 序列化)。

  • 适用场景
    需要类型安全的复杂业务逻辑、结合函数式和关系式操作的场景。

核心区别对比

特性 RDD DataFrame Dataset
数据类型 任意对象(强类型) 结构化的 Row 对象(弱类型) 强类型对象(如 Dataset[User]
序列化 Java 序列化(较慢) Tungsten 二进制编码(高效) Encoder 序列化(高效)
优化 无自动优化,需手动调优 Catalyst 优化器自动优化 Catalyst 优化器自动优化
API 风格 函数式(mapfilter 声明式(SQL/DSL) 混合式(强类型 API + DSL)
类型安全 编译时类型检查 运行时类型检查 编译时类型检查
语言支持 所有 Spark 语言 所有 Spark 语言 仅 Scala/Java

 演进关系

  • Spark 1.x:RDD → DataFrame(为结构化数据优化)。

  • Spark 2.x:DataFrame 和 Dataset 统一为 Dataset[T](DataFrame = Dataset[Row])。

如何选择?

  1. 优先用 DataFrame/Dataset
    大多数场景下,结构化数据处理更高效(Catalyst 优化 + Tungsten)。

  2. 需要类型安全时用 Dataset
    如 Scala/Java 中复杂业务逻辑。

  3. 仅底层控制时用 RDD
    如自定义分区、非结构化数据,或需直接操作分布式数据。

代码示例

spark-submit  参数

在 Spark 中,spark-submit 是提交作业到集群的核心命令。以下是常用参数及其作用,分为 基础参数资源参数和 调优参数

一、基础参数

参数 说明 示例
--master 指定集群模式 yarnlocal[*]spark://host:portk8s://...
--deploy-mode 部署模式(客户端或集群) client(默认)或 cluster(适合生产)
--class 主类名(含包路径) --class com.example.MainApp
--name 作业名称(显示在集群UI) --name "My Spark Job"
--files 上传文件到 Executor(如配置文件) --files config.json
--jars 添加依赖的 JAR 包(逗号分隔) --jars lib1.jar,lib2.jar
--packages 从仓库自动下载依赖(Maven格式) --packages org.apache.kafka:kafka-clients:3.4.0

二、资源参数

参数 说明 示例 注意事项
--executor-memory 每个 Executor 的内存 --executor-memory 4g 需预留内存给系统和开销(如总内存的10%)
--driver-memory Driver 进程的内存 --driver-memory 2g 客户端模式下需本地足够内存
--num-executors Executor 数量 --num-executors 10 根据集群资源动态调整
--executor-cores 每个 Executor 的 CPU 核数 --executor-cores 2 总核数 = num-executors * executor-cores
--total-executor-cores 所有 Executor 的总核数(Standalone 模式) --total-executor-cores 20 优先级低于 num-executors

三、调优参数

参数 说明 示例 用途
--conf spark.serializer 指定序列化方式 --conf spark.serializer=org.apache.spark.serializer.KryoSerializer 优化序列化性能
--conf spark.sql.shuffle.partitions 调整 Shuffle 分区数 --conf spark.sql.shuffle.partitions=200 避免数据倾斜或分区过大
--conf spark.default.parallelism 默认并行度 --conf spark.default.parallelism=100 控制 RDD 的分区数
--conf spark.memory.fraction Executor 内存中用于执行和存储的比例 --conf spark.memory.fraction=0.6 调整内存分配策略
--conf spark.dynamicAllocation.enabled 启用动态资源分配 --conf spark.dynamicAllocation.enabled=true 按需增减 Executor(需集群支持)

示例命令

关键注意事项 

  1. 资源分配

    • 总内存和核数不能超过集群资源上限。

    • 在 YARN 模式下,--executor-memory 包含堆外内存,需预留约 10% 的额外内存(如申请 4g,实际可用约 4g * 0.9)。

  2. 动态资源分配

    • 启用 spark.dynamicAllocation.enabled=true 时需配置 spark.shuffle.service.enabled=true(YARN 需启动 Shuffle Service)。

  3. 依赖管理

    • 优先使用 --packages 自动下载依赖,避免手动传 JAR。

    • 本地依赖用 --jars,集群依赖需预先上传到 HDFS 或共享存储。

  4. 日志与调试

    • 添加 --conf spark.eventLog.enabled=true 记录事件日志。

    • 在客户端模式下,Driver 日志输出到控制台;集群模式下需通过集群 UI 查看。

 参数优化场景

  • 数据倾斜:增大 spark.sql.shuffle.partitions 或使用 repartition

  • OOM 错误:增加 executor-memory 或调整 spark.memory.fraction

  • CPU 瓶颈:增加 num-executors 或 executor-cores

  • 网络超时:调整 spark.network.timeout(默认 120s)。

hive

Hive 是构建在 Hadoop 生态系统之上的数据仓库工具,旨在简化大规模数据的查询和管理。它通过类 SQL 语法(HiveQL)将结构化数据操作转化为 MapReduce、Tez 或 Spark 任务,适合处理海量数据(如日志、用户行为等)。以下是其核心概念和用法: 

Hive 核心特性 

特性 描述
SQL-like 语法 支持类似 SQL 的查询语言(HiveQL),降低大数据处理的学习成本。
数据存储 数据存储在 HDFS(Hadoop 分布式文件系统)中,支持多种文件格式(如 ORC、Parquet)。
元数据管理 使用 Metastore(如 MySQL)存储表结构、分区等元信息。
扩展性 支持自定义函数(UDF)、SerDe(序列化/反序列化工具)等扩展功能。
批处理 基于 MapReduce 或 Tez 引擎,适合离线批处理,不适用于实时查询

 

Hive 架构 

  1. 用户接口
    CLI、JDBC、Web UI 等工具提交 HiveQL 查询。

  2. Driver
    解析查询,生成执行计划,管理任务生命周期。

  3. 编译器
    将 HiveQL 转换为 MapReduce/Tez/Spark 任务。

  4. 元数据存储
    Metastore 存储表结构、分区、字段类型等信息。

  5. 执行引擎
    运行编译后的任务,读写 HDFS 数据。

 Hive 数据模型

表(Table)
类似关系型数据库的表,支持内部表(数据由 Hive 管理)和外部表(数据由用户管理)。

CREATE TABLE users (id INT, name STRING) STORED AS ORC; 

分区(Partition) 

按某一列的值划分数据目录,加速查询(如按日期分区)。 

CREATE TABLE logs (log_time STRING, content STRING) 
PARTITIONED BY (dt STRING);

分桶(Bucket) 

 按哈希值将数据分到多个文件,优化 JOIN 和采样效率。

CREATE TABLE orders (order_id INT, user_id INT) 
CLUSTERED BY (user_id) INTO 10 BUCKETS; 

Hive 应用场景 

 

场景 说明
离线数据分析 处理 TB/PB 级历史数据(如用户行为分析、日志统计)。
ETL 流程 清洗、转换数据后导入数据仓库(如将 CSV 转换为 ORC 格式)。
数据挖掘 结合机器学习库(如 Hive + Mahout)进行聚类、分类等操作。
报表生成 定时生成统计报表(如每日销售额汇总)。

Hive 优缺点

优点 缺点
易用性强(SQL 语法) 延迟高(分钟级响应,不适合实时查询)
可扩展性高(自定义 UDF) 不支持事务和行级更新(Hive 3 部分支持)
兼容 Hadoop 生态(HDFS、HBase 等) 需要优化分区和存储格式提升性能

 

Hive vs 传统数据库

对比项 Hive 传统数据库(如 MySQL)
数据规模 支持 PB 级数据 适合 GB/TB 级数据
响应速度 高延迟(批处理) 低延迟(实时查询)
事务支持 有限支持(Hive 3+) 完整支持 ACID
存储与计算 分离(HDFS + 计算引擎) 耦合(本地存储 + 计算

 

Hive 使用示例 

创建表并加载数据 

CREATE EXTERNAL TABLE user_logs (
  ip STRING,
  url STRING,
  time STRING
) PARTITIONED BY (dt STRING)
ROW FORMAT DELIMITED FIELDS TERMINATED BY '\t'
LOCATION '/hive/data/user_logs';

LOAD DATA INPATH '/input/log_20231001.txt' INTO TABLE user_logs PARTITION (dt='2023-10-01');

 聚合查询

SELECT dt, COUNT(*) AS pv 
FROM user_logs 
WHERE dt BETWEEN '2023-10-01' AND '2023-10-07'
GROUP BY dt; 

连接多个表

 SELECT u.name, SUM(o.amount)
FROM orders o
JOIN users u ON o.user_id = u.id
GROUP BY u.name;

生态工具 

  • Hive Metastore:独立元数据服务(供 Spark、Presto 等共用)。

  • Hive on Spark:用 Spark 替代 MapReduce 提升计算速度。

  • Hive LLAP(Live Long and Process):低延迟交互式查询。

总结

  • 适用场景:离线批处理、海量数据仓库管理。

  • 替代方案:实时查询用 Impala 或 Presto;复杂分析用 Spark SQL

  • 学习建议:掌握 HiveQL 语法、分区优化和存储格式(ORC/Parquet)。