0
  • 聊天消息
  • 系统消息
  • 评论与回复
登录后你可以
  • 下载海量资料
  • 学习在线课程
  • 观看技术视频
  • 写文章/发帖/加入社区
会员中心
创作中心

完善资料让更多小伙伴认识你,还能领取20积分哦,立即完善>

3天内不再提示

一个注解实现WebSocket集群的方案

jf_ro2CN3Fa 来源:CSDN 2023-04-10 11:37 次阅读

介绍

WebSocket大家应该是再熟悉不过了,如果是单体应用确实不会有什么问题,但是当我们的项目使用微服务架构时,就可能会存在问题

比如服务A有两个实例A1和A2,前端的WebSocket客户端C通过网关的负载均衡连到了A1,这个时候当A2触发消息发送的逻辑,需要将某个消息发送给所有的客户端时,C就接受不到消息

这个时候我们很快就能想到一种最简单的解决方案,就是把A2的消息转发给A1,A1再把消息发送给C,这样C就能收到A2发送的消息了

81f46d56-d6c8-11ed-bfe3-dac502259ad0.png

基于这个思路,我实现了一个库,一个配置注解搞定一切

用法

接下来让我们看看这个库的用法

首先我们需要在启动类上添加一个注解@EnableWebSocketLoadBalanceConcept

@EnableWebSocketLoadBalanceConcept
@EnableDiscoveryClient
@SpringBootApplication
publicclassAServiceApplication{

publicstaticvoidmain(String[]args){
SpringApplication.run(AServiceApplication.class,args);
}
}

接着我们在需要发送消息的地方注入WebSocketLoadBalanceConcept就可以愉快的跨实例发消息啦

@RestController
@RequestMapping("/ws")
publicclassWsController{

@Autowired
privateWebSocketLoadBalanceConceptconcept;

@RequestMapping("/send")
publicvoidsend(@RequestParamStringmsg){
concept.send(msg);
}
}

是不是很简单,有没有觉得比自己集成单体应用的WebSocket还要简单!

当你的同事还在头疼要实现手动转发时你已经通过一个配置注解实现了功能并开始泡茶喝

你的同事肯定对你刮目相看啊(又能开始摸鱼了)

不知道大家看了之后是不是对具体实现已经有了一些思路呢

接下来我就来讲讲这个库的实现流程

抽象思路

其实我之前有专门针对WebSocket实现过类似功能的模块,只是当时的一些场景都是基于项目定死的,所以相对来说实现比较简单,但是过于定制化不好扩展

有一天在和我的一个前同事聊天的过程中得知,他们在考虑让设备和服务直连,并且服务要部署成多实例

设备和服务直连无非就是通过TCP这种长连接来实现,可以使用缓存来保存连接和服务地址的映射关系来实现点对点转发的功能需求

听到这里,是不是感觉似曾相识?当时就有一道光穿过我的脑瓜子,真相只有一个!这不就和WebSocket在集群模式下的问题一样么

于是我从原来针对WebSocket的思考,变成了对各种长连接的思考,最终我将这个问题抽象成了:长连接的集群方案

而不管是WebSocket还是TCP都是长连接的一种具体实现

所以我们可以抽象一个顶级接口Connection,然后实现WebSocketConnection或者是TCPConnection

其实从抽象的角度来说不仅仅是长连接,短连接也在我们的抽象范围之内,只不过类似HTTP等协议并不存在上述的问题,但是并不妨碍你实现一个HTTPConnection用于转发消息,所以大家不要被先入为主的思维束缚住了

转发思路

之前讲到,这个库的主要思路就是将消息转发给其他的服务实例来达到一个单播或广播的效果

所以消息转发的设计就非常重要了

首先消息转发需要凭借一些支持数据交互的技术手段

比如HTTP,MQ,TCP,WebSocket

说到这里。。。大家是不是。。。你TM原来自己就能搞定啊(掀桌)

长连接不就是用来交互数据的吗,所以完全可以自给自足啊

于是就有一个精妙的想法在我脑子里形成:

如果每个服务实例都把自己作为一个客户端,连接到其他服务上呢?

WebSocket的场景下,我们将当前服务实例作为一个WebSocket客户端去连接其他服务实例的WebSocket服务端

TCP的场景下,我们将当前服务实例作为一个TCP的客户端去连接其他服务实例的TCP服务端

这样其他服务实例就可以把消息发到这些伪装的客户端上,当服务实例上伪装的客户端接收到消息之后就可以再转发给自己管理的真正的客户端

撒花家人们,自闭(自我闭环)了属于是

所以我们首先需要先让服务实例之间相互连接上

连接流程

让我们来看看互相建立连接是怎么设计的

82155278-d6c8-11ed-bfe3-dac502259ad0.png

我定义了一个ConnectionSubscriber的接口,大家可以理解为我们的服务实例要去订阅监听其他服务发送的消息

同时提供了默认实现,就是基于自身的协议进行连接和消息的发送

当然也能够灵活的支持其他方式,只需要自定义一个ConnectionSubscriber就可以了,如果使用MQ的方式就可以实现一个MQConnectionSubscriber或者使用HTTP就可以实现一个HTTPConnectionSubscriber

只不过使用自身的协议就可以不用依赖其他的库或是中间件了,当然如果你对消息的丢失率有比较严格的要求也可以使用MQ作为消息转发的中介,而以我之前参与过的项目来说,一般普通的WebSocket场景基本上还是能忍受一定的丢失率的

获取服务实例信息

那么我们怎么知道要去连接哪些实例呢

我定义了一个ConnectionServerManager的接口用来管理服务信息

当然我们完全可以自己实现一个,比如通过配置文件来配置服务实例信息

不过我们有更方便的方式,那就是依赖Spring Cloud的服务发现组件了,不管是Eureka还是Nacos还是其他的注册中心相当于都支持了,这就是抽象的魅力啊

我们可以通过DiscoveryClient#getInstances(Registration.getServiceId())来获得所有的实例,排除掉自身就是需要连接的服务实例了

当我们的服务实例连接上其他的服务实例之后,发送一个自身实例信息的消息过去,其他的服务实例接收到对应的消息之后反过来连接我们的服务实例,保证一定的连接及时性,这样双方的连接就搭建起来了,可以互相转发消息了

同时我还添加了心跳检测和自动重连,当一段时间没有收到心跳回复后就会断开连接,并且每隔一段时间就会重新查询一遍实例信息,如果发现存在某个服务实例没有对应的连接,就会重新进行连接,这样就能在某些偶尔网络不好的情况下有一定的容错

到目前为止,我们基本的框架已经建立了,当我们启动服务之后,服务间就会自动建立连接

连接区分和管理

基于上述的思路,我们肯定需要区分真实的客户端和用来转发的客户端

于是我就把这些连接做了一个分类

类别 说明
Client 普通的连接
Subscriber 服务实例伪装的连接,用于接受需要转发的消息
Observable 服务实例伪装的连接,用于发送需要转发的消息

然后对于这些连接进行一个统一的管理

826dec12-d6c8-11ed-bfe3-dac502259ad0.png

通过连接工厂ConnectionFactory我们可以将任意的连接适配成Connection对象,并实现各种连接间的消息转发

每个连接都会配置一个MessageEncoder和MessageDecoder用于消息的编码和解码,而且不同类别的连接对应的编码器和解码器肯定是不一样的,比如转发的消息和发给真实客户端的消息很大程度上都是有区别的,所以额外定义了一个MessageCodecAdapter用来适配不同类型的编解码器,也能让大家在自定义时方便管理

消息发送

现在当我们发送某条消息之后,消息就会被转发到其他的服务实例,所有的客户端就都能收到了

不对啊,在有些情况下我们不想让所有客户端都收到啊,能不能我们想让谁收到就让谁收到啊

真麻烦,来,我把所有的连接都给你,你自己选吧

连接选择

我们需要在消息发送时确定发送给哪些连接

82cd30be-d6c8-11ed-bfe3-dac502259ad0.png

于是我就定义了一个连接选择器ConnectionSelector

每次要发送消息的时候,我都会匹配一个连接选择器,然后通过选择器来获得需要发送消息的连接,而我们可以通过自定义连接选择器来实现我们消息的精准发送

这里其实就是我为什么会取名WebSocketLoadBalanceConcept的原因,为什么要叫LoadBalance呢

Ribbon通过IRule来选择一个Server

我通过ConnectionSelector来选择一个Connection集合

是不是有异曲同工之妙

继续来说自定义选择器

准备工作:

我们的Connection有一个metadata字段用于存放自定义属性

我们的Message有一个headers字段用于存放消息头

给指定用户发送消息

很多场景下我们需要给指定的用户发送消息

首先当客户端连接上来时,可以通过参数或者主动发送一个消息将userId发给服务端,然后服务端将得到的userId存在Connection的metadata中

接着我们给需要发送的Message添加一个header,将对应的userId作为消息头

这样我们就可以自定义一个连接选择器通过判断Message是否包含userId消息头来作为匹配的条件,当Message的headers中存在userId时,对Connection中的metadata进行userId的匹配来筛选需要发送消息的连接

由于userId是唯一的,当我们自身服务连上来的客户端中已经匹配到就不需要再转发了,如果没有匹配到就通过其他服务实例的客户端进行消息转发

库中已经实现了对应的UserSelector和UserMessage,可以使用配置开启并通过在连接路径上添加userId参数来标记用户

当然我们也可以借用缓存来精确的判断需不需要转发或者是需要转发给哪几个服务,把userId和服务的instanceId等一些具有唯一性的数据缓存在Redis中,当给用户发送消息时,从Redis中获得用户对应的服务实例的instanceId或是具有唯一性的数据,如果经过匹配就是当前服务就可以直接下发,如果是其他服务就转发给那个对应的服务就行了

给指定路径发送消息

还有一种场景也比较常见就是类似主题订阅,如订阅设备状态更新的数据,就要给每一个对应路径的连接发送消息了

我们可以使用不同的路径来表示不同主题,然后自定义一个连接选择器来匹配连接的路径和消息头中指定的路径

当然库中也已经实现了对应的PathSelector和PathMessage,可以通过配置开启

结束

最后请允许我发表一点对于抽象的拙见

抽象其实就和 “道生一,一生二,二生三,三生万物” 一样,根据你的顶级接口(也就是核心功能)不断的向外展开,你的顶级接口就是道(狭义的来讲)

以这个库为例,ConnectionLoadBalanceConcept就是这个库的道,他的核心功能就是发送消息,至于怎么发,发给谁,不确定,像是一个混沌的状态

那么什么是一,二,三呢,我们发送消息需要载体于是就有了Connection和Message,我们需要对Connection进行管理于是就有了ConnectionRepository,我们需要转发消息于是就有了ConnectionSubscriber等等

而万物就像是具体的实现,是能落实的,基于Spring Cloud服务发现的连接管理器DiscoveryConnectionServerManager,基于路径的连接选择器PathSelector,基于Reactive的WebSocket连接ReactiveWebSocketConnection

就像是你创造的世界,不断的衍生出各种各样的规则,这些规则相辅相成,让你的世界平稳的运行。






审核编辑:刘清

声明:本文内容及配图由入驻作者撰写或者入驻合作网站授权转载。文章观点仅代表作者本人,不代表电子发烧友网立场。文章及其配图仅供工程师学习之用,如有内容侵权或者其他违规问题,请联系本站处理。 举报投诉
  • HTTP协议
    +关注

    关注

    0

    文章

    66

    浏览量

    9823
  • TCP通信
    +关注

    关注

    0

    文章

    146

    浏览量

    4298
  • WebSocket
    +关注

    关注

    0

    文章

    29

    浏览量

    3819

原文标题:一个注解实现 WebSocket 集群方案,这样玩才爽!

文章出处:【微信号:芋道源码,微信公众号:芋道源码】欢迎添加关注!文章转载请注明出处。

收藏 人收藏

    评论

    相关推荐

    hyper-v 集群,hyper-v集群的共享存储

    面对日益庞大的业务体系,如同面对广袤的疆土,批量管理就像是睿智的统治者,实现对每寸土地的有效掌控。今天小编要讲解hyper-v集群的共享存储。    配置Hyper-V集群的共享存储
    的头像 发表于 02-08 11:25 126次阅读
    hyper-v <b class='flag-5'>集群</b>,hyper-v<b class='flag-5'>集群</b>的共享存储

    hyper-v 集群,Hyper-V 集群:搭建Hyper-V集群的步骤

    ”。今天就为大家介绍Hyper-V集群:搭建Hyper-V集群的步骤。    在当今的IT环境中,虚拟化技术已经成为提高资源利用率和实现高可用性的关键手段之。Hyper-V作为微软提
    的头像 发表于 02-07 09:47 116次阅读
    hyper-v <b class='flag-5'>集群</b>,Hyper-V <b class='flag-5'>集群</b>:搭建Hyper-V<b class='flag-5'>集群</b>的步骤

    EE-98:使用外部总线仲裁将两以上的ADSP-21065L组合到多处理集群

    电子发烧友网站提供《EE-98:使用外部总线仲裁将两以上的ADSP-21065L组合到多处理集群中.pdf》资料免费下载
    发表于 01-05 09:38 0次下载
    EE-98:使用外部总线仲裁将两<b class='flag-5'>个</b>以上的ADSP-21065L组合到<b class='flag-5'>一</b><b class='flag-5'>个</b>多处理<b class='flag-5'>集群</b>中

    国产智算集群黑马!曦源号SADA算力集群综合评测表现优异

    稳定性、线性度、模型支持度等多个维度均表现优异。加佳科技长期深耕国产替代数字科技的技术研发、平台运营与解决方案提供。旗下曦源号SADA万卡集群通过构建开放、标准、
    的头像 发表于 12-25 11:16 429次阅读
    国产智算<b class='flag-5'>集群</b>黑马!曦源<b class='flag-5'>一</b>号SADA算力<b class='flag-5'>集群</b>综合评测表现优异

    韩国无晶圆厂初创公司Panmnesia展示第一个支持CXL的AI集群

    在2024 OCP全球峰会上,开发CXL交换机SoC和CXL IP的韩国无晶圆厂初创公司Panmnesia展示了第一个支持CXL的AI集群,该集群采用CXL 3.1交换机。 OCP全球峰会由世界上
    的头像 发表于 11-28 11:04 504次阅读

    socket 和 WebSocket 的区别

    在现代网络通信中,Socket和WebSocket是两种常见的通信协议。它们在实现网络通信、数据传输等方面发挥着重要作用。然而,它们之间存在些关键的区别。 1. Socket(套接字) 1.1
    的头像 发表于 11-12 14:33 668次阅读

    socket与WebSocket的区别与联系

    ) : Socket是种通信端点,它在网络编程中用于实现不同主机之间的通信。Socket可以是TCP套接字或UDP套接字,分别对应于TCP(传输控制协议)和UDP(用户数据报协议)。 TCP套接字提供了可靠的、面向连接的通信服务,而UDP套接字则提供了不可靠的、无连接的
    的头像 发表于 11-04 09:19 619次阅读

    不可错过的Air780E之WebSocket应用示范!小白篇

    咋们今天说的Air780E之WebSocket应用示范针对小白绝对是不可以错过的示例。
    的头像 发表于 11-03 20:16 950次阅读
    不可错过的Air780E之<b class='flag-5'>WebSocket</b>应用示范!小白篇

    在多FPGA集群实现高级并行编程

    今天我们看的这篇论文介绍了在多FPGA集群实现高级并行编程的研究,其主要目标是为非FPGA专家提供成熟且易于使用的环境,以便在多个并行运行的设备上扩展高性能计算(HPC)应用。
    的头像 发表于 07-24 14:54 1444次阅读

    websocket.c RTOS演示中缺少对wifi_connect()的调用怎么办?

    在 RTOS SDK 1.3 中,有名为 /examples/websocket_demo/websocket/websocket.c
    发表于 07-18 06:37

    请问websocket库怎么读取服务器发来的数据?

    官方websocket库怎么读取服务器发来的数据?
    发表于 06-25 06:40

    鸿蒙开发网络管理:ohos.net.webSocket WebSocket连接

    使用WebSocket建立服务器与客户端的双向连接,需要先通过[createWebSocket]方法创建[WebSocket]对象,然后通过[connect]方法连接到服务器。当连接成功后,客户端
    的头像 发表于 06-19 17:12 700次阅读
    鸿蒙开发网络管理:ohos.net.<b class='flag-5'>webSocket</b> <b class='flag-5'>WebSocket</b>连接

    ESP32进行websocket通信接收数据出错的原因?

    开发,支持一对一私聊和对多公聊 W (104983) WEBSOCKET: Total payload length=277, data_len=277, current payload offset
    发表于 06-14 07:42

    HBase集群数据在线迁移方案探索

    、背景 订单本地化系统目前一个月的订单的读写已经切至jimkv存储,对应的HBase集群已下线。但存储全量数据的HBase集群仍在使用,计划将这个HBase
    的头像 发表于 06-12 11:54 1240次阅读
    HBase<b class='flag-5'>集群</b>数据在线迁移<b class='flag-5'>方案</b>探索

    鸿蒙原生应用开发-网络管理WebSocket连接

    。使用该功能需要申请ohos.permission.INTERNET权限。具体接口说明如下表。 三、开发步骤 1.导入需要的webSocket模块。 2.创建WebSocket连接
    发表于 04-07 09:46