MapReduce单词统计

网友投稿 220 2022-11-26

MapReduce单词统计

WordcountMapper类

package com.sky.mr.wordcount; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Mapper; import org.junit.Test; import java.io.IOException; public class WordcountMapper extends Mapper { //由于每读一行文本数据,就要调用一次map方法,为了避免多次创建对象,浪费内存资源,将Text,IntWritable对象创建在 //map方法之外 Text k = new Text(); IntWritable v = new IntWritable(1); @Override protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException { //获取每一行的文本内容 String line = value.toString(); //按空格分割 String[] words = line.split(" "); //转换数据格式,输出 for ( String word: words) { k.set(word); context.write(k, v); } } }

WordcountReducer类

package com.sky.mr.wordcount; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Reducer; import java.io.IOException; public class WordcountReducer extends Reducer { IntWritable v = new IntWritable(); @Override protected void reduce(Text key, Iterable values, Context context) throws IOException, InterruptedException { //求每组相同key的总个数 int sum = 0; for ( IntWritable count:values) { sum += count.get(); } //输出 v.set(sum); context.write(key, v); } }

WordcountDriver类

package com.sky.mr.wordcount; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; import java.io.IOException; public class WordcountDriver { public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException { //1、获取配置信息以及job对象 Configuration conf = new Configuration(); Job job = Job.getInstance(conf); //2、设置jar包路径 job.setJarByClass(WordcountDriver.class); //3、关联自定义mapper和reducer类 job.setMapperClass(WordcountMapper.class); job.setReducerClass(WordcountReducer.class); //4、设置Map输出key和value类型 job.setMapOutputKeyClass(Text.class); job.setMapOutputValueClass(IntWritable.class); //5、设置最终结果key,value类型 job.setOutputKeyClass(Text.class); job.setOutputValueClass(IntWritable.class); //6、设置文件输入输出路径 FileInputFormat.setInputPaths(job,new Path(args[0])); FileOutputFormat.setOutputPath(job,new Path(args[1])); //7、将封装了MapReduce程序运行参数的job对象,提交到Yarn集群 boolean result = job.waitForCompletion(true); System.exit(result?0:1); } }

输入文件

import org apache hadoop io import org apache hadoop io import org apache hadoop import java io IOException

输出文件

IOException	1
apache	3
hadoop	3
import	4
io	3
java	1
org	3

版权声明:本文内容由网络用户投稿,版权归原作者所有,本站不拥有其著作权,亦不承担相应法律责任。如果您发现本站中有涉嫌抄袭或描述失实的内容,请联系我们jiasou666@gmail.com 处理,核实后本网站将在24小时内删除侵权内容。

上一篇:spring @schedule注解如何动态配置时间间隔
下一篇:Hadoop HA 集群配置文件
相关文章

 发表评论

暂时没有评论,来抢沙发吧~