|NO.Z.00015|——————————|BigDataEnd|——|Hadoop&PB-Scale Data Warehouse.V07|——|PB Data Warehouse.v07|Member Activity Analysis|Custom Interceptor Implementation & Testing|

User submission, 2022-11-19


1. Collecting Startup Logs and Event Logs

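### --- Collecting startup logs and event logs
~~~ This system collects two kinds of logs, startup logs and event logs, and each kind is written to its own directory.
~~~ To pick up all logs in one pass, the agent has to monitor multiple directories.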

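### --- Overall approach
~~~ The taildir source monitors multiple directories.
~~~ The custom interceptor is modified so that data from different sources carries a different tag (header).
~~~ The HDFS sink decides where to write each file based on that tag.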
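The routing idea can be modeled in a few lines of plain Java. The sketch assumes only what this article's configuration relies on: the HDFS sink replaces each `%{name}` in `hdfs.path` with the value of the event header `name`. `HeaderPathRouter` and `resolve` are hypothetical names. The sketch also ignores the sink's other escape sequences, such as the date escapes.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical helper that mimics how %{header} placeholders in hdfs.path are filled in.
final class HeaderPathRouter {
    // Matches %{name} and captures the header name.
    private static final Pattern HEADER_ESCAPE = Pattern.compile("%\\{([^}]+)\\}");

    static String resolve(String template, Map<String, String> headers) {
        Matcher m = HEADER_ESCAPE.matcher(template);
        StringBuffer out = new StringBuffer(); // StringBuffer keeps this Java 8 compatible
        while (m.find()) {
            // Assumption for this sketch: a missing header becomes an empty string.
            String value = headers.getOrDefault(m.group(1), "");
            m.appendReplacement(out, Matcher.quoteReplacement(value));
        }
        m.appendTail(out);
        return out.toString();
    }

    public static void main(String[] args) {
        Map<String, String> headers = new HashMap<>();
        headers.put("logtype", "start");      // added by the taildir filegroup header
        headers.put("logtime", "2020-08-02"); // added by the custom interceptor
        System.out.println(resolve("/user/data/logs/%{logtype}/dt=%{logtime}/", headers));
        // prints /user/data/logs/start/dt=2020-08-02/
    }
}
```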

2. Agent Configuration

### --- Agent configuration
[root@hadoop02 ~]# vim /data/yanqidw/conf/flume-log2hdfs3.conf
a1.sources = r1
a1.sinks = k1
a1.channels = c1
# taildir source
a1.sources.r1.type = TAILDIR
a1.sources.r1.positionFile = /data/yanqidw/conf/startlog_position.json
a1.sources.r1.filegroups = f1 f2
a1.sources.r1.filegroups.f1 = /data/yanqidw/logs/start/.*log
a1.sources.r1.headers.f1.logtype = start
a1.sources.r1.filegroups.f2 = /data/yanqidw/logs/event/.*log
a1.sources.r1.headers.f2.logtype = event
# custom interceptor
a1.sources.r1.interceptors = i1
a1.sources.r1.interceptors.i1.type = cn.yanqi.dw.flume.interceptor.LogTypeInterceptor$Builder
# memory channel
a1.channels.c1.type = memory
a1.channels.c1.capacity = 100000
a1.channels.c1.transactionCapacity = 2000
# hdfs sink
a1.sinks.k1.type = hdfs
a1.sinks.k1.hdfs.path = /user/data/logs/%{logtype}/dt=%{logtime}/
a1.sinks.k1.hdfs.filePrefix = startlog.
a1.sinks.k1.hdfs.fileType = DataStream
# File roll policy (roll by file size: 32 MB)
a1.sinks.k1.hdfs.rollSize = 33554432
a1.sinks.k1.hdfs.rollCount = 0
a1.sinks.k1.hdfs.rollInterval = 0
a1.sinks.k1.hdfs.idleTimeout = 0
a1.sinks.k1.hdfs.minBlockReplicas = 1
# Number of events flushed to HDFS per batch
a1.sinks.k1.hdfs.batchSize = 1000
# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
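With `rollCount = 0` and `rollInterval = 0`, count-based and time-based rolling are disabled, so files roll only on size. `33554432` is 32 × 1024 × 1024 bytes. Setting `idleTimeout = 0` likewise disables closing idle files. The sketch below models the roll decision under the Flume convention that 0 disables a trigger. `RollPolicy` is a hypothetical name. The real sink evaluates the triggers at different points (time-based rolling is driven by a timer), whereas this model folds them into one check.

```java
// Hypothetical model of the three HDFS sink roll triggers configured above.
// Convention assumed from the Flume docs: a value of 0 disables that trigger.
final class RollPolicy {
    final long rollSize;     // bytes written to the current file
    final long rollCount;    // events written to the current file
    final long rollInterval; // seconds since the current file was opened

    RollPolicy(long rollSize, long rollCount, long rollInterval) {
        this.rollSize = rollSize;
        this.rollCount = rollCount;
        this.rollInterval = rollInterval;
    }

    // True if any enabled trigger has been reached.
    boolean shouldRoll(long bytesWritten, long eventsWritten, long secondsOpen) {
        boolean bySize = rollSize > 0 && bytesWritten >= rollSize;
        boolean byCount = rollCount > 0 && eventsWritten >= rollCount;
        boolean byTime = rollInterval > 0 && secondsOpen >= rollInterval;
        return bySize || byCount || byTime;
    }

    public static void main(String[] args) {
        // rollSize = 32 MB, rollCount = 0, rollInterval = 0, as in flume-log2hdfs3.conf
        RollPolicy policy = new RollPolicy(32L * 1024 * 1024, 0, 0);
        System.out.println(policy.rollSize);                                     // 33554432
        System.out.println(policy.shouldRoll(10_000_000L, 1_000_000L, 86_400L)); // false
        System.out.println(policy.shouldRoll(33_554_432L, 1L, 1L));              // true
    }
}
```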

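~~~ # filegroups
~~~ Lists the filegroups. There can be several, separated by spaces, so one taildir source can monitor files in multiple directories at once.
~~~ # headers.<filegroupName>.<headerKey>
~~~ Adds the header key to every event read from that filegroup. Different filegroups can be given different values.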
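Here is a simplified, hypothetical model of how filegroups attach headers. Each filegroup is a fixed directory plus a regular expression, which per the Flume documentation applies to the file name only. Every event read from a matching file gets that group's `logtype` value. `FileGroupModel` and its methods are invented for illustration. The real TAILDIR source also records inodes and read offsets in the position file, which this sketch leaves out.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.regex.Pattern;

// Hypothetical model: a filegroup = fixed directory + regex applied to the file name only.
final class FileGroupModel {
    private static final class Group {
        final String dir;
        final Pattern fileNameRegex;
        final String logtype;

        Group(String pathPattern, String logtype) {
            int slash = pathPattern.lastIndexOf('/'); // assumes an absolute path pattern
            this.dir = pathPattern.substring(0, slash);
            this.fileNameRegex = Pattern.compile(pathPattern.substring(slash + 1));
            this.logtype = logtype;
        }
    }

    private final List<Group> groups = new ArrayList<>();

    FileGroupModel add(String pathPattern, String logtype) {
        groups.add(new Group(pathPattern, logtype));
        return this;
    }

    // Returns the logtype header a file would receive, or null if no filegroup covers it.
    String logtypeFor(String filePath) {
        int slash = filePath.lastIndexOf('/');
        if (slash < 0) {
            return null;
        }
        String dir = filePath.substring(0, slash);
        String name = filePath.substring(slash + 1);
        for (Group g : groups) {
            if (g.dir.equals(dir) && g.fileNameRegex.matcher(name).matches()) {
                return g.logtype;
            }
        }
        return null;
    }

    public static void main(String[] args) {
        FileGroupModel model = new FileGroupModel()
                .add("/data/yanqidw/logs/start/.*log", "start")  // f1
                .add("/data/yanqidw/logs/event/.*log", "event"); // f2
        System.out.println(model.logtypeFor("/data/yanqidw/logs/start/start1.log"));   // start
        System.out.println(model.logtypeFor("/data/yanqidw/logs/event/event2.log"));   // event
        System.out.println(model.logtypeFor("/data/yanqidw/logs/data/start0721.log")); // null
    }
}
```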

3. Custom Interceptor

### --- Custom interceptor
package cn.yanqi.dw.flume.interceptor;

import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONArray;
import com.alibaba.fastjson.JSONObject;
import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.event.SimpleEvent;
import org.apache.flume.interceptor.Interceptor;
import org.junit.Test;

import java.nio.charset.StandardCharsets;
import java.time.Instant;
import java.time.LocalDateTime;
import java.time.ZoneId;
import java.time.format.DateTimeFormatter;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class LogTypeInterceptor implements Interceptor {
    @Override
    public void initialize() {
    }

    @Override
    // Process events one at a time
    public Event intercept(Event event) {
        // Get the event body
        String eventBody = new String(event.getBody(), StandardCharsets.UTF_8);
        // Get the event headers
        Map<String, String> headersMap = event.getHeaders();
        // Split the body on whitespace; the JSON string is the 7th token
        String[] bodyArr = eventBody.split("\\s+");
        try {
            String jsonStr = bodyArr[6];
            // Parse the JSON string to get the timestamp
            String timestampStr = "";
            JSONObject jsonObject = JSON.parseObject(jsonStr);
            if (headersMap.getOrDefault("logtype", "").equals("start")) {
                // Startup log: take the app_active timestamp
                timestampStr = jsonObject.getJSONObject("app_active").getString("time");
            } else if (headersMap.getOrDefault("logtype", "").equals("event")) {
                // Event log: take the timestamp of the first record
                JSONArray jsonArray = jsonObject.getJSONArray("yanqi_event");
                if (jsonArray.size() > 0) {
                    timestampStr = jsonArray.getJSONObject(0).getString("time");
                }
            }
            // Convert the timestamp string to a long, then format it as "yyyy-MM-dd"
            long timestamp = Long.parseLong(timestampStr);
            DateTimeFormatter formatter = DateTimeFormatter.ofPattern("yyyy-MM-dd");
            Instant instant = Instant.ofEpochMilli(timestamp);
            LocalDateTime localDateTime = LocalDateTime.ofInstant(instant, ZoneId.systemDefault());
            String date = formatter.format(localDateTime);
            // Put the formatted date into the header
            headersMap.put("logtime", date);
            event.setHeaders(headersMap);
        } catch (Exception e) {
            // Any parsing failure sends the event to dt=Unknown
            headersMap.put("logtime", "Unknown");
            event.setHeaders(headersMap);
        }
        return event;
    }

    @Override
    public List<Event> intercept(List<Event> events) {
        List<Event> lstEvent = new ArrayList<>();
        for (Event event : events) {
            Event outEvent = intercept(event);
            if (outEvent != null) {
                lstEvent.add(outEvent);
            }
        }
        return lstEvent;
    }

    @Override
    public void close() {
    }

    public static class Builder implements Interceptor.Builder {
        @Override
        public Interceptor build() {
            return new LogTypeInterceptor();
        }

        @Override
        public void configure(Context context) {
        }
    }

    @Test
    public void startJunit() {
        String str = "2020-08-02 18:19:32.959 [main] INFO com.yanqi.ecommerce.AppStart - {\"app_active\":{\"name\":\"app_active\",\"json\":{\"entry\":\"1\",\"action\":\"0\",\"error_code\":\"0\"},\"time\":1596342840284},\"attr\":{\"area\":\"杭州\",\"uid\":\"2F10092A2\",\"app_v\":\"1.1.15\",\"event_type\":\"common\",\"device_id\":\"1FB872-9A1002\",\"os_type\":\"2.8\",\"channel\":\"TB\",\"language\":\"chinese\",\"brand\":\"iphone-8\"}}";
        Map<String, String> map = new HashMap<>();
        // Build a new event
        Event event = new SimpleEvent();
        map.put("logtype", "start");
        event.setHeaders(map);
        event.setBody(str.getBytes(StandardCharsets.UTF_8));
        // Let the interceptor process the event
        LogTypeInterceptor customerInterceptor = new LogTypeInterceptor();
        Event outEvent = customerInterceptor.intercept(event);
        // Print the resulting headers
        Map<String, String> headersMap = outEvent.getHeaders();
        System.out.println(JSON.toJSONString(headersMap));
    }

    @Test
    public void eventJunit() {
        String str = "2020-08-02 18:20:11.877 [main] INFO com.yanqi.ecommerce.AppEvent - {\"yanqi_event\":[{\"name\":\"goods_detail_loading\",\"json\":{\"entry\":\"1\",\"goodsid\":\"0\",\"loading_time\":\"93\",\"action\":\"3\",\"staytime\":\"56\",\"showtype\":\"2\"},\"time\":1596343881690},{\"name\":\"loading\",\"json\":{\"loading_time\":\"15\",\"action\":\"3\",\"loading_type\":\"3\",\"type\":\"1\"},\"time\":1596356988428},{\"name\":\"notification\",\"json\":{\"action\":\"1\",\"type\":\"2\"},\"time\":1596374167278},{\"name\":\"favorites\",\"json\":{\"course_id\":1,\"id\":0,\"userid\":0},\"time\":1596350933962}],\"attr\":{\"area\":\"杭州\",\"uid\":\"2F10092A4\",\"app_v\":\"1.1.14\",\"event_type\":\"common\",\"device_id\":\"1FB872-9A1004\",\"os_type\":\"0.5.0\",\"channel\":\"QL\",\"language\":\"chinese\",\"brand\":\"xiaomi-0\"}}";
        Map<String, String> map = new HashMap<>();
        // Build a new event
        Event event = new SimpleEvent();
        map.put("logtype", "event");
        event.setHeaders(map);
        event.setBody(str.getBytes(StandardCharsets.UTF_8));
        // Let the interceptor process the event
        LogTypeInterceptor customerInterceptor = new LogTypeInterceptor();
        Event outEvent = customerInterceptor.intercept(event);
        // Print the resulting headers
        Map<String, String> headersMap = outEvent.getHeaders();
        System.out.println(JSON.toJSONString(headersMap));
    }
}
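The interceptor formats the date with `ZoneId.systemDefault()`. As a result, the same event can land in a different `dt=` directory on a host configured with another time zone. The minimal sketch below performs the timestamp-to-date step with the zone passed explicitly. `Asia/Shanghai` is an assumption about the cluster's zone, and `LogDate` is a hypothetical name. Like the interceptor, it falls back to `Unknown` when the timestamp cannot be parsed.

```java
import java.time.Instant;
import java.time.ZoneId;
import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter;

// Hypothetical helper: epoch-millisecond string -> "yyyy-MM-dd" in an explicit zone.
final class LogDate {
    private static final DateTimeFormatter DAY = DateTimeFormatter.ofPattern("yyyy-MM-dd");

    static String toDay(String epochMillis, ZoneId zone) {
        try {
            long millis = Long.parseLong(epochMillis);
            return DAY.format(Instant.ofEpochMilli(millis).atZone(zone));
        } catch (NumberFormatException e) {
            // Empty or non-numeric timestamps go to the dt=Unknown partition.
            return "Unknown";
        }
    }

    public static void main(String[] args) {
        ZoneId shanghai = ZoneId.of("Asia/Shanghai"); // assumption: cluster time zone
        System.out.println(toDay("1596342840284", shanghai));        // 2020-08-02
        System.out.println(toDay("1596297600000", ZoneOffset.UTC));  // 2020-08-01
        System.out.println(toDay("1596297600000", shanghai));        // 2020-08-02
        System.out.println(toDay("", shanghai));                     // Unknown
    }
}
```

The second and third lines show the same instant falling on different days depending on the zone. That is the reason for pinning the zone rather than relying on the host default.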

4. Compile and Test

### --- Prepare the log data used for testing
[root@hadoop02 ~]# more /data/yanqidw/logs/data/start0802.log
~~~ startJunit data
"2020-08-02 18:19:32.959 [main] INFO com.yanqi.ecommerce.AppStart - {\"app_active\":{\"name\":\"app_active\",\"json\":{\"entry\":\"1\",\"action\":\"0\",\"error_code\":\"0\"},\"time\":1596342840284},\"attr\":{\"area\":\"杭州\",\"uid\":\"2F10092A2\",\"app_v\":\"1.1.15\",\"event_type\":\"common\",\"device_id\":\"1FB872-9A1002\",\"os_type\":\"2.8\",\"channel\":\"TB\",\"language\":\"chinese\",\"brand\":\"iphone-8\"}}";
~~~ eventJunit data
"2020-08-02 18:20:11.877 [main] INFO com.yanqi.ecommerce.AppEvent - {\"yanqi_event\":[{\"name\":\"goods_detail_loading\",\"json\":{\"entry\":\"1\",\"goodsid\":\"0\",\"loading_time\":\"93\",\"action\":\"3\",\"staytime\":\"56\",\"showtype\":\"2\"},\"time\":1596343881690},{\"name\":\"loading\",\"json\":{\"loading_time\":\"15\",\"action\":\"3\",\"loading_type\":\"3\",\"type\":\"1\"},\"time\":1596356988428},{\"name\":\"notification\",\"json\":{\"action\":\"1\",\"type\":\"2\"},\"time\":1596374167278},{\"name\":\"favorites\",\"json\":{\"course_id\":1,\"id\":0,\"userid\":0},\"time\":1596350933962}],\"attr\":{\"area\":\"杭州\",\"uid\":\"2F10092A4\",\"app_v\":\"1.1.14\",\"event_type\":\"common\",\"device_id\":\"1FB872-9A1004\",\"os_type\":\"0.5.0\",\"channel\":\"QL\",\"language\":\"chinese\",\"brand\":\"xiaomi-0\"}}";
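The interceptor treats token 6 (the seventh token) of `split("\\s+")` as the JSON payload. That only works when the prefix has exactly the six tokens shown above and the JSON itself contains no whitespace. If, for example, the space between `INFO` and the logger name is missing, the split yields only six tokens and the event is routed to `dt=Unknown`. The sketch compares that behavior with a hypothetical alternative that takes everything from the first `{`. Only the `split` call mirrors the author's code, and `PayloadExtractor` is an invented name. The alternative would in turn break if the prefix ever contained a `{`.

```java
// Hypothetical comparison of two ways to pull the JSON payload out of a log line.
final class PayloadExtractor {
    // What the interceptor does: the 7th whitespace-separated token (null if absent).
    static String byTokenIndex(String line) {
        String[] parts = line.split("\\s+");
        return parts.length > 6 ? parts[6] : null;
    }

    // Hypothetical alternative: everything from the first '{' to the end of the line.
    static String fromFirstBrace(String line) {
        int start = line.indexOf('{');
        return start >= 0 ? line.substring(start) : null;
    }

    public static void main(String[] args) {
        String ok = "2020-08-02 18:19:32.959 [main] INFO com.yanqi.ecommerce.AppStart - {\"k\":1}";
        String glued = "2020-08-02 18:20:11.877 [main] INFOcom.yanqi.ecommerce.AppEvent - {\"k\":1}";
        System.out.println(byTokenIndex(ok));      // {"k":1}
        System.out.println(byTokenIndex(glued));   // null: only 6 tokens
        System.out.println(fromFirstBrace(glued)); // {"k":1}
    }
}
```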

### --- Compile and test
~~~ # Run the startJunit test from IntelliJ IDEA. The IDE-generated command line for JDK 1.8.0_231 is abbreviated here; its classpath includes flume-ng-core 1.9.0 and its dependencies.
D:\JAVA\jdk1.8.0_231\bin\java.exe -ea ... -classpath "..." com.intellij.rt.junit.JUnitStarter -ideVersion5 -junit4 cn.yanqi.dw.flume.interceptor.LogTypeInterceptor,startJunit
{"logtime":"2020-08-02","logtype":"start"}
~~~ # Run the eventJunit test
D:\JAVA\jdk1.8.0_231\bin\java.exe -ea ... -classpath "..." com.intellij.rt.junit.JUnitStarter -ideVersion5 -junit4 cn.yanqi.dw.flume.interceptor.LogTypeInterceptor,eventJunit
{"logtime":"2020-08-02","logtype":"event"}

5. Package and Upload to the Server

### --- Once coding is done, package the project, upload the jar to the server and place it under $FLUME_HOME/lib
[root@hadoop02 ~]# ll /data/yanqidw/jars/
-rw-r--r-- 1 root root 358821 Sep 28 20:47 cn.yanqi.dw-1.0-SNAPSHOT-jar-with-dependencies.jar
[root@hadoop02 ~]# cp /data/yanqidw/jars/cn.yanqi.dw-1.0-SNAPSHOT-jar-with-dependencies.jar \
/opt/yanqi/servers/flume-1.9.0/lib/
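Flume loads the class named in `interceptors.i1.type`. Before copying the jar into `$FLUME_HOME/lib`, it is therefore worth confirming that the jar actually contains the interceptor's nested `Builder` class. The sketch below does this with the JDK's `java.util.jar.JarFile`. `JarCheck` is a hypothetical name, and listing the jar with `jar tf` gives the same information.

```java
import java.io.IOException;
import java.util.jar.JarFile;

// Hypothetical check: does the jar contain a given class (nested classes use '$')?
final class JarCheck {
    static boolean containsClass(String jarPath, String className) throws IOException {
        // cn.yanqi.dw.flume.interceptor.LogTypeInterceptor$Builder
        //   -> cn/yanqi/dw/flume/interceptor/LogTypeInterceptor$Builder.class
        String entryName = className.replace('.', '/') + ".class";
        try (JarFile jar = new JarFile(jarPath)) {
            return jar.getJarEntry(entryName) != null;
        }
    }

    public static void main(String[] args) throws IOException {
        String jar = args.length > 0 ? args[0]
                : "/data/yanqidw/jars/cn.yanqi.dw-1.0-SNAPSHOT-jar-with-dependencies.jar";
        String builder = "cn.yanqi.dw.flume.interceptor.LogTypeInterceptor$Builder";
        System.out.println(builder + " present: " + containsClass(jar, builder));
    }
}
```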

6. Verification

### --- Clean up the environment
~~~ # Clean up the local environment
[root@hadoop02 ~]# rm -f /data/yanqidw/conf/startlog_position.json
[root@hadoop02 ~]# rm -f /data/yanqidw/logs/start/*.log
[root@hadoop02 ~]# rm -f /data/yanqidw/logs/event/*.log

~~~ # Clean up HDFS
[root@hadoop02 ~]# hdfs dfs -rm -f -r /user/data/logs/event/*
[root@hadoop02 ~]# hdfs dfs -rm -f -r /user/data/logs/start/*

### --- Start the agent, copy logs, check the HDFS files
~~~ # Start the agent
[root@hadoop02 ~]# flume-ng agent --conf /opt/yanqi/servers/flume-1.9.0/conf/ \
--conf-file /data/yanqidw/conf/flume-log2hdfs3.conf \
-name a1 -Dflume.root.logger=INFO,console

~~~ # Copy logs
~~~ Copy the event log data
[root@hadoop02 ~]# cp /data/yanqidw/logs/data/event0721.log /data/yanqidw/logs/event/event1.log
[root@hadoop02 ~]# cp /data/yanqidw/logs/data/event0721.log /data/yanqidw/logs/event/event2.log
~~~ Copy the start log data
[root@hadoop02 event]# cp /data/yanqidw/logs/data/start0721.log /data/yanqidw/logs/start/start1.log
[root@hadoop02 event]# cp /data/yanqidw/logs/data/start0721.log /data/yanqidw/logs/start/start2.log

~~~ # Check the files on HDFS
[root@hadoop02 ~]# hdfs dfs -ls /user/data/logs/event
drwxr-xr-x   - root supergroup          0 2021-09-28 21:11 /user/data/logs/event/dt=2020-07-21
drwxr-xr-x   - root supergroup          0 2021-09-28 21:11 /user/data/logs/event/dt=Unknown
[root@hadoop02 ~]# hdfs dfs -ls /user/data/logs/start
Found 2 items
drwxr-xr-x   - root supergroup          0 2021-09-28 21:22 /user/data/logs/start/dt=2020-07-21
drwxr-xr-x   - root supergroup          0 2021-09-28 21:22 /user/data/logs/start/dt=Unknown
~~~ Events whose timestamp could not be extracted go through the interceptor's catch branch and are written to dt=Unknown.

### --- Start the agent in the background
~~~ # In production, start the agent like this
[root@hadoop02 ~]# nohup flume-ng agent --conf /opt/yanqi/servers/flume-1.9.0/conf/ \
--conf-file /data/yanqidw/conf/flume-log2hdfs3.conf \
-name a1 -Dflume.root.logger=INFO,LOGFILE > /dev/null 2>&1 &

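~~~ nohup lets the process keep running after the user logs out or closes the terminal.
~~~ /dev/null is the Linux null device. Anything written to it is discarded, which is why it is often called the "black hole".
~~~ Standard input (0): input from the keyboard, /proc/self/fd/0
~~~ Standard output (1): output to the screen (console), /proc/self/fd/1
~~~ Standard error (2): output to the screen (console), /proc/self/fd/2
~~~ >/dev/null redirects standard output (1) to /dev/null, so nothing written to stdout can be found anywhere.
~~~ 2>&1 sends standard error to the same place as standard output.
~~~ Together, >/dev/null 2>&1 prints nothing to the console and writes nothing from stdout or stderr to any file. Flume's own log messages still go to the LOGFILE appender selected by -Dflume.root.logger, which is configured in conf/log4j.properties.
~~~ The trailing & runs the command in the background.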
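The same `> target 2>&1` semantics can be reproduced from Java with `ProcessBuilder`. `redirectErrorStream(true)` merges stderr into stdout, and `redirectOutput` chooses where the merged stream goes. The sketch assumes a Unix host with `/bin/sh`, and `RedirectDemo` is a hypothetical name. It writes to a temporary file instead of `/dev/null` so that the merged output can be inspected.

```java
import java.io.File;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;

// Hypothetical demo: run a shell command with stdout and stderr both sent to one target,
// equivalent to "command > target 2>&1".
final class RedirectDemo {
    static int runMerged(String command, File target) throws IOException, InterruptedException {
        ProcessBuilder pb = new ProcessBuilder("/bin/sh", "-c", command);
        pb.redirectErrorStream(true);                          // 2>&1
        pb.redirectOutput(ProcessBuilder.Redirect.to(target)); // > target (truncates)
        return pb.start().waitFor();
    }

    public static void main(String[] args) throws Exception {
        File out = File.createTempFile("redirect", ".log");
        out.deleteOnExit();
        runMerged("echo to-stdout; echo to-stderr 1>&2", out);
        // Both lines end up in the same file; passing new File("/dev/null") would discard them.
        System.out.print(new String(Files.readAllBytes(out.toPath()), StandardCharsets.UTF_8));
    }
}
```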

7. Summary of Log Data Collection

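### --- Log data collection
~~~ The taildir source monitors several specified directories and can attach a different header to the logs from each directory.
~~~ Within each directory, a regular expression can match multiple files.
~~~ The custom interceptor's main job is to extract the timestamp from the JSON string and add it to the event header.
~~~ The HDFS sink uses the information in the event headers to decide where each file is written.
~~~ HDFS files can roll by file size, by number of events, or by time.
~~~ Tune the JVM memory allocated to Flume.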

Walter Savage Landor: I strove with none, for none was worth my strife. Nature I loved and, next to Nature, Art: I warm'd both hands before the fire of life. It sinks, and I am ready to depart.

——W.S.Landor

