springboot整合netty

网友投稿 310 2022-08-20


springboot整合netty

目录：1. 添加依赖 2. 源码 3. 运行测试

1.添加依赖

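<dependency>
    <groupId>org.jetlinks</groupId>
    <artifactId>netty-mqtt-client</artifactId>
    <version>1.0.0</version>
</dependency>

<dependency>
    <groupId>junit</groupId>
    <artifactId>junit</artifactId>
    <version>4.13.2</version>
    <scope>test</scope>
</dependency>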

2.源码

application.yml

# MQTT settings. All keys must be nested under "mqtt" so that they bind to the
# configuration class annotated with @ConfigurationProperties(prefix = "mqtt").
mqtt:
  username: admin
  password: 123456
  # Connection address that messages are pushed to
  url: localhost
  port: 1884
  # Default topic to publish to
  defaultTopic: topic
  # Client id
  clientId: client
  # Connection timeout, in seconds
  completionTimeout: 300
  # Session heartbeat (keep-alive) interval, in seconds
  keepAliveInterval: 20

MqttProperties.java

import lombok.Data;

import org.springframework.boot.context.properties.ConfigurationProperties;

/**
 * Holder for the settings declared under the {@code mqtt} prefix in the application
 * configuration. Accessors are generated by Lombok's {@code @Data}.
 */
@Data
@ConfigurationProperties(prefix = "mqtt")
public class MqttProperties {

    /** User name from {@code mqtt.username}. */
    private String username;

    /** Password from {@code mqtt.password}. */
    private String password;

    /** Connection address that messages are pushed to; passed as the host when connecting. */
    private String url;

    /** Port passed together with {@link #url} when connecting. */
    private int port;

    /** Client id from {@code mqtt.clientId}. */
    private String clientId;

    /** Default topic to publish to. */
    private String defaultTopic;

    /** Connection timeout, in seconds. */
    private int completionTimeout;

    /** Session heartbeat (keep-alive) interval, in seconds. */
    private int keepAliveInterval;

}

MqttConfig.java

import com.xingyun.netty.mqtt.prop.MqttProperties;

import io.netty.channel.EventLoopGroup;

import io.netty.channel.nio.NioEventLoopGroup;

import lombok.AllArgsConstructor;

import org.jetlinks.mqtt.client.*;

import org.springframework.boot.context.properties.EnableConfigurationProperties;

import org.springframework.context.annotation.Bean;

import org.springframework.context.annotation.Configuration;

/**
 * Spring configuration that builds the MQTT client beans from {@link MqttProperties}:
 * the client configuration, the client itself, the message handler and the
 * connection callback.
 */
@AllArgsConstructor
@Configuration
@EnableConfigurationProperties(MqttProperties.class)
public class MqttConfig {

    /** Bound {@code mqtt.*} settings, supplied through the Lombok-generated constructor. */
    private final MqttProperties mqttProperties;

    /**
     * Builds the client configuration (client id, user name, password) from the
     * bound properties.
     *
     * @return the populated client configuration
     */
    @Bean
    public MqttClientConfig getMqttClientConfig() {
        MqttClientConfig mqttClientConfig = new MqttClientConfig();
        mqttClientConfig.setClientId(mqttProperties.getClientId());
        // Use the configured user name; the client id was previously passed here by
        // mistake, so the "mqtt.username" setting was never applied.
        mqttClientConfig.setUsername(mqttProperties.getUsername());
        mqttClientConfig.setPassword(mqttProperties.getPassword());
        // Optional tuning, left disabled as in the original article:
        /*mqttClientConfig.setTimeoutSeconds(mqttProperties.getCompletionTimeout());
        mqttClientConfig.setRetryInterval(mqttProperties.getKeepAliveInterval());
        mqttClientConfig.setProtocolVersion(MqttVersion.MQTT_3_1_1);
        mqttClientConfig.setReconnect(true);*/
        return mqttClientConfig;
    }

    /**
     * Creates the MQTT client, attaches an event loop group and the connection
     * callback, and starts connecting to the configured address and port. Once the
     * connection attempt succeeds, the message handler is registered for every topic.
     *
     * <p>The connection attempt is asynchronous, so the returned client may not be
     * connected yet.
     *
     * @return the MQTT client
     */
    @Bean
    public MqttClient getMqttClient() {
        // NOTE(review): this event loop group is never shut down in the code shown here;
        // consider releasing it when the application context closes.
        EventLoopGroup loop = new NioEventLoopGroup(Runtime.getRuntime().availableProcessors() * 2);
        MqttClient mqttClient = new MqttClientImpl(getMqttClientConfig(), null);
        mqttClient.setEventLoop(loop);
        mqttClient.setCallback(getMqttClientCallback());
        mqttClient.connect(mqttProperties.getUrl(), mqttProperties.getPort()).addListener(future -> {
            if (future.isSuccess()) {
                System.out.println("mqtt客户端已建立连接");
                // "#" is the multi-level wildcard, "+" is the single-level wildcard
                mqttClient.on("#", getMqttHandler());
            } else {
                // Report the failure instead of ignoring it silently.
                System.err.println("mqtt客户端连接失败: " + future.cause());
            }
        });
        return mqttClient;
    }

    /**
     * Handler that prints the topic and payload of each received message.
     *
     * <p>The payload is appended to the string as an object, so what gets printed is
     * the buffer's own {@code toString()} description rather than the message text.
     * NOTE(review): decode the payload with an explicit charset if the text is wanted.
     *
     * @return the message handler
     */
    @Bean
    public MqttHandler getMqttHandler() {
        return (topic, payload) -> {
            System.out.println("消息主题:" + topic);
            System.out.println("消息内容:" + payload);
        };
    }

    /**
     * Callback that prints the stack trace when the connection is lost and a message
     * when the client has reconnected.
     *
     * @return the connection callback
     */
    @Bean
    public MqttClientCallback getMqttClientCallback() {
        return new MqttClientCallback() {
            @Override
            public void connectionLost(Throwable cause) {
                cause.printStackTrace();
            }

            @Override
            public void onSuccessfulReconnect() {
                System.out.println("客户端已重连");
            }
        };
    }

}

3.运行测试

客户端利用不同主题,发送消息

控制台

消息主题:testTopic/001
消息内容:PooledSlicedByteBuf(ridx: 0, widx: 15, cap: 15/15, unwrapped: PooledUnsafeDirectByteBuf(ridx: 32, widx: 32, cap: 512))
消息主题:testTopic/001
消息内容:PooledSlicedByteBuf(ridx: 0, widx: 15, cap: 15/15, unwrapped: PooledUnsafeDirectByteBuf(ridx: 32, widx: 32, cap: 512))
消息主题:test/sub/001
消息内容:PooledSlicedByteBuf(ridx: 0, widx: 15, cap: 15/15, unwrapped: PooledUnsafeDirectByteBuf(ridx: 31, widx: 31, cap: 496))
消息主题:test1
消息内容:PooledSlicedByteBuf(ridx: 0, widx: 15, cap: 15/15, unwrapped: PooledUnsafeDirectByteBuf(ridx: 24, widx: 24, cap: 496))

单元测试发布消息

MqttSeviceDemo.java

import io.netty.buffer.ByteBuf;

import io.netty.buffer.Unpooled;

import org.jetlinks.mqtt.client.MqttClient;

import org.junit.Test;

import org.junit.runner.RunWith;

import org.springframework.beans.factory.annotation.Autowired;

import org.springframework.boot.test.context.SpringBootTest;

import org.springframework.test.context.junit4.SpringRunner;

/**
 * Spring Boot test that publishes a single message through the injected MQTT client.
 */
@RunWith(SpringRunner.class)
@SpringBootTest
public class MqttSeviceDemo {

    /** MQTT client injected from the Spring context. */
    @Autowired
    private MqttClient mqttClient;

    /**
     * Publishes a fixed text message to the topic {@code test/pub/001}.
     */
    @Test
    public void publishMessage() {
        String message = "I am client9527";
        // Encode explicitly as UTF-8 so the bytes on the wire do not depend on the
        // platform default charset (which String.getBytes() without arguments uses).
        ByteBuf payload = Unpooled.copiedBuffer(message, java.nio.charset.StandardCharsets.UTF_8);
        mqttClient.publish("test/pub/001", payload);
        System.out.println("消息已发布");
    }

}

客户端订阅到消息


版权声明:本文内容由网络用户投稿,版权归原作者所有,本站不拥有其著作权,亦不承担相应法律责任。如果您发现本站中有涉嫌抄袭或描述失实的内容,请联系我们jiasou666@gmail.com 处理,核实后本网站将在24小时内删除侵权内容。

上一篇:java理论基础Stream元素的匹配与查找
下一篇:java理论基础函数式接口特点示例解析
相关文章

 发表评论

暂时没有评论,来抢沙发吧~