package com.oneday.hbase.model;
import java.util.Map;
import java.util.HashMap;
/**
 * Mutable in-memory representation of a single HBase row.
 *
 * <p>A row is identified by a table name and a row key and carries its cells as a
 * two-level map: column family -&gt; (qualifier -&gt; value). All values are strings.
 * A single timestamp is kept for the whole entity; the no-arg constructor
 * initialises it to {@link System#currentTimeMillis()}.
 *
 * <p>The class performs no synchronisation of its own.
 */
public class HBaseEntity {
    private String rowKey;
    private String tableName;
    /** family -&gt; (qualifier -&gt; value); never {@code null}. */
    private Map<String, Map<String, String>> familyQualifierValue;
    private long timestamp;

    /** Creates an entity with no columns and the current time as its timestamp. */
    public HBaseEntity() {
        this.familyQualifierValue = new HashMap<>();
        this.timestamp = System.currentTimeMillis();
    }

    /**
     * Creates an entity with no columns for the given table and row key.
     *
     * @param tableName name of the table the row belongs to
     * @param rowKey    row key of the row
     */
    public HBaseEntity(String tableName, String rowKey) {
        this();
        this.tableName = tableName;
        this.rowKey = rowKey;
    }

    /**
     * Adds a column value, creating the family's qualifier map on first use.
     * An existing value for the same family/qualifier is overwritten.
     *
     * @param family    column family name
     * @param qualifier column qualifier name
     * @param value     value to store
     */
    public void addColumn(String family, String qualifier, String value) {
        familyQualifierValue.computeIfAbsent(family, k -> new HashMap<>()).put(qualifier, value);
    }

    /**
     * Looks up a column value.
     *
     * @param family    column family name
     * @param qualifier column qualifier name
     * @return the stored value, or {@code null} if the family or qualifier is absent
     */
    public String getColumnValue(String family, String qualifier) {
        Map<String, String> qualifiers = familyQualifierValue.get(family);
        return qualifiers != null ? qualifiers.get(qualifier) : null;
    }

    // Getters and setters

    public String getRowKey() {
        return rowKey;
    }

    public void setRowKey(String rowKey) {
        this.rowKey = rowKey;
    }

    public String getTableName() {
        return tableName;
    }

    public void setTableName(String tableName) {
        this.tableName = tableName;
    }

    /**
     * Returns the live family -&gt; (qualifier -&gt; value) map backing this entity;
     * changes made through the returned map are visible to this entity.
     */
    public Map<String, Map<String, String>> getFamilyQualifierValue() {
        return familyQualifierValue;
    }

    /**
     * Replaces the backing column map. The given map is used as-is (not copied).
     *
     * @param familyQualifierValue new column map; {@code null} is treated as
     *                             "no columns" and replaced by an empty map so that
     *                             {@link #addColumn} and {@link #getColumnValue}
     *                             keep working afterwards
     */
    public void setFamilyQualifierValue(Map<String, Map<String, String>> familyQualifierValue) {
        this.familyQualifierValue = familyQualifierValue != null ? familyQualifierValue : new HashMap<>();
    }

    public long getTimestamp() {
        return timestamp;
    }

    public void setTimestamp(long timestamp) {
        this.timestamp = timestamp;
    }

    @Override
    public String toString() {
        return "HBaseEntity{" +
                "rowKey='" + rowKey + '\'' +
                ", tableName='" + tableName + '\'' +
                ", familyQualifierValue=" + familyQualifierValue +
                ", timestamp=" + timestamp +
                '}';
    }
}
package com.oneday.hbase.service;
import com.oneday.hbase.model.HBaseEntity;
import org.apache.hadoop.hbase.client.Result;
import java.util.List;
import java.util.Map;
/**
 * Contract for basic HBase table administration and row-level
 * create/read/update/delete operations. Rows are exchanged as
 * {@link HBaseEntity} objects and single cell values as strings.
 */
public interface HBaseService {
/**
 * Creates a table.
 *
 * @param tableName      name of the table to create
 * @param columnFamilies names of the column families the table should have
 * @return whether the table was created
 */
boolean createTable(String tableName, String... columnFamilies);
/**
 * Deletes a table.
 *
 * @param tableName name of the table to delete
 * @return whether the table was deleted
 */
boolean deleteTable(String tableName);
/**
 * Checks whether a table exists.
 *
 * @param tableName name of the table to look for
 * @return whether the table exists
 */
boolean tableExists(String tableName);
/**
 * Inserts or updates the row described by the entity.
 *
 * @param entity row to write; it carries its own table name and row key
 * @return whether the write succeeded
 */
boolean put(HBaseEntity entity);
/**
 * Inserts or updates several rows in one call.
 *
 * @param entities rows to write; each carries its own table name and row key
 * @return whether the writes succeeded
 */
boolean putBatch(List<HBaseEntity> entities);
/**
 * Reads a row by its row key.
 *
 * @param tableName table to read from
 * @param rowKey    key of the row to read
 * @return the row as an entity; HBaseServiceImpl returns {@code null} when the
 *         row has no data
 */
HBaseEntity get(String tableName, String rowKey);
/**
 * Reads one column family of a row.
 *
 * @param tableName table to read from
 * @param rowKey    key of the row to read
 * @param family    column family to read
 * @return the row, restricted to the given family, as an entity;
 *         HBaseServiceImpl returns {@code null} when nothing is found
 */
HBaseEntity get(String tableName, String rowKey, String family);
/**
 * Reads a single column value of a row.
 *
 * @param tableName table to read from
 * @param rowKey    key of the row to read
 * @param family    column family of the column
 * @param qualifier qualifier of the column
 * @return the column value as a string; HBaseServiceImpl returns {@code null}
 *         when nothing is found
 */
String get(String tableName, String rowKey, String family, String qualifier);
/**
 * Scans all rows of a table.
 *
 * @param tableName table to scan
 * @return the rows found
 */
List<HBaseEntity> scan(String tableName);
/**
 * Scans the rows of a table within a row-key range.
 *
 * @param tableName table to scan
 * @param startRow  row key at which the scan starts
 * @param stopRow   row key at which the scan stops
 * @return the rows found
 */
List<HBaseEntity> scan(String tableName, String startRow, String stopRow);
/**
 * Scans the rows whose row key starts with the given prefix.
 *
 * @param tableName table to scan
 * @param prefix    row-key prefix to match
 * @return the rows found
 */
List<HBaseEntity> scanByPrefix(String tableName, String prefix);
/**
 * Deletes a row.
 *
 * @param tableName table to delete from
 * @param rowKey    key of the row to delete
 * @return whether the delete succeeded
 */
boolean delete(String tableName, String rowKey);
/**
 * Deletes the data of one column family of a row.
 *
 * @param tableName table to delete from
 * @param rowKey    key of the affected row
 * @param family    column family to delete
 * @return whether the delete succeeded
 */
boolean delete(String tableName, String rowKey, String family);
/**
 * Deletes the data of one column of a row.
 *
 * @param tableName table to delete from
 * @param rowKey    key of the affected row
 * @param family    column family of the column
 * @param qualifier qualifier of the column
 * @return whether the delete succeeded
 */
boolean delete(String tableName, String rowKey, String family, String qualifier);
/**
 * Deletes several rows of one table in one call.
 *
 * @param tableName table to delete from
 * @param rowKeys   keys of the rows to delete
 * @return whether the deletes succeeded
 */
boolean deleteBatch(String tableName, List<String> rowKeys);
/**
 * Returns the number of rows in a table.
 *
 * @param tableName table to count
 * @return the row count
 */
long count(String tableName);
}
package com.oneday.hbase.service.impl;
import com.oneday.hbase.exception.HBaseOperationException;
import com.oneday.hbase.model.HBaseEntity;
import com.oneday.hbase.service.HBaseService;
import com.oneday.hbase.util.HBaseConnectionManager;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.*;
import org.apache.hadoop.hbase.filter.PrefixFilter;
import org.apache.hadoop.hbase.util.Bytes;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
/**
 * {@link HBaseService} implementation on top of the HBase client API.
 *
 * <p>Every operation obtains its {@link Table} or {@link Admin} handle from the
 * injected {@link HBaseConnectionManager} and closes it before returning. Any
 * {@link IOException} raised by the client is logged and rethrown as an
 * {@link HBaseOperationException} carrying the operation name, table and row key.
 */
@Service
public class HBaseServiceImpl implements HBaseService {
    private static final Logger logger = LoggerFactory.getLogger(HBaseServiceImpl.class);

    /** Scanner caching value applied to every scan issued by this class. */
    private static final int SCAN_CACHING = 1000;

    @Autowired
    private HBaseConnectionManager connectionManager;

    /**
     * Creates a table whose column families each keep up to 3 versions.
     *
     * @return {@code true} if the table was created, {@code false} if it already existed
     * @throws IllegalArgumentException if no column family is given
     * @throws HBaseOperationException  if the HBase client reports an I/O error
     */
    @Override
    public boolean createTable(String tableName, String... columnFamilies) {
        if (columnFamilies == null || columnFamilies.length == 0) {
            throw new IllegalArgumentException("至少需要一个列族");
        }
        try (Admin admin = connectionManager.getConnection().getAdmin()) {
            TableName table = TableName.valueOf(tableName);
            if (admin.tableExists(table)) {
                logger.warn("表 {} 已存在", tableName);
                return false;
            }
            TableDescriptorBuilder builder = TableDescriptorBuilder.newBuilder(table);
            for (String family : columnFamilies) {
                ColumnFamilyDescriptor familyDescriptor = ColumnFamilyDescriptorBuilder
                        .newBuilder(Bytes.toBytes(family))
                        .setMaxVersions(3)
                        .setTimeToLive(Integer.MAX_VALUE)
                        .build();
                builder.setColumnFamily(familyDescriptor);
            }
            admin.createTable(builder.build());
            logger.info("成功创建表: {}", tableName);
            return true;
        } catch (IOException e) {
            logger.error("创建表失败: {}", tableName, e);
            throw new HBaseOperationException("CREATE_TABLE", tableName, null, "创建表失败", e);
        }
    }

    /**
     * Disables (if enabled) and deletes a table.
     *
     * @return {@code true} if the table was deleted, {@code false} if it did not exist
     * @throws HBaseOperationException if the HBase client reports an I/O error
     */
    @Override
    public boolean deleteTable(String tableName) {
        try (Admin admin = connectionManager.getConnection().getAdmin()) {
            TableName table = TableName.valueOf(tableName);
            if (!admin.tableExists(table)) {
                logger.warn("表 {} 不存在", tableName);
                return false;
            }
            if (admin.isTableEnabled(table)) {
                admin.disableTable(table);
            }
            admin.deleteTable(table);
            logger.info("成功删除表: {}", tableName);
            return true;
        } catch (IOException e) {
            logger.error("删除表失败: {}", tableName, e);
            throw new HBaseOperationException("DELETE_TABLE", tableName, null, "删除表失败", e);
        }
    }

    /**
     * Checks whether a table exists.
     *
     * @throws HBaseOperationException if the HBase client reports an I/O error
     */
    @Override
    public boolean tableExists(String tableName) {
        try (Admin admin = connectionManager.getConnection().getAdmin()) {
            return admin.tableExists(TableName.valueOf(tableName));
        } catch (IOException e) {
            logger.error("检查表是否存在失败: {}", tableName, e);
            throw new HBaseOperationException("TABLE_EXISTS", tableName, null, "检查表是否存在失败", e);
        }
    }

    /**
     * Writes all columns of the entity to its row, stamped with the entity's timestamp.
     *
     * @return {@code true} on success
     * @throws IllegalArgumentException if the entity, its row key, its table name or
     *                                  one of its column values is {@code null}
     * @throws HBaseOperationException  if the HBase client reports an I/O error
     */
    @Override
    public boolean put(HBaseEntity entity) {
        requireWritable(entity);
        Put put = toPut(entity);
        try (Table table = openTable(entity.getTableName())) {
            table.put(put);
            logger.debug("成功插入数据: table={}, rowKey={}", entity.getTableName(), entity.getRowKey());
            return true;
        } catch (IOException e) {
            logger.error("插入数据失败: table={}, rowKey={}", entity.getTableName(), entity.getRowKey(), e);
            throw new HBaseOperationException("PUT", entity.getTableName(), entity.getRowKey(), "插入数据失败", e);
        }
    }

    /**
     * Writes several entities, grouped by table so that each table receives one batch.
     * All entities are validated and converted before the first write is sent, so an
     * invalid entity cannot leave a partially written batch behind.
     *
     * @return {@code true} on success, including for a {@code null} or empty list
     * @throws IllegalArgumentException if an entity, its row key, its table name or one
     *                                  of its column values is {@code null}
     * @throws HBaseOperationException  if the HBase client reports an I/O error
     */
    @Override
    public boolean putBatch(List<HBaseEntity> entities) {
        if (entities == null || entities.isEmpty()) {
            return true;
        }
        // Group by table name.
        Map<String, List<Put>> putsByTable = new HashMap<>();
        for (HBaseEntity entity : entities) {
            requireWritable(entity);
            putsByTable.computeIfAbsent(entity.getTableName(), k -> new ArrayList<>()).add(toPut(entity));
        }
        for (Map.Entry<String, List<Put>> entry : putsByTable.entrySet()) {
            String tableName = entry.getKey();
            List<Put> puts = entry.getValue();
            int putCount = puts.size();
            try (Table table = openTable(tableName)) {
                table.put(puts);
                logger.debug("成功批量插入数据: table={}, count={}", tableName, putCount);
            } catch (IOException e) {
                logger.error("批量插入数据失败: table={}", tableName, e);
                throw new HBaseOperationException("PUT_BATCH", tableName, null, "批量插入数据失败", e);
            }
        }
        return true;
    }

    /**
     * Reads a whole row.
     *
     * @return the row, or {@code null} if it has no data
     * @throws HBaseOperationException if the HBase client reports an I/O error
     */
    @Override
    public HBaseEntity get(String tableName, String rowKey) {
        try {
            return fetchEntity(tableName, new Get(Bytes.toBytes(rowKey)));
        } catch (IOException e) {
            logger.error("获取数据失败: table={}, rowKey={}", tableName, rowKey, e);
            throw new HBaseOperationException("GET", tableName, rowKey, "获取数据失败", e);
        }
    }

    /**
     * Reads one column family of a row.
     *
     * @return the row restricted to {@code family}, or {@code null} if it has no data there
     * @throws HBaseOperationException if the HBase client reports an I/O error
     */
    @Override
    public HBaseEntity get(String tableName, String rowKey, String family) {
        try {
            Get get = new Get(Bytes.toBytes(rowKey));
            get.addFamily(Bytes.toBytes(family));
            return fetchEntity(tableName, get);
        } catch (IOException e) {
            logger.error("获取数据失败: table={}, rowKey={}, family={}", tableName, rowKey, family, e);
            throw new HBaseOperationException("GET", tableName, rowKey, "获取数据失败", e);
        }
    }

    /**
     * Reads a single column value.
     *
     * @return the value decoded as a string, or {@code null} if the column has no data
     * @throws HBaseOperationException if the HBase client reports an I/O error
     */
    @Override
    public String get(String tableName, String rowKey, String family, String qualifier) {
        byte[] familyBytes = Bytes.toBytes(family);
        byte[] qualifierBytes = Bytes.toBytes(qualifier);
        try (Table table = openTable(tableName)) {
            Get get = new Get(Bytes.toBytes(rowKey));
            get.addColumn(familyBytes, qualifierBytes);
            Result result = table.get(get);
            if (result.isEmpty()) {
                return null;
            }
            byte[] value = result.getValue(familyBytes, qualifierBytes);
            return value != null ? Bytes.toString(value) : null;
        } catch (IOException e) {
            logger.error("获取数据失败: table={}, rowKey={}, family={}, qualifier={}",
                    tableName, rowKey, family, qualifier, e);
            throw new HBaseOperationException("GET", tableName, rowKey, "获取数据失败", e);
        }
    }

    /**
     * Scans every row of a table into memory.
     *
     * @throws HBaseOperationException if the HBase client reports an I/O error
     */
    @Override
    public List<HBaseEntity> scan(String tableName) {
        try {
            return collectRows(tableName, newScan());
        } catch (IOException e) {
            logger.error("扫描表失败: table={}", tableName, e);
            throw new HBaseOperationException("SCAN", tableName, null, "扫描表失败", e);
        }
    }

    /**
     * Scans the rows from {@code startRow} up to {@code stopRow}, using the HBase
     * client's default bound inclusiveness. A {@code null} bound leaves that side
     * of the range open.
     *
     * @throws HBaseOperationException if the HBase client reports an I/O error
     */
    @Override
    public List<HBaseEntity> scan(String tableName, String startRow, String stopRow) {
        try {
            Scan scan = newScan();
            if (startRow != null) {
                scan.withStartRow(Bytes.toBytes(startRow));
            }
            if (stopRow != null) {
                scan.withStopRow(Bytes.toBytes(stopRow));
            }
            return collectRows(tableName, scan);
        } catch (IOException e) {
            logger.error("扫描表失败: table={}, startRow={}, stopRow={}", tableName, startRow, stopRow, e);
            throw new HBaseOperationException("SCAN", tableName, null, "扫描表失败", e);
        }
    }

    /**
     * Scans the rows whose key starts with {@code prefix}.
     *
     * @throws HBaseOperationException if the HBase client reports an I/O error
     */
    @Override
    public List<HBaseEntity> scanByPrefix(String tableName, String prefix) {
        try {
            byte[] prefixBytes = Bytes.toBytes(prefix);
            Scan scan = newScan();
            // Every matching key sorts at or after the prefix itself, so seek straight
            // to it instead of filtering from the first row of the table.
            scan.withStartRow(prefixBytes);
            scan.setFilter(new PrefixFilter(prefixBytes));
            return collectRows(tableName, scan);
        } catch (IOException e) {
            logger.error("按前缀扫描表失败: table={}, prefix={}", tableName, prefix, e);
            throw new HBaseOperationException("SCAN_PREFIX", tableName, null, "按前缀扫描表失败", e);
        }
    }

    /**
     * Deletes a whole row.
     *
     * @return {@code true} on success
     * @throws HBaseOperationException if the HBase client reports an I/O error
     */
    @Override
    public boolean delete(String tableName, String rowKey) {
        try (Table table = openTable(tableName)) {
            table.delete(new Delete(Bytes.toBytes(rowKey)));
            logger.debug("成功删除数据: table={}, rowKey={}", tableName, rowKey);
            return true;
        } catch (IOException e) {
            logger.error("删除数据失败: table={}, rowKey={}", tableName, rowKey, e);
            throw new HBaseOperationException("DELETE", tableName, rowKey, "删除数据失败", e);
        }
    }

    /**
     * Deletes one column family of a row.
     *
     * @return {@code true} on success
     * @throws HBaseOperationException if the HBase client reports an I/O error
     */
    @Override
    public boolean delete(String tableName, String rowKey, String family) {
        try (Table table = openTable(tableName)) {
            Delete delete = new Delete(Bytes.toBytes(rowKey));
            delete.addFamily(Bytes.toBytes(family));
            table.delete(delete);
            logger.debug("成功删除列族数据: table={}, rowKey={}, family={}", tableName, rowKey, family);
            return true;
        } catch (IOException e) {
            logger.error("删除列族数据失败: table={}, rowKey={}, family={}", tableName, rowKey, family, e);
            throw new HBaseOperationException("DELETE", tableName, rowKey, "删除列族数据失败", e);
        }
    }

    /**
     * Deletes one column of a row, including all of its stored versions.
     *
     * @return {@code true} on success
     * @throws HBaseOperationException if the HBase client reports an I/O error
     */
    @Override
    public boolean delete(String tableName, String rowKey, String family, String qualifier) {
        try (Table table = openTable(tableName)) {
            Delete delete = new Delete(Bytes.toBytes(rowKey));
            // addColumns (plural) removes every version. addColumn would remove only the
            // newest one, and tables created by createTable keep 3 versions, so an older
            // value would become visible again after the "delete".
            delete.addColumns(Bytes.toBytes(family), Bytes.toBytes(qualifier));
            table.delete(delete);
            logger.debug("成功删除列数据: table={}, rowKey={}, family={}, qualifier={}",
                    tableName, rowKey, family, qualifier);
            return true;
        } catch (IOException e) {
            logger.error("删除列数据失败: table={}, rowKey={}, family={}, qualifier={}",
                    tableName, rowKey, family, qualifier, e);
            throw new HBaseOperationException("DELETE", tableName, rowKey, "删除列数据失败", e);
        }
    }

    /**
     * Deletes several rows of one table in a single batch.
     *
     * @return {@code true} on success, including for a {@code null} or empty list
     * @throws HBaseOperationException if the HBase client reports an I/O error
     */
    @Override
    public boolean deleteBatch(String tableName, List<String> rowKeys) {
        if (rowKeys == null || rowKeys.isEmpty()) {
            return true;
        }
        try (Table table = openTable(tableName)) {
            List<Delete> deletes = new ArrayList<>(rowKeys.size());
            for (String rowKey : rowKeys) {
                deletes.add(new Delete(Bytes.toBytes(rowKey)));
            }
            // Table.delete(List) removes successfully applied deletes from the list it is
            // given, so the size must be captured before the call to log a real count.
            int deleteCount = deletes.size();
            table.delete(deletes);
            logger.debug("成功批量删除数据: table={}, count={}", tableName, deleteCount);
            return true;
        } catch (IOException e) {
            logger.error("批量删除数据失败: table={}", tableName, e);
            throw new HBaseOperationException("DELETE_BATCH", tableName, null, "批量删除数据失败", e);
        }
    }

    /**
     * Counts the rows of a table with a client-side scan that fetches only the
     * first cell of each row.
     *
     * @throws HBaseOperationException if the HBase client reports an I/O error
     */
    @Override
    public long count(String tableName) {
        Scan scan = newScan();
        scan.setFilter(new org.apache.hadoop.hbase.filter.FirstKeyOnlyFilter());
        try (Table table = openTable(tableName);
             ResultScanner scanner = table.getScanner(scan)) {
            long count = 0;
            while (scanner.next() != null) {
                count++;
            }
            return count;
        } catch (IOException e) {
            logger.error("统计表行数失败: table={}", tableName, e);
            throw new HBaseOperationException("COUNT", tableName, null, "统计表行数失败", e);
        }
    }

    /** Opens a handle on the named table; the caller is responsible for closing it. */
    private Table openTable(String tableName) throws IOException {
        return connectionManager.getConnection().getTable(TableName.valueOf(tableName));
    }

    /** Creates a scan preconfigured with this class's scanner caching. */
    private static Scan newScan() {
        Scan scan = new Scan();
        scan.setCaching(SCAN_CACHING);
        return scan;
    }

    /** Rejects entities that cannot be written because they lack a table name or row key. */
    private static void requireWritable(HBaseEntity entity) {
        if (entity == null || entity.getRowKey() == null || entity.getTableName() == null) {
            throw new IllegalArgumentException("实体、rowKey和tableName不能为空");
        }
    }

    /** Builds the Put for an entity: one cell per column, all stamped with the entity timestamp. */
    private static Put toPut(HBaseEntity entity) {
        Put put = new Put(Bytes.toBytes(entity.getRowKey()));
        Map<String, Map<String, String>> families = entity.getFamilyQualifierValue();
        if (families == null) {
            return put;
        }
        for (Map.Entry<String, Map<String, String>> familyEntry : families.entrySet()) {
            Map<String, String> qualifiers = familyEntry.getValue();
            if (qualifiers == null) {
                continue;
            }
            byte[] family = Bytes.toBytes(familyEntry.getKey());
            for (Map.Entry<String, String> qualifierEntry : qualifiers.entrySet()) {
                String value = qualifierEntry.getValue();
                if (value == null) {
                    // Fail with a message naming the column rather than a bare
                    // NullPointerException from the byte conversion below.
                    throw new IllegalArgumentException("列值不能为空: family=" + familyEntry.getKey()
                            + ", qualifier=" + qualifierEntry.getKey());
                }
                put.addColumn(family, Bytes.toBytes(qualifierEntry.getKey()),
                        entity.getTimestamp(), Bytes.toBytes(value));
            }
        }
        return put;
    }

    /** Executes a Get and converts the result, returning {@code null} for an empty result. */
    private HBaseEntity fetchEntity(String tableName, Get get) throws IOException {
        try (Table table = openTable(tableName)) {
            Result result = table.get(get);
            return result.isEmpty() ? null : convertResultToEntity(tableName, result);
        }
    }

    /**
     * Runs a scan and converts every returned row. Rows are pulled with
     * {@link ResultScanner#next()}, which throws {@link IOException}; the scanner's
     * iterator instead rethrows I/O failures as unchecked exceptions, which would
     * bypass the callers' {@link HBaseOperationException} wrapping.
     */
    private List<HBaseEntity> collectRows(String tableName, Scan scan) throws IOException {
        try (Table table = openTable(tableName);
             ResultScanner scanner = table.getScanner(scan)) {
            List<HBaseEntity> entities = new ArrayList<>();
            for (Result row = scanner.next(); row != null; row = scanner.next()) {
                entities.add(convertResultToEntity(tableName, row));
            }
            return entities;
        }
    }

    /**
     * Converts an HBase {@link Result} into an {@link HBaseEntity}.
     *
     * <p>The entity timestamp is set to the newest timestamp among the row's cells.
     * If an entity is later written back with {@link #put}, its cells are then never
     * stamped older than a version already stored, which would hide the new value.
     * A result without cells keeps the timestamp assigned by the entity constructor.
     */
    private HBaseEntity convertResultToEntity(String tableName, Result result) {
        HBaseEntity entity = new HBaseEntity(tableName, Bytes.toString(result.getRow()));
        List<Cell> cells = result.listCells();
        if (cells == null || cells.isEmpty()) {
            return entity;
        }
        long newest = Long.MIN_VALUE;
        for (Cell cell : cells) {
            String family = Bytes.toString(CellUtil.cloneFamily(cell));
            String qualifier = Bytes.toString(CellUtil.cloneQualifier(cell));
            String value = Bytes.toString(CellUtil.cloneValue(cell));
            entity.addColumn(family, qualifier, value);
            newest = Math.max(newest, cell.getTimestamp());
        }
        entity.setTimestamp(newest);
        return entity;
    }
}
package com.oneday.hbase;
import com.oneday.hbase.model.HBaseEntity;
import com.oneday.hbase.service.HBaseService;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.CommandLineRunner;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import java.util.Arrays;
import java.util.List;
/**
 * Spring Boot entry point. As a {@link CommandLineRunner} it runs a short
 * end-to-end walkthrough of {@link HBaseService} and prints each step's
 * outcome to standard output.
 */
@SpringBootApplication
public class HBaseServiceApplication implements CommandLineRunner {

    @Autowired
    private HBaseService hbaseService;

    public static void main(String[] args) {
        SpringApplication.run(HBaseServiceApplication.class, args);
    }

    /** Runs the HBase walkthrough; the command-line arguments are not used. */
    @Override
    public void run(String... args) throws Exception {
        testHBaseOperations();
    }

    /**
     * Exercises create-table, put, get (row and single column), scan, count and
     * delete against the {@code user_info} table. Any exception is reported on
     * standard error together with its stack trace and is not rethrown.
     */
    private void testHBaseOperations() {
        final String table = "user_info";
        try {
            // Step 1: create the table.
            System.out.println("=== 创建表 ===");
            System.out.println("创建表结果: " + hbaseService.createTable(table, "basic", "contact"));

            // Step 2: insert one row.
            System.out.println("\n=== 插入数据 ===");
            System.out.println("插入数据结果: " + hbaseService.put(sampleUser(table)));

            // Step 3: read the row back.
            System.out.println("\n=== 查询数据 ===");
            System.out.println("查询结果: " + hbaseService.get(table, "user001"));

            // Step 4: read a single column.
            System.out.println("\n=== 查询特定列 ===");
            System.out.println("用户姓名: " + hbaseService.get(table, "user001", "basic", "name"));

            // Step 5: scan the whole table.
            System.out.println("\n=== 扫描表 ===");
            System.out.println("表中所有数据: " + hbaseService.scan(table));

            // Step 6: count the rows.
            System.out.println("\n=== 统计行数 ===");
            System.out.println("表行数: " + hbaseService.count(table));

            // Step 7: delete the row.
            System.out.println("\n=== 删除数据 ===");
            System.out.println("删除数据结果: " + hbaseService.delete(table, "user001"));
        } catch (Exception failure) {
            System.err.println("测试过程中发生错误: " + failure.getMessage());
            failure.printStackTrace();
        }
    }

    /** Builds the sample row {@code user001} with two basic and two contact columns. */
    private static HBaseEntity sampleUser(String table) {
        HBaseEntity user = new HBaseEntity(table, "user001");
        user.addColumn("basic", "name", "张三");
        user.addColumn("basic", "age", "25");
        user.addColumn("contact", "email", "zhangsan@example.com");
        user.addColumn("contact", "phone", "13800138000");
        return user;
    }
}