c语言sscanf函数的用法是什么
201
2023-07-12
MapTask阶段shuffle源码分析
1. 收集阶段
在Mapper中,调用context.write(key,value)实际是调用代理NewOutPutCollector的wirte方法
public void write(KEYOUT key, VALUEOUT value
) throws IOException, InterruptedException {
output.write(key, value);
}
实际调用的是MapOutPutBuffer的collect(),在进行收集前,调用partitioner来计算每个key-value的分区号
@Override
public void write(K key, V value) throws IOException, InterruptedException {
collector.collect(key, value,
partitioner.getPartition(key, value, partitions));
}
2. NewOutPutCollector对象的创建
@SuppressWarnings("unchecked")
NewOutputCollector(org.apache.hadoop.mapreduce.JobContext jobContext,
JobConf job,
TaskUmbilicalProtocol umbilical,
TaskReporter reporter
) throws IOException, ClassNotFoundException {
// 创建实际用来收集key-value的缓存区对象
collector = createSortingCollector(job, reporter);
// 获取总的分区个数
partitions = jobContext.getNumReduceTasks();
if (partitions > 1) {
partitioner = (org.apache.hadoop.mapreduce.Partitioner
ReflectionUtils.newInstance(jobContext.getPartitionerClass(), job);
} else {
// 默认情况,直接创建一个匿名内部类,所有的key-value都分配到0号分区
partitioner = new org.apache.hadoop.mapreduce.Partitioner
@Override
public int getPartition(K key, V value, int numPartitions) {
return partitions - 1;
}
};
}
}
3. 创建环形缓冲区对象
@SuppressWarnings("unchecked")
private
createSortingCollector(JobConf job, TaskReporter reporter)
throws IOException, ClassNotFoundException {
MapOutputCollector.Context context =
new MapOutputCollector.Context(this, job, reporter);
// 从当前Job的配置中,获取mapreduce.job.map.output.collector.class,如果没有设置,使用MapOutputBuffer.class
Class>[] collectorClasses = job.getClasses(
JobContext.MAP_OUTPUT_COLLECTOR_CLASS_ATTR, MapOutputBuffer.class);
int remainingCollectors = collectorClasses.length;
Exception lastException = null;
for (Class clazz : collectorClasses) {
try {
if (!MapOutputCollector.class.isAssignableFrom(clazz)) {
throw new IOException("Invalid output collector class: " + clazz.getName() +
" (does not implement MapOutputCollector)");
}
Class extends MapOutputCollector> subclazz =
clazz.asSubclass(MapOutputCollector.class);
LOG.debug("Trying map output collector class: " + subclazz.getName());
// 创建缓冲区对象
MapOutputCollector
ReflectionUtils.newInstance(subclazz, job);
// 创建完缓冲区对象后,执行初始化
collector.init(context);
LOG.info("Map output collector class = " + collector.getClass().getName());
return collector;
} catch (Exception e) {
String msg = "Unable to initialize MapOutputCollector " + clazz.getName();
if (--remainingCollectors > 0) {
msg += " (" + remainingCollectors + " more collector(s) to try)";
}
lastException = e;
LOG.warn(msg, e);
}
}
throw new IOException("Initialization of all the collectors failed. " +
"Error in last collector was :" + lastException.getMessage(), lastException);
}
3. MapOutPutBuffer的初始化  mfNuug; 环形缓冲区对象
@SuppressWarnings("unchecked")
public void init(MapOutputCollector.Context context
) throws IOException, ClassNotFoundException {
job = context.getJobConf();
reporter = context.getReporter();
mapTask = context.getMapTask();
mapOutputFile = mapTask.getMapOutputFile();
sortPhase = mapTask.getSortPhase();
spilledRecordsCounter = reporter.getCounter(TaskCounter.SPILLED_RECORDS);
// 获取分区总个数,取决于ReduceTask的数量
partitions = job.getNumReduceTasks();
rfs = ((LocalFileSystem)FileSystem.getLocal(job)).getRaw();
//sanity checks
// 从当前配置中,获取mapreduce.map.sort.spill.percent,如果没有设置,就是0.8
final float spillper =
job.getFloat(JobContext.MAP_SORT_SPILL_PERCENT, (float)0.8);
// 获取mapreduce.task.io.sort.mb,如果没设置,就是100MB
final int sortmb = job.getInt(JobContext.IO_SORT_MB, 100);
indexCacheMemoryLimit = job.getInt(JobContext.INDEX_CACHE_MEMORY_LIMIT,
INDEX_CACHE_MEMORY_LIMIT_DEFAULT);
if (spillper > (float)1.0 || spillper <= (float)0.0) {
throw new IOException("Invalid \"" + JobContext.MAP_SORT_SPILL_PERCENT +
"\": " + spillper);
}
if ((sortmb & 0x7FF) != sortmb) {
throw new IOException(
"Invalid \"" + JobContext.IO_SORT_MB + "\": " + sortmb);
}
// 在溢写前,对key-value排序,采用的排序器,使用快速排序,只排索引
sorter = ReflectionUtils.newInstance(job.getClass("map.sort.class",
QuickSort.class, IndexedSorter.class), job);
// buffers and accounting
int maxMemUsage = sortmb << 20;
maxMemUsage -= maxMemUsage % METASIZE;
// 存放key-value
kvbuffer = new byte[maxMemUsage];
bufvoid = kvbuffer.length;
// 存储key-value的属性信息,分区号,索引等
kvmeta = ByteBuffer.wrap(kvbuffer)
.order(ByteOrder.nativeOrder())
.asIntBuffer();mfNuug
setEquator(0);
bufstart = bufend = bufindex = equator;
kvstart = kvend = kvindex;
maxRec = kvmeta.capacity() / NMETA;
softLimit = (int)(kvbuffer.length * spillper);
bufferRemaining = softLimit;
LOG.info(JobContext.IO_SORT_MB + ": " + sortmb);
LOG.info("soft limit at " + softLimit);
LOG.info("bufstart = " + bufstart + "; bufvoid = " + bufvoid);
LOG.info("kvstart = " + kvstart + "; length = " + maxRec);
// k/v serialization
// 获取快速排序的Key的比较器,排序只按照key进行排序!
comparator = job.getOutputKeyComparator();
// 获取key-value的序列化器
keyClass = (Class
valClass = (Class
serializationFactory = new SerializationFactory(job);
keySerializer = serializationFactory.getSerializer(keyClass);
keySerializer.open(bb);
valSerializer = serializationFactory.getSerializer(valClass);
valSerializer.open(bb);
// output counters
mapOutputByteCounter = reporter.getCounter(TaskCounter.MAP_OUTPUT_BYTES);
mapOutputRecordCounter =
reporter.getCounter(TaskCounter.MAP_OUTPUT_RECORDS);
fileOutputmfNuugByteCounter = reporter
.getCounter(TaskCounter.MAP_OUTPUT_MATERIALIZED_BYTES);
// 溢写到磁盘,可以使用一个压缩格式! 获取指定的压缩编解码器
// compression
if (job.getCompressMapOutput()) {
Class extends CompressionCodec> codecClass =
job.getMapOutputCompressorClass(DefaultCodec.class);
codec = ReflectionUtils.newInstance(codecClass, job);
} else {
codec = null;
}
// 获取Combiner组件
// combiner
final Counters.Counter combineInputCounter =
reporter.getCounter(TaskCounter.COMBINE_INPUT_RECORDS);
combinerRunner = CombinerRunner.create(job, getTaskID(),
combineInputCounter,
reporter, null);
if (combinerRunner != null) {
final Counters.Counter combineOutputCounter =
reporter.getCounter(TaskCounter.COMBINE_OUTPUT_RECORDS);
combineCollector= new CombineOutputCollector
} else {
combineCollector = null;
}
spillInProgress = false;
minSpillsForCombine = job.getInt(JobContext.MAP_COMBINE_MIN_SPILLS, 3);
// 设置溢写线程在后台运行,溢写是在后台运行另外一个溢写线程!和收集是两个线程!
spillThread.setDaemon(true);
spillThread.setName("SpillThread");
spillLock.lock();
try {
// 启动线程
spillThread.start();
while (!spillThreadRunning) {
spillDone.await();
}
} catch (InterruptedException e) {
throw new IOException("Spill thread failed to initialize", e);
} finally {
spillLock.unlock();
}
if (sortSpillException != null) {
throw new IOException("Spill thread failed to initialize",
sortSpillException);
}
}
4. Paritionner的获取
从配置中读取mapreduce.job.partitioner.class,如果没有指定,采用HashPartitioner.class
如果reduceTask > 1, 还没有设置分区组件,使用HashPartitioner
@SuppressWarnings("unchecked")
public Class extends Partitioner,?>> getPartitionerClass()
throws ClassNotFoundException {
return (Class extends Partitioner,?>>)
conf.getClass(PARTITIONER_CLASS_ATTR, HashPartitioner.class);
}
public class HashPartitioner
/** Use {@link Object#hashCode()} to partition. **/
public int getPartition(K key, V value,
int numReduceTasks) {
return (key.hashCode() & Integer.MAX_VALUE) % numReduceTasks;
}
}
分区号的限制:0 <= 分区号 < 总的分区数(reduceTask的个数)
if (partition < 0 || partition >= partitions) {
throw new IOException("Illegal partition for " + key + " (" +
partition + ")");
}
5.MapTask shuffle的流程
①在map()调用context.write()
②调用MapoutPutBuffer的collect()
调用分区组件Partitionner计算当前这组key-value的分区号
③将当前key-value收集到MapOutPutBuffer中
如果超过溢写的阀值,在后台启动溢写线程,来进行溢写!
④溢写前,先根据分区号,将相同分区号的key-value,采用快速排序算法,进行排序!
排序并不在内存中移动key-value,而是记录排序后key-value的有序索引!
⑤ 开始溢写,按照排序后有序的索引,将文件写入到一个临时的溢写文件中
如果没有定义Combiner,直接溢写!
如果定义了Combiner,使用CombinerRunner.conbine()对key-value处理后再次溢写!
⑥多次溢写后,每次溢写都会产生一个临时文件
⑦最后,执行一次flush(),将剩余的key-value进行溢写
⑧MergeParts: 将多次溢写的结果,保存为一个总的文件!
在合并为一个总的文件前,会执行归并排序,保证合并后的文件,各个分区也是有序的!
如果定义了Conbiner,Conbiner会再次运行(前提是溢写的文件个数大于3)!
否则,就直接溢写!
⑨最终保证生成一个最终的文件,这个文件根据总区号,分为若干部分,每个部分的key-value都已经排好序,等待ReduceTask来拷贝相应分区的数据
6. Combiner
combiner其实就是Reducer类型:
Class extends Reducer
(Class extends Reducer
Combiner的运行时机:
MapTask:
①每次溢写前,如果指定了Combiner,会运行
②将多个溢写片段,进行合并为一个最终的文件时,也会运行Combiner,前提是片段数>=3
ReduceTask:
③reduceTask在运行时,需要启动shuffle进程拷贝MapTask产生的数据!
数据在copy后,进入shuffle工作的内存,在内存中进行merge和sort!
数据过多,内部不够,将部分数据溢写在磁盘!
如果有溢写的过程,那么combiner会再次运行!
①一定会运行,②,③需要条件!
总结
以上就是这篇文章的全部内容了,希望本文的内容对大家的学习或者工作具有一定的参考学习价值,谢谢大家对我们的支持。如果你想了解更多相关内容请查看下面相关链接
版权声明:本文内容由网络用户投稿,版权归原作者所有,本站不拥有其著作权,亦不承担相应法律责任。如果您发现本站中有涉嫌抄袭或描述失实的内容,请联系我们jiasou666@gmail.com 处理,核实后本网站将在24小时内删除侵权内容。
发表评论
暂时没有评论,来抢沙发吧~