c语言sscanf函数的用法是什么
262
2022-11-26
九、MapReduce--input源码分析
当job提交至yarn之后,就会开始调度运行map任务,这里开始讲解map输入的源码分析。一个map任务的入口就是 MapTask.class 中的run() 方法
1、首先看看MapTask.run() 方法
MapTask.class
//---------------------------------MapTask.java public void run(JobConf job, TaskUmbilicalProtocol umbilical) throws IOException, ClassNotFoundException, InterruptedException { this.umbilical = umbilical; if (this.isMapTask()) { if (this.conf.getNumReduceTasks() == 0) { this.mapPhase = this.getProgress().addPhase("map", 1.0F); } else { this.mapPhase = this.getProgress().addPhase("map", 0.667F); this.sortPhase = this.getProgress().addPhase("sort", 0.333F); } } TaskReporter reporter = this.startReporter(umbilical); boolean useNewApi = job.getUseNewMapper(); //进行map任务的初始化 this.initialize(job, this.getJobID(), reporter, useNewApi); if (this.jobCleanup) { this.runJobCleanupTask(umbilical, reporter); } else if (this.jobSetup) { this.runJobSetupTask(umbilical, reporter); } else if (this.taskCleanup) { this.runTaskCleanupTask(umbilical, reporter); } else { //启动map任务,判断是使用新的还是旧的api if (useNewApi) { this.runNewMapper(job, this.splitMetaInfo, umbilical, reporter); } else { this.runOldMapper(job, this.splitMetaInfo, umbilical, reporter); } this.done(umbilical, reporter); } }
上面重点有两个方法,一个是 this.initialize()以及 this.runNewMapper()。
2、下面看看this.initialize()
//---------------------------------Task.java public void initialize(JobConf job, JobID id, Reporter reporter, boolean useNewApi) throws IOException, ClassNotFoundException, InterruptedException { //创建task以及job上下文对象 this.jobContext = new JobContextImpl(job, id, reporter); this.taskContext = new TaskAttemptContextImpl(job, this.taskId, reporter); //将task任务的状态改为正在运行 if (this.getState() == org.apache.hadoop.mapred.TaskStatus.State.UNASSIGNED) { this.setState(org.apache.hadoop.mapred.TaskStatus.State.RUNNING); } if (useNewApi) { if (LOG.isDebugEnabled()) { LOG.debug("using new api for output committer"); } //获取job中配置的输出格式类,并通过反射获取该类的Class对象 this.outputFormat = (OutputFormat)ReflectionUtils.newInstance(this.taskContext.getOutputFormatClass(), job); //通过outputformat类获取commiter this.committer = this.outputFormat.getOutputCommitter(this.taskContext); } else { this.committer = this.conf.getOutputCommitter(); } //从FileOutputFormat获取任务结果输出路径。 /* 可能有的人会奇怪,为啥mapper这里要获取outputformat 的输出路径。 首先我们要知道,一个MapReduce任务可以只有mapper,而没有reducer的, 那么这时候程序的输出是有mapper直接输出的,这时候自然就需要知道输出的路径,这里就派上用场了 */ Path outputPath = FileOutputFormat.getOutputPath(this.conf); if (outputPath != null) { if (this.committer instanceof FileOutputCommitter) { FileOutputFormat.setWorkOutputPath(this.conf, ((FileOutputCommitter)this.committer).getTaskAttemptPath(this.taskContext)); } else { FileOutputFormat.setWorkOutputPath(this.conf, outputPath); } } this.committer.setupTask(this.taskContext); Class extends ResourceCalculatorProcessTree> clazz = this.conf.getClass("mapreduce.job.process-tree.class", (Class)null, ResourceCalculatorProcessTree.class); this.pTree = ResourceCalculatorProcessTree.getResourceCalculatorProcessTree((String)System.getenv().get("JVM_PID"), clazz, this.conf); LOG.info(" Using ResourceCalculatorProcessTree : " + this.pTree); if (this.pTree != null) { this.pTree.updateProcessTree(); this.initCpuCumulativeTime = this.pTree.getCumulativeCpuTime(); } }
这个方法主要做了一些初始化工作,比如创建上下文对象,获取输出outputFormat类,以及路径等。
3、下面接着看看this.runNewMapper()
//---------------------------------MapTask.java
private
可以看到,这里就是整个map任务的核心流程,做了以下工作:(1)获取mapper类对象,下面要执行里面的map方法(2)获取InputFormat对象,默认是默认inputformat为TextInputFormat(3)通过InputFormat对象获取RecordReader对象,后面用于读取数据文件(4)获取用于输出map的结果的RecordWriter对象(5)获取切片信息,比如切片所在文件的路径,起始偏移量等(6)初始化切片数据(7)开始运行mapper中的run()方法(8)运行完毕,关闭输入流,将结果通过RecordWriter刷写。(9)刷写完毕后,关闭输入流以及输出流下面看看其中的核心方法
4、this.getSplitDetails() 获取切片信息
//---------------------------------MapTask.java
private
可以看到这里主要是返回切片的反序列化之后可以读取的信息对象
5、接着看看 input.initialize()
在看这个方法之前,首先我们看看input这个对象是由哪个类创建的。它是由NewTrackingRecordReader 这个类创建的。这是个静态内部类
//---------------------------------MapTask.java
static class NewTrackingRecordReader
我们可以看到构造方法中,是调用 inputFormat对象的createRecordReader() 方法来创建RecordReader对象的,上面也说了默认inputFormat为 TextInputFormat。
//---------------------------TextInputFormat.java
public class TextInputFormat extends FileInputFormat
可以清楚看到,返回的就是 LineRecordReader 这个reader类。
接着我们继续看 input.initialize()
static class NewTrackingRecordReader
可以看到,调用 RecordReader中的 initialize 方法,也就是调用LineRecordReader 中的 initialize() 方法,下面看看
//---------------------------------------LineRecordReader.java public void initialize(InputSplit genericSplit, TaskAttemptContext context) throws IOException { FileSplit split = (FileSplit)genericSplit; Configuration job = context.getConfiguration(); this.maxLineLength = job.getInt("mapreduce.input.linerecordreader.line.maxlength", 2147483647); //获取切片的数据开始位置以及终止位置 this.start = split.getStart(); this.end = this.start + split.getLength(); //获取切片对应的文件的输入流 Path file = split.getPath(); FileSystem fs = file.getFileSystem(job); this.fileIn = fs.open(file); //如果文件有压缩,则用压缩类解压 CompressionCodec codec = (new CompressionCodecFactory(job)).getCodec(file); //以压缩方式读取切片 if (null != codec) { this.isCompressedInput = true; this.decompressor = CodecPool.getDecompressor(codec); if (codec instanceof SplittableCompressionCodec) { SplitCompressionInputStream cIn = ((SplittableCompressionCodec)codec).createInputStream(this.fileIn, this.decompressor, this.start, this.end, READ_MODE.BYBLOCK); this.in = new CompressedSplitLineReader(cIn, job, this.recordDelimiterBytes); this.start = cIn.getAdjustedStart(); this.end = cIn.getAdjustedEnd(); this.filePosition = cIn; } else { if (this.start != 0L) { throw new IOException("Cannot seek in " + codec.getClass().getSimpleName() + " compressed stream"); } this.in = new SplitLineReader(codec.createInputStream(this.fileIn, this.decompressor), job, this.recordDelimiterBytes); this.filePosition = this.fileIn; } } else { //无压缩方式读取切片 this.fileIn.seek(this.start); //这里很重要,是真正用于读取数据的类 this.in = new UncompressedSplitLineReader(this.fileIn, job, this.recordDelimiterBytes, split.getLength()); this.filePosition = this.fileIn; } //对起始偏移量进行修正,并赋值给pos这个偏移量 if (this.start != 0L) { this.start += (long)this.in.readLine(new Text(), 0, this.maxBytesToConsume(this.start)); } this.pos = this.start; }
这里的工作主要是给 RecordReader对象读取文件做初始化工作。主要就是获取切片的输入流对象。this.in 这里就用于后面读取数据的对象,这里就是完成了这个输入流对象的初始化。
6、接着我们回到3中,看mapper.run() 方法
这个其实就是写的mapper 的run方法:
//------------------------Mapper.java mapper.run(mapperContext);
public void run(Mapper
可以看到,这里是个while循环,通过context上下文对象获取KV,然后传入map方法中处理。
7、下面看看 context.nextKeyValue()
从3中可以看到,这个context是 MapContextImpl类型的,看看这个类
//-----------------------MapContextImpl.java..
public class MapContextImpl
在它的构造方法中,主要从3中传入了 split切片,以及 RecordReader对象。下面就是三个获取KV的方法,也就是在 mapper.run() 中调用的方法。
下面看看 this.reader.nextKeyValue()
//----------------------------------LineRecordReader.java public boolean nextKeyValue() throws IOException { if (this.key == null) { this.key = new LongWritable(); } //设置key为偏移量 this.key.set(this.pos); if (this.value == null) { this.value = new Text(); } int newSize = 0; while(this.getFilePosition() <= this.end || this.in.needAdditionalRecordAfterSplit()) { if (this.pos == 0L) { newSize = this.skipUtfByteOrderMark(); } else { /*读取数据到value中。this.in是UncompressedSplitLineReader类型的,在LineRecordReader的initialize方法中初始化了。该类父类为LineReader。*/ //调用 LineRreader 的readline 方法。读一行数据 newSize = this.in.readLine(this.value, this.maxLineLength, this.maxBytesToConsume(this.pos)); this.pos += (long)newSize; } if (newSize == 0 || newSize < this.maxLineLength) { break; } LOG.info("Skipped line of size " + newSize + " at pos " + (this.pos - (long)newSize)); } if (newSize == 0) { this.key = null; this.value = null; return false; } else { return true; } }
可以看到,这里已经看到key和value的踪影了。key就是数据偏移量,value就是通过readLine读取的数据。如果有数据返回true,mapper.run() 通过getKey和getValue对应的KV。下面看看 this.in.readLine,也就是 LineReader.readLine()。
8、LineReader.readLine() 按行读取的reader
//---------------------------LineReader.java public int readLine(Text str, int maxLineLength, int maxBytesToConsume) throws IOException { return this.recordDelimiterBytes != null ? this.readCustomLine(str, maxLineLength, maxBytesToConsume) : this.readDefaultLine(str, maxLineLength, maxBytesToConsume); } private int readCustomLine(Text str, int maxLineLength, int maxBytesToConsume) throws IOException { str.clear(); int txtLength = 0; long bytesConsumed = 0L; int delPosn = 0; int ambiguousByteCount = 0; do { int startPosn = this.bufferPosn; if (this.bufferPosn >= this.bufferLength) { startPosn = this.bufferPosn = 0; this.bufferLength = this.fillBuffer(this.in, this.buffer, ambiguousByteCount > 0); if (this.bufferLength <= 0) { if (ambiguousByteCount > 0) { str.append(this.recordDelimiterBytes, 0, ambiguousByteCount); bytesConsumed += (long)ambiguousByteCount; } break; } } for(; this.bufferPosn < this.bufferLength; ++this.bufferPosn) { if (this.buffer[this.bufferPosn] == this.recordDelimiterBytes[delPosn]) { ++delPosn; if (delPosn >= this.recordDelimiterBytes.length) { ++this.bufferPosn; break; } } else if (delPosn != 0) { this.bufferPosn -= delPosn; if (this.bufferPosn < -1) { this.bufferPosn = -1; } delPosn = 0; } } int readLength = this.bufferPosn - startPosn; bytesConsumed += (long)readLength; int appendLength = readLength - delPosn; if (appendLength > maxLineLength - txtLength) { appendLength = maxLineLength - txtLength; } bytesConsumed += (long)ambiguousByteCount; if (appendLength >= 0 && ambiguousByteCount > 0) { //看到这里就很明显了,将数据追加到 value中 str.append(this.recordDelimiterBytes, 0, ambiguousByteCount); ambiguousByteCount = 0; this.unsetNeedAdditionalRecordAfterSplit(); } if (appendLength > 0) { str.append(this.buffer, startPosn, appendLength); txtLength += appendLength; } if (this.bufferPosn >= this.bufferLength && delPosn > 0 && delPosn < this.recordDelimiterBytes.length) { ambiguousByteCount = delPosn; bytesConsumed -= (long)delPosn; } } while(delPosn < this.recordDelimiterBytes.length && bytesConsumed < (long)maxBytesToConsume); if (bytesConsumed > 2147483647L) { throw new IOException("Too many bytes before delimiter: " + bytesConsumed); } else { return (int)bytesConsumed; } }
上面重要就是读取数据的过程了,过程过于长,抓住关键的看,其实就是将读取的一行数据追加到 this.value中。
9、总结
至此,map的整个输入流程涉及到两个重要的类InputFormat -- 处理原始数据并切片;创建RecordReader 对象RecordReader -- 读取切片中的数据,处理成KV,传递KV给map方法处理
这两个都是抽象类:
public abstract class RecordReader
public abstract class InputFormat
当我们想自定义inputformat类和recordreader类时,就需要继承这两个类,并实现其中的方法。
版权声明:本文内容由网络用户投稿,版权归原作者所有,本站不拥有其著作权,亦不承担相应法律责任。如果您发现本站中有涉嫌抄袭或描述失实的内容,请联系我们jiasou666@gmail.com 处理,核实后本网站将在24小时内删除侵权内容。
发表评论
暂时没有评论,来抢沙发吧~