oracle竖列的数据怎么变成一行
333
2022-11-17
Flink中Window详解之Window的聚合函数ReduceFunction
如果定义了 Window Assigner 之后,下一步就可以定义窗口内数据的计算逻辑,这也就 是 Window Function 的定义。Flink 中提供了四种类型的 Window Function,分别为 ReduceFunction、AggregateFunction 以及 ProcessWindowFunction,(sum 和 max)等 前三种类型的 Window Fucntion 按照计算原理的不同可以分为两大类:
一类是增量聚合函数:对应有 ReduceFunction、AggregateFunction;另一类是全量窗口函数,对应有 ProcessWindowFunction(还有 WindowFunction)。
增量聚合函数计算性能较高,占用存储空间少,主要因为基于中间状态的计算结果,窗 口中只维护中间结果状态值,不需要缓存原始数据。而全量窗口函数使用的代价相对较高, 性能比较弱,主要因为此时算子需要对所有属于该窗口的接入数据进行缓存,然后等到窗口 触发的时候,对所有的原始数据进行汇总计算。
ReduceFunction 定义了对输入的两个相同类型的数据元素按照指定的计算方法进行聚 合的逻辑,然后输出类型相同的一个结果元素。
package windowimport org.apache.flink.streaming.api.scala.StreamExecutionEnvironmentimport org.apache.flink.streaming.api.windowing.time.Time/** * @Author yqq * @Date 2021/12/27 19:01 * @Version 1.0 */case class StationLog(sid:String,callOut:String,callInput:String,callType:String,callTime:Long,duration:Long)object ReduceFunctionByWindowTest { def main(args: Array[String]): Unit = { val environment = StreamExecutionEnvironment.getExecutionEnvironment import org.apache.flink.streaming.api.scala._ //每隔5秒中统计每个基站的日志数量 //读取数据源 val stream: DataStream[StationLog] = environment.socketTextStream("node1", 8888) .map(line => { val arr: Array[String] = line.split(",") new StationLog(arr(0).trim, arr(1).trim, arr(2).trim, arr(3).trim, arr(4).trim.toLong, arr(5).trim.toLong) }) //开窗 stream.map(log=>{(log.sid,1)}) .keyBy(_._1) .timeWindow(Time.seconds(5))//开窗 .reduce((t1,t2)=>{(t1._1,t1._2+t2._2)}) .print() environment.execute() }}
版权声明:本文内容由网络用户投稿,版权归原作者所有,本站不拥有其著作权,亦不承担相应法律责任。如果您发现本站中有涉嫌抄袭或描述失实的内容,请联系我们jiasou666@gmail.com 处理,核实后本网站将在24小时内删除侵权内容。
发表评论
暂时没有评论,来抢沙发吧~