I want to fly higher
programming Explorer
posts - 114,comments - 263,trackbacks - 0
1.NioSocketAcceptor持有一个Selector对象.->调用bind方法后->AbstractPollingIoAcceptor#bindInternal. 

protected final Set<SocketAddress> bindInternal(List<? extends SocketAddress> localAddresses) throws Exception {
            
// 创建了一个绑定请求的future operation.当selector处理注册的时候,会signal future.
            AcceptorOperationFuture request = new AcceptorOperationFuture(localAddresses);
    
            
// 将请求加入注册队列
            registerQueue.add(request);
    
            
// 创建acceptor任务并启动(acceptor-worker线程),这个是单线程的
            startupAcceptor();
    
            
// 这块的细节很重要.因为Acceptor任务是一个while(){selector.select},此时accepor线程因为select操作为阻塞,因为此时没有任何事件发生.
            
// 所以这边用了一个信号量.在初始化Acceptor这个任务后并启动后,释放这个许可(信号量初始化为1).然后lock.acquire继续执行.
            try {
                lock.acquire();
    
                
// 这里等待了10毫秒,是要给acceptor-worker线程机会执行任务.即进入while(select),即执行到selector处
                Thread.sleep(10);
                
// 因为acceptor任务中的selector此时因为select操作阻塞,所以这里执行唤醒selector操作.进而可以处理之前加入注册队列的请求.
                wakeup();
            }
 finally {
                lock.release();
            }

    
            
// 阻塞,等到注册队列的请求被处理完成
            request.awaitUninterruptibly();
            
            
    }

2.

private class Acceptor implements Runnable {
        
public void run() {
            
            
// 释放一个许可,使得主线程可以执行后续后续调度(唤醒selector).
            lock.release();

            
// break的条件是之前bind的serversocket全部unbind了.
            while (selectable) {
                
try {
                    
                    
// selector执行select.
                    
// 1.有新连接出现则被唤醒 2.在首次阻塞的时候被主线程wakeup(处理注册OP_ACCEPT)
                    int selected = select();

                    
// registerHandles做的主要事情是将注册队列的绑定地址,执行NioSocketAcceptor#open.
                    
// 即(nio的一系列配置)1.ServerSocketChannel.open() 2.channel.configureBlocking(false)
                    
// 3.ServerSocket socket = channel.socket() 4.socket.setReuseAddress(isReuseAddress())
                    
// 5.socket.bind(localAddress, getBacklog()) 6.channel.register(selector, SelectionKey.OP_ACCEPT) 向selector注册Acceptor事件
                    
// 这里有两个ServerSocket的参数可以设置 reuseAddress/backlog
                    nHandles += registerHandles();

                    
..检查regiser是否成功.如果不成功则break
                    
..检查取消队列是否为空,如果为空则break(即没有serversocket监听了,都unbind了).

                    
// 表明有新连接请求进来.
                    if (selected > 0{
                        
// 处理新连接请求.
                        
// 1.accept返回new NioSocketSession 2.初始化session 3.将其绑定到processor池(SimpleIoProcessorPool)的一个NioProcessor(SimpleIoProcessorPool#getProcessor,取模)
                        
// 4.AbstractPollingIoProcessor#add->将session加入NioProcessor的新创建的session队列并startupProcessor
                        
// 注:startupProcessor方法做了引用判断,即一个NioProcessor只会启动一个Processor任务.(所以对于session的io读写也是单线程的.因为session是已经绑定了一个固定的NioProcessor中)
                        processHandles(selectedHandles());
                    }


                    
// 检查是否调用了unbind.如果unbind则加入取消队列.
                    nHandles -= unregisterHandles();
                    
                    
.
        }


3.NioProcessor持有一个Selector对象.其初始化的时候会open selector.


private class Processor implements Runnable {
        
public void run() {
            
            
int nSessions = 0;
            
// 上一次空闲检查时间
            lastIdleCheckTime = System.currentTimeMillis();

            
// 无限循环.说明proceeeor会始终占用线程池的一个线程.并可以这样说,NioProcessor的数目就是线程池工作线程的数目.
            for (;;) {
                
try {
                    
// 这里select有一个超时,是为了管理空闲session,超时时间是1s
                    long t0 = System.currentTimeMillis();
                    
int selected = select(SELECT_TIMEOUT);
                    
long t1 = System.currentTimeMillis();
                    
long delta = (t1 - t0);

                    
//(处理java6的nio的bug)
                    
// 下面if这段代码的大致意思是说如果select未超时且select未被唤醒且未有读写事件发生的一种情况.
                    
// 1.说明可能select被中断了.->然后检查是否有channel被close了(如果有的话则key.cancel).如果是的话则继续执行select.
                    
// 2.如果检查发现没有channel被close则重新注册一个新的Selector.
                    
//(注意这里的检查是之前NIO的bug.Selector应该只在2种情况有返回值,即有网络事件发生或者超时。但是Selector有时却会在没有获得任何selectionKey的情况返回.)
                    
//(http://bugs.java.com/view_bug.do?bug_id=6693490)(http://bugs.java.com/bugdatabase/view_bug.do?bug_id=6403933)
                    if ((selected == 0&& !wakeupCalled.get() && (delta < 100)) {
                        
// Last chance : the select() may have been
                        
// interrupted because we have had an closed channel.
                        if (isBrokenConnection()) {
                            LOG.warn("Broken connection");

                            
// we can reselect immediately
                            
// set back the flag to false
                            wakeupCalled.getAndSet(false);

                            
continue;
                        }
 else {
                            LOG.warn("Create a new selector. Selected is 0, delta = " + (t1 - t0));
                            
// Ok, we are hit by the nasty epoll
                            
// spinning.
                            
// Basically, there is a race condition
                            
// which causes a closing file descriptor not to be
                            
// considered as available as a selected channel, but
                            
// it stopped the select. The next time we will
                            
// call select(), it will exit immediately for the same
                            
// reason, and do so forever, consuming 100%
                            
// CPU.
                            
// We have to destroy the selector, and
                            
// register all the socket on a new one.
                            registerNewSelector();
                        }


                        
// Set back the flag to false
                        wakeupCalled.getAndSet(false);

                        
// and continue the loop
                        continue;
                    }


                    
// 处理新session
                    
// 1.初始化NioSession.{@link NioProcessor#init},即将channel配置为非阻塞模式并向selector注册OP_READ
                    
// 2.fireSessionCreated/fireSessionOpened两个事件.(注意这两个区别,如果配置了线程模型ExecutorFilter.则sessionOpened事件在该线程模型内执行.因为其只覆写了该方法,而没有覆写sessionOpened)
                    nSessions += handleNewSessions();

                    updateTrafficMask();

                    
// 处理读写事件(对于已select的session)
                    
// 1.处理读的时候,即AbstractPollingIoProcessor#read,读到的字节>0则触发fireMessageReceived.另外对ReadBufferSize这个参数做了一些判断(buffer会分配该大小).(即如果设置的太大则decrease,设置的太小则increase,根据读到的字节数目.所以说为了避开这个判断,该参数可设置在(readByte,2*readByte]这个区间)
                    
// 2.处理写,将session加入flush队列.
                    if (selected > 0{
                        
//LOG.debug("Processing "); // This log hurts one of the MDCFilter test
                        process();
                    }


                    
// 写未执行的请求
                    
// 1.通过session.write(msg)时,AbstractIoSession#write时->会触发fireFilterWrite事件.该触发链是沿着tail->header的方向触发的.
                    
// 2.HeadFilter#filterWrite,session上有一个WriteRequestQueue.将WriteRequest加入该队列.
                    
// 3.唤醒selector.
                    
//(注意第一次在write的时候,即writeRequestQueue为空的时候,是直接schedule_flush并wakeup selector(所以第一次也 没有必要向selecor注册写事件,第一次肯定是可写的).而后续的写请求则是直接将请求插入队列而已.只有再次写队列为空的时候则会再次schedule_flush并wakeup.另外如果session的写请求未执行完毕则会向selector注册写事件,在可写的时候依然会继续执行写.)
                    long currentTime = System.currentTimeMillis();
                    
// 1.遍历flushingSessions队列. 重置该session schedule flush flag(这个标识表示该session有写的request还未写完).2.flushNow,从writeRequestQueue依次取出写请求.
                    
// 3.maxWrittenBytes = 1.5 * maxReadBufferSize,读写公平(注意这里:flushNow的while循环结束条件是writtenBytes < maxWrittenBytes.即一次flush不会超过最大写字节数.)
                    
// (其实这个处理就是为了读写公平,防止因为写的数据过多而导致read不能得到及时响应.因为都是在一个processor线程处理的.)
                    
// 4.如果session中当前请求的buffer已发送完毕,则触发fireMessageSent事件.
                   
// 5.如果session中请求的数据未全部发送完毕(buffer.hasRemaining),则session重新向selector注册写事件 OP_WRITE.
                    flush(currentTime);

                    
// 注意这里:
                    
// 1.当processor正在执行read的时候,如果客户端端掉了连接,则NioProcessor.read这里就会抛出一个io异常:java.io.IOException: 远程主机强迫关闭了一个现有的连接
                    
// 2.read这段代码在try/catch异常的时候:判断了一下异常如果是ioexception且该异常不是PortUnreachableException或者不是udp相关,则执行scheduleRemove->removingSessions.add(session)
                    
// 而下面这句代码则是处理removeSessions.->removeNow->对被移除的session进行destory处理(close_channel/cancel_key)并清理session的写队列,fireSessionDestroyed->fireSessionClosed
                    nSessions -= removeSessions();

                    
// if (currentTime - lastIdleCheckTime >= SELECT_TIMEOUT),即timeout内未有读写事件发生则通知空闲
                    
// 遍历session 判读当前时间与上次事件发生时间的差是否大于空闲时间
                    notifyIdleSessions(currentTime);

                    
.
    }


4.再谈io.
     IO分两个阶段:
   1.通知内核准备数据。2.数据从内核缓冲区拷贝到应用缓冲区

   根据这2点IO类型可以分成:
       1.阻塞IO,在两个阶段上面都是阻塞的。
       2.非阻塞IO,在第1阶段,程序不断的轮询直到数据准备好,第2阶段还是阻塞的
       3.IO复用,在第1阶段,当一个或者多个IO准备就绪时,通知程序,第2阶段还是阻塞的,在第1阶段还是轮询实现的,只是所有的IO都集中在一个地方,这个地方进行轮询
       4.信号IO,当数据准备完毕的时候,信号通知程序数据准备完毕,第2阶段阻塞
       5.异步IO,1,2都不阻塞
      
   当然write是从应用缓冲区到内核缓冲区.
   2.selector底层基础实现就应该是不断的轮训内核缓冲区的状态.
   3.select模型仅仅是轮训,知道有IO事件发生了.但是并不知道是哪些channel.所以只能轮训所有的注册channel,然后依次判断读写;引入epoll->会把哪个channel发生了什么io事件直接通知.

posted on 2014-03-07 17:01 landon 阅读(1771) 评论(2)  编辑  收藏 所属分类: ProgramSources

FeedBack:
# re: apache-mina-2.07源码笔记6-nio细节
2014-03-09 11:34 | 鹏达锁业
给力支持 博主。。。。。。赞一个

  回复  更多评论
  
# re: apache-mina-2.07源码笔记6-nio细节
2015-11-07 16:53 | qwert
楼主大赞,分析的详细多了,比其他的  回复  更多评论
  

只有注册用户登录后才能发表评论。


网站导航: