oracle竖列的数据怎么变成一行
299
2022-11-17
Flink中State管理与恢复之Savepint案例
Savepoints 是检查点的一种特殊实现,底层实现其实也是使用 Checkpoints 的机制。 Savepoints 是用户以手工命令的方式触发 Checkpoint,并将结果持久化到指定的存储路径 中,其主要目的是帮助用户在升级和维护集群过程中保存系统中的状态数据,避免因为停机 运维或者升级应用等正常终止应用的操作而导致系统无法恢复到原有的计算状态的情况,从 而无法实现从端到端的 Excatly-Once 语义保证。
1.配置 Savepoints 的存储路径 在 flink-conf.yaml 中配置 SavePoint 存储的位置,设置后,如果要创建指定 Job 的 SavePoint,可以不用在手动执行命令时指定 SavePoint 的位置。
state.savepoints.dir: hdfs://mycluster/savepoint/
2.在代码中设置算子 ID 为了能够在作业的不同版本之间以及 Flink 的不同版本之间顺利升级,强烈推荐程序员 通过手动给算子赋予 ID,这些 ID 将用于确定每一个算子的状态范围。如果不手动给各算子 指定 ID,则会由 Flink 自动给每个算子生成一个 ID。而这些自动生成的 ID 依赖于程序的结 构,并且对代码的更改是很敏感的。因此,强烈建议用户手动设置 ID。
package stateimport org.apache.flink.streaming.api.scala.StreamExecutionEnvironment/** * @Author yqq * @Date 2021/12/27 14:49 * @Version 1.0 */object TestSavePoints { def main(args: Array[String]): Unit = { //1.初始化Flink流计算的环境 val environment: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment //修改并行度 // environment.setParallelism(1) //2.导入隐式转换 import org.apache.flink.streaming.api.scala._ //3.读取数据,读取sock流中的数据,DataStream => 相当于spark中的DStream val stream: DataStream[String] = environment.socketTextStream("node1", 8888) .uid("sock001") //4.转换和处理数据 val result: DataStream[(String, Int)] = stream.flatMap(_.split(" ")) .uid("flatmap001") .map((_, 1)) .uid("map001") .keyBy(0) //分组算子,0 或者 1 代表下标,前面的DataStream[二元组],0=>代表单词 1=>代表出现的次数 .sum(1) //聚合累加算子 .uid("sum001") //5.打印结果 result.print("结果") //6.启动流计算程序 environment.execute("wordCount") }}
3.触发 SavePoint 先启动Job
[root@node1 bin]# ./flink run -c state.TestSavePoints /root/test/Flink-test-1.0-SNAPSHOT.jar Starting execution of program
输入单词
[root@node1 ~]# nc -lk 8888hello flink flink
[root@node1 bin]# ./flink listWaiting for response...------------------ Running/Restarting Jobs -------------------27.12.2021 16:29:12 : 8b960baee4269a804938e31792f984d3 : wordCount (RUNNING)--------------------------------------------------------------No scheduled jobs.[root@node1 bin]# ./flink savepoint 8b960baee4269a804938e31792f984d3Triggering savepoint for job 8b960baee4269a804938e31792f984d3.Waiting for response...Savepoint completed. Path: hdfs://mycluster/savepoint/savepoint-8b960b-fc5cc036f1f2You can resume your program from this savepoint with the run command.[root@node1 bin]# ./flink cancel 8b960baee4269a804938e31792f984d3Cancelling job 8b960baee4269a804938e31792f984d3.Cancelled job 8b960baee4269a804938e31792f984d3.
从 SavePoint 启动 Job、也可以通过 Web UI 启动 Job
[root@node1 bin]# ./flink run -s hdfs://mycluster/savepoint/savepoint-8b960b-fc5cc036f1f2 -c state.TestSavePoints /root/test/Flink-test-1.0-SNAPSHOT.jar Starting execution of program
再次输入一个单词flink
[root@node1 ~]# nc -lk 8888hello flink flinkflink
版权声明:本文内容由网络用户投稿,版权归原作者所有,本站不拥有其著作权,亦不承担相应法律责任。如果您发现本站中有涉嫌抄袭或描述失实的内容,请联系我们jiasou666@gmail.com 处理,核实后本网站将在24小时内删除侵权内容。
发表评论
暂时没有评论,来抢沙发吧~