[TOC]
# HBASE操作及API
## 1 命令行演示
### 1.1 基本shell命令
> 进入hbase命令行
~~~
./hbase shell
~~~
> 显示hbase中的表
~~~
list
~~~
> 创建user表,包含info、data两个列族
~~~
create 'user', 'info1', 'data1'
create 'user', {NAME => 'info', VERSIONS => '3'}
~~~
> 向user表中插入信息,row key为rk0001,列族info中添加name列标示符,值为zhangsan
~~~
put 'user', 'rk0001', 'info:name', 'zhangsan'
~~~
> 向user表中插入信息,row key为rk0001,列族info中添加gender列标示符,值为female
~~~
put 'user', 'rk0001', 'info:gender', 'female'
~~~
> 向user表中插入信息,row key为rk0001,列族info中添加age列标示符,值为20
~~~
put 'user', 'rk0001', 'info:age', 20
~~~
> 向user表中插入信息,row key为rk0001,列族data中添加pic列标示符,值为picture
~~~
put 'user', 'rk0001', 'data:pic', 'picture'
~~~
> 获取user表中row key为rk0001的所有信息
~~~
get 'user', 'rk0001'
~~~
> 获取user表中row key为rk0001,info列族的所有信息
~~~
get 'user', 'rk0001', 'info'
~~~
> 获取user表中row key为rk0001,info列族的name、age列标示符的信息
~~~
get 'user', 'rk0001', 'info:name', 'info:age'
~~~
> 获取user表中row key为rk0001,info、data列族的信息
~~~
get 'user', 'rk0001', 'info', 'data'
get 'user', 'rk0001', {COLUMN => ['info', 'data']}
get 'user', 'rk0001', {COLUMN => ['info:name', 'data:pic']}
~~~
> 获取user表中row key为rk0001,列族为info,版本号最新5个的信息
~~~
get 'user', 'rk0001', {COLUMN => 'info', VERSIONS => 2}
get 'user', 'rk0001', {COLUMN => 'info:name', VERSIONS => 5}
get 'user', 'rk0001', {COLUMN => 'info:name', VERSIONS => 5, TIMERANGE => [1392368783980, 1392380169184]}
~~~
> 获取user表中row key为rk0001,cell的值为zhangsan的信息
~~~
get 'people', 'rk0001', {FILTER => "ValueFilter(=, 'binary:图片')"}
~~~
> 获取user表中row key为rk0001,列标示符中含有a的信息
~~~
get 'people', 'rk0001', {FILTER => "(QualifierFilter(=,'substring:a'))"}
put 'user', 'rk0002', 'info:name', 'fanbingbing'
put 'user', 'rk0002', 'info:gender', 'female'
put 'user', 'rk0002', 'info:nationality', '中国'
get 'user', 'rk0002', {FILTER => "ValueFilter(=, 'binary:中国')"}
~~~
> 查询user表中的所有信息
~~~
scan 'user'
~~~
> 查询user表中列族为info的信息
~~~
scan 'user', {COLUMNS => 'info'}
scan 'user', {COLUMNS => 'info', RAW => true, VERSIONS => 5}
scan 'persion', {COLUMNS => 'info', RAW => true, VERSIONS => 3}
~~~
> 查询user表中列族为info和data的信息
~~~
scan 'user', {COLUMNS => ['info', 'data']}
scan 'user', {COLUMNS => ['info:name', 'data:pic']}
~~~
> 查询user表中列族为info、列标示符为name的信息
~~~
scan 'user', {COLUMNS => 'info:name'}
~~~
> 查询user表中列族为info、列标示符为name的信息,并且版本最新的5个
~~~
scan 'user', {COLUMNS => 'info:name', VERSIONS => 5}
~~~
> 查询user表中列族为info和data且列标示符中含有a字符的信息
~~~
scan 'user', {COLUMNS => ['info', 'data'], FILTER => "(QualifierFilter(=,'substring:a'))"}
~~~
> 查询user表中列族为info,rk范围是[rk0001, rk0003)的数据
~~~
scan 'people', {COLUMNS => 'info', STARTROW => 'rk0001', ENDROW => 'rk0003'}
~~~
> 查询user表中row key以rk字符开头的
~~~
scan 'user',{FILTER=>"PrefixFilter('rk')"}
~~~
> 查询user表中指定范围的数据
~~~
scan 'user', {TIMERANGE => [1392368783980, 1392380169184]}
~~~
> 删除数据
> 删除user表row key为rk0001,列标示符为info:name的数据
~~~
delete 'people', 'rk0001', 'info:name'
~~~
> 删除user表row key为rk0001,列标示符为info:name,timestamp为1392383705316的数据
~~~
delete 'user', 'rk0001', 'info:name', 1392383705316
~~~
> 清空user表中的数据
~~~
truncate 'people'
~~~
> 修改表结构
> 首先停用user表(新版本不用)
~~~
disable 'user'
~~~
> 添加两个列族f1和f2
~~~
alter 'people', NAME => 'f1'
alter 'user', NAME => 'f2'
~~~
> 启用表
~~~
enable 'user'
~~~
> ###disable 'user'(新版本不用)
> 删除一个列族:
~~~
alter 'user', NAME => 'f1', METHOD => 'delete' 或 alter 'user', 'delete' => 'f1'
~~~
> 添加列族f1同时删除列族f2
~~~
alter 'user', {NAME => 'f1'}, {NAME => 'f2', METHOD => 'delete'}
~~~
> 将user表的f1列族版本号改为5
~~~
alter 'people', NAME => 'info', VERSIONS => 5
~~~
> 启用表
~~~
enable 'user'
~~~
> 删除表
~~~
disable 'user'
drop 'user'
get 'person', 'rk0001', {FILTER => "ValueFilter(=, 'binary:中国')"}
get 'person', 'rk0001', {FILTER => "(QualifierFilter(=,'substring:a'))"}
scan 'person', {COLUMNS => 'info:name'}
scan 'person', {COLUMNS => ['info', 'data'], FILTER => "(QualifierFilter(=,'substring:a'))"}
scan 'person', {COLUMNS => 'info', STARTROW => 'rk0001', ENDROW => 'rk0003'}
scan 'person', {COLUMNS => 'info', STARTROW => '20140201', ENDROW => '20140301'}
scan 'person', {COLUMNS => 'info:name', TIMERANGE => [1395978233636, 1395987769587]}
delete 'person', 'rk0001', 'info:name'
alter 'person', NAME => 'ffff'
alter 'person', NAME => 'info', VERSIONS => 10
get 'user', 'rk0002', {COLUMN => ['info:name', 'data:pic']}
~~~
## 2 hbase代码开发(基本,过滤器查询)
### 2.1 基本增删改查java实现
~~~
public class HbaseDemo {
private Configuration conf = null;
@Before
public void init(){
conf = HBaseConfiguration.create();
conf.set("hbase.zookeeper.quorum", "weekend05,weekend06,weekend07");
}
@Test
public void testDrop() throws Exception{
HBaseAdmin admin = new HBaseAdmin(conf);
admin.disableTable("account");
admin.deleteTable("account");
admin.close();
}
@Test
public void testPut() throws Exception{
HTable table = new HTable(conf, "person_info");
Put p = new Put(Bytes.toBytes("person_rk_bj_zhang_000002"));
p.add("base_info".getBytes(), "name".getBytes(), "zhangwuji".getBytes());
table.put(p);
table.close();
}
@Test
public void testDel() throws Exception{
HTable table = new HTable(conf, "user");
Delete del = new Delete(Bytes.toBytes("rk0001"));
del.deleteColumn(Bytes.toBytes("data"), Bytes.toBytes("pic"));
table.delete(del);
table.close();
}
@Test
public void testGet() throws Exception{
HTable table = new HTable(conf, "person_info");
Get get = new Get(Bytes.toBytes("person_rk_bj_zhang_000001"));
get.setMaxVersions(5);
Result result = table.get(get);
List<Cell> cells = result.listCells();
for(Cell c:cells){
}
//result.getValue(family, qualifier); 可以从result中直接取出一个特定的value
//遍历出result中所有的键值对
List<KeyValue> kvs = result.list();
//kv ---> f1:title:superise.... f1:author:zhangsan f1:content:asdfasldgkjsldg
for(KeyValue kv : kvs){
String family = new String(kv.getFamily());
System.out.println(family);
String qualifier = new String(kv.getQualifier());
System.out.println(qualifier);
System.out.println(new String(kv.getValue()));
}
table.close();
}
~~~
### 2.2 过滤器查询
> 引言:过滤器的类型很多,但是可以分为两大类——比较过滤器,专用过滤器
> 过滤器的作用是在服务端判断数据是否满足条件,然后只将满足条件的数据返回给客户端;
#### 1 hbase过滤器的比较运算符:
~~~
LESS <
LESS_OR_EQUAL <=
EQUAL =
NOT_EQUAL <>
GREATER_OR_EQUAL >=
GREATER >
NO_OP 排除所有
~~~
#### 2 Hbase过滤器的比较器(指定比较机制):
| BinaryComparator| 按字节索引顺序比较指定字节数组,采用Bytes.compareTo(byte[])|
| --- | --- |
| BinaryPrefixComparator | 跟前面相同,只是比较左端的数据是否相同|
| NullComparator | 判断给定的是否为空|
| BitComparator| 按位比较|
| RegexStringComparator | 提供一个正则的比较器,仅支持 EQUAL 和非EQUAL|
| SubstringComparator | 判断提供的子串是否出现在value中。|
#### 3 Hbase的过滤器分类
1) 比较过滤器
> 1.1 行键过滤器RowFilter
~~~
Filter filter1 = new RowFilter(CompareOp.LESS_OR_EQUAL, new BinaryComparator(Bytes.toBytes("row-22")));
scan.setFilter(filter1);
~~~
> 1.2 列族过滤器FamilyFilter
~~~
Filter filter1 = new FamilyFilter(CompareFilter.CompareOp.LESS, new BinaryComparator(Bytes.toBytes("colfam3")));
scan.setFilter(filter1);
~~~
> 1.3 列过滤器QualifierFilter
~~~
filter = new QualifierFilter(CompareFilter.CompareOp.LESS_OR_EQUAL, new BinaryComparator(Bytes.toBytes("col-2")));
scan.setFilter(filter1);
~~~
> 1.4 值过滤器 ValueFilter
~~~
Filter filter = new ValueFilter(CompareFilter.CompareOp.EQUAL, new SubstringComparator(".4") );
scan.setFilter(filter1);
~~~
2) 专用过滤器
> 2.1 单列值过滤器 SingleColumnValueFilter ----会返回满足条件的整行
~~~
SingleColumnValueFilter filter = new SingleColumnValueFilter(
Bytes.toBytes("colfam1"),
Bytes.toBytes("col-5"),
CompareFilter.CompareOp.NOT_EQUAL,
new SubstringComparator("val-5"));
filter.setFilterIfMissing(true); //如果不设置为true,则那些不包含指定column的行也会返回
scan.setFilter(filter1);
~~~
> 2.2 SingleColumnValueExcludeFilter
> 与上相反
> 2.3 前缀过滤器 PrefixFilter----针对行键
~~~
Filter filter = new PrefixFilter(Bytes.toBytes("row1"));
scan.setFilter(filter1);
~~~
> 2.4 列前缀过滤器 ColumnPrefixFilter
~~~
Filter filter = new ColumnPrefixFilter(Bytes.toBytes("qual2"));
scan.setFilter(filter1);
~~~
> 2.5分页过滤器 PageFilter
~~~
public static void main(String[] args) throws Exception {
Configuration conf = HBaseConfiguration.create();
conf.set("hbase.zookeeper.quorum", "spark01:2181,spark02:2181,spark03:2181");
String tableName = "testfilter";
String cfName = "f1";
final byte[] POSTFIX = new byte[] { 0x00 };
HTable table = new HTable(conf, tableName);
Filter filter = new PageFilter(3);
byte[] lastRow = null;
int totalRows = 0;
while (true) {
Scan scan = new Scan();
scan.setFilter(filter);
if(lastRow != null){
//注意这里添加了POSTFIX操作,用来重置扫描边界
byte[] startRow = Bytes.add(lastRow,POSTFIX);
scan.setStartRow(startRow);
}
ResultScanner scanner = table.getScanner(scan);
int localRows = 0;
Result result;
while((result = scanner.next()) != null){
System.out.println(localRows++ + ":" + result);
totalRows ++;
lastRow = result.getRow();
}
scanner.close();
if(localRows == 0) break;
}
System.out.println("total rows:" + totalRows);
}
/**
* 多种过滤条件的使用方法
* @throws Exception
*/
@Test
public void testScan() throws Exception{
HTable table = new HTable(conf, "person_info".getBytes());
Scan scan = new Scan(Bytes.toBytes("person_rk_bj_zhang_000001"), Bytes.toBytes("person_rk_bj_zhang_000002"));
//前缀过滤器----针对行键
Filter filter = new PrefixFilter(Bytes.toBytes("rk"));
//行过滤器 ---针对行键
ByteArrayComparable rowComparator = new BinaryComparator(Bytes.toBytes("person_rk_bj_zhang_000001"));
RowFilter rf = new RowFilter(CompareOp.LESS_OR_EQUAL, rowComparator);
/**
* 假设rowkey格式为:创建日期_发布日期_ID_TITLE
* 目标:查找 发布日期 为 2014-12-21 的数据
* sc.textFile("path").flatMap(line=>line.split("\t")).map(x=>(x,1)).reduceByKey(_+_).map((_(2),_(1))).sortByKey().map((_(2),_(1))).saveAsTextFile("")
*
*
*/
rf = new RowFilter(CompareOp.EQUAL , new SubstringComparator("_2014-12-21_"));
//单值过滤器1完整匹配字节数组
new SingleColumnValueFilter("base_info".getBytes(), "name".getBytes(), CompareOp.EQUAL, "zhangsan".getBytes());
//单值过滤器2 匹配正则表达式
ByteArrayComparable comparator = new RegexStringComparator("zhang.");
new SingleColumnValueFilter("info".getBytes(), "NAME".getBytes(), CompareOp.EQUAL, comparator);
//单值过滤器3匹配是否包含子串,大小写不敏感
comparator = new SubstringComparator("wu");
new SingleColumnValueFilter("info".getBytes(), "NAME".getBytes(), CompareOp.EQUAL, comparator);
//键值对元数据过滤-----family过滤----字节数组完整匹配
FamilyFilter ff = new FamilyFilter(
CompareOp.EQUAL ,
new BinaryComparator(Bytes.toBytes("base_info")) //表中不存在inf列族,过滤结果为空
);
//键值对元数据过滤-----family过滤----字节数组前缀匹配
ff = new FamilyFilter(
CompareOp.EQUAL ,
new BinaryPrefixComparator(Bytes.toBytes("inf")) //表中存在以inf打头的列族info,过滤结果为该列族所有行
);
//键值对元数据过滤-----qualifier过滤----字节数组完整匹配
filter = new QualifierFilter(
CompareOp.EQUAL ,
new BinaryComparator(Bytes.toBytes("na")) //表中不存在na列,过滤结果为空
);
filter = new QualifierFilter(
CompareOp.EQUAL ,
new BinaryPrefixComparator(Bytes.toBytes("na")) //表中存在以na打头的列name,过滤结果为所有行的该列数据
);
//基于列名(即Qualifier)前缀过滤数据的ColumnPrefixFilter
filter = new ColumnPrefixFilter("na".getBytes());
//基于列名(即Qualifier)多个前缀过滤数据的MultipleColumnPrefixFilter
byte[][] prefixes = new byte[][] {Bytes.toBytes("na"), Bytes.toBytes("me")};
filter = new MultipleColumnPrefixFilter(prefixes);
//为查询设置过滤条件
scan.setFilter(filter);
scan.addFamily(Bytes.toBytes("base_info"));
//一行
// Result result = table.get(get);
//多行的数据
ResultScanner scanner = table.getScanner(scan);
for(Result r : scanner){
/**
for(KeyValue kv : r.list()){
String family = new String(kv.getFamily());
System.out.println(family);
String qualifier = new String(kv.getQualifier());
System.out.println(qualifier);
System.out.println(new String(kv.getValue()));
}
*/
//直接从result中取到某个特定的value
byte[] value = r.getValue(Bytes.toBytes("base_info"), Bytes.toBytes("name"));
System.out.println(new String(value));
}
table.close();
}
~~~
## 3.Hbase高级应用
### 3.1建表高级属性
> 下面几个shell 命令在hbase操作中可以起到很到的作用,且主要体现在建表的过程中,看下面几个create 属性
1) BLOOMFILTER 默认是NONE 是否使用布隆过虑及使用何种方式布隆过滤可以每列族单独启用。
> 使用 HColumnDescriptor.setBloomFilterType(NONE | ROW | ROWCOL) 对列族单独启用布隆。
1. Default = ROW 对行进行布隆过滤。
2. 对 ROW,行键的哈希在每次插入行时将被添加到布隆。
3. 对 ROWCOL,行键 + 列族 + 列族修饰的哈希将在每次插入行时添加到布隆
* 使用方法: create 'table',{BLOOMFILTER =>'ROW'}
* 启用布隆过滤可以节省读磁盘过程,可以有助于降低读取延迟
2) VERSIONS 默认是1 这个参数的意思是数据保留1个 版本,如果我们认为我们的数据没有这么大的必要保留这么多,随时都在更新,而老版本的数据对我们毫无价值,那将此参数设为1 能节约2/3的空间
~~~
使用方法: create 'table',{VERSIONS=>'2'}
~~~
> 附:MIN_VERSIONS => '0'是说在compact操作执行之后,至少要保留的版本
3) COMPRESSION 默认值是NONE 即不使用压缩
> 这个参数意思是该列族是否采用压缩,采用什么压缩算法
~~~
使用方法: create 'table',{NAME=>'info',COMPRESSION=>'SNAPPY'}
~~~
* 建议采用SNAPPY压缩算法
* HBase中,在Snappy发布之前(Google 2011年对外发布Snappy),采用的LZO算法,目标是达到尽可能快的压缩和解压速度,同时减少对CPU的消耗;
* 在Snappy发布之后,建议采用Snappy算法(参考《HBase: The Definitive Guide》),具体可以根据实际情况对LZO和Snappy做过更详细的对比测试后再做选择。
| Algorithm | % remaining| Encoding| Decoding|
| --- | --- | --- | --- |
| GZIP| 13.4% | 21 MB/s | 118 MB/s|
| LZO | 20.5% | 135 MB/s | 410 MB/s|
| Zippy/Snappy | 22.2% | 172 MB/s| 409 MB/s|
> 如果建表之初没有压缩,后来想要加入压缩算法,可以通过alter修改schema
4) alter
> 使用方法:
> 如 修改压缩算法
~~~
disable 'table'
alter 'table',{NAME=>'info',COMPRESSION=>'snappy'}
enable 'table'
~~~
> 但是需要执行major_compact 'table' 命令之后 才会做实际的操作。
5) TTL
> 默认是 2147483647 即:Integer.MAX_VALUE 值大概是68年
> 这个参数是说明该列族数据的存活时间,单位是s
> 这个参数可以根据具体的需求对数据设定存活时间,超过存过时间的数据将在表中不在显示,待下次major compact的时候再彻底删除数据
> 注意的是TTL设定之后 MIN_VERSIONS=>'0' 这样设置之后,TTL时间戳过期后,将全部彻底删除该family下所有的数据,如果MIN_VERSIONS 不等于0那将保留最新的MIN_VERSIONS个版本的数据,其它的全部删除,比如MIN_VERSIONS=>'1' 届时将保留一个最新版本的数据,其它版本的数据将不再保存。
6) describe 'table' 这个命令查看了create table 的各项参数或者是默认值。
7) disable_all 'toplist.*' disable_all 支持正则表达式,并列出当前匹配的表的如下:
~~~
toplist_a_total_1001
toplist_a_total_1002
toplist_a_total_1008
toplist_a_total_1009
toplist_a_total_1019
toplist_a_total_1035
...
Disable the above 25 tables (y/n)? 并给出确认提示
~~~
8) drop_all 这个命令和disable_all的使用方式是一样的
9) hbase 表预分区----手动分区
> 默认情况下,在创建HBase表的时候会自动创建一个region分区,当导入数据的时候,所有的HBase客户端都向这一个region写数据,直到这个region足够大了才进行切分。一种可以加快批量写入速度的方法是通过预先创建一些空的regions,这样当数据写入HBase时,会按照region分区情况,在集群内做数据的负载均衡。
> 命令方式:
~~~
create 't1', 'f1', {NUMREGIONS => 15, SPLITALGO => 'HexStringSplit'}
~~~
> 也可以使用api的方式:
~~~
bin/hbase org.apache.hadoop.hbase.util.RegionSplitter test_table HexStringSplit -c 10 -f info
参数:
test_table是表名
HexStringSplit 是split 方式
-c 是分10个region
-f 是family
~~~
> 可在UI上查看结果,如图:
![](https://box.kancloud.cn/4d436820799190a23949e467b079d9d3_554x622.png)
> 这样就可以将表预先分为15个区,减少数据达到storefile 大小的时候自动分区的时间消耗,并且还有以一个优势,就是合理设计rowkey 能让各个region 的并发请求平均分配(趋于均匀) 使IO 效率达到最高,但是预分区需要将filesize 设置一个较大的值,设置哪个参数呢 hbase.hregion.max.filesize 这个值默认是10G 也就是说单个region 默认大小是10G
> 这个参数的默认值在0.90 到0.92到0.94.3各版本的变化:256M--1G--10G
>但是如果MapReduce Input类型为TableInputFormat 使用hbase作为输入的时候,就要注意了,每个region一个map,如果数据小于10G 那只会启用一个map 造成很大的资源浪费,这时候可以考虑适当调小该参数的值,或者采用预分配region的方式,并将检测如果达到这个值,再手动分配region。
### 3.2 hbase应用案例看行键设计
> 表结构设计
1) 列族数量的设定
> 以用户信息为例,可以将必须的基本信息存放在一个列族,而一些附加的额外信息可以放在另一列族;
4) 行键的设计
> 语音详单:
~~~
13877889988-20150625
13877889988-20150625
13877889988-20150626
13877889988-20150626
13877889989
13877889989
13877889989
~~~
> ----将需要批量查询的数据尽可能连续存放
> CMS系统----多条件查询
> 尽可能将查询条件关键词拼装到rowkey中,查询频率最高的条件尽量往前靠
> 20150230-zhangsan-category…
> 20150230-lisi-category…
> (每一个条件的值长度不同,可以通过做定长映射来提高效率)
> 参考:《hbase 实战》----详细讲述了facebook /GIS等系统的表结构设计
### 3.3 Hbase和mapreduce结合
> 为什么需要用mapreduce去访问hbase的数据?
> ——加快分析速度和扩展分析能力
> Mapreduce访问hbase数据作分析一定是在离线分析的场景下应用
![](https://box.kancloud.cn/444eb31438c9511d4268a531f71c2025_555x309.png)
#### 3.3.1 从Hbase中读取数据、分析,写入hdfs
~~~
/**
public abstract class TableMapper<KEYOUT, VALUEOUT>
extends Mapper<ImmutableBytesWritable, Result, KEYOUT, VALUEOUT> {
}
* @author duanhaitao@itcast.cn
*
*/
public class HbaseReader {
public static String flow_fields_import = "flow_fields_import";
static class HdfsSinkMapper extends TableMapper<Text, NullWritable>{
@Override
protected void map(ImmutableBytesWritable key, Result value, Context context) throws IOException, InterruptedException {
byte[] bytes = key.copyBytes();
String phone = new String(bytes);
byte[] urlbytes = value.getValue("f1".getBytes(), "url".getBytes());
String url = new String(urlbytes);
context.write(new Text(phone + "\t" + url), NullWritable.get());
}
}
static class HdfsSinkReducer extends Reducer<Text, NullWritable, Text, NullWritable>{
@Override
protected void reduce(Text key, Iterable<NullWritable> values, Context context) throws IOException, InterruptedException {
context.write(key, NullWritable.get());
}
}
public static void main(String[] args) throws Exception {
Configuration conf = HBaseConfiguration.create();
conf.set("hbase.zookeeper.quorum", "spark01");
Job job = Job.getInstance(conf);
job.setJarByClass(HbaseReader.class);
// job.setMapperClass(HdfsSinkMapper.class);
Scan scan = new Scan();
TableMapReduceUtil.initTableMapperJob(flow_fields_import, scan, HdfsSinkMapper.class, Text.class, NullWritable.class, job);
job.setReducerClass(HdfsSinkReducer.class);
FileOutputFormat.setOutputPath(job, new Path("c:/hbasetest/output"));
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(NullWritable.class);
job.waitForCompletion(true);
}
}
~~~
#### 3.3.2 从hdfs中读取数据写入Hbase
~~~
/**
public abstract class TableReducer<KEYIN, VALUEIN, KEYOUT>
extends Reducer<KEYIN, VALUEIN, KEYOUT, Writable> {
}
* @author duanhaitao@itcast.cn
*
*/
public class HbaseSinker {
public static String flow_fields_import = "flow_fields_import";
static class HbaseSinkMrMapper extends Mapper<LongWritable, Text, FlowBean, NullWritable>{
@Override
protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
String line = value.toString();
String[] fields = line.split("\t");
String phone = fields[0];
String url = fields[1];
FlowBean bean = new FlowBean(phone,url);
context.write(bean, NullWritable.get());
}
}
static class HbaseSinkMrReducer extends TableReducer<FlowBean, NullWritable, ImmutableBytesWritable>{
@Override
protected void reduce(FlowBean key, Iterable<NullWritable> values, Context context) throws IOException, InterruptedException {
Put put = new Put(key.getPhone().getBytes());
put.add("f1".getBytes(), "url".getBytes(), key.getUrl().getBytes());
context.write(new ImmutableBytesWritable(key.getPhone().getBytes()), put);
}
}
public static void main(String[] args) throws Exception {
Configuration conf = HBaseConfiguration.create();
conf.set("hbase.zookeeper.quorum", "spark01");
HBaseAdmin hBaseAdmin = new HBaseAdmin(conf);
boolean tableExists = hBaseAdmin.tableExists(flow_fields_import);
if(tableExists){
hBaseAdmin.disableTable(flow_fields_import);
hBaseAdmin.deleteTable(flow_fields_import);
}
HTableDescriptor desc = new HTableDescriptor(TableName.valueOf(flow_fields_import));
HColumnDescriptor hColumnDescriptor = new HColumnDescriptor ("f1".getBytes());
desc.addFamily(hColumnDescriptor);
hBaseAdmin.createTable(desc);
Job job = Job.getInstance(conf);
job.setJarByClass(HbaseSinker.class);
job.setMapperClass(HbaseSinkMrMapper.class);
TableMapReduceUtil.initTableReducerJob(flow_fields_import, HbaseSinkMrReducer.class, job);
FileInputFormat.setInputPaths(job, new Path("c:/hbasetest/data"));
job.setMapOutputKeyClass(FlowBean.class);
job.setMapOutputValueClass(NullWritable.class);
job.setOutputKeyClass(ImmutableBytesWritable.class);
job.setOutputValueClass(Mutation.class);
job.waitForCompletion(true);
}
}
~~~
### 3.3 hbase高级编程
#### 3.3.1 协处理器---- Coprocessor
> 协处理器有两种:observer和endpoint
* Observer允许集群在正常的客户端操作过程中可以有不同的行为表现
* Endpoint允许扩展集群的能力,对客户端应用开放新的运算命令
1) Observer协处理器
* 正常put请求的流程:
![](https://box.kancloud.cn/bc0a4513b34a5e24cc9ec438c2bc02a9_293x201.png)
* 加入Observer协处理后的put流程:
![](https://box.kancloud.cn/e111e371f3a0d373986650af5891240f_377x275.png)
1) 客户端发出put请求
2) 该请求被分派给合适的RegionServer和region
3) coprocessorHost拦截该请求,然后在该表上登记的每个RegionObserver上调用prePut()
4) 如果没有被prePut()拦截,该请求继续送到region,然后进行处理
5) region产生的结果再次被CoprocessorHost拦截,调用postPut()
6) 假如没有postPut()拦截该响应,最终结果被返回给客户端
2) Observer的类型
1) RegionObs——这种Observer钩在数据访问和操作阶段,所有标准的数据操作命令都可以被pre-hooks和post-hooks拦截
2) WALObserver——WAL所支持的Observer;可用的钩子是pre-WAL和post-WAL
3) MasterObserver——钩住DDL事件,如表创建或模式修改
3) Observer应用场景示例
> 见下节;
> Endpoint—参考《Hbase 权威指南》
#### 3.3.2 二级索引
> row key在HBase中是以B+ tree结构化有序存储的,所以scan起来会比较效率。单表以row key存储索引,column value存储id值或其他数据 ,这就是Hbase索引表的结构。
> 由于HBase本身没有二级索引(Secondary Index)机制,基于索引检索数据只能单纯地依靠RowKey,为了能支持多条件查询,开发者需要将所有可能作为查询条件的字段一一拼接到RowKey中,这是HBase开发中极为常见的做法
> 比如,现在有一张1亿的用户信息表,建有出生地和年龄两个索引,我想得到一个条件是在杭州出生,年龄为20岁的按用户id正序排列前10个的用户列表。
> 有一种方案是,系统先扫描出生地为杭州的索引,得到一个用户id结果集,这个集合的规模假设是10万。然后扫描年龄,规模是5万,最后merge这些用户id,去重,排序得到结果。
> 这明显有问题,如何改良?
> 保证出生地和年龄的结果是排过序的,可以减少merge的数据量?但Hbase是按row key排序,value是不能排序的。
> 变通一下——将用户id冗余到row key里?OK,这是一种解决方案了,这个方案的图示如下:
![](https://box.kancloud.cn/202f53957657e08d21beb5ff7915ea84_301x209.png)
> merge时提取交集就是所需要的列表,顺序是靠索引增加了_id,以字典序保证的。
2) 按索引查询种类建立组合索引。
> 在方案1的场景中,想象一下,如果单索引数量多达10个会怎么样?10个索引,就要merge 10次,性能可想而知。
![](https://box.kancloud.cn/03dd8f0da0451db5eb394b1ea74bf5a0_330x224.png)
> 解决这个问题需要参考RDBMS的组合索引实现。
> 比如出生地和年龄需要同时查询,此时如果建立一个出生地和年龄的组合索引,查询时效率会高出merge很多。
> 当然,这个索引也需要冗余用户id,目的是让结果自然有序。结构图示如下:
![](https://box.kancloud.cn/19544ce804313f37013bd9df92e57d17_201x265.png)
> 这个方案的优点是查询速度非常快,根据查询条件,只需要到一张表中检索即可得到结果list。缺点是如果有多个索引,就要建立多个与查询条件一一对应的组合索引
> 而索引表的维护如果交给应用客户端,则无疑增加了应用端开发的负担
> 通过协处理器可以将索引表维护的工作从应用端剥离
* 利用Observer自动维护索引表示例
> 在社交类应用中,经常需要快速检索各用户的关注列表t_guanzhu,同时,又需要反向检索各种户的粉丝列表t_fensi,为了实现这个需求,最佳实践是建立两张互为反向的表:
1) 一个表为正向索引关注表 “t_guanzhu”:
~~~
Rowkey: A-B
f1:From
f1:To
~~~
2) 另一个表为反向索引粉丝表:“t_fensi”:
~~~
Rowkey: B—A
f1:From
f1:To
~~~
> 插入一条关注信息时,为了减轻应用端维护反向索引表的负担,可用Observer协处理器实现:
![](https://box.kancloud.cn/b71a6204b0473f73b210a95b4951f1ea_556x269.png)
1) 编写自定义RegionServer
~~~
public class InverIndexCoprocessor extends BaseRegionObserver {
@Override
public void prePut(ObserverContext<RegionCoprocessorEnvironment> e, Put put, WALEdit edit, Durability durability) throws IOException {
// set configuration
Configuration conf = HBaseConfiguration.create();
// need conf.set...
HTable table = new HTable(conf, "t_fensi");
Cell fromCell = put.get("f1".getBytes(), "From".getBytes()).get(0);
Cell toCell = put.get("f1".getBytes(), "To".getBytes()).get(0);
byte[] valueArray = fromCell.getValue();
String from = new String(valueArray);
valueArray = toCell.getValue();
String to = new String(valueArray);
Put putIndex = new Put((to+"-"+from).getBytes());
putIndex.add("f1".getBytes(), "From".getBytes(),from.getBytes());
putIndex.add("f1".getBytes(), "To".getBytes(),to.getBytes());
table.put(putIndex);
table.close();
}
}
~~~
2) 打成jar包“fensiguanzhu.jar”上传hdfs
~~~
hadoop fs -put fensiguanzhu.jar /demo/
~~~
3) 修改t_fensi的schema,注册协处理器
~~~
hbase(main):017:0> alter ' t_fensi ',METHOD => 'table_att','coprocessor'=>'hdfs://spark01:9000/demo/ fensiguanzhu.jar|cn.itcast.bigdata.hbasecoprocessor. InverIndexCoprocessor|1001|'
Updating all regions with the new schema...
0/1 regions updated.
1/1 regions updated.
Done.
~~~
4) 检查是否注册成功
~~~
hbase(main):018:0> describe 'ff'
DESCRIPTION ENABLED
'ff', {TABLE_ATTRIBUTES => {coprocessor$1 => 'hdfs://spark01:9000/demo/fensiguanzhu.jar|cn.itcast.bi true
gdata.hbasecoprocessor.TestCoprocessor|1001|'}, {NAME => 'f1', DATA_BLOCK_ENCODING => 'NONE', BLOOMF
ILTER => 'ROW', REPLICATION_SCOPE => '0', VERSIONS => '1', COMPRESSION => 'NONE', MIN_VERSIONS => '0
', TTL => '2147483647', KEEP_DELETED_CELLS => 'false', BLOCKSIZE => '65536', IN_MEMORY => 'false', B
LOCKCACHE => 'true'}, {NAME => 'f2', DATA_BLOCK_ENCODING => 'NONE', BLOOMFILTER => 'ROW', REPLICATIO
N_SCOPE => '0', VERSIONS => '1', COMPRESSION => 'NONE', MIN_VERSIONS => '0', TTL => '2147483647', KE
EP_DELETED_CELLS => 'false', BLOCKSIZE => '65536', IN_MEMORY => 'false', BLOCKCACHE => 'true'}
1 row(s) in 0.0250 seconds
~~~
5) 向正向索引表中插入数据进行验证
- hadoop
- linux基础
- Linux入门
- Linux进阶
- shell
- Zookeeper
- Zookeeper简介及部署
- Zookeeper使用及API
- Redis
- Redis简介安装部署
- Redis使用及API
- Java高级增强
- Java多线程增强
- Maven简介及搭建
- Hive
- Hive简介及安装
- Hive操作
- HIve常用函数
- Hive数据类型
- Flume
- Flume简介及安装
- flume 拦截器(interceptor)
- azkaban
- azKaban简介及安装
- Sqoop
- Sqoop简介及安装
- HDFS
- HDFS原理
- HDFS操作API
- MAPREDUCE原理
- MAPREDUCE图片资源
- MAPREDUCE加强
- HBASE
- HBASE简介及安装
- HBASE操作及API
- HBASE内部原理
- Storm
- Storm简介及安装
- Storm原理
- kafka
- kafka简介及安装
- kafka常用操作及API
- kafka原理
- kafka配置详解
- Scala
- Scala简介及安装
- Scala基础语法
- Scala实战