c语言sscanf函数的用法是什么
291
2022-11-26
二、MapReduce基本编程规范
[TOC]
一、MapReduce编程基本组成
编写MapReduce的程序有至少三个必不可少的部分:mapper,reducer,driver。可选的有 partitioner,combiner而且mapper的输入输出、reducer的输入输出都是key value型的,所以要求我们在编写mapper和reducer时,必须实现明确这4个键值对中的8种数据类型,而且必须还是hadoop的可序列化类型。同时还需要注意的是,map的输出其实就是reduce的输入,所以包括的数据类型是一样的。
1、map阶段
编写基本流程1)自定义map类,需要继承 Mapper这个类2)继承Mapper 的时候,需要指定输入和输出的键值对中的类型3)必须重写继承自父类的map() 方法4)上面重写的map() 方法是每个map task对每一个输入到mapper中的键值对都会调用处理一次。
基本编写实例如下:
/*
指定Mapper
2、reduce阶段
基本编写流程1)自定义reduce类,需要继承 Reducer这个类2)继承Reducer的时候,需要指定输入和输出的键值对中的类型3)必须重写继承自父类的reduce() 方法4)上面重写的reduce() 方法是每个reduer task对每一个输入到reducer中的键值对都会调用处理一次。
基本编写实例如下:
/*
指定Reducer
3、driver阶段
这个部分是用于配置job对象的各种必须配置信息,配置完成后,将job提交给yarn执行具体配置啥下面直接上例子看好了。主要起到调度map和reduce任务执行的作用
4、partitioner阶段
这个阶段主要是对map阶段的输出进行分区,而map的分区数直接决定reduce task的数量(一般来说是一对一),编写流程如下:1)自定义分区类,继承 Partitioner
编写案例如下:
public class WordCountPartitioner extends Partitioner
5、combiner
combiner不是一个独立的阶段,它其实是包含在map阶段中的。map本身输出的键值对中,每个键值对的value都是1,就算是一样的key,也是独立一个键值对。如果重复的键值对越多,那么将map输出传递到reduce的过程中,就会占用很多带宽资源。优化的方法就是每个map输出时,先在当前map task下进行局部合并汇总,减少重复可以的出现。即
所以其实由此可以知道,其实combiner的操作和reduce的操作是一样的,只不过一个是局部,一个是全局。简单的做法就是,直接将reducer作为combiner类传入job,如:
job.setCombinerClass(WordCountReducer.class);
我们可以看看这个方法的源码:
public void setCombinerClass(Class extends Reducer> cls) throws IllegalStateException { this.ensureState(Job.JobState.DEFINE); //看到没,那个 Reducer.class this.conf.setClass("mapreduce.job.combine.class", cls, Reducer.class); }
可以清楚看到设置combine class时,可以看到多态的类型设置就是 Reducer 类型的,从这里也可以更加确定 combiner 的操作和 reducer的就是一样的。
二、wordcount编程实例
下面开始用wordcount作为例子编写一个完整的MapReduce程序
1、mapper
public class WordCountMapper extends Mapper
2、reducer
public class WordCountReducer extends Reducer
3、driver
public class WordCountDriver { public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException { //这里只是方便在ide下直接运行,如果是在命令行下直接输入输入和输出文件路径即可 args = new String[]{"G:\\test2\\", "G:\\testmap6\\"}; //1.获取配置对象 Configuration conf = new Configuration(); //2.获取job对象 Job job = Job.getInstance(conf); //3.分别给job指定driver,map,reducer的类 job.setJarByClass(WordCountDriver.class); job.setMapperClass(WordCountMapper.class); job.setReducerClass(WordCountReducer.class); //4.分别指定map和reduce阶段输出的类型 job.setMapOutputKeyClass(Text.class); job.setMapOutputValueClass(IntWritable.class); job.setOutputKeyClass(Text.class); job.setOutputValueClass(IntWritable.class); //这里可以设置分区类,需要额外编写分区实现类 // job.setPartitionerClass(WordCountPartitioner.class); // job.setNumReduceTasks(2); //设置预合并类 //job.setCombinerClass(WordCountReducer.class); //设置inputFormat类,大量小文件优化,不设置默认使用 TextInputFormat job.setInputFormatClass(CombineTextInputFormat.class); CombineTextInputFormat.setMaxInputSplitSize(job,3* 1024 * 1024); CombineTextInputFormat.setMinInputSplitSize(job, 2 * 1024 * 1024); //5.数据输入来源以及结果的输出位置 // 输入的时候会根据数据源的情况自动map切片,形成切片信息(或者叫切片方案) FileInputFormat.setInputPaths(job, new Path(args[0])); FileOutputFormat.setOutputPath(job, new Path(args[1])); //以上就是将一个job的配置信息配置完成后,下面就提交job,hadoop将跟就job的配置执行job //6.提交job任务,这个方法相当于 job.submit()之后,然后等待执行完成 //任务配置信息是提交至yarn的 MRappmanager job.waitForCompletion(true); } }
版权声明:本文内容由网络用户投稿,版权归原作者所有,本站不拥有其著作权,亦不承担相应法律责任。如果您发现本站中有涉嫌抄袭或描述失实的内容,请联系我们jiasou666@gmail.com 处理,核实后本网站将在24小时内删除侵权内容。
发表评论
暂时没有评论,来抢沙发吧~