Spark读写HBase表
摘要
本文介绍如何使用 Spark 2.3.2 实现对 HBase 1.4.8 表的读写操作,通过 Scala 语言将 CSV 数据写入 HBase,并利用 Spark SQL 分析数据。代码示例涵盖数据批量写入、全表扫描、数据类型转换及结构化查询,适合大数据开发人员快速掌握 Spark 与 HBase 的集成方法。
一、实验环境准备
1. 技术版本
- Spark:2.3.2
- HBase:1.4.8
- Scala:2.11
- 开发工具:IntelliJ IDEA
- 依赖管理:Maven
2. Maven 依赖配置
在 pom.xml
中添加以下依赖:
<dependencies>
<!-- Spark 核心与 SQL -->
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_2.11</artifactId>
<version>2.3.2</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-sql_2.11</artifactId>
<version>2.3.2</version>
</dependency>
<!-- HBase 客户端与 MapReduce 支持 -->
<dependency>
<groupId>org.apache.hbase</groupId>
<artifactId>hbase-client</artifactId>
<version>1.4.8</version>
</dependency>
<dependency>
<groupId>org.apache.hbase</groupId>
<artifactId>hbase-common</artifactId>
<version>1.4.8</version>
</dependency>
<dependency>
<groupId>org.apache.hbase</groupId>
<artifactId>hbase-server</artifactId>
<version>1.4.8</version>
</dependency>
<dependency>
<groupId>org.apache.hbase</groupId>
<artifactId>hbase-mapreduce</artifactId>
<version>1.4.8</version>
</dependency>
<!-- Hadoop 客户端(与 HBase 兼容) -->
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-client</artifactId>
<version>2.7.3</version>
</dependency>
</dependencies>
二、实验步骤
1. 数据准备
- 文件路径:
D:\\JavaProjects\\SparkAllProjects\\data\\emp.csv
- 数据格式(示例):
字段含义:7369,SMITH,CLERK,7902,1980-12-17,800,,20 7499,ALLEN,SALESMAN,7698,1981-02-20,1600,300,30
员工ID,姓名,职位,上级ID,入职日期,薪资,奖金,部门ID
2. HBase 表结构设计
- 表名:
employee
- 列族:
info
- 列标识:
ename
:姓名job
:职位mgr
:上级IDhiredate
:入职日期salary
:薪资comm
:奖金deptNo
:部门ID
3. 代码实现
3.1 数据写入 HBase(writeDataToHBase
方法)
- 核心逻辑:
- 读取 CSV 文件为 DataFrame。
- 按分区遍历数据,批量创建
Put
对象。 - 通过 HBase 连接将数据写入表中,避免单条写入性能瓶颈。
- 核心代码
def writeDataToHBase(spark: SparkSession): Unit = { // 2.读取数据文件 val empDF = spark.read.csv("file:///D:\\JavaProjects\\SparkAllProjects\\data\\emp.csv") // 3.按照DataFrame分区写入HBase表中 empDF.foreachPartition(p => { // 3.1 配置HBase连接地址:初始化conf配置对象、配置zk连接地址及其端口 val conf = HBaseConfiguration.create() conf.set("hbase.zookeeper.quorum", "s1,s2,s3") conf.set("hbase.zookeeper.property.clientPort", "2181") // 3.2 在每个HBase节点中创建HBase的链接对象 val conn = ConnectionFactory.createConnection(conf) // 3.3 获取HBase目标表 val table = conn.getTable(TableName.valueOf(TABLE_NAME)) // 批量提交时,使用list列表存储put,当达到batchSize大小时提交一次 val batchSize = 14 // emp.csv就14条记录,所以设置成14条提交一次 var puts = List[Put]() // puts数组 // 3.4 将dataframe中的每个分区数据写入table表中 try { p.foreach(row => { // 3.5 获取每行中的各个列的数据 val empNo = row.getString(0) val ename = row.getString(1) val job = row.getString(2) val mgr = row.getString(3) val hireDate = row.getString(4) val salary = row.getString(5) val comm = row.getString(6) val deptNo = row.getString(7) var mgrStr = "0" if (mgr != null) { mgrStr = mgr } var commStr = "0.0" if (comm != null) { commStr = comm } // 3.6 设置rowkey:按照empNo val rowKey = Bytes.toBytes(empNo) // 3.7 创建Put对象,设置列族中的列和字段 val put = new Put(rowKey) put.addColumn(Bytes.toBytes(CF_NAME), Bytes.toBytes("ename"), Bytes.toBytes(ename)) put.addColumn(Bytes.toBytes(CF_NAME), Bytes.toBytes("job"), Bytes.toBytes(job)) put.addColumn(Bytes.toBytes(CF_NAME), Bytes.toBytes("mgr"), Bytes.toBytes(mgrStr)) put.addColumn(Bytes.toBytes(CF_NAME), Bytes.toBytes("hiredate"), Bytes.toBytes(hireDate)) put.addColumn(Bytes.toBytes(CF_NAME), Bytes.toBytes("salary"), Bytes.toBytes(salary)) put.addColumn(Bytes.toBytes(CF_NAME), Bytes.toBytes("comm"), Bytes.toBytes(commStr)) put.addColumn(Bytes.toBytes(CF_NAME), Bytes.toBytes("deptNo"), Bytes.toBytes(deptNo)) // 3.8 将该Put对象加入Table对象中 puts = put :: puts if (puts.size >= batchSize) { // 转成Java中的ArrayList val javaLists = new util.ArrayList[Put](puts.size) puts.foreach(javaLists.add) // table调用put添加javaLists table.put(javaLists) // 添加完成后,清空 puts = List[Put]() } // 处理剩余数据: 当不满足14条数据时,把剩余数据写入HBase表中 if (puts.nonEmpty) { val javaLists = new util.ArrayList[Put](puts.size) puts.foreach(javaLists.add) table.put(javaLists) } }) } finally { // 确保资源释放 if (table != null) table.close() if (conn != null) conn.close() } }) }
3.2 数据读取与分析(readHBaseData
方法
- 核心逻辑:
- 使用
ResultScanner
全表扫描 HBase 数据。 - 将二进制数据转换为样例类
Employee
,自动推断 DataFrame 的 Schema。 - 通过 Spark SQL 执行聚合查询(如按部门统计薪资总和)。
- 使用
- 核心代码:
case class Employee( employee_id: Int, employee_name: String, job_title: String, manager_id: Int, hire_date: String, salary: Double, bonus: Double, department_id: Int ) def readHBaseData(spark: SparkSession): DataFrame = { // 1. 配置 HBase 连接参数 val conf = HBaseConfiguration.create() conf.set("hbase.zookeeper.quorum", "s1,s2,s3") // 替换为你的 ZK 地址 conf.set("hbase.zookeeper.property.clientPort", "2181") // 2. 创建 HBase 连接和表对象 val conn = ConnectionFactory.createConnection(conf) val table = conn.getTable(TableName.valueOf(TABLE_NAME)) var scanner: ResultScanner = null try { // 3. 构造扫描器(Scan)并配置 val scan = new Scan() .addFamily(Bytes.toBytes(CF_NAME)) // 读取指定列族下的所有列 .setCaching(500) // 提升批量读取性能 .setCacheBlocks(false) // 4. 获取扫描结果迭代器 scanner = table.getScanner(scan) // 5. 遍历结果并转换为 Employee 对象 val employees = ListBuffer[Employee]() val it = scanner.iterator() while (it.hasNext) { val result: Result = it.next() // 提取行键(假设 rowkey 是 employee_id 的字符串形式) val rowKeyStr = Bytes.toString(result.getRow) val employeeId = rowKeyStr.toInt // 转换为 Int(需确保 rowkey 是数字) // 提取各列数据(根据 HBase 实际存储的列名调整) val name = Bytes.toString(result.getValue(Bytes.toBytes(CF_NAME), Bytes.toBytes("ename"))) val job = Bytes.toString(result.getValue(Bytes.toBytes(CF_NAME), Bytes.toBytes("job"))) val mgrStr = Bytes.toString(result.getValue(Bytes.toBytes(CF_NAME), Bytes.toBytes("mgr"))) val hireDate = Bytes.toString(result.getValue(Bytes.toBytes(CF_NAME), Bytes.toBytes("hiredate"))) val salaryStr = Bytes.toString(result.getValue(Bytes.toBytes(CF_NAME), Bytes.toBytes("salary"))) val commStr = Bytes.toString(result.getValue(Bytes.toBytes(CF_NAME), Bytes.toBytes("comm"))) val deptNoStr = Bytes.toString(result.getValue(Bytes.toBytes(CF_NAME), Bytes.toBytes("deptNo"))) // 处理可能的空值或转换异常(示例:默认值为 0) val managerId = if (mgrStr != null && mgrStr.nonEmpty) mgrStr.toInt else 0 val salary = if (salaryStr != null && salaryStr.nonEmpty) salaryStr.toDouble else 0 val bonus = if (commStr != null && commStr.nonEmpty) commStr.toDouble else 0 val departmentId = if (deptNoStr != null && deptNoStr.nonEmpty) deptNoStr.toInt else 0 // 创建 Employee 对象并添加到列表 employees += Employee( employee_id = employeeId, employee_name = name, job_title = job, manager_id = managerId, hire_date = hireDate, salary = salary, bonus = bonus, department_id = departmentId ) } // 6. 将 Employee 列表转换为 DataFrame(自动推断 Schema) import spark.implicits._ spark.createDataFrame(employees.toList) } catch { case e: Exception => println(s"读取 HBase 数据失败: ${e.getMessage}") throw e // 抛异常终止流程 } finally { // 7. 释放所有资源(关键!避免连接泄漏) if (scanner != null) scanner.close() if (table != null) table.close() if (conn != null) conn.close() } }
3.3 Spark SQL 分析
- 读取数据后,通过 Spark SQL 执行聚合查询:
val df = readHBaseData(spark) df.createOrReplaceTempView("emp") spark.sql(""" SELECT department_id, SUM(salary + bonus) AS total FROM emp GROUP BY department_id ORDER BY total DESC """).show(false)
- 其他分析查询请自行操作。
3.4 完整代码
- 在IDEA中创建名为WriteAndReadDataToHBase单例对象
- 添加如下完成代码:
package com.lpssfxy.spark.datasource import org.apache.hadoop.hbase.{HBaseConfiguration, TableName} import org.apache.hadoop.hbase.client.{ConnectionFactory, Put, Result, ResultScanner, Scan} import org.apache.hadoop.hbase.util.Bytes import org.apache.spark.sql.{DataFrame, SparkSession} import java.util import scala.collection.mutable.ListBuffer case class Employee( employee_id: Int, // 对应 HBase rowkey(假设 rowkey 是员工ID) employee_name: String, // 对应列族 info:ename job_title: String, // 对应列族 info:job manager_id: Int, // 对应列族 info:mgr hire_date: String, // 对应列族 info:hiredate salary: Double, // 对应列族 info:salary bonus: Double, // 对应列族 info:comm department_id: Int // 对应列族 info:deptNo ) /** * spark读写HBase表employee */ object WriteAndReadDataToHBase { // 定义表名称及其列族名称 private val TABLE_NAME = "employee" private val CF_NAME = "info" def main(args: Array[String]): Unit = { // 1. 准备环境:SparkSession初始化 val spark = SparkSession .builder() .appName("WriteDataToHBase") .master("local[*]") .getOrCreate() // 2.调用writeDataToHBase方法将文件写入HBase中 //writeDataToHBase(spark) // 3. 读HBase数据 val df = readHBaseData(spark) //df.show(false) df.createOrReplaceTempView("emp") spark.sql("select department_id,sum(salary+bonus) as total from emp group by department_id order by total desc").show(false) // 4.停止SparkSession对象,释放资源 spark.stop() } /** * 写数据到HBase表中 * @param spark */ def writeDataToHBase(spark: SparkSession): Unit = { // 2.读取数据文件 val empDF = spark.read.csv("file:///D:\\JavaProjects\\SparkAllProjects\\data\\emp.csv") // 3.按照DataFrame分区写入HBase表中 empDF.foreachPartition(p => { // 3.1 配置HBase连接地址:初始化conf配置对象、配置zk连接地址及其端口 val conf = HBaseConfiguration.create() conf.set("hbase.zookeeper.quorum", "s1,s2,s3") conf.set("hbase.zookeeper.property.clientPort", "2181") // 3.2 在每个HBase节点中创建HBase的链接对象 val conn = ConnectionFactory.createConnection(conf) // 3.3 获取HBase目标表 val table = conn.getTable(TableName.valueOf(TABLE_NAME)) // 批量提交时,使用list列表存储put,当达到batchSize大小时提交一次 val batchSize = 14 // emp.csv就14条记录,所以设置成14条提交一次 var puts = List[Put]() // puts数组 // 3.4 将dataframe中的每个分区数据写入table表中 try { p.foreach(row => { // 3.5 获取每行中的各个列的数据 val empNo = row.getString(0) val ename = row.getString(1) val job = row.getString(2) val mgr = row.getString(3) val hireDate = row.getString(4) val salary = row.getString(5) val comm = row.getString(6) val deptNo = row.getString(7) var mgrStr = "0" if (mgr != null) { mgrStr = mgr } var commStr = "0.0" if (comm != null) { commStr = comm } // 3.6 设置rowkey:按照empNo val rowKey = Bytes.toBytes(empNo) // 3.7 创建Put对象,设置列族中的列和字段 val put = new Put(rowKey) put.addColumn(Bytes.toBytes(CF_NAME), Bytes.toBytes("ename"), Bytes.toBytes(ename)) put.addColumn(Bytes.toBytes(CF_NAME), Bytes.toBytes("job"), Bytes.toBytes(job)) put.addColumn(Bytes.toBytes(CF_NAME), Bytes.toBytes("mgr"), Bytes.toBytes(mgrStr)) put.addColumn(Bytes.toBytes(CF_NAME), Bytes.toBytes("hiredate"), Bytes.toBytes(hireDate)) put.addColumn(Bytes.toBytes(CF_NAME), Bytes.toBytes("salary"), Bytes.toBytes(salary)) put.addColumn(Bytes.toBytes(CF_NAME), Bytes.toBytes("comm"), Bytes.toBytes(commStr)) put.addColumn(Bytes.toBytes(CF_NAME), Bytes.toBytes("deptNo"), Bytes.toBytes(deptNo)) // 3.8 将该Put对象加入Table对象中 puts = put :: puts if (puts.size >= batchSize) { // 转成Java中的ArrayList val javaLists = new util.ArrayList[Put](puts.size) puts.foreach(javaLists.add) // table调用put添加javaLists table.put(javaLists) // 添加完成后,清空 puts = List[Put]() } // 处理剩余数据: 当不满足14条数据时,把剩余数据写入HBase表中 if (puts.nonEmpty) { val javaLists = new util.ArrayList[Put](puts.size) puts.foreach(javaLists.add) table.put(javaLists) } }) } finally { // 确保资源释放 if (table != null) table.close() if (conn != null) conn.close() } }) } /** * 读取HBase表数据 * @param spark * @return */ def readHBaseData(spark: SparkSession): DataFrame = { // 1. 配置 HBase 连接参数 val conf = HBaseConfiguration.create() conf.set("hbase.zookeeper.quorum", "s1,s2,s3") // 替换为你的 ZK 地址 conf.set("hbase.zookeeper.property.clientPort", "2181") // 2. 创建 HBase 连接和表对象 val conn = ConnectionFactory.createConnection(conf) val table = conn.getTable(TableName.valueOf(TABLE_NAME)) var scanner: ResultScanner = null try { // 3. 构造扫描器(Scan)并配置 val scan = new Scan() .addFamily(Bytes.toBytes(CF_NAME)) // 读取指定列族下的所有列 .setCaching(500) // 提升批量读取性能 .setCacheBlocks(false) // 4. 获取扫描结果迭代器 scanner = table.getScanner(scan) // 5. 遍历结果并转换为 Employee 对象 val employees = ListBuffer[Employee]() val it = scanner.iterator() while (it.hasNext) { val result: Result = it.next() // 提取行键(假设 rowkey 是 employee_id 的字符串形式) val rowKeyStr = Bytes.toString(result.getRow) val employeeId = rowKeyStr.toInt // 转换为 Int(需确保 rowkey 是数字) // 提取各列数据(根据 HBase 实际存储的列名调整) val name = Bytes.toString(result.getValue(Bytes.toBytes(CF_NAME), Bytes.toBytes("ename"))) val job = Bytes.toString(result.getValue(Bytes.toBytes(CF_NAME), Bytes.toBytes("job"))) val mgrStr = Bytes.toString(result.getValue(Bytes.toBytes(CF_NAME), Bytes.toBytes("mgr"))) val hireDate = Bytes.toString(result.getValue(Bytes.toBytes(CF_NAME), Bytes.toBytes("hiredate"))) val salaryStr = Bytes.toString(result.getValue(Bytes.toBytes(CF_NAME), Bytes.toBytes("salary"))) val commStr = Bytes.toString(result.getValue(Bytes.toBytes(CF_NAME), Bytes.toBytes("comm"))) val deptNoStr = Bytes.toString(result.getValue(Bytes.toBytes(CF_NAME), Bytes.toBytes("deptNo"))) // 处理可能的空值或转换异常(示例:默认值为 0) val managerId = if (mgrStr != null && mgrStr.nonEmpty) mgrStr.toInt else 0 val salary = if (salaryStr != null && salaryStr.nonEmpty) salaryStr.toDouble else 0 val bonus = if (commStr != null && commStr.nonEmpty) commStr.toDouble else 0 val departmentId = if (deptNoStr != null && deptNoStr.nonEmpty) deptNoStr.toInt else 0 // 创建 Employee 对象并添加到列表 employees += Employee( employee_id = employeeId, employee_name = name, job_title = job, manager_id = managerId, hire_date = hireDate, salary = salary, bonus = bonus, department_id = departmentId ) } // 6. 将 Employee 列表转换为 DataFrame(自动推断 Schema) import spark.implicits._ spark.createDataFrame(employees.toList) } catch { case e: Exception => println(s"读取 HBase 数据失败: ${e.getMessage}") throw e // 抛异常终止流程 } finally { // 7. 释放所有资源(关键!避免连接泄漏) if (scanner != null) scanner.close() if (table != null) table.close() if (conn != null) conn.close() } } }
三、实验结果
1. 数据写入验证
- 控制台输出无异常日志,HBase 表
employee
中生成对应 rowkey 的记录。 - 通过 HBase Shell 命令
scan 'employee'
可查看数据:hbase> scan 'employee', {LIMIT => 2} ROW COLUMN+CELL 7369 column=info:ename, timestamp=... value=SMITH 7369 column=info:job, timestamp=... value=CLERK # ...(其他列省略)
2. 数据读取与分析结果
原始数据展示:
+-----------+-------------+-----------+-----------+----------+------+-----+-------------+ |employee_id|employee_name|job_title |manager_id |hire_date |salary|bonus|department_id| +-----------+-------------+-----------+-----------+----------+------+-----+-------------+ |7369 |SMITH |CLERK |7902 |1980-12-17|800.0 |0.0 |20 | |7499 |ALLEN |SALESMAN |7698 |1981-02-20|1600.0|300.0|30 | # ...(其他行省略)
Spark SQL 聚合结果:
+-------------+------------+ |department_id|total | +-------------+------------+ |30 |20150.0 | |20 |10875.0 | |10 |8750.0 | +-------------+------------+
四、常见问题与优化
1. 空值处理
- HBase 列值为
null
时,result.getValue
返回null
,需通过Option
或判空逻辑处理:val commStr = Option(result.getValue(...)).map(Bytes.toString).getOrElse("0.0")
2. 性能优化
- 批量写入:调整
batchSize
(建议 500-2000),减少 RPC 调用次数。 - 扫描缓存:通过
scan.setCaching(500)
提升全表扫描效率。 - 数据类型:避免过度使用字符串类型,对数值字段直接存储二进制数据(如
Bytes.toDouble
)。
3. 集群配置(可选做)
- 在
spark-submit
中添加 HBase 配置文件(如hbase-site.xml
),确保 Executor 节点访问 ZooKeeper:spark-submit --files hbase-site.xml ...
五、总结
本文通过实际案例演示了 Spark 与 HBase 的集成流程,实现了从 CSV 数据写入 HBase 到结构化数据分析的完整链路。核心要点包括:
- 使用
foreachPartition
实现批量写入,避免单条操作性能损耗。 - 通过
ResultScanner
和样例类映射,简化 HBase 数据到 DataFrame 的转换。 - 利用 Spark SQL 对 HBase 数据进行高效分析,发挥分布式计算优势。
该实验适用于海量结构化数据的存储与分析场景,可进一步扩展至实时数据处理或机器学习模型训练。