VIII. MapReduce -- Job Submission Source Code Analysis
I. Source Code Analysis
1. The entry point for submitting a job
Job submission and execution are driven by job.waitForCompletion(true), so the source code analysis below starts from this method.
//----------------- Job.java
public boolean waitForCompletion(boolean verbose) throws IOException, InterruptedException, ClassNotFoundException {
    // If the job is still in the DEFINE state (not yet running), submit it
    if (this.state == Job.JobState.DEFINE) {
        this.submit();
    }

    if (verbose) {
        // Monitor the job and print its progress
        this.monitorAndPrintJob();
    } else {
        int completionPollIntervalMillis = getCompletionPollInterval(this.cluster.getConf());

        while(!this.isComplete()) {
            try {
                Thread.sleep((long)completionPollIntervalMillis);
            } catch (InterruptedException var4) {
            }
        }
    }

    return this.isSuccessful();
}
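For context, this call normally sits at the end of a user driver. Below is a minimal, self-contained sketch of such a driver; the class name and the input/output paths are illustrative, not part of the analyzed source:

//------------------ MinimalDriver.java (hypothetical demo)
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class MinimalDriver {
    public static void main(String[] args) throws Exception {
        Job job = Job.getInstance(new Configuration(), "minimal-job");
        job.setJarByClass(MinimalDriver.class);
        // No mapper/reducer set: Hadoop falls back to the identity Mapper/Reducer,
        // so the types below match TextInputFormat's (LongWritable, Text) records
        job.setOutputKeyClass(LongWritable.class);
        job.setOutputValueClass(Text.class);
        FileInputFormat.addInputPath(job, new Path("/data/input"));    // hypothetical input path
        FileOutputFormat.setOutputPath(job, new Path("/data/output")); // hypothetical output path
        // The call analyzed in this article: submits the job and blocks until it
        // finishes, printing progress because verbose == true
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}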
2. this.submit(): submitting the job
//----------------- Job.java
public void submit() throws IOException, InterruptedException, ClassNotFoundException {
    // Make sure the job is still in the DEFINE state (not yet running)
    this.ensureState(Job.JobState.DEFINE);
    // Use the new API
    this.setUseNewAPI();
    // Mainly initializes the client inside the Cluster object, used to communicate
    // with the cluster; there are two kinds, a YARN client and a local client
    this.connect();
    // Obtain a job submitter from the Cluster, passing the file system that stores
    // the job's files and the client as parameters
    final JobSubmitter submitter = this.getJobSubmitter(this.cluster.getFileSystem(), this.cluster.getClient());
    // Submit the job and run it (the tail of this listing was cut off in the
    // original post; it is restored here from the decompiled Hadoop source)
    this.status = (JobStatus)this.ugi.doAs(new PrivilegedExceptionAction<JobStatus>() {
        public JobStatus run() throws IOException, InterruptedException, ClassNotFoundException {
            return submitter.submitJobInternal(Job.this, Job.this.cluster);
        }
    });
    this.state = Job.JobState.RUNNING;
}
Three important methods are involved here:
- this.connect(): mainly initializes the client used to submit the job
- this.getJobSubmitter(): wraps the job submission APIs in a JobSubmitter
- submitter.submitJobInternal(Job.this, Job.this.cluster): submits the job and runs it
Let's look at what each of these three methods does in detail.
3. this.connect(): initializing the client
//----------------- Job.java
// (the tail of this listing was cut off in the original post; it is restored here
// from the decompiled Hadoop source)
private synchronized void connect() throws IOException, InterruptedException, ClassNotFoundException {
    // Create the Cluster connection object, used to talk to the cluster; it offers many APIs
    if (this.cluster == null) {
        this.cluster = (Cluster)this.ugi.doAs(new PrivilegedExceptionAction<Cluster>() {
            public Cluster run() throws IOException, InterruptedException, ClassNotFoundException {
                return new Cluster(Job.this.getConfiguration());
            }
        });
    }
}
The most important thing this code does is create a Cluster object, so let's look at that class's constructor.
//---------------------------- Cluster.java
public Cluster(Configuration conf) throws IOException {
    this((InetSocketAddress)null, conf);
}

public Cluster(InetSocketAddress jobTrackAddr, Configuration conf) throws IOException {
    this.fs = null;
    this.sysDir = null;
    // Staging directory for jobs
    this.stagingAreaDir = null;
    this.jobHistoryDir = null;
    // Providers of the client/server communication protocol
    this.providerList = null;
    // Save the job configuration
    this.conf = conf;
    // Get the current user
    this.ugi = UserGroupInformation.getCurrentUser();
    // Initialize the job submission client
    this.initialize(jobTrackAddr, conf);
}

// This is where the client is initialized; the goal is to obtain this.client
private void initialize(InetSocketAddress jobTrackAddr, Configuration conf) throws IOException {
    this.initProviderList();
    Iterator i$ = this.providerList.iterator();

    while(i$.hasNext()) {
        /*
         * There are two kinds of provider, YarnClientProtocolProvider and
         * LocalClientProtocolProvider, i.e. YARN and local.
         */
        ClientProtocolProvider provider = (ClientProtocolProvider)i$.next();
        LOG.debug("Trying ClientProtocolProvider : " + provider.getClass().getName());
        ClientProtocol clientProtocol = null;

        try {
            /*
             * Check whether jobTrackAddr is null, i.e. whether the job runs against a
             * remote cluster or locally.
             * For a remote cluster, a YARN submitter (YARNRunner) is created via
             * YarnClientProtocolProvider; for local execution, a local submitter
             * (LocalJobRunner) is created via LocalClientProtocolProvider.
             * Which runner gets created depends on whether mapreduce.framework.name
             * in conf is set to "local" or "yarn".
             */
            if (jobTrackAddr == null) {
                clientProtocol = provider.create(conf);
            } else {
                clientProtocol = provider.create(jobTrackAddr, conf);
            }

            if (clientProtocol != null) {
                this.clientProtocolProvider = provider;
                // The client is the one created by the provider above
                this.client = clientProtocol;
                LOG.debug("Picked " + provider.getClass().getName() + " as the ClientProtocolProvider");
                // Exit as soon as a client and a provider have been created successfully
                break;
            }

            LOG.debug("Cannot pick " + provider.getClass().getName() + " as the ClientProtocolProvider - returned null protocol");
        } catch (Exception var7) {
            LOG.info("Failed to use " + provider.getClass().getName() + " due to error: ", var7);
        }
    }

    if (null == this.clientProtocolProvider || null == this.client) {
        throw new IOException("Cannot initialize Cluster. Please check your configuration for mapreduce.framework.name and the correspond server addresses.");
    }
}
So the Cluster constructor mainly initializes two fields, clientProtocolProvider and client, where client is created via provider.create().
Next, look at ClientProtocolProvider and ClientProtocol (the former is an abstract class, the latter an interface) and their implementations:
ClientProtocolProvider: YarnClientProtocolProvider, LocalClientProtocolProvider
ClientProtocol: YARNRunner, LocalJobRunner
Here are the create() methods of LocalClientProtocolProvider and YarnClientProtocolProvider:
public class LocalClientProtocolProvider extends ClientProtocolProvider {
    // ...
    public ClientProtocol create(Configuration conf) throws IOException {
        String framework = conf.get("mapreduce.framework.name", "local");
        if (!"local".equals(framework)) {
            return null;
        } else {
            conf.setInt("mapreduce.job.maps", 1);
            // Create a LocalJobRunner
            return new LocalJobRunner(conf);
        }
    }
    // ...
}

//===============================================================

public class YarnClientProtocolProvider extends ClientProtocolProvider {
    // ...
    public ClientProtocol create(Configuration conf) throws IOException {
        // Create a YARNRunner
        return "yarn".equals(conf.get("mapreduce.framework.name")) ? new YARNRunner(conf) : null;
    }
    // ...
}
In short, there are two providers, YarnClientProtocolProvider and LocalClientProtocolProvider, which create the client as a YARNRunner or a LocalJobRunner respectively; in other words, a job can run either on YARN or locally.
At this point the two fields of the Cluster object, this.client and this.clientProtocolProvider, are fully initialized.
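As a quick illustration of the selection logic above, here is a minimal sketch (the class name and the demo itself are mine, not from the analyzed source) showing how mapreduce.framework.name decides which provider wins:

//------------------ FrameworkSelection.java (hypothetical demo)
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.Job;

public class FrameworkSelection {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        // "local": LocalClientProtocolProvider returns a LocalJobRunner, while
        // YarnClientProtocolProvider returns null and is skipped.
        conf.set("mapreduce.framework.name", "local");
        // "yarn": the opposite, YarnClientProtocolProvider returns a YARNRunner.
        // conf.set("mapreduce.framework.name", "yarn");
        Job job = Job.getInstance(conf, "framework-selection-demo");
        // connect() will consult this value when job.waitForCompletion() is called
        System.out.println(job.getConfiguration().get("mapreduce.framework.name"));
    }
}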
4. this.getJobSubmitter(): creating the submitter
//------------------- Job.java
public JobSubmitter getJobSubmitter(FileSystem fs, ClientProtocol submitClient) throws IOException {
    return new JobSubmitter(fs, submitClient);
}
This just creates a JobSubmitter object; here is its constructor:
//------------------ JobSubmitter.java
JobSubmitter(FileSystem submitFs, ClientProtocol submitClient) throws IOException {
    this.submitClient = submitClient;
    this.jtFs = submitFs;
}
Nothing special so far: it just stores the file system and the client that the Cluster initialized above. Many methods of this class are called later on, though, as we'll see next.
5. submitter.submitJobInternal(): submitting the job
This method is the core of the whole job submission process, so read it carefully.
//------------------ JobSubmitter.java
JobStatus submitJobInternal(Job job, Cluster cluster) throws ClassNotFoundException, InterruptedException, IOException {
    // Check the configured output path; if it already exists, an exception is thrown
    this.checkSpecs(job);
    Configuration conf = job.getConfiguration();
    addMRFrameworkToDistributedCache(conf);
    // Get the staging directory shared by all jobs
    Path jobStagingArea = JobSubmissionFiles.getStagingDir(cluster, conf);
    // Get the local address
    InetAddress ip = InetAddress.getLocalHost();
    // Record the hostname and IP of the submitting host
    if (ip != null) {
        this.submitHostAddress = ip.getHostAddress();
        this.submitHostName = ip.getHostName();
        conf.set("mapreduce.job.submithostname", this.submitHostName);
        conf.set("mapreduce.job.submithostaddress", this.submitHostAddress);
    }

    // Ask the cluster for a new job id through the client; this submitClient is the
    // one the Cluster initialized earlier
    JobID jobId = this.submitClient.getNewJobID();
    job.setJobID(jobId);
    // The directory that will hold the job's resources: the job configuration file,
    // the split information files, the program jar, and so on
    Path submitJobDir = new Path(jobStagingArea, jobId.toString());
    JobStatus status = null;

    JobStatus var24;
    try {
        conf.set("mapreduce.job.user.name", UserGroupInformation.getCurrentUser().getShortUserName());
        // (this line was garbled in the original post; restored from the decompiled source)
        conf.set("hadoop.http.filter.initializers", "org.apache.hadoop.yarn.server.webproxy.amfilter.AmFilterInitializer");
        conf.set("mapreduce.job.dir", submitJobDir.toString());
        LOG.debug("Configuring job " + jobId + " with " + submitJobDir + " as the submit dir");
        // Obtain tokens authorizing access to the relevant NameNode directories
        TokenCache.obtainTokensForNamenodes(job.getCredentials(), new Path[]{submitJobDir}, conf);
        this.populateTokenCache(conf, job.getCredentials());
        // Generate a shuffle secret key if none is present yet
        if (TokenCache.getShuffleSecretKey(job.getCredentials()) == null) {
            KeyGenerator keyGen;
            try {
                keyGen = KeyGenerator.getInstance("HmacSHA1");
                keyGen.init(64);
            } catch (NoSuchAlgorithmException var19) {
                throw new IOException("Error generating shuffle secret key", var19);
            }

            SecretKey shuffleKey = keyGen.generateKey();
            TokenCache.setShuffleSecretKey(shuffleKey.getEncoded(), job.getCredentials());
        }

        if (CryptoUtils.isEncryptedSpillEnabled(conf)) {
            conf.setInt("mapreduce.am.max-attempts", 1);
            LOG.warn("Max job attempts set to 1 since encrypted intermediate data spill is enabled");
        }

        // Copy the job's temporary files and the jar to be run into submitJobDir
        this.copyAndConfigureFiles(job, submitJobDir);
        // Path of the job configuration file, conventionally submitJobDir/job.xml
        Path submitJobFile = JobSubmissionFiles.getJobConfPath(submitJobDir);
        LOG.debug("Creating splits at " + this.jtFs.makeQualified(submitJobDir));
        // Write the split information into submitJobDir and get back the number of
        // splits. InputFormat.getSplits() is called to compute the planned splits;
        // the split data is written to submitJobDir/job.split and the per-entry meta
        // info to submitJobDir/job.splitmetainfo
        int maps = this.writeSplits(job, submitJobDir);
        conf.setInt("mapreduce.job.maps", maps);
        LOG.info("number of splits:" + maps);
        // Queue the job is submitted to
        String queue = conf.get("mapreduce.job.queuename", "default");
        // submitClient is simply the Cluster's client
        AccessControlList acl = this.submitClient.getQueueAdmins(queue);
        conf.set(QueueManager.toFullPropertyName(queue, QueueACL.ADMINISTER_JOBS.getAclName()), acl.getAclString());
        TokenCache.cleanUpTokenReferral(conf);
        if (conf.getBoolean("mapreduce.job.token.tracking.ids.enabled", false)) {
            ArrayList<String> trackingIds = new ArrayList<String>();
            // (the original post's listing is cut off at this point; the remainder is
            // restored, slightly abridged, from the decompiled source)
            Iterator i$ = job.getCredentials().getAllTokens().iterator();
            while (i$.hasNext()) {
                Token<? extends TokenIdentifier> t = (Token)i$.next();
                trackingIds.add(t.decodeIdentifier().getTrackingId());
            }
            conf.setStrings("mapreduce.job.token.tracking.ids", (String[])trackingIds.toArray(new String[trackingIds.size()]));
        }

        // Write the job configuration to submitJobDir/job.xml
        this.writeConf(conf, submitJobFile);
        this.printTokens(jobId, job.getCredentials());
        // Actually submit the job and get back its status
        status = this.submitClient.submitJob(jobId, submitJobDir.toString(), job.getCredentials());
        if (status == null) {
            throw new IOException("Could not launch job");
        }

        var24 = status;
    } finally {
        // If submission failed, clean up the staging directory
        if (status == null) {
            LOG.info("Cleaning up the staging area " + submitJobDir);
            if (this.jtFs != null && submitJobDir != null) {
                this.jtFs.delete(submitJobDir, true);
            }
        }
    }

    return var24;
}
To summarize the main flow above:
(1) Path jobStagingArea = JobSubmissionFiles.getStagingDir(cluster, conf);
Gets the staging directory shared by all jobs.
(2) JobID jobId = this.submitClient.getNewJobID(); job.setJobID(jobId);
Asks the cluster for a job id through the client and saves it into the job's configuration.
(3) Path submitJobDir = new Path(jobStagingArea, jobId.toString());
Builds the job's own submit directory, named after the job id.
(4) this.copyAndConfigureFiles(job, submitJobDir);
Copies the job's temporary files and the jar to be run into submitJobDir.
(5) Path submitJobFile = JobSubmissionFiles.getJobConfPath(submitJobDir);
Gets the path of the job configuration file, submitJobDir/job.xml.
(6) int maps = this.writeSplits(job, submitJobDir);
Writes the split information into submitJobDir and returns the number of splits. InputFormat.getSplits() computes the planned splits; the split data goes to submitJobDir/job.split and the per-entry meta info to submitJobDir/job.splitmetainfo.
(7) this.writeConf(conf, submitJobFile);
Writes the job configuration to submitJobDir/job.xml.
(8) status = this.submitClient.submitJob(jobId, submitJobDir.toString(), job.getCredentials());
Actually submits the job and gets its submission status.
Below we pick the more complex of these steps and look at their implementations.
The key part is how the job's resources are generated, for example the split files.
=================================================================
(1) this.copyAndConfigureFiles(job, submitJobDir);
Copies the job's temporary files and the jar to be run into submitJobDir.
//------------------ JobSubmitter.java
private void copyAndConfigureFiles(Job job, Path jobSubmitDir) throws IOException {
    JobResourceUploader rUploader = new JobResourceUploader(this.jtFs);
    rUploader.uploadFiles(job, jobSubmitDir);
    job.getWorkingDirectory();
}

//---------------------- JobResourceUploader.java
public void uploadFiles(Job job, Path submitJobDir) throws IOException {
    // ...
    String files = conf.get("tmpfiles");
    String libjars = conf.get("tmpjars");
    String archives = conf.get("tmparchives");
    String jobJar = job.getJar();
    // ... the method is long, so only an excerpt is shown; these are the kinds of
    // files copied into the job directory
}
So this step mainly copies the jar and the related files into the job's submit directory.
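As a side note, here is a minimal sketch (the class name and jar path are hypothetical, not from the analyzed source) of how the tmpfiles/tmpjars/tmparchives properties read above are usually populated, namely by GenericOptionsParser's -files/-libjars/-archives options:

//------------------ UploadConfDemo.java (hypothetical demo)
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.util.GenericOptionsParser;

public class UploadConfDemo {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        // Equivalent to passing "-libjars /tmp/deps/my-udf.jar" on the command line;
        // note the parser validates that the file actually exists locally
        String[] fakeArgs = {"-libjars", "/tmp/deps/my-udf.jar"};
        new GenericOptionsParser(conf, fakeArgs);
        // uploadFiles() later reads this property and copies the jar into submitJobDir
        System.out.println("tmpjars = " + conf.get("tmpjars"));
    }
}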
(2) Path submitJobFile = JobSubmissionFiles.getJobConfPath(submitJobDir);
Gets the path of the job configuration file, named submitJobDir/job.xml.
//----------------------------- JobSubmissionFiles.java
public static Path getJobConfPath(Path jobSubmitDir) {
    return new Path(jobSubmitDir, "job.xml");
}
(3) int maps = this.writeSplits(job, submitJobDir);
Writes the split information into submitJobDir and returns the number of splits. InputFormat.getSplits() is called to compute the planned splits; the split data is written to submitJobDir/job.split and the per-entry meta info to submitJobDir/job.splitmetainfo.
//------------------------ JobSubmitter.java
private int writeSplits(JobContext job, Path jobSubmitDir) throws IOException, InterruptedException, ClassNotFoundException {
    JobConf jConf = (JobConf)job.getConfiguration();
    int maps;
    if (jConf.getUseNewMapper()) {
        maps = this.writeNewSplits(job, jobSubmitDir);
    } else {
        maps = this.writeOldSplits(jConf, jobSubmitDir);
    }

    return maps;
}
Nothing special here; it just distinguishes between the new and the old API. Let's follow this.writeNewSplits:
//------------------------ JobSubmitter.java
// (this listing was truncated in the original post; restored from the decompiled source)
private <T extends InputSplit> int writeNewSplits(JobContext job, Path jobSubmitDir) throws IOException, InterruptedException, ClassNotFoundException {
    Configuration conf = job.getConfiguration();
    // Instantiate the configured InputFormat
    InputFormat<?, ?> input = (InputFormat)ReflectionUtils.newInstance(job.getInputFormatClass(), conf);
    // Ask the InputFormat for the planned splits
    List<InputSplit> splits = input.getSplits(job);
    T[] array = (T[])splits.toArray(new InputSplit[splits.size()]);
    // Sort the splits so the biggest go first
    Arrays.sort(array, new JobSubmitter.SplitComparator());
    // Write the split files into the submit directory
    JobSplitWriter.createSplitFiles(jobSubmitDir, conf, jobSubmitDir.getFileSystem(conf), array);
    return array.length;
}
It obtains the InputFormat object, gets the planned split information via InputFormat.getSplits(), and then creates the split files with JobSplitWriter.createSplitFiles(). Here is that last method:
//------------------ JobSplitWriter.java
// (this listing was truncated in the original post; restored from the decompiled source)
public static <T extends InputSplit> void createSplitFiles(Path jobSubmitDir, Configuration conf, FileSystem fs, T[] splits) throws IOException, InterruptedException {
    // Create jobSubmitDir/job.split and serialize the splits into it, collecting
    // the per-split meta info along the way
    FSDataOutputStream out = createFile(fs, JobSubmissionFiles.getJobSplitFile(jobSubmitDir), conf);
    SplitMetaInfo[] info = writeNewSplits(conf, splits, out);
    out.close();
    // Write the meta info entries to jobSubmitDir/job.splitmetainfo
    writeJobSplitMetaInfo(fs, JobSubmissionFiles.getJobSplitMetaFile(jobSubmitDir), new FsPermission(JobSubmissionFiles.JOB_FILE_PERMISSION), splitVersion, info);
}
Two key files are generated here:
jobSubmitDir/job.split: the split information file, recording each split's details such as its path, block locations, and offsets
jobSubmitDir/job.splitmetainfo: the index of the entries in the split file, e.g. each entry's start position and length within job.split
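Incidentally, you can call getSplits() yourself to preview what will end up in these files. A minimal sketch (the class name and input path are hypothetical):

//------------------ SplitPreview.java (hypothetical demo)
import java.util.List;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;

public class SplitPreview {
    public static void main(String[] args) throws Exception {
        Job job = Job.getInstance(new Configuration(), "split-preview");
        FileInputFormat.addInputPath(job, new Path("/data/input")); // hypothetical path
        // The same call writeNewSplits() makes on the submitter's behalf
        List<InputSplit> splits = new TextInputFormat().getSplits(job);
        for (InputSplit s : splits) {
            // Each split reports its length and preferred (block) locations
            System.out.println(s + " len=" + s.getLength());
        }
    }
}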
Now let's see how the two files are generated, starting with jobSubmitDir/job.split:
// (this listing was truncated in the original post; restored, slightly abridged,
// from the decompiled source)
private static <T extends InputSplit> SplitMetaInfo[] writeNewSplits(Configuration conf, T[] array, FSDataOutputStream out) throws IOException, InterruptedException {
    SplitMetaInfo[] info = new SplitMetaInfo[array.length];
    if (array.length != 0) {
        SerializationFactory factory = new SerializationFactory(conf);
        int i = 0;
        long offset = out.getPos();

        for (T split : array) {
            long prevCount = out.getPos();
            // Write the split's class name, then the serialized split itself
            Text.writeString(out, split.getClass().getName());
            Serializer<T> serializer = factory.getSerializer((Class<T>)split.getClass());
            serializer.open(out);
            serializer.serialize(split);
            long currCount = out.getPos();
            // Record where this entry starts inside job.split, together with the
            // split's locations and length; these entries later go to job.splitmetainfo
            info[i++] = new JobSplit.SplitMetaInfo(split.getLocations(), offset, split.getLength());
            offset += currCount - prevCount;
        }
    }

    return info;
}
This serializes each split entry into the file and builds the entries to be written to jobSubmitDir/job.splitmetainfo, i.e. the index of the split file. Next, writeJobSplitMetaInfo():
private static void writeJobSplitMetaInfo(FileSystem fs, Path filename, FsPermission p, int splitMetaInfoVersion, SplitMetaInfo[] allSplitMetaInfo) throws IOException {
    // Create an output stream for the split meta info file
    FSDataOutputStream out = FileSystem.create(fs, filename, p);
    out.write(JobSplit.META_SPLIT_FILE_HEADER);
    WritableUtils.writeVInt(out, splitMetaInfoVersion);
    WritableUtils.writeVInt(out, allSplitMetaInfo.length);
    SplitMetaInfo[] arr$ = allSplitMetaInfo;
    int len$ = allSplitMetaInfo.length;

    // Write the entries one by one
    for(int i$ = 0; i$ < len$; ++i$) {
        SplitMetaInfo splitMetaInfo = arr$[i$];
        splitMetaInfo.write(out);
    }

    out.close();
}
So this, quite clearly, writes the split file's index information to jobSubmitDir/job.splitmetainfo.
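For completeness, here is a sketch of the reverse operation (the class name and paths are hypothetical, and it assumes the staging directory still exists): roughly how the MR ApplicationMaster later reads the meta info back with SplitMetaInfoReader:

//------------------ ReadSplitMeta.java (hypothetical demo)
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.JobID;
import org.apache.hadoop.mapreduce.split.JobSplit;
import org.apache.hadoop.mapreduce.split.SplitMetaInfoReader;

public class ReadSplitMeta {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        FileSystem fs = FileSystem.get(conf);
        // Hypothetical staging sub-directory of a submitted job
        Path submitJobDir = new Path("/tmp/hadoop-yarn/staging/user/.staging/job_1700000000000_0001");
        JobID jobId = JobID.forName("job_1700000000000_0001");
        // Reads job.splitmetainfo; each entry points back into job.split
        JobSplit.TaskSplitMetaInfo[] metas = SplitMetaInfoReader.readSplitMetaInfo(jobId, fs, conf, submitJobDir);
        for (JobSplit.TaskSplitMetaInfo m : metas) {
            // Each entry carries the split's preferred hosts and its offset into job.split
            System.out.println(java.util.Arrays.toString(m.getLocations()) + " @ " + m.getStartOffset());
        }
    }
}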
(4) status = this.submitClient.submitJob(jobId, submitJobDir.toString(), job.getCredentials());
Actually submits the job and gets its submission status. The YARN implementation lives in YARNRunner:
//------------------ YARNRunner.java
public JobStatus submitJob(JobID jobId, String jobSubmitDir, Credentials ts) throws IOException, InterruptedException {
    this.addHistoryToken(ts);
    // Build the application submission context from the job configuration and the
    // HDFS path of the job's resource directory
    ApplicationSubmissionContext appContext = this.createApplicationSubmissionContext(this.conf, jobSubmitDir, ts);

    try {
        // Submit the application; an application id comes back
        ApplicationId applicationId = this.resMgrDelegate.submitApplication(appContext);
        // Look up the application report (the ApplicationMaster's state) by app id
        ApplicationReport appMaster = this.resMgrDelegate.getApplicationReport(applicationId);
        String diagnostics = appMaster == null ? "application report is null" : appMaster.getDiagnostics();
        if (appMaster != null && appMaster.getYarnApplicationState() != YarnApplicationState.FAILED && appMaster.getYarnApplicationState() != YarnApplicationState.KILLED) {
            return this.clientCache.getClient(jobId).getJobStatus(jobId);
        } else {
            throw new IOException("Failed to run job : " + diagnostics);
        }
    } catch (YarnException var8) {
        throw new IOException(var8);
    }
}
This submits the job to YARN as an application (YARN then launches an ApplicationMaster for it) and finally retrieves the job status.
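As a small usage note, once submission returns, the client side can keep polling the status, just as waitForCompletion() does internally. A hypothetical sketch (the class and method names are mine):

//------------------ JobMonitor.java (hypothetical demo)
import org.apache.hadoop.mapreduce.Job;

public class JobMonitor {
    // Mirrors what waitForCompletion(false) does: poll until the job finishes
    public static boolean waitQuietly(Job job) throws Exception {
        job.submit(); // the non-blocking submission path analyzed above
        while (!job.isComplete()) {
            System.out.printf("map %.0f%% reduce %.0f%%%n",
                    job.mapProgress() * 100, job.reduceProgress() * 100);
            Thread.sleep(5000);
        }
        return job.isSuccessful();
    }
}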
II. Summary
A job submission flow goes roughly as follows:
1. Connect to the MapReduce cluster: this.connect(). The most important thing here is creating the client, either a YARNRunner or a LocalJobRunner, which is later used to communicate with the server side and to submit the job.
2. Actually submit the job: submitter.submitJobInternal(Job.this, cluster)
(1) Path jobStagingArea = JobSubmissionFiles.getStagingDir(cluster, conf);
Gets the staging directory shared by all jobs.
(2) JobID jobId = this.submitClient.getNewJobID(); job.setJobID(jobId);
Asks the cluster for a job id through the client and saves it into the job's configuration.
(3) Path submitJobDir = new Path(jobStagingArea, jobId.toString());
Builds the job's own submit directory, named after the job id.
(4) this.copyAndConfigureFiles(job, submitJobDir);
Copies the job's temporary files and the jar to be run into submitJobDir.
(5) Path submitJobFile = JobSubmissionFiles.getJobConfPath(submitJobDir);
Gets the path of the job configuration file, submitJobDir/job.xml.
(6) int maps = this.writeSplits(job, submitJobDir);
Writes the split information into submitJobDir and returns the number of splits. InputFormat.getSplits() computes the planned splits; the split data goes to submitJobDir/job.split and the per-entry meta info to submitJobDir/job.splitmetainfo.
(7) this.writeConf(conf, submitJobFile);
Writes the job configuration to submitJobDir/job.xml.
(8) status = this.submitClient.submitJob(jobId, submitJobDir.toString(), job.getCredentials());
Actually submits the job and gets its submission status.