0
  • 聊天消息
  • 系统消息
  • 评论与回复
登录后你可以
  • 下载海量资料
  • 学习在线课程
  • 观看技术视频
  • 写文章/发帖/加入社区
会员中心
创作中心

完善资料让更多小伙伴认识你,还能领取20积分哦,立即完善>

3天内不再提示

实现一款高可用的TCP数据传输服务器(Java版)

Android编程精选 来源:Java知音 作者:Java知音 2022-11-15 11:23 次阅读

1.netty能做什么

首先netty是一款高性能、封装性良好且灵活、基于NIO(真·非阻塞IO)的开源框架。可以用来手写web服务器、TCP服务器等,支持的协议丰富,如:常用的HTTP/HTTPS/WEBSOCKET,并且提供的大量的方法,十分灵活,可以根据自己的需求量身DIV一款服务器。

用netty编写TCP的服务器/客户端

1.可以自己设计数据传输协议如下面这样:

8671b226-627e-11ed-8abf-dac502259ad0.png

2.可以自定义编码规则和解码规则

3.可以自定义客户端与服务端的数据交互细节,处理socket流攻击、TCP的粘包和拆包问题

2.Quick Start

创建一个普通的maven项目,不依赖任何的三方web服务器,用main方法执行即可。

8682f34c-627e-11ed-8abf-dac502259ad0.png

加入POM依赖

 

io.netty
netty-all
4.1.6.Final

 

com.fasterxml.jackson.core
jackson-databind
2.9.7

 

log4j
log4j
1.2.17

设计一套基于TCP的数据传输协议

publicclassTcpProtocol{
privatebyteheader=0x58;
privateintlen;
privatebyte[]data;
privatebytetail=0x63;

publicbytegetTail(){
returntail;
}

publicvoidsetTail(bytetail){
this.tail=tail;
}

publicTcpProtocol(intlen,byte[]data){
this.len=len;
this.data=data;
}

publicTcpProtocol(){
}

publicbytegetHeader(){
returnheader;
}

publicvoidsetHeader(byteheader){
this.header=header;
}

publicintgetLen(){
returnlen;
}

publicvoidsetLen(intlen){
this.len=len;
}

publicbyte[]getData(){
returndata;
}

publicvoidsetData(byte[]data){
this.data=data;
}
}

这里使用16进制表示协议的开始位和结束位,其中0x58代表开始,0x63代表结束,均用一个字节来进行表示。

TCP服务器的启动类

publicclassTcpServer{
privateintport;
privateLoggerlogger=Logger.getLogger(this.getClass());
publicvoidinit(){
logger.info("正在启动tcp服务器……");
NioEventLoopGroupboss=newNioEventLoopGroup();//主线程组
NioEventLoopGroupwork=newNioEventLoopGroup();//工作线程组
try{
ServerBootstrapbootstrap=newServerBootstrap();//引导对象
bootstrap.group(boss,work);//配置工作线程组
bootstrap.channel(NioServerSocketChannel.class);//配置为NIO的socket通道
bootstrap.childHandler(newChannelInitializer(){
protectedvoidinitChannel(SocketChannelch)throwsException{//绑定通道参数
ch.pipeline().addLast("logging",newLoggingHandler("DEBUG"));//设置log监听器,并且日志级别为debug,方便观察运行流程
ch.pipeline().addLast("encode",newEncoderHandler());//编码器。发送消息时候用过
ch.pipeline().addLast("decode",newDecoderHandler());//解码器,接收消息时候用
ch.pipeline().addLast("handler",newBusinessHandler());//业务处理类,最终的消息会在这个handler中进行业务处理
}
});
bootstrap.option(ChannelOption.SO_BACKLOG,1024);//缓冲区
bootstrap.childOption(ChannelOption.SO_KEEPALIVE,true);//ChannelOption对象设置TCP套接字的参数,非必须步骤
ChannelFuturefuture=bootstrap.bind(port).sync();//使用了Future来启动线程,并绑定了端口
logger.info("启动tcp服务器启动成功,正在监听端口:"+port);
future.channel().closeFuture().sync();//以异步的方式关闭端口

}catch(InterruptedExceptione){
logger.info("启动出现异常:"+e);
}finally{
work.shutdownGracefully();
boss.shutdownGracefully();//出现异常后,关闭线程组
logger.info("tcp服务器已经关闭");
}

}

publicstaticvoidmain(String[]args){
newTcpServer(8777).init();
}
publicTcpServer(intport){
this.port=port;
}
}

只要是基于netty的服务器,都会用到bootstrap 并用这个对象绑定工作线程组,channel的Class,以及用户DIV的各种pipeline的handler类,注意在添加自定义handler的时候,数据的流动顺序和pipeline中添加hanlder的顺序是一致的。也就是说,从上往下应该为:底层字节流的解码/编码handler、业务处理handler。

编码器

编码器是服务器按照协议格式返回数据给客户端时候调用的,继承MessageToByteEncoder代码:

publicclassEncoderHandlerextendsMessageToByteEncoder{
privateLoggerlogger=Logger.getLogger(this.getClass());
protectedvoidencode(ChannelHandlerContextctx,Objectmsg,ByteBufout)throwsException{
if(msginstanceofTcpProtocol){
TcpProtocolprotocol=(TcpProtocol)msg;
out.writeByte(protocol.getHeader());
out.writeInt(protocol.getLen());
out.writeBytes(protocol.getData());
out.writeByte(protocol.getTail());
logger.debug("数据编码成功:"+out);
}else{
logger.info("不支持的数据协议:"+msg.getClass()+"	期待的数据协议类是:"+TcpProtocol.class);
}
}
}

解码器

解码器属于比较核心的部分,自定义解码协议、粘包、拆包等都在里面实现,继承自ByteToMessageDecoder,其实ByteToMessageDecoder的内部已经帮我们处理好了拆包/粘包的问题,只需要按照它的设计原则去实现decode方法即可:

publicclassDecoderHandlerextendsByteToMessageDecoder{
//最小的数据长度:开头标准位1字节
privatestaticintMIN_DATA_LEN=6;
//数据解码协议的开始标志
privatestaticbytePROTOCOL_HEADER=0x58;
//数据解码协议的结束标志
privatestaticbytePROTOCOL_TAIL=0x63;
privateLoggerlogger=Logger.getLogger(this.getClass());
protectedvoiddecode(ChannelHandlerContextctx,ByteBufin,Listout)throwsException{

if(in.readableBytes()>MIN_DATA_LEN){
logger.debug("开始解码数据……");
//标记读操作的指针
in.markReaderIndex();
byteheader=in.readByte();
if(header==PROTOCOL_HEADER){
logger.debug("数据开头格式正确");
//读取字节数据的长度
intlen=in.readInt();
//数据可读长度必须要大于len,因为结尾还有一字节的解释标志位
if(len>=in.readableBytes()){
logger.debug(String.format("数据长度不够,数据协议len长度为:%1$d,数据包实际可读内容为:%2$d正在等待处理拆包……",len,in.readableBytes()));
in.resetReaderIndex();
/*
**结束解码,这种情况说明数据没有到齐,在父类ByteToMessageDecoder的callDecode中会对out和in进行判断
*如果in里面还有可读内容即in.isReadable为true,cumulation中的内容会进行保留,,直到下一次数据到来,将两帧的数据合并起来,再解码。
*以此解决拆包问题
*/
return;
}
byte[]data=newbyte[len];
in.readBytes(data);//读取核心的数据
bytetail=in.readByte();
if(tail==PROTOCOL_TAIL){
logger.debug("数据解码成功");
out.add(data);
//如果out有值,且in仍然可读,将继续调用decode方法再次解码in中的内容,以此解决粘包问题
}else{
logger.debug(String.format("数据解码协议结束标志位:%1$d [错误!],期待的结束标志位是:%2$d",tail,PROTOCOL_TAIL));
return;
}
}else{
logger.debug("开头不对,可能不是期待的客服端发送的数,将自动略过这一个字节");
}
}else{
logger.debug("数据长度不符合要求,期待最小长度是:"+MIN_DATA_LEN+"字节");
return;
}

}
}

首先是黏包问题:

如图,正常的数据传输应该是像数据A那样,一包就是一个完整的数据,但也有不正常的情况,比如一包数据包含多个数据。而在ByteToMessageDecoder会默认把二进制的字节码放在byteBuf中,因此我们在code的时候要知道会有这样的场景。

8697411c-627e-11ed-8abf-dac502259ad0.png

而粘包问题实际上不需要我们去解决,下面是ByteToMessageDecoder的源码,callDecode中回调我们手写解码器的decode方法。

protectedvoidcallDecode(ChannelHandlerContextctx,ByteBufin,Listout){
try{
while(in.isReadable()){//buf中是否还有数据
intoutSize=out.size();//标记out的size,解析成功的数据会添加的out中
if(outSize>0){
fireChannelRead(ctx,out,outSize);//这个是回调业务handler的channelRead方法
out.clear();
if(ctx.isRemoved()){
break;
}
outSize=0;//清空了out,将标记size清零
}
intoldInputLength=in.readableBytes();//这里开始准备调用decode方法,标记了解码前的可读内容
decode(ctx,in,out);//对应DecoderHandler中的decode方法
if(ctx.isRemoved()){
break;
}

if(outSize==out.size()){//相等说明,并没有解析出来新的object到out中
if(oldInputLength==in.readableBytes()){//这里很重要,若相等说明decode中没有读取任何内容出来,这里一般是发生拆包后,将ByteBuf的指针手动重置。重置后从这个方法break出来。让ByteToMessageDecoder去处理拆包问题。这里就体现了要按照netty的设计原则来写代码
break;
}else{
continue;//这里直接continue,是考虑让开发者去跳过某些字节,比如收到了socket攻击时,数据不按照协议体来的时候,就直接跳过这些字节
}
}

if(oldInputLength==in.readableBytes()){//这种情况属于,没有按照netty的设计原则来。要么是decode中没有任何逻辑代码,要么是在out中添加了内容后,调用了byteBuf的resetReaderIndex重置的读操作的指针
thrownewDecoderException(
StringUtil.simpleClassName(getClass())+
".decode()didnotreadanythingbutdecodedamessage.");
}

if(isSingleDecode()){//默认为false,用来设置只解析一条数据
break;
}
//这里结束后,继续wile循环,因为bytebuf仍然有可读的内容,将会继续调用decode方法解析bytebuf中的字节码,以此解决了粘包问题
}
}catch(DecoderExceptione){
throwe;
}catch(Throwablecause){
thrownewDecoderException(cause);
}
}

综合上面的源码分析后,我们发现:decode方法在while循环中,也就是bytebuf只要有内容就会一直调用decode方法进行解码操作,因此在解决粘包问题时,只需要按照正常流程来就行了,解析协议开头、数据字节、结束标志后将数据放入到out这个list中即可。后面将会有数据进行粘包测试。

拆包问题

有时候,我们接收到的数据是不完整的,一个包的数据被拆成了很多份被后再发送出去。这种情况有可能是数据太大,被分割成很多份发送出去。比如数据包B被拆成两份进行发送:

86a9e448-627e-11ed-8abf-dac502259ad0.png

拆包问题,同样在ByteToMessageDecoder 给我们解决了,我们只需要按照netty的设计原则去写decode代码即可。

首先,假设需要我们自己去解决拆包问题应该怎么实现?

先从问题开始分析,需要的是数据B,但是却只收到了数据B_1,这个时候应该等待剩余的数据B_2的到来,收到的数据B_1应该用一个累加器存起来,等到B_2到来的时候将两包数据合并起来再进行解码。

那么问题是,如何让ByteToMessageDecoder这个知道数据不完整呢,在DecoderHandler.decode中有这样一段代码:

if(len>=in.readableBytes()){
logger.debug(String.format("数据长度不够,数据协议len长度为:%1$d,数据包实际可读内容为:%2$d正在等待处理拆包……",len,in.readableBytes()));
in.resetReaderIndex();
/*
**结束解码,这种情况说明数据没有到齐,在父类ByteToMessageDecoder的callDecode中会对out和in进行判断
*如果in里面还有可读内容即in.isReadable为true,cumulation中的内容会进行保留,,直到下一次数据到来,将两帧的数据合并起来,再解码。
*以此解决拆包问题
*/
return;
}

当读到协议中的len大于bytebuf的可读内容时候说明数据不完整,发生了拆包,调用resetReaderIndex将读操作指针复位,并结束方法。再看看父类中的CallDecode方法的一段代码:

if(outSize==out.size()){//相等说明,并没有解析出来新的object到out中
if(oldInputLength==in.readableBytes()){//这里很重要,若相等说明decode中没有读取任何内容出来,这里一般是发生拆包后,将ByteBuf的指针手动重置。重置后从这个方法break出来。让ByteToMessageDecoder去处理拆包问题。这里就体现了要按照netty的设计原则来写代码
break;//退出该方法
}else{
continue;//这里直接continue,是考虑让开发者去跳过某些字节,比如收到了socket攻击时,数据不按照协议体来的时候,就直接跳过这些字节
}
}

退出callDecode后,返回到channelRead中:

publicvoidchannelRead(ChannelHandlerContextctx,Objectmsg)throwsException{
if(msginstanceofByteBuf){
CodecOutputListout=CodecOutputList.newInstance();
try{
ByteBufdata=(ByteBuf)msg;
first=cumulation==null;
if(first){
cumulation=data;
}else{
cumulation=cumulator.cumulate(ctx.alloc(),cumulation,data);
}
callDecode(ctx,cumulation,out);//注意这里传入的不是data,而是cumulator,这个对象相当于一个累加器,也就是说每次调用callDecode的时候传入的byteBuf实际上是经过累加后的cumulation
}catch(DecoderExceptione){
throwe;
}catch(Throwablet){
thrownewDecoderException(t);
}finally{
if(cumulation!=null&&!cumulation.isReadable()){//这里若是数据被读取完,会清空累加器cumulation
numReads=0;
cumulation.release();
cumulation=null;
}elseif(++numReads>=discardAfterReads){
//WedidenoughreadsalreadytrytodiscardsomebytessowenotrisktoseeaOOME.
//Seehttps://github.com/netty/netty/issues/4275
numReads=0;
discardSomeReadBytes();
}

intsize=out.size();
decodeWasNull=!out.insertSinceRecycled();
fireChannelRead(ctx,out,size);
out.recycle();
}
}else{
ctx.fireChannelRead(msg);
}
}

而channelRead方法是,收到一包数据后就会调用一次。至此,netty帮我们完美解决了拆包问题。我们只需要按着他的设计原则:len>byteBuf.readableBytes时候,重置读指针,结束decode即可。

业务处理handler类

这一层中数据已经被完整的解析出来了,可以直接使用了:

publicclassBusinessHandlerextendsChannelInboundHandlerAdapter{
privateObjectMapperobjectMapper=ByteUtils.InstanceObjectMapper();
privateLoggerlogger=Logger.getLogger(this.getClass());
@Override
publicvoidchannelRead(ChannelHandlerContextctx,Objectmsg)throwsException{
if(msginstanceofbyte[]){
logger.debug("解码后的字节码:"+newString((byte[])msg,"UTF-8"));
try{
ObjectobjectContainer=objectMapper.readValue((byte[])msg,DTObject.class);
if(objectContainerinstanceofDTObject){
DTObjectdata=(DTObject)objectContainer;
if(data.getClassName()!=null&&data.getObject().length>0){
Objectobject=objectMapper.readValue(data.getObject(),Class.forName(data.getClassName()));
logger.info("收到实体对象:"+object);
}
}
}catch(Exceptione){
logger.info("对象反序列化出现问题:"+e);
}

}
}
}

由于在decode中并没有将字节码反序列成对象,因此需要进一步反序列化。在传输数据的时候,可能传递的对象不只是一种,因此在反序列化也要考虑到这一问题。解决办法是将传输的对象进行二次包装,将全名类信息包含进去:

publicclassDTObject{
privateStringclassName;
privatebyte[]object;
}

这样在反序列化的时候使用Class.forName()获取Class,避免了要写很多if循环判断反序列化的对象的Class。前提是要类名和包路径要完全匹配!

接下来编写一个TCP客户端进行测试

启动类的init方法:

publicvoidinit()throwsInterruptedException{
NioEventLoopGroupgroup=newNioEventLoopGroup();
try{
Bootstrapbootstrap=newBootstrap();
bootstrap.group(group);
bootstrap.channel(NioSocketChannel.class);
bootstrap.option(ChannelOption.SO_KEEPALIVE,true);
bootstrap.handler(newChannelInitializer(){
@Override
protectedvoidinitChannel(Channelch)throwsException{
ch.pipeline().addLast("logging",newLoggingHandler("DEBUG"));
ch.pipeline().addLast(newEncoderHandler());
ch.pipeline().addLast(newEchoHandler());
}
});
bootstrap.remoteAddress(ip,port);
ChannelFuturefuture=bootstrap.connect().sync();

future.channel().closeFuture().sync();
}catch(InterruptedExceptione){
e.printStackTrace();
}finally{
group.shutdownGracefully().sync();
}
}

客户端的handler:

publicclassEchoHandlerextendsChannelInboundHandlerAdapter{

//连接成功后发送消息测试
@Override
publicvoidchannelActive(ChannelHandlerContextctx)throwsException{
Useruser=newUser();
user.setBirthday(newDate());
user.setUID(UUID.randomUUID().toString());
user.setName("冉鹏峰");
user.setAge(22);
DTObjectdtObject=newDTObject();
dtObject.setClassName(user.getClass().getName());
dtObject.setObject(ByteUtils.InstanceObjectMapper().writeValueAsBytes(user));
TcpProtocoltcpProtocol=newTcpProtocol();
byte[]objectBytes=ByteUtils.InstanceObjectMapper().writeValueAsBytes(dtObject);
tcpProtocol.setLen(objectBytes.length);
tcpProtocol.setData(objectBytes);
ctx.write(tcpProtocol);
ctx.flush();
}
}

这个handler是为了模拟在TCP连接建立好之后发送一包的数据到服务端经行测试,通过channel的write去发送数据,只要在启动类TcpClient配置了编码器的EncoderHandler,就可以直接将对象tcpProtocol传进去,它将在EncoderHandler的encode方法中被自动转换成字节码放入bytebuf中。

正常数据传输测试:

结果:

2019-01-1416:30:34DEBUG[org.wisdom.server.decoder.DecoderHandler]开始解码数据……
2019-01-1416:30:34DEBUG[org.wisdom.server.decoder.DecoderHandler]数据开头格式正确
2019-01-1416:30:34DEBUG[org.wisdom.server.decoder.DecoderHandler]数据解码成功
2019-01-1416:30:34DEBUG [org.wisdom.server.business.BusinessHandler]解码后的字节码:{"className":"pojo.User","object":"eyJuYW1lIjoi5YaJ6bmP5bOwIiwiYWdlIjoyNCwiYmlydGhkYXkiOiIyMDE5LzAxLzE0IDA0OjMwOjE0IiwidWlkIjoiOGY0OTM0OGEtMWNmMy00ZTEyLWEzZTAtY2M1ZTJjZTkzMDdlIn0="}
2019-01-1416:30:34INFO [org.wisdom.server.business.BusinessHandler]收到实体对象:User{name='冉鹏峰',age=24,UID='8f49348a-1cf3-4e12-a3e0-cc5e2ce9307e',birthday=MonJan1404:30:00CST2019}

可以看到最终的实体对象User被成功的解析出来。

在debug模式下还会看到这样的一个表格在控制台输出:

+-------------------------------------------------+
|0123456789abcdef|
+--------+-------------------------------------------------+----------------+
|00000000|58000000b57b22636c6173734e616d65|X....{"className|
|00000010|223a22706f6a6f2e55736572222c226f|":"pojo.User","o|
|00000020|626a656374223a2265794a755957316c|bject":"eyJuYW1l|
|00000030|496a6f693559614a36626d5035624f77|Ijoi5YaJ6bmP5bOw|
|00000040|496977695957646c496a6f794e437769|IiwiYWdlIjoyNCwi|
|00000050|596d6c796447686b59586b694f694979|YmlydGhkYXkiOiIy|
|00000060|4d4445354c7a41784c7a453049444130|MDE5LzAxLzE0IDA0|
|00000070|4f6a4d774f6a45304969776964576c6b|OjMwOjE0IiwidWlk|
|00000080|496a6f694f4759304f544d304f474574|IjoiOGY0OTM0OGEt|
|00000090|4d574e6d4d7930305a5445794c57457a|MWNmMy00ZTEyLWEz|
|000000a0|5a54417459324d315a544a6a5a546b7a|ZTAtY2M1ZTJjZTkz|
|000000b0|4d44646c496e303d227d63|MDdlIn0="}c|
+--------+-------------------------------------------------+----------------+

这个是相当于真实的数据抓包展示,数据被转换成字节码后是以二进制的形式在TCP缓存区冲传输过来。但是二进制太长了,所以一般都是转换成16进制显示的,一个表格显示一个字节的数据,数据由地位到高位由左到右,由上到下进行排列。

其中0x58为TcpProtocol中设置的开始标志,00 00 00 b5为数据的长度,因为是int类型所以占用了四个字节从7b--7d内容为要传输的数据内容,结尾的0x63为TcpProtocol设置的结束标志位。

粘包测试

为了模拟粘包,首先将启动类TcpClient中配置的编码器的EncoderHandler注释掉:

bootstrap.handler(newChannelInitializer(){
@Override
protectedvoidinitChannel(Channelch)throwsException{
ch.pipeline().addLast("logging",newLoggingHandler("DEBUG"));
//ch.pipeline().addLast(newEncoderHandler());因为需要在byteBuf中手动模拟粘包的场景
ch.pipeline().addLast(newEchoHandler());
}
});

然后在发送的时候故意将三帧的数据,放在一个包中就行发送,在EchoHanlder做如下修改:

publicvoidchannelActive(ChannelHandlerContextctx)throwsException{
Useruser=newUser();
user.setBirthday(newDate());
user.setUID(UUID.randomUUID().toString());
user.setName("冉鹏峰");
user.setAge(24);
DTObjectdtObject=newDTObject();
dtObject.setClassName(user.getClass().getName());
dtObject.setObject(ByteUtils.InstanceObjectMapper().writeValueAsBytes(user));
TcpProtocoltcpProtocol=newTcpProtocol();
byte[]objectBytes=ByteUtils.InstanceObjectMapper().writeValueAsBytes(dtObject);
tcpProtocol.setLen(objectBytes.length);
tcpProtocol.setData(objectBytes);
ByteBufbuffer=ctx.alloc().buffer();
buffer.writeByte(tcpProtocol.getHeader());
buffer.writeInt(tcpProtocol.getLen());
buffer.writeBytes(tcpProtocol.getData());
buffer.writeByte(tcpProtocol.getTail());
//模拟粘包的第二帧数据
buffer.writeByte(tcpProtocol.getHeader());
buffer.writeInt(tcpProtocol.getLen());
buffer.writeBytes(tcpProtocol.getData());
buffer.writeByte(tcpProtocol.getTail());
//模拟粘包的第三帧数据
buffer.writeByte(tcpProtocol.getHeader());
buffer.writeInt(tcpProtocol.getLen());
buffer.writeBytes(tcpProtocol.getData());
buffer.writeByte(tcpProtocol.getTail());
ctx.write(buffer);
ctx.flush();
}

运行结果:

2019-01-141651DEBUG[org.wisdom.server.decoder.DecoderHandler]开始解码数据……
2019-01-141651DEBUG[org.wisdom.server.decoder.DecoderHandler]数据开头格式正确
2019-01-141651DEBUG[org.wisdom.server.decoder.DecoderHandler]数据解码成功
2019-01-14 1651 DEBUG [org.wisdom.server.business.BusinessHandler]解码后的字节码:{"className":"pojo.User","object":"eyJuYW1lIjoi5YaJ6bmP5bOwIiwiYWdlIjoyNCwiYmlydGhkYXkiOiIyMDE5LzAxLzE0IDA0OjQ0OjE0IiwidWlkIjoiODFkZTU5YWUtMzQ4Mi00ZDFhLWJjNDMtN2NjMTJmOTI1ZTUxIn0="}
2019-01-14 1651 INFO [org.wisdom.server.business.BusinessHandler]收到实体对象:User{name='冉鹏峰',age=24,UID='81de59ae-3482-4d1a-bc43-7cc12f925e51',birthday=MonJan140400CST2019}
2019-01-141651DEBUG[org.wisdom.server.decoder.DecoderHandler]开始解码数据……
2019-01-141651DEBUG[org.wisdom.server.decoder.DecoderHandler]数据开头格式正确
2019-01-141651DEBUG[org.wisdom.server.decoder.DecoderHandler]数据解码成功
2019-01-14 1651 DEBUG [org.wisdom.server.business.BusinessHandler]解码后的字节码:{"className":"pojo.User","object":"eyJuYW1lIjoi5YaJ6bmP5bOwIiwiYWdlIjoyNCwiYmlydGhkYXkiOiIyMDE5LzAxLzE0IDA0OjQ0OjE0IiwidWlkIjoiODFkZTU5YWUtMzQ4Mi00ZDFhLWJjNDMtN2NjMTJmOTI1ZTUxIn0="}
2019-01-14 1651 INFO [org.wisdom.server.business.BusinessHandler]收到实体对象:User{name='冉鹏峰',age=24,UID='81de59ae-3482-4d1a-bc43-7cc12f925e51',birthday=MonJan140400CST2019}
2019-01-141651DEBUG[org.wisdom.server.decoder.DecoderHandler]开始解码数据……
2019-01-141651DEBUG[org.wisdom.server.decoder.DecoderHandler]数据开头格式正确
2019-01-141651DEBUG[org.wisdom.server.decoder.DecoderHandler]数据解码成功
2019-01-14 1651 DEBUG [org.wisdom.server.business.BusinessHandler]解码后的字节码:{"className":"pojo.User","object":"eyJuYW1lIjoi5YaJ6bmP5bOwIiwiYWdlIjoyNCwiYmlydGhkYXkiOiIyMDE5LzAxLzE0IDA0OjQ0OjE0IiwidWlkIjoiODFkZTU5YWUtMzQ4Mi00ZDFhLWJjNDMtN2NjMTJmOTI1ZTUxIn0="}
2019-01-14 1651 INFO [org.wisdom.server.business.BusinessHandler]收到实体对象:User{name='冉鹏峰',age=24,UID='81de59ae-3482-4d1a-bc43-7cc12f925e51',birthday=MonJan140400CST2019}

服务器成功解析出来了三帧的数据,BusinessHandler的channelRead方法被调用了三次。

而抓到的数据包也确实是模拟的三帧数据黏在一个包中:

+-------------------------------------------------+
|0123456789abcdef|
+--------+-------------------------------------------------+----------------+
|00000000|58000000b57b22636c6173734e616d65|X....{"className|
|00000010|223a22706f6a6f2e55736572222c226f|":"pojo.User","o|
|00000020|626a656374223a2265794a755957316c|bject":"eyJuYW1l|
|00000030|496a6f693559614a36626d5035624f77|Ijoi5YaJ6bmP5bOw|
|00000040|496977695957646c496a6f794e437769|IiwiYWdlIjoyNCwi|
|00000050|596d6c796447686b59586b694f694979|YmlydGhkYXkiOiIy|
|00000060|4d4445354c7a41784c7a453049444130|MDE5LzAxLzE0IDA0|
|00000070|4f6a51304f6a45304969776964576c6b|OjQ0OjE0IiwidWlk|
|00000080|496a6f694f44466b5a54553559575574|IjoiODFkZTU5YWUt|
|00000090|4d7a51344d6930305a4446684c574a6a|MzQ4Mi00ZDFhLWJj|
|000000a0|4e444d744e324e6a4d544a6d4f544931|NDMtN2NjMTJmOTI1|
|000000b0|5a545578496e303d227d【63】58000000b5|ZTUxIn0="}cX....|
|000000c0|7b22636c6173734e616d65223a22706f|{"className":"po|
|000000d0|6a6f2e55736572222c226f626a656374|jo.User","object|
|000000e0|223a2265794a755957316c496a6f6935|":"eyJuYW1lIjoi5|
|000000f0|59614a36626d5035624f774969776959|YaJ6bmP5bOwIiwiY|
|00000100|57646c496a6f794e437769596d6c7964|WdlIjoyNCwiYmlyd|
|00000110|47686b59586b694f6949794d4445354c|GhkYXkiOiIyMDE5L|
|00000120|7a41784c7a4530494441304f6a51304f|zAxLzE0IDA0OjQ0O|
|00000130|6a45304969776964576c6b496a6f694f|jE0IiwidWlkIjoiO|
|00000140|44466b5a545535595755744d7a51344d|DFkZTU5YWUtMzQ4M|
|00000150|6930305a4446684c574a6a4e444d744e|i00ZDFhLWJjNDMtN|
|00000160|324e6a4d544a6d4f5449315a54557849|2NjMTJmOTI1ZTUxI|
|00000170|6e303d227d【63】58000000b57b22636c61|n0="}cX....{"cla|
|00000180|73734e616d65223a22706f6a6f2e5573|ssName":"pojo.Us|
|00000190|6572222c226f626a656374223a226579|er","object":"ey|
|000001a0|4a755957316c496a6f693559614a3662|JuYW1lIjoi5YaJ6b|
|000001b0|6d5035624f77496977695957646c496a|mP5bOwIiwiYWdlIj|
|000001c0|6f794e437769596d6c796447686b5958|oyNCwiYmlydGhkYX|
|000001d0|6b694f6949794d4445354c7a41784c7a|kiOiIyMDE5LzAxLz|
|000001e0|4530494441304f6a51304f6a45304969|E0IDA0OjQ0OjE0Ii|
|000001f0|776964576c6b496a6f694f44466b5a54|widWlkIjoiODFkZT|
|00000200|5535595755744d7a51344d6930305a44|U5YWUtMzQ4Mi00ZD|
|00000210|46684c574a6a4e444d744e324e6a4d54|FhLWJjNDMtN2NjMT|
|00000220|4a6d4f5449315a545578496e303d227d|JmOTI1ZTUxIn0="}|
|00000230|【63】|c|
+--------+-------------------------------------------------+----------------+

可以看到确实存在三个尾巴【63】

在netty4.x版本中,粘包问题确实被netty的ByteToMessageDecoder中的CallDecode方法中给处理掉了。

拆包问题

这次还是将TcpClient中的编码器EncoderHandler注释掉,然后在EchoHandler的channelActive中模拟数据的拆包问题:

publicvoidchannelActive(ChannelHandlerContextctx)throwsException{
Useruser=newUser();
user.setBirthday(newDate());
user.setUID(UUID.randomUUID().toString());
user.setName("冉鹏峰");
user.setAge(24);
DTObjectdtObject=newDTObject();
dtObject.setClassName(user.getClass().getName());
dtObject.setObject(ByteUtils.InstanceObjectMapper().writeValueAsBytes(user));
TcpProtocoltcpProtocol=newTcpProtocol();
byte[]objectBytes=ByteUtils.InstanceObjectMapper().writeValueAsBytes(dtObject);
tcpProtocol.setLen(objectBytes.length);
tcpProtocol.setData(objectBytes);
ByteBufbuffer=ctx.alloc().buffer();
buffer.writeByte(tcpProtocol.getHeader());
buffer.writeInt(tcpProtocol.getLen());
buffer.writeBytes(Arrays.copyOfRange(tcpProtocol.getData(),0,tcpProtocol.getLen()/2));//只发送二分之一的数据包
//模拟拆包
ctx.write(buffer);
ctx.flush();
Thread.sleep(3000);//模拟网络延时
buffer=ctx.alloc().buffer();
buffer.writeBytes(Arrays.copyOfRange(tcpProtocol.getData(),tcpProtocol.getLen()/2,tcpProtocol.getLen()));//将剩下的二分之和尾巴发送过去
buffer.writeByte(tcpProtocol.getTail());
ctx.write(buffer);
ctx.flush();

}

运行结果:

首先是客户端这边:

2019-01-141733DEBUG[DEBUG][id:0x3b8cbbbb,L:/127.0.0.1:51138-R:/127.0.0.1:8777]WRITE:95B
+-------------------------------------------------+
|0123456789abcdef|
+--------+-------------------------------------------------+----------------+
|00000000|58000000b57b22636c6173734e616d65|X....{"className|
|00000010|223a22706f6a6f2e55736572222c226f|":"pojo.User","o|
|00000020|626a656374223a2265794a755957316c|bject":"eyJuYW1l|
|00000030|496a6f693559614a36626d5035624f77|Ijoi5YaJ6bmP5bOw|
|00000040|496977695957646c496a6f794e437769|IiwiYWdlIjoyNCwi|
|00000050|596d6c796447686b59586b694f6949|YmlydGhkYXkiOiI|
+--------+-------------------------------------------------+----------------+
2019-01-141733DEBUG[DEBUG][id:0x3b8cbbbb,L:/127.0.0.1:51138-R:/127.0.0.1:8777]FLUSH
2019-01-141736DEBUG[DEBUG][id:0x3b8cbbbb,L:/127.0.0.1:51138-R:/127.0.0.1:8777]WRITE:92B
+-------------------------------------------------+
|0123456789abcdef|
+--------+-------------------------------------------------+----------------+
|00000000|794d4445354c7a41784c7a4530494441|yMDE5LzAxLzE0IDA|
|00000010|314f6a41344f6a45304969776964576c|1OjA4OjE0IiwidWl|
|00000020|6b496a6f694f5745795a6a49354d6d4d|kIjoiOWEyZjI5MmM|
|00000030|744d6a4d354f4330305a6a6b774c5746|tMjM5OC00ZjkwLWF|
|00000040|6b5a5759745a6d466c4e44457a5a6a55|kZWYtZmFlNDEzZjU|
|00000050|354e324533496e303d227d63|5N2E3In0="}c|
+--------+-------------------------------------------------+----------------+
2019-01-141736DEBUG[DEBUG][id:0x3b8cbbbb,L:/127.0.0.1:51138-R:/127.0.0.1:8777]FLUSH

确实是将数据分成两包发送出去了

再看看服务端的输出日志:

2019-01-141733DEBUG[DEBUG][id:0x8e5811b3,L:/127.0.0.1:8777-R:/127.0.0.1:51138]RECEIVED:95B
+-------------------------------------------------+
|0123456789abcdef|
+--------+-------------------------------------------------+----------------+
|00000000|58000000b57b22636c6173734e616d65|X....{"className|
|00000010|223a22706f6a6f2e55736572222c226f|":"pojo.User","o|
|00000020|626a656374223a2265794a755957316c|bject":"eyJuYW1l|
|00000030|496a6f693559614a36626d5035624f77|Ijoi5YaJ6bmP5bOw|
|00000040|496977695957646c496a6f794e437769|IiwiYWdlIjoyNCwi|
|00000050|596d6c796447686b59586b694f6949|YmlydGhkYXkiOiI|
+--------+-------------------------------------------------+----------------+
2019-01-141733DEBUG[org.wisdom.server.decoder.DecoderHandler]开始解码数据……
2019-01-141733DEBUG[org.wisdom.server.decoder.DecoderHandler]数据开头格式正确
2019-01-14 1733 DEBUG [org.wisdom.server.decoder.DecoderHandler]数据长度不够,数据协议len长度为:181,数据包实际可读内容为:90正在等待处理拆包……
2019-01-141736DEBUG[DEBUG][id:0x8e5811b3,L:/127.0.0.1:8777-R:/127.0.0.1:51138]RECEIVED:92B
+-------------------------------------------------+
|0123456789abcdef|
+--------+-------------------------------------------------+----------------+
|00000000|794d4445354c7a41784c7a4530494441|yMDE5LzAxLzE0IDA|
|00000010|314f6a41344f6a45304969776964576c|1OjA4OjE0IiwidWl|
|00000020|6b496a6f694f5745795a6a49354d6d4d|kIjoiOWEyZjI5MmM|
|00000030|744d6a4d354f4330305a6a6b774c5746|tMjM5OC00ZjkwLWF|
|00000040|6b5a5759745a6d466c4e44457a5a6a55|kZWYtZmFlNDEzZjU|
|00000050|354e324533496e303d227d63|5N2E3In0="}c|
+--------+-------------------------------------------------+----------------+
2019-01-141736DEBUG[org.wisdom.server.decoder.DecoderHandler]开始解码数据……
2019-01-141736DEBUG[org.wisdom.server.decoder.DecoderHandler]数据开头格式正确
2019-01-141736DEBUG[org.wisdom.server.decoder.DecoderHandler]数据解码成功
2019-01-14 1736 DEBUG [org.wisdom.server.business.BusinessHandler]解码后的字节码:{"className":"pojo.User","object":"eyJuYW1lIjoi5YaJ6bmP5bOwIiwiYWdlIjoyNCwiYmlydGhkYXkiOiIyMDE5LzAxLzE0IDA1OjA4OjE0IiwidWlkIjoiOWEyZjI5MmMtMjM5OC00ZjkwLWFkZWYtZmFlNDEzZjU5N2E3In0="}
2019-01-14 1736 INFO [org.wisdom.server.business.BusinessHandler]收到实体对象:User{name='冉鹏峰',age=24,UID='9a2f292c-2398-4f90-adef-fae413f597a7',birthday=MonJan140500CST2019}

在第一包数据,判断到bytebuf中的可读内容不够的时候,终止解码,并且从父类的callDecode中的while循环break出去,在父类的channelRead中等待下一包数据到来的时候将两包数据合并起来再次decode解码。

最后测试下同时出现拆包、粘包的场景

还是将TcpClient中的编码器EncoderHandler注释掉,然后在EchoHandler的ChannelActive方法:

publicvoidchannelActive(ChannelHandlerContextctx)throwsException{
Useruser=newUser();
user.setBirthday(newDate());
user.setUID(UUID.randomUUID().toString());
user.setName("冉鹏峰");
user.setAge(24);
DTObjectdtObject=newDTObject();
dtObject.setClassName(user.getClass().getName());
dtObject.setObject(ByteUtils.InstanceObjectMapper().writeValueAsBytes(user));
TcpProtocoltcpProtocol=newTcpProtocol();
byte[]objectBytes=ByteUtils.InstanceObjectMapper().writeValueAsBytes(dtObject);
tcpProtocol.setLen(objectBytes.length);
tcpProtocol.setData(objectBytes);
ByteBufbuffer=ctx.alloc().buffer();
buffer.writeByte(tcpProtocol.getHeader());
buffer.writeInt(tcpProtocol.getLen());
buffer.writeBytes(Arrays.copyOfRange(tcpProtocol.getData(),0,tcpProtocol.getLen()/2));//拆包,只发送一半的数据

ctx.write(buffer);
ctx.flush();
Thread.sleep(3000);
buffer=ctx.alloc().buffer();
buffer.writeBytes(Arrays.copyOfRange(tcpProtocol.getData(),tcpProtocol.getLen()/2,tcpProtocol.getLen()));//拆包发送剩余的一半数据
buffer.writeByte(tcpProtocol.getTail());
//模拟粘包的第二帧数据
buffer.writeByte(tcpProtocol.getHeader());
buffer.writeInt(tcpProtocol.getLen());
buffer.writeBytes(tcpProtocol.getData());
buffer.writeByte(tcpProtocol.getTail());
//模拟粘包的第三帧数据
buffer.writeByte(tcpProtocol.getHeader());
buffer.writeInt(tcpProtocol.getLen());
buffer.writeBytes(tcpProtocol.getData());
buffer.writeByte(tcpProtocol.getTail());
ctx.write(buffer);
ctx.flush();

}

最后直接查看服务端的输出结果:

2019-01-141725DEBUG[org.wisdom.server.decoder.DecoderHandler]开始解码数据……
2019-01-141725DEBUG[org.wisdom.server.decoder.DecoderHandler]数据开头格式正确
2019-01-14 1725 DEBUG [org.wisdom.server.decoder.DecoderHandler]数据长度不够,数据协议len长度为:181,数据包实际可读内容为:90正在等待处理拆包……
2019-01-141728DEBUG[DEBUG][id:0xc46234aa,L:/127.0.0.1:8777-R:/127.0.0.1:51466]RECEIVED:466B
2019-01-141728DEBUG[org.wisdom.server.decoder.DecoderHandler]开始解码数据……
2019-01-141728DEBUG[org.wisdom.server.decoder.DecoderHandler]数据开头格式正确
2019-01-141728DEBUG[org.wisdom.server.decoder.DecoderHandler]数据解码成功
2019-01-14 1728 DEBUG [org.wisdom.server.business.BusinessHandler]解码后的字节码:{"className":"pojo.User","object":"eyJuYW1lIjoi5YaJ6bmP5bOwIiwiYWdlIjoyNCwiYmlydGhkYXkiOiIyMDE5LzAxLzE0IDA1OjE5OjE0IiwidWlkIjoiODE2Zjg2ZDItNDBhMS00MDRkLTgwMWItZmY1NzgxMTJhNjFmIn0="}
2019-01-14 1728 INFO [org.wisdom.server.business.BusinessHandler]收到实体对象:User{name='冉鹏峰',age=24,UID='816f86d2-40a1-404d-801b-ff578112a61f',birthday=MonJan140500CST2019}
2019-01-141728DEBUG[org.wisdom.server.decoder.DecoderHandler]开始解码数据……
2019-01-141728DEBUG[org.wisdom.server.decoder.DecoderHandler]数据开头格式正确
2019-01-141728DEBUG[org.wisdom.server.decoder.DecoderHandler]数据解码成功
2019-01-14 1728 DEBUG [org.wisdom.server.business.BusinessHandler]解码后的字节码:{"className":"pojo.User","object":"eyJuYW1lIjoi5YaJ6bmP5bOwIiwiYWdlIjoyNCwiYmlydGhkYXkiOiIyMDE5LzAxLzE0IDA1OjE5OjE0IiwidWlkIjoiODE2Zjg2ZDItNDBhMS00MDRkLTgwMWItZmY1NzgxMTJhNjFmIn0="}
2019-01-14 1728 INFO [org.wisdom.server.business.BusinessHandler]收到实体对象:User{name='冉鹏峰',age=24,UID='816f86d2-40a1-404d-801b-ff578112a61f',birthday=MonJan140500CST2019}
2019-01-141728DEBUG[org.wisdom.server.decoder.DecoderHandler]开始解码数据……
2019-01-141728DEBUG[org.wisdom.server.decoder.DecoderHandler]数据开头格式正确
2019-01-141728DEBUG[org.wisdom.server.decoder.DecoderHandler]数据解码成功
2019-01-14 1728 DEBUG [org.wisdom.server.business.BusinessHandler]解码后的字节码:{"className":"pojo.User","object":"eyJuYW1lIjoi5YaJ6bmP5bOwIiwiYWdlIjoyNCwiYmlydGhkYXkiOiIyMDE5LzAxLzE0IDA1OjE5OjE0IiwidWlkIjoiODE2Zjg2ZDItNDBhMS00MDRkLTgwMWItZmY1NzgxMTJhNjFmIn0="}
2019-01-14 1728 INFO [org.wisdom.server.business.BusinessHandler]收到实体对象:User{name='冉鹏峰',age=24,UID='816f86d2-40a1-404d-801b-ff578112a61f',birthday=MonJan140500CST2019}

总结

对于拆包、粘包只要配合netty的设计原则去实现代码,就能愉快且轻松的解决了。本例虽然通过DTObject包装了数据,避免解码时每增加一种对象类型,就要新增一个if判断的尴尬。但是仍然无法处理传输List、Map时候的场景。

审核编辑:汤梓红

声明:本文内容及配图由入驻作者撰写或者入驻合作网站授权转载。文章观点仅代表作者本人,不代表电子发烧友网立场。文章及其配图仅供工程师学习之用,如有内容侵权或者其他违规问题,请联系本站处理。 举报投诉
  • 服务器
    +关注

    关注

    12

    文章

    8947

    浏览量

    85065
  • JAVA
    +关注

    关注

    19

    文章

    2952

    浏览量

    104470
  • TCP
    TCP
    +关注

    关注

    8

    文章

    1346

    浏览量

    78924

原文标题:实现一款高可用的 TCP 数据传输服务器(Java版)

文章出处:【微信号:AndroidPush,微信公众号:Android编程精选】欢迎添加关注!文章转载请注明出处。

收藏 人收藏

    评论

    相关推荐

    基于LABVIEW的TCP/IP实现两台电脑间的数据传输

    本帖最后由 eehome 于 2013-1-5 09:55 编辑 本人初学者,用LABVIEW实现TCP/IP间计算机数据传输,但是服务器端始终不能接收,用探针监视结果为未执行,
    发表于 05-08 18:05

    WIFI点对点的数据传输定需要服务器吗?

    我想做氧化碳的监控装置,浓度过高是会给我的手机报警,我也可以通过手机获取当前氧化碳数据,可是听别人说做点对点的数据传输还需要
    发表于 11-14 16:56

    请问GPRS如何跟服务器进行双向的网络数据传输

    请问GPRS如何跟服务器进行双向的网络数据传输的。我的意思就是GPRS如何发送数据服务器,那么GPRS又是如何接受到服务器传过来的指令呢?
    发表于 01-16 06:35

    请问怎么通过ESP8266WIFI模块将32的数据传输个云服务器

    想通过ESP8266WIFI模块将32的数据传输个云服务器上,最终实现微信可查看上传的数据求高人指点
    发表于 06-12 04:35

    如何去实现stm32f107vc lwip tcp客户端服务器数据传输

    怎么去建立LWIP客户端模式呢?如何去实现stm32f107vc lwip tcp客户端服务器数据传输呢?
    发表于 11-04 06:54

    如何通过ESP8266将STM32串口数据传输到MQTT服务器

    如何通过ESP8266将STM32串口数据传输到MQTT服务器
    发表于 12-13 06:19

    如何去实现COM口与TCP socket之间的数据传输

    数据传输的原理是什么?如何去实现COM口与TCP socket之间的数据传输呢?
    发表于 02-22 07:44

    nb模块与服务器的连接及数据传输方式

    (pcrf) 5. 应用服务器 IoT数据的最终聚集点,可根据客户需求执行数据处理和其他操作。 三、数据传输方式 为了将IoT数据发送到应用
    发表于 05-06 14:13

    tcp ip 数据传输

    tcp ip 数据传输 现有的许多具有串口管理功能的设备不能进行联网的管理和数据存取,我们可以利用先进的TCP/IP技术和管理方式对
    发表于 12-25 12:59 1072次阅读

    JAVA教程之TCP服务器

    JAVA教程之TCP服务器端,很好的JAVA的资料,快来学习吧
    发表于 04-11 17:28 10次下载

    一款使用GPRS进行无线数据传输的嵌入式终端

    模块简介 该DTU嵌入式模块是一款使用GPRS 进行 无线数据传输的嵌入式终端 ,体积小、接口简单,用户能方便的集成到自己的主板设备上去。支持PPP、TCP、UDP、ICMP等众多复杂网络协议和多
    的头像 发表于 04-08 16:45 8318次阅读

    tcp_ip 协议讲座:介绍数据传输

    介绍了tcp协议:数据传输的问题(交互式数据传输,批量数据传输,流量控制,拥塞避免)
    的头像 发表于 07-03 11:05 3413次阅读
    <b class='flag-5'>tcp</b>_ip 协议讲座:介绍<b class='flag-5'>数据传输</b>

    protues仿真串口数据上传至Web服务器(COM口 与 TCP socket之间数据传输)

    protues仿真串口数据上传至Web服务器(COM口 与 TCP socket之间数据传输)应用场景1.protues仿真
    发表于 12-29 18:56 5次下载
    protues仿真<b class='flag-5'>器</b>串口<b class='flag-5'>数据</b>上传至Web<b class='flag-5'>服务器</b>(COM口 与 <b class='flag-5'>TCP</b> socket之间<b class='flag-5'>数据传输</b>)

    工业控制领域基于TCP/IP的数据传输方案

    电子发烧友网站提供《工业控制领域基于TCP/IP的数据传输方案.pdf》资料免费下载
    发表于 11-16 10:52 0次下载
    工业控制领域基于<b class='flag-5'>TCP</b>/IP的<b class='flag-5'>数据传输</b>方案

    香港CPU服务器如何处理不同类型的数据传输

    香港CPU服务器处理不同类型的数据传输通常涉及以下几个方面: 1、网络配置:服务器需要有适当的网络配置,以支持不同类型的数据传输协议,如TCP
    的头像 发表于 05-21 17:23 313次阅读