Flink 高效sink写入OSS

网友投稿 500 2022-11-22

Flink 高效sink写入OSS

内容框架:

背景介绍功能介绍如何配置如何使用

背景介绍

Apache Flink 简介

Apache Flink 是新一代大数据计算引擎的代表,以分布式流计算为核心,同时支持批处理。特点:

低延时:Flink 流式计算可以做到亚秒甚至毫秒级延时,相比之下 Spark 流计算很难达到秒级高吞吐:Flink 以分布式快照算法实现容错,对吞吐量的影响很小高容错:基于分布式快照算法,Flink 实现了低代价、高效的容错表现,以及 Exactly_Once 语义保证

JindoFS Flink Connector 产生背景

阿里云对象存储 Object Storage Service(OSS):

海量:无限容量,弹性伸缩安全:12个9的数据安全性,多种加密方式低成本:远低于云磁盘,且有多种存储方式、生命周期管理等节约成本高可靠:服务可用性 99.9%已服务于海量用户

Flink 应用广泛:

流计算领域业内主要解决方案Apache 基金会最活跃项目之一未来:流批一体、在线分析

Flink 使用痛点:

开源 ApacheFlink 尚不支持直接写入 OSSHadoop OSS SDK 写入性能不一定满足需求

JindoFS Flink Connector 介绍

整体架构:

两阶段 Checkpoint (检查点) 机制:

第一阶段 MPU (MultiPartUpload,分片上传) 写入 OSS第二阶段 MPU 提交

Recoverable Writer 可恢复性写入:

临时文件以普通文件格式上传 OSSSink 节点状态快照

写入 OSS vs.  写入 亚马逊S3:

Native 实现:数据写入以 C++ 代码实现,相比 Java 更高效高速读写:多线程读写临时文件,对大于1MB的文件优势尤其明显数据缓存:读写 OSS 实现本地缓存,加速外部访问

OSS 访问加速,JindoFS 提供新支持

2.如何配置

如何配置 JindoFS Flink Connector

环境要求:

集群上有开源版本 Flink 软件,版本不低于1.10.0

SDK 配置:

下载所需 SDK 文件:

jindo-flink-sink-${version}.jarjindofs-sdk-${version}.jar下载链接(Github):jar 放置于集群 Flink 目录下 lib 文件夹:

Flink 根目录通常可由 $FLINK_HOME 环境变量获取集群所有节点均需配置

Java SPI:自动加载资源,无需额外配置

文档链接(Github):

Flink Connector

确保集群能够访问 OSS Bucket

前提:已购买 OSS 产品,OSS 网站链接:OSS Bucket,例如正确配置密钥或免密服务等

使用合适的路径,流式写入OSS Bucket

写入 OSS 须使用 oss:// 前缀路径,类似于:oss:///

更多优化!用 JindoFS SDK 加速 OSS 访问,参考

Github:

JindoFS Flink Connector:Java

在程序中开启 Flink Checkpoint

前提:使用可重发的数据源,如 Kafka通过 StreamExecutionEnvironment 对象打开 Checkpoint(示例):建立:

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

打开:

env.enableCheckpointing(, CheckpointingMode.EXACTLY_ONCE);

示例程序

下文中,outputStream 是一个预先形成的 DataStream 对象,若需写入 OSS,则可以这样添加 sink:

String outputPath = "oss:///"; StreamingFileSink sink= StreamingFileSink.forRowFormat( new Path(outputPath), new SimpleStringEncoder("UTF-8") ).build(); outputStream.addSink(sink);

上述程序指定将 outputStream 中的String 内容写入 OSS 路径 oss:///,最后还需用 env.execute() 语句执行 Flink 作业,env 是已建立的 StreamExecutionEnvironment 对象最后,将 Java 作业打包为 jar 文件,并用 flink run 在集群提交即可

在程序中使用 JindoFS Flink Connector:Pyflink

与Java 示例类似,在 Pyflink 中使用 JindoFS Flink Connector 与写入 HDFS 等其他介质方式相同,只需:

将写入路径写作合适的 OSS 路径注意打开 Checkpoint 功能

例如,下列 Python 程序定义了一张位于 OSS 的表:

sink_dest = "oss:///"sink_ddl = f""" CREATE TABLE mySink ( uid INT, pid INT ) PARTITIONED BY ( pid ) WITH ( 'connector' = 'filesystem', 'fpath' = '{sink_dest}', 'format' = 'csv', 'sink.rolling-policy.file-size' = '2MB', 'sink.partition-commit.policy.kind' = 'success-file' ) """

然后将其添加到 StreamTableEnvironmentt_env 中即可:t_env.sql_update(sink_ddl)

在程序中使用 JindoFS Flink Connector:更多配置

用户通过 flink run 提交 java 或 pyflink 程序时,可以额外自定义一些参数,格式:

flink run -m yarn-cluster -yD key1=value1 -yD key2=value2 ...

目前支持“熵注入”及“分片上传并行度”两项配置

熵注入(entropyinjection):

功能:将写入路径的一段特定字符串匹配出来,用一段随机的字符串进行替换效果:削弱所谓 “片区” (sharding) 效应,提高写入效率配置参数:

oss.entropy.key=

oss.entropy.length=

分片上传并行度

配置参数:oss.upload.max.concurrent.uploads默认值:当前可用的处理器数量

效果:

版权声明:本文内容由网络用户投稿,版权归原作者所有,本站不拥有其著作权,亦不承担相应法律责任。如果您发现本站中有涉嫌抄袭或描述失实的内容,请联系我们jiasou666@gmail.com 处理,核实后本网站将在24小时内删除侵权内容。

上一篇:Flink企业级优化全面总结(3万字长文,15张图)
下一篇:Springboot中用 Netty 开启UDP服务方式
相关文章

 发表评论

暂时没有评论,来抢沙发吧~