Reduce Join Case Study
I. Requirements Analysis
1. Requirements
order.txt
id pid amount
1001 01 1
1002 02 2
1003 03 3
1004 01 4
1005 02 5
1006 03 6
pd.txt
pid pname
01 小米
02 华为
03 格力
Join the two tables, replacing the pid in order.txt with the corresponding pname from pd.txt.
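For the sample data above, the joined result (order id, product name, amount) is the six rows below; they are shown grouped by pid, and the exact row order in the real output depends on how the framework groups and iterates the keys:
1001 小米 1
1004 小米 4
1002 华为 2
1005 华为 5
1003 格力 3
1006 格力 6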
2. Analysis
a. Map phase: read both order.txt and pd.txt and tag each record with a flag that identifies which file it came from; the output key is pid and the output value is a custom Hadoop Writable bean (see the sketch after this list).
b. The framework automatically sorts map output by key, so no custom sorting is needed.
c. Reduce phase: merge the map records that share the same pid.
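To make item a concrete, here is roughly what the map phase emits for one line from each file (TableBean is the custom Writable defined in the next section; fields that do not apply to a file are left empty or zero):
order.txt line "1001 01 1" -> key = "01", value = TableBean(orderId="1001", pId="01", amount=1, pName="", flag="order")
pd.txt line "01 小米" -> key = "01", value = TableBean(orderId="", pId="01", amount=0, pName="小米", flag="pd")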
II. Code
1. Custom Hadoop Writable class
package com.wt.reducejoin;
import org.apache.hadoop.io.Writable;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
public class TableBean implements Writable {
private String orderId;
private String pId;
private int amount;
private String pName;
private String flag;
public TableBean() {
}
public TableBean(String orderId, String pId, int amount, String pName, String flag) {
this.orderId = orderId;
this.pId = pId;
this.amount = amount;
this.pName = pName;
this.flag = flag;
}
@Override
public void write(DataOutput out) throws IOException {
// serialize the fields
out.writeUTF(orderId);
out.writeUTF(pId);
out.writeInt(amount);
out.writeUTF(pName);
out.writeUTF(flag);
}
@Override
public void readFields(DataInput in) throws IOException {
// deserialize the fields in the same order they were written
this.orderId = in.readUTF();
this.pId = in.readUTF();
this.amount = in.readInt();
this.pName = in.readUTF();
this.flag = in.readUTF();
}
public String getOrderId() {
return orderId;
}
public void setOrderId(String orderId) {
this.orderId = orderId;
}
public String getpId() {
return pId;
}
public void setpId(String pId) {
this.pId = pId;
}
public int getAmount() {
return amount;
}
public void setAmount(int amount) {
this.amount = amount;
}
public String getpName() {
return pName;
}
public void setpName(String pName) {
this.pName = pName;
}
public String getFlag() {
return flag;
}
public void setFlag(String flag) {
this.flag = flag;
}
@Override
public String toString() {
return orderId + "\t" + pName + "\t" + amount;
}
}
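The write/readFields pair above is what Hadoop invokes when it moves the bean between the map and reduce phases, so both methods must handle the fields in the same order. A minimal stand-alone round-trip check is sketched below; the class name TableBeanRoundTrip is just an illustrative helper, not part of the job, and the sample values come from order.txt:
package com.wt.reducejoin;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
public class TableBeanRoundTrip {
    public static void main(String[] args) throws IOException {
        // serialize a bean the same way the framework would
        TableBean in = new TableBean("1001", "01", 1, "", "order");
        ByteArrayOutputStream buffer = new ByteArrayOutputStream();
        DataOutputStream dataOut = new DataOutputStream(buffer);
        in.write(dataOut);
        dataOut.flush();
        // deserialize into a fresh bean and print the recovered fields
        TableBean out = new TableBean();
        out.readFields(new DataInputStream(new ByteArrayInputStream(buffer.toByteArray())));
        System.out.println(out.getOrderId() + "\t" + out.getpId() + "\t" + out.getAmount() + "\t" + out.getFlag());
    }
}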
2. Mapper
package com.wt.reducejoin;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import java.io.IOException;
public class TableMapper extends Mapper<LongWritable, Text, Text, TableBean> {
String name;
TableBean bean = new TableBean();
Text k = new Text();
// capture the name of the file this split belongs to
@Override
protected void setup(Context context) throws IOException, InterruptedException {
FileSplit split = (FileSplit) context.getInputSplit();
name = split.getPath().getName();
}
@Override
protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
// 1. read the line
String line = value.toString();
// branch on the source file name
if (name.startsWith("order")){
String[] fields = line.split("\t");
// 2.1 populate the value with the order fields
bean.setOrderId(fields[0]);
bean.setpId(fields[1]);
bean.setAmount(Integer.parseInt(fields[2]));
bean.setpName("");
bean.setFlag("order");
// 2.2 set the key to pid
k.set(fields[1]);
}else {
String[] fields = line.split("\t");
// 3.1 populate the value with the product fields
bean.setpId(fields[0]);
bean.setpName(fields[1]);
bean.setOrderId("");
bean.setAmount(0);
bean.setFlag("pd");
// 3.2 set the key to pid
k.set(fields[0]);
}
// 4. emit the key/value pair
context.write(k, bean);
}
}
3. Reducer
package com.wt.reducejoin;
import org.apache.commons.beanutils.BeanUtils;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
import java.io.IOException;
import java.lang.reflect.InvocationTargetException;
import java.util.ArrayList;
public class TableReducer extends Reducer<Text, TableBean, TableBean, NullWritable> {
@Override
protected void reduce(Text key, Iterable<TableBean> values, Context context) throws IOException, InterruptedException {
// collection of the order records for this pid
ArrayList<TableBean> orderBeans = new ArrayList<>();
// the product record for this pid
TableBean pdBean = new TableBean();
for (TableBean value : values) {
// order table record
if ("order".equals(value.getFlag())){
// copy each incoming order record into the list (the framework reuses the value object)
TableBean orderBean = new TableBean();
try {
BeanUtils.copyProperties(orderBean, value);
} catch (IllegalAccessException e) {
e.printStackTrace();
} catch (InvocationTargetException e) {
e.printStackTrace();
}
orderBeans.add(orderBean);
}else {
// copy the product record into pdBean
try {
BeanUtils.copyProperties(pdBean, value);
} catch (IllegalAccessException e) {
e.printStackTrace();
} catch (InvocationTargetException e) {
e.printStackTrace();
}
}
}
// join: fill each order record with the product name
for (TableBean bean : orderBeans) {
bean.setpName(pdBean.getpName());
// 4. write out the joined record
context.write(bean, NullWritable.get());
}
}
}
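For the sample data, the reduce call for key "01" receives the two order records (1001 and 1004) plus the product record 小米, so it emits the joined rows below; the relative order of the two rows depends on the iteration order of the values:
1001 小米 1
1004 小米 4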
4. Driver
package com.wt.reducejoin;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import java.io.IOException;
public class TableDriver {
public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
// local input/output paths hard-coded for testing
args = new String[]{"E:\\a\\inputjoin", "E:\\a\\output1"};
// 1. create the job
Configuration conf = new Configuration();
Job job = Job.getInstance(conf);
// 2. set the jar by the driver class
job.setJarByClass(TableDriver.class);
// 3. bind the mapper and reducer
job.setMapperClass(TableMapper.class);
job.setReducerClass(TableReducer.class);
// 4. map output key/value types
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(TableBean.class);
// 5. final output key/value types
job.setOutputKeyClass(TableBean.class);
job.setOutputValueClass(NullWritable.class);
// 6. input and output paths
FileInputFormat.setInputPaths(job, new Path(args[0]));
FileOutputFormat.setOutputPath(job, new Path(args[1]));
// 7. submit the job and wait for completion
boolean wait = job.waitForCompletion(true);
System.exit(wait ? 0 : 1);
}
}
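To run the job on a cluster instead of the local paths hard-coded above, remove (or skip) the args assignment, package the classes into a jar, and submit it with hadoop jar. The jar name and HDFS paths below are placeholders:
hadoop jar reducejoin.jar com.wt.reducejoin.TableDriver /input/join /output/join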
Approach (Reduce Join):
1. Define a custom Hadoop Writable class.
2. In the Mapper, handle the two input files separately and key the output by pid (the framework sorts by key).
3. In the Reducer, join the values that share a key and build the desired output record.
4. Configure the Driver class.