SpringBoot整合RabbitMQ处理死信队列和延迟队列(springboot整合rabbitmq的持久化)

网友投稿 513 2022-07-27


目录简http://介实例代码路由配置控制器发送器接收器application.yml实例测试

简介

说明

本文用示例介绍SpringBoot整合RabbitMQ时如何处理死信队列/延迟队列。

RabbitMQ消息简介

RabbitMQ的消息默认不会超时。

什么是死信队列?什么是延迟队列?

死信队列:

DLX,全称为Dead-Letter-Exchange,可以称之为死信交换器,也有人称之为死信邮箱。当消息在一个队列中变成死信(dead message)之后,它能被重新被发送到另一个交换器中,这个交换器就是DLX,绑定DLX的队列就称之为死信队列。

以下几种情况会导致消息变成死信:

消息被拒绝(Basic.Reject/Basic.Nack),并且设置requeue参数为false;消息过期;队列达到最大长度。

延迟队列:

延迟队列用来存放延迟消息。延迟消息:指当消息被发送以后,不想让消费者立刻拿到消息,而是等待特定时间后,消费者才能拿到这个消息进行消费。

相关网址

详解RabbitMQ中死信队列和延迟队列的使用详解

实例代码

路由配置

package com.example.config;

import org.springframework.amqp.core.*;

import org.springframework.beans.factory.annotation.Autowired;

import org.springframework.context.annotation.Bean;

import org.springframework.context.annotation.Configuration;

@Configuration

public class RabbitRouterConfig {

public static final String EXCHANGE_TOPIC_WELCOME = "Exchange@topic.welcome";

public static final String EXCHANGE_FANOUT_UNROUTE = "Exchange@fanout.unroute";

public static final String EXCHANGE_TOPIC_DELAY = "Exchange@topic.delay";

public static final String ROUTINGKEY_HELLOS = "hello.#";

public static final String ROUTINGKEY_DELAY = "delay.#";

public static final String QUEUE_HELLO = "Queue@hello";

public static final String QUEUE_HI = "Queue@hi";

public static final String QUEUE_UNROUTE = "Queue@unroute";

public static final String QUEUE_DELAY = "Queue@delay";

public static final Integer TTL_QUEUE_MESSAGE = 5000;

@Autowired

AmqpAdmin amqpAdmin;

@Bean

Object initBindingTest() {

amqpAdmin.declareExchange(ExchangeBuilder.fanoutExchange(EXCHANGE_FANOUT_UNROUTE).durable(true).autoDelete().build());

amqpAdmin.declareExchange(ExchangeBuilder.topicExchange(EXCHANGE_TOPIC_DELAY).durable(true).autoDelete().build());

amqpAdmin.declareExchange(ExchangeBuilder.topicExchange(EXCHANGE_TOPIC_WELCOME)

.durable(true)

.autoDelete()

.withArgument("alternate-exchange", EXCHANGE_FANOUT_UNROUTE)

.build());

amqpAdmin.declareQueue(QueueBuilder.durable(QUEUE_HI).build());

amqpAdmin.declareQueue(QueueBuilder.durable(QUEUE_HELLO)

.withArgument("x-dead-letter-exchange", EXCHANGE_TOPIC_DELAY)

.withArgument("x-dead-letter-routing-key", ROUTINGKEY_DELAY)

.withArgument("x-message-ttl", TTL_QUEUE_MESSAGE)

.build());

amqpAdmin.declareQueue(QueueBuilder.durable(QUEUE_UNROUTE).build());

amqpAdmin.declareQueue(QueueBuilder.durable(QUEUE_DELAY).build());

amqpAdmin.declareBinding(new Binding(QUEUE_HELLO, Binding.DestinationType.QUEUE,

EXCHANGE_TOPIC_WELCOME, ROUTINGKEY_HELLOS, null));

amqpAdmin.declareBinding(new Binding(QUEUE_UNROUTE, Binding.DestinationType.QUEUE,

EXCHANGE_FANOUT_UNROUTE, "", null));

amqpAdmin.declareBinding(new Binding(QUEUE_DELAY, Binding.DestinationType.QUEUE,

EXCHANGE_TOPIC_DELAY, ROUTINGKEY_DELAY, null));

return new Object();

}

}

控制器

package com.example.controller;

import com.example.config.RabbitRouterConfig;

import com.example.mq.Sender;

import org.springframework.beans.factory.annotation.Autowired;

import org.springframework.web.bind.annotation.PostMapping;

import org.springframework.web.bind.annotation.RestController;

import java.time.LocalDateTime;

@RestController

public class HelloController {

@Autowired

private Sender sender;

@PostMapping("/hi")

public void hi() {

sender.send(RabbitRouterConfig.QUEUE_HI, "hi1 message:" + LocalDateTime.now());

}

@PostMapping("/hello1")

public void hello1() {

sender.send("hello.a", "hello1 message:" + LocalDateTime.now());

}

@PostMapping("/hello2")

public void hello2() {

sender.send(RabbitRouterConfig.EXCHANGE_TOPIC_WELCOME, "hello.b", "hello2 message:" + LocalDateTime.now());

}

@PostMapping("/ae")

public void aeTest() {

sender.send(RabbitRouterConfig.EXCHANGE_TOPIC_WELCOME, "nonono", "ae message:" + LocalDateTime.now());

}

}

发送器

package com.example.mq;

import org.springframework.amqp.core.AmqpTemplate;

import org.springframework.beans.factory.annotation.Autowired;

import org.springframework.stereotype.Component;

import java.util.Date;

@Component

public class Sender {

@Autowired

private AmqpTemplate rabbitTemplate;

public void send(String routingKey, String message) {

this.rabbitTemplate.convertAndSend(routingKey, message);

}

public void send(String exchange, String routingKey, String message) {

this.rabbitTemplate.convertAndSend(exchange, routingKey, message);

}

}

接收器

package com.example.mq;

import com.example.config.RabbitRouterConfig;

import org.springframework.amqp.rabbit.annotation.RabbitListener;

import org.springframework.stereotype.Component;

@Component

public class Receiver {

@RabbitListener(queues = RabbitRouterConfig.QUEUE_HI)

public void hi(String payload) {

System.out.println ("Receiver(hi) : " + payload);

}

XfTgnyW // @RabbitListener(queues = RabbitRouterConfig.QUEUE_HELLO)

// public void hello(String hello) throws InterruptedException {

// System.out.println ("Receiver(hello) : " + hello);

// Thread.sleep(5 * 1000);

// System.out.println("(hello):sleep over");

// }

//

// @RabbitListener(queues = RabbitRouterConfig.QUEUE_UNROUTE)

// public void unroute(String hello) throws InterruptedException {

// System.out.println ("Receiver(unroute) : " + hello);

// Thread.sleep(5 * 1000);

// System.out.println("(unroute):sleep over");

// }

@RabbitListener(queues = RabbitRouterConfig.QUEUE_DELAY)

public void delay(String hello) throws InterruptedException {

System.out.println ("Receiver(delay) : " + hello);

Thread.sleep(5 * 1000);

System.out.println("(delay):sleep over");

}

}

application.yml

server:

# port: 9100

port: 9101

spring:

application:

# name: demo-rabbitmq-sender

name: demo-rabbitmq-receiver

rabbitmq:

host: localhost

port: 5672

username: admin

password: 123456

# virtualHost: /

publisher-confirms: true

publisher-returns: true

# listener:

# simple:

# acknowledge-mode: manual

# direct:

# acknowledge-mode: manual

实例测试

分别启动发送者和接收者。

访问:http://localhost:9100/hello2

五秒钟后输出:

Receiver(delay) : hello2 message:2020-11-27T09:30:51.548(delay):sleep over

以上就是SpringBoot整合RabbitMQ处理死信队列和延迟队列的详细内容,更多关于SpringBoot RabbitMQ死信队列 延迟队列的资料请关注我们其它相关文章!


版权声明:本文内容由网络用户投稿,版权归原作者所有,本站不拥有其著作权,亦不承担相应法律责任。如果您发现本站中有涉嫌抄袭或描述失实的内容,请联系我们jiasou666@gmail.com 处理,核实后本网站将在24小时内删除侵权内容。

上一篇:SpringBoot2底层注解@Import用法详解(springboot import注解执行时间)
下一篇:浅析SpringBoot2底层注解@Conditional@ImportResource(springboot的三个注解)
相关文章

 发表评论

暂时没有评论,来抢沙发吧~