Flume 支持的数据源种类有很多,可以来自directory、http、kafka等。Flume提供了Source组件用来采集数据源。
常见的 Source 有:
- avro source:监听 Avro 端口来接收外部 avro 客户端的事件流。
avro-source接收到的是经过avro序列化后的数据,然后反序列化数据继续传输。如果是avrosource的话,源数据必须是经过avro序列化后的数据。利用 Avro source可以实现多级流动、扇出流、扇入流等效果。接收通过flume提供的avro客户端发送的日志信息。
Avro是Hadoop的一个数据序列化系统,由Hadoop的创始人Doug Cutting(也是 Lucene,Nutch等项目的创始人)开发,设计用于支持大批量数据交换的应用。
它的主要特点有:支持二进制序列化方式,可以便捷,快速地处理大量数据;动态语言友好,Avro提供的机制使动态语言可以方便地处理Avro数据;
-
exec source:可以将命令产生的输出作为source。如ping 192.168.234.163、tail -f hive.log。
-
netcat source:一个NetCat Source用来监听一个指定端口,并接收监听到的数据。
-
spooling directory source:将指定的文件加入到“自动搜集”目录中。flume会持续监听这个目录,把文件当做source来处理。注意:一旦文件被放到目录中后,便不能修改,如果修改,flume会报错。此外,也不能有重名的文件。
-
Taildir Source(Flume 1.7):监控指定的多个文件,一旦文件内有新写入的数据,就会将其写入到指定的sink内,本来源可靠性高,不会丢失数据。其不会对于跟踪的文件有任何处理,不会重命名也不会删除,不会做任何修改。目前不支持Windows系统,不支持读取二进制文件,支持一行一行的读取文本文件。
采集到的日志需要进行缓存,Flume提供了Channel组件用来缓存数据。
常见的Channel 有:
-
memory channel:缓存到内存中(最常用)
-
file channel:缓存到文件中
-
JDBC channel:通过JDBC缓存到关系型数据库中
-
kafka channel:缓存到kafka中
缓存的数据最终需要进行保存,Flume提供了Sink组件用来保存数据。
常见的 Sink 有:
-
logger sink:将信息显示在标准输出上,主要用于测试
-
avro sink:Flume events发送到sink,转换为Avro events,并发送到配置好的hostname/port。从配置好的channel按照配置好的批量大小批量获取events
-
null sink:将接收到events全部丢弃
-
HDFS sink:将 events 写进HDFS。支持创建文本和序列文件,支持两种文件类型压缩。文件可以基于数据的经过时间、大小、事件的数量周期性地滚动
-
Hive sink:该sink streams 将包含分割文本或者JSON数据的events直接传送到Hive表或分区中。使用Hive 事务写events。当一系列events提交到Hive时,它们马上可以被Hive查询到
-
HBase sink:保存到HBase中
-
kafka sink:保存到kafka中
日志采集就是根据业务需求选择合适的Source、Channel、Sink,并将其组合在一起。
案例
监听本机端口数据实时显示输出
业务需求:监听本机 8888 端口,Flume将监听的数据实时显示在控制台
需求分析:
环境为刚才已装Flume的Linux123
1. 安装 telnet 工具
2. 检查 8888 端口是否被占用
如果该端口被占用,可以选择使用其他端口完成任务
3. 创建 Flume Agent 配置文件
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25
| -- 进入Flume安装目录的conf目录下创建flume-netcat-logger.conf文件,并添加配置 cd /opt/lagou/servers/flume-1.9.0/conf vi flume-netcat-logger.conf
# a1是agent的名称。source、channel、sink的名称分别为:r1 c1 k1 a1.sources = r1 a1.channels = c1 a1.sinks = k1
# source a1.sources.r1.type = netcat a1.sources.r1.bind = Linux123 a1.sources.r1.port = 8888
# channel a1.channels.c1.type = memory a1.channels.c1.capacity = 10000 a1.channels.c1.transactionCapacity = 100
# sink a1.sinks.k1.type = logger
# source、channel、sink之间的关系 a1.sources.r1.channels = c1 a1.sinks.k1.channel = c1
|
Memory Channel 是使用内存缓冲Event的Channel实现。速度比较快速,容量会受到 jvm 内存大小的限制,可靠性不够高。适用于允许丢失数据,但对性能要求较高的日志采集业务。
4. 启动Flume Agent,进入日志监听页面
1
| flume-ng agent --name a1 --conf-file $FLUME_HOME/conf/flume-netcat-logger.conf -Dflume.root.logger=INFO,console
|
5. 再打开一个Linux123的窗口,使用 telnet 向本机的 8888 端口发送消息
1 2 3
| telnet Linux123 8888
-- 随便输入字符,enter回车发送
|
6. 在 Flume 日志监听页面查看数据接收情况
监控日志文件信息到HDFS
业务需求:监控本地日志文件,收集内容实时上传到HDFS
需求分析:
-
使用 tail -F 命令即可找到本地日志文件产生的信息
-
source 选择 exec。exec 监听一个指定的命令,获取命令的结果作为数据源。
-
source组件从这个命令的结果中取数据。当agent进程挂掉重启后,可能存在数据丢失
-
channel 选择 memory
-
sink 选择 HDFS
1 2 3
| tail -f 等同于--follow=descriptor,根据文件描述符进行追踪,当文件改名或被删除,追踪停止
tail -F 等同于--follow=name --retry,根据文件名进行追踪,并保持重试,即该文件被 删除或改名后,如果再次创建相同的文件名,会继续追踪
|
环境为刚才已装Flume的Linux123
1. 环境准备。
Flume要想将数据输出到HDFS,必须持有Hadoop相关jar包。
将commons-configuration-1.6.jar hadoop-auth-2.9.2.jar hadoop-common-2.9.2.jar hadoop-hdfs-2.9.2.jar commons-io-2.4.jar htrace-core4-4.1.0-incubating.jar拷贝到 $FLUME_HOME/lib 文件夹下
1 2 3 4 5 6 7 8
| cd $HADOOP_HOME/share/hadoop/httpfs/tomcat/webapps/webhdfs/WEB-INF/lib
cp commons-configuration-1.6.jar $FLUME_HOME/lib/ cp hadoop-auth-2.9.2.jar $FLUME_HOME/lib/ cp hadoop-common-2.9.2.jar $FLUME_HOME/lib/ cp hadoop-hdfs-2.9.2.jar $FLUME_HOME/lib/ cp commons-io-2.4.jar $FLUME_HOME/lib/ cp htrace-core4-4.1.0-incubating.jar $FLUME_HOME/lib/
|
2. 创建配置文件
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42
| -- 进入Flume安装目录的conf目录下创建flume-exec-hdfs.conf文件,并添加配置 cd /opt/lagou/servers/flume-1.9.0/conf vi flume-exec-hdfs.conf
# Name the components on this agent a2.sources = r2 a2.sinks = k2 a2.channels = c2
# Describe/configure the source a2.sources.r2.type = exec a2.sources.r2.command = tail -F /tmp/root/hive.log
# Use a channel which buffers events in memory a2.channels.c2.type = memory a2.channels.c2.capacity = 10000 a2.channels.c2.transactionCapacity = 500
# Describe the sink a2.sinks.k2.type = hdfs a2.sinks.k2.hdfs.path = hdfs://Linux121:9000/flume/%Y%m%d/%H%M # 上传文件的前缀 a2.sinks.k2.hdfs.filePrefix = logs- # 是否使用本地时间戳 a2.sinks.k2.hdfs.useLocalTimeStamp = true # 积攒500个Event才flush到HDFS一次 a2.sinks.k2.hdfs.batchSize = 500
# 设置文件类型,支持压缩。DataStream没启用压缩 a2.sinks.k2.hdfs.fileType = DataStream # 1分钟滚动一次 a2.sinks.k2.hdfs.rollInterval = 60 # 128M滚动一次 a2.sinks.k2.hdfs.rollSize = 134217700 # 文件的滚动与Event数量无关 a2.sinks.k2.hdfs.rollCount = 0 # 最小冗余数 a2.sinks.k2.hdfs.minBlockReplicas = 1
# Bind the source and sink to the channel a2.sources.r2.channels = c2 a2.sinks.k2.channel = c2
|
3. 启动Agent
1
| flume-ng agent --name a2 --conf-file $FLUME_HOME/conf/flume-exec-hdfs.conf -Dflume.root.logger=INFO,console
|
4. 启动Hadoop和Hive,操作Hive产生日志
1 2 3 4 5
| start-dfs.sh start-yarn.sh
# 在命令行多次执行 hive -e "show database"
|
5. 在HDFS上查看文件
监控目录采集信息到HDFS
业务需求:监控指定目录,收集信息实时上传到HDFS
需求分析:
spooldir Source监听一个指定的目录,即只要向指定目录添加新的文件,source组件就可以获取到该信息,并解析该文件的内容,写入到channel。sink处理完之后,标记该文件已完成处理,文件名添加 .completed 后缀。虽然是自动监控整个目录,但是只能监控文件,如果以追加的方式向已被处理的文件中添加内容,source并不能识别。
需要注意的是:
环境为刚才已装Flume的Linux123
1. 创建配置文件
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49
| -- 进入Flume安装目录的conf目录下创建flume-spooldir-hdfs.conf文件,并添加配置 cd /opt/lagou/servers/flume-1.9.0/conf vim flume-spooldir-hdfs.conf
-- 创建监控目录 mkdir /root/upload
# Name the components on this agent a3.sources = r3 a3.channels = c3 a3.sinks = k3
# Describe/configure the source a3.sources.r3.type = spooldir a3.sources.r3.spoolDir = /root/upload a3.sources.r3.fileSuffix = .COMPLETED a3.sources.r3.fileHeader = true
# 忽略以.tmp结尾的文件,不上传 a3.sources.r3.ignorePattern = ([^ ]*\.tmp)
# Use a channel which buffers events in memory a3.channels.c3.type = memory a3.channels.c3.capacity = 10000 a3.channels.c3.transactionCapacity = 500
# Describe the sink a3.sinks.k3.type = hdfs a3.sinks.k3.hdfs.path = hdfs://Linux121:9000/flume/upload/%Y%m%d/%H%M # 上传文件的前缀 a3.sinks.k3.hdfs.filePrefix = upload- # 是否使用本地时间戳 a3.sinks.k3.hdfs.useLocalTimeStamp = true # 积攒500个Event,flush到HDFS一次 a3.sinks.k3.hdfs.batchSize = 500 # 设置文件类型 a3.sinks.k3.hdfs.fileType = DataStream # 60秒滚动一次 a3.sinks.k3.hdfs.rollInterval = 60 # 128M滚动一次 a3.sinks.k3.hdfs.rollSize = 134217700 # 文件滚动与event数量无关 a3.sinks.k3.hdfs.rollCount = 0 # 最小冗余数 a3.sinks.k3.hdfs.minBlockReplicas = 1
# Bind the source and sink to the channel a3.sources.r3.channels = c3 a3.sinks.k3.channel = c3
|
2. 启动Agent
1
| flume-ng agent --name a3 --conf-file $FLUME_HOME/conf/flume-spooldir-hdfs.conf -Dflume.root.logger=INFO,console
|
3. 向upload文件夹中添加文件
1 2 3
| cp nohup.out /root/upload/a.log cp nohup.out /root/upload/b.log cp nohup.out /root/upload/c.log
|
4. 查看HDFS上的数据
一般使用 HDFS Sink 都会采用滚动生成文件的方式,滚动生成文件的策略有:
HDFS Sink 其他重要配置:
如果要避免HDFS Sink产生小文件,参考如下参数设置:
1 2 3 4 5 6 7 8 9
| a1.sinks.k1.type=hdfs a1.sinks.k1.hdfs.useLocalTimeStamp=true a1.sinks.k1.hdfs.path=hdfs://Linux121:9000/flume/events/%Y/%m/% d/%H/%M
a1.sinks.k1.hdfs.minBlockReplicas=1 a1.sinks.k1.hdfs.rollInterval=3600 a1.sinks.k1.hdfs.rollSize=0 a1.sinks.k1.hdfs.rollCount=0 a1.sinks.k1.hdfs.idleTimeout=0
|
监控日志文件采集数据到HDFS、本地文件系统
业务需求:监控日志文件,收集信息上传到HDFS 和 本地文件系统
需求分析:
taildir Source。Flume 1.7.0加入的新Source,相当于 spooldir source + exec source。可以监控多个目录,并且使用正则表达式匹配该目录中的文件名进行实时收集。实时监控一批文件,并记录每个文件最新消费位置,agent进程重启后不会有数据丢失的问题。
目前不适用于Windows系统;其不会对于跟踪的文件有任何处理,不会重命名也不会删除,不会做任何修改。不支持读取二进制文件,支持一行一行的读取文本文件。
环境为刚才已装Flume的Linux123
1. 创建第一个配置文件
flume-taildir-avro.conf 配置文件包括:1个 taildir source,2个 memory channel,2个 avro sink
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43
| -- 进入Flume安装目录的conf目录下创建flume-taildir-avro.conf文件,并添加配置 cd /opt/lagou/servers/flume-1.9.0/conf vim flume-taildir-avro.conf
# Name the components on this agent a1.sources = r1 a1.sinks = k1 k2 a1.channels = c1 c2
# 将数据流复制给所有channel a1.sources.r1.selector.type = replicating
# source a1.sources.r1.type = taildir # 被监控的文件夹目录集合,这些文件夹下的文件都会被监控,多个用空格分隔 a1.sources.r1.filegroups = f1 # 被监控文件夹的绝对路径 备注:.*log 是正则表达式;这里写成 *.log 是错误的 a1.sources.r1.filegroups.f1 = /tmp/root/.*log # 记录每个文件最新消费位置 a1.sources.r1.positionFile = /root/flume/taildir_position.json
# sink a1.sinks.k1.type = avro a1.sinks.k1.hostname = Linux123 a1.sinks.k1.port = 9091
a1.sinks.k2.type = avro a1.sinks.k2.hostname = Linux123 a1.sinks.k2.port = 9092
# channel a1.channels.c1.type = memory a1.channels.c1.capacity = 10000 a1.channels.c1.transactionCapacity = 500
a1.channels.c2.type = memory a1.channels.c2.capacity = 10000 a1.channels.c2.transactionCapacity = 500
# Bind the source and sink to the channel a1.sources.r1.channels = c1 c2 a1.sinks.k1.channel = c1 a1.sinks.k2.channel = c2
|
2. 创建第二个配置文件
flume-avro-hdfs.conf配置文件包括:1个 avro source,1个 memory channel,1个 hdfs sink
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39
| -- 进入Flume安装目录的conf目录下创建flume-avro-hdfs.conf文件,并添加配置 cd /opt/lagou/servers/flume-1.9.0/conf vim flume-avro-hdfs.conf
# Name the components on this agent a2.sources = r1 a2.sinks = k1 a2.channels = c1
# Describe/configure the source a2.sources.r1.type = avro a2.sources.r1.bind = Linux123 a2.sources.r1.port = 9091
# Describe the channel a2.channels.c1.type = memory a2.channels.c1.capacity = 10000 a2.channels.c1.transactionCapacity = 500
# Describe the sink a2.sinks.k1.type = hdfs a2.sinks.k1.hdfs.path = hdfs://Linux121:9000/flume2/%Y%m%d/%H # 上传文件的前缀 a2.sinks.k1.hdfs.filePrefix = flume2- # 是否使用本地时间戳 a2.sinks.k1.hdfs.useLocalTimeStamp = true # 500个Event才flush到HDFS一次 a2.sinks.k1.hdfs.batchSize = 500 # 设置文件类型,可支持压缩 a2.sinks.k1.hdfs.fileType = DataStream # 60秒生成一个新的文件 a2.sinks.k1.hdfs.rollInterval = 60 a2.sinks.k1.hdfs.rollSize = 0 a2.sinks.k1.hdfs.rollCount = 0 a2.sinks.k1.hdfs.minBlockReplicas = 1
# Bind the source and sink to the channel a2.sources.r1.channels = c1 a2.sinks.k1.channel = c1
|
3. 创建第三个配置文件
flume-avro-file.conf配置文件包括:1个 avro source,1个 memory channel,1个 file_roll sink
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27
| -- 进入Flume安装目录的conf目录下创建flume-avro-file.conf文件,并添加配置 cd /opt/lagou/servers/flume-1.9.0/conf vim flume-avro-file.conf
# Name the components on this agent a3.sources = r1 a3.sinks = k1 a3.channels = c2
# Describe/configure the source a3.sources.r1.type = avro a3.sources.r1.bind = Linux123 a3.sources.r1.port = 9092
# Describe the sink a3.sinks.k1.type = file_roll # 目录需要提前创建好 a3.sinks.k1.sink.directory = /root/flume/output
# Describe the channel a3.channels.c2.type = memory a3.channels.c2.capacity = 10000 a3.channels.c2.transactionCapacity = 500
# Bind the source and sink to the channel a3.sources.r1.channels = c2 a3.sinks.k1.channel = c2
|
4. 分别启动3个Agent
1 2 3 4 5 6 7 8
| # 创建所需文件目录 mkdir -p /root/flume/output/
flume-ng agent --name a3 --conf-file $FLUME_HOME/conf/flume-avro-file.conf -Dflume.root.logger=INFO,console &
flume-ng agent --name a2 --conf-file $FLUME_HOME/conf/flume-avro-hdfs.conf -Dflume.root.logger=INFO,console &
flume-ng agent --name a1 --conf-file $FLUME_HOME/conf/flume-taildir-avro.conf -Dflume.root.logger=INFO,console &
|
5. 执行hive命令产生日志
1
| hive -e "show databases"
|
6. 分别检查HDFS文件、本地文件、以及消费位置文件