Flink CDC + Hudi 0.10 + Hive (automatic partition sync + compaction)

User submission 362 2022-11-22

Versions: Flink 1.13.1 + Scala 2.11 + CDH 6.3.0 (Hadoop 3.0.0 + Hive 2.1.1) + Hudi 0.10

MySQL CDC source DDL:

CREATE TABLE `users` (
  `id` bigint(20) NOT NULL AUTO_INCREMENT,
  `name` varchar(20) DEFAULT NULL,
  `birthday` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP,
  `ts` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP,
  PRIMARY KEY (`id`)
) ENGINE=InnoDB AUTO_INCREMENT=64 DEFAULT CHARSET=utf8;

Write data into MySQL to generate binlog events:

insert into users (name) values ('hello2');
insert into users (name) values ('world2');
insert into users (name) values ('flink2');
insert into users (id,name) values (4,'spark');
insert into users (name) values ('hudi');
select * from users;
update users set name = 'luo flinkbj' where id = 60;
delete from users where id = 61;

Flink SQL:

MySQL CDC table DDL in Flink SQL:

Flink SQL> CREATE TABLE mysql_users (
    id BIGINT PRIMARY KEY NOT ENFORCED,
    name STRING,
    birthday TIMESTAMP(3),
    ts TIMESTAMP(3)
  ) WITH (
    'connector' = 'mysql-cdc',
    'hostname' = '127.0.0.1',
    'port' = '3306',
    'username' = 'root',
    'password' = 'root',
    'server-time-zone' = 'Asia/Shanghai',
    'debezium.snapshot.mode' = 'initial',
    'database-name' = 'luo',
    'table-name' = 'users'
  );

Set the checkpoint interval:

Flink SQL> set execution.checkpointing.interval=30sec;

Add the partition field by creating a view over the CDC table:

Flink SQL> create view my_v AS SELECT *, DATE_FORMAT(birthday, 'yyyyMMdd') as `partition` FROM mysql_users;

Online compaction test for a Hudi MOR (MERGE_ON_READ) table: 'compaction.async.enable' = 'true'

For a MOR table, Hive sync creates two tables by default:

luo_sync_hive03_ro
luo_sync_hive03_rt

Flink SQL DDL for the Hudi table that is synced to a partitioned Hive table:

CREATE TABLE luo_sync_hive03(
  id bigint,
  name string,
  birthday TIMESTAMP(3),
  ts TIMESTAMP(3),
  `partition` VARCHAR(20),
  primary key(id) not enforced -- a primary key (the record key) must be specified
)
PARTITIONED BY (`partition`)
with(
  'connector' = 'hudi',
  'path' = 'hdfs://nameservice1/tmp/luo/hudi/luo_sync_hive03',
  'hoodie.datasource.write.recordkey.field' = 'id',
  'write.precombine.field' = 'ts',
  'write.tasks' = '1',
  'compaction.tasks' = '1',
  'write.rate.limit' = '2000',
  'table.type' = 'MERGE_ON_READ',
  'compaction.async.enable' = 'true',
  'compaction.trigger.strategy' = 'num_commits',
  'compaction.delta_commits' = '5',
  'changelog.enable' = 'true',
  'read.streaming.enable' = 'true',
  'read.streaming.check-interval' = '4',
  'hive_sync.enable' = 'true',
  'hive_sync.metastore.uris' = 'thrift://hadoop:9083',
  'hive_sync.jdbc_url' = 'jdbc:hive2://hadoop:10000',
  'hive_sync.table' = 'luo_sync_hive03',
  'hive_sync.db' = 'luo',
  'hive_sync.username' = '',
  'hive_sync.password' = '',
  'hive_sync.support_timestamp' = 'true'
);

Write the data into Hudi from Flink SQL; the partitioned Hive table is created automatically and the data is synced automatically:

insert into luo_sync_hive03 select id,name,birthday,ts,`partition` from my_v;

By default, compaction is triggered after 5 checkpoints, which matches 'compaction.delta_commits' = '5' above.

Write to MySQL several more times:

insert into users (name) values ('hello25');
insert into users (name) values ('world26');
insert into users (name) values ('flink27');
insert into users (name) values ('flink32');
update users set name = 'luo flinkbj' where id = 60;
delete from users where id = 61;

Query the data from the Hive shell:

hive> show partitions luo_sync_hive03_rt;

Handling the exception raised by select count:

hive> select count(1) from luo_sync_hive03_ro;
Diagnostic Messages for this Task:
Error: java.io.IOException: Split class org.apache.hudi.hadoop.hive.HoodieCombineRealtimeHiveSplit not found
    at org.apache.hadoop.mapred.MapTask.getSplitDetails(MapTask.java:369)
    at org.apache.hadoop.mapred.MapTask.runOldMapper(MapTask.java:438)
    at org.apache.hadoop.mapred.MapTask.run(MapTask.java:349)
    at org.apache.hadoop.mapred.YarnChild$2.run(YarnChild.java:174)
    at java.security.AccessController.doPrivileged(Native Method)
    at javax.security.auth.Subject.doAs(Subject.java:422)
    at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1875)
    at org.apache.hadoop.mapred.YarnChild.main(YarnChild.java:168)
Caused by: java.lang.ClassNotFoundException: Class org.apache.hudi.hadoop.hive.HoodieCombineRealtimeHiveSplit not found
    at org.apache.hadoop.conf.Configuration.getClassByName(Configuration.java:2409)
    at org.apache.hadoop.mapred.MapTask.getSplitDetails(MapTask.java:367)
    ... 7 more

The map tasks cannot load the Hudi split class, so add the hudi-hadoop-mr bundle jar to the Hive session and set the Hudi input format:

hive> add jar hdfs://nameservice1/tmp/luo/hudi-hadoop-mr-bundle-0.10.0-SNAPSHOT.jar;
hive> set hive.input.format = org.apache.hudi.hadoop.hive.HoodieCombineHiveInputFormat;
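
Once the jar is added and the input format is set, the failing query can be retried in the same Hive session. The statements below are only a sketch of such a check, assuming the two session settings above took effect and that Hive sync created both tables. The partition value '20221122' is a made-up example, not a value from the original post.

```sql
-- Run in the same Hive session, after the `add jar` and
-- `set hive.input.format` statements shown above.

-- Retry the count that previously failed with ClassNotFoundException.
select count(1) from luo_sync_hive03_ro;

-- The _rt table merges base files with log files that have not been
-- compacted yet, so it can reflect newer data than _ro.
select count(1) from luo_sync_hive03_rt;

-- Inspect a single partition. '20221122' is a hypothetical value;
-- use one returned by `show partitions luo_sync_hive03_rt`.
select id, name, birthday, ts
from luo_sync_hive03_rt
where `partition` = '20221122'
limit 10;
```

If the two counts differ, that usually just means the most recent delta commits have not been compacted yet; the _ro result should catch up after the next compaction.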
