c语言sscanf函数的用法是什么
404
2022-11-30
浅谈Spark原理(三)惰性评估及Pipeline计算
引论中说到,现实中有些数据处理过程,将待处理的数据分割为多个数据子集后,再分布到多个节点上处理后的结果等同于在单个节点上对全部数据的处理结果。本节讨论Spark这种数据处理过程的原理。Spark采用了惰性评估(lazy evaluation)的设计思想,将RDD的操作分成了两类,一类是transformation操作,另一类是action操作,Spark根据transformation操作只是建立逻辑执行计划,不真正执行计算,只有action的操作才会触发真正的计算。计算的触发顺序是先调用子RDD的compute方法再调用父RDD的compute方法,但实际的计算是先执行父RDD的compute方法中的计算逻辑再执行子RDD的compute方法中的计算逻辑(这也才符合常理)。这样设计的好处主要有两点:1)为Spark优化数据处理过程提供了空间;2)为spark实现pipeline计算提供了可能,不必存储每个中间RDD的数据。
对这个机制的理解,一是可以阅读源代码,另一种更直观的方式是在源代码中添加日志打印来加深理解。下面通过在RDDSuite里添加一个测试用例来观察transformation的执行过程。
这个例子首先将一个数组转化为RDD,它具有两个数据分片。然后对每个元素做一个映射,然后再做一个过滤。我们直观的认识,将这个数组在一个节点上做这两个操作的结果,和分散在多个节点上做这两个操作的结果应该是一样的,不需要额外的处理。下面我们在Spark源码里添加打印验证。
通过查看Spark源码,sc.makeRDD方法生成的是一个 ParallelCollectionRDD,在ParallelCollectionRDD的compute方法里添加如下打印:
println 语句中的id是RDD的一个属性,用来标识RDD,它是一个从0开始递增的整数。s.index是 Partition的一个属性,标识一个数据分片;context.stageId标识该计算所属的stage。
后续的map及filter两个操作都是生成一个MapPartitionsRDD,在MapPartitionsRDD的compute 方法里再同样添加如下打印:
执行测试用例,打印如下:
从上述打印可以看出以下3点:
1)在compute方法中调用context.stageId()的返回值都是0,这说明map和filter操作确实在同一个Stage里执行的。
2) 先调用RDD 2的compute方法,再调用RDD 1的compute方法,最后调用RDD 0的compute方法;但是真正执行的时候是先执行map方法,再执行filter方法。
3)不是所有元素的map操作执行完后再执行filter操作,这是得益于Iterator的一个特性(注意RDD的compute方法的返回值类型是Iterator),有兴趣的读者可自行探讨。
版权声明:本文内容由网络用户投稿,版权归原作者所有,本站不拥有其著作权,亦不承担相应法律责任。如果您发现本站中有涉嫌抄袭或描述失实的内容,请联系我们jiasou666@gmail.com 处理,核实后本网站将在24小时内删除侵权内容。
发表评论
暂时没有评论,来抢沙发吧~