This article is licensed under Attribution 4.0 International (CC BY 4.0). You are welcome to repost or adapt it, provided you credit the source.
Author: Nicksxs
Created: 2021-09-12
Link: A look at RocketMQ message storage, part 2
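CommitLog is the file in which the RocketMQ server, that is, the broker, stores messages. As in Kafka, it is written sequentially, and the messages are variable-length. Each file is 1 GB by default (1024 * 1024 * 1024 bytes). A CommitLog file name is 20 characters long: the file's starting offset, left-padded with zeros. For example, 00000000000000000000 is the first file, with starting offset 0 and a size of 1 GB = 1,073,741,824 bytes. When that file is full, the second file is named 00000000001073741824, with starting offset 1073741824. Messages are appended to the current file in order, and once it is full, writing moves on to the next file. The size is defined in the code as: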
private int mapedFileSizeCommitLog = 1024 * 1024 * 1024;
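Running a demo locally confirms this. A couple of things here strike me as rather neat (personal opinion). To begin with, every file is exactly 1 GB and the next file is named by adding the file size to the previous starting offset, so when fetching a message, a bit of arithmetic on its offset tells you which file it is in.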
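The arithmetic mentioned above can be sketched as follows. This is only an illustration: the class and method names are hypothetical and are not taken from RocketMQ; only the 1 GB size and the 20-digit naming rule come from the text.

```java
import java.util.Locale;

// Hypothetical helper for illustration; not RocketMQ's own code.
final class CommitLogNaming {
    // Default CommitLog file size quoted in the text: 1 GB.
    static final long FILE_SIZE = 1024L * 1024 * 1024;

    // File name = starting offset, left-padded with zeros to 20 digits.
    static String fileNameFor(long startOffset) {
        return String.format(Locale.ROOT, "%020d", startOffset);
    }

    // Starting offset of the file that contains the given global offset.
    static long fileStartOffset(long globalOffset) {
        return globalOffset - (globalOffset % FILE_SIZE);
    }

    // Position of the global offset inside its file.
    static long positionInFile(long globalOffset) {
        return globalOffset % FILE_SIZE;
    }

    public static void main(String[] args) {
        long offset = 1_073_741_824L + 100; // 100 bytes into the second file
        System.out.println(fileNameFor(fileStartOffset(offset))); // 00000000001073741824
        System.out.println(positionInFile(offset));               // 100
    }
}
```

Because every file has the same size, no index is needed to go from a global offset to a file name; a modulo and a subtraction are enough.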
The logic for writing to the CommitLog can be read starting from here:
public PutMessageResult putMessage(final MessageExtBrokerInner msg) {
    msg.setStoreTimestamp(System.currentTimeMillis());
    msg.setBodyCRC(UtilAll.crc32(msg.getBody()));
    AppendMessageResult result = null;
    StoreStatsService storeStatsService = this.defaultMessageStore.getStoreStatsService();
    String topic = msg.getTopic();
    int queueId = msg.getQueueId();
    final int tranType = MessageSysFlag.getTransactionValue(msg.getSysFlag());
    if (tranType == MessageSysFlag.TRANSACTION_NOT_TYPE
        || tranType == MessageSysFlag.TRANSACTION_COMMIT_TYPE) {
        // Delayed message: store it under the schedule topic and keep the real
        // topic and queue id in the message properties
        if (msg.getDelayTimeLevel() > 0) {
            if (msg.getDelayTimeLevel() > this.defaultMessageStore.getScheduleMessageService().getMaxDelayLevel()) {
                msg.setDelayTimeLevel(this.defaultMessageStore.getScheduleMessageService().getMaxDelayLevel());
            }
            topic = ScheduleMessageService.SCHEDULE_TOPIC;
            queueId = ScheduleMessageService.delayLevel2QueueId(msg.getDelayTimeLevel());
            MessageAccessor.putProperty(msg, MessageConst.PROPERTY_REAL_TOPIC, msg.getTopic());
            MessageAccessor.putProperty(msg, MessageConst.PROPERTY_REAL_QUEUE_ID, String.valueOf(msg.getQueueId()));
            msg.setPropertiesString(MessageDecoder.messageProperties2String(msg.getProperties()));
            msg.setTopic(topic);
            msg.setQueueId(queueId);
        }
    }
    long eclipseTimeInLock = 0;
    MappedFile unlockMappedFile = null;
    MappedFile mappedFile = this.mappedFileQueue.getLastMappedFile();
    // Appends are serialized by this lock
    putMessageLock.lock();
    try {
        long beginLockTimestamp = this.defaultMessageStore.getSystemClock().now();
        this.beginTimeInLock = beginLockTimestamp;
        msg.setStoreTimestamp(beginLockTimestamp);
        // No file yet, or the last one is full: get a new one
        if (null == mappedFile || mappedFile.isFull()) {
            mappedFile = this.mappedFileQueue.getLastMappedFile(0);
        }
        if (null == mappedFile) {
            log.error("create mapped file1 error, topic:" + msg.getTopic() + "clientAddr:" + msg.getBornHostString());
            beginTimeInLock = 0;
            return new PutMessageResult(PutMessageStatus.CREATE_MAPEDFILE_FAILED, null);
        }
        result = mappedFile.appendMessage(msg, this.appendMessageCallback);
        switch (result.getStatus()) {
            case PUT_OK:
                break;
            case END_OF_FILE:
                // The message does not fit in the current file: switch to a new file and append again
                unlockMappedFile = mappedFile;
                mappedFile = this.mappedFileQueue.getLastMappedFile(0);
                if (null == mappedFile) {
                    log.error("create mapped file2 error, topic:" + msg.getTopic() + "clientAddr:" + msg.getBornHostString());
                    beginTimeInLock = 0;
                    return new PutMessageResult(PutMessageStatus.CREATE_MAPEDFILE_FAILED, result);
                }
                result = mappedFile.appendMessage(msg, this.appendMessageCallback);
                break;
            case MESSAGE_SIZE_EXCEEDED:
            case PROPERTIES_SIZE_EXCEEDED:
                beginTimeInLock = 0;
                return new PutMessageResult(PutMessageStatus.MESSAGE_ILLEGAL, result);
            case UNKNOWN_ERROR:
                beginTimeInLock = 0;
                return new PutMessageResult(PutMessageStatus.UNKNOWN_ERROR, result);
            default:
                beginTimeInLock = 0;
                return new PutMessageResult(PutMessageStatus.UNKNOWN_ERROR, result);
        }
        eclipseTimeInLock = this.defaultMessageStore.getSystemClock().now() - beginLockTimestamp;
        beginTimeInLock = 0;
    } finally {
        putMessageLock.unlock();
    }
    if (eclipseTimeInLock > 500) {
        log.warn("[NOTIFYME]putMessage in lock cost time(ms)={}, bodyLength={} AppendMessageResult={}", eclipseTimeInLock, msg.getBody().length, result);
    }
    if (null != unlockMappedFile && this.defaultMessageStore.getMessageStoreConfig().isWarmMapedFileEnable()) {
        this.defaultMessageStore.unlockMappedFile(unlockMappedFile);
    }
    PutMessageResult putMessageResult = new PutMessageResult(PutMessageStatus.PUT_OK, result);
    // Update statistics, then hand over to disk flush and HA handling
    storeStatsService.getSinglePutMessageTopicTimesTotal(msg.getTopic()).incrementAndGet();
    storeStatsService.getSinglePutMessageTopicSizeTotal(topic).addAndGet(result.getWroteBytes());
    handleDiskFlush(result, putMessageResult, msg);
    handleHA(result, putMessageResult, msg);
    return putMessageResult;
}
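Stripped of statistics, delay handling and logging, the core of putMessage is "append under a lock, and if the current file reports END_OF_FILE, switch to a fresh file and append once more". A toy model of that pattern might look like the sketch below. Everything in it is hypothetical (in-memory segments, lengths instead of real messages), and marking a segment as full when a message does not fit is a simplification of this sketch rather than a description of what RocketMQ writes at the end of a file.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.locks.ReentrantLock;

// Toy model of the append-then-roll-over pattern; all names are hypothetical.
final class RollingLogSketch {
    enum Status { PUT_OK, END_OF_FILE }

    // A fixed-size in-memory "file" identified by its starting offset.
    static final class Segment {
        final long fromOffset;
        final int capacity;
        int wrotePosition = 0;

        Segment(long fromOffset, int capacity) {
            this.fromOffset = fromOffset;
            this.capacity = capacity;
        }

        boolean isFull() {
            return wrotePosition == capacity;
        }

        // If the message does not fit, mark the segment as full and report END_OF_FILE.
        Status append(int messageLength) {
            if (wrotePosition + messageLength > capacity) {
                wrotePosition = capacity;
                return Status.END_OF_FILE;
            }
            wrotePosition += messageLength;
            return Status.PUT_OK;
        }
    }

    private final int segmentSize;
    private final List<Segment> segments = new ArrayList<>();
    private final ReentrantLock putLock = new ReentrantLock();

    RollingLogSketch(int segmentSize) {
        this.segmentSize = segmentSize;
    }

    // Returns the last segment, creating a new one if none exists or the last is full.
    private Segment lastOrCreate() {
        Segment last = segments.isEmpty() ? null : segments.get(segments.size() - 1);
        if (last == null || last.isFull()) {
            long from = (last == null) ? 0 : last.fromOffset + segmentSize;
            last = new Segment(from, segmentSize);
            segments.add(last);
        }
        return last;
    }

    // Appends a message of the given length and returns the global offset it was written at.
    long put(int messageLength) {
        if (messageLength > segmentSize) {
            throw new IllegalArgumentException("message larger than a segment");
        }
        putLock.lock();
        try {
            Segment segment = lastOrCreate();
            if (segment.append(messageLength) == Status.END_OF_FILE) {
                segment = lastOrCreate();      // the old segment is now full, so this creates the next one
                segment.append(messageLength); // fits, because the new segment is empty
            }
            return segment.fromOffset + segment.wrotePosition - messageLength;
        } finally {
            putLock.unlock();
        }
    }

    int segmentCount() {
        return segments.size();
    }

    public static void main(String[] args) {
        RollingLogSketch log = new RollingLogSketch(100);
        System.out.println(log.put(60)); // 0
        System.out.println(log.put(60)); // 100, written to a second segment
        System.out.println(log.segmentCount()); // 2
    }
}
```

Note that a message never straddles two segments in this model, which is what keeps the offset-to-file arithmetic simple.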
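As seen earlier, the CommitLog directory consists of files of 1 GB each. In the implementation, these are managed by org.apache.rocketmq.store.MappedFileQueue, which internally holds a queue of MappedFile objects. On the write path, the last file is always fetched through org.apache.rocketmq.store.MappedFileQueue#getLastMappedFile(). If no file has been created yet, or the last file is already full, org.apache.rocketmq.store.MappedFileQueue#getLastMappedFile(long) is called instead, which leads to the two-argument overload below: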
public MappedFile getLastMappedFile(final long startOffset, boolean needCreate) {
    long createOffset = -1;
    MappedFile mappedFileLast = getLastMappedFile();
    // No file yet: align the requested offset down to a file boundary
    if (mappedFileLast == null) {
        createOffset = startOffset - (startOffset % this.mappedFileSize);
    }
    // Last file is full: the new file starts right after it
    if (mappedFileLast != null && mappedFileLast.isFull()) {
        createOffset = mappedFileLast.getFileFromOffset() + this.mappedFileSize;
    }
    if (createOffset != -1 && needCreate) {
        String nextFilePath = this.storePath + File.separator + UtilAll.offset2FileName(createOffset);
        String nextNextFilePath = this.storePath + File.separator
            + UtilAll.offset2FileName(createOffset + this.mappedFileSize);
        MappedFile mappedFile = null;
        if (this.allocateMappedFileService != null) {
            // Asynchronous path: request this file and the one after it
            mappedFile = this.allocateMappedFileService.putRequestAndReturnMappedFile(nextFilePath,
                nextNextFilePath, this.mappedFileSize);
        } else {
            // Direct path: create the file in the calling thread
            try {
                mappedFile = new MappedFile(nextFilePath, this.mappedFileSize);
            } catch (IOException e) {
                log.error("create mappedFile exception", e);
            }
        }
        if (mappedFile != null) {
            if (this.mappedFiles.isEmpty()) {
                mappedFile.setFirstCreateInQueue(true);
            }
            this.mappedFiles.add(mappedFile);
        }
        return mappedFile;
    }
    return mappedFileLast;
}
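The decision about where the new file should start is the part of getLastMappedFile that is easiest to get wrong, so here it is in isolation. This is a sketch with hypothetical names; a null first argument stands in for "there is no last file".

```java
// Isolated version of the createOffset decision; names are hypothetical.
final class CreateOffsetSketch {
    // Returns the starting offset of the file to create, or -1 if nothing needs creating.
    // lastFromOffset == null means no file exists yet.
    static long createOffset(Long lastFromOffset, boolean lastIsFull, long startOffset, long fileSize) {
        if (lastFromOffset == null) {
            // Align the requested offset down to a multiple of the file size.
            return startOffset - (startOffset % fileSize);
        }
        if (lastIsFull) {
            // The next file begins exactly one file size after the last one.
            return lastFromOffset + fileSize;
        }
        return -1; // the last file still has room
    }

    public static void main(String[] args) {
        long gb = 1024L * 1024 * 1024;
        System.out.println(createOffset(null, false, 1_500_000_000L, gb)); // 1073741824
        System.out.println(createOffset(0L, true, 0L, gb));                // 1073741824
        System.out.println(createOffset(0L, false, 0L, gb));               // -1
    }
}
```

The first case explains why putMessage can simply pass 0 as the start offset: the argument only matters when the queue is empty.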
First, look at the direct creation path:
public MappedFile(final String fileName, final int fileSize) throws IOException {
    init(fileName, fileSize);
}

private void init(final String fileName, final int fileSize) throws IOException {
    this.fileName = fileName;
    this.fileSize = fileSize;
    this.file = new File(fileName);
    // The file name is the starting offset
    this.fileFromOffset = Long.parseLong(this.file.getName());
    boolean ok = false;
    ensureDirOK(this.file.getParent());
    try {
        // Open the file and map the whole of it into memory
        this.fileChannel = new RandomAccessFile(this.file, "rw").getChannel();
        this.mappedByteBuffer = this.fileChannel.map(MapMode.READ_WRITE, 0, fileSize);
        TOTAL_MAPPED_VIRTUAL_MEMORY.addAndGet(fileSize);
        TOTAL_MAPPED_FILES.incrementAndGet();
        ok = true;
    } catch (FileNotFoundException e) {
        log.error("create file channel" + this.fileName + "Failed.", e);
        throw e;
    } catch (IOException e) {
        log.error("map file" + this.fileName + "Failed.", e);
        throw e;
    } finally {
        if (!ok && this.fileChannel != null) {
            this.fileChannel.close();
        }
    }
}
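To see the two ideas in init working outside RocketMQ (parsing the starting offset from the file name, and mapping the file with FileChannel.map), the following standalone sketch may help. The class name, the temporary directory and the 4096-byte size are all assumptions made for the example; only the JDK calls themselves mirror the code above.

```java
import java.io.IOException;
import java.io.RandomAccessFile;
import java.io.UncheckedIOException;
import java.nio.MappedByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;

// Standalone illustration of memory-mapping a file; names are hypothetical.
final class MmapSketch {
    // The file name itself encodes the starting offset.
    static long startOffsetOf(Path file) {
        return Long.parseLong(file.getFileName().toString());
    }

    // Maps the file read-write, writes the payload at position 0 and reads it back.
    static String writeAndReadBack(Path file, int fileSize, String payload) throws IOException {
        byte[] bytes = payload.getBytes(StandardCharsets.UTF_8);
        try (RandomAccessFile raf = new RandomAccessFile(file.toFile(), "rw");
             FileChannel channel = raf.getChannel()) {
            // Mapping a READ_WRITE region beyond the current length grows the file.
            MappedByteBuffer buffer = channel.map(FileChannel.MapMode.READ_WRITE, 0, fileSize);
            buffer.put(bytes);
            buffer.force(); // ask the OS to write the dirty pages to the file
            byte[] read = new byte[bytes.length];
            buffer.position(0);
            buffer.get(read);
            return new String(read, StandardCharsets.UTF_8);
        }
    }

    // Runs the example against a temporary file named like a CommitLog file.
    static String demo() {
        try {
            Path dir = Files.createTempDirectory("commitlog-sketch");
            Path file = dir.resolve("00000000000000004096");
            dir.toFile().deleteOnExit();
            file.toFile().deleteOnExit();
            String text = writeAndReadBack(file, 4096, "hello commitlog");
            return startOffsetOf(file) + ":" + text;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(demo()); // 4096:hello commitlog
    }
}
```

A real CommitLog file is mapped at its full size up front, which is why creating one is expensive enough to be worth doing ahead of time.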
If the request is handed to AllocateMappedFileService instead, some asynchronous work is involved:
public MappedFile putRequestAndReturnMappedFile(String nextFilePath, String nextNextFilePath, int fileSize) {
    // Up to two requests per call: the file needed now and the one after it
    int canSubmitRequests = 2;
    if (this.messageStore.getMessageStoreConfig().isTransientStorePoolEnable()) {
        if (this.messageStore.getMessageStoreConfig().isFastFailIfNoBufferInStorePool()
            && BrokerRole.SLAVE != this.messageStore.getMessageStoreConfig().getBrokerRole()) {
            canSubmitRequests = this.messageStore.getTransientStorePool().remainBufferNumbs() - this.requestQueue.size();
        }
    }

    AllocateRequest nextReq = new AllocateRequest(nextFilePath, fileSize);
    boolean nextPutOK = this.requestTable.putIfAbsent(nextFilePath, nextReq) == null;
    if (nextPutOK) {
        if (canSubmitRequests <= 0) {
            log.warn("[NOTIFYME]TransientStorePool is not enough, so create mapped file error," +
                "RequestQueueSize : {}, StorePoolSize: {}", this.requestQueue.size(), this.messageStore.getTransientStorePool().remainBufferNumbs());
            this.requestTable.remove(nextFilePath);
            return null;
        }
        boolean offerOK = this.requestQueue.offer(nextReq);
        if (!offerOK) {
            log.warn("never expected here, add a request to preallocate queue failed");
        }
        canSubmitRequests--;
    }

    AllocateRequest nextNextReq = new AllocateRequest(nextNextFilePath, fileSize);
    boolean nextNextPutOK = this.requestTable.putIfAbsent(nextNextFilePath, nextNextReq) == null;
    if (nextNextPutOK) {
        if (canSubmitRequests <= 0) {
            log.warn("[NOTIFYME]TransientStorePool is not enough, so skip preallocate mapped file," +
                "RequestQueueSize : {}, StorePoolSize: {}", this.requestQueue.size(), this.messageStore.getTransientStorePool().remainBufferNumbs());
            this.requestTable.remove(nextNextFilePath);
        } else {
            boolean offerOK = this.requestQueue.offer(nextNextReq);
            if (!offerOK) {
                log.warn("never expected here, add a request to preallocate queue failed");
            }
        }
    }

    if (hasException) {
        log.warn(this.getServiceName() + "service has exception. so return null");
        return null;
    }

    // Wait only for the file needed now; the next-next file is left to finish in the background
    AllocateRequest result = this.requestTable.get(nextFilePath);
    try {
        if (result != null) {
            boolean waitOK = result.getCountDownLatch().await(waitTimeOut, TimeUnit.MILLISECONDS);
            if (!waitOK) {
                log.warn("create mmap timeout" + result.getFilePath() + " " + result.getFileSize());
                return null;
            } else {
                this.requestTable.remove(nextFilePath);
                return result.getMappedFile();
            }
        } else {
            log.error("find preallocate mmap failed, this never happen");
        }
    } catch (InterruptedException e) {
        log.warn(this.getServiceName() + "service has exception.", e);
    }
    return null;
}
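The handshake in putRequestAndReturnMappedFile boils down to three pieces: a table that deduplicates requests by path, a queue that feeds a background thread, and a CountDownLatch per request that the caller waits on. The sketch below reproduces just that shape. It is a simplification under stated assumptions: the names are hypothetical, a String stands in for the MappedFile, the queue is a plain LinkedBlockingQueue, and the store-pool accounting and exception flag are left out.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

// Request table + queue + latch handshake; all names are hypothetical.
final class AllocateHandshakeSketch {
    static final class Request {
        final String path;
        final CountDownLatch done = new CountDownLatch(1);
        volatile String result; // stands in for the MappedFile

        Request(String path) {
            this.path = path;
        }
    }

    private final ConcurrentMap<String, Request> table = new ConcurrentHashMap<>();
    private final BlockingQueue<Request> queue = new LinkedBlockingQueue<>();

    AllocateHandshakeSketch() {
        Thread worker = new Thread(() -> {
            try {
                while (true) {
                    Request req = queue.take();
                    if (table.get(req.path) != req) {
                        continue; // request was withdrawn, skip it
                    }
                    req.result = "mapped:" + req.path; // the expensive work would happen here
                    req.done.countDown();
                }
            } catch (InterruptedException e) {
                // interrupted: stop the worker
            }
        });
        worker.setDaemon(true);
        worker.start();
    }

    // Registers a request unless one for the same path is already pending or finished.
    private void submit(String path) {
        Request req = new Request(path);
        if (table.putIfAbsent(path, req) == null) {
            queue.offer(req);
        }
    }

    // Requests both files but waits only for the first; returns null on timeout or interrupt.
    String allocate(String next, String nextNext, long timeoutMs) {
        submit(next);
        submit(nextNext);
        Request req = table.get(next);
        try {
            if (req == null || !req.done.await(timeoutMs, TimeUnit.MILLISECONDS)) {
                return null;
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return null;
        }
        table.remove(next);
        return req.result;
    }

    public static void main(String[] args) {
        AllocateHandshakeSketch service = new AllocateHandshakeSketch();
        System.out.println(service.allocate("a", "b", 1000)); // mapped:a
        System.out.println(service.allocate("b", "c", 1000)); // mapped:b, requested by the first call
    }
}
```

In the second call the request for "b" is already in the table, so the caller usually finds the latch already released. That is the benefit of always asking for the next-next file as well.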
The file operations themselves are carried out by the run method of AllocateMappedFileService:
public void run() {
    log.info(this.getServiceName() + "service started");
    // Keep serving requests until stopped or interrupted
    while (!this.isStopped() && this.mmapOperation()) {
    }
    log.info(this.getServiceName() + "service end");
}

private boolean mmapOperation() {
    boolean isSuccess = false;
    AllocateRequest req = null;
    try {
        req = this.requestQueue.take();
        // Skip requests that are no longer in the table, for example after a timeout
        AllocateRequest expectedRequest = this.requestTable.get(req.getFilePath());
        if (null == expectedRequest) {
            log.warn("this mmap request expired, maybe cause timeout" + req.getFilePath() + " "
                + req.getFileSize());
            return true;
        }
        if (expectedRequest != req) {
            log.warn("never expected here, maybe cause timeout" + req.getFilePath() + " "
                + req.getFileSize() + ", req:" + req + ", expectedRequest:" + expectedRequest);
            return true;
        }
        if (req.getMappedFile() == null) {
            long beginTime = System.currentTimeMillis();
            MappedFile mappedFile;
            if (messageStore.getMessageStoreConfig().isTransientStorePoolEnable()) {
                try {
                    mappedFile = ServiceLoader.load(MappedFile.class).iterator().next();
                    mappedFile.init(req.getFilePath(), req.getFileSize(), messageStore.getTransientStorePool());
                } catch (RuntimeException e) {
                    log.warn("Use default implementation.");
                    mappedFile = new MappedFile(req.getFilePath(), req.getFileSize(), messageStore.getTransientStorePool());
                }
            } else {
                mappedFile = new MappedFile(req.getFilePath(), req.getFileSize());
            }
            long eclipseTime = UtilAll.computeEclipseTimeMilliseconds(beginTime);
            if (eclipseTime > 10) {
                int queueSize = this.requestQueue.size();
                log.warn("create mappedFile spent time(ms)" + eclipseTime + "queue size" + queueSize
                    + "" + req.getFilePath() + " " + req.getFileSize());
            }
            // Warm up the mapping if the file is CommitLog-sized and warm-up is enabled
            if (mappedFile.getFileSize() >= this.messageStore.getMessageStoreConfig()
                .getMapedFileSizeCommitLog()
                && this.messageStore.getMessageStoreConfig().isWarmMapedFileEnable()) {
                mappedFile.warmMappedFile(this.messageStore.getMessageStoreConfig().getFlushDiskType(),
                    this.messageStore.getMessageStoreConfig().getFlushLeastPagesWhenWarmMapedFile());
            }
            req.setMappedFile(mappedFile);
            this.hasException = false;
            isSuccess = true;
        }
    } catch (InterruptedException e) {
        log.warn(this.getServiceName() + "interrupted, possibly by shutdown.");
        this.hasException = true;
        return false;
    } catch (IOException e) {
        log.warn(this.getServiceName() + "service has exception.", e);
        this.hasException = true;
        // Put the request back so it is retried
        if (null != req) {
            requestQueue.offer(req);
            try {
                Thread.sleep(1);
            } catch (InterruptedException ignored) {
            }
        }
    } finally {
        // Release the waiting caller only when the file was actually created
        if (req != null && isSuccess)
            req.getCountDownLatch().countDown();
    }
    return true;
}
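The body of warmMappedFile is not shown in this article, so the following is not a reproduction of it. As a rough illustration of what warming a mapped buffer generally involves, one common technique is to write a byte into every page so that the operating system backs the whole mapping with physical memory before real traffic arrives. The class name and the 4 KiB page size are assumptions of this sketch.

```java
import java.nio.ByteBuffer;

// Generic page pre-touch illustration; not RocketMQ's warmMappedFile.
final class WarmupSketch {
    static final int PAGE_SIZE = 4096; // assumed OS page size

    // Writes a zero byte at the start of every page and returns the number of pages touched.
    static int touchPages(ByteBuffer buffer) {
        int touched = 0;
        for (int i = 0; i < buffer.capacity(); i += PAGE_SIZE) {
            buffer.put(i, (byte) 0); // absolute put, does not move the buffer position
            touched++;
        }
        return touched;
    }

    public static void main(String[] args) {
        // A heap buffer keeps the example self-contained; a MappedByteBuffer works the same way.
        ByteBuffer buffer = ByteBuffer.allocate(3 * PAGE_SIZE + 1);
        System.out.println(touchPages(buffer)); // 4
    }
}
```

On a 1 GB file this means roughly 262,144 page touches, which is one reason the work is pushed onto the background thread rather than done inside the put lock.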