1.Flume 事务

Flume应用两个独立的事务别离负责从soucrce到channel,以及从channel到sink的事件传递。
在Source到Channel之间的叫put事务,在Channel到Sink之间的叫Take事务。
事务两个个性就是:胜利了提交,失败了回滚。

1.1 put事务

从source到channel过程中,数据在flume中会被封装成Event对象,多个event被放到一个事务中,
而后把这个蕴含events的事务放到channel中。

  • 1.事务开始的时候会调用一个doPut办法,doPut办法的会将这批数据batch data,也就是一批event放到putList中。

doPut传递的数据的大小能够通过参数bathchSize配置。
putList的大小则通过channel的参数transactionCapacity进行配置。

  • 2 当数据胜利寄存到putList之后,调用doCommit()办法,putList中所有的event进入channel()中,

    • 1)胜利则清空putList.
    • 2) 不胜利的状况

      • 从putList传输到channel过程出问题,在doCommit提交之后,事务在向channel放的过程中,遇到问题。

      sink那边取数据速度要比Source这边放数据速度慢,导致channel中的数据积压,这个时候就会造成putList中的数据放不进去。
      这时会进行事务的回滚操作,调用doRollback办法,doRollback办法会做两个事件:

        - 1、清空putList中的数据;   - 2、抛出channelException异样。

      当source捕捉到doRollback抛出的异样,就会把方才的一批数据从新采集一下,采集完之后从新走事务的流程。

      • 在数据采集的过程中也有可能呈现问题,同样是调用doRollback办法来对事务进行回滚。

1.2 take事务

  • 1.事务开始时,调用doTake办法,将channel中的event提取到(剪切)takeList中,
  • 2.如果前面的sink是HDFS Sink,同时在写入HDFS的IO缓冲流中放一份event。
  • 3.当takeList中寄存的Event达到约定数量(batchSize) ,就会调用doCommit办法:

    • 胜利执行状况下:

      • 如果是HDFS Sink,那么手动调用IO流的flush办法,将IO流缓冲区的数据写入到HDFS磁盘中,同时清空takeList中的数据
    • 失败状况下:

      • 1.网络提早等起因导致传输数据失败,

      调用doRollback办法来进行回滚,takeList中还有备份数据,所以将takeList中的数据一成不变地还给channel,这时候就实现了事务的回滚。

      • 2.如果takeList数据有一部分传输胜利了,剩下的因为网络提早传输失败了。

      同样会调用doRollback办法来进行回滚,它会把整个takeList中的数据返回给channel,而后持续进行数据的读写。
      如此一来,再次进行事务时候,就会存在数据反复的可能。

2.Flume外部原理

  • 1). Source采集数据

EventBuilder.withBody(body)将数据封装成Event对象,
getChannelProcessor().processEvent(event)将数据交给Channel Processor
通过源码能够看到,以avro source为例

 public Void append(AvroFlumeOGEvent evt) throws AvroRemoteException  {    .....    Event event = EventBuilder.withBody(evt.getBody().array(), headers); // 将数据封装成Event对象,    try {      getChannelProcessor().processEvent(event); // 将数据交给Channel Processor      this.counterGroup.incrementAndGet("rpc.events");    } catch (ChannelException ex) {      return null;    }    this.counterGroup.incrementAndGet("rpc.successful");    return null;  }
  • 2)Channel Processor将Event事件传递给拦截器链interceptorChain.intercept(event),而后将数据返回给Channel Processor。
  • 3)Channel Processor将拦挡过滤之后的Event事件传递给Channel选择器(Channel Selector)),Channel Selector返回给Channel Processor写入event事件的Channel列表

其中Channel Selectors有两种类型:
  - 1.Replicating Channel Selector : 将source过去的events发往所有的channel(相当于复制多份,默认应用的channel selector)
  - 2.Multiplexing Channel Selector:能够指定source发过来的events发往的channel

  • 4)Channel Processor依据Channel选择器的抉择后果,将Event事件写入相应的Channel

看下channel Processor源码
首先结构器中间接定义了selector和拦截器interceptorChain

  public ChannelProcessor(ChannelSelector selector)  {    this.selector = selector;    this.interceptorChain = new InterceptorChain();  } 

而后在processEvent和processEventBatch(List<Event> events)

 public void processEvent(Event event)  {    event = this.interceptorChain.intercept(event); // 提交到拦截器链    if (event == null) {      return;    }    List requiredChannels = this.selector.getRequiredChannels(event); // 提交到channel 选择器    for (Iterator localIterator = requiredChannels.iterator(); localIterator.hasNext(); ) { reqChannel = (Channel)localIterator.next();      Transaction tx = reqChannel.getTransaction();      Preconditions.checkNotNull(tx, "Transaction object must not be null");      try {        tx.begin();        reqChannel.put(event);        tx.commit();      } catch (Throwable t) {        tx.rollback();        if ((t instanceof Error)) {          LOG.error("Error while writing to required channel: " + reqChannel, t);          throw ((Error)t);        }if ((t instanceof ChannelException)) {          throw ((ChannelException)t);        }        throw new ChannelException("Unable to put event on required channel: " + reqChannel, t);      }      finally      {        if (tx != null)          tx.close();      }    }    Channel reqChannel;    List optionalChannels = this.selector.getOptionalChannels(event);     for (Channel optChannel : optionalChannels) {      Transaction tx = null;      try {        tx = optChannel.getTransaction();        tx.begin();        optChannel.put(event); // 将event事件写入channel        tx.commit();      } catch (Throwable t) {        tx.rollback();        LOG.error("Unable to put event on optional channel: " + optChannel, t);        if ((t instanceof Error))          throw ((Error)t);      }      finally {        if (tx != null)          tx.close();      }    }  }}

  • 5)SinkProcessor启动sink,sink在channel中去轮询,取出channel中的event事件。
    SinkProcessor有三种,

    • DefaultSinkProcessor(默认的,外部无任何逻辑,只是单纯的调用sink)、
    • LoadBalancingSinkProcessor(负载平衡)、
    • FaioverSinkProcessor(容灾复原)