By using the join condition as the key of the map output, records from both tables that satisfy the join condition are sent, tagged with the name of the file they came from, to the same reduce task, and the actual join is performed in the reduce phase.

1. **Create the bean class that holds the merged customer and order data.**

```java
package com.kgc.mapreduce.entry;

import org.apache.hadoop.io.Writable;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

public class CustomerOrders implements Writable {
    // customer id
    private String customerId;
    // order id
    private String orderId;
    // customer name
    private String customerName;
    // order status
    private String orderStatus;
    // flag marking the source table (1 = order, 0 = customer)
    private int flag;

    public CustomerOrders() {
    }

    public CustomerOrders(String customerId, String orderId, String customerName, String orderStatus, int flag) {
        this.customerId = customerId;
        this.orderId = orderId;
        this.customerName = customerName;
        this.orderStatus = orderStatus;
        this.flag = flag;
    }

    /**
     * Serialization
     *
     * @param dataOutput
     * @throws IOException
     */
    @Override
    public void write(DataOutput dataOutput) throws IOException {
        dataOutput.writeUTF(customerId);
        dataOutput.writeUTF(customerName);
        dataOutput.writeUTF(orderId);
        dataOutput.writeUTF(orderStatus);
        dataOutput.writeInt(flag);
    }

    /**
     * Deserialization (field order must match write())
     *
     * @param dataInput
     * @throws IOException
     */
    @Override
    public void readFields(DataInput dataInput) throws IOException {
        this.customerId = dataInput.readUTF();
        this.customerName = dataInput.readUTF();
        this.orderId = dataInput.readUTF();
        this.orderStatus = dataInput.readUTF();
        this.flag = dataInput.readInt();
    }

    @Override
    public String toString() {
        return orderId + "\t" + customerName + "\t" + orderStatus;
    }

    public String getCustomerId() {
        return customerId;
    }

    public void setCustomerId(String customerId) {
        this.customerId = customerId;
    }

    public String getOrderId() {
        return orderId;
    }

    public void setOrderId(String orderId) {
        this.orderId = orderId;
    }

    public String getCustomerName() {
        return customerName;
    }

    public void setCustomerName(String customerName) {
        this.customerName = customerName;
    }

    public String getOrderStatus() {
        return orderStatus;
    }

    public void setOrderStatus(String orderStatus) {
        this.orderStatus = orderStatus;
    }

    public int getFlag() {
        return flag;
    }

    public void setFlag(int flag) {
        this.flag = flag;
    }
}
```

2. **Write the CustomerOrderMapper program.**

```java
package com.kgc.mapreduce.mapper;

import com.kgc.mapreduce.entry.CustomerOrders;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;

import java.io.IOException;

public class CustomerOrderMapper extends Mapper<LongWritable, Text, Text, CustomerOrders> {
    private String name;
    private CustomerOrders customerOrders = new CustomerOrders();

    @Override
    protected void setup(Context context) throws IOException, InterruptedException {
        // get the input split and remember the name of the source file
        FileSplit fileInput = (FileSplit) context.getInputSplit();
        name = fileInput.getPath().getName();
    }

    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        String[] split = value.toString().split(",");
        if (name.startsWith("order")) { // order file
            customerOrders.setCustomerId(split[2]);
            customerOrders.setOrderId(split[0]);
            customerOrders.setOrderStatus(split[3]);
            customerOrders.setFlag(1);
            customerOrders.setCustomerName("");
        } else { // customer file
            customerOrders.setCustomerId(split[0]);
            customerOrders.setCustomerName(split[1]);
            customerOrders.setFlag(0);
            customerOrders.setOrderId("");
            customerOrders.setOrderStatus("");
        }
        // the customer id is the join key
        context.write(new Text(customerOrders.getCustomerId()), customerOrders);
    }
}
```
3. **Write the COReducer program.**

```java
package com.kgc.mapreduce.reducer;

import com.kgc.mapreduce.entry.CustomerOrders;
import org.apache.commons.beanutils.BeanUtils;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;
import java.util.ArrayList;

public class COReducer extends Reducer<Text, CustomerOrders, CustomerOrders, NullWritable> {
    @Override
    protected void reduce(Text key, Iterable<CustomerOrders> values, Context context) throws IOException, InterruptedException {
        // 1 collection that stores the order records for this customer id
        ArrayList<CustomerOrders> orderBeans = new ArrayList<>();
        // 2 bean that holds the customer record
        CustomerOrders cusBean = new CustomerOrders();
        for (CustomerOrders bean : values) {
            if (1 == bean.getFlag()) { // order table
                // copy each incoming order record into the collection
                // (the framework reuses the value object, so a fresh copy is required)
                CustomerOrders orderBean = new CustomerOrders();
                try {
                    BeanUtils.copyProperties(orderBean, bean);
                } catch (Exception e) {
                    e.printStackTrace();
                }
                orderBeans.add(orderBean);
            } else { // customer table
                try {
                    // copy the incoming customer record into memory
                    BeanUtils.copyProperties(cusBean, bean);
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
        }
        // 3 join the customer record with every order record
        for (CustomerOrders bean : orderBeans) {
            bean.setCustomerName(cusBean.getCustomerName());
            // 4 write the joined record
            context.write(bean, NullWritable.get());
        }
    }
}
```

4. **Write the CODriver program.**

```java
package com.kgc.mapreduce.driver;

import com.kgc.mapreduce.entry.CustomerOrders;
import com.kgc.mapreduce.mapper.CustomerOrderMapper;
import com.kgc.mapreduce.reducer.COReducer;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class CODriver {
    public static void main(String[] args) throws Exception {
        // 1 get the configuration and the job instance
        Configuration configuration = new Configuration();
        Job job = Job.getInstance(configuration);

        // 2 specify the local path of the jar containing this program
        job.setJarByClass(CODriver.class);

        // 3 specify the mapper/reducer classes used by this job
        job.setMapperClass(CustomerOrderMapper.class);
        job.setReducerClass(COReducer.class);

        // 4 specify the key/value types of the mapper output
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(CustomerOrders.class);

        // 5 specify the key/value types of the final output
        job.setOutputKeyClass(CustomerOrders.class);
        job.setOutputValueClass(NullWritable.class);

        // 6 specify the input directory and the output directory
        FileInputFormat.setInputPaths(job, new Path("d:\\input"));
        FileOutputFormat.setOutputPath(job, new Path("d:\\output"));

        // 7 submit the job configuration, together with the jar containing the job's classes, to YARN
        boolean result = job.waitForCompletion(true);
        System.exit(result ? 0 : 1);
    }
}
```

Drawback: with this approach the join itself happens in the reduce phase, so the processing load falls almost entirely on the reduce side while the map nodes do very little work; resource utilization is poor, and the reduce phase is highly prone to data skew.

Solution: perform the join on the map side (a minimal sketch follows).
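As a rough illustration of that map-side join, here is a minimal sketch, not part of the original example: the class name MapJoinMapper is hypothetical, the customer file is assumed to be small enough to load into memory after being registered with `job.addCacheFile(...)`, and the input column order is assumed to match the parsing done by CustomerOrderMapper above.

```java
package com.kgc.mapreduce.mapper;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.BufferedReader;
import java.io.FileReader;
import java.io.IOException;
import java.net.URI;
import java.util.HashMap;
import java.util.Map;

public class MapJoinMapper extends Mapper<LongWritable, Text, Text, NullWritable> {

    // customerId -> customerName, loaded once per map task from the cached customer file
    private final Map<String, String> customerMap = new HashMap<>();
    private final Text outKey = new Text();

    @Override
    protected void setup(Context context) throws IOException, InterruptedException {
        // the customer file is assumed to have been registered in the driver with
        // job.addCacheFile(...); by default a symlink with the file's own name is
        // created in the task's working directory, so it can be read locally
        URI[] cacheFiles = context.getCacheFiles();
        String fileName = new Path(cacheFiles[0].getPath()).getName();
        try (BufferedReader reader = new BufferedReader(new FileReader(fileName))) {
            String line;
            while ((line = reader.readLine()) != null) {
                // assumed customer file layout: customerId,customerName,...
                String[] fields = line.split(",");
                customerMap.put(fields[0], fields[1]);
            }
        }
    }

    @Override
    protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        // assumed order file layout, as parsed by CustomerOrderMapper:
        // orderId in column 0, customerId in column 2, orderStatus in column 3
        String[] fields = value.toString().split(",");
        String customerName = customerMap.getOrDefault(fields[2], "");
        outKey.set(fields[0] + "\t" + customerName + "\t" + fields[3]);
        context.write(outKey, NullWritable.get());
    }
}
```

In the driver, only the order directory would then be passed to FileInputFormat, the customer file would be registered with `job.addCacheFile(...)`, and `job.setNumReduceTasks(0)` would drop the reduce phase entirely, so all of the join work stays on the map side and no shuffle of the joined data is needed.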