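This article is licensed under Attribution 4.0 International (CC BY 4.0). You are welcome to repost or adapt it, provided you credit the source.
Author: Nicksxs
Created: 2021-09-12
Article link: A Chat About RocketMQ Message Storage, Part 2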
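CommitLog is the file in which the RocketMQ server, i.e. the broker, stores messages. As in Kafka, it is written sequentially, although the messages themselves are variable-length. Each file defaults to 1 GB = 1024 * 1024 * 1024 bytes. A CommitLog file name is 20 digits long: the file's starting offset, left-padded with zeros. For example, 00000000000000000000 is the first file, with starting offset 0 and a size of 1 GB = 1,073,741,824 bytes. Once it is full, the second file is named 00000000001073741824, with starting offset 1073741824. Messages are appended to the file sequentially, and when a file is full, writing moves on to the next one. The size is defined in the code as: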

private int mapedFileSizeCommitLog = 1024 * 1024 * 1024;

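Running a demo locally confirms this. There are a few neat points here (in my personal opinion). First, each file is exactly 1 GB, and the next file is named after the accumulated size offset, so when reading a message, a simple calculation on its offset tells you which file it lives in.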
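To make the "calculate which file it is in" point concrete, here is a minimal sketch. The class `CommitLogLocator` and its method names are hypothetical helpers written for this post, not RocketMQ code; it only assumes fixed-size files named by their 20-digit starting offset, as described above.

```java
import java.util.Locale;

// Hypothetical helper, not part of RocketMQ: maps a global CommitLog offset
// to the file that holds it, assuming fixed-size files named by start offset.
final class CommitLogLocator {
    static final long FILE_SIZE = 1024L * 1024 * 1024; // 1 GB, the default quoted above

    // Starting offset of the file that contains the given global offset.
    static long fileStartOffset(long offset) {
        return offset - (offset % FILE_SIZE);
    }

    // 20-digit, zero-padded file name for that starting offset.
    static String fileName(long offset) {
        return String.format(Locale.ROOT, "%020d", fileStartOffset(offset));
    }

    // Position of the global offset inside its file.
    static long positionInFile(long offset) {
        return offset % FILE_SIZE;
    }

    public static void main(String[] args) {
        long offset = 1073741824L + 100;
        System.out.println(fileName(offset));       // 00000000001073741824
        System.out.println(positionInFile(offset)); // 100
    }
}
```

Because every file has the same size, no index lookup is needed to go from a global offset to a file: one modulo gives both the file name and the position within it.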

The logic that writes to the CommitLog can be followed starting from here:

public PutMessageResult putMessage(final MessageExtBrokerInner msg) {
    // Record the store time
    msg.setStoreTimestamp(System.currentTimeMillis());
    // Compute the CRC of the message body
    msg.setBodyCRC(UtilAll.crc32(msg.getBody()));
    // Result of the append operation
    AppendMessageResult result = null;

    StoreStatsService storeStatsService = this.defaultMessageStore.getStoreStatsService();

    String topic = msg.getTopic();
    int queueId = msg.getQueueId();

    final int tranType = MessageSysFlag.getTransactionValue(msg.getSysFlag());
    if (tranType == MessageSysFlag.TRANSACTION_NOT_TYPE
        || tranType == MessageSysFlag.TRANSACTION_COMMIT_TYPE) {
        // Delayed message: cap the delay level, then redirect to the schedule topic
        if (msg.getDelayTimeLevel() > 0) {
            if (msg.getDelayTimeLevel() > this.defaultMessageStore.getScheduleMessageService().getMaxDelayLevel()) {
                msg.setDelayTimeLevel(this.defaultMessageStore.getScheduleMessageService().getMaxDelayLevel());
            }

            topic = ScheduleMessageService.SCHEDULE_TOPIC;
            queueId = ScheduleMessageService.delayLevel2QueueId(msg.getDelayTimeLevel());

            // Keep the real topic and queue id in the message properties
            MessageAccessor.putProperty(msg, MessageConst.PROPERTY_REAL_TOPIC, msg.getTopic());
            MessageAccessor.putProperty(msg, MessageConst.PROPERTY_REAL_QUEUE_ID, String.valueOf(msg.getQueueId()));
            msg.setPropertiesString(MessageDecoder.messageProperties2String(msg.getProperties()));

            msg.setTopic(topic);
            msg.setQueueId(queueId);
        }
    }

    long eclipseTimeInLock = 0;
    MappedFile unlockMappedFile = null;
    MappedFile mappedFile = this.mappedFileQueue.getLastMappedFile();

    // Only one thread appends at a time
    putMessageLock.lock();
    try {
        long beginLockTimestamp = this.defaultMessageStore.getSystemClock().now();
        this.beginTimeInLock = beginLockTimestamp;

        // Set the store timestamp again while holding the lock
        msg.setStoreTimestamp(beginLockTimestamp);

        // No file yet, or the last file is full: ask the queue for a new one
        if (null == mappedFile || mappedFile.isFull()) {
            mappedFile = this.mappedFileQueue.getLastMappedFile(0);
        }
        if (null == mappedFile) {
            log.error("create mapped file1 error, topic: " + msg.getTopic() + " clientAddr: " + msg.getBornHostString());
            beginTimeInLock = 0;
            return new PutMessageResult(PutMessageStatus.CREATE_MAPEDFILE_FAILED, null);
        }

        result = mappedFile.appendMessage(msg, this.appendMessageCallback);
        switch (result.getStatus()) {
            case PUT_OK:
                break;
            case END_OF_FILE:
                unlockMappedFile = mappedFile;
                // The current file cannot hold the message: create a new file and append again
                mappedFile = this.mappedFileQueue.getLastMappedFile(0);
                if (null == mappedFile) {
                    log.error("create mapped file2 error, topic: " + msg.getTopic() + " clientAddr: " + msg.getBornHostString());
                    beginTimeInLock = 0;
                    return new PutMessageResult(PutMessageStatus.CREATE_MAPEDFILE_FAILED, result);
                }
                result = mappedFile.appendMessage(msg, this.appendMessageCallback);
                break;
            case MESSAGE_SIZE_EXCEEDED:
            case PROPERTIES_SIZE_EXCEEDED:
                beginTimeInLock = 0;
                return new PutMessageResult(PutMessageStatus.MESSAGE_ILLEGAL, result);
            case UNKNOWN_ERROR:
                beginTimeInLock = 0;
                return new PutMessageResult(PutMessageStatus.UNKNOWN_ERROR, result);
            default:
                beginTimeInLock = 0;
                return new PutMessageResult(PutMessageStatus.UNKNOWN_ERROR, result);
        }

        eclipseTimeInLock = this.defaultMessageStore.getSystemClock().now() - beginLockTimestamp;
        beginTimeInLock = 0;
    } finally {
        putMessageLock.unlock();
    }

    if (eclipseTimeInLock > 500) {
        log.warn("[NOTIFYME]putMessage in lock cost time(ms)={}, bodyLength={} AppendMessageResult={}", eclipseTimeInLock, msg.getBody().length, result);
    }

    if (null != unlockMappedFile && this.defaultMessageStore.getMessageStoreConfig().isWarmMapedFileEnable()) {
        this.defaultMessageStore.unlockMappedFile(unlockMappedFile);
    }

    PutMessageResult putMessageResult = new PutMessageResult(PutMessageStatus.PUT_OK, result);

    // Statistics
    storeStatsService.getSinglePutMessageTopicTimesTotal(msg.getTopic()).incrementAndGet();
    storeStatsService.getSinglePutMessageTopicSizeTotal(topic).addAndGet(result.getWroteBytes());

    // Disk flush and master/slave replication
    handleDiskFlush(result, putMessageResult, msg);
    handleHA(result, putMessageResult, msg);

    return putMessageResult;
}

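As we saw earlier, the CommitLog directory is made up of files of 1 GB each. In the implementation this is handled by org.apache.rocketmq.store.MappedFileQueue, which internally holds a queue of MappedFile objects. On the write path, the last file is always obtained through org.apache.rocketmq.store.MappedFileQueue#getLastMappedFile(). If no file has been created yet, or the last file is already full, org.apache.rocketmq.store.MappedFileQueue#getLastMappedFile(long) is called instead, and the actual work happens in this overload: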

public MappedFile getLastMappedFile(final long startOffset, boolean needCreate) {
    long createOffset = -1;
    // Get the last file in the queue
    MappedFile mappedFileLast = getLastMappedFile();

    // The queue is empty: align the start offset down to a file-size boundary
    if (mappedFileLast == null) {
        createOffset = startOffset - (startOffset % this.mappedFileSize);
    }

    // The last file is full: the new file starts right after it
    if (mappedFileLast != null && mappedFileLast.isFull()) {
        createOffset = mappedFileLast.getFileFromOffset() + this.mappedFileSize;
    }

    if (createOffset != -1 && needCreate) {
        // Paths of the file to create and of the file after it
        String nextFilePath = this.storePath + File.separator + UtilAll.offset2FileName(createOffset);
        String nextNextFilePath = this.storePath + File.separator
            + UtilAll.offset2FileName(createOffset + this.mappedFileSize);
        MappedFile mappedFile = null;

        // Hand the creation over to AllocateMappedFileService if there is one
        if (this.allocateMappedFileService != null) {
            mappedFile = this.allocateMappedFileService.putRequestAndReturnMappedFile(nextFilePath,
                nextNextFilePath, this.mappedFileSize);
        } else {
            try {
                // Otherwise create the file directly
                mappedFile = new MappedFile(nextFilePath, this.mappedFileSize);
            } catch (IOException e) {
                log.error("create mappedFile exception", e);
            }
        }

        if (mappedFile != null) {
            if (this.mappedFiles.isEmpty()) {
                mappedFile.setFirstCreateInQueue(true);
            }
            this.mappedFiles.add(mappedFile);
        }

        return mappedFile;
    }

    return mappedFileLast;
}
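The two branches that decide `createOffset` are easy to check in isolation. The sketch below is a simplified model of that rule only: `CreateOffsetSketch` is a hypothetical class, a `null` first argument stands in for an empty queue, and a small file size is used purely for readability.

```java
// Simplified model of the roll-over rule; the real MappedFileQueue also
// creates and maps the files, which is left out here.
final class CreateOffsetSketch {
    // lastFileFromOffset == null models an empty queue.
    // Returns -1 when no new file is needed, mirroring the method above.
    static long createOffset(Long lastFileFromOffset, boolean lastFileFull,
                             long startOffset, long mappedFileSize) {
        long createOffset = -1;
        if (lastFileFromOffset == null) {
            // Empty queue: align the requested offset down to a file boundary
            createOffset = startOffset - (startOffset % mappedFileSize);
        } else if (lastFileFull) {
            // Last file is full: the next file starts right after it
            createOffset = lastFileFromOffset + mappedFileSize;
        }
        return createOffset;
    }

    public static void main(String[] args) {
        long size = 1024; // small file size, for readability only
        System.out.println(createOffset(null, false, 2500, size)); // 2048
        System.out.println(createOffset(2048L, true, 0, size));    // 3072
        System.out.println(createOffset(2048L, false, 0, size));   // -1
    }
}
```

Note that on the write path shown earlier the start offset passed in is 0, so the first file created in an empty queue starts at offset 0.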

First, let's look at the case where the file is created directly:

public MappedFile(final String fileName, final int fileSize) throws IOException {
    init(fileName, fileSize);
}

private void init(final String fileName, final int fileSize) throws IOException {
    this.fileName = fileName;
    this.fileSize = fileSize;
    this.file = new File(fileName);
    // The file name is the starting offset of the file
    this.fileFromOffset = Long.parseLong(this.file.getName());
    boolean ok = false;

    ensureDirOK(this.file.getParent());

    try {
        // Open the file channel
        this.fileChannel = new RandomAccessFile(this.file, "rw").getChannel();
        // Map the whole file into memory
        this.mappedByteBuffer = this.fileChannel.map(MapMode.READ_WRITE, 0, fileSize);
        TOTAL_MAPPED_VIRTUAL_MEMORY.addAndGet(fileSize);
        TOTAL_MAPPED_FILES.incrementAndGet();
        ok = true;
    } catch (FileNotFoundException e) {
        log.error("create file channel " + this.fileName + " Failed. ", e);
        throw e;
    } catch (IOException e) {
        log.error("map file " + this.fileName + " Failed. ", e);
        throw e;
    } finally {
        if (!ok && this.fileChannel != null) {
            this.fileChannel.close();
        }
    }
}
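For readers who have not used memory-mapped files in Java, here is a small standalone sketch of the same `RandomAccessFile` plus `FileChannel.map` pattern. `MmapSketch` and `writeAndReadBack` are hypothetical names, the file is a temporary one, and the explicit `setLength` call is my own addition to make the file size unambiguous; it is not taken from the RocketMQ code.

```java
import java.io.IOException;
import java.io.RandomAccessFile;
import java.nio.MappedByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;

final class MmapSketch {
    // Maps `size` bytes of the file, writes `data` at position 0,
    // and reads the same bytes back through the mapping.
    static String writeAndReadBack(Path file, int size, String data) throws IOException {
        try (RandomAccessFile raf = new RandomAccessFile(file.toFile(), "rw");
             FileChannel channel = raf.getChannel()) {
            raf.setLength(size); // fix the file size up front (assumption of this sketch)
            MappedByteBuffer buffer = channel.map(FileChannel.MapMode.READ_WRITE, 0, size);

            byte[] bytes = data.getBytes(StandardCharsets.UTF_8);
            buffer.put(bytes); // a write is just a memory copy into the mapping
            buffer.force();    // ask the OS to flush the changes to the storage device

            byte[] readBack = new byte[bytes.length];
            buffer.position(0);
            buffer.get(readBack);
            return new String(readBack, StandardCharsets.UTF_8);
        }
    }

    public static void main(String[] args) throws IOException {
        Path file = Files.createTempFile("mmap-sketch", ".dat");
        System.out.println(writeAndReadBack(file, 4096, "hello commitlog"));
        file.toFile().deleteOnExit();
    }
}
```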

If the request is handed to AllocateMappedFileService instead, some asynchronous operations come into play:

public MappedFile putRequestAndReturnMappedFile(String nextFilePath, String nextNextFilePath, int fileSize) {
    // By default two requests can be submitted: the next file and the one after it
    int canSubmitRequests = 2;
    if (this.messageStore.getMessageStoreConfig().isTransientStorePoolEnable()) {
        if (this.messageStore.getMessageStoreConfig().isFastFailIfNoBufferInStorePool()
            && BrokerRole.SLAVE != this.messageStore.getMessageStoreConfig().getBrokerRole()) {
            // Limit the requests to the buffers still available in the transient store pool
            canSubmitRequests = this.messageStore.getTransientStorePool().remainBufferNumbs() - this.requestQueue.size();
        }
    }

    // Request for the file that is needed right now
    AllocateRequest nextReq = new AllocateRequest(nextFilePath, fileSize);
    boolean nextPutOK = this.requestTable.putIfAbsent(nextFilePath, nextReq) == null;

    if (nextPutOK) {
        if (canSubmitRequests <= 0) {
            log.warn("[NOTIFYME]TransientStorePool is not enough, so create mapped file error, " +
                "RequestQueueSize : {}, StorePoolSize: {}", this.requestQueue.size(), this.messageStore.getTransientStorePool().remainBufferNumbs());
            this.requestTable.remove(nextFilePath);
            return null;
        }
        // Put the request into the queue for the service thread
        boolean offerOK = this.requestQueue.offer(nextReq);
        if (!offerOK) {
            log.warn("never expected here, add a request to preallocate queue failed");
        }
        canSubmitRequests--;
    }

    // Request for the file after that, so it is pre-allocated
    AllocateRequest nextNextReq = new AllocateRequest(nextNextFilePath, fileSize);
    boolean nextNextPutOK = this.requestTable.putIfAbsent(nextNextFilePath, nextNextReq) == null;
    if (nextNextPutOK) {
        if (canSubmitRequests <= 0) {
            log.warn("[NOTIFYME]TransientStorePool is not enough, so skip preallocate mapped file, " +
                "RequestQueueSize : {}, StorePoolSize: {}", this.requestQueue.size(), this.messageStore.getTransientStorePool().remainBufferNumbs());
            this.requestTable.remove(nextNextFilePath);
        } else {
            boolean offerOK = this.requestQueue.offer(nextNextReq);
            if (!offerOK) {
                log.warn("never expected here, add a request to preallocate queue failed");
            }
        }
    }

    if (hasException) {
        log.warn(this.getServiceName() + " service has exception. so return null");
        return null;
    }

    AllocateRequest result = this.requestTable.get(nextFilePath);
    try {
        // Wait, with a timeout, until the service thread has created the file
        if (result != null) {
            boolean waitOK = result.getCountDownLatch().await(waitTimeOut, TimeUnit.MILLISECONDS);
            if (!waitOK) {
                log.warn("create mmap timeout " + result.getFilePath() + " " + result.getFileSize());
                return null;
            } else {
                this.requestTable.remove(nextFilePath);
                return result.getMappedFile();
            }
        } else {
            log.error("find preallocate mmap failed, this never happen");
        }
    } catch (InterruptedException e) {
        log.warn(this.getServiceName() + " service has exception. ", e);
    }

    return null;
}
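The asynchronous part boils down to a hand-off: the caller registers a request, puts it on a queue, and blocks on a `CountDownLatch` until another thread has done the work. Below is a stripped-down sketch of that pattern. `AllocateHandoffSketch`, `Request`, and `serveOne` are hypothetical names, and a plain string stands in for the MappedFile; the pre-allocation of the second file and the store-pool checks are left out.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

final class AllocateHandoffSketch {
    static final class Request {
        final String path;
        final CountDownLatch done = new CountDownLatch(1);
        volatile String result; // stands in for the MappedFile
        Request(String path) { this.path = path; }
    }

    private final ConcurrentMap<String, Request> table = new ConcurrentHashMap<>();
    private final BlockingQueue<Request> queue = new LinkedBlockingQueue<>();

    // Caller side: register the request, enqueue it once, then wait for the worker.
    String submitAndWait(String path, long timeoutMillis) throws InterruptedException {
        Request req = new Request(path);
        if (table.putIfAbsent(path, req) == null) {
            queue.offer(req); // only the first registration is enqueued
        }
        Request pending = table.get(path);
        if (pending == null || !pending.done.await(timeoutMillis, TimeUnit.MILLISECONDS)) {
            return null; // not found or timed out
        }
        table.remove(path);
        return pending.result;
    }

    // Worker side: take one request, "create" the file, release the waiter.
    void serveOne() throws InterruptedException {
        Request req = queue.take();
        req.result = "mapped:" + req.path;
        req.done.countDown();
    }

    public static void main(String[] args) throws Exception {
        AllocateHandoffSketch service = new AllocateHandoffSketch();
        Thread worker = new Thread(() -> {
            try {
                service.serveOne();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        worker.setDaemon(true);
        worker.start();
        System.out.println(service.submitAndWait("00000000000000000000", 5000));
    }
}
```

The table keyed by file path is what lets a later call find a request that was already submitted as a pre-allocation, instead of submitting it a second time.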

The code that actually performs the file operations is the run method of AllocateMappedFileService:

public void run() {
    log.info(this.getServiceName() + " service started");

    // Keep processing requests until the service is stopped or interrupted
    while (!this.isStopped() && this.mmapOperation()) {

    }
    log.info(this.getServiceName() + " service end");
}

private boolean mmapOperation() {
    boolean isSuccess = false;
    AllocateRequest req = null;
    try {
        // Block until a request is available
        req = this.requestQueue.take();
        AllocateRequest expectedRequest = this.requestTable.get(req.getFilePath());
        if (null == expectedRequest) {
            log.warn("this mmap request expired, maybe cause timeout " + req.getFilePath() + " "
                + req.getFileSize());
            return true;
        }
        if (expectedRequest != req) {
            log.warn("never expected here,  maybe cause timeout " + req.getFilePath() + " "
                + req.getFileSize() + ", req:" + req + ", expectedRequest:" + expectedRequest);
            return true;
        }

        if (req.getMappedFile() == null) {
            long beginTime = System.currentTimeMillis();

            MappedFile mappedFile;
            if (messageStore.getMessageStoreConfig().isTransientStorePoolEnable()) {
                try {
                    // Load a MappedFile implementation through ServiceLoader
                    mappedFile = ServiceLoader.load(MappedFile.class).iterator().next();
                    mappedFile.init(req.getFilePath(), req.getFileSize(), messageStore.getTransientStorePool());
                } catch (RuntimeException e) {
                    log.warn("Use default implementation.");
                    // Fall back to the default implementation, backed by the transient store pool
                    mappedFile = new MappedFile(req.getFilePath(), req.getFileSize(), messageStore.getTransientStorePool());
                }
            } else {
                // Plain memory-mapped file
                mappedFile = new MappedFile(req.getFilePath(), req.getFileSize());
            }

            long eclipseTime = UtilAll.computeEclipseTimeMilliseconds(beginTime);
            if (eclipseTime > 10) {
                int queueSize = this.requestQueue.size();
                log.warn("create mappedFile spent time(ms) " + eclipseTime + " queue size " + queueSize
                    + " " + req.getFilePath() + " " + req.getFileSize());
            }

            // Warm up the mapped file if it is CommitLog-sized and warm-up is enabled
            if (mappedFile.getFileSize() >= this.messageStore.getMessageStoreConfig()
                .getMapedFileSizeCommitLog()
                &&
                this.messageStore.getMessageStoreConfig().isWarmMapedFileEnable()) {
                mappedFile.warmMappedFile(this.messageStore.getMessageStoreConfig().getFlushDiskType(),
                    this.messageStore.getMessageStoreConfig().getFlushLeastPagesWhenWarmMapedFile());
            }

            req.setMappedFile(mappedFile);
            this.hasException = false;
            isSuccess = true;
        }
    } catch (InterruptedException e) {
        log.warn(this.getServiceName() + " interrupted, possibly by shutdown.");
        this.hasException = true;
        return false;
    } catch (IOException e) {
        log.warn(this.getServiceName() + " service has exception. ", e);
        this.hasException = true;
        if (null != req) {
            // Put the request back so it can be retried
            requestQueue.offer(req);
            try {
                Thread.sleep(1);
            } catch (InterruptedException ignored) {
            }
        }
    } finally {
        // Release the thread waiting on the latch in putRequestAndReturnMappedFile
        if (req != null && isSuccess)
            req.getCountDownLatch().countDown();
    }
    return true;
}
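One detail worth pulling out of mmapOperation is the check against the request table before any work is done: a request taken from the queue is only processed if the table still maps its path to that exact object. The sketch below isolates that check. `StaleRequestSketch`, `Request`, and `Outcome` are hypothetical names, and the actual file creation is reduced to returning `PROCESSED`.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.LinkedBlockingQueue;

final class StaleRequestSketch {
    static final class Request {
        final String path;
        Request(String path) { this.path = path; }
    }

    enum Outcome { EXPIRED, MISMATCH, PROCESSED }

    // One iteration of a worker loop: take a request and process it only if
    // the table still holds this exact object for its path.
    static Outcome handleNext(BlockingQueue<Request> queue,
                              ConcurrentMap<String, Request> table) throws InterruptedException {
        Request req = queue.take();
        Request expected = table.get(req.path);
        if (expected == null) {
            return Outcome.EXPIRED;  // nobody is tracking this request any more
        }
        if (expected != req) {
            return Outcome.MISMATCH; // a different request now owns this path
        }
        return Outcome.PROCESSED;    // the real code would create the file here
    }

    public static void main(String[] args) throws InterruptedException {
        BlockingQueue<Request> queue = new LinkedBlockingQueue<>();
        ConcurrentMap<String, Request> table = new ConcurrentHashMap<>();

        Request tracked = new Request("a");
        table.put(tracked.path, tracked);
        queue.offer(tracked);
        queue.offer(new Request("b")); // enqueued but never registered

        System.out.println(handleNext(queue, table)); // PROCESSED
        System.out.println(handleNext(queue, table)); // EXPIRED
    }
}
```

The comparison uses reference identity rather than equals, the same as `expectedRequest != req` in the method above.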