RocketMQ设计之同步刷盘

网友投稿 349 2022-08-19


RocketMQ设计之同步刷盘

同步刷盘方式:在返回http://写成功状态时,消息已经被写入磁盘。具体流程是,消息写入内存的PAGECACHE后,立刻通知刷盘线程刷盘,然后等待刷盘完成,刷盘线程执行完成后唤醒等待的线程,返回消息写成功的状态。

在同步刷盘模式下,当消息写到内存后,会等待数据写到磁盘的CommitLog文件。

CommitLog的handleDiskFlush方法:

public void handleDiskFlush(AppendMessageResult result, PutMessageResult putMessageResult, MessageExt messageExt) {

// Synchronization flush

if (FlushDiskType.SYNC_FLUSH == this.defaultMessageStore.getMessageStoreConfig().getFlushDiskType()) {

final GroupCommitService service = (GroupCommitService) this.flushCommitLogService;

if (messageExt.isWaitStoreMsgOK()) {

GroupCommitRequest request = new GroupCommitRequest(result.getWroteOffset() + result.getWroteBytes());

service.putRequest(request);

boolean flushOK = request.waitForFlush(this.defaultMessageStore.getMessageStoreConfig().getSyncFlushTimeout());

if (!flushOK) {

log.error("do groupcommit, wait for flush failed, topic: " + messageExt.getTopic() + " tags: " + messageExt.getTags()

+ " client address: " + messageExt.getBornHostString());

putMessageResult.setPutMessageStatus(PutMessageStatus.FLUSH_DISK_TIMEOUT);

}

} else {

service.wakeup();

}

}

// Asynchronous flush

else {

if (!this.defaultMessageStore.getMessageStoreConfig().isTransientStorePoolEnable()) {

flushCommitLogService.wakeup();

} else {

commitLogService.wakeup();

}

}

}

class GroupCommitService extends FlushCommitLogService {

private volatile List requestsWrite = new ArrayList();

private volatile List requestsRead = new ArrayList();

//提交刷盘任务到任务列表

public synchronized void putRequest(final GroupCommitRequest request) {

synchronized (this.requestsWrite) {

this.requestsWrite.add(request);

}

if (hasNotified.compareAndSet(false, true)) {

waitPoint.countDown(); // notify

}

}

private void swapRequests() {

List tmp = this.requestsWrite;

this.requestsWrite = this.requestsRead;

this.requestsRead = tmp;

}

private void doCommit() {

synchronized (this.requestsRead) {

if (!this.requestsRead.isEmpty()) {

for (GroupCommitRequest req : this.requestsRead) {

// There may be a message in the next file, so a maximum of

// two times the flush

boolean flushOK = false;

for (int i = 0; i < 2 && !flushOK; i++) {

flushOK = CommitLog.this.mappedFileQueue.getFlushedWhere() >= req.getNextOffset();

if (!flushOK) {

CommitLog.this.mappedFileQueue.flush(0);

}

}

req.wakeupCustomer(flushOK);

}

long storeTimestamp = CommitLog.this.mappedFileQueue.getStoreTimestamp();

if (storeTimestamp > 0) {

CommitLog.this.defaultMessageStore.getStoreCheckpoint().setPhysicmsgTimestamp(storeTimestamp);

}

this.requestsRead.clear();

} else {

// Because of individual messages is set to not sync flush, it

// will come to this process

CommitLog.this.mappedFileQueue.flush(0);

}

}

}

public void run() {

CommitLog.log.info(this.getServiceName() + " service started");

while (!this.isStopped()) {

try {

this.waitForRunning(10);

this.doCommit();

} catch (Exception e) {

CommitLog.lohttp://g.warn(this.getServiceName() + " service has exception. ", e);

}

}

// Under normal circumstances shutdown, wait for the arrival of the

// request, and then flush

try {

Thread.sleep(10);

} catch (InterruptedException e) {

CommitLog.log.warn("GroupCommitService Exception, ", e);

}

synchronized (this) {

this.swapRequests();

}

this.doCommit();

CommitLog.log.info(this.getServiceName() + " service end");

}

@Override

protected void onWaitEnd() {

this.swapRequests();

}

@Override

public String getServiceName() {

return GroupCommitService.class.getSimpleName();

}

@Override

public long getJointime() {

return 1000 * 60 * 5;

}

}

GroupCommitRequest是刷盘任务,提交刷盘任务后,会在刷盘队列中等待刷盘,而刷盘线程

GroupCommitService每隔10毫秒写一批数据到磁盘。之所以不直接写是磁盘io压力大,写入性能低,每隔10毫秒写一次可以提升磁盘io效率和写入性能。

putRequest(request) 提交刷盘任务到任务列表request.waitForFlush同步等待GroupCommitService将任务列表中的任务刷盘完成。

两个队列读写分离,requestsWrite是写队列,用户保存添加进来的刷盘任务,requestsRead是读队列,在刷盘之前会把写队列的数据放入读队列。

CommitLog的doCommit方法:

private void doCommit() {

synchronized (this.requestsRead) {

if (!this.requestsRead.isEmpty()) {

for (GroupCommitRequest req : this.requestsRead) {

// There may be a message in the next file, so a maximum of

// two times the flush

boolean flushOK = false;

for (int i = 0; i < 2 && !flushOK; i++) {

//根据offset确定是否已经刷盘

flushOK = CommitLog.this.mappedFileQueue.getFlushedWhere() >= req.getNextOffset();

if (!flushOK) {

CommitLog.this.mappedFileQueue.flush(0);

}

}

req.wakeupCustomer(flushOK);

}

long storeTimestamp = CommitLog.this.mappedFileQueue.getStoreTimestamp();

if (storeTimestamp > 0) {

CommitLog.this.defaultMessageStore.getStoreCheckpoint().setPhysicMsgTimestamp(storeTimestamp);

}

//清空已刷盘的列表

this.requestsRead.clear();

} else {

// Because of individual messages is set to not sync flush, it

// will come to this process

CommitLog.this.mappedFileQueue.flush(0);

}

}

}

刷盘的时候依次读取requestsRead中的数据写入磁盘,写入完成后清空requestsRead。

读写分离设计的目的是在刷盘时不影响任务提交到列表。

CommitLog.this.mappedFileQueue.flush(0);是刷盘操作:

public boolean flush(final int flushLeastPages) {

boolean result = true;

MappedFile mappedFile =sxWiVxJoBY this.findMappedFileByOffset(this.flushedWhere, this.flushedWhere == 0);

if (mappedFile != null) {

long tmpTimeStamp = mappedFile.getStoreTimestamp();

int offset = mappedFile.flush(flushLeastPages);

long where = mappedFile.getFileFromOffset() + offset;

result = where == this.flushedWhere;

this.flushedWhere = where;

if (0 == flushLeastPages) {

this.storeTimestamp = tmpTimeStamp;

}

}

return result;

}

通过MappedFile映射的CommitLog文件写入磁盘

这就是RocketMQ高可用设计之同步刷盘的基本情况了,大体思路就是一个读写分离的队列来刷盘,同步刷盘任务提交后会在刷盘队列中等待刷盘完成后再返回,而GroupCommitService每隔10毫秒写一批数据到磁盘。


版权声明:本文内容由网络用户投稿,版权归原作者所有,本站不拥有其著作权,亦不承担相应法律责任。如果您发现本站中有涉嫌抄袭或描述失实的内容,请联系我们jiasou666@gmail.com 处理,核实后本网站将在24小时内删除侵权内容。

上一篇:Java超详细分析泛型与通配符
下一篇:spring配置不扫描service层的原因解答
相关文章