|NO.Z.00013|BigDataEnd|Hadoop & PB-scale Data Warehouse.V05|PB Data Warehouse.v05|Member Activity Analysis|Custom Interceptor Implementation & Testing|

Reader submission, 2022-11-19


1. Custom interceptor

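### --- Custom interceptor
~~~ The Flume agent configured earlier uses the agent's local time, which can put data into the wrong storage path.
~~~ Fixing this requires a custom interceptor.
~~~ The following agent is used to test the custom interceptor: netcat source => logger sink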
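A minimal illustration of the problem, assuming the warehouse cuts daily partitions in Asia/Shanghai time: an event written at 23:59:50 on 2020-07-21 but handled by the agent 20 seconds later would land in the 2020-07-22 directory if the path were built from the agent's local time. This is plain JDK code rather than Flume, and the class and method names are hypothetical.

```java
import java.time.Instant;
import java.time.LocalDate;
import java.time.ZoneId;

public class PartitionDateDemo {
    // Assumption: partitions are cut by calendar day in Asia/Shanghai (UTC+8)
    static final ZoneId ZONE = ZoneId.of("Asia/Shanghai");

    // Date taken from the moment the agent processes the event (local-time based path)
    static LocalDate byProcessingTime(Instant processedAt) {
        return processedAt.atZone(ZONE).toLocalDate();
    }

    // Date taken from the timestamp carried inside the event itself
    static LocalDate byEventTime(long eventMillis) {
        return Instant.ofEpochMilli(eventMillis).atZone(ZONE).toLocalDate();
    }

    public static void main(String[] args) {
        long eventMillis = 1595347190000L;                                  // 2020-07-21 23:59:50 +08:00
        Instant processedAt = Instant.ofEpochMilli(eventMillis + 20_000L); // 20 s later, past midnight
        System.out.println("event time      -> " + byEventTime(eventMillis));      // 2020-07-21
        System.out.println("processing time -> " + byProcessingTime(processedAt)); // 2020-07-22
    }
}
```

Deriving the date from the event's own timestamp is exactly what the interceptor makes possible by copying it into a header.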

### --- Flume agent configuration for testing the custom interceptor

```
[root@hadoop02 ~]# vim /data/yanqidw/conf/flumetest1.conf

# a1 is the agent name; the source, channel and sink are named r1, c1 and k1
a1.sources = r1
a1.channels = c1
a1.sinks = k1

# source
a1.sources.r1.type = netcat
a1.sources.r1.bind = hadoop02
a1.sources.r1.port = 9999
a1.sources.r1.interceptors = i1
a1.sources.r1.interceptors.i1.type = cn.yanqi.dw.flume.interceptor.CustomerInterceptor$Builder

# channel
a1.channels.c1.type = memory
a1.channels.c1.capacity = 10000
a1.channels.c1.transactionCapacity = 100

# sink
a1.sinks.k1.type = logger

# wire the source, channel and sink together
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
```
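The `type` value `cn.yanqi.dw.flume.interceptor.CustomerInterceptor$Builder` is the runtime (binary) name of a nested class, which is why it uses `$` rather than `.`. Flume resolves this string to a class and instantiates it reflectively; the sketch below only imitates that lookup with `Class.forName`, using a hypothetical stand-in `Builder` instead of Flume's `Interceptor.Builder`. Treat it as an illustration of the naming rule, not as Flume's own code.

```java
public class BuilderLookupDemo {
    // Hypothetical stand-in for CustomerInterceptor.Builder; a real one implements
    // org.apache.flume.interceptor.Interceptor.Builder
    public static class Builder {
        public String build() {
            return "interceptor built by " + getClass().getSimpleName();
        }
    }

    // Loads a class by its binary name and calls its public no-arg constructor
    static Object instantiate(String binaryName) {
        try {
            Class<?> clazz = Class.forName(binaryName);
            return clazz.getDeclaredConstructor().newInstance();
        } catch (ReflectiveOperationException e) {
            throw new IllegalArgumentException("cannot instantiate " + binaryName, e);
        }
    }

    public static void main(String[] args) {
        // Nested classes are named Outer$Inner at runtime
        String good = BuilderLookupDemo.class.getName() + "$Builder";
        System.out.println(((Builder) instantiate(good)).build());

        // The source-code spelling Outer.Inner is not a valid binary name
        String bad = BuilderLookupDemo.class.getName() + ".Builder";
        try {
            instantiate(bad);
        } catch (IllegalArgumentException e) {
            System.out.println("not found: " + bad);
        }
    }
}
```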

2. How the custom interceptor works

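### --- Principle of the custom interceptor:
~~~ The custom interceptor implements Flume's Interceptor interface
~~~ An Event consists of headers and a body (the received string)
~~~ Get the headers and the body
~~~ Extract "time":1596382570539 from the body and convert the timestamp to a "yyyy-MM-dd" string
~~~ Put the converted string into the headers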
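As a worked example of the conversion step, this sketch formats the `time` value quoted above, and the value from the sample log used later in this article, in two zones. The original interceptor uses `ZoneId.systemDefault()`, so its result depends on the default time zone of the Flume host; passing the zone explicitly (Asia/Shanghai here is an assumption) makes the partition date predictable. The class name is hypothetical.

```java
import java.time.Instant;
import java.time.ZoneId;
import java.time.format.DateTimeFormatter;

public class EpochMillisToDate {
    private static final DateTimeFormatter DAY = DateTimeFormatter.ofPattern("yyyy-MM-dd");

    // Converts an epoch-millisecond timestamp to a yyyy-MM-dd string in the given zone
    static String toDay(long epochMillis, ZoneId zone) {
        return DAY.format(Instant.ofEpochMilli(epochMillis).atZone(zone));
    }

    public static void main(String[] args) {
        ZoneId shanghai = ZoneId.of("Asia/Shanghai");
        ZoneId utc = ZoneId.of("UTC");
        // 2020-08-02 15:36 UTC, i.e. 23:36 in UTC+8: same day in both zones
        System.out.println(toDay(1596382570539L, shanghai)); // 2020-08-02
        System.out.println(toDay(1596382570539L, utc));      // 2020-08-02
        // Midnight in UTC+8 is still the previous day in UTC
        System.out.println(toDay(1595260800000L, shanghai)); // 2020-07-21
        System.out.println(toDay(1595260800000L, utc));      // 2020-07-20
    }
}
```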

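### --- Implementation steps of the custom interceptor:
~~~ Get the event headers
~~~ Get the event body
~~~ Parse the body to extract the JSON string
~~~ Parse the JSON string to get the timestamp
~~~ Convert the timestamp to a "yyyy-MM-dd" string
~~~ Put the converted string into the headers
~~~ Return the event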
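The seven steps can be sketched with the JDK alone. This is not the author's implementation: a `Map<String, String>` and a `byte[]` stand in for the `Event` headers and body, and a regular expression stands in for fastjson, so the sketch simply takes the first `"time":<digits>` pair in the line. The zone is again assumed to be Asia/Shanghai, and all names are hypothetical.

```java
import java.nio.charset.StandardCharsets;
import java.time.Instant;
import java.time.ZoneId;
import java.time.format.DateTimeFormatter;
import java.util.HashMap;
import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

public class LogtimeStepsSketch {
    // Assumption: regex instead of fastjson; matches the first "time":<digits> pair
    private static final Pattern TIME = Pattern.compile("\"time\"\\s*:\\s*(\\d+)");
    private static final DateTimeFormatter DAY = DateTimeFormatter.ofPattern("yyyy-MM-dd");
    private static final ZoneId ZONE = ZoneId.of("Asia/Shanghai"); // assumed zone

    // headers/body stand in for Event.getHeaders()/Event.getBody() (step 1 and step 2)
    static Map<String, String> addLogtime(Map<String, String> headers, byte[] body) {
        String line = new String(body, StandardCharsets.UTF_8);
        Matcher m = TIME.matcher(line);              // steps 3-4: locate the timestamp
        String logtime = "Unknown";                  // fallback, as in the original code
        if (m.find()) {
            try {
                long millis = Long.parseLong(m.group(1));
                logtime = DAY.format(Instant.ofEpochMilli(millis).atZone(ZONE)); // step 5
            } catch (NumberFormatException e) {
                // digits too long for a long: keep "Unknown"
            }
        }
        headers.put("logtime", logtime);             // step 6
        return headers;                              // step 7
    }

    public static void main(String[] args) {
        String line = "2020-07-21 16:26:23.187 [main] INFO com.yanqi.ecommerce.AppStart - "
                + "{\"app_active\":{\"name\":\"app_active\",\"time\":1595260800000}}";
        Map<String, String> headers = addLogtime(new HashMap<>(), line.getBytes(StandardCharsets.UTF_8));
        System.out.println(headers); // {logtime=2020-07-21}
    }
}
```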

3. Create a Maven project: cn.yanqi.dw

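### --- Add the dependencies (pom.xml)

```xml
<properties>
    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
</properties>

<dependencies>
    <dependency>
        <groupId>org.apache.flume</groupId>
        <artifactId>flume-ng-core</artifactId>
        <version>1.9.0</version>
        <scope>provided</scope>
    </dependency>
    <dependency>
        <groupId>com.alibaba</groupId>
        <artifactId>fastjson</artifactId>
        <version>1.1.23</version>
    </dependency>
    <dependency>
        <groupId>junit</groupId>
        <artifactId>junit</artifactId>
        <version>4.12</version>
        <scope>provided</scope>
    </dependency>
</dependencies>

<build>
    <plugins>
        <plugin>
            <artifactId>maven-compiler-plugin</artifactId>
            <version>2.3.2</version>
            <configuration>
                <source>1.8</source>
                <target>1.8</target>
            </configuration>
        </plugin>
        <plugin>
            <artifactId>maven-assembly-plugin</artifactId>
            <configuration>
                <descriptorRefs>
                    <descriptorRef>jar-with-dependencies</descriptorRef>
                </descriptorRefs>
            </configuration>
            <executions>
                <execution>
                    <id>make-assembly</id>
                    <phase>package</phase>
                    <goals>
                        <goal>single</goal>
                    </goals>
                </execution>
            </executions>
        </plugin>
    </plugins>
</build>
```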

4. Implementing the custom interceptor

### --- Prepare the test log file

```
[root@hadoop02 ~]# more /data/yanqidw/logs/data/start0802.log
2020-07-21 16:26:23.187 [main] INFO com.yanqi.ecommerce.AppStart - {"app_active":{"name":"app_active","json":{"entry":"1","action":"0","error_code":"0"},"time":1595260800000},"attr":{"area":"杭州","uid":"2F10092A3","app_v":"1.1.14","event_type":"common","device_id":"1FB872-9A1003","os_type":"4.0","channel":"EN","language":"chinese","brand":"xiaomi-4"}}
```
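The interceptor locates the JSON payload by splitting each line on whitespace and taking field index 6. This quick check shows how the sample format tokenizes (the class name is hypothetical). It works only because the JSON in these logs contains no spaces; a space inside a JSON value would split the payload across several fields.

```java
public class SplitLogLineDemo {
    // Splits a log line on runs of whitespace, as the interceptor does with split("\\s+")
    static String[] tokens(String line) {
        return line.split("\\s+");
    }

    public static void main(String[] args) {
        String line = "2020-07-21 16:26:23.187 [main] INFO com.yanqi.ecommerce.AppStart - "
                + "{\"app_active\":{\"time\":1595260800000}}";
        String[] parts = tokens(line);
        for (int i = 0; i < parts.length; i++) {
            System.out.println(i + ": " + parts[i]); // index 6 holds the JSON
        }
        // Without the leading date and time fields only five tokens remain, so index 6 does not exist
        String[] shorter = tokens("[main] INFO com.yanqi.ecommerce.AppStart - {\"time\":1}");
        System.out.println(shorter.length); // 5
    }
}
```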

### --- Interceptor source code

```java
package cn.yanqi.dw.flume.interceptor;

import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.event.SimpleEvent;
import org.apache.flume.interceptor.Interceptor;
import org.junit.Test;

import java.nio.charset.StandardCharsets;
import java.time.Instant;
import java.time.LocalDateTime;
import java.time.ZoneId;
import java.time.format.DateTimeFormatter;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class CustomerInterceptor implements Interceptor {

    @Override
    public void initialize() {
    }

    // Process a single event
    @Override
    public Event intercept(Event event) {
        // Get the event body
        String eventBody = new String(event.getBody(), StandardCharsets.UTF_8);
        // Get the event headers
        Map<String, String> headersMap = event.getHeaders();
        // Split the body on whitespace; the JSON string is field index 6
        String[] bodyArr = eventBody.split("\\s+");
        try {
            String jsonStr = bodyArr[6];
            // Parse the JSON string and read the timestamp
            JSONObject jsonObject = JSON.parseObject(jsonStr);
            String timestampStr = jsonObject.getJSONObject("app_active").getString("time");
            // Convert the timestamp string to a long, then to a "yyyy-MM-dd" string
            long timestamp = Long.parseLong(timestampStr);
            DateTimeFormatter formatter = DateTimeFormatter.ofPattern("yyyy-MM-dd");
            Instant instant = Instant.ofEpochMilli(timestamp);
            // The resulting date depends on the JVM's default time zone
            LocalDateTime localDateTime = LocalDateTime.ofInstant(instant, ZoneId.systemDefault());
            String date = formatter.format(localDateTime);
            // Put the converted string into the headers
            headersMap.put("logtime", date);
            event.setHeaders(headersMap);
        } catch (Exception e) {
            // Too few fields, malformed JSON or a missing "time" all fall back to "Unknown"
            headersMap.put("logtime", "Unknown");
            event.setHeaders(headersMap);
        }
        return event;
    }

    // Process a batch of events
    @Override
    public List<Event> intercept(List<Event> events) {
        List<Event> lstEvent = new ArrayList<>();
        for (Event event : events) {
            Event outEvent = intercept(event);
            if (outEvent != null) {
                lstEvent.add(outEvent);
            }
        }
        return lstEvent;
    }

    @Override
    public void close() {
    }

    // Referenced in the agent config as CustomerInterceptor$Builder
    public static class Builder implements Interceptor.Builder {
        @Override
        public Interceptor build() {
            return new CustomerInterceptor();
        }

        @Override
        public void configure(Context context) {
        }
    }

    @Test
    public void testJunit() {
        String str = "2020-07-21 16:26:23.187 [main] INFO com.yanqi.ecommerce.AppStart - "
                + "{\"app_active\":{\"name\":\"app_active\",\"json\":{\"entry\":\"1\",\"action\":\"0\",\"error_code\":\"0\"},\"time\":1595260800000},"
                + "\"attr\":{\"area\":\"杭州\",\"uid\":\"2F10092A3\",\"app_v\":\"1.1.14\",\"event_type\":\"common\",\"device_id\":\"1FB872-9A1003\","
                + "\"os_type\":\"4.0\",\"channel\":\"EN\",\"language\":\"chinese\",\"brand\":\"xiaomi-4\"}}\n";
        Map<String, String> map = new HashMap<>();
        // Build a new event
        Event event = new SimpleEvent();
        event.setHeaders(map);
        event.setBody(str.getBytes(StandardCharsets.UTF_8));
        // Let the interceptor process the event
        CustomerInterceptor customerInterceptor = new CustomerInterceptor();
        Event outEvent = customerInterceptor.intercept(event);
        // Print the resulting headers
        Map<String, String> headersMap = outEvent.getHeaders();
        System.out.println(JSON.toJSONString(headersMap));
    }
}
```

### --- Test output
~~~ # With the full log line, the date is taken from the timestamp and the test prints 2020-07-21

```
{"logtime":"2020-07-21"}
```

~~~ # If the leading "2020-07-21 16:26:23.187" is removed from the test string, the line has fewer than seven whitespace-separated fields, bodyArr[6] throws, and the result is Unknown because the line cannot be parsed

```
{"logtime":"Unknown"}
```
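The `Unknown` result comes from the fixed field index. If you would rather accept lines whose prefix varies, one option (an assumption, not part of the original design) is to take everything from the first `{` instead of `split("\\s+")[6]`; this in turn assumes the logger prefix never contains a `{`. The class name is hypothetical.

```java
import java.util.Optional;

public class JsonPayloadLocator {
    // Alternative to split("\\s+")[6]: take the text from the first '{' to the end,
    // so spaces inside the JSON or a missing date/time prefix do not matter
    static Optional<String> jsonPart(String line) {
        int start = line.indexOf('{');
        return start < 0 ? Optional.empty() : Optional.of(line.substring(start).trim());
    }

    public static void main(String[] args) {
        String full = "2020-07-21 16:26:23.187 [main] INFO com.yanqi.ecommerce.AppStart - {\"time\":1595260800000}";
        String noPrefix = "[main] INFO com.yanqi.ecommerce.AppStart - {\"time\":1595260800000}";
        System.out.println(jsonPart(full).orElse("Unknown"));
        System.out.println(jsonPart(noPrefix).orElse("Unknown"));
        System.out.println(jsonPart("hello world!!!").orElse("Unknown")); // Unknown
    }
}
```

The extracted string would then go to the same JSON parsing and date conversion as before.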

5. Package the custom interceptor and upload it to flume/lib

### --- Package the program and place it in /opt/yanqi/servers/flume-1.9.0/lib/
~~~ In the Maven tool window, run clean and then package (equivalent to `mvn clean package`)

### --- Upload the packaged jar to the server

```
[root@hadoop02 ~]# ls /data/yanqidw/jars/
cn.yanqi.dw-1.0-SNAPSHOT-jar-with-dependencies.jar
```

~~~ # Create a symbolic link in Flume's lib directory

```
[root@hadoop02 ~]# ln -s /data/yanqidw/jars/cn.yanqi.dw-1.0-SNAPSHOT-jar-with-dependencies.jar \
/opt/yanqi/servers/flume-1.9.0/lib/cn.yanqi.dw-1.0-SNAPSHOT-jar-with-dependencies.jar
```

6. Start the agent

### --- Start the agent

```
[root@hadoop02 ~]# flume-ng agent --conf /opt/yanqi/servers/flume-1.9.0/conf/ \
--conf-file /data/yanqidw/conf/flumetest1.conf -name a1 \
-Dflume.root.logger=INFO,console
```

~~~ logtime values received by the logger sink

```
2021-09-28 19:32:45,595 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.flume.sink.LoggerSink.process(LoggerSink.java:95)] Event: { headers:{logtime=2020-07-21}   # date extracted by the interceptor
2021-09-28 19:33:47,609 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.flume.sink.LoggerSink.process(LoggerSink.java:95)] Event: { headers:{logtime=Unknown}   # input line carried no timestamp
```

### --- Verify the custom interceptor

```
[root@hadoop02 ~]# telnet hadoop02 9999
Trying 192.168.1.122...
Connected to hadoop02.
Escape character is '^]'.
```

~~~ Send the log lines

```
2020-07-21 16:26:23.187 [main] INFO com.yanqi.ecommerce.AppStart - {"app_active":{"name":"app_active","json":{"entry":"1","action":"0","error_code":"0"},"time":1595260800000},"attr":{"area":"杭州","uid":"2F10092A3","app_v":"1.1.14","event_type":"common","device_id":"1FB872-9A1003","os_type":"4.0","channel":"EN","language":"chinese","brand":"xiaomi-4"}}
OK
hello world!!!
OK
```
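Instead of typing lines into telnet, a small socket client can send test lines to the netcat source. This sketch assumes the host and port from `flumetest1.conf` (`hadoop02:9999`) and that the source answers each event with one `OK` line, as the telnet session above shows; the class and method names are hypothetical.

```java
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.OutputStreamWriter;
import java.io.Writer;
import java.net.Socket;
import java.nio.charset.StandardCharsets;

public class NetcatSender {
    // Sends one line to a netcat-style source and returns the single-line reply
    static String sendLine(String host, int port, String line) throws IOException {
        try (Socket socket = new Socket(host, port);
             Writer out = new OutputStreamWriter(socket.getOutputStream(), StandardCharsets.UTF_8);
             BufferedReader in = new BufferedReader(
                     new InputStreamReader(socket.getInputStream(), StandardCharsets.UTF_8))) {
            // Terminate with '\n' explicitly so the result does not depend on the platform line separator
            out.write(line + "\n");
            out.flush();
            // Read the acknowledgement line (e.g. "OK")
            return in.readLine();
        }
    }

    public static void main(String[] args) throws IOException {
        // Host and port from flumetest1.conf (a1.sources.r1.bind / a1.sources.r1.port)
        System.out.println(sendLine("hadoop02", 9999, "hello world!!!"));
    }
}
```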

