SpringCloud Stream 整合RabbitMQ的基本步骤

网友投稿 306 2022-08-17


SpringCloud Stream 整合RabbitMQ的基本步骤

目录一、项目介绍二、生产者三、消费者四、验证 在postman 访问生产者接口:

本篇简单介绍SpringCloud Stream 整合RabbitMQ基本步骤:

引入SpringCloud引入SpringCloud Stream相关依赖定义绑定接口: 消息生产者(Output…Binding) 、消息消费者(Input…Binding)@EnableBinding 在对应类上进行定义@StreamListener 在对应方法上创建监听用来消费消息调用output的send()方法生产消息

一、项目介绍

演示SpringCloud Stream 整合RabbitMQ,项目可以在一个工程里完成,本次建立了一个工程mq-service,其中包含三个Module:

mq-service-base :基础模块(包含了共用依赖、共用变量)mq-service-producer :生产者mq-service-consumer :消费者

注: 完全可以在一个工程里实现,这里为了区分,并为了后续单独启动或停止生产者或消费者做实验,也为了适应实际应用项目,所以创建了不同Module

(1)版本

SpringBoot : 2.0.6.RELEASESpringCloud : Finchley.SR2RabbitMQ : 3.8.1

(2)项目整体结构

(3)基础模块

1)pom.xml

这里作为公共模块引入SpringCloud、Spring Cloud Stream等,其中也再此引入fastjson、lombok等工具依赖(完整代码见文章最下面)其中Spring Cloud Stream如下:

org.springframework.cloud

spring-cloud-stream

org.springframework.cloud

spring-cloud-starter-stream-rabbit

2) model

定义共用的变量,如CollectionRequest.java

二、生产者

(1)结构

(2)pom.xml

导入base的依赖即可,因为相关共用依赖在base中已经引入

com.zrk

mq-service-base

0.0.1-SNAPSHOT

(3)定义绑定(接口)

OutputMessageBinding.java

public interface OutputMessageBinding {

/** Topic 名称*/

String OUTPUT = "message-center-out";

@Output(OUTPUT)

MessageChannel output();

}

(4)添加配置

# rabbitmq连接信息

spring.rabbitmq.addresses=192.168.1.125

spring.rabbitmq.port=5672

spring.rabbitmq.username=admin

spring.rabbitmq.password=123456

spring.cloud.stream.bindings.message-center-out.destination=message-center

spring.cloud.stream.rabbit.bindings.message-center-out.consumer.exchangeType=fanout

(5) 调用方法

CollectionServiceImpl.java

@Service

@EnableBinding(OutputMessageBinding.class)

public class CollectionServiceImpl implements CollectionService{

@Resource

private OutputMessageBinding outputMessageBinding;

/**

* @param schoolName

* @param content

*/

@Override

public void getCollection(String schoolName, String content) {

CollectionRequest request = new CollectionRequest();

request.setSchoolName(schoolName);

request.setContent(content);

outputMessageBinding.output().send(MessageBuilder.withPayload(request).build());

}

}

注: 主要是两点

@EnableBinding 定义outputMessageBinding.output().send(MessageBuilder.withPayload(request).build()); 生产消息

三、消费者

(1)结构

(2)pom.xml

导入base的依赖即可,因为相关共用依赖在base中已经引入

com.zrk

mq-service-base

0.0.1-SNAPSHOT

(3)定义绑定(接口)

InputMessageBinding.java

public interface InputMessageBinding {

String INPUT = "message-center-input";

@Input(INPUT)

SubscribableChannel input();

}

注: 消费者这里与生产者不同,用的是SubscribableChannel ,而生产者用的是MessageChannel

(4)添加配置

# rabbitmq连接信息

spring.rabbitmq.addresses=192.168.1.125

spring.rabbitmq.port=5672

spring.rabbitmq.username=admin

spring.rabbitmq.password=123456

spring.cloud.stream.bindings.message-center-input.destination=message-center

spring.cloud.stream.bindings.message-center-input.group=${spring.application.name}

(5) 调用方法

CollectionReceiver.java

@Slf4j

@EnableBinding(InputMessageBinding.class)

public class CollectionReceiver {

@StreamListener(InputMessageBinding.INPUT)

public void handle(String value){

log.info("[消息] 接收到发送消息MQ: {}", value);

CollectionRequest request = JSON.parseObject(value, CollectionRequest.class);

log.info("处理收集信息:" + request.toString());

}

}

注: 主要是两点

@EnableBinding 定义@StreamListener 注册监听

至此,生产者与消费者都创建完成,分别启动两个项目,并调用生产者接口进行验证:

四、验证 在postman 访问生产者接口:

localhost:30110/collection/getCollectionschoolName=‘zrk’&content=‘send message to rabbitmq’

观察消费者日志:

查看rabbitmq首页

则证明已经整合成功,接下来将研究一下更多的配置与用法。

如果有需要,可以参考项目完整代码:https://github.com/zrk333/mq-service


版权声明:本文内容由网络用户投稿,版权归原作者所有,本站不拥有其著作权,亦不承担相应法律责任。如果您发现本站中有涉嫌抄袭或描述失实的内容,请联系我们jiasou666@gmail.com 处理,核实后本网站将在24小时内删除侵权内容。

上一篇:spring注解之@Valid和@Validated的区分总结
下一篇:springboot 接收List 入参的几种方法
相关文章

 发表评论

暂时没有评论,来抢沙发吧~