Flume拦截器 & 测试Flume-Kafka通道

网友投稿 307 2022-11-16

Flume拦截器 & 测试Flume-Kafka通道

1)创建Maven工程flume-interceptor

2)创建包名:com.atguigu.flume.interceptor

3)在pom.xml文件中添加如下配置

org.apache.flume flume-ng-core 1.9.0 provided com.alibaba fastjson 1.2.62 maven-compiler-plugin 2.3.2 1.8 1.8 maven-assembly-plugin jar-with-dependencies make-assembly package single

注意: scope中provided的含义是编译时用该jar包。打包时时不用。因为集群上已经存在flume的jar包。只是本地编译时用一下。

4)在com.atguigu.flume.interceptor包下创建JSONUtils类

package com.atguigu.flume.interceptor;import com.alibaba.fastjson.JSON;import com.alibaba.fastjson.JSONException;public class JSONUtils { public static boolean isJSONValidate(String log){ try { JSON.parse(log); return true; }catch (JSONException e){ return false; } }}

5)在com.atguigu.flume.interceptor包下创建LogInterceptor类

package com.atguigu.flume.interceptor;import com.alibaba.fastjson.JSON;import org.apache.flume.Context;import org.apache.flume.Event;import org.apache.flume.interceptor.Interceptor;import java.nio.charset.StandardCharsets;import java.util.Iterator;import java.util.List;public class ETLInterceptor implements Interceptor { @Override public void initialize() { } @Override public Event intercept(Event event) { byte[] body = event.getBody(); String log = new String(body, StandardCharsets.UTF_8); if (JSONUtils.isJSONValidate(log)) { return event; } else { return null; } } @Override public List intercept(List list) { Iterator iterator = list.iterator(); while (iterator.hasNext()){ Event next = iterator.next(); if(intercept(next)==null){ iterator.remove(); } } return list; } public static class Builder implements Interceptor.Builder{ @Override public Interceptor build() { return new ETLInterceptor(); } @Override public void configure(Context context) { } } @Override public void close() { }}

6)打包

7)需要先将打好的包放入到hadoop102的/opt/module/flume/lib文件夹下面。

[atguigu@hadoop102 lib]$ ls | grep interceptorflume-interceptor-1.0-SNAPSHOT-jar-with-dependencies.jar

8)分发Flume到hadoop103、hadoop104

[atguigu@hadoop102 module]$ xsync flume/

9)分别在hadoop102、hadoop103上启动Flume

[atguigu@hadoop102 flume]$ bin/flume-ng agent --name a1 --conf-file conf/file-flume-kafka.conf &

[atguigu@hadoop103 flume]$ bin/flume-ng agent --name a1 --conf-file conf/file-flume-kafka.conf &

测试Flume-Kafka通道

(1)生成日志

[atguigu@hadoop102 ~]$ lg.sh

(2)消费Kafka数据,观察控制台是否有数据获取到

[atguigu@hadoop102 kafka]$ bin/kafka-console-consumer.sh \--bootstrap-server hadoop102:9092 --from-beginning --topic topic_log

说明: 如果获取不到数据,先检查Kafka、Flume、Zookeeper是否都正确启动。 再检查Flume的拦截器代码是否正常。

版权声明:本文内容由网络用户投稿,版权归原作者所有,本站不拥有其著作权,亦不承担相应法律责任。如果您发现本站中有涉嫌抄袭或描述失实的内容,请联系我们jiasou666@gmail.com 处理,核实后本网站将在24小时内删除侵权内容。

上一篇:【WSN通信】基于Matlab LEACH融合树多跳传输协议【含Matlab源码 1897期】
下一篇:以AT89C2051单片机为核心配置的温湿度数据采集系统设计
相关文章

 发表评论

暂时没有评论,来抢沙发吧~