linux怎么查看本机内存大小
307
2022-11-16
Flume拦截器 & 测试Flume-Kafka通道
1)创建Maven工程flume-interceptor
2)创建包名:com.atguigu.flume.interceptor
3)在pom.xml文件中添加如下配置
注意: scope中provided的含义是编译时用该jar包。打包时时不用。因为集群上已经存在flume的jar包。只是本地编译时用一下。
4)在com.atguigu.flume.interceptor包下创建JSONUtils类
package com.atguigu.flume.interceptor;import com.alibaba.fastjson.JSON;import com.alibaba.fastjson.JSONException;public class JSONUtils { public static boolean isJSONValidate(String log){ try { JSON.parse(log); return true; }catch (JSONException e){ return false; } }}
5)在com.atguigu.flume.interceptor包下创建LogInterceptor类
package com.atguigu.flume.interceptor;import com.alibaba.fastjson.JSON;import org.apache.flume.Context;import org.apache.flume.Event;import org.apache.flume.interceptor.Interceptor;import java.nio.charset.StandardCharsets;import java.util.Iterator;import java.util.List;public class ETLInterceptor implements Interceptor { @Override public void initialize() { } @Override public Event intercept(Event event) { byte[] body = event.getBody(); String log = new String(body, StandardCharsets.UTF_8); if (JSONUtils.isJSONValidate(log)) { return event; } else { return null; } } @Override public List
6)打包
7)需要先将打好的包放入到hadoop102的/opt/module/flume/lib文件夹下面。
[atguigu@hadoop102 lib]$ ls | grep interceptorflume-interceptor-1.0-SNAPSHOT-jar-with-dependencies.jar
8)分发Flume到hadoop103、hadoop104
[atguigu@hadoop102 module]$ xsync flume/
9)分别在hadoop102、hadoop103上启动Flume
[atguigu@hadoop102 flume]$ bin/flume-ng agent --name a1 --conf-file conf/file-flume-kafka.conf &
[atguigu@hadoop103 flume]$ bin/flume-ng agent --name a1 --conf-file conf/file-flume-kafka.conf &
测试Flume-Kafka通道
(1)生成日志
[atguigu@hadoop102 ~]$ lg.sh
(2)消费Kafka数据,观察控制台是否有数据获取到
[atguigu@hadoop102 kafka]$ bin/kafka-console-consumer.sh \--bootstrap-server hadoop102:9092 --from-beginning --topic topic_log
说明: 如果获取不到数据,先检查Kafka、Flume、Zookeeper是否都正确启动。 再检查Flume的拦截器代码是否正常。
版权声明:本文内容由网络用户投稿,版权归原作者所有,本站不拥有其著作权,亦不承担相应法律责任。如果您发现本站中有涉嫌抄袭或描述失实的内容,请联系我们jiasou666@gmail.com 处理,核实后本网站将在24小时内删除侵权内容。
发表评论
暂时没有评论,来抢沙发吧~