MapReduce WordCount Combiner程序
MapReduce WordCount Combiner程序
MapReduce WordCount Combiner程序
注意使用Combiner之后的累加情况是不同的;
pom.xml
4.0.0
com.stono
mr01
1.0-SNAPSHOT
jar
mr01
http://maven.apache.org
UTF-8
1.7
UTF-8
yyyy-MM-dd HH:mm:ss
2.7.2
1.1.2
1.7.25
0.10.2.1
jdk.tools
jdk.tools
1.8
system
D:/Java/jdk1.8.0_161/lib/tools.jar
org.slf4j
slf4j-api
${slf4j.version}
org.apache.hadoop
hadoop-common
${hadoop-mapreduce-client.version}
org.apache.hadoop
hadoop-mapreduce-client-core
${hadoop-mapreduce-client.version}
junit
junit
3.8.1
test
maven-compiler-plugin
2.3.2
1.7
org.apache.maven.plugins
maven-jar-plugin
false
com.bsr.combiner.JobRunner
Mapper:
package com.bsr.combiner;
import java.io.IOException;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
/*
四个参数的含义
第一个参数:map中key-value的key的类型,默认值是输入行的偏移量
第二个参数:map中key-value的value的类型 在此需求中是某一行的内容(数据)
第三个参数:reduce中key-value中的key类型
第四个参数:redece的输出参数int
但是在mapreduce中涉及到了网络间的传输,所以需要序列化,而hadoop提供了相关的序列化类型
long-LongWritable
String-Text
int-IntWritable
*/
public class MapperWordCount extends Mapper{
/*重写mapper的map方法 编写自己的逻辑
* key是偏移量不用管
* value是一行的内容 例:hello zhangsan you you
* context是返回结果
*/
@Override
protected void map(LongWritable key, Text value,
Context context)
throws IOException, InterruptedException {
String[] values=value.toString().split(" ");//对得到的一行数据进行切分 在此需求中是以空格进行切分
for(String word:values){
context.write(new Text(word), new IntWritable(1));//遍历数组 输出map的返回值 即
}
}
}
Combiner:
package com.bsr.combiner;
import java.io.IOException;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
public class Combiner extends Reducer{
@Override
protected void reduce(Text key, Iterable values,
Context context)
throws IOException, InterruptedException {
int count=0;//初始一个计数器
for(IntWritable value:values){ count ++;//对values进行遍历 每次加1
}
context.write(key,new IntWritable(count));//最后写返回值
}
}
reduce:
package com.bsr.combiner;
import java.io.IOException;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
/*
* 此方法是WordCount的reduce
* 参数:1.map阶段返回的key类型String-Text
* 2.map阶段返回值中value的类型Int-IntWritable
* 3.reduce阶段返回值中key的类型String-Text
* 4.reduce阶段返回值中value的类型Int-IntWritable
*/
public class ReducerWordCount extends Reducer{
/*
* 实现父类的reduce方法
*key是一组key-value的相同的哪个key
*values是一组key-value的所有value
*key value 的情况,比如
*
* context 返回值,
*/
@Override
protected void reduce(Text key, Iterable values,
Context context)throws IOException, InterruptedException {
int count=0;//初始一个计数器
for(IntWritable value:values){
count = count + i.get();//对values进行遍历 需要累加
}
context.write(key,new IntWritable(count));//最后写返回值
}
}
Job:
package com.bsr.combiner;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.Job;
public class JobRunner {
/*
* 提交写好的mapreduce程序 当做一个Job进行提交
*
*/
public static void main(String[] args) throws Exception {
//读取classpath下的所有xxx-site.xml配置文件,并进行解析
Configuration conf=new Configuration();
FileSystem fs = FileSystem.get(configuration);
String s = "/wc/output2";
Path path = new Path(s);
fs.delete(path, true)
Job wcjob=Job.getInstance(conf);//初始一个job
//通过主类的类加载器机制获取到本job的所有代码所在的jar包
wcjob.setJarByClass(JobRunner.class);
//指定本job使用的mapper类
wcjob.setMapperClass(MapperWordCount.class);
//指定本job使用的reducer类
wcjob.setReducerClass(ReducerWordCount.class);
//设置本job使用的从combiner类
wcjob.setCombinerClass(Combiner.class);
//指定mapper输出的kv的数据类型
wcjob.setMapOutputKeyClass(Text.class);
wcjob.setMapOutputValueClass(IntWritable.class);
//指定reducer输出的kv数据类型
wcjob.setOutputKeyClass(Text.class);
wcjob.setOutputValueClass(IntWritable.class);
//指定本job要处理的文件所在的路径
FileInputFormat.setInputPaths(wcjob, new Path("/wc/data/"));
//指定本job输出的结果文件放在哪个路径
FileOutputFormat.setOutputPath(wcjob, new Path("/wc/output2/"));
//将本job向hadoop集群提交执行
boolean res=wcjob.waitForCompletion(true);
System.exit(res?0:1);//执行成功的话正常退出系统执行有误则终止执行
}
}
注意:的讲解
版权声明:本文内容由网络用户投稿,版权归原作者所有,本站不拥有其著作权,亦不承担相应法律责任。如果您发现本站中有涉嫌抄袭或描述失实的内容,请联系我们jiasou666@gmail.com 处理,核实后本网站将在24小时内删除侵权内容。
暂时没有评论,来抢沙发吧~