Java搭建RabbitMq消息中间件过程详解

网友投稿 220 2022-12-18


Java搭建RabbitMq消息中间件过程详解

这篇文章主要介绍了java搭建RabbitMq消息中间件过程详解,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友可以参考下

前言

当系统中出现“生产“和“消费“的速度或稳定性等因素不一致的时候,就需要消息队列。

名词

exchange: 交换机

routingkey: 路由key

queue:队列

控制台端口:15672

exchange和queue是需要绑定在一起的,然后消息发送到exchange再由exchange通过routingkey发送到对应的队列中。

使用场景

1.技能订单3分钟自动取消,改变状态

2.直播开始前15分钟提醒

3.直播状态自动结束

流程

生产者发送消息 —> order_pre_exchange交换机 —> order_per_ttl_delay_queue队列

—> 时间到期 —> order_delay_exchange交换机 —> order_delay_process_queue队列 —> 消费者

第一步:在pom文件中添加

org.springframework.boot

spring-boot-starter-amqp

第二步:在application.properties文件中添加

spring.rabbitmq.host=172.xx.xx.xxx

spring.rabbitmq.port=5672

spring.rabbitmq.username=rabbit

spring.rabbitmq.password=123456

spring.rabbitmq.virtual-host=/

spring.rabbitmq.connection-timeout=15000

spring.rabbitmq.publisher-confirms=true

spring.rabbitmq.publisher-returns=true

spring.rabbitmq.template.mandatory=true

第三步:配置 OrderQueueConfig

package com.tuohang.platform.config;

import org.springframework.amqp.core.Binding;

import org.springframework.amqp.core.BindingBuilder;

import org.springframework.amqp.core.DirectExchange;

import org.springframework.amqp.core.Queue;

import org.springframework.ahttp://mqp.core.QueueBuilder;

import org.springframework.amqp.rabbit.connection.ConnectionFactory;

import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer;

import org.springframework.amqp.rabbit.listener.adapter.MessageListenerAdapter;

import org.springframework.context.annotation.Bean;

import org.springframework.context.annotation.Configuration;

/**

* rabbitMQ的队列设置(生产者发送的消息,永远是先进入exchange,再通过路由,转发到队列)

*

*

* @author Administrator

* @version 1.0

* @Date 2018年9月18日

*/

@Configuration

public class OrderQueueConfig {

/**

* 订单缓冲交换机名称

*/

public final static String ORDER_PRE_EXCHANGE_NAME = "order_pre_exchange";

/**

* 发送到该队列的message会在一段时间后过期进入到order_delay_process_queue 【队列里所有的message都有统一的失效时间】

*/

public final static String ORDER_PRE_TTL_DELAY_QUEUE_NAME = "order_pre_ttl_delay_queue";

/**

* 订单的交换机DLX 名字

*/

final static String ORDER_DELAY_EXCHANGE_NAME = "order_delay_exchange";

/**

* 订单message时间过期后进入的队列,也就是订单实际的消费队列

*/

public final static String ORDER_DELAY_PROCESS_QUEUE_NAME = "order_delay_process_queue";

/**

* 订单在缓冲队列过期时间(毫秒)30分钟

*/

public final static int ORDER_QUEUE_EXPIRATION = 1800000;

/**

* 订单缓冲交换机

*

* @return

*/

@Bean

public DirectExchange preOrderExange() {

return new DirectExchange(ORDER_PRE_EXCHANGE_NAME);

}

/**

* 创建order_per_ttl_delay_queue队列,订单消息经过缓冲交换机,会进入该队列

*

* @return

*/

@Bean

public Queue delayQueuePerOrderTTLQueue() {

return QueueBuilder.durable(ORDER_PRE_TTL_DELAY_QUEUE_NAME)

.withArgument("x-dead-letter-exchange", ORDER_DELAY_EXCHANGE_NAME) // DLX

.withArgument("x-dead-letter-routing-key", ORDER_DELAY_PROCESS_QUEUE_NAME) // dead letter携带的routing key

.withArgument("x-message-ttl", ORDER_QUEUE_EXPIRATION) // 设置订单队列的过期时间

.build();

}

/**

* 将order_pre_exchange绑定到order_pre_ttl_delay_queue队列

*

* @param delayQueuePerOrderTTLQueue

* @param preOrderExange

* @return

*/

@Bean

public Binding queueOrderTTLBinding(Queue delayQueuePerOrderTTLQueue, DirectExchange preOrderExange) {

return BindingBuilder.bind(delayQueuePerOrderTTLQueue).to(preOrderExange).with(ORDER_PRE_TTL_DELAY_QUEUE_NAME);

}

/**

* 创建订单的DLX exchange

*

* @return

*/

@Bean

public DirectExchange delayOrderExchange() {

return new DirectExchange(ORDER_DELAY_EXCHANGE_NAME);

}

/**

* 创建order_delay_process_queue队列,也就是订单实际消费队列

*

* @return

*/

@Bean

public Queue delayProcessOrderQueue() {

return QueueBuilder.durable(ORDER_DELAY_PROCESS_QUEUE_NAME).build();

}

/**

* 将DLX绑定到实际消费队列

*

* @param delayProcessOrderQueue

* @param delayExchange

* @return

*/

@Bean

public Binding dlxOrderBinding(Queue delayProcessOrderQueue, DirectExchange delayOrderExchange) {

return BindingBuilder.bind(delayProcessOrderQueue).to(delayOrderExchange).with(ORDER_DELAY_PROCESS_QUEUE_NAME);

}

/**

* 监听订单实际消费者队列order_delay_process_queue

*

* @param connectionFactory

* @param processReceiver

* @return

*/

@Bean

public SimpleMessageListenerContainer orderProcessContainer(ConnectionFactory connectionFactory,

OrderProcessReceiver processReceiver) {

SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();

container.setConnectionFactory(connectionFactory);

container.setQueueNames(ORDER_DELAY_PROCESS_QUEUE_NAME); // 监听order_delay_process_queue

container.setMessageListener(new MessageListenerAdapter(processReceiver));

return container;

}

}

消费者 OrderProcessReceiver :

package com.tuohang.platform.config;

import java.util.Objects;

import org.apache.tools.ant.types.resources.selectors.Date;

import org.slf4j.Logger;

import org.slf4j.LoggerFactory;

import org.springframework.amqp.core.Message;

import org.springframework.amqp.rabbit.core.ChannelAwareMessageListener;

import org.springframework.stereotype.Component;

import com.rabbitmq.client.Channel;

/**

* 订单延迟处理消费者

*

*

* @author Administrator

* @version 1.0

* @Date 2018年9月18日

*/

@Component

public class OrderProcessReceiver implements ChannelAwareMessageListener {

private static Logger logger = LoggerFactory.getLogger(OrderProcessReceiver.class);

String msg = "The failed message will auto retry after a certain delay";

@Override

public void onMessage(Message message, Channel channel) throws Exception {

try {

processMessage(message);

} catch (Exception e) {

// 如果发生了异常,则将该消息重定向到缓冲队列,会在一定延迟之后自动重做

channel.basicPublish(OrderQueueConfig.ORDER_PRE_EXCHANGE_NAME, OrderQueueConfig.ORDER_PRE_TTL_DELAY_QUEUE_NAME, null,

msg.getBytes());

}

}

/**

* 处理订单消息,如果订单未支付,取消订单(如果当消息内容为FAIL_MESSAGE的话,则需要抛出异常)

*

* @param message

* @throws Exception

*/

public void processMessage(Message message) throws Exception {

String realMessage = new String(message.getBody());

logger.info("Received <" + realMessage + ">");

// 取消订单

if(!Objects.equals(realMessage, msg)) {

// SpringKit.getBean(ITestService.class).resetSexById(Long.valueOf(realMessage));

System.out.println("测试111111-----------"+new Date());

System.out.println(message);

}

}

}

或者

/**

* 测试 rabbit 消费者vYVSydDUHH

*

*

* @author Administrator

* @version 1.0

* @Date 2018年9月25日

*/

@Component

@RabbitListener(queues = TestQueueConfig.TEST_DELAY_PROCESS_QUEUE_NAME)

public class TestProcessReceiver {

private static Logger logger = LoggerFactory.getLogger(TestProcessReceiver.class);

String msg = "The failed message will auto retry after a certain delay";

@RabbitHandler

public void onMessage(Message message, Channel channel) throws Exception {

try {

processMessage(message);

//告诉服务器收到这条消息 已经被我消费了 可以在队列删掉;否则消息服务器以为这条消息没处理掉 后续还会在发

channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);

} catch (Exception e) {

// 如果发生了异常,则将该消息重定向到缓冲队列,会在一定延迟之后自动重做

channel.basicPublish(TestQueueConfig.TEST_PRE_EXCHANGE_NAME, TestQueueConfig.TEST_PRE_TTL_DELAY_QUEUE_NAME, null,

msg.getBytes());

}

}

/**

* 处理订单消息,如果订单未支付,取消订单(如果当消息内容为FAIL_MESSAGE的话,则需要抛出异常)

*

* @param message

* @throws Exception

*/

public void processMessage(Message message) throws Exception {

String realMessage = new String(message.getBody());

logger.info("Received < " + realMessage + " >");

// 取消订单

if(!Objects.equals(realMessage, msg)) {

System.out.println("测试111111-----------"+new Date());

}else {

System.out.println("rabbit else...");

}

}

}

生产者

/**

* 测试rabbitmq

*

* @return

*/

@RequestMapping(value = "/testrab")

public String testraa() {

GenericResult gr = null;

try {

String name = "test_pre_ttl_delay_queue";

long expiration = 10000;//10s 过期时间

rabbitTemplate.convertAndSend(name,String.valueOf(123456));

// 在单个消息上设置过期时间

//rabbitTemplate.convertAndSend(name,(Object)String.valueOf(123456), new ExpirationMessagePostProcessor(expiration));

} catch (ServiceException e) {

e.printStackTrace();

gr = new GenericResult(StateCode.ERROR, languageMap.get("network_error"), e.getMessage());

}

return getWrite(gr);

}


版权声明:本文内容由网络用户投稿,版权归原作者所有,本站不拥有其著作权,亦不承担相应法律责任。如果您发现本站中有涉嫌抄袭或描述失实的内容,请联系我们jiasou666@gmail.com 处理,核实后本网站将在24小时内删除侵权内容。

上一篇:php接口文档生成(php写api接口教程)
下一篇:mybatis分页绝对路径写法过程详解
相关文章

 发表评论

暂时没有评论,来抢沙发吧~