上善若水
In general the OO style is to use a lot of little objects with a lot of little methods that give us a lot of plug points for overriding and variation. To do is to be -Nietzsche, To bei is to do -Kant, Do be do be do -Sinatra
posts - 146,comments - 147,trackbacks - 0

Connector概述

Connector是Jetty中可以直接接受客户端连接的抽象,一个Connector监听Jetty服务器的一个端口,所有客户端的连接请求首先通过该端口,而后由操作系统分配一个新的端口(Socket)与客户端进行数据通信(先握手,然后建立连接,但是使用不同的端口)。不同的Connector实现可以使用不同的底层结构,如Socket Connector、NIO Connector等,也可以使用不同的协议,如Ssl Connector、AJP Connector,从而在不同的Connector中配置不同的EndPoint和Connection。EndPoint用于连接(Socket)的读写数据(参见深入Jetty源码之EndPoint),Connection用于关联Request、Response、EndPoint、Server,并将解析出来的Request、Response传递给在Server中注册的Handler来处理(参见深入Jetty源码之Connection)。在Jetty中Connector的有:SocketConnector、SslSocketConnector、Ajp13SocketConnector、BlockingChannelConnector、SelectChannelConnector、SslSelectChannelConnector、LocalConnector、NestedConnector等,其类图如下。

Connector类图


Connector接口

首先Connector实现了LifeCycle接口,在启动Jetty服务器时,会调用其的start方法,用于初始化Connector内部状态,并打开Connector以接受客户端的请求(调用open方法);而在停止Jetty服务器时会调用其stop方法,以关闭Connector以及内部元件(如Connection等)以及做一些清理工作。因而open、close方法是Connector中用于处理生命周期的方法;对每个Connector都有name字段用于标记该Connector,默认值为:hostname:port;Connector中还有Server的引用,可以从中获取ThreadPool,并作为Handler的容器被使用(在创建HttpConnection时,Server实例作为构造函数参数传入,并在handleRequest()方法中将解析出来的Request、Response传递给Server注册的Handler);Connector还定义了一些用于配置当前Connector的方法,如Buffer Size、Max Idle Time、Low Resource Max Idle Time,以及一些统计信息,如当前Connector总共处理过的请求数、总共处理过的连接数、当前打开的连接数等信息。
Connector的接口定义如下:
public interface Connector extends LifeCycle { 
    // Connector名字,默认值hostname:port
    String getName();
    // 打开当前Connector
    void open() throws IOException;
    // 关闭当前Connector
    void close() throws IOException;
    // 对Server的引用
    void setServer(Server server);
    Server getServer();

    // 在处理请求消息头时使用的Buffer大小
    int getRequestHeaderSize();
    void setRequestHeaderSize(int size);
    // 在处理响应消息头时使用的Buffer大小
    int getResponseHeaderSize();
    void setResponseHeaderSize(int size);
    // 在处理请求消息内容时使用的Buffer大小
    int getRequestBufferSize();
    void setRequestBufferSize(int requestBufferSize);
    // 在处理响应消息内容使用的Buffer大小
    int getResponseBufferSize();
    void setResponseBufferSize(int responseBufferSize);
    // 在处理请求消息时使用的Buffer工厂
    Buffers getRequestBuffers();
    // 在处理响应消息时使用的Buffer工厂
    Buffers getResponseBuffers();

    // User Data Constraint的配置可以是None、Integral、Confidential,对这三种值的解释:
    // None:A value of NONE means that the application does not require any transport guarantees.
    // Integral:A value of INTEGRAL means that the application requires the data sent between the client and server to be sent in such a way that it can't be changed in transit.
    // Confidential:A value of CONFIDENTIAL means that the application requires the data to be transmitted in a fashion that prevents other entities from observing the contents of the transmission on.
    // In most cases, the presence of the INTEGRAL or CONFIDENTIAL flag indicates that the use of SSL is required.参考:这里
    // 如果配置了user-data-constraint为Integral或confidential表示所有相应请求都会重定向到使用Integral/Confidential Schema/Port构建的新的URL中。
    int getIntegralPort();
    String getIntegralScheme();
    boolean isIntegral(Request request);
    int getConfidentialPort();
    String getConfidentialScheme();
    boolean isConfidential(Request request);

    // 在将HttpConnection交给Server中的Handlers处理前,根据当前Connector,自定义一些EndPoint和Request的配置,如设置EndPoint的MaxIdleTime,Request的timestamp,
    // 清除SelectChannelEndPoint中的idleTimestamp,检查forward头等。
    void customize(EndPoint endpoint, Request request) throws IOException;

    // 主要用于SelectChannelConnector中,重置SelectChannelEndPoint中的idleTimestamp,即重新计时idle的时间。
   /它在每个Request处理结束,EndPoint还未关闭,并且当前连接属于keep-alive类型的时候被调用。
    void persist(EndPoint endpoint) throws IOException;

    // 底层的链接实例,如ServerSocket、ServerSocketChannel等。
    Object getConnection();
    
    //是否对"X-Forwarded-For"头进行DNS名字解析
    boolean getResolveNames();
    
    // 当前Connector绑定的主机名、端口号等。貌似在Connector的实现中没有一个默认的主机名。
    /由于端口号可以设置为0,表示由操作系统随机的分配一个还没有被使用的端口,因而这里由LocalPort用于存储Connector实际上绑定的端口号;
    /其中-1表示这个Connector还未开启,-2表示Connector已经关闭。
    String getHost();
    void setHost(String hostname);
    void setPort(int port);
    int getPort();
    int getLocalPort();
    
    // Socket的最大空闲时间,以及在资源比较少(如线程池中的任务数比最大可用线程数要多)的情况下的最大空闲时间,当空闲时间超过这个时间后关闭当前连接(Socket)。
    int getMaxIdleTime();
    void setMaxIdleTime(int ms);
    int getLowResourceMaxIdleTime();
    void setLowResourceMaxIdleTime(int ms);

    // 是否当前Connector处于LowResources状态,即线程池中的任务数比最大可用线程数要多
    public boolean isLowResources();

    /* ------------------------以下是一些获取和当前Connector相关的统计信息------------------------------------ */
    // 打开或关闭统计功能
    public void setStatsOn(boolean on);
    // 统计功能的开闭状态
    public boolean getStatsOn();
    // 重置统计数据,以及统计开始时间
    public void statsReset();
    // 统计信息的开启时间戳
    public long getStatsOnMs();

    // 当前Connector处理的请求数
    public int getRequests();
    // 当前Connector接收到过的连接数
    public int getConnections() ;
    // 当前Connector所有当前还处于打开状态的连接数
    public int getConnectionsOpen() ;
    // 当前Connector历史上同时处于打开状态的最大连接数
    public int getConnectionsOpenMax() ;
    // 当前Connector所有连接的持续时间总和
    public long getConnectionsDurationTotal();
    // 当前Connector的最长连接持续时间
    public long getConnectionsDurationMax();
    // 当前Connector平均连接持续时间
    public double getConnectionsDurationMean() ;
    // 当前Connector所有连接持续时间的标准偏差
    public double getConnectionsDurationStdDev() ;
    // 当前Connector的所有连接的平均请求数
    public double getConnectionsRequestsMean() ;
    // 当前Connector的所有连接的请求数标准偏差
    public double getConnectionsRequestsStdDev() ;
    // 当前Connector的所有连接的最大请求数
    public int getConnectionsRequestsMax();
}

AbstractConnector实现

Jetty中所有的Connector都继承自AbstractConnector,而它自身继承自HttpBuffers,HttpBuffers包含了Request、Response的Buffer工厂的创建以及相应Size的配置。AbstractConnector还包含了Name、Server、maxIdleTime以及一些统计信息的引用,用于实现Connector接口中的方法,只是一些Get、Set方法,不详述。除了Connector接口相关的配置,AbstractConnector还为定义了两个字段:_acceptQueueSize用于表示ServerSocket 或ServerSocketChannel中最大可等待的请求数,_acceptors用于表示用于调用ServerSocket或ServerSocketChannel中accept方法的线程数,建议这个数字小于或等于可用处理器的2倍。

在AbstractConnector中还定义了Acceptor内部类,它实现了Runnable接口,在其run方法实现中,它将自己的Thread实例赋值给AbstractConnector中的_acceptorThread数组中acceptor号对应的bucket,并更新线程名为:<name> Acceptor<index> <Connector.toString>。然后根据配置的_acceptorPriorityOffset设置当前线程的priority。只要当前Connector处于Running状态,并且底层链接实例不为null,不断的调用accept方法()。在退出的finally语句快中清理_acceptorThread数组中相应的Bucket值为null,并将线程原来的Name、Priority设置回来。

AbstractConnector实现了doStart()方法,它首先保证Server实例的存在;然后打开当前Connector(调用其open()方法),并调用父类的doStart方法(这里是HttpBuffers,用于初始化对应的Buffers);如果没有自定义的ThreadPool,则从Server中获取ThreadPool;最后根据acceptors的值创建_acceptorThread数组,将acceptors个Acceptor实例dispatch给ThreadPool。在doStop方法实现中,它首先调用close方法,然后对非Server中的ThreadPool调用其stop方法,再调用父类的doStop方法清理Buffers的引用,最后遍历_acceptorThread数据,调用每个Thread的interrupt方法。

在各个子类的accept方法实现中,他们在获取客户端过来的Socket连接后,都会对该Socket做一些配置,即调用AbstractConnector的configure方法,它首先设置Socket的TCP_NODELAY为true,即禁用Nagle算法(关于禁用的理由可以参考:http://jerrypeng.me/2013/08/mythical-40ms-delay-and-tcp-nodelay/#sec-4-2,简单的,如果该值为false,则TCP的数据包要么达到TCP Segment Size,要么收到一个Ack,才会发送出去,即有Delay);然后如果设置了_soLingerTime,则开启Socket中SO_LINGER选项,否则,关闭该选项(SO_LINGER选项用于控制关闭一个Socket的行为,如果开启了该选项,则在关闭Socket时会等待_soLingerTime时间,此时如果有数据还未发送完,则会发送这些数据;如果关闭了该选项,则Socket的关闭会立即返回,此时也有可能继续发送未发送完成的数据,具体参考:http://blog.csdn.net/factor2000/article/details/3929816)。

在ServerSocket和ServerSocketChannel中还有一个SO_REUSEADDR的配置,一般来说当一个端口被释放后会等待两分钟再被使用,此时如果重启服务器,可能会导致启动时的绑定错误,设置该值可以让端口释放后可以立即被使用(具体参考:http://www.cnblogs.com/mydomain/archive/2011/08/23/2150567.html)。在AbstractConnector中可以使用setReuseAddress方法来配置,默认该值设置为true。

AbstractConnector中还实现了customize方法,它在forwarded设置为true的情况下设置相应的attribute:javax.servlet.request.cipher_suite, javax.servlet.request.ssl_session_id,以及Request中对应Host、Server等头信息。这个逻辑具体含义目前还不是很了解。。。。

最后关于统计数据的更新方法,AbstractConnector定义了如下方法:
protected void connectionOpened(Connection connection)
protected void connectionUpgraded(Connection oldConnection, Connection newConnection)
protected void connectionClosed(Connection connection)

SocketConnector实现

有了AbstractConnector的实现,SocketConnector的实现就变的非常简单了,它保存了一个EndPoint的Set,表示所有在这个Connector下正在使用的EndPoint,然后是ServerSocket,在open方法中创建,并在getConnection()方法中返回,还有一个localPort字段,当ServerSocket被创建时从ServerSocket实例中获取,并在getLocalPort()方法中返回。在close方法中关闭ServerSocket,并设置localPort为-2;在accept方法中,调用ServerSocket的accept方法,返回一个Socket,调用configure方法对新创建的Socket做一些基本的配置,然后使用该Socket创建ConnectorEndPoint,并调用其dispatch方法;在customize方法中,在调用AbstractConnector的customize方法的同时还设置ConnectorEndPoint的MaxIdleTime,即设置Socket的SO_TIMEOUT选项,用于配置该Socket的空闲可等待时间;在doStart中会先清理ConnectorEndPoint的集合,而在doStop中会关闭所有还处于打开状态的ConnectorEndPoint。

SelectChannelConnector实现

SelectChannelConnector内部使用ServerSocketChannel,在open方法中创建ServerSocketChannel,配置其为非blocking模式,并设置localPort值;在accept方法中调用ServerSocket的accept方法获得一个SocketChannel,配置该Channel为非blocking模式,调用AbstractChannel的configure方法做相应Socket配置,最后将该SocketChannel注册给ConnectorSelectManager;在doStart方法中,它会初始化ConnectorSelectManager的SelectSets值为acceptors值、MaxIdleTime、LowResourceConnections、LowResourcesMaxIdleTime等值,并启动该Manager,并dispatch acceptors个线程,不断的调用Manager的doSelect方法;在close方法中会先stop ConnectorSelectManager,然后关闭ServerSocketChannel,设置localPort为-2;在customize方法中会清除SelectChannelEndPoint的idleTimestamp,重置其MaxIdleTime以及重置Request中的timestamp的值;在persist方法中会重置SelectChannelEndPoint中idleTimestamp的值。

BlockingChannelConnector实现

BlockingChannelConnector实现类似SocketConnector,不同的是它使用ServerSocketChannel,并且其EndPoint为BlockingChannelEndPoint。所不同的是它需要在doStart方法中启动一个线程不断的检查所有还在connections集合中的BlockingChannelEndPoint是否已经超时,每400ms检查一次,如果超时则关闭该EndPoint。

其他的Connector的实现都比较类似,而SSL相关的Connector需要也只是加入了SSL相关的逻辑,这里不再赘述。
posted on 2014-05-01 18:40 DLevin 阅读(4335) 评论(0)  编辑  收藏 所属分类: Jetty

只有注册用户登录后才能发表评论。


网站导航: