oracle竖列的数据怎么变成一行
458
2022-11-17
Flink中Window详解之Window的聚合函数ProcessWindowFunction
前面提到的 ReduceFunction 和 AggregateFunction 都是基于中间状态实现增量计算的 窗口函数,虽然已经满足绝大多数场景,但在某些情况下,统计更复杂的指标可能需要依赖 于窗口中所有的数据元素,或需要操作窗口中的状态数据和窗口元数据,这时就需要使用到 ProcessWindowsFunction,ProcessWindowsFunction 能够更加灵活地支持基于窗口全部数 据 元 素 的 结 果 计 算 , 例 如 对 整 个 窗 口 数 据 排 序 取 TopN, 这 样 的 需 要 就 必 须 使 用 ProcessWindowFunction。
package windowimport org.apache.flink.streaming.api.scala.StreamExecutionEnvironmentimport org.apache.flink.streaming.api.scala.function.ProcessWindowFunctionimport org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindowsimport org.apache.flink.streaming.api.windowing.time.Timeimport org.apache.flink.streaming.api.windowing.windows.TimeWindowimport org.apache.flink.util.Collector/** * @Author yqq * @Date 2021/12/27 21:20 * @Version 1.0 */case class StationLog(sid:String,callOut:String,callInput:String,callType:String,callTime:Long,duration:Long)object ProcessWindowFunctionTest { def main(args: Array[String]): Unit = { val environment = StreamExecutionEnvironment.getExecutionEnvironment import org.apache.flink.streaming.api.scala._ //每隔5秒中统计每个基站的日志数量 //读取数据源 val stream: DataStream[StationLog] = environment.socketTextStream("node1", 8888) .map(line => { val arr: Array[String] = line.split(",") new StationLog(arr(0).trim, arr(1).trim, arr(2).trim, arr(3).trim, arr(4).trim.toLong, arr(5).trim.toLong) }) //开窗 stream.map(log=>{(log.sid,1)}) .keyBy(_._1) .window(TumblingProcessingTimeWindows.of(Time.seconds(5))) .process(new ProcessWindowFunction[(String,Int),(String,Long),String,TimeWindow]{//一个窗口结束的时候调用一次 override def process(key: String, context: Context, elements: Iterable[(String, Int)], out: Collector[(String, Long)]): Unit = { println("------------") //整个窗口的数据保存到Iterable,里面有很多数据 out.collect((key,elements.size)) } }) .print() environment.execute() }}
版权声明:本文内容由网络用户投稿,版权归原作者所有,本站不拥有其著作权,亦不承担相应法律责任。如果您发现本站中有涉嫌抄袭或描述失实的内容,请联系我们jiasou666@gmail.com 处理,核实后本网站将在24小时内删除侵权内容。
发表评论
暂时没有评论,来抢沙发吧~