#yyds干货盘点# Hadoop学习整理之MapReduce框架原理

网友投稿 238 2022-11-22

#yyds干货盘点# Hadoop学习整理之MapReduce框架原理

一、MapReduce框架原理

1 InputFormat数据输入

1.1.1 切片与MapTask并行度决定机制

1.1.2 Job提交流程源码和切片源码详解

1)Job提交流程源码详解

waitForCompletion() submit(); // 1建立连接 connect(); // 1)创建提交Job的代理 new Cluster(getConfiguration()); // (1)判断是本地运行环境还是yarn集群运行环境 initialize(jobTrackAddr, conf); // 2 提交job submitter.submitJobInternal(Job.this, cluster) // 1)创建给集群提交数据的Stag路径 Path jobStagingArea = JobSubmissionFiles.getStagingDir(cluster, conf); // 2)获取jobid ,并创建Job路径 JobID jobId = submitClient.getNewJobID(); // 3)拷贝jar包到集群 copyAndConfigureFiles(job, submitJobDir); rUploader.uploadFiles(job, jobSubmitDir); // 4)计算切片,生成切片规划文件 writeSplits(job, submitJobDir); maps = writeNewSplits(job, jobSubmitDir); input.getSplits(job); // 5)向Stag路径写XML配置文件 writeConf(conf, submitJobFile); conf.writeXml(out); // 6)提交Job,返回提交状态 status = submitClient.submitJob(jobId, submitJobDir.toString(), job.getCredentials());

1.1.3 FileInputFormat切片机制

1.1.4 TextInputFormat

1)FileInputFormat实现类思考:在运行MapReduce程序时,输入的文件格式包括:基于行的日志文件、二进制格式文件、数据库表等。那么,针对不同的数据类型,MapReduce是如何读取这些数据的呢?FileInputFormat常见的接口实现类包括:TextInputFormat、KeyValueTextInputFormat、NLineInputFormat、CombineTextInputFormat和自定义InputFormat等。2)TextInputFormatTextInputFormat是默认的FileInputFormat实现类。按行读取每条记录。键是存储该行在整个文件中的起始字节偏移量, LongWritable类型。值是这行的内容,不包括任何行终止符(换行符和回车符),Text类型。以下是一个示例,比如,一个分片包含了如下4条文本记录。Rich learning formIntelligent learning engineLearning more convenientFrom the real demand for more close to the enterprise每条记录表示为以下键/值对:(0,Rich learning form)(20,Intelligent learning engine)(49,Learning more convenient)(74,From the real demand for more close to the enterprise)

1.1.5 CombineTextInputFormat切片机制

1.2 MapReduce工作流程

上面的流程是整个MapReduce最全工作流程,但是Shuffle过程只是从第7步开始到第16步结束,具体Shuffle过程详解,如下:(1)MapTask收集我们的map()方法输出的kv对,放到内存缓冲区中(2)从内存缓冲区不断溢出本地磁盘文件,可能会溢出多个文件(3)多个溢出文件会被合并成大的溢出文件(4)在溢出过程及合并的过程中,都要调用Partitioner进行分区和针对key进行排序(5)ReduceTask根据自己的分区号,去各个MapTask机器上取相应的结果分区数据(6)ReduceTask会抓取到同一个分区的来自不同MapTask的结果文件,ReduceTask会将这些文件再进行合并(归并排序)(7)合并成大文件后,Shuffle的过程也就结束了,后面进入ReduceTask的逻辑运算过程(从文件中取出一个一个的键值对Group,调用用户自定义的reduce()方法)注意:(1)Shuffle中的缓冲区大小会影响到MapReduce程序的执行效率,原则上说,缓冲区越大,磁盘io的次数越少,执行速度就越快。(2)缓冲区的大小可以通过参数调整,参数:mapreduce.task.io.sort.mb默认100M。

1.3 Shuffle机制

1.3.2 Partition分区

1.3.3Partition分区案例实操

手机号136、137、138、139开头都分别放到一个独立的4个文件中,其他开头的放到一个文件中。增加一个分区类

import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Partitioner; public class ProvincePartitioner extends Partitioner { @Override public int getPartition(Text text, FlowBean flowBean, int numPartitions) { //获取手机号前三位prePhone String phone = text.toString(); String prePhone = phone.substring(0, 3); //定义一个分区号变量partition,根据prePhone设置分区号 int partition; if("136".equals(prePhone)){ partition = 0; }else if("137".equals(prePhone)){ partition = 1; }else if("138".equals(prePhone)){ partition = 2; }else if("139".equals(prePhone)){ partition = 3; }else { partition = 4; } //最后返回分区号partition return partition; } }

在驱动函数中增加自定义数据分区设置和ReduceTask设置

import org.apache.hadoop.conf.Configuration;import org.apache.hadoop.fs.Path;import org.apache.hadoop.io.Text;import org.apache.hadoop.mapreduce.Job;import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;import java.io.IOException;

public class FlowDriver {

public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException { //1 获取job对象 Configuration conf = new Configuration(); Job job = Job.getInstance(conf); //2 关联本Driver类 job.setJarByClass(FlowDriver.class); //3 关联Mapper和Reducer job.setMapperClass(FlowMapper.class); job.setReducerClass(FlowReducer.class); //4 设置Map端输出数据的KV类型 job.setMapOutputKeyClass(Text.class); job.setMapOutputValueClass(FlowBean.class); //5 设置程序最终输出的KV类型 job.setOutputKeyClass(Text.class); job.setOutputValueClass(FlowBean.class); //8 指定自定义分区器 job.setPartitionerClass(ProvincePartitioner.class); //9 同时指定相应数量的ReduceTask job.setNumReduceTasks(5); //6 设置输入输出路径 FileInputFormat.setInputPaths(job, new Path("D:\\inputflow")); FileOutputFormat.setOutputPath(job, new Path("D\\partitionout")); //7 提交Job boolean b = job.waitForCompletion(true); System.exit(b ? 0 : 1); }

}

版权声明:本文内容由网络用户投稿,版权归原作者所有,本站不拥有其著作权,亦不承担相应法律责任。如果您发现本站中有涉嫌抄袭或描述失实的内容,请联系我们jiasou666@gmail.com 处理,核实后本网站将在24小时内删除侵权内容。

上一篇:【i.MX 8M】MYC-JX8MX开发板评测
下一篇:SpringBoot项目集成xxljob实现全纪录
相关文章

 发表评论

暂时没有评论,来抢沙发吧~