介绍
由于前段时间我实现了一个库【Spring Cloud】一个配置注解实现 WebSocket 集群方案
以至于我对WebSocket的各种集成方式做了一些研究
目前我所了解到的就是下面这些了(就一个破ws都有这么多花里胡哨的集成方式了?)
Javax
WebMVC
WebFlux
Java-WebSocket
SocketIO
Netty
今天主要介绍一下前3种方式,毕竟现在的主流框架还是Spring Boot
而后3种其实和Spring Boot并不强行绑定,基于Java就可以支持,不过我也会对后3种做个简单的介绍,大家先混个眼熟就行了
那么接下来我们就来讲讲前3种方式(Javax,WebMVC,WebFlux)在Spring Boot中的服务端和客户端配置(客户端配置也超重要的有木有,平时用不到,用到了却基本找不到文档,这也太绝望了)
Javax
在java的扩展包javax.websocket中就定义了一套WebSocket的接口规范
服务端
一般使用注解的方式来进行配置
第一步
@Component @ServerEndpoint("/websocket/{type}") publicclassJavaxWebSocketServerEndpoint{ @OnOpen publicvoidonOpen(Sessionsession,EndpointConfigconfig, @PathParam(value="type")Stringtype){ //连接建立 } @OnClose publicvoidonClose(Sessionsession,CloseReasonreason){ //连接关闭 } @OnMessage publicvoidonMessage(Sessionsession,Stringmessage){ //接收文本信息 } @OnMessage publicvoidonMessage(Sessionsession,PongMessagemessage){ //接收pong信息 } @OnMessage publicvoidonMessage(Sessionsession,ByteBuffermessage){ //接收二进制信息,也可以用byte[]接收 } @OnError publicvoidonError(Sessionsession,Throwablee){ //异常处理 } }
我们在类上添加@ServerEndpoint注解来表示这是一个服务端点,同时可以在注解中配置路径,这个路径可以配置成动态的,使用{}包起来就可以了
@OnOpen用来标记对应的方法作为客户端连接上来之后的回调,Session就相当于和客户端的连接啦,我们可以把它缓存起来用于发送消息;通过@PathParam注解就可以获得动态路径中对应值了
@OnClose用来标记对应的方法作为客户端断开连接之后的回调,我们可以在这个方法中移除对应Session的缓存,同时可以接受一个CloseReason的参数用于获取关闭原因
@OnMessage用来标记对应的方法作为接收到消息之后的回调,我们可以接受文本消息,二进制消息和pong消息
@OnError用来标记对应的方法作为抛出异常之后的回调,可以获得对应的Session和异常对象
第二步
implementation'org.springframework.boot:spring-boot-starter-websocket' @Configuration(proxyBeanMethods=false) publicclassJavaxWebSocketConfiguration{ @Bean publicServerEndpointExporterserverEndpointExporter(){ returnnewServerEndpointExporter(); } }
依赖Spring的WebSocket模块,手动注入ServerEndpointExporter就可以了
需要注意ServerEndpointExporter是Spring中的类,算是Spring为了支持javax.websocket的原生用法所提供的支持类
冷知识
javax.websocket库中定义了PongMessage而没有PingMessage
通过我的测试发现基本上所有的WebSocket包括前端js自带的,都实现了自动回复;也就是说当接收到一个ping消息之后,是会自动回应一个pong消息,所以没有必要再自己接受ping消息来处理了,即我们不会接受到ping消息
当然我上面讲的ping和pong都是需要使用框架提供的api,如果是我们自己通过Message来自定义心跳数据的话是没有任何的处理的,下面是对应的api
//发送ping session.getAsyncRemote().sendPing(ByteBufferbuffer); //发送pong session.getAsyncRemote().sendPong(ByteBufferbuffer);
然后我又发现js自带的WebSocket是没有发送ping的api的,所以是不是可以猜想当初就是约定服务端发送ping,客户端回复pong
客户端
客户端也是使用注解配置
第一步
@ClientEndpoint publicclassJavaxWebSocketClientEndpoint{ @OnOpen publicvoidonOpen(Sessionsession){ //连接建立 } @OnClose publicvoidonClose(Sessionsession,CloseReasonreason){ //连接关闭 } @OnMessage publicvoidonMessage(Sessionsession,Stringmessage){ //接收文本消息 } @OnMessage publicvoidonMessage(Sessionsession,PongMessagemessage){ //接收pong消息 } @OnMessage publicvoidonMessage(Sessionsession,ByteBuffermessage){ //接收二进制消息 } @OnError publicvoidonError(Sessionsession,Throwablee){ //异常处理 } }
客户端使用@ClientEndpoint来标记,其他的@OnOpen,@OnClose,@OnMessage,@OnError和服务端一模一样
第二步
WebSocketContainercontainer=ContainerProvider.getWebSocketContainer(); Sessionsession=container.connectToServer(JavaxWebSocketClientEndpoint.class,uri);
我们可以通过ContainerProvider来获得一个WebSocketContainer,然后调用connectToServer方法将我们的客户端类和连接的uri传入就行了
冷知识
通过ContainerProvider#getWebSocketContainer获得WebSocketContainer其实是基于SPI实现的
在Spring的环境中我更推荐大家使用ServletContextAware来获得,代码如下
@Component publicclassJavaxWebSocketContainerimplementsServletContextAware{ privatevolatileWebSocketContainercontainer; publicWebSocketContainergetContainer(){ if(container==null){ synchronized(this){ if(container==null){ container=ContainerProvider.getWebSocketContainer(); } } } returncontainer; } @Override publicvoidsetServletContext(@NonNullServletContextservletContext){ if(container==null){ container=(WebSocketContainer)servletContext .getAttribute("javax.websocket.server.ServerContainer"); } } }
发消息
Sessionsession=... //发送文本消息 session.getAsyncRemote().sendText(Stringmessage); //发送二进制消息 session.getAsyncRemote().sendBinary(ByteBuffermessage); //发送对象消息,会尝试使用Encoder编码 session.getAsyncRemote().sendObject(Objectmessage); //发送ping session.getAsyncRemote().sendPing(ByteBufferbuffer); //发送pong session.getAsyncRemote().sendPong(ByteBufferbuffer);
WebMVC
依赖肯定是必不可少的
implementation'org.springframework.boot:spring-boot-starter-websocket'
服务端
第一步
importorg.springframework.web.socket.WebSocketHandler; importorg.springframework.web.socket.WebSocketMessage; importorg.springframework.web.socket.WebSocketSession; publicclassServletWebSocketServerHandlerimplementsWebSocketHandler{ @Override publicvoidafterConnectionEstablished(@NonNullWebSocketSessionsession)throwsException{ //连接建立 } @Override publicvoidhandleMessage(@NonNullWebSocketSessionsession,@NonNullWebSocketMessage>message)throwsException{ //接收消息 } @Override publicvoidhandleTransportError(@NonNullWebSocketSessionsession,@NonNullThrowableexception)throwsException{ //异常处理 } @Override publicvoidafterConnectionClosed(@NonNullWebSocketSessionsession,@NonNullCloseStatuscloseStatus)throwsException{ //连接关闭 } @Override publicbooleansupportsPartialMessages(){ //是否支持接收不完整的消息 returnfalse; } }
我们实现一个WebSocketHandler来处理WebSocket的连接,关闭,消息和异常
第二步
@Configuration @EnableWebSocket publicclassServletWebSocketServerConfigurerimplementsWebSocketConfigurer{ @Override publicvoidregisterWebSocketHandlers(@NonNullWebSocketHandlerRegistryregistry){ registry //添加处理器到对应的路径 .addHandler(newServletWebSocketServerHandler(),"/websocket") .setAllowedOrigins("*"); } }
首先需要添加@EnableWebSocket来启用WebSocket
然后实现WebSocketConfigurer来注册WebSocket路径以及对应的WebSocketHandler
握手拦截
提供了HandshakeInterceptor来拦截握手
@Configuration @EnableWebSocket publicclassServletWebSocketServerConfigurerimplementsWebSocketConfigurer{ @Override publicvoidregisterWebSocketHandlers(@NonNullWebSocketHandlerRegistryregistry){ registry //添加处理器到对应的路径 .addHandler(newServletWebSocketServerHandler(),"/websocket") //添加握手拦截器 .addInterceptors(newServletWebSocketHandshakeInterceptor()) .setAllowedOrigins("*"); } publicstaticclassServletWebSocketHandshakeInterceptorimplementsHandshakeInterceptor{ @Override publicbooleanbeforeHandshake(ServerHttpRequestrequest,ServerHttpResponseresponse,WebSocketHandlerwsHandler,Mapattributes)throwsException{ //握手之前 //继续握手返回true,中断握手返回false returnfalse; } @Override publicvoidafterHandshake(ServerHttpRequestrequest,ServerHttpResponseresponse,WebSocketHandlerwsHandler,Exceptionexception){ //握手之后 } } }
冷知识
我在集成的时候发现这种方式没办法动态匹配路径,它的路径就是固定的,没办法使用如/websocket/**这样的通配符
我在研究了一下之后发现可以在UrlPathHelper上做点文章
@Configuration @EnableWebSocket publicclassServletWebSocketServerConfigurerimplementsWebSocketConfigurer{ @Override publicvoidregisterWebSocketHandlers(@NonNullWebSocketHandlerRegistryregistry){ if(registryinstanceofServletWebSocketHandlerRegistry){ //替换UrlPathHelper ((ServletWebSocketHandlerRegistry)registry) .setUrlPathHelper(newPrefixUrlPathHelper("/websocket")); } registry //添加处理器到对应的路径 .addHandler(newServletWebSocketServerHandler(),"/websocket/**") .setAllowedOrigins("*"); } publicclassPrefixUrlPathHelperextendsUrlPathHelper{ privateStringprefix; @Override public@NonNullStringresolveAndCacheLookupPath(@NonNullHttpServletRequestrequest){ //获得原本的Path Stringpath=super.resolveAndCacheLookupPath(request); //如果是指定前缀就返回对应的通配路径 if(path.startsWith(prefix)){ returnprefix+"/**"; } returnpath; } } }
因为它内部实际上就是用一个Map
主要是有现成的AntPathMatcher实现通配应该不麻烦才对啊
客户端
第一步
publicclassServletWebSocketClientHandlerimplementsWebSocketHandler{ @Override publicvoidafterConnectionEstablished(@NonNullWebSocketSessionsession)throwsException{ //连接建立 } @Override publicvoidhandleMessage(@NonNullWebSocketSessionsession,@NonNullWebSocketMessage>message)throwsException{ //接收消息 } @Override publicvoidhandleTransportError(@NonNullWebSocketSessionsession,@NonNullThrowableexception)throwsException{ //异常处理 } @Override publicvoidafterConnectionClosed(@NonNullWebSocketSessionsession,@NonNullCloseStatuscloseStatus)throwsException{ //连接关闭 } @Override publicbooleansupportsPartialMessages(){ //是否支持接收不完整的消息 returnfalse; } }
和服务端一样我们需要先实现一个WebSocketHandler来处理WebSocket的连接,关闭,消息和异常
第二步
WebSocketClientclient=newStandardWebSocketClient(); WebSocketHandlerhandler=newServletWebSocketClientHandler(); WebSocketConnectionManagermanager=newWebSocketConnectionManager(client,handler,uri); manager.start();
首先我们需要先new一个StandardWebSocketClient,可以传入一个WebSocketContainer参数,获得该对象的方式我之前已经介绍过了,这边就先略过
然后new一个WebSocketConnectionManager传入WebSocketClient,WebSocketHandler还有路径uri
最后调用一下WebSocketConnectionManager的start方法就可以啦
冷知识
这里如果大家去看WebSocketClient的实现类就会发现有StandardWebSocketClient还有JettyWebSocketClient等等,所以大家可以根据自身项目所使用的容器来选择不同的WebSocketClient实现类
这里给大家贴一小段Spring适配不同容器WebSocket的代码
publicabstractclassAbstractHandshakeHandlerimplementsHandshakeHandler,Lifecycle{ privatestaticfinalbooleantomcatWsPresent; privatestaticfinalbooleanjettyWsPresent; privatestaticfinalbooleanjetty10WsPresent; privatestaticfinalbooleanundertowWsPresent; privatestaticfinalbooleanglassfishWsPresent; privatestaticfinalbooleanweblogicWsPresent; privatestaticfinalbooleanwebsphereWsPresent; static{ ClassLoaderclassLoader=AbstractHandshakeHandler.class.getClassLoader(); tomcatWsPresent=ClassUtils.isPresent( "org.apache.tomcat.websocket.server.WsHttpUpgradeHandler",classLoader); jetty10WsPresent=ClassUtils.isPresent( "org.eclipse.jetty.websocket.server.JettyWebSocketServerContainer",classLoader); jettyWsPresent=ClassUtils.isPresent( "org.eclipse.jetty.websocket.server.WebSocketServerFactory",classLoader); undertowWsPresent=ClassUtils.isPresent( "io.undertow.websockets.jsr.ServerWebSocketContainer",classLoader); glassfishWsPresent=ClassUtils.isPresent( "org.glassfish.tyrus.servlet.TyrusHttpUpgradeHandler",classLoader); weblogicWsPresent=ClassUtils.isPresent( "weblogic.websocket.tyrus.TyrusServletWriter",classLoader); websphereWsPresent=ClassUtils.isPresent( "com.ibm.websphere.wsoc.WsWsocServerContainer",classLoader); } }
发消息
importorg.springframework.web.socket.*; WebSocketSessionsession=... //发送文本消息 session.sendMessage(newTextMessage(CharSequencemessage); //发送二进制消息 session.sendMessage(newBinaryMessage(ByteBuffermessage)); //发送ping session.sendMessage(newPingMessage(ByteBuffermessage)); //发送pong session.sendMessage(newPongMessage(ByteBuffermessage));
WebFlux
WebFlux的WebSocket不需要额外的依赖包
服务端
第一步
importorg.springframework.web.reactive.socket.WebSocketHandler; importorg.springframework.web.reactive.socket.WebSocketSession; publicclassReactiveWebSocketServerHandlerimplementsWebSocketHandler{ @NonNull @Override publicMonohandle(WebSocketSessionsession){ Mono send=session.send(Flux.create(sink->{ //可以持有sink对象在任意时候调用next发送消息 sink.next(WebSocketMessagemessage); })).doOnError(it->{ //异常处理 }); Mono receive=session.receive() .doOnNext(it->{ //接收消息 }) .doOnError(it->{ //异常处理 }) .then(); @SuppressWarnings("all") Disposabledisposable=session.closeStatus() .doOnError(it->{ //异常处理 }) .subscribe(it->{ //连接关闭 }); returnMono.zip(send,receive).then(); } }
首先需要注意这里的WebSocketHandler和WebSocketSession是reactive包下的
通过WebSocketSession#send方法来持有一个FluxSink
通过WebSocketSession#receive来订阅消息
通过WebSocketSession#closeStatus来订阅连接关闭事件
第二步
@Component publicclassReactiveWebSocketServerHandlerMappingextendsSimpleUrlHandlerMapping{ publicReactiveWebSocketServerHandlerMapping(){ Mapmap=newHashMap<>(); map.put("/websocket/**",newReactiveWebSocketServerHandler()); setUrlMap(map); setOrder(100); } }
注册一个HandlerMapping同时配置路径和对应的WebSocketHandler
第三步
@Configuration(proxyBeanMethods=false) publicclassReactiveWebSocketConfiguration{ @Bean publicWebSocketHandlerAdapterwebSocketHandlerAdapter(){ returnnewWebSocketHandlerAdapter(); } }
注入WebSocketHandlerAdapter
冷知识
我们自定义的HandlerMapping需要设置order,如果不设置,默认为Ordered.LOWEST_PRECEDENCE,会导致这个HandlerMapping被放在最后,当有客户端连接上来时会被其他的HandlerMapping优先匹配上而连接失败
客户端
第一步
publicclassReactiveWebSocketClientHandlerimplementsWebSocketHandler{ @NonNull @Override publicMonohandle(WebSocketSessionsession){ Mono send=session.send(Flux.create(sink->{ //可以持有sink对象在任意时候调用next发送消息 sink.next(WebSocketMessagemessage); })).doOnError(it->{ //处理异常 }); Mono receive=session.receive() .doOnNext(it->{ //接收消息 }) .doOnError(it->{ //异常处理 }) .then(); @SuppressWarnings("all") Disposabledisposable=session.closeStatus() .doOnError(it->{ //异常处理 }) .subscribe(it->{ //连接关闭 }); returnMono.zip(send,receive).then(); } }
客户端WebSocketHandler的写法和服务端的一样
第二步
importorg.springframework.web.reactive.socket.client.WebSocketClient; WebSocketClientclient=ReactorNettyWebSocketClient(); WebSocketHandlerhandler=newReactiveWebSocketClientHandler(); client.execute(uri,handler).subscribe();
首先我们需要先new一个ReactorNettyWebSocketClient
然后调用一下WebSocketClient的execute方法传入路径uri和WebSocketHandler并继续调用subscribe方法就行啦
冷知识
和WebMVC中的WebSocketClient一样,Reactive包中的WebSocketClient也有很多实现类,比如ReactorNettyWebSocketClient,JettyWebSocketClient,UndertowWebSocketClient,TomcatWebSocketClient等等,也是需要大家基于自身项目的容器使用不同的实现类
这里也给大家贴一小段Reactive适配不同容器WebSocket的代码
publicclassHandshakeWebSocketServiceimplementsWebSocketService,Lifecycle{ privatestaticfinalbooleantomcatPresent; privatestaticfinalbooleanjettyPresent; privatestaticfinalbooleanjetty10Present; privatestaticfinalbooleanundertowPresent; privatestaticfinalbooleanreactorNettyPresent; static{ ClassLoaderloader=HandshakeWebSocketService.class.getClassLoader(); tomcatPresent=ClassUtils.isPresent("org.apache.tomcat.websocket.server.WsHttpUpgradeHandler",loader); jettyPresent=ClassUtils.isPresent("org.eclipse.jetty.websocket.server.WebSocketServerFactory",loader); jetty10Present=ClassUtils.isPresent("org.eclipse.jetty.websocket.server.JettyWebSocketServerContainer",loader); undertowPresent=ClassUtils.isPresent("io.undertow.websockets.WebSocketProtocolHandshakeHandler",loader); reactorNettyPresent=ClassUtils.isPresent("reactor.netty.http.server.HttpServerResponse",loader); } }
发消息
我们需要使用在WebSocketHandler中获得的FluxSink
importorg.springframework.web.reactive.socket.CloseStatus; importorg.springframework.web.reactive.socket.WebSocketMessage; importorg.springframework.web.reactive.socket.WebSocketSession; publicclassReactiveWebSocket{ privatefinalWebSocketSessionsession; privatefinalFluxSinksender; publicReactiveWebSocket(WebSocketSessionsession,FluxSink sender){ this.session=session; this.sender=sender; } publicStringgetId(){ returnsession.getId(); } publicURIgetUri(){ returnsession.getHandshakeInfo().getUri(); } publicvoidsend(Objectmessage){ if(messageinstanceofWebSocketMessage){ sender.next((WebSocketMessage)message); }elseif(messageinstanceofString){ //发送文本消息 sender.next(session.textMessage((String)message)); }elseif(messageinstanceofDataBuffer){ //发送二进制消息 sender.next(session.binaryMessage(factory->(DataBuffer)message)); }elseif(messageinstanceofByteBuffer){ 发送二进制消息 sender.next(session.binaryMessage(factory->factory.wrap((ByteBuffer)message))); }elseif(messageinstanceofbyte[]){ 发送二进制消息 sender.next(session.binaryMessage(factory->factory.wrap((byte[])message))); }else{ thrownewIllegalArgumentException("Messagetypenotmatch"); } } publicvoidping(){ //发送ping sender.next(session.pingMessage(factory->factory.wrap(ByteBuffer.allocate(0)))); } publicvoidpong(){ //发送pong sender.next(session.pongMessage(factory->factory.wrap(ByteBuffer.allocate(0)))); } publicvoidclose(CloseStatusreason){ sender.complete(); session.close(reason).subscribe(); } }
Java-WebSocket
这是一个纯java的第三方库,专门用于实现WebSocket
SocketIO
该库使用的协议是经过自己封装的,支持很多的语言,提供了统一的接口,所以需要使用它提供的Server和Client来连接,如socket.io-server-java和socket.io-client-java
这个库我了解下来主要用于实时聊天等场景,所以如果只是普通的WebSocket功能就有点大材小用了
Netty
这个大家应该都比较熟悉了,就算没用过肯定也听过
网上的文档和示例也非常多,我这里就不介绍有的没的了,Github传送门。
审核编辑:刘清
-
二进制
+关注
关注
2文章
772浏览量
41553 -
JAVA语言
+关注
关注
0文章
138浏览量
20061 -
缓存器
+关注
关注
0文章
63浏览量
11639 -
WebSocket
+关注
关注
0文章
27浏览量
3723
原文标题:WebSocket 的 6 种集成方式
文章出处:【微信号:芋道源码,微信公众号:芋道源码】欢迎添加关注!文章转载请注明出处。
发布评论请先 登录
相关推荐
评论