Springboot 配置RabbitMQ文档的方法步骤

网友投稿 398 2023-01-11


Springboot 配置RabbitMQ文档的方法步骤

简介

RabbitMQ是实现AMQP(高级消息队列协议)的消息中间件的一种,用于在分布式系统中存储转发消息,在易用性、扩展性、高可用性等方面表现不俗

概念:

生产者 消息的产生方,负责将消息推送到消息队列

消费者 消息的最终接收方,负责监听队列中的对应消息,消费消息

队列 消息的存储容器,负责存放生产者发送的消息

交换机 负责根据一定规则分发生产者产生的消息

绑定 完成交换机和队列之间的绑定

模式:

direct:直连模式,将消息路由到binding key与routing key完全匹配的队列,常用于实例间的任务分发

topic:话题模式,通过可配置的规则分发给绑定在该exchange上的队列

headers:适用规则复杂的分发,用headers里的参数表达规则

fanout:分发给所有绑定到该exchange上的队列,忽略routing key

SpringBoot集成RabbitMQ

一、引入maven依赖

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-amqp</artifactId>
    <version>1.5.2.RELEASE</version>
</dependency>

二、配置application.properties

# rabbitmq

spring.rabbitmq.host = dev-mq.a.pa.com

spring.rabbitmq.port = 5672

spring.rabbitmq.username = admin

spring.rabbitmq.password = admin

spring.rabbitmq.virtualHost = /message-test/

三、编写AmqpConfiguration配置文件

package message.test.configuration;

import org.springframework.amqp.core.AcknowledgeMode;

import org.springframework.amqp.core.AmqpTemplate;

import org.springframework.amqp.core.Binding;

import org.springframework.amqp.core.BindingBuilder;

import org.springframework.amqp.core.DirectExchange;

import org.springframework.amqp.core.Queue;

import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory;

import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;

import org.springframework.amqp.rabbit.connection.ConnectionFactory;

import org.springframework.amqp.rabbit.core.RabbitTemplate;

import org.springframework.beans.factory.annotation.Autowired;

import org.springframework.beans.factory.annotation.Qualifier;

import org.springframework.boot.autoconfigure.amqp.RabbitProperties;

import org.springframework.context.annotation.Bean;

import org.springframework.context.annotation.Configuration;

/**
 * Spring configuration that declares the RabbitMQ topology used by the
 * message service: queues, direct exchanges, the bindings between them,
 * plus the connection factory, listener container factory and template.
 */
@Configuration
public class AmqpConfiguration {

    /** Charset name used when encoding message bodies to bytes. */
    public static final String MESSAGE_ENCODING = "UTF-8";

    // ---- "issue" exchange, its queues and routing keys ----

    public static final String EXCHANGE_ISSUE = "exchange_message_issue";

    public static final String QUEUE_ISSUE_USER = "queue_message_issue_user";
    public static final String QUEUE_ISSUE_ALL_USER = "queue_message_issue_all_user";
    public static final String QUEUE_ISSUE_ALL_DEVICE = "queue_message_issue_all_device";
    public static final String QUEUE_ISSUE_CITY = "queue_message_issue_city";

    public static final String ROUTING_KEY_ISSUE_USER = "routing_key_message_issue_user";
    public static final String ROUTING_KEY_ISSUE_ALL_USER = "routing_key_message_issue_all_user";
    public static final String ROUTING_KEY_ISSUE_ALL_DEVICE = "routing_key_message_issue_all_device";
    public static final String ROUTING_KEY_ISSUE_CITY = "routing_key_message_issue_city";

    // ---- "push" exchange and its result queue ----

    public static final String EXCHANGE_PUSH = "exchange_message_push";

    public static final String QUEUE_PUSH_RESULT = "queue_message_push_result";

    /** Connection settings bound from the {@code spring.rabbitmq.*} properties. */
    @Autowired
    private RabbitProperties rabbitProperties;

    /** Declares the queue named {@link #QUEUE_ISSUE_USER}. */
    @Bean
    public Queue issueUserQueue() {
        return new Queue(QUEUE_ISSUE_USER);
    }

    /** Declares the queue named {@link #QUEUE_ISSUE_ALL_USER}. */
    @Bean
    public Queue issueAllUserQueue() {
        return new Queue(QUEUE_ISSUE_ALL_USER);
    }

    /** Declares the queue named {@link #QUEUE_ISSUE_ALL_DEVICE}. */
    @Bean
    public Queue issueAllDeviceQueue() {
        return new Queue(QUEUE_ISSUE_ALL_DEVICE);
    }

    /** Declares the queue named {@link #QUEUE_ISSUE_CITY}. */
    @Bean
    public Queue issueCityQueue() {
        return new Queue(QUEUE_ISSUE_CITY);
    }

    /** Declares the queue named {@link #QUEUE_PUSH_RESULT}. */
    @Bean
    public Queue pushResultQueue() {
        return new Queue(QUEUE_PUSH_RESULT);
    }

    /** Declares the direct exchange named {@link #EXCHANGE_ISSUE}. */
    @Bean
    public DirectExchange issueExchange() {
        return new DirectExchange(EXCHANGE_ISSUE);
    }

    /** Declares the direct exchange named {@link #EXCHANGE_PUSH}. */
    @Bean
    public DirectExchange pushExchange() {
        // Argument 1: exchange name
        // Argument 2: durable
        // Argument 3: auto-delete
        return new DirectExchange(EXCHANGE_PUSH, true, true);
    }

    /**
     * Binds the issue-user queue to the issue exchange.
     *
     * @param queue    the {@code issueUserQueue} bean
     * @param exchange the {@code issueExchange} bean
     * @return binding keyed by {@link #ROUTING_KEY_ISSUE_USER}
     */
    @Bean
    public Binding issueUserQueueBinding(@Qualifier("issueUserQueue") Queue queue,
            @Qualifier("issueExchange") DirectExchange exchange) {
        // The qualifier must match the bean method name issueUserQueue();
        // a mismatching name leaves the parameter unresolvable at startup.
        return BindingBuilder.bind(queue).to(exchange).with(ROUTING_KEY_ISSUE_USER);
    }

    /**
     * Binds the issue-all-user queue to the issue exchange.
     *
     * @param queue    the {@code issueAllUserQueue} bean
     * @param exchange the {@code issueExchange} bean
     * @return binding keyed by {@link #ROUTING_KEY_ISSUE_ALL_USER}
     */
    @Bean
    public Binding issueAllUserQueueBinding(@Qualifier("issueAllUserQueue") Queue queue,
            @Qualifier("issueExchange") DirectExchange exchange) {
        return BindingBuilder.bind(queue).to(exchange).with(ROUTING_KEY_ISSUE_ALL_USER);
    }

    /**
     * Binds the issue-all-device queue to the issue exchange.
     *
     * @param queue    the {@code issueAllDeviceQueue} bean
     * @param exchange the {@code issueExchange} bean
     * @return binding keyed by {@link #ROUTING_KEY_ISSUE_ALL_DEVICE}
     */
    @Bean
    public Binding issueAllDeviceQueueBinding(@Qualifier("issueAllDeviceQueue") Queue queue,
            @Qualifier("issueExchange") DirectExchange exchange) {
        return BindingBuilder.bind(queue).to(exchange).with(ROUTING_KEY_ISSUE_ALL_DEVICE);
    }

    /**
     * Binds the issue-city queue to the issue exchange.
     *
     * @param queue    the {@code issueCityQueue} bean
     * @param exchange the {@code issueExchange} bean
     * @return binding keyed by {@link #ROUTING_KEY_ISSUE_CITY}
     */
    @Bean
    public Binding issueCityQueueBinding(@Qualifier("issueCityQueue") Queue queue,
            @Qualifier("issueExchange") DirectExchange exchange) {
        return BindingBuilder.bind(queue).to(exchange).with(ROUTING_KEY_ISSUE_CITY);
    }

    /**
     * Binds the push-result queue to the push exchange, using the queue's
     * own name as the routing key.
     *
     * @param queue    the {@code pushResultQueue} bean
     * @param exchange the {@code pushExchange} bean
     * @return binding keyed by the queue name
     */
    @Bean
    public Binding pushResultQueueBinding(@Qualifier("pushResultQueue") Queue queue,
            @Qualifier("pushExchange") DirectExchange exchange) {
        return BindingBuilder.bind(queue).to(exchange).withQueueName();
    }

    /**
     * Builds a caching connection factory from the injected
     * {@link RabbitProperties} (host, port, username, password, virtual host).
     *
     * @return the configured connection factory
     */
    @Bean
    public ConnectionFactory defaultConnectionFactory() {
        CachingConnectionFactory connectionFactory = new CachingConnectionFactory();
        connectionFactory.setHost(rabbitProperties.getHost());
        connectionFactory.setPort(rabbitProperties.getPort());
        connectionFactory.setUsername(rabbitProperties.getUsername());
        connectionFactory.setPassword(rabbitProperties.getPassword());
        connectionFactory.setVirtualHost(rabbitProperties.getVirtualHost());
        return connectionFactory;
    }

    /**
     * Creates the listener container factory with manual acknowledgement.
     *
     * @param connectionFactory the {@code defaultConnectionFactory} bean
     * @return factory whose containers use {@link AcknowledgeMode#MANUAL}
     */
    @Bean
    public SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory(
            @Qualifier("defaultConnectionFactory") ConnectionFactory connectionFactory) {
        SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
        factory.setConnectionFactory(connectionFactory);
        // MANUAL mode: listeners are responsible for acknowledging deliveries themselves.
        factory.setAcknowledgeMode(AcknowledgeMode.MANUAL);
        return factory;
    }

    /**
     * Creates the template used to publish messages.
     *
     * @param connectionFactory the {@code defaultConnectionFactory} bean
     * @return a {@link RabbitTemplate} backed by the given connection factory
     */
    @Bean
    public AmqpTemplate rabbitTemplate(
            @Qualifier("defaultConnectionFactory") ConnectionFactory connectionFactory) {
        return new RabbitTemplate(connectionFactory);
    }

}

三、编写生产者

// Serialize the message to a JSON string and encode it with the shared charset.
// NOTE(review): JSON is presumably com.alibaba.fastjson.JSON — confirm against the imports.
body = JSON.toJSONString(issueMessage).getBytes(AmqpConfiguration.MESSAGE_ENCODING);

// Publish to the issue exchange with ROUTING_KEY_ISSUE_USER, the key that
// AmqpConfiguration binds to QUEUE_ISSUE_USER.
rabbitTemplate.convertAndSend(AmqpConfiguration.EXCHANGE_ISSUE,
        AmqpConfiguration.ROUTING_KEY_ISSUE_USER, body);

四、编写消费者

/**
 * Listener for messages arriving on {@code AmqpConfiguration.QUEUE_PUSH_RESULT}.
 * The body is an empty stub; handling logic is left to the implementer.
 *
 * @param data        raw message payload bytes
 * @param channel     the channel the message was delivered on
 * @param deliveryTag delivery tag taken from the {@code AmqpHeaders.DELIVERY_TAG} header
 */
@RabbitListener(queues = AmqpConfiguration.QUEUE_PUSH_RESULT)

public void handlePushResult(@Payload byte[] data, Channel channel,

@Header(AmqpHeaders.DELIVERY_TAG) long deliveryTag) {

// NOTE(review): AmqpConfiguration sets AcknowledgeMode.MANUAL on its listener
// container factory, so a real implementation presumably needs to ack/nack via
// channel using deliveryTag — confirm this listener uses that factory.
}


版权声明:本文内容由网络用户投稿,版权归原作者所有,本站不拥有其著作权,亦不承担相应法律责任。如果您发现本站中有涉嫌抄袭或描述失实的内容,请联系我们jiasou666@gmail.com 处理,核实后本网站将在24小时内删除侵权内容。

上一篇:银行研发管理平台建设方案(银行研发管理平台建设方案设计)
下一篇:注解实现接口缓存(清除缓存注解)
相关文章

 发表评论

暂时没有评论,来抢沙发吧~