企业🤖AI Agent构建引擎,智能编排和调试,一键部署,支持私有化部署方案 广告
[TOC]

# 分析

为什么需要用 MapReduce 去访问 HBase 的数据?

——加快分析速度和扩展分析能力

MapReduce 访问 HBase 数据作分析,一定是在离线分析的场景下应用

![](https://box.kancloud.cn/6d3e6f8c1a68f8a9ba33e2b494eabaa8_720x419.png)

# 代码

## 从Hbase中读取数据分析写入hdfs

~~~
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.hbase.mapreduce.TableMapper;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.IOException;
import java.nio.charset.StandardCharsets;

/**
 * MapReduce job that scans the HBase table {@code t_user_info} and writes one
 * {@code rowkey<TAB>username} line per row to a file-system output directory.
 */
public class HbaseReader {

    /** Name of the HBase table that is scanned. */
    public static String t_user_info = "t_user_info";

    /** Output directory used when no path is given on the command line. */
    private static final String DEFAULT_OUTPUT_PATH = "/Users/jdxia/Desktop/website/hdfs/output";

    /**
     * Emits {@code rowkey<TAB>username} for every scanned row.
     * The generic parameters of {@link TableMapper} fix the map OUTPUT key/value types.
     */
    static class HdfsSinkMapper extends TableMapper<Text, NullWritable> {

        private static final byte[] FAMILY = "base_info".getBytes(StandardCharsets.UTF_8);
        private static final byte[] QUALIFIER = "username".getBytes(StandardCharsets.UTF_8);

        /**
         * @param key     the row key of the current row
         * @param value   all cells returned by the scan for that row
         * @param context sink for the output record
         */
        @Override
        protected void map(ImmutableBytesWritable key, Result value, Context context)
                throws IOException, InterruptedException {
            // copyBytes() returns exactly the row-key bytes (no offset/length slack).
            String rowkey = new String(key.copyBytes(), StandardCharsets.UTF_8);

            // getValue() returns null when the row has no base_info:username cell;
            // skip such rows instead of failing the task with a NullPointerException.
            byte[] usernameBytes = value.getValue(FAMILY, QUALIFIER);
            if (usernameBytes == null) {
                context.getCounter("HbaseReader", "ROWS_WITHOUT_USERNAME").increment(1);
                return;
            }

            String username = new String(usernameBytes, StandardCharsets.UTF_8);
            context.write(new Text(rowkey + "\t" + username), NullWritable.get());
        }
    }

    /** Identity-style reducer: writes every distinct key from the map phase once. */
    static class HdfsSinkReducer extends Reducer<Text, NullWritable, Text, NullWritable> {

        @Override
        protected void reduce(Text key, Iterable<NullWritable> values, Context context)
                throws IOException, InterruptedException {
            context.write(key, NullWritable.get());
        }
    }

    /**
     * Configures and runs the job.
     *
     * @param args optional; {@code args[0]} overrides the default output directory
     */
    public static void main(String[] args) throws Exception {
        Configuration conf = HBaseConfiguration.create();
        conf.set("hbase.zookeeper.quorum", "master:2181,slave:2181");

        Job job = Job.getInstance(conf);
        job.setJarByClass(HbaseReader.class);

        Scan scan = new Scan();
        // Full-table MapReduce scans should not churn the region servers' block cache.
        scan.setCacheBlocks(false);

        // Registers the input table, the scan, the mapper class and the map output types.
        TableMapReduceUtil.initTableMapperJob(t_user_info, scan, HdfsSinkMapper.class,
                Text.class, NullWritable.class, job);

        job.setReducerClass(HdfsSinkReducer.class);

        String outputPath = args.length > 0 ? args[0] : DEFAULT_OUTPUT_PATH;
        FileOutputFormat.setOutputPath(job, new Path(outputPath));

        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(NullWritable.class);

        // Propagate job success/failure to the caller through the exit code.
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}
~~~

## 从hdfs中读取数据写入Hbase

测试数据

~~~
13902070000 www.baidu.com beijing
13902070006 www.google.com.hk beijing
13902070012 www.google.com.hk shanghai
13902070018 www.baidu.com shanghai
13902070024 www.baidu.com guanzhou
13902070030 www.baidu.com tianjin
~~~

创建表(列族需要与代码中使用的 `f1` 保持一致;下面的程序启动时会先删除再重建该表,因此这一步也可以省略)

~~~
create 'flow_fields_import', 'f1'
~~~

bean代码

~~~
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.WritableComparable;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

/**
 * Map output key carrying a phone number and the URL it visited.
 *
 * <p>Ordering, equality and hashing are all based on the phone number only, so
 * every record of one phone is sent to the same reducer and grouped into a
 * single reduce call.
 */
public class FlowBean implements WritableComparable<FlowBean> {

    private Text phone;
    private Text url;

    /** No-arg constructor required by Hadoop; fields start out empty, not null. */
    public FlowBean() {
        this.phone = new Text();
        this.url = new Text();
    }

    public FlowBean(Text phone, Text url) {
        super();
        this.phone = phone;
        this.url = url;
    }

    public FlowBean(String phone, String url) {
        super();
        this.phone = new Text(phone);
        this.url = new Text(url);
    }

    /** Serializes phone then url; the order must match {@link #readFields(DataInput)}. */
    @Override
    public void write(DataOutput out) throws IOException {
        out.writeUTF(phone.toString());
        out.writeUTF(url.toString());
    }

    /** Restores the fields in the order written by {@link #write(DataOutput)}. */
    @Override
    public void readFields(DataInput in) throws IOException {
        phone = new Text(in.readUTF());
        url = new Text(in.readUTF());
    }

    @Override
    public String toString() {
        final StringBuilder sb = new StringBuilder("{");
        sb.append("\"phone\":")
                .append(phone);
        sb.append(",\"url\":")
                .append(url);
        sb.append('}');
        return sb.toString();
    }

    public Text getPhone() {
        return phone;
    }

    public void setPhone(Text phone) {
        this.phone = phone;
    }

    public Text getUrl() {
        return url;
    }

    public void setUrl(Text url) {
        this.url = url;
    }

    /** Orders beans by phone number (string comparison); the url is ignored. */
    @Override
    public int compareTo(FlowBean o) {
        return this.phone.toString().compareTo(o.getPhone().toString());
    }

    /** Consistent with {@link #compareTo(FlowBean)}: two beans are equal when their phones are. */
    @Override
    public boolean equals(Object other) {
        if (this == other) {
            return true;
        }
        if (!(other instanceof FlowBean)) {
            return false;
        }
        String mine = phoneString(this);
        String theirs = phoneString((FlowBean) other);
        return mine == null ? theirs == null : mine.equals(theirs);
    }

    /**
     * Content-based hash of the phone number. The default HashPartitioner calls
     * this method; without it the identity hash would be used and records of the
     * same phone could be routed to different reducers.
     */
    @Override
    public int hashCode() {
        String value = phoneString(this);
        return value == null ? 0 : value.hashCode();
    }

    /** Returns the phone as a String, or null when a setter stored a null phone. */
    private static String phoneString(FlowBean bean) {
        return bean.phone == null ? null : bean.phone.toString();
    }
}
~~~

**mapreduce代码**

~~~
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.HBaseAdmin;
import org.apache.hadoop.hbase.client.Mutation;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.hbase.mapreduce.TableReducer;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;

import java.io.IOException;
import java.nio.charset.StandardCharsets;

/**
 * MapReduce job that reads whitespace-separated text lines ({@code phone url ...})
 * and stores each phone's url in the HBase table {@code flow_fields_import}
 * (row key = phone, column = {@code f1:url}).
 *
 * User: jdxia
 * Date: 2018/7/25
 * Time: 15:40
 */
public class HbaseSinker {

    /** Name of the HBase table that is (re)created and written to. */
    public static String flow_fields_import = "flow_fields_import";

    /** Input file used when no path is given on the command line. */
    private static final String DEFAULT_INPUT_PATH = "/Users/jdxia/Desktop/data.txt";

    private static final byte[] FAMILY = "f1".getBytes(StandardCharsets.UTF_8);
    private static final byte[] QUALIFIER = "url".getBytes(StandardCharsets.UTF_8);

    /** Parses one input line into a {@link FlowBean} key. */
    public static class HbaseSinkMrMapper extends Mapper<LongWritable, Text, FlowBean, NullWritable> {

        /**
         * @param key     byte offset of the line in the input (unused)
         * @param value   one input line: phone, url and optional further fields
         * @param context sink for the parsed bean
         */
        @Override
        protected void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            String line = value.toString().trim();
            if (line.isEmpty()) {
                return; // blank line, nothing to import
            }

            // Accept any run of spaces or tabs as the field separator.
            String[] fields = line.split("\\s+");
            if (fields.length < 2) {
                // A line without a url cannot be imported; count it instead of crashing.
                context.getCounter("HbaseSinker", "MALFORMED_LINES").increment(1);
                return;
            }

            FlowBean bean = new FlowBean(fields[0], fields[1]);
            context.write(bean, NullWritable.get());
        }
    }

    /** Turns each distinct phone into one {@link Put} on the target table. */
    public static class HbaseSinkMrReducer
            extends TableReducer<FlowBean, NullWritable, ImmutableBytesWritable> {

        @Override
        protected void reduce(FlowBean key, Iterable<NullWritable> values, Context context)
                throws IOException, InterruptedException {
            // Do NOT use Text.getBytes() here: it exposes the backing array, which may be
            // longer than getLength() and would append garbage bytes to the row key/value.
            byte[] rowKey = key.getPhone().toString().getBytes(StandardCharsets.UTF_8);
            byte[] url = key.getUrl().toString().getBytes(StandardCharsets.UTF_8);

            Put put = new Put(rowKey);
            // Put.add(byte[], byte[], byte[]) matches the HBase client generation this
            // example targets (the same one that still offers new HBaseAdmin(conf)).
            put.add(FAMILY, QUALIFIER, url);
            context.write(new ImmutableBytesWritable(rowKey), put);
        }
    }

    /**
     * Recreates the target table, then configures and runs the job.
     *
     * @param args optional; {@code args[0]} overrides the default input path
     */
    public static void main(String[] args) throws Exception {
        Configuration conf = HBaseConfiguration.create();
        conf.set("hbase.zookeeper.quorum", "master:2181,slave1:2181,slave2:2181");

        recreateTable(conf);

        Job job = Job.getInstance(conf);
        job.setJarByClass(HbaseSinker.class);
        job.setMapperClass(HbaseSinkMrMapper.class);

        // Registers the output table and the reducer class.
        TableMapReduceUtil.initTableReducerJob(flow_fields_import, HbaseSinkMrReducer.class, job);

        String inputPath = args.length > 0 ? args[0] : DEFAULT_INPUT_PATH;
        FileInputFormat.setInputPaths(job, new Path(inputPath));

        job.setMapOutputKeyClass(FlowBean.class);
        job.setMapOutputValueClass(NullWritable.class);
        job.setOutputKeyClass(ImmutableBytesWritable.class);
        job.setOutputValueClass(Mutation.class);

        // Propagate job success/failure to the caller through the exit code.
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }

    /**
     * Drops the target table if it exists and creates it again with the single
     * column family {@code f1}. WARNING: all existing data in the table is lost.
     */
    private static void recreateTable(Configuration conf) throws IOException {
        // try-with-resources releases the admin connection even when a call fails.
        try (HBaseAdmin hBaseAdmin = new HBaseAdmin(conf)) {
            if (hBaseAdmin.tableExists(flow_fields_import)) {
                // A table must be disabled before it can be deleted.
                hBaseAdmin.disableTable(flow_fields_import);
                hBaseAdmin.deleteTable(flow_fields_import);
            }

            HTableDescriptor desc = new HTableDescriptor(TableName.valueOf(flow_fields_import));
            desc.addFamily(new HColumnDescriptor(FAMILY));
            hBaseAdmin.createTable(desc);
        }
    }
}
~~~