zookeeper python接口实例详解
533
2023-05-12
java中处理socket通信过程中粘包的情况
这两天学习了 Java 中 socket 通信出现粘包(多条消息在一次读取中粘连在一起,或者一条消息被拆成多次读取)时的处理方法。这个问题很重要,所以今天记一点小笔记。
处理粘包的逻辑位于客户端的接收消息线程(ReceiveThread)中:
客户端:
import java.io.EOFException;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.io.OutputStreamWriter;
import java.io.PrintWriter;
import java.io.Reader;
import java.net.Socket;
import java.nio.CharBuffer;
import java.nio.charset.StandardCharsets;
/**
 * Demo client for length-prefixed framing over TCP ("sticky packet" handling).
 *
 * <p>{@link #start()} connects to 127.0.0.1:18889, then {@link SendThread} writes one
 * message per second while {@link ReceiveThread} decodes the replies. A reply frame is a
 * 2-byte ASCII decimal length header followed by exactly that many body bytes.
 */
public class TestSocketClient {
    /** Number of bytes in the ASCII decimal length header that precedes every frame body. */
    private static final int HEADER_LENGTH = 2;

    // NOTE(review): the original message literal was truncated in the published listing
    // (everything after the opening quote was lost). This value is a reconstruction that
    // contains a SOAP envelope start and end tag — confirm against the real payload.
    private static final String MESSAGE = "<SOAP-ENV:Envelope>hello</SOAP-ENV:Envelope>";

    public static void main(String[] args) {
        new TestSocketClient().start();
    }

    /** Closes the socket, ignoring failures; used only on shutdown paths. */
    private static void closeQuietly(Socket socket) {
        if (socket == null) {
            return;
        }
        try {
            socket.close();
        } catch (IOException ignored) {
            // Already shutting down; there is nothing useful left to do with this error.
        }
    }

    /** Writes {@code MESSAGE} to the socket once per second until the socket fails or closes. */
    class SendThread extends Thread {
        private Socket socket;

        public SendThread(Socket socket) {
            this.socket = socket;
        }

        @Override
        public void run() {
            try {
                PrintWriter pw = new PrintWriter(
                        new OutputStreamWriter(socket.getOutputStream(), StandardCharsets.UTF_8));
                while (!socket.isClosed()) {
                    Thread.sleep(1000);
                    pw.write(MESSAGE);
                    pw.flush();
                    // PrintWriter never throws on write; checkError() is the only way to
                    // learn that the connection is gone, so stop instead of looping forever.
                    if (pw.checkError()) {
                        break;
                    }
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            } catch (IOException e) {
                e.printStackTrace();
            } finally {
                closeQuietly(socket);
            }
        }
    }

    /**
     * Reads length-prefixed frames from the socket and prints each body.
     *
     * <p>Because TCP is a byte stream, one {@code read} may return part of a frame or
     * several frames at once; this thread therefore always reads exactly the header, then
     * exactly the announced number of body bytes.
     */
    class ReceiveThread extends Thread {
        private Socket socket;

        public ReceiveThread(Socket socket) {
            this.socket = socket;
        }

        /**
         * Returns a new array holding all of {@code a} followed by {@code b[begin..end)}.
         *
         * @param a     leading bytes, copied in full
         * @param b     source of the trailing bytes
         * @param begin index of the first byte of {@code b} to copy (inclusive)
         * @param end   index after the last byte of {@code b} to copy (exclusive)
         * @return the concatenated array of length {@code a.length + end - begin}
         */
        public byte[] mergebyte(byte[] a, byte[] b, int begin, int end) {
            byte[] add = new byte[a.length + end - begin];
            System.arraycopy(a, 0, add, 0, a.length);
            System.arraycopy(b, begin, add, a.length, end - begin);
            return add;
        }

        @Override
        public void run() {
            try {
                InputStream reader = socket.getInputStream();
                byte[] head = new byte[HEADER_LENGTH];
                // readFully returns false on a clean end of stream, which ends the thread
                // instead of spinning on read() == -1.
                while (readFully(reader, head)) {
                    // The length header is always exactly the first 2 bytes of a frame.
                    int bodylength = Integer.parseInt(new String(head, StandardCharsets.US_ASCII));
                    if (bodylength < 0) {
                        throw new IOException("negative frame length: " + bodylength);
                    }
                    byte[] body = new byte[bodylength];
                    if (!readFully(reader, body)) {
                        throw new EOFException("stream ended before a " + bodylength + "-byte body");
                    }
                    System.out.println("client receive body: " + new String(body, StandardCharsets.UTF_8));
                }
            } catch (IOException | NumberFormatException e) {
                // A broken stream or an unparsable header cannot be resynchronised; stop.
                e.printStackTrace();
            } finally {
                closeQuietly(socket);
            }
        }

        /**
         * Fills {@code buf} completely, looping over short reads.
         *
         * @return {@code false} if the stream ended before any byte was read and
         *         {@code buf} is non-empty; {@code true} once {@code buf} is full
         * @throws EOFException if the stream ends after a partial fill
         */
        private boolean readFully(InputStream in, byte[] buf) throws IOException {
            int filled = 0;
            while (filled < buf.length) {
                int count = in.read(buf, filled, buf.length - filled);
                if (count < 0) {
                    if (filled == 0) {
                        return false;
                    }
                    throw new EOFException("stream ended after " + filled + " of " + buf.length + " bytes");
                }
                filled += count;
            }
            return true;
        }
    }

    /** Connects to the local server on port 18889 and starts the sender and receiver threads. */
    public void start() {
        try {
            Socket socket = new Socket("127.0.0.1", 18889);
            new SendThread(socket).start();
            new ReceiveThread(socket).start();
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
}
服务端:
package com.meituan.service.bankgate.gateway;
/**
* Created by cqx on 16/7/19.
*/
import java.io.*;
import java.net.ServerSocket;
import java.net.Socket;
import java.nio.CharBuffer;
import java.util.Date;
/**
 * Demo server for the sticky-packet example: accepts connections on port 18889 and, for
 * every complete SOAP envelope found in the incoming character stream, answers with one
 * length-prefixed frame (2 ASCII decimal digits followed by the body bytes).
 */
public class TESTAHAHHA {
    // NOTE(review): both marker literals were truncated in the published listing (only the
    // opening quote survived). These values are reconstructions — confirm against the real
    // message format before relying on them.
    private final static String SOAP_BEGIN = "<SOAP-ENV:Envelope";
    private final static String SOAP_END = "</SOAP-ENV:Envelope>";

    public static void main(String[] args) {
        TESTAHAHHA testserver = new TESTAHAHHA();
        testserver.start();
    }

    /** Listens on port 18889 and hands every accepted connection to a new {@link SocketThread}. */
    public void start() {
        try (ServerSocket serversocket = new ServerSocket(18889)) {
            while (true) {
                Socket socket = serversocket.accept();
                new SocketThread(socket).start();
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    /**
     * Encodes {@code body} as a frame: a zero-padded 2-digit ASCII byte count, then the
     * UTF-8 body bytes. The header is fixed at two bytes because the receiving side parses
     * exactly the first two bytes of a frame as the length.
     *
     * @throws IllegalStateException if the body is longer than 99 bytes and therefore
     *         cannot be described by a 2-digit header
     */
    private static byte[] buildFrame(String body) {
        byte[] headbytes = body.getBytes(java.nio.charset.StandardCharsets.UTF_8);
        int length = headbytes.length;
        if (length > 99) {
            throw new IllegalStateException(
                    "body of " + length + " bytes does not fit the 2-digit length header");
        }
        String l = (length < 10 ? "0" : "") + length;
        byte[] lengthbytes = l.getBytes(java.nio.charset.StandardCharsets.US_ASCII);
        byte[] bytes = new byte[lengthbytes.length + length];
        System.arraycopy(lengthbytes, 0, bytes, 0, lengthbytes.length);
        System.arraycopy(headbytes, 0, bytes, lengthbytes.length, length);
        return bytes;
    }

    /** Serves one client connection until the peer closes it or the buffer limit is hit. */
    class SocketThread extends Thread {
        private Socket socket;
        // Unconsumed characters carried over between reads; must start empty, not null,
        // otherwise the first += would prepend the text "null".
        private String temp = "";

        public SocketThread(Socket socket) {
            this.socket = socket;
        }

        // NOTE(review): the published name was garbled ("getsohttp://cket"); "getsocket"
        // is the presumed original.
        public Socket getsocket() {
            return this.socket;
        }

        public void setsocjet(Socket socket) {
            this.socket = socket;
        }

        @Override
        public void run() {
            try {
                Reader reader = new InputStreamReader(
                        socket.getInputStream(), java.nio.charset.StandardCharsets.UTF_8);
                OutputStream writer = socket.getOutputStream();
                CharBuffer charbuffer = CharBuffer.allocate(8192);
                while (reader.read(charbuffer) != -1) {
                    charbuffer.flip();
                    temp += charbuffer.toString();
                    // Reset position/limit so the next read can use the full capacity.
                    charbuffer.clear();

                    // One read may carry several envelopes; answer each and keep whatever
                    // follows the last complete one for the next read.
                    int begin;
                    while ((begin = temp.indexOf(SOAP_BEGIN)) != -1) {
                        int end = temp.indexOf(SOAP_END, begin);
                        if (end == -1) {
                            // Incomplete envelope: drop anything before its start and wait.
                            temp = temp.substring(begin);
                            break;
                        }
                        temp = temp.substring(end + SOAP_END.length());
                        byte[] bytes = buildFrame("receive the soap message hahahah");
                        System.out.println(
                                "server send:" + new String(bytes, java.nio.charset.StandardCharsets.UTF_8));
                        writer.write(bytes);
                        writer.flush();
                    }

                    // Give up on clients that keep sending without completing an envelope.
                    if (temp.length() > 1024 * 16) {
                        break;
                    }
                }
            } catch (Exception e) {
                e.printStackTrace();
            } finally {
                if (socket != null) {
                    try {
                        if (!socket.isClosed()) {
                            socket.close();
                        }
                    } catch (Exception e) {
                        e.printStackTrace();
                    }
                }
            }
        }
    }
}
版权声明:本文内容由网络用户投稿,版权归原作者所有,本站不拥有其著作权,亦不承担相应法律责任。如果您发现本站中有涉嫌抄袭或描述失实的内容,请联系我们jiasou666@gmail.com 处理,核实后本网站将在24小时内删除侵权内容。
发表评论
暂时没有评论,来抢沙发吧~