[TOC]
# 过滤器查询
过滤器的类型很多,但是可以分为两大类——比较过滤器,专用过滤器
过滤器的作用是在服务端判断数据是否满足条件,然后只将满足条件的数据返回给客户端;
hbase过滤器的比较运算符:
~~~
LESS <
LESS_OR_EQUAL <=
EQUAL =
NOT_EQUAL <>
GREATER_OR_EQUAL >=
GREATER >
NO_OP 排除所有
~~~
Hbase过滤器的比较器(指定比较机制):
~~~
BinaryComparator 按字节索引顺序比较指定字节数组,采用Bytes.compareTo(byte[])
BinaryPrefixComparator 跟前面相同,只是比较左端的数据是否相同
NullComparator 判断给定的是否为空
BitComparator 按位比较
RegexStringComparator 提供一个正则的比较器,仅支持 EQUAL 和非EQUAL
SubstringComparator 判断提供的子串是否出现在value中
~~~
# Hbase的过滤器分类
## 比较过滤器
**行键过滤器RowFilter**
~~~
Filter filter1 = new RowFilter(CompareOp.LESS_OR_EQUAL, new BinaryComparator(Bytes.toBytes("row-22")));
scan.setFilter(filter1);
~~~
**列族过滤器FamilyFilter**
~~~
Filter filter1 = new FamilyFilter(CompareFilter.CompareOp.LESS, new BinaryComparator(Bytes.toBytes("colfam3")));
scan.setFilter(filter1);
~~~
**列过滤器QualifierFilter**
~~~
filter = new QualifierFilter(CompareFilter.CompareOp.LESS_OR_EQUAL, new BinaryComparator(Bytes.toBytes("col-2")));
scan.setFilter(filter1);
~~~
**值过滤器 ValueFilter**
~~~
Filter filter = new ValueFilter(CompareFilter.CompareOp.EQUAL, new SubstringComparator(".4") );
scan.setFilter(filter1);
~~~
## 专用过滤器
**单列值过滤器 SingleColumnValueFilter**
----会返回满足条件的整行
~~~
SingleColumnValueFilter filter = new SingleColumnValueFilter(
Bytes.toBytes("colfam1"),
Bytes.toBytes("col-5"),
CompareFilter.CompareOp.NOT_EQUAL,
new SubstringComparator("val-5"));
filter.setFilterIfMissing(true); //如果不设置为true,则那些不包含指定column的行也会返回
scan.setFilter(filter1);
~~~
**SingleColumnValueExcludeFilter**
与上相反
**前缀过滤器 PrefixFilter----针对行键**
~~~
Filter filter = new PrefixFilter(Bytes.toBytes("row1"));
scan.setFilter(filter1);
~~~
**列前缀过滤器 ColumnPrefixFilter**
~~~
Filter filter = new ColumnPrefixFilter(Bytes.toBytes("qual2"));
scan.setFilter(filter1);
~~~
## 过滤器代码
~~~
package com.hbase;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellScanner;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.*;
import org.apache.hadoop.hbase.filter.*;
import org.apache.hadoop.hbase.util.Bytes;
import org.junit.Before;
import org.junit.Test;
import java.io.IOException;
import java.util.Iterator;
public class HbaseDemo {
private Configuration conf = null;
private Connection conn = null;
@Before
public void init() throws IOException {
//构建个配置
conf = HBaseConfiguration.create();
//对于hbase的客户端来说,只需要知道hbase所使用的zookeeper集群就可以了
//因为hbase的客户端找hbase读写数据完全不用经过hmaster
conf.set("hbase.zookeeper.quorum", "master:2181,slave:2181");
conn = ConnectionFactory.createConnection(conf);
}
@Test
public void testFilter() throws IOException {
//针对行键的前缀过滤器,row key,前缀过滤
Filter pf = new PrefixFilter(Bytes.toBytes("liu"));
testScan(pf);
//行过滤器
//比较运算符
//小于,BinaryComparator比较器按照字节字典,排在user002他之前的row key都出来
RowFilter rf1 = new RowFilter(CompareFilter.CompareOp.LESS, new BinaryComparator(Bytes.toBytes("user002")));
//在row key中包含00就符合
RowFilter rf2 = new RowFilter(CompareFilter.CompareOp.EQUAL, new SubstringComparator("00"));
testScan(rf1);
System.out.println("*****************");
testScan(rf2);
//针对指定一个列的value来过滤,会显示一个完整的列
//列族名base_info,列标识符password,值是123456
SingleColumnValueFilter scvf = new SingleColumnValueFilter("base_info".getBytes(), "password".getBytes(), CompareFilter.CompareOp.GREATER, "123456".getBytes());
//如果指定的列缺失,则也过滤掉
scvf.setFilterIfMissing(true);
testScan(scvf);
System.out.println("************");
//正则比较器
//包含以zhang这个字符串开头的value值,符合这个要求的列
RegexStringComparator comparator1 = new RegexStringComparator("^zhang");
//子串包含si的值,符合这个条件的列
ByteArrayComparable comparator2 = new SubstringComparator("si");
//第三个参数可更换
SingleColumnValueFilter scvf1 = new SingleColumnValueFilter("base_info".getBytes(), "username".getBytes(), CompareFilter.CompareOp.EQUAL, comparator2);
testScan(scvf1);
//针对列族名的过滤器,返回结果中只会包含满足条件的列族中的数据
//等于,列族中名称inf
FamilyFilter ff1 = new FamilyFilter(CompareFilter.CompareOp.EQUAL, new BinaryComparator(Bytes.toBytes("inf")));
//包含这个base前置的列族的对应列
FamilyFilter ff2 = new FamilyFilter(CompareFilter.CompareOp.EQUAL, new BinaryPrefixComparator(Bytes.toBytes("base")));
testScan(ff2);
//针对列名的过滤器,返回结果中只会包含满足条件的列的数据
QualifierFilter qf = new QualifierFilter(CompareFilter.CompareOp.EQUAL, new BinaryComparator(Bytes.toBytes("password")));
QualifierFilter qf2 = new QualifierFilter(CompareFilter.CompareOp.EQUAL, new BinaryPrefixComparator(Bytes.toBytes("us")));
testScan(qf2);
//跟SingleColumnValueFilter结果不同,只返回符合条件的该column
//前缀过滤
ColumnPrefixFilter cf = new ColumnPrefixFilter("passw".getBytes());
testScan(cf);
//指定多个列条件,但是这些条件的或的关系
byte[][] prefixes = {Bytes.toBytes("username"), Bytes.toBytes("password")};
MultipleColumnPrefixFilter mcf = new MultipleColumnPrefixFilter(prefixes);
testScan(mcf);
//多个过滤器
//等于,前置比较器
FamilyFilter ff20 = new FamilyFilter(CompareFilter.CompareOp.EQUAL, new BinaryPrefixComparator(Bytes.toBytes("base")));
//列前缀比较器
ColumnPrefixFilter cf1 = new ColumnPrefixFilter("passw".getBytes());
//多个过滤器都要满足
FilterList filterList = new FilterList(FilterList.Operator.MUST_PASS_ALL);
filterList.addFilter(ff20);
filterList.addFilter(cf1);
testScan(filterList);
}
public void testScan(Filter filter) throws IOException {
Table t_user_info = conn.getTable(TableName.valueOf("t_user_info"));
Scan scan = new Scan();
scan.setFilter(filter);
ResultScanner scanner = t_user_info.getScanner(scan);
//迭代器
Iterator<Result> iter = scanner.iterator();
while (iter.hasNext()) {
//获取一行记录
Result result = iter.next();
//获取到每一个cell
CellScanner cellScanner = result.cellScanner();
//遍历cell
while (cellScanner.advance()) {
Cell current = cellScanner.current();
byte[] familyArray = current.getFamilyArray();
byte[] valueArray = current.getValueArray();
byte[] qualifierArray = current.getQualifierArray();
byte[] rowArray = current.getRowArray();
System.out.print(new String(rowArray, current.getRowOffset(), current.getRowLength()) + " ");
System.out.print(new String(familyArray, current.getFamilyOffset(), current.getFamilyLength()));
System.out.print(":" + new String(qualifierArray, current.getQualifierOffset(), current.getQualifierLength()));
System.out.print(" " + new String(valueArray, current.getValueOffset(), current.getValueLength()));
System.out.println();
}
System.out.println("-----------------------------");
}
}
}
~~~
# 分页过滤器 PageFilter
~~~
package com.hbase;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellScanner;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.*;
import org.apache.hadoop.hbase.filter.*;
import org.apache.hadoop.hbase.util.Bytes;
import org.junit.Before;
import org.junit.Test;
import java.io.IOException;
import java.util.Iterator;
public class HbaseDemo {
private Configuration conf = null;
private Connection conn = null;
@Before
public void init() throws IOException {
//构建个配置
conf = HBaseConfiguration.create();
//对于hbase的客户端来说,只需要知道hbase所使用的zookeeper集群就可以了
//因为hbase的客户端找hbase读写数据完全不用经过hmaster
conf.set("hbase.zookeeper.quorum", "master:2181,slave:2181");
conn = ConnectionFactory.createConnection(conf);
}
//分页查询
@Test
public void pageScan() throws IOException, InterruptedException {
final byte[] POSTFIX = {0x00};
//获取表
Table table = conn.getTable(TableName.valueOf("t_user_info"));
//分页过滤器
PageFilter filter = new PageFilter(3);
//起始行号
byte[] lastRow = null;
//总共的记录
int totalRows = 0;
while (true) {
Scan scan = new Scan();
scan.setFilter(filter);
//当上次起始行不为空
if (lastRow != null) {
//设置本次查询的起始行键
//上次起始行加上后置,加后置可以获取上次结束行作为本次的起始行
byte[] startRow = Bytes.add(lastRow, POSTFIX);
//设置为起始行
scan.setStartRow(startRow);
}
//获取整个扫描的结果
ResultScanner scanner = table.getScanner(scan);
//定义本地的行号
int localRows = 0;
//结果
Result result;
//遍历一页的结果
while ((result = scanner.next()) != null) {
//localRows显示本地行号每页中的行号,result会调用toString
System.out.println(++localRows + ":" + result);
//全局行号++
totalRows++;
//上次起始的行号设置为这次结束的行号
lastRow = result.getRow();
}
scanner.close();
if (localRows == 0) {
break;
}
}
//打印本次总行数
System.out.println("total rows:" + totalRows);
}
}
~~~
- linux
- 常用命令
- 高级文本命令
- 面试题
- redis
- String
- list
- hash
- set
- sortedSet
- 案例-推荐
- java高级特性
- 多线程
- 实现线程的三种方式
- 同步关键词
- 读写锁
- 锁的相关概念
- 多线程的join
- 有三个线程T1 T2 T3,保证顺序执行
- java五种线程池
- 守护线程与普通线程
- ThreadLocal
- BlockingQueue消息队列
- JMS
- 反射
- volatile
- jvm
- IO
- nio
- netty
- netty简介
- 案例一发送字符串
- 案例二发送对象
- 轻量级RPC开发
- 简介
- spring(IOC/AOP)
- spring初始化顺序
- 通过ApplicationContextAware加载Spring上下文
- InitializingBean的作用
- 结论
- 自定义注解
- zk在框架中的应用
- hadoop
- 简介
- hadoop集群搭建
- hadoop单机安装
- HDFS简介
- hdfs基本操作
- hdfs环境搭建
- 常见问题汇总
- hdfs客户端操作
- mapreduce工作机制
- 案列-单词统计
- 局部聚合Combiner
- 案列-流量统计(分区,排序,比较)
- 案列-倒排索引
- 案例-共同好友
- 案列-join算法实现
- 案例-求topN(分组)
- 自定义inputFormat
- 自定义outputFormat
- 框架运算全流程
- mapreduce的优化方案
- HA机制
- Hive
- 安装
- DDL操作
- 创建表
- 修改表
- DML操作
- Load
- insert
- select
- join操作
- 严格模式
- 数据类型
- shell参数
- 函数
- 内置运算符
- 内置函数
- 自定义函数
- Transform实现
- 特殊分割符处理
- 案例
- 级联求和accumulate
- flume
- 简介
- 安装
- 常用的组件
- 拦截器
- 案例
- 采集目录到HDFS
- 采集文件到HDFS
- 多个agent串联
- 日志采集和汇总
- 自定义拦截器
- 高可用配置
- 使用注意
- sqoop
- 安装
- 数据导入
- 导入数据到HDFS
- 导入关系表到HIVE
- 导入表数据子集
- 增量导入
- 数据导出
- 作业
- 原理
- azkaban
- 简介
- 安装
- 案例
- 简介
- command类型单一job
- command类型多job工作流flow
- HDFS操作任务
- mapreduce任务
- hive脚本任务
- hbase
- 简介
- 安装
- 命令行
- 基本CURD
- 过滤器查询
- 系统架构
- 物理存储
- 寻址机制
- 读写过程
- Region管理
- master工作机制
- 建表高级属性
- 与mapreduce结合
- 协处理器
- 点击流平台开发
- 简介
- storm
- 简介
- 安装
- 集群启动及任务过程分析
- 单词统计
- 并行度
- ACK容错机制
- ACK简介