5.Flink实时项目之业务数据准备

网友投稿 250 2022-11-21

5.Flink实时项目之业务数据准备

1. 流程介绍

在上一篇文章中,我们已经把客户端的页面日志,启动日志,曝光日志分别发送到kafka对应的主题中。在本文中,我们将把业务数据也发送到对应的kafka主题中。

通过maxwell采集业务数据变化,相当于是ods数据,把采集的数据发送到kafka的topic(ods_base_db_m)中,然后flink从kafka消费数据,这个过程有维度数据,就放到hbase中,其他事实数据再发送给kafka作为dwd层。flink消费kafka数据可以做一些简单的ETL处理,比如过滤空值,长度限制。

2. 消费kafka数据及ETL操作

项目地址:gmall-realtime 的dwd包下创建类:BaseDbTask.java

具体步骤就看代码了

import com.alibaba.fastjson.JSON; import com.alibaba.fastjson.JSONObject; import com.zhangbao.gmall.realtime.utils.MyKafkaUtil; import org.apache.flink.runtime.state.filesystem.FsStateBackend; import org.apache.flink.streaming.api.CheckpointingMode; import org.apache.flink.streaming.api.datastream.DataStreamSource; import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator; import org.apache.flink.streaming.api.environment.LocalStreamEnvironment; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer; /** * 从kafka读取业务数据 * @author: zhangbao * @date: 2021/8/15 21:10 * @desc: **/ public class BaseDbTask { public static void main(String[] args) { //1.获取flink环境 LocalStreamEnvironment env = StreamExecutionEnvironment.createLocalEnvironment(); //设置并行度 env.setParallelism(4); //设置检查点 env.enableCheckpointing(5000, CheckpointingMode.EXACTLY_ONCE); env.getCheckpointConfig().setCheckpointTimeout(60000); env.setStateBackend(new FsStateBackend("hdfs://hadoop101:9000/gmall/flink/checkpoint/baseDbApp")); //指定哪个用户读取hdfs文件 System.setProperty("HADOOP_USER_NAME","zhangbao"); //2.从kafka获取topic数据 String topic = "ods_base_db_m"; String group = "base_db_app_group"; FlinkKafkaConsumer kafkaSource = MyKafkaUtil.getKafkaSource(topic, group); DataStreamSource jsonStrDs = env.addSource(kafkaSource); //3.对数据进行json转换 SingleOutputStreamOperator jsonObjDs = jsonStrDs.map(jsonObj -> JSON.parseObject(jsonObj)); //4.ETL, table不为空,data不为空,data长度不能小于3 SingleOutputStreamOperator filterDs = jsonObjDs.filter(jsonObject -> jsonObject.getString("table") != null && jsonObject.getJSONObject("data") != null && jsonObject.getString("data").length() > 3); filterDs.print("json str --->>"); try { env.execute("base db task"); } catch (Exception e) { e.printStackTrace(); } } }

3. 动态分流

由于MaxWell是把全部数据统一写入一个Topic中, 这样显然不利于日后的数据处理。所以需要把各个表拆开处理。但是由于每个表有不同的特点,有些表是维度表,有些表是事实表,有的表既是事实表在某种情况下也是维度表。

在实时计算中一般把维度数据写入存储容器,一般是方便通过主键查询的数据库比如HBase,Redis,MySQL 等。一般把事实数据写入流中,进行进一步处理,最终形成宽表。但是作为 Flink 实时计算任务,如何得知哪些表是维度表,哪些是事实表呢?而这些表又应该采集哪些字段呢?

我们可以将上面的内容放到某一个地方,集中配置。这样的配置不适合写在配置文件中,因为业务端随着需求变化每增加一张表,就要修改配置重启计算程序。所以这里需要一种动态配置方案,把这种配置长期保存起来,一旦配置有变化,实时计算可以自动感知。

这种可以有两个方案实现

一种是用 Zookeeper 存储,通过 Watch 感知数据变化。 另一种是用 mysql 数据库存储,周期性的同步或使用flink-cdc实时同步。

这里选择第二种方案,周期性同步,flink-cdc方式可自行尝试,主要是 mysql 对于配置数据初始化和维护管理,用 sql 都比较方便,虽然周期性操作时效性差一点,但是配置变化并不频繁。

所以就有了如下图:

业务数据保存到Kafka 的主题中,维度数据保存到Hbase 的表中。

4. mysql配置

① 在 gmall-realtime 模块添加依赖

org.projectlombok lombok 1.18.12 provided commons-beanutils commons-beanutils 1.9.3 com.google.guava guava 29.0-jre mysql mysql-connector-java 5.1.47

② 单独创建数据库gmall2021_realtime

create database gmall2021_realtime; CREATE TABLE `table_process` ( `source_table` varchar(200) NOT NULL COMMENT '来源表', `operate_type` varchar(200) NOT NULL COMMENT '操作类型 insert,update,delete', `sink_type` varchar(200) DEFAULT NULL COMMENT '输出类型 hbase kafka', `sink_table` varchar(200) DEFAULT NULL COMMENT '输出表(主题)', `sink_columns` varchar(2000) DEFAULT NULL COMMENT '输出字段', `sink_pk` varchar(200) DEFAULT NULL COMMENT '主键字段', `sink_extend` varchar(200) DEFAULT NULL COMMENT '建表扩展', PRIMARY KEY (`source_table`,`operate_type`) ) ENGINE=InnoDB DEFAULT CHARSET=utf8;

③ 创建实体类

package com.zhangbao.gmall.realtime.bean; import lombok.Data; /** * @author: zhangbao * @date: 2021/8/22 13:06 * @desc: **/ @Data public class TableProcess { //动态分流 Sink 常量 改为小写和脚本一致 public static final String SINK_TYPE_HBASE = "hbase"; public static final String SINK_TYPE_KAFKA = "kafka"; public static final String SINK_TYPE_CK = "clickhouse"; //来源表 private String sourceTable; //操作类型 insert,update,delete private String operateType; //输出类型 hbase kafka private String sinkType; //输出表(主题) private String sinkTable; //输出字段 private String sinkColumns; //主键字段 private String sinkPk; //建表扩展 private String sinkExtend; }

④ mysql工具类

package com.zhangbao.gmall.realtime.utils; import com.google.common.base.CaseFormat; import com.zhangbao.gmall.realtime.bean.TableProcess; import org.apache.commons.beanutils.BeanUtils; import org.apache.commons.lang.reflect.FieldUtils; import java.sql.*; import java.util.ArrayList; import java.util.List; /** * @author: zhangbao * @date: 2021/8/22 13:09 * @desc: **/ public class MysqlUtil { private static final String DRIVER_NAME = "com.mysql.jdbc.Driver"; private static final String URL = "jdbc:mysql://192.168.88.71:3306/gmall2021_realtime?characterEncoding=utf-8&useSSL=false&serverTimezone=GMT%2B8"; private static final String USER_NAME = "root"; private static final String USER_PWD = "123456"; public static void main(String[] args) { String sql = "select * from table_process"; List list = getList(sql, TableProcess.class, true); for (TableProcess tableProcess : list) { System.out.println(tableProcess.toString()); } } public static List getList(String sql,Class clz, boolean under){ Connection conn = null; PreparedStatement ps = null; ResultSet rs = null; try { Class.forName(DRIVER_NAME); conn = DriverManager.getConnection(URL, USER_NAME, USER_PWD); ps = conn.prepareStatement(sql); rs = ps.executeQuery(); List resultList = new ArrayList<>(); ResultSetMetaData metaData = rs.getMetaData(); int columnCount = metaData.getColumnCount(); while (rs.next()){ System.out.println(rs.getObject(1)); T obj = clz.newInstance(); for (int i = 1; i <= columnCount; i++) { String columnName = metaData.getColumnName(i); String propertyName = ""; if(under){ //指定数据库字段转换为驼峰命名法,guava工具类 propertyName = CaseFormat.LOWER_UNDERSCORE.to(CaseFormat.LOWER_CAMEL,columnName); } //通过guava工具类设置属性值 BeanUtils.setProperty(obj,propertyName,rs.getObject(i)); } resultList.add(obj); } return resultList; } catch (Exception throwables) { throwables.printStackTrace(); new RuntimeException("msql 查询失败!"); } finally { if(rs!=null){ try { rs.close(); } catch (SQLException throwables) { throwables.printStackTrace(); } } if(ps!=null){ try { ps.close(); } catch (SQLException throwables) { throwables.printStackTrace(); } } if(conn!=null){ try { conn.close(); } catch (SQLException throwables) { throwables.printStackTrace(); } } } return null; } }

5. 程序分流

如图定义一个mapFunction函数

1.在open方法中初始化配置信息,并周期开启一个任务刷新配置 2.在任务中根据配置创建数据表 3.分流

主任务流程

package com.zhangbao.gmall.realtime.app.dwd; import com.alibaba.fastjson.JSON; import com.alibaba.fastjson.JSONObject; import com.zhangbao.gmall.realtime.app.func.TableProcessFunction; import com.zhangbao.gmall.realtime.bean.TableProcess; import com.zhangbao.gmall.realtime.utils.MyKafkaUtil; import org.apache.flink.runtime.state.filesystem.FsStateBackend; import org.apache.flink.streaming.api.CheckpointingMode; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.datastream.DataStreamSource; import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator; import org.apache.flink.streaming.api.environment.LocalStreamEnvironment; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer; import org.apache.flink.util.OutputTag; /** * 从kafka读取业务数据 * @author: zhangbao * @date: 2021/8/15 21:10 * @desc: **/ public class BaseDbTask { public static void main(String[] args) { //1.获取flink环境 LocalStreamEnvironment env = StreamExecutionEnvironment.createLocalEnvironment(); //设置并行度 env.setParallelism(4); //设置检查点 env.enableCheckpointing(5000, CheckpointingMode.EXACTLY_ONCE); env.getCheckpointConfig().setCheckpointTimeout(60000); env.setStateBackend(new FsStateBackend("hdfs://hadoop101:9000/gmall/flink/checkpoint/baseDbApp")); //指定哪个用户读取hdfs文件 System.setProperty("HADOOP_USER_NAME","zhangbao"); //2.从kafka获取topic数据 String topic = "ods_base_db_m"; String group = "base_db_app_group"; FlinkKafkaConsumer kafkaSource = MyKafkaUtil.getKafkaSource(topic, group); DataStreamSource jsonStrDs = env.addSource(kafkaSource); //3.对数据进行json转换 SingleOutputStreamOperator jsonObjDs = jsonStrDs.map(jsonObj -> JSON.parseObject(jsonObj)); //4.ETL, table不为空,data不为空,data长度不能小于3 SingleOutputStreamOperator filterDs = jsonObjDs.filter(jsonObject -> jsonObject.getString("table") != null && jsonObject.getJSONObject("data") != null && jsonObject.getString("data").length() > 3); //5.动态分流,事实表写会kafka,维度表写入hbase OutputTag hbaseTag = new OutputTag(TableProcess.SINK_TYPE_HBASE){}; //创建自定义mapFunction函数 SingleOutputStreamOperator kafkaTag = filterDs.process(new TableProcessFunction(hbaseTag)); DataStream hbaseDs = kafkaTag.getSideOutput(hbaseTag); filterDs.print("json str --->>"); try { env.execute("base db task"); } catch (Exception e) { e.printStackTrace(); } } }

创建TableProcessFunction自定义任务

这里包括上面说的四个步骤

初始化并周期读取配置数据 执行每条数据 过滤字段 标记数据流向,根据配置写入对应去向,维度数据就写入hbase,事实数据就写入kafka

package com.zhangbao.gmall.realtime.app.func; import com.alibaba.fastjson.JSONObject; import com.zhangbao.gmall.realtime.bean.TableProcess; import com.zhangbao.gmall.realtime.common.GmallConfig; import com.zhangbao.gmall.realtime.utils.MysqlUtil; import lombok.extern.log4j.Log4j2; import org.apache.commons.lang3.StringUtils; import org.apache.flink.configuration.Configuration; import org.apache.flink.streaming.api.functions.ProcessFunction; import org.apache.flink.util.Collector; import org.apache.flink.util.OutputTag; import java.sql.Connection; import java.sql.DriverManager; import java.sql.PreparedStatement; import java.sql.SQLException; import java.util.*; /** * @author: zhangbao * @date: 2021/8/26 23:24 * @desc: **/ @Log4j2(topic = "gmall-logger") public class TableProcessFunction extends ProcessFunction { //定义输出流标记 private OutputTag outputTag; //定义配置信息 private Map tableProcessMap = new HashMap<>(); //在内存中存放已经创建的表 Set existsTable = new HashSet<>(); //phoenix连接对象 Connection con = null; public TableProcessFunction(OutputTag outputTag) { this.outputTag = outputTag; } //只执行一次 @Override public void open(Configuration parameters) throws Exception { //初始化配置信息 log.info("查询配置表信息"); //创建phoenix连接 Class.forName("org.apache.phoenix.jdbc.PhoenixDriver"); con = DriverManager.getConnection(GmallConfig.PHOENIX_SERVER); refreshDate(); //启动一个定时器,每隔一段时间重新获取配置信息 //delay:延迟5000执行,每隔5000执行一次 Timer timer = new Timer(); timer.schedule(new TimerTask() { @Override public void run() { refreshDate(); } },5000,5000); } //每进来一个元素,执行一次 @Override public void processElement(JSONObject jsonObj, Context context, Collector collector) throws Exception { //获取表的修改记录 String table = jsonObj.getString("table"); String type = jsonObj.getString("type"); JSONObject data = jsonObj.getJSONObject("data"); if(type.equals("bootstrap-insert")){ //maxwell更新历史数据时,type类型是bootstrap-insert type = "insert"; jsonObj.put("type",type); } if(tableProcessMap != null && tableProcessMap.size()>0){ String key = table + ":" + type; TableProcess tableProcess = tableProcessMap.get(key); if(tableProcess!=null){ //数据发送到何处,如果是维度表,就发送到hbase,如果是事实表,就发送到kafka String sinkType = tableProcess.getSinkType(); jsonObj.put("sink_type",sinkType); String sinkColumns = tableProcess.getSinkColumns(); //过滤掉不要的数据列,sinkColumns是需要的列 filterColumns(data,sinkColumns); }else { log.info("no key {} for mysql",key); } if(tableProcess!=null && tableProcess.getSinkType().equals(TableProcess.SINK_TYPE_HBASE)){ //根据sinkType判断,如果是维度表就分流,发送到hbase context.output(outputTag,jsonObj); }else if(tableProcess!=null && tableProcess.getSinkType().equals(TableProcess.SINK_TYPE_KAFKA)){ //根据sinkType判断,如果是事实表就发送主流,发送到kafka collector.collect(jsonObj); } } } //过滤掉不要的数据列,sinkColumns是需要的列 private void filterColumns(JSONObject data, String sinkColumns) { String[] cols = sinkColumns.split(","); //转成list集合,用于判断是否包含需要的列 List columnList = Arrays.asList(cols); Set> entries = data.entrySet(); Iterator> iterator = entries.iterator(); while (iterator.hasNext()){ Map.Entry next = iterator.next(); String key = next.getKey(); //如果不包含就删除不需要的列 if(!columnList.contains(key)){ iterator.remove(); } } } //读取配置信息,并创建表 private void refreshDate() { List processList = MysqlUtil.getList("select * from table_process", TableProcess.class, true); for (TableProcess tableProcess : processList) { String sourceTable = tableProcess.getSourceTable(); String operateType = tableProcess.getOperateType(); String sinkType = tableProcess.getSinkType(); String sinkTable = tableProcess.getSinkTable(); String sinkColumns = tableProcess.getSinkColumns(); String sinkPk = tableProcess.getSinkPk(); String sinkExtend = tableProcess.getSinkExtend(); String key = sourceTable+":"+operateType; tableProcessMap.put(key,tableProcess); //在phoenix创建表 if(TableProcess.SINK_TYPE_HBASE.equals(sinkType) && operateType.equals("insert")){ boolean noExist = existsTable.add(sinkTable);//true则表示没有创建表 if(noExist){ createTable(sinkTable,sinkColumns,sinkPk,sinkExtend); } } } } //在phoenix中创建表 private void createTable(String table, String columns, String pk, String ext) { if(StringUtils.isBlank(pk)){ pk = "id"; } if(StringUtils.isBlank(ext)){ ext = ""; } StringBuilder sql = new StringBuilder("create table if not exists " + GmallConfig.HBASE_SCHEMA + "." + table +"("); String[] split = columns.split(","); for (int i = 0; i < split.length; i++) { String field = split[i]; if(pk.equals(field)){ sql.append(field + " varchar primary key "); }else { sql.append("info." + field +" varchar "); } if(i < split.length-1){ sql.append(","); } } sql.append(")").append(ext); //创建phoenix表 PreparedStatement ps = null; try { log.info("创建phoenix表sql - >{}",sql.toString()); ps = con.prepareStatement(sql.toString()); ps.execute(); } catch (SQLException throwables) { throwables.printStackTrace(); }finally { if(ps!=null){ try { ps.close(); } catch (SQLException throwables) { throwables.printStackTrace(); throw new RuntimeException("创建phoenix表失败"); } } } if(tableProcessMap == null || tableProcessMap.size()==0){ throw new RuntimeException("没有从配置表中读取配置信息"); } } }

6. 重启策略

flink程序在运行时,有错误会抛出异常,程序就停止了,但当开始checkpoint检查点时,flink重启策略就是开启的,如果程序出现异常了,程序就会一直重启,并且重启次数是Integer.maxValue,这个过程也看不到错误信息,是很不友好的。

flink可以设置重启策略,所以在我们开启checkpoint检查点时,设置不需要重启就可以看到错误信息了:

​ env.setRestartStrategy(RestartStrategies.noRestart());

下面我们测试一下。

package com.zhangbao.gmall.realtime.app.dwd; import com.alibaba.fastjson.JSON; import com.alibaba.fastjson.JSONObject; import com.zhangbao.gmall.realtime.app.func.TableProcessFunction; import com.zhangbao.gmall.realtime.bean.TableProcess; import com.zhangbao.gmall.realtime.utils.MyKafkaUtil; import org.apache.flink.api.common.restartstrategy.RestartStrategies; import org.apache.flink.runtime.executiongraph.restart.RestartStrategy; import org.apache.flink.runtime.state.filesystem.FsStateBackend; import org.apache.flink.streaming.api.CheckpointingMode; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.datastream.DataStreamSource; import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator; import org.apache.flink.streaming.api.environment.LocalStreamEnvironment; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer; import org.apache.flink.util.OutputTag; /** * 从kafka读取业务数据 * @author: zhangbao * @date: 2021/8/15 21:10 * @desc: **/ public class Test { public static void main(String[] args) { //1.获取flink环境 LocalStreamEnvironment env = StreamExecutionEnvironment.createLocalEnvironment(); //设置并行度 env.setParallelism(4); //设置检查点 env.enableCheckpointing(5000, CheckpointingMode.EXACTLY_ONCE); env.getCheckpointConfig().setCheckpointTimeout(60000); env.setStateBackend(new FsStateBackend("hdfs://hadoop101:9000/gmall/flink/checkpoint/baseDbApp")); //指定哪个用户读取hdfs文件 System.setProperty("HADOOP_USER_NAME","zhangbao"); //flink重启策略, // 如果开启上面的checkpoint,重启策略就是自动重启,程序有问题不会有报错, // 如果没有开启checkpoint,就不会自动重启,所以这里设置不需要重启,就可以查看错误信息 env.setRestartStrategy(RestartStrategies.noRestart()); //2.从kafka获取topic数据 String topic = "ods_base_db_m"; String group = "test_group"; FlinkKafkaConsumer kafkaSource = MyKafkaUtil.getKafkaSource(topic, group); DataStreamSource jsonStrDs = env.addSource(kafkaSource); jsonStrDs.print("转换前-->"); //3.对数据进行json转换 SingleOutputStreamOperator jsonObjDs = jsonStrDs.map(jsonObj ->{ System.out.println(4/0); JSONObject jsonObject = JSON.parseObject(jsonObj); return jsonObject; }); jsonObjDs.print("转换后-->"); try { env.execute("base db task"); } catch (Exception e) { e.printStackTrace(); } } }

在程序对数据进行转换过程中,我们加了 System.out.println(4/0); 这样一行代码,肯定会抛出异常的。

在设置不需要重启后,就可以看到错误信息了,当你把设置不需要重启一行代码注释掉,就会发现程序是一直在运行中的,并且没有任何错误信息。

在实际应用中,根据需要可以自行设置。

版权声明:本文内容由网络用户投稿,版权归原作者所有,本站不拥有其著作权,亦不承担相应法律责任。如果您发现本站中有涉嫌抄袭或描述失实的内容,请联系我们jiasou666@gmail.com 处理,核实后本网站将在24小时内删除侵权内容。

上一篇:Hadoop究竟是什么鬼
下一篇:Type-C产业链全景分析 告诉您有那些商业机会
相关文章

 发表评论

暂时没有评论,来抢沙发吧~