MapReduce Cross 示例
MapReduce Cross 示例
MapReduce Cross 示例
package com.bsr.cross;
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
/**
* 第一次mr--目的是获取某一人是哪些人的好友
*
*
*/
public class Cross {
//输入:A:B,C,D,F,E,O
//输出:B->A C->A D->A F->A E->A O->A
public static class Map extends Mapper{
@Override
protected void map(LongWritable key, Text value,Context context)
throws IOException, InterruptedException {
String[] value1=value.toString().split(":");
String[] value2=value1[1].split(",");
for (String string : value2) {
context.write(new Text(string), new Text(value1[0]));
}
}
}
public static class Reduce extends Reducer{
// 输入A>E>F>....
// 输出 B A,E,F,J
@Override
protected void reduce(Text key, Iterable value,Context context)
throws IOException, InterruptedException {
StringBuffer sb=new StringBuffer();
for (Text text : value) {
sb.append(text+",");
}
context.write(key, new Text(sb.toString()));
}
}
public static void main(String[] args) throws Exception {
//读取classpath下的所有xxx-site.xml配置文件,并进行解析
Configuration conf=new Configuration();
FileSystem fs = FileSystem.get(configuration);
String s = "/wc/output3";
Path path = new Path(s);
fs.delete(path, true);
Job job=Job.getInstance(conf);
//通过主类的类加载器机制获取到本job的所有代码所在的jar包
job.setJarByClass(Cross.class);
//指定本job使用的mapper类
job.setMapperClass(Map.class);
//指定本job使用的reducer类
job.setReducerClass(Reduce.class);
//指定mapper输出的kv数据类型
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(Text.class);
//指定reducer输出的kv数据类型
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(Text.class);
//指定本job要处理的文件所在的路径
FileInputFormat.setInputPaths(job, new Path("/wc/data/"));
FileOutputFormat.setOutputPath(job, new Path("/wc/output3"));
//将本job向hadoop集群提交执行
boolean flag=job.waitForCompletion(true);
System.exit(flag?0:1);
}
}
进行了逻辑的转换;
版权声明:本文内容由网络用户投稿,版权归原作者所有,本站不拥有其著作权,亦不承担相应法律责任。如果您发现本站中有涉嫌抄袭或描述失实的内容,请联系我们jiasou666@gmail.com 处理,核实后本网站将在24小时内删除侵权内容。
暂时没有评论,来抢沙发吧~