SpringMVC和rabbitmq集成的使用案例

网友投稿 319 2022-11-06


SpringMVC和rabbitmq集成的使用案例

1.添加maven依赖

com.rabbitmq

amqp-client

3.5.1

org.springframework.amqp

spring-rabbit

1.4.5.RELEASE

2.spring主配置文件中加入rabbitMQ xml文件的配置

3.jdbc配置文件中加入 rabbitmq的链接配置

#rabbitMQ配置

mq.host=localhost

mq.username=donghao

mq.password=donghao

mq.port=5672

mq.vhost=testMQ

4.新建application-mq.xml文件,添加配置信息

xmlns:xsi="http://w3.org/2001/XMLSchema-instance"

xmlns:rabbit="http://springframework.org/schema/rabbit"

xsi:schemaLocation="http://springframework.org/schema/beans

http://springframework.org/schema/beans/spring-beans-3.0.xsd

http://springframework.org/schema/rabbit

http://springframework.org/schema/rabbit/spring-rabbit-1.0.xsd" >

rabbitmq 连接服务配置

xmlns:xsi="http://w3.org/2001/XMLSchema-instance"

xmlns:rabbit="http://springframework.org/schema/rabbit"

xsi:schemaLocation="http://springframework.org/schema/beans

http://springframework.org/schema/beans/spring-beans-3.0.xsd

http://springframework.org/schema/rabbit

http://springframework.org/schema/rabbit/spring-rabbit-1.0.xsd" >

rabbitmq 连接服务配置

5.新增公共入队类

@Service

public class MQProducerImpl{

@Resource

private AmqpTemplate amqpTemplate;

private final static Logger logger = LoggerFactory.getLogger(MQProducerImpl.class);

//公共入队方法

public void sendDataToQueue(String queueKey, Object object) {

try {

amqpTemplate.convertAndSend(queueKey, object);

} catch (Exception e) {

logger.error(e.toString());

}

}

}

6.创建监听类

import java.io.IOException;

import java.util.List;

import javax.annotation.Resource;

import org.slf4j.Logger;

import org.slf4j.LoggerFactory;

import org.springframework.amqp.core.Message;

import org.springframework.amqp.rabbit.core.ChannelAwareMessageListener;

import org.springframework.amqp.utils.SerializationUtils;

import org.springframework.stereotype.Component;

import org.springframework.transaction.annotation.Transactional;

import com.cn.framework.domain.BaseDto;

import com.cn.framework.util.ConstantUtils;

import com.cn.framework.util.RabbitMq.producer.MQProducer;

import com.kxs.service.activityService.IActivityService;

import com.kxs.service.messageService.IMessageService;

import com.rabbitmq.client.Channel;

/**

* 活动处理listener

* @author

* @date 2017年6月30日

**/

@Component

public class ActivityListener implements ChannelAwareMessageListener {

private static final Logger log = LoggerFactory.getLogger(ActivityListener.class);

@Override

@Transactional

public void onMessage(Message message,Channel channel) {

}

}

项目启动后 控制台会打印出监听的日志信息 这里写图片描述

结尾:仅供参考,自己用作学习记录,不喜勿喷,共勉!

补充:RabbitMQ与SpringMVC集成并实现发送消息和接收消息(持久化)方案

RabbitMQ本篇不介绍了,直接描述RabbitMQ与SpringMVC集成并实现发送消息和接收消息(持久化)。

使用了Spring-rabbit 发送消息和接收消息,我们使用的Maven来管理Jar包,在Maven的pom.xml文件中引入jar包

org.springframework.amqp

spring-rabbit

1.3.6.RELEASE

1.实现生产者

第一步:是要设置调用安装RabbitMQ的IP、端口等

配置一个global.properties文件

第二步:通过SpringMVC把global.properties文件读进来

classpath:global.properties

第三步:配置 RabbitMQ服务器连接、创建rabbitTemplate 消息模板类等,在SpringMVC的配置文件加入下面这些

第四步:实现消息类实体和发送消息

类实体

/**

* 消息

*

*/

public class RabbitMessage implements Serializable

{

private static final long serialVersionUID = -6487839157908352120L;

private Class>[] paramTypes;//参数类型

private String exchange;//交换器

private Object[] params;

private String routeKey;//路由key

public RabbitMessage(){}

public RabbitMessage(String exchange,String routeKey,Object...params)

{

this.params=params;

this.exchange=exchange;

this.routeKey=routeKey;

}

@SuppressWarnings("rawtypes")

public RabbitMessage(String exchange,String routeKey,String methodName,Object...params)

{

this.params=params;

this.exchange=exchange;

this.routeKey=routeKey;

int len=params.length;

Class[] clazzArray=new Class[len];

for(int i=0;i

clazzArray[i]=params[i].getClass();

this.paramTypes=clazzArray;

}

public byte[] getSerialBytes()

{

byte[] res=new byte[0];

ByteArrayOutputStream baos=new ByteArrayOutputStream();

ObjectOutputStream oos;

try {

oos = new ObjectOutputStream(baos);

oos.writeObject(this);

oos.close();

res=baos.toByteArray();

} catch (IOException e) {

e.printStackTrace();

}

return res;

}

public String getRouteKey() {

return routeKey;

}

public String getExchange() {

return exchange;

}

public void setExchange(String exchange) {

this.exchange = exchange;

}

public void setRouteKey(String routeKey) {

this.routeKey = routeKey;

}

public Class>[] getParamTypes() {

return paramTypes;

}

public Object[] getParams() {

return params;

}

}

发送消息

/**

* 生产着

*

*/

public class RmqProducer

{

@Resource

private RabbitTemplate rabbitTemplate;

/**

* 发送信息

* @param msg

*/

public void sendMessage(RabbitMessage msg)

{

try {

System.out.println(rabbitTemplate.getConnectionFactory().getHost());

System.out.println(rabbitTemplate.getConnectionFactory().getPort());

//发送信息

rabbitTemplate.convertAndSend(msg.getExchange(), msg.getRouteKey(), msg);

} catch (Exception e) {

}

}

}

说明:

1. rabbitTemplate.convertAndSend(msg.getExchange(), msg.getRouteKey(), msg);

源代码中的send调用的方法,一些发送消息帮我们实现好了。

2.上面的代码实现没申明交换器和队列,RabbitMQ不知交换器和队列他们的绑定关系,如果RabbitMQ管理器上没有对应的交换器和队列是不会新建的和关联的,需要手动关联。

我们也可以用代码申明:

rabbitAdmin要申明:eclareExchange方法 参数是交换器

BindingBuilder.bind(queue).to(directExchange).with(queueName);//将queue绑定到exchange

rabbitAdmin.declareBinding(binding);//声明绑定关系

源代码有这些方法:

这样就可以实现交换器和队列的绑定关系

交换器我们可以申明为持久化,还有使用完不会自动删除

TopicExchange 参数的说明:name是交换器名称,durable:true 是持久化 autoDelete:false使用完不删除

源代码:

队列也可以申明为持久化

第五步:实现测试类

@Resource

private RmqProducer rmqProducer2;

@Test

public void test() throws IOException

{

String exchange="testExchange";交换器

String routeKey="testQueue";//队列

String methodName="test";//调用的方法

//参数

Map param=new HashMap();

param.put("data","hello");

RabbitMessage msg=new RabbitMessage(exchange,routeKey, methodName, param);

//发送消息

rmqProducer2.sendMessage(msg);

}

结果:RabbitMQ有一条消息

2.消费者

第一步:RabbitMQ服务器连接这些在生产者那边已经介绍了,这边就不介绍了,我们要配置 RabbitMQ服务器连接、创建rabbitTemplate 消息模板类、消息转换器、消息转换器监听器等,在SpringMVC的配置文件加入下面这些

说明:

1.org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer中的queues可以传入多个队列

2.org.springframework.amqp.rabbit.listener.adapter.MessageListenerAdapter

有哪个消费者适配器来处理 ,参数defaultListenerMethod是默认调用方法来处理消息。

3.交换器和队列的持久化在生产者有介绍过了。

4.org.springframework.amqp.core.Binding这个类的绑定,在SpringMVC配置文件中配置时,

DestinationType这个参数要注意点

源代码:

第二步:处理消息

/**

* 消费者

*

*/

public class RmqConsumer

{

public void rmqProducerMessage(Object object){

RabbitMessage rabbitMessage=(RabbitMessage) object;

System.out.println(rabbitMessage.getExchange());

System.out.println(rabbitMessage.getRouteKey());

System.out.println(rabbitMessage.getParams().toString());

}

}

在启动过程中会报这样的错误,可能是你的交换器和队列没配置好

clazzArray[i]=params[i].getClass();

this.paramTypes=clazzArray;

}

public byte[] getSerialBytes()

{

byte[] res=new byte[0];

ByteArrayOutputStream baos=new ByteArrayOutputStream();

ObjectOutputStream oos;

try {

oos = new ObjectOutputStream(baos);

oos.writeObject(this);

oos.close();

res=baos.toByteArray();

} catch (IOException e) {

e.printStackTrace();

}

return res;

}

public String getRouteKey() {

return routeKey;

}

public String getExchange() {

return exchange;

}

public void setExchange(String exchange) {

this.exchange = exchange;

}

public void setRouteKey(String routeKey) {

this.routeKey = routeKey;

}

public Class>[] getParamTypes() {

return paramTypes;

}

public Object[] getParams() {

return params;

}

}

发送消息

/**

* 生产着

*

*/

public class RmqProducer

{

@Resource

private RabbitTemplate rabbitTemplate;

/**

* 发送信息

* @param msg

*/

public void sendMessage(RabbitMessage msg)

{

try {

System.out.println(rabbitTemplate.getConnectionFactory().getHost());

System.out.println(rabbitTemplate.getConnectionFactory().getPort());

//发送信息

rabbitTemplate.convertAndSend(msg.getExchange(), msg.getRouteKey(), msg);

} catch (Exception e) {

}

}

}

说明:

1. rabbitTemplate.convertAndSend(msg.getExchange(), msg.getRouteKey(), msg);

源代码中的send调用的方法,一些发送消息帮我们实现好了。

2.上面的代码实现没申明交换器和队列,RabbitMQ不知交换器和队列他们的绑定关系,如果RabbitMQ管理器上没有对应的交换器和队列是不会新建的和关联的,需要手动关联。

我们也可以用代码申明:

rabbitAdmin要申明:eclareExchange方法 参数是交换器

BindingBuilder.bind(queue).to(directExchange).with(queueName);//将queue绑定到exchange

rabbitAdmin.declareBinding(binding);//声明绑定关系

源代码有这些方法:

这样就可以实现交换器和队列的绑定关系

交换器我们可以申明为持久化,还有使用完不会自动删除

TopicExchange 参数的说明:name是交换器名称,durable:true 是持久化 autoDelete:false使用完不删除

源代码:

队列也可以申明为持久化

第五步:实现测试类

@Resource

private RmqProducer rmqProducer2;

@Test

public void test() throws IOException

{

String exchange="testExchange";交换器

String routeKey="testQueue";//队列

String methodName="test";//调用的方法

//参数

Map param=new HashMap();

param.put("data","hello");

RabbitMessage msg=new RabbitMessage(exchange,routeKey, methodName, param);

//发送消息

rmqProducer2.sendMessage(msg);

}

结果:RabbitMQ有一条消息

2.消费者

第一步:RabbitMQ服务器连接这些在生产者那边已经介绍了,这边就不介绍了,我们要配置 RabbitMQ服务器连接、创建rabbitTemplate 消息模板类、消息转换器、消息转换器监听器等,在SpringMVC的配置文件加入下面这些

说明:

1.org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer中的queues可以传入多个队列

2.org.springframework.amqp.rabbit.listener.adapter.MessageListenerAdapter

有哪个消费者适配器来处理 ,参数defaultListenerMethod是默认调用方法来处理消息。

3.交换器和队列的持久化在生产者有介绍过了。

4.org.springframework.amqp.core.Binding这个类的绑定,在SpringMVC配置文件中配置时,

DestinationType这个参数要注意点

源代码:

第二步:处理消息

/**

* 消费者

*

*/

public class RmqConsumer

{

public void rmqProducerMessage(Object object){

RabbitMessage rabbitMessage=(RabbitMessage) object;

System.out.println(rabbitMessage.getExchange());

System.out.println(rabbitMessage.getRouteKey());

System.out.println(rabbitMessage.getParams().toString());

}

}

在启动过程中会报这样的错误,可能是你的交换器和队列没配置好


版权声明:本文内容由网络用户投稿,版权归原作者所有,本站不拥有其著作权,亦不承担相应法律责任。如果您发现本站中有涉嫌抄袭或描述失实的内容,请联系我们jiasou666@gmail.com 处理,核实后本网站将在24小时内删除侵权内容。

上一篇:入门经典编程题
下一篇:护照查询API(护照查询个人信息查询)
相关文章

 发表评论

暂时没有评论,来抢沙发吧~