消息队列 RabbitMQ 与 Spring 整合使用的实例代码

网友投稿 182 2023-04-20


消息队列 RabbitMQ 与 Spring 整合使用的实例代码

一、什么是 RabbitMQ

RabbitMQ 是实现 AMQP(高级消息队列协议)的消息中间件的一种,最初起源于金融系统,用于在分布式系统中存储转发消息,在易用性、扩展性、高可用性等方面表现不俗。消息中间件主要用于组件之间的解耦,消息的发送者无需知道消息使用者的存在,反之亦然。

RabbitMQ 是由 Erlang 语言开发,安装 RabbitMQ 服务需要先安装 Erlang 语言包。

二、如何与 Spring 集成

1. 我们都需要哪些 Jar 包?

抛开单独使用 Spring 的包不说,引入 RabbitMQ 我们还需要两个:

com.rabbitmq

amqp-client

3.5.1

org.springframework.amqp

spring-rabbit

1.4.5.RELEASE

2. 使用外部参数文件 application.properties:

mq.host=127.0.0.1

mq.username=queue

mq.password=1234

mq.port=8001

# 统一XML配置中易变部分的命名

mq.queue=test_mq

易变指的是在实际项目中,如果测试与生产环境使用的同一个 RabbitMQ 服务器。那我们在部署时直接修改 properties 文件的参数即可,防止测试与生产环境混淆。

修改 applicationContext.xml 文件,引入我们创建的 properties 文件

3. 连接 RabbitMQ 服务器

password="${mq.password}" port="${mq.port}" />

4. 声明一个 RabbitMQ Template

复制代码 代码如下:

5. 在 applicationContext.xml 中声明一个交换机,name 属性需配置到 RabbitMQ 服务器。

交换机的四种模式:

direct:转发消息到 routigKey 指定的队列。

topic:按规则转发消息(最灵活)。

headers:(这个还没有接触到)

fanout:转发消息到所有绑定队列

交换器的属性:

持久性:如果启用,交换器将会在server重启前都有效。

自动删除:如果启用,那么交换器将会在其绑定的队列都被删除掉之后自动删除掉自身。

惰性:如果没有声明交换器,那么在执行到使用的时候会导致异常,并不会主动声明。

如果没有队列绑定在交换机上,则发送到该交换机上的消息会丢失。

一个交换机可以绑定多个队列,一个队列可以被多个交换机绑定。

topic 类型交换器通过模式匹配分析消息的 routing-key 属性。它将 routing-key 和 binding-key 的字符串切分成单词。这些单词之间用点隔开。它同样也会识别两个通配符:#匹配0个或者多个单词,*匹配一个单词。例如,binding key:*.stock.#匹配 routing key:usd.stcok 和 eur.stock.db,但是不匹配 stock.nana。

因为交换器是命名实体,声明一个已经存在的交换器,但是试图赋予不同类型是会导致错误。客户端需要删除这个已经存在的交换器,然后重新声明并且赋予新的类型。

6. 在 applicationContext.xml 中声明一个队列,name 属性是需要配置到 RabbitMQ 服务器的。

复制代码 代码如下:

durable:是否持久化

exclusive:仅创建者可以使用的私有队列,断开后自动删除

auto-delete:当所有消费端连接断开后,是否自动删除队列

7. 创建生产者端

import org.springframework.amqp.core.AmqpTemplate;

import org.springframework.beans.factory.annotation.Autowired;

import org.springframework.stereotype.Service;

/**

* @Description: 消息队列发送者

* @Author:

* @CreateTime:

*/

@Service

public class Producer {

@Autowired

private AmqpTemplate amqpTemplate;

public void sendQueue(String exchange_key, String queue_key, Object object) {

// convertAndSend 将java对象转换为消息发送至匹配key的交换机中Exchange

amqpTemplate.convertAndSend(exchange_key, queue_key, object);

}

}

8. 在 applicationContext.xml 中配置监听及消费者端

  

  

消费者 Java 代码:

import org.springframework.amqp.core.Message;

import org.springframework.amqp.core.MessageListener;

public class RabbitmqService implements MessageListener {

public void onMessage(Message message) {

System.out.println("消息消费者 = " + message.toString());

}

}

至此,我们的所有配置文件就写完了,最终如下:

xmlns:xsi="http://w3.org/2001/XMLSchema-instance"

xmlns:context="http://springframework.org/schema/context"

xmlns:util="http://springframework.org/schema/util"

xmlns:aop="http://springframework.org/schema/aop"

xmlns:tx="http://springframework.org/schema/tx"

xmlns:rabbit="http://springframework.org/schema/rabbit"

xmlns:p="http://springframework.org/schema/p"

xsi:schemaLocation="

http://springframework.org/schema/context

http://springframework.org/schema/context/spring-context-3.0.xsd

http://springframework.org/schema/beans

http://springframework.org/schema/beans/spring-beans-3.0.xsd

http://springframework.org/schema/util

http://springframework.org/schema/util/spring-util-3.0.xsd

http://springframework.org/schema/aop

http://springframework.org/schema/aop/spring-aop-3.0.xsd

http://springframework.org/schema/tx

http://springframework.org/schema/tx/spring-tx-3.0.xsd

http://springframework.org/schema/rabbit

http://springframework.org/schema/rabbit/spring-rabbit-1.0.xsd">

password="${mq.password}" port="${mq.port}" />

9. 如何使用 RabbitMQ 发送一个消息

@Autowired

private Producer producer;

@Value("#{appConfig['mq.queue']}")

private String queueId;

/**

* @Description: 消息队列

* @Author:

* @CreateTime:

*/

@ResponseBody

@RequestMapping("/sendQueue")

public String testQueue() {

try {

Map map = new HashMap();

map.put("data", "hello rabbitmq");

producer.sendQueue(queueId + "_exchange", queueId + "_patt", map);

} catch (Exception e) {

e.printStackTrace();

}

return "发送完毕";

}

嗯。这个测试是 SpringMVC 框架。

password="${mq.password}" port="${mq.port}" />

4. 声明一个 RabbitMQ Template

复制代码 代码如下:

5. 在 applicationContext.xml 中声明一个交换机,name 属性需配置到 RabbitMQ 服务器。

交换机的四种模式:

direct:转发消息到 routigKey 指定的队列。

topic:按规则转发消息(最灵活)。

headers:(这个还没有接触到)

fanout:转发消息到所有绑定队列

交换器的属性:

持久性:如果启用,交换器将会在server重启前都有效。

自动删除:如果启用,那么交换器将会在其绑定的队列都被删除掉之后自动删除掉自身。

惰性:如果没有声明交换器,那么在执行到使用的时候会导致异常,并不会主动声明。

如果没有队列绑定在交换机上,则发送到该交换机上的消息会丢失。

一个交换机可以绑定多个队列,一个队列可以被多个交换机绑定。

topic 类型交换器通过模式匹配分析消息的 routing-key 属性。它将 routing-key 和 binding-key 的字符串切分成单词。这些单词之间用点隔开。它同样也会识别两个通配符:#匹配0个或者多个单词,*匹配一个单词。例如,binding key:*.stock.#匹配 routing key:usd.stcok 和 eur.stock.db,但是不匹配 stock.nana。

因为交换器是命名实体,声明一个已经存在的交换器,但是试图赋予不同类型是会导致错误。客户端需要删除这个已经存在的交换器,然后重新声明并且赋予新的类型。

6. 在 applicationContext.xml 中声明一个队列,name 属性是需要配置到 RabbitMQ 服务器的。

复制代码 代码如下:

durable:是否持久化

exclusive:仅创建者可以使用的私有队列,断开后自动删除

auto-delete:当所有消费端连接断开后,是否自动删除队列

7. 创建生产者端

import org.springframework.amqp.core.AmqpTemplate;

import org.springframework.beans.factory.annotation.Autowired;

import org.springframework.stereotype.Service;

/**

* @Description: 消息队列发送者

* @Author:

* @CreateTime:

*/

@Service

public class Producer {

@Autowired

private AmqpTemplate amqpTemplate;

public void sendQueue(String exchange_key, String queue_key, Object object) {

// convertAndSend 将java对象转换为消息发送至匹配key的交换机中Exchange

amqpTemplate.convertAndSend(exchange_key, queue_key, object);

}

}

8. 在 applicationContext.xml 中配置监听及消费者端

  

  

消费者 Java 代码:

import org.springframework.amqp.core.Message;

import org.springframework.amqp.core.MessageListener;

public class RabbitmqService implements MessageListener {

public void onMessage(Message message) {

System.out.println("消息消费者 = " + message.toString());

}

}

至此,我们的所有配置文件就写完了,最终如下:

xmlns:xsi="http://w3.org/2001/XMLSchema-instance"

xmlns:context="http://springframework.org/schema/context"

xmlns:util="http://springframework.org/schema/util"

xmlns:aop="http://springframework.org/schema/aop"

xmlns:tx="http://springframework.org/schema/tx"

xmlns:rabbit="http://springframework.org/schema/rabbit"

xmlns:p="http://springframework.org/schema/p"

xsi:schemaLocation="

http://springframework.org/schema/context

http://springframework.org/schema/context/spring-context-3.0.xsd

http://springframework.org/schema/beans

http://springframework.org/schema/beans/spring-beans-3.0.xsd

http://springframework.org/schema/util

http://springframework.org/schema/util/spring-util-3.0.xsd

http://springframework.org/schema/aop

http://springframework.org/schema/aop/spring-aop-3.0.xsd

http://springframework.org/schema/tx

http://springframework.org/schema/tx/spring-tx-3.0.xsd

http://springframework.org/schema/rabbit

http://springframework.org/schema/rabbit/spring-rabbit-1.0.xsd">

password="${mq.password}" port="${mq.port}" />

xmlns:xsi="http://w3.org/2001/XMLSchema-instance"

xmlns:context="http://springframework.org/schema/context"

xmlns:util="http://springframework.org/schema/util"

xmlns:aop="http://springframework.org/schema/aop"

xmlns:tx="http://springframework.org/schema/tx"

xmlns:rabbit="http://springframework.org/schema/rabbit"

xmlns:p="http://springframework.org/schema/p"

xsi:schemaLocation="

http://springframework.org/schema/context

http://springframework.org/schema/context/spring-context-3.0.xsd

http://springframework.org/schema/beans

http://springframework.org/schema/beans/spring-beans-3.0.xsd

http://springframework.org/schema/util

http://springframework.org/schema/util/spring-util-3.0.xsd

http://springframework.org/schema/aop

http://springframework.org/schema/aop/spring-aop-3.0.xsd

http://springframework.org/schema/tx

http://springframework.org/schema/tx/spring-tx-3.0.xsd

http://springframework.org/schema/rabbit

http://springframework.org/schema/rabbit/spring-rabbit-1.0.xsd">

password="${mq.password}" port="${mq.port}" />

password="${mq.password}" port="${mq.port}" />

9. 如何使用 RabbitMQ 发送一个消息

@Autowired

private Producer producer;

@Value("#{appConfig['mq.queue']}")

private String queueId;

/**

* @Description: 消息队列

* @Author:

* @CreateTime:

*/

@ResponseBody

@RequestMapping("/sendQueue")

public String testQueue() {

try {

Map map = new HashMap();

map.put("data", "hello rabbitmq");

producer.sendQueue(queueId + "_exchange", queueId + "_patt", map);

} catch (Exception e) {

e.printStackTrace();

}

return "发送完毕";

}

嗯。这个测试是 SpringMVC 框架。


版权声明:本文内容由网络用户投稿,版权归原作者所有,本站不拥有其著作权,亦不承担相应法律责任。如果您发现本站中有涉嫌抄袭或描述失实的内容,请联系我们jiasou666@gmail.com 处理,核实后本网站将在24小时内删除侵权内容。

上一篇:浅谈angular2 组件的生命周期钩子
下一篇:自动化集成接口系统设计(自动化系统集成技术基本概念)
相关文章

 发表评论

暂时没有评论,来抢沙发吧~