Reduce Join Example

I. Requirements Analysis

1. Requirement

order.txt:

id      pid     amount
1001    01      1
1002    02      2
1003    03      3
1004    01      4
1005    02      5
1006    03      6

pd.txt:

pid     pname
01      小米
02      华为
03      格力

Join the two tables so that the pid column in order.txt is replaced by the corresponding pname from pd.txt.

2. Analysis

a. Map: read both order.txt and pd.txt and tag every record with a flag that says which file it came from, so the two files can be handled differently. The output key is pid and the output value is a custom Hadoop-serializable bean.
b. Records are automatically sorted and grouped by key, so no custom sort is needed.
c. Reduce: merge the map-side records that share the same pid.

II. Code

1. Custom Hadoop-serializable class

package com.wt.reducejoin;

import org.apache.hadoop.io.Writable;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

public class TableBean implements Writable {

    private String orderId;  // order id
    private String pId;      // product id
    private int amount;      // order amount
    private String pName;    // product name
    private String flag;     // which table the record came from: "order" or "pd"

    public TableBean() {
    }

    public TableBean(String orderId, String pId, int amount, String pName, String flag) {
        this.orderId = orderId;
        this.pId = pId;
        this.amount = amount;
        this.pName = pName;
        this.flag = flag;
    }

    @Override
    public void write(DataOutput out) throws IOException {
        // serialization
        out.writeUTF(orderId);
        out.writeUTF(pId);
        out.writeInt(amount);
        out.writeUTF(pName);
        out.writeUTF(flag);
    }

    @Override
    public void readFields(DataInput in) throws IOException {
        // deserialization, in the same field order as write()
        this.orderId = in.readUTF();
        this.pId = in.readUTF();
        this.amount = in.readInt();
        this.pName = in.readUTF();
        this.flag = in.readUTF();
    }

    public String getOrderId() {
        return orderId;
    }

    public void setOrderId(String orderId) {
        this.orderId = orderId;
    }

    public String getpId() {
        return pId;
    }

    public void setpId(String pId) {
        this.pId = pId;
    }

    public int getAmount() {
        return amount;
    }

    public void setAmount(int amount) {
        this.amount = amount;
    }

    public String getpName() {
        return pName;
    }

    public void setpName(String pName) {
        this.pName = pName;
    }

    public String getFlag() {
        return flag;
    }

    public void setFlag(String flag) {
        this.flag = flag;
    }

    @Override
    public String toString() {
        // output format: orderId, pName, amount (pid replaced by pname, as required)
        return orderId + "\t" + pName + "\t" + amount;
    }
}

2. Mapper

package com.wt.reducejoin;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;

import java.io.IOException;

public class TableMapper extends Mapper<LongWritable, Text, Text, TableBean> {

    String name;
    TableBean bean = new TableBean();
    Text k = new Text();

    // get the name of the file this split belongs to
    @Override
    protected void setup(Context context) throws IOException, InterruptedException {
        FileSplit split = (FileSplit) context.getInputSplit();
        name = split.getPath().getName();
    }

    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        // 1. read one line
        String line = value.toString();
        // decide how to parse it based on the file name
        if (name.startsWith("order")) {
            String[] fields = line.split("\t");
            // 2.1 set the value (an order record)
            bean.setOrderId(fields[0]);
            bean.setpId(fields[1]);
            bean.setAmount(Integer.parseInt(fields[2]));
            bean.setpName("");
            bean.setFlag("order");
            // 2.2 set the key to pid
            k.set(fields[1]);
        } else {
            String[] fields = line.split("\t");
            // 3.1 set the value (a product record)
            bean.setpId(fields[0]);
            bean.setpName(fields[1]);
            bean.setOrderId("");
            bean.setAmount(0);
            bean.setFlag("pd");
            // 3.2 set the key to pid
            k.set(fields[0]);
        }
        // 4. emit the key/value pair
        context.write(k, bean);
    }
}
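To make the map-side tagging concrete, here is roughly what the Mapper emits for the sample input above, written as key → (orderId, pId, amount, pName, flag). The grouping by pid is what the Reducer sees after the shuffle; the iteration order of the values inside one group is not guaranteed:

01 → ("1001", "01", 1, "", "order")
01 → ("1004", "01", 4, "", "order")
01 → ("",     "01", 0, "小米", "pd")
02 → ("1002", "02", 2, "", "order")
02 → ("1005", "02", 5, "", "order")
02 → ("",     "02", 0, "华为", "pd")
03 → ("1003", "03", 3, "", "order")
03 → ("1006", "03", 6, "", "order")
03 → ("",     "03", 0, "格力", "pd")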
3. Reducer

package com.wt.reducejoin;

// BeanUtils requires the commons-beanutils library on the classpath
import org.apache.commons.beanutils.BeanUtils;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;
import java.lang.reflect.InvocationTargetException;
import java.util.ArrayList;

public class TableReducer extends Reducer<Text, TableBean, TableBean, NullWritable> {

    @Override
    protected void reduce(Text key, Iterable<TableBean> values, Context context) throws IOException, InterruptedException {
        // collection holding the order records for this pid
        ArrayList<TableBean> orderBeans = new ArrayList<>();
        // the product record for this pid
        TableBean pdBean = new TableBean();

        for (TableBean value : values) {
            if ("order".equals(value.getFlag())) {
                // the framework reuses the value object, so copy each order record before storing it
                TableBean orderBean = new TableBean();
                try {
                    BeanUtils.copyProperties(orderBean, value);
                } catch (IllegalAccessException | InvocationTargetException e) {
                    e.printStackTrace();
                }
                orderBeans.add(orderBean);
            } else {
                // copy the product record into pdBean
                try {
                    BeanUtils.copyProperties(pdBean, value);
                } catch (IllegalAccessException | InvocationTargetException e) {
                    e.printStackTrace();
                }
            }
        }

        // join: fill in the product name on every order record
        for (TableBean bean : orderBeans) {
            bean.setpName(pdBean.getpName());
            // 4. write the joined record
            context.write(bean, NullWritable.get());
        }
    }
}

4. Driver

package com.wt.reducejoin;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.IOException;

public class TableDriver {
    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
        // local input and output paths used for this example
        args = new String[]{"E:\\a\\inputjoin", "E:\\a\\output1"};

        // 1. create the job
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf);
        // 2. set the jar
        job.setJarByClass(TableDriver.class);
        // 3. wire up the Mapper and Reducer
        job.setMapperClass(TableMapper.class);
        job.setReducerClass(TableReducer.class);
        // 4. set the Mapper output key/value types
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(TableBean.class);
        // 5. set the final output key/value types
        job.setOutputKeyClass(TableBean.class);
        job.setOutputValueClass(NullWritable.class);
        // 6. set the input and output paths
        FileInputFormat.setInputPaths(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        // 7. submit the job and wait for it to finish
        boolean wait = job.waitForCompletion(true);
        System.exit(wait ? 0 : 1);
    }
}

Approach (Reduce Join):
1. Define a custom Hadoop-serializable class.
2. In the Mapper, handle the two input files separately and tag each record; the framework sorts the records by key.
3. In the Reducer, join the values that arrive under the same key and build the desired output.
4. Set up the Driver class.
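For reference, with the sample data above and the toString() format used by TableBean (orderId, pName, amount separated by tabs), the output file (part-r-00000) should look roughly like this; the pid groups are processed in sorted key order, but the relative order of the orders inside each group is not guaranteed:

1001	小米	1
1004	小米	4
1002	华为	2
1005	华为	5
1003	格力	3
1006	格力	6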
