0
  • 聊天消息
  • 系统消息
  • 评论与回复
登录后你可以
  • 下载海量资料
  • 学习在线课程
  • 观看技术视频
  • 写文章/发帖/加入社区
会员中心
创作中心

完善资料让更多小伙伴认识你,还能领取20积分哦,立即完善>

3天内不再提示

netty推送消息接口及实现

jf_ro2CN3Fa 来源:芋道源码 作者:芋道源码 2022-11-02 16:14 次阅读

netty服务器

Netty配置

管道配置

自定义handler

推送消息接口及实现类

测试

学过 Netty 的都知道,Netty 对 NIO 进行了很好的封装,简单的 API,庞大的开源社区。深受广大程序员喜爱。基于此本文分享一下基础的 netty 使用。实战制作一个 Netty + websocket 的消息推送小栗子。

netty服务器

@Component
publicclassNettyServer{

staticfinalLoggerlog=LoggerFactory.getLogger(NettyServer.class);

/**
*端口号
*/
@Value("${webSocket.netty.port:8888}")
intport;

EventLoopGroupbossGroup;
EventLoopGroupworkGroup;

@Autowired
ProjectInitializernettyInitializer;

@PostConstruct
publicvoidstart()throwsInterruptedException{
newThread(()->{
bossGroup=newNioEventLoopGroup();
workGroup=newNioEventLoopGroup();
ServerBootstrapbootstrap=newServerBootstrap();
//bossGroup辅助客户端的tcp连接请求,workGroup负责与客户端之前的读写操作
bootstrap.group(bossGroup,workGroup);
//设置NIO类型的channel
bootstrap.channel(NioServerSocketChannel.class);
//设置监听端口
bootstrap.localAddress(newInetSocketAddress(port));
//设置管道
bootstrap.childHandler(nettyInitializer);

//配置完成,开始绑定server,通过调用sync同步方法阻塞直到绑定成功
ChannelFuturechannelFuture=null;
try{
channelFuture=bootstrap.bind().sync();
log.info("Serverstartedandlistenon:{}",channelFuture.channel().localAddress());
//对关闭通道进行监听
channelFuture.channel().closeFuture().sync();
}catch(InterruptedExceptione){
e.printStackTrace();
}
}).start();
}

/**
*释放资源
*/
@PreDestroy
publicvoiddestroy()throwsInterruptedException{
if(bossGroup!=null){
bossGroup.shutdownGracefully().sync();
}
if(workGroup!=null){
workGroup.shutdownGracefully().sync();
}
}
}

基于 Spring Boot + MyBatis Plus + Vue & Element 实现的后台管理系统 + 用户小程序,支持 RBAC 动态权限、多租户、数据权限、工作流、三方登录、支付、短信、商城等功能

项目地址:https://gitee.com/zhijiantianya/ruoyi-vue-pro

视频教程:https://doc.iocoder.cn/video/

Netty配置

管理全局Channel以及用户对应的channel(推送消息)

publicclassNettyConfig{

/**
*定义全局单利channel组管理所有channel
*/
privatestaticvolatileChannelGroupchannelGroup=null;

/**
*存放请求ID与channel的对应关系
*/
privatestaticvolatileConcurrentHashMapchannelMap=null;

/**
*定义两把锁
*/
privatestaticfinalObjectlock1=newObject();
privatestaticfinalObjectlock2=newObject();


publicstaticChannelGroupgetChannelGroup(){
if(null==channelGroup){
synchronized(lock1){
if(null==channelGroup){
channelGroup=newDefaultChannelGroup(GlobalEventExecutor.INSTANCE);
}
}
}
returnchannelGroup;
}

publicstaticConcurrentHashMapgetChannelMap(){
if(null==channelMap){
synchronized(lock2){
if(null==channelMap){
channelMap=newConcurrentHashMap<>();
}
}
}
returnchannelMap;
}

publicstaticChannelgetChannel(StringuserId){
if(null==channelMap){
returngetChannelMap().get(userId);
}
returnchannelMap.get(userId);
}
}

基于 Spring Cloud Alibaba + Gateway + Nacos + RocketMQ + Vue & Element 实现的后台管理系统 + 用户小程序,支持 RBAC 动态权限、多租户、数据权限、工作流、三方登录、支付、短信、商城等功能

项目地址:https://gitee.com/zhijiantianya/yudao-cloud

视频教程:https://doc.iocoder.cn/video/

管道配置

@Component
publicclassProjectInitializerextendsChannelInitializer{

/**
*webSocket协议名
*/
staticfinalStringWEBSOCKET_PROTOCOL="WebSocket";

/**
*webSocket路径
*/
@Value("${webSocket.netty.path:/webSocket}")
StringwebSocketPath;
@Autowired
WebSocketHandlerwebSocketHandler;

@Override
protectedvoidinitChannel(SocketChannelsocketChannel)throwsException{
//设置管道
ChannelPipelinepipeline=socketChannel.pipeline();
//流水线管理通道中的处理程序(Handler),用来处理业务
//webSocket协议本身是基于http协议的,所以这边也要使用http编解码器
pipeline.addLast(newHttpServerCodec());
pipeline.addLast(newObjectEncoder());
//以块的方式来写的处理器
pipeline.addLast(newChunkedWriteHandler());
pipeline.addLast(newHttpObjectAggregator(8192));
pipeline.addLast(newWebSocketServerProtocolHandler(webSocketPath,WEBSOCKET_PROTOCOL,true,65536*10));
//自定义的handler,处理业务逻辑
pipeline.addLast(webSocketHandler);
}
}

自定义handler

@Component
@ChannelHandler.Sharable
publicclassWebSocketHandlerextendsSimpleChannelInboundHandler{
privatestaticfinalLoggerlog=LoggerFactory.getLogger(NettyServer.class);

/**
*一旦连接,第一个被执行
*/
@Override
publicvoidhandlerAdded(ChannelHandlerContextctx)throwsException{
log.info("有新的客户端链接:[{}]",ctx.channel().id().asLongText());
//添加到channelGroup通道组
NettyConfig.getChannelGroup().add(ctx.channel());
}

/**
*读取数据
*/
@Override
protectedvoidchannelRead0(ChannelHandlerContextctx,TextWebSocketFramemsg)throwsException{
log.info("服务器收到消息:{}",msg.text());

//获取用户ID,关联channel
JSONObjectjsonObject=JSONUtil.parseObj(msg.text());
Stringuid=jsonObject.getStr("uid");
NettyConfig.getChannelMap().put(uid,ctx.channel());

//将用户ID作为自定义属性加入到channel中,方便随时channel中获取用户ID
AttributeKeykey=AttributeKey.valueOf("userId");
ctx.channel().attr(key).setIfAbsent(uid);

//回复消息
ctx.channel().writeAndFlush(newTextWebSocketFrame("服务器收到消息啦"));
}

@Override
publicvoidhandlerRemoved(ChannelHandlerContextctx)throwsException{
log.info("用户下线了:{}",ctx.channel().id().asLongText());
//删除通道
NettyConfig.getChannelGroup().remove(ctx.channel());
removeUserId(ctx);
}

@Override
publicvoidexceptionCaught(ChannelHandlerContextctx,Throwablecause)throwsException{
log.info("异常:{}",cause.getMessage());
//删除通道
NettyConfig.getChannelGroup().remove(ctx.channel());
removeUserId(ctx);
ctx.close();
}

/**
*删除用户与channel的对应关系
*/
privatevoidremoveUserId(ChannelHandlerContextctx){
AttributeKeykey=AttributeKey.valueOf("userId");
StringuserId=ctx.channel().attr(key).get();
NettyConfig.getChannelMap().remove(userId);
}
}

推送消息接口及实现类

publicinterfacePushMsgService{

/**
*推送给指定用户
*/
voidpushMsgToOne(StringuserId,Stringmsg);

/**
*推送给所有用户
*/
voidpushMsgToAll(Stringmsg);

}
@Service
publicclassPushMsgServiceImplimplementsPushMsgService{

@Override
publicvoidpushMsgToOne(StringuserId,Stringmsg){
Channelchannel=NettyConfig.getChannel(userId);
if(Objects.isNull(channel)){
thrownewRuntimeException("未连接socket服务器");
}

channel.writeAndFlush(newTextWebSocketFrame(msg));
}

@Override
publicvoidpushMsgToAll(Stringmsg){
NettyConfig.getChannelGroup().writeAndFlush(newTextWebSocketFrame(msg));
}
}

测试

7c253b2c-573d-11ed-a3b6-dac502259ad0.png

链接服务器

7c607976-573d-11ed-a3b6-dac502259ad0.png7c8095ee-573d-11ed-a3b6-dac502259ad0.png

发送消息

7cb7271c-573d-11ed-a3b6-dac502259ad0.png7dd7ed98-573d-11ed-a3b6-dac502259ad0.png

调用接口,往前端推送消息!

7e0179ba-573d-11ed-a3b6-dac502259ad0.png7e1e56de-573d-11ed-a3b6-dac502259ad0.png

OK!

一个简单的 netty 小栗子就完成了。

审核编辑:彭静
声明:本文内容及配图由入驻作者撰写或者入驻合作网站授权转载。文章观点仅代表作者本人,不代表电子发烧友网立场。文章及其配图仅供工程师学习之用,如有内容侵权或者其他违规问题,请联系本站处理。 举报投诉
  • 接口
    +关注

    关注

    33

    文章

    8580

    浏览量

    151046
  • 封装
    +关注

    关注

    126

    文章

    7881

    浏览量

    142907
  • 服务器
    +关注

    关注

    12

    文章

    9129

    浏览量

    85348

原文标题:Spring Boot+Netty+Websocket实现后台向前端推送信息

文章出处:【微信号:芋道源码,微信公众号:芋道源码】欢迎添加关注!文章转载请注明出处。

收藏 人收藏

    评论

    相关推荐

    基于多路复用模型的Netty框架

    Netty version: 4.1.55.Final 传统的IO模型的web容器,比如老版本的Tomcat,为了增加系统的吞吐量,需要不断增加系统核心线程数量,或者通过水平扩展服务器数量,来增加
    的头像 发表于 09-30 11:30 826次阅读

    基于阿里云移动推送的移动应用推送模式最佳实践

    推送系统自动分配,通过接口获取2.2 账号与deviceID一一对应,对于同一设备切换账号的场景,通过重新绑定账号实现2.3 别名一个deviceID可以对应多个别名别名是用户粒度的概念,建议用于单推
    发表于 03-02 11:48

    如何实现服务器自动推送消息?

    有个想法,想和大家探讨一下如何实现。功能:自己在本地写个日志,第二天自动推送到手机上想法:想法不成熟,因为知识面太少了,目前想的是自己在本地电脑写个日志,上传给服务器,服务器第二天定时推送到指定邮箱上。想知道如何
    发表于 03-16 11:34

    怎么去理解netty

    导读原创文章,转载请注明出处。本文源码地址:netty-source-code-analysis两篇开胃小菜过后,我已经有一些粉丝了,还有一些粉丝加了我的好友,有粉丝通过微信对我的文章表示了肯定
    发表于 08-31 06:42

    怎样使用springboot整合netty来开发一套高性能的通信系统呢

    怎样使用springboot整合netty来开发一套高性能的通信系统呢?为什么要用这两个框架来实现通信服务呢?如何去实现呢?
    发表于 02-22 06:09

    网络编程框架netty io介绍

    深入理解网络编程框架netty io欢迎大家下载学习
    发表于 09-28 07:36

    如何采用mqtt协议实现物联网模块消息推送

    如何采用mqtt协议实现物联网模块消息推送
    发表于 11-03 06:55

    单片机MQTT如何实现推送的简单使用

    本文档的主要内容详细介绍的是单片机MQTT如何实现推送的简单使用。
    发表于 07-19 17:37 9次下载
    单片机MQTT如何<b class='flag-5'>实现</b><b class='flag-5'>推送</b>的简单使用

    Springboot整合netty框架实现终端、通讯板子(单片机)TCP/UDP通信案例

    如何springboot和netty案例的源代码一个springboot整合netty框架的开发小案例,实现服务端与单片机终端实时通信的通讯架构案例。物联网通信给板子下发指令案例附带源码及整合流程步骤
    发表于 12-29 18:55 20次下载
    Springboot整合<b class='flag-5'>netty</b>框架<b class='flag-5'>实现</b>终端、通讯板子(单片机)TCP/UDP通信案例

    详解Netty高性能异步事件驱动的网络框架

    大家好,今天我们来聊聊Netty的那些事儿,我们都知道Netty是一个高性能异步事件驱动的网络框架。
    的头像 发表于 03-16 10:57 1843次阅读

    Netty如何实现消息推送

    Netty 是一个利用 Java 的高级网络的能力,隐藏其背后的复杂性而提供一个易于使用的 API 的客户端/服务器框架。
    的头像 发表于 08-30 09:42 1465次阅读

    Netty如何做到单机百万并发?

    说起 Netty 的异步模型,我相信大多数人,只要是写过服务端的话,都是耳熟能详的,bossGroup 和 workerGroup 被 ServerBootstrap 所驱动,用起来简直是如虎添翼。
    的头像 发表于 09-07 10:51 1067次阅读

    一步步解决长连接Netty服务内存泄漏

    线上应用长连接 Netty 服务出现内存泄漏了!真让人头大
    的头像 发表于 04-27 14:06 1125次阅读
    一步步解决长连接<b class='flag-5'>Netty</b>服务内存泄漏

    聊聊Netty那些事儿之从内核角度看IO模型

    从今天开始我们来聊聊Netty的那些事儿,我们都知道Netty是一个高性能异步事件驱动的网络框架。
    的头像 发表于 05-23 10:27 1409次阅读
    聊聊<b class='flag-5'>Netty</b>那些事儿之从内核角度看IO模型

    jdk17下netty导致堆内存疯涨原因排查

    天网风控灵玑系统是基于内存计算实现的高吞吐低延迟在线计算服务,提供滑动或滚动窗口内的 count、distinctCout、max、min、avg、sum、std 及区间分布类的在线统计计算服务
    的头像 发表于 09-12 11:22 798次阅读
    jdk17下<b class='flag-5'>netty</b>导致堆内存疯涨原因排查