Flink001---Setting the window start point with offset

User submission, 2022-09-17


Intro

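With a tumbling window, how do you control the point at which each window starts? An example:

The watermark delay is 3 s, the tumbling window length is 5 s and the offset (window start) is 3 s, so [3, 8) is one window. Times are in seconds, counted from the first test timestamp 1609473600. That timestamp is a multiple of 5, so it lines up with Flink's default epoch-aligned windows.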
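The arithmetic behind the offset can be checked without running Flink. The sketch below is a standalone plain-Scala re-implementation of the window-start calculation, which Flink performs in `TimeWindow.getWindowStartWithOffset`. The names `WindowOffsetSketch` and `windowStart` are hypothetical. It works in seconds for readability, whereas Flink itself works in milliseconds.

```scala
object WindowOffsetSketch {
  // Start of the tumbling window that contains `timestamp`, for a given window size and offset.
  // Same arithmetic as Flink's TimeWindow.getWindowStartWithOffset, expressed in seconds here.
  def windowStart(timestamp: Long, offset: Long, size: Long): Long = {
    val remainder = (timestamp - offset) % size
    // Scala's % keeps the sign of the dividend, so shift negative remainders into [0, size)
    if (remainder < 0) timestamp - (remainder + size) else timestamp - remainder
  }

  def main(args: Array[String]): Unit = {
    val size = 5L   // window length in seconds
    val offset = 3L // offset in seconds
    for (t <- Seq(0L, 2L, 3L, 7L, 8L, 10L)) {
      val start = windowStart(t, offset, size)
      println(s"t = $t falls into [$start, ${start + size})")
    }
  }
}
```

Windows extend in both directions from the offset. So t = 0 falls into [-2, 3), not into a window that starts at 0.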

Code

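There is not much to say about the code: it just passes an offset as the second argument of TumblingEventTimeWindows.of.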

import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows
import org.apache.flink.streaming.api.windowing.time.Time
import com.flink.sourceread.SensorReading
import com.util.TimeUtils.convertTimeStamp2DateStr
import scala.collection.mutable.ArrayBuffer

object waterMarkTest {
  def main(args: Array[String]): Unit = {
    // Create the execution environment
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1)
    // Use event-time semantics
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
    // Emit a periodic watermark every 50 ms
    env.getConfig.setAutoWatermarkInterval(50)

    // Read lines from a socket; on Windows, start the server with: nc -lp 7777
    val inputStream = env.socketTextStream("localhost", 7777)

    // Parse each line into a SensorReading case class
    val dataStream = inputStream
      .map(data => {
        val arr = data.trim.split(",")
        SensorReading(arr(0).trim, arr(1).trim.toLong, arr(2).trim.toDouble)
      })
      // Watermark lags 3 s behind the largest timestamp seen so far
      .assignTimestampsAndWatermarks(
        new BoundedOutOfOrdernessTimestampExtractor[SensorReading](Time.seconds(3)) {
          // Input timestamps are in seconds; Flink expects milliseconds
          override def extractTimestamp(element: SensorReading): Long = element.timestamp * 1000L
        })

    // Per sensor and window: collect all timestamps and keep the latest one (also as a date string)
    val resultStream = dataStream
      .map(data => (data.id, data.timestamp, ArrayBuffer(data.timestamp), convertTimeStamp2DateStr(data.timestamp)))
      .keyBy(_._1) // key by the first tuple element (sensor id)
      // Tumbling event-time window: 5 s long with a 3 s offset, e.g. [3, 8)
      .window(TumblingEventTimeWindows.of(Time.seconds(5), Time.seconds(3)))
      .reduce((curRes, newData) =>
        (curRes._1, newData._2, curRes._3 ++ newData._3, convertTimeStamp2DateStr(newData._2)))

    resultStream.print("result")
    env.execute("window test")
  }
}
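The sketch below shows when each window is actually emitted. It replays the test timestamps in order and tracks a bounded-out-of-orderness watermark: the largest timestamp seen minus 3 s, which is what BoundedOutOfOrdernessTimestampExtractor computes. A window is reported as fired once the watermark reaches the window's last millisecond (end - 1), which is the firing rule of Flink's event-time trigger.

This is a plain-Scala model, not Flink code. The names `WatermarkFiringSketch` and `firingOrder` are hypothetical. It makes two simplifying assumptions:

- the input arrives in order;
- the watermark updates after every record, ignoring the 50 ms periodic interval.

```scala
import scala.collection.mutable

object WatermarkFiringSketch {
  val sizeMs = 5000L              // window length: 5 s
  val offsetMs = 3000L            // window offset: 3 s
  val maxOutOfOrdernessMs = 3000L // watermark delay: 3 s

  // Tumbling-window start with an offset (same arithmetic as TimeWindow.getWindowStartWithOffset)
  def windowStart(ts: Long): Long = {
    val r = (ts - offsetMs) % sizeMs
    if (r < 0) ts - (r + sizeMs) else ts - r
  }

  // Replays in-order timestamps (in seconds) and returns, in firing order,
  // (record that triggered the firing, (window start, window end)), all in seconds.
  def firingOrder(secs: Seq[Long]): List[(Long, (Long, Long))] = {
    var maxTs = Long.MinValue
    val open = mutable.TreeSet[Long]() // starts of windows that have not fired yet
    val fired = mutable.ArrayBuffer[(Long, (Long, Long))]()
    for (s <- secs) {
      val ts = s * 1000L // seconds -> milliseconds, as in extractTimestamp
      open += windowStart(ts)
      maxTs = math.max(maxTs, ts)
      val watermark = maxTs - maxOutOfOrdernessMs
      // A window fires once the watermark reaches its last millisecond (end - 1)
      val ready = open.filter(start => start + sizeMs - 1 <= watermark).toList
      for (start <- ready) {
        fired += ((s, (start / 1000L, (start + sizeMs) / 1000L)))
        open -= start
      }
    }
    fired.toList
  }

  def main(args: Array[String]): Unit = {
    val input = (1609473600L to 1609473610L) :+ 1609473620L
    for ((trigger, (start, end)) <- firingOrder(input))
      println(s"record $trigger fires window [$start, $end)")
  }
}
```

Under these assumptions, the record 1609473610 only raises the watermark to 1609473607 s. That is not enough to close [1609473603, 1609473608), so the two later windows wait for the 1609473620 record.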

Test results

input:

sensor_1", 1609473600, 35.8
sensor_1", 1609473601, 35.8
sensor_1", 1609473602, 35.8
sensor_1", 1609473603, 35.8
sensor_1", 1609473604, 35.8
sensor_1", 1609473605, 35.8
sensor_1", 1609473606, 35.8
sensor_1", 1609473607, 35.8
sensor_1", 1609473608, 35.8
sensor_1", 1609473609, 35.8
sensor_1", 1609473610, 35.8
sensor_1", 1609473620, 35.8

output:

result> (sensor_1",1609473602,ArrayBuffer(1609473600, 1609473601, 1609473602),2021-01-01 12:00:02)
result> (sensor_1",1609473607,ArrayBuffer(1609473603, 1609473604, 1609473605, 1609473606, 1609473607),2021-01-01 12:00:07)
result> (sensor_1",1609473610,ArrayBuffer(1609473608, 1609473609, 1609473610),2021-01-01 12:00:10)

What the test shows:

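Feeding in the records for seconds 0 to 10 shows them split into three windows: [0, 1, 2], [3, 4, 5, 6, 7] and [8, 9, 10]. The first group ends at 2 because its window is [-2, 3).

The extra record at 20 s raises the watermark to 17 s, which also closes the windows [3, 8) and [8, 13). Its own window [18, 23) stays open and is never printed.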
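The three groups can be reproduced offline by assigning each test timestamp to its offset window and grouping. This is a plain-Scala sketch, not the Flink runtime. The names `WindowGroupingSketch` and `group` are hypothetical, and the window-start arithmetic mirrors Flink's `TimeWindow.getWindowStartWithOffset`.

```scala
object WindowGroupingSketch {
  // Tumbling-window start with an offset (same arithmetic as TimeWindow.getWindowStartWithOffset)
  def windowStart(ts: Long, offset: Long, size: Long): Long = {
    val r = (ts - offset) % size
    if (r < 0) ts - (r + size) else ts - r
  }

  // Groups timestamps by the window they fall into, ordered by window start
  def group(secs: Seq[Long], offset: Long, size: Long): List[(Long, List[Long])] =
    secs
      .groupBy(ts => windowStart(ts, offset, size))
      .toList
      .sortBy(_._1)
      .map { case (start, members) => (start, members.toList.sorted) }

  def main(args: Array[String]): Unit = {
    val size = 5L
    // Seconds 0..10, counted from the first test timestamp 1609473600
    for ((start, members) <- group(0L to 10L, offset = 3L, size = size))
      println(s"[$start, ${start + size}): ${members.mkString(", ")}")
  }
}
```

Running the same grouping on the absolute timestamps gives window starts 1609473598, 1609473603 and 1609473608. These match the three ArrayBuffers in the output.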

2021-12-03, Jiulonghu, Jiangning District, Nanjing
