[Spark Basic Exercises, Part 1]
Basic environment setup
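Before the code will compile, the project needs Spark core, JUnit 4, and (for exercises 23/24) a MySQL JDBC driver on the classpath. A minimal sketch of the build, assuming an sbt project; the Scala and library versions below are assumptions and should be matched to your own environment:

// build.sbt (versions are placeholders)
scalaVersion := "2.11.12"
libraryDependencies ++= Seq(
  "org.apache.spark" %% "spark-core"           % "2.4.0",          // SparkConf, SparkContext, RDD, JdbcRDD
  "junit"            %  "junit"                % "4.12"   % Test,  // @Test annotation used below
  "com.novocode"     %  "junit-interface"      % "0.11"   % Test,  // lets sbt run JUnit tests
  "mysql"            %  "mysql-connector-java" % "5.1.47"          // JDBC driver for exercises 23/24
)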
package spark练习题.练习题01

import org.apache.spark.rdd.{JdbcRDD, RDD}
import org.apache.spark.{SparkConf, SparkContext}
import org.junit.Test

class e01 {

  val conf = new SparkConf().setAppName("test").setMaster("local[6]")
  val sc = new SparkContext(conf)
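The class creates a SparkContext but never stops it. A small teardown sketch, assuming JUnit 4; the @After hook below is an addition to the original exercises:

  import org.junit.After

  @After
  def tearDown(): Unit = {
    // JUnit creates a fresh e01 instance (and thus a fresh SparkContext) per test method,
    // so stop the context after each test to release the local executor threads
    sc.stop()
  }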
  // 1. Create an RDD containing the numbers 1 to 10 and multiply every element by 2 to form a new RDD
  @Test
  def e01(): Unit = {
    val rdd = sc.parallelize(1 to 10)
    val maprdd = rdd.map(_ * 2)
    maprdd.collect().foreach(println(_))
  }
  // 2. Create an RDD containing the numbers 10 to 20 and use mapPartitions to multiply every element by 2, forming a new RDD
  @Test
  def e02(): Unit = {
    val rdd = sc.parallelize(10 to 20, 2)
    val mappartitionrdd = rdd.mapPartitions(_.map(_ * 2))
    mappartitionrdd.collect().foreach(println(_))
  }
  // 3. Create an RDD of the elements 1 to 5 and use flatMap to build a new RDD made of the square and the cube of each element: 1, 1, 4, 8, 9, 27, ...
  @Test
  def e03(): Unit = {
    val rdd = sc.parallelize(1 to 5)
    val flatrdd = rdd.flatMap(x => Array(Math.pow(x, 2).toInt, Math.pow(x, 3).toInt))
    flatrdd.foreach(println(_))
  }
  // 4. Create an RDD with 4 partitions from Array(10,20,30,40,50,60) and use glom to gather each partition's data into an array
  @Test
  def e04(): Unit = {
    val rdd = sc.parallelize(Array(10, 20, 30, 40, 50, 60), 4)
    val glom = rdd.glom().collect()
    glom.foreach(println(_))            // prints the array references
    for (i <- glom; j <- i) println(j)  // prints the individual elements
  }
  // 5. Create an RDD from Array(1, 3, 4, 20, 4, 5, 8) and group its elements by parity (odd vs. even)
  @Test
  def e05(): Unit = {
    val rdd = sc.parallelize(Array(1, 3, 4, 20, 4, 5, 8))
    val rdd1 = rdd.groupBy(item => if (item % 2 == 0) "even" else "odd")
    rdd1.collect().foreach(print(_))
  }
  // 6. Create an RDD of strings Array("xiaoli", "laoli", "laowang", "xiaocang", "xiaojing", "xiaokong") and filter out a new RDD of the elements that contain the substring "xiao"
  @Test
  def e06(): Unit = {
    val rdd = sc.parallelize(Array("xiaoli", "laoli", "laowang", "xiaocang", "xiaojing", "xiaokong"))
    val filter = rdd.filter(_.contains("xiao"))
    filter.collect().foreach(println(_))
  }
  // 7. Create an RDD from 1 to 10 and sample it without replacement using sample
  @Test
  def e07(): Unit = {
    val rdd = sc.parallelize(1 to 10)
    val sample = rdd.sample(withReplacement = false, 0.5, 2)
    sample.collect().foreach(println(_))
  }
  // 8. Create an RDD from 1 to 10 and sample it with replacement using sample
  @Test
  def e08(): Unit = {
    val rdd = sc.parallelize(1 to 10)
    val sample = rdd.sample(withReplacement = true, 0.5, 1)
    sample.collect().foreach(println(_))
  }
  // 9. Create an RDD from Array(10,10,2,5,3,5,3,6,9,1) and remove the duplicate elements
  @Test
  def e09(): Unit = {
    val rdd = sc.parallelize(Array(10, 10, 2, 5, 3, 5, 3, 6, 9, 1))
    val dupl = rdd.distinct()
    dupl.collect().foreach(println(_))
  }
  // 10. Create an RDD with 5 partitions from 0 to 100, then use coalesce to reduce the number of partitions to 2;
  //     also create an RDD with 5 partitions from 0 to 100 and use repartition to change the number of partitions to 3
  @Test
  def e10(): Unit = {
    val rdd = sc.parallelize(0 to 100, 5)
    val coalesce = rdd.coalesce(2)
    println(coalesce.partitions.size)
    val repartition = rdd.repartition(3)
    println(repartition.partitions.size)
  }
  // 11. Create an RDD with the data 1,3,4,10,4,6,9,20,30,16 and sort it in ascending and in descending order
  @Test
  def e11(): Unit = {
    val rdd = sc.parallelize(Array(1, 3, 4, 10, 4, 6, 9, 20, 30, 16))
    val desc = rdd.sortBy(x => x, false)
    val asc = rdd.sortBy(x => x, true)
    desc.collect().foreach(println(_))
    asc.collect().foreach(println(_))
  }
  // 12. Create rdd1 and rdd2 with the data 1 to 6 and 1 to 10, and compute their union;
  //     create rdd1 and rdd2 with the data 1 to 6 and 4 to 10, and compute both differences (rdd1 minus rdd2 and rdd2 minus rdd1);
  //     create rdd1 and rdd2 with the data 1 to 6 and 1 to 10, and compute their intersection;
  //     create rdd1 and rdd2 with the data 1 to 6 and 1 to 10, and compute the Cartesian product of the two RDDs
  @Test
  def e12(): Unit = {
    // union
    val rdd = sc.parallelize(1 to 6)
    val rdd1 = sc.parallelize(1 to 10)
    val rdd2 = rdd.union(rdd1)
    rdd2.collect().foreach(println(_))
    // difference, in both directions
    val rdd3 = sc.parallelize(1 to 6)
    val rdd4 = sc.parallelize(4 to 10)
    val rdd5 = rdd3.subtract(rdd4)
    val rdd5_1 = rdd4.subtract(rdd3)
    rdd5.collect().foreach(println(_))
    rdd5_1.collect().foreach(println(_))
    // intersection
    val rdd6 = sc.parallelize(1 to 6)
    val rdd7 = sc.parallelize(1 to 10)
    val rdd8 = rdd6.intersection(rdd7)
    rdd8.collect().foreach(println(_))
    // Cartesian product
    val rdd6_1 = sc.parallelize(1 to 6)
    val rdd7_1 = sc.parallelize(1 to 10)
    val rdd8_1 = rdd6_1.cartesian(rdd7_1)
    rdd8_1.collect().foreach(println(_))
  }
  // 13. Create rdd1 and rdd2 with the data 1 to 5 and 11 to 15, and zip the two RDDs together
  @Test
  def e13(): Unit = {
    val rdd = sc.parallelize(1 to 5)
    val rdd1 = sc.parallelize(11 to 15)
    val rdd2 = rdd.zip(rdd1)
    rdd2.collect().foreach(println(_))
  }
  // 14. Create an RDD from List(("female",1),("male",5),("female",5),("male",2)) and compute the total for female and for male
  @Test
  def e14(): Unit = {
    val rdd = sc.parallelize(List(("female", 1), ("male", 5), ("female", 5), ("male", 2)))
    val reduce = rdd.reduceByKey((curr, agg) => curr + agg)
    reduce.collect().foreach(println(_))
  }
  // 15. Create an RDD with two partitions from List(("a",3),("a",2),("c",4),("b",3),("c",6),("c",8)); take the maximum value of each key within each partition, then add those maxima together
  @Test
  def e15(): Unit = {
    val rdd = sc.parallelize(List(("a", 3), ("a", 2), ("c", 4), ("b", 3), ("c", 6), ("c", 8)), 2)
    // print each partition's contents as a comma-separated string
    rdd.glom().collect().foreach(x => println(x.mkString(",")))
    // seqOp: max per key within a partition; combOp: sum the per-partition maxima
    val rdd2 = rdd.aggregateByKey(0)(math.max(_, _), _ + _)
    rdd2.collect().foreach(println(_))
  }
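A quick trace of the aggregateByKey call above, assuming parallelize splits the six pairs evenly so partition 0 holds (a,3),(a,2),(c,4) and partition 1 holds (b,3),(c,6),(c,8): within partition 0 the per-key maxima are a -> 3 and c -> 4; within partition 1 they are b -> 3 and c -> 8; the combine step then sums the maxima per key, giving (a,3), (b,3), (c,12).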
  // 16. Create a pair RDD with two partitions from Array(("a", 88), ("b", 95), ("a", 91), ("b", 93), ("a", 95), ("b", 98)) and compute the average value for each key
  @Test
  def e16(): Unit = {
    val rdd = sc.parallelize(Array(("a", 88), ("b", 95), ("a", 91), ("b", 93), ("a", 95), ("b", 98)), 2)
    // approach 1: groupByKey, then average each group
    val rdd1 = rdd.groupByKey().map(x => x._1 -> x._2.sum / x._2.size)
    rdd1.collect().foreach(println(_))
    // approach 2: reduceByKey over (sum, count) pairs
    val rdd2 = rdd.map(x => (x._1, (x._2, 1)))
      .reduceByKey((x, y) => (x._1 + y._1, x._2 + y._2))
      .map(x => (x._1, x._2._1 / x._2._2))
    rdd2.collect().foreach(println(_))
  }
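Both approaches use integer division, so the results are truncated averages: a -> (88 + 91 + 95) / 3 = 91 and b -> (95 + 93 + 98) / 3 = 95. A hedged variant of approach 2 that keeps the fractional part, assuming that is what the exercise intends:

    val rdd3 = rdd.map(x => (x._1, (x._2.toDouble, 1)))
      .reduceByKey((x, y) => (x._1 + y._1, x._2 + y._2))
      .map(x => (x._1, x._2._1 / x._2._2))
    rdd3.collect().foreach(println(_))   // (a,91.33...), (b,95.33...)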
  // 17. Read the ads click log and, for each province, output the top 3 ads by click count
  @Test
  def e17(): Unit = {
    val rdd = sc.textFile("src/main/scala/spark练习题/练习题01/data/ads")
    // rdd.take(2).foreach(println(_))
    rdd.map { x =>
        val data = x.split(" ")
        (data(1), (data(4), 1))      // (province, (adId, 1))
      }
      .groupByKey()
      // e.g. (8,CompactBuffer((28,1), (22,1)))
      //      (6,CompactBuffer((26,1), (36,1), (32,1), (16,1), (26,1), (86,1), (76,1)))
      //      (9,CompactBuffer((38,1), (48,1), (48,1), (78,1), (98,1)))
      //      (1,CompactBuffer((42,1), (42,1), (32,1), (112,1)))
      .map { case (province, list) =>
        val top3 = list.groupBy(_._1).map(x => (x._1, x._2.size)).toList.sortWith((x, y) => x._2 > y._2).take(3)
        (province, top3)
      }
      .collect()
      .sortBy(_._1)
      .foreach(println)
  }
  // 18. Read the people.json file, where each line is a JSON object, parse each line and print the result
  @Test
  def e18(): Unit = {
    import scala.util.parsing.json.JSON
    val rdd = sc.textFile("hdfs://192.168.64.129:9000/user/root/spark/sparkSql/people.json")
    val result = rdd.map(JSON.parseFull)
    result.collect().foreach(println(_))
  }
  // 19. Save a SequenceFile: use Spark to create an RDD from Array(("a", 1), ("b", 2), ("c", 3)) and save it to HDFS in SequenceFile format;
  //     then read the SequenceFile from question 19 back and print it
  @Test
  def e19(): Unit = {
    val rdd = sc.parallelize(Array(("a", 1), ("b", 2), ("c", 3)))
    // write
    // rdd.saveAsSequenceFile("hdfs://192.168.64.129:9000/user/root/spark/sparkSql/SequenceFile")
    // read
    val rdd1 = sc.sequenceFile[String, Int]("hdfs://192.168.64.129:9000/user/root/spark/sparkSql/SequenceFile/part-00001")
    rdd1.collect().foreach(println(_))
  }
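Reading a single part file works, but pointing sequenceFile at the output directory itself (hdfs://192.168.64.129:9000/user/root/spark/sparkSql/SequenceFile) picks up all the part files at once, which is usually what you want once the RDD has been written with more than one partition.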
  // 20. Read and write an objectFile: save an RDD with the data Array(("a", 1), ("b", 2), ("c", 3)) as an objectFile, then read it back
  @Test
  def e20(): Unit = {
    val rdd = sc.makeRDD(Array(("a", 1), ("b", 2), ("c", 3)))
    // rdd.saveAsObjectFile("src/main/scala/spark练习题/练习题01/data/output20200407/20200407_objectFile")
    val rdd1 = sc.objectFile[(String, Int)]("src/main/scala/spark练习题/练习题01/data/output20200407/20200407_objectFile/part-00001")
    rdd1.collect().foreach(println(_))
  }
  // 21. Use the built-in accumulator to count the number of empty lines in the Accumulator.txt file
  @Test
  def e21(): Unit = {
    val rdd = sc.textFile("src/main/scala/spark练习题/练习题01/data/Accumulator")
    val count = sc.longAccumulator("count")
    rdd.foreach(x => if (x == "") count.add(1))
    println(count.value)
  }
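The accumulator is updated inside foreach, which is an action, so each line is counted exactly once. If it were updated inside a transformation such as map, a recomputed stage could add to it more than once, so counting in an action (or caching the RDD first) is the safer pattern.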
  // 22. Use a Spark broadcast variable.
  // User table:
  // id name age gender(0|1)
  // 001,刘向前,18,0
  // 002,冯 剑,28,1
  // 003,李志杰,38,0
  // 004,郭 鹏,48,2
  // Requirement: output the user records with the gender printed as 男/女, not as 0/1.
  // Broadcast Map("0" -> "女", "1" -> "男") as a broadcast variable; the expected output format is:
  // 001,刘向前,18,女
  // 003,李志杰,38,女
  // 002,冯 剑,28,男
  // 004,郭 鹏,48,男
  @Test
  def e22(): Unit = {
    val rdd = sc.textFile("src/main/scala/spark练习题/练习题01/data/students")
    val broadcast = sc.broadcast(Map("0" -> "女", "1" -> "男"))
    rdd.foreach { x =>
      val datas = x.split(",")
      println(datas(0) + "," + datas(1) + "," + datas(2) + "," + broadcast.value(datas(3)))
    }
  }
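Note that the sample row 004 lists gender 2, which is not a key in the broadcast map, so broadcast.value(datas(3)) would throw a NoSuchElementException on that record. A hedged defensive variant, assuming unknown codes should simply pass through unchanged:

      println(datas(0) + "," + datas(1) + "," + datas(2) + "," + broadcast.value.getOrElse(datas(3), datas(3)))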
  // 23. In MySQL, create a database named spark and create the following table in it:
  // CREATE TABLE user (
  //   id int(11) NOT NULL AUTO_INCREMENT,
  //   username varchar(32) NOT NULL COMMENT '用户名称',
  //   birthday date DEFAULT NULL COMMENT '生日',
  //   sex char(1) DEFAULT NULL COMMENT '性别',
  //   address varchar(256) DEFAULT NULL COMMENT '地址',
  //   PRIMARY KEY (id)
  // ) ENGINE=InnoDB AUTO_INCREMENT=1 DEFAULT CHARSET=utf8;
  // The input columns are, in order: name, birthday, sex, province.
  // Use Spark to write this data into MySQL, then read it back.
  @Test
  def e23(): Unit = {
    val rdd = sc.textFile("src/main/scala/spark练习题/练习题01/data/user")
    // database connection settings
    val driver = "com.mysql.jdbc.Driver"
    val url = "jdbc:mysql://localhost:3306/spark"
    val username = "root"
    val password = "123456"
    // insert the data, opening one connection per partition
    rdd.foreachPartition { data =>
      Class.forName(driver)
      val connection = java.sql.DriverManager.getConnection(url, username, password)
      val sql = "insert into user values (NULL,?,?,?,?)"
      data.foreach { line =>
        val datas = line.split(" ")
        val statement = connection.prepareStatement(sql)
        statement.setString(1, datas(0))
        statement.setString(2, datas(1))
        statement.setString(3, datas(2))
        statement.setString(4, datas(3))
        statement.executeUpdate()
        statement.close()
      }
      connection.close()
    }
  }

  // 24. Read the data back from MySQL
  @Test
  def e24(): Unit = {
    // database connection settings
    val driver = "com.mysql.jdbc.Driver"
    val url = "jdbc:mysql://localhost:3306/spark"
    val username = "root"
    val password = "123456"
    val sql = "select * from user where id between ? and ?"
    val jdbcRDD = new JdbcRDD(
      sc,
      () => {
        Class.forName(driver)
        java.sql.DriverManager.getConnection(url, username, password)
      },
      sql,
      0,
      44,
      3,
      result => {
        println(s"id=${result.getInt(1)},username=${result.getString(2)}" +
          s",birthday=${result.getDate(3)},sex=${result.getString(4)}" +
          s",address=${result.getString(5)}")
      }
    )
    jdbcRDD.collect()
  }
}
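In the JdbcRDD call above, 0 and 44 are the lower and upper bounds substituted into the two ? placeholders of the query, and 3 is the number of partitions; Spark splits the id range [0, 44] into three sub-ranges and runs the bounded query once per partition, so the rows are fetched in parallel.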