十三、MapReduce--output输出源码分析
当reducetask执行完成后,就会将结果的KV写入到指定路径下。下面分析这个output过程。
1、首先看 ReduceTask.run() 这个执行入口
//--------------------------ReduceTask.java
public void run(JobConf job, TaskUmbilicalProtocol umbilical) throws IOException, InterruptedException, ClassNotFoundException {
job.setBoolean("mapreduce.job.skiprecords", this.isSkipping());
if (this.isMapOrReduce()) {
this.copyPhase = this.getProgress().addPhase("copy");
this.sortPhase = this.getProgress().addPhase("sort");
this.reducePhase = this.getProgress().addPhase("reduce");
}
TaskReporter reporter = this.startReporter(umbilical);
boolean useNewApi = job.getUseNewReducer();
//reducetask初始化工作
this.initialize(job, this.getJobID(), reporter, useNewApi);
if (this.jobCleanup) {
this.runJobCleanupTask(umbilical, reporter);
} else if (this.jobSetup) {
this.runJobSetupTask(umbilical, reporter);
} else if (this.taskCleanup) {
this.runTaskCleanupTask(umbilical, reporter);
} else {
this.codec = this.initCodec();
RawKeyValueIterator rIter = null;
ShuffleConsumerPlugin shuffleConsumerPlugin = null;
Class combinerClass = this.conf.getCombinerClass();
CombineOutputCollector combineCollector = null != combinerClass ? new CombineOutputCollector(this.reduceCombineOutputCounter, reporter, this.conf) : null;
Class extends ShuffleConsumerPlugin> clazz = job.getClass("mapreduce.job.reduce.shuffle.consumer.plugin.class", Shuffle.class, ShuffleConsumerPlugin.class);
shuffleConsumerPlugin = (ShuffleConsumerPlugin)ReflectionUtils.newInstance(clazz, job);
LOG.info("Using ShuffleConsumerPlugin: " + shuffleConsumerPlugin);
Context shuffleContext = new Context(this.getTaskID(), job, FileSystem.getLocal(job), umbilical, super.lDirAlloc, reporter, this.codec, combinerClass, combineCollector, this.spilledRecordsCounter, this.reduceCombineInputCounter, this.shuffledMapsCounter, this.reduceShuffleBytes, this.failedShuffleCounter, this.mergedMapOutputsCounter, this.taskStatus, this.copyPhase, this.sortPhase, this, this.mapOutputFile, this.localMapFiles);
shuffleConsumerPlugin.init(shuffleContext);
rIter = shuffleConsumerPlugin.run();
this.mapOutputFilesOnDisk.clear();
this.sortPhase.complete();
this.setPhase(Phase.REDUCE);
this.statusUpdate(umbilical);
Class keyClass = job.getMapOutputKeyClass();
Class valueClass = job.getMapOutputValueClass();
RawComparator comparator = job.getOutputValueGroupingComparator();
//开始运行reducetask
if (useNewApi) {
this.runNewReducer(job, umbilical, reporter, rIter, comparator, keyClass, valueClass);
} else {
this.runOldReducer(job, umbilical, reporter, rIter, comparator, keyClass, valueClass);
}
shuffleConsumerPlugin.close();
this.done(umbilical, reporter);
}
和MapTask类似,主要有 this.initialize() 以及 this.runNewReducer() 这两个方法。做了初始化以及开始运行task的操作。
2、this.initialize()
//----------------------------------------ReduceTask.java
public void initialize(JobConf job, JobID id, Reporter reporter, boolean useNewApi) throws IOException, ClassNotFoundException, InterruptedException {
//创建上下文对象
this.jobContext = new JobContextImpl(job, id, reporter);
this.taskContext = new TaskAttemptContextImpl(job, this.taskId, reporter);
//修改reducetask的状态为运行中
if (this.getState() == org.apache.hadoop.mapred.TaskStatus.State.UNASSIGNED) {
this.setState(org.apache.hadoop.mapred.TaskStatus.State.RUNNING);
}
if (useNewApi) {
if (LOG.isDebugEnabled()) {
LOG.debug("using new api for output committer");
}
//反射获取outputformat类对象。getOutputFormatClass这个方法在JobContextImpl中。
//默认是TextOutputFormat.class
this.outputFormat = (OutputFormat)ReflectionUtils.newInstance(this.taskContext.getOutputFormatClass(), job);
this.committer = this.outputFormat.getOutputCommitter(this.taskContext);
} else {
this.committer = this.conf.getOutputCommitter();
}
//获取输出路径
Path outputPath = FileOutputFormat.getOutputPath(this.conf);
if (outputPath != null) {
if (this.committer instanceof FileOutputCommitter) {
FileOutputFormat.setWorkOutputPath(this.conf, ((FileOutputCommitter)this.committer).getTaskAttemptPath(this.taskContext));
} else {
FileOutputFormat.setWorkOutputPath(this.conf, outputPath);
}
}
this.committer.setupTask(this.taskContext);
Class extends ResourceCalculatorProcessTree> clazz = this.conf.getClass("mapreduce.job.process-tree.class", (Class)null, ResourceCalculatorProcessTree.class);
this.pTree = ResourceCalculatorProcessTree.getResourceCalculatorProcessTree((String)System.getenv().get("JVM_PID"), clazz, this.conf);
LOG.info(" Using ResourceCalculatorProcessTree : " + this.pTree);
if (this.pTree != null) {
this.pTree.updateProcessTree();
this.initCpuCumulativeTime = this.pTree.getCumulativeCpuTime();
}
}
主要就是初始化上下文对象,获取outputformat对象。
3、this.runNewReducer()
//-----------------------------------------------ReduceTask.java
/**
 * Runs the reducer through the new (org.apache.hadoop.mapreduce) API.
 *
 * <p>Wraps the raw input iterator so that every {@code next()} also reports progress, creates
 * the reducer reflectively, creates the tracking record writer, builds the reducer context and
 * finally invokes {@code reducer.run(...)}. The record writer is closed even if the reducer fails.
 *
 * @param job        job configuration
 * @param umbilical  umbilical protocol handle (not used directly in this method)
 * @param reporter   reporter that receives progress on every {@code next()} call
 * @param rIter      raw key/value iterator over the reduce input
 * @param comparator comparator handed to the reduce context
 * @param keyClass   class of the reduce input keys
 * @param valueClass class of the reduce input values
 * @throws IOException            propagated from the reducer or the record writer
 * @throws InterruptedException   propagated from the reducer or the record writer
 * @throws ClassNotFoundException propagated from resolving the reducer class
 */
private void runNewReducer(JobConf job, TaskUmbilicalProtocol umbilical, final TaskReporter reporter, final RawKeyValueIterator rIter, RawComparator comparator, Class keyClass, Class valueClass) throws IOException, InterruptedException, ClassNotFoundException {
    // Progress-reporting wrapper around the raw iterator. It is kept in its own local: the
    // parameter is final and the wrapper must delegate to the ORIGINAL iterator, not to itself.
    RawKeyValueIterator progressIter = new RawKeyValueIterator() {
        @Override
        public void close() throws IOException {
            rIter.close();
        }

        @Override
        public DataInputBuffer getKey() throws IOException {
            return rIter.getKey();
        }

        @Override
        public Progress getProgress() {
            return rIter.getProgress();
        }

        @Override
        public DataInputBuffer getValue() throws IOException {
            return rIter.getValue();
        }

        @Override
        public boolean next() throws IOException {
            boolean ret = rIter.next();
            // Push the underlying iterator's progress to the reporter after each advance.
            reporter.setProgress(rIter.getProgress().getProgress());
            return ret;
        }
    };
    TaskAttemptContext taskContext = new TaskAttemptContextImpl(job, this.getTaskID(), reporter);
    // Instantiate the user's Reducer reflectively.
    org.apache.hadoop.mapreduce.Reducer reducer = (org.apache.hadoop.mapreduce.Reducer)ReflectionUtils.newInstance(taskContext.getReducerClass(), job);
    // RecordWriter used to write the reduce results out.
    org.apache.hadoop.mapreduce.RecordWriter trackedRW = new ReduceTask.NewTrackingRecordWriter(this, taskContext);
    job.setBoolean("mapred.skip.on", this.isSkipping());
    job.setBoolean("mapreduce.job.skiprecords", this.isSkipping());
    // Build the context object the reducer runs against.
    org.apache.hadoop.mapreduce.Reducer.Context reducerContext = createReduceContext(reducer, job, this.getTaskID(), progressIter, this.reduceInputKeyCounter, this.reduceInputValueCounter, trackedRW, this.committer, reporter, comparator, keyClass, valueClass);
    // Run the reducer.
    try {
        reducer.run(reducerContext);
    } finally {
        // Always close the output writer.
        trackedRW.close(reducerContext);
    }
}
可以看到,主要做了以下工作:1)获取reducer对象,用于运行run(),也就是运行reduce方法;2)创建RecordWriter对象;3)创建reduceContext;4)开始运行reducer中的run()。
4、ReduceTask.NewTrackingRecordWriter()
//--------------------------------------NewTrackingRecordWriter.java
/**
 * RecordWriter used on the new-API reduce path. It holds the real writer obtained from the
 * task's output format together with the output-record and file-output-byte counters.
 * Only the fields and the constructor are shown in this excerpt.
 */
static class NewTrackingRecordWriter extends org.apache.hadoop.mapreduce.RecordWriter {
// The writer created by the output format in the constructor.
private final org.apache.hadoop.mapreduce.RecordWriter real;
// Taken from reduce.reduceOutputCounter.
private final org.apache.hadoop.mapreduce.Counter outputRecordCounter;
// Taken from reduce.fileOutputByteCounter.
private final org.apache.hadoop.mapreduce.Counter fileOutputByteCounter;
// File-system statistics for the output path; stays null unless the output format is a FileOutputFormat.
private final List fsStats;
/**
 * Creates the real writer and charges the bytes written while creating it to the
 * file-output-byte counter.
 *
 * @param reduce      the owning reduce task (supplies counters and the output format)
 * @param taskContext task attempt context handed to the output format
 */
NewTrackingRecordWriter(ReduceTask reduce, TaskAttemptContext taskContext) throws InterruptedException, IOException {
this.outputRecordCounter = reduce.reduceOutputCounter;
this.fileOutputByteCounter = reduce.fileOutputByteCounter;
List matchedStats = null;
if (reduce.outputFormat instanceof FileOutputFormat) {
matchedStats = Task.getFsStatistics(FileOutputFormat.getOutputPath(taskContext), taskContext.getConfiguration());
}
this.fsStats = matchedStats;
// Sample the output byte count before and after creating the writer (getOutputBytes is not shown in this excerpt).
long bytesOutPrev = this.getOutputBytes(this.fsStats);
// Create the RecordWriter through the output format.
this.real = reduce.outputFormat.getRecordWriter(taskContext);
long bytesOutCurr = this.getOutputBytes(this.fsStats);
// Only the difference between the two samples is added to the counter.
this.fileOutputByteCounter.increment(bytesOutCurr - bytesOutPrev);
}
// (remaining members are elided in the original excerpt)
.....................
}
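重点的就是通过outputFormat.getRecordWriter来创建 RecordWriter 对象。上面也说到,outputFormat默认就是 TextOutputFormat,所以下面看看TextOutputFormat.getRecordWriter()。注意:下面贴出的 getRecordWriter(FileSystem, JobConf, String, Progressable) 是旧API(org.apache.hadoop.mapred)中 TextOutputFormat 的实现;新API中由 getRecordWriter(TaskAttemptContext) 完成同样的工作,同样返回 LineRecordWriter。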
5、TextOutputFormat.getRecordWriter()
public class TextOutputFormat extends FileOutputFormat {
public TextOutputFormat() {
}
//可以看到,返回的是静态内部类TextOutputFormat.LineRecordWriter
public RecordWriter getRecordWriter(FileSystem ignored, JobConf job, String name, Progressable progress) throws IOException {
boolean isCompressed = getCompressOutput(job);
//key和value的分隔符,默认是 \t
String keyValueSeparator = job.get("mapreduce.output.textoutputformat.separator", "\t");
//分为压缩和非压缩输出
if (!isCompressed) {
//获取输出路径
Path file = FileOutputFormat.getTaskOutputPath(job, name);
FileSystem fs = file.getFileSystem(job);
//创建输出流
FSDataOutputStream fileOut = fs.create(file, progress);
return new TextOutputFormat.LineRecordWriter(fileOut, keyValueSeparator);
} else {
Class extends CompressionCodec> codecClass = getOutputCompressorClass(job, GzipCodec.class);
CompressionCodec codec = (CompressionCodec)ReflectionUtils.newInstance(codecClass, job);
Path file = FileOutputFormat.getTaskOutputPath(job, name + codec.getDefaultExtension());
FileSystem fs = file.getFileSystem(job);
FSDataOutputStream fileOut = fs.create(file, progress);
//返回LineRecordWriter对象
return new TextOutputFormat.LineRecordWriter(new DataOutputStream(codec.createOutputStream(fileOut)), keyValueSeparator);
}
}
//这里就是 LineRecordWriter 类
protected static class LineRecordWriter implements RecordWriter {
private static final byte[] NEWLINE;
protected DataOutputStream out;
private final byte[] keyValueSeparator;
public LineRecordWriter(DataOutputStream out, String keyValueSeparator) {
this.out = out;
this.keyValueSeparator = keyValueSeparator.getBytes(StandardCharsets.UTF_8);
}
public LineRecordWriter(DataOutputStream out) {
this(out, "\t");
}
private void writeObject(Object o) throws IOException {
if (o instanceof Text) {
Text to = (Text)o;
this.out.write(to.getBytes(), 0, to.getLength());
} else {
this.out.write(o.toString().getBytes(StandardCharsets.UTF_8));
}
}
//将KV输出
public synchronized void write(K key, V value) throws IOException {
boolean nullKey = key == null || key instanceof NullWritable;
boolean nullValue = value == null || value instanceof NullWritable;
if (!nullKey || !nullValue) {
//先写key
if (!nullKey) {
this.writeObject(key);
}
//接着写入key和value之间的分隔符
if (!nullKey && !nullValue) {
this.out.write(this.keyValueSeparator);
}
//最后写入value
if (!nullValue) {
this.writeObject(value);
}
//接着写入新的一行
this.out.write(NEWLINE);
}
}
public synchronized void close(Reporter reporter) throws IOException {
this.out.close();
}
static {
NEWLINE = "\n".getBytes(StandardCharsets.UTF_8);
}
}
}
可以看到,最终返回的RecordWriter对象是 LineRecordWriter 类型的。接着回到3中,看 reduceContext这个对象的类
6、reduceContext = ReduceTask.createReduceContext()
/**
 * Builds the context object a new-API reducer runs against.
 *
 * <p>A {@code ReduceContextImpl} is created from the supplied arguments and then wrapped by a
 * fresh {@code WrappedReducer}, whose {@code getReducerContext} result is returned.
 *
 * @return the {@code Reducer.Context} wrapping the newly created {@code ReduceContextImpl}
 */
protected static Reducer.Context createReduceContext(Reducer reducer, Configuration job, org.apache.hadoop.mapreduce.TaskAttemptID taskId, RawKeyValueIterator rIter, org.apache.hadoop.mapreduce.Counter inputKeyCounter, org.apache.hadoop.mapreduce.Counter inputValueCounter, RecordWriter output, OutputCommitter committer, StatusReporter reporter, RawComparator comparator, Class keyClass, Class valueClass) throws IOException, InterruptedException {
    // The implementation object that actually reads input and forwards output.
    ReduceContext backingContext = new ReduceContextImpl(
            job, taskId, rIter,
            inputKeyCounter, inputValueCounter,
            output, committer, reporter,
            comparator, keyClass, valueClass);
    // Expose it to the reducer through the WrappedReducer adapter.
    WrappedReducer wrapper = new WrappedReducer();
    return wrapper.getReducerContext(backingContext);
}
可以看到,reducerContext 是由 WrappedReducer 对 ReduceContextImpl 对象包装之后得到的 Reducer.Context,真正的逻辑都在 ReduceContextImpl 中。下面看看ReduceContextImpl 这个类的构造方法
//---------------------------------ReduceContextImpl.java
/**
 * Builds the reduce-side context: hands the record writer, committer and reporter to the
 * superclass, stores the input iterator, counters and comparator, opens key and value
 * deserializers, and advances the input once to find out whether any record exists.
 *
 * @param input  raw key/value iterator over the reduce input
 * @param output record writer forwarded to the superclass constructor
 */
public ReduceContextImpl(Configuration conf, TaskAttemptID taskid, RawKeyValueIterator input, Counter inputKeyCounter, Counter inputValueCounter, RecordWriter output, OutputCommitter committer, StatusReporter reporter, RawComparator comparator, Class keyClass, Class valueClass) throws InterruptedException, IOException {
// Per the original note the parent class is TaskInputOutputContextImpl; the RecordWriter (output) is passed up to it.
super(conf, taskid, output, committer, reporter);
this.input = input;
this.inputKeyCounter = inputKeyCounter;
this.inputValueCounter = inputValueCounter;
this.comparator = comparator;
this.serializationFactory = new SerializationFactory(conf);
// Both deserializers are opened on the same this.buffer.
this.keyDeserializer = this.serializationFactory.getDeserializer(keyClass);
this.keyDeserializer.open(this.buffer);
this.valueDeserializer = this.serializationFactory.getDeserializer(valueClass);
this.valueDeserializer.open(this.buffer);
// Advance the raw input once up front and remember whether a record was available.
this.hasMore = input.next();
this.keyClass = keyClass;
this.valueClass = valueClass;
this.conf = conf;
this.taskid = taskid;
}
这里面,它继续调用了父类的构造方法,把RecordWriter对象(即参数output)传递进去了。继续看看父类 TaskInputOutputContextImpl
/**
 * Stores the record writer and output committer; configuration, task id and reporter are
 * forwarded to the superclass constructor.
 *
 * @param output    record writer that write(key, value) delegates to
 * @param committer output committer kept on this context
 */
public TaskInputOutputContextImpl(Configuration conf, TaskAttemptID taskid, RecordWriter output, OutputCommitter committer, StatusReporter reporter) {
// Note that output here is the RecordWriter object.
super(conf, taskid, reporter);
this.output = output;
this.committer = committer;
}
// Abstract input-side accessors; their implementations live in subclasses and are not shown here.
// Per the article author, nextKeyValue() reads the next key/value pair into this.key and
// this.value and returns false when there is no further pair, true otherwise.
public abstract boolean nextKeyValue() throws IOException, InterruptedException;
public abstract KEYIN getCurrentKey() throws IOException, InterruptedException;
public abstract VALUEIN getCurrentValue() throws IOException, InterruptedException;
//调用recordWriter的write方法,将KV输出,默认是LineRecordWriter这个类
public void write(KEYOUT key, VALUEOUT value) throws IOException, InterruptedException {
this.output.write(key, value);
可以看到,这里有3个抽象方法(在子类ReduceContextImpl中实现了逻辑,和RecordWriter无关),以及write这个具体方法。分别用于获取KV以及将结果KV写入。write这个写入方法,就是调用的 recordWriter的write方法,也就是5中创建的LineRecordWriter对象中的write方法,将KV输出。
7、reducer.run()
/**
 * Drives the reducer: calls {@code setup} once, then {@code reduce} for every key the context
 * yields, and always finishes with {@code cleanup}, even if a call fails.
 *
 * @param context context supplying keys and values and receiving the output
 * @throws IOException          propagated from setup, reduce, cleanup or the context
 * @throws InterruptedException propagated from setup, reduce, cleanup or the context
 */
public void run(Reducer.Context context) throws IOException, InterruptedException {
    setup(context);
    try {
        for (boolean hasKey = context.nextKey(); hasKey; hasKey = context.nextKey()) {
            reduce(context.getCurrentKey(), context.getValues(), context);
            // After each key, reset the backup store when the values iterator has one.
            Iterator valueCursor = context.getValues().iterator();
            if (!(valueCursor instanceof ValueIterator)) {
                continue;
            }
            ((ValueIterator) valueCursor).resetBackupStore();
        }
    } finally {
        cleanup(context);
    }
}
可以看到,这里就是调用6中创建的 reduceContext中的方法来获取KV。而且在reduce方法中,我们会通过 context.write(key,value)来将结果KV输出。调用的其实就是 LineRecordWriter对象中的write方法。
8、总结
输出的过程中,主要涉及到两个对象:OutputFormat 以及 RecordWriter。OutputFormat:创建输出流,以及创建RecordWriter对象;RecordWriter:提供将KV输出到文件中的 write方法。
版权声明:本文内容由网络用户投稿,版权归原作者所有,本站不拥有其著作权,亦不承担相应法律责任。如果您发现本站中有涉嫌抄袭或描述失实的内容,请联系我们jiasou666@gmail.com 处理,核实后本网站将在24小时内删除侵权内容。
暂时没有评论,来抢沙发吧~