RocketMQ设计之故障规避机制

网友投稿 355 2022-08-19


RocketMQ设计之故障规避机制

NameServer为了简化和客户端通信,发现Broker故障时并不会立即通知客户端。故障规避机制就是用来解决当Broker出现故障,Producer不能及时感知而导致消息发送失败的问题。默认不开启,如果开启,消息发送失败的时候会将失败的Broker暂时排除在队列选择列表外

MQFaultStrategy类的:

public class MQFaultStrategy {

private final static InternalLogger log = ClientLogger.getLog();

private final LatencyFaultTolerance latencyFaultTolerance = new LatencyFaultToleranceImpl();

private boolean sendLatencyFaultEnable = false;

private long[] latencyMax = {50L, 100L, 550L, 1000L, 2000L, 3000L, 15000L};

private long[] notAvailableDuration = {0L, 0L, 30000L, 60000L, 120000L, 180000L, 600000L};

public long[] getNotAvailableDuration() {

return notAvailableDuration;

}

public void setNotAvailableDuration(final long[] notAvailableDuration) {

this.notAvailableDuration = notAvailableDuration;

}

public long[] getLatencyMax() {

return latencyMax;

}

public void setLatencyMax(final long[] latencyMax) {

this.latencyMax = latencyMax;

}

public boolean isSendLatencyFaultEnable() {

return sendLatencyFaultEnable;

}

public void setSendLatencyFaultEnable(final boolean sendLatencyFaultEnable) {

this.sendLatencyFaultEnable = sendLatencyFaultEnable;

}

public MessageQueue selectOneMessageQueue(final TopicPublishInfo tpInfo, final String lastBrokerName) {

//是否开启故障延迟机制

if (this.sendLatencyFaultEnable) {

try {

int index = tpInfo.getSendWhichQueue().getAndIncrement();

for (int i = 0; i < tpInfo.getMessageQueueList().size(); i++) {

int pos = Math.abs(index++) % tpInfo.getMessageQueueList().size();

if (pos < 0)

pos = 0;

MessageQueue mq = tpInfo.getMessageQueueList().get(pos);

//判断Queue是否可用

if (latencyFaultTolerance.isAvailable(mq.getBrokerName())) {

if (null == lastBrokerName || mq.getBrokerName().equals(lastBrokerName))

return mq;

}

}

final String notBestBrokerrXZQO = latencyFaultTolerance.pickOneAtLeast();

int writeQueueNums = tpInfo.getQueueIdByBroker(notBestBroker);

if (writeQueueNums > 0) {

final MessageQueue mq = tpInfo.selectOneMessageQueue();

if (notBestBroker != null) {

mq.setBrokerName(notBestBroker);

mq.setQueueId(tpInfo.getSendWhichQueue().getAndIncrement() % writeQueueNums);

}

return mq;

} else {

latencyFaultTolerance.remove(notBestBroker);

}

} catch (Exception e) {

log.error("Error occurred when selecting message queue", e);

}

return tpInfo.selectOneMessageQueue();

}

//默认轮询

return tpInfo.selectOneMessageQueue(lastBrokerName);

}

public void updateFaultItem(final String brokerName, final long currentLatency, boolean isolation) {

if (this.sendLatencyFaultEnable) {

long duration = computeNotAvailableDuration(isolation ? 30000 : currentLatency);

this.latencyFaultTolerance.updateFaultItem(brokerName, currentLatency, duration);

}

}

private long computeNotAvailableDuration(final long currentLatency) {

for (int i = latencyMax.length - 1; i >= 0; i--) {

if (currentLatency >= latencyMax[i])

return this.notAvailableDuration[i];

}

return 0;

}

}

在选择查找路由时,选择消息队列的关键步骤:

先按轮询算法选择一个消息队列从故障列表判断该消息队列是否可用

LatencyFaultToleranceImpl中判断是否可用:

@Override

public boolean isAvailable(final String name) {

final FaultItem faultItem = this.faultItemTable.get(name);

if (faultItem != null) {

return faultItem.isAvailable();

}

return true;

}

public boolean isAvailable() {

return (System.currentTimeMillis() - startTimestamp) >= 0;

}

判断是否在故障列表中,不在故障列表中代表可用。在故障列表中判断当前时间是否大于等于故障规避的开始时间startTimestamp

在消息发送结束后和发送出现异常时调用updateFaultItem()方法来更新故障列表,computeNotAvailableDuration()根据响应时间来计算故障周期时长,响应时间越长故障周期越长。网络异常、Broker异常、客户端异常都是固定响应时长30s,它们故障周期时长为10分钟。消息发送成功或线程中断异常响应时间在100毫秒以内,故障周期时长为0。

LatencyFaultToleranceImpl类的updateFaultItem方法:

@Override

public void updateFaultItem(final String name, final long currentLatency, final long notAvailableDuration) {

FaultItem old = this.faultItemTable.get(name);

if (null == old) {

final FaultItem faultItem = new FaultItem(name);

faultItem.setCurrentLatency(currentLatency);

faultItem.setStartTimestamp(System.currentTimeMillis() + notAvailableDuration);

//加入故障列表

old = this.faultItemTable.putIfAbsent(name, faultItem);

if (old != null) {

old.setCurrentLatency(currentLatency);

old.setStartTimestamp(System.currentTimeMillis() + notAvailableDuration);

}

} else {

old.setCurrentLatency(currentLatency);

old.setStartTimestamp(System.currentTimeMillis() + notAvailableDuration);

}

}

FaultItem存储Broker名称、响应时长、故障规避开始时间,最重要的是故障规避开始时间,用来判断Queue是否可用


版权声明:本文内容由网络用户投稿,版权归原作者所有,本站不拥有其著作权,亦不承担相应法律责任。如果您发现本站中有涉嫌抄袭或描述失实的内容,请联系我们jiasou666@gmail.com 处理,核实后本网站将在24小时内删除侵权内容。

上一篇:spring配置不扫描service层的原因解答
下一篇:使用SpringMVC在redirect重定向的时候携带参数的问题
相关文章

 发表评论

暂时没有评论,来抢沙发吧~