关于kafka:Kafka-Producer消息收发设计

30次阅读

共计 2788 个字符,预计需要花费 7 分钟才能阅读完成。

前几篇文章剖析了 Kafka 的发送流程以及 NIO 的应用形式,然而还是留下了不少坑,这里就对剩下的问题做一个总结。

收到的数据为什么要缓存起来?

Kafka 中 Selector 读取从远端回来的数据的时候会先把收到的数据缓存起来

private void attemptRead(SelectionKey key, KafkaChannel channel) throws IOException {
  //if channel is ready and has bytes to read from socket or buffer, and has no
  //previous receive(s) already staged or otherwise in progress then read from it
  if (channel.ready() && (key.isReadable() || channel.hasBytesBuffered()) && !hasStagedReceive(channel)
      && !explicitlyMutedChannels.contains(channel)) {
      NetworkReceive networkReceive;
      while ((networkReceive = channel.read()) != null) {
          madeReadProgressLastPoll = true;
          addToStagedReceives(channel, networkReceive);
      }
  }
}

在 NetworkClient 中,往下传的是一个残缺的 ClientRequest,进到 Selector,暂存到 channel 中的,也是一个残缺的 Send 对象 (1 个数据包)。但这个 Send 对象,交由底层的 channel.write(Bytebuffer b) 的时候,并不一定一次能够齐全发送,可能要调用屡次 write,能力把一个 Send 对象齐全收回去。这是因为 write 是非阻塞的,不是等到齐全收回去,才会返回。

Send send = channel.write();
if (send != null) {this.completedSends.add(send);
    this.sensors.recordBytesSent(channel.id(), send.size());
}

这里如果返回 send==null 就示意没有发送结束,须要等到下一次 Selector.poll 再次进行发送。所以当下次发送的时候如果 Channel 外面的 Send 只发送了局部,那么此次这个 node 就不会处于 ready 状态,就不会从 RecordAccumulator 取出要往这个 node 发的数据, 等到 Send 对象发送结束之后,这个 node 才会处于 ready 状态,就又能够取出数据进行解决了。

同样,在接管的时候,channel.read(Bytebuffer b),一个 response 也可能要 read 屡次,能力齐全接管。所以就有了下面的 while 循环代码。

如何确定音讯接管实现?

从下面晓得,底层数据的通信,是在每一个 channel 下面,2 个源源不断的 byte 流,一个 send 流,一个 receive 流。
send 的时候,还好说,发送之前晓得一个残缺的音讯的大小。
然而当咱们接管音讯 response 的时候,这个信息可能是不残缺的 (残余的数据要晚些能力取得),也可能蕴含不止一条音讯。那么咱们是怎么判断音讯发送结束的呢?
对于音讯的读取咱们必须思考音讯结尾是如何示意的, 标识音讯结尾通常有以下几种形式:

  1. 固定的音讯大小。
  2. 将音讯的长度作为音讯的前缀。
  3. 用一个非凡的符号来标识音讯的完结。

很显著第一种和第三种形式不是很适合,因而 Kafka 采纳了第二种形式来确定要发送音讯的大小。在音讯头部放入了 4 个字节来确定音讯的大小。

// 接管音讯,前 4 个字节示意音讯的大小
public class NetworkReceive implements Receive {
  private final String source;

  // 确定音讯 size
  private final ByteBuffer size;

  private final int maxSize;

  // 整个音讯 response 的 buffer
  private ByteBuffer buffer;  

  public NetworkReceive(String source) {
      this.source = source;

      // 调配 4 字节的头部
      this.size = ByteBuffer.allocate(4);
      this.buffer = null;
      this.maxSize = UNLIMITED;
  }
}

// 音讯发送, 前 4 个字节示意音讯大小
public class NetworkSend extends ByteBufferSend {public NetworkSend(String destination, ByteBuffer buffer) {super(destination, sizeDelimit(buffer));
  }

  private static ByteBuffer[] sizeDelimit(ByteBuffer buffer) {return new ByteBuffer[] {sizeBuffer(buffer.remaining()), buffer};
  }

  private static ByteBuffer sizeBuffer(int size) {
    // 4 个字节示意音讯大小
    ByteBuffer sizeBuffer = ByteBuffer.allocate(4);
    sizeBuffer.putInt(size);
    sizeBuffer.rewind();
    return sizeBuffer;
  }

}

OP_WRITE 何时就绪?

上一篇文章尽管讲了 epoll 的原理,然而我置信还是有人感觉很怅惘,这里换个简略的说法再说下 OP_WRITE 事件。
OP_WRITE 事件的就绪条件并不是产生在调用 channel 的 write 办法之后,也不是产生在调用 channel.register(selector,SelectionKey.OP_WRITE)后, 而是在当底层缓冲区有闲暇空间的状况下。因为写缓冲区在绝大部分时候都是有闲暇空间的,所以如果你注册了写事件,这会使得写事件始终处于写就绪,抉择解决现场就会始终占用着 CPU 资源。所以,只有当你的确有数据要写时再注册写操作,并在写完当前马上勾销注册。

max.in.flight.requests.per.connection

这个参数指定了生产者在收到服务器响应之前能够发送多少个音讯,找 Kafka Producer 中对应有一个类 InFlightRequests, 示意在天上飞的申请, 也就是申请收回去了 response 还没有回来的申请数, 这个参数也是判断节点是否 ready 的关键因素。只有 ready 的节点数据能力从 Accumulator 中取出来进行发送。

正文完
 0