#yyds干货盘点# The split logic in Hadoop: a walkthrough of the FileInputFormat.getSplits() source



Honestly, although this is billed as a source-code walkthrough, I can't really claim that much skill: I just watched a video and typed comments into the source as I followed along. In Hadoop, when a MapReduce job runs, the input files are cut into splits, and FileInputFormat.getSplits() is the method that produces those splits. All of the code below is the original source; the annotations are the comments I typed in while following along. For context, a small driver sketch comes first (see below), followed by the package declaration, class declaration, and imports.
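This sketch is my own minimal example, not part of the original source: a bare-bones driver that wires up a FileInputFormat subclass (TextInputFormat). The input/output paths are placeholders and the mapper/reducer are left at their defaults; the point is only that, when the job is submitted, the configured input format's getSplits() is what decides how many map tasks run.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class SplitDemoDriver {
  public static void main(String[] args) throws Exception {
    Job job = Job.getInstance(new Configuration(), "split-demo");
    job.setJarByClass(SplitDemoDriver.class);

    // TextInputFormat extends FileInputFormat, so it inherits the
    // getSplits(JobContext) implementation walked through below.
    job.setInputFormatClass(TextInputFormat.class);
    FileInputFormat.addInputPath(job, new Path("/input"));      // placeholder path
    FileOutputFormat.setOutputPath(job, new Path("/output"));   // placeholder path

    // Mapper/Reducer deliberately omitted (the identity defaults apply);
    // this sketch only shows where the input format is configured.
    System.exit(job.waitForCompletion(true) ? 0 : 1);
  }
}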

/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.mapreduce.lib.input;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.TimeUnit;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.LocatedFileStatus;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.PathFilter;
import org.apache.hadoop.fs.BlockLocation;
import org.apache.hadoop.fs.RemoteIterator;
import org.apache.hadoop.mapred.LocatedFileStatusFetcher;
import org.apache.hadoop.mapred.SplitLocationInfo;
import org.apache.hadoop.mapreduce.InputFormat;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.security.TokenCache;
import org.apache.hadoop.util.ReflectionUtils;
import org.apache.hadoop.util.StopWatch;
import org.apache.hadoop.util.StringUtils;

import com.google.common.collect.Lists;

/**
 * A base class for file-based {@link InputFormat}s.
 *
 * FileInputFormat is the base class for all file-based
 * InputFormats. This provides a generic implementation of
 * {@link #getSplits(JobContext)}.
 * Subclasses of FileInputFormat can also override the
 * {@link #isSplitable(JobContext, Path)} method to ensure input-files are
 * not split-up and are processed as a whole by {@link Mapper}s.
 */
@InterfaceAudience.Public
@InterfaceStability.Stable
public abstract class FileInputFormat<K, V> extends InputFormat<K, V> {
  ....
}
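The javadoc above mentions isSplitable(JobContext, Path): a subclass can override it to force each file to be processed whole by a single Mapper. A minimal sketch of that pattern, with a class name (WholeFileTextInputFormat) I made up for illustration:

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;

// Returning false here makes getSplits() (shown next) take its
// "not splitable" branch, so every file becomes exactly one split.
public class WholeFileTextInputFormat extends TextInputFormat {
  @Override
  protected boolean isSplitable(JobContext context, Path file) {
    return false;
  }
}

FileInputFormat itself defaults this method to true; TextInputFormat already returns false for files compressed with a codec that cannot be split.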

/**
 * Generate the list of files and make them into FileSplits.
 * @param job the job context
 * @throws IOException
 */
public List<InputSplit> getSplits(JobContext job) throws IOException {
  // Works like a stopwatch: start timing
  StopWatch sw = new StopWatch().start();
  // minSize defaults to 1, maxSize defaults to Long.MAX_VALUE
  long minSize = Math.max(getFormatMinSplitSize(), getMinSplitSize(job));
  long maxSize = getMaxSplitSize(job);

  // generate splits
  // A list that collects the splits as they are produced
  List<InputSplit> splits = new ArrayList<InputSplit>();
  // Get the input files; if the input path is a directory, this is every file
  // under it, which is why it is a List
  List<FileStatus> files = listStatus(job);
  for (FileStatus file: files) {
    // The file's path
    Path path = file.getPath();
    // The file's length in bytes
    long length = file.getLen();
    if (length != 0) {
      // The locations of the file's blocks
      BlockLocation[] blkLocations;
      // If the FileStatus already carries its block locations (LocatedFileStatus),
      // use them directly; otherwise ask the file system (e.g. HDFS) for them
      if (file instanceof LocatedFileStatus) {
        blkLocations = ((LocatedFileStatus) file).getBlockLocations();
      } else {
        FileSystem fs = path.getFileSystem(job.getConfiguration());
        blkLocations = fs.getFileBlockLocations(file, 0, length);
      }
      // Check whether the file can be split; if not, the whole file is handled
      // as a single split
      if (isSplitable(job, path)) {
        // The block size comes from the file's metadata
        long blockSize = file.getBlockSize();
        /**
         * The source of computeSplitSize is:
         *   protected long computeSplitSize(long blockSize, long minSize,
         *                                   long maxSize) {
         *     return Math.max(minSize, Math.min(maxSize, blockSize));
         *   }
         * To make splits larger, raise minSize; to make them smaller, lower maxSize:
         *   FileInputFormat.setMaxInputSplitSize()
         *   FileInputFormat.setMinInputSplitSize()
         */
        long splitSize = computeSplitSize(blockSize, minSize, maxSize);

        // length is the file size: long length = file.getLen();
        long bytesRemaining = length;
        // private static final double SPLIT_SLOP = 1.1;   // 10% slop
        // Keep cutting while (remaining bytes / splitSize) > 1.1; once it drops to
        // 1.1 or below, whatever is left (if non-zero) becomes the last split
        while (((double) bytesRemaining)/splitSize > SPLIT_SLOP) {
          int blkIndex = getBlockIndex(blkLocations, length-bytesRemaining);
          splits.add(makeSplit(path, length-bytesRemaining, splitSize,
                      blkLocations[blkIndex].getHosts(),
                      blkLocations[blkIndex].getCachedHosts()));
          bytesRemaining -= splitSize;
        }

        // If any bytes are left over, they form one final split
        if (bytesRemaining != 0) {
          int blkIndex = getBlockIndex(blkLocations, length-bytesRemaining);
          splits.add(makeSplit(path, length-bytesRemaining, bytesRemaining,
                     blkLocations[blkIndex].getHosts(),
                     blkLocations[blkIndex].getCachedHosts()));
        }
      } else { // not splitable
        // A non-splittable file is handed out as a single split
        splits.add(makeSplit(path, 0, length, blkLocations[0].getHosts(),
                    blkLocations[0].getCachedHosts()));
      }
    } else {
      // A zero-length file still becomes one (empty) split
      //Create empty hosts array for zero length files
      splits.add(makeSplit(path, 0, length, new String[0]));
    }
  }
  // Save the number of input files for metrics/loadgen
  job.getConfiguration().setLong(NUM_INPUT_FILES, files.size());
  // Stop the stopwatch
  sw.stop();
  if (LOG.isDebugEnabled()) {
    LOG.debug("Total # of splits generated by getSplits: " + splits.size()
        + ", TimeTaken: " + sw.now(TimeUnit.MILLISECONDS));
  }
  return splits;
}
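To make the 1.1 slop in the while loop above concrete, here is a small standalone sketch; it is plain Java repeating the same arithmetic, not Hadoop code, and the 260 MB file size and 128 MB split size are numbers I assumed for illustration.

public class SplitSlopDemo {
  private static final double SPLIT_SLOP = 1.1;  // same 10% slop as getSplits()

  public static void main(String[] args) {
    long splitSize = 128L * 1024 * 1024;         // assume splitSize == blockSize == 128 MB
    long bytesRemaining = 260L * 1024 * 1024;    // assumed file size: 260 MB

    int splits = 0;
    // Mirrors the loop in getSplits(): cut full-sized splits while the
    // remainder is more than 1.1 times the split size.
    while (((double) bytesRemaining) / splitSize > SPLIT_SLOP) {
      splits++;
      bytesRemaining -= splitSize;
    }
    // The tail (132 MB here, only ~3% over one split) becomes a single
    // final split rather than being cut into 128 MB + 4 MB.
    if (bytesRemaining != 0) {
      splits++;
    }
    System.out.println("splits = " + splits);    // prints: splits = 2
  }
}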

Here is the source of the computeSplitSize() method:

protected long computeSplitSize(long blockSize, long minSize, long maxSize) {
  return Math.max(minSize, Math.min(maxSize, blockSize));
}
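A quick worked example of that formula (the 128 MB block size and the adjusted values below are assumptions for illustration): with the defaults minSize = 1 and maxSize = Long.MAX_VALUE, the split size is simply the block size; lowering maxSize below the block size shrinks the splits, and raising minSize above it enlarges them.

public class ComputeSplitSizeDemo {
  // Same formula as FileInputFormat.computeSplitSize()
  static long computeSplitSize(long blockSize, long minSize, long maxSize) {
    return Math.max(minSize, Math.min(maxSize, blockSize));
  }

  public static void main(String[] args) {
    long blockSize = 128L * 1024 * 1024;  // assumed 128 MB block size

    // Defaults: minSize = 1, maxSize = Long.MAX_VALUE  ->  128 MB (== blockSize)
    System.out.println(computeSplitSize(blockSize, 1L, Long.MAX_VALUE));

    // Shrink splits: maxSize = 64 MB  ->  64 MB
    System.out.println(computeSplitSize(blockSize, 1L, 64L * 1024 * 1024));

    // Enlarge splits: minSize = 256 MB  ->  256 MB
    System.out.println(computeSplitSize(blockSize, 256L * 1024 * 1024, Long.MAX_VALUE));
  }
}

In a driver these values would be set with FileInputFormat.setMinInputSplitSize(job, ...) / setMaxInputSplitSize(job, ...), or through the mapreduce.input.fileinputformat.split.minsize and mapreduce.input.fileinputformat.split.maxsize properties.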

That's all for now~
