MapReduce Cross 示例

网友投稿 245 2022-11-24

MapReduce Cross 示例

MapReduce Cross 示例

MapReduce Cross 示例 package com.bsr.cross; import java.io.IOException; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.Mapper; import org.apache.hadoop.mapreduce.Reducer; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; /** * 第一次mr--目的是获取某一人是哪些人的好友 * * */ public class Cross { //输入:A:B,C,D,F,E,O //输出:B->A C->A D->A F->A E->A O->A public static class Map extends Mapper{ @Override protected void map(LongWritable key, Text value,Context context) throws IOException, InterruptedException { String[] value1=value.toString().split(":"); String[] value2=value1[1].split(","); for (String string : value2) { context.write(new Text(string), new Text(value1[0])); } } } public static class Reduce extends Reducer{ // 输入A>E>F>.... // 输出 B A,E,F,J @Override protected void reduce(Text key, Iterable value,Context context) throws IOException, InterruptedException { StringBuffer sb=new StringBuffer(); for (Text text : value) { sb.append(text+","); } context.write(key, new Text(sb.toString())); } } public static void main(String[] args) throws Exception { //读取classpath下的所有xxx-site.xml配置文件,并进行解析 Configuration conf=new Configuration(); FileSystem fs = FileSystem.get(configuration); String s = "/wc/output3"; Path path = new Path(s); fs.delete(path, true); Job job=Job.getInstance(conf); //通过主类的类加载器机制获取到本job的所有代码所在的jar包 job.setJarByClass(Cross.class); //指定本job使用的mapper类 job.setMapperClass(Map.class); //指定本job使用的reducer类 job.setReducerClass(Reduce.class); //指定mapper输出的kv数据类型 job.setMapOutputKeyClass(Text.class); job.setMapOutputValueClass(Text.class); //指定reducer输出的kv数据类型 job.setOutputKeyClass(Text.class); job.setOutputValueClass(Text.class); //指定本job要处理的文件所在的路径 FileInputFormat.setInputPaths(job, new Path("/wc/data/")); FileOutputFormat.setOutputPath(job, new Path("/wc/output3")); //将本job向hadoop集群提交执行 boolean flag=job.waitForCompletion(true); System.exit(flag?0:1); } } 进行了逻辑的转换;

版权声明:本文内容由网络用户投稿,版权归原作者所有,本站不拥有其著作权,亦不承担相应法律责任。如果您发现本站中有涉嫌抄袭或描述失实的内容,请联系我们jiasou666@gmail.com 处理,核实后本网站将在24小时内删除侵权内容。

上一篇:java如何获取用户登录ip、浏览器信息、SessionId
下一篇:如何更好地设计面向在板烧录的产品
相关文章

 发表评论

暂时没有评论,来抢沙发吧~