MapReduce 中 JoinReduce 示例代码

网友投稿 230 2022-11-25

MapReduce 中 JoinReduce 示例代码

自定义 Writable 类(join:在 Map 与 Reduce 阶段之间传递的员工/部门记录)

package groupby;

import org.apache.hadoop.io.Writable;

import java.io.DataInput;import java.io.DataOutput;import java.io.IOException;

/**
 * Record exchanged between the map and reduce phases of the join job.
 *
 * <p>A single type carries both kinds of input row; {@code flage} tells them apart. The mapper
 * in this example uses {@code "d"} for department rows and {@code "e"} for employee rows.
 *
 * <p>Serialization order in {@link #write(DataOutput)} and {@link #readFields(DataInput)} is
 * {@code empno, ename, deptno, deptname, flage}; the two methods must stay in step.
 */
public class join implements Writable {

    private int empno;
    private String ename;
    private String deptno;
    private String deptname;
    private String flage;

    /**
     * Creates a fully populated record.
     *
     * @param empno    employee number
     * @param ename    employee name
     * @param deptno   department number, kept as text
     * @param deptname department name
     * @param flage    marker identifying which kind of row this record came from
     */
    public join(int empno, String ename, String deptno, String deptname, String flage) {
        this.ename = ename;
        this.deptname = deptname;
        this.deptno = deptno;
        this.empno = empno;
        this.flage = flage;
    }

    /** No-arg constructor; the fields are then filled in by {@link #readFields(DataInput)} or the setters. */
    public join() {
    }

    public int getEmpno() {
        return empno;
    }

    public void setEmpno(int empno) {
        this.empno = empno;
    }

    public String getEname() {
        return ename;
    }

    public void setEname(String ename) {
        this.ename = ename;
    }

    public String getDeptno() {
        return deptno;
    }

    public void setDeptno(String deptno) {
        this.deptno = deptno;
    }

    public String getDeptname() {
        return deptname;
    }

    public void setDeptname(String deptname) {
        this.deptname = deptname;
    }

    public String getFlage() {
        return flage;
    }

    public void setFlage(String flage) {
        this.flage = flage;
    }

    /**
     * Formats the record as tab-separated {@code empno, ename, deptno, deptname}.
     * The {@code flage} marker is intentionally not part of the output.
     */
    @Override
    public String toString() {
        return empno + "\t" + ename + "\t" + deptno + "\t" + deptname;
    }

    /**
     * Serializes the record.
     *
     * <p>{@link DataOutput#writeUTF(String)} throws {@link NullPointerException} for a null
     * argument, so unset string fields are written as the empty string instead of failing.
     *
     * @param out sink to write to
     * @throws IOException if the underlying stream fails
     */
    @Override
    public void write(DataOutput out) throws IOException {
        out.writeInt(empno);
        out.writeUTF(nullToEmpty(ename));
        out.writeUTF(nullToEmpty(deptno));
        out.writeUTF(nullToEmpty(deptname));
        out.writeUTF(nullToEmpty(flage));
    }

    /**
     * Restores the record from the form produced by {@link #write(DataOutput)}.
     *
     * @param in source to read from
     * @throws IOException if the underlying stream fails
     */
    @Override
    public void readFields(DataInput in) throws IOException {
        this.empno = in.readInt();
        this.ename = in.readUTF();
        this.deptno = in.readUTF();
        this.deptname = in.readUTF();
        this.flage = in.readUTF();
    }

    /** Returns {@code s}, or the empty string when {@code s} is null. */
    private static String nullToEmpty(String s) {
        return s == null ? "" : s;
    }
}

package groupby;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.*;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

import static org.apache.hadoop.mapreduce.lib.output.FileOutputFormat.getOutputPath;

public class mapreduce {public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {

String input = "data1" ; String output = "out1" ; final Configuration co = new Configuration() ; //获取 Job 对象 final Job job = Job.getInstance(co); //设置class job.setJarByClass(groupby.mapreduce.class); //设置mapper 和 Reduce job.setMapperClass(MyMapper.class); job.setReducerClass(MyReducer.class); //设置 Mapper 阶段输出数据的key 和value job.setMapOutputKeyClass(IntWritable.class); job.setMapOutputValueClass(join.class); //设置 Reducer 阶段输出数据的key 和value job.setOutputKeyClass(NullWritable.class); job.setOutputValueClass(join.class); //设置输入和输出路径 FileInputFormat.setInputPaths(job, new Path(input)); FileOutputFormat.setOutputPath(job, new Path(output)); //删除输出路径中的 文件 Path outDir = getOutputPath(job) ; if (outDir.getFileSystem(job.getConfiguration()).exists(outDir)) { File file = new File(outDir.toUri()) ; if(file.isDirectory()){ File[] childrenFiles = file.listFiles(); for (File childFile:childrenFiles){ childFile.delete() ; } } file.delete(); } //提交 job final boolean result = job.waitForCompletion(true); System.exit(result ? 
0 : 1); } /** * 文件中的偏移量,单行文件内容, 分类的key , 存储数据自定义的类 * * */ public static class MyMapper extends Mapper { private String name = "" ; @Override protected void setup(Context context) throws IOException, InterruptedException { FileSplit in = (FileSplit) context.getInputSplit(); name = in.getPath().getName() ; } //join(int empno,String ename,String deptno,String deptname,String flage ) @Override protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException { String[] js = value.toString().split("\t") ; if(name.contains("dept")){ if(js.length ==3){ context.write(new IntWritable(Integer.valueOf(js[0].trim())),new join(0,"",js[0].trim(),js[1].trim(),"d")); } }else{ if(js.length == 8){ int deptno = Integer.valueOf(js[7].trim() ); join jj = new join(Integer.valueOf(js[0].trim() ),js[1].trim(),js[7].trim(),"","e") ; context.write(new IntWritable(deptno),jj); } } } } /** * Text, Access, NullWritable,Access * 对应mapper 中的 * 分类的key , 存储数据自定义的类 */ public static class MyReducer extends Reducer { @Override protected void reduce(IntWritable key, Iterable values, Context context) throws IOException, InterruptedException { String name = ""; for (join value : values) { if(value.getFlage().equalsIgnoreCase("d")){ name = value.getDeptname() ; break; } } for (join value : values) { if(value.getFlage().equalsIgnoreCase("e")){ value.setDeptname(name); context.write(NullWritable.get(),value); } } } }

}

版权声明:本文内容由网络用户投稿,版权归原作者所有,本站不拥有其著作权,亦不承担相应法律责任。如果您发现本站中有涉嫌抄袭或描述失实的内容,请联系我们jiasou666@gmail.com 处理,核实后本网站将在24小时内删除侵权内容。

上一篇:CDH6 开启kerberos Hive的Sentry 用户权限分配
下一篇:解析Intel Optane SSD写寿命翻倍
相关文章

 发表评论

暂时没有评论,来抢沙发吧~