hbase协处理器hbase observerr有几种

【HBase】1009-HBase的协处理器(coprocessor)统计行数 - CSDN博客
【HBase】1009-HBase的协处理器(coprocessor)统计行数
引入&的原因
HBase作为列族数据库无法建立“二级索引”,难以执行求和、计数、排序等操作。为解决这些问题,HBase0.92 之后引入协处理器(Coprocessor),实现一些新特性,能够轻易建立二次索引、复杂过滤器、以及访问控制。
协处理器两个插件
(1)观察者(observer)
提供三种观察者接口:
RegionObserver:提供客户端的数据操纵事件钩子:Get、Put、Delete、Scan等。
WALObserver:提供WAL相关操作钩子。
MasterObserver:提供DDL-类型的操作钩子。如创建、删除、修改数据表等。
(2)终端(endpoint)
EndPoint协处理器
(1)ObServer协处理器:允许集群在正常的客户端操作过程中可以有不同的行表现!
(2)EndPoint协处理器:允许你扩展集群能力,对客户端应用开放新的运行命令,在RegionServer上执行
HBase的协处理器(coprocessor)统计函数
(1)在使用HBase的协处理器(coprocessor)之前,需要启动协处理器,有两种方案。
方案1:启动全局aggregation,能过操纵所有的表上的数据。通过修改hbase-site.xml这个文件来实现
&property&
&name&hbase.coprocessor.user.region.classes&/name&
&value&org.apache.hadoop.hbase.coprocessor.AggregateImplementation&/value&
&/property&
方案2:启用表aggregation,只对特定的表生效。通过HBase Shell 来实现
create 'stu', {NAME =& 'info', VERSIONS =& 5}
1、disable指定表。
hbase& disable 'stu'
2、添加aggregation
hbase&alter 'stu', METHOD =& 'table_att','coprocessor'=&'|org.apache.hadoop.hbase.coprocessor.AggregateImplementation||'
3、重启指定表
hbase& enable 'stu'
(2)JAVA代码统计表中列族的行数
public class MyAggregationClient {
public static void main(String[] args) throws Throwable {
Configuration customConf = new Configuration();
customConf.set(&hbase.rootdir&, &hdfs://mycluster:8020/hbase&);
customConf.setStrings(&hbase.zookeeper.quorum&, &mycluster:2181&);
// 提高RPC通信时长
customConf.setLong(&hbase.rpc.timeout&, 600000);
// 设置Scan缓存
customConf.setLong(&hbase.client.scanner.caching&, 1000);
// 默认为9000毫秒
customConf.set(&zookeeper.session.timeout&, &180000&);
Configuration configuration = HBaseConfiguration.create(customConf);
AggregationClient aggregationClient = new AggregationClient(configuration);
Scan scan = new Scan();
// 指定扫描列族,唯一值
scan.addFamily(Bytes.toBytes(&info&));
long rowCount = aggregationClient.rowCount(TableName.valueOf(&stu&),new LongColumnInterpreter(), scan);
System.out.println(&row count is & + rowCount);
HBase的协处理器案例
协处理器其中的一个作用是使用Observer创建二级索引
本文已收录于以下专栏:
相关文章推荐
1.起因(Why HBase
Coprocessor)
HBase作为列族数据库最经常被人诟病的特性包括:无法轻易建立“二级索引”,难以执行求和、计数、排序等操作。比如,在旧版本的(统计数据表的总...
这几天为了满足业务需求,自己写了个Coprocessor,这里写一篇博客记录一下。
使用Coprocessor的目的是这样的,假如你的业务使你不得不进行全表查询,如果使用传统的Scan的话,那么全表查...
Hbase 协处理器 CoprocessorCoprocessor简介HBase 是一款基于 Hadoop 的 key-value 数据库,它提供了对 HDFS 上数据的高效随机读写服务,完美地填补了...
//using-hbase-coprocessor.html
HBase的Coprocessor是模仿谷歌BigTable的C...
在《HA Hadoop-2.7.3+Hbase-1.2.4搭建》的基础上,整理记录如何在eclipse上进行进行hbase java api的应用。
hadoop以及hbase都是搭建在ubuntu虚...
//using-hbase-coprocessor.html
HBase的Coprocessor是模仿谷歌BigTable的C...
1.关于filter的用法及说明参见这三篇博文,基本涵盖了绝大部分filter,很详细,还有实测代码,感谢博主的付出,特收藏.
HBase Filter:.cn...
前面说过,Hbase做聚合,分组之类的运算很不方便,更不要说join之类的,更是麻烦。有没有好点的办法呢?这里介绍下HBase--Coprocessor。Hbase有两种类型的Coprocessor,...
观察者的设计意图是允许用户通过插入代码来重载协处理器框架的upcall方法,而具体的事件触发的callback方法由HBase的核心代码来执行。协处理器框架处理所有的callback调用细节,协处理器...
public class TestAddColumnBaseRegionObserver extends BaseRegionObserver{
public void p...
他的最新文章
讲师:何宇健
讲师:董岩
您举报文章:
举报原因:
原文地址:
原因补充:
(最多只允许输入30个字)hbase高级编程:hbase(新版) 0.985协处理器中的BaseRegionServerObserver - about云开发 - Powered by Discuz!
about云开发
标题: hbase高级编程:hbase(新版) 0.985协处理器中的BaseRegionServerObserver
作者: pig2& & 时间:
标题: hbase高级编程:hbase(新版) 0.985协处理器中的BaseRegionServerObserver 本帖最后由 pig2 于
07:07 编辑
1.协处理分为几种?
2.二者各有什么不同?
3.hbase coprocessor新版中做了哪些改变?
现在hbase的coprocessor有两种完全不同的实现,分别是observer模式与endpoint模式,它们分别对应两个issue。我们可以将observer模式看成数据库中的触发器,而endpoint可以看成是存储过程。
关于coprocessor我们可以从类继承关系上看到,如下图所示:
-10-31-41.jpg (55.27 KB, 下载次数: 13)
00:53 上传
共有旧版本中(0.92)三个Observer对象,即MasterObserver,RegionObserver和WALObserver,新版增加RegionServerObserver。它们的工作原理类似于钩子函数,在真实的函数实现前加入pre(),实现后加入post()方法,来实现对操作进行一些嵌入式的改变。
在新版本中,包括hbase0.96及hbase 0.985(其它版本尚未查看)增加了对象BaseRegionServerObserver
这个类功能:合并两个HRegion
hbase coprocessor的实现分为observer与endpoint,其中observer类似于触发器,主要在服务端工作,而endpoint类似于存储过程,主要在client端工作
observer可以实现权限管理、优先级设置、监控、ddl控制、二级索引等功能,而endpoint可以实现min、mas、avg、sum等功能
coprocessor可以动态加载
附上代码BaseRegionServerObserver代码:
/*
* Licensed under the Apache License, Version 2.0 (the &License&);
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*& &&&http://www.apache.org/licenses/LICENSE-2.0
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an &AS IS& BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
package org.apache.hadoop.hbase.
import java.io.IOE
import java.util.L
import org.apache.hadoop.classification.InterfaceA
import org.apache.hadoop.classification.InterfaceS
import org.apache.hadoop.hbase.CoprocessorE
import org.apache.hadoop.hbase.HBaseInterfaceA
import org.apache.hadoop.hbase.client.M
import org.apache.hadoop.hbase.regionserver.HR
* An abstract class that implements RegionServerObserver.
* By extending it, you can create your own region server observer without
* overriding all abstract methods of RegionServerObserver.
*/
@InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.COPROC)
@InterfaceStability.Evolving
public class BaseRegionServerObserver implements RegionServerObserver {
&&@Override
&&public void preStopRegionServer(ObserverContext&RegionServerCoprocessorEnvironment& env)
& && &throws IOException { }
&&@Override
&&public void start(CoprocessorEnvironment env) throws IOException { }
&&@Override
&&public void stop(CoprocessorEnvironment env) throws IOException { }
&&@Override
&&public void preMerge(ObserverContext&RegionServerCoprocessorEnvironment& ctx, HRegion regionA,
& && &HRegion regionB) throws IOException { }
&&@Override
&&public void postMerge(ObserverContext&RegionServerCoprocessorEnvironment& c, HRegion regionA,
& && &HRegion regionB, HRegion mergedRegion) throws IOException { }
&&@Override
&&public void preMergeCommit(ObserverContext&RegionServerCoprocessorEnvironment& ctx,
& && &HRegion regionA, HRegion regionB, List&Mutation& metaEntries) throws IOException { }
&&@Override
&&public void postMergeCommit(ObserverContext&RegionServerCoprocessorEnvironment& ctx,
& && &HRegion regionA, HRegion regionB, HRegion mergedRegion) throws IOException { }
&&@Override
&&public void preRollBackMerge(ObserverContext&RegionServerCoprocessorEnvironment& ctx,
& && &HRegion regionA, HRegion regionB) throws IOException { }
&&@Override
&&public void postRollBackMerge(ObserverContext&RegionServerCoprocessorEnvironment& ctx,
& && &HRegion regionA, HRegion regionB) throws IOException { }
}
复制代码
作者: mr.andy& & 时间:
楼主, 之前遇到一个面试题, 关于hbase, 怎么做多表关联查询?除了用hive同步hbase做映射之外, 还有其他办法吗?&&
作者: xiqiang_chen& & 时间:
赞一个~感谢分享~
作者: pig2& & 时间:
楼主, 之前遇到一个面试题, 关于hbase, 怎么做多表关联查询?除了用hive同步hbase做映射之外, 还有其他办法 ...
hbase多表关联查询很难做,目前没有完美的方案,可以看看二级索引。
作者: mr.andy& & 时间:
hbase多表关联查询很难做,目前没有完美的方案,可以看看二级索引。
好的 谢谢.
作者: ainubis& & 时间:
O(∩_∩)O谢谢分享
作者: 超超大猴子& & 时间:
版主,头像是谁啊?
作者: a_zhen& & 时间:
正在研究,真好啊
作者: yangyufans& & 时间:
传参之后如何接收
欢迎光临 about云开发 (/)
Powered by Discuz! X3.2Hbase 协处理器 Coprocessor - CSDN博客
Hbase 协处理器 Coprocessor
Hbase 协处理器 Coprocessor
Coprocessor简介
HBase 是一款基于 Hadoop 的 key-value 数据库,它提供了对 HDFS 上数据的高效随机读写服务,完美地填补了 Hadoop MapReduce 仅适于批处理的缺陷,利用协处理器,用户可以编写运行在 HBase Server 端的代码。HBase 支持两种类型的协处理器,Endpoint 和 Observer。
Endpoint 协处理器类似传统数据库中的存储过程,客户端可以调用这些 Endpoint 协处理器执行一段 Server 端代码,并将 Server 端代码的结果返回给客户端进一步处理,最常见的用法就是进行聚集操作。如果没有协处理器,当用户需要找出一张表中的最大数据,即 max 聚合操作,就必须进行全表扫描,在客户端代码内遍历扫描结果,并执行求最大值的操作。这样的方法无法利用底层集群的并发能力,而将所有计算都集中到 Client 端统一执行,势必效率低下。利用 Coprocessor,用户可以将求最大值的代码部署到 HBase Server 端,HBase 将利用底层 cluster 的多个节点并发执行求最大值的操作。即在每个 Region 范围内执行求最大值的代码,将每个 Region 的最大值在 Region Server 端计算出,仅仅将该 max 值返回给客户端。在客户端进一步将多个 Region 的最大值进一步处理而找到其中的最大值。这样整体的执行效率就会提高很多。
另外一种协处理器叫做 Observer Coprocessor,这种协处理器类似于传统数据库中的触发器,当发生某些事件的时候这类协处理器会被 Server 端调用。Observer Coprocessor 就是一些散布在 HBase Server 端代码中的 hook 钩子,在固定的事件发生时被调用。比如:put 操作之前有钩子函数 prePut,该函数在 put 操作执行前会被 Region Server 调用;在 put 操作之后则有 postPut 钩子函数。
开发环境准备
Linux发行版系统,我的是centos6.5,要以下这三个主要的工具:
JDK 1.6 以上版本
Hbase 0.98
Google Protobuf 2.5.0
jdk的安装,Hadoop、zookeeper及HBase的安装此处不介绍。
安装 Google Protobuf
老版本的 HBase(即 HBase 0.96 之前) 采用 Hadoop RPC 进行进程间通信。在 HBase 0.96 版本中,引入了新的进程间通信机制 protobuf RPC,基于 Google 公司的 protocol buffer 开源软件。
下载 Protobuf 2.5.0 版本的源代码安装包
下载:wget href="/files/protobuf-2.5.0.tar.bz2
解压: tar&& -jxvf protobuf-2.5.0.tar.bz2
确保您已经安装了 gcc 和 gcc-c++ 包。
mkdir $HOME/tools/protobuf-2.5.0
./configure --prefix=$HOME/tools/protobuf-2.5.0
make install
编辑环境变量
加入 protoc 的路径
export PROTO_HOME=$HOME/tools/protobuf-2.5.0
export PATH=$PROTO_HOME:$PATH
安装 Maven
tar -xzvf apache-maven-3.3.9-bin.tar.gz
mv apache-maven-3.3.9 $HOME/tools
vi .bashrc
export MAVEN_HOME=$HOME/tools/apache-maven-3.3.9
export PATH=$MAVEN_HOME/bin:$PATH
开发 HBase 协处理器的流程
对于 Endpoint 类型的协处理器,其开发流程如下:
1、建立一个 Java 工程;
2、定义用户 ClientHBase 通信的 RFC,采用 Protobuf 语言和工具完成定义;
3、编写 HBase 协处理器的 Client 端和 Server 端代码;
其中,Client 端代码负责调用协处理器并处理返回结果,Server 端代码将运行在 Region Server 上,实现具体的任务;
4、对编译好的代码进行部署和测试。
对于 Observer 类型的协处理器,不需要定义 RPC,也不需要开发客户端代码。当相应的事件发生时,Observer 代码将自动在 Server 端执行。因此仅仅需要编写 Server 端的代码。
利用 Coprocessor 来实现获得单个 Region 的行数。
用 HBase Shell 的 count 命令来获取某张表的数据量。不过这是一个全表扫描过程,非常浪费资源,也很慢。
利用 Observer 协处理器在每一次 put 操作时,将统计该 Region 的行数,并保存在一个计数器中;在每一次 delete 操作时,将该计数器减 1。利用 Endpoint 协处理器,将该计数器的数值返回给 Client 端调用;为了在 Observer 和 Endpoint 协处理间共享行数计数器,我们将该计数器保存在 ZooKeeper 中。在客户端,调用 Endpoint 协处理器获取指定 Region 的行数计数器,并将所有的返回值求和即可。
创建 maven 工程
$ mkdir $PROJECT_HOME
用于存放该项目代码。
maven创建项目:
$ cd $PROJECT_HOME
$ mvn archetype:generate -DgroupId=org.ibm.developerworks -DartifactId=regionCount -DarchetypeArtifactId=maven-archetype-quickstart -DinteractiveMode=false
Endpoint 协处理器
Endpiont 协处理器的工作是返回 Region 的行数计数器。
用 Protobuf 编写和定义 RPC
Endpoint 协处理器读取 Region 的行数计数器,然后将该值返回给调用的客户端。因此 RPC 需要一个整数类型的返回值代表行数。仅仅返回行数的情况下,客户端并不需要为 RPC 定义任何输入参数,不过为了演示输入和输出,我们额外为这个 RPC 设计了一个输入参数:reCount。这个参数是一个布尔变量,当为 true 时,表明用户需要 Endpoint 扫描遍历 Region 来计算行数;当其为 false,表示直接使用 Observer 协处理器维护的计数器。前者需要扫描整个 Region,非常慢;后者效率很高。
getRowCount RPC proto 定义
option java_package = "org.ibm.developerworks";
option java_outer_classname = "getRowCount";
option java_generic_services =
option optimize_for = SPEED;
message getRowCountRequest{
required bool reCount = 1;
message getRowCountResponse {
optional int64 rowCount = 1;
service ibmDeveloperWorksService {
rpc getRowCount(getRowCountRequest)
returns(getRowCountResponse);
将以上代码保存为文件 ibmDeveloperworksDemo.proto。可以看到,这里定义了一个 RPC,名字叫做 getRowCount。该 RPC 有一个入口参数,用消息 getRowCountRequest 表示;RPC 的返回值用消息 getRowCountResponse 表示。Service 是一个抽象概念,RPC 的 Server 端可以看作一个 Service,提供某种服务。在 HBase 协处理器中,Service 就是 Server 端需要提供的 Endpoint 协处理器服务,可以为 HBase 的客户端提供服务。在一个 Service 中可以提供多个 RPC,在本文中,我们仅仅定义了一个 RPC,实际工作中往往需要定义多个。
将该文件存放在工程的 src/main/protobuf 目录下。
$ mkdir $PROJECT_HOME/rowCount/src/main/protobuf
$ mv ibmDeveloperworksDemo.proto $PROJECT_HOME/rowCount/src/main/protobuf
用 Protobuf 编译器将该 proto 定义文件编译为 Java 代码,并放到 Maven 工程下。
$ cd $PROJECT_HOME/rowCount/src/main/protobuf
$ protoc --java_out=$PROJECT_HOME/rowCount/src/main/java ibmDeveloperworksDemo.proto
现在可以看到在工程的 src/main/java/org/ibm/developerworks 目录下生成了一个名为 getRowCount.java 的文件。这个 Java 文件就是 RPC 的 Java 代码,在后续的 Server 端代码和 Client 端代码中都要用到这个 Java 文件。
为了编译新生成的 Protobuf Java 代码,我们还需要修改 Maven 的 pom.xml 文件,加入对 protobuf-2.5.0 的依赖,这样 Maven 就可以自动下载相应的 jar 包,完成编译。
在 pom.xml 文件中加入如下的内容即可:
Protobuf 在 pom.xml 中的依赖
&dependency&
&groupId&com.google.protobuf&/groupId&
&artifactId&protobuf-java&/artifactId&
&version&2.5.0&/version&
&/dependency&
现在可以尝试进行第一编译了:
mvn clean compile
Server 端代码
Coprocessor 接口定义了两个接口函数,start 和 stop。
协处理器在 Region 打开的时候被 RegionServer 自动加载,并会调用器 start 接口,完成初始化工作。一般的该接口函数中仅仅需要将协处理器的运行上下文环境变量 CoprocessorEnviorment保存到本地即可。
start 接口
//这两个类成员是后续代码用来操作 ZooKeeper 的,在 start() 中进行初始化
private String zNodePath = "/hbase/ibmdeveloperworks/demo";
private ZooKeeperWatcher zkw =
public void start(CoprocessorEnvironment env) throws IOException {
if (env instanceof RegionCoprocessorEnvironment) {
this.re = (RegionCoprocessorEnvironment)
RegionServerServices rss = re.getRegionServerServices();
//获取 ZooKeeper 对象,这个 ZooKeeper 就是本 HBase 实例所连接的 ZooKeeper
zkw = rss.getZooKeeper();
//用 region name 作为 znode 的节点名后缀
zNodePath=zNodePath+re.getRegion().getRegionNameAsString();
throw new CoprocessorException("Must be loaded on a table region!");
public void stop(CoprocessorEnvironment env) throws IOException {
// nothing to do
我们的协处理器还需要实现 CoprocessorService 接口。该接口仅仅定义了一个接口函数 getService()。我们仅需要将本实例返回即可。HBase 的 RegionServer 在接受到客户端的调用请求时,将调用该接口获取实现了 RPC Service 的实例,因此本函数一般情况下就是返回自身实例即可。
getService 接口
Just returns a reference to this object, which implements the RowCounterService interface.
public Service getService() {
完成了以上三个接口函数之后,Endpoint 的框架代码就完成了。每个 Endpoint 协处理器都必须实现这些框架代码,而且写法雷同。
Endpoint 协处理器真正的业务代码都在每一个 RPC 函数的具体实现中。
在本文中,我们的 Endpoint 协处理器仅提供一个 RPC 函数,即 getRowCount。我将分别介绍编写该函数的几个主要工作:了解函数的定义,参数列表;处理入口参数;实现业务逻辑;设置返回参数。
函数 getRowCount 在 Server 端的函数定义如下。
public void getRowCount(RpcController controller, getRowCount.getRowCountRequest request,
RpcCallback&getRowCount.getRowCountResponse& done)
每一个 RPC 函数的参数列表都是固定的,有三个参数。第一个参数 RpcController 是固定的,所有 RPC 的第一个参数都是它,这是 HBase 的 Protobuf RPC 协议定义的;第二个参数为 RPC 的入口参数;第三个参数为返回参数。入口和返回参数分别由代码清单 1 的 proto 文件中的 getRowCountRequest 和 getRowCountResponse 定义。
分析入口参数
request 包含了入口参数,从 proto 定义中可以知道,这个入口参数只有一个 field,布尔类型的 reCount。我们将该参数从 Protobuf 消息中反序列化:
boolean reCount=request.getReCount();
如果您编写的 RPC 包含多个 field,每一个 field 都可以通过 request.getXXX() 函数来获得,其中 XXX 表示 field 的名字。
实现函数逻辑
我们的 RPC 的主要业务逻辑为获得 Region 的行数,当 reCount 为 true 时,需要遍历 Region 然后对结果集进行计数来获得行数;当 reCount 为 false 时,直接读取表示行数的变量。
从 ZooKeeper 得到计数器值
byte[] data=ZKUtil.getData(zkw,zNodePath);
rowcount = Bytes.toLong(data);
catch (Exception e) {("Exception during getData"); }
设置返回参数
得到了行数之后,用它来设置返回参数,即 getRowCount.CountResponse 消息的 rowCount 域。协处理器将该值返回给客户端。
getService 接口
getRowCount.getRowCountResponse response =
response = getRowCount.getRowCountResponse.newBuilder().setCount(rowcount).build();
//将 rowcount 设置为 CountResponse 消息的 rowCount
done.run(response); //Protobuf 的返回
至此,EndPoint 协处理器完成。
修改 pom.xml 加入 HBase 的 dependency
在以上代码中,包含了大量的 HBase 代码,因此在编译之前,需要修改 pom.xml 加入 HBase 的依赖。
清单 9. HBase 的 Maven 依赖
&dependency&
&groupId&org.apache.hbase&/groupId&
&artifactId&hbase-common&/artifactId&
&version&0.98.20-hadoop2&/version&
&/dependency&
&dependency&
&groupId&org.apache.hbase&/groupId&
&artifactId&hbase-protocol&/artifactId&
&version&0.98.20-hadoop2&/version&
&/dependency&
&dependency&
&groupId&org.apache.hbase&/groupId&
&artifactId&hbase-client&/artifactId&
&version&0.98.20-hadoop2&/version&
&/dependency&
&dependency&
&groupId&org.apache.hbase&/groupId&
&artifactId&hbase-server&/artifactId&
&version&0.98.20-hadoop2&/version&
&/dependency&
Observer 协处理器
Observer 协处理器利用 postPut,postDelete 等几个钩子函数来维护 Region 的行数计数器。当 put 操作被调用时,我们认为一条新的数据被插入 Region,因此计数器应该加 1。同样当 Delete 被调用时,将计数器减 1。
Observer 协处理器的框架代码
和 Endpoint 协处理器相同,首先需要编写通用的框架代码。类 rowCountObserver 是我们准备开发的协处理器,首先它必须继承 BaseRegionObserver 类。然后我们需要重载 start() 和 stop() 两个方法。最后重载 prePut,preDelete 等 hook 方法。
Observer 协处理器框架代码
public class rowCountObserver extends BaseRegionObserver {
RegionCoprocessorE
public void start(CoprocessorEnvironment e) throws IOException {
env = (RegionCoprocessorEnvironment)
RegionServerServices rss = re.getRegionServerServices();
m_region = re.getRegion();
zNodePath = zNodePath+m_region.getRegionNameAsString();
zkw = rss.getZooKeeper();
myrowcount = 0 ; //
if(ZKUtil.checkExists(zkw,zNodePath) == -1) {
LOG.error("LIULIUMI: cannot find the znode");
ZKUtil.createWithParents(zkw,zNodePath);
("znode path is : " + zNodePath);
} catch (Exception ee) {LOG.error("LIULIUMI: create znode fail"); }
public void stop(CoprocessorEnvironment e) throws IOException {
// nothing to do here
start 执行初始化操作,包括保存 CoprocessorEnviorment 对象,获取 ZooKeeper Watcher,创建 znode 等工作。主要就是创建了和 Endpoint 协处理直接共享数据用的 znode。
rowcount 的初始化
本文中,我们将代表行数的数字存放在 znode 中。znode 保存在 ZooKeeper 中,由 ZooKeeper 保证持久性。但是在第一次初始化的时候,我们还是需要统计当前的行数以便对 znode 进行正确的初始化。比如 Region 目前已经保存了 100 行数据,那么我们应该将 znode 的值设置为 100。
为此,我们需要对 Region 进行 scan 操作。然而 start() 方法调用的时候,region 还未完全初始化完成,因此无法调用 scan 操作,因此我们将利用 Observer 的 postOpen() 钩子函数来对 znode 进行初始化。postOpen 在 Region 被打开成功之后调用,因此所有的 Region 操作都可以执行。
在 postOpen 函数内,我们将利用 Scan 对象对 Region 进行遍历,求得行数,并用该值对 znode 进行初始化,代码如下。
postOpen 代码
public void postOpen(ObserverContext&RegionCoprocessorEnvironment& e)
long count = 0;
//Scan 获取当前 region 保存的行数
Scan scan = new Scan();
InternalScanner scanner =
scanner = m_region.getScanner(scan);
List&Cell& results = new ArrayList&Cell&();
boolean hasMore =
hasMore = scanner.next(results);
if(results.size()&0)
} while (hasMore);
//用当前的行数设置 ZooKeeper 中的计数器初始值
ZKUtil.setData(zkw,zNodePath,Bytes.toBytes(count));
//设置 myrowcount 类成员,用来表示当前 Region 的 rowcount
myrowcount =
catch (Exception ee) {("setData exception");}
维护 rowcount 的 hook 方法
在 Observer 协处理器中,我们需要实现以下几个 hook 方法来维护行数:
preDelete:在数据被删除前调用
prePut:在数据被插入前调用
在 preDelete 中,将计数器减 1;在 prePut 中,将计数器加 1。代码如下:
getService 接口
public void preDelete(ObserverContext&RegionCoprocessorEnvironment& e,
Delete delete,
WALEdit edit,
Durability durability)
throws IOException {
//计数器减 1
myrowcount--;
//更新 znode
ZKUtil.setData(zkw,zNodePath,Bytes.toBytes(myrowcount));
catch (Exception ee) {("setData exception");}
public void prePut(ObserverContext&RegionCoprocessorEnvironment& e,
WALEdit edit,
Durability durability)
throws IOException {
//计数器加 1
myrowcount++;
//更新 znode
ZKUtil.setData(zkw,zNodePath,Bytes.toBytes(myrowcount));
catch (Exception ee) {("setData exception");}
在 preDelete 和 prePut 中需要先调用 get 判断给定数据是否存在,再进行计数器的增加或者减一操作。
实现 Client 端代码
HBase 提供了客户端 Java 包 org.apache.hadoop.hbase.client.coprocessor。它提供以下三种方法来调用协处理器提供的服务:
Table.coprocessorService(byte[])
Table.coprocessorService(Class, byte[], byte[],Batch.Call),
Table.coprocessorService(Class, byte[], byte[], Batch.Call, Batch.Callback)
Endpoing 协处理器在 Region 上下文中运行,一个 HBase 表可能有多个 Region。因此客户端可以指定调用某一个单个 Region 上的协处理器,在单个 Region 上进行处理并返回一定结果;也可以调用一定范围内的若干 Region 上的协处理器并发执行,并对结果进行汇总处理。
调用单个 Region 上的协处理器 RPC
第一个方法使用 API coprocessorService(byte[]),这个函数只调用单个 Region 上的协处理器。
该方法采用 RowKey 指定 Region。这是因为 HBase 的客户端很少会直接操作 Region,一般不需要知道 Region 的名字;况且在 HBase 中,Region 名会随时改变,所以用 rowkey 来指定 Region 是最合理的方式。使用 rowkey 可以指定唯一的一个 Region,如果给定的 rowkey 并不存在,只要在某个 Region 的 rowkey 范围内,依然可以用来指定该 Region。比如 Region 1 处理 [row1, row100] 这个区间内的数据,则 rowkey=row1 就由 Region 1 来负责处理,换句话说,我们可以用 row1 来指定 Region 1,无论 rowkey 等于”row1”的记录是否存在。
coprocessorService 方法返回类型为 CoprocessorRpcChannel 的对象,该 RPC 通道连接到由 rowkey 指定的 Region 上,通过这个通道,就可以调用该 Region 上部署的协处理器 RPC。通过 Protobuf 定义了 RPC Service。调用 Service 的 newBlockingStub() 方法,将 CoprocessorRpcChannel 作为输入参数,就可以得到 RPC 调用的 stub 对象,进而调用远端的 RPC。
获取单个 Region 的 rowcount
long singleRegionCount(String tableName, String rowkey,boolean reCount)
long rowcount = 0;
Configuration config = new Configuration();
HConnection conn = HConnectionManager.createConnection(config);
HTableInterface tbl = conn.getTable(tableName);
//获取 Channel
CoprocessorRpcChannel channel = tbl.coprocessorService(rowkey.getBytes());
org.ibm.developerworks.getRowCount.ibmDeveloperWorksService.BlockingInterface service =
org.ibm.developerworks.getRowCount.ibmDeveloperWorksService.newBlockingStub(channel);
//设置 RPC 入口参数
org.ibm.developerworks.getRowCount.getRowCountRequest.Builder request =
org.ibm.developerworks.getRowCount.getRowCountRequest.newBuilder();
request.setReCount(reCount);
//调用 RPC
org.ibm.developerworks.getRowCount.getRowCountResponse ret =
service.getRowCount(null, request.build());
//解析结果
rowcount = ret.getRowCount();
catch(Exception e) {e.printStackTrace();}
调用多个 Region 上的协处理器 RPC,不使用 callback
Batch.Call 接口实现
Batch.Call&ibmDeveloperWorksService, getRowCountResponse& callable =
new Batch.Call&ibmDeveloperWorksService, getRowCountResponse&() {
ServerRpcController controller = new ServerRpcController();
BlockingRpcCallback&getRowCountResponse& rpcCallback =
new BlockingRpcCallback&getRowCountResponse&();
//下面重载 call 方法
public getRowCountResponse call(ibmDeveloperWorksService instance) throws IOException {
//初始化 RPC 的入口参数,设置 reCount 为 true
//Server 端会进行慢速的遍历 region 的方法进行统计
org.ibm.developerworks.getRowCount.getRowCountRequest.Builder builder =
getRowCountRequest.newBuilder();
builder.setreCount(true);
//RPC 调用
instance.getRowCount(controller, builder.build(), rpcCallback);
//直接返回结果,即该 Region 的 rowCount
return rpcCallback.get();
获取全表行数的代码
long getTableRowCountSlow(string tableName) {
//创建 Table 实例, HBase 1.0
Connection connection = ConnectionFactory.createConnection(conf);
Table table = connection.getTable(tableName);
//创建 HTable 实例,HBase 0.98
HConnection connection = HConnectionManager.createConnection(config);
HTable table = connection.getTable(tableName);
Batch.Call&ibmDeveloperWorksService, getRowCountResponse& callable =
... 省略代码,参考代码清单 2
results = table.coprocessorService(ibmDeveloperWorksService.class, null, null,
callable);
long totalRowCount = 0;
for( r : results)
totalRowCount += r.value();
return totalRowC
调用多个 Region 上的协处理器 RPC–使用 callback
Batch.Callback 接口实现
//定义总的 rowCount 变量
final AtomicLong totalRowCount = new AtomicLong();
//定义 callback
Batch.Callback& Long & callback =
new Batch.Callback&Long&() {
public void update(byte[] region, byte[] row, getRowCountResponse result) {
//直接将 Batch.Call 的结果,即单个 region 的 rowCount 累加到 totalRowCount
totalRowCount.getAndAdd(result.getRowCount());
getTableRowCountFast 实现
long getTableRowCountFast(string tableName) {
//创建 Table 实例, HBase 1.0
Connection connection = ConnectionFactory.createConnection(conf);
TableName TABLE = TableName.valueOf(tableName);
Table table = connection.getTable(TABLE);
//创建 HTable 实例,HBase 0.98
HConnection connection = HConnectionManager.createConnection(config);
HTable table = connection.getTable(tableName);
Batch.Call&ibmDeveloperWorksService, getRowCountResponse& callable =
new Batch.Call&ibmDeveloperWorksService, getRowCountResponse&() {
ServerRpcController controller = new ServerRpcController();
BlockingRpcCallback&getRowCountResponse& rpcCallback =
new BlockingRpcCallback&getRowCountResponse&();
//下面重载 call 方法
public getRowCountResponse call(ibmDeveloperWorksService instance)
throws IOException {
//初始化 RPC 的入口参数,设置 reCount 为 false
//Server 端会进行慢速的遍历 region 的方法进行统计
org.ibm.developerworks.getRowCount.getRowCountRequest.Builder builder =
getRowCountRequest.newBuilder();
builder.setreCount(false);
//RPC 调用
instance.getRowCount(controller, builder.build(), rpcCallback);
//直接返回结果,即该 Region 的 rowCount
return rpcCallback.get();
//定义总的 rowCount 变量
AtomicLong totalRowCount = new AtomicLong();
//定义 callback
Batch.Callback& Long & callback =
new Batch.Callback&Long&() {
public void update(byte[] region, byte[] row, Long result) {
//直接将 Batch.Call 的结果,即单个 region 的 rowCount 累加到 totalRowCount
totalRowCount.getAndAdd(result);
table.coprocessorService( ibmDeveloperWorksService.class, null, null,
callable, callback);
return totalRowC
批处理 coprocessorService
除了以上三种直接调用 coprocessorService 的方法之外,HBase 还提供另外两个更加高效的客户端调用方法,能够对 coprocessorService 进行批处理,进一步提高调用效率:
&R extends com.google.protobuf.Message&
void batchCoprocessorService(MethodDescriptor methodDescriptor,
com.google.protobuf.Message request,
byte[] startKey,
byte[] endKey,
R responsePrototype,
Batch.Callback&R& callback)
Map&byte[],R& batchCoprocessorService(MethodDescriptor methodDescriptor,
com.google.protobuf.Message request,
byte[] startKey,
byte[] endKey,
R responsePrototype)
batchCoprocessorService 的调用方法
long getTableRowCountBatch(String tableName) {
//连接 Hbase
Configuration config = new Configuration();
HConnection connection = HConnectionManager.createConnection(config);
HTableInterface table = connection.getTable(tableName);
//设置 request 参数
org.ibm.developerworks.getRowCount.getRowCountRequest.Builder builder =
getRowCountRequest.newBuilder();
builder.setReCount(false);
//开始和结束 rowkey
byte[] s= Bytes.toBytes("r1");
byte[] e= Bytes.toBytes("t1");
//调用 batchCoprocessorService
results = table.batchCoprocessorService(
ibmDeveloperWorksService.getDescriptor().findMethodByName("getRowCount"),
builder.build(),s, e,
getRowCountResponse.getDefaultInstance());
Collection&getRowCountResponse& resultsc = results.values();
for( getRowCountResponse r : resultsc)
totalRowCount += r.getRowCount();
return totalRowC
部署和运行
利用 Java 代码实现协处理器部署
boolean createTable(string tableName) {
//HBase 1.0 创建 Table
Configuration config = new Configuration();
Table table =
TableName TABLE = TableName.valueOf(tableName);
Admin admin= new Admin(config);
HTableDescriptor tableDesc = new HTableDescriptor(TABLE);
//HBase 0.98 创建 Table
Configuration config = new Configuration();
HBaseAdmin admin = new HBaseAdmin(config);
HTableDescriptor tableDesc = new HTableDescriptor(tableName);
//添加 coprocessor
tableDesc.addCoprocessor(“org.ibm.developerworks.coprocessor.getRowCountEndpoint”);
tableDesc.addCoprocessor(“org.ibm.developerworks.coprocessor.rowCountObserver”);
//省去其他的 HTableDescriptor 操作代码
admin.createTable(tableDesc);
代码下载地址:
本文已收录于以下专栏:
相关文章推荐
1.起因(Why HBase
Coprocessor)
HBase作为列族数据库最经常被人诟病的特性包括:无法轻易建立“二级索引”,难以执行求和、计数、排序等操作。比如,在旧版本的(统计数据表的总...
//using-hbase-coprocessor.html
HBase的Coprocessor是模仿谷歌BigTable的C...
原文:/blog/1993058
二级索引方案
协处理器的概念、作用和类型不介绍,可以参看:...
这几天为了满足业务需求,自己写了个Coprocessor,这里写一篇博客记录一下。
使用Coprocessor的目的是这样的,假如你的业务使你不得不进行全表查询,如果使用传统的Scan的话,那么全表查...
HBase的协处理器介绍
What we have built is a framework that provides a library and runtime environment for executing user...
HBase协处理器知识点
1、为什么引入协处理器?
在旧版(0.92HBase版本之前)的HBase中是没有引入协处理器的概念的。这样存在的问题是:创建二级索引较难,很难进行简单的排序、求和...
简要介绍HBase协处理器的用途及分类,并通过代码实例演示了HBase协处理器的基本使用方法。...
Introduction to Coprocessors
本文的内容就是一个简单的注释,写点自己的感受罢了,不喜欢翻译英文,内容全部来自/titles/...
/muzili-ykt/p/6056066.html
主要内容:
1. HBase协处理器介绍
2. 观察者(Obser...
他的最新文章
讲师:何宇健
讲师:董岩
您举报文章:
举报原因:
原文地址:
原因补充:
(最多只允许输入30个字)}

我要回帖

更多关于 hbase 协处理器 网易 的文章

更多推荐

版权声明:文章内容来源于网络,版权归原作者所有,如有侵权请点击这里与我们联系,我们将及时删除。

点击添加站长微信