RDD的特性 ---- RDD的缓存

网友投稿 256 2022-08-25

RDD的特性 ---- RDD的缓存

RDD的特性二 : RDD的缓存

一、RDD缓存的意义

首先让我们来看一个小案例

查看数据集

需求:

统计访问最多的IP,以及最少的IP

步骤

创建sc读取文件获取数据集取出IP相关信息简单清洗数据统计IP出现的系数统计出现系数最少的IP统计出现次数最多的IP

代码

package SparkRDD.RDD的缓存import org.apache.commons.lang3.StringUtilsimport org.apache.spark.{SparkConf, SparkContext}object IP { def main(args: Array[String]): Unit = { // 1. 创建sc val conf = new SparkConf().setAppName("ip").setMaster("local[6]") val sc = new SparkContext(conf) // 2. 读取文件获取数据集 val data = sc.textFile("dataset/Dataset-Unicauca-Version2-87Atts.csv") //data.take(5).foreach(println(_)) // 3. 取出IP相关信息 val ip = data.map(item => (item.split(",")(1),1) ) //ip.take(5).foreach(println(_)) // 4. 简单清洗数据 val clean = ip.filter( item => StringUtils.isNotEmpty(item._1)) // 导入lang3的包 //clean.take(20).foreach(println(_)) // 5. 统计IP出现的系数 val reduce = clean.reduceByKey( (curr,agg )=> curr+agg ) reduce.take(20).foreach(println(_)) // 6. 统计出现系数最多的IP val max = reduce.sortBy(item => item._2,ascending = false).first() println(max) // (10.200.7.218,295431) // 7. 统计出现次数最少的IP val min = reduce.sortBy(item => item._2,ascending = true).first() println(min) // (65.52.108.190,1) }}

在以上的案例中我们对数据集中的IP相关信息进行了提取,并且操作分析。在Spark-core中转换算子的作用是生成新的RDD,以及RDD之间的依赖关系(逻辑执行计划),行动算子的作用是生成job,有后续的执行端去执行物理计划,每一次行动算子的执行都会重新执行一次所有转换操作。在统计IP出现次数最多和最少的时候,我们进行了两次first的action操作,而在每一次的action操作中,又包含有两次shuffle操作(reducebykey、sortby),所以共会执行4次shuffle操作,而shuffle操作会在集群内对数据进行大量的拷贝,这样一来对于内存压力就会很大,例如上面的数据集下载下来有1G多,当然这样的数据量相对还是很少的,假如说是按照T级来运行的数据集,那就会有些慢了。这时就需要用到RDD的缓存,缓存的意义就在于在执行多个job的时候将转换操作(逻辑计划)缓存下来,直接使用,不需要重复的进行计算。

补充:

二、RDD缓存的API

1. cache()

def test: Unit ={ val conf = new SparkConf().setAppName("ip统计").setMaster("local[6]") val sc = new SparkContext(conf) val resource = sc.textFile("src/main/scala/SparkRDD/RDD的缓存/ip.txt") print(resource.take(5)) val ipRDD = resource.map( item => ( item.split(",")(0) , 1) ) val cleanRDD = ipRDD.filter( item => StringUtils.isNotEmpty(item._1) ) var aggRDD = cleanRDD.reduceByKey( (curr,agg) => curr + agg ) // 调用cache方法将Transformation操作缓存 aggRDD = aggRDD.cache() val maxRDD = aggRDD.sortBy( item => item._2,ascending = true ).first() // Action操作1 val minRDD = aggRDD.sortBy( item => item._2,ascending = false ).first() // Action操作2 println("max:"+maxRDD,"min:"+minRDD) /** 使用缓存 * ResultStage 9 (first at cacheTest.scala:25) finished in 0.006 s * Job 3 finished: first at cacheTest.scala:25, took 0.032388 s */ /** 不使用缓存 * ResultStage 9 (first at test.scala:31) finished in 0.006 s * Job 3 finished: first at test.scala:31, took 0.048808 s */ }

由上面案例可以看出使用了cache()方法进行缓存之后,代码的执行时间会有所缩短,当数据量十分庞大的时候就会十分有效。

2. persist()

@Test def test: Unit ={ val conf = new SparkConf().setAppName("ip统计").setMaster("local[6]") val sc = new SparkContext(conf) val resource = sc.textFile("src/main/scala/RDD的缓存/ip.txt") val ipRDD = resource.map( item => ( item.split(",")(0) , 1) ) val cleanRDD = ipRDD.filter( item => StringUtils.isNotEmpty(item._1) ) var aggRDD = cleanRDD.reduceByKey( (curr,agg) => curr + agg ) // 调用 persist方法 将Transformation操作缓存,并设置缓存级别 aggRDD = aggRDD.persist(StorageLevel.MEMORY_ONLY) // 默认 println(aggRDD.getStorageLevel) // 获取当前缓存级别 /** * Total input paths to process : 1 * StorageLevel(memory, deserialized, 1 replicas) */ val maxRDD = aggRDD.sortBy( item => item._2,ascending = true ).first() // Action操作1 val minRDD = aggRDD.sortBy( item => item._2,ascending = false ).first() // Action操作2 println("max:"+maxRDD,"min:"+minRDD)}

三、存储的级别:

1.使用缓存的作用

是否缓存在磁盘上? 稳定! 是否缓存使用内存上? 提高效率! 是否缓存使用堆外内存? 在Java管辖之外的,不安全,由spark管理 是否在缓存前序列化? 取决于数据量 是否需要有副本? 将数据分发给多个worker,提供多个副本 缓存级别就是spark内设的参数属性,每一个缓存级别对应了上面5个内容 底层API: object StorageLevel { val NONE = new StorageLevel(false, false, false, false) val DISK_ONLY = new StorageLevel(true, false, false, false) val DISK_ONLY_2 = new StorageLevel(true, false, false, false, 2) val MEMORY_ONLY = new StorageLevel(false, true, false, true) val MEMORY_ONLY_2 = new StorageLevel(false, true, false, true, 2) val MEMORY_ONLY_SER = new StorageLevel(false, true, false, false) val MEMORY_ONLY_SER_2 = new StorageLevel(false, true, false, false, 2) val MEMORY_AND_DISK = new StorageLevel(true, true, false, true) val MEMORY_AND_DISK_2 = new StorageLevel(true, true, false, true, 2) val MEMORY_AND_DISK_SER = new StorageLevel(true, true, false, false) val MEMORY_AND_DISK_SER_2 = new StorageLevel(true, true, false, false, 2) val OFF_HEAP = new StorageLevel(true, true, true, false, 1) } * 每个缓存级别就是一个StorageLevel对象

2.缓存级别的设置与查看

// 调用 persist方法 将Transformation操作缓存,并设置缓存级别 aggRDD = aggRDD.persist(StorageLevel.MEMORY_ONLY) // 默认 println(aggRDD.getStorageLevel) // 获取当前缓存级别 /** * Total input paths to process : 1 * StorageLevel(memory, deserialized, 1 replicas) */

3.缓存级别

源码:

补充:

版权声明:本文内容由网络用户投稿,版权归原作者所有,本站不拥有其著作权,亦不承担相应法律责任。如果您发现本站中有涉嫌抄袭或描述失实的内容,请联系我们jiasou666@gmail.com 处理,核实后本网站将在24小时内删除侵权内容。

上一篇:Python报错:‘unicodeescape‘ codec can‘t decode bytes in position 2-3: truncated \UXXXXXXXX escape
下一篇:RDD的特性 ---- RDD的checkpoint
相关文章

 发表评论

暂时没有评论,来抢沙发吧~