十三、MapReduce--output输出源码分析

网友投稿 223 2022-11-26

十三、MapReduce--output输出源码分析

当reducetask执行完成后,就会将结果的KV写入到指定路径下。下面分析这个output过程。

1、首先看 ReduceTask.run() 这个执行入口

//--------------------------ReduceTask.java public void run(JobConf job, TaskUmbilicalProtocol umbilical) throws IOException, InterruptedException, ClassNotFoundException { job.setBoolean("mapreduce.job.skiprecords", this.isSkipping()); if (this.isMapOrReduce()) { this.copyPhase = this.getProgress().addPhase("copy"); this.sortPhase = this.getProgress().addPhase("sort"); this.reducePhase = this.getProgress().addPhase("reduce"); } TaskReporter reporter = this.startReporter(umbilical); boolean useNewApi = job.getUseNewReducer(); //reducetask初始化工作 this.initialize(job, this.getJobID(), reporter, useNewApi); if (this.jobCleanup) { this.runJobCleanupTask(umbilical, reporter); } else if (this.jobSetup) { this.runJobSetupTask(umbilical, reporter); } else if (this.taskCleanup) { this.runTaskCleanupTask(umbilical, reporter); } else { this.codec = this.initCodec(); RawKeyValueIterator rIter = null; ShuffleConsumerPlugin shuffleConsumerPlugin = null; Class combinerClass = this.conf.getCombinerClass(); CombineOutputCollector combineCollector = null != combinerClass ? new CombineOutputCollector(this.reduceCombineOutputCounter, reporter, this.conf) : null; Class clazz = job.getClass("mapreduce.job.reduce.shuffle.consumer.plugin.class", Shuffle.class, ShuffleConsumerPlugin.class); shuffleConsumerPlugin = (ShuffleConsumerPlugin)ReflectionUtils.newInstance(clazz, job); LOG.info("Using ShuffleConsumerPlugin: " + shuffleConsumerPlugin); Context shuffleContext = new Context(this.getTaskID(), job, FileSystem.getLocal(job), umbilical, super.lDirAlloc, reporter, this.codec, combinerClass, combineCollector, this.spilledRecordsCounter, this.reduceCombineInputCounter, this.shuffledMapsCounter, this.reduceShuffleBytes, this.failedShuffleCounter, this.mergedMapOutputsCounter, this.taskStatus, this.copyPhase, this.sortPhase, this, this.mapOutputFile, this.localMapFiles); shuffleConsumerPlugin.init(shuffleContext); rIter = shuffleConsumerPlugin.run(); this.mapOutputFilesOnDisk.clear(); this.sortPhase.complete(); this.setPhase(Phase.REDUCE); this.statusUpdate(umbilical); Class keyClass = job.getMapOutputKeyClass(); Class valueClass = job.getMapOutputValueClass(); RawComparator comparator = job.getOutputValueGroupingComparator(); //开始运行reducetask if (useNewApi) { this.runNewReducer(job, umbilical, reporter, rIter, comparator, keyClass, valueClass); } else { this.runOldReducer(job, umbilical, reporter, rIter, comparator, keyClass, valueClass); } shuffleConsumerPlugin.close(); this.done(umbilical, reporter); }

和MapTask类似,主要有 this.initialize() 以及 this.runNewReducer() 这两个方法。做了初始化以及开始运行task的操作。

2、this.initialize()

//----------------------------------------ReduceTask.java public void initialize(JobConf job, JobID id, Reporter reporter, boolean useNewApi) throws IOException, ClassNotFoundException, InterruptedException { //创建上下文对象 this.jobContext = new JobContextImpl(job, id, reporter); this.taskContext = new TaskAttemptContextImpl(job, this.taskId, reporter); //修改reducetask的状态为运行中 if (this.getState() == org.apache.hadoop.mapred.TaskStatus.State.UNASSIGNED) { this.setState(org.apache.hadoop.mapred.TaskStatus.State.RUNNING); } if (useNewApi) { if (LOG.isDebugEnabled()) { LOG.debug("using new api for output committer"); } //反射获取outputformat类对象。getOutputFormatClass这个方法在JobContextImpl中。 //默认是TextOutputFormat.class this.outputFormat = (OutputFormat)ReflectionUtils.newInstance(this.taskContext.getOutputFormatClass(), job); this.committer = this.outputFormat.getOutputCommitter(this.taskContext); } else { this.committer = this.conf.getOutputCommitter(); } //获取输出路径 Path outputPath = FileOutputFormat.getOutputPath(this.conf); if (outputPath != null) { if (this.committer instanceof FileOutputCommitter) { FileOutputFormat.setWorkOutputPath(this.conf, ((FileOutputCommitter)this.committer).getTaskAttemptPath(this.taskContext)); } else { FileOutputFormat.setWorkOutputPath(this.conf, outputPath); } } this.committer.setupTask(this.taskContext); Class clazz = this.conf.getClass("mapreduce.job.process-tree.class", (Class)null, ResourceCalculatorProcessTree.class); this.pTree = ResourceCalculatorProcessTree.getResourceCalculatorProcessTree((String)System.getenv().get("JVM_PID"), clazz, this.conf); LOG.info(" Using ResourceCalculatorProcessTree : " + this.pTree); if (this.pTree != null) { this.pTree.updateProcessTree(); this.initCpuCumulativeTime = this.pTree.getCumulativeCpuTime(); } }

主要就是初始化上下文对象,获取outputformat对象。

3、this.runNewReducer()

//-----------------------------------------------ReduceTask.java private void runNewReducer(JobConf job, TaskUmbilicalProtocol umbilical, final TaskReporter reporter, final RawKeyValueIterator rIter, RawComparator comparator, Class keyClass, Class valueClass) throws IOException, InterruptedException, ClassNotFoundException { //匿名内部类,用于构建key,value的迭代器 rIter = new RawKeyValueIterator() { public void close() throws IOException { rIter.close(); } public DataInputBuffer getKey() throws IOException { return rIter.getKey(); } public Progress getProgress() { return rIter.getProgress(); } public DataInputBuffer getValue() throws IOException { return rIter.getValue(); } public boolean next() throws IOException { boolean ret = rIter.next(); reporter.setProgress(rIter.getProgress().getProgress()); return ret; } }; TaskAttemptContext taskContext = new TaskAttemptContextImpl(job, this.getTaskID(), reporter); //反射获取Reducer对象 org.apache.hadoop.mapreduce.Reducer reducer = (org.apache.hadoop.mapreduce.Reducer)ReflectionUtils.newInstance(taskContext.getReducerClass(), job); //获取RecordWriter对象,用于将结果写入到文件中 org.apache.hadoop.mapreduce.RecordWriter trackedRW = new ReduceTask.NewTrackingRecordWriter(this, taskContext); job.setBoolean("mapred.skip.on", this.isSkipping()); job.setBoolean("mapreduce.job.skiprecords", this.isSkipping()); //创建reduceContext对象,用于reduce任务 org.apache.hadoop.mapreduce.Reducer.Context reducerContext = createReduceContext(reducer, job, this.getTaskID(), rIter, this.reduceInputKeyCounter, this.reduceInputValueCounter, trackedRW, this.committer, reporter, comparator, keyClass, valueClass); //开始运行reduce try { reducer.run(reducerContext); } finally { //关闭输出流 trackedRW.close(reducerContext); } }

可以看到,主要做了以下工作:1)获取reducer对象,用于运行run() ,也就是运行reduce方法2)创建 RecordWriter对象3)创建reduceContext4)开始运行reducer中的run

4、ReduceTask.NewTrackingRecordWriter()

//--------------------------------------NewTrackingRecordWriter.java static class NewTrackingRecordWriter extends org.apache.hadoop.mapreduce.RecordWriter { private final org.apache.hadoop.mapreduce.RecordWriter real; private final org.apache.hadoop.mapreduce.Counter outputRecordCounter; private final org.apache.hadoop.mapreduce.Counter fileOutputByteCounter; private final List fsStats; NewTrackingRecordWriter(ReduceTask reduce, TaskAttemptContext taskContext) throws InterruptedException, IOException { this.outputRecordCounter = reduce.reduceOutputCounter; this.fileOutputByteCounter = reduce.fileOutputByteCounter; List matchedStats = null; if (reduce.outputFormat instanceof FileOutputFormat) { matchedStats = Task.getFsStatistics(FileOutputFormat.getOutputPath(taskContext), taskContext.getConfiguration()); } this.fsStats = matchedStats; long bytesOutPrev = this.getOutputBytes(this.fsStats); //通过outputFormat创建RecordWriter对象 this.real = reduce.outputFormat.getRecordWriter(taskContext); long bytesOutCurr = this.getOutputBytes(this.fsStats); this.fileOutputByteCounter.increment(bytesOutCurr - bytesOutPrev); } ..................... }

重点的就是通过outputFormat.getRecordWriter来创建 RecordWriter 对象。上面也说到,outputFormat默认就是 TextOutputFormat,所以下面看看TextOutputFormat.getRecordWriter()

5、TextOutputFormat.getRecordWriter()

public class TextOutputFormat extends FileOutputFormat { public TextOutputFormat() { } //可以看到,返回的是静态内部类TextOutputFormat.LineRecordWriter public RecordWriter getRecordWriter(FileSystem ignored, JobConf job, String name, Progressable progress) throws IOException { boolean isCompressed = getCompressOutput(job); //key和value的分隔符,默认是 \t String keyValueSeparator = job.get("mapreduce.output.textoutputformat.separator", "\t"); //分为压缩和非压缩输出 if (!isCompressed) { //获取输出路径 Path file = FileOutputFormat.getTaskOutputPath(job, name); FileSystem fs = file.getFileSystem(job); //创建输出流 FSDataOutputStream fileOut = fs.create(file, progress); return new TextOutputFormat.LineRecordWriter(fileOut, keyValueSeparator); } else { Class codecClass = getOutputCompressorClass(job, GzipCodec.class); CompressionCodec codec = (CompressionCodec)ReflectionUtils.newInstance(codecClass, job); Path file = FileOutputFormat.getTaskOutputPath(job, name + codec.getDefaultExtension()); FileSystem fs = file.getFileSystem(job); FSDataOutputStream fileOut = fs.create(file, progress); //返回LineRecordWriter对象 return new TextOutputFormat.LineRecordWriter(new DataOutputStream(codec.createOutputStream(fileOut)), keyValueSeparator); } } //这里就是 LineRecordWriter 类 protected static class LineRecordWriter implements RecordWriter { private static final byte[] NEWLINE; protected DataOutputStream out; private final byte[] keyValueSeparator; public LineRecordWriter(DataOutputStream out, String keyValueSeparator) { this.out = out; this.keyValueSeparator = keyValueSeparator.getBytes(StandardCharsets.UTF_8); } public LineRecordWriter(DataOutputStream out) { this(out, "\t"); } private void writeObject(Object o) throws IOException { if (o instanceof Text) { Text to = (Text)o; this.out.write(to.getBytes(), 0, to.getLength()); } else { this.out.write(o.toString().getBytes(StandardCharsets.UTF_8)); } } //将KV输出 public synchronized void write(K key, V value) throws IOException { boolean nullKey = key == null || key instanceof NullWritable; boolean nullValue = value == null || value instanceof NullWritable; if (!nullKey || !nullValue) { //先写key if (!nullKey) { this.writeObject(key); } //接着写入key和value之间的分隔符 if (!nullKey && !nullValue) { this.out.write(this.keyValueSeparator); } //最后写入value if (!nullValue) { this.writeObject(value); } //接着写入新的一行 this.out.write(NEWLINE); } } public synchronized void close(Reporter reporter) throws IOException { this.out.close(); } static { NEWLINE = "\n".getBytes(StandardCharsets.UTF_8); } } }

可以看到,最终返回的RecordWriter对象是 LineRecordWriter 类型的。接着回到3中,看 reduceContext这个对象的类

6、reduceContext = ReduceTask.createReduceContext()

protected static Reducer.Context createReduceContext(Reducer reducer, Configuration job, org.apache.hadoop.mapreduce.TaskAttemptID taskId, RawKeyValueIterator rIter, org.apache.hadoop.mapreduce.Counter inputKeyCounter, org.apache.hadoop.mapreduce.Counter inputValueCounter, RecordWriter output, OutputCommitter committer, StatusReporter reporter, RawComparator comparator, Class keyClass, Class valueClass) throws IOException, InterruptedException { ReduceContext reduceContext = new ReduceContextImpl(job, taskId, rIter, inputKeyCounter, inputValueCounter, output, committer, reporter, comparator, keyClass, valueClass); Reducer.Context reducerContext = (new WrappedReducer()).getReducerContext(reduceContext); return reducerContext; }

可以看到reducerContext是一个ReduceContextImpl类对象。下面看看ReduceContextImpl 这个类的构造方法

//---------------------------------ReduceContextImpl.java public ReduceContextImpl(Configuration conf, TaskAttemptID taskid, RawKeyValueIterator input, Counter inputKeyCounter, Counter inputValueCounter, RecordWriter output, OutputCommitter committer, StatusReporter reporter, RawComparator comparator, Class keyClass, Class valueClass) throws InterruptedException, IOException { //父类是 TaskInputOutputContextImpl,把outputformat对象传递进去了 super(conf, taskid, output, committer, reporter); this.input = input; this.inputKeyCounter = inputKeyCounter; this.inputValueCounter = inputValueCounter; this.comparator = comparator; this.serializationFactory = new SerializationFactory(conf); this.keyDeserializer = this.serializationFactory.getDeserializer(keyClass); this.keyDeserializer.open(this.buffer); this.valueDeserializer = this.serializationFactory.getDeserializer(valueClass); this.valueDeserializer.open(this.buffer); this.hasMore = input.next(); this.keyClass = keyClass; this.valueClass = valueClass; this.conf = conf; this.taskid = taskid; }

这里面,它继续调用了父类的构造方法,把outputformat对象传递进去了。继续看看父类 TaskInputOutputContextImpl

public TaskInputOutputContextImpl(Configuration conf, TaskAttemptID taskid, RecordWriter output, OutputCommitter committer, StatusReporter reporter) { //可以看到这里的output就是recordWriter对象 super(conf, taskid, reporter); this.output = output; this.committer = committer; } //这里的逻辑其实就是先读取KV到 this.key和this.value中,如果没有KV就返回false,否则返回true public abstract boolean nextKeyValue() throws IOException, InterruptedException; public abstract KEYIN getCurrentKey() throws IOException, InterruptedException; public abstract VALUEIN getCurrentValue() throws IOException, InterruptedException; //调用recordWriter的write方法,将KV输出,默认是LineRecordWriter这个类 public void write(KEYOUT key, VALUEOUT value) throws IOException, InterruptedException { this.output.write(key, value);

可以看到,这里有3个抽象方法(在子类ReduceContextImpl中实现了逻辑,和RecordWriter无关),以及write这个具体方法。分别用于获取KV以及将结果KV写入。write这个写入方法,就是调用的 recordWriter的write方法,也就是5中创建的LineRecordWriter对象中的write方法,将KV输出。

7、reducer.run()

public void run(Reducer.Context context) throws IOException, InterruptedException { this.setup(context); try { while(context.nextKey()) { this.reduce(context.getCurrentKey(), context.getValues(), context); Iterator iter = context.getValues().iterator(); if (iter instanceof ValueIterator) { ((ValueIterator)iter).resetBackupStore(); } } } finally { this.cleanup(context); } }

可以看到,这里就是调用6中创建的 reduceContext中的方法来获取KV。而且在reduce方法中,我们会通过 context.write(key,value)来将结果KV输出。调用的其实就是 LineRecordWriter对象中的write方法。

8、总结

输出的过程中,主要涉及到两个对象 OutputFormat 以及 RecordWriterOutputFormat :创建输出流,以及创建RecordWriter对象RecordWriter:将KV输出到文件中的 write方法

版权声明:本文内容由网络用户投稿,版权归原作者所有,本站不拥有其著作权,亦不承担相应法律责任。如果您发现本站中有涉嫌抄袭或描述失实的内容,请联系我们jiasou666@gmail.com 处理,核实后本网站将在24小时内删除侵权内容。

上一篇:Springboot导入本地jar后 打包依赖无法加入的解决方案
下一篇:关于万兆光模块连接方案的详细说明
相关文章

 发表评论

暂时没有评论,来抢沙发吧~