[Review] RDD Dependencies and the Division of Tasks and Stages



Table of Contents

RDD Dependencies and the Division of Tasks and Stages
1. RDD Lineage · 2. RDD Dependencies · 3. RDD Narrow Dependencies · 4. RDD Wide Dependencies · 5. RDD Stage Division · 6. RDD Task Division

RDD Dependencies and the Division of Tasks and Stages

1. RDD Lineage

RDD only supports coarse-grained transformations, that is, single operations applied to large batches of records. Spark records the series of transformations used to build an RDD as its Lineage, so that lost partitions can be recovered. An RDD's Lineage records the RDD's metadata and the transformations applied to it; when some of the RDD's partitions are lost, they can be recomputed and recovered from this information.

An RDD does not store data itself. To provide fault tolerance, it has to keep the relationships between RDDs: once an error occurs, the data can be re-read from the source and recomputed according to the lineage.

toDebugString(): returns a description of this RDD and its recursive dependencies for debugging.

/** A description of this RDD and its recursive dependencies for debugging. */
def toDebugString: String = {
  // Get a debug description of an rdd without its children
  def debugSelf(rdd: RDD[_]): Seq[String] = {
    import Utils.bytesToString
    val persistence = if (storageLevel != StorageLevel.NONE) storageLevel.description else ""
    val storageInfo = rdd.context.getRDDStorageInfo(_.id == rdd.id).map(info =>
      "    CachedPartitions: %d; MemorySize: %s; ExternalBlockStoreSize: %s; DiskSize: %s".format(
        info.numCachedPartitions, bytesToString(info.memSize),
        bytesToString(info.externalBlockStoreSize), bytesToString(info.diskSize)))
    s"$rdd [$persistence]" +: storageInfo
  }
  ........

WordCount test:

val fileRDD: RDD[String] = sc.textFile("data/t1.txt")
println(fileRDD.toDebugString)
println("----------------------")
val wordRDD: RDD[String] = fileRDD.flatMap(_.split(" "))
println(wordRDD.toDebugString)
println("----------------------")
val mapRDD: RDD[(String, Int)] = wordRDD.map((_,1))
println(mapRDD.toDebugString)
println("----------------------")
val resultRDD: RDD[(String, Int)] = mapRDD.reduceByKey(_+_)
println(resultRDD.toDebugString)
resultRDD.collect()

Output:

(2) data/t1.txt MapPartitionsRDD[1] at textFile at Spark01_toDebugString.scala:10 []
 |  data/t1.txt HadoopRDD[0] at textFile at Spark01_toDebugString.scala:10 []
----------------------
(2) MapPartitionsRDD[2] at flatMap at Spark01_toDebugString.scala:14 []
 |  data/t1.txt MapPartitionsRDD[1] at textFile at Spark01_toDebugString.scala:10 []
 |  data/t1.txt HadoopRDD[0] at textFile at Spark01_toDebugString.scala:10 []
----------------------
(2) MapPartitionsRDD[3] at map at Spark01_toDebugString.scala:18 []
 |  MapPartitionsRDD[2] at flatMap at Spark01_toDebugString.scala:14 []
 |  data/t1.txt MapPartitionsRDD[1] at textFile at Spark01_toDebugString.scala:10 []
 |  data/t1.txt HadoopRDD[0] at textFile at Spark01_toDebugString.scala:10 []
----------------------
(2) ShuffledRDD[4] at reduceByKey at Spark01_toDebugString.scala:22 []
 +-(2) MapPartitionsRDD[3] at map at Spark01_toDebugString.scala:18 []
    |  MapPartitionsRDD[2] at flatMap at Spark01_toDebugString.scala:14 []
    |  data/t1.txt MapPartitionsRDD[1] at textFile at Spark01_toDebugString.scala:10 []
    |  data/t1.txt HadoopRDD[0] at textFile at Spark01_toDebugString.scala:10 []

Note: the +-(2) prefix indicates a shuffle operation; the lineage chain is broken at that point.

2. RDD Dependencies

The dependency relationship discussed here is simply the relationship between two adjacent RDDs.

dependencies(): returns the list of dependencies of this RDD, taking into account whether the RDD is checkpointed or not.

/**
 * Get the list of dependencies of this RDD, taking into account whether the
 * RDD is checkpointed or not.
 */
final def dependencies: Seq[Dependency[_]] = {
  checkpointRDD.map(r => List(new OneToOneDependency(r))).getOrElse {
    if (dependencies_ == null) {
      dependencies_ = getDependencies
    }
    dependencies_
  }
}

WordCount test:

val fileRDD = sc.textFile("data/t1.txt")
println(fileRDD.dependencies)
println("----------------------")
val wordRDD = fileRDD.flatMap(_.split(" "))
println(wordRDD.dependencies)
println("----------------------")
val mapRDD = wordRDD.map((_,1))
println(mapRDD.dependencies)
println("----------------------")
val resultRDD = mapRDD.reduceByKey(_+_)
println(resultRDD.dependencies)
resultRDD.collect()

Output:

List(org.apache.spark.OneToOneDependency@6d469831)
----------------------
List(org.apache.spark.OneToOneDependency@2cc04358)
----------------------
List(org.apache.spark.OneToOneDependency@58516c91)
----------------------
List(org.apache.spark.ShuffleDependency@1907874b)

3. RDD Narrow Dependencies

A narrow dependency means that each partition of the parent (upstream) RDD is used by at most one partition of the child (downstream) RDD. We informally liken a narrow dependency to an only child.

class OneToOneDependency[T](rdd: RDD[T]) extends NarrowDependency[T](rdd)

Task division:

In a narrow dependency, each downstream partition depends only on the data of its upstream parent partition, so every partition can be computed on its own: one dedicated Task is started per partition, and the operators are executed in order within it.

4. RDD Wide Dependencies

A wide dependency means that the same partition of the parent (upstream) RDD is depended on by multiple partitions of child (downstream) RDDs, which triggers a Shuffle. In short, we informally liken a wide dependency to having multiple children.

class ShuffleDependency[K: ClassTag, V: ClassTag, C: ClassTag](
    @transient private val _rdd: RDD[_ <: Product2[K, V]],
    val partitioner: Partitioner,
    val serializer: Serializer = SparkEnv.get.serializer,
    val keyOrdering: Option[Ordering[K]] = None,
    val aggregator: Option[Aggregator[K, V, C]] = None,
    val mapSideCombine: Boolean = false)
  extends Dependency[Product2[K, V]]

Task division:

In a wide dependency, the data of a parent partition is depended on by multiple downstream partitions, so the data has to be handled in two steps: each parent partition is first processed by its own Task, and the next step can only begin after all of those partitions have finished. This is exactly where the concept of a stage comes from. After the shuffle, the data of the new partitions is handed to the Tasks of the next stage.

5. RDD Stage Division

A DAG (Directed Acyclic Graph) is a topological graph made up of vertices and edges; it has a direction and never forms a cycle.

For example, the DAG records the transformation process of the RDDs and the stages of the job.

Source code for RDD stage division:

def collect(): Array[T] = withScope {
  val results = sc.runJob(this, (iter: Iterator[T]) => iter.toArray)
  Array.concat(results: _*)
}

collect() drills down through a chain of underlying runJob() calls; the key steps are:

// dagScheduler.runJob runs the job through the directed acyclic graph (in SparkContext.runJob)
dagScheduler.runJob(rdd, cleanedFunc, partitions, callSite, resultHandler, localProperties.get)
// submitJob(rdd, func, partitions, callSite, resultHandler, properties) submits the job (in DAGScheduler.runJob)
val waiter = submitJob(rdd, func, partitions, callSite, resultHandler, properties)

def runJob[T, U: ClassTag](rdd: RDD[T], func: Iterator[T] => U): Array[U] = {
  runJob(rdd, func, 0 until rdd.partitions.length)
}

def runJob[T, U: ClassTag](
    rdd: RDD[T],
    func: Iterator[T] => U,
    partitions: Seq[Int]): Array[U] = {
  val cleanedFunc = clean(func)
  runJob(rdd, (ctx: TaskContext, it: Iterator[T]) => cleanedFunc(it), partitions)
}

def runJob[T, U: ClassTag](
    rdd: RDD[T],
    func: (TaskContext, Iterator[T]) => U,
    partitions: Seq[Int]): Array[U] = {
  val results = new Array[U](partitions.size)
  runJob[T, U](rdd, func, partitions, (index, res) => results(index) = res)
  results
}

def runJob[T, U: ClassTag](
    rdd: RDD[T],
    func: (TaskContext, Iterator[T]) => U,
    partitions: Seq[Int],
    resultHandler: (Int, U) => Unit): Unit = {
  if (stopped.get()) {
    throw new IllegalStateException("SparkContext has been shutdown")
  }
  val callSite = getCallSite
  val cleanedFunc = clean(func)
  logInfo("Starting job: " + callSite.shortForm)
  if (conf.getBoolean("spark.logLineage", false)) {
    logInfo("RDD's recursive dependencies:\n" + rdd.toDebugString)
  }
  // dagScheduler.runJob runs the job through the directed acyclic graph
  dagScheduler.runJob(rdd, cleanedFunc, partitions, callSite, resultHandler, localProperties.get)
  progressBar.foreach(_.finishAll())
  rdd.doCheckpoint()
}

def runJob[T, U](
    rdd: RDD[T],
    func: (TaskContext, Iterator[T]) => U,
    partitions: Seq[Int],
    callSite: CallSite,
    resultHandler: (Int, U) => Unit,
    properties: Properties): Unit = {
  val start = System.nanoTime
  // submitJob(rdd, func, partitions, callSite, resultHandler, properties) submits the job
  val waiter = submitJob(rdd, func, partitions, callSite, resultHandler, properties)
  ThreadUtils.awaitReady(waiter.completionFuture, Duration.Inf)
  waiter.completionFuture.value.get match {
    case scala.util.Success(_) =>
      logInfo("Job %d finished: %s, took %f s".format
        (waiter.jobId, callSite.shortForm, (System.nanoTime - start) / 1e9))
    case scala.util.Failure(exception) =>
      logInfo("Job %d failed: %s, took %f s".format
        (waiter.jobId, callSite.shortForm, (System.nanoTime - start) / 1e9))
      // SPARK-8644: Include user stack trace in exceptions coming from DAGScheduler.
      val callerStackTrace = Thread.currentThread().getStackTrace.tail
      exception.setStackTrace(exception.getStackTrace ++ callerStackTrace)
      throw exception
  }
}
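As a side note (a minimal sketch, assuming a SparkContext sc), runJob can also be called directly; collect() shown above is just one convenience wrapper around the same entry point:

// Run a function on every partition and gather one result per partition
val perPartitionSums: Array[Int] =
  sc.runJob(sc.parallelize(1 to 10, 2), (iter: Iterator[Int]) => iter.sum)
// perPartitionSums == Array(15, 40): one element per partition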

Under the hood, submitJob creates a JobWaiter and posts a JobSubmitted event:

val waiter = new JobWaiter(...)
eventProcessLoop.post(JobSubmitted(...))

def submitJob[T, U](
    rdd: RDD[T],
    func: (TaskContext, Iterator[T]) => U,
    partitions: Seq[Int],
    callSite: CallSite,
    resultHandler: (Int, U) => Unit,
    properties: Properties): JobWaiter[U] = {
  // Check to make sure we are not launching a task on a partition that does not exist.
  val maxPartitions = rdd.partitions.length
  partitions.find(p => p >= maxPartitions || p < 0).foreach { p =>
    throw new IllegalArgumentException(
      "Attempting to access a non-existent partition: " + p + ". " +
        "Total number of partitions: " + maxPartitions)
  }

  val jobId = nextJobId.getAndIncrement()
  if (partitions.size == 0) {
    // Return immediately if the job is running 0 tasks
    return new JobWaiter[U](this, jobId, 0, resultHandler)
  }

  assert(partitions.size > 0)
  val func2 = func.asInstanceOf[(TaskContext, Iterator[_]) => _]
  val waiter = new JobWaiter(this, jobId, partitions.size, resultHandler)
  eventProcessLoop.post(JobSubmitted(
    jobId, rdd, func2, partitions.toArray, callSite, waiter,
    SerializationUtils.clone(properties)))
  waiter
}

handleJobSubmitted()

Following the sparks (and a bit of lightning) down the call chain, we arrive at the handleJobSubmitted event handler.

// finalStage = createResultStage(finalRDD, func, partitions, jobId, callSite) performs the stage division
finalStage = createResultStage(finalRDD, func, partitions, jobId, callSite)

private[scheduler] case class JobSubmitted(
    jobId: Int,
    finalRDD: RDD[_],
    func: (TaskContext, Iterator[_]) => _,
    partitions: Array[Int],
    callSite: CallSite,
    listener: JobListener,
    properties: Properties = null)
  extends DAGSchedulerEvent

// finalStage = createResultStage(finalRDD, func, partitions, jobId, callSite) performs the stage division
private[scheduler] def handleJobSubmitted(jobId: Int,
    finalRDD: RDD[_],
    func: (TaskContext, Iterator[_]) => _,
    partitions: Array[Int],
    callSite: CallSite,
    listener: JobListener,
    properties: Properties) {
  var finalStage: ResultStage = null
  try {
    // New stage creation may throw an exception if, for example, jobs are run on a
    // HadoopRDD whose underlying HDFS files have been deleted.
    finalStage = createResultStage(finalRDD, func, partitions, jobId, callSite)
  } catch {
    case e: Exception =>
      logWarning("Creating new stage failed due to exception - job: " + jobId, e)
      listener.jobFailed(e)
      return
  }

  val job = new ActiveJob(jobId, finalStage, callSite, listener, properties)
  clearCacheLocs()
  logInfo("Got job %s (%s) with %d output partitions".format(
    job.jobId, callSite.shortForm, partitions.length))
  logInfo("Final stage: " + finalStage + " (" + finalStage.name + ")")
  logInfo("Parents of final stage: " + finalStage.parents)
  logInfo("Missing parents: " + getMissingParentStages(finalStage))

  val jobSubmissionTime = clock.getTimeMillis()
  jobIdToActiveJob(jobId) = job
  activeJobs += job
  finalStage.setActiveJob(job)
  val stageIds = jobIdToStageIds(jobId).toArray
  val stageInfos = stageIds.flatMap(id => stageIdToStage.get(id).map(_.latestInfo))
  listenerBus.post(
    SparkListenerJobStart(job.jobId, jobSubmissionTime, stageInfos, properties))
  submitStage(finalStage)
}

createResultStage()

// Check whether parent stages exist for the RDD handed down by the chain of calls above (the RDD collect() was called on)
val parents = getOrCreateParentStages(rdd, jobId)
// Then create a ResultStage
val stage = new ResultStage()

private def createResultStage(
    rdd: RDD[_],
    func: (TaskContext, Iterator[_]) => _,
    partitions: Array[Int],
    jobId: Int,
    callSite: CallSite): ResultStage = {
  // Check whether parent stages exist
  val parents = getOrCreateParentStages(rdd, jobId)
  val id = nextStageId.getAndIncrement()
  // Create a ResultStage
  val stage = new ResultStage(id, rdd, func, partitions, parents, jobId, callSite)
  stageIdToStage(id) = stage
  updateJobIdStageIdMaps(jobId, stage)
  stage
}

getOrCreateParentStages()

// Get the shuffle (wide) dependencies of the RDD passed in
getShuffleDependencies(rdd).map { shuffleDep =>
  // Turn each shuffle dependency into a stage
  getOrCreateShuffleMapStage(shuffleDep, firstJobId)
}.toList

createShuffleMapStage()

// Create the new stage
val stage = new ShuffleMapStage(id, rdd, numTasks, parents, jobId, rdd.creationSite, shuffleDep)

private def getOrCreateParentStages(rdd: RDD[_], firstJobId: Int): List[Stage] = {
  // Get the shuffle (wide) dependencies of the RDD passed in
  getShuffleDependencies(rdd).map { shuffleDep =>
    // Turn each shuffle dependency into a stage
    getOrCreateShuffleMapStage(shuffleDep, firstJobId)
  }.toList
}

private[scheduler] def getShuffleDependencies(
    rdd: RDD[_]): HashSet[ShuffleDependency[_, _, _]] = {
  // A HashSet collecting the shuffle (wide) dependencies
  val parents = new HashSet[ShuffleDependency[_, _, _]]
  // A HashSet of RDDs that have already been visited
  val visited = new HashSet[RDD[_]]
  // A Stack of RDDs waiting to be visited
  val waitingForVisit = new Stack[RDD[_]]
  waitingForVisit.push(rdd)
  while (waitingForVisit.nonEmpty) {
    val toVisit = waitingForVisit.pop()
    if (!visited(toVisit)) {
      visited += toVisit
      // Traverse the dependencies
      toVisit.dependencies.foreach {
        // If it is a wide dependency, add it to the set of shuffle dependencies
        case shuffleDep: ShuffleDependency[_, _, _] =>
          parents += shuffleDep
        case dependency =>
          waitingForVisit.push(dependency.rdd)
      }
    }
  }
  // Return the shuffle dependencies
  parents
}

private def getOrCreateShuffleMapStage(
    shuffleDep: ShuffleDependency[_, _, _],
    firstJobId: Int): ShuffleMapStage = {
  shuffleIdToMapStage.get(shuffleDep.shuffleId) match {
    case Some(stage) =>
      stage
    case None =>
      // Create stages for all missing ancestor shuffle dependencies.
      getMissingAncestorShuffleDependencies(shuffleDep.rdd).foreach { dep =>
        // Even though getMissingAncestorShuffleDependencies only returns shuffle dependencies
        // that were not already in shuffleIdToMapStage, it's possible that by the time we
        // get to a particular dependency in the foreach loop, it's been added to
        // shuffleIdToMapStage by the stage creation process for an earlier dependency. See
        // SPARK-13902 for more information.
        if (!shuffleIdToMapStage.contains(dep.shuffleId)) {
          createShuffleMapStage(dep, firstJobId)
        }
      }
      // Finally, create a stage for the given shuffle dependency (the shuffle stage).
      createShuffleMapStage(shuffleDep, firstJobId)
  }
}

def createShuffleMapStage(shuffleDep: ShuffleDependency[_, _, _], jobId: Int): ShuffleMapStage = {
  val rdd = shuffleDep.rdd
  val numTasks = rdd.partitions.length
  val parents = getOrCreateParentStages(rdd, jobId)
  val id = nextStageId.getAndIncrement()
  // Create the new stage
  val stage = new ShuffleMapStage(id, rdd, numTasks, parents, jobId, rdd.creationSite, shuffleDep)

  stageIdToStage(id) = stage
  shuffleIdToMapStage(shuffleDep.shuffleId) = stage
  updateJobIdStageIdMaps(jobId, stage)

  if (mapOutputTracker.containsShuffle(shuffleDep.shuffleId)) {
    // A previously run stage generated partitions for this shuffle, so for each output
    // that's still available, copy information about that output location to the new stage
    // (so we don't unnecessarily re-compute that data).
    val serLocs = mapOutputTracker.getSerializedMapOutputStatuses(shuffleDep.shuffleId)
    val locs = MapOutputTracker.deserializeMapStatuses(serLocs)
    (0 until locs.length).foreach { i =>
      if (locs(i) ne null) {
        // locs(i) will be null if missing
        stage.addOutputLoc(i, locs(i))
      }
    }
  } else {
    // Kind of ugly: need to register RDDs with the cache and map output tracker here
    // since we can't do it in the RDD constructor because # of partitions is unknown
    logInfo("Registering RDD " + rdd.id + " (" + rdd.getCreationSite + ")")
    mapOutputTracker.registerShuffle(shuffleDep.shuffleId, rdd.partitions.length)
  }
  stage
}
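To make the traversal above easier to follow, here is a simplified, self-contained sketch of the same idea (not Spark's actual code; directShuffleDeps is a made-up helper name): walk the dependency graph of an RDD and collect its direct shuffle dependencies, each of which becomes a parent ShuffleMapStage.

import scala.collection.mutable
import org.apache.spark.ShuffleDependency
import org.apache.spark.rdd.RDD

// Collect the shuffle dependencies reachable from `rdd` without crossing another shuffle.
def directShuffleDeps(rdd: RDD[_]): Set[ShuffleDependency[_, _, _]] = {
  val found   = mutable.HashSet[ShuffleDependency[_, _, _]]()
  val visited = mutable.HashSet[RDD[_]]()
  val stack   = mutable.Stack[RDD[_]](rdd)
  while (stack.nonEmpty) {
    val current = stack.pop()
    if (visited.add(current)) {              // skip RDDs we have already seen
      current.dependencies.foreach {
        case shuffle: ShuffleDependency[_, _, _] => found += shuffle       // stage boundary
        case narrow                              => stack.push(narrow.rdd) // keep walking upstream
      }
    }
  }
  found.toSet
}

// For the WordCount resultRDD above, directShuffleDeps(resultRDD).size would be 1,
// so the final ResultStage gets one parent ShuffleMapStage: 2 stages in total.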

6. RDD Task Division

RDD task division distinguishes four levels: Application, Job, Stage, and Task.

Application: initializing a SparkContext creates one Application;
Job: each action operator triggers one Job;
Stage: the number of Stages equals the number of wide (ShuffleDependency) dependencies plus 1;
Task: within a Stage, the number of partitions of the last RDD is the number of Tasks.

Note: Application -> Job -> Stage -> Task; each level is a 1-to-n relationship with the next.
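Applying these rules to the WordCount job above (a quick worked count, assuming the 2-partition input shown earlier): one SparkContext gives 1 Application; the single collect() action gives 1 Job; the single reduceByKey shuffle gives 1 + 1 = 2 Stages; and since the last RDD of each stage has 2 partitions, there are 2 + 2 = 4 Tasks in total.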

Source code for RDD task division:

handleJobSubmitted()

// Submit the stage --- the final stage
submitStage(finalStage)

private[scheduler] def handleJobSubmitted(jobId: Int,
    finalRDD: RDD[_],
    func: (TaskContext, Iterator[_]) => _,
    partitions: Array[Int],
    callSite: CallSite,
    listener: JobListener,
    properties: Properties) {
  var finalStage: ResultStage = null
  try {
    // New stage creation may throw an exception if, for example, jobs are run on a
    // HadoopRDD whose underlying HDFS files have been deleted.
    finalStage = createResultStage(finalRDD, func, partitions, jobId, callSite)
  } catch {
    case e: Exception =>
      logWarning("Creating new stage failed due to exception - job: " + jobId, e)
      listener.jobFailed(e)
      return
  }

  val job = new ActiveJob(jobId, finalStage, callSite, listener, properties)
  clearCacheLocs()
  logInfo("Got job %s (%s) with %d output partitions".format(
    job.jobId, callSite.shortForm, partitions.length))
  logInfo("Final stage: " + finalStage + " (" + finalStage.name + ")")
  logInfo("Parents of final stage: " + finalStage.parents)
  logInfo("Missing parents: " + getMissingParentStages(finalStage))

  val jobSubmissionTime = clock.getTimeMillis()
  jobIdToActiveJob(jobId) = job
  activeJobs += job
  finalStage.setActiveJob(job)
  val stageIds = jobIdToStageIds(jobId).toArray
  val stageInfos = stageIds.flatMap(id => stageIdToStage.get(id).map(_.latestInfo))
  listenerBus.post(
    SparkListenerJobStart(job.jobId, jobSubmissionTime, stageInfos, properties))
  // Submit the stage --- the final stage
  submitStage(finalStage)
}

submitStage():

// If there are no missing parent stages
submitMissingTasks(stage, jobId.get)

private def submitStage(stage: Stage) {
  val jobId = activeJobForStage(stage)
  if (jobId.isDefined) {
    logDebug("submitStage(" + stage + ")")
    // Check whether there are parent stages still to run
    if (!waitingStages(stage) && !runningStages(stage) && !failedStages(stage)) {
      val missing = getMissingParentStages(stage).sortBy(_.id)
      logDebug("missing: " + missing)
      if (missing.isEmpty) {
        logInfo("Submitting " + stage + " (" + stage.rdd + "), which has no missing parents")
        // If there are no missing parent stages, submit the tasks of this stage
        submitMissingTasks(stage, jobId.get)
      } else {
        // Otherwise submit the parent stages first and park this stage in waitingStages
        for (parent <- missing) {
          submitStage(parent)
        }
        waitingStages += stage
      }
    }
  } else {
    abortStage(stage, "No active job for stage " + stage.id, None)
  }
}

submitMissingTasks():

val tasks: Seq[Task[_]] = try {
  val serializedTaskMetrics = closureSerializer.serialize(stage.latestInfo.taskMetrics).array()
  stage match {
    // ShuffleMapStage
    case stage: ShuffleMapStage =>
      stage.pendingPartitions.clear()
      partitionsToCompute.map { id =>
        val locs = taskIdToLocations(id)
        val part = stage.rdd.partitions(id)
        stage.pendingPartitions += id
        new ShuffleMapTask(stage.id, stage.latestInfo.attemptId,
          taskBinary, part, locs, properties, serializedTaskMetrics, Option(jobId),
          Option(sc.applicationId), sc.applicationAttemptId)
      }

    // ResultStage
    case stage: ResultStage =>
      // The map operator only changes the structure of the data, not its volume, so the number
      // of ResultTasks created here equals the size of partitionsToCompute
      partitionsToCompute.map { id =>
        val p: Int = stage.partitions(id)
        val part = stage.rdd.partitions(p)
        val locs = taskIdToLocations(id)
        // Create a ResultTask
        new ResultTask(stage.id, stage.latestInfo.attemptId,
          taskBinary, part, locs, id, properties, serializedTaskMetrics,
          Option(jobId), Option(sc.applicationId), sc.applicationAttemptId)
      }
  }
}

findMissingPartitions():

val partitionsToCompute: Seq[Int] = stage.findMissingPartitions()

override def findMissingPartitions(): Seq[Int] = {
  val job = activeJob.get
  // 0 until the number of partitions of the last RDD
  (0 until job.numPartitions).filter(id => !job.finished(id))
}

In other words, the number of Tasks created is determined by the number of partitions of the last RDD in the stage. The same holds for ShuffleMapStage; its implementation looks like this:

override def findMissingPartitions(): Seq[Int] = {
  // Again 0 until numPartitions (the number of partitions)
  val missing = (0 until numPartitions).filter(id => outputLocs(id).isEmpty)
  assert(missing.size == numPartitions - _numAvailableOutputs,
    s"${missing.size} missing, expected ${numPartitions - _numAvailableOutputs}")
  missing
}
