一、Flume--数据采集器基本原理和使用

网友投稿 256 2022-11-26

一、Flume--数据采集器基本原理和使用

一、概述

1、flume是什么

1) Flume提供一个分布式的,可靠的,对大数据量的日志进行高效收集、聚集、移动的服务,Flume只能在Linux环境下运行。2) Flume基于流式架构,容错性强,也很灵活简单,架构简单。3) Flume、Kafka用来实时进行数据收集,Spark、Storm用来实时处理数据,impala用来实时查询。

2、flume的基本架构

​ 图1.1 flume架构

说到flume的架构,直接拿官网的图来说就足够了。首先在每个数据源上都会部署一个 flume agent ,这个agent就是用来采取数据的。这个agent由3个组件组成:source,channel,sink。而在flume中,数据传输的基本单位是event。下面讲讲这几个概念

(1)source

用于从数据源采集数据,并将数据传输在channel中。source支持多种数据源采集方式。比如监听端口采集数据,从文件中采集,从目录中采集,从agent的source等均可。

(4)event

传输单元,flume传输的基本单位,包括 headers和body两部分,header可以添加一些头部信息,body则是数据。

3、flume传输过程

基于上面的概念,流程基本很清晰,source监控数据源,如果产生新的数据,则获取数据,并封装成一个event,然后将event传输到channel,接着sink从channel拉取数据写入到目标源中。

二、flume的使用

1、flume部署

flume的程序本身的部署非常简单,(1)部署jdk1.8(2)解压flume的程序压缩包到指定目录,然后添加环境变量即可(3)修改配置文件

cd /opt/modules/apache-flume-1.8.0-bin 将模板配置文件复制重命名为正式配置文件 cp conf/flume-env.sh.template conf/flume-env.sh 添加jdk家目录变量 vim conf/flume-env.sh 加上这句 export JAVA_HOME=/opt/modules/jdk1.8.0_144

这就完成配置了,基本没啥难度。flume的使用重点在于agent的配置文件的编写,根据业务场景不同,配置也不同。简单来说其实就是对source,channel,sink三大组件的工作属性的配置。

2、agent定义流程

agent的配置其实就是对source、channel、sink的配置。主要有5个步骤,下面看看这个流程是怎样的。

# 1、定义的agent名称,指定使用的source sinks channels的名称 # 可以有多个source sinks channels。 .sources = .sinks = .channels = # 2、定义source工作属性。 # 基本格式就是 agent名.sources.source名.参数名=value # 第一个参数都是type,就是指定source类型的 .sources..type=xxxx .sources..=xxxx .sources..=xxxx ......... # 3、设置channel工作属性.格式都是类似的 # 第一个参数都是type,就是指定channel类型的 .channels..type=xxxxx .channels..=xxxxx .channels..=xxxxx ......... # 4、设置sink工作属性 # 第一个参数都是type,就是指定sink类型的 .sinks..type=xxxxx .sinks..=xxxxx .sinks..=xxxxx ............... # 5、设置source以及sink使用的channel,通过channel将两者连接起来 .sources..channels = .sinks..channel =

这就是agent定义的完整流程,source、channel、sink每个都有不同的类型,每个类型定义的参数会有差异。下面看看source、channel、sink中常用的类型(想看完整的全部的类型就看官网吧)

3、常用source的类型

(1)netcat--从tcp端口获取数据

常用属性: type:需指定为 netcat bind:监听的主机名或者ip port:监听的端口 例子:监听在 0.0.0.0:6666端口 a1.sources.r1.type = netcat a1.sources.r1.bind = 0.0.0.0 a1.sources.r1.port = 6666

(2)exec--执行命令输出作为数据源

常用属性: type:需指定为 exec command:运行的命令 shell:运行名为所需的shell,如 /bin/bash -c 例子:监控文件的新增内容 a1.sources.r1.type = exec a1.sources.r1.command = tail -F /var/log/secure a1.sourcesr.r1.shell = /bin/bash -c

(3)spooldir--监控目录内容

常用的属性: type:设置为 spooldir spoolDir:监控的目录路径 fileSuffix:上传完成的文件加上指定的后缀,默认是 .COMPLETED fileHeader:是否在event的header添加一个key标明该文件的绝对路径,默认为false ignorePattern:正则匹配,忽略的文件 还有其他很多参数,具体到官网上看吧 例子: a3.sources.r3.type = spooldir a3.sources.r3.spoolDir = /opt/module/flume1.8.0/upload a3.sources.r3.fileSuffix = .COMPLETED a3.sources.r3.fileHeader = true #忽略所有以.tmp结尾的文件,不上传 a3.sources.r3.ignorePattern = ([^ ]*\.tmp)

(4)avro--flume之间串联的中间格式

这个源比较特别,通常用在上一个flume的sink 输出,然后作为下一个flume的输入的格式。

常用的属性: type:需指定为 avro bind:监听的主机名或者ip,只能是agent所在主机的ip或者hostname port:监听的端口 例子: a1.sources.r1.type = avro a1.sources.r1.bind = 0.0.0.0 a1.sources.r1.port = 4141

(5)TAILDIR--监控文件或者目录内容变化(1.7以及之后才有)

​ spoolDir有一个bug,就是已经上传完成的文件,不能再追加内容,否则会报错,而且也无法读取到新的文件内容。所以spooldir只能用来监控目录下新文件的变化,没办法监控已有文件的内容变化。以往这种情况只能使用 exec源,然后使用tail -f xxxlog 的方式来监听文件内容变化,但是这种方式有缺陷,就是容易丢失数据。而在flume1.7之后有一个新的source,叫TAILDIR,可以直接监听文件变化的内容。看看用法:

常用属性: type:TAILDIR ,记住,要全部大写 filegroups:要监听的文件组的名字,可以有多个文件组 filegroups.:指定文件组的包含哪些文件,可以使用扩展正则表达式,这里可以有的小技巧 /path/.* 这样就可以监听目录下的所有文件内容的变化 positionFile:这个文件json格式记录了目录下每个文件的inode,以及pos偏移量 fileHeader:是否添加header 属性过多,可以当官网看:http://flume.apache.org/releases/content/1.9.0/FlumeUserGuide.html#spooling-directory-source 例子: a1.sources = r1 a1.channels = c1 a1.sources.r1.type = TAILDIR a1.sources.r1.channels = c1 a1.sources.r1.positionFile = /var/log/flume/taildir_position.json a1.sources.r1.filegroups = f1 f2 有两个文件组 # 文件组1内容 a1.sources.r1.filegroups.f1 = /var/log/test1/example.log a1.sources.r1.headers.f1.headerKey1 = value1 # 使用正则表达式指定文件组 a1.sources.r1.filegroups.f2 = /var/log/test2/.*log.* a1.sources.r1.headers.f2.headerKey1 = value2 a1.sources.r1.headers.f2.headerKey2 = value2-2 a1.sources.r1.fileHeader = true a1.sources.ri.maxBatchCount = 1000

下面再说说上面说到的 positionFile 这个东东,看看它的格式:

[{"inode":408241856,"pos":27550,"file":"/opt/modules/apache-flume-1.8.0-bin/logs/flume.log.COMPLETED"}, {"inode":406278032,"pos":0,"file":"/opt/modules/apache-flume-1.8.0-bi n/logs/words.txt.COMPLETED"},{"inode":406278035,"pos":0,"file":"/opt/modules/apache-flume-1.8.0-bin/logs/words.txt"}, {"inode":406278036,"pos":34,"file":"/opt/modules/apache -flume-1.8.0-bin/logs/test.txt"}] 分析: 1、每个文件都是一个json串,由多个json串组成一个类似于数组的东西。 2、每个json包含内容有: inode:这个什么意思就自己具体看看文件系统的基本知识吧 pos:开始监听文件内容的起始偏移量 file:文件绝对路径名 3、小技巧: (1)如果监听目录时,某些文件已存在,那么flume默认是从文件最后作为监听起始点进行监听。当文件内容更新时,flume会获取,然后sink。接着就会更新pos值。所以因为这个特点,就算flume agent突然崩了,下一次启动时,自动从上次崩溃的pos开始监听,而不是从最新的文件末尾开始监听。这样就不会丢失数据了,而且不会重复读取旧数据。 (2)从(1)可知,pos就是实时更新的一个文件内容监听点,如果我们想文件从头开始监听,有时候有需求,需要将监听目录下的文件全部传输一边。这时候很简单,将json文件中的pos改为0就好了。 4、如果没有指定positionFile路径,默认为/USER_HOME/.flume/taildir_position.json

4、常用channel类型

(1)memory--用内存作为暂存空间

常用的属性: type:需指定为 memory capacity:存储在channel中event数量的最大值 transactionCapacity:一次传输的event的最大数量 例子: a1.channels.c1.type = memory a1.channels.c1.capacity = 1000 a1.channels.c1.transactionCapacity = 100

(2)file--使用磁盘文件作为暂存空间

常用的属性: type:需指定为 file checkpointDir:存储checkpoint文件的目录 dataDirs:存储数据的目录 例子: a1.channels.c1.type = file a1.channels.c1.checkpointDir = /mnt/flume/checkpoint a1.channels.c1.dataDirs = /mnt/flume/data

(3)SPILLABLEMEMORY--文件+内存作为暂存空间

这个类型是将内存+文件作为channel,当容量空间超过内存时就写到文件中 常用的属性: type:指定为 SPILLABLEMEMORY memoryCapacity:使用内存存储的event的最大数量 overflowCapacity:存储到文件event的最大数量 byteCapacity:使用内存存储的event的最大容量,单位是 bytes checkpointDir:存储checkpoint文件的目录 dataDirs:存储数据的目录 例子: a1.channels.c1.type = SPILLABLEMEMORY a1.channels.c1.memoryCapacity = 10000 a1.channels.c1.overflowCapacity = 1000000 a1.channels.c1.byteCapacity = 800000 a1.channels.c1.checkpointDir = /mnt/flume/checkpoint a1.channels.c1.dataDirs = /mnt/flume/data

(4)kafka--作为channel

生产环境中,flume+kafka也是常用的技术栈,但是一般是将kafka作为sink目标

常用属性: type:设置为 org.apache.flume.channel.kafka.KafkaChannel bootstrap.servers:kafka集群的服务器, ip:port,ip2:port,.... topic:kafka中的topic consumer.group.id:消费者的groupid 例子: a1.channels.channel1.type = org.apache.flume.channel.kafka.KafkaChannel a1.channels.channel1.kafka.bootstrap.servers = kafka-1:9092,kafka-2:9092,kafka-3:9092 a1.channels.channel1.kafka.topic = channel1 a1.channels.channel1.kafka.consumer.group.id = flume-consumer

5、常用sink类型

(1)logger--直接作为log信息输出

常用属性: type:logger 例子: a1.sinks.k1.type = logger

这个类型比较简单,一般用于调试时使用

(2)avro--串联flume的中间格式

这个类型主要就是用来给下一个flume作为输入的格式,是字节流的方式,而且是序列化的序列。

常用属性: type:avro hostname:输出目标的主机名或者ip,可以任意主机,不局限于本机 ip:输出到的端口 例子: a1.sinks.k1.type = avro a1.sinks.k1.hostname = 10.10.10.10 a1.sinks.k1.port = 4545

(3)hdfs--直接写入到hdfs

常用属性: type:hdfs hdfs.path:存储路径 , hdfs://namenode:port/PATH hdfs.filePrefix:上传的文件的前缀(额外加上的) hdfs.round:是否按时间滚动文件夹 hdfs.roundValue:滚动的时间值 hdfs.roundUnit:滚动的时间的单位 hdfs.userLocalTimeStamp:是否使用本地时间戳,true还是false hdfs.batchSize:积攒多少个event才flush到hdfs 一次 hdfs.fileType:文件类型,DataStream(普通文件),SequenceFile(二进制格式,默认),CompressedStream(压缩格式) hdfs.rollInterval:多久生成一个新的文件,单位是秒 hdfs.rollSize:文件滚动大小,单位是 bytes hdfs.rollCount:文件滚动是否与event数量有关,true 还是false hdfs.minBlockReplicas:最小副本数 例子: #指定sink的类型为存储在hdfs中 a2.sinks.k2.type = hdfs # 路径命名为按小时 a2.sinks.k2.hdfs.path = hdfs://bigdata121:9000/flume/%H #上传文件的前缀 a2.sinks.k2.hdfs.filePrefix = king- #是否按照时间滚动文件夹 a2.sinks.k2.hdfs.round = true #多少时间单位创建一个新的文件夹 a2.sinks.k2.hdfs.roundValue = 1 #重新定义时间单位 a2.sinks.k2.hdfs.roundUnit = hour #是否使用本地时间戳 a2.sinks.k2.hdfs.useLocalTimeStamp = true #积攒多少个Event才flush到HDFS一次 a2.sinks.k2.hdfs.batchSize = 1000 #设置文件类型,可支持压缩 a2.sinks.k2.hdfs.fileType = DataStream #多久生成一个新的文件,单位是秒 a2.sinks.k2.hdfs.rollInterval = 600 #设置每个文件的滚动大小,单位是bytes a2.sinks.k2.hdfs.rollSize = 134217700 #文件的滚动与Event数量无关 a2.sinks.k2.hdfs.rollCount = 0 #最小副本数 a2.sinks.k2.hdfs.minBlockReplicas = 1

(4)file_roll--存储到本地文件系统

常用属性: type:file_roll sink.directory:存储路径 例子: a1.sinks.k1.type = file_roll a1.sinks.k1.sink.directory = /var/log/flum

(5)kafka--存储到kafka集群中

常用属性: tpye:org.apache.flume.sink.kafka.KafkaSink kafka.topic:kafka话题名 kafka.bootstrap.servers:集群服务器列表,以逗号分隔 kafka.flumeBatchSize:刷写到kafka的event数量 kafka.producer.acks:接收到时返回ack信息时,写入的最少的副本数 kafka.producer.compression.type:压缩类型 例子: a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink a1.sinks.k1.kafka.topic = mytopic a1.sinks.k1.kafka.bootstrap.servers = localhost:9092 a1.sinks.k1.kafka.flumeBatchSize = 20 a1.sinks.k1.kafka.producer.acks = 1 a1.sinks.k1.kafka.producer.compression.type = snappy

6、拦截器interceptors 常用类型

拦截器interceptors并不是必须的,它是工作在source和channel之间的一个组件,用于过滤source来的数据,并输出到channel。使用格式:

先指定拦截器的名字,然后对每个拦截器进行工作属性配置 .sources..interceptors = .sources..interceptors.. = xxxx

(1)timestamp时间戳拦截器

在event 的header中添加一个字段,用于标明时间戳如:headers:{timestamp:111111}。

常用属性: type:timestamp headerName:在header中的key名字,默认是 timestamp 例子: a1.sources.r1.interceptors = i1 a1.sources.r1.interceptors.i1.type = timestamp

(2)host主机名拦截器

在event 的header中添加一个字段,用于标明host戳,如:headers:{host:bigdata121}。

常用属性: type:host hostHeader:在header中的key名字,默认是 host useIP:用ip还是主机名 例子: a1.sources.r1.interceptors = i1 a1.sources.r1.interceptors.i1.type = host

(3)UUID拦截器

在event 的header中添加一个字段,用于标明uuid如:headers:{id:111111}。

常用属性: type:org.apache.flume.sink.solr.morphline.UUIDInterceptor$Builder headName:在header中的key名字,默认是 id prefix:给每个UUID添加前缀

(4)search_replace查询替换

使用正则匹配,然后替换指定字符

常用属性: type:search_replace searchPattern:匹配的正则 replaceString:替换的字符串 charset:字符集,默认UTF-8 例子:删除特定字符开头的字符串 a1.sources.avroSrc.interceptors = search-replace a1.sources.avroSrc.interceptors.search-replace.type = search_replace a1.sources.avroSrc.interceptors.search-replace.searchPattern = ^[A-Za-z0-9_]+ a1.sources.avroSrc.interceptors.search-replace.replaceString =

(5)regex_filter正则过滤

正则匹配,匹配到的丢弃或者留下

常用属性: type:regex_filter regex:正则 excludeEvents:true为过滤掉匹配的,false为留下匹配的 例子: a1.sources.r1.interceptors.i1.type = regex_filter a1.sources.r1.interceptors.i1.regex = ^A.* #如果excludeEvents设为false,表示过滤掉不是以A开头的events。如果excludeEvents设为true,则表示过滤掉以A开头的events。 a1.sources.r1.interceptors.i1.excludeEvents = true

(6) regex_extractor正则抽取

这里其实是利用正则的分组匹配来获取多个匹配组,然后将每个组的匹配值存储到header中,key可以自定义。

a1.sources.r1.type = exec a1.sources.r1.channels = c1 a1.sources.r1.command = tail -F /opt/Andy a1.sources.r1.interceptors = i1 # 指定类型为 regex_extractor a1.sources.r1.interceptors.i1.type = regex_extractor # 分组匹配的正则 a1.sources.r1.interceptors.i1.regex = hostname is (.*?) ip is (.*) # 两个分组各自的key别名 a1.sources.r1.interceptors.i1.serializers = s1 s2 # 分别设置key的名字 a1.sources.r1.interceptors.i1.serializers.s1.name = cookieid a1.sources.r1.interceptors.i1.serializers.s2.name = ip

(7)自定义拦截器

继承接口 org.apache.flume.interceptor.Interceptor,实现里面的特定方法,如:

import org.apache.flume.Context; import org.apache.flume.Event; import org.apache.flume.interceptor.Interceptor; import java.util.ArrayList; import java.util.List; public class MyInterceptor implements Interceptor { @Override public void initialize() { } @Override public void close() { } /** * 拦截source发送到通道channel中的消息 * 处理单个event * @param event 接收过滤的event * @return event 根据业务处理后的event */ @Override public Event intercept(Event event) { // 获取事件对象中的字节数据 byte[] arr = event.getBody(); // 将获取的数据转换成大写 event.setBody(new String(arr).toUpperCase().getBytes()); // 返回到消息中 return event; } // 处理event集合 @Override public List intercept(List events) { List list = new ArrayList<>(); for (Event event : events) { list.add(intercept(event)); } return list; } //用来返回拦截器对象 public static class Builder implements Interceptor.Builder { // 获取配置文件的属性 @Override public Interceptor build() { return new MyInterceptor(); } @Override public void configure(Context context) { } }

pom.xml依赖

org.apache.flume flume-ng-core 1.8.0

在 agent的配置文件中指定拦截器

a1.sources.r1.interceptors = i1 #全类名$Builder a1.sources.r1.interceptors.i1.type = ToUpCase.MyInterceptor$Builder

运行命令:

bin/flume-ng agent -c conf/ -n a1 -f jar/ToUpCase.conf -C jar/Flume_Andy-1.0-SNAPSHOT.jar -Dflume.root.logger=DEBUG,console -C 指定额外的jar包的路径,就是我们自己写的拦截器的jar包

也可以将jar包放到flume程序目录的lib目录下

三、flume案例

1、读取文件到hdfs

# 1.定义agent的名字a2.以及定义这个agent中的source,sink,channel的名字 a2.sources = r2 a2.sinks = k2 a2.channels = c2 #2.定义Source,定义数据来源 # 定义source类型是exec,执行命令的方式 a2.sources.r2.type = exec # 命令 a2.sources.r2.command = tail -F /tmp/access.log # 使用的shell a2.sources.r2.shell = /bin/bash -c #3.定义sink #指定sink的类型为存储在hdfs中 a2.sinks.k2.type = hdfs # 路径命名为按小时 a2.sinks.k2.hdfs.path = hdfs://bigdata121:9000/flume/%H #上传文件的前缀 a2.sinks.k2.hdfs.filePrefix = king- #是否按照时间滚动文件夹 a2.sinks.k2.hdfs.round = true #多少时间单位创建一个新的文件夹 a2.sinks.k2.hdfs.roundValue = 1 #重新定义时间单位 a2.sinks.k2.hdfs.roundUnit = hour #是否使用本地时间戳 a2.sinks.k2.hdfs.useLocalTimeStamp = true #积攒多少个Event才flush到HDFS一次 a2.sinks.k2.hdfs.batchSize = 1000 #设置文件类型,可支持压缩 a2.sinks.k2.hdfs.fileType = DataStream #多久生成一个新的文件,单位是秒 a2.sinks.k2.hdfs.rollInterval = 600 #设置每个文件的滚动大小,单位是bytes a2.sinks.k2.hdfs.rollSize = 134217700 #文件的滚动与Event数量无关 a2.sinks.k2.hdfs.rollCount = 0 #最小副本数 a2.sinks.k2.hdfs.minBlockReplicas = 1 # 4.定义Channel,类型、容量限制、传输容量限制 a2.channels.c2.type = memory a2.channels.c2.capacity = 1000 a2.channels.c2.transactionCapacity = 100 # 5.链接,通过channel将source和sink连接起来 a2.sources.r2.channels = c2 a2.sinks.k2.channel = c2

启动flume-agent:

/opt/module/flume1.8.0/bin/flume-ng agent \ --conf /opt/module/flume1.8.0/conf/ \ flume配置目录 --name a2 \ agent名字 --conf-file /opt/module/flume1.8.0/jobconf/flume-hdfs.conf agent配置 -Dflume.root.logger=INFO,console 打印日志到终端

2、多flume联合,一对多

flume1:输出到flume2和flume3flume2:输出到本地文件flume3:输出到hdfs

flume1.conf

# Name the components on this agent a1.sources = r1 a1.sinks = k1 k2 a1.channels = c1 c2 # 将数据流复制给多个channel。启动复制模式 a1.sources.r1.selector.type = replicating # Describe/configure the source a1.sources.r1.type = exec a1.sources.r1.command = tail -F /opt/test a1.sources.r1.shell = /bin/bash -c # 这是k1 sink a1.sinks.k1.type = avro a1.sinks.k1.hostname = bigdata111 a1.sinks.k1.port = 4141 # 这是k2 sink a1.sinks.k2.type = avro a1.sinks.k2.hostname = bigdata111 a1.sinks.k2.port = 4142 # Describe the channel a1.channels.c1.type = memory a1.channels.c1.capacity = 1000 a1.channels.c1.transactionCapacity = 100 a1.channels.c2.type = memory a1.channels.c2.capacity = 1000 a1.channels.c2.transactionCapacity = 100 # 给source接入连接两个channel.每个channel对应一个sink a1.sources.r1.channels = c1 c2 a1.sinks.k1.channel = c1 a1.sinks.k2.channel = c2

flume2.conf

# Name the components on this agent a2.sources = r1 a2.sinks = k1 a2.channels = c1 # Describe/configure the source a2.sources.r1.type = avro a2.sources.r1.bind = bigdata111 a2.sources.r1.port = 4141 # Describe the sink a2.sinks.k1.type = hdfs a2.sinks.k1.hdfs.path = hdfs://bigdata111:9000/flume2/%H #上传文件的前缀 a2.sinks.k1.hdfs.filePrefix = flume2- #是否按照时间滚动文件夹 a2.sinks.k1.hdfs.round = true #多少时间单位创建一个新的文件夹 a2.sinks.k1.hdfs.roundValue = 1 #重新定义时间单位 a2.sinks.k1.hdfs.roundUnit = hour #是否使用本地时间戳 a2.sinks.k1.hdfs.useLocalTimeStamp = true #积攒多少个Event才flush到HDFS一次 a2.sinks.k1.hdfs.batchSize = 100 #设置文件类型,可支持压缩 a2.sinks.k1.hdfs.fileType = DataStream #多久生成一个新的文件 a2.sinks.k1.hdfs.rollInterval = 600 #设置每个文件的滚动大小大概是128M a2.sinks.k1.hdfs.rollSize = 134217700 #文件的滚动与Event数量无关 a2.sinks.k1.hdfs.rollCount = 0 #最小副本数 a2.sinks.k1.hdfs.minBlockReplicas = 1 # Describe the channel a2.channels.c1.type = memory a2.channels.c1.capacity = 1000 a2.channels.c1.transactionCapacity = 100 # Bind the source and sink to the channel a2.sources.r1.channels = c1 a2.sinks.k1.channel = c1

flume3.conf

# Name the components on this agent a3.sources = r1 a3.sinks = k1 a3.channels = c1 # Describe/configure the source a3.sources.r1.type = avro a3.sources.r1.bind = bigdata111 a3.sources.r1.port = 4142 # Describe the sink a3.sinks.k1.type = file_roll #备注:此处的文件夹需要先创建好 a3.sinks.k1.sink.directory = /opt/flume3 # Describe the channel a3.channels.c1.type = memory a3.channels.c1.capacity = 1000 a3.channels.c1.transactionCapacity = 100 # Bind the source and sink to the channel a3.sources.r1.channels = c1 a3.sinks.k1.channel = c1

启动时,先启动flume2和flume3,最后启动flume1。启动命令不重复了。

3、多flume联合,多对一

多台server产生的日志,需要各自监控,然后汇总起来存储,这种场景很多。flume1(监听文件)和flume2(监听端口)各自收集数据,然后分别sink到flume3,flume3负责汇总写入hdfsflume1.conf

# Name the components on this agent a1.sources = r1 a1.sinks = k1 a1.channels = c1 # Describe/configure the source a1.sources.r1.type = exec a1.sources.r1.command = tail -F /opt/Andy a1.sources.r1.shell = /bin/bash -c # Describe the sink a1.sinks.k1.type = avro a1.sinks.k1.hostname = bigdata111 a1.sinks.k1.port = 4141 # Describe the channel a1.channels.c1.type = memory a1.channels.c1.capacity = 1000 a1.channels.c1.transactionCapacity = 100 # Bind the source and sink to the channel a1.sources.r1.channels = c1 a1.sinks.k1.channel = c1

flume2.conf

# Name the components on this agent a2.sources = r1 a2.sinks = k1 a2.channels = c1 # Describe/configure the source a2.sources.r1.type = netcat a2.sources.r1.bind = bigdata111 a2.sources.r1.port = 44444 # Describe the sink a2.sinks.k1.type = avro a2.sinks.k1.hostname = bigdata111 a2.sinks.k1.port = 4141 # Use a channel which buffers events in memory a2.channels.c1.type = memory a2.channels.c1.capacity = 1000 a2.channels.c1.transactionCapacity = 100 # Bind the source and sink to the channel a2.sources.r1.channels = c1 a2.sinks.k1.channel = c1

flume3.conf

# Name the components on this agent a3.sources = r1 a3.sinks = k1 a3.channels = c1 # Describe/configure the source a3.sources.r1.type = avro a3.sources.r1.bind = bigdata111 a3.sources.r1.port = 4141 # Describe the sink a3.sinks.k1.type = hdfs a3.sinks.k1.hdfs.path = hdfs://bigdata111:9000/flume3/%H #上传文件的前缀 a3.sinks.k1.hdfs.filePrefix = flume3- #是否按照时间滚动文件夹 a3.sinks.k1.hdfs.round = true #多少时间单位创建一个新的文件夹 a3.sinks.k1.hdfs.roundValue = 1 #重新定义时间单位 a3.sinks.k1.hdfs.roundUnit = hour #是否使用本地时间戳 a3.sinks.k1.hdfs.useLocalTimeStamp = true #积攒多少个Event才flush到HDFS一次 a3.sinks.k1.hdfs.batchSize = 100 #设置文件类型,可支持压缩 a3.sinks.k1.hdfs.fileType = DataStream #多久生成一个新的文件 a3.sinks.k1.hdfs.rollInterval = 600 #设置每个文件的滚动大小大概是128M a3.sinks.k1.hdfs.rollSize = 134217700 #文件的滚动与Event数量无关 a3.sinks.k1.hdfs.rollCount = 0 #最小冗余数 a3.sinks.k1.hdfs.minBlockReplicas = 1 # Describe the channel a3.channels.c1.type = memory a3.channels.c1.capacity = 1000 a3.channels.c1.transactionCapacity = 100 # Bind the source and sink to the channel a3.sources.r1.channels = c1 a3.sinks.k1.channel = c1

启动时先启动flume3,然后启动flume1和flume2

$ bin/flume-ng agent --conf conf/ --name a3 --conf-file jobconf/flume3.conf $ bin/flume-ng agent --conf conf/ --name a2 --conf-file jobconf/flume2.conf $ bin/flume-ng agent --conf conf/ --name a1 --conf-file jobconf/flume1.conf

测试可以通过 telnet bigdata111 44444 端口来发送数据可以在/opt/Andy文件中追加数据

版权声明:本文内容由网络用户投稿,版权归原作者所有,本站不拥有其著作权,亦不承担相应法律责任。如果您发现本站中有涉嫌抄袭或描述失实的内容,请联系我们jiasou666@gmail.com 处理,核实后本网站将在24小时内删除侵权内容。

上一篇:Java中List分片的5种方法小结
下一篇:三、hbase--调优
相关文章

 发表评论

暂时没有评论,来抢沙发吧~