关于rocketmq:RocketMQ源码broker-消息接收流程写入commitLog

95次阅读

共计 25162 个字符,预计需要花费 63 分钟才能阅读完成。

从本文开始,咱们来剖析 rocketMq 音讯接管、散发以及投递流程。

RocketMq 音讯解决整个流程如下:

  1. 音讯接管:音讯接管是指接管 producer 的音讯,解决类是 SendMessageProcessor,将音讯写入到 commigLog 文件后,接管流程处理完毕;
  2. 音讯散发:broker 解决音讯散发的类是 ReputMessageService,它会启动一个线程,一直地将 commitLong 分到到对应的 consumerQueue,这一步操作会写两个文件:consumerQueue 与 indexFile,写入后,音讯散发流程解决 结束;
  3. 音讯投递:音讯投递是指将音讯发往 consumer 的流程,consumer 会发动获取音讯的申请,broker 收到申请后,调用 PullMessageProcessor 类解决,从 consumerQueue 文件获取音讯,返回给 consumer 后,投递流程处理完毕。

以上就是 rocketMq 解决音讯的流程了,接下来咱们就从源码来看相干流程的实现。

1. remotingServer 的启动流程

在正式剖析接管与投递流程前,咱们来理解下 remotingServer 的启动。

remotingServer 是一个 netty 服务,他开启了一个端口用来解决 producer 与 consumer 的网络申请。

remotingServer 是在 BrokerController#start 中启动的,代码如下:

    public void start() throws Exception {
        // 启动各组件
        ...

        if (this.remotingServer != null) {this.remotingServer.start();
        }

        ...
    }

持续查看 remotingServer 的启动流程,进入 NettyRemotingServer#start 办法:

public void start() {
    ...

    ServerBootstrap childHandler =
        this.serverBootstrap.group(this.eventLoopGroupBoss, this.eventLoopGroupSelector)
            ...
            .childHandler(new ChannelInitializer<SocketChannel>() {
                @Override
                public void initChannel(SocketChannel ch) throws Exception {ch.pipeline()
                        .addLast(defaultEventExecutorGroup, 
                            HANDSHAKE_HANDLER_NAME, handshakeHandler)
                        .addLast(defaultEventExecutorGroup,
                            encoder,
                            new NettyDecoder(),
                            new IdleStateHandler(0, 0, 
                                nettyServerConfig.getServerChannelMaxIdleTimeSeconds()),
                            connectionManageHandler,
                            // 解决业务申请的 handler
                            serverHandler
                        );
                }
            });

    ...

}

这就是一个规范的 netty 服务启动流程了,套路与 nameServer 的启动是一样的。对于 netty 的相干内容,这里咱们仅关注 pipeline 上的 channelHandler,在 netty 中,解决读写申请的操作为一个个 ChannelHandler,remotingServer 中解决读写申请的 ChanelHandler 为 NettyServerHandler,代码如下:

 @ChannelHandler.Sharable
class NettyServerHandler extends SimpleChannelInboundHandler<RemotingCommand> {

    @Override
    protected void channelRead0(ChannelHandlerContext ctx, RemotingCommand msg) throws Exception {processMessageReceived(ctx, msg);
    }
}

这块的操作与 nameServer 对外提供的服务极类似(就是同一个类),最终调用的是 NettyRemotingAbstract#processRequestCommand 办法:

 public void processRequestCommand(final ChannelHandlerContext ctx, final RemotingCommand cmd) {
     // 依据 code 从 processorTable 获取 Pair
    final Pair<NettyRequestProcessor, ExecutorService> matched 
        = this.processorTable.get(cmd.getCode());
    // 找不到默认值    
    final Pair<NettyRequestProcessor, ExecutorService> pair =  
        null == matched ? this.defaultRequestProcessor : matched;

    ...

    // 从 pair 中拿到 Processor 进行解决
    NettyRequestProcessor processor = pair.getObject1();
    // 解决申请
    RemotingCommand response = processor.processRequest(ctx, cmd);

    ....
 }

如果进入源码去看,会发现这个办法十分长,这里省略了异步解决、异样解决及返回值结构等,仅列出了关键步骤:

  1. 依据 code 从 processorTable 拿到对应的 Pair
  2. 从 Pair 里获取 Processor

最终解决申请的就是 Processor 了。

2. Processor 的注册

从下面的剖析中可知,Processor 是解决音讯的要害,它是从 processorTable 中获取的,这个 processorTable 是啥呢?

processorTable 是 NettyRemotingAbstract 成员变量,外面的内容是 BrokerController 在初始化时(执行 BrokerController#initialize 办法)注册的。之前在剖析 BrokerController 的初始化流程时,就提到过 Processor 的提供操作,这里再回顾下:

BrokerController 的初始化办法 initialize 会调用 BrokerController#registerProcessor,Processor 的注册操作就在这个办法里:

public class BrokerController {

    private final PullMessageProcessor pullMessageProcessor;

    /**
     * 构造方法
     */
    public BrokerController(...) {
        // 解决 consumer 拉音讯申请的
        this.pullMessageProcessor = new PullMessageProcessor(this);
    }

    /**
     * 注册操作
     */
    public void registerProcessor() {
        // SendMessageProcessor
        SendMessageProcessor sendProcessor = new SendMessageProcessor(this);
        sendProcessor.registerSendMessageHook(sendMessageHookList);
        sendProcessor.registerConsumeMessageHook(consumeMessageHookList);
        // 解决 Processor
        this.remotingServer.registerProcessor(RequestCode.SEND_MESSAGE, 
            sendProcessor, this.sendMessageExecutor);
        this.remotingServer.registerProcessor(RequestCode.SEND_MESSAGE_V2, 
            sendProcessor, this.sendMessageExecutor);
        this.remotingServer.registerProcessor(RequestCode.SEND_BATCH_MESSAGE, 
            sendProcessor, this.sendMessageExecutor);

        // PullMessageProcessor
        this.remotingServer.registerProcessor(RequestCode.PULL_MESSAGE, 
            this.pullMessageProcessor, this.pullMessageExecutor);

        // 省略其余许许多多的 Processor 注册    
        ...

    }

    ...

须要指明的是,sendProcessor 用来解决 producer 申请过去的音讯,pullMessageProcessor 用来解决 consumer 拉取音讯的申请。

3. 接管 producer 音讯

理解完 remotingServer 的启动与 Processor 的注册内容后,接下来咱们就能够剖析接管 producer 音讯的流程了。

producer 发送音讯到 broker 时,发送的申请 code 为 SEND_MESSAGE(RocketMQ 源码 5 -producer 同步发送和单向发送 第 1.4 大节),依据下面的剖析,当音讯过去时,会应用 NettyServerHandler 这个 ChannelHandler 来解决,之后会调用到 NettyRemotingAbstract#processRequestCommand 办法。

在 NettyRemotingAbstract#processRequestCommand 办法中,会依据音讯的 code 获取对应的 Processor 来解决,从 Processor 的注册流程来看,解决该 SEND_MESSAGE 的 Processor 为 SendMessageProcessor,咱们进入 SendMessageProcessor#processRequest 看看它的流程:

public RemotingCommand processRequest(ChannelHandlerContext ctx,
        RemotingCommand request) throws RemotingCommandException {
    RemotingCommand response = null;
    try {
        // broker 解决接管音讯
        response = asyncProcessRequest(ctx, request).get();} catch (InterruptedException | ExecutionException e) {log.error("process SendMessage error, request :" + request.toString(), e);
    }
    return response;
}

没干啥事,一路跟上来,间接看一般音讯的流程,进入 SendMessageProcessor#asyncSendMessage 办法:

private CompletableFuture<RemotingCommand> asyncSendMessage(ChannelHandlerContext ctx, 
        RemotingCommand request, SendMessageContext mqtraceContext, 
        SendMessageRequestHeader requestHeader) {final RemotingCommand response = preSend(ctx, request, requestHeader);
    final SendMessageResponseHeader responseHeader 
        = (SendMessageResponseHeader)response.readCustomHeader();

    if (response.getCode() != -1) {return CompletableFuture.completedFuture(response);
    }

    final byte[] body = request.getBody();

    int queueIdInt = requestHeader.getQueueId();
    TopicConfig topicConfig = this.brokerController.getTopicConfigManager()
        .selectTopicConfig(requestHeader.getTopic());

    // 如果没指定队列,就随机指定一个队列
    if (queueIdInt < 0) {queueIdInt = randomQueueId(topicConfig.getWriteQueueNums());
    }

    // 将音讯包装为 MessageExtBrokerInner
    MessageExtBrokerInner msgInner = new MessageExtBrokerInner();
    msgInner.setTopic(requestHeader.getTopic());
    msgInner.setQueueId(queueIdInt);

    // 省略解决 msgInner 的流程
    ...

    CompletableFuture<PutMessageResult> putMessageResult = null;
    Map<String, String> origProps = MessageDecoder
        .string2messageProperties(requestHeader.getProperties());
    String transFlag = origProps.get(MessageConst.PROPERTY_TRANSACTION_PREPARED);
    // 发送事务音讯
    if (transFlag != null && Boolean.parseBoolean(transFlag)) {
        ...
        // 发送事务音讯
        putMessageResult = this.brokerController.getTransactionalMessageService()
            .asyncPrepareMessage(msgInner);
    } else {
        // 发送一般音讯
        putMessageResult = this.brokerController.getMessageStore().asyncPutMessage(msgInner);
    }
    return handlePutMessageResultFuture(putMessageResult, response, request, msgInner, 
        responseHeader, mqtraceContext, ctx, queueIdInt);
}

这个办法是在筹备音讯的发送数据,所做的工作如下:

  1. 如果没指定队列,就随机指定一个队列,个别状况下不会给音讯指定队列的,但如果要发送程序音讯,就须要指定队列了,这点前面再剖析。
  2. 结构 MessageExtBrokerInner 对象,就是将 producer 上送的音讯包装下,加上一些额定的信息,如音讯标识 msgId、发送工夫、topic、queue 等。
  3. 发送音讯,这里只是分为两类:事务音讯与一般音讯,这里咱们次要关注一般音讯,事务音讯前面再剖析。

进入一般音讯的发送办法 DefaultMessageStore#asyncPutMessage:

public CompletableFuture<PutMessageResult> asyncPutMessage(MessageExtBrokerInner msg) {
    ...
    // 保留到 commitLog
    CompletableFuture<PutMessageResult> putResultFuture = this.commitLog.asyncPutMessage(msg);
    ...
}

3.1 commitLog 写入原理

一个 broker 逻辑上对应着一个 commitLog,你能够把它看作一个大文件,而后这个 broker 收到的所有音讯都写到这个外面,然而物理上 ROCKET_HOME/commitlog/00000000000000000000 这个门路存储的,它是由若干个文件组成的,每个文件默认大小是 1G,而后每个文件都对应这个一个 MappedFile,00000000000000000000 就是第一个 MappedFile 对应的物理文件,每个文件的文件名就是在 commitLog 外面的一个其实 offset,第二个文件名就是 00000000001073741824,也就是上一个 MappedFile 文件起始 offset 加上每个文件的大小,这个 MappedFile 就是 RocketMQ 的黑科技,应用了内存映射技术来进步文件的访问速度与写入速度,而后都是采纳追加写的形式进步写入速度。

咱们间接看官网的形容(链接:github.com/apache/rock…):

rocketMq 音讯存储架构图

音讯存储架构图中次要有上面三个跟音讯存储相干的文件形成。

(1) CommitLog:音讯主体以及元数据的存储主体,存储 Producer 端写入的音讯主体内容, 音讯内容不是定长的。单个文件大小默认 1G,文件名长度为 20 位,右边补零,残余为起始偏移量,比方 00000000000000000000 代表了第一个文件,起始偏移量为 0,文件大小为 1G=1073741824;当第一个文件写满了,第二个文件为 00000000001073741824,起始偏移量为 1073741824,以此类推。音讯次要是程序写入日志文件,当文件满了,写入下一个文件;

(2) ConsumeQueue:音讯生产队列,引入的目标次要是进步音讯生产的性能,因为 RocketMQ 是基于主题 topic 的订阅模式,音讯生产是针对主题进行的,如果要遍历 commitlog 文件中依据 topic 检索音讯是十分低效的。Consumer 即可依据 ConsumeQueue 来查找待生产的音讯。其中,ConsumeQueue(逻辑生产队列)作为生产音讯的索引,保留了指定 Topic 下的队列音讯在 CommitLog 中的起始物理偏移量 offset,音讯大小 size 和音讯 Tag 的 HashCode 值。consumequeue 文件能够看成是基于 topic 的 commitlog 索引文件,故 consumequeue 文件夹的组织形式如下:topic/queue/file 三层组织构造,具体存储门路为:$HOME/store/consumequeue/{topic}/{queueId}/{fileName}。同样 consumequeue 文件采取定长设计,每一个条目共 20 个字节,别离为 8 字节的 commitlog 物理偏移量、4 字节的音讯长度、8 字节 tag hashcode,单个文件由 30W 个条目组成,能够像数组一样随机拜访每一个条目,每个 ConsumeQueue 文件大小约 5.72M;

(3) IndexFile:IndexFile(索引文件)提供了一种能够通过 key 或工夫区间来查问音讯的办法。Index 文件的存储地位是:HOME\store\index{fileName},文件名 fileName 是以创立时的工夫戳命名的,固定的单个 IndexFile 文件大小约为 400M,一个 IndexFile 能够保留 2000W 个索引,IndexFile 的底层存储设计为在文件系统中实现 HashMap 构造,故 rocketmq 的索引文件其底层实现为 hash 索引。

在下面的 RocketMQ 的音讯存储整体架构图中能够看出,RocketMQ 采纳的是混合型的存储构造,即为 Broker 单个实例下所有的队列共用一个日志数据文件(即为 CommitLog)来存储。RocketMQ 的混合型存储构造 (多个 Topic 的音讯实体内容都存储于一个 CommitLog 中) 针对 Producer 和 Consumer 别离采纳了数据和索引局部相拆散的存储构造,Producer 发送音讯至 Broker 端,而后 Broker 端应用同步或者异步的形式对音讯刷盘长久化,保留至 CommitLog 中。只有音讯被刷盘长久化至磁盘文件 CommitLog 中,那么 Producer 发送的音讯就不会失落。

正因为如此,Consumer 也就必定有机会去生产这条音讯。当无奈拉取到音讯后,能够等下一次音讯拉取,同时服务端也反对长轮询模式,如果一个音讯拉取申请未拉取到音讯,Broker 容许期待 30s 的工夫,只有这段时间内有新音讯达到,将间接返回给生产端。这里,RocketMQ 的具体做法是,应用 Broker 端的后盾服务线程—ReputMessageService 不停地散发申请并异步构建 ConsumeQueue(逻辑生产队列)和 IndexFile(索引文件)数据。

3.2 CommitLog#asyncPutMessage

持续进入 CommitLog#asyncPutMessage 办法,这个办法有点长,咱们分局部:

public CompletableFuture<PutMessageResult> asyncPutMessage(final MessageExtBrokerInner msg) {
    // Set the storage time 设置存储工夫
    msg.setStoreTimestamp(System.currentTimeMillis());
    // Set the message body BODY CRC (consider the most appropriate setting
    // on the client)
    // 设置 crc
    msg.setBodyCRC(UtilAll.crc32(msg.getBody()));
    // Back to Results
    AppendMessageResult result = null;

    StoreStatsService storeStatsService = this.defaultMessageStore.getStoreStatsService();

    String topic = msg.getTopic();
    int queueId = msg.getQueueId();

    // 获取事务状态
    final int tranType = MessageSysFlag.getTransactionValue(msg.getSysFlag());
    // 事务
    if (tranType == MessageSysFlag.TRANSACTION_NOT_TYPE
            || tranType == MessageSysFlag.TRANSACTION_COMMIT_TYPE) {
        // Delay Delivery 延时音讯的解决
        if (msg.getDelayTimeLevel() > 0) {if (msg.getDelayTimeLevel() > this.defaultMessageStore.getScheduleMessageService().getMaxDelayLevel()) {msg.setDelayTimeLevel(this.defaultMessageStore.getScheduleMessageService().getMaxDelayLevel());
            }

            // 设置提早队列
            topic = TopicValidator.RMQ_SYS_SCHEDULE_TOPIC;
            queueId = ScheduleMessageService.delayLevel2QueueId(msg.getDelayTimeLevel());

            // Backup real topic, queueId
            MessageAccessor.putProperty(msg, MessageConst.PROPERTY_REAL_TOPIC, msg.getTopic());
            MessageAccessor.putProperty(msg, MessageConst.PROPERTY_REAL_QUEUE_ID, String.valueOf(msg.getQueueId()));
            msg.setPropertiesString(MessageDecoder.messageProperties2String(msg.getProperties()));

            msg.setTopic(topic);
            msg.setQueueId(queueId);
        }
    }
    ...

这一部分其实就是从 msg 中获取一些信息,判断解决一下这个延时音讯。

long elapsedTimeInLock = 0;
MappedFile unlockMappedFile = null;
// 获取最初一个 MappedFile
MappedFile mappedFile = this.mappedFileQueue.getLastMappedFile();

// 获取写入锁
putMessageLock.lock(); //spin or ReentrantLock ,depending on store config
try {long beginLockTimestamp = this.defaultMessageStore.getSystemClock().now();
    // 开始在锁里的工夫
    this.beginTimeInLock = beginLockTimestamp;

    // Here settings are stored timestamp, in order to ensure an orderly
    // global
    // 设置写入的工夫戳,确保它是有序的,msg.setStoreTimestamp(beginLockTimestamp);

    // 判断 MappedFile 是否是 null 或者是否是满了
    if (null == mappedFile || mappedFile.isFull()) {mappedFile = this.mappedFileQueue.getLastMappedFile(0); // Mark: NewFile may be cause noise
    }
    if (null == mappedFile) {log.error("create mapped file1 error, topic:" + msg.getTopic() + "clientAddr:" + msg.getBornHostString());
        beginTimeInLock = 0;
        return CompletableFuture.completedFuture(new PutMessageResult(PutMessageStatus.CREATE_MAPEDFILE_FAILED, null));
    }

    // todo 往 mappedFile 追加音讯
    result = mappedFile.appendMessage(msg, this.appendMessageCallback);

    ...

这一部分就比拟重要了,首先是从 mappedFileQueue 中获取最初一个 MappedFile,这个就是拿汇合最初一个元素,因为都是有序的,最初一个元素就是最初一个 MappedFile,接着就是获取锁了,这个锁也是比拟有考究的,能够设置应用 ReentrantLock 也能够设置应用 cas,默认是应用 cas,接着就是设置 beginTimeInLock 这个变量了,这个变量咱们在判断 os page cache 忙碌的时候说过,就是获取到锁的一个工夫戳,在开释锁之前会重置成 0,接着就是判断 mappedFile 是不是 null 或者是不是满了,如果是的话就要新建一个了。

接着就是最最最重要的了 往 mappedFile 中追加音讯,

mappedFile.appendMessage

/**
 * 将音讯追加到 MappedFile 文件中
 */
public AppendMessageResult appendMessagesInner(final MessageExt messageExt, final AppendMessageCallback cb) {
    assert messageExt != null;
    assert cb != null;

    // 获取 MappedFile 以后文件写指针
    int currentPos = this.wrotePosition.get();

    // 如果 currentPos 小于文件大小
    if (currentPos < this.fileSize) {ByteBuffer byteBuffer = writeBuffer != null ? writeBuffer.slice() : this.mappedByteBuffer.slice();
        byteBuffer.position(currentPos);
        AppendMessageResult result;
        // 单个音讯
        if (messageExt instanceof MessageExtBrokerInner) {
            // todo
            result = cb.doAppend(this.getFileFromOffset(), byteBuffer, this.fileSize - currentPos, (MessageExtBrokerInner) messageExt);
        // 批量音讯
        } else if (messageExt instanceof MessageExtBatch) {result = cb.doAppend(this.getFileFromOffset(), byteBuffer, this.fileSize - currentPos, (MessageExtBatch) messageExt);
        } else {return new AppendMessageResult(AppendMessageStatus.UNKNOWN_ERROR);
        }
        this.wrotePosition.addAndGet(result.getWroteBytes());
        this.storeTimestamp = result.getStoreTimestamp();
        return result;
    }
    // 如果 currentPos 大于或等于文件大小,表明文件已写满,抛出异样
    log.error("MappedFile.appendMessage return null, wrotePosition: {} fileSize: {}", currentPos, this.fileSize);
    return new AppendMessageResult(AppendMessageStatus.UNKNOWN_ERROR);
}

这里首先获取了一下这个 mappedFile 写到哪个地位了,它这个地位是从 0 开始的,而后判断一下以后地位与文件大小做比照,要是大于的就超了文件大小了,接着是获取 writerbuffer 因为这里是没有开启 transientStorePool 的,所以它是个空的,就会应用 mmapedByteBuffer,接着就是调用回调的 doAppend 追加音讯了,咱们看下它的参数,第一个是开始 offset,这个 offset 是 commitlog 的一个 offset,举个例子,第一个 MappedFile 的开始 offset 是 0,而后一个 MappedFile 的大小是 1g,而后第二个 MappedFile 就得从 1073741824(1g)开始了,第二个参数是 bytebuffer,这个不必多说,第三个是这个 MappedFile 还空多少字节没用,第四个就是音讯了。

咱们来看下这个 doAppend 办法,这个也有点长,咱们须要离开看下:

/**
 * // 只是将音讯追加到内存中
 * @param fileFromOffset 文件的第一个偏移量(就是 MappedFile 是从哪个中央开始的)*/
public AppendMessageResult doAppend(final long fileFromOffset, final ByteBuffer byteBuffer, final int maxBlank,
    final MessageExtBrokerInner msgInner) {
    // STORETIMESTAMP + STOREHOSTADDRESS + OFFSET <br>

    // PHY OFFSET
    long wroteOffset = fileFromOffset + byteBuffer.position();

    int sysflag = msgInner.getSysFlag();

    int bornHostLength = (sysflag & MessageSysFlag.BORNHOST_V6_FLAG) == 0 ? 4 + 4 : 16 + 4;
    int storeHostLength = (sysflag & MessageSysFlag.STOREHOSTADDRESS_V6_FLAG) == 0 ? 4 + 4 : 16 + 4;
    ByteBuffer bornHostHolder = ByteBuffer.allocate(bornHostLength);
    ByteBuffer storeHostHolder = ByteBuffer.allocate(storeHostLength);

    this.resetByteBuffer(storeHostHolder, storeHostLength);
    // 创立全局惟一音讯 id
    String msgId;
    if ((sysflag & MessageSysFlag.STOREHOSTADDRESS_V6_FLAG) == 0) {msgId = MessageDecoder.createMessageId(this.msgIdMemory, msgInner.getStoreHostBytes(storeHostHolder), wroteOffset);
    } else {msgId = MessageDecoder.createMessageId(this.msgIdV6Memory, msgInner.getStoreHostBytes(storeHostHolder), wroteOffset);
    }

    // Record ConsumeQueue information
    keyBuilder.setLength(0);
    keyBuilder.append(msgInner.getTopic());
    keyBuilder.append('-');
    keyBuilder.append(msgInner.getQueueId());
    String key = keyBuilder.toString();
    // 获取该音讯在音讯队列的物理偏移量
    Long queueOffset = CommitLog.this.topicQueueTable.get(key);
    if (null == queueOffset) {
        queueOffset = 0L;
        CommitLog.this.topicQueueTable.put(key, queueOffset);
    }

    // Transaction messages that require special handling
    final int tranType = MessageSysFlag.getTransactionValue(msgInner.getSysFlag());
    switch (tranType) {
        // Prepared and Rollback message is not consumed, will not enter the
        // consumer queue
        case MessageSysFlag.TRANSACTION_PREPARED_TYPE:
        case MessageSysFlag.TRANSACTION_ROLLBACK_TYPE:
            queueOffset = 0L;
            break;
        case MessageSysFlag.TRANSACTION_NOT_TYPE:
        case MessageSysFlag.TRANSACTION_COMMIT_TYPE:
        default:
            break;
    }

这一部分次要就是 计算了一下这个音讯写在 commitlog 中的一个 offset,接着就是生成一个 msgId,而后依据 topic 与 queueId 从缓存中获取了一下这个 queueId 对应的一个 queue 的 offset,这个其实就是增加一个音讯加 1,而后就是事务的货色了,如果有事务,而后还在筹备阶段或者回滚阶段,就将 queue offset 设置成 0,再往下其实就是解决音讯,而后写到 buffer 中了。

/**
 * Serialize message
 */
final byte[] propertiesData =
    msgInner.getPropertiesString() == null ? null : msgInner.getPropertiesString().getBytes(MessageDecoder.CHARSET_UTF8);

final int propertiesLength = propertiesData == null ? 0 : propertiesData.length;

if (propertiesLength > Short.MAX_VALUE) {log.warn("putMessage message properties length too long. length={}", propertiesData.length);
    return new AppendMessageResult(AppendMessageStatus.PROPERTIES_SIZE_EXCEEDED);
}

final byte[] topicData = msgInner.getTopic().getBytes(MessageDecoder.CHARSET_UTF8);
final int topicLength = topicData.length;

final int bodyLength = msgInner.getBody() == null ? 0 : msgInner.getBody().length;

// todo 计算音讯总长度
final int msgLen = calMsgLength(msgInner.getSysFlag(), bodyLength, topicLength, propertiesLength);

// Exceeds the maximum message  
if (msgLen > this.maxMessageSize) {  // 最大音讯长度 65536
    CommitLog.log.warn("message size exceeded, msg total size:" + msgLen + ", msg body size:" + bodyLength
        + ", maxMessageSize:" + this.maxMessageSize);
    return new AppendMessageResult(AppendMessageStatus.MESSAGE_SIZE_EXCEEDED);
}
...

这里首先是获取了一下音讯外面的 properties,将它转成字节数组,计算了一下长度,接着就是将 topic 转成字节数据,计算了一下长度,获取了一下 body 的长度,就是你往 Message 塞得内容长度,重点来,计算 音讯的总长度,而后判断一下长度是否超长。其中 calMsgLength 如下:

    // 计算音讯长度
    protected static int calMsgLength(int sysFlag, int bodyLength, int topicLength, int propertiesLength) {int bornhostLength = (sysFlag & MessageSysFlag.BORNHOST_V6_FLAG) == 0 ? 8 : 20;
        int storehostAddressLength = (sysFlag & MessageSysFlag.STOREHOSTADDRESS_V6_FLAG) == 0 ? 8 : 20;
        final int msgLen = 4 //TOTALSIZE 音讯条目总长度,4 字节
            + 4 //MAGICCODE 魔数,4 字节。固定值 0xdaa320a7
            + 4 //BODYCRC 音讯体的 crc 校验码,4 字节
            + 4 //QUEUEID 音讯生产队列 ID,4 字节
            + 4 //FLAG 音讯标记,RocketMQ 对其不做解决,供应用程序应用,默认 4 字节
            + 8 //QUEUEOFFSET 音讯在 ConsumeQuene 文件中的物理偏移量,8 字节。+ 8 //PHYSICALOFFSET 音讯在 CommitLog 文件中的物理偏移量,8 字节
            + 4 //SYSFLAG 音讯零碎标记,例如是否压缩、是否是事务音讯 等,4 字节
            + 8 //BORNTIMESTAMP 音讯生产者调用音讯发送 API 的工夫戳,8 字 节
            + bornhostLength //BORNHOST 音讯发送者 IP、端口号,8 字节
            + 8 //STORETIMESTAMP 音讯存储工夫戳,8 字节
            + storehostAddressLength //STOREHOSTADDRESS  Broker 服务器 IP+ 端口号,8 字节
            + 4 //RECONSUMETIMES 音讯重试次数,4 字节
            + 8 //Prepared Transaction Offset 事务音讯的物理偏移量,8 字节。+ 4  // 音讯体长度,4 字节
            + (bodyLength > 0 ? bodyLength : 0) // BODY  音讯体内容,长度为 bodyLenth 中存储的值
            + 1 // 主题存储长度,1 字节,示意主题名称不能超过 255 个字符。+ topicLength //TOPIC 主题,长度为 TopicLength 中存储的值
            + 2  // 音讯属性长度,2 字节,示意音讯属性长 度不能超过 65536 个字符。+ (propertiesLength > 0 ? propertiesLength : 0) //propertiesLength  音讯属性,长度为 PropertiesLength 中存储的 值
            + 0;
        return msgLen;
    }

持续:

...
// todo 音讯长度 +END_FILE_MIN_BLANK_LENGTH 大于 commitLog 的闲暇空间,则返回 END_OF_FILE
if ((msgLen + END_FILE_MIN_BLANK_LENGTH) > maxBlank) {this.resetByteBuffer(this.msgStoreItemMemory, maxBlank);
    // 1 TOTALSIZE  4 字节存储以后文件的残余空间
    this.msgStoreItemMemory.putInt(maxBlank);
    // 2 MAGICCODE 4 字节存储魔数
    this.msgStoreItemMemory.putInt(CommitLog.BLANK_MAGIC_CODE);
    // 3 The remaining space may be any value
    // Here the length of the specially set maxBlank
    final long beginTimeMills = CommitLog.this.defaultMessageStore.now();
    byteBuffer.put(this.msgStoreItemMemory.array(), 0, maxBlank);
    return new AppendMessageResult(AppendMessageStatus.END_OF_FILE, wroteOffset, maxBlank, msgId, msgInner.getStoreTimestamp(),
        queueOffset, CommitLog.this.defaultMessageStore.now() - beginTimeMills);
}
...

判断剩下的空间能不能放开,如果放不开的话,就塞上一个完结的货色,8 个字节是正经的,剩下的随便,而后返回文件满了的状态。

...
// Initialization of storage space
// 初始化存储空间
this.resetByteBuffer(msgStoreItemMemory, msgLen);
// 1 TOTALSIZE
this.msgStoreItemMemory.putInt(msgLen);
// 2 MAGICCODE
this.msgStoreItemMemory.putInt(CommitLog.MESSAGE_MAGIC_CODE);
// 3 BODYCRC
this.msgStoreItemMemory.putInt(msgInner.getBodyCRC());
// 4 QUEUEID
this.msgStoreItemMemory.putInt(msgInner.getQueueId());
// 5 FLAG
this.msgStoreItemMemory.putInt(msgInner.getFlag());
// 6 QUEUEOFFSET
this.msgStoreItemMemory.putLong(queueOffset);
// 7 PHYSICALOFFSET
this.msgStoreItemMemory.putLong(fileFromOffset + byteBuffer.position());
// 8 SYSFLAG
this.msgStoreItemMemory.putInt(msgInner.getSysFlag());
// 9 BORNTIMESTAMP
this.msgStoreItemMemory.putLong(msgInner.getBornTimestamp());
// 10 BORNHOST
this.resetByteBuffer(bornHostHolder, bornHostLength);
this.msgStoreItemMemory.put(msgInner.getBornHostBytes(bornHostHolder));
// 11 STORETIMESTAMP
this.msgStoreItemMemory.putLong(msgInner.getStoreTimestamp());
// 12 STOREHOSTADDRESS
this.resetByteBuffer(storeHostHolder, storeHostLength);
this.msgStoreItemMemory.put(msgInner.getStoreHostBytes(storeHostHolder));
// 13 RECONSUMETIMES
this.msgStoreItemMemory.putInt(msgInner.getReconsumeTimes());
// 14 Prepared Transaction Offset
this.msgStoreItemMemory.putLong(msgInner.getPreparedTransactionOffset());
// 15 BODY
this.msgStoreItemMemory.putInt(bodyLength);
if (bodyLength > 0)
    this.msgStoreItemMemory.put(msgInner.getBody());
// 16 TOPIC
this.msgStoreItemMemory.put((byte) topicLength);
this.msgStoreItemMemory.put(topicData);
// 17 PROPERTIES
this.msgStoreItemMemory.putShort((short) propertiesLength);
if (propertiesLength > 0)
    this.msgStoreItemMemory.put(propertiesData);

final long beginTimeMills = CommitLog.this.defaultMessageStore.now();
// Write messages to the queue buffer
byteBuffer.put(this.msgStoreItemMemory.array(), 0, msgLen);
...

这个就是封装音讯了,最初将音讯放到 byteBuffer 中。

...
// 创立 AppendMessageResult
    AppendMessageResult result = new AppendMessageResult(AppendMessageStatus.PUT_OK, wroteOffset, msgLen, msgId,
        msgInner.getStoreTimestamp(), queueOffset, CommitLog.this.defaultMessageStore.now() - beginTimeMills);

    switch (tranType) {
        case MessageSysFlag.TRANSACTION_PREPARED_TYPE:
        case MessageSysFlag.TRANSACTION_ROLLBACK_TYPE:
            break;
        case MessageSysFlag.TRANSACTION_NOT_TYPE:
        case MessageSysFlag.TRANSACTION_COMMIT_TYPE:
            // The next update ConsumeQueue information
            // 更新音讯队列的逻辑偏移量
            CommitLog.this.topicQueueTable.put(key, ++queueOffset);
            break;
        default:
            break;
    }
    return result;
}

最初就是封装追加音讯的后果是 put_ok,而后更新 queue offset,其实就是 +1。

接下来咱们回过头来看下 appendMessagesInner 的后半局部,

...
this.wrotePosition.addAndGet(result.getWroteBytes());
this.storeTimestamp = result.getStoreTimestamp();
return result;

这里其实就是更新了一下 这个 MappedFile 写到哪个中央了,更新了下写入工夫。

回到 commitLog 的 putMessage 办法:

...
    // todo 往 mappedFile 追加音讯
    result = mappedFile.appendMessage(msg, this.appendMessageCallback);
    switch (result.getStatus()) {
        case PUT_OK:
            break;
        case END_OF_FILE:
            unlockMappedFile = mappedFile;
            // Create a new file, re-write the message
            mappedFile = this.mappedFileQueue.getLastMappedFile(0);
            if (null == mappedFile) {
                // XXX: warn and notify me
                log.error("create mapped file2 error, topic:" + msg.getTopic() + "clientAddr:" + msg.getBornHostString());
                beginTimeInLock = 0;
                return CompletableFuture.completedFuture(new PutMessageResult(PutMessageStatus.CREATE_MAPEDFILE_FAILED, result));
            }
            result = mappedFile.appendMessage(msg, this.appendMessageCallback);
            break;
        case MESSAGE_SIZE_EXCEEDED:
        case PROPERTIES_SIZE_EXCEEDED:
            beginTimeInLock = 0;
            return CompletableFuture.completedFuture(new PutMessageResult(PutMessageStatus.MESSAGE_ILLEGAL, result));
        case UNKNOWN_ERROR:
            beginTimeInLock = 0;
            return CompletableFuture.completedFuture(new PutMessageResult(PutMessageStatus.UNKNOWN_ERROR, result));
        default:
            beginTimeInLock = 0;
            return CompletableFuture.completedFuture(new PutMessageResult(PutMessageStatus.UNKNOWN_ERROR, result));
    }

    elapsedTimeInLock = this.defaultMessageStore.getSystemClock().now() - beginLockTimestamp;
    beginTimeInLock = 0;
} finally {putMessageLock.unlock();
}
...

这里追加实现了,就须要判断追加状态了,如果是那种 MappedFile 放不开音讯的状况,它会从新获取一个 MappedFile,而后从新追加,在开释锁之前,它还会将 beginTimeInLock 这个字段重置为 0;

...
if (elapsedTimeInLock > 500) {log.warn("[NOTIFYME]putMessage in lock cost time(ms)={}, bodyLength={} AppendMessageResult={}", elapsedTimeInLock, msg.getBody().length, result);
    }

    if (null != unlockMappedFile && this.defaultMessageStore.getMessageStoreConfig().isWarmMapedFileEnable()) {this.defaultMessageStore.unlockMappedFile(unlockMappedFile);
    }

    PutMessageResult putMessageResult = new PutMessageResult(PutMessageStatus.PUT_OK, result);

    // Statistics
    storeStatsService.getSinglePutMessageTopicTimesTotal(msg.getTopic()).incrementAndGet();
    storeStatsService.getSinglePutMessageTopicSizeTotal(topic).addAndGet(result.getWroteBytes());

    // todo 音讯首先进入 pagecache,而后执行刷盘操作,CompletableFuture<PutMessageStatus> flushResultFuture = submitFlushRequest(result, msg);
    // todo 接着调用 submitReplicaRequest 办法将音讯提交到 HaService, 进行数据复制
    CompletableFuture<PutMessageStatus> replicaResultFuture = submitReplicaRequest(result, msg);

    // todo 这里应用了 ComplateFuture 的 thenCombine 办法,将刷盘、复制当成一
    // todo 个联结工作执行,这里设置音讯追加的最终状态
    return flushResultFuture.thenCombine(replicaResultFuture, (flushStatus, replicaStatus) -> {if (flushStatus != PutMessageStatus.PUT_OK) {putMessageResult.setPutMessageStatus(flushStatus);
        }
        if (replicaStatus != PutMessageStatus.PUT_OK) {putMessageResult.setPutMessageStatus(replicaStatus);
            if (replicaStatus == PutMessageStatus.FLUSH_SLAVE_TIMEOUT) {log.error("do sync transfer other node, wait return, but failed, topic: {} tags: {} client address: {}",
                        msg.getTopic(), msg.getTags(), msg.getBornHostNameString());
            }
        }
        return putMessageResult;
    });
}

判断了一下耗时,如果是大于 500ms 的话,打印正告,封装 put 音讯的后果,统计 store,能够看到前面调用了 2 个办法,一个是刷盘的,一个是同步音讯的,咱们这里要看下这个刷盘动作:

public CompletableFuture<PutMessageStatus> submitFlushRequest(AppendMessageResult result, MessageExt messageExt) {
    // Synchronization flush 同步刷盘
    if (FlushDiskType.SYNC_FLUSH == this.defaultMessageStore.getMessageStoreConfig().getFlushDiskType()) {final GroupCommitService service = (GroupCommitService) this.flushCommitLogService;
        if (messageExt.isWaitStoreMsgOK()) {
            // 构建 GroupCommitRequest 同步工作并提交到 GroupCommitRequest
            GroupCommitRequest request = new GroupCommitRequest(result.getWroteOffset() + result.getWroteBytes(),
                    this.defaultMessageStore.getMessageStoreConfig().getSyncFlushTimeout());
            // 刷盘申请
            service.putRequest(request);
            return request.future();} else {service.wakeup();
            return CompletableFuture.completedFuture(PutMessageStatus.PUT_OK);
        }
    }
    // Asynchronous flush  异步刷盘 这个就是靠 os
    else {if (!this.defaultMessageStore.getMessageStoreConfig().isTransientStorePoolEnable()) {flushCommitLogService.wakeup();
        } else  {commitLogService.wakeup();
        }
        return CompletableFuture.completedFuture(PutMessageStatus.PUT_OK);
    }
}

如果 broker 配置的 SYNC_FLUSH 并且是个同步音讯,这个时候就会创立一个刷盘申请,而后提交刷盘申请,这个时候会等着刷盘实现,默认就是 5s。

接着就是到存储器的 putMessage 办法的后半局部了:

...
// todo 存储音讯
CompletableFuture<PutMessageResult> putResultFuture = this.commitLog.asyncPutMessage(msg);

putResultFuture.thenAccept((result) -> {long elapsedTime = this.getSystemClock().now() - beginTime;
    if (elapsedTime > 500) {log.warn("putMessage not in lock elapsed time(ms)={}, bodyLength={}", elapsedTime, msg.getBody().length);
    }
    // 记录状态
    this.storeStatsService.setPutMessageEntireTimeMax(elapsedTime);

    if (null == result || !result.isOk()) {
        // 记录状态
        this.storeStatsService.getPutMessageFailedTimes().incrementAndGet();
    }
});

return putResultFuture;

commitlog 存入音讯之后,咱们这块也就算是实现了,最初就是回到那个 processor,而后将 put 后果写入对应的 channel 给返回去,通知音讯生产者音讯写入后果。音讯存储其实就是找对应的 MappedFile,依照肯定的格局往文件外面写入,须要留神的是内存映射文件。

这里附一张音讯存储字段存储程序与字段长度的图:

4. 总结

本文次要剖析了 broker 接管 producer 音讯的流程,流程如下:

  1. 解决音讯接管的底层服务为 netty,在 BrokerController#start 办法中启动
  2. netty 服务中,解决音讯接管的 channelHandler 为 NettyServerHandler,最终会调用 SendMessageProcessor#processRequest 来解决音讯接管
  3. 音讯接管流程的最初,MappedFile#appendMessage(…)办法会将音讯内容写入到 commitLog 文件中。

正文完
 0