About rocketmq: A Look at RocketMQ Message Storage (Part 2)


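This article is licensed under Attribution 4.0 International (CC BY 4.0). Reposting and adaptation are welcome, provided the source is credited.
Author: Nicksxs
Created: 2021-09-12
Article link: A Look at RocketMQ Message Storage (Part 2)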
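CommitLog is the file in which the RocketMQ server, i.e. the broker, stores messages. As in Kafka, it is written sequentially, although the messages themselves are variable-length. Each file defaults to 1 GB (1024 * 1024 * 1024 bytes). A CommitLog file name is 20 digits long: the file's starting offset, left-padded with zeros. For example, 00000000000000000000 is the first file, with starting offset 0 and size 1 GB = 1,073,741,824 bytes. When that file is full, the second file is named 00000000001073741824, with starting offset 1073741824. Messages are appended to the current file in order, and once it is full, writing moves on to the next file. The size is defined in the code as: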


private int mapedFileSizeCommitLog = 1024 * 1024 * 1024;

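Running a local demo confirms this. There are a few neat design points here (personal opinion). Each file is exactly 1 GB, and the next file is named after its starting offset, so when fetching a message, simple arithmetic on the offset tells you which file it lives in.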
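The offset arithmetic described above can be sketched as follows. This is a minimal illustration, not RocketMQ's own code: the class `CommitLogFiles` and its method names are hypothetical, and only the 1 GB size and the 20-digit zero-padded naming come from the article.

```java
import java.util.Locale;

final class CommitLogFiles {
    // Default CommitLog file size quoted in the article: 1 GB.
    static final long FILE_SIZE = 1024L * 1024 * 1024;

    // Starting offset of the file that contains the given global offset.
    static long fileStartOffset(long offset) {
        return offset - (offset % FILE_SIZE);
    }

    // 20-digit file name: the starting offset, left-padded with zeros.
    static String fileNameFor(long offset) {
        return String.format(Locale.ROOT, "%020d", fileStartOffset(offset));
    }

    // Position of the global offset inside its own file.
    static long positionInFile(long offset) {
        return offset % FILE_SIZE;
    }

    public static void main(String[] args) {
        long offset = FILE_SIZE + 100; // 100 bytes into the second file
        System.out.println(fileNameFor(0));         // 00000000000000000000
        System.out.println(fileNameFor(offset));    // 00000000001073741824
        System.out.println(positionInFile(offset)); // 100
    }
}
```

Because every file has the same size, no index is needed to go from a global offset to a file: one modulo gives the in-file position and one subtraction gives the file name.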

The logic that writes to the CommitLog can be read starting from here:

    public PutMessageResult putMessage(final MessageExtBrokerInner msg) {
        // Set the storing time
        msg.setStoreTimestamp(System.currentTimeMillis());
        // Set the CRC of the message body
        msg.setBodyCRC(UtilAll.crc32(msg.getBody()));
        
        AppendMessageResult result = null;

        StoreStatsService storeStatsService = this.defaultMessageStore.getStoreStatsService();

        String topic = msg.getTopic();
        int queueId = msg.getQueueId();

        final int tranType = MessageSysFlag.getTransactionValue(msg.getSysFlag());
        if (tranType == MessageSysFlag.TRANSACTION_NOT_TYPE
            || tranType == MessageSysFlag.TRANSACTION_COMMIT_TYPE) {
            // Delayed delivery: clamp the level, then redirect to the schedule topic
            if (msg.getDelayTimeLevel() > 0) {
                if (msg.getDelayTimeLevel() > this.defaultMessageStore.getScheduleMessageService().getMaxDelayLevel()) {
                    msg.setDelayTimeLevel(this.defaultMessageStore.getScheduleMessageService().getMaxDelayLevel());
                }

                topic = ScheduleMessageService.SCHEDULE_TOPIC;
                queueId = ScheduleMessageService.delayLevel2QueueId(msg.getDelayTimeLevel());

                
                MessageAccessor.putProperty(msg, MessageConst.PROPERTY_REAL_TOPIC, msg.getTopic());
                MessageAccessor.putProperty(msg, MessageConst.PROPERTY_REAL_QUEUE_ID, String.valueOf(msg.getQueueId()));
                msg.setPropertiesString(MessageDecoder.messageProperties2String(msg.getProperties()));

                msg.setTopic(topic);
                msg.setQueueId(queueId);
            }
        }

        long eclipseTimeInLock = 0;
        MappedFile unlockMappedFile = null;
        MappedFile mappedFile = this.mappedFileQueue.getLastMappedFile();

        putMessageLock.lock();
        try {
            long beginLockTimestamp = this.defaultMessageStore.getSystemClock().now();
            this.beginTimeInLock = beginLockTimestamp;

            
            
            msg.setStoreTimestamp(beginLockTimestamp);

            if (null == mappedFile || mappedFile.isFull()) {
                // No file yet, or the last one is full: get (and create if needed) the next file
                mappedFile = this.mappedFileQueue.getLastMappedFile(0);
            }
            if (null == mappedFile) {
                log.error("create mapped file1 error, topic: " + msg.getTopic() + " clientAddr: " + msg.getBornHostString());
                beginTimeInLock = 0;
                return new PutMessageResult(PutMessageStatus.CREATE_MAPEDFILE_FAILED, null);
            }

            result = mappedFile.appendMessage(msg, this.appendMessageCallback);
            switch (result.getStatus()) {
                case PUT_OK:
                    break;
                case END_OF_FILE:
                    unlockMappedFile = mappedFile;
                    
                    mappedFile = this.mappedFileQueue.getLastMappedFile(0);
                    if (null == mappedFile) {
                        log.error("create mapped file2 error, topic: " + msg.getTopic() + " clientAddr: " + msg.getBornHostString());
                        beginTimeInLock = 0;
                        return new PutMessageResult(PutMessageStatus.CREATE_MAPEDFILE_FAILED, result);
                    }
                    result = mappedFile.appendMessage(msg, this.appendMessageCallback);
                    break;
                case MESSAGE_SIZE_EXCEEDED:
                case PROPERTIES_SIZE_EXCEEDED:
                    beginTimeInLock = 0;
                    return new PutMessageResult(PutMessageStatus.MESSAGE_ILLEGAL, result);
                case UNKNOWN_ERROR:
                    beginTimeInLock = 0;
                    return new PutMessageResult(PutMessageStatus.UNKNOWN_ERROR, result);
                default:
                    beginTimeInLock = 0;
                    return new PutMessageResult(PutMessageStatus.UNKNOWN_ERROR, result);
            }

            eclipseTimeInLock = this.defaultMessageStore.getSystemClock().now() - beginLockTimestamp;
            beginTimeInLock = 0;
        } finally {
            putMessageLock.unlock();
        }

        if (eclipseTimeInLock > 500) {
            log.warn("[NOTIFYME]putMessage in lock cost time(ms)={}, bodyLength={} AppendMessageResult={}", eclipseTimeInLock, msg.getBody().length, result);
        }

        if (null != unlockMappedFile && this.defaultMessageStore.getMessageStoreConfig().isWarmMapedFileEnable()) {
            this.defaultMessageStore.unlockMappedFile(unlockMappedFile);
        }

        PutMessageResult putMessageResult = new PutMessageResult(PutMessageStatus.PUT_OK, result);

        
        storeStatsService.getSinglePutMessageTopicTimesTotal(msg.getTopic()).incrementAndGet();
        storeStatsService.getSinglePutMessageTopicSizeTotal(topic).addAndGet(result.getWroteBytes());

        handleDiskFlush(result, putMessageResult, msg);
        handleHA(result, putMessageResult, msg);

        return putMessageResult;
    }
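The append-then-roll-over flow in `putMessage` (try the last file, and on `END_OF_FILE` create a new file and append again) can be illustrated with a small in-memory model. This is a simplified sketch under stated assumptions: `SegmentedLogSketch` and `Segment` are hypothetical names, segments only track counters instead of real mapped files, the unused tail of a segment is simply treated as padding, and a `synchronized` method stands in for `putMessageLock`.

```java
import java.util.ArrayList;
import java.util.List;

final class SegmentedLogSketch {
    enum Status { PUT_OK, END_OF_FILE }

    // Hypothetical stand-in for a fixed-size mapped file.
    static final class Segment {
        final long fromOffset;
        final int capacity;
        int wrotePosition;

        Segment(long fromOffset, int capacity) {
            this.fromOffset = fromOffset;
            this.capacity = capacity;
        }

        boolean isFull() {
            return wrotePosition == capacity;
        }

        Status append(int length) {
            if (wrotePosition + length > capacity) {
                wrotePosition = capacity; // treat the unused tail as padding
                return Status.END_OF_FILE;
            }
            wrotePosition += length;
            return Status.PUT_OK;
        }
    }

    private final int segmentSize;
    private final List<Segment> segments = new ArrayList<>();

    SegmentedLogSketch(int segmentSize) {
        this.segmentSize = segmentSize;
    }

    synchronized int segmentCount() {
        return segments.size();
    }

    // Appends a message of the given length and returns its global offset.
    synchronized long put(int length) {
        if (length <= 0 || length > segmentSize) {
            throw new IllegalArgumentException("length must be in 1.." + segmentSize);
        }
        Segment last = segments.isEmpty() ? null : segments.get(segments.size() - 1);
        if (last == null || last.isFull()) {
            last = roll(last);
        }
        long storedAt = last.fromOffset + last.wrotePosition;
        if (last.append(length) == Status.END_OF_FILE) {
            // Not enough room: create the next segment and write there instead.
            last = roll(last);
            storedAt = last.fromOffset;
            last.append(length);
        }
        return storedAt;
    }

    private Segment roll(Segment last) {
        long from = (last == null) ? 0 : last.fromOffset + segmentSize;
        Segment next = new Segment(from, segmentSize);
        segments.add(next);
        return next;
    }

    public static void main(String[] args) {
        SegmentedLogSketch log = new SegmentedLogSketch(10);
        System.out.println(log.put(4)); // 0
        System.out.println(log.put(4)); // 4
        System.out.println(log.put(4)); // 10, rolled into the second segment
    }
}
```

The second `append` after rolling mirrors the `END_OF_FILE` branch of the switch above: the message is never split across two files.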

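As seen above, the CommitLog directory consists of 1 GB files. In the implementation they are managed by org.apache.rocketmq.store.MappedFileQueue, which internally holds a queue of MappedFile objects. On the write path, the last file is always fetched via org.apache.rocketmq.store.MappedFileQueue#getLastMappedFile(). If no file has been created yet, or the last file is already full, org.apache.rocketmq.store.MappedFileQueue#getLastMappedFile(long) is called instead: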

    public MappedFile getLastMappedFile(final long startOffset, boolean needCreate) {
        long createOffset = -1;
              
        MappedFile mappedFileLast = getLastMappedFile();

        
        // No file yet: align the start offset down to a file boundary
        if (mappedFileLast == null) {
            createOffset = startOffset - (startOffset % this.mappedFileSize);
        }

        // The last file is full: the next file starts right after it
        if (mappedFileLast != null && mappedFileLast.isFull()) {
            createOffset = mappedFileLast.getFileFromOffset() + this.mappedFileSize;
        }

        if (createOffset != -1 && needCreate) {
            String nextFilePath = this.storePath + File.separator + UtilAll.offset2FileName(createOffset);
            String nextNextFilePath = this.storePath + File.separator
                + UtilAll.offset2FileName(createOffset + this.mappedFileSize);
            MappedFile mappedFile = null;

              
            if (this.allocateMappedFileService != null) {
                mappedFile = this.allocateMappedFileService.putRequestAndReturnMappedFile(nextFilePath,
                    nextNextFilePath, this.mappedFileSize);
            } else {
                try {
                    mappedFile = new MappedFile(nextFilePath, this.mappedFileSize);
                } catch (IOException e) {
                    log.error("create mappedFile exception", e);
                }
            }

            if (mappedFile != null) {
                if (this.mappedFiles.isEmpty()) {
                    mappedFile.setFirstCreateInQueue(true);
                }
                this.mappedFiles.add(mappedFile);
            }

            return mappedFile;
        }

        return mappedFileLast;
    }
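The decision about whether a new file is needed, and at which offset it starts, can be isolated into a pure function. The sketch below is an illustration only: `CreateOffsetSketch` is a hypothetical class, and a `null` first argument models an empty queue.

```java
final class CreateOffsetSketch {
    static final long NO_CREATE = -1;

    // Returns the starting offset of the file to create, or NO_CREATE
    // when the current last file still has room.
    static long createOffset(Long lastFileFromOffset, boolean lastIsFull,
                             long startOffset, long fileSize) {
        if (lastFileFromOffset == null) {
            // Empty queue: align the requested offset down to a file boundary.
            return startOffset - (startOffset % fileSize);
        }
        if (lastIsFull) {
            // The next file begins immediately after the full one.
            return lastFileFromOffset + fileSize;
        }
        return NO_CREATE;
    }

    public static void main(String[] args) {
        System.out.println(createOffset(null, false, 1500, 1024));  // 1024
        System.out.println(createOffset(1024L, true, 0, 1024));     // 2048
        System.out.println(createOffset(1024L, false, 0, 1024));    // -1
    }
}
```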

First, the direct-creation path:

    public MappedFile(final String fileName, final int fileSize) throws IOException {
        init(fileName, fileSize);
    }

    private void init(final String fileName, final int fileSize) throws IOException {
        this.fileName = fileName;
        this.fileSize = fileSize;
        this.file = new File(fileName);
        this.fileFromOffset = Long.parseLong(this.file.getName());
        boolean ok = false;

        ensureDirOK(this.file.getParent());

        try {
            this.fileChannel = new RandomAccessFile(this.file, "rw").getChannel();
            // Map the whole file into memory (mmap)
            this.mappedByteBuffer = this.fileChannel.map(MapMode.READ_WRITE, 0, fileSize);
            TOTAL_MAPPED_VIRTUAL_MEMORY.addAndGet(fileSize);
            TOTAL_MAPPED_FILES.incrementAndGet();
            ok = true;
        } catch (FileNotFoundException e) {
            log.error("create file channel " + this.fileName + " Failed. ", e);
            throw e;
        } catch (IOException e) {
            log.error("map file " + this.fileName + " Failed. ", e);
            throw e;
        } finally {
            if (!ok && this.fileChannel != null) {
                this.fileChannel.close();
            }
        }
    }
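For readers unfamiliar with memory-mapped files in Java, here is a minimal standalone sketch of the same `RandomAccessFile` plus `FileChannel.map` combination used in `init`. It uses only JDK APIs. The class `MmapSketch`, the temp-file name, and the 4096-byte size are assumptions chosen for the demo, not anything RocketMQ does.

```java
import java.io.IOException;
import java.io.RandomAccessFile;
import java.io.UncheckedIOException;
import java.nio.MappedByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;

final class MmapSketch {
    // Writes text through a memory mapping, then reads it back from the file.
    // The text must fit within fileSize bytes.
    static String roundTrip(String text, int fileSize) {
        byte[] payload = text.getBytes(StandardCharsets.UTF_8);
        try {
            Path file = Files.createTempFile("mmap-sketch", ".log");
            file.toFile().deleteOnExit();
            try (RandomAccessFile raf = new RandomAccessFile(file.toFile(), "rw");
                 FileChannel channel = raf.getChannel()) {
                // Mapping in READ_WRITE mode grows the file to fileSize bytes.
                MappedByteBuffer buffer =
                    channel.map(FileChannel.MapMode.READ_WRITE, 0, fileSize);
                buffer.put(payload); // a plain memory write, no write() syscall
                buffer.force();      // flush the dirty pages to disk
            }
            byte[] onDisk = Files.readAllBytes(file);
            return new String(onDisk, 0, payload.length, StandardCharsets.UTF_8);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(roundTrip("hello commitlog", 4096));
    }
}
```

Writing through the mapped buffer and deferring `force()` is what lets appends be cheap memory copies, with flushing handled as a separate step.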

If the request is handed to AllocateMappedFileService instead, some asynchronous work is involved:

    public MappedFile putRequestAndReturnMappedFile(String nextFilePath, String nextNextFilePath, int fileSize) {
        // By default two requests are submitted: the next file and the one after it
        int canSubmitRequests = 2;
        if (this.messageStore.getMessageStoreConfig().isTransientStorePoolEnable()) {
            if (this.messageStore.getMessageStoreConfig().isFastFailIfNoBufferInStorePool()
                && BrokerRole.SLAVE != this.messageStore.getMessageStoreConfig().getBrokerRole()) {
                canSubmitRequests = this.messageStore.getTransientStorePool().remainBufferNumbs() - this.requestQueue.size();
            }
        }
                
        AllocateRequest nextReq = new AllocateRequest(nextFilePath, fileSize);
        boolean nextPutOK = this.requestTable.putIfAbsent(nextFilePath, nextReq) == null;
        
        if (nextPutOK) {
            if (canSubmitRequests <= 0) {
                log.warn("[NOTIFYME]TransientStorePool is not enough, so create mapped file error, " +
                    "RequestQueueSize : {}, StorePoolSize: {}", this.requestQueue.size(), this.messageStore.getTransientStorePool().remainBufferNumbs());
                this.requestTable.remove(nextFilePath);
                return null;
            }
          
            boolean offerOK = this.requestQueue.offer(nextReq);
            if (!offerOK) {
                log.warn("never expected here, add a request to preallocate queue failed");
            }
            canSubmitRequests--;
        }

        
        AllocateRequest nextNextReq = new AllocateRequest(nextNextFilePath, fileSize);
        boolean nextNextPutOK = this.requestTable.putIfAbsent(nextNextFilePath, nextNextReq) == null;
        if (nextNextPutOK) {
            if (canSubmitRequests <= 0) {
                log.warn("[NOTIFYME]TransientStorePool is not enough, so skip preallocate mapped file, " +
                    "RequestQueueSize : {}, StorePoolSize: {}", this.requestQueue.size(), this.messageStore.getTransientStorePool().remainBufferNumbs());
                this.requestTable.remove(nextNextFilePath);
            } else {
                boolean offerOK = this.requestQueue.offer(nextNextReq);
                if (!offerOK) {
                    log.warn("never expected here, add a request to preallocate queue failed");
                }
            }
        }

        if (hasException) {
            log.warn(this.getServiceName() + " service has exception. so return null");
            return null;
        }

        AllocateRequest result = this.requestTable.get(nextFilePath);
        try {
            if (result != null) {
                // Block until the allocation thread finishes this file, or time out
                boolean waitOK = result.getCountDownLatch().await(waitTimeOut, TimeUnit.MILLISECONDS);
                if (!waitOK) {
                    log.warn("create mmap timeout " + result.getFilePath() + " " + result.getFileSize());
                    return null;
                } else {
                    this.requestTable.remove(nextFilePath);
                    return result.getMappedFile();
                }
            } else {
                log.error("find preallocate mmap failed, this never happen");
            }
        } catch (InterruptedException e) {
            log.warn(this.getServiceName() + " service has exception. ", e);
        }

        return null;
    }
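The hand-off pattern used here (register the request in a table, enqueue it, then wait on a `CountDownLatch` with a timeout while a background thread does the work) can be reduced to the following sketch. All names (`AllocateSketch`, `Request`, `requestAndWait`) are hypothetical. A `String` stands in for the `MappedFile`, and a plain `LinkedBlockingQueue` is assumed for simplicity, which may differ from the queue type RocketMQ actually uses.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

final class AllocateSketch {
    static final class Request {
        final String path;
        final CountDownLatch latch = new CountDownLatch(1);
        volatile String result; // stands in for the created MappedFile

        Request(String path) {
            this.path = path;
        }
    }

    private final ConcurrentMap<String, Request> table = new ConcurrentHashMap<>();
    private final BlockingQueue<Request> queue = new LinkedBlockingQueue<>();

    AllocateSketch() {
        Thread worker = new Thread(() -> {
            try {
                while (true) {
                    Request req = queue.take();         // blocks until work arrives
                    req.result = "mapped:" + req.path;  // pretend to create the file
                    req.latch.countDown();              // wake up the waiting caller
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();     // treat as shutdown
            }
        });
        worker.setDaemon(true);
        worker.start();
    }

    // Requests the next file plus one pre-allocated file, and waits only for the first.
    String requestAndWait(String nextPath, String nextNextPath, long timeoutMillis) {
        for (String path : new String[] {nextPath, nextNextPath}) {
            Request req = new Request(path);
            if (table.putIfAbsent(path, req) == null) {
                queue.offer(req); // enqueue only if nobody requested this path before
            }
        }
        Request wanted = table.get(nextPath);
        try {
            if (wanted == null || !wanted.latch.await(timeoutMillis, TimeUnit.MILLISECONDS)) {
                return null; // timed out, the caller decides what to do
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return null;
        }
        table.remove(nextPath);
        return wanted.result;
    }

    public static void main(String[] args) {
        AllocateSketch service = new AllocateSketch();
        System.out.println(service.requestAndWait("file-0", "file-1", 2000));
        // file-1 was pre-allocated by the previous call, so this returns without new work
        System.out.println(service.requestAndWait("file-1", "file-2", 2000));
    }
}
```

The point of submitting two requests is visible in the second call: the file that is needed next has usually been created already, so the writer rarely waits for file creation while holding the append lock.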

The file operations themselves are carried out in the run method of AllocateMappedFileService:

    public void run() {
        log.info(this.getServiceName() + " service started");

        // Keep processing allocation requests until stopped or interrupted
        while (!this.isStopped() && this.mmapOperation()) {

        }
        log.info(this.getServiceName() + " service end");
    }

    private boolean mmapOperation() {
        boolean isSuccess = false;
        AllocateRequest req = null;
        try {
            req = this.requestQueue.take();
            AllocateRequest expectedRequest = this.requestTable.get(req.getFilePath());
            if (null == expectedRequest) {
                log.warn("this mmap request expired, maybe cause timeout " + req.getFilePath() + " "
                    + req.getFileSize());
                return true;
            }
            if (expectedRequest != req) {
                log.warn("never expected here,  maybe cause timeout " + req.getFilePath() + " "
                    + req.getFileSize() + ", req:" + req + ", expectedRequest:" + expectedRequest);
                return true;
            }

            if (req.getMappedFile() == null) {
                long beginTime = System.currentTimeMillis();

                MappedFile mappedFile;
                if (messageStore.getMessageStoreConfig().isTransientStorePoolEnable()) {
                    try {
                        mappedFile = ServiceLoader.load(MappedFile.class).iterator().next();
                        mappedFile.init(req.getFilePath(), req.getFileSize(), messageStore.getTransientStorePool());
                    } catch (RuntimeException e) {
                        log.warn("Use default implementation.");
                        mappedFile = new MappedFile(req.getFilePath(), req.getFileSize(), messageStore.getTransientStorePool());
                    }
                } else {
                    mappedFile = new MappedFile(req.getFilePath(), req.getFileSize());
                }

                long eclipseTime = UtilAll.computeEclipseTimeMilliseconds(beginTime);
                if (eclipseTime > 10) {
                    int queueSize = this.requestQueue.size();
                    log.warn("create mappedFile spent time(ms) " + eclipseTime + " queue size " + queueSize
                        + " " + req.getFilePath() + " " + req.getFileSize());
                }

                
                // Pre-write (warm up) the mapped file if it is a CommitLog-sized file and warming is enabled
                if (mappedFile.getFileSize() >= this.messageStore.getMessageStoreConfig()
                    .getMapedFileSizeCommitLog()
                    &&
                    this.messageStore.getMessageStoreConfig().isWarmMapedFileEnable()) {
                    mappedFile.warmMappedFile(this.messageStore.getMessageStoreConfig().getFlushDiskType(),
                        this.messageStore.getMessageStoreConfig().getFlushLeastPagesWhenWarmMapedFile());
                }

                req.setMappedFile(mappedFile);
                this.hasException = false;
                isSuccess = true;
            }
        } catch (InterruptedException e) {
            log.warn(this.getServiceName() + " interrupted, possibly by shutdown.");
            this.hasException = true;
            return false;
        } catch (IOException e) {
            log.warn(this.getServiceName() + " service has exception. ", e);
            this.hasException = true;
            if (null != req) {
                // Put the request back so it is retried
                requestQueue.offer(req);
                try {
                    Thread.sleep(1);
                } catch (InterruptedException ignored) {
                }
            }
        } finally {
            // Release the caller waiting in putRequestAndReturnMappedFile
            if (req != null && isSuccess)
                req.getCountDownLatch().countDown();
        }
        return true;
    }
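The `warmMappedFile` call above is not shown in this article. As a rough illustration of what pre-warming a mapped buffer can look like, the sketch below touches one byte per page so that the operating system backs each page with physical memory before real writes arrive. This is an assumption-laden sketch, not RocketMQ's implementation: `WarmupSketch` is hypothetical, the 4096-byte page size is assumed, and a heap `ByteBuffer` stands in for the `MappedByteBuffer`.

```java
import java.nio.ByteBuffer;

final class WarmupSketch {
    static final int PAGE_SIZE = 4096; // assumed OS page size

    // Writes a zero byte at the start of every page and returns the page count.
    static int warm(ByteBuffer buffer) {
        int pagesTouched = 0;
        for (int i = 0; i < buffer.capacity(); i += PAGE_SIZE) {
            buffer.put(i, (byte) 0); // absolute put, does not move the position
            pagesTouched++;
        }
        return pagesTouched;
    }

    public static void main(String[] args) {
        ByteBuffer buffer = ByteBuffer.allocate(16 * PAGE_SIZE);
        System.out.println(warm(buffer)); // 16
    }
}
```

Doing this work in the allocation thread rather than on the write path would keep the page-fault cost away from the append lock, which fits with the pre-allocation of the "next next" file seen earlier.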
