c语言sscanf函数的用法是什么
305
2022-11-19
Flink入门之Flink程序开发步骤(Java语言)
我们如果要使用flink进行计算开发,一个完整的开发步骤是怎样的呢?
Batch Analytics,右边是 Streaming Analytics。批量计算: 统一收集数据->存储到DB->对数据进行批量处理,对数据实时性邀请不高,比如生成离线报表、月汇总,支付宝年度账单(一年结束批处理计算)Streaming Analytics 流式计算,顾名思义,就是对数据流进行处理,如使用流式分析引擎如 Storm,Flink 实时处理分析数据,应用较多的场景如 实时报表、车辆实时报警计算等等。
1.开发程序所需依赖
2.获取执行环境
flink程序开发,首要的便是需要获取其执行环境!
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();或者StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
如果使用StreamExecutionEnvironment 默认便是流式处理环境
但是flink1.12.0 开始,流批一体,我们可以自己指定当前计算程序的环境模式
指定为自动模式:AUTOMATIC
此设置后,flink将会自动识别数据源类型
有界数据流,则会采用批方式进行数据处理
无界束流,则会采用流方式进行数据处理
env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);
强制指定为批数据处理模式:BATCH
env.setRuntimeMode(RuntimeExecutionMode.BATCH);
强制指定为流数据处理模式:STREAMING
env.setRuntimeMode(RuntimeExecutionMode.STREAMING);
注意点:
在flink中,有界与无界数据流都可以强指定为流式运行环境,但是,如果明知一个数据来源为流式数据,就必须设置环境为AUTOMATIC 或STREAMING,不可以指定为BATCH否则程序会报错!
3.加载/创建数据源
flink,是一个计算框架,在计算的前提,肯定是要有数据来源啊!
flink可以从多种场景读取加载数据,例如 各类DB 如Mysql、SQL SERVER、MongoDB、各类MQ 如Kafka、RabbitMQ、以及很多常用数据存储场景 如redis、文件(本地文件/HDFS)、scoket…我们在加载数据源的时候,便知道,该数据是有界还是无界了!
flink读取rabbitMQ消息,是有界还是无界呢?当然是无界!因为flink程序启动时,能通过连接知道什么时候MQ中有数据,什么时候没有数据吗?不知道,因为本身MQ中是否有消息或者消息有多少就是一个不能肯定确定的因素,因此其不得不保持一个类似于长连接的形式,一直等待MQ中有数据到来,然后处理。
flink读取指定某个文件中的数据,那么此数据源是有界还是无界呢?当然是有界!因为文件中数据,flink读取会做记录,当文件内容读完了,数据源就相当于没有新的数据来到了嘛!
从集合中读取数据:
DataStream
那么,这是无界数据还是有界数据呢?很明显,有界数据!因为数据就这么多,当前数据源在读取时不会再凭空产生数据了。
从scoket中读取数据:
DataStreamSource
这是无界数据还是有界数据呢?很明显,无界数据!因为scoket一旦连接,flink不会知道其数据源什么时候会数据结束,其不得不保持一个类似于长连接的状态,一直等待Scoket中有数据到来,然后处理。
4.数据转换处理
数据转换处理,就是flink使用算子,对从数据源中获取的数据进行数据加工处理(例如 数据转换,计算等等)
例如:开窗口、低阶处理函数ProcessFuction、各种算子:map(映射,与java8流中Map效果类似),flatmap(元素摊平,与java8流中Map效果类似)等等。
demo示例:
DataStreamSource
5.处理后数据放置/输出
将计算后的数据,进行放置(输出/存储),可以很地方,从什么地方读取数据,自然也可以将计算结果输出到该地点。
例如:输出到文件,输出到控制台,输出到MQ,输出到DB,输出到scoket…
输出到控制台
source.print();
6.执行计算程序
启动示例:
// 1.准备环境StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();// 设置模式 (流、批、自动)// 2.加载数据源// 3.数据转换// 4.数据输出// 5.执行程序env.execute();//或者 env.execute("指定当前计算程序名");
7.完整示例
public class FlinkDemo { public static void main(String[] args) throws Exception { // 1.准备环境 StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); // 设置运行模式 env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC); // 2.加载数据源 DataStreamSource
IDEA执行后,输出结果:
前边序号可以理解为多线程执行时的线程名字!
版权声明:本文内容由网络用户投稿,版权归原作者所有,本站不拥有其著作权,亦不承担相应法律责任。如果您发现本站中有涉嫌抄袭或描述失实的内容,请联系我们jiasou666@gmail.com 处理,核实后本网站将在24小时内删除侵权内容。
发表评论
暂时没有评论,来抢沙发吧~