MapReduce示例代码

网友投稿 228 2022-11-25

MapReduce示例代码

import Utils.fileUtils;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.File;
import java.io.IOException;

import static org.apache.hadoop.mapreduce.lib.output.FileOutputFormat.getOutputPath;

public class mapreduce {

public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException { String input = "data/access.log" ; String output = "out" ; final Configuration co = new Configuration() ; //获取 Job 对象 final Job job = Job.getInstance(co); //设置class job.setJarByClass(mapreduce.class); //设置mapper 和 Reduce job.setMapperClass(MyMapper.class); job.setReducerClass(MyReducer.class); //设置 Mapper 阶段输出数据的key 和value job.setMapOutputKeyClass(Text.class); job.setMapOutputValueClass(Access.class); //设置 Reducer 阶段输出数据的key 和value job.setOutputKeyClass(NullWritable.class); job.setOutputValueClass(Access.class); //设置输入和输出路径 FileInputFormat.setInputPaths(job, new Path(input)); FileOutputFormat.setOutputPath(job, new Path(output)); //删除输出路径中的 文件 Path outDir = getOutputPath(job) ; if (outDir.getFileSystem(job.getConfiguration()).exists(outDir)) { File file = new File(outDir.toUri()) ; if(file.isDirectory()){ File[] childrenFiles = file.listFiles(); for (File childFile:childrenFiles){ childFile.delete() ; } } file.delete(); } //提交 job final boolean result = job.waitForCompletion(true); System.exit(result ? 
0 : 1); } /** * 文件中的偏移量,单行文件内容, 分类的key , 存储数据自定义的类 * * */ public static class MyMapper extends Mapper{ @Override protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException { String[] splits = value.toString().split("\t") ; context.write(new Text(splits[1]),new Access(splits[1],splits[splits.length-3],splits[splits.length-2])); } } /** * Text, Access, NullWritable, Access * 对应mapper 中的 * 分类的key , 存储数据自定义的类 输出key(舍去不要),存储数据自定义的类 */ public static class MyReducer extends Reducer{ @Override protected void reduce(Text key, Iterable values, Context context) throws IOException, InterruptedException { long ups = 0 ; long downs = 0 ; long count = 0 ; for (Access ac :values){ ups += ac.getUp() ; downs += ac.getDown(); count++ ; } context.write(NullWritable.get(), new Access(key.toString(),String.valueOf(ups),String.valueOf(downs),count)); } }

}

版权声明:本文内容由网络用户投稿,版权归原作者所有,本站不拥有其著作权,亦不承担相应法律责任。如果您发现本站中有涉嫌抄袭或描述失实的内容,请联系我们jiasou666@gmail.com 处理,核实后本网站将在24小时内删除侵权内容。

上一篇:Java Spring MVC获取请求数据详解操作
下一篇:【必读】IBM大佬漫谈留存
相关文章

 发表评论

暂时没有评论,来抢沙发吧~