[TOC]
# 分析
## 协处理器-coprocessor
协处理器有两种:observer和endpoint
Observer允许集群在正常的客户端操作过程中可以有不同的行为表现
Endpoint允许扩展集群的能力,对客户端应用开放新的运算命令
![](https://box.kancloud.cn/6590117d3aaf0a1a4e6f8c603f734e8f_420x342.png)
![](https://box.kancloud.cn/b0f1058b7d334185cf8a8393f2962f12_488x420.png)
1. 客户端发出put请求
2. 该请求被分派给合适的RegionServer和region
3. coprocessorHost拦截该请求,然后在该表上登记的每个RegionObserver上调用prePut()
4. 如果没有被prePut()拦截,该请求继续送到region,然后进行处理
5. region产生的结果再次被CoprocessorHost拦截,调用postPut()
6. 假如没有postPut()拦截该响应,最终结果被返回给客户端
* Observer的类型
1. RegionObs——这种Observer钩在数据访问和操作阶段,所有标准的数据操作命令都可以被pre-hooks和post-hooks拦截
2. WALObserver——WAL所支持的Observer;可用的钩子是pre-WAL和post-WAL
3. MasterObserver——钩住DDL事件,如表创建或模式修改
## 二级索引
row key在HBase中是以B+ tree结构化有序存储的,所以scan起来会比较效率。单表以row key存储索引,column value存储id值或其他数据 ,这就是Hbase索引表的结构。
由于HBase本身没有二级索引(Secondary Index)机制,基于索引检索数据只能单纯地依靠RowKey,为了能支持多条件查询,开发者需要将所有可能作为查询条件的字段一一拼接到RowKey中,这是HBase开发中极为常见的做法
比如,现在有一张1亿的用户信息表,建有出生地和年龄两个索引,我想得到一个条件是在杭州出生,年龄为20岁的按用户id正序排列前10个的用户列表。
有一种方案是,系统先扫描出生地为杭州的索引,得到一个用户id结果集,这个集合的规模假设是10万。然后扫描年龄,规模是5万,最后merge这些用户id,去重,排序得到结果。
这明显有问题,如何改良?
保证出生地和年龄的结果是排过序的,可以减少merge的数据量?但Hbase是按row key排序,value是不能排序的。
变通一下——将用户id冗余到row key里?OK,这是一种解决方案了,这个方案的图示如下:
![](https://box.kancloud.cn/7be7fcb3cb8c91ecb166e85304413bfb_418x290.png)
merge时提取交集就是所需要的列表,顺序是靠索引增加了_id,以字典序保证的。
按索引查询种类建立组合索引。
在方案1的场景中,想象一下,如果单索引数量多达10个会怎么样?10个索引,就要merge 10次,性能可想而知。
![](https://box.kancloud.cn/7ee51ae9414c930e844dad2828b3cb0d_464x318.png)
解决这个问题需要参考RDBMS的组合索引实现。
比如出生地和年龄需要同时查询,此时如果建立一个出生地和年龄的组合索引,查询时效率会高出merge很多。
当然,这个索引也需要冗余用户id,目的是让结果自然有序。结构图示如下:
![](https://box.kancloud.cn/13c73d7c850e8227341b7a43046d01cf_283x371.png)
这个方案的优点是查询速度非常快,根据查询条件,只需要到一张表中检索即可得到结果list。缺点是如果有多个索引,就要建立多个与查询条件一一对应的组合索引
而索引表的维护如果交给应用客户端,则无疑增加了应用端开发的负担
通过协处理器可以将索引表维护的工作从应用端剥离
* 利用Observer自动维护索引表示例
在社交类应用中,经常需要快速检索各用户的关注列表t_guanzhu,同时,又需要反向检索各种户的粉丝列表t_fensi,为了实现这个需求,最佳实践是建立两张互为反向的表:
* 一个表为正向索引关注表 “t_guanzhu”:
Rowkey: A-B
f1:From
f1:To
* 另一个表为反向索引粉丝表:“t_fensi”:
Rowkey: B—A
f1:From
f1:To
插入一条关注信息时,为了减轻应用端维护反向索引表的负担,可用Observer协处理器实现:
![](https://box.kancloud.cn/3c181a9f91fcac7868039dcf826e49b7_1052x395.png)
# 代码
1. 编写代码
~~~
package com.study;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.*;
import org.apache.hadoop.hbase.coprocessor.BaseRegionObserver;
import org.apache.hadoop.hbase.coprocessor.ObserverContext;
import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
import java.io.IOException;
public class FFCoprocessor extends BaseRegionObserver {
@Override
public void prePut(ObserverContext<RegionCoprocessorEnvironment> e, Put put, WALEdit edit, Durability durability) throws IOException {
Configuration conf = HBaseConfiguration.create();
Connection conn = ConnectionFactory.createConnection(conf);
//二级索引表
Table table = conn.getTable(TableName.valueOf("index_user"));
//拦截到原始put对象中的rowkey和列的Value
byte[] row = put.getRow();
String rowkey = new String(row);
Cell addressCell = put.get("info".getBytes(), "name".getBytes()).get(0);
byte[] valueArray = addressCell.getValueArray();
String address = new String(valueArray,addressCell.getValueOffset(),addressCell.getValueLength());
String[] user_fensi = rowkey.split("-");
Put putIndex = new Put((user_fensi[1]+"-"+user_fensi[0]).getBytes());
putIndex.addColumn("info".getBytes(), "name".getBytes(),address.getBytes());
table.put(putIndex);
table.close();
}
}
~~~
2. 打成jar包“coprocess.jar”上传hdfs
~~~
hadoop fs -put coprocess.jar /
~~~
3. 创建表
~~~
create 'user_guanzhu','info'
create 'index_user','info'
~~~
4. 修改schema,注册协处理器
需要先disable一下
~~~
disable 'user_guanzhu'
~~~
注意下面的表名,jar包和类名修改为自己的
~~~
alter 'user_guanzhu',METHOD => 'table_att','coprocessor'=>'hdfs://master:9000/coprocess.jar|com.study.FFCoprocessor|1001|'
~~~
启用这个表
~~~
enable 'user_guanzhu'
~~~
5. 检查是否注册成功
~~~
describe 'user_guanzhu'
~~~
6. 向正向索引表中插入数据进行验证
~~~
put 'user_guanzhu','zhangsan-liuyifei','f1:adress','beijing'
~~~
# 删除协处理器
~~~
先disable表
disable 'user_guanzhu'
删除,$1表示是第一个协处理器
alter 'user_guanzhu',METHOD=>'table_att_unset',NAME=>'coprocessor$1'
再enable表
enable 'user_guanzhu'
~~~
- linux
- 常用命令
- 高级文本命令
- 面试题
- redis
- String
- list
- hash
- set
- sortedSet
- 案例-推荐
- java高级特性
- 多线程
- 实现线程的三种方式
- 同步关键词
- 读写锁
- 锁的相关概念
- 多线程的join
- 有三个线程T1 T2 T3,保证顺序执行
- java五种线程池
- 守护线程与普通线程
- ThreadLocal
- BlockingQueue消息队列
- JMS
- 反射
- volatile
- jvm
- IO
- nio
- netty
- netty简介
- 案例一发送字符串
- 案例二发送对象
- 轻量级RPC开发
- 简介
- spring(IOC/AOP)
- spring初始化顺序
- 通过ApplicationContextAware加载Spring上下文
- InitializingBean的作用
- 结论
- 自定义注解
- zk在框架中的应用
- hadoop
- 简介
- hadoop集群搭建
- hadoop单机安装
- HDFS简介
- hdfs基本操作
- hdfs环境搭建
- 常见问题汇总
- hdfs客户端操作
- mapreduce工作机制
- 案列-单词统计
- 局部聚合Combiner
- 案列-流量统计(分区,排序,比较)
- 案列-倒排索引
- 案例-共同好友
- 案列-join算法实现
- 案例-求topN(分组)
- 自定义inputFormat
- 自定义outputFormat
- 框架运算全流程
- mapreduce的优化方案
- HA机制
- Hive
- 安装
- DDL操作
- 创建表
- 修改表
- DML操作
- Load
- insert
- select
- join操作
- 严格模式
- 数据类型
- shell参数
- 函数
- 内置运算符
- 内置函数
- 自定义函数
- Transform实现
- 特殊分割符处理
- 案例
- 级联求和accumulate
- flume
- 简介
- 安装
- 常用的组件
- 拦截器
- 案例
- 采集目录到HDFS
- 采集文件到HDFS
- 多个agent串联
- 日志采集和汇总
- 自定义拦截器
- 高可用配置
- 使用注意
- sqoop
- 安装
- 数据导入
- 导入数据到HDFS
- 导入关系表到HIVE
- 导入表数据子集
- 增量导入
- 数据导出
- 作业
- 原理
- azkaban
- 简介
- 安装
- 案例
- 简介
- command类型单一job
- command类型多job工作流flow
- HDFS操作任务
- mapreduce任务
- hive脚本任务
- hbase
- 简介
- 安装
- 命令行
- 基本CURD
- 过滤器查询
- 系统架构
- 物理存储
- 寻址机制
- 读写过程
- Region管理
- master工作机制
- 建表高级属性
- 与mapreduce结合
- 协处理器
- 点击流平台开发
- 简介
- storm
- 简介
- 安装
- 集群启动及任务过程分析
- 单词统计
- 并行度
- ACK容错机制
- ACK简介