Spring Boot+RabbitMQ 通过fanout模式实现消息接收功能(支持消费者多实例部署)
328
2022-08-20
springboot整合netty
目录:1. 添加依赖 2. 源码 3. 运行测试
1.添加依赖
http://
2.源码
application.yml
# MQTT configuration (bound to the "mqtt" prefix by MqttProperties).
# The keys below must be nested under `mqtt:`; without the indentation they
# become unrelated top-level keys and nothing binds to the `mqtt.*` prefix.
mqtt:
  username: admin
  password: 123456
  # Address of the broker to connect to
  url: localhost
  port: 1884
  # Default topic for published messages
  defaultTopic: topic
  # Client id
  clientId: client
  # Connection timeout, in seconds
  completionTimeout: 300
  # Session keep-alive (heartbeat) interval, in seconds
  keepAliveInterval: 20
MqttProperties.java
import lombok.Data;
import org.springframework.boot.context.properties.ConfigurationProperties;
/**
 * Holder for the MQTT connection settings read from configuration keys under
 * the {@code mqtt} prefix (for example {@code mqtt.username}).
 *
 * <p>Lombok's {@code @Data} supplies the getters and setters for every field.
 */
@Data
@ConfigurationProperties(prefix = "mqtt")
public class MqttProperties {
/** User name for authenticating with the broker ({@code mqtt.username}). */
private String username;
/** Password for authenticating with the broker ({@code mqtt.password}). */
private String password;
/** Host of the broker to connect to ({@code mqtt.url}). */
private String url;
/** Port of the broker to connect to ({@code mqtt.port}). */
private int port;
/** MQTT client identifier ({@code mqtt.clientId}). */
private String clientId;
/** Default topic for published messages ({@code mqtt.defaultTopic}). */
private String defaultTopic;
/** Connection timeout; described as seconds in the sample application.yml. */
private int completionTimeout;
/** Keep-alive (heartbeat) interval; described as seconds in the sample application.yml. */
private int keepAliveInterval;
}
MqttConfig.java
import com.xingyun.netty.mqtt.prop.MqttProperties;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import lombok.AllArgsConstructor;
import org.jetlinks.mqtt.client.*;
import org.springframework.boot.context.properties.EnableConfigurationProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
/**
 * Spring configuration that wires up a Netty-based MQTT client from the
 * {@code mqtt.*} settings held in {@link MqttProperties}.
 *
 * <p>It exposes the client configuration, the connected client, the message
 * handler and the connection callback as beans.
 */
@AllArgsConstructor
@Configuration
@EnableConfigurationProperties(MqttProperties.class)
public class MqttConfig {

    /** Connection settings; injected through the Lombok-generated constructor. */
    private final MqttProperties mqttProperties;

    /**
     * Builds the client configuration (client id, user name, password).
     *
     * @return the configuration used to create the MQTT client
     */
    @Bean
    public MqttClientConfig getMqttClientConfig() {
        MqttClientConfig mqttClientConfig = new MqttClientConfig();
        mqttClientConfig.setClientId(mqttProperties.getClientId());
        // Use the configured user name. The previous code passed the client id
        // here, so the mqtt.username setting was never sent to the broker.
        mqttClientConfig.setUsername(mqttProperties.getUsername());
        mqttClientConfig.setPassword(mqttProperties.getPassword());
        // Optional settings, intentionally left disabled:
        /*mqttClientConfig.setTimeoutSeconds(mqttProperties.getCompletionTimeout());
        mqttClientConfig.setRetryInterval(mqttProperties.getKeepAliveInterval());
        mqttClientConfig.setProtocolVersion(MqttVersion.MQTT_3_1_1);
        mqttClientConfig.setReconnect(true);*/
        return mqttClientConfig;
    }

    /**
     * Creates the MQTT client, starts connecting to the configured host and
     * port, and subscribes to every topic once the connection succeeds.
     *
     * <p>The connection is asynchronous: the client is returned before the
     * connect attempt has completed.
     *
     * @return the MQTT client bean
     */
    @Bean
    public MqttClient getMqttClient() {
        // Event loop sized at twice the number of available processors.
        EventLoopGroup loop = new NioEventLoopGroup(Runtime.getRuntime().availableProcessors() * 2);
        MqttClient mqttClient = new MqttClientImpl(getMqttClientConfig(), null);
        mqttClient.setEventLoop(loop);
        mqttClient.setCallback(getMqttClientCallback());
        mqttClient.connect(mqttProperties.getUrl(), mqttProperties.getPort()).addListener(future -> {
            // NOTE(review): a failed connect attempt is not reported here.
            if (future.isSuccess()) {
                System.out.println("mqtt客户端已建立连接");
                // '#' is the multi-level wildcard, '+' is the single-level wildcard.
                mqttClient.on("#", getMqttHandler());
            }
        });
        return mqttClient;
    }

    /**
     * Handler that prints the topic and payload of every received message.
     *
     * <p>The payload is printed through string concatenation, i.e. via its
     * {@code toString()}, not decoded as text.
     *
     * @return the message handler bean
     */
    @Bean
    public MqttHandler getMqttHandler() {
        return (topic, payload) -> {
            System.out.println("消息主题:" + topic);
            System.out.println("消息内容:" + payload);
        };
    }

    /**
     * Callback that reports connection loss and successful reconnects.
     *
     * @return the client callback bean
     */
    @Bean
    public MqttClientCallback getMqttClientCallback() {
        return new MqttClientCallback() {
            @Override
            public void connectionLost(Throwable cause) {
                cause.printStackTrace();
            }

            @Override
            public void onSuccessfulReconnect() {
                System.out.println("客户端已重连");
            }
        };
    }
}
3.运行测试
客户端利用不同主题,发送消息
控制台
消息主题:testTopic/001
消息内容:PooledSlicedByteBuf(ridx: 0, widx: 15, cap: 15/15, unwrapped: PooledUnsafeDirectByteBuf(ridx: 32, widx: 32, cap: 512))
消息主题:testTopic/001
消息内容:PooledSlicedByteBuf(ridx: 0, widx: 15, cap: 15/15, unwrapped: PooledUnsafeDirectByteBuf(ridx: 32, widx: 32, cap: 512))
消息主题:test/sub/001
消息内容:PooledSlicedByteBuf(ridx: 0, widx: 15, cap: 15/15, unwrapped: PooledUnsafeDirectByteBuf(ridx: 31, widx: 31, cap: 496))
消息主题:test1
消息内容:PooledSlicedByteBuf(ridx: 0, widx: 15, cap: 15/15, unwrapped: PooledUnsafeDirectByteBuf(ridx: 24, widx: 24, cap: 496))
单元测试发布消息:MqttSeviceDemo.java
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import java.nio.charset.StandardCharsets;
import org.jetlinks.mqtt.client.MqttClient;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;
/**
 * Spring Boot test that publishes one sample message through the
 * application's {@link MqttClient} bean.
 */
@RunWith(SpringRunner.class)
@SpringBootTest
public class MqttSeviceDemo {

    /** MQTT client injected from the application context. */
    @Autowired
    private MqttClient mqttClient;

    /**
     * Publishes a fixed text message to the topic {@code test/pub/001}.
     */
    @Test
    public void publishMessage() {
        String test = "I am client9527";
        // Encode explicitly as UTF-8 rather than the platform default charset,
        // so the bytes on the wire do not depend on the host's locale.
        ByteBuf byteBuf = Unpooled.copiedBuffer(test, StandardCharsets.UTF_8);
        mqttClient.publish("test/pub/001", byteBuf);
        System.out.println("消息已发布");
    }
}
客户端订阅到消息
版权声明:本文内容由网络用户投稿,版权归原作者所有,本站不拥有其著作权,亦不承担相应法律责任。如果您发现本站中有涉嫌抄袭或描述失实的内容,请联系我们jiasou666@gmail.com 处理,核实后本网站将在24小时内删除侵权内容。
发表评论
暂时没有评论,来抢沙发吧~