前几篇文章剖析了Kafka的发送流程以及NIO的应用形式,然而还是留下了不少坑,这里就对剩下的问题做一个总结。

收到的数据为什么要缓存起来?

Kafka中Selector读取从远端回来的数据的时候会先把收到的数据缓存起来

private void attemptRead(SelectionKey key, KafkaChannel channel) throws IOException {  //if channel is ready and has bytes to read from socket or buffer, and has no  //previous receive(s) already staged or otherwise in progress then read from it  if (channel.ready() && (key.isReadable() || channel.hasBytesBuffered()) && !hasStagedReceive(channel)      && !explicitlyMutedChannels.contains(channel)) {      NetworkReceive networkReceive;      while ((networkReceive = channel.read()) != null) {          madeReadProgressLastPoll = true;          addToStagedReceives(channel, networkReceive);      }  }}

在NetworkClient中,往下传的是一个残缺的ClientRequest,进到Selector,暂存到channel中的,也是一个残缺的Send对象(1个数据包)。但这个Send对象,交由底层的channel.write(Bytebuffer b)的时候,并不一定一次能够齐全发送,可能要调用屡次write,能力把一个Send对象齐全收回去。这是因为write是非阻塞的,不是等到齐全收回去,才会返回。

Send send = channel.write();if (send != null) {    this.completedSends.add(send);    this.sensors.recordBytesSent(channel.id(), send.size());}

这里如果返回send==null就示意没有发送结束,须要等到下一次Selector.poll再次进行发送。所以当下次发送的时候如果Channel外面的Send只发送了局部,那么此次这个node就不会处于ready状态,就不会从RecordAccumulator取出要往这个node发的数据,等到Send对象发送结束之后,这个node才会处于ready状态,就又能够取出数据进行解决了。

同样,在接管的时候,channel.read(Bytebuffer b),一个response也可能要read屡次,能力齐全接管。所以就有了下面的while循环代码。

如何确定音讯接管实现?

从下面晓得,底层数据的通信,是在每一个channel下面,2个源源不断的byte流,一个send流,一个receive流。
send的时候,还好说,发送之前晓得一个残缺的音讯的大小。
然而当咱们接管音讯response的时候,这个信息可能是不残缺的(残余的数据要晚些能力取得),也可能蕴含不止一条音讯。那么咱们是怎么判断音讯发送结束的呢?
对于音讯的读取咱们必须思考音讯结尾是如何示意的,标识音讯结尾通常有以下几种形式:

  1. 固定的音讯大小。
  2. 将音讯的长度作为音讯的前缀。
  3. 用一个非凡的符号来标识音讯的完结。

很显著第一种和第三种形式不是很适合,因而Kafka采纳了第二种形式来确定要发送音讯的大小。在音讯头部放入了4个字节来确定音讯的大小。

//接管音讯,前4个字节示意音讯的大小public class NetworkReceive implements Receive {  private final String source;  //确定音讯size  private final ByteBuffer size;  private final int maxSize;  //整个音讯response的buffer  private ByteBuffer buffer;    public NetworkReceive(String source) {      this.source = source;      //调配4字节的头部      this.size = ByteBuffer.allocate(4);      this.buffer = null;      this.maxSize = UNLIMITED;  }}//音讯发送,前4个字节示意音讯大小public class NetworkSend extends ByteBufferSend {  public NetworkSend(String destination, ByteBuffer buffer) {      super(destination, sizeDelimit(buffer));  }  private static ByteBuffer[] sizeDelimit(ByteBuffer buffer) {      return new ByteBuffer[] {sizeBuffer(buffer.remaining()), buffer};  }  private static ByteBuffer sizeBuffer(int size) {    //4个字节示意音讯大小    ByteBuffer sizeBuffer = ByteBuffer.allocate(4);    sizeBuffer.putInt(size);    sizeBuffer.rewind();    return sizeBuffer;  }}

OP_WRITE何时就绪?

上一篇文章尽管讲了epoll的原理,然而我置信还是有人感觉很怅惘,这里换个简略的说法再说下OP_WRITE事件。
OP_WRITE事件的就绪条件并不是产生在调用channel的write办法之后,也不是产生在调用channel.register(selector,SelectionKey.OP_WRITE)后,而是在当底层缓冲区有闲暇空间的状况下。因为写缓冲区在绝大部分时候都是有闲暇空间的,所以如果你注册了写事件,这会使得写事件始终处于写就绪,抉择解决现场就会始终占用着CPU资源。所以,只有当你的确有数据要写时再注册写操作,并在写完当前马上勾销注册。

max.in.flight.requests.per.connection

这个参数指定了生产者在收到服务器响应之前能够发送多少个音讯,找Kafka Producer中对应有一个类InFlightRequests,示意在天上飞的申请,也就是申请收回去了response还没有回来的申请数,这个参数也是判断节点是否ready的关键因素。只有ready的节点数据能力从Accumulator中取出来进行发送。