基于Java实现Socket编程入门

网友投稿 363 2022-08-22


基于Java实现Socket编程入门

目录认识Socket建立socket的基本流程1.最基本的Socket示范1.1单向通信1.2双向通信2.发送更多的消息:结束的界定2.1使用特殊符号2.2根据长度界定3.处理更多的连接:多线程3.1同时实现消息的发送与接收3.2使用线程池优化服务端并发能力4.连接保活4.1使用心跳包4.2断开时重连

认识Socket

socket,又称套接字,是在不同的进程间进行网络通讯的一种协议、约定或者说是规范。

对于socket编程,它更多的时候像是基于TCP/UDP等协议做的一层封装或者说抽象,是一套系统所提供的用于进行网络通信相关编程的接口。

建立socket的基本流程

我们以linux操作系统提供的基本api为例,了解建立一个socket通信的基本流程:

可以看到本质上,socket是对tcp连接(当然也有可能是udp等其他连接)协议,在编程层面上的简化和抽象。

1.最基本的Socket示范

1.1 单向通信

首先,我们从只发送和接收一次消息的socket基础代码开始:

服务端:

package com.marklux.socket.base;

import java.io.IOException;

import java.io.InputStream;

import java.net.ServerSocket;

import java.net.Socket;

/**

* The very basic socket server that only listen one single message.

*/

public class BaseSocketServer {

private ServerSocket server;

private Socket socket;

private int port;

private InputStream inputStream;

private static final int MAX_BUFFER_SIZE = 1024;

public int getPort() {

return port;

}

public void setPort(int port) {

this.port = port;

}

public BaseSocketServer(int port) {

this.port = port;

}

public void runServerSingle() throws IOException {

this.server = new ServerSocket(this.port);

System.out.println("base socket server started.");

// the code will block here till the request come.

this.socket = server.accept();

this.inputStream = this.socket.getInputStream();

byte[] readBytes = new byte[MAX_BUFFER_SIZE];

int msgLen;

StringBuilder stringBuilder = new StringBuilder();

while ((msgLen = inputStream.read(readBytes)) != -1) {

stringBuilder.append(new String(readBytes,0,msgLen,"UTF-8"));

}

System.out.println("get message from client: " + stringBuilder);

inputStream.close();

socket.close();

server.close();

}

public static void main(String[] args) {

BaseSocketServer bs = new BaseSocketServer(9799);

try {

bs.runServerSingle();

}catch (IOException e) {

e.printStackTrace();

}

}

}

客户端:

package com.marklux.socket.base;

import java.io.IOException;

import java.io.OutputStream;

import java.io.UnsupportedEncodingException;

import java.net.Socket;

/**

* The very basic socket client that only send one single message.

*/

public class BaseSocketClient {

private String serverHost;

private int serverPort;

private Socket socket;

private OutputStream outputStream;

public BaseSocketClient(String host, int port) {

this.serverHost = host;

this.serverPort = port;

}

public void connetServer() throws IOException {

this.socket = new Socket(this.serverHost, this.serverPort);

this.outputStream = socket.getOutputStream();

// why the output stream?

}

public void sendSingle(String message) throws IOException {

try {

this.outputStream.write(message.getBytes("UTF-8"));

} catch (UnsupportedEncodingException e) {

System.out.println(e.getMessage());

}

this.outputStream.close();

this.socket.close();

}

public static void main(String[] args) {

BaseSocketClient bc = new BaseSocketClient("127.0.0.1",9799);

try {

bc.connetServer();

bc.sendSingle("Hi from mark.");

}catch (IOException e) {

e.printStackTrace();

}

}

}

先运行服务端,再运行客户端,就可以看到效果。

注意这里的IO操作实现,我们使用了一个大小为MAX_BUFFER_SIZE的byte数组作为缓冲区,然后从输入流中取出字节放置到缓冲区,再从缓冲区中取出字节构建到字符串中去,这在输入流文件很大时非常有用,事实上,后面要讲到的NIO也是基于这种思路实现的。

1.2 双向通信

上面的例子只实现了一次单向的通信,这显然有点浪费通道。socket连接支持全双工的双向通信(底层是tcp),下面的例子中,服务端在收到客户端的消息后,将返回给客户端一个回执。

并且我们使用了一些java.io包装好的方法,来简化整个通信的流程(因为消息长度不大,不再使用缓冲区)。

服务端:

public void runServer() throws IOException {

this.serverSocket = new ServerSocket(port);

this.socket = serverSocket.accept();

this.inputStream = socket.getInputStream();

String message = new String(inputStream.readAllBytes(), "UTF-8");

System.out.println("received message: " + message);

this.socket.shutdownInput(); // 告诉客户端接收已经完毕,之后只能发送

// write the receipt.

this.outputStream = this.socket.getOutputStream();

String receipt = "We received your message: " + message;

outputStream.write(receipt.getBytes("UTF-8"));

this.outputStream.close();

this.socket.close();

}

客户端:

public void sendMessage(String message) throws IOException {

this.socket = new Socket(host,port);

this.outputStream = socket.getOutputStream();

this.outputStream.write(message.getBytes("UTF-8"));

this.socket.shutdownOutput(); // 告诉服务器,所有的发送动作已经结束,之后只能接收

this.inputStream = socket.getInputStream();

String receipt = new String(inputStream.readAllBytes(), "UTF-8");

System.out.println("got receipt: " + receipt);

this.inputStream.close();

this.socket.close();

}

注意这里我们在服务端接受到消息以及客户端发送消息后,分别调用了shutdownInput()和shutdownOutput()而不是直接close对应的stream,这是因为在关闭任何一个stream,都会直接导致socket的关闭,也就无法进行后面回执的发送了。但是注意,调用shutdownInput()和shutdownOutput()之后,对应的流也会被关闭,不能再次向socket发送/写入了。

2. 发送更多的消息:结束的界定

刚才的两个例子中,每次打开流,都只能进行一次写入/读取操作,结束后对应流被关闭,就无法再次写入/读取了。

在这种情况下,如果要发送两次消息,就不得不建立两个socket,既耗资源又麻烦。其实我们完全可以不关闭对应的流,只要分次写入消息就可以了。

但是这样的话,我们就必须面对另一个问题:如何判断一次消息发送的结束呢?

2.1 使用特殊符号

最简单的办法是使用一些特殊的符号来标记一次发送完成,服务端只要读到对应的符号就可以完成一次读取,然后进行相关的处理操作。

下面的例子中我们使用换行符\n来标记一次发送的结束,服务端每接收到一个消息,就打印一次,并且使用了Scanner来简化操作:

服务端:

public void runServer() throws IOException {

this.server = new ServerSocket(this.port);

System.out.println("base socket server started.");

this.socket = server.accept();

// the code will block here till the request come.

this.inputStream = this.socket.getInputStream();

Scanner sc = new Scanner(this.inputStream);

while (sc.hasNextLine()) {

System.out.println("get info from client: " + sc.nextLine());

} // 循环接收并输出消息内容

this.inputStream.close();

socket.close();

}

客户端:

public void connetServer() throws IOException {

this.socket = new Socket(this.serverHost, this.serverPort);

this.outputStream = socket.getOutputStream();

}

public void send(String message) throws IOException {

String sendMsg = message + "\n"; // we mark \n as a end of line.

try {

this.outputStream.write(sendMsg.getBytes("UTF-8"));

} catch (UnsupportedEncodingException e) {

System.out.println(e.getMessage());

}

// this.outputStream.close();

// this.socket.shutdownOutput();

}

public static void main(String[] args) {

CycleSocketClient cc = new CycleSocketClient("127.0.0.1", 9799);

try {

cc.connetServer();

Scanner sc = new Scanner(System.in);

while (sc.hasNext()) {

String line = sc.nextLine();

cc.send(line);

}

}catch (IOException e) {

e.printStackTrace();

}

}

运行后效果是,客户端每输入一行文字按下回车后,服务端就会打印出对应的消息读取记录。

2.2 根据长度界定

回到原点,我们之所以不好定位消息什么时候结束,是因为我们不能够确定每次消息的长度。

那么其实可以先将消息的长度发送出去,当服务端知道消息的长度后,就能够完成一次消息的接收了。

总的来说,发送一次消息变成了两个步骤

发送消息的长度发送消息

最后的问题就是,“发送消息的长度”这一步骤所发送的字节量必须是固定的,否则我们仍然会陷入僵局。

一般来说,我们可以使用固定的字节数来保存消息的长度,比如规定前2个字节就是消息的长度,不过这样我们能够传送的消息最大长度也就被固定死了,以2个字节为例,我们发送的消息最大长度不超过2^16个字节即64K。

如果你了解一些字符的编码,就会知道,其实我们可以使用变长的空间来储存消息的长度,比如:

第一个字节首位为0:即0XXXXXXX,表示长度就一个字节,最大128,表示128B第一个字节首位为110,那么附带后面一个字节表示长度:即110XXXXX 10XXXXXX,最大2048,表示2K第一个字节首位为1110,那么附带后面二个字节表示长度:即110XXXXX 10XXXXXX 10XXXXXX,最大131072,表示128K依次类推

当然这样实现起来会麻烦一些,因此下面的例子里我们仍然使用固定的两个字节来记录消息的长度。

服务端:

public void runServer() throws IOException {

this.serverSocket = new ServerSocket(this.port);

this.socket = serverSocket.accept();

this.inputStream = socket.getInputStream();

byte[] bytes;

while (true) {

// 先读第一个字节

int first = inputStream.read();

if (first == -1) {

// 如果是-1,说明输入流已经被关闭了,也就不需要继续监听了

this.socket.close();

break;

}

// 读取第二个字节

int second = inputStream.read();

int length = (first << 8) + second; // 用位运算将两个字节拼起来成为真正的长度

bytes = new byte[length]; // 构建指定长度的字节大小来储存消息即可

inputStream.read(bytes);

System.out.println("receive message: " + new String(bytes,"UTF-8"));

}

}

客户端:

public void connetServer() throws IOException {

this.socket = new Socket(host,port);

this.outputStream = socket.getOutputStream();

}

public void sendMessage(String message) throws IOException {

// 首先要把message转换成bytes以便处理

byte[] bytes = message.getBytes("UTF-8");

// 接下来传输两个字节的长度,依然使用移位实现

int length = bytes.length;

this.outputStream.write(length >> 8); // write默认一次只传输一个字节

this.outputStream.write(length);

// 传输完长度后,再正式传送消息

this.outputStream.write(bytes);

}

public static void main(String[] args) {

LengthSocketClient lc = new LengthSocketClient("127.0.0.1",9799);

try {

lc.connetServer();

Scanner sc = new Scanner(System.in);

while (sc.hasNextLine()) {

lc.sendMessage(sc.nextLine());

}

} catch (IOException e) {

e.printStackTrace();

}

}

3. 处理更多的连接:多线程

3.1 同时实现消息的发送与接收

在考虑服务端处理多连接之前,我们先考虑使用多线程改造一下原有的一对一对话实例。

在原有的例子中,消息的接收方并不能主动地向对方发送消息,换句话说我们并没有实现真正的互相对话,这主要是因为消息的发送和接收这两个动作并不能同时进行,因此我们需要使用两个线程,其中一个用于监听键盘输入并将其写入socket,另一个则负责监听socket并将接受到的消息显示。

出于简单考虑,我们直接让主线程负责键盘监听jEVuJpC和消息发送,同时另外开启一个线程用于拉取消息并显示。

消息拉取线程 ListenThread.java

public class ListenThread implements Runnable {

private Socket socket;

private InputStream inputStream;

public ListenThread(Socket socket) {

this.socket = socket;

}

@Override

public void run() throws RuntimeException{

try {

this.inputStream = socket.getInputStream();

} catch (IOException e) {http://

e.printStackTrace();

throw new RuntimeException(e.getMessage());

}

while (true) {

try {

int first = this.inputStream.read();

if (first == -1) {

// 输入流已经被关闭,无需继续读取

throw new RuntimeException("disconnected.");

}

int second = this.inputStream.read();

int msgLength = (first<<8) + second;

byte[] readBuffer = new byte[msgLength];

this.inputStream.read(readBuffer);

System.out.println("message from [" + socket.getInetAddress() + "]: " + new String(readBuffer,"UTF-8"));

} catch (IOException e) {

e.printStackTrace();

throw new RuntimeException(e.getMessage());

}

}

}

}

主线程,启动时由用户选择是作为server还是client:

public class ChatSocket {

private String host;

private int port;

private Socket socket;

private ServerSocket serverSocket;

private OutputStream outputStream;

// 以服务端形式启动,创建会话

public void runAsServer(int port) throws IOException {

this.serverSocket = new ServerSocket(port);

System.out.println("[log] server started at port " + port);

// 等待客户端的加入

this.socket = serverSocket.accept();

System.out.println("[log] successful connected with " + socket.getInetAddress());

// 启动监听线程

Thread listenThread = new Thread(new ListenThread(this.socket));

listenThread.start();

waitAndSend();

}

// 以客户端形式启动,加入会话

public void runAsClient(String host, int port) throws IOException {

this.socket = new Socket(host, port);

System.out.println("[log] successful connected to server " + socket.getInetAddress());

Thread listenThread = new Thread(new ListenThread(this.socket));

listenThread.start();

waitAndSend();

}

public void waitAndSend() throws IOException {

this.outputStream = this.socket.getOutputStream();

Scanner sc = new Scanner(System.in);

while (sc.hasNextLine()) {

this.sendMessage(sc.nextLine());

}

}

public void sendMessage(String message) throws IOException {

byte[] msgBytes = message.getBytes("UTF-8");

int length = msgBytes.length;

outputStream.write(length>>8);

outputStream.write(length);

outputStream.write(msgBytes);

}

public static void main(String[] args) {

Scanner scanner = new Scanner(System.in);

ChatSocket chatSocket = new ChatSocket();

System.out.println("select connect type: 1 for server and 2 for client");

int type = Integer.parseInt(scanner.nextLine().toString());

if (type == 1) {

System.out.print("input server port: ");

int port = scanner.nextInt();

try {

chatSocket.runAsServer(port);

} catch (IOException e) {

e.printStackTrace();

}

}else if (type == 2) {

System.out.print("input server host: ");

String host = scanner.nextLine();

System.out.print("input server port: ");

int port = scanner.nextInt();

try {

chatSocket.runAsClient(host, port);

} catch (IOException e) {

e.printStackTrace();

}

}

}

}

3.2 使用线程池优化服务端并发能力

作为服务端,如果一次只跟一个客户端建立socket连接,未免显得太过浪费资源,因此我们完全可以让服务端和多个客户端建立多个socket。

那么既然要处理多个连接,就不得不面对并发问题了(当然,你也可以写循环轮流处理)。我们可以使用多线程来处理并发,不过线程的创建和销毁都会消耗大量的资源和时间,所以最好一步到位,用一个线程池来实现。

下面给出一个示范性质的服务端代码:

public class SocketServer {

public static void main(String args[]) throws Exception {

// 监听指定的端口

int port = 55533;

ServerSocket server = new ServerSocket(port);

// server将一直等待连接的到来

System.out.println("server将一直等待连接的到来");

//如果使用多线程,那就需要线程池,防止并发过高时创建过多线程耗尽资源

ExecutorService threadPool = Executors.newFixedThreadPool(100);

while (true) {

Socket socket = server.accept();

Runnable runnable=()->{

try {

// 建立好连接后,从socket中获取输入流,并建立缓冲区进行读取

InputStream inputStream = socket.getInputStream();

byte[] bytes = new byte[1024];

int len;

StringBuilder sb = new StringBuilder();

while ((len = inputStream.read(bytes)) != -1) {

// 注意指定编码格式,发送方和接收方一定要统一,建议使用UTF-8

sb.append(new String(bytes, 0, len, "UTF-8"));

}

System.out.println("get message from client: " + sb);

inputStream.close();

socket.close();

} catch (Exception e) {

e.printStackTrace();

}

};

threadPool.submit(runnable);

}

}

}

4. 连接保活

我想你不难发现一个问题,那就是当socket连接成功建立后,如果中途发生异常导致其中一方断开连接,此时另一方是无法发现的,只有在再次尝试发送/接收消息才会因为抛出异常而退出。

简单的说,就是我们维持的socket连接,是一个长连接,但我们没有保证它的时效性,上一秒它可能还是可以用的,但是下一秒就不一定了。

4.1 使用心跳包

保证连接随时可用的最常见方法就是定时发送心跳包,来检测连接是否正常。这对于实时性要求很高的服务而言,还是非常重要的(比如消息推送)。

大体的方案如下:

双方约定好心跳包的格式,要能够区别于普通的消息。客户端每隔一定时间,就向服务端发送一个心跳包服务端每接收到心跳包时,将其抛弃如果客户端的某个心跳包发送失败,就可以判断连接已经断开如果对实时性要求很高,服务端也可以定时检查客户端发送心跳包的频率,如果超过一定时间没有发送可以认为连接已经断开

4.2 断开时重连

使用心跳包必然会增加带宽和性能的负担,对于普通的应用我们其实并没有必要使用这种方案,如果消息发送时抛出了连接异常,直接尝试重新连接就好了。

跟上面的方案对比,其实这个抛出异常的消息就充当了心跳包的角色。

总的来说,连接是否要保活,如何保活,需要根据具体的业务场景灵活地思考和定制。


版权声明:本文内容由网络用户投稿,版权归原作者所有,本站不拥有其著作权,亦不承担相应法律责任。如果您发现本站中有涉嫌抄袭或描述失实的内容,请联系我们jiasou666@gmail.com 处理,核实后本网站将在24小时内删除侵权内容。

上一篇:mybatis plus 关联数据库排除不必要字段方式
下一篇:一篇文章超详细的介绍Java继承
相关文章