代码实现:
HBaseConnection.java
package com.peizheng.bigdata;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import java.io.IOException;
/**
 * Holds the single HBase {@link Connection} shared by the example code.
 *
 * <p>The connection is opened once, when this class is initialized, against the
 * ZooKeeper quorum {@code master,slave1,slave2} on client port {@code 2181}.
 * Callers obtain {@code Table} and {@code Admin} instances from
 * {@link #connection} and must call {@link #closeConnection()} when finished.
 */
public class HBaseConnection {
    /**
     * The shared connection. Remains {@code null} if the connection attempt in
     * the static initializer failed.
     */
    public static Connection connection = null;

    static {
        Configuration conf = HBaseConfiguration.create();
        conf.set("hbase.zookeeper.quorum", "master,slave1,slave2");
        conf.set("hbase.zookeeper.property.clientPort", "2181");
        try {
            connection = ConnectionFactory.createConnection(conf);
        } catch (IOException e) {
            // Best effort: class initialization still succeeds and 'connection'
            // stays null, so say so explicitly instead of failing later with an
            // unexplained NullPointerException.
            System.err.println("Failed to create the HBase connection; HBaseConnection.connection is null");
            e.printStackTrace();
        }
    }

    /**
     * Closes the shared connection.
     *
     * <p>Does nothing when no connection was ever established. An
     * {@link IOException} raised while closing is reported on standard error
     * and not rethrown.
     */
    public static void closeConnection() {
        if (connection == null) {
            return;
        }
        try {
            connection.close();
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
}
HBaseOperation.java
package com.peizheng.bigdata;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.*;
import org.apache.hadoop.hbase.client.*;
import org.apache.hadoop.hbase.filter.CompareFilter;
import org.apache.hadoop.hbase.filter.Filter;
import org.apache.hadoop.hbase.filter.FilterList;
import org.apache.hadoop.hbase.filter.SingleColumnValueFilter;
import org.apache.hadoop.hbase.util.Bytes;
import java.io.BufferedReader;
import java.io.FileReader;
import java.io.IOException;
/**
 * Example HBase client operations: namespace and table creation, single-cell
 * writes, point reads, range scans and filtered scans.
 *
 * <p>Every method uses the shared connection in
 * {@code HBaseConnection.connection}. {@code Admin}, {@code Table} and
 * {@code ResultScanner} instances are opened per call and released with
 * try-with-resources, so they are closed even when an operation throws.
 */
public class HBaseOperation {

    /**
     * Creates a namespace.
     *
     * @param namespace name of the namespace to create
     * @throws IOException if the request fails
     */
    public static void createNameSpace(String namespace) throws IOException {
        try (Admin admin = HBaseConnection.connection.getAdmin()) {
            // Custom properties can be attached to the builder before build(),
            // for example builder.addConfiguration("user", "peizheng").
            NamespaceDescriptor.Builder builder = NamespaceDescriptor.create(namespace);
            admin.createNamespace(builder.build());
        }
    }

    /**
     * Creates a table whose column families each keep up to 5 versions per cell.
     *
     * @param name table name
     * @param cols names of the column families to create
     * @throws IOException if the request fails
     */
    public static void createTable(String name, String[] cols) throws IOException {
        try (Admin admin = HBaseConnection.connection.getAdmin()) {
            // Build the descriptor from a TableName rather than through the
            // deprecated String constructor.
            HTableDescriptor tableDescriptor = new HTableDescriptor(TableName.valueOf(name));
            for (String col : cols) {
                HColumnDescriptor columnDescriptor = new HColumnDescriptor(col);
                columnDescriptor.setMaxVersions(5);
                tableDescriptor.addFamily(columnDescriptor);
            }
            admin.createTable(tableDescriptor);
        }
    }

    /**
     * Writes one cell.
     *
     * @param tableName    table to write to
     * @param rowKey       row key
     * @param columnFamily column family of the cell
     * @param columnName   column qualifier of the cell
     * @param value        value to store
     * @throws IOException if the write fails
     */
    public static void putCell(String tableName, String rowKey, String columnFamily, String columnName, String value) throws IOException {
        try (Table table = HBaseConnection.connection.getTable(TableName.valueOf(tableName))) {
            Put put = new Put(Bytes.toBytes(rowKey));
            put.addColumn(Bytes.toBytes(columnFamily), Bytes.toBytes(columnName), Bytes.toBytes(value));
            table.put(put);
        }
    }

    /**
     * Reads one row and prints each of its cells on its own line as
     * {@code family:qualifier,value}.
     *
     * @param tableName table to read from
     * @param rowKey    key of the row to read
     * @throws IOException if the read fails
     */
    public static void getRow(String tableName, String rowKey) throws IOException {
        try (Table table = HBaseConnection.connection.getTable(TableName.valueOf(tableName))) {
            Get get = new Get(Bytes.toBytes(rowKey));
            printCellPerLine(table.get(get));
        }
    }

    /**
     * Reads a single column of one row and prints the returned cells, one per
     * line, as {@code family:qualifier,value}.
     *
     * @param tableName  table to read from
     * @param rowKey     key of the row to read
     * @param familyName column family of the cell
     * @param columnName column qualifier of the cell
     * @throws IOException if the read fails
     */
    public static void getCell(String tableName, String rowKey, String familyName, String columnName) throws IOException {
        try (Table table = HBaseConnection.connection.getTable(TableName.valueOf(tableName))) {
            Get get = new Get(Bytes.toBytes(rowKey));
            get.addColumn(Bytes.toBytes(familyName), Bytes.toBytes(columnName));
            printCellPerLine(table.get(get));
        }
    }

    /**
     * Scans the rows in {@code [startRowKey, endRowKey)} and prints one output
     * line per row, with the row's cells separated by tabs.
     *
     * @param tableName   table to scan
     * @param startRowKey first row key of the range (inclusive)
     * @param endRowKey   end row key of the range (exclusive)
     * @throws IOException if the scan fails
     */
    public static void scanRows(String tableName, String startRowKey, String endRowKey) throws IOException {
        Scan scan = newRangeScan(startRowKey, endRowKey);
        try (Table table = HBaseConnection.connection.getTable(TableName.valueOf(tableName));
             ResultScanner scanner = table.getScanner(scan)) {
            printRowPerLine(scanner);
        }
    }

    /**
     * Scans the rows in {@code [startRowKey, endRowKey)}, keeping those whose
     * {@code familyName:columnName} value is less than or equal to {@code val},
     * and prints one output line per row with tab-separated cells.
     *
     * <p>NOTE: the filter is given the raw bytes of {@code val}, so the
     * comparison is on bytes, not numeric (for example {@code "9"} sorts after
     * {@code "10"}). Rows that do not contain the column are not filtered out,
     * because the filter's "filter if missing" option is left at its default.
     *
     * @param tableName   table to scan
     * @param startRowKey first row key of the range (inclusive)
     * @param endRowKey   end row key of the range (exclusive)
     * @param familyName  column family of the column to test
     * @param columnName  column qualifier of the column to test
     * @param val         upper bound the column value is compared against
     * @throws IOException if the scan fails
     */
    public static void filterScan(String tableName, String startRowKey, String endRowKey, String familyName, String columnName, String val) throws IOException {
        Scan scan = newRangeScan(startRowKey, endRowKey);

        SingleColumnValueFilter valueFilter = new SingleColumnValueFilter(
                Bytes.toBytes(familyName),
                Bytes.toBytes(columnName),
                CompareFilter.CompareOp.LESS_OR_EQUAL,
                Bytes.toBytes(val)
        );
        FilterList filterList = new FilterList();
        filterList.addFilter(valueFilter);
        scan.setFilter(filterList);

        // The table must stay open for the whole iteration; it is closed (after
        // the scanner) only once every row has been printed.
        try (Table table = HBaseConnection.connection.getTable(TableName.valueOf(tableName));
             ResultScanner scanner = table.getScanner(scan)) {
            printRowPerLine(scanner);
        }
    }

    /**
     * Loads {@code F:/temperature.log} into the {@code temperature} table.
     *
     * <p>Each input line is expected to be {@code id,year,temperature}. The row
     * key is {@code id:year} and the three fields are stored in column family
     * {@code cf} under the qualifiers {@code id}, {@code year} and
     * {@code temperature}. The table is created first if it does not exist.
     * Blank lines are ignored and lines with fewer than three fields are
     * skipped with a warning on standard error.
     *
     * @param args unused
     * @throws Exception if connecting, reading the file or writing to HBase fails
     */
    public static void main(String[] args) throws Exception {
        // HBaseConnection's static initializer already opens a connection; reuse
        // it rather than replacing (and leaking) it. Connect here only if that
        // attempt failed or the connection has since been closed.
        if (HBaseConnection.connection == null || HBaseConnection.connection.isClosed()) {
            Configuration conf = HBaseConfiguration.create();
            conf.set("hbase.zookeeper.quorum", "master,slave1,slave2");
            conf.set("hbase.zookeeper.property.clientPort", "2181");
            HBaseConnection.connection = ConnectionFactory.createConnection(conf);
        }

        try {
            String tableName = "temperature";
            String[] cols = {"cf"};

            boolean tableExists;
            try (Admin admin = HBaseConnection.connection.getAdmin()) {
                tableExists = admin.tableExists(TableName.valueOf(tableName));
            }
            if (!tableExists) {
                createTable(tableName, cols);
            }

            byte[] family = Bytes.toBytes("cf");
            try (BufferedReader reader = new BufferedReader(new FileReader("F:/temperature.log"));
                 Table table = HBaseConnection.connection.getTable(TableName.valueOf(tableName))) {
                String line;
                while ((line = reader.readLine()) != null) {
                    String[] splits = line.split(",");
                    if (splits.length < 3) {
                        if (!line.trim().isEmpty()) {
                            System.err.println("Skipping malformed line: " + line);
                        }
                        continue;
                    }
                    String id = splits[0].trim();
                    String year = splits[1].trim();
                    String temperature = splits[2].trim();
                    String rowKey = id + ":" + year;

                    // One Put carrying all three columns: a single request per
                    // input line instead of three separate writes.
                    Put put = new Put(Bytes.toBytes(rowKey));
                    put.addColumn(family, Bytes.toBytes("id"), Bytes.toBytes(id));
                    put.addColumn(family, Bytes.toBytes("year"), Bytes.toBytes(year));
                    put.addColumn(family, Bytes.toBytes("temperature"), Bytes.toBytes(temperature));
                    table.put(put);
                }
            }
        } finally {
            HBaseConnection.closeConnection();
        }
    }

    /** Builds a scan over the row-key range {@code [startRowKey, endRowKey)}. */
    private static Scan newRangeScan(String startRowKey, String endRowKey) {
        Scan scan = new Scan();
        scan.setStartRow(Bytes.toBytes(startRowKey));
        scan.setStopRow(Bytes.toBytes(endRowKey));
        return scan;
    }

    /**
     * Formats a cell as {@code family:qualifier,value}. The bytes are decoded
     * with {@code Bytes.toString}, the counterpart of the {@code Bytes.toBytes}
     * calls used when writing, instead of the platform default charset.
     */
    private static String formatCell(Cell cell) {
        String family = Bytes.toString(CellUtil.cloneFamily(cell));
        String qualifier = Bytes.toString(CellUtil.cloneQualifier(cell));
        String value = Bytes.toString(CellUtil.cloneValue(cell));
        return family + ":" + qualifier + "," + value;
    }

    /** Prints every cell of a single row on its own line. */
    private static void printCellPerLine(Result result) {
        for (Cell cell : result.rawCells()) {
            System.out.println(formatCell(cell));
        }
    }

    /** Prints each scanned row on one line, each cell followed by a tab. */
    private static void printRowPerLine(ResultScanner scanner) {
        for (Result result : scanner) {
            for (Cell cell : result.rawCells()) {
                System.out.print(formatCell(cell) + "\t");
            }
            System.out.println();
        }
    }
}
相关运行结果:
Java 程序运行结果:
HBase 客户端(hbase shell)运行结果:
scan 'temperature'
报错解决
如果程序一直处于运行中而没有结果,可能是因为连接配置里写的是 IP,而不是 master、slave1、slave2 这样的主机名;此时还可能报错 Caused by: org.apache.hadoop.hbase.MasterNotRunningException: java.net.UnknownHostException。如果在网上查了很久仍找不到原因,可以参考下面的文章修改 Windows 的 hosts 文件(C:\Windows\System32\drivers\etc\hosts),添加集群各节点 IP 与主机名的映射:
IP 与主机名的对应关系供参考: