flume自定义拦截器实现日期在hdfs上分类

网友投稿 243 2022-11-24

flume自定义拦截器实现日期在hdfs上分类

pom.xml org.apache.flume flume-ng-core 1.7.0 自定义拦截器 /** * 自定义flume的拦截器,提取body中的createTimeMS字段作为header */ public class LogCollInterceptor implements Interceptor { private final boolean preserveExisting; private LogCollInterceptor(boolean preserveExisting) { this.preserveExisting = preserveExisting; } public void initialize() { } /** * Modifies events in-place. */ public Event intercept(Event event) { Map headers = event.getHeaders(); //处理时间 byte[] json = event.getBody(); String jsonStr = new String(json); save(jsonStr); AppBaseLog log = JSONObject.parseObject(jsonStr , AppBaseLog.class); long time = log.getCreatedAtMs(); headers.put(TIMESTAMP, Long.toString(time)); save(time +""); //处理log类型的头 //pageLog String logType = "" ; if(jsonStr.contains("pageId")){ logType = "page" ; } //eventLog else if (jsonStr.contains("eventId")) { logType = "event"; } //usageLog else if (jsonStr.contains("singleUseDurationSecs")) { logType = "usage"; } //error else if (jsonStr.contains("errorBrief")) { logType = "error"; } //startup else if (jsonStr.contains("network")) { logType = "startup"; } headers.put("logType", logType); save(logType); return event; } /** * Delegates to {@link #intercept(Event)} in a loop. * * @param events * @return */ public List intercept(List events) { for (Event event : events) { intercept(event); } return events; } public void close() { } /** */ public static class Builder implements Interceptor.Builder { private boolean preserveExisting = PRESERVE_DFLT; public Interceptor build() { return new LogCollInterceptor(preserveExisting); } public void configure(Context context) { preserveExisting = context.getBoolean(PRESERVE, PRESERVE_DFLT); } } /** *保存 */ private void save(String log) { try { FileWriter fw = new FileWriter("/home/centos/l.log",true); fw.append(log + "\r\n"); fw.flush(); fw.close(); } catch (IOException e) { e.printStackTrace(); } } public static class Constants { public static String TIMESTAMP = "timestamp"; public static String PRESERVE = "preserveExisting"; public static boolean PRESERVE_DFLT = false; } } 导出jar部署到linux的flume的lib目录下 配置flume的配置文件 a1.sources=r1 a1.channels=c1 a1.sinks=k1 a1.sources.r1.interceptors = i1 a1.sources.r1.interceptors.i1.type = com.zyd.app.flume.interceptor.LogCollInterceptor$Builder a1.sources.r1.type = org.apache.flume.source.kafka.KafkaSource a1.sources.r1.batchSize = 5000 a1.sources.r1.batchDurationMillis = 2000 a1.sources.r1.kafka.bootstrap.servers = s202:9092 a1.sources.r1.kafka.zookeeperConnect = s202:2181,s203:2181,s204:2181 a1.sources.r1.kafka.topics.regex = ^topic-app-.*$ #a1.sources.r1.kafka.consumer.group.id = g3 a1.channels.c1.type=memory a1.channels.c1.capacity=100000 a1.channels.c1.transactionCapacity=10000 a1.sinks.k1.type = hdfs a1.sinks.k1.hdfs.path = /user/centos/applogs/%{logType}/%Y%m/%d/%H%M a1.sinks.k1.hdfs.filePrefix = events- a1.sinks.k1.hdfs.round = false a1.sinks.k1.hdfs.roundValue = 30 a1.sinks.k1.hdfs.roundUnit = second a1.sources.r1.channels = c1 a1.sinks.k1.channel= c1 启动flume $>flume-ng agent -f applog.conf -n a1

版权声明:本文内容由网络用户投稿,版权归原作者所有,本站不拥有其著作权,亦不承担相应法律责任。如果您发现本站中有涉嫌抄袭或描述失实的内容,请联系我们jiasou666@gmail.com 处理,核实后本网站将在24小时内删除侵权内容。

上一篇:基于8位USB单片机CH554的电容屏转接器
下一篇:浅谈Linux的路由与网关
相关文章

 发表评论

暂时没有评论,来抢沙发吧~