SpringBoot整合MQTT并实现异步线程调用的问题

网友投稿 1328 2022-09-13


SpringBoot整合MQTT并实现异步线程调用的问题

目录为什么选择MQTT使用背景代码实现基础代码异步线程处理实现

为什么选择MQTT

MQTT的定义相信很多人都能讲的头头是道,本文章也不讨论什么高大上的东西,旨在用最简单直观的方式让每一位刚接触的同行们可以最快的应用起来

先从使用MQTT需要什么开始分析:

消息服务器

不同应用/设备之间的频繁交互

可能涉及一对多的消息传递

基于SpringBoot通过注解实现对mqtt消息处理的异步调用

使用背景

生产环境下, 由于mqtt 生产者生产的消息逐渐增多, 可能会导致消息堆积. 因此需要消费者去快速的消费.

而其中的一个方案便是使用异步线程去加速消费消息. 下面介绍下思路

我们可以在原来的mqtt工具类上面进行改装.

首先创建一个类MqttMessageListener并继承IMqttMessageListener实现messageArrived, 用于处理这些消息(业务编写)

然后改写mqtt客户端订阅的方法, 注入MqttMessageListener, 并在订阅方法中新增该参数

在然后在启动类开启异步线程, 编写一个配置类配置线程池参数并且在messageArrived加上@Async开启异步线程调用

代码实现

基础代码

指没有开启线程池的代码

MqttPushClient 主要定义了连接参数

import org.eclipse.paho.client.mqttv3.IMqttToken;

import org.eclipse.paho.client.mqttv3.MqttClient;

import org.eclipse.paho.client.mqttv3.MqttConnectOptions;

import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;

import org.slf4j.Logger;

import org.slf4j.LoggerFactory;

import org.springframework.beans.factory.annotation.Autowired;

import org.springframework.stereotype.Component;

import java.util.List;

/**

* @Author

* @Date

* @Description 连接至EMQ X 服务器,获取mqtt连接,发布消息

*/

@Component

public class MqttPushClient{

private static final Logger log = LoggerFactory.getLogger(MqttPushClient.class);

@Autowired

private PushCallback pushCallback;

private static MqttClient client;

public static void setClient(MqttClient client) {

MqttPushClient.client = client;

}

public static MqttClient getClient() {

return client;

}

public void connect(String host, String clientID, String username, String password, int timeout, inuFtJkLHnDst keepalive, List topicList) {

MqttClient client;

try {

client = new MqttClient(host, clientID, new MemoryPersistence());

MqttConnectOptions options = new MqttConnectOptions();

options.setCleanSession(true);

if (username != null) {

options.setUserName(username);

}

if (password != null) {

options.setPassword(password.toCharArray());

}

options.setConnectionTimeout(thttp://imeout);

options.setKeepAliveInterval(keepalive);

MqttPushClient.setClient(client);

try {

//设置回调类

client.setCallback(pushCallback);

//client.connect(options);

IMqttToken iMqttToken = client.connectWithResult(options);

boolean complete = iMqttToken.isComplete();

log.info("MQTT连接"+(complete?"成功":"失败"));

/** 订阅主题 **/

for (String topic : topicList) {

log.info("连接订阅主题:{}", topic);

client.subscribe(topic, 0);

}

} catch (Exception e) {

e.printStackTrace();

}

} catch (Exception e) {

e.printStackTrace();

}

}

}

PushCallback 回调类, 实现重连, 消息发送监听, 消息接收监听

import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;

import org.eclipse.paho.client.mqttv3.MqttCallback;

import org.eclipse.paho.client.mqttv3.MqttMessage;

import org.slf4j.Logger;

import org.slf4j.LoggerFactory;

import org.springframework.beans.factory.annotation.Autowired;

import org.springframework.stereotype.Component;

/**

* @Author

* @Date

* @Description 消息回调,处理接收的消息

*/

@Component

public class PushCallback implements MqttCallback {

private static final Logger log = LoggerFactory.getLogger(PushCallback.class);

@Autowired

private MqttConfiguration mqttConfiguration;

@Autowired

private MqttTopic mqttTopic;

@Override

public void connectionLost(Throwable cause) { // 连接丢失后,一般在这里面进行重连

log.info("连接断开,正在重连");

MqttPushClient mqttPushClient = mqttConfiguration.getMqttPushClient();

if (null != mqttPushClient) {

mqttPushClient.connect(mqttConfiguration.getHost(), mqttConfiguration.getClientid(), mqttConfiguration.getUsername(),

mqttConfiguration.getPassword(), mqttConfiguration.getTimeout(), mqttConfiguration.getKeepalive(), mqttConfiguration.getTopic());

log.info("已重连");

}

}

/**

* 发送消息,消息到达后处理方法

* @param token

*/

@Override

public void deliveryComplete(IMqttDeliveryToken token) {

int messageId = token.getMessageId();

String[] topics = token.getTopics();

log.info("消息发送完成,messageId={},topics={}",messageId,topics.toString());

}

/**

* 订阅主题接收到消息处理方法

* @param topic

* @param message

*/

@Override

public void messageArrived(String topic, MqttMessage message) {

// subscribe后得到的消息会执行到这里面,这里在控制台有输出

String messageStr = new String(message.getPayload());

// messageDistribute.distriuFtJkLHnDsbute(topic, messageStr);

log.info("接收的主题:" + topic + ";接收到的信息:" + messageStr);

}

}

MqttConfiguration 配置了mqtt相关参数, 并初始化连接(mqtt在这里启动)

import lombok.extern.slf4j.Slf4j;

import org.springframework.beans.factory.annotation.Autowired;

import org.springframework.boot.context.properties.ConfigurationProperties;

import org.springframework.context.annotation.Bean;

import org.springframework.context.annotation.Configuration;

import org.springframework.stereotype.Component;

import java.util.List;

/**

* @Author

* @Date mqtt配置及连接

* @Description

*/

@Slf4j

@Component

@Configuration

@ConfigurationProperties(MqttConfiguration.PREFIX)

public class MqttConfiguration {

@Autowired

private MqttPushClient mqttPushClient;

/**

* 指定配置文件application-local.properties中的属性名前缀

*/

public static final String PREFIX = "std.mqtt";

private String host;

private String clientId;

private String userName;

private String password;

private int timeout;

private int keepAlive;

private List topic;

public String getClientid() {

return clientId;

}

public void setClientid(String clientid) {

this.clientId = clientid;

}

public String getUsername() {

return userName;

}

public void setUsername(String username) {

this.userName = username;

}

public String getPassword() {

return password;

}

public void setPassword(String password) {

this.password = password;

}

public int getTimeout() {

return timeout;

}

public void setTimeout(int timeout) {

this.timeout = timeout;

}

public int getKeepalive() {

return keepAlive;

}

public void setKeepalive(int keepalive) {

this.keepAlive = keepalive;

}

public String getHost() {

return host;

}

public void setHost(String host) {

this.host = host;

}

public List getTopic() {

return topic;

}

public void setTopic(List topic) {

this.topic = topic;

}

/**

* 连接至mqtt服务器,获取mqtt连接

* @return

*/

@Bean

public MqttPushClient getMqttPushClient() {

//连接至mqtt服务器,获取mqtt连接

mqttPushClient.connect(host, clientId, userName, password, timeout, keepAlive, topic);

return mqttPushClient;

}

}

properties.yml 配置文件

std.mqtt:

host: tcp://x.x.x.x:1883

username: your_username

password: your_password

#MQTT-连接服务器默认客户端ID

clientid: your_clientid

#连接超时

timeout: 1000

# deviceId

deviceId: your_deviceId

# mqtt-topic

topic[0]: your_tpoic

TopicOperation 定义了发布订阅的方法

import org.eclipse.paho.client.mqttv3.MqttClient;

import org.eclipse.paho.client.mqttv3.MqttException;

import org.eclipse.paho.client.mqttv3.MqttMessage;

import org.eclipse.paho.client.mqttv3.MqttTopic;

import org.slf4j.Logger;

import org.slf4j.LoggerFactory;

import org.springframework.beans.factory.annotation.Autowired;

/**

* @Author chy

*/

public class TopicOperation {

private static final Logger log = LoggerFactory.getLogger(TopicOperation.class);

/**

* 订阅主题

* @param topic 主题名称

*/

public static void subscribe(String topic) {

try {

MqttClient client = MqttPushClient.getClient();

if (client == null) {

return;

};

client.subscribe(topic, 0);

log.info("订阅主题:{}",topic);

} catch (MqttException e) {

e.printStackTrace();

}

}

/**

* 发布主题

*

* @param topic

* @param pushMessage

*/

public static void publish(String topic, String pushMessage) {

log.info("SEND TO MQTT -- topic : {}, message : {}", topic, pushMessage);

MqttMessage message = new MqttMessage();

message.setQos(0);

// 非持久化

message.setRetained(false);

message.setPayload(pushMessage.getBytes());

MqttClient client = MqttPushClient.getClient();

if (client == null) {

return;

};

MqttTopic mTopic = client.getTopic(topic);

if (null == mTopic) {

log.error("主题不存在:{}",mTopic);

}

try {

mTopic.publish(message);

} catch (Exception e) {

log.error("mqtt发送消息异常:",e);

}

}

}

定义了发布和订阅的相关主题

import com.sxd.onlinereservation.exception.BusinessException;

import org.springframework.beans.factory.annotation.Value;

import org.springframework.stereotype.Component;

/**

* @Author

* @Date topic名称

* @Description

*/

@Component

public class MqttTopic {

@Value("${std.mqtt.deviceId}")

private String[] deviceId;

public String getSubscribeTopic(String type){

switch (type){

case "appointTopic":

return String.format("/v1/%s/service/appointTopic", deviceId[0]);

default:

throw new BusinessException("mqtt 订阅主题获取错误");

}

}

public String getPublishTopic(String type) {

switch (type){

//1.0接口立即取号发布主题

case "appointTopic":

return String.format("/v1/%s/service/appointTopic", deviceId[1]);

default:

throw new BusinessException("mqtt 发布主题获取错误");

}

}

}

ps: 如果想要使用该工具类进行消息发送和接收看下面demo

//消息发布操作

TopicOperation.publish(mqttTopic.getPublishTopic("appointTopic"), "消息体"));

//消息订阅操作

TopicOperation.subscribe(mqttTopic.getSubscribeTopic("appointTopic"), "消息体"));

异步线程处理实现

总结

创建消息监听类 , 用于监听消息并进行业务处理

在原来订阅时, 注入并使用第一步创建的监听类

通过注解开启异步线程并配置处理方式

创建消息监听类 , 用于监听消息并进行业务处理

@Slf4j

@Component

public class MqttMessageListener implements IMqttMessageListener {

@Resource

private BusinessService businessService;

@Autowired

private MqttTopic mqttTopic;

@Autowired

private ThreeCallmachineService threeCallmachineService;

@Autowired

private BusinessHallService businessHallService;

@Autowired

private BusinessMaterialService businessMaterialService;

@Autowired

private BusinessWaitService businessWaitService;

@Autowired

private AppointmentService appointmentService;

@Override

public void messageArrived(String topic, MqttMessage message) throws Exception {

String messageStr = new String(message.getPayload());

log.info("接收的主题:" + topic + ";接收到的信息:" + messageStr);

//进行 业务处理

}

}

在原来订阅时, 注入并使用第一步创建的监听类

注入了 MqttMessageListener , 并且在订阅时加入 client.subscribe(topic, mqttMessageListener);

修改MqttPushClient (必须)

@Component

public class MqttPushClient{

private static final Logger log = LoggerFactory.getLogger(MqttPushClient.class);

@Autowired

private PushCallback pushCallback;

@Autowired //这里进行了注入操作

private MqttMessageListener mqttMessageListener;

private static MqttClient client;

public static void setClient(MqttClient client) {

MqttPushClient.client = client;

}

public static MqttClient getClient() {

return client;

}

public void connect(String host, String clientID, String username, String password, int timeout, int keepalive, List topicList) {

MqttClient client;

try {

client = new MqttClient(host, clientID, new MemoryPersistence());

MqttConnectOptions options = new MqttConnectOptions();

options.setCleanSession(true);

if (username != null) {

options.setUserName(username);

}

if (password != null) {

options.setPassword(password.toCharArray());

}

options.setConnectionTimeout(timeout);

options.setKeepAliveInterval(keepalive);

MqttPushClient.setClient(client);

try {

//设置回调类

client.setCallback(pushCallback);

//client.connect(options);

IMqttToken iMqttToken = client.connectWithResult(options);

boolean complete = iMqttToken.isComplete();

log.info("MQTT连接"+(complete?"成功":"失败"));

/** 订阅主题 **/

for (String topic : topicList) {

log.info("连接订阅主题:{}", topic);

//client.subscribe(topic, 0);

client.subscribe(topic, mqttMessageListener);

}

} catch (Exception e) {

e.printStackTrace();

}

} catch (Exception e) {

e.printStackTrace();

}

}

}

如果业务还使用了手动订阅, 则也需要在订阅的类上面注入MqttMessageListener , 并且在订阅方法中作为参数使用. 但是我们需要将方法改成非静态的, 因此在使用该方法时我们需要new该对象然后才能够调用. 但是手动订阅很少用到. 因此有无此步骤都可

import org.eclipse.paho.client.mqttv3.MqttClient;

import org.eclipse.paho.client.mqttv3.MqttException;

import org.eclipse.paho.client.mqttv3.MqttMessage;

import org.eclipse.paho.client.mqttv3.MqttTopic;

import org.slf4j.Logger;

import org.slf4j.LoggerFactory;

import org.springframework.beans.factory.annotation.Autowired;

/**

* @Author chy

* @Date

* @Description

*/

public class TopicOperation {

private static final Logger log = LoggerFactory.getLogger(TopicOperation.class);

//注入MqttMessageListener

@Autowired

private MqttMessageListener mqttMessageListener;

/**

* 订阅主题

* @param topic 主题名称

*/

public void subscribe(String topic) {

try {

MqttClient client = MqttPushClient.getClient();

if (client == null) {

return;

};

//client.subscribe(topic, 0);

//在订阅方法中作为参数使用

client.subscribe(topic, mqttMessageListener);

log.info("订阅主题:{}",topic);

} catch (MqttException e) {

e.printStackTrace();

}

}

/**

* 发布主题

*

* @param topic

* @param pushMessage

*/

public static void publish(String topic, String pushMessage) {

log.info("SEND TO MQTT -- topic : {}, message : {}", topic, pushMessage);

MqttMessage message = new MqttMessage();

message.setQos(0);

// 非持久化

message.setRetained(false);

message.setPayload(pushMessage.getBytes());

MqttClient client = MqttPushClient.getClient();

if (client == null) {

return;

};

MqttTopic mTopic = client.getTopic(topic);

if (null == mTopic) {

log.error("主题不存在:{}",mTopic);

}

try {

mTopic.publish(message);

} catch (Exception e) {

log.error("mqtt发送消息异常:",e);

}

}

}

通过注解开启异步线程并配置处理方式 启动类开启 @EnableAsync(proxyTargetClass=true )

@SpringBootApplication

@MapperScan(basePackages = "com.x.x.mapper")

@EnableTransactionManagement

@EnableAsync(proxyTargetClass=true )

public class XXApplication {

public static void main(String[] args) {

SpringApplication.run(XXApplication.class, args);

}

}

配置类配置线程池参数

@Slf4j

@Configuration

public class ExecutorConfig {

@Bean

public Executor asyncServiceExecutor() {

log.info("start asyncServiceExecutor");

ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();

//配置核心线程数

executor.setCorePoolSize(9);

//配置最大线程数

executor.setMaxPoolSize(20);

//配置队列大小

executor.setQueueCapacity(200);

//配置线程池中的线程的名称前缀

executor.setThreadNamePrefix("sxd-async-service-");

// 设置拒绝策略:当pool已经达到max size的时候,如何处理新任务

// CALLER_RUNS:不在新线程中执行任务,而是有调用者所在的线程来执行

executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());

//执行初始化

executor.initialize();

return executor;

}

}

MqttMessageListener的实现方法messageArrived开启@Async("asyncServiceExecutor")

@Slf4j

@Component

public class MqttMessageListener implements IMqttMessageListener {

@Resource

private BusinessService businessService;

@Autowired

private MqttTopic mqttTopic;

@Autowired

private ThreeCallmachineService threeCallmachineService;

@Autowired

private BusinessHallService businessHallService;

@Autowired

private BusinessMaterialService businessMaterialService;

@Autowired

private BusinessWaitService businessWaitService;

@Autowired

private AppointmentService appointmentService;

@Override

@Async("asyncServiceExecutor")

public void messageArrived(String topic, MqttMessage message) throws Exception {

String messageStr = new String(message.getPayload());

log.info("接收的主题:" + topic + ";接收到的信息:" + messageStr);

System.out.println("线程名称:【" + Thread.currentThread().getName() + "】");

//进行 业务处理

}

}


版权声明:本文内容由网络用户投稿,版权归原作者所有,本站不拥有其著作权,亦不承担相应法律责任。如果您发现本站中有涉嫌抄袭或描述失实的内容,请联系我们jiasou666@gmail.com 处理,核实后本网站将在24小时内删除侵权内容。

上一篇:centos7手把手教你搭建zabbix监控(zabbix如何监控服务器硬件)
下一篇:利用VM虚拟机联通网络(vmos虚拟机联网)
相关文章

 发表评论

暂时没有评论,来抢沙发吧~