Flume安装与配置及Flume应用典型实例

大数据技术 潘老师 3年前 (2021-05-07) 1799 ℃ (0) 扫码查看

一、Flume安装

1)下载
官网下载地址:http://flume.apache.org/download.html
这里我们下载1.9版本的:
Flume安装与配置及Flume应用典型实例
2)上传至/usr/flume目录,然后执行如下指令解压:

tar -zxvf apache-flume-1.9.0-bin.tar.gz

3)配置Flume的环境变量,在/etc/profile中新增如下:

#配置新增
export FLUME_HOME=/usr/flume/apache-flume-1.9.0-bin
export PATH=$PATH:$FLUME_HOME/bin
#执行生效指令
source /etc/profile

到这里,Flume就安装完成了。

二、Flume典型应用

1、本地数据读取

首先我们来看一个从本地目录下搜集数据的实例。具体操作步骤如下:
1)在flume目录下新建名为flume1.conf的配置文件,内容如下:

#指定sources、sinks、channels
a1.sources=r1
a1.sinks=k1
a1.channels=c1
#配置Source
a1.sources.r1.type=spooldir
a1.sources.r1.spoolDir=/usr/flume/source
#配置Sink
a1.sinks.k1.type=file_roll
a1.sinks.k1.sink.directory=/usr/flume/sink
#将Channel类型设置为memory
a1.channels.c1.type=memory
a1.channels.c1.capacity=1000
a1.channels.c1.transactionCapacity=100
#把Source和Sink绑到Channel上
a1.sources.r1.channels=c1
a1.sinks.k1.channel=c1

此配置定义了一个名为a1的代理。a1具有侦听端口/usr/flume/source目录中的数据的源,在内存中缓冲事件数据的通道以及将事件数据记录到/usr/flume/sink目录。配置文件为各个组件命名,然后描述它们的类型和配置参数。给定的配置文件可能会定义几个命名的代理。当启动给定的Flume进程时,会传递一个标志,告诉它要显示哪个命名的代理。
2)创建相应的文件夹
/usr/flume/下分别创建source(作为数据源)和sink(作为输出目录)文件夹,

mkdir source
mkdir sink

3)启动Flume代理
使用flume-ng命令启动Flume代理,同时在console控制台打印日志,命令如下:

flume-ng agent --conf conf --conf-file /usr/flume/flume1.conf --name a1 -Dflume.root.logger=INFO,console

4)测试数据
/usr/flume/source文件夹下导入一个test.txt文本文件Flume客户端会有如下显示:
Flume安装与配置及Flume应用典型实例
一旦Flume成功启动并完成日志收集,再查看source文件夹的test.txt文件时,则变成了test.txt.COMPLETED,sink文件夹中会收集到文件。
Flume安装与配置及Flume应用典型实例

2、收集至HDFS

1)在前一个案例基础上,我们参考flume1.conf配置,新建flume2.conf配置文件如下:

#指定sources、sinks、channels
a1.sources=r1
a1.sinks=k1
a1.channels=c1
#配置Source
a1.sources.r1.type=spooldir
a1.sources.r1.spoolDir=/usr/flume/source
#配置Sink
a1.sinks.k1.type=hdfs
#配置收集后的文件,放在HDFS的哪个位置
a1.sinks.k1.hdfs.path=hdfs://master:9820/flume/data
a1.sinks.k1.hdfs.rollInterval=0
a1.sinks.k1.hdfs.rollSize=10240000
a1.sinks.k1.hdfs.rollCount=0
a1.sinks.k1.hdfs.idleTimeout=3
a1.sinks.k1.hdfs.fileType=DataStream
a1.sinks.k1.hdfs.round=true
a1.sinks.k1.hdfs.roundValue=10
a1.sinks.k1.hdfs.roundUnit=minute
a1.sinks.k1.hdfs.useLocalTimeStamp=true
#将Channel类型设置为memory
a1.channels.c1.type=memory
a1.channels.c1.capacity=1000
a1.channels.c1.transactionCapacity=100
#把Source和Sink绑到Channel上
a1.sources.r1.channels=c1
a1.sinks.k1.channel=c1

2)启动hadoop

start-all.sh

3)启动Flume代理

flume-ng agent --conf conf --conf-file /usr/flume/flume2.conf --name a1 -Dflume.root.logger=INFO,console

4)在source目录下新建test2.txt文件,这时我们可能或发现如下报错:

2021-05-06 17:57:19,198 ERROR hdfs.HDFSEventSink: process failed
java.lang.NoSuchMethodError: com.google.common.base.Preconditions.checkArgument(ZLjava/lang/String;Ljava/lang/Object;)V
    at org.apache.hadoop.conf.Configuration.set(Configuration.java:1357)
    at org.apache.hadoop.conf.Configuration.set(Configuration.java:1338)
    at org.apache.hadoop.conf.Configuration.setBoolean(Configuration.java:1679)
    at org.apache.flume.sink.hdfs.BucketWriter.open(BucketWriter.java:221)
    at org.apache.flume.sink.hdfs.BucketWriter.append(BucketWriter.java:572)
    at org.apache.flume.sink.hdfs.HDFSEventSink.process(HDFSEventSink.java:412)
    at org.apache.flume.sink.DefaultSinkProcessor.process(DefaultSinkProcessor.java:67)
    at org.apache.flume.SinkRunner$PollingRunner.run(SinkRunner.java:145)
    at java.lang.Thread.run(Thread.java:745)

这是由于flume内依赖的guava.jar和hadoop内的版本不一致会造成,我们需要查看hadoop安装目录下share/hadoop/common/libguava.jar版本,再查看flume安装目录下libguava.jar的版本,如果两者不一致,删除版本低的,并拷贝高版本过去。此处我们发现flume中版本为guava-11.0.2.jar,hadoop中版本为guava-27.0-jre.jar,所以,此处我把flume中的删除,把hadoop中的复制过去

#删除
rm /usr/flume/apache-flume-1.9.0-bin/lib/guava-11.0.2.jar
#复制
cp /usr/hadoop/hadoop-3.2.1/share/hadoop/common/lib/guava-27.0-jre.jar /usr/flume/apache-flume-1.9.0-bin/lib/

5)重启flume,发现正常采集到hdfs
Flume安装与配置及Flume应用典型实例

3、基于日期分区的数据收集

有些时候,由于收集的文件很大,或者因业务需求,需要将收集到的数据按照日期分开存储。本案例基于上面的HDFS收集,再根据年、月、日、时分分别生成文件,具体操作步骤如下:
1)在前一个案例基础上,我们参考flume2.conf配置,新建flume3.conf配置文件如下:

#指定sources、sinks、channels
a1.sources=r1
a1.sinks=k1
a1.channels=c1
#配置Source
a1.sources.r1.type=spooldir
a1.sources.r1.spoolDir=/usr/flume/source
#配置Sink
a1.sinks.k1.type=hdfs
#配置收集后的文件,放在HDFS的哪个位置,并按日期分别存储
a1.sinks.k1.hdfs.path=hdfs://master:9820/flume/data/%Y-%m-%d/%H%M
a1.sinks.k1.hdfs.rollInterval=0
a1.sinks.k1.hdfs.rollSize=10240000
a1.sinks.k1.hdfs.rollCount=0
a1.sinks.k1.hdfs.idleTimeout=3
a1.sinks.k1.hdfs.fileType=DataStream
a1.sinks.k1.hdfs.round=true
a1.sinks.k1.hdfs.roundValue=10
a1.sinks.k1.hdfs.roundUnit=minute
a1.sinks.k1.hdfs.useLocalTimeStamp=true
#将Channel类型设置为memory
a1.channels.c1.type=memory
a1.channels.c1.capacity=1000
a1.channels.c1.transactionCapacity=100
#把Source和Sink绑到Channel上
a1.sources.r1.channels=c1
a1.sinks.k1.channel=c1

2)启动hadoop和flume代理:

flume-ng agent --conf conf --conf-file /usr/flume/flume3.conf --name a1 -Dflume.root.logger=INFO,console

3)在source添加test3.txt文件,发现正常采集到hdfs中并按日期分别存储
Flume安装与配置及Flume应用典型实例


版权声明:本站文章,如无说明,均为本站原创,转载请注明文章来源。如有侵权,请联系博主删除。
本文链接:https://www.panziye.com/java/bigdata/3158.html
喜欢 (3)
请潘老师喝杯Coffee吧!】
分享 (0)
用户头像
发表我的评论
取消评论
表情 贴图 签到 代码

Hi,您需要填写昵称和邮箱!

  • 昵称【必填】
  • 邮箱【必填】
  • 网址【可选】