首页 最新 热门 推荐

  • 首页
  • 最新
  • 热门
  • 推荐

WebSocket通信原理和在Tomcat中实现源码详解(万字爆肝)

  • 25-02-22 03:01
  • 2067
  • 5897
blog.csdn.net

首发CSDN:徐同学呀,原创不易,转载请注明源链接。我是徐同学,用心输出高质量文章,希望对你有所帮助。 本篇基于Tomcat10.0.6。建议收藏起来慢慢看。

文章目录

    • 一、前言
    • 二、什么是WebSocket
      • 1、HTTP/1.1的缺陷
      • 2、WebSocket发展历史
        • (1)背景
        • (2)历史
      • 3、WebSocket握手和双向通信
        • (1)定义
        • (2)握手(建立连接)
        • (3)消息帧
        • (4)挥手(关闭连接)
      • 4、WebSocket优点
    • 三、Java API for WebSocket(JSR356)
      • 1、服务端API
        • (1)注解方式@ServerEndpoint
        • (2)继承抽象类Endpoint
      • 2、客户端API
      • 3、上下文Session
      • 4、HandshakeRequest 和 HandshakeResponse
        • (1)HandshakeRequest
        • (2)HandshakeResponse
      • 5、WebSocketContainer
    • 四、WebSocket基于Tomcat应用
      • 1、服务器端实现
        • (1)@ServerEndpoint注解方式
        • (2)继承抽象类Endpoint方式
        • (3)早期Tomcat7中Server端实现对比
      • 2、客户端实现
        • (1)前端js版
        • (2)@ClientEndpoint注解方式
        • (3)继承抽象类Endpoint方式
      • 3、基于Nginx反向代理注意事项
    • 五、WebSocket在Tomcat中的源码实现
      • 1、WsSci初始化
        • (1)WsSci#onStartup
        • (2)WsServerContainer#addEndpoint
        • (3)PojoMethodMapping方法映射和形参解析
      • 2、协议升级(握手)
        • (1)WsFilter
        • (2)UpgradeUtil#doUpgrade
        • (3)Request#upgrade
        • (4)回调机制ActionHook#action
        • (5)ConnectionHandler#process
        • (6)WsHttpUpgradeHandler#init握手成功
      • 3、数据传输和解析
        • (1)接收客户端消息
        • (2)发送消息给客户端
    • 六、要点回顾
    • 七、参考文献

一、前言

WebSocket是一种全双工通信协议,即客户端可以向服务端发送请求,服务端也可以主动向客户端推送数据。这样的特点,使得它在一些实时性要求比较高的场景效果斐然(比如微信朋友圈实时通知、在线协同编辑等)。主流浏览器以及一些常见服务端通信框架(Tomcat、netty、undertow、webLogic等)都对WebSocket进行了技术支持。那么,WebSocket具体是什么?为什么会出现WebSocket?如何做到全双工通信?解决了什么问题?

灿烂而孤独的神

二、什么是WebSocket

1、HTTP/1.1的缺陷

HTTP/1.1最初是为网络中超文本资源(HTML),请求-响应传输而设计的,后来支持了传输更多类型的资源,如图片、视频等,但都没有改变它单向的请求-响应模式。

随着互联网的日益壮大,HTTP/1.1功能使用上已体现捉襟见肘的疲态。虽然可以通过某些方式满足需求(如Ajax、Comet),但是性能上还是局限于HTTP/1.1,那么HTTP/1.1有哪些缺陷呢:

  • 请求-响应模式,只能客户端发送请求给服务端,服务端才可以发送响应数据给客户端。
  • 传输数据为文本格式,且请求/响应头部冗长重复。

(为了区分HTTP/1.1和HTTP/1.2,下面描述中,HTTP均代表HTTP/1.1)

2、WebSocket发展历史

(1)背景

在WebSocket出现之前,主要通过长轮询和HTTP长连接实现实时数据更新,这种方式有个统称叫Comet,Tomcat8.5之前有对Comet基于流的HTTP长连接做支持,后来因为WebSocket的成熟和标准化,以及Comet自身依然是基于HTTP,在性能消耗和瓶颈上无法跳脱HTTP,就把Comet废弃了。

还有一个SPDY技术,也对HTTP进行了改进,多路复用流、服务器推送等,后来演化成HTTP/2.0,因为适用场景和解决的问题不同,暂不对HTTP/2.0做过多解释,不过对于HTTP/2.0和WebSocket在Tomcat实现中都是作为协议升级来处理的。

(Comet和SPDY的原理不是本篇重点,没有展开讲解,感兴趣的同学可自行百度)

(2)历史

在这种背景下,HTML5制定了WebSocket

  • 筹备阶段,WebSocket被划分为HTML5标准的一部分,2008年6月,Michael Carter进行了一系列讨论,最终形成了称为WebSocket的协议。
  • 2009年12月,Google Chrome 4是第一个提供标准支持的浏览器,默认情况下启用了WebSocket。
  • 2010年2月,WebSocket协议的开发从W3C和WHATWG小组转移到IETF(TheInternet Engineering Task Force),并在Ian Hickson的指导下进行了两次修订。
  • 2011年,IETF将WebSocket协议标准化为RFC 6455起,大多数Web浏览器都在实现支持WebSocket协议的客户端API。此外,已经开发了许多实现WebSocket协议的Java库。
  • 2013年,发布JSR356标准,Java API for WebSocket。

(为什么要去了解WebSocket的发展历史和背景呢?个人认为可以更好的理解某个技术实现的演变历程,比如Tomcat,早期有Comet没有WebSocket时,Tomcat就对Comet做了支持,后来有WebSocket了,但是还没出JSR356标准,Tomcat就对Websocket做了支持,自定义API,再后来有了JSR356,Tomcat立马紧跟潮流,废弃自定义的API,实现JSR356那一套,这就使得在Tomcat7使用WebSocket的同学,想升为Tomcat8(其实Tomcat7.0.47之后就是JSR356标准了),发现WebSocket接入方式变了,而且一些细节也变了。)

3、WebSocket握手和双向通信

(1)定义

WebSocket全双工通信协议,在客户端和服务端建立连接后,可以持续双向通信,和HTTP同属于应用层协议,并且都依赖于传输层的TCP/IP协议。

虽然WebSocket有别于HTTP,是一种新协议,但是RFC 6455中规定:

it is designed to work over HTTP ports 80 and 443 as well as to support HTTP proxies and intermediaries.

  • WebSocket通过HTTP端口80和443进行工作,并支持HTTP代理和中介,从而使其与HTTP协议兼容。
  • 为了实现兼容性,WebSocket握手使用HTTP Upgrade头从HTTP协议更改为WebSocket协议。
  • Websocket使用ws或wss的统一资源标志符(URI),分别对应明文和加密连接。
(2)握手(建立连接)

在双向通信之前,必须通过握手建立连接。Websocket通过 HTTP/1.1 协议的101状态码进行握手,首先客户端(如浏览器)发出带有特殊消息头(Upgrade、Connection)的请求到服务器,服务器判断是否支持升级,支持则返回响应状态码101,表示协议升级成功,对于WebSocket就是握手成功。

客户端请求示例:

GET /test HTTP/1.1
Host: server.example.com
Upgrade: websocket
Connection: Upgrade
Sec-WebSocket-Key: tFGdnEL/5fXMS9yKwBjllg==
Origin: http://example.com
Sec-WebSocket-Protocol: v10.stomp, v11.stomp, v12.stomp
Sec-WebSocket-Extensions: permessage-deflate; client_max_window_bits
Sec-WebSocket-Version: 13
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • Connection必须设置Upgrade,表示客户端希望连接升级。
  • Upgrade: websocket表明协议升级为websocket。
  • Sec-WebSocket-Key字段内记录着握手过程中必不可少的键值,由客户端(浏览器)生成,可以尽量避免普通HTTP请求被误认为Websocket协议。
  • Sec-WebSocket-Version 表示支持的Websocket版本。RFC6455要求使用的版本是13。
  • Origin字段是必须的。如果缺少origin字段,WebSocket服务器需要回复HTTP 403 状态码(禁止访问),通过Origin可以做安全校验。

服务端响应示例:

HTTP/1.1 101 Switching Protocols
Upgrade: websocket
Connection: Upgrade
Sec-WebSocket-Accept: HaA6EjhHRejpHyuO0yBnY4J4n3A=
Sec-WebSocket-Extensions: permessage-deflate;client_max_window_bits=15
Sec-WebSocket-Protocol: v12.stomp
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6

Sec-WebSocket-Accept的字段值是由握手请求中的Sec-WebSocket-Key的字段值生成的。成功握手确立WebSocket连接之后,通信时不再使用HTTP的数据帧,而采用WebSocket独立的数据帧。

WebSocket通信-来自图解HTTP

(3)消息帧

WebSocket使用二进制消息帧作为双向通信的媒介。何为消息帧?发送方将每个应用程序消息拆分为一个或多个帧,通过网络将它们传输到目的地,并重新组装解析出一个完整消息。

有别于HTTP/1.1文本消息格式(冗长的消息头和分隔符等),WebSocket消息帧规定一定的格式,以二进制传输,更加短小精悍。二者相同之处就是都是基于TCP/IP流式协议(没有规定消息边界)。

如下是消息帧的基本结构图:

Base Framing Protocol

  • FIN: 1 bit,表示该帧是否为消息的最后一帧。1-是,0-否。
  • RSV1,RSV2,RSV3: 1 bit each,预留(3位),扩展的预留标志。一般情况为0,除非协商的扩展定义为非零值。如果接收到非零值且不为协商扩展定义,接收端必须使连接失败。
  • Opcode: 4 bits,定义消息帧的操作类型,如果接收到一个未知Opcode,接收端必须使连接失败。(0x0-延续帧,0x1-文本帧,0x2-二进制帧,0x8-关闭帧,0x9-PING帧,0xA-PONG帧(在接收到PING帧时,终端必须发送一个PONG帧响应,除非它已经接收到关闭帧),0x3-0x7保留给未来的非控制帧,0xB-F保留给未来的控制帧)
  • Mask: 1 bit,表示该帧是否为隐藏的,即被加密保护的。1-是,0-否。Mask=1时,必须传一个Masking-key,用于解除隐藏(客户端发送消息给服务器端,Mask必须为1)。
  • Payload length: 7 bits, 7+16 bits, or 7+64 bits,有效载荷数据的长度(扩展数据长度+应用数据长度,扩展数据长度可以为0)。

if 0-125, that is the payload length. If 126, the following 2 bytes interpreted as a 16-bit unsigned integer are the payload length. If 127, the following 8 bytes interpreted as a 64-bit unsigned integer (the most significant bit MUST be 0) are the payload length.

  • Masking-key: 0 or 4 bytes,用于解除帧隐藏(加密)的key,Mask=1时不为空,Mask=0时不用传。
  • Payload data: (x+y) bytes,有效载荷数据包括扩展数据(x bytes)和应用数据(y bytes)。有效载荷数据是用户真正要传输的数据。

这样的二进制消息帧设计,与HTTP协议相比,WebSocket协议可以提供约500:1的流量减少和3:1的延迟减少。

(4)挥手(关闭连接)

挥手相对于握手要简单很多,客户端和服务器端任何一方都可以通过发送关闭帧来发起挥手请求。发送关闭帧的一方,之后不再发送任何数据给对方;接收到关闭帧的一方,如果之前没有发送过关闭帧,则必须发送一个关闭帧作为响应。关闭帧中可以携带关闭原因。

在发送和接收一个关闭帧消息之后,就认为WebSocket连接已关闭,且必须关闭底层TCP连接。

除了通过关闭握手来关闭连接外,WebSocket连接也可能在另一方离开或底层TCP连接关闭时突然关闭。

4、WebSocket优点

  • 较少的控制开销。在连接建立后,服务器和客户端之间交换数据时,用于协议控制的数据包头部相对于HTTP请求每次都要携带完整的头部,显著减少。

  • 更强的实时性。由于协议是全双工的,所以服务器可以随时主动给客户端下发数据。相对于HTTP请求需要等待客户端发起请求服务端才能响应,延迟明显更少。

  • 保持连接状态。与HTTP不同的是,Websocket需要先建立连接,这就使得其成为一种有状态的协议,之后通信时可以省略部分状态信息。而HTTP请求可能需要在每个请求都携带状态信息(如身份认证等)。

  • 更好的二进制支持。Websocket定义了二进制帧,相对HTTP,可以更轻松地处理二进制内容。

  • 支持扩展。Websocket定义了扩展,用户可以扩展协议、实现部分自定义的子协议。

  • 更好的压缩效果。相对于HTTP压缩,Websocket在适当的扩展支持下,可以沿用之前内容的上下文,在传递类似的数据时,可以显著提高压缩率。

鬼怪

三、Java API for WebSocket(JSR356)

JSR356在Java EE7时归为Java EE标准的一部分(后来Java EE更名为Jakarta EE,世上再无Java EE,以下统一称Jakarta EE),所有兼容Jakarta EE的应用服务器,都必须遵循JSR356标准的WebSocket协议API。

WebSocket API

根据JSR356规定, 建立WebSocket连接的服务器端和客户端,两端对称,可以互相通信,差异性较小,抽象成API,就是一个个Endpoint(端点),只不过服务器端的叫ServerEndpoint,客户端的叫ClientEndpoint。客户端向服务端发送WebSocket握手请求,建立连接后就创建一个ServerEndpoint对象。(这里的Endpoint和Tomcat连接器里的AbstractEndpoint名称上有点像,但是两个毫不相干的东西,就像周杰伦和周杰的关系。)

ServerEndpoint和ClientEndpoint在API上差异也很小,有相同的生命周期事件(OnOpen、OnClose、OnError、OnMessage),不同之处是ServerEndpoint作为服务器端点,可以指定一个URI路径供客户端连接,ClientEndpoint没有。

1、服务端API

服务器端的Endpoint有两种实现方式,一种是注解方式@ServerEndpoint,一种是继承抽象类Endpoint。

(1)注解方式@ServerEndpoint

首先看看@ServerEndpoint有哪些要素:

  • value,可以指定一个URI路径标识一个Endpoint。
  • subprotocols,用户在WebSocket协议下自定义扩展一些子协议。
  • decoders,用户可以自定义一些消息解码器,比如通信的消息是一个对象,接收到消息可以自动解码封装成消息对象。
  • encoders,有解码器就有编码器,定义解码器和编码器的好处是可以规范使用层消息的传输。
  • configurator,ServerEndpoint配置类,主要提供ServerEndpoint对象的创建方式扩展(如果使用Tomcat的WebSocket实现,默认是反射创建ServerEndpoint对象)。

ServerEndpoint

@ServerEndpoint可以注解到任何类上,但是想实现服务端的完整功能,还需要配合几个生命周期的注解使用,这些生命周期注解只能注解在方法上:

  • @OnOpen 建立连接时触发。
  • @OnClose 关闭连接时触发。
  • @OnError 发生异常时触发。
  • @OnMessage 接收到消息时触发。
(2)继承抽象类Endpoint

继承抽象类Endpoint,重写几个生命周期方法。

抽象类Endpoint

怎么没有onMessage方法,实现onMessage还需要继承实现一个接口jakarta.websocket.MessageHandler,MessageHandler接口又分为Partial和Whole,实现的MessageHandler需要在onOpen触发时注册到jakarta.websocket.Session中。

MessageHandler

继承抽象类Endpoint的方式相对于注解方式要麻烦的多,除了继承Endpoint和实现接口MessageHandler外,还必须实现一个jakarta.websocket.server.ServerApplicationConfig来管理Endpoint,比如给Endpoint分配URI路径。

ServerApplicationConfig

而encoders、decoders、configurator等配置信息由jakarta.websocket.server.ServerEndpointConfig管理,默认实现jakarta.websocket.server.DefaultServerEndpointConfig。

所以如果使用 Java 版WebSocket服务器端实现首推注解方式。

2、客户端API

对于客户端API,也是有注解方式和继承抽象类Endpoint方式。

  • 注解方式,只需要将@ServerEndpoint换成@ClientEndpoint。
  • 继承抽象类Endpoint方式,需要一个jakarta.websocket.ClientEndpointConfig来管理encoders、decoders、configurator等配置信息,默认实现jakarta.websocket.DefaultClientEndpointConfig。

3、上下文Session

WebSocket是一个有状态的连接,建立连接后的通信都是通过jakarta.websocket.Session保持状态,一个连接一个Session,每一个Session有一个唯一标识Id。

Session的主要职责涉及:

  • 基础信息管理(request信息(getRequestURI、getRequestParameterMap、getPathParameters等)、协议版本getProtocolVersion、子协议getNegotiatedSubprotocol等)。
  • 连接管理(状态判断isOpen、接收消息的MessageHandler、发送消息的异步远程端点RemoteEndpoint.Async和同步远程端点RemoteEndpoint.Basic等)。

4、HandshakeRequest 和 HandshakeResponse

HandshakeRequest 和 HandshakeResponse了解即可,这两个接口主要用于WebScoket握手升级过程中握手请求响应的封装,如果只是单纯使用WebSocket,不会接触到这两个接口。

(1)HandshakeRequest

HandshakeRequest

(2)HandshakeResponse

HandshakeResponse

Sec-WebSocket-Accept根据客户端传的Sec-WebSocket-Key生成,如下是Tomcat10.0.6 WebSocket源码实现中生成Sec-WebSocket-Accept的算法:

private static String getWebSocketAccept(String key) {
    byte[] digest = ConcurrentMessageDigest.digestSHA1(
            key.getBytes(StandardCharsets.ISO_8859_1), WS_ACCEPT);
    return Base64.encodeBase64String(digest);
}
  • 1
  • 2
  • 3
  • 4
  • 5

5、WebSocketContainer

jakarta.websocket.WebSocketContainer顾名思义,就是WebSocket的容器,集大成者。其主要职责包括但不限于connectToServer,客户端连接服务器端,基于浏览器的WebSocket客户端连接服务器端,由浏览器支持,但是基于Java版的WebSocket客户端就可以通过WebSocketContainer#connectToServer向服务端发起连接请求。

WebSocketContainer局部
鬼怪

四、WebSocket基于Tomcat应用

(如下使用的是javax.websocket包,未使用最新的jakarta.websocket,主要是测试项目基于SpringBoot+Tomcat9.x的,Java API for WebSocket版本需要保持一致。)

1、服务器端实现

(1)@ServerEndpoint注解方式
import javax.websocket.*;
import javax.websocket.server.PathParam;
import javax.websocket.server.ServerEndpoint;
import java.io.IOException;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;

@ServerEndpoint(value = "/ws/test/{userId}", encoders = {MessageEncoder.class}, decoders = {MessageDecoder.class}, configurator = MyServerConfigurator.class)
public class WebSocketServerEndpoint {
    private Session session;
    private String userId;
    @OnOpen
    public void OnOpen(Session session, @PathParam(value = "userId") String userId) {
        this.session = session;
        this.userId = userId;
        // 建立连接后,将连接存到一个map里
        endpointMap.put(userId, this);
        Message message = new Message(0, "connected, hello " + userId);
        sendMsg(message);
    }

    @OnClose
    public void OnClose() {
        // 关闭连接时触发,从map中删除连接
        endpointMap.remove(userId);
        System.out.println("server closed...");
    }

    @OnMessage
    public void onMessage(Message message) {
        System.out.println("server recive message=" + message.toString());
    }

    @OnError
    public void onError(Throwable t) throws Throwable {
        this.session.close(new CloseReason(CloseReason.CloseCodes.CLOSED_ABNORMALLY, "系统异常"));
        t.printStackTrace();
    }
    
    /**
     * 群发
     * @param data
     */
    public void sendAllMsg(Message data) {
        for (WebSocketServerEndpoint value : endpointMap.values()) {
            value.sendMsgAsync(data);
        }
    }

    /**
     * 推送消息给指定 userId
     * @param data
     * @param userId
     */
    public void sendMsg(Message data, String userId) {
        WebSocketServerEndpoint endpoint = endpointMap.get(userId);
        if (endpoint == null) {
            System.out.println("not conected to " + userId);
            return;
        }
        endpoint.sendMsgAsync(data);
    }

    private void sendMsg(Message data) {
        try {
            this.session.getBasicRemote().sendObject(data);
        } catch (IOException ioException) {
            ioException.printStackTrace();
        } catch (EncodeException e) {
            e.printStackTrace();
        }
    }

    private void sendMsgAsync(Message data) {
        this.session.getAsyncRemote().sendObject(data);
    }
    // 存储建立连接的Endpoint
    private static ConcurrentHashMap<String, WebSocketServerEndpoint> endpointMap = new ConcurrentHashMap<String, WebSocketServerEndpoint>();
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58
  • 59
  • 60
  • 61
  • 62
  • 63
  • 64
  • 65
  • 66
  • 67
  • 68
  • 69
  • 70
  • 71
  • 72
  • 73
  • 74
  • 75
  • 76
  • 77
  • 78
  • 79

每一个客户端与服务器端建立连接后,都会生成一个WebSocketServerEndpoint,可以通过一个Map将其与userId对应存起来,为后续群发广播和单独推送消息给某个客户端提供便利。

注意:@ServerEndpoint的encoders、decoders、configurator等配置信息在实际使用中可以不定义,如果项目简单,完全可以用默认的。

如果通信消息被封装成一个对象,如示例的Message(因为源码过于简单就不展示了,属性主要有code、msg、data),就必须提供编码器和解码器。也可以在每次发送消息时硬编码转为字符串,在接收到消息时转为Message。有了编码器和解码器,显得比较规范,转为字符串由编码器做,字符串转为对象由解码器做,但也使得架构变复杂了,视项目需求而定。

MessageEncoder

MessageDecoder

Configurator的用处就是自定义Endpoint对象创建方式,默认Tomcat提供的是通过反射。WebScoket是每个连接都会创建一个Endpoint对象,如果连接比较多,很频繁,通过反射创建,用后即毁,可能不是一个好主意,所以可以搞一个对象池,用过回收,用时先从对象池中拿,有就重置,省去实例化分配内存等消耗过程。

MyServerConfigurator

如果使用SpringBoot内置Tomcat、undertow、Netty等,接入WebSocket时除了加@ServerEndpoint还需要加一个@Component,再给Spring注册一个ServerEndpointExporter类,这样,服务端Endpoint就交由Spring去扫描注册了。

@Configuration
public class WebSocketConfig {
    @Bean
    public ServerEndpointExporter serverEndpointExporter() {
        ServerEndpointExporter serverEndpointExporter = new ServerEndpointExporter();
        return serverEndpointExporter;
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8

外置Tomcat就不需要这么麻烦,Tomcat会默认扫描classpath下带有@ServerEndpoint注解的类。(SpringBoot接入Websocket后续会单独出文章讲解,也挺有意思的)

(2)继承抽象类Endpoint方式
import javax.websocket.*;
import java.io.IOException;
import java.util.concurrent.ConcurrentHashMap;

public class WebSocketServerEndpoint extends Endpoint {
    private Session session;
    private String userId;

    @Override
    public void onOpen(Session session, EndpointConfig endpointConfig) {
        this.session = session;
        this.userId = session.getPathParameters().get("userId");
        session.addMessageHandler(new MessageHandler());
        endpointMap.put(userId, this);
        Message message = new Message(0, "connected, hello " + userId);
        sendMsg(message);
    }

    @Override
    public void onClose(Session session, CloseReason closeReason) {
        endpointMap.remove(userId);
    }

    @Override
    public void onError(Session session, Throwable throwable) {
        throwable.printStackTrace();
    }
    
    /**
     * 群发
     * @param data
     */
    public void sendAllMsg(Message data) {
        for (WebSocketServerEndpoint value : endpointMap.values()) {
            value.sendMsgAsync(data);
        }
    }

    /**
     * 推送消息给指定 userId
     * @param data
     * @param userId
     */
    public void sendMsg(Message data, String userId) {
        WebSocketServerEndpoint endpoint = endpointMap.get(userId);
        if (endpoint == null) {
            System.out.println("not conected to " + userId);
            return;
        }
        endpoint.sendMsgAsync(data);
    }

    private void sendMsg(Message data) {
        try {
            this.session.getBasicRemote().sendObject(data);
        } catch (IOException ioException) {
            ioException.printStackTrace();
        } catch (EncodeException e) {
            e.printStackTrace();
        }
    }

    private void sendMsgAsync(Message data) {
        this.session.getAsyncRemote().sendObject(data);
    }

    private class MessageHandler implements javax.websocket.MessageHandler.Whole {

        @Override
        public void onMessage(Message message) {
            System.out.println("server recive message=" + message.toString());
        }
    }

    private static ConcurrentHashMap endpointMap = new ConcurrentHashMap();

}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58
  • 59
  • 60
  • 61
  • 62
  • 63
  • 64
  • 65
  • 66
  • 67
  • 68
  • 69
  • 70
  • 71
  • 72
  • 73
  • 74
  • 75
  • 76
  • 77

继承抽象类Endpoint方式比加注解@ServerEndpoint方式麻烦的很,主要是需要自己实现MessageHandler和ServerApplicationConfig。@ServerEndpoint的话都是使用默认的,原理上差不多,只是注解更自动化,更简洁。

MessageHandler做的事情,一个@OnMessage就搞定了,ServerApplicationConfig做的URI映射、decoders、encoders,configurator等,一个@ServerEndpoint就可以了。

import javax.websocket.Decoder;
import javax.websocket.Encoder;
import javax.websocket.Endpoint;
import javax.websocket.server.ServerApplicationConfig;
import javax.websocket.server.ServerEndpointConfig;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

public class MyServerApplicationConfig implements ServerApplicationConfig {
    @Override
    public Set<ServerEndpointConfig> getEndpointConfigs(Set<Class<? extends Endpoint>> set) {
        Set<ServerEndpointConfig> result = new HashSet<ServerEndpointConfig>();
        List<Class<? extends Decoder>> decoderList = new ArrayList<Class<? extends Decoder>>();
        decoderList.add(MessageDecoder.class);
        List<Class<? extends Encoder>> encoderList = new ArrayList<Class<? extends Encoder>>();
        encoderList.add(MessageEncoder.class);

        if (set.contains(WebSocketServerEndpoint3.class)) {
            ServerEndpointConfig serverEndpointConfig = ServerEndpointConfig.Builder
                    .create(WebSocketServerEndpoint3.class, "/ws/test3")
                    .decoders(decoderList)
                    .encoders(encoderList)
                    .configurator(new MyServerConfigurator())
                    .build();
            result.add(serverEndpointConfig);
        }
        return result;
    }

    @Override
    public Set<Class<?>> getAnnotatedEndpointClasses(Set<Class<?>> set) {
        return set;
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36

如果使用SpringBoot内置Tomcat,则不需要ServerApplicationConfig了,但是需要给Spring注册一个ServerEndpointConfig。

@Bean
public ServerEndpointConfig serverEndpointConfig() {
    List<Class<? extends Decoder>> decoderList = new ArrayList<Class<? extends Decoder>>();
    decoderList.add(MessageDecoder.class);
    List<Class<? extends Encoder>> encoderList = new ArrayList<Class<? extends Encoder>>();
    encoderList.add(MessageEncoder.class);
    ServerEndpointConfig serverEndpointConfig = ServerEndpointConfig.Builder
            .create(WebSocketServerEndpoint3.class, "/ws/test3/{userId}")
            .decoders(decoderList)
            .encoders(encoderList)
            .configurator(new MyServerConfigurator())
            .build();
    return serverEndpointConfig;
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
(3)早期Tomcat7中Server端实现对比

Tomcat7早期版本7.0.47之前还没有出JSR 356时,自己搞了一套接口,其实就是一个Servlet。

和遵循JSR356标准的版本对比,有一个比较大的变化是,createWebSocketInbound创建生命周期事件处理器StreamInbound的时机是WebSocket协议升级之前,此时还可以通过用户线程缓存(ThreadLocal等)的HttpServletRequest对象,获取一些请求头等信息。

而遵循JSR356标准的版本实现,创建生命周期事件处理的Endpoint是在WebSocket协议升级完成(经过HTTP握手)之后创建的,而WebSocket握手成功给客户端响应101前,会结束销毁HttpServletRequest对象,此时是获取不到请求头等信息的。

import org.apache.catalina.websocket.StreamInbound;
import org.apache.catalina.websocket.WebSocketServlet;

import javax.servlet.annotation.WebServlet;
import javax.servlet.http.HttpServletRequest;

@WebServlet(urlPatterns = "/ws/test")
public class MyWeSocketServlet extends WebSocketServlet {

    @Override
    protected StreamInbound createWebSocketInbound(String subProtocol, HttpServletRequest request) {
        MyMessageInbound messageInbound = new MyMessageInbound(subProtocol, request);
        return messageInbound;
    }

}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
import org.apache.catalina.websocket.MessageInbound;
import org.apache.catalina.websocket.WsOutbound;

import javax.servlet.http.HttpServletRequest;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.CharBuffer;

public class MyMessageInbound extends MessageInbound {

    private String subProtocol;
    private HttpServletRequest request;

    public MyMessageInbound(String subProtocol, HttpServletRequest request) {
        this.subProtocol = subProtocol;
        this.request = request;
    }

    @Override
    protected void onOpen(WsOutbound outbound) {
        String msg = "connected, hello";
        ByteBuffer byteBuffer = ByteBuffer.wrap(msg.getBytes());
        try {
            outbound.writeBinaryMessage(byteBuffer);
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    @Override
    protected void onClose(int status) {
    }


    @Override
    protected void onBinaryMessage(ByteBuffer byteBuffer) throws IOException {
        // 接收到客户端信息
    }

    @Override
    protected void onTextMessage(CharBuffer charBuffer) throws IOException {
        // 接收到客户端信息
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44

2、客户端实现

(1)前端js版

js版的客户端主要依托浏览器对WebScoket的支持,在生命周期事件触发上和服务器端的差不多,这也应证了建立WebSocket连接的两端是对等的。

编写WebSocket客户端需要注意以下几点:

  • 和服务器端商议好传输的消息的格式,一般为json字符串,比较直观,编码解码都很简单,也可以是其他商定的格式。
  • 需要心跳检测,定时给服务器端发送消息,保持连接正常。
  • 正常关闭连接,即关闭浏览器窗口前主动关闭连接,以免服务器端抛异常。
  • 如果因为异常断开连接,支持重连。
// 对websocket进行简单封装
WebSocketOption.prototype = {
    // 创建websocket操作
    createWebSocket: function () {
        try {
            if('WebSocket' in window) {
                this.ws = new WebSocket(this.wsUrl);
            } else if('MozWebSocket' in window) {  
                this.ws = new MozWebSocket(this.wsUrl);
            } else {
                alert("您的浏览器不支持websocket协议,建议使用新版谷歌、火狐等浏览器,请勿使用IE10以下浏览器,360浏览器请使用极速模式,不要使用兼容模式!"); 
            }
            this.lifeEventHandle();
        } catch(e) {
            this.reconnect(this.wsUrl);
            console.log(e);
        } 
    },
    // 生命周期事件操作
    lifeEventHandle: function() {
        var self = this;
        this.ws.onopen = function (event) {
            self.connectCount = 1;
            //心跳检测重置
            if (self.heartCheck == null) {
                self.heartCheck = new HeartCheckObj(self.ws);
            }
            self.sendMsg(5, "")
            self.heartCheck.reset().start();   
            console.log("websocket连接成功!" + new Date().toUTCString());
        };
        this.ws.onclose = function (event) {
            // 全部设置为初始值
            self.heartCheck = null;
            self.reconnect(self.wsUrl);  
            console.log("websocket连接关闭!" + new Date().toUTCString());
        };
        this.ws.onerror = function () {
            self.reconnect(self.wsUrl);
            console.log("websocket连接错误!");
        };
        //如果获取到消息,心跳检测重置
        this.ws.onmessage = function (event) {  
            //心跳检测重置
            if (self.heartCheck == null) {
                self.heartCheck = new HeartCheckObj(self.ws);
            }
            self.heartCheck.reset().start();      
            console.log("websocket收到消息啦:" + event.data);
            // 业务处理
            // 接收到的消息可以放到localStorage里,然后在其他地方取出来
        }
    },
    // 断线重连操作
    reconnect: function() {
        var self = this;
        if (this.lockReconnect) return;
        console.log(this.lockReconnect)
        this.lockReconnect = true;
        //没连接上会一直重连,设置延迟避免请求过多,重连时间设置按倍数增加
        setTimeout(function () {     
            self.createWebSocket(self.wsUrl);
            self.lockReconnect = false;
            self.connectCount++;
        }, 10000 * (self.connectCount));
    },
    // 发送消息操作
    sendMsg: function(cmd, data) {
        var sendData = {"cmd": cmd, "msg": data};
        try {
            this.ws.send(JSON.stringify(sendData));
        } catch(err) {
            console.log("发送数据失败, err=" + err)
        }
    },
    // 关闭websocket接口操作
    closeWs: function() {
        this.ws.close();
    } 
}

/**
 * 封装心跳检测对象

*/ function HeartCheckObj(ws) { this.ws = ws; // 心跳时间 this.timeout = 10000; // 定时事件 this.timeoutObj = null; // 自动断开事件 this.serverTimeoutObj = null; } HeartCheckObj.prototype = { setWs: function(ws) { this.ws = ws; }, reset: function() { clearTimeout(this.timeoutObj); clearTimeout(this.serverTimeoutObj); return this; }, // 开始心跳检测 start: function() { var self = this; this.timeoutObj = setTimeout(function() { //这里发送一个心跳,后端收到后,返回一个心跳消息, //onmessage拿到返回的心跳就说明连接正常 var ping = {"cmd":1, "msg": "ping"}; self.ws.send(JSON.stringify(ping)); //如果onmessage那里超过一定时间还没重置,说明后端主动断开了 self.serverTimeoutObj = setTimeout(function() { //如果onclose会执行reconnect,我们执行ws.close()就行了.如果直接执行reconnect 会触发onclose导致重连两次 self.ws.close(); }, self.timeout) }, self.timeout) } } /** * ------------------------- * 创建websocket的主流程 * * ------------------------- */ var currentDomain = document.domain; var wsUrl = "ws://" + currentDomain + "/test" var webSocketOption = new WebSocketOption(wsUrl) webSocketOption.createWebSocket() // 监听窗口关闭事件,当窗口关闭时,主动去关闭websocket连接,防止连接还没断开就关闭窗口,server端会抛异常。 window.onbeforeunload = function() { webSocketOption.closeWs(); }

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58
  • 59
  • 60
  • 61
  • 62
  • 63
  • 64
  • 65
  • 66
  • 67
  • 68
  • 69
  • 70
  • 71
  • 72
  • 73
  • 74
  • 75
  • 76
  • 77
  • 78
  • 79
  • 80
  • 81
  • 82
  • 83
  • 84
  • 85
  • 86
  • 87
  • 88
  • 89
  • 90
  • 91
  • 92
  • 93
  • 94
  • 95
  • 96
  • 97
  • 98
  • 99
  • 100
  • 101
  • 102
  • 103
  • 104
  • 105
  • 106
  • 107
  • 108
  • 109
  • 110
  • 111
  • 112
  • 113
  • 114
  • 115
  • 116
  • 117
  • 118
  • 119
  • 120
  • 121
  • 122
  • 123
  • 124
  • 125
  • 126
  • 127
  • 128
  • 129
  • 130
  • 131
  • 132
  • 133
  • 134

这里推荐一个在线测试WebSocket连接和发送消息的网站easyswoole.com/wstool.html:

wstool

真的很牛逼,很方便,很简单。还有源码github:https://github.com/easy-swoole/wstool,感兴趣可以看看。

(2)@ClientEndpoint注解方式

Java版客户端不用多说,把@ServerEndpoint换成@ClientEndpoint就可以了,其他都一样。@ClientEndpoint比@ServerEndpoint就少了一个value,不需要设置URI。

@ClientEndpoint(encoders = {MessageEncoder.class}, decoders = {MessageDecoder.class})
public class WebSocketClientEndpoint {
    private Session session;
    @OnOpen
    public void OnOpen(Session session) {
        this.session = session;
        Message message = new Message(0, "connecting...");

        sendMsg(message);
    }

    @OnClose
    public void OnClose() {
        Message message = new Message(0, "client closed...");

        sendMsg(message);
        System.out.println("client closed");
    }

    @OnMessage
    public void onMessage(Message message) {
        System.out.println("client recive message=" + message.toString());
    }

    @OnError
    public void onError(Throwable t) throws Throwable {
        t.printStackTrace();
    }

    public void sendMsg(Message data) {
        try {
            this.session.getBasicRemote().sendObject(data);
        } catch (IOException ioException) {
            ioException.printStackTrace();
        } catch (EncodeException e) {
            e.printStackTrace();
        }
    }

    public void sendMsgAsync(Message data) {
        this.session.getAsyncRemote().sendObject(data);
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43

连接服务器端:

WebSocketContainer container = ContainerProvider.getWebSocketContainer();
container.connectToServer(WebSocketClientEndpoint.class,
        new URI("ws://localhost:8080/ws/test"));
  • 1
  • 2
  • 3
(3)继承抽象类Endpoint方式

继承抽象类Endpoint方式也和服务器端的差不多,但是不需要实现ServerApplicationConfig,需要实例化一个ClientEndpointConfig。Endpoint实现类和服务器端的一样,就省略了,如下是连接服务器端的代码:

ClientEndpointConfig clientEndpointConfig = ClientEndpointConfig.Builder.create().build();
container.connectToServer(new WebSocketClientEndpoint(),clientEndpointConfig,
        new URI("ws://localhost:8080/websocket/hello"));
  • 1
  • 2
  • 3

3、基于Nginx反向代理注意事项

一般web服务器会用Nginx做反向代理,经过Nginx反向转发的HTTP请求不会带上Upgrade和Connection消息头,所以需要在Nginx配置里显式指定需要升级为WebSocket的URI带上这两个头:

location /chat/ {
    proxy_pass http://backend;
    proxy_http_version 1.1;
    proxy_set_header Upgrade $http_upgrade;
    proxy_set_header Connection "upgrade";
    
    proxy_connect_timeout 4s; 
    proxy_read_timeout 7200s; 
    proxy_send_timeout 12s; 
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10

默认情况下,如果代理服务器在60秒内没有传输任何数据,连接将被关闭。这个超时可以通过proxy_read_timeout指令来增加。或者,可以将代理服务器配置为定期发送WebSocket PING帧以重置超时并检查连接是否仍然活跃。

具体可参考:http://nginx.org/en/docs/http/websocket.html

鬼怪

五、WebSocket在Tomcat中的源码实现

所有兼容Java EE的应用服务器,必须遵循JSR356 WebSocket Java API标准,Tomcat也不例外。而且Tomcat也是支持WebSocket最早的Web应用服务器框架(之一),在还没有出JSR356标准时,就已经自定义了一套WebSocket API,但是JSR356一出,不得不改弦更张。

通过前面的讲解,在使用上完全没有问题,但是有几个问题完全是黑盒的:

  • Server Endpoint 是如何被扫描加载的?
  • WebSocket是如何借助HTTP 进行握手升级的?
  • WebSocket建立连接后如何保持连接不断,互相通信的?

(如下源码解析,需要对Tomcat连接器源码有一定了解)

1、WsSci初始化

Tomcat 提供了一个org.apache.tomcat.websocket.server.WsSci类来初始化、加载WebSocket。从类名上顾名思义,利用了Sci加载机制,何为Sci加载机制?就是实现接口 jakarta.servlet.ServletContainerInitializer,在Tomcat部署装载Web项目(org.apache.catalina.core.StandardContext#startInternal)时主动触发ServletContainerInitializer#onStartup,做一些扩展的初始化操作。

WsSci主要做了一件事,就是扫描加载Server Endpoint,并将其加到WebSocket容器里jakarta.websocket.WebSocketContainer。

WsSci主要会扫描三种类:

  • 加了@ServerEndpoint的类。
  • Endpoint的子类。
  • ServerApplicationConfig的子类。
(1)WsSci#onStartup
@HandlesTypes({ServerEndpoint.class, ServerApplicationConfig.class,
        Endpoint.class})
public class WsSci implements ServletContainerInitializer {

    @Override
    public void onStartup(Set<Class<?>> clazzes, ServletContext ctx)
            throws ServletException {

        WsServerContainer sc = init(ctx, true);

        if (clazzes == null || clazzes.size() == 0) {
            return;
        }

        // Group the discovered classes by type
        Set<ServerApplicationConfig> serverApplicationConfigs = new HashSet<>();
        Set<Class<? extends Endpoint>> scannedEndpointClazzes = new HashSet<>();
        Set<Class<?>> scannedPojoEndpoints = new HashSet<>();

        try {
            // wsPackage is "jakarta.websocket."
            String wsPackage = ContainerProvider.class.getName();
            wsPackage = wsPackage.substring(0, wsPackage.lastIndexOf('.') + 1);
            for (Class<?> clazz : clazzes) {
                JreCompat jreCompat = JreCompat.getInstance();
                int modifiers = clazz.getModifiers();
                if (!Modifier.isPublic(modifiers) ||
                        Modifier.isAbstract(modifiers) ||
                        Modifier.isInterface(modifiers) ||
                        !jreCompat.isExported(clazz)) {
                    // Non-public, abstract, interface or not in an exported
                    // package (Java 9+) - skip it.
                    continue;
                }
                // Protect against scanning the WebSocket API JARs
                // 防止扫描WebSocket API jar
                if (clazz.getName().startsWith(wsPackage)) {
                    continue;
                }
                if (ServerApplicationConfig.class.isAssignableFrom(clazz)) {
                    // 1、clazz是ServerApplicationConfig子类
                    serverApplicationConfigs.add(
                            (ServerApplicationConfig) clazz.getConstructor().newInstance());
                }
                if (Endpoint.class.isAssignableFrom(clazz)) {
                    // 2、clazz是Endpoint子类
                    @SuppressWarnings("unchecked")
                    Class<? extends Endpoint> endpoint =
                            (Class<? extends Endpoint>) clazz;
                    scannedEndpointClazzes.add(endpoint);
                }
                if (clazz.isAnnotationPresent(ServerEndpoint.class)) {
                    // 3、clazz是加了注解ServerEndpoint的类
                    scannedPojoEndpoints.add(clazz);
                }
            }
        } catch (ReflectiveOperationException e) {
            throw new ServletException(e);
        }

        // Filter the results
        Set<ServerEndpointConfig> filteredEndpointConfigs = new HashSet<>();
        Set<Class<?>> filteredPojoEndpoints = new HashSet<>();

        if (serverApplicationConfigs.isEmpty()) {
            // 从这里看出@ServerEndpoint的服务器端是可以不用ServerApplicationConfig的
            filteredPojoEndpoints.addAll(scannedPojoEndpoints);
        } else {
            // serverApplicationConfigs不为空,
            for (ServerApplicationConfig config : serverApplicationConfigs) {
                Set<ServerEndpointConfig> configFilteredEndpoints =
                        config.getEndpointConfigs(scannedEndpointClazzes);
                if (configFilteredEndpoints != null) {
                    filteredEndpointConfigs.addAll(configFilteredEndpoints);
                }
                // getAnnotatedEndpointClasses 对于 scannedPojoEndpoints起到一个过滤作用
                // 不满足条件的后面不加到WsServerContainer里
                Set<Class<?>> configFilteredPojos =
                        config.getAnnotatedEndpointClasses(
                                scannedPojoEndpoints);
                if (configFilteredPojos != null) {
                    filteredPojoEndpoints.addAll(configFilteredPojos);
                }
            }
        }

        try {
            // 继承抽象类Endpoint的需要使用者手动封装成ServerEndpointConfig
            // 而加了注解@ServerEndpoint的类 Tomcat会自动封装成ServerEndpointConfig
            // Deploy endpoints
            for (ServerEndpointConfig config : filteredEndpointConfigs) {
                sc.addEndpoint(config);
            }
            // Deploy POJOs
            for (Class<?> clazz : filteredPojoEndpoints) {
                sc.addEndpoint(clazz, true);
            }
        } catch (DeploymentException e) {
            throw new ServletException(e);
        }
    }


    static WsServerContainer init(ServletContext servletContext,
            boolean initBySciMechanism) {

        WsServerContainer sc = new WsServerContainer(servletContext);

        servletContext.setAttribute(
                Constants.SERVER_CONTAINER_SERVLET_CONTEXT_ATTRIBUTE, sc);
        // 注册监听器WsSessionListener给servletContext,
        // 在http session销毁时触发 ws session的关闭销毁
        servletContext.addListener(new WsSessionListener(sc));
        // Can't register the ContextListener again if the ContextListener is
        // calling this method
        if (initBySciMechanism) {
            // 注册监听器WsContextListener给servletContext,
            // 在 servletContext初始化时触发WsSci.init
            // 在 servletContext销毁时触发WsServerContainer的销毁
            // 不过呢,只在WsSci.onStartup时注册一次
            servletContext.addListener(new WsContextListener());
        }
        return sc;
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58
  • 59
  • 60
  • 61
  • 62
  • 63
  • 64
  • 65
  • 66
  • 67
  • 68
  • 69
  • 70
  • 71
  • 72
  • 73
  • 74
  • 75
  • 76
  • 77
  • 78
  • 79
  • 80
  • 81
  • 82
  • 83
  • 84
  • 85
  • 86
  • 87
  • 88
  • 89
  • 90
  • 91
  • 92
  • 93
  • 94
  • 95
  • 96
  • 97
  • 98
  • 99
  • 100
  • 101
  • 102
  • 103
  • 104
  • 105
  • 106
  • 107
  • 108
  • 109
  • 110
  • 111
  • 112
  • 113
  • 114
  • 115
  • 116
  • 117
  • 118
  • 119
  • 120
  • 121
  • 122
  • 123
  • 124
  • 125

从上述源码中可以看出ServerApplicationConfig起到一个过滤的作用:

  • 当没有ServerApplicationConfig时,加了@ServerEndpoint的类会默认全部加到一个Set集合(filteredPojoEndpoints),所以加了@ServerEndpoint的类可以不需要自定义实现ServerApplicationConfig。
  • 当有ServerApplicationConfig时,ServerApplicationConfig#getEndpointConfigs用来过滤Endpoint子类,并且Endpoint子类必须封装成一个ServerEndpointConfig。
  • ServerApplicationConfig#getAnnotatedEndpointClasses用来过滤加了注解@ServerEndpoint的类,一般空实现就行了(如果不想某个类被加到WsServerContainer里,那不加@ServerEndpoint不就可以了)。

过滤之后的Endpoint子类和加了注解@ServerEndpoint的类会分别调用不同形参的WsServerContainer#addEndpoint,将其加到WsServerContainer里。

(2)WsServerContainer#addEndpoint
  • 将Endpoint子类加到WsServerContainer里,调用的是形参为ServerEndpointConfig的addEndpoint:
public void addEndpoint(ServerEndpointConfig sec) throws DeploymentException {
    addEndpoint(sec, false);
}
  • 1
  • 2
  • 3

因为Endpoint子类需要使用者封装成ServerEndpointConfig,不需要Tomcat来封装。

  • 将加了注解@ServerEndpoint的类加到WsServerContainer,调用的是形参为Class的addEndpoint(fromAnnotatedPojo参数暂时在这个方法里没什么用处):

该方法主要职责就是解析@ServerEndpoint,获取path、decoders、encoders、configurator等构建一个ServerEndpointConfig对象

@ServerEndpoint-andEndpoint

最终调用的都是如下这个比较复杂的方法,fromAnnotatedPojo表示是否是加了@ServerEndpoint的类。主要做了两件事:

  • 对加了@ServerEndpoint类的生命周期方法(@OnOpen、@OnClose、@OnError、@OnMessage)的扫描和映射封装。

  • 对path的有效性检查和path param解析。

addEndpoint

(3)PojoMethodMapping方法映射和形参解析

PojoMethodMapping构造函数比较长,主要是对加了@OnOpen、@OnClose、@OnError、@OnMessage的方法进行校验和映射,以及对每个方法的形参进行解析和校验,主要逻辑总结如下:

  • 对当前类以及其父类中的方法进行扫描。
  • 当前类中不能存在多个相同注解的方法,否则会抛出Duplicate annotation异常。
  • 父类和子类中存在相同注解的方法,子类必须重写该方法,否则会抛出Duplicate annotation异常。
  • 对于@OnMessage,可以有多个,但是接收消息的类型必须不同,消息类型大概分为三种:PongMessage心跳消息、字节型、字符型。
  • 如果扫描到对的注解都是父类的方法,子类重写了该方法,但是没有加响应的注解,则会被清除。
  • 形参解析。
public PojoMethodMapping(Class<?> clazzPojo, List<Class<? extends Decoder>> decoderClazzes, String wsPath,
        InstanceManager instanceManager) throws DeploymentException {

    this.wsPath = wsPath;

    List<DecoderEntry> decoders = Util.getDecoders(decoderClazzes, instanceManager);
    Method open = null;
    Method close = null;
    Method error = null;
    Method[] clazzPojoMethods = null;
    Class<?> currentClazz = clazzPojo;
    while (!currentClazz.equals(Object.class)) {
        Method[] currentClazzMethods = currentClazz.getDeclaredMethods();
        if (currentClazz == clazzPojo) {
            clazzPojoMethods = currentClazzMethods;
        }
        for (Method method : currentClazzMethods) {
            if (method.isSynthetic()) {
                // Skip all synthetic methods.
                // They may have copies of annotations from methods we are
                // interested in and they will use the wrong parameter type
                // (they always use Object) so we can't used them here.
                continue;
            }
            if (method.getAnnotation(OnOpen.class) != null) {
                checkPublic(method);
                if (open == null) {
                    open = method;
                } else {
                    if (currentClazz == clazzPojo ||
                            !isMethodOverride(open, method)) {
                        // Duplicate annotation
                        // 抛出Duplicate annotation异常的两种情况:
                        // 1. 当前的类有多个相同注解的方法,如有两个@OnOpen
                        // 2. 当前类时父类,有相同注解的方法,但是其子类没有重写这个方法
                        // 即 父类和子类有多个相同注解的方法,且没有重写关系
                        throw new DeploymentException(sm.getString(
                                "pojoMethodMapping.duplicateAnnotation",
                                OnOpen.class, currentClazz));
                    }
                }
            } else if (method.getAnnotation(OnClose.class) != null) {
                checkPublic(method);
                if (close == null) {
                    close = method;
                } else {
                    if (currentClazz == clazzPojo ||
                            !isMethodOverride(close, method)) {
                        // Duplicate annotation
                        throw new DeploymentException(sm.getString(
                                "pojoMethodMapping.duplicateAnnotation",
                                OnClose.class, currentClazz));
                    }
                }
            } else if (method.getAnnotation(OnError.class) != null) {
                checkPublic(method);
                if (error == null) {
                    error = method;
                } else {
                    if (currentClazz == clazzPojo ||
                            !isMethodOverride(error, method)) {
                        // Duplicate annotation
                        throw new DeploymentException(sm.getString(
                                "pojoMethodMapping.duplicateAnnotation",
                                OnError.class, currentClazz));
                    }
                }
            } else if (method.getAnnotation(OnMessage.class) != null) {
                checkPublic(method);
                MessageHandlerInfo messageHandler = new MessageHandlerInfo(method, decoders);
                boolean found = false;
                // 第一次扫描OnMessage时,onMessage为空,不会走下面的for,然后就把messageHandler加到onMessage里
                // 如果非首次扫描到这里,即向上扫描父类,允许有多个接收消息类型完全不同的onmessage
                for (MessageHandlerInfo otherMessageHandler : onMessage) {
                    // 如果多个onmessage接收的消息类型有相同的,则可能会抛出Duplicate annotation
                    // 1. 同一个类中多个onmessage有接收相同类型的消息
                    // 2. 父子类中多个onmessage有接收相同类型的消息,但不是重写关系
                    if (messageHandler.targetsSameWebSocketMessageType(otherMessageHandler)) {
                        found = true;
                        if (currentClazz == clazzPojo ||
                            !isMethodOverride(messageHandler.m, otherMessageHandler.m)) {
                            // Duplicate annotation
                            throw new DeploymentException(sm.getString(
                                    "pojoMethodMapping.duplicateAnnotation",
                                    OnMessage.class, currentClazz));
                        }
                    }
                }
                if (!found) {
                    onMessage.add(messageHandler);
                }
            } else {
                // Method not annotated
            }
        }
        currentClazz = currentClazz.getSuperclass();
    }
    // If the methods are not on clazzPojo and they are overridden
    // by a non annotated method in clazzPojo, they should be ignored
    if (open != null && open.getDeclaringClass() != clazzPojo) {
        // open 有可能是父类的,子类即clazzPojo有重写该方法,但是没有加OnOpen注解
        // 则 open置为null
        if (isOverridenWithoutAnnotation(clazzPojoMethods, open, OnOpen.class)) {
            open = null;
        }
    }
    if (close != null && close.getDeclaringClass() != clazzPojo) {
        if (isOverridenWithoutAnnotation(clazzPojoMethods, close, OnClose.class)) {
            close = null;
        }
    }
    if (error != null && error.getDeclaringClass() != clazzPojo) {
        if (isOverridenWithoutAnnotation(clazzPojoMethods, error, OnError.class)) {
            error = null;
        }
    }
    List<MessageHandlerInfo> overriddenOnMessage = new ArrayList<>();
    for (MessageHandlerInfo messageHandler : onMessage) {
        if (messageHandler.m.getDeclaringClass() != clazzPojo
                && isOverridenWithoutAnnotation(clazzPojoMethods, messageHandler.m, OnMessage.class)) {
            overriddenOnMessage.add(messageHandler);
        }
    }
    // 子类重写了的onmessage方法,但没有加OnMessage注解的需要从onMessage list 中删除
    for (MessageHandlerInfo messageHandler : overriddenOnMessage) {
        onMessage.remove(messageHandler);
    }
    this.onOpen = open;
    this.onClose = close;
    this.onError = error;
    // 参数解析
    onOpenParams = getPathParams(onOpen, MethodType.ON_OPEN);
    onCloseParams = getPathParams(onClose, MethodType.ON_CLOSE);
    onErrorParams = getPathParams(onError, MethodType.ON_ERROR);
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58
  • 59
  • 60
  • 61
  • 62
  • 63
  • 64
  • 65
  • 66
  • 67
  • 68
  • 69
  • 70
  • 71
  • 72
  • 73
  • 74
  • 75
  • 76
  • 77
  • 78
  • 79
  • 80
  • 81
  • 82
  • 83
  • 84
  • 85
  • 86
  • 87
  • 88
  • 89
  • 90
  • 91
  • 92
  • 93
  • 94
  • 95
  • 96
  • 97
  • 98
  • 99
  • 100
  • 101
  • 102
  • 103
  • 104
  • 105
  • 106
  • 107
  • 108
  • 109
  • 110
  • 111
  • 112
  • 113
  • 114
  • 115
  • 116
  • 117
  • 118
  • 119
  • 120
  • 121
  • 122
  • 123
  • 124
  • 125
  • 126
  • 127
  • 128
  • 129
  • 130
  • 131
  • 132
  • 133
  • 134
  • 135

虽然方法名可以随意,但是形参却有着强制限制:

  • @onOpen方法,可以有的参数Session、EndpointConfig、@PathParam,不能有其他参数。
  • @onError方法,可以有的参数Session、@PathParam, 必须有Throwable,不能有其他参数。
  • @onClose方法,可以有的参数Session, CloseReason, @PathParam,不能有其他参数。

getPathParams

2、协议升级(握手)

Tomcat中WebSocket是通过UpgradeToken机制实现的,其具体的升级处理器为WsHttpUpgradeHandler。WebSocket协议升级的过程比较曲折,首先要通过过滤器WsFilter进行升级判断,然后调用org.apache.catalina.connector.Request#upgrade进行UpgradeToken的构建,最后通过org.apache.catalina.connector.Request#coyoteRequest回调函数action将UpgradeToken回传给连接器为后续升级处理做准备。

WebSocket升级过程

(1)WsFilter

WebSocket协议升级的过程比较曲折。带有WebSocket握手的请求会平安经过Tomcat的Connector,被转发到Servlet容器中,在业务处理之前经过过滤器WsFilter判断是否需要升级(WsFilter 在 org.apache.catalina.core.ApplicationFilterChain过滤链中触发):

  • 首先判断WsServerContainer是否有进行Endpoint的扫描和注册以及请头中是否有Upgrade: websocket。
  • 获取请求path即uri在WsServerContainer中找对应的ServerEndpointConfig。
  • 调用UpgradeUtil.doUpgrade进行升级。

WsFilter

(2)UpgradeUtil#doUpgrade

UpgradeUtil#doUpgrade主要做了如下几件事情:

  • 检查HttpServletRequest的一些请求头的有效性,如Connection: upgrade、Sec-WebSocket-Version:13、Sec-WebSocket-Key等。
  • 给HttpServletResponse设置一些响应头,如Upgrade:websocket、Connection: upgrade、根据Sec-WebSocket-Key的值生成响应头Sec-WebSocket-Accept的值。
  • 封装WsHandshakeRequest和WsHandshakeResponse。
  • 调用HttpServletRequest#upgrade进行升级,并获取WsHttpUpgradeHandler(具体的升级流程处理器)。
// org.apache.tomcat.websocket.server.UpgradeUtil#doUpgrade
public static void doUpgrade(WsServerContainer sc, HttpServletRequest req,
        HttpServletResponse resp, ServerEndpointConfig sec,
        Map<String,String> pathParams)
        throws ServletException, IOException {

    // Validate the rest of the headers and reject the request if that
    // validation fails
    String key;
    String subProtocol = null;
    // 检查请求头中是否有 Connection: upgrade
    if (!headerContainsToken(req, Constants.CONNECTION_HEADER_NAME,
            Constants.CONNECTION_HEADER_VALUE)) {
        resp.sendError(HttpServletResponse.SC_BAD_REQUEST);
        return;
    }
    // 检查请求头中的 Sec-WebSocket-Version:13
    if (!headerContainsToken(req, Constants.WS_VERSION_HEADER_NAME,
            Constants.WS_VERSION_HEADER_VALUE)) {
        resp.setStatus(426);
        resp.setHeader(Constants.WS_VERSION_HEADER_NAME,
                Constants.WS_VERSION_HEADER_VALUE);
        return;
    }
    // 获取 Sec-WebSocket-Key
    key = req.getHeader(Constants.WS_KEY_HEADER_NAME);
    if (key == null) {
        resp.sendError(HttpServletResponse.SC_BAD_REQUEST);
        return;
    }


    // Origin check,校验 Origin 是否有权限
    String origin = req.getHeader(Constants.ORIGIN_HEADER_NAME);
    if (!sec.getConfigurator().checkOrigin(origin)) {
        resp.sendError(HttpServletResponse.SC_FORBIDDEN);
        return;
    }
    // Sub-protocols
    List<String> subProtocols = getTokensFromHeader(req,
            Constants.WS_PROTOCOL_HEADER_NAME);
    subProtocol = sec.getConfigurator().getNegotiatedSubprotocol(
            sec.getSubprotocols(), subProtocols);

    // Extensions
    // Should normally only be one header but handle the case of multiple
    // headers
    List<Extension> extensionsRequested = new ArrayList<>();
    Enumeration<String> extHeaders = req.getHeaders(Constants.WS_EXTENSIONS_HEADER_NAME);
    while (extHeaders.hasMoreElements()) {
        Util.parseExtensionHeader(extensionsRequested, extHeaders.nextElement());
    }
    // Negotiation phase 1. By default this simply filters out the
    // extensions that the server does not support but applications could
    // use a custom configurator to do more than this.
    List<Extension> installedExtensions = null;
    if (sec.getExtensions().size() == 0) {
        installedExtensions = Constants.INSTALLED_EXTENSIONS;
    } else {
        installedExtensions = new ArrayList<>();
        installedExtensions.addAll(sec.getExtensions());
        installedExtensions.addAll(Constants.INSTALLED_EXTENSIONS);
    }
    List<Extension> negotiatedExtensionsPhase1 = sec.getConfigurator().getNegotiatedExtensions(
            installedExtensions, extensionsRequested);

    // Negotiation phase 2. Create the Transformations that will be applied
    // to this connection. Note than an extension may be dropped at this
    // point if the client has requested a configuration that the server is
    // unable to support.
    List<Transformation> transformations = createTransformations(negotiatedExtensionsPhase1);

    List<Extension> negotiatedExtensionsPhase2;
    if (transformations.isEmpty()) {
        negotiatedExtensionsPhase2 = Collections.emptyList();
    } else {
        negotiatedExtensionsPhase2 = new ArrayList<>(transformations.size());
        for (Transformation t : transformations) {
            negotiatedExtensionsPhase2.add(t.getExtensionResponse());
        }
    }

    // Build the transformation pipeline
    Transformation transformation = null;
    StringBuilder responseHeaderExtensions = new StringBuilder();
    boolean first = true;
    for (Transformation t : transformations) {
        if (first) {
            first = false;
        } else {
            responseHeaderExtensions.append(',');
        }
        append(responseHeaderExtensions, t.getExtensionResponse());
        if (transformation == null) {
            transformation = t;
        } else {
            transformation.setNext(t);
        }
    }

    // Now we have the full pipeline, validate the use of the RSV bits.
    if (transformation != null && !transformation.validateRsvBits(0)) {
        throw new ServletException(sm.getString("upgradeUtil.incompatibleRsv"));
    }
    // 设置resp的响应头Upgrade:websocket、 Connection: upgrade 、Sec-WebSocket-Accept:
    // If we got this far, all is good. Accept the connection.
    resp.setHeader(Constants.UPGRADE_HEADER_NAME,
            Constants.UPGRADE_HEADER_VALUE);
    resp.setHeader(Constants.CONNECTION_HEADER_NAME,
            Constants.CONNECTION_HEADER_VALUE);
    // 通过Sec-WebSocket-Key生成Sec-WebSocket-Accept的值
    resp.setHeader(HandshakeResponse.SEC_WEBSOCKET_ACCEPT,
            getWebSocketAccept(key));
    if (subProtocol != null && subProtocol.length() > 0) {
        // RFC6455 4.2.2 explicitly states "" is not valid here
        resp.setHeader(Constants.WS_PROTOCOL_HEADER_NAME, subProtocol);
    }
    if (!transformations.isEmpty()) {
        resp.setHeader(Constants.WS_EXTENSIONS_HEADER_NAME, responseHeaderExtensions.toString());
    }

    WsHandshakeRequest wsRequest = new WsHandshakeRequest(req, pathParams);
    WsHandshakeResponse wsResponse = new WsHandshakeResponse();
    WsPerSessionServerEndpointConfig perSessionServerEndpointConfig =
            new WsPerSessionServerEndpointConfig(sec);
    sec.getConfigurator().modifyHandshake(perSessionServerEndpointConfig,
            wsRequest, wsResponse);
    wsRequest.finished();

    // Add any additional headers
    for (Entry<String,List<String>> entry :
            wsResponse.getHeaders().entrySet()) {
        for (String headerValue: entry.getValue()) {
            resp.addHeader(entry.getKey(), headerValue);
        }
    }
    // 调用 request.upgrade 进行升级
    WsHttpUpgradeHandler wsHandler =
            req.upgrade(WsHttpUpgradeHandler.class);
    wsHandler.preInit(perSessionServerEndpointConfig, sc, wsRequest,
            negotiatedExtensionsPhase2, subProtocol, transformation, pathParams,
            req.isSecure());

}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58
  • 59
  • 60
  • 61
  • 62
  • 63
  • 64
  • 65
  • 66
  • 67
  • 68
  • 69
  • 70
  • 71
  • 72
  • 73
  • 74
  • 75
  • 76
  • 77
  • 78
  • 79
  • 80
  • 81
  • 82
  • 83
  • 84
  • 85
  • 86
  • 87
  • 88
  • 89
  • 90
  • 91
  • 92
  • 93
  • 94
  • 95
  • 96
  • 97
  • 98
  • 99
  • 100
  • 101
  • 102
  • 103
  • 104
  • 105
  • 106
  • 107
  • 108
  • 109
  • 110
  • 111
  • 112
  • 113
  • 114
  • 115
  • 116
  • 117
  • 118
  • 119
  • 120
  • 121
  • 122
  • 123
  • 124
  • 125
  • 126
  • 127
  • 128
  • 129
  • 130
  • 131
  • 132
  • 133
  • 134
  • 135
  • 136
  • 137
  • 138
  • 139
  • 140
  • 141
  • 142
  • 143
  • 144
(3)Request#upgrade

Request#upgrade主要做了三件事:

  • 实例化WsHttpUpgradeHandler并构建UpgradeToken。
  • 回调coyoteRequest.action,将UpgradeToken回传给连接器。
  • 设置响应码101。
// org.apache.catalina.connector.Request#upgrade
public <T extends HttpUpgradeHandler> T upgrade(
        Class<T> httpUpgradeHandlerClass) throws java.io.IOException, ServletException {
    T handler;
    InstanceManager instanceManager = null;
    try {
        // Do not go through the instance manager for internal Tomcat classes since they don't
        // need injection
        if (InternalHttpUpgradeHandler.class.isAssignableFrom(httpUpgradeHandlerClass)) {
            handler = httpUpgradeHandlerClass.getConstructor().newInstance();
        } else {
            instanceManager = getContext().getInstanceManager();
            handler = (T) instanceManager.newInstance(httpUpgradeHandlerClass);
        }
    } catch (ReflectiveOperationException | NamingException | IllegalArgumentException |
            SecurityException e) {
        throw new ServletException(e);
    }
    // 构建 UpgradeToken,UpgradeToken主要包含WsHttpUpgradeHandler、context、协议名称protocol
    UpgradeToken upgradeToken = new UpgradeToken(handler, getContext(), instanceManager,
            getUpgradeProtocolName(httpUpgradeHandlerClass));
    // 回调action 进行升级
    coyoteRequest.action(ActionCode.UPGRADE, upgradeToken);

    // Output required by RFC2616. Protocol specific headers should have
    // already been set.
    // 设置响应101
    response.setStatus(HttpServletResponse.SC_SWITCHING_PROTOCOLS);

    return handler;
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
(4)回调机制ActionHook#action

一些发生在Servlet容器的动作可能需要回传给连接器做处理,比如WebSocket的握手升级,所以连接器就给org.apache.coyote.Request设置了一个动作钩子``ActionHook#action。一些动作表示定义在枚举类ActionCode中,ActionCode.UPGRADE就代表协议升级动作。org.apache.coyote.AbstractProcessor实现了ActionHook接口,ActionCode.UPGRADE动作会调用org.apache.coyote.http11.Http11Processor#doHttpUpgrade,只是简单将upgradeToken设置给Http11Processor`。

action

doHttpUpgrade

(5)ConnectionHandler#process

Tomcat连接器是同步调用容器业务处理,容器中的业务处理结束后还是回到连接器继续往下执行。

连接器将请求转发给容器处理是在适配器里完成的,容器中流程处理结束返回到org.apache.catalina.connector.CoyoteAdapter#service,继续往下执行,最终结束并回收HttpServletrequest、HttpServletreponse对象。

CoyoteAdapter.service

org.apache.catalina.connector.CoyoteAdapter#service是在org.apache.coyote.http11.Http11Processor#service中调用的,

Http11Processor#service是HTTP请求处理主流程,通过upgradeToken != null来判断是否为升级操作,s是则返回SocketState.UPGRADING。

最后来到org.apache.coyote.AbstractProtocol.ConnectionHandler#process一个连接处理的主流程,根据Http11Processor#service返回SocketState.UPGRADING来进行升级操作,如下只截取了和WebSocket协议升级相关流程的代码:

  • 获取UpgradeToken,从中取出HttpUpgradeHandler,对于WebSocket来说是WsHttpUpgradeHandler。
  • 调用WsHttpUpgradeHandler#init启动协议升级处理。

process

(6)WsHttpUpgradeHandler#init握手成功

走到这里,基本上就是握手成功了,接下来就是创建WsSession和触发onOpen。

init

WsSession的构建中会实例化Endpoint,如果实例化出来的对象不是Endpoint类型,即加了@ServerEndpoint的实例对象,则用一个PojoEndpointServer进行包装,而PojoEndpointServer是继承了抽象类Endpoint的。

触发onOpen时会将WsSession传进去,对于加PojoEndpointServer,因为用户自定义的方法名和形参不确定,所以通过反射调用用户自定义的onopen形式的方法,并且会将通过@onMessage解析出的MessageHandler设置给WsSession。

doOnOpen

3、数据传输和解析

握手成功之后就建立了双向通信的连接,该连接有别于HTTP/1.1长连接(应用服务器中工作线程循环占用),而是占用一条TCP连接。在连接建立是进行TCP三次握手,之后全双工互相通信,将不需要再进行耗时的TCP的三次握手和四次挥手,一方需要关闭WebSocket连接时,发送关闭帧,另一方接收到关闭帧之后,也发送个关闭帧作为响应,之后就认为WebSocket连接关闭了,并且关闭底层TCP连接(四次挥手)。

实则WebSocket全双工是建立在TCP的长链接上的,TCP长链接长时间没有消息通信,会定时保活,一般WebSocket会通过代理如nginx等进行连接通信,nginx有一个连接超时没有任何信息传输时,会断开,所以需要WebSocket一端定时发送心跳保活。

(1)接收客户端消息

客户端来了消息,由连接器的Poller轮询监测socket底层是否有数据到来,有数据可读,则封装成一个SocketProcessor扔到线程池里处理,org.apache.coyote.http11.upgrade.UpgradeProcessorInternal#dispatch具有处理升级协议连接,org.apache.tomcat.websocket.server.WsHttpUpgradeHandler#upgradeDispatch是专门处理WebSocket连接的处理器。

org.apache.tomcat.websocket.server.WsFrameServer是对服务器端消息帧处理的封装,包括读取底层数据,按消息帧格式解析、拼装出有效载荷数据,触发onMessage。

因为源码篇幅较多,只展示具体源码调用流程:

wensocket接收消息

(2)发送消息给客户端

一般,客户端发送WebSocket握手请求,和服务器端建立连接后,服务器端需要将连接(Endpoint+WsSession)保存起来,为后续主动推送消息给客户端提供方便。

Tomcat提供了可以发送三种数据类型(文本、二进制、Object对象)和两种发送方式(同步、异步)的发送消息的方法。

  • org.apache.tomcat.websocket.WsRemoteEndpointAsync异步发送。
  • org.apache.tomcat.websocket.WsRemoteEndpointBasic 同步发送。

发送消息也同样需要按消息帧格式封装,然后通过socket写到网络里即可。

鬼怪

六、要点回顾

WebSocket的出现不是空穴来风,起初在HTTP/1.1基础上通过轮询和长连接达到信息实时同步的功能,但是这并没有跳出HTTP/1.1自身的缺陷。HTTP/1.1明显的两个缺陷:消息头冗长且为文本传输,请求响应模式。为此,WebSocket诞生了,跳出HTTP/1.1,建立一个新的真正全双工通信协议。

不仅仅要会在项目中使用WebSocket,还要知道其通信原理和在应用服务器中的实现原理,很多注意事项都是在查阅了官方资源和源码之后恍然大悟的。

  • 在Tomcat中使用WebSocket不可以在Endpoint里获取缓存的HttpServletRequest对象,因为在WebSocket握手之前,HTTP/1.1请求就算结束了(HttpServletRequest对象被回收),建立连接之后就更是独立于HTTP/1.1了。
  • 建立连接的WebSocket,会生成新的Endpoint和WsSession。
  • 使用内置Tomcat需要注意,WsSci做的事情交给了Spring做。
  • WebSocket全双工是建立在TCP长连接的基础之上。
  • … …

鬼怪

七、参考文献

  1. https://datatracker.ietf.org/doc/html/rfc6455(可能需要翻墙)
  2. https://www.oracle.com/technical-resources/articles/java/jsr356.html
  3. https://medium.com/swlh/websockets-with-spring-part-1-http-and-websocket-36c69df1c2ee(可能需要翻墙)
  4. http://nginx.org/en/docs/http/websocket.html
  5. https://zh.wikipedia.org/wiki/WebSocket
  6. 书籍:《Tomcat架构解析》刘光瑞(Tomcat8.5)11.3.4 Tomcat的WebSocket实现
  7. 书籍:《Tomcat内核设计剖析》汪建(Tomcat7)10.6 WebSocket协议的支持
  8. 书籍:《图解HTTP》9.3 使用浏览器进行全双工通信的WebSocket
  9. 极客时间:《深入拆解Tomcat & Jetty》李号双(Tomcat9.x)18.新特性:Tomcat如何支持WebSocket?
  10. Tomcat注释源码:https://gitee.com/stefanpy/tomcat-source-code-learning

如若文章有错误理解,欢迎批评指正,同时非常期待你的留言和点赞。如果觉得有用,不妨点个在看,让更多人受益。

文章知识点与官方知识档案匹配,可进一步学习相关知识
网络技能树首页概览45702 人正在系统学习中
徐同学呀
微信公众号
专注于Java底层架构开发和源码分析领域
注:本文转载自blog.csdn.net的徐同学呀的文章"https://stefan.blog.csdn.net/article/details/120025498"。版权归原作者所有,此博客不拥有其著作权,亦不承担相应法律责任。如有侵权,请联系我们删除。
复制链接
复制链接
相关推荐
发表评论
登录后才能发表评论和回复 注册

/ 登录

评论记录:

未查询到任何数据!
回复评论:

分类栏目

后端 (14832) 前端 (14280) 移动开发 (3760) 编程语言 (3851) Java (3904) Python (3298) 人工智能 (10119) AIGC (2810) 大数据 (3499) 数据库 (3945) 数据结构与算法 (3757) 音视频 (2669) 云原生 (3145) 云平台 (2965) 前沿技术 (2993) 开源 (2160) 小程序 (2860) 运维 (2533) 服务器 (2698) 操作系统 (2325) 硬件开发 (2492) 嵌入式 (2955) 微软技术 (2769) 软件工程 (2056) 测试 (2865) 网络空间安全 (2948) 网络与通信 (2797) 用户体验设计 (2592) 学习和成长 (2593) 搜索 (2744) 开发工具 (7108) 游戏 (2829) HarmonyOS (2935) 区块链 (2782) 数学 (3112) 3C硬件 (2759) 资讯 (2909) Android (4709) iOS (1850) 代码人生 (3043) 阅读 (2841)

热门文章

101
推荐
关于我们 隐私政策 免责声明 联系我们
Copyright © 2020-2024 蚁人论坛 (iYenn.com) All Rights Reserved.
Scroll to Top