Hadoop之——以1.x版本和0.x版本分别实现单词统计功能

网友投稿 255 2022-11-20

Hadoop之——以1.x版本和0.x版本分别实现单词统计功能

本文提供一个以Hadoop MapReduce方式统计文本中每个单词的数量的例子,包含1.x版本和0.x版本的实现,同时简要说明了两个版本的不同,不多说,直接上代码

一、Hadoop 1.x版本的实现

package com.lyz.hadoop.count;import java.net.URI;import org.apache.hadoop.conf.Configuration;import org.apache.hadoop.fs.FileSystem;import org.apache.hadoop.fs.Path;import org.apache.hadoop.io.LongWritable;import org.apache.hadoop.io.Text;import org.apache.hadoop.mapreduce.Job;import org.apache.hadoop.mapreduce.Mapper;import org.apache.hadoop.mapreduce.Reducer;import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;import org.apache.hadoop.mapreduce.lib.partition.HashPartitioner;/** * 利用Hadoop MapReduce统计文本中每个单词的数量 * @author liuyazhuang */public class WordCount { //要统计的文件位置 static final String INPUT_PATH = "hdfs://liuyazhuang:9000/d1/hello"; //统计结果输出的位置 static final String OUT_PATH = "hdfs://liuyazhuang:9000/out"; public static void main(String[] args) throws Exception { Configuration conf = new Configuration(); final FileSystem fileSystem = FileSystem.get(new URI(INPUT_PATH), conf); final Path outPath = new Path(OUT_PATH); //如果已经存在输出文件,则先删除已存在的输出文件 if(fileSystem.exists(outPath)){ fileSystem.delete(outPath, true); } final Job job = new Job(conf , WordCount.class.getSimpleName()); //1.1指定读取的文件位于哪里 FileInputFormat.setInputPaths(job, INPUT_PATH); //指定如何对输入文件进行格式化,把输入文件每一行解析成键值对 job.setInputFormatClass(TextInputFormat.class); //1.2 指定自定义的map类 job.setMapperClass(MyMapper.class); //map输出的类型。如果的类型与类型一致,下面两行代码可以省略 job.setMapOutputKeyClass(Text.class); job.setMapOutputValueClass(LongWritable.class); //1.3 分区 job.setPartitionerClass(HashPartitioner.class); //有一个reduce任务运行 job.setNumReduceTasks(1); //1.4 TODO 排序、分组 //1.5 TODO 规约 //2.2 指定自定义reduce类 job.setReducerClass(MyReducer.class); //指定reduce的输出类型 job.setOutputKeyClass(Text.class); job.setOutputValueClass(LongWritable.class); //2.3 指定写出到哪里 FileOutputFormat.setOutputPath(job, outPath); //指定输出文件的格式化类 job.setOutputFormatClass(TextOutputFormat.class); //把job提交给JobTracker运行 job.waitForCompletion(true); } /** * KEYIN 即k1 表示行的偏移量 * VALUEIN 即v1 表示行文本内容 * KEYOUT 即k2 表示行中出现的单词 * VALUEOUT 即v2 表示行中出现的单词的次数,固定值1 */ static class MyMapper extends Mapper{ protected void map(LongWritable k1, Text v1, Context context) throws java.io.IOException ,InterruptedException { final String[] splited = v1.toString().split("\t"); for (String word : splited) { context.write(new Text(word), new LongWritable(1)); } }; } /** * KEYIN 即k2 表示行中出现的单词 * VALUEIN 即v2 表示行中出现的单词的次数 * KEYOUT 即k3 表示文本中出现的不同单词 * VALUEOUT 即v3 表示文本中出现的不同单词的总次数 * */ static class MyReducer extends Reducer{ protected void reduce(Text k2, java.lang.Iterable v2s, Context ctx) throws java.io.IOException ,InterruptedException { long times = 0L; for (LongWritable count : v2s) { times += count.get(); } ctx.write(k2, new LongWritable(times)); }; }}

控制台打印信息

运行结果

二、Hadoop 0.x版本的实现

package com.lyz.hadoop.old;import java.io.IOException;import java.net.URI;import java.util.Iterator;import org.apache.hadoop.conf.Configuration;import org.apache.hadoop.fs.FileSystem;import org.apache.hadoop.fs.Path;import org.apache.hadoop.io.LongWritable;import org.apache.hadoop.io.Text;import org.apache.hadoop.mapred.FileInputFormat;import org.apache.hadoop.mapred.FileOutputFormat;import org.apache.hadoop.mapred.JobClient;import org.apache.hadoop.mapred.JobConf;import org.apache.hadoop.mapred.MapReduceBase;import org.apache.hadoop.mapred.Mapper;import org.apache.hadoop.mapred.OutputCollector;import org.apache.hadoop.mapred.Reducer;import org.apache.hadoop.mapred.Reporter;import org.apache.hadoop.mapred.TextInputFormat;import org.apache.hadoop.mapred.TextOutputFormat;import org.apache.hadoop.mapred.lib.HashPartitioner;import com.lyz.hadoop.count.WordCount;/** * Hadoop中的一些老的API用法 * Hadoop版本1.x的包一般是mapreduce * Hadoop版本0.x的包一般是mapred * @author liuyazhuang * */public class OldApp { //要统计的文件位置 static final String INPUT_PATH = "hdfs://liuyazhuang:9000/d1/hello"; //统计结果输出的位置 static final String OUT_PATH = "hdfs://liuyazhuang:9000/out"; /** * Hadoop老版本与新版本相比,不同点是: * 1:不再使用Job,而是使用JobConf * 2、包名是mapred而不是mapreduce * 3、不使用job.waitForCompletion(true)提交作业,而是使用JobClient.runJob(JobConf对象) */ public static void main(String[] args) throws Exception { Configuration conf = new Configuration(); final FileSystem fileSystem = FileSystem.get(new URI(INPUT_PATH), conf); final Path outPath = new Path(OUT_PATH); //如果已经存在输出文件,则先删除已存在的输出文件 if(fileSystem.exists(outPath)){ fileSystem.delete(outPath, true); } final JobConf job = new JobConf(conf , WordCount.class); //1.1指定读取的文件位于哪里 FileInputFormat.setInputPaths(job, INPUT_PATH); //指定如何对输入文件进行格式化,把输入文件每一行解析成键值对 job.setInputFormat(TextInputFormat.class); //1.2 指定自定义的map类 job.setMapperClass(MyMapper.class); //map输出的类型。如果的类型与类型一致,下面两行代码可以省略 job.setMapOutputKeyClass(Text.class); job.setMapOutputValueClass(LongWritable.class); //1.3 分区 job.setPartitionerClass(HashPartitioner.class); //有一个reduce任务运行 job.setNumReduceTasks(1); //1.4 TODO 排序、分组 //1.5 TODO 规约 //2.2 指定自定义reduce类 job.setReducerClass(MyReducer.class); //指定reduce的输出类型 job.setOutputKeyClass(Text.class); job.setOutputValueClass(LongWritable.class); //2.3 指定写出到哪里 FileOutputFormat.setOutputPath(job, outPath); //指定输出文件的格式化类 job.setOutputFormat(TextOutputFormat.class); //把job提交给JobTracker运行 JobClient.runJob(job); } /** * 新api extends Mapper * 老api extends MapReduceBase implements Mapper * @author liuyazhuang * */ static class MyMapper extends MapReduceBase implements Mapper{ @Override public void map(LongWritable key, Text value, OutputCollector output, Reporter reporter) throws IOException { String[] splited = value.toString().split("\t"); for (String word : splited) { output.collect(new Text(word), new LongWritable(1)); } } } /** * 新api extends Reducer * 老api extends MapReduceBase implements Reducer * @author liuyazhuang * */ static class MyReducer extends MapReduceBase implements Reducer{ @Override public void reduce(Text key, Iterator values, OutputCollector output, Reporter reporter) throws IOException { long times = 0; while (values.hasNext()) { times += values.next().get(); } output.collect(key, new LongWritable(times)); } }}

控制台打印信息

运行结果

总结:

1、包名不同

Hadoop版本1.x的包一般是mapreduce

Hadoop版本0.x的包一般是mapred

2、作业处理类不同

Hadoop版本1.x用Job,

Hadoop版本0.x使用JobConf

3、 作业提交方式不同

Hadoop版本1.x使用job.waitForCompletion(true)提交作业

Hadoop版本0.x使用JobClient.runJob(JobConf对象)提交作业

4、Mapper实现不同

Hadoop版本1.x api extends Mapper

Hadoop版本1.x api extends MapReduceBase implements Mapper

5、Reducer实现不同

Hadoop版本1.x api extends Reducer

Hadoop版本1.x api extends MapReduceBase implements Reducer

版权声明:本文内容由网络用户投稿,版权归原作者所有,本站不拥有其著作权,亦不承担相应法律责任。如果您发现本站中有涉嫌抄袭或描述失实的内容,请联系我们jiasou666@gmail.com 处理,核实后本网站将在24小时内删除侵权内容。

上一篇:新Macbook Pro的接口、内存限制英特尔背锅?
下一篇:Hadoop之——MapReduce实战(二)
相关文章

 发表评论

暂时没有评论,来抢沙发吧~