1. Source package structure

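2. Testing the threads
1. Start echoserver#Main from mina-example.
2. Start jconsole and inspect the threads:
1. NioSocketAcceptor
Its Runnable is AbstractPollingIoAcceptor$Acceptor.
3. Start com.game.landon.entrance.EchoClient and connect to the running echoserver. Inspecting the threads again shows one additional thread:
2. NioProcessor
Its Runnable is AbstractPollingIoProcessor$Processor.
4. Where the two thread pools come from:
1. AbstractIoService(IoSessionConfig sessionConfig, Executor executor)
This constructor checks:
if (executor == null) {
    this.executor = Executors.newCachedThreadPool();
}
See the source for details.
2. SimpleIoProcessorPool
Its constructor performs the same check.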
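
The null check above can be sketched in isolation. This is a minimal illustration, not MINA's code: the class DefaultExecutorSketch and its methods are hypothetical names, and the only part taken from the source is the fallback to Executors.newCachedThreadPool() when no executor is supplied.

```java
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Hypothetical stand-in for a service that accepts an optional Executor.
final class DefaultExecutorSketch {
    private final Executor executor;
    // Remembers whether the pool was created here, so only that pool is shut down.
    private final boolean createdExecutor;

    DefaultExecutorSketch(Executor executor) {
        if (executor == null) {
            // Same fallback as in the note above: no executor given, use a cached pool.
            this.executor = Executors.newCachedThreadPool();
            this.createdExecutor = true;
        } else {
            this.executor = executor;
            this.createdExecutor = false;
        }
    }

    Executor getExecutor() {
        return executor;
    }

    boolean createdExecutor() {
        return createdExecutor;
    }

    // Shuts down the pool only if this object created it.
    void dispose() {
        if (createdExecutor) {
            ((ExecutorService) executor).shutdownNow();
        }
    }
}
```

Passing null yields a private cached pool; passing an existing Executor reuses it and leaves its lifecycle to the caller.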
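3. Basic internal flow
1. SocketAcceptor acceptor = new NioSocketAcceptor();
1. Initializes the NioSocketAcceptor thread pool {@link AbstractIoService}
2. Initializes the NioProcessor thread pool {@link SimpleIoProcessorPool}
3. NioSocketAcceptor#init
Initializes the Selector: selector = Selector.open();
2. acceptor.bind(new InetSocketAddress(PORT));
1. AbstractPollingIoAcceptor#bindInternal
#startupAcceptor
-> The NioSocketAcceptor thread pool executes the Acceptor task.
// 1. startupAcceptor also checks whether an Acceptor task already exists and creates one only if it does not.
// 2. The Acceptor task ends when every bound port has been unbound; the Acceptor reference is then set to null.
// 3. Each NioSocketAcceptor has exactly one selector and exactly one Acceptor task, i.e. one Acceptor thread. The Acceptor is therefore effectively single-threaded, even though its executor is a CachedThreadPool.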
3. Acceptor#run
1. while (selectable)
1. selector.select
2. #registerHandles
-> NioSocketAcceptor#open
Sets the socket options and registers OP_ACCEPT with the selector.
3. #processHandles
1. NioSocketAcceptor#accept -> returns a NioSocketSession
2. SimpleIoProcessorPool#add
1. Binds the session to one NioProcessor in the pool based on the session id {@link SimpleIoProcessorPool#getProcessor}
2. AbstractPollingIoProcessor#add
3. AbstractPollingIoProcessor#startupProcessor
-> The NioProcessor thread pool executes the Processor task.
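// The startupProcessor code below shows:
// 1. The Processor task is created only once, i.e. each NioProcessor object owns at most one Processor.
// 2. Each NioProcessor submits the Processor task to the thread pool only once, and the Processor task is an infinite loop.
// In other words, each Processor occupies one thread of the pool, so each NioProcessor object corresponds to one pool thread.
// 3. A session is bound to one fixed NioProcessor (by modulo), so each session is handled by a single thread (the only Processor thread of its NioProcessor).
// 4. The constructor public NioSocketAcceptor(int processorCount) sets the number of processors, which ultimately determines how many CachedThreadPool threads are used for Processor tasks.
// (default: private static final int DEFAULT_SIZE = Runtime.getRuntime().availableProcessors() + 1)
// 5. Each NioProcessor has exactly one selector.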

private void startupProcessor()
{
    Processor processor = processorRef.get();

    if (processor == null)
    {
        processor = new Processor();

        if (processorRef.compareAndSet(null, processor))
        {
            executor.execute(new NamePreservingRunnable(processor, threadName));
        }
    }

    // Just stop the select() and start it again, so that the processor
    // can be activated immediately.
    wakeup();
}
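
The modulo binding between sessions and processors mentioned in the notes can be sketched as follows. This is an illustration under assumptions, not the body of SimpleIoProcessorPool#getProcessor: the class ProcessorPoolSketch, the use of plain strings as processors, and the use of Math.floorMod are all choices made for this sketch. Only the default pool size expression comes from the notes above.

```java
// Hypothetical pool that maps a session id to one fixed slot by modulo.
final class ProcessorPoolSketch {
    // Default size as quoted in the notes: available processors + 1.
    static final int DEFAULT_SIZE = Runtime.getRuntime().availableProcessors() + 1;

    // Strings stand in for NioProcessor instances in this sketch.
    private final String[] pool;

    ProcessorPoolSketch(int size) {
        if (size <= 0) {
            throw new IllegalArgumentException("size must be positive");
        }
        pool = new String[size];
        for (int i = 0; i < size; i++) {
            pool[i] = "NioProcessor-" + (i + 1);
        }
    }

    // The same session id always lands on the same processor,
    // so all I/O for one session runs on one thread.
    String processorFor(long sessionId) {
        int index = (int) Math.floorMod(sessionId, (long) pool.length);
        return pool[index];
    }
}
```

With a pool of 4, session ids 1 and 5 map to the same slot, which is why one session never hops between Processor threads.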

/**
* This private class is used to accept incoming connection from
* clients. It's an infinite loop, which can be stopped when all
* the registered handles have been removed (unbound).
*/
private class Acceptor implements Runnable
{
    public void run()
    {
        int nHandles = 0;
        lastIdleCheckTime = System.currentTimeMillis();

        // Release the lock
        lock.release();

        while ( selectable )
        {
            try
            {
                int selected = select( SELECT_TIMEOUT );

                nHandles += registerHandles();

                if ( nHandles == 0 )
                {
                    try
                    {
                        lock.acquire();

                        if ( registerQueue.isEmpty() && cancelQueue.isEmpty() )
                        {
                            acceptor = null;
                            break;
                        }
                    }
                    finally
                    {
                        lock.release();
                    }
                }

                if ( selected > 0 )
                {
                    processReadySessions( selectedHandles() );
                }

                long currentTime = System.currentTimeMillis();
                flushSessions( currentTime );
                nHandles -= unregisterHandles();

                notifyIdleSessions( currentTime );
            }
            catch ( ClosedSelectorException cse )
            {
                // If the selector has been closed, we can exit the loop
                break;
            }
            catch ( Exception e )
            {
                ExceptionMonitor.getInstance().exceptionCaught( e );

                try
                {
                    Thread.sleep( 1000 );
                }
                catch ( InterruptedException e1 )
                {
                }
            }
        }

        if ( selectable && isDisposing() )
        {
            selectable = false;
            try
            {
                destroy();
            }
            catch ( Exception e )
            {
                ExceptionMonitor.getInstance().exceptionCaught( e );
            }
            finally
            {
                disposalFuture.setValue( true );
            }
        }
    }
}
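
Stripped of MINA's bookkeeping, the register and accept steps of the Acceptor outline come down to a few plain java.nio calls. The sketch below is not MINA code: the class AcceptLoopSketch and the method acceptOnce are hypothetical, it runs a single select pass instead of a loop, and it connects a local client to itself only so that the example is self-contained.

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.Iterator;

final class AcceptLoopSketch {
    // Binds to an ephemeral loopback port, connects one local client,
    // runs one select pass and returns how many connections were accepted.
    static int acceptOnce() throws IOException {
        try (Selector selector = Selector.open();
             ServerSocketChannel server = ServerSocketChannel.open()) {
            // Counterpart of the "open" step: non-blocking channel registered for OP_ACCEPT.
            server.configureBlocking(false);
            server.bind(new InetSocketAddress("127.0.0.1", 0));
            server.register(selector, SelectionKey.OP_ACCEPT);

            int port = ((InetSocketAddress) server.getLocalAddress()).getPort();
            int accepted = 0;

            // Blocking connect from a local client, standing in for EchoClient.
            try (SocketChannel client = SocketChannel.open(new InetSocketAddress("127.0.0.1", port))) {
                // Counterpart of select followed by processing the ready handles.
                if (selector.select(1000) > 0) {
                    Iterator<SelectionKey> it = selector.selectedKeys().iterator();
                    while (it.hasNext()) {
                        SelectionKey key = it.next();
                        it.remove();
                        if (key.isAcceptable()) {
                            // In MINA the accepted channel would be wrapped in a session
                            // and handed to a processor; here it is just counted and closed.
                            try (SocketChannel session = ((ServerSocketChannel) key.channel()).accept()) {
                                if (session != null) {
                                    accepted++;
                                }
                            }
                        }
                    }
                }
            }
            return accepted;
        }
    }
}
```

A single selector and a single thread are enough here, which matches the observation that the Acceptor is effectively single-threaded.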
2. Processor#run
1. for (;;)
1. select(SELECT_TIMEOUT)
2. #handleNewSessions
1. AbstractPollingIoProcessor#addNow
2. AbstractPollingIoProcessor#init
NioProcessor#init -> registers the channel returned by session.getChannel with the selector for OP_READ
3. #updateTrafficMask
4. #process
// process reads / process writes
5. #flush
6. #notifyIdleSessions

/**
* The main loop. This is the place in charge to poll the Selector, and to
* process the active sessions. It's done in
* - handle the newly created sessions
* -
*/

private class Processor implements Runnable
{
    public void run()
    {
        assert (processorRef.get() == this);

        int nSessions = 0;
        lastIdleCheckTime = System.currentTimeMillis();

        for (;;)
        {
            try
            {
                // This select has a timeout so that we can manage
                // idle session when we get out of the select every
                // second. (note : this is a hack to avoid creating
                // a dedicated thread).
                long t0 = System.currentTimeMillis();
                int selected = select(SELECT_TIMEOUT);
                long t1 = System.currentTimeMillis();
                long delta = (t1 - t0);

                if ((selected == 0) && !wakeupCalled.get() && (delta < 100))
                {
                    // Last chance : the select() may have been
                    // interrupted because we have had an closed channel.
                    if (isBrokenConnection())
                    {
                        LOG.warn("Broken connection");

                        // we can reselect immediately
                        // set back the flag to false
                        wakeupCalled.getAndSet(false);

                        continue;
                    } else
                    {
                        LOG.warn("Create a new selector. Selected is 0, delta = " + (t1 - t0));
                        // Ok, we are hit by the nasty epoll
                        // spinning.
                        // Basically, there is a race condition
                        // which causes a closing file descriptor not to be
                        // considered as available as a selected channel, but
                        // it stopped the select. The next time we will
                        // call select(), it will exit immediately for the same
                        // reason, and do so forever, consuming 100%
                        // CPU.
                        // We have to destroy the selector, and
                        // register all the socket on a new one.
                        registerNewSelector();
                    }

                    // Set back the flag to false
                    wakeupCalled.getAndSet(false);

                    // and continue the loop
                    continue;
                }

                // Manage newly created session first
                nSessions += handleNewSessions();

                updateTrafficMask();

                // Now, if we have had some incoming or outgoing events,
                // deal with them
                if (selected > 0)
                {
                    //LOG.debug("Processing "); // This log hurts one of the MDCFilter test
                    process();
                }

                // Write the pending requests
                long currentTime = System.currentTimeMillis();
                flush(currentTime);

                // And manage removed sessions
                nSessions -= removeSessions();

                // Last, not least, send Idle events to the idle sessions
                notifyIdleSessions(currentTime);

                // Get a chance to exit the infinite loop if there are no
                // more sessions on this Processor
                if (nSessions == 0)
                {
                    processorRef.set(null);

                    if (newSessions.isEmpty() && isSelectorEmpty())
                    {
                        // newSessions.add() precedes startupProcessor
                        assert (processorRef.get() != this);
                        break;
                    }

                    assert (processorRef.get() != this);

                    if (!processorRef.compareAndSet(null, this))
                    {
                        // startupProcessor won race, so must exit processor
                        assert (processorRef.get() != this);
                        break;
                    }

                    assert (processorRef.get() == this);
                }

                // Disconnect all sessions immediately if disposal has been
                // requested so that we exit this loop eventually.
                if (isDisposing())
                {
                    for (Iterator<S> i = allSessions(); i.hasNext();)
                    {
                        scheduleRemove(i.next());
                    }

                    wakeup();
                }
            } catch (ClosedSelectorException cse)
            {
                // If the selector has been closed, we can exit the loop
                break;
            } catch (Throwable t)
            {
                ExceptionMonitor.getInstance().exceptionCaught(t);

                try
                {
                    Thread.sleep(1000);
                } catch (InterruptedException e1)
                {
                    ExceptionMonitor.getInstance().exceptionCaught(e1);
                }
            }
        }

        try
        {
            synchronized (disposalLock)
            {
                if (disposing)
                {
                    doDispose();
                }
            }
        } catch (Throwable t)
        {
            ExceptionMonitor.getInstance().exceptionCaught(t);
        } finally
        {
            disposalFuture.setValue(true);
        }
    }
}
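
The reason a NioProcessor occupies only one thread of a CachedThreadPool is the compareAndSet guard on processorRef. A minimal sketch of that guard, outside MINA, might look like this. The class StartOnceSketch and its members are hypothetical, the worker simply blocks on a latch instead of polling a selector, and the idle-exit path that resets the reference to null is left out.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;

final class StartOnceSketch {
    // Plays the role of processorRef: non-null while a worker is installed.
    private final AtomicReference<Runnable> workerRef = new AtomicReference<>();
    // Counts how many tasks were actually handed to the pool.
    private final AtomicInteger submitted = new AtomicInteger();
    private final CountDownLatch stop = new CountDownLatch(1);
    private final ExecutorService executor = Executors.newCachedThreadPool();

    // May be called many times (e.g. once per added session); submits at most one worker.
    void startup() {
        if (workerRef.get() == null) {
            Runnable worker = () -> {
                try {
                    // Stand-in for the long-running select loop.
                    stop.await();
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            };
            // Only the caller that wins the CAS submits the task.
            if (workerRef.compareAndSet(null, worker)) {
                submitted.incrementAndGet();
                executor.execute(worker);
            }
        }
    }

    int submittedCount() {
        return submitted.get();
    }

    // Lets the worker finish and releases the pool.
    void shutdown() {
        stop.countDown();
        executor.shutdown();
    }
}
```

Calling startup() repeatedly still leaves exactly one task in the pool, so the cached pool never grows beyond one thread per worker reference.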
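4. Related class diagrams
1. IoService class diagram
2. IoProcessor class diagram
3. IoSession class diagram
5. Summary:
This post is only an introduction. It focuses on the two thread pools inside mina2: the acceptor pool and the processor pool. For NIO itself, see my earlier article:
http://www.blogjava.net/landon/archive/2013/08/16/402947.html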
posted on 2013-11-18 17:24 landon