SpringBoot集成RabbitMQ的方法(死信队列)

网友投稿 274 2023-01-07


SpringBoot集成RabbitMQ的方法(死信队列)

介绍

死信队列:没有被及时消费的消息存放的队列,消息没有被及时消费有以下几点原因:

1.有消息被拒绝(basic.reject/ basic.nack)并且requeue=false

2.队列达到最大长度

3.消息TTL过期

场景

1.小时进入初始队列,等待30分钟后进入5分钟队列

2.消息等待5分钟后进入执行队列

3.执行失败后重新回到5分钟队列

4.失败5次后,消息进入2小时队列

5.消息等待2小时进入执行队列

6.失败5次后,将消息丢弃或做其他处理

使用

安装MQ

使用docker方式安装,选择带mangement的版本

docker pull rabbitmq:management

docker run -d -p 5672:5672 -p 15672:15672 --name rabbitmq rabbitmq:management

访问 localhost: 15672,默认账号密码guest/guest

项目配置

(1)创建springboot项目

(2)在application.properties配置文件中配置mq连接信息

spring.rabbitmq.host=localhost

spring.rabbitmq.port=5672

spring.rabbitmq.username=guest

spring.rabbitmq.password=guest

(3)队列配置

package com.df.ps.mq;

import org.springframework.amqp.core.Binding;

import org.springframework.amqp.core.BindingBuilder;

import org.springframework.amqp.core.Queue;

import org.springframework.amqp.core.TopicExchange;

import org.springframework.amqp.rabbit.connection.ConnectionFactory;

import org.springframework.amqp.rabbit.lqTkkXistener.SimpleMessageListenerContainer;

import org.springframework.amqp.rabbit.listener.adapter.MessageListenerAdapter;

import org.springframework.beans.factory.annotation.Autowire;

import org.springframework.beans.factory.annotation.Value;

import org.springframework.context.annotation.Bean;

import org.springframework.context.annotation.Configuration;

import java.util.HashMap;

import java.util.Map;

@Configuration

public class MqConfig {

//time

@Value("${spring.df.buffered.min:120}")

private int springdfBufferedTime;

@Value("${spring.df.high-buffered.min:5}")

private int springdfHighBufferedTime;

@Value("${spring.df.low-buffered.min:120}")

private int springdfLowBufferedTime;

// 30min Buffered Queue

@Value("${spring.df.queue:spring-df-buffered-queue}")

private String springdfBufferedQueue;

@Value("${spring.df.topic:spring-df-buffered-topic}")

private String springdfBufferedTopic;

@Value("${spring.df.route:spring-df-buffered-route}")

private String springdfBufferedRouteKey;

// 5M Buffered Queue

@Value("${spring.df.high-buffered.queue:spring-df-high-buffered-queue}")

private String springdfHighBufferedQueue;

@Value("${spring.df.high-buffered.topic:spring-df-high-buffered-topic}")

private String springdfHighBufferedTopic;

@Value("${spring.df.high-buffered.route:spring-df-high-buffered-route}")

private String springdfHighBufferedRouteKey;

// High Queue

@Value("${spring.df.high.queue:spring-df-high-queue}")

private String springdfHighQueue;

@Value("${spring.df.high.topic:spring-df-high-topic}")

private String springdfHighTopic;

@Value("${spring.df.high.route:spring-df-high-route}")

private String springdfHighRouteKey;

// 2H Low Buffered Queue

@Value("${spring.df.low-bufferehttp://d.queue:spring-df-low-buffered-queue}")

private String springdfLowBufferedQueue;

@Value("${spring.df.low-buffered.topic:spring-df-low-buffered-topic}")

private String springdfLowBufferedTopic;

@Value("${spring.df.low-buffered.route:spring-df-low-buffered-route}")

private String springdfLowBufferedRouteKey;

// Low Queue

@Value("${spring.df.low.queue:spring-df-low-queue}")

private String springdfLowQueue;

@Value("${spring.df.low.topic:spring-df-low-topic}")

private String springdfLowTopic;

@Value("${spring.df.low.route:spring-df-low-route}")

private String springdfLowRouteKey;

@Bean(autowire = Autowire.BY_NAME, value = "springdfBufferedQueue")

Queue springdfBufferedQueue() {

int bufferedTime = 1000 * 60 * springdfBufferedTime;

return createBufferedQueue(springdfBufferedQueue, springdfHighBufferedTopic, springdfHighBufferedRouteKey, bufferedTime);

}

@Bean(autowire = Autowire.BY_NAME, value = "springdfHighBufferedQueue")

Queue springdfHighBufferedQueue() {

int highBufferedTime = 1000 * 60 * springdfHighBufferedTime;

return createBufferedQueue(springdfHighBufferedQueue, springdfHighTopic, springdfHighRouteKey, highBufferedTime);

}

@Bean(autowire = Autowire.BY_NAME, value = "springdfHighQueue")

Queue springdfHighQueue() {

return new Queue(springdfHighQueue, true);

}

@Bean(autowire = Autowire.BY_NAME, value = "springdfLowBufferedQueue")

Queue springdfLowBufferedQueue() {

int lowBufferedTime = 1000 * 60 * springdfLowBufferedTime;

return createBufferedQueue(springdfLowBufferedQueue, springdfLowTopic, springdfLowRouteKey, lowBufferedTime);

}

@Bean(autowire = Autowire.BY_NAME, value = "springdfLowQueue")

Queue springdfLowQueue() {

return new Queue(springdfLowQueue, true);

}

@Bean(autowire = Autowire.BY_NAME, value = "springdfBufferedTopic")

TopicExchange springdfBufferedTopic() {

return new TopicExchange(springdfBufferedTopic);

}

@Bean

Binding springBuffereddf(Queue springdfBufferedQueue, TopicExchange springdfBufferedTopic) {

return BindingBuilder.bind(springdfBufferedQueue).to(springdfBufferedTopic).with(springdfBufferedRouteKey);

}

@Bean(autowire = Autowire.BY_NAME, value = "springdfHighBufferedTopic")

TopicExchange springdfHighBufferedTopic() {

return new TopicExchange(springdfHighBufferedTopic);

}

@Bean

Binding springHighBuffereddf(Queue springdfHighBufferedQueue, TopicExchange springdfHighBufferedTopic) {

return BindingBuilder.bind(springdfHighBufferedQueue).to(springdfHighBufferedTopic).with(springdfHighBufferedRouteKey);

}

@Bean(autowire = Autowire.BY_NAME, value = "springdfHighTopic")

TopicExchange springdfHighTopic() {

return new TopicExchange(springdfHighTopic);

}

@Bean

Binding springHighdf(Queue springdfHighQueue, TopicExchange springdfHighTopic) {

return BindingBuilder.bind(springdfHighQueue).to(springdfHighTopic).with(springdfHighRouteKey);

}

@Bean(autowire = Autowire.BY_NAME, value = "springdfLowBufferedTopic")

TopicExchange springdfLowBufferedTopic() {

return new TopicExchange(springdfLowBufferedTopic);

}

@Bean

Binding springLowBuffereddf(Queue springdfLowBufferedQueue, TopicExchange springdfLowBufferedTopic) {

return BindingBuilder.bind(springdfLowBufferedQueue).to(springdfLowBufferedTopic).with(springdfLowBufferedRouteKey);

}

@Bean(autowire = Autowire.BY_NAME, value = "springdfLowTopic")

TopicExchange springdfLowTopic() {

return new TopicExchange(springdfLowTopic);

}

@Bean

Binding springLowdf(Queue springdfLowQueue, TopicExchange springdfLowTopic) {

return BindingBuilder.bind(springdfLowQueue).to(springdfLowTopic).with(springdfLowRouteKey);

}

@Bean

SimpleMessageListenerContainer container(ConnectionFactory connectionFactory,

MessageListenerAdapter listenerAdapter) {

SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();

container.setConnectionFactory(connectionFactory);

container.setQueueNames(springdfHighQueue, springdfLowQueue);

container.setMessageListener(listenerAdapter);

return container;

}

@Bean

MessageListenerAdapter listenerAdapter(IntegrationReceiver receiver) {

MessageListenerAdapter adapter = new MessageListenerAdapter(receiver);

adapter.setDefaultListenerMethod("receive");

Map queueOrTagToMethodName = new HashMap<>();

queueOrTagToMethodName.put(springdfHighQueue, "springdfHighReceive");

queueOrTagToMethodName.put(springdfLowQueue, "springdfLowReceive");

adapter.setQueueOrTagToMethodName(queueOrTagToMethodName);

return adapter;

}

private Queue createBufferedQueue(String queueName, String topic, String routeKey, int bufferedTime) {

Map args = new HashMap<>();

args.put("x-dead-letter-exchange", topic);

args.put("x-dead-letter-routing-key", routeKey);

args.put("x-message-ttl", bufferedTime);

// 是否持久化

boolean durable = true;

// 仅创建者可以使用的私有队列,断开后自动删除

boolean exclusive = false;

// 当所有消费客户端连接断开后,是否自动删除队列

boolean autoDelete = false;

return new Queue(queueName, durable, exclusive, autoDelete, args);

}

}

消费者配置

package com.df.ps.mq;

import com.fasterxml.jackson.databind.ObjectMapper;

import org.slf4j.Logger;

import org.slf4j.LoggerFactory;

import org.springframework.amqp.rabbit.core.RabbitTemplate;

import org.springframework.beans.factory.annotation.Autowired;

import org.springframework.beans.factory.annotation.Value;

import java.util.Map;

public class MqReceiver {

private static Logger logger = LoggerFactory.getLogger(MqReceiver.class);

@Value("${high-retry:5}")

private int highRetry;

@Value("${low-retry:5}")

private int lowRetry;

@Value("${spring.df.high-buffered.topic:spring-df-high-buffered-topic}")

private String springdfHighBufferedTopic;

@Value("${spring.df.high-buffered.route:spring-df-high-buffered-route}")

private String springdfHighBufferedRouteKey;

@Value("${spring.df.low-buffered.topic:spring-df-low-buffered-topic}")

private String springdfLowBufferedTopic;

@Value("${spring.df.low-buffered.route:spring-df-low-buffered-route}")

private String springdfLowBufferedRouteKey;

private final RabbitTemplate rabbitTemplate;

@Autowired

public MqReceiver(RabbitTemplate rabbitTemplate) {

this.rabbitTemplate = rabbitTemplate;

}

public void receive(Object message) {

if (logger.isInfoEnabled()) {

logger.info("default receiver: " + message);

}

}

/**

* 消息从初始队列进入5分钟的高速缓冲队列

* @param message

*/

public void highReceiver(Object message){

ObjectMapper mapper = new ObjectMapper();

Map msg = mapper.convertValue(message, Map.class);

try{

logger.info("这里做消息处理...");

}catch (Exception e){

int times = msg.get("times") == null ? 0 : (int) msg.get("times");

if (times < highRetry) {

msg.put("times", times + 1);

rabbitTemplate.convertAndSend(springdfHighBufferedTopic,springdfHighBufferedRouteKey,message);

} else {

msg.put("times", 0);

rabbitTemplate.convertAndSend(springdfLowBufferedTopic,springdfLowBufferedRouteKey,message);

}

}

}

/**

* 消息从5分钟缓冲队列进入2小时缓冲队列

* @param message

*/

public void lowReceiver(Object message){

ObjectMapper mapper = new ObjectMapper();

Map msg = mapper.convertValue(message, Map.class);

try {

logger.info("这里做消息处理...");

}catch (Exception e){

int times = msg.get("times") == null ? 0 : (int) msg.get("times");

if (times < lowRetry) {

rabbitTemplate.convertAndSend(springdfLowBufferedTopic,springdfLowBufferedRouteKey,message);

}else{

logger.info("消息无法被消费...");

}

}

}

}


版权声明:本文内容由网络用户投稿,版权归原作者所有,本站不拥有其著作权,亦不承担相应法律责任。如果您发现本站中有涉嫌抄袭或描述失实的内容,请联系我们jiasou666@gmail.com 处理,核实后本网站将在24小时内删除侵权内容。

上一篇:JAVA发送http get/post请求,调用http接口、方法详解
下一篇:自学接口测试工作怎么样(自学接口测试工作怎么样啊)
相关文章

 发表评论

暂时没有评论,来抢沙发吧~