0
  • 聊天消息
  • 系统消息
  • 评论与回复
登录后你可以
  • 下载海量资料
  • 学习在线课程
  • 观看技术视频
  • 写文章/发帖/加入社区
会员中心
创作中心

完善资料让更多小伙伴认识你,还能领取20积分哦,立即完善>

3天内不再提示

基于多路复用模型的Netty框架

科技绿洲 来源:Java技术指北 作者:Java技术指北 2023-09-30 11:30 次阅读

Netty

version: 4.1.55.Final

传统的IO模型的web容器,比如老版本的Tomcat,为了增加系统的吞吐量,需要不断增加系统核心线程数量,或者通过水平扩展服务器数量,来增加系统处理请求的能力。 有了NIO之后,一个线程即可处理多个连接事件,基于多路复用模型的Netty框架,不仅降低了使用NIO的复杂度,

优点

Netty是一款以java NIO为基础,基于事件驱动模型支持异步、高并发的网络应用框架

  • API使用简单,开发门槛低,简化了NIO开发网络程序的复杂度
  • 功能强大,预置多种编解码功能,支持多种主流协议,比如Http、WebSocket。
  • 定制能力强,可以通过ChannelHandler对通信框架灵活扩展。
  • 性能高,支持异步非阻塞通信模型
  • 成熟稳定,社区活跃,已经修复了Java NIO所有的Bug。
  • 经历了大规模商业应用的考验,质量有保证。

IO模型

select、poll和epoll

操作系统内核基于这些函数实现非阻塞IO,以此实现多路复用模型

  • select

select

  1. select 调用需要传入 fd 数组,需要拷贝一份到内核,高并发场景下这样的拷贝消耗的资源是惊人的。(可优化为不复制)
  2. select 在内核层仍然是通过遍历的方式检查文件描述符的就绪状态,是个同步过程,只不过无系统调用切换上下文的开销。(内核层可优化为异步事件通知)
  3. select 仅仅返回可读文件描述符的个数,具体哪个可读还是要用户自己遍历。(可优化为只返回给用户就绪的文件描述符,无需用户做无效的遍历)
  • pool

和 select 的主要区别就是,去掉了 select 只能监听 1024 个文件描述符的限制

  • epool

epool

  1. 内核中保存一份文件描述符集合,无需用户每次都重新传入,只需告诉内核修改的部分即可。
  2. 内核不再通过轮询的方式找到就绪的文件描述符,而是通过异步 IO 事件唤醒。
  3. 内核仅会将有 IO 事件的文件描述符返回给用户,用户也无需遍历整个文件描述符集合。

Reactor模型

一、单Reactor单线程 1)可以实现通过一个阻塞对象监听多个链接请求

2)Reactor对象通过select监听客户端请求事件,通过dispatch进行分发

3)如果是建立链接请求,则由Acceptor通过accept处理链接请求,然后创建一个Handler对象处理完成链接后的各种事件

4)如果不是链接请求,则由Reactor分发调用链接对应的Handler来处理

5)Handler会完成Read->业务处理->send的完整业务流程

reactor

二、单Reactor多线程 1)Reactor对象通过select监听客户端请求事件,收到事件后,通过dispatch分发

2)如果是建立链接请求,则由Acceptor通过accept处理链接请求,然后创建一个Handler对象处理完成链接后的各种事件

3)如果不是链接请求,则由Reactor分发调用链接对应的Handler来处理

4)Handler只负责事件响应不做具体业务处理

5)通过read读取数据后,分发到worker线程池处理,处理完成后返回给Handler,Handler收到后,通过send将结果返回给client

reactor

三、主从Reactor多线程 1)Reactor主线程MainReactor对象通过select监听链接事件,通过Acceptor处理

2)当Acceptor处理链接事件后,MainReactor将链接分配给SubReactor

3)SubReactor将链接加入到队列进行监听,并创建Handler进行事件处理

4)当有新事件发生时,SubReactor就会调用对应的Handler处理

5)Handler通过read读取数据,分发到worker线程池处理,处理完成后返回给Handler,Handler收到后,通过send将结果返回给client

6)Reactor主线程可以对应多个Reactor子线程

reactor

三种模式用生活案例来理解 1)单Reactor单线程,前台接待员和服务员是同一个人,全程为顾客服务

2)单Reactor多线程,1个前台接待员,多个服务员,接待员只负责接待

3)主从Reactor多线程,多个前台接待员,多个服务员

Reactor模型具有如下优点 1)响应快,不必为单个同步事件所阻塞,虽然Reactor本身依然是同步的

2)可以最大程度的避免复杂的多线程及同步问题,并且避免了多线程/进程的切换开销

3)扩展性好,可以方便的通过增加Reactor实例个数来充分利用CPU资源

4)复用性好,Reactor模型本身与具体事件处理逻辑无关,具有很高的复用性

核心组件

1.Bootstrap 一个Netty应用通常由一个Bootstrap开始,它主要作用是配置整个Netty程序,串联起各个组件。

Handler,为了支持各种协议和处理数据的方式,便诞生了Handler组件。Handler主要用来处理各种事件,这里的事件很广泛,比如可以是连接、数据接收、异常、数据转换等。

2.ChannelInboundHandler 一个最常用的Handler。这个Handler的作用就是处理接收到数据时的事件,也就是说,我们的业务逻辑一般就是写在这个Handler里面的,ChannelInboundHandler就是用来处理我们的核心业务逻辑。

3.ChannelInitializer 当一个链接建立时,我们需要知道怎么来接收或者发送数据,当然,我们有各种各样的Handler实现来处理它,那么ChannelInitializer便是用来配置这些Handler,它会提供一个ChannelPipeline,并把Handler加入到ChannelPipeline。

4.ChannelPipeline 一个Netty应用基于ChannelPipeline机制,这种机制需要依赖于EventLoop和EventLoopGroup,因为它们三个都和事件或者事件处理相关。

EventLoops的目的是为Channel处理IO操作,一个EventLoop可以为多个Channel服务。

EventLoopGroup会包含多个EventLoop。

5.Channel 代表了一个Socket链接,或者其它和IO操作相关的组件,它和EventLoop一起用来参与IO处理。

6.Future 在Netty中所有的IO操作都是异步的,因此,你不能立刻得知消息是否被正确处理,但是我们可以过一会等它执行完成或者直接注册一个监听,具体的实现就是通过Future和ChannelFutures,他们可以注册一个监听,当操作执行成功或失败时监听会自动触发。

示例

通过一个简单的示例,首先了解怎么基于netty开发一个通信程序,包括服务的与客户端:

Server:

@Slf4j
public class Server {

    private EventLoopGroup boosGroup;

    private EventLoopGroup workGroup;

    public Server(int port){
        try {
            init(port);
            log.info("----- 服务启动成功 -----");
        } catch (InterruptedException e) {
            log.error("启动服务出错:{}", e.getCause());
        }
    }

    private void init(int port) throws InterruptedException {
        // 处理连接
        this.boosGroup = new NioEventLoopGroup();
        // 处理业务
        this.workGroup = new NioEventLoopGroup();

        ServerBootstrap bootstrap = new ServerBootstrap();
        // 绑定
        bootstrap.group(boosGroup, workGroup)
                .channel(NioServerSocketChannel.class) //配置服务端
                .option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 3000)
                .option(ChannelOption.SO_BACKLOG, 1024)
                .childOption(ChannelOption.TCP_NODELAY, true)
                .childOption(ChannelOption.SO_RCVBUF, 1024)
                .childOption(ChannelOption.SO_SNDBUF, 1024)
                .childHandler(new ChannelInitializer< SocketChannel >() {
                    @Override
                    protected void initChannel(SocketChannel socketChannel) throws Exception {
                        socketChannel.pipeline().addLast(new ServerHandler());
                    }
                });

        ChannelFuture channelFuture = bootstrap.bind(port).sync();
        channelFuture.channel().closeFuture().sync();
    }

    public void close(){
        this.boosGroup.shutdownGracefully();
        this.workGroup.shutdownGracefully();
    }

}

@Slf4j
class ServerHandler extends ChannelInboundHandlerAdapter {
    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        log.info(" >> >> >> > server active");
    }

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        //1. 读取客户端的数据(缓存中去取并打印到控制台)
        ByteBuf buf = (ByteBuf) msg;
        byte[] request = new byte[buf.readableBytes()];
        buf.readBytes(request);
        String requestBody = new String(request, "utf-8");
        log.info(" >> >> >> >> > receive message: {}", requestBody);

        //2. 返回响应数据
        ctx.writeAndFlush(Unpooled.copiedBuffer((requestBody+" too").getBytes()));
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        super.exceptionCaught(ctx, cause);
    }
}

Client:

@Slf4j
public class Client {

    private EventLoopGroup workGroup;
    private ChannelFuture channelFuture;

    public Client(int port){
        init(port);
    }

    private void init(int port){
        this.workGroup = new NioEventLoopGroup();

        Bootstrap bootstrap = new Bootstrap();
        bootstrap.group(workGroup)
                .channel(NioSocketChannel.class)
                .option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 3000)
                .option(ChannelOption.SO_RCVBUF, 1024)
                .option(ChannelOption.SO_SNDBUF, 1024)
                .handler(new ChannelInitializer< SocketChannel >() {
                    @Override
                    protected void initChannel(SocketChannel socketChannel) throws Exception {
                        socketChannel.pipeline().addLast(new ClientHandler());
                    }
                });

        this.channelFuture = bootstrap.connect("127.0.0.1", port).syncUninterruptibly();
    }

    /**
     *
     * @param message
     */
    public void send(String message){
        this.channelFuture.channel().writeAndFlush(Unpooled.copiedBuffer(message.getBytes()));
    }

    /**
     *
     */
    public void close(){
        try {
            channelFuture.channel().closeFuture().sync();
        } catch (InterruptedException e) {
            throw new RuntimeException(e);
        }
        workGroup.shutdownGracefully();
    }
}

@Slf4j
class ClientHandler extends ChannelInboundHandlerAdapter {
    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        log.info(" >> >> >> > client active");
    }

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        try {
            ByteBuf buf = (ByteBuf) msg;
            byte[] req = new byte[buf.readableBytes()];
            buf.readBytes(req);

            String body = new String(req, "utf-8");
            log.info(" >> >> >> >> > receive message: {}", body);
        } finally {
            ReferenceCountUtil.release(msg);
        }
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        super.exceptionCaught(ctx, cause);
    }
}

测试:

public class StarterTests {

    static int port = 9011;

    @Test
    public void startServer(){
        Server server = new Server(9011);
    }

    @Test
    public void startClient(){
        Client client = new Client(port);
        client.send("Hello Netty!");
        while (true){}
    }

}

生态

  • Dubbo
  • Spring Reactive

类似技术

Mina、Netty、Grizzly

其他

Proactor非阻塞异步网络模型

声明:本文内容及配图由入驻作者撰写或者入驻合作网站授权转载。文章观点仅代表作者本人,不代表电子发烧友网立场。文章及其配图仅供工程师学习之用,如有内容侵权或者其他违规问题,请联系本站处理。 举报投诉
  • Web
    Web
    +关注

    关注

    2

    文章

    1261

    浏览量

    69435
  • 框架
    +关注

    关注

    0

    文章

    403

    浏览量

    17471
  • 容器
    +关注

    关注

    0

    文章

    495

    浏览量

    22060
  • 模型
    +关注

    关注

    1

    文章

    3218

    浏览量

    48801
收藏 人收藏

    评论

    相关推荐

    [6.4.1]--多路复用

    多路复用数字逻辑
    李开鸿
    发布于 :2022年11月13日 01:18:45

    多路复用ICSP引脚如何控制开关?

    您好,我有一个应用程序,我将使用PIC32MX575F512L。现在,我将有一个有限数量的电线,我可以取出框,将主机的电子。因此,我需要多路复用两条线:通常它们会产生一个串行端口(ttl),但是我
    发表于 04-01 08:19

    AD8183-EVAL是用于视频路由和多路复用系统的多路复用器评估板

    AD8183-EVAL,用于视频路由和多路复用系统的三路2:1模拟多路复用器评估板。 AD8183评估板经过精心布局和测试,以展示器件的指定高速性能
    发表于 06-17 12:40

    如何在Mx1051的FlexCAN1中配置简单信号多路复用和扩展信号多路复用

    我们正在研究 FlexCAN1 的 mxrt1051。我们是第一次在 FlexCAN 上工作,请协助以下几点: - 如何在 Mx1051 的 FlexCAN1 中配置简单信号多路复用和扩展信号
    发表于 05-05 11:05

    多路复用与数字复接

    多路复用与数字复接8.1 频分多路复用(FDM)原理8.2 时分多路复用(TDM)原理8.3 准同步数字体系(PDH) 8.4 同步数字体系(SDH)  
    发表于 10-22 13:26 0次下载

    多路复用技术

    2.3  多路复用技术2.3.1  频分多路复用2.3.2  时分多路复用2.3.3  波分多路复用2.3.4  码分
    发表于 06-27 21:46 0次下载

    基于CPLD的非多路复用多路复用总线转换桥的设计与实现

    基于CPLD的非多路复用多路复用总线转换桥的设计与实现 微处理器对外并行总线接口方式一般分为两种,一种为多路复用方式,数据与地址采用共用引脚,分时传输;另一
    发表于 03-28 15:08 849次阅读
    基于CPLD的非<b class='flag-5'>多路复用</b>与<b class='flag-5'>多路复用</b>总线转换桥的设计与实现

    多路复用多路复用总线转换桥的设计与实现

    多路复用多路复用总线转换桥的设计与实现 提出了一种新颖的非多路复用总线与多路复用总线的转换接口电路。以两种总线的典型代表芯片TMS
    发表于 03-28 15:14 952次阅读
    非<b class='flag-5'>多路复用</b>与<b class='flag-5'>多路复用</b>总线转换桥的设计与实现

    复用器的多路复用

    复用器的多路复用  多路复用
    发表于 01-07 14:27 1187次阅读

    频分多路复用(FDM),频分多路复用(FDM)是什么意思

    频分多路复用(FDM),频分多路复用(FDM)是什么意思 “复用”是一种将若干个彼此独立的信号,合并为一个可在同一信道上同时传输的复合信号的方法。
    发表于 03-19 14:00 7539次阅读

    时分多路复用(TDM),时分多路复用(TDM)的原理是什么?

    时分多路复用(TDM),时分多路复用(TDM)的原理是什么?  为了提高信道利用率,使多个信号沿同一信道传输而互相不干扰,称
    发表于 03-19 14:07 1w次阅读

    什么是异步时分多路复用(ATDM)

    什么是异步时分多路复用(ATDM) 异步时分多路复用技术 (ATDM,Asynchronism Time-Division Multiplexing)
    发表于 04-03 15:25 1900次阅读

    时分多路复用(TDM),时分多路复用(TDM)是什么意思

    时分多路复用(TDM),时分多路复用(TDM)是什么意思 这种方法是把传输信道按时间来分割,为每个用户指定一个时间间隔,每个间隔里传输信号
    发表于 04-03 15:28 5904次阅读

    如何改进开关/多路复用器LTspice模型

    如果我的模拟设计中包含开关和多路复用器,那么还能改进开关/多路复用器LTspice模型吗?
    的头像 发表于 03-01 13:34 3441次阅读
    如何改进开关/<b class='flag-5'>多路复用</b>器LTspice<b class='flag-5'>模型</b>

    频分多路复用和时分多路复用的区别有哪些

    频分多路复用(FDM)和时分多路复用(TDM)是两种主要的多路复用技术,它们在通信系统中扮演着至关重要的角色。
    的头像 发表于 05-07 15:24 2541次阅读