linux cpu占用率如何看
306
2022-08-25
【2020大数据应用赛样卷试题】Spark分析处理
【2020大数据应用赛样卷试题 — Spark分析处理】
Spark作为大数据第三代计算引擎,在企业中被广泛应用,下面请你完成Spark相关题目:
一、请简答Spark、Mapreduce、Hive三者区别,并说明分别在什么场景下使用三者,请分别举一个案例(5分)
Spark 与 Hive的区别:
SparkSQL替换的是Hive的查询引擎,Hive是一种基于HDFS的数据仓库,并且提供了基于SQL模型的,针对存了大数据的数据仓库,进行分布式交互查询的查询引擎,所以SparkSQL暂时并不能完全替代Hive,实际上,在生产环境中,SparkSQL也是针对Hive数据仓库中的数据进行查询,Spark本身自己不提供存储,自然不可能替代Hive作为数据仓库的功能。这也印证了,SparkSQL只是替代hive的查询引擎,但是hive构建数据仓库的地位,SparkSQL并不能替代。SparkSQL一个优点,较于Hive的查询引擎,就是速度快,原因很简单,hivesql执行底层还是用MapReduce来实现,必须经过shuffle过程(性能损耗最大,走磁盘),所以速度是很缓慢的,一些复杂的HiveSQL需要一个小时以上的时间才能执行完。但是SparkSQL由于其底层基于Spark自身的基于内存的特点,所以速度是Hive查询引擎的几倍以上。另外,SparkSQL支持大量不同的数据源,无论是关系型数据库还是文本文件,例如json,parquet,jdbc等等,同时,SparkSQL基于RDD来工作,可以与spark的其他组件无缝集成,实现许多较为复杂的功能,例如,sparksql可以直接针对hdfs文件进行执行sql语句。
Spark 与 MapReduce 的区别
Spark的计算模式也是输入MapReduce,但不局限于map和reduce操作,还提供了多种数据集操作类型,编程模型比MapReduce更加的灵活Spark提供了内存计算,可将中间结果放到内存中,不再像MapReduce那样,每一个map或reduce阶段完成后都进行落盘操作,对于迭代计算的效率更高。Spark基于DAG(有向无环图)的任务调度机制,要优于MapReduce的迭代机制Spark将数据载入内存中以后,之后的迭代计算都可以直接使用内存中的中间结果作运算,从而避免了从磁盘中频繁的读取数据,提高运行效率。
适用场景
MapReduce: MapReduce 是一种编程模型,用于大规模数据集(大于 1TB) 的并行运算。MapReduce 的典型应用场景中,目前日志分析用的比较多,还有做搜素的索引,机器学习算法包 mahout 也是之一,当然它能做的东西还有很多,比如数据掘、信息提取。Spark:拥有 Hadoop MapReduce 所具有的优点;但不同于 MapReduce 的是 Job 中间输出结果可以保存在内存中,从而不再需要读写HDFS,因此 Spark 能更好地适用于数据挖掘与机器学习等需要迭代的 MapReduce 的算法。数据过于繁杂,并且需要让计算通过迭代,并在内存中,极大地提高效率的场景Hive:基于 Hadoop 的一个数据仓库工具,可以将结构化的数据文件映射为一张数据库表,并提供简单的 sql 查询功能,可以将 sql 语句转换为 MapReduce 任务进行运行。应用场景:十分适合数据仓库的统计分析。
二、请完成下面相关统计
请你将以下学生成绩数据,存放在Hdfs上,使用Spark读取完成下面分析 学生表字段描述:学号,姓名,年龄,性别,班级 分数表字段描述:学号,科目名,分数 科目表字段描述:科目名,总分
1、使用Spark统计每个班级学生的人数,将统计好的结果保存到文件中(请提供编程代码和截图)(3分)
输出样式: 班级,人数 文科一班,39
步骤一:配置环境
object t01 { def main(args: Array[String]): Unit = { // 配置环境 val spark = new sql.SparkSession.Builder() .appName("t01") .master("local[6]") .getOrCreate() // 导入隐式转换、函数 import spark.implicits._ import org.apache.spark.sql.functions._
步骤二:读取数据集
// 读取数据集 val students = spark.sparkContext.textFile("hdfs://192.168.64.129:9000/user/root/testdata/students.txt") students.collect().foreach(println(_))
步骤三:提取信息聚合
// 提取信息聚合 val class_students = students.map(item=>(item.split(",")(4),1)) .reduceByKey(_+_) .sortByKey() class_students.collect().foreach(println(_))
通过对读取的数据集观察,发现以逗号为分割符,所以在处理数据集时使用map算子转换形态提取有效信息时也是以逗号为分隔符。转换之后,在使用reducebykey算子进行统计。
步骤四:格式化输出样式
// 转换形态 val result = class_students.map(item=>Array(item._1,item._2).mkString(",")) result.collect().foreach(println(_)) }}
2、使用Spark sql统计每个班级总分排名前十的学生,将统计好的结果保存到文件中 (请提供编程代码和截图)(3分)
输出样式: 班级,姓名,总分 文科一班,张三,400
步骤一:配置环境
// 配置环境val spark = new sql.SparkSession.Builder() .master("local[6]") .appName("t02") .getOrCreate() import spark.implicits._ import org.apache.spark.sql.functions._
步骤二:读取数据集,简单合并表
// 读取合并数据集 ---- 表连接 val students = spark.sparkContext.textFile("hdfs://192.168.64.129:9000/user/root/testdata/students.txt") .map(item => (item.split(",")(0), item.split(",")(4), item.split(",")(1)) ).toDF("学号", "班级", "姓名") students.show() val score = spark.sparkContext.textFile("hdfs://192.168.64.129:9000/user/root/testdata/score.txt") .map(item => (item.split(",")(0), item.split(",")(1), item.split(",")(2).toFloat) ).toDF("学号", "科目", "成绩") score.show() val students_info = students.join(score, students.col("学号") === score.col("学号")) .select(students.col("学号"), students.col("班级"), students.col("姓名"), score.col("科目"), score.col("成绩")) students_info.show()
学生表与成绩表是分开的,所以一开始要将数据整合到一张表上,进行表的拼接。
步骤三:求和 ---- 总分
//求和 val sum_score = students_info.select('学号, '班级, '姓名, '科目, '成绩) .groupBy('学号, '班级, '姓名) .agg(sum("成绩") as "总分") sum_score.show()
按照每个学生来进行聚合函数求取总分。
步骤四:自定义窗口函数进行排名,并抽取前10名
val window = Window.partitionBy( "班级") .orderBy('总分 desc) val rank = sum_score.select('班级, '姓名,'总分,dense_rank() over (window) as "rank") .where('rank<=10) rank.show(100)
自定义窗口函数,按照班级进行分区,总分进行排序,获取前10名,注意这里用dense_rank(),会有并列的情况。最后筛选出前10名的学生信息。
3、请使用你擅长的一种编程语言和框架统计每科都及格的学生 (请提供编程代码和结果截图)(4分)
输出样式 学号,姓名,班级,科目名,分数 1500100001,施笑槐,文科六班,语文,80
步骤一:创建配置saprksql环境
object test03 { def main(args: Array[String]): Unit = { // 创建配置环境 val spark =SparkSession.builder() .appName("score_count") .master("local[6]") .getOrCreate() import spark.implicits._
步骤二:读取对应的数据集,转为DataFrame形式
// 读取数据集 // 分数信息 val score = spark.sparkContext.textFile("hdfs://192.168.64.129:9000/user/root/testdata/score.txt") val data1 = score.map(item=>((item.split(",")(0)),(item.split(",")(1)),(item.split(",")(2).toDouble)) ) .toDF("学号","科目","成绩") data1.show()
// 学生信息 val student = spark.sparkContext.textFile("hdfs://192.168.64.129:9000/user/root/testdata/students.txt") val data2 = student.map(item=>((item.split(",")(0)),(item.split(",")(1)),(item.split(",")(4))) ) .toDF("学号","姓名","班级") data2.show()
步骤三:合并数据信息表
// 合并分数、学生信息 val data_split = data1.join(data2,data1.col("学号") === data2.col("学号")) .select(data1.col("学号"),data1.col("科目"),data1.col("成绩"),data2.col("姓名"),data2.col("班级")) .where('成绩 > 60.0) data_split.show()
这里在合并数据表的时候,提前做了个筛选,筛选出来成绩大于60的所有信息。
步骤四:自定义窗口函数处理
import org.apache.spark.sql.functions._ // 使用窗口函数进行分区统计 val window = Window.partitionBy('学号) val data60 = data_split.select('学号,'姓名,'班级,'科目,'成绩,count('成绩) over(window) as "count") data60.show()
按照学号分区,count聚合统计大于60分的科目数。
步骤五:查询满足六门均及格的学生信息系
// 查找出6门科目均合格的学生姓名 val pass_student = data60.select('学号,'姓名,'班级,'科目,'成绩) .where('count === 6) pass_student.collect().foreach(println(_)) }}
知识点:
map、reducebykey、sortby等算子的基本用法SparkSql的API使用窗口函数的使用
注意:
环境配置的时候一定要创建入口,并且还要导入隐式转换,以及sql函数包!!!(不然再在数据类型的时候 ---- txt文件读取转为DataFrame时,不能使用toDF)
版权声明:本文内容由网络用户投稿,版权归原作者所有,本站不拥有其著作权,亦不承担相应法律责任。如果您发现本站中有涉嫌抄袭或描述失实的内容,请联系我们jiasou666@gmail.com 处理,核实后本网站将在24小时内删除侵权内容。
发表评论
暂时没有评论,来抢沙发吧~