oracle竖列的数据怎么变成一行
374
2022-11-17
Flink中Window详解之Window的聚合函数AggregateFunction
和 ReduceFunction 相似,AggregateFunction 也是基于中间状态计算结果的增量计算 函数,但 AggregateFunction 在窗口计算上更加通用。AggregateFunction 接口相对 ReduceFunction 更加灵活,实现复杂度也相对较高。AggregateFunction 接口中定义了三个 需要复写的方法,其中 add()定义数据的添加逻辑,getResult 定义了根据 accumulator 计 算结果的逻辑,merge 方法定义合并 accumulator 的逻辑。
package windowimport org.apache.flink.api.common.functions.AggregateFunctionimport org.apache.flink.streaming.api.scala.StreamExecutionEnvironmentimport org.apache.flink.streaming.api.scala.function.WindowFunctionimport org.apache.flink.streaming.api.windowing.assigners.SlidingProcessingTimeWindowsimport org.apache.flink.streaming.api.windowing.time.Timeimport org.apache.flink.streaming.api.windowing.windows.TimeWindowimport org.apache.flink.util.Collector/** * @Author yqq * @Date 2021/12/27 20:42 * @Version 1.0 */case class StationLog(sid:String,callOut:String,callInput:String,callType:String,callTime:Long,duration:Long)object AggregatFunctionTest { def main(args: Array[String]): Unit = { val environment = StreamExecutionEnvironment.getExecutionEnvironment import org.apache.flink.streaming.api.scala._ //每隔5秒统计最近8秒内,每个基站的日志数量 //读取数据源 val stream: DataStream[StationLog] = environment.socketTextStream("node1", 8888) .map(line => { val arr: Array[String] = line.split(",") new StationLog(arr(0).trim, arr(1).trim, arr(2).trim, arr(3).trim, arr(4).trim.toLong, arr(5).trim.toLong) }) //开窗 stream.map(log=>{(log.sid,1)}) .keyBy(_._1) .window(SlidingProcessingTimeWindows.of(Time.seconds(8),Time.seconds(5)))//开窗,滑动窗口 .aggregate(new MyAggregateFuntion,new MyWindowFunction) .print() environment.execute() } //MyWindowFunction 输入数据来自于 MyAggregateFuntion,在窗口结束的时候先执行MyAggregateFuntion对象的getResult,然后在执行apply方法 class MyWindowFunction extends WindowFunction[Long,(String,Long),String,TimeWindow] { override def apply(key: String, window: TimeWindow, input: Iterable[Long], out: Collector[(String, Long)]): Unit = { out.collect((key,input.iterator.next()))//next得到的第一个值,迭代器中只有一个值 } } class MyAggregateFuntion extends AggregateFunction[(String,Int),Long,Long] { //初始化一个累加器,开始的时候为0 override def createAccumulator(): Long = 0 //来一条数据执行一次 override def add(value: (String, Int), accumulator: Long): Long = accumulator+value._2 //在窗口结束的时候执行一次 override def getResult(accumulator: Long): Long = accumulator override def merge(a: Long, b: Long): Long = a+b }}
版权声明:本文内容由网络用户投稿,版权归原作者所有,本站不拥有其著作权,亦不承担相应法律责任。如果您发现本站中有涉嫌抄袭或描述失实的内容,请联系我们jiasou666@gmail.com 处理,核实后本网站将在24小时内删除侵权内容。
发表评论
暂时没有评论,来抢沙发吧~