c语言sscanf函数的用法是什么
236
2022-11-22
#yyds干货盘点# Hadoop之MapReduce内核源码解析
MapReduce内核源码解析
(1)Copy阶段:ReduceTask从各个MapTask上远程拷贝一片数据,并针对某一片数据,如果其大小超过一定阈值,则写到磁盘上,否则直接放到内存中。(2)Sort阶段:在远程拷贝数据的同时,ReduceTask启动了两个后台线程对内存和磁盘上的文件进行合并,以防止内存使用过多或磁盘上文件过多。按照MapReduce语义,用户编写reduce()函数输入数据是按key进行聚集的一组数据。为了将key相同的数据聚在一起,Hadoop采用了基于排序的策略。由于各个MapTask已经实现对自己的处理结果进行了局部排序,因此,ReduceTask只需对所有数据进行一次归并排序即可。(3)Reduce阶段:reduce()函数将计算结果写到HDFS上。
ReduceTask并行度决定机制
MapTask并行度由切片个数决定,切片个数由输入文件和切片规则决定。1)设置ReduceTask并行度(个数)ReduceTask的并行度同样影响整个Job的执行并发度和执行效率,但与MapTask的并发数由切片数决定不同,ReduceTask数量的决定是可以直接手动设置:
// 默认值是1,手动设置为4 job.setNumReduceTasks(4);
MapTask & ReduceTask源码解析
1)MapTask源码解析流程
=================== MapTask ===================
context.write(k, NullWritable.get()); //自定义的map方法的写出,进入 output.write(key, value); //MapTask727行,收集方法,进入两次 collector.collect(key, value,partitioner.getPartition(key, value, partitions)); HashPartitioner(); //默认分区器 collect() //MapTask1082行 map端所有的kv全部写出后会走下面的close方法 close() //MapTask732行 collector.flush() // 溢出刷写方法,MapTask735行,提前打个断点,进入 sortAndSpill() //溢写排序,MapTask1505行,进入 sorter.sort() QuickSort //溢写排序方法,MapTask1625行,进入 mergeParts(); //合并文件,MapTask1527行,进入 collector.close(); //MapTask739行,收集器关闭,即将进入ReduceTask
2)ReduceTask源码解析流程
=================== ReduceTask ===================
if (isMapOrReduce()) //reduceTask324行,提前打断点 initialize() // reduceTask333行,进入 init(shuffleContext); // reduceTask375行,走到这需要先给下面的打断点 totalMaps = job.getNumMapTasks(); // ShuffleSchedulerImpl第120行,提前打断点 merger = createMergeManager(context); //合并方法,Shuffle第80行// MergeManagerImpl第232 235行,提前打断点 this.inMemoryMerger = createInMemoryMerger(); //内存合并 this.onDiskMerger = new OnDiskMerger(this); //磁盘合并 rIter = shuffleConsumerPlugin.run(); eventFetcher.start(); //开始抓取数据,Shuffle第107行,提前打断点 eventFetcher.shutDown(); //抓取结束,Shuffle第141行,提前打断点 copyPhase.complete(); //copy阶段完成,Shuffle第151行 taskStatus.setPhase(TaskStatus.Phase.SORT); //开始排序阶段,Shuffle第152行 sortPhase.complete(); //排序阶段完成,即将进入reduce阶段 reduceTask382行 reduce(); //reduce阶段调用的就是我们自定义的reduce方法,会被调用多次 cleanup(context); //reduce完成之前,会最后调用一次Reducer里面的cleanup方法
Join应用
Reduce Join
Map端的主要工作:为来自不同表或文件的key/value对,打标签以区别不同来源的记录。然后用连接字段作为key,其余部分和新加的标志作为value,最后进行输出。Reduce端的主要工作:在Reduce端以连接字段作为key的分组已经完成,我们只需要在每一个分组当中将那些来源于不同文件的记录(在Map阶段已经打标志)分开,最后进行合并就ok了。
代码实操
import org.apache.hadoop.io.Writable; import java.io.DataInput; import java.io.DataOutput; import java.io.IOException; public class TableBean implements Writable { private String id; //订单id private String pid; //产品id private int amount; //产品数量 private String pname; //产品名称 private String flag; //判断是order表还是pd表的标志字段 public TableBean() { } public String getId() { return id; } public void setId(String id) { this.id = id; } public String getPid() { return pid; } public void setPid(String pid) { this.pid = pid; } public int getAmount() { return amount; } public void setAmount(int amount) { this.amount = amount; } public String getPname() { return pname; } public void setPname(String pname) { this.pname = pname; } public String getFlag() { return flag; } public void setFlag(String flag) { this.flag = flag; } @Override public String toString() { return id + "\t" + pname + "\t" + amount; } @Override public void write(DataOutput out) throws IOException { out.writeUTF(id); out.writeUTF(pid); out.writeInt(amount); out.writeUTF(pname); out.writeUTF(flag); } @Override public void readFields(DataInput in) throws IOException { this.id = in.readUTF(); this.pid = in.readUTF(); this.amount = in.readInt(); this.pname = in.readUTF(); this.flag = in.readUTF(); } }
(2)编写TableMapper类
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import java.io.IOException;
public class TableMapper extends Mapper
(3)编写TableReducer类
import org.apache.commons.beanutils.BeanUtils;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
import java.io.IOException;
import java.lang.reflect.InvocationTargetException;
import java.util.ArrayList;
public class TableReducer extends Reducer
(4)编写TableDriver类
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.NullWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; import java.io.IOException; public class TableDriver { public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException { Job job = Job.getInstance(new Configuration()); job.setJarByClass(TableDriver.class); job.setMapperClass(TableMapper.class); job.setReducerClass(TableReducer.class); job.setMapOutputKeyClass(Text.class); job.setMapOutputValueClass(TableBean.class); job.setOutputKeyClass(TableBean.class); job.setOutputValueClass(NullWritable.class); FileInputFormat.setInputPaths(job, new Path("D:\\input")); FileOutputFormat.setOutputPath(job, new Path("D:\\output")); boolean b = job.waitForCompletion(true); System.exit(b ? 0 : 1); } }
Map Join
1)使用场景Map Join适用于一张表十分小、一张表很大的场景。2)优点思考:在Reduce端处理过多的表,非常容易产生数据倾斜。怎么办?在Map端缓存多张表,提前处理业务逻辑,这样增加Map端业务,减少Reduce端数据的压力,尽可能的减少数据倾斜。3)具体办法:采用DistributedCache(1)在Mapper的setup阶段,将文件读取到缓存集合中。(2)在Driver驱动类中加载缓存。
//缓存普通文件到Task运行节点。 job.addCacheFile(new URI("file:///e:/cache/pd.txt")); //如果是集群运行,需要设置HDFS路径 job.addCacheFile(new URI("hdfs://hadoop102:8020/cache/pd.txt"));
实现代码
(1)先在MapJoinDriver驱动类中添加缓存文件
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.NullWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; import java.io.IOException; import java.net.URI; import java.net.URISyntaxException; public class MapJoinDriver { public static void main(String[] args) throws IOException, URISyntaxException, ClassNotFoundException, InterruptedException { // 1 获取job信息 Configuration conf = new Configuration(); Job job = Job.getInstance(conf); // 2 设置加载jar包路径 job.setJarByClass(MapJoinDriver.class); // 3 关联mapper job.setMapperClass(MapJoinMapper.class); // 4 设置Map输出KV类型 job.setMapOutputKeyClass(Text.class); job.setMapOutputValueClass(NullWritable.class); // 5 设置最终输出KV类型 job.setOutputKeyClass(Text.class); job.setOutputValueClass(NullWritable.class); // 加载缓存数据 job.addCacheFile(new URI("file:///D:/input/tablecache/pd.txt")); // Map端Join的逻辑不需要Reduce阶段,设置reduceTask数量为0 job.setNumReduceTasks(0); // 6 设置输入输出路径 FileInputFormat.setInputPaths(job, new Path("D:\\input")); FileOutputFormat.setOutputPath(job, new Path("D:\\output")); // 7 提交 boolean b = job.waitForCompletion(true); System.exit(b ? 0 : 1); } }
(2)在MapJoinMapper类中的setup方法中读取缓存文件
import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.net.URI;
import java.util.HashMap;
import java.util.Map;
public class MapJoinMapper extends Mapper
版权声明:本文内容由网络用户投稿,版权归原作者所有,本站不拥有其著作权,亦不承担相应法律责任。如果您发现本站中有涉嫌抄袭或描述失实的内容,请联系我们jiasou666@gmail.com 处理,核实后本网站将在24小时内删除侵权内容。
发表评论
暂时没有评论,来抢沙发吧~