java中的接口是类吗
319
2022-11-06
SpringMVC和rabbitmq集成的使用案例
1.添加maven依赖
2.spring主配置文件中加入rabbitMQ xml文件的配置
3.jdbc配置文件中加入 rabbitmq的链接配置
#rabbitMQ配置
mq.host=localhost
mq.username=donghao
mq.password=donghao
mq.port=5672
mq.vhost=testMQ
4.新建application-mq.xml文件,添加配置信息
xmlns:xsi="http://w3.org/2001/XMLSchema-instance" xmlns:rabbit="http://springframework.org/schema/rabbit" xsi:schemaLocation="http://springframework.org/schema/beans http://springframework.org/schema/beans/spring-beans-3.0.xsd http://springframework.org/schema/rabbit http://springframework.org/schema/rabbit/spring-rabbit-1.0.xsd" >
xmlns:xsi="http://w3.org/2001/XMLSchema-instance"
xmlns:rabbit="http://springframework.org/schema/rabbit"
xsi:schemaLocation="http://springframework.org/schema/beans
http://springframework.org/schema/beans/spring-beans-3.0.xsd
http://springframework.org/schema/rabbit
http://springframework.org/schema/rabbit/spring-rabbit-1.0.xsd" >
5.新增公共入队类
@Service
public class MQProducerImpl{
@Resource
private AmqpTemplate amqpTemplate;
private final static Logger logger = LoggerFactory.getLogger(MQProducerImpl.class);
//公共入队方法
public void sendDataToQueue(String queueKey, Object object) {
try {
amqpTemplate.convertAndSend(queueKey, object);
} catch (Exception e) {
logger.error(e.toString());
}
}
}
6.创建监听类
import java.io.IOException;
import java.util.List;
import javax.annotation.Resource;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.core.ChannelAwareMessageListener;
import org.springframework.amqp.utils.SerializationUtils;
import org.springframework.stereotype.Component;
import org.springframework.transaction.annotation.Transactional;
import com.cn.framework.domain.BaseDto;
import com.cn.framework.util.ConstantUtils;
import com.cn.framework.util.RabbitMq.producer.MQProducer;
import com.kxs.service.activityService.IActivityService;
import com.kxs.service.messageService.IMessageService;
import com.rabbitmq.client.Channel;
/**
* 活动处理listener
* @author
* @date 2017年6月30日
**/
@Component
public class ActivityListener implements ChannelAwareMessageListener {
private static final Logger log = LoggerFactory.getLogger(ActivityListener.class);
@Override
@Transactional
public void onMessage(Message message,Channel channel) {
}
}
项目启动后 控制台会打印出监听的日志信息 这里写图片描述
结尾:仅供参考,自己用作学习记录,不喜勿喷,共勉!
补充:RabbitMQ与SpringMVC集成并实现发送消息和接收消息(持久化)方案
RabbitMQ本篇不介绍了,直接描述RabbitMQ与SpringMVC集成并实现发送消息和接收消息(持久化)。
使用了Spring-rabbit 发送消息和接收消息,我们使用的Maven来管理Jar包,在Maven的pom.xml文件中引入jar包
1.实现生产者
第一步:是要设置调用安装RabbitMQ的IP、端口等
配置一个global.properties文件
第二步:通过SpringMVC把global.properties文件读进来
第三步:配置 RabbitMQ服务器连接、创建rabbitTemplate 消息模板类等,在SpringMVC的配置文件加入下面这些
第四步:实现消息类实体和发送消息
类实体
/**
* 消息
*
*/
public class RabbitMessage implements Serializable
{
private static final long serialVersionUID = -6487839157908352120L;
private Class>[] paramTypes;//参数类型
private String exchange;//交换器
private Object[] params;
private String routeKey;//路由key
public RabbitMessage(){}
public RabbitMessage(String exchange,String routeKey,Object...params)
{
this.params=params;
this.exchange=exchange;
this.routeKey=routeKey;
}
@SuppressWarnings("rawtypes")
public RabbitMessage(String exchange,String routeKey,String methodName,Object...params)
{
this.params=params;
this.exchange=exchange;
this.routeKey=routeKey;
int len=params.length;
Class[] clazzArray=new Class[len];
for(int i=0;i clazzArray[i]=params[i].getClass(); this.paramTypes=clazzArray; } public byte[] getSerialBytes() { byte[] res=new byte[0]; ByteArrayOutputStream baos=new ByteArrayOutputStream(); ObjectOutputStream oos; try { oos = new ObjectOutputStream(baos); oos.writeObject(this); oos.close(); res=baos.toByteArray(); } catch (IOException e) { e.printStackTrace(); } return res; } public String getRouteKey() { return routeKey; } public String getExchange() { return exchange; } public void setExchange(String exchange) { this.exchange = exchange; } public void setRouteKey(String routeKey) { this.routeKey = routeKey; } public Class>[] getParamTypes() { return paramTypes; } public Object[] getParams() { return params; } } 发送消息 /** * 生产着 * */ public class RmqProducer { @Resource private RabbitTemplate rabbitTemplate; /** * 发送信息 * @param msg */ public void sendMessage(RabbitMessage msg) { try { System.out.println(rabbitTemplate.getConnectionFactory().getHost()); System.out.println(rabbitTemplate.getConnectionFactory().getPort()); //发送信息 rabbitTemplate.convertAndSend(msg.getExchange(), msg.getRouteKey(), msg); } catch (Exception e) { } } } 说明: 1. rabbitTemplate.convertAndSend(msg.getExchange(), msg.getRouteKey(), msg); 源代码中的send调用的方法,一些发送消息帮我们实现好了。 2.上面的代码实现没申明交换器和队列,RabbitMQ不知交换器和队列他们的绑定关系,如果RabbitMQ管理器上没有对应的交换器和队列是不会新建的和关联的,需要手动关联。 我们也可以用代码申明: rabbitAdmin要申明:eclareExchange方法 参数是交换器 BindingBuilder.bind(queue).to(directExchange).with(queueName);//将queue绑定到exchange rabbitAdmin.declareBinding(binding);//声明绑定关系 源代码有这些方法: 这样就可以实现交换器和队列的绑定关系 交换器我们可以申明为持久化,还有使用完不会自动删除 TopicExchange 参数的说明:name是交换器名称,durable:true 是持久化 autoDelete:false使用完不删除 源代码: 队列也可以申明为持久化 第五步:实现测试类 @Resource private RmqProducer rmqProducer2; @Test public void test() throws IOException { String exchange="testExchange";交换器 String routeKey="testQueue";//队列 String methodName="test";//调用的方法 //参数 Map param.put("data","hello"); RabbitMessage msg=new RabbitMessage(exchange,routeKey, methodName, param); //发送消息 rmqProducer2.sendMessage(msg); } 结果:RabbitMQ有一条消息 2.消费者 第一步:RabbitMQ服务器连接这些在生产者那边已经介绍了,这边就不介绍了,我们要配置 RabbitMQ服务器连接、创建rabbitTemplate 消息模板类、消息转换器、消息转换器监听器等,在SpringMVC的配置文件加入下面这些 说明: 1.org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer中的queues可以传入多个队列 2.org.springframework.amqp.rabbit.listener.adapter.MessageListenerAdapter 有哪个消费者适配器来处理 ,参数defaultListenerMethod是默认调用方法来处理消息。 3.交换器和队列的持久化在生产者有介绍过了。 4.org.springframework.amqp.core.Binding这个类的绑定,在SpringMVC配置文件中配置时, DestinationType这个参数要注意点 源代码: 第二步:处理消息 /** * 消费者 * */ public class RmqConsumer { public void rmqProducerMessage(Object object){ RabbitMessage rabbitMessage=(RabbitMessage) object; System.out.println(rabbitMessage.getExchange()); System.out.println(rabbitMessage.getRouteKey()); System.out.println(rabbitMessage.getParams().toString()); } } 在启动过程中会报这样的错误,可能是你的交换器和队列没配置好
clazzArray[i]=params[i].getClass();
this.paramTypes=clazzArray;
}
public byte[] getSerialBytes()
{
byte[] res=new byte[0];
ByteArrayOutputStream baos=new ByteArrayOutputStream();
ObjectOutputStream oos;
try {
oos = new ObjectOutputStream(baos);
oos.writeObject(this);
oos.close();
res=baos.toByteArray();
} catch (IOException e) {
e.printStackTrace();
}
return res;
}
public String getRouteKey() {
return routeKey;
}
public String getExchange() {
return exchange;
}
public void setExchange(String exchange) {
this.exchange = exchange;
}
public void setRouteKey(String routeKey) {
this.routeKey = routeKey;
}
public Class>[] getParamTypes() {
return paramTypes;
}
public Object[] getParams() {
return params;
}
}
发送消息
/**
* 生产着
*
*/
public class RmqProducer
{
@Resource
private RabbitTemplate rabbitTemplate;
/**
* 发送信息
* @param msg
*/
public void sendMessage(RabbitMessage msg)
{
try {
System.out.println(rabbitTemplate.getConnectionFactory().getHost());
System.out.println(rabbitTemplate.getConnectionFactory().getPort());
//发送信息
rabbitTemplate.convertAndSend(msg.getExchange(), msg.getRouteKey(), msg);
} catch (Exception e) {
}
}
}
说明:
1. rabbitTemplate.convertAndSend(msg.getExchange(), msg.getRouteKey(), msg);
源代码中的send调用的方法,一些发送消息帮我们实现好了。
2.上面的代码实现没申明交换器和队列,RabbitMQ不知交换器和队列他们的绑定关系,如果RabbitMQ管理器上没有对应的交换器和队列是不会新建的和关联的,需要手动关联。
我们也可以用代码申明:
rabbitAdmin要申明:eclareExchange方法 参数是交换器
BindingBuilder.bind(queue).to(directExchange).with(queueName);//将queue绑定到exchange
rabbitAdmin.declareBinding(binding);//声明绑定关系
源代码有这些方法:
这样就可以实现交换器和队列的绑定关系
交换器我们可以申明为持久化,还有使用完不会自动删除
TopicExchange 参数的说明:name是交换器名称,durable:true 是持久化 autoDelete:false使用完不删除
源代码:
队列也可以申明为持久化
第五步:实现测试类
@Resource
private RmqProducer rmqProducer2;
@Test
public void test() throws IOException
{
String exchange="testExchange";交换器
String routeKey="testQueue";//队列
String methodName="test";//调用的方法
//参数
Map
param.put("data","hello");
RabbitMessage msg=new RabbitMessage(exchange,routeKey, methodName, param);
//发送消息
rmqProducer2.sendMessage(msg);
}
结果:RabbitMQ有一条消息
2.消费者
第一步:RabbitMQ服务器连接这些在生产者那边已经介绍了,这边就不介绍了,我们要配置 RabbitMQ服务器连接、创建rabbitTemplate 消息模板类、消息转换器、消息转换器监听器等,在SpringMVC的配置文件加入下面这些
说明:
1.org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer中的queues可以传入多个队列
2.org.springframework.amqp.rabbit.listener.adapter.MessageListenerAdapter
有哪个消费者适配器来处理 ,参数defaultListenerMethod是默认调用方法来处理消息。
3.交换器和队列的持久化在生产者有介绍过了。
4.org.springframework.amqp.core.Binding这个类的绑定,在SpringMVC配置文件中配置时,
DestinationType这个参数要注意点
源代码:
第二步:处理消息
/**
* 消费者
*
*/
public class RmqConsumer
{
public void rmqProducerMessage(Object object){
RabbitMessage rabbitMessage=(RabbitMessage) object;
System.out.println(rabbitMessage.getExchange());
System.out.println(rabbitMessage.getRouteKey());
System.out.println(rabbitMessage.getParams().toString());
}
}
在启动过程中会报这样的错误,可能是你的交换器和队列没配置好
版权声明:本文内容由网络用户投稿,版权归原作者所有,本站不拥有其著作权,亦不承担相应法律责任。如果您发现本站中有涉嫌抄袭或描述失实的内容,请联系我们jiasou666@gmail.com 处理,核实后本网站将在24小时内删除侵权内容。
发表评论
暂时没有评论,来抢沙发吧~