Springboot整合mqtt服务的示例代码

网友投稿 352 2022-08-19


Springboot整合mqtt服务的示例代码

首先在pom文件里引入mqtt的依赖配置

<dependency>
    <groupId>org.eclipse.paho</groupId>
    <artifactId>org.eclipse.paho.client.mqttv3</artifactId>
    <version>1.2.4</version>
</dependency>

其次在springboot 的配置yml文件,配置mqtt的服务配置

spring:
  mqtt:
    url: tcp://127.0.0.1:1883
    client-id: niubility-tiger
    username:
    password:
    topic: [/unify/test]

创建 MqttProperties配置参数类

import lombok.Data;

import org.springframework.boot.context.properties.ConfigurationProperties;

/**
 * MQTT connection settings bound from configuration properties under the
 * {@code spring.mqtt} prefix. Getters and setters are generated by Lombok's {@code @Data}.
 */
@Data

@ConfigurationProperties("spring.mqtt")

public class MqttProperties {

/** Broker address; used both as the server URI of the connect options and as the client's server URL. */
private String url;

/** Client identifier passed to the MQTT client constructor. */
private String clientId;

/** User name for the broker; may be left empty in the configuration. */
private String username;

/** Password for the broker; only applied to the connect options when it is not blank. */
private String password;

/** Topic filters; the MQTT client subscribes to each entry after connecting. */
private String[] topic;

}

创建 MqttConfiguration 配置类

import org.eclipse.paho.client.mqttv3.IMqttClient;

import org.eclipse.paho.client.mqttv3.MqttClient;

import org.eclipse.paho.client.mqttv3.MqttConnectOptions;

import org.eclipse.paho.client.mqttv3.MqttException;

import org.slf4j.Logger;

import org.slf4j.LoggerFactory;

import org.springblade.core.tool.utils.Func;

import org.springblade.ubw.listener.MqttSubscribeListener;

import org.springframework.beans.factory.annotation.Autowired;

import org.springframework.boot.context.properties.EnableConfigurationProperties;

import org.springframework.context.annotation.Bean;

import org.springframework.context.annotation.Configuration;

@Configuration

@EnableConfigurationProperties({MqttProperties.class})

public class MqttConfiguratiotMcvXqwsn {

private static final Logger log = LoggerFactory.getLogger(MqttConfiguration.class);

@Autowired

private MqttProperties mqttProperties;

public MqttConfiguration() {

}

@Bean

public MqttConnectOptions mqttConnectOptions() {

MqttConnectOptions connectOptions = new MqttConnectOptions();

connectOptions.setServerURIs(new String[]{this.mqttProperties.getUrl()});

if (Func.isNotBlank(this.mqttProperties.getUrl())) {

connectOptions.setUserName(this.mqttProperties.getUsername());

}

if (Func.isNotBlank(this.mqttProperties.getPassword())) {

connectOptions.setPassword(this.mqttProperties.getPassword().toCharArray());

}

connectOptions.setKeepAliveInterval(60);

return connectOptions;

}

@Bean

public IMqttClient mqttClient(MqttConnectOptions options) throws MqttException {

IMqttClient mqttClient = new MqttClient(this.mqttProperties.getUrl(), this.mqttProperties.getClientId());

mqttClient.connect(options);

for(int i = 0; i< this.mqttProperties.getTopic().length; ++i) {

mqttClient.subscribe(this.mqttProperties.getTopic()[i], new MqttSubscribeListener());

}

return mqttClient;

}

}

创建 订阅事件类

import org.springframework.context.ApplicationEvent;

public class UWBMqttSubscribeEvent extends ApplicationEvent {

private String topic;

public UWBMqttSubscribeEvent(String topic, Object source) {

super(source);

tMcvXqws this.topic = topic;

}

public String getTopic() {

return this.topic;

}

}

创建订阅事件监听器

import org.eclipse.paho.client.mqttv3.IMqttMessageListener;

import org.eclipse.paho.client.mqttv3.MqttMessage;

import org.springblade.core.tool.utils.SpringUtil;

import org.springblade.ubw.event.UWBMqttSubscribeEvent;

/**
 * Paho message listener that turns every received MQTT message into a
 * {@link UWBMqttSubscribeEvent} and publishes it through {@code SpringUtil}.
 */
public class MqttSubscribeListener implements IMqttMessageListener {

    /**
     * Decodes the payload as text and publishes it as an application event.
     *
     * @param s           topic the message arrived on
     * @param mqttMessage received message; its payload is decoded as UTF-8 text
     */
    @Override
    public void messageArrived(String s, MqttMessage mqttMessage) {
        // Decode with an explicit charset so the result does not depend on the JVM's
        // platform default. NOTE(review): assumes publishers send UTF-8 payloads - confirm.
        String content = new String(mqttMessage.getPayload(), java.nio.charset.StandardCharsets.UTF_8);
        SpringUtil.publishEvent(new UWBMqttSubscribeEvent(s, content));
    }
}

创建mqtt消息事件异步处理监听器

import com.baomidou.mybatisplus.core.toolkit.StringPool;

import org.slf4j.Logger;

import org.slf4j.LoggerFactory;

import org.springblade.core.tool.utils.Func;

import org.springblade.ubw.config.MqttProperties;

import org.springblade.ubw.event.UWBMqttSubscribeEvent;

import org.springblade.ubw.service.MqttService;

import org.springframework.context.annotation.Configuration;

import org.springframework.context.event.EventListener;

import org.springframework.scheduling.annotation.Async;

import javax.annotation.Resource;

import java.util.Arrays;

import java.util.List;

/**
 * Listens for {@link UWBMqttSubscribeEvent}s and forwards each message to {@link MqttService},
 * after reducing the full topic to the part that follows the configured subscription prefix.
 */
@Configuration
public class MqttEventListener {

    private static final Logger log = LoggerFactory.getLogger(MqttEventListener.class);

    @Resource
    private MqttProperties mqttProperties;

    @Resource
    private MqttService mqttService;

    /**
     * Strips the configured subscription prefix from a concrete topic.
     * Each configured topic filter has its {@code #} characters removed to obtain a prefix;
     * the first prefix the topic starts with is cut off the front of the topic.
     *
     * @param topic full topic the message arrived on
     * @return the remainder of the topic after the matching prefix, or an empty string when
     *         no configured filter matches
     */
    private String processTopic(String topic) {
        for (String filter : mqttProperties.getTopic()) {
            String prefix = filter.replace(StringPool.HASH, StringPool.EMPTY);
            if (topic.startsWith(prefix)) {
                // Remove only the leading prefix; String.replace would also delete later
                // occurrences of the same text inside the topic.
                return topic.substring(prefix.length());
            }
        }
        return StringPool.EMPTY;
    }

    /**
     * Handles one received MQTT message: ignores empty payloads, otherwise dispatches the
     * payload to {@link MqttService#issue(String, Object)} under the prefix-stripped topic.
     *
     * @param event event carrying the topic and, as its source, the message content
     */
    @Async
    @EventListener(UWBMqttSubscribeEvent.class)
    public void listen(UWBMqttSubscribeEvent event) {
        Object source = event.getSource();
        if (Func.isEmpty(source)) {
            return;
        }
        String topic = processTopic(event.getTopic());
        mqttService.issue(topic, source);
    }
}

创建MqttService 数据处理服务类

import org.slf4j.Logger;

import org.slf4j.LoggerFactory;

import org.springblade.core.tool.utils.Func;

import org.springblade.ubw.area.entity.WorkArea;

import org.springblade.ubw.area.entity.WorkSite;

import org.springblade.ubw.area.entity.WorkSiteNeighbourInfo;

import org.springblade.ubw.area.entity.WorkSitePassInfo;

import org.springblade.ubw.area.service.WorkAreaService;

import org.springblade.ubw.area.service.WorkSiteNeighbourInfoService;

import org.springblade.ubw.area.service.WorkSitePassInfoService;

import org.springblade.ubw.area.service.WorkSiteService;

import org.springblade.ubw.constant.UbwConstant;

import org.springblade.ubw.history.entity.HistoryLocusInfo;

import org.springblade.ubw.history.entity.HistoryOverTimeSosAlarmInfo;

import org.springblade.ubw.history.service.HistoryLocusInfoService;

import org.springblade.ubw.history.service.HistoryOverTimeSosAlarmInfoService;

import org.springblade.ubw.loc.entity.LocStatusInfo;

import org.springblade.ubw.loc.entity.LocStatusInfoHistory;

import org.springblade.ubw.loc.service.LocStatusInfoHistoryService;

import org.springblade.ubw.loc.service.LocStatusInfoService;

import org.springblade.ubw.msg.entity.*;

import org.springblade.ubw.msg.service.*;

import org.springblade.ubw.system.entity.*;

import org.springblade.ubw.system.service.*;

import org.springblade.ubw.system.wrapper.MqttWrapper;

import org.springframework.stereotype.Service;

import javax.annotation.Resource;

import java.util.List;

import java.util.stream.Collectors;

/**
 * Dispatches received MQTT payloads to the persistence service that matches the topic.
 * Payloads are converted to entity lists with {@code MqttWrapper} and stored with the
 * corresponding service's {@code saveBatch}; for some topics the existing rows are
 * removed with {@code deleteAll} first.
 */
@Service
public class MqttService {

    private static final Logger log = LoggerFactory.getLogger(MqttService.class);

    @Resource
    private EmployeeAndDepartmentService employeeAndDepartmentService;

    @Resource
    private VehicleInfoService vehicleInfoService;

    @Resource
    private WorkSiteService workSiteService;

    @Resource
    private LocStatusInfoService locStatusInfoService;

    @Resource
    private LocStatusInfoHistoryService locStatusInfoHistoryService;

    @Resource
    private LocOverTimeSosAlarminfoService locOverTimeSosAlarminfoService;

    @Resource
    private LocAreaOverSosAlarminfoService locAreaOverSosAlarmInfoService;

    @Resource
    private LocSosAlarminfoService locSosAlarminfoService;

    @Resource
    private AttendanceInfoService attendanceInfoService;

    @Resource
    private HistoryLocusInfoService historyLocusInfoService;

    @Resource
    private WorkSitePassInfoService workSitePassInfoService;

    @Resource
    private EnvironmentalMonitorInfoService environmentalMonitorInfoService;

    @Resource
    private TrAlertService trAlertService;

    @Resource
    private AddEvacuateInfoService addEvacuateInfoService;

    @Resource
    private CancelEvacuateInfoService cancelEvacuateInfoService;

    @Resource
    private WorkSiteNeighbourInfoService workSiteNeighbourInfoService;

    @Resource
    private LinkMsgAlarmInfoService linkMsgAlarmInfoService;

    @Resource
    private LeaderEmployeeInfoService leaderEmployeeInfoService;

    @Resource
    private ElectricmsgInfoService electricMsgInfoService;

    @Resource
    private WorkAreaService workAreaService;

    @Resource
    private HistoryOverTimeSosAlarmInfoService historyOverTimeSosAlarmInfoService;

    @Resource
    private SpecialWorksService specialWorksService;

    @Resource
    private AttendanceLocusInfoService attendanceLocusInfoService;

    @Resource
    private WorkTypeService workTypeService;

    @Resource
    private OfficePositionService officePositionService;

    @Resource
    private ClassTeamService classTeamService;

    /**
     * Message dispatch: routes one payload to the service registered for its topic.
     * Topics without a matching case (and a {@code null} topic) are ignored.
     * For topics that replace the stored data, the payload is converted before the old
     * rows are deleted, so a conversion failure does not leave the table empty.
     *
     * @param topic  topic key, compared against the {@code UbwConstant.TOPIC_*} constants
     * @param source message payload handed to {@code MqttWrapper} / the target service
     * @author liwenbin (original version dated 2021-12-14 14:14:09)
     */
    public void issue(String topic, Object source) {
        if (topic == null) {
            // switch on a null String would throw a NullPointerException
            return;
        }
        switch (topic) {
            case UbwConstant.TOPIC_EMP:
                // employee and department information
                employeeAndDepartmentService.saveBatch(source);
                break;
            case UbwConstant.TOPIC_VEHICLE:
                // vehicle information
                List<VehicleInfo> vehicleInfos = MqttWrapper.build().toEntityList(source, new VehicleInfo());
                vehicleInfoService.deleteAll();
                vehicleInfoService.saveBatch(vehicleInfos);
                break;
            case UbwConstant.TOPIC_WORK_SITE:
                // base station information
                List<WorkSite> workSites = MqttWrapper.build().toEntityList(source, new WorkSite());
                workSiteService.deleteAll();
                workSiteService.saveBatch(workSites);
                break;
            case UbwConstant.TOPIC_LOC_STATUS:
                // real-time status of underground vehicles and personnel
                List<LocStatusInfo> locStatusInfos = MqttWrapper.build().toEntityList(source, new LocStatusInfo());
                if (Func.isEmpty(locStatusInfos)) {
                    break;
                }
                locStatusInfoService.deleteAll();
                // keep only the entries flagged as being in the well
                List<LocStatusInfo> inWellList = locStatusInfos.stream()
                        .filter(info -> info.getIsInWell() == 1)
                        .collect(Collectors.toList());
                locStatusInfoService.saveBatch(inWellList);
                // store the history records
                List<LocStatusInfoHistory> locStatusInfoHistorys =
                        MqttWrapper.build().toEntityList(source, new LocStatusInfoHistory());
                locStatusInfoHistoryService.saveBatch(locStatusInfoHistorys);
                break;
            case UbwConstant.TOPIC_LOC_OVER_TIME:
                // over-time alarm information
                List<LocOverTimeSosAlarminfo> locOverTimeSosAlarmInfos =
                        MqttWrapper.build().toEntityList(source, new LocOverTimeSosAlarminfo());
                locOverTimeSosAlarminfoService.saveBatch(locOverTimeSosAlarmInfos);
                break;
            case UbwConstant.TOPIC_LOC_OVER_AREA:
                // over-capacity alarm information
                List<LocAreaOverSosAlarminfo> locAreaOverSosAlarmInfos =
                        MqttWrapper.build().toEntityList(source, new LocAreaOverSosAlarminfo());
                locAreaOverSosAlarmInfoService.saveBatch(locAreaOverSosAlarmInfos);
                break;
            case UbwConstant.TOPIC_LOC_SOS:
                // SOS alarm information
                List<LocSosAlarminfo> locSosAlarmInfos =
                        MqttWrapper.build().toEntityList(source, new LocSosAlarminfo());
                locSosAlarminfoService.saveBatch(locSosAlarmInfos);
                break;
            case UbwConstant.TOPIC_ATTEND:
                // attendance information
                List<AttendanceInfo> attendanceInfos =
                        MqttWrapper.build().toEntityList(source, new AttendanceInfo());
                attendanceInfoService.saveBatch(attendanceInfos);
                break;
            case UbwConstant.TOPIC_HISTORY_LOCUS:
                // precise trajectory information
                List<HistoryLocusInfo> historyLocusInfos =
                        MqttWrapper.build().toEntityList(source, new HistoryLocusInfo());
                historyLocusInfoService.saveBatch(historyLocusInfos);
                break;
            case UbwConstant.TOPIC_WORK_SITE_PASS:
                // base station pass records
                List<WorkSitePassInfo> workSitePassInfos =
                        MqttWrapper.build().toEntityList(source, new WorkSitePassInfo());
                workSitePassInfoService.saveBatch(workSitePassInfos);
                break;
            case UbwConstant.TOPIC_ENV_MON:
                // environmental monitoring information
                List<EnvironmentalMonitorInfo> environmentalMonitorInfos =
                        MqttWrapper.build().toEntityList(source, new EnvironmentalMonitorInfo());
                environmentalMonitorInfoService.saveBatch(environmentalMonitorInfos);
                break;
            case UbwConstant.TOPIC_TR_ALERT:
                // environmental monitoring alarm information
                List<TrAlert> trAlerts = MqttWrapper.build().toEntityList(source, new TrAlert());
                trAlertService.saveBatch(trAlerts);
                break;
            case UbwConstant.TOPIC_ADD_EVA:
                // issued evacuation information
                List<AddEvacuateInfo> addEvacuateInfos =
                        MqttWrapper.build().toEntityList(source, new AddEvacuateInfo());
                addEvacuateInfoService.saveBatch(addEvacuateInfos);
                break;
            case UbwConstant.TOPIC_CANCEL_EVA:
                // cancelled evacuation information
                List<CancelEvacuateInfo> cancelEvacuateInfos =
                        MqttWrapper.build().toEntityList(source, new CancelEvacuateInfo());
                cancelEvacuateInfoService.saveBatch(cancelEvacuateInfos);
                break;
            case UbwConstant.TOPIC_WORK_SITE_NEI:
                // neighbouring base station relations
                List<WorkSiteNeighbourInfo> workSiteNeighbourInfos =
                        MqttWrapper.build().toEntityList(source, new WorkSiteNeighbourInfo());
                workSiteNeighbourInfoService.deleteAll();
                workSiteNeighbourInfoService.saveBatch(workSiteNeighbourInfos);
                break;
            case UbwConstant.TOPIC_LINK_MSG:
                // base station link information
                List<LinkMsgAlarmInfo> linkMsgAlarmInfos =
                        MqttWrapper.build().toEntityList(source, new LinkMsgAlarmInfo());
                linkMsgAlarmInfoService.deleteAll();
                linkMsgAlarmInfoService.saveBatch(linkMsgAlarmInfos);
                break;
            case UbwConstant.TOPIC_LEADER_EMP:
                // shift leader information
                List<LeaderEmployeeInfo> leaderEmployeeInfos =
                        MqttWrapper.build().toEntityList(source, new LeaderEmployeeInfo());
                leaderEmployeeInfoService.deleteAll();
                leaderEmployeeInfoService.saveBatch(leaderEmployeeInfos);
                break;
            case UbwConstant.TOPIC_ELE_MSG:
                // low battery alarm information
                List<ElectricMsgInfo> electricMsgInfos =
                        MqttWrapper.build().toEntityList(source, new ElectricMsgInfo());
                electricMsgInfoService.saveBatch(electricMsgInfos);
                break;
            case UbwConstant.TOPIC_WORK_AREA:
                // area information
                List<WorkArea> workAreas = MqttWrapper.build().toEntityList(source, new WorkArea());
                workAreaService.deleteAll();
                workAreaService.saveBatch(workAreas);
                break;
            case UbwConstant.TOPIC_HIS_OVER_TIME_SOS:
                // historical over-time alarm information
                List<HistoryOverTimeSosAlarmInfo> historyOverTimeSosAlarmInfos =
                        MqttWrapper.build().toEntityList(source, new HistoryOverTimeSosAlarmInfo());
                historyOverTimeSosAlarmInfoService.saveBatch(historyOverTimeSosAlarmInfos);
                break;
            case UbwConstant.TOPIC_SPECIAL_WORK:
                // preset route information for special personnel
                List<SpecialWorks> specialWorks = MqttWrapper.build().toEntityList(source, new SpecialWorks());
                specialWorksService.deleteAll();
                specialWorksService.saveBatch(specialWorks);
                break;
            case UbwConstant.TOPIC_ATTEND_LOC:
                // historical attendance trajectory information
                List<AttendanceLocusInfo> attendanceLocusInfos =
                        MqttWrapper.build().toEntityList(source, new AttendanceLocusInfo());
                attendanceLocusInfoService.saveBatch(attendanceLocusInfos);
                break;
            case UbwConstant.TOPIC_WORK_TYPE:
                // work type information
                List<WorkType> workTypes = MqttWrapper.build().toEntityList(source, new WorkType());
                workTypeService.deleteAll();
                workTypeService.saveBatch(workTypes);
                break;
            case UbwConstant.TOPIC_OFFICE_POS:
                // job position information
                List<OfficePosition> officePositions =
                        MqttWrapper.build().toEntityList(source, new OfficePosition());
                officePositionService.deleteAll();
                officePositionService.saveBatch(officePositions);
                break;
            case UbwConstant.TOPIC_CLASS_TEAM:
                // team information
                List<ClassTeam> classTeams = MqttWrapper.build().toEntityList(source, new ClassTeam());
                classTeamService.deleteAll();
                classTeamService.saveBatch(classTeams);
                break;
            default:
                // topics without a handler are ignored
                break;
        }
    }
}

完结,小伙伴们,可以根据这个demo 改造自己的mqtt服务处理!!!

以上就是Springboot整合mqtt服务的示例代码的详细内容,更多关于Springboot整合mqtt的资料请关注我们其它相关文章!


版权声明:本文内容由网络用户投稿,版权归原作者所有,本站不拥有其著作权,亦不承担相应法律责任。如果您发现本站中有涉嫌抄袭或描述失实的内容,请联系我们jiasou666@gmail.com 处理,核实后本网站将在24小时内删除侵权内容。

上一篇:Java基本语法之内部类示例详解
下一篇:剑指Offer之Java算法习题精讲N叉树的遍历及数组与字符串
相关文章

 发表评论

暂时没有评论,来抢沙发吧~