MapReduce WordCount Combiner程序

网友投稿 223 2022-11-24

MapReduce WordCount Combiner程序

MapReduce WordCount Combiner程序

MapReduce WordCount Combiner程序 注意使用Combiner之后的累加情况是不同的; pom.xml 4.0.0 com.stono mr01 1.0-SNAPSHOT jar mr01 http://maven.apache.org UTF-8 1.7 UTF-8 yyyy-MM-dd HH:mm:ss 2.7.2 1.1.2 1.7.25 0.10.2.1 jdk.tools jdk.tools 1.8 system D:/Java/jdk1.8.0_161/lib/tools.jar org.slf4j slf4j-api ${slf4j.version} org.apache.hadoop hadoop-common ${hadoop-mapreduce-client.version} org.apache.hadoop hadoop-mapreduce-client-core ${hadoop-mapreduce-client.version} junit junit 3.8.1 test maven-compiler-plugin 2.3.2 1.7 1.7 org.apache.maven.plugins maven-jar-plugin false com.bsr.combiner.JobRunner Mapper: package com.bsr.combiner; import java.io.IOException; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Mapper; /* 四个参数的含义 第一个参数:map中key-value的key的类型,默认值是输入行的偏移量 第二个参数:map中key-value的value的类型 在此需求中是某一行的内容(数据) 第三个参数:reduce中key-value中的key类型 第四个参数:redece的输出参数int 但是在mapreduce中涉及到了网络间的传输,所以需要序列化,而hadoop提供了相关的序列化类型 long-LongWritable String-Text int-IntWritable */ public class MapperWordCount extends Mapper{ /*重写mapper的map方法 编写自己的逻辑 * key是偏移量不用管 * value是一行的内容 例:hello zhangsan you you * context是返回结果 */ @Override protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException { String[] values=value.toString().split(" ");//对得到的一行数据进行切分 在此需求中是以空格进行切分 for(String word:values){ context.write(new Text(word), new IntWritable(1));//遍历数组 输出map的返回值 即 } } }   Combiner: package com.bsr.combiner; import java.io.IOException; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Reducer; public class Combiner extends Reducer{ @Override protected void reduce(Text key, Iterable values, Context context) throws IOException, InterruptedException { int count=0;//初始一个计数器 for(IntWritable value:values){ count ++;//对values进行遍历 每次加1 } context.write(key,new IntWritable(count));//最后写返回值 } }   reduce: package com.bsr.combiner; import java.io.IOException; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Reducer; /* * 此方法是WordCount的reduce * 参数:1.map阶段返回的key类型String-Text * 2.map阶段返回值中value的类型Int-IntWritable * 3.reduce阶段返回值中key的类型String-Text * 4.reduce阶段返回值中value的类型Int-IntWritable */ public class ReducerWordCount extends Reducer{ /* * 实现父类的reduce方法 *key是一组key-value的相同的哪个key *values是一组key-value的所有value *key value 的情况,比如 * * context 返回值, */ @Override protected void reduce(Text key, Iterable values, Context context)throws IOException, InterruptedException { int count=0;//初始一个计数器 for(IntWritable value:values){ count = count + i.get();//对values进行遍历 需要累加 } context.write(key,new IntWritable(count));//最后写返回值 } }   Job: package com.bsr.combiner; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; import org.apache.hadoop.mapreduce.Job; public class JobRunner { /* * 提交写好的mapreduce程序 当做一个Job进行提交 * */ public static void main(String[] args) throws Exception { //读取classpath下的所有xxx-site.xml配置文件,并进行解析 Configuration conf=new Configuration(); FileSystem fs = FileSystem.get(configuration); String s = "/wc/output2"; Path path = new Path(s); fs.delete(path, true) Job wcjob=Job.getInstance(conf);//初始一个job //通过主类的类加载器机制获取到本job的所有代码所在的jar包 wcjob.setJarByClass(JobRunner.class); //指定本job使用的mapper类 wcjob.setMapperClass(MapperWordCount.class); //指定本job使用的reducer类 wcjob.setReducerClass(ReducerWordCount.class); //设置本job使用的从combiner类 wcjob.setCombinerClass(Combiner.class); //指定mapper输出的kv的数据类型 wcjob.setMapOutputKeyClass(Text.class); wcjob.setMapOutputValueClass(IntWritable.class); //指定reducer输出的kv数据类型 wcjob.setOutputKeyClass(Text.class); wcjob.setOutputValueClass(IntWritable.class); //指定本job要处理的文件所在的路径 FileInputFormat.setInputPaths(wcjob, new Path("/wc/data/")); //指定本job输出的结果文件放在哪个路径 FileOutputFormat.setOutputPath(wcjob, new Path("/wc/output2/")); //将本job向hadoop集群提交执行 boolean res=wcjob.waitForCompletion(true); System.exit(res?0:1);//执行成功的话正常退出系统执行有误则终止执行 } }   注意:的讲解

版权声明:本文内容由网络用户投稿,版权归原作者所有,本站不拥有其著作权,亦不承担相应法律责任。如果您发现本站中有涉嫌抄袭或描述失实的内容,请联系我们jiasou666@gmail.com 处理,核实后本网站将在24小时内删除侵权内容。

上一篇:中国电信如何将网络能力开放并高效输出并赋能客户创造价值?
下一篇:java如何获取用户登录ip、浏览器信息、SessionId
相关文章

 发表评论

暂时没有评论,来抢沙发吧~