项目背景
最近公司某物联网项目需要使用socket长连接进行消息通讯,捣鼓了一版代码上线,结果BUG不断,本猿寝食难安,于是求助度娘,数日未眠项目终于平稳运行了,本着开源共享的精神,本猿把项目代码提炼成了一个demo项目,尽量摒弃了其中丑陋的业务部分,希望与同学们共同学习进步。
基于 Spring Boot + MyBatis Plus + Vue & Element 实现的后台管理系统 + 用户小程序,支持 RBAC 动态权限、多租户、数据权限、工作流、三方登录、支付、短信、商城等功能
正文
一、项目架构
本项目使用了netty、redis以及springboot2.2.0
二、项目模块
本项目目录结构如下图:
netty-tcp-core是公共模块,主要是工具类。netty-tcp-server是netty服务端,服务端仅作测试使用,实际项目中我们只使用了客户端。netty-tcp-client是客户端,也是本文的重点。
三、业务流程
我们实际项目中使用RocketMQ作为消息队列,本项目由于是demo项目于是改为了BlockingQueue。数据流为:
生产者->消息队列->消费者(客户端)->tcp通道->服务端->tcp通道->客户端。
当消费者接收到某设备发送的消息后,将判断缓存中是否存在该设备与服务端的连接,如果存在并且通道活跃则使用该通道发送消息,如果不存在则创建通道并在通道激活后立即发送消息,当客户端收到来自服务端的消息时进行响应的业务处理。
四、代码详解
1.消息队列
由于本demo项目移除了消息中间件,于是需要自己创建一个本地队列模拟真实使用场景
packageorg.example.client; importorg.example.client.model.NettyMsgModel; importjava.util.concurrent.ArrayBlockingQueue; /** *本项目为演示使用本地队列实际生产中应该使用消息中间件代替(rocketmq或rabbitmq) * *@authorReWind00 *@date2023/2/1511:20 */ publicclassQueueHolder{ privatestaticfinalArrayBlockingQueuequeue=newArrayBlockingQueue<>(100); publicstaticArrayBlockingQueue get(){ returnqueue; } }
使用一个类保存队列的静态实例以便在任何类中都可以快速引用。接下来我们需要启动一个线程去监听队列中的消息,一但消息投递到队列中,我们就取出消息然后异步多线程处理该消息。
publicclassLoopThreadimplementsRunnable{ @Override publicvoidrun(){ for(inti=0;i< MAIN_THREAD_POOL_SIZE; i++) { executor.execute(() ->{ while(true){ //取走BlockingQueue里排在首位的对象,若BlockingQueue为空,阻断进入等待状态直到 try{ NettyMsgModelnettyMsgModel=QueueHolder.get().take(); messageProcessor.process(nettyMsgModel); }catch(InterruptedExceptione){ log.error(e.getMessage(),e); } } }); } } }
使用take方法会使该线程一直阻塞直到队列收到消息后进入下一次循环。
2.执行类
process方法来自于MessageProcessor类,该类为单例,但是会有多线程同时执行。
publicvoidprocess(NettyMsgModelnettyMsgModel){ Stringimei=nettyMsgModel.getImei(); try{ synchronized(this){//为避免收到同一台设备多条消息后重复创建客户端,必须加锁 if(redisCache.hasKey(NETTY_QUEUE_LOCK+imei)){//上一条消息处理中 log.info("imei={}消息处理中,重新入列",imei); //放回队列重新等待消费延迟x秒(实际项目中应该使用rocketmq或者rabbitmq实现延迟消费) newTimer().schedule(newTimerTask(){ @Override publicvoidrun(){ QueueHolder.get().offer(nettyMsgModel); } },2000); log.info("imei={}消息处理中,重新入列完成",imei); return; }else{ //如果没有在连接中的直接加锁 redisCache.setCacheObject(NETTY_QUEUE_LOCK+imei,"1",120,TimeUnit.SECONDS); } } //缓存中存在则发送消息 if(NettyClientHolder.get().containsKey(imei)){ NettyClientnettyClient=NettyClientHolder.get().get(imei); if(null!=nettyClient.getChannelFuture()&&nettyClient.getChannelFuture().channel().isActive()){//通道活跃直接发送消息 if(!nettyClient.getChannelFuture().channel().isWritable()){ log.warn("警告,通道不可写,imei={},channelId={}",nettyClient.getImei(), nettyClient.getChannelFuture().channel().id()); } nettyClient.send(nettyMsgModel.getMsg()); }else{ log.info("clientimei={},通道不活跃,主动关闭",nettyClient.getImei()); nettyClient.close(); //重新创建客户端发送 this.createClientAndSend(nettyMsgModel); } }else{//缓存中不存在则创建新的客户端 this.createClientAndSend(nettyMsgModel); } }catch(Exceptione){ log.error(e.getMessage(),e); }finally{ //执行完后解锁 redisCache.deleteObject(NETTY_QUEUE_LOCK+imei); } }
其中imei是我们设备的唯一标识,我们可以用imei作为缓存的key来确认是否已创建过连接。由于我们消息的并发量可能会很大,所以存在当某设备的连接正在创建的过程中,另一个线程收到该设备消息也开始创建连接的情况,所以我们使用synchronized 代码块以及redis分布式锁来避免此情况的发生。当一条消息获得锁后,在锁释放前,后续消息将会被重新放回消息队列并延迟消费。
获取锁的线程会根据imei判断缓存是否存在连接,如果存在直接发送消息,如果不存在则进入创建客户端的方法。
privatevoidcreateClientAndSend(NettyMsgModelnettyMsgModel){ log.info("创建客户端执行中imei={}",nettyMsgModel.getImei()); //此处的DemoClientHandler可以根据自己的业务定义 NettyClientnettyClient=SpringUtils.getBean(NettyClient.class,nettyMsgModel.getImei(),nettyMsgModel.getBizData(), this.createDefaultWorkGroup(this.workerThread),DemoClientHandler.class); executor.execute(nettyClient);//执行客户端初始化 try{ //利用锁等待客户端激活 synchronized(nettyClient){ longc1=System.currentTimeMillis(); nettyClient.wait(5000);//最多阻塞5秒5秒后客户端仍然未激活则自动解锁 longc2=System.currentTimeMillis(); log.info("创建客户端wait耗时={}ms",c2-c1); } if(null!=nettyClient.getChannelFuture()&&nettyClient.getChannelFuture().channel().isActive()){//连接成功 //存入缓存 NettyClientHolder.get().put(nettyMsgModel.getImei(),nettyClient); //客户端激活后发送消息 nettyClient.send(nettyMsgModel.getMsg()); }else{//连接失败 log.warn("客户端创建失败,imei={}",nettyMsgModel.getImei()); nettyClient.close(); //可以把消息重新入列处理 } }catch(Exceptione){ log.error("客户端初始化发送消息异常===>{}",e.getMessage(),e); } }
当netty客户端实例创建后使用线程池执行初始化,由于是异步执行,我们此时立刻发送消息很可能客户端还没有完成连接,因此必须加锁等待。进入synchronized 代码块,使用wait方法等待客户端激活后解锁,参数5000为自动解锁的毫秒数,意思是如果客户端出现异常情况迟迟未能连接成功并激活通道、解锁,则最多5000毫秒后该锁自动解开。
这参数在实际使用时可以视情况调整,在并发量很大的情况下,5秒的阻塞可能会导致线程池耗尽,或内存溢出。待客户端创建成功并激活后则立即发送消息。
3.客户端
packageorg.example.client; importio.netty.bootstrap.Bootstrap; importio.netty.buffer.Unpooled; importio.netty.channel.*; importio.netty.channel.socket.SocketChannel; importio.netty.channel.socket.nio.NioSocketChannel; importio.netty.handler.codec.DelimiterBasedFrameDecoder; importio.netty.handler.codec.string.StringDecoder; importio.netty.handler.codec.string.StringEncoder; importio.netty.handler.timeout.IdleStateHandler; importio.netty.util.CharsetUtil; importlombok.Getter; importlombok.NoArgsConstructor; importlombok.extern.slf4j.Slf4j; importorg.example.client.handler.BaseClientHandler; importorg.example.core.util.SpringUtils; importorg.springframework.beans.factory.annotation.Value; importorg.springframework.context.annotation.Scope; importorg.springframework.stereotype.Component; importorg.springframework.util.StringUtils; importjava.util.Map; importjava.util.concurrent.TimeUnit; importjava.util.concurrent.atomic.AtomicBoolean; importjava.util.concurrent.atomic.AtomicInteger; /** *@authorReWind00 *@date2023/2/159:59 */ @Slf4j @Component @Scope("prototype") @Getter @NoArgsConstructor publicclassNettyClientimplementsRunnable{ @Value("${netty.server.port}") privateintport; @Value("${netty.server.host}") privateStringhost; //客户端唯一标识 privateStringimei; //自定义业务数据 privateMapbizData; privateEventLoopGroupworkGroup; privateClass clientHandlerClass; privateChannelFuturechannelFuture; publicNettyClient(Stringimei,Map bizData,EventLoopGroupworkGroup,Class clientHandlerClass){ this.imei=imei; this.bizData=bizData; this.workGroup=workGroup; this.clientHandlerClass=clientHandlerClass; } @Override publicvoidrun(){ try{ this.init(); log.info("客户端启动imei={}",imei); }catch(Exceptione){ log.error("客户端启动失败:{}",e.getMessage(),e); } } publicvoidclose(){ if(null!=this.channelFuture){ this.channelFuture.channel().close(); } NettyClientHolder.get().remove(this.imei); } publicvoidsend(Stringmessage){ try{ if(!this.channelFuture.channel().isActive()){ log.info("通道不活跃imei={}",this.imei); return; } if(!StringUtils.isEmpty(message)){ log.info("队列消息发送===>{}",message); this.channelFuture.channel().writeAndFlush(message); } }catch(Exceptione){ log.error(e.getMessage(),e); } } privatevoidinit()throwsException{ //将本实例传递到handler BaseClientHandlerclientHandler=SpringUtils.getBean(clientHandlerClass,this); Bootstrapb=newBootstrap(); //2通过辅助类去构造server/client b.group(workGroup) .channel(NioSocketChannel.class) .option(ChannelOption.CONNECT_TIMEOUT_MILLIS,3000) .option(ChannelOption.SO_RCVBUF,1024*32) .option(ChannelOption.SO_SNDBUF,1024*32) .handler(newChannelInitializer (){ @Override protectedvoidinitChannel(SocketChannelch)throwsException{ ch.pipeline().addLast(newDelimiterBasedFrameDecoder(1024*1024,Unpooled.copiedBuffer(" ".getBytes()))); ch.pipeline().addLast(newStringEncoder(CharsetUtil.UTF_8));//String解码。 ch.pipeline().addLast(newStringDecoder(CharsetUtil.UTF_8));//String解码。 ////心跳设置 ch.pipeline().addLast(newIdleStateHandler(0,0,600,TimeUnit.SECONDS)); ch.pipeline().addLast(clientHandler); } }); this.connect(b); } privatevoidconnect(Bootstrapb)throwsInterruptedException{ longc1=System.currentTimeMillis(); finalintmaxRetries=2;//重连2次 finalAtomicIntegercount=newAtomicInteger(); finalAtomicBooleanflag=newAtomicBoolean(false); try{ this.channelFuture=b.connect(host,port).addListener( newChannelFutureListener(){ publicvoidoperationComplete(ChannelFuturefuture)throwsException{ if(!future.isSuccess()){ if(count.incrementAndGet()>maxRetries){ log.warn("imei={}重连超过{}次",imei,maxRetries); }else{ log.info("imei={}重连第{}次",imei,count); b.connect(host,port).addListener(this); } }else{ log.info("imei={}连接成功,连接IP:{}连接端口:{}",imei,host,port); flag.set(true); } } }).sync();//同步连接 }catch(Exceptione){ log.error(e.getMessage(),e); } log.info("设备imei={},channelId={}连接耗时={}ms",imei,channelFuture.channel().id(),System.currentTimeMillis()-c1); if(flag.get()){ channelFuture.channel().closeFuture().sync();//连接成功后将持续阻塞该线程 } } }
netty客户端为多实例,每个实例绑定一个线程,持续阻塞到客户端关闭为止,每个客户端中可以保存自己的业务数据,以便在后续与服务端交互时处理业务使用。客户端执行连接时,给了2次重试的机会,如果3次都没连接成功则放弃。后续可以选择将该消息重新入列消费。我们实际项目中,此处还应该预先给服务端发送一条登录消息,待服务端确认后才能执行后续通讯,这需要视实际情况进行调整。
另一个需要注意的点是EventLoopGroup是从构造函数传入的,而不是在客户端中创建的,因为当客户端数量非常多时,每个客户端都创建自己的线程组会极大的消耗服务器资源,因此我们在实际使用中是按业务去创建统一的线程组给该业务下的所有客户端共同使用的,线程组的大小需要根据业务需求灵活配置。
在init方法中,我们给客户端加上了一个handler来处理与服务端的交互,下面来看一下具体实现。
packageorg.example.client.handler; importio.netty.channel.ChannelHandlerContext; importio.netty.handler.timeout.IdleState; importio.netty.handler.timeout.IdleStateEvent; importlombok.extern.slf4j.Slf4j; importorg.example.client.NettyClient; importorg.springframework.context.annotation.Scope; importorg.springframework.stereotype.Component; importjava.util.Map; /** *@authorReWind00 *@date2023/2/1510:09 */ @Slf4j @Component @Scope("prototype") publicclassDemoClientHandlerextendsBaseClientHandler{ privatefinalStringimei; privatefinalMapbizData; privatefinalNettyClientnettyClient; privateintallIdleCounter=0; privatestaticfinalintMAX_IDLE_TIMES=3; publicDemoClientHandler(NettyClientnettyClient){ this.nettyClient=nettyClient; this.imei=nettyClient.getImei(); this.bizData=nettyClient.getBizData(); } @Override publicvoidchannelActive(ChannelHandlerContextctx)throwsException{ log.info("客户端imei={},通道激活成功",this.imei); synchronized(this.nettyClient){//当通道激活后解锁队列线程,然后再发送消息 this.nettyClient.notify(); } } @Override publicvoidchannelInactive(ChannelHandlerContextctx)throwsException{ log.warn("客户端imei={},通道断开连接",this.imei); } @Override publicvoidchannelRead(ChannelHandlerContextctx,Objectmsg)throwsException{ log.info("客户端imei={},收到消息:{}",this.imei,msg); //处理业务... if("shutdown".equals(msg)){ this.nettyClient.close(); } } @Override publicvoiduserEventTriggered(ChannelHandlerContextctx,Objectevt)throwsException{ if(evtinstanceofIdleStateEvent){ IdleStateEvente=(IdleStateEvent)evt; booleanflag=false; if(e.state()==IdleState.ALL_IDLE){ this.allIdleCounter++; log.info("客户端imei={}触发闲读或写第{}次",this.imei,this.allIdleCounter); if(this.allIdleCounter>=MAX_IDLE_TIMES){ flag=true; } } if(flag){ log.warn("读写超时达到{}次,主动断开连接",MAX_IDLE_TIMES); ctx.channel().close(); } } } @Override publicvoidexceptionCaught(ChannelHandlerContextctx,Throwablecause)throwsException{ log.error("客户端imei={},连接异常{}",imei,cause.getMessage(),cause); } }
DemoClientHandler也是多实例bean,每个实例持有自己的NettyClient引用,以便在后续处理具体业务。在channelActive方法中,我们可以看到执行了客户端实例的notify方法,此处就是在客户端创建成功并且通道激活后解除wait锁的地方。channelRead方法就是我们处理服务端发送过来的消息的方法,我们的具体业务应该在该方法执行,当然不建议长时间阻塞客户端的工作线程,可以考虑异步处理。
最后我们看一下客户端缓存类。
packageorg.example.client; importjava.util.concurrent.ConcurrentHashMap; /** *@authorReWind00 *@date2023/2/1511:01 */ publicclassNettyClientHolder{ privatestaticfinalConcurrentHashMapclientMap=newConcurrentHashMap<>(); publicstaticConcurrentHashMap get(){ returnclientMap; } }
由于netty的通道无法序列化,因此不能存入redis,只能缓存在本地内存中,其本质就是一个ConcurrentHashMap。
五、测试
packageorg.example.client.controller; importorg.example.client.QueueHolder; importorg.example.client.model.NettyMsgModel; importorg.springframework.web.bind.annotation.GetMapping; importorg.springframework.web.bind.annotation.RequestMapping; importorg.springframework.web.bind.annotation.RequestParam; importorg.springframework.web.bind.annotation.RestController; /** *@authorReWind00 *@date2023/2/1513:48 */ @RestController @RequestMapping("/demo") publicclassDemoController{ /** *间隔发送两条消息 */ @GetMapping("testOne") publicvoidtestOne(){ QueueHolder.get().offer(NettyMsgModel.create("87654321","HelloWorld!")); try{ Thread.sleep(5000); }catch(InterruptedExceptione){ e.printStackTrace(); } QueueHolder.get().offer(NettyMsgModel.create("87654321","HelloWorldToo!")); } /** *任意发送消息 * *@paramimei *@parammsg */ @GetMapping("testTwo") publicvoidtestTwo(@RequestParamStringimei,@RequestParamStringmsg){ QueueHolder.get().offer(NettyMsgModel.create(imei,msg)); } /** *连续发送两条消息第二条由于redis锁将会重新放回队列延迟消费 */ @GetMapping("testThree") publicvoidtestThree(){ QueueHolder.get().offer(NettyMsgModel.create("12345678","HelloWorld!")); QueueHolder.get().offer(NettyMsgModel.create("12345678","HelloWorldToo!")); } }
测试接口代码如上,调用testOne,日志如下:
可以看到第一条消息触发了客户端创建流程,创建后发送了消息,而5秒后的第二条消息直接通过已有通道发送了。
测试接口代码如上,调用testTwo,日志如下:
发送shutdown可以主动断开已有连接。
测试接口代码如上,调用testThree,日志如下:
可以看到第二条消息重新入列并被延迟消费了。
六、源码
https://gitee.com/jaster/netty-tcp-demo
基于 Spring Cloud Alibaba + Gateway + Nacos + RocketMQ + Vue & Element 实现的后台管理系统 + 用户小程序,支持 RBAC 动态权限、多租户、数据权限、工作流、三方登录、支付、短信、商城等功能
后记
本demo项目仅作学习交流使用,如果要应用到生产环境还有些许不足,有问题的同学可以留言交流。
-
通讯
+关注
关注
9文章
906浏览量
34944 -
TCP
+关注
关注
8文章
1370浏览量
79126 -
开源
+关注
关注
3文章
3365浏览量
42551 -
spring
+关注
关注
0文章
340浏览量
14353 -
服务端
+关注
关注
0文章
66浏览量
7021 -
SpringBoot
+关注
关注
0文章
173浏览量
183
原文标题:使用 Netty+SpringBoot 打造的 TCP 长连接通讯方案
文章出处:【微信号:芋道源码,微信公众号:芋道源码】欢迎添加关注!文章转载请注明出处。
发布评论请先 登录
相关推荐
评论