Combiner (Local Aggregation): Key Points and Example

Contributed post · 2022-11-24


一. Overview

1. A Combiner is a MapReduce component that sits alongside the Mapper and Reducer.
2. A custom Combiner extends Reducer.
3. A Combiner runs on the node of each Map Task, whereas the Reducer receives the combined output of all Mappers.
4. A Combiner performs a local aggregation of each Map Task's output, which reduces the amount of data transferred over the network.
5. Not every operation can be aggregated locally; computing an average is the classic counter-example.

二. Defining a custom Combiner

1. Extend Reducer and override the reduce() method.
2. Register the Combiner on the job in the driver.
3. The Combiner's input key/value types must match the Mapper's output key/value types, and the Combiner's output key/value types must match the Reducer's input key/value types.
4. The reduce() method locally aggregates the output of one Map Task; its structure mirrors the Reducer's.

三. WordCount

1. Mapper

package com.wordcount;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;

public class WordCountMapper extends Mapper<LongWritable, Text, Text, IntWritable> {

    Text k = new Text();
    IntWritable v = new IntWritable(1);

    @Override
    protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        // 1. Read the line
        String line = value.toString();
        // 2. Split on whitespace
        String[] words = line.split("\\s+");
        // 3. Emit (word, 1) for each word
        for (String word : words) {
            k.set(word);
            context.write(k, v);
        }
    }
}

2. Combiner

package com.wordcount;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;

public class WordCountCombiner extends Reducer<Text, IntWritable, Text, IntWritable> {

    IntWritable v = new IntWritable();

    @Override
    protected void reduce(Text key, Iterable<IntWritable> values, Context context)
            throws IOException, InterruptedException {
        // 1. Sum the counts seen by this Map Task
        int sum = 0;
        for (IntWritable value : values) {
            sum += value.get();
        }
        v.set(sum);
        // 2. Emit the partial sum
        context.write(key, v);
    }
}

3. Reducer

package com.wordcount;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;

public class WordCountReducer extends Reducer<Text, IntWritable, Text, IntWritable> {

    IntWritable v = new IntWritable();

    @Override
    protected void reduce(Text key, Iterable<IntWritable> values, Context context)
            throws IOException, InterruptedException {
        // 1. Sum the partial counts
        int sum = 0;
        for (IntWritable value : values) {
            sum += value.get();
        }
        v.set(sum);
        // 2. Emit the final count
        context.write(key, v);
    }
}

4. Driver

package com.wordcount;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.IOException;

public class WordCountDriver {
    public static void main(String[] args)
            throws IOException, ClassNotFoundException, InterruptedException {
        args = new String[]{"E:\\a\\inputFile\\test.txt", "E:\\a\\output3"};
        // 1. Create the job
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf);
        // 2. Set the jar
        job.setJarByClass(WordCountDriver.class);
        // 3. Wire up the mapper and reducer
        job.setMapperClass(WordCountMapper.class);
        job.setReducerClass(WordCountReducer.class);
        // 4. Set the mapper's output key/value types
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(IntWritable.class);
        // 5. Set the final output key/value types
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);
        // 6. Set the Combiner class
        job.setCombinerClass(WordCountCombiner.class);
        // 7. Set the input and output paths
        FileInputFormat.setInputPaths(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        // 8. Submit the job
        boolean wait = job.waitForCompletion(true);
        System.exit(wait ? 0 : 1);
    }
}

Note: because the Combiner's logic here is identical to the Reducer's, you can skip the WordCountCombiner class and simply register the Reducer as the Combiner in the driver:

job.setCombinerClass(WordCountReducer.class);
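Point 5 of the overview warns that averages cannot be combined naively: averaging each Map Task's local average gives the wrong answer when tasks see different numbers of values, whereas having the combiner emit a (sum, count) pair and dividing only in the reducer stays correct. A minimal plain-Java sketch of the idea (no Hadoop dependency; the class name and data are hypothetical):

```java
import java.util.Arrays;
import java.util.List;

// Sketch: why a Combiner may not simply "average the averages".
public class AverageCombinerSketch {

    public static void main(String[] args) {
        // Two map tasks see different numbers of values.
        List<int[]> partitions = Arrays.asList(new int[]{1, 2, 3}, new int[]{10});

        // Wrong: reduce each partition to its local average, then average those.
        double avgOfAvgs = partitions.stream()
                .mapToDouble(p -> Arrays.stream(p).average().orElse(0))
                .average().orElse(0);              // (2.0 + 10.0) / 2 = 6.0

        // Right: the "combiner" emits (sum, count); only the final step divides.
        long sum = 0, count = 0;
        for (int[] p : partitions) {
            sum += Arrays.stream(p).sum();         // local aggregation per task
            count += p.length;
        }
        double trueAvg = (double) sum / count;     // 16 / 4 = 4.0

        System.out.println(avgOfAvgs + " vs " + trueAvg);  // prints "6.0 vs 4.0"
    }
}
```

Summation is associative and commutative, which is exactly what makes WordCount safe to combine; averaging is not, unless you reformulate it as a (sum, count) pair as above.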

