ThinkChat2.0新版上线,更智能更精彩,支持会话、画图、阅读、搜索等,送10W Token,即刻开启你的AI之旅 广告
[TOC] # 过滤器查询 过滤器的类型很多,但是可以分为两大类——比较过滤器,专用过滤器 过滤器的作用是在服务端判断数据是否满足条件,然后只将满足条件的数据返回给客户端; 注意,**可以用`reset()`重置过滤器** hbase过滤器的比较运算符: ~~~ LESS < LESS_OR_EQUAL <= EQUAL = NOT_EQUAL <> GREATER_OR_EQUAL >= GREATER > NO_OP 排除所有 ~~~ Hbase过滤器的比较器(指定比较机制): ~~~ BinaryComparator 按字节索引顺序比较指定字节数组,采用Bytes.compareTo(byte[]) BinaryPrefixComparator 跟前面相同,只是比较左端的数据是否相同 NullComparator 判断给定的是否为空 BitComparator 按位比较 RegexStringComparator 提供一个正则的比较器,仅支持 EQUAL 和非EQUAL SubstringComparator 判断提供的子串是否出现在value中 ~~~ # Hbase的过滤器分类 ## 比较过滤器 **行键过滤器RowFilter** 筛选出匹配的所有的行 ~~~ Filter filter1 = new RowFilter(CompareOp.LESS_OR_EQUAL, new BinaryComparator(Bytes.toBytes("row-22"))); scan.setFilter(filter1); ~~~ **列族过滤器FamilyFilter** ~~~ Filter filter1 = new FamilyFilter(CompareFilter.CompareOp.LESS, new BinaryComparator(Bytes.toBytes("colfam3"))); scan.setFilter(filter1); ~~~ **列过滤器QualifierFilter** ~~~ filter = new QualifierFilter(CompareFilter.CompareOp.LESS_OR_EQUAL, new BinaryComparator(Bytes.toBytes("col-2"))); scan.setFilter(filter1); ~~~ **值过滤器 ValueFilter** ~~~ Filter filter = new ValueFilter(CompareFilter.CompareOp.EQUAL, new SubstringComparator(".4") ); scan.setFilter(filter1); ~~~ ## 专用过滤器 **单列值过滤器 SingleColumnValueFilter** ----会返回满足条件的整行 ~~~ SingleColumnValueFilter filter = new SingleColumnValueFilter( Bytes.toBytes("colfam1"), Bytes.toBytes("col-5"), CompareFilter.CompareOp.NOT_EQUAL, new SubstringComparator("val-5")); filter.setFilterIfMissing(true); //如果不设置为true,则那些不包含指定column的行也会返回 scan.setFilter(filter1); ~~~ **SingleColumnValueExcludeFilter** 与上相反 **前缀过滤器 PrefixFilter----针对行键** 筛选出具有特定前缀的行键的数据 ~~~ Filter filter = new PrefixFilter(Bytes.toBytes("row1")); scan.setFilter(filter1); ~~~ **列前缀过滤器 ColumnPrefixFilter** ~~~ Filter filter = new ColumnPrefixFilter(Bytes.toBytes("qual2")); scan.setFilter(filter1); ~~~ **仅仅是行键过滤器 KeyOnlyFilter** 这个过滤器唯一的功能就是只返回每行的行键,值全部为空,这对于只关注于行键的应用场景来说非常合适,这样忽略掉其值就可以减少传递到客户端的数据量,能起到一定的优化作用: ~~~ Filter kof = new KeyOnlyFilter(); // OK 返回所有的行,但值全是空 ~~~ **随机行过滤器 RandomRowFilter** 从名字上就可以看出其大概的用法,本过滤器的作用就是按照一定的几率(<=0会过滤掉所有的行,>=1会包含所有的行)来返回随机的结果集,对于同样的数据集,多次使用同一个RandomRowFilter会返回不通的结果集,对于需要随机抽取一部分数据的应用场景,可以使用此过滤器: ~~~ Filter rrf = new RandomRowFilter((float) 0.8); // OK 随机选出一部分的行 ~~~ **包含起始行,但不包含终止行 InclusiveStopFilter** 扫描的时候,我们可以设置一个开始行键和一个终止行键,默认情况下,这个行键的返回是前闭后开区间,即包含起始行,但不包含终止行,如果我们想要同时包含起始行和终止行,那么我们可以使用此过滤器: ~~~ Filter isf = new InclusiveStopFilter(Bytes.toBytes("row1")); // OK 包含了扫描的上限在结果之内 ~~~ **时间戳过滤器 TimestampsFilter** 需要在扫描结果中对版本进行细粒度控制。一个版本是指一个列在一个特定时间的值。 ~~~ filter = TimestampsFilter (1435747469212, 1435738500459); ~~~ ~~~ timestamps.add(1479788961691L); timestamps.add(1479788676517L); timestamps.add(1479788812565L); Filter filter = new TimestampsFilter(timestamps); ~~~ 返回的结果就是在这个时间戳的数据 ## 过滤器代码 ~~~ package com.hbase; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.Cell; import org.apache.hadoop.hbase.CellScanner; import org.apache.hadoop.hbase.HBaseConfiguration; import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.client.*; import org.apache.hadoop.hbase.filter.*; import org.apache.hadoop.hbase.util.Bytes; import org.junit.Before; import org.junit.Test; import java.io.IOException; import java.util.Iterator; public class HbaseDemo { private Configuration conf = null; private Connection conn = null; @Before public void init() throws IOException { //构建个配置 conf = HBaseConfiguration.create(); //对于hbase的客户端来说,只需要知道hbase所使用的zookeeper集群就可以了 //因为hbase的客户端找hbase读写数据完全不用经过hmaster conf.set("hbase.zookeeper.quorum", "master:2181,slave:2181"); conn = ConnectionFactory.createConnection(conf); } @Test public void testFilter() throws IOException { //针对行键的前缀过滤器,row key,前缀过滤 Filter pf = new PrefixFilter(Bytes.toBytes("liu")); testScan(pf); //行过滤器 //比较运算符 //小于,BinaryComparator比较器按照字节字典, LESS排在user002他之前的row key都出来 RowFilter rf1 = new RowFilter(CompareFilter.CompareOp.LESS, new BinaryComparator(Bytes.toBytes("user002"))); //在row key中包含00就符合 RowFilter rf2 = new RowFilter(CompareFilter.CompareOp.EQUAL, new SubstringComparator("00")); testScan(rf1); System.out.println("*****************"); testScan(rf2); //针对指定一个列的value来过滤,会显示一个完整的列 //列族名base_info,列标识符password,值是123456 //注意这边选择的运算符 SingleColumnValueFilter scvf = new SingleColumnValueFilter("base_info".getBytes(), "password".getBytes(), CompareFilter.CompareOp.EQUAL, "123456".getBytes()); //如果指定的列缺失,则也过滤掉 scvf.setFilterIfMissing(true); testScan(scvf); System.out.println("************"); //针对指定一个列的value的比较器来过滤 //正则比较器 //包含以zhang这个字符串开头的value值,符合这个要求的列 RegexStringComparator comparator1 = new RegexStringComparator("^zhang"); //子串包含si的值,符合这个条件的列 ByteArrayComparable comparator2 = new SubstringComparator("si"); //第三个参数可更换 SingleColumnValueFilter scvf1 = new SingleColumnValueFilter("base_info".getBytes(), "username".getBytes(), CompareFilter.CompareOp.EQUAL, comparator2); testScan(scvf1); //针对列族名的过滤器,返回结果中只会包含满足条件的列族中的数据 //等于,列族中名称info FamilyFilter ff1 = new FamilyFilter(CompareFilter.CompareOp.EQUAL, new BinaryComparator(Bytes.toBytes("info"))); //包含这个base前置的列族的对应列 FamilyFilter ff2 = new FamilyFilter(CompareFilter.CompareOp.EQUAL, new BinaryPrefixComparator(Bytes.toBytes("base"))); testScan(ff2); //针对列名的过滤器,返回结果中只会包含满足条件的列的数据 QualifierFilter qf = new QualifierFilter(CompareFilter.CompareOp.EQUAL, new BinaryComparator(Bytes.toBytes("password"))); QualifierFilter qf2 = new QualifierFilter(CompareFilter.CompareOp.EQUAL, new BinaryPrefixComparator(Bytes.toBytes("us"))); testScan(qf2); //跟SingleColumnValueFilter结果不同,只返回符合条件的该column //列名前缀过滤 ColumnPrefixFilter cf = new ColumnPrefixFilter("passw".getBytes()); testScan(cf); //指定多个列条件,但是这些条件是或的关系 byte[][] prefixes = {Bytes.toBytes("username"), Bytes.toBytes("password")}; MultipleColumnPrefixFilter mcf = new MultipleColumnPrefixFilter(prefixes); testScan(mcf); //多个过滤器 //等于,前置比较器 FamilyFilter ff20 = new FamilyFilter(CompareFilter.CompareOp.EQUAL, new BinaryPrefixComparator(Bytes.toBytes("base"))); //列前缀比较器 ColumnPrefixFilter cf1 = new ColumnPrefixFilter("passw".getBytes()); //多个过滤器都要满足 //如果是只想满足一个条件,FilterList.Operator.MUST_PASS_ONE FilterList filterList = new FilterList(FilterList.Operator.MUST_PASS_ALL); filterList.addFilter(ff20); filterList.addFilter(cf1); testScan(filterList); } public void testScan(Filter filter) throws IOException { Table t_user_info = conn.getTable(TableName.valueOf("t_user_info")); Scan scan = new Scan(); scan.setFilter(filter); ResultScanner scanner = t_user_info.getScanner(scan); //迭代器 Iterator<Result> iter = scanner.iterator(); while (iter.hasNext()) { //获取一行记录 Result result = iter.next(); //获取到每一个cell CellScanner cellScanner = result.cellScanner(); //遍历cell while (cellScanner.advance()) { Cell current = cellScanner.current(); byte[] familyArray = current.getFamilyArray(); byte[] valueArray = current.getValueArray(); byte[] qualifierArray = current.getQualifierArray(); byte[] rowArray = current.getRowArray(); System.out.print(new String(rowArray, current.getRowOffset(), current.getRowLength()) + " "); System.out.print(new String(familyArray, current.getFamilyOffset(), current.getFamilyLength())); System.out.print(":" + new String(qualifierArray, current.getQualifierOffset(), current.getQualifierLength())); System.out.print(" " + new String(valueArray, current.getValueOffset(), current.getValueLength())); System.out.println(); } System.out.println("-----------------------------"); } } } ~~~ # 分页过滤器 PageFilter ~~~ package com.hbase; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.Cell; import org.apache.hadoop.hbase.CellScanner; import org.apache.hadoop.hbase.HBaseConfiguration; import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.client.*; import org.apache.hadoop.hbase.filter.*; import org.apache.hadoop.hbase.util.Bytes; import org.junit.Before; import org.junit.Test; import java.io.IOException; import java.util.Iterator; public class HbaseDemo { private Configuration conf = null; private Connection conn = null; @Before public void init() throws IOException { //构建个配置 conf = HBaseConfiguration.create(); //对于hbase的客户端来说,只需要知道hbase所使用的zookeeper集群就可以了 //因为hbase的客户端找hbase读写数据完全不用经过hmaster conf.set("hbase.zookeeper.quorum", "master:2181,slave:2181"); conn = ConnectionFactory.createConnection(conf); } //分页查询 @Test public void pageScan() throws IOException, InterruptedException { final byte[] POSTFIX = {0x00}; //获取表 Table table = conn.getTable(TableName.valueOf("t_user_info")); //分页过滤器,每页多少条数据 PageFilter filter = new PageFilter(3); //起始行号,这边设为空 byte[] lastRow = null; //总共的记录 int totalRows = 0; while (true) { Scan scan = new Scan(); scan.setFilter(filter); //当上次起始行不为空 if (lastRow != null) { //设置本次查询的起始行键 //上次起始行加上后置,加后置可以获取上次结束行作为本次的起始行 byte[] startRow = Bytes.add(lastRow, POSTFIX); //设置为起始行 scan.setStartRow(startRow); } //获取整个扫描的结果 ResultScanner scanner = table.getScanner(scan); //定义本地的行号 int localRows = 0; //结果 Result result; //遍历一页的结果 while ((result = scanner.next()) != null) { //localRows显示本地行号每页中的行号,result会调用toString System.out.println(++localRows + ":" + result); //全局行号++ totalRows++; //上次起始的行号设置为这次结束的行号 lastRow = result.getRow(); } scanner.close(); if (localRows == 0) { break; } } //打印本次总行数 System.out.println("total rows:" + totalRows); } } ~~~