Flume高级特性
拦截器
Flume支持在运行时对event进行修改或丢弃,通过拦截器来实现;
Flume里面的拦截器是实现了org.apache.flume.interceptor.Interceptor 接口的类;
拦截器可以根据配置 修改 甚至 丢弃 event;
Flume也支持链式的拦截器执行方式,在配置文件里面配置多个拦截器就可以了;
拦截器的顺序取决于它们配置的顺序,Event 按照顺序经过每一个拦截器;
时间添加戳拦截器
这个拦截器会向每个event的header中添加一个时间戳属性进去,key默认是“ timestamp ”(也可以通过下面表格中的header来自定义),value就是当前的毫秒值(其实就是用System.currentTimeMillis()方法得到的)。如果event已经存在同名的属性,可以选择是否保留原始的值。
属性 | 默认值 | 解释 |
---|---|---|
type | - | timestamp |
header | timestamp | 向event header中添加时间戳键值对的key |
preserveExisting | false | 是否保留event header中已经存在的同名(上面header设置的key,默认是timestamp)时间戳 |
修改监听本机端口数据实时显示输出案例的 flume-netcat-logger.conf 文件,再次运行案例,观察 event header信息
1 | -- 进入Flume安装目录的conf目录修改flume-netcat-logger.conf文件 |
1 | flume-ng agent --name a1 --conf-file $FLUME_HOME/conf/flume-netcat-logger.conf -Dflume.root.logger=INFO,console |
Host添加拦截器
这个拦截器会把当前Agent的 hostname 或者 IP 地址写入到Event的header中,key默认是“host”(也可以通过配置自定义key),value可以选择使用hostname或者IP地址
属性 | 默认值 | 解释 |
---|---|---|
type | – | 组件类型,这个是:host |
preserveExisting | false | 如果header中已经存在同名的属性是否保留 |
useIP | true | true:使用IP地址;false:使用hostname |
hostHeader | host | 向Event header中添加host键值对的key |
1 | -- 进入Flume安装目录的conf目录新增hostname.conf文件 |
1 | flume-ng agent --name a1 --conf-file $FLUME_HOME/conf/hostname.conf -Dflume.root.logger=INFO,console |
正则表达式过滤拦截器
这个拦截器会把Event的body当做字符串来处理,并用配置的正则表达式来匹配。可以配置指定被匹配到的Event丢弃还是没被匹配到的Event丢弃。
选择器
source可以向多个channel同时写数据,所以也就产生了以何种方式向多个channel写的问题;
-
replication(复制,缺省)。数据完整地发送到每一个channel;
-
multiplexing(多路复用)。通过配置来按照一定的规则进行分发;
复制选择器
默认的选择器。
属性 | 默认值 | 解释 |
---|---|---|
selector.type | replicating | replicating |
selector.optional | – | 指定哪些channel是可选的,多个用空格分开 |
1 | a1.sources = r1 |
上面这个例子中,c3配置成了可选的。向c3发送数据如果失败了会被忽略。c1和c2没有配置成可选的,向c1和c2写数据失败会导致事务失败回滚。
多路复用选择器
selector.type | replicating | 组件类型,这个是: multiplexing |
---|---|---|
selector.header | flume.selector.header | 想要进行匹配的header属性的名字 |
selector.default | – | 指定一个默认的channel。如果没有被规则匹配到,默认会发到这个channel上 |
selector.mapping.* | – | 一些匹配规则,具体参考下面的例子 |
1 | a1.sources = r1 |
自定义选择器
自定义选择器就是开发一个 org.apache.flume.ChannelSelector 接口的实现类。实现类以及依赖的jar包在启动时候都必须放入Flume的classpath。
属性 | 默认值 | 解释 |
---|---|---|
selector.type | – | 你写的自定义选择器的全限定类名,比如:org.liyifeng.flume.channel.MyChannelSelector |
1 | a1.sources = r1 |
Sink组逻辑处理器
可以把多个sink分成一个组, Sink组逻辑处理器可以对这同一个组里的几个sink进行负载均衡 或者 其中一个sink发生故障后将输出Event的任务转移到其他的sink上。
N个sink将Event输出到对应的N个目的地的,通过 Sink组逻辑处理器 可以把这N个sink配置成负载均衡或者故障转移的工作方式:
-
负载均衡是将channel里面的Event,按照配置的负载机制(比如轮询)分别发送到sink各自对应的目的地
-
故障转移是这N个sink同一时间只有一个在工作,其余的作为备用,工作的sink挂掉之后备用的sink顶上
属性 | 默认值 | 解释 |
---|---|---|
sinks | – | 这一组的所有sink名,多个用空格分开 |
processor.type | default | 这个sink组的逻辑处理器类型,可选值 default (默认一对一的) 、 failover (故障转移) 、 load_balance (负载均衡) |
1 | a1.sinkgroups = g1 |
默认
默认的组逻辑处理器就是只有一个sink的情况,这种情况就没必要配置sink组了。前面的例子都是 source - channel - sink这种一对一,单个sink的。
故障转移
故障转移组逻辑处理器维护了一个发送Event失败的sink的列表,保证有一个sink是可用的来发送Event。
故障转移机制的工作原理是将故障sink降级到一个池中,在池中为它们分配冷却期(超时时间),在重试之前随顺序故障而增加。 Sink成功发送事件后,它将恢复到实时池。sink具有与之相关的优先级,数值越大,优先级越高。 如果在发送Event时Sink发生故障,会继续尝试下一个具有最高优先级的sink。 例如,在优先级为80的sink之前激活优先级为100的sink。如果未指定优先级,则根据配置中的顺序来选取。
要使用故障转移选择器,不仅要设置sink组的选择器为failover,还有为每一个sink设置一个唯一的优先级数值。 可以使用 maxpenalty 属性设置故障转移时间的上限(毫秒)。
属性 | 默认值 | 解释 |
---|---|---|
sinks | – | 这一组的所有sink名,多个用空格分开 |
processor.type | default | 组件类型,这个是: failover |
processor.priority. |
– | 组内sink的权重值, |
processor.maxpenalty | 30000 | 发生异常的sink最大故障转移时间(毫秒) |
1 | a1.sinkgroups = g1 |
负载均衡
负载均衡Sink 选择器提供了在多个sink上进行负载均衡流量的功能。 它维护一个活动sink列表的索引来实现负载的分配。 支持轮询( round_robin )【默认值】和随机( random )两种选择机制分配负载。
工作时,此选择器使用其配置的选择机制选择下一个sink并调用它。 如果所选sink无法正常工作,则处理器通过其配置的选择机制选择下一个可用sink。 此实现不会将失败的Sink列入黑名单,而是继续乐观地尝试每个可用的Sink。
如果所有sink调用都失败了,选择器会将故障抛给sink的运行器。
如果 backoff 设置为true则启用了退避机制,失败的sink会被放入黑名单,达到一定的超时时间后会自动从黑名单移除。 如从黑名单出来后sink仍然失败,则再次进入黑名单而且超时时间会翻倍,以避免在无响应的sink上浪费过长时间。 如果没有启用退避机制,在禁用此功能的情况下,发生sink传输失败后,会将本次负载传给下一个sink继续尝试,因此这种情况下是不均衡的。
属性 | 默认值 | 解释 |
---|---|---|
processor.sinks | – | 这一组的所有sink名,多个用空格分开 |
processor.type | default | 组件类型,这个是: load_balance |
processor.backoff | false | 失败的sink是否成倍地增加退避它的时间。 如果设置为false,负载均衡在某一个sink发生异常后,下一次选择sink的时候仍然会将失败的这个sink加入候选队列; 如果设置为true,某个sink连续发生异常时会成倍地增加它的退避时间,在退避的时间内是无法参与负载均衡竞争的。退避机制只统计1个小时发生的异常,超过1个小时没有发生异常就会重新计算 |
processor.selector | round_robin | 负载均衡机制,可选值:round_robin (轮询)、 random (随机选择)、「自定义选择器的全限定类名」:自定义的负载器要继承 AbstractSinkSelector |
processor.selector.maxTimeOut | 30000 | 发生异常的sink最长退避时间(毫秒) 如果设置了processor.backoff=true,某一个sink发生异常的时候就会触发自动退避它一段时间,这个 maxTimeOut 就是退避一个sink的最长时间 |
1 | a1.sinkgroups = g1 |
事务机制与可靠性
一提到事务,首先就想到的是关系型数据库中的事务,事务一个典型的特征就是将一批操作做成原子性的,要么都成功,要么都失败。
在Flume中一共有两个事务:
-
Put事务。在Source到Channel之间
-
Take事务。Channel到Sink之间
从 Source 到 Channel 过程中,数据在 Flume 中会被封装成 Event 对象,也就是一批 Event ,把这批 Event 放到一个事务中,把这个事务也就是这批event一次性的放入Channel 中。同理,Take事务的时候,也是把这一批event组成的事务统一拿出来到sink放到HDFS上。
Put 事务
事务开始的时候会调用一个 doPut 方法, doPut 方法将一批数据放在putList中;
putList在向 Channel 发送数据之前先检查 Channel 的容量能否放得下,如果放不下一个都不放,只能doRollback;
数据批的大小取决于配置参数 batch size 的值;
putList的大小取决于配置 Channel 的参数 transaction capacity 的大小,该参数大小就体现在putList上;(Channel的另一个参数 capacity 指的是 Channel 的容量);
数据顺利的放到putList之后,接下来可以调用 doCommit 方法,把putList中所有的 Event 放到 Channel 中,成功放完之后就清空putList;
在doCommit提交之后,事务在向 Channel 存放数据的过程中,事务容易出问题。如 Sink取数据慢,而 Source 放数据速度快,容易造成 Channel 中数据的积压,如果 putList 中的数据放不进去,会如何呢?
此时会调用 doRollback 方法,doRollback方法会进行两项操作:将putList清空;抛出ChannelException异常。source会捕捉到doRollback抛出的异常,然后source就将刚才的一批数据重新采集,然后重新开始一个新的事务,这就是事务的回滚。
Take 事务
Take事务同样也有takeList,HDFS sink配置有一个 batch size,这个参数决定 Sink从 Channel 取数据的时候一次取多少个,所以该 batch size 得小于 takeList 的大小,而takeList的大小取决于 transaction capacity 的大小,同样是channel 中的参数。
doTake方法会将channel中的event剪切到takeList中。如果后面接的是HDFS Sink的话,在把Channel中的event剪切到takeList中的同时也往写入HDFS的IO缓冲流中放一份event(数据写入HDFS是先写入IO缓冲流然后flush到HDFS);
当takeList中存放了batch size 数量的event之后,就会调用doCommit方法,doCommit方法会做两个操作:针对HDFS Sink,手动调用IO流的flush方法,将IO流缓冲区的数据写入到HDFS磁盘中;
清空takeList中的数据
flush到HDFS的时候组容易出问题。flush到HDFS的时候,可能由于网络原因超时导致数据传输失败,这个时候调用doRollback方法来进行回滚,回滚的时候由于takeList 中还有备份数据,所以将takeList中的数据原封不动地还给channel,这时候就完成了事务的回滚
但是,如果flush到HDFS的时候,数据flush了一半之后出问题了,这意味着已经有一半的数据已经发送到HDFS上面了,现在出了问题,同样需要调用doRollback方法来进行回滚,回滚并没有“一半”之说,它只会把整个takeList中的数据返回给channel,然后继续进行数据的读写。这样开启下一个事务的时候容易造成数据重复的问题。
Flume在数据进行采集传输的时候,有可能会造成数据的重复,但不会丢失数据。
Flume在数据传输的过程中是否可靠,还需要考虑具体使用Source、Channel、Sink的类型。
分析
- 分析Source
exec Source ,后面接 tail -f ,这个数据也是有可能丢的
TailDir Source ,这个是不会丢数据的,它可以保证数据不丢失
- 分析sink
Hdfs Sink,数据有可能重复,但是不会丢失
-
最后,分析channel。理论上说:要想数据不丢失的话,还是要用 File channel;memory channel 在 Flume 挂掉的时候是有可能造成数据的丢失的。
-
如果使用 TailDir source 和 HDFS sink,所以数据会重复但是不会丢失
高可用案例
需求: 实现Agent的故障转移
1. 配置环境
在Linux121、Linux122上部署Flume、修改环境变量
1 | # 在Linux123上执行 |
2. 编写conf文件
1 | # Linux123编写flume-taildir-avro2.conf |
1 | # 在Linux121编写 flume-avro-hdfs.conf |
1 | # 在Linux122编写flume-avro-hdfs.conf |
3. 分别在Linux121、Linux122、Linux123上启动对应服务(先启动下游的agent)
1 | # Linux121 |
4. 先hive.log中写入数据,检查HDFS目录
1 | # Linux123 |
5. 杀掉一个Agent,看看另外Agent是否能启动
杀掉 Linux121的进程, Linux122输出