oracle竖列的数据怎么变成一行
276
2022-11-17
Flink中State管理与恢复之状态后端Backend案例
设置 HDFS 文件系统的状态后端,取消 Job 之后再次恢复 Job。
package stateimport org.apache.flink.runtime.state.filesystem.FsStateBackendimport org.apache.flink.streaming.api.CheckpointingModeimport org.apache.flink.streaming.api.environment.CheckpointConfig.ExternalizedCheckpointCleanupimport org.apache.flink.streaming.api.scala.StreamExecutionEnvironment/** * @Author yqq * @Date 2021/12/26 23:55 * @Version 1.0 */object TestCheckPointByHDFS { //使用WordCount案例来测试一下HDFS的状态后端,先运行一段时间Job,然后cansol,在重新启动,看看状态是否是连续的 def main(args: Array[String]): Unit = { //1.初始化Flink流计算的环境 val environment: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment //开启CheckPoint并设置一些参数 environment.enableCheckpointing(5000)//每个5秒开启一次CheckPoint environment.setStateBackend(new FsStateBackend("hdfs://mycluster/checkpoint/cp1"))//存放检查点数据 environment.getCheckpointConfig.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE) environment.getCheckpointConfig.setCheckpointTimeout(5000) environment.getCheckpointConfig.setMaxConcurrentCheckpoints(1) environment.getCheckpointConfig.enableExternalizedCheckpoints(ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION)//终止job保留检查点数据 //修改并行度 environment.setParallelism(1) //2.导入隐式转换 import org.apache.flink.streaming.api.scala._ //3.读取数据,读取sock流中的数据,DataStream => 相当于spark中的DStream val stream: DataStream[String] = environment.socketTextStream("node1", 8888) //4.转换和处理数据 val result: DataStream[(String, Int)] = stream.flatMap(_.split(" ")) .map((_, 1)) .keyBy(0) //分组算子,0 或者 1 代表下标,前面的DataStream[二元组],0=>代表单词 1=>代表出现的次数 .sum(1) //聚合累加算子 //5.打印结果 result.print("结果") //6.启动流计算程序 environment.execute("wordCount") }}
版权声明:本文内容由网络用户投稿,版权归原作者所有,本站不拥有其著作权,亦不承担相应法律责任。如果您发现本站中有涉嫌抄袭或描述失实的内容,请联系我们jiasou666@gmail.com 处理,核实后本网站将在24小时内删除侵权内容。
发表评论
暂时没有评论,来抢沙发吧~