c语言sscanf函数的用法是什么
250
2022-11-21
5.Flink实时项目之业务数据准备
1. 流程介绍
在上一篇文章中,我们已经把客户端的页面日志,启动日志,曝光日志分别发送到kafka对应的主题中。在本文中,我们将把业务数据也发送到对应的kafka主题中。
通过maxwell采集业务数据变化,相当于是ods数据,把采集的数据发送到kafka的topic(ods_base_db_m)中,然后flink从kafka消费数据,这个过程有维度数据,就放到hbase中,其他事实数据再发送给kafka作为dwd层。flink消费kafka数据可以做一些简单的ETL处理,比如过滤空值,长度限制。
2. 消费kafka数据及ETL操作
项目地址:gmall-realtime 的dwd包下创建类:BaseDbTask.java
具体步骤就看代码了
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.zhangbao.gmall.realtime.utils.MyKafkaUtil;
import org.apache.flink.runtime.state.filesystem.FsStateBackend;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.LocalStreamEnvironment;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
/**
* 从kafka读取业务数据
* @author: zhangbao
* @date: 2021/8/15 21:10
* @desc:
**/
public class BaseDbTask {
public static void main(String[] args) {
//1.获取flink环境
LocalStreamEnvironment env = StreamExecutionEnvironment.createLocalEnvironment();
//设置并行度
env.setParallelism(4);
//设置检查点
env.enableCheckpointing(5000, CheckpointingMode.EXACTLY_ONCE);
env.getCheckpointConfig().setCheckpointTimeout(60000);
env.setStateBackend(new FsStateBackend("hdfs://hadoop101:9000/gmall/flink/checkpoint/baseDbApp"));
//指定哪个用户读取hdfs文件
System.setProperty("HADOOP_USER_NAME","zhangbao");
//2.从kafka获取topic数据
String topic = "ods_base_db_m";
String group = "base_db_app_group";
FlinkKafkaConsumer
3. 动态分流
由于MaxWell是把全部数据统一写入一个Topic中, 这样显然不利于日后的数据处理。所以需要把各个表拆开处理。但是由于每个表有不同的特点,有些表是维度表,有些表是事实表,有的表既是事实表在某种情况下也是维度表。
在实时计算中一般把维度数据写入存储容器,一般是方便通过主键查询的数据库比如HBase,Redis,MySQL 等。一般把事实数据写入流中,进行进一步处理,最终形成宽表。但是作为 Flink 实时计算任务,如何得知哪些表是维度表,哪些是事实表呢?而这些表又应该采集哪些字段呢?
我们可以将上面的内容放到某一个地方,集中配置。这样的配置不适合写在配置文件中,因为业务端随着需求变化每增加一张表,就要修改配置重启计算程序。所以这里需要一种动态配置方案,把这种配置长期保存起来,一旦配置有变化,实时计算可以自动感知。
这种可以有两个方案实现
一种是用 Zookeeper 存储,通过 Watch 感知数据变化。 另一种是用 mysql 数据库存储,周期性的同步或使用flink-cdc实时同步。
这里选择第二种方案,周期性同步,flink-cdc方式可自行尝试,主要是 mysql 对于配置数据初始化和维护管理,用 sql 都比较方便,虽然周期性操作时效性差一点,但是配置变化并不频繁。
所以就有了如下图:
业务数据保存到Kafka 的主题中,维度数据保存到Hbase 的表中。
4. mysql配置
① 在 gmall-realtime 模块添加依赖
② 单独创建数据库gmall2021_realtime
create database gmall2021_realtime; CREATE TABLE `table_process` ( `source_table` varchar(200) NOT NULL COMMENT '来源表', `operate_type` varchar(200) NOT NULL COMMENT '操作类型 insert,update,delete', `sink_type` varchar(200) DEFAULT NULL COMMENT '输出类型 hbase kafka', `sink_table` varchar(200) DEFAULT NULL COMMENT '输出表(主题)', `sink_columns` varchar(2000) DEFAULT NULL COMMENT '输出字段', `sink_pk` varchar(200) DEFAULT NULL COMMENT '主键字段', `sink_extend` varchar(200) DEFAULT NULL COMMENT '建表扩展', PRIMARY KEY (`source_table`,`operate_type`) ) ENGINE=InnoDB DEFAULT CHARSET=utf8;
③ 创建实体类
package com.zhangbao.gmall.realtime.bean; import lombok.Data; /** * @author: zhangbao * @date: 2021/8/22 13:06 * @desc: **/ @Data public class TableProcess { //动态分流 Sink 常量 改为小写和脚本一致 public static final String SINK_TYPE_HBASE = "hbase"; public static final String SINK_TYPE_KAFKA = "kafka"; public static final String SINK_TYPE_CK = "clickhouse"; //来源表 private String sourceTable; //操作类型 insert,update,delete private String operateType; //输出类型 hbase kafka private String sinkType; //输出表(主题) private String sinkTable; //输出字段 private String sinkColumns; //主键字段 private String sinkPk; //建表扩展 private String sinkExtend; }
④ mysql工具类
package com.zhangbao.gmall.realtime.utils;
import com.google.common.base.CaseFormat;
import com.zhangbao.gmall.realtime.bean.TableProcess;
import org.apache.commons.beanutils.BeanUtils;
import org.apache.commons.lang.reflect.FieldUtils;
import java.sql.*;
import java.util.ArrayList;
import java.util.List;
/**
* @author: zhangbao
* @date: 2021/8/22 13:09
* @desc:
**/
public class MysqlUtil {
private static final String DRIVER_NAME = "com.mysql.jdbc.Driver";
private static final String URL = "jdbc:mysql://192.168.88.71:3306/gmall2021_realtime?characterEncoding=utf-8&useSSL=false&serverTimezone=GMT%2B8";
private static final String USER_NAME = "root";
private static final String USER_PWD = "123456";
public static void main(String[] args) {
String sql = "select * from table_process";
List
5. 程序分流
如图定义一个mapFunction函数
1.在open方法中初始化配置信息,并周期开启一个任务刷新配置 2.在任务中根据配置创建数据表 3.分流
主任务流程
package com.zhangbao.gmall.realtime.app.dwd;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.zhangbao.gmall.realtime.app.func.TableProcessFunction;
import com.zhangbao.gmall.realtime.bean.TableProcess;
import com.zhangbao.gmall.realtime.utils.MyKafkaUtil;
import org.apache.flink.runtime.state.filesystem.FsStateBackend;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.LocalStreamEnvironment;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.flink.util.OutputTag;
/**
* 从kafka读取业务数据
* @author: zhangbao
* @date: 2021/8/15 21:10
* @desc:
**/
public class BaseDbTask {
public static void main(String[] args) {
//1.获取flink环境
LocalStreamEnvironment env = StreamExecutionEnvironment.createLocalEnvironment();
//设置并行度
env.setParallelism(4);
//设置检查点
env.enableCheckpointing(5000, CheckpointingMode.EXACTLY_ONCE);
env.getCheckpointConfig().setCheckpointTimeout(60000);
env.setStateBackend(new FsStateBackend("hdfs://hadoop101:9000/gmall/flink/checkpoint/baseDbApp"));
//指定哪个用户读取hdfs文件
System.setProperty("HADOOP_USER_NAME","zhangbao");
//2.从kafka获取topic数据
String topic = "ods_base_db_m";
String group = "base_db_app_group";
FlinkKafkaConsumer
创建TableProcessFunction自定义任务
这里包括上面说的四个步骤
初始化并周期读取配置数据 执行每条数据 过滤字段 标记数据流向,根据配置写入对应去向,维度数据就写入hbase,事实数据就写入kafka
package com.zhangbao.gmall.realtime.app.func;
import com.alibaba.fastjson.JSONObject;
import com.zhangbao.gmall.realtime.bean.TableProcess;
import com.zhangbao.gmall.realtime.common.GmallConfig;
import com.zhangbao.gmall.realtime.utils.MysqlUtil;
import lombok.extern.log4j.Log4j2;
import org.apache.commons.lang3.StringUtils;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.util.Collector;
import org.apache.flink.util.OutputTag;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.SQLException;
import java.util.*;
/**
* @author: zhangbao
* @date: 2021/8/26 23:24
* @desc:
**/
@Log4j2(topic = "gmall-logger")
public class TableProcessFunction extends ProcessFunction
6. 重启策略
flink程序在运行时,有错误会抛出异常,程序就停止了,但当开始checkpoint检查点时,flink重启策略就是开启的,如果程序出现异常了,程序就会一直重启,并且重启次数是Integer.maxValue,这个过程也看不到错误信息,是很不友好的。
flink可以设置重启策略,所以在我们开启checkpoint检查点时,设置不需要重启就可以看到错误信息了:
env.setRestartStrategy(RestartStrategies.noRestart());
下面我们测试一下。
package com.zhangbao.gmall.realtime.app.dwd;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.zhangbao.gmall.realtime.app.func.TableProcessFunction;
import com.zhangbao.gmall.realtime.bean.TableProcess;
import com.zhangbao.gmall.realtime.utils.MyKafkaUtil;
import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.runtime.executiongraph.restart.RestartStrategy;
import org.apache.flink.runtime.state.filesystem.FsStateBackend;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.LocalStreamEnvironment;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.flink.util.OutputTag;
/**
* 从kafka读取业务数据
* @author: zhangbao
* @date: 2021/8/15 21:10
* @desc:
**/
public class Test {
public static void main(String[] args) {
//1.获取flink环境
LocalStreamEnvironment env = StreamExecutionEnvironment.createLocalEnvironment();
//设置并行度
env.setParallelism(4);
//设置检查点
env.enableCheckpointing(5000, CheckpointingMode.EXACTLY_ONCE);
env.getCheckpointConfig().setCheckpointTimeout(60000);
env.setStateBackend(new FsStateBackend("hdfs://hadoop101:9000/gmall/flink/checkpoint/baseDbApp"));
//指定哪个用户读取hdfs文件
System.setProperty("HADOOP_USER_NAME","zhangbao");
//flink重启策略,
// 如果开启上面的checkpoint,重启策略就是自动重启,程序有问题不会有报错,
// 如果没有开启checkpoint,就不会自动重启,所以这里设置不需要重启,就可以查看错误信息
env.setRestartStrategy(RestartStrategies.noRestart());
//2.从kafka获取topic数据
String topic = "ods_base_db_m";
String group = "test_group";
FlinkKafkaConsumer
在程序对数据进行转换过程中,我们加了 System.out.println(4/0); 这样一行代码,肯定会抛出异常的。
在设置不需要重启后,就可以看到错误信息了,当你把设置不需要重启一行代码注释掉,就会发现程序是一直在运行中的,并且没有任何错误信息。
在实际应用中,根据需要可以自行设置。
版权声明:本文内容由网络用户投稿,版权归原作者所有,本站不拥有其著作权,亦不承担相应法律责任。如果您发现本站中有涉嫌抄袭或描述失实的内容,请联系我们jiasou666@gmail.com 处理,核实后本网站将在24小时内删除侵权内容。
发表评论
暂时没有评论,来抢沙发吧~