首页 最新 热门 推荐

  • 首页
  • 最新
  • 热门
  • 推荐

Tomcat源码解析(六):Connector、ProtocolHandler、Endpoint

  • 25-03-03 07:42
  • 3393
  • 12577
blog.csdn.net

Tomcat源码系列文章

Tomcat源码解析(一):Tomcat整体架构

Tomcat源码解析(二):Bootstrap和Catalina

Tomcat源码解析(三):LifeCycle生命周期管理

Tomcat源码解析(四):StandardServer和StandardService

Tomcat源码解析(五):StandardEngine、StandardHost、StandardContext、StandardWrapper

Tomcat源码解析(六):Connector、ProtocolHandler、Endpoint


文章目录

  • 前言
  • 一、Connector
    • 1、解析server.xml
    • 2、解析\标签
    • 3、Connector实例化
    • 4、生命周期方法
  • 二、ProtocolHandler
    • 1、Http11NioProtocol实例化
    • 2、生命周期方法
  • 三、Endpoint
    • 1、init初始化
    • 2、start启动
      • 2.1、创建线程池ThreadPoolExecutor
      • 2.2、初始化LimitLatch对象
      • 2.3、接收器Acceptor线程
      • 2.4、Poller
    • 3、stop停止和destroy销毁
  • 总结


前言

  前文中我们介绍了容器Engine、Host、Context、Wrapper的启动,代表整个tomcat容器启动就算完成了。接下来介绍下连接器,处理Socket连接,负责网络字节流与Request和Response对象的转化。
  Tomcat设计了3个组件。Endpoint(网络通信)负责提供字节流给Processor;Processor(应用层协议解析)负责提供Tomcat Request对象给Adapter;Adapter(请求响应转化)负责提供ServletRequest对象给容器。将网络通信和应用层协议解析放在一起考虑封装到ProtocolHandler中。
在这里插入图片描述


一、Connector

Connector的类图如下

  • Connector间接继承了Lifecycle接口,因此也具有了生命周期方法

在这里插入图片描述

1、解析server.xml

  • 主要作用是接受连接请求,创建Request和Response对象用于和请求端交换数据
  • 然后分配线程让Engine来处理这个请求,并把产生的Request和Response对象传给Engine



<Connector port="8080" protocol="HTTP/1.1" 
    	connectionTimeout="20000" redirectPort="8443" />

<Connector port="8009" protocol="AJP/1.3" redirectPort="8443" />
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20

2、解析标签

  • 标签内容用来实例化连接器Connector
  • 通过Service的addConnector方法将Connector对象设置到Service的connectors数组中
# Catalina#createStartDigester方法

/** 解析标签实例化Connector对象 **/
digester.addRule("Server/Service/Connector",
                 new ConnectorCreateRule());
                 
/** 设置属性 **/   
digester.addRule("Server/Service/Connector",
                 new SetAllPropertiesRule(new String[]{"executor", "sslImplementationName"}));

/** 通过Service的`addConnector`方法将Connector对象设置到Service的`connectors数组`中 **/
digester.addSetNext("Server/Service/Connector",
                    "addConnector",
                    "org.apache.catalina.connector.Connector");
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14

3、Connector实例化

  • protocol是解析server.xml中Connector标签的protocol请求协议属性
  • tomcat默认两个Connector标签,两个不同的请求协议,那么会生成两个Connector对象
  • Tomcat为了实现支持多种I/O模型和应用层协议,一个容器可能对接多个连接器
  • 我们平常默认使用的就是非阻塞NIO和HTTP/1.1协议,对应ProtocolHandler的ClassName为org.apache.coyote.http11.Http11NioProtocol
// Connector类有参构造方法

// 协议处理程序类名。默认为HTTP1.1协议处理程序
protected String protocolHandlerClassName = "org.apache.coyote.http11.Http11NioProtocol";

public Connector(String protocol) {
	// 根据协议设置ProtocolHandler的ClassName
    setProtocol(protocol);
    // 实例化ProtocolHandler
    ProtocolHandler p = null;
    try {
        Class<?> clazz = Class.forName(protocolHandlerClassName);
        p = (ProtocolHandler) clazz.getConstructor().newInstance();
    } catch (Exception e) {
        log.error(sm.getString(
                "coyoteConnector.protocolHandlerInstantiationFailed"), e);
    } finally {
        this.protocolHandler = p;
    }
    ...
}

@Deprecated
public void setProtocol(String protocol) {

    boolean aprConnector = AprLifecycleListener.isAprAvailable() &&
            AprLifecycleListener.getUseAprConnector();

    if ("HTTP/1.1".equals(protocol) || protocol == null) {
        if (aprConnector) {
            setProtocolHandlerClassName("org.apache.coyote.http11.Http11AprProtocol");
        } else {
            setProtocolHandlerClassName("org.apache.coyote.http11.Http11NioProtocol");
        }
    } else if ("AJP/1.3".equals(protocol)) {
        if (aprConnector) {
            setProtocolHandlerClassName("org.apache.coyote.ajp.AjpAprProtocol");
        } else {
            setProtocolHandlerClassName("org.apache.coyote.ajp.AjpNioProtocol");
        }
    } else {
        setProtocolHandlerClassName(protocol);
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44

4、生命周期方法

initInternal初始化方法

  • 创建一个CoyoteAdapter,该类负责将网络请求从连接器传递到容器中
  • 调用protocolHandler的init方法
protected void initInternal() throws LifecycleException {
	// 调用父类LifecycleMBeanBase的initInternal方法,jmx内容
    super.initInternal();
    adapter = new CoyoteAdapter(this);
    // 将CoyoteAdapter添加到protocolHandler对象
    protocolHandler.setAdapter(adapter);
    // 其余代码
    try {
        protocolHandler.init();
    } catch (Exception e) {
		...
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13

startInternal启动方法

  • 主要内容就是修改生命周期状态和调用protocolHandler的start方法
@Override
protected void startInternal() throws LifecycleException {

    // 校验端口不能小于0
    if (getPort() < 0) {
        throw new LifecycleException(sm.getString(
                "coyoteConnector.invalidPort", Integer.valueOf(getPort())));
    }

    setState(LifecycleState.STARTING);

    try {
        protocolHandler.start();
    } catch (Exception e) {
		...
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17

二、ProtocolHandler

Http11NioProtocol的类图如下

  • ProtocolHandler的生命周期方法由子类AbstractProtocol与AbstractHttp11Protocol实现

在这里插入图片描述

1、Http11NioProtocol实例化

  • 在Connector的构造方法中,用反射创建了一个Http11NioProtocol对象
  • Http11NioProtocol构造方法里,第一步创建了NioEndpoint对象
  • Endpoint是具体的Socket接收和发送处理器,用来实现TCP/IP协议的
public class Http11NioProtocol extends AbstractHttp11JsseProtocol<NioChannel> {
    public Http11NioProtocol() {
        super(new NioEndpoint());
    }
	...
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6

抽象父类AbstractHttp11JsseProtocol、AbstractHttp11Protocol

  • Http11NioProtocol构造调用父类AbstractHttp11JsseProtocol构造
  • AbstractHttp11JsseProtocol调用父类AbstractHttp11Protocol构造
public abstract class AbstractHttp11JsseProtocol<S>extends AbstractHttp11Protocol<S> {
    public AbstractHttp11JsseProtocol(AbstractJsseEndpoint<S> endpoint) {
        super(endpoint);
    }
    ...
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 超时时间,默认60s,socket的soTimeout属性,serverSocket.accept();阻塞监听端口,设置的阻塞超时时间,超时抛出异常
  • ConnectionHandler连接处理器:定义处理socket事件方法;将它设置到NioEndpoint对象中
public abstract class AbstractHttp11Protocol<S> extends AbstractProtocol<S> {
	...
    public AbstractHttp11Protocol(AbstractEndpoint<S> endpoint) {
        super(endpoint);
        // 超时时间
        // int DEFAULT_CONNECTION_TIMEOUT = 60000;
        setConnectionTimeout(Constants.DEFAULT_CONNECTION_TIMEOUT);
        // 连接处理器
        ConnectionHandler<S> cHandler = new ConnectionHandler<>(this);
        setHandler(cHandler);
        getEndpoint().setHandler(cHandler);
    }
    ...
 }
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14

2、生命周期方法

init初始化方法

  • 主要内容调用父类AbstractProtocol的init方法
public abstract class AbstractHttp11Protocol<S> extends AbstractProtocol<S> {
    @Override
    public void init() throws Exception {
        // 升级协议设置(websocket或者HTTP2)
        for (UpgradeProtocol upgradeProtocol : upgradeProtocols) {
            configureUpgradeProtocol(upgradeProtocol);
        }

        super.init();
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 主要内容调用endpoint也就是上面创建的NioEndpoint的init方法
public abstract class AbstractProtocol<S> implements ProtocolHandler, MBeanRegistration {
    public void init() throws Exception {
        ...
        // endpointName名称http-nio-8080
        String endpointName = getName();
        endpoint.setName(endpointName.substring(1, endpointName.length() - 1));
        endpoint.setDomain(domain);
        endpoint.init();
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10

start初始化方法

  • 主要内容调用endpoint也就是上面创建的NioEndpoint的start方法
public abstract class AbstractProtocol<S> implements ProtocolHandler, MBeanRegistration {
    // 超时线程
    private AsyncTimeout asyncTimeout = null;
    
    public void start() throws Exception {
        ...
        
        endpoint.start();
        
        // 启动异步线程处理一些超时的请求
        asyncTimeout = new AsyncTimeout();
        Thread timeoutThread = new Thread(asyncTimeout, getNameInternal() + "-AsyncTimeout");
        int priority = endpoint.getThreadPriority();
        if (priority < Thread.MIN_PRIORITY || priority > Thread.MAX_PRIORITY) {
            priority = Thread.NORM_PRIORITY;
        }
        timeoutThread.setPriority(priority);
        timeoutThread.setDaemon(true);
        timeoutThread.start();
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • stop和destroy方法也是AbstractProtocol抽象类中重写,主要调用NioEndpoint的对应方法

三、Endpoint

Tomcat的NioEndpoint包含LimitLatch、Acceptor、Poller、和Executor共4个重要组件

  • LimitLatch连接控制器,它负责维护连接数的计算
    • nio模式下默认是10000,达到这个阈值后,就会拒绝连接请求
  • Acceptor负责接收连接,默认是1个线程单独执行
    • 内部通过while循环一直运行,调用accept方法来接受新连接
    • accpet方法返回一个Channel对象,接着把Channel对象交给Poller去处理
  • Poller本质也就是Selector,单独线程执行
    • 内部维护一个channel数组,通过while循环里不断检测Channel的数据就绪状态
    • 一旦有Channel可读,就生成一个SocketProcessor任务对象扔给Executor去处理
  • Executor就是线程池,负责运行SocketProcessor任务类
    • 线程池默认核心线程数为10,最大线程数为200
    • 这个线程池维护的线程就是我们非常熟悉的“http-nio-8080-exec-N”线程,也就是用户请求的实际处理线程

在这里插入图片描述

NioEndpoint的类图如下

  • 抽象类AbstractEndpoint定义了生命周期方法的公共部分,具体实现留给子类NioEndpoint,典型的模板方法设计模式

在这里插入图片描述

1、init初始化

// AbstractEndpoint类方法
public void init() throws Exception {
    if (bindOnInit) {
    	// 创建socket绑定端口,具体实现由NioEndpoint完成
        bind();
        bindState = BindState.BOUND_ON_INIT;
    }
	// 省略jmx内容
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 这块内容都是nio的东西,我当初看到这里看不懂,然后写了一篇NIO详解
  • 获取TCP读写网络中的数据的通道ServerSocketChannel serverSock
  • 绑定服务器ip地址和端口,请求传入连接队列的最大长度(这里默认100)
    • 当请求队列满,而又有Socket对象发出连接请求时,此连接会被拒绝,客户端抛出ConnectException
  • 设置为阻塞模式,那么serverSock会阻塞在accept()方法等待客户端连接
// NioEndpoint类方法

// 接收器线程数量
protected int acceptorThreadCount = 1;
// 轮训器线程数量
private int pollerThreadCount = Math.min(2,Runtime.getRuntime().availableProcessors());

// 服务端socket
private volatile ServerSocketChannel serverSock = null;

// 线程安全、无阻塞选择器池
private NioSelectorPool selectorPool = new NioSelectorPool();

@Override
public void bind() throws Exception {

    if (!getUseInheritedChannel()) {
    	// 获取TCP读写网络中的数据的通道
        serverSock = ServerSocketChannel.open();
        socketProperties.setProperties(serverSock.socket());
        InetSocketAddress addr = (getAddress()!=null?new InetSocketAddress(getAddress(),getPort()):new InetSocketAddress(getPort()));
        // 绑定服务器ip地址和端口,请求传入连接队列的最大长度(这里默认100)
        serverSock.socket().bind(addr,getAcceptCount());
    } else {
		...
    }
    // 设置为阻塞模式,那么serverSock会阻塞在accept()方法等待客户端连接
    serverSock.configureBlocking(true); //mimic APR behavior

    // 初始化接受器、轮询器的线程计数默认值
    if (acceptorThreadCount == 0) {
        acceptorThreadCount = 1;
    }
    if (pollerThreadCount <= 0) {
        // 最少一个轮询器线程
        pollerThreadCount = 1;
    }
    setStopLatch(new CountDownLatch(pollerThreadCount));

    // https相关内容,略过
    initialiseSsl();

	// 获取选择器
    selectorPool.open();
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45

2、start启动

// AbstractEndpoint类方法
public final void start() throws Exception {
    if (bindState == BindState.UNBOUND) {
    	// 如果没有绑定ip端口就调用上面NioEndpoint的bind方法
        bind();
        bindState = BindState.BOUND_ON_START;
    }
    // 调用子类NioEndpoint的方法
    startInternal();
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • SynchronizedStack主要属性Object[] stack,存放对象数组,通过push推入数据,pop推出数据
  • 创建线程池ThreadPoolExecutor
  • 初始化LimitLatch,后续会限制请求连接数量,这里默认是10000
  • 创建多个Poller(本质就是select)线程,并启动
  • 创建接收器Acceptor线程,并启动
// NioEndpoint类方法

// endpoint状态,默认false
protected volatile boolean running = false;

@Override
public void startInternal() throws Exception {

    if (!running) {
    	// Acceptor通过while(running)一直循环接受连接
    	// 默认false,这里修改为true,就可以一直循环,下面会说到
        running = true;
        paused = false;
		
		// SynchronizedStack内的Object[] stack,存放对象数组
        processorCache = new SynchronizedStack<>(SynchronizedStack.DEFAULT_SIZE,
                socketProperties.getProcessorCache());
        eventCache = new SynchronizedStack<>(SynchronizedStack.DEFAULT_SIZE,
                socketProperties.getEventCache());
        nioChannels = new SynchronizedStack<>(SynchronizedStack.DEFAULT_SIZE,
                socketProperties.getBufferPool());

        // 创建线程池
        if (getExecutor() == null) {
            createExecutor();
        }
		
		// 初始化LimitLatch,限制请求连接数量,这里默认是10000
        initializeConnectionLatch();

        // 创建多个Poller(本质就是select)线程,并启动
        pollers = new Poller[getPollerThreadCount()];
        for (int i = 0; i < pollers.length; i++) {
            pollers[i] = new Poller();
            Thread pollerThread = new Thread(pollers[i], getName() + "-ClientPoller-" + i);
            pollerThread.setPriority(threadPriority);
            pollerThread.setDaemon(true);
            pollerThread.start();
        }
		
		// 创建接收器线程,并启动
        startAcceptorThreads();
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44

2.1、创建线程池ThreadPoolExecutor

  • 默认核心线程数10,最大线程数200
// AbstractEndpoint类方法
public void createExecutor() {
    internalExecutor = true;
    TaskQueue taskqueue = new TaskQueue();
    TaskThreadFactory tf = new TaskThreadFactory(getName() + "-exec-", daemon, getThreadPriority());
    executor = new ThreadPoolExecutor(getMinSpareThreads(), getMaxThreads(), 60, TimeUnit.SECONDS,taskqueue, tf);
    taskqueue.setParent( (ThreadPoolExecutor) executor);
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8

tomcat和jdk线程池区别

类名一样但是不同类:

// tomcat中的ThreadPoolExecutor
org.apache.tomcat.util.threads.ThreadPoolExecutor
 
// jdk中的ThreadPoolExecutor
java.util.concurrent.ThreadPoolExecutor
  • 1
  • 2
  • 3
  • 4
  • 5

jdk 线程池策略:

  • 当线程池中线程数量小于corePoolSize,每来一个任务,就会创建一个线程执行这个任务
  • 当前线程池线程数量大于等于corePoolSize,则每来一个任务,会尝试将其添加到任务缓存队列中
    • 若是添加成功,则该任务会等待线程将其取出去执行
    • 若添加失败(一般来说任务缓存队列已满),则会尝试创建新的线程执行
  • 当前线程池线程数量等于maximumPoolSize,则会采取任务拒绝策略进行处理

tomcat 线程池策略:

  • 当前线程数小于corePoolSize,则去创建工作线程
  • 当前线程数大于corePoolSize,但小于maximumPoolSize,则去创建工作线程
  • 当前线程数大于maximumPoolSize,则将任务放入到阻塞队列中,当阻塞队列满了之后,则调用拒绝策略丢弃任务
    • 队列默认大小Integer.MAX_VALUE

2.2、初始化LimitLatch对象

  • 初始化了connectionLimitLatch属性,这个属性是用来限制tomcat的最大连接数的,可以看到这里默认大小是10000
// AbstractEndpoint类方法
protected LimitLatch initializeConnectionLatch() {
    if (maxConnections==-1) return null;
    if (connectionLimitLatch==null) {
        connectionLimitLatch = new LimitLatch(getMaxConnections());
    }
    return connectionLimitLatch;
}

// 默认最大连接10000
private int maxConnections = 10000;
public int  getMaxConnections() { 
	return this.maxConnections; 
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14

2.3、接收器Acceptor线程

  • Acceptor类实现了runnable接口,startAcceptorThreads方法启动线程,并将该线程设置为守护线程
// AbstractEndpoint类方法
protected final void startAcceptorThreads() {
    int count = getAcceptorThreadCount();
    acceptors = new Acceptor[count];

    for (int i = 0; i < count; i++) {
        acceptors[i] = createAcceptor();
        String threadName = getName() + "-Acceptor-" + i;
        acceptors[i].setThreadName(threadName);
        Thread t = new Thread(acceptors[i], threadName);
        t.setPriority(getAcceptorThreadPriority());
        // 守护线程
        t.setDaemon(getDaemon());
        t.start();
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • while循环接受连接获取到SocketChannel,这里accept()是阻塞的
  • 如果请求连接已达最大连接数,会将此线程插入队列并挂起直到有资源被释放
  • 设置socket然后交给Poller
// NioEndpoint类的内部类
protected class Acceptor extends AbstractEndpoint.Acceptor {

    @Override
    public void run() {

        int errorDelay = 0;

        // 默认false,上面startInternal赋值true
        // 一直循环,直到我们收到关机命令
        while (running) {

            // 服务stop,paused为置为true,就会进入以下循环
            while (paused && running) {
                state = AcceptorState.PAUSED;
                try {
                    Thread.sleep(50);
                } catch (InterruptedException e) {
                    // Ignore
                }
            }

            if (!running) {
                break;
            }
            state = AcceptorState.RUNNING;

            try {
                // 如果我们已达到最大连接数,当前线程插入队列被挂起
                countUpOrAwaitConnection();

                SocketChannel socket = null;
                try {
                    // 接受连接,因为serverSock设置为阻塞模式,所以没有连接接入的话,accept方法是阻塞的
                    socket = serverSock.accept();
                } catch (IOException ioe) {
					...
                }

                // 配置套接字
                if (running && !paused) {
                    // 配置成功则将它交给Poller,否则关闭socket
                    if (!setSocketOptions(socket)) {
                        closeSocket(socket);
                    }
                } else {
                    closeSocket(socket);
                }
            } catch (Throwable t) {
				...
            }
        }
        state = AcceptorState.ENDED;
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 设置socketChannel为非阻塞模式
  • 设置socket属性,阻塞超时时间等等
  • 创建SocketBufferHandler类包含两个ByteBuffer,读readBuffer,写writeBuffer
  • 将Socket和SocketBufferHandler包装为NioChannel对象
  • 将NioChannel包装为NioSocketWrapper对象
  • 再将NioChannel和NioSocketWrapper包装为PollerEvent对象
  • 最后将PollerEvent添加到Poller中定义的队列中等待处理
// NioEndpoint类方法
protected boolean setSocketOptions(SocketChannel socket) {
    // Process the connection
    try {
        // socketChannel设置为非阻塞模式
        socket.configureBlocking(false);
        Socket sock = socket.socket();
        // 设置socket属性,阻塞超时时间等等
        socketProperties.setProperties(sock);

        NioChannel channel = nioChannels.pop();
        if (channel == null) {
        	// SocketBufferHandler类包含两个ByteBuffer,读readBuffer,写writeBuffer
            SocketBufferHandler bufhandler = new SocketBufferHandler(
                    socketProperties.getAppReadBufSize(),
                    socketProperties.getAppWriteBufSize(),
                    socketProperties.getDirectBuffer());
            if (isSSLEnabled()) {
                channel = new SecureNioChannel(socket, bufhandler, selectorPool, this);
            } else {
                channel = new NioChannel(socket, bufhandler);
            }
        } else {
            channel.setIOChannel(socket);
            channel.reset();
        }
        getPoller0().register(channel);
    } catch (Throwable t) {
		...
        return false;
    }
    return true;
}

public void register(final NioChannel socket) {
    socket.setPoller(this);
    NioSocketWrapper ka = new NioSocketWrapper(socket, NioEndpoint.this);
    socket.setSocketWrapper(ka);
    ka.setPoller(this);
    ka.setReadTimeout(getSocketProperties().getSoTimeout());
    ka.setWriteTimeout(getSocketProperties().getSoTimeout());
    ka.setKeepAliveLeft(NioEndpoint.this.getMaxKeepAliveRequests());
    ka.setSecure(isSSLEnabled());
    ka.setReadTimeout(getConnectionTimeout());
    ka.setWriteTimeout(getConnectionTimeout());
    PollerEvent r = eventCache.pop();
    // 监听读事件
    ka.interestOps(SelectionKey.OP_READ);
    if (r == null) {
        r = new PollerEvent(socket, ka, OP_REGISTER);
    } else {
        r.reset(socket, ka, OP_REGISTER);
    }
    addEvent(r);
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55

小结一下:

  在NioEndpoint init的时候,创建阻塞通道ServerSocketChannel,后面start的时候,开启多个Acceptor(默认一个),没个Acceptor循环调用ServerSocketChannel的accept()方法获取新的连接,然后调用setSocketOptions处理新的连接,之后再进入循环accept下一个连接。

2.4、Poller

  • Poller也是单独的线程,直接看run方法
  • while一直循环通过SocketChannel注册监听读事件
  • 轮训获取就绪的事件
    • 然后使用NioSocketWrapper包装为SocketProcessor
    • 调用executor.execute(socketProcessor)线程池处理
    • SocketProcessor实现了Runnable方法,核心内容在run方法中(下篇单独将)
  • 相同请求连接的超时时间,相同请求默认2秒内不需要重新建立接受连接
// NioEndpoint内部类
public class Poller implements Runnable {

    private Selector selector;
    // 存放PollerEvent的队列
    private final SynchronizedQueue<PollerEvent> events =
            new SynchronizedQueue<>();

    public Poller() throws IOException {
    	// 获取选择器
        this.selector = Selector.open();
    }

    private AtomicLong wakeupCounter = new AtomicLong(0);

	// poller核心实现,也就是这里扮演着selector的角色,IO多路复用的select()实现
    @Override
    public void run() {
        // 循环直到destroy()被调用
        while (true) {

            boolean hasEvents = false;

            try {
                if (!close) {
                	// 重要方法:遍历调用events中存放PollerEvent的run方法
                	// 通过PollerEvent中的NioChannel下的SocketChannel注册监听读事件
                    hasEvents = events();
                    if (wakeupCounter.getAndSet(-1) > 0) {
                        // 非阻塞轮训
                        keyCount = selector.selectNow();
                    } else {
                    	// 轮询已经就绪的事件,有事件keyCount>0,没有事件阻塞,这里默认是1秒
                        keyCount = selector.select(selectorTimeout);
                    }
                    wakeupCounter.set(0);
                }
                // 调用destroy()方法会调用,略过
				...
            } catch (Throwable x) {
				...
                continue;
            }
            
			...
			
			// keyCount>0有事件
			// 获取选择器中所有注册的通道中已准备好的事件
            Iterator<SelectionKey> iterator =
                    keyCount > 0 ? selector.selectedKeys().iterator() : null;
            // 遍历事件
            while (iterator != null && iterator.hasNext()) {
                SelectionKey sk = iterator.next();
                // 上面events()方法注册事件时候传入的NioSocketWrapper
                NioSocketWrapper attachment = (NioSocketWrapper) sk.attachment();
                // Attachment may be null if another thread has called
                // cancelledKey()
                if (attachment == null) {
                	// 移除事件
                    iterator.remove();
                } else {
                    iterator.remove();
                    // 处理读事件,因为上面events(),注册的就是监听读事件
                    // 使用NioSocketWrapper包装为SocketProcessor
                    // 调用executor.execute(socketProcessor)线程池处理
                    processKey(sk, attachment);
                }
            }// while

            // 连接的超时时间,默认2秒
            // 相同连接2秒内发请求,不需要重新接受连接
            // 超过两秒需要重新acceptor()
            timeout(keyCount, hasEvents);
        }// while

        getStopLatch().countDown();
    }
 }
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58
  • 59
  • 60
  • 61
  • 62
  • 63
  • 64
  • 65
  • 66
  • 67
  • 68
  • 69
  • 70
  • 71
  • 72
  • 73
  • 74
  • 75
  • 76
  • 77
  • 78

envents():遍历调用events中存放PollerEvent的run方法

// NioEndpoint内部类Poller类的方法
public boolean events() {
    boolean result = false;

    PollerEvent pe = null;
    for (int i = 0, size = events.size(); i < size && (pe = events.poll()) != null; i++ ) {
        result = true;
        try {
        	// 核心方法
            pe.run();
            pe.reset();
            if (running && !paused) {
                eventCache.push(pe);
            }
        } catch ( Throwable x ) {
            log.error("",x);
        }
    }

    return result;
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 查看NioEndpoint内部类PollerEvent类的run方法
  • 核心方法将本次客户端注册到选择器上,并监听读事件
@Override
public void run() {
	// 如果是register事件
    if (interestOps == OP_REGISTER) {
        try {
        	// 将本客户端注册到选择器上,并监听读事件
            socket.getIOChannel().register(
                    socket.getPoller().getSelector(), SelectionKey.OP_READ, socketWrapper);
        } catch (Exception x) {
            log.error(sm.getString("endpoint.nio.registerFail"), x);
        }
    } else {
		// 上面说的相同的请求2秒内不需要重新建立连接,这2秒内就会调用这里,省略掉
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15

3、stop停止和destroy销毁

// AbstractEndpoint类方法
public final void stop() throws Exception {
    stopInternal();
    if (bindState == BindState.BOUND_ON_START || bindState == BindState.SOCKET_CLOSED_ON_STOP) {
        unbind();
        bindState = BindState.UNBOUND;
    }
}

public final void destroy() throws Exception {
    if (bindState == BindState.BOUND_ON_INIT) {
        unbind();
        bindState = BindState.UNBOUND;
    }
    // jmx内容
	...
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 解锁等待中的连接,阻止接受新连接,停止poller线程,关闭线程池
// NioEndpoint类方法
@Override
public void stopInternal() {
    releaseConnectionLatch();
    if (!paused) {
        // 暂停终结点,这将阻止它接受新连接
        pause();
    }
    if (running) {
        running = false;
        // 解锁等待acceptor的线程,等待1000毫秒
        unlockAccept();
        for (int i=0; pollers!=null && i<pollers.length; i++) {
            if (pollers[i]==null) continue;
            // 调用销毁方法,停止poller线程
            pollers[i].destroy();
            pollers[i] = null;
        }
        // 关闭线程池
        shutdownExecutor();
        eventCache.clear();
        nioChannels.clear();
        processorCache.clear();
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 关闭ServerSocket连接
@Override
public void unbind() throws Exception {
    if (log.isDebugEnabled()) {
        log.debug("Destroy initiated for "+new InetSocketAddress(getAddress(),getPort()));
    }
    if (running) {
        stop();
    }
    // 关闭ServerSocket连接
    doCloseServerSocket();
    destroySsl();
    super.unbind();
    if (getHandler() != null ) {
        getHandler().recycle();
    }
    selectorPool.close();
    if (log.isDebugEnabled()) {
        log.debug("Destroy completed for "+new InetSocketAddress(getAddress(),getPort()));
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20

总结

  • Tomcat为了实现支持多种I/O模型和应用层协议,一个容器可能对接多个连接器
  • 连接器Connector的初始化和启动实际就是Endpoint的初始化和启动
  • Endpoint中主要存在三种线程
    • Acceptor线程:一直死循环通过SocketChannel的accept方法接受连接,阻塞方法
    • Poller线程:获取到Acceptor线程的连接,通过SocketChannel注册监听读事件,交给连接池处理
    • 任务线程:读取解析socket请求数据封装为request和response调用Servelt方法(下篇文章单独讲)
文章知识点与官方知识档案匹配,可进一步学习相关知识
Java技能树首页概览147154 人正在系统学习中
注:本文转载自blog.csdn.net的冬天vs不冷的文章"https://blog.csdn.net/qq_35512802/article/details/137260508"。版权归原作者所有,此博客不拥有其著作权,亦不承担相应法律责任。如有侵权,请联系我们删除。
复制链接
复制链接
相关推荐
发表评论
登录后才能发表评论和回复 注册

/ 登录

评论记录:

未查询到任何数据!
回复评论:

分类栏目

后端 (14832) 前端 (14280) 移动开发 (3760) 编程语言 (3851) Java (3904) Python (3298) 人工智能 (10119) AIGC (2810) 大数据 (3499) 数据库 (3945) 数据结构与算法 (3757) 音视频 (2669) 云原生 (3145) 云平台 (2965) 前沿技术 (2993) 开源 (2160) 小程序 (2860) 运维 (2533) 服务器 (2698) 操作系统 (2325) 硬件开发 (2492) 嵌入式 (2955) 微软技术 (2769) 软件工程 (2056) 测试 (2865) 网络空间安全 (2948) 网络与通信 (2797) 用户体验设计 (2592) 学习和成长 (2593) 搜索 (2744) 开发工具 (7108) 游戏 (2829) HarmonyOS (2935) 区块链 (2782) 数学 (3112) 3C硬件 (2759) 资讯 (2909) Android (4709) iOS (1850) 代码人生 (3043) 阅读 (2841)

热门文章

101
推荐
关于我们 隐私政策 免责声明 联系我们
Copyright © 2020-2025 蚁人论坛 (iYenn.com) All Rights Reserved.
Scroll to Top