首发CSDN:徐同学呀,原创不易,转载请注明源链接。我是徐同学,用心输出高质量文章,希望对你有所帮助。 本篇基于Tomcat10.0.6。建议收藏起来慢慢看。
文章目录
一、前言
WebSocket是一种全双工通信协议,即客户端可以向服务端发送请求,服务端也可以主动向客户端推送数据。这样的特点,使得它在一些实时性要求比较高的场景效果斐然(比如微信朋友圈实时通知、在线协同编辑等)。主流浏览器以及一些常见服务端通信框架(Tomcat、netty、undertow、webLogic等)都对WebSocket进行了技术支持。那么,WebSocket具体是什么?为什么会出现WebSocket?如何做到全双工通信?解决了什么问题?
![]()
二、什么是WebSocket
1、HTTP/1.1的缺陷
HTTP/1.1最初是为网络中超文本资源(HTML),请求-响应传输而设计的,后来支持了传输更多类型的资源,如图片、视频等,但都没有改变它单向的请求-响应模式。
随着互联网的日益壮大,HTTP/1.1功能使用上已体现捉襟见肘的疲态。虽然可以通过某些方式满足需求(如Ajax、Comet),但是性能上还是局限于HTTP/1.1,那么HTTP/1.1有哪些缺陷呢:
- 请求-响应模式,只能客户端发送请求给服务端,服务端才可以发送响应数据给客户端。
- 传输数据为文本格式,且请求/响应头部冗长重复。
(为了区分HTTP/1.1和HTTP/1.2,下面描述中,HTTP均代表HTTP/1.1)
2、WebSocket发展历史
(1)背景
在WebSocket出现之前,主要通过长轮询和HTTP长连接实现实时数据更新,这种方式有个统称叫Comet,Tomcat8.5之前有对Comet基于流的HTTP长连接做支持,后来因为WebSocket的成熟和标准化,以及Comet自身依然是基于HTTP,在性能消耗和瓶颈上无法跳脱HTTP,就把Comet废弃了。
还有一个SPDY技术,也对HTTP进行了改进,多路复用流、服务器推送等,后来演化成HTTP/2.0,因为适用场景和解决的问题不同,暂不对HTTP/2.0做过多解释,不过对于HTTP/2.0和WebSocket在Tomcat实现中都是作为协议升级来处理的。
(Comet和SPDY的原理不是本篇重点,没有展开讲解,感兴趣的同学可自行百度)
(2)历史
在这种背景下,HTML5制定了WebSocket
- 筹备阶段,
WebSocket被划分为HTML5标准的一部分,2008年6月,Michael Carter进行了一系列讨论,最终形成了称为WebSocket的协议。 - 2009年12月,Google Chrome 4是第一个提供标准支持的浏览器,默认情况下启用了
WebSocket。 - 2010年2月,
WebSocket协议的开发从W3C和WHATWG小组转移到IETF(TheInternet Engineering Task Force),并在Ian Hickson的指导下进行了两次修订。 - 2011年,
IETF将WebSocket协议标准化为RFC 6455起,大多数Web浏览器都在实现支持WebSocket协议的客户端API。此外,已经开发了许多实现WebSocket协议的Java库。 - 2013年,发布JSR356标准,Java API for WebSocket。
(为什么要去了解WebSocket的发展历史和背景呢?个人认为可以更好的理解某个技术实现的演变历程,比如Tomcat,早期有Comet没有WebSocket时,Tomcat就对Comet做了支持,后来有WebSocket了,但是还没出JSR356标准,Tomcat就对Websocket做了支持,自定义API,再后来有了JSR356,Tomcat立马紧跟潮流,废弃自定义的API,实现JSR356那一套,这就使得在Tomcat7使用WebSocket的同学,想升为Tomcat8(其实Tomcat7.0.47之后就是JSR356标准了),发现WebSocket接入方式变了,而且一些细节也变了。)
3、WebSocket握手和双向通信
(1)定义
WebSocket全双工通信协议,在客户端和服务端建立连接后,可以持续双向通信,和HTTP同属于应用层协议,并且都依赖于传输层的TCP/IP协议。
虽然WebSocket有别于HTTP,是一种新协议,但是RFC 6455中规定:
it is designed to work over HTTP ports 80 and 443 as well as to support HTTP proxies and intermediaries.
WebSocket通过HTTP端口80和443进行工作,并支持HTTP代理和中介,从而使其与HTTP协议兼容。- 为了实现兼容性,
WebSocket握手使用HTTPUpgrade头从HTTP协议更改为WebSocket协议。 Websocket使用ws或wss的统一资源标志符(URI),分别对应明文和加密连接。
(2)握手(建立连接)
在双向通信之前,必须通过握手建立连接。Websocket通过 HTTP/1.1 协议的101状态码进行握手,首先客户端(如浏览器)发出带有特殊消息头(Upgrade、Connection)的请求到服务器,服务器判断是否支持升级,支持则返回响应状态码101,表示协议升级成功,对于WebSocket就是握手成功。
客户端请求示例:
GET /test HTTP/1.1
Host: server.example.com
Upgrade: websocket
Connection: Upgrade
Sec-WebSocket-Key: tFGdnEL/5fXMS9yKwBjllg==
Origin: http://example.com
Sec-WebSocket-Protocol: v10.stomp, v11.stomp, v12.stomp
Sec-WebSocket-Extensions: permessage-deflate; client_max_window_bits
Sec-WebSocket-Version: 13
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
Connection必须设置Upgrade,表示客户端希望连接升级。Upgrade: websocket表明协议升级为websocket。Sec-WebSocket-Key字段内记录着握手过程中必不可少的键值,由客户端(浏览器)生成,可以尽量避免普通HTTP请求被误认为Websocket协议。Sec-WebSocket-Version表示支持的Websocket版本。RFC6455要求使用的版本是13。Origin字段是必须的。如果缺少origin字段,WebSocket服务器需要回复HTTP 403 状态码(禁止访问),通过Origin可以做安全校验。
服务端响应示例:
HTTP/1.1 101 Switching Protocols
Upgrade: websocket
Connection: Upgrade
Sec-WebSocket-Accept: HaA6EjhHRejpHyuO0yBnY4J4n3A=
Sec-WebSocket-Extensions: permessage-deflate;client_max_window_bits=15
Sec-WebSocket-Protocol: v12.stomp
- 1
- 2
- 3
- 4
- 5
- 6
Sec-WebSocket-Accept的字段值是由握手请求中的Sec-WebSocket-Key的字段值生成的。成功握手确立WebSocket连接之后,通信时不再使用HTTP的数据帧,而采用WebSocket独立的数据帧。
![]()
(3)消息帧
WebSocket使用二进制消息帧作为双向通信的媒介。何为消息帧?发送方将每个应用程序消息拆分为一个或多个帧,通过网络将它们传输到目的地,并重新组装解析出一个完整消息。
有别于HTTP/1.1文本消息格式(冗长的消息头和分隔符等),WebSocket消息帧规定一定的格式,以二进制传输,更加短小精悍。二者相同之处就是都是基于TCP/IP流式协议(没有规定消息边界)。
如下是消息帧的基本结构图:
![]()
FIN: 1 bit,表示该帧是否为消息的最后一帧。1-是,0-否。RSV1,RSV2,RSV3: 1 bit each,预留(3位),扩展的预留标志。一般情况为0,除非协商的扩展定义为非零值。如果接收到非零值且不为协商扩展定义,接收端必须使连接失败。Opcode: 4 bits,定义消息帧的操作类型,如果接收到一个未知Opcode,接收端必须使连接失败。(0x0-延续帧,0x1-文本帧,0x2-二进制帧,0x8-关闭帧,0x9-PING帧,0xA-PONG帧(在接收到PING帧时,终端必须发送一个PONG帧响应,除非它已经接收到关闭帧),0x3-0x7保留给未来的非控制帧,0xB-F保留给未来的控制帧)Mask: 1 bit,表示该帧是否为隐藏的,即被加密保护的。1-是,0-否。Mask=1时,必须传一个Masking-key,用于解除隐藏(客户端发送消息给服务器端,Mask必须为1)。Payload length: 7 bits, 7+16 bits, or 7+64 bits,有效载荷数据的长度(扩展数据长度+应用数据长度,扩展数据长度可以为0)。
if 0-125, that is the payload length. If 126, the following 2 bytes interpreted as a 16-bit unsigned integer are the payload length. If 127, the following 8 bytes interpreted as a 64-bit unsigned integer (the most significant bit MUST be 0) are the payload length.
Masking-key: 0 or 4 bytes,用于解除帧隐藏(加密)的key,Mask=1时不为空,Mask=0时不用传。Payload data: (x+y) bytes,有效载荷数据包括扩展数据(x bytes)和应用数据(y bytes)。有效载荷数据是用户真正要传输的数据。
这样的二进制消息帧设计,与HTTP协议相比,WebSocket协议可以提供约500:1的流量减少和3:1的延迟减少。
(4)挥手(关闭连接)
挥手相对于握手要简单很多,客户端和服务器端任何一方都可以通过发送关闭帧来发起挥手请求。发送关闭帧的一方,之后不再发送任何数据给对方;接收到关闭帧的一方,如果之前没有发送过关闭帧,则必须发送一个关闭帧作为响应。关闭帧中可以携带关闭原因。
在发送和接收一个关闭帧消息之后,就认为WebSocket连接已关闭,且必须关闭底层TCP连接。
除了通过关闭握手来关闭连接外,WebSocket连接也可能在另一方离开或底层TCP连接关闭时突然关闭。
4、WebSocket优点
-
较少的控制开销。在连接建立后,服务器和客户端之间交换数据时,用于协议控制的数据包头部相对于HTTP请求每次都要携带完整的头部,显著减少。
-
更强的实时性。由于协议是全双工的,所以服务器可以随时主动给客户端下发数据。相对于HTTP请求需要等待客户端发起请求服务端才能响应,延迟明显更少。
-
保持连接状态。与HTTP不同的是,
Websocket需要先建立连接,这就使得其成为一种有状态的协议,之后通信时可以省略部分状态信息。而HTTP请求可能需要在每个请求都携带状态信息(如身份认证等)。 -
更好的二进制支持。
Websocket定义了二进制帧,相对HTTP,可以更轻松地处理二进制内容。 -
支持扩展。
Websocket定义了扩展,用户可以扩展协议、实现部分自定义的子协议。 -
更好的压缩效果。相对于HTTP压缩,
Websocket在适当的扩展支持下,可以沿用之前内容的上下文,在传递类似的数据时,可以显著提高压缩率。
![]()
三、Java API for WebSocket(JSR356)
JSR356在Java EE7时归为Java EE标准的一部分(后来Java EE更名为Jakarta EE,世上再无Java EE,以下统一称Jakarta EE),所有兼容Jakarta EE的应用服务器,都必须遵循JSR356标准的WebSocket协议API。
![]()
根据JSR356规定, 建立WebSocket连接的服务器端和客户端,两端对称,可以互相通信,差异性较小,抽象成API,就是一个个Endpoint(端点),只不过服务器端的叫ServerEndpoint,客户端的叫ClientEndpoint。客户端向服务端发送WebSocket握手请求,建立连接后就创建一个ServerEndpoint对象。(这里的Endpoint和Tomcat连接器里的AbstractEndpoint名称上有点像,但是两个毫不相干的东西,就像周杰伦和周杰的关系。)
ServerEndpoint和ClientEndpoint在API上差异也很小,有相同的生命周期事件(OnOpen、OnClose、OnError、OnMessage),不同之处是ServerEndpoint作为服务器端点,可以指定一个URI路径供客户端连接,ClientEndpoint没有。
1、服务端API
服务器端的Endpoint有两种实现方式,一种是注解方式@ServerEndpoint,一种是继承抽象类Endpoint。
(1)注解方式@ServerEndpoint
首先看看@ServerEndpoint有哪些要素:
value,可以指定一个URI路径标识一个Endpoint。subprotocols,用户在WebSocket协议下自定义扩展一些子协议。decoders,用户可以自定义一些消息解码器,比如通信的消息是一个对象,接收到消息可以自动解码封装成消息对象。encoders,有解码器就有编码器,定义解码器和编码器的好处是可以规范使用层消息的传输。configurator,ServerEndpoint配置类,主要提供ServerEndpoint对象的创建方式扩展(如果使用Tomcat的WebSocket实现,默认是反射创建ServerEndpoint对象)。
![]()
@ServerEndpoint可以注解到任何类上,但是想实现服务端的完整功能,还需要配合几个生命周期的注解使用,这些生命周期注解只能注解在方法上:
@OnOpen建立连接时触发。@OnClose关闭连接时触发。@OnError发生异常时触发。@OnMessage接收到消息时触发。
(2)继承抽象类Endpoint
继承抽象类Endpoint,重写几个生命周期方法。
![]()
怎么没有onMessage方法,实现onMessage还需要继承实现一个接口jakarta.websocket.MessageHandler,MessageHandler接口又分为Partial和Whole,实现的MessageHandler需要在onOpen触发时注册到jakarta.websocket.Session中。
![]()
继承抽象类Endpoint的方式相对于注解方式要麻烦的多,除了继承Endpoint和实现接口MessageHandler外,还必须实现一个jakarta.websocket.server.ServerApplicationConfig来管理Endpoint,比如给Endpoint分配URI路径。
![]()
而encoders、decoders、configurator等配置信息由jakarta.websocket.server.ServerEndpointConfig管理,默认实现jakarta.websocket.server.DefaultServerEndpointConfig。
所以如果使用 Java 版WebSocket服务器端实现首推注解方式。
2、客户端API
对于客户端API,也是有注解方式和继承抽象类Endpoint方式。
- 注解方式,只需要将
@ServerEndpoint换成@ClientEndpoint。 - 继承抽象类
Endpoint方式,需要一个jakarta.websocket.ClientEndpointConfig来管理encoders、decoders、configurator等配置信息,默认实现jakarta.websocket.DefaultClientEndpointConfig。
3、上下文Session
WebSocket是一个有状态的连接,建立连接后的通信都是通过jakarta.websocket.Session保持状态,一个连接一个Session,每一个Session有一个唯一标识Id。
Session的主要职责涉及:
- 基础信息管理(
request信息(getRequestURI、getRequestParameterMap、getPathParameters等)、协议版本getProtocolVersion、子协议getNegotiatedSubprotocol等)。 - 连接管理(状态判断
isOpen、接收消息的MessageHandler、发送消息的异步远程端点RemoteEndpoint.Async和同步远程端点RemoteEndpoint.Basic等)。
4、HandshakeRequest 和 HandshakeResponse
HandshakeRequest 和 HandshakeResponse了解即可,这两个接口主要用于WebScoket握手升级过程中握手请求响应的封装,如果只是单纯使用WebSocket,不会接触到这两个接口。
(1)HandshakeRequest
![]()
(2)HandshakeResponse
![]()
Sec-WebSocket-Accept根据客户端传的Sec-WebSocket-Key生成,如下是Tomcat10.0.6 WebSocket源码实现中生成Sec-WebSocket-Accept的算法:
private static String getWebSocketAccept(String key) {
byte[] digest = ConcurrentMessageDigest.digestSHA1(
key.getBytes(StandardCharsets.ISO_8859_1), WS_ACCEPT);
return Base64.encodeBase64String(digest);
}
- 1
- 2
- 3
- 4
- 5
5、WebSocketContainer
jakarta.websocket.WebSocketContainer顾名思义,就是WebSocket的容器,集大成者。其主要职责包括但不限于connectToServer,客户端连接服务器端,基于浏览器的WebSocket客户端连接服务器端,由浏览器支持,但是基于Java版的WebSocket客户端就可以通过WebSocketContainer#connectToServer向服务端发起连接请求。
![]()
![]()
四、WebSocket基于Tomcat应用
(如下使用的是javax.websocket包,未使用最新的jakarta.websocket,主要是测试项目基于SpringBoot+Tomcat9.x的,Java API for WebSocket版本需要保持一致。)
1、服务器端实现
(1)@ServerEndpoint注解方式
import javax.websocket.*;
import javax.websocket.server.PathParam;
import javax.websocket.server.ServerEndpoint;
import java.io.IOException;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;
@ServerEndpoint(value = "/ws/test/{userId}", encoders = {MessageEncoder.class}, decoders = {MessageDecoder.class}, configurator = MyServerConfigurator.class)
public class WebSocketServerEndpoint {
private Session session;
private String userId;
@OnOpen
public void OnOpen(Session session, @PathParam(value = "userId") String userId) {
this.session = session;
this.userId = userId;
// 建立连接后,将连接存到一个map里
endpointMap.put(userId, this);
Message message = new Message(0, "connected, hello " + userId);
sendMsg(message);
}
@OnClose
public void OnClose() {
// 关闭连接时触发,从map中删除连接
endpointMap.remove(userId);
System.out.println("server closed...");
}
@OnMessage
public void onMessage(Message message) {
System.out.println("server recive message=" + message.toString());
}
@OnError
public void onError(Throwable t) throws Throwable {
this.session.close(new CloseReason(CloseReason.CloseCodes.CLOSED_ABNORMALLY, "系统异常"));
t.printStackTrace();
}
/**
* 群发
* @param data
*/
public void sendAllMsg(Message data) {
for (WebSocketServerEndpoint value : endpointMap.values()) {
value.sendMsgAsync(data);
}
}
/**
* 推送消息给指定 userId
* @param data
* @param userId
*/
public void sendMsg(Message data, String userId) {
WebSocketServerEndpoint endpoint = endpointMap.get(userId);
if (endpoint == null) {
System.out.println("not conected to " + userId);
return;
}
endpoint.sendMsgAsync(data);
}
private void sendMsg(Message data) {
try {
this.session.getBasicRemote().sendObject(data);
} catch (IOException ioException) {
ioException.printStackTrace();
} catch (EncodeException e) {
e.printStackTrace();
}
}
private void sendMsgAsync(Message data) {
this.session.getAsyncRemote().sendObject(data);
}
// 存储建立连接的Endpoint
private static ConcurrentHashMap<String, WebSocketServerEndpoint> endpointMap = new ConcurrentHashMap<String, WebSocketServerEndpoint>();
}
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
- 13
- 14
- 15
- 16
- 17
- 18
- 19
- 20
- 21
- 22
- 23
- 24
- 25
- 26
- 27
- 28
- 29
- 30
- 31
- 32
- 33
- 34
- 35
- 36
- 37
- 38
- 39
- 40
- 41
- 42
- 43
- 44
- 45
- 46
- 47
- 48
- 49
- 50
- 51
- 52
- 53
- 54
- 55
- 56
- 57
- 58
- 59
- 60
- 61
- 62
- 63
- 64
- 65
- 66
- 67
- 68
- 69
- 70
- 71
- 72
- 73
- 74
- 75
- 76
- 77
- 78
- 79
每一个客户端与服务器端建立连接后,都会生成一个WebSocketServerEndpoint,可以通过一个Map将其与userId对应存起来,为后续群发广播和单独推送消息给某个客户端提供便利。
注意:@ServerEndpoint的encoders、decoders、configurator等配置信息在实际使用中可以不定义,如果项目简单,完全可以用默认的。
如果通信消息被封装成一个对象,如示例的Message(因为源码过于简单就不展示了,属性主要有code、msg、data),就必须提供编码器和解码器。也可以在每次发送消息时硬编码转为字符串,在接收到消息时转为Message。有了编码器和解码器,显得比较规范,转为字符串由编码器做,字符串转为对象由解码器做,但也使得架构变复杂了,视项目需求而定。
![]()
![]()
Configurator的用处就是自定义Endpoint对象创建方式,默认Tomcat提供的是通过反射。WebScoket是每个连接都会创建一个Endpoint对象,如果连接比较多,很频繁,通过反射创建,用后即毁,可能不是一个好主意,所以可以搞一个对象池,用过回收,用时先从对象池中拿,有就重置,省去实例化分配内存等消耗过程。
![]()
如果使用SpringBoot内置Tomcat、undertow、Netty等,接入WebSocket时除了加@ServerEndpoint还需要加一个@Component,再给Spring注册一个ServerEndpointExporter类,这样,服务端Endpoint就交由Spring去扫描注册了。
@Configuration
public class WebSocketConfig {
@Bean
public ServerEndpointExporter serverEndpointExporter() {
ServerEndpointExporter serverEndpointExporter = new ServerEndpointExporter();
return serverEndpointExporter;
}
}
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
外置Tomcat就不需要这么麻烦,Tomcat会默认扫描classpath下带有@ServerEndpoint注解的类。(SpringBoot接入Websocket后续会单独出文章讲解,也挺有意思的)
(2)继承抽象类Endpoint方式
import javax.websocket.*;
import java.io.IOException;
import java.util.concurrent.ConcurrentHashMap;
public class WebSocketServerEndpoint extends Endpoint {
private Session session;
private String userId;
@Override
public void onOpen(Session session, EndpointConfig endpointConfig) {
this.session = session;
this.userId = session.getPathParameters().get("userId");
session.addMessageHandler(new MessageHandler());
endpointMap.put(userId, this);
Message message = new Message(0, "connected, hello " + userId);
sendMsg(message);
}
@Override
public void onClose(Session session, CloseReason closeReason) {
endpointMap.remove(userId);
}
@Override
public void onError(Session session, Throwable throwable) {
throwable.printStackTrace();
}
/**
* 群发
* @param data
*/
public void sendAllMsg(Message data) {
for (WebSocketServerEndpoint value : endpointMap.values()) {
value.sendMsgAsync(data);
}
}
/**
* 推送消息给指定 userId
* @param data
* @param userId
*/
public void sendMsg(Message data, String userId) {
WebSocketServerEndpoint endpoint = endpointMap.get(userId);
if (endpoint == null) {
System.out.println("not conected to " + userId);
return;
}
endpoint.sendMsgAsync(data);
}
private void sendMsg(Message data) {
try {
this.session.getBasicRemote().sendObject(data);
} catch (IOException ioException) {
ioException.printStackTrace();
} catch (EncodeException e) {
e.printStackTrace();
}
}
private void sendMsgAsync(Message data) {
this.session.getAsyncRemote().sendObject(data);
}
private class MessageHandler implements javax.websocket.MessageHandler.Whole {
@Override
public void onMessage(Message message) {
System.out.println("server recive message=" + message.toString());
}
}
private static ConcurrentHashMap endpointMap = new ConcurrentHashMap();
}
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
- 13
- 14
- 15
- 16
- 17
- 18
- 19
- 20
- 21
- 22
- 23
- 24
- 25
- 26
- 27
- 28
- 29
- 30
- 31
- 32
- 33
- 34
- 35
- 36
- 37
- 38
- 39
- 40
- 41
- 42
- 43
- 44
- 45
- 46
- 47
- 48
- 49
- 50
- 51
- 52
- 53
- 54
- 55
- 56
- 57
- 58
- 59
- 60
- 61
- 62
- 63
- 64
- 65
- 66
- 67
- 68
- 69
- 70
- 71
- 72
- 73
- 74
- 75
- 76
- 77
继承抽象类Endpoint方式比加注解@ServerEndpoint方式麻烦的很,主要是需要自己实现MessageHandler和ServerApplicationConfig。@ServerEndpoint的话都是使用默认的,原理上差不多,只是注解更自动化,更简洁。
MessageHandler做的事情,一个@OnMessage就搞定了,ServerApplicationConfig做的URI映射、decoders、encoders,configurator等,一个@ServerEndpoint就可以了。
import javax.websocket.Decoder;
import javax.websocket.Encoder;
import javax.websocket.Endpoint;
import javax.websocket.server.ServerApplicationConfig;
import javax.websocket.server.ServerEndpointConfig;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
public class MyServerApplicationConfig implements ServerApplicationConfig {
@Override
public Set<ServerEndpointConfig> getEndpointConfigs(Set<Class<? extends Endpoint>> set) {
Set<ServerEndpointConfig> result = new HashSet<ServerEndpointConfig>();
List<Class<? extends Decoder>> decoderList = new ArrayList<Class<? extends Decoder>>();
decoderList.add(MessageDecoder.class);
List<Class<? extends Encoder>> encoderList = new ArrayList<Class<? extends Encoder>>();
encoderList.add(MessageEncoder.class);
if (set.contains(WebSocketServerEndpoint3.class)) {
ServerEndpointConfig serverEndpointConfig = ServerEndpointConfig.Builder
.create(WebSocketServerEndpoint3.class, "/ws/test3")
.decoders(decoderList)
.encoders(encoderList)
.configurator(new MyServerConfigurator())
.build();
result.add(serverEndpointConfig);
}
return result;
}
@Override
public Set<Class<?>> getAnnotatedEndpointClasses(Set<Class<?>> set) {
return set;
}
}
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
- 13
- 14
- 15
- 16
- 17
- 18
- 19
- 20
- 21
- 22
- 23
- 24
- 25
- 26
- 27
- 28
- 29
- 30
- 31
- 32
- 33
- 34
- 35
- 36
如果使用SpringBoot内置Tomcat,则不需要ServerApplicationConfig了,但是需要给Spring注册一个ServerEndpointConfig。
@Bean
public ServerEndpointConfig serverEndpointConfig() {
List<Class<? extends Decoder>> decoderList = new ArrayList<Class<? extends Decoder>>();
decoderList.add(MessageDecoder.class);
List<Class<? extends Encoder>> encoderList = new ArrayList<Class<? extends Encoder>>();
encoderList.add(MessageEncoder.class);
ServerEndpointConfig serverEndpointConfig = ServerEndpointConfig.Builder
.create(WebSocketServerEndpoint3.class, "/ws/test3/{userId}")
.decoders(decoderList)
.encoders(encoderList)
.configurator(new MyServerConfigurator())
.build();
return serverEndpointConfig;
}
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
- 13
- 14
(3)早期Tomcat7中Server端实现对比
Tomcat7早期版本7.0.47之前还没有出JSR 356时,自己搞了一套接口,其实就是一个Servlet。
和遵循JSR356标准的版本对比,有一个比较大的变化是,createWebSocketInbound创建生命周期事件处理器StreamInbound的时机是WebSocket协议升级之前,此时还可以通过用户线程缓存(ThreadLocal等)的HttpServletRequest对象,获取一些请求头等信息。
而遵循JSR356标准的版本实现,创建生命周期事件处理的Endpoint是在WebSocket协议升级完成(经过HTTP握手)之后创建的,而WebSocket握手成功给客户端响应101前,会结束销毁HttpServletRequest对象,此时是获取不到请求头等信息的。
import org.apache.catalina.websocket.StreamInbound;
import org.apache.catalina.websocket.WebSocketServlet;
import javax.servlet.annotation.WebServlet;
import javax.servlet.http.HttpServletRequest;
@WebServlet(urlPatterns = "/ws/test")
public class MyWeSocketServlet extends WebSocketServlet {
@Override
protected StreamInbound createWebSocketInbound(String subProtocol, HttpServletRequest request) {
MyMessageInbound messageInbound = new MyMessageInbound(subProtocol, request);
return messageInbound;
}
}
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
- 13
- 14
- 15
- 16
import org.apache.catalina.websocket.MessageInbound;
import org.apache.catalina.websocket.WsOutbound;
import javax.servlet.http.HttpServletRequest;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.CharBuffer;
public class MyMessageInbound extends MessageInbound {
private String subProtocol;
private HttpServletRequest request;
public MyMessageInbound(String subProtocol, HttpServletRequest request) {
this.subProtocol = subProtocol;
this.request = request;
}
@Override
protected void onOpen(WsOutbound outbound) {
String msg = "connected, hello";
ByteBuffer byteBuffer = ByteBuffer.wrap(msg.getBytes());
try {
outbound.writeBinaryMessage(byteBuffer);
} catch (IOException e) {
e.printStackTrace();
}
}
@Override
protected void onClose(int status) {
}
@Override
protected void onBinaryMessage(ByteBuffer byteBuffer) throws IOException {
// 接收到客户端信息
}
@Override
protected void onTextMessage(CharBuffer charBuffer) throws IOException {
// 接收到客户端信息
}
}
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
- 13
- 14
- 15
- 16
- 17
- 18
- 19
- 20
- 21
- 22
- 23
- 24
- 25
- 26
- 27
- 28
- 29
- 30
- 31
- 32
- 33
- 34
- 35
- 36
- 37
- 38
- 39
- 40
- 41
- 42
- 43
- 44
2、客户端实现
(1)前端js版
js版的客户端主要依托浏览器对WebScoket的支持,在生命周期事件触发上和服务器端的差不多,这也应证了建立WebSocket连接的两端是对等的。
编写WebSocket客户端需要注意以下几点:
- 和服务器端商议好传输的消息的格式,一般为json字符串,比较直观,编码解码都很简单,也可以是其他商定的格式。
- 需要心跳检测,定时给服务器端发送消息,保持连接正常。
- 正常关闭连接,即关闭浏览器窗口前主动关闭连接,以免服务器端抛异常。
- 如果因为异常断开连接,支持重连。
// 对websocket进行简单封装
WebSocketOption.prototype = {
// 创建websocket操作
createWebSocket: function () {
try {
if('WebSocket' in window) {
this.ws = new WebSocket(this.wsUrl);
} else if('MozWebSocket' in window) {
this.ws = new MozWebSocket(this.wsUrl);
} else {
alert("您的浏览器不支持websocket协议,建议使用新版谷歌、火狐等浏览器,请勿使用IE10以下浏览器,360浏览器请使用极速模式,不要使用兼容模式!");
}
this.lifeEventHandle();
} catch(e) {
this.reconnect(this.wsUrl);
console.log(e);
}
},
// 生命周期事件操作
lifeEventHandle: function() {
var self = this;
this.ws.onopen = function (event) {
self.connectCount = 1;
//心跳检测重置
if (self.heartCheck == null) {
self.heartCheck = new HeartCheckObj(self.ws);
}
self.sendMsg(5, "")
self.heartCheck.reset().start();
console.log("websocket连接成功!" + new Date().toUTCString());
};
this.ws.onclose = function (event) {
// 全部设置为初始值
self.heartCheck = null;
self.reconnect(self.wsUrl);
console.log("websocket连接关闭!" + new Date().toUTCString());
};
this.ws.onerror = function () {
self.reconnect(self.wsUrl);
console.log("websocket连接错误!");
};
//如果获取到消息,心跳检测重置
this.ws.onmessage = function (event) {
//心跳检测重置
if (self.heartCheck == null) {
self.heartCheck = new HeartCheckObj(self.ws);
}
self.heartCheck.reset().start();
console.log("websocket收到消息啦:" + event.data);
// 业务处理
// 接收到的消息可以放到localStorage里,然后在其他地方取出来
}
},
// 断线重连操作
reconnect: function() {
var self = this;
if (this.lockReconnect) return;
console.log(this.lockReconnect)
this.lockReconnect = true;
//没连接上会一直重连,设置延迟避免请求过多,重连时间设置按倍数增加
setTimeout(function () {
self.createWebSocket(self.wsUrl);
self.lockReconnect = false;
self.connectCount++;
}, 10000 * (self.connectCount));
},
// 发送消息操作
sendMsg: function(cmd, data) {
var sendData = {"cmd": cmd, "msg": data};
try {
this.ws.send(JSON.stringify(sendData));
} catch(err) {
console.log("发送数据失败, err=" + err)
}
},
// 关闭websocket接口操作
closeWs: function() {
this.ws.close();
}
}
/**
* 封装心跳检测对象
*/
function HeartCheckObj(ws) {
this.ws = ws;
// 心跳时间
this.timeout = 10000;
// 定时事件
this.timeoutObj = null;
// 自动断开事件
this.serverTimeoutObj = null;
}
HeartCheckObj.prototype = {
setWs: function(ws) {
this.ws = ws;
},
reset: function() {
clearTimeout(this.timeoutObj);
clearTimeout(this.serverTimeoutObj);
return this;
},
// 开始心跳检测
start: function() {
var self = this;
this.timeoutObj = setTimeout(function() {
//这里发送一个心跳,后端收到后,返回一个心跳消息,
//onmessage拿到返回的心跳就说明连接正常
var ping = {"cmd":1, "msg": "ping"};
self.ws.send(JSON.stringify(ping));
//如果onmessage那里超过一定时间还没重置,说明后端主动断开了
self.serverTimeoutObj = setTimeout(function() {
//如果onclose会执行reconnect,我们执行ws.close()就行了.如果直接执行reconnect 会触发onclose导致重连两次
self.ws.close();
}, self.timeout)
}, self.timeout)
}
}
/**
* -------------------------
* 创建websocket的主流程 *
* -------------------------
*/
var currentDomain = document.domain;
var wsUrl = "ws://" + currentDomain + "/test"
var webSocketOption = new WebSocketOption(wsUrl)
webSocketOption.createWebSocket()
// 监听窗口关闭事件,当窗口关闭时,主动去关闭websocket连接,防止连接还没断开就关闭窗口,server端会抛异常。
window.onbeforeunload = function() {
webSocketOption.closeWs();
}
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
- 13
- 14
- 15
- 16
- 17
- 18
- 19
- 20
- 21
- 22
- 23
- 24
- 25
- 26
- 27
- 28
- 29
- 30
- 31
- 32
- 33
- 34
- 35
- 36
- 37
- 38
- 39
- 40
- 41
- 42
- 43
- 44
- 45
- 46
- 47
- 48
- 49
- 50
- 51
- 52
- 53
- 54
- 55
- 56
- 57
- 58
- 59
- 60
- 61
- 62
- 63
- 64
- 65
- 66
- 67
- 68
- 69
- 70
- 71
- 72
- 73
- 74
- 75
- 76
- 77
- 78
- 79
- 80
- 81
- 82
- 83
- 84
- 85
- 86
- 87
- 88
- 89
- 90
- 91
- 92
- 93
- 94
- 95
- 96
- 97
- 98
- 99
- 100
- 101
- 102
- 103
- 104
- 105
- 106
- 107
- 108
- 109
- 110
- 111
- 112
- 113
- 114
- 115
- 116
- 117
- 118
- 119
- 120
- 121
- 122
- 123
- 124
- 125
- 126
- 127
- 128
- 129
- 130
- 131
- 132
- 133
- 134
这里推荐一个在线测试WebSocket连接和发送消息的网站easyswoole.com/wstool.html:
![]()
真的很牛逼,很方便,很简单。还有源码github:https://github.com/easy-swoole/wstool,感兴趣可以看看。
(2)@ClientEndpoint注解方式
Java版客户端不用多说,把@ServerEndpoint换成@ClientEndpoint就可以了,其他都一样。@ClientEndpoint比@ServerEndpoint就少了一个value,不需要设置URI。
@ClientEndpoint(encoders = {MessageEncoder.class}, decoders = {MessageDecoder.class})
public class WebSocketClientEndpoint {
private Session session;
@OnOpen
public void OnOpen(Session session) {
this.session = session;
Message message = new Message(0, "connecting...");
sendMsg(message);
}
@OnClose
public void OnClose() {
Message message = new Message(0, "client closed...");
sendMsg(message);
System.out.println("client closed");
}
@OnMessage
public void onMessage(Message message) {
System.out.println("client recive message=" + message.toString());
}
@OnError
public void onError(Throwable t) throws Throwable {
t.printStackTrace();
}
public void sendMsg(Message data) {
try {
this.session.getBasicRemote().sendObject(data);
} catch (IOException ioException) {
ioException.printStackTrace();
} catch (EncodeException e) {
e.printStackTrace();
}
}
public void sendMsgAsync(Message data) {
this.session.getAsyncRemote().sendObject(data);
}
}
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
- 13
- 14
- 15
- 16
- 17
- 18
- 19
- 20
- 21
- 22
- 23
- 24
- 25
- 26
- 27
- 28
- 29
- 30
- 31
- 32
- 33
- 34
- 35
- 36
- 37
- 38
- 39
- 40
- 41
- 42
- 43
连接服务器端:
WebSocketContainer container = ContainerProvider.getWebSocketContainer();
container.connectToServer(WebSocketClientEndpoint.class,
new URI("ws://localhost:8080/ws/test"));
- 1
- 2
- 3
(3)继承抽象类Endpoint方式
继承抽象类Endpoint方式也和服务器端的差不多,但是不需要实现ServerApplicationConfig,需要实例化一个ClientEndpointConfig。Endpoint实现类和服务器端的一样,就省略了,如下是连接服务器端的代码:
ClientEndpointConfig clientEndpointConfig = ClientEndpointConfig.Builder.create().build();
container.connectToServer(new WebSocketClientEndpoint(),clientEndpointConfig,
new URI("ws://localhost:8080/websocket/hello"));
- 1
- 2
- 3
3、基于Nginx反向代理注意事项
一般web服务器会用Nginx做反向代理,经过Nginx反向转发的HTTP请求不会带上Upgrade和Connection消息头,所以需要在Nginx配置里显式指定需要升级为WebSocket的URI带上这两个头:
location /chat/ {
proxy_pass http://backend;
proxy_http_version 1.1;
proxy_set_header Upgrade $http_upgrade;
proxy_set_header Connection "upgrade";
proxy_connect_timeout 4s;
proxy_read_timeout 7200s;
proxy_send_timeout 12s;
}
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
默认情况下,如果代理服务器在60秒内没有传输任何数据,连接将被关闭。这个超时可以通过proxy_read_timeout指令来增加。或者,可以将代理服务器配置为定期发送WebSocket PING帧以重置超时并检查连接是否仍然活跃。
具体可参考:http://nginx.org/en/docs/http/websocket.html
![]()
五、WebSocket在Tomcat中的源码实现
所有兼容Java EE的应用服务器,必须遵循JSR356 WebSocket Java API标准,Tomcat也不例外。而且Tomcat也是支持WebSocket最早的Web应用服务器框架(之一),在还没有出JSR356标准时,就已经自定义了一套WebSocket API,但是JSR356一出,不得不改弦更张。
通过前面的讲解,在使用上完全没有问题,但是有几个问题完全是黑盒的:
Server Endpoint是如何被扫描加载的?WebSocket是如何借助HTTP 进行握手升级的?WebSocket建立连接后如何保持连接不断,互相通信的?
(如下源码解析,需要对Tomcat连接器源码有一定了解)
1、WsSci初始化
Tomcat 提供了一个org.apache.tomcat.websocket.server.WsSci类来初始化、加载WebSocket。从类名上顾名思义,利用了Sci加载机制,何为Sci加载机制?就是实现接口 jakarta.servlet.ServletContainerInitializer,在Tomcat部署装载Web项目(org.apache.catalina.core.StandardContext#startInternal)时主动触发ServletContainerInitializer#onStartup,做一些扩展的初始化操作。
WsSci主要做了一件事,就是扫描加载Server Endpoint,并将其加到WebSocket容器里jakarta.websocket.WebSocketContainer。
WsSci主要会扫描三种类:
- 加了
@ServerEndpoint的类。 Endpoint的子类。ServerApplicationConfig的子类。
(1)WsSci#onStartup
@HandlesTypes({ServerEndpoint.class, ServerApplicationConfig.class,
Endpoint.class})
public class WsSci implements ServletContainerInitializer {
@Override
public void onStartup(Set<Class<?>> clazzes, ServletContext ctx)
throws ServletException {
WsServerContainer sc = init(ctx, true);
if (clazzes == null || clazzes.size() == 0) {
return;
}
// Group the discovered classes by type
Set<ServerApplicationConfig> serverApplicationConfigs = new HashSet<>();
Set<Class<? extends Endpoint>> scannedEndpointClazzes = new HashSet<>();
Set<Class<?>> scannedPojoEndpoints = new HashSet<>();
try {
// wsPackage is "jakarta.websocket."
String wsPackage = ContainerProvider.class.getName();
wsPackage = wsPackage.substring(0, wsPackage.lastIndexOf('.') + 1);
for (Class<?> clazz : clazzes) {
JreCompat jreCompat = JreCompat.getInstance();
int modifiers = clazz.getModifiers();
if (!Modifier.isPublic(modifiers) ||
Modifier.isAbstract(modifiers) ||
Modifier.isInterface(modifiers) ||
!jreCompat.isExported(clazz)) {
// Non-public, abstract, interface or not in an exported
// package (Java 9+) - skip it.
continue;
}
// Protect against scanning the WebSocket API JARs
// 防止扫描WebSocket API jar
if (clazz.getName().startsWith(wsPackage)) {
continue;
}
if (ServerApplicationConfig.class.isAssignableFrom(clazz)) {
// 1、clazz是ServerApplicationConfig子类
serverApplicationConfigs.add(
(ServerApplicationConfig) clazz.getConstructor().newInstance());
}
if (Endpoint.class.isAssignableFrom(clazz)) {
// 2、clazz是Endpoint子类
@SuppressWarnings("unchecked")
Class<? extends Endpoint> endpoint =
(Class<? extends Endpoint>) clazz;
scannedEndpointClazzes.add(endpoint);
}
if (clazz.isAnnotationPresent(ServerEndpoint.class)) {
// 3、clazz是加了注解ServerEndpoint的类
scannedPojoEndpoints.add(clazz);
}
}
} catch (ReflectiveOperationException e) {
throw new ServletException(e);
}
// Filter the results
Set<ServerEndpointConfig> filteredEndpointConfigs = new HashSet<>();
Set<Class<?>> filteredPojoEndpoints = new HashSet<>();
if (serverApplicationConfigs.isEmpty()) {
// 从这里看出@ServerEndpoint的服务器端是可以不用ServerApplicationConfig的
filteredPojoEndpoints.addAll(scannedPojoEndpoints);
} else {
// serverApplicationConfigs不为空,
for (ServerApplicationConfig config : serverApplicationConfigs) {
Set<ServerEndpointConfig> configFilteredEndpoints =
config.getEndpointConfigs(scannedEndpointClazzes);
if (configFilteredEndpoints != null) {
filteredEndpointConfigs.addAll(configFilteredEndpoints);
}
// getAnnotatedEndpointClasses 对于 scannedPojoEndpoints起到一个过滤作用
// 不满足条件的后面不加到WsServerContainer里
Set<Class<?>> configFilteredPojos =
config.getAnnotatedEndpointClasses(
scannedPojoEndpoints);
if (configFilteredPojos != null) {
filteredPojoEndpoints.addAll(configFilteredPojos);
}
}
}
try {
// 继承抽象类Endpoint的需要使用者手动封装成ServerEndpointConfig
// 而加了注解@ServerEndpoint的类 Tomcat会自动封装成ServerEndpointConfig
// Deploy endpoints
for (ServerEndpointConfig config : filteredEndpointConfigs) {
sc.addEndpoint(config);
}
// Deploy POJOs
for (Class<?> clazz : filteredPojoEndpoints) {
sc.addEndpoint(clazz, true);
}
} catch (DeploymentException e) {
throw new ServletException(e);
}
}
static WsServerContainer init(ServletContext servletContext,
boolean initBySciMechanism) {
WsServerContainer sc = new WsServerContainer(servletContext);
servletContext.setAttribute(
Constants.SERVER_CONTAINER_SERVLET_CONTEXT_ATTRIBUTE, sc);
// 注册监听器WsSessionListener给servletContext,
// 在http session销毁时触发 ws session的关闭销毁
servletContext.addListener(new WsSessionListener(sc));
// Can't register the ContextListener again if the ContextListener is
// calling this method
if (initBySciMechanism) {
// 注册监听器WsContextListener给servletContext,
// 在 servletContext初始化时触发WsSci.init
// 在 servletContext销毁时触发WsServerContainer的销毁
// 不过呢,只在WsSci.onStartup时注册一次
servletContext.addListener(new WsContextListener());
}
return sc;
}
}
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
- 13
- 14
- 15
- 16
- 17
- 18
- 19
- 20
- 21
- 22
- 23
- 24
- 25
- 26
- 27
- 28
- 29
- 30
- 31
- 32
- 33
- 34
- 35
- 36
- 37
- 38
- 39
- 40
- 41
- 42
- 43
- 44
- 45
- 46
- 47
- 48
- 49
- 50
- 51
- 52
- 53
- 54
- 55
- 56
- 57
- 58
- 59
- 60
- 61
- 62
- 63
- 64
- 65
- 66
- 67
- 68
- 69
- 70
- 71
- 72
- 73
- 74
- 75
- 76
- 77
- 78
- 79
- 80
- 81
- 82
- 83
- 84
- 85
- 86
- 87
- 88
- 89
- 90
- 91
- 92
- 93
- 94
- 95
- 96
- 97
- 98
- 99
- 100
- 101
- 102
- 103
- 104
- 105
- 106
- 107
- 108
- 109
- 110
- 111
- 112
- 113
- 114
- 115
- 116
- 117
- 118
- 119
- 120
- 121
- 122
- 123
- 124
- 125
从上述源码中可以看出ServerApplicationConfig起到一个过滤的作用:
- 当没有
ServerApplicationConfig时,加了@ServerEndpoint的类会默认全部加到一个Set集合(filteredPojoEndpoints),所以加了@ServerEndpoint的类可以不需要自定义实现ServerApplicationConfig。 - 当有
ServerApplicationConfig时,ServerApplicationConfig#getEndpointConfigs用来过滤Endpoint子类,并且Endpoint子类必须封装成一个ServerEndpointConfig。 ServerApplicationConfig#getAnnotatedEndpointClasses用来过滤加了注解@ServerEndpoint的类,一般空实现就行了(如果不想某个类被加到WsServerContainer里,那不加@ServerEndpoint不就可以了)。
过滤之后的Endpoint子类和加了注解@ServerEndpoint的类会分别调用不同形参的WsServerContainer#addEndpoint,将其加到WsServerContainer里。
(2)WsServerContainer#addEndpoint
- 将
Endpoint子类加到WsServerContainer里,调用的是形参为ServerEndpointConfig的addEndpoint:
public void addEndpoint(ServerEndpointConfig sec) throws DeploymentException {
addEndpoint(sec, false);
}
- 1
- 2
- 3
因为Endpoint子类需要使用者封装成ServerEndpointConfig,不需要Tomcat来封装。
- 将加了注解
@ServerEndpoint的类加到WsServerContainer,调用的是形参为Class>的addEndpoint(fromAnnotatedPojo参数暂时在这个方法里没什么用处):
该方法主要职责就是解析@ServerEndpoint,获取path、decoders、encoders、configurator等构建一个ServerEndpointConfig对象
![]()
最终调用的都是如下这个比较复杂的方法,fromAnnotatedPojo表示是否是加了@ServerEndpoint的类。主要做了两件事:
-
对加了
@ServerEndpoint类的生命周期方法(@OnOpen、@OnClose、@OnError、@OnMessage)的扫描和映射封装。 -
对
path的有效性检查和path param解析。
![]()
(3)PojoMethodMapping方法映射和形参解析
PojoMethodMapping构造函数比较长,主要是对加了@OnOpen、@OnClose、@OnError、@OnMessage的方法进行校验和映射,以及对每个方法的形参进行解析和校验,主要逻辑总结如下:
- 对当前类以及其父类中的方法进行扫描。
- 当前类中不能存在多个相同注解的方法,否则会抛出Duplicate annotation异常。
- 父类和子类中存在相同注解的方法,子类必须重写该方法,否则会抛出Duplicate annotation异常。
- 对于
@OnMessage,可以有多个,但是接收消息的类型必须不同,消息类型大概分为三种:PongMessage心跳消息、字节型、字符型。 - 如果扫描到对的注解都是父类的方法,子类重写了该方法,但是没有加响应的注解,则会被清除。
- 形参解析。
public PojoMethodMapping(Class<?> clazzPojo, List<Class<? extends Decoder>> decoderClazzes, String wsPath,
InstanceManager instanceManager) throws DeploymentException {
this.wsPath = wsPath;
List<DecoderEntry> decoders = Util.getDecoders(decoderClazzes, instanceManager);
Method open = null;
Method close = null;
Method error = null;
Method[] clazzPojoMethods = null;
Class<?> currentClazz = clazzPojo;
while (!currentClazz.equals(Object.class)) {
Method[] currentClazzMethods = currentClazz.getDeclaredMethods();
if (currentClazz == clazzPojo) {
clazzPojoMethods = currentClazzMethods;
}
for (Method method : currentClazzMethods) {
if (method.isSynthetic()) {
// Skip all synthetic methods.
// They may have copies of annotations from methods we are
// interested in and they will use the wrong parameter type
// (they always use Object) so we can't used them here.
continue;
}
if (method.getAnnotation(OnOpen.class) != null) {
checkPublic(method);
if (open == null) {
open = method;
} else {
if (currentClazz == clazzPojo ||
!isMethodOverride(open, method)) {
// Duplicate annotation
// 抛出Duplicate annotation异常的两种情况:
// 1. 当前的类有多个相同注解的方法,如有两个@OnOpen
// 2. 当前类时父类,有相同注解的方法,但是其子类没有重写这个方法
// 即 父类和子类有多个相同注解的方法,且没有重写关系
throw new DeploymentException(sm.getString(
"pojoMethodMapping.duplicateAnnotation",
OnOpen.class, currentClazz));
}
}
} else if (method.getAnnotation(OnClose.class) != null) {
checkPublic(method);
if (close == null) {
close = method;
} else {
if (currentClazz == clazzPojo ||
!isMethodOverride(close, method)) {
// Duplicate annotation
throw new DeploymentException(sm.getString(
"pojoMethodMapping.duplicateAnnotation",
OnClose.class, currentClazz));
}
}
} else if (method.getAnnotation(OnError.class) != null) {
checkPublic(method);
if (error == null) {
error = method;
} else {
if (currentClazz == clazzPojo ||
!isMethodOverride(error, method)) {
// Duplicate annotation
throw new DeploymentException(sm.getString(
"pojoMethodMapping.duplicateAnnotation",
OnError.class, currentClazz));
}
}
} else if (method.getAnnotation(OnMessage.class) != null) {
checkPublic(method);
MessageHandlerInfo messageHandler = new MessageHandlerInfo(method, decoders);
boolean found = false;
// 第一次扫描OnMessage时,onMessage为空,不会走下面的for,然后就把messageHandler加到onMessage里
// 如果非首次扫描到这里,即向上扫描父类,允许有多个接收消息类型完全不同的onmessage
for (MessageHandlerInfo otherMessageHandler : onMessage) {
// 如果多个onmessage接收的消息类型有相同的,则可能会抛出Duplicate annotation
// 1. 同一个类中多个onmessage有接收相同类型的消息
// 2. 父子类中多个onmessage有接收相同类型的消息,但不是重写关系
if (messageHandler.targetsSameWebSocketMessageType(otherMessageHandler)) {
found = true;
if (currentClazz == clazzPojo ||
!isMethodOverride(messageHandler.m, otherMessageHandler.m)) {
// Duplicate annotation
throw new DeploymentException(sm.getString(
"pojoMethodMapping.duplicateAnnotation",
OnMessage.class, currentClazz));
}
}
}
if (!found) {
onMessage.add(messageHandler);
}
} else {
// Method not annotated
}
}
currentClazz = currentClazz.getSuperclass();
}
// If the methods are not on clazzPojo and they are overridden
// by a non annotated method in clazzPojo, they should be ignored
if (open != null && open.getDeclaringClass() != clazzPojo) {
// open 有可能是父类的,子类即clazzPojo有重写该方法,但是没有加OnOpen注解
// 则 open置为null
if (isOverridenWithoutAnnotation(clazzPojoMethods, open, OnOpen.class)) {
open = null;
}
}
if (close != null && close.getDeclaringClass() != clazzPojo) {
if (isOverridenWithoutAnnotation(clazzPojoMethods, close, OnClose.class)) {
close = null;
}
}
if (error != null && error.getDeclaringClass() != clazzPojo) {
if (isOverridenWithoutAnnotation(clazzPojoMethods, error, OnError.class)) {
error = null;
}
}
List<MessageHandlerInfo> overriddenOnMessage = new ArrayList<>();
for (MessageHandlerInfo messageHandler : onMessage) {
if (messageHandler.m.getDeclaringClass() != clazzPojo
&& isOverridenWithoutAnnotation(clazzPojoMethods, messageHandler.m, OnMessage.class)) {
overriddenOnMessage.add(messageHandler);
}
}
// 子类重写了的onmessage方法,但没有加OnMessage注解的需要从onMessage list 中删除
for (MessageHandlerInfo messageHandler : overriddenOnMessage) {
onMessage.remove(messageHandler);
}
this.onOpen = open;
this.onClose = close;
this.onError = error;
// 参数解析
onOpenParams = getPathParams(onOpen, MethodType.ON_OPEN);
onCloseParams = getPathParams(onClose, MethodType.ON_CLOSE);
onErrorParams = getPathParams(onError, MethodType.ON_ERROR);
}
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
- 13
- 14
- 15
- 16
- 17
- 18
- 19
- 20
- 21
- 22
- 23
- 24
- 25
- 26
- 27
- 28
- 29
- 30
- 31
- 32
- 33
- 34
- 35
- 36
- 37
- 38
- 39
- 40
- 41
- 42
- 43
- 44
- 45
- 46
- 47
- 48
- 49
- 50
- 51
- 52
- 53
- 54
- 55
- 56
- 57
- 58
- 59
- 60
- 61
- 62
- 63
- 64
- 65
- 66
- 67
- 68
- 69
- 70
- 71
- 72
- 73
- 74
- 75
- 76
- 77
- 78
- 79
- 80
- 81
- 82
- 83
- 84
- 85
- 86
- 87
- 88
- 89
- 90
- 91
- 92
- 93
- 94
- 95
- 96
- 97
- 98
- 99
- 100
- 101
- 102
- 103
- 104
- 105
- 106
- 107
- 108
- 109
- 110
- 111
- 112
- 113
- 114
- 115
- 116
- 117
- 118
- 119
- 120
- 121
- 122
- 123
- 124
- 125
- 126
- 127
- 128
- 129
- 130
- 131
- 132
- 133
- 134
- 135
虽然方法名可以随意,但是形参却有着强制限制:
@onOpen方法,可以有的参数Session、EndpointConfig、@PathParam,不能有其他参数。@onError方法,可以有的参数Session、@PathParam, 必须有Throwable,不能有其他参数。@onClose方法,可以有的参数Session,CloseReason,@PathParam,不能有其他参数。
![]()
2、协议升级(握手)
Tomcat中WebSocket是通过UpgradeToken机制实现的,其具体的升级处理器为WsHttpUpgradeHandler。WebSocket协议升级的过程比较曲折,首先要通过过滤器WsFilter进行升级判断,然后调用org.apache.catalina.connector.Request#upgrade进行UpgradeToken的构建,最后通过org.apache.catalina.connector.Request#coyoteRequest回调函数action将UpgradeToken回传给连接器为后续升级处理做准备。
![]()
(1)WsFilter
WebSocket协议升级的过程比较曲折。带有WebSocket握手的请求会平安经过Tomcat的Connector,被转发到Servlet容器中,在业务处理之前经过过滤器WsFilter判断是否需要升级(WsFilter 在 org.apache.catalina.core.ApplicationFilterChain过滤链中触发):
- 首先判断
WsServerContainer是否有进行Endpoint的扫描和注册以及请头中是否有Upgrade: websocket。 - 获取请求
path即uri在WsServerContainer中找对应的ServerEndpointConfig。 - 调用
UpgradeUtil.doUpgrade进行升级。
![]()
(2)UpgradeUtil#doUpgrade
UpgradeUtil#doUpgrade主要做了如下几件事情:
- 检查
HttpServletRequest的一些请求头的有效性,如Connection: upgrade、Sec-WebSocket-Version:13、Sec-WebSocket-Key等。 - 给
HttpServletResponse设置一些响应头,如Upgrade:websocket、Connection: upgrade、根据Sec-WebSocket-Key的值生成响应头Sec-WebSocket-Accept的值。 - 封装
WsHandshakeRequest和WsHandshakeResponse。 - 调用
HttpServletRequest#upgrade进行升级,并获取WsHttpUpgradeHandler(具体的升级流程处理器)。
// org.apache.tomcat.websocket.server.UpgradeUtil#doUpgrade
public static void doUpgrade(WsServerContainer sc, HttpServletRequest req,
HttpServletResponse resp, ServerEndpointConfig sec,
Map<String,String> pathParams)
throws ServletException, IOException {
// Validate the rest of the headers and reject the request if that
// validation fails
String key;
String subProtocol = null;
// 检查请求头中是否有 Connection: upgrade
if (!headerContainsToken(req, Constants.CONNECTION_HEADER_NAME,
Constants.CONNECTION_HEADER_VALUE)) {
resp.sendError(HttpServletResponse.SC_BAD_REQUEST);
return;
}
// 检查请求头中的 Sec-WebSocket-Version:13
if (!headerContainsToken(req, Constants.WS_VERSION_HEADER_NAME,
Constants.WS_VERSION_HEADER_VALUE)) {
resp.setStatus(426);
resp.setHeader(Constants.WS_VERSION_HEADER_NAME,
Constants.WS_VERSION_HEADER_VALUE);
return;
}
// 获取 Sec-WebSocket-Key
key = req.getHeader(Constants.WS_KEY_HEADER_NAME);
if (key == null) {
resp.sendError(HttpServletResponse.SC_BAD_REQUEST);
return;
}
// Origin check,校验 Origin 是否有权限
String origin = req.getHeader(Constants.ORIGIN_HEADER_NAME);
if (!sec.getConfigurator().checkOrigin(origin)) {
resp.sendError(HttpServletResponse.SC_FORBIDDEN);
return;
}
// Sub-protocols
List<String> subProtocols = getTokensFromHeader(req,
Constants.WS_PROTOCOL_HEADER_NAME);
subProtocol = sec.getConfigurator().getNegotiatedSubprotocol(
sec.getSubprotocols(), subProtocols);
// Extensions
// Should normally only be one header but handle the case of multiple
// headers
List<Extension> extensionsRequested = new ArrayList<>();
Enumeration<String> extHeaders = req.getHeaders(Constants.WS_EXTENSIONS_HEADER_NAME);
while (extHeaders.hasMoreElements()) {
Util.parseExtensionHeader(extensionsRequested, extHeaders.nextElement());
}
// Negotiation phase 1. By default this simply filters out the
// extensions that the server does not support but applications could
// use a custom configurator to do more than this.
List<Extension> installedExtensions = null;
if (sec.getExtensions().size() == 0) {
installedExtensions = Constants.INSTALLED_EXTENSIONS;
} else {
installedExtensions = new ArrayList<>();
installedExtensions.addAll(sec.getExtensions());
installedExtensions.addAll(Constants.INSTALLED_EXTENSIONS);
}
List<Extension> negotiatedExtensionsPhase1 = sec.getConfigurator().getNegotiatedExtensions(
installedExtensions, extensionsRequested);
// Negotiation phase 2. Create the Transformations that will be applied
// to this connection. Note than an extension may be dropped at this
// point if the client has requested a configuration that the server is
// unable to support.
List<Transformation> transformations = createTransformations(negotiatedExtensionsPhase1);
List<Extension> negotiatedExtensionsPhase2;
if (transformations.isEmpty()) {
negotiatedExtensionsPhase2 = Collections.emptyList();
} else {
negotiatedExtensionsPhase2 = new ArrayList<>(transformations.size());
for (Transformation t : transformations) {
negotiatedExtensionsPhase2.add(t.getExtensionResponse());
}
}
// Build the transformation pipeline
Transformation transformation = null;
StringBuilder responseHeaderExtensions = new StringBuilder();
boolean first = true;
for (Transformation t : transformations) {
if (first) {
first = false;
} else {
responseHeaderExtensions.append(',');
}
append(responseHeaderExtensions, t.getExtensionResponse());
if (transformation == null) {
transformation = t;
} else {
transformation.setNext(t);
}
}
// Now we have the full pipeline, validate the use of the RSV bits.
if (transformation != null && !transformation.validateRsvBits(0)) {
throw new ServletException(sm.getString("upgradeUtil.incompatibleRsv"));
}
// 设置resp的响应头Upgrade:websocket、 Connection: upgrade 、Sec-WebSocket-Accept:
// If we got this far, all is good. Accept the connection.
resp.setHeader(Constants.UPGRADE_HEADER_NAME,
Constants.UPGRADE_HEADER_VALUE);
resp.setHeader(Constants.CONNECTION_HEADER_NAME,
Constants.CONNECTION_HEADER_VALUE);
// 通过Sec-WebSocket-Key生成Sec-WebSocket-Accept的值
resp.setHeader(HandshakeResponse.SEC_WEBSOCKET_ACCEPT,
getWebSocketAccept(key));
if (subProtocol != null && subProtocol.length() > 0) {
// RFC6455 4.2.2 explicitly states "" is not valid here
resp.setHeader(Constants.WS_PROTOCOL_HEADER_NAME, subProtocol);
}
if (!transformations.isEmpty()) {
resp.setHeader(Constants.WS_EXTENSIONS_HEADER_NAME, responseHeaderExtensions.toString());
}
WsHandshakeRequest wsRequest = new WsHandshakeRequest(req, pathParams);
WsHandshakeResponse wsResponse = new WsHandshakeResponse();
WsPerSessionServerEndpointConfig perSessionServerEndpointConfig =
new WsPerSessionServerEndpointConfig(sec);
sec.getConfigurator().modifyHandshake(perSessionServerEndpointConfig,
wsRequest, wsResponse);
wsRequest.finished();
// Add any additional headers
for (Entry<String,List<String>> entry :
wsResponse.getHeaders().entrySet()) {
for (String headerValue: entry.getValue()) {
resp.addHeader(entry.getKey(), headerValue);
}
}
// 调用 request.upgrade 进行升级
WsHttpUpgradeHandler wsHandler =
req.upgrade(WsHttpUpgradeHandler.class);
wsHandler.preInit(perSessionServerEndpointConfig, sc, wsRequest,
negotiatedExtensionsPhase2, subProtocol, transformation, pathParams,
req.isSecure());
}
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
- 13
- 14
- 15
- 16
- 17
- 18
- 19
- 20
- 21
- 22
- 23
- 24
- 25
- 26
- 27
- 28
- 29
- 30
- 31
- 32
- 33
- 34
- 35
- 36
- 37
- 38
- 39
- 40
- 41
- 42
- 43
- 44
- 45
- 46
- 47
- 48
- 49
- 50
- 51
- 52
- 53
- 54
- 55
- 56
- 57
- 58
- 59
- 60
- 61
- 62
- 63
- 64
- 65
- 66
- 67
- 68
- 69
- 70
- 71
- 72
- 73
- 74
- 75
- 76
- 77
- 78
- 79
- 80
- 81
- 82
- 83
- 84
- 85
- 86
- 87
- 88
- 89
- 90
- 91
- 92
- 93
- 94
- 95
- 96
- 97
- 98
- 99
- 100
- 101
- 102
- 103
- 104
- 105
- 106
- 107
- 108
- 109
- 110
- 111
- 112
- 113
- 114
- 115
- 116
- 117
- 118
- 119
- 120
- 121
- 122
- 123
- 124
- 125
- 126
- 127
- 128
- 129
- 130
- 131
- 132
- 133
- 134
- 135
- 136
- 137
- 138
- 139
- 140
- 141
- 142
- 143
- 144
(3)Request#upgrade
Request#upgrade主要做了三件事:
- 实例化
WsHttpUpgradeHandler并构建UpgradeToken。 - 回调
coyoteRequest.action,将UpgradeToken回传给连接器。 - 设置响应码101。
// org.apache.catalina.connector.Request#upgrade
public <T extends HttpUpgradeHandler> T upgrade(
Class<T> httpUpgradeHandlerClass) throws java.io.IOException, ServletException {
T handler;
InstanceManager instanceManager = null;
try {
// Do not go through the instance manager for internal Tomcat classes since they don't
// need injection
if (InternalHttpUpgradeHandler.class.isAssignableFrom(httpUpgradeHandlerClass)) {
handler = httpUpgradeHandlerClass.getConstructor().newInstance();
} else {
instanceManager = getContext().getInstanceManager();
handler = (T) instanceManager.newInstance(httpUpgradeHandlerClass);
}
} catch (ReflectiveOperationException | NamingException | IllegalArgumentException |
SecurityException e) {
throw new ServletException(e);
}
// 构建 UpgradeToken,UpgradeToken主要包含WsHttpUpgradeHandler、context、协议名称protocol
UpgradeToken upgradeToken = new UpgradeToken(handler, getContext(), instanceManager,
getUpgradeProtocolName(httpUpgradeHandlerClass));
// 回调action 进行升级
coyoteRequest.action(ActionCode.UPGRADE, upgradeToken);
// Output required by RFC2616. Protocol specific headers should have
// already been set.
// 设置响应101
response.setStatus(HttpServletResponse.SC_SWITCHING_PROTOCOLS);
return handler;
}
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
- 13
- 14
- 15
- 16
- 17
- 18
- 19
- 20
- 21
- 22
- 23
- 24
- 25
- 26
- 27
- 28
- 29
- 30
- 31
(4)回调机制ActionHook#action
一些发生在Servlet容器的动作可能需要回传给连接器做处理,比如WebSocket的握手升级,所以连接器就给org.apache.coyote.Request设置了一个动作钩子``ActionHook#action。一些动作表示定义在枚举类ActionCode中,ActionCode.UPGRADE就代表协议升级动作。org.apache.coyote.AbstractProcessor实现了ActionHook接口,ActionCode.UPGRADE动作会调用org.apache.coyote.http11.Http11Processor#doHttpUpgrade,只是简单将upgradeToken设置给Http11Processor`。
![]()
![]()
(5)ConnectionHandler#process
Tomcat连接器是同步调用容器业务处理,容器中的业务处理结束后还是回到连接器继续往下执行。
连接器将请求转发给容器处理是在适配器里完成的,容器中流程处理结束返回到org.apache.catalina.connector.CoyoteAdapter#service,继续往下执行,最终结束并回收HttpServletrequest、HttpServletreponse对象。
![]()
org.apache.catalina.connector.CoyoteAdapter#service是在org.apache.coyote.http11.Http11Processor#service中调用的,
Http11Processor#service是HTTP请求处理主流程,通过upgradeToken != null来判断是否为升级操作,s是则返回SocketState.UPGRADING。
最后来到org.apache.coyote.AbstractProtocol.ConnectionHandler#process一个连接处理的主流程,根据Http11Processor#service返回SocketState.UPGRADING来进行升级操作,如下只截取了和WebSocket协议升级相关流程的代码:
- 获取
UpgradeToken,从中取出HttpUpgradeHandler,对于WebSocket来说是WsHttpUpgradeHandler。 - 调用
WsHttpUpgradeHandler#init启动协议升级处理。
![]()
(6)WsHttpUpgradeHandler#init握手成功
走到这里,基本上就是握手成功了,接下来就是创建WsSession和触发onOpen。
![]()
WsSession的构建中会实例化Endpoint,如果实例化出来的对象不是Endpoint类型,即加了@ServerEndpoint的实例对象,则用一个PojoEndpointServer进行包装,而PojoEndpointServer是继承了抽象类Endpoint的。
触发onOpen时会将WsSession传进去,对于加PojoEndpointServer,因为用户自定义的方法名和形参不确定,所以通过反射调用用户自定义的onopen形式的方法,并且会将通过@onMessage解析出的MessageHandler设置给WsSession。
![]()
3、数据传输和解析
握手成功之后就建立了双向通信的连接,该连接有别于HTTP/1.1长连接(应用服务器中工作线程循环占用),而是占用一条TCP连接。在连接建立是进行TCP三次握手,之后全双工互相通信,将不需要再进行耗时的TCP的三次握手和四次挥手,一方需要关闭WebSocket连接时,发送关闭帧,另一方接收到关闭帧之后,也发送个关闭帧作为响应,之后就认为WebSocket连接关闭了,并且关闭底层TCP连接(四次挥手)。
实则WebSocket全双工是建立在TCP的长链接上的,TCP长链接长时间没有消息通信,会定时保活,一般WebSocket会通过代理如nginx等进行连接通信,nginx有一个连接超时没有任何信息传输时,会断开,所以需要WebSocket一端定时发送心跳保活。
(1)接收客户端消息
客户端来了消息,由连接器的Poller轮询监测socket底层是否有数据到来,有数据可读,则封装成一个SocketProcessor扔到线程池里处理,org.apache.coyote.http11.upgrade.UpgradeProcessorInternal#dispatch具有处理升级协议连接,org.apache.tomcat.websocket.server.WsHttpUpgradeHandler#upgradeDispatch是专门处理WebSocket连接的处理器。
org.apache.tomcat.websocket.server.WsFrameServer是对服务器端消息帧处理的封装,包括读取底层数据,按消息帧格式解析、拼装出有效载荷数据,触发onMessage。
因为源码篇幅较多,只展示具体源码调用流程:
![]()
(2)发送消息给客户端
一般,客户端发送WebSocket握手请求,和服务器端建立连接后,服务器端需要将连接(Endpoint+WsSession)保存起来,为后续主动推送消息给客户端提供方便。
Tomcat提供了可以发送三种数据类型(文本、二进制、Object对象)和两种发送方式(同步、异步)的发送消息的方法。
org.apache.tomcat.websocket.WsRemoteEndpointAsync异步发送。org.apache.tomcat.websocket.WsRemoteEndpointBasic同步发送。
发送消息也同样需要按消息帧格式封装,然后通过socket写到网络里即可。
![]()
六、要点回顾
WebSocket的出现不是空穴来风,起初在HTTP/1.1基础上通过轮询和长连接达到信息实时同步的功能,但是这并没有跳出HTTP/1.1自身的缺陷。HTTP/1.1明显的两个缺陷:消息头冗长且为文本传输,请求响应模式。为此,WebSocket诞生了,跳出HTTP/1.1,建立一个新的真正全双工通信协议。
不仅仅要会在项目中使用WebSocket,还要知道其通信原理和在应用服务器中的实现原理,很多注意事项都是在查阅了官方资源和源码之后恍然大悟的。
- 在Tomcat中使用
WebSocket不可以在Endpoint里获取缓存的HttpServletRequest对象,因为在WebSocket握手之前,HTTP/1.1请求就算结束了(HttpServletRequest对象被回收),建立连接之后就更是独立于HTTP/1.1了。 - 建立连接的
WebSocket,会生成新的Endpoint和WsSession。 - 使用内置Tomcat需要注意,
WsSci做的事情交给了Spring做。 WebSocket全双工是建立在TCP长连接的基础之上。- … …
![]()
七、参考文献
- https://datatracker.ietf.org/doc/html/rfc6455(可能需要翻墙)
- https://www.oracle.com/technical-resources/articles/java/jsr356.html
- https://medium.com/swlh/websockets-with-spring-part-1-http-and-websocket-36c69df1c2ee(可能需要翻墙)
- http://nginx.org/en/docs/http/websocket.html
- https://zh.wikipedia.org/wiki/WebSocket
- 书籍:《Tomcat架构解析》刘光瑞(Tomcat8.5)11.3.4 Tomcat的WebSocket实现
- 书籍:《Tomcat内核设计剖析》汪建(Tomcat7)10.6 WebSocket协议的支持
- 书籍:《图解HTTP》9.3 使用浏览器进行全双工通信的WebSocket
- 极客时间:《深入拆解Tomcat & Jetty》李号双(Tomcat9.x)18.新特性:Tomcat如何支持WebSocket?
- Tomcat注释源码:https://gitee.com/stefanpy/tomcat-source-code-learning
如若文章有错误理解,欢迎批评指正,同时非常期待你的留言和点赞。如果觉得有用,不妨点个在看,让更多人受益。
评论记录:
回复评论: