Scala105 - Using collect_list in Spark SQL


import org.apache.spark.sql.functions._
import spark.implicits._
import org.apache.spark.ml.feature.VectorAssembler
import org.apache.spark.ml.linalg.{Vector, Vectors}
import org.apache.spark.sql.{DataFrame, Row, SparkSession}


val builder = SparkSession
  .builder()
  .appName("learningScala")
  .config("spark.executor.heartbeatInterval", "60s")
  .config("spark.network.timeout", "120s")
  .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
  .config("spark.kryoserializer.buffer.max", "512m")
  .config("spark.dynamicAllocation.enabled", false)
  .config("spark.sql.inMemoryColumnarStorage.compressed", true)
  .config("spark.sql.inMemoryColumnarStorage.batchSize", 10000)
  .config("spark.sql.broadcastTimeout", 600)
  .config("spark.sql.autoBroadcastJoinThreshold", -1)
  .config("spark.sql.crossJoin.enabled", true)
  .master("local[*]")

val spark = builder.getOrCreate()
spark.sparkContext.setLogLevel("ERROR")

builder: org.apache.spark.sql.SparkSession.Builder = org.apache.spark.sql.SparkSession$Builder@2b380850
spark: org.apache.spark.sql.SparkSession = org.apache.spark.sql.SparkSession@7b0a688

var df1 = Seq(
  (1, "2019-04-01 11:45:50", 11.15, "2019-04-02 11:45:49"),
  (2, "2019-05-02 11:56:50", 10.37, "2019-05-02 11:56:51"),
  (3, "2019-07-21 12:45:50", 12.11, "2019-08-21 12:45:50"),
  (2, "2019-08-01 12:40:50", 14.50, "2020-08-03 12:40:50"),
  (5, "2019-01-06 10:00:50", 16.39, "2019-01-05 10:00:50")
).toDF("id", "startTimeStr", "payamount", "endTimeStr")

df1 = df1.withColumn("startTime", $"startTimeStr".cast("Timestamp"))
  .withColumn("endTime", $"endTimeStr".cast("Timestamp"))
df1.printSchema
df1.show()

root
 |-- id: integer (nullable = false)
 |-- startTimeStr: string (nullable = true)
 |-- payamount: double (nullable = false)
 |-- endTimeStr: string (nullable = true)
 |-- startTime: timestamp (nullable = true)
 |-- endTime: timestamp (nullable = true)

+---+-------------------+---------+-------------------+-------------------+-------------------+
| id|       startTimeStr|payamount|         endTimeStr|          startTime|            endTime|
+---+-------------------+---------+-------------------+-------------------+-------------------+
|  1|2019-04-01 11:45:50|    11.15|2019-04-02 11:45:49|2019-04-01 11:45:50|2019-04-02 11:45:49|
|  2|2019-05-02 11:56:50|    10.37|2019-05-02 11:56:51|2019-05-02 11:56:50|2019-05-02 11:56:51|
|  3|2019-07-21 12:45:50|    12.11|2019-08-21 12:45:50|2019-07-21 12:45:50|2019-08-21 12:45:50|
|  2|2019-08-01 12:40:50|     14.5|2020-08-03 12:40:50|2019-08-01 12:40:50|2020-08-03 12:40:50|
|  5|2019-01-06 10:00:50|    16.39|2019-01-05 10:00:50|2019-01-06 10:00:50|2019-01-05 10:00:50|
+---+-------------------+---------+-------------------+-------------------+-------------------+

df1: org.apache.spark.sql.DataFrame = [id: int, startTimeStr: string ... 4 more fields]

df1.createOrReplaceTempView("temp1")

val sql = s"""SELECT *,
       collect_list(payamount) OVER (PARTITION BY id ORDER BY startTimeStr) payamount_array
       FROM temp1"""

sql: String =
"SELECT *,
       collect_list(payamount) OVER (PARTITION BY id ORDER BY startTimeStr) payamount_array
       FROM temp1"

val dfCollect = spark.sql(sql)

dfCollect: org.apache.spark.sql.DataFrame = [id: int, startTimeStr: string ... 5 more fields]

dfCollect.show()

+---+-------------------+---------+-------------------+-------------------+-------------------+---------------+
| id|       startTimeStr|payamount|         endTimeStr|          startTime|            endTime|payamount_array|
+---+-------------------+---------+-------------------+-------------------+-------------------+---------------+
|  1|2019-04-01 11:45:50|    11.15|2019-04-02 11:45:49|2019-04-01 11:45:50|2019-04-02 11:45:49|        [11.15]|
|  3|2019-07-21 12:45:50|    12.11|2019-08-21 12:45:50|2019-07-21 12:45:50|2019-08-21 12:45:50|        [12.11]|
|  5|2019-01-06 10:00:50|    16.39|2019-01-05 10:00:50|2019-01-06 10:00:50|2019-01-05 10:00:50|        [16.39]|
|  2|2019-05-02 11:56:50|    10.37|2019-05-02 11:56:51|2019-05-02 11:56:50|2019-05-02 11:56:51|        [10.37]|
|  2|2019-08-01 12:40:50|     14.5|2020-08-03 12:40:50|2019-08-01 12:40:50|2020-08-03 12:40:50|  [10.37, 14.5]|
+---+-------------------+---------+-------------------+-------------------+-------------------+---------------+

The logic of the SQL: partition the rows by id, order each partition by startTimeStr, and collect the payamount values into an array, so the array elements appear in ascending startTimeStr order. Because the window clause includes an ORDER BY, its default frame runs from the start of the partition to the current row, which is why collect_list yields a running array: for id = 2 the earlier row gets [10.37] while the later row gets [10.37, 14.5]. The same result can be produced with the DataFrame API, as sketched below.
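For reference, here is a minimal DataFrame-API sketch of the same window, written against the df1 built above (the names w, wFull, and dfCollect2 are introduced here purely for illustration):

import org.apache.spark.sql.expressions.Window

// Running collect_list per id, ordered by startTimeStr. With an ORDER BY,
// the default window frame spans from the partition start to the current row,
// matching the SQL version above.
val w = Window.partitionBy("id").orderBy("startTimeStr")
val dfCollect2 = df1.withColumn("payamount_array", collect_list($"payamount").over(w))
dfCollect2.show()

If every row should instead carry the complete array for its id, widen the frame to the whole partition:

// Full-partition frame: each row for an id sees all of that id's payamount
// values, still collected in ascending startTimeStr order.
val wFull = Window.partitionBy("id").orderBy("startTimeStr")
  .rowsBetween(Window.unboundedPreceding, Window.unboundedFollowing)
df1.withColumn("payamount_all", collect_list($"payamount").over(wFull)).show()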

2020-05-28, Jiulonghu, Jiangning District, Nanjing
