Springboot整合RabbitMq测试TTL的方法详解

网友投稿 302 2022-08-24


Springboot整合RabbitMq测试TTL的方法详解

目录什么是TTL?如何设置TTL?设定整个队列的过期时间配置类编写测试配置测试总结代码下载

什么是TTL?

在RabbitMq中,存在一种高级特性 TTL。

TTL即Time To Live的缩写,含义为存活时间或者过期时间。即:

设定消息在队列中存活的时间。当指定时间内,消息依旧未被消费,则由队列自动将其删除。

如何设置TTL?

既然涉及到设定消息的存活时间,在RabbitMq中,存在两种设置方式:

设置整个队列的过期时间。设置单个消息的过期时间。

设定整个队列的过期时间

按照上一篇文章的依赖导入和配置编写方式进行。

Springboot——整合Rabbitmq之ConfirmIimBoM和Return详解

配置类编写

在原有基础之上,新创建几个配置的bean类,申明bean对象,并进行交换机与队列的关联,如下所示:、

package cn.linkpower.config;

import org.springframework.amqp.core.*;

import org.springframework.beans.factory.annotation.Qualifier;

import org.springframework.context.annotation.Bean;

import org.springframework.context.annotation.Configuration;

import java.util.HashMap;

import java.util.Map;

@Configuration

public class MQConfiguration {

// =========================== Direct 直连模式 ==================================

//队列名称

public static final String QUEUQ_NAME = "xiangjiao.queue";

//交换器名称

public static final String EXCHANGE = "xiangjiao.exchange";

//路由key

public static final String ROUTING_KEY = "xiangjiao.routingKey";

// =========================== Direct 普通队列申明 和 交换机绑定 ===================

//创建队列

@Bean(value = "getQueue")

public Queue getQueue(){

//QueueBuilder.durable(QUEUQ_NAME).build();

return new Queue(QUEUQ_NAME);

}

//实例化交换机

@Bean(value = "getDirectExchange")

public DirectExchange getDirectExchange(){

//DirectExchange(String name, boolean durable, boolean autoDelete)

/**

* 参数一:交换机名称;

* 参数二:是否永久;

* 参数三:是否自动删除;

*/

//ExchangeBuilder.directExchange(EXCHANGE).durable(true).build();

return new DirectExchange(EXCHANGE, true, false);

//绑定消息队列和交换机

@Bean

public Binding bindExchangeAndQueue(@Qualifier(value = "getDirectExchange") DirectExchange exchange,

@Qualifier(value = "getQueue") Queue queue){

return BindingBuilder.bind(queue).to(exchange).with(ROUTING_KEY);

// =========================== TTL ================================

public static final String ttl_queue_name = "xiangjiao.ttl.queue";

public static final String ttl_exchange_name = "xiangjiao.ttl.exchange";

public static final String ttl_routing_key = "xiangjiao.ttl.routingKey";

@Bean(value = "getTtlQueue")

public Queue getTtlQueue(){

// 设置 ttl 队列,并设定 x-message-ttl 参数,表示 消息存活最大时间,单位 ms

//return QueueBuilder.durable(ttl_queue_name).withArgument("x-message-ttl",10000).build();

Map arguments = new HashMap<>();

arguments.put("x-message-ttl",10000);

return new Queue(ttl_queue_name,true,false,false,arguments);

@Bean(value = "getTTlExchange")

public DirectExchange getTTlExchange(){

// 设置交换机属性,并保证交换机持久化

return new DirectExchange(ttl_exchange_name, true, false);

public Binding bindExchangeAndQueueTTL(@Qualifier(value = "getTTlExchange") DirectExchange getTTlExchange,

@Qualifier(value = "getTtlQueue") Queue queue){

return BindingBuilder.bind(queue).to(getTTlExchange).with(ttl_routing_key);

}

对比原有的配置类,不难发现区别:

对队列设置过期属性,只需要传递一个 x-message-ttl 的属性值即可。(单位:ms)

Map arguments = new HashMap<>();

arguments.put("x-message-ttl",10000);

return new Queue(ttl_queue_name,true,false,false,arguments);

然后定义交换机类型,并将指定的交换机和队列进行绑定。

为了测试效果,暂未定义任何该队列的消费者信息。

测试

为了便于测试,需要定义一个接口,生产新的数据信息,并将数据向对应的Exchange中传递。

/**

* 发送消息,指定ttl参数信息(队列)

* @return

*/

@RequestMapping("/sendQueueTtl")

@ResponseBody

public String sendQueueTtl(){

//发送10条消息

for (int i = 0; i < 10; i++) {

String msg = "msg"+i;

System.out.println("发送消息 msg:"+msg);

rabbitmqService.sendMessage(MQConfiguration.ttl_exchange_name,MQConfiguration.ttl_routing_key,msg);

//每两秒发送一次

try {

Thread.sleep(8000);

} catch (InterruptedException e) {

e.printStackTrace();

}

}

return "send ok";

}

两条消息之间的过期时间为8s。

请求链接进行测试,查看Rabbitmq web视图信息:

httIimBoMp://localhost/sendQueueTtl

查看控制台输出日志:

消息正常发送到了Exchange,同时Exchange 也将消息推送到了指定的队列 !

设置有Confirm和Return监听。

【说明:】

给队列设定时间后,单位时间内的消息如果未被消费,则队列会将其中的数据进行删除处理。

对单个消息设定过期时间

上面的操作和测试,已经验证对队列设定过期时间,会导致所有的消息过期时间都是一样的现象。

但实际开发中,可能一个队列需要存放不同过期时间的消息信息,如果需要进行实现,就不能再设定队列的过期时间信息了,需要采取下面要说到的针对单个消息,设置不同过期时间。

配置

既然是针对单个消息设定不同的过期时间操作,则需要去掉队列过期设置。

为了测试的简单化,此处采取直连 Direct 交换机类型,进行交换机和队列数据的绑定方式。如下所示:

// =========================== Direct 直连模式 ==================================

//队列名称

public static final String QUEUQ_NAME = "xiangjiao.queue";

//交换器名称

public static final String EXCHANGE = "xiangjiao.exchange";

//路由key

public static final String ROUTING_KEY = "xiangjiao.routingKey";

// =========================== Direct 普通队列申明 和 交换机绑定 ===================

//创建队列

@Bean(value = "getQueue")

public Queue getQueue(){

//QueueBuilder.durable(QUEUQ_NAME).build();

return new Queue(QUEUQ_NAME);

}

//实例化交换机

@Bean(value = "getDirectExchange")

public DirectExchange getDirectExchange(){

//DirectExchange(String name, boolean durable, boolean autoDelete)

/**

* 参数一:交换机名称;

* 参数二:是否永久;

* 参数三:是否自动删除;

*/

//ExchangeBuilder.directExchange(EXCHANGE).durable(true).build();

return new DirectExchange(EXCHANGE, true, false);

}

//绑定消息队列和交换机

@Bean

public Binding bindExchangeAndQueue(@Qualifier(value = "getDirectExchange") DirectExchange exchange,

@Qualifier(value = "getQueue") Queue queue){

return BindingBuilder.bind(queue).to(exchange).with(ROUTING_KEY);

}

对于消息的发送,依旧沿用之前写的发送处理方式

设定confirm和return监听,保证消息能够正常到达指定的队列中。

测试

编写一个测试的接口,设定单个消息的过期时间属性,保证不同消息具备不同的过期时间。

在之前博客中,针对消息的持久化设置,需要保证消息向队列设定属性时,传递一个deliveryMode参数值信息。

同理,设定每个消息的过期时间,也需要设定对应的属性信息。如下所示:

/**

* 发送消息,指定ttl参数信息(单个消息);

* 测试需要将消息消费者关闭监听

* @return

*/

@RequestMapping("/sendTtl")

@ResponseBody

public String sendTtl(){

//发送10条消息

for (int i = 0; i < 10; i++) {

String msg = "msg"+i;

System.out.println("发送消息 msg:"+msg);

MessageProperties messageProperties = new MessageProperties();

messageProperties.setExpiration("5000"); // 针对消息设定时限

// 将消息数据和设置属性进行封装,采取消息发送模板,将消息数据推送至指定的交换机 exchange 中

Message message = new Message(msg.getBytes(), messageProperties);

rabbitmqService.sendMessage(MQConfiguration.EXCHANGE, MQConfiguration.ROUTING_KEY,message);

//每两秒发送一次

try {

Thread.sleep(3000);

} catch (InterruptedException e) {

e.printStackTrace();

}

}

return "send ok";

}

上面代码的编写核心为将消息内容体和消息对象属性进行封装。

MessageProperties messageProperties = new MessageProperties();

messageProperties.setExpiration("5000"); // 针对消息设定时限

// 将消息数据和设置属性进行封装,采取消息发送模板,将消息数据推送至指定的交换机 exchange 中

Message message = new Message(msg.getBytes(), messageProperties);

引申一点:消息的持久化Springboot 2.x ——RabbitTemplate为什么会默认消息持久化?

请求连接进行测试:

http://localhost/sendTtl

查看控制台打印日志情况:

总结

1、设置队列过期时间使用参数:x-message-ttl,单位:ms(毫秒),会对整个队列消息统一过期。

2、设置消息过期时间使用参数:expiration。单位:ms(毫秒),当该消息在队列头部时(消费时),会单独判断这一消息是否过期。

3、如果两者都进行了设置,以时间短的为准。

代码下载

gitee 代码下载


版权声明:本文内容由网络用户投稿,版权归原作者所有,本站不拥有其著作权,亦不承担相应法律责任。如果您发现本站中有涉嫌抄袭或描述失实的内容,请联系我们jiasou666@gmail.com 处理,核实后本网站将在24小时内删除侵权内容。

上一篇:python_缺失值处理(python缺失值处理众数)
下一篇:python_rename(pythonrename函数)
相关文章

 发表评论

暂时没有评论,来抢沙发吧~