HBase的Java客户端应用和优化
HBase的Java客户端操作
-
- 创建Maven工程,添加依赖
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19<dependencies>
<dependency>
<groupId>org.apache.hbase</groupId>
<artifactId>hbase-client</artifactId>
<version>1.3.1</version>
</dependency>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>4.12</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.testng</groupId>
<artifactId>testng</artifactId>
<version>6.14.3</version>
<scope>test</scope>
</dependency>
</dependencies> -
- 定义类,初始化Connection
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43package com.lagou.hbase.client;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.HBaseAdmin;
import org.junit.Before;
import java.io.IOException;
public class HbaseClientDemo {
Configuration conf=null;
Connection conn=null;
HBaseAdmin admin =null;
public void init () throws IOException {
conf = HBaseConfiguration.create();
conf.set("hbase.zookeeper.quorum","Linux121,Linux122");
conf.set("hbase.zookeeper.property.clientPort","2181");
conn = ConnectionFactory.createConnection(conf);
}
public void destroy(){
if(admin!=null){
try {
admin.close();
} catch (IOException e) {
e.printStackTrace();
}
}
if(conn !=null){
try {
conn.close();
} catch (IOException e) {
e.printStackTrace();
}
}
}
} -
- 创建表,在上述类中添加方法
1
2
3
4
5
6
7
8
9
10
11
public void createTable() throws IOException {
admin = (HBaseAdmin) conn.getAdmin();
//创建表描述器
HTableDescriptor teacher = new HTableDescriptor(TableName.valueOf("teacher"));
//设置列族描述器
teacher.addFamily(new HColumnDescriptor("info"));
//执行创建操作
admin.createTable(teacher);
System.out.println("teacher表创建成功!!");
} -
- 插入数据,在上述类中添加方法
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16//插入一条数据
public void putData() throws IOException {
//获取一个表对象
Table t = conn.getTable(TableName.valueOf("teacher"));
//设定rowkey
Put put = new Put(Bytes.toBytes("110"));
//列族,列,value
put.addColumn(Bytes.toBytes("info"), Bytes.toBytes("addr"), Bytes.toBytes("beijing"));
//执行插入
t.put(put);
// t.put();可以传入list批量插入数据
// 关闭table对象
t.close();
System.out.println("插入成功!!");
} -
- 删除数据,在上述类中添加方法
1
2
3
4
5
6
7
8
9
10
11
12
13//删除一条数据
public void deleteData() throws IOException {
//需要获取一个table对象
final Table worker = conn.getTable(TableName.valueOf("worker"));
//准备delete对象
final Delete delete = new Delete(Bytes.toBytes("110"));
//执行删除
worker.delete(delete);
//关闭table对象
worker.close();
System.out.println("删除数据成功!!");
} -
- 查询某个列族数据
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24//查询某个列族数据
public void getDataByCF() throws IOException {
//获取表对象
HTable teacher = (HTable) conn.getTable(TableName.valueOf("teacher"));
//创建查询的get对象
Get get = new Get(Bytes.toBytes("110"));
//指定列族信息
get.addColumn(Bytes.toBytes("info"), Bytes.toBytes("sex"));
get.addFamily(Bytes.toBytes("info"));
//执行查询
Result res = teacher.get(get);
Cell[] cells = res.rawCells();
//获取改行的所有cell对象
for (Cell cell : cells) {
//通过cell获取rowkey,cf,column,value
String cf = Bytes.toString(CellUtil.cloneFamily(cell));
String column = Bytes.toString(CellUtil.cloneQualifier(cell));
String value = Bytes.toString(CellUtil.cloneValue(cell));
String rowkey = Bytes.toString(CellUtil.cloneRow(cell));
System.out.println(rowkey + "----" + cf + "---" + column + "---" + value);
}
//关闭表对象资源
teacher.close();
} -
- 通过Scan全表扫描
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19// 全表扫描
public void scanAllData() throws IOException {
HTable teacher = (HTable) conn.getTable(TableName.valueOf("teacher"));
Scan scan = new Scan();
ResultScanner resultScanner = teacher.getScanner(scan);
for (Result result : resultScanner) {
Cell[] cells = result.rawCells();
//获取改行的所有cell对象
for (Cell cell : cells) {
//通过cell获取rowkey,cf,column,value
String cf = Bytes.toString(CellUtil.cloneFamily(cell));
String column = Bytes.toString(CellUtil.cloneQualifier(cell));
String value = Bytes.toString(CellUtil.cloneValue(cell));
String rowkey = Bytes.toString(CellUtil.cloneRow(cell));
System.out.println(rowkey + "----" + cf + "--" + column + "---" + value);
}
}
teacher.close();
} -
- 通过startRowKey和endRowKey进行扫描
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21// 通过startRowKey和endRowKey进行扫描查询
public void scanRowKey() throws IOException {
HTable teacher = (HTable) conn.getTable(TableName.valueOf("teacher"));
Scan scan = new Scan();
scan.setStartRow("0001".getBytes());
scan.setStopRow("2".getBytes());
ResultScanner resultScanner = teacher.getScanner(scan);
for (Result result : resultScanner) {
Cell[] cells = result.rawCells();
//获取改行的所有cell对象
for (Cell cell : cells) {
//通过cell获取rowkey,cf,column,value
String cf = Bytes.toString(CellUtil.cloneFamily(cell));
String column = Bytes.toString(CellUtil.cloneQualifier(cell));
String value = Bytes.toString(CellUtil.cloneValue(cell));
String rowkey = Bytes.toString(CellUtil.cloneRow(cell));
System.out.println(rowkey + "----" + cf + "--" + column + "---" + value);
}
}
teacher.close();
}
Hbase 协处理器
协处理器概述
官方文档地址:http://hbase.apache.org/book.html#cp
访问HBase的方式是使用scan或get获取数据,在获取到的数据上进行业务运算。但是在数据量非常大的时候,比如一个有上亿行及十万个列的数据集,再按常用的方式移动获取数据就会遇到性能问题。客户端也需要有强大的计算能力以及足够的内存来处理这么多的数据。
此时就可以考虑使用Coprocessor(协处理器)。将业务运算代码封装到Coprocessor中并在RegionServer上运行,即在数据实际存储位置执行,最后将运算结果返回到客户端。利用协处理器,用户可以编写运行在 HBase Server 端的代码。
Hbase Coprocessor类似以下概念:
触发器和存储过程:一个Observer Coprocessor有些类似于关系型数据库中的触发器,通过它我们可以在一些事件(如Get或是Scan)发生前后执行特定的代码。Endpoint Coprocessor则类似于关系型数据库中的存储过程,因为它允许我们在RegionServer上直接对它存储的数据进行运算,而非是在客户端完成运算。
MapReduce:MapReduce的原则就是将运算移动到数据所处的节点。Coprocessor也是按照相同的原则去工作的。
AOP:如果熟悉AOP的概念的话,可以将Coprocessor的执行过程视为在传递请求的过程中对请求进行了拦截,并执行了一些自定义代码。
协处理器类型
- Observer
协处理器与触发器(trigger)类似:在一些特定事件发生时回调函数(也被称作钩子函数,hook)被执行。这些事件包括一些用户产生的事件,也包括服务器端内部自动产生的事件。
-
协处理器框架提供的接口如下
-
RegionObserver:用户可以用这种的处理器处理数据修改事件,它们与表的region联系紧密。
-
MasterObserver:可以被用作管理或DDL类型的操作,这些是集群级事件。
-
WALObserver:提供控制WAL的钩子函数
-
- Endpoint
这类协处理器类似传统数据库中的存储过程,客户端可以调用这些 Endpoint 协处理器在Regionserver中执行一段代码,并将 RegionServer 端执行结果返回给客户端进一步处理。
- Endpoint常见用途
-
聚合操作
假设需要找出一张表中的最大数据,即 max 聚合操作,普通做法就是必须进行全表扫描,然后Client代码内遍历扫描结果,并执行求最大值的操作。这种方式存在的弊端是无法利用底层集群的并发运算能力,把所有计算都集中到 Client 端执行,效率低下。
使用Endpoint Coprocessor,用户可以将求最大值的代码部署到 HBase RegionServer 端,HBase会利用集群中多个节点的优势来并发执行求最大值的操作。也就是在每个 Region 范围内执行求最大值的代码,将每个 Region 的最大值在 Region Server 端计算出,仅仅将该 max 值返回给Client。在Client进一步将多个 Region 的最大值汇总进一步找到全局的最大值。
Endpoint Coprocessor的应用我们后续可以借助于Phoenix非常容易就能实现。针对Hbase数据集进行聚合运算直接使用SQL语句就能搞定。
Observer 案例
-
需求
通过协处理器Observer实现Hbase当中t1表插入数据,指定的另一张表t2也需要插入相对应的数据
-
实现思路
通过Observer协处理器捕捉到t1插入数据时,将数据复制一份并保存到t2表中
-
开发步骤
-
- Hbase shell执行命令创建t1和t2表
1
2
3create 't1','info'
create 't2','info' -
- 在上述的maven工程中的pom文件添加依赖
1
2
3
4
5
6<!-- https://mvnrepository.com/artifact/org.apache.hbase/hbase-server -->
<dependency>
<groupId>org.apache.hbase</groupId>
<artifactId>hbase-server</artifactId>
<version>1.3.1</version>
</dependency> -
- 在上述的maven工程中编写Observer协处理器
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31package com.lagou.coprocessor;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Durability;
import org.apache.hadoop.hbase.client.HTableInterface;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.coprocessor.BaseRegionObserver;
import org.apache.hadoop.hbase.coprocessor.ObserverContext;
import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
import org.apache.hadoop.hbase.util.Bytes;
import java.io.IOException;
public class MyProcessor extends BaseRegionObserver {
public void prePut(ObserverContext<RegionCoprocessorEnvironment> e, Put put, WALEdit edit, Durability durability) throws IOException {
//把自己需要执行的逻辑定义在此处,向t2表插入数据,数据具体是什么内容与Put一样
final HTableInterface t2 = e.getEnvironment().getTable(TableName.valueOf("t2"));
//解析t1表的插入对象put
final Cell cell = put.get(Bytes.toBytes("info"), Bytes.toBytes("name")).get(0);
//table对象.put
final Put put1 = new Put(put.getRow());
put1.add(cell);
t2.put(put1);
//执行向t2表插入数据
t2.close();
}
} -
- 打成Jar包,上传HDFS
1
2
3
4
5
6
7
8cd /opt/lagou/softwares
mv original-hbaseStudy-1.0-SNAPSHOT.jar processor.jar
hdfs dfs -mkdir -p /processor
hdfs dfs -put processor.jar /processor -
- 挂载协处理器
1
2
3
4
5
6hbase(main):056:0> describe 't1'
hbase(main):055:0> alter 't1',METHOD => 'table_att','Coprocessor'=>'hdfs://Linux121:9000/processor/processor.jar|com.lagou.hbase.processor.MyProcessor|1001|'
#再次查看't1'表
hbase(main):043:0> describe 't1' -
- 验证协处理器
向t1表中插入数据(shell方式验证)
1
2
3
4hbase(main):056:0> put 't1','rk1','info:name','lisi'
# 查看t2表是否也插入相同数据
hbase(main):057:0> get 't1','rk1','info:name' -
- 卸载协处理器
1
2
3
4
5hbase(main):058:0> disable 't1'
hbase(main):059:0> alter 't1',METHOD=>'table_att_unset',NAME=>'coprocessor$1'
hbase(main):060:0> enable 't2'
-
HBase表的RowKey设计
RowKey的基本介绍
ASCII码字典顺序。如:
012,0,123,234,3
0,3,012,123,234
0,012,123,234,3
字典序的排序规则:先比较第一个字节,如果相同,然后比对第二个字节,以此类推,如果到第X个字节,其中一个已经超出了rowkey的长度,短rowkey排在前面。
RowKey长度原则
rowkey是一个二进制码流,可以是任意字符串,最大长度64kb,实际应用中一般为10-100bytes, 以byte[]形式保存,一般设计成定长。
建议越短越好,不要超过16个字节。设计过长会降低memstore内存的利用率和HFile存储数据的效率。
RowKey散列原则
建议将rowkey的高位作为散列字段,这样将提高数据均衡分布在每个RegionServer,以实现负载均衡的几率。
RowKey唯一原则
必须在设计上保证其唯一性,
访问hbase table中的行:有3种方式:
-
单个rowkey
-
rowkey 的range
-
全表扫描(一定要避免全表扫描)
实现方式:
-
org.apache.hadoop.hbase.client.Get
-
scan方法: org.apache.hadoop.hbase.client.Scan
scan使用的时候注意:setStartRow,setEndRow 限定范围, 范围越小,性能越高。
RowKey排序原则
HBase的Rowkey是按照ASCII有序设计的,我们在设计Rowkey时要充分利用这点.
HBase表的热点
什么是热点
检索habse的记录首先要通过row key来定位数据行。当大量的client访问hbase集群的一个或少数几个节点,造成少数region server的读/写请求过多、负载过大,而其他region server负载却很小,就造成了“热点”现象
热点的解决方案
-
- 预分区
预分区的目的让表的数据可以均衡的分散在集群中,而不是默认只有一个region分布在集群的一 个节点上。
-
- 加盐
这里所说的加盐不是密码学中的加盐,而是在rowkey的前面增加随机数,具体就是给rowkey分配 一个随机前缀以使得它和之前的rowkey的开头不同。
4个region,[,a),[a,b),[b,c),[c,]
原始数据:abc1,abc2,abc3.
加盐后的rowkey:a-abc1,b-abc2,c-abc3
abc1,a
abc2,b
-
- 哈希
哈希会使同一行永远用一个前缀加盐。哈希也可以使负载分散到整个集群,但是读却是可以预测 的。使用确定的哈希可以让客户端重构完整的rowkey,可以使用get操作准确获取某一个行数据。
原始数据: abc1,abc2,abc3
哈希:
md5(abc1)=92231b…, 9223-abc1
md5(abc2) =32a131122…, 32a1-abc2
md5(abc3) = 452b1…, 452b-abc3
-
- 反转
反转固定长度或者数字格式的rowkey。这样可以使得rowkey中经常改变的部分(最没有意义的部 分)放在前面。这样可以有效的随机rowkey,但是牺牲了rowkey的有序性。 15X,13X,
HBase的二级索引
HBase表按照rowkey查询性能是最高的。rowkey就相当于hbase表的一级索引!!
为了HBase的数据查询更高效、适应更多的场景,诸如使用非rowkey字段检索也能做到秒级响应,或者支持各个字段进行模糊查询和多字段组合查询等, 因此需要在HBase上面构建二级索引, 以满足现实中更复杂多样的业务需求。
hbase的二级索引其本质就是建立hbase表中列与行键之间的映射关系。
常见的二级索引我们一般可以借助各种其他的方式来实现,例如Phoenix或者solr或者ES等
布隆过滤器在hbase的应用
之前再讲hbase的数据存储原理的时候,我们知道hbase的读操作需要访问大量的文件,大部分的实现通过布隆过滤器来避免大量的读文件操作。
布隆过滤器的原理
通常判断某个元素是否存在用的可以选择hashmap。但是 HashMap 的实现也有缺点,例如存储容量占比高,考虑到负载因子的存在,通常空间是不能被用满的,而一旦你的值很多例如上亿的时候,那 HashMap 占据的内存大小就变得很可观了。
Bloom Filter是一种空间效率很高的随机数据结构,它利用位数组很简洁地表示一个集合,并能判断一个元素是否属于这个集合。
hbase 中布隆过滤器来过滤指定的rowkey是否在目标文件,避免扫描多个文件。使用布隆过滤器来判断。
布隆过滤器返回true,在结果不一定正确,、如果返回false则说明确实不存在。
Bloom Filter案例
布隆过滤器,已经不需要自己实现,Google已经提供了非常成熟的实现。
-
- 在上述的maven工程中的pom文件添加依赖
1 | <dependency> |
-
- 使用,预估数据量1w,错误率需要减小到万分之一。使用如下代码进行创建。
1 | public static void main(String[] args) { |