1.Flume 事务
Flume 应用两个独立的事务别离负责从 soucrce 到 channel,以及从 channel 到 sink 的事件传递。
在 Source 到 Channel 之间的叫 put 事务,在 Channel 到 Sink 之间的叫 Take 事务。
事务两个个性就是:胜利了提交,失败了回滚。
1.1 put 事务
从 source 到 channel 过程中,数据在 flume 中会被封装成 Event 对象,多个 event 被放到一个事务中,
而后把这个蕴含 events 的事务放到 channel 中。
- 1. 事务开始的时候会调用一个 doPut 办法,doPut 办法的会将这批数据 batch data,也就是一批 event 放到 putList 中。
doPut 传递的数据的大小能够通过参数 bathchSize 配置。
putList 的大小则通过 channel 的参数 transactionCapacity 进行配置。
-
2 当数据胜利寄存到 putList 之后,调用 doCommit()办法,putList 中所有的 event 进入 channel()中,
- 1)胜利则清空 putList.
-
2) 不胜利的状况
- 从 putList 传输到 channel 过程出问题,在 doCommit 提交之后,事务在向 channel 放的过程中,遇到问题。
sink 那边取数据速度要比 Source 这边放数据速度慢,导致 channel 中的数据积压,这个时候就会造成 putList 中的数据放不进去。
这时会进行事务的回滚操作,调用 doRollback 办法,doRollback 办法会做两个事件:- 1、清空 putList 中的数据;- 2、抛出 channelException 异样。
当 source 捕捉到 doRollback 抛出的异样,就会把方才的一批数据从新采集一下,采集完之后从新走事务的流程。
- 在数据采集的过程中也有可能呈现问题,同样是调用 doRollback 办法来对事务进行回滚。
1.2 take 事务
- 1. 事务开始时,调用 doTake 办法, 将 channel 中的 event 提取到(剪切)takeList 中,
- 2. 如果前面的 sink 是 HDFS Sink,同时在写入 HDFS 的 IO 缓冲流中放一份 event。
-
3. 当 takeList 中寄存的 Event 达到约定数量(batchSize),就会调用 doCommit 办法:
-
胜利执行状况下:
- 如果是 HDFS Sink,那么手动调用 IO 流的 flush 办法,将 IO 流缓冲区的数据写入到 HDFS 磁盘中,同时清空 takeList 中的数据
-
失败状况下:
- 1. 网络提早等起因导致传输数据失败,
调用 doRollback 办法来进行回滚,takeList 中还有备份数据,所以将 takeList 中的数据一成不变地还给 channel,这时候就实现了事务的回滚。
- 2. 如果 takeList 数据有一部分传输胜利了,剩下的因为网络提早传输失败了。
同样会调用 doRollback 办法来进行回滚,它会把整个 takeList 中的数据返回给 channel,而后持续进行数据的读写。
如此一来,再次进行事务时候,就会存在数据反复的可能。
-
2.Flume 外部原理
- 1). Source 采集数据
EventBuilder.withBody(body)将数据封装成 Event 对象,
getChannelProcessor().processEvent(event)将数据交给 Channel Processor
通过源码能够看到,以 avro source 为例
public Void append(AvroFlumeOGEvent evt) throws AvroRemoteException
{
.....
Event event = EventBuilder.withBody(evt.getBody().array(), headers); // 将数据封装成 Event 对象,try {getChannelProcessor().processEvent(event); // 将数据交给 Channel Processor
this.counterGroup.incrementAndGet("rpc.events");
} catch (ChannelException ex) {return null;}
this.counterGroup.incrementAndGet("rpc.successful");
return null;
}
- 2)Channel Processor 将 Event 事件传递给拦截器链 interceptorChain.intercept(event),而后将数据返回给 Channel Processor。
- 3)Channel Processor 将拦挡过滤之后的 Event 事件传递给 Channel 选择器(Channel Selector)),Channel Selector 返回给 Channel Processor 写入 event 事件的 Channel 列表
其中 Channel Selectors 有两种类型:
– 1.Replicating Channel Selector : 将 source 过去的 events 发往所有的 channel(相当于复制多份,默认应用的 channel selector)
– 2.Multiplexing Channel Selector:能够指定 source 发过来的 events 发往的 channel
- 4)Channel Processor 依据 Channel 选择器的抉择后果,将 Event 事件写入相应的 Channel
看下 channel Processor 源码
首先结构器中间接定义了 selector 和拦截器 interceptorChain
public ChannelProcessor(ChannelSelector selector)
{
this.selector = selector;
this.interceptorChain = new InterceptorChain();}
而后在 processEvent 和 processEventBatch(List<Event> events)
public void processEvent(Event event)
{event = this.interceptorChain.intercept(event); // 提交到拦截器链
if (event == null) {return;}
List requiredChannels = this.selector.getRequiredChannels(event); // 提交到 channel 选择器
for (Iterator localIterator = requiredChannels.iterator(); localIterator.hasNext();) {reqChannel = (Channel)localIterator.next();
Transaction tx = reqChannel.getTransaction();
Preconditions.checkNotNull(tx, "Transaction object must not be null");
try {tx.begin();
reqChannel.put(event);
tx.commit();} catch (Throwable t) {tx.rollback();
if ((t instanceof Error)) {LOG.error("Error while writing to required channel:" + reqChannel, t);
throw ((Error)t);
}if ((t instanceof ChannelException)) {throw ((ChannelException)t);
}
throw new ChannelException("Unable to put event on required channel:" + reqChannel, t);
}
finally
{if (tx != null)
tx.close();}
}
Channel reqChannel;
List optionalChannels = this.selector.getOptionalChannels(event);
for (Channel optChannel : optionalChannels) {
Transaction tx = null;
try {tx = optChannel.getTransaction();
tx.begin();
optChannel.put(event); // 将 event 事件写入 channel
tx.commit();} catch (Throwable t) {tx.rollback();
LOG.error("Unable to put event on optional channel:" + optChannel, t);
if ((t instanceof Error))
throw ((Error)t);
}
finally {if (tx != null)
tx.close();}
}
}
}
。
-
5)SinkProcessor 启动 sink,sink 在 channel 中去轮询,取出 channel 中的 event 事件。
SinkProcessor 有三种,- DefaultSinkProcessor(默认的,外部无任何逻辑,只是单纯的调用 sink)、
- LoadBalancingSinkProcessor(负载平衡)、
- FaioverSinkProcessor(容灾复原)