RocketMQ

网友投稿 292 2022-10-14


RocketMQ

概述

RocketMQ 支持发送延迟消息,但不支持任意时间的延迟消息的设置,仅支持内置预设值的延迟时间间隔的延迟消息;

预设值的延迟时间间隔为:

1s、 5s、 10s、 30s、 1m、 2m、 3m、 4m、 5m、 6m、 7m、 8m、 9m、 10m、 20m、 30m、 1h、 2h;

在消息创建的时候,调用 setDelayTimeLevel(int level) 方法设置延迟时间;

broker在接收到延迟消息的时候会把对应延迟级别的消息先存储到对应的延迟队列中,等延迟消息时间到达时,会把消息重新存储到对应的topic的queue里面。

Broker处理延迟消息

CommitLog.putMessage()

//获取消息的sysflag

final int tranType = MessageSysFlag.getTransactionValue(msg.getSysFlag());

//非事务消息 或 已commit事务消息

if (tranType == MessageSysFlag.TRANSACTION_NOT_TYPE

|| tranType == MessageSysFlag.TRANSACTION_COMMIT_TYPE) {

// Delay Delivery 判断消息是否设置延迟

if (msg.getDelayTimeLevel() > 0) {

//判断延迟级别是否大于最大级别,如果大于最大值,则将延迟级别设置为最大级

if (msg.getDelayTimeLevel() > this.defaultMessageStore.getSchttp://heduleMessageService().getMaxDelayLevel()) {

msg.setDelayTimeLevel(this.defaultMessageStore.getScheduleMessageService().getMaxDelayLevel());

}

//延迟消息的topic为 SCHEDULE_TOPIC_XXXX

topic = ScheduleMessageService.SCHEDULE_TOPIC;

//获取延迟级别,一个延迟级别对应一个Queue

queueId = ScheduleMessageService.delayLevel2QueueId(msg.getDelayTimeLevel());

// Backup real topic, queueId

//消息原始的topic,queueid保存到消息的property中

MessageAccessor.putProperty(msg, MessageConst.PROPERTY_REAL_TOPIC, msg.getTopic());

MessageAccessor.putProperty(msg, MessageConst.PROPERTY_REAL_QUEUE_ID, String.valueOf(msg.getQueueId()));

msg.setPropertiesString(MessageDecoder.messageProperties2String(msg.getProperties()));

msg.setTopic(topic);

msg.setQueueId(queueId);

}

}

1、判断消息类型,如果是非事务消息、HCqzNd已commit事务消息,才能处理延迟消息

2、判断消息是否设置延迟级别,如果延迟级别大于0,则该消息为延迟消息

3、判断延迟级别是否大于最大级别,如果大于最大值,则将延迟http://级别设置为最大级

4、延迟消息的topic为 SCHEDULE_TOPIC_XXXX

5、获取延迟级别,一个延迟级别对应一个Queue

6、消息原始的topic,queueid保存到消息的property中

7、修改消息的topci、queueid

启动延迟消息定时任务

ScheduleMessageService.start()

延迟消息投递


版权声明:本文内容由网络用户投稿,版权归原作者所有,本站不拥有其著作权,亦不承担相应法律责任。如果您发现本站中有涉嫌抄袭或描述失实的内容,请联系我们jiasou666@gmail.com 处理,核实后本网站将在24小时内删除侵权内容。

上一篇:RFC2544优化步长测试——信而泰网络测试仪实操
下一篇:BGP路径属性:
相关文章

 发表评论

暂时没有评论,来抢沙发吧~