数据存储这一块,RocketMQ应用的是文件编程模型。为了进步文件的写入性能通常会引入内存映射机制,数据先写入页缓存而后再择机将页缓存数据刷盘到磁盘,写入波及性能与数据可靠性是必须要思考的。针对刷盘策略个别会有同步刷盘与异步刷盘,RocketMQ也是如此,默认应用异步刷盘。
先来简略看下RocketMQ刷盘操作的代码块:

            try {                    //We only append data to fileChannel or mappedByteBuffer, never both.                    if (writeBuffer != null || this.fileChannel.position() != 0) {                        this.fileChannel.force(false);                    } else {                        // 正文4.8.1:同步落盘                        this.mappedByteBuffer.force();                    }                } catch (Throwable e) {                    log.error("Error occurred when force data to disk.", e);                }

能够看到刷盘其实就是调用了MappedByteBuffer的force办法。

同步刷盘

同步刷盘指的 Broker 端收到音讯发送者的音讯后,先写入内存,而后将内容长久化到磁盘后才向客户端返回音讯发送胜利。
刷盘要分两条线进行剖析:

  1. 第一条线是broker在启动的时候会启动一个刷盘线程,调用门路为:BrokerController#start()->DefaultMessageStore#start()->CommitLog#start()->GroupCommitService#start()->MappedFileQueue#flush();
  2. 第二条线是broker在接管到音讯后加载或更新MappedFile而后存入MappedFileQueue,调用门路为:SendMessageProcessor#processRequest()->DefaultMessageStore#putMessage()->CommitLog#putMessage()->CommitLog#handleDiskFlush()->GroupCommitRequest#waitForFlush().

第一条线的刷盘线程会在一个while循环里每距离10ms执行一次刷盘操作,刷盘胜利后会唤醒第二条线里中期待响应的线程,在第二条线里组装好MappedFileQueue(CopyOnWriteArrayList类型)之后便会调用countDownLatch的await办法期待刷盘线程的执行。

    //broker接管到音讯组装好MappedFileQueue后期待刷盘线程执行    public void handleDiskFlush(AppendMessageResult result, PutMessageResult putMessageResult, MessageExt messageExt) {        // Synchronization flush        if (FlushDiskType.SYNC_FLUSH == this.defaultMessageStore.getMessageStoreConfig().getFlushDiskType()) {            final GroupCommitService service = (GroupCommitService) this.flushCommitLogService;            if (messageExt.isWaitStoreMsgOK()) {                GroupCommitRequest request = new GroupCommitRequest(result.getWroteOffset() + result.getWroteBytes());                service.putRequest(request);                //期待刷盘线程执行                boolean flushOK = request.waitForFlush(this.defaultMessageStore.getMessageStoreConfig().getSyncFlushTimeout());                if (!flushOK) {                    log.error("do groupcommit, wait for flush failed, topic: " + messageExt.getTopic() + " tags: " + messageExt.getTags()                        + " client address: " + messageExt.getBornHostString());                    putMessageResult.setPutMessageStatus(PutMessageStatus.FLUSH_DISK_TIMEOUT);                }            } else {                service.wakeup();            }        }        // Asynchronous flush        // 正文4.8.2:异步刷盘        else {            if (!this.defaultMessageStore.getMessageStoreConfig().isTransientStorePoolEnable()) {                flushCommitLogService.wakeup();            } else {                commitLogService.wakeup();            }        }    }
                   for (GroupCommitRequest req : this.requestsRead) {                        // There may be a message in the next file, so a maximum of                        // two times the flush                        boolean flushOK = false;                        for (int i = 0; i < 2 && !flushOK; i++) {                            //以后已刷盘指针大于该条音讯对应的物理偏移量阐明已刷完                            flushOK = CommitLog.this.mappedFileQueue.getFlushedWhere() >= req.getNextOffset();                            if (!flushOK) {                                //刷盘操作                                CommitLog.this.mappedFileQueue.flush(0);                            }                        }                        //唤醒期待刷盘的线程                        req.wakeupCustomer(flushOK);                    }                

对于同步刷盘须要提一下就是每次刷盘并非只刷写一条音讯,而是一组音讯。

异步刷盘

同步刷盘的长处是能保障音讯不失落,即向客户断返回胜利就代表这条音讯已被长久化到磁盘,即音讯十分牢靠,然而以就义写入性能为前提条件的,但因为 RocketMQ 的音讯是先写入 PageCache,故音讯失落的可能性较小,如果能容 忍肯定几率的音讯失落,但能进步性能,能够思考应用异步刷盘。

异步刷盘指的是 Broker 将音讯存储到 PageCache 后就立刻返回胜利,而后开启一个异步线程定时执行 FileChannel 的 forece 办法将内存中的数据定时刷写到磁盘,默认距离为 500ms。在 RocketMQ 的异步刷盘实现类为 FlushRealTimeService。看到这个默认距离为 500ms,大家是不是会猜想 FlushRealTimeService 是应用了定时工作?
其实不然。这里引入了带超时工夫的 CountDown await 办法,这样做的益处时如果没有新的音讯写入,会休眠 500ms,但收到了新的音讯后,能够被唤醒,做到音讯及时被刷盘,而不是肯定要等 500 ms。
刷盘线程的期待在CommitRealTimeService#run办法里,唤醒刷盘线程刷盘是在CommitLog#handleDiskFlush的异步分支里。

文件的复原

这里只是简略提一下。
文件复原分了失常退出后文件复原与异样退出的文件复原。

  • 失常退出后的复原:以ConsumerQueue为根据,获取外面最初一条音讯生产的物理偏移量。如果这个偏移量大于CommitLog文件里的偏移量,则会删除ConsumerQueue里多余的数据;如果小于CommitLog文件里的偏移量,则将多进去的物理偏移量对应的音讯进行重发,保障两个文件统一。
  • 异样后的复原:broker会记录commitlog、index、consumequeue 等文件的最初一次刷盘工夫戳,之后还会记录一个checkpoint工夫戳。以checkpoint里的工夫戳为基准比照commitlog里的刷盘工夫戳进行相应操作。

文件复原入口:DefaultMessageStore#recover,详情能够参考:从 RocketMQ 学基于文件的编程模式(二)

另外,这里波及到的缓存页与MappedByteBuffer,零拷贝无关,能够参考之前的一篇文章:Java里的零拷贝

相干的文章:RocketMQ源码-MappedFile介绍,其中波及到了TransientStorePool暂存池,MappedFile预调配,写入与刷盘
参考的文章:从 RocketMQ 学基于文件的编程模式(二)