Spring Boot+RabbitMQ 通过fanout模式实现消息接收功能(支持消费者多实例部署)
282
2022-08-03
Spring boot 集成 MQTT详情
目录一、简介二、主要特性三、集成步骤1.引入相关jar包2.核心配置类3.网关配置4.编写测试类5.yml配置信息
一、简介
MQTT(Message Queuing Telemetry Transport,消息队列遥测传输协议),是一种基于发布/订阅(publish/subscribe)模式的"轻量级"通讯协议,可以以极少的代码和有限的带宽为连接远程设备提供实时可靠的消息服务。目前在物联网、小型设备、移动应用等方面有较广泛的应用。
二、主要特性
(1)使用发布/订阅消息模式,提供一对多的消息发布,解除应用程序耦合。(2)对负载内容屏蔽的消息传输。(3)使用TCP/IP提供网络连接。(4)有三种消息发布服务质量:
“至多一次”,消息发布完全依赖底层TCP/IP网络。会发生消息丢失或重复。这一级别可用于如下情况,环境传感器数据,丢失一次读记录无所谓,因为不久后还会有第二次发送。这一种方式主要普通APP的推送,倘若你的智能设备在消息推送时未联网,推送过去没收到,再次联网也就收不到了。
“至少一次”,确保消息到达,但消息重复可能会发生。
“只有一次”,确保消息到达一次。在一些要求比较严格的计费系统中,可以使用此级别。在计费系统中,消息重复或丢失会导致不正确的结果。这种最高质量的消息发布服务还可以用于即时通讯类的APP的推送,确保用户收到且只会收到一次。
(5)小型传输,开销很小(固定长度的头部是2字节),协议交换最小化,以降低网络流量。(6)使用Last Will和Testament特性通知有关各方客户端异常中断的机制。
Last Will:即遗言机制,用于通知同一主题下的其他设备发送遗言的设备已经断开了连接。
Testament:遗嘱机制,功能类似于Last Will。
三、集成步骤
1.引入相关jar包
2.核心配置类
@Configuration
public class MqttConfig
{
@Autowired
private MqttProperties mqttProperties;
/**
* 连接器
* @return
*/
@Bean
public MqttConnectOptions getMqttConnectOptions()
{
MqttConnectOptions mqttConnectOptions = new MqttConnectOptions();
// 设置是否清空session,false表示服务器会保留客户端的连接记录,true表示每次连接到服务器都以新的身份连接
mqttConnectOptions.setCleanSession(true);
// 设置超时时间,默认30秒
mqttConnectOptions.setConnectionTimeout(mqttProperties.getConnectionTimeOut());
mqttConnectOptions.setKeepAliveInterval(mqttProperties.getKeepAlive());
mqttConnectOptions.setAutomaticReconnect(true);
// 设置连接的用户名
mqttConnectOptions.setUserName(mqttProperties.getUsername());
// 设置连接的密码
mqttConnectOptions.setPassword(mqttProperties.getPassword().toCharArray());
//服务器地址
mqttConnectOptions.setServerURIs(new String[]{mqttProperties.getUrl()});
mqttConnectOptions.setKeepAliveInterval(2);
return mqttConnectOptions;
}
/***
* MQTT客户端
* @return
*/
@Bean("mqttClientFactory")
public MqttPahoClientFactory mqttClientFactory() {
DefaultMqttPahoClientFactory factory = new DefaultMqttPahoClientFactory();
factory.setConnectionOptions(getMqttConnectOptions());
return factory;
}
/*----------------- 消息生产者的配置 ---------------------*/
/**
* MQTT生产端发布处理器
*
* @return {@link org.springframework.messaging.MessageHandler}
*/
@Bean
@ServiceActivator(inputChannel = "mqttOutboundChannel")
public MessageHandler mqttOutbound() {
MqttPahoMessageHaEJTCMndler messageHandler = new MqttPahoMessageHandler(mqttProperties.getProducerClientId(), mqttClientFactory());
messageHandler.setAsync(true);
return messageHandler;
}
/**
* MQTT生产端发布通道
* @return
*/
@Bean("mqttOutboundChannel")
public MessageChannel mqttOutboundChannel()
{
return new DirectChannel();
}
/*----------------- 消息消费者的配置 ---------------------*/
/**
* MQTT消费端订阅通道
*
* @return {@link org.springframework.messaging.MessageChannel}
*/
@Bean(name = "mqttInboundChannel")
public MessageChannel mqttInboundChannel() {
return new DirectChannel();
}
/**
* MQTT消费端连接配置
*
* @param channel {@link org.springframework.messaging.MessageChannel}
* @param factory {@link org.springframework.integration.mqtt.core.MqttPahoClientFactory}
* @return {@link org.springframework.integration.core.MessageProducer}
*/
@Bean
public MessageProducer inbound(
@Qualifier("mqttInboundChannel") MessageChannel channel,
@Qualifier("mqttClientFactory") MqttPahoClientFactory factory) {
MqttPahoMessageDrivenChannelAdapter adapter = new MqttPahoMessageDrivenChannelAdapter(mqttProperties.getConsumerClientId(), factory, "test");
adapter.setCompletionTimeout(30000);
adapter.setConverter(new DefaultPahoMessageConverter());
// 0 至多一次,数据可能丢失
// 1 至少一次,数据可能重复
// 2 只有一次,且仅有一次,最耗性能
adapter.setQos(1);
// 设置订阅通道
adapter.setOutputChannel(channel);
return adapter;
}
}
@ConfigurationProperties("mqtt")
@Component
public class MqttProperties implements Serializable
{
private static final long serialVersionUID = -1425980007744001158L;
private String url;
private String username;
private String password;
private int keepAlive;
private int connectionTimeOut;
private String producerClientId;
private String producerQos;
private String consumerClientId;
private String consumerQos;
private String consumerTopic;
private int completionTimeout;
private String defaultTopic;
//get、set方法省略
}
3.网关配置
@MessagingGateway(defaultRequestChannel = "mqttOutboundChannel")
public interface MqttGateway
{
void sendToMqtt(byte[] data,@Header(MqttHeaders.TOPIC) String topic);
}
4.编写测试类
@Autowired
private MqttGateway mqttGateway;
@RequestMapping("/sendTest")
public String sendMqttTest(String msg)
{
mqttGateway.send("test",msg);
return "OK";
}
5.yml配置信息
mqtt:
url: tcp://localhost:1883
username: test
password: test1234
keep-alive: 30
connection-timeout: 3000
producerClientId: test-producer
producerQos: 1
consumerClientId: test-consumer
consumerQos: 1
deafultTopic : test
版权声明:本文内容由网络用户投稿,版权归原作者所有,本站不拥有其著作权,亦不承担相应法律责任。如果您发现本站中有涉嫌抄袭或描述失实的内容,请联系我们jiasou666@gmail.com 处理,核实后本网站将在24小时内删除侵权内容。
发表评论
暂时没有评论,来抢沙发吧~