Java NIO (一)

avatar
作者
筋斗云
阅读量:0

因工作需要我接触到了netty框架,这让我想起之前为夺高薪而在CSDN购买的Netty课程。如今看来,这套课程买的很值。这套课程中关于NIO的讲解,让我对Tomcat产生了浓厚的兴趣,于是我阅读了Tomcat中关于服务端和客户端之间连接部分的源码。从今天起想对这块内容进行一次全面梳理。下面就让我们一起看看这块看似贫瘠,实则充满生机的域外圣地吧!

1 概述

在正式梳理前,我们先看一下JDK提供的一种新IO——Java NIO,其全称为:java non-blocking IO。它是JDK从1.4版本开始提供的新API。它对原有输入/输出做了一些改进,因此又被称为NIO,即New IO。它是一种同步非阻塞IO。

JDK中与NIO相关的类均被放在java.nio包及其子包下,并且它们对原java.io包中的很多类都进行了改写。NIO有三大核心部分:Channel(通道)、Buffer(缓冲区)、Selector(选择器)。下面这幅图就展示了NIO的通信模型:

关于这个通信模型,我想谈一下自己的理解:从图中可以看出无论是NIO模式下,还是BIO模式下,均存在两个角色:服务端和客户端。服务端用于监听指定端口,等待客户端的连接请求,如果有客户端请求连接,则服务端会创建对应的连接对象,并将该请求的核心诉求交由下游的业务组件进行处理。客户端则与指定服务端之间建立连接,并与之进行交互。相比之下客户端的操作非常简单,就是发送连接请求,然后与服务端进行通信。既然如此是不是就可以认为NIO和BIO是一样的呢?答案肯定是不能。NIO和BIO在处理客户端与服务端之间的连接的方式上有很大差别:在BIO模式下,客户端与服务端成功建立连接后,服务端会将这个连接交由一个单独的线程处理,即服务端线程与客户端之间是1:1的关系,具体如下图所示:

 也就是说可以同时接入的客户端的数量是由服务端自己决定的,因此如果同时接入的客户端的数量超过服务端所能承受的最大限度,就会造成服务端崩溃,影响客户端体验。个人认为客户端与服务端之间的同步阻塞式通信是造成这种问题的根本原因。这种通信模式的特点就是客户端与服务端之间的连接只要建立起来,服务端分配的处理当前连接的线程就必须同步等待客户端的所有操作,即即使当前连接对应的客户端被莫名挂起(发送数据、处理数据或其他莫名原因),这段时间内,该服务端线程即使有足够精力处理其他事情,其也不能做,譬如:在客户端数据未发送完成的时间段内服务端线程必须等待,不能干其他事情。这就好比在路口等红绿灯一样,即使这个红绿灯左右两侧的道路空空如也,或者红绿灯前后的道路上排起了万米长龙,在红绿灯提示车辆可以通行之前,所有车辆也必须等待。借助这个案例,我们可以对BIO模式下的通信模型有更加具象化的认识。计算机的这种特征,也从侧面说明了即使其拥有远超人类的计算性能,其也无法逃脱永恒的自然规律。说到这里,我不禁在想,这种废柴通信模型,应该不会有什么项目会用吧?可现实就喜欢打我的脸,又或许是我喜欢被现实打脸吧!老版的Tomcat就用了,而且利用这种通信模型开发的Tomcat还被广泛应用于各大公司的各个项目中。那Tomcat的工程师是如何解决BIO通信模式下服务端的线程数量随客户端数量增加而不断增加的问题呢?解决的方案很简单,线程池,即规定服务端所能承受的最大线程数,超过规定数量的客户端连接直接被废弃或延迟处理。这种方案舍弃了一部分客户的诉求,只服务了一部分客户,所以说现实中真的存在绝对公平吗?不过这种方案充分发挥了计算机高速处理的能力。然而这种方案依旧没有解决计算机资源浪费的问题。好了,我们还是先看一下NIO是如何处理客户端与服务端之间的连接吧!下面这幅图展示NIO模式下客户端与服务端之间连接的处理方式:

从图中可以看出客户端与服务端建立连接后,服务端会将该连接包装成一个Channel对象,然后将其注册到Selector上,接着启动一个单独的线程处理该Selector。注意:Channel注册到Selector上时,会指定自己关心的事件,如果Selector检测到该事件发生,则会获取事件,并对其进行处理。这就好比现在的银行通过在大堂增加客户服务台一样。先由其对到银行办理业务的客户进行过滤,如果客户需求简单,比如咨询等,则直接处理,否则流转该客户到业务处理窗口处理。在这里大堂中新增加的客户服务台就是上图中的选择器——Selector,而客户就是上图中的客户端——Client,银行服务大楼就是我们通常所说的服务器——Server,各个服务窗口就是服务器中的线程——Thread,而服务窗口中坐着的办事员就是本文中提到的业务组件。通过银行这个服务客户的案例,我对NIO的三大组件的作用有了一些具象的认识,那如何利用这些工具写一个可用的服务器呢?

2 NIO在 Tomcat 中的应用

通过上一小节我对服务端编程中的一些基本概念有了大致的了解,但这无法让我停下 探索这片繁茂之地的脚步。在不考虑性能和其他复杂问题的情况下,我们可以利用BIO工具在很短时间内写出一个小型服务端程序,但我实在不知道如何利用NIO工具快速写出一个这样的服务端小程序。为了解决这个问题,我曾疯狂百度,甚至购买各种课程,包括本篇文章开头提到的Netty课程,然而结果令人遗憾。于是我开启了“摆烂”模式,主动寻找那些与这件事情无关,却能够带给我激情的事情做,比如玩游戏。直到一年前,我发现自己所面临的问题依旧如此严峻,于是又开始神经质式的学习模式。这一时期,我主动搜索并购买各种与Tomcat相关的资料,想以此构建全方位的Tomcat知识体系,并提升自己的专业技术能力,进而博得不错的机会。不过,这次和上次追求NIO速成的结果一样。然而有句俗话讲:失之东隅,收之桑榆。在这次追求速成的过程中,我对NIO这个知识体系却有了一些不一样的理解。虽然我不知道如何描述这种理解,但总觉得有必要对其中使用的NIO体系来一次全面梳理。

我们都知道Tomcat是一个Web服务器,通常用于部署Web应用程序。想必使用过Tomcat的你也知道其启动后,通常会监听服务端8080端口,以等待客户端的连接。看到这不知您是否想到了Socket编程中配置监听端口的步骤,所以按照Java Socket编程来理解Tomcat中应该也有这样的步骤。在正式梳理前,先来看一下Tomcat的体系结构,具体如下图所示:

从图中不难看出,Tomcat的体系结构非常复杂的,因此想要快速学好,对于基础薄弱的人来讲是有一定难度的。从图中可以看出,Tomcat的组件是一层套一层的,比如Server组件包含一个Service组件,Service组件包含一个Connector组件和Engine组件,Engine组件包含了一个Host组件,Host组件包含了Context组件,Context组件包含了其他组件和Web应用。这种模式与俄罗斯套娃非常类似。如果没有玩过俄罗斯套娃,相信您应该在抖音短视频中刷到过《大杨哥整蛊小杨哥》的视频,每当“单纯”的小杨哥以为要拿到礼物时,总会被容器内缩小版的容器整到崩溃。看着他愤怒又无奈的样子,着实可笑。继续回到要讨论的Connector组件上。从图中可以知道它位于Service组件中,并且持有一个线程池。另外从图中也可以看出浏览器到Web应用的所有请求都要先进到Connector组件中。那Connector组件是做什么的呢?Connector代表了一个连接,通过它可以实现客户端对Tomcat的接入,比如可以启动一个服务端套接字ServerSocket接收客户端的请求等。那Tomcat的工程师团队开发这个组件的思路是什么呢?

  1. 初始化。这个过程位于ConnectorCreateRule的begin()方法中。这个方法中有这样一段代码:Connector con = new Connector(attributes.getValue("protocol"));
  2. 启动。这个过程位于StandardService的startInternal()方法中。这个方法中有这样一段代码:
for (Connector connector: connectors) {     try {         // If it has already failed, don't try and start it         if (connector.getState() != LifecycleState.FAILED) {             connector.start();         }     } catch (Exception e) {         log.error(sm.getString(                 "standardService.connector.startFailed",                 connector), e);     } } 

 这里就不再罗列该组件停止的逻辑了,因为它与启动的逻辑基本一致,有兴趣的可以自己翻看一下Tomcat源码。下面继续翻阅Connector类的start()方法,最终会走到Connector类的startInternal()方法中,这个方法有这样一段代码:protocolHandler.start()。这段代码就是启动ProtocolHandler的实现类。那这个实现类会是谁呢?通过阅读Connector类的源码可以知道这个实现类是org.apache.coyote.http11.Http11NioProtocol。这个类是干什么的?在继续梳理这个类之前,还是先看一下Connector的继承结构,具体如下图所示:

这里的Lifecycle接口定义了Tomcat中各组件的生命周期,其中的方法都与这个生命周期有关,比如:init()、start()、stop()。【这句描述仅是个人理解,如有不对,还请见谅】这块知识并非本系列文章的重点,所以这里不再赘述。下面还是看一下与本系列主题具有密切关系的组件Http11NioProtocol,其继承结构如下所示:

图中的浅色部分就是Http11NioProtocol类的继承结构。从图中可以看出Http11NioProtocol最终实现了ProtocolHandler接口。现在看这些类的继承结构又有什么意义呢?还记得前面提到的Connector对象的初始化吗?在Connector类的构造函数中有这样一段代码,如下所示:

Class<?> clazz = Class.forName(protocolHandlerClassName); p = (ProtocolHandler) clazz.getConstructor().newInstance(); 

也就是说,在创建Connector对象的时候,就会为该类中ProtocolHandler类型的属性protocolHandler进行初始化,而且其实际类型就是刚刚看到的Http11NioProtocol【注意:这里是通过java反射方式创建ProtocolHandler对象的】。知道这个又有什么意义呢?我们先来看一下Http11NioProtocol类的构造方法,如下所示:

public Http11NioProtocol() {         super(new NioEndpoint()); } 

这个构造方法会调用父类构造方法,并传递一个NioEndpoint类型的对象进去。那这个父类构造做了什么呢?具体源码如下所示:

public AbstractHttp11JsseProtocol(AbstractJsseEndpoint<S,?> endpoint) {     super(endpoint); }  public AbstractHttp11Protocol(AbstractEndpoint<S,?> endpoint) {     super(endpoint);     setConnectionTimeout(Constants.DEFAULT_CONNECTION_TIMEOUT);     ConnectionHandler<S> cHandler = new ConnectionHandler<>(this);     setHandler(cHandler);     getEndpoint().setHandler(cHandler); }  public AbstractProtocol(AbstractEndpoint<S,?> endpoint) {     this.endpoint = endpoint;     setSoLinger(Constants.DEFAULT_CONNECTION_LINGER);     setTcpNoDelay(Constants.DEFAULT_TCP_NO_DELAY); } 

从源码可以看到,这些构造方法其实就是做一些属性的初始化,比如:connectionTimeout、soLinger、tcpNoDelay、endpoint、handler等。这里尤其要注意的是endpoint和handler这两个属性,主要关注以下几点:

  1. ProtocolHandler的实现类Http11NioProtocol对象持有一个Handler<S>类型的对象
  2. AbstractEndpoint<S,U>的实现类NioEndpoint对象持有一个Handler<S>类型的对象
  3. ProtocolHandler的实现类Http11NioProtocol对象持有一个AbstractEndpoint<S,U>类型的对象

注意:Http11NioProtocol对象和NioEndpoint对象持有的Handler<S>对象是同一个。

说实在话,我有点搞不清楚自己为什么要梳理这个了,但不梳理又总觉得无法构建起一个完整的知识体系。因此在下恳请各位看官稍安勿躁。先让我们一起看一下NioEndpoint的继承体系,具体如下图所示:

从图中可以看出NioEndpoint类最终继承了AbstractEndpoint<S, U>抽象类,同时其指定的泛型类型为NioChannel和SocketChannel。

下面我们再来看一下Handler<S>的继承体系及其实现类,具体的继承体系可以参见下面这幅图:

从图中不难看出Handler接口位于AbstractEndpoint类中,前者是AbstractEndpoint类中的一个静态内部接口。而ConnectionHandler实现了这个接口,不过这个实现类位于AbstractProtocol类中,且前者是后者中的一个静态内部类。看到这里是不是有种无间道的感觉?如果您可以一下子理解,我真的由衷钦佩,因为直到现在我还是有点懵,不知道为什么会这样做。不过再看一下前面梳理出来的三个重要步骤,我觉得自己好像懂了,这三个步骤是:

不过这种认识好像不是特别清晰,所以我想先了解一下NioEndpoint类的作用。了解一个事务的最好的方法就是触碰它。NioEndpoint类究竟做了什么呢?我觉得可以从下面几个方面来了解一下。

2.1 初始化

为了弄清楚NioEndpoint类的作用,本小节我们以Connector类的initInternal()方法为切入点来了解。在这个方法中有这样一句代码:protocolHandler.init()。也就是说Connector在进行初始化的时候,会主动调用ProtocolHandler实现类中的init()方法,根据上面的梳理这个调用先走到AbstractEndpoint<S,U>类中的init()方法中然后这个方法会先判断bindOnInit的值是否为true,如果是则继续调用AbstractEndpoint<S,U>类中的bindWithCleanup()方法,接着这个方法会调用AbstractEndpoint<S, U>中的bind()方法。注意这个方法是个抽象方法,具体的实现逻辑位于子类中。因此这个调用最终走的是NioEndpoint类中的bind()方法,最后这个方法又调用了本类中的initServerSocket()方法。这个方法主要完成了ServerSocketChannel对象的创建及初始化。这个调用过程涉及到的源码如下所示:

// 1) Connector类中的initInternal()方法 protected void initInternal() throws LifecycleException {     // ......... try {         // 调用 ProtocolHandler 的 init 方法完成相应的初始化         // 执行过程为:调用 AbstractHttp11Protocol 的 init 方法(该方法内部调用父类的 init 方法,父类的 init 方法中再调用 endpoint 的 init 方法)         // Endpoint 类的 init 方法主要完成         protocolHandler.init();     } catch (Exception e) {         throw new LifecycleException(                 sm.getString("coyoteConnector.protocolHandlerInitializationFailed"), e);     } } // 2) AbstractProtocol类中的init()方法 public void init() throws Exception { // .........     // 调用 NioEndpoint 类的 init 方法     /**      * Endpoint 类结构图为:      * AbstractEndpoint - 抽象类      * AbstractJsseEndpoint 继承于 AbstractEndpoint;AprEndpoint 继承于 AbstractEndpoint      * Nio2Endpoint / NioEndpoint 继承于 AbstractJsseEndpoint      */     endpoint.init(); } // 3) AbstractEndpoint类中的init()方法 public void init() throws Exception {     if (bindOnInit) {         bindWithCleanup();         bindState = BindState.BOUND_ON_INIT;     }     // ......... } // 4) AbstractEndpoint类中的bindWithCleanup()方法 private void bindWithCleanup() throws Exception {     try {         bind(); // 绑定到特定端口     } catch (Throwable t) {         // Ensure open sockets etc. are cleaned up if something goes         // wrong during bind         ExceptionUtils.handleThrowable(t);         unbind(); // 释放绑定         throw t;     } } // 4) NioEndpoint类中的bind()方法 public void bind() throws Exception {     // 初始化 ServerSocket     initServerSocket();     setStopLatch(new CountDownLatch(1));     // Initialize SSL if needed     initialiseSsl(); } // 5) NioEndpoint类中的initServerSocket()方法 protected void initServerSocket() throws Exception {     if (getUseInheritedChannel()) {         // Retrieve the channel provided by the OS         // System.inheritedChannel() 用于返回从生成当前JVM(Java虚拟机)的实体继承的通道         Channel ic = System.inheritedChannel();         if (ic instanceof ServerSocketChannel) {             serverSock = (ServerSocketChannel) ic;         }         if (serverSock == null) {             throw new IllegalArgumentException(sm.getString("endpoint.init.bind.inherited"));         }     } else {         // 调用 ServerSocketChannel.open() 方法来打开 ServerSocketChannel         // serverSock 为 ServerSocketChannel 类型         serverSock = ServerSocketChannel.open();         socketProperties.setProperties(serverSock.socket());         // TODO 绑定 8080 端口,并对其进行监听         // bind()方法将 ServerSocket 类绑定到指定的地址,而 ServerSocketChannel 类也有bind() 方法,该方法 public final ServerSocketChannel bind(SocketAddress local) 的作用是将通道的套接字绑定到本地地址并侦听连接         InetSocketAddress addr = new InetSocketAddress(getAddress(), getPortWithOffset());         serverSock.socket().bind(addr,getAcceptCount());     }     serverSock.configureBlocking(true); //mimic APR behavior } 

梳理到这里我们看到了Tomcat中用到的NIO相关的第一个类ServerSocketChannel。通过这个源码不难看出,Tomcat的工程师在开发时也是按照JDK的规范一步步书写的:

  1. 通过ServerSocketChannel类提供的open()方法创建一个ServerSocketChannel对象
  2. 为该ServerSocketChannel对象设置相关属性,譬如:soTimeout、receiveBufferSize等
  3. 将该ServerSocketChannel对象绑定到指定端口上,譬如8080,注意这里的8080是通过配置文件配置的

2.2 启动

在上一小节中,通过梳理initInternal()方法,我们知道了NioEndpoint的第一个作用:初始化ServerSocketChannel对象。这一小节我们继续梳理NioEndpoint的作用,不过这次的切入点是Connector类的startInternal()方法。该方法有这样一句代码:protocolHandler.start()。这说明Connector类在启动的时候会主动调用ProtocolHandler实现类中的start()方法,根据前面的梳理这个调用先走到AbstractEndpoint<S,U>类中的init()方法中然后这个方法会先判断bindState的状态是否是BindState.UNBOUND,如果是则继续走初始化流程,否则调用AbstractEndpoint<S,U>类中的startInternal()方法。注意这个方法是个抽象方法,具体的实现逻辑位于子类中。因此这个调用最终走到了NioEndpoint类中的startInternal()方法中。上面这个调用过程涉及到的源码如下所示:

// 1) Connector类中的startInternal()方法 protected void startInternal() throws LifecycleException { // .........     try {         // 执行 protocolHandler 对象的 start 方法         protocolHandler.start();     } catch (Exception e) {         throw new LifecycleException(                 sm.getString("coyoteConnector.protocolHandlerStartFailed"), e);     } } // 2) AbstractProtocol类中的start()方法 public void start() throws Exception {     // .........     endpoint.start();     // ......... } // 3) AbstractEndpoint类中的start()方法 public final void start() throws Exception {     // 如果没有绑定到相关端口,则执行绑定操作     if (bindState == BindState.UNBOUND) {         bindWithCleanup();         bindState = BindState.BOUND_ON_START;     }     // 启动监听线程     startInternal(); }  // 4) NioEndpoint类中的bind()方法 public void bind() throws Exception {     // 初始化 ServerSocket     initServerSocket();     setStopLatch(new CountDownLatch(1));     // Initialize SSL if needed     initialiseSsl(); } // 5) NioEndpoint类中的startInternal()方法 public void startInternal() throws Exception {      if (!running) {         running = true;         paused = false;          if (socketProperties.getProcessorCache() != 0) {             processorCache = new SynchronizedStack<>(SynchronizedStack.DEFAULT_SIZE,                     socketProperties.getProcessorCache());         }         if (socketProperties.getEventCache() != 0) {             eventCache = new SynchronizedStack<>(SynchronizedStack.DEFAULT_SIZE,                     socketProperties.getEventCache());         }         if (socketProperties.getBufferPool() != 0) {             nioChannels = new SynchronizedStack<>(SynchronizedStack.DEFAULT_SIZE,                     socketProperties.getBufferPool());         }          // Create worker collection         if (getExecutor() == null) {             createExecutor();         }          // 初始化 LimitLatch ,其用于控制最大线程数         initializeConnectionLatch();          // Start poller thread         poller = new Poller();         // 创建通道管理器 Selector ,并启动在后台运行         Thread pollerThread = new Thread(poller, getName() + "-Poller");         pollerThread.setPriority(threadPriority);         pollerThread.setDaemon(true);         pollerThread.start();          // 开启接收线程,后台运行,用于接收客户端发来的请求,调用的是父类的方法         // 这个方法实际是在它的超类AbstractEndpoint里面         startAcceptorThread();     } } 

通过NioEndpoint类的源码不难看出startInternal()这个方法主要完成了这样几件事:

  1. 创建工作线程池【这里涉及到了线程池的知识】
  2. 创建Poller工作线程,为其设置相关属性,并启动之
  3. 启动Acceptor线程

这里我们主要看Poller和Acceptor这两个类。它们都实现了Runnable接口,并对其中的run方法进行了实现。这里要注意一下:Acceptor类持有了当前的AbstractEndpoint对象,即NioEndpoint对象,同时Acceptor的泛型为SocketChannel。下面先来看一下Acceptor类的run()方法,其源码如下所示:

public void run() {      int errorDelay = 0;     long pauseStart = 0;      try {         // Loop until we receive a shutdown command         while (!stopCalled) {              // Loop if endpoint is paused.             // There are two likely scenarios here.             // The first scenario is that Tomcat is shutting down. In this             // case - and particularly for the unit tests - we want to exit             // this loop as quickly as possible. The second scenario is a             // genuine pause of the connector. In this case we want to avoid             // excessive CPU usage.             // Therefore, we start with a tight loop but if there isn't a             // rapid transition to stop then sleeps are introduced.             // < 1ms       - tight loop             // 1ms to 10ms - 1ms sleep             // > 10ms      - 10ms sleep             while (endpoint.isPaused() && !stopCalled) {                 if (state != AcceptorState.PAUSED) {                     pauseStart = System.nanoTime();                     // Entered pause state                     state = AcceptorState.PAUSED;                 }                 if ((System.nanoTime() - pauseStart) > 1_000_000) {                     // Paused for more than 1ms                     try {                         if ((System.nanoTime() - pauseStart) > 10_000_000) {                             Thread.sleep(10);                         } else {                             Thread.sleep(1);                         }                     } catch (InterruptedException e) {                         // Ignore                     }                 }             }              if (stopCalled) {                 break;             }             state = AcceptorState.RUNNING;              try {                 //if we have reached max connections, wait                 // 如果达到最大连接数,则等待                 // 判断连接数是否大于阈值 (默认为 10000), 否则将会休眠                 // 里面用到了 AQS                 endpoint.countUpOrAwaitConnection();                  // Endpoint might have been paused while waiting for latch                 // If that is the case, don't accept new connections                 if (endpoint.isPaused()) {                     continue;                 }                  // 下面这段相当于 SocketChannel socket = null;                 U socket = null;                 try {                     // Accept the next incoming connection from the server socket                     // 接收请求,serverSocketAccept 方法由子类实现,本代码中的子类即 NioEndpoint                     // 阻塞, 直到有连接                     // bind 方法中已经设置为 阻塞                     // 监听socket负责接收新连接                     // Acceptor 在启动后会阻塞在 endpoint.serverSocketAccept(); 方法处,当有新连接到达时,该方法返回一个 SocketChannel                     // 这里的 endpoint 为 NioEndpoint                     socket = endpoint.serverSocketAccept();                 } catch (Exception ioe) {                     // We didn't get a socket                     endpoint.countDownConnection();                     if (endpoint.isRunning()) {                         // Introduce delay if necessary                         errorDelay = handleExceptionWithDelay(errorDelay);                         // re-throw                         throw ioe;                     } else {                         break;                     }                 }                 // Successful accept, reset the error delay                 errorDelay = 0;                  // Configure the socket                 if (!stopCalled && !endpoint.isPaused()) {                     // setSocketOptions() will hand the socket off to                     // an appropriate processor if successful                     // 将 Socket 交给 Poller 线程处理                     //处理接受到的 socket 对象并发布事件                     if (!endpoint.setSocketOptions(socket)) {                         endpoint.closeSocket(socket);                     }                 } else {                     endpoint.destroySocket(socket);                 }             } catch (Throwable t) {                 ExceptionUtils.handleThrowable(t);                 String msg = sm.getString("endpoint.accept.fail");                 // APR specific.                 // Could push this down but not sure it is worth the trouble.                 if (t instanceof Error) {                     Error e = (Error) t;                     if (e.getError() == 233) {                         // Not an error on HP-UX so log as a warning                         // so it can be filtered out on that platform                         // See bug 50273                         log.warn(msg, t);                     } else {                         log.error(msg, t);                     }                 } else {                         log.error(msg, t);                 }             }         }     } finally {         stopLatch.countDown();     }     state = AcceptorState.ENDED; } 

这个方法看着很长,但仔细研究您会发现,其本质上就是一个while(true)循环,循环体内的主要逻辑就是:socket = endpoint.serverSocketAccept()和endpoint.setSocketOptions(socket),其中前者是从被监听端口(8080)上拿到客户端的Socket连接,然后将其交给Acceptor持有的NioEndpoint对象(这个对象是在Connector创建ProtocolHandler时创建的)进行处理(调用NioEndpoint类上的setSocketOptions()方法)。因此我个人觉得Acceptor线程的主要作用就是接收并创建客户端Socket连接。至于前面提到的NioEndpoint中的setSocketOptions()方法究竟做了什么,个人觉得没有必要详细描述,大家翻看源码便可一目了然。这里我只用自己的话总结一下:这个方法将从8080端口获取到的SocketChannel连接包装到NioSocketWrapper对象中,然后将该对象注册到Poller线程上,并由其完成相应的处理。这里提到的Poller是我们要梳理的第二个重要的后台线程。它是一个位于NioEndpoint类中且实现了Runnable接口的线程类。它里面有一个Selector类型的属性selector,这个属性的初始化是在Poller线程的构造方法中完成的,具体代码为:this.selector = Selector.open()。这就是我们要梳理的NIO中的第二个重要类Selector。梳理到这里我不禁好奇Poller类创建这个对象干做什么呢?因为按照Netty课程中的讲解,Selector不是出现在这里的。如果要了解Selector对象的作用还得翻看Poller类的源码,先来看一下Poller类的run()方法,其源码如下所示:

public void run() {     // Loop until destroy() is called     // 线程启动后,这里会一直循环,直到有相关事件过来,可以处理为止     while (true) {          boolean hasEvents = false;          try {             if (!close) {                 // 当前通道是否有需要执行的事件                 // 检查 events 事件队列是否存在元素                 // 然后遍历 events 事件队列的所有元素, 将 Socket 以 OP_READ 事件注册到 Selector 上                 hasEvents = events();                 if (wakeupCounter.getAndSet(-1) > 0) {                     // If we are here, means we have other stuff to do                     // Do a non blocking select                     keyCount = selector.selectNow();                     System.out.println("111111111111");                 } else {                     keyCount = selector.select(selectorTimeout);                     System.out.println("2222222222222");                 }                 wakeupCounter.set(0);             }             if (close) {                 events();                 timeout(0, false);                 try {                     selector.close();                 } catch (IOException ioe) {                     log.error(sm.getString("endpoint.nio.selectorCloseFail"), ioe);                 }                 break;             }             // Either we timed out or we woke up, process events first             if (keyCount == 0) {                 hasEvents = (hasEvents | events());             }         } catch (Throwable x) {             ExceptionUtils.handleThrowable(x);             log.error(sm.getString("endpoint.nio.selectorLoopError"), x);             continue;         }          Iterator<SelectionKey> iterator = keyCount > 0 ? selector.selectedKeys().iterator() : null;         // Walk through the collection of ready keys and dispatch         // any active event.         while (iterator != null && iterator.hasNext()) {             System.out.println("222222222222212412341234123412341");             SelectionKey sk = iterator.next();             iterator.remove();             NioSocketWrapper socketWrapper = (NioSocketWrapper) sk.attachment();             // Attachment may be null if another thread has called             // cancelledKey()             // 处理到达的请求             if (socketWrapper != null) {                 // 处理 socket 请求                 processKey(sk, socketWrapper);             }         }          // Process timeouts         timeout(keyCount,hasEvents);     }      getStopLatch().countDown(); } 

Netty课程中提到的编码逻辑在源码的后半部分体现出来了。首先调用Selector的selectorKeys()方法获得发生了某种事件SocketChannel,然后判断事件类型,并作进一步处理。原本的SocketChannel被NioSocketWrapper替换了;原本判断事件类型并做进一步处理的逻辑,在这里被交给了processKey()方法(这个方法会判断当前发生的事件类型是读,还是写,又或者是文件处理)。有兴趣的读者可以自己翻阅一下Tomcat源码。到这里我们看到了NIO三大组件中的第二个Selector(当然这里也出现了SocketChannel,由于它与ServerSocketChannel同属Channel,所以这里就不再罗列了)。

2.3 Tomcat中用到的NIO Buffer

在前两小节我们梳理了Tomcat用到的NIO中的两大组件Channel和Selector,那Tomcat是否有用到NIO中的第三大组件Buffer呢?这个答案自然是是的。还记得第二小节提到的包装SocketChannel的方法吗?是的,就是NioEndpoint中的setSocketOptions()方法,这个方法会首先创建一个SocketBufferHandler对象,具体创建代码为:

SocketBufferHandler bufhandler = new SocketBufferHandler(         socketProperties.getAppReadBufSize(),         socketProperties.getAppWriteBufSize(),         socketProperties.getDirectBuffer()); // 下面再看一下这个类的构造方法 public SocketBufferHandler(int readBufferSize, int writeBufferSize, boolean direct) {     this.direct = direct;     if (direct) {         // 创建直接缓冲区 - 用于读         readBuffer = ByteBuffer.allocateDirect(readBufferSize);         // 创建直接缓冲区 - 用于写         writeBuffer = ByteBuffer.allocateDirect(writeBufferSize);     } else {         // 设置缓冲区初始容量 - 用于读         readBuffer = ByteBuffer.allocate(readBufferSize);         // 设置缓冲区初始容量 - 用于写         writeBuffer = ByteBuffer.allocate(writeBufferSize);     } } 

从这段源码中可以看到一个名为ByteBuffer的类,它就是NIO中的第三大组件Buffer。关于它的作用及用法我们将在后续章节中逐步展开,这里就不再啰嗦了。

广告一刻

为您即时展示最新活动产品广告消息,让您随时掌握产品活动新动态!