关于rocketmq:RocketMQ学习十二消息刷盘

29次阅读

共计 3583 个字符,预计需要花费 9 分钟才能阅读完成。

数据存储这一块,RocketMQ 应用的是文件编程模型。为了进步文件的写入性能通常会引入内存映射机制,数据先写入页缓存而后再择机将页缓存数据刷盘到磁盘,写入波及性能与数据可靠性是必须要思考的。针对刷盘策略个别会有同步刷盘与异步刷盘,RocketMQ 也是如此,默认应用异步刷盘。
先来简略看下 RocketMQ 刷盘操作的代码块:

            try {
                    //We only append data to fileChannel or mappedByteBuffer, never both.
                    if (writeBuffer != null || this.fileChannel.position() != 0) {this.fileChannel.force(false);
                    } else {
                        // 正文 4.8.1:同步落盘
                        this.mappedByteBuffer.force();}
                } catch (Throwable e) {log.error("Error occurred when force data to disk.", e);
                }

能够看到刷盘其实就是调用了 MappedByteBuffer 的 force 办法。

同步刷盘

同步刷盘指的 Broker 端收到音讯发送者的音讯后,先写入内存,而后将内容长久化到磁盘后才向客户端返回音讯发送胜利。
刷盘要分两条线进行剖析:

  1. 第一条线是 broker 在启动的时候会启动一个刷盘线程,调用门路为:BrokerController#start()->DefaultMessageStore#start()->CommitLog#start()->GroupCommitService#start()->MappedFileQueue#flush();
  2. 第二条线是 broker 在接管到音讯后加载或更新 MappedFile 而后存入 MappedFileQueue,调用门路为:SendMessageProcessor#processRequest()->DefaultMessageStore#putMessage()->CommitLog#putMessage()->CommitLog#handleDiskFlush()->GroupCommitRequest#waitForFlush().

第一条线的刷盘线程会在一个 while 循环里每距离 10ms 执行一次刷盘操作,刷盘胜利后会唤醒第二条线里中期待响应的线程,在第二条线里组装好 MappedFileQueue(CopyOnWriteArrayList 类型)之后便会调用 countDownLatch 的 await 办法期待刷盘线程的执行。

    //broker 接管到音讯组装好 MappedFileQueue 后期待刷盘线程执行
    public void handleDiskFlush(AppendMessageResult result, PutMessageResult putMessageResult, MessageExt messageExt) {
        // Synchronization flush
        if (FlushDiskType.SYNC_FLUSH == this.defaultMessageStore.getMessageStoreConfig().getFlushDiskType()) {final GroupCommitService service = (GroupCommitService) this.flushCommitLogService;
            if (messageExt.isWaitStoreMsgOK()) {GroupCommitRequest request = new GroupCommitRequest(result.getWroteOffset() + result.getWroteBytes());
                service.putRequest(request);
                // 期待刷盘线程执行
                boolean flushOK = request.waitForFlush(this.defaultMessageStore.getMessageStoreConfig().getSyncFlushTimeout());
                if (!flushOK) {log.error("do groupcommit, wait for flush failed, topic:" + messageExt.getTopic() + "tags:" + messageExt.getTags()
                        + "client address:" + messageExt.getBornHostString());
                    putMessageResult.setPutMessageStatus(PutMessageStatus.FLUSH_DISK_TIMEOUT);
                }
            } else {service.wakeup();
            }
        }
        // Asynchronous flush
        // 正文 4.8.2:异步刷盘
        else {if (!this.defaultMessageStore.getMessageStoreConfig().isTransientStorePoolEnable()) {flushCommitLogService.wakeup();
            } else {commitLogService.wakeup();
            }
        }
    }
                   for (GroupCommitRequest req : this.requestsRead) {
                        // There may be a message in the next file, so a maximum of
                        // two times the flush
                        boolean flushOK = false;
                        for (int i = 0; i < 2 && !flushOK; i++) {
                            // 以后已刷盘指针大于该条音讯对应的物理偏移量阐明已刷完
                            flushOK = CommitLog.this.mappedFileQueue.getFlushedWhere() >= req.getNextOffset();

                            if (!flushOK) {
                                // 刷盘操作
                                CommitLog.this.mappedFileQueue.flush(0);
                            }
                        }
                        // 唤醒期待刷盘的线程
                        req.wakeupCustomer(flushOK);
                    }                

对于同步刷盘须要提一下就是每次刷盘并非只刷写一条音讯,而是一组音讯。

异步刷盘

同步刷盘的长处是能保障音讯不失落,即向客户断返回胜利就代表这条音讯已被长久化到磁盘,即音讯十分牢靠,然而以就义写入性能为前提条件的,但因为 RocketMQ 的音讯是先写入 PageCache,故音讯失落的可能性较小,如果能容 忍肯定几率的音讯失落,但能进步性能,能够思考应用异步刷盘。

异步刷盘指的是 Broker 将音讯存储到 PageCache 后就立刻返回胜利,而后开启一个异步线程定时执行 FileChannel 的 forece 办法将内存中的数据定时刷写到磁盘,默认距离为 500ms。在 RocketMQ 的异步刷盘实现类为 FlushRealTimeService。看到这个默认距离为 500ms,大家是不是会猜想 FlushRealTimeService 是应用了定时工作?
其实不然。这里引入了带超时工夫的 CountDown await 办法,这样做的益处时如果没有新的音讯写入,会休眠 500ms,但收到了新的音讯后,能够被唤醒,做到音讯及时被刷盘,而不是肯定要等 500 ms。
刷盘线程的期待在 CommitRealTimeService#run 办法里,唤醒刷盘线程刷盘是在 CommitLog#handleDiskFlush 的异步分支里。

文件的复原

这里只是简略提一下。
文件复原分了失常退出后文件复原与异样退出的文件复原。

  • 失常退出后的复原:以 ConsumerQueue 为根据,获取外面最初一条音讯生产的物理偏移量。如果这个偏移量大于 CommitLog 文件里的偏移量,则会删除 ConsumerQueue 里多余的数据;如果小于 CommitLog 文件里的偏移量,则将多进去的物理偏移量对应的音讯进行重发,保障两个文件统一。
  • 异样后的复原:broker 会记录 commitlog、index、consumequeue 等文件的最初一次刷盘工夫戳,之后还会记录一个 checkpoint 工夫戳。以 checkpoint 里的工夫戳为基准比照 commitlog 里的刷盘工夫戳进行相应操作。

文件复原入口:DefaultMessageStore#recover,详情能够参考:从 RocketMQ 学基于文件的编程模式(二)

另外,这里波及到的缓存页与 MappedByteBuffer,零拷贝无关,能够参考之前的一篇文章:Java 里的零拷贝

相干的文章:RocketMQ 源码 -MappedFile 介绍,其中波及到了 TransientStorePool 暂存池,MappedFile 预调配,写入与刷盘
参考的文章:从 RocketMQ 学基于文件的编程模式(二)

正文完
 0