SpringBoot webSocket实现发送广播、点对点消息和Android接收

网友投稿 841 2023-06-02


SpringBoot webSocket实现发送广播、点对点消息和Android接收

1、SpringBoot webSocket

SpringBoot 使用的websocket 协议,不是标准的websocket协议,使用的是名称叫做STOMP的协议。

1.1 STOMP协议说明

STOMP,Streaming Text Orientated Message Protocol,是流文本定向消息协议,是一种为MOM(Message Oriented Middleware,面向消息的中间件)设计的简单文本协议。

它提供了一个可互操作的连接格式,允许STOMP客户端与任意STOMP消息代理(Broker)进行交互,类似于OpenWire(一种二进制协议)。

由于其设计简单,很容易开发客户端,因此在多种语言和多种平台上得到广泛应用。其中最流行的STOMP消息代理是Apache ActiveMQ。

1.2 搭建

本人使用的是Inject idea 搭建的springBoot websocket,并未采用熟悉的gradle,而是采用了maven方式搭建。

项目结构如下

pom.xml:

xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">

4.0.0

com.drawthink

websocketdemo

0.0.1-SNAPSHOT

jar

webSocketdemo

webSocketDemo project for Spring Boot

org.springframework.boot

spring-boot-starter-parent

1.3.6.RELEASE

UTF-8

UTF-8

1.8

org.springframework.boot

spring-boot-starter-thymeleaf

org.springframework.boot

spring-boot-starter-websocket

org.springframework.boot

spring-boot-starter-test

test

org.springframework.boot

spring-boot-maven-plugin

xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">

4.0.0

com.drawthink

websocketdemo

0.0.1-SNAPSHOT

jar

webSocketdemo

webSocketDemo project for Spring Boot

org.springframework.boot

spring-boot-starter-parent

1.3.6.RELEASE

UTF-8

UTF-8

1.8

org.springframework.boot

spring-boot-starter-thymeleaf

org.springframework.boot

spring-boot-starter-websocket

org.springframework.boot

spring-boot-starter-test

test

org.springframework.boot

spring-boot-maven-plugin

Application:

package com.drawthink;

import org.springframework.boot.SpringApplication;

import org.springframework.boot.autoconfigure.SpringBootApplication;

@SpringBootApplication

public class WebSocketdemoApplication {

public static void main(String[] args) {

SpringApplication.run(WebSocketdemoApplication.class, args);

}

}

WebSocketConfig

package com.drawthink.websocket;

import org.springframework.context.annotation.Configuration;

import org.springframework.messaging.simp.config.MessageBrokerRegistry;

import org.springframework.web.socket.config.annotation.AbstractWebSocketMessageBrokerConfigurer;

import org.springframework.web.socket.config.annotation.EnableWebSocketMessageBroker;

import org.springframework.web.socket.config.annotation.StompEndpointRegistry;

/**

* Created by lincoln on 16-10-25

*/

@Configuration

@EnableWebSocketMessageBroker

public class WebSocketConfig extends AbstractWebSocketMessageBrokerConfigurer {

@Override

public void registerStompEndpoints(StompEndpointRegistry stompEndpointRegistry) {

//允许使用socketjs方式访问,访问点为hello,允许跨域

stompEndpointRegistry.addEndpoint("/hello").setAllowedOrigins("*").withSockJS();

}

@Override

public void configureMessageBroker(MessageBrokerRegistry registry) {

//订阅Broker名称

registry.enableSimpleBroker("/topic","/user");

//全局使用的订阅前缀(客户端订阅路径上会体现出来)

registry.setApplicationDestinationPrefixes("/app/");

//点对点使用的订阅前缀(客户端订阅路径上会体现出来),不设置的话,默认也是/user/

//registry.setUserDestinationPrefix("/user/");

}

}

WebSocketController

package com.drawthink.websocket.controller;

import com.drawthink.message.ClientMessage;

import com.drawthink.message.ServerMessage;

import com.drawthink.message.ToUserMessage;

import org.springframework.beans.factory.annotation.Autowired;

import org.springframework.messaging.handler.annotation.MessageMapping;

import org.springframework.messaging.handler.annotation.SendTo;

import org.springframework.messaging.simp.SimpMessagingTemplate;

import org.springframework.stereotype.Controller;

/**

* Created by lincoln on 16-10-25

*/

@Controller

public class WebSocketController {

@MessageMapping("/welcome")

//SendTo 发送至 Broker 下的指定订阅路径

@SendTo("/topic/getResponse")

public ServerMessage say(ClientMessage clientMessage){

//方法用于广播测试

System.out.println("clientMessage.getName() = " + clientMessage.getName());

return new ServerMessage("Welcome , "+clientMessage.getName()+" !");

}

//注入SimpMessagingTemplate 用于点对点消息发送

@Autowired

private SimpMessagingTemplate messagingTemplate;

@MessageMapping("/cheat")

// 发送的订阅路径为/user/{userId}/message

// /user/路径是默认的一个,如果想要改变,必须在config 中setUserDestinationPrefix

public void cheatTo(ToUserMessage toUserMessage){

//方法用于点对点测试

System.out.println("toUserMessage.getMessage() = " + toUserMessage.getMessage());

System.out.println("toUserMessage.getUserId() = " + toUserMessage.getUserId()); messagingTemplate.convertAndSendToUser(toUserMessage.getUserId(),"/message",toUserMessage.getMessage());

}

}

Vo

package com.drawthink.message;

/**

* Created by lincoln on 16-10-25

*/

public class ClientMessage {

private String name;

public String getName() {

return name;

}

public void setName(String name) {

this.name = name;

}

}

package com.drawthink.message;

/**

* Created by lincoln on 16-10-25

*/

public class ServerMessage {

private String responseMessage;

public ServerMessage(String responseMessage) {

this.responseMessage = responseMessage;

}

public String getResponseMessage() {

return responseMessage;

}

public void setResponseMessage(String responseMessage) {

this.responseMessage = responseMessage;

}

}

package com.drawthink.message;

/**

* Created by lincoln on 16-10-25

*/

public class ToUserMessage {

private String userId;

private String message;

public String getUserId() {

return userId;

}

public void setUserId(String userId) {

this.userId = userId;

}

public String getMessage() {

return message;

}

public void setMessage(String message) {

this.message = message;

}

}

android 客户端

STOMP协议在Android系统中没有默认实现,必须自行去实现。不过好消息是,开源大神们已经完成了Android上使用STOMP协议的实现,所以我们只需要使用就好了。

地址:StompProtocolAndroid_jb51.rar

搭建

build.gradle(app)

apply plugin: 'com.android.application'

android {

compileSdkVersion 24

buildToolsVersion "24.0.3"

defaultConfig {

applicationId "com.drawthink.websocket"

minSdkVersion 16

targetSdkVersion 24

versionCode 1

versionName "1.0"

testInstrumentationRunner "android.support.test.runner.AndroidJUnitRunner"

}

buildTypes {

release {

minifyEnabled false

proguardFiles getDefaultProguardFile('proguard-android.txt'), 'proguard-rules.pro'

}

}

}

dependencies {

compile fileTree(include: ['*.jar'], dir: 'libs')

androidTestCompile('com.android.support.test.espresso:espresso-core:2.2.2', {

exclude group: 'com.android.support', module: 'support-annotations'

})

compile 'com.android.support:appcompat-v7:24.2.1'

testCompile 'junit:junit:4.12'

//依赖STOMP协议的Android实现

compile 'com.github.NaikSoftware:StompProtocolAndroid:1.1.1'

//StompProtocolAndroid 依赖于webSocket的标准实现

compile 'org.java-websocket:Java-WebSocket:1.3.0'

}

接收广播实例:

package com.drawthink.websocket;

import android.content.Intent;

import android.os.Bundle;

import android.support.v7.app.AppCompatActivity;

import android.util.Log;

import android.view.View;

import android.widget.Button;

import android.widget.EditText;

import android.widget.TextView;

import android.widget.Toast;

import org.java_websocket.WebSocket;

import rx.Subscriber;

import rx.functions.Action1;

import ua.naiksoftware.stomp.LifecycleEvent;

import ua.naiksoftware.stomp.Stomp;

import ua.naiksoftware.stomp.client.StompClient;

import ua.naiksoftware.stomp.client.StompMessage;

import static android.content.ContentValues.TAG;

public class MainActivity extends AppCompatActivity {

private TextView serverMessage;

private Button start;

private Button stop;

private Button send;

private EditText editText;

private StompClient mStompClient;

private Button cheat;

@Override

protected void onCreate(Bundle savedInstanceState) {

super.onCreate(savedInstanceState);

setContentView(R.layout.activity_main);

bindView();

start.setOnClickListener(new View.OnClickListener() {

@Override

public void onClick(View v) {

//创建client 实例

createStompClient();

//订阅消息

registerStompTopic();

}

});

send.setOnClickListener(new View.OnClickListener() {

@Override

public void onClick(View v) {

mStompClient.send("/app/welcome","{\"name\":\""+editText.getText()+"\"}")

.subscribe(new Subscriber() {

@Override

public void onCompleted() {

toast("发送成功");

}

@Override

public void onError(Throwable e) {

e.printStackTrace();

toast("发送错误");

}

@Override

public void onNext(Void aVoid) {

}

});

}

});

stop.setOnClickListener(new View.OnClickListener() {

@Override

public void onClick(View v) {

mStompClient.disconnect();

}

});

cheat.setOnClickListener(new View.OnClickListener() {

@Override

public void onClick(View v) {

startActivity(new Intent(MainActivity.this,CheatActivity.class));

if(mStompClient != null) {

mStompClient.disconnect();

}

finish();

}

});

}

private void showMessage(final StompMessage stompMessage) {

runOnUiThread(new Runnable() {

@Override

public void run() {

serverMessage.setText("stomp command is --->"+stompMessage.getStompCommand() +" body is --->"+stompMessage.getPayload());

}

});

}

//创建client 实例

private void createStompClient() {

mStompClient = Stomp.over(WebSocket.class, "ws://192.168.0.46:8080/hello/websocket");

mStompClient.connect();

Toast.makeText(MainActivity.this,"开始连接 192.168.0.46:8080",Toast.LENGTH_SHORT).show();

mStompClient.lifecycle().subscribe(new Action1() {

@Override

public void call(LifecycleEvent lifecycleEvent) {

switch (lifecycleEvent.getType()) {

case OPENED:

Log.d(TAG, "Stomp connection opened");

toast("连接已开启");

break;

case ERROR:

Log.e(TAG, "Stomp Error", lifecycleEvent.getException());

toast("连接出错");

break;

case CLOSED:

Log.d(TAG, "Stomp connection closed");

toast("连接关闭");

break;

}

}

});

}

//订阅消息

private void registerStompTopic() {

mStompClient.topic("/topic/getResponse").subscribe(new Action1() {

@Override

public void call(StompMessage stompMessage) {

Log.e(TAG, "call: " +stompMessage.getPayload() );

showMessage(stompMessage);

}

});

}

private void toast(final String message) {

runOnUiThread(new Runnable() {

@Override

public void run() {

Toast.makeText(MainActivity.this,message,Toast.LENGTH_SHORT).show();

}

});

}

private void bindView() {

serverMessage = (TextView) findViewById(R.id.serverMessage);

start = (Button) findViewById(R.id.start);

stop = (Button) findViewById(R.id.stop);

send = (Button) findViewById(R.id.send);

editText = (EditText) findViewById(R.id.clientMessage);

cheat = (Button) findViewById(R.id.cheat);

}

}

点对点

package com.drawthink.websocket;

import android.os.Bundle;

import android.support.v7.app.AppCompatActivity;

import android.util.Log;

import android.view.View;

import android.widget.Button;

import android.widget.EditText;

import android.widget.LinearLayout;

import android.widget.TextView;

import android.widget.Toast;

import org.java_websocket.WebSocket;

import rx.Subscriber;

import rx.functions.Action1;

import ua.naiksoftware.stomp.LifecycleEvent;

import ua.naiksoftware.stomp.Stomp;

import ua.naiksoftware.stomp.client.StompClient;

import ua.naiksoftware.stomp.client.StompMessage;

import static android.content.ContentValues.TAG;

public class CheatActivity extends AppCompatActivity {

private EditText cheat;

private Button send;

private LinearLayout message;

private StompClient mStompClient;

@Override

protected void onCreate(Bundle savedInstanceState) {

super.onCreate(savedInstanceState);

setContentView(R.layout.activity_cheat);

bindView();

createStompClient();

registerStompTopic();

send.setOnClickListener(new View.OnClickListener() {

@Override

public void onClick(View v) {

// 向/app/cheat发送Json数据

mStompClient.send("/app/cheat","{\"userId\":\"lincoln\",\"message\":\""+cheat.getText()+"\"}")

.subscribe(new Subscriber() {

@Override

public void onCompleted() {

toast("发送成功");

}

@Override

public void onError(Throwable e) {

e.printStackTrace();

toast("发送错误");

}

@Override

public void onNext(Void aVoid) {

}

});

}

});

}

private void bindView() {

cheat = (EditText) findViewById(R.id.cheat);

send = (Button) findViewById(R.id.send);

message = (LinearLayout) findViewById(R.id.message);

}

private void createStompClient() {

mStompClient = Stomp.over(WebSocket.class, "ws://192.168.0.46:8080/hello/websocket");

mStompClient.connect();

Toast.makeText(CheatActivity.this,"开始连接 192.168.0.46:8080",Toast.LENGTH_SHORT).show();

mStompClient.lifecycle().subscribe(new Action1() {

@Override

public void call(LifecycleEvent lifecycleEvent) {

switch (lifecycleEvent.getType()) {

case OPENED:

Log.d(TAG, "Stomp connection opened");

toast("连接已开启");

break;

case ERROR:

Log.e(TAG, "Stomp Error", lifecycleEvent.getException());

toast("连接出错");

break;

case CLOSED:

Log.d(TAG, "Stomp connection closed");

toast("连接关闭");

break;

}

}

});

}

// 接收/user/xiaoli/message路径发布的消息

private void registerStompTopic() {

mStompClient.topic("/user/xiaoli/message").subscribe(new Action1() {

@Override

public void call(StompMessage stompMessage) {

Log.e(TAG, "call: " +stompMessage.getPayload() );

showMessage(stompMessage);

}

});

}

private void showMessage(qOmaoNvrRfinal StompMessage stompMessage) {

runOnUiThread(new Runnable() {

@Override

public void run() {

TextView text = new TextView(CheatActivity.this);

text.setLayoutParams(new LinearLayout.LayoutParams(LinearLayout.LayoutParams.MATCH_PARENT, LinearLayout.LayoutParams.WRAP_CONTENT));

text.setText(System.currentTimeMillis() +" body is --->"+stompMessage.getPayload());

message.addView(text);

}

});

}

private void toast(final String message) {

runOnUiThread(new Runnable() {

@Override

public void run() {

Toast.makeText(CheatActivity.this,message,Toast.LENGTH_SHORT).show();

}

});

}

}

代码比较乱,说明一下。

1、STOMP 使用的时候,关键是发布订阅的关系,使用过消息队列,例如rabbitMQ的应该很容易理解。

服务器端 WebSocketConfig.Java文件控制的就是订阅发布的路径关系。

2、websocket的路径说明,本例中连接的是ws://192.168.0.46:8080/hello/websocket路径,/hello是在WebSocketConfig的stompEndpointRegistry.addEndpoint(“/hello”).setAllowedOrigins(““).withSockJS();*确定的, 如果有多个endpoint,这个地方的路径也会随之变化。

3、发布路径

发布信息的路径是由WebSocketConfig中的 setApplicationDestinationPrefixes(“/app/”); 和 Controller 中@MessageMapping(“/welcome”) 组合确定的。

例如发广播消息,路径为/app/welcome

例如发点对点消息,路径为/app/cheat

4、消息订阅路径

订阅broker源自WebSocketConfig中的registry.enableSimpleBroker(“/topic”,”/user”);此处开放了两个broker,具体的订阅服务路径给基于Controller中的 @SendTo(“/topic/getResponse”)或SimpMessagingTemplate中给定。(注:此处,服务器和客户端须约定订阅路径)

5、关于心跳

订阅发布模型的心跳很简单,客户端向一个指定的心跳路径发送心跳,服务器处理,服务器使用指定的订阅路径向客户端发心跳,即可。因为没有Socket,只需要记录是否联通的状态即可,重连客户端做一下就好了。

本人菜鸟,肯定有些地方没有搞清楚,如果有误,请大神斧正。

代码下载地址:blogRepository_jb51.rar


版权声明:本文内容由网络用户投稿,版权归原作者所有,本站不拥有其著作权,亦不承担相应法律责任。如果您发现本站中有涉嫌抄袭或描述失实的内容,请联系我们jiasou666@gmail.com 处理,核实后本网站将在24小时内删除侵权内容。

上一篇:Vue.2.0.5过渡效果使用技巧
下一篇:Java Resource路径整理总结
相关文章

 发表评论

暂时没有评论,来抢沙发吧~